I. Data Aggregation
1. Types of Aggregations
Aggregations can be used to compute statistics, analyses, and calculations over document data. There are three common categories:
- Bucket aggregations: group documents into buckets
  - TermAggregation: group by a field value
  - Date Histogram: group by date intervals, e.g. one bucket per week or per month
- Metric aggregations: compute values such as maximum, minimum, and average
  - Avg: average
  - Max: maximum
  - Min: minimum
  - Stats: max, min, avg, sum, etc. computed together
- Pipeline aggregations: aggregations computed on the results of other aggregations
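For example, a pipeline aggregation can aggregate over the per-bucket results of another aggregation. A sketch against the hotel index used throughout this section (the `brand` and `price` field names are taken from the later examples): compute each brand's average price with a nested metric aggregation, then take the minimum of those averages with the `min_bucket` pipeline aggregation:

```json
GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": { "field": "brand", "size": 20 },
      "aggs": {
        "avgPrice": { "avg": { "field": "price" } }
      }
    },
    "minAvgPrice": {
      "min_bucket": {
        "buckets_path": "brandAgg>avgPrice"
      }
    }
  }
}
```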
2. Implementing Aggregations in DSL
Bucket aggregations in DSL
Suppose we want to count how many hotel brands appear in the data. We can aggregate on the hotel brand name:
```json
GET /hotel/_search
{
  "size": 0,  # size 0: the result contains no documents, only aggregation results
  "aggs": {   # define the aggregations
    "brandAgg": {  # give the aggregation a name
      "terms": {   # aggregation type; use terms to group by brand value
        "field": "brand",  # the field to aggregate on
        "size": 20         # how many aggregation results to return
      }
    }
  }
}
```
By default, a bucket aggregation counts the documents in each bucket, records the count as _count, and sorts the buckets by _count in descending order. We can change the sort order:
```json
# Aggregation with a custom sort order
GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20,
        "order": {
          "_count": "asc"
        }
      }
    }
  }
}
```
By default, a bucket aggregation covers all documents in the index. We can limit the scope of the documents being aggregated simply by adding a query condition:
```json
# Aggregation scoped to documents matching a query
GET /hotel/_search
{
  "query": {
    "range": {
      "price": {
        "lte": 200
      }
    }
  },
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20,
        "order": {
          "_count": "asc"
        }
      }
    }
  }
}
```
Metric aggregations in DSL
For example, suppose we want the min, max, avg, and other statistics of the user score for each brand:
```json
# Nested metric aggregation; brands sorted by average score
GET /hotel/_search
{
  "query": {
    "range": {
      "price": {
        "lte": 200
      }
    }
  },
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20,
        "order": {
          "scoreAgg.avg": "desc"
        }
      },
      "aggs": {           # sub-aggregation of brandAgg, computed per bucket
        "scoreAgg": {     # aggregation name
          "stats": {      # stats computes min, max, avg, sum, etc.
            "field": "score"  # the field to aggregate, here score
          }
        }
      }
    }
  }
}
```
3. Implementing Aggregations with the RestAPI
Using the brand aggregation as an example, here is how to do it with the Java RestClient. First, building the request:
```java
package cn.itcast.hotel;

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.List;

public class HotelSearchTest {
    private RestHighLevelClient client;

    @Test
    void testAggregation() throws IOException {
        // 1. Prepare the request
        SearchRequest request = new SearchRequest("hotel");
        // size 0: no documents in the result, only aggregation results
        request.source().size(0);
        request.source().aggregation(AggregationBuilders
                .terms("brandAgg")
                .field("brand")
                .size(20)
        );
        // 2. Send the request
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        System.out.println(response);
        // 3. Parse the aggregation results by the name given above
        Aggregations aggregations = response.getAggregations();
        Terms brandTerms = aggregations.get("brandAgg");
        List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
        for (Terms.Bucket bucket : buckets) {
            String key = bucket.getKeyAsString();
            System.out.println(key);
        }
    }

    @BeforeEach
    void setUp() {
        this.client = new RestHighLevelClient(RestClient.builder(
                HttpHost.create("http://192.168.127.132:9200")
        ));
    }

    @AfterEach
    void tearDown() throws IOException {
        this.client.close();
    }
}
```
1. Case: define a method in IHotelService that aggregates brands, cities, and star ratings
Requirement: the brand, city, and other filters on the search page should not be hard-coded in the page; they should come from aggregating the hotel data in the index:
```java
package cn.itcast.hotel.service;

import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import com.baomidou.mybatisplus.extension.service.IService;

import java.util.List;
import java.util.Map;

public interface IHotelService extends IService<Hotel> {
    PageResult search(RequestParams params);

    Map<String, List<String>> filters();
}
```
```java
package cn.itcast.hotel.service.impl;

import cn.itcast.hotel.mapper.HotelMapper;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.HotelDoc;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import cn.itcast.hotel.service.IHotelService;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
    @Autowired
    private RestHighLevelClient client;

    @Override
    public Map<String, List<String>> filters() {
        try {
            SearchRequest request = new SearchRequest("hotel");
            // size 0: aggregation results only
            request.source().size(0);
            buildAggregation(request);
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            Map<String, List<String>> result = new HashMap<>();
            Aggregations aggregations = response.getAggregations();
            List<String> brandList = getAggByName(aggregations, "brandAgg");
            result.put("品牌", brandList);
            List<String> cityList = getAggByName(aggregations, "cityAgg");
            result.put("城市", cityList);
            List<String> starList = getAggByName(aggregations, "starAgg");
            result.put("星级", starList);
            return result;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private List<String> getAggByName(Aggregations aggregations, String aggName) {
        // look the aggregation up by its name
        // (fixed bug: the original passed the literal string "aggName")
        Terms brandTerms = aggregations.get(aggName);
        List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
        List<String> brandList = new ArrayList<>();
        for (Terms.Bucket bucket : buckets) {
            String key = bucket.getKeyAsString();
            brandList.add(key);
        }
        return brandList;
    }

    private void buildAggregation(SearchRequest request) {
        request.source().aggregation(AggregationBuilders
                .terms("brandAgg")
                .field("brand")
                .size(100)
        );
        request.source().aggregation(AggregationBuilders
                .terms("cityAgg")
                .field("city")
                .size(100)
        );
        request.source().aggregation(AggregationBuilders
                .terms("starAgg")
                .field("starName")
                .size(100)
        );
    }

    private void buildBasicQuery(RequestParams params, SearchRequest request) {
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        String key = params.getKey();
        if (key == null || "".equals(key)) {
            boolQuery.must(QueryBuilders.matchAllQuery());
        } else {
            boolQuery.must(QueryBuilders.matchQuery("all", key));
        }
        if (params.getCity() != null && !params.getCity().equals("")) {
            boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));
        }
        if (params.getBrand() != null && !params.getBrand().equals("")) {
            boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));
        }
        if (params.getStarName() != null && !params.getStarName().equals("")) {
            boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));
        }
        if (params.getMinPrice() != null && params.getMaxPrice() != null) {
            boolQuery.filter(QueryBuilders
                    .rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));
        }
        // boost advertised hotels; the HotelDoc field is isAD
        FunctionScoreQueryBuilder functionScoreQueryBuilder = QueryBuilders.functionScoreQuery(
                boolQuery,
                new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
                        new FunctionScoreQueryBuilder.FilterFunctionBuilder(
                                QueryBuilders.termQuery("isAD", true),
                                ScoreFunctionBuilders.weightFactorFunction(10)
                        )
                });
        request.source().query(functionScoreQueryBuilder);
    }

    private PageResult handleResponse(SearchHits searchHits) {
        long total = searchHits.getTotalHits().value;
        SearchHit[] hits = searchHits.getHits();
        List<HotelDoc> hotels = new ArrayList<>();
        for (SearchHit hit : hits) {
            String json = hit.getSourceAsString();
            HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
            Object[] sortValues = hit.getSortValues();
            if (sortValues.length > 0) {
                Object sortValue = sortValues[0];
                hotelDoc.setDistance(sortValue);
            }
            hotels.add(hotelDoc);
        }
        return new PageResult(total, hotels);
    }
}
```
2. Connecting to the Front End
The front-end page sends a request to the server to query the aggregated values of the brand, city, star-rating, and other fields.
Note that the request parameters are exactly the same as the RequestParams used by search earlier; they limit the scope of documents to aggregate over.
For example, if a user searches for "外滩" with a price of 300–600, the aggregation must be computed on top of those search conditions.
Therefore we need to:
- 1. Write a controller endpoint to receive the request
- 2. Add a RequestParams parameter to the IHotelService#filters() method
- 3. Modify the filters business logic to add the query condition when aggregating
```java
package cn.itcast.hotel.web;

import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/hotel")
public class HotelController {

    @Autowired
    private IHotelService hotelService;

    @PostMapping("/list")
    public PageResult search(@RequestBody RequestParams params) {
        return hotelService.search(params);
    }

    @PostMapping("/filters")
    public Map<String, List<String>> getFilters(@RequestBody RequestParams params) {
        return hotelService.filters(params);
    }
}
```
```java
package cn.itcast.hotel.service.impl;

import cn.itcast.hotel.mapper.HotelMapper;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.HotelDoc;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import cn.itcast.hotel.service.IHotelService;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
    @Autowired
    private RestHighLevelClient client;

    @Override
    public Map<String, List<String>> filters(RequestParams params) {
        try {
            SearchRequest request = new SearchRequest("hotel");
            // limit the aggregation scope to the current search conditions
            buildBasicQuery(params, request);
            request.source().size(0);
            buildAggregation(request);
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            Map<String, List<String>> result = new HashMap<>();
            Aggregations aggregations = response.getAggregations();
            List<String> brandList = getAggByName(aggregations, "brandAgg");
            result.put("品牌", brandList);
            List<String> cityList = getAggByName(aggregations, "cityAgg");
            result.put("城市", cityList);
            List<String> starList = getAggByName(aggregations, "starAgg");
            result.put("星级", starList);
            return result;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private List<String> getAggByName(Aggregations aggregations, String aggName) {
        Terms brandTerms = aggregations.get(aggName);
        List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
        List<String> brandList = new ArrayList<>();
        for (Terms.Bucket bucket : buckets) {
            String key = bucket.getKeyAsString();
            brandList.add(key);
        }
        return brandList;
    }

    private void buildAggregation(SearchRequest request) {
        request.source().aggregation(AggregationBuilders
                .terms("brandAgg")
                .field("brand")
                .size(100)
        );
        request.source().aggregation(AggregationBuilders
                .terms("cityAgg")
                .field("city")
                .size(100)
        );
        request.source().aggregation(AggregationBuilders
                .terms("starAgg")
                .field("starName")
                .size(100)
        );
    }

    private void buildBasicQuery(RequestParams params, SearchRequest request) {
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        String key = params.getKey();
        if (key == null || "".equals(key)) {
            boolQuery.must(QueryBuilders.matchAllQuery());
        } else {
            boolQuery.must(QueryBuilders.matchQuery("all", key));
        }
        if (params.getCity() != null && !params.getCity().equals("")) {
            boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));
        }
        if (params.getBrand() != null && !params.getBrand().equals("")) {
            boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));
        }
        if (params.getStarName() != null && !params.getStarName().equals("")) {
            boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));
        }
        if (params.getMinPrice() != null && params.getMaxPrice() != null) {
            boolQuery.filter(QueryBuilders
                    .rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));
        }
        // boost advertised hotels; the HotelDoc field is isAD
        FunctionScoreQueryBuilder functionScoreQueryBuilder = QueryBuilders.functionScoreQuery(
                boolQuery,
                new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
                        new FunctionScoreQueryBuilder.FilterFunctionBuilder(
                                QueryBuilders.termQuery("isAD", true),
                                ScoreFunctionBuilders.weightFactorFunction(10)
                        )
                });
        request.source().query(functionScoreQueryBuilder);
    }

    private PageResult handleResponse(SearchHits searchHits) {
        long total = searchHits.getTotalHits().value;
        SearchHit[] hits = searchHits.getHits();
        List<HotelDoc> hotels = new ArrayList<>();
        for (SearchHit hit : hits) {
            String json = hit.getSourceAsString();
            HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
            Object[] sortValues = hit.getSortValues();
            if (sortValues.length > 0) {
                Object sortValue = sortValues[0];
                hotelDoc.setDistance(sortValue);
            }
            hotels.add(hotelDoc);
        }
        return new PageResult(total, hotels);
    }
}
```
II. Autocomplete
1. Installing the Pinyin Analyzer Plugin
To complete input from the letters a user types, documents must be tokenized into pinyin. Conveniently, there is a pinyin analysis plugin for elasticsearch on GitHub (download link).
Installation is the same as for the IK analyzer:
- 1. Unzip the package
- 2. Upload it to the elasticsearch plugin directory on the virtual machine
- 3. Restart elasticsearch
- 4. Test:
```json
GET /_analyze
{
  "text": ["如家酒店还不错"],
  "analyzer": "pinyin"
}
```
2. Configuring the Pinyin Analyzer
An analyzer in elasticsearch consists of three parts:
- character filters: process the text before the tokenizer, e.g. deleting or replacing characters
- tokenizer: splits the text into terms according to certain rules, e.g. keyword (no splitting) or ik_smart
- token filters: further process the terms produced by the tokenizer, e.g. case conversion, synonyms, pinyin conversion

```json
DELETE /test

# Create an index with a custom analyzer
PUT /test
{
  "settings": {
    "analysis": {
      "analyzer": {
        "my_analyzer": {
          "tokenizer": "ik_max_word",
          "filter": "py"
        }
      },
      "filter": {
        "py": {
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        "analyzer": "my_analyzer"
      }
    }
  }
}

# Test the custom analyzer
GET /test/_analyze
{
  "text": ["如家酒店还不错"],
  "analyzer": "my_analyzer"
}

# Insert test documents and search
POST /test/_doc/1
{
  "id": 1,
  "name": "狮子"
}
POST /test/_doc/2
{
  "id": 2,
  "name": "虱子"
}
POST /test/_search
{
  "query": {
    "match": {
      "name": "掉入狮子笼咋办"
    }
  }
}
```
The pinyin analyzer is appropriate when building the inverted index, but it should not be used at search time: the search text would also be converted to pinyin, so a query for 狮子 would match its homophone 虱子 as well.
Therefore the field should be indexed with my_analyzer but searched with ik_smart, configured via search_analyzer:
```json
DELETE /test

# Custom analyzer for indexing, ik_smart for searching
PUT /test
{
  "settings": {
    "analysis": {
      "analyzer": {
        "my_analyzer": {
          "tokenizer": "ik_max_word",
          "filter": "py"
        }
      },
      "filter": {
        "py": {
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        "analyzer": "my_analyzer",
        "search_analyzer": "ik_smart"
      }
    }
  }
}

POST /test/_doc/1
{
  "id": 1,
  "name": "狮子"
}
POST /test/_doc/2
{
  "id": 2,
  "name": "虱子"
}
POST /test/_search
{
  "query": {
    "match": {
      "name": "掉入狮子笼咋办"
    }
  }
}
```
3. Completion Suggester Queries
elasticsearch provides the Completion Suggester query for autocompletion. The query matches and returns terms that start with the user's input. To keep completion queries efficient, there are some constraints on the field type:
- The field used for completion must be of type completion
- The field content is generally an array of the terms to be suggested
The query syntax is as follows:
```json
# Index for autocompletion
PUT /test2
{
  "mappings": {
    "properties": {
      "title": {
        "type": "completion"
      }
    }
  }
}

# Sample data
POST /test2/_doc
{
  "title": ["Sony", "WH-1000XM3"]
}
POST /test2/_doc
{
  "title": ["SK-II", "PITERA"]
}
POST /test2/_doc
{
  "title": ["Nintendo", "switch"]
}

# Autocomplete query
GET /test2/_search
{
  "suggest": {
    "titleSuggest": {
      "text": "s",                # the keyword typed so far
      "completion": {
        "field": "title",         # the field to complete on
        "skip_duplicates": true,  # skip duplicate suggestions
        "size": 10                # return the top 10 results
      }
    }
  }
}
```
Autocomplete case: implement autocompletion and pinyin search for the hotel index.
The approach:
- 1. Modify the hotel index structure to define the custom pinyin analyzers
- 2. Change the index's name and all fields to use the custom analyzer
- 3. Add a new suggestion field to the index, of type completion, using the custom analyzer
- 4. Add a suggestion field to the HotelDoc class, containing brand and business
- 5. Re-import the data into the hotel index
```json
DELETE /hotel

# Recreate the hotel index with pinyin analyzers and a completion field
PUT /hotel
{
  "settings": {
    "analysis": {
      "analyzer": {
        "text_analyzer": {
          "tokenizer": "ik_max_word",
          "filter": "py"
        },
        "completion_analyzer": {
          "tokenizer": "keyword",
          "filter": "py"
        }
      },
      "filter": {
        "py": {
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "all": {
        "type": "text",
        "analyzer": "text_analyzer",
        "search_analyzer": "ik_smart"
      },
      "name": {
        "type": "text",
        "analyzer": "text_analyzer",
        "search_analyzer": "ik_smart",
        "copy_to": "all"
      },
      "address": {
        "type": "keyword",
        "index": false
      },
      "brand": {
        "type": "keyword",
        "copy_to": ["all"]
      },
      "business": {
        "type": "keyword",
        "copy_to": ["all"]
      },
      "city": {
        "type": "keyword"
      },
      "id": {
        "type": "keyword"
      },
      "location": {
        "type": "geo_point"
      },
      "pic": {
        "type": "keyword",
        "index": false
      },
      "price": {
        "type": "integer"
      },
      "score": {
        "type": "integer"
      },
      "starName": {
        "type": "keyword"
      },
      "suggestion": {
        "type": "completion",
        "analyzer": "completion_analyzer"
      }
    }
  }
}
```
```java
package cn.itcast.hotel.pojo;

import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

@Data
@NoArgsConstructor
public class HotelDoc {
    private Long id;
    private String name;
    private String address;
    private Integer price;
    private Integer score;
    private String brand;
    private String city;
    private String starName;
    private String business;
    private String location;
    private String pic;
    private Object distance;
    private Boolean isAD;
    private List<String> suggestion;

    public HotelDoc(Hotel hotel) {
        this.id = hotel.getId();
        this.name = hotel.getName();
        this.address = hotel.getAddress();
        this.price = hotel.getPrice();
        this.score = hotel.getScore();
        this.brand = hotel.getBrand();
        this.city = hotel.getCity();
        this.starName = hotel.getStarName();
        this.business = hotel.getBusiness();
        this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
        this.pic = hotel.getPic();
        // if business lists several districts separated by "/", split them so
        // each district becomes its own completion entry
        // (fixed bug: the original used equals("/"), so the branch never ran)
        if (this.business.contains("/")) {
            String[] arr = this.business.split("/");
            this.suggestion = new ArrayList<>();
            this.suggestion.add(this.brand);
            Collections.addAll(this.suggestion, arr);
        } else {
            this.suggestion = Arrays.asList(this.brand, this.business);
        }
    }
}
```
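The branch in the constructor above is easy to get wrong, so the splitting logic can be checked in isolation. A standalone sketch (not the real HotelDoc class) that mirrors it:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SuggestionDemo {
    // Mirrors the HotelDoc constructor: if business lists several districts
    // separated by "/", each district becomes its own completion entry.
    static List<String> buildSuggestion(String brand, String business) {
        if (business.contains("/")) {
            List<String> suggestion = new ArrayList<>();
            suggestion.add(brand);
            Collections.addAll(suggestion, business.split("/"));
            return suggestion;
        }
        return Arrays.asList(brand, business);
    }

    public static void main(String[] args) {
        System.out.println(buildSuggestion("如家", "外滩/陆家嘴"));
        System.out.println(buildSuggestion("如家", "虹桥"));
    }
}
```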
```json
GET /hotel/_search
{
  "suggest": {
    "suggestion": {
      "text": "sd",
      "completion": {
        "field": "suggestion",
        "skip_duplicates": true,
        "size": 10
      }
    }
  }
}
```
Implementing Autocomplete with the RestAPI
```java
package cn.itcast.hotel;

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.List;

public class HotelSearchTest {
    private RestHighLevelClient client;

    @Test
    void testSuggest() throws IOException {
        // 1. Build a completion suggestion named "suggestions"
        SearchRequest request = new SearchRequest("hotel");
        request.source().suggest(new SuggestBuilder().addSuggestion(
                "suggestions",
                SuggestBuilders.completionSuggestion("suggestion")
                        .prefix("h")
                        .skipDuplicates(true)
                        .size(10)
        ));
        // 2. Send the request
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 3. Parse the suggestion result by the name given above
        Suggest suggest = response.getSuggest();
        CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
        List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
        for (CompletionSuggestion.Entry.Option option : options) {
            String text = option.getText().toString();
            System.out.println(text);
        }
    }

    @BeforeEach
    void setUp() {
        this.client = new RestHighLevelClient(RestClient.builder(
                HttpHost.create("http://192.168.127.132:9200")
        ));
    }

    @AfterEach
    void tearDown() throws IOException {
        this.client.close();
    }
}
```
III. Data Synchronization
1. Analyzing the Data Synchronization Problem
The hotel data in elasticsearch comes from a MySQL database, so whenever the MySQL data changes, elasticsearch must change with it. This is the data synchronization problem between elasticsearch and MySQL.
In a microservice architecture, the hotel management business (operating on MySQL) and the hotel search business (operating on elasticsearch) may live in two different microservices. How can synchronization be implemented?
Option 1: synchronous calls. After writing to MySQL, the management service calls the search service directly. Simple to implement, but the services are tightly coupled.
Option 2: asynchronous notification. After writing, the management service publishes a message to MQ; the search service consumes it and updates elasticsearch. Low coupling, but relies on the reliability of the MQ.
Option 3: listening to the MySQL binlog. Fully decouples the two services, but enabling the binlog adds load to MySQL.
2. Case: Using MQ to Synchronize MySQL and elasticsearch
When hotel data is inserted, deleted, or updated, the same operation must also be applied to the data in elasticsearch.
Steps:
- Import the hotel-admin project provided with the course material, start it, and test CRUD on the hotel data
- Declare the exchange, queues, and RoutingKeys
- Send a message from the insert, delete, and update business logic in hotel-admin
- Listen for the messages in hotel-demo and update the elasticsearch data accordingly
- Start everything and test the synchronization
Configuration file
```yaml
server:
  port: 8099
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/heima?useSSL=false
    username: root
    password: 123456
    driver-class-name: com.mysql.jdbc.Driver
  rabbitmq:
    host: 192.168.127.132
    port: 5672
    username: itcast
    password: 123321
    virtual-host: /
logging:
  level:
    cn.itcast: debug
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
mybatis-plus:
  configuration:
    map-underscore-to-camel-case: true
  type-aliases-package: cn.itcast.hotel.pojo
```
Define the MQ constants
```java
package cn.itcast.hotel.constants;

public class MqConsistants {
    // constants must be public so both producer and consumer can reference them
    public static final String HOTEL_EXCHANGE = "hotel.topic";

    public static final String HOTEL_INSERT_QUEUE = "hotel.insert.queue";

    public static final String HOTEL_DELETE_QUEUE = "hotel.delete.queue";

    public static final String HOTEL_INSERT_KEY = "hotel.insert";

    public static final String HOTEL_DELETE_KEY = "hotel.delete";
}
```
Queue configuration
```java
package cn.itcast.hotel.config;

import cn.itcast.hotel.constants.MqConsistants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig {
    @Bean
    public TopicExchange topicExchange() {
        // durable, non auto-delete topic exchange
        return new TopicExchange(MqConsistants.HOTEL_EXCHANGE, true, false);
    }

    @Bean
    public Queue insertQueue() {
        return new Queue(MqConsistants.HOTEL_INSERT_QUEUE, true);
    }

    @Bean
    public Queue deleteQueue() {
        return new Queue(MqConsistants.HOTEL_DELETE_QUEUE, true);
    }

    @Bean
    public Binding insertQueueBinding() {
        return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConsistants.HOTEL_INSERT_KEY);
    }

    @Bean
    public Binding deleteQueueBinding() {
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConsistants.HOTEL_DELETE_KEY);
    }
}
```
Controller (producer side)
```java
package cn.itcast.hotel.web;

import cn.itcast.hotel.constants.MqConsistants;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.service.IHotelService;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.security.InvalidParameterException;

@RestController
@RequestMapping("hotel")
public class HotelController {

    @Autowired
    private IHotelService hotelService;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/{id}")
    public Hotel queryById(@PathVariable("id") Long id) {
        return hotelService.getById(id);
    }

    @GetMapping("/list")
    public PageResult hotelList(
            @RequestParam(value = "page", defaultValue = "1") Integer page,
            @RequestParam(value = "size", defaultValue = "1") Integer size
    ) {
        Page<Hotel> result = hotelService.page(new Page<>(page, size));
        return new PageResult(result.getTotal(), result.getRecords());
    }

    @PostMapping
    public void saveHotel(@RequestBody Hotel hotel) {
        hotelService.save(hotel);
        // send only the id; the consumer fetches the full record itself
        rabbitTemplate.convertAndSend(MqConsistants.HOTEL_EXCHANGE, MqConsistants.HOTEL_INSERT_KEY, hotel.getId());
    }

    @PutMapping
    public void updateById(@RequestBody Hotel hotel) {
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id cannot be null");
        }
        hotelService.updateById(hotel);
        // updates reuse the insert key: re-indexing the document overwrites it
        rabbitTemplate.convertAndSend(MqConsistants.HOTEL_EXCHANGE, MqConsistants.HOTEL_INSERT_KEY, hotel.getId());
    }

    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {
        hotelService.removeById(id);
        rabbitTemplate.convertAndSend(MqConsistants.HOTEL_EXCHANGE, MqConsistants.HOTEL_DELETE_KEY, id);
    }
}
```
Listener
```java
package cn.itcast.hotel.mq;

import cn.itcast.hotel.constants.MqConsistants;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HotelListener {
    @Autowired
    private IHotelService hotelService;

    @RabbitListener(queues = MqConsistants.HOTEL_INSERT_QUEUE)
    public void listenHotelInsertOrUpdate(Long id) {
        hotelService.insertById(id);
    }

    @RabbitListener(queues = MqConsistants.HOTEL_DELETE_QUEUE)
    public void listenHotelDelete(Long id) {
        hotelService.deleteById(id);
    }
}
```
Consumer-side handling across the three layers (AMQP must be configured first)
```java
package cn.itcast.hotel.service;

import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import com.baomidou.mybatisplus.extension.service.IService;

import java.util.List;
import java.util.Map;

public interface IHotelService extends IService<Hotel> {
    PageResult search(RequestParams params);

    Map<String, List<String>> filters(RequestParams params);

    List<String> getSuggestions(String prefix);

    void insertById(Long id);

    void deleteById(Long id);
}
```
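The implementations of insertById and deleteById are not shown in this excerpt. Below is a minimal sketch of what they could look like inside the HotelService class from earlier; this is an untested outline that assumes the HotelDoc constructor and the injected client shown in previous sections, and it requires a reachable elasticsearch:

```java
// Sketch: additional methods inside the HotelService class from earlier.
// Assumes: import org.elasticsearch.action.index.IndexRequest;
//          import org.elasticsearch.action.delete.DeleteRequest;
//          import org.elasticsearch.common.xcontent.XContentType;

@Override
public void insertById(Long id) {
    try {
        // fetch the fresh record from MySQL, convert it, and (re)index it;
        // indexing with the same document id also covers updates
        Hotel hotel = getById(id);
        HotelDoc hotelDoc = new HotelDoc(hotel);
        IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
        request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
        client.index(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

@Override
public void deleteById(Long id) {
    try {
        client.delete(new DeleteRequest("hotel", id.toString()), RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
```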
四 Elasticsearch Cluster
1. Setting up an ES cluster
A single elasticsearch node used for data storage inevitably faces two problems: storing massive amounts of data, and being a single point of failure.
- Massive data storage: logically split the index into N shards and distribute them across multiple nodes
- Single point of failure: back up each shard's data on different nodes (replica)
```yaml
version: '2.2'
services:
  es01:
    image: elasticsearch:7.12.1
    container_name: es01
    environment:
      - node.name=es01
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - elastic
  es02:
    image: elasticsearch:7.12.1
    container_name: es02
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data02:/usr/share/elasticsearch/data
    ports:
      - 9201:9200
    networks:
      - elastic
  es03:
    image: elasticsearch:7.12.1
    container_name: es03
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data03:/usr/share/elasticsearch/data
    networks:
      - elastic
    ports:
      - 9202:9200

volumes:
  data01:
    driver: local
  data02:
    driver: local
  data03:
    driver: local

networks:
  elastic:
    driver: bridge
```
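Assuming the file above is saved as `docker-compose.yml`, the cluster can be started with standard docker-compose commands (a usage sketch, not part of the original notes):

```shell
# Start the three-node cluster in the background
docker-compose up -d
# Follow one node's logs to watch it discover the others and join the cluster
docker logs -f es01
# Check cluster health from any node; status should turn "green" once all nodes join
curl http://localhost:9200/_cluster/health?pretty
```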
Running es requires adjusting some Linux kernel settings. Edit the /etc/sysctl.conf file and add the following content, then run the command that makes the configuration take effect:
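The two snippets referenced above are not shown on this page. Based on the standard requirement documented for running Elasticsearch in Docker, they are typically the following (assuming a root shell):

```shell
# Append to /etc/sysctl.conf: raise the mmap count limit Elasticsearch needs
echo "vm.max_map_count=262144" >> /etc/sysctl.conf
# Reload kernel parameters so the change takes effect without a reboot
sysctl -p
```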
2. Monitoring cluster status
kibana can monitor an es cluster, but newer versions depend on es's x-pack feature, which is complex to configure.
Instead, cerebro is recommended for monitoring es cluster status. Official site: https://github.com/lmenezes/cerebro
Double-click the cerebro.bat file to start the service.
Visit http://localhost:9000 to open the management UI.
Enter the address and port of any node in your elasticsearch cluster.
A green bar means the cluster is in the green (healthy) state.
3. Index sharding
Enter the following command in Dev Tools to create an index with 3 shards and 1 replica per shard:
```
PUT /itcast
{
  "settings": {
    "number_of_shards": 3,    # split the index into 3 shards
    "number_of_replicas": 1   # keep 1 replica copy of each shard
  },
  "mappings": {
    "properties": {
    }
  }
}
```
4. Node roles in an ES cluster
Nodes in an elasticsearch cluster have different responsibilities:
| Node type | Setting | Default | Responsibilities |
| --- | --- | --- | --- |
| master eligible | node.master | true | Master-eligible node. The elected master manages and records cluster state, decides which node each shard is placed on, and handles requests to create or delete indices |
| data | node.data | true | Data node: stores data and handles search, aggregation, and CRUD |
| ingest | node.ingest | true | Pre-processes data before it is stored |
| coordinating | a node with all three settings above set to false | n/a | Routes requests to other nodes, merges their partial results, and returns the final result to the user |
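A dedicated coordinating node, for example, would disable all three flags in its elasticsearch.yml (using the legacy boolean settings named in the table; note that in es 7.9+ these are replaced by the `node.roles` list):

```yaml
# elasticsearch.yml for a pure coordinating node (legacy boolean style)
node.master: false
node.data: false
node.ingest: false
```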
Distributed querying in an ES cluster
Each node role in elasticsearch has its own responsibilities, so for a production cluster it is recommended to give every node a single, dedicated role.
Split-brain in an ES cluster
By default every node is master-eligible, so when the master node goes down, the other candidate nodes elect a new master. But if the master is merely cut off from the other nodes by a network failure, a split-brain can occur, with two masters each leading part of the cluster.
To avoid split-brain, a candidate must win more than half of the votes, i.e. at least (number of eligible nodes + 1) / 2, to become master, which is why the number of eligible nodes is best kept odd. The corresponding setting is discovery.zen.minimum_master_nodes; since es 7.0 this quorum is managed automatically by default, so split-brain problems generally no longer occur.
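The majority rule above can be illustrated with a tiny sketch (the class and helper names are made up, not an elasticsearch API):

```java
// Illustrates why odd eligible-node counts are preferred for master election.
public class Quorum {

    // A master candidate needs more than half of the master-eligible votes
    static int votesNeeded(int eligibleNodes) {
        return eligibleNodes / 2 + 1;
    }

    public static void main(String[] args) {
        // 3 eligible nodes: 2 votes needed -> survives the loss of 1 node
        System.out.println(votesNeeded(3));
        // 4 eligible nodes: 3 votes needed -> still only tolerates 1 lost node,
        // so the 4th node adds no fault tolerance
        System.out.println(votesNeeded(4));
    }
}
```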
5. Distributed insert and query in an ES cluster
Distributed storage in an ES cluster
When new documents are added, they should be spread across the shards to keep the data balanced. So how does the coordinating node decide which shard a document is stored on?
elasticsearch computes the target shard with a hash algorithm:
`shard = hash(_routing) % number_of_shards`
Notes:
- `_routing` defaults to the document id
- the algorithm depends on the number of shards, so once an index is created, its shard count cannot be changed!
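The routing formula can be sketched in plain Java. This is only an illustration of why the shard count is frozen at creation time: real elasticsearch hashes `_routing` with Murmur3, not `String.hashCode()`, and the names here are hypothetical.

```java
// Minimal sketch of how a coordinating node picks a shard for a document.
public class ShardRouting {

    static int pickShard(String routing, int numberOfShards) {
        // Math.floorMod keeps the result in [0, numberOfShards) even for negative hashes
        return Math.floorMod(routing.hashCode(), numberOfShards);
    }

    public static void main(String[] args) {
        String docId = "60398";  // _routing defaults to the document id
        // The same id always lands on the same shard...
        System.out.println(pickShard(docId, 3));
        // ...but changing the shard count changes the mapping, which is why an
        // index's number_of_shards cannot be modified after creation.
        System.out.println(pickShard(docId, 5));
    }
}
```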
Distributed querying in an ES cluster
An elasticsearch query runs in two phases:
- scatter phase: the coordinating node distributes the request to every shard
- gather phase: the coordinating node collects the search results from the data nodes, merges them into the final result set, and returns it to the user
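The two phases can be sketched as a toy scatter/gather in Java. Everything here (the `queryShard` helper, the fake scores) is a hypothetical illustration of the control flow, not the real elasticsearch implementation:

```java
import java.util.*;
import java.util.stream.*;

// Toy scatter/gather: the "coordinating node" fans a query out to every shard,
// then merges the per-shard hits into one result set sorted by score.
public class ScatterGather {

    // Pretend each shard returns its local hits with a relevance score
    static Map<String, Double> queryShard(int shard, String query) {
        Map<String, Double> hits = new HashMap<>();
        hits.put("doc-" + shard + "-a", 1.0 / (shard + 1));
        hits.put("doc-" + shard + "-b", 2.0 / (shard + 1));
        return hits;
    }

    static List<String> search(String query, int numberOfShards) {
        // Scatter phase: send the request to every shard.
        // Gather phase: merge all shard results and sort by score, descending.
        return IntStream.range(0, numberOfShards)
                .mapToObj(s -> queryShard(s, query))
                .flatMap(m -> m.entrySet().stream())
                .sorted(Map.Entry.<String, Double>comparingByValue().reversed())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(search("hotel", 3));
    }
}
```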
6. Failover in an ES cluster
The cluster's master node monitors the status of every node in the cluster. If it detects that a node has gone down, it immediately migrates that node's shard data to other nodes to keep the data safe; this is called failover.