123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452 |
- package com.bowintek.practice.services.impl;
- import co.elastic.clients.elasticsearch.ElasticsearchClient;
- import co.elastic.clients.elasticsearch._types.FieldValue;
- import co.elastic.clients.elasticsearch._types.SortOptions;
- import co.elastic.clients.elasticsearch._types.SortOrder;
- import co.elastic.clients.elasticsearch._types.aggregations.Aggregate;
- import co.elastic.clients.elasticsearch._types.aggregations.Buckets;
- import co.elastic.clients.elasticsearch._types.aggregations.StringTermsAggregate;
- import co.elastic.clients.elasticsearch._types.aggregations.StringTermsBucket;
- import co.elastic.clients.elasticsearch._types.mapping.Property;
- import co.elastic.clients.elasticsearch._types.mapping.TextProperty;
- import co.elastic.clients.elasticsearch._types.query_dsl.*;
- import co.elastic.clients.elasticsearch.cat.IndicesResponse;
- import co.elastic.clients.elasticsearch.core.SearchRequest;
- import co.elastic.clients.elasticsearch.core.SearchResponse;
- import co.elastic.clients.elasticsearch.core.search.Hit;
- import co.elastic.clients.elasticsearch.indices.PutMappingRequest;
- import co.elastic.clients.elasticsearch.indices.PutMappingResponse;
- import co.elastic.clients.json.JsonData;
- import com.bowintek.practice.mapper.cquery.EsIndexCquery;
- import com.bowintek.practice.services.service.AnalyzeService;
- import com.bowintek.practice.services.service.EsQueryService;
- import com.bowintek.practice.util.Constant;
- import com.bowintek.practice.util.StringUtils;
- import com.bowintek.practice.vo.Analyze.ComparisonResult;
- import com.bowintek.practice.vo.Analyze.EsQueryText;
- import com.bowintek.practice.vo.EsIndexVo;
- import com.bowintek.practice.vo.EsIndexfieldVo;
- import com.bowintek.practice.vo.EsQueryLogVo;
- import com.fasterxml.jackson.core.JsonProcessingException;
- import com.fasterxml.jackson.core.type.TypeReference;
- import com.fasterxml.jackson.databind.JsonNode;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.fasterxml.jackson.databind.node.ObjectNode;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.redis.core.query.SortQuery;
- import org.springframework.stereotype.Component;
- import javax.naming.directory.SearchResult;
- import java.io.IOException;
- import java.util.*;
- import java.util.stream.Collectors;
- @Component
- @Slf4j
- public class EsQueryServiceImpl implements EsQueryService {
- @Autowired
- private ElasticsearchClient esClient;
- @Autowired
- private AnalyzeService analyzeService;
- @Autowired
- private EsIndexCquery esIndexCquery;
- private static Object lockObject = new Object();
- private static List<String> indexCache = null;
- private static List<String> pathCache = null;
- private static long cacheTime = 0;
- public void getCacheList() {
- synchronized (lockObject) {
- //从本地缓存加载数据
- long timeSpan = (new Date()).getTime() - cacheTime;
- if (timeSpan > 30 * 1000 || indexCache == null || pathCache == null) {
- List<EsIndexVo> list1 = esIndexCquery.getList(null, null, null);
- indexCache = list1.stream()
- .map(m -> m.getIndexCode())
- .distinct().collect(Collectors.toList());
- List<EsIndexfieldVo> list2 = esIndexCquery.getFieldList(null, null);
- pathCache = list2.stream().filter(p -> p.getDataType().equals("NESTED"))
- .map(m -> m.getFieldCode())
- .distinct().collect(Collectors.toList());
- cacheTime = (new Date()).getTime();
- }
- }
- }
- public List<String> getIndexCache() {
- getCacheList();
- return indexCache;
- }
- public List<String> getPathCache() {
- getCacheList();
- return pathCache;
- }
- public List<Query> getComparisonQueryList(EsQueryText queryText) {
- List<Query> queryList = new ArrayList<>();
- //分析查询字符串,有的字符串需要变成条件查询
- List<ComparisonResult> cmpList = analyzeService.analyzeJavas(queryText.getKeyString());
- //二级查询的路径信息
- List<String> pathList = getPathCache();
- for (int i = 0; i < cmpList.size(); i++) {
- ComparisonResult cmp = cmpList.get(i);
- //检查查询、对比的值是否为空
- if (StringUtils.IsNullEmpty(cmp.getKeyString())) continue;
- if (cmp.getSearchType().equals("comparison")) { //对比查询,有字段,有对比符号,有值
- //第一级,对比搜索
- queryList.addAll(getRangeQueryByComparison(cmp));
- //第二级,对比搜索
- for (String path : pathList) {
- queryList.addAll(getNestedRangeQueryByComparison(cmp, path));
- }
- } else if (StringUtils.IsNullEmpty(queryText.getField())) {//全文查询,没有限制字段
- //第一级,文字搜索
- queryList.add(getMultiMatchQuery(cmp.getKeyString()));
- //第二级,嵌套类型,文字搜索
- for (String path : pathList) {
- queryList.add(getNestedMultiMatchQuery(path, cmp.getKeyString()));
- }
- } else { //全文查询,限定了字段
- //第一级,文字搜索
- queryList.add(getMultiMatchQuery(new String[]{queryText.getField()}, cmp.getKeyString()));
- //第二级,嵌套类型,文字搜索
- for (String path : pathList) {
- queryList.add(getNestedMultiMatchQuery(path, new String[]{queryText.getField()}, cmp.getKeyString()));
- }
- }
- }
- return queryList;
- }
- @Override
- public Map<String, Object> query(List<EsQueryText> queryList, List<ComparisonResult> limiters,
- int page, int limit, String orderType, String orderBy) {
- //[1]需要返回的结果map
- Map<String, Object> result = new HashMap<>();
- result.put("total", 0);
- try {
- //二级查询的路径信息
- List<String> pathList = getPathCache();
- List<Query> queryMustList = new ArrayList<>();
- BoolQuery.Builder boolQuery = new BoolQuery.Builder();
- //[2]分析查询字符串,有的字符串需要变成条件查询
- for (EsQueryText queryText : queryList) {
- List<Query> shouldQuerys = getComparisonQueryList(queryText);
- if (shouldQuerys.size() > 0) {
- queryMustList.addAll(shouldQuerys);
- }
- }
- //[3]限定查询条件,都是有条件的查询
- for (ComparisonResult cmp : limiters) {
- //检查查询、对比的值是否为空
- if (StringUtils.IsNullEmpty(cmp.getValue())) continue;
- //第一级,对比搜索
- if (Constant.AND.equals(cmp.getRelation())) {
- boolQuery.must(getRangeQueryByComparison(cmp));
- } else if (Constant.OR.equals(cmp.getRelation())) {
- boolQuery.should(getRangeQueryByComparison(cmp));
- } else if (Constant.NOT.equals(cmp.getRelation())) {
- boolQuery.mustNot(getRangeQueryByComparison(cmp));
- } else {
- boolQuery.must(getRangeQueryByComparison(cmp));
- }
- }
- //[4]建立查询参数分析
- SearchRequest.Builder searchRequest = new SearchRequest.Builder();
- //==>要查询的索引列表,从数据库配置表中获取
- searchRequest.index(getIndexCache());
- //==>数据分页显示
- searchRequest.size(limit);
- searchRequest.from(page * limit);
- //==>设置查询条件
- if (queryMustList.size() > 0) {
- boolQuery.should(queryMustList);
- }
- Query queryMust = Query.of(q -> q.bool(boolQuery.build()));
- //排序
- if (orderType != null && !StringUtils.IsNullEmpty(orderBy)) {
- SortOptions.Builder sb = new SortOptions.Builder();
- sb.field(f -> f.field(orderBy).order(orderType.equals("asc") ? SortOrder.Asc : SortOrder.Desc));
- searchRequest.query(queryMust).sort(sb.build());
- }else{
- searchRequest.query(queryMust);
- }
- //==>高亮设置
- searchRequest.highlight(h -> h.fields("*", f -> f.preTags("<font color='red'>")
- .postTags("</font>")));
- //查询后分组
- searchRequest.aggregations("group_well", a -> a.terms(t -> t.field("well_id")));
- //[5]Es发起查询
- SearchRequest request = searchRequest.build();
- log.info("dsl:" + request.toString());
- SearchResponse<ObjectNode> response = esClient.search(request, ObjectNode.class);
- //[6]转换结果,可以对不同的index做出参数输出
- List<Map<String, Object>> rows = searchResponse2List(response);
- result.put("rows", rows);
- result.put("total", response.hits().total().value());
- result.put("agg", aggResponse2List(response));
- //请求参数输出,方便调试
- String[] jsonStrings = request.toString().split("typed_keys=true");
- result.put("SearchUrl", jsonStrings[0]);
- result.put("SearchRequest", stringToNodeJson(jsonStrings[1]));
- System.out.println(response.hits().total() + " " + request.toString());
- } catch (Exception ex) {
- result.put("Message", ex.getMessage());
- result.put("StackTrace", ex.getStackTrace());
- }
- return result;
- }
- public Query getMultiMatchQuery(String text) {
- //不指定,查询所有的一级字段
- return MultiMatchQuery.of(q -> q.query(text)
- .operator(Operator.Or))._toQuery();
- }
- public Query getMultiMatchQuery(List<String> fields, String text) {
- //限定查询字段
- return MultiMatchQuery.of(q -> q.fields(fields)
- .query(text)
- .operator(Operator.Or))._toQuery();
- }
- public Query getMultiMatchQuery(String[] fields, String text) {
- //限定查询字段
- return getMultiMatchQuery(Arrays.asList(fields), text);
- }
- public Query getNestedMultiMatchQuery(String path, String text) {
- //不指定,查询所有的二级字段
- return NestedQuery.of(q -> q.path(path)
- .query(getMultiMatchQuery(text)).innerHits(i -> i.highlight(hl -> hl
- .fields("*", ff -> ff
- .preTags("<font color='red'>")
- .postTags("</font>")
- )
- ))
- .ignoreUnmapped(true))._toQuery();
- }
- public Query getNestedMultiMatchQuery(String path, List<String> fields, String text) {
- //限定查询字段
- return NestedQuery.of(q -> q.path(path)
- .query(getMultiMatchQuery(fields, text))
- .ignoreUnmapped(true))._toQuery();
- }
- public Query getNestedMultiMatchQuery(String path, String[] fields, String text) {
- //限定查询字段
- return NestedQuery.of(q -> q.path(path)
- .query(getMultiMatchQuery(fields, text))
- .ignoreUnmapped(true))._toQuery();
- }
- public List<Query> getRangeQueryByComparison(ComparisonResult cmp) {
- List<Query> queryList = new ArrayList<>();
- String fieldString = cmp.getFields();
- //对比类型查询
- Query query = getComparisonQuery(fieldString, cmp.getOpreation(), cmp.getValue());
- if (query != null) queryList.add(query);
- return queryList;
- }
- public List<Query> getNestedRangeQueryByComparison(ComparisonResult cmp, String path) {
- List<Query> queryList = new ArrayList<>();
- String fieldString = cmp.getFields();
- //对比类型查询
- Query query = getComparisonQuery(path + "." + fieldString, cmp.getOpreation(), cmp.getValue());
- if (query != null) {
- Query nested = NestedQuery.of(q -> q.path(path).query(query)
- .ignoreUnmapped(true))._toQuery();
- queryList.add(nested);
- }
- return queryList;
- }
- public Query getComparisonQuery(String fieldString, String opreation, String value) {
- //对比类型查询
- Query query = null;
- if (opreation.equals("大于"))
- query = RangeQuery.of(q -> q.field(fieldString).gt(JsonData.of(value)))._toQuery();
- else if (opreation.equals("大于等于"))
- query = RangeQuery.of(q -> q.field(fieldString).gte(JsonData.of(value)))._toQuery();
- else if (opreation.equals("小于"))
- query = RangeQuery.of(q -> q.field(fieldString).lt(JsonData.of(value)))._toQuery();
- else if (opreation.equals("小于等于"))
- query = RangeQuery.of(q -> q.field(fieldString).lte(JsonData.of(value)))._toQuery();
- else if (opreation.equals("等于"))
- query = MatchQuery.of(q -> q.field(fieldString).query(value).minimumShouldMatch("80%"))._toQuery();
- else if (opreation.equals("包括IN")) {
- String[] ins = value.split(",");
- List<FieldValue> fls = new ArrayList<>();
- for (String inString : ins) fls.add(FieldValue.of(inString));
- query = TermsQuery.of(q -> q.field(fieldString).terms(s -> s.value(fls)))._toQuery();
- }
- return query;
- }
- public Map<String, Object> queryTest(String text, int page, int limit) {
- Map<String, Object> result = new HashMap<>();
- result.put("total", 0);
- try {
- IndicesResponse indicesResponse = esClient.cat().indices();
- indicesResponse.valueBody().forEach(i -> {
- System.out.println("get all index, health: " + i.health() + ", status: " + i.status() + ", index: " + i.index());
- });
- //建立查询参数分析
- SearchRequest.Builder searchRequest = new SearchRequest.Builder();
- //要查询的索引列表
- String[] indexs = new String[]{"dws_basic_info_history", "dws_dm_test_history", "fact_dwr_well_basic_information"};
- searchRequest.index(Arrays.asList(indexs));
- //数据分页显示
- searchRequest.size(limit);
- searchRequest.from(page * limit);
- //生成查询参数
- if (!StringUtils.IsNullEmpty(text)) {
- //非嵌套字段查询
- String[] fields = new String[]{"well_common_name", "testing_name"};
- Query query1 = MultiMatchQuery.of(q -> q.fields(Arrays.asList(fields)).query(text).operator(Operator.Or))._toQuery();
- //嵌套字段查询
- String[] nestedFields = new String[]{"historys.testing_name"};
- Query nestedQuery = MultiMatchQuery.of(q -> q.fields(Arrays.asList(nestedFields)).query(text).operator(Operator.Or))._toQuery();
- Query query3 = NestedQuery.of(q -> q.path("historys").query(nestedQuery).ignoreUnmapped(true))._toQuery();
- //对比类型查询
- Query query2 = RangeQuery.of(q -> q.field("authorized_md").gte(JsonData.of(500)))._toQuery();
- Query[] arys = new Query[]{query1, query2, query3};
- searchRequest.query(q -> q.bool(b -> b.should(Arrays.asList(arys))));
- //高亮设置
- searchRequest.highlight(h -> h.fields("historys.testing_name", f -> f.matchedFields("historys.testing_name")));
- }
- //Es发起查询
- SearchRequest request = searchRequest.build();
- SearchResponse response = esClient.search(request, ObjectNode.class);
- //转换结果,可以对不同的index做出参数输出
- List<Map<String, Object>> rows = searchResponse2List(response);
- result.put("rows", rows);
- result.put("total", response.hits().total().value());
- System.out.println(response.hits().total() + " " + request.toString());
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- return result;
- }
- public void updateMappingTest() throws IOException {
- Map<String, Property> maps = new HashMap<>();
- maps.put("well_common_name", Property.of(p -> p.text(TextProperty.of(t -> t.index(true).analyzer("ik_max_word")))));
- updateMappings("fact_dwr_well_basic_information", maps);
- }
- public void updateMappings(String index, Map<String, Property> maps) throws IOException {
- PutMappingRequest putMappingRequest = PutMappingRequest.of(m -> m.index(index).properties(maps));
- PutMappingResponse putMappingResponse = esClient.indices().putMapping(putMappingRequest);
- boolean acknowledged = putMappingResponse.acknowledged();
- System.out.println("update mappings ack: " + acknowledged);
- }
- public JsonNode stringToNodeJson(String jsonString) throws JsonProcessingException {
- ObjectMapper objectMapper = new ObjectMapper();
- JsonNode rootNode = objectMapper.readTree(jsonString);
- return rootNode;
- }
- public List<Map<String, Object>> aggResponse2List(SearchResponse<ObjectNode> searchResponse) {
- List<StringTermsBucket> buckets = searchResponse.aggregations().get("group_well").sterms().buckets().array();
- List<Map<String, Object>> aggs = new ArrayList<>();
- for (StringTermsBucket bucket : buckets) {
- Map<String, Object> map = new HashMap<>();
- map.put("key", bucket.key().stringValue());
- map.put("doc_count", bucket.docCount());
- aggs.add(map);
- }
- return aggs;
- }
- public List<Map<String, Object>> searchResponse2List(SearchResponse<ObjectNode> searchResponse) {
- if (searchResponse == null) {
- return new ArrayList<>(0);
- }
- if (searchResponse.hits() == null) {
- return new ArrayList<>(0);
- }
- //if (CommonUtils.isCollectionEmpty(searchResponse.hits().hits())) {return new ArrayList<>(0);}
- List<Hit<ObjectNode>> hits = searchResponse.hits().hits();
- List<Map<String, Object>> list = new ArrayList<>(hits.size());
- for (Hit<ObjectNode> hit : hits) {
- ObjectNode node = hit.source();
- Map<String, Object> map = objectNode2Map(hit.index(), node);
- map.put("index", hit.index());
- map.put("highlight", hit.highlight());
- map.put("innerHits", hit.innerHits());
- list.add(map);
- //输出结果日志
- String line = "=>";
- for (Map.Entry<String, Object> entry : map.entrySet()) {
- line += " " + entry.getKey()
- + ":" + (entry.getValue() == null ? "null" : entry.getValue().toString());
- }
- System.out.println(line);
- }
- return list;
- }
- public String subFieldIndex(String index, String fieldName) {
- if (fieldName.startsWith(index) && fieldName.length() > index.length() + 1)
- return fieldName.substring(index.length() + 1);
- return fieldName;
- }
- public Map<String, Object> objectNode2Map(String index, ObjectNode objectNode) {
- if (null == objectNode) {
- return new HashMap<>(0);
- }
- if (objectNode.isEmpty()) {
- return new HashMap<>(0);
- }
- ObjectMapper objectMapper = new ObjectMapper();
- Map<String, Object> map = objectMapper.convertValue(objectNode, new TypeReference<Map<String, Object>>() {
- });
- for (Map.Entry<String, Object> entry : map.entrySet()) {
- String name = subFieldIndex(index, entry.getKey());
- if (name.equals(entry.getKey())) continue;
- map.remove(entry.getKey());
- map.put(name, entry.getValue());
- }
- return map;
- }
- }
|