본문 바로가기
ELK/ElasticSearch

ElasticSearch SendMail 발송 batch 용도 Scroll API 구현하기

by by 앵과장 2020. 5. 13.
반응형

 

 

ElasticSearch

scroll API 라는 기능이 존재한다.

가장 많이 사용되는 이유는 호출해야되는 데이터가 너무 많을경우 Batch 개발시 사용된다

페이징으로 개발되어있는 리스트를 전체 다운로드 해야하는경우 ElasticSearch 10000건으로 제한되어있다. 5.x버전이후로 성능상 이슈로 막혀있는데 이부분을 강제적으로 풀수는 있지만 권장하지않는다 

개발자의 잘못된 실수로 부하가 줄수있는 요청에 대해서 미연해 방지해서 서비스가 죽는 사태를 방지하기 위해서다.

 

10000건 제약때문에 어떻게 해야할지 난감한 상황이 생겼을때 사용하게된다.

대량의 데이터 조회하기 위해 RDBMS의 cursor같은 기능을 하는것이 바로 Scroll API 이다.

 

Hits 를 가져올때 사용하는것이고 Aggs 같은 집계형태 대량 데이터는 afterKey를 사용하면된다.

 

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-search-scroll.html

 

Search Scroll API | Java REST Client [7.7] | Elastic

The Scroll API can be used to retrieve a large number of results from a search request. In order to use scrolling, the following steps need to be executed in the given order. Initialize the search scroll contextedit An initial search request with a scroll

www.elastic.co

현재 spring에서 제공하는 플러그인을 사용하였다.

물론 최신버전 ES를 쓸수없어서 현재 ES6.5버전사용중인데 ㅜ.ㅜ 

이거죄다 뜯어고치려니 귀찮아요 미안..

 

org.springframework.data.elasticsearch 에서 제공하는 기준으로 scroll 샘플을 공유드려용

 

2가지 버전으로 샘플링 합니다.

Scroll

Stream 

2가지 function을 제공드릴테니 편하신방법으로 진행하시면됩니다.

public List<MaindealEs> scroll(MaindealEsRequest request) {
   ScrolledPage<MaindealEs> scroll = elasticsearchOperations.startScroll(1000, this.maindealSearchQueryBuilder(request), MaindealEs.class);
   String scrollId = scroll.getScrollId();
   List<MaindealEs> maindealEsList = new ArrayList<>();
   while (scroll.hasContent()) {
      maindealEsList.addAll(scroll.getContent());
      scrollId = scroll.getScrollId();
      scroll = elasticsearchOperations.continueScroll(scrollId, 1000, MaindealEs.class);
   }
   elasticsearchOperations.clearScroll(scrollId);
   return maindealEsList;
}
 
public List<MaindealEs> stream(MaindealEsRequest request) {
   CloseableIterator<MaindealEs> stream = elasticsearchOperations.stream(this.maindealSearchQueryBuilder(request), MaindealEs.class);
   List<MaindealEs> maindealEsList = new ArrayList<>();
   while (stream.hasNext()) {
      maindealEsList.add(stream.next());
   }
   return maindealEsList;
}
 
// unit test
@Test
public void stream() {
   MaindealEsRequest request = new MaindealEsRequest().setStatusTypeList(Collections.singletonList("CN")).setSize(9999);
   List<MaindealEs> result = service.stream(request);
   System.err.println(result.size());
}