블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 

데이타 플로우 프로그래밍 모델의 이해


조대협 (http://bcho.tistory.com)


앞의 글에서 스트리밍 프로세스의 개념과, 데이타 플로우의 스트리밍 처리 개념에 대해서 알아보았다. 그렇다면 실제로 이를 데이타 플로우를 이용해서 구현을 하기 위해서는 어떤 컴포넌트와 프로그래밍 모델을 사용하는지에 대해서 알아보자.


구글 데이타 플로우 프로그래밍 모델은 앞에서 설명한 바와 같이, 전체 데이타 파이프라인을 정의하는 Pipeline, 데이타를 저장하는 PCollections, 데이타를 외부 저장소에서 부터 읽거나 쓰는 Pipeline I/O, 그리고, 입력 데이타를 가공해서 출력해주는 Transforms , 총 4가지 컴포넌트로 구성이 되어 있다.


이번 글에서는 그 중에서 데이타를 가공하는  Transforms 컴포넌트들에 대해서 알아본다.

Transforms

Transforms는 데이타를 어떻게 가공하느냐에 따라서 다음 그림과 같이 세가지 형태로 분류된다.

Element-Wise

개별 데이타 엘리먼트를 단위로 연산을 수행하는 것을 Element-Wise Transform이라고 한다.

ParDo 함수가 이에 해당하며, 하나의 데이타를 입력 받아서, 연산을 한 후, 하나의 출력을 내보내는 연산이다.

ParDo 함수는 DoFn으로 정의된 연산 로직을 병렬로 여러개의 프로세스와 쓰레드에서 나눠서 처리를 해준다.


DoFn 클래스 는 다음과 같은 포맷으로 정의한다.


static class MyFunction extends DoFn<{입력 데이타 타입},{출력 데이타 타입}>{

     @Override

     public void processElement(ProcessContext c) {

       // do Something

     }

}


DoFn 클래스를 상속 받아서 클래스를 정의하고, 클래스 정의시 제네릭 타입 (Generic type)으로, 첫번째는 입력 데이타 타입을 두번째는 출력 데이타 타입을 정의한다.

그리고 processElement 함수를 오버라이드(Override)해서, 이 함수안에 연산 로직을 구현한다.

processElement는 ProcessContext를 인자로 받는데, 이 Context에는 입력 데이타를 포함하고 있다.

입력 데이타를 꺼내려면 c.element()라는 메서드를 이용하면, 입력 데이타 타입으로 정의된 입력 데이타를 꺼낼 수 있다.

데이타를 처리한 다음에 파이프라인상의 다음 컴포넌트로 데이타를 보내려면 c.output ({출력 데이타} ) 형태로 정의를 해주면 된다.


이렇게 정의된 DoFn함수는  ParDo를 이용하여 파이프라인 상에서 병렬로 실행될 수 있다. ParDo는, 파이프라인상에서 .apply 메서드를 이용하여 적용한다.


그러면 실제로 어떻게 적용이 되는지 다음 예제를 보자.

   p.apply(ParDo.named("toUpper").of(new DoFn<String, String>() {

     @Override

     public void processElement(ProcessContext c) {

       c.output(c.element().toUpperCase());

     }

   }))


String 인자를 입력으로 받아서, String 인자로 출력을 하는 DoFn 함수를 정의하였다.

processElement 부분에서, c.element()로 String 입력 값을 읽은 후, toUpperCase() 함수를 적용하여 대문자로 변환을 한 후, c.output을 이용하여 다음 파이프라인으로 출력 데이타를 넘겼다.


조금 더 디테일한 예제를 보자

p.apply(Create.of("key1,Hello", "key2,World","key1,hello","key3,boy","key4,hello","key2,girl"))

.apply(ParDo.named("Parse").of(new DoFn<String, KV<String,String>>() {

@Override

public void processElement(ProcessContext c) {

StringTokenizer st = new StringTokenizer(c.element(),",");

String key = st.nextToken();

String value = st.nextToken();


KV<String,String> outputValue =  KV.of(key,value);

c.output(outputValue);

}

}))

Create.of를 이용하여 “Key.Value” 문자열 형태로 데이타를 생성한 후, 이 문자열을 읽어서, DoFn<String,KV<String,String>> 클래스에서 이를 파싱하여, 문자열을 키밸류 데이타형인 KV 데이타 형으로 변환하여 리턴하는 예제이다. 아래 개념도를 보자


입력 값은 String 데이타 타입으로 “Key.Value”라는 형태의 문자열을 받는다.

DoFn에서 처리한 출력 값은 KV 형으로 KV 데이타 타입은 키와 값을 가지고 있는데, 키와 값의 타입도 제네릭 타입으로 키는 String, 값은 String 타입으로 정의하였다. 입력된 “Key.Value” 문자열은 “.” 전후로 분리가 되서, “.” 좌측은 키로, 우측은 값으로 해서, KV에 각각 들어간다.

processElement를 보면, c.element를 이용하여 String 문자열을 꺼낸 후, StringTokenizer를 이용하여 “.”을 분류 문자로 정의해서 파싱한다. 첫번째를 키로 저장하고, 두번째를 값으로 저장한다.

KV<String,String> outputValue =  KV.of(key,value)

를 이용하여, KV 객체를 생성한 후, c.output(outputValue); 을 이용하여 다음 파이프라인 컴포넌트로 값을 전달한다.


시스템내에서 수행되는 방법은 다음과 같이 된다. ParDo에 의해서 DoFn 클래스가 여러개의 워커에 의해서 분산 처리가된다.

어그리게이션(Aggregation)

어그리게이션 값을 특정 키 값을 이용하여 데이타를 그룹핑을 하는 개념이다.

이러한 어그리게이션 작업은 내부적으로 셔플링(Shuffling)이라는 개념을 이용해서 이루어지는데, 키별로 데이타를 모으거나 키별로 합산등의 계산을 하기 위해서는, 키별로 데이타를 모아서 특정 워커 프로세스로 보내야 한다.

ParDo를 이용하여 병렬 처리를 할 경우, 데이타가 키값에 상관 없이 여러 워커에 걸쳐서 분산되서 처리되기 때문에, 이를 재 정렬해야 하는데, 이 재 정렬 작업을 셔플링이라고 한다.


아래 그림을 보자, 파이프라인 상에서 첫번째 프로세스를 Worker 1과 Worker 2가 처리를 하였고, 결과는 Key1과  Key2를 키로 가지는 데이타라고 하자, 이를 어그리게이션 하면 아래 그림과 같이 Key1 데이타는 Worker 3로 모이고, Key 2 데이타는 Worker 4로 모인다. 이런 방식으로 셔플링이 이루어진다.


데이타 플로우의 어그리게이션 에는 특정 키별로 데이타를 묶어주는 Grouping과, 특정 키별로 데이타를 연산(합이나 평균을 내는)하는  Combine 두 가지가 있다.

Grouping

PCollection<KV<String, Integer>> wordsAndLines = ...;

에서 다음과 같이 String,Integer 페어로 KV 타입을 리턴한다고 하자.


 cat, 1
 dog, 5
 and, 1
 jump, 3
 tree, 2
 cat, 5
 dog, 2
 and, 2
 cat, 9
 and, 6
 ...


이를 다음과 같이 키에 따라서 밸류들을 그룹핑 하려면 GroupByKey를 사용해야 한다.


 cat, [1,5,9]
 dog, [5,2]
 and, [1,2,6]
 jump, [3]
 tree, [2]


PCollection<KV<String, Iterable<Integer>>> groupedWords = wordsAndLines.apply(

   GroupByKey.<String, Integer>create());


코드는 앞 단계에서 KV<String,Integer>로 들어온 wordLines 데이타를 GroupByKey를 이용하여 Key 단위로 그룹핑을 한후 이를 <String, Iterable<Integer>> 타입으로 리턴하는 Transformation이다.

<String, Iterable<Integer>>에서 앞의 String은 키가 되며, Iterable 다수의 값을 가지고 있는 밸류가 된다. 이 밸류는 Integer 타입으로 정의된다.

Combine

Grouping이 키 단위로 데이타를 묶어서 분류해주는 기능이라면, Combine은 키단위로 데이타를 묶어서 연산을 해주는 기능이다.

예를 들어 앞의 예제처럼, “cat”이라는 문자열 키로 된 데이타들이 [1,5,9] 가 있을때, 이에 대한 총합이나 평균등을 내는 것이 Combine 이다.


PCollection<Integer> pc = ...;

 PCollection<Integer> sum = pc.apply(

   Combine.globally(new Sum.SumIntegerFn()));


코드는 Integer로 들어오는 모든 값을 Combine에서 Sum 기능을 이용하여 모든 값을 더하는 코드이다.

전체 데이타에 대해서 적용하기 때문에, Combine.globally로 적용하였다.


아래와 같은 형태의 데이타가 있다고 가정하자. 키에 따라서 값이 그룹핑이 된 형태이다.

 cat, [1,5,9]
 dog, [5,2]
 and, [1,2,6]
 jump, [3]
 tree, [2]


PCollection<KV<String, Integer>> occurrences = ...;

 PCollection<KV<String, Iterable<Integer>>> grouped = pc.apply(GroupByKey.create());

 PCollection<KV<String, Integer>> firstOccurrences = pc.apply(

   Combine.groupedValues(new Min.MinIntegerFn()));


위의 데이타들이 PCollection<KV<String, Iterable<Integer>>> grouped

에 저장되었다고 할때, 각 키별로 최소값을 구하는 것을 Combine.groupedValue에서 Min을 호출하여 최소값을 구했다.


Transforms 컴포넌트의 기본적인 종류들을 알아보았다. 이외에도, 하나의 Transform 안에 여러개의 Transform을 집어 넣는 Composite Transform이나, 두개 이상의 데이타 스트림에서 데이타를 키에 따라 JOIN하는 기능들이 있는데, 이러한 고급 기능들은 뒤에 고급 프로그래밍 모델에서 설명하기로 한다.

PCollection

PCollection은 데이타 플로우 파이프라인 내에서 데이타를 저장하는 개념이다.

데이타는 이 PCollection 객체에 저장이되서, 파이프라인을 통해서 Transform으로 넘겨진다.

PCollection은 한번 생성이 되면, 그 데이타를 수정이 불가능하다. (데이타를 변경하거나 수정하기 위해서는 PCollection을 새로 생성해야 한다.)

Bounded & Unbounded PCollection

PCollection은 데이타의 종류에 따라 Bounded PCollection과 Unbounded PCollection 두가지로 나뉘어 진다.

Bounded PCollection

Bounded PCollection은 배치 처리 처럼, 데이타가 변화하지 않는 데이타로 파일이나, 업데이트가 더 이상 발생하지 않는 테이블등을 생각하면 된다.

TextIO,BigQueryIO,DataStoreIO등을 이용해서 데이타를 읽은 경우에는 Bounded PCollection으로 처리가 된다.

Bounded PCollection 데이타들은 배치 처리의 특성상 데이타를 한꺼번에 읽어서 처리한다.  

Unbounded PCollection

Unbounded PCollection은, 데이타가 계속 증가 하는 즉 흐르는 데이타를 표현한다. 트위터 타임 라인이나, 스마트 폰에서 서버로 올라오는 이벤트 로그등을 예로 들 수 있다.

이러한 Unbounded PCollection은 시간의 개념을 가지고 윈도우 처리등을 해야하기 때문에, PCollection 객체내에 타임 스탬프가 자동으로 붙는다.

UnBounded PCollection은 데이타를 BigQueryIO또는 Pub/Sub에서 읽을 때 생성된다.

특히 Unbounded PCollection에 Grouping이나, Combine등을 사용할 경우, 데이타가 파이프라인 상에서 언제 그룹핑된 데이타를 다음 Transform 컴포넌트로 넘겨야할지를 정의해야 하기 때문에, Window를 반드시 사용해야 한다.

데이타 타입

PCollection을 이용해서 정의할 수 있는 주요 데이타 타입은 다음과 같다.

일반 데이타 타입

PCollection<Integer> data

가장 기초적인 데이타 형으로, Integer,Float,String 등 자바의 일반 데이타 타입으로 정의되고 하나의 데이타 만을 저장한다.

KV 데이타 타입

PCollection< KV<String,Integer> key_value_data

키 밸류 데이타 타입으로, 키와 값으로 구성된다. 키와 값은 일반 데이타 타입과 같게, 자바의 일반 데이타 타입 사용이 가능하다.


PCollection<KV<String, Iterable<Integer>>>

키 밸류 데이타 타입중에 값에 여러개의 값을 넣을 수 있는 Iterable 타입이 있다.

앞의 Transform 예제에서 언급된것과 같이 키가 cat이고, 그 값이 2,6,7,9 와 같이 여러 값을 가지는 형이 이러한 타입에 해당한다.

커스텀 데이타 타입

단순한 데이타 타입 말고도, 복잡한 데이타 형을 처리하기 위해서  커스텀 데이타 타입을 지원한다.

커스텀 데이타 타입은 오픈 소스 Avro의 데이타 모델을 사용한다. http://avro.apache.org/


예를 들어 어떤 키에 대해서 카운트값,평균,총합 그리고 윈도우의 시작과 끝 시간을 저장하는 데이타 타입 Stat가 있다고 가정하자. 이 데이타 타입은 다음과 같이 정의된다.

자바에서 일반적인 Value Object 형태로 정의를 하면되고, 단 앞에 어노테이션으로 @DefaultCoder(AvroCoder.class) 와 같이 Avro 데이타 클래스임을 정의하면 된다.


@DefaultCoder(AvroCoder.class)

static class Stat{

Float sum;

Float avg;

Integer count;

Integer key;

Instant wStart; // windowStartTime

Instant wEnd; // windowEndTime

public Instant getwStart() {

return wStart;

}

public Instant getwEnd() {

return wEnd;

}


public Float getSum() {

return sum;

}

public Float getAvg() {

return avg;

}

public Integer getCount() {

return count;

}


public Integer getKey(){

return key;

}


public Stat(){}

public Stat(Integer k,Instant start,Instant end,Integer c,Float s,Float a){

this.key = k;

this.count = c;

this.sum = s;

this.avg = a;

this.wStart = start;

this.wEnd = end;

}


}



윈도우

스트리밍 데이타 처리를 할때 가장 중요한 개념이 윈도우이다.

특히나  Unbounded 데이타를 이용한 스트리밍 처리에서 Grouping이나 Combine 시에는 반드시 윈도우를 사용해야 한다.Grouping이나 Combine과 같은 aggregation을 하지 않으면, Unbounded 데이타라도 윈도우 처리가 필요없다.

또한 Bounded 데이타도, 데이타에 타임 스탬프를 추가하면 윈도우를 사용하여, 시간대별 데이타를 처리할 수 있다.

예를 들어 일일 배치를 돌리는 구매 로그가 있을때, 각 데이타의 구매 시간이 있으면, 이 구매시간을 타임 스탬프로 지정하여 배치라도 윈도우 단위로 연산을 할 수 있다.

고정 윈도우 적용

윈도우를 적용하는 방법은 정말 간단하다.

PCollection 객체에 apply 메소드를 이용하여 Window.into 메서드를 이용하여 적용하면된다.

예를 들어서 아래와 같이 PCollection<String> 형 데이타인 items 객체가 있을때, 여기에 윈도우를 적용하려면 다음과 같이 하면 된다.

 PCollection<String> items = ...;

 PCollection<String> fixed_windowed_items = items.apply(

   Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES)));

items.apply를 이용하여 윈도우를 적용하는데, 데이타 타입이 String이기 때문에, Window.<String>into 로 윈도우를 적용하였다.

윈도우 타입은 Fixed 윈도우이기 때문에, FixedWindows.of를 사용하였고, 윈도우 주기는 1분주기라서 1,TimeUnit.MINUTES를 적용하였다.

슬라이딩  윈도우 적용

슬라이딩 윈도우는 윈도우의 크기 (Duration)과 주기를 인자로 넘겨야 한다.

아래 코드는 5초 주기로 30분 크기의 윈도우를 생성하는 예제이다.

 PCollection<String> items = ...;

 PCollection<String> sliding_windowed_items = items.apply(    Window.<String>into(SlidingWindows.of(Duration.standardMinutes(30)).every(Duration.standardSeconds(5))));

윈도우가 5초마다 생성이되고, 각 윈도우는 30분 단위의 크기를 가지고 있게 된다.

세션 윈도우 적용

세션 윈도우는 HTTP 세션 처럼 특정 사용자가 일정 시간동안 데이타가 올라오지 않으면, 처음 데이타가 올라온 시간 부터 데이타가 올라오지 않은 시간 까지를 윈도우르 묶어주는 개념이다.

앞의 고정 윈도우나, 세션 윈도우와는 다르게 반드시 키별로 세션을 묶기 때문에 키가 필요하다.


아래 예제는 각 사용자 별로 세션당 점수를 계산해주는 예제이다.

userEvents

 .apply(Window.named("WindowIntoSessions")

       .<KV<String, Integer>>into(

             Sessions.withGapDuration(Duration.standardMinutes(Duration.standardMinutes(10))))

   .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))

 // For this use, we care only about the existence of the session, not any particular

 // information aggregated over it, so the following is an efficient way to do that.

 .apply(Combine.perKey(x -> 0))

 // Get the duration per session.

 .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))



다른 윈도우들과 마찬가지로 Window.into를 이용하여 윈도우를 적용하는데, 데이타 형을 잘 보면 <KV<String, Integer>> 형으로 정의된것을 확인할 수 있다.

Sessions.withGapDuration으로 세션 윈도우를 정의한다. 이때 얼마간의 시간 동안 데이타가 없으면 세션으로 짜를지를 지정해줘야 하는데, Duration.standardMinutes(10) 를 이용하여 10분간 해당 사용자에 대해서 데이타가 없으면 해당 사용자(키)에 대해서 세션 윈도우를 자르도록 하였다.

윈도우 시간 조회하기

윈도우를 사용하다보면, 이 윈도우의 시작과 종료 시간이 필요할때가 있다. 예를 들어 고정 크기 윈도우를 적용한다음에, 이를 데이타 베이스등에 저장할때, 이 윈도우가 언제시작해서 언제끝난 윈도우인지를 조회하여 윈도우 시작 시간과 함께 값을 저장하고자 하는 케이스이다. “1시에 몇건, 1시 10분에 몇건의 데이타가 저장되었다”와 같은 시나리오이다.


현재 윈도우에 대한 정보를 얻으려면 DoFn 클래스를 구현할때, com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess 인터페이스를 implement 해야, 윈도우에 대한 정보를 억세스 할 수 있다.


static class StaticsDoFn extends DoFn<KV<Integer,Iterable<Twit>>, KV<Integer,Stat>>

implements com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess

{

@Override

public void processElement(ProcessContext c) {

:


IntervalWindow w = (IntervalWindow) c.window();

Instant s = w.start();

Instant e = w.end();

DateTime sTime = s.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTime eTime = e.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTimeFormatter dtf = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss");

String str_stime = sTime.toString(dtf);

String str_etime = eTime.toString(dtf);

                                      :


윈도우에 대한 정보는 ProcessContext c에서, c.window()를 호출하면, IntervalWindow라는 클래스로 윈도우에 대한 정보를 보내주고, 윈도우의 시작 시간과 종료 시간은 IntervalWindow 클래스의 start()와 end() 메서드를 이용해서 조회할 수 있다. 이 조회된 윈도우의 시작과 종료 시간은 org.joda.time.Instant 타입으로 리턴되는데, 조금 더 친숙한 DateTime 포맷으로 변환을 하려면, Instant.toDate() 메서드를 사용하면 되고, 이때, TimeZone 을 지정해주면 로컬 타임으로 변환을 하여, 윈도우의 시작과 종료시간을 조회할 수 있다.

타임 스탬프

윈도우는 시간을 기준으로 처리를 하기 때문에, 이 시간을 정의하는 타임스탬프를 어떻게 다루는지를 이해할 필요가 있다.

타임 스탬프 생성

Pub/Sub을 이용하여 unbounded 데이타를 읽을 경우 타임스탬프가 Pub/Sub에 데이타가 들어간 시간을 Event time으로 하여, PCollection에 추가된다.

타임 스탬프 지정하기

Pub/Sub에서와 같이 자동으로 타임 스템프를 부여하는게 아니라, 모바일 디바이스에서 발생한 이벤트 타임이나, 애플리케이션 적으로 PCollection에 직접 타임스탬프를 부여할 수 있다.

PCollection에 타임 스탬프를 부여 하는 방법은 간단하다.

DoFn내에서,  ProcessContext를 다음 파이프라인 컴포넌트로 보낼때,  c.output(데이타) 대신, c.output(데이타, 타임 스탬프)를 사용하면 된다. 타임 스탬프의 데이타 타입은  org.joda.time.Instant 를 사용한다.

예를 들어서

c.outputWithTimestamp(c.element(), logTimeStamp);

와 같은 방법으로 사용을 한다.


모바일 서비스 분석등, 실제 시간에 근접한 분석을 하려면, 로그를 수집할때, 이벤트 발생 시간을 같이 REST API등을 통해서 수집하고, outputWithTimestamp를 이용하여, 수집된 이벤트 발생시간을  PCollection에 추가하는 방식으로 타임 스탬프를 반영 하는 것이 좋다.







데이타 스트리밍 분석 플랫폼 Dataflow 개념 잡기 #1/2


조대협 (http://bcho.tistory.com)


실시간 데이타 처리에서는 들어오는 데이타를 바로 읽어서 처리 하는 스트리밍 프레임웍이 대세인데, 대표적인 프레임웍으로는 Aapche Spark등을 들 수 있다. 구글의 DataFlow는 구글 내부의 스트리밍 프레임웍을 Apache Beam이라는 형태의 오픈소스로 공개하고 이를 실행하기 위한 런타임을 구글 클라우드의 DataFlow라는 이름으로 제공하고 있는 서비스이다.


스트리밍 프레임웍 중에서 Apache Spark 보다 한 단계 앞선 개념을 가지고 있는 다음 세대의 스트리밍 프레임웍으로 생각할 수 있다. Apache Flink 역시 유사한 개념을 가지면서 Apache Spark의 다음 세대로 소개 되는데, 이번글에서는 이 DataFlow에 대한 전체적인 개념과 프로그래밍 모델등에 대해서 설명하고자 한다. 스트리밍 데이타 처리에 대한 개념은 http://bcho.tistory.com/1119 글을 참고하기 바란다.

개념 소개

dataflow에 대해서 이해하기 위해서 프로그래밍 모델을 먼저 이해해야 하는데, dataflow의 프로그래밍 모델은 얼마전에 Apache에 Beam이라는 오픈 소스 프로젝트로 기증 되었다. Apache Spark이나, Apache Flink와 유사한 스트리밍 처리 프레임웍이라고 생각하면 된다. dataflow는 이 Apache beam의 프로그래밍 모델을 실행할 수 있는 런타임 엔진이라고 생각하면 된다. 예를 들어 Apache beam으로 짠 코드를 Servlet이나 Spring 코드라고 생각하면, dataflow는 이를 실행하기 위한 Tomcat,Jetty,JBoss와 같은 런타임의 개념이다.


먼저 dataflow의 개념을 이해해보도록 하자. 아래 그림은 dataflow에 대한 컨셉이다.


데이타가 들어오면, Pipeline IO에서 데이타를 읽어드린다. 읽어드린 데이타는 PCollection이라는 데이타 형으로 생성이 되고, 이 PCollection 데이타는 여러개의 중첩된 PTransform을 통해서 변환 및 가공이 된다. 가공이 끝난 결과는 마지막으로 Pipeline IO의 Output을 통해서 데이타 저장소 (빅쿼리나 파일등)에 저장이 된다.  이 Pipeline IO에서 부터 PTransform을 걸친 일련의 프로세싱 과정을 Pipeline이라고 한다.


예를 들어 설명해보자, 문자열을 입력 받은 후에, 문자열에서 단어를 추출하여, 각 단어의 개수를 세어 주는 파이프라인이 있다고 하자.


첫번째 실행에서 “Hello my daddy”라는 문자열이 입력되었다. 첫번째 Transform인 Extract words Transform을 거치면서, “Hello my daddy” 라는 문자열은 “Hello”, “my”, “daddy” 라는 각각의 단어로 쪼게진다. 다음으로 Count Element 라는 Transform에 의해서, 각 단어의 수를 세어서 저장한다. “Hello”는 1번, “my”는 1번, “daddy”는 1번 의 값이 저장된다.


두번째 실행에서 “Hello my bro” 라는 문자열이 들어오면, Extract words 에 의해서 “Hello”, “my”, “bro”라는 각각의 단어로 쪼게지고, Count Element Transform에서 이전에 세어놓은 단어의 수와 합산하여 계산이 된 결과가 저장이 된다. “Hello”는 이전에 한번 카운트가 되었고 이번에도 들어왔기 때문에, 2가 되고, 같은 원리로 “my”라는 단어의 카운트도 2가된다. “bro” 라는 단어는 이번에 처음 들어왔기 때문에 새 값으로 1로 저장된다.




세번째 “Hello my mom” 이라는 문자열이 들어오면 앞의 두개의 문자열과 마찬가지로 간 단어로 쪼게진 다음 Count Element에 의해서 각 단어의 수가 카운트되어 기존의 값과 누적 합산된다. 모든 데이타를 다 읽어서 처리가 끝나면, 저장된 결과를 Pipeline IO를 통해서 파일에 그 결과를 쓰게 된다.

배치와 스트리밍 처리

dataflow는 위에서 설명한 파이프라인의 개념을 배치와 스트리밍 처리 두가지 개념 모두로 지원해서 처리가 가능하다. 데이타가 파일과 같이 이미 쓰여지고 더 이상 증가나 수정이 되지 않은 데이타에 대해서는 일괄로 데이타를 읽어서 결과를 내는 배치 처리가 가능하고, 계속해서 들어오고 있는 데이타 (트위터 피드, 로그 데이타)는 스트리밍으로 처리가 가능하다.

윈도우의 개념

배치 처리야, 데이타 처리가 모두 끝난 후에 결과를 내보낸다고 하지만, 그렇다면 스트리밍 데이타는 계속해서 데이타가 들어오고 있는데, 언제 결과를 내보내야 할까?

개별 데이타를 변환해서 저장하는 경우에야, 개별 데이타 처리가 끝난후에 각각 하나씩 저장한다고 하지만, 위와 같이 들어오는 데이타에서 특정데이타 들에 대한 합이나 평균과 같은 처리를 하는 경우 어느 기간 단위로 해야 할까? 스트리밍 처리에서는 이러한 개념을 다루기 위해서 윈도우라는 개념을 사용한다.


예를 들어, “1시~1시10분까지 들어온 문자열에 대해서 문자열에 들어 있는 각 단어의 수를 카운트해서 출력해주는 기능" 이나, 또는 “매 5분 단위로 현재 시간에서 10분전까지 들어온 문자열에 대해서 각 단어의 수를 카운트 해서 출력 해주는 기능" 과 같이 작은 시간 기간의 단위를 가지고 그 기간 단위로 계산 하는 방법이며, 이 시간 단위를 윈도우(Window)라고 한다.


Fixed Window (고정 크기 윈도우)

앞의 예에서 1시~1시10분, 1시10분~1시20분 과 같이 고정된 크기를 가지는 윈도우의 개념을 Fixed Window라고 한다.


Sliding Window (슬라이딩 윈도우)

앞의 예에서와 같이 윈도우가 상대적인 시간 (이전 10분까지)의 개념을 가지면서, 다른 윈도우와 중첩되는 윈도우를 슬라이딩 윈도우라고 한다.


그림과 같이 1시10분의 윈도우는 1시 10분의 10분전인 1시에서 부터, 현재 시간 까지인 1시10분까지 값을 읽어서 처리하고 윈도우가 끝나는 시점인 1:10분에 그 값을 저장한다. 윈도우의 간격은 5분 단위로, 1시 15분에는 1시 15분의 10분전인 1시05분 부터 현재 시간인 1시15분까지 들어온 데이타에 대해서 처리를 하고 그 결과 값을 1시15분에 저장한다.

Session window (세션 윈도우)

다음은 세션 윈도우라는 개념을 가지고 있는데, 이를 이해하기 위해서는 먼저 세션의 개념을 먼저 이해해야 한다.

세션이랑 사용자가 한번 시스템을 사용한 후, 사용이 끝날때 까지의 기간을 정의한다. 스트리밍 시스템에서는 사용자 로그인이나 로그 아웃을 별도의 이벤트로 잡는 것이 아니기 때문에, 데이타가 들어온 후에, 일정 시간 이후에 그 사용자에 대한 데이타가 들어오지 않으면, 세션이 종료 된것으로 판단한다.

일반 적인 웹 프로그램에서 HttpSession과 같은 원리인데, 웹 사이트에 접속한 후, Session time out 시간이 지날때 까지 사용자가 별도의 request를 보내지 않으면 세션을 끊는 것과 같은 원리이다.

아래 그림은 세션 윈도우의 개념을 설명하기 위한 윈도우인데, User A와 User B의 데이타가 들어오고 있다고 하자.


그리고 세션 타임 아웃이 10분으로 정의했다. 즉 같은 사용자에 대해서 데이타가 들어온 후, 10분 내에 추가 데이타가 들어오지 않으면 세션이 종료 된것으로 판단한다.


User A는 1:00 에 첫 데이타가 들어와서1:00~1:10 사이에 두번째 데이타가 들어왔고, 1:10~1:20 사이에 세번째 데이타가 들어온 후, 네번째 데이타는 10분이 지난 후에 들어왔다. 그래서 1:00~1:20 까지가 하나의 세션이 되고, 이것이 User A에 대한 1:00~1:20의 세션 윈도우가 된다. 네번째 데이타 부터는 새로운 윈도우로 처리가 되는데, 1:40~1:50 사이에 다섯번째 데이타가 도착한후, 그 이후로 도착하지 않았기 때문에 이게 두번째 윈도우가 되고, 1:30~1:50의 시간 간격을 가지는 User A의 두번째 윈도우가 된다.

각 윈도우의 값은 User A의 1:00~1:20 윈도우의 값은 (1+1+1)로 3이 되고, 두번째 윈도우인 1:30~1:50 윈도우는 (2.5+1)로 3.5가 된다.


User B는 1:10에 데이타가 들어오고, 10분 후인 1:20까지 데이타가 들어오지 않고 그 이후 1:30 분에 두번째 데이타가 들어왔기 때문에, 1:10~1:10 길이의 첫번째 세션 윈도우가 생성된다. 다음 으로 1:30분에 데이타가 들어왔기 때문에 두번째 세션 윈도우를 생성하고, 2:00까지 계속 데이타가 들어오다가 멈추고 2:10까지 새로운 데이타가 들어오지 않았기 때문에 1:30~2:00 까지 두번째 윈도우로 취급한다.


이 Session Window는 앞서 언급한 Fixed Window나, Sliding Window와는 다르게, User A, User B와 사용자 단위와 같이 어떤 키에 따라서 개별적으로 윈도우를 처리 한다.  즉 Session Window는 User A나 USer B처럼 특정 키에 종속된 윈도우만을 갖는다.


반대로 Fixed Window나 Sliding Window는 키단위의 윈도우가 아니라 그 시간 범위내에 들어 있는 모든 키에 대한 값을 처리한다..

Fixed Window의 경우에는 30분 사이즈를 갖는 윈도우라고 하면 아래 그림과 같이


1:00~1:30 윈도우는 User A의 값 = (1+1+1) 과 User B의 값 1을 합쳐서 총 4가 되고

1:30~2:00 윈도우는 User A값 = (2.5+1)과 User B의 값 = (2+2+2) 를 합쳐서 9.5가 된다.


Sliding Window의 경우에는 길이가 30분이고, 주기가 20분인 Sliding 윈도우라고 할때,


1:00~1:30, 1:20~1:50, 1:40~2:00 3개의 Sliding 윈도우가 생성된다.

1:00~1:30 윈도우는 User A의 값=(1+1+1)과 User B의 값 1을 합산하여 4가 되고

1:20~1:50 윈도우는 User A의 값 = (2.5+1)과 User B의 값 =(2+2)를 합산하여 7.5가 된다.

1:40~2:00 윈도우는 User A의 값 = (2.5+1)과 User B의 값 (2+2)를 합산하여 7.5가 된다.




구글 데이타 스트리밍 데이타 분석 플랫폼 dataflow - #1 소개


조대협 (http://bcho.tistory.com)


실시간 데이타 처리에서는 들어오는 데이타를 바로 읽어서 처리 하는 스트리밍 프레임웍이 대세인데, 대표적인 프레임웍으로는 Aapche Spark등을 들 수 있다. 구글의 DataFlow는 구글 내부의 스트리밍 프레임웍을 Apache Beam이라는 형태의 오픈소스로 공개하고 이를 실행하기 위한 런타임을 구글 클라우드의 DataFlow라는 이름으로 제공하고 있는 서비스이다.


스트리밍 프레임웍 중에서 Apache Spark 보다 한 단계 앞선 개념을 가지고 있는 다음 세대의 스트리밍 프레임웍으로 생각할 수 있다. Apache Flink 역시 유사한 개념을 가지면서 Apache Spark의 다음 세대로 소개 되는데, 이번글에서는 이 DataFlow에 대한 전체적인 개념과 프로그래밍 모델등에 대해서 설명하고자 한다.  스트리밍 데이타 처리에 대한 개념은 http://bcho.tistory.com/1119 글을 참고하기 바란다.

소개

dataflow에 대해서 이해하기 위해서 프로그래밍 모델을 먼저 이해해야 하는데, dataflow의 프로그래밍 모델은 얼마전에 Apache에 Beam이라는 오픈 소스 프로젝트로 기증 되었다. Apache Spark이나, Apache Flink와 유사한 스트리밍 처리 프레임웍이라고 생각하면 된다. dataflow는 이 Apache beam의 프로그래밍 모델을 실행할 수 있는 런타임 엔진이라고 생각하면 된다. 예를 들어 Apache beam으로 짠 코드를 Servlet이나 Spring 코드라고 생각하면, dataflow는 이를 실행하기 위한 Tomcat,Jetty,JBoss와 같은 런타임의 개념이다.


런타임

Apache Beam으로 작성된 코드는 여러개의 런타임에서 동작할 수 있다. 구글 클라우드의 Dataflow 서비스에서 돌릴 수 도 있고, Apache Flink나 Apache Spark 클러스터 위에서도 그 코드를 실행할 수 있으며, 로컬에서는 Direct Pipeline이라는 Runner를 이용해서 실행이 가능하다.


여러 런타임이 있지만 구글 클라우드의 Dataflow 런타임을 사용하면 다음과 같은 장점이 있다.


매니지드 서비스로 설정과 운영이 필요 없다.

스트리밍 처리는 하나의 노드에서 수행되는 것이 아니라, 여러개의 노드에서 동시에 수행이 되기 때문에, 이 환경을 설치하고 유지 보수 하는 것만 해도 많은 노력이 들지만, Dataflow는 클라우드 서비스이기 때문에 별도의 설치나 운영이 필요없고, 작성한 코드를 올려서 실행 하기만 하면 된다.

Apache Spark등을 운영해본 사람들은 알겠지만, Spark 코드를 만드는 것 이외에도, Spark 클러스터를 설치하고 운영 하는 것 자체가 일이기 때문에, 개발에 집중할 시간이 줄어든다.

오토 스케일링을 지원하기 때문에, 필요한 만큼 컴퓨팅 자원을 끌어다가 빠르게 연산을 끝낼 수 있다.

클라우드 컴퓨팅의 장점은 무한한 자원을 이용하여, 워크로드에 따라서 자원을 탄력적으로 배치가 가능한 것인데, Dataflow 역시, 이러한 클라우드의 장점을 이용하여, 들어오는 데이타량이나 처리 부하에 따라서 자동을 오토 스케일링이 가능하다.


그림처럼 오전에 800 QPS (Query per second)의 처리를 하다가 12시경에 부하가 5000 QPS로 늘어나면 그만한 양의 리소스 (컴퓨팅)를 더 투여해서 늘어나는 부하에 따라서 탄력적으로 대응이 가능하다.

리밸런싱(Rebalancing)기능을 이용하여 작업을 골고루 분배가 가능하다.

Spark이나 Hadoop Map & Reduce와 같은 대용량 분산 처리 시스템의 경우 문제가 특정 노드의 연산이 늦게 끝나서 전체 연산이 늦게 끝나는 경우가 많다. 예를 들어 1000개의 데이타를 10개씩 100개의 노드에서 분산하여 처리를 한후 그 결과를 모두 모아서 합치는 연산이 있다고 할때, 1~2개의 노드가 연산이 늦게 끝나더라도 그 결과가 있어야 전체 값을 합칠 수 있기 때문에, 다른 노드의 연산이 끝나도 다른 노드들은 기다려야 하고 전체 연산 시간이 느려 진다.


Dataflow의 경우는 이런 문제를 해결 하기 위해서, 리밸런싱(rebalancing)이라는 메카니즘을 발생하는데, 위의 그림(좌측의 그래프는 각 노드의 연산 시간이다.) 과 같이 특정 노드의 연산이 느려진 경우, 느려진 노드의 데이타를 다른 연산이 끝난 노드로 나눠서 재 배치하여 아래와 같이 전체 연산 시간을 줄일 수 있다.





데이타 스트리밍 처리에 대한 이해


조대협 (http://bcho.tistory.com)


근래에 Apache Beam 프로젝트를 공부하게 되서, 그간 묵혀놨던 데이타 스트리밍 처리에 대해서 다시 정리중인데, 예전에 Apache Storm을 봤을때 보다 트리거나, 윈도우등 많은 개념들이 들어가 있어서 데이타 스트리밍에 대한 개념 부터 다시 정리를 시작을 하고자한다.


Apache Storm에서 부터, Apache Spark 기반의 데이타 스트림 처리뿐 아니라 근래에는 Apache Flink와 같은 새로운 스트리밍 프레임웍크과 구글이 이미 클라우드를 통해서 서비스 하고 있는  google cloud dataflow (Apache Beam이라는 프로젝트로 오픈소스화 되었고, 현재 인큐베이션 단계에 있다.) 까지 빅데이타에 대한 실시간 처리성이 강조되면서 근래에 데이타 스트리밍 처리가 다시 주목 받는 것 같다. 이 문서는 구글이 개발한 dataflow에 대한 개념을 이해하기 위함이다.


본 문서의 내용과 그림은 https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101 를 참고하였다.


사전 개념 이해

스트리밍 데이타 처리를 이해하기 위해서는 몇몇 용어와 개념을 사전에 이해해야 하는 부분이 있다.

Bounded data 와 Unbounded data

먼저 스트리밍 데이타 처리를 이해하려면 데이타의 종류에 대해서 먼저 이해해야 한다.

  • Unbounded data 는 데이타의 수가 정해져있지 않고 계속해서 추가되는, 즉 끊임 없이 흘러 들어오는 데이타라고 볼 수 있다. 예를 들어서 모바일 디바이스에서 계속 올라오는 로그, 페이스북이나 트위터의 타임 피드, 증권 거래 주문 같이 계속 해서 들어와서 쌓이는 데이타를 Unbounded data 라고 한다.

  • Bounded data는 데이타가 딱 저장되고 더 이상 증거나 변경이 없는 형태로 계속 유지되는 데이타를 뜻한다. 1월의 정산 데이타.

Event time과 Processing time

데이타의 발생 시간과 시스템에서 처리되는 시간이 차이가 있는데, 이를 각각 Event time과 Processing time 이라고 정의한다.

예를 들어, 게임에서 사용자가 공격을 한 이벤트를 서버에 전달해서 처리하여 저장하는 시나리오가 있다고 가정할때, 공격 이벤트가 1:00:00에 발생했으면, 이 데이타가 네트워크를 타고 서버로 도달하여 프로그램 로직을 수행하고 저장하는데 소요된 시간을 2초라고 가정하면, Event time 은 1:00가 되고, Processing time은 1:00:02가 된다.

이상적으로는 Event time과 Processing time이 동일하면 좋겠지만, 네트워크 시간이나 처리 시간에 따라 Processing time이 Event time 보다 늦고, 또한 Processing time에서 소요되는 실제 처리 시간은 일정하지 않고 아래 그림의 파란색 그래프(실제 처리 그래프) 처럼 들쭉 날쭉하다. 네트워크 상황이나, 서버의 CPU, IO 상황이 그때마다 차이가 나기 때문이다.


아래 그림을 통해서 개념을 다시 정리해보면,

X축은 Event time, Y축은 Processing Time이다. 0초에 발생한 데이타가 서버에 도착해서 처리하는 시간이 소요 되기 때문에, 아래 그림과 같이 Processing Time은 2초 부터 시작한다. Skew는 Event time과 Processing time간의 간격이다. 아래 그림에서 보면, Processing time에서 3초때에는 Event time 1초에서 발생한 데이타를 처리하고 있는데, 실제 Event time에서는 3초 시간의 데이타가 발생하고 있기 때문에, Processing time과 Event time은 약 2초의 지연이 발생하고 있고, 이를 Skew 라고 한다.



Bounded data의 처리

Bounded data는 이미 저장되어 있는 데이타를 처리하는 것이기 때문에 별다른 처리 패턴이 필요없다



데이타를 읽어서 한번에 처리해서 저장 하면 된다.

UnBounded data 처리

복잡한 것은 스트리밍 데이타 즉, Unbounded data 를 처리하는 방법인데, Unbounded data 는 크게 Batch와 Streaming 두 가지 방식으로 처리할 수 있다.

Batch로 처리

배치로 Unbounded data를 처리 하는 방식은 아래와 같이 두 가지 방식이 있다.

Fixed Windows

Fixed Windows 방식은 스트리밍으로 들어오는 데이타를 일정 시간 단위로 모은 후, 배치로 처리 하는 방식이다. 예를 들어서 아래 그림과 같이 10~11시 까지 데이타를 수집한후, 11시 이후에, 10~11시까지 들어온 데이타를 처리해서 정리 하는 방식이다.



이 방식은 구현이 간단하다는 장점이 있지만, 데이타가 수집 된 후 처리를 시작하기 때문에, 실시간성이 떨어진다. (근 실시간)

Streaming 처리

Unbounded 데이타를 제대로 처리하려면 스트리밍 처리를 하는 것이 좋은데, 스트리밍 처리 방법에는 아래와 같이 크게 Time agnostic, Filtering, Inner Join, Windowing 방식등이 있다.


스트리밍 처리는 배치 처리에 비해서 복잡한 것이, Unbounded 데이타는 기본적으로 특성이 Skew가 환경에 따라 변화가 심하고 그래서 데이타가 시스템에 도착하는 순서 역시 순차적으로 도착하지 않고 들쭉 날쭉 하다.

Time agnostic

Time agnostic 이란, 데이타가 시간 속성을 가지고 있지 않는 데이타 이다. 들어오는 데로 처리를 하면 되기 때문에, 별다른 노하우가 필요 없지만, 하나의 데이타 형이기 때문에 간단하게 언급만 한다.

Filtering

다음으로 많이 사용 되는 것이 필터링인데, 들어오는 데이타 중 특정 데이타만 필터링 해서 저장 하는 구조이다.


예를 들면, 웹 로깅 데이타를 수집해서, 특정 IP나 국가 대역에서 들어오는 데이타만 필터링해서 저장하는 시나리오등이 될 수 있다.

Inner joins (교집합)

Inner join은 두개의 Unbounded 데이타에서 들어오는 값을 서로 비교하여 매칭 시켜서 값을 구하는 방식이다.



모바일 뉴스 앱이 있다고 가정할때, 뉴스 앱에서는 사용자가 어떤 컨텐츠를 보는지에 대한 데이타를 수집 전송하고, 지도 앱에서는 현재 사용자의 위치를 수집해서 전송한다고 하자.

이 경우 사용자별 뉴스 뷰에 대한 Unbounded data 와, 사용자별 위치에 대한 Unbounded data 가 있게 되는데, 이 두개의 데이타 스트림을 사용자로 Inner Join을 하면 사용자가 어떤 위치에서 어떤 뉴스를 보는지에 대해서 분석을 할 수 있다.

Inner join을 구현하기 위해서는 양쪽 스트림에서 데이타가 항상 같은 시간에 도착하는 것이 아니기 때문에, 반대쪽 데이타가 도착할때 까지 먼저 도착한 데이타를 임시로 저장할 버퍼 영역이 필요하고, 이 영역에 임시로 일정 기간 데이타를 저장하고 있다가 반대쪽 스트림에서 데이타가 도착 하면 이를 조인해서 결과를 저장하고, 버퍼 영역에서 두개의 데이타를 삭제한다.

만약에 반대쪽의 데이타가 도착하지 않으면, 이 버퍼 영역에 데이타가 계속 쌓이기 때문에, 일정 기간이 지나면 반대쪽 스트림에서 데이타가 도착하지 않은 데이타를 주기적으로 삭제 해주는 (garbage collection) 정책이 필요하다.


cf. Inner join (교집합), Outer join (합집합)

Approximation algorithms (근사치 추정)

근사치 추정 방식은 실시간 데이타 분석에서 많이 사용되는데, 실시간 분석에서는 전체 데이타를 모두 분석할 수 있는 시간이 없는 경우가 많고, 시급한 분석이 필요한 경우가 있기 때문에, 전체 데이타를 분석하지 않고 일부만 분석하거나 또는 대략적인 데이타의 근사값만을 구하는 방법으로 해서, 빠르게 데이타를 분석하는 경우가 있다. 이를 근사치 추정 방식이라고 하는데, 예를 들어 VOD 서비스에서 지금 10분간 인기있는 비디오 목록, 12시간 동안 가장 인기 있는 판매 제품등 과 같은 시나리오인데, 이런 시나리오에서 데이타는 아주 정확하지 않아도 근사 값만 있으면 되고, 데이타를 그 시간에 보는 시급성이 중요하다.  이러한 시나리오에서는 전체 데이타를 다 보고 분석이 어렵기 때문에, 샘플링을 하거나 대략적인 근사 값만을 구해서 결과를 낸다.


이런 근사치를 추정하는 알고르즘은 K-means나 Approximate Top-N등이 이미 정의되어 있는 알고리즘이 많다.


참고 자료 :

Storm을 이용한 근사치 구하기 : https://pkghosh.wordpress.com/2014/09/10/realtime-trending-analysis-with-approximate-algorithms/

Apache Spark에서 K means로 근사치 구하기 :

https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html


Windowing

실시간 스트리밍 데이타 처리에서 중요한 개념중의 하나는 Windowing 인데, Windowing 이란 스트리밍 데이타를 처리할때 일정 시간 간격으로 처리하는 것을 정의한다.

예를 들어, 10분 단위의 Windowing의 경우 1시~2시까지 들어온 데이타를 1:10, 1:20,1:30, …  단위로 모아서 처리한다.

윈도우에는 자르는 방법에 따라서 다음과 같이 몇가지 방법이 있다.

Fixed Windows

정확하게 일정 시간 단위로 시간 윈도우를 쪼게는 개념이다. 앞에서 언급한 예와 같이 윈도우 사이즈가 10분 일때, 1시 10분은 1시00분~1시10분까지의 데이타를, 1시 20분은 1시10분~1시20분까지의 데이타를 처리한다.

Sliding Windows

Sliding Window 방식은 윈도우가 움직이는 개념이다.

슬라이딩 윈도우의 개념은 현재 시간으로 부터 +-N 시간 전후의 데이타를 매 M 시간 마다 추출 하는 것을 슬라이딩 윈도우라고 하고, 이 윈도우들은 서로 겹치게 된다.

예를 들면 현재시간으로부터 10분 전에서 부터  측정시간까지의 접속자를 1분 단위로 측정하는 시나리오가 될 수 있다. 매 1분 간격으로, 데이타를 추출하고, 매번 그 시간으로부터 10분전의 데이타를 추출하기 때문에 데이타가 중첩이 된다.  

이렇게 추출하는 간격을 Period (앞에서 1분), 그리고 추출하는 기간을 Length 또는 Size (앞에서 10분)라고 한다.



출처 : https://cloud.google.com/dataflow/model/windowing#sliding-time-windows

Session

다음으로는 Session Window의 개념이다.

Session Window에는 사용자가 일정 기간동안 반응이 없는 경우(데이타가 올라오지 않는 경우)에 세션 시작에서 부터, 반응이 없어지는 시간 까지를 한 세션으로 묶어서 처리한다

예를 들어서 세션 타임 아웃이 20분이라고 하고 데이타가 1:00 부터 올라오고 있는데,  1:01, 1:15에 데이타가 올라오고, 1:40분에 데이타가 올라오면 1:15 이후에 20분동안 (1:35까지) 데이타가 올라오지 않았기 때문에, 1:00,1:01,1:15은 하나의 세션으로 되고, 1:40은 새로운 세션 시작이 된다.



출처 : https://cloud.google.com/dataflow/model/windowing#session-windows


시간대별 Window 처리 방식

스트리밍 데이타에서 윈도우를 사용할때, 어느 시간을 기준 시간으로 할것인가를 정해야 하는데, 데이타가 시스템에 도착하는 Processing time을 기준으로 할 수 있고 또는  데이타가 실제 발생한 시간인 Event time을 기준으로도 할 수 있다.

Processing time based windowing

Processing time을 기준으로 데이타를 처리하는 것은 크게 어렵지 않다. 데이타가 도착한 순서대로 처리해서 저장하면 된다.


Event time based windowing

문제는 Event time을 기준으로 데이타를 처리 하는 경우인데, 데이타가 들어오는 것이 순서대로 들어오지 않는 경우가 많고, 또한 데이타의 도착 시간또한 일정하지 않다.




이 그림은 Event time을 기준으로 데이타를 처리하는 개념인데, 좌측 하얀색 화살표 처럼 12:00~13:00에 도착한 데이타가 11:00~12:00에 발생한 데이타 일 경우, 11:00~12:00 윈도우에 데이타를 반영해줘야 한다.

이러한 Event time 기반의 스트리밍 처리는 아래와 같이 기술적으로 두가지 주요 고려 사항이 필요하다.

  • Buffering
    늦게 도착한 데이타를 처리해야 하기 때문에. 윈도우를 일정시간동안 유지해야 한다. 이를 위해서 메모리나 별도의 디스크 공간을 사용한다.

  • Completeness
    Buffering을 적용했으면 다른 문제가 얼마 동안 버퍼를 유지해야 하는가?
    즉 해당 시간에 발생한 모든 데이타는 언제 모두 도착이 완료(Completeness) 되는가? 를 결정하는 것이다. 정확한 완료 시점을 갖는 것은 사실 현실적으로 힘들다. 버퍼를 아주 크게 잡으면 거의 모든 데이타를 잡아낼 수 있겠지만, 버퍼를 아주 크게 잡는 것이 어렵기 때문에, 데이타가 언제 도착할 것이라는 것을 어림 잡아 짐작할 수 있는 방법들이 많다. (예를 들어 워터마크 기법 같은 것이 있는데, 이는 다음글에서 설명하도록 한다.)


지금까지 실시간 데이타 분석에 사용되는 대략적인 개념을 알아보았다. 다음 글에서는 Apache Beam을 이용하여 이러한 실시간 데이타 분석을 어떻게 구현하는지 알아보도록 하겠다.



참고 자료

http://data-artisans.com/how-apache-flink-enables-new-streaming-applications-part-1/

https://cloud.google.com/dataflow/blog/dataflow-beam-and-spark-comparison#game-stats-advanced-stream-processing





스팍에 대한 간단한 개념과 장점 소개


조대협 (http://bcho.tistory.com)



스팍의 개념과 주요 기능


요즘 주변에서 아파치 스팍을 공부하는 사람도 많고, 스팍을 기반으로한 Zeppelin을 이용하여 데이타 분석을 하는 경우도 많아서, 오늘부터 다시 Spark을 들여다 보기 시작했습니다.


스팍은 예전에도 몇번 관심을 가진적이 있는데, Storm과 같은 데이타 스트리밍 프레임웍에서 Storm과 같이 언급 되기도 하고, 머신 러닝 프레임웍을 볼때도  스팍 ML  라이브러리 기능이 언급 되기도 하고, 예전 모 회사의 데이타 분석 아키텍쳐를 보니, 카산드라에 저장된 데이타를 스팍/Shark라는 프레임웍으로 분석을 하더군요. 또 누구는 메모리 기반의 하둡이라고도 합니다.


스팍의 정의를 내려보면 한마디로

범용적 목적의 분산 고성능 클러스터링 플랫폼 (General purpose high performance distributed platform)

입니다 말이 정말 길고 모호한데, 달리 설명할만한 단어가 없습니다.


스팍은 범용 분산 플랫폼입니다. 하둡과 같이 Map & Reduce 만 돌리는 것도 아니고, Storm 과 같이 스트리밍 처리만 하는게 아니라, 그냥 분산된 여러대의 노드에서 연산을 할 수 있게 해주는 범용 분산 클러스터링 플랫폼으로, 이 위에, Map & Reduce나, 스트리밍 처리등의 모듈을 추가 올려서 그 기능을 수행하게 하는 기능을 제공합니다.


특히나, 메모리 하둡이라고도 불리는데, 이 스팍은 기존의 하둡이 MR(aka. Map & Reduce) 작업을 디스크 기반으로 수행하기 때문에 느려지는 성능을 메모리 기반으로 옮겨서 고속화 하고자 하는데서 출발하였습니다.


그 플랫폼 위에 SQL 기반으로 쿼리를 할 수 있는 기능이나, 스트리밍 기능등등을 확장하여 현재의 스팍과 같은 모습을 가지게 되었습니다.

스팍의 주요 기능은 앞에서 언급하였듯이 

  • Map & Reduce (cf. Hadoop)
  • Streaming 데이타 핸들링 (cf. Apache Storm)
  • SQL 기반의 데이타 쿼리 (cf. Hadoop의 Hive)
  • 머신 러닝 라이브러리 (cf. Apache Mahout)
등을 제공합니다.

스팍의 장점

빠른 속도라는 장점은 필수 이고, 왜 다들 스팍! 스팍 하는가 봤더니, 플랫폼으로써의 특성이 장점으로 작용합니다.
예를 들어 기존의 빅데이타 분석 플랫폼의 구현 구조를 보면, 배치 분석시 ETL 등을 써서 데이타를 로딩한 후에, Hadoop의 MR을 이용하여 데이타를 정재한후, 이 데이타를 OLAP 등의 데이타 베이스에 넣은 후에, 리포팅 툴로 그래프로 표현했습니다. 여기에 만약 실시간 분석을 요한다면 Storm을 연결해서 실시간 데이타 분석 내용을 더하는 일을 했습니다. 
사용되는 프레임웍만해도 몇가지가 되고, 이를 공부하는 시간과 시스템을 배포 운영하는데 여러가지 노력이 들어갔습니다만, 스팍은 하나로 이 모든것이 가능하다는 겁니다.

즉 한번 배워놓으면, 하나의 플랫폼내에서 여러가지가 가능한데다가 속도까지 빠릅니다.
그리고 한 플랫폼안에 배치,스트리밍, 머신 러닝등 다양한 처리를 제공하기 때문에, 하나의 데이타로 여러가지 형태로 데이타를 처리할 수 있는 기능을 가집니다.

연동성 부분에서도 장점을 볼 수 가 있는데, 스팍은 스칼라로 구현되었지만, 파이썬,자바,스칼라등 다양한 언어를 지원하기 위한 SDK를 가지고 있고, 데이타 저장단으로는 하둡, 아마존  S3, 카산드라등 다양한 데이트 스토리지를 지원합니다.
이런 연동성으로 다양한 언어로 다양한 데이타 저장소를 연동하여 데이타를 분석 및 처리할 수 있는 장점을 가지고 있습니다.


대충보는 Storm #5-Apache Storm 병렬 분산 처리 이해하기

 조대협 (http://bcho.tistory.com)

 

Storm에 있는 Spout Bolt들은 여러개의 머신에서 어떻게 나눠서 처리될까? Storm 클러스터는 여러대의 분산된 서버에서 운용되기 때문에, 당연히 Spout Bolt도 나눠서 처리된다 그렇다면 이런 Storm의 병렬 처리 구조는 어떻게 되는 것일까?

이 글에서는 Spout Bolt를 병렬로 처리하는 Storm의 구조에 대해서 알아보도록 한다.

Storm의 병렬 처리를 이해하기 위한 개념

Storm의 병렬 처리를 이해하기 위해서는 몇가지 개념을 정리해야 한다. Node,Worker,Exectutor,Task 이 네 가지 개념을 이해해야 한다.


Node

Node는 물리적인 서버이다. Nimbus Supervisor 프로세스가 기동되는 물리적인 서버이다.

Nimbus는 전체 노드에 하나의 프로세스만 기동하며, Supervisor는 일반적으로 하나의 노드에 하나만 기동한다. 여러대를 기동시킬 수 도 있지만, Supervisor의 역할 자체가 해당 노드를 관리하는 역할이기 때문에 하나의 노드에 여러개의 Supervisor를 기동할 필요는 없다.


Worker

Worker Supervisor가 기동되어 있는 노드에서 기동되는 자바 프로세스로 실제로 Spout Bolt를 실행하는 역할을 한다.


Executor

Executor Worker내에서 수행되는 하나의 자바 쓰레드를 지칭한다.


Task

Task Bolt Spout의 객체를 지칭한다. Task Executor (쓰레드)에 의해서 수행된다.

이 개념을 다시 정리해보면 다음과 같은 그림이 된다.



<그림. Node,Worker,Executor,Task 의 개념>

각 슬레이브 노드에는 Supervisor 프로세스가 하나씩 떠있고, conf/storm.yaml에 정의된 설정에 따라서 worker 프로세스를 띄운다.supervisor.slots.ports에 각 Worker가 사용할 TCP 포트를 정해주면 된다. 아래는 5개의 Worker 프로세스를 사용하도록 한 설정이다.



<그림. Storm 설정에서 Supervisor 5개 띄우도록한 설정>

 

그리고 난후에, Topology를 생성할때, Topology에 상세 Worker,Executor,Task의 수를 정의한다. 앞에서 예제로 사용했던 HelloTopology 클래스 코드를 다시 살펴보자. 아래 코드는 Worker,Executor,Task등을 설정한 예이다.

package com.terry.storm.hellostorm;

 

import backtype.storm.Config;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.topology.TopologyBuilder;

 

public class HelloTopology {

        public static void main(String args[]){

               TopologyBuilder builder = new TopologyBuilder();

               builder.setSpout("HelloSpout", new HelloSpout(),2);

               builder.setBolt("HelloBolt", new HelloBolt(),2)

                       .setNumTasks(4)

                       .shuffleGrouping("HelloSpout");

              

              

               Config conf = new Config();

               conf.setNumWorkers(5);

               // Submit topology to cluster

               try{

                       StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

               }catch(AlreadyAliveException ae){

                       System.out.println(ae);

               }catch(InvalidTopologyException ie){

                       System.out.println(ie);

               }

              

        }

 

}

<코드. Worker,Executor,Task 수를 설정한 HelloTopology 예제>

     Topology가 사용할 Worker 프로세스의 수 설정
Config
에서 setNumWorkers(5)를 이용해서 이 토폴로지에서 사용한 Worker 프로세스 수를 5개로 지정했다.

     Spout Executor(쓰레드 수) 설정
다음으로 setSpout에서 3번째 인자로 “2”라는 숫자를 넘겼는데, setSpout에 마지막 인자는 Executor의 수이다. 이를 Parallelism 힌트라고 하는데, Spout 컴포넌트가 수행될 쓰레드의 수이다. 여기서는 Spout Task (객체의 수)를 정의하지 않았는데, 정의하지 않은 경우 디폴트로 Executor의 수와 같이 설정된다.

     Bolt Executor(쓰레드 수)Task(객체)수 설정
Bolt
도 마찬가지로 setBolt 3번째 마지막 인자가 Parallelism 힌트인데, 역시 2개로 지정하였다. 여기서는 Task수를 별도로 지정하였는데, setTaskNum(4)을 이용해서 지정한다. 이렇게 설정하면 HelloBolt 객체는 총 4개가 생기고 2개의 Thread에서 번갈아 가면서 실행하게 된다.

자아 그러면 실제로 설정하는데로 동작하는 지 몇가지 확인을 해보자. 자바의 jps 명령을 이용하면 현재 동작중인 자바 프로세스 수를 볼 수 있다.



<그림 Worker 프로세스 수의 확인>

위의 테스트는 하나의 환경에서 nimbus,zookeeper,supervisor,worker를 모두 띄워놓은 형태인데,worker가 설정대로 5개의 프로세스가 떠있고, nimbus,supervisor가 떠 있는 것이 확인되고, QuorumPeerMainzookeeper 프로세스이다.

실제로 Executor가 지정한데로 Thread가 뜨는지 확인을 해보자. 여러개의 Worker 프로세스에 나눠서 뜨면 모니터링하기가 복잡하니 편의상 conf.setNumer(1)로 해서, 하나의 Worker 프로세스에서 모든 Executor가 뜨도록 Topology를 변경한후, Worker 프로세스의 쓰레드를 모니터링 하니 다음과 같은 결과를 얻었다.

코드상에서 HelloSpout에 대한 Parallelism 힌트를 2로 설정하고, HelloBolt에 대한 Parallelism 힌트도 2로 설정하였다.



<그림. Worker 프로세스의 쓰레드 덤프>

실제로 Worker 프로세스내의 쓰레드를 보면 HelloSpout용 쓰레드가 2, HelloBolt용 쓰레드가 2개가 기동됨을 확인할 수 있다.


리밸런싱

Storm 운영중에 노드를 추가 삭제 하거나 또는 성능 튜닝을 위해서 운영중인 환경에 Worker, Executor의 수를 재 조정이 가능하다. 이를 rebalance라고 하는데, 다음과 같은 명령어를 이용해서 가능하다.

% bin/storm rebalance [TopologyName] -n [NumberOfWorkers] -e [Spout]=[NumberOfExecutos] -e [Bolt1]=[NumberOfExecutos] [Bolt2]=[NumberOfExecutos]

미들웨어 엔지니어로써 본 Storm 튜닝

본인의 경우 경력이 톰캣이나 오라클社의 웹로직에 대해 장애진단과 성능 튜닝을 한 경력을 가지고 있어서 JVM이나 미들웨어 튜닝에 많은 관심을 가지고 있는데, 이 미들웨어 튜닝이라는 것이 대부분 JVM과 쓰레드 수등의 튜닝에 맞춰 있다보니, Storm의 병렬성 부분을 공부하다 보니, Executor Worker,Task의 수에 따라서 성능이 많이 차이가 나겠다는 생각이 든다.

특히나 하나의 토폴리지만 기동하는 것이 아니라, 여러개의 토폴로지를 하나의 클러스터에서 구동 할 경우 더 많은 변수가 작용할 수 있는데, 쓰레드란 것의 특성이 동시에 하나의 코어를 차지하고 돌기 때문에, 쓰레드수가 많다고 시스템의 성능이 좋아지지 않으며 반대로 적으면 성능을 극대화할 수 없기 때문에, 이 쓰레드의 수와 이 쓰레드에서 돌아가는 객체(Task)의 수에 따라서 성능 차이가 많이 날것으로 생각된다. 아마도 주요 튜닝 포인트가 되지 않을까 싶은데, 예전에는 보통 JVM당 적정 쓰레드 수는 50~100개 정도로 책정했는데 (톰캣과 같은 WAS 미들웨어 기준). 요즘은 코어수도 많아져서 조금 더 많은 쓰레드를 책정해도 되지 않을까 싶다. 쓰레드 수 뿐 아니라, 프로세스수도 영향을 미치는데, JVM 프로세스의 컨텐스트 스위칭은 쓰레드의 컨텐스트 스위칭보다 길기 때문에, 프로세스를 적게 띄우는 것이 좋을것으로 예상 되지만, JVM 프로세스는 메모리 GC에 의한 pausing 시간이 발생하기 때문에 이 GC 시간을 적절하게 나눠주기 위해서 적절 수 의 프로세스를 찾는 것도 숙제가 아닐까 싶다. 디폴트 worker의 옵션을 보니 768M의 힙 메모리를 가지고 기동하게 되어 있는데, 메모리를 많이 사용하는 연산는 다소 부족하지 않을까 하는 느낌이 든다.

Bolt가 데이타 베이스, 파일 또는 네트워크를 통해서 데이타를 주고 받는 연산을 얼마나 하느냐에 따라서도 CPU 사용률이 차이가 날것이기 때문에 (IO작업중에는 쓰레드가 idle 상태로 빠지고 CPU가 노는 상태가 되기 때문에) IO 작업이 많은 경우에는 쓰레드의 수를 늘리는 것이 어떨까 한다.

Bolt Spout와 같은 통신은 내부적으로 ZeroMQ를 사용하는 것으로 알고 있는데, 아직 내부 구조는 제대로 살펴보지는 않았지만, 같은 프로세스내에서는 네트워크 호출 없이 call-by-reference를 이용해서 통신 효율을 높이고, 통신이 잦은 컴포넌트는 같은 프로세스에 배치 하는 affinity와 같은 속성(?)이 있지 않을까 예측을 해본다.

결과적으로 튜닝 포인트는, Worker,Executor,Task 수의 적절한 산정과, 만약에 옵션이 있다면 리모트 호출을 줄이기 위한 Bolt Spout 컴포넌트의 배치 전략에 있지 않을까 한다.


다음 글에서는 이런 병렬 처리를 기반으로 각 컴포넌트간에 메세지를 보낼때, 여러 Task간에 어떻게 메세지를 라우팅을 하는지에 대한 정리한 그룹핑(Grouping)에 대한 개념에 대해서 알아보도록한다.


 

대충보는 Storm #4-Apache Storm 특징과 기본 개념

 조대협 (http://bcho.tistory.com)


지금까지 Storm에 대해서 이해하기 위해서, 실시간 스트리밍 서비스의 개념에 대해서 알아보고 간단한 HelloStorm 애플리케이션을 제작해서, 싱글 클러스터 노드에 배포해봤다. 대략 실시간 스트리밍이 무엇이고, Storm을 이용해서 어떻게 개발하는지에 대해서는 어느정도 이해를 했을 것이라고 생각한다.

그러면 지금까지의 경험을 조금 더 체졔적으로 정리해서 Storm에 대해서 이해해보도록 하자. 이번에는 Storm에 대한 개념과 아키텍쳐 구조에 대해서 알아보겠다.


Storm의 특징

Storm을 실시간 스트리밍을 처리하기 위한 서버이자 프레임웍이다. 그렇다면 이 Storm이 다른 스트리밍 처리 솔루션에 비해 가지는 특징은 무엇일까?


확장성

Storm은 클러스터링 기능을 이용해서 수평으로 확장이 가능하다. 그래서 많은 데이타를 다루어야 하는 빅데이타이 데이타 스트림 서비스에서도 가능한데, Storm홈페이지에 포스팅된 자료를 보면, 2x2.4GHz CPU 24GB 메모리 머신을 기반으로 초당 100바이트짜리 메세지를 100만개 정도 처리가 가능하다고 한다. (https://storm.apache.org/about/scalable.html) TPS로 환산하면 100 TPS이다. (Wow!!)


장애 대응성

Storm의 다른 특징 중의 하나는 Fault tolerant 구조를 통한 장애 대응 능력이다. ZooKeeper를 통해서 전체 클러스터의 운영 상태를 감지 하면서, 특정 노드가 장애가 나더라도, 시스템이 전혀 문제 없이 작업을 진행할 수 있으며, 장애가 난 노드에 할당된 작업은 다른 노드에 할당해서 처리하고, 장애 노드에 대해서는 복구 처리를 자동으로 수행해준다.


메세지 전달 보장

Storm은 메세지 처리에 안정성을 제공하는데, 장애가 나건 문제가 있건간에,유실 없이 최소한 한번 메세지가 처리될 수 있게 지원한다. (at least once : 이말은 반대로 이야기 하면, 1번 이상 같은 메세지가 중복 처리될 수 있다는 이야기이다.)

만약에 정확하게 메세지가 한번만 처리가 되기를 원하면 Trident (https://storm.apache.org/documentation/Trident-tutorial.html) 를 통해서 Storm을 확장하면, 정확히 하나의 메세지가 한번만 처리되도록 할 수 있다.


쉬운 배포

Storm은 메뉴얼에 따르면(?) 배포와 설정이 매우 쉽다. 실제로 클러스터를 구성해보면 분산 시스템인데 비해서 별로 어려움 없이 배포와 설정이 가능하다. 그리고 메뉴얼에 따르면(?) 배포 후에, 크게 많은 관리 없이 운영이 가능하다고는 하는데, 이것은 실제로 해보지 않았기 때문에 패스

일단, 오픈소스인데도 설치 후 웹 기반의 모니터링 콘솔을 제공하기 때문에 시스템의 상태를 쉽게 모니터링 하고 운영하는데 도움을 준다.


여러 프로그래밍 언어 지원

Storm은 기본적으로 JVM (Java Virtual Machine)위에서 동작하기는 하지만, Twitter Thrift 프로토콜을 기반으로 하기 때문에, 다양한 언어로 구현이 가능하다. Java 뿐 아니라, JVM을 사용하지 않는 경우에 대해서는 stdin/stdout을 통해서 데이타를 주고 받음으로써, Ruby,Python,Javascript,Perl 등 다양한 언어를 사용할 수 있다.


다양한 시스템 연계

Storm은 다양한 다른 솔루션과 통합이 가능하다. 데이타를 수집하는 부분에서는 Kestrel (http://robey.github.io/kestrel/), RabbitMQ (http://www.rabbitmq.com/) , Kafka (http://kafka.apache.org/), JMS 프로토콜, mazon Kinesis (http://aws.amazon.com/kinesis/)

등이 연동이 가능하며, 다양한 데이타 베이스 (RDBMS, Cassandra, MongoDB )에도 쉽게 연계가가능하다. CEP(Complex Event Processing을 지원하는) 이벤트 처리 분야에서는  Drools (http://www.drools.org/),  Esper (http://esper.codehaus.org/등이 연계 가능하고. 그외에도 Elastic Search (http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/master/storm.html

) , node.js (https://github.com/paralect/storm-nodejs-starterkit)등 다양한 솔루션과 연동을 통해서 시스템을 확장해 나갈 수 있다.


오픈소스

마지막으로 Storm은 오픈소스이다. 상업적 활용이 가능한 Apache License 2.0을 따르고 있다.

Apache License 2.0 http://ko.wikipedia.org/wiki/%EC%95%84%ED%8C%8C%EC%B9%98_%EB%9D%BC%EC%9D%B4%EC%84%A0%EC%8A%A4

Apache License 2.0 한국 번역본 http://yesarang.tistory.com/272

Storm은 유사한 특징을 가지고 있는 Apache Spark에 비해서, 개발이 된지 오래되어서 안정성이 높고 특유의 구조상 장애 대처 능력과 메세지 전달 보장 능력등이 좋다. 반대로 Spark은 최근에 만들어진 만큼 머신 러닝등 더 많은 기능을 가지고 있다.

Storm의 기본 개념

자아 그러면 이제 Storm의 개념을 다시 정립해보자. Storm의 개념을 이해하려면 필수적으로 먼저 이해해야 하는것이 Spout Bolt의 개념이다.

Spout Bolt

Spout Storm 클러스터로 데이타를 읽어들이는 데이타 소스이다. 외부의 로그 파일이나, 트위터 타임 피드와 같인 데이타 스트림, 큐등에서 데이타를 읽어드린다. 이렇게 읽어 드린 데이타를 다른 Bolt로 전달한다. Spout에는 크게 4가지 중요한 메서드가 있다.

Ÿ   open() : 이 메서드는 Spout이 처음 초기화 될때 한번만 호출되는 메서드로, 데이타 소스로 부터의 연결을 초기화 하는 등의 역할을 한다.

Ÿ   nextTuple() : 이 메서드는 데이타 스트림 하나를 읽고 나서, 다음 데이타 스트림을 읽을 때 호출 되는 메서드 이다.

Ÿ   ack(Object msgId) : 이 메서드는 데이타 스트림이 성공적으로 처리되었을때 호출되는데, 이 메서드에서는 성공 처리된 메세지를 지우는 등, 성공 처리에 대한 후처리를 구현한다.

Ÿ   fail(Object msgId) : 이 메서드는 해당 데이타 스트림이 Storm 토폴로지를 수행하던중에, 에러가 나거나 타임아웃등이 걸렸을때 호출되는데,이때에는 사용자가 에러에 대한 에처 처리 로직을 명시해야 한다. 흔히 재처리 로직을 구현하거나 또는 에러 로깅등의 처리를 하게 된다.

Bolt는 이렇게 읽어 드린 데이타를 처리하는 함수이다. 입력 값으로 데이타 스트림을 받고, 그 데이타를 내부의 비지니스 로직에 따라서 가공한 다음에 데이타 스트림으로 다른 Bolt로 넘겨주거나 종료 한다. Bolt에서 정의되는 주요한 메서드는 다음과 같다.

Ÿ   prepare (Map stormConf, TopologyContext context, OutputCollector collector): 이 메서드는 Bolt 객체가 생성될때 한번 호출 된다. 각종 설정 정보나 컨텍스트등 초기 설정에 필요한 부분을 세팅하게 된다.

Ÿ   execute(Tuple input): 가장 필수적인 메서드로, Bolt에 들어온 메세지를 처리하는 로직을 갖는다. 종단 Bolt가 아닌 경우에는 다음 Bolt로 메세지를 전달하기도 한다.

Storm 클러스터내에는 여러개의 Spout Bolt가 존재하게 된다.



<그림. Storm Spout Bolt의 개념>

Topology

이렇게 여러 개의 Spout Bolt간의 연관 관계를 정의해서 데이타 흐름을 정의하는 것을 토폴로지(Topology)라고 한다. 아래 그림과 같이 데이타가 어디로 들어와서 어디로 나가는지를 정의하는 것인데, 아래 그림은 두 개의 Spout에 대해서 각각의 토폴로지를 정의하였다.



<그림. Storm Topology>

Spout Bolt간의 연결 토폴로지는 TopologyBuilder라는 클래스를 통해서 정의한다. 그러면 간략하게, Spout Bolt, Bolt간의 데이타 흐름 관계를 어떻게 정의하는지 살펴보도록 하자.

다음과 같은 토폴로지 흐름을 정의한다고 가정하자



<그림. 간단한 토폴로지 정의 예제>


HelloSpout은 앞서 예제에서 만든것과 같은 Spout이고,

EchoBoltA 는 각각 들어온 메세지에 “Hello I am BoltA :”+메세지를 붙여서 화면에 출력한 후 전송하고 EechoBoltB는 들어온 메세지에 “Hello I am BoltB :”+메세지를 붙여서 전송한다.


package com.terry.storm.hellostorm;

 

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

 

public class EchoBoltA extends BaseBasicBolt{

 

        public void execute(Tuple tuple, BasicOutputCollector collector) {

               // TODO Auto-generated method stub

               String value = tuple.getStringByField("say");

               System.out.println("Hello I am Bolt A: "+value);

               collector.emit(new Values("Hello I am Bolt A :"+value));

        }

 

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

               // TODO Auto-generated method stub

                 declarer.declare(new Fields("say"));

        }

 

}

<그림 EchoBoltA 클래스>


※ 참고 EchoBoltB 클래스도 클래스명만 다르고 내부 구현 내용은 동일하다

토폴로지를 정의할때, HelloSpout hs라는 ID로 생성을 할것이고, EchoBoltA eba라는 ID, EchoBoltB ebb라는 이름으로 생성을 할것이다.

토폴로지 생성 코드를 보자. 아래 노랑색으로 표시된 부분이 실제 토폴로지는 구성하는 부분이다.


package com.terry.storm.hellostorm;

 

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.utils.Utils;

 

import com.terry.storm.hellostorm.EchoBoltB;

import com.terry.storm.hellostorm.EchoBoltA;

 

public class ToplogySequence {

        public static void main(String args[]){

               TopologyBuilder builder = new TopologyBuilder();

               builder.setSpout("hs", new HelloSpout(),1);

               builder.setBolt("eba", new EchoBoltA(),1).shuffleGrouping("hs");

               builder.setBolt("ebb", new EchoBoltB(),1).shuffleGrouping("eba");

              

              

               Config conf = new Config();

               conf.setDebug(true);

               LocalCluster cluster = new LocalCluster();

              

               cluster.submitTopology("ToplogySequence", conf,builder.createTopology());

               Utils.sleep(1000);

               // kill the LearningStormTopology

               cluster.killTopology("ToplogySequence");

               // shutdown the storm test cluster

               cluster.shutdown();          

        }

}

 

<그림. 위의 그림에 있는 토폴로리지를 실제로 구현한 >

 

Spout을 구현한 부분을 보자


builder.setSpout("hs", new HelloSpout(),1);

 

를 통해서 Spout을 생성하는데, setSpout(“{id}”,”{Spout 객체}”,”{Parallelism 힌트}”) 로 이루어진다. 여기서 id“hs”로 정의했고, Spout 객체는 HelloSpout을 지정했다.

    Paralleism 힌트는 나중에 병령 처리와 그룹핑 개념을 설명할때 다시 설명하도록 한다.

다음으로 Bolt를 생성하는데,

builder.setBolt("eba", new EchoBoltA(),1).shuffleGrouping("hs");

builder.setBolt("ebb", new EchoBoltB(),1).shuffleGrouping("eba");

 

로 각각의 Bolt를 생성했다. 이때 주목해야 하는 점이 뒤에 붙어 있는 shufflerGrouping이라는 메서드인데, Spout Bolt간의 연관 관계는 이 Grouping이라는 개념을 이용해서 생성한다. Grouping에는 여러가지 종류와 개념이 있지만 여기서는 간단한 shuuflerGrouping만을 사용했다.첫번째 EchoBoltA에서 자신을 “eba” 라는 id로 생성을 한후에, suffelerGrouping(“hs”)를 선언했는데, 이는 “hs”라는 ID를 가지고 있는 Spout이나 Bolt로 부터 메세지를 받아들이겠다는 이야기이다. 두번째 EchoBoltBsuffelerGrouping(“eba”)를 통해서, id“eba” Spout이나 Bolt, 즉 앞서 생성한 EchBoltA로 부터 메세지를 받아들이겠다는 이야기이다.

자 그러면 이 토폴로지를 실행해 보자.

실행하면 다음과 같은 로그를 얻을 수 있다.

5399 [Thread-12-hs] INFO  backtype.storm.daemon.task - Emitting: hs default [hello world 1]

5400 [Thread-8-eba] INFO  backtype.storm.daemon.executor - Processing received message source: hs:4, stream: default, id: {}, [hello world 1]

Hello I am Bolt A: hello world 1

5401 [Thread-8-eba] INFO  backtype.storm.daemon.task - Emitting: eba default [Hello I am Bolt A :hello world 1]

5409 [Thread-10-ebb] INFO  backtype.storm.daemon.executor - Processing received message source: eba:2, stream: default, id: {}, [Hello I am Bolt A :hello world 1]

Hello I am Bolt B: Hello I am Bolt A :hello world 1

 

     5399 번 라인에서 12번 쓰레드에서 수행되는 “hs” 라는 이름의 Spout, “hello world 1” 이라는 문자열을 emit (제출) 하였다

     5400 번 라인에서 8번 쓰레드에서 수행되는 “eba”라는 Bolt“hs”라는 Spout또는 Bolt에서 “hello world”라는 문자열을 받았다. 그 다음 라인에 “Hello I am Bolt A: hello world 1”가 출력되는 것을 확인할 수 있다.

     5401 라인에서 8번 쓰레드에서 수행되는 “eba” 볼트가 “Hello I am Bolt A :hello world 1” 라는 문자열을 제출하였다.

     5409 라인세서 10번 쓰레드에서 수행되는 “ebb”라는 id의 볼트가 “eba”로 부터 “Hello I am Bolt A :hello world 1” 라는 메세지를 받았다. 다음 행에 EchoBoltB에 의해서 처리되어 “Hello I am Bolt B: Hello I am Bolt A :hello world 1” 문자열이 출력되었음을 확인할 수 있다.

Stream Tuple

다음으로 데이타 Stream Tuple에 대한 개념을 이해해야 한다.

Storm에서 데이타는 Stream이라는 개념으로 정의되는데, Stream이란, Spout Bolt간 또는 Bolt간을 이동하는 데이타들의 집합을 이야기 한다.

각각의 Stream은 하나의 Tuple로 이루어 지는데, Tuple형태로 정의된다.



<그림. Storm Stream Tuple 개념>


앞에 예제에서는 하나의 키만 있는 Tuple을 사용하였다.

앞에서 사용한 HelloSpout 클래스를 다시 한번 살펴보면


public class HelloSpout extends BaseRichSpout {

          private static final long serialVersionUID = 1L;

          private static int count=0;

          private SpoutOutputCollector collector;

         

          public void open(Map conf,TopologyContext context,SpoutOutputCollector collector){

               this.collector = collector; 

          }

         

          public void nextTuple(){

                 if(count++<10) this.collector.emit(new Values("hello world "+count));

          }

         

          public void declareOutputFields(OutputFieldsDeclarer declarer){

                 declarer.declare(new Fields("say"));

          }

         

}

<그림. HelloSpout>

nextTuple 부분에서 newValue로 하나의 값을 보내는 것을 볼 수 있다. 그리고 이 Tuple의 키 구조는 아래 declareOutputFields 메서드에서 “say” 라는 필드이름으로 정의된것을 볼 수 있다.

실제로 HelloSpout에서 생성하는 데이타 스트림은 다음과 같다.



<그림 데이타 Stream Turple 구조>

이번 글에서는 간단하게 Storm의 특징과 기본 개념에 대해서 알아보았다. 다음글에서는 조금더 상세한 Storm의 아키텍쳐의 개념과 병렬 처리 개념에 대해서 알아보도록 하겠다.

 

 

대충보는 Storm #3-Storm 싱글 클러스터 노드 설치 및 배포

조대협(http://bcho.tistory.com)

 

지난번에는 간략하게, Storm을 이용한 HelloStorm 애플리케이션을 개발용 클러스터인 Local Cluster에서 구동해봤다. 이번에는 운영용 클러스터를 설정하고, 이 운영 클러스터에 지난번에 작성한 HelloStorm 토폴리지를 배포해보도록 한다.

Storm 클러스터의 기본 구조

Storm 클러스터를 기동하기 전에, 클러스터가 어떤 노드들로 구성이 되는지 먼저 알아보도록 하자 Storm 클러스터는 기본적으로 아래와 같은 3가지 구성요소로 구성이 되어 있다.

먼저 주요 노드인 Nimbus Supervior 노드에 대해서 알아보자, Nimbus Supervisor 노드는 각각 하나의 물리 서버로 생각하면 된다.


Nimbus

Nimbus는 마스터 노드로 주요 설정 정보를 가지고 있으며, Nimbus 노드를 통해서 프로그래밍 된 토폴로지를 Supervisor 노드로 배포한다. 일종의 중앙 컨트롤러로 생각하면 된다. Storm에서는 중앙의 하나의 Nimbus 노드만을 유지한다.

Supervisor

Supervisor 노드는 실제 워커 노드로, Nimbus로 부터 프로그램을 배포 받아서 탑재하고, Nimbus로 부터 배정된 작업을 실행하는 역할을 한다. 하나의 클러스터에는 여러개의 Supervisor 노드를 가질 수 있으며, 이를 통해서 여러개의 서버를 통해서 작업을 분산 처리할 수 있다.

Zookeeper

이렇게 여러개의 Supervisor를 관리하기 위해서, Storm Zookeeper를 통해서 각 노드의 상태를 모니터링 하고, 작업의 상태들을 공유한다.

Zookeeper는 아파치 오픈소스 프로젝트의 하나로, 분산 클러스터의 노드들의 상태를 체크하고 공유 정보를 관리하기 위한 분산 코디네이터 솔루션이다.

전체적인 클러스터의 구조를 살펴보면 다음과 같다.



<그림. Storm 클러스터의 구조>

하나의 싱글 머신에 Nimbus가 설치되고, 다른 각각의 머신에 Supervisor가 하나씩 설치된다. 그리고, ZooKeeper 클러스터를 통해서 기동이 된다.

※ 실제 물리 배포 구조에서는 Nimbus Supervisor, ZooKeeper등을 하나의 서버에 분산배포할 수 도 있고, 여러가지 다양한 배포구조를 취할 수 있으나, Supervisor의 경우에는 하나의 서버에 하나의 Supervisor만을 설치하는 것을 권장한다. Supervisor의 역할이 하나의 물리서버에 대한 작업을 관리하는 역할이기 때문에, 한 서버에 여러 Supervisor를 설치하는 것은 적절하지 않다.

 

설치와 기동

Storm 클러스터를 기동하기 위해서는 앞에서 설명한바와 같이 ZooKeeper가 필요하다. Zookeeper를 다운로드 받은 후에, ~/conf/zoo_sample.cfg 파일을 ~/conf/zoo.cfg로 복사한다.

다음으로 ZooKeeper를 실행한다.

% $ZooKeeper_HOME/bin/zkServer.cmd


<그림. 주키퍼 기동 로드>


다음으로 Nimbus 노드를 실행해보자. Storm을 다운 받은 후 압축을 푼다.

다음 $APACHE_STORM/bin 디렉토리에서

%storm nimbus

를 실행하면 nimbus 노드가 실행된다. 실행 결과나 에러는 $APACHE_STORM/logs 디렉토리에 nimbus.log라는 파일로 기록이 된다.

정상적으로 nimbus 노드가 기동이 되었으면 이번에는 supervisor 노드를 기동한다.

%storm supervisor

Supervisor에 대한 노드는 $APACHE_STORM/logs/supervisor.log 라는 이름으로 기록된다.

Storm은 자체적으로 클러서터를 모니터링 할 수 있는 UI 콘솔을 가지고 있다. UI를 기동하기 위해서는

%storm ui

로 실행을 해주면 UI가 기동이 되며 http://localhost:8080 에 접속을 하면 관리 콘솔을 통해서 현재 storm의 작동 상태를 볼 수 있다.



<그림. Storm UI를 이용한 기동 상태 모니터링>


현재 하나의 PC nimbus supervior,UI를 모두 배포하였기 때문에 다음과 같은 물리적인 토폴로지가 된다.



<그림. 싱글 서버에 nimbus supervisor를 같이 설치한 예>

싱글 클러스터 노드에 배포 하기

싱글 노드 클러스터를 구축했으니, 앞의 1장에서 만든 HelloStorm 토폴로지를 이 클러스터에 배포해보도록 하자.

전장의 예제에서 만든 토폴로지는 Local Cluster를 생성해서, 자체적으로 개발 테스트용 클러스터에 토폴로지를 배포하도록 하는 코드였다면, 이번에는 앞에서 생성한 Storm 클러스터에 배포할 수 있는 토폴로지를 다시 만들어야 한다. 이 코드의 차이는 기존 코드와는 다르게 LocalCluster를 생성하지 않고, 기동중인 클러스터에 HelloTopoloy Submit하도록 한다.

package com.terry.storm.hellostorm;

 

import backtype.storm.Config;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.topology.TopologyBuilder;

 

public class HelloTopology {

        public static void main(String args[]){

               TopologyBuilder builder = new TopologyBuilder();

               builder.setSpout("HelloSpout", new HelloSpout(),2);

               builder.setBolt("HelloBolt", new HelloBolt(),4).shuffleGrouping("HelloSpout");

              

               Config conf = new Config();

               // Submit topology to cluster

               try{

                       StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

               }catch(AlreadyAliveException ae){

                       System.out.println(ae);

               }catch(InvalidTopologyException ie){

                       System.out.println(ie);

               }

              

        }

 

}

<그림. HelloTopology.java>


토폴로지 클래스를 만들었으면 이를 빌드해보자

%mvn clean install

을 실행하면 ~/target 디렉토리에 토폴로지 jar가 생성된것을 확인할 수 있다.



jar 파일을 배포해보도록 하자.

배포는 storm {jar} {jar파일명} {토폴로지 클래스명} {토폴로지 이름} 명령을 실행하면 된다.

% storm jar hellostorm-0.0.1-SNAPSHOT.jar com.terry.storm.hellostorm.HelloTopology HelloTopology

배포 명령을 내리면 $APACHE_STORM_HOME/logs/nimbus.log에 다음과 같이 HelloTopology가 배포되는 것을 확인할 수 있다.


2015-01-25T07:35:03.352+0900 b.s.d.nimbus [INFO] Uploading file from client to storm-local\nimbus\inbox/stormjar-8c25c678-23f5-436c-b64e-b354da9a3746.jar

2015-01-25T07:35:03.365+0900 b.s.d.nimbus [INFO] Finished uploading file from client: storm-local\nimbus\inbox/stormjar-8c25c678-23f5-436c-b64e-b354da9a3746.jar

2015-01-25T07:35:03.443+0900 b.s.d.nimbus [INFO] Received topology submission for HelloTopology with conf {"topology.max.task.parallelism" nil, "topology.acker.executors" nil, "topology.kryo.register" nil, "topology.kryo.decorators" (), "topology.name" "HelloTopology", "storm.id" "HelloTopology-1-1422138903"}

2015-01-25T07:35:03.507+0900 b.s.d.nimbus [INFO] Activating HelloTopology: HelloTopology-1-1422138903

2015-01-25T07:35:03.606+0900 b.s.s.EvenScheduler [INFO] Available slots: (["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6702] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6701] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6700])

2015-01-25T07:35:03.652+0900 b.s.d.nimbus [INFO] Setting new assignment for topology id HelloTopology-1-1422138903: #backtype.storm.daemon.common.Assignment{:master-code-dir "storm-local\\nimbus\\stormdist\\HelloTopology-1-1422138903", :node->host {"226ceb74-c1a3-4b1a-aab5-2384e68124c5" "terry-PC"}, :executor->node+port {[3 3] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [6 6] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [5 5] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [4 4] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [7 7] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [2 2] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [1 1] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703]}, :executor->start-time-secs {[7 7] 1422138903, [6 6] 1422138903, [5 5] 1422138903, [4 4] 1422138903, [3 3] 1422138903, [2 2] 1422138903, [1 1] 1422138903}}

2015-01-25T07:37:48.901+0900 b.s.d.nimbus [INFO] Updated HelloTopology-1-1422138903 with status {:type :inactive}

해당 토폴로지가 배포되었는지를 확인하려면 storm list라는 명령어를 사용하면 현재 기동되고 있는 토폴로지 목록을 확인할 수 있다.




<그림. storm list 명령으로 기동중인 토폴로지를 확인>


실행 결과는 $APACHE_STORM_HOME/logs 디렉토리를 보면 worker-xxx.logs 라는 파일이 생긴것을 확인해 볼 수 있는데, 파일 내용을 보면 다음과 같다.

2015-01-25T07:35:08.908+0900 STDIO [INFO] Tuple value ishello world

2015-01-25T07:35:08.908+0900 STDIO [INFO] Tuple value ishello world

우리가 앞서 구현한 Bolt에서 System.out으로 출력한 내용이 출력된다.

동작을 확인하였으면, 이제 기동중인 토폴로지를 정지 시켜 보자. 토폴로지의 정지는 storm deactivate {토폴로지명} 을 사용하면 된다. 아까 배포한 토폴로지 명이 HelloTopology였기 때문에 다음과 같이 토폴로지를 정지 시킨다.

%storm deactivate HelloTopology

그 후에 다시 storm list 명령을 이용해서 토폴로지 동작 상태를 확인해보면 다음과 같다.



< 그림. Storm  토폴로지 정지와 확인 >


만약에 Topology를 재 배포 하려면 storm kill로 해당 토폴로지를 삭제한 후에, 다시 배포 해야 한다. 이때 주의할점은 storm kill로 삭제해도 바로 삭제가 안되고 시간 텀이 있으니 약간 시간을 두고 재 배포를 해야 한다.


지금까지 간단하게, 운영용 클러스터를 구성하고, 운영 클러스터에 토폴로지를 배포 하는 것에 대해서 알아보았다.

HelloStorm 코드 구현, 클러스터 노드 구축 및 배포를 통해서 간단하게 스톰이 무엇을 하는 것인지는 파악했을 것으로 안다. 다음 장에는 조금 더 구체적으로 Storm의 개념과 스톰의 아키텍쳐등에 대해서 살펴보도록 하겠다.


 


대충보는 Storm #2-Storm 설치와 HelloStorm 작성하기

조대협(http://bcho.tistory.com)


Apache Storm Spark

앞서 데이타 스트리밍 처리에 대해서 설명했다. 스트리밍 처리에 대표적인 오픈소스 프레임웍으로는 Apache Storm Apache Spark이 있는데ㅔ, Spark은 최근에 나온 것으로 스트리밍 처리뿐 만 아니라 조금 더 보편적인 분산 컴퓨팅을 지원하는데, Storm의 경우 나온지도 오래되었고 무엇보다 안정성 부분에서 아직까지는 Spark보다 우위에 있기 때문에, Storm을 중심으로 설명하고자 한다

HelloStorm

Storm의 내부 구조 개념등을 설명하기에 앞서, 일단 깔아서 코드부터 돌려보고 개념을 잡아보자


HelloStorm 구조

HelloWorld 처럼 간단한 HelloStorm을 만들어보자. 만들어보려고 하는 Storm 프로그램은 다음과 같다.



<그림. HelloStorm 개념 구조>


HelloSpout 이라는 클래스는, Storm에 데이타를 읽어오는 클래스로 이 예제에서는 자체적으로 데이타를 생성해낸다. Storm으로 들어오는 데이타는 Tuple이라는 형식을 따르는데, Key/Value 형식의 데이타 형을 따른다. 여기서는 키(필드명)“say”, 데이타는 “Hello” 라는 문자열을 가지고 있는 데이타 tuple을 생성한다.

HelloSpout에서 생성된 데이타는 HelloBolt라는 곳으로 전달이 되는데, HelloBolt 클래스는 데이타를 받아서 처리하는 부분으로 간단하게 들어온 데이타에서 “say” 라는 필드의 데이타 값을 System.out으로 출력해주는 역할만을 한다.


개발하기

이클립스를 사용하여, maven project를 생성한다.



다음으로 pom.xml을 작성한다.


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

 

  <groupId>com.terry.storm</groupId>

  <artifactId>hellostorm</artifactId>

  <version>0.0.1-SNAPSHOT</version>

  <packaging>jar</packaging>

 

  <name>hellostorm</name>

  <url>http://maven.apache.org</url>

 

  <properties>

    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

  </properties>

 

<dependencies>

  <dependency>

    <groupId>junit</groupId>

    <artifactId>junit</artifactId>

    <version>3.8.1</version>

    <scope>test</scope>

  </dependency>

  <dependency>

        <groupId>org.apache.storm</groupId>

        <artifactId>storm-core</artifactId>

        <version>0.9.3</version>

  </dependency>

 

</dependencies>

 

<build>

  <plugins>

    <plugin>

      <artifactId>maven-assembly-plugin</artifactId>

      <version>2.2.1</version>

      <configuration>

        <descriptorRefs>

          <descriptorRef>jar-with-dependencies</descriptorRef>

        </descriptorRefs>

        <archive>

          <manifest>

            <mainClass />

          </manifest>

        </archive>

      </configuration>

      <executions>

        <execution>

          <id>make-assembly</id>

          <phase>package</phase>

          <goals>

            <goal>single</goal>

          </goals>

        </execution>

      </executions>

    </plugin>

  </plugins>

</build>

 

</project>

<그림. pom.xml>


이 예제에서는 storm 0.9.3을 사용했기 때문에 위와 같이 storm-core 0.9.3 dependency 부분에 정의하였다.

다음으로 데이타를 생성하는 HelloSpout을 구현하자


package com.terry.storm.hellostorm;

 

import java.util.Map;

 

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichSpout;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

 

public class HelloSpout extends BaseRichSpout {

          private static final long serialVersionUID = 1L;

          private SpoutOutputCollector collector;

         

          public void open(Map conf,TopologyContext context,SpoutOutputCollector collector){

               this.collector = collector; 

          }

         

          public void nextTuple(){

                 this.collector.emit(new Values("hello world"));

          }

         

          public void declareOutputFields(OutputFieldsDeclarer declarer){

                 declarer.declare(new Fields("say"));

          }

         

}

<그림. HelloSpout.java>


HelloSpout 실행이 되면, 필드 “say” 값이 “hello world” 데이타를 생성해서 다음 워크 플로우로 보낸다.

nextTuple() 이라는 함수에서 외부에서 데이타를 받아들여서 다음 워크 플로우로 보내는 일을 하는데, 여기서는 외부에서 데이타를 받아들이지 않고 자체적으로 데이타를 생성하도록 한다. 데이타를 뒤에 워크플로우에 보내는 함수는 emit인데, emmit부분에 “hello world”라는 value 넣어서 보내도록 하였다. 그렇다면 필드의 값은 어떻게 정의 하느냐? 필드값은 declareOutputField라는 함수에 정의하는데, 데이타의 필드는 “say” 정의하였다.


다음으로 이 HelloSpout에서 생성할 데이타를 처리한 HelloBolt를 구현해보자


package com.terry.storm.hellostorm;

 

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Tuple;

 

public class HelloBolt extends BaseBasicBolt{

 

        public void execute(Tuple tuple, BasicOutputCollector collector) {

               // TODO Auto-generated method stub

               String value = tuple.getStringByField("say");

               System.out.println("Tuple value is"+value);

        }

 

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

               // TODO Auto-generated method stub

              

        }

 

}

<그림. HelloBolt.java>


HelloSpout에서 생성된 데이타는 HelloBolt 들어오는데, 데이타가 들어오면 execute라는 메서드가 자동으로 수행된다. 이때, Tuple 통해서 데이타가 전달된다. 여기서는 tuple에서 필드이름이 “say” 값을 tuple.getStringByField(“say”) 이용해서 꺼내서 System.out으로 출력했다.

눈치가 빠른 사람이라면 벌써 알아차렸겠지만, 데이타를 다음 플로우로 보내고자 할때는 앞의 HelloSpout에서 한것처럼, execute 메서드내에서 데이타 처리가 끝난후에, collector.emit 이용해서 다음 플로우로 보내고, delcareOutputField에서 데이타에 대한 필드를 정의하면 된다.

데이타를 생성하는 Spout 데이타를 처리 하는 Bolt 구현했으면 둘을 연결 시켜줘야 한다. 이를 연결시켜주는 것이 Topology인데, HelloTopologyLocal 클래스를 구현해 보자


package com.terry.storm.hellostorm;

 

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.utils.Utils;

 

public class HelloTopologyLocal {

        public static void main(String args[]){

               TopologyBuilder builder = new TopologyBuilder();

               builder.setSpout("HelloSpout", new HelloSpout(),2);

               builder.setBolt("HelloBolt", new HelloBolt(),4).shuffleGrouping("HelloSpout");

              

               Config conf = new Config();

               conf.setDebug(true);

               LocalCluster cluster = new LocalCluster();

              

               cluster.submitTopology("HelloTopologyLocal", conf,builder.createTopology());

               Utils.sleep(10000);

               // kill the LearningStormTopology

               cluster.killTopology("HelloTopologyLocal");

               // shutdown the storm test cluster

               cluster.shutdown();          

        }

 

}

<그림. HelloTolologyLocal.java>


나중에 개념에서 자세하 설명하겠지만, Topology 데이타를 생성하는 Spout 처리하는 Bolt간에 토폴로지 데이타 흐름을 정의하는 부분이다. Spout Bolt들을 묶어 주는 부분이다.

먼저 TopologyBuilder 이용해서 Topology 생성하고, setSpout 이용해서 앞에서 구현한 HelloSpout 연결한다.

다음으로, setBolt 이용해서 Bolt Topology 연결한다. 후에, HelloSpout HelloBolt 연결해야 하는데, setBolt시에, SuffleGrouping 메서드를 이용하여, HelloBolt HelloSpout으로 부터 생성되는 데이타를 읽어들임을 명시한다.

builder.setBolt("HelloBolt", new HelloBolt(),4).shuffleGrouping("HelloSpout");

이렇게 Topology 구성되었으면이 Topology 실제로 실행해야 하는데, Topology 어떤 서버에서 어떤 포트등을 이용해서 실행될지는 Config 정의할 있지만, 여기서는 간단한 테스트이기  때문에 별도의 복잡한 Config 정보는 기술하지 않았다.

다음으로 이렇게 만들어진 Topology Storm 클러스터에 배포해야 하는데, Storm 개발의 편의를 위해서 두가지 형태의 클러스터를 제공한다. 개발용 클러스터와 실운영 환경용 클러스터를 제공하는데, 여기서는 LocalCluster cluster = new LocalCluster();

라는  것을 사용하였다.

LocalCluster 개발환경용 클러스터로, 개발자의 환경에서 최소한의 서버들만을 기동하여 개발한 토폴로지를 테스트할 있게 해준다. 이렇게 Cluster 생성했으면 cluster.submitTopology 이용하여 개발한 토폴로지를 배포한다. 토폴로지가 배포되면 자동으로 토폴로지가 실행이 된다. HelloSpout 계속해서 데이타를 생성하고, HelloBolt 생성된 데이타를 받아서 System.out.println으로 출력하게 되는데, 10초후에 멈추게 하기 위해서, Sleep 10초를 준다. 토폴로지 코드를 실행하는 쓰레드는 Sleep으로 빠질지 모르지만 토폴로지에서 생성된 HelloSpout HelloBolt 쓰레드는 백그라운드에서 작업을 계속 진행한다.

10초후에는 killTopology 이용해서 해당 토폴로지를 제거하고 shutdown 이용해서 Storm 클러스터를 종료시킨다.

실행하기

여기까지 구현했으면 첫번째 Storm 프로그램을 기동해보자. 다음과 같이 maven 명령어를 이용하면 실행이 가능하다.

C:\dev\ws\java_workspace\com.terry.storm>mvn exec:java -Dexec.mainClass=com.terry.storm.hellostorm.HelloTopologyLocal -Dexec.classpath.Scope=compile

실행을 해보면, HelloSpout 데이타를 생성하고, HelloBolt 이를 받아서 화면에 출력하는 것을 있다.


6292 [Thread-16-HelloSpout] INFO  backtype.storm.daemon.task - Emitting: HelloSpout default [hello world]

6292 [Thread-22-HelloBolt] INFO  backtype.storm.daemon.executor - Processing received message source: HelloSpout:5, stream: default, id: {}, [hello world]

Tuple value ishello world

6292 [Thread-10-HelloBolt] INFO  backtype.storm.daemon.executor - Processing received message source: HelloSpout:6, stream: default, id: {}, [hello world]

Tuple value ishello world

ZooKeeper 에러 대응하기

종종 환경에 따라서 실행이 안되면서 다음과 같은 에러가 출력되는 경우가 있는데


3629 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@7bfd25ce

3649 [main-SendThread(0:0:0:0:0:0:0:1:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (java.lang.SecurityException: 로그인 구성을 찾을 없습니다.)

3650 [main-SendThread(0:0:0:0:0:0:0:1:2000)] ERROR org.apache.storm.zookeeper.ClientCnxnSocketNIO - Unable to open socket to 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2000

3655 [main-SendThread(0:0:0:0:0:0:0:1:2000)] WARN  org.apache.storm.zookeeper.ClientCnxn - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect

java.net.SocketException: Address family not supported by protocol family: connect

        at sun.nio.ch.Net.connect(Native Method) ~[na:1.6.0_37]

        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:532) ~[na:1.6.0_37]

        at org.apache.storm.zookeeper.ClientCnxnSocketNIO.registerAndConnect(ClientCnxnSocketNIO.java:277) ~[storm-core-0.9.3.jar:0.9.3]

        at org.apache.storm.zookeeper.ClientCnxnSocketNIO.connect(ClientCnxnSocketNIO.java:287) ~[storm-core-0.9.3.jar:0.9.3]

        at org.apache.storm.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:967) ~[storm-core-0.9.3.jar:0.9.3]

        at org.apache.storm.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1003) ~[storm-core-0.9.3.jar:0.9.3]


이 에러는 Storm Zookeeper와 연결을 할 수 없어서 나는 에러인데, LocalCluster 모드로 기동할 경우,Storm embedded Zookeeper를 기동해서 이 Zookeeper와 연결되어야 하나. IPV6로 연결을 시도하기 때문에, (ZK IPV4 Listen하는데) 발생하는 문제로

java로 실행할때 "-Djava.net.preferIPv4Stack=true" 옵션을 주면, JVM IPV6를 사용하지 않고, V4를 사용하기 때문에, ZooKeeper IPV4로 뜨고, Storm IPV4로 연결을 시도하기 때문에 문제가 없어진다.

지금까지 간단하게나마 첫번째 Storm 프로그램을 작성해서 실행해보았다.

다음에는 Storm 이루는 컴포넌트 구조와 아키텍쳐에 대해서 설명하도록 한다

 

Storm을 이용한 실시간 데이타 처리 #1-데이타 스트림 개념 이해하기

조대협(http://bcho.tistory.com)

 


Apache Storm 은 실시간 데이타 처리를 위한 시스템이다.. 나온지는 꽤 오래됬지만 요즘 빅데이타와 실시간 분석과 함께 다시 화두가 되고 있는데, Apache Storm에 대해서 알아보도록 하자.


 

스트리밍 처리에 대한 개념

Storm을 이해하기 위해서는 먼저 데이타 스트리밍9(Data Streaming) 에 대한 개념을 이해 해야 한다. 비유를 하자면 데이타가 흘러가는 강(river)와 같은 의미라고나 할까?

예를 들어보면, 트위터에서 생성되는 데이타 피드들은 일종의 데이타 스트림이다, 시간이 지나감에 따라서 끊임 없이 데이타 들이 생성된다.



<그림. 트위터의 트윗 데이타 스트림>

또는 공장의 팬베이어 벨트의 센서를 통해서 생성되는 각종 계측 값들도 데이타 스트림으로 볼 수 있다. 신용카드 트랜젝션, 전자 상거래 사이트의 구매 트렌젝션등. 이러한 데이타는 시간을 축으로 하여, 계속해서 생성되는 데이타 이다.

그렇다면 이런 스트림 데이타로 무엇을 할 수 있는가? 다시 트위터를 예를 들어보자


트위터 스트림을 이용한 소셜 마케팅 반응 분석

트위터 데이타 피드 스트림을 모니러링 하면서, 스마트폰과 관련된 키워드가 나오면 A社와 B社에 대한 스트림만을 수집하여, 각사별로 피드의 내용이 긍정적인지 부정적인지를 판단하는 시스템이 있다고 하자

이를 데이타 스트림 분석 개념으로 표현하면 다음과 같다.



<그림. 트위터 데이타 스트림 분석을 통한 제품 반응 분석>

먼저 스마트폰 관련 데이타 스트림을 모두 수집한다.

그 중에서, A사가 언급된 피드와, B사가 언급된 피드를 분리한다.

각 피드에 대해서, 문자의 구문이 긍정인지 부정인지를 판단한다. 예를 들어, Good,Awesome 등의 긍정적인 단어가 들어가 있으면 긍정적인 것으로 판단하고, Bad,fuck 등 부정적인 단어가 들어가 있으면 부정적인 반응으로 판단한다.

그후에, 각각의 반응에 대해서 카운트를 한다음에, 시계열(Time series) 그래프로 대쉬 보드에 표현한다.

정리해보면, 계속해서 들어오는 데이타들의 흐름을 여러 방향으로 분류하고 처리해서 계속해서 결과를 업데이트해주는 일종의 work flow와 같은 개념을 가지는 것이 데이타 스트림 처리이다.

이러한 데이타 스트림 처리는 일종의 중첩 함수의 개념으로도 볼수 있는데,

A사 제품에 대한 긍정적인 반응 = [“긍정적인 단어 필터 함수”(
[“A
사 제품에 대한 필터 함수”](“트위터의 스마트폰 관련 피드”)
);

와 같은 형태로 나타내어 줄 수 있다. (결국 실시간 데이타 스트림 처리는 실시간으로 들어오는 데이타 에 대해서 중첩으로 여러개의 함수 처리를 순차적으로 하는 것이다.)

물론 이러한 작업들은 데이타를 모두 모아서 수집해놓고 데이타 베이스에 저장해놓고 주기적으로 배치르 통해서 분석하는 것도 가능하다. 그러나 선거나 마케팅과 같이 실시간 대응이 중요한 경우에는 실시간 데이타 분석이 중요하기 때문에, 데이타 스트림을 실시간으로 분석하는 것이 필요하다.


이벤트 감지를 통한 신용 카드 이상 거래 감지

이번에는 데이타 스트림 처리에 이벤트 처리라는 개념을 추가해보자.

신용 카드 결재시, 결재 내용을 저장하는 것 뿐만 아니라, 사용자의 결재 내역이 문제가 없는지를 검출하는 시나리오이다.

이상거래 검출 요건은, 사용자가 평상시 결재 액보다 많은 금액이 결재되거나, 사용자가 결재할때, 물리적으로 이상한 곳에서 결재가 이루어 졌을때, 여를 들어 서울에서 카드 결재 후에, 10분 후에 부산에서 같은 카드로 결재가 이루어진 경우는 이상 거래로 검출하는 시스템이다.

다음 플로우를 보자



<그림. 실시간 신용 카드 이상 거래 검출>

신용 카드 결재 정보 데이타 스트림이 들어오면 결재 내용을 저장하는 스트림과 이상 거래를 검출하는 스트림 두가지로 분리되서 스트림이 처리된다.

이상 거래 검출 스트림에서는, 사용자의 구매 패턴을 분석하고 계속해서 학습한 후에, 이를 통해서 사용자 구매 패턴 검증단계에서 구매 금액이 일상적인 구매 패턴인지를 판단한다.

판단의 기준은 머신 러닝을 이용하여, 해당 사용자의 구매 패턴을 알고리즘화 해놓을 수 있다.

다음으로 결재 위치를 저장한 다음에 사용자 결재 위치 검증단계에서 지난 결재 가맹점 정보를 기반으로 이번 결재의 가맹점의 위치가 시간상으로 이동이 가능한 곳인지를 판단한다.

서울 강남에서 결재 한 후에, 30분후 서울 잠실에서 결재한 것은 정상 거래로 보지만 서울에서 결재했다가 10분후에 부산에서 결재 하는 것등은 이상거래로 취급한다.

이러한 시나리오는 결재가 이루지는 동시에 같이 실행되어야 하기 때문에 데이타 베이스 기반의 배치 처리로는 불가능하면 실시간 데이타 스트림을 분석해야 한다.

이외에도 시스템의 로그 스트림을 이용한 분석이라던지, 이벤트 처리, 데이타 가공, 머신러닝등 아주 다양한 부분의 실시간 처리에 데이타 스트리밍 처리는 사용될 수 있다.


데이타 스트림 처리를 이루는 기술들

앞의 신용카드 이상 거래 검출 시나리오를 보면 데이타 스트림을 분석하는데, 몇가지 추가적인 기술일 사용되었음을 볼 수 있다.


스트리밍 처리

먼저 데이타 스트림을 처리하기 위한 스트리밍 시스템이 필요하다, 데이타 스트림을 여러가지 경로로 분산하여, 각각의 단계별로 처리(사용자 구매 패턴 분석, 구매 패턴 검증,…)하는 워크 플로우 기능이 필요하다.

이러한 프레임웍은 앞으로 살펴보고자 하는 Apache Storm이 대표적이고 근래에는 Apache Spark도 많이 사용되고 있다.


대용량 분산 큐

다음으로 대용량으로 여러 경로를 통해서 들어오는 데이타를 수집하기 위한 터널(수집용 깔때기)이 필요한데, 비동기 처리를 위한 큐가 적절하다 그중에서도, 많은 데이타를 동시에 처리하기 위한 대용량 지원성이 필수적인데, 이를 위해서는 Kafka와 같은 대용량 분산 큐 솔루션이 적절하다.


머신러닝

사용자의 거래 패턴을 분석하여, 이상거래인지 검출을 하려면, 컴퓨터에서 사용자의 거래 패턴을 알려줄 필요가 있는데 이는 기존의 사용자 거래 내역을 학습하여 하여 패턴을 분석해내는 머신러닝 기술이 필요하며, 이는 Apache Mahout, Microsoft Azure ML (Machine Learning), Spark ML등이 있다.


이벤트 처리

마지막으로 이벤트 처리 부분이 있는데

카드 결재 장소가 지난 장소에 비해서 시간*20km 이상이면 이상거래로 판단해라”. 라는 이벤트를 정의할 수 있다. (※ 한시간에 20km를 이동할 수 있다고 가정)

특정 조건에 대해서 이벤트를 발생 시키는 것을 이벤트 처리라고 이런 처리를 CEP (Complex Event Processing) 라고 부르고, 이를 구현한 아키텍쳐를 EDA (Event Driven Architecture)라고 한다.

이러한 이벤트 처리 프레임웍으로는 JBoss Drool이나, Esper와 같은 프레임웍이 있다.


이외에도 데이타 스트림 분석을 위해서는 시나리오에 따라 더 많은 기술이 사용된다. 추가 사용되는 기술은 기회가 되면 향후에 추가로 소개하도록 한다. (아직도 공부 中)

지금까지 간략하게 나마 데이타 스트림의 개념과 이를 처리 하는 방법에 대해서 알아보았다.

그러면 다음에는 이런 데이타 스트림을 처리하기 위한 대표적인 프레임웍중의 하나인 apache storm에 대해서 소개해도록 한다.