블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 

'Big Data'에 해당되는 글 20

  1. 2017.11.15 t-SNE를 이용한 차원 감소 (Dimension reduction)
  2. 2017.09.23 파이썬을 이용한 데이타 시각화 #1 - Matplotlib 기본 그래프 그리기 (1)
  3. 2017.01.24 한시간에 만드는 대용량 로그수집 분석 시스템 (1)
  4. 2016.10.04 수학포기자를 위한 딥러닝-#1 머신러닝과 딥러닝 개요 (5)
  5. 2016.09.20 노트7의 소셜 반응을 분석해 보았다. (3)
  6. 2016.08.25 실시간 데이타 분석 플랫폼 Dataflow - #5 데이타 플로우 프로그래밍 모델 (1)
  7. 2016.08.01 빅쿼리를 이용하여 두시간만에 트위터 실시간 데이타를 분석하는 대쉬보드 만들기 (3)
  8. 2016.07.31 빅데이타 수집을 위한 데이타 수집 솔루션 Embulk 소개
  9. 2016.07.13 구글 클라우드의 대용량 분산 큐 서비스인 Pub/Sub 소개 #1
  10. 2016.07.04 실시간 빅데이타 처리를 위한 스트리밍 처리의 개념 (1)
  11. 2016.06.16 구글 빅데이타 플랫폼 빅쿼리 아키텍쳐 소개
  12. 2016.06.15 구글 빅데이타 플랫폼 빅쿼리(BIGQUERY)에 소개
  13. 2015.05.13 Zepplin (제플린) 설치하기
  14. 2015.01.25 Apache Storm을 이용한 실시간 데이타 처리 #4 –소개와 기본 개념 (2)
  15. 2015.01.12 Apache Storm을 이용한 실시간 데이타 처리 #2-Storm 설치와 HelloStorm 작성하기
  16. 2015.01.12 Apache Storm을 이용한 실시간 데이타 처리#1-데이타 스트림 개념 이해하기 (2)
  17. 2014.11.11 R 소개 #1 - R의 기본 데이타 형
  18. 2014.01.24 In memory dictionary Redis 소개 (18)
  19. 2012.12.01 빅데이타 분석을 위한 Amazon의 새 서비스 - redshift와 data pipe line
  20. 2012.02.21 NoSQL Riak Overview #1/2
 

t-SNE를 이용한 차원 감소


조대협 (http://bcho.tistory.com)


PCA 기반 차원 감소의 문제점

앞의 글에서 차원 감소에 대한 개념과, 차원 감소 알고리즘의 하나인 PCA 알고리즘에 대해서 살펴보았다.

PCA의 경우 선형 분석 방식으로 값을 사상하기 때문에 차원이 감소되면서 군집화 되어 있는 데이타들이 뭉게져서 제대로 구별할 수 없는 문제를 가지고 있다. 아래 그림을 보자


출처 https://www.youtube.com/watch?v=NEaUSP4YerM


이 그림은 2차원에서 1차원으로 PCA 분석을 이용하여 차원을 줄인 예인데, 2차원에서는 파란색과 붉은색이 구별이 되는데, 1차원으로 줄면서 1차원상의 위치가 유사한 바람에, 두 군집의 변별력이 없어져 버렸다.

t-SNE

이런 문제를 해결하기 위한 차원 감소 방법으로는 t-SNE (티스니라고 읽음) 방식이 있는데, 대략적인 원리는 다음과 같다.


먼저 점을 하나 선택한다. 아래는 검정색점을 선택했는데, 이 점에서 부터 다른점까지의 거리를 측정한다.



다음 T 분포 그래프를 이용하여, 검정 점(기준점) 을 T 분포 상의 가운데 위치한다면, 기준점으로부터 상대점 까지 거리에 있는 T 분포의 값을 선택(위의 T 분포 그래프에서 파란점에서 위로 점섬이 올라가서 T분포 그래프상에 붉은 색으로 X 표가 되어 있는 값)하여, 이 값을 친밀도 (Similarity)로 하고, 이 친밀도가 가까운 값끼리 묶는다.


이 경우 PCA 처럼 군집이 중복되지 않는 장점은 있지만, 매번 계산할때 마다 축의 위치가 바뀌어서, 다른 모양으로 나타난다. 단 데이타의 군집성과 같은 특성들은 유지 되기 때문에 시각화를 통한 데이타 분석에는 유용하지만, 매번 값이 바뀌는 특성으로 인하여, 머신러닝 모델의 학습 피쳐로 사용하기는 다소 어려운점이 있다.


아래 그림은 같은 데이타로 t-SNE 분석을 각각 한번씩한 결과를 시각화 해서 표현한 결과 인데, 보는 것과 같이 군집에 대한 특성은 그대로 유지 되지만 값 자체는 변화가 된것을 확인할 수 있다.




sklearn 을 이용한 t-SNE 구현

전체 코드는 https://github.com/bwcho75/dataanalyticsandML/blob/master/dimension%20reduction/2.%20t-SNE%20visualization.ipynb 에 공개되어 있으니 참고하기 바란다.


# Perform the necessary imports
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE

model = TSNE(learning_rate=100)
transformed = model.fit_transform(feature)

xs = transformed[:,0]
ys = transformed[:,1]
plt.scatter(xs,ys,c=labels)

plt.show()


사실 코드가 너무 간단해서 설명할것이 없다. TSNE 객체를 선언하고 학습속도 (learning_rate)를 지정한다음 fit_transform 하면 끝이다. (싸이킷런 만세…)


다음글에서는 차원 감소 방법중에 마지막을 Matrix Factorization (행렬 인수 분해) 방법에 대해서 알아보도록 하겠다.






저작자 표시 비영리
신고

파이썬을 이용한 데이타 시각화 #1 - Matplotlib 기본 그래프 그리기


조대협 (http://bcho.tistory.com)


백앤드 엔지니어와 백그라운드를 가진 경험상, 머신러닝을 공부하면서 헷갈렸던 부분중 하나가, 데이타에 대한 시각화이다. 머신러닝은 모델을 구현하는 것도 중요하지만, 학습할 데이타를 선별하고, 만들어진 모델을 검증하기 위해서는 데이타를 이해하는 것이 필수적이고 이를 위해서는 데이타를 시각화 해서 보는 것이 매우 중요하다.


그동안 그래프를 그리는 것을 스택오버플로우등에서 찾아서 복붙을 해서 사용하다가 matplotlib를 정리해야겠다고 해서 메뉴얼을 봤지만 도무지 이해가 되지 않아서, 결국 온라인 강좌를 들어서 정리해봤는데, 역시 강좌를 들으니까는 훨씬 빠르게 이해가 된다.

참고한 코스는 datacamp에 있는 “Introduction to Data Visualization with Python” 코스이다.


오늘은 matplotlib를 이용하여 기본적인 그래프를 그리는 방법에 대해서 정리하도록 한다.

기본 그래프 그리기

기본적인 그래프를 그리기 위해서는 matplotlib.pyplot에서  plot(x,y)를 사용하면 된다. x,y는 각각 X축과 Y축의 값이 된다.


from matplotlib import pyplot as plt
import numpy as np

x = np.arange(1,10)
y = x*5

plt.plot(x,y)
plt.show()


색깔 바꾸기

그래프를 그릴때 선의 색을 지정하기 위해서는 plot에서 인자로 컬러를 주면된다. 컬러표는 아래를 참고하면 되고 붉은색은 r, 파란색은 b으로 정의한다.

from matplotlib import pyplot as plt
import numpy as np

x = np.arange(1,10)
y = x*5

plt.plot(x,y,'r')
plt.show()





선 종류 변경하기

선을 그릴때, 다양한 선의 종류를 선택할 수 있다. 디폴트가 직선이고, 점으로 표현하는 마커나 점선등을 선택할 수 있다.

선의 선택은 plot에서 세번째 인자에 선의 종류를 지정하면 되고, 색을 같이 지정하려면 다음문자에 색을 지정하면 된다 다음은 동그란 마커 ‘o’를 붉은색 ‘r’로 표현하기 때문에, 세번째 인자를 ‘or’로 전달하였다.


from matplotlib import pyplot as plt
import numpy as np

x = np.arange(1,10)
y = x*5

plt.plot(x,y,'or')
plt.show()




다음은 선에 대한 종류표이다.



라벨과 타이틀

그래프를 그릴때 그래프의 타이틀과 X,Y축의 라벨을 표현하기 위해서는 타이틀은 plt.title(“타이틀명"),  X,Y축에 대한 라벨은 plt.xlabel(‘X축 라벨명'), plt.ylabel(‘Y축 라벨명') 을 사용한다.


from matplotlib import pyplot as plt
import numpy as np

x = np.arange(1,10)
y = x*5

plt.plot(x,y)
plt.xlabel('x axis')
plt.ylabel('y axis')
plt.title('matplotlib sample')
plt.show()



구간 확대/축소

그래프는 입력되는 x,y의 최소,최대 구간으로 자동으로 그려지는데, 이 구간을 키우거나 줄이기 위해서 x,y의 구간을 정의할 수 있다. x축은 plt.xlim(최소,최대),  y축은 plt.ylim(최소,최대)로 정의하면 된다.

아래는 x축을 2~3, y축을 5~20으로 확대해서 그래프를 그리는 예제이다.


from matplotlib import pyplot as plt
import numpy as np

x = np.arange(1,10)
y = x*5

plt.xlim(2,3)
plt.ylim(5,20)
plt.plot(x,y)
plt.xlabel('x axis')
plt.ylabel('y axis')
plt.title('matplotlib sample')
plt.show()



레전드

그래프를 그릴때 여러개의 그래프를 같이 그릴 수 있는데, 이경우 각 그래프가 구분이 안되기 때문에, 그래프마다 라벨을 달고 이 라벨명을 출력할 수 있는데, 이를 legend라고 한다.

아래는 first와 second 라는 두개의 그래프를 그리고, 우측 상단에 legend를 표현한 예이다.

legend를 사용하기 위해서는 plt.plot에서 label 변수에 그래프의 이름을 정의하고, plt.legend(‘위치')를 정해주면  legend를 그래프상에 표현해주는데, legend의 위치는 아래 표를 참고하면 된다.


from matplotlib import pyplot as plt
import numpy as np

x = np.arange(1,10,0.1)
y = x*0.2
y2 = np.sin(x)

plt.plot(x,y,'b',label='first')
plt.plot(x,y2,'r',label='second')
plt.xlabel('x axis')
plt.ylabel('y axis')
plt.title('matplotlib sample')
plt.legend(loc='upper right')
plt.show()



어노테이션

다음은 어노테이션이라는 기능으로, 그래프에 화살표를 그린후, 그 화살표에 문자열을 출력하는 기능이다. 예를들어 “이값이 최소값" 이런식으로 화살표를 그려서 표현할때 사용하는데 plt.annotate 함수를 사용하면 된다.

plt.annotate(‘문자열',xy,xytext,arrowprops) 식으로 사용한다.

문자열은 어노테이션에서 나타낼 문자열이고, xy는 화살표가 가르키는 점의 위치, xytext는 문자열이 출력될 위치, arrowprops는 화살표의 속성으로 칼라등을 정의한다.


from matplotlib import pyplot as plt
import numpy as np

x = np.arange(1,10)
y = x*5

plt.plot(x,y)
plt.annotate('annotate',xy=(2,10),xytext=(5,20),arrowprops={'color':'green'})
plt.show()



서브플롯

여러개의 그래프를 그리고 싶을때가 있는데, 이 경우 서브플롯이라는 것을 사용한다. 서브플롯은 그래프가 그려질 위치를 격자형으로 지정하는데, plt.subplot(nrow,ncol,pos) 식으로 사용한다.

nrow,ncol은 그래프를 그린 plain의 크기를 지정하는데, 3,2면 3줄로, 가로는 2칸으로 된 그래프 plain 설정한다. 그리고 마자막 pos는 몇번째 plain에 그래프를 그릴지 지정하는데, 아래와 같이 상단에서 부터 우측,아래 방향으로 1,2,3,4,5,6 순서가 된다.


1

2

3

4

5

6



아래 그림은 2,1 크기의 plain 을 만들어놓고 그래프를 위,아래로 두개를 그리는 예제이다.


from matplotlib import pyplot as plt
import numpy as np

x = np.arange(1,10)
y1 = x*5
y2 = x*1
y3 = x*0.3
y4 = x*0.2

plt.subplot(2,1,1)
plt.plot(x,y1)
plt.subplot(2,1,2)
plt.plot(x,y2)
plt.show()



아래 그림은 한줄의 두칸 plain을 만들어놓고, 좌우에 두개의 그래프를 그린 예제이다.


from matplotlib import pyplot as plt
import numpy as np

x = np.arange(1,10)
y1 = x*5
y2 = x*1
y3 = x*0.3
y4 = x*0.2

plt.subplot(1,2,1)
plt.plot(x,y1)
plt.subplot(1,2,2)
plt.plot(x,y2)
plt.show()




다음은 2x2 plain으로 4개의 그래프를 그린 예제이다.


from matplotlib import pyplot as plt
import numpy as np

x = np.arange(1,10)
y1 = x*5
y2 = x*1
y3 = x*0.3
y4 = x*0.2

plt.subplot(2,2,1)
plt.plot(x,y1)
plt.subplot(2,2,2)
plt.plot(x,y2)
plt.subplot(2,2,3)
plt.plot(x,y3)
plt.subplot(2,2,4)
plt.plot(x,y4)
plt.show()


그래프 사이즈

그래프를 크게 그리고 싶을때 그래프 자체의 크기를 변경할 수 있는데, plt.figure를 이용하여 figsize=(가로,세로)를 인자로 주면 그래프가 그려질 전체 그림의 크기를 조절할 수 있다. 아래는 20x5 크기로 그래프를 그릴 크기를 지정하는 예제이다.


import numpy as np

x = np.arange(1,10)
y1 = x*5
y2 = x*1
y3 = x*0.3
y4 = x*0.2

plt.figure(figsize=(20,5))
plt.subplot(2,2,1)
plt.plot(x,y1)
plt.subplot(2,2,2)
plt.plot(x,y2)
plt.subplot(2,2,3)
plt.plot(x,y3)
plt.subplot(2,2,4)
plt.plot(x,y4)
plt.show()




지금까지 간단하게 matplotlib를 이용하여 기본 그래프를 그리는 방법에 대해서 알아보았다. 다음글은 바차트,히스토그램등 다양한 그래프 타입에 대해서 알아본다.


저작자 표시 비영리
신고


한시간에 만드는 대용량 로그 수집 시스템

조대협 (http://bcho.tistory.com)


정정 및 참고 내용

2017.1.24 몇가지 내용을 정정합니다.

https://cloud.google.com/logging/quota-policy 를 보면 스택드라이버 로깅에 쿼타 제한이 초당 500건/계정으로 잡혀있어서. 일반적인 경우는 최대 500 TPS의 성능을 낼 수 있습니다. 그 이상의 성능이 필요하면, 여러 계정을 사용해야 합니다 또는 구글에 별도의 쿼타 증설 요청을 해야 합니다.

하루에, 최대 2천5백만건의 로그를 하나의 프로젝트를 통해서 수집이 가능합니다.


또한 프리티어의 경우에는 한달에 로그를 5GB  까지 수집이 가능한데, 이게 넘으면 로그가 더이상 수집되지 않습니다. 그래서 아래 내용 처럼 빅쿼리로 Export를 해서 로그가 5GB 이상 스택드라이버에 저장되지 않도록 해야 합니다. (차기전에 데이타를 퍼나르는)

애플리케이션 로그 이외에도, VM 로그등도 이 5GB의 용량을 공유하기 때문에, VM 로그등도 차기전에 GCS로 퍼 나르거나 또는 구글 Support 티켓을 통하여 애플리케이션 로그 이외의 로그를 수집하지 않도록 별도 요청해야 합니다. (로그 저장 용량에 대해서 비용을 지불하면, 이런 제약은 없음)


백앤드 시스템에서 중요한 컴포넌트중의 하나가, 클라이언트로 부터 로그를 수집 및 분석하는 시스템이다.

오늘 설명할 내용은 500 TPS (Transaction Per Sec)가 넘는 대용량 로그 수집 및 분석 시스템을  managed 서비스를 이용하여, 쉽고 빠르게 구축할 수 있는 방법에 대해서 소개하고자한다.


일반적인 로그 수집 및 분석 시스템 아키텍쳐

일반적으로 클라이언트에서 로그를 수집하여 분석 및 리포팅 하는 시스템의 구조는 다음과 같다.


  • 앞단의  API 서버가 로그를 클라이언트로 부터 수집하고 데이타를 정재한다.

  • 로그 저장소가 순간적으로 많은 트래픽을 감당할 수 없는 경우가 많기 때문에, 중간에 Message Q를 넣어서, 들어오는 로그를 Message Q에 저장하여 완충을 한다.

  • 이 Message Q로 부터 로그를 Message Consumer가 순차적으로 읽어서 Log Storage에 저장한다.

  • 저장된 로그는 Reporting 툴을 이용하여 시각화 한다.


이런 구조 이외에도 API 서버에서 파일로 로그를 저장한 후,  Fluentd나, LogStash 등의 로그 수집기를 이용하는 방법등 다양한 아키텍쳐가 존재한다.


이런 시스템을 구축하기 위한 일반적인 솔루션들은 다음과 같다.


컴포넌트

솔루션


API 서버

node.js, ruby, php 등 일반적인 웹서버


Message Q

Rabbit MQ와 같은 일반적인 큐
Kafaka 와 같은 대량 큐

AWS SQS나 구글 Pub/Sub 같은 클라우드 큐


Message Consumer

Multi Thread(or Process) + Timer를 조합하여 메세지를 폴링 방식으로 읽어오는 애플리케이션 개발


Log Storage

Hadoop, HBase 와 같은 하둡 제품

Drill,Druid와 같은 SQL 기반 빅데이타 플랫폼

Elastic Search


Reporting

Zeppeline, Jupyter 와 같은 노트북류

Kibana



구조나 개념상으로는 그리 복잡한 시스템은 아니지만, 저러한 솔루션을 모두 배우고, 설치하고 운영하는데 시간이 들고, 각각의 컴포넌트를 구현해야하기 때문에 꽤나 시간이 걸리는 작업이다.


그러면 이러한 로그 수집 및 분석 작업을 클라우드 서비스를 이용하여 단순화 할 수 없을까?

스택 드라이버

스택 드라이버는 구글 클라우드의 모니터링, 로깅 및 애플리케이션 성능 분석등 모니터링 분야에서 다양한 기능을 제공하는 서비스 이다.

그중에서 스택드라이버 로깅은 구글 클라우드나 아마존 또는 기타 인프라에 대한 모니터링과, Apache, MySQL과 같은 써드 파티 미들웨어에 대한 로그 수집 및 모니터링을 지원하는데, 이 외에도, 사용자가 애플리케이션에서 로깅한 데이타를 수집하여 모니터링할 수 있다.



스택 드라이버와 빅쿼리를 이용한 로그 수집 분석 시스템 구현

스택 드라이버 로깅의 재미있는 기능중 하나는 로그 EXPORT 기능인데, 로그 데이타를 구글 클라우드 내의 다른 서비스로 로그 데이타를 내보낼 수 있다.


  • GCS (Google Cloud Storage)로 주기적으로 파일로 로그 데이타를 내보내거나

  • Pub/Sub이나 Big Query로 실시간으로 데이타를 내보낼 수 있다.


그렇다면 스택 드라이버를 통해서 빅쿼리에 로그 데이타를 직접 저장한다면 복잡한 Message Q나, Message Consumer 등의 구현도 불필요하고, 로그 저장도 복잡한 오픈 소스를 이용한 개발이나 운영도 필요 없이, 매니지드 서비스인 빅쿼리를 이용하여 간략하게 구현할 수 있다.

스택 드라이버 로깅을 이용한 로그 수집 시스템 구현


스택 드라이버 애플리케이셔 로깅 기능을 이용하여 클라이언트로 부터 로그를 수집하여 분석하는 시스템의 아키텍쳐를 그려 보면 다음과 같다.




API 서버를 이용하여 클라이언트로 부터 로그를 수집하고, API 서버는 스택 드라이버 로깅 서비스로 로그를 보낸다. 스택 드라이버 로깅은 Export 기능을 이용하여, 수집된 로그를 실시간으로 빅쿼리로 전송한다. 빅쿼리에 저장된 로그는 구글 데이타 스튜디오 (http://datastudio.google.com)이나 제플린, 파이썬 주피터 노트북과 같은 리포팅 도구에 의해서 시각화 리포팅이 된다.

API 서버쪽에서 스택 드라이버 로깅으로 로그를 보내는 부분을 살펴보자

아래는 파이썬 Flask 를 이용하여 로그를 스택 드라이버로 보내는 코드이다.


import uuid

from flask import Flask

from google.cloud import logging


app = Flask(__name__)

logging_client = logging.Client()

tlogger = logging_client.logger(‘my-flask-log’)

slogger = logging_client.logger('struct_log')

@app.route('/')

def text_log():

   logstring = "This is random log "+ str(uuid.uuid4())

   tlogger.log_text(logstring)

   return logstring


@app.route('/slog')

def struct_log():

   struct  = "This is struct log "+ str(uuid.uuid4())

   slogger.log_struct({

               'name':'myterry',

               'text':struct,

               'key' : 'mykey'})      

   return struct


if __name__ == '__main__':

   app.run('0.0.0.0',7001)

   

google.cloud 패키지에서 logging 모듈을 임포트한 다음에, 로깅 클라이언트로 부터

tlogger = logging_client.logger(‘my-flask-log’)

slogger = logging_client.logger('struct_log')

로 각각 “my-flask-log”와 “struct_log”라는 이름을 가지는 logger 둘을 생성한다.

(뒤에서 언급하겠지만, 이 로거 단위로, 로그를 필터링 하거나, 또는 이 로거 단위로 로그 메세지를 다른 시스템으로 export 할 수 있다.)


다음, 로그를 쓸 때는 이 logger를 이용하여 로그를 써주기만 하면 된다.

   tlogger.log_text(logstring)

는 텍스트로 된 한줄 로그를 쓰는 명령이고,

   slogger.log_struct({

               'name':'myterry',

               'text':struct,

               'key' : 'mykey'})  

는 JSON과 같이 구조화된 계층 구조를 로그로 쓰는 방식이다.

이렇게 개발된 로그 수집용 API 서버의 코드는 직접 VM을 만들어서 Flask 서버를 깔고 인스톨 해도 되지만  앱앤진을 사용하면 코드만 배포하면, Flask 서버의 관리, 배포 및 롤백, 그리고 오토 스케일링등 모든 관리를 자동으로 해준다. 앱앤진을 이용한 코드 배포 및 관리에 대한 부분은 다음 문서 http://bcho.tistory.com/1125 를 참고 하기 바란다.

스택 드라이버에서 로그 확인

코드가 배포되고, 실제로 로그를 기록하기 시작했다면 스택 드라이버에 로그가 제대로 전달 및 저장되었는지 확인해보자. 구글 클라우드 콘솔에서 스택 드라이버 로깅으로 이동한 다음 아래 그림과 같이 리소스를 “Global” 을 선택한 후, 앞에 애플리케이션에서 남긴 “my-flask-log”와 “struct-log” 만을 선택해서 살펴보자





다음과 같이 로그가 출력되는 것을 확인할 수 있으며, struct_log의 예를 보면 로그의 내용은 time_stamp  와 프로젝트 정보와 같은 부가 정보와 함께, 애플리케이션에서 남긴 로그는 “jsonPayload” 앨리먼트 아래에 저장된것을 확인할 수 있다.



빅쿼리로 Export 하기

스택 드라이버로 로그가 전달되는 것을 확인했으면, 이 로그를 빅쿼리에 저장해보자. Export 기능을 이용해서 가능한다. 아래와 같이 스택 드라이버 로깅 화면에서 상단의 “CREATE EXPORT”  버튼을 누른다.

다음 리소스 (Global)과 로그 (struct_log)를 선택한 다음에,



Sink Name에 Export 이름을 적고 Sink Service는 BigQuery를 선택한다. 다음으로 Sink Destination에는 이 로그를 저장할 Big Query의 DataSet 이름을 넣는다.

마지막으로 Create Sink를 누르면, 이 로그는 이제부터 실시간으로 BigQuery의 structlog라는 데이타셋에 저장이 되면 테이블명은 아래 그림과 같이 strcut_log_YYYYMMDD와 같은 형태의 테이블로 생성이 된다.




테이블 프리뷰 기능을 이용하여 데이타가 제대로 들어갔는지 확인해보자. 아래와 같이 위의 코드에서 저장한 name,key,text는 테이블에서 jsonPayload.name, jsonPayload.key, jsonPayload.text 라는 필드로 각각 저장이 되게 된다.



빅쿼리는 실시간으로 데이타를 저장할때는 초당 100,000건까지 지원이 가능하기 때문에 이 시스템은 100,000TPS 까지 지원이 가능하고, 만약에 그 이상의 성능이 필요할때는 로그 테이블을 나누면(Sharding) 그 테이블 수 * 100,000 TPS까지 성능을 올릴 수 있다. 즉, 일별 테이블을 10개로 Sharding 하면, 초당 최대 1,000,000 TPS를 받는 로그 서비스를 만들 수 있으며, 이 테이블 Sharding은 빅쿼리 테이블 템플릿을 사용하면 쉽게 설정이 가능하다. (정정 빅쿼리는 100K TPS를 지원하나, 스택 드라이버가 500 TPS로 성능을 제한하고 있음)


이렇게 저장된 로그는 빅쿼리를 지원하는 각종 리포팅 툴을 이용하여 시각화가 가능하다.

시각화 도구는

을 참고하기 바란다.


이렇게 간단하게, 코드 몇줄과 설정 몇 가지로 100,000 500 TPS 를 지원하는 로그 서버를 만들어 보았다.

스택 드라이버를 이용한 로그 분석 수집 시스템의 확장

이 외에도 스택 드라이버는 빅쿼리뿐 아니라 다른 시스템으로의 연동과 매트릭에 대한 모니터링 기능을 가지고 있어서 다양한 확장이 가능한데, 몇가지 흥미로운 기능에 대해서 살펴보도록 하자.


실시간 스트리밍 분석 및 이벤트 핸들링

스택 드라이버 로깅의 Export 기능은, 하나의 로그를 여러 연동 시스템으로 Export를 할 수 있다. 앞에서는 빅쿼리로 로그를 Export 하였지만, 같은 Log를 Dataflow에 Export 하였을 경우, 로그 데이타를 실시간 스트림으로 받아서, 실시간 스트리밍 분석이 가능하다.


구글 데이타 플로우에 대한 설명은 아래 링크를 참고하기 바란다.


또는 실시간 스트리밍이 아니라, 로그 메세지 하나하나를 받아서 이벤트로 처리하고자 할 경우, Pub/Sub 큐에 넣은 후에, 그 뒤에 GAE또는 Cloud function (https://cloud.google.com/functions/) 에서 메세지를 받는 구조로 구현이 가능하다.


로그 모니터링

스택 드라이버 로깅은 단순히 로그를 수집할 뿐만 아니라 훨씬 더 많은 기능을 제공한다.

앞에서 스택 드라이버 로깅을 이용한 로그 수집 시스템을 만드는 방법을 알아보았지만, 부가적인 몇가지 기능이 같이 제공되는데 다음과 같다.

필터를 이용한 특정 로그 핸들링

logger를 통해서 수집된 로그에는 필터를 걸어서 특정 로그만 모니터링할 수 있다.

예를 들어서 text 문자열에 “error” 가 들어간 로그나, latency가 1초이상인 로그와 같이 특정 로그만을 볼 수 있다.

다음은 jsonPayload.text 로그 문자열에 “-a”로 시작하는 문자열이 있는 로그만 출력하도록 하는 것이다.



이 기능을 사용하면, 로그 메세지에서 특정 로그만 쉽게 검색하거나, 특정 에러 또는 특정 사용자의 에러, 특정 ErrorID 등으로 손쉽게 검색이 가능해서 로그 추적이 용이해진다.

매트릭 모니터링

다음은 메트릭이라는 기능인데, 로그를 가지고 모니터링 지표를 만들 수 있다.

예를 들어 하루 발생한 에러의 수 라던지, 평균 응답 시간등의 지표를 정의할 수 있다.

이렇게 정의된 지표는 대쉬보드에서 모니터링이 가능하고 또는 이러한 지표를 이용하여 이벤트로 사용할 수 있다. 응답시간이 얼마 이상 떨어지면 오토 스케일링을 하게 한다던가 또는 이메일로 관리자에게 ALERT을 보낸다던가의 기능 정의가 가능하다.


매트릭 생성

지표 정의는 로그 화면에서 필터에 로그 검색 조건을 넣은 채로, CREATE METRIC 버튼을 누르면 사용자가 지표를 매트릭으로 정의할 수 있다.



대쉬 보드 생성


이렇게 정의된 매트릭은 스택 드라이버 대쉬 보드 화면에서 불러다가 그래프로 시각화가 가능한데, 다음 그림은 struct_log의 전체 수와를 나타내는 매트릭과, struct_log에서 log text에 “-a”를 포함하는 로그의 수를 나타내는 메트릭을 정의하여 차트로 그리는 설정이다.



위에 의해서 생성된 차트를 보면 다음과 같이 전체 로그 수 대비 “-a”  문자열이 들어간 로그의 수를 볼 수 있다.


지금까지 스택드라이버 로깅과 빅쿼리를 이용하여 간단하게 대용량 로그 수집 서버를 만드는 방법을 살펴보았다. 두개의 제품을 이용해서 로그 수집 시스템을 구현하는 방법도 중요하지만, 이제는 개발의 방향이 이러한 대용량 시스템을 구현하는데, 클라우드 서비스를 이용하면 매우 짧은 시간내에 개발이 가능하고 저비용으로 운영이 가능하다. 요즘 개발의 트랜드를 보면 이렇게 클라우드 서비스를 이용하여 개발과 운영 노력을 최소화하고 빠른 개발 스피드로 개발을 하면서, 실제로 비지니스에 필요한 기능 개발 및 특히 데이타 분석 쪽에 많이 집중을 하는 모습이 보인다.


단순히 로그 수집 시스템의 하나의 레퍼런스 아키텍쳐에 대한 이해 관점 보다는 전체적인 개발 트렌드의 변화 측면에서 한번 더 생각할 수 있는 계기가 되면 좋겠다.


저작자 표시 비영리
신고

수포자를 위한 딥러닝

#1 - 머신러닝의 개요

조대협(http://bcho.tistory.com)

들어가기에 앞서서 

몇년전부터 빅데이타와 머신러닝이 유행하면서 이분야를 공부해야겠다고 생각을 하고 코세라의 Andrew.NG 교수님의 강의도 듣고, 통계학 책도 보고, 수학적인 지식이 부족해서 고등학교 수학 참고서도 봤지만, 도저히 답이 나오지 않는다. 머신 러닝에 사용되는 알고리즘은 복잡도가 높고 일반적인 수학 지식으로 이해조차 어려운데, 실제 운영 시스템에 적용할 수 있는 수준의 알고리즘은 석박사급의 전문가적인 지식이 아니면 쉽게 만들 수 없는 것으로 보였다. 예를 들어 인공지능망(뉴럴네트워크:Neural Network) 알고리즘에 대한 원리는 이해할 수 있지만, 실제로 서비스에 사용되는 알고르즘을 보니 보통 60~90개의 계층으로 이루어져 있는데, (그냥 복잡하다는 이야기로 이해하면 됨) 이런 복잡한 알고리즘을 수학 초보자인 내가 만든다는 것은 거의 불가능에 가까워 보였고, 이런것을 만들기 위해서 몇년의 시간을 투자해서 머신러닝 전문가로 커리어패스를 전환할 수 는 있겠지만 많은 시간과 노력이 드는데 반해서, 이미 나에게는 소프트웨어 개발과 백앤드 시스템이라는 전문분야가 있어싸.

그래도 조금씩 보다보니, 머신 러닝에서 소개되는 알고리즘은 주로 사용되는 것은 약 20개 내외였고, 이미 다 정형화 되어 있어서 그 알고리즘을 만들어내기보다는, 가져다 쓰기만 하면 될 것 같다는 느낌이 들었다. 아직 많이 보지는 못했지만, 실제로 머신 러닝 기반의 시스템들은 나와 있는 알고리즘을 코드로 옮겨서 운영 환경에 올리는 경우가 대부분이었다.

비유를 하자면 우리가 복잡한 해쉬 리스트나, 소팅 알고리즘을 모르고도 간단하게 프로그래밍 언어에 있는 라이브러리를 가져다 쓰는 것과 같은 원리라고나 할까? 그래서, 완벽하게 이해하고 만들기 보다는 기본적인 원리를 파악하고 이미 공개된 알고리즘과 특히 레퍼런스 코드를 가져다가 운영환경에다 쓸 수 있는 정도의 수준을 목표로 하기로 했다.

이제 아주 아주 초보적인 수준의 이해를 가지고, 구글의 텐서플로우 기반으로 머신러닝과 딥러닝을 공부하면서 내용을 공유하고자 한다. 글을 쓰는 나역시도 수포자이며 머신러닝에 대한 초보자이기 때문에, 설명이 부족할 수 도 있고, 틀린 내용이 있을 수 있음을 미리 알리고 시작한다. (틀린 내용은 알려주세요)

머신러닝

머신 러닝은 데이타를 기반으로 학습을 시켜서 몬가를 예측하게 만드는 기법이다.

통계학적으로는 추측 통계학 (Inferential statistics)에 해당하는 영역인데, 근래에 들어서 알파고와 같은 인공지능이나 자동 주행 자동차, 로봇 기술등을 기반으로 주목을 받고 있다.



<그림. 구글의 자동 주행 자동차>


간단한 활용 사례를 보면

  • 학습된 컴퓨터에 의한 이메일 스팸 필터링

  • 편지지의 우편번호 글자 인식

  • 쇼핑몰이나 케이블 TV의 추천 시스템

  • 자연어 인식

  • 자동차 자율 주행

등을 볼 수 있다.


이러한 시나리오는 지속적인 샘플 데이타를 수집 및 정제하고 지속적으로 알고리즘을 학습해나감에 따라서 최적의 알고리즘을 찾아나가도록 한다.

쇼핑몰의 추천 시스템의 경우 사용자의 구매 패턴을 군집화하여 유사한 패턴을 찾아냄으로써 적절한 상품을 추천하는데, 예를 들어 30대 남성/미혼/연수입 5000만원/차량 보유한 사용자가 카메라,배낭등을 구매했을 경우 여행 상품을 구매할 확률이 높다는 것을 학습하였을때, 이러한 패턴의 사용자에게 여행 상품을 추천해주는 것과 같은 답을 제공할 수 있다.

지도 학습과 비지도 학습

머신러닝은 학습 방법에 따라서 지도 학습 (Supervised Learning)과 비지도 학습 (Unsupervised Learning)으로 분류될 수 있다.

지도 학습 (Supervised Learning)



예를 들어 학생에게 곱셈을 학습 시킬때,

“2*3=6이고, 2*4=8이야, 그러면 2*5= 얼마일까? “

처럼 문제에 대한 정답을 주고 학습을 한 후, 나중에 문제를 줬을때 정답을 구하도록 하는 것이 지도 학습 (Supervised Learning)이다.

비지도 학습 (Unsupervised learning)

반대로 비지도 학습은 정답을 주지않고 문제로만 학습을 시키는 방식을 비지도 학습이라고 한다.

예를 들어 영화에 대한 종류를 학습 시키기 위해서, 연령,성별과 영화의 종류 (액션, 드라마, SF)를 학습 시켰을때, 이를 군집화 해보면 20대 남성은 액션 영화를 좋아하고 20대 여성은 드라마 영화를 좋아 하는 것과 같은 군집된 결과를 얻을 수 있고, 이를 기반으로 20대 남성이 좋아하는 영화의 종류는 유사한 군집의 결과인 ”액션 영화" 라는 답을 내게 되낟.


여기서 문제에 대한 답을 전문적인 용어로 이야기 하면 라벨된 데이타 (Labeled data)라고 한다.


머신러닝의 대표적인 문제 Regression과 Classification 문제

머신러닝을 이용해서 해결하는 문제의 타입은 크게 regression과 classification 문제 두가지로 대표가 된다.

Classification

Classification은 입력값에 대한 결과값이 연속적이지 않고 몇개의 종류로 딱딱 나눠서 끊어지는 결과가 나오는 것을 이야기 한다. 예를 들어 종양의 크기가 0.3cm 이상이고 20대이면, 암이 양성, 또는 종양의 크기가 0.2cm 이하이고 30대이면, 암이 음성과 같이 결과 값이 ”양성암/음성암"과 같이 두개의 결과를 갖는 것이 예가 된다.


<종양 크기에 따른, 암의 양성/음성 여부에 대한 그래프>

또 다른 예로는 사진을 업로드 했을때, 사진의 물체를 인식할때 ”이사진은 개이다.” “이사진은 고양이이다.” 처럼 특정 종류에 대한 결과값이 나오는 것 역시 Classification 문제로 볼 수 있다.


Regression

Regression 문제는 결과값이 연속성을 가지고 있을때 Regression 문제라고 한다. 즉 택시의 주행거리에 따른 요금과 같은 문제인데, 변수 택시 주행 거리에 대해서, 결과 택시 값이 기대 되는 경우로 변수와 결과값이 연속적으로 이루어 지는 경우를 말한다.


<그림. 주행 거리에 따른 택시비 >

머신 러닝과 딥러닝

이러한 머신 러닝의 분야중, 인공 지능망 (뉴럴 네트워크 / Artificial neural network)라는 기법이 있는데, 사람의 뇌의 구조를 분석하여, 사람 뇌의 모양이 여러개의 뉴런이 모여서 이루어진것 처럼, 머신 러닝의 학습 모델을 두뇌의 모양과 같이 여러개의 계산 노드를 여러 층으로 연결해서 만들어낸 모델이다.


<알파고에 사용된 뉴럴네트워크 구조>


이 모델은 기존에 다른 기법으로 풀지 못하였던 복잡한 문제를 풀어낼 수 있었지만, 계층을 깊게 하면 계산이 복잡하여 연산이 불가능하다는  이유로 그간 관심을 가지고 있지 못했다가

캐나다의 CIFAR (Canadian Institute for Advanced Research) 연구소에서 2006년에 Hinton 교수가 ”A fast learning algorithm for deep belifef nets” 논문을 발표하게 되는데,  이 논문을 통해서 뉴럴네트워크에 입력하는 초기값을 제대로 입력하면 여러 계층의 레이어에서도 연산이 가능하다는 것을 증명하였고,  2007년 Yosua Bengio 라는 분이 ”Greedy Layer-Wise training of deep network” 라는 논문에서 깊게 신경망을 구축하면 굉장히 복잡한 문제를 풀 수 있다는 것을 증명해냈다.


이때 부터 뉴럴네트워크가 다시 주목을 받기 시작했는데,  이때 뉴럴 네트워크라는 모델을 사람들에게 부정적인 인식이 있었기 때문에, 다시 이 뉴럴 네트워크를 딥러닝 (Deep learning)이라는 이름으로 다시 브랜딩을 하였다.

그 이후에 IMAGENET 챌린지라는 머신러닝에 대한 일종의 컨테스트가 있는데, 이 대회는 이미지를 입력하고 머신 러닝을 통해서 컴퓨터가 이미지의 물체등을 인식할 수 있게 하는 대회로, 머신 러닝 알고리즘의 정확도를 측정하는 대회이다. 이 대회에서 2012년   Hinton 교수님 랩에 있던 Alex 라는 박사 과정의 학생이 딥러닝 기반의 머신 러닝 알고리즘으로 혁신 적인 결과를 내었고 지금은 이 딥러닝이 머신 러닝의 큰 주류중의 하나로 자리잡게 되었다.


<이미지넷에서 사용되는 이미지>



저작자 표시 비영리
신고

구글 빅쿼리와 데이타 플로우를 이용한 노트7 소셜 반응 분석


조대협 (http://bcho.tistory.com)




이 글은 개인이 개인적인 취미와 빅데이타 분석 플랫폼 기술 공유를 위해서 데이타 분석용 시나리오로 소셜 트랜드 분석을 선택하였고, 그중 노트7을 하나의 예로 선택한 내용이기 때문에, 이러한 분석 내용이 악의적으로 활용되거나 해석되기를 바라지 않으며 이 글의 라이센스는 본인이 저작권을 소유하고 있으며 출처를 밝히는 인용을 포함하여 모든 인용 및 내용에 대한 활용을 금합니다.


구글의 빅데이타 플랫폼인 빅쿼리와(https://cloud.google.com/bigquery), 데이타플로우((https://cloud.google.com/dataflow) 에 대해서 테스트를 하던중, 아무래도 데이타 분석 애플리케이션을 만들어보려면 실제 시나리오를 가지고 분석을 해보는게 가장 적절할것 같아서, 트위터에서 노트7에 대한 데이타를 수집해서 분석해봤다.


전체적인 시나리오는 note7 으로 태깅된 트위터 피드를 읽어서 30초 단위로 실시간 분석을 하는데, 수집된 트윗에서 구글의 자연어 분석 API를 이용하여(https://cloud.google.com/natural-language/) 명사와 형용사만 필터링을 해서 수집하는 방식으로 진행하였다.


다음은 9/12~9/19일까지 수집한 데이타에 대한 통계이다.



가장 위의 파란선이 recall이라는 단어로 12일 부터 꾸준히 등장해서 16일에 피크를 치고 계속해서 내용이 나타나고 있다. 17일에 피크를 친 빨간선은 S7이라는 단어인데, 왜 노트7 트윗에 S7이 등장했는지는 약간 미지수이다. 일단위 보다는 시간단위 분석이 필요하고 각 일자별로 주요 지표를 상세하게 볼필요가 있다고 생각해서 제플린 노트북을 이용하여 빅쿼리에 수집된 데이타를 분석을 해봤다.


이 그래프는 시간대별 명사들의 카운트인데, 시간당 1500을 넘는 시점이 9/12 16:00, 9/12 23:00, 9/14 01:00 등으로 보인다. 일자별이나 모든 시간대별로 트윗을 분석하는 것보다, 몬가 이슈가 있으면 시간당 트윗이 급격하게 올라가기 때문에, 트윗에서 명사 카운트가 1500을 넘는 시간만 분석해봤다.




이것이 분석 결과인데, 그래프로 보기가 어려우니 표로 표현해서 보자. 주요 날짜별로 주요 키워드를 3개씩만 검출해보면 다음과 같은 결과가 나온다.


먼저 9월12일16시에는  flight와 india라는 단어가 많이 잡혔는데, 이날은 인도에서도 항공기에 노트7을 가지고 탑승을 못하도록 공지한 시간때로 보인다.


http://mashable.com/2016/09/12/samsung-galaxy-note7-flight-ban-india/#nTLbsAiVWqqr


다음 23시에 boy라는 단어가 주로 잡혔는데, 이날은 노트7으로 인하여 6살 어린이 어린이(boy)가 상처를 입은 사건이 발생한 때이다.


https://www.cnet.com/news/exploding-samsung-galaxy-phone-burns-6-year-old/


다음으로 14일과 16일 로그를 분석해보자



14일 1시에는 software, update, explosions라는 단어가 많이 검출되었는데,

http://money.cnn.com/2016/09/14/technology/samsung-galaxy-note-7-software-update-battery-fires/

소프트웨어 업데이트를 통해서 폭발을 막겠다고 발표한 시점이다.


16일은 미국 정부가 노트7의 리콜을 명령한 날로 5시와6시에 goverment와 recall 등의 단어가 집중적으로 올라왔다.


http://www.forbes.com/sites/shelbycarpenter/2016/09/16/government-official-recall-samsung-galaxy-note-7/#351a35c46e53



18일에는 report와 lawsuit (소송) 이라는 단어가 많이 검출되었는데, report는 찾아보니


http://bgr.com/2016/09/18/galaxy-note-7-explosion-burns-samsung-lawsuit/

로이터 통신에서 16일날 언급한 플로리다 남성이 노트7으로 화상을 입고 소송을 한 내용이 2일의 시차를 두고18일에 급격하게 퍼졌다.

그러다가 아래 표와 같이 19일에는 florida, man, pocket,lawsuit 와 같이 플로리다, 남성,주머니,소송 등의 단어가 검출되면서 18일의 내용이 점점 더 구체화 되어가는 과정을 보여주었다.

사실 노트7을 분석 아이템으로 삼은것은 노트7이 출시되었을때, 꽤나 완성도가 높았고 그래서 재미있는 반응도 꽤나 많을것으로 기대했다. 그래서 굳이 다른 키워드 보다 노트7을 고른거였고, 지금은 떠났지만 한때 몸 담았던 회사였기도 했기 때문에 잘 되기를 바라는 마음이었는데, 분석 결과를 지켜보면서 씁쓸한 마음이 이내 떠나지를 않았다.


이 분석을 통해서 얻고자 한것은, 이 시스템을 구축하는데 혼자서 5~6시간 정도의 시간밖에 걸리지 않았다. 예전이라면 이런 분석 시스템을 구축하려면 몇명은 몇달은 투자해야 할텐데, 이제는 혼자서도 이러한 빅데이타 분석이 가능하다는 메세지와 함께, 실시간 분석 시스템 구현 기술을 습득하기 위한 개인 작업이다.

의외로 데이타에서 많은 인사이트를 얻어낼 수 있었고 추후에 이 분석에 대한 모든 코드를 공개할 예정인데, 다른 사람들도 유용하게 사용할 수 있는 정보 공유의 목적이다.

.

저작자 표시 비영리
신고

데이타 플로우 프로그래밍 모델의 이해


조대협 (http://bcho.tistory.com)


앞의 글에서 스트리밍 프로세스의 개념과, 데이타 플로우의 스트리밍 처리 개념에 대해서 알아보았다. 그렇다면 실제로 이를 데이타 플로우를 이용해서 구현을 하기 위해서는 어떤 컴포넌트와 프로그래밍 모델을 사용하는지에 대해서 알아보자.


구글 데이타 플로우 프로그래밍 모델은 앞에서 설명한 바와 같이, 전체 데이타 파이프라인을 정의하는 Pipeline, 데이타를 저장하는 PCollections, 데이타를 외부 저장소에서 부터 읽거나 쓰는 Pipeline I/O, 그리고, 입력 데이타를 가공해서 출력해주는 Transforms , 총 4가지 컴포넌트로 구성이 되어 있다.


이번 글에서는 그 중에서 데이타를 가공하는  Transforms 컴포넌트들에 대해서 알아본다.

Transforms

Transforms는 데이타를 어떻게 가공하느냐에 따라서 다음 그림과 같이 세가지 형태로 분류된다.

Element-Wise

개별 데이타 엘리먼트를 단위로 연산을 수행하는 것을 Element-Wise Transform이라고 한다.

ParDo 함수가 이에 해당하며, 하나의 데이타를 입력 받아서, 연산을 한 후, 하나의 출력을 내보내는 연산이다.

ParDo 함수는 DoFn으로 정의된 연산 로직을 병렬로 여러개의 프로세스와 쓰레드에서 나눠서 처리를 해준다.


DoFn 클래스 는 다음과 같은 포맷으로 정의한다.


static class MyFunction extends DoFn<{입력 데이타 타입},{출력 데이타 타입}>{

     @Override

     public void processElement(ProcessContext c) {

       // do Something

     }

}


DoFn 클래스를 상속 받아서 클래스를 정의하고, 클래스 정의시 제네릭 타입 (Generic type)으로, 첫번째는 입력 데이타 타입을 두번째는 출력 데이타 타입을 정의한다.

그리고 processElement 함수를 오버라이드(Override)해서, 이 함수안에 연산 로직을 구현한다.

processElement는 ProcessContext를 인자로 받는데, 이 Context에는 입력 데이타를 포함하고 있다.

입력 데이타를 꺼내려면 c.element()라는 메서드를 이용하면, 입력 데이타 타입으로 정의된 입력 데이타를 꺼낼 수 있다.

데이타를 처리한 다음에 파이프라인상의 다음 컴포넌트로 데이타를 보내려면 c.output ({출력 데이타} ) 형태로 정의를 해주면 된다.


이렇게 정의된 DoFn함수는  ParDo를 이용하여 파이프라인 상에서 병렬로 실행될 수 있다. ParDo는, 파이프라인상에서 .apply 메서드를 이용하여 적용한다.


그러면 실제로 어떻게 적용이 되는지 다음 예제를 보자.

   p.apply(ParDo.named("toUpper").of(new DoFn<String, String>() {

     @Override

     public void processElement(ProcessContext c) {

       c.output(c.element().toUpperCase());

     }

   }))


String 인자를 입력으로 받아서, String 인자로 출력을 하는 DoFn 함수를 정의하였다.

processElement 부분에서, c.element()로 String 입력 값을 읽은 후, toUpperCase() 함수를 적용하여 대문자로 변환을 한 후, c.output을 이용하여 다음 파이프라인으로 출력 데이타를 넘겼다.


조금 더 디테일한 예제를 보자

p.apply(Create.of("key1,Hello", "key2,World","key1,hello","key3,boy","key4,hello","key2,girl"))

.apply(ParDo.named("Parse").of(new DoFn<String, KV<String,String>>() {

@Override

public void processElement(ProcessContext c) {

StringTokenizer st = new StringTokenizer(c.element(),",");

String key = st.nextToken();

String value = st.nextToken();


KV<String,String> outputValue =  KV.of(key,value);

c.output(outputValue);

}

}))

Create.of를 이용하여 “Key.Value” 문자열 형태로 데이타를 생성한 후, 이 문자열을 읽어서, DoFn<String,KV<String,String>> 클래스에서 이를 파싱하여, 문자열을 키밸류 데이타형인 KV 데이타 형으로 변환하여 리턴하는 예제이다. 아래 개념도를 보자


입력 값은 String 데이타 타입으로 “Key.Value”라는 형태의 문자열을 받는다.

DoFn에서 처리한 출력 값은 KV 형으로 KV 데이타 타입은 키와 값을 가지고 있는데, 키와 값의 타입도 제네릭 타입으로 키는 String, 값은 String 타입으로 정의하였다. 입력된 “Key.Value” 문자열은 “.” 전후로 분리가 되서, “.” 좌측은 키로, 우측은 값으로 해서, KV에 각각 들어간다.

processElement를 보면, c.element를 이용하여 String 문자열을 꺼낸 후, StringTokenizer를 이용하여 “.”을 분류 문자로 정의해서 파싱한다. 첫번째를 키로 저장하고, 두번째를 값으로 저장한다.

KV<String,String> outputValue =  KV.of(key,value)

를 이용하여, KV 객체를 생성한 후, c.output(outputValue); 을 이용하여 다음 파이프라인 컴포넌트로 값을 전달한다.


시스템내에서 수행되는 방법은 다음과 같이 된다. ParDo에 의해서 DoFn 클래스가 여러개의 워커에 의해서 분산 처리가된다.

어그리게이션(Aggregation)

어그리게이션 값을 특정 키 값을 이용하여 데이타를 그룹핑을 하는 개념이다.

이러한 어그리게이션 작업은 내부적으로 셔플링(Shuffling)이라는 개념을 이용해서 이루어지는데, 키별로 데이타를 모으거나 키별로 합산등의 계산을 하기 위해서는, 키별로 데이타를 모아서 특정 워커 프로세스로 보내야 한다.

ParDo를 이용하여 병렬 처리를 할 경우, 데이타가 키값에 상관 없이 여러 워커에 걸쳐서 분산되서 처리되기 때문에, 이를 재 정렬해야 하는데, 이 재 정렬 작업을 셔플링이라고 한다.


아래 그림을 보자, 파이프라인 상에서 첫번째 프로세스를 Worker 1과 Worker 2가 처리를 하였고, 결과는 Key1과  Key2를 키로 가지는 데이타라고 하자, 이를 어그리게이션 하면 아래 그림과 같이 Key1 데이타는 Worker 3로 모이고, Key 2 데이타는 Worker 4로 모인다. 이런 방식으로 셔플링이 이루어진다.


데이타 플로우의 어그리게이션 에는 특정 키별로 데이타를 묶어주는 Grouping과, 특정 키별로 데이타를 연산(합이나 평균을 내는)하는  Combine 두 가지가 있다.

Grouping

PCollection<KV<String, Integer>> wordsAndLines = ...;

에서 다음과 같이 String,Integer 페어로 KV 타입을 리턴한다고 하자.


 cat, 1
 dog, 5
 and, 1
 jump, 3
 tree, 2
 cat, 5
 dog, 2
 and, 2
 cat, 9
 and, 6
 ...


이를 다음과 같이 키에 따라서 밸류들을 그룹핑 하려면 GroupByKey를 사용해야 한다.


 cat, [1,5,9]
 dog, [5,2]
 and, [1,2,6]
 jump, [3]
 tree, [2]


PCollection<KV<String, Iterable<Integer>>> groupedWords = wordsAndLines.apply(

   GroupByKey.<String, Integer>create());


코드는 앞 단계에서 KV<String,Integer>로 들어온 wordLines 데이타를 GroupByKey를 이용하여 Key 단위로 그룹핑을 한후 이를 <String, Iterable<Integer>> 타입으로 리턴하는 Transformation이다.

<String, Iterable<Integer>>에서 앞의 String은 키가 되며, Iterable 다수의 값을 가지고 있는 밸류가 된다. 이 밸류는 Integer 타입으로 정의된다.

Combine

Grouping이 키 단위로 데이타를 묶어서 분류해주는 기능이라면, Combine은 키단위로 데이타를 묶어서 연산을 해주는 기능이다.

예를 들어 앞의 예제처럼, “cat”이라는 문자열 키로 된 데이타들이 [1,5,9] 가 있을때, 이에 대한 총합이나 평균등을 내는 것이 Combine 이다.


PCollection<Integer> pc = ...;

 PCollection<Integer> sum = pc.apply(

   Combine.globally(new Sum.SumIntegerFn()));


코드는 Integer로 들어오는 모든 값을 Combine에서 Sum 기능을 이용하여 모든 값을 더하는 코드이다.

전체 데이타에 대해서 적용하기 때문에, Combine.globally로 적용하였다.


아래와 같은 형태의 데이타가 있다고 가정하자. 키에 따라서 값이 그룹핑이 된 형태이다.

 cat, [1,5,9]
 dog, [5,2]
 and, [1,2,6]
 jump, [3]
 tree, [2]


PCollection<KV<String, Integer>> occurrences = ...;

 PCollection<KV<String, Iterable<Integer>>> grouped = pc.apply(GroupByKey.create());

 PCollection<KV<String, Integer>> firstOccurrences = pc.apply(

   Combine.groupedValues(new Min.MinIntegerFn()));


위의 데이타들이 PCollection<KV<String, Iterable<Integer>>> grouped

에 저장되었다고 할때, 각 키별로 최소값을 구하는 것을 Combine.groupedValue에서 Min을 호출하여 최소값을 구했다.


Transforms 컴포넌트의 기본적인 종류들을 알아보았다. 이외에도, 하나의 Transform 안에 여러개의 Transform을 집어 넣는 Composite Transform이나, 두개 이상의 데이타 스트림에서 데이타를 키에 따라 JOIN하는 기능들이 있는데, 이러한 고급 기능들은 뒤에 고급 프로그래밍 모델에서 설명하기로 한다.

PCollection

PCollection은 데이타 플로우 파이프라인 내에서 데이타를 저장하는 개념이다.

데이타는 이 PCollection 객체에 저장이되서, 파이프라인을 통해서 Transform으로 넘겨진다.

PCollection은 한번 생성이 되면, 그 데이타를 수정이 불가능하다. (데이타를 변경하거나 수정하기 위해서는 PCollection을 새로 생성해야 한다.)

Bounded & Unbounded PCollection

PCollection은 데이타의 종류에 따라 Bounded PCollection과 Unbounded PCollection 두가지로 나뉘어 진다.

Bounded PCollection

Bounded PCollection은 배치 처리 처럼, 데이타가 변화하지 않는 데이타로 파일이나, 업데이트가 더 이상 발생하지 않는 테이블등을 생각하면 된다.

TextIO,BigQueryIO,DataStoreIO등을 이용해서 데이타를 읽은 경우에는 Bounded PCollection으로 처리가 된다.

Bounded PCollection 데이타들은 배치 처리의 특성상 데이타를 한꺼번에 읽어서 처리한다.  

Unbounded PCollection

Unbounded PCollection은, 데이타가 계속 증가 하는 즉 흐르는 데이타를 표현한다. 트위터 타임 라인이나, 스마트 폰에서 서버로 올라오는 이벤트 로그등을 예로 들 수 있다.

이러한 Unbounded PCollection은 시간의 개념을 가지고 윈도우 처리등을 해야하기 때문에, PCollection 객체내에 타임 스탬프가 자동으로 붙는다.

UnBounded PCollection은 데이타를 BigQueryIO또는 Pub/Sub에서 읽을 때 생성된다.

특히 Unbounded PCollection에 Grouping이나, Combine등을 사용할 경우, 데이타가 파이프라인 상에서 언제 그룹핑된 데이타를 다음 Transform 컴포넌트로 넘겨야할지를 정의해야 하기 때문에, Window를 반드시 사용해야 한다.

데이타 타입

PCollection을 이용해서 정의할 수 있는 주요 데이타 타입은 다음과 같다.

일반 데이타 타입

PCollection<Integer> data

가장 기초적인 데이타 형으로, Integer,Float,String 등 자바의 일반 데이타 타입으로 정의되고 하나의 데이타 만을 저장한다.

KV 데이타 타입

PCollection< KV<String,Integer> key_value_data

키 밸류 데이타 타입으로, 키와 값으로 구성된다. 키와 값은 일반 데이타 타입과 같게, 자바의 일반 데이타 타입 사용이 가능하다.


PCollection<KV<String, Iterable<Integer>>>

키 밸류 데이타 타입중에 값에 여러개의 값을 넣을 수 있는 Iterable 타입이 있다.

앞의 Transform 예제에서 언급된것과 같이 키가 cat이고, 그 값이 2,6,7,9 와 같이 여러 값을 가지는 형이 이러한 타입에 해당한다.

커스텀 데이타 타입

단순한 데이타 타입 말고도, 복잡한 데이타 형을 처리하기 위해서  커스텀 데이타 타입을 지원한다.

커스텀 데이타 타입은 오픈 소스 Avro의 데이타 모델을 사용한다. http://avro.apache.org/


예를 들어 어떤 키에 대해서 카운트값,평균,총합 그리고 윈도우의 시작과 끝 시간을 저장하는 데이타 타입 Stat가 있다고 가정하자. 이 데이타 타입은 다음과 같이 정의된다.

자바에서 일반적인 Value Object 형태로 정의를 하면되고, 단 앞에 어노테이션으로 @DefaultCoder(AvroCoder.class) 와 같이 Avro 데이타 클래스임을 정의하면 된다.


@DefaultCoder(AvroCoder.class)

static class Stat{

Float sum;

Float avg;

Integer count;

Integer key;

Instant wStart; // windowStartTime

Instant wEnd; // windowEndTime

public Instant getwStart() {

return wStart;

}

public Instant getwEnd() {

return wEnd;

}


public Float getSum() {

return sum;

}

public Float getAvg() {

return avg;

}

public Integer getCount() {

return count;

}


public Integer getKey(){

return key;

}


public Stat(){}

public Stat(Integer k,Instant start,Instant end,Integer c,Float s,Float a){

this.key = k;

this.count = c;

this.sum = s;

this.avg = a;

this.wStart = start;

this.wEnd = end;

}


}



윈도우

스트리밍 데이타 처리를 할때 가장 중요한 개념이 윈도우이다.

특히나  Unbounded 데이타를 이용한 스트리밍 처리에서 Grouping이나 Combine 시에는 반드시 윈도우를 사용해야 한다.Grouping이나 Combine과 같은 aggregation을 하지 않으면, Unbounded 데이타라도 윈도우 처리가 필요없다.

또한 Bounded 데이타도, 데이타에 타임 스탬프를 추가하면 윈도우를 사용하여, 시간대별 데이타를 처리할 수 있다.

예를 들어 일일 배치를 돌리는 구매 로그가 있을때, 각 데이타의 구매 시간이 있으면, 이 구매시간을 타임 스탬프로 지정하여 배치라도 윈도우 단위로 연산을 할 수 있다.

고정 윈도우 적용

윈도우를 적용하는 방법은 정말 간단하다.

PCollection 객체에 apply 메소드를 이용하여 Window.into 메서드를 이용하여 적용하면된다.

예를 들어서 아래와 같이 PCollection<String> 형 데이타인 items 객체가 있을때, 여기에 윈도우를 적용하려면 다음과 같이 하면 된다.

 PCollection<String> items = ...;

 PCollection<String> fixed_windowed_items = items.apply(

   Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES)));

items.apply를 이용하여 윈도우를 적용하는데, 데이타 타입이 String이기 때문에, Window.<String>into 로 윈도우를 적용하였다.

윈도우 타입은 Fixed 윈도우이기 때문에, FixedWindows.of를 사용하였고, 윈도우 주기는 1분주기라서 1,TimeUnit.MINUTES를 적용하였다.

슬라이딩  윈도우 적용

슬라이딩 윈도우는 윈도우의 크기 (Duration)과 주기를 인자로 넘겨야 한다.

아래 코드는 5초 주기로 30분 크기의 윈도우를 생성하는 예제이다.

 PCollection<String> items = ...;

 PCollection<String> sliding_windowed_items = items.apply(    Window.<String>into(SlidingWindows.of(Duration.standardMinutes(30)).every(Duration.standardSeconds(5))));

윈도우가 5초마다 생성이되고, 각 윈도우는 30분 단위의 크기를 가지고 있게 된다.

세션 윈도우 적용

세션 윈도우는 HTTP 세션 처럼 특정 사용자가 일정 시간동안 데이타가 올라오지 않으면, 처음 데이타가 올라온 시간 부터 데이타가 올라오지 않은 시간 까지를 윈도우르 묶어주는 개념이다.

앞의 고정 윈도우나, 세션 윈도우와는 다르게 반드시 키별로 세션을 묶기 때문에 키가 필요하다.


아래 예제는 각 사용자 별로 세션당 점수를 계산해주는 예제이다.

userEvents

 .apply(Window.named("WindowIntoSessions")

       .<KV<String, Integer>>into(

             Sessions.withGapDuration(Duration.standardMinutes(Duration.standardMinutes(10))))

   .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))

 // For this use, we care only about the existence of the session, not any particular

 // information aggregated over it, so the following is an efficient way to do that.

 .apply(Combine.perKey(x -> 0))

 // Get the duration per session.

 .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))



다른 윈도우들과 마찬가지로 Window.into를 이용하여 윈도우를 적용하는데, 데이타 형을 잘 보면 <KV<String, Integer>> 형으로 정의된것을 확인할 수 있다.

Sessions.withGapDuration으로 세션 윈도우를 정의한다. 이때 얼마간의 시간 동안 데이타가 없으면 세션으로 짜를지를 지정해줘야 하는데, Duration.standardMinutes(10) 를 이용하여 10분간 해당 사용자에 대해서 데이타가 없으면 해당 사용자(키)에 대해서 세션 윈도우를 자르도록 하였다.

윈도우 시간 조회하기

윈도우를 사용하다보면, 이 윈도우의 시작과 종료 시간이 필요할때가 있다. 예를 들어 고정 크기 윈도우를 적용한다음에, 이를 데이타 베이스등에 저장할때, 이 윈도우가 언제시작해서 언제끝난 윈도우인지를 조회하여 윈도우 시작 시간과 함께 값을 저장하고자 하는 케이스이다. “1시에 몇건, 1시 10분에 몇건의 데이타가 저장되었다”와 같은 시나리오이다.


현재 윈도우에 대한 정보를 얻으려면 DoFn 클래스를 구현할때, com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess 인터페이스를 implement 해야, 윈도우에 대한 정보를 억세스 할 수 있다.


static class StaticsDoFn extends DoFn<KV<Integer,Iterable<Twit>>, KV<Integer,Stat>>

implements com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess

{

@Override

public void processElement(ProcessContext c) {

:


IntervalWindow w = (IntervalWindow) c.window();

Instant s = w.start();

Instant e = w.end();

DateTime sTime = s.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTime eTime = e.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTimeFormatter dtf = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss");

String str_stime = sTime.toString(dtf);

String str_etime = eTime.toString(dtf);

                                      :


윈도우에 대한 정보는 ProcessContext c에서, c.window()를 호출하면, IntervalWindow라는 클래스로 윈도우에 대한 정보를 보내주고, 윈도우의 시작 시간과 종료 시간은 IntervalWindow 클래스의 start()와 end() 메서드를 이용해서 조회할 수 있다. 이 조회된 윈도우의 시작과 종료 시간은 org.joda.time.Instant 타입으로 리턴되는데, 조금 더 친숙한 DateTime 포맷으로 변환을 하려면, Instant.toDate() 메서드를 사용하면 되고, 이때, TimeZone 을 지정해주면 로컬 타임으로 변환을 하여, 윈도우의 시작과 종료시간을 조회할 수 있다.

타임 스탬프

윈도우는 시간을 기준으로 처리를 하기 때문에, 이 시간을 정의하는 타임스탬프를 어떻게 다루는지를 이해할 필요가 있다.

타임 스탬프 생성

Pub/Sub을 이용하여 unbounded 데이타를 읽을 경우 타임스탬프가 Pub/Sub에 데이타가 들어간 시간을 Event time으로 하여, PCollection에 추가된다.

타임 스탬프 지정하기

Pub/Sub에서와 같이 자동으로 타임 스템프를 부여하는게 아니라, 모바일 디바이스에서 발생한 이벤트 타임이나, 애플리케이션 적으로 PCollection에 직접 타임스탬프를 부여할 수 있다.

PCollection에 타임 스탬프를 부여 하는 방법은 간단하다.

DoFn내에서,  ProcessContext를 다음 파이프라인 컴포넌트로 보낼때,  c.output(데이타) 대신, c.output(데이타, 타임 스탬프)를 사용하면 된다. 타임 스탬프의 데이타 타입은  org.joda.time.Instant 를 사용한다.

예를 들어서

c.outputWithTimestamp(c.element(), logTimeStamp);

와 같은 방법으로 사용을 한다.


모바일 서비스 분석등, 실제 시간에 근접한 분석을 하려면, 로그를 수집할때, 이벤트 발생 시간을 같이 REST API등을 통해서 수집하고, outputWithTimestamp를 이용하여, 수집된 이벤트 발생시간을  PCollection에 추가하는 방식으로 타임 스탬프를 반영 하는 것이 좋다.







저작자 표시 비영리
신고

Fluentd + Bigquery + Jupyter를 이용한 초간단 BI 구축하기


조대협

얼마전에 빅데이타의 전문가로 유명한 김형준님이 "Presto + Zeppelin을 이용한 초간단 BI 구축 사례"라는 발표 자료를 보았다. http://www.slideshare.net/babokim/presto-zeppelin-bi 오픈 소스 기술들을 조합하여, 초간단하게 빅데이타 분석 플랫폼을 만든 사례 인데, 상당히 실용적이기도 하고, 좋은 조합인것 같아서, 마침 구글 빅쿼리에 대한 자료를 정리하던중 비슷한 시나리오로 BI 대쉬 보드를 만들어보았다.

Fluentd를 이용해서 실시간으로 데이타를 수집하고, 이를 빅쿼리에 저장한 다음에 iPython nodebook (aka Jupyter)로 대쉬보드를 만드는 예제이다. 일부 제품에 대한 지식이 없었음에도 불구하고 실제 설정은 대략 2시간 정도 걸렸다.


아래 이제 예제는 정상적으로 작동 하지 않습니다. 트위터에서 JSON 스키마를 변경했는데, 거기에 맞는 빅쿼리 JSON 스키마를 구하기가 어렵네요. (만들자니 귀찮고). 참고로만 사용하세요



Fluentd 설치

예제는 Google Cloud에서 Ubuntu Linux 14.x VM에서 Fluentd를 이용하여 Twitter에서 데이타를 읽은 후, 빅쿼리에 데이타를 로딩하는 시나리오이다.

VM 생성

Fluentd를 설치할 VM을 생성해보자. 구글 클라우드 콘솔에서 아래 그림과 같이 VM을 생성할때, “Identity and API access” 부분에  “Allow full access to all Cloud APIs”를 선택한다. 이를 선택해서 이 VM이 모든 구글 클라우드 API에 대한 접근 권한 (BigQuery 포함)을 가지도록 한다.


tdagent 설치

생성한 VM에 fluentd의 로그 수집 에이전트인 tdagent를 설치한다.

tdagent는 OS나, 또는 같은 OS라도 OS 버전별로 설치 방법이 다르기 때문에, 버전별 설치 방법은 http://www.fluentd.org를 참고하기 바란다.

여기서는 Ubuntu 14.x를 기준으로 진행을 하였다.

다음 명령어를 실행하면 tdagent가 설치된다.

% curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent2.sh | sh

설치한 후 에이전트를 실행해서 확인해보자. 다음 명령으로 agent를 실행한 후에,

% sudo /etc/init.d/td-agent restart

실행이 끝난 후에 다음 명령으로 설치를 확인한다.

% sudo /etc/init.d/td-agent status


참고 (tdagent 관련 명령어)

tdagent 기동 - $sudo /etc/init.d/td-agent start
tdagent 정지 - $sudo /etc/init.d/td-agent stop
tdagent 재시작 - $sudo /etc/init.d/td-agent restart
tdagent 상태확인 - $sudo /etc/init.d/td-agent status




트위터 Input 설정하기

tdagent 에이전트 설치가 끝났으면 fluentd를 이용해서 트위터 피드를 읽어드리도록 해보자.

트위터 API 키 받기

트위터 피드는 트위터에서 제공하는 OPEN API를 통해서 읽어드린다. 그래서 이 OPEN API에 접근하기 위해서는 OPEN API키가 필요하다.

OPEN API 키는 https://apps.twitter.com/ 에 접속하고 Create New App 메뉴를 이용하면 새로운 앱을 등록할 수 있고, 여기에 Fluentd 앱을 정의해서 정보를 넣어주고 Key and secrect을 생성해주면 다음과 같이 키가 생성된 것을 웹에서 확인할 수 있다.


여기서 필요한 키값은 Consumer Key, Consumer Secret, Access Token, Access Token Secret 4가지가 필요하다.

트위터 플러그인 설치하기

API 접근을 위한 API Key를 모두 얻었으면 이제 fluentd에서 트위터 피드를 읽기 위한 트위터 플러그인을 설치해보자.

트위터 API는 libssl에 대한 의존성이 있기 때문에, libssl를 먼저 설치한다.

%sudo apt-get install build-essential libssl-dev

다음 트위터 플러그인이 사용하는 eventmachine 플러그인과, 트위터 플러그인을 설치한다.

% sudo td-agent-gem install eventmachine

% sudo td-agent-gem install fluent-plugin-twitter

설정하기

플러그인 설치가 끝났으면 설정을 해보자. 설정 파일은 /etc/td-agent/td-agent.conf 에 있다.

이 파일을 다음과 같이 편집하자.


<source>

 type twitter

 consumer_key        {앞서 트위터 콘솔에서 받은 Consumer Key}

 consumer_secret     {앞서 트위터 콘솔에서 받은 Consumer  secret}

 oauth_token         {앞서 트위터 콘솔에서 받은 Access token}

 oauth_token_secret {앞서 트위터 콘솔에서 받은 Access token secret}

 tag                 input.twitter.sampling  # Required

 timeline            sampling                # Required (tracking or sampling or location or userstream)

 keyword             galaxy,game        # 검색어

 output_format       nest                   # Optional (nest or flat or simple[default])

</source>

<match input.twitter.sampling>

 type stdout

</match>


이 설정 파일은 keyword에 등록된 “galaxy”와 “game” 이라는 키워드를 찾아서, 읽어드린후 <match input.twitter.sampling> 에 의해서, 읽어드린 내용을 stdout으로 출력해주는 설정이다.

테스트

설정이 끝났으면 확인을 해보자

% sudo /etc/init.d/td-agent restart

명령어를 수행하여, td-agent를 리스타트 해서 새로운 config 파일이 반영되도록 하고

% tail -f /var/log/td-agent/td-agent.log          

를 통해서 stdout으로 올라오는 로그를 확인하자. 제대로 데이타가 수집되는 것을 확인했으면 다음 명령어를 이용해서, td-agent를 정지 시키자.

% sudo /etc/init.d/td-agent stop


빅쿼리로 저장하기

twitter로 부터 피드를 읽어드리는 플러그인이 정상적으로 작동함을 확인하였으면, 이번에는 읽어드린 데이타를 빅쿼리로 저장해보자.

빅쿼리 플러그인 설치 및 테이블 생성

빅쿼리로 데이타를 쓰기 위해서 빅쿼리 플러그인을 설치한다.

% sudo td-agent-gem install fluent-plugin-bigquery


다음으로 빅쿼리 프로젝트에서 트위터 데이타를 저장할 데이타셋과 테이블을 생성한다.

데이타 셋 이름은 편의상 “twitter”라고 하고, 테이블은 “ timeline”이라고 하고 생성을 하겠다.

테이블의 스키마는 트위터 피드에 대한 데이타 구조를 빅쿼리 스키마로 만들어놓은 스키마가 이미 https://gist.github.com/Salinger/ef39b81ad2c48516b596

에 있기 때문에, 이 스키마 파일을 읽어서 빅쿼리 콘솔에서 아래 그림과 같이 Schema 부분에 Copy & Paste를 해서 붙이면 테이블이 생성된다.


설정하기

테이블이 생성이 되었으면 fluentd 설정 파일을 수정하여 트위터 피드를 이 테이블에 저장하도록 설정한다.


<source>
 type twitter
   consumer_key        {앞서 트위터 콘솔에서 받은 Consumer Key}

 consumer_secret     {앞서 트위터 콘솔에서 받은 Consumer  secret}

 oauth_token         {앞서 트위터 콘솔에서 받은 Access token}

 oauth_token_secret {앞서 트위터 콘솔에서 받은 Access token secret}

 tag                 input.twitter.sampling  # Required
 timeline            sampling                # Required (tracking or sampling or location or userstream)
 keyword             hillary,clinton,donald,trump
 output_format       nest                    # Optional (nest or flat or simple[default])
</source>

<match input.twitter.sampling>
 type copy
<store>
  type bigquery
  buffer_type file
  buffer_path /var/log/td-agent/buffer/twi.*.buf
  method insert

  auth_method compute_engine
  project useful-hour-138023
  dataset twitter
  table timeline

  flush_interval 1
  buffer_chunk_limit 1000000
  buffer_queue_limit 5000
  flush_interval 1
  try_flush_interval 0.05
  num_threads 4
  queue_chunk_flush_interval 0.01

  time_format %s
  time_field log_time
  schema_path /home/terrycho/bq_tweet.json
  log_level error
</store>
</match>


기존 설정 파일에서 <match input.twitter.sampling> 부분을 빅쿼리로 변경하였다. <store>에서 type을 bigquery로 변경하였다.

중요한 필드들을 살펴보면

  • buffer_type, buffer_path : fluentd는 트위터에서 읽어드리는 데이타를 건건이 bigquery에 저장하는게 아니라 일정 단위로 모아서 bigquery에 저장한다. 그래서 buffer를 사용하는데, buffer를 파일을 이용하고, 이 파일의 위치를 지정해주었다.

  • auth_method, project,dataset,table : 데이타를 저장한 bigquery의 project,dataset,table 명을 정한다. 그리고 auth_method를 통해서 인증 방법을 설정하는데, 일반적으로는 service account에 대한 json 파일을 사용하는데, 여기서는 구글 클라우드내에 VM을 생성하였고, 앞에서 VM 생성시에 Bigquery에 대한 접근 권한을 이미 주었기 때문에, 인증 방식을 compute_engine으로 설정하면 된다.

  • flush_interval 은 어떤 주기로 버퍼된 데이타를 bigquery로 저장할것인지를 정한다. 여기서는 1초 단위로 저장하도록 하였다.

  • 그리고 중요한것중 하나가 schema_path 인데, 저장하고자 하는 bigquery 테이블의 스키마이다. 앞에서 테이블 생성에서 사용한 https://gist.github.com/Salinger/ef39b81ad2c48516b596 에서 다운 받았던 *.json으로 정의된 스키마 파일의 경로를 지정해주면 된다.

실행하기

모든 설정이 끝났으면

%sudo /etc/init.d/td-agent restart

명령을 이용해서 tdagent를 재기동하자.

그리고 빅쿼리 콘솔에서 “select count(*) from 테이블명” 명령을 사용하면 아래와 같이 카운트 수가 매번 올라가면서 데이타가 저장되는 것을 확인할 수 있다.


Datalab으로 대쉬보드 만들기

datalab은 오픈소스 iPython note의 구글 클라우드 버전이다. 자동으로 구글 클라우드 내의 앱앤진 내에 설치해주고, 구글 클라우드의 빅데이타 인프라 (빅쿼리등)과 손쉽게 연동되며, 구글 차트를 내장하고 있어서 그래프도 손쉽게 그려줄 수 있다.


데이타랩 준비하기

데이타랩을 사용하기 위해서는 https://datalab.cloud.google.com/ 에 접속하고, 로그인을 하면 다음과 같이 프로젝트를 선택하는 화면이 나온다.


만약에 아직 데이타랩을 설치 하지 않았으면 가운데 Deploy 버튼만 활성화가 된다. Deploy 버튼을 누르면 자동으로 데이타랩이 설치된다. 설치가 끝나면 Start 버튼이 활성화 된다. Start 버튼을 누르면 데이타 랩으로 들어갈 수 있다.

새로운 노트 만들기

다음은 데이타랩의 초기화면이다.


우리는 여기서, 새로운 노트를 만들어서 앞서 빅쿼리로 읽어드린 데이타를  lang(언어)별로 그룹핑을 해서 카운트하는 쿼리를 실행하고, 그 결과를 그래프로 만들것이다.

위의 초기화면에서 “+Notebook” 버튼을 눌려서 새로운 노트북을 만들어보자


노트화면이 로딩되었으면 상단의 메뉴를 보자


+Add code와, +Add Markdown 버튼을 볼 수 있는데,  Add Code는 파이썬이나 SQL과 같은 프로그래밍 언어를 정의하고 실행할 수 있는 공간이고, +Add Markdown은 일반적인 텍스트나 이미지를 통해서 간단한 글을 쓸 수 있는 공간을 만들어준다.

이렇게 코드써가면서 직접 실행해보고 그 결과를 확인하면서 그에 대한 내용을 설명하는 내용을 Markdown으로 작성하는 것과 같이 마치 노트에 계산을 해나가는 것처럼 써 나가기 때문에 이런 류의 프로그램을 노트북이라고 한다. (유사한 프로그램으로는 zeppelin 등이 있다 https://zeppelin.apache.org/)

쿼리 실행하기

그러면 Add code를 통해서 코드 섹션을 추가하고 SQL 문장을 추가해보자. 다음은 빅쿼리 트위터 테이블에서 lang 별로 그룹핑을 해서 카운트를 하는 SQL 문장이다.


이 문장을 실행하려면 노트북 상단의 “Run” 버튼을 누르면 된다.

다음과 같이 결과가 쿼리 바로 아래에 출력되는 것을 볼 수 있다.




그래프 그리기

다음으로 결과로 그래프를 그려보자

다음과 같이 두개의 코드 블럭을 추가하자


첫번째 코드 블럭에는 SQL 문장을 수행하는데 이때 --module twitter라고 정의를 해주면 결과가 twitter라는 모듈에 저장이 된다.

두번째 코드 블럭은 그래프를 그리기 위해서 chart 명령어를 이용하고 차트 타입은 pie로, 그래프의 x,y 축은 lang과, lang_count로 지정하고, 데이타 소스는  --date를 이용해서 앞의 쿼리 결과를 저장한 twitter로 지정한다.

다음으로 Run 버튼을 이용해서 쿼리를 수행해보면 다음과 같은 결과 화면을 얻을 수 있다.





지금까지 간략하게 Fluentd를 통해서 데이타를 수집하고 빅쿼리에 저장한 후, 데이타랩을 통해서 분석 및 리포팅을 하는 간단한 시나리오를 살펴보았다. fluentd나 데이타랩에 대한 사전적인 지식이 없었는데, 필자의 경우 이를 만드는데 대략 2시간의 시간이 소요되었다. 2시간의 시간으로 수 PB급의 빅데이타를 수집할 수 있고 분석할 수 있는 시스템을 구축할 수 있었다. 예전 같으면 하둡과 스팍 인스톨과 몇시간이 걸렸는데, 요즘 드는 생각은 빅데이타에 대한 접근 장벽이 많이 무너졌다고나 할까.

참고 자료


저작자 표시 비영리
신고

빅데이타 수집을 위한 데이타 수집 솔루션 Embulk 소개


조대협 (http://bcho.tistroy.com)


빅데이타 분석에 있어서, 아키텍쳐적으로 중요한 모듈중의 하나는 여러 서버로 부터 생성되는 데이타를 어떻게 모을 것인가이다. 얼마전에, 일본의 사례를 보다가 눈에 띄는 솔루션이 있어서 주말을 통해서 이런 저런 테스트를 해봤다.


Embulk 소개

Embulk라는 솔루션인데, fluentd를 만들었던 사람이 만들었다고 한다.

여러 종류의 데이타 소스에서 데이타를 읽어서 로딩을 할 수 있다. 주요 특징을 보면

  • 플러그인 형태로 여러개의 소스와 타겟을 지원한다.
    jRuby로 개발이 되어서 ruby gem을 이용하여 손쉽게 플러그인을 설치할 수 있다.

  • 병렬 로딩이 가능하다.
    예를 들어 여러개의 파일을 동시에 로딩하거나 또는 하나의 큰 파일이라도 자동으로 여러개의 파일로 쪼게서 병렬로 로딩을 함으로써, 로딩 속도를 올릴 수 있다.

  • 변환이 가능하다.
    파일 포맷 변환뿐 아니라, 각 필드에 대한 형 변환 그리고, 간단한 필드 맵핑 등이 가능하다.

  • 스키마 예측 (Schema guessing)
    입력 데이타를 보고, 자동으로 입력 데이타의 스키마(테이블 구조)를 예측한다. 일일이 설정을 하려면 귀찮은 일인데, 자동으로 스키마를 인식해주시기 때문에, 설정양을 줄일 수 있다.

전제적인 개념은 미니 ETL과 유사하다고 볼 수 있는데, 그 사용법이 매우 쉽다.

Embulk 설치

이 글에서는 로컬에 있는 CSV 포맷의 파일을 구글 클라우드의 빅쿼리로 로딩하는 예제를 통해서 어떻게 Embulk를 사용하는지를 알아보겠다.

VM 생성

테스트는 구글 클라우드 VM에서 진행한다. 4코어 Ubuntu VM을 생성하고 테스트 데이타를 복사하였다.

VM을 생성할때, 빅쿼리 API를 사용할 것이기 때문에, Cloud API access scopes에 BigQuery API access 권한을 반드시 부여해야 한다.


이 예제에서는 VM 생성시 모든 Cloud API에 대한 사용권한을 부여한체 생성하였다. VM을 생성한 후에, 콘솔에서 VM 상세 정보를 확인해보면 위의 그림과 같이 “This instance has full API access to all Google Cloud services.”로, 모든  구글  클라우드 API에 대한 권한을 가지고 있는 것을 확인할 수 있다.

자바 설치

구글 Ubuntu VM에는 디폴트로 자바가 설치되어있지 않기 때문에, JVM을 설치한다.

% sudo apt-get update

% sudo apt-get install default-jre

Embulk 설치

JVM 설치가 끝났으면 Embulk를 설치해보자. 다음 명령어를 실행하면 Embulk 가 설치된다.

% curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
% chmod +x ~/.embulk/bin/embulk
% echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
% source ~/.bashrc

Embulk는 ~/.embulk 디렉토리에 설치가 된다.

다음으로, 빅쿼리에 결과를 쓸 예정이기 때문에, 빅쿼리 Output 플러그인을 설치한다.

%embulk gem install embulk-output-bigquery


Embulk 로 빅쿼리에 CSV 파일 로딩하기

로딩할 데이타 살펴보기

로딩에 사용한 데이타는 게임 이벤트에 대한 데이타를 시뮬레이션 해놓은 것으로, 사용자가 NPC를 만나서 전투를 하는 각각의 이벤트를 기록해놓은 파일이다. 파일이름은 events000000000001 CSV 파일 포맷이고 총 1220395 레코드에, 243 MB의 크기이며 데이타 포맷은 다음과 같다.


파일 포맷은 다음과 같다.


eventTime,userId,sessionId,sessionStartTime,eventId,npcId,battleId,firstLogin,playerAttackPoints,playerHitPoints,playerMaxHitPoints,playerArmorClass,npcAttackPoints,npcHitPoints,npcMaxHitPoints,npcArmorClass,attackRoll,damageRoll,currentQuest

2015-11-29 01:31:10.017236 UTC,user875@example.com,688206d6-adc4-5e60-3848-b94e51c3707b,2015-11-29 01:29:20.017236 UTC,npcmissedplayer,boss15,6e4232df-26fa-22f1-fa04-465e85b34c1e,,15,3,15,15,15,15,15,15,11,,15

:


첫줄에, CSV 파일에 대한 컬럼명이 들어가고 두번째 줄 부터, “,” delimiter를 이용하여 각 컬럼을 구별하여 실 데이타가 들어가 있다.

스키마 예측을 통하여 자동으로 Config 파일 생성하기

이제, Embulk를 통해서 이 파일을 로딩하기 위해서, config 파일을 생성해보자.

Embulk에서 config 파일은 스키마 자동 예측을 통해서 자동으로 생성해낼 수 있다. Config 파일을 생성하기 위해서는 input과 output 에 대한 기본 정보를 기술해줘야 하는데, 다음과 같이 seed.yml 파일에 기본 정보를 기술한다.


in:  

 type: file  

 path_prefix: "/home/terrycho/data/events"

out:  

 type: bigquery


path_prefix에는 파일명을 정의하는데, /home/terrycho/data/events는 /home/terrycho/data/ 디렉토리내에 events*로 시작하는 모든 파일에 대해서 로딩을 하겠다는 정의이다.


seed.yml 파일 설정이 끝났으면 config 파일을 생성해보자

% embulk guess ./seed.yml -o config.yml

명령을 실행하면 아래와 같이 config.yml 파일이 생성된다.


in:

 type: file

 path_prefix: /home/terrycho/data/events

 parser:

   charset: UTF-8

   newline: CRLF

   type: csv

   delimiter: ','

   quote: '"'

   escape: '"'

   trim_if_not_quoted: false

   skip_header_lines: 1

   allow_extra_columns: false

   allow_optional_columns: false

   columns:

   - {name: eventTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: userId, type: string}

   - {name: sessionId, type: string}

   - {name: sessionStartTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: eventId, type: string}

   - {name: npcId, type: string}

   - {name: battleId, type: string}

   - {name: firstLogin, type: string}

   - {name: playerAttackPoints, type: long}

   - {name: playerHitPoints, type: long}

   - {name: playerMaxHitPoints, type: long}

   - {name: playerArmorClass, type: long}

   - {name: npcAttackPoints, type: long}

   - {name: npcHitPoints, type: long}

   - {name: npcMaxHitPoints, type: long}

   - {name: npcArmorClass, type: long}

   - {name: attackRoll, type: long}

   - {name: damageRoll, type: long}

   - {name: currentQuest, type: long}

out: {type: bigquery}


생성된 config.yml 파일을 보면 firstLogin 컬럼의 데이타 형이 string으로 되어 있는 것을 볼 수 있다. 빅쿼리 테이블에서 이 필드의 형은 실제로는 boolean이다. 아무래도 자동 인식이기 때문에, 이렇게 형들이 다르게 인식되는 경우가 있기 때문에, 생성 후에는 반드시 검토를 하고 알맞은 형으로 수정을 해줘야 한다.


다음으로 위의 파일에 데이타를 로딩할 빅쿼리에 대한 정보를 정의해줘야 한다.


in:

 type: file

 path_prefix: /home/terrycho/data/events000000000001

 parser:

   charset: UTF-8

   newline: CRLF

   type: csv

   delimiter: ','

   quote: '"'

   escape: '"'

   trim_if_not_quoted: false

   skip_header_lines: 1

   allow_extra_columns: false

   allow_optional_columns: false

   columns:

   - {name: eventTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: userId, type: string}

   - {name: sessionId, type: string}

   - {name: sessionStartTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: eventId, type: string}

   - {name: npcId, type: string}

   - {name: battleId, type: string}

   - {name: firstLogin, type: boolean}

   - {name: playerAttackPoints, type: long}

   - {name: playerHitPoints, type: long}

   - {name: playerMaxHitPoints, type: long}

   - {name: playerArmorClass, type: long}

   - {name: npcAttackPoints, type: long}

   - {name: npcHitPoints, type: long}

   - {name: npcMaxHitPoints, type: long}

   - {name: npcArmorClass, type: long}

   - {name: attackRoll, type: long}

   - {name: damageRoll, type: long}

   - {name: currentQuest, type: long}

out:

 type: bigquery

 mode: append

 auth_method: compute_engine

 project: useful-hour-138023

 dataset: gamedata

 table: game_event

 source_format: CSV


“out:” 부분을 위와 같이 수정하였다.

mode는 append 모드로, 기존 파일에 데이타를 붙이는 모드로 하였다. auth_method에는 빅쿼리 API 호출을 위한 인증 방식을 정의하는데, 구글 클라우드의 VM에서 호출하기 때문에, compute_engine이라는 인증 방식을 사용하였다. (구글 클라우드의 VM에서 같은 프로젝트 내의 빅쿼리 API를 호출할 경우에는 별도의 인증을 생략할 수 있다.) 다른 인프라드에서 호출할 경우에는 IAM에서 Service account를 생성한 후에, json  파일을 다운 받아서, json 파일 인증 방식으로 변경하고, 다운 로드 받은 json 파일을 지정해주면 된다.

다음으로, project,dataset,table에, 로딩할 빅쿼리 데이블에 대한 프로젝트명, 데이타셋명, 테이블명을 기술해주었다. 그리고 마지막으로 입력 포맷이 CSV임을 source_format에서 CSV로 정의하였다.


이제 데이타 로딩을 위한 모든 준비가 끝났다.

Config 파일 테스트

데이타 로딩을 하기 전에, 이 config 파일이 제대로 작동하는지 테스트를 해보자

%embulk preview config.yml

의 명령어는 데이타를 읽어서 제대로 파싱을 하는지 설정 파일은 문제가 없는지 테스트를 해주는 명령어이다.

명령을 실행하면 다음과 같이 일부 데이타를 읽어서 파싱을 하고 결과를 보여주는 것을 볼 수 있다.



실행하기

테스트가 끝났으면 실제로 데이타를 로딩해보자. 로딩은 아래와 같이 embulk run 명령어를 사용하면 된다.

%embulk run config.yml

실제로 실행한 결과 약 12분이 소요되었다.


멀티 쓰레드를 이용하여 로딩 속도 올리기

앞에서 설명하였듯이, Embulk는 패레럴 로딩이 지원된다. 아래와 같이 config.yml 파일에 exec이라는 부분에, max_threads수와, min_output_tasks 수를 정해주면 되는데, min_output_tasks 수는 최소로 동시 실행할 로딩 테스크 수이다. 5로 정했기 때문에, 이 시나리오에서는 하나의 CSV 파일을 업로드 하기 때문에, 이 파일을 5개의 작은 파일로 잘라서 동시에 5개의 쓰레드로 동시에 업로딩 한다.


exec:

 max_threads: 20

 min_output_tasks: 5

in:

 type: file

 path_prefix: /home/terrycho/data/events

 parser:

 :


실제로 테스트한 결과 디폴트 설정에서는 초당 약 1200줄을 업로드하였는데, 반하여, min_output_tasks를 5개로 하였을때는 초당 2000개 내외를 업로드 하였다. min_output_tasks를 10개,20개로 올려봤으나 성능은 비슷하였다. (아마 튜닝을 잘못한듯)

Parser-none으로 로딩 속도 올리기

앞의 시나리오는 데이타 라인을 각각 읽어서 컬럼을 일일이 파싱하고 이를 입력하도록 하는 시나리오였다.

만약에 CSV나 JSON 입력 파일이 빅쿼리 입력 포맷에 맞도록 이미 포매팅이 되어있다면, 일일이 파싱할 필요가 없다.

그냥 파일을 읽어서 파싱 없이 바로 빅쿼리에 insert만 하면되기 때문에, 이 경우에는 Parser를 제거하면 되는데, Parsing을 하지 않는 Parser로 embulk-parser-none이 있다.

이 Parser 다음과 같이 설치한다.

$ embulk gem install embulk-parser-none

다음 config 파일을 다음과 같이 수정한다.


in:

 type: file

 path_prefix: /home/terrycho/data/events000000000001_nohead

 parser:

   type: none

   column_name: payload

out:

 type: bigquery

 mode: append

 auth_method: compute_engine

 project: useful-hour-138023

 dataset: gamedata

 schema_file: /home/terrycho/data/gameevent.schema.json

 table: game_event

 payload_column_index: 0


이때 중요한것중 하나는 데이타 파일 (CSV)파일 첫줄에 데이타에 대한 컬럼 정보가 들어가 있으면 안된다.

그래서 아래와 같이 원본 데이타 파일에서 첫줄을 지운다.

eventTime,userId,sessionId,sessionStartTime,eventId,npcId,battleId,firstLogin,playerAttackPoints,playerHitPoints,playerMaxHitPoints,playerArmorClass,npcAttackPoints,npcHitPoints,npcMaxHitPoints,npcArmorClass,attackRoll,damageRoll,currentQuest

2015-11-29 01:31:10.017236 UTC,user875@example.com,688206d6-adc4-5e60-3848-b94e51c3707b,2015-11-29 01:29:20.017236 UTC,npcmissedplayer,boss15,6e4232df-26fa-22f1-fa04-465e85b34c1e,,15,3,15,15,15,15,15,15,11,,15

:


다음 embulk run을 이용하여 이 config 파일을 실행해보면 같은 데이타인데도 로딩 타임이 약 50초 정도 밖에 소요되지 않는 것을 확인할 수 있다.

빅쿼리 관련 몇가지 추가 옵션

이외에도 다양한 옵션이 존재하기 때문에, 빅쿼리 output 플러그인 페이지인 https://github.com/embulk/embulk-output-bigquery 를 참고하기 바란다.

자동으로 중복을 제거하는 기능이나, 로딩할때 마다 동적으로 빅쿼리 테이블을 생성하는 기능등이 있으니 반드시 참고하기 바란다.

GCS를 경유하는 업로딩

Embulk의 패레럴 로딩이 좋기는 하지만 의외의 문제가 발생할 수 있는 부분이 하나가 있는데, 하나의 파일을 로딩하는데 Embulk는 여러개의 태스크로 병렬 처리를 하기 때문에, 빅쿼리 입장에서는 각각의 태스크가 빅쿼리 로딩 JOB으로 인식이 될 수 있다. 일반적으로 빅쿼리 JOB은 하루에 10,000개만 실행할 수 있는 제약을 가지고 있다. 그래서 만약에 데이타 로딩이 많을 경우 이런 병렬 로딩은 JOB 수를 깍아 먹는 원인이 될 수 있는데, bigquery output 플러그인에서는 다음과 같은 해법을 제공한다.


빅쿼리로 데이타를 로딩할때 GCS (Google Cloud Storage)를 사용하여, 와일드카드 (*)를 사용할 경우에는 하나의 디렉토리에 있는 여러 파일을 병렬로 로딩할 수 있으며, 이때 와일드 카드를 사용한 JOB은 하나의 JOB으로 인식된다. (병렬로 여러 파일을 로딩하더라도)


그래서 out 옵션에 다음과 같이 GCS  관련 옵션을 설정해주면 파일을 직접 로컬에서 로딩하는 것이 아니라, 처리를 다 끝난 파일을 GCS 버킷으로 업로딩한 후에, GCS 버킷에서 로딩을 하게 되기 때문에, JOB수를 줄일 수 있다.


out:

 type: bigquery

 gcs_bucket: bucket_name

 auto_create_gcs_bucket: false


성능과 활용도에 대한 분석

각 시나리오에 대한 성능 테스트 결과 값은 다음과 같다.

CSV를 구글에서 제공되는 bq load 명령어를 이용해도 108초가 나오는데 반해서, non-parser를 이용하면 파일을 자동으로 쪼게서 보내기 때문에, bq load를 이용하여 하나의 파일로 업로드 하는 것보다 높은 성능이 나온다.


시나리오

성능

bq load 명령어를 이용한 로딩

108초

CSV 파서를 사용한 경우

12분

non parser를 사용한 경우

50초


하나 고려할 사항은 Parser나 Filter의 경우 ruby로 개발된 것이 있고, java로 개발된 것들이 있는데, ruby로 개발된 플러그인의 경우 성능이 java 대비 많이 느리기 때문에 가급적이면 java로 개발된것을 사용하도록 한다.


다양한 데이타 소스와 저장소가 지원이 되고, 설정이 매우 간단하며 간단한 포맷 변환등이 지원되는 만큼, 쉽고 빠르게 데이타 연동 파이프라인을 구축하는데 활용도가 매우 높다. 이와 유사한 솔루션으로는 fluentd등이 있는데, fluentd는 조금 더 실시간 즉 스트리밍 데이타에 초점이 맞춰져 있으며, Embulk는 배치성 분석에 맞춰져 있다.


참고 자료


저작자 표시 비영리
신고


구글 클라우드의 대용량 메세지 큐 Pub/Sub 소개

조대협 (http://bcho.tistory.com)




구글 클라우드의 Pub/Sub 은 클라우드 기반의 대용량 메세지 큐이다.

흔히들 사용하는 RabbitMQ, JMS나 Kafka의 클라우드 버전으로 보면 된다. Rabbit MQ와 같은 설치형 큐가 작은 메세지에 대해서 세심한 컨트롤을 제공한다고 하면, Kafka나 Pub/Sub은 대용량 스케일의 메세지를 처리하기 위해서 설계 되었고, 자잘한 기능보다는 용량에 목적을 둔다.

그중에서 Pub/Sub은 클라우드 기반의 서비스로 비동기 메세징이 필요한 기능을 매니지드 서비스 형태로 제공함으로써, 별도의 설치나 운영이 필요 없이 손쉽게, 사용이 가능하다.

보통 특정 클라우드 벤더의 매지니드 솔루션은 Lock in 이슈 (한번 개발하면 다른 플랫폼으로 옮기기가 어려운)가 있어서 쉽사리 권하기는 어렵지만, 사용법이 간단하고 Lock in을 감수하고도 기능이 막강한 서비스나, 타 서비스로 전환이 쉬운 서비스일 경우에는 적극적으로 권장하는 편이다.


Pub/Sub의 경우 대용량 큐 서비스이기 때문에 Kafka 처럼 설치나 운영이 필요없음에도 불구하고 대용량 처리를 지원하면서 사용이 매우 쉽고, 코딩 양이 매우 적어서 차후에 다른 솔루션으로 교체가 용이하고 또한 대용량 장점과, 운영 대행의 장점으로 Lock in에 대한 단점을 충분히 커버하리라고 본다.

특징

주요 특징을 살펴보면, 글로벌 스케일 큐로, 전세계 어느 데이타 센터에서 접속하던지 구글 자체 광케이블망을 이용하여 빠른 접근을 제공한다.

메세지 전달 보장을 기능이 있으며, 큐에서 메세지를 PULLING 하는 기능뿐만 아니라, 큐가 메세지를 받는 쪽으로 HTTP를 이용하여 PUSH 해줄 수 있다.

토폴로지

구글 Pub/Sub 은 Message Provider (보내는쪽)과 Message Consumer (받는쪽)이 1:1 관계가 아니라. 1:N 관계이다.


Pub/Sub에는 Topic과 Subscription이라는 개념이 존재하는데,  Topic 을 큐로 생각하면 된다.

Message Provider가 Topic으로 메세지를 보내게 되고, 메세지를 읽으려면 Subscription 이라는 구독 채널을 설정해야 한다. Subscription은 하나의 Topic에 대해서 1..N개가 생성될 수 있다.

클라이언트는 각각의 Subscription에 붙어서 메세지를 받을 수 있다.

예를 들어서 하나의 메세지를 로그 시스템과 데이타 베이스 양쪽에 저장하고 싶을때는 Topic을 만든 후에, 로그 시스템용 Subscription, 데이타 베이스용 Subscription을 각각 만들어서 데이타를 읽으면 된다.


클라이언트 인터페이스

구글 Pub/Sub의 연동은 크게 다음과 같이 3가지 방법으로 접근이 가능하다.

메세지 구조와 생명 주기

Pub/Sub에 넣을 수 있는 메세지는 간단하다. String 형태의 메세지를 넣을 수 있으며, 메세지의 크기는 base64-encoding이 된 기준으로 최대 10M까지 지원이 된다.

메세지는 Message 와, Message Attribute  두가지 블럭으로 구분된다. 비교해서 이해하자면 Message 는 HTTP BODY, Message Attribute는 HTTP Header와 같은 개념으로 생각하면 되는데, Message는 통째로 TEXT가 들어가고, Message Attribute는 Key/Value 형태로 각각의 필드가 들어간다.  

생명주기 및 재처리 정책

메세지 생명 주기가 재미있는데, 먼저 Push로 받거나 Pull로 받은 메세지는 큐에서는 일단은 보이지 않는다. (다시 가지고 올 수 없다는 이야기). 메세지 처리가 끝난 후에는 클라이언트는 Pub/Sub으로 Acknowlege를 보내야 하는데, 만약에 정해진 시간 (이를 message acknowlegement deadline이라고 하고 디폴트는 10초)내에 ack를 주지 않으면, 그 메세지는 다시 Pub/Sub으로 들어간다.  이 acknowlegement를 통해서 메세지 전달 보장이 가능하다.


다시 Pub/Sub으로 돌아간 메세지는 Publishing time으로 부터 최대 7일까지 보관이 되서 클라이언트에서 다시  읽어드릴 수 있다.


https://cloud.google.com/pubsub/subscriber#ack_deadline


순서 보장

Pub/Sub 큐에 들어온 메세지는 Consumer에서 읽어드릴때, Pub/Sub에서 보낸 순서대로 읽을 수 없고, 랜덤한 순서로 전달된다. 즉 전달 순서 보장이 되지 않는다. 이는 Pub/Sub이 기본적으로 분산형 아키텍쳐를 띄고 있기 때문에, 내부에 어떤 노드로 데이타가 전달되는지, 그리고 각 노드중 어느 노드에서 데이타를 읽는지 예측이 불가능하기 때문이다.

메세지 전달 방식 (Message delivery type)

Pub/Sub은 일반적은 큐 시스템과 다르게 메세지를 Subscriber가 읽어오는 Pull 방식 이외에, Pub/Sub이 직접 Subscriber에게 메세지를 쏴주는 Push 방식을 같이 지원한다.

Pub/Sub 테스트 하기

대략적인 개념 이해가 끝났으면, 이제 실제 테스트를 해보자

Topic 생성하기

구글 클라우드 콘솔에서 Pub/Sub을 선택한 후, 아래 그림과 같이 메뉴에 들어오면, Create Topic 메뉴를 선택하여 Pub/Sub Topic을 생성한다.


여기서는 아래 그림과 같이 “mytopic”이라는 이름으로 토픽을 생성하겠다.


토픽명은 “projects/{프로젝트명}/topcis/{토픽명}” 식으로 생성된다. 이 예제에서 사용한 프로젝트명은 terrycho-sandbox이고, 지정한 토픽명은 “mytopic”이기 때문에, topic의 전체 이름은 “projects/terrycho-sandbox/topcis/mytopic”이 된다.

Subscription 생성하기

이제 앞에서 생성한 Topic으로 부터 메세지를 읽어드리기 위해서 Subscription을 생성해보자.

Pub/Sub 메뉴에서 아래와 같이 앞서 생성한 Topic을 확인할 수 있다. 이 메뉴에서 생성한 Topic의 “+New subscrition”이라는 버튼을 선택하면 새로운 Subscription을 생성할 수 있다.


아래 그림과 같이 subscription 생성화면에서 subscription 이름을  mysubscription으로 지정하자.

topic과 마찬가지로 subscription의 full name 역시 “projects/{프로젝트명}/subscriptions/{서브스크립션명}” 이 된다.


그리고 Delivery type (메세지 전달 방식)은 Pull을 선택한다.

아래 그림과 같이 Advanced option에서  Acknowlegement deadline을 설정할 수 있는데, 건들지 말고 디폴트 10초로 놔둔다.



메시지 보내보기

메세지를 보내는 테스트를 하기 위해서는 클라우드 콘솔 Pub/Sub 메뉴에서 앞에서 생성한 Topic을 선택하면 아래 그림과 같이 “Publish” 버튼이 나온다.


Publish 버튼을 누르면 아래와 같이 메세지를 직접 입력할 수 있는 창이 나온다.


위의 그림과 같이 Message 창에 보내고 싶은 메세지를 적고 Publish버튼을 누르면 Pub/Sub에 메세지가 퍼블리슁 된다.

보낸 메세지 읽어드리기

이제 퍼블리슁된 메세지를 읽어보자. 메세지는 gcloud라는 구글 클라우드 클라이언트를 이용해서 할것인데, 설치 방법은 https://cloud.google.com/sdk/gcloud/ 를 참고하면된다.

설치가 귀찮은 경우에는 아래 그림과 같이 구글 클라우드 콘솔의 상단 부분에 우측에 “>.” 이렇게 생긴 아이콘을 누르면 된다.



클라우드 쉘이라는 것인데, 구글 클라우드에 대해서 Command Line으로 명령을 내릴 수 있는 기본 명령어들이 미리 깔려있는 Linux 접속창이다.



pub/sub 은 아직 알파 버전이기 때문에, gcloud를 업그레이드 해서 alpha 버전 명령어가 수행이 가능하도록 해야 한다.

다음 명령어를 실행하자

%gcloud components install alpha

이제 gcloud 명령어가 업데이트 되었다. 이제 Pub/Sub topic에서 데이타를 읽어와보자

다음 명령어를 실행하면 mysubscrition에서 메세지를 읽어올 수 있다.

gcloud alpha pubsub subscriptions pull projects/terrycho-sandbox/subscriptions/mysubscription

다음은 명령을 실행해서 데이타를 읽어온 결과 이다.




10초 정도 후에 같은 명령어 실행해보면 같은 메세지가 리턴되는 것을 볼 수 있는데, 이는 ack를 주지 않았기 때문이다. gcloud에서 자동으로 ack를 보내는 방법은 명령어에 --auto-ack라는 옵션을 추가하면 된다.

옵션을 추가하고 명령을 실행해보자

gcloud alpha pubsub subscriptions pull projects/terrycho-sandbox/subscriptions/mysubscription --auto-ack

아래 결과와 같이, 첫번째 실행에서는 메세지가 도착하지만, 두번째 실행에서는 같은 메세지가 도착 하지 않는 것을 확인할 수 있다.


이 밖에도 gcloud 명령으로 하나의 메세지 뿐 아니라 한번에 여러개의 메세지를 리턴받을 수 도 있고, 여러개의 메세지를  pagination을 통해서 리턴 받을 수 도 있다. 자세한 옵션은 https://cloud.google.com/sdk/gcloud/reference/alpha/pubsub/subscriptions/pull 를 참고하기 바란다.


클라우드 웹 콘솔과, gcloud 명령어를 이용해서, 메세지를 퍼블리슁하고 읽어들이 것을 알아보았다. 다음 글에서는 실제로 SDK를 이용해서 메세지를 퍼블리슁하고 읽어들이는 예제를 소개하도록 하겠다.

저작자 표시 비영리
신고


데이타 스트리밍 처리에 대한 이해


조대협 (http://bcho.tistory.com)


근래에 Apache Beam 프로젝트를 공부하게 되서, 그간 묵혀놨던 데이타 스트리밍 처리에 대해서 다시 정리중인데, 예전에 Apache Storm을 봤을때 보다 트리거나, 윈도우등 많은 개념들이 들어가 있어서 데이타 스트리밍에 대한 개념 부터 다시 정리를 시작을 하고자한다.


Apache Storm에서 부터, Apache Spark 기반의 데이타 스트림 처리뿐 아니라 근래에는 Apache Flink와 같은 새로운 스트리밍 프레임웍크과 구글이 이미 클라우드를 통해서 서비스 하고 있는  google cloud dataflow (Apache Beam이라는 프로젝트로 오픈소스화 되었고, 현재 인큐베이션 단계에 있다.) 까지 빅데이타에 대한 실시간 처리성이 강조되면서 근래에 데이타 스트리밍 처리가 다시 주목 받는 것 같다. 이 문서는 구글이 개발한 dataflow에 대한 개념을 이해하기 위함이다.


본 문서의 내용과 그림은 https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101 를 참고하였다.


사전 개념 이해

스트리밍 데이타 처리를 이해하기 위해서는 몇몇 용어와 개념을 사전에 이해해야 하는 부분이 있다.

Bounded data 와 Unbounded data

먼저 스트리밍 데이타 처리를 이해하려면 데이타의 종류에 대해서 먼저 이해해야 한다.

  • Unbounded data 는 데이타의 수가 정해져있지 않고 계속해서 추가되는, 즉 끊임 없이 흘러 들어오는 데이타라고 볼 수 있다. 예를 들어서 모바일 디바이스에서 계속 올라오는 로그, 페이스북이나 트위터의 타임 피드, 증권 거래 주문 같이 계속 해서 들어와서 쌓이는 데이타를 Unbounded data 라고 한다.

  • Bounded data는 데이타가 딱 저장되고 더 이상 증거나 변경이 없는 형태로 계속 유지되는 데이타를 뜻한다. 1월의 정산 데이타.

Event time과 Processing time

데이타의 발생 시간과 시스템에서 처리되는 시간이 차이가 있는데, 이를 각각 Event time과 Processing time 이라고 정의한다.

예를 들어, 게임에서 사용자가 공격을 한 이벤트를 서버에 전달해서 처리하여 저장하는 시나리오가 있다고 가정할때, 공격 이벤트가 1:00:00에 발생했으면, 이 데이타가 네트워크를 타고 서버로 도달하여 프로그램 로직을 수행하고 저장하는데 소요된 시간을 2초라고 가정하면, Event time 은 1:00가 되고, Processing time은 1:00:02가 된다.

이상적으로는 Event time과 Processing time이 동일하면 좋겠지만, 네트워크 시간이나 처리 시간에 따라 Processing time이 Event time 보다 늦고, 또한 Processing time에서 소요되는 실제 처리 시간은 일정하지 않고 아래 그림의 파란색 그래프(실제 처리 그래프) 처럼 들쭉 날쭉하다. 네트워크 상황이나, 서버의 CPU, IO 상황이 그때마다 차이가 나기 때문이다.


아래 그림을 통해서 개념을 다시 정리해보면,

X축은 Event time, Y축은 Processing Time이다. 0초에 발생한 데이타가 서버에 도착해서 처리하는 시간이 소요 되기 때문에, 아래 그림과 같이 Processing Time은 2초 부터 시작한다. Skew는 Event time과 Processing time간의 간격이다. 아래 그림에서 보면, Processing time에서 3초때에는 Event time 1초에서 발생한 데이타를 처리하고 있는데, 실제 Event time에서는 3초 시간의 데이타가 발생하고 있기 때문에, Processing time과 Event time은 약 2초의 지연이 발생하고 있고, 이를 Skew 라고 한다.



Bounded data의 처리

Bounded data는 이미 저장되어 있는 데이타를 처리하는 것이기 때문에 별다른 처리 패턴이 필요없다



데이타를 읽어서 한번에 처리해서 저장 하면 된다.

UnBounded data 처리

복잡한 것은 스트리밍 데이타 즉, Unbounded data 를 처리하는 방법인데, Unbounded data 는 크게 Batch와 Streaming 두 가지 방식으로 처리할 수 있다.

Batch로 처리

배치로 Unbounded data를 처리 하는 방식은 아래와 같이 두 가지 방식이 있다.

Fixed Windows

Fixed Windows 방식은 스트리밍으로 들어오는 데이타를 일정 시간 단위로 모은 후, 배치로 처리 하는 방식이다. 예를 들어서 아래 그림과 같이 10~11시 까지 데이타를 수집한후, 11시 이후에, 10~11시까지 들어온 데이타를 처리해서 정리 하는 방식이다.



이 방식은 구현이 간단하다는 장점이 있지만, 데이타가 수집 된 후 처리를 시작하기 때문에, 실시간성이 떨어진다. (근 실시간)

Streaming 처리

Unbounded 데이타를 제대로 처리하려면 스트리밍 처리를 하는 것이 좋은데, 스트리밍 처리 방법에는 아래와 같이 크게 Time agnostic, Filtering, Inner Join, Windowing 방식등이 있다.


스트리밍 처리는 배치 처리에 비해서 복잡한 것이, Unbounded 데이타는 기본적으로 특성이 Skew가 환경에 따라 변화가 심하고 그래서 데이타가 시스템에 도착하는 순서 역시 순차적으로 도착하지 않고 들쭉 날쭉 하다.

Time agnostic

Time agnostic 이란, 데이타가 시간 속성을 가지고 있지 않는 데이타 이다. 들어오는 데로 처리를 하면 되기 때문에, 별다른 노하우가 필요 없지만, 하나의 데이타 형이기 때문에 간단하게 언급만 한다.

Filtering

다음으로 많이 사용 되는 것이 필터링인데, 들어오는 데이타 중 특정 데이타만 필터링 해서 저장 하는 구조이다.


예를 들면, 웹 로깅 데이타를 수집해서, 특정 IP나 국가 대역에서 들어오는 데이타만 필터링해서 저장하는 시나리오등이 될 수 있다.

Inner joins (교집합)

Inner join은 두개의 Unbounded 데이타에서 들어오는 값을 서로 비교하여 매칭 시켜서 값을 구하는 방식이다.



모바일 뉴스 앱이 있다고 가정할때, 뉴스 앱에서는 사용자가 어떤 컨텐츠를 보는지에 대한 데이타를 수집 전송하고, 지도 앱에서는 현재 사용자의 위치를 수집해서 전송한다고 하자.

이 경우 사용자별 뉴스 뷰에 대한 Unbounded data 와, 사용자별 위치에 대한 Unbounded data 가 있게 되는데, 이 두개의 데이타 스트림을 사용자로 Inner Join을 하면 사용자가 어떤 위치에서 어떤 뉴스를 보는지에 대해서 분석을 할 수 있다.

Inner join을 구현하기 위해서는 양쪽 스트림에서 데이타가 항상 같은 시간에 도착하는 것이 아니기 때문에, 반대쪽 데이타가 도착할때 까지 먼저 도착한 데이타를 임시로 저장할 버퍼 영역이 필요하고, 이 영역에 임시로 일정 기간 데이타를 저장하고 있다가 반대쪽 스트림에서 데이타가 도착 하면 이를 조인해서 결과를 저장하고, 버퍼 영역에서 두개의 데이타를 삭제한다.

만약에 반대쪽의 데이타가 도착하지 않으면, 이 버퍼 영역에 데이타가 계속 쌓이기 때문에, 일정 기간이 지나면 반대쪽 스트림에서 데이타가 도착하지 않은 데이타를 주기적으로 삭제 해주는 (garbage collection) 정책이 필요하다.


cf. Inner join (교집합), Outer join (합집합)

Approximation algorithms (근사치 추정)

근사치 추정 방식은 실시간 데이타 분석에서 많이 사용되는데, 실시간 분석에서는 전체 데이타를 모두 분석할 수 있는 시간이 없는 경우가 많고, 시급한 분석이 필요한 경우가 있기 때문에, 전체 데이타를 분석하지 않고 일부만 분석하거나 또는 대략적인 데이타의 근사값만을 구하는 방법으로 해서, 빠르게 데이타를 분석하는 경우가 있다. 이를 근사치 추정 방식이라고 하는데, 예를 들어 VOD 서비스에서 지금 10분간 인기있는 비디오 목록, 12시간 동안 가장 인기 있는 판매 제품등 과 같은 시나리오인데, 이런 시나리오에서 데이타는 아주 정확하지 않아도 근사 값만 있으면 되고, 데이타를 그 시간에 보는 시급성이 중요하다.  이러한 시나리오에서는 전체 데이타를 다 보고 분석이 어렵기 때문에, 샘플링을 하거나 대략적인 근사 값만을 구해서 결과를 낸다.


이런 근사치를 추정하는 알고르즘은 K-means나 Approximate Top-N등이 이미 정의되어 있는 알고리즘이 많다.


참고 자료 :

Storm을 이용한 근사치 구하기 : https://pkghosh.wordpress.com/2014/09/10/realtime-trending-analysis-with-approximate-algorithms/

Apache Spark에서 K means로 근사치 구하기 :

https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html


Windowing

실시간 스트리밍 데이타 처리에서 중요한 개념중의 하나는 Windowing 인데, Windowing 이란 스트리밍 데이타를 처리할때 일정 시간 간격으로 처리하는 것을 정의한다.

예를 들어, 10분 단위의 Windowing의 경우 1시~2시까지 들어온 데이타를 1:10, 1:20,1:30, …  단위로 모아서 처리한다.

윈도우에는 자르는 방법에 따라서 다음과 같이 몇가지 방법이 있다.

Fixed Windows

정확하게 일정 시간 단위로 시간 윈도우를 쪼게는 개념이다. 앞에서 언급한 예와 같이 윈도우 사이즈가 10분 일때, 1시 10분은 1시00분~1시10분까지의 데이타를, 1시 20분은 1시10분~1시20분까지의 데이타를 처리한다.

Sliding Windows

Sliding Window 방식은 윈도우가 움직이는 개념이다.

슬라이딩 윈도우의 개념은 현재 시간으로 부터 +-N 시간 전후의 데이타를 매 M 시간 마다 추출 하는 것을 슬라이딩 윈도우라고 하고, 이 윈도우들은 서로 겹치게 된다.

예를 들면 현재시간으로부터 10분 전에서 부터  측정시간까지의 접속자를 1분 단위로 측정하는 시나리오가 될 수 있다. 매 1분 간격으로, 데이타를 추출하고, 매번 그 시간으로부터 10분전의 데이타를 추출하기 때문에 데이타가 중첩이 된다.  

이렇게 추출하는 간격을 Period (앞에서 1분), 그리고 추출하는 기간을 Length 또는 Size (앞에서 10분)라고 한다.



출처 : https://cloud.google.com/dataflow/model/windowing#sliding-time-windows

Session

다음으로는 Session Window의 개념이다.

Session Window에는 사용자가 일정 기간동안 반응이 없는 경우(데이타가 올라오지 않는 경우)에 세션 시작에서 부터, 반응이 없어지는 시간 까지를 한 세션으로 묶어서 처리한다

예를 들어서 세션 타임 아웃이 20분이라고 하고 데이타가 1:00 부터 올라오고 있는데,  1:01, 1:15에 데이타가 올라오고, 1:40분에 데이타가 올라오면 1:15 이후에 20분동안 (1:35까지) 데이타가 올라오지 않았기 때문에, 1:00,1:01,1:15은 하나의 세션으로 되고, 1:40은 새로운 세션 시작이 된다.



출처 : https://cloud.google.com/dataflow/model/windowing#session-windows


시간대별 Window 처리 방식

스트리밍 데이타에서 윈도우를 사용할때, 어느 시간을 기준 시간으로 할것인가를 정해야 하는데, 데이타가 시스템에 도착하는 Processing time을 기준으로 할 수 있고 또는  데이타가 실제 발생한 시간인 Event time을 기준으로도 할 수 있다.

Processing time based windowing

Processing time을 기준으로 데이타를 처리하는 것은 크게 어렵지 않다. 데이타가 도착한 순서대로 처리해서 저장하면 된다.


Event time based windowing

문제는 Event time을 기준으로 데이타를 처리 하는 경우인데, 데이타가 들어오는 것이 순서대로 들어오지 않는 경우가 많고, 또한 데이타의 도착 시간또한 일정하지 않다.




이 그림은 Event time을 기준으로 데이타를 처리하는 개념인데, 좌측 하얀색 화살표 처럼 12:00~13:00에 도착한 데이타가 11:00~12:00에 발생한 데이타 일 경우, 11:00~12:00 윈도우에 데이타를 반영해줘야 한다.

이러한 Event time 기반의 스트리밍 처리는 아래와 같이 기술적으로 두가지 주요 고려 사항이 필요하다.

  • Buffering
    늦게 도착한 데이타를 처리해야 하기 때문에. 윈도우를 일정시간동안 유지해야 한다. 이를 위해서 메모리나 별도의 디스크 공간을 사용한다.

  • Completeness
    Buffering을 적용했으면 다른 문제가 얼마 동안 버퍼를 유지해야 하는가?
    즉 해당 시간에 발생한 모든 데이타는 언제 모두 도착이 완료(Completeness) 되는가? 를 결정하는 것이다. 정확한 완료 시점을 갖는 것은 사실 현실적으로 힘들다. 버퍼를 아주 크게 잡으면 거의 모든 데이타를 잡아낼 수 있겠지만, 버퍼를 아주 크게 잡는 것이 어렵기 때문에, 데이타가 언제 도착할 것이라는 것을 어림 잡아 짐작할 수 있는 방법들이 많다. (예를 들어 워터마크 기법 같은 것이 있는데, 이는 다음글에서 설명하도록 한다.)


지금까지 실시간 데이타 분석에 사용되는 대략적인 개념을 알아보았다. 다음 글에서는 Apache Beam을 이용하여 이러한 실시간 데이타 분석을 어떻게 구현하는지 알아보도록 하겠다.



참고 자료

http://data-artisans.com/how-apache-flink-enables-new-streaming-applications-part-1/

https://cloud.google.com/dataflow/blog/dataflow-beam-and-spark-comparison#game-stats-advanced-stream-processing


저작자 표시 비영리
신고

빅쿼리 #2-아키텍쳐


조대협 (http://bcho.tistory.com)


이번글에서는 앞에서 소개한 구글의 대용량 데이타 저장/분석 시스템인 빅쿼리의 내부 아키텍쳐에 대해서 알아보도록 한다.

컬럼 기반 저장소

다음과 같은 테이블이 있다고 하자




전통적인 데이타 베이스는 파일에 물리적으로 데이타를 저장할때 개념 적으로 다음과 같은 방식으로 저장한다.


FILE 1 : “001;Cho;Terry;Seoul;30,002;Lee;Simon;Suwon;40,003;Kim;Carl;Busan;22”


그래서 하나의 레코드를 가지고 오면 그 레코드에 해당하는 모든 값을 가지고 올 수 있다.

반면 컬럼 기반 저장소의 경우에는 각 컬럼을 다음과 같이 다른 파일에 나눠서 저장한다.


FILE 1: 001:Cho,002:Lee,003:Kim

FILE 2: 001:Terry,002:Simon,003:Carl

FILE 3: 001:Seoul;002:Suwon:003:Busan

:


이렇게 컬럼 단위로 저장을 하게 되면 장점은 Last Name 컬럼의 내용만 읽고 싶을때 로우 단위로 저장한 시스템의 경우, 모든 데이타를 다 스캔해야 하는데, 컬럼 단위로 저장한 시스템은 Last Name을 저장한 파일 하나만 스캔하면 되고, Last Name 이외의 다른 컬럼의 데이타는 읽어드리지 않아도 되기 때문에 데이타 효율성이 높다.


그래서 특정 컬럼만 읽어서 개수를 세거나 통계를 내는 분석용 데이타 베이스(OLAP)  작업등에 유리하다.

Columnar databases boost performance by reducing the amount of data that needs to be read from disk, both by efficiently compressing the similar columnar data and by reading only the data necessary to answer the query.

In practice, columnar databases are well-suited for OLAP-like workloads (e.g., data warehouses) which typically involve highly complex queries over all data (possibly petabytes). However, some work must be done to write data into a columnar database. Transactions (INSERTs) must be separated into columns and compressed as they are stored, making it less suited for OLTP workloads. Row-oriented databases are well-suited for OLTP-like workloads which are more heavily loaded with interactive transactions.”

출처 : https://en.wikipedia.org/wiki/Column-oriented_DBMS

실제로 빅쿼리도 하나의 파일에 하나의 컬럼 데이타만 저장하도록 되어 있다.

빅쿼리는 앞의 글에서도 설명하였듯이 데이타의 안정성을 위해서 3개의 데이타 센터에 나눠서 데이타를 저장하는데, 각 컬럼을 저장하는 파일이 총 3개의 복제본으로 그림과 같이 각각 다른 데이타 센터에 나줘서 저장이 된다.


트리 구조의 데이타 처리 구조

실제로 빅쿼리의 시스템 구조는 다음과 같이 되어있다.



출처 https://codelabs.developers.google.com/codelabs/cp100-big-query/#3


GFS (Google File System)의 후속 파일 시스템인 분산 스토리지 Colossus가 맨 아래에서 저장소를 제공하고, Jupiter 라는 TB 급의 네트워크 망을 통해서, 컴퓨팅 노드와 통신을 한다.

그리고 연산을 담당하는 컴퓨팅 계층은 Leaf , Mixer 1, Mixer 0 계층으로 되어 있다.

이 컴퓨팅 계층은 디스크 없이 Colossus에서 읽은 데이타를 처리해서 각각 위의 계층으로 올리는 역할을한다.


다음 SQL이 어떻게 연산이 수행되는지를 살펴보면 아키텍쳐를 이해할 수 있다.




natality라는 테이블에서 1980~1990년대에 태어난 아이들의 수를 주(state)별로 그룹핑해서 내림차순으로 정렬한 후 상위 10개의 데이타만 출력해주는 쿼리이다.


이 쿼리를 수행하면 빅쿼리는 내부적으로 다음과 같은 연산을 수행한다.



  • STORAGE : 디스크에서 STATE,YEAR 컬럼만을 읽어드린다.

  • LEAF : 다음 각각의  LEAF 노드에서, 읽어드린 데이타를 가지고 1980~1990년대 데이타를 주 단위로 태어난 아이들의 수를 카운팅 한다.

  • MIXER 1:에서 LEAF에서 계산해온 주(STATE) 별 아이들의 수를 합친다.

  • MIXER 0:에서는 MIXER 1에서 올라온 모든 값을 합치면서 소팅을 한다. (머지 소트를 생각하면 된다.). 소팅이 끝난 후에, 맨 위의 10개의 레코드만을 리턴한다.


조인이나 그룹핑의 경우 조금 더 복잡한 연산이 실행되기는 하지만 큰 흐름은 유사하다.

다른 구조적인 특징

빅쿼리가 대용량과 성능을 지원하기는 하지만 일반적인 데이타 베이스와 다소 다른 특성을 가지고 있다.

NO-KEY,NO-INDEX (FULL SCAN ONLY)

빅쿼리에는 키나 인덱스의 개념이 없다. 무조건 풀스캔이다. (스캔의 범위를 조정할 수 있는 기법이 있는데 이는 나중에 설명하도록 한다.)

NO UPDATE,DELETE ROW

빅쿼리는 성능을 위해서 테이블에 데이타를 추가 (APPEND)하는 것만을 지원한다. 한번 입력된 데이타는 변경되거나 삭제될 수 없으며, 데이타가 잘못 입력되었을 경우에는 테이블을 지우고 다시 생성해야 한다. (이 문제를 해결하는 디자인 패턴에 대해서는 나중에 설명한다.)

EVENTUAL CONSISTENCY

데이타를 3개의 데이타 센터에 걸쳐서 복제하기 때문에, 데이타를 입력한 후 바로 조회하면 데이타가 조회가 되지 않을 수 있다. 이는 데이타를 복제하는데 소요되기 때문인데, 보통 바로 반영되거나 상황에 따라서 최대 수분이 걸릴 수 있다.


다음 글에서는 빅쿼리의 데이타 모델과 인터페이스에 대해서 알아보도록 한다.


참고 자료



저작자 표시 비영리
신고



구글 빅데이타 플랫폼 빅쿼리 소개


조대협 (http://bcho.tistory.com)


구글의 클라우드 관련 기술중 무엇이 좋은게 있을까 살펴 보면서 기술을 하나하나씩 보다 보니, 구글 클라우드의 특징은 여러가지가 있겠지만, 데이타 회사 답게 빅데이타 및 머신 러닝 플랫폼이 상당히 강하다.


그중에서 빅데이타 플랫폼의 중심에 BIG QUERY라는 빅데이타 플랫폼이 있어서, 몇 회에 걸쳐서 빅쿼리에 대해서 소개해보고자 한다.

구글 빅데이타 분석의 역사

구글은 빅데이타를 다루면서, 그 근간이 되는 기술들의 논문들을 공개했다. 하둡 파일 시스템의 시초가 되는 GFS나, 하둡의 시초인 MapReduce 논문, 그리고 Hive를 통해 오픈소스화가 된 Big Table등의 논문들이 있다. 구글의 빅쿼리는 Dremel 이라는 논문을 근간으로 한다.

빅쿼리랑 무엇인가?

빅쿼리는 페타 바이트급의 데이타 저장 및 분석용 클라우드 서비스이다.

요즘은 페타바이트급의 data warehouse로 부르는데, 쉽게 말해서 페타바이트급의 데이타를 저장해놓고, 쿼리를 통해서 조회나 통계 작업등을 할 수 있는 DB(라고 보기에는 약간 애매하지만)이다.

빅쿼리의 특징

대략적인 특징을 살펴보면 다음과 같다.

클라우드 서비스로 설치/운영이 필요 없음 (NoOps)

어디에 설치해서 사용하는 서비스가 아니라 구글 클라우드 서비스를 통해서 제공되는 빅데이타 저장 분석 서비스이다. 클릭 몇번으로 서비스 사용이 가능하고, 별도의 설정이나 운영이 필요 없다.

SQL 언어 사용

기존의 RDBMS에서 사용되는 SQL언어를 그대로 사용하기 때문에, 사용이 매우 쉽다.

클라우드 스케일의 인프라를 통한 대용량 지원과 빠른 성능

빅쿼리의 성능이나 스케일을 보려면 다음 예제를 보는게 좋다.

https://cloud.google.com/blog/big-data/2016/01/anatomy-of-a-bigquery-query


위키피디아에서 100 billion record (1000억개)의 레코드를 스캐닝해서 regular expression으로 “G.*o.*o.*g”) 문자열을 찾아내서 그 문서의 뷰수를 카운트 하는 예제이다.

대략 4TB 용량의 데이타가 핸들링 되고, 약 30초가 소요된다.

30초 동안, 약 3,300개의 CPU와, 330개의 하드 디스크, 330 Gigabit의 네트웍이 사용된다.

(자료 : https://cloud.google.com/blog/big-data/2016/01/bigquery-under-the-hood)

이 쿼리를 수행하는데 소요되는 비용은 딱 $20가 소요된다.

일반적인 인프라에서 빅데이타 연산을 하는데, 3300개의 CPU를 동시에 사용하기란 쉽지 않은 일이고, 이런 대용량 연산을 20$에 할 수 있는 것은 대용량 인프라를 공유하는 클라우드 서비스이기 때문에 가능하다.

데이타 복제를 통한 안정성

데이타는 3개의 복제본이 서로 다른 3개의 데이타 센터에 분산되어 저장되기 때문에 데이타에 대한 유실 위험이 적다.

배치와 스트리밍 모두 지원

한꺼번에 데이타를 로딩하는 배치 이외에도 REST API등을 통해서 실시간으로 데이타를 입력할 수 있는 스트리밍 기능을 제공하며, 스트리밍시에는 초당 100,000개의 행(row)의 데이타를 입력할 수 있다.

비용 정책

비용 정책 역시 클라우드 서비스 답게, DB 인스턴스와 같은 과금 방식이 아니라서 큰 데이타를 핸들링 하기 위해서 큰 인스턴스를 쓰고 사용하지 않는 동안에도 과금이 되는 정책이 아니라,

딱  저장되는 데이타 사이즈와, 쿼리시에 발생하는 트렌젝션 비용만큼만 과금이 된다.  데이타 저장 요금은 GB당 0.02$이고, 90일이 지나서 사용하지 않는 데이타는 자동으로 0.01$로 가격이 떨어진다.

클라우드 서비스에서 가격이 싸다는 일반적인 오브젝트 스토리지 (Google Cloud Storage : GB당 0.026$)보다 싸다. 트렌젝션 비용은 쿼리 수행시 스캔되는 데이타를 기준으로 TB당 $5 이다.  (월  1TB는 무료)
(나중에 자세하게 설명하겠지만, 스캔되는 컬럼당 비용이 나오기 때문에 사실상 비용을 계산해보면 그리 높지 않다)

가격 정책 : https://cloud.google.com/bigquery/pricing


빅쿼리가 기존의 빅데이타 플랫폼과 다른점은?

그렇다면 빅쿼리가 기존의 빅데이타 분석 플랫폼인 Hadoop, Spark등과의 차이가 무엇일까? 앞의 장점을 기반으로 그 차이점을 정리하자면 크게 다음과 같은 3가지를 들 수 있다.

쉽다.

보통 Hadoop이나 Spark등을 사용하게 되면, Map&Reduce(이하 MR) 로직을 사용하거나 SparkSQL을 사용하더라도 일정 수준 이상의 전문성이 필요하다. 또한 MR 로직의 경우 전문성이 있는 개발자가 분석 로직을 개발해야 하기 때문에 시간이 상대적으로 많이 소요되지만 빅쿼리는 로그인 후 SQL만 수행하면 되기 때문에, 상대적으로 빅데이타 분석이 쉽다.

운영이 필요 없다

Hadoop이나 Spark과 같은 빅데이타 솔루션의 경우에는 인스톨과 설정 그리고 클러스터의 유지 보수가 보통 일이 아니다. 그래서 별도의 운영 조직이 필요하고 여기에 많은 리소스가 소요되지만, 빅쿼리는 클라우드 서비스 이기 때문에, 별도의 운영등에 신경을 쓸 필요가 없이 개발과 분석에만 집중하면 된다.  

인프라에 대한 투자없이 막강한 컴퓨팅 자원을 활용

앞의 예에서 본것과 같이, 빅쿼리를 이용하면 수천개의 CPU와 수백/수천개의 컴퓨팅 자원을 사용할 수 있다. 물론 기존 빅데이타 플랫폼도 클라우드 환경에 올리면 수천개의 CPU를 사용하는 것이 가능은 하지만, 그 설정 작업과 비용적인 측면에서 차이가 크다.

빅쿼리 맛보기

그러면 직접 빅쿼리를 사용해보자. 빅쿼리 버전 HelloWorld라고 생각하면 된다.

가입 하기

http://cloud.google.com 으로 들어가서 구글 클라우드 서비스에 가입을 한후에,  로그인을 해서 아래 그림 처럼 결재 메뉴에서 빌링 정보를 입력한다 (신용 카드 정보 입력)



계정이 생성되면 자동으로 $300 의 무료 사용권이 생성되고, 이 금액은 60일동안 사용할 수 있다. (60일이 지나면 자동으로 소멸된다. ).

신용 카드 정보를 넣었더라도, 사용자가 직접 과금이 되는 플랜으로 업그레이드를 하지 않는 이상 과금이 되지 않으니 이 부분은 걱정하지 말기 바란다.

프로젝트 생성

구글 클라우드는 VM이나 각종 자원들을 프로젝트라는 개념으로 묶어서 사용한다. 처음 계정을 생성했으면 프로젝트가 없기 때문에 프로젝트를 생성하자.

아래 그림과 같이 상단 우측 메뉴에 프로젝트 생성 메뉴가 있다.


프로젝트 생성을 선택한 후 아래와 같이 프로젝트 이름을 입력하면 프로젝트가 생성된다.


빅쿼리 콘솔로 이동하기

프로젝트가 생성되었으면 메뉴에서 아래 그림과 같이 BigQuery 메뉴를 선택하게 되면 빅쿼리 웹 콘솔로 이동이 된다.




빅쿼리 메뉴로 들어가면 다음과 같은 작업 창이 나온다.




좌측은 프로젝트와 프로젝트에 속한 데이타셋과 테이블 리스트가 나온다.

나중에 데이타 모델을 다시 설명하겠지만, 데이타 셋 (dataset)은 RDBMS의 db와 같은 개념으로 테이블의 집합이라고 보면 되고, 그 안에 개별 테이블들이 들어가 있다.

우측 상단 쿼리 입력창에는 SQL을 입력해서 쿼리를 실행하고, 우측 아래에는 쿼리 결과를 볼 수 있다.

쿼리 실행

그러면 실제로 간단한 쿼리를 수행해보자

빅쿼리에서는 테스트를 위해서 몇가지 데이타 셋을 공개로 해놓았는데, bigquery-samples라는 데이타 셋에서 1000억개의 레코드를 가지고 있는  wikipedia_benchmark.Wiki100B 테이블에서, 위키 페이지 제목이 “Seoul”또는 “seoul”인 페이지의 제목과 뷰수를 쿼리를 해본다.


다음과 같이 쿼리를 입력하고


select title,sum(views) as views

from [bigquery-samples:wikipedia_benchmark.Wiki100B]

where regexp_match(title,'[Ss]eoul')

group by title

order by views desc;


쿼리 입력창 하단에 체크 마크를 누르면 다음과 같은 화면이 출력된다.


쿼리를 수행하기 전에, 쿼리가 제대로 되었는지 확인을 해주고, 위와 같이

Valid: This query will process 3.64 TB when run.”

3.64 TB를 스캐닝 할것임을 알려준다. (이를 통해서 쿼리 수행 비용을 예측해볼 수 있다.)


“Run Query” 버튼을 눌러서 쿼리를 수행하면 다음과 같은 결과를 얻을 수 있다.


RUN QUERY 버튼 가장 우측에 총 3.64TB를 처리했고, 총 수행 시간은 38.9초가 걸렸음을 확인할 수 있다.

그리고, 아래 쿼리 결과가 나온다.

Seoul 로 된 페이지가 11258720회 조회되었고, Seoul_National_University가 다음으로 894040회, FC_Seoul이 802570회 조회 된것을 확인할 수 있다.


지금까지 간략하게나마 빅쿼리에 대한 소개와 주요 특징 그리고 간단한 사용법을 소개했다.

다음 글에서는 빅쿼리의 내부 아키텍쳐에 대해서 설명하도록 한다.


저작자 표시 비영리
신고