블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 

Apache Beam (Dataflow)를 이용하여, 이미지 파일을 tfrecord로 컨버팅 하기


조대협 (http://bcho.tistory.com)



개요

텐서플로우 학습에 있어서 데이타 포맷은 학습의 성능을 결정 짓는 중요한 요인중의 하나이다. 특히 이미지 파일의 경우 이미지 목록과 이미지 파일이 분리되어 있어서 텐서플로우에서 학습시 이미지 목록을 읽으면서, 거기에 있는 이미지 파일을 매번 읽어야 하기 때문에, 코딩이 다소 지저분해지고,IO 성능이 떨어질 수 있다

텐서플로우에서는 이러한 학습 데이타를 쉽게 읽을 수 있도록 tfrecord (http://bcho.tistory.com/1190)라는 파일 포맷을 지원한다.


이 글에서는 이미지 데이타를 읽어서 tfrecord 로 컨버팅하는 방법을 설명하며, 분산 데이타 처리 프레임웍인 오픈소스 Apache Beam을 기준으로 설명하나, tfrecord 변환 부분은 Apache Beam과 의존성이 없이 사용이 가능하기 때문에, 필요한 부분만 참고해도 된다. 이 Apache Beam을 구글의 Apache Beam 런타임 (매니지드 서비스)인 구글 클라우드의 Dataflow를 이용하여, 클러스터를 이용하여 빠르게 데이타를 처리하는 방법에 대해서 알아보도록 한다.


전체 코드는 https://github.com/bwcho75/cifar-10/blob/master/pre-processing/4.%20Convert%20Pickle%20file%20to%20TFRecord%20by%20using%20Apache%20Beam.ipynb 에 있다.


이 코드는 CIFAR-10 이미지 데이타를 Apache Beam 오픈 소스를 이용하여, 텐서플로우 학습용 데이타 포맷인  tfrecord 형태로 변환 해주는 코드이다.


Apache Beam은 데이타 처리를 위한 프레임웍으로, 구글 클라우드 상에서 실행하거나 또는 개인 PC나 Spark 클러스터상 여러 환경에서 실행이 가능하며, 구글 클라우드 상에서 실행할 경우 오토스케일링이나 그래프 최적화 기능등으로 최적화된 성능을 낼 수 있다.


CIFAR-10 데이타 셋은 32x32 PNG 이미지 60,000개로 구성된 데이타 셋으로 해당 코드 실행시 최적화가 되지 않은 상태에서 약 16분 정도의 처리 시간이 소요된다. 이 중 6분 정도는 Apache Beam 코드를 구글 클라우드로 업로드 하는데 소요되는 시간이고 실제 처리시간은 10분정도가 소요된다. 전처리 과정에 Apache Beam을 사용하기 전에 고려해야 할 요소는 다음과 같다.

  • 데이타가 아주 많아서 전처리 시간이 수시간 이상 소요될 경우 Apache Beam + Google Cloud를 고려하여 여러 머신에서 동시에 처리하여 빠른 시간내에 수행되도록 할 수 있다.

  • 데이타가 그다지 많지 않고 싱글 머신에서 멀티 쓰레드로 처리를 원할 경우에는 Apache Beam으로 멀티 쓰레드 기반의 병렬 처리를 하는 방안을 고려할 수 있다. 이 경우 클라우드에 대한 의존성을 줄일 수 있다.

  • 다른 대안으로는 Spark/Hadoop 등의 오픈소스를 사용하여, On Prem에서 여러 머신을 이용하여 전처리 하는 방안을 고려할 수 있다.

여기서는 아주 많은 대량의 이미지 데이타에 대한 처리를 하는 것을 시나리오로 가정하였다.

전처리 파이프라인

Apache Beam을 이용한 데이타 전처리 파이프라인의 구조는 다음과 같다.

이미지 파일 준비

CIFAR-10 데이타셋 원본은 이미지 파일 형태가 아니라 PICKLE이라는 파일 포맷으로 되어 있기 때문에,  실제 개발 환경에서는 원본데이타가 이미지인것으로 가정하기 위해서 https://github.com/bwcho75/cifar-10/tree/master/pre-processing 의 1~2번 코드를 통해서 Pickle 파일을 이미지 파일로 변경하고, *.csv 파일에 {파일명},{레이블} 형태로 인덱스 데이타를 생성하였다.

생성된 이미지 파일과 *.csv 파일은 gsutil 명령어를 이용하여 Google Cloud Storage (aka GCS)에 업로드 하였다. 업로드 명령은 https://github.com/bwcho75/cifar-10/blob/master/pre-processing/2.%20Convert%20CIFAR-10%20Pickle%20files%20to%20image%20file.ipynb 에 설명되어 있다.


전처리 파이프라인의 구조

Apache Beam으로 구현된 파이프라인의 구조는 다음과 같다.


1. TextIO의 ReadFromText로 CSV 파일에서 한 라인 단위로 문자열을 읽는다.

2. parseLine에서 라인을 ,로 구분하여 filename과 label을 추출한다.

3. readImage 에서 filename을 가지고, 이미지 파일을 읽어서, binary array 형태로 변환한다.

4. TFExampleFromImageDoFn에서 이미지 바이너리와 label을 가지고 TFRecord 데이타형인 TFExample 형태로 변환한다.

5. 마지막으로 TFRecordIOWriter를 통해서 TFExample을 *.tfrecord 파일에 쓴다.

코드 주요 부분 설명

환경 설정 부분

이 코드는 구글 클라우드와 로컬 환경 양쪽에서 모두 실행이 가능하도록 구현되었다.

SRC_DIR_DEV는 로컬환경에서 이미지와 CSV 파일이 위치한 위치이고, DES_DIR_DEV는 로컬환경에서 tfrecord 파일이 써지는 위치이다.

구글 클라우드에서 실행할 경우 파일 저장소를  GCS (Google Cloud Storage)를 사용한다. DES_BUCKET은 GCS 버킷 이름이다. 코드 실행전에 반드시 구글 클라우드 콘솔에서 GCS 버킷을 생성하기 바란다.  SRC_DIR_PRD와 DES_DIR_PRD는 GCS 버킷내의 각각 image,csv 파일의 경로와 tfrecord 파일이 써질 경로 이다. 이 경로에 맞춰서 구글 클라우드 콘솔에서 디렉토리를 먼저 생성해 놓기를 바란다.




PROJECT는 구글 클라우드 프로젝트 명이고, 마지막으로 DEV_MODE가 True이면 로컬에서 수행이되고 False이면 구글 클라우드에서 실행하도록 하는 환경 변수이다.

의존성 설정 부분

로컬에서 실행할 경우필요한  파이썬 라이브러리가 이미 설치되어야 있어야 한다.

만약에 구글 클라우드에서 실행할 경우 이 Apache Beam 코드가 사용하는 파이썬 모듈을 명시적으로 정의해놔야 한다. 클라우드에서 실행시에는 Apache Beam 코드만 업로드가 되기 때문에(의존성 라이브러리를 같이 업로드 하는 방법도 있는데, 이는 추후에 설명한다.), 의존성 라이브는 구글 클라우드에서 Dataflow 실행시 자동으로 설치할 수 있도록 할 수 있는데, 이를 위해서는 requirements.txt 파일에 사용하는 파이썬 모듈들을 정의해줘야 한다. 다음은 requirements.txt에 의존성이 있는 파이썬 모듈등을 정의하고 저장하는 부분이다.


Apache Beam 코드

Apache Beam의 코드 부분은 크게 복잡하지 않기 때문에 주요 부분만 설명하도록 한다.

Service account 설정

Apache Beam 코드를 구글 클라우드에서 실행하기 위해서는 코드 실행에 대한 권한을 줘야 한다. 구글 클라우드에서는 사용자가 아니라 애플리케이션에 권한을 부여하는 방법이 있는데, Service account라는 것을 사용한다. Service account는 json 파일로 실행 가능한 권한을 정의하고 있다.

Service account 파일을 생성하는 방법은 http://bcho.tistory.com/1166 를 참고하기 바란다.

Service account 파일이 생성되었으면, 이 파일을 적용해야 하는데 GOOGLE_APPLICATION_CREDENTIALS 환경 변수에 Service account  파일의 경로를 정의해주면 된다. 파이썬 환경에서 환경 변수를 설정하는 방법은 os.envorin[‘환경변수명']에 환경 변수 값을 지정해주면 된다.

Jobname 설정

구글 클라우드에서 Apache Beam 코드를 실행하면, 하나의 실행이 하나의 Job으로 생성되는데, 이 Job을 구별하기 위해서 Job 마다 ID 를 설정할 수 있다. 아래는 Job ID를 ‘cifar-10’+시간 형태로 지정하는 부분이다


환경 설정

Apache Beam 코드를 구글 클라우드에서 실행하기 위해서는 몇가지 환경을 지정해줘야 한다.


  • staging_location은 클라우드 상에서 실행시 Apache Beam 코드등이 저장되는 위치이다. GCS 버킷 아래 /staging이라는 디렉토리로 지정했는데, 실행 전에 반드시 버킷아래 디렉토리를 생성하기 바란다.

  • temp_location은 기타 실행중 필요한 파일이 저장되는 위치이다. 실행 전에 반드시 버킷아래 디렉토리를 생성하기 바란다.

  • zone은 dataflow worker가 실행되는 존으로 여기서는 asia-northeast1-c  (일본 리전의 c 존)으로 지정하였다.


DEV_MODE 에 따른 환경 설정

로컬 환경이나 클라우드 환경에서 실행이냐에 따라서 환경 변수 설정이 다소 달라져야 한다.


디렉토리 경로를 바꿔서 지정해야 하고, 중요한것은 RUNNER인데, 로컬에서 실행하기 위해서는 DirectRunner를 구글 클라우드 DataFlow 서비스를 사용하기 위해서는 DataflowRunner를 사용하면 된다.


readImage 부분

Read Image는 이미지 파일을 읽어서 byte[] 로 리턴하는 부분인데, 로컬 환경이냐, 클라우드 환경이냐에 따라서 동작 방식이 다소 다르다.

클라우드 환경에서는 이미지 파일이 GCS에 저장되어 있기 때문에 파이썬의 일반 파일 open 명령등을 사용할 수 없다.

그래서 클라우드 환경에서 동작할 경우에는 GCS에서 파일을 읽어서 Worker의 로컬 디스크에 복사를 해놓고 이미지를 읽어서 byte[]로 변환한 후에, 해당 파일을 지우는 방식을 사용한다.


아래 코드에서 보면 DEV_MODE가 False 인경우 GCS에서 파일을 읽어서 로컬에 저장하는 코드가 있다.


storageClient는 GCS 클라이언트이고 bucket 을 얻어온후, bucket에서 파일을 get_blob 명령어를 이용하여 경로를 저장하여 blob.download_to_file을 이용하여 로컬 파일에 저장하였다.

실행

코드 작성이 끝났으면 실행을 한다. 실행 상태는 구글 클라우드 콘솔의 Dataflow  메뉴에서 확인이 가능하다.

아래와 같이 실행중인 그리고 실행이 끝난 Job 리스트들이 출력된다.




코드 실행중에, 파이프라인 실행 상황 디테일을 Job 을 선택하면 볼 수 있다.


여기서 주목할만한 점은 우측 그래프인데, 우측 그래프는 Worker의 수를 나타낸다. 초기에 1대로 시작했다가 오토 스케일링에 의해서 9대 까지 증가한것을 볼 수 있다.

처음 실행이었기 때문에 적정한 인스턴스수를 몰랐기 때문에 디폴트로 1로 시작하고 오토스케일링을 하도록 했지만, 어느정도 테스트를 한후에 적정 인스턴수를 알면 오토 스케일링을 기다릴 필요없이 디폴트 인스턴스 수를 알면 처음부터 그 수만큼 인스턴스 수로 시작하도록 하면 실행 시간을 줄일 수 있다.

만약에 파이프라인 실행시 에러가 나면 우측 상단에 LOGS 버튼을 누르면 상세 로그를 볼 수 있다.


아래 그림은 파이프라인 실행이 실패한 예에서 STACK TRACES를 통해서 에러 내용을 확인하는 화면이다.



해당 로그를 클릭하면 Stack Driver (구글의 모니터링 툴)의 Error Reporting 시스템 화면으로 이동하게 된다.

여기서 디테일한 로그를 볼 수 있다.

아래 화면을 보면 ReadImage 단계에서 file_path라는 변수명을 찾을 수 없어서 나는 에러를 확인할 수 있다.


TFRecord 파일 검증

파이프라인 실행이 끝나면, GCS 버킷에 tfrecord 파일이 생성된것을 확인할 수 있다.


해당 파일을 클릭하면 다운로드 받을 수 있다.

노트북 아래 코드 부분이 TFRecord를 읽어서 확인하는 부분이다. 노트북에서 tfrecord 파일의 경로를 다운로드 받은 경로로 변경하고 실행을 하면 파일이 제대로 읽히는 지 확인할 수 있다.


파일 경로 부분은 코드상에서 다음과 같다.



정상적으로 실행이 된 경우, 다음과 같이 tfrecord에서 읽은 이미지와 라벨값이 출력됨을 확인할 수 있다.


라벨 값은 Label 줄에 values 부분에 출력된다. 위의 그림에서는 순서대로 라벨 값이 4와 2가 된다.



Apache airflow


조대협 (http://bcho.tistory.com)

배경

빅데이타 분석이나, 머신러닝 코드를 만들다 보면 필요한것중에 하나가 여러개의 태스크를 연결해서 수행해야 할 경우가 있다. 데이타 베이스의 ETL 작업과 비슷한 흐름이라고 보면 된다.


예를 들어 머신러닝의 학습 과정을 보면 데이타 전처리,학습,배포,예측과 같은 단계를 가지게 된다.


  • rawdata를 읽어서 preprocessing 단계를 거쳐서 학습에 적절한 training data로 변경하고,

  • 변경된 training data를 가지고 머신러닝 모델을 학습한후, 학습된 모델을 저장한다.

  • 학습된 모델을 가지고 예측을 해서 결과를 저장한다.


이렇게 머신러닝은 여러개의 단계를 거쳐서 수행이 되는데, 각 단계가 끝나면 다음 단계를 수행해야 한다. 단순하게 CRON+쉘로 순차적으로 수행하는 것등이 가능하지만, 에러가 났을때 재처리를 하거나 , 수행 결과에 따라 분기를 하는 등 조금 더 구조화된 도구가 필요하다.

데이타 워크 플로우 관리 도구

이런 요구 사항 때문에 여러가지 툴이 개발되었는데, 대표적인 도구로는 하둡 에코시스템에 우지(oozie ) 등이 있다.



<그림. Oozie eclipse 클라이언트 >


하둡의 여러 에코 시스템 솔루션들을 유기적으로 조합하기 위해서 개발된 도구로, 하둡 에코 시스템에 있는 여러가지 다양한 솔루션과 연동하기 위한 아답터를 가지고 있다.

이외에도 rundeck, luigi와 같은 유사한 솔루션들이 있다.

오늘 소개하고자 하고자하는 데이타 워크 플로우 관리도구는 아파치 오픈소스 airflow 이다. 원래 airbnb에서 개발된 도구로 현재 아파치 오픈소스에서 인큐베이터 단계에 있는 소프트웨어이다.


airflow를 소개하는 이유는 첫번째 파이썬 기반으로 태스크 코드를 작성할 수 있기 때문에, 데이타 분석이나 머신러닝을 개발하는 엔지니어들에게 익숙한 언어이고, 한대에서 동작하는게 아니라 여러 머신에 분산하여 수행 될 수 있는 장점을 가지고 있다.



<그림. Apache airflow 의 작업 그래프 구조 화면 >

airflow 시작하기

그러면 간단하게 airflow에 대한 개념과 사용법에 대해서 알아보자

airflow 설치

airflow는 실행되는 작업의 상태등을 저장하기 위해서 데이타 베이스 (MySQL이나 Postgres)등이 필요하며, 분산 환경을 위해서 여러대에 설치할 수 있다. 또한 로컬 환경에 sqlite와 함께 간단하게 설치할 수 있다. 여기서는 간단하게 개인 맥북환경에 로컬로 설치 및 실행하는 시나리오로 설명한다.


설치 방법은 매우 간단하다. 파이썬 2.7 환경에서 아래와 같이 간단학 “pip install airflow”만 실행해주면 된다.

%pip install airflow



airflow가 설치되었으면 데이타 베이스 설정을 해줘야 하는데, 이 튜토리얼에서는 개발 및 테스트를 위해서 sqlite를 사용한다. sqlite를 초기화 하기 위해서 다음과 같이 “airflow initdb” 명령을 실행한다.

% airflow initdb


자아 이제 설치가 끝나고 airflow를 사용할 준비가 되었다. 이제 airflow 웹콘솔을 기동해보자

“airflow webserver -p 8080” 을 실행하고 웹에 http://localhost:8080에 접근하면 airflow 콘솔을 볼 수 있다.

airflow 코드

airflow에서 워크플로우를 저장하기 위해서 몇가지 추상화된 개념을 사용한다.

Airflow DAG의 구조

DAG (Directed Acyclic Graph)

DAG는 하나의 워크 플로우라고 보면 된다. 위의 예제처럼, 머신러닝 이라는 DAG를 정의한다면, Preprocessing,Training,Prediction 워크플로우가 하나의 DAG가 된다.

Operator and Task

Operator는 DAG안에서 정의되는 작업 함수(함수임을 주의하자) 이야기 하는데, Pre processing, Training, Prediction 함수가 Operatorator 이다.

이 Operator 함수가 DAG 상에서 실제로 워크플로우 상에 정의되서 호출 되면 이것이 Task 이다.

객체지향 언어에서 Operator가 class 라면, Task는 object 라고 보면 된다.


이해가 잘안될 수 있는데, 코드를 보자


from airflow import DAG

from airflow.operators.bash_operator import BashOperator

from airflow.operators.dummy_operator import DummyOperator

from airflow.operators.python_operator import PythonOperator

from datetime import datetime,timedelta


dag = DAG('hello-airflow',description='Hello airflow DAG',

         schedule_interval = '*/5 0 * * *',

         start_date=datetime(2017,07,01),catchup=False)


def print_hello():

   return 'Hello Airflow'


python_task = PythonOperator(

                   task_id='python_operator',

                   python_callable = print_hello,

                   dag = dag)


bash_task = BashOperator(

       task_id='print_date',

       bash_command='date',

       dag=dag)


bash_task.set_downstream(python_task)


DAG 정의 부분을 보자. DAG 객체는 DAG에 대한 전체 컨택스를 저장 및 유지 관리한다.

DAG('hello-airflow',description='Hello airflow DAG', 에서 DAG를 이름을 ‘hello-airflow’로 정의하고 description에 설명을 적는다.

schedule_interval = '*/5 * * * *', 다음으로 이 DAG가 실행되는 주기를 정해야 하는데, cron 명령과 같은 노테이션으로 정의한다. 위 설정은 매 5분마다 실행되도록 하는 설정이다.

마지막으로, start_date=datetime(2017,07,01), ,DAG를 언제부터 시작할것인지 지정한다. DAG는 반드시 전역 변수로 지정한다. DAG안에서 다른 DAG를 부르는 sub DAG의 경우에는 지역 변수로 지정이 가능하다.


다음 task에 사용할 operator를 정의하는데, 파이썬 코드를 실행할 오퍼레이터인 PythonOperator와 쉘 커맨드를 실행할 BashOperator를 가지고 각각 파이썬 태스크 python_task와, 쉘 태스크 bash_task를 정의한다.


python_task = PythonOperator(

                   task_id='python_operator',

                   python_callable = print_hello,

                   dag = dag)


파이썬 태스크의 id는 “python_operator”라고 지정하였고, 실행시 print_hello를 호출하도록 하였다.

그리고 이 태스크는 DAG인 dag에 지정한다.


다음 쉘 태스크의 내용은 다음과 같다.

bash_task = BashOperator(

       task_id='print_date',

       bash_command='date',

       dag=dag)


print_data라는 이름으로 태스크를 정의하고, 쉘 명령어 date를 실행하도록 하였다.

등록

코드 작성이 끝나면 코드를 배포해보자. Dag 파일을 airflow에 등록해야 하는데, dag 파일을 저장하는 장소는 dags_folder 라는 변수로 $AIRFLOW_HOME/airflow.cfg 파일안에 정의 되어 있다. 디폴트 장소는 $AIRFLOW_HOME/dags/ 폴더이다. 위에서 작성한 코드를 해당 디렉토리에 복사하자

다음 dag이 제대로 등록되었는지를 확인한다. 커멘드 창에서 “airflow list_dags”라는 명령을 수행하면 현재 등록되어 있는 DAG 목록을 볼 수 있다. 아래 그림과 같이 hello-airflow dag가 등록된것을 확인할 수 있다.




hello-airflow dag안에 어떤 태스크들이 정의되어 있는지를 확인하려면 ‘airflow list_tasks hello-airflow’ 명령을 이용하면 hello-airflow 안에 등록된 태스크 목록을 출력해준다.


테스트

테스트를 하려면 태스크 단위로 테스트가 가능하다. airflow test {DAG ID} {태스크 ID} {실행날짜} 식으로 하면 된다.

, 예를 들어 print_date 태스크를 2017-07-01을 기준으로 실행하고자하면 airflow test hello-airflow print_date 2017-07-01

Hello-airflow DAG안에 print_date라는 태스크를 실행한다.



실행

DAG 코드 개발 등록과 테스트가 완료되었으면 이제 airflow scheduler 를 띄워준다. (일종의 데몬이다.) 스케쥴러는 DAG 코드에 정의된 스케쥴에 따라서 테스크를 실행해준다.

스케쥴러 실행은 간단하게 airflow scheduler 명령을 실행하면 된다.



스케쥴러가 실행되면, 각 DAG의 스케쥴에 따라서 자동으로 태스크들을 수행한다.


로그 모니터링

스케쥴러에 의해서 실행되는 DAG와 태스크들의 결과와 로그는 어떻게 모니터링 할까? airflow에 의해서 수행되는 태스크들은 $AIRFLOW_HOME/logs 디렉토리에 저장된다.

logs 디렉토리 아래에 각각 DAG 이름으로 저장이 되며, DAG 이름으로 된 디레토리안에는 태스크명으로 된 서브 디렉토리가 있고, 이 서브 디렉토리 아래에 시간대별 로그가 있다.

즉 hello-airflow DAG의 print_date 태스크에 대한 로그는 $AIRFLOW_HOME/logs/hello-airflow/print_date/{날짜및시간} 파일 명으로 저장된다.

웹 콘솔을 이용한 모니터링

airflow의 강력한 기능중의 하나는 웹 기반의 모니터링 콘솔을 제공한다. 뒤에서는 주요 웹 콘솔의 주요 기능에 대해서 알아보도록 한다.

Graph View

Graph View는 DAG의 구조를 그래프 형태로 보여주는 뷰이다.


복잡한 워크플로우의 경우 그 구조를 파악하는데 유용한다. 위의 그림은 앞서 만든 hello-airflow 에 대한 태스크간 그래프로 print_date를 호출한 후에, python_operator 태스크를 호출하는 것을 볼 수 있다.

Tree View


트리뷰를 보면, DAG의 구조를 트리 형태로 보여주고, DAG의 태스크가 각각 성공했는지 실패 했는지를 우측 그래프 처럼 표현해준다. 각 태스크를 로그를 보려면 각 태스크 실행 결과 그래프를 누르면 아래와 같이 세부 메뉴가 나온다.



여기서 View Log를 누르면 각 Task 별로 실행 당시의 로그를 볼 수 있다. 아래는 Python_Operator 태스크를 실행한 로그이다.



아래서 두번째 줄을 보면 Hello Airflow 라는 문자열을 리턴한것을 확인할 수 있다.


Task Duration

Task duration은 DAG에서 수행된 각 태스크의 수행 시간을 그래프 형태로 나타내준다.



어떤 태스크가 시간이 많이 걸리는지 그리고 수행시간이 매번 수행할때 마다 올바른지 (큰 변화가 없고 일정한지. 이건 매우 유용할듯) 등을 체크할 수 있다.

Task Tries


Task Tries 에서는 각 수행별로 각각의 태스크를 수행한 횟수를 그래프로 보여준다. 즉 재시도 (RETRY)횟수를 모니터링할 수 있다.


Gantt


Gantt 차트는 각 수행에 대해서 태스크들의 수행 순서에 따라서 소모된 시간과 함께 간트 차트로 표시해준다.

앞의 차트에서 이미 얻을 수 있는 뷰이지만, 각 태스크의 수행 순서와 태스크당 시간을 한꺼번에 보여주기 때문에 병목 구간 파악이 쉽다.


<그림 airflow gantt chart 그래프 예제 (출처 : https://www.agari.com/airflow-agari/) >


이미 링크드인의 Azkaban이나, 스포티파이의 Luigi, 하둡의 Oozie 등 여러가지 워크 플로우 관리 시스템이 있지만, 아직 인큐베이터 단계인 airflow를 주목하는 이유는 분산 환경 지원이 가능하고, 태스크에 대한 스크립트를 파이썬을 사용할 수 있기 때문에, 각종 빅데이타 분석 시스템이나 라이브러리 그리고 머신러닝 시스템과 연동이 쉽고, 파이썬 언어만 알면 쉽게  정교한 플로우 개발이 가능하기 때문에, ( XML등의 설정을 하지 않고도) 활용 가능성이 높다.

스파크 성능이 안나오면, 우리 회사 데이타팀 팀장왈. 먼저 파이썬으로 짰는지 확인 부터 해보라길래, 파이썬과 스칼라로 만들어진 스파크 성능 차이가 얼마나 나는지 찾아봤더니 다음과 같은 수치가 나왔다.


http://emptypipes.org/2015/01/17/python-vs-scala-vs-spark/ (원본 출처)


일단 스파크를 할려면 스칼라는 필수인듯 하다. 

간단한 프로토타입핑등에는 파이썬을 사용할 수 있겠지만 결국 프로적션은 스칼라로 최적화해야 할듯.

근데. 자바대 스칼라 성능 비교는 없네


Apache Spark(스파크) - RDD Persistence (스토리지 옵션에 대해서)


조대협 (http://bcho.tistory.com)


Spark Persistence에 대해서


앞에 글에서 Spark RDD가 메모리에 상주 되는 방법에 대해서 간략하게 언급했는데, 다시 되 짚어 보면 Spark의 RDD는 filter() 등. 여러  Transformation Operation을 실행하더라도  Transformation  단계가 아니라 Action이 수행되는 단계에 로드된다고 설명하였다.


그리고, 매번 해당 RDD가 Action으로 수행될 때마다 다시금 소스에서 부터 다시 로드되서 수행된다고 했는데, 그렇다면 매번 로드 해서 계산하여 사용하는 것이 아니라, 저장해놓고 사용 하는 방법이 무엇이 있을까?


스파크에서는 RDD 를 저장해놓고 사용하는 기능으로 persist()와 cache() 라는 두 가지 오퍼레이션을 지원한다.

스파크는 RDD를 저장함에 있어서,  메모리와 디스크 두 가지 영역을 사용하며, 옵션에 따라서 RDD의 저장 영역을 지정할 수 있다.


기본 디폴트는 메모리에 저장하고, 옵션으로 디스크를 지정할 수 있다. 디스크를 지정하면, 메모리에서 모지란 부분을 마치 swapping 하듯이 디스크에 데이타를 저장한다.


아래 옵션 참고 




출처 : https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence


여기에 특이한 점은, 메모리나 디스크에 저장할때, RDD를 RAW (원본 형식)으로 저장할 것인지 자바의 Serialized 된 형태로 저장할 지를 선택할 수 있다. ( Serealized 된 형태로 저장하기 MEMORY_ONLY_SER, MEMORY_AND_DISK_SER) 이렇게 저장하면, 메모리 사용량은 더 줄일 수 있지만, 반대로 저장시 Serizalied하는 오버로드와, 읽을때 De-Seriazlie 하는 오버로드가 더 붙어서 CPU 사용량은 오히려 증가하게 된다.


아래 데이타는 http://sujee.net/2015/01/22/understanding-spark-caching/#.VWcOh1ntlBc 의 데이타 긴데,

Serialized 로 저장하는 경우, 최대 약 4배 정도의 메모리 용량을 절약할 수 있으나, 반대로, 처리 시간은 400배 이상이 더 들어간다.

 









 


다음으로, 특이한 옵션중에 하나가 OFF_HEAP 이라는 옵션인데, 스파크는 JVM 상에서 동작하기 때문에, 스파크가 저장하는 메모리란 JVM 상의 메모리를 뜻한다. JVM  특성상 Garbage collection 에 의한 성능 제약을 받을 수 있으며 또한 별도로 서로 복제가 되지 않기 때문에, (기본 옵션의 경우에만), 안정적인 서비스를 원할 경우에는 별도의 복제 옵션을 선택해야 한다.

이런 문제를 해결하기 위한 다른 옵션으로는 JVM 내에 데이타를 저장하는 것이 아니라, 별도의 JVM 외의 메모리 공간에 데이타를 저장하는 방식이 OFF_HEAP 이라는 옵션이다. 아직 안정화 되지는 않았지만, http://tachyon-project.org/ 이라는 메모리 클러스터를 이용하여, 서로 복제가 가능한 외부 메모리 클러스터에 저장하는 방식으로, JVM  상 메모리 보다는 성능이 약간 떨어지지만, 디스크보다는 빠르며, 큰 메모리 공간을 장애 대응에 대한 상관 없이 (자체 적으로 HA  기능을 제공함) 사용이 가능하다. 

 cf. Redis나  Infinispan등과 같은 메모리 기반의 데이타 그리드 솔루션의 하나인 Hazelcast 역시 JVM 밖의 네이티브 메모리 공간에 데이타를 저장하는 유사한 방식을 사용한다.


Persist vs Cache


그렇다면, persist()와 cache()의 차이점은 무엇인가? cache()는  persist() 에서 저장 옵션을 MEMORY_ONLY로 한 옵션과 동일하다.


저장된 RDD는 메모리나 디스크에서 언제 삭제 되는가?

RDD가 메모리나 디스크에 로드되었다고 항상 로드된 상태로 있는 것이 아니다. 기본적으로 LRU (Least Recently Used)  알고리즘 (가장 근래에 사용되지 않은 데이타가 삭제되는 방식)에 의해서 삭제가 되가나, 또는 RDD.unpersiste() 함수를 호출하면 명시적으로 메모리나 디스크에서 삭제할 수 있다.


언제 어떤 타입의 Peristence옵션을 사용해야 하는가?


가장 좋은 옵션은 디폴트 옵션인  MEMORY_ONLY  옵션이다. 가장 빠르다.

다음으로 메모리가 모자를 경우에는  MEMORY_ONLY_SER 옵션을 이용하면, Seriazlied 된 형태로 저장하기 때문에 메모리 공간은 줄일 수 있으나, 대신 CPU 사용률이 올라간다. 그래도 여전히 빠른 방식이다.

데이타 양이 많을 경우에는 DISK에 저장하는 옵션보다는 차라리 Persist 를 하지 않고, 필요할때 마다 재계산 하는 것이 더 빠를 수 있다.

빠른 응답이 필요한 경우에 Persist 된 데이타에 대한 유실을 방지하려면, replicated storage 옵션을 사용하는 것이 좋다. (MEMORY_ONLY2 등).  다른 스토리지 타입 역시, 장애로 인해서 데이타가 유실되더라도 재계산을 통하여 복구가 가능하지만, 재계산 하는 것 보다는 RDD 의 복제본을 저장해 놓고, 장애시 페일오버 하는 것이 빠르기 때문에, 빠른 응답시간을 요구로 하는 웹 애플리케이션의 경우 이 스토리지 타입이 유리하다. (단, 메모리 사용량은 복제본을 저장하는데도 사용되기 때문에 상대적으로 일반 스토리지 옵션에 비해서 메모리 여유가 적다.)



참고 

Learning Spark

Spark document - https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence


참고 - 이 글은 제가 스파크를 혼자 공부하면서 문서만을 통해서 정리한 글이기 때문에, 실무적인 경험이 많이 녹아 들어 있지 않습니다. 

분산 대용량 큐-Apache Kafka에 대한 검토 내용 정리


실시간 빅데이타 분석 아키텍쳐를 검토하다가 아파치 스톰을 보다보니, 실시간 데이타 스트림은 큐를 이용해서 수집하는 경우가 많은데, 데이타의 양이 많다 보니 기존의 큐 솔루션으로는 한계가 있어서 분산 대용량 큐로 아파치 카프카(Kafka)가 많이 언급된다.

그래서, 아키텍쳐를 대략 보고, 실효성에 대해서 고민을 해봤는데, 큐의 기능은 기존의 JMS나 AMQP 기반의 RabbitMQ(데이타 기반 라우팅,페데레이션 기능등)등에 비해서는 많이 부족하지만 대용량 메세지를 지원할 수 있는 것이 가장 큰 특징이다. 특히 분산 환경에서 용량 뿐 아니라, 복사본을 다른 노드에 저장함으로써 노드 장애에 대한 장애 대응 성을 가지고 있기 때문에 용량에는 확실하게 강점을 보인다.

실제로 마이크로소프트 社의 엔지니어가 쓴 논문을 보면http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf

 


카프카의 경우 10만 TPS 이상의 성능을 RabbitMQ는 2만 TPS 정도의 성능을 내는 것으로 나와 있는데, 여기서 생각해볼 문제가 큐는 비동기 처리 솔루션이다. 즉 응답 시간에 그렇게 민감 하지 않다는 것이다.

그리고 일반적인 웹 시스템의 성능이 1500~2000 TPS (엔터프라이즈 시스템의 경우) 내외인 것이 일반적이기 때문에, Rabbit MQ의 2만 TPS의 성능은 충분하다고 볼 수 있지 않을까 한다.

물론 네이버나 해외의 대형 SNS 서비스의 경우에는 충분히 저정도의 용량이 필요하겠지만, 현재로써는 일반적인 시스템에서는 카프카의 용량과 성능은 약간 오버 디자인이 아닌가 하는 생각이 든다.


Rabbit MQ is scalable and




대충보는 Storm #6-Apache Storm 그룹핑 개념 이해하기

 조대협 (http://bcho.tistory.com)



지금까지 컴포넌트간의 경로 라우팅, 즉 Spout 에서 Bolt간, Bolt에서 Bolt간 경로를 설정하는 방법에 대해서 알아보왔다.

그렇다면 각 컴포넌트간 라우팅을 할때 그 안에 있는 Task간에는 어떻게 상세하게 라우팅이 될까? Storm에서는 이 Task간의 라우팅을 정의하기 위해서 Grouping이라는 개념을 사용한다.


Shuffling

가장 간단한 라우팅 방법으로 Bolt A에서 Bolt B로 라우팅을 한다고 했을때, Bolt A내의 있는 Task가 Bolt B에 있는 Task중 아무 Task로 임의로(랜덤하게) 라우팅 하는 방식이다.




Field

Bolt A에 있는 Task에서 Bolt B에 있는 Task로 라우팅을 할때, 규칙성을 갖는 것중 하나인데, 보내고자 하는 데이타의 특정 필드에 있는 값을 기준으로 Bolt B에 있는 특정 Task로 라우팅 하는 방식이다. 라우팅 기준은 지정한 필드의 값을 가지고 해쉬를 계산해서 해쉬에 따라서 Bolt B에 있는 Task로 라우팅 시키는 방식이다.

예를 들어, Bolt B에 Task가 3개가 있다고 가정할때, 나이라는 필드로 “Field Grouping”을 한다고 하면, 나이/3으로 나눈 나머지 값에 따라서 Task A,B,C로 라우팅 하는 방식이다. (나눗셈은 설명을 쉽게 하기 위해 예를 들었지만 비슷한 원리로 해쉬를 계산하여 라우팅을 한다.)

Bolt에서 로컬 캐쉬를 사용하거나 할때, 같은 해쉬의 데이타가 같은 Task로 라우팅이 되게 해서 캐쉬 히트율을 높이는 것등에 유용하게 사용될 수 있다.



Global 

Global 그룹핑은 모으는 개념(Aggregation)의 개념이다. Bolt A의 어느 Task에서 메세지를 보내더라도 항상Bolt B똑같은 하나의 Task로 라우팅이 되는 방식으로, Bolt B에 있는 Task중에서 Task ID가 가장 작은 특정 Task로만 라우팅을 한다.

분산해서 연산한 값을 모두 모아서 합산을 한다던가등에 사용할 수 있다.



All

All 그룹핑은 일종의 브로드 캐스트 개념으로 Bolt A의 하나의 Task가 메세지를 전송하면 Bolt B의 모든 Task가 메세지를 받는 형태이다.

각 Task들에 설정 변경등을 넘길때 유용하게 사용될 수 있다.





Direct

당연히 있을 것으로 생각했겠지만 당연히 있는 기능이다. Bolt A의 Task에서 Bolt B의 특정 Task로 명시적으로 라우팅을 지정하는 기능이다. 이때 주의할점이 Bolt B의 Task를 지정할때,  Task Id가 아니라 Task의 Index로 타겟을 지정한다. 예를 들어 Bolt B에 Task가 5개가 있을때, 0번, 1번식으로 타켓을 지정하게 된다.





Custom

Custom 그룹핑은 라우팅 로직을 개발자가 직접 작성해서 넣는 방식이다.


Local or Shuffle

다소 주의 깊게 볼 필요가 있는 그룹핑 방식이다. 기본적인 동작 방식은 Shuffle과 다르지 않으나,

Bolt A에서 Bolt B의 Task로 라우팅을 할때, Bolt A에서 메세지를 보내는 Task와 같은 JVM 인스턴스 (Woker)에 Bolt B의 Task가 있을 경우 같은 JVM 인스턴스에 있는 Task로 우선 라우팅을 한다. 이는 네트워크를 이용한 리모트 호출을 줄이기 위한 방법이다.

그러면 Bolt의 Task들은 각 Worker에 어떻게 배치 될것인가에 대한 질문이 올 수 있는데, 이렇게 Task를 Worker에 배치하는 행위를Scheduling(스케줄러)라고 하고, 배치를 하는 주체를 Scheduler라고 한다. 자료를 몇개 찾아봤지만 Scheduling 정책에 대해서는 명확하게 나와 있지 않고, 무작위 적으로 배치하는 것으로 보이는데, 조금 더 research가 필요할듯. 


참고 :Pluggable Scheduler

애플리케이션 성격에 맞게 스케쥴링 정책을 구현해서 사용할 수 있는데, 이를 Pluggable Scheduler라고 한다.

http://xumingming.sinaapp.com/885/twitter-storm-how-to-develop-a-pluggable-scheduler/ 의 예에 나와 있는 시나리오를 보면, 특정 Spout의 경우에는 상용 소프트웨어를 사용하는데, 이 상용 소프트웨어는 Machine당 라이센스를 가지고 있기 때문에 이 Spout은 반드시 라이센스가 설치된 서버에 스케쥴링(배포)되어야 한다. 그래서 특정 스케쥴링 정책이 필요한데, 첨부 링크에 있는 내용은 Pluggable scheduler를 구현하는 방법에 대해서 설명하고 있다.





대충보는 Storm #5-Apache Storm 병렬 분산 처리 이해하기

 조대협 (http://bcho.tistory.com)

 

Storm에 있는 Spout Bolt들은 여러개의 머신에서 어떻게 나눠서 처리될까? Storm 클러스터는 여러대의 분산된 서버에서 운용되기 때문에, 당연히 Spout Bolt도 나눠서 처리된다 그렇다면 이런 Storm의 병렬 처리 구조는 어떻게 되는 것일까?

이 글에서는 Spout Bolt를 병렬로 처리하는 Storm의 구조에 대해서 알아보도록 한다.

Storm의 병렬 처리를 이해하기 위한 개념

Storm의 병렬 처리를 이해하기 위해서는 몇가지 개념을 정리해야 한다. Node,Worker,Exectutor,Task 이 네 가지 개념을 이해해야 한다.


Node

Node는 물리적인 서버이다. Nimbus Supervisor 프로세스가 기동되는 물리적인 서버이다.

Nimbus는 전체 노드에 하나의 프로세스만 기동하며, Supervisor는 일반적으로 하나의 노드에 하나만 기동한다. 여러대를 기동시킬 수 도 있지만, Supervisor의 역할 자체가 해당 노드를 관리하는 역할이기 때문에 하나의 노드에 여러개의 Supervisor를 기동할 필요는 없다.


Worker

Worker Supervisor가 기동되어 있는 노드에서 기동되는 자바 프로세스로 실제로 Spout Bolt를 실행하는 역할을 한다.


Executor

Executor Worker내에서 수행되는 하나의 자바 쓰레드를 지칭한다.


Task

Task Bolt Spout의 객체를 지칭한다. Task Executor (쓰레드)에 의해서 수행된다.

이 개념을 다시 정리해보면 다음과 같은 그림이 된다.



<그림. Node,Worker,Executor,Task 의 개념>

각 슬레이브 노드에는 Supervisor 프로세스가 하나씩 떠있고, conf/storm.yaml에 정의된 설정에 따라서 worker 프로세스를 띄운다.supervisor.slots.ports에 각 Worker가 사용할 TCP 포트를 정해주면 된다. 아래는 5개의 Worker 프로세스를 사용하도록 한 설정이다.



<그림. Storm 설정에서 Supervisor 5개 띄우도록한 설정>

 

그리고 난후에, Topology를 생성할때, Topology에 상세 Worker,Executor,Task의 수를 정의한다. 앞에서 예제로 사용했던 HelloTopology 클래스 코드를 다시 살펴보자. 아래 코드는 Worker,Executor,Task등을 설정한 예이다.

package com.terry.storm.hellostorm;

 

import backtype.storm.Config;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.topology.TopologyBuilder;

 

public class HelloTopology {

        public static void main(String args[]){

               TopologyBuilder builder = new TopologyBuilder();

               builder.setSpout("HelloSpout", new HelloSpout(),2);

               builder.setBolt("HelloBolt", new HelloBolt(),2)

                       .setNumTasks(4)

                       .shuffleGrouping("HelloSpout");

              

              

               Config conf = new Config();

               conf.setNumWorkers(5);

               // Submit topology to cluster

               try{

                       StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

               }catch(AlreadyAliveException ae){

                       System.out.println(ae);

               }catch(InvalidTopologyException ie){

                       System.out.println(ie);

               }

              

        }

 

}

<코드. Worker,Executor,Task 수를 설정한 HelloTopology 예제>

     Topology가 사용할 Worker 프로세스의 수 설정
Config
에서 setNumWorkers(5)를 이용해서 이 토폴로지에서 사용한 Worker 프로세스 수를 5개로 지정했다.

     Spout Executor(쓰레드 수) 설정
다음으로 setSpout에서 3번째 인자로 “2”라는 숫자를 넘겼는데, setSpout에 마지막 인자는 Executor의 수이다. 이를 Parallelism 힌트라고 하는데, Spout 컴포넌트가 수행될 쓰레드의 수이다. 여기서는 Spout Task (객체의 수)를 정의하지 않았는데, 정의하지 않은 경우 디폴트로 Executor의 수와 같이 설정된다.

     Bolt Executor(쓰레드 수)Task(객체)수 설정
Bolt
도 마찬가지로 setBolt 3번째 마지막 인자가 Parallelism 힌트인데, 역시 2개로 지정하였다. 여기서는 Task수를 별도로 지정하였는데, setTaskNum(4)을 이용해서 지정한다. 이렇게 설정하면 HelloBolt 객체는 총 4개가 생기고 2개의 Thread에서 번갈아 가면서 실행하게 된다.

자아 그러면 실제로 설정하는데로 동작하는 지 몇가지 확인을 해보자. 자바의 jps 명령을 이용하면 현재 동작중인 자바 프로세스 수를 볼 수 있다.



<그림 Worker 프로세스 수의 확인>

위의 테스트는 하나의 환경에서 nimbus,zookeeper,supervisor,worker를 모두 띄워놓은 형태인데,worker가 설정대로 5개의 프로세스가 떠있고, nimbus,supervisor가 떠 있는 것이 확인되고, QuorumPeerMainzookeeper 프로세스이다.

실제로 Executor가 지정한데로 Thread가 뜨는지 확인을 해보자. 여러개의 Worker 프로세스에 나눠서 뜨면 모니터링하기가 복잡하니 편의상 conf.setNumer(1)로 해서, 하나의 Worker 프로세스에서 모든 Executor가 뜨도록 Topology를 변경한후, Worker 프로세스의 쓰레드를 모니터링 하니 다음과 같은 결과를 얻었다.

코드상에서 HelloSpout에 대한 Parallelism 힌트를 2로 설정하고, HelloBolt에 대한 Parallelism 힌트도 2로 설정하였다.



<그림. Worker 프로세스의 쓰레드 덤프>

실제로 Worker 프로세스내의 쓰레드를 보면 HelloSpout용 쓰레드가 2, HelloBolt용 쓰레드가 2개가 기동됨을 확인할 수 있다.


리밸런싱

Storm 운영중에 노드를 추가 삭제 하거나 또는 성능 튜닝을 위해서 운영중인 환경에 Worker, Executor의 수를 재 조정이 가능하다. 이를 rebalance라고 하는데, 다음과 같은 명령어를 이용해서 가능하다.

% bin/storm rebalance [TopologyName] -n [NumberOfWorkers] -e [Spout]=[NumberOfExecutos] -e [Bolt1]=[NumberOfExecutos] [Bolt2]=[NumberOfExecutos]

미들웨어 엔지니어로써 본 Storm 튜닝

본인의 경우 경력이 톰캣이나 오라클社의 웹로직에 대해 장애진단과 성능 튜닝을 한 경력을 가지고 있어서 JVM이나 미들웨어 튜닝에 많은 관심을 가지고 있는데, 이 미들웨어 튜닝이라는 것이 대부분 JVM과 쓰레드 수등의 튜닝에 맞춰 있다보니, Storm의 병렬성 부분을 공부하다 보니, Executor Worker,Task의 수에 따라서 성능이 많이 차이가 나겠다는 생각이 든다.

특히나 하나의 토폴리지만 기동하는 것이 아니라, 여러개의 토폴로지를 하나의 클러스터에서 구동 할 경우 더 많은 변수가 작용할 수 있는데, 쓰레드란 것의 특성이 동시에 하나의 코어를 차지하고 돌기 때문에, 쓰레드수가 많다고 시스템의 성능이 좋아지지 않으며 반대로 적으면 성능을 극대화할 수 없기 때문에, 이 쓰레드의 수와 이 쓰레드에서 돌아가는 객체(Task)의 수에 따라서 성능 차이가 많이 날것으로 생각된다. 아마도 주요 튜닝 포인트가 되지 않을까 싶은데, 예전에는 보통 JVM당 적정 쓰레드 수는 50~100개 정도로 책정했는데 (톰캣과 같은 WAS 미들웨어 기준). 요즘은 코어수도 많아져서 조금 더 많은 쓰레드를 책정해도 되지 않을까 싶다. 쓰레드 수 뿐 아니라, 프로세스수도 영향을 미치는데, JVM 프로세스의 컨텐스트 스위칭은 쓰레드의 컨텐스트 스위칭보다 길기 때문에, 프로세스를 적게 띄우는 것이 좋을것으로 예상 되지만, JVM 프로세스는 메모리 GC에 의한 pausing 시간이 발생하기 때문에 이 GC 시간을 적절하게 나눠주기 위해서 적절 수 의 프로세스를 찾는 것도 숙제가 아닐까 싶다. 디폴트 worker의 옵션을 보니 768M의 힙 메모리를 가지고 기동하게 되어 있는데, 메모리를 많이 사용하는 연산는 다소 부족하지 않을까 하는 느낌이 든다.

Bolt가 데이타 베이스, 파일 또는 네트워크를 통해서 데이타를 주고 받는 연산을 얼마나 하느냐에 따라서도 CPU 사용률이 차이가 날것이기 때문에 (IO작업중에는 쓰레드가 idle 상태로 빠지고 CPU가 노는 상태가 되기 때문에) IO 작업이 많은 경우에는 쓰레드의 수를 늘리는 것이 어떨까 한다.

Bolt Spout와 같은 통신은 내부적으로 ZeroMQ를 사용하는 것으로 알고 있는데, 아직 내부 구조는 제대로 살펴보지는 않았지만, 같은 프로세스내에서는 네트워크 호출 없이 call-by-reference를 이용해서 통신 효율을 높이고, 통신이 잦은 컴포넌트는 같은 프로세스에 배치 하는 affinity와 같은 속성(?)이 있지 않을까 예측을 해본다.

결과적으로 튜닝 포인트는, Worker,Executor,Task 수의 적절한 산정과, 만약에 옵션이 있다면 리모트 호출을 줄이기 위한 Bolt Spout 컴포넌트의 배치 전략에 있지 않을까 한다.


다음 글에서는 이런 병렬 처리를 기반으로 각 컴포넌트간에 메세지를 보낼때, 여러 Task간에 어떻게 메세지를 라우팅을 하는지에 대한 정리한 그룹핑(Grouping)에 대한 개념에 대해서 알아보도록한다.


 

대충보는 Storm #4-Apache Storm 특징과 기본 개념

 조대협 (http://bcho.tistory.com)


지금까지 Storm에 대해서 이해하기 위해서, 실시간 스트리밍 서비스의 개념에 대해서 알아보고 간단한 HelloStorm 애플리케이션을 제작해서, 싱글 클러스터 노드에 배포해봤다. 대략 실시간 스트리밍이 무엇이고, Storm을 이용해서 어떻게 개발하는지에 대해서는 어느정도 이해를 했을 것이라고 생각한다.

그러면 지금까지의 경험을 조금 더 체졔적으로 정리해서 Storm에 대해서 이해해보도록 하자. 이번에는 Storm에 대한 개념과 아키텍쳐 구조에 대해서 알아보겠다.


Storm의 특징

Storm을 실시간 스트리밍을 처리하기 위한 서버이자 프레임웍이다. 그렇다면 이 Storm이 다른 스트리밍 처리 솔루션에 비해 가지는 특징은 무엇일까?


확장성

Storm은 클러스터링 기능을 이용해서 수평으로 확장이 가능하다. 그래서 많은 데이타를 다루어야 하는 빅데이타이 데이타 스트림 서비스에서도 가능한데, Storm홈페이지에 포스팅된 자료를 보면, 2x2.4GHz CPU 24GB 메모리 머신을 기반으로 초당 100바이트짜리 메세지를 100만개 정도 처리가 가능하다고 한다. (https://storm.apache.org/about/scalable.html) TPS로 환산하면 100 TPS이다. (Wow!!)


장애 대응성

Storm의 다른 특징 중의 하나는 Fault tolerant 구조를 통한 장애 대응 능력이다. ZooKeeper를 통해서 전체 클러스터의 운영 상태를 감지 하면서, 특정 노드가 장애가 나더라도, 시스템이 전혀 문제 없이 작업을 진행할 수 있으며, 장애가 난 노드에 할당된 작업은 다른 노드에 할당해서 처리하고, 장애 노드에 대해서는 복구 처리를 자동으로 수행해준다.


메세지 전달 보장

Storm은 메세지 처리에 안정성을 제공하는데, 장애가 나건 문제가 있건간에,유실 없이 최소한 한번 메세지가 처리될 수 있게 지원한다. (at least once : 이말은 반대로 이야기 하면, 1번 이상 같은 메세지가 중복 처리될 수 있다는 이야기이다.)

만약에 정확하게 메세지가 한번만 처리가 되기를 원하면 Trident (https://storm.apache.org/documentation/Trident-tutorial.html) 를 통해서 Storm을 확장하면, 정확히 하나의 메세지가 한번만 처리되도록 할 수 있다.


쉬운 배포

Storm은 메뉴얼에 따르면(?) 배포와 설정이 매우 쉽다. 실제로 클러스터를 구성해보면 분산 시스템인데 비해서 별로 어려움 없이 배포와 설정이 가능하다. 그리고 메뉴얼에 따르면(?) 배포 후에, 크게 많은 관리 없이 운영이 가능하다고는 하는데, 이것은 실제로 해보지 않았기 때문에 패스

일단, 오픈소스인데도 설치 후 웹 기반의 모니터링 콘솔을 제공하기 때문에 시스템의 상태를 쉽게 모니터링 하고 운영하는데 도움을 준다.


여러 프로그래밍 언어 지원

Storm은 기본적으로 JVM (Java Virtual Machine)위에서 동작하기는 하지만, Twitter Thrift 프로토콜을 기반으로 하기 때문에, 다양한 언어로 구현이 가능하다. Java 뿐 아니라, JVM을 사용하지 않는 경우에 대해서는 stdin/stdout을 통해서 데이타를 주고 받음으로써, Ruby,Python,Javascript,Perl 등 다양한 언어를 사용할 수 있다.


다양한 시스템 연계

Storm은 다양한 다른 솔루션과 통합이 가능하다. 데이타를 수집하는 부분에서는 Kestrel (http://robey.github.io/kestrel/), RabbitMQ (http://www.rabbitmq.com/) , Kafka (http://kafka.apache.org/), JMS 프로토콜, mazon Kinesis (http://aws.amazon.com/kinesis/)

등이 연동이 가능하며, 다양한 데이타 베이스 (RDBMS, Cassandra, MongoDB )에도 쉽게 연계가가능하다. CEP(Complex Event Processing을 지원하는) 이벤트 처리 분야에서는  Drools (http://www.drools.org/),  Esper (http://esper.codehaus.org/등이 연계 가능하고. 그외에도 Elastic Search (http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/master/storm.html

) , node.js (https://github.com/paralect/storm-nodejs-starterkit)등 다양한 솔루션과 연동을 통해서 시스템을 확장해 나갈 수 있다.


오픈소스

마지막으로 Storm은 오픈소스이다. 상업적 활용이 가능한 Apache License 2.0을 따르고 있다.

Apache License 2.0 http://ko.wikipedia.org/wiki/%EC%95%84%ED%8C%8C%EC%B9%98_%EB%9D%BC%EC%9D%B4%EC%84%A0%EC%8A%A4

Apache License 2.0 한국 번역본 http://yesarang.tistory.com/272

Storm은 유사한 특징을 가지고 있는 Apache Spark에 비해서, 개발이 된지 오래되어서 안정성이 높고 특유의 구조상 장애 대처 능력과 메세지 전달 보장 능력등이 좋다. 반대로 Spark은 최근에 만들어진 만큼 머신 러닝등 더 많은 기능을 가지고 있다.

Storm의 기본 개념

자아 그러면 이제 Storm의 개념을 다시 정립해보자. Storm의 개념을 이해하려면 필수적으로 먼저 이해해야 하는것이 Spout Bolt의 개념이다.

Spout Bolt

Spout Storm 클러스터로 데이타를 읽어들이는 데이타 소스이다. 외부의 로그 파일이나, 트위터 타임 피드와 같인 데이타 스트림, 큐등에서 데이타를 읽어드린다. 이렇게 읽어 드린 데이타를 다른 Bolt로 전달한다. Spout에는 크게 4가지 중요한 메서드가 있다.

Ÿ   open() : 이 메서드는 Spout이 처음 초기화 될때 한번만 호출되는 메서드로, 데이타 소스로 부터의 연결을 초기화 하는 등의 역할을 한다.

Ÿ   nextTuple() : 이 메서드는 데이타 스트림 하나를 읽고 나서, 다음 데이타 스트림을 읽을 때 호출 되는 메서드 이다.

Ÿ   ack(Object msgId) : 이 메서드는 데이타 스트림이 성공적으로 처리되었을때 호출되는데, 이 메서드에서는 성공 처리된 메세지를 지우는 등, 성공 처리에 대한 후처리를 구현한다.

Ÿ   fail(Object msgId) : 이 메서드는 해당 데이타 스트림이 Storm 토폴로지를 수행하던중에, 에러가 나거나 타임아웃등이 걸렸을때 호출되는데,이때에는 사용자가 에러에 대한 에처 처리 로직을 명시해야 한다. 흔히 재처리 로직을 구현하거나 또는 에러 로깅등의 처리를 하게 된다.

Bolt는 이렇게 읽어 드린 데이타를 처리하는 함수이다. 입력 값으로 데이타 스트림을 받고, 그 데이타를 내부의 비지니스 로직에 따라서 가공한 다음에 데이타 스트림으로 다른 Bolt로 넘겨주거나 종료 한다. Bolt에서 정의되는 주요한 메서드는 다음과 같다.

Ÿ   prepare (Map stormConf, TopologyContext context, OutputCollector collector): 이 메서드는 Bolt 객체가 생성될때 한번 호출 된다. 각종 설정 정보나 컨텍스트등 초기 설정에 필요한 부분을 세팅하게 된다.

Ÿ   execute(Tuple input): 가장 필수적인 메서드로, Bolt에 들어온 메세지를 처리하는 로직을 갖는다. 종단 Bolt가 아닌 경우에는 다음 Bolt로 메세지를 전달하기도 한다.

Storm 클러스터내에는 여러개의 Spout Bolt가 존재하게 된다.



<그림. Storm Spout Bolt의 개념>

Topology

이렇게 여러 개의 Spout Bolt간의 연관 관계를 정의해서 데이타 흐름을 정의하는 것을 토폴로지(Topology)라고 한다. 아래 그림과 같이 데이타가 어디로 들어와서 어디로 나가는지를 정의하는 것인데, 아래 그림은 두 개의 Spout에 대해서 각각의 토폴로지를 정의하였다.



<그림. Storm Topology>

Spout Bolt간의 연결 토폴로지는 TopologyBuilder라는 클래스를 통해서 정의한다. 그러면 간략하게, Spout Bolt, Bolt간의 데이타 흐름 관계를 어떻게 정의하는지 살펴보도록 하자.

다음과 같은 토폴로지 흐름을 정의한다고 가정하자



<그림. 간단한 토폴로지 정의 예제>


HelloSpout은 앞서 예제에서 만든것과 같은 Spout이고,

EchoBoltA 는 각각 들어온 메세지에 “Hello I am BoltA :”+메세지를 붙여서 화면에 출력한 후 전송하고 EechoBoltB는 들어온 메세지에 “Hello I am BoltB :”+메세지를 붙여서 전송한다.


package com.terry.storm.hellostorm;

 

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

 

public class EchoBoltA extends BaseBasicBolt{

 

        public void execute(Tuple tuple, BasicOutputCollector collector) {

               // TODO Auto-generated method stub

               String value = tuple.getStringByField("say");

               System.out.println("Hello I am Bolt A: "+value);

               collector.emit(new Values("Hello I am Bolt A :"+value));

        }

 

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

               // TODO Auto-generated method stub

                 declarer.declare(new Fields("say"));

        }

 

}

<그림 EchoBoltA 클래스>


※ 참고 EchoBoltB 클래스도 클래스명만 다르고 내부 구현 내용은 동일하다

토폴로지를 정의할때, HelloSpout hs라는 ID로 생성을 할것이고, EchoBoltA eba라는 ID, EchoBoltB ebb라는 이름으로 생성을 할것이다.

토폴로지 생성 코드를 보자. 아래 노랑색으로 표시된 부분이 실제 토폴로지는 구성하는 부분이다.


package com.terry.storm.hellostorm;

 

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.utils.Utils;

 

import com.terry.storm.hellostorm.EchoBoltB;

import com.terry.storm.hellostorm.EchoBoltA;

 

public class ToplogySequence {

        public static void main(String args[]){

               TopologyBuilder builder = new TopologyBuilder();

               builder.setSpout("hs", new HelloSpout(),1);

               builder.setBolt("eba", new EchoBoltA(),1).shuffleGrouping("hs");

               builder.setBolt("ebb", new EchoBoltB(),1).shuffleGrouping("eba");

              

              

               Config conf = new Config();

               conf.setDebug(true);

               LocalCluster cluster = new LocalCluster();

              

               cluster.submitTopology("ToplogySequence", conf,builder.createTopology());

               Utils.sleep(1000);

               // kill the LearningStormTopology

               cluster.killTopology("ToplogySequence");

               // shutdown the storm test cluster

               cluster.shutdown();          

        }

}

 

<그림. 위의 그림에 있는 토폴로리지를 실제로 구현한 >

 

Spout을 구현한 부분을 보자


builder.setSpout("hs", new HelloSpout(),1);

 

를 통해서 Spout을 생성하는데, setSpout(“{id}”,”{Spout 객체}”,”{Parallelism 힌트}”) 로 이루어진다. 여기서 id“hs”로 정의했고, Spout 객체는 HelloSpout을 지정했다.

    Paralleism 힌트는 나중에 병령 처리와 그룹핑 개념을 설명할때 다시 설명하도록 한다.

다음으로 Bolt를 생성하는데,

builder.setBolt("eba", new EchoBoltA(),1).shuffleGrouping("hs");

builder.setBolt("ebb", new EchoBoltB(),1).shuffleGrouping("eba");

 

로 각각의 Bolt를 생성했다. 이때 주목해야 하는 점이 뒤에 붙어 있는 shufflerGrouping이라는 메서드인데, Spout Bolt간의 연관 관계는 이 Grouping이라는 개념을 이용해서 생성한다. Grouping에는 여러가지 종류와 개념이 있지만 여기서는 간단한 shuuflerGrouping만을 사용했다.첫번째 EchoBoltA에서 자신을 “eba” 라는 id로 생성을 한후에, suffelerGrouping(“hs”)를 선언했는데, 이는 “hs”라는 ID를 가지고 있는 Spout이나 Bolt로 부터 메세지를 받아들이겠다는 이야기이다. 두번째 EchoBoltBsuffelerGrouping(“eba”)를 통해서, id“eba” Spout이나 Bolt, 즉 앞서 생성한 EchBoltA로 부터 메세지를 받아들이겠다는 이야기이다.

자 그러면 이 토폴로지를 실행해 보자.

실행하면 다음과 같은 로그를 얻을 수 있다.

5399 [Thread-12-hs] INFO  backtype.storm.daemon.task - Emitting: hs default [hello world 1]

5400 [Thread-8-eba] INFO  backtype.storm.daemon.executor - Processing received message source: hs:4, stream: default, id: {}, [hello world 1]

Hello I am Bolt A: hello world 1

5401 [Thread-8-eba] INFO  backtype.storm.daemon.task - Emitting: eba default [Hello I am Bolt A :hello world 1]

5409 [Thread-10-ebb] INFO  backtype.storm.daemon.executor - Processing received message source: eba:2, stream: default, id: {}, [Hello I am Bolt A :hello world 1]

Hello I am Bolt B: Hello I am Bolt A :hello world 1

 

     5399 번 라인에서 12번 쓰레드에서 수행되는 “hs” 라는 이름의 Spout, “hello world 1” 이라는 문자열을 emit (제출) 하였다

     5400 번 라인에서 8번 쓰레드에서 수행되는 “eba”라는 Bolt“hs”라는 Spout또는 Bolt에서 “hello world”라는 문자열을 받았다. 그 다음 라인에 “Hello I am Bolt A: hello world 1”가 출력되는 것을 확인할 수 있다.

     5401 라인에서 8번 쓰레드에서 수행되는 “eba” 볼트가 “Hello I am Bolt A :hello world 1” 라는 문자열을 제출하였다.

     5409 라인세서 10번 쓰레드에서 수행되는 “ebb”라는 id의 볼트가 “eba”로 부터 “Hello I am Bolt A :hello world 1” 라는 메세지를 받았다. 다음 행에 EchoBoltB에 의해서 처리되어 “Hello I am Bolt B: Hello I am Bolt A :hello world 1” 문자열이 출력되었음을 확인할 수 있다.

Stream Tuple

다음으로 데이타 Stream Tuple에 대한 개념을 이해해야 한다.

Storm에서 데이타는 Stream이라는 개념으로 정의되는데, Stream이란, Spout Bolt간 또는 Bolt간을 이동하는 데이타들의 집합을 이야기 한다.

각각의 Stream은 하나의 Tuple로 이루어 지는데, Tuple형태로 정의된다.



<그림. Storm Stream Tuple 개념>


앞에 예제에서는 하나의 키만 있는 Tuple을 사용하였다.

앞에서 사용한 HelloSpout 클래스를 다시 한번 살펴보면


public class HelloSpout extends BaseRichSpout {

          private static final long serialVersionUID = 1L;

          private static int count=0;

          private SpoutOutputCollector collector;

         

          public void open(Map conf,TopologyContext context,SpoutOutputCollector collector){

               this.collector = collector; 

          }

         

          public void nextTuple(){

                 if(count++<10) this.collector.emit(new Values("hello world "+count));

          }

         

          public void declareOutputFields(OutputFieldsDeclarer declarer){

                 declarer.declare(new Fields("say"));

          }

         

}

<그림. HelloSpout>

nextTuple 부분에서 newValue로 하나의 값을 보내는 것을 볼 수 있다. 그리고 이 Tuple의 키 구조는 아래 declareOutputFields 메서드에서 “say” 라는 필드이름으로 정의된것을 볼 수 있다.

실제로 HelloSpout에서 생성하는 데이타 스트림은 다음과 같다.



<그림 데이타 Stream Turple 구조>

이번 글에서는 간단하게 Storm의 특징과 기본 개념에 대해서 알아보았다. 다음글에서는 조금더 상세한 Storm의 아키텍쳐의 개념과 병렬 처리 개념에 대해서 알아보도록 하겠다.

 

 

대충보는 Storm #3-Storm 싱글 클러스터 노드 설치 및 배포

조대협(http://bcho.tistory.com)

 

지난번에는 간략하게, Storm을 이용한 HelloStorm 애플리케이션을 개발용 클러스터인 Local Cluster에서 구동해봤다. 이번에는 운영용 클러스터를 설정하고, 이 운영 클러스터에 지난번에 작성한 HelloStorm 토폴리지를 배포해보도록 한다.

Storm 클러스터의 기본 구조

Storm 클러스터를 기동하기 전에, 클러스터가 어떤 노드들로 구성이 되는지 먼저 알아보도록 하자 Storm 클러스터는 기본적으로 아래와 같은 3가지 구성요소로 구성이 되어 있다.

먼저 주요 노드인 Nimbus Supervior 노드에 대해서 알아보자, Nimbus Supervisor 노드는 각각 하나의 물리 서버로 생각하면 된다.


Nimbus

Nimbus는 마스터 노드로 주요 설정 정보를 가지고 있으며, Nimbus 노드를 통해서 프로그래밍 된 토폴로지를 Supervisor 노드로 배포한다. 일종의 중앙 컨트롤러로 생각하면 된다. Storm에서는 중앙의 하나의 Nimbus 노드만을 유지한다.

Supervisor

Supervisor 노드는 실제 워커 노드로, Nimbus로 부터 프로그램을 배포 받아서 탑재하고, Nimbus로 부터 배정된 작업을 실행하는 역할을 한다. 하나의 클러스터에는 여러개의 Supervisor 노드를 가질 수 있으며, 이를 통해서 여러개의 서버를 통해서 작업을 분산 처리할 수 있다.

Zookeeper

이렇게 여러개의 Supervisor를 관리하기 위해서, Storm Zookeeper를 통해서 각 노드의 상태를 모니터링 하고, 작업의 상태들을 공유한다.

Zookeeper는 아파치 오픈소스 프로젝트의 하나로, 분산 클러스터의 노드들의 상태를 체크하고 공유 정보를 관리하기 위한 분산 코디네이터 솔루션이다.

전체적인 클러스터의 구조를 살펴보면 다음과 같다.



<그림. Storm 클러스터의 구조>

하나의 싱글 머신에 Nimbus가 설치되고, 다른 각각의 머신에 Supervisor가 하나씩 설치된다. 그리고, ZooKeeper 클러스터를 통해서 기동이 된다.

※ 실제 물리 배포 구조에서는 Nimbus Supervisor, ZooKeeper등을 하나의 서버에 분산배포할 수 도 있고, 여러가지 다양한 배포구조를 취할 수 있으나, Supervisor의 경우에는 하나의 서버에 하나의 Supervisor만을 설치하는 것을 권장한다. Supervisor의 역할이 하나의 물리서버에 대한 작업을 관리하는 역할이기 때문에, 한 서버에 여러 Supervisor를 설치하는 것은 적절하지 않다.

 

설치와 기동

Storm 클러스터를 기동하기 위해서는 앞에서 설명한바와 같이 ZooKeeper가 필요하다. Zookeeper를 다운로드 받은 후에, ~/conf/zoo_sample.cfg 파일을 ~/conf/zoo.cfg로 복사한다.

다음으로 ZooKeeper를 실행한다.

% $ZooKeeper_HOME/bin/zkServer.cmd


<그림. 주키퍼 기동 로드>


다음으로 Nimbus 노드를 실행해보자. Storm을 다운 받은 후 압축을 푼다.

다음 $APACHE_STORM/bin 디렉토리에서

%storm nimbus

를 실행하면 nimbus 노드가 실행된다. 실행 결과나 에러는 $APACHE_STORM/logs 디렉토리에 nimbus.log라는 파일로 기록이 된다.

정상적으로 nimbus 노드가 기동이 되었으면 이번에는 supervisor 노드를 기동한다.

%storm supervisor

Supervisor에 대한 노드는 $APACHE_STORM/logs/supervisor.log 라는 이름으로 기록된다.

Storm은 자체적으로 클러서터를 모니터링 할 수 있는 UI 콘솔을 가지고 있다. UI를 기동하기 위해서는

%storm ui

로 실행을 해주면 UI가 기동이 되며 http://localhost:8080 에 접속을 하면 관리 콘솔을 통해서 현재 storm의 작동 상태를 볼 수 있다.



<그림. Storm UI를 이용한 기동 상태 모니터링>


현재 하나의 PC nimbus supervior,UI를 모두 배포하였기 때문에 다음과 같은 물리적인 토폴로지가 된다.



<그림. 싱글 서버에 nimbus supervisor를 같이 설치한 예>

싱글 클러스터 노드에 배포 하기

싱글 노드 클러스터를 구축했으니, 앞의 1장에서 만든 HelloStorm 토폴로지를 이 클러스터에 배포해보도록 하자.

전장의 예제에서 만든 토폴로지는 Local Cluster를 생성해서, 자체적으로 개발 테스트용 클러스터에 토폴로지를 배포하도록 하는 코드였다면, 이번에는 앞에서 생성한 Storm 클러스터에 배포할 수 있는 토폴로지를 다시 만들어야 한다. 이 코드의 차이는 기존 코드와는 다르게 LocalCluster를 생성하지 않고, 기동중인 클러스터에 HelloTopoloy Submit하도록 한다.

package com.terry.storm.hellostorm;

 

import backtype.storm.Config;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.topology.TopologyBuilder;

 

public class HelloTopology {

        public static void main(String args[]){

               TopologyBuilder builder = new TopologyBuilder();

               builder.setSpout("HelloSpout", new HelloSpout(),2);

               builder.setBolt("HelloBolt", new HelloBolt(),4).shuffleGrouping("HelloSpout");

              

               Config conf = new Config();

               // Submit topology to cluster

               try{

                       StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

               }catch(AlreadyAliveException ae){

                       System.out.println(ae);

               }catch(InvalidTopologyException ie){

                       System.out.println(ie);

               }

              

        }

 

}

<그림. HelloTopology.java>


토폴로지 클래스를 만들었으면 이를 빌드해보자

%mvn clean install

을 실행하면 ~/target 디렉토리에 토폴로지 jar가 생성된것을 확인할 수 있다.



jar 파일을 배포해보도록 하자.

배포는 storm {jar} {jar파일명} {토폴로지 클래스명} {토폴로지 이름} 명령을 실행하면 된다.

% storm jar hellostorm-0.0.1-SNAPSHOT.jar com.terry.storm.hellostorm.HelloTopology HelloTopology

배포 명령을 내리면 $APACHE_STORM_HOME/logs/nimbus.log에 다음과 같이 HelloTopology가 배포되는 것을 확인할 수 있다.


2015-01-25T07:35:03.352+0900 b.s.d.nimbus [INFO] Uploading file from client to storm-local\nimbus\inbox/stormjar-8c25c678-23f5-436c-b64e-b354da9a3746.jar

2015-01-25T07:35:03.365+0900 b.s.d.nimbus [INFO] Finished uploading file from client: storm-local\nimbus\inbox/stormjar-8c25c678-23f5-436c-b64e-b354da9a3746.jar

2015-01-25T07:35:03.443+0900 b.s.d.nimbus [INFO] Received topology submission for HelloTopology with conf {"topology.max.task.parallelism" nil, "topology.acker.executors" nil, "topology.kryo.register" nil, "topology.kryo.decorators" (), "topology.name" "HelloTopology", "storm.id" "HelloTopology-1-1422138903"}

2015-01-25T07:35:03.507+0900 b.s.d.nimbus [INFO] Activating HelloTopology: HelloTopology-1-1422138903

2015-01-25T07:35:03.606+0900 b.s.s.EvenScheduler [INFO] Available slots: (["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6702] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6701] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6700])

2015-01-25T07:35:03.652+0900 b.s.d.nimbus [INFO] Setting new assignment for topology id HelloTopology-1-1422138903: #backtype.storm.daemon.common.Assignment{:master-code-dir "storm-local\\nimbus\\stormdist\\HelloTopology-1-1422138903", :node->host {"226ceb74-c1a3-4b1a-aab5-2384e68124c5" "terry-PC"}, :executor->node+port {[3 3] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [6 6] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [5 5] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [4 4] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [7 7] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [2 2] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [1 1] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703]}, :executor->start-time-secs {[7 7] 1422138903, [6 6] 1422138903, [5 5] 1422138903, [4 4] 1422138903, [3 3] 1422138903, [2 2] 1422138903, [1 1] 1422138903}}

2015-01-25T07:37:48.901+0900 b.s.d.nimbus [INFO] Updated HelloTopology-1-1422138903 with status {:type :inactive}

해당 토폴로지가 배포되었는지를 확인하려면 storm list라는 명령어를 사용하면 현재 기동되고 있는 토폴로지 목록을 확인할 수 있다.




<그림. storm list 명령으로 기동중인 토폴로지를 확인>


실행 결과는 $APACHE_STORM_HOME/logs 디렉토리를 보면 worker-xxx.logs 라는 파일이 생긴것을 확인해 볼 수 있는데, 파일 내용을 보면 다음과 같다.

2015-01-25T07:35:08.908+0900 STDIO [INFO] Tuple value ishello world

2015-01-25T07:35:08.908+0900 STDIO [INFO] Tuple value ishello world

우리가 앞서 구현한 Bolt에서 System.out으로 출력한 내용이 출력된다.

동작을 확인하였으면, 이제 기동중인 토폴로지를 정지 시켜 보자. 토폴로지의 정지는 storm deactivate {토폴로지명} 을 사용하면 된다. 아까 배포한 토폴로지 명이 HelloTopology였기 때문에 다음과 같이 토폴로지를 정지 시킨다.

%storm deactivate HelloTopology

그 후에 다시 storm list 명령을 이용해서 토폴로지 동작 상태를 확인해보면 다음과 같다.



< 그림. Storm  토폴로지 정지와 확인 >


만약에 Topology를 재 배포 하려면 storm kill로 해당 토폴로지를 삭제한 후에, 다시 배포 해야 한다. 이때 주의할점은 storm kill로 삭제해도 바로 삭제가 안되고 시간 텀이 있으니 약간 시간을 두고 재 배포를 해야 한다.


지금까지 간단하게, 운영용 클러스터를 구성하고, 운영 클러스터에 토폴로지를 배포 하는 것에 대해서 알아보았다.

HelloStorm 코드 구현, 클러스터 노드 구축 및 배포를 통해서 간단하게 스톰이 무엇을 하는 것인지는 파악했을 것으로 안다. 다음 장에는 조금 더 구체적으로 Storm의 개념과 스톰의 아키텍쳐등에 대해서 살펴보도록 하겠다.


 


대충보는 Storm #2-Storm 설치와 HelloStorm 작성하기

조대협(http://bcho.tistory.com)


Apache Storm Spark

앞서 데이타 스트리밍 처리에 대해서 설명했다. 스트리밍 처리에 대표적인 오픈소스 프레임웍으로는 Apache Storm Apache Spark이 있는데ㅔ, Spark은 최근에 나온 것으로 스트리밍 처리뿐 만 아니라 조금 더 보편적인 분산 컴퓨팅을 지원하는데, Storm의 경우 나온지도 오래되었고 무엇보다 안정성 부분에서 아직까지는 Spark보다 우위에 있기 때문에, Storm을 중심으로 설명하고자 한다

HelloStorm

Storm의 내부 구조 개념등을 설명하기에 앞서, 일단 깔아서 코드부터 돌려보고 개념을 잡아보자


HelloStorm 구조

HelloWorld 처럼 간단한 HelloStorm을 만들어보자. 만들어보려고 하는 Storm 프로그램은 다음과 같다.



<그림. HelloStorm 개념 구조>


HelloSpout 이라는 클래스는, Storm에 데이타를 읽어오는 클래스로 이 예제에서는 자체적으로 데이타를 생성해낸다. Storm으로 들어오는 데이타는 Tuple이라는 형식을 따르는데, Key/Value 형식의 데이타 형을 따른다. 여기서는 키(필드명)“say”, 데이타는 “Hello” 라는 문자열을 가지고 있는 데이타 tuple을 생성한다.

HelloSpout에서 생성된 데이타는 HelloBolt라는 곳으로 전달이 되는데, HelloBolt 클래스는 데이타를 받아서 처리하는 부분으로 간단하게 들어온 데이타에서 “say” 라는 필드의 데이타 값을 System.out으로 출력해주는 역할만을 한다.


개발하기

이클립스를 사용하여, maven project를 생성한다.



다음으로 pom.xml을 작성한다.


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

 

  <groupId>com.terry.storm</groupId>

  <artifactId>hellostorm</artifactId>

  <version>0.0.1-SNAPSHOT</version>

  <packaging>jar</packaging>

 

  <name>hellostorm</name>

  <url>http://maven.apache.org</url>

 

  <properties>

    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

  </properties>

 

<dependencies>

  <dependency>

    <groupId>junit</groupId>

    <artifactId>junit</artifactId>

    <version>3.8.1</version>

    <scope>test</scope>

  </dependency>

  <dependency>

        <groupId>org.apache.storm</groupId>

        <artifactId>storm-core</artifactId>

        <version>0.9.3</version>

  </dependency>

 

</dependencies>

 

<build>

  <plugins>

    <plugin>

      <artifactId>maven-assembly-plugin</artifactId>

      <version>2.2.1</version>

      <configuration>

        <descriptorRefs>

          <descriptorRef>jar-with-dependencies</descriptorRef>

        </descriptorRefs>

        <archive>

          <manifest>

            <mainClass />

          </manifest>

        </archive>

      </configuration>

      <executions>

        <execution>

          <id>make-assembly</id>

          <phase>package</phase>

          <goals>

            <goal>single</goal>

          </goals>

        </execution>

      </executions>

    </plugin>

  </plugins>

</build>

 

</project>

<그림. pom.xml>


이 예제에서는 storm 0.9.3을 사용했기 때문에 위와 같이 storm-core 0.9.3 dependency 부분에 정의하였다.

다음으로 데이타를 생성하는 HelloSpout을 구현하자


package com.terry.storm.hellostorm;

 

import java.util.Map;

 

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichSpout;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

 

public class HelloSpout extends BaseRichSpout {

          private static final long serialVersionUID = 1L;

          private SpoutOutputCollector collector;

         

          public void open(Map conf,TopologyContext context,SpoutOutputCollector collector){

               this.collector = collector; 

          }

         

          public void nextTuple(){

                 this.collector.emit(new Values("hello world"));

          }

         

          public void declareOutputFields(OutputFieldsDeclarer declarer){

                 declarer.declare(new Fields("say"));

          }

         

}

<그림. HelloSpout.java>


HelloSpout 실행이 되면, 필드 “say” 값이 “hello world” 데이타를 생성해서 다음 워크 플로우로 보낸다.

nextTuple() 이라는 함수에서 외부에서 데이타를 받아들여서 다음 워크 플로우로 보내는 일을 하는데, 여기서는 외부에서 데이타를 받아들이지 않고 자체적으로 데이타를 생성하도록 한다. 데이타를 뒤에 워크플로우에 보내는 함수는 emit인데, emmit부분에 “hello world”라는 value 넣어서 보내도록 하였다. 그렇다면 필드의 값은 어떻게 정의 하느냐? 필드값은 declareOutputField라는 함수에 정의하는데, 데이타의 필드는 “say” 정의하였다.


다음으로 이 HelloSpout에서 생성할 데이타를 처리한 HelloBolt를 구현해보자


package com.terry.storm.hellostorm;

 

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Tuple;

 

public class HelloBolt extends BaseBasicBolt{

 

        public void execute(Tuple tuple, BasicOutputCollector collector) {

               // TODO Auto-generated method stub

               String value = tuple.getStringByField("say");

               System.out.println("Tuple value is"+value);

        }

 

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

               // TODO Auto-generated method stub

              

        }

 

}

<그림. HelloBolt.java>


HelloSpout에서 생성된 데이타는 HelloBolt 들어오는데, 데이타가 들어오면 execute라는 메서드가 자동으로 수행된다. 이때, Tuple 통해서 데이타가 전달된다. 여기서는 tuple에서 필드이름이 “say” 값을 tuple.getStringByField(“say”) 이용해서 꺼내서 System.out으로 출력했다.

눈치가 빠른 사람이라면 벌써 알아차렸겠지만, 데이타를 다음 플로우로 보내고자 할때는 앞의 HelloSpout에서 한것처럼, execute 메서드내에서 데이타 처리가 끝난후에, collector.emit 이용해서 다음 플로우로 보내고, delcareOutputField에서 데이타에 대한 필드를 정의하면 된다.

데이타를 생성하는 Spout 데이타를 처리 하는 Bolt 구현했으면 둘을 연결 시켜줘야 한다. 이를 연결시켜주는 것이 Topology인데, HelloTopologyLocal 클래스를 구현해 보자


package com.terry.storm.hellostorm;

 

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.utils.Utils;

 

public class HelloTopologyLocal {

        public static void main(String args[]){

               TopologyBuilder builder = new TopologyBuilder();

               builder.setSpout("HelloSpout", new HelloSpout(),2);

               builder.setBolt("HelloBolt", new HelloBolt(),4).shuffleGrouping("HelloSpout");

              

               Config conf = new Config();

               conf.setDebug(true);

               LocalCluster cluster = new LocalCluster();

              

               cluster.submitTopology("HelloTopologyLocal", conf,builder.createTopology());

               Utils.sleep(10000);

               // kill the LearningStormTopology

               cluster.killTopology("HelloTopologyLocal");

               // shutdown the storm test cluster

               cluster.shutdown();          

        }

 

}

<그림. HelloTolologyLocal.java>


나중에 개념에서 자세하 설명하겠지만, Topology 데이타를 생성하는 Spout 처리하는 Bolt간에 토폴로지 데이타 흐름을 정의하는 부분이다. Spout Bolt들을 묶어 주는 부분이다.

먼저 TopologyBuilder 이용해서 Topology 생성하고, setSpout 이용해서 앞에서 구현한 HelloSpout 연결한다.

다음으로, setBolt 이용해서 Bolt Topology 연결한다. 후에, HelloSpout HelloBolt 연결해야 하는데, setBolt시에, SuffleGrouping 메서드를 이용하여, HelloBolt HelloSpout으로 부터 생성되는 데이타를 읽어들임을 명시한다.

builder.setBolt("HelloBolt", new HelloBolt(),4).shuffleGrouping("HelloSpout");

이렇게 Topology 구성되었으면이 Topology 실제로 실행해야 하는데, Topology 어떤 서버에서 어떤 포트등을 이용해서 실행될지는 Config 정의할 있지만, 여기서는 간단한 테스트이기  때문에 별도의 복잡한 Config 정보는 기술하지 않았다.

다음으로 이렇게 만들어진 Topology Storm 클러스터에 배포해야 하는데, Storm 개발의 편의를 위해서 두가지 형태의 클러스터를 제공한다. 개발용 클러스터와 실운영 환경용 클러스터를 제공하는데, 여기서는 LocalCluster cluster = new LocalCluster();

라는  것을 사용하였다.

LocalCluster 개발환경용 클러스터로, 개발자의 환경에서 최소한의 서버들만을 기동하여 개발한 토폴로지를 테스트할 있게 해준다. 이렇게 Cluster 생성했으면 cluster.submitTopology 이용하여 개발한 토폴로지를 배포한다. 토폴로지가 배포되면 자동으로 토폴로지가 실행이 된다. HelloSpout 계속해서 데이타를 생성하고, HelloBolt 생성된 데이타를 받아서 System.out.println으로 출력하게 되는데, 10초후에 멈추게 하기 위해서, Sleep 10초를 준다. 토폴로지 코드를 실행하는 쓰레드는 Sleep으로 빠질지 모르지만 토폴로지에서 생성된 HelloSpout HelloBolt 쓰레드는 백그라운드에서 작업을 계속 진행한다.

10초후에는 killTopology 이용해서 해당 토폴로지를 제거하고 shutdown 이용해서 Storm 클러스터를 종료시킨다.

실행하기

여기까지 구현했으면 첫번째 Storm 프로그램을 기동해보자. 다음과 같이 maven 명령어를 이용하면 실행이 가능하다.

C:\dev\ws\java_workspace\com.terry.storm>mvn exec:java -Dexec.mainClass=com.terry.storm.hellostorm.HelloTopologyLocal -Dexec.classpath.Scope=compile

실행을 해보면, HelloSpout 데이타를 생성하고, HelloBolt 이를 받아서 화면에 출력하는 것을 있다.


6292 [Thread-16-HelloSpout] INFO  backtype.storm.daemon.task - Emitting: HelloSpout default [hello world]

6292 [Thread-22-HelloBolt] INFO  backtype.storm.daemon.executor - Processing received message source: HelloSpout:5, stream: default, id: {}, [hello world]

Tuple value ishello world

6292 [Thread-10-HelloBolt] INFO  backtype.storm.daemon.executor - Processing received message source: HelloSpout:6, stream: default, id: {}, [hello world]

Tuple value ishello world

ZooKeeper 에러 대응하기

종종 환경에 따라서 실행이 안되면서 다음과 같은 에러가 출력되는 경우가 있는데


3629 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@7bfd25ce

3649 [main-SendThread(0:0:0:0:0:0:0:1:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (java.lang.SecurityException: 로그인 구성을 찾을 없습니다.)

3650 [main-SendThread(0:0:0:0:0:0:0:1:2000)] ERROR org.apache.storm.zookeeper.ClientCnxnSocketNIO - Unable to open socket to 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2000

3655 [main-SendThread(0:0:0:0:0:0:0:1:2000)] WARN  org.apache.storm.zookeeper.ClientCnxn - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect

java.net.SocketException: Address family not supported by protocol family: connect

        at sun.nio.ch.Net.connect(Native Method) ~[na:1.6.0_37]

        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:532) ~[na:1.6.0_37]

        at org.apache.storm.zookeeper.ClientCnxnSocketNIO.registerAndConnect(ClientCnxnSocketNIO.java:277) ~[storm-core-0.9.3.jar:0.9.3]

        at org.apache.storm.zookeeper.ClientCnxnSocketNIO.connect(ClientCnxnSocketNIO.java:287) ~[storm-core-0.9.3.jar:0.9.3]

        at org.apache.storm.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:967) ~[storm-core-0.9.3.jar:0.9.3]

        at org.apache.storm.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1003) ~[storm-core-0.9.3.jar:0.9.3]


이 에러는 Storm Zookeeper와 연결을 할 수 없어서 나는 에러인데, LocalCluster 모드로 기동할 경우,Storm embedded Zookeeper를 기동해서 이 Zookeeper와 연결되어야 하나. IPV6로 연결을 시도하기 때문에, (ZK IPV4 Listen하는데) 발생하는 문제로

java로 실행할때 "-Djava.net.preferIPv4Stack=true" 옵션을 주면, JVM IPV6를 사용하지 않고, V4를 사용하기 때문에, ZooKeeper IPV4로 뜨고, Storm IPV4로 연결을 시도하기 때문에 문제가 없어진다.

지금까지 간단하게나마 첫번째 Storm 프로그램을 작성해서 실행해보았다.

다음에는 Storm 이루는 컴포넌트 구조와 아키텍쳐에 대해서 설명하도록 한다