package com.bowintek.practice.services.impl; import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch._types.FieldValue; import co.elastic.clients.elasticsearch._types.SortOptions; import co.elastic.clients.elasticsearch._types.SortOrder; import co.elastic.clients.elasticsearch._types.aggregations.Aggregate; import co.elastic.clients.elasticsearch._types.aggregations.Buckets; import co.elastic.clients.elasticsearch._types.aggregations.StringTermsAggregate; import co.elastic.clients.elasticsearch._types.aggregations.StringTermsBucket; import co.elastic.clients.elasticsearch._types.mapping.Property; import co.elastic.clients.elasticsearch._types.mapping.TextProperty; import co.elastic.clients.elasticsearch._types.query_dsl.*; import co.elastic.clients.elasticsearch.cat.IndicesResponse; import co.elastic.clients.elasticsearch.core.SearchRequest; import co.elastic.clients.elasticsearch.core.SearchResponse; import co.elastic.clients.elasticsearch.core.search.Hit; import co.elastic.clients.elasticsearch.indices.PutMappingRequest; import co.elastic.clients.elasticsearch.indices.PutMappingResponse; import co.elastic.clients.json.JsonData; import com.bowintek.practice.mapper.cquery.EsIndexCquery; import com.bowintek.practice.services.service.AnalyzeService; import com.bowintek.practice.services.service.EsQueryService; import com.bowintek.practice.util.Constant; import com.bowintek.practice.util.StringUtils; import com.bowintek.practice.vo.Analyze.ComparisonResult; import com.bowintek.practice.vo.Analyze.EsQueryText; import com.bowintek.practice.vo.EsIndexVo; import com.bowintek.practice.vo.EsIndexfieldVo; import com.bowintek.practice.vo.EsQueryLogVo; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.query.SortQuery; import org.springframework.stereotype.Component; import javax.naming.directory.SearchResult; import java.io.IOException; import java.util.*; import java.util.stream.Collectors; @Component @Slf4j public class EsQueryServiceImpl implements EsQueryService { @Autowired private ElasticsearchClient esClient; @Autowired private AnalyzeService analyzeService; @Autowired private EsIndexCquery esIndexCquery; private static Object lockObject = new Object(); private static List indexCache = null; private static List pathCache = null; private static long cacheTime = 0; public void getCacheList() { synchronized (lockObject) { //从本地缓存加载数据 long timeSpan = (new Date()).getTime() - cacheTime; if (timeSpan > 30 * 1000 || indexCache == null || pathCache == null) { List list1 = esIndexCquery.getList(null, null, null); indexCache = list1.stream() .map(m -> m.getIndexCode()) .distinct().collect(Collectors.toList()); List list2 = esIndexCquery.getFieldList(null, null); pathCache = list2.stream().filter(p -> p.getDataType().equals("NESTED")) .map(m -> m.getFieldCode()) .distinct().collect(Collectors.toList()); cacheTime = (new Date()).getTime(); } } } public List getIndexCache() { getCacheList(); return indexCache; } public List getPathCache() { getCacheList(); return pathCache; } public List getComparisonQueryList(EsQueryText queryText) { List queryList = new ArrayList<>(); //分析查询字符串,有的字符串需要变成条件查询 List cmpList = analyzeService.analyzeJavas(queryText.getKeyString()); //二级查询的路径信息 List pathList = getPathCache(); for (int i = 0; i < cmpList.size(); i++) { ComparisonResult cmp = cmpList.get(i); //检查查询、对比的值是否为空 if (StringUtils.IsNullEmpty(cmp.getKeyString())) continue; if (cmp.getSearchType().equals("comparison")) { //对比查询,有字段,有对比符号,有值 //第一级,对比搜索 queryList.addAll(getRangeQueryByComparison(cmp)); //第二级,对比搜索 for (String path : pathList) { queryList.addAll(getNestedRangeQueryByComparison(cmp, path)); } } else if (StringUtils.IsNullEmpty(queryText.getField())) {//全文查询,没有限制字段 //第一级,文字搜索 queryList.add(getMultiMatchQuery(cmp.getKeyString())); //第二级,嵌套类型,文字搜索 for (String path : pathList) { queryList.add(getNestedMultiMatchQuery(path, cmp.getKeyString())); } } else { //全文查询,限定了字段 //第一级,文字搜索 queryList.add(getMultiMatchQuery(new String[]{queryText.getField()}, cmp.getKeyString())); //第二级,嵌套类型,文字搜索 for (String path : pathList) { queryList.add(getNestedMultiMatchQuery(path, new String[]{queryText.getField()}, cmp.getKeyString())); } } } return queryList; } @Override public Map query(List queryList, List limiters, int page, int limit, String orderType, String orderBy) { //[1]需要返回的结果map Map result = new HashMap<>(); result.put("total", 0); try { //二级查询的路径信息 List pathList = getPathCache(); List queryMustList = new ArrayList<>(); BoolQuery.Builder boolQuery = new BoolQuery.Builder(); //[2]分析查询字符串,有的字符串需要变成条件查询 for (EsQueryText queryText : queryList) { List shouldQuerys = getComparisonQueryList(queryText); if (shouldQuerys.size() > 0) { queryMustList.addAll(shouldQuerys); } } //[3]限定查询条件,都是有条件的查询 for (ComparisonResult cmp : limiters) { //检查查询、对比的值是否为空 if (StringUtils.IsNullEmpty(cmp.getValue())) continue; //第一级,对比搜索 if (Constant.AND.equals(cmp.getRelation())) { boolQuery.must(getRangeQueryByComparison(cmp)); } else if (Constant.OR.equals(cmp.getRelation())) { boolQuery.should(getRangeQueryByComparison(cmp)); } else if (Constant.NOT.equals(cmp.getRelation())) { boolQuery.mustNot(getRangeQueryByComparison(cmp)); } else { boolQuery.must(getRangeQueryByComparison(cmp)); } } //[4]建立查询参数分析 SearchRequest.Builder searchRequest = new SearchRequest.Builder(); //==>要查询的索引列表,从数据库配置表中获取 searchRequest.index(getIndexCache()); //==>数据分页显示 searchRequest.size(limit); searchRequest.from(page * limit); //==>设置查询条件 if (queryMustList.size() > 0) { boolQuery.should(queryMustList); } Query queryMust = Query.of(q -> q.bool(boolQuery.build())); //排序 if (orderType != null && !StringUtils.IsNullEmpty(orderBy)) { SortOptions.Builder sb = new SortOptions.Builder(); sb.field(f -> f.field(orderBy).order(orderType.equals("asc") ? SortOrder.Asc : SortOrder.Desc)); searchRequest.query(queryMust).sort(sb.build()); }else{ searchRequest.query(queryMust); } //==>高亮设置 searchRequest.highlight(h -> h.fields("*", f -> f.preTags("") .postTags(""))); //查询后分组 searchRequest.aggregations("group_well", a -> a.terms(t -> t.field("well_id"))); //[5]Es发起查询 SearchRequest request = searchRequest.build(); log.info("dsl:" + request.toString()); SearchResponse response = esClient.search(request, ObjectNode.class); //[6]转换结果,可以对不同的index做出参数输出 List> rows = searchResponse2List(response); result.put("rows", rows); result.put("total", response.hits().total().value()); result.put("agg", aggResponse2List(response)); //请求参数输出,方便调试 String[] jsonStrings = request.toString().split("typed_keys=true"); result.put("SearchUrl", jsonStrings[0]); result.put("SearchRequest", stringToNodeJson(jsonStrings[1])); System.out.println(response.hits().total() + " " + request.toString()); } catch (Exception ex) { result.put("Message", ex.getMessage()); result.put("StackTrace", ex.getStackTrace()); } return result; } public Query getMultiMatchQuery(String text) { //不指定,查询所有的一级字段 return MultiMatchQuery.of(q -> q.query(text) .operator(Operator.Or))._toQuery(); } public Query getMultiMatchQuery(List fields, String text) { //限定查询字段 return MultiMatchQuery.of(q -> q.fields(fields) .query(text) .operator(Operator.Or))._toQuery(); } public Query getMultiMatchQuery(String[] fields, String text) { //限定查询字段 return getMultiMatchQuery(Arrays.asList(fields), text); } public Query getNestedMultiMatchQuery(String path, String text) { //不指定,查询所有的二级字段 return NestedQuery.of(q -> q.path(path) .query(getMultiMatchQuery(text)).innerHits(i -> i.highlight(hl -> hl .fields("*", ff -> ff .preTags("") .postTags("") ) )) .ignoreUnmapped(true))._toQuery(); } public Query getNestedMultiMatchQuery(String path, List fields, String text) { //限定查询字段 return NestedQuery.of(q -> q.path(path) .query(getMultiMatchQuery(fields, text)) .ignoreUnmapped(true))._toQuery(); } public Query getNestedMultiMatchQuery(String path, String[] fields, String text) { //限定查询字段 return NestedQuery.of(q -> q.path(path) .query(getMultiMatchQuery(fields, text)) .ignoreUnmapped(true))._toQuery(); } public List getRangeQueryByComparison(ComparisonResult cmp) { List queryList = new ArrayList<>(); String fieldString = cmp.getFields(); //对比类型查询 Query query = getComparisonQuery(fieldString, cmp.getOpreation(), cmp.getValue()); if (query != null) queryList.add(query); return queryList; } public List getNestedRangeQueryByComparison(ComparisonResult cmp, String path) { List queryList = new ArrayList<>(); String fieldString = cmp.getFields(); //对比类型查询 Query query = getComparisonQuery(path + "." + fieldString, cmp.getOpreation(), cmp.getValue()); if (query != null) { Query nested = NestedQuery.of(q -> q.path(path).query(query) .ignoreUnmapped(true))._toQuery(); queryList.add(nested); } return queryList; } public Query getComparisonQuery(String fieldString, String opreation, String value) { //对比类型查询 Query query = null; if (opreation.equals("大于")) query = RangeQuery.of(q -> q.field(fieldString).gt(JsonData.of(value)))._toQuery(); else if (opreation.equals("大于等于")) query = RangeQuery.of(q -> q.field(fieldString).gte(JsonData.of(value)))._toQuery(); else if (opreation.equals("小于")) query = RangeQuery.of(q -> q.field(fieldString).lt(JsonData.of(value)))._toQuery(); else if (opreation.equals("小于等于")) query = RangeQuery.of(q -> q.field(fieldString).lte(JsonData.of(value)))._toQuery(); else if (opreation.equals("等于")) query = MatchQuery.of(q -> q.field(fieldString).query(value).minimumShouldMatch("80%"))._toQuery(); else if (opreation.equals("包括IN")) { String[] ins = value.split(","); List fls = new ArrayList<>(); for (String inString : ins) fls.add(FieldValue.of(inString)); query = TermsQuery.of(q -> q.field(fieldString).terms(s -> s.value(fls)))._toQuery(); } return query; } public Map queryTest(String text, int page, int limit) { Map result = new HashMap<>(); result.put("total", 0); try { IndicesResponse indicesResponse = esClient.cat().indices(); indicesResponse.valueBody().forEach(i -> { System.out.println("get all index, health: " + i.health() + ", status: " + i.status() + ", index: " + i.index()); }); //建立查询参数分析 SearchRequest.Builder searchRequest = new SearchRequest.Builder(); //要查询的索引列表 String[] indexs = new String[]{"dws_basic_info_history", "dws_dm_test_history", "fact_dwr_well_basic_information"}; searchRequest.index(Arrays.asList(indexs)); //数据分页显示 searchRequest.size(limit); searchRequest.from(page * limit); //生成查询参数 if (!StringUtils.IsNullEmpty(text)) { //非嵌套字段查询 String[] fields = new String[]{"well_common_name", "testing_name"}; Query query1 = MultiMatchQuery.of(q -> q.fields(Arrays.asList(fields)).query(text).operator(Operator.Or))._toQuery(); //嵌套字段查询 String[] nestedFields = new String[]{"historys.testing_name"}; Query nestedQuery = MultiMatchQuery.of(q -> q.fields(Arrays.asList(nestedFields)).query(text).operator(Operator.Or))._toQuery(); Query query3 = NestedQuery.of(q -> q.path("historys").query(nestedQuery).ignoreUnmapped(true))._toQuery(); //对比类型查询 Query query2 = RangeQuery.of(q -> q.field("authorized_md").gte(JsonData.of(500)))._toQuery(); Query[] arys = new Query[]{query1, query2, query3}; searchRequest.query(q -> q.bool(b -> b.should(Arrays.asList(arys)))); //高亮设置 searchRequest.highlight(h -> h.fields("historys.testing_name", f -> f.matchedFields("historys.testing_name"))); } //Es发起查询 SearchRequest request = searchRequest.build(); SearchResponse response = esClient.search(request, ObjectNode.class); //转换结果,可以对不同的index做出参数输出 List> rows = searchResponse2List(response); result.put("rows", rows); result.put("total", response.hits().total().value()); System.out.println(response.hits().total() + " " + request.toString()); } catch (Exception ex) { ex.printStackTrace(); } return result; } public void updateMappingTest() throws IOException { Map maps = new HashMap<>(); maps.put("well_common_name", Property.of(p -> p.text(TextProperty.of(t -> t.index(true).analyzer("ik_max_word"))))); updateMappings("fact_dwr_well_basic_information", maps); } public void updateMappings(String index, Map maps) throws IOException { PutMappingRequest putMappingRequest = PutMappingRequest.of(m -> m.index(index).properties(maps)); PutMappingResponse putMappingResponse = esClient.indices().putMapping(putMappingRequest); boolean acknowledged = putMappingResponse.acknowledged(); System.out.println("update mappings ack: " + acknowledged); } public JsonNode stringToNodeJson(String jsonString) throws JsonProcessingException { ObjectMapper objectMapper = new ObjectMapper(); JsonNode rootNode = objectMapper.readTree(jsonString); return rootNode; } public List> aggResponse2List(SearchResponse searchResponse) { List buckets = searchResponse.aggregations().get("group_well").sterms().buckets().array(); List> aggs = new ArrayList<>(); for (StringTermsBucket bucket : buckets) { Map map = new HashMap<>(); map.put("key", bucket.key().stringValue()); map.put("doc_count", bucket.docCount()); aggs.add(map); } return aggs; } public List> searchResponse2List(SearchResponse searchResponse) { if (searchResponse == null) { return new ArrayList<>(0); } if (searchResponse.hits() == null) { return new ArrayList<>(0); } //if (CommonUtils.isCollectionEmpty(searchResponse.hits().hits())) {return new ArrayList<>(0);} List> hits = searchResponse.hits().hits(); List> list = new ArrayList<>(hits.size()); for (Hit hit : hits) { ObjectNode node = hit.source(); Map map = objectNode2Map(hit.index(), node); map.put("index", hit.index()); map.put("highlight", hit.highlight()); map.put("innerHits", hit.innerHits()); list.add(map); //输出结果日志 String line = "=>"; for (Map.Entry entry : map.entrySet()) { line += " " + entry.getKey() + ":" + (entry.getValue() == null ? "null" : entry.getValue().toString()); } System.out.println(line); } return list; } public String subFieldIndex(String index, String fieldName) { if (fieldName.startsWith(index) && fieldName.length() > index.length() + 1) return fieldName.substring(index.length() + 1); return fieldName; } public Map objectNode2Map(String index, ObjectNode objectNode) { if (null == objectNode) { return new HashMap<>(0); } if (objectNode.isEmpty()) { return new HashMap<>(0); } ObjectMapper objectMapper = new ObjectMapper(); Map map = objectMapper.convertValue(objectNode, new TypeReference>() { }); for (Map.Entry entry : map.entrySet()) { String name = subFieldIndex(index, entry.getKey()); if (name.equals(entry.getKey())) continue; map.remove(entry.getKey()); map.put(name, entry.getValue()); } return map; } }