블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 


데이타 플로우 개발환경 설정하기


조대협 (http://bcho.tistory.com)


데이타 플로우에 대한 이해가 끝났으면 이제 직접 코딩을 해보자. 데이타 플로우에 대한 개념등은 http://bcho.tistory.com/search/dataflow 를 참고하기 바란다.

데이타 플로우에서 지원하는 프로그래밍 언어는 자바와 파이썬이다. 파이썬은 아직 알파버전으로, 이 글에서는 자바를 이용해서 설명한다.


자바를 이용한 개발환경 설정은 이클립스 개발환경과 maven을 이용한 개발 환경 두가지가 있는데, 여기서는 조금 더 손 쉬운 이클립스 환경을 기준으로 설명한다.

메이븐 기반의 개발 환경 설정은 https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven 를 참고하기 바란다.


사전준비

클라우드 계정 생성 및 빌링 설정

구글 클라우드 계정 생성 및 빌링 설정 방법은 앞서 다른글에서도 많이 설명하였기 때문에 다시 설명하지 않는다. 자세한 내용은 http://bcho.tistory.com/1107 를 참고하기 바란다.

API 사용 설정하기

다음 데이타플로우와 기타 같이 사용할 제품들의 API를 사용하기 위해서 이를 설정해줘야 한다.

구글 클라우드 콘솔에서 API Manager를 선택한후 대쉬 보드에서 아래 서비스들을 선택하여 API를 Enable 해준다. Cloud Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, and Cloud Datastore APIs.





구글 Cloud SDK 설정

구글 데이타 플로우를 프로그래밍 하기 위해서, 데이타 플로우 API를 호출하기 위한 SDK와 조작을 위한 CLI (Command Line Interface)가 필요한데, 이는 구글 Cloud SDK를 설치하면 같이 설치가 된다.

클라우드 SDK 설치는 https://cloud.google.com/sdk/docs/ 를 참고하면 된다.

gcloud 인증하기

구글 Cloud SDK 설치가 끝났으면, gcloud 명령어를 사용하기 위해서 gcloud 명령어를 초기화 한다.

초기화는 어떤 구글 클라우드 프로젝트를 사용할것인지, 그리고 사용자 아이디등으로 인증을 하는 절차를 거친다.

프롬프트 상에서

%gcloud init

명령을 실행하여, 수행한다.

이클립스 환경 설정

이제 구글 클라우드 프로젝트 설정과, 이를 호출하기 위한 SDK 환경 설치가 끝났다. 이제 이클립스 기반의 개발 환경을 설정해보자.

이클립스 설치하기

이클립스는 4.4 버전 이상을 설치하고, JDK는 1.7 이상을 설정한다.

플러그인 설치하기

다음 구글 데이타 플로우 개발환경을 위한 이클립스 플러그인을 설치한다.

이클립스에서 Help > Install New Software를 선택한 다음에, Work with 텍스트 박스에  https://dl.google.com/dataflow/eclipse/  을 입력한다.


다음으로 Google Cloud Dataflow를 선택하여 설치를 진행한다.

설치가 끝난 후 확인은 이클립스에서 New > Project를 하면, 위자드를 선택하는 화면에서 아래와 같이 Google Cloud Platform이라는 폴더와 함께 그 안에 “Cloud Dataflow Java Project”를 선택할 수 있는 화면이 나온것을 볼 수 있다.



헬로우 데이타 플로우

개발 환경 설정이 끝났으니, 이제 간단한 데이타 플로우 프로그램을 하나 만들어보자.

이 프로그램은 단어들을 읽어드린 후에, 단어들의 발생 횟수를 카운트 해 주는 파이프라인이다.



단어들을 읽어드린 후 toUpper라는 트랜스폼에서, 각 단어들을 대문자로 변환한 후, Count라는 트랜스폼에서 단어별로 발생횔 수를 카운트 한후에, 이를 Key Value (단어:발생횟수)로 리턴한 후, Print라는 트랜스폼에서 화면으로 결과를 출력해주는 예제이다.


프로젝트 생성

예제 파이프라인을 만들기 위해서, 이클립스에서 프로젝트를 생성해보자. New > Project를 선택한 후 에, 아래 그림과 같이 Google Cloud Platform 폴더에서 Cloud Dataflow Java Project를 선택한다



다음 프로젝트에 대해서  Group ID, Artifact ID 그리고 패키지 명등을 입력한다.



다음 메뉴로 넘어가면 구글 데이타 플로우를 실행하기 위한 디테일한 정보를 넣어야 하는데,




프로젝트 명과, “Cloud Storage Staging Location”이라는 정보를 입력해야 한다. Cloud Storage Staging Location은 Google Cloud Storage 의 버킷명으로, 데이타 플로우 애플리케이션 코드가 로딩 되는 장소이다.

데이타플로우 애플리케이션을 구글 클라우드에서 실행하게 되면, 애플리케이션 코드와 애플리케이션을 실행하기 위한 라이브러리들이 각각의 워커 노드로 배포 되는데, 배포를 위해서 먼저 클라이언트에서 부터, 이러한 실행 코드를 Google Cloud Storage에 올려놓게 된다. 앞에서 정의하는 “Cloud Storage Staging Location”은, 이 클라우드 스토리지 버킷에 대한 경로 정의이다.

클라우드 스토리지 버킷은 아래와 같인 Google Cloud Storage 메뉴에서 아래와 같이 생성할 수 있다.


코드 제작

그러면 코드를 작성해 보자.



package com.terry.df;


import com.google.cloud.dataflow.sdk.Pipeline;

import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

import com.google.cloud.dataflow.sdk.transforms.Count;

import com.google.cloud.dataflow.sdk.transforms.Create;

import com.google.cloud.dataflow.sdk.transforms.DoFn;

import com.google.cloud.dataflow.sdk.transforms.ParDo;

import com.google.cloud.dataflow.sdk.transforms.DoFn.ProcessContext;

import com.google.cloud.dataflow.sdk.values.KV;


import org.slf4j.Logger;

import org.slf4j.LoggerFactory;


public class StarterPipeline {

 private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);


 public static void main(String[] args) {

   Pipeline p = Pipeline.create(

       PipelineOptionsFactory.fromArgs(args).withValidation().create());


   p.apply(Create.of("Hello", "World","hello","boy","hello","girl"))

   .apply(ParDo.named("toUpper").of(new DoFn<String, String>() {

     @Override

     public void processElement(ProcessContext c) {

       c.output(c.element().toUpperCase());

     }

   }))

   .apply(Count.<String>perElement())

   .apply(ParDo.named("Print").of(new DoFn<KV<String,Long>, Void>(){

@Override

public void processElement(ProcessContext c) throws Exception {

LOG.info(c.element().getKey() + " count:"+c.element().getValue());

}

   }));


   p.run();

 }

}



(참고 : 위의 소스코드는 https://github.com/bwcho75/googledataflow/tree/master/HelloDataFlow 에 있다.)


처음 p.apply(Create.of…)에서, 데이타를 생성하였다.

다음으로 .apply(ParDo.named("toUpper").of(new DoFn<String, String>() 에서 소문자를 대문자로 다 치완하는 데, ParDo는 이 작업을 여러 노드에서 병렬로 실행하겠다는 선언이고, named는 이 트랜스폼의 이름을 “toUpper”로 정의하겠다는 정의이다. (나중에 디버깅에 유용한다.) 다음으로, 트랜스폼 함수는 DoFn으로 정의했는데, <String,String>으로 정의되어 앞의 인자가 Input 그리고 뒤의 인자가 Output의 데이타 형으로 String 인자를 받아서, String 인자로 리턴하겠다는 것이다.


.apply(Count.<String>perElement()) 은 데이타플로우에서 미리 정의된, 트랜스폼으로,  <String>으로 된 데이타를 받아서 엘리먼트당 카운트를 해서 <String,Long> 형으로 리턴을 해준다. 즉 String형의 단어마다 카운트를 한 결과를 Long형으로 넣어서 이를 키밸류(KV)형식으로 묶어서 리턴해준다.

.apply(ParDo.named("Print").of(new DoFn<KV<String,Long>, Void>() 에서는 앞에서 전달해준  String,Long형이 키밸류형으로 정의된 KV<String,Long>형의 데이타를 받아서, 출력해주고, 마지막 트랜스폼이기 때문에 더 이상 뒤로 데이타를 넘기지 않을 것이기 때문에, Output의 인지 타입을 Void로 선언하였다.

실행

코드를 작성이 끝났으면 실제로 실행해보자 Run As에서 Dataflow Pipeline을 선택하면 실행을 할 수 있다.



이때 다음과 같이 실행환경을 설정할 수 있다.



여기서 Runner에 대한 개념을 짚고 넘어갈 필요가 있다.

Direct Pipeline Runner

Direct Pipeline Runner는 데이타플로우 코드를 로컬 개발 환경 (노트북이나 데스크탑)에서 실행하고자 할때 선택할 수 있는 러너이다. 주로 개발이나 테스트에서 사용할 수 있는데, 다른 클라우드 서비스 예를 들어  Pub/Sub이나 빅쿼리등이랑 연동이 되는 파이프라인의 경우에는 DirectPipelineRunner를 사용할 수 없으니 주의하기 바란다.

DataflowPipelineRunner

클라우드 환경에서 데이타 플로우를 실행하기 위해서는 DataflowPipelineRunner와  BlockingDataflowPipelineRunner 두 가지가 있다.

DataflowPipelineRunner는 데이타 플로우 애플리케이션을 구글 클라우드에서 실행을 해주는데, 데이타 플로우 잡을 클라우드에서 실행해놓고, 로컬 애플리케이션을 바로 종료 한다. (클라우드에 접수된 잡은 클라우드에서 계속 실행된다.)

BlockingDataflowPipelineRunner

BlockingDataflowPipelineRunner는 데이타 플로우잡을 구글 클라우드에서 실행해놓게 해놓고, 잡이 끝날때 까지 로컬 애플리케이션을 대기하도록 한다.  

배치와 같이 끝이 있는 경우에는 필요에 따라서 사용할 수 있다. 스트리밍의 경우 BlockingDataflowPipelineRunner를 사용하게 되면 스트리밍 잡을 명시적으로 끊지 않는 이상 계속해서 로컬 애플리케이션이 실행되는 상태가 된다.


DirectPipelineRunner로 실행을 해보면 다음과 같이 이클립스 콘솔에서 결과가 출력되는 것을 볼 수 있다.


BODY는 1,  GIRL 은 1, HELLO는 3개 그리고 WORLD는 1개가 출력되는 것을 볼 수 있다.


이번에는 클라우드에 배포를 하고 실행해보자, Run As에서, BlockingDataflowPipelineRunner를 선택하여 실행해보자.

실행을 하면 코드가 자동으로 클라우드로 배포 되서 실행되는 것을 확인할 수 있다. 구글 클라우드 콘솔의 데이타 플로우 메뉴를 보면, 새로운 잡이 생성된것을 확인할 수 있다.


해당 잡을 선택해서 들어가 보면 현재 잡의 실행 상황과 함께, 파이프라인에서 단계별 시간이나 기타 상세한 로그를 볼 수 있다.



데이타 플로우 애플리케이션이 기동이 완료되면, Logs 메뉴에서 Worker Logs라는 버튼을 누르면 각 워커 노드에서의 로그를 볼 수 있다.


Worker Logs를 누르면 다음과 같이  GIRL,WORLD,BOY,HELLO에 대한 count 수를 출력한 로그를 확인할 수 있다.


참고 : Logs 메뉴로 들어가서  Job Logs에서  Minimum serverity를 “All” 로 선택하면 전체 작업 상황을 알 수 있는데, 애플리케이션을 실행했다고 바로 데이타 플로우의 파이프라인이 실행되는 것이 아니라, 애플리케이션 코드가 구글 클라우드 스토리에 로드되고, 이 로드된 코드들이 각각의 워커 노드에 배포가 된후에, 워커 노드가 기동이 되야 잡이 실제로 수행된다.


워커가 제대로 기동되었는지는 Job Logs에서 Mimimum serverity를 All로 한후에 다음과 같이 “Worker have started successfully”라는 로그가 나오면 그때 부터 데이타 플로우 잡을 실행을 시작한다고 생각하면 된다.








데이타 스트리밍 분석 플랫폼 Dataflow 개념 잡기 #2/2

(트리거, 이벤트 타임, 워터마크 개념)


조대협 (http://bcho.tistory.com)


앞글 http://bcho.tistory.com/1122 에 의해서 Dataflow에 대한 개념에 대해서 계속 알아보자

트리거

윈도우와 더블어서 Dataflow 프로그래밍 개념중에서 유용한 개념중의 하나가 트리거이다. 트리거는 처리중인 데이타를 언제 다음 단계로 넘길지를 결정하는 개념이다. 특히 윈도우의 개념과 같이 생각하면 좋은데, 윈도우는 일반적으로 윈도우가 종료되는 시간에 그 데이타를 다음 Transform으로 넘기게 된다.


그런데 이런 의문이 생길 수 있다. “윈도우의 크기가 클때 (예를 들어 한시간), 한시간을 기다려야 데이타를 볼 수 있는 것인가? 그렇다면 한 시간 후에 결과를 본다면 이것을 실시간 분석이라고 할 수 있는가?”

그래서 여기서 트리거의 개념이 나온다.

예를 들어 한시간 윈도우가 있더라도, 윈도우가 끝나지 않더라도 현재 계산 값을 다음 Transform으로 넘겨서결과를 볼 수 있는 개념이다. 1분 단위로 트리거를 걸면 1분 결과를 저장하고, 2분째도 결과를 저장하고, 3분째도…. 60분째에도 매번 결과를 업데이트 함으로써, 윈도우가 종료되기 전에도 실시간으로 결과를 업데이트 할 수 있게 된다.


트리거의 종류

그렇다면 이러한 트리거는 앞에서 언급한 시간 단위의 트리거만 있을까? Dataflow는 상당히 여러 종류의 트리거를 지원한다.


  • Time trigger (시간 기반 트리거) : 시간 기반 트리거는 일정 시간 주기로 트리거링을 해주는 트리거 이다. 1분 단위, 1초 단위 같이 일정 주기를 지정하거나, “윈도우 시작후 2분후 한번과 윈도우 종료후 한번"과 같이 절대적인 시간을 기준으로도 정의가 가능하다.

  • Element Count (데이타 개수 기반 트리거) : 다음은 개수 기반인데, 예를 들어 “어떤 데이타가 100번 이상 들어오면 한번 트리거링을 해라” 또는 “매번 데이타가 100개씩 들어올때 마다 트리거링을 해라" 라는 형태로 정의가 가능하다.

  • Punctuations  (이벤트 기반 트리거) : Punctuations는 엄밀하게 번역하면 “구두점" 이라는 의미인데, 구두점 처럼 특정 데이타가 들어오는 순간에, 트리거링을 하는 방법이다.

트리거 조합

이러한 트리거는 하나의 트리거 뿐 아니라, 여러개의 트리거를 동시에 조합하여 사용이 가능하다.

  • AND : AND 조건으로 두개의 트리거의 조건이 만족해야 트리거링이 된다. 예를 들어, Time Trigger가 1분이고, Element Count 트리거가 100개이면, 윈도우가 시작된 1분 후에, Element Count가 100개가 되면 트리거링이 된다.

  • OR : OR 조건으로 두개의 트리거의 조건 중 하나만 만족하면 트리거링이 된다.

  • Repeat : Repeat는 트리거를 반복적으로 수행한다. Element Count 트리거 10개를 반복으로 수행하면, 매 10개 마다 트리거링이 된다. Time 트리거를 1분 단위로 반복하면, 매 1분 마다 트리거링이 된다.

  • Sequence : Sequence 트리거는 등록된 트리거를 순차적으로 실행한다. Time 트리거 1분을 걸고 Element count 트리거 100개를 걸면, 윈도우 시작후 1분 후 트리거링인 된후, 그 후 부터 Element 가 100개 들어오면 두번째 트리거링이 발생하고 트리거링이 종료 된다.


트리거 결과의 누적

그러면 트리거링이 될때 마다 전달 되는 데이타는 어떻게 될까라는 질문이 나올 수 있는데. 무슨 이야기인가 하면 윈도우 내에서 트리거가 발생할때, 이전 데이타에 대한 처리를 어떻게 할것인가이다.


데이타가 A,B,C,D,E,F 가 들어왔다고 가정하자. 트리거가 C 다음 발생했다고 했을때, 윈도우가 끝난 F에는 어떤 값이 리턴이 될까?

첫번째 트리거링에는 당연히 A,B,C 가 전달된다.

윈도우가 끝나면 A,B,C,D,E,F 가 전달되는 것이 맞을까 아니면 트리거링 된 이후의 값인 D,E,F 만 전달되는 것이 맞을까?

맞는 건 없고, 옵션으로 지정이 가능하다.

  • Accumulating
    Accumulating은 트리거링을 할때 마다 윈도우 내에서 그때까지의 값을 모두 리턴한다.

  • Discarding
    트리거링 한 후에, 이전 값은 더이상 리턴하지 않고, 그 이후 부터 다음 트리거링 할때까지의 값만을 리턴한다.

예를 들어서 보자


다음과 같은 윈도우가 있고, 3번, 23번, 10번에서 트리거링이 된다고 했을때,

Accumulating mode의 경우

  • 첫번째 트리거 후 : [5,8,3]

  • 두번째 트리거 후 : [5,8,3,15,19,23]

  • 세번째 트리거 후 : [5,8,3,15,19,239,13,10]

와 같이 값이 반환되고

Discarding mode의 경우

  • 첫번째 트리거 후 [5,8,3]

  • 두번째 트리거 후 [15,19,23]

  • 세번째 트리거 후 [9,13,10]

이 반환된다.

데이타 지연에 대한 처리 방법

실시간 데이타 분석은 특성상 데이타의 전달 시간이 중요한데, 데이타는 모바일 클라이언트 등에서 인터넷을 통해서 데이타가 서버로 전송되는 경우가 많기 때문에, 데이타의 실제 도달 시간이 들쭉날쭉 하다. 이러다 보니 데이타의 도착 순서나 지연등이 발생하는데, 이에 대한 처리가 필요하다. 먼저 데이타 도달 시간의 개념을 이해하려면, 이벤트 타임과 프로세싱 타임의 개념을 먼저 이해해야 한다.

이벤트 타임과 프로세싱 타임

모바일 단말에서 다음과 같이 A,B,C,D의 데이타를 1시1초, 1시2초,3초,5초에 보냈다고 하자.


서버에 도착해서 Dataflow에 도착하는 시간은 물리적으로 서버와 단말간의 거리 차이가 있기 때문에 도착 시간은 단말에서 데이타가 발생한 시간보다 느리게 되며, 또한 각 단말의 위치나 단말이 연결되어 있는 네트워크 상황이 다르기 때문에 순차적으로 도착하는 것이 아니라, 늦게 보낸 데이타가 더 빨리 도착할 수 도 있다.

아래 그림을 보면 A데이타는 1시1초에 단말에서 생성되었지만 서버에 도착한 시간은 1시2초가 된다. C,D의 경우, 순서가 바뀌어서 도착하였다.



이렇게 실제로 데이타가 발생한 시간을 이벤트 타임, 그리고 서버에 데이타가 도착한 시간을 프로세싱 타임이라고 정의한다.


이 프로세싱 타임은 네트워크 상황이나 데이타에 크기에 따라 가변적으로 변하기 때문에, 이벤트 타임과 프로세싱 타임의 상관 관계를 그래프로 표현해보면 다음과 같아진다.


가장 이상적인 결과는 이벤트 타임과 프로세싱 타임이 동일한 것이겠지만 불가능하고, 위의 그림처럼 이벤트 타임보다 프로세싱 타임이 항상 늦게 되고, 이벤트 타임과 프로세싱 타임의 차이는 매 순간 다르게 된다.

워터 마크 (Water Mark)

이렇게 위의 그림과 같이 실제 데이타가 시스템에 도착하는 시간을 예측 하게 되는데, 이를 워터 마크라고 한다. 위의 그림에서 “실제 처리 그래프"로 표시되는 부분을 워터마크라고 생각하면 된다. 이 예측된 시간을 기반으로 윈도우의 시스템상의 시작 시간과 종료 시간을 예측 하게 된다.

지연 데이타 처리 방법

윈도우 처리 관련해서, 실제 발생한 시간과 도착 시간이 달라서, 처리 시간내에 못 들어오는 경우가 발생할 수 있다. 아래 그림을 보면, 실제 윈도우는 1시1초~1시6초까지의 데이타를 처리하기를 바라고 정의했을 수 있는데, 시스템에서는 이 윈도우의 값이 프로세싱 타임 기준으로 (워터 마크를 기준으로 연산함) 1시2초~1시6초에 도착하기를 기대하고 있는데, 데이타 C의 경우에는 기대했던 프로세싱 타임에 도착하지 않았기 때문에 이 데이타는 연산에서 누락될 수 있다.



비단 늦게 도착한 데이타 뿐만 아니라, 시스템이 예측한 프로세싱 타임 보다 일찍 데이타가 도착할 수 있는데, 이런 조기 도착한 데이타와 지연 도착한 데이타에 대한 처리는 어떻게 해야 할까?

Dataflow에서는 이런 조기 도착이나 지연 데이타에 대한 처리 메카니즘을 제공한다.

윈도우를 생성할때, withAllowedLateness라는 메서드를 사용하면, 늦게 도착하는 데이타에 대한 처리 기간을 정의할 수 있다.


PCollection<String> items = ...;

 PCollection<String> fixed_windowed_items = items.apply(

   Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES))

         .withAllowedLateness(Duration.standardDays(2)));

https://cloud.google.com/dataflow/model/windowing#managing-time-skew-and-late-data


위의 예제는 1분 단위의 Fixed Window를 정의하고, 최대 2일까지 지연 도착한 데이타 까지 처리할 수 있도록 정의한 예제이다.


지금까지 간단하게 dataflow를 이용한 스트리밍 데이타 처리의 개념에 대해서 알아보았다.

대충보는 Storm #6-Apache Storm 그룹핑 개념 이해하기

 조대협 (http://bcho.tistory.com)



지금까지 컴포넌트간의 경로 라우팅, 즉 Spout 에서 Bolt간, Bolt에서 Bolt간 경로를 설정하는 방법에 대해서 알아보왔다.

그렇다면 각 컴포넌트간 라우팅을 할때 그 안에 있는 Task간에는 어떻게 상세하게 라우팅이 될까? Storm에서는 이 Task간의 라우팅을 정의하기 위해서 Grouping이라는 개념을 사용한다.


Shuffling

가장 간단한 라우팅 방법으로 Bolt A에서 Bolt B로 라우팅을 한다고 했을때, Bolt A내의 있는 Task가 Bolt B에 있는 Task중 아무 Task로 임의로(랜덤하게) 라우팅 하는 방식이다.




Field

Bolt A에 있는 Task에서 Bolt B에 있는 Task로 라우팅을 할때, 규칙성을 갖는 것중 하나인데, 보내고자 하는 데이타의 특정 필드에 있는 값을 기준으로 Bolt B에 있는 특정 Task로 라우팅 하는 방식이다. 라우팅 기준은 지정한 필드의 값을 가지고 해쉬를 계산해서 해쉬에 따라서 Bolt B에 있는 Task로 라우팅 시키는 방식이다.

예를 들어, Bolt B에 Task가 3개가 있다고 가정할때, 나이라는 필드로 “Field Grouping”을 한다고 하면, 나이/3으로 나눈 나머지 값에 따라서 Task A,B,C로 라우팅 하는 방식이다. (나눗셈은 설명을 쉽게 하기 위해 예를 들었지만 비슷한 원리로 해쉬를 계산하여 라우팅을 한다.)

Bolt에서 로컬 캐쉬를 사용하거나 할때, 같은 해쉬의 데이타가 같은 Task로 라우팅이 되게 해서 캐쉬 히트율을 높이는 것등에 유용하게 사용될 수 있다.



Global 

Global 그룹핑은 모으는 개념(Aggregation)의 개념이다. Bolt A의 어느 Task에서 메세지를 보내더라도 항상Bolt B똑같은 하나의 Task로 라우팅이 되는 방식으로, Bolt B에 있는 Task중에서 Task ID가 가장 작은 특정 Task로만 라우팅을 한다.

분산해서 연산한 값을 모두 모아서 합산을 한다던가등에 사용할 수 있다.



All

All 그룹핑은 일종의 브로드 캐스트 개념으로 Bolt A의 하나의 Task가 메세지를 전송하면 Bolt B의 모든 Task가 메세지를 받는 형태이다.

각 Task들에 설정 변경등을 넘길때 유용하게 사용될 수 있다.





Direct

당연히 있을 것으로 생각했겠지만 당연히 있는 기능이다. Bolt A의 Task에서 Bolt B의 특정 Task로 명시적으로 라우팅을 지정하는 기능이다. 이때 주의할점이 Bolt B의 Task를 지정할때,  Task Id가 아니라 Task의 Index로 타겟을 지정한다. 예를 들어 Bolt B에 Task가 5개가 있을때, 0번, 1번식으로 타켓을 지정하게 된다.





Custom

Custom 그룹핑은 라우팅 로직을 개발자가 직접 작성해서 넣는 방식이다.


Local or Shuffle

다소 주의 깊게 볼 필요가 있는 그룹핑 방식이다. 기본적인 동작 방식은 Shuffle과 다르지 않으나,

Bolt A에서 Bolt B의 Task로 라우팅을 할때, Bolt A에서 메세지를 보내는 Task와 같은 JVM 인스턴스 (Woker)에 Bolt B의 Task가 있을 경우 같은 JVM 인스턴스에 있는 Task로 우선 라우팅을 한다. 이는 네트워크를 이용한 리모트 호출을 줄이기 위한 방법이다.

그러면 Bolt의 Task들은 각 Worker에 어떻게 배치 될것인가에 대한 질문이 올 수 있는데, 이렇게 Task를 Worker에 배치하는 행위를Scheduling(스케줄러)라고 하고, 배치를 하는 주체를 Scheduler라고 한다. 자료를 몇개 찾아봤지만 Scheduling 정책에 대해서는 명확하게 나와 있지 않고, 무작위 적으로 배치하는 것으로 보이는데, 조금 더 research가 필요할듯. 


참고 :Pluggable Scheduler

애플리케이션 성격에 맞게 스케쥴링 정책을 구현해서 사용할 수 있는데, 이를 Pluggable Scheduler라고 한다.

http://xumingming.sinaapp.com/885/twitter-storm-how-to-develop-a-pluggable-scheduler/ 의 예에 나와 있는 시나리오를 보면, 특정 Spout의 경우에는 상용 소프트웨어를 사용하는데, 이 상용 소프트웨어는 Machine당 라이센스를 가지고 있기 때문에 이 Spout은 반드시 라이센스가 설치된 서버에 스케쥴링(배포)되어야 한다. 그래서 특정 스케쥴링 정책이 필요한데, 첨부 링크에 있는 내용은 Pluggable scheduler를 구현하는 방법에 대해서 설명하고 있다.





대충보는 Storm #3-Storm 싱글 클러스터 노드 설치 및 배포

조대협(http://bcho.tistory.com)

 

지난번에는 간략하게, Storm을 이용한 HelloStorm 애플리케이션을 개발용 클러스터인 Local Cluster에서 구동해봤다. 이번에는 운영용 클러스터를 설정하고, 이 운영 클러스터에 지난번에 작성한 HelloStorm 토폴리지를 배포해보도록 한다.

Storm 클러스터의 기본 구조

Storm 클러스터를 기동하기 전에, 클러스터가 어떤 노드들로 구성이 되는지 먼저 알아보도록 하자 Storm 클러스터는 기본적으로 아래와 같은 3가지 구성요소로 구성이 되어 있다.

먼저 주요 노드인 Nimbus Supervior 노드에 대해서 알아보자, Nimbus Supervisor 노드는 각각 하나의 물리 서버로 생각하면 된다.


Nimbus

Nimbus는 마스터 노드로 주요 설정 정보를 가지고 있으며, Nimbus 노드를 통해서 프로그래밍 된 토폴로지를 Supervisor 노드로 배포한다. 일종의 중앙 컨트롤러로 생각하면 된다. Storm에서는 중앙의 하나의 Nimbus 노드만을 유지한다.

Supervisor

Supervisor 노드는 실제 워커 노드로, Nimbus로 부터 프로그램을 배포 받아서 탑재하고, Nimbus로 부터 배정된 작업을 실행하는 역할을 한다. 하나의 클러스터에는 여러개의 Supervisor 노드를 가질 수 있으며, 이를 통해서 여러개의 서버를 통해서 작업을 분산 처리할 수 있다.

Zookeeper

이렇게 여러개의 Supervisor를 관리하기 위해서, Storm Zookeeper를 통해서 각 노드의 상태를 모니터링 하고, 작업의 상태들을 공유한다.

Zookeeper는 아파치 오픈소스 프로젝트의 하나로, 분산 클러스터의 노드들의 상태를 체크하고 공유 정보를 관리하기 위한 분산 코디네이터 솔루션이다.

전체적인 클러스터의 구조를 살펴보면 다음과 같다.



<그림. Storm 클러스터의 구조>

하나의 싱글 머신에 Nimbus가 설치되고, 다른 각각의 머신에 Supervisor가 하나씩 설치된다. 그리고, ZooKeeper 클러스터를 통해서 기동이 된다.

※ 실제 물리 배포 구조에서는 Nimbus Supervisor, ZooKeeper등을 하나의 서버에 분산배포할 수 도 있고, 여러가지 다양한 배포구조를 취할 수 있으나, Supervisor의 경우에는 하나의 서버에 하나의 Supervisor만을 설치하는 것을 권장한다. Supervisor의 역할이 하나의 물리서버에 대한 작업을 관리하는 역할이기 때문에, 한 서버에 여러 Supervisor를 설치하는 것은 적절하지 않다.

 

설치와 기동

Storm 클러스터를 기동하기 위해서는 앞에서 설명한바와 같이 ZooKeeper가 필요하다. Zookeeper를 다운로드 받은 후에, ~/conf/zoo_sample.cfg 파일을 ~/conf/zoo.cfg로 복사한다.

다음으로 ZooKeeper를 실행한다.

% $ZooKeeper_HOME/bin/zkServer.cmd


<그림. 주키퍼 기동 로드>


다음으로 Nimbus 노드를 실행해보자. Storm을 다운 받은 후 압축을 푼다.

다음 $APACHE_STORM/bin 디렉토리에서

%storm nimbus

를 실행하면 nimbus 노드가 실행된다. 실행 결과나 에러는 $APACHE_STORM/logs 디렉토리에 nimbus.log라는 파일로 기록이 된다.

정상적으로 nimbus 노드가 기동이 되었으면 이번에는 supervisor 노드를 기동한다.

%storm supervisor

Supervisor에 대한 노드는 $APACHE_STORM/logs/supervisor.log 라는 이름으로 기록된다.

Storm은 자체적으로 클러서터를 모니터링 할 수 있는 UI 콘솔을 가지고 있다. UI를 기동하기 위해서는

%storm ui

로 실행을 해주면 UI가 기동이 되며 http://localhost:8080 에 접속을 하면 관리 콘솔을 통해서 현재 storm의 작동 상태를 볼 수 있다.



<그림. Storm UI를 이용한 기동 상태 모니터링>


현재 하나의 PC nimbus supervior,UI를 모두 배포하였기 때문에 다음과 같은 물리적인 토폴로지가 된다.



<그림. 싱글 서버에 nimbus supervisor를 같이 설치한 예>

싱글 클러스터 노드에 배포 하기

싱글 노드 클러스터를 구축했으니, 앞의 1장에서 만든 HelloStorm 토폴로지를 이 클러스터에 배포해보도록 하자.

전장의 예제에서 만든 토폴로지는 Local Cluster를 생성해서, 자체적으로 개발 테스트용 클러스터에 토폴로지를 배포하도록 하는 코드였다면, 이번에는 앞에서 생성한 Storm 클러스터에 배포할 수 있는 토폴로지를 다시 만들어야 한다. 이 코드의 차이는 기존 코드와는 다르게 LocalCluster를 생성하지 않고, 기동중인 클러스터에 HelloTopoloy Submit하도록 한다.

package com.terry.storm.hellostorm;

 

import backtype.storm.Config;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.topology.TopologyBuilder;

 

public class HelloTopology {

        public static void main(String args[]){

               TopologyBuilder builder = new TopologyBuilder();

               builder.setSpout("HelloSpout", new HelloSpout(),2);

               builder.setBolt("HelloBolt", new HelloBolt(),4).shuffleGrouping("HelloSpout");

              

               Config conf = new Config();

               // Submit topology to cluster

               try{

                       StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

               }catch(AlreadyAliveException ae){

                       System.out.println(ae);

               }catch(InvalidTopologyException ie){

                       System.out.println(ie);

               }

              

        }

 

}

<그림. HelloTopology.java>


토폴로지 클래스를 만들었으면 이를 빌드해보자

%mvn clean install

을 실행하면 ~/target 디렉토리에 토폴로지 jar가 생성된것을 확인할 수 있다.



jar 파일을 배포해보도록 하자.

배포는 storm {jar} {jar파일명} {토폴로지 클래스명} {토폴로지 이름} 명령을 실행하면 된다.

% storm jar hellostorm-0.0.1-SNAPSHOT.jar com.terry.storm.hellostorm.HelloTopology HelloTopology

배포 명령을 내리면 $APACHE_STORM_HOME/logs/nimbus.log에 다음과 같이 HelloTopology가 배포되는 것을 확인할 수 있다.


2015-01-25T07:35:03.352+0900 b.s.d.nimbus [INFO] Uploading file from client to storm-local\nimbus\inbox/stormjar-8c25c678-23f5-436c-b64e-b354da9a3746.jar

2015-01-25T07:35:03.365+0900 b.s.d.nimbus [INFO] Finished uploading file from client: storm-local\nimbus\inbox/stormjar-8c25c678-23f5-436c-b64e-b354da9a3746.jar

2015-01-25T07:35:03.443+0900 b.s.d.nimbus [INFO] Received topology submission for HelloTopology with conf {"topology.max.task.parallelism" nil, "topology.acker.executors" nil, "topology.kryo.register" nil, "topology.kryo.decorators" (), "topology.name" "HelloTopology", "storm.id" "HelloTopology-1-1422138903"}

2015-01-25T07:35:03.507+0900 b.s.d.nimbus [INFO] Activating HelloTopology: HelloTopology-1-1422138903

2015-01-25T07:35:03.606+0900 b.s.s.EvenScheduler [INFO] Available slots: (["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6702] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6701] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6700])

2015-01-25T07:35:03.652+0900 b.s.d.nimbus [INFO] Setting new assignment for topology id HelloTopology-1-1422138903: #backtype.storm.daemon.common.Assignment{:master-code-dir "storm-local\\nimbus\\stormdist\\HelloTopology-1-1422138903", :node->host {"226ceb74-c1a3-4b1a-aab5-2384e68124c5" "terry-PC"}, :executor->node+port {[3 3] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [6 6] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [5 5] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [4 4] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [7 7] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [2 2] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [1 1] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703]}, :executor->start-time-secs {[7 7] 1422138903, [6 6] 1422138903, [5 5] 1422138903, [4 4] 1422138903, [3 3] 1422138903, [2 2] 1422138903, [1 1] 1422138903}}

2015-01-25T07:37:48.901+0900 b.s.d.nimbus [INFO] Updated HelloTopology-1-1422138903 with status {:type :inactive}

해당 토폴로지가 배포되었는지를 확인하려면 storm list라는 명령어를 사용하면 현재 기동되고 있는 토폴로지 목록을 확인할 수 있다.




<그림. storm list 명령으로 기동중인 토폴로지를 확인>


실행 결과는 $APACHE_STORM_HOME/logs 디렉토리를 보면 worker-xxx.logs 라는 파일이 생긴것을 확인해 볼 수 있는데, 파일 내용을 보면 다음과 같다.

2015-01-25T07:35:08.908+0900 STDIO [INFO] Tuple value ishello world

2015-01-25T07:35:08.908+0900 STDIO [INFO] Tuple value ishello world

우리가 앞서 구현한 Bolt에서 System.out으로 출력한 내용이 출력된다.

동작을 확인하였으면, 이제 기동중인 토폴로지를 정지 시켜 보자. 토폴로지의 정지는 storm deactivate {토폴로지명} 을 사용하면 된다. 아까 배포한 토폴로지 명이 HelloTopology였기 때문에 다음과 같이 토폴로지를 정지 시킨다.

%storm deactivate HelloTopology

그 후에 다시 storm list 명령을 이용해서 토폴로지 동작 상태를 확인해보면 다음과 같다.



< 그림. Storm  토폴로지 정지와 확인 >


만약에 Topology를 재 배포 하려면 storm kill로 해당 토폴로지를 삭제한 후에, 다시 배포 해야 한다. 이때 주의할점은 storm kill로 삭제해도 바로 삭제가 안되고 시간 텀이 있으니 약간 시간을 두고 재 배포를 해야 한다.


지금까지 간단하게, 운영용 클러스터를 구성하고, 운영 클러스터에 토폴로지를 배포 하는 것에 대해서 알아보았다.

HelloStorm 코드 구현, 클러스터 노드 구축 및 배포를 통해서 간단하게 스톰이 무엇을 하는 것인지는 파악했을 것으로 안다. 다음 장에는 조금 더 구체적으로 Storm의 개념과 스톰의 아키텍쳐등에 대해서 살펴보도록 하겠다.