블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 

빅데이타 수집을 위한 데이타 수집 솔루션 Embulk 소개


조대협 (http://bcho.tistroy.com)


빅데이타 분석에 있어서, 아키텍쳐적으로 중요한 모듈중의 하나는 여러 서버로 부터 생성되는 데이타를 어떻게 모을 것인가이다. 얼마전에, 일본의 사례를 보다가 눈에 띄는 솔루션이 있어서 주말을 통해서 이런 저런 테스트를 해봤다.


Embulk 소개

Embulk라는 솔루션인데, fluentd를 만들었던 사람이 만들었다고 한다.

여러 종류의 데이타 소스에서 데이타를 읽어서 로딩을 할 수 있다. 주요 특징을 보면

  • 플러그인 형태로 여러개의 소스와 타겟을 지원한다.
    jRuby로 개발이 되어서 ruby gem을 이용하여 손쉽게 플러그인을 설치할 수 있다.

  • 병렬 로딩이 가능하다.
    예를 들어 여러개의 파일을 동시에 로딩하거나 또는 하나의 큰 파일이라도 자동으로 여러개의 파일로 쪼게서 병렬로 로딩을 함으로써, 로딩 속도를 올릴 수 있다.

  • 변환이 가능하다.
    파일 포맷 변환뿐 아니라, 각 필드에 대한 형 변환 그리고, 간단한 필드 맵핑 등이 가능하다.

  • 스키마 예측 (Schema guessing)
    입력 데이타를 보고, 자동으로 입력 데이타의 스키마(테이블 구조)를 예측한다. 일일이 설정을 하려면 귀찮은 일인데, 자동으로 스키마를 인식해주시기 때문에, 설정양을 줄일 수 있다.

전제적인 개념은 미니 ETL과 유사하다고 볼 수 있는데, 그 사용법이 매우 쉽다.

Embulk 설치

이 글에서는 로컬에 있는 CSV 포맷의 파일을 구글 클라우드의 빅쿼리로 로딩하는 예제를 통해서 어떻게 Embulk를 사용하는지를 알아보겠다.

VM 생성

테스트는 구글 클라우드 VM에서 진행한다. 4코어 Ubuntu VM을 생성하고 테스트 데이타를 복사하였다.

VM을 생성할때, 빅쿼리 API를 사용할 것이기 때문에, Cloud API access scopes에 BigQuery API access 권한을 반드시 부여해야 한다.


이 예제에서는 VM 생성시 모든 Cloud API에 대한 사용권한을 부여한체 생성하였다. VM을 생성한 후에, 콘솔에서 VM 상세 정보를 확인해보면 위의 그림과 같이 “This instance has full API access to all Google Cloud services.”로, 모든  구글  클라우드 API에 대한 권한을 가지고 있는 것을 확인할 수 있다.

자바 설치

구글 Ubuntu VM에는 디폴트로 자바가 설치되어있지 않기 때문에, JVM을 설치한다.

% sudo apt-get update

% sudo apt-get install default-jre

Embulk 설치

JVM 설치가 끝났으면 Embulk를 설치해보자. 다음 명령어를 실행하면 Embulk 가 설치된다.

% curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
% chmod +x ~/.embulk/bin/embulk
% echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
% source ~/.bashrc

Embulk는 ~/.embulk 디렉토리에 설치가 된다.

다음으로, 빅쿼리에 결과를 쓸 예정이기 때문에, 빅쿼리 Output 플러그인을 설치한다.

%embulk gem install embulk-output-bigquery


Embulk 로 빅쿼리에 CSV 파일 로딩하기

로딩할 데이타 살펴보기

로딩에 사용한 데이타는 게임 이벤트에 대한 데이타를 시뮬레이션 해놓은 것으로, 사용자가 NPC를 만나서 전투를 하는 각각의 이벤트를 기록해놓은 파일이다. 파일이름은 events000000000001 CSV 파일 포맷이고 총 1220395 레코드에, 243 MB의 크기이며 데이타 포맷은 다음과 같다.


파일 포맷은 다음과 같다.


eventTime,userId,sessionId,sessionStartTime,eventId,npcId,battleId,firstLogin,playerAttackPoints,playerHitPoints,playerMaxHitPoints,playerArmorClass,npcAttackPoints,npcHitPoints,npcMaxHitPoints,npcArmorClass,attackRoll,damageRoll,currentQuest

2015-11-29 01:31:10.017236 UTC,user875@example.com,688206d6-adc4-5e60-3848-b94e51c3707b,2015-11-29 01:29:20.017236 UTC,npcmissedplayer,boss15,6e4232df-26fa-22f1-fa04-465e85b34c1e,,15,3,15,15,15,15,15,15,11,,15

:


첫줄에, CSV 파일에 대한 컬럼명이 들어가고 두번째 줄 부터, “,” delimiter를 이용하여 각 컬럼을 구별하여 실 데이타가 들어가 있다.

스키마 예측을 통하여 자동으로 Config 파일 생성하기

이제, Embulk를 통해서 이 파일을 로딩하기 위해서, config 파일을 생성해보자.

Embulk에서 config 파일은 스키마 자동 예측을 통해서 자동으로 생성해낼 수 있다. Config 파일을 생성하기 위해서는 input과 output 에 대한 기본 정보를 기술해줘야 하는데, 다음과 같이 seed.yml 파일에 기본 정보를 기술한다.


in:  

 type: file  

 path_prefix: "/home/terrycho/data/events"

out:  

 type: bigquery


path_prefix에는 파일명을 정의하는데, /home/terrycho/data/events는 /home/terrycho/data/ 디렉토리내에 events*로 시작하는 모든 파일에 대해서 로딩을 하겠다는 정의이다.


seed.yml 파일 설정이 끝났으면 config 파일을 생성해보자

% embulk guess ./seed.yml -o config.yml

명령을 실행하면 아래와 같이 config.yml 파일이 생성된다.


in:

 type: file

 path_prefix: /home/terrycho/data/events

 parser:

   charset: UTF-8

   newline: CRLF

   type: csv

   delimiter: ','

   quote: '"'

   escape: '"'

   trim_if_not_quoted: false

   skip_header_lines: 1

   allow_extra_columns: false

   allow_optional_columns: false

   columns:

   - {name: eventTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: userId, type: string}

   - {name: sessionId, type: string}

   - {name: sessionStartTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: eventId, type: string}

   - {name: npcId, type: string}

   - {name: battleId, type: string}

   - {name: firstLogin, type: string}

   - {name: playerAttackPoints, type: long}

   - {name: playerHitPoints, type: long}

   - {name: playerMaxHitPoints, type: long}

   - {name: playerArmorClass, type: long}

   - {name: npcAttackPoints, type: long}

   - {name: npcHitPoints, type: long}

   - {name: npcMaxHitPoints, type: long}

   - {name: npcArmorClass, type: long}

   - {name: attackRoll, type: long}

   - {name: damageRoll, type: long}

   - {name: currentQuest, type: long}

out: {type: bigquery}


생성된 config.yml 파일을 보면 firstLogin 컬럼의 데이타 형이 string으로 되어 있는 것을 볼 수 있다. 빅쿼리 테이블에서 이 필드의 형은 실제로는 boolean이다. 아무래도 자동 인식이기 때문에, 이렇게 형들이 다르게 인식되는 경우가 있기 때문에, 생성 후에는 반드시 검토를 하고 알맞은 형으로 수정을 해줘야 한다.


다음으로 위의 파일에 데이타를 로딩할 빅쿼리에 대한 정보를 정의해줘야 한다.


in:

 type: file

 path_prefix: /home/terrycho/data/events000000000001

 parser:

   charset: UTF-8

   newline: CRLF

   type: csv

   delimiter: ','

   quote: '"'

   escape: '"'

   trim_if_not_quoted: false

   skip_header_lines: 1

   allow_extra_columns: false

   allow_optional_columns: false

   columns:

   - {name: eventTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: userId, type: string}

   - {name: sessionId, type: string}

   - {name: sessionStartTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: eventId, type: string}

   - {name: npcId, type: string}

   - {name: battleId, type: string}

   - {name: firstLogin, type: boolean}

   - {name: playerAttackPoints, type: long}

   - {name: playerHitPoints, type: long}

   - {name: playerMaxHitPoints, type: long}

   - {name: playerArmorClass, type: long}

   - {name: npcAttackPoints, type: long}

   - {name: npcHitPoints, type: long}

   - {name: npcMaxHitPoints, type: long}

   - {name: npcArmorClass, type: long}

   - {name: attackRoll, type: long}

   - {name: damageRoll, type: long}

   - {name: currentQuest, type: long}

out:

 type: bigquery

 mode: append

 auth_method: compute_engine

 project: useful-hour-138023

 dataset: gamedata

 table: game_event

 source_format: CSV


“out:” 부분을 위와 같이 수정하였다.

mode는 append 모드로, 기존 파일에 데이타를 붙이는 모드로 하였다. auth_method에는 빅쿼리 API 호출을 위한 인증 방식을 정의하는데, 구글 클라우드의 VM에서 호출하기 때문에, compute_engine이라는 인증 방식을 사용하였다. (구글 클라우드의 VM에서 같은 프로젝트 내의 빅쿼리 API를 호출할 경우에는 별도의 인증을 생략할 수 있다.) 다른 인프라드에서 호출할 경우에는 IAM에서 Service account를 생성한 후에, json  파일을 다운 받아서, json 파일 인증 방식으로 변경하고, 다운 로드 받은 json 파일을 지정해주면 된다.

다음으로, project,dataset,table에, 로딩할 빅쿼리 데이블에 대한 프로젝트명, 데이타셋명, 테이블명을 기술해주었다. 그리고 마지막으로 입력 포맷이 CSV임을 source_format에서 CSV로 정의하였다.


이제 데이타 로딩을 위한 모든 준비가 끝났다.

Config 파일 테스트

데이타 로딩을 하기 전에, 이 config 파일이 제대로 작동하는지 테스트를 해보자

%embulk preview config.yml

의 명령어는 데이타를 읽어서 제대로 파싱을 하는지 설정 파일은 문제가 없는지 테스트를 해주는 명령어이다.

명령을 실행하면 다음과 같이 일부 데이타를 읽어서 파싱을 하고 결과를 보여주는 것을 볼 수 있다.



실행하기

테스트가 끝났으면 실제로 데이타를 로딩해보자. 로딩은 아래와 같이 embulk run 명령어를 사용하면 된다.

%embulk run config.yml

실제로 실행한 결과 약 12분이 소요되었다.


멀티 쓰레드를 이용하여 로딩 속도 올리기

앞에서 설명하였듯이, Embulk는 패레럴 로딩이 지원된다. 아래와 같이 config.yml 파일에 exec이라는 부분에, max_threads수와, min_output_tasks 수를 정해주면 되는데, min_output_tasks 수는 최소로 동시 실행할 로딩 테스크 수이다. 5로 정했기 때문에, 이 시나리오에서는 하나의 CSV 파일을 업로드 하기 때문에, 이 파일을 5개의 작은 파일로 잘라서 동시에 5개의 쓰레드로 동시에 업로딩 한다.


exec:

 max_threads: 20

 min_output_tasks: 5

in:

 type: file

 path_prefix: /home/terrycho/data/events

 parser:

 :


실제로 테스트한 결과 디폴트 설정에서는 초당 약 1200줄을 업로드하였는데, 반하여, min_output_tasks를 5개로 하였을때는 초당 2000개 내외를 업로드 하였다. min_output_tasks를 10개,20개로 올려봤으나 성능은 비슷하였다. (아마 튜닝을 잘못한듯)

Parser-none으로 로딩 속도 올리기

앞의 시나리오는 데이타 라인을 각각 읽어서 컬럼을 일일이 파싱하고 이를 입력하도록 하는 시나리오였다.

만약에 CSV나 JSON 입력 파일이 빅쿼리 입력 포맷에 맞도록 이미 포매팅이 되어있다면, 일일이 파싱할 필요가 없다.

그냥 파일을 읽어서 파싱 없이 바로 빅쿼리에 insert만 하면되기 때문에, 이 경우에는 Parser를 제거하면 되는데, Parsing을 하지 않는 Parser로 embulk-parser-none이 있다.

이 Parser 다음과 같이 설치한다.

$ embulk gem install embulk-parser-none

다음 config 파일을 다음과 같이 수정한다.


in:

 type: file

 path_prefix: /home/terrycho/data/events000000000001_nohead

 parser:

   type: none

   column_name: payload

out:

 type: bigquery

 mode: append

 auth_method: compute_engine

 project: useful-hour-138023

 dataset: gamedata

 schema_file: /home/terrycho/data/gameevent.schema.json

 table: game_event

 payload_column_index: 0


이때 중요한것중 하나는 데이타 파일 (CSV)파일 첫줄에 데이타에 대한 컬럼 정보가 들어가 있으면 안된다.

그래서 아래와 같이 원본 데이타 파일에서 첫줄을 지운다.

eventTime,userId,sessionId,sessionStartTime,eventId,npcId,battleId,firstLogin,playerAttackPoints,playerHitPoints,playerMaxHitPoints,playerArmorClass,npcAttackPoints,npcHitPoints,npcMaxHitPoints,npcArmorClass,attackRoll,damageRoll,currentQuest

2015-11-29 01:31:10.017236 UTC,user875@example.com,688206d6-adc4-5e60-3848-b94e51c3707b,2015-11-29 01:29:20.017236 UTC,npcmissedplayer,boss15,6e4232df-26fa-22f1-fa04-465e85b34c1e,,15,3,15,15,15,15,15,15,11,,15

:


다음 embulk run을 이용하여 이 config 파일을 실행해보면 같은 데이타인데도 로딩 타임이 약 50초 정도 밖에 소요되지 않는 것을 확인할 수 있다.

빅쿼리 관련 몇가지 추가 옵션

이외에도 다양한 옵션이 존재하기 때문에, 빅쿼리 output 플러그인 페이지인 https://github.com/embulk/embulk-output-bigquery 를 참고하기 바란다.

자동으로 중복을 제거하는 기능이나, 로딩할때 마다 동적으로 빅쿼리 테이블을 생성하는 기능등이 있으니 반드시 참고하기 바란다.

GCS를 경유하는 업로딩

Embulk의 패레럴 로딩이 좋기는 하지만 의외의 문제가 발생할 수 있는 부분이 하나가 있는데, 하나의 파일을 로딩하는데 Embulk는 여러개의 태스크로 병렬 처리를 하기 때문에, 빅쿼리 입장에서는 각각의 태스크가 빅쿼리 로딩 JOB으로 인식이 될 수 있다. 일반적으로 빅쿼리 JOB은 하루에 10,000개만 실행할 수 있는 제약을 가지고 있다. 그래서 만약에 데이타 로딩이 많을 경우 이런 병렬 로딩은 JOB 수를 깍아 먹는 원인이 될 수 있는데, bigquery output 플러그인에서는 다음과 같은 해법을 제공한다.


빅쿼리로 데이타를 로딩할때 GCS (Google Cloud Storage)를 사용하여, 와일드카드 (*)를 사용할 경우에는 하나의 디렉토리에 있는 여러 파일을 병렬로 로딩할 수 있으며, 이때 와일드 카드를 사용한 JOB은 하나의 JOB으로 인식된다. (병렬로 여러 파일을 로딩하더라도)


그래서 out 옵션에 다음과 같이 GCS  관련 옵션을 설정해주면 파일을 직접 로컬에서 로딩하는 것이 아니라, 처리를 다 끝난 파일을 GCS 버킷으로 업로딩한 후에, GCS 버킷에서 로딩을 하게 되기 때문에, JOB수를 줄일 수 있다.


out:

 type: bigquery

 gcs_bucket: bucket_name

 auto_create_gcs_bucket: false


성능과 활용도에 대한 분석

각 시나리오에 대한 성능 테스트 결과 값은 다음과 같다.

CSV를 구글에서 제공되는 bq load 명령어를 이용해도 108초가 나오는데 반해서, non-parser를 이용하면 파일을 자동으로 쪼게서 보내기 때문에, bq load를 이용하여 하나의 파일로 업로드 하는 것보다 높은 성능이 나온다.


시나리오

성능

bq load 명령어를 이용한 로딩

108초

CSV 파서를 사용한 경우

12분

non parser를 사용한 경우

50초


하나 고려할 사항은 Parser나 Filter의 경우 ruby로 개발된 것이 있고, java로 개발된 것들이 있는데, ruby로 개발된 플러그인의 경우 성능이 java 대비 많이 느리기 때문에 가급적이면 java로 개발된것을 사용하도록 한다.


다양한 데이타 소스와 저장소가 지원이 되고, 설정이 매우 간단하며 간단한 포맷 변환등이 지원되는 만큼, 쉽고 빠르게 데이타 연동 파이프라인을 구축하는데 활용도가 매우 높다. 이와 유사한 솔루션으로는 fluentd등이 있는데, fluentd는 조금 더 실시간 즉 스트리밍 데이타에 초점이 맞춰져 있으며, Embulk는 배치성 분석에 맞춰져 있다.


참고 자료


데이타 스트리밍 분석 플랫폼 Dataflow 개념 잡기 #2/2

(트리거, 이벤트 타임, 워터마크 개념)


조대협 (http://bcho.tistory.com)


앞글 http://bcho.tistory.com/1122 에 의해서 Dataflow에 대한 개념에 대해서 계속 알아보자

트리거

윈도우와 더블어서 Dataflow 프로그래밍 개념중에서 유용한 개념중의 하나가 트리거이다. 트리거는 처리중인 데이타를 언제 다음 단계로 넘길지를 결정하는 개념이다. 특히 윈도우의 개념과 같이 생각하면 좋은데, 윈도우는 일반적으로 윈도우가 종료되는 시간에 그 데이타를 다음 Transform으로 넘기게 된다.


그런데 이런 의문이 생길 수 있다. “윈도우의 크기가 클때 (예를 들어 한시간), 한시간을 기다려야 데이타를 볼 수 있는 것인가? 그렇다면 한 시간 후에 결과를 본다면 이것을 실시간 분석이라고 할 수 있는가?”

그래서 여기서 트리거의 개념이 나온다.

예를 들어 한시간 윈도우가 있더라도, 윈도우가 끝나지 않더라도 현재 계산 값을 다음 Transform으로 넘겨서결과를 볼 수 있는 개념이다. 1분 단위로 트리거를 걸면 1분 결과를 저장하고, 2분째도 결과를 저장하고, 3분째도…. 60분째에도 매번 결과를 업데이트 함으로써, 윈도우가 종료되기 전에도 실시간으로 결과를 업데이트 할 수 있게 된다.


트리거의 종류

그렇다면 이러한 트리거는 앞에서 언급한 시간 단위의 트리거만 있을까? Dataflow는 상당히 여러 종류의 트리거를 지원한다.


  • Time trigger (시간 기반 트리거) : 시간 기반 트리거는 일정 시간 주기로 트리거링을 해주는 트리거 이다. 1분 단위, 1초 단위 같이 일정 주기를 지정하거나, “윈도우 시작후 2분후 한번과 윈도우 종료후 한번"과 같이 절대적인 시간을 기준으로도 정의가 가능하다.

  • Element Count (데이타 개수 기반 트리거) : 다음은 개수 기반인데, 예를 들어 “어떤 데이타가 100번 이상 들어오면 한번 트리거링을 해라” 또는 “매번 데이타가 100개씩 들어올때 마다 트리거링을 해라" 라는 형태로 정의가 가능하다.

  • Punctuations  (이벤트 기반 트리거) : Punctuations는 엄밀하게 번역하면 “구두점" 이라는 의미인데, 구두점 처럼 특정 데이타가 들어오는 순간에, 트리거링을 하는 방법이다.

트리거 조합

이러한 트리거는 하나의 트리거 뿐 아니라, 여러개의 트리거를 동시에 조합하여 사용이 가능하다.

  • AND : AND 조건으로 두개의 트리거의 조건이 만족해야 트리거링이 된다. 예를 들어, Time Trigger가 1분이고, Element Count 트리거가 100개이면, 윈도우가 시작된 1분 후에, Element Count가 100개가 되면 트리거링이 된다.

  • OR : OR 조건으로 두개의 트리거의 조건 중 하나만 만족하면 트리거링이 된다.

  • Repeat : Repeat는 트리거를 반복적으로 수행한다. Element Count 트리거 10개를 반복으로 수행하면, 매 10개 마다 트리거링이 된다. Time 트리거를 1분 단위로 반복하면, 매 1분 마다 트리거링이 된다.

  • Sequence : Sequence 트리거는 등록된 트리거를 순차적으로 실행한다. Time 트리거 1분을 걸고 Element count 트리거 100개를 걸면, 윈도우 시작후 1분 후 트리거링인 된후, 그 후 부터 Element 가 100개 들어오면 두번째 트리거링이 발생하고 트리거링이 종료 된다.


트리거 결과의 누적

그러면 트리거링이 될때 마다 전달 되는 데이타는 어떻게 될까라는 질문이 나올 수 있는데. 무슨 이야기인가 하면 윈도우 내에서 트리거가 발생할때, 이전 데이타에 대한 처리를 어떻게 할것인가이다.


데이타가 A,B,C,D,E,F 가 들어왔다고 가정하자. 트리거가 C 다음 발생했다고 했을때, 윈도우가 끝난 F에는 어떤 값이 리턴이 될까?

첫번째 트리거링에는 당연히 A,B,C 가 전달된다.

윈도우가 끝나면 A,B,C,D,E,F 가 전달되는 것이 맞을까 아니면 트리거링 된 이후의 값인 D,E,F 만 전달되는 것이 맞을까?

맞는 건 없고, 옵션으로 지정이 가능하다.

  • Accumulating
    Accumulating은 트리거링을 할때 마다 윈도우 내에서 그때까지의 값을 모두 리턴한다.

  • Discarding
    트리거링 한 후에, 이전 값은 더이상 리턴하지 않고, 그 이후 부터 다음 트리거링 할때까지의 값만을 리턴한다.

예를 들어서 보자


다음과 같은 윈도우가 있고, 3번, 23번, 10번에서 트리거링이 된다고 했을때,

Accumulating mode의 경우

  • 첫번째 트리거 후 : [5,8,3]

  • 두번째 트리거 후 : [5,8,3,15,19,23]

  • 세번째 트리거 후 : [5,8,3,15,19,239,13,10]

와 같이 값이 반환되고

Discarding mode의 경우

  • 첫번째 트리거 후 [5,8,3]

  • 두번째 트리거 후 [15,19,23]

  • 세번째 트리거 후 [9,13,10]

이 반환된다.

데이타 지연에 대한 처리 방법

실시간 데이타 분석은 특성상 데이타의 전달 시간이 중요한데, 데이타는 모바일 클라이언트 등에서 인터넷을 통해서 데이타가 서버로 전송되는 경우가 많기 때문에, 데이타의 실제 도달 시간이 들쭉날쭉 하다. 이러다 보니 데이타의 도착 순서나 지연등이 발생하는데, 이에 대한 처리가 필요하다. 먼저 데이타 도달 시간의 개념을 이해하려면, 이벤트 타임과 프로세싱 타임의 개념을 먼저 이해해야 한다.

이벤트 타임과 프로세싱 타임

모바일 단말에서 다음과 같이 A,B,C,D의 데이타를 1시1초, 1시2초,3초,5초에 보냈다고 하자.


서버에 도착해서 Dataflow에 도착하는 시간은 물리적으로 서버와 단말간의 거리 차이가 있기 때문에 도착 시간은 단말에서 데이타가 발생한 시간보다 느리게 되며, 또한 각 단말의 위치나 단말이 연결되어 있는 네트워크 상황이 다르기 때문에 순차적으로 도착하는 것이 아니라, 늦게 보낸 데이타가 더 빨리 도착할 수 도 있다.

아래 그림을 보면 A데이타는 1시1초에 단말에서 생성되었지만 서버에 도착한 시간은 1시2초가 된다. C,D의 경우, 순서가 바뀌어서 도착하였다.



이렇게 실제로 데이타가 발생한 시간을 이벤트 타임, 그리고 서버에 데이타가 도착한 시간을 프로세싱 타임이라고 정의한다.


이 프로세싱 타임은 네트워크 상황이나 데이타에 크기에 따라 가변적으로 변하기 때문에, 이벤트 타임과 프로세싱 타임의 상관 관계를 그래프로 표현해보면 다음과 같아진다.


가장 이상적인 결과는 이벤트 타임과 프로세싱 타임이 동일한 것이겠지만 불가능하고, 위의 그림처럼 이벤트 타임보다 프로세싱 타임이 항상 늦게 되고, 이벤트 타임과 프로세싱 타임의 차이는 매 순간 다르게 된다.

워터 마크 (Water Mark)

이렇게 위의 그림과 같이 실제 데이타가 시스템에 도착하는 시간을 예측 하게 되는데, 이를 워터 마크라고 한다. 위의 그림에서 “실제 처리 그래프"로 표시되는 부분을 워터마크라고 생각하면 된다. 이 예측된 시간을 기반으로 윈도우의 시스템상의 시작 시간과 종료 시간을 예측 하게 된다.

지연 데이타 처리 방법

윈도우 처리 관련해서, 실제 발생한 시간과 도착 시간이 달라서, 처리 시간내에 못 들어오는 경우가 발생할 수 있다. 아래 그림을 보면, 실제 윈도우는 1시1초~1시6초까지의 데이타를 처리하기를 바라고 정의했을 수 있는데, 시스템에서는 이 윈도우의 값이 프로세싱 타임 기준으로 (워터 마크를 기준으로 연산함) 1시2초~1시6초에 도착하기를 기대하고 있는데, 데이타 C의 경우에는 기대했던 프로세싱 타임에 도착하지 않았기 때문에 이 데이타는 연산에서 누락될 수 있다.



비단 늦게 도착한 데이타 뿐만 아니라, 시스템이 예측한 프로세싱 타임 보다 일찍 데이타가 도착할 수 있는데, 이런 조기 도착한 데이타와 지연 도착한 데이타에 대한 처리는 어떻게 해야 할까?

Dataflow에서는 이런 조기 도착이나 지연 데이타에 대한 처리 메카니즘을 제공한다.

윈도우를 생성할때, withAllowedLateness라는 메서드를 사용하면, 늦게 도착하는 데이타에 대한 처리 기간을 정의할 수 있다.


PCollection<String> items = ...;

 PCollection<String> fixed_windowed_items = items.apply(

   Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES))

         .withAllowedLateness(Duration.standardDays(2)));

https://cloud.google.com/dataflow/model/windowing#managing-time-skew-and-late-data


위의 예제는 1분 단위의 Fixed Window를 정의하고, 최대 2일까지 지연 도착한 데이타 까지 처리할 수 있도록 정의한 예제이다.


지금까지 간단하게 dataflow를 이용한 스트리밍 데이타 처리의 개념에 대해서 알아보았다.

데이타 스트리밍 분석 플랫폼 Dataflow 개념 잡기 #1/2


조대협 (http://bcho.tistory.com)


실시간 데이타 처리에서는 들어오는 데이타를 바로 읽어서 처리 하는 스트리밍 프레임웍이 대세인데, 대표적인 프레임웍으로는 Aapche Spark등을 들 수 있다. 구글의 DataFlow는 구글 내부의 스트리밍 프레임웍을 Apache Beam이라는 형태의 오픈소스로 공개하고 이를 실행하기 위한 런타임을 구글 클라우드의 DataFlow라는 이름으로 제공하고 있는 서비스이다.


스트리밍 프레임웍 중에서 Apache Spark 보다 한 단계 앞선 개념을 가지고 있는 다음 세대의 스트리밍 프레임웍으로 생각할 수 있다. Apache Flink 역시 유사한 개념을 가지면서 Apache Spark의 다음 세대로 소개 되는데, 이번글에서는 이 DataFlow에 대한 전체적인 개념과 프로그래밍 모델등에 대해서 설명하고자 한다. 스트리밍 데이타 처리에 대한 개념은 http://bcho.tistory.com/1119 글을 참고하기 바란다.

개념 소개

dataflow에 대해서 이해하기 위해서 프로그래밍 모델을 먼저 이해해야 하는데, dataflow의 프로그래밍 모델은 얼마전에 Apache에 Beam이라는 오픈 소스 프로젝트로 기증 되었다. Apache Spark이나, Apache Flink와 유사한 스트리밍 처리 프레임웍이라고 생각하면 된다. dataflow는 이 Apache beam의 프로그래밍 모델을 실행할 수 있는 런타임 엔진이라고 생각하면 된다. 예를 들어 Apache beam으로 짠 코드를 Servlet이나 Spring 코드라고 생각하면, dataflow는 이를 실행하기 위한 Tomcat,Jetty,JBoss와 같은 런타임의 개념이다.


먼저 dataflow의 개념을 이해해보도록 하자. 아래 그림은 dataflow에 대한 컨셉이다.


데이타가 들어오면, Pipeline IO에서 데이타를 읽어드린다. 읽어드린 데이타는 PCollection이라는 데이타 형으로 생성이 되고, 이 PCollection 데이타는 여러개의 중첩된 PTransform을 통해서 변환 및 가공이 된다. 가공이 끝난 결과는 마지막으로 Pipeline IO의 Output을 통해서 데이타 저장소 (빅쿼리나 파일등)에 저장이 된다.  이 Pipeline IO에서 부터 PTransform을 걸친 일련의 프로세싱 과정을 Pipeline이라고 한다.


예를 들어 설명해보자, 문자열을 입력 받은 후에, 문자열에서 단어를 추출하여, 각 단어의 개수를 세어 주는 파이프라인이 있다고 하자.


첫번째 실행에서 “Hello my daddy”라는 문자열이 입력되었다. 첫번째 Transform인 Extract words Transform을 거치면서, “Hello my daddy” 라는 문자열은 “Hello”, “my”, “daddy” 라는 각각의 단어로 쪼게진다. 다음으로 Count Element 라는 Transform에 의해서, 각 단어의 수를 세어서 저장한다. “Hello”는 1번, “my”는 1번, “daddy”는 1번 의 값이 저장된다.


두번째 실행에서 “Hello my bro” 라는 문자열이 들어오면, Extract words 에 의해서 “Hello”, “my”, “bro”라는 각각의 단어로 쪼게지고, Count Element Transform에서 이전에 세어놓은 단어의 수와 합산하여 계산이 된 결과가 저장이 된다. “Hello”는 이전에 한번 카운트가 되었고 이번에도 들어왔기 때문에, 2가 되고, 같은 원리로 “my”라는 단어의 카운트도 2가된다. “bro” 라는 단어는 이번에 처음 들어왔기 때문에 새 값으로 1로 저장된다.




세번째 “Hello my mom” 이라는 문자열이 들어오면 앞의 두개의 문자열과 마찬가지로 간 단어로 쪼게진 다음 Count Element에 의해서 각 단어의 수가 카운트되어 기존의 값과 누적 합산된다. 모든 데이타를 다 읽어서 처리가 끝나면, 저장된 결과를 Pipeline IO를 통해서 파일에 그 결과를 쓰게 된다.

배치와 스트리밍 처리

dataflow는 위에서 설명한 파이프라인의 개념을 배치와 스트리밍 처리 두가지 개념 모두로 지원해서 처리가 가능하다. 데이타가 파일과 같이 이미 쓰여지고 더 이상 증가나 수정이 되지 않은 데이타에 대해서는 일괄로 데이타를 읽어서 결과를 내는 배치 처리가 가능하고, 계속해서 들어오고 있는 데이타 (트위터 피드, 로그 데이타)는 스트리밍으로 처리가 가능하다.

윈도우의 개념

배치 처리야, 데이타 처리가 모두 끝난 후에 결과를 내보낸다고 하지만, 그렇다면 스트리밍 데이타는 계속해서 데이타가 들어오고 있는데, 언제 결과를 내보내야 할까?

개별 데이타를 변환해서 저장하는 경우에야, 개별 데이타 처리가 끝난후에 각각 하나씩 저장한다고 하지만, 위와 같이 들어오는 데이타에서 특정데이타 들에 대한 합이나 평균과 같은 처리를 하는 경우 어느 기간 단위로 해야 할까? 스트리밍 처리에서는 이러한 개념을 다루기 위해서 윈도우라는 개념을 사용한다.


예를 들어, “1시~1시10분까지 들어온 문자열에 대해서 문자열에 들어 있는 각 단어의 수를 카운트해서 출력해주는 기능" 이나, 또는 “매 5분 단위로 현재 시간에서 10분전까지 들어온 문자열에 대해서 각 단어의 수를 카운트 해서 출력 해주는 기능" 과 같이 작은 시간 기간의 단위를 가지고 그 기간 단위로 계산 하는 방법이며, 이 시간 단위를 윈도우(Window)라고 한다.


Fixed Window (고정 크기 윈도우)

앞의 예에서 1시~1시10분, 1시10분~1시20분 과 같이 고정된 크기를 가지는 윈도우의 개념을 Fixed Window라고 한다.


Sliding Window (슬라이딩 윈도우)

앞의 예에서와 같이 윈도우가 상대적인 시간 (이전 10분까지)의 개념을 가지면서, 다른 윈도우와 중첩되는 윈도우를 슬라이딩 윈도우라고 한다.


그림과 같이 1시10분의 윈도우는 1시 10분의 10분전인 1시에서 부터, 현재 시간 까지인 1시10분까지 값을 읽어서 처리하고 윈도우가 끝나는 시점인 1:10분에 그 값을 저장한다. 윈도우의 간격은 5분 단위로, 1시 15분에는 1시 15분의 10분전인 1시05분 부터 현재 시간인 1시15분까지 들어온 데이타에 대해서 처리를 하고 그 결과 값을 1시15분에 저장한다.

Session window (세션 윈도우)

다음은 세션 윈도우라는 개념을 가지고 있는데, 이를 이해하기 위해서는 먼저 세션의 개념을 먼저 이해해야 한다.

세션이랑 사용자가 한번 시스템을 사용한 후, 사용이 끝날때 까지의 기간을 정의한다. 스트리밍 시스템에서는 사용자 로그인이나 로그 아웃을 별도의 이벤트로 잡는 것이 아니기 때문에, 데이타가 들어온 후에, 일정 시간 이후에 그 사용자에 대한 데이타가 들어오지 않으면, 세션이 종료 된것으로 판단한다.

일반 적인 웹 프로그램에서 HttpSession과 같은 원리인데, 웹 사이트에 접속한 후, Session time out 시간이 지날때 까지 사용자가 별도의 request를 보내지 않으면 세션을 끊는 것과 같은 원리이다.

아래 그림은 세션 윈도우의 개념을 설명하기 위한 윈도우인데, User A와 User B의 데이타가 들어오고 있다고 하자.


그리고 세션 타임 아웃이 10분으로 정의했다. 즉 같은 사용자에 대해서 데이타가 들어온 후, 10분 내에 추가 데이타가 들어오지 않으면 세션이 종료 된것으로 판단한다.


User A는 1:00 에 첫 데이타가 들어와서1:00~1:10 사이에 두번째 데이타가 들어왔고, 1:10~1:20 사이에 세번째 데이타가 들어온 후, 네번째 데이타는 10분이 지난 후에 들어왔다. 그래서 1:00~1:20 까지가 하나의 세션이 되고, 이것이 User A에 대한 1:00~1:20의 세션 윈도우가 된다. 네번째 데이타 부터는 새로운 윈도우로 처리가 되는데, 1:40~1:50 사이에 다섯번째 데이타가 도착한후, 그 이후로 도착하지 않았기 때문에 이게 두번째 윈도우가 되고, 1:30~1:50의 시간 간격을 가지는 User A의 두번째 윈도우가 된다.

각 윈도우의 값은 User A의 1:00~1:20 윈도우의 값은 (1+1+1)로 3이 되고, 두번째 윈도우인 1:30~1:50 윈도우는 (2.5+1)로 3.5가 된다.


User B는 1:10에 데이타가 들어오고, 10분 후인 1:20까지 데이타가 들어오지 않고 그 이후 1:30 분에 두번째 데이타가 들어왔기 때문에, 1:10~1:10 길이의 첫번째 세션 윈도우가 생성된다. 다음 으로 1:30분에 데이타가 들어왔기 때문에 두번째 세션 윈도우를 생성하고, 2:00까지 계속 데이타가 들어오다가 멈추고 2:10까지 새로운 데이타가 들어오지 않았기 때문에 1:30~2:00 까지 두번째 윈도우로 취급한다.


이 Session Window는 앞서 언급한 Fixed Window나, Sliding Window와는 다르게, User A, User B와 사용자 단위와 같이 어떤 키에 따라서 개별적으로 윈도우를 처리 한다.  즉 Session Window는 User A나 USer B처럼 특정 키에 종속된 윈도우만을 갖는다.


반대로 Fixed Window나 Sliding Window는 키단위의 윈도우가 아니라 그 시간 범위내에 들어 있는 모든 키에 대한 값을 처리한다..

Fixed Window의 경우에는 30분 사이즈를 갖는 윈도우라고 하면 아래 그림과 같이


1:00~1:30 윈도우는 User A의 값 = (1+1+1) 과 User B의 값 1을 합쳐서 총 4가 되고

1:30~2:00 윈도우는 User A값 = (2.5+1)과 User B의 값 = (2+2+2) 를 합쳐서 9.5가 된다.


Sliding Window의 경우에는 길이가 30분이고, 주기가 20분인 Sliding 윈도우라고 할때,


1:00~1:30, 1:20~1:50, 1:40~2:00 3개의 Sliding 윈도우가 생성된다.

1:00~1:30 윈도우는 User A의 값=(1+1+1)과 User B의 값 1을 합산하여 4가 되고

1:20~1:50 윈도우는 User A의 값 = (2.5+1)과 User B의 값 =(2+2)를 합산하여 7.5가 된다.

1:40~2:00 윈도우는 User A의 값 = (2.5+1)과 User B의 값 (2+2)를 합산하여 7.5가 된다.