블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 

Apache airflow


조대협 (http://bcho.tistory.com)

배경

빅데이타 분석이나, 머신러닝 코드를 만들다 보면 필요한것중에 하나가 여러개의 태스크를 연결해서 수행해야 할 경우가 있다. 데이타 베이스의 ETL 작업과 비슷한 흐름이라고 보면 된다.


예를 들어 머신러닝의 학습 과정을 보면 데이타 전처리,학습,배포,예측과 같은 단계를 가지게 된다.


  • rawdata를 읽어서 preprocessing 단계를 거쳐서 학습에 적절한 training data로 변경하고,

  • 변경된 training data를 가지고 머신러닝 모델을 학습한후, 학습된 모델을 저장한다.

  • 학습된 모델을 가지고 예측을 해서 결과를 저장한다.


이렇게 머신러닝은 여러개의 단계를 거쳐서 수행이 되는데, 각 단계가 끝나면 다음 단계를 수행해야 한다. 단순하게 CRON+쉘로 순차적으로 수행하는 것등이 가능하지만, 에러가 났을때 재처리를 하거나 , 수행 결과에 따라 분기를 하는 등 조금 더 구조화된 도구가 필요하다.

데이타 워크 플로우 관리 도구

이런 요구 사항 때문에 여러가지 툴이 개발되었는데, 대표적인 도구로는 하둡 에코시스템에 우지(oozie ) 등이 있다.



<그림. Oozie eclipse 클라이언트 >


하둡의 여러 에코 시스템 솔루션들을 유기적으로 조합하기 위해서 개발된 도구로, 하둡 에코 시스템에 있는 여러가지 다양한 솔루션과 연동하기 위한 아답터를 가지고 있다.

이외에도 rundeck, luigi와 같은 유사한 솔루션들이 있다.

오늘 소개하고자 하고자하는 데이타 워크 플로우 관리도구는 아파치 오픈소스 airflow 이다. 원래 airbnb에서 개발된 도구로 현재 아파치 오픈소스에서 인큐베이터 단계에 있는 소프트웨어이다.


airflow를 소개하는 이유는 첫번째 파이썬 기반으로 태스크 코드를 작성할 수 있기 때문에, 데이타 분석이나 머신러닝을 개발하는 엔지니어들에게 익숙한 언어이고, 한대에서 동작하는게 아니라 여러 머신에 분산하여 수행 될 수 있는 장점을 가지고 있다.



<그림. Apache airflow 의 작업 그래프 구조 화면 >

airflow 시작하기

그러면 간단하게 airflow에 대한 개념과 사용법에 대해서 알아보자

airflow 설치

airflow는 실행되는 작업의 상태등을 저장하기 위해서 데이타 베이스 (MySQL이나 Postgres)등이 필요하며, 분산 환경을 위해서 여러대에 설치할 수 있다. 또한 로컬 환경에 sqlite와 함께 간단하게 설치할 수 있다. 여기서는 간단하게 개인 맥북환경에 로컬로 설치 및 실행하는 시나리오로 설명한다.


설치 방법은 매우 간단하다. 파이썬 2.7 환경에서 아래와 같이 간단학 “pip install airflow”만 실행해주면 된다.

%pip install airflow



airflow가 설치되었으면 데이타 베이스 설정을 해줘야 하는데, 이 튜토리얼에서는 개발 및 테스트를 위해서 sqlite를 사용한다. sqlite를 초기화 하기 위해서 다음과 같이 “airflow initdb” 명령을 실행한다.

% airflow initdb


자아 이제 설치가 끝나고 airflow를 사용할 준비가 되었다. 이제 airflow 웹콘솔을 기동해보자

“airflow webserver -p 8080” 을 실행하고 웹에 http://localhost:8080에 접근하면 airflow 콘솔을 볼 수 있다.

airflow 코드

airflow에서 워크플로우를 저장하기 위해서 몇가지 추상화된 개념을 사용한다.

Airflow DAG의 구조

DAG (Directed Acyclic Graph)

DAG는 하나의 워크 플로우라고 보면 된다. 위의 예제처럼, 머신러닝 이라는 DAG를 정의한다면, Preprocessing,Training,Prediction 워크플로우가 하나의 DAG가 된다.

Operator and Task

Operator는 DAG안에서 정의되는 작업 함수(함수임을 주의하자) 이야기 하는데, Pre processing, Training, Prediction 함수가 Operatorator 이다.

이 Operator 함수가 DAG 상에서 실제로 워크플로우 상에 정의되서 호출 되면 이것이 Task 이다.

객체지향 언어에서 Operator가 class 라면, Task는 object 라고 보면 된다.


이해가 잘안될 수 있는데, 코드를 보자


from airflow import DAG

from airflow.operators.bash_operator import BashOperator

from airflow.operators.dummy_operator import DummyOperator

from airflow.operators.python_operator import PythonOperator

from datetime import datetime,timedelta


dag = DAG('hello-airflow',description='Hello airflow DAG',

         schedule_interval = '*/5 0 * * *',

         start_date=datetime(2017,07,01),catchup=False)


def print_hello():

   return 'Hello Airflow'


python_task = PythonOperator(

                   task_id='python_operator',

                   python_callable = print_hello,

                   dag = dag)


bash_task = BashOperator(

       task_id='print_date',

       bash_command='date',

       dag=dag)


bash_task.set_downstream(python_task)


DAG 정의 부분을 보자. DAG 객체는 DAG에 대한 전체 컨택스를 저장 및 유지 관리한다.

DAG('hello-airflow',description='Hello airflow DAG', 에서 DAG를 이름을 ‘hello-airflow’로 정의하고 description에 설명을 적는다.

schedule_interval = '*/5 * * * *', 다음으로 이 DAG가 실행되는 주기를 정해야 하는데, cron 명령과 같은 노테이션으로 정의한다. 위 설정은 매 5분마다 실행되도록 하는 설정이다.

마지막으로, start_date=datetime(2017,07,01), ,DAG를 언제부터 시작할것인지 지정한다. DAG는 반드시 전역 변수로 지정한다. DAG안에서 다른 DAG를 부르는 sub DAG의 경우에는 지역 변수로 지정이 가능하다.


다음 task에 사용할 operator를 정의하는데, 파이썬 코드를 실행할 오퍼레이터인 PythonOperator와 쉘 커맨드를 실행할 BashOperator를 가지고 각각 파이썬 태스크 python_task와, 쉘 태스크 bash_task를 정의한다.


python_task = PythonOperator(

                   task_id='python_operator',

                   python_callable = print_hello,

                   dag = dag)


파이썬 태스크의 id는 “python_operator”라고 지정하였고, 실행시 print_hello를 호출하도록 하였다.

그리고 이 태스크는 DAG인 dag에 지정한다.


다음 쉘 태스크의 내용은 다음과 같다.

bash_task = BashOperator(

       task_id='print_date',

       bash_command='date',

       dag=dag)


print_data라는 이름으로 태스크를 정의하고, 쉘 명령어 date를 실행하도록 하였다.

등록

코드 작성이 끝나면 코드를 배포해보자. Dag 파일을 airflow에 등록해야 하는데, dag 파일을 저장하는 장소는 dags_folder 라는 변수로 $AIRFLOW_HOME/airflow.cfg 파일안에 정의 되어 있다. 디폴트 장소는 $AIRFLOW_HOME/dags/ 폴더이다. 위에서 작성한 코드를 해당 디렉토리에 복사하자

다음 dag이 제대로 등록되었는지를 확인한다. 커멘드 창에서 “airflow list_dags”라는 명령을 수행하면 현재 등록되어 있는 DAG 목록을 볼 수 있다. 아래 그림과 같이 hello-airflow dag가 등록된것을 확인할 수 있다.




hello-airflow dag안에 어떤 태스크들이 정의되어 있는지를 확인하려면 ‘airflow list_tasks hello-airflow’ 명령을 이용하면 hello-airflow 안에 등록된 태스크 목록을 출력해준다.


테스트

테스트를 하려면 태스크 단위로 테스트가 가능하다. airflow test {DAG ID} {태스크 ID} {실행날짜} 식으로 하면 된다.

, 예를 들어 print_date 태스크를 2017-07-01을 기준으로 실행하고자하면 airflow test hello-airflow print_date 2017-07-01

Hello-airflow DAG안에 print_date라는 태스크를 실행한다.



실행

DAG 코드 개발 등록과 테스트가 완료되었으면 이제 airflow scheduler 를 띄워준다. (일종의 데몬이다.) 스케쥴러는 DAG 코드에 정의된 스케쥴에 따라서 테스크를 실행해준다.

스케쥴러 실행은 간단하게 airflow scheduler 명령을 실행하면 된다.



스케쥴러가 실행되면, 각 DAG의 스케쥴에 따라서 자동으로 태스크들을 수행한다.


로그 모니터링

스케쥴러에 의해서 실행되는 DAG와 태스크들의 결과와 로그는 어떻게 모니터링 할까? airflow에 의해서 수행되는 태스크들은 $AIRFLOW_HOME/logs 디렉토리에 저장된다.

logs 디렉토리 아래에 각각 DAG 이름으로 저장이 되며, DAG 이름으로 된 디레토리안에는 태스크명으로 된 서브 디렉토리가 있고, 이 서브 디렉토리 아래에 시간대별 로그가 있다.

즉 hello-airflow DAG의 print_date 태스크에 대한 로그는 $AIRFLOW_HOME/logs/hello-airflow/print_date/{날짜및시간} 파일 명으로 저장된다.

웹 콘솔을 이용한 모니터링

airflow의 강력한 기능중의 하나는 웹 기반의 모니터링 콘솔을 제공한다. 뒤에서는 주요 웹 콘솔의 주요 기능에 대해서 알아보도록 한다.

Graph View

Graph View는 DAG의 구조를 그래프 형태로 보여주는 뷰이다.


복잡한 워크플로우의 경우 그 구조를 파악하는데 유용한다. 위의 그림은 앞서 만든 hello-airflow 에 대한 태스크간 그래프로 print_date를 호출한 후에, python_operator 태스크를 호출하는 것을 볼 수 있다.

Tree View


트리뷰를 보면, DAG의 구조를 트리 형태로 보여주고, DAG의 태스크가 각각 성공했는지 실패 했는지를 우측 그래프 처럼 표현해준다. 각 태스크를 로그를 보려면 각 태스크 실행 결과 그래프를 누르면 아래와 같이 세부 메뉴가 나온다.



여기서 View Log를 누르면 각 Task 별로 실행 당시의 로그를 볼 수 있다. 아래는 Python_Operator 태스크를 실행한 로그이다.



아래서 두번째 줄을 보면 Hello Airflow 라는 문자열을 리턴한것을 확인할 수 있다.


Task Duration

Task duration은 DAG에서 수행된 각 태스크의 수행 시간을 그래프 형태로 나타내준다.



어떤 태스크가 시간이 많이 걸리는지 그리고 수행시간이 매번 수행할때 마다 올바른지 (큰 변화가 없고 일정한지. 이건 매우 유용할듯) 등을 체크할 수 있다.

Task Tries


Task Tries 에서는 각 수행별로 각각의 태스크를 수행한 횟수를 그래프로 보여준다. 즉 재시도 (RETRY)횟수를 모니터링할 수 있다.


Gantt


Gantt 차트는 각 수행에 대해서 태스크들의 수행 순서에 따라서 소모된 시간과 함께 간트 차트로 표시해준다.

앞의 차트에서 이미 얻을 수 있는 뷰이지만, 각 태스크의 수행 순서와 태스크당 시간을 한꺼번에 보여주기 때문에 병목 구간 파악이 쉽다.


<그림 airflow gantt chart 그래프 예제 (출처 : https://www.agari.com/airflow-agari/) >


이미 링크드인의 Azkaban이나, 스포티파이의 Luigi, 하둡의 Oozie 등 여러가지 워크 플로우 관리 시스템이 있지만, 아직 인큐베이터 단계인 airflow를 주목하는 이유는 분산 환경 지원이 가능하고, 태스크에 대한 스크립트를 파이썬을 사용할 수 있기 때문에, 각종 빅데이타 분석 시스템이나 라이브러리 그리고 머신러닝 시스템과 연동이 쉽고, 파이썬 언어만 알면 쉽게  정교한 플로우 개발이 가능하기 때문에, ( XML등의 설정을 하지 않고도) 활용 가능성이 높다.

노트7의 소셜 반응을 분석해 보았다. 


#3 제플린 노트북을 이용한 상세 분석



조대협 (http://bcho.tistory.com)



데이타 스튜디오는 편리하게 사용할 수 있지만, 쿼리 사용등이 불가능하기 때문에, 원본 데이타를 이용한 상세 분석에는 어려움이 있다. 원본 데이타를 이용해서 상세 분석을 하려면 노트북 계열의 애플리케이션이 효과적인데, 빅쿼리를 연동할 수 있는 노트북으로는 이전에 소개한 주피터 노트북 기반의 데이타랩 (datalab)과, 스파크나 다른 빅데이타 솔루션과 함께 많이 사용되는 제플린 노트북(zeppelin.apache.org) 이 있다.


지난 글에서 데이타랩에 대한 연동 방법은 이미 소개하였으니, 이번에는 제플린을 통하여, 빅쿼리의 데이타를 분석해보도록 한다.


제플린 설치

제플린을 설치 하는 방법은 간단하다. Zeppelin.apache.org 에서, 설치 파일을 다운로드 받는다.

빅쿼리 연동 인터프리터는 제플린 버전 0.61 버전 이상에 포함되어 있기 때문에, 0.61 버전 이상을 다운로드 받는다.  이 때 모든 인터프리터가 포함된 버전을 다운 받아야 한다. (아니면 별도로 인터프리터를 설치해야 하는 번거로움이 따른다.)


다운 로드 받은 파일의 압축을 푼다. 다음으로 제플린 설치 디렉토리로 들어가서 다음 명령어를 수행한다.

% ./bin/zeppelin.sh

윈도우의 경우에는 %./bin/zeppelin.cmd 를 실행하면 된다.

자바 애플리케이션이기 때문에 별도의 설치 과정이 필요없고, 제플린 애플리케이션을 실행하기만 하면 된다.

제플린이 기동되었으면 브라우져에서 http://localhost:8080 으로 접속하면 다음과 같이 제플린 콘솔을 볼 수 있다.

노트북 생성

제플린 콘솔에 들어왔으면 초기화면에서 Create new note 라는 메뉴를 이용하여 새로운 노트북을 생성하자. 여기서는 편의상 “BQ 노트북" 이라는 이름으로 노트북을 생성하였다.


분석 쿼리 작성

이제 분석할 내용은 수집된 트윗의 명사들에 대해서, 시간 단위로 그룹핑을 한 다음에, 각 단어에 대해서 발생한 횟수를 카운트해서 보여주는 내용을 구현하려고 한다.

예를 들어서 9월20일에는 “유행" 이라는 단어가 200회 발생하였고, “패션" 이라는 단어가 100회 발생하였다. 라는 식으로 조회를 하려고 한다.


현재 테이블 구조는 다음과 같다.

Date (발생 시간)

Noun (명사)

count (발생 횟수)


SQL 문장을 작성해보자

select date,noun,sum(count) from 테이블명

group by date,noun


이렇게 쿼리를 하면, 시간대 별로, 명사와 그 명사의 발생 횟수를 리턴을 해주는데, 우리는 앞의 데이타 플로우 프로그램에서 30초 단위로 통계를 집계하도록 하였기 때문에, 30초 단위로 결과가 리턴된다. 우리가 원하는 결과는 30초 단위가 아니라 1시간 단위의 결과 이기 때문에, 다음과 같이 쿼리를 수정한다.


select  DATE(date) as ddate,HOUR(date) as dhour,noun,sum(count) from 테이블명

group by ddate,dhour,noun


DATE와 HOUR라는 함수를 사용하였는데, DATE는 타임 스탬프 형태의 컬럼에서 날짜만을 추출해주는 함수 이고, HOUR는 타임 스탬프 형태의 컬럼에서 시간만을 추출해주는 함수 이다.

이렇게 날짜와 시간만을 추출한 다음에, group by 절을 이용하여, 날짜와,시간 그리고 명사로 그룹핑을 하게 되면 우리가 원하는 것과 같이 각 날짜의 시간대별로 명사별 발생횟수 ( sum(count)) 값의 통계를 얻을 수 있다.


제플린에서 빅쿼리 명령을 수행하려면 다음과 같이 %bigquery.sql 이라고 첫줄에 선언을 한 다음에 SQL 문장을 수행하면 된다.




결과는 디폴트로 테이블 형태로 나오는데, 아래 아이콘 중에서 그래프 아이콘을 누르면 그래프 형태로 볼 수 가 있는데, 이 때 X,Y축의 변수를 지정할 수 있다.

아래 그림과 같이 Keys (X축을) ddate,dhour를 선택하고 Values(Y축)을 dhour SUM을 선택하면, 시간별 나타난 단어수를 볼 수 있다.



그런데 이 쿼리를 수행하면, 각 시간별로 발생한 명사 단어의 수가 매우 많기 때문에, 보기가 매우 어렵다.

그렇다면 시간대별로 발생한 명사중에서 각 시간대별로 많이 발생한 명사 5개씩만을 볼 수 없을까? 즉 group by를 전체 데이타 구간이 아니라, 각각 시간대 별로 계산을 해줄 수 는 없을까 하는 필요가 발생한다.

빅쿼리 파티셔닝

데이타를 구간 별로 나눠서 연산할 수 있는 기능으로 빅쿼리에는 파티션이라는 기능이 있다.

예를 들어서 group by를 전체 결과에 대해 그룹핑을 하는 것이 아니라, 앞에서 언급한 요건 처럼 일 단위로 짤라서 그룹핑을 하는 것이 가능하다.




파티션을 이용해서 할 수 있는 것은 파티션별로 합계나, 통계를 내거나, 파티션의 각 로우의 값의 백분율(%)나 또는 소팅한 순서등을 볼 수 있다. 여기서는, 시간으로 파티션을 나누고  파티션내에서 명사의 수가 많은 수 순서대로 소팅을 한후에, RANK라는 함수를 이용하여 그 파티션에서 그 명사가 몇번째로 많이 나타났는지를 출력하도록 해보겠다.


파티션의 사용법은 다음과 같다.

“파티션 함수 OVER (PARTITION BY 파티션을할 키 목록)”

여기서는 일/시간 별로 파티션을 나눈 후에, 그 순위별로 소팅을 할 것이기 때문에, 다음과 같은 식을 쓴다.

RANK() OVER (PARTITION BY ddate,dhour ORDER BY ncount DESC  ) as rank


이를 쿼리에 적용하면 다음과 같다.

   SELECT

       DATE(date) as ddate,HOUR(date) as dhour

       ,noun

       ,sum(count) as ncount

       , RANK() OVER (PARTITION BY ddate,dhour ORDER BY ncount DESC  ) as rank

   FROM [useful-hour-138023:twitter.noun]

   group by noun,ddate,dhour

   order by ddate,dhour,ncount desc


그러면 다음과 같이 일/날짜 파티션별로 많이 발생한 명사 순으로 발생횟수와 순위(rank)를 출력해준다.



그런데 쿼리를 돌려보면 알겠지만, 시간대별로 수집한 명사의 종류가 많기 때문에, 일자별 데이타가 매우 많다. 그래서 파티션별로 많이 등장하는 단어 5개만을 보려고 하면 rank <5 인것만 걸러내면 된다. 이는 중첩 쿼리를 이용해서 수행이 가능하다

다음은 이를 적용한 예이다.


SELECT ddate,dhour

   ,noun

   , rank

from (

   SELECT

       DATE(date) as ddate,HOUR(date) as dhour

       ,noun

       ,sum(count) as ncount

       , RANK() OVER (PARTITION BY ddate,dhour ORDER BY ncount DESC  ) as rank

   FROM [useful-hour-138023:twitter.noun]

   where noun != "note7" and noun != "samsung" and noun !="galaxy"

   group by noun,ddate,dhour

   order by ddate,dhour,ncount desc

   )

where rank < 6


이렇게 하면, 각 시간대별로 자주 등장하는 단어 6개만을 보여준다.


이 쿼리를 이용하여 데이타를 어떻게 분석하는지는 예전글 http://bcho.tistory.com/1136 을 참고하세요.


간단하게나마 트위터 피드에서 특정 키워드를 기반으로 하여, 명사와 형용사를 추출하여 소셜 반응을 분석하는 애플리케이션 개발과 데이타 분석 방법에 대해서 설명하였다.

아이폰7을 분석해보니, 명사 분석도 의미가 있었지만, 아이폰7에 대한 기대를 형용사 분석을 통해서도 많은 인사이트를 얻을 수 있었다. Awesome, excellent와 같은 기대치가 높은 형용사가 많이 검출되었고 bad, fuck 과 같은 부정적인 의미의 형용사는 다소 낮게 검출되었다. (아마 이즈음이 노트7 폭발로 인하여 반사 이익을 얻은게 아닐까 추정되는데.)


이외에도, 이모콘티만 추출하여 분석을 한다거나, 부사등을 통해서 분석을 하는 것도 가능하고, 구글 자연어 처리 API는 글을 통해서 사람의 감정을 분석해주는 기능도 있기 때문에 응용 분야는 훨씬 더 넓다고 볼 수 있다.

람다 아키텍쳐의 소개와 해석

조대협 (http://bcho.tistory.com)


람다 아키텍쳐란

람다 아키텍쳐는 트위터에서 스트리밍 컴퓨팅에 있었던Nathan Marz에 의해서 소개된 아키텍쳐로, 실시간 분석을 지원하는 빅데이타 아키텍쳐이다.

아키텍쳐에 대한 자세한 내용은 http://lambda-architecture.net/ 에 소개되어 있다.


문제의 정의

아키텍쳐에 대한 이해를 돕기 위해서 예를 들어 설명해보자.

 페이스북과 SNS 애플리케이션 SNS가 있다고 가정하자. 이 애플리케이션은 모바일 애플리케이션이며, 글쓰기, 읽기, 댓글 달기, 스크롤 하기, 페이지 넘기기등 약 1000여개의 사용자 이벤트가 있다고 가정하자.

 사용자 수는 대략 1억명이며, 매일 이 각 사용자의 행동 패턴을 서버에 저장하여, 일별로, 사용자 이벤트의 개수를 통계로 추출한다고 하자.

클라이언트 디바이스로 부터 올라오는 데이타는 다음과 같다

  • 사용자 : 조대협

  • 날짜 : 2015년 1월 5일



<그림 1. 클라이언트에서 올라오는 데이타 포맷>

이런 환경에서, 기간별 특정 이벤트 추이, 가장 많이 활용되는 이벤트 TOP5 등의 통계 정보를 실시간으로 보고 싶다고 가정하자

가장 단순한 접근은 RDBMS에 저장하고 쿼리를 수행하는 방법이다.


<그림 2. 로그 데이타를 RDBMS에 저장한 포맷>

RDBM에 저장하고 SQL 쿼리문을 돌리면 되겠지만, 문제는 간단하지 않다. 1000개의 컬럼에, 1억명이 사용하는 시스템이다. 즉. 하루에 최대 1000개의 컬럼 짜리, 1억개의 레코드가 생성이 된다것이다.한달이면 30억개의 레코드이다.

이런 많은 데이타를 동적 SQL로 실행하였을때 그 수행시간이 많이 걸린다.


배치를 활용

그러면 이런 시간이 많이 걸리는 문제를 어떻게 해결하면 좋을까? 이를 위한 전통적인 접근 방식은 배치(BATCH)를 활용하는 것이다. 배치는, 어떤 특정 정해진 시간에, 계산을 미리 해놓는 것이다.

즉 데이타를 모아 놓았다가.밤마다.그날짜의 사용자들의 이벤트들의 합을 매일 계산해놓은 테이블을 만들어 놓으면 된다.



<그림 3. 일별 배치로 생성된 이벤트 데이타 테이블>

자아, 이렇게 배치로 테이블을 만들어 놓으면, 특정 기간에 각 이벤트별 통계를 내기가 쉬워 진다. 1년분의 데이타라하더라도 365 행 밖에 되지 않기 때문에, 속도 문제가 해결이 된다.

실시간 데이타의 반영

테이블 조인

이렇게 배치 테이블을 생성하면, 성능에 대한 문제는 해결이 되지만, 데이타가 배치 주기에 따라 최대 1일의 편차를 두게 된다. 즉 실시간 반영에 대한 문제가 발생한다.

그렇다면 어떻게 해결을 해야 할까? 해결은 배치 테이블과 그날의 데이타 테이블을 두개를 같이 사용하면 된다.

즉 어제까지의 데이타는 일별 배치로 생성된 테이블을 사용하고, 오늘 데이타 부분은 사용자별로 기록된 로그 테이블을 사용하여 두 테이블을 조인 하면, 오늘의 지금 순간의 통계값까지 볼 수 있다.

 


<그림 4. 테이블 조인을 이용한 실시간 데이타 통계 추출 >


실시간 집계 테이블의 활용

하루에 쌓이는 데이타량이 얼마 되지 않는다면 문제가 되지 않겠지만, 이 시나리오에서 하루에 쌓이는 데이타는 일 최대 1억건이 된다. 즉, 오늘 쌓이는 데이타 테이블을 조인 하면 1억개의 행에 대한 연산이 발생하여 적절한 성능을 기대하기 어렵다. 

그렇다면, 배치는 매일 돌리되, 오늘 데이타에 대한 통계 값을 실시간으로 업데이트 하는 방법을 생각해볼 수 있다. 

아래 그림과 같이 로그서버에서 클라이언트에서 받은 로그를 원본 데이타 테이블에 계속 저장을 하고, 오늘 통계에 대한 실시간 집계 테이블에, 글쓰기, 글 읽기 등 개별 이벤트의 값을 계산해서 더해 주면 된다.

 


<그림 5. 실시간 집계 테이블>

이렇게 하면, 실시간 집계 테이블과, 배치 테이블을 조인하여 빠르게 실시간 통계를 볼 수 있다.

즉 일별 실시간 통계는 다음 그림과 같이 당일전의 배치뷰와 당일의 실시간뷰를 합쳐서 통계를 낸 형태가 된다.

 


<그림 6. 실시간 통계를 뽑기 위한 테이블들의 관계>


람다 아키텍쳐를 활용

이 개념을 람다 아키텍쳐로 해석해보자. 데이타 흐름을 도식화 해보면 다음과 같다.

 


<그림 7. 람다 아키텍쳐의 개념>


먼저 배치 처리를 위해서, 로그 서버는 모든 로그 데이타를 저장소에 저장하고, 배치 처리 계층에서 일일 또는 일정한 시간을 주기로 배치 처리로 계산을 해서 배치 뷰(배치 테이블)을 만든다.

그리고 다른 흐름으로 실시간 처리쪽에 데이타를 전송해서 실시간 집계를 해서 실시간 집계 테이블을 만든다.

마지막으로, 이 두개의 뷰를 합쳐서 통계를 만든다.

배치뷰는 배치로 돌때만 쓰기가 가능하고 평상시에는 데이타를 읽기만 가능하게 한다. 이를 통해서 데이타가 변경되거나 오염(Corrupt)되는 것을 막을 수 있다.

실시간 뷰는 실시간으로 데이타를 쓰고, 읽을 수 있는 시스템을 사용한다.

위의 문제 정의 예제에서는 컬럼의 개수를 카운트 정도하는 간단한 예를 들었지만, 실제 빅데이타 분석에서는 단순 통계뿐 아니라 복잡한 수식이나 다단계를 거쳐야 하는 데이타 파일의 가공이 필요하기 때문에 복잡한 프로그래밍이 가능한 처리(배치/실시간)이 필요한데, 이 처리 계층에는 프로그램을 이용하려 알고리즘을 삽입할 수 있어야 한다.

이러한 특성에 맞춰서 각 데이타 처리 흐름에 솔루션을 맵핑 해보면 다음과 같다.



<그림 8. 람다 아키텍쳐에 대한 솔루션 맵핑> 


저장소는 대량의 데이타를 저비용으로, 안정성 있게 (유실이 없게) 저장할 수 있는 것이 필요하다. 그리고 이런 대량의 데이타를 배치로 처리할 때 되도록이면 빠른 시간내에 복잡한 알고리즘을 적용해서 계산할 수 있는 계층이 필요한데, 이러한 솔루션으로 제시되는 솔루션이 하둡의 HDFS(Hadoop File System)과 하둡의 MR (Map & Reduce)이다.

이렇게 계산된 배치 데이타를 저장할 장소가 필요한데, 하둡에서는 이런 데이타를 저장하고 고속으로 액세스할 수 있도록 HBase라는 NoSQL을 제공한다.

실시간 처리는 복잡한 알고지즘을 빠르게 데이타를 처리할 수 있는 솔루션이 필요한데, 대표적으로 Apache Storm등이 있으며, 빠른 읽기와 쓰기를 지원해야 하기 때문에, Redis와 같은 In-memory 기반의 NoSQL이 적절하게 추천되고 있다.

일반적으로 람다 아키텍쳐를 소개할때, 제안되는 솔루션의 형태이기는 하나, 람다 아키텍쳐는 특정 솔루션을 제안하는 아키텍쳐이기 보다는 데이타의 처리 기법을 소개하는 솔루션에 종속성이 없는 레퍼런스 아키텍쳐이다.

그래서 다른 솔루션 조합을 고려해볼 수 있는데, Dr.dobbs (http://www.drdobbs.com/database/applying-the-big-data-lambda-architectur/240162604)

에 소개된 솔루션 조합과 필자가 추천하는 조합을 추가해서 보면 다음과 같다.


<그림 9. 람다 아키텍쳐의 솔루션 조합>


여기서,필자가 Dr.Dobbs의 추천 솔루션 이외에, 배치 뷰와 실시간 뷰 쪽에, RDBMS를 추가하였는데, 배치뷰에 추가한 Amazon RedShift의 경우 아마존 클라우드 서비스에서 제공되는 Postgres 기반이 서비스로, 최대 16PB(페타바이트)까지의 용량을 지원한다. 이미 빅데이타라고 부를만큼의 충분한 데이타 사이즈를 지원할 뿐더라, RDBMS 기반의 SQL을 이용하여 유연한 데이타 조회가 가능하며, 리포트를 출력하기 위한 기존의 BI 툴과도 호환이 잘되서 많은 개발에 관련된 부분을 덜 수 있다. 실제로 통계 리포팅에서 가장 많은 시간이 소요되는 작업이, 비즈니스쪽 요구에 맞는 리포트를 만드는 작업이다.어떤 테이블과 그래프를 이용해서 데이터에 대한 의미를 보여줄 지는 단순한 리포팅 작업이라고 치부하기에는 매우 중요한 작업이며, 다양한 비즈니스 요건에 맞는 뷰를 보여 주기 위해서는 BI툴과의 연동은 많은 장점을 제공한다.

위에서 설명한 람다 아키텍쳐를 계층(Layer)로 나눠서 소개 하면 다음 그림과 같다.

실시간 데이터를 처리하는 부분을 스피드 레이어라고 부르며, 배치 처리는 배치 저장소와 배치 처리 부분을 배치 레이어라고 명명하고, 배치에 의해서 처리된 요약 데이터를 제공하는 부분을 서빙 레이어(Serving Layer)라고 한다.

 


<그림 10. 계층별로 추상화된 람다 아키텍쳐>

배치 레이어의 의미

배치 레이어의 저장소에는 가공전의 원본 데이터를 모두 저장한다. 데이터가 처리된 후에도 저장소에 데이터를 삭제 하지 않는다.

이렇게 원본 데이터를 저장함으로써, 배치 뷰의 데이터가 잘못 계산되었거나, 유실 되었을때, 복구가 가능하고, 현재 데이터 분석에서 없었던 새로운 뷰(통계)를 제공하고자 할 때 기존의 원본 데이터를 가지고 있음으로써, 기존 데이터에 대해서도 새로운 뷰의 통계 분석이 가능하다.


람다 아키텍쳐의 재구성

RDBMS를 활용한 유연성 증대 방안

이러한 람다 아키텍쳐는 대용량 데이터 처리와 실시간 정보 제공을 위한 장점을 가지고 있음에도 불구하고 대부분 하둡이나 NOSQL등의 솔루션을 조합해서 구현하는 경우가 대부분이기 때문에, 유연성 측면에서 문제점을 가지고 있다.

예를 들어 배치 뷰를 HBase를 사용하고, 실시간 뷰를 Redis를 사용할 경우, 상호 솔루션간 데이터 조인이 불가능할 뿐더러, 인덱스나 조인,그룹핑, 소팅 등이 어렵다. 이러한 기능이 필요하다면 각각 배치 처리와 실시간 처리 단계에 추가적으로 로직을 추가해서 새로운 뷰를 만들어야 한다.

쉽게 설명하면, 일반적인 NoSQL은 키-밸류 스토어의 개념을 가지고 있다.

그래서, 위의 그림3과 같은 테이블이 생성되었다 하더라도, 특정 컬럼 별로 데이터를 소팅해서 보여줄수 가 없다. 만약 소팅된 데이터를 표현하고자 한다면, 소팅이 된 테이블 뷰를 별도로 생성해야 한다.

참고 : NoSQL 데이터 모델링 패턴

http://bcho.tistory.com/665 , http://bcho.tistory.com/666

그래서 이런 문제점을 보강하기 위해서는 위에서도 잠깐 언급하였듯이 실시간 뷰와 배치 뷰 부분을 RDBMS를 사용하는 것을 고려해볼 수 있다. 쿼리에 특화된 OLAP 데이터 베이스를 활용하는 방법도 있고, 또는 HP Vertica 등을 활용할 수 있다. (HP Vertica는 SQL을 지원하지만, 전통적인RDBMS가 데이터를 행 단위로 처리하는데 반하여, Vertica는 데이터를 열 단위로 처리해서 통계나 쿼리에 성능이 매우 뛰어나다. 유료이지만 1테라까지는 무료로 사용할 수 있으니 뷰 테이블 용도 정도로 사용하는데는 크게 문제가 없다.)


데이터 분석 도구를 이용한 새로운 분석 모델 개발

분석 통계 데이터를 제공하다 보면, 저장소에 저장된 원본 데이터를 재 분석함으로써 추가적인 의미를 찾아낼 수 있는데, 이 영역은 데이터 과학자의 영역으로, 저장소에 있는 데이터를 통해서 새로운 데이터 모델을 추출해 내는 방식이다.

예를 들어, 글읽기 이벤트와 글쓰기 이벤트간의 상관 관계를 파악해내거나, 요일별 이벤트 변화량등을 분석해낼 수 있는데,

  1. 이 저장소에 R이나 MetLab과 같은 데이터 분석 도구를 이용하여, 샘플(표본) 데이터를 추출해서 데이터의 상관 관계를 파악해보고,

  2. 이러한 분석을 통해서 새로운 통계 모델을 설계하고 검증해볼 수 있다.

  3. 만약 이러한 모델이 적절하다면 알고리즘을 구현하고 이를 빅데이타 엔지니어에게 넘겨 준다.

  4. 빅데이타 엔지니어는 데이터 과학자에게서 받은 알고리즘을 람다 아키텍쳐의 각 레이어에 배치된 솔루션에 알맞은 형태로 구현한다.


 


<그림 11. 새로운 데이터 모델의 개발>

이러한 과정의 반복을 통해서, 분석 시스템은 지속적으로 발전되어가면서 데이터에 대한 더 많은 인사이트를 제공할 수 있게 된다.


결론

간단하게나마 람다 아키텍쳐에 대해서 알아보았다.

람다 아키텍쳐는 꼭 빅데이타에 적용하거나, 또는 하둡을 이용해야 하는 아키텍쳐가 아니다. RDBMS나 CSV 파일 등, 어떤 데이터 형태라도 기본은 배치를 이용한 집계 테이블과 실시간 뷰 테이블을 조인한다는 개념이기 때문에, 솔루션에 억메이지 말고, 적절한 시나리오를 찾아서 적용할 수 있도록 하면 좋겠다.


참고 : 

http://www.drdobbs.com/database/applying-the-big-data-lambda-architectur/240162604

http://www.infoq.com/articles/lambda-architecture-scalable-big-data-solutions