123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419 |
- package com.bowintek.practice.services.impl;
- import co.elastic.clients.elasticsearch.ElasticsearchClient;
- import co.elastic.clients.elasticsearch._types.FieldValue;
- import co.elastic.clients.elasticsearch._types.SortOrder;
- import co.elastic.clients.elasticsearch._types.aggregations.Aggregate;
- import co.elastic.clients.elasticsearch._types.aggregations.Buckets;
- import co.elastic.clients.elasticsearch._types.aggregations.StringTermsAggregate;
- import co.elastic.clients.elasticsearch._types.aggregations.StringTermsBucket;
- import co.elastic.clients.elasticsearch._types.mapping.Property;
- import co.elastic.clients.elasticsearch._types.mapping.TextProperty;
- import co.elastic.clients.elasticsearch._types.query_dsl.*;
- import co.elastic.clients.elasticsearch.cat.IndicesResponse;
- import co.elastic.clients.elasticsearch.core.SearchRequest;
- import co.elastic.clients.elasticsearch.core.SearchResponse;
- import co.elastic.clients.elasticsearch.core.search.Hit;
- import co.elastic.clients.elasticsearch.indices.PutMappingRequest;
- import co.elastic.clients.elasticsearch.indices.PutMappingResponse;
- import co.elastic.clients.json.JsonData;
- import com.bowintek.practice.mapper.cquery.EsIndexCquery;
- import com.bowintek.practice.services.service.AnalyzeService;
- import com.bowintek.practice.services.service.EsQueryService;
- import com.bowintek.practice.util.StringUtils;
- import com.bowintek.practice.vo.Analyze.ComparisonResult;
- import com.bowintek.practice.vo.Analyze.EsQueryText;
- import com.bowintek.practice.vo.EsIndexVo;
- import com.bowintek.practice.vo.EsIndexfieldVo;
- import com.bowintek.practice.vo.EsQueryLogVo;
- import com.fasterxml.jackson.core.JsonProcessingException;
- import com.fasterxml.jackson.core.type.TypeReference;
- import com.fasterxml.jackson.databind.JsonNode;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.fasterxml.jackson.databind.node.ObjectNode;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import javax.naming.directory.SearchResult;
- import java.io.IOException;
- import java.util.*;
- import java.util.stream.Collectors;
- @Component
- public class EsQueryServiceImpl implements EsQueryService {
- @Autowired
- private ElasticsearchClient esClient;
- @Autowired
- private AnalyzeService analyzeService;
- @Autowired
- private EsIndexCquery esIndexCquery;
- private static Object lockObject = new Object();
- private static List<String> indexCache = null;
- private static List<String> pathCache = null;
- private static long cacheTime = 0;
- public void getCacheList(){
- synchronized (lockObject) {
- //从本地缓存加载数据
- long timeSpan = (new Date()).getTime() - cacheTime;
- if (timeSpan > 30 * 1000 || indexCache == null || pathCache==null) {
- List<EsIndexVo> list1 = esIndexCquery.getList(null, null, null);
- indexCache = list1.stream()
- .map(m->m.getIndexCode())
- .distinct().collect(Collectors.toList());
- List<EsIndexfieldVo> list2 = esIndexCquery.getFieldList(null,null);
- pathCache = list2.stream().filter(p->p.getDataType().equals("NESTED"))
- .map(m->m.getFieldCode())
- .distinct().collect(Collectors.toList());
- cacheTime = (new Date()).getTime();
- }
- }
- }
- public List<String> getIndexCache(){
- getCacheList();
- return indexCache;
- }
- public List<String> getPathCache(){
- getCacheList();
- return pathCache;
- }
- public List<Query> getComparisonQueryList(EsQueryText queryText){
- List<Query> queryList = new ArrayList<>();
- //分析查询字符串,有的字符串需要变成条件查询
- List<ComparisonResult> cmpList = analyzeService.analyzeJavas(queryText.getKeyString());
- //二级查询的路径信息
- List<String> pathList = getPathCache();
- for (int i = 0; i < cmpList.size(); i++) {
- ComparisonResult cmp = cmpList.get(i);
- //检查查询、对比的值是否为空
- if(StringUtils.IsNullEmpty(cmp.getKeyString())) continue;
- if (cmp.getSearchType().equals("comparison")) { //对比查询,有字段,有对比符号,有值
- //第一级,对比搜索
- queryList.addAll(getRangeQueryByComparison(cmp));
- //第二级,对比搜索
- for(String path : pathList) {
- queryList.addAll(getNestedRangeQueryByComparison(cmp, path));
- }
- } else if(StringUtils.IsNullEmpty(queryText.getField())){//全文查询,没有限制字段
- //第一级,文字搜索
- queryList.add(getMultiMatchQuery(cmp.getKeyString()));
- //第二级,嵌套类型,文字搜索
- for(String path : pathList) {
- queryList.add(getNestedMultiMatchQuery(path, cmp.getKeyString()));
- }
- } else { //全文查询,限定了字段
- //第一级,文字搜索
- queryList.add(getMultiMatchQuery(new String[]{queryText.getField()} ,cmp.getKeyString()));
- //第二级,嵌套类型,文字搜索
- for(String path : pathList) {
- queryList.add(getNestedMultiMatchQuery(path, new String[]{queryText.getField()}, cmp.getKeyString()));
- }
- }
- }
- return queryList;
- }
- @Override
- public Map<String, Object> query(List<EsQueryText> queryList, List<ComparisonResult> limiters, int page, int limit) {
- //[1]需要返回的结果map
- Map<String, Object> result = new HashMap<>();
- result.put("total", 0);
- try {
- //二级查询的路径信息
- List<String> pathList = getPathCache();
- List<Query> queryMustList = new ArrayList<>();
- //[2]分析查询字符串,有的字符串需要变成条件查询
- for(EsQueryText queryText : queryList){
- List<Query> shouldQuerys = getComparisonQueryList(queryText);
- if(shouldQuerys.size()>0){
- queryMustList.add(Query.of(q->q.bool(b->b.should(shouldQuerys))));
- }
- }
- //[3]限定查询条件,都是有条件的查询
- for(ComparisonResult cmp:limiters){
- List<Query> cmpQuerys = new ArrayList<>();
- //检查查询、对比的值是否为空
- if(StringUtils.IsNullEmpty(cmp.getValue())) continue;
- //第一级,对比搜索
- cmpQuerys.addAll(getRangeQueryByComparison(cmp));
- //第二级,对比搜索
- for(String path : pathList) {
- cmpQuerys.addAll(getNestedRangeQueryByComparison(cmp, path));
- }
- if(cmpQuerys.size()>0){
- queryMustList.add(Query.of(q->q.bool(b->b.should(cmpQuerys))));
- }
- }
- //[4]建立查询参数分析
- SearchRequest.Builder searchRequest = new SearchRequest.Builder();
- //==>要查询的索引列表,从数据库配置表中获取
- searchRequest.index(getIndexCache());
- //==>数据分页显示
- searchRequest.size(limit);
- searchRequest.from(page * limit);
- //==>设置查询条件
- if (queryMustList.size() > 0) {
- searchRequest.query(q -> q.bool(b -> b.must(queryMustList)));
- }
- //==>高亮设置
- searchRequest.highlight(h->h.fields("*", f->f.matchedFields("*")));
- //排序
- //searchRequest.sort(f->f.field(v->v.field("well_id").order(SortOrder.Asc)));
- //查询后分组
- searchRequest.aggregations("group_well",a->a.terms(t->t.field("well_id.keyword")));
- //[5]Es发起查询
- SearchRequest request = searchRequest.build();
- SearchResponse<ObjectNode> response = esClient.search(request, ObjectNode.class);
- //[6]转换结果,可以对不同的index做出参数输出
- List<Map<String, Object>> rows = searchResponse2List(response);
- result.put("rows", rows);
- result.put("total", response.hits().total().value());
- result.put("agg",aggResponse2List(response));
- //请求参数输出,方便调试
- String[] jsonStrings = request.toString().split("typed_keys=true");
- result.put("SearchUrl", jsonStrings[0]);
- result.put("SearchRequest", stringToNodeJson(jsonStrings[1]));
- System.out.println(response.hits().total() + " " + request.toString());
- }
- catch (Exception ex){
- result.put("Message", ex.getMessage());
- result.put("StackTrace", ex.getStackTrace());
- }
- return result;
- }
- public Query getMultiMatchQuery(String text){
- //不指定,查询所有的一级字段
- return MultiMatchQuery.of(q -> q.query(text)
- .operator(Operator.Or))._toQuery();
- }
- public Query getMultiMatchQuery(List<String> fields, String text){
- //限定查询字段
- return MultiMatchQuery.of(q -> q.fields(fields)
- .query(text)
- .operator(Operator.Or))._toQuery();
- }
- public Query getMultiMatchQuery(String[] fields, String text){
- //限定查询字段
- return getMultiMatchQuery(Arrays.asList(fields), text);
- }
- public Query getNestedMultiMatchQuery(String path, String text){
- //不指定,查询所有的二级字段
- return NestedQuery.of(q-> q.path(path)
- .query(getMultiMatchQuery(text))
- .ignoreUnmapped(true))._toQuery();
- }
- public Query getNestedMultiMatchQuery(String path, List<String> fields, String text){
- //限定查询字段
- return NestedQuery.of(q-> q.path(path)
- .query(getMultiMatchQuery(fields, text))
- .ignoreUnmapped(true))._toQuery();
- }
- public Query getNestedMultiMatchQuery(String path, String[] fields, String text){
- //限定查询字段
- return NestedQuery.of(q-> q.path(path)
- .query(getMultiMatchQuery(fields, text))
- .ignoreUnmapped(true))._toQuery();
- }
- public List<Query> getRangeQueryByComparison(ComparisonResult cmp){
- List<Query> queryList = new ArrayList<>();
- for(int i=0;i<cmp.getFields().length;i++){
- String fieldString = cmp.getFields()[i];
- //对比类型查询
- Query query = getComparisonQuery(fieldString, cmp.getOpreation(), cmp.getValue());
- if(query!=null) queryList.add(query);
- }
- return queryList;
- }
- public List<Query> getNestedRangeQueryByComparison(ComparisonResult cmp, String path){
- List<Query> queryList = new ArrayList<>();
- for(int i=0;i<cmp.getFields().length;i++){
- String fieldString = cmp.getFields()[i];
- //对比类型查询
- Query query = getComparisonQuery(path + "." + fieldString, cmp.getOpreation(), cmp.getValue());
- if(query!=null) {
- Query nested = NestedQuery.of(q-> q.path(path).query(query)
- .ignoreUnmapped(true))._toQuery();
- queryList.add(nested);
- }
- }
- return queryList;
- }
- public Query getComparisonQuery(String fieldString, String opreation, String value){
- //对比类型查询
- Query query = null;
- if(opreation.equals("大于"))
- query = RangeQuery.of(q->q.field(fieldString).gt(JsonData.of(value)))._toQuery();
- else if(opreation.equals("大于等于"))
- query = RangeQuery.of(q->q.field(fieldString).gte(JsonData.of(value)))._toQuery();
- else if(opreation.equals("小于"))
- query = RangeQuery.of(q->q.field(fieldString).lt(JsonData.of(value)))._toQuery();
- else if(opreation.equals("小于等于"))
- query = RangeQuery.of(q->q.field(fieldString).lte(JsonData.of(value)))._toQuery();
- else if(opreation.equals("等于"))
- query = TermQuery.of(q->q.field(fieldString).value(value))._toQuery();
- else if(opreation.equals("包括IN")) {
- String[] ins = value.split(",");
- List<FieldValue> fls = new ArrayList<>();
- for(String inString : ins) fls.add(FieldValue.of(inString));
- query = TermsQuery.of(q -> q.field(fieldString).terms(s -> s.value(fls)))._toQuery();
- }
- return query;
- }
- public Map<String, Object> queryTest(String text, int page, int limit) {
- Map<String, Object> result = new HashMap<>();
- result.put("total", 0);
- try {
- IndicesResponse indicesResponse = esClient.cat().indices();
- indicesResponse.valueBody().forEach(i -> {
- System.out.println("get all index, health: " + i.health() + ", status: " + i.status() + ", index: " + i.index());
- });
- //建立查询参数分析
- SearchRequest.Builder searchRequest = new SearchRequest.Builder();
- //要查询的索引列表
- String[] indexs = new String[]{"dws_basic_info_history", "dws_dm_test_history", "fact_dwr_well_basic_information"};
- searchRequest.index(Arrays.asList(indexs));
- //数据分页显示
- searchRequest.size(limit);
- searchRequest.from(page * limit);
- //生成查询参数
- if(!StringUtils.IsNullEmpty(text)) {
- //非嵌套字段查询
- String[] fields = new String[]{"well_common_name","testing_name"};
- Query query1 = MultiMatchQuery.of(q -> q.fields(Arrays.asList(fields)).query(text).operator(Operator.Or))._toQuery();
- //嵌套字段查询
- String[] nestedFields = new String[]{"historys.testing_name"};
- Query nestedQuery = MultiMatchQuery.of(q -> q.fields(Arrays.asList(nestedFields)).query(text).operator(Operator.Or))._toQuery();
- Query query3 = NestedQuery.of(q-> q.path("historys").query(nestedQuery).ignoreUnmapped(true))._toQuery();
- //对比类型查询
- Query query2 = RangeQuery.of(q->q.field("authorized_md").gte(JsonData.of(500)))._toQuery();
- Query[] arys = new Query[]{query1, query2, query3};
- searchRequest.query(q->q.bool(b->b.should(Arrays.asList(arys))));
- //高亮设置
- searchRequest.highlight(h->h.fields("historys.testing_name", f->f.matchedFields("historys.testing_name")));
- }
- //Es发起查询
- SearchRequest request = searchRequest.build();
- SearchResponse response = esClient.search(request, ObjectNode.class);
- //转换结果,可以对不同的index做出参数输出
- List<Map<String, Object>> rows = searchResponse2List(response);
- result.put("rows", rows);
- result.put("total", response.hits().total().value());
- System.out.println(response.hits().total()+" "+ request.toString());
- }
- catch (Exception ex){
- ex.printStackTrace();
- }
- return result;
- }
- public void updateMappingTest() throws IOException {
- Map<String, Property> maps = new HashMap<>();
- maps.put("well_common_name", Property.of(p->p.text(TextProperty.of(t->t.index(true).analyzer("ik_max_word")))));
- updateMappings("fact_dwr_well_basic_information", maps);
- }
- public void updateMappings(String index, Map<String, Property> maps) throws IOException {
- PutMappingRequest putMappingRequest = PutMappingRequest.of(m -> m.index(index).properties(maps));
- PutMappingResponse putMappingResponse = esClient.indices().putMapping(putMappingRequest);
- boolean acknowledged = putMappingResponse.acknowledged();
- System.out.println("update mappings ack: " + acknowledged);
- }
- public JsonNode stringToNodeJson(String jsonString) throws JsonProcessingException {
- ObjectMapper objectMapper = new ObjectMapper();
- JsonNode rootNode = objectMapper.readTree(jsonString);
- return rootNode;
- }
- public List<Map<String, Object>> aggResponse2List(SearchResponse<ObjectNode> searchResponse)
- {
- List<StringTermsBucket> buckets =searchResponse.aggregations().get("group_well").sterms().buckets().array();
- List<Map<String, Object>> aggs = new ArrayList<>();
- for (StringTermsBucket bucket : buckets) {
- Map<String, Object> map = new HashMap<>();
- map.put("key", bucket.key().stringValue());
- map.put("doc_count", bucket.docCount());
- aggs.add(map);
- }
- return aggs;
- }
- public List<Map<String, Object>> searchResponse2List(SearchResponse<ObjectNode> searchResponse) {
- if (searchResponse == null) {return new ArrayList<>(0);}
- if (searchResponse.hits() == null) {return new ArrayList<>(0);}
- //if (CommonUtils.isCollectionEmpty(searchResponse.hits().hits())) {return new ArrayList<>(0);}
- List<Hit<ObjectNode>> hits = searchResponse.hits().hits();
- List<Map<String, Object>> list = new ArrayList<>(hits.size());
- for (Hit<ObjectNode> hit : hits) {
- ObjectNode node = hit.source();
- Map<String, Object> map = objectNode2Map(hit.index(), node);
- map.put("index", hit.index());
- map.put("highlight", hit.highlight());
- list.add(map);
- //输出结果日志
- String line = "=>";
- for(Map.Entry<String, Object> entry : map.entrySet()){
- line += " "+entry.getKey()
- +":"+ (entry.getValue()==null?"null":entry.getValue().toString());
- }
- System.out.println(line);
- }
- return list;
- }
- public String subFieldIndex(String index, String fieldName){
- if(fieldName.startsWith(index) && fieldName.length()> index.length()+1) return fieldName.substring(index.length()+1);
- return fieldName;
- }
- public Map<String, Object> objectNode2Map(String index, ObjectNode objectNode) {
- if (null == objectNode) {return new HashMap<>(0);}
- if (objectNode.isEmpty()) {return new HashMap<>(0);}
- ObjectMapper objectMapper = new ObjectMapper();
- Map<String, Object> map = objectMapper.convertValue(objectNode, new TypeReference<Map<String, Object>>() {});
- for(Map.Entry<String, Object> entry : map.entrySet()){
- String name = subFieldIndex(index, entry.getKey());
- if(name.equals(entry.getKey())) continue;
- map.remove(entry.getKey());
- map.put(name, entry.getValue());
- }
- return map;
- }
- }
|