블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 

Apache Beam (Dataflow)를 이용하여, 이미지 파일을 tfrecord로 컨버팅 하기


조대협 (http://bcho.tistory.com)



개요

텐서플로우 학습에 있어서 데이타 포맷은 학습의 성능을 결정 짓는 중요한 요인중의 하나이다. 특히 이미지 파일의 경우 이미지 목록과 이미지 파일이 분리되어 있어서 텐서플로우에서 학습시 이미지 목록을 읽으면서, 거기에 있는 이미지 파일을 매번 읽어야 하기 때문에, 코딩이 다소 지저분해지고,IO 성능이 떨어질 수 있다

텐서플로우에서는 이러한 학습 데이타를 쉽게 읽을 수 있도록 tfrecord (http://bcho.tistory.com/1190)라는 파일 포맷을 지원한다.


이 글에서는 이미지 데이타를 읽어서 tfrecord 로 컨버팅하는 방법을 설명하며, 분산 데이타 처리 프레임웍인 오픈소스 Apache Beam을 기준으로 설명하나, tfrecord 변환 부분은 Apache Beam과 의존성이 없이 사용이 가능하기 때문에, 필요한 부분만 참고해도 된다. 이 Apache Beam을 구글의 Apache Beam 런타임 (매니지드 서비스)인 구글 클라우드의 Dataflow를 이용하여, 클러스터를 이용하여 빠르게 데이타를 처리하는 방법에 대해서 알아보도록 한다.


전체 코드는 https://github.com/bwcho75/cifar-10/blob/master/pre-processing/4.%20Convert%20Pickle%20file%20to%20TFRecord%20by%20using%20Apache%20Beam.ipynb 에 있다.


이 코드는 CIFAR-10 이미지 데이타를 Apache Beam 오픈 소스를 이용하여, 텐서플로우 학습용 데이타 포맷인  tfrecord 형태로 변환 해주는 코드이다.


Apache Beam은 데이타 처리를 위한 프레임웍으로, 구글 클라우드 상에서 실행하거나 또는 개인 PC나 Spark 클러스터상 여러 환경에서 실행이 가능하며, 구글 클라우드 상에서 실행할 경우 오토스케일링이나 그래프 최적화 기능등으로 최적화된 성능을 낼 수 있다.


CIFAR-10 데이타 셋은 32x32 PNG 이미지 60,000개로 구성된 데이타 셋으로 해당 코드 실행시 최적화가 되지 않은 상태에서 약 16분 정도의 처리 시간이 소요된다. 이 중 6분 정도는 Apache Beam 코드를 구글 클라우드로 업로드 하는데 소요되는 시간이고 실제 처리시간은 10분정도가 소요된다. 전처리 과정에 Apache Beam을 사용하기 전에 고려해야 할 요소는 다음과 같다.

  • 데이타가 아주 많아서 전처리 시간이 수시간 이상 소요될 경우 Apache Beam + Google Cloud를 고려하여 여러 머신에서 동시에 처리하여 빠른 시간내에 수행되도록 할 수 있다.

  • 데이타가 그다지 많지 않고 싱글 머신에서 멀티 쓰레드로 처리를 원할 경우에는 Apache Beam으로 멀티 쓰레드 기반의 병렬 처리를 하는 방안을 고려할 수 있다. 이 경우 클라우드에 대한 의존성을 줄일 수 있다.

  • 다른 대안으로는 Spark/Hadoop 등의 오픈소스를 사용하여, On Prem에서 여러 머신을 이용하여 전처리 하는 방안을 고려할 수 있다.

여기서는 아주 많은 대량의 이미지 데이타에 대한 처리를 하는 것을 시나리오로 가정하였다.

전처리 파이프라인

Apache Beam을 이용한 데이타 전처리 파이프라인의 구조는 다음과 같다.

이미지 파일 준비

CIFAR-10 데이타셋 원본은 이미지 파일 형태가 아니라 PICKLE이라는 파일 포맷으로 되어 있기 때문에,  실제 개발 환경에서는 원본데이타가 이미지인것으로 가정하기 위해서 https://github.com/bwcho75/cifar-10/tree/master/pre-processing 의 1~2번 코드를 통해서 Pickle 파일을 이미지 파일로 변경하고, *.csv 파일에 {파일명},{레이블} 형태로 인덱스 데이타를 생성하였다.

생성된 이미지 파일과 *.csv 파일은 gsutil 명령어를 이용하여 Google Cloud Storage (aka GCS)에 업로드 하였다. 업로드 명령은 https://github.com/bwcho75/cifar-10/blob/master/pre-processing/2.%20Convert%20CIFAR-10%20Pickle%20files%20to%20image%20file.ipynb 에 설명되어 있다.


전처리 파이프라인의 구조

Apache Beam으로 구현된 파이프라인의 구조는 다음과 같다.


1. TextIO의 ReadFromText로 CSV 파일에서 한 라인 단위로 문자열을 읽는다.

2. parseLine에서 라인을 ,로 구분하여 filename과 label을 추출한다.

3. readImage 에서 filename을 가지고, 이미지 파일을 읽어서, binary array 형태로 변환한다.

4. TFExampleFromImageDoFn에서 이미지 바이너리와 label을 가지고 TFRecord 데이타형인 TFExample 형태로 변환한다.

5. 마지막으로 TFRecordIOWriter를 통해서 TFExample을 *.tfrecord 파일에 쓴다.

코드 주요 부분 설명

환경 설정 부분

이 코드는 구글 클라우드와 로컬 환경 양쪽에서 모두 실행이 가능하도록 구현되었다.

SRC_DIR_DEV는 로컬환경에서 이미지와 CSV 파일이 위치한 위치이고, DES_DIR_DEV는 로컬환경에서 tfrecord 파일이 써지는 위치이다.

구글 클라우드에서 실행할 경우 파일 저장소를  GCS (Google Cloud Storage)를 사용한다. DES_BUCKET은 GCS 버킷 이름이다. 코드 실행전에 반드시 구글 클라우드 콘솔에서 GCS 버킷을 생성하기 바란다.  SRC_DIR_PRD와 DES_DIR_PRD는 GCS 버킷내의 각각 image,csv 파일의 경로와 tfrecord 파일이 써질 경로 이다. 이 경로에 맞춰서 구글 클라우드 콘솔에서 디렉토리를 먼저 생성해 놓기를 바란다.




PROJECT는 구글 클라우드 프로젝트 명이고, 마지막으로 DEV_MODE가 True이면 로컬에서 수행이되고 False이면 구글 클라우드에서 실행하도록 하는 환경 변수이다.

의존성 설정 부분

로컬에서 실행할 경우필요한  파이썬 라이브러리가 이미 설치되어야 있어야 한다.

만약에 구글 클라우드에서 실행할 경우 이 Apache Beam 코드가 사용하는 파이썬 모듈을 명시적으로 정의해놔야 한다. 클라우드에서 실행시에는 Apache Beam 코드만 업로드가 되기 때문에(의존성 라이브러리를 같이 업로드 하는 방법도 있는데, 이는 추후에 설명한다.), 의존성 라이브는 구글 클라우드에서 Dataflow 실행시 자동으로 설치할 수 있도록 할 수 있는데, 이를 위해서는 requirements.txt 파일에 사용하는 파이썬 모듈들을 정의해줘야 한다. 다음은 requirements.txt에 의존성이 있는 파이썬 모듈등을 정의하고 저장하는 부분이다.


Apache Beam 코드

Apache Beam의 코드 부분은 크게 복잡하지 않기 때문에 주요 부분만 설명하도록 한다.

Service account 설정

Apache Beam 코드를 구글 클라우드에서 실행하기 위해서는 코드 실행에 대한 권한을 줘야 한다. 구글 클라우드에서는 사용자가 아니라 애플리케이션에 권한을 부여하는 방법이 있는데, Service account라는 것을 사용한다. Service account는 json 파일로 실행 가능한 권한을 정의하고 있다.

Service account 파일을 생성하는 방법은 http://bcho.tistory.com/1166 를 참고하기 바란다.

Service account 파일이 생성되었으면, 이 파일을 적용해야 하는데 GOOGLE_APPLICATION_CREDENTIALS 환경 변수에 Service account  파일의 경로를 정의해주면 된다. 파이썬 환경에서 환경 변수를 설정하는 방법은 os.envorin[‘환경변수명']에 환경 변수 값을 지정해주면 된다.

Jobname 설정

구글 클라우드에서 Apache Beam 코드를 실행하면, 하나의 실행이 하나의 Job으로 생성되는데, 이 Job을 구별하기 위해서 Job 마다 ID 를 설정할 수 있다. 아래는 Job ID를 ‘cifar-10’+시간 형태로 지정하는 부분이다


환경 설정

Apache Beam 코드를 구글 클라우드에서 실행하기 위해서는 몇가지 환경을 지정해줘야 한다.


  • staging_location은 클라우드 상에서 실행시 Apache Beam 코드등이 저장되는 위치이다. GCS 버킷 아래 /staging이라는 디렉토리로 지정했는데, 실행 전에 반드시 버킷아래 디렉토리를 생성하기 바란다.

  • temp_location은 기타 실행중 필요한 파일이 저장되는 위치이다. 실행 전에 반드시 버킷아래 디렉토리를 생성하기 바란다.

  • zone은 dataflow worker가 실행되는 존으로 여기서는 asia-northeast1-c  (일본 리전의 c 존)으로 지정하였다.


DEV_MODE 에 따른 환경 설정

로컬 환경이나 클라우드 환경에서 실행이냐에 따라서 환경 변수 설정이 다소 달라져야 한다.


디렉토리 경로를 바꿔서 지정해야 하고, 중요한것은 RUNNER인데, 로컬에서 실행하기 위해서는 DirectRunner를 구글 클라우드 DataFlow 서비스를 사용하기 위해서는 DataflowRunner를 사용하면 된다.


readImage 부분

Read Image는 이미지 파일을 읽어서 byte[] 로 리턴하는 부분인데, 로컬 환경이냐, 클라우드 환경이냐에 따라서 동작 방식이 다소 다르다.

클라우드 환경에서는 이미지 파일이 GCS에 저장되어 있기 때문에 파이썬의 일반 파일 open 명령등을 사용할 수 없다.

그래서 클라우드 환경에서 동작할 경우에는 GCS에서 파일을 읽어서 Worker의 로컬 디스크에 복사를 해놓고 이미지를 읽어서 byte[]로 변환한 후에, 해당 파일을 지우는 방식을 사용한다.


아래 코드에서 보면 DEV_MODE가 False 인경우 GCS에서 파일을 읽어서 로컬에 저장하는 코드가 있다.


storageClient는 GCS 클라이언트이고 bucket 을 얻어온후, bucket에서 파일을 get_blob 명령어를 이용하여 경로를 저장하여 blob.download_to_file을 이용하여 로컬 파일에 저장하였다.

실행

코드 작성이 끝났으면 실행을 한다. 실행 상태는 구글 클라우드 콘솔의 Dataflow  메뉴에서 확인이 가능하다.

아래와 같이 실행중인 그리고 실행이 끝난 Job 리스트들이 출력된다.




코드 실행중에, 파이프라인 실행 상황 디테일을 Job 을 선택하면 볼 수 있다.


여기서 주목할만한 점은 우측 그래프인데, 우측 그래프는 Worker의 수를 나타낸다. 초기에 1대로 시작했다가 오토 스케일링에 의해서 9대 까지 증가한것을 볼 수 있다.

처음 실행이었기 때문에 적정한 인스턴스수를 몰랐기 때문에 디폴트로 1로 시작하고 오토스케일링을 하도록 했지만, 어느정도 테스트를 한후에 적정 인스턴수를 알면 오토 스케일링을 기다릴 필요없이 디폴트 인스턴스 수를 알면 처음부터 그 수만큼 인스턴스 수로 시작하도록 하면 실행 시간을 줄일 수 있다.

만약에 파이프라인 실행시 에러가 나면 우측 상단에 LOGS 버튼을 누르면 상세 로그를 볼 수 있다.


아래 그림은 파이프라인 실행이 실패한 예에서 STACK TRACES를 통해서 에러 내용을 확인하는 화면이다.



해당 로그를 클릭하면 Stack Driver (구글의 모니터링 툴)의 Error Reporting 시스템 화면으로 이동하게 된다.

여기서 디테일한 로그를 볼 수 있다.

아래 화면을 보면 ReadImage 단계에서 file_path라는 변수명을 찾을 수 없어서 나는 에러를 확인할 수 있다.


TFRecord 파일 검증

파이프라인 실행이 끝나면, GCS 버킷에 tfrecord 파일이 생성된것을 확인할 수 있다.


해당 파일을 클릭하면 다운로드 받을 수 있다.

노트북 아래 코드 부분이 TFRecord를 읽어서 확인하는 부분이다. 노트북에서 tfrecord 파일의 경로를 다운로드 받은 경로로 변경하고 실행을 하면 파일이 제대로 읽히는 지 확인할 수 있다.


파일 경로 부분은 코드상에서 다음과 같다.



정상적으로 실행이 된 경우, 다음과 같이 tfrecord에서 읽은 이미지와 라벨값이 출력됨을 확인할 수 있다.


라벨 값은 Label 줄에 values 부분에 출력된다. 위의 그림에서는 순서대로 라벨 값이 4와 2가 된다.



구글의 IOT 솔루션

클라우드 컴퓨팅 & NoSQL/M2M & IOT | 2017.03.10 10:31 | Posted by 조대협


구글의 IOT 솔루션


조대협 (http://bcho.tistory.com)


오늘 샌프란시스코 구글 NEXT 행사에서 IOT 솔루션에 대한 소개가 있었는데, 내용이 괜찮아서 정리를 해놓는다.



구글의 특징은 안드로이드 플랫폼, 클라우드 , 분석 플랫폼, 개발자 에코 시스템  등 End to End 에 걸쳐서 상당히 다양한 포트폴리오를 가지고 있다는 것이 장점인데 이를 잘 녹여낸 아키텍쳐 구성이다.

디바이스 OS

IOT는 라즈베리파이와 같은 임베디드 디바이스를 사용하는 것이 일반적인데, 이런 임베디드 시스템 운용에 어려운 점중의 하나가 보안이다.

장비에 따라서 보안적인 문제가 없는지 체크를 해야 하고, 주기적으로 기능 및 보안에 대한 업데이트를 해줘야 하는데, 구글의 Android IOT (https://developer.android.com/things/index.html) 플랫폼은 이를 다 자동으로 해준다.


더구나, 기존의 모바일 안드로이드 플랫폼을 기반으로 하기 때문에, 안드로이드 개발자 풀을 그대로 사용할 수 있다는 장점이 있다.

이미 Android IOT 플랫폼은 인텔,라즈베리파이등 여러 디바이스 업체와 협업을 하면서 Certi 작업을 하고 있기 때문에 잘 알려진 플랫폼이라면 보안 테스트나 별도의 기능 테스트 없이 바로 사용이 가능하다.


백앤드

IOT의 백앤드는 구글 클라우드 플랫폼을 이용한다.

  • 디바이스로 부터 수집된 데이타는 Pub/Sub 큐에 저장된후

  • DataFlow 프레임웍을 통해서 배치나 실시간 스트리밍 분석이 되고

  • 분석된 데이타는 빅테이블이나 빅쿼리에 저장된다. 분석이나 리포팅을 위해서는 빅쿼리, 타임 시리즈 데이타나 고속의 데이타 접근을 위해서는 빅테이블이 사용된다.

  • 이렇게 저장된 데이타들은 구글의 머신러닝 프레임웍 텐서플로우의 클라우드 런타임인 CloudML을 사용해서 분석 및 예측 모델을 만들게 된다.



머신러닝을 등에 탑재한  디바이스

구글이 재미있는 점은 텐서플로우라는 머신러닝 프레임웍을 가지고 있다는 것인데, 애초부터 텐서플로우의 디자인은 서버 뿐만 아니라, 클라이언트 그리고 IOT 디바이스에서 동작하게 디자인이 되었다. 그래서 학습된 모델을 디바이스로 전송하여, 디바이스에서 머신러닝을 이용한 예측이 가능하다.

예를 들어 방범용 카메라를 만들었을때, 방문자의 사진을 클라우드로 저장하는 시나리오가 있다고 하자.

그런데 매번 전송을 하면 배터리나 네트워크 패킷 요금이 문제가 될 수 있기 때문에, 텐서 플로우 기반의 얼굴 인식 모델을 탑재하여 등록되지 않은 사용자만 사진을 찍어서 클라우드로 전송하게 하는 등의 시나리오 구현이 가능하다.


파이어 베이스 연동

동영상을 보다가 놀란점 중의 하나는 파이어 베이스가 Android IOT에 연동이 된다.

아래 그림은 온도를 측정해서 팬의 속도를 조정하는 시나리오인데, 우측 하단에 보면 파이어베이스가 위치해 있다.



센서로 부터 온도를 측정한 다음, 디바이스 컨트롤러로 온도 조정 명령을 내리는 것을 파이어베이스 메시징 서비스를 이용하도록 되어 있다.


결론

Android IOT 서비스 하나만 IOT 서비스로 내놓은 것이 아니라 구글 클라우드 플랫폼, 텐서플로우에 파이어베이스까지 구글의 기존의 노하우들을 묶어서 포트폴리오를 만들어 내었고, 더구나 이러한 기술들이 개발자 에코 시스템이 이미 형성이 되어 있는 시스템인 점에서, IOT 개발에 있어서 누구나 쉽게 IOT 서비스를 개발할 수 있게 한다는데, 큰 의미가 있다고 본다.


'클라우드 컴퓨팅 & NoSQL > M2M & IOT' 카테고리의 다른 글

구글의 IOT 솔루션  (0) 2017.03.10
TI의 IOT 개발용 센서 키트  (0) 2016.03.17
MQTT 서버 간단 공부 노트  (2) 2014.02.13

노트7의 소셜 반응을 분석해 보았다. 


#3 제플린 노트북을 이용한 상세 분석



조대협 (http://bcho.tistory.com)



데이타 스튜디오는 편리하게 사용할 수 있지만, 쿼리 사용등이 불가능하기 때문에, 원본 데이타를 이용한 상세 분석에는 어려움이 있다. 원본 데이타를 이용해서 상세 분석을 하려면 노트북 계열의 애플리케이션이 효과적인데, 빅쿼리를 연동할 수 있는 노트북으로는 이전에 소개한 주피터 노트북 기반의 데이타랩 (datalab)과, 스파크나 다른 빅데이타 솔루션과 함께 많이 사용되는 제플린 노트북(zeppelin.apache.org) 이 있다.


지난 글에서 데이타랩에 대한 연동 방법은 이미 소개하였으니, 이번에는 제플린을 통하여, 빅쿼리의 데이타를 분석해보도록 한다.


제플린 설치

제플린을 설치 하는 방법은 간단하다. Zeppelin.apache.org 에서, 설치 파일을 다운로드 받는다.

빅쿼리 연동 인터프리터는 제플린 버전 0.61 버전 이상에 포함되어 있기 때문에, 0.61 버전 이상을 다운로드 받는다.  이 때 모든 인터프리터가 포함된 버전을 다운 받아야 한다. (아니면 별도로 인터프리터를 설치해야 하는 번거로움이 따른다.)


다운 로드 받은 파일의 압축을 푼다. 다음으로 제플린 설치 디렉토리로 들어가서 다음 명령어를 수행한다.

% ./bin/zeppelin.sh

윈도우의 경우에는 %./bin/zeppelin.cmd 를 실행하면 된다.

자바 애플리케이션이기 때문에 별도의 설치 과정이 필요없고, 제플린 애플리케이션을 실행하기만 하면 된다.

제플린이 기동되었으면 브라우져에서 http://localhost:8080 으로 접속하면 다음과 같이 제플린 콘솔을 볼 수 있다.

노트북 생성

제플린 콘솔에 들어왔으면 초기화면에서 Create new note 라는 메뉴를 이용하여 새로운 노트북을 생성하자. 여기서는 편의상 “BQ 노트북" 이라는 이름으로 노트북을 생성하였다.


분석 쿼리 작성

이제 분석할 내용은 수집된 트윗의 명사들에 대해서, 시간 단위로 그룹핑을 한 다음에, 각 단어에 대해서 발생한 횟수를 카운트해서 보여주는 내용을 구현하려고 한다.

예를 들어서 9월20일에는 “유행" 이라는 단어가 200회 발생하였고, “패션" 이라는 단어가 100회 발생하였다. 라는 식으로 조회를 하려고 한다.


현재 테이블 구조는 다음과 같다.

Date (발생 시간)

Noun (명사)

count (발생 횟수)


SQL 문장을 작성해보자

select date,noun,sum(count) from 테이블명

group by date,noun


이렇게 쿼리를 하면, 시간대 별로, 명사와 그 명사의 발생 횟수를 리턴을 해주는데, 우리는 앞의 데이타 플로우 프로그램에서 30초 단위로 통계를 집계하도록 하였기 때문에, 30초 단위로 결과가 리턴된다. 우리가 원하는 결과는 30초 단위가 아니라 1시간 단위의 결과 이기 때문에, 다음과 같이 쿼리를 수정한다.


select  DATE(date) as ddate,HOUR(date) as dhour,noun,sum(count) from 테이블명

group by ddate,dhour,noun


DATE와 HOUR라는 함수를 사용하였는데, DATE는 타임 스탬프 형태의 컬럼에서 날짜만을 추출해주는 함수 이고, HOUR는 타임 스탬프 형태의 컬럼에서 시간만을 추출해주는 함수 이다.

이렇게 날짜와 시간만을 추출한 다음에, group by 절을 이용하여, 날짜와,시간 그리고 명사로 그룹핑을 하게 되면 우리가 원하는 것과 같이 각 날짜의 시간대별로 명사별 발생횟수 ( sum(count)) 값의 통계를 얻을 수 있다.


제플린에서 빅쿼리 명령을 수행하려면 다음과 같이 %bigquery.sql 이라고 첫줄에 선언을 한 다음에 SQL 문장을 수행하면 된다.




결과는 디폴트로 테이블 형태로 나오는데, 아래 아이콘 중에서 그래프 아이콘을 누르면 그래프 형태로 볼 수 가 있는데, 이 때 X,Y축의 변수를 지정할 수 있다.

아래 그림과 같이 Keys (X축을) ddate,dhour를 선택하고 Values(Y축)을 dhour SUM을 선택하면, 시간별 나타난 단어수를 볼 수 있다.



그런데 이 쿼리를 수행하면, 각 시간별로 발생한 명사 단어의 수가 매우 많기 때문에, 보기가 매우 어렵다.

그렇다면 시간대별로 발생한 명사중에서 각 시간대별로 많이 발생한 명사 5개씩만을 볼 수 없을까? 즉 group by를 전체 데이타 구간이 아니라, 각각 시간대 별로 계산을 해줄 수 는 없을까 하는 필요가 발생한다.

빅쿼리 파티셔닝

데이타를 구간 별로 나눠서 연산할 수 있는 기능으로 빅쿼리에는 파티션이라는 기능이 있다.

예를 들어서 group by를 전체 결과에 대해 그룹핑을 하는 것이 아니라, 앞에서 언급한 요건 처럼 일 단위로 짤라서 그룹핑을 하는 것이 가능하다.




파티션을 이용해서 할 수 있는 것은 파티션별로 합계나, 통계를 내거나, 파티션의 각 로우의 값의 백분율(%)나 또는 소팅한 순서등을 볼 수 있다. 여기서는, 시간으로 파티션을 나누고  파티션내에서 명사의 수가 많은 수 순서대로 소팅을 한후에, RANK라는 함수를 이용하여 그 파티션에서 그 명사가 몇번째로 많이 나타났는지를 출력하도록 해보겠다.


파티션의 사용법은 다음과 같다.

“파티션 함수 OVER (PARTITION BY 파티션을할 키 목록)”

여기서는 일/시간 별로 파티션을 나눈 후에, 그 순위별로 소팅을 할 것이기 때문에, 다음과 같은 식을 쓴다.

RANK() OVER (PARTITION BY ddate,dhour ORDER BY ncount DESC  ) as rank


이를 쿼리에 적용하면 다음과 같다.

   SELECT

       DATE(date) as ddate,HOUR(date) as dhour

       ,noun

       ,sum(count) as ncount

       , RANK() OVER (PARTITION BY ddate,dhour ORDER BY ncount DESC  ) as rank

   FROM [useful-hour-138023:twitter.noun]

   group by noun,ddate,dhour

   order by ddate,dhour,ncount desc


그러면 다음과 같이 일/날짜 파티션별로 많이 발생한 명사 순으로 발생횟수와 순위(rank)를 출력해준다.



그런데 쿼리를 돌려보면 알겠지만, 시간대별로 수집한 명사의 종류가 많기 때문에, 일자별 데이타가 매우 많다. 그래서 파티션별로 많이 등장하는 단어 5개만을 보려고 하면 rank <5 인것만 걸러내면 된다. 이는 중첩 쿼리를 이용해서 수행이 가능하다

다음은 이를 적용한 예이다.


SELECT ddate,dhour

   ,noun

   , rank

from (

   SELECT

       DATE(date) as ddate,HOUR(date) as dhour

       ,noun

       ,sum(count) as ncount

       , RANK() OVER (PARTITION BY ddate,dhour ORDER BY ncount DESC  ) as rank

   FROM [useful-hour-138023:twitter.noun]

   where noun != "note7" and noun != "samsung" and noun !="galaxy"

   group by noun,ddate,dhour

   order by ddate,dhour,ncount desc

   )

where rank < 6


이렇게 하면, 각 시간대별로 자주 등장하는 단어 6개만을 보여준다.


이 쿼리를 이용하여 데이타를 어떻게 분석하는지는 예전글 http://bcho.tistory.com/1136 을 참고하세요.


간단하게나마 트위터 피드에서 특정 키워드를 기반으로 하여, 명사와 형용사를 추출하여 소셜 반응을 분석하는 애플리케이션 개발과 데이타 분석 방법에 대해서 설명하였다.

아이폰7을 분석해보니, 명사 분석도 의미가 있었지만, 아이폰7에 대한 기대를 형용사 분석을 통해서도 많은 인사이트를 얻을 수 있었다. Awesome, excellent와 같은 기대치가 높은 형용사가 많이 검출되었고 bad, fuck 과 같은 부정적인 의미의 형용사는 다소 낮게 검출되었다. (아마 이즈음이 노트7 폭발로 인하여 반사 이익을 얻은게 아닐까 추정되는데.)


이외에도, 이모콘티만 추출하여 분석을 한다거나, 부사등을 통해서 분석을 하는 것도 가능하고, 구글 자연어 처리 API는 글을 통해서 사람의 감정을 분석해주는 기능도 있기 때문에 응용 분야는 훨씬 더 넓다고 볼 수 있다.


노트7의 소셜 반응을 분석해 보았다. 


#2 구현하기


조대협 (http://bcho.tistory.com)

지난번 글 http://bcho.tistory.com/1136에 이어서, 트위터를 통한 소셜 반응을 분석하는 시 스템을 구축하는 방법에 대해서 알아본다. 

시나리오 및 아키텍쳐

스트리밍 처리와 데이타 플로우에 대한 개념 이해가 끝났으면 이제 실제로 실시간 분석 애플리케이션을 만들어보자.

SNS를 이용한 마케팅 분석에서 대표적인 시나리오중 하나는 트위터 피드를 분석하여, 사람들의 반응을 분석하는 시나리오이다. 자주 언급 되는 단어나 형용사를 분석함으로써, 특정 제품이나 서비스에 대한 소셜 네트워크상의 바이럴 반응을 분석할 수 있는데, 여기서  구현하고자 하는 시나리오는 다음과 같다. 트위터 피드에서 특정 키워드로 트윗 문자열들을 수집한 후에, 구글의 자연어 분석 API를 통하여 트윗 문자열에서 명사와 형용사를 추출한다

추출한 명사와 형용사의 발생 횟수를 통계내어서 대쉬보드에 출력하는 시나리오이다.


이를 구현하기 위한 솔루션 아키텍쳐는 다음과 같다.


fluentd를 이용하여 트위터의 특정 키워드를 기반으로 트위터 피드를 수집하고, 수집된 피드들은 구글 클라우드의 큐 서비스인 Pub/Sub으로 전달된다. 전달된 데이타는 데이타 플로우에서 읽어서 필요한 데이타만 필터링한 후, 구글의 자연어 분석 API를 통해서 명사와 형용사를 분리한다.

분리된 명사와 형용사는 데이타플로우에서 30초 주기의 고정윈도우(Fixed Window) 단위로, 명사에서 발생한 단어의 수와, 형용사에서 발생한 단어의 수를 카운트 한 다음에, 빅쿼리에 명사 테이블과 형용사 테이블에 저장한다.

저장된 데이타는 구글의 리포팅 도구인 데이타 스튜디오를 통해서 그래프로 출력한다.


구현

그러면 위에서 설명한 아키텍쳐대로 시스템을 하나씩 구현해보자.

전체 예제 코드와 설정 파일은 https://github.com/bwcho75/googledataflow/tree/master/twitter 에서 받아볼 수 있다.

트위터 피드 수집 서버 설정

먼저 트위터에서 피드를 수집하기 위해서 fluentd 에이전트를 설정한다. 구글 컴퓨트 엔진에서 VM을 생성한 후에, 앞의 빅쿼리 예제에서 한것과 마찬가지로 fluentd 에이전트를 설치한다.

VM을 설치할때, 반드시 Cloud API access scopes를 full API access로 설정해야 하는데, 이 VM에서 fluentd를 통해서 수집한 피드를 Pub/Sub으로 전달할때, Pub/Sub API를 사용하기 때문이다.


Fluentd 가 설치되었으면 Pub/Sub으로 데이타를 전달하기 때문에,Fluentd pub/sub 에이전트를 추가설치 한다.

에이전트명은 “fluent-plugin-gcloud-pubsub”로

% sudo td-agent-gem install fluent-plugin-gcloud-pubsub

명령을 이용해서 설치한다.


에이전트 설치가 끝났으면 fluentd 에이전트 설정을 해야 한다.

다음은 트위터에서 “note7”에 관련된 피드를 읽어서 pub/sub 큐로 피드를 전송하는 fluentd 설정 예제이다.


<source>

 type twitter

 consumer_key        트위터 Consumer Key

 consumer_secret     트위터 Consumer Secrect

 oauth_token         트위터 Access Token

 oauth_token_secret  트위터 Access Token Secrect

 tag                 input.twitter.sampling  # Required

 timeline            sampling                # Required (tracking or sampling or location or userstream)

 keyword             note7

 output_format       nest                    # Optional (nest or flat or simple[default])

</source>

<match input.twitter.sampling>

 type gcloud_pubsub

 project 본인의 프로젝트명

 topic projects/본인의 프로젝트명/topics/twitter

 key 다운로드받은 구글 클라우드 억세스 토큰 JSON 파일

 flush_interval 10

 autocreate_topic false

</match>


Fluentd 설정이 끝났다.

Pub/Sub 큐 설정

다음으로는 fluentd 읽어드린 트위터 피드를 받아드를 Pub/Sub 큐를 생성한다.

큐 생성 방법에 대해서는 앞의 Pub/Sub 챕터를 참고하기 바란다. (http://bcho.tistory.com/1120)

큐 이름은 twitter라고 한다. 전체 큐 이름은 “projects/본인 프로젝트명/twitter” 가 된다.

데이타 플로우 프로젝트 생성

큐까지 데이타를 읽어드렸으면, 이 데이타를 처리할 데이타 플로우 파이프라인을 구현한다.

이클립스에서 데이타 플로우 파이프라인 프로젝트를 생성하자. 프로젝트 생성은 앞장의 “데이타 플로우 개발환경 설정" 부분을 참고하기 바란다. (http://bcho.tistory.com/1128)


프로젝트가 생성되었으면, 이 프로젝트에서 사용할 의존성 라이브러리들을 메이븐 (maven) 빌드 스크립트인 pom.xml에 추가해준다.

추가해야 하는 API는 JSON 파싱을 위한 javax.json-api와, javax.json 그리고 구글의 자연서 분석 API를 호출하기 위한 google-api-client와 google-api-service-language 모듈이다.


다음 코드 블럭을 <dependencies> 엘리먼트 아래 하부 엘리먼트로 추가해준다


   <dependency>

   <groupId>javax.json</groupId>

   <artifactId>javax.json-api</artifactId>

   <scope>provided</scope>

   <version>1.0</version>

</dependency>

<dependency>

   <groupId>org.glassfish</groupId>

   <artifactId>javax.json</artifactId>

   <version>1.0.4</version>

</dependency>

<!-- NL API dependency -->

<dependency>

     <groupId>com.google.apis</groupId>

     <artifactId>google-api-services-language</artifactId>

     <version>v1beta1-rev7-1.22.0</version>

   </dependency>

   <dependency>

     <groupId>com.google.api-client</groupId>

     <artifactId>google-api-client</artifactId>

     <version>1.22.0</version>

   </dependency>


데이타 플로우 코드 작성

전체 파이프라인 흐름

파이프라인 코드 작성에 앞서서 전체 파이프라인 흐름을 살펴보자

전체 흐름은 다음과 같다.


  1. Read From PubSub
    PubSub의 “twitter” 큐에서 JSON 형태의 트위터 메세지를 읽는다.

  2. Parse Twitter
    트위터 JSON 메세지를 파싱한 후, 전체 메세지에서 트윗 메세지를 저장하고 있는 “text” 필드와 언어셋을 정의하고 있는 “lang” 필드만 추출한다.
    자연어 분석 API가 아직 영어, 스페인어, 일본어만 지원하기 때문에, 이 예제에서는 영어로 트윗만 추출하도록 한다.

  3. NL Processing
    앞에서 추출한 트윗 메세지를 구글의 자연어 분석 API에 분석을 요청하여 명사와 형용사만 추출해낸다.

  4. 명사 처리 파이프라인
    다양한 처리 방식을 보여주기 위해서, 이 예제에서는 하나의 데이타 스트림을 분기 처리하여 두개의 데이타 파이프라인에서 처리하는 방식으로 구현하였다. 명사 처리 파이프라인은 다음과 같은 단계를 거친다.

    1. Noun Filter
      명사와 형용사 리스트로 들어온 데이타 중에서 명사만 필터링 한다.

    2. Window 적용
      고정 크기 윈도우 (Fixed Window) 30초를 적용하여, 30초 단위로 데이타를 분석하도록 한다.

    3. Count.PerElement
      명사 단어와, 각 단어별 발생횟 수를 30초 단위로 모아서 카운트 한다.

    4. Noun Formating
      카운트된 결과를 빅쿼리에 쓰도록, [윈도우 시작 시간,명사 단어, 발생횟수] 형태의 빅쿼리 ROW(행) 데이타 타입으로 포매팅 한다.

    5. Write Noun Count to BQ
      포매팅 된 데이타를 빅쿼리에 쓴다.

  5. 형용사 처리 파이프라인
    형용사를 처리하는 파이프라인도 내용은 명사를 처리한 파이프라인과 다르지 않고 동일하게 다음과 같은 순서를 따른다.

    1. Adj Filter

    2. Window 적용

    3. Count.PerElement

    4. Adj Formating

    5. Write Adj Count to BQ

빅쿼리 데이타 구조

빅쿼리에는 두개의 테이블에 데이타를 나눠서 저장하였다.

명사와 형용사 테이블로 각각의 테이블 명과 구조는 다음과 같다.


명사 테이블 : noun

필드명

데이타 타입

date

TIMESTAMP

noun

STRING

count

INTEGER


형용사 테이블 : adj

필드명

데이타 타입

date

TIMESTAMP

adj

STRING

count

INTEGER

자연어 분석 클래스 작성

전체 데이타 흐름과 저장 구조가 이해되었으면, 파이프라인 코드 작성에 앞서서 자연어 처리 API를 호출하는 로직을 만들어보자


우리가 사용할 API는 String으로 문자열을 주면 다음과 같이 NLAnalyzeVO 객체로 분석 결과를 리턴해주는 코드이다.


package com.terry.nl;


import java.util.ArrayList;

import java.util.List;


public class NLAnalyzeVO {

List<String> nouns = new ArrayList<String>();

List<String> adjs = new ArrayList<String>();

List<String> emoticons = new ArrayList<String>();

float sentimental;


public List<String> getNouns() {

return nouns;

}


public List<String> getAdjs() {

return adjs;

}


public List<String> getEmoticons() {

return emoticons;

}


public float getSentimental() {

return sentimental;

}


public void setSentimental(float sentimental) {

this.sentimental = sentimental;

}

public void addNouns(String n){

nouns.add(n);

}

public void addAdj(String a){

adjs.add(a);

}

public void addEmoticons(String e){

emoticons.add(e);

}

}

<NLAnalyzeVO.java>


분석 결과로는 List<String> 타입으로 명사들의 목록을 nouns 로, 형용사들의 목록을 adj로 리턴해준다. float형으로 sentimental 이라는 필드에는 입력된 문장의 감정도를 리턴하도록 되어 있다. 음수값일 때는 부정적, 양수값일 경우에는 긍정을 의미한다.

VO안에는 List<String> emoticons 라는 필드가 있는데, 이는 트위터 메세지 내의 이모티콘을 추출하여 저장하기 위한 필드인데, 이 예제에서는 사용하지 않으니 신경 쓰지 않아도 된다.


package com.terry.nl;


import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;

import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;

import com.google.api.client.http.HttpRequest;

import com.google.api.client.http.HttpRequestInitializer;

import com.google.api.client.json.JsonFactory;

import com.google.api.client.json.jackson2.JacksonFactory;

import com.google.api.services.language.v1beta1.CloudNaturalLanguageAPI;

import com.google.api.services.language.v1beta1.CloudNaturalLanguageAPI.Documents.AnnotateText;

import com.google.api.services.language.v1beta1.CloudNaturalLanguageAPIScopes;

import com.google.api.services.language.v1beta1.model.AnalyzeEntitiesRequest;

import com.google.api.services.language.v1beta1.model.AnalyzeEntitiesResponse;

import com.google.api.services.language.v1beta1.model.AnalyzeSentimentRequest;

import com.google.api.services.language.v1beta1.model.AnalyzeSentimentResponse;

import com.google.api.services.language.v1beta1.model.AnnotateTextRequest;

import com.google.api.services.language.v1beta1.model.AnnotateTextResponse;

import com.google.api.services.language.v1beta1.model.Document;

import com.google.api.services.language.v1beta1.model.Entity;

import com.google.api.services.language.v1beta1.model.Features;

import com.google.api.services.language.v1beta1.model.Sentiment;

import com.google.api.services.language.v1beta1.model.Token;


import java.io.IOException;

import java.io.PrintStream;

import java.security.GeneralSecurityException;

import java.util.List;

import java.util.Map;


/**

*

* Google Cloud NL API wrapper

*/



@SuppressWarnings("serial")

public class NLAnalyze {


public static NLAnalyze getInstance() throws IOException,GeneralSecurityException {


return new NLAnalyze(getLanguageService());

}


public NLAnalyzeVO analyze(String text) throws IOException, GeneralSecurityException{

Sentiment  s = analyzeSentiment(text);

List <Token> tokens = analyzeSyntax(text);

NLAnalyzeVO vo = new NLAnalyzeVO();


for(Token token:tokens){

String tag = token.getPartOfSpeech().getTag();

String word = token.getText().getContent();


if(tag.equals("NOUN")) vo.addNouns(word);

else if(tag.equals("ADJ")) vo.addAdj(word);

}


vo.setSentimental(s.getPolarity());


return vo;

}



/**

* Be sure to specify the name of your application. If the application name is {@code null} or

* blank, the application will log a warning. Suggested format is "MyCompany-ProductName/1.0".

*/

private static final String APPLICATION_NAME = "Google-LanguagAPISample/1.0";


/**

* Connects to the Natural Language API using Application Default Credentials.

*/

public static CloudNaturalLanguageAPI getLanguageService()

throws IOException, GeneralSecurityException {

GoogleCredential credential =

GoogleCredential.getApplicationDefault().createScoped(CloudNaturalLanguageAPIScopes.all());

JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();

return new CloudNaturalLanguageAPI.Builder(

GoogleNetHttpTransport.newTrustedTransport(),

jsonFactory, new HttpRequestInitializer() {

@Override

public void initialize(HttpRequest request) throws IOException {

credential.initialize(request);

}

})

.setApplicationName(APPLICATION_NAME)

.build();

}


private final CloudNaturalLanguageAPI languageApi;


/**

* Constructs a {@link Analyze} which connects to the Cloud Natural Language API.

*/

public NLAnalyze(CloudNaturalLanguageAPI languageApi) {

this.languageApi = languageApi;

}


public List<Token> analyzeSyntax(String text) throws IOException{

AnnotateTextRequest request =

new AnnotateTextRequest()

.setDocument(new Document().setContent(text).setType("PLAIN_TEXT"))

.setFeatures(new Features().setExtractSyntax(true))

.setEncodingType("UTF16");

AnnotateText analyze =

languageApi.documents().annotateText(request);


AnnotateTextResponse response = analyze.execute();


return response.getTokens();


}

/**

* Gets {@link Sentiment} from the string {@code text}.

*/

public Sentiment analyzeSentiment(String text) throws IOException {

AnalyzeSentimentRequest request =

new AnalyzeSentimentRequest()

.setDocument(new Document().setContent(text).setType("PLAIN_TEXT"));

CloudNaturalLanguageAPI.Documents.AnalyzeSentiment analyze =

languageApi.documents().analyzeSentiment(request);


AnalyzeSentimentResponse response = analyze.execute();

return response.getDocumentSentiment();

}


}


<NLAnalyze.java>


코드 상의 주요 부분을 살펴보자

public NLAnalyzeVO analyze(String text)

메서느가 주요 메서드로, 트윗 문자열을 text 인자로 넘겨주면 분석 결과를 NLAnalyzeVO로 리턴한다.

이 메서드 안에서는 두개의 메서드를 호출하는데, analyzeSentiment(text) 와, analyzeSyntax(text)

를 두개 호출한다.

analyzeSentiment(text) 메서드는 text 를 넣으면 float 타입으로 감정도인 Sentinetal 지수를 리턴한다.

analyzeSyntax(text)는 구문을 분석하여, 명사,형용사,접속사,조사 등과 단어간의 의존 관계등을 분석해서 리턴해주는데, Token 이라는 데이타 타입의 리스트 형태로 다음과 같이 리턴한다.

List <Token> tokens = analyzeSyntax(text);


여기서 단어의 형(명사,형용사)는 token에서 tag 라는 필드를 통해서 리턴되는데, 우리가 필요한것은 명사와 형용사만 필요하기 때문에, tag가 NOUN (명사)와 ADJ (형용사)로 된 단어만 추출해서 NLAnalyzeVO 객체에 넣어서 리턴한다. (태그의 종류는 https://cloud.google.com/natural-language/reference/rest/v1beta1/documents/annotateText#Tag ) 를 참고하기 바란다.


중요

이 코드를 이용해서 구글 클라우드의 자연어 분석 API를 호출할때 그러면 API 인증은 어떻게 할까? 보통 구글 클라우드 콘솔에서 다운 받는 서비스 어카운트 키 (Service Account Key) JSON 파일을 사용하는데, 구글 자연어 분석 API를 호출하기 위해서도 서비스 어카운트 키가 필요하다.

이 키를 콘솔에서 다운로드 받은 후에, GOOGLE_APPLICATION_CREDENTIALS 라는 환경 변수에 서비스 어카운트 키의 경로를 지정해주면 된다.


예) export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your-project-credentials.json


자연어 분석 클래스를 다 만들었으면 테스트 코드를 만들어서 테스트를 해보자.

다음은 JUnit 4.X를 이용한 간단한 테스트 코드 이다.


package com.terry.nl.test;


import static org.junit.Assert.*;


import java.io.IOException;

import java.security.GeneralSecurityException;

import java.util.List;


import org.junit.Test;


import com.terry.nl.NLAnalyze;

import com.terry.nl.NLAnalyzeVO;


public class NLAnalyzeTest {


@Test

public void test() {

try {

NLAnalyze instance = NLAnalyze.getInstance();

String text="Larry Page, Google's co-founder, once described the 'perfect search engine' as something that 'understands exactly what you mean and gives you back exactly what you want.'";

NLAnalyzeVO vo = instance.analyze(text);

List<String> nouns = vo.getNouns();

List<String> adjs = vo.getAdjs();

System.out.println("### NOUNS");

for(String noun:nouns){

System.out.println(noun);

}

System.out.println("### ADJS");

for(String adj:adjs){

System.out.println(adj);

}

} catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

fail("API call error");

} catch (GeneralSecurityException e) {

// TODO Auto-generated catch block

e.printStackTrace();

fail("Security exception");

}

}


}


"Larry Page, Google's co-founder, once described the 'perfect search engine' as something that 'understands exactly what you mean and gives you back exactly what you want.'" 문자열을 분석하여,  명사와 형용사를 추출하여 다음과 같이 결과를 출력해준다.

### NOUNS

Larry

Page

Google

co-founder

search

engine

something

### ADJS

perfect

파이프라인 코드 작성

이제 메인 파이프라인 개발을 위한 준비가 다 되었다. 이제 TwitterPipeline 이라는 이름으로 파이프라인을 구현해보자. 전체 코드는 다음과 같다.

package com.terry.dataflow;


import java.io.IOException;

import java.io.StringReader;

import java.security.GeneralSecurityException;

import java.util.ArrayList;

import java.util.List;


import javax.json.Json;

import javax.json.JsonObject;

import javax.json.JsonReader;


import org.joda.time.DateTime;

import org.joda.time.Duration;

import org.joda.time.Instant;

import org.joda.time.format.DateTimeFormat;

import org.joda.time.format.DateTimeFormatter;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;


import com.google.api.services.bigquery.model.TableFieldSchema;

import com.google.api.services.bigquery.model.TableRow;

import com.google.api.services.bigquery.model.TableSchema;

import com.google.cloud.dataflow.sdk.Pipeline;

import com.google.cloud.dataflow.sdk.io.BigQueryIO;

import com.google.cloud.dataflow.sdk.io.PubsubIO;

import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

import com.google.cloud.dataflow.sdk.transforms.Count;

import com.google.cloud.dataflow.sdk.transforms.Create;

import com.google.cloud.dataflow.sdk.transforms.DoFn;

import com.google.cloud.dataflow.sdk.transforms.ParDo;

import com.google.cloud.dataflow.sdk.transforms.ParDo.Bound;

import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;

import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;

import com.google.cloud.dataflow.sdk.transforms.windowing.Window;

import com.google.cloud.dataflow.sdk.values.KV;

import com.terry.nl.NLAnalyze;

import com.terry.nl.NLAnalyzeVO;


import com.google.cloud.dataflow.sdk.values.PCollection;


/**

* A starter example for writing Google Cloud Dataflow programs.

*

* <p>The example takes two strings, converts them to their upper-case

* representation and logs them.

*

* <p>To run this starter example locally using DirectPipelineRunner, just

* execute it without any additional parameters from your favorite development

* environment.

*

* <p>To run this starter example using managed resource in Google Cloud

* Platform, you should specify the following command-line options:

*   --project=<YOUR_PROJECT_ID>

*   --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>

*   --runner=BlockingDataflowPipelineRunner

*/

public class TwitterPipeline {

private static final Logger LOG = LoggerFactory.getLogger(TwitterPipeline.class);

private static final String NOWN_TABLE=

"useful-hour-138023:twitter.noun";

private static final String ADJ_TABLE=

"useful-hour-138023:twitter.adj";


// Read Twitter feed as a JSON format

// extract twitt feed string and pass into next pipeline

static class ParseTwitterFeedDoFn extends DoFn<String,String>{


private static final long serialVersionUID = 3644510088969272245L;


@Override

public void processElement(ProcessContext c){

String text = null;

String lang = null;

try {

JsonReader reader = Json.createReader(new StringReader(c.element()));

JsonObject json = reader.readObject();

text = (String) json.getString("text");

lang = (String) json.getString("lang");


if(lang.equals("en")){

c.output(text.toLowerCase());

}


} catch (Exception e) {

LOG.debug("No text element");

LOG.debug("original message is :" + c.element());

}  

}

}


// Parse Twitter string into

// - list of nouns

// - list of adj

// - list of emoticon


static class NLAnalyticsDoFn extends DoFn<String,KV<String,Iterable<String>>>{ /**

*

*/

private static final long serialVersionUID = 3013780586389810713L;


// return list of NOUN,ADJ,Emoticon

@Override

public void processElement(ProcessContext c) throws IOException, GeneralSecurityException{

String text = (String)c.element();


NLAnalyze nl = NLAnalyze.getInstance();

NLAnalyzeVO vo = nl.analyze(text);


List<String> nouns = vo.getNouns();

List<String> adjs = vo.getAdjs();


KV<String,Iterable<String>> kv_noun=  KV.of("NOUN", (Iterable<String>)nouns);

KV<String,Iterable<String>> kv_adj =  KV.of("ADJ", (Iterable<String>)adjs);


c.output(kv_noun);

c.output(kv_adj);

}


}



static class NounFilter extends DoFn<KV<String,Iterable<String>>,String>{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey();

if(!key.equals("NOUN")) return;

List<String> values = (List<String>) c.element().getValue();

for(String value:values){

// Filtering #

if(value.equals("#")) continue;

else if(value.startsWith("http")) continue;

c.output(value);

}

}

}


static class AddTimeStampNoun extends DoFn<KV<String,Long>,TableRow>

implements com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess

{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey(); // get Word

Long value = c.element().getValue();// get count of the word

IntervalWindow w = (IntervalWindow) c.window();

Instant s = w.start();

DateTime sTime = s.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTimeFormatter dtf = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

String str_stime = sTime.toString(dtf);


TableRow row =  new TableRow()

.set("date", str_stime)

.set("noun", key)

.set("count", value);


c.output(row);

}


}


static class AddTimeStampAdj extends DoFn<KV<String,Long>,TableRow>

implements com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess

{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey(); // get Word

Long value = c.element().getValue();// get count of the word

IntervalWindow w = (IntervalWindow) c.window();

Instant s = w.start();

DateTime sTime = s.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTimeFormatter dtf = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

String str_stime = sTime.toString(dtf);


TableRow row =  new TableRow()

.set("date", str_stime)

.set("adj", key)

.set("count", value);


c.output(row);

}


}

static class AdjFilter extends DoFn<KV<String,Iterable<String>>,String>{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey();

if(!key.equals("ADJ")) return;

List<String> values = (List<String>) c.element().getValue();

for(String value:values){

c.output(value);

}

}

}


static class Echo extends DoFn<KV<String,Iterable<String>>,Void>{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey();

List<String> values = (List<String>) c.element().getValue();

for(String value:values){

}

}


}

public static void main(String[] args) {

Pipeline p = Pipeline.create(

PipelineOptionsFactory.fromArgs(args).withValidation().create());


@SuppressWarnings("unchecked")

PCollection <KV<String,Iterable<String>>> nlprocessed

=  (PCollection<KV<String,Iterable<String>>>) p.apply(PubsubIO.Read.named("ReadFromPubSub").topic("projects/useful-hour-138023/topics/twitter"))

.apply(ParDo.named("Parse Twitter").of(new ParseTwitterFeedDoFn()))

.apply(ParDo.named("NL Processing").of(new NLAnalyticsDoFn()));



// Noun handling sub-pipeline

List<TableFieldSchema> fields = new ArrayList<>();

fields.add(new TableFieldSchema().setName("date").setType("TIMESTAMP"));

fields.add(new TableFieldSchema().setName("noun").setType("STRING"));

fields.add(new TableFieldSchema().setName("count").setType("INTEGER"));

TableSchema schema = new TableSchema().setFields(fields);


nlprocessed.apply(ParDo.named("NounFilter").of(new NounFilter()))

.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30))))

.apply(Count.<String>perElement())

.apply(ParDo.named("Noun Formating").of(new AddTimeStampNoun()) )

.apply(BigQueryIO.Write

.named("Write Noun Count to BQ")

.to( NOWN_TABLE)

.withSchema(schema)

.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)

.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));


// Adj handling sub-pipeline

fields = new ArrayList<>();

fields.add(new TableFieldSchema().setName("date").setType("TIMESTAMP"));

fields.add(new TableFieldSchema().setName("adj").setType("STRING"));

fields.add(new TableFieldSchema().setName("count").setType("INTEGER"));

schema = new TableSchema().setFields(fields);


nlprocessed.apply(ParDo.named("AdjFilter").of(new AdjFilter()))

.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30))))

.apply(Count.<String>perElement())

.apply(ParDo.named("Adj Formating").of(new AddTimeStampAdj()) )

.apply(BigQueryIO.Write

.named("Write Adj Count to BQ")

.to( ADJ_TABLE)

.withSchema(schema)

.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)

.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));





p.run();

}


}

<TwitterPipeline.java>

코드를 하나씩 분석해보자.

먼저 main함수 부분을 보자

PCollection <KV<String,Iterable<String>>> nlprocessed

=  (PCollection<KV<String,Iterable<String>>>) p.apply(PubsubIO.Read.named("ReadFromPubSub").topic("projects/useful-hour-138023/topics/twitter"))

.apply(ParDo.named("Parse Twitter").of(new ParseTwitterFeedDoFn()))

.apply(ParDo.named("NL Processing").of(new NLAnalyticsDoFn()));

<TwitterPipeline.java에서 main() 함수 일부 >


파이프 라인이 시작되면, PubSubIO를 이용하여 “projects/useful-hour-138023/topics/twitter” 이름의 큐에서 데이타를 읽는다. 읽은 데이타는 ParseTwitterFeedDoFn() 라는 함수에서 파싱이 된다.

ParseTwitterFeedDoFn() 은 다음과 같다.


static class ParseTwitterFeedDoFn extends DoFn<String,String>{

private static final long serialVersionUID = 3644510088969272245L;


@Override

public void processElement(ProcessContext c){

String text = null;

String lang = null;

try {

JsonReader reader = Json.createReader(new StringReader(c.element()));

JsonObject json = reader.readObject();

text = (String) json.getString("text");

lang = (String) json.getString("lang");


if(lang.equals("en")){

c.output(text.toLowerCase());

}


} catch (Exception e) {

LOG.debug("No text element");

LOG.debug("original message is :" + c.element());

}  

}

}

<TwitterPipeline.java 에서 ParseTwitterFeedDoFn 클래스 구현부>


PubSub에서 읽어드린 데이타는 문자열로 안에 JSON 데이타를 가지고 있다. 이 JSON 문자열을 파싱해서 “text”와 “lang” 엘리먼트만 추출한 후에, “lang”이 “en”(영어) 인 경우에만 다음 파이프라인으로 “text”에서 추출한 문자열을 보내고, 영어가 아닌 경우에는 데이타를 무시한다.


다음은 NLAnalyticsDoFn에서 트윗 문자열을 받아서 자연어 분석을 한다.


static class NLAnalyticsDoFn extends DoFn<String,KV<String,Iterable<String>>>{

// return list of NOUN,ADJ,Emoticon

@Override

public void processElement(ProcessContext c) throws IOException, GeneralSecurityException{

String text = (String)c.element();


NLAnalyze nl = NLAnalyze.getInstance();

NLAnalyzeVO vo = nl.analyze(text);


List<String> nouns = vo.getNouns();

List<String> adjs = vo.getAdjs();


KV<String,Iterable<String>> kv_noun=  KV.of("NOUN", (Iterable<String>)nouns);

KV<String,Iterable<String>> kv_adj =  KV.of("ADJ", (Iterable<String>)adjs);


c.output(kv_noun);

c.output(kv_adj);

}


}

<TwitterPipeline.java 에서 NLAnalyticsDoFn 클래스 구현부>


앞에서 작성한 자연어 분석 클래스인 NLAnalyze 클래스를 이용하여 text를 넘기고, 리턴 값으로 NLAnalyzeVO를 리턴 값으로 받은 후, 명사는 KV<String,Iterable<String>> 타입으로 다음과 같을 저장해서 c.output을 이용해서 다음 파이프라인으로 넘기고

“NOUN”

명사1,명사2,명사3,...


마찬가지 방법으로 형용사도 같은 데이타 형인 KV<String,Iterable<String>> 타입으로 저장하여 다음 파이프라인으로 넘긴다.


이 데이타를 각각 명사와 형용사 두개의 처리 파이프라인으로 전달하는데, 두개의 파이프라인으로 단일 데이타를 보내는 방법은 다음과 같다.


nlprocessed.apply(ParDo.named("NounFilter").of(new NounFilter()))

: (중략)


nlprocessed.apply(ParDo.named("AdjFilter").of(new AdjFilter()))

: (중략)

nlprocessed는 PCollection 타입으로, NLAnalyticsDoFn에 의해서 처리된 결과이다.

이 결과 값에 두 개의 각각 다른 트랜스폼 (NounFilter와 AdjFilter)를 적용하였다.

이렇게 하나의 PCollection 값에 두 개의 트랜스폼을 각각 적용하면 적용된 각각의 파이프라인은 다른 파이프라인으로 아래 그림 처럼 분기 처리가 된다.


자아 그러면, 명사 처리 파이프라인 흐름을 따라가 보자

nlprocessed.apply(ParDo.named("NounFilter").of(new NounFilter()))

.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30))))

.apply(Count.<String>perElement())

.apply(ParDo.named("Noun Formating").of(new AddTimeStampNoun()) )

.apply(BigQueryIO.Write

.named("Write Noun Count to BQ")

.to( NOWN_TABLE)

.withSchema(schema)

.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

< TwitterPipeline.java의 main() 함수 일부>


첫번째 NounFilter에서는 앞에 파이프라인에서 들어온 명사와 형용사중에서 명사만 필터링 해서 다음 파이프라인으로 전달한다.


static class NounFilter extends DoFn<KV<String,Iterable<String>>,String>{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey();

if(!key.equals("NOUN")) return;

List<String> values = (List<String>) c.element().getValue();

for(String value:values){

// Filtering #

if(value.equals("#")) continue;

else if(value.startsWith("http")) continue;

c.output(value);

}


}

}

<TwitterPipeline.java 에서 NounFilter 클래스 구현부>

명사 인지 형용사 인지는 앞에서 넘어오는 데이타 형이 KV<String, .. > 인데, 키 부분의 값이 “NOUN” 일 경우에 명사이기 때문에, 이 값이 아니면 무시한다. 명사인경우에도 종종 쓰레기 값이 들어오는데, 예를 들어 트위터 특성상 해쉬 태그등을 위해서 “#”이 사용되고, 링크를 위해서 “http…” 링크가 들어가기도 하는데 이는 명사가 아니기 때문에 이 내용은 모두 필터링해서 무시한다.


이렇게 정재된 데이타는 파이프라인의 다음 단계인 .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30)))) 를 통해서 30초 단위의 고정 윈도우가 적용되고, 다음  .apply(Count.<String>perElement()) 을 통해서 단어별로 그룹핑되서 카운트 되고 그 결과는 앞서 적용한 30초 윈도우 시간 단위로 다음 파이프 라인으로 전달된다.  전달되는 데이타의 모양은 대략 다음과 같다.

Key (String)

Value (Long)

airplane

100

boy

29

india

92


이렇게 전달된 데이타는 빅쿼리에 저장하기 위해서 빅쿼리의 ROW 데이타 타입은 TableRow로 변환한다.

.apply(ParDo.named("Noun Formating").of(new AddTimeStampNoun()) )

.apply(BigQueryIO.Write

.named("Write Noun Count to BQ")

.to( NOWN_TABLE)

.withSchema(schema)

.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)

.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));


<TwitterPipeline.java 에서 main() 함수중 >

AddTimeStampNoun()에서 이 작업을 수행하는데, 이 함수는 윈도우의 시간을 추출하여 data라는 필드에 추가해준다.  아래는 AddTimeStampNoun()  함수의 코드이다.


static class AddTimeStampNoun extends DoFn<KV<String,Long>,TableRow>

implements com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess

{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey(); // get Word

Long value = c.element().getValue();// get count of the word

IntervalWindow w = (IntervalWindow) c.window();

Instant s = w.start();

DateTime sTime = s.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTimeFormatter dtf = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

String str_stime = sTime.toString(dtf);


TableRow row =  new TableRow()

.set("date", str_stime)

.set("noun", key)

.set("count", value);


c.output(row);

}

}

<TwitterPipeline.java 에서 AddTimeStampNoun 클래스 구현부>


여기서 주의할점은 윈도우에 대한 데이타를 접근하기 위해서는 com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess 인터페이스를 implementation 해야 한다.  그후에, 현재 윈도우에 대한 정보는 ProcessContext c 변수에서 c.window() 함수를 이용하면 윈도우의 정보를 읽어올 수 있다. 이 코드에서는 윈도우 시작 시간을 IntervalWindow w의 w.start() 를 통해서 읽어왔고, 이를 빅쿼리의 TIMESTAMP  데이타 타입으로 넣기 위해서 “yyyy-MM-dd HH:mm:ss” 형태로 포매팅을 한후 TableRow라는 빅쿼리의 row형 데이타 타입으로 생성한 후, 다음 파이프라인으로 넘겼다.


다음 파이프라인은 BigQueryIO로 Write 명령을 이용해서 NOWN_TABLE 에 (String 값은 “noun”) 데이타를 쓰도록 하였고, 쓰기 모드는 붙여쓰기 WRITE_APPEND로 하고, 테이블은 없으면 생성하도록 CREATE_IF_NEEDED로 지정하였다. 이때 테이블의 스키마를 정의해줘야 하는데, 테이블 스키마는 withSchema(schema) 함수로 지정을 했는데, 스키마를 정의한  schema 변수는 다음과 같이 정의 되어 있다.


List<TableFieldSchema> fields = new ArrayList<>();

fields.add(new TableFieldSchema().setName("date").setType("TIMESTAMP"));

fields.add(new TableFieldSchema().setName("noun").setType("STRING"));

fields.add(new TableFieldSchema().setName("count").setType("INTEGER"));

TableSchema schema = new TableSchema().setFields(fields);

<TwitterPipeline.java 에서 main() 중의 “noun” 테이블 스키마 정의 부분>


같은 방식으로 형용사를 처리하는 파이프라인도 정의를 한다음 정의가 끝났으면

p.run();

을 이용하여 파이프라인이 실행되도록 한다.

실행하기

모든 코드 구현이 끝났다. 이제, 파이프라인을 기동해보자

이클립스에서 파이프라인을 구동하는데, Run Configuration 부분을 아래와 같이 설정한다.


Runner를  DataflowPipelineRunner를 선택한다. BlockingPipeRunner의 경우에는 파이프라인이 기동되는 동안 이클립스에 프로그램이 실행중인것으로 되서, 이클립스에서 파이프라인을 멈춰버리면 전체 파이프라인이 멈추기 때문에 적절하지 않다.

다음 Argument 탭에서 아래와 같이 Program Argument에  --streaming 옵션을 추가한다.


데이타 플로우는 배치 및 스트리밍 모드 두개가 있는데, 이 예제는 스트리밍 예제이기 때문에, --streaming을 명시적으로 지정한다.

구글 자연어 분석 API에 대한 인증을 위해서 서비스 어카운트 키 (JSON 파일의 경로)를 GOOGLE_APPLICATION_CREDENTIALS 환경 변수에 설정해야 하는데, Environment 탭에서 New를 누른 후, GOOGLE_APPLICATION_CREDENTIALS를 Name으로 하고 Value 부분에 서비스 어카운트 키 파일의 경로를 적어준다.



환경 설정이 끝났으면 아래 Run 버튼을 눌러서 파이프라인을 기동시킨다.

파이프라인을 기동 시키면 구글 클라우드로 소스를 배포하고 인스턴스를 구동하는데 까지 수분이 걸리기 때문에 잠시 기다린다.

기다리는 동안 배포 상태를 보기 위해서, 구글 클라우드 콘솔로 들어가면 아래와 같이 Status가 Running으로 바뀔때 까지 기다린다.



Running으로 바뀌고 나서도 1~2분 정도 준비가 필요하기 때문에 기다렸다가 해당 JOB을 확인해보면 다음과 같이 잡이 정상적으로 기동 되고 있음을 확인할 수 있다.



작업이 실행되었으면 이 파이프라인에 데이타를 넣어주기 위한 Fluentd 에이전트를 실행해보자. Fluentd를 설치한 VM에 들어가서,

%sudo /etc/init.d/td-agent restart

명령을 이용해서 fluentd 에이전트를 가동한다.


데이타가 들어오기 시작하면 다시 구글 클라우드 콘솔의 데이타 플로우 화면을 보면 (위의 그림)

상단에 LOGS 라는 버튼을 볼 수 있는데



이 버튼을 누르면 죄측 하단에 다음과 같이 Job Logs라는 윈도우가 나타난다.



여기서 오른쪽의 “WORKER LOGS” 라는 버튼을 누르면 이 파이프라인의 전체 로그를 볼 수 있는데, 에러가 없는지를 잘 확인 한다.



별도의 에러가 없다면 정상적으로 데이타가 수집된다고 할 수 있다.

그러면 데이타가 제대로 수집되는지를 확인해보자

빅쿼리 콘솔로 들어가서 select count(*) from [noun 테이블명] LIMIT 1000

을 수행해서 데이타가 제대로 들어오는지 확인해보자


위의 그림과 같이 f0_가 0 이상이면 데이타가 쌓이고 있다고 생각해도 된다.

데이타 시각화와 분석

데이타 스튜디오(Google datastudio) 를 이용한 데이타 분석

쌓여 있는 데이타를 실제로 분석해보자. 리포트를 이용해서 시각화를 할 예정인데, 여기서 사용한 리포팅 도구는 구글 데이타스튜디오 라는 리포트 도구이다. (http://datastudio.google.com) 으로 9월 현재는 미국 지역만을 대상으로 서비스가 되고 있고, 곧 한국에 서비스가 오픈될 예정이다.


우리가 만들려고 하는 리포트는 다음과 같은 모양을 갖는다

전체 기간동안 가장 많이 발생한 명사 10개와 그 발생 회수를 표로 출력해주고, 우측에는 전체기간이 아닌 일자별로 많이 발생한 명사 10개에 대한 발생 회수 및 그 변화 추이를 출력해준다.

다음 행에는 형용사에 대한 분석 결과를 출력해준다.



새로운 리포트 생성

데이타 스튜디오 메인 화면에 들어오면 작성한 리포트 목록들이 아래와 같이 출력된다.


여기서 + 버튼을 누르면 아래와 같이 새로운 리포트를 생성할 수 있다.


새로운 리포트 화면에 들어오면 우측 하단에 “CREATE NEW DATA SOURCE”라는 버튼이 나타나는데, 이를 통해서 빅쿼리 테이블을 불러올 수 있다. “CREATE NEW DATA SOURCE” 버튼을 눌러보자


데이타 소스 생성해서 빅쿼리 테이블을 불러와야하는데, 데이타 스튜디오는 아래 그림에서와 같이 빅쿼리 뿐만 아니라 구글의 MySQL 서비스인 CloudSQL에서 부터 일반 MySQL 까지 연결이 가능하기 때문에, 빅쿼리 뿐 아니라 일반 데이타 분석에서 분석된 데이타르르 MySQL을 통해서 리포트로 시각화할 수 있고,

Google Sheet에 있는 데이타를 불러와서 같이 표현할 수 있는 기능을 제공하는데, 이는 특히 비지니스나 영업쪽에서 작성한 Sheet의 데이타를 실시간으로 읽어다가 하나의 리포트에 표현할 수 있기 때문에 매우 유용하게 사용할 수 있다.

아울러 YouTube나 Google Analytics 그리고, Adwords 광고 플랫폼등 다양한 구글 플랫폼의 데이타를 읽어서 시각화할 수 있다.


연동 소스 중에서 빅쿼리를 선택한 다음 프로젝트와 데이타셋 그리고 연동하고자 하는 테이블을 선택한다. 여기서는 noun 테이블을 선택하였다. 그러면 다음과 같이 테이블 스키마가 나오고 ADD TO REPORT 버튼이 나온다.


ADD TO REPORT를 눌러서 리포트에 추가하자

다음 리포트 화면에서 다음과 같이 Table 버튼을 눌러서 테이블을 추가하자


테이블을 추가하면 우측에 테이블에 출력하고자 하는 데이타를 선택할 수 있다.


우측에 Data source는 아까 불러들인 “noun”테이블을 선택하고, Dimension은 noun을 선택하고, Metric은 count를 추가하면 명사(noun)별, 발생횟수 (count)를 출력해준다.

위의 표를 보면 note7 등의 단어가 나오는데, 당연히 note7에 대해 검색했기 때문에  note7 단어가 많이 나오겠지만 이는 분석에서 얻고자 하는 데이타가 아니기 때문에 이 note7 등의 불필요한 문자열은 필터링해서 없애버리도록 한다.

필터는 우측 하단에 “Fiter”라는 메뉴에서 추가가 가능한데


메뉴에서 “+Add a filter”를 선택한후


Exclude (제외한다) 라는 버튼을 선택한 후에, Dimension을 noun으로 Match type을 Equal to 로 Expression을 note7 으로 선택하면 noun 필드에서 값이 note7인 내용은 제외 하도록 하는 필터이다. 필터가 적용되면 note7 단어는 필터링되서 출력되지 않는다.


다음으로 꺽은선 그래프를 추가하기 위해서 화면에서 Time series 버튼으로 꺽은선 그래프를 추가한다.



꺽은선 그래프가 추가되면 그래프에 출력될 데이타를 Time series Properties에서 다음과 같이 설정한다.


Data source는 noun 테이블로 하고, Dimention을 Time Dimention은 date로 하고, 여러 필드를 같이 분석하기 위해서 Breakdown Dimension에 noun을 추가하면 하나의 명사가 아니라 주요 명사들을 출력해준다.

그리고 Metric은 count로 선택하면 각 명사별 카운트수를 일자별로 볼 수 있다.


같은 방법으로 형용사에 대한 그래프도 추가한다.


그래프가 완성된 후에, 데이타를 수집 및 분석해보니 꽤나 의미가 있는 분석 결과를 얻을 수 있었다.


다음글에서는 오픈소스 데이타 분석 도구인 제플린을 이용하여 상세 데이타 분석을 하는 방법에 대해서 알아보기로 한다.

이글에서 소개한 데이타 스튜디오는 아직 한국에서는 서비스가 제공되지 않기 때문에, 한국에서 사용하고자 하는 사람들에게는 다음글의 제플린 기반의 데이타 분석이 훨씬 더 유용하리라 생각된다.






스트리밍 분석 플랫폼인 Apache Beam (Dataflow)를 공부하다 보니, 예제가 필요해서 지난번에는 힐러리와 트럼프 후보가 언급된 피드를 읽어서, 구글의 자연어 분석 API를 통해서 긍정/부정 여부를 분석한 후, 빅쿼리에 넣어서, 파이썬 노트로 그래프로 표현해봤는데, 아무래도 자연어 분석 API의 정확도가 아직 떨어지는 건지, 대부분 부정으로 나오고, 분석 결과도 재미가 없다.


그래서 새로운 분석 예제를 고민 하다가, 다음 방향으로 정했다.



  1. 지난번과 마찬가지로 데이타 수집은 트위터에서 특정 키워드를 fluentd로 수집한다.
  2. 수집한 데이타는 Pub/sub에 저장한다.
  3. Pub/sub에 데이타 플로우 파이프라인을 연결한다.
    1. 데이타 플로우 파이프라인에서 데이타를 읽는다.
    2. 읽어온 데이타중 10%만 샘플링 한다. (아무래도 예제이다 보니)
    3. 트윗 데이타중, 트윗 문자열만 발라낸다. 
    4. 트윗 문자열중 RT가 아닌 문자열만 추출한다.
    5. 구글 자연어 처리 API를 호출한다.
    6. FIXED 윈도우 처리를 한다. (5초 주기)
    7. 자연어 처리 API에서 리턴된 결과로 검출된 명사와,그 명사의 발생횟수, 형용사와 각 형용사의 발생횟수, 이모티콘과 이모티콘의 발생횟수, 그리고 타임 윈도우등 트윗의 감정분석 값의 평균을 계산한다.
  4. 계산 결과를 빅쿼리에 저장한다.
  5. 빅쿼리에 저장된 결과를 제플린에서 읽어서 일별 발생한 명사와 명사의 횟수, 형용사와 형용사의 횟수, 이모티콘과 이모티코의 횟수를 탑 랭킹 8개씩 출력한다.
지난번 감성 분석보다, 같이 언급되는 형용사나 명사의 수를 카운트하면 재미있는 결과를 얻을 수 있지 않을까? 구현 목표는 6시간이내. (월요일??) 



데이타 플로우 개발환경 설정하기


조대협 (http://bcho.tistory.com)


데이타 플로우에 대한 이해가 끝났으면 이제 직접 코딩을 해보자. 데이타 플로우에 대한 개념등은 http://bcho.tistory.com/search/dataflow 를 참고하기 바란다.

데이타 플로우에서 지원하는 프로그래밍 언어는 자바와 파이썬이다. 파이썬은 아직 알파버전으로, 이 글에서는 자바를 이용해서 설명한다.


자바를 이용한 개발환경 설정은 이클립스 개발환경과 maven을 이용한 개발 환경 두가지가 있는데, 여기서는 조금 더 손 쉬운 이클립스 환경을 기준으로 설명한다.

메이븐 기반의 개발 환경 설정은 https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven 를 참고하기 바란다.


사전준비

클라우드 계정 생성 및 빌링 설정

구글 클라우드 계정 생성 및 빌링 설정 방법은 앞서 다른글에서도 많이 설명하였기 때문에 다시 설명하지 않는다. 자세한 내용은 http://bcho.tistory.com/1107 를 참고하기 바란다.

API 사용 설정하기

다음 데이타플로우와 기타 같이 사용할 제품들의 API를 사용하기 위해서 이를 설정해줘야 한다.

구글 클라우드 콘솔에서 API Manager를 선택한후 대쉬 보드에서 아래 서비스들을 선택하여 API를 Enable 해준다. Cloud Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, and Cloud Datastore APIs.





구글 Cloud SDK 설정

구글 데이타 플로우를 프로그래밍 하기 위해서, 데이타 플로우 API를 호출하기 위한 SDK와 조작을 위한 CLI (Command Line Interface)가 필요한데, 이는 구글 Cloud SDK를 설치하면 같이 설치가 된다.

클라우드 SDK 설치는 https://cloud.google.com/sdk/docs/ 를 참고하면 된다.

gcloud 인증하기

구글 Cloud SDK 설치가 끝났으면, gcloud 명령어를 사용하기 위해서 gcloud 명령어를 초기화 한다.

초기화는 어떤 구글 클라우드 프로젝트를 사용할것인지, 그리고 사용자 아이디등으로 인증을 하는 절차를 거친다.

프롬프트 상에서

%gcloud init

명령을 실행하여, 수행한다.

이클립스 환경 설정

이제 구글 클라우드 프로젝트 설정과, 이를 호출하기 위한 SDK 환경 설치가 끝났다. 이제 이클립스 기반의 개발 환경을 설정해보자.

이클립스 설치하기

이클립스는 4.4 버전 이상을 설치하고, JDK는 1.7 이상을 설정한다.

플러그인 설치하기

다음 구글 데이타 플로우 개발환경을 위한 이클립스 플러그인을 설치한다.

이클립스에서 Help > Install New Software를 선택한 다음에, Work with 텍스트 박스에  https://dl.google.com/dataflow/eclipse/  을 입력한다.


다음으로 Google Cloud Dataflow를 선택하여 설치를 진행한다.

설치가 끝난 후 확인은 이클립스에서 New > Project를 하면, 위자드를 선택하는 화면에서 아래와 같이 Google Cloud Platform이라는 폴더와 함께 그 안에 “Cloud Dataflow Java Project”를 선택할 수 있는 화면이 나온것을 볼 수 있다.



헬로우 데이타 플로우

개발 환경 설정이 끝났으니, 이제 간단한 데이타 플로우 프로그램을 하나 만들어보자.

이 프로그램은 단어들을 읽어드린 후에, 단어들의 발생 횟수를 카운트 해 주는 파이프라인이다.



단어들을 읽어드린 후 toUpper라는 트랜스폼에서, 각 단어들을 대문자로 변환한 후, Count라는 트랜스폼에서 단어별로 발생횔 수를 카운트 한후에, 이를 Key Value (단어:발생횟수)로 리턴한 후, Print라는 트랜스폼에서 화면으로 결과를 출력해주는 예제이다.


프로젝트 생성

예제 파이프라인을 만들기 위해서, 이클립스에서 프로젝트를 생성해보자. New > Project를 선택한 후 에, 아래 그림과 같이 Google Cloud Platform 폴더에서 Cloud Dataflow Java Project를 선택한다



다음 프로젝트에 대해서  Group ID, Artifact ID 그리고 패키지 명등을 입력한다.



다음 메뉴로 넘어가면 구글 데이타 플로우를 실행하기 위한 디테일한 정보를 넣어야 하는데,




프로젝트 명과, “Cloud Storage Staging Location”이라는 정보를 입력해야 한다. Cloud Storage Staging Location은 Google Cloud Storage 의 버킷명으로, 데이타 플로우 애플리케이션 코드가 로딩 되는 장소이다.

데이타플로우 애플리케이션을 구글 클라우드에서 실행하게 되면, 애플리케이션 코드와 애플리케이션을 실행하기 위한 라이브러리들이 각각의 워커 노드로 배포 되는데, 배포를 위해서 먼저 클라이언트에서 부터, 이러한 실행 코드를 Google Cloud Storage에 올려놓게 된다. 앞에서 정의하는 “Cloud Storage Staging Location”은, 이 클라우드 스토리지 버킷에 대한 경로 정의이다.

클라우드 스토리지 버킷은 아래와 같인 Google Cloud Storage 메뉴에서 아래와 같이 생성할 수 있다.


코드 제작

그러면 코드를 작성해 보자.



package com.terry.df;


import com.google.cloud.dataflow.sdk.Pipeline;

import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

import com.google.cloud.dataflow.sdk.transforms.Count;

import com.google.cloud.dataflow.sdk.transforms.Create;

import com.google.cloud.dataflow.sdk.transforms.DoFn;

import com.google.cloud.dataflow.sdk.transforms.ParDo;

import com.google.cloud.dataflow.sdk.transforms.DoFn.ProcessContext;

import com.google.cloud.dataflow.sdk.values.KV;


import org.slf4j.Logger;

import org.slf4j.LoggerFactory;


public class StarterPipeline {

 private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);


 public static void main(String[] args) {

   Pipeline p = Pipeline.create(

       PipelineOptionsFactory.fromArgs(args).withValidation().create());


   p.apply(Create.of("Hello", "World","hello","boy","hello","girl"))

   .apply(ParDo.named("toUpper").of(new DoFn<String, String>() {

     @Override

     public void processElement(ProcessContext c) {

       c.output(c.element().toUpperCase());

     }

   }))

   .apply(Count.<String>perElement())

   .apply(ParDo.named("Print").of(new DoFn<KV<String,Long>, Void>(){

@Override

public void processElement(ProcessContext c) throws Exception {

LOG.info(c.element().getKey() + " count:"+c.element().getValue());

}

   }));


   p.run();

 }

}



(참고 : 위의 소스코드는 https://github.com/bwcho75/googledataflow/tree/master/HelloDataFlow 에 있다.)


처음 p.apply(Create.of…)에서, 데이타를 생성하였다.

다음으로 .apply(ParDo.named("toUpper").of(new DoFn<String, String>() 에서 소문자를 대문자로 다 치완하는 데, ParDo는 이 작업을 여러 노드에서 병렬로 실행하겠다는 선언이고, named는 이 트랜스폼의 이름을 “toUpper”로 정의하겠다는 정의이다. (나중에 디버깅에 유용한다.) 다음으로, 트랜스폼 함수는 DoFn으로 정의했는데, <String,String>으로 정의되어 앞의 인자가 Input 그리고 뒤의 인자가 Output의 데이타 형으로 String 인자를 받아서, String 인자로 리턴하겠다는 것이다.


.apply(Count.<String>perElement()) 은 데이타플로우에서 미리 정의된, 트랜스폼으로,  <String>으로 된 데이타를 받아서 엘리먼트당 카운트를 해서 <String,Long> 형으로 리턴을 해준다. 즉 String형의 단어마다 카운트를 한 결과를 Long형으로 넣어서 이를 키밸류(KV)형식으로 묶어서 리턴해준다.

.apply(ParDo.named("Print").of(new DoFn<KV<String,Long>, Void>() 에서는 앞에서 전달해준  String,Long형이 키밸류형으로 정의된 KV<String,Long>형의 데이타를 받아서, 출력해주고, 마지막 트랜스폼이기 때문에 더 이상 뒤로 데이타를 넘기지 않을 것이기 때문에, Output의 인지 타입을 Void로 선언하였다.

실행

코드를 작성이 끝났으면 실제로 실행해보자 Run As에서 Dataflow Pipeline을 선택하면 실행을 할 수 있다.



이때 다음과 같이 실행환경을 설정할 수 있다.



여기서 Runner에 대한 개념을 짚고 넘어갈 필요가 있다.

Direct Pipeline Runner

Direct Pipeline Runner는 데이타플로우 코드를 로컬 개발 환경 (노트북이나 데스크탑)에서 실행하고자 할때 선택할 수 있는 러너이다. 주로 개발이나 테스트에서 사용할 수 있는데, 다른 클라우드 서비스 예를 들어  Pub/Sub이나 빅쿼리등이랑 연동이 되는 파이프라인의 경우에는 DirectPipelineRunner를 사용할 수 없으니 주의하기 바란다.

DataflowPipelineRunner

클라우드 환경에서 데이타 플로우를 실행하기 위해서는 DataflowPipelineRunner와  BlockingDataflowPipelineRunner 두 가지가 있다.

DataflowPipelineRunner는 데이타 플로우 애플리케이션을 구글 클라우드에서 실행을 해주는데, 데이타 플로우 잡을 클라우드에서 실행해놓고, 로컬 애플리케이션을 바로 종료 한다. (클라우드에 접수된 잡은 클라우드에서 계속 실행된다.)

BlockingDataflowPipelineRunner

BlockingDataflowPipelineRunner는 데이타 플로우잡을 구글 클라우드에서 실행해놓게 해놓고, 잡이 끝날때 까지 로컬 애플리케이션을 대기하도록 한다.  

배치와 같이 끝이 있는 경우에는 필요에 따라서 사용할 수 있다. 스트리밍의 경우 BlockingDataflowPipelineRunner를 사용하게 되면 스트리밍 잡을 명시적으로 끊지 않는 이상 계속해서 로컬 애플리케이션이 실행되는 상태가 된다.


DirectPipelineRunner로 실행을 해보면 다음과 같이 이클립스 콘솔에서 결과가 출력되는 것을 볼 수 있다.


BODY는 1,  GIRL 은 1, HELLO는 3개 그리고 WORLD는 1개가 출력되는 것을 볼 수 있다.


이번에는 클라우드에 배포를 하고 실행해보자, Run As에서, BlockingDataflowPipelineRunner를 선택하여 실행해보자.

실행을 하면 코드가 자동으로 클라우드로 배포 되서 실행되는 것을 확인할 수 있다. 구글 클라우드 콘솔의 데이타 플로우 메뉴를 보면, 새로운 잡이 생성된것을 확인할 수 있다.


해당 잡을 선택해서 들어가 보면 현재 잡의 실행 상황과 함께, 파이프라인에서 단계별 시간이나 기타 상세한 로그를 볼 수 있다.



데이타 플로우 애플리케이션이 기동이 완료되면, Logs 메뉴에서 Worker Logs라는 버튼을 누르면 각 워커 노드에서의 로그를 볼 수 있다.


Worker Logs를 누르면 다음과 같이  GIRL,WORLD,BOY,HELLO에 대한 count 수를 출력한 로그를 확인할 수 있다.


참고 : Logs 메뉴로 들어가서  Job Logs에서  Minimum serverity를 “All” 로 선택하면 전체 작업 상황을 알 수 있는데, 애플리케이션을 실행했다고 바로 데이타 플로우의 파이프라인이 실행되는 것이 아니라, 애플리케이션 코드가 구글 클라우드 스토리에 로드되고, 이 로드된 코드들이 각각의 워커 노드에 배포가 된후에, 워커 노드가 기동이 되야 잡이 실제로 수행된다.


워커가 제대로 기동되었는지는 Job Logs에서 Mimimum serverity를 All로 한후에 다음과 같이 “Worker have started successfully”라는 로그가 나오면 그때 부터 데이타 플로우 잡을 실행을 시작한다고 생각하면 된다.








데이타 스트리밍 분석 플랫폼 Dataflow 개념 잡기 #2/2

(트리거, 이벤트 타임, 워터마크 개념)


조대협 (http://bcho.tistory.com)


앞글 http://bcho.tistory.com/1122 에 의해서 Dataflow에 대한 개념에 대해서 계속 알아보자

트리거

윈도우와 더블어서 Dataflow 프로그래밍 개념중에서 유용한 개념중의 하나가 트리거이다. 트리거는 처리중인 데이타를 언제 다음 단계로 넘길지를 결정하는 개념이다. 특히 윈도우의 개념과 같이 생각하면 좋은데, 윈도우는 일반적으로 윈도우가 종료되는 시간에 그 데이타를 다음 Transform으로 넘기게 된다.


그런데 이런 의문이 생길 수 있다. “윈도우의 크기가 클때 (예를 들어 한시간), 한시간을 기다려야 데이타를 볼 수 있는 것인가? 그렇다면 한 시간 후에 결과를 본다면 이것을 실시간 분석이라고 할 수 있는가?”

그래서 여기서 트리거의 개념이 나온다.

예를 들어 한시간 윈도우가 있더라도, 윈도우가 끝나지 않더라도 현재 계산 값을 다음 Transform으로 넘겨서결과를 볼 수 있는 개념이다. 1분 단위로 트리거를 걸면 1분 결과를 저장하고, 2분째도 결과를 저장하고, 3분째도…. 60분째에도 매번 결과를 업데이트 함으로써, 윈도우가 종료되기 전에도 실시간으로 결과를 업데이트 할 수 있게 된다.


트리거의 종류

그렇다면 이러한 트리거는 앞에서 언급한 시간 단위의 트리거만 있을까? Dataflow는 상당히 여러 종류의 트리거를 지원한다.


  • Time trigger (시간 기반 트리거) : 시간 기반 트리거는 일정 시간 주기로 트리거링을 해주는 트리거 이다. 1분 단위, 1초 단위 같이 일정 주기를 지정하거나, “윈도우 시작후 2분후 한번과 윈도우 종료후 한번"과 같이 절대적인 시간을 기준으로도 정의가 가능하다.

  • Element Count (데이타 개수 기반 트리거) : 다음은 개수 기반인데, 예를 들어 “어떤 데이타가 100번 이상 들어오면 한번 트리거링을 해라” 또는 “매번 데이타가 100개씩 들어올때 마다 트리거링을 해라" 라는 형태로 정의가 가능하다.

  • Punctuations  (이벤트 기반 트리거) : Punctuations는 엄밀하게 번역하면 “구두점" 이라는 의미인데, 구두점 처럼 특정 데이타가 들어오는 순간에, 트리거링을 하는 방법이다.

트리거 조합

이러한 트리거는 하나의 트리거 뿐 아니라, 여러개의 트리거를 동시에 조합하여 사용이 가능하다.

  • AND : AND 조건으로 두개의 트리거의 조건이 만족해야 트리거링이 된다. 예를 들어, Time Trigger가 1분이고, Element Count 트리거가 100개이면, 윈도우가 시작된 1분 후에, Element Count가 100개가 되면 트리거링이 된다.

  • OR : OR 조건으로 두개의 트리거의 조건 중 하나만 만족하면 트리거링이 된다.

  • Repeat : Repeat는 트리거를 반복적으로 수행한다. Element Count 트리거 10개를 반복으로 수행하면, 매 10개 마다 트리거링이 된다. Time 트리거를 1분 단위로 반복하면, 매 1분 마다 트리거링이 된다.

  • Sequence : Sequence 트리거는 등록된 트리거를 순차적으로 실행한다. Time 트리거 1분을 걸고 Element count 트리거 100개를 걸면, 윈도우 시작후 1분 후 트리거링인 된후, 그 후 부터 Element 가 100개 들어오면 두번째 트리거링이 발생하고 트리거링이 종료 된다.


트리거 결과의 누적

그러면 트리거링이 될때 마다 전달 되는 데이타는 어떻게 될까라는 질문이 나올 수 있는데. 무슨 이야기인가 하면 윈도우 내에서 트리거가 발생할때, 이전 데이타에 대한 처리를 어떻게 할것인가이다.


데이타가 A,B,C,D,E,F 가 들어왔다고 가정하자. 트리거가 C 다음 발생했다고 했을때, 윈도우가 끝난 F에는 어떤 값이 리턴이 될까?

첫번째 트리거링에는 당연히 A,B,C 가 전달된다.

윈도우가 끝나면 A,B,C,D,E,F 가 전달되는 것이 맞을까 아니면 트리거링 된 이후의 값인 D,E,F 만 전달되는 것이 맞을까?

맞는 건 없고, 옵션으로 지정이 가능하다.

  • Accumulating
    Accumulating은 트리거링을 할때 마다 윈도우 내에서 그때까지의 값을 모두 리턴한다.

  • Discarding
    트리거링 한 후에, 이전 값은 더이상 리턴하지 않고, 그 이후 부터 다음 트리거링 할때까지의 값만을 리턴한다.

예를 들어서 보자


다음과 같은 윈도우가 있고, 3번, 23번, 10번에서 트리거링이 된다고 했을때,

Accumulating mode의 경우

  • 첫번째 트리거 후 : [5,8,3]

  • 두번째 트리거 후 : [5,8,3,15,19,23]

  • 세번째 트리거 후 : [5,8,3,15,19,239,13,10]

와 같이 값이 반환되고

Discarding mode의 경우

  • 첫번째 트리거 후 [5,8,3]

  • 두번째 트리거 후 [15,19,23]

  • 세번째 트리거 후 [9,13,10]

이 반환된다.

데이타 지연에 대한 처리 방법

실시간 데이타 분석은 특성상 데이타의 전달 시간이 중요한데, 데이타는 모바일 클라이언트 등에서 인터넷을 통해서 데이타가 서버로 전송되는 경우가 많기 때문에, 데이타의 실제 도달 시간이 들쭉날쭉 하다. 이러다 보니 데이타의 도착 순서나 지연등이 발생하는데, 이에 대한 처리가 필요하다. 먼저 데이타 도달 시간의 개념을 이해하려면, 이벤트 타임과 프로세싱 타임의 개념을 먼저 이해해야 한다.

이벤트 타임과 프로세싱 타임

모바일 단말에서 다음과 같이 A,B,C,D의 데이타를 1시1초, 1시2초,3초,5초에 보냈다고 하자.


서버에 도착해서 Dataflow에 도착하는 시간은 물리적으로 서버와 단말간의 거리 차이가 있기 때문에 도착 시간은 단말에서 데이타가 발생한 시간보다 느리게 되며, 또한 각 단말의 위치나 단말이 연결되어 있는 네트워크 상황이 다르기 때문에 순차적으로 도착하는 것이 아니라, 늦게 보낸 데이타가 더 빨리 도착할 수 도 있다.

아래 그림을 보면 A데이타는 1시1초에 단말에서 생성되었지만 서버에 도착한 시간은 1시2초가 된다. C,D의 경우, 순서가 바뀌어서 도착하였다.



이렇게 실제로 데이타가 발생한 시간을 이벤트 타임, 그리고 서버에 데이타가 도착한 시간을 프로세싱 타임이라고 정의한다.


이 프로세싱 타임은 네트워크 상황이나 데이타에 크기에 따라 가변적으로 변하기 때문에, 이벤트 타임과 프로세싱 타임의 상관 관계를 그래프로 표현해보면 다음과 같아진다.


가장 이상적인 결과는 이벤트 타임과 프로세싱 타임이 동일한 것이겠지만 불가능하고, 위의 그림처럼 이벤트 타임보다 프로세싱 타임이 항상 늦게 되고, 이벤트 타임과 프로세싱 타임의 차이는 매 순간 다르게 된다.

워터 마크 (Water Mark)

이렇게 위의 그림과 같이 실제 데이타가 시스템에 도착하는 시간을 예측 하게 되는데, 이를 워터 마크라고 한다. 위의 그림에서 “실제 처리 그래프"로 표시되는 부분을 워터마크라고 생각하면 된다. 이 예측된 시간을 기반으로 윈도우의 시스템상의 시작 시간과 종료 시간을 예측 하게 된다.

지연 데이타 처리 방법

윈도우 처리 관련해서, 실제 발생한 시간과 도착 시간이 달라서, 처리 시간내에 못 들어오는 경우가 발생할 수 있다. 아래 그림을 보면, 실제 윈도우는 1시1초~1시6초까지의 데이타를 처리하기를 바라고 정의했을 수 있는데, 시스템에서는 이 윈도우의 값이 프로세싱 타임 기준으로 (워터 마크를 기준으로 연산함) 1시2초~1시6초에 도착하기를 기대하고 있는데, 데이타 C의 경우에는 기대했던 프로세싱 타임에 도착하지 않았기 때문에 이 데이타는 연산에서 누락될 수 있다.



비단 늦게 도착한 데이타 뿐만 아니라, 시스템이 예측한 프로세싱 타임 보다 일찍 데이타가 도착할 수 있는데, 이런 조기 도착한 데이타와 지연 도착한 데이타에 대한 처리는 어떻게 해야 할까?

Dataflow에서는 이런 조기 도착이나 지연 데이타에 대한 처리 메카니즘을 제공한다.

윈도우를 생성할때, withAllowedLateness라는 메서드를 사용하면, 늦게 도착하는 데이타에 대한 처리 기간을 정의할 수 있다.


PCollection<String> items = ...;

 PCollection<String> fixed_windowed_items = items.apply(

   Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES))

         .withAllowedLateness(Duration.standardDays(2)));

https://cloud.google.com/dataflow/model/windowing#managing-time-skew-and-late-data


위의 예제는 1분 단위의 Fixed Window를 정의하고, 최대 2일까지 지연 도착한 데이타 까지 처리할 수 있도록 정의한 예제이다.


지금까지 간단하게 dataflow를 이용한 스트리밍 데이타 처리의 개념에 대해서 알아보았다.