블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 


빅쿼리 대쉬 보드를 위한 오픈소스 메타 베이스


조대협 (http://bcho.tistory.com)


빅쿼리 분석 결과를 시각화 하는 도구로 구글에서 제공되는 툴은 일반 비지니스 사용자나, 초보자를 위한 데이타 스튜디오, 그리고 데이타 사이언티스트를 위한 DataLab 등이 있다.


그러다 보니, 데이타 사이언티스트는 아니면서 고급 사용자를 위한 데이타 분석툴 영역에 다른 툴이 필요하게 되는데, 상용 도구로는 타블루와 같은 설치형 도구나 Looker 등의 클라우드 서비스를 사용할 수 있는데, 유료이기 때문에, 대안적인 툴을 찾는 경우가 많다.


오픈 소스 도구로는 Redash가 있는데, 이 외에, Metabase(메타 베이스) 라는 도구가 있어서 소개한다.


쿼리 및 분석 기능

분석을 위해서 기본적인 화면상에서 쿼리가 가능하고, 쿼리 결과는 아래 그림과 같이 테이블이나 그래프 형태로 출력이 가능하기 때문에, AdHoc  분석이 손쉽게 가능하다. 




대쉬 보드 기능

이렇게 쿼리하고 분석한 내용을 바로 아래 그림과 같이 대쉬 보드에 추가할 수 있다. 



사용자 관리 기능

메타 베이스의 장점 중 하나가, 어느정도 규모가 되는 조직에서 사용이 가능하도록, 사용자 계정 관리 기능을 가지고 있다.  사용자 그룹을 통한 권한 관리 등이 가능하다.


관리자 기능

사용자 권한 관리를 하기 때문에 당연히 관리자 기능이 있는데, 재미있는 것은 필터나 매트릭등을 관리자가 정해놓고, 사용자가 이 매트릭을 불러다가 분석이나 리포팅에 사용할 수 있다.





<그림. 관리자 패널에서 필터를 정의하는 화면 >



빅쿼리와 메타 베이스를 연결하는 방법은 다음과 같다.

https://www.metabase.com/docs/latest/administration-guide/databases/bigquery.html


설치는 metabase.com 문서를 참고해야 하는데 mysql이나 postgres와 같은 외부 데이타 베이스를 설정해야 한다. 

https://www.metabase.com/docs/latest/operations-guide/running-the-metabase-jar-file.html



구글 스택드라이버를 이용한 애플리케이션 로그 모니터링

조대협 (http://bcho.tistory.com)

스택드라이버 소개

스택드라이버는 구글 클라우드에서 서비스로 제공되는 시스템 로그 및 모니터링 시스템이다. CPU,메모리사용량과 같은 하드웨어에 대한 정보에서 부터 웹서버나 OS와 같은 미들웨어 및 애플리케이션 로그를 수집, 검색 및 분석할 수 있으며, 여러 오픈 소스 (MongoDB, CouchDB, Redis - https://cloud.google.com/monitoring/agent/plugins/ )등에 대한 모니터링도 가능하다.

구글 클라우드 뿐 아니라, AWS에 대한 모니터링을 통합으로 지원하는 등, 상당히 많은 기능을 가지고 있다.

이 글에서는 스택드라이버를 이용하여 애플리케이션 로그를 수집하고 이를 분석하는 방법에 대해서 설명하고자 한다.

자바를 기반으로 애플리케이션 로깅을 설명한다. 자바 애플리케이션에서 스택드라이버로 로그를 남기는 방법은 여러가지가 있으나, 일반적으로 자바 프로그래밍 언어에서 많이 사용하는 로깅 프레임웍은 SL4J 를 이용한 로깅과, 스택드라이버 SDK를 이용하여 JSON 형태로 로그를 저장하는 방법에 대해서 알아보도록 한다.

API 인증

스택 드라이버를 사용하기 위해서는 로그 API에 대한 인증이 필요하다. 인증에는 여러가지 방법이 있는다. 사용이 쉬운 방법을 설명한다.

로컬 환경 또는 타 환경에서 인증

로컬 개발환경이나 클라우드에서 인증을 하는 방법은 서비스 어카운트 (Service Account)를 사용하는 방법이 있다. 서비스 어카운트는 구글 클라우드 콘솔에서  IAM 메뉴에서 생성할 수 있다. 서비스 어카운트 메뉴를 아래와 같이 선택한 다음.


상단 메뉴에서 Create Service Account  버튼을 누르고 서비스 어카운트 생성한다.


서비스 어카운트에는 서비스 어카운트의 권한을 설정할 수 있는데, Project Owner로 설정하면 모든 권한을 다 가질 수 있고, 여기서는 로깅 권한만을 줄것이기 때문에, Logs Writer 권한만을 지정한다.


계정 생성을 하면 json 파일이 다운로드 된다.

이 파일은 환경 변수 GOOGLE_APPLICATION_CREDENTIALS 에 파일 경로를 지정해주면 된다.

예시 $ export GOOGLE_APPLICATION_CREDENTIALS=/Users/terrycho/keys/terrycho-sandbox-projectowner.json




구글 클라우드 VM 내에서 인증

구글 클라우드 VM내에서 자바 코드를 실행할 경우 VM 자체에 API 접근 권한을 부여할 수 있다. 보통 운영환경에서는 이 방법이 권장된다.

아래와 같이 VM 생성시 “Identity and API access” 에서 API 접근 권한을 주면 된다. Set access for each API를 써서 Logging write 권한만을 줄 수 있고, 아니면 Allow full access to all Cloud APIs 를 이용해서 전체 API에 대한 권한을 줄 수 도 있다.




SL4J를 이용한 로깅

sl4j를 이용한 로깅은, 기존의 sl4j 로거를 그대로 사용하기 때문에 코드 변환이 거의 없고, 단지 maven 에서 라이브러리 의존성을 스택드라이버 로거로만 변경해주면 되기 때문에 별도의 학습이 필요없고 사용법이 단순하다는 장점이 있다. sl4j 로깅은 단순하다.

의존성 추가

먼저  pom.xml 에 아래와 같은 의존성을 추가 한다.

<dependency>
<groupId>com.google.cloud</groupId>       <artifactId>google-cloud-logging-logback</artifactId> <version>0.30.0-alpha</version>
</dependency>

logback.xml

다음 필요에 따라서 sl4j에 대한 설정을 위해서 logback.xml 을 추가 설정할 수 있다. 여기서 로깅 레벨등을 지정할 수 있으나, sl4j에 대한 내용이기 때문에 별도로 설명하지는 않는다.

자주 실수 하는 부분이 logback.xml은 클래스 패스의 경로내에 들어가 있어야 하는데 다른 방법으로는 자바 옵션으로 -Dlogback.configurationFile 으로 logback.xml 경로를 설정하면 된다.



코드

코드를 보자

package com.google.example.stackdriver;


import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



public class App {

 private static final Logger logger = LoggerFactory.getLogger(App.class);

 

 public static void main(String[] args) {

   logger.info("My Hello Log4j");

 }

}


코드는 간단하다. logger를 선언한 후에, .info, .error, .warning 등의 메서드로 텍스트 문자열을 남기면 된다.


자바 로거 연동은 sl4j이외에도 java.util.logging 도 연동이 가능하다. 자세한 내용은 https://cloud.google.com/logging/docs/setup/java 를 참고하기 바란다.

Logger를 이용한 로깅

sl4j는 사용이 간편한 반면에 텍스트 문자열로 로깅이 되기 때문에, 구조화된 정보 (JSON)이나 여러 필드를 가지는 로그를 남기기가 쉽지 않다는 단점을 가지고 있다. 스택드라이버 전용 SDK를 사용하면, JSON등 다양한 포맷으로 로그를 쉽게 남길 수 있다. (sl4j의 경우에도 LoggingEnahncer를 사용하면 가능하기는 하다)


전체 코드는 다음과 같다.


package com.google.example.stackdriver;

import com.google.cloud.MonitoredResource;

import com.google.cloud.logging.LogEntry;

import com.google.cloud.logging.Logging;

import com.google.cloud.logging.LoggingOptions;

import com.google.cloud.logging.Payload.JsonPayload;

import com.google.cloud.logging.Payload.StringPayload;

import com.google.cloud.logging.Severity;

import java.util.Collections;

import java.util.HashMap;

import java.util.Map;


public class LogWithLabel {

 //https://cloud.google.com/logging/docs/reference/libraries

 final static String LOG_NAME="terry-tutorial";

 /** Expects a new or existing Stackdriver log name as the first argument.*/

 public static void main(String... args) throws Exception {


   // Instantiates a client

   Logging logging = LoggingOptions.getDefaultInstance().getService();


   // The data to write to the log

   String text = "Hello, world!";

   Map<String, Object> jsonMap = new HashMap<String, Object>();

   jsonMap.put("elapsedtime", 11);

   

   for(int i=0;i<1000;i++){

    jsonMap.put("count", i);

   LogEntry entry

    //= LogEntry.newBuilder(StringPayload.of(text))

    // 한페이로드만 사용이 가능함. 오버라이드됨.

    = LogEntry.newBuilder(JsonPayload.of(jsonMap))

.setSeverity(Severity.ERROR)

       .setLogName(LOG_NAME)

       .setResource(MonitoredResource.newBuilder("global").build())

       .addLabel("instancename", "instance-1")

       .build();

   // Writes the log entry asynchronously

   logging.write(Collections.singleton(entry));

   }


   System.out.printf("Logged: %s%n", text);

 }

}


먼저 Logging 객체를 가지고 와야 한다. 별도의 설정 없이 다음과 같이 설정하면 되고, 프로젝트 및 인증은 앞에서 설정한 Service Account 파일의 정보를 그대로 사용한다.

Logging logging = LoggingOptions.getDefaultInstance().getService();


이 예제는 JSON 포맷으로 데이타를 저장하는 방법인데, 단순하게 1 레이어의 JSON을 저장하도록 하였다. Map을 이용하여 jsonMap을 정의하고, put을 이용하여 key, value 값을 저장한다.


   String text = "Hello, world!";

   Map<String, Object> jsonMap = new HashMap<String, Object>();

   jsonMap.put("elapsedtime", 11);


다음 로그를 저장하기 위해서는 LogEntry 객체를 이용해야 하는데, LogEntry는 LogEntry.newBuilder(PayLoad)를 이용하여 생성한다. Text 로그를 저장하는 TextPayLoad를 사용하거나 다른 페이로드도 있지만 여기서는 JsonPayLoad를 사용하였다.

LogEntry.newBuilder(JsonPayload.of(jsonMap))


다음 로그 Serverity (INFO,ERROR,WARNING)는 setServerity로 정할 수 있다. 스택 드라이버 로그는 정보 구조에서 계층 구조를 가질 수 있는데, 다음과 같은 개념을 가지고 있다.

리소스

리소스는 이 로그가 어떤 자원에 속하는지를 정의한다. 예를 들어, VM, 빅쿼리와 같이 어떤 인프라에 속하는지를 정의할 수 있는데, 애플리케이션의 경우 일반적으로 “global” 리소스로 정의한다.

리소스 명은 setResource메서드를 이용해서 지정이 가능하다.

라벨

다음 로그에 라벨을 달 수 있다. 예를 들어 이 리소스가 VM인데, 어떤 VM인지 식별을 하기 위해서 키를 name, 값을 인스턴스명 등으로 지정할 수 있다. 또는 개발/운영 환경인지를 구별하기 위해서 env 라는 키를 이용해서 환경에 따라 값을 dev,qa,prod 등으로 달 수 있다. 하나의 로그에는 여러개의 라벨을 붙이는 것이 가능하다. 라벨은 키,밸류 형태로 .addLabel(키,값)으로 추가가 가능하다.

로그 이름

로그 이름은 로그를 그룹핑할 수 있는데, 애플리케이션 종류등으로 그룹핑을할 수 있다. 이 로그는 사용자 로그, 게임 로그 등으로 그룹핑이 가능하다. 그룹 명을 setLogName으로 지정이 가능하다.


아래는 리소스를 global, 로그 이름을 LOG_NAME, 라벨에 instancename을 키로, instance-1이라는 값을 지정한 코드 예제이다.

       .setLogName(LOG_NAME)

       .setResource(MonitoredResource.newBuilder("global").build())

       .addLabel("instancename", "instance-1")

로그 확인

로그는 구글 클라우드 콘솔에서 STACKDRIVER > Logs 항목에서 확인이 가능하다.


위 그림과 같이 메뉴로 진입한 후에, 로그를 볼 수 있다.


리스트 박스에서 첫번째 박스는 리소스를 선택하는 화면으로 애플리케이션 로그는 앞의 예제에서 리소스를 global로 선택하였기 때문에, global을 선택한다. 그리고 두번째는 로그 이름을 고르는 화면인데, 앞에 예제에서 terry-tutorial로 로그 이름을 지정하였기 때문에 terry-tutorial을 선택한다.

다음 위의 화면에서 버튼을 누르면 실시간으로 로그를 볼 수 있는데, 통상 1분이내의 딜레이가 소요된다고 보면 된다.

로그에서 각 항목을 펼쳐보면 디테일을 볼 수 있다. 아래는 하나의 디테일인데, 중요한 부분은 timestamp에서 시간이 기록되고, serverity에 에러 레벨이 기록된다. 그리고 앞에서 지정한 Json PayLoad가 jsonPayLoad 라는 항목으로 들어간다.  라벨은 labels라는 항목에 키/밸류 형식으로 지정이 되는 것을 볼 수 있다.


로그 검색 및 필터링

스택드라이버의 강력한 기능중 하나가 로그에 대한 검색과 필터링인데, 스택 드라이버 콘솔 상단 화면에서 필터링(검색) 조건을 넣으면 각 필드 값에 따라서 다양한 형태로 로그 검색이 가능하다.


이 조건은 resource가 global이고, 그중에서 jsonPayload.count 가 900 보다 큰 로그만을 추출하는 방법이다. (Advanced filter를 사용하엿음)

표현식이 어렵지 않으니, https://cloud.google.com/logging/docs/view/advanced_filters 를 참고하면 손쉽게 로그 검색이 가능하다.

EXPORT

스택 드라이버의 다른 장점 중의 하나는 저장된 로그를 다른 시스템으로 EXPORT할 수 있는데, 크게 다음 3가지로 EXPORT가 가능하다.

  • GCS (파일) : Google Cloud Storage에 파일로 로그를 저장이 가능하다.

  • Pub/Sub (실시간 스트리밍) : 실시간으로 로그를 Pub/Sub 큐로 저장이 가능하다. Pub/Sub 뒤에 컨슈머를 둬서 다양한 처리가 가능하고 (알럿등) Apache Beam (Dataflow)연동을 통해서 실시간으로 로그를 분석 하는 것이 가능하다

  • BigQuery (데이타 베이스) : 실시간으로 데이타를 대용량 데이타 베이스는 빅쿼리에 저장하여 다양한 쿼리 및 시각화가 가능하다.


로그 EXPORT는 상단 메뉴의 CREATE EXPORT 버튼을 이용하면 EXPORT 정의가 가능하다.


이때 흥미로운 점은 로그 EXPORT시 필터에 조건을 걸어놓으면, 필터에 맞는 조건에 있는 로그만 EXPORT가 된다. 즉 로그 레벨이 CRITICAL한 로그만 Pub/Sub으로 로깅해서 알럿을 보내는 것과 같은 작업이 가능하게 된다.

빅쿼리로 EXPORT

그럼 그중에서 빅쿼리로 로그를 EXPORT하는 방법에 대해서 알아보기로 한다.

빅쿼리로 EXPORT하기 위해서는 CREATE EXPORT를 누른 후에, 로그 SINK 명을 지정하고 데이타셋을 지정해야 하는데, 데이타셋을 새로 생성하면 된다.


이 예제에서는 필터를 추가하여 label에서 instancename이 “instance-1”인 로그만 빅쿼리로 저장하도록 EXPORT 설정을 하였다.


http://bigquery.google.com에 들어가면 앞에 지정한 이름으로 데이타셋이 생긴것을 확인할 수 있고, 테이블명은 앞에서 지정한 로그명인 terry_tutorial 로 지정된것을 확인할 수 있다.

다음은 로그 시간과, JsonPay로드의 elapsedtime과, count 값을 조회하는 쿼리와 결과 이다.



쿼리 결과




데이타 스튜디오를 이용한 로그 시각화

이렇게 빅쿼리에 저장된 데이타는 구글 데이타 스튜디오를 이용하여 손쉽게 시각화가 가능하다.

https://datastudio.google.com에 접속한 후에, Start New Report에서 Blank Report 만들기를 선택한다.

새로운 리포트 화면이 나오면 우측 하단의

를 선택하여 빅쿼리 테이블과 연결을 한다.


좌측 커넥터를 선택하는 화면에서 BigQuery를 선택한후


MY PROJECT에서 내 프로젝트를 고르고, 데이타셋과 테이블은 선택한다.


다음으로 상단의 CONNECT 버튼을 눌러서 테이블을 연결한다. 또는 프로젝트를 선택하는 대신 CUSTOM QUERY를 누르면, 직접 SQL을 써서 특정 필드만 조회할 수 있다.


여기서는 전체 테이블을 불러오는 것으로 진행하도록 한다.

다음 화면에서는 필드 선택 및 제거, 그리고 타입 설정등이 가능하다.


적절하게 사용할 필드를 선택하고, 타입을 지정한후, 우측 상단의 ADD TO REPORT를 선택한다.

타임 스탬프는 일반적으로 일단위로 컨버팅 되기 때문에, 세밀한 로그를 원하면 분단위 등으로 변경하거나 커스텀 쿼리를 이용해서 초단위 값으로 컨버팅하기를 권장한다.

다음 메뉴에서 그래프나 표를 선택하여 적절하게 그리고, X 축은 Deminsion에 설정한다. 아래는 Dimension을 timestamp로 선택하고, Y축은 Metric 값으로 jsonPayload.count를 준 예이다.



혹시 테이블을 그린후에 데이타가 나오지 않는 경우가 있는데, 이 경우는 대부분 DataStudio의 Time zone과 빅쿼리에 저장된 Time이 맞지 않아서, 쿼리 범위에서 제외되는 경우인데, 이 경우는 그래프의 Property에서 날짜 범위를 다음과 같이 조정해주면 된다.



이외에도 다양한 기능이 있는데, 다음 문서들을 참고하기 바란다.



구글 빅쿼리 사용시 count(distinct)의 값이 정확하지 않은 문제


조대협 (http://bcho.tistory.com)


빅쿼리에서 count(distinct) 문을 사용하면, 종종 값이 부 정확하게 나오는 경우가 있다.

예를 들어서 아래의 두 쿼리는 같은 결과가 나와야 하는데, 아래 count(distinct id)를 쓴 쿼리는 다른 값을 리턴한다.

select count(*)

where id="mykey"

from mytable


select count(distinct id)

where id="mykey"

from mytable


빅쿼리에는 쿼리가 빅쿼리에 최적화된 SQL과 유사한 Legacy SQL 쿼리가 있고, ANSI SQL을 따르는 스탠다드 쿼리가 있다.

Legacy SQL 쿼리의 경우 확인해보니, 동작 방식이 다소 상이한 부분이 있다.
COUNT([DISTINCT] field [, n])
Returns the total number of non-NULL values in the scope of the function.

If you use the DISTINCT keyword, the function returns the number of distinct values for the specified field. Note that the returned value for DISTINCT is a statistical approximation and is not guaranteed to be exact.

Count(distinct) 함수의 경우 리턴 값이 1000이 넘을 경우에는 성능 향상을 위해서 정확한 값을 리턴하지 않고, 근사치 값 (approximation)을 리턴하도록 되어 있습니다. 그래서 상이한 결과가 나온다.

count(distinct platformadid,10000) 으로 하게 되면 리턴값이 10000을 넘을 경우에만 근사치 값을 리턴하게 됩니다. 즉 리턴값이 10000 이하이면 정확한 값을 리턴한다.

만약에 Legacy SQL에서 근사치 값이 아닌 정확한 값을 리턴 받으려면 count(distinct)
EXACT_COUNT_DISTINCT(field)
Returns the exact number of non-NULL, distinct values for the specified field. For better scalability and performance, use COUNT(DISTINCT field).

함수를 사용하시는 방법이 있고, 아니면 Legacy SQL 의 특성에 의한 오류를 방지하시려면 쿼리를 ANSI SQL 모드로 실행하시면 됩니다.
ANSI SQL 모드로 수행하는 방법은 아래와 같이 "Use Legacy SQL" Box를 unchecking 하시고, 테이블 이름을 []로 감싸지 마시고 ``로 감싸서 사용하시면 된다.


노트7의 소셜 반응을 분석해 보았다. 


#3 제플린 노트북을 이용한 상세 분석



조대협 (http://bcho.tistory.com)



데이타 스튜디오는 편리하게 사용할 수 있지만, 쿼리 사용등이 불가능하기 때문에, 원본 데이타를 이용한 상세 분석에는 어려움이 있다. 원본 데이타를 이용해서 상세 분석을 하려면 노트북 계열의 애플리케이션이 효과적인데, 빅쿼리를 연동할 수 있는 노트북으로는 이전에 소개한 주피터 노트북 기반의 데이타랩 (datalab)과, 스파크나 다른 빅데이타 솔루션과 함께 많이 사용되는 제플린 노트북(zeppelin.apache.org) 이 있다.


지난 글에서 데이타랩에 대한 연동 방법은 이미 소개하였으니, 이번에는 제플린을 통하여, 빅쿼리의 데이타를 분석해보도록 한다.


제플린 설치

제플린을 설치 하는 방법은 간단하다. Zeppelin.apache.org 에서, 설치 파일을 다운로드 받는다.

빅쿼리 연동 인터프리터는 제플린 버전 0.61 버전 이상에 포함되어 있기 때문에, 0.61 버전 이상을 다운로드 받는다.  이 때 모든 인터프리터가 포함된 버전을 다운 받아야 한다. (아니면 별도로 인터프리터를 설치해야 하는 번거로움이 따른다.)


다운 로드 받은 파일의 압축을 푼다. 다음으로 제플린 설치 디렉토리로 들어가서 다음 명령어를 수행한다.

% ./bin/zeppelin.sh

윈도우의 경우에는 %./bin/zeppelin.cmd 를 실행하면 된다.

자바 애플리케이션이기 때문에 별도의 설치 과정이 필요없고, 제플린 애플리케이션을 실행하기만 하면 된다.

제플린이 기동되었으면 브라우져에서 http://localhost:8080 으로 접속하면 다음과 같이 제플린 콘솔을 볼 수 있다.

노트북 생성

제플린 콘솔에 들어왔으면 초기화면에서 Create new note 라는 메뉴를 이용하여 새로운 노트북을 생성하자. 여기서는 편의상 “BQ 노트북" 이라는 이름으로 노트북을 생성하였다.


분석 쿼리 작성

이제 분석할 내용은 수집된 트윗의 명사들에 대해서, 시간 단위로 그룹핑을 한 다음에, 각 단어에 대해서 발생한 횟수를 카운트해서 보여주는 내용을 구현하려고 한다.

예를 들어서 9월20일에는 “유행" 이라는 단어가 200회 발생하였고, “패션" 이라는 단어가 100회 발생하였다. 라는 식으로 조회를 하려고 한다.


현재 테이블 구조는 다음과 같다.

Date (발생 시간)

Noun (명사)

count (발생 횟수)


SQL 문장을 작성해보자

select date,noun,sum(count) from 테이블명

group by date,noun


이렇게 쿼리를 하면, 시간대 별로, 명사와 그 명사의 발생 횟수를 리턴을 해주는데, 우리는 앞의 데이타 플로우 프로그램에서 30초 단위로 통계를 집계하도록 하였기 때문에, 30초 단위로 결과가 리턴된다. 우리가 원하는 결과는 30초 단위가 아니라 1시간 단위의 결과 이기 때문에, 다음과 같이 쿼리를 수정한다.


select  DATE(date) as ddate,HOUR(date) as dhour,noun,sum(count) from 테이블명

group by ddate,dhour,noun


DATE와 HOUR라는 함수를 사용하였는데, DATE는 타임 스탬프 형태의 컬럼에서 날짜만을 추출해주는 함수 이고, HOUR는 타임 스탬프 형태의 컬럼에서 시간만을 추출해주는 함수 이다.

이렇게 날짜와 시간만을 추출한 다음에, group by 절을 이용하여, 날짜와,시간 그리고 명사로 그룹핑을 하게 되면 우리가 원하는 것과 같이 각 날짜의 시간대별로 명사별 발생횟수 ( sum(count)) 값의 통계를 얻을 수 있다.


제플린에서 빅쿼리 명령을 수행하려면 다음과 같이 %bigquery.sql 이라고 첫줄에 선언을 한 다음에 SQL 문장을 수행하면 된다.




결과는 디폴트로 테이블 형태로 나오는데, 아래 아이콘 중에서 그래프 아이콘을 누르면 그래프 형태로 볼 수 가 있는데, 이 때 X,Y축의 변수를 지정할 수 있다.

아래 그림과 같이 Keys (X축을) ddate,dhour를 선택하고 Values(Y축)을 dhour SUM을 선택하면, 시간별 나타난 단어수를 볼 수 있다.



그런데 이 쿼리를 수행하면, 각 시간별로 발생한 명사 단어의 수가 매우 많기 때문에, 보기가 매우 어렵다.

그렇다면 시간대별로 발생한 명사중에서 각 시간대별로 많이 발생한 명사 5개씩만을 볼 수 없을까? 즉 group by를 전체 데이타 구간이 아니라, 각각 시간대 별로 계산을 해줄 수 는 없을까 하는 필요가 발생한다.

빅쿼리 파티셔닝

데이타를 구간 별로 나눠서 연산할 수 있는 기능으로 빅쿼리에는 파티션이라는 기능이 있다.

예를 들어서 group by를 전체 결과에 대해 그룹핑을 하는 것이 아니라, 앞에서 언급한 요건 처럼 일 단위로 짤라서 그룹핑을 하는 것이 가능하다.




파티션을 이용해서 할 수 있는 것은 파티션별로 합계나, 통계를 내거나, 파티션의 각 로우의 값의 백분율(%)나 또는 소팅한 순서등을 볼 수 있다. 여기서는, 시간으로 파티션을 나누고  파티션내에서 명사의 수가 많은 수 순서대로 소팅을 한후에, RANK라는 함수를 이용하여 그 파티션에서 그 명사가 몇번째로 많이 나타났는지를 출력하도록 해보겠다.


파티션의 사용법은 다음과 같다.

“파티션 함수 OVER (PARTITION BY 파티션을할 키 목록)”

여기서는 일/시간 별로 파티션을 나눈 후에, 그 순위별로 소팅을 할 것이기 때문에, 다음과 같은 식을 쓴다.

RANK() OVER (PARTITION BY ddate,dhour ORDER BY ncount DESC  ) as rank


이를 쿼리에 적용하면 다음과 같다.

   SELECT

       DATE(date) as ddate,HOUR(date) as dhour

       ,noun

       ,sum(count) as ncount

       , RANK() OVER (PARTITION BY ddate,dhour ORDER BY ncount DESC  ) as rank

   FROM [useful-hour-138023:twitter.noun]

   group by noun,ddate,dhour

   order by ddate,dhour,ncount desc


그러면 다음과 같이 일/날짜 파티션별로 많이 발생한 명사 순으로 발생횟수와 순위(rank)를 출력해준다.



그런데 쿼리를 돌려보면 알겠지만, 시간대별로 수집한 명사의 종류가 많기 때문에, 일자별 데이타가 매우 많다. 그래서 파티션별로 많이 등장하는 단어 5개만을 보려고 하면 rank <5 인것만 걸러내면 된다. 이는 중첩 쿼리를 이용해서 수행이 가능하다

다음은 이를 적용한 예이다.


SELECT ddate,dhour

   ,noun

   , rank

from (

   SELECT

       DATE(date) as ddate,HOUR(date) as dhour

       ,noun

       ,sum(count) as ncount

       , RANK() OVER (PARTITION BY ddate,dhour ORDER BY ncount DESC  ) as rank

   FROM [useful-hour-138023:twitter.noun]

   where noun != "note7" and noun != "samsung" and noun !="galaxy"

   group by noun,ddate,dhour

   order by ddate,dhour,ncount desc

   )

where rank < 6


이렇게 하면, 각 시간대별로 자주 등장하는 단어 6개만을 보여준다.


이 쿼리를 이용하여 데이타를 어떻게 분석하는지는 예전글 http://bcho.tistory.com/1136 을 참고하세요.


간단하게나마 트위터 피드에서 특정 키워드를 기반으로 하여, 명사와 형용사를 추출하여 소셜 반응을 분석하는 애플리케이션 개발과 데이타 분석 방법에 대해서 설명하였다.

아이폰7을 분석해보니, 명사 분석도 의미가 있었지만, 아이폰7에 대한 기대를 형용사 분석을 통해서도 많은 인사이트를 얻을 수 있었다. Awesome, excellent와 같은 기대치가 높은 형용사가 많이 검출되었고 bad, fuck 과 같은 부정적인 의미의 형용사는 다소 낮게 검출되었다. (아마 이즈음이 노트7 폭발로 인하여 반사 이익을 얻은게 아닐까 추정되는데.)


이외에도, 이모콘티만 추출하여 분석을 한다거나, 부사등을 통해서 분석을 하는 것도 가능하고, 구글 자연어 처리 API는 글을 통해서 사람의 감정을 분석해주는 기능도 있기 때문에 응용 분야는 훨씬 더 넓다고 볼 수 있다.


노트7의 소셜 반응을 분석해 보았다. 


#2 구현하기


조대협 (http://bcho.tistory.com)

지난번 글 http://bcho.tistory.com/1136에 이어서, 트위터를 통한 소셜 반응을 분석하는 시 스템을 구축하는 방법에 대해서 알아본다. 

시나리오 및 아키텍쳐

스트리밍 처리와 데이타 플로우에 대한 개념 이해가 끝났으면 이제 실제로 실시간 분석 애플리케이션을 만들어보자.

SNS를 이용한 마케팅 분석에서 대표적인 시나리오중 하나는 트위터 피드를 분석하여, 사람들의 반응을 분석하는 시나리오이다. 자주 언급 되는 단어나 형용사를 분석함으로써, 특정 제품이나 서비스에 대한 소셜 네트워크상의 바이럴 반응을 분석할 수 있는데, 여기서  구현하고자 하는 시나리오는 다음과 같다. 트위터 피드에서 특정 키워드로 트윗 문자열들을 수집한 후에, 구글의 자연어 분석 API를 통하여 트윗 문자열에서 명사와 형용사를 추출한다

추출한 명사와 형용사의 발생 횟수를 통계내어서 대쉬보드에 출력하는 시나리오이다.


이를 구현하기 위한 솔루션 아키텍쳐는 다음과 같다.


fluentd를 이용하여 트위터의 특정 키워드를 기반으로 트위터 피드를 수집하고, 수집된 피드들은 구글 클라우드의 큐 서비스인 Pub/Sub으로 전달된다. 전달된 데이타는 데이타 플로우에서 읽어서 필요한 데이타만 필터링한 후, 구글의 자연어 분석 API를 통해서 명사와 형용사를 분리한다.

분리된 명사와 형용사는 데이타플로우에서 30초 주기의 고정윈도우(Fixed Window) 단위로, 명사에서 발생한 단어의 수와, 형용사에서 발생한 단어의 수를 카운트 한 다음에, 빅쿼리에 명사 테이블과 형용사 테이블에 저장한다.

저장된 데이타는 구글의 리포팅 도구인 데이타 스튜디오를 통해서 그래프로 출력한다.


구현

그러면 위에서 설명한 아키텍쳐대로 시스템을 하나씩 구현해보자.

전체 예제 코드와 설정 파일은 https://github.com/bwcho75/googledataflow/tree/master/twitter 에서 받아볼 수 있다.

트위터 피드 수집 서버 설정

먼저 트위터에서 피드를 수집하기 위해서 fluentd 에이전트를 설정한다. 구글 컴퓨트 엔진에서 VM을 생성한 후에, 앞의 빅쿼리 예제에서 한것과 마찬가지로 fluentd 에이전트를 설치한다.

VM을 설치할때, 반드시 Cloud API access scopes를 full API access로 설정해야 하는데, 이 VM에서 fluentd를 통해서 수집한 피드를 Pub/Sub으로 전달할때, Pub/Sub API를 사용하기 때문이다.


Fluentd 가 설치되었으면 Pub/Sub으로 데이타를 전달하기 때문에,Fluentd pub/sub 에이전트를 추가설치 한다.

에이전트명은 “fluent-plugin-gcloud-pubsub”로

% sudo td-agent-gem install fluent-plugin-gcloud-pubsub

명령을 이용해서 설치한다.


에이전트 설치가 끝났으면 fluentd 에이전트 설정을 해야 한다.

다음은 트위터에서 “note7”에 관련된 피드를 읽어서 pub/sub 큐로 피드를 전송하는 fluentd 설정 예제이다.


<source>

 type twitter

 consumer_key        트위터 Consumer Key

 consumer_secret     트위터 Consumer Secrect

 oauth_token         트위터 Access Token

 oauth_token_secret  트위터 Access Token Secrect

 tag                 input.twitter.sampling  # Required

 timeline            sampling                # Required (tracking or sampling or location or userstream)

 keyword             note7

 output_format       nest                    # Optional (nest or flat or simple[default])

</source>

<match input.twitter.sampling>

 type gcloud_pubsub

 project 본인의 프로젝트명

 topic projects/본인의 프로젝트명/topics/twitter

 key 다운로드받은 구글 클라우드 억세스 토큰 JSON 파일

 flush_interval 10

 autocreate_topic false

</match>


Fluentd 설정이 끝났다.

Pub/Sub 큐 설정

다음으로는 fluentd 읽어드린 트위터 피드를 받아드를 Pub/Sub 큐를 생성한다.

큐 생성 방법에 대해서는 앞의 Pub/Sub 챕터를 참고하기 바란다. (http://bcho.tistory.com/1120)

큐 이름은 twitter라고 한다. 전체 큐 이름은 “projects/본인 프로젝트명/twitter” 가 된다.

데이타 플로우 프로젝트 생성

큐까지 데이타를 읽어드렸으면, 이 데이타를 처리할 데이타 플로우 파이프라인을 구현한다.

이클립스에서 데이타 플로우 파이프라인 프로젝트를 생성하자. 프로젝트 생성은 앞장의 “데이타 플로우 개발환경 설정" 부분을 참고하기 바란다. (http://bcho.tistory.com/1128)


프로젝트가 생성되었으면, 이 프로젝트에서 사용할 의존성 라이브러리들을 메이븐 (maven) 빌드 스크립트인 pom.xml에 추가해준다.

추가해야 하는 API는 JSON 파싱을 위한 javax.json-api와, javax.json 그리고 구글의 자연서 분석 API를 호출하기 위한 google-api-client와 google-api-service-language 모듈이다.


다음 코드 블럭을 <dependencies> 엘리먼트 아래 하부 엘리먼트로 추가해준다


   <dependency>

   <groupId>javax.json</groupId>

   <artifactId>javax.json-api</artifactId>

   <scope>provided</scope>

   <version>1.0</version>

</dependency>

<dependency>

   <groupId>org.glassfish</groupId>

   <artifactId>javax.json</artifactId>

   <version>1.0.4</version>

</dependency>

<!-- NL API dependency -->

<dependency>

     <groupId>com.google.apis</groupId>

     <artifactId>google-api-services-language</artifactId>

     <version>v1beta1-rev7-1.22.0</version>

   </dependency>

   <dependency>

     <groupId>com.google.api-client</groupId>

     <artifactId>google-api-client</artifactId>

     <version>1.22.0</version>

   </dependency>


데이타 플로우 코드 작성

전체 파이프라인 흐름

파이프라인 코드 작성에 앞서서 전체 파이프라인 흐름을 살펴보자

전체 흐름은 다음과 같다.


  1. Read From PubSub
    PubSub의 “twitter” 큐에서 JSON 형태의 트위터 메세지를 읽는다.

  2. Parse Twitter
    트위터 JSON 메세지를 파싱한 후, 전체 메세지에서 트윗 메세지를 저장하고 있는 “text” 필드와 언어셋을 정의하고 있는 “lang” 필드만 추출한다.
    자연어 분석 API가 아직 영어, 스페인어, 일본어만 지원하기 때문에, 이 예제에서는 영어로 트윗만 추출하도록 한다.

  3. NL Processing
    앞에서 추출한 트윗 메세지를 구글의 자연어 분석 API에 분석을 요청하여 명사와 형용사만 추출해낸다.

  4. 명사 처리 파이프라인
    다양한 처리 방식을 보여주기 위해서, 이 예제에서는 하나의 데이타 스트림을 분기 처리하여 두개의 데이타 파이프라인에서 처리하는 방식으로 구현하였다. 명사 처리 파이프라인은 다음과 같은 단계를 거친다.

    1. Noun Filter
      명사와 형용사 리스트로 들어온 데이타 중에서 명사만 필터링 한다.

    2. Window 적용
      고정 크기 윈도우 (Fixed Window) 30초를 적용하여, 30초 단위로 데이타를 분석하도록 한다.

    3. Count.PerElement
      명사 단어와, 각 단어별 발생횟 수를 30초 단위로 모아서 카운트 한다.

    4. Noun Formating
      카운트된 결과를 빅쿼리에 쓰도록, [윈도우 시작 시간,명사 단어, 발생횟수] 형태의 빅쿼리 ROW(행) 데이타 타입으로 포매팅 한다.

    5. Write Noun Count to BQ
      포매팅 된 데이타를 빅쿼리에 쓴다.

  5. 형용사 처리 파이프라인
    형용사를 처리하는 파이프라인도 내용은 명사를 처리한 파이프라인과 다르지 않고 동일하게 다음과 같은 순서를 따른다.

    1. Adj Filter

    2. Window 적용

    3. Count.PerElement

    4. Adj Formating

    5. Write Adj Count to BQ

빅쿼리 데이타 구조

빅쿼리에는 두개의 테이블에 데이타를 나눠서 저장하였다.

명사와 형용사 테이블로 각각의 테이블 명과 구조는 다음과 같다.


명사 테이블 : noun

필드명

데이타 타입

date

TIMESTAMP

noun

STRING

count

INTEGER


형용사 테이블 : adj

필드명

데이타 타입

date

TIMESTAMP

adj

STRING

count

INTEGER

자연어 분석 클래스 작성

전체 데이타 흐름과 저장 구조가 이해되었으면, 파이프라인 코드 작성에 앞서서 자연어 처리 API를 호출하는 로직을 만들어보자


우리가 사용할 API는 String으로 문자열을 주면 다음과 같이 NLAnalyzeVO 객체로 분석 결과를 리턴해주는 코드이다.


package com.terry.nl;


import java.util.ArrayList;

import java.util.List;


public class NLAnalyzeVO {

List<String> nouns = new ArrayList<String>();

List<String> adjs = new ArrayList<String>();

List<String> emoticons = new ArrayList<String>();

float sentimental;


public List<String> getNouns() {

return nouns;

}


public List<String> getAdjs() {

return adjs;

}


public List<String> getEmoticons() {

return emoticons;

}


public float getSentimental() {

return sentimental;

}


public void setSentimental(float sentimental) {

this.sentimental = sentimental;

}

public void addNouns(String n){

nouns.add(n);

}

public void addAdj(String a){

adjs.add(a);

}

public void addEmoticons(String e){

emoticons.add(e);

}

}

<NLAnalyzeVO.java>


분석 결과로는 List<String> 타입으로 명사들의 목록을 nouns 로, 형용사들의 목록을 adj로 리턴해준다. float형으로 sentimental 이라는 필드에는 입력된 문장의 감정도를 리턴하도록 되어 있다. 음수값일 때는 부정적, 양수값일 경우에는 긍정을 의미한다.

VO안에는 List<String> emoticons 라는 필드가 있는데, 이는 트위터 메세지 내의 이모티콘을 추출하여 저장하기 위한 필드인데, 이 예제에서는 사용하지 않으니 신경 쓰지 않아도 된다.


package com.terry.nl;


import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;

import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;

import com.google.api.client.http.HttpRequest;

import com.google.api.client.http.HttpRequestInitializer;

import com.google.api.client.json.JsonFactory;

import com.google.api.client.json.jackson2.JacksonFactory;

import com.google.api.services.language.v1beta1.CloudNaturalLanguageAPI;

import com.google.api.services.language.v1beta1.CloudNaturalLanguageAPI.Documents.AnnotateText;

import com.google.api.services.language.v1beta1.CloudNaturalLanguageAPIScopes;

import com.google.api.services.language.v1beta1.model.AnalyzeEntitiesRequest;

import com.google.api.services.language.v1beta1.model.AnalyzeEntitiesResponse;

import com.google.api.services.language.v1beta1.model.AnalyzeSentimentRequest;

import com.google.api.services.language.v1beta1.model.AnalyzeSentimentResponse;

import com.google.api.services.language.v1beta1.model.AnnotateTextRequest;

import com.google.api.services.language.v1beta1.model.AnnotateTextResponse;

import com.google.api.services.language.v1beta1.model.Document;

import com.google.api.services.language.v1beta1.model.Entity;

import com.google.api.services.language.v1beta1.model.Features;

import com.google.api.services.language.v1beta1.model.Sentiment;

import com.google.api.services.language.v1beta1.model.Token;


import java.io.IOException;

import java.io.PrintStream;

import java.security.GeneralSecurityException;

import java.util.List;

import java.util.Map;


/**

*

* Google Cloud NL API wrapper

*/



@SuppressWarnings("serial")

public class NLAnalyze {


public static NLAnalyze getInstance() throws IOException,GeneralSecurityException {


return new NLAnalyze(getLanguageService());

}


public NLAnalyzeVO analyze(String text) throws IOException, GeneralSecurityException{

Sentiment  s = analyzeSentiment(text);

List <Token> tokens = analyzeSyntax(text);

NLAnalyzeVO vo = new NLAnalyzeVO();


for(Token token:tokens){

String tag = token.getPartOfSpeech().getTag();

String word = token.getText().getContent();


if(tag.equals("NOUN")) vo.addNouns(word);

else if(tag.equals("ADJ")) vo.addAdj(word);

}


vo.setSentimental(s.getPolarity());


return vo;

}



/**

* Be sure to specify the name of your application. If the application name is {@code null} or

* blank, the application will log a warning. Suggested format is "MyCompany-ProductName/1.0".

*/

private static final String APPLICATION_NAME = "Google-LanguagAPISample/1.0";


/**

* Connects to the Natural Language API using Application Default Credentials.

*/

public static CloudNaturalLanguageAPI getLanguageService()

throws IOException, GeneralSecurityException {

GoogleCredential credential =

GoogleCredential.getApplicationDefault().createScoped(CloudNaturalLanguageAPIScopes.all());

JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();

return new CloudNaturalLanguageAPI.Builder(

GoogleNetHttpTransport.newTrustedTransport(),

jsonFactory, new HttpRequestInitializer() {

@Override

public void initialize(HttpRequest request) throws IOException {

credential.initialize(request);

}

})

.setApplicationName(APPLICATION_NAME)

.build();

}


private final CloudNaturalLanguageAPI languageApi;


/**

* Constructs a {@link Analyze} which connects to the Cloud Natural Language API.

*/

public NLAnalyze(CloudNaturalLanguageAPI languageApi) {

this.languageApi = languageApi;

}


public List<Token> analyzeSyntax(String text) throws IOException{

AnnotateTextRequest request =

new AnnotateTextRequest()

.setDocument(new Document().setContent(text).setType("PLAIN_TEXT"))

.setFeatures(new Features().setExtractSyntax(true))

.setEncodingType("UTF16");

AnnotateText analyze =

languageApi.documents().annotateText(request);


AnnotateTextResponse response = analyze.execute();


return response.getTokens();


}

/**

* Gets {@link Sentiment} from the string {@code text}.

*/

public Sentiment analyzeSentiment(String text) throws IOException {

AnalyzeSentimentRequest request =

new AnalyzeSentimentRequest()

.setDocument(new Document().setContent(text).setType("PLAIN_TEXT"));

CloudNaturalLanguageAPI.Documents.AnalyzeSentiment analyze =

languageApi.documents().analyzeSentiment(request);


AnalyzeSentimentResponse response = analyze.execute();

return response.getDocumentSentiment();

}


}


<NLAnalyze.java>


코드 상의 주요 부분을 살펴보자

public NLAnalyzeVO analyze(String text)

메서느가 주요 메서드로, 트윗 문자열을 text 인자로 넘겨주면 분석 결과를 NLAnalyzeVO로 리턴한다.

이 메서드 안에서는 두개의 메서드를 호출하는데, analyzeSentiment(text) 와, analyzeSyntax(text)

를 두개 호출한다.

analyzeSentiment(text) 메서드는 text 를 넣으면 float 타입으로 감정도인 Sentinetal 지수를 리턴한다.

analyzeSyntax(text)는 구문을 분석하여, 명사,형용사,접속사,조사 등과 단어간의 의존 관계등을 분석해서 리턴해주는데, Token 이라는 데이타 타입의 리스트 형태로 다음과 같이 리턴한다.

List <Token> tokens = analyzeSyntax(text);


여기서 단어의 형(명사,형용사)는 token에서 tag 라는 필드를 통해서 리턴되는데, 우리가 필요한것은 명사와 형용사만 필요하기 때문에, tag가 NOUN (명사)와 ADJ (형용사)로 된 단어만 추출해서 NLAnalyzeVO 객체에 넣어서 리턴한다. (태그의 종류는 https://cloud.google.com/natural-language/reference/rest/v1beta1/documents/annotateText#Tag ) 를 참고하기 바란다.


중요

이 코드를 이용해서 구글 클라우드의 자연어 분석 API를 호출할때 그러면 API 인증은 어떻게 할까? 보통 구글 클라우드 콘솔에서 다운 받는 서비스 어카운트 키 (Service Account Key) JSON 파일을 사용하는데, 구글 자연어 분석 API를 호출하기 위해서도 서비스 어카운트 키가 필요하다.

이 키를 콘솔에서 다운로드 받은 후에, GOOGLE_APPLICATION_CREDENTIALS 라는 환경 변수에 서비스 어카운트 키의 경로를 지정해주면 된다.


예) export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your-project-credentials.json


자연어 분석 클래스를 다 만들었으면 테스트 코드를 만들어서 테스트를 해보자.

다음은 JUnit 4.X를 이용한 간단한 테스트 코드 이다.


package com.terry.nl.test;


import static org.junit.Assert.*;


import java.io.IOException;

import java.security.GeneralSecurityException;

import java.util.List;


import org.junit.Test;


import com.terry.nl.NLAnalyze;

import com.terry.nl.NLAnalyzeVO;


public class NLAnalyzeTest {


@Test

public void test() {

try {

NLAnalyze instance = NLAnalyze.getInstance();

String text="Larry Page, Google's co-founder, once described the 'perfect search engine' as something that 'understands exactly what you mean and gives you back exactly what you want.'";

NLAnalyzeVO vo = instance.analyze(text);

List<String> nouns = vo.getNouns();

List<String> adjs = vo.getAdjs();

System.out.println("### NOUNS");

for(String noun:nouns){

System.out.println(noun);

}

System.out.println("### ADJS");

for(String adj:adjs){

System.out.println(adj);

}

} catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

fail("API call error");

} catch (GeneralSecurityException e) {

// TODO Auto-generated catch block

e.printStackTrace();

fail("Security exception");

}

}


}


"Larry Page, Google's co-founder, once described the 'perfect search engine' as something that 'understands exactly what you mean and gives you back exactly what you want.'" 문자열을 분석하여,  명사와 형용사를 추출하여 다음과 같이 결과를 출력해준다.

### NOUNS

Larry

Page

Google

co-founder

search

engine

something

### ADJS

perfect

파이프라인 코드 작성

이제 메인 파이프라인 개발을 위한 준비가 다 되었다. 이제 TwitterPipeline 이라는 이름으로 파이프라인을 구현해보자. 전체 코드는 다음과 같다.

package com.terry.dataflow;


import java.io.IOException;

import java.io.StringReader;

import java.security.GeneralSecurityException;

import java.util.ArrayList;

import java.util.List;


import javax.json.Json;

import javax.json.JsonObject;

import javax.json.JsonReader;


import org.joda.time.DateTime;

import org.joda.time.Duration;

import org.joda.time.Instant;

import org.joda.time.format.DateTimeFormat;

import org.joda.time.format.DateTimeFormatter;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;


import com.google.api.services.bigquery.model.TableFieldSchema;

import com.google.api.services.bigquery.model.TableRow;

import com.google.api.services.bigquery.model.TableSchema;

import com.google.cloud.dataflow.sdk.Pipeline;

import com.google.cloud.dataflow.sdk.io.BigQueryIO;

import com.google.cloud.dataflow.sdk.io.PubsubIO;

import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

import com.google.cloud.dataflow.sdk.transforms.Count;

import com.google.cloud.dataflow.sdk.transforms.Create;

import com.google.cloud.dataflow.sdk.transforms.DoFn;

import com.google.cloud.dataflow.sdk.transforms.ParDo;

import com.google.cloud.dataflow.sdk.transforms.ParDo.Bound;

import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;

import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;

import com.google.cloud.dataflow.sdk.transforms.windowing.Window;

import com.google.cloud.dataflow.sdk.values.KV;

import com.terry.nl.NLAnalyze;

import com.terry.nl.NLAnalyzeVO;


import com.google.cloud.dataflow.sdk.values.PCollection;


/**

* A starter example for writing Google Cloud Dataflow programs.

*

* <p>The example takes two strings, converts them to their upper-case

* representation and logs them.

*

* <p>To run this starter example locally using DirectPipelineRunner, just

* execute it without any additional parameters from your favorite development

* environment.

*

* <p>To run this starter example using managed resource in Google Cloud

* Platform, you should specify the following command-line options:

*   --project=<YOUR_PROJECT_ID>

*   --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>

*   --runner=BlockingDataflowPipelineRunner

*/

public class TwitterPipeline {

private static final Logger LOG = LoggerFactory.getLogger(TwitterPipeline.class);

private static final String NOWN_TABLE=

"useful-hour-138023:twitter.noun";

private static final String ADJ_TABLE=

"useful-hour-138023:twitter.adj";


// Read Twitter feed as a JSON format

// extract twitt feed string and pass into next pipeline

static class ParseTwitterFeedDoFn extends DoFn<String,String>{


private static final long serialVersionUID = 3644510088969272245L;


@Override

public void processElement(ProcessContext c){

String text = null;

String lang = null;

try {

JsonReader reader = Json.createReader(new StringReader(c.element()));

JsonObject json = reader.readObject();

text = (String) json.getString("text");

lang = (String) json.getString("lang");


if(lang.equals("en")){

c.output(text.toLowerCase());

}


} catch (Exception e) {

LOG.debug("No text element");

LOG.debug("original message is :" + c.element());

}  

}

}


// Parse Twitter string into

// - list of nouns

// - list of adj

// - list of emoticon


static class NLAnalyticsDoFn extends DoFn<String,KV<String,Iterable<String>>>{ /**

*

*/

private static final long serialVersionUID = 3013780586389810713L;


// return list of NOUN,ADJ,Emoticon

@Override

public void processElement(ProcessContext c) throws IOException, GeneralSecurityException{

String text = (String)c.element();


NLAnalyze nl = NLAnalyze.getInstance();

NLAnalyzeVO vo = nl.analyze(text);


List<String> nouns = vo.getNouns();

List<String> adjs = vo.getAdjs();


KV<String,Iterable<String>> kv_noun=  KV.of("NOUN", (Iterable<String>)nouns);

KV<String,Iterable<String>> kv_adj =  KV.of("ADJ", (Iterable<String>)adjs);


c.output(kv_noun);

c.output(kv_adj);

}


}



static class NounFilter extends DoFn<KV<String,Iterable<String>>,String>{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey();

if(!key.equals("NOUN")) return;

List<String> values = (List<String>) c.element().getValue();

for(String value:values){

// Filtering #

if(value.equals("#")) continue;

else if(value.startsWith("http")) continue;

c.output(value);

}

}

}


static class AddTimeStampNoun extends DoFn<KV<String,Long>,TableRow>

implements com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess

{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey(); // get Word

Long value = c.element().getValue();// get count of the word

IntervalWindow w = (IntervalWindow) c.window();

Instant s = w.start();

DateTime sTime = s.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTimeFormatter dtf = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

String str_stime = sTime.toString(dtf);


TableRow row =  new TableRow()

.set("date", str_stime)

.set("noun", key)

.set("count", value);


c.output(row);

}


}


static class AddTimeStampAdj extends DoFn<KV<String,Long>,TableRow>

implements com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess

{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey(); // get Word

Long value = c.element().getValue();// get count of the word

IntervalWindow w = (IntervalWindow) c.window();

Instant s = w.start();

DateTime sTime = s.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTimeFormatter dtf = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

String str_stime = sTime.toString(dtf);


TableRow row =  new TableRow()

.set("date", str_stime)

.set("adj", key)

.set("count", value);


c.output(row);

}


}

static class AdjFilter extends DoFn<KV<String,Iterable<String>>,String>{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey();

if(!key.equals("ADJ")) return;

List<String> values = (List<String>) c.element().getValue();

for(String value:values){

c.output(value);

}

}

}


static class Echo extends DoFn<KV<String,Iterable<String>>,Void>{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey();

List<String> values = (List<String>) c.element().getValue();

for(String value:values){

}

}


}

public static void main(String[] args) {

Pipeline p = Pipeline.create(

PipelineOptionsFactory.fromArgs(args).withValidation().create());


@SuppressWarnings("unchecked")

PCollection <KV<String,Iterable<String>>> nlprocessed

=  (PCollection<KV<String,Iterable<String>>>) p.apply(PubsubIO.Read.named("ReadFromPubSub").topic("projects/useful-hour-138023/topics/twitter"))

.apply(ParDo.named("Parse Twitter").of(new ParseTwitterFeedDoFn()))

.apply(ParDo.named("NL Processing").of(new NLAnalyticsDoFn()));



// Noun handling sub-pipeline

List<TableFieldSchema> fields = new ArrayList<>();

fields.add(new TableFieldSchema().setName("date").setType("TIMESTAMP"));

fields.add(new TableFieldSchema().setName("noun").setType("STRING"));

fields.add(new TableFieldSchema().setName("count").setType("INTEGER"));

TableSchema schema = new TableSchema().setFields(fields);


nlprocessed.apply(ParDo.named("NounFilter").of(new NounFilter()))

.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30))))

.apply(Count.<String>perElement())

.apply(ParDo.named("Noun Formating").of(new AddTimeStampNoun()) )

.apply(BigQueryIO.Write

.named("Write Noun Count to BQ")

.to( NOWN_TABLE)

.withSchema(schema)

.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)

.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));


// Adj handling sub-pipeline

fields = new ArrayList<>();

fields.add(new TableFieldSchema().setName("date").setType("TIMESTAMP"));

fields.add(new TableFieldSchema().setName("adj").setType("STRING"));

fields.add(new TableFieldSchema().setName("count").setType("INTEGER"));

schema = new TableSchema().setFields(fields);


nlprocessed.apply(ParDo.named("AdjFilter").of(new AdjFilter()))

.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30))))

.apply(Count.<String>perElement())

.apply(ParDo.named("Adj Formating").of(new AddTimeStampAdj()) )

.apply(BigQueryIO.Write

.named("Write Adj Count to BQ")

.to( ADJ_TABLE)

.withSchema(schema)

.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)

.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));





p.run();

}


}

<TwitterPipeline.java>

코드를 하나씩 분석해보자.

먼저 main함수 부분을 보자

PCollection <KV<String,Iterable<String>>> nlprocessed

=  (PCollection<KV<String,Iterable<String>>>) p.apply(PubsubIO.Read.named("ReadFromPubSub").topic("projects/useful-hour-138023/topics/twitter"))

.apply(ParDo.named("Parse Twitter").of(new ParseTwitterFeedDoFn()))

.apply(ParDo.named("NL Processing").of(new NLAnalyticsDoFn()));

<TwitterPipeline.java에서 main() 함수 일부 >


파이프 라인이 시작되면, PubSubIO를 이용하여 “projects/useful-hour-138023/topics/twitter” 이름의 큐에서 데이타를 읽는다. 읽은 데이타는 ParseTwitterFeedDoFn() 라는 함수에서 파싱이 된다.

ParseTwitterFeedDoFn() 은 다음과 같다.


static class ParseTwitterFeedDoFn extends DoFn<String,String>{

private static final long serialVersionUID = 3644510088969272245L;


@Override

public void processElement(ProcessContext c){

String text = null;

String lang = null;

try {

JsonReader reader = Json.createReader(new StringReader(c.element()));

JsonObject json = reader.readObject();

text = (String) json.getString("text");

lang = (String) json.getString("lang");


if(lang.equals("en")){

c.output(text.toLowerCase());

}


} catch (Exception e) {

LOG.debug("No text element");

LOG.debug("original message is :" + c.element());

}  

}

}

<TwitterPipeline.java 에서 ParseTwitterFeedDoFn 클래스 구현부>


PubSub에서 읽어드린 데이타는 문자열로 안에 JSON 데이타를 가지고 있다. 이 JSON 문자열을 파싱해서 “text”와 “lang” 엘리먼트만 추출한 후에, “lang”이 “en”(영어) 인 경우에만 다음 파이프라인으로 “text”에서 추출한 문자열을 보내고, 영어가 아닌 경우에는 데이타를 무시한다.


다음은 NLAnalyticsDoFn에서 트윗 문자열을 받아서 자연어 분석을 한다.


static class NLAnalyticsDoFn extends DoFn<String,KV<String,Iterable<String>>>{

// return list of NOUN,ADJ,Emoticon

@Override

public void processElement(ProcessContext c) throws IOException, GeneralSecurityException{

String text = (String)c.element();


NLAnalyze nl = NLAnalyze.getInstance();

NLAnalyzeVO vo = nl.analyze(text);


List<String> nouns = vo.getNouns();

List<String> adjs = vo.getAdjs();


KV<String,Iterable<String>> kv_noun=  KV.of("NOUN", (Iterable<String>)nouns);

KV<String,Iterable<String>> kv_adj =  KV.of("ADJ", (Iterable<String>)adjs);


c.output(kv_noun);

c.output(kv_adj);

}


}

<TwitterPipeline.java 에서 NLAnalyticsDoFn 클래스 구현부>


앞에서 작성한 자연어 분석 클래스인 NLAnalyze 클래스를 이용하여 text를 넘기고, 리턴 값으로 NLAnalyzeVO를 리턴 값으로 받은 후, 명사는 KV<String,Iterable<String>> 타입으로 다음과 같을 저장해서 c.output을 이용해서 다음 파이프라인으로 넘기고

“NOUN”

명사1,명사2,명사3,...


마찬가지 방법으로 형용사도 같은 데이타 형인 KV<String,Iterable<String>> 타입으로 저장하여 다음 파이프라인으로 넘긴다.


이 데이타를 각각 명사와 형용사 두개의 처리 파이프라인으로 전달하는데, 두개의 파이프라인으로 단일 데이타를 보내는 방법은 다음과 같다.


nlprocessed.apply(ParDo.named("NounFilter").of(new NounFilter()))

: (중략)


nlprocessed.apply(ParDo.named("AdjFilter").of(new AdjFilter()))

: (중략)

nlprocessed는 PCollection 타입으로, NLAnalyticsDoFn에 의해서 처리된 결과이다.

이 결과 값에 두 개의 각각 다른 트랜스폼 (NounFilter와 AdjFilter)를 적용하였다.

이렇게 하나의 PCollection 값에 두 개의 트랜스폼을 각각 적용하면 적용된 각각의 파이프라인은 다른 파이프라인으로 아래 그림 처럼 분기 처리가 된다.


자아 그러면, 명사 처리 파이프라인 흐름을 따라가 보자

nlprocessed.apply(ParDo.named("NounFilter").of(new NounFilter()))

.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30))))

.apply(Count.<String>perElement())

.apply(ParDo.named("Noun Formating").of(new AddTimeStampNoun()) )

.apply(BigQueryIO.Write

.named("Write Noun Count to BQ")

.to( NOWN_TABLE)

.withSchema(schema)

.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

< TwitterPipeline.java의 main() 함수 일부>


첫번째 NounFilter에서는 앞에 파이프라인에서 들어온 명사와 형용사중에서 명사만 필터링 해서 다음 파이프라인으로 전달한다.


static class NounFilter extends DoFn<KV<String,Iterable<String>>,String>{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey();

if(!key.equals("NOUN")) return;

List<String> values = (List<String>) c.element().getValue();

for(String value:values){

// Filtering #

if(value.equals("#")) continue;

else if(value.startsWith("http")) continue;

c.output(value);

}


}

}

<TwitterPipeline.java 에서 NounFilter 클래스 구현부>

명사 인지 형용사 인지는 앞에서 넘어오는 데이타 형이 KV<String, .. > 인데, 키 부분의 값이 “NOUN” 일 경우에 명사이기 때문에, 이 값이 아니면 무시한다. 명사인경우에도 종종 쓰레기 값이 들어오는데, 예를 들어 트위터 특성상 해쉬 태그등을 위해서 “#”이 사용되고, 링크를 위해서 “http…” 링크가 들어가기도 하는데 이는 명사가 아니기 때문에 이 내용은 모두 필터링해서 무시한다.


이렇게 정재된 데이타는 파이프라인의 다음 단계인 .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30)))) 를 통해서 30초 단위의 고정 윈도우가 적용되고, 다음  .apply(Count.<String>perElement()) 을 통해서 단어별로 그룹핑되서 카운트 되고 그 결과는 앞서 적용한 30초 윈도우 시간 단위로 다음 파이프 라인으로 전달된다.  전달되는 데이타의 모양은 대략 다음과 같다.

Key (String)

Value (Long)

airplane

100

boy

29

india

92


이렇게 전달된 데이타는 빅쿼리에 저장하기 위해서 빅쿼리의 ROW 데이타 타입은 TableRow로 변환한다.

.apply(ParDo.named("Noun Formating").of(new AddTimeStampNoun()) )

.apply(BigQueryIO.Write

.named("Write Noun Count to BQ")

.to( NOWN_TABLE)

.withSchema(schema)

.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)

.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));


<TwitterPipeline.java 에서 main() 함수중 >

AddTimeStampNoun()에서 이 작업을 수행하는데, 이 함수는 윈도우의 시간을 추출하여 data라는 필드에 추가해준다.  아래는 AddTimeStampNoun()  함수의 코드이다.


static class AddTimeStampNoun extends DoFn<KV<String,Long>,TableRow>

implements com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess

{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey(); // get Word

Long value = c.element().getValue();// get count of the word

IntervalWindow w = (IntervalWindow) c.window();

Instant s = w.start();

DateTime sTime = s.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTimeFormatter dtf = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

String str_stime = sTime.toString(dtf);


TableRow row =  new TableRow()

.set("date", str_stime)

.set("noun", key)

.set("count", value);


c.output(row);

}

}

<TwitterPipeline.java 에서 AddTimeStampNoun 클래스 구현부>


여기서 주의할점은 윈도우에 대한 데이타를 접근하기 위해서는 com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess 인터페이스를 implementation 해야 한다.  그후에, 현재 윈도우에 대한 정보는 ProcessContext c 변수에서 c.window() 함수를 이용하면 윈도우의 정보를 읽어올 수 있다. 이 코드에서는 윈도우 시작 시간을 IntervalWindow w의 w.start() 를 통해서 읽어왔고, 이를 빅쿼리의 TIMESTAMP  데이타 타입으로 넣기 위해서 “yyyy-MM-dd HH:mm:ss” 형태로 포매팅을 한후 TableRow라는 빅쿼리의 row형 데이타 타입으로 생성한 후, 다음 파이프라인으로 넘겼다.


다음 파이프라인은 BigQueryIO로 Write 명령을 이용해서 NOWN_TABLE 에 (String 값은 “noun”) 데이타를 쓰도록 하였고, 쓰기 모드는 붙여쓰기 WRITE_APPEND로 하고, 테이블은 없으면 생성하도록 CREATE_IF_NEEDED로 지정하였다. 이때 테이블의 스키마를 정의해줘야 하는데, 테이블 스키마는 withSchema(schema) 함수로 지정을 했는데, 스키마를 정의한  schema 변수는 다음과 같이 정의 되어 있다.


List<TableFieldSchema> fields = new ArrayList<>();

fields.add(new TableFieldSchema().setName("date").setType("TIMESTAMP"));

fields.add(new TableFieldSchema().setName("noun").setType("STRING"));

fields.add(new TableFieldSchema().setName("count").setType("INTEGER"));

TableSchema schema = new TableSchema().setFields(fields);

<TwitterPipeline.java 에서 main() 중의 “noun” 테이블 스키마 정의 부분>


같은 방식으로 형용사를 처리하는 파이프라인도 정의를 한다음 정의가 끝났으면

p.run();

을 이용하여 파이프라인이 실행되도록 한다.

실행하기

모든 코드 구현이 끝났다. 이제, 파이프라인을 기동해보자

이클립스에서 파이프라인을 구동하는데, Run Configuration 부분을 아래와 같이 설정한다.


Runner를  DataflowPipelineRunner를 선택한다. BlockingPipeRunner의 경우에는 파이프라인이 기동되는 동안 이클립스에 프로그램이 실행중인것으로 되서, 이클립스에서 파이프라인을 멈춰버리면 전체 파이프라인이 멈추기 때문에 적절하지 않다.

다음 Argument 탭에서 아래와 같이 Program Argument에  --streaming 옵션을 추가한다.


데이타 플로우는 배치 및 스트리밍 모드 두개가 있는데, 이 예제는 스트리밍 예제이기 때문에, --streaming을 명시적으로 지정한다.

구글 자연어 분석 API에 대한 인증을 위해서 서비스 어카운트 키 (JSON 파일의 경로)를 GOOGLE_APPLICATION_CREDENTIALS 환경 변수에 설정해야 하는데, Environment 탭에서 New를 누른 후, GOOGLE_APPLICATION_CREDENTIALS를 Name으로 하고 Value 부분에 서비스 어카운트 키 파일의 경로를 적어준다.



환경 설정이 끝났으면 아래 Run 버튼을 눌러서 파이프라인을 기동시킨다.

파이프라인을 기동 시키면 구글 클라우드로 소스를 배포하고 인스턴스를 구동하는데 까지 수분이 걸리기 때문에 잠시 기다린다.

기다리는 동안 배포 상태를 보기 위해서, 구글 클라우드 콘솔로 들어가면 아래와 같이 Status가 Running으로 바뀔때 까지 기다린다.



Running으로 바뀌고 나서도 1~2분 정도 준비가 필요하기 때문에 기다렸다가 해당 JOB을 확인해보면 다음과 같이 잡이 정상적으로 기동 되고 있음을 확인할 수 있다.



작업이 실행되었으면 이 파이프라인에 데이타를 넣어주기 위한 Fluentd 에이전트를 실행해보자. Fluentd를 설치한 VM에 들어가서,

%sudo /etc/init.d/td-agent restart

명령을 이용해서 fluentd 에이전트를 가동한다.


데이타가 들어오기 시작하면 다시 구글 클라우드 콘솔의 데이타 플로우 화면을 보면 (위의 그림)

상단에 LOGS 라는 버튼을 볼 수 있는데



이 버튼을 누르면 죄측 하단에 다음과 같이 Job Logs라는 윈도우가 나타난다.



여기서 오른쪽의 “WORKER LOGS” 라는 버튼을 누르면 이 파이프라인의 전체 로그를 볼 수 있는데, 에러가 없는지를 잘 확인 한다.



별도의 에러가 없다면 정상적으로 데이타가 수집된다고 할 수 있다.

그러면 데이타가 제대로 수집되는지를 확인해보자

빅쿼리 콘솔로 들어가서 select count(*) from [noun 테이블명] LIMIT 1000

을 수행해서 데이타가 제대로 들어오는지 확인해보자


위의 그림과 같이 f0_가 0 이상이면 데이타가 쌓이고 있다고 생각해도 된다.

데이타 시각화와 분석

데이타 스튜디오(Google datastudio) 를 이용한 데이타 분석

쌓여 있는 데이타를 실제로 분석해보자. 리포트를 이용해서 시각화를 할 예정인데, 여기서 사용한 리포팅 도구는 구글 데이타스튜디오 라는 리포트 도구이다. (http://datastudio.google.com) 으로 9월 현재는 미국 지역만을 대상으로 서비스가 되고 있고, 곧 한국에 서비스가 오픈될 예정이다.


우리가 만들려고 하는 리포트는 다음과 같은 모양을 갖는다

전체 기간동안 가장 많이 발생한 명사 10개와 그 발생 회수를 표로 출력해주고, 우측에는 전체기간이 아닌 일자별로 많이 발생한 명사 10개에 대한 발생 회수 및 그 변화 추이를 출력해준다.

다음 행에는 형용사에 대한 분석 결과를 출력해준다.



새로운 리포트 생성

데이타 스튜디오 메인 화면에 들어오면 작성한 리포트 목록들이 아래와 같이 출력된다.


여기서 + 버튼을 누르면 아래와 같이 새로운 리포트를 생성할 수 있다.


새로운 리포트 화면에 들어오면 우측 하단에 “CREATE NEW DATA SOURCE”라는 버튼이 나타나는데, 이를 통해서 빅쿼리 테이블을 불러올 수 있다. “CREATE NEW DATA SOURCE” 버튼을 눌러보자


데이타 소스 생성해서 빅쿼리 테이블을 불러와야하는데, 데이타 스튜디오는 아래 그림에서와 같이 빅쿼리 뿐만 아니라 구글의 MySQL 서비스인 CloudSQL에서 부터 일반 MySQL 까지 연결이 가능하기 때문에, 빅쿼리 뿐 아니라 일반 데이타 분석에서 분석된 데이타르르 MySQL을 통해서 리포트로 시각화할 수 있고,

Google Sheet에 있는 데이타를 불러와서 같이 표현할 수 있는 기능을 제공하는데, 이는 특히 비지니스나 영업쪽에서 작성한 Sheet의 데이타를 실시간으로 읽어다가 하나의 리포트에 표현할 수 있기 때문에 매우 유용하게 사용할 수 있다.

아울러 YouTube나 Google Analytics 그리고, Adwords 광고 플랫폼등 다양한 구글 플랫폼의 데이타를 읽어서 시각화할 수 있다.


연동 소스 중에서 빅쿼리를 선택한 다음 프로젝트와 데이타셋 그리고 연동하고자 하는 테이블을 선택한다. 여기서는 noun 테이블을 선택하였다. 그러면 다음과 같이 테이블 스키마가 나오고 ADD TO REPORT 버튼이 나온다.


ADD TO REPORT를 눌러서 리포트에 추가하자

다음 리포트 화면에서 다음과 같이 Table 버튼을 눌러서 테이블을 추가하자


테이블을 추가하면 우측에 테이블에 출력하고자 하는 데이타를 선택할 수 있다.


우측에 Data source는 아까 불러들인 “noun”테이블을 선택하고, Dimension은 noun을 선택하고, Metric은 count를 추가하면 명사(noun)별, 발생횟수 (count)를 출력해준다.

위의 표를 보면 note7 등의 단어가 나오는데, 당연히 note7에 대해 검색했기 때문에  note7 단어가 많이 나오겠지만 이는 분석에서 얻고자 하는 데이타가 아니기 때문에 이 note7 등의 불필요한 문자열은 필터링해서 없애버리도록 한다.

필터는 우측 하단에 “Fiter”라는 메뉴에서 추가가 가능한데


메뉴에서 “+Add a filter”를 선택한후


Exclude (제외한다) 라는 버튼을 선택한 후에, Dimension을 noun으로 Match type을 Equal to 로 Expression을 note7 으로 선택하면 noun 필드에서 값이 note7인 내용은 제외 하도록 하는 필터이다. 필터가 적용되면 note7 단어는 필터링되서 출력되지 않는다.


다음으로 꺽은선 그래프를 추가하기 위해서 화면에서 Time series 버튼으로 꺽은선 그래프를 추가한다.



꺽은선 그래프가 추가되면 그래프에 출력될 데이타를 Time series Properties에서 다음과 같이 설정한다.


Data source는 noun 테이블로 하고, Dimention을 Time Dimention은 date로 하고, 여러 필드를 같이 분석하기 위해서 Breakdown Dimension에 noun을 추가하면 하나의 명사가 아니라 주요 명사들을 출력해준다.

그리고 Metric은 count로 선택하면 각 명사별 카운트수를 일자별로 볼 수 있다.


같은 방법으로 형용사에 대한 그래프도 추가한다.


그래프가 완성된 후에, 데이타를 수집 및 분석해보니 꽤나 의미가 있는 분석 결과를 얻을 수 있었다.


다음글에서는 오픈소스 데이타 분석 도구인 제플린을 이용하여 상세 데이타 분석을 하는 방법에 대해서 알아보기로 한다.

이글에서 소개한 데이타 스튜디오는 아직 한국에서는 서비스가 제공되지 않기 때문에, 한국에서 사용하고자 하는 사람들에게는 다음글의 제플린 기반의 데이타 분석이 훨씬 더 유용하리라 생각된다.





파이어베이스 애널러틱스를 이용한 모바일 데이타 분석

#3 빅쿼리에 연동하여 모든 데이타를 분석하기


조대협 (http://bcho.tistory.com)


파이어베이스 애널러틱스의 대단한 기능중의 하나가, 모바일에서 올라온 모든 원본 로그를 빅쿼리에 저장하고, 이를 빅쿼리를 통해서 분석할 수 있는 기능이다. 대부분의 매니지드 서비스 형태의 모바일 애널리틱스 서비스는 서비스에서 제공하는 지표만, 서비스에서 제공하는 화면을 통해서만 볼 수 있기 때문에, 상세한 데이타 분석이 불가능하다. 파이어베이스의 경우에는 빅쿼리에 모든 원본 데이타를 저장함으로써 상세 분석을 가능하게 해준다.


아울러, 모바일 서비스 분석에 있어서, 상세 로그 분석을 위해서 로그 수집 및 분석 시스템을 별도로 만드는 경우가 많은데, 이 경우 모바일에 설치될 로그 수집 에이전트에서 부터 로그를 수집하는 API 서버, 이를 저장하기 위한 분산 큐(카프카 Kafka)와 같은 복잡한 백앤드 시스템을 설계 구현해야 하는데, 파이어베이스 애널러틱스의 로깅 기능을 이용하면 별도의 이런 인프라 구현이 없이도 손쉽게 로그를 수집 및 분석할 수 있다. (일종의 무임 승차라고나 할까?)


가격 정책

그렇다면 가장 고민이 되는 것이 가격 정책일 것이다. 파이어베이스 애널러틱스에서 빅쿼리에 데이타를 저장하려면 파이어베이스 플랜중 무료가 아닌 유료 플랜인 Blaze 플랜을 사용해야 한다.

그러나, 다행이도 Blaze 플랜은 “Pay as you go” 모델로 사용한 만큼 비용을 지불하는 모델인데, “Google Cloud Integration”은 별도의 비용이 부과 되지 않는다.



단지 빅쿼리에 대한 비용만 부담이 되는데, 빅쿼리의 경우 데이타 로딩은 무료이고, 저장 요금 역시 GB당 월 0.02$ (약 22원)이며, 90일동안 해당 데이타를 사용하지 않으면 이 요금은 50%로 자동 할인되서 GB당 월 0.01$(약 11원)만 과금된다. 이외에 쿼리당 비용이 과금되는데, 쿼리당 비용은 쿼리에서 스캔한 데이타 용량 만큼만 과금이 된다. TB를 쿼리 했을때 5$가 과금이되는데, 이역시 전체 테이블을 스캔을 하는것이 아니라, 쿼리에서 스캔하는 컬럼에 대해서만 과금이 되고, 전체 테이블이 아니라, 쿼리에서 스캔하는 날짜만 과금이 되기 때문에, 실제 과금 금액은 미미하다고 볼 수 있다. 실제로 실 서비스에서 모 앱의 하루 데이타를 수집한 경우 17만건의 이벤트가 수집되었는데 저장 용량은 전체 350 MB에 불과하다. 전체 컬럼을 스캔한다고 하더라도 (전체 컬럼을 스캔할 일은 없겠지만….) 쿼리 비용은 0.00175$에 불과하다.


파이어베이스 애널러틱스와 빅쿼리를 연동하여 데이타 수집하기

파이어베이스 애널러틱스에서 데이타를 빅쿼리로 수집하기 위해서는 앞에서 언급한바와 같이 먼저 파이어베이스 플랜을 Blaze로 업그레이드 해야 한다. 파이어베이스 콘솔 좌측 하단을 보면 아래와 같이 UPGRADE 버튼이 있다. 이 버튼을 눌러서 Blaze 플랜으로 업그레이드를 하자


다음으로 파이어베이스 애널러틱스 프로젝트를 빅쿼리와 연결을 해줘야 한다.

파이어베이스 콘솔 좌측 상단에서 설정 버튼을 누른 후에, Project settings 메뉴를 선택한다.


프로젝트 세팅 메뉴에 들어가서 상단 메뉴중에 ACCOUNT LINKING이라는 메뉴를 선택한다.


그러면 구글 플레이나 광고 플랫폼등과 연결할 수 있는 메뉴와 함께 아래 그림처럼 빅쿼리로 연결할 수 있는 메뉴와 “LINK TO BIGQUERY”라는 버튼이 화면에 출력된다.


이 버튼을 누르면 작업은 끝났다. 이제부터 파이어베이스의 모든 로그는 빅쿼리에 자동으로 수집되게 된다.

만약에 수집을 중단하고 싶다면 위의 같은 화면에서 LINK TO BIGQUERY라는 버튼이 MANAGE LINKING으로 바뀌어 있는데, 이 버튼을 누르면 아래와 같이 App Details가 나온다.



여기서 스위치 버튼으로 Send data to BigQuery를 끔 상태로 변경해주면 된다.

이제 부터 대략 한시간 내에, 데이타가 빅쿼리에 수집되기 시작할 것이다.  

수집 주기

그러면 파이어베이스 애널러틱스에서는 어떤 주기로 데이타를 수집하고 수집된 데이타는 언제 조회가 가능할까? 이를 이해하기 위해서는 앱 로그 수집에 관여되는 컴포넌트와 흐름을 먼저 이해할 필요가 있다.

로그 수집이 가능한 앱은 크게, 구글 플레이 스토어에서 배포되는 앱, 구글 플레이 스토어를 통하지 않고 배포되는 앱 그리고 iOS 앱 3가지로 나눌 수 있다.

이 앱들이 파이어베이스 서버로 로그를 보내는 방식은 앱마다 약간씩 차이가 있다.


  • 플레이스토어에서 다운 받은 앱 : 각 개별 앱이 이벤트 로그를 수집하여 저장하고 있다가 1시간 주기로, 모든 앱들의 로그를 모아서 파이어베이스 서버로 전송한다.

  • 플레이스토어에서 다운받지 않은 앱 : 플레이스토어에서 다운로드 받은 앱과 달리 다른 앱들과 로그를 모아서 함께 보내지 않고 한시간 단위로 로그를 모아서 개별로 파이어베이스에 전송한다.

  • iOS 앱 : 앱별로 한시간 단위로 로그를 모아서 파이어베이스 서버로 전송한다.


이렇게 앱에서 파이어베이스 서버로 전송된 데이타는 거의 실시간으로 구글 빅쿼리에 저장된다.

그러나 파이어베이스 애널러틱스의 대쉬 보다는 대략 최대 24시간 이후에 업데이트 된다. (24시간 단위로 분석 통계 작업을 하기 때문이다.)


이 전체 흐름을 도식화 해보면 다음과 같다.



수집된 데이타 구조

그러면 빅쿼리에 수집된 테이블은 어떤 구조를 가질까?

테이블 구조를 이해하기 전에 테이블 종류를 먼저 이해할 필요가 있다.

앱에서 수집한 로그는 안드로이드와 iOS 각각 다른 데이타셋에 저장되며, 테이블 명은

  • app_events_YYYYMMDD

가 된다. 2016년 8월30일에 수집한 로그는  app_events_20160830 이 된다.



Intraday 테이블

여기에 intraday 테이블이라는 개념이 존재하는데, 이 테이블은 app_events_intraday_YYYYMMDD 라는 이름으로 저장이 되는데, 이 테이블은 실시간 데이타 수집을 목적으로 하는 테이블로 오늘 데이타가 저장된다. 예를 들어 오늘이 2016년9월1일이라면, app_events테이블은 app_events_20160831 까지만 존재하고, 9월1일자 데이타는 app_events_intraday_20160901 이라는 테이블에 저장된다.

9월1일이 지나면 이 테이블은 다시 app_events_20160901 이라는 이름으로 변환된다.

intraday 테이블의 특성중의 하나는 몇몇 필드들은 값이 채워지지 않고 NULL로 반환된다. 모든 데이타를 수집하고 배치 연산을 통해서 계산이 끝나야 하는 필드들이 그러한데, LTV 값과 같은 필드가 여기에 해당한다.


여기서 주의할점 중의 하나가 intraday 테이블이 하나만 존재할것이라는 가정인데. 결론 부터 이야기 하면 최대 2개가 존재할 수 있다. 9월1일 시점에  app_events_intraday_20160901 테이블이 존재하다가 9월2일이 되면 app_events_intraday_20160902 테이블이 생성된다. app_events_intraday_20160901 를 app_events_20160901 테이블로 변환을 해야 하는데, 단순히 복사를 하는 것이 아니라, 배치 연산등을 수행하기 때문에 연산에 다소 시간이 걸린다. 그래서 연산을 수행하는 동안에는 app_events_intraday_20160901 테이블과 app_events_intraday_20160902이 동시에 존재하고, 9월1일 데이타에 대한 연산이 종료되면 app_events_intraday_20160901 은 app_events_20160901 로 변환 된다.  

테이블 스키마

빅쿼리에 저장된 데이타의 테이블 구조를 이해하기 위해서 빅쿼리의 데이타 저장 특성을 이해할 필요가 있는데, 빅쿼리는 테이블 데이타 구조를 가지면서도 JSON과 같이 컬럼안에 여러 컬럼이 들어가는 RECORD 타입이나, 하나의 컬럼안에 여러개의 데이타를 넣을 수 있는  REPEATED 필드라는 데이타 형을 지원한다.



<그림. 레코드 타입의 예>

레코드 타입은 위의 그림과 같이 Name이라는 하나의 컬럼 내에 Last_name과 First_name이라는 두개의 서브 컬럼을 가질 수 있는 구조이다.

아래는 REPEATED 필드(반복형 필드)의 데이타 예인데, Basket이라는 컬럼에 Books,Galaxy S7, Beer 라는 3개의 로우가 들어가 있다.


<그림. 반복형 필드 예>

이런 구조로 인하여, 빅쿼리는 JSON과 같이 트리 구조로 구조화된 데이타를 저장할 수 있고, 실제로 파이어베이스 애널러틱스에 의해 수집되어 저장되는 데이타도 JSON과 같은 데이타 구조형으로 저장이 된다.

많은 데이타 필드가 있지만, 큰 분류만 살펴보면 다음과 같은 구조를 갖는다.



하나의 레코드는 하나의 앱에서 올라온 로그를 나타낸다. 앱은 앞의 수집 주기에 따라서 한시간에 한번 로그를 올리기 때문에, 하나의 레코드(행/로우)는 매시간 그 앱에서 올라온 로그라고 보면 된다.


가장 상위 요소로 user_dim과, event_dim이라는 요소를 가지고 있다.

user_dim은 사용자나 디바이스에 대한 정보를 주로 저장하고 있고, event_dim은 앱에서 발생한 이벤트들을 리스트 형태로 저장하고 있다.

user_dim에서 주목할만한 것은 userid에 관련된 것인데, userid는 사용자 id 이지만, 파이어베이스가 자동으로 수집해주지는 않는다. 개발자가 앱의 파이어베이스 에이전트 코드에서 다음과 같이 setUserId 메서드를 이용해서 설정해줘야 빅쿼리에서 조회가 가능하다. (앱 서비스의 계정을 세팅해주면 된다.)

mFirebaseAnalytics.setUserId(Long.toString(user.id));

다음 주목할 필드는 user_dim에서 app_info.app_instance_id 라는 필드인데, 이 필드는 각 앱의 고유 ID를 나타낸다. 파이어베이스가 자동으로 부여하는 id로 설치된 앱의 id이다.

예를 들어 내가 갤럭시S7과 노트7를 가지고 같은 앱을 설치했다고 하더라도 각각 다른 디바이스에 설치되었기 때문에 각각의 앱 id는 다르다.


다음은 event_dim인데, event_dim은 이벤트들로 레코드들의 배열(리스트)로 구성이 되고 각각의 이벤트는 이벤트 이름과 그 이벤트에 값을 나타내는 name 과 params라는 필드로 구성이 되어 있다.  params는 레코드 타입으로 여러개의 인자를 가질 수 있고, params내의 인자는 또 각각 key와 value식으로 하여 인자의 이름과 값을 저장한다. values는 string_value,int_value,double_value 3가지 서브 필드를 가지고 있는데, 인자의 타입에 따라서 알맞은 필드에만 값이 채워진다. 예를 들어 인자의 타입이 문자열 “Cho” 이고, 인자의 이름이 “lastname”이면, params.key “lastname”이 되고, params.value.string_value=”Cho”가 되고 나머지 필드인 params.value.int_value와 params.value.float.value는 null이 된다.


   "event_dim": [

     {

       "name": "Screen",

       "params": [

         {

           "key": "firebase_event_origin",

           "value": {

             "string_value": "app",

             "int_value": null,

             "float_value": null,

             "double_value": null

           }

         },

         {

           "key": "Category",

           "value": {

             "string_value": "Main",

             "int_value": null,

             "float_value": null,

             "double_value": null

           }

         },

      ]

    },

     {

       "name": "Purchase",

       "params": [

         {

           "key": "amount",

           "value": {

             "string_value": null,

             "int_value": “5000”,

             "float_value": null,

             "double_value": null

           }

         }

         },

      ]

    },


위의 예제는 빅쿼리에 저장된 하나의 행을 쿼리하여 JSON형태로 리턴 받은 후, 그 중에서 event_dim 필드 내용 일부를 발췌한 것이다.

Screen과 Purchase라는 두개의 이벤트를 받았고,

Screen은 firebase_event_origin=”app”, Category=”main” 이라는 두개의 인자를 받았다.

Purchase는 amount=5000 이라는 정수형 인자 하나를 받았다.


전체 빅쿼리의 스키마는 다음과 같이 되어 있다.




파이어베이스 애널러틱스에서 빅쿼리로 저장된 테이블 스키마에 대한 상세는 https://support.google.com/firebase/answer/7029846?hl=en 를 참고하기 바란다.


구글 빅쿼리에 대한 자료 아래 링크를 참고하기 바란다.


  1. 2016.08.01 빅쿼리를 이용하여 두시간만에 트위터 실시간 데이타를 분석하는 대쉬보드 만들기

  2. 2016.07.31 빅데이타 수집을 위한 데이타 수집 솔루션 Embulk 소개

  3. 2016.06.18 빅쿼리-#3 데이타 구조와 접근(공유) (3)

  4. 2016.06.16 구글 빅데이타 플랫폼 빅쿼리 아키텍쳐 소개

  5. 2016.06.15 구글 빅데이타 플랫폼 빅쿼리(BIGQUERY)에 소개

  6. 빅쿼리로 데이타 로딩 하기 http://whitechoi.tistory.com/25


다음은 데이타랩을 통하여 데이타를 직접 분석해보도록 하겠다.


Fluentd + Bigquery + Jupyter를 이용한 초간단 BI 구축하기


조대협

얼마전에 빅데이타의 전문가로 유명한 김형준님이 "Presto + Zeppelin을 이용한 초간단 BI 구축 사례"라는 발표 자료를 보았다. http://www.slideshare.net/babokim/presto-zeppelin-bi 오픈 소스 기술들을 조합하여, 초간단하게 빅데이타 분석 플랫폼을 만든 사례 인데, 상당히 실용적이기도 하고, 좋은 조합인것 같아서, 마침 구글 빅쿼리에 대한 자료를 정리하던중 비슷한 시나리오로 BI 대쉬 보드를 만들어보았다.

Fluentd를 이용해서 실시간으로 데이타를 수집하고, 이를 빅쿼리에 저장한 다음에 iPython nodebook (aka Jupyter)로 대쉬보드를 만드는 예제이다. 일부 제품에 대한 지식이 없었음에도 불구하고 실제 설정은 대략 2시간 정도 걸렸다.


아래 이제 예제는 정상적으로 작동 하지 않습니다. 트위터에서 JSON 스키마를 변경했는데, 거기에 맞는 빅쿼리 JSON 스키마를 구하기가 어렵네요. (만들자니 귀찮고). 참고로만 사용하세요



Fluentd 설치

예제는 Google Cloud에서 Ubuntu Linux 14.x VM에서 Fluentd를 이용하여 Twitter에서 데이타를 읽은 후, 빅쿼리에 데이타를 로딩하는 시나리오이다.

VM 생성

Fluentd를 설치할 VM을 생성해보자. 구글 클라우드 콘솔에서 아래 그림과 같이 VM을 생성할때, “Identity and API access” 부분에  “Allow full access to all Cloud APIs”를 선택한다. 이를 선택해서 이 VM이 모든 구글 클라우드 API에 대한 접근 권한 (BigQuery 포함)을 가지도록 한다.


tdagent 설치

생성한 VM에 fluentd의 로그 수집 에이전트인 tdagent를 설치한다.

tdagent는 OS나, 또는 같은 OS라도 OS 버전별로 설치 방법이 다르기 때문에, 버전별 설치 방법은 http://www.fluentd.org를 참고하기 바란다.

여기서는 Ubuntu 14.x를 기준으로 진행을 하였다.

다음 명령어를 실행하면 tdagent가 설치된다.

% curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent2.sh | sh

설치한 후 에이전트를 실행해서 확인해보자. 다음 명령으로 agent를 실행한 후에,

% sudo /etc/init.d/td-agent restart

실행이 끝난 후에 다음 명령으로 설치를 확인한다.

% sudo /etc/init.d/td-agent status


참고 (tdagent 관련 명령어)

tdagent 기동 - $sudo /etc/init.d/td-agent start
tdagent 정지 - $sudo /etc/init.d/td-agent stop
tdagent 재시작 - $sudo /etc/init.d/td-agent restart
tdagent 상태확인 - $sudo /etc/init.d/td-agent status




트위터 Input 설정하기

tdagent 에이전트 설치가 끝났으면 fluentd를 이용해서 트위터 피드를 읽어드리도록 해보자.

트위터 API 키 받기

트위터 피드는 트위터에서 제공하는 OPEN API를 통해서 읽어드린다. 그래서 이 OPEN API에 접근하기 위해서는 OPEN API키가 필요하다.

OPEN API 키는 https://apps.twitter.com/ 에 접속하고 Create New App 메뉴를 이용하면 새로운 앱을 등록할 수 있고, 여기에 Fluentd 앱을 정의해서 정보를 넣어주고 Key and secrect을 생성해주면 다음과 같이 키가 생성된 것을 웹에서 확인할 수 있다.


여기서 필요한 키값은 Consumer Key, Consumer Secret, Access Token, Access Token Secret 4가지가 필요하다.

트위터 플러그인 설치하기

API 접근을 위한 API Key를 모두 얻었으면 이제 fluentd에서 트위터 피드를 읽기 위한 트위터 플러그인을 설치해보자.

트위터 API는 libssl에 대한 의존성이 있기 때문에, libssl를 먼저 설치한다.

%sudo apt-get install build-essential libssl-dev

다음 트위터 플러그인이 사용하는 eventmachine 플러그인과, 트위터 플러그인을 설치한다.

% sudo td-agent-gem install eventmachine

% sudo td-agent-gem install fluent-plugin-twitter

설정하기

플러그인 설치가 끝났으면 설정을 해보자. 설정 파일은 /etc/td-agent/td-agent.conf 에 있다.

이 파일을 다음과 같이 편집하자.


<source>

 type twitter

 consumer_key        {앞서 트위터 콘솔에서 받은 Consumer Key}

 consumer_secret     {앞서 트위터 콘솔에서 받은 Consumer  secret}

 oauth_token         {앞서 트위터 콘솔에서 받은 Access token}

 oauth_token_secret {앞서 트위터 콘솔에서 받은 Access token secret}

 tag                 input.twitter.sampling  # Required

 timeline            sampling                # Required (tracking or sampling or location or userstream)

 keyword             galaxy,game        # 검색어

 output_format       nest                   # Optional (nest or flat or simple[default])

</source>

<match input.twitter.sampling>

 type stdout

</match>


이 설정 파일은 keyword에 등록된 “galaxy”와 “game” 이라는 키워드를 찾아서, 읽어드린후 <match input.twitter.sampling> 에 의해서, 읽어드린 내용을 stdout으로 출력해주는 설정이다.

테스트

설정이 끝났으면 확인을 해보자

% sudo /etc/init.d/td-agent restart

명령어를 수행하여, td-agent를 리스타트 해서 새로운 config 파일이 반영되도록 하고

% tail -f /var/log/td-agent/td-agent.log          

를 통해서 stdout으로 올라오는 로그를 확인하자. 제대로 데이타가 수집되는 것을 확인했으면 다음 명령어를 이용해서, td-agent를 정지 시키자.

% sudo /etc/init.d/td-agent stop


빅쿼리로 저장하기

twitter로 부터 피드를 읽어드리는 플러그인이 정상적으로 작동함을 확인하였으면, 이번에는 읽어드린 데이타를 빅쿼리로 저장해보자.

빅쿼리 플러그인 설치 및 테이블 생성

빅쿼리로 데이타를 쓰기 위해서 빅쿼리 플러그인을 설치한다.

% sudo td-agent-gem install fluent-plugin-bigquery


다음으로 빅쿼리 프로젝트에서 트위터 데이타를 저장할 데이타셋과 테이블을 생성한다.

데이타 셋 이름은 편의상 “twitter”라고 하고, 테이블은 “ timeline”이라고 하고 생성을 하겠다.

테이블의 스키마는 트위터 피드에 대한 데이타 구조를 빅쿼리 스키마로 만들어놓은 스키마가 이미 https://gist.github.com/Salinger/ef39b81ad2c48516b596

에 있기 때문에, 이 스키마 파일을 읽어서 빅쿼리 콘솔에서 아래 그림과 같이 Schema 부분에 Copy & Paste를 해서 붙이면 테이블이 생성된다.


설정하기

테이블이 생성이 되었으면 fluentd 설정 파일을 수정하여 트위터 피드를 이 테이블에 저장하도록 설정한다.


<source>
 type twitter
   consumer_key        {앞서 트위터 콘솔에서 받은 Consumer Key}

 consumer_secret     {앞서 트위터 콘솔에서 받은 Consumer  secret}

 oauth_token         {앞서 트위터 콘솔에서 받은 Access token}

 oauth_token_secret {앞서 트위터 콘솔에서 받은 Access token secret}

 tag                 input.twitter.sampling  # Required
 timeline            sampling                # Required (tracking or sampling or location or userstream)
 keyword             hillary,clinton,donald,trump
 output_format       nest                    # Optional (nest or flat or simple[default])
</source>

<match input.twitter.sampling>
 type copy
<store>
  type bigquery
  buffer_type file
  buffer_path /var/log/td-agent/buffer/twi.*.buf
  method insert

  auth_method compute_engine
  project useful-hour-138023
  dataset twitter
  table timeline

  flush_interval 1
  buffer_chunk_limit 1000000
  buffer_queue_limit 5000
  flush_interval 1
  try_flush_interval 0.05
  num_threads 4
  queue_chunk_flush_interval 0.01

  time_format %s
  time_field log_time
  schema_path /home/terrycho/bq_tweet.json
  log_level error
</store>
</match>


기존 설정 파일에서 <match input.twitter.sampling> 부분을 빅쿼리로 변경하였다. <store>에서 type을 bigquery로 변경하였다.

중요한 필드들을 살펴보면

  • buffer_type, buffer_path : fluentd는 트위터에서 읽어드리는 데이타를 건건이 bigquery에 저장하는게 아니라 일정 단위로 모아서 bigquery에 저장한다. 그래서 buffer를 사용하는데, buffer를 파일을 이용하고, 이 파일의 위치를 지정해주었다.

  • auth_method, project,dataset,table : 데이타를 저장한 bigquery의 project,dataset,table 명을 정한다. 그리고 auth_method를 통해서 인증 방법을 설정하는데, 일반적으로는 service account에 대한 json 파일을 사용하는데, 여기서는 구글 클라우드내에 VM을 생성하였고, 앞에서 VM 생성시에 Bigquery에 대한 접근 권한을 이미 주었기 때문에, 인증 방식을 compute_engine으로 설정하면 된다.

  • flush_interval 은 어떤 주기로 버퍼된 데이타를 bigquery로 저장할것인지를 정한다. 여기서는 1초 단위로 저장하도록 하였다.

  • 그리고 중요한것중 하나가 schema_path 인데, 저장하고자 하는 bigquery 테이블의 스키마이다. 앞에서 테이블 생성에서 사용한 https://gist.github.com/Salinger/ef39b81ad2c48516b596 에서 다운 받았던 *.json으로 정의된 스키마 파일의 경로를 지정해주면 된다.

실행하기

모든 설정이 끝났으면

%sudo /etc/init.d/td-agent restart

명령을 이용해서 tdagent를 재기동하자.

그리고 빅쿼리 콘솔에서 “select count(*) from 테이블명” 명령을 사용하면 아래와 같이 카운트 수가 매번 올라가면서 데이타가 저장되는 것을 확인할 수 있다.


Datalab으로 대쉬보드 만들기

datalab은 오픈소스 iPython note의 구글 클라우드 버전이다. 자동으로 구글 클라우드 내의 앱앤진 내에 설치해주고, 구글 클라우드의 빅데이타 인프라 (빅쿼리등)과 손쉽게 연동되며, 구글 차트를 내장하고 있어서 그래프도 손쉽게 그려줄 수 있다.


데이타랩 준비하기

데이타랩을 사용하기 위해서는 https://datalab.cloud.google.com/ 에 접속하고, 로그인을 하면 다음과 같이 프로젝트를 선택하는 화면이 나온다.


만약에 아직 데이타랩을 설치 하지 않았으면 가운데 Deploy 버튼만 활성화가 된다. Deploy 버튼을 누르면 자동으로 데이타랩이 설치된다. 설치가 끝나면 Start 버튼이 활성화 된다. Start 버튼을 누르면 데이타 랩으로 들어갈 수 있다.

새로운 노트 만들기

다음은 데이타랩의 초기화면이다.


우리는 여기서, 새로운 노트를 만들어서 앞서 빅쿼리로 읽어드린 데이타를  lang(언어)별로 그룹핑을 해서 카운트하는 쿼리를 실행하고, 그 결과를 그래프로 만들것이다.

위의 초기화면에서 “+Notebook” 버튼을 눌려서 새로운 노트북을 만들어보자


노트화면이 로딩되었으면 상단의 메뉴를 보자


+Add code와, +Add Markdown 버튼을 볼 수 있는데,  Add Code는 파이썬이나 SQL과 같은 프로그래밍 언어를 정의하고 실행할 수 있는 공간이고, +Add Markdown은 일반적인 텍스트나 이미지를 통해서 간단한 글을 쓸 수 있는 공간을 만들어준다.

이렇게 코드써가면서 직접 실행해보고 그 결과를 확인하면서 그에 대한 내용을 설명하는 내용을 Markdown으로 작성하는 것과 같이 마치 노트에 계산을 해나가는 것처럼 써 나가기 때문에 이런 류의 프로그램을 노트북이라고 한다. (유사한 프로그램으로는 zeppelin 등이 있다 https://zeppelin.apache.org/)

쿼리 실행하기

그러면 Add code를 통해서 코드 섹션을 추가하고 SQL 문장을 추가해보자. 다음은 빅쿼리 트위터 테이블에서 lang 별로 그룹핑을 해서 카운트를 하는 SQL 문장이다.


이 문장을 실행하려면 노트북 상단의 “Run” 버튼을 누르면 된다.

다음과 같이 결과가 쿼리 바로 아래에 출력되는 것을 볼 수 있다.




그래프 그리기

다음으로 결과로 그래프를 그려보자

다음과 같이 두개의 코드 블럭을 추가하자


첫번째 코드 블럭에는 SQL 문장을 수행하는데 이때 --module twitter라고 정의를 해주면 결과가 twitter라는 모듈에 저장이 된다.

두번째 코드 블럭은 그래프를 그리기 위해서 chart 명령어를 이용하고 차트 타입은 pie로, 그래프의 x,y 축은 lang과, lang_count로 지정하고, 데이타 소스는  --date를 이용해서 앞의 쿼리 결과를 저장한 twitter로 지정한다.

다음으로 Run 버튼을 이용해서 쿼리를 수행해보면 다음과 같은 결과 화면을 얻을 수 있다.





지금까지 간략하게 Fluentd를 통해서 데이타를 수집하고 빅쿼리에 저장한 후, 데이타랩을 통해서 분석 및 리포팅을 하는 간단한 시나리오를 살펴보았다. fluentd나 데이타랩에 대한 사전적인 지식이 없었는데, 필자의 경우 이를 만드는데 대략 2시간의 시간이 소요되었다. 2시간의 시간으로 수 PB급의 빅데이타를 수집할 수 있고 분석할 수 있는 시스템을 구축할 수 있었다. 예전 같으면 하둡과 스팍 인스톨과 몇시간이 걸렸는데, 요즘 드는 생각은 빅데이타에 대한 접근 장벽이 많이 무너졌다고나 할까.

참고 자료


빅쿼리-#3 데이타 구조와 데이타 공유 권한관리


조대협 (http://bcho.tistory.com)


빅쿼리에 대한 개념 및 내부 구조에 대한 이해가 끝났으면, 빅쿼리의 데이타 구조와, 데이타에 대한 권한 관리에 대해서 알아보도록 한다.

데이타 구조

빅쿼리의 데이타 구조는 다음과 같은 논리 구조를 갖는다. 일반적인 RDBMS와 크게 다르지 않다.





데이타 구조

프로젝트 (Project)

먼저 프로젝트라는 개념을 가지고 있다. 하나의 프로젝트에는 여러개의 데이타셋이 들어갈 수 있다.

데이타셋 (Dataset)

데이타셋은 MySQL의 DB와 같은 개념으로, 여러개의 테이블을 가지고 있는 테이블의 집합이다. 이 단위로 다른 사용자와 데이타를 공유할 수 있다.

테이블 (Table)

데이타를 저장하고 있는 테이블이다.

잡 (Job)

쿼리나, 데이타 로딩, 삭제와 같이 데이타에 대해서 어떤 명령을 내렸을때, 그 명령을 잡(Job)이라고 하며, 각 Job들은 누가 언제 어떤 내용을 수행하였는지, 향후 감사를 목적으로 모두 로깅 된다.

데이타 타입

테이블에 저장될 수 있는 데이타 구조는 다음과 같다.

  • STRING : UTF-8인코딩. 최대 2MB

  • INTEGER : 64 bit

  • FLOAT :  Double precision

  • BOOLEAN

  • RECORD : Collection of one or more field

  • TIMESTAMP


특이한 것이 RECORD 라는 데이타 타입인데, 레코드는 JSON과 같이 여러개의 데이타를 가지는 데이타형을 이야기한다.


아래 그림은 웹UI에서 ID와 NAME이라는 두개의 컬럼을 가지고 있는 테이블을 생성하는 화면이다.

Name은 앞서 설명한 RECORD  타입으로 정의했고, Name 필드 안에는 Last_name과, First_name이라는 STRING형 필드를 갖는다.



이렇게 생성된 테이블의 구조는 다음과 같이 된다.


RECORD 형 데이타 타입 안에는 앞서 정의된 STRING,INTEGER 등의 데이타 타입으로 컬럼 정의가 가능하며, RECORD 형 데이타 타입이 또 그 안에 들어갈 수 있다.  (JSON 데이타형과 매우 유사하다고 보면 된다).


REPEATED FIELD

테이블내의 각 컬럼의 값들은 NULL (값이 없거나), 일반적인 테이블 처럼 1개의 값을 가질 수 도 있지만, 컬럼을 정의할때, REPEATABLE 이라고 정의하면 하나의 필드에도 여러개의 값을 가질 수 있다. (JSON의 배열 처럼)




위는 웹 UI에서, ID와 Basket이라는 두개의 STRING 필드를 가지고 있는 테이블을 정의하는 화면이고 그중, Basket을 REPEATED 필드로 정의하였다.

이렇게 정의된 테이블의 모양은 다음과 같다.



Terry.cho 데이타 처럼 Basket 하나의 컬럼에 여러개의 데이타를 가지고 있는 것을 볼 수 있다.

권한 관리 및 공유

빅쿼리는 여러 사람이 각자의 계정을 가지고 사용을 할 수 있으며, 각 사용자별로 특정 데이타셋에 대한 수정,조회,삭제 권한을 부여할 수 있다.

권한 적용 대상

권한을 적용하여 접근을 통제할 수 있는 대상은 프로젝트와, 데이타셋이다. (테이블 단위로는 권한 적용이 불가)


권한 부여 대상

  • 사용자
    개인 사용자에게 데이타에 대한 권한을 지정할 수 있다.

  • 구글 그룹스 기반의 사용자 집합
    다수의 사용자가 속해 있는 구글 그룹스 (https://groups.google.com/) 사용자들에게 권한을 지정할 수 있다.

  • 특정 역할(ROLE)을 가지고 있는 사용자
    사용자중에서 특정한 역할 (관리자, 부서등)을 가지고 있는 사용자들에게 권한을 지정할 수 있다.


권한 종류

데이타셋 권한

  • READER : 데이타셋에 대한 데이타 조회 가능

  • WRITER : 데이타셋 내의 테이블에 대한 생성, 데이타 조회 및 추가 가능

  • OWNER:  데이타셋 업데아트 및 삭제 가능

프로젝트 권한

  • Viewer : JOB 수행과 수행중인 JOB 모니터링 가능. 데이타셋에 대한  READER 권한

  • Editor : 데이타셋 생성 가능 + 데이타셋에 대한 WRITER 권한

  • Owner :  데이타셋에 대한 모든 JOB 수행 가능. 데이타셋 삭제 가능


데이타셋 권한

데이타 공유 하기

데이타셋 공유

빅쿼리 웹UI에서 다른 사람과 공유하고자 하는 데이타셋을 선택한 후에, “Share dataset” 이라는 메뉴를 선택하면 아래와 같이 데이타셋 공유 메뉴가 출력 된다.



이 메뉴에서 공유 적용 대상 (특정인, 이메일 그룹이나 특정 역할)을 선택하고 공유 권한 (View, Edit, Owner) 권한을 부여하면 공유를 부여 받은 대상이 이 데이타 셋에 접근할 수 있다.


데이타셋의 소유자가 데이타셋을 공유했다고, 공유 받는 쪽에서 자동으로 데이타가 보이는 것은 아니며, 데이타를 공유 받는 쪽에서, 프로젝트에서 Switch Project 메뉴를 선택한 후 Display Project메뉴를 선택하면


Add project 창이 나온다. 이때 앞에서 공유된 프로젝트 명을 입력한다. (공유된 프로젝트명을 알고 있어야 한다.)

그러면 공유된 데이타 셋이 속한 프로젝트와 공유되니 데이타셋이 좌측에 표시되고 접근이 가능하게 된다.




그림. github archive 프로젝트와 그아래 github,day 데이타셋이 공유되어 추가된 화면



프로젝트 공유

데이타셋 단위뿐 아니라 조금 더 큰 범주에서 프로젝트 자체를 공유할 수 있다.

프로젝트에 대한 권한 관리는 구글 클라우드 메뉴에서 IAM & Admin 메뉴로 들어가서


 IAM 메뉴에서 Add members를 선택하면, 아래 그림과 같이 사용자 이름을 입력할 수 있는 텍스트 박스가 뜨고, 우측에 Select Role 리스트 박스가 나온다. 여기서 Projects 메뉴를 선택하면 하부 메뉴로, Owner, Editor, Viewer 등의 권한을 부여할 수 있다.




빅쿼리의 데이타 구조, 데이타 타입 그리고 권한 접근 구조에 대해서 알아보았다.

다음에는 실제로 프로젝트와 데이타셋을 생성하고, 테이블을 생성한 후 데이타를 로딩해보도록 하겠다.