빅데이타 & 머신러닝/Google BigQuery

빅데이타 수집을 위한 데이타 수집 솔루션 Embulk 소개

Terry Cho 2016. 7. 31. 17:33

빅데이타 수집을 위한 데이타 수집 솔루션 Embulk 소개


조대협 (http://bcho.tistroy.com)


빅데이타 분석에 있어서, 아키텍쳐적으로 중요한 모듈중의 하나는 여러 서버로 부터 생성되는 데이타를 어떻게 모을 것인가이다. 얼마전에, 일본의 사례를 보다가 눈에 띄는 솔루션이 있어서 주말을 통해서 이런 저런 테스트를 해봤다.


Embulk 소개

Embulk라는 솔루션인데, fluentd를 만들었던 사람이 만들었다고 한다.

여러 종류의 데이타 소스에서 데이타를 읽어서 로딩을 할 수 있다. 주요 특징을 보면

  • 플러그인 형태로 여러개의 소스와 타겟을 지원한다.
    jRuby로 개발이 되어서 ruby gem을 이용하여 손쉽게 플러그인을 설치할 수 있다.

  • 병렬 로딩이 가능하다.
    예를 들어 여러개의 파일을 동시에 로딩하거나 또는 하나의 큰 파일이라도 자동으로 여러개의 파일로 쪼게서 병렬로 로딩을 함으로써, 로딩 속도를 올릴 수 있다.

  • 변환이 가능하다.
    파일 포맷 변환뿐 아니라, 각 필드에 대한 형 변환 그리고, 간단한 필드 맵핑 등이 가능하다.

  • 스키마 예측 (Schema guessing)
    입력 데이타를 보고, 자동으로 입력 데이타의 스키마(테이블 구조)를 예측한다. 일일이 설정을 하려면 귀찮은 일인데, 자동으로 스키마를 인식해주시기 때문에, 설정양을 줄일 수 있다.

전제적인 개념은 미니 ETL과 유사하다고 볼 수 있는데, 그 사용법이 매우 쉽다.

Embulk 설치

이 글에서는 로컬에 있는 CSV 포맷의 파일을 구글 클라우드의 빅쿼리로 로딩하는 예제를 통해서 어떻게 Embulk를 사용하는지를 알아보겠다.

VM 생성

테스트는 구글 클라우드 VM에서 진행한다. 4코어 Ubuntu VM을 생성하고 테스트 데이타를 복사하였다.

VM을 생성할때, 빅쿼리 API를 사용할 것이기 때문에, Cloud API access scopes에 BigQuery API access 권한을 반드시 부여해야 한다.


이 예제에서는 VM 생성시 모든 Cloud API에 대한 사용권한을 부여한체 생성하였다. VM을 생성한 후에, 콘솔에서 VM 상세 정보를 확인해보면 위의 그림과 같이 “This instance has full API access to all Google Cloud services.”로, 모든  구글  클라우드 API에 대한 권한을 가지고 있는 것을 확인할 수 있다.

자바 설치

구글 Ubuntu VM에는 디폴트로 자바가 설치되어있지 않기 때문에, JVM을 설치한다.

% sudo apt-get update

% sudo apt-get install default-jre

Embulk 설치

JVM 설치가 끝났으면 Embulk를 설치해보자. 다음 명령어를 실행하면 Embulk 가 설치된다.

% curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
% chmod +x ~/.embulk/bin/embulk
% echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
% source ~/.bashrc

Embulk는 ~/.embulk 디렉토리에 설치가 된다.

다음으로, 빅쿼리에 결과를 쓸 예정이기 때문에, 빅쿼리 Output 플러그인을 설치한다.

%embulk gem install embulk-output-bigquery


Embulk 로 빅쿼리에 CSV 파일 로딩하기

로딩할 데이타 살펴보기

로딩에 사용한 데이타는 게임 이벤트에 대한 데이타를 시뮬레이션 해놓은 것으로, 사용자가 NPC를 만나서 전투를 하는 각각의 이벤트를 기록해놓은 파일이다. 파일이름은 events000000000001 CSV 파일 포맷이고 총 1220395 레코드에, 243 MB의 크기이며 데이타 포맷은 다음과 같다.


파일 포맷은 다음과 같다.


eventTime,userId,sessionId,sessionStartTime,eventId,npcId,battleId,firstLogin,playerAttackPoints,playerHitPoints,playerMaxHitPoints,playerArmorClass,npcAttackPoints,npcHitPoints,npcMaxHitPoints,npcArmorClass,attackRoll,damageRoll,currentQuest

2015-11-29 01:31:10.017236 UTC,user875@example.com,688206d6-adc4-5e60-3848-b94e51c3707b,2015-11-29 01:29:20.017236 UTC,npcmissedplayer,boss15,6e4232df-26fa-22f1-fa04-465e85b34c1e,,15,3,15,15,15,15,15,15,11,,15

:


첫줄에, CSV 파일에 대한 컬럼명이 들어가고 두번째 줄 부터, “,” delimiter를 이용하여 각 컬럼을 구별하여 실 데이타가 들어가 있다.

스키마 예측을 통하여 자동으로 Config 파일 생성하기

이제, Embulk를 통해서 이 파일을 로딩하기 위해서, config 파일을 생성해보자.

Embulk에서 config 파일은 스키마 자동 예측을 통해서 자동으로 생성해낼 수 있다. Config 파일을 생성하기 위해서는 input과 output 에 대한 기본 정보를 기술해줘야 하는데, 다음과 같이 seed.yml 파일에 기본 정보를 기술한다.


in:  

 type: file  

 path_prefix: "/home/terrycho/data/events"

out:  

 type: bigquery


path_prefix에는 파일명을 정의하는데, /home/terrycho/data/events는 /home/terrycho/data/ 디렉토리내에 events*로 시작하는 모든 파일에 대해서 로딩을 하겠다는 정의이다.


seed.yml 파일 설정이 끝났으면 config 파일을 생성해보자

% embulk guess ./seed.yml -o config.yml

명령을 실행하면 아래와 같이 config.yml 파일이 생성된다.


in:

 type: file

 path_prefix: /home/terrycho/data/events

 parser:

   charset: UTF-8

   newline: CRLF

   type: csv

   delimiter: ','

   quote: '"'

   escape: '"'

   trim_if_not_quoted: false

   skip_header_lines: 1

   allow_extra_columns: false

   allow_optional_columns: false

   columns:

   - {name: eventTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: userId, type: string}

   - {name: sessionId, type: string}

   - {name: sessionStartTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: eventId, type: string}

   - {name: npcId, type: string}

   - {name: battleId, type: string}

   - {name: firstLogin, type: string}

   - {name: playerAttackPoints, type: long}

   - {name: playerHitPoints, type: long}

   - {name: playerMaxHitPoints, type: long}

   - {name: playerArmorClass, type: long}

   - {name: npcAttackPoints, type: long}

   - {name: npcHitPoints, type: long}

   - {name: npcMaxHitPoints, type: long}

   - {name: npcArmorClass, type: long}

   - {name: attackRoll, type: long}

   - {name: damageRoll, type: long}

   - {name: currentQuest, type: long}

out: {type: bigquery}


생성된 config.yml 파일을 보면 firstLogin 컬럼의 데이타 형이 string으로 되어 있는 것을 볼 수 있다. 빅쿼리 테이블에서 이 필드의 형은 실제로는 boolean이다. 아무래도 자동 인식이기 때문에, 이렇게 형들이 다르게 인식되는 경우가 있기 때문에, 생성 후에는 반드시 검토를 하고 알맞은 형으로 수정을 해줘야 한다.


다음으로 위의 파일에 데이타를 로딩할 빅쿼리에 대한 정보를 정의해줘야 한다.


in:

 type: file

 path_prefix: /home/terrycho/data/events000000000001

 parser:

   charset: UTF-8

   newline: CRLF

   type: csv

   delimiter: ','

   quote: '"'

   escape: '"'

   trim_if_not_quoted: false

   skip_header_lines: 1

   allow_extra_columns: false

   allow_optional_columns: false

   columns:

   - {name: eventTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: userId, type: string}

   - {name: sessionId, type: string}

   - {name: sessionStartTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: eventId, type: string}

   - {name: npcId, type: string}

   - {name: battleId, type: string}

   - {name: firstLogin, type: boolean}

   - {name: playerAttackPoints, type: long}

   - {name: playerHitPoints, type: long}

   - {name: playerMaxHitPoints, type: long}

   - {name: playerArmorClass, type: long}

   - {name: npcAttackPoints, type: long}

   - {name: npcHitPoints, type: long}

   - {name: npcMaxHitPoints, type: long}

   - {name: npcArmorClass, type: long}

   - {name: attackRoll, type: long}

   - {name: damageRoll, type: long}

   - {name: currentQuest, type: long}

out:

 type: bigquery

 mode: append

 auth_method: compute_engine

 project: useful-hour-138023

 dataset: gamedata

 table: game_event

 source_format: CSV


“out:” 부분을 위와 같이 수정하였다.

mode는 append 모드로, 기존 파일에 데이타를 붙이는 모드로 하였다. auth_method에는 빅쿼리 API 호출을 위한 인증 방식을 정의하는데, 구글 클라우드의 VM에서 호출하기 때문에, compute_engine이라는 인증 방식을 사용하였다. (구글 클라우드의 VM에서 같은 프로젝트 내의 빅쿼리 API를 호출할 경우에는 별도의 인증을 생략할 수 있다.) 다른 인프라드에서 호출할 경우에는 IAM에서 Service account를 생성한 후에, json  파일을 다운 받아서, json 파일 인증 방식으로 변경하고, 다운 로드 받은 json 파일을 지정해주면 된다.

다음으로, project,dataset,table에, 로딩할 빅쿼리 데이블에 대한 프로젝트명, 데이타셋명, 테이블명을 기술해주었다. 그리고 마지막으로 입력 포맷이 CSV임을 source_format에서 CSV로 정의하였다.


이제 데이타 로딩을 위한 모든 준비가 끝났다.

Config 파일 테스트

데이타 로딩을 하기 전에, 이 config 파일이 제대로 작동하는지 테스트를 해보자

%embulk preview config.yml

의 명령어는 데이타를 읽어서 제대로 파싱을 하는지 설정 파일은 문제가 없는지 테스트를 해주는 명령어이다.

명령을 실행하면 다음과 같이 일부 데이타를 읽어서 파싱을 하고 결과를 보여주는 것을 볼 수 있다.



실행하기

테스트가 끝났으면 실제로 데이타를 로딩해보자. 로딩은 아래와 같이 embulk run 명령어를 사용하면 된다.

%embulk run config.yml

실제로 실행한 결과 약 12분이 소요되었다.


멀티 쓰레드를 이용하여 로딩 속도 올리기

앞에서 설명하였듯이, Embulk는 패레럴 로딩이 지원된다. 아래와 같이 config.yml 파일에 exec이라는 부분에, max_threads수와, min_output_tasks 수를 정해주면 되는데, min_output_tasks 수는 최소로 동시 실행할 로딩 테스크 수이다. 5로 정했기 때문에, 이 시나리오에서는 하나의 CSV 파일을 업로드 하기 때문에, 이 파일을 5개의 작은 파일로 잘라서 동시에 5개의 쓰레드로 동시에 업로딩 한다.


exec:

 max_threads: 20

 min_output_tasks: 5

in:

 type: file

 path_prefix: /home/terrycho/data/events

 parser:

 :


실제로 테스트한 결과 디폴트 설정에서는 초당 약 1200줄을 업로드하였는데, 반하여, min_output_tasks를 5개로 하였을때는 초당 2000개 내외를 업로드 하였다. min_output_tasks를 10개,20개로 올려봤으나 성능은 비슷하였다. (아마 튜닝을 잘못한듯)

Parser-none으로 로딩 속도 올리기

앞의 시나리오는 데이타 라인을 각각 읽어서 컬럼을 일일이 파싱하고 이를 입력하도록 하는 시나리오였다.

만약에 CSV나 JSON 입력 파일이 빅쿼리 입력 포맷에 맞도록 이미 포매팅이 되어있다면, 일일이 파싱할 필요가 없다.

그냥 파일을 읽어서 파싱 없이 바로 빅쿼리에 insert만 하면되기 때문에, 이 경우에는 Parser를 제거하면 되는데, Parsing을 하지 않는 Parser로 embulk-parser-none이 있다.

이 Parser 다음과 같이 설치한다.

$ embulk gem install embulk-parser-none

다음 config 파일을 다음과 같이 수정한다.


in:

 type: file

 path_prefix: /home/terrycho/data/events000000000001_nohead

 parser:

   type: none

   column_name: payload

out:

 type: bigquery

 mode: append

 auth_method: compute_engine

 project: useful-hour-138023

 dataset: gamedata

 schema_file: /home/terrycho/data/gameevent.schema.json

 table: game_event

 payload_column_index: 0


이때 중요한것중 하나는 데이타 파일 (CSV)파일 첫줄에 데이타에 대한 컬럼 정보가 들어가 있으면 안된다.

그래서 아래와 같이 원본 데이타 파일에서 첫줄을 지운다.

eventTime,userId,sessionId,sessionStartTime,eventId,npcId,battleId,firstLogin,playerAttackPoints,playerHitPoints,playerMaxHitPoints,playerArmorClass,npcAttackPoints,npcHitPoints,npcMaxHitPoints,npcArmorClass,attackRoll,damageRoll,currentQuest

2015-11-29 01:31:10.017236 UTC,user875@example.com,688206d6-adc4-5e60-3848-b94e51c3707b,2015-11-29 01:29:20.017236 UTC,npcmissedplayer,boss15,6e4232df-26fa-22f1-fa04-465e85b34c1e,,15,3,15,15,15,15,15,15,11,,15

:


다음 embulk run을 이용하여 이 config 파일을 실행해보면 같은 데이타인데도 로딩 타임이 약 50초 정도 밖에 소요되지 않는 것을 확인할 수 있다.

빅쿼리 관련 몇가지 추가 옵션

이외에도 다양한 옵션이 존재하기 때문에, 빅쿼리 output 플러그인 페이지인 https://github.com/embulk/embulk-output-bigquery 를 참고하기 바란다.

자동으로 중복을 제거하는 기능이나, 로딩할때 마다 동적으로 빅쿼리 테이블을 생성하는 기능등이 있으니 반드시 참고하기 바란다.

GCS를 경유하는 업로딩

Embulk의 패레럴 로딩이 좋기는 하지만 의외의 문제가 발생할 수 있는 부분이 하나가 있는데, 하나의 파일을 로딩하는데 Embulk는 여러개의 태스크로 병렬 처리를 하기 때문에, 빅쿼리 입장에서는 각각의 태스크가 빅쿼리 로딩 JOB으로 인식이 될 수 있다. 일반적으로 빅쿼리 JOB은 하루에 10,000개만 실행할 수 있는 제약을 가지고 있다. 그래서 만약에 데이타 로딩이 많을 경우 이런 병렬 로딩은 JOB 수를 깍아 먹는 원인이 될 수 있는데, bigquery output 플러그인에서는 다음과 같은 해법을 제공한다.


빅쿼리로 데이타를 로딩할때 GCS (Google Cloud Storage)를 사용하여, 와일드카드 (*)를 사용할 경우에는 하나의 디렉토리에 있는 여러 파일을 병렬로 로딩할 수 있으며, 이때 와일드 카드를 사용한 JOB은 하나의 JOB으로 인식된다. (병렬로 여러 파일을 로딩하더라도)


그래서 out 옵션에 다음과 같이 GCS  관련 옵션을 설정해주면 파일을 직접 로컬에서 로딩하는 것이 아니라, 처리를 다 끝난 파일을 GCS 버킷으로 업로딩한 후에, GCS 버킷에서 로딩을 하게 되기 때문에, JOB수를 줄일 수 있다.


out:

 type: bigquery

 gcs_bucket: bucket_name

 auto_create_gcs_bucket: false


성능과 활용도에 대한 분석

각 시나리오에 대한 성능 테스트 결과 값은 다음과 같다.

CSV를 구글에서 제공되는 bq load 명령어를 이용해도 108초가 나오는데 반해서, non-parser를 이용하면 파일을 자동으로 쪼게서 보내기 때문에, bq load를 이용하여 하나의 파일로 업로드 하는 것보다 높은 성능이 나온다.


시나리오

성능

bq load 명령어를 이용한 로딩

108초

CSV 파서를 사용한 경우

12분

non parser를 사용한 경우

50초


하나 고려할 사항은 Parser나 Filter의 경우 ruby로 개발된 것이 있고, java로 개발된 것들이 있는데, ruby로 개발된 플러그인의 경우 성능이 java 대비 많이 느리기 때문에 가급적이면 java로 개발된것을 사용하도록 한다.


다양한 데이타 소스와 저장소가 지원이 되고, 설정이 매우 간단하며 간단한 포맷 변환등이 지원되는 만큼, 쉽고 빠르게 데이타 연동 파이프라인을 구축하는데 활용도가 매우 높다. 이와 유사한 솔루션으로는 fluentd등이 있는데, fluentd는 조금 더 실시간 즉 스트리밍 데이타에 초점이 맞춰져 있으며, Embulk는 배치성 분석에 맞춰져 있다.


참고 자료


그리드형