블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 

데이타 플로우 프로그래밍 모델의 이해


조대협 (http://bcho.tistory.com)


앞의 글에서 스트리밍 프로세스의 개념과, 데이타 플로우의 스트리밍 처리 개념에 대해서 알아보았다. 그렇다면 실제로 이를 데이타 플로우를 이용해서 구현을 하기 위해서는 어떤 컴포넌트와 프로그래밍 모델을 사용하는지에 대해서 알아보자.


구글 데이타 플로우 프로그래밍 모델은 앞에서 설명한 바와 같이, 전체 데이타 파이프라인을 정의하는 Pipeline, 데이타를 저장하는 PCollections, 데이타를 외부 저장소에서 부터 읽거나 쓰는 Pipeline I/O, 그리고, 입력 데이타를 가공해서 출력해주는 Transforms , 총 4가지 컴포넌트로 구성이 되어 있다.


이번 글에서는 그 중에서 데이타를 가공하는  Transforms 컴포넌트들에 대해서 알아본다.

Transforms

Transforms는 데이타를 어떻게 가공하느냐에 따라서 다음 그림과 같이 세가지 형태로 분류된다.

Element-Wise

개별 데이타 엘리먼트를 단위로 연산을 수행하는 것을 Element-Wise Transform이라고 한다.

ParDo 함수가 이에 해당하며, 하나의 데이타를 입력 받아서, 연산을 한 후, 하나의 출력을 내보내는 연산이다.

ParDo 함수는 DoFn으로 정의된 연산 로직을 병렬로 여러개의 프로세스와 쓰레드에서 나눠서 처리를 해준다.


DoFn 클래스 는 다음과 같은 포맷으로 정의한다.


static class MyFunction extends DoFn<{입력 데이타 타입},{출력 데이타 타입}>{

     @Override

     public void processElement(ProcessContext c) {

       // do Something

     }

}


DoFn 클래스를 상속 받아서 클래스를 정의하고, 클래스 정의시 제네릭 타입 (Generic type)으로, 첫번째는 입력 데이타 타입을 두번째는 출력 데이타 타입을 정의한다.

그리고 processElement 함수를 오버라이드(Override)해서, 이 함수안에 연산 로직을 구현한다.

processElement는 ProcessContext를 인자로 받는데, 이 Context에는 입력 데이타를 포함하고 있다.

입력 데이타를 꺼내려면 c.element()라는 메서드를 이용하면, 입력 데이타 타입으로 정의된 입력 데이타를 꺼낼 수 있다.

데이타를 처리한 다음에 파이프라인상의 다음 컴포넌트로 데이타를 보내려면 c.output ({출력 데이타} ) 형태로 정의를 해주면 된다.


이렇게 정의된 DoFn함수는  ParDo를 이용하여 파이프라인 상에서 병렬로 실행될 수 있다. ParDo는, 파이프라인상에서 .apply 메서드를 이용하여 적용한다.


그러면 실제로 어떻게 적용이 되는지 다음 예제를 보자.

   p.apply(ParDo.named("toUpper").of(new DoFn<String, String>() {

     @Override

     public void processElement(ProcessContext c) {

       c.output(c.element().toUpperCase());

     }

   }))


String 인자를 입력으로 받아서, String 인자로 출력을 하는 DoFn 함수를 정의하였다.

processElement 부분에서, c.element()로 String 입력 값을 읽은 후, toUpperCase() 함수를 적용하여 대문자로 변환을 한 후, c.output을 이용하여 다음 파이프라인으로 출력 데이타를 넘겼다.


조금 더 디테일한 예제를 보자

p.apply(Create.of("key1,Hello", "key2,World","key1,hello","key3,boy","key4,hello","key2,girl"))

.apply(ParDo.named("Parse").of(new DoFn<String, KV<String,String>>() {

@Override

public void processElement(ProcessContext c) {

StringTokenizer st = new StringTokenizer(c.element(),",");

String key = st.nextToken();

String value = st.nextToken();


KV<String,String> outputValue =  KV.of(key,value);

c.output(outputValue);

}

}))

Create.of를 이용하여 “Key.Value” 문자열 형태로 데이타를 생성한 후, 이 문자열을 읽어서, DoFn<String,KV<String,String>> 클래스에서 이를 파싱하여, 문자열을 키밸류 데이타형인 KV 데이타 형으로 변환하여 리턴하는 예제이다. 아래 개념도를 보자


입력 값은 String 데이타 타입으로 “Key.Value”라는 형태의 문자열을 받는다.

DoFn에서 처리한 출력 값은 KV 형으로 KV 데이타 타입은 키와 값을 가지고 있는데, 키와 값의 타입도 제네릭 타입으로 키는 String, 값은 String 타입으로 정의하였다. 입력된 “Key.Value” 문자열은 “.” 전후로 분리가 되서, “.” 좌측은 키로, 우측은 값으로 해서, KV에 각각 들어간다.

processElement를 보면, c.element를 이용하여 String 문자열을 꺼낸 후, StringTokenizer를 이용하여 “.”을 분류 문자로 정의해서 파싱한다. 첫번째를 키로 저장하고, 두번째를 값으로 저장한다.

KV<String,String> outputValue =  KV.of(key,value)

를 이용하여, KV 객체를 생성한 후, c.output(outputValue); 을 이용하여 다음 파이프라인 컴포넌트로 값을 전달한다.


시스템내에서 수행되는 방법은 다음과 같이 된다. ParDo에 의해서 DoFn 클래스가 여러개의 워커에 의해서 분산 처리가된다.

어그리게이션(Aggregation)

어그리게이션 값을 특정 키 값을 이용하여 데이타를 그룹핑을 하는 개념이다.

이러한 어그리게이션 작업은 내부적으로 셔플링(Shuffling)이라는 개념을 이용해서 이루어지는데, 키별로 데이타를 모으거나 키별로 합산등의 계산을 하기 위해서는, 키별로 데이타를 모아서 특정 워커 프로세스로 보내야 한다.

ParDo를 이용하여 병렬 처리를 할 경우, 데이타가 키값에 상관 없이 여러 워커에 걸쳐서 분산되서 처리되기 때문에, 이를 재 정렬해야 하는데, 이 재 정렬 작업을 셔플링이라고 한다.


아래 그림을 보자, 파이프라인 상에서 첫번째 프로세스를 Worker 1과 Worker 2가 처리를 하였고, 결과는 Key1과  Key2를 키로 가지는 데이타라고 하자, 이를 어그리게이션 하면 아래 그림과 같이 Key1 데이타는 Worker 3로 모이고, Key 2 데이타는 Worker 4로 모인다. 이런 방식으로 셔플링이 이루어진다.


데이타 플로우의 어그리게이션 에는 특정 키별로 데이타를 묶어주는 Grouping과, 특정 키별로 데이타를 연산(합이나 평균을 내는)하는  Combine 두 가지가 있다.

Grouping

PCollection<KV<String, Integer>> wordsAndLines = ...;

에서 다음과 같이 String,Integer 페어로 KV 타입을 리턴한다고 하자.


 cat, 1
 dog, 5
 and, 1
 jump, 3
 tree, 2
 cat, 5
 dog, 2
 and, 2
 cat, 9
 and, 6
 ...


이를 다음과 같이 키에 따라서 밸류들을 그룹핑 하려면 GroupByKey를 사용해야 한다.


 cat, [1,5,9]
 dog, [5,2]
 and, [1,2,6]
 jump, [3]
 tree, [2]


PCollection<KV<String, Iterable<Integer>>> groupedWords = wordsAndLines.apply(

   GroupByKey.<String, Integer>create());


코드는 앞 단계에서 KV<String,Integer>로 들어온 wordLines 데이타를 GroupByKey를 이용하여 Key 단위로 그룹핑을 한후 이를 <String, Iterable<Integer>> 타입으로 리턴하는 Transformation이다.

<String, Iterable<Integer>>에서 앞의 String은 키가 되며, Iterable 다수의 값을 가지고 있는 밸류가 된다. 이 밸류는 Integer 타입으로 정의된다.

Combine

Grouping이 키 단위로 데이타를 묶어서 분류해주는 기능이라면, Combine은 키단위로 데이타를 묶어서 연산을 해주는 기능이다.

예를 들어 앞의 예제처럼, “cat”이라는 문자열 키로 된 데이타들이 [1,5,9] 가 있을때, 이에 대한 총합이나 평균등을 내는 것이 Combine 이다.


PCollection<Integer> pc = ...;

 PCollection<Integer> sum = pc.apply(

   Combine.globally(new Sum.SumIntegerFn()));


코드는 Integer로 들어오는 모든 값을 Combine에서 Sum 기능을 이용하여 모든 값을 더하는 코드이다.

전체 데이타에 대해서 적용하기 때문에, Combine.globally로 적용하였다.


아래와 같은 형태의 데이타가 있다고 가정하자. 키에 따라서 값이 그룹핑이 된 형태이다.

 cat, [1,5,9]
 dog, [5,2]
 and, [1,2,6]
 jump, [3]
 tree, [2]


PCollection<KV<String, Integer>> occurrences = ...;

 PCollection<KV<String, Iterable<Integer>>> grouped = pc.apply(GroupByKey.create());

 PCollection<KV<String, Integer>> firstOccurrences = pc.apply(

   Combine.groupedValues(new Min.MinIntegerFn()));


위의 데이타들이 PCollection<KV<String, Iterable<Integer>>> grouped

에 저장되었다고 할때, 각 키별로 최소값을 구하는 것을 Combine.groupedValue에서 Min을 호출하여 최소값을 구했다.


Transforms 컴포넌트의 기본적인 종류들을 알아보았다. 이외에도, 하나의 Transform 안에 여러개의 Transform을 집어 넣는 Composite Transform이나, 두개 이상의 데이타 스트림에서 데이타를 키에 따라 JOIN하는 기능들이 있는데, 이러한 고급 기능들은 뒤에 고급 프로그래밍 모델에서 설명하기로 한다.

PCollection

PCollection은 데이타 플로우 파이프라인 내에서 데이타를 저장하는 개념이다.

데이타는 이 PCollection 객체에 저장이되서, 파이프라인을 통해서 Transform으로 넘겨진다.

PCollection은 한번 생성이 되면, 그 데이타를 수정이 불가능하다. (데이타를 변경하거나 수정하기 위해서는 PCollection을 새로 생성해야 한다.)

Bounded & Unbounded PCollection

PCollection은 데이타의 종류에 따라 Bounded PCollection과 Unbounded PCollection 두가지로 나뉘어 진다.

Bounded PCollection

Bounded PCollection은 배치 처리 처럼, 데이타가 변화하지 않는 데이타로 파일이나, 업데이트가 더 이상 발생하지 않는 테이블등을 생각하면 된다.

TextIO,BigQueryIO,DataStoreIO등을 이용해서 데이타를 읽은 경우에는 Bounded PCollection으로 처리가 된다.

Bounded PCollection 데이타들은 배치 처리의 특성상 데이타를 한꺼번에 읽어서 처리한다.  

Unbounded PCollection

Unbounded PCollection은, 데이타가 계속 증가 하는 즉 흐르는 데이타를 표현한다. 트위터 타임 라인이나, 스마트 폰에서 서버로 올라오는 이벤트 로그등을 예로 들 수 있다.

이러한 Unbounded PCollection은 시간의 개념을 가지고 윈도우 처리등을 해야하기 때문에, PCollection 객체내에 타임 스탬프가 자동으로 붙는다.

UnBounded PCollection은 데이타를 BigQueryIO또는 Pub/Sub에서 읽을 때 생성된다.

특히 Unbounded PCollection에 Grouping이나, Combine등을 사용할 경우, 데이타가 파이프라인 상에서 언제 그룹핑된 데이타를 다음 Transform 컴포넌트로 넘겨야할지를 정의해야 하기 때문에, Window를 반드시 사용해야 한다.

데이타 타입

PCollection을 이용해서 정의할 수 있는 주요 데이타 타입은 다음과 같다.

일반 데이타 타입

PCollection<Integer> data

가장 기초적인 데이타 형으로, Integer,Float,String 등 자바의 일반 데이타 타입으로 정의되고 하나의 데이타 만을 저장한다.

KV 데이타 타입

PCollection< KV<String,Integer> key_value_data

키 밸류 데이타 타입으로, 키와 값으로 구성된다. 키와 값은 일반 데이타 타입과 같게, 자바의 일반 데이타 타입 사용이 가능하다.


PCollection<KV<String, Iterable<Integer>>>

키 밸류 데이타 타입중에 값에 여러개의 값을 넣을 수 있는 Iterable 타입이 있다.

앞의 Transform 예제에서 언급된것과 같이 키가 cat이고, 그 값이 2,6,7,9 와 같이 여러 값을 가지는 형이 이러한 타입에 해당한다.

커스텀 데이타 타입

단순한 데이타 타입 말고도, 복잡한 데이타 형을 처리하기 위해서  커스텀 데이타 타입을 지원한다.

커스텀 데이타 타입은 오픈 소스 Avro의 데이타 모델을 사용한다. http://avro.apache.org/


예를 들어 어떤 키에 대해서 카운트값,평균,총합 그리고 윈도우의 시작과 끝 시간을 저장하는 데이타 타입 Stat가 있다고 가정하자. 이 데이타 타입은 다음과 같이 정의된다.

자바에서 일반적인 Value Object 형태로 정의를 하면되고, 단 앞에 어노테이션으로 @DefaultCoder(AvroCoder.class) 와 같이 Avro 데이타 클래스임을 정의하면 된다.


@DefaultCoder(AvroCoder.class)

static class Stat{

Float sum;

Float avg;

Integer count;

Integer key;

Instant wStart; // windowStartTime

Instant wEnd; // windowEndTime

public Instant getwStart() {

return wStart;

}

public Instant getwEnd() {

return wEnd;

}


public Float getSum() {

return sum;

}

public Float getAvg() {

return avg;

}

public Integer getCount() {

return count;

}


public Integer getKey(){

return key;

}


public Stat(){}

public Stat(Integer k,Instant start,Instant end,Integer c,Float s,Float a){

this.key = k;

this.count = c;

this.sum = s;

this.avg = a;

this.wStart = start;

this.wEnd = end;

}


}



윈도우

스트리밍 데이타 처리를 할때 가장 중요한 개념이 윈도우이다.

특히나  Unbounded 데이타를 이용한 스트리밍 처리에서 Grouping이나 Combine 시에는 반드시 윈도우를 사용해야 한다.Grouping이나 Combine과 같은 aggregation을 하지 않으면, Unbounded 데이타라도 윈도우 처리가 필요없다.

또한 Bounded 데이타도, 데이타에 타임 스탬프를 추가하면 윈도우를 사용하여, 시간대별 데이타를 처리할 수 있다.

예를 들어 일일 배치를 돌리는 구매 로그가 있을때, 각 데이타의 구매 시간이 있으면, 이 구매시간을 타임 스탬프로 지정하여 배치라도 윈도우 단위로 연산을 할 수 있다.

고정 윈도우 적용

윈도우를 적용하는 방법은 정말 간단하다.

PCollection 객체에 apply 메소드를 이용하여 Window.into 메서드를 이용하여 적용하면된다.

예를 들어서 아래와 같이 PCollection<String> 형 데이타인 items 객체가 있을때, 여기에 윈도우를 적용하려면 다음과 같이 하면 된다.

 PCollection<String> items = ...;

 PCollection<String> fixed_windowed_items = items.apply(

   Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES)));

items.apply를 이용하여 윈도우를 적용하는데, 데이타 타입이 String이기 때문에, Window.<String>into 로 윈도우를 적용하였다.

윈도우 타입은 Fixed 윈도우이기 때문에, FixedWindows.of를 사용하였고, 윈도우 주기는 1분주기라서 1,TimeUnit.MINUTES를 적용하였다.

슬라이딩  윈도우 적용

슬라이딩 윈도우는 윈도우의 크기 (Duration)과 주기를 인자로 넘겨야 한다.

아래 코드는 5초 주기로 30분 크기의 윈도우를 생성하는 예제이다.

 PCollection<String> items = ...;

 PCollection<String> sliding_windowed_items = items.apply(    Window.<String>into(SlidingWindows.of(Duration.standardMinutes(30)).every(Duration.standardSeconds(5))));

윈도우가 5초마다 생성이되고, 각 윈도우는 30분 단위의 크기를 가지고 있게 된다.

세션 윈도우 적용

세션 윈도우는 HTTP 세션 처럼 특정 사용자가 일정 시간동안 데이타가 올라오지 않으면, 처음 데이타가 올라온 시간 부터 데이타가 올라오지 않은 시간 까지를 윈도우르 묶어주는 개념이다.

앞의 고정 윈도우나, 세션 윈도우와는 다르게 반드시 키별로 세션을 묶기 때문에 키가 필요하다.


아래 예제는 각 사용자 별로 세션당 점수를 계산해주는 예제이다.

userEvents

 .apply(Window.named("WindowIntoSessions")

       .<KV<String, Integer>>into(

             Sessions.withGapDuration(Duration.standardMinutes(Duration.standardMinutes(10))))

   .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))

 // For this use, we care only about the existence of the session, not any particular

 // information aggregated over it, so the following is an efficient way to do that.

 .apply(Combine.perKey(x -> 0))

 // Get the duration per session.

 .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))



다른 윈도우들과 마찬가지로 Window.into를 이용하여 윈도우를 적용하는데, 데이타 형을 잘 보면 <KV<String, Integer>> 형으로 정의된것을 확인할 수 있다.

Sessions.withGapDuration으로 세션 윈도우를 정의한다. 이때 얼마간의 시간 동안 데이타가 없으면 세션으로 짜를지를 지정해줘야 하는데, Duration.standardMinutes(10) 를 이용하여 10분간 해당 사용자에 대해서 데이타가 없으면 해당 사용자(키)에 대해서 세션 윈도우를 자르도록 하였다.

윈도우 시간 조회하기

윈도우를 사용하다보면, 이 윈도우의 시작과 종료 시간이 필요할때가 있다. 예를 들어 고정 크기 윈도우를 적용한다음에, 이를 데이타 베이스등에 저장할때, 이 윈도우가 언제시작해서 언제끝난 윈도우인지를 조회하여 윈도우 시작 시간과 함께 값을 저장하고자 하는 케이스이다. “1시에 몇건, 1시 10분에 몇건의 데이타가 저장되었다”와 같은 시나리오이다.


현재 윈도우에 대한 정보를 얻으려면 DoFn 클래스를 구현할때, com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess 인터페이스를 implement 해야, 윈도우에 대한 정보를 억세스 할 수 있다.


static class StaticsDoFn extends DoFn<KV<Integer,Iterable<Twit>>, KV<Integer,Stat>>

implements com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess

{

@Override

public void processElement(ProcessContext c) {

:


IntervalWindow w = (IntervalWindow) c.window();

Instant s = w.start();

Instant e = w.end();

DateTime sTime = s.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTime eTime = e.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTimeFormatter dtf = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss");

String str_stime = sTime.toString(dtf);

String str_etime = eTime.toString(dtf);

                                      :


윈도우에 대한 정보는 ProcessContext c에서, c.window()를 호출하면, IntervalWindow라는 클래스로 윈도우에 대한 정보를 보내주고, 윈도우의 시작 시간과 종료 시간은 IntervalWindow 클래스의 start()와 end() 메서드를 이용해서 조회할 수 있다. 이 조회된 윈도우의 시작과 종료 시간은 org.joda.time.Instant 타입으로 리턴되는데, 조금 더 친숙한 DateTime 포맷으로 변환을 하려면, Instant.toDate() 메서드를 사용하면 되고, 이때, TimeZone 을 지정해주면 로컬 타임으로 변환을 하여, 윈도우의 시작과 종료시간을 조회할 수 있다.

타임 스탬프

윈도우는 시간을 기준으로 처리를 하기 때문에, 이 시간을 정의하는 타임스탬프를 어떻게 다루는지를 이해할 필요가 있다.

타임 스탬프 생성

Pub/Sub을 이용하여 unbounded 데이타를 읽을 경우 타임스탬프가 Pub/Sub에 데이타가 들어간 시간을 Event time으로 하여, PCollection에 추가된다.

타임 스탬프 지정하기

Pub/Sub에서와 같이 자동으로 타임 스템프를 부여하는게 아니라, 모바일 디바이스에서 발생한 이벤트 타임이나, 애플리케이션 적으로 PCollection에 직접 타임스탬프를 부여할 수 있다.

PCollection에 타임 스탬프를 부여 하는 방법은 간단하다.

DoFn내에서,  ProcessContext를 다음 파이프라인 컴포넌트로 보낼때,  c.output(데이타) 대신, c.output(데이타, 타임 스탬프)를 사용하면 된다. 타임 스탬프의 데이타 타입은  org.joda.time.Instant 를 사용한다.

예를 들어서

c.outputWithTimestamp(c.element(), logTimeStamp);

와 같은 방법으로 사용을 한다.


모바일 서비스 분석등, 실제 시간에 근접한 분석을 하려면, 로그를 수집할때, 이벤트 발생 시간을 같이 REST API등을 통해서 수집하고, outputWithTimestamp를 이용하여, 수집된 이벤트 발생시간을  PCollection에 추가하는 방식으로 타임 스탬프를 반영 하는 것이 좋다.







데이타 스트리밍 분석 플랫폼 Dataflow 개념 잡기 #2/2

(트리거, 이벤트 타임, 워터마크 개념)


조대협 (http://bcho.tistory.com)


앞글 http://bcho.tistory.com/1122 에 의해서 Dataflow에 대한 개념에 대해서 계속 알아보자

트리거

윈도우와 더블어서 Dataflow 프로그래밍 개념중에서 유용한 개념중의 하나가 트리거이다. 트리거는 처리중인 데이타를 언제 다음 단계로 넘길지를 결정하는 개념이다. 특히 윈도우의 개념과 같이 생각하면 좋은데, 윈도우는 일반적으로 윈도우가 종료되는 시간에 그 데이타를 다음 Transform으로 넘기게 된다.


그런데 이런 의문이 생길 수 있다. “윈도우의 크기가 클때 (예를 들어 한시간), 한시간을 기다려야 데이타를 볼 수 있는 것인가? 그렇다면 한 시간 후에 결과를 본다면 이것을 실시간 분석이라고 할 수 있는가?”

그래서 여기서 트리거의 개념이 나온다.

예를 들어 한시간 윈도우가 있더라도, 윈도우가 끝나지 않더라도 현재 계산 값을 다음 Transform으로 넘겨서결과를 볼 수 있는 개념이다. 1분 단위로 트리거를 걸면 1분 결과를 저장하고, 2분째도 결과를 저장하고, 3분째도…. 60분째에도 매번 결과를 업데이트 함으로써, 윈도우가 종료되기 전에도 실시간으로 결과를 업데이트 할 수 있게 된다.


트리거의 종류

그렇다면 이러한 트리거는 앞에서 언급한 시간 단위의 트리거만 있을까? Dataflow는 상당히 여러 종류의 트리거를 지원한다.


  • Time trigger (시간 기반 트리거) : 시간 기반 트리거는 일정 시간 주기로 트리거링을 해주는 트리거 이다. 1분 단위, 1초 단위 같이 일정 주기를 지정하거나, “윈도우 시작후 2분후 한번과 윈도우 종료후 한번"과 같이 절대적인 시간을 기준으로도 정의가 가능하다.

  • Element Count (데이타 개수 기반 트리거) : 다음은 개수 기반인데, 예를 들어 “어떤 데이타가 100번 이상 들어오면 한번 트리거링을 해라” 또는 “매번 데이타가 100개씩 들어올때 마다 트리거링을 해라" 라는 형태로 정의가 가능하다.

  • Punctuations  (이벤트 기반 트리거) : Punctuations는 엄밀하게 번역하면 “구두점" 이라는 의미인데, 구두점 처럼 특정 데이타가 들어오는 순간에, 트리거링을 하는 방법이다.

트리거 조합

이러한 트리거는 하나의 트리거 뿐 아니라, 여러개의 트리거를 동시에 조합하여 사용이 가능하다.

  • AND : AND 조건으로 두개의 트리거의 조건이 만족해야 트리거링이 된다. 예를 들어, Time Trigger가 1분이고, Element Count 트리거가 100개이면, 윈도우가 시작된 1분 후에, Element Count가 100개가 되면 트리거링이 된다.

  • OR : OR 조건으로 두개의 트리거의 조건 중 하나만 만족하면 트리거링이 된다.

  • Repeat : Repeat는 트리거를 반복적으로 수행한다. Element Count 트리거 10개를 반복으로 수행하면, 매 10개 마다 트리거링이 된다. Time 트리거를 1분 단위로 반복하면, 매 1분 마다 트리거링이 된다.

  • Sequence : Sequence 트리거는 등록된 트리거를 순차적으로 실행한다. Time 트리거 1분을 걸고 Element count 트리거 100개를 걸면, 윈도우 시작후 1분 후 트리거링인 된후, 그 후 부터 Element 가 100개 들어오면 두번째 트리거링이 발생하고 트리거링이 종료 된다.


트리거 결과의 누적

그러면 트리거링이 될때 마다 전달 되는 데이타는 어떻게 될까라는 질문이 나올 수 있는데. 무슨 이야기인가 하면 윈도우 내에서 트리거가 발생할때, 이전 데이타에 대한 처리를 어떻게 할것인가이다.


데이타가 A,B,C,D,E,F 가 들어왔다고 가정하자. 트리거가 C 다음 발생했다고 했을때, 윈도우가 끝난 F에는 어떤 값이 리턴이 될까?

첫번째 트리거링에는 당연히 A,B,C 가 전달된다.

윈도우가 끝나면 A,B,C,D,E,F 가 전달되는 것이 맞을까 아니면 트리거링 된 이후의 값인 D,E,F 만 전달되는 것이 맞을까?

맞는 건 없고, 옵션으로 지정이 가능하다.

  • Accumulating
    Accumulating은 트리거링을 할때 마다 윈도우 내에서 그때까지의 값을 모두 리턴한다.

  • Discarding
    트리거링 한 후에, 이전 값은 더이상 리턴하지 않고, 그 이후 부터 다음 트리거링 할때까지의 값만을 리턴한다.

예를 들어서 보자


다음과 같은 윈도우가 있고, 3번, 23번, 10번에서 트리거링이 된다고 했을때,

Accumulating mode의 경우

  • 첫번째 트리거 후 : [5,8,3]

  • 두번째 트리거 후 : [5,8,3,15,19,23]

  • 세번째 트리거 후 : [5,8,3,15,19,239,13,10]

와 같이 값이 반환되고

Discarding mode의 경우

  • 첫번째 트리거 후 [5,8,3]

  • 두번째 트리거 후 [15,19,23]

  • 세번째 트리거 후 [9,13,10]

이 반환된다.

데이타 지연에 대한 처리 방법

실시간 데이타 분석은 특성상 데이타의 전달 시간이 중요한데, 데이타는 모바일 클라이언트 등에서 인터넷을 통해서 데이타가 서버로 전송되는 경우가 많기 때문에, 데이타의 실제 도달 시간이 들쭉날쭉 하다. 이러다 보니 데이타의 도착 순서나 지연등이 발생하는데, 이에 대한 처리가 필요하다. 먼저 데이타 도달 시간의 개념을 이해하려면, 이벤트 타임과 프로세싱 타임의 개념을 먼저 이해해야 한다.

이벤트 타임과 프로세싱 타임

모바일 단말에서 다음과 같이 A,B,C,D의 데이타를 1시1초, 1시2초,3초,5초에 보냈다고 하자.


서버에 도착해서 Dataflow에 도착하는 시간은 물리적으로 서버와 단말간의 거리 차이가 있기 때문에 도착 시간은 단말에서 데이타가 발생한 시간보다 느리게 되며, 또한 각 단말의 위치나 단말이 연결되어 있는 네트워크 상황이 다르기 때문에 순차적으로 도착하는 것이 아니라, 늦게 보낸 데이타가 더 빨리 도착할 수 도 있다.

아래 그림을 보면 A데이타는 1시1초에 단말에서 생성되었지만 서버에 도착한 시간은 1시2초가 된다. C,D의 경우, 순서가 바뀌어서 도착하였다.



이렇게 실제로 데이타가 발생한 시간을 이벤트 타임, 그리고 서버에 데이타가 도착한 시간을 프로세싱 타임이라고 정의한다.


이 프로세싱 타임은 네트워크 상황이나 데이타에 크기에 따라 가변적으로 변하기 때문에, 이벤트 타임과 프로세싱 타임의 상관 관계를 그래프로 표현해보면 다음과 같아진다.


가장 이상적인 결과는 이벤트 타임과 프로세싱 타임이 동일한 것이겠지만 불가능하고, 위의 그림처럼 이벤트 타임보다 프로세싱 타임이 항상 늦게 되고, 이벤트 타임과 프로세싱 타임의 차이는 매 순간 다르게 된다.

워터 마크 (Water Mark)

이렇게 위의 그림과 같이 실제 데이타가 시스템에 도착하는 시간을 예측 하게 되는데, 이를 워터 마크라고 한다. 위의 그림에서 “실제 처리 그래프"로 표시되는 부분을 워터마크라고 생각하면 된다. 이 예측된 시간을 기반으로 윈도우의 시스템상의 시작 시간과 종료 시간을 예측 하게 된다.

지연 데이타 처리 방법

윈도우 처리 관련해서, 실제 발생한 시간과 도착 시간이 달라서, 처리 시간내에 못 들어오는 경우가 발생할 수 있다. 아래 그림을 보면, 실제 윈도우는 1시1초~1시6초까지의 데이타를 처리하기를 바라고 정의했을 수 있는데, 시스템에서는 이 윈도우의 값이 프로세싱 타임 기준으로 (워터 마크를 기준으로 연산함) 1시2초~1시6초에 도착하기를 기대하고 있는데, 데이타 C의 경우에는 기대했던 프로세싱 타임에 도착하지 않았기 때문에 이 데이타는 연산에서 누락될 수 있다.



비단 늦게 도착한 데이타 뿐만 아니라, 시스템이 예측한 프로세싱 타임 보다 일찍 데이타가 도착할 수 있는데, 이런 조기 도착한 데이타와 지연 도착한 데이타에 대한 처리는 어떻게 해야 할까?

Dataflow에서는 이런 조기 도착이나 지연 데이타에 대한 처리 메카니즘을 제공한다.

윈도우를 생성할때, withAllowedLateness라는 메서드를 사용하면, 늦게 도착하는 데이타에 대한 처리 기간을 정의할 수 있다.


PCollection<String> items = ...;

 PCollection<String> fixed_windowed_items = items.apply(

   Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES))

         .withAllowedLateness(Duration.standardDays(2)));

https://cloud.google.com/dataflow/model/windowing#managing-time-skew-and-late-data


위의 예제는 1분 단위의 Fixed Window를 정의하고, 최대 2일까지 지연 도착한 데이타 까지 처리할 수 있도록 정의한 예제이다.


지금까지 간단하게 dataflow를 이용한 스트리밍 데이타 처리의 개념에 대해서 알아보았다.

데이타 스트리밍 분석 플랫폼 Dataflow 개념 잡기 #1/2


조대협 (http://bcho.tistory.com)


실시간 데이타 처리에서는 들어오는 데이타를 바로 읽어서 처리 하는 스트리밍 프레임웍이 대세인데, 대표적인 프레임웍으로는 Aapche Spark등을 들 수 있다. 구글의 DataFlow는 구글 내부의 스트리밍 프레임웍을 Apache Beam이라는 형태의 오픈소스로 공개하고 이를 실행하기 위한 런타임을 구글 클라우드의 DataFlow라는 이름으로 제공하고 있는 서비스이다.


스트리밍 프레임웍 중에서 Apache Spark 보다 한 단계 앞선 개념을 가지고 있는 다음 세대의 스트리밍 프레임웍으로 생각할 수 있다. Apache Flink 역시 유사한 개념을 가지면서 Apache Spark의 다음 세대로 소개 되는데, 이번글에서는 이 DataFlow에 대한 전체적인 개념과 프로그래밍 모델등에 대해서 설명하고자 한다. 스트리밍 데이타 처리에 대한 개념은 http://bcho.tistory.com/1119 글을 참고하기 바란다.

개념 소개

dataflow에 대해서 이해하기 위해서 프로그래밍 모델을 먼저 이해해야 하는데, dataflow의 프로그래밍 모델은 얼마전에 Apache에 Beam이라는 오픈 소스 프로젝트로 기증 되었다. Apache Spark이나, Apache Flink와 유사한 스트리밍 처리 프레임웍이라고 생각하면 된다. dataflow는 이 Apache beam의 프로그래밍 모델을 실행할 수 있는 런타임 엔진이라고 생각하면 된다. 예를 들어 Apache beam으로 짠 코드를 Servlet이나 Spring 코드라고 생각하면, dataflow는 이를 실행하기 위한 Tomcat,Jetty,JBoss와 같은 런타임의 개념이다.


먼저 dataflow의 개념을 이해해보도록 하자. 아래 그림은 dataflow에 대한 컨셉이다.


데이타가 들어오면, Pipeline IO에서 데이타를 읽어드린다. 읽어드린 데이타는 PCollection이라는 데이타 형으로 생성이 되고, 이 PCollection 데이타는 여러개의 중첩된 PTransform을 통해서 변환 및 가공이 된다. 가공이 끝난 결과는 마지막으로 Pipeline IO의 Output을 통해서 데이타 저장소 (빅쿼리나 파일등)에 저장이 된다.  이 Pipeline IO에서 부터 PTransform을 걸친 일련의 프로세싱 과정을 Pipeline이라고 한다.


예를 들어 설명해보자, 문자열을 입력 받은 후에, 문자열에서 단어를 추출하여, 각 단어의 개수를 세어 주는 파이프라인이 있다고 하자.


첫번째 실행에서 “Hello my daddy”라는 문자열이 입력되었다. 첫번째 Transform인 Extract words Transform을 거치면서, “Hello my daddy” 라는 문자열은 “Hello”, “my”, “daddy” 라는 각각의 단어로 쪼게진다. 다음으로 Count Element 라는 Transform에 의해서, 각 단어의 수를 세어서 저장한다. “Hello”는 1번, “my”는 1번, “daddy”는 1번 의 값이 저장된다.


두번째 실행에서 “Hello my bro” 라는 문자열이 들어오면, Extract words 에 의해서 “Hello”, “my”, “bro”라는 각각의 단어로 쪼게지고, Count Element Transform에서 이전에 세어놓은 단어의 수와 합산하여 계산이 된 결과가 저장이 된다. “Hello”는 이전에 한번 카운트가 되었고 이번에도 들어왔기 때문에, 2가 되고, 같은 원리로 “my”라는 단어의 카운트도 2가된다. “bro” 라는 단어는 이번에 처음 들어왔기 때문에 새 값으로 1로 저장된다.




세번째 “Hello my mom” 이라는 문자열이 들어오면 앞의 두개의 문자열과 마찬가지로 간 단어로 쪼게진 다음 Count Element에 의해서 각 단어의 수가 카운트되어 기존의 값과 누적 합산된다. 모든 데이타를 다 읽어서 처리가 끝나면, 저장된 결과를 Pipeline IO를 통해서 파일에 그 결과를 쓰게 된다.

배치와 스트리밍 처리

dataflow는 위에서 설명한 파이프라인의 개념을 배치와 스트리밍 처리 두가지 개념 모두로 지원해서 처리가 가능하다. 데이타가 파일과 같이 이미 쓰여지고 더 이상 증가나 수정이 되지 않은 데이타에 대해서는 일괄로 데이타를 읽어서 결과를 내는 배치 처리가 가능하고, 계속해서 들어오고 있는 데이타 (트위터 피드, 로그 데이타)는 스트리밍으로 처리가 가능하다.

윈도우의 개념

배치 처리야, 데이타 처리가 모두 끝난 후에 결과를 내보낸다고 하지만, 그렇다면 스트리밍 데이타는 계속해서 데이타가 들어오고 있는데, 언제 결과를 내보내야 할까?

개별 데이타를 변환해서 저장하는 경우에야, 개별 데이타 처리가 끝난후에 각각 하나씩 저장한다고 하지만, 위와 같이 들어오는 데이타에서 특정데이타 들에 대한 합이나 평균과 같은 처리를 하는 경우 어느 기간 단위로 해야 할까? 스트리밍 처리에서는 이러한 개념을 다루기 위해서 윈도우라는 개념을 사용한다.


예를 들어, “1시~1시10분까지 들어온 문자열에 대해서 문자열에 들어 있는 각 단어의 수를 카운트해서 출력해주는 기능" 이나, 또는 “매 5분 단위로 현재 시간에서 10분전까지 들어온 문자열에 대해서 각 단어의 수를 카운트 해서 출력 해주는 기능" 과 같이 작은 시간 기간의 단위를 가지고 그 기간 단위로 계산 하는 방법이며, 이 시간 단위를 윈도우(Window)라고 한다.


Fixed Window (고정 크기 윈도우)

앞의 예에서 1시~1시10분, 1시10분~1시20분 과 같이 고정된 크기를 가지는 윈도우의 개념을 Fixed Window라고 한다.


Sliding Window (슬라이딩 윈도우)

앞의 예에서와 같이 윈도우가 상대적인 시간 (이전 10분까지)의 개념을 가지면서, 다른 윈도우와 중첩되는 윈도우를 슬라이딩 윈도우라고 한다.


그림과 같이 1시10분의 윈도우는 1시 10분의 10분전인 1시에서 부터, 현재 시간 까지인 1시10분까지 값을 읽어서 처리하고 윈도우가 끝나는 시점인 1:10분에 그 값을 저장한다. 윈도우의 간격은 5분 단위로, 1시 15분에는 1시 15분의 10분전인 1시05분 부터 현재 시간인 1시15분까지 들어온 데이타에 대해서 처리를 하고 그 결과 값을 1시15분에 저장한다.

Session window (세션 윈도우)

다음은 세션 윈도우라는 개념을 가지고 있는데, 이를 이해하기 위해서는 먼저 세션의 개념을 먼저 이해해야 한다.

세션이랑 사용자가 한번 시스템을 사용한 후, 사용이 끝날때 까지의 기간을 정의한다. 스트리밍 시스템에서는 사용자 로그인이나 로그 아웃을 별도의 이벤트로 잡는 것이 아니기 때문에, 데이타가 들어온 후에, 일정 시간 이후에 그 사용자에 대한 데이타가 들어오지 않으면, 세션이 종료 된것으로 판단한다.

일반 적인 웹 프로그램에서 HttpSession과 같은 원리인데, 웹 사이트에 접속한 후, Session time out 시간이 지날때 까지 사용자가 별도의 request를 보내지 않으면 세션을 끊는 것과 같은 원리이다.

아래 그림은 세션 윈도우의 개념을 설명하기 위한 윈도우인데, User A와 User B의 데이타가 들어오고 있다고 하자.


그리고 세션 타임 아웃이 10분으로 정의했다. 즉 같은 사용자에 대해서 데이타가 들어온 후, 10분 내에 추가 데이타가 들어오지 않으면 세션이 종료 된것으로 판단한다.


User A는 1:00 에 첫 데이타가 들어와서1:00~1:10 사이에 두번째 데이타가 들어왔고, 1:10~1:20 사이에 세번째 데이타가 들어온 후, 네번째 데이타는 10분이 지난 후에 들어왔다. 그래서 1:00~1:20 까지가 하나의 세션이 되고, 이것이 User A에 대한 1:00~1:20의 세션 윈도우가 된다. 네번째 데이타 부터는 새로운 윈도우로 처리가 되는데, 1:40~1:50 사이에 다섯번째 데이타가 도착한후, 그 이후로 도착하지 않았기 때문에 이게 두번째 윈도우가 되고, 1:30~1:50의 시간 간격을 가지는 User A의 두번째 윈도우가 된다.

각 윈도우의 값은 User A의 1:00~1:20 윈도우의 값은 (1+1+1)로 3이 되고, 두번째 윈도우인 1:30~1:50 윈도우는 (2.5+1)로 3.5가 된다.


User B는 1:10에 데이타가 들어오고, 10분 후인 1:20까지 데이타가 들어오지 않고 그 이후 1:30 분에 두번째 데이타가 들어왔기 때문에, 1:10~1:10 길이의 첫번째 세션 윈도우가 생성된다. 다음 으로 1:30분에 데이타가 들어왔기 때문에 두번째 세션 윈도우를 생성하고, 2:00까지 계속 데이타가 들어오다가 멈추고 2:10까지 새로운 데이타가 들어오지 않았기 때문에 1:30~2:00 까지 두번째 윈도우로 취급한다.


이 Session Window는 앞서 언급한 Fixed Window나, Sliding Window와는 다르게, User A, User B와 사용자 단위와 같이 어떤 키에 따라서 개별적으로 윈도우를 처리 한다.  즉 Session Window는 User A나 USer B처럼 특정 키에 종속된 윈도우만을 갖는다.


반대로 Fixed Window나 Sliding Window는 키단위의 윈도우가 아니라 그 시간 범위내에 들어 있는 모든 키에 대한 값을 처리한다..

Fixed Window의 경우에는 30분 사이즈를 갖는 윈도우라고 하면 아래 그림과 같이


1:00~1:30 윈도우는 User A의 값 = (1+1+1) 과 User B의 값 1을 합쳐서 총 4가 되고

1:30~2:00 윈도우는 User A값 = (2.5+1)과 User B의 값 = (2+2+2) 를 합쳐서 9.5가 된다.


Sliding Window의 경우에는 길이가 30분이고, 주기가 20분인 Sliding 윈도우라고 할때,


1:00~1:30, 1:20~1:50, 1:40~2:00 3개의 Sliding 윈도우가 생성된다.

1:00~1:30 윈도우는 User A의 값=(1+1+1)과 User B의 값 1을 합산하여 4가 되고

1:20~1:50 윈도우는 User A의 값 = (2.5+1)과 User B의 값 =(2+2)를 합산하여 7.5가 된다.

1:40~2:00 윈도우는 User A의 값 = (2.5+1)과 User B의 값 (2+2)를 합산하여 7.5가 된다.




데이타 스트리밍 처리에 대한 이해


조대협 (http://bcho.tistory.com)


근래에 Apache Beam 프로젝트를 공부하게 되서, 그간 묵혀놨던 데이타 스트리밍 처리에 대해서 다시 정리중인데, 예전에 Apache Storm을 봤을때 보다 트리거나, 윈도우등 많은 개념들이 들어가 있어서 데이타 스트리밍에 대한 개념 부터 다시 정리를 시작을 하고자한다.


Apache Storm에서 부터, Apache Spark 기반의 데이타 스트림 처리뿐 아니라 근래에는 Apache Flink와 같은 새로운 스트리밍 프레임웍크과 구글이 이미 클라우드를 통해서 서비스 하고 있는  google cloud dataflow (Apache Beam이라는 프로젝트로 오픈소스화 되었고, 현재 인큐베이션 단계에 있다.) 까지 빅데이타에 대한 실시간 처리성이 강조되면서 근래에 데이타 스트리밍 처리가 다시 주목 받는 것 같다. 이 문서는 구글이 개발한 dataflow에 대한 개념을 이해하기 위함이다.


본 문서의 내용과 그림은 https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101 를 참고하였다.


사전 개념 이해

스트리밍 데이타 처리를 이해하기 위해서는 몇몇 용어와 개념을 사전에 이해해야 하는 부분이 있다.

Bounded data 와 Unbounded data

먼저 스트리밍 데이타 처리를 이해하려면 데이타의 종류에 대해서 먼저 이해해야 한다.

  • Unbounded data 는 데이타의 수가 정해져있지 않고 계속해서 추가되는, 즉 끊임 없이 흘러 들어오는 데이타라고 볼 수 있다. 예를 들어서 모바일 디바이스에서 계속 올라오는 로그, 페이스북이나 트위터의 타임 피드, 증권 거래 주문 같이 계속 해서 들어와서 쌓이는 데이타를 Unbounded data 라고 한다.

  • Bounded data는 데이타가 딱 저장되고 더 이상 증거나 변경이 없는 형태로 계속 유지되는 데이타를 뜻한다. 1월의 정산 데이타.

Event time과 Processing time

데이타의 발생 시간과 시스템에서 처리되는 시간이 차이가 있는데, 이를 각각 Event time과 Processing time 이라고 정의한다.

예를 들어, 게임에서 사용자가 공격을 한 이벤트를 서버에 전달해서 처리하여 저장하는 시나리오가 있다고 가정할때, 공격 이벤트가 1:00:00에 발생했으면, 이 데이타가 네트워크를 타고 서버로 도달하여 프로그램 로직을 수행하고 저장하는데 소요된 시간을 2초라고 가정하면, Event time 은 1:00가 되고, Processing time은 1:00:02가 된다.

이상적으로는 Event time과 Processing time이 동일하면 좋겠지만, 네트워크 시간이나 처리 시간에 따라 Processing time이 Event time 보다 늦고, 또한 Processing time에서 소요되는 실제 처리 시간은 일정하지 않고 아래 그림의 파란색 그래프(실제 처리 그래프) 처럼 들쭉 날쭉하다. 네트워크 상황이나, 서버의 CPU, IO 상황이 그때마다 차이가 나기 때문이다.


아래 그림을 통해서 개념을 다시 정리해보면,

X축은 Event time, Y축은 Processing Time이다. 0초에 발생한 데이타가 서버에 도착해서 처리하는 시간이 소요 되기 때문에, 아래 그림과 같이 Processing Time은 2초 부터 시작한다. Skew는 Event time과 Processing time간의 간격이다. 아래 그림에서 보면, Processing time에서 3초때에는 Event time 1초에서 발생한 데이타를 처리하고 있는데, 실제 Event time에서는 3초 시간의 데이타가 발생하고 있기 때문에, Processing time과 Event time은 약 2초의 지연이 발생하고 있고, 이를 Skew 라고 한다.



Bounded data의 처리

Bounded data는 이미 저장되어 있는 데이타를 처리하는 것이기 때문에 별다른 처리 패턴이 필요없다



데이타를 읽어서 한번에 처리해서 저장 하면 된다.

UnBounded data 처리

복잡한 것은 스트리밍 데이타 즉, Unbounded data 를 처리하는 방법인데, Unbounded data 는 크게 Batch와 Streaming 두 가지 방식으로 처리할 수 있다.

Batch로 처리

배치로 Unbounded data를 처리 하는 방식은 아래와 같이 두 가지 방식이 있다.

Fixed Windows

Fixed Windows 방식은 스트리밍으로 들어오는 데이타를 일정 시간 단위로 모은 후, 배치로 처리 하는 방식이다. 예를 들어서 아래 그림과 같이 10~11시 까지 데이타를 수집한후, 11시 이후에, 10~11시까지 들어온 데이타를 처리해서 정리 하는 방식이다.



이 방식은 구현이 간단하다는 장점이 있지만, 데이타가 수집 된 후 처리를 시작하기 때문에, 실시간성이 떨어진다. (근 실시간)

Streaming 처리

Unbounded 데이타를 제대로 처리하려면 스트리밍 처리를 하는 것이 좋은데, 스트리밍 처리 방법에는 아래와 같이 크게 Time agnostic, Filtering, Inner Join, Windowing 방식등이 있다.


스트리밍 처리는 배치 처리에 비해서 복잡한 것이, Unbounded 데이타는 기본적으로 특성이 Skew가 환경에 따라 변화가 심하고 그래서 데이타가 시스템에 도착하는 순서 역시 순차적으로 도착하지 않고 들쭉 날쭉 하다.

Time agnostic

Time agnostic 이란, 데이타가 시간 속성을 가지고 있지 않는 데이타 이다. 들어오는 데로 처리를 하면 되기 때문에, 별다른 노하우가 필요 없지만, 하나의 데이타 형이기 때문에 간단하게 언급만 한다.

Filtering

다음으로 많이 사용 되는 것이 필터링인데, 들어오는 데이타 중 특정 데이타만 필터링 해서 저장 하는 구조이다.


예를 들면, 웹 로깅 데이타를 수집해서, 특정 IP나 국가 대역에서 들어오는 데이타만 필터링해서 저장하는 시나리오등이 될 수 있다.

Inner joins (교집합)

Inner join은 두개의 Unbounded 데이타에서 들어오는 값을 서로 비교하여 매칭 시켜서 값을 구하는 방식이다.



모바일 뉴스 앱이 있다고 가정할때, 뉴스 앱에서는 사용자가 어떤 컨텐츠를 보는지에 대한 데이타를 수집 전송하고, 지도 앱에서는 현재 사용자의 위치를 수집해서 전송한다고 하자.

이 경우 사용자별 뉴스 뷰에 대한 Unbounded data 와, 사용자별 위치에 대한 Unbounded data 가 있게 되는데, 이 두개의 데이타 스트림을 사용자로 Inner Join을 하면 사용자가 어떤 위치에서 어떤 뉴스를 보는지에 대해서 분석을 할 수 있다.

Inner join을 구현하기 위해서는 양쪽 스트림에서 데이타가 항상 같은 시간에 도착하는 것이 아니기 때문에, 반대쪽 데이타가 도착할때 까지 먼저 도착한 데이타를 임시로 저장할 버퍼 영역이 필요하고, 이 영역에 임시로 일정 기간 데이타를 저장하고 있다가 반대쪽 스트림에서 데이타가 도착 하면 이를 조인해서 결과를 저장하고, 버퍼 영역에서 두개의 데이타를 삭제한다.

만약에 반대쪽의 데이타가 도착하지 않으면, 이 버퍼 영역에 데이타가 계속 쌓이기 때문에, 일정 기간이 지나면 반대쪽 스트림에서 데이타가 도착하지 않은 데이타를 주기적으로 삭제 해주는 (garbage collection) 정책이 필요하다.


cf. Inner join (교집합), Outer join (합집합)

Approximation algorithms (근사치 추정)

근사치 추정 방식은 실시간 데이타 분석에서 많이 사용되는데, 실시간 분석에서는 전체 데이타를 모두 분석할 수 있는 시간이 없는 경우가 많고, 시급한 분석이 필요한 경우가 있기 때문에, 전체 데이타를 분석하지 않고 일부만 분석하거나 또는 대략적인 데이타의 근사값만을 구하는 방법으로 해서, 빠르게 데이타를 분석하는 경우가 있다. 이를 근사치 추정 방식이라고 하는데, 예를 들어 VOD 서비스에서 지금 10분간 인기있는 비디오 목록, 12시간 동안 가장 인기 있는 판매 제품등 과 같은 시나리오인데, 이런 시나리오에서 데이타는 아주 정확하지 않아도 근사 값만 있으면 되고, 데이타를 그 시간에 보는 시급성이 중요하다.  이러한 시나리오에서는 전체 데이타를 다 보고 분석이 어렵기 때문에, 샘플링을 하거나 대략적인 근사 값만을 구해서 결과를 낸다.


이런 근사치를 추정하는 알고르즘은 K-means나 Approximate Top-N등이 이미 정의되어 있는 알고리즘이 많다.


참고 자료 :

Storm을 이용한 근사치 구하기 : https://pkghosh.wordpress.com/2014/09/10/realtime-trending-analysis-with-approximate-algorithms/

Apache Spark에서 K means로 근사치 구하기 :

https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html


Windowing

실시간 스트리밍 데이타 처리에서 중요한 개념중의 하나는 Windowing 인데, Windowing 이란 스트리밍 데이타를 처리할때 일정 시간 간격으로 처리하는 것을 정의한다.

예를 들어, 10분 단위의 Windowing의 경우 1시~2시까지 들어온 데이타를 1:10, 1:20,1:30, …  단위로 모아서 처리한다.

윈도우에는 자르는 방법에 따라서 다음과 같이 몇가지 방법이 있다.

Fixed Windows

정확하게 일정 시간 단위로 시간 윈도우를 쪼게는 개념이다. 앞에서 언급한 예와 같이 윈도우 사이즈가 10분 일때, 1시 10분은 1시00분~1시10분까지의 데이타를, 1시 20분은 1시10분~1시20분까지의 데이타를 처리한다.

Sliding Windows

Sliding Window 방식은 윈도우가 움직이는 개념이다.

슬라이딩 윈도우의 개념은 현재 시간으로 부터 +-N 시간 전후의 데이타를 매 M 시간 마다 추출 하는 것을 슬라이딩 윈도우라고 하고, 이 윈도우들은 서로 겹치게 된다.

예를 들면 현재시간으로부터 10분 전에서 부터  측정시간까지의 접속자를 1분 단위로 측정하는 시나리오가 될 수 있다. 매 1분 간격으로, 데이타를 추출하고, 매번 그 시간으로부터 10분전의 데이타를 추출하기 때문에 데이타가 중첩이 된다.  

이렇게 추출하는 간격을 Period (앞에서 1분), 그리고 추출하는 기간을 Length 또는 Size (앞에서 10분)라고 한다.



출처 : https://cloud.google.com/dataflow/model/windowing#sliding-time-windows

Session

다음으로는 Session Window의 개념이다.

Session Window에는 사용자가 일정 기간동안 반응이 없는 경우(데이타가 올라오지 않는 경우)에 세션 시작에서 부터, 반응이 없어지는 시간 까지를 한 세션으로 묶어서 처리한다

예를 들어서 세션 타임 아웃이 20분이라고 하고 데이타가 1:00 부터 올라오고 있는데,  1:01, 1:15에 데이타가 올라오고, 1:40분에 데이타가 올라오면 1:15 이후에 20분동안 (1:35까지) 데이타가 올라오지 않았기 때문에, 1:00,1:01,1:15은 하나의 세션으로 되고, 1:40은 새로운 세션 시작이 된다.



출처 : https://cloud.google.com/dataflow/model/windowing#session-windows


시간대별 Window 처리 방식

스트리밍 데이타에서 윈도우를 사용할때, 어느 시간을 기준 시간으로 할것인가를 정해야 하는데, 데이타가 시스템에 도착하는 Processing time을 기준으로 할 수 있고 또는  데이타가 실제 발생한 시간인 Event time을 기준으로도 할 수 있다.

Processing time based windowing

Processing time을 기준으로 데이타를 처리하는 것은 크게 어렵지 않다. 데이타가 도착한 순서대로 처리해서 저장하면 된다.


Event time based windowing

문제는 Event time을 기준으로 데이타를 처리 하는 경우인데, 데이타가 들어오는 것이 순서대로 들어오지 않는 경우가 많고, 또한 데이타의 도착 시간또한 일정하지 않다.




이 그림은 Event time을 기준으로 데이타를 처리하는 개념인데, 좌측 하얀색 화살표 처럼 12:00~13:00에 도착한 데이타가 11:00~12:00에 발생한 데이타 일 경우, 11:00~12:00 윈도우에 데이타를 반영해줘야 한다.

이러한 Event time 기반의 스트리밍 처리는 아래와 같이 기술적으로 두가지 주요 고려 사항이 필요하다.

  • Buffering
    늦게 도착한 데이타를 처리해야 하기 때문에. 윈도우를 일정시간동안 유지해야 한다. 이를 위해서 메모리나 별도의 디스크 공간을 사용한다.

  • Completeness
    Buffering을 적용했으면 다른 문제가 얼마 동안 버퍼를 유지해야 하는가?
    즉 해당 시간에 발생한 모든 데이타는 언제 모두 도착이 완료(Completeness) 되는가? 를 결정하는 것이다. 정확한 완료 시점을 갖는 것은 사실 현실적으로 힘들다. 버퍼를 아주 크게 잡으면 거의 모든 데이타를 잡아낼 수 있겠지만, 버퍼를 아주 크게 잡는 것이 어렵기 때문에, 데이타가 언제 도착할 것이라는 것을 어림 잡아 짐작할 수 있는 방법들이 많다. (예를 들어 워터마크 기법 같은 것이 있는데, 이는 다음글에서 설명하도록 한다.)


지금까지 실시간 데이타 분석에 사용되는 대략적인 개념을 알아보았다. 다음 글에서는 Apache Beam을 이용하여 이러한 실시간 데이타 분석을 어떻게 구현하는지 알아보도록 하겠다.



참고 자료

http://data-artisans.com/how-apache-flink-enables-new-streaming-applications-part-1/

https://cloud.google.com/dataflow/blog/dataflow-beam-and-spark-comparison#game-stats-advanced-stream-processing