一 数据聚合

1. 聚合的种类

聚合(aggregations)可以实现对文档数据的统计分析运算。聚合常见的有三类:

  • 桶(Bucket)集合:用来对文档做分组
    • TermAggregation:按照文档字段值分组
    • Date Histogram:按照日期阶梯分组,例如一周一组,一月一组
  • 度量(Metric)聚合:用以计算一些值,比如:最大值最小值平均值
    • Avg:求平均值
    • Max:求最大值
    • Min:求最小值
    • Stats:同时求maxminavgsum
  • 管道( pipeline)聚合:其它聚合的结果为基础做聚合

2. DSL实现聚合

DSL实现Bucket聚合

现在,我们要统计所有数据中的酒店品牌有几种,此时可以根据酒店品牌的名称做聚合。

1
2
3
4
5
6
7
8
9
10
11
12
GET /hotel/_search
{
"size": 0, # 设置size为0,结果中不包含文档,只包含聚合结果
"aggs": { # 定义聚合
"brandAgg": { # 聚合起个名字
"terms": { # 聚合的类型,按照品牌值聚合,所以选择term
"field": "brand", # 参与聚合的字段
"size": 20 # 希望获取的聚合结果数量
}
}
}
}

默认情况下,Bucket聚合会统计Bucket内的文档数量,记为_count,并且按照_count降序排序。我们可以修改结果排序方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 聚合功能,自定义排序规则
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20,
"order": {
"_count": "asc"
}
}
}
}
}

默认情况下,Bucket聚合是对索引库的所有文档做聚合,我们可以限定要聚合的文档范围,只要添加query条件即可;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 聚合功能,自定义排序规则
GET /hotel/_search
{
"query": {
"range": {
"price": {
"lte": 200
}
}
},
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20,
"order": {
"_count": "asc"
}
}
}
}
}

DSL实现Metrics聚合

例如,我们要求获取每个品牌的用户评分的minmaxavg等值.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 聚合功能,自定义排序规则
GET /hotel/_search
{
"query": {
"range": {
"price": {
"lte": 200
}
}
},
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20,
"order": {
"scoreAgg.avg": "desc"
}
},
"aggs": { # 是brands聚合的子聚合,也就是分组后对每组分别计算
"scoreAgg": { # 聚合名称
"stats": { # 聚合类型,这里stats可以计算min、max、avg等
"field": "score" # 聚合字段,这里是score
}
}
}
}
}
}

3. RestAPI实现聚合

我们以品牌聚合为例,演示下JavaRestClient使用,先看请求组装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package cn.itcast.hotel;

import cn.itcast.hotel.pojo.HotelDoc;
import com.alibaba.fastjson.JSON;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.util.CollectionUtils;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class HotelSearchTest {
private RestHighLevelClient client;

@Test
void testAggregation() throws IOException {
// 1.准备 request
SearchRequest request = new SearchRequest("hotel");
// 2. 准备DSL
// 2.1 设置size
request.source().size(0);
// 2.2 设置聚合
request.source().aggregation(AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(20)
);
// 3.发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4. 解析结果
System.out.println(response);
// 解析聚合结果
Aggregations aggregations = response.getAggregations();
// 4.1 根据名称获取聚合结果
Terms brandTerms = aggregations.get("brandAgg");
// 4.2 获取buckets
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
// 4.3 遍历
for (Terms.Bucket bucket : buckets) {
// 4.4 获取key
String key = bucket.getKeyAsString();
System.out.println(key);
}

}

@BeforeEach
void setUp() {
this.client = new RestHighLevelClient(RestClient.builder(
HttpHost.create("http://192.168.127.132:9200")
// 集群可写多个
// HttpHost.create("http://192.168.127.131:9200"),
// HttpHost.create("http://192.168.127.131:9200")
));
}

@AfterEach
void tearDown() throws IOException {
this.client.close();
}
}

1. 案例 在IUserService中定义方法,实现对品牌、成熟、星级的聚合

需求: 搜索页面的品牌、城市等信息不应该是在页面写死,而是通过聚合索引库中的酒店数据得来的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package cn.itcast.hotel.service;

import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import com.baomidou.mybatisplus.extension.service.IService;

import java.util.List;
import java.util.Map;

public interface IHotelService extends IService<Hotel> {
PageResult search(RequestParams params);

Map<String, List<String>> filters();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package cn.itcast.hotel.service.impl;

import cn.itcast.hotel.mapper.HotelMapper;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.HotelDoc;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import cn.itcast.hotel.service.IHotelService;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
@Autowired
private RestHighLevelClient client;


@Override
public Map<String, List<String>> filters() {
try {
// 1.准备 request
SearchRequest request = new SearchRequest("hotel");
// 2. 准备DSL
// 2.1 设置size
request.source().size(0);
buildAggregation(request);
// 3.发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4. 解析结果
// 解析聚合结果
Map<String, List<String>> result = new HashMap<>();
Aggregations aggregations = response.getAggregations();
// 4.1 根据名称,获取品牌结果
List<String> brandList = getAggByName(aggregations, "brandAgg");
// 4.4 放入map
result.put("品牌", brandList);
// 4.1 根据名称,获取品牌结果
List<String> cityList = getAggByName(aggregations, "cityAgg");
// 4.4 放入map
result.put("城市", cityList);
// 4.1 根据名称,获取品牌结果
List<String> starList = getAggByName(aggregations, "starAgg");
// 4.4 放入map
result.put("星级", starList);
return result;
} catch (IOException e) {
throw new RuntimeException();
}
}

private List<String> getAggByName(Aggregations aggregations, String aggName) {
// 4.1 根据名称获取聚合结果
Terms brandTerms = aggregations.get("aggName");
// 4.2 获取buckets
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
// 4.3 遍历
List<String> brandList = new ArrayList<>();
for (Terms.Bucket bucket : buckets) {
// 4.4 获取key
String key = bucket.getKeyAsString();
brandList.add(key);
}
return brandList;
}

private void buildAggregation(SearchRequest request) {
// 2.2 设置聚合
request.source().aggregation(AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(100)
);
request.source().aggregation(AggregationBuilders
.terms("cityAgg")
.field("city")
.size(100)
);
request.source().aggregation(AggregationBuilders
.terms("starAgg")
.field("starName")
.size(100)
);
}

private void buildBasicQuery(RequestParams params, SearchRequest request) {
// 构建BooleanQuery
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
// 关键字搜索
String key = params.getKey();
if (key == null || "".equals(key)){
boolQuery.must(QueryBuilders.matchAllQuery());
}else {
boolQuery.must(QueryBuilders.matchQuery("all", key));
}
// 条件过滤
// 城市条件
if (params.getCity() != null && !params.getCity().equals("")){
boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));
}
// 品牌条件
if (params.getBrand() != null && !params.getBrand().equals("")){
boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));
}
// 星级条件
if (params.getStarName() != null && !params.getStarName().equals("")){
boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));
}
// 价格条件
if (params.getMinPrice() != null && params.getMaxPrice() != null){
boolQuery.filter(QueryBuilders
.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));
}
// 2. 算分控制
FunctionScoreQueryBuilder functionScoreQueryBuilder =
QueryBuilders.functionScoreQuery(
// 原生查询 相关性算分
boolQuery,
// function score 数组
new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
// 其中一个function score
new FunctionScoreQueryBuilder.FilterFunctionBuilder(
// 过滤条件
QueryBuilders.termQuery("AD", true),
// 算分函数
ScoreFunctionBuilders.weightFactorFunction(10)
)
});
request.source().query(functionScoreQueryBuilder);
}

private PageResult handleResponse(SearchHits searchHits) {
// 4.1获取总条数
long total = searchHits.getTotalHits().value;
// 4.2 文档数组
SearchHit[] hits = searchHits.getHits();
// 4.3 遍历
List<HotelDoc> hotels = new ArrayList<>();
for (SearchHit hit : hits) {
String json = hit.getSourceAsString();
// 反序列化
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
// 获取排序值
Object[] sortValues = hit.getSortValues();
if (sortValues.length > 0){
Object sortValue = sortValues[0];
hotelDoc.setDistance(sortValue);
}
hotels.add(hotelDoc);
}
return new PageResult(total, hotels);
}
}

2.对接前端接口

前端页面会向服务端发起请求,查询品牌、城市、星级等字段的聚合结果:

可以知道请求参数与之前search时的RequestParam完全一致,这是在限定聚合时的文档范围。

例如:用户搜索“外滩”,价格在300~600,那聚合必须是在这个搜索条件基础上完成。

因此我们需要:

  • 1.编写controller接口,接收该请求
  • 2.修改IUserService#getFilters()方法,添加RequestParam参数
  • 3.修改getFilters方法的业务,聚合时添加query条件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package cn.itcast.hotel.web;

import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/hotel")
public class HotelController {

@Autowired
private IHotelService hotelService;
@PostMapping("/list")
public PageResult search(@RequestBody RequestParams params){
return hotelService.search(params);

}

@PostMapping("/filters")
public Map<String, List<String>> getFilters(@RequestBody RequestParams params){
return hotelService.filters(params);
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package cn.itcast.hotel.service.impl;

import cn.itcast.hotel.mapper.HotelMapper;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.HotelDoc;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import cn.itcast.hotel.service.IHotelService;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
@Autowired
private RestHighLevelClient client;


@Override
public Map<String, List<String>> filters(RequestParams params) {
try {
// 1.准备 request
SearchRequest request = new SearchRequest("hotel");
// 2. 准备DSL
// 2.1 query
buildBasicQuery(params, request);
// 2.2 设置size
request.source().size(0);
// 2.3 聚合
buildAggregation(request);
// 3.发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4. 解析结果
// 解析聚合结果
Map<String, List<String>> result = new HashMap<>();
Aggregations aggregations = response.getAggregations();
// 4.1 根据名称,获取品牌结果
List<String> brandList = getAggByName(aggregations, "brandAgg");
// 4.4 放入map
result.put("品牌", brandList);
// 4.1 根据名称,获取品牌结果
List<String> cityList = getAggByName(aggregations, "cityAgg");
// 4.4 放入map
result.put("城市", cityList);
// 4.1 根据名称,获取品牌结果
List<String> starList = getAggByName(aggregations, "starAgg");
// 4.4 放入map
result.put("星级", starList);
return result;
} catch (IOException e) {
throw new RuntimeException();
}
}

private List<String> getAggByName(Aggregations aggregations, String aggName) {
// 4.1 根据名称获取聚合结果
Terms brandTerms = aggregations.get(aggName);
// 4.2 获取buckets
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
// 4.3 遍历
List<String> brandList = new ArrayList<>();
for (Terms.Bucket bucket : buckets) {
// 4.4 获取key
String key = bucket.getKeyAsString();
brandList.add(key);
}
return brandList;
}

private void buildAggregation(SearchRequest request) {
// 2.2 设置聚合
request.source().aggregation(AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(100)
);
request.source().aggregation(AggregationBuilders
.terms("cityAgg")
.field("city")
.size(100)
);
request.source().aggregation(AggregationBuilders
.terms("starAgg")
.field("starName")
.size(100)
);
}

private void buildBasicQuery(RequestParams params, SearchRequest request) {
// 构建BooleanQuery
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
// 关键字搜索
String key = params.getKey();
if (key == null || "".equals(key)){
boolQuery.must(QueryBuilders.matchAllQuery());
}else {
boolQuery.must(QueryBuilders.matchQuery("all", key));
}
// 条件过滤
// 城市条件
if (params.getCity() != null && !params.getCity().equals("")){
boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));
}
// 品牌条件
if (params.getBrand() != null && !params.getBrand().equals("")){
boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));
}
// 星级条件
if (params.getStarName() != null && !params.getStarName().equals("")){
boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));
}
// 价格条件
if (params.getMinPrice() != null && params.getMaxPrice() != null){
boolQuery.filter(QueryBuilders
.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));
}
// 2. 算分控制
FunctionScoreQueryBuilder functionScoreQueryBuilder =
QueryBuilders.functionScoreQuery(
// 原生查询 相关性算分
boolQuery,
// function score 数组
new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
// 其中一个function score
new FunctionScoreQueryBuilder.FilterFunctionBuilder(
// 过滤条件
QueryBuilders.termQuery("AD", true),
// 算分函数
ScoreFunctionBuilders.weightFactorFunction(10)
)
});
request.source().query(functionScoreQueryBuilder);
}

private PageResult handleResponse(SearchHits searchHits) {
// 4.1获取总条数
long total = searchHits.getTotalHits().value;
// 4.2 文档数组
SearchHit[] hits = searchHits.getHits();
// 4.3 遍历
List<HotelDoc> hotels = new ArrayList<>();
for (SearchHit hit : hits) {
String json = hit.getSourceAsString();
// 反序列化
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
// 获取排序值
Object[] sortValues = hit.getSortValues();
if (sortValues.length > 0){
Object sortValue = sortValues[0];
hotelDoc.setDistance(sortValue);
}
hotels.add(hotelDoc);
}
return new PageResult(total, hotels);
}
}

二 自动补全

1. 安装拼音分词器

要实现根据字母做补全,就必须对文档按照拼音分词。在GitHub上恰好有elasticsearch的拼音分词插件。下载地址

安装方式与IK分词器一样,分三步:

  • 1.解压
  • 2.上传到虚拟机中,elasticsearch的plugin目录
  • 3.重启elasticseasch
  • 4.测试
1
2
3
4
5
6
7
GET /_analyze
{
"text": ["如家酒店还不错"],
"analyzer": "pinyin"
}


2.配置拼音分词器

elasticsearch分词器(analyzer)的组成包含三部分:

  • character filters:在tokenizer之前对文本进行处理。例如删除字符、替换字符
  • tokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分词;还有ik_smart
  • tokenizer filter:将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等

自定义分词器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
GET /test/_analyze
{
"text": ["如家酒店还不错"],
"analyzer": "my_analyzer"
}

DELETE /test

# 自定义分词器
PUT /test
{
"settings": {
"analysis": {
"analyzer": {
"my_analyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "my_analyzer"
}
}
}
}

POST /test/_doc/1
{
"id": 1,
"name": "狮子"
}
POST /test/_doc/2
{
"id": 2,
"name": "虱子"
}
POST /test/_search
{
"query": {
"match": {
"name": "掉入狮子笼咋办"
}
}
}

拼音分词器适合在创建倒排索引的时候使用,但不能在搜索的时候使用。

分词器的问题
因此字段在创建倒排索引时应该用my_analyzer分词器;字段在搜索时应该使用ik smart分词器;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
DELETE /test

# 自定义分词器
PUT /test
{
"settings": {
"analysis": {
"analyzer": {
"my_analyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "my_analyzer",
"search_analyzer": "ik_smart"
}
}
}
}

POST /test/_doc/1
{
"id": 1,
"name": "狮子"
}
POST /test/_doc/2
{
"id": 2,
"name": "虱子"
}
POST /test/_search
{
"query": {
"match": {
"name": "掉入狮子笼咋办"
}
}
}

3. completion suggest 查询

elasticsearch提供了Completion Suggester查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为提高补全查询的效率,对于文档中字段的类型有一些约束:

  • 参与补全查询的字段必须是completion类型
  • 字段的内容一般是用来不全的多个词条形成的数组

查询语法

查询语法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 自动补全库
PUT /test2
{
"mappings": {
"properties": {
"title": {
"type": "completion"
}
}
}
}

# 示例数据
POST /test2/_doc
{
"title": ["Sony", "WH-1000XM3"]
}
POST /test2/_doc
{
"title": ["SK-II", "PITERA"]
}
POST /test2/_doc
{
"title": ["Nintendo", "switch"]
}

# 自动补全查询
GET /test2/_search
{
"suggest": {
"titleSuggest": {
"text": "s", # 关键字
"completion": {
"field": "title", # 补全字段
"skip_duplicates": true, # 跳过重复的
"size": 10 # 获取前10条结果
}
}
}
}

自动补全案例:实现hotel索引库的自动补全、拼音搜索功能

实现思路如下:

  • 1.修改hotel索引库结构,设置自定义拼音分词器
  • 2.修改索引库的nameall字段,使用自定义分词器
  • 3.索引库添加一个新字段suggestion,类型为completion类型,使用自定义的分词器
  • 4.给HotelDoc类添加suggestion字段,内容包含brandbusiness
  • 5.重新导入数据到hotel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
DELETE /hotel

# 查看酒店数据结构
PUT /hotel
{
"settings": {
"analysis": {
"analyzer": {
"text_analyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
},
"completion_analyzer": {
"tokenizer": "keyword",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "text_analyzer",
"search_analyzer": "ik_smart",
"copy_to": "all"
},
"address": {
"type": "keyword",
"index": false
},
"brand": {
"type": "keyword",
"copy_to": [
"all"
]
},
"business": {
"type": "keyword",
"copy_to": [
"all"
]
},
"city": {
"type": "keyword"
},
"id": {
"type": "keyword"
},
"location": {
"type": "geo_point"
},
"pic": {
"type": "keyword",
"index": false
},
"price": {
"type": "integer"
},
"score": {
"type": "integer"
},
"starName": {
"type": "keyword"
},
"suggestion": {
"type": "completion",
"analyzer": "completion_analyzer"
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package cn.itcast.hotel.pojo;

import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

@Data
@NoArgsConstructor
public class HotelDoc {
private Long id;
private String name;
private String address;
private Integer price;
private Integer score;
private String brand;
private String city;
private String starName;
private String business;
private String location;
private String pic;
private Object distance;
private Boolean isAD;
private List<String> suggestion;

public HotelDoc(Hotel hotel) {
this.id = hotel.getId();
this.name = hotel.getName();
this.address = hotel.getAddress();
this.price = hotel.getPrice();
this.score = hotel.getScore();
this.brand = hotel.getBrand();
this.city = hotel.getCity();
this.starName = hotel.getStarName();
this.business = hotel.getBusiness();
this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
this.pic = hotel.getPic();
if (this.business.equals("/")){
// business 有多个值,需要切割
String[] arr = this.business.split("/");
// 添加元素
this.suggestion = new ArrayList<>();
this.suggestion.add(this.brand);
Collections.addAll(this.suggestion, arr);
}else {
this.suggestion = Arrays.asList(this.brand, this.business);
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
GET /hotel/_search 
{
"suggest": {
"suggestion": {
"text": "sd",
"completion": {
"field": "suggestion",
"skip_duplicates": true,
"size": 10
}
}
}
}

RestAPI实现自动补全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package cn.itcast.hotel;

import cn.itcast.hotel.pojo.HotelDoc;
import com.alibaba.fastjson.JSON;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.util.CollectionUtils;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class HotelSearchTest {
private RestHighLevelClient client;

@Test
void testSuggest() throws IOException {
// 1. 准备Request
SearchRequest request = new SearchRequest("hotel");
// 2. 准备DSL
request.source().suggest(new SuggestBuilder().addSuggestion(
"suggestions",
SuggestBuilders.completionSuggestion("suggestion")
.prefix("h")
.skipDuplicates(true)
.size(10)
));
// 3. 发起请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4. 解析结果
Suggest suggest = response.getSuggest();
// 根据补全查询名称,获取补全结果
CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
// 4.2 获取options
List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
// 4.3 遍历
for (CompletionSuggestion.Entry.Option option : options) {
String text = option.getText().toString();
System.out.println(text);
}
}

@BeforeEach
void setUp() {
this.client = new RestHighLevelClient(RestClient.builder(
HttpHost.create("http://192.168.127.132:9200")
// 集群可写多个
// HttpHost.create("http://192.168.127.131:9200"),
// HttpHost.create("http://192.168.127.131:9200")
));
}

@AfterEach
void tearDown() throws IOException {
this.client.close();
}
}

三 数据同步

1. 数据同步问题分析

elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,elasticsearch也必须跟着改变,这个就是elasticsearchmysql之间的数据同步。

在微服务中,负责酒店管理(操作mysql )的业务与负责酒店搜索(操作elasticsearch )的业务可能在两个不同的微服务上,数据同步该如何实现呢?

方案一 同步调用

数据同步_同步调用

方案二 异步通知

数据同步_异步调用

方案三 监听binlog

数据同步_监听binlog

2. 案例:利用MQ实现mysql与elasticsearch数据同步

当酒店数据发生增、删、改时,要求对elasticsearch中数据也要完成相同操作。

步骤:

  • 导入课前资料提供的hotel-admin项目,启动并测试酒店数据的CRUD
  • 声明exchangequeueRoutingKey
  • 在hotel-admin中的增、删、改业务中完成消息发送
  • 在hotel-demo中完成消息监听,并更新elasticsearch中数据·
  • 启动并测试数据同步功能

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
server:
port: 8099
spring:
datasource:
url: jdbc:mysql://localhost:3306/heima?useSSL=false
username: root
password: 123456
driver-class-name: com.mysql.jdbc.Driver
rabbitmq:
host: 192.168.127.132
port: 5672
username: itcast
password: 123321
virtual-host: /
logging:
level:
cn.itcast: debug
pattern:
dateformat: MM-dd HH:mm:ss:SSS
mybatis-plus:
configuration:
map-underscore-to-camel-case: true
type-aliases-package: cn.itcast.hotel.pojo

定义MQ的常量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package cn.itcast.hotel;

public class MqConsistants {
/**
* 交换机
*/
private final static String HOTEL_EXCHANGE = "hotel.topic";
/**
* 监听新增和修改的队列
*/
private final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
/**
* 监听删除的队列
*/
private final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
/**
* 新增或修改的 RoutingKey
*/
private final static String HOTEL_INSERT_KEY = "hotel.insert";
/**
* 新增或修改的 RoutingKey
*/
private final static String HOTEL_DELETE_KEY = "hotel.delete";


}

进行队列相关配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package cn.itcast.hotel.config;

import cn.itcast.hotel.MqConsistants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(MqConsistants.HOTEL_EXCHANGE, true, false);
}
@Bean
public Queue insertQueue() {
return new Queue(MqConsistants.HOTEL_INSERT_QUEUE, true);
}
@Bean
public Queue deleteQueue() {
return new Queue(MqConsistants.HOTEL_DELETE_QUEUE, true);
}
@Bean
public Binding insertQueueBinding(){
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConsistants.HOTEL_INSERT_KEY);
}
@Bean
public Binding deleteQueueBinding(){
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConsistants.HOTEL_DELETE_KEY);
}
}

控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package cn.itcast.hotel.web;

import cn.itcast.hotel.constants.MqConsistants;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.service.IHotelService;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.security.InvalidParameterException;

@RestController
@RequestMapping("hotel")
public class HotelController {

@Autowired
private IHotelService hotelService;

@Autowired
private RabbitTemplate rabbitTemplate;

@GetMapping("/{id}")
public Hotel queryById(@PathVariable("id") Long id){
return hotelService.getById(id);
}

@GetMapping("/list")
public PageResult hotelList(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "size", defaultValue = "1") Integer size
){
Page<Hotel> result = hotelService.page(new Page<>(page, size));

return new PageResult(result.getTotal(), result.getRecords());
}

@PostMapping
public void saveHotel(@RequestBody Hotel hotel){
hotelService.save(hotel);
rabbitTemplate.convertAndSend(MqConsistants.HOTEL_EXCHANGE, MqConsistants.HOTEL_INSERT_KEY, hotel.getId());
}

@PutMapping()
public void updateById(@RequestBody Hotel hotel){
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能为空");
}
hotelService.updateById(hotel);
rabbitTemplate.convertAndSend(MqConsistants.HOTEL_EXCHANGE, MqConsistants.HOTEL_INSERT_KEY, hotel.getId());

}

@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
rabbitTemplate.convertAndSend(MqConsistants.HOTEL_EXCHANGE, MqConsistants.HOTEL_DELETE_KEY, id);

}
}

监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package cn.itcast.hotel.mq;

import cn.itcast.hotel.constants.MqConsistants;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HotelListener {
@Autowired
private IHotelService hotelService;
/**
* 监听酒店新增或修改的业务
* @param id
*/
@RabbitListener(queues = MqConsistants.HOTEL_INSERT_QUEUE)
public void listenHotelInsertOrUpdate(Long id){
hotelService.insertById(id);
}

/**
* 监听酒店删除的业务
* @param id
*/
@RabbitListener(queues = MqConsistants.HOTEL_DELETE_QUEUE)
public void listenHotelDelete(Long id){
hotelService.deleteById(id);
}
}


消费者端三层结构处理(首先需要配置AMQP)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package cn.itcast.hotel.service;

import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import com.baomidou.mybatisplus.extension.service.IService;

import java.util.List;
import java.util.Map;

public interface IHotelService extends IService<Hotel> {
PageResult search(RequestParams params);

Map<String, List<String>> filters(RequestParams params);

List<String> getSuggestions(String prefix);

void insertById(Long id);

void deleteById(Long id);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package cn.itcast.hotel.web;

import cn.itcast.hotel.constants.MqConsistants;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.service.IHotelService;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.security.InvalidParameterException;

@RestController
@RequestMapping("hotel")
public class HotelController {

@Autowired
private IHotelService hotelService;

@Autowired
private RabbitTemplate rabbitTemplate;

@GetMapping("/{id}")
public Hotel queryById(@PathVariable("id") Long id){
return hotelService.getById(id);
}

@GetMapping("/list")
public PageResult hotelList(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "size", defaultValue = "1") Integer size
){
Page<Hotel> result = hotelService.page(new Page<>(page, size));

return new PageResult(result.getTotal(), result.getRecords());
}

@PostMapping
public void saveHotel(@RequestBody Hotel hotel){
hotelService.save(hotel);
rabbitTemplate.convertAndSend(MqConsistants.HOTEL_EXCHANGE, MqConsistants.HOTEL_INSERT_KEY, hotel.getId());
}

@PutMapping()
public void updateById(@RequestBody Hotel hotel){
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能为空");
}
hotelService.updateById(hotel);
rabbitTemplate.convertAndSend(MqConsistants.HOTEL_EXCHANGE, MqConsistants.HOTEL_INSERT_KEY, hotel.getId());

}

@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
rabbitTemplate.convertAndSend(MqConsistants.HOTEL_EXCHANGE, MqConsistants.HOTEL_DELETE_KEY, id);

}
}

四 elasticsearch集群

1.搭建ES集群

单机的elasticsearch做数据存储,必然面临两个问题:海量数据存储问题、单点故障问题。

  • 海量数据存储问题:将索引库从逻辑上拆分为N个分片(shard),存储到多个节点
  • 单点故障问题:将分片数据在不同节点备份( replica )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# dockerCompose
version: '2.2'
services:
es01:
image: elasticsearch:7.12.1
container_name: es01
environment:
- node.name=es01
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es02,es03
- cluster.initial_master_nodes=es01,es02,es03
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- data01:/usr/share/elasticsearch/data
ports:
- 9200:9200
networks:
- elastic
es02:
image: elasticsearch:7.12.1
container_name: es02
environment:
- node.name=es02
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es01,es03
- cluster.initial_master_nodes=es01,es02,es03
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- data02:/usr/share/elasticsearch/data
ports:
- 9201:9200
networks:
- elastic
es03:
image: elasticsearch:7.12.1
container_name: es03
environment:
- node.name=es03
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es01,es02
- cluster.initial_master_nodes=es01,es02,es03
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- data03:/usr/share/elasticsearch/data
networks:
- elastic
ports:
- 9202:9200
volumes:
data01:
driver: local
data02:
driver: local
data03:
driver: local

networks:
elastic:
driver: bridge

es运行需要修改一些Linux系统权限,修改/etc/sysctl.config文件

1
vi /etc/sysctl.conf

添加以下内容

1
vm.max_map_count=262144

然后执行命令,让配置生效:

1
sysctl -p

2.集群状态监控

kibana可以监控es集群,不过新版本需要依赖esx-pack 功能,配置比较复杂。

这里推荐使用cerebro来监控es集群状态,官方网址:https://github.com/lmenezes/cerebro

双击其中的cerebro.bat文件即可启动服务。

访问http://localhost:9000 即可进入管理界面

输入你的elasticsearch的任意节点的地址和端口

绿色的条,代表集群处于绿色(健康状态)。

3.索引库分片

在DevTools中输入指令:

1
2
3
4
5
6
7
8
9
10
11
12
PUT /itcast
{
"settings": {
"number_of_shards": 3, // 分片数量
"number_of_replicas": 1 // 副本数量
},
"mappings": {
"properties": {
// mapping映射定义 ...
}
}
}

4.ES集群的节点角色

elasticsearch中集群节点有不同的职责划分:

节点类型 参数配置 默认值 节点职责
master eligible node.master true 备选主节点:主节点可以管理和记录集群状态、决定分片在哪个节点、处理创建和删除索引库的请求
data node.data true 数据节点:存储数据、搜索、聚合、CRUD
ingest node.ingest true 数据存储之前的预处理
coordinating 上面3个参数都为false则为coordinating节点 路由请求到其它节点合并其它节点处理的结果,返回给用户

ES集群的分布式查询

elasticsearch中的每个节点角色都有自己不同的职责,因此建议集群部署时,每个节点都有独立的角色。
集群的分布式查询

ES集群的脑裂

默认情况下,每个节点都是master eligible节点,因此一旦master节点宕机,其它候选节点会选举一个成为主节点。当主节点与其他节点网络故障时,可能发生脑裂问题。

为了避免脑裂,需要要求选票超过( eligible节点数量+1)/2才能当选为主,因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题

5.ES集群的分布式新增和查询

ES集群的分布式存储

当新增文档时,应该保存到不同分片,保证数据均衡,那么coordinating node如何确定数据该存储到哪个分片呢?

elasticsearch会通过hash算法来计算文档应该存储到哪个分片:

shard = hash(_routing)% number_of_shards

说明:

  • _routing默认是文档的id
  • 算法与分片数量有关,因此索引库一旦创建,分片数量不能修改!

ES集群的分布式查询

elasticsearch的查询分成两个阶段:

  • scatter phase:分散阶段,coordinating node会把请求分发到每一个分片
  • gather phase:聚集阶段,coordinating node汇总data node的搜索结果,并处理为最终结果集返回给用户

6.ES集群的故障转移

集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移。
集群的故障转移