블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 

t-SNE를 이용한 차원 감소


조대협 (http://bcho.tistory.com)


PCA 기반 차원 감소의 문제점

앞의 글에서 차원 감소에 대한 개념과, 차원 감소 알고리즘의 하나인 PCA 알고리즘에 대해서 살펴보았다.

PCA의 경우 선형 분석 방식으로 값을 사상하기 때문에 차원이 감소되면서 군집화 되어 있는 데이타들이 뭉게져서 제대로 구별할 수 없는 문제를 가지고 있다. 아래 그림을 보자


출처 https://www.youtube.com/watch?v=NEaUSP4YerM


이 그림은 2차원에서 1차원으로 PCA 분석을 이용하여 차원을 줄인 예인데, 2차원에서는 파란색과 붉은색이 구별이 되는데, 1차원으로 줄면서 1차원상의 위치가 유사한 바람에, 두 군집의 변별력이 없어져 버렸다.

t-SNE

이런 문제를 해결하기 위한 차원 감소 방법으로는 t-SNE (티스니라고 읽음) 방식이 있는데, 대략적인 원리는 다음과 같다.


먼저 점을 하나 선택한다. 아래는 검정색점을 선택했는데, 이 점에서 부터 다른점까지의 거리를 측정한다.



다음 T 분포 그래프를 이용하여, 검정 점(기준점) 을 T 분포 상의 가운데 위치한다면, 기준점으로부터 상대점 까지 거리에 있는 T 분포의 값을 선택(위의 T 분포 그래프에서 파란점에서 위로 점섬이 올라가서 T분포 그래프상에 붉은 색으로 X 표가 되어 있는 값)하여, 이 값을 친밀도 (Similarity)로 하고, 이 친밀도가 가까운 값끼리 묶는다.


이 경우 PCA 처럼 군집이 중복되지 않는 장점은 있지만, 매번 계산할때 마다 축의 위치가 바뀌어서, 다른 모양으로 나타난다. 단 데이타의 군집성과 같은 특성들은 유지 되기 때문에 시각화를 통한 데이타 분석에는 유용하지만, 매번 값이 바뀌는 특성으로 인하여, 머신러닝 모델의 학습 피쳐로 사용하기는 다소 어려운점이 있다.


아래 그림은 같은 데이타로 t-SNE 분석을 각각 한번씩한 결과를 시각화 해서 표현한 결과 인데, 보는 것과 같이 군집에 대한 특성은 그대로 유지 되지만 값 자체는 변화가 된것을 확인할 수 있다.




sklearn 을 이용한 t-SNE 구현

전체 코드는 https://github.com/bwcho75/dataanalyticsandML/blob/master/dimension%20reduction/2.%20t-SNE%20visualization.ipynb 에 공개되어 있으니 참고하기 바란다.


# Perform the necessary imports
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE

model = TSNE(learning_rate=100)
transformed = model.fit_transform(feature)

xs = transformed[:,0]
ys = transformed[:,1]
plt.scatter(xs,ys,c=labels)

plt.show()


사실 코드가 너무 간단해서 설명할것이 없다. TSNE 객체를 선언하고 학습속도 (learning_rate)를 지정한다음 fit_transform 하면 끝이다. (싸이킷런 만세…)


다음글에서는 차원 감소 방법중에 마지막을 Matrix Factorization (행렬 인수 분해) 방법에 대해서 알아보도록 하겠다.






파이썬을 이용한 데이타 시각화 #1 - Matplotlib 기본 그래프 그리기


조대협 (http://bcho.tistory.com)


백앤드 엔지니어와 백그라운드를 가진 경험상, 머신러닝을 공부하면서 헷갈렸던 부분중 하나가, 데이타에 대한 시각화이다. 머신러닝은 모델을 구현하는 것도 중요하지만, 학습할 데이타를 선별하고, 만들어진 모델을 검증하기 위해서는 데이타를 이해하는 것이 필수적이고 이를 위해서는 데이타를 시각화 해서 보는 것이 매우 중요하다.


그동안 그래프를 그리는 것을 스택오버플로우등에서 찾아서 복붙을 해서 사용하다가 matplotlib를 정리해야겠다고 해서 메뉴얼을 봤지만 도무지 이해가 되지 않아서, 결국 온라인 강좌를 들어서 정리해봤는데, 역시 강좌를 들으니까는 훨씬 빠르게 이해가 된다.

참고한 코스는 datacamp에 있는 “Introduction to Data Visualization with Python” 코스이다.


오늘은 matplotlib를 이용하여 기본적인 그래프를 그리는 방법에 대해서 정리하도록 한다.

기본 그래프 그리기

기본적인 그래프를 그리기 위해서는 matplotlib.pyplot에서  plot(x,y)를 사용하면 된다. x,y는 각각 X축과 Y축의 값이 된다.


from matplotlib import pyplot as plt
import numpy as np

x = np.arange(1,10)
y = x*5

plt.plot(x,y)
plt.show()


색깔 바꾸기

그래프를 그릴때 선의 색을 지정하기 위해서는 plot에서 인자로 컬러를 주면된다. 컬러표는 아래를 참고하면 되고 붉은색은 r, 파란색은 b으로 정의한다.

from matplotlib import pyplot as plt
import numpy as np

x = np.arange(1,10)
y = x*5

plt.plot(x,y,'r')
plt.show()





선 종류 변경하기

선을 그릴때, 다양한 선의 종류를 선택할 수 있다. 디폴트가 직선이고, 점으로 표현하는 마커나 점선등을 선택할 수 있다.

선의 선택은 plot에서 세번째 인자에 선의 종류를 지정하면 되고, 색을 같이 지정하려면 다음문자에 색을 지정하면 된다 다음은 동그란 마커 ‘o’를 붉은색 ‘r’로 표현하기 때문에, 세번째 인자를 ‘or’로 전달하였다.


from matplotlib import pyplot as plt
import numpy as np

x = np.arange(1,10)
y = x*5

plt.plot(x,y,'or')
plt.show()




다음은 선에 대한 종류표이다.



라벨과 타이틀

그래프를 그릴때 그래프의 타이틀과 X,Y축의 라벨을 표현하기 위해서는 타이틀은 plt.title(“타이틀명"),  X,Y축에 대한 라벨은 plt.xlabel(‘X축 라벨명'), plt.ylabel(‘Y축 라벨명') 을 사용한다.


from matplotlib import pyplot as plt
import numpy as np

x = np.arange(1,10)
y = x*5

plt.plot(x,y)
plt.xlabel('x axis')
plt.ylabel('y axis')
plt.title('matplotlib sample')
plt.show()



구간 확대/축소

그래프는 입력되는 x,y의 최소,최대 구간으로 자동으로 그려지는데, 이 구간을 키우거나 줄이기 위해서 x,y의 구간을 정의할 수 있다. x축은 plt.xlim(최소,최대),  y축은 plt.ylim(최소,최대)로 정의하면 된다.

아래는 x축을 2~3, y축을 5~20으로 확대해서 그래프를 그리는 예제이다.


from matplotlib import pyplot as plt
import numpy as np

x = np.arange(1,10)
y = x*5

plt.xlim(2,3)
plt.ylim(5,20)
plt.plot(x,y)
plt.xlabel('x axis')
plt.ylabel('y axis')
plt.title('matplotlib sample')
plt.show()



레전드

그래프를 그릴때 여러개의 그래프를 같이 그릴 수 있는데, 이경우 각 그래프가 구분이 안되기 때문에, 그래프마다 라벨을 달고 이 라벨명을 출력할 수 있는데, 이를 legend라고 한다.

아래는 first와 second 라는 두개의 그래프를 그리고, 우측 상단에 legend를 표현한 예이다.

legend를 사용하기 위해서는 plt.plot에서 label 변수에 그래프의 이름을 정의하고, plt.legend(‘위치')를 정해주면  legend를 그래프상에 표현해주는데, legend의 위치는 아래 표를 참고하면 된다.


from matplotlib import pyplot as plt
import numpy as np

x = np.arange(1,10,0.1)
y = x*0.2
y2 = np.sin(x)

plt.plot(x,y,'b',label='first')
plt.plot(x,y2,'r',label='second')
plt.xlabel('x axis')
plt.ylabel('y axis')
plt.title('matplotlib sample')
plt.legend(loc='upper right')
plt.show()



어노테이션

다음은 어노테이션이라는 기능으로, 그래프에 화살표를 그린후, 그 화살표에 문자열을 출력하는 기능이다. 예를들어 “이값이 최소값" 이런식으로 화살표를 그려서 표현할때 사용하는데 plt.annotate 함수를 사용하면 된다.

plt.annotate(‘문자열',xy,xytext,arrowprops) 식으로 사용한다.

문자열은 어노테이션에서 나타낼 문자열이고, xy는 화살표가 가르키는 점의 위치, xytext는 문자열이 출력될 위치, arrowprops는 화살표의 속성으로 칼라등을 정의한다.


from matplotlib import pyplot as plt
import numpy as np

x = np.arange(1,10)
y = x*5

plt.plot(x,y)
plt.annotate('annotate',xy=(2,10),xytext=(5,20),arrowprops={'color':'green'})
plt.show()



서브플롯

여러개의 그래프를 그리고 싶을때가 있는데, 이 경우 서브플롯이라는 것을 사용한다. 서브플롯은 그래프가 그려질 위치를 격자형으로 지정하는데, plt.subplot(nrow,ncol,pos) 식으로 사용한다.

nrow,ncol은 그래프를 그린 plain의 크기를 지정하는데, 3,2면 3줄로, 가로는 2칸으로 된 그래프 plain 설정한다. 그리고 마자막 pos는 몇번째 plain에 그래프를 그릴지 지정하는데, 아래와 같이 상단에서 부터 우측,아래 방향으로 1,2,3,4,5,6 순서가 된다.


1

2

3

4

5

6



아래 그림은 2,1 크기의 plain 을 만들어놓고 그래프를 위,아래로 두개를 그리는 예제이다.


from matplotlib import pyplot as plt
import numpy as np

x = np.arange(1,10)
y1 = x*5
y2 = x*1
y3 = x*0.3
y4 = x*0.2

plt.subplot(2,1,1)
plt.plot(x,y1)
plt.subplot(2,1,2)
plt.plot(x,y2)
plt.show()



아래 그림은 한줄의 두칸 plain을 만들어놓고, 좌우에 두개의 그래프를 그린 예제이다.


from matplotlib import pyplot as plt
import numpy as np

x = np.arange(1,10)
y1 = x*5
y2 = x*1
y3 = x*0.3
y4 = x*0.2

plt.subplot(1,2,1)
plt.plot(x,y1)
plt.subplot(1,2,2)
plt.plot(x,y2)
plt.show()




다음은 2x2 plain으로 4개의 그래프를 그린 예제이다.


from matplotlib import pyplot as plt
import numpy as np

x = np.arange(1,10)
y1 = x*5
y2 = x*1
y3 = x*0.3
y4 = x*0.2

plt.subplot(2,2,1)
plt.plot(x,y1)
plt.subplot(2,2,2)
plt.plot(x,y2)
plt.subplot(2,2,3)
plt.plot(x,y3)
plt.subplot(2,2,4)
plt.plot(x,y4)
plt.show()


그래프 사이즈

그래프를 크게 그리고 싶을때 그래프 자체의 크기를 변경할 수 있는데, plt.figure를 이용하여 figsize=(가로,세로)를 인자로 주면 그래프가 그려질 전체 그림의 크기를 조절할 수 있다. 아래는 20x5 크기로 그래프를 그릴 크기를 지정하는 예제이다.


import numpy as np

x = np.arange(1,10)
y1 = x*5
y2 = x*1
y3 = x*0.3
y4 = x*0.2

plt.figure(figsize=(20,5))
plt.subplot(2,2,1)
plt.plot(x,y1)
plt.subplot(2,2,2)
plt.plot(x,y2)
plt.subplot(2,2,3)
plt.plot(x,y3)
plt.subplot(2,2,4)
plt.plot(x,y4)
plt.show()




지금까지 간단하게 matplotlib를 이용하여 기본 그래프를 그리는 방법에 대해서 알아보았다. 다음글은 바차트,히스토그램등 다양한 그래프 타입에 대해서 알아본다.



노트7의 소셜 반응을 분석해 보았다. 


#2 구현하기


조대협 (http://bcho.tistory.com)

지난번 글 http://bcho.tistory.com/1136에 이어서, 트위터를 통한 소셜 반응을 분석하는 시 스템을 구축하는 방법에 대해서 알아본다. 

시나리오 및 아키텍쳐

스트리밍 처리와 데이타 플로우에 대한 개념 이해가 끝났으면 이제 실제로 실시간 분석 애플리케이션을 만들어보자.

SNS를 이용한 마케팅 분석에서 대표적인 시나리오중 하나는 트위터 피드를 분석하여, 사람들의 반응을 분석하는 시나리오이다. 자주 언급 되는 단어나 형용사를 분석함으로써, 특정 제품이나 서비스에 대한 소셜 네트워크상의 바이럴 반응을 분석할 수 있는데, 여기서  구현하고자 하는 시나리오는 다음과 같다. 트위터 피드에서 특정 키워드로 트윗 문자열들을 수집한 후에, 구글의 자연어 분석 API를 통하여 트윗 문자열에서 명사와 형용사를 추출한다

추출한 명사와 형용사의 발생 횟수를 통계내어서 대쉬보드에 출력하는 시나리오이다.


이를 구현하기 위한 솔루션 아키텍쳐는 다음과 같다.


fluentd를 이용하여 트위터의 특정 키워드를 기반으로 트위터 피드를 수집하고, 수집된 피드들은 구글 클라우드의 큐 서비스인 Pub/Sub으로 전달된다. 전달된 데이타는 데이타 플로우에서 읽어서 필요한 데이타만 필터링한 후, 구글의 자연어 분석 API를 통해서 명사와 형용사를 분리한다.

분리된 명사와 형용사는 데이타플로우에서 30초 주기의 고정윈도우(Fixed Window) 단위로, 명사에서 발생한 단어의 수와, 형용사에서 발생한 단어의 수를 카운트 한 다음에, 빅쿼리에 명사 테이블과 형용사 테이블에 저장한다.

저장된 데이타는 구글의 리포팅 도구인 데이타 스튜디오를 통해서 그래프로 출력한다.


구현

그러면 위에서 설명한 아키텍쳐대로 시스템을 하나씩 구현해보자.

전체 예제 코드와 설정 파일은 https://github.com/bwcho75/googledataflow/tree/master/twitter 에서 받아볼 수 있다.

트위터 피드 수집 서버 설정

먼저 트위터에서 피드를 수집하기 위해서 fluentd 에이전트를 설정한다. 구글 컴퓨트 엔진에서 VM을 생성한 후에, 앞의 빅쿼리 예제에서 한것과 마찬가지로 fluentd 에이전트를 설치한다.

VM을 설치할때, 반드시 Cloud API access scopes를 full API access로 설정해야 하는데, 이 VM에서 fluentd를 통해서 수집한 피드를 Pub/Sub으로 전달할때, Pub/Sub API를 사용하기 때문이다.


Fluentd 가 설치되었으면 Pub/Sub으로 데이타를 전달하기 때문에,Fluentd pub/sub 에이전트를 추가설치 한다.

에이전트명은 “fluent-plugin-gcloud-pubsub”로

% sudo td-agent-gem install fluent-plugin-gcloud-pubsub

명령을 이용해서 설치한다.


에이전트 설치가 끝났으면 fluentd 에이전트 설정을 해야 한다.

다음은 트위터에서 “note7”에 관련된 피드를 읽어서 pub/sub 큐로 피드를 전송하는 fluentd 설정 예제이다.


<source>

 type twitter

 consumer_key        트위터 Consumer Key

 consumer_secret     트위터 Consumer Secrect

 oauth_token         트위터 Access Token

 oauth_token_secret  트위터 Access Token Secrect

 tag                 input.twitter.sampling  # Required

 timeline            sampling                # Required (tracking or sampling or location or userstream)

 keyword             note7

 output_format       nest                    # Optional (nest or flat or simple[default])

</source>

<match input.twitter.sampling>

 type gcloud_pubsub

 project 본인의 프로젝트명

 topic projects/본인의 프로젝트명/topics/twitter

 key 다운로드받은 구글 클라우드 억세스 토큰 JSON 파일

 flush_interval 10

 autocreate_topic false

</match>


Fluentd 설정이 끝났다.

Pub/Sub 큐 설정

다음으로는 fluentd 읽어드린 트위터 피드를 받아드를 Pub/Sub 큐를 생성한다.

큐 생성 방법에 대해서는 앞의 Pub/Sub 챕터를 참고하기 바란다. (http://bcho.tistory.com/1120)

큐 이름은 twitter라고 한다. 전체 큐 이름은 “projects/본인 프로젝트명/twitter” 가 된다.

데이타 플로우 프로젝트 생성

큐까지 데이타를 읽어드렸으면, 이 데이타를 처리할 데이타 플로우 파이프라인을 구현한다.

이클립스에서 데이타 플로우 파이프라인 프로젝트를 생성하자. 프로젝트 생성은 앞장의 “데이타 플로우 개발환경 설정" 부분을 참고하기 바란다. (http://bcho.tistory.com/1128)


프로젝트가 생성되었으면, 이 프로젝트에서 사용할 의존성 라이브러리들을 메이븐 (maven) 빌드 스크립트인 pom.xml에 추가해준다.

추가해야 하는 API는 JSON 파싱을 위한 javax.json-api와, javax.json 그리고 구글의 자연서 분석 API를 호출하기 위한 google-api-client와 google-api-service-language 모듈이다.


다음 코드 블럭을 <dependencies> 엘리먼트 아래 하부 엘리먼트로 추가해준다


   <dependency>

   <groupId>javax.json</groupId>

   <artifactId>javax.json-api</artifactId>

   <scope>provided</scope>

   <version>1.0</version>

</dependency>

<dependency>

   <groupId>org.glassfish</groupId>

   <artifactId>javax.json</artifactId>

   <version>1.0.4</version>

</dependency>

<!-- NL API dependency -->

<dependency>

     <groupId>com.google.apis</groupId>

     <artifactId>google-api-services-language</artifactId>

     <version>v1beta1-rev7-1.22.0</version>

   </dependency>

   <dependency>

     <groupId>com.google.api-client</groupId>

     <artifactId>google-api-client</artifactId>

     <version>1.22.0</version>

   </dependency>


데이타 플로우 코드 작성

전체 파이프라인 흐름

파이프라인 코드 작성에 앞서서 전체 파이프라인 흐름을 살펴보자

전체 흐름은 다음과 같다.


  1. Read From PubSub
    PubSub의 “twitter” 큐에서 JSON 형태의 트위터 메세지를 읽는다.

  2. Parse Twitter
    트위터 JSON 메세지를 파싱한 후, 전체 메세지에서 트윗 메세지를 저장하고 있는 “text” 필드와 언어셋을 정의하고 있는 “lang” 필드만 추출한다.
    자연어 분석 API가 아직 영어, 스페인어, 일본어만 지원하기 때문에, 이 예제에서는 영어로 트윗만 추출하도록 한다.

  3. NL Processing
    앞에서 추출한 트윗 메세지를 구글의 자연어 분석 API에 분석을 요청하여 명사와 형용사만 추출해낸다.

  4. 명사 처리 파이프라인
    다양한 처리 방식을 보여주기 위해서, 이 예제에서는 하나의 데이타 스트림을 분기 처리하여 두개의 데이타 파이프라인에서 처리하는 방식으로 구현하였다. 명사 처리 파이프라인은 다음과 같은 단계를 거친다.

    1. Noun Filter
      명사와 형용사 리스트로 들어온 데이타 중에서 명사만 필터링 한다.

    2. Window 적용
      고정 크기 윈도우 (Fixed Window) 30초를 적용하여, 30초 단위로 데이타를 분석하도록 한다.

    3. Count.PerElement
      명사 단어와, 각 단어별 발생횟 수를 30초 단위로 모아서 카운트 한다.

    4. Noun Formating
      카운트된 결과를 빅쿼리에 쓰도록, [윈도우 시작 시간,명사 단어, 발생횟수] 형태의 빅쿼리 ROW(행) 데이타 타입으로 포매팅 한다.

    5. Write Noun Count to BQ
      포매팅 된 데이타를 빅쿼리에 쓴다.

  5. 형용사 처리 파이프라인
    형용사를 처리하는 파이프라인도 내용은 명사를 처리한 파이프라인과 다르지 않고 동일하게 다음과 같은 순서를 따른다.

    1. Adj Filter

    2. Window 적용

    3. Count.PerElement

    4. Adj Formating

    5. Write Adj Count to BQ

빅쿼리 데이타 구조

빅쿼리에는 두개의 테이블에 데이타를 나눠서 저장하였다.

명사와 형용사 테이블로 각각의 테이블 명과 구조는 다음과 같다.


명사 테이블 : noun

필드명

데이타 타입

date

TIMESTAMP

noun

STRING

count

INTEGER


형용사 테이블 : adj

필드명

데이타 타입

date

TIMESTAMP

adj

STRING

count

INTEGER

자연어 분석 클래스 작성

전체 데이타 흐름과 저장 구조가 이해되었으면, 파이프라인 코드 작성에 앞서서 자연어 처리 API를 호출하는 로직을 만들어보자


우리가 사용할 API는 String으로 문자열을 주면 다음과 같이 NLAnalyzeVO 객체로 분석 결과를 리턴해주는 코드이다.


package com.terry.nl;


import java.util.ArrayList;

import java.util.List;


public class NLAnalyzeVO {

List<String> nouns = new ArrayList<String>();

List<String> adjs = new ArrayList<String>();

List<String> emoticons = new ArrayList<String>();

float sentimental;


public List<String> getNouns() {

return nouns;

}


public List<String> getAdjs() {

return adjs;

}


public List<String> getEmoticons() {

return emoticons;

}


public float getSentimental() {

return sentimental;

}


public void setSentimental(float sentimental) {

this.sentimental = sentimental;

}

public void addNouns(String n){

nouns.add(n);

}

public void addAdj(String a){

adjs.add(a);

}

public void addEmoticons(String e){

emoticons.add(e);

}

}

<NLAnalyzeVO.java>


분석 결과로는 List<String> 타입으로 명사들의 목록을 nouns 로, 형용사들의 목록을 adj로 리턴해준다. float형으로 sentimental 이라는 필드에는 입력된 문장의 감정도를 리턴하도록 되어 있다. 음수값일 때는 부정적, 양수값일 경우에는 긍정을 의미한다.

VO안에는 List<String> emoticons 라는 필드가 있는데, 이는 트위터 메세지 내의 이모티콘을 추출하여 저장하기 위한 필드인데, 이 예제에서는 사용하지 않으니 신경 쓰지 않아도 된다.


package com.terry.nl;


import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;

import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;

import com.google.api.client.http.HttpRequest;

import com.google.api.client.http.HttpRequestInitializer;

import com.google.api.client.json.JsonFactory;

import com.google.api.client.json.jackson2.JacksonFactory;

import com.google.api.services.language.v1beta1.CloudNaturalLanguageAPI;

import com.google.api.services.language.v1beta1.CloudNaturalLanguageAPI.Documents.AnnotateText;

import com.google.api.services.language.v1beta1.CloudNaturalLanguageAPIScopes;

import com.google.api.services.language.v1beta1.model.AnalyzeEntitiesRequest;

import com.google.api.services.language.v1beta1.model.AnalyzeEntitiesResponse;

import com.google.api.services.language.v1beta1.model.AnalyzeSentimentRequest;

import com.google.api.services.language.v1beta1.model.AnalyzeSentimentResponse;

import com.google.api.services.language.v1beta1.model.AnnotateTextRequest;

import com.google.api.services.language.v1beta1.model.AnnotateTextResponse;

import com.google.api.services.language.v1beta1.model.Document;

import com.google.api.services.language.v1beta1.model.Entity;

import com.google.api.services.language.v1beta1.model.Features;

import com.google.api.services.language.v1beta1.model.Sentiment;

import com.google.api.services.language.v1beta1.model.Token;


import java.io.IOException;

import java.io.PrintStream;

import java.security.GeneralSecurityException;

import java.util.List;

import java.util.Map;


/**

*

* Google Cloud NL API wrapper

*/



@SuppressWarnings("serial")

public class NLAnalyze {


public static NLAnalyze getInstance() throws IOException,GeneralSecurityException {


return new NLAnalyze(getLanguageService());

}


public NLAnalyzeVO analyze(String text) throws IOException, GeneralSecurityException{

Sentiment  s = analyzeSentiment(text);

List <Token> tokens = analyzeSyntax(text);

NLAnalyzeVO vo = new NLAnalyzeVO();


for(Token token:tokens){

String tag = token.getPartOfSpeech().getTag();

String word = token.getText().getContent();


if(tag.equals("NOUN")) vo.addNouns(word);

else if(tag.equals("ADJ")) vo.addAdj(word);

}


vo.setSentimental(s.getPolarity());


return vo;

}



/**

* Be sure to specify the name of your application. If the application name is {@code null} or

* blank, the application will log a warning. Suggested format is "MyCompany-ProductName/1.0".

*/

private static final String APPLICATION_NAME = "Google-LanguagAPISample/1.0";


/**

* Connects to the Natural Language API using Application Default Credentials.

*/

public static CloudNaturalLanguageAPI getLanguageService()

throws IOException, GeneralSecurityException {

GoogleCredential credential =

GoogleCredential.getApplicationDefault().createScoped(CloudNaturalLanguageAPIScopes.all());

JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();

return new CloudNaturalLanguageAPI.Builder(

GoogleNetHttpTransport.newTrustedTransport(),

jsonFactory, new HttpRequestInitializer() {

@Override

public void initialize(HttpRequest request) throws IOException {

credential.initialize(request);

}

})

.setApplicationName(APPLICATION_NAME)

.build();

}


private final CloudNaturalLanguageAPI languageApi;


/**

* Constructs a {@link Analyze} which connects to the Cloud Natural Language API.

*/

public NLAnalyze(CloudNaturalLanguageAPI languageApi) {

this.languageApi = languageApi;

}


public List<Token> analyzeSyntax(String text) throws IOException{

AnnotateTextRequest request =

new AnnotateTextRequest()

.setDocument(new Document().setContent(text).setType("PLAIN_TEXT"))

.setFeatures(new Features().setExtractSyntax(true))

.setEncodingType("UTF16");

AnnotateText analyze =

languageApi.documents().annotateText(request);


AnnotateTextResponse response = analyze.execute();


return response.getTokens();


}

/**

* Gets {@link Sentiment} from the string {@code text}.

*/

public Sentiment analyzeSentiment(String text) throws IOException {

AnalyzeSentimentRequest request =

new AnalyzeSentimentRequest()

.setDocument(new Document().setContent(text).setType("PLAIN_TEXT"));

CloudNaturalLanguageAPI.Documents.AnalyzeSentiment analyze =

languageApi.documents().analyzeSentiment(request);


AnalyzeSentimentResponse response = analyze.execute();

return response.getDocumentSentiment();

}


}


<NLAnalyze.java>


코드 상의 주요 부분을 살펴보자

public NLAnalyzeVO analyze(String text)

메서느가 주요 메서드로, 트윗 문자열을 text 인자로 넘겨주면 분석 결과를 NLAnalyzeVO로 리턴한다.

이 메서드 안에서는 두개의 메서드를 호출하는데, analyzeSentiment(text) 와, analyzeSyntax(text)

를 두개 호출한다.

analyzeSentiment(text) 메서드는 text 를 넣으면 float 타입으로 감정도인 Sentinetal 지수를 리턴한다.

analyzeSyntax(text)는 구문을 분석하여, 명사,형용사,접속사,조사 등과 단어간의 의존 관계등을 분석해서 리턴해주는데, Token 이라는 데이타 타입의 리스트 형태로 다음과 같이 리턴한다.

List <Token> tokens = analyzeSyntax(text);


여기서 단어의 형(명사,형용사)는 token에서 tag 라는 필드를 통해서 리턴되는데, 우리가 필요한것은 명사와 형용사만 필요하기 때문에, tag가 NOUN (명사)와 ADJ (형용사)로 된 단어만 추출해서 NLAnalyzeVO 객체에 넣어서 리턴한다. (태그의 종류는 https://cloud.google.com/natural-language/reference/rest/v1beta1/documents/annotateText#Tag ) 를 참고하기 바란다.


중요

이 코드를 이용해서 구글 클라우드의 자연어 분석 API를 호출할때 그러면 API 인증은 어떻게 할까? 보통 구글 클라우드 콘솔에서 다운 받는 서비스 어카운트 키 (Service Account Key) JSON 파일을 사용하는데, 구글 자연어 분석 API를 호출하기 위해서도 서비스 어카운트 키가 필요하다.

이 키를 콘솔에서 다운로드 받은 후에, GOOGLE_APPLICATION_CREDENTIALS 라는 환경 변수에 서비스 어카운트 키의 경로를 지정해주면 된다.


예) export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your-project-credentials.json


자연어 분석 클래스를 다 만들었으면 테스트 코드를 만들어서 테스트를 해보자.

다음은 JUnit 4.X를 이용한 간단한 테스트 코드 이다.


package com.terry.nl.test;


import static org.junit.Assert.*;


import java.io.IOException;

import java.security.GeneralSecurityException;

import java.util.List;


import org.junit.Test;


import com.terry.nl.NLAnalyze;

import com.terry.nl.NLAnalyzeVO;


public class NLAnalyzeTest {


@Test

public void test() {

try {

NLAnalyze instance = NLAnalyze.getInstance();

String text="Larry Page, Google's co-founder, once described the 'perfect search engine' as something that 'understands exactly what you mean and gives you back exactly what you want.'";

NLAnalyzeVO vo = instance.analyze(text);

List<String> nouns = vo.getNouns();

List<String> adjs = vo.getAdjs();

System.out.println("### NOUNS");

for(String noun:nouns){

System.out.println(noun);

}

System.out.println("### ADJS");

for(String adj:adjs){

System.out.println(adj);

}

} catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

fail("API call error");

} catch (GeneralSecurityException e) {

// TODO Auto-generated catch block

e.printStackTrace();

fail("Security exception");

}

}


}


"Larry Page, Google's co-founder, once described the 'perfect search engine' as something that 'understands exactly what you mean and gives you back exactly what you want.'" 문자열을 분석하여,  명사와 형용사를 추출하여 다음과 같이 결과를 출력해준다.

### NOUNS

Larry

Page

Google

co-founder

search

engine

something

### ADJS

perfect

파이프라인 코드 작성

이제 메인 파이프라인 개발을 위한 준비가 다 되었다. 이제 TwitterPipeline 이라는 이름으로 파이프라인을 구현해보자. 전체 코드는 다음과 같다.

package com.terry.dataflow;


import java.io.IOException;

import java.io.StringReader;

import java.security.GeneralSecurityException;

import java.util.ArrayList;

import java.util.List;


import javax.json.Json;

import javax.json.JsonObject;

import javax.json.JsonReader;


import org.joda.time.DateTime;

import org.joda.time.Duration;

import org.joda.time.Instant;

import org.joda.time.format.DateTimeFormat;

import org.joda.time.format.DateTimeFormatter;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;


import com.google.api.services.bigquery.model.TableFieldSchema;

import com.google.api.services.bigquery.model.TableRow;

import com.google.api.services.bigquery.model.TableSchema;

import com.google.cloud.dataflow.sdk.Pipeline;

import com.google.cloud.dataflow.sdk.io.BigQueryIO;

import com.google.cloud.dataflow.sdk.io.PubsubIO;

import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

import com.google.cloud.dataflow.sdk.transforms.Count;

import com.google.cloud.dataflow.sdk.transforms.Create;

import com.google.cloud.dataflow.sdk.transforms.DoFn;

import com.google.cloud.dataflow.sdk.transforms.ParDo;

import com.google.cloud.dataflow.sdk.transforms.ParDo.Bound;

import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;

import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;

import com.google.cloud.dataflow.sdk.transforms.windowing.Window;

import com.google.cloud.dataflow.sdk.values.KV;

import com.terry.nl.NLAnalyze;

import com.terry.nl.NLAnalyzeVO;


import com.google.cloud.dataflow.sdk.values.PCollection;


/**

* A starter example for writing Google Cloud Dataflow programs.

*

* <p>The example takes two strings, converts them to their upper-case

* representation and logs them.

*

* <p>To run this starter example locally using DirectPipelineRunner, just

* execute it without any additional parameters from your favorite development

* environment.

*

* <p>To run this starter example using managed resource in Google Cloud

* Platform, you should specify the following command-line options:

*   --project=<YOUR_PROJECT_ID>

*   --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>

*   --runner=BlockingDataflowPipelineRunner

*/

public class TwitterPipeline {

private static final Logger LOG = LoggerFactory.getLogger(TwitterPipeline.class);

private static final String NOWN_TABLE=

"useful-hour-138023:twitter.noun";

private static final String ADJ_TABLE=

"useful-hour-138023:twitter.adj";


// Read Twitter feed as a JSON format

// extract twitt feed string and pass into next pipeline

static class ParseTwitterFeedDoFn extends DoFn<String,String>{


private static final long serialVersionUID = 3644510088969272245L;


@Override

public void processElement(ProcessContext c){

String text = null;

String lang = null;

try {

JsonReader reader = Json.createReader(new StringReader(c.element()));

JsonObject json = reader.readObject();

text = (String) json.getString("text");

lang = (String) json.getString("lang");


if(lang.equals("en")){

c.output(text.toLowerCase());

}


} catch (Exception e) {

LOG.debug("No text element");

LOG.debug("original message is :" + c.element());

}  

}

}


// Parse Twitter string into

// - list of nouns

// - list of adj

// - list of emoticon


static class NLAnalyticsDoFn extends DoFn<String,KV<String,Iterable<String>>>{ /**

*

*/

private static final long serialVersionUID = 3013780586389810713L;


// return list of NOUN,ADJ,Emoticon

@Override

public void processElement(ProcessContext c) throws IOException, GeneralSecurityException{

String text = (String)c.element();


NLAnalyze nl = NLAnalyze.getInstance();

NLAnalyzeVO vo = nl.analyze(text);


List<String> nouns = vo.getNouns();

List<String> adjs = vo.getAdjs();


KV<String,Iterable<String>> kv_noun=  KV.of("NOUN", (Iterable<String>)nouns);

KV<String,Iterable<String>> kv_adj =  KV.of("ADJ", (Iterable<String>)adjs);


c.output(kv_noun);

c.output(kv_adj);

}


}



static class NounFilter extends DoFn<KV<String,Iterable<String>>,String>{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey();

if(!key.equals("NOUN")) return;

List<String> values = (List<String>) c.element().getValue();

for(String value:values){

// Filtering #

if(value.equals("#")) continue;

else if(value.startsWith("http")) continue;

c.output(value);

}

}

}


static class AddTimeStampNoun extends DoFn<KV<String,Long>,TableRow>

implements com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess

{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey(); // get Word

Long value = c.element().getValue();// get count of the word

IntervalWindow w = (IntervalWindow) c.window();

Instant s = w.start();

DateTime sTime = s.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTimeFormatter dtf = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

String str_stime = sTime.toString(dtf);


TableRow row =  new TableRow()

.set("date", str_stime)

.set("noun", key)

.set("count", value);


c.output(row);

}


}


static class AddTimeStampAdj extends DoFn<KV<String,Long>,TableRow>

implements com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess

{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey(); // get Word

Long value = c.element().getValue();// get count of the word

IntervalWindow w = (IntervalWindow) c.window();

Instant s = w.start();

DateTime sTime = s.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTimeFormatter dtf = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

String str_stime = sTime.toString(dtf);


TableRow row =  new TableRow()

.set("date", str_stime)

.set("adj", key)

.set("count", value);


c.output(row);

}


}

static class AdjFilter extends DoFn<KV<String,Iterable<String>>,String>{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey();

if(!key.equals("ADJ")) return;

List<String> values = (List<String>) c.element().getValue();

for(String value:values){

c.output(value);

}

}

}


static class Echo extends DoFn<KV<String,Iterable<String>>,Void>{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey();

List<String> values = (List<String>) c.element().getValue();

for(String value:values){

}

}


}

public static void main(String[] args) {

Pipeline p = Pipeline.create(

PipelineOptionsFactory.fromArgs(args).withValidation().create());


@SuppressWarnings("unchecked")

PCollection <KV<String,Iterable<String>>> nlprocessed

=  (PCollection<KV<String,Iterable<String>>>) p.apply(PubsubIO.Read.named("ReadFromPubSub").topic("projects/useful-hour-138023/topics/twitter"))

.apply(ParDo.named("Parse Twitter").of(new ParseTwitterFeedDoFn()))

.apply(ParDo.named("NL Processing").of(new NLAnalyticsDoFn()));



// Noun handling sub-pipeline

List<TableFieldSchema> fields = new ArrayList<>();

fields.add(new TableFieldSchema().setName("date").setType("TIMESTAMP"));

fields.add(new TableFieldSchema().setName("noun").setType("STRING"));

fields.add(new TableFieldSchema().setName("count").setType("INTEGER"));

TableSchema schema = new TableSchema().setFields(fields);


nlprocessed.apply(ParDo.named("NounFilter").of(new NounFilter()))

.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30))))

.apply(Count.<String>perElement())

.apply(ParDo.named("Noun Formating").of(new AddTimeStampNoun()) )

.apply(BigQueryIO.Write

.named("Write Noun Count to BQ")

.to( NOWN_TABLE)

.withSchema(schema)

.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)

.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));


// Adj handling sub-pipeline

fields = new ArrayList<>();

fields.add(new TableFieldSchema().setName("date").setType("TIMESTAMP"));

fields.add(new TableFieldSchema().setName("adj").setType("STRING"));

fields.add(new TableFieldSchema().setName("count").setType("INTEGER"));

schema = new TableSchema().setFields(fields);


nlprocessed.apply(ParDo.named("AdjFilter").of(new AdjFilter()))

.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30))))

.apply(Count.<String>perElement())

.apply(ParDo.named("Adj Formating").of(new AddTimeStampAdj()) )

.apply(BigQueryIO.Write

.named("Write Adj Count to BQ")

.to( ADJ_TABLE)

.withSchema(schema)

.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)

.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));





p.run();

}


}

<TwitterPipeline.java>

코드를 하나씩 분석해보자.

먼저 main함수 부분을 보자

PCollection <KV<String,Iterable<String>>> nlprocessed

=  (PCollection<KV<String,Iterable<String>>>) p.apply(PubsubIO.Read.named("ReadFromPubSub").topic("projects/useful-hour-138023/topics/twitter"))

.apply(ParDo.named("Parse Twitter").of(new ParseTwitterFeedDoFn()))

.apply(ParDo.named("NL Processing").of(new NLAnalyticsDoFn()));

<TwitterPipeline.java에서 main() 함수 일부 >


파이프 라인이 시작되면, PubSubIO를 이용하여 “projects/useful-hour-138023/topics/twitter” 이름의 큐에서 데이타를 읽는다. 읽은 데이타는 ParseTwitterFeedDoFn() 라는 함수에서 파싱이 된다.

ParseTwitterFeedDoFn() 은 다음과 같다.


static class ParseTwitterFeedDoFn extends DoFn<String,String>{

private static final long serialVersionUID = 3644510088969272245L;


@Override

public void processElement(ProcessContext c){

String text = null;

String lang = null;

try {

JsonReader reader = Json.createReader(new StringReader(c.element()));

JsonObject json = reader.readObject();

text = (String) json.getString("text");

lang = (String) json.getString("lang");


if(lang.equals("en")){

c.output(text.toLowerCase());

}


} catch (Exception e) {

LOG.debug("No text element");

LOG.debug("original message is :" + c.element());

}  

}

}

<TwitterPipeline.java 에서 ParseTwitterFeedDoFn 클래스 구현부>


PubSub에서 읽어드린 데이타는 문자열로 안에 JSON 데이타를 가지고 있다. 이 JSON 문자열을 파싱해서 “text”와 “lang” 엘리먼트만 추출한 후에, “lang”이 “en”(영어) 인 경우에만 다음 파이프라인으로 “text”에서 추출한 문자열을 보내고, 영어가 아닌 경우에는 데이타를 무시한다.


다음은 NLAnalyticsDoFn에서 트윗 문자열을 받아서 자연어 분석을 한다.


static class NLAnalyticsDoFn extends DoFn<String,KV<String,Iterable<String>>>{

// return list of NOUN,ADJ,Emoticon

@Override

public void processElement(ProcessContext c) throws IOException, GeneralSecurityException{

String text = (String)c.element();


NLAnalyze nl = NLAnalyze.getInstance();

NLAnalyzeVO vo = nl.analyze(text);


List<String> nouns = vo.getNouns();

List<String> adjs = vo.getAdjs();


KV<String,Iterable<String>> kv_noun=  KV.of("NOUN", (Iterable<String>)nouns);

KV<String,Iterable<String>> kv_adj =  KV.of("ADJ", (Iterable<String>)adjs);


c.output(kv_noun);

c.output(kv_adj);

}


}

<TwitterPipeline.java 에서 NLAnalyticsDoFn 클래스 구현부>


앞에서 작성한 자연어 분석 클래스인 NLAnalyze 클래스를 이용하여 text를 넘기고, 리턴 값으로 NLAnalyzeVO를 리턴 값으로 받은 후, 명사는 KV<String,Iterable<String>> 타입으로 다음과 같을 저장해서 c.output을 이용해서 다음 파이프라인으로 넘기고

“NOUN”

명사1,명사2,명사3,...


마찬가지 방법으로 형용사도 같은 데이타 형인 KV<String,Iterable<String>> 타입으로 저장하여 다음 파이프라인으로 넘긴다.


이 데이타를 각각 명사와 형용사 두개의 처리 파이프라인으로 전달하는데, 두개의 파이프라인으로 단일 데이타를 보내는 방법은 다음과 같다.


nlprocessed.apply(ParDo.named("NounFilter").of(new NounFilter()))

: (중략)


nlprocessed.apply(ParDo.named("AdjFilter").of(new AdjFilter()))

: (중략)

nlprocessed는 PCollection 타입으로, NLAnalyticsDoFn에 의해서 처리된 결과이다.

이 결과 값에 두 개의 각각 다른 트랜스폼 (NounFilter와 AdjFilter)를 적용하였다.

이렇게 하나의 PCollection 값에 두 개의 트랜스폼을 각각 적용하면 적용된 각각의 파이프라인은 다른 파이프라인으로 아래 그림 처럼 분기 처리가 된다.


자아 그러면, 명사 처리 파이프라인 흐름을 따라가 보자

nlprocessed.apply(ParDo.named("NounFilter").of(new NounFilter()))

.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30))))

.apply(Count.<String>perElement())

.apply(ParDo.named("Noun Formating").of(new AddTimeStampNoun()) )

.apply(BigQueryIO.Write

.named("Write Noun Count to BQ")

.to( NOWN_TABLE)

.withSchema(schema)

.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

< TwitterPipeline.java의 main() 함수 일부>


첫번째 NounFilter에서는 앞에 파이프라인에서 들어온 명사와 형용사중에서 명사만 필터링 해서 다음 파이프라인으로 전달한다.


static class NounFilter extends DoFn<KV<String,Iterable<String>>,String>{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey();

if(!key.equals("NOUN")) return;

List<String> values = (List<String>) c.element().getValue();

for(String value:values){

// Filtering #

if(value.equals("#")) continue;

else if(value.startsWith("http")) continue;

c.output(value);

}


}

}

<TwitterPipeline.java 에서 NounFilter 클래스 구현부>

명사 인지 형용사 인지는 앞에서 넘어오는 데이타 형이 KV<String, .. > 인데, 키 부분의 값이 “NOUN” 일 경우에 명사이기 때문에, 이 값이 아니면 무시한다. 명사인경우에도 종종 쓰레기 값이 들어오는데, 예를 들어 트위터 특성상 해쉬 태그등을 위해서 “#”이 사용되고, 링크를 위해서 “http…” 링크가 들어가기도 하는데 이는 명사가 아니기 때문에 이 내용은 모두 필터링해서 무시한다.


이렇게 정재된 데이타는 파이프라인의 다음 단계인 .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30)))) 를 통해서 30초 단위의 고정 윈도우가 적용되고, 다음  .apply(Count.<String>perElement()) 을 통해서 단어별로 그룹핑되서 카운트 되고 그 결과는 앞서 적용한 30초 윈도우 시간 단위로 다음 파이프 라인으로 전달된다.  전달되는 데이타의 모양은 대략 다음과 같다.

Key (String)

Value (Long)

airplane

100

boy

29

india

92


이렇게 전달된 데이타는 빅쿼리에 저장하기 위해서 빅쿼리의 ROW 데이타 타입은 TableRow로 변환한다.

.apply(ParDo.named("Noun Formating").of(new AddTimeStampNoun()) )

.apply(BigQueryIO.Write

.named("Write Noun Count to BQ")

.to( NOWN_TABLE)

.withSchema(schema)

.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)

.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));


<TwitterPipeline.java 에서 main() 함수중 >

AddTimeStampNoun()에서 이 작업을 수행하는데, 이 함수는 윈도우의 시간을 추출하여 data라는 필드에 추가해준다.  아래는 AddTimeStampNoun()  함수의 코드이다.


static class AddTimeStampNoun extends DoFn<KV<String,Long>,TableRow>

implements com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess

{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey(); // get Word

Long value = c.element().getValue();// get count of the word

IntervalWindow w = (IntervalWindow) c.window();

Instant s = w.start();

DateTime sTime = s.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTimeFormatter dtf = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

String str_stime = sTime.toString(dtf);


TableRow row =  new TableRow()

.set("date", str_stime)

.set("noun", key)

.set("count", value);


c.output(row);

}

}

<TwitterPipeline.java 에서 AddTimeStampNoun 클래스 구현부>


여기서 주의할점은 윈도우에 대한 데이타를 접근하기 위해서는 com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess 인터페이스를 implementation 해야 한다.  그후에, 현재 윈도우에 대한 정보는 ProcessContext c 변수에서 c.window() 함수를 이용하면 윈도우의 정보를 읽어올 수 있다. 이 코드에서는 윈도우 시작 시간을 IntervalWindow w의 w.start() 를 통해서 읽어왔고, 이를 빅쿼리의 TIMESTAMP  데이타 타입으로 넣기 위해서 “yyyy-MM-dd HH:mm:ss” 형태로 포매팅을 한후 TableRow라는 빅쿼리의 row형 데이타 타입으로 생성한 후, 다음 파이프라인으로 넘겼다.


다음 파이프라인은 BigQueryIO로 Write 명령을 이용해서 NOWN_TABLE 에 (String 값은 “noun”) 데이타를 쓰도록 하였고, 쓰기 모드는 붙여쓰기 WRITE_APPEND로 하고, 테이블은 없으면 생성하도록 CREATE_IF_NEEDED로 지정하였다. 이때 테이블의 스키마를 정의해줘야 하는데, 테이블 스키마는 withSchema(schema) 함수로 지정을 했는데, 스키마를 정의한  schema 변수는 다음과 같이 정의 되어 있다.


List<TableFieldSchema> fields = new ArrayList<>();

fields.add(new TableFieldSchema().setName("date").setType("TIMESTAMP"));

fields.add(new TableFieldSchema().setName("noun").setType("STRING"));

fields.add(new TableFieldSchema().setName("count").setType("INTEGER"));

TableSchema schema = new TableSchema().setFields(fields);

<TwitterPipeline.java 에서 main() 중의 “noun” 테이블 스키마 정의 부분>


같은 방식으로 형용사를 처리하는 파이프라인도 정의를 한다음 정의가 끝났으면

p.run();

을 이용하여 파이프라인이 실행되도록 한다.

실행하기

모든 코드 구현이 끝났다. 이제, 파이프라인을 기동해보자

이클립스에서 파이프라인을 구동하는데, Run Configuration 부분을 아래와 같이 설정한다.


Runner를  DataflowPipelineRunner를 선택한다. BlockingPipeRunner의 경우에는 파이프라인이 기동되는 동안 이클립스에 프로그램이 실행중인것으로 되서, 이클립스에서 파이프라인을 멈춰버리면 전체 파이프라인이 멈추기 때문에 적절하지 않다.

다음 Argument 탭에서 아래와 같이 Program Argument에  --streaming 옵션을 추가한다.


데이타 플로우는 배치 및 스트리밍 모드 두개가 있는데, 이 예제는 스트리밍 예제이기 때문에, --streaming을 명시적으로 지정한다.

구글 자연어 분석 API에 대한 인증을 위해서 서비스 어카운트 키 (JSON 파일의 경로)를 GOOGLE_APPLICATION_CREDENTIALS 환경 변수에 설정해야 하는데, Environment 탭에서 New를 누른 후, GOOGLE_APPLICATION_CREDENTIALS를 Name으로 하고 Value 부분에 서비스 어카운트 키 파일의 경로를 적어준다.



환경 설정이 끝났으면 아래 Run 버튼을 눌러서 파이프라인을 기동시킨다.

파이프라인을 기동 시키면 구글 클라우드로 소스를 배포하고 인스턴스를 구동하는데 까지 수분이 걸리기 때문에 잠시 기다린다.

기다리는 동안 배포 상태를 보기 위해서, 구글 클라우드 콘솔로 들어가면 아래와 같이 Status가 Running으로 바뀔때 까지 기다린다.



Running으로 바뀌고 나서도 1~2분 정도 준비가 필요하기 때문에 기다렸다가 해당 JOB을 확인해보면 다음과 같이 잡이 정상적으로 기동 되고 있음을 확인할 수 있다.



작업이 실행되었으면 이 파이프라인에 데이타를 넣어주기 위한 Fluentd 에이전트를 실행해보자. Fluentd를 설치한 VM에 들어가서,

%sudo /etc/init.d/td-agent restart

명령을 이용해서 fluentd 에이전트를 가동한다.


데이타가 들어오기 시작하면 다시 구글 클라우드 콘솔의 데이타 플로우 화면을 보면 (위의 그림)

상단에 LOGS 라는 버튼을 볼 수 있는데



이 버튼을 누르면 죄측 하단에 다음과 같이 Job Logs라는 윈도우가 나타난다.



여기서 오른쪽의 “WORKER LOGS” 라는 버튼을 누르면 이 파이프라인의 전체 로그를 볼 수 있는데, 에러가 없는지를 잘 확인 한다.



별도의 에러가 없다면 정상적으로 데이타가 수집된다고 할 수 있다.

그러면 데이타가 제대로 수집되는지를 확인해보자

빅쿼리 콘솔로 들어가서 select count(*) from [noun 테이블명] LIMIT 1000

을 수행해서 데이타가 제대로 들어오는지 확인해보자


위의 그림과 같이 f0_가 0 이상이면 데이타가 쌓이고 있다고 생각해도 된다.

데이타 시각화와 분석

데이타 스튜디오(Google datastudio) 를 이용한 데이타 분석

쌓여 있는 데이타를 실제로 분석해보자. 리포트를 이용해서 시각화를 할 예정인데, 여기서 사용한 리포팅 도구는 구글 데이타스튜디오 라는 리포트 도구이다. (http://datastudio.google.com) 으로 9월 현재는 미국 지역만을 대상으로 서비스가 되고 있고, 곧 한국에 서비스가 오픈될 예정이다.


우리가 만들려고 하는 리포트는 다음과 같은 모양을 갖는다

전체 기간동안 가장 많이 발생한 명사 10개와 그 발생 회수를 표로 출력해주고, 우측에는 전체기간이 아닌 일자별로 많이 발생한 명사 10개에 대한 발생 회수 및 그 변화 추이를 출력해준다.

다음 행에는 형용사에 대한 분석 결과를 출력해준다.



새로운 리포트 생성

데이타 스튜디오 메인 화면에 들어오면 작성한 리포트 목록들이 아래와 같이 출력된다.


여기서 + 버튼을 누르면 아래와 같이 새로운 리포트를 생성할 수 있다.


새로운 리포트 화면에 들어오면 우측 하단에 “CREATE NEW DATA SOURCE”라는 버튼이 나타나는데, 이를 통해서 빅쿼리 테이블을 불러올 수 있다. “CREATE NEW DATA SOURCE” 버튼을 눌러보자


데이타 소스 생성해서 빅쿼리 테이블을 불러와야하는데, 데이타 스튜디오는 아래 그림에서와 같이 빅쿼리 뿐만 아니라 구글의 MySQL 서비스인 CloudSQL에서 부터 일반 MySQL 까지 연결이 가능하기 때문에, 빅쿼리 뿐 아니라 일반 데이타 분석에서 분석된 데이타르르 MySQL을 통해서 리포트로 시각화할 수 있고,

Google Sheet에 있는 데이타를 불러와서 같이 표현할 수 있는 기능을 제공하는데, 이는 특히 비지니스나 영업쪽에서 작성한 Sheet의 데이타를 실시간으로 읽어다가 하나의 리포트에 표현할 수 있기 때문에 매우 유용하게 사용할 수 있다.

아울러 YouTube나 Google Analytics 그리고, Adwords 광고 플랫폼등 다양한 구글 플랫폼의 데이타를 읽어서 시각화할 수 있다.


연동 소스 중에서 빅쿼리를 선택한 다음 프로젝트와 데이타셋 그리고 연동하고자 하는 테이블을 선택한다. 여기서는 noun 테이블을 선택하였다. 그러면 다음과 같이 테이블 스키마가 나오고 ADD TO REPORT 버튼이 나온다.


ADD TO REPORT를 눌러서 리포트에 추가하자

다음 리포트 화면에서 다음과 같이 Table 버튼을 눌러서 테이블을 추가하자


테이블을 추가하면 우측에 테이블에 출력하고자 하는 데이타를 선택할 수 있다.


우측에 Data source는 아까 불러들인 “noun”테이블을 선택하고, Dimension은 noun을 선택하고, Metric은 count를 추가하면 명사(noun)별, 발생횟수 (count)를 출력해준다.

위의 표를 보면 note7 등의 단어가 나오는데, 당연히 note7에 대해 검색했기 때문에  note7 단어가 많이 나오겠지만 이는 분석에서 얻고자 하는 데이타가 아니기 때문에 이 note7 등의 불필요한 문자열은 필터링해서 없애버리도록 한다.

필터는 우측 하단에 “Fiter”라는 메뉴에서 추가가 가능한데


메뉴에서 “+Add a filter”를 선택한후


Exclude (제외한다) 라는 버튼을 선택한 후에, Dimension을 noun으로 Match type을 Equal to 로 Expression을 note7 으로 선택하면 noun 필드에서 값이 note7인 내용은 제외 하도록 하는 필터이다. 필터가 적용되면 note7 단어는 필터링되서 출력되지 않는다.


다음으로 꺽은선 그래프를 추가하기 위해서 화면에서 Time series 버튼으로 꺽은선 그래프를 추가한다.



꺽은선 그래프가 추가되면 그래프에 출력될 데이타를 Time series Properties에서 다음과 같이 설정한다.


Data source는 noun 테이블로 하고, Dimention을 Time Dimention은 date로 하고, 여러 필드를 같이 분석하기 위해서 Breakdown Dimension에 noun을 추가하면 하나의 명사가 아니라 주요 명사들을 출력해준다.

그리고 Metric은 count로 선택하면 각 명사별 카운트수를 일자별로 볼 수 있다.


같은 방법으로 형용사에 대한 그래프도 추가한다.


그래프가 완성된 후에, 데이타를 수집 및 분석해보니 꽤나 의미가 있는 분석 결과를 얻을 수 있었다.


다음글에서는 오픈소스 데이타 분석 도구인 제플린을 이용하여 상세 데이타 분석을 하는 방법에 대해서 알아보기로 한다.

이글에서 소개한 데이타 스튜디오는 아직 한국에서는 서비스가 제공되지 않기 때문에, 한국에서 사용하고자 하는 사람들에게는 다음글의 제플린 기반의 데이타 분석이 훨씬 더 유용하리라 생각된다.





파이어베이스 애널러틱스를 이용한 모바일 데이타 분석

#4 주피터 노트북을 이용한 파이어베이스 데이타 분석 및 시각화

조대협 (http://bcho.tistory.com)

노트북의 개념

빅데이타 분석에서 리포팅 도구중 많이 사용되는 제품군 중의 하나가 노트북이라는 제품군이다. 대표적인 제품으로는 오픈소스 제품중 주피터(https://ipython.org/notebook.html) 와 제플린(https://zeppelin.apache.org/) 이 있다.

노트북은 비지니스에 전달하기 위한 멋진 액셀이나 대쉬보드와 같은 리포트 보다는 데이타를 다루는 데이타 과학자와 같은 사람들이 사용하는 분석도구인데, 제품의 이름 처럼 노트북의 개념을 가지고 있다.

예를 들어서 설명해보자 우리가 수학문제를 풀려면 연습장을 펴놓고 공식을 사용해가면서 하나하나 문제를 풀어나간다. 이처럼, 빅데이타 분석을 하려면, 여러데이타를 분석해가면서 그 과정을 노트하고 노트한 결과를 기반으로 다음 단계의 문제를 풀어나가는 것이 통상적인데, 노트북 소프트웨어는 문제 풀이에 있어서 기존의 연습장 노트와 같은 사용자 경험을 제공한다.

이러한 노트북 소프트웨어의 특징은 메모를 위한 글과, 계산을 위한 소스 코드를 한페이지에 같이 적을 수 있고, 이 소스 코드는 노트북 내에서 실행이 가능하고 결과도 같은 페이지에 출력해준다.


다음 화면은 본인이 작성했던 노트북의 일부로 딥러닝 프레임웍인 텐서플로우에 대해서 공부하면서 간단하게 문법과 샘플 코드를 노트북에 정리한 예이다.



데이타랩

구글의 데이타랩(https://cloud.google.com/datalab/) 은 오픈소스 주피터 노트북을 구글 클라우드 플랫폼에 맞게 기능을 추가한 노트북이다. 기본이 되는 주피터 노트북이 오픈소스이기 때문에, 데이타랩 역시 오프소스로 코드가 공개되어 있다.


데이타랩은 기본으로 파이썬 언어를 지원하며, 빅쿼리 연동등을 위해서 SQL과, 자바 스크립트를 지원한다.

또한 머신러닝의 딥러닝 프레임웍인 텐서플로우도 지원하고 있다.

데이타랩에서 연동할 수 있는 데이타는 구글 클라우드상의 VM이나, 빅쿼리, Google Cloud Storage

데이타랩은 오픈소스로 별도의 사용료가 부가되지 않으며, 사용 목적에 따라서 VM에 설치해서 실행할 수 도 있고, 로컬 데스크탑에 설치해서 사용할 수 도 있다. 도커로 패키징이 되어 있기 때문에 도커 환경만 있다면 손쉽게 설치 및 실행이 가능하다.

데이타 랩 설치

이 글에서는 로컬 맥북 환경에 데이타랩을 설치해서 데이타를 분석 해보도록 하자.

데이타 랩은 앞에서 언급한것과 같이 구글 클라우드 플랫폼 상의 VM에 설치할 수 도 있고, 맥,윈도우 기반의 로컬 데스크탑에도 설치할 수 있다. 각 플랫폼별 설치 가이드는  https://cloud.google.com/datalab/docs/quickstarts/quickstart-local 를 참고하기 바란다. 이 문서에서는 맥 OS를 기반으로 설치하는 방법을 설명한다.


데이타 랩은 컨테이너 솔루션인 도커로 패키징이 되어 있다. 그래서 도커 런타임을 설치해야 한다.

https://www.docker.com/products/docker 에서 도커 런타임을 다운 받아서 설치한다.

도커 런타임을 설치하면 애플리케이션 목록에 다음과 같이 고래 모양의 도커 런타임 아이콘이 나오는 것을 확인할 수 있다.



하나 주의할점이라면 맥에서 예전의 도커 런타임은 오라클의 버추얼 박스를 이용했었으나, 제반 설정등이 복잡하기 때문에, 이미 오라클 버추얼 박스 기반의 도커 런타임을 설치했다면 이 기회에, 도커 런타임을 새로 설치하기를 권장한다.

다음으로 도커 사용을 도와주는 툴로 Kitematic 이라는 툴을 설치한다. (https://kitematic.com/) 이 툴은 도커 컨테이너에 관련한 명령을 내리거나 이미지를 손쉽게 관리할 수 있는 GUI 환경을 제공한다.


Kitematic의 설치가 끝났으면 데이타랩 컨테이너 이미지를 받아서 실행해보자, Kitematic 좌측 하단의 “Dokcer CLI” 버튼을 누르면, 도커 호스트 VM의 쉘 스크립트를 수행할 수 있는 터미널이 구동된다.


터미널에서 다음 명령어를 실행하자


docker run -it -p 8081:8080 -v "${HOME}:/content" \

  -e "PROJECT_ID=terrycho-firebase" \

  gcr.io/cloud-datalab/datalab:local


데이타랩은 8080 포트로 실행이 되고 있는데, 위에서 8081:8080은  도커 컨테이너안에서 8080으로 실행되고 있는 데이타 랩을 외부에서 8081로 접속을 하겠다고 정의하였고, PROJECT_ID는 데이타랩이 접속할 구글 클라우드 프로젝트의 ID를 적어주면 된다.

명령을 실행하면, 데이타랩 이미지가 다운로드 되고 실행이 될것이다.

실행이 된 다음에는 브라우져에서 http://localhost:8081로 접속하면 다음과 같이 데이타랩이 수행된 것을 볼 수 있다.


데이타랩을 이용한 파이어베이스 애널러틱스 데이타 분석 (책에서는 위치 이동 할것 파이어 베이스로)

데이타랩이 설치되었으면, 파이어베이스 애널러틱스를 이용하여 빅쿼리에 수집한 로그를 분석해보자

데이타 랩에서 “+Notebook” 버튼을 눌러서 새로운 노트북을 생성하자

생성된 노트북으로 들어가서 “Add Code” 버튼을 누르고, 생성된 코드 블록 박스에 아래와 같은 SQL을 추가하자


%%sql

SELECT user_dim.app_info.app_instance_id, user_dim.device_info.device_category, user_dim.device_info.user_default_language, user_dim.device_info.platform_version, user_dim.device_info.device_model, user_dim.geo_info.country, user_dim.geo_info.city, user_dim.app_info.app_version, user_dim.app_info.app_store, user_dim.app_info.app_platform

FROM [terrycho-firebase:my_ios.app_events_20160830]


%%sql은 빅쿼리 SQL을 수행하겠다는 선언이다.

다음에 SQL 문장을 기술했는데, 테이블은 terrycho-firebase 프로젝트의 my_ios 데이타셋의 app_events_20160830 테이블에서 쿼리를 하였다.

2016년 8월 30일의 iOS 앱에서 올라온 사용자 관련 정보를 쿼리하는 내용이다. (디바이스 정보, 국가등)

다음은 쿼리 결과 이다.



다음 쿼리는 2016년 6월 1일의 안드로이드와 iOS 접속자에 대해서 국가별 사용자 수 통계를 내는 쿼리이다.


%%sql

SELECT

 user_dim.geo_info.country as country,

 EXACT_COUNT_DISTINCT( user_dim.app_info.app_instance_id ) as users

FROM

[firebase-analytics-sample-data:android_dataset.app_events_20160601],

 [firebase-analytics-sample-data:ios_dataset.app_events_20160601]

GROUP BY

 country

ORDER BY

 users DESC




다음은 2016년 6월 1일 사용자중, 안드로이드와 iOS 모두에서 사용자가 사용하는 언어별로 쿼리를 하는 내용이다.


%%sql

SELECT

 user_dim.user_properties.value.value.string_value as language_code,

 EXACT_COUNT_DISTINCT(user_dim.app_info.app_instance_id) as users,

FROM [firebase-analytics-sample-data:android_dataset.app_events_20160601],

 [firebase-analytics-sample-data:ios_dataset.app_events_20160601]

WHERE

user_dim.user_properties.key = "language"

GROUP BY

language_code

ORDER BY

users DESC


쿼리 결과



이번에는 차트를 사용하는 방법을 알아보자, 안드로이드 로그에서 이벤트 로그중에, 많이 나오는 로그 20개에 대한 분포도를 파이 차트로 그려내는 예제이다.

%%sql --module events

SELECT event_dim.name as event_name, COUNT(event_dim.name) as event_count  

FROM [firebase-analytics-sample-data:android_dataset.app_events_20160601]

GROUP BY event_name

ORDER BY event_count DESC

LIMIT 20


쿼리 결과를 --module 명령을 이용하여 events라는 모듈에 저장한후


%%chart pie --fields event_name,event_count --data events

title: Event count

height: 400

width: 800

pieStartAngle: 20

slices:

 0:

   offset: .2


구글 차트 명령을 이용하여 pie 차트를 그린다. 필드는 앞의 모듈에서 쿼리한 event_name과 event_count 필드를 이용하고, 데이타는 앞에서 정의한 “events” 모듈에서 읽어온다.

차트 실행 결과는 다음과 같다.



이외에도 Tensorflow 연동이나 GCS를 연동하는 방법, 그리고 구글 차트 이외에 일반 plot 함수를 이용하여 그래프를 그리는 등 다양한 기능을 제공하는데, 이에 대한 자세한 설명은 데이타랩을 설치하면 /docs/README.md 파일을 참조하면 다양한 가이드를 찾을 수 있다.