EsQueryServiceImpl.java 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. package com.bowintek.practice.services.impl;
  2. import co.elastic.clients.elasticsearch.ElasticsearchClient;
  3. import co.elastic.clients.elasticsearch._types.FieldValue;
  4. import co.elastic.clients.elasticsearch._types.SortOptions;
  5. import co.elastic.clients.elasticsearch._types.SortOrder;
  6. import co.elastic.clients.elasticsearch._types.aggregations.Aggregate;
  7. import co.elastic.clients.elasticsearch._types.aggregations.Buckets;
  8. import co.elastic.clients.elasticsearch._types.aggregations.StringTermsAggregate;
  9. import co.elastic.clients.elasticsearch._types.aggregations.StringTermsBucket;
  10. import co.elastic.clients.elasticsearch._types.mapping.Property;
  11. import co.elastic.clients.elasticsearch._types.mapping.TextProperty;
  12. import co.elastic.clients.elasticsearch._types.query_dsl.*;
  13. import co.elastic.clients.elasticsearch.cat.IndicesResponse;
  14. import co.elastic.clients.elasticsearch.core.SearchRequest;
  15. import co.elastic.clients.elasticsearch.core.SearchResponse;
  16. import co.elastic.clients.elasticsearch.core.search.Hit;
  17. import co.elastic.clients.elasticsearch.indices.PutMappingRequest;
  18. import co.elastic.clients.elasticsearch.indices.PutMappingResponse;
  19. import co.elastic.clients.json.JsonData;
  20. import com.bowintek.practice.mapper.cquery.EsIndexCquery;
  21. import com.bowintek.practice.services.service.AnalyzeService;
  22. import com.bowintek.practice.services.service.EsQueryService;
  23. import com.bowintek.practice.util.Constant;
  24. import com.bowintek.practice.util.StringUtils;
  25. import com.bowintek.practice.vo.Analyze.ComparisonResult;
  26. import com.bowintek.practice.vo.Analyze.EsQueryText;
  27. import com.bowintek.practice.vo.EsIndexVo;
  28. import com.bowintek.practice.vo.EsIndexfieldVo;
  29. import com.bowintek.practice.vo.EsQueryLogVo;
  30. import com.fasterxml.jackson.core.JsonProcessingException;
  31. import com.fasterxml.jackson.core.type.TypeReference;
  32. import com.fasterxml.jackson.databind.JsonNode;
  33. import com.fasterxml.jackson.databind.ObjectMapper;
  34. import com.fasterxml.jackson.databind.node.ObjectNode;
  35. import lombok.extern.slf4j.Slf4j;
  36. import org.springframework.beans.factory.annotation.Autowired;
  37. import org.springframework.data.redis.core.query.SortQuery;
  38. import org.springframework.stereotype.Component;
  39. import javax.naming.directory.SearchResult;
  40. import java.io.IOException;
  41. import java.util.*;
  42. import java.util.stream.Collectors;
  43. @Component
  44. @Slf4j
  45. public class EsQueryServiceImpl implements EsQueryService {
  46. @Autowired
  47. private ElasticsearchClient esClient;
  48. @Autowired
  49. private AnalyzeService analyzeService;
  50. @Autowired
  51. private EsIndexCquery esIndexCquery;
  52. private static Object lockObject = new Object();
  53. private static List<String> indexCache = null;
  54. private static List<String> pathCache = null;
  55. private static long cacheTime = 0;
  56. public void getCacheList() {
  57. synchronized (lockObject) {
  58. //从本地缓存加载数据
  59. long timeSpan = (new Date()).getTime() - cacheTime;
  60. if (timeSpan > 30 * 1000 || indexCache == null || pathCache == null) {
  61. List<EsIndexVo> list1 = esIndexCquery.getList(null, null, null);
  62. indexCache = list1.stream()
  63. .map(m -> m.getIndexCode())
  64. .distinct().collect(Collectors.toList());
  65. List<EsIndexfieldVo> list2 = esIndexCquery.getFieldList(null, null);
  66. pathCache = list2.stream().filter(p -> p.getDataType().equals("NESTED"))
  67. .map(m -> m.getFieldCode())
  68. .distinct().collect(Collectors.toList());
  69. cacheTime = (new Date()).getTime();
  70. }
  71. }
  72. }
  73. public List<String> getIndexCache() {
  74. getCacheList();
  75. return indexCache;
  76. }
  77. public List<String> getPathCache() {
  78. getCacheList();
  79. return pathCache;
  80. }
  81. public List<Query> getComparisonQueryList(EsQueryText queryText) {
  82. List<Query> queryList = new ArrayList<>();
  83. //分析查询字符串,有的字符串需要变成条件查询
  84. List<ComparisonResult> cmpList = analyzeService.analyzeJavas(queryText.getKeyString());
  85. //二级查询的路径信息
  86. List<String> pathList = getPathCache();
  87. for (int i = 0; i < cmpList.size(); i++) {
  88. ComparisonResult cmp = cmpList.get(i);
  89. //检查查询、对比的值是否为空
  90. if (StringUtils.IsNullEmpty(cmp.getKeyString())) continue;
  91. if (cmp.getSearchType().equals("comparison")) { //对比查询,有字段,有对比符号,有值
  92. //第一级,对比搜索
  93. queryList.addAll(getRangeQueryByComparison(cmp));
  94. //第二级,对比搜索
  95. for (String path : pathList) {
  96. queryList.addAll(getNestedRangeQueryByComparison(cmp, path));
  97. }
  98. } else if (StringUtils.IsNullEmpty(queryText.getField())) {//全文查询,没有限制字段
  99. //第一级,文字搜索
  100. queryList.add(getMultiMatchQuery(cmp.getKeyString()));
  101. //第二级,嵌套类型,文字搜索
  102. for (String path : pathList) {
  103. queryList.add(getNestedMultiMatchQuery(path, cmp.getKeyString()));
  104. }
  105. } else { //全文查询,限定了字段
  106. //第一级,文字搜索
  107. queryList.add(getMultiMatchQuery(new String[]{queryText.getField()}, cmp.getKeyString()));
  108. //第二级,嵌套类型,文字搜索
  109. for (String path : pathList) {
  110. queryList.add(getNestedMultiMatchQuery(path, new String[]{queryText.getField()}, cmp.getKeyString()));
  111. }
  112. }
  113. }
  114. return queryList;
  115. }
  116. @Override
  117. public Map<String, Object> query(List<EsQueryText> queryList, List<ComparisonResult> limiters,
  118. int page, int limit, String orderType, String orderBy) {
  119. //[1]需要返回的结果map
  120. Map<String, Object> result = new HashMap<>();
  121. result.put("total", 0);
  122. try {
  123. //二级查询的路径信息
  124. List<String> pathList = getPathCache();
  125. List<Query> queryMustList = new ArrayList<>();
  126. BoolQuery.Builder boolQuery = new BoolQuery.Builder();
  127. //[2]分析查询字符串,有的字符串需要变成条件查询
  128. for (EsQueryText queryText : queryList) {
  129. List<Query> shouldQuerys = getComparisonQueryList(queryText);
  130. if (shouldQuerys.size() > 0) {
  131. queryMustList.addAll(shouldQuerys);
  132. }
  133. }
  134. //[3]限定查询条件,都是有条件的查询
  135. for (ComparisonResult cmp : limiters) {
  136. //检查查询、对比的值是否为空
  137. if (StringUtils.IsNullEmpty(cmp.getValue())) continue;
  138. //第一级,对比搜索
  139. if (Constant.AND.equals(cmp.getRelation())) {
  140. boolQuery.must(getRangeQueryByComparison(cmp));
  141. } else if (Constant.OR.equals(cmp.getRelation())) {
  142. boolQuery.should(getRangeQueryByComparison(cmp));
  143. } else if (Constant.NOT.equals(cmp.getRelation())) {
  144. boolQuery.mustNot(getRangeQueryByComparison(cmp));
  145. } else {
  146. boolQuery.must(getRangeQueryByComparison(cmp));
  147. }
  148. }
  149. //[4]建立查询参数分析
  150. SearchRequest.Builder searchRequest = new SearchRequest.Builder();
  151. //==>要查询的索引列表,从数据库配置表中获取
  152. searchRequest.index(getIndexCache());
  153. //==>数据分页显示
  154. searchRequest.size(limit);
  155. searchRequest.from(page * limit);
  156. //==>设置查询条件
  157. if (queryMustList.size() > 0) {
  158. boolQuery.should(queryMustList);
  159. }
  160. Query queryMust = Query.of(q -> q.bool(boolQuery.build()));
  161. //排序
  162. if (orderType != null && !StringUtils.IsNullEmpty(orderBy)) {
  163. SortOptions.Builder sb = new SortOptions.Builder();
  164. sb.field(f -> f.field(orderBy).order(orderType.equals("asc") ? SortOrder.Asc : SortOrder.Desc));
  165. searchRequest.query(queryMust).sort(sb.build());
  166. }else{
  167. searchRequest.query(queryMust);
  168. }
  169. //==>高亮设置
  170. searchRequest.highlight(h -> h.fields("*", f -> f.preTags("<font color='red'>")
  171. .postTags("</font>")));
  172. //查询后分组
  173. searchRequest.aggregations("group_well", a -> a.terms(t -> t.field("well_id")));
  174. //[5]Es发起查询
  175. SearchRequest request = searchRequest.build();
  176. log.info("dsl:" + request.toString());
  177. SearchResponse<ObjectNode> response = esClient.search(request, ObjectNode.class);
  178. //[6]转换结果,可以对不同的index做出参数输出
  179. List<Map<String, Object>> rows = searchResponse2List(response);
  180. result.put("rows", rows);
  181. result.put("total", response.hits().total().value());
  182. result.put("agg", aggResponse2List(response));
  183. //请求参数输出,方便调试
  184. String[] jsonStrings = request.toString().split("typed_keys=true");
  185. result.put("SearchUrl", jsonStrings[0]);
  186. result.put("SearchRequest", stringToNodeJson(jsonStrings[1]));
  187. System.out.println(response.hits().total() + " " + request.toString());
  188. } catch (Exception ex) {
  189. result.put("Message", ex.getMessage());
  190. result.put("StackTrace", ex.getStackTrace());
  191. }
  192. return result;
  193. }
  194. public Query getMultiMatchQuery(String text) {
  195. //不指定,查询所有的一级字段
  196. return MultiMatchQuery.of(q -> q.query(text)
  197. .operator(Operator.Or))._toQuery();
  198. }
  199. public Query getMultiMatchQuery(List<String> fields, String text) {
  200. //限定查询字段
  201. return MultiMatchQuery.of(q -> q.fields(fields)
  202. .query(text)
  203. .operator(Operator.Or))._toQuery();
  204. }
  205. public Query getMultiMatchQuery(String[] fields, String text) {
  206. //限定查询字段
  207. return getMultiMatchQuery(Arrays.asList(fields), text);
  208. }
  209. public Query getNestedMultiMatchQuery(String path, String text) {
  210. //不指定,查询所有的二级字段
  211. return NestedQuery.of(q -> q.path(path)
  212. .query(getMultiMatchQuery(text)).innerHits(i -> i.highlight(hl -> hl
  213. .fields("*", ff -> ff
  214. .preTags("<font color='red'>")
  215. .postTags("</font>")
  216. )
  217. ))
  218. .ignoreUnmapped(true))._toQuery();
  219. }
  220. public Query getNestedMultiMatchQuery(String path, List<String> fields, String text) {
  221. //限定查询字段
  222. return NestedQuery.of(q -> q.path(path)
  223. .query(getMultiMatchQuery(fields, text))
  224. .ignoreUnmapped(true))._toQuery();
  225. }
  226. public Query getNestedMultiMatchQuery(String path, String[] fields, String text) {
  227. //限定查询字段
  228. return NestedQuery.of(q -> q.path(path)
  229. .query(getMultiMatchQuery(fields, text))
  230. .ignoreUnmapped(true))._toQuery();
  231. }
  232. public List<Query> getRangeQueryByComparison(ComparisonResult cmp) {
  233. List<Query> queryList = new ArrayList<>();
  234. String fieldString = cmp.getFields();
  235. //对比类型查询
  236. Query query = getComparisonQuery(fieldString, cmp.getOpreation(), cmp.getValue());
  237. if (query != null) queryList.add(query);
  238. return queryList;
  239. }
  240. public List<Query> getNestedRangeQueryByComparison(ComparisonResult cmp, String path) {
  241. List<Query> queryList = new ArrayList<>();
  242. String fieldString = cmp.getFields();
  243. //对比类型查询
  244. Query query = getComparisonQuery(path + "." + fieldString, cmp.getOpreation(), cmp.getValue());
  245. if (query != null) {
  246. Query nested = NestedQuery.of(q -> q.path(path).query(query)
  247. .ignoreUnmapped(true))._toQuery();
  248. queryList.add(nested);
  249. }
  250. return queryList;
  251. }
  252. public Query getComparisonQuery(String fieldString, String opreation, String value) {
  253. //对比类型查询
  254. Query query = null;
  255. if (opreation.equals("大于"))
  256. query = RangeQuery.of(q -> q.field(fieldString).gt(JsonData.of(value)))._toQuery();
  257. else if (opreation.equals("大于等于"))
  258. query = RangeQuery.of(q -> q.field(fieldString).gte(JsonData.of(value)))._toQuery();
  259. else if (opreation.equals("小于"))
  260. query = RangeQuery.of(q -> q.field(fieldString).lt(JsonData.of(value)))._toQuery();
  261. else if (opreation.equals("小于等于"))
  262. query = RangeQuery.of(q -> q.field(fieldString).lte(JsonData.of(value)))._toQuery();
  263. else if (opreation.equals("等于"))
  264. query = MatchQuery.of(q -> q.field(fieldString).query(value).minimumShouldMatch("80%"))._toQuery();
  265. else if (opreation.equals("包括IN")) {
  266. String[] ins = value.split(",");
  267. List<FieldValue> fls = new ArrayList<>();
  268. for (String inString : ins) fls.add(FieldValue.of(inString));
  269. query = TermsQuery.of(q -> q.field(fieldString).terms(s -> s.value(fls)))._toQuery();
  270. }
  271. return query;
  272. }
  273. public Map<String, Object> queryTest(String text, int page, int limit) {
  274. Map<String, Object> result = new HashMap<>();
  275. result.put("total", 0);
  276. try {
  277. IndicesResponse indicesResponse = esClient.cat().indices();
  278. indicesResponse.valueBody().forEach(i -> {
  279. System.out.println("get all index, health: " + i.health() + ", status: " + i.status() + ", index: " + i.index());
  280. });
  281. //建立查询参数分析
  282. SearchRequest.Builder searchRequest = new SearchRequest.Builder();
  283. //要查询的索引列表
  284. String[] indexs = new String[]{"dws_basic_info_history", "dws_dm_test_history", "fact_dwr_well_basic_information"};
  285. searchRequest.index(Arrays.asList(indexs));
  286. //数据分页显示
  287. searchRequest.size(limit);
  288. searchRequest.from(page * limit);
  289. //生成查询参数
  290. if (!StringUtils.IsNullEmpty(text)) {
  291. //非嵌套字段查询
  292. String[] fields = new String[]{"well_common_name", "testing_name"};
  293. Query query1 = MultiMatchQuery.of(q -> q.fields(Arrays.asList(fields)).query(text).operator(Operator.Or))._toQuery();
  294. //嵌套字段查询
  295. String[] nestedFields = new String[]{"historys.testing_name"};
  296. Query nestedQuery = MultiMatchQuery.of(q -> q.fields(Arrays.asList(nestedFields)).query(text).operator(Operator.Or))._toQuery();
  297. Query query3 = NestedQuery.of(q -> q.path("historys").query(nestedQuery).ignoreUnmapped(true))._toQuery();
  298. //对比类型查询
  299. Query query2 = RangeQuery.of(q -> q.field("authorized_md").gte(JsonData.of(500)))._toQuery();
  300. Query[] arys = new Query[]{query1, query2, query3};
  301. searchRequest.query(q -> q.bool(b -> b.should(Arrays.asList(arys))));
  302. //高亮设置
  303. searchRequest.highlight(h -> h.fields("historys.testing_name", f -> f.matchedFields("historys.testing_name")));
  304. }
  305. //Es发起查询
  306. SearchRequest request = searchRequest.build();
  307. SearchResponse response = esClient.search(request, ObjectNode.class);
  308. //转换结果,可以对不同的index做出参数输出
  309. List<Map<String, Object>> rows = searchResponse2List(response);
  310. result.put("rows", rows);
  311. result.put("total", response.hits().total().value());
  312. System.out.println(response.hits().total() + " " + request.toString());
  313. } catch (Exception ex) {
  314. ex.printStackTrace();
  315. }
  316. return result;
  317. }
  318. public void updateMappingTest() throws IOException {
  319. Map<String, Property> maps = new HashMap<>();
  320. maps.put("well_common_name", Property.of(p -> p.text(TextProperty.of(t -> t.index(true).analyzer("ik_max_word")))));
  321. updateMappings("fact_dwr_well_basic_information", maps);
  322. }
  323. public void updateMappings(String index, Map<String, Property> maps) throws IOException {
  324. PutMappingRequest putMappingRequest = PutMappingRequest.of(m -> m.index(index).properties(maps));
  325. PutMappingResponse putMappingResponse = esClient.indices().putMapping(putMappingRequest);
  326. boolean acknowledged = putMappingResponse.acknowledged();
  327. System.out.println("update mappings ack: " + acknowledged);
  328. }
  329. public JsonNode stringToNodeJson(String jsonString) throws JsonProcessingException {
  330. ObjectMapper objectMapper = new ObjectMapper();
  331. JsonNode rootNode = objectMapper.readTree(jsonString);
  332. return rootNode;
  333. }
  334. public List<Map<String, Object>> aggResponse2List(SearchResponse<ObjectNode> searchResponse) {
  335. List<StringTermsBucket> buckets = searchResponse.aggregations().get("group_well").sterms().buckets().array();
  336. List<Map<String, Object>> aggs = new ArrayList<>();
  337. for (StringTermsBucket bucket : buckets) {
  338. Map<String, Object> map = new HashMap<>();
  339. map.put("key", bucket.key().stringValue());
  340. map.put("doc_count", bucket.docCount());
  341. aggs.add(map);
  342. }
  343. return aggs;
  344. }
  345. public List<Map<String, Object>> searchResponse2List(SearchResponse<ObjectNode> searchResponse) {
  346. if (searchResponse == null) {
  347. return new ArrayList<>(0);
  348. }
  349. if (searchResponse.hits() == null) {
  350. return new ArrayList<>(0);
  351. }
  352. //if (CommonUtils.isCollectionEmpty(searchResponse.hits().hits())) {return new ArrayList<>(0);}
  353. List<Hit<ObjectNode>> hits = searchResponse.hits().hits();
  354. List<Map<String, Object>> list = new ArrayList<>(hits.size());
  355. for (Hit<ObjectNode> hit : hits) {
  356. ObjectNode node = hit.source();
  357. Map<String, Object> map = objectNode2Map(hit.index(), node);
  358. map.put("index", hit.index());
  359. map.put("highlight", hit.highlight());
  360. map.put("innerHits", hit.innerHits());
  361. list.add(map);
  362. //输出结果日志
  363. String line = "=>";
  364. for (Map.Entry<String, Object> entry : map.entrySet()) {
  365. line += " " + entry.getKey()
  366. + ":" + (entry.getValue() == null ? "null" : entry.getValue().toString());
  367. }
  368. System.out.println(line);
  369. }
  370. return list;
  371. }
  372. public String subFieldIndex(String index, String fieldName) {
  373. if (fieldName.startsWith(index) && fieldName.length() > index.length() + 1)
  374. return fieldName.substring(index.length() + 1);
  375. return fieldName;
  376. }
  377. public Map<String, Object> objectNode2Map(String index, ObjectNode objectNode) {
  378. if (null == objectNode) {
  379. return new HashMap<>(0);
  380. }
  381. if (objectNode.isEmpty()) {
  382. return new HashMap<>(0);
  383. }
  384. ObjectMapper objectMapper = new ObjectMapper();
  385. Map<String, Object> map = objectMapper.convertValue(objectNode, new TypeReference<Map<String, Object>>() {
  386. });
  387. for (Map.Entry<String, Object> entry : map.entrySet()) {
  388. String name = subFieldIndex(index, entry.getKey());
  389. if (name.equals(entry.getKey())) continue;
  390. map.remove(entry.getKey());
  391. map.put(name, entry.getValue());
  392. }
  393. return map;
  394. }
  395. }