본문 바로가기
ELK/ElasticSearch

ElasticSearch Aggregations aggs composite afterkey Spring 집계 활용방법

by by 앵과장 2020. 5. 12.
반응형

 

이번에 회사옮긴지 한달정도 되었는데 ElasticSearch 를 해본적이 없어서 오랜만에 삽질 하고있는 개발자 입니다.

죽을꺼같아요..쓰바 집에가고 싶다.. 한주동안 결과를 달라는데 이런... 괜히 이직했어..

 

하필 데이터도 많은 deal 상품 처리 하는 팀으로 와서 뽑아달라는 데이터가 몇억건에서 1건씩 max인값을 뽑아달라는데 난감합니다.

 

group by 를 하면된다고 하는데요 일단 RDB 가 존재하고 RDB를 기준으로 document를 만들어둔 ES가 존재해서 

당연히 처음에는 할줄아는 RDBMS로 데이터를 추출하려고 시도해보았습니다. 안나오네여 

거의 많이보는 결과는 time out 아니면 뻗드라구여 slow Query 누가 날렸냐구 DB팀에서 DM날라옵니다.

 

ElasticSearch 데이터 집계형태로 뽑아 내기위해서는 aggregations 를 사용하면 된다고합니다.

기본 Select Query도 못짜는데 난감하네요

 

자 기본적인것을 한번 예를 들어볼께요

 

집계를 개발 하기 위해서는 기본적인 형식을 인지해야합니다.

POST /index/type/_search
{
  "aggs": { 
  }
  , "query": {    
  }
  , "size": 0
}

aggs  : 집계하고자하는 필드 정보를 구현합니다.

query : 조건에 해당하는 정보를 구현합니다. 만약 Query 영역을 생략하거나 명시 하지 않는다면 Match_all Query와 동일합니다.

size :  hits Arrays 안에 있는 결과 데이터는 사용하지 않고 집계결과만 사용하기때문에 0으로 합니다.

 

 

 

샘플 1 

아래는 bank table에 state group by 하고 order by DESC 에 대해서 aggs 를 구현한 형태입니다.

출력되는 필드는 state, count(*) 값입니다.

 

지금 하시려고 하는게 이정도 간단한 건 아닐겁니다 이건 그냥 이해를 돕고자하는 샘플입니다.

GET /bank/_search
{
  "size": 0,
  "aggs": {
    "group_by_state": {
      "terms": {
        "field": "state.keyword"
      }
    }
  }
}

해당 루씬 쿼리는 아래 RDBMS SQL 과 같습니다.

SELECT state, COUNT(*) FROM bank GROUP BY state ORDER BY COUNT(*) DESC

 

예시를 한번 들어보겠습니다.

파트너 사들이 10000만건 있고 각 파트너사들마다 1:N 관계인 상품 document 구조가 있다고 가정해볼께요

그중 현재 데이터는 1억건정도가 적재되어있습니다.

 

파트너사 들중에 상품이 오늘 기준 4개월치 데이터만 뽑아달라는 요건입니다.

생성일 : MAX, 상태코드 : Y  인것만 뽑아주세요

어떤 파트너인지 알수 있게 파트너 번호 생성날짜도 함께 알려주세요 라는 데이터를 추출하기 위해서는 1억건에 document를 스캔해야합니다.

 

원천데이터인 RDBMS도 존재하지만 데이터에 건수가 많기 때문에 timeout 아니면 배치로 뭔가 데이터를 추출하는데 정말 엄청 긴시간이 필요합니다. 이럴때 NOSQL로 적재되어있는 ES를 사용하면 원하는 데이터를 뽑아낼수 있습니다. 

 

GET product-*/_search
{
  "size": 0,	//집계쪽 데이터를 사용하기때문에 hits 결과물은 사용할필요가 없어 0으로 설정
  "query": {	//생성일과 상태값에 대한 조건으로 Query를 구현함	
    "bool": {
      "must": [
        {
          "range": {
            "createDate": {
              "from": "2019-12-01",
              "to": "2020-05-31",
              "include_lower": false,
              "include_upper": false,
              "boost": 1
            }
          }
        },
        {
          "term": {
            "status": {
              "value": "Y",
              "boost": 1
            }
          }
        }
      ],
      "adjust_pure_negative": true,
      "boost": 1
    }
  },
  "version": true,
  "aggregations": {
    "group": {				//group 으로 묶는 인지가능한 변수정도 의미
      "composite": {		//한개의 필드가 아닌 여러개 필드로 group 필요하다면 composite 를 이용하면되고 출력되는 결과물이 많아서 한번에 출력이 너무 오래걸리면 composite 를이용해서 페이징처럼 잘라서 가져올수 있습니다. 
        "size": 1000,		//1억건중 group by 되는 한번에 출력 가능한 데이터 건수 이부분은 결과물 Result 시간에 따라 변경해주시면됩니다.
        "sources": [
          {
            "partnerNo": {		//partnerNo 로 group by 함
              "terms": {
                "field": "partnerNo",
                "missing_bucket": true
              }
            }
          }
        ]
      },
      "aggregations": {		
        "createDate": {     //partnerNo 로 group by 된 데이터에서 createDate max 인 데이터 추출
          "top_hits": {		// elastic search 집계에서 제공되는 내장함수 
            "_source": {	// 출력해야하는 컬럼 정의
              "includes": [
                "createDate",		//생성일
                "dealNo",			//상품번호
                "partnerNo"		//파트너번호
              ],
              "excludes": []
            },
            "size": 1,		// top hits 에 출력되는 size 값
            "version": false,
            "explain": false,
            "sort": [
              {
                "createDate": {
                  "order": "desc"		//생성일 기준 DESC
                }
              }
            ]
          }
        }
      }
       
    }
  }
}

집계 SQL 호출하면 아래와같은 형태에 Document 결과물을 전달받을수 있다.

1000개 group by 된 데이터를 가져오게되는데 aggs에서는 totalcount 값을 알수있는 방법이 존재하지 않습니다.

after_key 라는 값으로 가장 마지막 데이터 이후 또 1000개씩 가져올수 있으며 재귀호출로 after_key값이 null이 나올때까지 접근하도록

처리해주시면됩니다. 

{
  "took" : 9014,
  "timed_out" : false,
  "_shards" : {
    "total" : 9,
    "successful" : 9,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 42450300,		//집계할때 처리된 document 총 Count 를 나타냅니다. 
    "max_score" : 0.0,
    "hits" : [ ]			//size 0으로 설정했기 때문에 hits결과물은 출력되지 않습니다.
  },
  "aggregations" : {
    "partnergroup" : {
      "after_key" : {
        "partnerNo" : 933		//aggregations 에서는 정확한 total 값을 알수 없기때문에 after_key 가 안나올떄까지 재귀호출로 aggs 데이터를 next하여 가져오게 구현필요
      },
      "buckets" : [
        {
          "key" : {
            "partnerNo" : 1004,
          },
          "doc_count" : 6,
          "start_date" : {
            "hits" : {
              "total" : 6,
              "max_score" : null,
              "hits" : [
                {
                  "_index" : "maindeal-2020",
                  "_type" : "doc",
                  "_id" : "83948302",
                  "_score" : null,
                  "_source" : {
                    "dealNo" : 92308234,
                    "partnerNo" : 1004,
                    "createDate" : "2020-01-27T15:00:00.000Z"
                  },
                  "sort" : [
                    1580137200000
                  ]
                }
              ]
            }
          }
        },
        .
        .
        .
        .
        .
        {
          "key" : {
            "partner_srl" : 933
          },
          "doc_count" : 5,
          "start_date" : {
            "hits" : {
              "total" : 5,
              "max_score" : null,
              "hits" : [
                {
                  "_index" : "maindeal-2020",
                  "_type" : "doc",
                  "_id" : "3049965274",
                  "_score" : null,
                  "_source" : {
                    "dealNo" : 3049965274,
                    "partnerNo" : 933,
                    "createDate" : "2020-02-20T15:00:00.000Z"
                  },
                  "sort" : [
                    1582210800000
                  ]
                }
              ]
            }
          }
        }
      ]
    }
  }
}

 

그다음 1000개 size를 접근할때 SQL 질의는 아래와 같이 하시면됩니다.

GET product-*/_search
{
  "size": 0,	//집계쪽 데이터를 사용하기때문에 hits 결과물은 사용할필요가 없어 0으로 설정
  "query": {	//생성일과 상태값에 대한 조건으로 Query를 구현함	
    "bool": {
      "must": [
        {
          "range": {
            "createDate": {
              "from": "2019-12-01",
              "to": "2020-05-31",
              "include_lower": false,
              "include_upper": false,
              "boost": 1
            }
          }
        },
        {
          "term": {
            "status": {
              "value": "Y",
              "boost": 1
            }
          }
        }
      ],
      "adjust_pure_negative": true,
      "boost": 1
    }
  },
  "version": true,
  "aggregations": {
    "group": {				//group 으로 묶는 인지가능한 변수정도 의미
      "composite": {		//한개의 필드가 아닌 여러개 필드로 group 필요하다면 composite 를 이용하면되고 출력되는 결과물이 많아서 한번에 출력이 너무 오래걸리면 composite 를이용해서 페이징처럼 잘라서 가져올수 있습니다. 
        "size": 1000,		//1억건중 group by 되는 한번에 출력 가능한 데이터 건수 이부분은 결과물 Result 시간에 따라 변경해주시면됩니다.
        "sources": [
          {
            "partnerNo": {		//partnerNo 로 group by 함
              "terms": {
                "field": "partnerNo",
                "missing_bucket": true
              }
            }
          }
        ],
         "after_key" : {
           "partnerNo" : 933	//결과에서 전달받은 after_key 정보를 after_key 정보가 null이 나올때까지 재귀호출한다.
         }
      },
      "aggregations": {		
        "createDate": {     //partnerNo 로 group by 된 데이터에서 createDate max 인 데이터 추출
          "top_hits": {		// elastic search 집계에서 제공되는 내장함수 
            "_source": {	// 출력해야하는 컬럼 정의
              "includes": [
                "createDate",		//생성일
                "dealNo",			//상품번호
                "partnerNo"		//파트너번호
              ],
              "excludes": []
            },
            "size": 1,		// top hits 에 출력되는 size 값
            "version": false,
            "explain": false,
            "sort": [
              {
                "createDate": {
                  "order": "desc"		//생성일 기준 DESC
                }
              }
            ]
          }
        }
      }
       
    }
  }
}

 

 

집계형태인 데이터를 페이징으로 삽질하고 있을 많은 개발자분들에게 포스팅을 공유드립니다.

 

아래는 참고 사이트입니다.

 

Composite aggregation | Elasticsearch Reference [7.10] | Elastic

index sort can slowdown indexing, it is very important to test index sorting with your specific use case and dataset to ensure that it matches your requirement. If it doesn’t note that composite aggregations will also try to early terminate on non-sorted

www.elastic.co