블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 

빅데이타 수집을 위한 데이타 수집 솔루션 Embulk 소개


조대협 (http://bcho.tistroy.com)


빅데이타 분석에 있어서, 아키텍쳐적으로 중요한 모듈중의 하나는 여러 서버로 부터 생성되는 데이타를 어떻게 모을 것인가이다. 얼마전에, 일본의 사례를 보다가 눈에 띄는 솔루션이 있어서 주말을 통해서 이런 저런 테스트를 해봤다.


Embulk 소개

Embulk라는 솔루션인데, fluentd를 만들었던 사람이 만들었다고 한다.

여러 종류의 데이타 소스에서 데이타를 읽어서 로딩을 할 수 있다. 주요 특징을 보면

  • 플러그인 형태로 여러개의 소스와 타겟을 지원한다.
    jRuby로 개발이 되어서 ruby gem을 이용하여 손쉽게 플러그인을 설치할 수 있다.

  • 병렬 로딩이 가능하다.
    예를 들어 여러개의 파일을 동시에 로딩하거나 또는 하나의 큰 파일이라도 자동으로 여러개의 파일로 쪼게서 병렬로 로딩을 함으로써, 로딩 속도를 올릴 수 있다.

  • 변환이 가능하다.
    파일 포맷 변환뿐 아니라, 각 필드에 대한 형 변환 그리고, 간단한 필드 맵핑 등이 가능하다.

  • 스키마 예측 (Schema guessing)
    입력 데이타를 보고, 자동으로 입력 데이타의 스키마(테이블 구조)를 예측한다. 일일이 설정을 하려면 귀찮은 일인데, 자동으로 스키마를 인식해주시기 때문에, 설정양을 줄일 수 있다.

전제적인 개념은 미니 ETL과 유사하다고 볼 수 있는데, 그 사용법이 매우 쉽다.

Embulk 설치

이 글에서는 로컬에 있는 CSV 포맷의 파일을 구글 클라우드의 빅쿼리로 로딩하는 예제를 통해서 어떻게 Embulk를 사용하는지를 알아보겠다.

VM 생성

테스트는 구글 클라우드 VM에서 진행한다. 4코어 Ubuntu VM을 생성하고 테스트 데이타를 복사하였다.

VM을 생성할때, 빅쿼리 API를 사용할 것이기 때문에, Cloud API access scopes에 BigQuery API access 권한을 반드시 부여해야 한다.


이 예제에서는 VM 생성시 모든 Cloud API에 대한 사용권한을 부여한체 생성하였다. VM을 생성한 후에, 콘솔에서 VM 상세 정보를 확인해보면 위의 그림과 같이 “This instance has full API access to all Google Cloud services.”로, 모든  구글  클라우드 API에 대한 권한을 가지고 있는 것을 확인할 수 있다.

자바 설치

구글 Ubuntu VM에는 디폴트로 자바가 설치되어있지 않기 때문에, JVM을 설치한다.

% sudo apt-get update

% sudo apt-get install default-jre

Embulk 설치

JVM 설치가 끝났으면 Embulk를 설치해보자. 다음 명령어를 실행하면 Embulk 가 설치된다.

% curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
% chmod +x ~/.embulk/bin/embulk
% echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
% source ~/.bashrc

Embulk는 ~/.embulk 디렉토리에 설치가 된다.

다음으로, 빅쿼리에 결과를 쓸 예정이기 때문에, 빅쿼리 Output 플러그인을 설치한다.

%embulk gem install embulk-output-bigquery


Embulk 로 빅쿼리에 CSV 파일 로딩하기

로딩할 데이타 살펴보기

로딩에 사용한 데이타는 게임 이벤트에 대한 데이타를 시뮬레이션 해놓은 것으로, 사용자가 NPC를 만나서 전투를 하는 각각의 이벤트를 기록해놓은 파일이다. 파일이름은 events000000000001 CSV 파일 포맷이고 총 1220395 레코드에, 243 MB의 크기이며 데이타 포맷은 다음과 같다.


파일 포맷은 다음과 같다.


eventTime,userId,sessionId,sessionStartTime,eventId,npcId,battleId,firstLogin,playerAttackPoints,playerHitPoints,playerMaxHitPoints,playerArmorClass,npcAttackPoints,npcHitPoints,npcMaxHitPoints,npcArmorClass,attackRoll,damageRoll,currentQuest

2015-11-29 01:31:10.017236 UTC,user875@example.com,688206d6-adc4-5e60-3848-b94e51c3707b,2015-11-29 01:29:20.017236 UTC,npcmissedplayer,boss15,6e4232df-26fa-22f1-fa04-465e85b34c1e,,15,3,15,15,15,15,15,15,11,,15

:


첫줄에, CSV 파일에 대한 컬럼명이 들어가고 두번째 줄 부터, “,” delimiter를 이용하여 각 컬럼을 구별하여 실 데이타가 들어가 있다.

스키마 예측을 통하여 자동으로 Config 파일 생성하기

이제, Embulk를 통해서 이 파일을 로딩하기 위해서, config 파일을 생성해보자.

Embulk에서 config 파일은 스키마 자동 예측을 통해서 자동으로 생성해낼 수 있다. Config 파일을 생성하기 위해서는 input과 output 에 대한 기본 정보를 기술해줘야 하는데, 다음과 같이 seed.yml 파일에 기본 정보를 기술한다.


in:  

 type: file  

 path_prefix: "/home/terrycho/data/events"

out:  

 type: bigquery


path_prefix에는 파일명을 정의하는데, /home/terrycho/data/events는 /home/terrycho/data/ 디렉토리내에 events*로 시작하는 모든 파일에 대해서 로딩을 하겠다는 정의이다.


seed.yml 파일 설정이 끝났으면 config 파일을 생성해보자

% embulk guess ./seed.yml -o config.yml

명령을 실행하면 아래와 같이 config.yml 파일이 생성된다.


in:

 type: file

 path_prefix: /home/terrycho/data/events

 parser:

   charset: UTF-8

   newline: CRLF

   type: csv

   delimiter: ','

   quote: '"'

   escape: '"'

   trim_if_not_quoted: false

   skip_header_lines: 1

   allow_extra_columns: false

   allow_optional_columns: false

   columns:

   - {name: eventTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: userId, type: string}

   - {name: sessionId, type: string}

   - {name: sessionStartTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: eventId, type: string}

   - {name: npcId, type: string}

   - {name: battleId, type: string}

   - {name: firstLogin, type: string}

   - {name: playerAttackPoints, type: long}

   - {name: playerHitPoints, type: long}

   - {name: playerMaxHitPoints, type: long}

   - {name: playerArmorClass, type: long}

   - {name: npcAttackPoints, type: long}

   - {name: npcHitPoints, type: long}

   - {name: npcMaxHitPoints, type: long}

   - {name: npcArmorClass, type: long}

   - {name: attackRoll, type: long}

   - {name: damageRoll, type: long}

   - {name: currentQuest, type: long}

out: {type: bigquery}


생성된 config.yml 파일을 보면 firstLogin 컬럼의 데이타 형이 string으로 되어 있는 것을 볼 수 있다. 빅쿼리 테이블에서 이 필드의 형은 실제로는 boolean이다. 아무래도 자동 인식이기 때문에, 이렇게 형들이 다르게 인식되는 경우가 있기 때문에, 생성 후에는 반드시 검토를 하고 알맞은 형으로 수정을 해줘야 한다.


다음으로 위의 파일에 데이타를 로딩할 빅쿼리에 대한 정보를 정의해줘야 한다.


in:

 type: file

 path_prefix: /home/terrycho/data/events000000000001

 parser:

   charset: UTF-8

   newline: CRLF

   type: csv

   delimiter: ','

   quote: '"'

   escape: '"'

   trim_if_not_quoted: false

   skip_header_lines: 1

   allow_extra_columns: false

   allow_optional_columns: false

   columns:

   - {name: eventTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: userId, type: string}

   - {name: sessionId, type: string}

   - {name: sessionStartTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: eventId, type: string}

   - {name: npcId, type: string}

   - {name: battleId, type: string}

   - {name: firstLogin, type: boolean}

   - {name: playerAttackPoints, type: long}

   - {name: playerHitPoints, type: long}

   - {name: playerMaxHitPoints, type: long}

   - {name: playerArmorClass, type: long}

   - {name: npcAttackPoints, type: long}

   - {name: npcHitPoints, type: long}

   - {name: npcMaxHitPoints, type: long}

   - {name: npcArmorClass, type: long}

   - {name: attackRoll, type: long}

   - {name: damageRoll, type: long}

   - {name: currentQuest, type: long}

out:

 type: bigquery

 mode: append

 auth_method: compute_engine

 project: useful-hour-138023

 dataset: gamedata

 table: game_event

 source_format: CSV


“out:” 부분을 위와 같이 수정하였다.

mode는 append 모드로, 기존 파일에 데이타를 붙이는 모드로 하였다. auth_method에는 빅쿼리 API 호출을 위한 인증 방식을 정의하는데, 구글 클라우드의 VM에서 호출하기 때문에, compute_engine이라는 인증 방식을 사용하였다. (구글 클라우드의 VM에서 같은 프로젝트 내의 빅쿼리 API를 호출할 경우에는 별도의 인증을 생략할 수 있다.) 다른 인프라드에서 호출할 경우에는 IAM에서 Service account를 생성한 후에, json  파일을 다운 받아서, json 파일 인증 방식으로 변경하고, 다운 로드 받은 json 파일을 지정해주면 된다.

다음으로, project,dataset,table에, 로딩할 빅쿼리 데이블에 대한 프로젝트명, 데이타셋명, 테이블명을 기술해주었다. 그리고 마지막으로 입력 포맷이 CSV임을 source_format에서 CSV로 정의하였다.


이제 데이타 로딩을 위한 모든 준비가 끝났다.

Config 파일 테스트

데이타 로딩을 하기 전에, 이 config 파일이 제대로 작동하는지 테스트를 해보자

%embulk preview config.yml

의 명령어는 데이타를 읽어서 제대로 파싱을 하는지 설정 파일은 문제가 없는지 테스트를 해주는 명령어이다.

명령을 실행하면 다음과 같이 일부 데이타를 읽어서 파싱을 하고 결과를 보여주는 것을 볼 수 있다.



실행하기

테스트가 끝났으면 실제로 데이타를 로딩해보자. 로딩은 아래와 같이 embulk run 명령어를 사용하면 된다.

%embulk run config.yml

실제로 실행한 결과 약 12분이 소요되었다.


멀티 쓰레드를 이용하여 로딩 속도 올리기

앞에서 설명하였듯이, Embulk는 패레럴 로딩이 지원된다. 아래와 같이 config.yml 파일에 exec이라는 부분에, max_threads수와, min_output_tasks 수를 정해주면 되는데, min_output_tasks 수는 최소로 동시 실행할 로딩 테스크 수이다. 5로 정했기 때문에, 이 시나리오에서는 하나의 CSV 파일을 업로드 하기 때문에, 이 파일을 5개의 작은 파일로 잘라서 동시에 5개의 쓰레드로 동시에 업로딩 한다.


exec:

 max_threads: 20

 min_output_tasks: 5

in:

 type: file

 path_prefix: /home/terrycho/data/events

 parser:

 :


실제로 테스트한 결과 디폴트 설정에서는 초당 약 1200줄을 업로드하였는데, 반하여, min_output_tasks를 5개로 하였을때는 초당 2000개 내외를 업로드 하였다. min_output_tasks를 10개,20개로 올려봤으나 성능은 비슷하였다. (아마 튜닝을 잘못한듯)

Parser-none으로 로딩 속도 올리기

앞의 시나리오는 데이타 라인을 각각 읽어서 컬럼을 일일이 파싱하고 이를 입력하도록 하는 시나리오였다.

만약에 CSV나 JSON 입력 파일이 빅쿼리 입력 포맷에 맞도록 이미 포매팅이 되어있다면, 일일이 파싱할 필요가 없다.

그냥 파일을 읽어서 파싱 없이 바로 빅쿼리에 insert만 하면되기 때문에, 이 경우에는 Parser를 제거하면 되는데, Parsing을 하지 않는 Parser로 embulk-parser-none이 있다.

이 Parser 다음과 같이 설치한다.

$ embulk gem install embulk-parser-none

다음 config 파일을 다음과 같이 수정한다.


in:

 type: file

 path_prefix: /home/terrycho/data/events000000000001_nohead

 parser:

   type: none

   column_name: payload

out:

 type: bigquery

 mode: append

 auth_method: compute_engine

 project: useful-hour-138023

 dataset: gamedata

 schema_file: /home/terrycho/data/gameevent.schema.json

 table: game_event

 payload_column_index: 0


이때 중요한것중 하나는 데이타 파일 (CSV)파일 첫줄에 데이타에 대한 컬럼 정보가 들어가 있으면 안된다.

그래서 아래와 같이 원본 데이타 파일에서 첫줄을 지운다.

eventTime,userId,sessionId,sessionStartTime,eventId,npcId,battleId,firstLogin,playerAttackPoints,playerHitPoints,playerMaxHitPoints,playerArmorClass,npcAttackPoints,npcHitPoints,npcMaxHitPoints,npcArmorClass,attackRoll,damageRoll,currentQuest

2015-11-29 01:31:10.017236 UTC,user875@example.com,688206d6-adc4-5e60-3848-b94e51c3707b,2015-11-29 01:29:20.017236 UTC,npcmissedplayer,boss15,6e4232df-26fa-22f1-fa04-465e85b34c1e,,15,3,15,15,15,15,15,15,11,,15

:


다음 embulk run을 이용하여 이 config 파일을 실행해보면 같은 데이타인데도 로딩 타임이 약 50초 정도 밖에 소요되지 않는 것을 확인할 수 있다.

빅쿼리 관련 몇가지 추가 옵션

이외에도 다양한 옵션이 존재하기 때문에, 빅쿼리 output 플러그인 페이지인 https://github.com/embulk/embulk-output-bigquery 를 참고하기 바란다.

자동으로 중복을 제거하는 기능이나, 로딩할때 마다 동적으로 빅쿼리 테이블을 생성하는 기능등이 있으니 반드시 참고하기 바란다.

GCS를 경유하는 업로딩

Embulk의 패레럴 로딩이 좋기는 하지만 의외의 문제가 발생할 수 있는 부분이 하나가 있는데, 하나의 파일을 로딩하는데 Embulk는 여러개의 태스크로 병렬 처리를 하기 때문에, 빅쿼리 입장에서는 각각의 태스크가 빅쿼리 로딩 JOB으로 인식이 될 수 있다. 일반적으로 빅쿼리 JOB은 하루에 10,000개만 실행할 수 있는 제약을 가지고 있다. 그래서 만약에 데이타 로딩이 많을 경우 이런 병렬 로딩은 JOB 수를 깍아 먹는 원인이 될 수 있는데, bigquery output 플러그인에서는 다음과 같은 해법을 제공한다.


빅쿼리로 데이타를 로딩할때 GCS (Google Cloud Storage)를 사용하여, 와일드카드 (*)를 사용할 경우에는 하나의 디렉토리에 있는 여러 파일을 병렬로 로딩할 수 있으며, 이때 와일드 카드를 사용한 JOB은 하나의 JOB으로 인식된다. (병렬로 여러 파일을 로딩하더라도)


그래서 out 옵션에 다음과 같이 GCS  관련 옵션을 설정해주면 파일을 직접 로컬에서 로딩하는 것이 아니라, 처리를 다 끝난 파일을 GCS 버킷으로 업로딩한 후에, GCS 버킷에서 로딩을 하게 되기 때문에, JOB수를 줄일 수 있다.


out:

 type: bigquery

 gcs_bucket: bucket_name

 auto_create_gcs_bucket: false


성능과 활용도에 대한 분석

각 시나리오에 대한 성능 테스트 결과 값은 다음과 같다.

CSV를 구글에서 제공되는 bq load 명령어를 이용해도 108초가 나오는데 반해서, non-parser를 이용하면 파일을 자동으로 쪼게서 보내기 때문에, bq load를 이용하여 하나의 파일로 업로드 하는 것보다 높은 성능이 나온다.


시나리오

성능

bq load 명령어를 이용한 로딩

108초

CSV 파서를 사용한 경우

12분

non parser를 사용한 경우

50초


하나 고려할 사항은 Parser나 Filter의 경우 ruby로 개발된 것이 있고, java로 개발된 것들이 있는데, ruby로 개발된 플러그인의 경우 성능이 java 대비 많이 느리기 때문에 가급적이면 java로 개발된것을 사용하도록 한다.


다양한 데이타 소스와 저장소가 지원이 되고, 설정이 매우 간단하며 간단한 포맷 변환등이 지원되는 만큼, 쉽고 빠르게 데이타 연동 파이프라인을 구축하는데 활용도가 매우 높다. 이와 유사한 솔루션으로는 fluentd등이 있는데, fluentd는 조금 더 실시간 즉 스트리밍 데이타에 초점이 맞춰져 있으며, Embulk는 배치성 분석에 맞춰져 있다.


참고 자료


분산 로그 수집기 Fluentd 소개


조대협 (http://bcho.tistory.com)



요즘 들어 빅데이타 분석 관련 기술들을 보다보니, 역시나 여러 데이타 소스에서 데이타를 수집해 오는 부분이 여러 데이타 소스를 커버해야 하고, 분산된 여러 서버에서 데이타를 수집해야 하는 만큼 수집 컴포넌트의 중요성이 점점 더 올라가는 것 같다.

그래서 요즘 빅데이타를 위한 데이타(및 로그) 수집 플랫폼을 보고 있는데, 예전 Flume 등 여러 로그 수집 솔루션이 있었는 것에 비해서 조금 정리된 느낌이라고나 할까?  Scribed, Fluentd 그리고 ELK (Elastic Search + Logstash + Kibana 조합)에서 사용되는 Logstash등이 있는데, 대부분 Fluentd와 Logstash로 수렴 되는 것 같다. 양쪽 모두 오픈소스이고 별도의 엔터프라이즈 라이센스 정책을 가지고 있다.

Logstash는 아키텍쳐 적응에 대한 유연성과 연동 솔루션에 대한 호환성을 강조하고 있기 때문에 타 솔루션과 연동이 강하고 반면, Fluentd는 아키텍쳐의 단순성과 이를 기반으로 한 안정성을 초점을 두고 있다. 그래서 아키텍쳐 구성이나 설정이 간단하다.


이 글에서는 Fluentd에 대한 간략한 개념과 사용 방법에 대해서 알아보도록 하겠다.

Fluentd를 이용한 로그 수집 아키텍쳐

Fluentd를 이용한 로그 수집 아키텍쳐를 살펴보면 다음과 같다.

아래 그림과 같이 각 서버에, Fluentd를 설치하면, 서버에서 기동되고 있는 서버(또는 애플리케이션)에서 로그를 수집해서 중앙 로그 저장소 (Log Store)로 전송 하는 방식이다.




위의 그림은 가장 기본적인 구조로 Fluentd가 로그 수집 에이전트 역할만을 하는 구조인데, 이에 더해서 다음과 같이 각 서버에서 Fluentd에서 수집한 로그를 다른 Fluentd로 보내서 이 Fluentd가 최종적으로 로그 저장소에 저장하도록 할 수 도 있다.


중간에 fluentd를 넣는 이유는, 이 fluentd가 앞에서 들어오는 로그들을 수집해서 로그 저장소에 넣기 전에 로그 트래픽을 Throttling (속도 조절)을 해서 로그 저장소의 용량에 맞게 트래픽을 조정을 할 수 있다.

또는 다음 그림과 같이 로그를 여러개의 저장소에 복제해서 저장하거나 로그의 종류에 따라서 각각 다른 로그 저장소로 라우팅이 가능하다.


Fluentd 내부 구조

Fluentd를 이용해서 로그 수집 아키텍쳐를 구성하는 방법을 대략적으로 알아보았는데, 그렇다면 Fluentd 자체의 구조는 어떻게 되어 있을까?

Fluentd는 크게 다음 그림과 같이 Input,Parser,Engine,Filter,Buffer,Ouput,Formatter 7개의 컴포넌트로 구성이 된다.  7개의 컴포넌트중 Engine을 제외한 나머지 6개는 플러그인 형태로 제공이 되서 사용자가 설정이 가능하다.

일반적인 데이타 흐름은 Input → Engine → Output 의 흐름으로 이루어 지고,  Parser, Buffer, Filter, Formatter 등은 설정에 따라서 선택적으로 추가 또는 삭제할 수 있다.



Input

Input은 로그를 수집하는 플러그인으로, 다양한 로그 소스를 지원한다. HTTP, tail, TCP 등 기본 플러그인 이외에도, 확장 플러그인을 통해서 다양한 서버나 애플리케이션으로 부터 다양한 포맷의 데이타를 수집할 수 있도록 해준다.

Parser (Optional)

Input 플러그인을 통해서 데이타를 읽어도 데이타 포맷이 Fluentd에서 지원하지 않는 데이타 포맷인 경우가 있기 때문에, 이 데이타를 파싱 하기 위해서, Parser 플러그인을 선택적으로 사용할 수 있다. Regular expression  기반으로 스트링을 Parsing 하는 플러그인 뿐 아니라, apache, nginx, syslog등 다양한 포맷의 데이타를 파싱할 수 있는 플러그인을 제공한다.

Filter (Optional)

Filter 플러그인을 읽어드린 데이타를 output으로 보내기 전에, 다음과 같은 3가지 기능을 한다.

  • 필터링

  • 데이타 필드 추가

  • 데이타 필드 삭제 또는 특정 필드 마스킹

필터링은 특정 데이타만 output 필드로 보내고, 나머지는 버리도록 한다. 예를 들어 로그 데이타에 “seoul”이라는 문자열이 있을 경우만 로그 서버로 보내거나 “error”, “warning”과 같은 특정 패턴이 있을 경우에만 로그 저장소로 보내도록할 수 있다.


데이타 필드 추가는 기존 들어온 로그 데이타에 데이타를 전송한 서버명 (Host명)등을 추가해서 로그 저장소로 보낼 수 있다.

마지막으로 데이타 필드 삭제는 불필요한 필드를 삭제하거나 개인 정보등 민감 정보를 삭제하거나 해쉬화하여 데이타 저장소로 보낼 수 있는 기능을 한다.

Output

Output은 Input 플러그인과 반대로, 앞에서 필터링된 데이타를  데이타 저장소 솔루션에 데이타를 저장하도록 한다. (mongodb나 AWS S3 , Google의 Big query등)

Formatter (Optional)

Output 플러그인을 통해서 데이타를 저장소에 쓸 때, Formatter 를 이용하면 쓰는 데이타의 포맷을 정의할 수 있다.(cf. Input의 parser가 포맷에 맞게 읽는 플러그인이라면, Formatter는 Output을 위한 포맷을 지정하는 플러그인이라고 보면 된다.)

Buffer (Optional)

Input에서 들어온 데이타를 바로 Output으로 보내서 쓰는것이 아니라 중간에 선택적으로 Buffer를 둬서 Throttling을 할 수 있다. 버퍼는 File과  Memory 두가지를 사용할 수 있다.

간단하게 구조와 작동 원리를 보면 다음과 같다.



<그림. fluentd의 로그 writing 흐름>

원본  http://docs.fluentd.org/articles/buffer-plugin-overview


버퍼에는 로그데이타를 분리하는 tag 단위로 chunk가 생성이 된다.

chunk는 태그별 큐라고 보면 된다. 예를 들어 error, info, warning, user 와 같이 태그를 분리하면 error 로그는 error chunk에 저장이 되고, info 로그는 info chunk에 저장된다.

Chunk에 데이타가 쌓여서 buffer_chunk_limit 만큼 chunk가 쌓여서 full이 되거나, 또는 설정값에 정의된 flush_interval 주기가 되면 로그 저장소로 로그를 쓰기 위해서 Queue에 전달이 된다.


<그림. Memory buffer 설정 예제>

참고 : http://docs.fluentd.org/articles/buffer-plugin-overview


다음 Queue에서는 데이타를 읽어서 로그 저장소에 데이타를 쓰는데, 로그 저장소에 문제가 없다면 바로 로그가 써지겠지만, 네트워크 에러나 로그 저장소 에러로 로그를 쓰지 못할때는 retry_wait 시간 만큼 대기를 한 후에, 다시 쓰기를 시도한다. 다시 쓰기를 실패하면 전에 기다린 시간의 2배 만큼, 또 실패하면 또 2배만큼을 기다린다. (1초, 2초, 4초,8초…) 다시 쓰기 시도는 설정값에 지정된 retry_limit 횟수까지 계속 진행한다.

만약에 Queue 가 차버렸을때 처리에 대한 정책을 설정할 수 있는데, “exception”과, “block” 모드 두가지고 있고, exception 모드일 경우에는 BufferQueueLimitError 를 내도록 하고, block 모드의 경우에는 BufferQueueLimitError가 해결될때 까지, input plugin을 중지 시킨다 (더이상 로그를 수집하지 않는다는 이야기).

Queue가 차버렸을때 다른 처리 방법으로는 큐가 다 찾을때, Sencondary output을 지정해서, 다른 로그 저장소에 로그를 저장하는 방법이 있다. 예를 들어 로그를 mongodb에 저장하도록 했는데, mongodb 나 네트워크 장애로 로그를 쓸 수 없는 경우에는 secondary output을 AWS S3로 지정해놓고, S3로 로그를 일단 저장하게 하고 나중에 mongodb가 복구된 후에, S3에서 다시 mongodb로 로그를 수집하는 방식을 취할 수 있다.



<그림. Secondary output 설정 예제>

출처 : http://docs.fluentd.org/articles/buffer-plugin-overview


Buffer 플러그인과, 에러 처리에 대한 자세한 내용은 http://docs.fluentd.org/articles/buffer-plugin-overview 를 참고하기 바란다.


데이타 구조

다음으로 Fluentd가 내부적으로 어떻게 로그 데이타를 핸들링 하는지 데이타 구조를 살펴보면 다음과 같다.



출처 :http://pt.slideshare.net/frsyuki/fluentd-set-up-once-collect-more


데이타는 크게 3가지 파트로 구성된다. Time, tag, record

  • Time : 로그데이타의 생성 시간

  • Record : 로그 데이타의 내용으로 JSON형태로 정의된다.

  • Tag : 이게 가장 중요한데, 데이타의 분류이다. 각 로그 레코드는 tag를 통해서 로그의 종류가 정해지는데, 이 tag에 따라서 로그에 대한 필터링,라우팅과 같은 플러그인이 적용 된다.

간단한 테스트

테스트 환경은 맥북을 기준으로 하였다.

http://docs.fluentd.org/articles/install-by-dmg 를 따라서 테스트를 하면 되는데, 먼저 fluentd를 받아서 인스톨을 한다.

인스톨이 끝나면, fluentd 프로세스인 td-agent는 /opt/td-agent/usr/sbin/에 인스톨이 된다.

그리고 디폴트 설정 파일은 /etc/td-agent/td-agent.conf에 저장된다.

td-agent.conf의 내용을 보면 다음과 같다.



<ROOT>

 <match td.*.*>

   type tdlog

   apikey xxxxxx

   auto_create_table

   buffer_type file

   buffer_path /var/log/td-agent/buffer/td

   <secondary>

     type file

     path /var/log/td-agent/failed_records

     buffer_path /var/log/td-agent/failed_records.*

   </secondary>

 </match>

 <match debug.**>

   type stdout

 </match>

 <source>

   type forward

 </source>

 <source>

   type http

   port 8888

 </source>

 <source>

   type debug_agent

   bind 127.0.0.1

   port 24230

 </source>

</ROOT>



<source> 부분을 보면 type이 http, port가 8888인 정의가 있다. 이 정의는 http://localhost:8888 로 부터 로그를 수집하겠다는 정의이다.

다음 <match>부분을 보면 <match debug.**> 라는 정의로 태그가 debug.**  로 정의된 로그에 대해서 type stdout으로, stdout (화면)으로 바로 출력하겠다는 정의이다.

http://localhost:8888/{debug.**} 로 들어오는 요청에 대해서 stdout으로 로그를 출력하겠다는 설정이다.


설정 파일을 확인했으면, 이제 기동을 해보자

/opt/td-agent/usr/sbin 디렉토리에서 -c 옵션으로 설정 파일을 지정하고 td-agent를 다음과 같이 실행해보자

% ./td-agent -c /etc/td-agent/td-agent.conf


에이전트가 실행되었으면 curl 명령을 이용하여 http://localhost:8888/debug.test 로 {"json":"message"} 로그 문자열을 전송해보자

% curl -X POST -d 'json={"json":"message"}' http://localhost:8888/debug.test


다음은 실행 결과 이다.





다음과 같이 td-agent가 기동된 후에,  맨 아랫줄에 debug.test 라는 태그 이름으로 {“json”:”message”}라는 로그가 수집되어 출력된것을 볼 수 있다.

데몬으로 실행하기

앞에서는 CLI상에서 foreground로 실행을 하였는데, 맥에서 서비스로 백그라운드 작업으로 실행을 할 수 있다. 실행 방법은

%sudo launchctl load /Library/LaunchDaemons/td-agent.plist

를 실행하면 백그라운드로 실행된다. 백그라운드로 실행을 위한 스크립트인 td-agent.plist는 fluentd설치시  /Library/LaunchDaemons/td-agent.plist에 자동 생성된다.

백그라운드 작업이기 때문에, stdout이 없고 stdout으로 출력되는 로그는 /var/log/td-agent/td-agent.log로 확인할 수 있다.





실행중인 프로세스를 종료 하는 방법은

%sudo launchctl unload /Library/LaunchDaemons/td-agent.plist

를 사용하면 된다.


다음 글에는 실제로 fluentd 를 설정해서 Google의 Bigquery또는 큐로 로그를 전달하는 설정 방법에 대해서 알아보겠다.