블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 

'Spark'에 해당되는 글 23

  1. 2018.01.01 Apache Beam (Dataflow)를 이용하여, 이미지 파일을 tfrecord로 컨버팅 하기
  2. 2017.03.23 클라우드에 최적화된 하둡 배포 아키텍쳐 생각하기
  3. 2016.08.25 실시간 데이타 분석 플랫폼 Dataflow - #5 데이타 플로우 프로그래밍 모델 (1)
  4. 2016.08.09 실시간 데이타 분석 플랫폼 Dataflow - #4 개발환경 설정하기
  5. 2016.07.22 데이타 스트리밍 분석 플랫폼 DataFlow - #2 개념 소개 (2/2) (1)
  6. 2016.07.17 데이타 스트리밍 분석 플랫폼 DataFlow - #2 개념 소개 (1/2) (1)
  7. 2016.07.17 데이타 스트리밍 분석 플랫폼 dataflow - #1. 소개
  8. 2016.07.04 실시간 빅데이타 처리를 위한 스트리밍 처리의 개념 (1)
  9. 2015.06.09 Apache Spark - Key/Value Paris (Pair RDD)
  10. 2015.05.31 Apache Spark - RDD (Resilient Distributed DataSet) Persistence (1)
  11. 2015.05.26 Apache Spark - RDD (Resilient Distributed DataSet) 이해하기 - #2 (1)
  12. 2015.05.22 Apache Spark - RDD (Resilient Distributed DataSet) 이해하기 - #1
  13. 2015.05.22 Apache Spark 소개 - 스파크 스택 구조
  14. 2015.05.18 Apache Spark 클러스터 구조
  15. 2015.05.18 Apache Spark 설치 하기
  16. 2015.05.13 Zepplin (제플린) 설치하기 (1)
  17. 2015.02.11 머신러닝 프레임웍에 대한 간단 메모 (1)
  18. 2015.01.29 Apache Storm을 이용한 실시간 데이타 처리 #6 –Storm 그룹핑 개념 이해하기
  19. 2015.01.25 Apache Storm을 이용한 실시간 데이타 처리 #4 –소개와 기본 개념 (2)
  20. 2015.01.25 Apache Storm을 이용한 실시간 데이타 처리 #3 -Storm 클러스터 설정과 배포
 

Apache Beam (Dataflow)를 이용하여, 이미지 파일을 tfrecord로 컨버팅 하기


조대협 (http://bcho.tistory.com)



개요

텐서플로우 학습에 있어서 데이타 포맷은 학습의 성능을 결정 짓는 중요한 요인중의 하나이다. 특히 이미지 파일의 경우 이미지 목록과 이미지 파일이 분리되어 있어서 텐서플로우에서 학습시 이미지 목록을 읽으면서, 거기에 있는 이미지 파일을 매번 읽어야 하기 때문에, 코딩이 다소 지저분해지고,IO 성능이 떨어질 수 있다

텐서플로우에서는 이러한 학습 데이타를 쉽게 읽을 수 있도록 tfrecord (http://bcho.tistory.com/1190)라는 파일 포맷을 지원한다.


이 글에서는 이미지 데이타를 읽어서 tfrecord 로 컨버팅하는 방법을 설명하며, 분산 데이타 처리 프레임웍인 오픈소스 Apache Beam을 기준으로 설명하나, tfrecord 변환 부분은 Apache Beam과 의존성이 없이 사용이 가능하기 때문에, 필요한 부분만 참고해도 된다. 이 Apache Beam을 구글의 Apache Beam 런타임 (매니지드 서비스)인 구글 클라우드의 Dataflow를 이용하여, 클러스터를 이용하여 빠르게 데이타를 처리하는 방법에 대해서 알아보도록 한다.


전체 코드는 https://github.com/bwcho75/cifar-10/blob/master/pre-processing/4.%20Convert%20Pickle%20file%20to%20TFRecord%20by%20using%20Apache%20Beam.ipynb 에 있다.


이 코드는 CIFAR-10 이미지 데이타를 Apache Beam 오픈 소스를 이용하여, 텐서플로우 학습용 데이타 포맷인  tfrecord 형태로 변환 해주는 코드이다.


Apache Beam은 데이타 처리를 위한 프레임웍으로, 구글 클라우드 상에서 실행하거나 또는 개인 PC나 Spark 클러스터상 여러 환경에서 실행이 가능하며, 구글 클라우드 상에서 실행할 경우 오토스케일링이나 그래프 최적화 기능등으로 최적화된 성능을 낼 수 있다.


CIFAR-10 데이타 셋은 32x32 PNG 이미지 60,000개로 구성된 데이타 셋으로 해당 코드 실행시 최적화가 되지 않은 상태에서 약 16분 정도의 처리 시간이 소요된다. 이 중 6분 정도는 Apache Beam 코드를 구글 클라우드로 업로드 하는데 소요되는 시간이고 실제 처리시간은 10분정도가 소요된다. 전처리 과정에 Apache Beam을 사용하기 전에 고려해야 할 요소는 다음과 같다.

  • 데이타가 아주 많아서 전처리 시간이 수시간 이상 소요될 경우 Apache Beam + Google Cloud를 고려하여 여러 머신에서 동시에 처리하여 빠른 시간내에 수행되도록 할 수 있다.

  • 데이타가 그다지 많지 않고 싱글 머신에서 멀티 쓰레드로 처리를 원할 경우에는 Apache Beam으로 멀티 쓰레드 기반의 병렬 처리를 하는 방안을 고려할 수 있다. 이 경우 클라우드에 대한 의존성을 줄일 수 있다.

  • 다른 대안으로는 Spark/Hadoop 등의 오픈소스를 사용하여, On Prem에서 여러 머신을 이용하여 전처리 하는 방안을 고려할 수 있다.

여기서는 아주 많은 대량의 이미지 데이타에 대한 처리를 하는 것을 시나리오로 가정하였다.

전처리 파이프라인

Apache Beam을 이용한 데이타 전처리 파이프라인의 구조는 다음과 같다.

이미지 파일 준비

CIFAR-10 데이타셋 원본은 이미지 파일 형태가 아니라 PICKLE이라는 파일 포맷으로 되어 있기 때문에,  실제 개발 환경에서는 원본데이타가 이미지인것으로 가정하기 위해서 https://github.com/bwcho75/cifar-10/tree/master/pre-processing 의 1~2번 코드를 통해서 Pickle 파일을 이미지 파일로 변경하고, *.csv 파일에 {파일명},{레이블} 형태로 인덱스 데이타를 생성하였다.

생성된 이미지 파일과 *.csv 파일은 gsutil 명령어를 이용하여 Google Cloud Storage (aka GCS)에 업로드 하였다. 업로드 명령은 https://github.com/bwcho75/cifar-10/blob/master/pre-processing/2.%20Convert%20CIFAR-10%20Pickle%20files%20to%20image%20file.ipynb 에 설명되어 있다.


전처리 파이프라인의 구조

Apache Beam으로 구현된 파이프라인의 구조는 다음과 같다.


1. TextIO의 ReadFromText로 CSV 파일에서 한 라인 단위로 문자열을 읽는다.

2. parseLine에서 라인을 ,로 구분하여 filename과 label을 추출한다.

3. readImage 에서 filename을 가지고, 이미지 파일을 읽어서, binary array 형태로 변환한다.

4. TFExampleFromImageDoFn에서 이미지 바이너리와 label을 가지고 TFRecord 데이타형인 TFExample 형태로 변환한다.

5. 마지막으로 TFRecordIOWriter를 통해서 TFExample을 *.tfrecord 파일에 쓴다.

코드 주요 부분 설명

환경 설정 부분

이 코드는 구글 클라우드와 로컬 환경 양쪽에서 모두 실행이 가능하도록 구현되었다.

SRC_DIR_DEV는 로컬환경에서 이미지와 CSV 파일이 위치한 위치이고, DES_DIR_DEV는 로컬환경에서 tfrecord 파일이 써지는 위치이다.

구글 클라우드에서 실행할 경우 파일 저장소를  GCS (Google Cloud Storage)를 사용한다. DES_BUCKET은 GCS 버킷 이름이다. 코드 실행전에 반드시 구글 클라우드 콘솔에서 GCS 버킷을 생성하기 바란다.  SRC_DIR_PRD와 DES_DIR_PRD는 GCS 버킷내의 각각 image,csv 파일의 경로와 tfrecord 파일이 써질 경로 이다. 이 경로에 맞춰서 구글 클라우드 콘솔에서 디렉토리를 먼저 생성해 놓기를 바란다.




PROJECT는 구글 클라우드 프로젝트 명이고, 마지막으로 DEV_MODE가 True이면 로컬에서 수행이되고 False이면 구글 클라우드에서 실행하도록 하는 환경 변수이다.

의존성 설정 부분

로컬에서 실행할 경우필요한  파이썬 라이브러리가 이미 설치되어야 있어야 한다.

만약에 구글 클라우드에서 실행할 경우 이 Apache Beam 코드가 사용하는 파이썬 모듈을 명시적으로 정의해놔야 한다. 클라우드에서 실행시에는 Apache Beam 코드만 업로드가 되기 때문에(의존성 라이브러리를 같이 업로드 하는 방법도 있는데, 이는 추후에 설명한다.), 의존성 라이브는 구글 클라우드에서 Dataflow 실행시 자동으로 설치할 수 있도록 할 수 있는데, 이를 위해서는 requirements.txt 파일에 사용하는 파이썬 모듈들을 정의해줘야 한다. 다음은 requirements.txt에 의존성이 있는 파이썬 모듈등을 정의하고 저장하는 부분이다.


Apache Beam 코드

Apache Beam의 코드 부분은 크게 복잡하지 않기 때문에 주요 부분만 설명하도록 한다.

Service account 설정

Apache Beam 코드를 구글 클라우드에서 실행하기 위해서는 코드 실행에 대한 권한을 줘야 한다. 구글 클라우드에서는 사용자가 아니라 애플리케이션에 권한을 부여하는 방법이 있는데, Service account라는 것을 사용한다. Service account는 json 파일로 실행 가능한 권한을 정의하고 있다.

Service account 파일을 생성하는 방법은 http://bcho.tistory.com/1166 를 참고하기 바란다.

Service account 파일이 생성되었으면, 이 파일을 적용해야 하는데 GOOGLE_APPLICATION_CREDENTIALS 환경 변수에 Service account  파일의 경로를 정의해주면 된다. 파이썬 환경에서 환경 변수를 설정하는 방법은 os.envorin[‘환경변수명']에 환경 변수 값을 지정해주면 된다.

Jobname 설정

구글 클라우드에서 Apache Beam 코드를 실행하면, 하나의 실행이 하나의 Job으로 생성되는데, 이 Job을 구별하기 위해서 Job 마다 ID 를 설정할 수 있다. 아래는 Job ID를 ‘cifar-10’+시간 형태로 지정하는 부분이다


환경 설정

Apache Beam 코드를 구글 클라우드에서 실행하기 위해서는 몇가지 환경을 지정해줘야 한다.


  • staging_location은 클라우드 상에서 실행시 Apache Beam 코드등이 저장되는 위치이다. GCS 버킷 아래 /staging이라는 디렉토리로 지정했는데, 실행 전에 반드시 버킷아래 디렉토리를 생성하기 바란다.

  • temp_location은 기타 실행중 필요한 파일이 저장되는 위치이다. 실행 전에 반드시 버킷아래 디렉토리를 생성하기 바란다.

  • zone은 dataflow worker가 실행되는 존으로 여기서는 asia-northeast1-c  (일본 리전의 c 존)으로 지정하였다.


DEV_MODE 에 따른 환경 설정

로컬 환경이나 클라우드 환경에서 실행이냐에 따라서 환경 변수 설정이 다소 달라져야 한다.


디렉토리 경로를 바꿔서 지정해야 하고, 중요한것은 RUNNER인데, 로컬에서 실행하기 위해서는 DirectRunner를 구글 클라우드 DataFlow 서비스를 사용하기 위해서는 DataflowRunner를 사용하면 된다.


readImage 부분

Read Image는 이미지 파일을 읽어서 byte[] 로 리턴하는 부분인데, 로컬 환경이냐, 클라우드 환경이냐에 따라서 동작 방식이 다소 다르다.

클라우드 환경에서는 이미지 파일이 GCS에 저장되어 있기 때문에 파이썬의 일반 파일 open 명령등을 사용할 수 없다.

그래서 클라우드 환경에서 동작할 경우에는 GCS에서 파일을 읽어서 Worker의 로컬 디스크에 복사를 해놓고 이미지를 읽어서 byte[]로 변환한 후에, 해당 파일을 지우는 방식을 사용한다.


아래 코드에서 보면 DEV_MODE가 False 인경우 GCS에서 파일을 읽어서 로컬에 저장하는 코드가 있다.


storageClient는 GCS 클라이언트이고 bucket 을 얻어온후, bucket에서 파일을 get_blob 명령어를 이용하여 경로를 저장하여 blob.download_to_file을 이용하여 로컬 파일에 저장하였다.

실행

코드 작성이 끝났으면 실행을 한다. 실행 상태는 구글 클라우드 콘솔의 Dataflow  메뉴에서 확인이 가능하다.

아래와 같이 실행중인 그리고 실행이 끝난 Job 리스트들이 출력된다.




코드 실행중에, 파이프라인 실행 상황 디테일을 Job 을 선택하면 볼 수 있다.


여기서 주목할만한 점은 우측 그래프인데, 우측 그래프는 Worker의 수를 나타낸다. 초기에 1대로 시작했다가 오토 스케일링에 의해서 9대 까지 증가한것을 볼 수 있다.

처음 실행이었기 때문에 적정한 인스턴스수를 몰랐기 때문에 디폴트로 1로 시작하고 오토스케일링을 하도록 했지만, 어느정도 테스트를 한후에 적정 인스턴수를 알면 오토 스케일링을 기다릴 필요없이 디폴트 인스턴스 수를 알면 처음부터 그 수만큼 인스턴스 수로 시작하도록 하면 실행 시간을 줄일 수 있다.

만약에 파이프라인 실행시 에러가 나면 우측 상단에 LOGS 버튼을 누르면 상세 로그를 볼 수 있다.


아래 그림은 파이프라인 실행이 실패한 예에서 STACK TRACES를 통해서 에러 내용을 확인하는 화면이다.



해당 로그를 클릭하면 Stack Driver (구글의 모니터링 툴)의 Error Reporting 시스템 화면으로 이동하게 된다.

여기서 디테일한 로그를 볼 수 있다.

아래 화면을 보면 ReadImage 단계에서 file_path라는 변수명을 찾을 수 없어서 나는 에러를 확인할 수 있다.


TFRecord 파일 검증

파이프라인 실행이 끝나면, GCS 버킷에 tfrecord 파일이 생성된것을 확인할 수 있다.


해당 파일을 클릭하면 다운로드 받을 수 있다.

노트북 아래 코드 부분이 TFRecord를 읽어서 확인하는 부분이다. 노트북에서 tfrecord 파일의 경로를 다운로드 받은 경로로 변경하고 실행을 하면 파일이 제대로 읽히는 지 확인할 수 있다.


파일 경로 부분은 코드상에서 다음과 같다.



정상적으로 실행이 된 경우, 다음과 같이 tfrecord에서 읽은 이미지와 라벨값이 출력됨을 확인할 수 있다.


라벨 값은 Label 줄에 values 부분에 출력된다. 위의 그림에서는 순서대로 라벨 값이 4와 2가 된다.



클러스터 상에서 하둡 배포 아키텍쳐


조대협 (http://bcho.tistory.com)


오늘 빅데이타 관련 교육을 받다가 클라우드 상에서 하둡 클러스터 활용에 대한 영감을 받은 부분이 있어서 정리해보고자 한다. 하둡의 경우에는 On-prem 환경에 적절하게 디자인이 된 오픈 소스라서, 이걸 클라우드에서 사용할 경우에도 on-prem에서 사용하는 형태와 유사하게 사용하는 경우가 많다. 일종의 습관 또는 관성이라고 해야 하나? 인프라가 바뀌면 그 장점에 맞는 아키텍쳐를 선택해야 하는데, 이 부분을 놓치고 있지 않았나 싶다.

Job별 클러스터를 생성하는 아키텍쳐

job을 수행하는 방법을 보면, 일반적으로 On-Prem에서 사용하는 방법은 하나의 하둡 클러스터에 Job을 실행하고 Job이 끝나면 다음 Job을 수행하는 방식을 사용한다.



이러한 경험(습관)때문인지 보통 클라우드 상에 하둡 클러스터를 만들어놓고 그 클러스터에 On-prem과 마찬가지 방식으로 Job을 넣고 그 작업이 끝나면 같은 클러스터에 Job을 넣는 방식을 사용한다.

그런데 클라우드의 특성상 자원이 거의 무한대인데, 여러개의 Job을 하나의 클러스터에서 순차적으로 실행할 필요가 있을까?


Job별로 클러스터를 생성해서 연산을 하고 Job이 끝나면 클러스터를 끄는 모델을 사용하면, 동시에 여러 작업을 빠르게 처리할 수 있지 않을까?



이 경우 문제가 될 수 있는 것이 같은 데이타셋을 가지고 여러가지 Job을 실행할 경우 기존의 On-Prem의 경우에는 동시에 같은 데이타셋에 접근할 경우 오버로드가 심하기 때문에, 어렵지만 클라우드의 경우에는 바로 HDFS를 사용하는 것보다는 GCS와 같은 클라우드 스토리지를 사용하는데, 여기서 오는 장점이 이런 클라우드 스토리지에 저장된 데이타는 동시에 여러 하둡 클러스터에서 접근이 가능하다.


이러한 아키텍쳐를 생각하게된 이유는 구글 클라우드의 특성 때문인데, 구글 클라우드의 과금 모델은 분당 과금 모델을 사용한다. 그래서 Job별로 클러스터를 전개했다가 작업이 끝난 다음에 끄더라도 비용 낭비가 없다. 시간당 과금인 경우에는 클러스터를 껐다켰다 하면 비용 낭비가 발생할 수 있기 때문에 바람직하지 않을 수 있다. 예를 들어서 1시간 10분이 걸리는 작업과 20분이 걸리는 작업을 각각 다른 클러스터에서 돌린다면 2시간, 그리고 1시간 비용이 과금되기 때문에 총 3시간이 과금된다.

그러나 구글의 분당 과금을 적용하면 총 1시간30분이 과금되기 때문에 비용이 1/2로 절감된다.


또한 구글 클라우드의 경우 하둡 클러스터 배포시간이 통상 90초이내로 빠르기때문에, Job마다 클러스터를 생성하고 끈다고 해도, 실행 시간이 크게 늘어나지 않는다.


클러스터별로 인프라를 최적화

만약에 위에처럼 Job별로 클러스터를 나눈다면, Job 의 특성에 맞춰서 클러스터의 인스턴스 수나 구조(?)를 변경하는 것도 가능하다.

즉 많은 컴퓨팅 작업이 필요한 Job이라면 클러스터에 많은 인스턴스를 넣고, 그렇지 않다면 인스턴스 수를 줄일 수 있다.




조금 더 나가면, 상태 정보 같이 HDFS에 저장하는 데이타가 적을 경우 워커노드가 중간에 정지되더라도 전체 연산에 크게 문제가 없을 경우에는 Preemptible VM과 같이 가격은 싸지만 저장 기능이 없는 VM의 수를 늘려서 전체 비용을 낮출 수 있다.




여기서 한걸음 더 나가면, 구글 클라우드의 인스턴스 타입중의 하나가 Cutomizable 인스턴스로 CPU와 메모리 수를 마음대로 조정할 수 있다. 즉 Job이 메모리를 많이 사용하는 Spark과 같은 Job일 경우 클러스터를 구성하는 인스턴스에 CPU보다 메모리를 상대적으로 많이 넣고 구성하는 것들이 가능하다.


아키텍쳐 설계도 의미가 있었지만, 그동안 간과하고 있었던 점이 기술이나 인프라가 바뀌면 활용 하는 방법도 다시 고민해야 하는데, 그동안의 관성 때문인지 기존 시스템을 활용하는 방법을 그대로 의심 없이 사용하려 했다는 것에 다시한번 생각할 필요가 있었던 주제였다.






데이타 플로우 프로그래밍 모델의 이해


조대협 (http://bcho.tistory.com)


앞의 글에서 스트리밍 프로세스의 개념과, 데이타 플로우의 스트리밍 처리 개념에 대해서 알아보았다. 그렇다면 실제로 이를 데이타 플로우를 이용해서 구현을 하기 위해서는 어떤 컴포넌트와 프로그래밍 모델을 사용하는지에 대해서 알아보자.


구글 데이타 플로우 프로그래밍 모델은 앞에서 설명한 바와 같이, 전체 데이타 파이프라인을 정의하는 Pipeline, 데이타를 저장하는 PCollections, 데이타를 외부 저장소에서 부터 읽거나 쓰는 Pipeline I/O, 그리고, 입력 데이타를 가공해서 출력해주는 Transforms , 총 4가지 컴포넌트로 구성이 되어 있다.


이번 글에서는 그 중에서 데이타를 가공하는  Transforms 컴포넌트들에 대해서 알아본다.

Transforms

Transforms는 데이타를 어떻게 가공하느냐에 따라서 다음 그림과 같이 세가지 형태로 분류된다.

Element-Wise

개별 데이타 엘리먼트를 단위로 연산을 수행하는 것을 Element-Wise Transform이라고 한다.

ParDo 함수가 이에 해당하며, 하나의 데이타를 입력 받아서, 연산을 한 후, 하나의 출력을 내보내는 연산이다.

ParDo 함수는 DoFn으로 정의된 연산 로직을 병렬로 여러개의 프로세스와 쓰레드에서 나눠서 처리를 해준다.


DoFn 클래스 는 다음과 같은 포맷으로 정의한다.


static class MyFunction extends DoFn<{입력 데이타 타입},{출력 데이타 타입}>{

     @Override

     public void processElement(ProcessContext c) {

       // do Something

     }

}


DoFn 클래스를 상속 받아서 클래스를 정의하고, 클래스 정의시 제네릭 타입 (Generic type)으로, 첫번째는 입력 데이타 타입을 두번째는 출력 데이타 타입을 정의한다.

그리고 processElement 함수를 오버라이드(Override)해서, 이 함수안에 연산 로직을 구현한다.

processElement는 ProcessContext를 인자로 받는데, 이 Context에는 입력 데이타를 포함하고 있다.

입력 데이타를 꺼내려면 c.element()라는 메서드를 이용하면, 입력 데이타 타입으로 정의된 입력 데이타를 꺼낼 수 있다.

데이타를 처리한 다음에 파이프라인상의 다음 컴포넌트로 데이타를 보내려면 c.output ({출력 데이타} ) 형태로 정의를 해주면 된다.


이렇게 정의된 DoFn함수는  ParDo를 이용하여 파이프라인 상에서 병렬로 실행될 수 있다. ParDo는, 파이프라인상에서 .apply 메서드를 이용하여 적용한다.


그러면 실제로 어떻게 적용이 되는지 다음 예제를 보자.

   p.apply(ParDo.named("toUpper").of(new DoFn<String, String>() {

     @Override

     public void processElement(ProcessContext c) {

       c.output(c.element().toUpperCase());

     }

   }))


String 인자를 입력으로 받아서, String 인자로 출력을 하는 DoFn 함수를 정의하였다.

processElement 부분에서, c.element()로 String 입력 값을 읽은 후, toUpperCase() 함수를 적용하여 대문자로 변환을 한 후, c.output을 이용하여 다음 파이프라인으로 출력 데이타를 넘겼다.


조금 더 디테일한 예제를 보자

p.apply(Create.of("key1,Hello", "key2,World","key1,hello","key3,boy","key4,hello","key2,girl"))

.apply(ParDo.named("Parse").of(new DoFn<String, KV<String,String>>() {

@Override

public void processElement(ProcessContext c) {

StringTokenizer st = new StringTokenizer(c.element(),",");

String key = st.nextToken();

String value = st.nextToken();


KV<String,String> outputValue =  KV.of(key,value);

c.output(outputValue);

}

}))

Create.of를 이용하여 “Key.Value” 문자열 형태로 데이타를 생성한 후, 이 문자열을 읽어서, DoFn<String,KV<String,String>> 클래스에서 이를 파싱하여, 문자열을 키밸류 데이타형인 KV 데이타 형으로 변환하여 리턴하는 예제이다. 아래 개념도를 보자


입력 값은 String 데이타 타입으로 “Key.Value”라는 형태의 문자열을 받는다.

DoFn에서 처리한 출력 값은 KV 형으로 KV 데이타 타입은 키와 값을 가지고 있는데, 키와 값의 타입도 제네릭 타입으로 키는 String, 값은 String 타입으로 정의하였다. 입력된 “Key.Value” 문자열은 “.” 전후로 분리가 되서, “.” 좌측은 키로, 우측은 값으로 해서, KV에 각각 들어간다.

processElement를 보면, c.element를 이용하여 String 문자열을 꺼낸 후, StringTokenizer를 이용하여 “.”을 분류 문자로 정의해서 파싱한다. 첫번째를 키로 저장하고, 두번째를 값으로 저장한다.

KV<String,String> outputValue =  KV.of(key,value)

를 이용하여, KV 객체를 생성한 후, c.output(outputValue); 을 이용하여 다음 파이프라인 컴포넌트로 값을 전달한다.


시스템내에서 수행되는 방법은 다음과 같이 된다. ParDo에 의해서 DoFn 클래스가 여러개의 워커에 의해서 분산 처리가된다.

어그리게이션(Aggregation)

어그리게이션 값을 특정 키 값을 이용하여 데이타를 그룹핑을 하는 개념이다.

이러한 어그리게이션 작업은 내부적으로 셔플링(Shuffling)이라는 개념을 이용해서 이루어지는데, 키별로 데이타를 모으거나 키별로 합산등의 계산을 하기 위해서는, 키별로 데이타를 모아서 특정 워커 프로세스로 보내야 한다.

ParDo를 이용하여 병렬 처리를 할 경우, 데이타가 키값에 상관 없이 여러 워커에 걸쳐서 분산되서 처리되기 때문에, 이를 재 정렬해야 하는데, 이 재 정렬 작업을 셔플링이라고 한다.


아래 그림을 보자, 파이프라인 상에서 첫번째 프로세스를 Worker 1과 Worker 2가 처리를 하였고, 결과는 Key1과  Key2를 키로 가지는 데이타라고 하자, 이를 어그리게이션 하면 아래 그림과 같이 Key1 데이타는 Worker 3로 모이고, Key 2 데이타는 Worker 4로 모인다. 이런 방식으로 셔플링이 이루어진다.


데이타 플로우의 어그리게이션 에는 특정 키별로 데이타를 묶어주는 Grouping과, 특정 키별로 데이타를 연산(합이나 평균을 내는)하는  Combine 두 가지가 있다.

Grouping

PCollection<KV<String, Integer>> wordsAndLines = ...;

에서 다음과 같이 String,Integer 페어로 KV 타입을 리턴한다고 하자.


 cat, 1
 dog, 5
 and, 1
 jump, 3
 tree, 2
 cat, 5
 dog, 2
 and, 2
 cat, 9
 and, 6
 ...


이를 다음과 같이 키에 따라서 밸류들을 그룹핑 하려면 GroupByKey를 사용해야 한다.


 cat, [1,5,9]
 dog, [5,2]
 and, [1,2,6]
 jump, [3]
 tree, [2]


PCollection<KV<String, Iterable<Integer>>> groupedWords = wordsAndLines.apply(

   GroupByKey.<String, Integer>create());


코드는 앞 단계에서 KV<String,Integer>로 들어온 wordLines 데이타를 GroupByKey를 이용하여 Key 단위로 그룹핑을 한후 이를 <String, Iterable<Integer>> 타입으로 리턴하는 Transformation이다.

<String, Iterable<Integer>>에서 앞의 String은 키가 되며, Iterable 다수의 값을 가지고 있는 밸류가 된다. 이 밸류는 Integer 타입으로 정의된다.

Combine

Grouping이 키 단위로 데이타를 묶어서 분류해주는 기능이라면, Combine은 키단위로 데이타를 묶어서 연산을 해주는 기능이다.

예를 들어 앞의 예제처럼, “cat”이라는 문자열 키로 된 데이타들이 [1,5,9] 가 있을때, 이에 대한 총합이나 평균등을 내는 것이 Combine 이다.


PCollection<Integer> pc = ...;

 PCollection<Integer> sum = pc.apply(

   Combine.globally(new Sum.SumIntegerFn()));


코드는 Integer로 들어오는 모든 값을 Combine에서 Sum 기능을 이용하여 모든 값을 더하는 코드이다.

전체 데이타에 대해서 적용하기 때문에, Combine.globally로 적용하였다.


아래와 같은 형태의 데이타가 있다고 가정하자. 키에 따라서 값이 그룹핑이 된 형태이다.

 cat, [1,5,9]
 dog, [5,2]
 and, [1,2,6]
 jump, [3]
 tree, [2]


PCollection<KV<String, Integer>> occurrences = ...;

 PCollection<KV<String, Iterable<Integer>>> grouped = pc.apply(GroupByKey.create());

 PCollection<KV<String, Integer>> firstOccurrences = pc.apply(

   Combine.groupedValues(new Min.MinIntegerFn()));


위의 데이타들이 PCollection<KV<String, Iterable<Integer>>> grouped

에 저장되었다고 할때, 각 키별로 최소값을 구하는 것을 Combine.groupedValue에서 Min을 호출하여 최소값을 구했다.


Transforms 컴포넌트의 기본적인 종류들을 알아보았다. 이외에도, 하나의 Transform 안에 여러개의 Transform을 집어 넣는 Composite Transform이나, 두개 이상의 데이타 스트림에서 데이타를 키에 따라 JOIN하는 기능들이 있는데, 이러한 고급 기능들은 뒤에 고급 프로그래밍 모델에서 설명하기로 한다.

PCollection

PCollection은 데이타 플로우 파이프라인 내에서 데이타를 저장하는 개념이다.

데이타는 이 PCollection 객체에 저장이되서, 파이프라인을 통해서 Transform으로 넘겨진다.

PCollection은 한번 생성이 되면, 그 데이타를 수정이 불가능하다. (데이타를 변경하거나 수정하기 위해서는 PCollection을 새로 생성해야 한다.)

Bounded & Unbounded PCollection

PCollection은 데이타의 종류에 따라 Bounded PCollection과 Unbounded PCollection 두가지로 나뉘어 진다.

Bounded PCollection

Bounded PCollection은 배치 처리 처럼, 데이타가 변화하지 않는 데이타로 파일이나, 업데이트가 더 이상 발생하지 않는 테이블등을 생각하면 된다.

TextIO,BigQueryIO,DataStoreIO등을 이용해서 데이타를 읽은 경우에는 Bounded PCollection으로 처리가 된다.

Bounded PCollection 데이타들은 배치 처리의 특성상 데이타를 한꺼번에 읽어서 처리한다.  

Unbounded PCollection

Unbounded PCollection은, 데이타가 계속 증가 하는 즉 흐르는 데이타를 표현한다. 트위터 타임 라인이나, 스마트 폰에서 서버로 올라오는 이벤트 로그등을 예로 들 수 있다.

이러한 Unbounded PCollection은 시간의 개념을 가지고 윈도우 처리등을 해야하기 때문에, PCollection 객체내에 타임 스탬프가 자동으로 붙는다.

UnBounded PCollection은 데이타를 BigQueryIO또는 Pub/Sub에서 읽을 때 생성된다.

특히 Unbounded PCollection에 Grouping이나, Combine등을 사용할 경우, 데이타가 파이프라인 상에서 언제 그룹핑된 데이타를 다음 Transform 컴포넌트로 넘겨야할지를 정의해야 하기 때문에, Window를 반드시 사용해야 한다.

데이타 타입

PCollection을 이용해서 정의할 수 있는 주요 데이타 타입은 다음과 같다.

일반 데이타 타입

PCollection<Integer> data

가장 기초적인 데이타 형으로, Integer,Float,String 등 자바의 일반 데이타 타입으로 정의되고 하나의 데이타 만을 저장한다.

KV 데이타 타입

PCollection< KV<String,Integer> key_value_data

키 밸류 데이타 타입으로, 키와 값으로 구성된다. 키와 값은 일반 데이타 타입과 같게, 자바의 일반 데이타 타입 사용이 가능하다.


PCollection<KV<String, Iterable<Integer>>>

키 밸류 데이타 타입중에 값에 여러개의 값을 넣을 수 있는 Iterable 타입이 있다.

앞의 Transform 예제에서 언급된것과 같이 키가 cat이고, 그 값이 2,6,7,9 와 같이 여러 값을 가지는 형이 이러한 타입에 해당한다.

커스텀 데이타 타입

단순한 데이타 타입 말고도, 복잡한 데이타 형을 처리하기 위해서  커스텀 데이타 타입을 지원한다.

커스텀 데이타 타입은 오픈 소스 Avro의 데이타 모델을 사용한다. http://avro.apache.org/


예를 들어 어떤 키에 대해서 카운트값,평균,총합 그리고 윈도우의 시작과 끝 시간을 저장하는 데이타 타입 Stat가 있다고 가정하자. 이 데이타 타입은 다음과 같이 정의된다.

자바에서 일반적인 Value Object 형태로 정의를 하면되고, 단 앞에 어노테이션으로 @DefaultCoder(AvroCoder.class) 와 같이 Avro 데이타 클래스임을 정의하면 된다.


@DefaultCoder(AvroCoder.class)

static class Stat{

Float sum;

Float avg;

Integer count;

Integer key;

Instant wStart; // windowStartTime

Instant wEnd; // windowEndTime

public Instant getwStart() {

return wStart;

}

public Instant getwEnd() {

return wEnd;

}


public Float getSum() {

return sum;

}

public Float getAvg() {

return avg;

}

public Integer getCount() {

return count;

}


public Integer getKey(){

return key;

}


public Stat(){}

public Stat(Integer k,Instant start,Instant end,Integer c,Float s,Float a){

this.key = k;

this.count = c;

this.sum = s;

this.avg = a;

this.wStart = start;

this.wEnd = end;

}


}



윈도우

스트리밍 데이타 처리를 할때 가장 중요한 개념이 윈도우이다.

특히나  Unbounded 데이타를 이용한 스트리밍 처리에서 Grouping이나 Combine 시에는 반드시 윈도우를 사용해야 한다.Grouping이나 Combine과 같은 aggregation을 하지 않으면, Unbounded 데이타라도 윈도우 처리가 필요없다.

또한 Bounded 데이타도, 데이타에 타임 스탬프를 추가하면 윈도우를 사용하여, 시간대별 데이타를 처리할 수 있다.

예를 들어 일일 배치를 돌리는 구매 로그가 있을때, 각 데이타의 구매 시간이 있으면, 이 구매시간을 타임 스탬프로 지정하여 배치라도 윈도우 단위로 연산을 할 수 있다.

고정 윈도우 적용

윈도우를 적용하는 방법은 정말 간단하다.

PCollection 객체에 apply 메소드를 이용하여 Window.into 메서드를 이용하여 적용하면된다.

예를 들어서 아래와 같이 PCollection<String> 형 데이타인 items 객체가 있을때, 여기에 윈도우를 적용하려면 다음과 같이 하면 된다.

 PCollection<String> items = ...;

 PCollection<String> fixed_windowed_items = items.apply(

   Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES)));

items.apply를 이용하여 윈도우를 적용하는데, 데이타 타입이 String이기 때문에, Window.<String>into 로 윈도우를 적용하였다.

윈도우 타입은 Fixed 윈도우이기 때문에, FixedWindows.of를 사용하였고, 윈도우 주기는 1분주기라서 1,TimeUnit.MINUTES를 적용하였다.

슬라이딩  윈도우 적용

슬라이딩 윈도우는 윈도우의 크기 (Duration)과 주기를 인자로 넘겨야 한다.

아래 코드는 5초 주기로 30분 크기의 윈도우를 생성하는 예제이다.

 PCollection<String> items = ...;

 PCollection<String> sliding_windowed_items = items.apply(    Window.<String>into(SlidingWindows.of(Duration.standardMinutes(30)).every(Duration.standardSeconds(5))));

윈도우가 5초마다 생성이되고, 각 윈도우는 30분 단위의 크기를 가지고 있게 된다.

세션 윈도우 적용

세션 윈도우는 HTTP 세션 처럼 특정 사용자가 일정 시간동안 데이타가 올라오지 않으면, 처음 데이타가 올라온 시간 부터 데이타가 올라오지 않은 시간 까지를 윈도우르 묶어주는 개념이다.

앞의 고정 윈도우나, 세션 윈도우와는 다르게 반드시 키별로 세션을 묶기 때문에 키가 필요하다.


아래 예제는 각 사용자 별로 세션당 점수를 계산해주는 예제이다.

userEvents

 .apply(Window.named("WindowIntoSessions")

       .<KV<String, Integer>>into(

             Sessions.withGapDuration(Duration.standardMinutes(Duration.standardMinutes(10))))

   .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))

 // For this use, we care only about the existence of the session, not any particular

 // information aggregated over it, so the following is an efficient way to do that.

 .apply(Combine.perKey(x -> 0))

 // Get the duration per session.

 .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))



다른 윈도우들과 마찬가지로 Window.into를 이용하여 윈도우를 적용하는데, 데이타 형을 잘 보면 <KV<String, Integer>> 형으로 정의된것을 확인할 수 있다.

Sessions.withGapDuration으로 세션 윈도우를 정의한다. 이때 얼마간의 시간 동안 데이타가 없으면 세션으로 짜를지를 지정해줘야 하는데, Duration.standardMinutes(10) 를 이용하여 10분간 해당 사용자에 대해서 데이타가 없으면 해당 사용자(키)에 대해서 세션 윈도우를 자르도록 하였다.

윈도우 시간 조회하기

윈도우를 사용하다보면, 이 윈도우의 시작과 종료 시간이 필요할때가 있다. 예를 들어 고정 크기 윈도우를 적용한다음에, 이를 데이타 베이스등에 저장할때, 이 윈도우가 언제시작해서 언제끝난 윈도우인지를 조회하여 윈도우 시작 시간과 함께 값을 저장하고자 하는 케이스이다. “1시에 몇건, 1시 10분에 몇건의 데이타가 저장되었다”와 같은 시나리오이다.


현재 윈도우에 대한 정보를 얻으려면 DoFn 클래스를 구현할때, com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess 인터페이스를 implement 해야, 윈도우에 대한 정보를 억세스 할 수 있다.


static class StaticsDoFn extends DoFn<KV<Integer,Iterable<Twit>>, KV<Integer,Stat>>

implements com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess

{

@Override

public void processElement(ProcessContext c) {

:


IntervalWindow w = (IntervalWindow) c.window();

Instant s = w.start();

Instant e = w.end();

DateTime sTime = s.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTime eTime = e.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTimeFormatter dtf = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss");

String str_stime = sTime.toString(dtf);

String str_etime = eTime.toString(dtf);

                                      :


윈도우에 대한 정보는 ProcessContext c에서, c.window()를 호출하면, IntervalWindow라는 클래스로 윈도우에 대한 정보를 보내주고, 윈도우의 시작 시간과 종료 시간은 IntervalWindow 클래스의 start()와 end() 메서드를 이용해서 조회할 수 있다. 이 조회된 윈도우의 시작과 종료 시간은 org.joda.time.Instant 타입으로 리턴되는데, 조금 더 친숙한 DateTime 포맷으로 변환을 하려면, Instant.toDate() 메서드를 사용하면 되고, 이때, TimeZone 을 지정해주면 로컬 타임으로 변환을 하여, 윈도우의 시작과 종료시간을 조회할 수 있다.

타임 스탬프

윈도우는 시간을 기준으로 처리를 하기 때문에, 이 시간을 정의하는 타임스탬프를 어떻게 다루는지를 이해할 필요가 있다.

타임 스탬프 생성

Pub/Sub을 이용하여 unbounded 데이타를 읽을 경우 타임스탬프가 Pub/Sub에 데이타가 들어간 시간을 Event time으로 하여, PCollection에 추가된다.

타임 스탬프 지정하기

Pub/Sub에서와 같이 자동으로 타임 스템프를 부여하는게 아니라, 모바일 디바이스에서 발생한 이벤트 타임이나, 애플리케이션 적으로 PCollection에 직접 타임스탬프를 부여할 수 있다.

PCollection에 타임 스탬프를 부여 하는 방법은 간단하다.

DoFn내에서,  ProcessContext를 다음 파이프라인 컴포넌트로 보낼때,  c.output(데이타) 대신, c.output(데이타, 타임 스탬프)를 사용하면 된다. 타임 스탬프의 데이타 타입은  org.joda.time.Instant 를 사용한다.

예를 들어서

c.outputWithTimestamp(c.element(), logTimeStamp);

와 같은 방법으로 사용을 한다.


모바일 서비스 분석등, 실제 시간에 근접한 분석을 하려면, 로그를 수집할때, 이벤트 발생 시간을 같이 REST API등을 통해서 수집하고, outputWithTimestamp를 이용하여, 수집된 이벤트 발생시간을  PCollection에 추가하는 방식으로 타임 스탬프를 반영 하는 것이 좋다.








데이타 플로우 개발환경 설정하기


조대협 (http://bcho.tistory.com)


데이타 플로우에 대한 이해가 끝났으면 이제 직접 코딩을 해보자. 데이타 플로우에 대한 개념등은 http://bcho.tistory.com/search/dataflow 를 참고하기 바란다.

데이타 플로우에서 지원하는 프로그래밍 언어는 자바와 파이썬이다. 파이썬은 아직 알파버전으로, 이 글에서는 자바를 이용해서 설명한다.


자바를 이용한 개발환경 설정은 이클립스 개발환경과 maven을 이용한 개발 환경 두가지가 있는데, 여기서는 조금 더 손 쉬운 이클립스 환경을 기준으로 설명한다.

메이븐 기반의 개발 환경 설정은 https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven 를 참고하기 바란다.


사전준비

클라우드 계정 생성 및 빌링 설정

구글 클라우드 계정 생성 및 빌링 설정 방법은 앞서 다른글에서도 많이 설명하였기 때문에 다시 설명하지 않는다. 자세한 내용은 http://bcho.tistory.com/1107 를 참고하기 바란다.

API 사용 설정하기

다음 데이타플로우와 기타 같이 사용할 제품들의 API를 사용하기 위해서 이를 설정해줘야 한다.

구글 클라우드 콘솔에서 API Manager를 선택한후 대쉬 보드에서 아래 서비스들을 선택하여 API를 Enable 해준다. Cloud Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, and Cloud Datastore APIs.





구글 Cloud SDK 설정

구글 데이타 플로우를 프로그래밍 하기 위해서, 데이타 플로우 API를 호출하기 위한 SDK와 조작을 위한 CLI (Command Line Interface)가 필요한데, 이는 구글 Cloud SDK를 설치하면 같이 설치가 된다.

클라우드 SDK 설치는 https://cloud.google.com/sdk/docs/ 를 참고하면 된다.

gcloud 인증하기

구글 Cloud SDK 설치가 끝났으면, gcloud 명령어를 사용하기 위해서 gcloud 명령어를 초기화 한다.

초기화는 어떤 구글 클라우드 프로젝트를 사용할것인지, 그리고 사용자 아이디등으로 인증을 하는 절차를 거친다.

프롬프트 상에서

%gcloud init

명령을 실행하여, 수행한다.

이클립스 환경 설정

이제 구글 클라우드 프로젝트 설정과, 이를 호출하기 위한 SDK 환경 설치가 끝났다. 이제 이클립스 기반의 개발 환경을 설정해보자.

이클립스 설치하기

이클립스는 4.4 버전 이상을 설치하고, JDK는 1.7 이상을 설정한다.

플러그인 설치하기

다음 구글 데이타 플로우 개발환경을 위한 이클립스 플러그인을 설치한다.

이클립스에서 Help > Install New Software를 선택한 다음에, Work with 텍스트 박스에  https://dl.google.com/dataflow/eclipse/  을 입력한다.


다음으로 Google Cloud Dataflow를 선택하여 설치를 진행한다.

설치가 끝난 후 확인은 이클립스에서 New > Project를 하면, 위자드를 선택하는 화면에서 아래와 같이 Google Cloud Platform이라는 폴더와 함께 그 안에 “Cloud Dataflow Java Project”를 선택할 수 있는 화면이 나온것을 볼 수 있다.



헬로우 데이타 플로우

개발 환경 설정이 끝났으니, 이제 간단한 데이타 플로우 프로그램을 하나 만들어보자.

이 프로그램은 단어들을 읽어드린 후에, 단어들의 발생 횟수를 카운트 해 주는 파이프라인이다.



단어들을 읽어드린 후 toUpper라는 트랜스폼에서, 각 단어들을 대문자로 변환한 후, Count라는 트랜스폼에서 단어별로 발생횔 수를 카운트 한후에, 이를 Key Value (단어:발생횟수)로 리턴한 후, Print라는 트랜스폼에서 화면으로 결과를 출력해주는 예제이다.


프로젝트 생성

예제 파이프라인을 만들기 위해서, 이클립스에서 프로젝트를 생성해보자. New > Project를 선택한 후 에, 아래 그림과 같이 Google Cloud Platform 폴더에서 Cloud Dataflow Java Project를 선택한다



다음 프로젝트에 대해서  Group ID, Artifact ID 그리고 패키지 명등을 입력한다.



다음 메뉴로 넘어가면 구글 데이타 플로우를 실행하기 위한 디테일한 정보를 넣어야 하는데,




프로젝트 명과, “Cloud Storage Staging Location”이라는 정보를 입력해야 한다. Cloud Storage Staging Location은 Google Cloud Storage 의 버킷명으로, 데이타 플로우 애플리케이션 코드가 로딩 되는 장소이다.

데이타플로우 애플리케이션을 구글 클라우드에서 실행하게 되면, 애플리케이션 코드와 애플리케이션을 실행하기 위한 라이브러리들이 각각의 워커 노드로 배포 되는데, 배포를 위해서 먼저 클라이언트에서 부터, 이러한 실행 코드를 Google Cloud Storage에 올려놓게 된다. 앞에서 정의하는 “Cloud Storage Staging Location”은, 이 클라우드 스토리지 버킷에 대한 경로 정의이다.

클라우드 스토리지 버킷은 아래와 같인 Google Cloud Storage 메뉴에서 아래와 같이 생성할 수 있다.


코드 제작

그러면 코드를 작성해 보자.



package com.terry.df;


import com.google.cloud.dataflow.sdk.Pipeline;

import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

import com.google.cloud.dataflow.sdk.transforms.Count;

import com.google.cloud.dataflow.sdk.transforms.Create;

import com.google.cloud.dataflow.sdk.transforms.DoFn;

import com.google.cloud.dataflow.sdk.transforms.ParDo;

import com.google.cloud.dataflow.sdk.transforms.DoFn.ProcessContext;

import com.google.cloud.dataflow.sdk.values.KV;


import org.slf4j.Logger;

import org.slf4j.LoggerFactory;


public class StarterPipeline {

 private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);


 public static void main(String[] args) {

   Pipeline p = Pipeline.create(

       PipelineOptionsFactory.fromArgs(args).withValidation().create());


   p.apply(Create.of("Hello", "World","hello","boy","hello","girl"))

   .apply(ParDo.named("toUpper").of(new DoFn<String, String>() {

     @Override

     public void processElement(ProcessContext c) {

       c.output(c.element().toUpperCase());

     }

   }))

   .apply(Count.<String>perElement())

   .apply(ParDo.named("Print").of(new DoFn<KV<String,Long>, Void>(){

@Override

public void processElement(ProcessContext c) throws Exception {

LOG.info(c.element().getKey() + " count:"+c.element().getValue());

}

   }));


   p.run();

 }

}



(참고 : 위의 소스코드는 https://github.com/bwcho75/googledataflow/tree/master/HelloDataFlow 에 있다.)


처음 p.apply(Create.of…)에서, 데이타를 생성하였다.

다음으로 .apply(ParDo.named("toUpper").of(new DoFn<String, String>() 에서 소문자를 대문자로 다 치완하는 데, ParDo는 이 작업을 여러 노드에서 병렬로 실행하겠다는 선언이고, named는 이 트랜스폼의 이름을 “toUpper”로 정의하겠다는 정의이다. (나중에 디버깅에 유용한다.) 다음으로, 트랜스폼 함수는 DoFn으로 정의했는데, <String,String>으로 정의되어 앞의 인자가 Input 그리고 뒤의 인자가 Output의 데이타 형으로 String 인자를 받아서, String 인자로 리턴하겠다는 것이다.


.apply(Count.<String>perElement()) 은 데이타플로우에서 미리 정의된, 트랜스폼으로,  <String>으로 된 데이타를 받아서 엘리먼트당 카운트를 해서 <String,Long> 형으로 리턴을 해준다. 즉 String형의 단어마다 카운트를 한 결과를 Long형으로 넣어서 이를 키밸류(KV)형식으로 묶어서 리턴해준다.

.apply(ParDo.named("Print").of(new DoFn<KV<String,Long>, Void>() 에서는 앞에서 전달해준  String,Long형이 키밸류형으로 정의된 KV<String,Long>형의 데이타를 받아서, 출력해주고, 마지막 트랜스폼이기 때문에 더 이상 뒤로 데이타를 넘기지 않을 것이기 때문에, Output의 인지 타입을 Void로 선언하였다.

실행

코드를 작성이 끝났으면 실제로 실행해보자 Run As에서 Dataflow Pipeline을 선택하면 실행을 할 수 있다.



이때 다음과 같이 실행환경을 설정할 수 있다.



여기서 Runner에 대한 개념을 짚고 넘어갈 필요가 있다.

Direct Pipeline Runner

Direct Pipeline Runner는 데이타플로우 코드를 로컬 개발 환경 (노트북이나 데스크탑)에서 실행하고자 할때 선택할 수 있는 러너이다. 주로 개발이나 테스트에서 사용할 수 있는데, 다른 클라우드 서비스 예를 들어  Pub/Sub이나 빅쿼리등이랑 연동이 되는 파이프라인의 경우에는 DirectPipelineRunner를 사용할 수 없으니 주의하기 바란다.

DataflowPipelineRunner

클라우드 환경에서 데이타 플로우를 실행하기 위해서는 DataflowPipelineRunner와  BlockingDataflowPipelineRunner 두 가지가 있다.

DataflowPipelineRunner는 데이타 플로우 애플리케이션을 구글 클라우드에서 실행을 해주는데, 데이타 플로우 잡을 클라우드에서 실행해놓고, 로컬 애플리케이션을 바로 종료 한다. (클라우드에 접수된 잡은 클라우드에서 계속 실행된다.)

BlockingDataflowPipelineRunner

BlockingDataflowPipelineRunner는 데이타 플로우잡을 구글 클라우드에서 실행해놓게 해놓고, 잡이 끝날때 까지 로컬 애플리케이션을 대기하도록 한다.  

배치와 같이 끝이 있는 경우에는 필요에 따라서 사용할 수 있다. 스트리밍의 경우 BlockingDataflowPipelineRunner를 사용하게 되면 스트리밍 잡을 명시적으로 끊지 않는 이상 계속해서 로컬 애플리케이션이 실행되는 상태가 된다.


DirectPipelineRunner로 실행을 해보면 다음과 같이 이클립스 콘솔에서 결과가 출력되는 것을 볼 수 있다.


BODY는 1,  GIRL 은 1, HELLO는 3개 그리고 WORLD는 1개가 출력되는 것을 볼 수 있다.


이번에는 클라우드에 배포를 하고 실행해보자, Run As에서, BlockingDataflowPipelineRunner를 선택하여 실행해보자.

실행을 하면 코드가 자동으로 클라우드로 배포 되서 실행되는 것을 확인할 수 있다. 구글 클라우드 콘솔의 데이타 플로우 메뉴를 보면, 새로운 잡이 생성된것을 확인할 수 있다.


해당 잡을 선택해서 들어가 보면 현재 잡의 실행 상황과 함께, 파이프라인에서 단계별 시간이나 기타 상세한 로그를 볼 수 있다.



데이타 플로우 애플리케이션이 기동이 완료되면, Logs 메뉴에서 Worker Logs라는 버튼을 누르면 각 워커 노드에서의 로그를 볼 수 있다.


Worker Logs를 누르면 다음과 같이  GIRL,WORLD,BOY,HELLO에 대한 count 수를 출력한 로그를 확인할 수 있다.


참고 : Logs 메뉴로 들어가서  Job Logs에서  Minimum serverity를 “All” 로 선택하면 전체 작업 상황을 알 수 있는데, 애플리케이션을 실행했다고 바로 데이타 플로우의 파이프라인이 실행되는 것이 아니라, 애플리케이션 코드가 구글 클라우드 스토리에 로드되고, 이 로드된 코드들이 각각의 워커 노드에 배포가 된후에, 워커 노드가 기동이 되야 잡이 실제로 수행된다.


워커가 제대로 기동되었는지는 Job Logs에서 Mimimum serverity를 All로 한후에 다음과 같이 “Worker have started successfully”라는 로그가 나오면 그때 부터 데이타 플로우 잡을 실행을 시작한다고 생각하면 된다.








데이타 스트리밍 분석 플랫폼 Dataflow 개념 잡기 #2/2

(트리거, 이벤트 타임, 워터마크 개념)


조대협 (http://bcho.tistory.com)


앞글 http://bcho.tistory.com/1122 에 의해서 Dataflow에 대한 개념에 대해서 계속 알아보자

트리거

윈도우와 더블어서 Dataflow 프로그래밍 개념중에서 유용한 개념중의 하나가 트리거이다. 트리거는 처리중인 데이타를 언제 다음 단계로 넘길지를 결정하는 개념이다. 특히 윈도우의 개념과 같이 생각하면 좋은데, 윈도우는 일반적으로 윈도우가 종료되는 시간에 그 데이타를 다음 Transform으로 넘기게 된다.


그런데 이런 의문이 생길 수 있다. “윈도우의 크기가 클때 (예를 들어 한시간), 한시간을 기다려야 데이타를 볼 수 있는 것인가? 그렇다면 한 시간 후에 결과를 본다면 이것을 실시간 분석이라고 할 수 있는가?”

그래서 여기서 트리거의 개념이 나온다.

예를 들어 한시간 윈도우가 있더라도, 윈도우가 끝나지 않더라도 현재 계산 값을 다음 Transform으로 넘겨서결과를 볼 수 있는 개념이다. 1분 단위로 트리거를 걸면 1분 결과를 저장하고, 2분째도 결과를 저장하고, 3분째도…. 60분째에도 매번 결과를 업데이트 함으로써, 윈도우가 종료되기 전에도 실시간으로 결과를 업데이트 할 수 있게 된다.


트리거의 종류

그렇다면 이러한 트리거는 앞에서 언급한 시간 단위의 트리거만 있을까? Dataflow는 상당히 여러 종류의 트리거를 지원한다.


  • Time trigger (시간 기반 트리거) : 시간 기반 트리거는 일정 시간 주기로 트리거링을 해주는 트리거 이다. 1분 단위, 1초 단위 같이 일정 주기를 지정하거나, “윈도우 시작후 2분후 한번과 윈도우 종료후 한번"과 같이 절대적인 시간을 기준으로도 정의가 가능하다.

  • Element Count (데이타 개수 기반 트리거) : 다음은 개수 기반인데, 예를 들어 “어떤 데이타가 100번 이상 들어오면 한번 트리거링을 해라” 또는 “매번 데이타가 100개씩 들어올때 마다 트리거링을 해라" 라는 형태로 정의가 가능하다.

  • Punctuations  (이벤트 기반 트리거) : Punctuations는 엄밀하게 번역하면 “구두점" 이라는 의미인데, 구두점 처럼 특정 데이타가 들어오는 순간에, 트리거링을 하는 방법이다.

트리거 조합

이러한 트리거는 하나의 트리거 뿐 아니라, 여러개의 트리거를 동시에 조합하여 사용이 가능하다.

  • AND : AND 조건으로 두개의 트리거의 조건이 만족해야 트리거링이 된다. 예를 들어, Time Trigger가 1분이고, Element Count 트리거가 100개이면, 윈도우가 시작된 1분 후에, Element Count가 100개가 되면 트리거링이 된다.

  • OR : OR 조건으로 두개의 트리거의 조건 중 하나만 만족하면 트리거링이 된다.

  • Repeat : Repeat는 트리거를 반복적으로 수행한다. Element Count 트리거 10개를 반복으로 수행하면, 매 10개 마다 트리거링이 된다. Time 트리거를 1분 단위로 반복하면, 매 1분 마다 트리거링이 된다.

  • Sequence : Sequence 트리거는 등록된 트리거를 순차적으로 실행한다. Time 트리거 1분을 걸고 Element count 트리거 100개를 걸면, 윈도우 시작후 1분 후 트리거링인 된후, 그 후 부터 Element 가 100개 들어오면 두번째 트리거링이 발생하고 트리거링이 종료 된다.


트리거 결과의 누적

그러면 트리거링이 될때 마다 전달 되는 데이타는 어떻게 될까라는 질문이 나올 수 있는데. 무슨 이야기인가 하면 윈도우 내에서 트리거가 발생할때, 이전 데이타에 대한 처리를 어떻게 할것인가이다.


데이타가 A,B,C,D,E,F 가 들어왔다고 가정하자. 트리거가 C 다음 발생했다고 했을때, 윈도우가 끝난 F에는 어떤 값이 리턴이 될까?

첫번째 트리거링에는 당연히 A,B,C 가 전달된다.

윈도우가 끝나면 A,B,C,D,E,F 가 전달되는 것이 맞을까 아니면 트리거링 된 이후의 값인 D,E,F 만 전달되는 것이 맞을까?

맞는 건 없고, 옵션으로 지정이 가능하다.

  • Accumulating
    Accumulating은 트리거링을 할때 마다 윈도우 내에서 그때까지의 값을 모두 리턴한다.

  • Discarding
    트리거링 한 후에, 이전 값은 더이상 리턴하지 않고, 그 이후 부터 다음 트리거링 할때까지의 값만을 리턴한다.

예를 들어서 보자


다음과 같은 윈도우가 있고, 3번, 23번, 10번에서 트리거링이 된다고 했을때,

Accumulating mode의 경우

  • 첫번째 트리거 후 : [5,8,3]

  • 두번째 트리거 후 : [5,8,3,15,19,23]

  • 세번째 트리거 후 : [5,8,3,15,19,239,13,10]

와 같이 값이 반환되고

Discarding mode의 경우

  • 첫번째 트리거 후 [5,8,3]

  • 두번째 트리거 후 [15,19,23]

  • 세번째 트리거 후 [9,13,10]

이 반환된다.

데이타 지연에 대한 처리 방법

실시간 데이타 분석은 특성상 데이타의 전달 시간이 중요한데, 데이타는 모바일 클라이언트 등에서 인터넷을 통해서 데이타가 서버로 전송되는 경우가 많기 때문에, 데이타의 실제 도달 시간이 들쭉날쭉 하다. 이러다 보니 데이타의 도착 순서나 지연등이 발생하는데, 이에 대한 처리가 필요하다. 먼저 데이타 도달 시간의 개념을 이해하려면, 이벤트 타임과 프로세싱 타임의 개념을 먼저 이해해야 한다.

이벤트 타임과 프로세싱 타임

모바일 단말에서 다음과 같이 A,B,C,D의 데이타를 1시1초, 1시2초,3초,5초에 보냈다고 하자.


서버에 도착해서 Dataflow에 도착하는 시간은 물리적으로 서버와 단말간의 거리 차이가 있기 때문에 도착 시간은 단말에서 데이타가 발생한 시간보다 느리게 되며, 또한 각 단말의 위치나 단말이 연결되어 있는 네트워크 상황이 다르기 때문에 순차적으로 도착하는 것이 아니라, 늦게 보낸 데이타가 더 빨리 도착할 수 도 있다.

아래 그림을 보면 A데이타는 1시1초에 단말에서 생성되었지만 서버에 도착한 시간은 1시2초가 된다. C,D의 경우, 순서가 바뀌어서 도착하였다.



이렇게 실제로 데이타가 발생한 시간을 이벤트 타임, 그리고 서버에 데이타가 도착한 시간을 프로세싱 타임이라고 정의한다.


이 프로세싱 타임은 네트워크 상황이나 데이타에 크기에 따라 가변적으로 변하기 때문에, 이벤트 타임과 프로세싱 타임의 상관 관계를 그래프로 표현해보면 다음과 같아진다.


가장 이상적인 결과는 이벤트 타임과 프로세싱 타임이 동일한 것이겠지만 불가능하고, 위의 그림처럼 이벤트 타임보다 프로세싱 타임이 항상 늦게 되고, 이벤트 타임과 프로세싱 타임의 차이는 매 순간 다르게 된다.

워터 마크 (Water Mark)

이렇게 위의 그림과 같이 실제 데이타가 시스템에 도착하는 시간을 예측 하게 되는데, 이를 워터 마크라고 한다. 위의 그림에서 “실제 처리 그래프"로 표시되는 부분을 워터마크라고 생각하면 된다. 이 예측된 시간을 기반으로 윈도우의 시스템상의 시작 시간과 종료 시간을 예측 하게 된다.

지연 데이타 처리 방법

윈도우 처리 관련해서, 실제 발생한 시간과 도착 시간이 달라서, 처리 시간내에 못 들어오는 경우가 발생할 수 있다. 아래 그림을 보면, 실제 윈도우는 1시1초~1시6초까지의 데이타를 처리하기를 바라고 정의했을 수 있는데, 시스템에서는 이 윈도우의 값이 프로세싱 타임 기준으로 (워터 마크를 기준으로 연산함) 1시2초~1시6초에 도착하기를 기대하고 있는데, 데이타 C의 경우에는 기대했던 프로세싱 타임에 도착하지 않았기 때문에 이 데이타는 연산에서 누락될 수 있다.



비단 늦게 도착한 데이타 뿐만 아니라, 시스템이 예측한 프로세싱 타임 보다 일찍 데이타가 도착할 수 있는데, 이런 조기 도착한 데이타와 지연 도착한 데이타에 대한 처리는 어떻게 해야 할까?

Dataflow에서는 이런 조기 도착이나 지연 데이타에 대한 처리 메카니즘을 제공한다.

윈도우를 생성할때, withAllowedLateness라는 메서드를 사용하면, 늦게 도착하는 데이타에 대한 처리 기간을 정의할 수 있다.


PCollection<String> items = ...;

 PCollection<String> fixed_windowed_items = items.apply(

   Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES))

         .withAllowedLateness(Duration.standardDays(2)));

https://cloud.google.com/dataflow/model/windowing#managing-time-skew-and-late-data


위의 예제는 1분 단위의 Fixed Window를 정의하고, 최대 2일까지 지연 도착한 데이타 까지 처리할 수 있도록 정의한 예제이다.


지금까지 간단하게 dataflow를 이용한 스트리밍 데이타 처리의 개념에 대해서 알아보았다.

데이타 스트리밍 분석 플랫폼 Dataflow 개념 잡기 #1/2


조대협 (http://bcho.tistory.com)


실시간 데이타 처리에서는 들어오는 데이타를 바로 읽어서 처리 하는 스트리밍 프레임웍이 대세인데, 대표적인 프레임웍으로는 Aapche Spark등을 들 수 있다. 구글의 DataFlow는 구글 내부의 스트리밍 프레임웍을 Apache Beam이라는 형태의 오픈소스로 공개하고 이를 실행하기 위한 런타임을 구글 클라우드의 DataFlow라는 이름으로 제공하고 있는 서비스이다.


스트리밍 프레임웍 중에서 Apache Spark 보다 한 단계 앞선 개념을 가지고 있는 다음 세대의 스트리밍 프레임웍으로 생각할 수 있다. Apache Flink 역시 유사한 개념을 가지면서 Apache Spark의 다음 세대로 소개 되는데, 이번글에서는 이 DataFlow에 대한 전체적인 개념과 프로그래밍 모델등에 대해서 설명하고자 한다. 스트리밍 데이타 처리에 대한 개념은 http://bcho.tistory.com/1119 글을 참고하기 바란다.

개념 소개

dataflow에 대해서 이해하기 위해서 프로그래밍 모델을 먼저 이해해야 하는데, dataflow의 프로그래밍 모델은 얼마전에 Apache에 Beam이라는 오픈 소스 프로젝트로 기증 되었다. Apache Spark이나, Apache Flink와 유사한 스트리밍 처리 프레임웍이라고 생각하면 된다. dataflow는 이 Apache beam의 프로그래밍 모델을 실행할 수 있는 런타임 엔진이라고 생각하면 된다. 예를 들어 Apache beam으로 짠 코드를 Servlet이나 Spring 코드라고 생각하면, dataflow는 이를 실행하기 위한 Tomcat,Jetty,JBoss와 같은 런타임의 개념이다.


먼저 dataflow의 개념을 이해해보도록 하자. 아래 그림은 dataflow에 대한 컨셉이다.


데이타가 들어오면, Pipeline IO에서 데이타를 읽어드린다. 읽어드린 데이타는 PCollection이라는 데이타 형으로 생성이 되고, 이 PCollection 데이타는 여러개의 중첩된 PTransform을 통해서 변환 및 가공이 된다. 가공이 끝난 결과는 마지막으로 Pipeline IO의 Output을 통해서 데이타 저장소 (빅쿼리나 파일등)에 저장이 된다.  이 Pipeline IO에서 부터 PTransform을 걸친 일련의 프로세싱 과정을 Pipeline이라고 한다.


예를 들어 설명해보자, 문자열을 입력 받은 후에, 문자열에서 단어를 추출하여, 각 단어의 개수를 세어 주는 파이프라인이 있다고 하자.


첫번째 실행에서 “Hello my daddy”라는 문자열이 입력되었다. 첫번째 Transform인 Extract words Transform을 거치면서, “Hello my daddy” 라는 문자열은 “Hello”, “my”, “daddy” 라는 각각의 단어로 쪼게진다. 다음으로 Count Element 라는 Transform에 의해서, 각 단어의 수를 세어서 저장한다. “Hello”는 1번, “my”는 1번, “daddy”는 1번 의 값이 저장된다.


두번째 실행에서 “Hello my bro” 라는 문자열이 들어오면, Extract words 에 의해서 “Hello”, “my”, “bro”라는 각각의 단어로 쪼게지고, Count Element Transform에서 이전에 세어놓은 단어의 수와 합산하여 계산이 된 결과가 저장이 된다. “Hello”는 이전에 한번 카운트가 되었고 이번에도 들어왔기 때문에, 2가 되고, 같은 원리로 “my”라는 단어의 카운트도 2가된다. “bro” 라는 단어는 이번에 처음 들어왔기 때문에 새 값으로 1로 저장된다.




세번째 “Hello my mom” 이라는 문자열이 들어오면 앞의 두개의 문자열과 마찬가지로 간 단어로 쪼게진 다음 Count Element에 의해서 각 단어의 수가 카운트되어 기존의 값과 누적 합산된다. 모든 데이타를 다 읽어서 처리가 끝나면, 저장된 결과를 Pipeline IO를 통해서 파일에 그 결과를 쓰게 된다.

배치와 스트리밍 처리

dataflow는 위에서 설명한 파이프라인의 개념을 배치와 스트리밍 처리 두가지 개념 모두로 지원해서 처리가 가능하다. 데이타가 파일과 같이 이미 쓰여지고 더 이상 증가나 수정이 되지 않은 데이타에 대해서는 일괄로 데이타를 읽어서 결과를 내는 배치 처리가 가능하고, 계속해서 들어오고 있는 데이타 (트위터 피드, 로그 데이타)는 스트리밍으로 처리가 가능하다.

윈도우의 개념

배치 처리야, 데이타 처리가 모두 끝난 후에 결과를 내보낸다고 하지만, 그렇다면 스트리밍 데이타는 계속해서 데이타가 들어오고 있는데, 언제 결과를 내보내야 할까?

개별 데이타를 변환해서 저장하는 경우에야, 개별 데이타 처리가 끝난후에 각각 하나씩 저장한다고 하지만, 위와 같이 들어오는 데이타에서 특정데이타 들에 대한 합이나 평균과 같은 처리를 하는 경우 어느 기간 단위로 해야 할까? 스트리밍 처리에서는 이러한 개념을 다루기 위해서 윈도우라는 개념을 사용한다.


예를 들어, “1시~1시10분까지 들어온 문자열에 대해서 문자열에 들어 있는 각 단어의 수를 카운트해서 출력해주는 기능" 이나, 또는 “매 5분 단위로 현재 시간에서 10분전까지 들어온 문자열에 대해서 각 단어의 수를 카운트 해서 출력 해주는 기능" 과 같이 작은 시간 기간의 단위를 가지고 그 기간 단위로 계산 하는 방법이며, 이 시간 단위를 윈도우(Window)라고 한다.


Fixed Window (고정 크기 윈도우)

앞의 예에서 1시~1시10분, 1시10분~1시20분 과 같이 고정된 크기를 가지는 윈도우의 개념을 Fixed Window라고 한다.


Sliding Window (슬라이딩 윈도우)

앞의 예에서와 같이 윈도우가 상대적인 시간 (이전 10분까지)의 개념을 가지면서, 다른 윈도우와 중첩되는 윈도우를 슬라이딩 윈도우라고 한다.


그림과 같이 1시10분의 윈도우는 1시 10분의 10분전인 1시에서 부터, 현재 시간 까지인 1시10분까지 값을 읽어서 처리하고 윈도우가 끝나는 시점인 1:10분에 그 값을 저장한다. 윈도우의 간격은 5분 단위로, 1시 15분에는 1시 15분의 10분전인 1시05분 부터 현재 시간인 1시15분까지 들어온 데이타에 대해서 처리를 하고 그 결과 값을 1시15분에 저장한다.

Session window (세션 윈도우)

다음은 세션 윈도우라는 개념을 가지고 있는데, 이를 이해하기 위해서는 먼저 세션의 개념을 먼저 이해해야 한다.

세션이랑 사용자가 한번 시스템을 사용한 후, 사용이 끝날때 까지의 기간을 정의한다. 스트리밍 시스템에서는 사용자 로그인이나 로그 아웃을 별도의 이벤트로 잡는 것이 아니기 때문에, 데이타가 들어온 후에, 일정 시간 이후에 그 사용자에 대한 데이타가 들어오지 않으면, 세션이 종료 된것으로 판단한다.

일반 적인 웹 프로그램에서 HttpSession과 같은 원리인데, 웹 사이트에 접속한 후, Session time out 시간이 지날때 까지 사용자가 별도의 request를 보내지 않으면 세션을 끊는 것과 같은 원리이다.

아래 그림은 세션 윈도우의 개념을 설명하기 위한 윈도우인데, User A와 User B의 데이타가 들어오고 있다고 하자.


그리고 세션 타임 아웃이 10분으로 정의했다. 즉 같은 사용자에 대해서 데이타가 들어온 후, 10분 내에 추가 데이타가 들어오지 않으면 세션이 종료 된것으로 판단한다.


User A는 1:00 에 첫 데이타가 들어와서1:00~1:10 사이에 두번째 데이타가 들어왔고, 1:10~1:20 사이에 세번째 데이타가 들어온 후, 네번째 데이타는 10분이 지난 후에 들어왔다. 그래서 1:00~1:20 까지가 하나의 세션이 되고, 이것이 User A에 대한 1:00~1:20의 세션 윈도우가 된다. 네번째 데이타 부터는 새로운 윈도우로 처리가 되는데, 1:40~1:50 사이에 다섯번째 데이타가 도착한후, 그 이후로 도착하지 않았기 때문에 이게 두번째 윈도우가 되고, 1:30~1:50의 시간 간격을 가지는 User A의 두번째 윈도우가 된다.

각 윈도우의 값은 User A의 1:00~1:20 윈도우의 값은 (1+1+1)로 3이 되고, 두번째 윈도우인 1:30~1:50 윈도우는 (2.5+1)로 3.5가 된다.


User B는 1:10에 데이타가 들어오고, 10분 후인 1:20까지 데이타가 들어오지 않고 그 이후 1:30 분에 두번째 데이타가 들어왔기 때문에, 1:10~1:10 길이의 첫번째 세션 윈도우가 생성된다. 다음 으로 1:30분에 데이타가 들어왔기 때문에 두번째 세션 윈도우를 생성하고, 2:00까지 계속 데이타가 들어오다가 멈추고 2:10까지 새로운 데이타가 들어오지 않았기 때문에 1:30~2:00 까지 두번째 윈도우로 취급한다.


이 Session Window는 앞서 언급한 Fixed Window나, Sliding Window와는 다르게, User A, User B와 사용자 단위와 같이 어떤 키에 따라서 개별적으로 윈도우를 처리 한다.  즉 Session Window는 User A나 USer B처럼 특정 키에 종속된 윈도우만을 갖는다.


반대로 Fixed Window나 Sliding Window는 키단위의 윈도우가 아니라 그 시간 범위내에 들어 있는 모든 키에 대한 값을 처리한다..

Fixed Window의 경우에는 30분 사이즈를 갖는 윈도우라고 하면 아래 그림과 같이


1:00~1:30 윈도우는 User A의 값 = (1+1+1) 과 User B의 값 1을 합쳐서 총 4가 되고

1:30~2:00 윈도우는 User A값 = (2.5+1)과 User B의 값 = (2+2+2) 를 합쳐서 9.5가 된다.


Sliding Window의 경우에는 길이가 30분이고, 주기가 20분인 Sliding 윈도우라고 할때,


1:00~1:30, 1:20~1:50, 1:40~2:00 3개의 Sliding 윈도우가 생성된다.

1:00~1:30 윈도우는 User A의 값=(1+1+1)과 User B의 값 1을 합산하여 4가 되고

1:20~1:50 윈도우는 User A의 값 = (2.5+1)과 User B의 값 =(2+2)를 합산하여 7.5가 된다.

1:40~2:00 윈도우는 User A의 값 = (2.5+1)과 User B의 값 (2+2)를 합산하여 7.5가 된다.




구글 데이타 스트리밍 데이타 분석 플랫폼 dataflow - #1 소개


조대협 (http://bcho.tistory.com)


실시간 데이타 처리에서는 들어오는 데이타를 바로 읽어서 처리 하는 스트리밍 프레임웍이 대세인데, 대표적인 프레임웍으로는 Aapche Spark등을 들 수 있다. 구글의 DataFlow는 구글 내부의 스트리밍 프레임웍을 Apache Beam이라는 형태의 오픈소스로 공개하고 이를 실행하기 위한 런타임을 구글 클라우드의 DataFlow라는 이름으로 제공하고 있는 서비스이다.


스트리밍 프레임웍 중에서 Apache Spark 보다 한 단계 앞선 개념을 가지고 있는 다음 세대의 스트리밍 프레임웍으로 생각할 수 있다. Apache Flink 역시 유사한 개념을 가지면서 Apache Spark의 다음 세대로 소개 되는데, 이번글에서는 이 DataFlow에 대한 전체적인 개념과 프로그래밍 모델등에 대해서 설명하고자 한다.  스트리밍 데이타 처리에 대한 개념은 http://bcho.tistory.com/1119 글을 참고하기 바란다.

소개

dataflow에 대해서 이해하기 위해서 프로그래밍 모델을 먼저 이해해야 하는데, dataflow의 프로그래밍 모델은 얼마전에 Apache에 Beam이라는 오픈 소스 프로젝트로 기증 되었다. Apache Spark이나, Apache Flink와 유사한 스트리밍 처리 프레임웍이라고 생각하면 된다. dataflow는 이 Apache beam의 프로그래밍 모델을 실행할 수 있는 런타임 엔진이라고 생각하면 된다. 예를 들어 Apache beam으로 짠 코드를 Servlet이나 Spring 코드라고 생각하면, dataflow는 이를 실행하기 위한 Tomcat,Jetty,JBoss와 같은 런타임의 개념이다.


런타임

Apache Beam으로 작성된 코드는 여러개의 런타임에서 동작할 수 있다. 구글 클라우드의 Dataflow 서비스에서 돌릴 수 도 있고, Apache Flink나 Apache Spark 클러스터 위에서도 그 코드를 실행할 수 있으며, 로컬에서는 Direct Pipeline이라는 Runner를 이용해서 실행이 가능하다.


여러 런타임이 있지만 구글 클라우드의 Dataflow 런타임을 사용하면 다음과 같은 장점이 있다.


매니지드 서비스로 설정과 운영이 필요 없다.

스트리밍 처리는 하나의 노드에서 수행되는 것이 아니라, 여러개의 노드에서 동시에 수행이 되기 때문에, 이 환경을 설치하고 유지 보수 하는 것만 해도 많은 노력이 들지만, Dataflow는 클라우드 서비스이기 때문에 별도의 설치나 운영이 필요없고, 작성한 코드를 올려서 실행 하기만 하면 된다.

Apache Spark등을 운영해본 사람들은 알겠지만, Spark 코드를 만드는 것 이외에도, Spark 클러스터를 설치하고 운영 하는 것 자체가 일이기 때문에, 개발에 집중할 시간이 줄어든다.

오토 스케일링을 지원하기 때문에, 필요한 만큼 컴퓨팅 자원을 끌어다가 빠르게 연산을 끝낼 수 있다.

클라우드 컴퓨팅의 장점은 무한한 자원을 이용하여, 워크로드에 따라서 자원을 탄력적으로 배치가 가능한 것인데, Dataflow 역시, 이러한 클라우드의 장점을 이용하여, 들어오는 데이타량이나 처리 부하에 따라서 자동을 오토 스케일링이 가능하다.


그림처럼 오전에 800 QPS (Query per second)의 처리를 하다가 12시경에 부하가 5000 QPS로 늘어나면 그만한 양의 리소스 (컴퓨팅)를 더 투여해서 늘어나는 부하에 따라서 탄력적으로 대응이 가능하다.

리밸런싱(Rebalancing)기능을 이용하여 작업을 골고루 분배가 가능하다.

Spark이나 Hadoop Map & Reduce와 같은 대용량 분산 처리 시스템의 경우 문제가 특정 노드의 연산이 늦게 끝나서 전체 연산이 늦게 끝나는 경우가 많다. 예를 들어 1000개의 데이타를 10개씩 100개의 노드에서 분산하여 처리를 한후 그 결과를 모두 모아서 합치는 연산이 있다고 할때, 1~2개의 노드가 연산이 늦게 끝나더라도 그 결과가 있어야 전체 값을 합칠 수 있기 때문에, 다른 노드의 연산이 끝나도 다른 노드들은 기다려야 하고 전체 연산 시간이 느려 진다.


Dataflow의 경우는 이런 문제를 해결 하기 위해서, 리밸런싱(rebalancing)이라는 메카니즘을 발생하는데, 위의 그림(좌측의 그래프는 각 노드의 연산 시간이다.) 과 같이 특정 노드의 연산이 느려진 경우, 느려진 노드의 데이타를 다른 연산이 끝난 노드로 나눠서 재 배치하여 아래와 같이 전체 연산 시간을 줄일 수 있다.





데이타 스트리밍 처리에 대한 이해


조대협 (http://bcho.tistory.com)


근래에 Apache Beam 프로젝트를 공부하게 되서, 그간 묵혀놨던 데이타 스트리밍 처리에 대해서 다시 정리중인데, 예전에 Apache Storm을 봤을때 보다 트리거나, 윈도우등 많은 개념들이 들어가 있어서 데이타 스트리밍에 대한 개념 부터 다시 정리를 시작을 하고자한다.


Apache Storm에서 부터, Apache Spark 기반의 데이타 스트림 처리뿐 아니라 근래에는 Apache Flink와 같은 새로운 스트리밍 프레임웍크과 구글이 이미 클라우드를 통해서 서비스 하고 있는  google cloud dataflow (Apache Beam이라는 프로젝트로 오픈소스화 되었고, 현재 인큐베이션 단계에 있다.) 까지 빅데이타에 대한 실시간 처리성이 강조되면서 근래에 데이타 스트리밍 처리가 다시 주목 받는 것 같다. 이 문서는 구글이 개발한 dataflow에 대한 개념을 이해하기 위함이다.


본 문서의 내용과 그림은 https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101 를 참고하였다.


사전 개념 이해

스트리밍 데이타 처리를 이해하기 위해서는 몇몇 용어와 개념을 사전에 이해해야 하는 부분이 있다.

Bounded data 와 Unbounded data

먼저 스트리밍 데이타 처리를 이해하려면 데이타의 종류에 대해서 먼저 이해해야 한다.

  • Unbounded data 는 데이타의 수가 정해져있지 않고 계속해서 추가되는, 즉 끊임 없이 흘러 들어오는 데이타라고 볼 수 있다. 예를 들어서 모바일 디바이스에서 계속 올라오는 로그, 페이스북이나 트위터의 타임 피드, 증권 거래 주문 같이 계속 해서 들어와서 쌓이는 데이타를 Unbounded data 라고 한다.

  • Bounded data는 데이타가 딱 저장되고 더 이상 증거나 변경이 없는 형태로 계속 유지되는 데이타를 뜻한다. 1월의 정산 데이타.

Event time과 Processing time

데이타의 발생 시간과 시스템에서 처리되는 시간이 차이가 있는데, 이를 각각 Event time과 Processing time 이라고 정의한다.

예를 들어, 게임에서 사용자가 공격을 한 이벤트를 서버에 전달해서 처리하여 저장하는 시나리오가 있다고 가정할때, 공격 이벤트가 1:00:00에 발생했으면, 이 데이타가 네트워크를 타고 서버로 도달하여 프로그램 로직을 수행하고 저장하는데 소요된 시간을 2초라고 가정하면, Event time 은 1:00가 되고, Processing time은 1:00:02가 된다.

이상적으로는 Event time과 Processing time이 동일하면 좋겠지만, 네트워크 시간이나 처리 시간에 따라 Processing time이 Event time 보다 늦고, 또한 Processing time에서 소요되는 실제 처리 시간은 일정하지 않고 아래 그림의 파란색 그래프(실제 처리 그래프) 처럼 들쭉 날쭉하다. 네트워크 상황이나, 서버의 CPU, IO 상황이 그때마다 차이가 나기 때문이다.


아래 그림을 통해서 개념을 다시 정리해보면,

X축은 Event time, Y축은 Processing Time이다. 0초에 발생한 데이타가 서버에 도착해서 처리하는 시간이 소요 되기 때문에, 아래 그림과 같이 Processing Time은 2초 부터 시작한다. Skew는 Event time과 Processing time간의 간격이다. 아래 그림에서 보면, Processing time에서 3초때에는 Event time 1초에서 발생한 데이타를 처리하고 있는데, 실제 Event time에서는 3초 시간의 데이타가 발생하고 있기 때문에, Processing time과 Event time은 약 2초의 지연이 발생하고 있고, 이를 Skew 라고 한다.



Bounded data의 처리

Bounded data는 이미 저장되어 있는 데이타를 처리하는 것이기 때문에 별다른 처리 패턴이 필요없다



데이타를 읽어서 한번에 처리해서 저장 하면 된다.

UnBounded data 처리

복잡한 것은 스트리밍 데이타 즉, Unbounded data 를 처리하는 방법인데, Unbounded data 는 크게 Batch와 Streaming 두 가지 방식으로 처리할 수 있다.

Batch로 처리

배치로 Unbounded data를 처리 하는 방식은 아래와 같이 두 가지 방식이 있다.

Fixed Windows

Fixed Windows 방식은 스트리밍으로 들어오는 데이타를 일정 시간 단위로 모은 후, 배치로 처리 하는 방식이다. 예를 들어서 아래 그림과 같이 10~11시 까지 데이타를 수집한후, 11시 이후에, 10~11시까지 들어온 데이타를 처리해서 정리 하는 방식이다.



이 방식은 구현이 간단하다는 장점이 있지만, 데이타가 수집 된 후 처리를 시작하기 때문에, 실시간성이 떨어진다. (근 실시간)

Streaming 처리

Unbounded 데이타를 제대로 처리하려면 스트리밍 처리를 하는 것이 좋은데, 스트리밍 처리 방법에는 아래와 같이 크게 Time agnostic, Filtering, Inner Join, Windowing 방식등이 있다.


스트리밍 처리는 배치 처리에 비해서 복잡한 것이, Unbounded 데이타는 기본적으로 특성이 Skew가 환경에 따라 변화가 심하고 그래서 데이타가 시스템에 도착하는 순서 역시 순차적으로 도착하지 않고 들쭉 날쭉 하다.

Time agnostic

Time agnostic 이란, 데이타가 시간 속성을 가지고 있지 않는 데이타 이다. 들어오는 데로 처리를 하면 되기 때문에, 별다른 노하우가 필요 없지만, 하나의 데이타 형이기 때문에 간단하게 언급만 한다.

Filtering

다음으로 많이 사용 되는 것이 필터링인데, 들어오는 데이타 중 특정 데이타만 필터링 해서 저장 하는 구조이다.


예를 들면, 웹 로깅 데이타를 수집해서, 특정 IP나 국가 대역에서 들어오는 데이타만 필터링해서 저장하는 시나리오등이 될 수 있다.

Inner joins (교집합)

Inner join은 두개의 Unbounded 데이타에서 들어오는 값을 서로 비교하여 매칭 시켜서 값을 구하는 방식이다.



모바일 뉴스 앱이 있다고 가정할때, 뉴스 앱에서는 사용자가 어떤 컨텐츠를 보는지에 대한 데이타를 수집 전송하고, 지도 앱에서는 현재 사용자의 위치를 수집해서 전송한다고 하자.

이 경우 사용자별 뉴스 뷰에 대한 Unbounded data 와, 사용자별 위치에 대한 Unbounded data 가 있게 되는데, 이 두개의 데이타 스트림을 사용자로 Inner Join을 하면 사용자가 어떤 위치에서 어떤 뉴스를 보는지에 대해서 분석을 할 수 있다.

Inner join을 구현하기 위해서는 양쪽 스트림에서 데이타가 항상 같은 시간에 도착하는 것이 아니기 때문에, 반대쪽 데이타가 도착할때 까지 먼저 도착한 데이타를 임시로 저장할 버퍼 영역이 필요하고, 이 영역에 임시로 일정 기간 데이타를 저장하고 있다가 반대쪽 스트림에서 데이타가 도착 하면 이를 조인해서 결과를 저장하고, 버퍼 영역에서 두개의 데이타를 삭제한다.

만약에 반대쪽의 데이타가 도착하지 않으면, 이 버퍼 영역에 데이타가 계속 쌓이기 때문에, 일정 기간이 지나면 반대쪽 스트림에서 데이타가 도착하지 않은 데이타를 주기적으로 삭제 해주는 (garbage collection) 정책이 필요하다.


cf. Inner join (교집합), Outer join (합집합)

Approximation algorithms (근사치 추정)

근사치 추정 방식은 실시간 데이타 분석에서 많이 사용되는데, 실시간 분석에서는 전체 데이타를 모두 분석할 수 있는 시간이 없는 경우가 많고, 시급한 분석이 필요한 경우가 있기 때문에, 전체 데이타를 분석하지 않고 일부만 분석하거나 또는 대략적인 데이타의 근사값만을 구하는 방법으로 해서, 빠르게 데이타를 분석하는 경우가 있다. 이를 근사치 추정 방식이라고 하는데, 예를 들어 VOD 서비스에서 지금 10분간 인기있는 비디오 목록, 12시간 동안 가장 인기 있는 판매 제품등 과 같은 시나리오인데, 이런 시나리오에서 데이타는 아주 정확하지 않아도 근사 값만 있으면 되고, 데이타를 그 시간에 보는 시급성이 중요하다.  이러한 시나리오에서는 전체 데이타를 다 보고 분석이 어렵기 때문에, 샘플링을 하거나 대략적인 근사 값만을 구해서 결과를 낸다.


이런 근사치를 추정하는 알고르즘은 K-means나 Approximate Top-N등이 이미 정의되어 있는 알고리즘이 많다.


참고 자료 :

Storm을 이용한 근사치 구하기 : https://pkghosh.wordpress.com/2014/09/10/realtime-trending-analysis-with-approximate-algorithms/

Apache Spark에서 K means로 근사치 구하기 :

https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html


Windowing

실시간 스트리밍 데이타 처리에서 중요한 개념중의 하나는 Windowing 인데, Windowing 이란 스트리밍 데이타를 처리할때 일정 시간 간격으로 처리하는 것을 정의한다.

예를 들어, 10분 단위의 Windowing의 경우 1시~2시까지 들어온 데이타를 1:10, 1:20,1:30, …  단위로 모아서 처리한다.

윈도우에는 자르는 방법에 따라서 다음과 같이 몇가지 방법이 있다.

Fixed Windows

정확하게 일정 시간 단위로 시간 윈도우를 쪼게는 개념이다. 앞에서 언급한 예와 같이 윈도우 사이즈가 10분 일때, 1시 10분은 1시00분~1시10분까지의 데이타를, 1시 20분은 1시10분~1시20분까지의 데이타를 처리한다.

Sliding Windows

Sliding Window 방식은 윈도우가 움직이는 개념이다.

슬라이딩 윈도우의 개념은 현재 시간으로 부터 +-N 시간 전후의 데이타를 매 M 시간 마다 추출 하는 것을 슬라이딩 윈도우라고 하고, 이 윈도우들은 서로 겹치게 된다.

예를 들면 현재시간으로부터 10분 전에서 부터  측정시간까지의 접속자를 1분 단위로 측정하는 시나리오가 될 수 있다. 매 1분 간격으로, 데이타를 추출하고, 매번 그 시간으로부터 10분전의 데이타를 추출하기 때문에 데이타가 중첩이 된다.  

이렇게 추출하는 간격을 Period (앞에서 1분), 그리고 추출하는 기간을 Length 또는 Size (앞에서 10분)라고 한다.



출처 : https://cloud.google.com/dataflow/model/windowing#sliding-time-windows

Session

다음으로는 Session Window의 개념이다.

Session Window에는 사용자가 일정 기간동안 반응이 없는 경우(데이타가 올라오지 않는 경우)에 세션 시작에서 부터, 반응이 없어지는 시간 까지를 한 세션으로 묶어서 처리한다

예를 들어서 세션 타임 아웃이 20분이라고 하고 데이타가 1:00 부터 올라오고 있는데,  1:01, 1:15에 데이타가 올라오고, 1:40분에 데이타가 올라오면 1:15 이후에 20분동안 (1:35까지) 데이타가 올라오지 않았기 때문에, 1:00,1:01,1:15은 하나의 세션으로 되고, 1:40은 새로운 세션 시작이 된다.



출처 : https://cloud.google.com/dataflow/model/windowing#session-windows


시간대별 Window 처리 방식

스트리밍 데이타에서 윈도우를 사용할때, 어느 시간을 기준 시간으로 할것인가를 정해야 하는데, 데이타가 시스템에 도착하는 Processing time을 기준으로 할 수 있고 또는  데이타가 실제 발생한 시간인 Event time을 기준으로도 할 수 있다.

Processing time based windowing

Processing time을 기준으로 데이타를 처리하는 것은 크게 어렵지 않다. 데이타가 도착한 순서대로 처리해서 저장하면 된다.


Event time based windowing

문제는 Event time을 기준으로 데이타를 처리 하는 경우인데, 데이타가 들어오는 것이 순서대로 들어오지 않는 경우가 많고, 또한 데이타의 도착 시간또한 일정하지 않다.




이 그림은 Event time을 기준으로 데이타를 처리하는 개념인데, 좌측 하얀색 화살표 처럼 12:00~13:00에 도착한 데이타가 11:00~12:00에 발생한 데이타 일 경우, 11:00~12:00 윈도우에 데이타를 반영해줘야 한다.

이러한 Event time 기반의 스트리밍 처리는 아래와 같이 기술적으로 두가지 주요 고려 사항이 필요하다.

  • Buffering
    늦게 도착한 데이타를 처리해야 하기 때문에. 윈도우를 일정시간동안 유지해야 한다. 이를 위해서 메모리나 별도의 디스크 공간을 사용한다.

  • Completeness
    Buffering을 적용했으면 다른 문제가 얼마 동안 버퍼를 유지해야 하는가?
    즉 해당 시간에 발생한 모든 데이타는 언제 모두 도착이 완료(Completeness) 되는가? 를 결정하는 것이다. 정확한 완료 시점을 갖는 것은 사실 현실적으로 힘들다. 버퍼를 아주 크게 잡으면 거의 모든 데이타를 잡아낼 수 있겠지만, 버퍼를 아주 크게 잡는 것이 어렵기 때문에, 데이타가 언제 도착할 것이라는 것을 어림 잡아 짐작할 수 있는 방법들이 많다. (예를 들어 워터마크 기법 같은 것이 있는데, 이는 다음글에서 설명하도록 한다.)


지금까지 실시간 데이타 분석에 사용되는 대략적인 개념을 알아보았다. 다음 글에서는 Apache Beam을 이용하여 이러한 실시간 데이타 분석을 어떻게 구현하는지 알아보도록 하겠다.



참고 자료

http://data-artisans.com/how-apache-flink-enables-new-streaming-applications-part-1/

https://cloud.google.com/dataflow/blog/dataflow-beam-and-spark-comparison#game-stats-advanced-stream-processing


Spark Key/Value Pairs

조대협 

http://bcho.tistory.com


RDD에는 어떤 데이타 형식이라던지 저장이 가능한데, 그중에서 Pair RDD라는 RDD가 있다. 이  RDD는 Key-Value  형태로 데이타를 저장하기 때문에, 병렬 데이타 처리 부분에서 그룹핑과 같은 추가적인 기능을 사용할 수 있다.

예를 들어 reduceByKey 와 같이 특정 키를 중심으로 데이타 연산 (각 키 값 기반으로 합이나 평균을 구한다던가) key 기반으로 join 을 한다던가와 같은 그룹핑 연산에 유용하게 사용할 수 있다.

Pair RDD를 생성하는 방법은 다음과 같다.

Java
mapToPair나 flatMapToPair 라는 메서드를 사용하면 된다.

mapToPair등의 함수를 이용할때, 아래와 같이 람다 표현식을 사용하는 방식이 있고

RDD.mapToParis( d-> new Tuple2(key,value))
(d는 RDD에서 읽어오는 값. Key는 새로운  RDD를 생성할때, 해당 Tuple의 키값,  value는 해당 Tuple의 Value값)

JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(-> new Tuple2(s, 1));

또는 아래와 같이, Pair함수를 정의한 후에, Pair 함수를 mapToPair함수등에 넘기는 방법이 있다.

아래는 텍스트 라인에서 첫번째 단어를 Key로하고, 해당 라인을 Value로 하는 코드 예제이다.

PairFunction<String, String, String> keyData =
  new PairFunction<String, String, String>() {
  public Tuple2<String, String> call(String x) {
    return new Tuple2(x.split(" ")[0], x);
  }
};
JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);
* Learning spark 코드 참조

Python

lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)



Apache Spark(스파크) - RDD Persistence (스토리지 옵션에 대해서)


조대협 (http://bcho.tistory.com)


Spark Persistence에 대해서


앞에 글에서 Spark RDD가 메모리에 상주 되는 방법에 대해서 간략하게 언급했는데, 다시 되 짚어 보면 Spark의 RDD는 filter() 등. 여러  Transformation Operation을 실행하더라도  Transformation  단계가 아니라 Action이 수행되는 단계에 로드된다고 설명하였다.


그리고, 매번 해당 RDD가 Action으로 수행될 때마다 다시금 소스에서 부터 다시 로드되서 수행된다고 했는데, 그렇다면 매번 로드 해서 계산하여 사용하는 것이 아니라, 저장해놓고 사용 하는 방법이 무엇이 있을까?


스파크에서는 RDD 를 저장해놓고 사용하는 기능으로 persist()와 cache() 라는 두 가지 오퍼레이션을 지원한다.

스파크는 RDD를 저장함에 있어서,  메모리와 디스크 두 가지 영역을 사용하며, 옵션에 따라서 RDD의 저장 영역을 지정할 수 있다.


기본 디폴트는 메모리에 저장하고, 옵션으로 디스크를 지정할 수 있다. 디스크를 지정하면, 메모리에서 모지란 부분을 마치 swapping 하듯이 디스크에 데이타를 저장한다.


아래 옵션 참고 




출처 : https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence


여기에 특이한 점은, 메모리나 디스크에 저장할때, RDD를 RAW (원본 형식)으로 저장할 것인지 자바의 Serialized 된 형태로 저장할 지를 선택할 수 있다. ( Serealized 된 형태로 저장하기 MEMORY_ONLY_SER, MEMORY_AND_DISK_SER) 이렇게 저장하면, 메모리 사용량은 더 줄일 수 있지만, 반대로 저장시 Serizalied하는 오버로드와, 읽을때 De-Seriazlie 하는 오버로드가 더 붙어서 CPU 사용량은 오히려 증가하게 된다.


아래 데이타는 http://sujee.net/2015/01/22/understanding-spark-caching/#.VWcOh1ntlBc 의 데이타 긴데,

Serialized 로 저장하는 경우, 최대 약 4배 정도의 메모리 용량을 절약할 수 있으나, 반대로, 처리 시간은 400배 이상이 더 들어간다.

 









 


다음으로, 특이한 옵션중에 하나가 OFF_HEAP 이라는 옵션인데, 스파크는 JVM 상에서 동작하기 때문에, 스파크가 저장하는 메모리란 JVM 상의 메모리를 뜻한다. JVM  특성상 Garbage collection 에 의한 성능 제약을 받을 수 있으며 또한 별도로 서로 복제가 되지 않기 때문에, (기본 옵션의 경우에만), 안정적인 서비스를 원할 경우에는 별도의 복제 옵션을 선택해야 한다.

이런 문제를 해결하기 위한 다른 옵션으로는 JVM 내에 데이타를 저장하는 것이 아니라, 별도의 JVM 외의 메모리 공간에 데이타를 저장하는 방식이 OFF_HEAP 이라는 옵션이다. 아직 안정화 되지는 않았지만, http://tachyon-project.org/ 이라는 메모리 클러스터를 이용하여, 서로 복제가 가능한 외부 메모리 클러스터에 저장하는 방식으로, JVM  상 메모리 보다는 성능이 약간 떨어지지만, 디스크보다는 빠르며, 큰 메모리 공간을 장애 대응에 대한 상관 없이 (자체 적으로 HA  기능을 제공함) 사용이 가능하다. 

 cf. Redis나  Infinispan등과 같은 메모리 기반의 데이타 그리드 솔루션의 하나인 Hazelcast 역시 JVM 밖의 네이티브 메모리 공간에 데이타를 저장하는 유사한 방식을 사용한다.


Persist vs Cache


그렇다면, persist()와 cache()의 차이점은 무엇인가? cache()는  persist() 에서 저장 옵션을 MEMORY_ONLY로 한 옵션과 동일하다.


저장된 RDD는 메모리나 디스크에서 언제 삭제 되는가?

RDD가 메모리나 디스크에 로드되었다고 항상 로드된 상태로 있는 것이 아니다. 기본적으로 LRU (Least Recently Used)  알고리즘 (가장 근래에 사용되지 않은 데이타가 삭제되는 방식)에 의해서 삭제가 되가나, 또는 RDD.unpersiste() 함수를 호출하면 명시적으로 메모리나 디스크에서 삭제할 수 있다.


언제 어떤 타입의 Peristence옵션을 사용해야 하는가?


가장 좋은 옵션은 디폴트 옵션인  MEMORY_ONLY  옵션이다. 가장 빠르다.

다음으로 메모리가 모자를 경우에는  MEMORY_ONLY_SER 옵션을 이용하면, Seriazlied 된 형태로 저장하기 때문에 메모리 공간은 줄일 수 있으나, 대신 CPU 사용률이 올라간다. 그래도 여전히 빠른 방식이다.

데이타 양이 많을 경우에는 DISK에 저장하는 옵션보다는 차라리 Persist 를 하지 않고, 필요할때 마다 재계산 하는 것이 더 빠를 수 있다.

빠른 응답이 필요한 경우에 Persist 된 데이타에 대한 유실을 방지하려면, replicated storage 옵션을 사용하는 것이 좋다. (MEMORY_ONLY2 등).  다른 스토리지 타입 역시, 장애로 인해서 데이타가 유실되더라도 재계산을 통하여 복구가 가능하지만, 재계산 하는 것 보다는 RDD 의 복제본을 저장해 놓고, 장애시 페일오버 하는 것이 빠르기 때문에, 빠른 응답시간을 요구로 하는 웹 애플리케이션의 경우 이 스토리지 타입이 유리하다. (단, 메모리 사용량은 복제본을 저장하는데도 사용되기 때문에 상대적으로 일반 스토리지 옵션에 비해서 메모리 여유가 적다.)



참고 

Learning Spark

Spark document - https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence


참고 - 이 글은 제가 스파크를 혼자 공부하면서 문서만을 통해서 정리한 글이기 때문에, 실무적인 경험이 많이 녹아 들어 있지 않습니다. 

Apache Spark RDD 이해하기 #2


스파크에서 함수 넘기기 (Passing function to Spark)


조대협 (http://bcho.tistory.com)


Passing function
스파크는 개발자가 비지니스 로직을 함수로 정의한 후에, 이 함수를 스파크 클러스의 노드들로 보내서 수행할 수 있다. 스파크가 지원하는 프로그래밍 언어에 따라서, 이 함수를 넘기는 방법이나 특성이 다른데, 여기서는 Python을 이용하는 방법에 대해서 설명한다. (스칼라는 아직 공부를 못해서)

파이썬으로 함수 넘기기
파이썬으로 함수를 넘기는 방법은 크게 3가지가 있다.
  • 람다 표기법을 이용하는 방식
  • 모듈 상의 탑레벨 함수(Top-level function)
  • 파이썬 클래스 안에서 함수를 정의하여, 클래스 안에서 그 함수를 스파크로 넘기는 방식
각각의 방식에 대해서 살펴보도록 하자.

첫번째로 람다 표기법을 사용하는 방식이다.  람다 표기법이란, 함수를 정의하는데, 있어서 함수명이 없이 간략하게 함수의 기능만을 저장하는 표기법으로, 주로 간단한 결과를 구현할때, 코딩을 간결하게 (양이 적게) 표현하고자 하는데 사용한다.
다음은 필터 연산에서 함수를 람다 표기법으로 정의해서 넘기는 예제이다. “l” 이라는 RDD에 Apache 라는 문자열이 있는지 없는지를 행마다 체크하는 로직이다.


같은 로직을 함수를 정의해서 함수 자체로 넘길 수 가 있는데, 위의 람다 표현식으로 된 함수를 containsApache라는 함수로 정의하여 필터로 넘기는 예제이다.

마지막으로는 파이썬 클래스에서 클래스의 함수를 넘기는 방식이다.
다음은 MyClass 클래스를 정의한 다음. 클래스를 생성할때, filter에 사용할 문자열을 받은후, count라는 메서드에서, 그 문자열로 필터링을 한 후, 라인 수를 count하여 리턴하는 방법이다.


이 때 주의할 점은 self.query를 filter에 인자로 넘겼는데, 이 경우에 스파크로 넘어가는 것은 self.query 변수 내용 하나가 아니라, 이 객체 전체가 스파크로 넘어가게 된다. 동작상에는 문제가 없지만, 전체 객체가 스파크로 넘어가기 때문에 메모리 사용률이 많아지고, 전체 객체를 넘기는 과정 역시 인자만 넘기는 방식에 비해서 상대적으로 시간이 많이 걸리기 때문에 좋은 방법은 아니다. 
이런 문제를 피하는 방법은 클래스내에서 무슨 값을 넘길때는 self.xxx식으로 스파크에 넘기는 것이 아니라, 그 값을 복사하여 넘기는 방법을 사용하면 된다. 예를 들어서 위의 예제의 경우에는 아래와 같이 변경하면 된다.

즉 스파크에 self.query를 넘기는 것이 아니라 이 값은 로컬 변수인 x에 x=self.query로 저장한 후, 스파크에는 이 x 값을 넘기게 되면, 실제 모든 객체가 스파크에 전달되지 않고, 이 로컬 변후 x 만 넘어가기 때문에 메모리가 과 사용되는 것을 예방할 수 있다. 







Spark RDD  이해하기 #1

조대협(http://bcho.tistory.com)


기본 개념 잡기

RDD 는 여러 분산 노드에 걸쳐서 저장되는 변경이 불가능한 데이타(객체)의 집합으로 각각의 RDD는 여러개의 파티션으로 분리가 된다. (서로 다른 노드에서 분리되서 실행되는). 

쉽게 말해서 스파크 내에 저장된 데이타를 RDD라고 하고, 변경이 불가능하다. 변경을 하려면 새로운 데이타 셋을 생성해야 한다.

RDD의 생성은 외부로 부터 데이타를 로딩하거나 또는 코드에서 생성된 데이타를 저장함으로써 생성할 수 있다.

RDD에서는 딱 두 가지 오퍼레이션만 지원한다.
  • Transformation : 기존의 RDD 데이타를 변경하여 새로운 RDD 데이타를 생성해내는 것. 흔한 케이스는 filter와 같이 특정 데이타만 뽑아 내거나 map 함수 처럼, 데이타를 분산 배치 하는 것 등을 들 수 있다.
  • Action : RDD 값을 기반으로 무엇인가를 계산해서(computation) 결과를 (셋이 아닌) 생성해 내는것으로 가장 쉬운 예로는 count()와 같은 operation들을 들 수 있다.

RDD의 데이타 로딩 방식은 Lazy 로딩 컨셉을 사용하는데, 예를 들어 sc.textFile(“파일”)로 파일을 로딩하더라도 실제로 로딩이 되지 않는다. 파일이 로딩되서 메모리에 올라가는 시점은 action을 이용해서 개선할 당시만 올라간다.
아래 코드를 보자 아래 코드는 “README.md” 파일을 RDD로 로딩 한후에

  1. pythonLines에 “Python”이라는 단어를 가지고 있는 라인만 추려서 새로운 RDD를 만들고,
  2. 그 다음 count() action을 이용하여, 그 줄 수 를 카운트 하는 예제이다.




그렇다면, 언제 실제 README.md 파일이 읽혀질까? 실제로 읽혀지는 시기는 README.md 파일을 sc.textFile로 오픈할 때가 아니라 .count() 라는 액션이 수행될 때 이다.
이유는 파일을 오픈할때 부터 RDD를 메모리에 올려놓게 되면 데이타가 클 경우, 전체가 메모리에 올라가야 하는데, 일반적으로 filter 등을 이용해서 데이타를 정재한 후에,  action을 수행하기 때문에, action을 수행할때, action수행시 필요한 부분만 (filter가 적용된 부분만) 메모리에 올리면 훨씬 작은 부분을 올릴 수 있기 때문에 수행시에 데이타를 로딩하게 된다. 그렇다면 로딩된 데이타는 언제 지워질까?
action을 수행한다음 바로 지워진다.

위에서 보면 lines.count()를 두번 수행하였는데, 이 실행시 마다 README.md 파일을 다시 읽어드린다. 만약에, 한번 읽어드린 RDD를 메모리에 상주하고 계속해서 재 사용하고 싶다면 RDD.persist()라는 메서드를 이용하면, RDD를 메모리에 상주 시킬 수 있다.

RDD 생성하기

앞에서도 언급했듯이, RDD를 생성하는 방법은 크게 두가지가 있다. 
  • 외부로 부터 파일을 읽어서 로딩하거나 파일은 일반 파일을 읽거나 S3,HBase,HDFS,Cassandra 등에서 데이타를 읽어올 수 있다. 
    파이썬 예제) lines = sc.textFile(“/path/filename.txt”)
  • 또는 드라이버 프로그램내에서 생성된 collection을 parallelize() 라는 메서드를 이용해서 RDD 화 하는 방법이다. (자바 컬렉션등을 RDD로 생성)
    자바 예제) JavaRDD<String> lines = sc.parallelize(Array.asList(“first”,”second”))

RDD Operations

1) Transformation (변환)

변환은 RDD를 필터링하거나 변환하여 새로운 RDD를 리턴하는 오퍼레이션이다.
다음 코드는 README.md 라는 파일을 읽어서 f 라는 RDD를 생성한후
f라는 RDD 에서 “Apache”라는 문자열을 가진 라인만을 모아서 t라는 RDD를 새롭게 생성한 후 화면으로 출력하는 예제이다.
f와 t는 전혀 다른  RDD로 RDD t는 filter에 의해서 새롭게 생성되었다.

<그림. 파이썬 예제>


변환 함수는 filter 뿐 아니라, map, group등 여러가지 함수들이 있으며, 자세한 함수 리스트는 https://spark.apache.org/docs/latest/programming-guide.html#transformations 를 참고하기 바란다.

2) Action (액션)

액션은 RDD를 가지고 계산을 해서 최종 결과를 리턴하거나 또는 데이타를 외부 저장소(External Storage)등에 쓸 수 있다.
최종 결과를 리턴하는 오퍼레이션으로는 앞의 예제에서도 설명한 count()나, 첫번째 element를 리턴하는 first등이 있으며, RDD를 저장하는 오퍼레이션으로는 saveAsTextFile(path)와 같은 오퍼레이션등이 있다.
 


본 포스팅을 오라일사의 "Learning Spark" 과  Sparing Programming Guide를 참고하여 작성하였습니다. https://spark.apache.org/docs/latest/programming-guide.html


Spark의 전체적인 스택 구조

조대협 (http://bcho.tistory.com)

스파크의  전체적인 스택 구조를 보면 다음과 같다.





  • 인프라 계층 : 먼저 스파크가 기동하기 위한 인프라는 스파크가 독립적으로 기동할 수 있는 Standalone Scheudler가 있고 (그냥 스팍만 OS위에 깔아서 사용한다고 생각하면 된다). 또는 하둡 종합 플랫폼인 YARN 위에서 기동될 수 있고 또는 Docker 가상화 플랫폼인 Mesos 위에서 기동될 수 있다.
  • 스파크 코어 : 메모리 기반의 분산 클러스터 컴퓨팅 환경인 스팍 코어가 그 위에 올라간다. 
  • 스파크 라이브러리  : 다음으로는 이 스파크 코어를 이용하여 특정한 기능에 목적이 맞추어진 각각의 라이브러리가 돌아간다. 빅데이타를 SQL로 핸들링할 수 있게 해주는 Spark SQL, 실시간으로 들어오는 데이타에 대한 리얼타임 스트리밍 처리를 해주는 Spark Streaming, 그리고 머신러닝을 위한 MLib, 그래프 데이타 프로세싱이 가능한 GraphX가 있다.

현재 글에서 설명하고 있는 부분은 먼저 스파크에 대한 기본을 이해하기 위해서 Spark Core 부분을 중점적으로 설명하고 있다. 




Apache Spark Cluster 구조

스팍의 기본 구조는 다음과 같다.
스팍 프로그램은 일반적으로 “Driver Program”이라고 하는데, 이 Driver Program 은 여러개의 병렬적인 작업으로 나뉘어져사 Spark의 Worker Node(서버)에 있는  Executor(프로세스)에서 실행된다.



1. SparkContext가 SparkClusterManager에 접속한다. 이 클러스터 메니져는 스팍 자체의 클러스터 메니져가 될 수 도 있고 Mesos,YARN 등이 될 수 있다. 이 클러스터 메니저를 통해서 가용한 Excutor 들을 할당 받는다
2. Excutor를 할당 받으면, 각각의 Executor들에게 수행할 코드를 보낸다.
3. 다음으로 각 Excutor 안에서 Task에서 로직을 수행한다.


  • Executor : Process
  • Task : A Unit of work that will sent to one executor

cf. Storm 과 개념이 헷갈릴 수 있는데, 
Storm 은 Node가 하드웨어 서버, Worker가 프로세스,Executor가 쓰레드
Spark 은 Worker Node가 하드웨어 서버, Executor가 프로세스 이다.  




Apache Spark 설치 하기


조대협 (http://bcho.tistory.com)


Spark 설치 하기


1. 스팍 홈페이지에서 다운로드.


다운로드시 Pre-built in Spark을 골라야 함. 여기서는 Hadoop 2.6용으로 빌드된 스팍을 선택한다.








2. 스팍 쉘을 실행 해보자


인스톨 디렉토리에서, 

%./bin/pyspark





을 실행하면, 위와 같이 파이썬 기반의 스팍 쉘이 실행됨을 확인할 수 있다.


3. 로깅 레벨 조정 및 간단한 스팍 예제


디폴트 로깅은 INFO 레벨로 되어 있기 때문에, 쉘에서 명령어를 하나라도 실행하면 INFO 메세지가 우루루 나온다. (몬가 할때 결과 값보다, 오히려 INFO 메세지가 많이 나온다.)

그래서, conf/log4j.properties 파일을 conf/log4j.properties.templates 파일을 복사해서 만든후 

log4j.rootCategory를 Info에서 WARN 레벨로 다음과 같이 수정한다.


log4j.rootCategory=WARN, console






환경 설정이 끝났으면 간단한 예제를 돌려보자

$SPARK_HOME 디렉토리에 있는  README.md 파일을 읽어서, 라인 수를 카운트 하는 예제이다.





스팍은 자체적으로 클러스터를 모니터링 할 수 있는 차체적인 Web UI가 있다. 

http://localhost:4040에 접속하면 다음과 같이 스팍 클러스터에 대한 모니터링 화명을 얻을 수 있다.









Zepplin (제플린) 설치하기

빅데이타/Zepplin | 2015.05.13 01:46 | Posted by 조대협

제플린 설치하기

 (맥북 기준 - Darwin  커널 기준)


1. 선행 설치

 git 설치

maven 3.3 설치

JDK 설치 (1.8 설치)


2. 소스코드 다운로드

% git clone https://github.com/NFLabs/zeppelin 





위와 같이 코드가 다운로드됨을 확인할 수 있음


3. 컴파일

코드가 다운되면 컴파일을 해야 하는데, 여기서는 간단한 테스트를 위해서 클러스터 모드가 아닌 로컬 모드로 설치를 진행한다.

% mvn install -DskipTests


이때 주의할점은 맥에서는 mvn으로 설치할때, 몇몇 의존성 모듈 설치시 루트 권한을 필요로 하는 것이 있기 때문에 % sudo mvn install -DskipTests 를 이용하여 루트 권한으로 설치한다.


설치시 다음과 같은 에러가 나올 수 있다.

[ERROR] Failed to execute goal com.github.eirslett:frontend-maven-plugin:0.0.20:install-node-and-npm (install node and npm) on project zeppelin-web: Execution install node and npm of goal com.github.eirslett:frontend-maven-plugin:0.0.20:install-node-and-npm failed: A required class was missing while executing com.github.eirslett:frontend-maven-plugin:0.0.20:install-node-and-npm: org/slf4j/helpers/MarkerIgnoringBase


이 에러는 eirlett모듈 설치시 나는 에러인데, maven 3.3에서 발생한다. 해결 방법은

$ZEPPELIN_HOME/zeppelin-web/pom.xml 파일에서 다음과 같이 eirlett버전을 0.0.20에서 0.0.23으로  변경하면 된다.


4. 컴파일이 끝났으면 구동하여 제대로 설치가 되었는지 확인하자


.% /bin/zeppelin-daemon.sh start 을 실행하면 제플린 서버가 기동된다.


다음 웹페이지로 접속하여 http://localhost:8080을 확인하면, 제플린이 가동됨을 확인할 수 있다. 





다음 글에서는 제플린에 대한 소개를 진행한다.

'빅데이타 > Zepplin' 카테고리의 다른 글

Zepplin (제플린) 설치하기  (1) 2015.05.13

머신 러닝 프레임웍에 대한 간단 정리


머신 러닝을 다시 시작해서 보다 보니 어떤 언어로 개발을 해야 하는지 의문이 들어서 페이스북 Server Side architecture 그룹에 올렸더니, 좋은 정보가 많이 들어왔다.

Matalab이나 R과 같은 언어는 수학 라이브러리가 풍부해서, 주로 모델을 만들어서 시뮬레이션 하는데 많이 사용되고

Python이 수학 라이브러리가 풍부해서 그런지 ML 부분에서 많이 사용되는데, Production 까지 올라가는 경우는 잘 못본거 같고, 주로 Python으로 모델을 프로토타이핑 하는 수준으로 사용되는 것으로 보인다. 아직까지 자세히는 보지 못했지만, 자바의 Spark이나 Mahout과 같은 분산 환경 지원성이 약하고, 언어의 특성상 다른 언어보다 성능이 떨어져서, 실제 Production은 다른 언어, 주로 자바를 많이 사용하는 듯 하다.


Python으로 ML을 하려면, numpy,matplot등 다양한 패키지를 설치해야 하는데, 이 경우 방화벽과 프록시가 있는 환경에서는 설치가 쉽지 않다. (몇시간을 무지 삽질했던 경험이.. Proxy를 설정해도 패키지 인스톨이 잘안되서)

Python의 경우 이런 주요 수학 라이브러리를 패키징해놓은 인스톨 패키징이 있는데

대표적으로 Continum의 아나콘다 http://continuum.io/downloads

http://www.scipy.org/ 등이 있다.

그리고 Python에서 많이 사용되는 ML 프레임웍으로는 http://scikit-learn.org/ 등이 있다.


각 언어별로 ML 지원 라이브러리와 사용 용도를 정리해놓은 글이 있다. https://github.com/josephmisiti/awesome-machine-learning


알고리즘을 직접 작성하는 경우가 대부분이겠지만, 왠만해서는 기존 알고리즘 보다 잘 만들기가 어렵기 때문에 기존 알고리즘을 잘 활용하거나 데이타 샘플링을 잘하거나 또는 구현 인프라를 최적화 하는 방안을 고려해볼 수 있겠고, 여러 알고리즘을 중첩 적용하여 조합 함으로써 좋은 결과를 이끌어내는 방법을 고려해볼 수 있겠다.
아울러 근래에는 클라우드에 ML 라이브러리를 제공하고 있기 때문에, Azure ML이나 IBM Watson등을 고려해볼 수 있다.

SSAG에서 관련된 몇가지 중요한 댓글 메모

하용호 참고로 애초에 분산환경을 활용하도록 만들어진 MLLib등을 제외하면, 자바든 C든 R이든 속도는 대동소이 합니다. 대부분 매트릭스 연산에 그쪽으로 최적화된 LAPACK이나 BLAS, 돈 좀 쓰면 MKL등의 라이브러를 가져다가 쓰게 되어 있어서요. 뭐랄까 다들 같은 육수집에서 육수 받아서 쓴다랄까. 파이썬 쓰세요 파이썬 ㅎㅎㅎ 으하핫

민경국 mvn clean package -DskipTests 가 문제없이 돌아가는 방화벽 상황이라면 제플린으로 스프크와 스파크ML 을 보시는 건 어떨지요?
mvn 명령 한방으로 제플린 + 스파크가 설치되니 학습하기 좋은것 같습니다.
...더 보기

서민구 R이 싫으면 파이썬이 좋은데 설치가 잘 안된다니 안타깝네요. Pandas, numpy, scipy, scikitlearn, nltk 정도만 있어도 좋은데요. 언어가 파이썬이라 개발자들이 쉽게 배우구요. 통계분석 라이브러리는 문서화가 미비하지만 scikitlearn 의 문서는 대단히 훌륭합니다.

KwangHo Yoon 몇년전에는 직접 구현하는 걸 선호했는데.. 지금은 python이나 R이 라이브러리가 너무 좋아서 저도 파이썬을 사용하고 있습니다. PredictionIO나 h2o를 사용하시면 hbase나 spark등의관리를 편하게 해주어서..대용량의 데이터를 처리할 때에도 머신 러닝에 더 집중하여 개발할 수 있습니다.h2o에도 위에서 말씀하신 제플린과 비슷한 h2o flow가 있는데..인터렉티브한 화면으로 예측 결과까지 제공합니다 https://www.youtube.com/watch?v=wzeuFfbW7WE



대충보는 Storm #6-Apache Storm 그룹핑 개념 이해하기

 조대협 (http://bcho.tistory.com)



지금까지 컴포넌트간의 경로 라우팅, 즉 Spout 에서 Bolt간, Bolt에서 Bolt간 경로를 설정하는 방법에 대해서 알아보왔다.

그렇다면 각 컴포넌트간 라우팅을 할때 그 안에 있는 Task간에는 어떻게 상세하게 라우팅이 될까? Storm에서는 이 Task간의 라우팅을 정의하기 위해서 Grouping이라는 개념을 사용한다.


Shuffling

가장 간단한 라우팅 방법으로 Bolt A에서 Bolt B로 라우팅을 한다고 했을때, Bolt A내의 있는 Task가 Bolt B에 있는 Task중 아무 Task로 임의로(랜덤하게) 라우팅 하는 방식이다.




Field

Bolt A에 있는 Task에서 Bolt B에 있는 Task로 라우팅을 할때, 규칙성을 갖는 것중 하나인데, 보내고자 하는 데이타의 특정 필드에 있는 값을 기준으로 Bolt B에 있는 특정 Task로 라우팅 하는 방식이다. 라우팅 기준은 지정한 필드의 값을 가지고 해쉬를 계산해서 해쉬에 따라서 Bolt B에 있는 Task로 라우팅 시키는 방식이다.

예를 들어, Bolt B에 Task가 3개가 있다고 가정할때, 나이라는 필드로 “Field Grouping”을 한다고 하면, 나이/3으로 나눈 나머지 값에 따라서 Task A,B,C로 라우팅 하는 방식이다. (나눗셈은 설명을 쉽게 하기 위해 예를 들었지만 비슷한 원리로 해쉬를 계산하여 라우팅을 한다.)

Bolt에서 로컬 캐쉬를 사용하거나 할때, 같은 해쉬의 데이타가 같은 Task로 라우팅이 되게 해서 캐쉬 히트율을 높이는 것등에 유용하게 사용될 수 있다.



Global 

Global 그룹핑은 모으는 개념(Aggregation)의 개념이다. Bolt A의 어느 Task에서 메세지를 보내더라도 항상Bolt B똑같은 하나의 Task로 라우팅이 되는 방식으로, Bolt B에 있는 Task중에서 Task ID가 가장 작은 특정 Task로만 라우팅을 한다.

분산해서 연산한 값을 모두 모아서 합산을 한다던가등에 사용할 수 있다.



All

All 그룹핑은 일종의 브로드 캐스트 개념으로 Bolt A의 하나의 Task가 메세지를 전송하면 Bolt B의 모든 Task가 메세지를 받는 형태이다.

각 Task들에 설정 변경등을 넘길때 유용하게 사용될 수 있다.





Direct

당연히 있을 것으로 생각했겠지만 당연히 있는 기능이다. Bolt A의 Task에서 Bolt B의 특정 Task로 명시적으로 라우팅을 지정하는 기능이다. 이때 주의할점이 Bolt B의 Task를 지정할때,  Task Id가 아니라 Task의 Index로 타겟을 지정한다. 예를 들어 Bolt B에 Task가 5개가 있을때, 0번, 1번식으로 타켓을 지정하게 된다.





Custom

Custom 그룹핑은 라우팅 로직을 개발자가 직접 작성해서 넣는 방식이다.


Local or Shuffle

다소 주의 깊게 볼 필요가 있는 그룹핑 방식이다. 기본적인 동작 방식은 Shuffle과 다르지 않으나,

Bolt A에서 Bolt B의 Task로 라우팅을 할때, Bolt A에서 메세지를 보내는 Task와 같은 JVM 인스턴스 (Woker)에 Bolt B의 Task가 있을 경우 같은 JVM 인스턴스에 있는 Task로 우선 라우팅을 한다. 이는 네트워크를 이용한 리모트 호출을 줄이기 위한 방법이다.

그러면 Bolt의 Task들은 각 Worker에 어떻게 배치 될것인가에 대한 질문이 올 수 있는데, 이렇게 Task를 Worker에 배치하는 행위를Scheduling(스케줄러)라고 하고, 배치를 하는 주체를 Scheduler라고 한다. 자료를 몇개 찾아봤지만 Scheduling 정책에 대해서는 명확하게 나와 있지 않고, 무작위 적으로 배치하는 것으로 보이는데, 조금 더 research가 필요할듯. 


참고 :Pluggable Scheduler

애플리케이션 성격에 맞게 스케쥴링 정책을 구현해서 사용할 수 있는데, 이를 Pluggable Scheduler라고 한다.

http://xumingming.sinaapp.com/885/twitter-storm-how-to-develop-a-pluggable-scheduler/ 의 예에 나와 있는 시나리오를 보면, 특정 Spout의 경우에는 상용 소프트웨어를 사용하는데, 이 상용 소프트웨어는 Machine당 라이센스를 가지고 있기 때문에 이 Spout은 반드시 라이센스가 설치된 서버에 스케쥴링(배포)되어야 한다. 그래서 특정 스케쥴링 정책이 필요한데, 첨부 링크에 있는 내용은 Pluggable scheduler를 구현하는 방법에 대해서 설명하고 있다.





 

대충보는 Storm #4-Apache Storm 특징과 기본 개념

 조대협 (http://bcho.tistory.com)


지금까지 Storm에 대해서 이해하기 위해서, 실시간 스트리밍 서비스의 개념에 대해서 알아보고 간단한 HelloStorm 애플리케이션을 제작해서, 싱글 클러스터 노드에 배포해봤다. 대략 실시간 스트리밍이 무엇이고, Storm을 이용해서 어떻게 개발하는지에 대해서는 어느정도 이해를 했을 것이라고 생각한다.

그러면 지금까지의 경험을 조금 더 체졔적으로 정리해서 Storm에 대해서 이해해보도록 하자. 이번에는 Storm에 대한 개념과 아키텍쳐 구조에 대해서 알아보겠다.


Storm의 특징

Storm을 실시간 스트리밍을 처리하기 위한 서버이자 프레임웍이다. 그렇다면 이 Storm이 다른 스트리밍 처리 솔루션에 비해 가지는 특징은 무엇일까?


확장성

Storm은 클러스터링 기능을 이용해서 수평으로 확장이 가능하다. 그래서 많은 데이타를 다루어야 하는 빅데이타이 데이타 스트림 서비스에서도 가능한데, Storm홈페이지에 포스팅된 자료를 보면, 2x2.4GHz CPU 24GB 메모리 머신을 기반으로 초당 100바이트짜리 메세지를 100만개 정도 처리가 가능하다고 한다. (https://storm.apache.org/about/scalable.html) TPS로 환산하면 100 TPS이다. (Wow!!)


장애 대응성

Storm의 다른 특징 중의 하나는 Fault tolerant 구조를 통한 장애 대응 능력이다. ZooKeeper를 통해서 전체 클러스터의 운영 상태를 감지 하면서, 특정 노드가 장애가 나더라도, 시스템이 전혀 문제 없이 작업을 진행할 수 있으며, 장애가 난 노드에 할당된 작업은 다른 노드에 할당해서 처리하고, 장애 노드에 대해서는 복구 처리를 자동으로 수행해준다.


메세지 전달 보장

Storm은 메세지 처리에 안정성을 제공하는데, 장애가 나건 문제가 있건간에,유실 없이 최소한 한번 메세지가 처리될 수 있게 지원한다. (at least once : 이말은 반대로 이야기 하면, 1번 이상 같은 메세지가 중복 처리될 수 있다는 이야기이다.)

만약에 정확하게 메세지가 한번만 처리가 되기를 원하면 Trident (https://storm.apache.org/documentation/Trident-tutorial.html) 를 통해서 Storm을 확장하면, 정확히 하나의 메세지가 한번만 처리되도록 할 수 있다.


쉬운 배포

Storm은 메뉴얼에 따르면(?) 배포와 설정이 매우 쉽다. 실제로 클러스터를 구성해보면 분산 시스템인데 비해서 별로 어려움 없이 배포와 설정이 가능하다. 그리고 메뉴얼에 따르면(?) 배포 후에, 크게 많은 관리 없이 운영이 가능하다고는 하는데, 이것은 실제로 해보지 않았기 때문에 패스

일단, 오픈소스인데도 설치 후 웹 기반의 모니터링 콘솔을 제공하기 때문에 시스템의 상태를 쉽게 모니터링 하고 운영하는데 도움을 준다.


여러 프로그래밍 언어 지원

Storm은 기본적으로 JVM (Java Virtual Machine)위에서 동작하기는 하지만, Twitter Thrift 프로토콜을 기반으로 하기 때문에, 다양한 언어로 구현이 가능하다. Java 뿐 아니라, JVM을 사용하지 않는 경우에 대해서는 stdin/stdout을 통해서 데이타를 주고 받음으로써, Ruby,Python,Javascript,Perl 등 다양한 언어를 사용할 수 있다.


다양한 시스템 연계

Storm은 다양한 다른 솔루션과 통합이 가능하다. 데이타를 수집하는 부분에서는 Kestrel (http://robey.github.io/kestrel/), RabbitMQ (http://www.rabbitmq.com/) , Kafka (http://kafka.apache.org/), JMS 프로토콜, mazon Kinesis (http://aws.amazon.com/kinesis/)

등이 연동이 가능하며, 다양한 데이타 베이스 (RDBMS, Cassandra, MongoDB )에도 쉽게 연계가가능하다. CEP(Complex Event Processing을 지원하는) 이벤트 처리 분야에서는  Drools (http://www.drools.org/),  Esper (http://esper.codehaus.org/등이 연계 가능하고. 그외에도 Elastic Search (http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/master/storm.html

) , node.js (https://github.com/paralect/storm-nodejs-starterkit)등 다양한 솔루션과 연동을 통해서 시스템을 확장해 나갈 수 있다.


오픈소스

마지막으로 Storm은 오픈소스이다. 상업적 활용이 가능한 Apache License 2.0을 따르고 있다.

Apache License 2.0 http://ko.wikipedia.org/wiki/%EC%95%84%ED%8C%8C%EC%B9%98_%EB%9D%BC%EC%9D%B4%EC%84%A0%EC%8A%A4

Apache License 2.0 한국 번역본 http://yesarang.tistory.com/272

Storm은 유사한 특징을 가지고 있는 Apache Spark에 비해서, 개발이 된지 오래되어서 안정성이 높고 특유의 구조상 장애 대처 능력과 메세지 전달 보장 능력등이 좋다. 반대로 Spark은 최근에 만들어진 만큼 머신 러닝등 더 많은 기능을 가지고 있다.

Storm의 기본 개념

자아 그러면 이제 Storm의 개념을 다시 정립해보자. Storm의 개념을 이해하려면 필수적으로 먼저 이해해야 하는것이 Spout Bolt의 개념이다.

Spout Bolt

Spout Storm 클러스터로 데이타를 읽어들이는 데이타 소스이다. 외부의 로그 파일이나, 트위터 타임 피드와 같인 데이타 스트림, 큐등에서 데이타를 읽어드린다. 이렇게 읽어 드린 데이타를 다른 Bolt로 전달한다. Spout에는 크게 4가지 중요한 메서드가 있다.

Ÿ   open() : 이 메서드는 Spout이 처음 초기화 될때 한번만 호출되는 메서드로, 데이타 소스로 부터의 연결을 초기화 하는 등의 역할을 한다.

Ÿ   nextTuple() : 이 메서드는 데이타 스트림 하나를 읽고 나서, 다음 데이타 스트림을 읽을 때 호출 되는 메서드 이다.

Ÿ   ack(Object msgId) : 이 메서드는 데이타 스트림이 성공적으로 처리되었을때 호출되는데, 이 메서드에서는 성공 처리된 메세지를 지우는 등, 성공 처리에 대한 후처리를 구현한다.

Ÿ   fail(Object msgId) : 이 메서드는 해당 데이타 스트림이 Storm 토폴로지를 수행하던중에, 에러가 나거나 타임아웃등이 걸렸을때 호출되는데,이때에는 사용자가 에러에 대한 에처 처리 로직을 명시해야 한다. 흔히 재처리 로직을 구현하거나 또는 에러 로깅등의 처리를 하게 된다.

Bolt는 이렇게 읽어 드린 데이타를 처리하는 함수이다. 입력 값으로 데이타 스트림을 받고, 그 데이타를 내부의 비지니스 로직에 따라서 가공한 다음에 데이타 스트림으로 다른 Bolt로 넘겨주거나 종료 한다. Bolt에서 정의되는 주요한 메서드는 다음과 같다.

Ÿ   prepare (Map stormConf, TopologyContext context, OutputCollector collector): 이 메서드는 Bolt 객체가 생성될때 한번 호출 된다. 각종 설정 정보나 컨텍스트등 초기 설정에 필요한 부분을 세팅하게 된다.

Ÿ   execute(Tuple input): 가장 필수적인 메서드로, Bolt에 들어온 메세지를 처리하는 로직을 갖는다. 종단 Bolt가 아닌 경우에는 다음 Bolt로 메세지를 전달하기도 한다.

Storm 클러스터내에는 여러개의 Spout Bolt가 존재하게 된다.



<그림. Storm Spout Bolt의 개념>

Topology

이렇게 여러 개의 Spout Bolt간의 연관 관계를 정의해서 데이타 흐름을 정의하는 것을 토폴로지(Topology)라고 한다. 아래 그림과 같이 데이타가 어디로 들어와서 어디로 나가는지를 정의하는 것인데, 아래 그림은 두 개의 Spout에 대해서 각각의 토폴로지를 정의하였다.



<그림. Storm Topology>

Spout Bolt간의 연결 토폴로지는 TopologyBuilder라는 클래스를 통해서 정의한다. 그러면 간략하게, Spout Bolt, Bolt간의 데이타 흐름 관계를 어떻게 정의하는지 살펴보도록 하자.

다음과 같은 토폴로지 흐름을 정의한다고 가정하자



<그림. 간단한 토폴로지 정의 예제>


HelloSpout은 앞서 예제에서 만든것과 같은 Spout이고,

EchoBoltA 는 각각 들어온 메세지에 “Hello I am BoltA :”+메세지를 붙여서 화면에 출력한 후 전송하고 EechoBoltB는 들어온 메세지에 “Hello I am BoltB :”+메세지를 붙여서 전송한다.


package com.terry.storm.hellostorm;

 

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

 

public class EchoBoltA extends BaseBasicBolt{

 

        public void execute(Tuple tuple, BasicOutputCollector collector) {

               // TODO Auto-generated method stub

               String value = tuple.getStringByField("say");

               System.out.println("Hello I am Bolt A: "+value);

               collector.emit(new Values("Hello I am Bolt A :"+value));

        }

 

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

               // TODO Auto-generated method stub

                 declarer.declare(new Fields("say"));

        }

 

}

<그림 EchoBoltA 클래스>


※ 참고 EchoBoltB 클래스도 클래스명만 다르고 내부 구현 내용은 동일하다

토폴로지를 정의할때, HelloSpout hs라는 ID로 생성을 할것이고, EchoBoltA eba라는 ID, EchoBoltB ebb라는 이름으로 생성을 할것이다.

토폴로지 생성 코드를 보자. 아래 노랑색으로 표시된 부분이 실제 토폴로지는 구성하는 부분이다.


package com.terry.storm.hellostorm;

 

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.utils.Utils;

 

import com.terry.storm.hellostorm.EchoBoltB;

import com.terry.storm.hellostorm.EchoBoltA;

 

public class ToplogySequence {

        public static void main(String args[]){

               TopologyBuilder builder = new TopologyBuilder();

               builder.setSpout("hs", new HelloSpout(),1);

               builder.setBolt("eba", new EchoBoltA(),1).shuffleGrouping("hs");

               builder.setBolt("ebb", new EchoBoltB(),1).shuffleGrouping("eba");

              

              

               Config conf = new Config();

               conf.setDebug(true);

               LocalCluster cluster = new LocalCluster();

              

               cluster.submitTopology("ToplogySequence", conf,builder.createTopology());

               Utils.sleep(1000);

               // kill the LearningStormTopology

               cluster.killTopology("ToplogySequence");

               // shutdown the storm test cluster

               cluster.shutdown();          

        }

}

 

<그림. 위의 그림에 있는 토폴로리지를 실제로 구현한 >

 

Spout을 구현한 부분을 보자


builder.setSpout("hs", new HelloSpout(),1);

 

를 통해서 Spout을 생성하는데, setSpout(“{id}”,”{Spout 객체}”,”{Parallelism 힌트}”) 로 이루어진다. 여기서 id“hs”로 정의했고, Spout 객체는 HelloSpout을 지정했다.

    Paralleism 힌트는 나중에 병령 처리와 그룹핑 개념을 설명할때 다시 설명하도록 한다.

다음으로 Bolt를 생성하는데,

builder.setBolt("eba", new EchoBoltA(),1).shuffleGrouping("hs");

builder.setBolt("ebb", new EchoBoltB(),1).shuffleGrouping("eba");

 

로 각각의 Bolt를 생성했다. 이때 주목해야 하는 점이 뒤에 붙어 있는 shufflerGrouping이라는 메서드인데, Spout Bolt간의 연관 관계는 이 Grouping이라는 개념을 이용해서 생성한다. Grouping에는 여러가지 종류와 개념이 있지만 여기서는 간단한 shuuflerGrouping만을 사용했다.첫번째 EchoBoltA에서 자신을 “eba” 라는 id로 생성을 한후에, suffelerGrouping(“hs”)를 선언했는데, 이는 “hs”라는 ID를 가지고 있는 Spout이나 Bolt로 부터 메세지를 받아들이겠다는 이야기이다. 두번째 EchoBoltBsuffelerGrouping(“eba”)를 통해서, id“eba” Spout이나 Bolt, 즉 앞서 생성한 EchBoltA로 부터 메세지를 받아들이겠다는 이야기이다.

자 그러면 이 토폴로지를 실행해 보자.

실행하면 다음과 같은 로그를 얻을 수 있다.

5399 [Thread-12-hs] INFO  backtype.storm.daemon.task - Emitting: hs default [hello world 1]

5400 [Thread-8-eba] INFO  backtype.storm.daemon.executor - Processing received message source: hs:4, stream: default, id: {}, [hello world 1]

Hello I am Bolt A: hello world 1

5401 [Thread-8-eba] INFO  backtype.storm.daemon.task - Emitting: eba default [Hello I am Bolt A :hello world 1]

5409 [Thread-10-ebb] INFO  backtype.storm.daemon.executor - Processing received message source: eba:2, stream: default, id: {}, [Hello I am Bolt A :hello world 1]

Hello I am Bolt B: Hello I am Bolt A :hello world 1

 

     5399 번 라인에서 12번 쓰레드에서 수행되는 “hs” 라는 이름의 Spout, “hello world 1” 이라는 문자열을 emit (제출) 하였다

     5400 번 라인에서 8번 쓰레드에서 수행되는 “eba”라는 Bolt“hs”라는 Spout또는 Bolt에서 “hello world”라는 문자열을 받았다. 그 다음 라인에 “Hello I am Bolt A: hello world 1”가 출력되는 것을 확인할 수 있다.

     5401 라인에서 8번 쓰레드에서 수행되는 “eba” 볼트가 “Hello I am Bolt A :hello world 1” 라는 문자열을 제출하였다.

     5409 라인세서 10번 쓰레드에서 수행되는 “ebb”라는 id의 볼트가 “eba”로 부터 “Hello I am Bolt A :hello world 1” 라는 메세지를 받았다. 다음 행에 EchoBoltB에 의해서 처리되어 “Hello I am Bolt B: Hello I am Bolt A :hello world 1” 문자열이 출력되었음을 확인할 수 있다.

Stream Tuple

다음으로 데이타 Stream Tuple에 대한 개념을 이해해야 한다.

Storm에서 데이타는 Stream이라는 개념으로 정의되는데, Stream이란, Spout Bolt간 또는 Bolt간을 이동하는 데이타들의 집합을 이야기 한다.

각각의 Stream은 하나의 Tuple로 이루어 지는데, Tuple형태로 정의된다.



<그림. Storm Stream Tuple 개념>


앞에 예제에서는 하나의 키만 있는 Tuple을 사용하였다.

앞에서 사용한 HelloSpout 클래스를 다시 한번 살펴보면


public class HelloSpout extends BaseRichSpout {

          private static final long serialVersionUID = 1L;

          private static int count=0;

          private SpoutOutputCollector collector;

         

          public void open(Map conf,TopologyContext context,SpoutOutputCollector collector){

               this.collector = collector; 

          }

         

          public void nextTuple(){

                 if(count++<10) this.collector.emit(new Values("hello world "+count));

          }

         

          public void declareOutputFields(OutputFieldsDeclarer declarer){

                 declarer.declare(new Fields("say"));

          }

         

}

<그림. HelloSpout>

nextTuple 부분에서 newValue로 하나의 값을 보내는 것을 볼 수 있다. 그리고 이 Tuple의 키 구조는 아래 declareOutputFields 메서드에서 “say” 라는 필드이름으로 정의된것을 볼 수 있다.

실제로 HelloSpout에서 생성하는 데이타 스트림은 다음과 같다.



<그림 데이타 Stream Turple 구조>

이번 글에서는 간단하게 Storm의 특징과 기본 개념에 대해서 알아보았다. 다음글에서는 조금더 상세한 Storm의 아키텍쳐의 개념과 병렬 처리 개념에 대해서 알아보도록 하겠다.

 

 

대충보는 Storm #3-Storm 싱글 클러스터 노드 설치 및 배포

조대협(http://bcho.tistory.com)

 

지난번에는 간략하게, Storm을 이용한 HelloStorm 애플리케이션을 개발용 클러스터인 Local Cluster에서 구동해봤다. 이번에는 운영용 클러스터를 설정하고, 이 운영 클러스터에 지난번에 작성한 HelloStorm 토폴리지를 배포해보도록 한다.

Storm 클러스터의 기본 구조

Storm 클러스터를 기동하기 전에, 클러스터가 어떤 노드들로 구성이 되는지 먼저 알아보도록 하자 Storm 클러스터는 기본적으로 아래와 같은 3가지 구성요소로 구성이 되어 있다.

먼저 주요 노드인 Nimbus Supervior 노드에 대해서 알아보자, Nimbus Supervisor 노드는 각각 하나의 물리 서버로 생각하면 된다.


Nimbus

Nimbus는 마스터 노드로 주요 설정 정보를 가지고 있으며, Nimbus 노드를 통해서 프로그래밍 된 토폴로지를 Supervisor 노드로 배포한다. 일종의 중앙 컨트롤러로 생각하면 된다. Storm에서는 중앙의 하나의 Nimbus 노드만을 유지한다.

Supervisor

Supervisor 노드는 실제 워커 노드로, Nimbus로 부터 프로그램을 배포 받아서 탑재하고, Nimbus로 부터 배정된 작업을 실행하는 역할을 한다. 하나의 클러스터에는 여러개의 Supervisor 노드를 가질 수 있으며, 이를 통해서 여러개의 서버를 통해서 작업을 분산 처리할 수 있다.

Zookeeper

이렇게 여러개의 Supervisor를 관리하기 위해서, Storm Zookeeper를 통해서 각 노드의 상태를 모니터링 하고, 작업의 상태들을 공유한다.

Zookeeper는 아파치 오픈소스 프로젝트의 하나로, 분산 클러스터의 노드들의 상태를 체크하고 공유 정보를 관리하기 위한 분산 코디네이터 솔루션이다.

전체적인 클러스터의 구조를 살펴보면 다음과 같다.



<그림. Storm 클러스터의 구조>

하나의 싱글 머신에 Nimbus가 설치되고, 다른 각각의 머신에 Supervisor가 하나씩 설치된다. 그리고, ZooKeeper 클러스터를 통해서 기동이 된다.

※ 실제 물리 배포 구조에서는 Nimbus Supervisor, ZooKeeper등을 하나의 서버에 분산배포할 수 도 있고, 여러가지 다양한 배포구조를 취할 수 있으나, Supervisor의 경우에는 하나의 서버에 하나의 Supervisor만을 설치하는 것을 권장한다. Supervisor의 역할이 하나의 물리서버에 대한 작업을 관리하는 역할이기 때문에, 한 서버에 여러 Supervisor를 설치하는 것은 적절하지 않다.

 

설치와 기동

Storm 클러스터를 기동하기 위해서는 앞에서 설명한바와 같이 ZooKeeper가 필요하다. Zookeeper를 다운로드 받은 후에, ~/conf/zoo_sample.cfg 파일을 ~/conf/zoo.cfg로 복사한다.

다음으로 ZooKeeper를 실행한다.

% $ZooKeeper_HOME/bin/zkServer.cmd


<그림. 주키퍼 기동 로드>


다음으로 Nimbus 노드를 실행해보자. Storm을 다운 받은 후 압축을 푼다.

다음 $APACHE_STORM/bin 디렉토리에서

%storm nimbus

를 실행하면 nimbus 노드가 실행된다. 실행 결과나 에러는 $APACHE_STORM/logs 디렉토리에 nimbus.log라는 파일로 기록이 된다.

정상적으로 nimbus 노드가 기동이 되었으면 이번에는 supervisor 노드를 기동한다.

%storm supervisor

Supervisor에 대한 노드는 $APACHE_STORM/logs/supervisor.log 라는 이름으로 기록된다.

Storm은 자체적으로 클러서터를 모니터링 할 수 있는 UI 콘솔을 가지고 있다. UI를 기동하기 위해서는

%storm ui

로 실행을 해주면 UI가 기동이 되며 http://localhost:8080 에 접속을 하면 관리 콘솔을 통해서 현재 storm의 작동 상태를 볼 수 있다.



<그림. Storm UI를 이용한 기동 상태 모니터링>


현재 하나의 PC nimbus supervior,UI를 모두 배포하였기 때문에 다음과 같은 물리적인 토폴로지가 된다.



<그림. 싱글 서버에 nimbus supervisor를 같이 설치한 예>

싱글 클러스터 노드에 배포 하기

싱글 노드 클러스터를 구축했으니, 앞의 1장에서 만든 HelloStorm 토폴로지를 이 클러스터에 배포해보도록 하자.

전장의 예제에서 만든 토폴로지는 Local Cluster를 생성해서, 자체적으로 개발 테스트용 클러스터에 토폴로지를 배포하도록 하는 코드였다면, 이번에는 앞에서 생성한 Storm 클러스터에 배포할 수 있는 토폴로지를 다시 만들어야 한다. 이 코드의 차이는 기존 코드와는 다르게 LocalCluster를 생성하지 않고, 기동중인 클러스터에 HelloTopoloy Submit하도록 한다.

package com.terry.storm.hellostorm;

 

import backtype.storm.Config;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.topology.TopologyBuilder;

 

public class HelloTopology {

        public static void main(String args[]){

               TopologyBuilder builder = new TopologyBuilder();

               builder.setSpout("HelloSpout", new HelloSpout(),2);

               builder.setBolt("HelloBolt", new HelloBolt(),4).shuffleGrouping("HelloSpout");

              

               Config conf = new Config();

               // Submit topology to cluster

               try{

                       StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

               }catch(AlreadyAliveException ae){

                       System.out.println(ae);

               }catch(InvalidTopologyException ie){

                       System.out.println(ie);

               }

              

        }

 

}

<그림. HelloTopology.java>


토폴로지 클래스를 만들었으면 이를 빌드해보자

%mvn clean install

을 실행하면 ~/target 디렉토리에 토폴로지 jar가 생성된것을 확인할 수 있다.



jar 파일을 배포해보도록 하자.

배포는 storm {jar} {jar파일명} {토폴로지 클래스명} {토폴로지 이름} 명령을 실행하면 된다.

% storm jar hellostorm-0.0.1-SNAPSHOT.jar com.terry.storm.hellostorm.HelloTopology HelloTopology

배포 명령을 내리면 $APACHE_STORM_HOME/logs/nimbus.log에 다음과 같이 HelloTopology가 배포되는 것을 확인할 수 있다.


2015-01-25T07:35:03.352+0900 b.s.d.nimbus [INFO] Uploading file from client to storm-local\nimbus\inbox/stormjar-8c25c678-23f5-436c-b64e-b354da9a3746.jar

2015-01-25T07:35:03.365+0900 b.s.d.nimbus [INFO] Finished uploading file from client: storm-local\nimbus\inbox/stormjar-8c25c678-23f5-436c-b64e-b354da9a3746.jar

2015-01-25T07:35:03.443+0900 b.s.d.nimbus [INFO] Received topology submission for HelloTopology with conf {"topology.max.task.parallelism" nil, "topology.acker.executors" nil, "topology.kryo.register" nil, "topology.kryo.decorators" (), "topology.name" "HelloTopology", "storm.id" "HelloTopology-1-1422138903"}

2015-01-25T07:35:03.507+0900 b.s.d.nimbus [INFO] Activating HelloTopology: HelloTopology-1-1422138903

2015-01-25T07:35:03.606+0900 b.s.s.EvenScheduler [INFO] Available slots: (["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6702] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6701] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6700])

2015-01-25T07:35:03.652+0900 b.s.d.nimbus [INFO] Setting new assignment for topology id HelloTopology-1-1422138903: #backtype.storm.daemon.common.Assignment{:master-code-dir "storm-local\\nimbus\\stormdist\\HelloTopology-1-1422138903", :node->host {"226ceb74-c1a3-4b1a-aab5-2384e68124c5" "terry-PC"}, :executor->node+port {[3 3] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [6 6] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [5 5] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [4 4] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [7 7] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [2 2] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [1 1] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703]}, :executor->start-time-secs {[7 7] 1422138903, [6 6] 1422138903, [5 5] 1422138903, [4 4] 1422138903, [3 3] 1422138903, [2 2] 1422138903, [1 1] 1422138903}}

2015-01-25T07:37:48.901+0900 b.s.d.nimbus [INFO] Updated HelloTopology-1-1422138903 with status {:type :inactive}

해당 토폴로지가 배포되었는지를 확인하려면 storm list라는 명령어를 사용하면 현재 기동되고 있는 토폴로지 목록을 확인할 수 있다.




<그림. storm list 명령으로 기동중인 토폴로지를 확인>


실행 결과는 $APACHE_STORM_HOME/logs 디렉토리를 보면 worker-xxx.logs 라는 파일이 생긴것을 확인해 볼 수 있는데, 파일 내용을 보면 다음과 같다.

2015-01-25T07:35:08.908+0900 STDIO [INFO] Tuple value ishello world

2015-01-25T07:35:08.908+0900 STDIO [INFO] Tuple value ishello world

우리가 앞서 구현한 Bolt에서 System.out으로 출력한 내용이 출력된다.

동작을 확인하였으면, 이제 기동중인 토폴로지를 정지 시켜 보자. 토폴로지의 정지는 storm deactivate {토폴로지명} 을 사용하면 된다. 아까 배포한 토폴로지 명이 HelloTopology였기 때문에 다음과 같이 토폴로지를 정지 시킨다.

%storm deactivate HelloTopology

그 후에 다시 storm list 명령을 이용해서 토폴로지 동작 상태를 확인해보면 다음과 같다.



< 그림. Storm  토폴로지 정지와 확인 >


만약에 Topology를 재 배포 하려면 storm kill로 해당 토폴로지를 삭제한 후에, 다시 배포 해야 한다. 이때 주의할점은 storm kill로 삭제해도 바로 삭제가 안되고 시간 텀이 있으니 약간 시간을 두고 재 배포를 해야 한다.


지금까지 간단하게, 운영용 클러스터를 구성하고, 운영 클러스터에 토폴로지를 배포 하는 것에 대해서 알아보았다.

HelloStorm 코드 구현, 클러스터 노드 구축 및 배포를 통해서 간단하게 스톰이 무엇을 하는 것인지는 파악했을 것으로 안다. 다음 장에는 조금 더 구체적으로 Storm의 개념과 스톰의 아키텍쳐등에 대해서 살펴보도록 하겠다.