# Elasticsearch分片技术实现

## 概述

Elasticsearch分片(Sharding)是其分布式架构的核心:一个索引被切分为多个主分片(primary shard)并分布到集群中的不同节点,从而实现水平扩展;每个主分片还可以配置若干副本分片(replica shard)提供冗余和读扩展。每个分片本质上是一个独立的Lucene索引。需要注意,主分片数在索引创建后不能修改(只能通过shrink/split或reindex调整),而副本数可以随时动态调整;文档写入和查询时会根据路由值被确定性地映射到某一个主分片。
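
下面用一小段Java代码示意这个路由过程。这只是概念演示:实际的Elasticsearch路由使用Murmur3哈希,并引入routing_num_shards以支持索引split,这里用hashCode代替,仅用于说明"同一路由值总是落到同一主分片"。

```java
// 概念演示: 默认路由可以近似理解为 hash(_routing) % 主分片数
public class ShardRoutingDemo {

    // 这里用hashCode代替实际使用的Murmur3哈希, 仅作示意
    static int selectShard(String routing, int numberOfPrimaryShards) {
        return Math.floorMod(routing.hashCode(), numberOfPrimaryShards);
    }

    public static void main(String[] args) {
        // 同一个路由值(如用户ID)总是映射到同一个主分片
        System.out.println(selectShard("user-1001", 5));
        System.out.println(selectShard("user-1001", 5)); // 与上一行结果相同
    }
}
```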

## Elasticsearch分片架构

### 1. 集群架构设计

```yaml
# docker-compose.yml - Elasticsearch集群
version: '3.8'
services:
  # Master节点
  es-master-1:
    image: elasticsearch:8.8.0
    container_name: es-master-1
    environment:
      - node.name=es-master-1
      - node.roles=master
      - cluster.name=es-cluster
      - discovery.seed_hosts=es-master-2,es-master-3
      - cluster.initial_master_nodes=es-master-1,es-master-2,es-master-3
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"
    ports:
      - "9200:9200"
    volumes:
      - es_master_1_data:/usr/share/elasticsearch/data
    networks:
      - es-network
  
  es-master-2:
    image: elasticsearch:8.8.0
    container_name: es-master-2
    environment:
      - node.name=es-master-2
      - node.roles=master
      - cluster.name=es-cluster
      - discovery.seed_hosts=es-master-1,es-master-3
      - cluster.initial_master_nodes=es-master-1,es-master-2,es-master-3
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"
    ports:
      - "9201:9200"
    volumes:
      - es_master_2_data:/usr/share/elasticsearch/data
    networks:
      - es-network
  
  es-master-3:
    image: elasticsearch:8.8.0
    container_name: es-master-3
    environment:
      - node.name=es-master-3
      - node.roles=master
      - cluster.name=es-cluster
      - discovery.seed_hosts=es-master-1,es-master-2
      - cluster.initial_master_nodes=es-master-1,es-master-2,es-master-3
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"
    ports:
      - "9202:9200"
    volumes:
      - es_master_3_data:/usr/share/elasticsearch/data
    networks:
      - es-network
  
  # 数据节点
  es-data-1:
    image: elasticsearch:8.8.0
    container_name: es-data-1
    environment:
      - node.name=es-data-1
      - node.roles=data,ingest
      - cluster.name=es-cluster
      - discovery.seed_hosts=es-master-1,es-master-2,es-master-3
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms2g -Xmx2g"
    ports:
      - "9203:9200"
    volumes:
      - es_data_1_data:/usr/share/elasticsearch/data
    networks:
      - es-network
  
  es-data-2:
    image: elasticsearch:8.8.0
    container_name: es-data-2
    environment:
      - node.name=es-data-2
      - node.roles=data,ingest
      - cluster.name=es-cluster
      - discovery.seed_hosts=es-master-1,es-master-2,es-master-3
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms2g -Xmx2g"
    ports:
      - "9204:9200"
    volumes:
      - es_data_2_data:/usr/share/elasticsearch/data
    networks:
      - es-network
  
  es-data-3:
    image: elasticsearch:8.8.0
    container_name: es-data-3
    environment:
      - node.name=es-data-3
      - node.roles=data,ingest
      - cluster.name=es-cluster
      - discovery.seed_hosts=es-master-1,es-master-2,es-master-3
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms2g -Xmx2g"
    ports:
      - "9205:9200"
    volumes:
      - es_data_3_data:/usr/share/elasticsearch/data
    networks:
      - es-network
  
  # 协调节点
  es-coord-1:
    image: elasticsearch:8.8.0
    container_name: es-coord-1
    environment:
      - node.name=es-coord-1
      - node.roles=
      - cluster.name=es-cluster
      - discovery.seed_hosts=es-master-1,es-master-2,es-master-3
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"
    ports:
      - "9206:9200"
    networks:
      - es-network
  
  # Kibana
  kibana:
    image: kibana:8.8.0
    container_name: kibana
    environment:
      - ELASTICSEARCH_HOSTS=http://es-coord-1:9200
    ports:
      - "5601:5601"
    networks:
      - es-network
    depends_on:
      - es-coord-1

volumes:
  es_master_1_data:
  es_master_2_data:
  es_master_3_data:
  es_data_1_data:
  es_data_2_data:
  es_data_3_data:

networks:
  es-network:
    driver: bridge
```

### 2. 集群初始化脚本

```bash
#!/bin/bash
# elasticsearch-cluster-init.sh

echo "初始化Elasticsearch集群..."

# 等待集群启动
sleep 60

# 检查集群健康状态
echo "检查集群健康状态..."
curl -X GET "es-coord-1:9200/_cluster/health?pretty"

# 创建索引模板
echo "创建索引模板..."
curl -X PUT "es-coord-1:9200/_index_template/logs_template" -H 'Content-Type: application/json' -d'
{
  "index_patterns": ["logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 3,
      "number_of_replicas": 1,
      "index.routing.allocation.total_shards_per_node": 2
    },
    "mappings": {
      "properties": {
        "timestamp": {
          "type": "date"
        },
        "level": {
          "type": "keyword"
        },
        "message": {
          "type": "text",
          "analyzer": "standard"
        },
        "service": {
          "type": "keyword"
        },
        "host": {
          "type": "keyword"
        }
      }
    }
  }
}'

# 创建用户数据索引
echo "创建用户数据索引..."
curl -X PUT "es-coord-1:9200/users" -H 'Content-Type: application/json' -d'
{
  "settings": {
    "number_of_shards": 5,
    "number_of_replicas": 1,
    "index.routing.allocation.total_shards_per_node": 2,
    "index.routing.allocation.awareness.attributes": "zone"
  },
  "mappings": {
    "properties": {
      "user_id": {
        "type": "keyword"
      },
      "username": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "email": {
        "type": "keyword"
      },
      "age": {
        "type": "integer"
      },
      "created_at": {
        "type": "date"
      },
      "location": {
        "type": "geo_point"
      }
    }
  }
}'

echo "Elasticsearch集群初始化完成!"

## Java应用集成

### 1. Spring Boot配置

```java
@Configuration
public class ElasticsearchShardingConfig {
    
    @Value("${elasticsearch.hosts}")
    private String[] hosts;
    
    @Bean
    public ElasticsearchClient elasticsearchClient() {
        HttpHost[] httpHosts = Arrays.stream(hosts)
            .map(host -> {
                String[] parts = host.split(":");
                return new HttpHost(parts[0], Integer.parseInt(parts[1]), "http");
            })
            .toArray(HttpHost[]::new);
        
        RestClientBuilder builder = RestClient.builder(httpHosts)
            .setRequestConfigCallback(requestConfigBuilder -> 
                requestConfigBuilder
                    .setConnectTimeout(5000)
                    .setSocketTimeout(60000)
                    .setConnectionRequestTimeout(5000)
            )
            .setHttpClientConfigCallback(httpClientBuilder -> 
                httpClientBuilder
                    .setMaxConnTotal(100)
                    .setMaxConnPerRoute(50)
                    .setKeepAliveStrategy((response, context) -> 30000)
            );
        
        ElasticsearchTransport transport = new RestClientTransport(
            builder.build(), new JacksonJsonpMapper());
        
        return new ElasticsearchClient(transport);
    }
    
    @Bean
    public ElasticsearchOperations elasticsearchOperations() {
        // Spring Data Elasticsearch 5.x 直接复用上面的 Java API Client
        return new ElasticsearchTemplate(elasticsearchClient());
    }
}
```

### 2. 分片管理服务

```java
@Slf4j
@Service
public class ElasticsearchShardingService {
    
    @Autowired
    private ElasticsearchClient elasticsearchClient;
    
    @Autowired
    private ElasticsearchOperations elasticsearchOperations;
    
    /**
     * 创建分片索引
     */
    public void createShardedIndex(String indexName, int shards, int replicas) {
        try {
            CreateIndexRequest request = CreateIndexRequest.of(builder -> 
                builder.index(indexName)
                    .settings(settings -> settings
                        .numberOfShards(String.valueOf(shards))
                        .numberOfReplicas(String.valueOf(replicas))
                        .put("index.routing.allocation.total_shards_per_node", "2")
                        .put("index.max_result_window", "50000")
                    )
            );
            
            CreateIndexResponse response = elasticsearchClient.indices().create(request);
            log.info("创建分片索引成功: {}, 分片数: {}, 副本数: {}", 
                indexName, shards, replicas);
            
        } catch (Exception e) {
            log.error("创建分片索引失败: {}", indexName, e);
            throw new RuntimeException("创建索引失败", e);
        }
    }
    
    /**
     * 动态调整分片副本数
     */
    public void updateReplicaCount(String indexName, int replicas) {
        try {
            PutIndicesSettingsRequest request = PutIndicesSettingsRequest.of(builder ->
                builder.index(indexName)
                    .settings(settings -> settings
                        .numberOfReplicas(String.valueOf(replicas))
                    )
            );
            
            PutIndicesSettingsResponse response = elasticsearchClient.indices()
                .putSettings(request);
            
            log.info("更新索引副本数成功: {}, 新副本数: {}", indexName, replicas);
            
        } catch (Exception e) {
            log.error("更新索引副本数失败: {}", indexName, e);
            throw new RuntimeException("更新副本数失败", e);
        }
    }
    
    /**
     * 分片重新分配
     */
    public void reallocateShards(String indexName, String fromNode, String toNode) {
        try {
            // 移动分片
            ClusterRerouteRequest request = ClusterRerouteRequest.of(builder ->
                builder.commands(commands -> commands
                    .move(move -> move
                        .index(indexName)
                        .shard(0)
                        .fromNode(fromNode)
                        .toNode(toNode)
                    )
                )
            );
            
            ClusterRerouteResponse response = elasticsearchClient.cluster().reroute(request);
            log.info("分片重新分配成功: {} 从 {} 移动到 {}", indexName, fromNode, toNode);
            
        } catch (Exception e) {
            log.error("分片重新分配失败", e);
            throw new RuntimeException("分片重新分配失败", e);
        }
    }
    
    /**
     * 获取索引的分片汇总统计(文档数与存储大小)
     */
    public Map<String, Object> getShardDistribution(String indexName) {
        try {
            IndicesStatsRequest request = IndicesStatsRequest.of(builder ->
                builder.index(indexName)
            );
            
            IndicesStatsResponse response = elasticsearchClient.indices().stats(request);
            
            Map<String, Object> distribution = new HashMap<>();
            
            response.indices().forEach((index, stats) -> {
                Map<String, Object> indexInfo = new HashMap<>();
                indexInfo.put("totalShards", stats.total().docs().count());
                indexInfo.put("primaryShards", stats.primaries().docs().count());
                indexInfo.put("storeSize", stats.total().store().sizeInBytes());
                distribution.put(index, indexInfo);
            });
            
            return distribution;
            
        } catch (Exception e) {
            log.error("获取分片分布信息失败: {}", indexName, e);
            throw new RuntimeException("获取分片信息失败", e);
        }
    }
    
    /**
     * 强制合并分片
     */
    public void forcemergeShards(String indexName, int maxNumSegments) {
        try {
            ForcemergeRequest request = ForcemergeRequest.of(builder ->
                builder.index(indexName)
                    .maxNumSegments((long) maxNumSegments)
                    .onlyExpungeDeletes(false)
                    .flush(true)
            );
            
            ForcemergeResponse response = elasticsearchClient.indices().forcemerge(request);
            log.info("强制合并分片成功: {}, 目标段数: {}", indexName, maxNumSegments);
            
        } catch (Exception e) {
            log.error("强制合并分片失败: {}", indexName, e);
            throw new RuntimeException("强制合并失败", e);
        }
    }
}
```

### 3. 路由策略实现

```java
@Slf4j
@Service
public class ElasticsearchRoutingService {
    
    @Autowired
    private ElasticsearchClient elasticsearchClient;
    
    /**
     * 基于用户ID的路由策略
     */
    public void indexWithUserRouting(String indexName, String userId, Object document) {
        try {
            // 使用用户ID作为路由键
            String routing = calculateRouting(userId);
            
            IndexRequest<Object> request = IndexRequest.of(builder ->
                builder.index(indexName)
                    .id(userId)
                    .routing(routing)
                    .document(document)
            );
            
            IndexResponse response = elasticsearchClient.index(request);
            log.debug("文档索引成功: {}, 路由: {}", response.id(), routing);
            
        } catch (Exception e) {
            log.error("文档索引失败", e);
            throw new RuntimeException("索引失败", e);
        }
    }
    
    /**
     * 基于时间的路由策略
     */
    public void indexWithTimeRouting(String indexPrefix, LocalDateTime timestamp, Object document) {
        try {
            // 按天分割索引
            String indexName = indexPrefix + "-" + timestamp.format(DateTimeFormatter.ofPattern("yyyy.MM.dd"));
            
            // 使用小时作为路由键
            String routing = String.valueOf(timestamp.getHour());
            
            IndexRequest<Object> request = IndexRequest.of(builder ->
                builder.index(indexName)
                    .routing(routing)
                    .document(document)
            );
            
            IndexResponse response = elasticsearchClient.index(request);
            log.debug("时间路由索引成功: {}, 索引: {}, 路由: {}", 
                response.id(), indexName, routing);
            
        } catch (Exception e) {
            log.error("时间路由索引失败", e);
            throw new RuntimeException("索引失败", e);
        }
    }
    
    /**
     * 基于地理位置的路由策略
     */
    public void indexWithGeoRouting(String indexName, double lat, double lon, Object document) {
        try {
            // 基于地理位置计算路由
            String routing = calculateGeoRouting(lat, lon);
            
            IndexRequest<Object> request = IndexRequest.of(builder ->
                builder.index(indexName)
                    .routing(routing)
                    .document(document)
            );
            
            IndexResponse response = elasticsearchClient.index(request);
            log.debug("地理路由索引成功: {}, 路由: {}", response.id(), routing);
            
        } catch (Exception e) {
            log.error("地理路由索引失败", e);
            throw new RuntimeException("索引失败", e);
        }
    }
    
    /**
     * 带路由的批量索引
     */
    public void bulkIndexWithRouting(String indexName, List<DocumentWithRouting> documents) {
        try {
            BulkRequest.Builder bulkBuilder = new BulkRequest.Builder();
            
            for (DocumentWithRouting doc : documents) {
                bulkBuilder.operations(op -> op
                    .index(idx -> idx
                        .index(indexName)
                        .id(doc.getId())
                        .routing(doc.getRouting())
                        .document(doc.getDocument())
                    )
                );
            }
            
            BulkResponse response = elasticsearchClient.bulk(bulkBuilder.build());
            
            if (response.errors()) {
                log.warn("批量索引部分失败");
                response.items().forEach(item -> {
                    if (item.error() != null) {
                        log.error("索引失败: {}, 错误: {}", item.id(), item.error().reason());
                    }
                });
            } else {
                log.info("批量索引成功,文档数: {}", documents.size());
            }
            
        } catch (Exception e) {
            log.error("批量索引失败", e);
            throw new RuntimeException("批量索引失败", e);
        }
    }
    
    /**
     * 路由查询
     */
    public <T> List<T> searchWithRouting(String indexName, String routing, 
                                       Query query, Class<T> clazz) {
        try {
            SearchRequest request = SearchRequest.of(builder ->
                builder.index(indexName)
                    .routing(routing)
                    .query(query)
                    .size(1000)
            );
            
            SearchResponse<T> response = elasticsearchClient.search(request, clazz);
            
            return response.hits().hits().stream()
                .map(hit -> hit.source())
                .collect(Collectors.toList());
            
        } catch (Exception e) {
            log.error("路由查询失败", e);
            throw new RuntimeException("查询失败", e);
        }
    }
    
    private String calculateRouting(String userId) {
        // 简单的哈希路由策略 (floorMod避免hashCode为负数时结果为负)
        return String.valueOf(Math.floorMod(userId.hashCode(), 10));
    }
    
    private String calculateGeoRouting(double lat, double lon) {
        // 基于地理位置的简单分区策略
        int latZone = (int) ((lat + 90) / 30); // 6个纬度区域
        int lonZone = (int) ((lon + 180) / 60); // 6个经度区域
        return latZone + "-" + lonZone;
    }
    
    public static class DocumentWithRouting {
        private String id;
        private String routing;
        private Object document;
        
        // 构造函数、getter、setter
    }
}
```

### 4. 索引生命周期管理

```java
@Slf4j
@Service
public class IndexLifecycleService {
    
    @Autowired
    private ElasticsearchClient elasticsearchClient;
    
    /**
     * 创建索引生命周期策略
     */
    public void createLifecyclePolicy(String policyName) {
        try {
            // 创建ILM策略
            Map<String, Object> policy = Map.of(
                "policy", Map.of(
                    "phases", Map.of(
                        "hot", Map.of(
                            "actions", Map.of(
                                "rollover", Map.of(
                                    "max_size", "10GB",
                                    "max_age", "7d",
                                    "max_docs", 10000000
                                )
                            )
                        ),
                        "warm", Map.of(
                            "min_age", "7d",
                            "actions", Map.of(
                                "allocate", Map.of(
                                    "number_of_replicas", 0
                                ),
                                "forcemerge", Map.of(
                                    "max_num_segments", 1
                                )
                            )
                        ),
                        "cold", Map.of(
                            "min_age", "30d",
                            "actions", Map.of(
                                "allocate", Map.of(
                                    "number_of_replicas", 0
                                )
                            )
                        ),
                        "delete", Map.of(
                            "min_age", "90d",
                            "actions", Map.of(
                                "delete", Map.of()
                            )
                        )
                    )
                )
            );
            
            // 提交策略: 可使用 elasticsearchClient.ilm().putLifecycle(...),
            // 或通过低级 RestClient 调用 PUT _ilm/policy/{policyName} 提交上面构造的policy
            log.info("创建生命周期策略: {}", policyName);
            
        } catch (Exception e) {
            log.error("创建生命周期策略失败: {}", policyName, e);
            throw new RuntimeException("创建策略失败", e);
        }
    }
    
    /**
     * 创建带生命周期策略的索引模板
     */
    public void createIndexTemplateWithLifecycle(String templateName, String indexPattern, 
                                                String lifecyclePolicy) {
        try {
            PutIndexTemplateRequest request = PutIndexTemplateRequest.of(builder ->
                builder.name(templateName)
                    .indexPatterns(indexPattern)
                    .template(template -> template
                        .settings(settings -> settings
                            .numberOfShards("3")
                            .numberOfReplicas("1")
                            .put("index.lifecycle.name", lifecyclePolicy)
                            .put("index.lifecycle.rollover_alias", indexPattern.replace("*", "alias"))
                        )
                        .mappings(mappings -> mappings
                            .properties("timestamp", property -> property
                                .date(date -> date.format("yyyy-MM-dd HH:mm:ss"))
                            )
                            .properties("level", property -> property
                                .keyword(keyword -> keyword)
                            )
                            .properties("message", property -> property
                                .text(text -> text.analyzer("standard"))
                            )
                        )
                    )
            );
            
            PutIndexTemplateResponse response = elasticsearchClient.indices()
                .putIndexTemplate(request);
            
            log.info("创建索引模板成功: {}, 生命周期策略: {}", templateName, lifecyclePolicy);
            
        } catch (Exception e) {
            log.error("创建索引模板失败: {}", templateName, e);
            throw new RuntimeException("创建模板失败", e);
        }
    }
    
    /**
     * 手动触发索引滚动
     */
    public void rolloverIndex(String aliasName) {
        try {
            RolloverRequest request = RolloverRequest.of(builder ->
                builder.alias(aliasName)
                    .conditions(conditions -> conditions
                        .maxSize("5GB")
                        .maxAge(Time.of(time -> time.time("1d")))
                        .maxDocs(5000000L)
                    )
            );
            
            RolloverResponse response = elasticsearchClient.indices().rollover(request);
            
            if (response.rolledOver()) {
                log.info("索引滚动成功: {} -> {}", aliasName, response.newIndex());
            } else {
                log.info("索引滚动条件未满足: {}", aliasName);
            }
            
        } catch (Exception e) {
            log.error("索引滚动失败: {}", aliasName, e);
            throw new RuntimeException("索引滚动失败", e);
        }
    }
    
    /**
     * 监控索引生命周期状态
     */
    @Scheduled(fixedRate = 3600000) // 1小时检查一次
    public void monitorIndexLifecycle() {
        try {
            // 获取所有索引的ILM执行状态 (explain lifecycle API)
            ExplainLifecycleRequest request = ExplainLifecycleRequest.of(builder ->
                builder.index("*")
            );
            
            ExplainLifecycleResponse response = elasticsearchClient.ilm().explainLifecycle(request);
            log.info("检查索引生命周期状态, 涉及索引数: {}", response.indices().size());
            
        } catch (Exception e) {
            log.error("监控索引生命周期失败", e);
        }
    }
    
    /**
     * 清理过期索引
     */
    @Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点执行
    public void cleanupExpiredIndices() {
        try {
            LocalDateTime cutoffDate = LocalDateTime.now().minusDays(90);
            
            // 先按前缀列出所有按日期命名的索引, 再逐个解析索引名中的日期判断是否过期
            GetIndexRequest request = GetIndexRequest.of(builder ->
                builder.index("logs-*")
            );
            
            GetIndexResponse response = elasticsearchClient.indices().get(request);
            
            for (String indexName : response.result().keySet()) {
                if (shouldDeleteIndex(indexName, cutoffDate)) {
                    deleteIndex(indexName);
                }
            }
            
        } catch (Exception e) {
            log.error("清理过期索引失败", e);
        }
    }
    
    private boolean shouldDeleteIndex(String indexName, LocalDateTime cutoffDate) {
        // 解析索引名称中的日期
        try {
            String[] parts = indexName.split("-");
            if (parts.length >= 2) {
                String datePart = parts[parts.length - 1];
                LocalDate indexDate = LocalDate.parse(datePart, DateTimeFormatter.ofPattern("yyyy.MM.dd"));
                return indexDate.isBefore(cutoffDate.toLocalDate());
            }
        } catch (Exception e) {
            log.warn("无法解析索引日期: {}", indexName);
        }
        return false;
    }
    
    private void deleteIndex(String indexName) {
        try {
            DeleteIndexRequest request = DeleteIndexRequest.of(builder ->
                builder.index(indexName)
            );
            
            DeleteIndexResponse response = elasticsearchClient.indices().delete(request);
            log.info("删除过期索引: {}", indexName);
            
        } catch (Exception e) {
            log.error("删除索引失败: {}", indexName, e);
        }
    }
}
```

## 性能优化策略

### 1. 分片大小优化

```java
@Slf4j
@Service
public class ShardSizeOptimizer {
    
    @Autowired
    private ElasticsearchClient elasticsearchClient;
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    /**
     * 分析分片大小分布
     */
    public ShardSizeAnalysis analyzeShardSizes(String indexPattern) {
        try {
            IndicesStatsRequest request = IndicesStatsRequest.of(builder ->
                builder.index(indexPattern)
                    .level(Level.Shards) // 需要shard级别统计, 否则shards()为空
            );
            
            IndicesStatsResponse response = elasticsearchClient.indices().stats(request);
            
            List<ShardInfo> shardInfos = new ArrayList<>();
            
            response.indices().forEach((indexName, stats) -> {
                stats.shards().forEach((shardId, shardStats) -> {
                    ShardInfo info = new ShardInfo();
                    info.setIndexName(indexName);
                    info.setShardId(shardId);
                    info.setDocCount(shardStats.get(0).docs().count());
                    info.setStoreSize(shardStats.get(0).store().sizeInBytes());
                    shardInfos.add(info);
                });
            });
            
            return analyzeDistribution(shardInfos);
            
        } catch (Exception e) {
            log.error("分析分片大小失败", e);
            throw new RuntimeException("分析失败", e);
        }
    }
    
    /**
     * 推荐最优分片数量
     */
    public ShardRecommendation recommendShardCount(long estimatedDataSize, 
                                                 long estimatedDocCount,
                                                 int nodeCount) {
        // 目标分片大小: 10-50GB
        long targetShardSize = 30L * 1024 * 1024 * 1024; // 30GB
        
        // 基于数据大小计算分片数
        int shardsBySize = (int) Math.ceil((double) estimatedDataSize / targetShardSize);
        
        // 基于节点数计算分片数(每个节点1-3个分片)
        int shardsByNodes = nodeCount * 2;
        
        // 基于文档数计算分片数(每个分片不超过1000万文档)
        int shardsByDocs = (int) Math.ceil((double) estimatedDocCount / 10_000_000);
        
        // 取数据量与文档数要求中的较大者, 但不超过节点数决定的上限
        int recommendedShards = Math.max(1, Math.min(
            Math.max(shardsBySize, shardsByDocs), 
            shardsByNodes
        ));
        
        ShardRecommendation recommendation = new ShardRecommendation();
        recommendation.setRecommendedShards(recommendedShards);
        recommendation.setRecommendedReplicas(Math.min(1, nodeCount - 1));
        recommendation.setReason(String.format(
            "基于数据大小(%dGB)、文档数(%d)、节点数(%d)的综合考虑",
            estimatedDataSize / (1024 * 1024 * 1024),
            estimatedDocCount,
            nodeCount
        ));
        
        return recommendation;
    }
    
    /**
     * 监控分片性能指标
     */
    @Scheduled(fixedRate = 300000) // 5分钟
    public void monitorShardPerformance() {
        try {
            NodesStatsRequest request = NodesStatsRequest.of(builder ->
                builder.metric("indices")
            );
            
            NodesStatsResponse response = elasticsearchClient.nodes().stats(request);
            
            response.nodes().forEach((nodeId, nodeStats) -> {
                if (nodeStats.indices() != null) {
                    // 索引/查询累计量与存储大小 (累计值由Prometheus等在外部换算为速率)
                    long indexingTotal = nodeStats.indices().indexing().indexTotal();
                    long searchTotal = nodeStats.indices().search().queryTotal();
                    long storeSize = nodeStats.indices().store().sizeInBytes();
                    
                    // 按节点打标签记录指标
                    meterRegistry.gauge("elasticsearch.node.indexing.total",
                        Tags.of("node", nodeId), indexingTotal);
                    meterRegistry.gauge("elasticsearch.node.search.total",
                        Tags.of("node", nodeId), searchTotal);
                    meterRegistry.gauge("elasticsearch.node.store.size",
                        Tags.of("node", nodeId), storeSize);
                }
            });
            
        } catch (Exception e) {
            log.error("监控分片性能失败", e);
        }
    }
    
    private ShardSizeAnalysis analyzeDistribution(List<ShardInfo> shardInfos) {
        if (shardInfos.isEmpty()) {
            return new ShardSizeAnalysis();
        }
        
        // 计算统计信息
        LongSummaryStatistics sizeStats = shardInfos.stream()
            .mapToLong(ShardInfo::getStoreSize)
            .summaryStatistics();
        
        LongSummaryStatistics docStats = shardInfos.stream()
            .mapToLong(ShardInfo::getDocCount)
            .summaryStatistics();
        
        ShardSizeAnalysis analysis = new ShardSizeAnalysis();
        analysis.setTotalShards(shardInfos.size());
        analysis.setAvgShardSize(sizeStats.getAverage());
        analysis.setMaxShardSize(sizeStats.getMax());
        analysis.setMinShardSize(sizeStats.getMin());
        analysis.setAvgDocCount(docStats.getAverage());
        analysis.setMaxDocCount(docStats.getMax());
        analysis.setMinDocCount(docStats.getMin());
        
        // 计算不平衡度
        double sizeImbalance = (sizeStats.getMax() - sizeStats.getMin()) / sizeStats.getAverage();
        analysis.setSizeImbalanceRatio(sizeImbalance);
        
        return analysis;
    }
    
    // 内部类定义
    public static class ShardInfo {
        private String indexName;
        private String shardId;
        private long docCount;
        private long storeSize;
        
        // getter和setter
    }
    
    public static class ShardSizeAnalysis {
        private int totalShards;
        private double avgShardSize;
        private long maxShardSize;
        private long minShardSize;
        private double avgDocCount;
        private long maxDocCount;
        private long minDocCount;
        private double sizeImbalanceRatio;
        
        // getter和setter
    }
    
    public static class ShardRecommendation {
        private int recommendedShards;
        private int recommendedReplicas;
        private String reason;
        
        // getter和setter
    }
}
```

### 2. 查询性能优化

```java
@Slf4j
@Service
public class SearchOptimizationService {
    
    @Autowired
    private ElasticsearchClient elasticsearchClient;
    
    /**
     * 优化的分页查询
     */
    public <T> PagedResult<T> optimizedPagedSearch(String indexName, 
                                                 Query query, 
                                                 int page, 
                                                 int size, 
                                                 Class<T> clazz) {
        try {
            // from + size 超过 index.max_result_window(默认10000)时, 改用search_after做深分页
            if ((page + 1) * size > 10000) {
                return searchAfterPagination(indexName, query, page, size, clazz);
            } else {
                return standardPagination(indexName, query, page, size, clazz);
            }
            
        } catch (Exception e) {
            log.error("分页查询失败", e);
            throw new RuntimeException("查询失败", e);
        }
    }
    
    /**
     * 聚合查询优化
     */
    public Map<String, Object> optimizedAggregation(String indexName, 
                                                   Query query,
                                                   Map<String, Aggregation> aggregations) {
        try {
            SearchRequest.Builder requestBuilder = new SearchRequest.Builder()
                .index(indexName)
                .query(query)
                .size(0); // 不返回文档,只返回聚合结果
            
            // 添加聚合 (新版Java客户端的Aggregation本身不带名称, 因此用Map传入名称)
            aggregations.forEach(requestBuilder::aggregations);
            
            SearchRequest request = requestBuilder.build();
            SearchResponse<Void> response = elasticsearchClient.search(request, Void.class);
            
            Map<String, Object> results = new HashMap<>();
            response.aggregations().forEach((name, aggregation) -> {
                results.put(name, parseAggregationResult(aggregation));
            });
            
            return results;
            
        } catch (Exception e) {
            log.error("聚合查询失败", e);
            throw new RuntimeException("聚合查询失败", e);
        }
    }
    
    /**
     * 多索引并行查询
     */
    public <T> List<T> parallelMultiIndexSearch(List<String> indices, 
                                              Query query, 
                                              Class<T> clazz) {
        try {
            // 并行查询多个索引
            List<CompletableFuture<List<T>>> futures = indices.stream()
                .map(index -> CompletableFuture.supplyAsync(() -> {
                    try {
                        SearchRequest request = SearchRequest.of(builder ->
                            builder.index(index)
                                .query(query)
                                .size(1000)
                        );
                        
                        SearchResponse<T> response = elasticsearchClient.search(request, clazz);
                        return response.hits().hits().stream()
                            .map(hit -> hit.source())
                            .collect(Collectors.toList());
                    } catch (Exception e) {
                        log.error("查询索引失败: {}", index, e);
                        return Collections.<T>emptyList();
                    }
                }))
                .collect(Collectors.toList());
            
            // 合并结果
            return futures.stream()
                .map(CompletableFuture::join)
                .flatMap(List::stream)
                .collect(Collectors.toList());
            
        } catch (Exception e) {
            log.error("并行多索引查询失败", e);
            throw new RuntimeException("查询失败", e);
        }
    }
    
    /**
     * 缓存热点查询
     */
    @Cacheable(value = "elasticsearch-queries", key = "#indexName + ':' + #query.toString()")
    public <T> List<T> cachedSearch(String indexName, Query query, Class<T> clazz) {
        try {
            SearchRequest request = SearchRequest.of(builder ->
                builder.index(indexName)
                    .query(query)
                    .size(100)
            );
            
            SearchResponse<T> response = elasticsearchClient.search(request, clazz);
            return response.hits().hits().stream()
                .map(hit -> hit.source())
                .collect(Collectors.toList());
            
        } catch (Exception e) {
            log.error("缓存查询失败", e);
            throw new RuntimeException("查询失败", e);
        }
    }
    
    private <T> PagedResult<T> standardPagination(String indexName, Query query, 
                                                 int page, int size, Class<T> clazz) 
            throws IOException {
        SearchRequest request = SearchRequest.of(builder ->
            builder.index(indexName)
                .query(query)
                .from(page * size)
                .size(size)
        );
        
        SearchResponse<T> response = elasticsearchClient.search(request, clazz);
        
        List<T> content = response.hits().hits().stream()
            .map(hit -> hit.source())
            .collect(Collectors.toList());
        
        return new PagedResult<>(content, page, size, response.hits().total().value());
    }
    
    private <T> PagedResult<T> searchAfterPagination(String indexName, Query query, 
                                                    int page, int size, Class<T> clazz) 
            throws IOException {
        // 实现search_after分页逻辑
        // 这里简化实现,实际需要维护排序值
        return standardPagination(indexName, query, page, size, clazz);
    }
    
    private Object parseAggregationResult(Aggregate aggregation) {
        // 解析聚合结果
        return new HashMap<>(); // 简化实现
    }
    
    public static class PagedResult<T> {
        private List<T> content;
        private int page;
        private int size;
        private long total;
        
        public PagedResult(List<T> content, int page, int size, long total) {
            this.content = content;
            this.page = page;
            this.size = size;
            this.total = total;
        }
        
        // getter和setter
    }
}
```

## 监控与运维

### 1. 集群健康监控

```java
@Slf4j
@Component
public class ElasticsearchClusterMonitor {
    
    @Autowired
    private ElasticsearchClient elasticsearchClient;
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    /**
     * 监控集群健康状态
     */
    @Scheduled(fixedRate = 30000)
    public void monitorClusterHealth() {
        try {
            HealthRequest request = HealthRequest.of(builder -> builder);
            HealthResponse response = elasticsearchClient.cluster().health(request);
            
            // 记录集群状态 (green=2, yellow=1, red=0, 见getStatusValue)
            String status = response.status().jsonValue();
            meterRegistry.gauge("elasticsearch.cluster.status", getStatusValue(status));
            
            // 记录节点数量
            meterRegistry.gauge("elasticsearch.cluster.nodes.total", response.numberOfNodes());
            meterRegistry.gauge("elasticsearch.cluster.nodes.data", response.numberOfDataNodes());
            
            // 记录分片状态
            meterRegistry.gauge("elasticsearch.cluster.shards.active", response.activeShards());
            meterRegistry.gauge("elasticsearch.cluster.shards.relocating", response.relocatingShards());
            meterRegistry.gauge("elasticsearch.cluster.shards.initializing", response.initializingShards());
            meterRegistry.gauge("elasticsearch.cluster.shards.unassigned", response.unassignedShards());
            
            // 检查是否有问题
            if (!"green".equals(status)) {
                log.warn("集群状态异常: {}, 未分配分片: {}", 
                    status, response.unassignedShards());
            }
            
        } catch (Exception e) {
            log.error("集群健康监控失败", e);
        }
    }
    
    /**
     * 监控索引状态
     */
    @Scheduled(fixedRate = 300000) // 5分钟
    public void monitorIndexHealth() {
        try {
            IndicesStatsRequest request = IndicesStatsRequest.of(builder ->
                builder.index("*")
            );
            
            IndicesStatsResponse response = elasticsearchClient.indices().stats(request);
            
            response.indices().forEach((indexName, stats) -> {
                // 索引大小
                long storeSize = stats.total().store().sizeInBytes();
                meterRegistry.gauge("elasticsearch.index.store.size",
                    Tags.of("index", indexName), storeSize);
                
                // 文档数量
                long docCount = stats.total().docs().count();
                meterRegistry.gauge("elasticsearch.index.docs.count",
                    Tags.of("index", indexName), docCount);
                
                // 索引操作统计 (均为累计值, 用gauge暴露, 避免每次调度重复累加造成重复计数)
                if (stats.total().indexing() != null) {
                    long indexTotal = stats.total().indexing().indexTotal();
                    long indexTime = stats.total().indexing().indexTimeInMillis();
                    
                    meterRegistry.gauge("elasticsearch.index.indexing.total",
                        Tags.of("index", indexName), indexTotal);
                    meterRegistry.gauge("elasticsearch.index.indexing.time.ms",
                        Tags.of("index", indexName), indexTime);
                }
                
                // 搜索操作统计
                if (stats.total().search() != null) {
                    long queryTotal = stats.total().search().queryTotal();
                    long queryTime = stats.total().search().queryTimeInMillis();
                    
                    meterRegistry.gauge("elasticsearch.index.search.total",
                        Tags.of("index", indexName), queryTotal);
                    meterRegistry.gauge("elasticsearch.index.search.time.ms",
                        Tags.of("index", indexName), queryTime);
                }
            });
            
        } catch (Exception e) {
            log.error("索引健康监控失败", e);
        }
    }
    
    /**
     * 监控节点性能
     */
    @Scheduled(fixedRate = 60000) // 1分钟
    public void monitorNodePerformance() {
        try {
            NodesStatsRequest request = NodesStatsRequest.of(builder ->
                builder.metric("jvm", "os", "fs")
            );
            
            NodesStatsResponse response = elasticsearchClient.nodes().stats(request);
            
            response.nodes().forEach((nodeId, nodeStats) -> {
                String nodeName = nodeStats.name();
                
                // JVM内存使用
                if (nodeStats.jvm() != null && nodeStats.jvm().mem() != null) {
                    long heapUsed = nodeStats.jvm().mem().heapUsedInBytes();
                    long heapMax = nodeStats.jvm().mem().heapMaxInBytes();
                    double heapUsedPercent = (double) heapUsed / heapMax * 100;
                    
                    Gauge.builder("elasticsearch.node.jvm.heap.used")
                        .tag("node", nodeName)
                        .register(meterRegistry, heapUsed);
                    
                    Gauge.builder("elasticsearch.node.jvm.heap.used.percent")
                        .tag("node", nodeName)
                        .register(meterRegistry, heapUsedPercent);
                }
                
                // 系统负载
                if (nodeStats.os() != null) {
                    if (nodeStats.os().cpu() != null) {
                        int cpuPercent = nodeStats.os().cpu().percent();
                        Gauge.builder("elasticsearch.node.os.cpu.percent")
                            .tag("node", nodeName)
                            .register(meterRegistry, cpuPercent);
                    }
                    
                    if (nodeStats.os().mem() != null) {
                        long memUsed = nodeStats.os().mem().usedInBytes();
                        long memTotal = nodeStats.os().mem().totalInBytes();
                        double memUsedPercent = (double) memUsed / memTotal * 100;
                        
                        Gauge.builder("elasticsearch.node.os.mem.used.percent")
                            .tag("node", nodeName)
                            .register(meterRegistry, memUsedPercent);
                    }
                }
                
                // 磁盘使用
                if (nodeStats.fs() != null && nodeStats.fs().total() != null) {
                    long diskUsed = nodeStats.fs().total().totalInBytes() - 
                                  nodeStats.fs().total().availableInBytes();
                    long diskTotal = nodeStats.fs().total().totalInBytes();
                    double diskUsedPercent = (double) diskUsed / diskTotal * 100;
                    
                    Gauge.builder("elasticsearch.node.fs.used.percent")
                        .tag("node", nodeName)
                        .register(meterRegistry, diskUsedPercent);
                }
            });
            
        } catch (Exception e) {
            log.error("节点性能监控失败", e);
        }
    }
    
    private double getStatusValue(String status) {
        switch (status) {
            case "green": return 2;
            case "yellow": return 1;
            case "red": return 0;
            default: return -1;
        }
    }
}
```

### 2. 自动故障处理

```java
@Slf4j
@Service
public class ElasticsearchFailoverService {
    
    @Autowired
    private ElasticsearchClient elasticsearchClient;
    
    @Autowired
    private NotificationService notificationService;
    
    /**
     * 检测并处理分片分配问题
     */
    @Scheduled(fixedRate = 60000)
    public void handleShardAllocationIssues() {
        try {
            HealthRequest request = HealthRequest.of(builder -> builder);
            HealthResponse health = elasticsearchClient.cluster().health(request);
            
            if (health.unassignedShards() > 0) {
                log.warn("检测到未分配分片: {}", health.unassignedShards());
                handleUnassignedShards();
            }
            
            if (health.relocatingShards() > 10) {
                log.warn("检测到大量分片重新分配: {}", health.relocatingShards());
                // 可能需要调整分配策略
            }
            
        } catch (Exception e) {
            log.error("分片分配检查失败", e);
        }
    }
    
    /**
     * 处理未分配分片
     */
    private void handleUnassignedShards() {
        try {
            // 获取未分配分片详情
            AllocationExplainRequest request = AllocationExplainRequest.of(builder ->
                builder.includeYesDecisions(true)
                    .includeDiskInfo(true)
            );
            
            AllocationExplainResponse response = elasticsearchClient.cluster()
                .allocationExplain(request);
            
            // 分析分配失败原因
            String reason = analyzeAllocationFailure(response);
            log.info("分片未分配原因: {}", reason);
            
            // 尝试自动修复
            attemptAutoFix(reason);
            
        } catch (Exception e) {
            log.error("处理未分配分片失败", e);
        }
    }
    
    /**
     * 自动修复常见问题
     */
    private void attemptAutoFix(String reason) {
        try {
            if (reason.contains("disk")) {
                // 磁盘空间不足,尝试清理旧数据
                cleanupOldIndices();
            } else if (reason.contains("allocation")) {
                // 分配策略问题,尝试调整设置
                adjustAllocationSettings();
            } else if (reason.contains("replica")) {
                // 副本分配问题,临时减少副本数
                reduceReplicaCount();
            }
        } catch (Exception e) {
            log.error("自动修复失败", e);
        }
    }
    
    /**
     * 清理旧索引
     */
    private void cleanupOldIndices() {
        try {
            LocalDateTime cutoff = LocalDateTime.now().minusDays(7);
            
            // 列出所有日志索引, 再按索引名中的日期判断是否早于保留期
            GetIndexRequest request = GetIndexRequest.of(builder ->
                builder.index("logs-*")
            );
            
            GetIndexResponse response = elasticsearchClient.indices().get(request);
            
            for (String indexName : response.result().keySet()) {
                if (isOldIndex(indexName, cutoff)) {
                    deleteIndex(indexName);
                }
            }
            
        } catch (Exception e) {
            log.error("清理旧索引失败", e);
        }
    }
    
    /**
     * 调整分配设置
     */
    private void adjustAllocationSettings() {
        try {
            PutClusterSettingsRequest request = PutClusterSettingsRequest.of(builder ->
                builder.transient_(settings -> settings
                    .put("cluster.routing.allocation.enable", "all")
                    .put("cluster.routing.rebalance.enable", "all")
                    .put("cluster.routing.allocation.allow_rebalance", "always")
                )
            );
            
            PutClusterSettingsResponse response = elasticsearchClient.cluster()
                .putSettings(request);
            
            log.info("调整集群分配设置完成");
            
        } catch (Exception e) {
            log.error("调整分配设置失败", e);
        }
    }
    
    /**
     * 减少副本数
     */
    private void reduceReplicaCount() {
        try {
            // 临时将所有索引的副本数设为0
            PutIndicesSettingsRequest request = PutIndicesSettingsRequest.of(builder ->
                builder.index("*")
                    .settings(settings -> settings
                        .numberOfReplicas("0")
                    )
            );
            
            PutIndicesSettingsResponse response = elasticsearchClient.indices()
                .putSettings(request);
            
            log.info("临时减少副本数完成");
            
            // 发送通知
            notificationService.sendAlert("Elasticsearch副本数已临时调整为0");
            
        } catch (Exception e) {
            log.error("减少副本数失败", e);
        }
    }
    
    private String analyzeAllocationFailure(AllocationExplainResponse response) {
        // 简化实现,实际需要解析详细的分配决策
        return "分配失败分析";
    }
    
    private boolean isOldIndex(String indexName, LocalDateTime cutoff) {
        // 解析索引名称中的日期
        try {
            String[] parts = indexName.split("-");
            if (parts.length >= 2) {
                String datePart = parts[parts.length - 1];
                LocalDate indexDate = LocalDate.parse(datePart, DateTimeFormatter.ofPattern("yyyy.MM.dd"));
                return indexDate.isBefore(cutoff.toLocalDate());
            }
        } catch (Exception e) {
            log.warn("无法解析索引日期: {}", indexName);
        }
        return false;
    }
    
    private void deleteIndex(String indexName) {
        try {
            DeleteIndexRequest request = DeleteIndexRequest.of(builder ->
                builder.index(indexName)
            );
            
            DeleteIndexResponse response = elasticsearchClient.indices().delete(request);
            log.info("删除旧索引: {}", indexName);
            
        } catch (Exception e) {
            log.error("删除索引失败: {}", indexName, e);
        }
    }
}
```

## 配置文件

### application.yml

```yaml
spring:
  application:
    name: elasticsearch-sharding-demo
  
elasticsearch:
  # 逗号分隔, 便于通过 @Value 注入为 String[]
  hosts: localhost:9200,localhost:9201,localhost:9202
  connection-timeout: 5s
  socket-timeout: 60s
  
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,prometheus
  metrics:
    export:
      prometheus:
        enabled: true

logging:
  level:
    org.elasticsearch: DEBUG
    com.example.elasticsearch: DEBUG
```

## 最佳实践

### 1. 分片设计原则

  • 分片大小: 保持在10-50GB之间
  • 分片数量: 避免过度分片,通常每个节点1-3个分片(分片数估算示例见下方代码)
  • 副本策略: 根据可用性需求设置副本数
  • 路由策略: 合理使用路由避免热点
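
上面的原则可以落到前文 ShardSizeOptimizer.recommendShardCount 这样的估算逻辑上。下面是一个估算示例(数值均为假设,shardSizeOptimizer 为已注入的Bean):

```java
// 示例: 预估600GB数据、40亿文档、6个数据节点时的分片配置
ShardSizeOptimizer.ShardRecommendation rec = shardSizeOptimizer.recommendShardCount(
    600L * 1024 * 1024 * 1024, // 预估数据量: 600GB
    4_000_000_000L,            // 预估文档数: 40亿
    6                          // 数据节点数
);
// 按30GB目标分片大小约需20个分片, 按文档数约需400个,
// 但受节点容量上限(6 * 2 = 12)约束, 最终推荐12个主分片、1个副本
```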

### 2. 性能优化

  • 批量操作: 使用bulk API提高索引性能(分批写入示例见下方代码)
  • 分片预分配: 根据数据增长预估分片数
  • 索引模板: 统一管理索引设置和映射
  • 生命周期管理: 自动化索引的创建、滚动和删除
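
以批量写入为例,可以在调用前文的 bulkIndexWithRouting 之前先把文档切成较小的批次。下面是一个示意(假设 routingService 为已注入的 ElasticsearchRoutingService,loadPendingDocuments() 为获取待写入文档的示例方法):

```java
// 示意: 每批1000条分批写入; 单个bulk请求体通常建议控制在5-15MB, 具体以压测结果为准
List<ElasticsearchRoutingService.DocumentWithRouting> docs = loadPendingDocuments(); // 示例取数方法
int batchSize = 1000;
for (int from = 0; from < docs.size(); from += batchSize) {
    List<ElasticsearchRoutingService.DocumentWithRouting> batch =
        docs.subList(from, Math.min(from + batchSize, docs.size()));
    routingService.bulkIndexWithRouting("users", batch);
}
```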

### 3. 监控要点

  • 集群健康: 定期检查集群状态(Actuator健康检查示例见下方代码)
  • 分片分布: 监控分片在节点间的分布
  • 性能指标: 关注索引和查询性能
  • 资源使用: 监控CPU、内存、磁盘使用情况
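
除了上报到Prometheus,也可以把集群健康直接暴露为Spring Boot Actuator的健康检查。下面是一个示意实现(类名与降级状态码均为示例):

```java
// 示意: 集群非green时将应用健康状态标记为降级或不可用
@Component
public class ElasticsearchClusterHealthIndicator implements HealthIndicator {

    @Autowired
    private ElasticsearchClient elasticsearchClient;

    @Override
    public Health health() {
        try {
            HealthResponse response = elasticsearchClient.cluster()
                .health(HealthRequest.of(b -> b));
            String status = response.status().jsonValue();

            Health.Builder builder;
            if ("green".equals(status)) {
                builder = Health.up();
            } else if ("yellow".equals(status)) {
                builder = Health.status("DEGRADED"); // 自定义状态码, 仅作示例
            } else {
                builder = Health.down();
            }
            return builder
                .withDetail("clusterStatus", status)
                .withDetail("unassignedShards", response.unassignedShards())
                .build();
        } catch (Exception e) {
            return Health.down(e).build();
        }
    }
}
```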

### 4. 故障处理

  • 自动恢复: 配置自动故障转移
  • 备份策略: 定期备份重要数据(快照备份示例见下方代码)
  • 容量规划: 预留足够的存储空间
  • 版本升级: 制定滚动升级策略
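
备份方面,可以结合快照(snapshot)API做定期备份。下面是一个最小示意,假设集群中已经注册了名为 my_backup 的快照仓库(仓库注册通常通过 PUT _snapshot/my_backup 完成),方法与调用位置均为示例:

```java
// 最小示意: 为指定索引创建一个按日期命名的快照 (假设快照仓库 my_backup 已注册)
public void backupIndices(ElasticsearchClient client, List<String> indices) throws IOException {
    String snapshotName = "snapshot-" + LocalDate.now();
    
    CreateSnapshotRequest request = CreateSnapshotRequest.of(builder ->
        builder.repository("my_backup")   // 已注册的快照仓库名(示例)
            .snapshot(snapshotName)
            .indices(indices)
            .waitForCompletion(false)     // 异步执行, 由集群在后台完成快照
    );
    
    client.snapshot().create(request);
}
```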

## 总结

Elasticsearch分片技术是构建高性能、高可用搜索系统的关键。通过合理的分片设计、路由策略和监控机制,可以实现:

  1. 水平扩展: 支持PB级数据存储和查询
  2. 高可用性: 通过副本机制保证服务连续性
  3. 负载均衡: 分片分布实现查询负载分散
  4. 性能优化: 并行处理提升查询效率

在实际应用中,需要根据业务特点调整分片策略,并建立完善的监控和运维体系。