EsQueryServiceImpl.java 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. package com.bowintek.practice.services.impl;
  2. import co.elastic.clients.elasticsearch.ElasticsearchClient;
  3. import co.elastic.clients.elasticsearch._types.FieldValue;
  4. import co.elastic.clients.elasticsearch._types.SortOrder;
  5. import co.elastic.clients.elasticsearch._types.aggregations.Aggregate;
  6. import co.elastic.clients.elasticsearch._types.aggregations.Buckets;
  7. import co.elastic.clients.elasticsearch._types.aggregations.StringTermsAggregate;
  8. import co.elastic.clients.elasticsearch._types.aggregations.StringTermsBucket;
  9. import co.elastic.clients.elasticsearch._types.mapping.Property;
  10. import co.elastic.clients.elasticsearch._types.mapping.TextProperty;
  11. import co.elastic.clients.elasticsearch._types.query_dsl.*;
  12. import co.elastic.clients.elasticsearch.cat.IndicesResponse;
  13. import co.elastic.clients.elasticsearch.core.SearchRequest;
  14. import co.elastic.clients.elasticsearch.core.SearchResponse;
  15. import co.elastic.clients.elasticsearch.core.search.Hit;
  16. import co.elastic.clients.elasticsearch.indices.PutMappingRequest;
  17. import co.elastic.clients.elasticsearch.indices.PutMappingResponse;
  18. import co.elastic.clients.json.JsonData;
  19. import com.bowintek.practice.mapper.cquery.EsIndexCquery;
  20. import com.bowintek.practice.services.service.AnalyzeService;
  21. import com.bowintek.practice.services.service.EsQueryService;
  22. import com.bowintek.practice.util.StringUtils;
  23. import com.bowintek.practice.vo.Analyze.ComparisonResult;
  24. import com.bowintek.practice.vo.Analyze.EsQueryText;
  25. import com.bowintek.practice.vo.EsIndexVo;
  26. import com.bowintek.practice.vo.EsIndexfieldVo;
  27. import com.bowintek.practice.vo.EsQueryLogVo;
  28. import com.fasterxml.jackson.core.JsonProcessingException;
  29. import com.fasterxml.jackson.core.type.TypeReference;
  30. import com.fasterxml.jackson.databind.JsonNode;
  31. import com.fasterxml.jackson.databind.ObjectMapper;
  32. import com.fasterxml.jackson.databind.node.ObjectNode;
  33. import org.springframework.beans.factory.annotation.Autowired;
  34. import org.springframework.stereotype.Component;
  35. import javax.naming.directory.SearchResult;
  36. import java.io.IOException;
  37. import java.util.*;
  38. import java.util.stream.Collectors;
  39. @Component
  40. public class EsQueryServiceImpl implements EsQueryService {
  41. @Autowired
  42. private ElasticsearchClient esClient;
  43. @Autowired
  44. private AnalyzeService analyzeService;
  45. @Autowired
  46. private EsIndexCquery esIndexCquery;
  47. private static Object lockObject = new Object();
  48. private static List<String> indexCache = null;
  49. private static List<String> pathCache = null;
  50. private static long cacheTime = 0;
  51. public void getCacheList(){
  52. synchronized (lockObject) {
  53. //从本地缓存加载数据
  54. long timeSpan = (new Date()).getTime() - cacheTime;
  55. if (timeSpan > 30 * 1000 || indexCache == null || pathCache==null) {
  56. List<EsIndexVo> list1 = esIndexCquery.getList(null, null, null);
  57. indexCache = list1.stream()
  58. .map(m->m.getIndexCode())
  59. .distinct().collect(Collectors.toList());
  60. List<EsIndexfieldVo> list2 = esIndexCquery.getFieldList(null,null);
  61. pathCache = list2.stream().filter(p->p.getDataType().equals("NESTED"))
  62. .map(m->m.getFieldCode())
  63. .distinct().collect(Collectors.toList());
  64. cacheTime = (new Date()).getTime();
  65. }
  66. }
  67. }
  68. public List<String> getIndexCache(){
  69. getCacheList();
  70. return indexCache;
  71. }
  72. public List<String> getPathCache(){
  73. getCacheList();
  74. return pathCache;
  75. }
  76. public List<Query> getComparisonQueryList(EsQueryText queryText){
  77. List<Query> queryList = new ArrayList<>();
  78. //分析查询字符串,有的字符串需要变成条件查询
  79. List<ComparisonResult> cmpList = analyzeService.analyzeJavas(queryText.getKeyString());
  80. //二级查询的路径信息
  81. List<String> pathList = getPathCache();
  82. for (int i = 0; i < cmpList.size(); i++) {
  83. ComparisonResult cmp = cmpList.get(i);
  84. //检查查询、对比的值是否为空
  85. if(StringUtils.IsNullEmpty(cmp.getKeyString())) continue;
  86. if (cmp.getSearchType().equals("comparison")) { //对比查询,有字段,有对比符号,有值
  87. //第一级,对比搜索
  88. queryList.addAll(getRangeQueryByComparison(cmp));
  89. //第二级,对比搜索
  90. for(String path : pathList) {
  91. queryList.addAll(getNestedRangeQueryByComparison(cmp, path));
  92. }
  93. } else if(StringUtils.IsNullEmpty(queryText.getField())){//全文查询,没有限制字段
  94. //第一级,文字搜索
  95. queryList.add(getMultiMatchQuery(cmp.getKeyString()));
  96. //第二级,嵌套类型,文字搜索
  97. for(String path : pathList) {
  98. queryList.add(getNestedMultiMatchQuery(path, cmp.getKeyString()));
  99. }
  100. } else { //全文查询,限定了字段
  101. //第一级,文字搜索
  102. queryList.add(getMultiMatchQuery(new String[]{queryText.getField()} ,cmp.getKeyString()));
  103. //第二级,嵌套类型,文字搜索
  104. for(String path : pathList) {
  105. queryList.add(getNestedMultiMatchQuery(path, new String[]{queryText.getField()}, cmp.getKeyString()));
  106. }
  107. }
  108. }
  109. return queryList;
  110. }
  111. @Override
  112. public Map<String, Object> query(List<EsQueryText> queryList, List<ComparisonResult> limiters, int page, int limit) {
  113. //[1]需要返回的结果map
  114. Map<String, Object> result = new HashMap<>();
  115. result.put("total", 0);
  116. try {
  117. //二级查询的路径信息
  118. List<String> pathList = getPathCache();
  119. List<Query> queryMustList = new ArrayList<>();
  120. //[2]分析查询字符串,有的字符串需要变成条件查询
  121. for(EsQueryText queryText : queryList){
  122. List<Query> shouldQuerys = getComparisonQueryList(queryText);
  123. if(shouldQuerys.size()>0){
  124. queryMustList.add(Query.of(q->q.bool(b->b.should(shouldQuerys))));
  125. }
  126. }
  127. //[3]限定查询条件,都是有条件的查询
  128. for(ComparisonResult cmp:limiters){
  129. List<Query> cmpQuerys = new ArrayList<>();
  130. //检查查询、对比的值是否为空
  131. if(StringUtils.IsNullEmpty(cmp.getValue())) continue;
  132. //第一级,对比搜索
  133. cmpQuerys.addAll(getRangeQueryByComparison(cmp));
  134. //第二级,对比搜索
  135. for(String path : pathList) {
  136. cmpQuerys.addAll(getNestedRangeQueryByComparison(cmp, path));
  137. }
  138. if(cmpQuerys.size()>0){
  139. queryMustList.add(Query.of(q->q.bool(b->b.should(cmpQuerys))));
  140. }
  141. }
  142. //[4]建立查询参数分析
  143. SearchRequest.Builder searchRequest = new SearchRequest.Builder();
  144. //==>要查询的索引列表,从数据库配置表中获取
  145. searchRequest.index(getIndexCache());
  146. //==>数据分页显示
  147. searchRequest.size(limit);
  148. searchRequest.from(page * limit);
  149. //==>设置查询条件
  150. if (queryMustList.size() > 0) {
  151. searchRequest.query(q -> q.bool(b -> b.must(queryMustList)));
  152. }
  153. //==>高亮设置
  154. searchRequest.highlight(h->h.fields("*", f->f.matchedFields("*")));
  155. //排序
  156. //searchRequest.sort(f->f.field(v->v.field("well_id").order(SortOrder.Asc)));
  157. //查询后分组
  158. searchRequest.aggregations("group_well",a->a.terms(t->t.field("well_id.keyword")));
  159. //[5]Es发起查询
  160. SearchRequest request = searchRequest.build();
  161. SearchResponse<ObjectNode> response = esClient.search(request, ObjectNode.class);
  162. //[6]转换结果,可以对不同的index做出参数输出
  163. List<Map<String, Object>> rows = searchResponse2List(response);
  164. result.put("rows", rows);
  165. result.put("total", response.hits().total().value());
  166. result.put("agg",aggResponse2List(response));
  167. //请求参数输出,方便调试
  168. String[] jsonStrings = request.toString().split("typed_keys=true");
  169. result.put("SearchUrl", jsonStrings[0]);
  170. result.put("SearchRequest", stringToNodeJson(jsonStrings[1]));
  171. System.out.println(response.hits().total() + " " + request.toString());
  172. }
  173. catch (Exception ex){
  174. result.put("Message", ex.getMessage());
  175. result.put("StackTrace", ex.getStackTrace());
  176. }
  177. return result;
  178. }
  179. public Query getMultiMatchQuery(String text){
  180. //不指定,查询所有的一级字段
  181. return MultiMatchQuery.of(q -> q.query(text)
  182. .operator(Operator.Or))._toQuery();
  183. }
  184. public Query getMultiMatchQuery(List<String> fields, String text){
  185. //限定查询字段
  186. return MultiMatchQuery.of(q -> q.fields(fields)
  187. .query(text)
  188. .operator(Operator.Or))._toQuery();
  189. }
  190. public Query getMultiMatchQuery(String[] fields, String text){
  191. //限定查询字段
  192. return getMultiMatchQuery(Arrays.asList(fields), text);
  193. }
  194. public Query getNestedMultiMatchQuery(String path, String text){
  195. //不指定,查询所有的二级字段
  196. return NestedQuery.of(q-> q.path(path)
  197. .query(getMultiMatchQuery(text))
  198. .ignoreUnmapped(true))._toQuery();
  199. }
  200. public Query getNestedMultiMatchQuery(String path, List<String> fields, String text){
  201. //限定查询字段
  202. return NestedQuery.of(q-> q.path(path)
  203. .query(getMultiMatchQuery(fields, text))
  204. .ignoreUnmapped(true))._toQuery();
  205. }
  206. public Query getNestedMultiMatchQuery(String path, String[] fields, String text){
  207. //限定查询字段
  208. return NestedQuery.of(q-> q.path(path)
  209. .query(getMultiMatchQuery(fields, text))
  210. .ignoreUnmapped(true))._toQuery();
  211. }
  212. public List<Query> getRangeQueryByComparison(ComparisonResult cmp){
  213. List<Query> queryList = new ArrayList<>();
  214. for(int i=0;i<cmp.getFields().length;i++){
  215. String fieldString = cmp.getFields()[i];
  216. //对比类型查询
  217. Query query = getComparisonQuery(fieldString, cmp.getOpreation(), cmp.getValue());
  218. if(query!=null) queryList.add(query);
  219. }
  220. return queryList;
  221. }
  222. public List<Query> getNestedRangeQueryByComparison(ComparisonResult cmp, String path){
  223. List<Query> queryList = new ArrayList<>();
  224. for(int i=0;i<cmp.getFields().length;i++){
  225. String fieldString = cmp.getFields()[i];
  226. //对比类型查询
  227. Query query = getComparisonQuery(path + "." + fieldString, cmp.getOpreation(), cmp.getValue());
  228. if(query!=null) {
  229. Query nested = NestedQuery.of(q-> q.path(path).query(query)
  230. .ignoreUnmapped(true))._toQuery();
  231. queryList.add(nested);
  232. }
  233. }
  234. return queryList;
  235. }
  236. public Query getComparisonQuery(String fieldString, String opreation, String value){
  237. //对比类型查询
  238. Query query = null;
  239. if(opreation.equals("大于"))
  240. query = RangeQuery.of(q->q.field(fieldString).gt(JsonData.of(value)))._toQuery();
  241. else if(opreation.equals("大于等于"))
  242. query = RangeQuery.of(q->q.field(fieldString).gte(JsonData.of(value)))._toQuery();
  243. else if(opreation.equals("小于"))
  244. query = RangeQuery.of(q->q.field(fieldString).lt(JsonData.of(value)))._toQuery();
  245. else if(opreation.equals("小于等于"))
  246. query = RangeQuery.of(q->q.field(fieldString).lte(JsonData.of(value)))._toQuery();
  247. else if(opreation.equals("等于"))
  248. query = TermQuery.of(q->q.field(fieldString).value(value))._toQuery();
  249. else if(opreation.equals("包括IN")) {
  250. String[] ins = value.split(",");
  251. List<FieldValue> fls = new ArrayList<>();
  252. for(String inString : ins) fls.add(FieldValue.of(inString));
  253. query = TermsQuery.of(q -> q.field(fieldString).terms(s -> s.value(fls)))._toQuery();
  254. }
  255. return query;
  256. }
  257. public Map<String, Object> queryTest(String text, int page, int limit) {
  258. Map<String, Object> result = new HashMap<>();
  259. result.put("total", 0);
  260. try {
  261. IndicesResponse indicesResponse = esClient.cat().indices();
  262. indicesResponse.valueBody().forEach(i -> {
  263. System.out.println("get all index, health: " + i.health() + ", status: " + i.status() + ", index: " + i.index());
  264. });
  265. //建立查询参数分析
  266. SearchRequest.Builder searchRequest = new SearchRequest.Builder();
  267. //要查询的索引列表
  268. String[] indexs = new String[]{"dws_basic_info_history", "dws_dm_test_history", "fact_dwr_well_basic_information"};
  269. searchRequest.index(Arrays.asList(indexs));
  270. //数据分页显示
  271. searchRequest.size(limit);
  272. searchRequest.from(page * limit);
  273. //生成查询参数
  274. if(!StringUtils.IsNullEmpty(text)) {
  275. //非嵌套字段查询
  276. String[] fields = new String[]{"well_common_name","testing_name"};
  277. Query query1 = MultiMatchQuery.of(q -> q.fields(Arrays.asList(fields)).query(text).operator(Operator.Or))._toQuery();
  278. //嵌套字段查询
  279. String[] nestedFields = new String[]{"historys.testing_name"};
  280. Query nestedQuery = MultiMatchQuery.of(q -> q.fields(Arrays.asList(nestedFields)).query(text).operator(Operator.Or))._toQuery();
  281. Query query3 = NestedQuery.of(q-> q.path("historys").query(nestedQuery).ignoreUnmapped(true))._toQuery();
  282. //对比类型查询
  283. Query query2 = RangeQuery.of(q->q.field("authorized_md").gte(JsonData.of(500)))._toQuery();
  284. Query[] arys = new Query[]{query1, query2, query3};
  285. searchRequest.query(q->q.bool(b->b.should(Arrays.asList(arys))));
  286. //高亮设置
  287. searchRequest.highlight(h->h.fields("historys.testing_name", f->f.matchedFields("historys.testing_name")));
  288. }
  289. //Es发起查询
  290. SearchRequest request = searchRequest.build();
  291. SearchResponse response = esClient.search(request, ObjectNode.class);
  292. //转换结果,可以对不同的index做出参数输出
  293. List<Map<String, Object>> rows = searchResponse2List(response);
  294. result.put("rows", rows);
  295. result.put("total", response.hits().total().value());
  296. System.out.println(response.hits().total()+" "+ request.toString());
  297. }
  298. catch (Exception ex){
  299. ex.printStackTrace();
  300. }
  301. return result;
  302. }
  303. public void updateMappingTest() throws IOException {
  304. Map<String, Property> maps = new HashMap<>();
  305. maps.put("well_common_name", Property.of(p->p.text(TextProperty.of(t->t.index(true).analyzer("ik_max_word")))));
  306. updateMappings("fact_dwr_well_basic_information", maps);
  307. }
  308. public void updateMappings(String index, Map<String, Property> maps) throws IOException {
  309. PutMappingRequest putMappingRequest = PutMappingRequest.of(m -> m.index(index).properties(maps));
  310. PutMappingResponse putMappingResponse = esClient.indices().putMapping(putMappingRequest);
  311. boolean acknowledged = putMappingResponse.acknowledged();
  312. System.out.println("update mappings ack: " + acknowledged);
  313. }
  314. public JsonNode stringToNodeJson(String jsonString) throws JsonProcessingException {
  315. ObjectMapper objectMapper = new ObjectMapper();
  316. JsonNode rootNode = objectMapper.readTree(jsonString);
  317. return rootNode;
  318. }
  319. public List<Map<String, Object>> aggResponse2List(SearchResponse<ObjectNode> searchResponse)
  320. {
  321. List<StringTermsBucket> buckets =searchResponse.aggregations().get("group_well").sterms().buckets().array();
  322. List<Map<String, Object>> aggs = new ArrayList<>();
  323. for (StringTermsBucket bucket : buckets) {
  324. Map<String, Object> map = new HashMap<>();
  325. map.put("key", bucket.key().stringValue());
  326. map.put("doc_count", bucket.docCount());
  327. aggs.add(map);
  328. }
  329. return aggs;
  330. }
  331. public List<Map<String, Object>> searchResponse2List(SearchResponse<ObjectNode> searchResponse) {
  332. if (searchResponse == null) {return new ArrayList<>(0);}
  333. if (searchResponse.hits() == null) {return new ArrayList<>(0);}
  334. //if (CommonUtils.isCollectionEmpty(searchResponse.hits().hits())) {return new ArrayList<>(0);}
  335. List<Hit<ObjectNode>> hits = searchResponse.hits().hits();
  336. List<Map<String, Object>> list = new ArrayList<>(hits.size());
  337. for (Hit<ObjectNode> hit : hits) {
  338. ObjectNode node = hit.source();
  339. Map<String, Object> map = objectNode2Map(hit.index(), node);
  340. map.put("index", hit.index());
  341. map.put("highlight", hit.highlight());
  342. list.add(map);
  343. //输出结果日志
  344. String line = "=>";
  345. for(Map.Entry<String, Object> entry : map.entrySet()){
  346. line += " "+entry.getKey()
  347. +":"+ (entry.getValue()==null?"null":entry.getValue().toString());
  348. }
  349. System.out.println(line);
  350. }
  351. return list;
  352. }
  353. public String subFieldIndex(String index, String fieldName){
  354. if(fieldName.startsWith(index) && fieldName.length()> index.length()+1) return fieldName.substring(index.length()+1);
  355. return fieldName;
  356. }
  357. public Map<String, Object> objectNode2Map(String index, ObjectNode objectNode) {
  358. if (null == objectNode) {return new HashMap<>(0);}
  359. if (objectNode.isEmpty()) {return new HashMap<>(0);}
  360. ObjectMapper objectMapper = new ObjectMapper();
  361. Map<String, Object> map = objectMapper.convertValue(objectNode, new TypeReference<Map<String, Object>>() {});
  362. for(Map.Entry<String, Object> entry : map.entrySet()){
  363. String name = subFieldIndex(index, entry.getKey());
  364. if(name.equals(entry.getKey())) continue;
  365. map.remove(entry.getKey());
  366. map.put(name, entry.getValue());
  367. }
  368. return map;
  369. }
  370. }