블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 

'빅데이타/스트리밍 데이타 처리'에 해당되는 글 20

  1. 2016.09.22 노트7의 소셜 반응을 분석해 보았다. - #3 제플린 노트북을 이용한 상세 데이타 분석
  2. 2016.09.20 노트7의 소셜 반응을 분석해 보았다. - #2 구현하기
  3. 2016.09.20 노트7의 소셜 반응을 분석해 보았다. (3)
  4. 2016.09.09 트위터 피드 실시간 분석 시스템 디자인
  5. 2016.07.04 실시간 빅데이타 처리를 위한 스트리밍 처리의 개념 (1)
  6. 2015.06.09 Apache Spark-Python vs Scala 성능 비교 (1)
  7. 2015.06.09 Apache Spark - Key/Value Paris (Pair RDD)
  8. 2015.05.31 Apache Spark - RDD (Resilient Distributed DataSet) Persistence
  9. 2015.05.26 Apache Spark - RDD (Resilient Distributed DataSet) 이해하기 - #2
  10. 2015.05.22 Apache Spark - RDD (Resilient Distributed DataSet) 이해하기 - #1
  11. 2015.05.22 Apache Spark 소개 - 스파크 스택 구조
  12. 2015.05.18 Apache Spark 클러스터 구조
  13. 2015.05.18 Apache Spark 설치 하기
  14. 2015.05.18 Apache Spark이 왜 인기가 있을까? (7)
  15. 2015.01.29 Apache Storm을 이용한 실시간 데이타 처리 #6 –Storm 그룹핑 개념 이해하기
  16. 2015.01.25 Apache Storm을 이용한 실시간 데이타 처리 #5 –Storm의 병렬/분산 처리
  17. 2015.01.25 Apache Storm을 이용한 실시간 데이타 처리 #4 –소개와 기본 개념 (2)
  18. 2015.01.25 Apache Storm을 이용한 실시간 데이타 처리 #3 -Storm 클러스터 설정과 배포
  19. 2015.01.12 Apache Storm을 이용한 실시간 데이타 처리 #2-Storm 설치와 HelloStorm 작성하기
  20. 2015.01.12 Apache Storm을 이용한 실시간 데이타 처리#1-데이타 스트림 개념 이해하기 (2)
 

노트7의 소셜 반응을 분석해 보았다. 


#3 제플린 노트북을 이용한 상세 분석



조대협 (http://bcho.tistory.com)



데이타 스튜디오는 편리하게 사용할 수 있지만, 쿼리 사용등이 불가능하기 때문에, 원본 데이타를 이용한 상세 분석에는 어려움이 있다. 원본 데이타를 이용해서 상세 분석을 하려면 노트북 계열의 애플리케이션이 효과적인데, 빅쿼리를 연동할 수 있는 노트북으로는 이전에 소개한 주피터 노트북 기반의 데이타랩 (datalab)과, 스파크나 다른 빅데이타 솔루션과 함께 많이 사용되는 제플린 노트북(zeppelin.apache.org) 이 있다.


지난 글에서 데이타랩에 대한 연동 방법은 이미 소개하였으니, 이번에는 제플린을 통하여, 빅쿼리의 데이타를 분석해보도록 한다.


제플린 설치

제플린을 설치 하는 방법은 간단하다. Zeppelin.apache.org 에서, 설치 파일을 다운로드 받는다.

빅쿼리 연동 인터프리터는 제플린 버전 0.61 버전 이상에 포함되어 있기 때문에, 0.61 버전 이상을 다운로드 받는다.  이 때 모든 인터프리터가 포함된 버전을 다운 받아야 한다. (아니면 별도로 인터프리터를 설치해야 하는 번거로움이 따른다.)


다운 로드 받은 파일의 압축을 푼다. 다음으로 제플린 설치 디렉토리로 들어가서 다음 명령어를 수행한다.

% ./bin/zeppelin.sh

윈도우의 경우에는 %./bin/zeppelin.cmd 를 실행하면 된다.

자바 애플리케이션이기 때문에 별도의 설치 과정이 필요없고, 제플린 애플리케이션을 실행하기만 하면 된다.

제플린이 기동되었으면 브라우져에서 http://localhost:8080 으로 접속하면 다음과 같이 제플린 콘솔을 볼 수 있다.

노트북 생성

제플린 콘솔에 들어왔으면 초기화면에서 Create new note 라는 메뉴를 이용하여 새로운 노트북을 생성하자. 여기서는 편의상 “BQ 노트북" 이라는 이름으로 노트북을 생성하였다.


분석 쿼리 작성

이제 분석할 내용은 수집된 트윗의 명사들에 대해서, 시간 단위로 그룹핑을 한 다음에, 각 단어에 대해서 발생한 횟수를 카운트해서 보여주는 내용을 구현하려고 한다.

예를 들어서 9월20일에는 “유행" 이라는 단어가 200회 발생하였고, “패션" 이라는 단어가 100회 발생하였다. 라는 식으로 조회를 하려고 한다.


현재 테이블 구조는 다음과 같다.

Date (발생 시간)

Noun (명사)

count (발생 횟수)


SQL 문장을 작성해보자

select date,noun,sum(count) from 테이블명

group by date,noun


이렇게 쿼리를 하면, 시간대 별로, 명사와 그 명사의 발생 횟수를 리턴을 해주는데, 우리는 앞의 데이타 플로우 프로그램에서 30초 단위로 통계를 집계하도록 하였기 때문에, 30초 단위로 결과가 리턴된다. 우리가 원하는 결과는 30초 단위가 아니라 1시간 단위의 결과 이기 때문에, 다음과 같이 쿼리를 수정한다.


select  DATE(date) as ddate,HOUR(date) as dhour,noun,sum(count) from 테이블명

group by ddate,dhour,noun


DATE와 HOUR라는 함수를 사용하였는데, DATE는 타임 스탬프 형태의 컬럼에서 날짜만을 추출해주는 함수 이고, HOUR는 타임 스탬프 형태의 컬럼에서 시간만을 추출해주는 함수 이다.

이렇게 날짜와 시간만을 추출한 다음에, group by 절을 이용하여, 날짜와,시간 그리고 명사로 그룹핑을 하게 되면 우리가 원하는 것과 같이 각 날짜의 시간대별로 명사별 발생횟수 ( sum(count)) 값의 통계를 얻을 수 있다.


제플린에서 빅쿼리 명령을 수행하려면 다음과 같이 %bigquery.sql 이라고 첫줄에 선언을 한 다음에 SQL 문장을 수행하면 된다.




결과는 디폴트로 테이블 형태로 나오는데, 아래 아이콘 중에서 그래프 아이콘을 누르면 그래프 형태로 볼 수 가 있는데, 이 때 X,Y축의 변수를 지정할 수 있다.

아래 그림과 같이 Keys (X축을) ddate,dhour를 선택하고 Values(Y축)을 dhour SUM을 선택하면, 시간별 나타난 단어수를 볼 수 있다.



그런데 이 쿼리를 수행하면, 각 시간별로 발생한 명사 단어의 수가 매우 많기 때문에, 보기가 매우 어렵다.

그렇다면 시간대별로 발생한 명사중에서 각 시간대별로 많이 발생한 명사 5개씩만을 볼 수 없을까? 즉 group by를 전체 데이타 구간이 아니라, 각각 시간대 별로 계산을 해줄 수 는 없을까 하는 필요가 발생한다.

빅쿼리 파티셔닝

데이타를 구간 별로 나눠서 연산할 수 있는 기능으로 빅쿼리에는 파티션이라는 기능이 있다.

예를 들어서 group by를 전체 결과에 대해 그룹핑을 하는 것이 아니라, 앞에서 언급한 요건 처럼 일 단위로 짤라서 그룹핑을 하는 것이 가능하다.




파티션을 이용해서 할 수 있는 것은 파티션별로 합계나, 통계를 내거나, 파티션의 각 로우의 값의 백분율(%)나 또는 소팅한 순서등을 볼 수 있다. 여기서는, 시간으로 파티션을 나누고  파티션내에서 명사의 수가 많은 수 순서대로 소팅을 한후에, RANK라는 함수를 이용하여 그 파티션에서 그 명사가 몇번째로 많이 나타났는지를 출력하도록 해보겠다.


파티션의 사용법은 다음과 같다.

“파티션 함수 OVER (PARTITION BY 파티션을할 키 목록)”

여기서는 일/시간 별로 파티션을 나눈 후에, 그 순위별로 소팅을 할 것이기 때문에, 다음과 같은 식을 쓴다.

RANK() OVER (PARTITION BY ddate,dhour ORDER BY ncount DESC  ) as rank


이를 쿼리에 적용하면 다음과 같다.

   SELECT

       DATE(date) as ddate,HOUR(date) as dhour

       ,noun

       ,sum(count) as ncount

       , RANK() OVER (PARTITION BY ddate,dhour ORDER BY ncount DESC  ) as rank

   FROM [useful-hour-138023:twitter.noun]

   group by noun,ddate,dhour

   order by ddate,dhour,ncount desc


그러면 다음과 같이 일/날짜 파티션별로 많이 발생한 명사 순으로 발생횟수와 순위(rank)를 출력해준다.



그런데 쿼리를 돌려보면 알겠지만, 시간대별로 수집한 명사의 종류가 많기 때문에, 일자별 데이타가 매우 많다. 그래서 파티션별로 많이 등장하는 단어 5개만을 보려고 하면 rank <5 인것만 걸러내면 된다. 이는 중첩 쿼리를 이용해서 수행이 가능하다

다음은 이를 적용한 예이다.


SELECT ddate,dhour

   ,noun

   , rank

from (

   SELECT

       DATE(date) as ddate,HOUR(date) as dhour

       ,noun

       ,sum(count) as ncount

       , RANK() OVER (PARTITION BY ddate,dhour ORDER BY ncount DESC  ) as rank

   FROM [useful-hour-138023:twitter.noun]

   where noun != "note7" and noun != "samsung" and noun !="galaxy"

   group by noun,ddate,dhour

   order by ddate,dhour,ncount desc

   )

where rank < 6


이렇게 하면, 각 시간대별로 자주 등장하는 단어 6개만을 보여준다.


이 쿼리를 이용하여 데이타를 어떻게 분석하는지는 예전글 http://bcho.tistory.com/1136 을 참고하세요.


간단하게나마 트위터 피드에서 특정 키워드를 기반으로 하여, 명사와 형용사를 추출하여 소셜 반응을 분석하는 애플리케이션 개발과 데이타 분석 방법에 대해서 설명하였다.

아이폰7을 분석해보니, 명사 분석도 의미가 있었지만, 아이폰7에 대한 기대를 형용사 분석을 통해서도 많은 인사이트를 얻을 수 있었다. Awesome, excellent와 같은 기대치가 높은 형용사가 많이 검출되었고 bad, fuck 과 같은 부정적인 의미의 형용사는 다소 낮게 검출되었다. (아마 이즈음이 노트7 폭발로 인하여 반사 이익을 얻은게 아닐까 추정되는데.)


이외에도, 이모콘티만 추출하여 분석을 한다거나, 부사등을 통해서 분석을 하는 것도 가능하고, 구글 자연어 처리 API는 글을 통해서 사람의 감정을 분석해주는 기능도 있기 때문에 응용 분야는 훨씬 더 넓다고 볼 수 있다.

저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License


노트7의 소셜 반응을 분석해 보았다. 


#2 구현하기


조대협 (http://bcho.tistory.com)

지난번 글 http://bcho.tistory.com/1136에 이어서, 트위터를 통한 소셜 반응을 분석하는 시 스템을 구축하는 방법에 대해서 알아본다. 

시나리오 및 아키텍쳐

스트리밍 처리와 데이타 플로우에 대한 개념 이해가 끝났으면 이제 실제로 실시간 분석 애플리케이션을 만들어보자.

SNS를 이용한 마케팅 분석에서 대표적인 시나리오중 하나는 트위터 피드를 분석하여, 사람들의 반응을 분석하는 시나리오이다. 자주 언급 되는 단어나 형용사를 분석함으로써, 특정 제품이나 서비스에 대한 소셜 네트워크상의 바이럴 반응을 분석할 수 있는데, 여기서  구현하고자 하는 시나리오는 다음과 같다. 트위터 피드에서 특정 키워드로 트윗 문자열들을 수집한 후에, 구글의 자연어 분석 API를 통하여 트윗 문자열에서 명사와 형용사를 추출한다

추출한 명사와 형용사의 발생 횟수를 통계내어서 대쉬보드에 출력하는 시나리오이다.


이를 구현하기 위한 솔루션 아키텍쳐는 다음과 같다.


fluentd를 이용하여 트위터의 특정 키워드를 기반으로 트위터 피드를 수집하고, 수집된 피드들은 구글 클라우드의 큐 서비스인 Pub/Sub으로 전달된다. 전달된 데이타는 데이타 플로우에서 읽어서 필요한 데이타만 필터링한 후, 구글의 자연어 분석 API를 통해서 명사와 형용사를 분리한다.

분리된 명사와 형용사는 데이타플로우에서 30초 주기의 고정윈도우(Fixed Window) 단위로, 명사에서 발생한 단어의 수와, 형용사에서 발생한 단어의 수를 카운트 한 다음에, 빅쿼리에 명사 테이블과 형용사 테이블에 저장한다.

저장된 데이타는 구글의 리포팅 도구인 데이타 스튜디오를 통해서 그래프로 출력한다.


구현

그러면 위에서 설명한 아키텍쳐대로 시스템을 하나씩 구현해보자.

전체 예제 코드와 설정 파일은 https://github.com/bwcho75/googledataflow/tree/master/twitter 에서 받아볼 수 있다.

트위터 피드 수집 서버 설정

먼저 트위터에서 피드를 수집하기 위해서 fluentd 에이전트를 설정한다. 구글 컴퓨트 엔진에서 VM을 생성한 후에, 앞의 빅쿼리 예제에서 한것과 마찬가지로 fluentd 에이전트를 설치한다.

VM을 설치할때, 반드시 Cloud API access scopes를 full API access로 설정해야 하는데, 이 VM에서 fluentd를 통해서 수집한 피드를 Pub/Sub으로 전달할때, Pub/Sub API를 사용하기 때문이다.


Fluentd 가 설치되었으면 Pub/Sub으로 데이타를 전달하기 때문에,Fluentd pub/sub 에이전트를 추가설치 한다.

에이전트명은 “fluent-plugin-gcloud-pubsub”로

% sudo td-agent-gem install fluent-plugin-gcloud-pubsub

명령을 이용해서 설치한다.


에이전트 설치가 끝났으면 fluentd 에이전트 설정을 해야 한다.

다음은 트위터에서 “note7”에 관련된 피드를 읽어서 pub/sub 큐로 피드를 전송하는 fluentd 설정 예제이다.


<source>

 type twitter

 consumer_key        트위터 Consumer Key

 consumer_secret     트위터 Consumer Secrect

 oauth_token         트위터 Access Token

 oauth_token_secret  트위터 Access Token Secrect

 tag                 input.twitter.sampling  # Required

 timeline            sampling                # Required (tracking or sampling or location or userstream)

 keyword             note7

 output_format       nest                    # Optional (nest or flat or simple[default])

</source>

<match input.twitter.sampling>

 type gcloud_pubsub

 project 본인의 프로젝트명

 topic projects/본인의 프로젝트명/topics/twitter

 key 다운로드받은 구글 클라우드 억세스 토큰 JSON 파일

 flush_interval 10

 autocreate_topic false

</match>


Fluentd 설정이 끝났다.

Pub/Sub 큐 설정

다음으로는 fluentd 읽어드린 트위터 피드를 받아드를 Pub/Sub 큐를 생성한다.

큐 생성 방법에 대해서는 앞의 Pub/Sub 챕터를 참고하기 바란다. (http://bcho.tistory.com/1120)

큐 이름은 twitter라고 한다. 전체 큐 이름은 “projects/본인 프로젝트명/twitter” 가 된다.

데이타 플로우 프로젝트 생성

큐까지 데이타를 읽어드렸으면, 이 데이타를 처리할 데이타 플로우 파이프라인을 구현한다.

이클립스에서 데이타 플로우 파이프라인 프로젝트를 생성하자. 프로젝트 생성은 앞장의 “데이타 플로우 개발환경 설정" 부분을 참고하기 바란다. (http://bcho.tistory.com/1128)


프로젝트가 생성되었으면, 이 프로젝트에서 사용할 의존성 라이브러리들을 메이븐 (maven) 빌드 스크립트인 pom.xml에 추가해준다.

추가해야 하는 API는 JSON 파싱을 위한 javax.json-api와, javax.json 그리고 구글의 자연서 분석 API를 호출하기 위한 google-api-client와 google-api-service-language 모듈이다.


다음 코드 블럭을 <dependencies> 엘리먼트 아래 하부 엘리먼트로 추가해준다


   <dependency>

   <groupId>javax.json</groupId>

   <artifactId>javax.json-api</artifactId>

   <scope>provided</scope>

   <version>1.0</version>

</dependency>

<dependency>

   <groupId>org.glassfish</groupId>

   <artifactId>javax.json</artifactId>

   <version>1.0.4</version>

</dependency>

<!-- NL API dependency -->

<dependency>

     <groupId>com.google.apis</groupId>

     <artifactId>google-api-services-language</artifactId>

     <version>v1beta1-rev7-1.22.0</version>

   </dependency>

   <dependency>

     <groupId>com.google.api-client</groupId>

     <artifactId>google-api-client</artifactId>

     <version>1.22.0</version>

   </dependency>


데이타 플로우 코드 작성

전체 파이프라인 흐름

파이프라인 코드 작성에 앞서서 전체 파이프라인 흐름을 살펴보자

전체 흐름은 다음과 같다.


  1. Read From PubSub
    PubSub의 “twitter” 큐에서 JSON 형태의 트위터 메세지를 읽는다.

  2. Parse Twitter
    트위터 JSON 메세지를 파싱한 후, 전체 메세지에서 트윗 메세지를 저장하고 있는 “text” 필드와 언어셋을 정의하고 있는 “lang” 필드만 추출한다.
    자연어 분석 API가 아직 영어, 스페인어, 일본어만 지원하기 때문에, 이 예제에서는 영어로 트윗만 추출하도록 한다.

  3. NL Processing
    앞에서 추출한 트윗 메세지를 구글의 자연어 분석 API에 분석을 요청하여 명사와 형용사만 추출해낸다.

  4. 명사 처리 파이프라인
    다양한 처리 방식을 보여주기 위해서, 이 예제에서는 하나의 데이타 스트림을 분기 처리하여 두개의 데이타 파이프라인에서 처리하는 방식으로 구현하였다. 명사 처리 파이프라인은 다음과 같은 단계를 거친다.

    1. Noun Filter
      명사와 형용사 리스트로 들어온 데이타 중에서 명사만 필터링 한다.

    2. Window 적용
      고정 크기 윈도우 (Fixed Window) 30초를 적용하여, 30초 단위로 데이타를 분석하도록 한다.

    3. Count.PerElement
      명사 단어와, 각 단어별 발생횟 수를 30초 단위로 모아서 카운트 한다.

    4. Noun Formating
      카운트된 결과를 빅쿼리에 쓰도록, [윈도우 시작 시간,명사 단어, 발생횟수] 형태의 빅쿼리 ROW(행) 데이타 타입으로 포매팅 한다.

    5. Write Noun Count to BQ
      포매팅 된 데이타를 빅쿼리에 쓴다.

  5. 형용사 처리 파이프라인
    형용사를 처리하는 파이프라인도 내용은 명사를 처리한 파이프라인과 다르지 않고 동일하게 다음과 같은 순서를 따른다.

    1. Adj Filter

    2. Window 적용

    3. Count.PerElement

    4. Adj Formating

    5. Write Adj Count to BQ

빅쿼리 데이타 구조

빅쿼리에는 두개의 테이블에 데이타를 나눠서 저장하였다.

명사와 형용사 테이블로 각각의 테이블 명과 구조는 다음과 같다.


명사 테이블 : noun

필드명

데이타 타입

date

TIMESTAMP

noun

STRING

count

INTEGER


형용사 테이블 : adj

필드명

데이타 타입

date

TIMESTAMP

adj

STRING

count

INTEGER

자연어 분석 클래스 작성

전체 데이타 흐름과 저장 구조가 이해되었으면, 파이프라인 코드 작성에 앞서서 자연어 처리 API를 호출하는 로직을 만들어보자


우리가 사용할 API는 String으로 문자열을 주면 다음과 같이 NLAnalyzeVO 객체로 분석 결과를 리턴해주는 코드이다.


package com.terry.nl;


import java.util.ArrayList;

import java.util.List;


public class NLAnalyzeVO {

List<String> nouns = new ArrayList<String>();

List<String> adjs = new ArrayList<String>();

List<String> emoticons = new ArrayList<String>();

float sentimental;


public List<String> getNouns() {

return nouns;

}


public List<String> getAdjs() {

return adjs;

}


public List<String> getEmoticons() {

return emoticons;

}


public float getSentimental() {

return sentimental;

}


public void setSentimental(float sentimental) {

this.sentimental = sentimental;

}

public void addNouns(String n){

nouns.add(n);

}

public void addAdj(String a){

adjs.add(a);

}

public void addEmoticons(String e){

emoticons.add(e);

}

}

<NLAnalyzeVO.java>


분석 결과로는 List<String> 타입으로 명사들의 목록을 nouns 로, 형용사들의 목록을 adj로 리턴해준다. float형으로 sentimental 이라는 필드에는 입력된 문장의 감정도를 리턴하도록 되어 있다. 음수값일 때는 부정적, 양수값일 경우에는 긍정을 의미한다.

VO안에는 List<String> emoticons 라는 필드가 있는데, 이는 트위터 메세지 내의 이모티콘을 추출하여 저장하기 위한 필드인데, 이 예제에서는 사용하지 않으니 신경 쓰지 않아도 된다.


package com.terry.nl;


import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;

import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;

import com.google.api.client.http.HttpRequest;

import com.google.api.client.http.HttpRequestInitializer;

import com.google.api.client.json.JsonFactory;

import com.google.api.client.json.jackson2.JacksonFactory;

import com.google.api.services.language.v1beta1.CloudNaturalLanguageAPI;

import com.google.api.services.language.v1beta1.CloudNaturalLanguageAPI.Documents.AnnotateText;

import com.google.api.services.language.v1beta1.CloudNaturalLanguageAPIScopes;

import com.google.api.services.language.v1beta1.model.AnalyzeEntitiesRequest;

import com.google.api.services.language.v1beta1.model.AnalyzeEntitiesResponse;

import com.google.api.services.language.v1beta1.model.AnalyzeSentimentRequest;

import com.google.api.services.language.v1beta1.model.AnalyzeSentimentResponse;

import com.google.api.services.language.v1beta1.model.AnnotateTextRequest;

import com.google.api.services.language.v1beta1.model.AnnotateTextResponse;

import com.google.api.services.language.v1beta1.model.Document;

import com.google.api.services.language.v1beta1.model.Entity;

import com.google.api.services.language.v1beta1.model.Features;

import com.google.api.services.language.v1beta1.model.Sentiment;

import com.google.api.services.language.v1beta1.model.Token;


import java.io.IOException;

import java.io.PrintStream;

import java.security.GeneralSecurityException;

import java.util.List;

import java.util.Map;


/**

*

* Google Cloud NL API wrapper

*/



@SuppressWarnings("serial")

public class NLAnalyze {


public static NLAnalyze getInstance() throws IOException,GeneralSecurityException {


return new NLAnalyze(getLanguageService());

}


public NLAnalyzeVO analyze(String text) throws IOException, GeneralSecurityException{

Sentiment  s = analyzeSentiment(text);

List <Token> tokens = analyzeSyntax(text);

NLAnalyzeVO vo = new NLAnalyzeVO();


for(Token token:tokens){

String tag = token.getPartOfSpeech().getTag();

String word = token.getText().getContent();


if(tag.equals("NOUN")) vo.addNouns(word);

else if(tag.equals("ADJ")) vo.addAdj(word);

}


vo.setSentimental(s.getPolarity());


return vo;

}



/**

* Be sure to specify the name of your application. If the application name is {@code null} or

* blank, the application will log a warning. Suggested format is "MyCompany-ProductName/1.0".

*/

private static final String APPLICATION_NAME = "Google-LanguagAPISample/1.0";


/**

* Connects to the Natural Language API using Application Default Credentials.

*/

public static CloudNaturalLanguageAPI getLanguageService()

throws IOException, GeneralSecurityException {

GoogleCredential credential =

GoogleCredential.getApplicationDefault().createScoped(CloudNaturalLanguageAPIScopes.all());

JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();

return new CloudNaturalLanguageAPI.Builder(

GoogleNetHttpTransport.newTrustedTransport(),

jsonFactory, new HttpRequestInitializer() {

@Override

public void initialize(HttpRequest request) throws IOException {

credential.initialize(request);

}

})

.setApplicationName(APPLICATION_NAME)

.build();

}


private final CloudNaturalLanguageAPI languageApi;


/**

* Constructs a {@link Analyze} which connects to the Cloud Natural Language API.

*/

public NLAnalyze(CloudNaturalLanguageAPI languageApi) {

this.languageApi = languageApi;

}


public List<Token> analyzeSyntax(String text) throws IOException{

AnnotateTextRequest request =

new AnnotateTextRequest()

.setDocument(new Document().setContent(text).setType("PLAIN_TEXT"))

.setFeatures(new Features().setExtractSyntax(true))

.setEncodingType("UTF16");

AnnotateText analyze =

languageApi.documents().annotateText(request);


AnnotateTextResponse response = analyze.execute();


return response.getTokens();


}

/**

* Gets {@link Sentiment} from the string {@code text}.

*/

public Sentiment analyzeSentiment(String text) throws IOException {

AnalyzeSentimentRequest request =

new AnalyzeSentimentRequest()

.setDocument(new Document().setContent(text).setType("PLAIN_TEXT"));

CloudNaturalLanguageAPI.Documents.AnalyzeSentiment analyze =

languageApi.documents().analyzeSentiment(request);


AnalyzeSentimentResponse response = analyze.execute();

return response.getDocumentSentiment();

}


}


<NLAnalyze.java>


코드 상의 주요 부분을 살펴보자

public NLAnalyzeVO analyze(String text)

메서느가 주요 메서드로, 트윗 문자열을 text 인자로 넘겨주면 분석 결과를 NLAnalyzeVO로 리턴한다.

이 메서드 안에서는 두개의 메서드를 호출하는데, analyzeSentiment(text) 와, analyzeSyntax(text)

를 두개 호출한다.

analyzeSentiment(text) 메서드는 text 를 넣으면 float 타입으로 감정도인 Sentinetal 지수를 리턴한다.

analyzeSyntax(text)는 구문을 분석하여, 명사,형용사,접속사,조사 등과 단어간의 의존 관계등을 분석해서 리턴해주는데, Token 이라는 데이타 타입의 리스트 형태로 다음과 같이 리턴한다.

List <Token> tokens = analyzeSyntax(text);


여기서 단어의 형(명사,형용사)는 token에서 tag 라는 필드를 통해서 리턴되는데, 우리가 필요한것은 명사와 형용사만 필요하기 때문에, tag가 NOUN (명사)와 ADJ (형용사)로 된 단어만 추출해서 NLAnalyzeVO 객체에 넣어서 리턴한다. (태그의 종류는 https://cloud.google.com/natural-language/reference/rest/v1beta1/documents/annotateText#Tag ) 를 참고하기 바란다.


중요

이 코드를 이용해서 구글 클라우드의 자연어 분석 API를 호출할때 그러면 API 인증은 어떻게 할까? 보통 구글 클라우드 콘솔에서 다운 받는 서비스 어카운트 키 (Service Account Key) JSON 파일을 사용하는데, 구글 자연어 분석 API를 호출하기 위해서도 서비스 어카운트 키가 필요하다.

이 키를 콘솔에서 다운로드 받은 후에, GOOGLE_APPLICATION_CREDENTIALS 라는 환경 변수에 서비스 어카운트 키의 경로를 지정해주면 된다.


예) export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your-project-credentials.json


자연어 분석 클래스를 다 만들었으면 테스트 코드를 만들어서 테스트를 해보자.

다음은 JUnit 4.X를 이용한 간단한 테스트 코드 이다.


package com.terry.nl.test;


import static org.junit.Assert.*;


import java.io.IOException;

import java.security.GeneralSecurityException;

import java.util.List;


import org.junit.Test;


import com.terry.nl.NLAnalyze;

import com.terry.nl.NLAnalyzeVO;


public class NLAnalyzeTest {


@Test

public void test() {

try {

NLAnalyze instance = NLAnalyze.getInstance();

String text="Larry Page, Google's co-founder, once described the 'perfect search engine' as something that 'understands exactly what you mean and gives you back exactly what you want.'";

NLAnalyzeVO vo = instance.analyze(text);

List<String> nouns = vo.getNouns();

List<String> adjs = vo.getAdjs();

System.out.println("### NOUNS");

for(String noun:nouns){

System.out.println(noun);

}

System.out.println("### ADJS");

for(String adj:adjs){

System.out.println(adj);

}

} catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

fail("API call error");

} catch (GeneralSecurityException e) {

// TODO Auto-generated catch block

e.printStackTrace();

fail("Security exception");

}

}


}


"Larry Page, Google's co-founder, once described the 'perfect search engine' as something that 'understands exactly what you mean and gives you back exactly what you want.'" 문자열을 분석하여,  명사와 형용사를 추출하여 다음과 같이 결과를 출력해준다.

### NOUNS

Larry

Page

Google

co-founder

search

engine

something

### ADJS

perfect

파이프라인 코드 작성

이제 메인 파이프라인 개발을 위한 준비가 다 되었다. 이제 TwitterPipeline 이라는 이름으로 파이프라인을 구현해보자. 전체 코드는 다음과 같다.

package com.terry.dataflow;


import java.io.IOException;

import java.io.StringReader;

import java.security.GeneralSecurityException;

import java.util.ArrayList;

import java.util.List;


import javax.json.Json;

import javax.json.JsonObject;

import javax.json.JsonReader;


import org.joda.time.DateTime;

import org.joda.time.Duration;

import org.joda.time.Instant;

import org.joda.time.format.DateTimeFormat;

import org.joda.time.format.DateTimeFormatter;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;


import com.google.api.services.bigquery.model.TableFieldSchema;

import com.google.api.services.bigquery.model.TableRow;

import com.google.api.services.bigquery.model.TableSchema;

import com.google.cloud.dataflow.sdk.Pipeline;

import com.google.cloud.dataflow.sdk.io.BigQueryIO;

import com.google.cloud.dataflow.sdk.io.PubsubIO;

import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

import com.google.cloud.dataflow.sdk.transforms.Count;

import com.google.cloud.dataflow.sdk.transforms.Create;

import com.google.cloud.dataflow.sdk.transforms.DoFn;

import com.google.cloud.dataflow.sdk.transforms.ParDo;

import com.google.cloud.dataflow.sdk.transforms.ParDo.Bound;

import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;

import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;

import com.google.cloud.dataflow.sdk.transforms.windowing.Window;

import com.google.cloud.dataflow.sdk.values.KV;

import com.terry.nl.NLAnalyze;

import com.terry.nl.NLAnalyzeVO;


import com.google.cloud.dataflow.sdk.values.PCollection;


/**

* A starter example for writing Google Cloud Dataflow programs.

*

* <p>The example takes two strings, converts them to their upper-case

* representation and logs them.

*

* <p>To run this starter example locally using DirectPipelineRunner, just

* execute it without any additional parameters from your favorite development

* environment.

*

* <p>To run this starter example using managed resource in Google Cloud

* Platform, you should specify the following command-line options:

*   --project=<YOUR_PROJECT_ID>

*   --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>

*   --runner=BlockingDataflowPipelineRunner

*/

public class TwitterPipeline {

private static final Logger LOG = LoggerFactory.getLogger(TwitterPipeline.class);

private static final String NOWN_TABLE=

"useful-hour-138023:twitter.noun";

private static final String ADJ_TABLE=

"useful-hour-138023:twitter.adj";


// Read Twitter feed as a JSON format

// extract twitt feed string and pass into next pipeline

static class ParseTwitterFeedDoFn extends DoFn<String,String>{


private static final long serialVersionUID = 3644510088969272245L;


@Override

public void processElement(ProcessContext c){

String text = null;

String lang = null;

try {

JsonReader reader = Json.createReader(new StringReader(c.element()));

JsonObject json = reader.readObject();

text = (String) json.getString("text");

lang = (String) json.getString("lang");


if(lang.equals("en")){

c.output(text.toLowerCase());

}


} catch (Exception e) {

LOG.debug("No text element");

LOG.debug("original message is :" + c.element());

}  

}

}


// Parse Twitter string into

// - list of nouns

// - list of adj

// - list of emoticon


static class NLAnalyticsDoFn extends DoFn<String,KV<String,Iterable<String>>>{ /**

*

*/

private static final long serialVersionUID = 3013780586389810713L;


// return list of NOUN,ADJ,Emoticon

@Override

public void processElement(ProcessContext c) throws IOException, GeneralSecurityException{

String text = (String)c.element();


NLAnalyze nl = NLAnalyze.getInstance();

NLAnalyzeVO vo = nl.analyze(text);


List<String> nouns = vo.getNouns();

List<String> adjs = vo.getAdjs();


KV<String,Iterable<String>> kv_noun=  KV.of("NOUN", (Iterable<String>)nouns);

KV<String,Iterable<String>> kv_adj =  KV.of("ADJ", (Iterable<String>)adjs);


c.output(kv_noun);

c.output(kv_adj);

}


}



static class NounFilter extends DoFn<KV<String,Iterable<String>>,String>{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey();

if(!key.equals("NOUN")) return;

List<String> values = (List<String>) c.element().getValue();

for(String value:values){

// Filtering #

if(value.equals("#")) continue;

else if(value.startsWith("http")) continue;

c.output(value);

}

}

}


static class AddTimeStampNoun extends DoFn<KV<String,Long>,TableRow>

implements com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess

{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey(); // get Word

Long value = c.element().getValue();// get count of the word

IntervalWindow w = (IntervalWindow) c.window();

Instant s = w.start();

DateTime sTime = s.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTimeFormatter dtf = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

String str_stime = sTime.toString(dtf);


TableRow row =  new TableRow()

.set("date", str_stime)

.set("noun", key)

.set("count", value);


c.output(row);

}


}


static class AddTimeStampAdj extends DoFn<KV<String,Long>,TableRow>

implements com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess

{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey(); // get Word

Long value = c.element().getValue();// get count of the word

IntervalWindow w = (IntervalWindow) c.window();

Instant s = w.start();

DateTime sTime = s.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTimeFormatter dtf = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

String str_stime = sTime.toString(dtf);


TableRow row =  new TableRow()

.set("date", str_stime)

.set("adj", key)

.set("count", value);


c.output(row);

}


}

static class AdjFilter extends DoFn<KV<String,Iterable<String>>,String>{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey();

if(!key.equals("ADJ")) return;

List<String> values = (List<String>) c.element().getValue();

for(String value:values){

c.output(value);

}

}

}


static class Echo extends DoFn<KV<String,Iterable<String>>,Void>{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey();

List<String> values = (List<String>) c.element().getValue();

for(String value:values){

}

}


}

public static void main(String[] args) {

Pipeline p = Pipeline.create(

PipelineOptionsFactory.fromArgs(args).withValidation().create());


@SuppressWarnings("unchecked")

PCollection <KV<String,Iterable<String>>> nlprocessed

=  (PCollection<KV<String,Iterable<String>>>) p.apply(PubsubIO.Read.named("ReadFromPubSub").topic("projects/useful-hour-138023/topics/twitter"))

.apply(ParDo.named("Parse Twitter").of(new ParseTwitterFeedDoFn()))

.apply(ParDo.named("NL Processing").of(new NLAnalyticsDoFn()));



// Noun handling sub-pipeline

List<TableFieldSchema> fields = new ArrayList<>();

fields.add(new TableFieldSchema().setName("date").setType("TIMESTAMP"));

fields.add(new TableFieldSchema().setName("noun").setType("STRING"));

fields.add(new TableFieldSchema().setName("count").setType("INTEGER"));

TableSchema schema = new TableSchema().setFields(fields);


nlprocessed.apply(ParDo.named("NounFilter").of(new NounFilter()))

.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30))))

.apply(Count.<String>perElement())

.apply(ParDo.named("Noun Formating").of(new AddTimeStampNoun()) )

.apply(BigQueryIO.Write

.named("Write Noun Count to BQ")

.to( NOWN_TABLE)

.withSchema(schema)

.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)

.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));


// Adj handling sub-pipeline

fields = new ArrayList<>();

fields.add(new TableFieldSchema().setName("date").setType("TIMESTAMP"));

fields.add(new TableFieldSchema().setName("adj").setType("STRING"));

fields.add(new TableFieldSchema().setName("count").setType("INTEGER"));

schema = new TableSchema().setFields(fields);


nlprocessed.apply(ParDo.named("AdjFilter").of(new AdjFilter()))

.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30))))

.apply(Count.<String>perElement())

.apply(ParDo.named("Adj Formating").of(new AddTimeStampAdj()) )

.apply(BigQueryIO.Write

.named("Write Adj Count to BQ")

.to( ADJ_TABLE)

.withSchema(schema)

.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)

.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));





p.run();

}


}

<TwitterPipeline.java>

코드를 하나씩 분석해보자.

먼저 main함수 부분을 보자

PCollection <KV<String,Iterable<String>>> nlprocessed

=  (PCollection<KV<String,Iterable<String>>>) p.apply(PubsubIO.Read.named("ReadFromPubSub").topic("projects/useful-hour-138023/topics/twitter"))

.apply(ParDo.named("Parse Twitter").of(new ParseTwitterFeedDoFn()))

.apply(ParDo.named("NL Processing").of(new NLAnalyticsDoFn()));

<TwitterPipeline.java에서 main() 함수 일부 >


파이프 라인이 시작되면, PubSubIO를 이용하여 “projects/useful-hour-138023/topics/twitter” 이름의 큐에서 데이타를 읽는다. 읽은 데이타는 ParseTwitterFeedDoFn() 라는 함수에서 파싱이 된다.

ParseTwitterFeedDoFn() 은 다음과 같다.


static class ParseTwitterFeedDoFn extends DoFn<String,String>{

private static final long serialVersionUID = 3644510088969272245L;


@Override

public void processElement(ProcessContext c){

String text = null;

String lang = null;

try {

JsonReader reader = Json.createReader(new StringReader(c.element()));

JsonObject json = reader.readObject();

text = (String) json.getString("text");

lang = (String) json.getString("lang");


if(lang.equals("en")){

c.output(text.toLowerCase());

}


} catch (Exception e) {

LOG.debug("No text element");

LOG.debug("original message is :" + c.element());

}  

}

}

<TwitterPipeline.java 에서 ParseTwitterFeedDoFn 클래스 구현부>


PubSub에서 읽어드린 데이타는 문자열로 안에 JSON 데이타를 가지고 있다. 이 JSON 문자열을 파싱해서 “text”와 “lang” 엘리먼트만 추출한 후에, “lang”이 “en”(영어) 인 경우에만 다음 파이프라인으로 “text”에서 추출한 문자열을 보내고, 영어가 아닌 경우에는 데이타를 무시한다.


다음은 NLAnalyticsDoFn에서 트윗 문자열을 받아서 자연어 분석을 한다.


static class NLAnalyticsDoFn extends DoFn<String,KV<String,Iterable<String>>>{

// return list of NOUN,ADJ,Emoticon

@Override

public void processElement(ProcessContext c) throws IOException, GeneralSecurityException{

String text = (String)c.element();


NLAnalyze nl = NLAnalyze.getInstance();

NLAnalyzeVO vo = nl.analyze(text);


List<String> nouns = vo.getNouns();

List<String> adjs = vo.getAdjs();


KV<String,Iterable<String>> kv_noun=  KV.of("NOUN", (Iterable<String>)nouns);

KV<String,Iterable<String>> kv_adj =  KV.of("ADJ", (Iterable<String>)adjs);


c.output(kv_noun);

c.output(kv_adj);

}


}

<TwitterPipeline.java 에서 NLAnalyticsDoFn 클래스 구현부>


앞에서 작성한 자연어 분석 클래스인 NLAnalyze 클래스를 이용하여 text를 넘기고, 리턴 값으로 NLAnalyzeVO를 리턴 값으로 받은 후, 명사는 KV<String,Iterable<String>> 타입으로 다음과 같을 저장해서 c.output을 이용해서 다음 파이프라인으로 넘기고

“NOUN”

명사1,명사2,명사3,...


마찬가지 방법으로 형용사도 같은 데이타 형인 KV<String,Iterable<String>> 타입으로 저장하여 다음 파이프라인으로 넘긴다.


이 데이타를 각각 명사와 형용사 두개의 처리 파이프라인으로 전달하는데, 두개의 파이프라인으로 단일 데이타를 보내는 방법은 다음과 같다.


nlprocessed.apply(ParDo.named("NounFilter").of(new NounFilter()))

: (중략)


nlprocessed.apply(ParDo.named("AdjFilter").of(new AdjFilter()))

: (중략)

nlprocessed는 PCollection 타입으로, NLAnalyticsDoFn에 의해서 처리된 결과이다.

이 결과 값에 두 개의 각각 다른 트랜스폼 (NounFilter와 AdjFilter)를 적용하였다.

이렇게 하나의 PCollection 값에 두 개의 트랜스폼을 각각 적용하면 적용된 각각의 파이프라인은 다른 파이프라인으로 아래 그림 처럼 분기 처리가 된다.


자아 그러면, 명사 처리 파이프라인 흐름을 따라가 보자

nlprocessed.apply(ParDo.named("NounFilter").of(new NounFilter()))

.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30))))

.apply(Count.<String>perElement())

.apply(ParDo.named("Noun Formating").of(new AddTimeStampNoun()) )

.apply(BigQueryIO.Write

.named("Write Noun Count to BQ")

.to( NOWN_TABLE)

.withSchema(schema)

.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

< TwitterPipeline.java의 main() 함수 일부>


첫번째 NounFilter에서는 앞에 파이프라인에서 들어온 명사와 형용사중에서 명사만 필터링 해서 다음 파이프라인으로 전달한다.


static class NounFilter extends DoFn<KV<String,Iterable<String>>,String>{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey();

if(!key.equals("NOUN")) return;

List<String> values = (List<String>) c.element().getValue();

for(String value:values){

// Filtering #

if(value.equals("#")) continue;

else if(value.startsWith("http")) continue;

c.output(value);

}


}

}

<TwitterPipeline.java 에서 NounFilter 클래스 구현부>

명사 인지 형용사 인지는 앞에서 넘어오는 데이타 형이 KV<String, .. > 인데, 키 부분의 값이 “NOUN” 일 경우에 명사이기 때문에, 이 값이 아니면 무시한다. 명사인경우에도 종종 쓰레기 값이 들어오는데, 예를 들어 트위터 특성상 해쉬 태그등을 위해서 “#”이 사용되고, 링크를 위해서 “http…” 링크가 들어가기도 하는데 이는 명사가 아니기 때문에 이 내용은 모두 필터링해서 무시한다.


이렇게 정재된 데이타는 파이프라인의 다음 단계인 .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30)))) 를 통해서 30초 단위의 고정 윈도우가 적용되고, 다음  .apply(Count.<String>perElement()) 을 통해서 단어별로 그룹핑되서 카운트 되고 그 결과는 앞서 적용한 30초 윈도우 시간 단위로 다음 파이프 라인으로 전달된다.  전달되는 데이타의 모양은 대략 다음과 같다.

Key (String)

Value (Long)

airplane

100

boy

29

india

92


이렇게 전달된 데이타는 빅쿼리에 저장하기 위해서 빅쿼리의 ROW 데이타 타입은 TableRow로 변환한다.

.apply(ParDo.named("Noun Formating").of(new AddTimeStampNoun()) )

.apply(BigQueryIO.Write

.named("Write Noun Count to BQ")

.to( NOWN_TABLE)

.withSchema(schema)

.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)

.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));


<TwitterPipeline.java 에서 main() 함수중 >

AddTimeStampNoun()에서 이 작업을 수행하는데, 이 함수는 윈도우의 시간을 추출하여 data라는 필드에 추가해준다.  아래는 AddTimeStampNoun()  함수의 코드이다.


static class AddTimeStampNoun extends DoFn<KV<String,Long>,TableRow>

implements com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess

{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey(); // get Word

Long value = c.element().getValue();// get count of the word

IntervalWindow w = (IntervalWindow) c.window();

Instant s = w.start();

DateTime sTime = s.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTimeFormatter dtf = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

String str_stime = sTime.toString(dtf);


TableRow row =  new TableRow()

.set("date", str_stime)

.set("noun", key)

.set("count", value);


c.output(row);

}

}

<TwitterPipeline.java 에서 AddTimeStampNoun 클래스 구현부>


여기서 주의할점은 윈도우에 대한 데이타를 접근하기 위해서는 com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess 인터페이스를 implementation 해야 한다.  그후에, 현재 윈도우에 대한 정보는 ProcessContext c 변수에서 c.window() 함수를 이용하면 윈도우의 정보를 읽어올 수 있다. 이 코드에서는 윈도우 시작 시간을 IntervalWindow w의 w.start() 를 통해서 읽어왔고, 이를 빅쿼리의 TIMESTAMP  데이타 타입으로 넣기 위해서 “yyyy-MM-dd HH:mm:ss” 형태로 포매팅을 한후 TableRow라는 빅쿼리의 row형 데이타 타입으로 생성한 후, 다음 파이프라인으로 넘겼다.


다음 파이프라인은 BigQueryIO로 Write 명령을 이용해서 NOWN_TABLE 에 (String 값은 “noun”) 데이타를 쓰도록 하였고, 쓰기 모드는 붙여쓰기 WRITE_APPEND로 하고, 테이블은 없으면 생성하도록 CREATE_IF_NEEDED로 지정하였다. 이때 테이블의 스키마를 정의해줘야 하는데, 테이블 스키마는 withSchema(schema) 함수로 지정을 했는데, 스키마를 정의한  schema 변수는 다음과 같이 정의 되어 있다.


List<TableFieldSchema> fields = new ArrayList<>();

fields.add(new TableFieldSchema().setName("date").setType("TIMESTAMP"));

fields.add(new TableFieldSchema().setName("noun").setType("STRING"));

fields.add(new TableFieldSchema().setName("count").setType("INTEGER"));

TableSchema schema = new TableSchema().setFields(fields);

<TwitterPipeline.java 에서 main() 중의 “noun” 테이블 스키마 정의 부분>


같은 방식으로 형용사를 처리하는 파이프라인도 정의를 한다음 정의가 끝났으면

p.run();

을 이용하여 파이프라인이 실행되도록 한다.

실행하기

모든 코드 구현이 끝났다. 이제, 파이프라인을 기동해보자

이클립스에서 파이프라인을 구동하는데, Run Configuration 부분을 아래와 같이 설정한다.


Runner를  DataflowPipelineRunner를 선택한다. BlockingPipeRunner의 경우에는 파이프라인이 기동되는 동안 이클립스에 프로그램이 실행중인것으로 되서, 이클립스에서 파이프라인을 멈춰버리면 전체 파이프라인이 멈추기 때문에 적절하지 않다.

다음 Argument 탭에서 아래와 같이 Program Argument에  --streaming 옵션을 추가한다.


데이타 플로우는 배치 및 스트리밍 모드 두개가 있는데, 이 예제는 스트리밍 예제이기 때문에, --streaming을 명시적으로 지정한다.

구글 자연어 분석 API에 대한 인증을 위해서 서비스 어카운트 키 (JSON 파일의 경로)를 GOOGLE_APPLICATION_CREDENTIALS 환경 변수에 설정해야 하는데, Environment 탭에서 New를 누른 후, GOOGLE_APPLICATION_CREDENTIALS를 Name으로 하고 Value 부분에 서비스 어카운트 키 파일의 경로를 적어준다.



환경 설정이 끝났으면 아래 Run 버튼을 눌러서 파이프라인을 기동시킨다.

파이프라인을 기동 시키면 구글 클라우드로 소스를 배포하고 인스턴스를 구동하는데 까지 수분이 걸리기 때문에 잠시 기다린다.

기다리는 동안 배포 상태를 보기 위해서, 구글 클라우드 콘솔로 들어가면 아래와 같이 Status가 Running으로 바뀔때 까지 기다린다.



Running으로 바뀌고 나서도 1~2분 정도 준비가 필요하기 때문에 기다렸다가 해당 JOB을 확인해보면 다음과 같이 잡이 정상적으로 기동 되고 있음을 확인할 수 있다.



작업이 실행되었으면 이 파이프라인에 데이타를 넣어주기 위한 Fluentd 에이전트를 실행해보자. Fluentd를 설치한 VM에 들어가서,

%sudo /etc/init.d/td-agent restart

명령을 이용해서 fluentd 에이전트를 가동한다.


데이타가 들어오기 시작하면 다시 구글 클라우드 콘솔의 데이타 플로우 화면을 보면 (위의 그림)

상단에 LOGS 라는 버튼을 볼 수 있는데



이 버튼을 누르면 죄측 하단에 다음과 같이 Job Logs라는 윈도우가 나타난다.



여기서 오른쪽의 “WORKER LOGS” 라는 버튼을 누르면 이 파이프라인의 전체 로그를 볼 수 있는데, 에러가 없는지를 잘 확인 한다.



별도의 에러가 없다면 정상적으로 데이타가 수집된다고 할 수 있다.

그러면 데이타가 제대로 수집되는지를 확인해보자

빅쿼리 콘솔로 들어가서 select count(*) from [noun 테이블명] LIMIT 1000

을 수행해서 데이타가 제대로 들어오는지 확인해보자


위의 그림과 같이 f0_가 0 이상이면 데이타가 쌓이고 있다고 생각해도 된다.

데이타 시각화와 분석

데이타 스튜디오(Google datastudio) 를 이용한 데이타 분석

쌓여 있는 데이타를 실제로 분석해보자. 리포트를 이용해서 시각화를 할 예정인데, 여기서 사용한 리포팅 도구는 구글 데이타스튜디오 라는 리포트 도구이다. (http://datastudio.google.com) 으로 9월 현재는 미국 지역만을 대상으로 서비스가 되고 있고, 곧 한국에 서비스가 오픈될 예정이다.


우리가 만들려고 하는 리포트는 다음과 같은 모양을 갖는다

전체 기간동안 가장 많이 발생한 명사 10개와 그 발생 회수를 표로 출력해주고, 우측에는 전체기간이 아닌 일자별로 많이 발생한 명사 10개에 대한 발생 회수 및 그 변화 추이를 출력해준다.

다음 행에는 형용사에 대한 분석 결과를 출력해준다.



새로운 리포트 생성

데이타 스튜디오 메인 화면에 들어오면 작성한 리포트 목록들이 아래와 같이 출력된다.


여기서 + 버튼을 누르면 아래와 같이 새로운 리포트를 생성할 수 있다.


새로운 리포트 화면에 들어오면 우측 하단에 “CREATE NEW DATA SOURCE”라는 버튼이 나타나는데, 이를 통해서 빅쿼리 테이블을 불러올 수 있다. “CREATE NEW DATA SOURCE” 버튼을 눌러보자


데이타 소스 생성해서 빅쿼리 테이블을 불러와야하는데, 데이타 스튜디오는 아래 그림에서와 같이 빅쿼리 뿐만 아니라 구글의 MySQL 서비스인 CloudSQL에서 부터 일반 MySQL 까지 연결이 가능하기 때문에, 빅쿼리 뿐 아니라 일반 데이타 분석에서 분석된 데이타르르 MySQL을 통해서 리포트로 시각화할 수 있고,

Google Sheet에 있는 데이타를 불러와서 같이 표현할 수 있는 기능을 제공하는데, 이는 특히 비지니스나 영업쪽에서 작성한 Sheet의 데이타를 실시간으로 읽어다가 하나의 리포트에 표현할 수 있기 때문에 매우 유용하게 사용할 수 있다.

아울러 YouTube나 Google Analytics 그리고, Adwords 광고 플랫폼등 다양한 구글 플랫폼의 데이타를 읽어서 시각화할 수 있다.


연동 소스 중에서 빅쿼리를 선택한 다음 프로젝트와 데이타셋 그리고 연동하고자 하는 테이블을 선택한다. 여기서는 noun 테이블을 선택하였다. 그러면 다음과 같이 테이블 스키마가 나오고 ADD TO REPORT 버튼이 나온다.


ADD TO REPORT를 눌러서 리포트에 추가하자

다음 리포트 화면에서 다음과 같이 Table 버튼을 눌러서 테이블을 추가하자


테이블을 추가하면 우측에 테이블에 출력하고자 하는 데이타를 선택할 수 있다.


우측에 Data source는 아까 불러들인 “noun”테이블을 선택하고, Dimension은 noun을 선택하고, Metric은 count를 추가하면 명사(noun)별, 발생횟수 (count)를 출력해준다.

위의 표를 보면 note7 등의 단어가 나오는데, 당연히 note7에 대해 검색했기 때문에  note7 단어가 많이 나오겠지만 이는 분석에서 얻고자 하는 데이타가 아니기 때문에 이 note7 등의 불필요한 문자열은 필터링해서 없애버리도록 한다.

필터는 우측 하단에 “Fiter”라는 메뉴에서 추가가 가능한데


메뉴에서 “+Add a filter”를 선택한후


Exclude (제외한다) 라는 버튼을 선택한 후에, Dimension을 noun으로 Match type을 Equal to 로 Expression을 note7 으로 선택하면 noun 필드에서 값이 note7인 내용은 제외 하도록 하는 필터이다. 필터가 적용되면 note7 단어는 필터링되서 출력되지 않는다.


다음으로 꺽은선 그래프를 추가하기 위해서 화면에서 Time series 버튼으로 꺽은선 그래프를 추가한다.



꺽은선 그래프가 추가되면 그래프에 출력될 데이타를 Time series Properties에서 다음과 같이 설정한다.


Data source는 noun 테이블로 하고, Dimention을 Time Dimention은 date로 하고, 여러 필드를 같이 분석하기 위해서 Breakdown Dimension에 noun을 추가하면 하나의 명사가 아니라 주요 명사들을 출력해준다.

그리고 Metric은 count로 선택하면 각 명사별 카운트수를 일자별로 볼 수 있다.


같은 방법으로 형용사에 대한 그래프도 추가한다.


그래프가 완성된 후에, 데이타를 수집 및 분석해보니 꽤나 의미가 있는 분석 결과를 얻을 수 있었다.


다음글에서는 오픈소스 데이타 분석 도구인 제플린을 이용하여 상세 데이타 분석을 하는 방법에 대해서 알아보기로 한다.

이글에서 소개한 데이타 스튜디오는 아직 한국에서는 서비스가 제공되지 않기 때문에, 한국에서 사용하고자 하는 사람들에게는 다음글의 제플린 기반의 데이타 분석이 훨씬 더 유용하리라 생각된다.





저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

구글 빅쿼리와 데이타 플로우를 이용한 노트7 소셜 반응 분석


조대협 (http://bcho.tistory.com)




이 글은 개인이 개인적인 취미와 빅데이타 분석 플랫폼 기술 공유를 위해서 데이타 분석용 시나리오로 소셜 트랜드 분석을 선택하였고, 그중 노트7을 하나의 예로 선택한 내용이기 때문에, 이러한 분석 내용이 악의적으로 활용되거나 해석되기를 바라지 않으며 이 글의 라이센스는 본인이 저작권을 소유하고 있으며 출처를 밝히는 인용을 포함하여 모든 인용 및 내용에 대한 활용을 금합니다.


구글의 빅데이타 플랫폼인 빅쿼리와(https://cloud.google.com/bigquery), 데이타플로우((https://cloud.google.com/dataflow) 에 대해서 테스트를 하던중, 아무래도 데이타 분석 애플리케이션을 만들어보려면 실제 시나리오를 가지고 분석을 해보는게 가장 적절할것 같아서, 트위터에서 노트7에 대한 데이타를 수집해서 분석해봤다.


전체적인 시나리오는 note7 으로 태깅된 트위터 피드를 읽어서 30초 단위로 실시간 분석을 하는데, 수집된 트윗에서 구글의 자연어 분석 API를 이용하여(https://cloud.google.com/natural-language/) 명사와 형용사만 필터링을 해서 수집하는 방식으로 진행하였다.


다음은 9/12~9/19일까지 수집한 데이타에 대한 통계이다.



가장 위의 파란선이 recall이라는 단어로 12일 부터 꾸준히 등장해서 16일에 피크를 치고 계속해서 내용이 나타나고 있다. 17일에 피크를 친 빨간선은 S7이라는 단어인데, 왜 노트7 트윗에 S7이 등장했는지는 약간 미지수이다. 일단위 보다는 시간단위 분석이 필요하고 각 일자별로 주요 지표를 상세하게 볼필요가 있다고 생각해서 제플린 노트북을 이용하여 빅쿼리에 수집된 데이타를 분석을 해봤다.


이 그래프는 시간대별 명사들의 카운트인데, 시간당 1500을 넘는 시점이 9/12 16:00, 9/12 23:00, 9/14 01:00 등으로 보인다. 일자별이나 모든 시간대별로 트윗을 분석하는 것보다, 몬가 이슈가 있으면 시간당 트윗이 급격하게 올라가기 때문에, 트윗에서 명사 카운트가 1500을 넘는 시간만 분석해봤다.




이것이 분석 결과인데, 그래프로 보기가 어려우니 표로 표현해서 보자. 주요 날짜별로 주요 키워드를 3개씩만 검출해보면 다음과 같은 결과가 나온다.


먼저 9월12일16시에는  flight와 india라는 단어가 많이 잡혔는데, 이날은 인도에서도 항공기에 노트7을 가지고 탑승을 못하도록 공지한 시간때로 보인다.


http://mashable.com/2016/09/12/samsung-galaxy-note7-flight-ban-india/#nTLbsAiVWqqr


다음 23시에 boy라는 단어가 주로 잡혔는데, 이날은 노트7으로 인하여 6살 어린이 어린이(boy)가 상처를 입은 사건이 발생한 때이다.


https://www.cnet.com/news/exploding-samsung-galaxy-phone-burns-6-year-old/


다음으로 14일과 16일 로그를 분석해보자



14일 1시에는 software, update, explosions라는 단어가 많이 검출되었는데,

http://money.cnn.com/2016/09/14/technology/samsung-galaxy-note-7-software-update-battery-fires/

소프트웨어 업데이트를 통해서 폭발을 막겠다고 발표한 시점이다.


16일은 미국 정부가 노트7의 리콜을 명령한 날로 5시와6시에 goverment와 recall 등의 단어가 집중적으로 올라왔다.


http://www.forbes.com/sites/shelbycarpenter/2016/09/16/government-official-recall-samsung-galaxy-note-7/#351a35c46e53



18일에는 report와 lawsuit (소송) 이라는 단어가 많이 검출되었는데, report는 찾아보니


http://bgr.com/2016/09/18/galaxy-note-7-explosion-burns-samsung-lawsuit/

로이터 통신에서 16일날 언급한 플로리다 남성이 노트7으로 화상을 입고 소송을 한 내용이 2일의 시차를 두고18일에 급격하게 퍼졌다.

그러다가 아래 표와 같이 19일에는 florida, man, pocket,lawsuit 와 같이 플로리다, 남성,주머니,소송 등의 단어가 검출되면서 18일의 내용이 점점 더 구체화 되어가는 과정을 보여주었다.

사실 노트7을 분석 아이템으로 삼은것은 노트7이 출시되었을때, 꽤나 완성도가 높았고 그래서 재미있는 반응도 꽤나 많을것으로 기대했다. 그래서 굳이 다른 키워드 보다 노트7을 고른거였고, 지금은 떠났지만 한때 몸 담았던 회사였기도 했기 때문에 잘 되기를 바라는 마음이었는데, 분석 결과를 지켜보면서 씁쓸한 마음이 이내 떠나지를 않았다.


이 분석을 통해서 얻고자 한것은, 이 시스템을 구축하는데 혼자서 5~6시간 정도의 시간밖에 걸리지 않았다. 예전이라면 이런 분석 시스템을 구축하려면 몇명은 몇달은 투자해야 할텐데, 이제는 혼자서도 이러한 빅데이타 분석이 가능하다는 메세지와 함께, 실시간 분석 시스템 구현 기술을 습득하기 위한 개인 작업이다.

의외로 데이타에서 많은 인사이트를 얻어낼 수 있었고 추후에 이 분석에 대한 모든 코드를 공개할 예정인데, 다른 사람들도 유용하게 사용할 수 있는 정보 공유의 목적이다.

.

저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License


스트리밍 분석 플랫폼인 Apache Beam (Dataflow)를 공부하다 보니, 예제가 필요해서 지난번에는 힐러리와 트럼프 후보가 언급된 피드를 읽어서, 구글의 자연어 분석 API를 통해서 긍정/부정 여부를 분석한 후, 빅쿼리에 넣어서, 파이썬 노트로 그래프로 표현해봤는데, 아무래도 자연어 분석 API의 정확도가 아직 떨어지는 건지, 대부분 부정으로 나오고, 분석 결과도 재미가 없다.


그래서 새로운 분석 예제를 고민 하다가, 다음 방향으로 정했다.



  1. 지난번과 마찬가지로 데이타 수집은 트위터에서 특정 키워드를 fluentd로 수집한다.
  2. 수집한 데이타는 Pub/sub에 저장한다.
  3. Pub/sub에 데이타 플로우 파이프라인을 연결한다.
    1. 데이타 플로우 파이프라인에서 데이타를 읽는다.
    2. 읽어온 데이타중 10%만 샘플링 한다. (아무래도 예제이다 보니)
    3. 트윗 데이타중, 트윗 문자열만 발라낸다. 
    4. 트윗 문자열중 RT가 아닌 문자열만 추출한다.
    5. 구글 자연어 처리 API를 호출한다.
    6. FIXED 윈도우 처리를 한다. (5초 주기)
    7. 자연어 처리 API에서 리턴된 결과로 검출된 명사와,그 명사의 발생횟수, 형용사와 각 형용사의 발생횟수, 이모티콘과 이모티콘의 발생횟수, 그리고 타임 윈도우등 트윗의 감정분석 값의 평균을 계산한다.
  4. 계산 결과를 빅쿼리에 저장한다.
  5. 빅쿼리에 저장된 결과를 제플린에서 읽어서 일별 발생한 명사와 명사의 횟수, 형용사와 형용사의 횟수, 이모티콘과 이모티코의 횟수를 탑 랭킹 8개씩 출력한다.
지난번 감성 분석보다, 같이 언급되는 형용사나 명사의 수를 카운트하면 재미있는 결과를 얻을 수 있지 않을까? 구현 목표는 6시간이내. (월요일??) 


저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License


데이타 스트리밍 처리에 대한 이해


조대협 (http://bcho.tistory.com)


근래에 Apache Beam 프로젝트를 공부하게 되서, 그간 묵혀놨던 데이타 스트리밍 처리에 대해서 다시 정리중인데, 예전에 Apache Storm을 봤을때 보다 트리거나, 윈도우등 많은 개념들이 들어가 있어서 데이타 스트리밍에 대한 개념 부터 다시 정리를 시작을 하고자한다.


Apache Storm에서 부터, Apache Spark 기반의 데이타 스트림 처리뿐 아니라 근래에는 Apache Flink와 같은 새로운 스트리밍 프레임웍크과 구글이 이미 클라우드를 통해서 서비스 하고 있는  google cloud dataflow (Apache Beam이라는 프로젝트로 오픈소스화 되었고, 현재 인큐베이션 단계에 있다.) 까지 빅데이타에 대한 실시간 처리성이 강조되면서 근래에 데이타 스트리밍 처리가 다시 주목 받는 것 같다. 이 문서는 구글이 개발한 dataflow에 대한 개념을 이해하기 위함이다.


본 문서의 내용과 그림은 https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101 를 참고하였다.


사전 개념 이해

스트리밍 데이타 처리를 이해하기 위해서는 몇몇 용어와 개념을 사전에 이해해야 하는 부분이 있다.

Bounded data 와 Unbounded data

먼저 스트리밍 데이타 처리를 이해하려면 데이타의 종류에 대해서 먼저 이해해야 한다.

  • Unbounded data 는 데이타의 수가 정해져있지 않고 계속해서 추가되는, 즉 끊임 없이 흘러 들어오는 데이타라고 볼 수 있다. 예를 들어서 모바일 디바이스에서 계속 올라오는 로그, 페이스북이나 트위터의 타임 피드, 증권 거래 주문 같이 계속 해서 들어와서 쌓이는 데이타를 Unbounded data 라고 한다.

  • Bounded data는 데이타가 딱 저장되고 더 이상 증거나 변경이 없는 형태로 계속 유지되는 데이타를 뜻한다. 1월의 정산 데이타.

Event time과 Processing time

데이타의 발생 시간과 시스템에서 처리되는 시간이 차이가 있는데, 이를 각각 Event time과 Processing time 이라고 정의한다.

예를 들어, 게임에서 사용자가 공격을 한 이벤트를 서버에 전달해서 처리하여 저장하는 시나리오가 있다고 가정할때, 공격 이벤트가 1:00:00에 발생했으면, 이 데이타가 네트워크를 타고 서버로 도달하여 프로그램 로직을 수행하고 저장하는데 소요된 시간을 2초라고 가정하면, Event time 은 1:00가 되고, Processing time은 1:00:02가 된다.

이상적으로는 Event time과 Processing time이 동일하면 좋겠지만, 네트워크 시간이나 처리 시간에 따라 Processing time이 Event time 보다 늦고, 또한 Processing time에서 소요되는 실제 처리 시간은 일정하지 않고 아래 그림의 파란색 그래프(실제 처리 그래프) 처럼 들쭉 날쭉하다. 네트워크 상황이나, 서버의 CPU, IO 상황이 그때마다 차이가 나기 때문이다.


아래 그림을 통해서 개념을 다시 정리해보면,

X축은 Event time, Y축은 Processing Time이다. 0초에 발생한 데이타가 서버에 도착해서 처리하는 시간이 소요 되기 때문에, 아래 그림과 같이 Processing Time은 2초 부터 시작한다. Skew는 Event time과 Processing time간의 간격이다. 아래 그림에서 보면, Processing time에서 3초때에는 Event time 1초에서 발생한 데이타를 처리하고 있는데, 실제 Event time에서는 3초 시간의 데이타가 발생하고 있기 때문에, Processing time과 Event time은 약 2초의 지연이 발생하고 있고, 이를 Skew 라고 한다.



Bounded data의 처리

Bounded data는 이미 저장되어 있는 데이타를 처리하는 것이기 때문에 별다른 처리 패턴이 필요없다



데이타를 읽어서 한번에 처리해서 저장 하면 된다.

UnBounded data 처리

복잡한 것은 스트리밍 데이타 즉, Unbounded data 를 처리하는 방법인데, Unbounded data 는 크게 Batch와 Streaming 두 가지 방식으로 처리할 수 있다.

Batch로 처리

배치로 Unbounded data를 처리 하는 방식은 아래와 같이 두 가지 방식이 있다.

Fixed Windows

Fixed Windows 방식은 스트리밍으로 들어오는 데이타를 일정 시간 단위로 모은 후, 배치로 처리 하는 방식이다. 예를 들어서 아래 그림과 같이 10~11시 까지 데이타를 수집한후, 11시 이후에, 10~11시까지 들어온 데이타를 처리해서 정리 하는 방식이다.



이 방식은 구현이 간단하다는 장점이 있지만, 데이타가 수집 된 후 처리를 시작하기 때문에, 실시간성이 떨어진다. (근 실시간)

Streaming 처리

Unbounded 데이타를 제대로 처리하려면 스트리밍 처리를 하는 것이 좋은데, 스트리밍 처리 방법에는 아래와 같이 크게 Time agnostic, Filtering, Inner Join, Windowing 방식등이 있다.


스트리밍 처리는 배치 처리에 비해서 복잡한 것이, Unbounded 데이타는 기본적으로 특성이 Skew가 환경에 따라 변화가 심하고 그래서 데이타가 시스템에 도착하는 순서 역시 순차적으로 도착하지 않고 들쭉 날쭉 하다.

Time agnostic

Time agnostic 이란, 데이타가 시간 속성을 가지고 있지 않는 데이타 이다. 들어오는 데로 처리를 하면 되기 때문에, 별다른 노하우가 필요 없지만, 하나의 데이타 형이기 때문에 간단하게 언급만 한다.

Filtering

다음으로 많이 사용 되는 것이 필터링인데, 들어오는 데이타 중 특정 데이타만 필터링 해서 저장 하는 구조이다.


예를 들면, 웹 로깅 데이타를 수집해서, 특정 IP나 국가 대역에서 들어오는 데이타만 필터링해서 저장하는 시나리오등이 될 수 있다.

Inner joins (교집합)

Inner join은 두개의 Unbounded 데이타에서 들어오는 값을 서로 비교하여 매칭 시켜서 값을 구하는 방식이다.



모바일 뉴스 앱이 있다고 가정할때, 뉴스 앱에서는 사용자가 어떤 컨텐츠를 보는지에 대한 데이타를 수집 전송하고, 지도 앱에서는 현재 사용자의 위치를 수집해서 전송한다고 하자.

이 경우 사용자별 뉴스 뷰에 대한 Unbounded data 와, 사용자별 위치에 대한 Unbounded data 가 있게 되는데, 이 두개의 데이타 스트림을 사용자로 Inner Join을 하면 사용자가 어떤 위치에서 어떤 뉴스를 보는지에 대해서 분석을 할 수 있다.

Inner join을 구현하기 위해서는 양쪽 스트림에서 데이타가 항상 같은 시간에 도착하는 것이 아니기 때문에, 반대쪽 데이타가 도착할때 까지 먼저 도착한 데이타를 임시로 저장할 버퍼 영역이 필요하고, 이 영역에 임시로 일정 기간 데이타를 저장하고 있다가 반대쪽 스트림에서 데이타가 도착 하면 이를 조인해서 결과를 저장하고, 버퍼 영역에서 두개의 데이타를 삭제한다.

만약에 반대쪽의 데이타가 도착하지 않으면, 이 버퍼 영역에 데이타가 계속 쌓이기 때문에, 일정 기간이 지나면 반대쪽 스트림에서 데이타가 도착하지 않은 데이타를 주기적으로 삭제 해주는 (garbage collection) 정책이 필요하다.


cf. Inner join (교집합), Outer join (합집합)

Approximation algorithms (근사치 추정)

근사치 추정 방식은 실시간 데이타 분석에서 많이 사용되는데, 실시간 분석에서는 전체 데이타를 모두 분석할 수 있는 시간이 없는 경우가 많고, 시급한 분석이 필요한 경우가 있기 때문에, 전체 데이타를 분석하지 않고 일부만 분석하거나 또는 대략적인 데이타의 근사값만을 구하는 방법으로 해서, 빠르게 데이타를 분석하는 경우가 있다. 이를 근사치 추정 방식이라고 하는데, 예를 들어 VOD 서비스에서 지금 10분간 인기있는 비디오 목록, 12시간 동안 가장 인기 있는 판매 제품등 과 같은 시나리오인데, 이런 시나리오에서 데이타는 아주 정확하지 않아도 근사 값만 있으면 되고, 데이타를 그 시간에 보는 시급성이 중요하다.  이러한 시나리오에서는 전체 데이타를 다 보고 분석이 어렵기 때문에, 샘플링을 하거나 대략적인 근사 값만을 구해서 결과를 낸다.


이런 근사치를 추정하는 알고르즘은 K-means나 Approximate Top-N등이 이미 정의되어 있는 알고리즘이 많다.


참고 자료 :

Storm을 이용한 근사치 구하기 : https://pkghosh.wordpress.com/2014/09/10/realtime-trending-analysis-with-approximate-algorithms/

Apache Spark에서 K means로 근사치 구하기 :

https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html


Windowing

실시간 스트리밍 데이타 처리에서 중요한 개념중의 하나는 Windowing 인데, Windowing 이란 스트리밍 데이타를 처리할때 일정 시간 간격으로 처리하는 것을 정의한다.

예를 들어, 10분 단위의 Windowing의 경우 1시~2시까지 들어온 데이타를 1:10, 1:20,1:30, …  단위로 모아서 처리한다.

윈도우에는 자르는 방법에 따라서 다음과 같이 몇가지 방법이 있다.

Fixed Windows

정확하게 일정 시간 단위로 시간 윈도우를 쪼게는 개념이다. 앞에서 언급한 예와 같이 윈도우 사이즈가 10분 일때, 1시 10분은 1시00분~1시10분까지의 데이타를, 1시 20분은 1시10분~1시20분까지의 데이타를 처리한다.

Sliding Windows

Sliding Window 방식은 윈도우가 움직이는 개념이다.

슬라이딩 윈도우의 개념은 현재 시간으로 부터 +-N 시간 전후의 데이타를 매 M 시간 마다 추출 하는 것을 슬라이딩 윈도우라고 하고, 이 윈도우들은 서로 겹치게 된다.

예를 들면 현재시간으로부터 10분 전에서 부터  측정시간까지의 접속자를 1분 단위로 측정하는 시나리오가 될 수 있다. 매 1분 간격으로, 데이타를 추출하고, 매번 그 시간으로부터 10분전의 데이타를 추출하기 때문에 데이타가 중첩이 된다.  

이렇게 추출하는 간격을 Period (앞에서 1분), 그리고 추출하는 기간을 Length 또는 Size (앞에서 10분)라고 한다.



출처 : https://cloud.google.com/dataflow/model/windowing#sliding-time-windows

Session

다음으로는 Session Window의 개념이다.

Session Window에는 사용자가 일정 기간동안 반응이 없는 경우(데이타가 올라오지 않는 경우)에 세션 시작에서 부터, 반응이 없어지는 시간 까지를 한 세션으로 묶어서 처리한다

예를 들어서 세션 타임 아웃이 20분이라고 하고 데이타가 1:00 부터 올라오고 있는데,  1:01, 1:15에 데이타가 올라오고, 1:40분에 데이타가 올라오면 1:15 이후에 20분동안 (1:35까지) 데이타가 올라오지 않았기 때문에, 1:00,1:01,1:15은 하나의 세션으로 되고, 1:40은 새로운 세션 시작이 된다.



출처 : https://cloud.google.com/dataflow/model/windowing#session-windows


시간대별 Window 처리 방식

스트리밍 데이타에서 윈도우를 사용할때, 어느 시간을 기준 시간으로 할것인가를 정해야 하는데, 데이타가 시스템에 도착하는 Processing time을 기준으로 할 수 있고 또는  데이타가 실제 발생한 시간인 Event time을 기준으로도 할 수 있다.

Processing time based windowing

Processing time을 기준으로 데이타를 처리하는 것은 크게 어렵지 않다. 데이타가 도착한 순서대로 처리해서 저장하면 된다.


Event time based windowing

문제는 Event time을 기준으로 데이타를 처리 하는 경우인데, 데이타가 들어오는 것이 순서대로 들어오지 않는 경우가 많고, 또한 데이타의 도착 시간또한 일정하지 않다.




이 그림은 Event time을 기준으로 데이타를 처리하는 개념인데, 좌측 하얀색 화살표 처럼 12:00~13:00에 도착한 데이타가 11:00~12:00에 발생한 데이타 일 경우, 11:00~12:00 윈도우에 데이타를 반영해줘야 한다.

이러한 Event time 기반의 스트리밍 처리는 아래와 같이 기술적으로 두가지 주요 고려 사항이 필요하다.

  • Buffering
    늦게 도착한 데이타를 처리해야 하기 때문에. 윈도우를 일정시간동안 유지해야 한다. 이를 위해서 메모리나 별도의 디스크 공간을 사용한다.

  • Completeness
    Buffering을 적용했으면 다른 문제가 얼마 동안 버퍼를 유지해야 하는가?
    즉 해당 시간에 발생한 모든 데이타는 언제 모두 도착이 완료(Completeness) 되는가? 를 결정하는 것이다. 정확한 완료 시점을 갖는 것은 사실 현실적으로 힘들다. 버퍼를 아주 크게 잡으면 거의 모든 데이타를 잡아낼 수 있겠지만, 버퍼를 아주 크게 잡는 것이 어렵기 때문에, 데이타가 언제 도착할 것이라는 것을 어림 잡아 짐작할 수 있는 방법들이 많다. (예를 들어 워터마크 기법 같은 것이 있는데, 이는 다음글에서 설명하도록 한다.)


지금까지 실시간 데이타 분석에 사용되는 대략적인 개념을 알아보았다. 다음 글에서는 Apache Beam을 이용하여 이러한 실시간 데이타 분석을 어떻게 구현하는지 알아보도록 하겠다.



참고 자료

http://data-artisans.com/how-apache-flink-enables-new-streaming-applications-part-1/

https://cloud.google.com/dataflow/blog/dataflow-beam-and-spark-comparison#game-stats-advanced-stream-processing


저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

스파크 성능이 안나오면, 우리 회사 데이타팀 팀장왈. 먼저 파이썬으로 짰는지 확인 부터 해보라길래, 파이썬과 스칼라로 만들어진 스파크 성능 차이가 얼마나 나는지 찾아봤더니 다음과 같은 수치가 나왔다.


http://emptypipes.org/2015/01/17/python-vs-scala-vs-spark/ (원본 출처)


일단 스파크를 할려면 스칼라는 필수인듯 하다. 

간단한 프로토타입핑등에는 파이썬을 사용할 수 있겠지만 결국 프로적션은 스칼라로 최적화해야 할듯.

근데. 자바대 스칼라 성능 비교는 없네

저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

Spark Key/Value Pairs

조대협 

http://bcho.tistory.com


RDD에는 어떤 데이타 형식이라던지 저장이 가능한데, 그중에서 Pair RDD라는 RDD가 있다. 이  RDD는 Key-Value  형태로 데이타를 저장하기 때문에, 병렬 데이타 처리 부분에서 그룹핑과 같은 추가적인 기능을 사용할 수 있다.

예를 들어 reduceByKey 와 같이 특정 키를 중심으로 데이타 연산 (각 키 값 기반으로 합이나 평균을 구한다던가) key 기반으로 join 을 한다던가와 같은 그룹핑 연산에 유용하게 사용할 수 있다.

Pair RDD를 생성하는 방법은 다음과 같다.

Java
mapToPair나 flatMapToPair 라는 메서드를 사용하면 된다.

mapToPair등의 함수를 이용할때, 아래와 같이 람다 표현식을 사용하는 방식이 있고

RDD.mapToParis( d-> new Tuple2(key,value))
(d는 RDD에서 읽어오는 값. Key는 새로운  RDD를 생성할때, 해당 Tuple의 키값,  value는 해당 Tuple의 Value값)

JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(-> new Tuple2(s, 1));

또는 아래와 같이, Pair함수를 정의한 후에, Pair 함수를 mapToPair함수등에 넘기는 방법이 있다.

아래는 텍스트 라인에서 첫번째 단어를 Key로하고, 해당 라인을 Value로 하는 코드 예제이다.

PairFunction<String, String, String> keyData =
  new PairFunction<String, String, String>() {
  public Tuple2<String, String> call(String x) {
    return new Tuple2(x.split(" ")[0], x);
  }
};
JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);
* Learning spark 코드 참조

Python

lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)


저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License


Apache Spark(스파크) - RDD Persistence (스토리지 옵션에 대해서)


조대협 (http://bcho.tistory.com)


Spark Persistence에 대해서


앞에 글에서 Spark RDD가 메모리에 상주 되는 방법에 대해서 간략하게 언급했는데, 다시 되 짚어 보면 Spark의 RDD는 filter() 등. 여러  Transformation Operation을 실행하더라도  Transformation  단계가 아니라 Action이 수행되는 단계에 로드된다고 설명하였다.


그리고, 매번 해당 RDD가 Action으로 수행될 때마다 다시금 소스에서 부터 다시 로드되서 수행된다고 했는데, 그렇다면 매번 로드 해서 계산하여 사용하는 것이 아니라, 저장해놓고 사용 하는 방법이 무엇이 있을까?


스파크에서는 RDD 를 저장해놓고 사용하는 기능으로 persist()와 cache() 라는 두 가지 오퍼레이션을 지원한다.

스파크는 RDD를 저장함에 있어서,  메모리와 디스크 두 가지 영역을 사용하며, 옵션에 따라서 RDD의 저장 영역을 지정할 수 있다.


기본 디폴트는 메모리에 저장하고, 옵션으로 디스크를 지정할 수 있다. 디스크를 지정하면, 메모리에서 모지란 부분을 마치 swapping 하듯이 디스크에 데이타를 저장한다.


아래 옵션 참고 




출처 : https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence


여기에 특이한 점은, 메모리나 디스크에 저장할때, RDD를 RAW (원본 형식)으로 저장할 것인지 자바의 Serialized 된 형태로 저장할 지를 선택할 수 있다. ( Serealized 된 형태로 저장하기 MEMORY_ONLY_SER, MEMORY_AND_DISK_SER) 이렇게 저장하면, 메모리 사용량은 더 줄일 수 있지만, 반대로 저장시 Serizalied하는 오버로드와, 읽을때 De-Seriazlie 하는 오버로드가 더 붙어서 CPU 사용량은 오히려 증가하게 된다.


아래 데이타는 http://sujee.net/2015/01/22/understanding-spark-caching/#.VWcOh1ntlBc 의 데이타 긴데,

Serialized 로 저장하는 경우, 최대 약 4배 정도의 메모리 용량을 절약할 수 있으나, 반대로, 처리 시간은 400배 이상이 더 들어간다.

 









 


다음으로, 특이한 옵션중에 하나가 OFF_HEAP 이라는 옵션인데, 스파크는 JVM 상에서 동작하기 때문에, 스파크가 저장하는 메모리란 JVM 상의 메모리를 뜻한다. JVM  특성상 Garbage collection 에 의한 성능 제약을 받을 수 있으며 또한 별도로 서로 복제가 되지 않기 때문에, (기본 옵션의 경우에만), 안정적인 서비스를 원할 경우에는 별도의 복제 옵션을 선택해야 한다.

이런 문제를 해결하기 위한 다른 옵션으로는 JVM 내에 데이타를 저장하는 것이 아니라, 별도의 JVM 외의 메모리 공간에 데이타를 저장하는 방식이 OFF_HEAP 이라는 옵션이다. 아직 안정화 되지는 않았지만, http://tachyon-project.org/ 이라는 메모리 클러스터를 이용하여, 서로 복제가 가능한 외부 메모리 클러스터에 저장하는 방식으로, JVM  상 메모리 보다는 성능이 약간 떨어지지만, 디스크보다는 빠르며, 큰 메모리 공간을 장애 대응에 대한 상관 없이 (자체 적으로 HA  기능을 제공함) 사용이 가능하다. 

 cf. Redis나  Infinispan등과 같은 메모리 기반의 데이타 그리드 솔루션의 하나인 Hazelcast 역시 JVM 밖의 네이티브 메모리 공간에 데이타를 저장하는 유사한 방식을 사용한다.


Persist vs Cache


그렇다면, persist()와 cache()의 차이점은 무엇인가? cache()는  persist() 에서 저장 옵션을 MEMORY_ONLY로 한 옵션과 동일하다.


저장된 RDD는 메모리나 디스크에서 언제 삭제 되는가?

RDD가 메모리나 디스크에 로드되었다고 항상 로드된 상태로 있는 것이 아니다. 기본적으로 LRU (Least Recently Used)  알고리즘 (가장 근래에 사용되지 않은 데이타가 삭제되는 방식)에 의해서 삭제가 되가나, 또는 RDD.unpersiste() 함수를 호출하면 명시적으로 메모리나 디스크에서 삭제할 수 있다.


언제 어떤 타입의 Peristence옵션을 사용해야 하는가?


가장 좋은 옵션은 디폴트 옵션인  MEMORY_ONLY  옵션이다. 가장 빠르다.

다음으로 메모리가 모자를 경우에는  MEMORY_ONLY_SER 옵션을 이용하면, Seriazlied 된 형태로 저장하기 때문에 메모리 공간은 줄일 수 있으나, 대신 CPU 사용률이 올라간다. 그래도 여전히 빠른 방식이다.

데이타 양이 많을 경우에는 DISK에 저장하는 옵션보다는 차라리 Persist 를 하지 않고, 필요할때 마다 재계산 하는 것이 더 빠를 수 있다.

빠른 응답이 필요한 경우에 Persist 된 데이타에 대한 유실을 방지하려면, replicated storage 옵션을 사용하는 것이 좋다. (MEMORY_ONLY2 등).  다른 스토리지 타입 역시, 장애로 인해서 데이타가 유실되더라도 재계산을 통하여 복구가 가능하지만, 재계산 하는 것 보다는 RDD 의 복제본을 저장해 놓고, 장애시 페일오버 하는 것이 빠르기 때문에, 빠른 응답시간을 요구로 하는 웹 애플리케이션의 경우 이 스토리지 타입이 유리하다. (단, 메모리 사용량은 복제본을 저장하는데도 사용되기 때문에 상대적으로 일반 스토리지 옵션에 비해서 메모리 여유가 적다.)



참고 

Learning Spark

Spark document - https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence


참고 - 이 글은 제가 스파크를 혼자 공부하면서 문서만을 통해서 정리한 글이기 때문에, 실무적인 경험이 많이 녹아 들어 있지 않습니다. 

저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

Apache Spark RDD 이해하기 #2


스파크에서 함수 넘기기 (Passing function to Spark)


조대협 (http://bcho.tistory.com)


Passing function
스파크는 개발자가 비지니스 로직을 함수로 정의한 후에, 이 함수를 스파크 클러스의 노드들로 보내서 수행할 수 있다. 스파크가 지원하는 프로그래밍 언어에 따라서, 이 함수를 넘기는 방법이나 특성이 다른데, 여기서는 Python을 이용하는 방법에 대해서 설명한다. (스칼라는 아직 공부를 못해서)

파이썬으로 함수 넘기기
파이썬으로 함수를 넘기는 방법은 크게 3가지가 있다.
  • 람다 표기법을 이용하는 방식
  • 모듈 상의 탑레벨 함수(Top-level function)
  • 파이썬 클래스 안에서 함수를 정의하여, 클래스 안에서 그 함수를 스파크로 넘기는 방식
각각의 방식에 대해서 살펴보도록 하자.

첫번째로 람다 표기법을 사용하는 방식이다.  람다 표기법이란, 함수를 정의하는데, 있어서 함수명이 없이 간략하게 함수의 기능만을 저장하는 표기법으로, 주로 간단한 결과를 구현할때, 코딩을 간결하게 (양이 적게) 표현하고자 하는데 사용한다.
다음은 필터 연산에서 함수를 람다 표기법으로 정의해서 넘기는 예제이다. “l” 이라는 RDD에 Apache 라는 문자열이 있는지 없는지를 행마다 체크하는 로직이다.


같은 로직을 함수를 정의해서 함수 자체로 넘길 수 가 있는데, 위의 람다 표현식으로 된 함수를 containsApache라는 함수로 정의하여 필터로 넘기는 예제이다.

마지막으로는 파이썬 클래스에서 클래스의 함수를 넘기는 방식이다.
다음은 MyClass 클래스를 정의한 다음. 클래스를 생성할때, filter에 사용할 문자열을 받은후, count라는 메서드에서, 그 문자열로 필터링을 한 후, 라인 수를 count하여 리턴하는 방법이다.


이 때 주의할 점은 self.query를 filter에 인자로 넘겼는데, 이 경우에 스파크로 넘어가는 것은 self.query 변수 내용 하나가 아니라, 이 객체 전체가 스파크로 넘어가게 된다. 동작상에는 문제가 없지만, 전체 객체가 스파크로 넘어가기 때문에 메모리 사용률이 많아지고, 전체 객체를 넘기는 과정 역시 인자만 넘기는 방식에 비해서 상대적으로 시간이 많이 걸리기 때문에 좋은 방법은 아니다. 
이런 문제를 피하는 방법은 클래스내에서 무슨 값을 넘길때는 self.xxx식으로 스파크에 넘기는 것이 아니라, 그 값을 복사하여 넘기는 방법을 사용하면 된다. 예를 들어서 위의 예제의 경우에는 아래와 같이 변경하면 된다.

즉 스파크에 self.query를 넘기는 것이 아니라 이 값은 로컬 변수인 x에 x=self.query로 저장한 후, 스파크에는 이 x 값을 넘기게 되면, 실제 모든 객체가 스파크에 전달되지 않고, 이 로컬 변후 x 만 넘어가기 때문에 메모리가 과 사용되는 것을 예방할 수 있다. 







저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

Spark RDD  이해하기 #1

조대협(http://bcho.tistory.com)


기본 개념 잡기

RDD 는 여러 분산 노드에 걸쳐서 저장되는 변경이 불가능한 데이타(객체)의 집합으로 각각의 RDD는 여러개의 파티션으로 분리가 된다. (서로 다른 노드에서 분리되서 실행되는). 

쉽게 말해서 스파크 내에 저장된 데이타를 RDD라고 하고, 변경이 불가능하다. 변경을 하려면 새로운 데이타 셋을 생성해야 한다.

RDD의 생성은 외부로 부터 데이타를 로딩하거나 또는 코드에서 생성된 데이타를 저장함으로써 생성할 수 있다.

RDD에서는 딱 두 가지 오퍼레이션만 지원한다.
  • Transformation : 기존의 RDD 데이타를 변경하여 새로운 RDD 데이타를 생성해내는 것. 흔한 케이스는 filter와 같이 특정 데이타만 뽑아 내거나 map 함수 처럼, 데이타를 분산 배치 하는 것 등을 들 수 있다.
  • Action : RDD 값을 기반으로 무엇인가를 계산해서(computation) 결과를 (셋이 아닌) 생성해 내는것으로 가장 쉬운 예로는 count()와 같은 operation들을 들 수 있다.

RDD의 데이타 로딩 방식은 Lazy 로딩 컨셉을 사용하는데, 예를 들어 sc.textFile(“파일”)로 파일을 로딩하더라도 실제로 로딩이 되지 않는다. 파일이 로딩되서 메모리에 올라가는 시점은 action을 이용해서 개선할 당시만 올라간다.
아래 코드를 보자 아래 코드는 “README.md” 파일을 RDD로 로딩 한후에

  1. pythonLines에 “Python”이라는 단어를 가지고 있는 라인만 추려서 새로운 RDD를 만들고,
  2. 그 다음 count() action을 이용하여, 그 줄 수 를 카운트 하는 예제이다.




그렇다면, 언제 실제 README.md 파일이 읽혀질까? 실제로 읽혀지는 시기는 README.md 파일을 sc.textFile로 오픈할 때가 아니라 .count() 라는 액션이 수행될 때 이다.
이유는 파일을 오픈할때 부터 RDD를 메모리에 올려놓게 되면 데이타가 클 경우, 전체가 메모리에 올라가야 하는데, 일반적으로 filter 등을 이용해서 데이타를 정재한 후에,  action을 수행하기 때문에, action을 수행할때, action수행시 필요한 부분만 (filter가 적용된 부분만) 메모리에 올리면 훨씬 작은 부분을 올릴 수 있기 때문에 수행시에 데이타를 로딩하게 된다. 그렇다면 로딩된 데이타는 언제 지워질까?
action을 수행한다음 바로 지워진다.

위에서 보면 lines.count()를 두번 수행하였는데, 이 실행시 마다 README.md 파일을 다시 읽어드린다. 만약에, 한번 읽어드린 RDD를 메모리에 상주하고 계속해서 재 사용하고 싶다면 RDD.persist()라는 메서드를 이용하면, RDD를 메모리에 상주 시킬 수 있다.

RDD 생성하기

앞에서도 언급했듯이, RDD를 생성하는 방법은 크게 두가지가 있다. 
  • 외부로 부터 파일을 읽어서 로딩하거나 파일은 일반 파일을 읽거나 S3,HBase,HDFS,Cassandra 등에서 데이타를 읽어올 수 있다. 
    파이썬 예제) lines = sc.textFile(“/path/filename.txt”)
  • 또는 드라이버 프로그램내에서 생성된 collection을 parallelize() 라는 메서드를 이용해서 RDD 화 하는 방법이다. (자바 컬렉션등을 RDD로 생성)
    자바 예제) JavaRDD<String> lines = sc.parallelize(Array.asList(“first”,”second”))

RDD Operations

1) Transformation (변환)

변환은 RDD를 필터링하거나 변환하여 새로운 RDD를 리턴하는 오퍼레이션이다.
다음 코드는 README.md 라는 파일을 읽어서 f 라는 RDD를 생성한후
f라는 RDD 에서 “Apache”라는 문자열을 가진 라인만을 모아서 t라는 RDD를 새롭게 생성한 후 화면으로 출력하는 예제이다.
f와 t는 전혀 다른  RDD로 RDD t는 filter에 의해서 새롭게 생성되었다.

<그림. 파이썬 예제>


변환 함수는 filter 뿐 아니라, map, group등 여러가지 함수들이 있으며, 자세한 함수 리스트는 https://spark.apache.org/docs/latest/programming-guide.html#transformations 를 참고하기 바란다.

2) Action (액션)

액션은 RDD를 가지고 계산을 해서 최종 결과를 리턴하거나 또는 데이타를 외부 저장소(External Storage)등에 쓸 수 있다.
최종 결과를 리턴하는 오퍼레이션으로는 앞의 예제에서도 설명한 count()나, 첫번째 element를 리턴하는 first등이 있으며, RDD를 저장하는 오퍼레이션으로는 saveAsTextFile(path)와 같은 오퍼레이션등이 있다.
 


본 포스팅을 오라일사의 "Learning Spark" 과  Sparing Programming Guide를 참고하여 작성하였습니다. https://spark.apache.org/docs/latest/programming-guide.html


저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License
Spark의 전체적인 스택 구조

조대협 (http://bcho.tistory.com)

스파크의  전체적인 스택 구조를 보면 다음과 같다.





  • 인프라 계층 : 먼저 스파크가 기동하기 위한 인프라는 스파크가 독립적으로 기동할 수 있는 Standalone Scheudler가 있고 (그냥 스팍만 OS위에 깔아서 사용한다고 생각하면 된다). 또는 하둡 종합 플랫폼인 YARN 위에서 기동될 수 있고 또는 Docker 가상화 플랫폼인 Mesos 위에서 기동될 수 있다.
  • 스파크 코어 : 메모리 기반의 분산 클러스터 컴퓨팅 환경인 스팍 코어가 그 위에 올라간다. 
  • 스파크 라이브러리  : 다음으로는 이 스파크 코어를 이용하여 특정한 기능에 목적이 맞추어진 각각의 라이브러리가 돌아간다. 빅데이타를 SQL로 핸들링할 수 있게 해주는 Spark SQL, 실시간으로 들어오는 데이타에 대한 리얼타임 스트리밍 처리를 해주는 Spark Streaming, 그리고 머신러닝을 위한 MLib, 그래프 데이타 프로세싱이 가능한 GraphX가 있다.

현재 글에서 설명하고 있는 부분은 먼저 스파크에 대한 기본을 이해하기 위해서 Spark Core 부분을 중점적으로 설명하고 있다. 


저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License



Apache Spark Cluster 구조

스팍의 기본 구조는 다음과 같다.
스팍 프로그램은 일반적으로 “Driver Program”이라고 하는데, 이 Driver Program 은 여러개의 병렬적인 작업으로 나뉘어져사 Spark의 Worker Node(서버)에 있는  Executor(프로세스)에서 실행된다.



1. SparkContext가 SparkClusterManager에 접속한다. 이 클러스터 메니져는 스팍 자체의 클러스터 메니져가 될 수 도 있고 Mesos,YARN 등이 될 수 있다. 이 클러스터 메니저를 통해서 가용한 Excutor 들을 할당 받는다
2. Excutor를 할당 받으면, 각각의 Executor들에게 수행할 코드를 보낸다.
3. 다음으로 각 Excutor 안에서 Task에서 로직을 수행한다.


  • Executor : Process
  • Task : A Unit of work that will sent to one executor

cf. Storm 과 개념이 헷갈릴 수 있는데, 
Storm 은 Node가 하드웨어 서버, Worker가 프로세스,Executor가 쓰레드
Spark 은 Worker Node가 하드웨어 서버, Executor가 프로세스 이다.  


저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License



Apache Spark 설치 하기


조대협 (http://bcho.tistory.com)


Spark 설치 하기


1. 스팍 홈페이지에서 다운로드.


다운로드시 Pre-built in Spark을 골라야 함. 여기서는 Hadoop 2.6용으로 빌드된 스팍을 선택한다.








2. 스팍 쉘을 실행 해보자


인스톨 디렉토리에서, 

%./bin/pyspark





을 실행하면, 위와 같이 파이썬 기반의 스팍 쉘이 실행됨을 확인할 수 있다.


3. 로깅 레벨 조정 및 간단한 스팍 예제


디폴트 로깅은 INFO 레벨로 되어 있기 때문에, 쉘에서 명령어를 하나라도 실행하면 INFO 메세지가 우루루 나온다. (몬가 할때 결과 값보다, 오히려 INFO 메세지가 많이 나온다.)

그래서, conf/log4j.properties 파일을 conf/log4j.properties.templates 파일을 복사해서 만든후 

log4j.rootCategory를 Info에서 WARN 레벨로 다음과 같이 수정한다.


log4j.rootCategory=WARN, console






환경 설정이 끝났으면 간단한 예제를 돌려보자

$SPARK_HOME 디렉토리에 있는  README.md 파일을 읽어서, 라인 수를 카운트 하는 예제이다.





스팍은 자체적으로 클러스터를 모니터링 할 수 있는 차체적인 Web UI가 있다. 

http://localhost:4040에 접속하면 다음과 같이 스팍 클러스터에 대한 모니터링 화명을 얻을 수 있다.









저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License




스팍에 대한 간단한 개념과 장점 소개


조대협 (http://bcho.tistory.com)



스팍의 개념과 주요 기능


요즘 주변에서 아파치 스팍을 공부하는 사람도 많고, 스팍을 기반으로한 Zeppelin을 이용하여 데이타 분석을 하는 경우도 많아서, 오늘부터 다시 Spark을 들여다 보기 시작했습니다.


스팍은 예전에도 몇번 관심을 가진적이 있는데, Storm과 같은 데이타 스트리밍 프레임웍에서 Storm과 같이 언급 되기도 하고, 머신 러닝 프레임웍을 볼때도  스팍 ML  라이브러리 기능이 언급 되기도 하고, 예전 모 회사의 데이타 분석 아키텍쳐를 보니, 카산드라에 저장된 데이타를 스팍/Shark라는 프레임웍으로 분석을 하더군요. 또 누구는 메모리 기반의 하둡이라고도 합니다.


스팍의 정의를 내려보면 한마디로

범용적 목적의 분산 고성능 클러스터링 플랫폼 (General purpose high performance distributed platform)

입니다 말이 정말 길고 모호한데, 달리 설명할만한 단어가 없습니다.


스팍은 범용 분산 플랫폼입니다. 하둡과 같이 Map & Reduce 만 돌리는 것도 아니고, Storm 과 같이 스트리밍 처리만 하는게 아니라, 그냥 분산된 여러대의 노드에서 연산을 할 수 있게 해주는 범용 분산 클러스터링 플랫폼으로, 이 위에, Map & Reduce나, 스트리밍 처리등의 모듈을 추가 올려서 그 기능을 수행하게 하는 기능을 제공합니다.


특히나, 메모리 하둡이라고도 불리는데, 이 스팍은 기존의 하둡이 MR(aka. Map & Reduce) 작업을 디스크 기반으로 수행하기 때문에 느려지는 성능을 메모리 기반으로 옮겨서 고속화 하고자 하는데서 출발하였습니다.


그 플랫폼 위에 SQL 기반으로 쿼리를 할 수 있는 기능이나, 스트리밍 기능등등을 확장하여 현재의 스팍과 같은 모습을 가지게 되었습니다.

스팍의 주요 기능은 앞에서 언급하였듯이 

  • Map & Reduce (cf. Hadoop)
  • Streaming 데이타 핸들링 (cf. Apache Storm)
  • SQL 기반의 데이타 쿼리 (cf. Hadoop의 Hive)
  • 머신 러닝 라이브러리 (cf. Apache Mahout)
등을 제공합니다.

스팍의 장점

빠른 속도라는 장점은 필수 이고, 왜 다들 스팍! 스팍 하는가 봤더니, 플랫폼으로써의 특성이 장점으로 작용합니다.
예를 들어 기존의 빅데이타 분석 플랫폼의 구현 구조를 보면, 배치 분석시 ETL 등을 써서 데이타를 로딩한 후에, Hadoop의 MR을 이용하여 데이타를 정재한후, 이 데이타를 OLAP 등의 데이타 베이스에 넣은 후에, 리포팅 툴로 그래프로 표현했습니다. 여기에 만약 실시간 분석을 요한다면 Storm을 연결해서 실시간 데이타 분석 내용을 더하는 일을 했습니다. 
사용되는 프레임웍만해도 몇가지가 되고, 이를 공부하는 시간과 시스템을 배포 운영하는데 여러가지 노력이 들어갔습니다만, 스팍은 하나로 이 모든것이 가능하다는 겁니다.

즉 한번 배워놓으면, 하나의 플랫폼내에서 여러가지가 가능한데다가 속도까지 빠릅니다.
그리고 한 플랫폼안에 배치,스트리밍, 머신 러닝등 다양한 처리를 제공하기 때문에, 하나의 데이타로 여러가지 형태로 데이타를 처리할 수 있는 기능을 가집니다.

연동성 부분에서도 장점을 볼 수 가 있는데, 스팍은 스칼라로 구현되었지만, 파이썬,자바,스칼라등 다양한 언어를 지원하기 위한 SDK를 가지고 있고, 데이타 저장단으로는 하둡, 아마존  S3, 카산드라등 다양한 데이트 스토리지를 지원합니다.
이런 연동성으로 다양한 언어로 다양한 데이타 저장소를 연동하여 데이타를 분석 및 처리할 수 있는 장점을 가지고 있습니다.


저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

대충보는 Storm #6-Apache Storm 그룹핑 개념 이해하기

 조대협 (http://bcho.tistory.com)



지금까지 컴포넌트간의 경로 라우팅, 즉 Spout 에서 Bolt간, Bolt에서 Bolt간 경로를 설정하는 방법에 대해서 알아보왔다.

그렇다면 각 컴포넌트간 라우팅을 할때 그 안에 있는 Task간에는 어떻게 상세하게 라우팅이 될까? Storm에서는 이 Task간의 라우팅을 정의하기 위해서 Grouping이라는 개념을 사용한다.


Shuffling

가장 간단한 라우팅 방법으로 Bolt A에서 Bolt B로 라우팅을 한다고 했을때, Bolt A내의 있는 Task가 Bolt B에 있는 Task중 아무 Task로 임의로(랜덤하게) 라우팅 하는 방식이다.




Field

Bolt A에 있는 Task에서 Bolt B에 있는 Task로 라우팅을 할때, 규칙성을 갖는 것중 하나인데, 보내고자 하는 데이타의 특정 필드에 있는 값을 기준으로 Bolt B에 있는 특정 Task로 라우팅 하는 방식이다. 라우팅 기준은 지정한 필드의 값을 가지고 해쉬를 계산해서 해쉬에 따라서 Bolt B에 있는 Task로 라우팅 시키는 방식이다.

예를 들어, Bolt B에 Task가 3개가 있다고 가정할때, 나이라는 필드로 “Field Grouping”을 한다고 하면, 나이/3으로 나눈 나머지 값에 따라서 Task A,B,C로 라우팅 하는 방식이다. (나눗셈은 설명을 쉽게 하기 위해 예를 들었지만 비슷한 원리로 해쉬를 계산하여 라우팅을 한다.)

Bolt에서 로컬 캐쉬를 사용하거나 할때, 같은 해쉬의 데이타가 같은 Task로 라우팅이 되게 해서 캐쉬 히트율을 높이는 것등에 유용하게 사용될 수 있다.



Global 

Global 그룹핑은 모으는 개념(Aggregation)의 개념이다. Bolt A의 어느 Task에서 메세지를 보내더라도 항상Bolt B똑같은 하나의 Task로 라우팅이 되는 방식으로, Bolt B에 있는 Task중에서 Task ID가 가장 작은 특정 Task로만 라우팅을 한다.

분산해서 연산한 값을 모두 모아서 합산을 한다던가등에 사용할 수 있다.



All

All 그룹핑은 일종의 브로드 캐스트 개념으로 Bolt A의 하나의 Task가 메세지를 전송하면 Bolt B의 모든 Task가 메세지를 받는 형태이다.

각 Task들에 설정 변경등을 넘길때 유용하게 사용될 수 있다.





Direct

당연히 있을 것으로 생각했겠지만 당연히 있는 기능이다. Bolt A의 Task에서 Bolt B의 특정 Task로 명시적으로 라우팅을 지정하는 기능이다. 이때 주의할점이 Bolt B의 Task를 지정할때,  Task Id가 아니라 Task의 Index로 타겟을 지정한다. 예를 들어 Bolt B에 Task가 5개가 있을때, 0번, 1번식으로 타켓을 지정하게 된다.





Custom

Custom 그룹핑은 라우팅 로직을 개발자가 직접 작성해서 넣는 방식이다.


Local or Shuffle

다소 주의 깊게 볼 필요가 있는 그룹핑 방식이다. 기본적인 동작 방식은 Shuffle과 다르지 않으나,

Bolt A에서 Bolt B의 Task로 라우팅을 할때, Bolt A에서 메세지를 보내는 Task와 같은 JVM 인스턴스 (Woker)에 Bolt B의 Task가 있을 경우 같은 JVM 인스턴스에 있는 Task로 우선 라우팅을 한다. 이는 네트워크를 이용한 리모트 호출을 줄이기 위한 방법이다.

그러면 Bolt의 Task들은 각 Worker에 어떻게 배치 될것인가에 대한 질문이 올 수 있는데, 이렇게 Task를 Worker에 배치하는 행위를Scheduling(스케줄러)라고 하고, 배치를 하는 주체를 Scheduler라고 한다. 자료를 몇개 찾아봤지만 Scheduling 정책에 대해서는 명확하게 나와 있지 않고, 무작위 적으로 배치하는 것으로 보이는데, 조금 더 research가 필요할듯. 


참고 :Pluggable Scheduler

애플리케이션 성격에 맞게 스케쥴링 정책을 구현해서 사용할 수 있는데, 이를 Pluggable Scheduler라고 한다.

http://xumingming.sinaapp.com/885/twitter-storm-how-to-develop-a-pluggable-scheduler/ 의 예에 나와 있는 시나리오를 보면, 특정 Spout의 경우에는 상용 소프트웨어를 사용하는데, 이 상용 소프트웨어는 Machine당 라이센스를 가지고 있기 때문에 이 Spout은 반드시 라이센스가 설치된 서버에 스케쥴링(배포)되어야 한다. 그래서 특정 스케쥴링 정책이 필요한데, 첨부 링크에 있는 내용은 Pluggable scheduler를 구현하는 방법에 대해서 설명하고 있다.





저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

대충보는 Storm #5-Apache Storm 병렬 분산 처리 이해하기

 조대협 (http://bcho.tistory.com)

 

Storm에 있는 Spout Bolt들은 여러개의 머신에서 어떻게 나눠서 처리될까? Storm 클러스터는 여러대의 분산된 서버에서 운용되기 때문에, 당연히 Spout Bolt도 나눠서 처리된다 그렇다면 이런 Storm의 병렬 처리 구조는 어떻게 되는 것일까?

이 글에서는 Spout Bolt를 병렬로 처리하는 Storm의 구조에 대해서 알아보도록 한다.

Storm의 병렬 처리를 이해하기 위한 개념

Storm의 병렬 처리를 이해하기 위해서는 몇가지 개념을 정리해야 한다. Node,Worker,Exectutor,Task 이 네 가지 개념을 이해해야 한다.


Node

Node는 물리적인 서버이다. Nimbus Supervisor 프로세스가 기동되는 물리적인 서버이다.

Nimbus는 전체 노드에 하나의 프로세스만 기동하며, Supervisor는 일반적으로 하나의 노드에 하나만 기동한다. 여러대를 기동시킬 수 도 있지만, Supervisor의 역할 자체가 해당 노드를 관리하는 역할이기 때문에 하나의 노드에 여러개의 Supervisor를 기동할 필요는 없다.


Worker

Worker Supervisor가 기동되어 있는 노드에서 기동되는 자바 프로세스로 실제로 Spout Bolt를 실행하는 역할을 한다.


Executor

Executor Worker내에서 수행되는 하나의 자바 쓰레드를 지칭한다.


Task

Task Bolt Spout의 객체를 지칭한다. Task Executor (쓰레드)에 의해서 수행된다.

이 개념을 다시 정리해보면 다음과 같은 그림이 된다.



<그림. Node,Worker,Executor,Task 의 개념>

각 슬레이브 노드에는 Supervisor 프로세스가 하나씩 떠있고, conf/storm.yaml에 정의된 설정에 따라서 worker 프로세스를 띄운다.supervisor.slots.ports에 각 Worker가 사용할 TCP 포트를 정해주면 된다. 아래는 5개의 Worker 프로세스를 사용하도록 한 설정이다.



<그림. Storm 설정에서 Supervisor 5개 띄우도록한 설정>

 

그리고 난후에, Topology를 생성할때, Topology에 상세 Worker,Executor,Task의 수를 정의한다. 앞에서 예제로 사용했던 HelloTopology 클래스 코드를 다시 살펴보자. 아래 코드는 Worker,Executor,Task등을 설정한 예이다.

package com.terry.storm.hellostorm;

 

import backtype.storm.Config;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.topology.TopologyBuilder;

 

public class HelloTopology {

        public static void main(String args[]){

               TopologyBuilder builder = new TopologyBuilder();

               builder.setSpout("HelloSpout", new HelloSpout(),2);

               builder.setBolt("HelloBolt", new HelloBolt(),2)

                       .setNumTasks(4)

                       .shuffleGrouping("HelloSpout");

              

              

               Config conf = new Config();

               conf.setNumWorkers(5);

               // Submit topology to cluster

               try{

                       StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

               }catch(AlreadyAliveException ae){

                       System.out.println(ae);

               }catch(InvalidTopologyException ie){

                       System.out.println(ie);

               }

              

        }

 

}

<코드. Worker,Executor,Task 수를 설정한 HelloTopology 예제>

     Topology가 사용할 Worker 프로세스의 수 설정
Config
에서 setNumWorkers(5)를 이용해서 이 토폴로지에서 사용한 Worker 프로세스 수를 5개로 지정했다.

     Spout Executor(쓰레드 수) 설정
다음으로 setSpout에서 3번째 인자로 “2”라는 숫자를 넘겼는데, setSpout에 마지막 인자는 Executor의 수이다. 이를 Parallelism 힌트라고 하는데, Spout 컴포넌트가 수행될 쓰레드의 수이다. 여기서는 Spout Task (객체의 수)를 정의하지 않았는데, 정의하지 않은 경우 디폴트로 Executor의 수와 같이 설정된다.

     Bolt Executor(쓰레드 수)Task(객체)수 설정
Bolt
도 마찬가지로 setBolt 3번째 마지막 인자가 Parallelism 힌트인데, 역시 2개로 지정하였다. 여기서는 Task수를 별도로 지정하였는데, setTaskNum(4)을 이용해서 지정한다. 이렇게 설정하면 HelloBolt 객체는 총 4개가 생기고 2개의 Thread에서 번갈아 가면서 실행하게 된다.

자아 그러면 실제로 설정하는데로 동작하는 지 몇가지 확인을 해보자. 자바의 jps 명령을 이용하면 현재 동작중인 자바 프로세스 수를 볼 수 있다.



<그림 Worker 프로세스 수의 확인>

위의 테스트는 하나의 환경에서 nimbus,zookeeper,supervisor,worker를 모두 띄워놓은 형태인데,worker가 설정대로 5개의 프로세스가 떠있고, nimbus,supervisor가 떠 있는 것이 확인되고, QuorumPeerMainzookeeper 프로세스이다.

실제로 Executor가 지정한데로 Thread가 뜨는지 확인을 해보자. 여러개의 Worker 프로세스에 나눠서 뜨면 모니터링하기가 복잡하니 편의상 conf.setNumer(1)로 해서, 하나의 Worker 프로세스에서 모든 Executor가 뜨도록 Topology를 변경한후, Worker 프로세스의 쓰레드를 모니터링 하니 다음과 같은 결과를 얻었다.

코드상에서 HelloSpout에 대한 Parallelism 힌트를 2로 설정하고, HelloBolt에 대한 Parallelism 힌트도 2로 설정하였다.



<그림. Worker 프로세스의 쓰레드 덤프>

실제로 Worker 프로세스내의 쓰레드를 보면 HelloSpout용 쓰레드가 2, HelloBolt용 쓰레드가 2개가 기동됨을 확인할 수 있다.


리밸런싱

Storm 운영중에 노드를 추가 삭제 하거나 또는 성능 튜닝을 위해서 운영중인 환경에 Worker, Executor의 수를 재 조정이 가능하다. 이를 rebalance라고 하는데, 다음과 같은 명령어를 이용해서 가능하다.

% bin/storm rebalance [TopologyName] -n [NumberOfWorkers] -e [Spout]=[NumberOfExecutos] -e [Bolt1]=[NumberOfExecutos] [Bolt2]=[NumberOfExecutos]

미들웨어 엔지니어로써 본 Storm 튜닝

본인의 경우 경력이 톰캣이나 오라클社의 웹로직에 대해 장애진단과 성능 튜닝을 한 경력을 가지고 있어서 JVM이나 미들웨어 튜닝에 많은 관심을 가지고 있는데, 이 미들웨어 튜닝이라는 것이 대부분 JVM과 쓰레드 수등의 튜닝에 맞춰 있다보니, Storm의 병렬성 부분을 공부하다 보니, Executor Worker,Task의 수에 따라서 성능이 많이 차이가 나겠다는 생각이 든다.

특히나 하나의 토폴리지만 기동하는 것이 아니라, 여러개의 토폴로지를 하나의 클러스터에서 구동 할 경우 더 많은 변수가 작용할 수 있는데, 쓰레드란 것의 특성이 동시에 하나의 코어를 차지하고 돌기 때문에, 쓰레드수가 많다고 시스템의 성능이 좋아지지 않으며 반대로 적으면 성능을 극대화할 수 없기 때문에, 이 쓰레드의 수와 이 쓰레드에서 돌아가는 객체(Task)의 수에 따라서 성능 차이가 많이 날것으로 생각된다. 아마도 주요 튜닝 포인트가 되지 않을까 싶은데, 예전에는 보통 JVM당 적정 쓰레드 수는 50~100개 정도로 책정했는데 (톰캣과 같은 WAS 미들웨어 기준). 요즘은 코어수도 많아져서 조금 더 많은 쓰레드를 책정해도 되지 않을까 싶다. 쓰레드 수 뿐 아니라, 프로세스수도 영향을 미치는데, JVM 프로세스의 컨텐스트 스위칭은 쓰레드의 컨텐스트 스위칭보다 길기 때문에, 프로세스를 적게 띄우는 것이 좋을것으로 예상 되지만, JVM 프로세스는 메모리 GC에 의한 pausing 시간이 발생하기 때문에 이 GC 시간을 적절하게 나눠주기 위해서 적절 수 의 프로세스를 찾는 것도 숙제가 아닐까 싶다. 디폴트 worker의 옵션을 보니 768M의 힙 메모리를 가지고 기동하게 되어 있는데, 메모리를 많이 사용하는 연산는 다소 부족하지 않을까 하는 느낌이 든다.

Bolt가 데이타 베이스, 파일 또는 네트워크를 통해서 데이타를 주고 받는 연산을 얼마나 하느냐에 따라서도 CPU 사용률이 차이가 날것이기 때문에 (IO작업중에는 쓰레드가 idle 상태로 빠지고 CPU가 노는 상태가 되기 때문에) IO 작업이 많은 경우에는 쓰레드의 수를 늘리는 것이 어떨까 한다.

Bolt Spout와 같은 통신은 내부적으로 ZeroMQ를 사용하는 것으로 알고 있는데, 아직 내부 구조는 제대로 살펴보지는 않았지만, 같은 프로세스내에서는 네트워크 호출 없이 call-by-reference를 이용해서 통신 효율을 높이고, 통신이 잦은 컴포넌트는 같은 프로세스에 배치 하는 affinity와 같은 속성(?)이 있지 않을까 예측을 해본다.

결과적으로 튜닝 포인트는, Worker,Executor,Task 수의 적절한 산정과, 만약에 옵션이 있다면 리모트 호출을 줄이기 위한 Bolt Spout 컴포넌트의 배치 전략에 있지 않을까 한다.


다음 글에서는 이런 병렬 처리를 기반으로 각 컴포넌트간에 메세지를 보낼때, 여러 Task간에 어떻게 메세지를 라우팅을 하는지에 대한 정리한 그룹핑(Grouping)에 대한 개념에 대해서 알아보도록한다.


저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

 

대충보는 Storm #4-Apache Storm 특징과 기본 개념

 조대협 (http://bcho.tistory.com)


지금까지 Storm에 대해서 이해하기 위해서, 실시간 스트리밍 서비스의 개념에 대해서 알아보고 간단한 HelloStorm 애플리케이션을 제작해서, 싱글 클러스터 노드에 배포해봤다. 대략 실시간 스트리밍이 무엇이고, Storm을 이용해서 어떻게 개발하는지에 대해서는 어느정도 이해를 했을 것이라고 생각한다.

그러면 지금까지의 경험을 조금 더 체졔적으로 정리해서 Storm에 대해서 이해해보도록 하자. 이번에는 Storm에 대한 개념과 아키텍쳐 구조에 대해서 알아보겠다.


Storm의 특징

Storm을 실시간 스트리밍을 처리하기 위한 서버이자 프레임웍이다. 그렇다면 이 Storm이 다른 스트리밍 처리 솔루션에 비해 가지는 특징은 무엇일까?


확장성

Storm은 클러스터링 기능을 이용해서 수평으로 확장이 가능하다. 그래서 많은 데이타를 다루어야 하는 빅데이타이 데이타 스트림 서비스에서도 가능한데, Storm홈페이지에 포스팅된 자료를 보면, 2x2.4GHz CPU 24GB 메모리 머신을 기반으로 초당 100바이트짜리 메세지를 100만개 정도 처리가 가능하다고 한다. (https://storm.apache.org/about/scalable.html) TPS로 환산하면 100 TPS이다. (Wow!!)


장애 대응성

Storm의 다른 특징 중의 하나는 Fault tolerant 구조를 통한 장애 대응 능력이다. ZooKeeper를 통해서 전체 클러스터의 운영 상태를 감지 하면서, 특정 노드가 장애가 나더라도, 시스템이 전혀 문제 없이 작업을 진행할 수 있으며, 장애가 난 노드에 할당된 작업은 다른 노드에 할당해서 처리하고, 장애 노드에 대해서는 복구 처리를 자동으로 수행해준다.


메세지 전달 보장

Storm은 메세지 처리에 안정성을 제공하는데, 장애가 나건 문제가 있건간에,유실 없이 최소한 한번 메세지가 처리될 수 있게 지원한다. (at least once : 이말은 반대로 이야기 하면, 1번 이상 같은 메세지가 중복 처리될 수 있다는 이야기이다.)

만약에 정확하게 메세지가 한번만 처리가 되기를 원하면 Trident (https://storm.apache.org/documentation/Trident-tutorial.html) 를 통해서 Storm을 확장하면, 정확히 하나의 메세지가 한번만 처리되도록 할 수 있다.


쉬운 배포

Storm은 메뉴얼에 따르면(?) 배포와 설정이 매우 쉽다. 실제로 클러스터를 구성해보면 분산 시스템인데 비해서 별로 어려움 없이 배포와 설정이 가능하다. 그리고 메뉴얼에 따르면(?) 배포 후에, 크게 많은 관리 없이 운영이 가능하다고는 하는데,