Archive»


파이어베이스 애널러틱스를 이용한 모바일 데이타 분석 #1-Hello Firebase

조대협 (http://bcho.tistory.com)


얼마전에 구글은 모바일 백앤드 플랫폼인 파이어베이스를 인수하고 이를 서비스로 공개하였다.

파이어 베이스는 모바일 백앤드의 종합 솔루션으로, 크래쉬 리포팅, 리모트 컨피그를 이용한 A/B 테스팅 플랫폼, 클라우드와 자동 동기화가 가능한 리얼타임 데이타 베이스, 사용자 인증 기능, 강력한 푸쉬 플랫폼 다양한 모바일 기기에 대해서 테스트를 해볼 수 있는 테스트랩 등, 모바일 앱 개발에 필요한 모든 서비스를 제공해주는 종합 패키지와 같은 플랫폼이라고 보면 된다. 안드로이드 뿐만 아니라 iOS까지 지원하여 모든 모바일 앱 개발에 공통적으로 사용할 수 있다.



그중에서 파이어베이스 애널러틱스 (Firebase analytics)는 모바일 부분은 모바일 앱에 대한 모든 이벤트를 수집 및 분석하여 자동으로 대쉬 보드를 통하여 분석을 가능하게 해준다.


이 글에서는 파이어베이스 전체 제품군중에서 파이어베이스 애널러틱스에 대해서 수회에 걸쳐서 설명을 하고자 한다.


파이어베이스 애널러틱스

이미 시장에는 모바일 앱에 대한 데이타 분석이 가능한 유료 또는 무료 제품이 많다.

대표적으로 야후의 flurry, 트위터 fabric, 구글 애널러틱스등이 대표적인 제품군인데, 그렇다면 파이어베이스가 애널러틱스가 가지고 있는 장단점은 무엇인가?


퍼널 분석 및 코호트 분석 지원

파이어베이스 애널러틱스는 데이타 분석 방법중에 퍼넬 분석과 코호트 분석을 지원한다.

퍼널 분석은 한글로 깔데기 분석이라고 하는데, 예를 들어 사용자가 가입한 후에, 쇼핑몰의 상품 정보를 보고  주문 및 결재를 하는 단계 까지 각 단계별로 사용자가 이탈하게 된다. 이 구조를 그려보면 깔데기 모양이 되는데,사용자 가입에서 부터 최종 목표인 주문 결재까지 이루도록 단계별로 이탈율을 분석하여 서비스를 개선하고, 이탈율을 줄이는데 사용할 수 있다.

코호트 분석은 데이타를 집단으로 나누어서 분석하는 방법으로 일일 사용자 데이타 (DAU:Daily Active User)그래프가 있을때, 일일 사용자가 연령별로 어떻게 분포가 되는지등을 나눠서 분석하여 데이타를 조금 더 세밀하게 분석할 수 있는 방법이다.


이러한 코호트 분석과 퍼넬 분석은 모바일 데이타 분석 플랫폼 중에서 일부만 지원하는데, 파이어베이스 애널러틱스는 퍼넬과 코호트 분석을 기본적으로 제공하고 있으며, 특히 코호트 분석으로 많이 사용되는 사용자 잔존율 (Retention 분석)의 경우 별다른 설정 없이도 기본으로 제공하고 있다.


<그림. 구글 파이어베이스의 사용자 잔존율 코호트 분석 차트>

출처 : https://support.google.com/firebase/answer/6317510?hl=en

무제한 앱 및 무제한 사용자 무료 지원

이러한 모바일 서비스 분석 서비스의 경우 사용자 수나 수집할 수 있는 이벤트 수나 사용할 수 있는 앱수에 제약이 있는데, 파이어베이스 애널러틱스의 경우에는 제약이 없다.

빅쿼리 연계 지원

가장 강력한 기능중의 하나이자, 이 글에서 주로 다루고자 하는 내용이 빅쿼리 연동 지원이다.

모바일 데이타 분석 서비스 플랫폼의 경우 대 부분 플랫폼 서비스의 형태를 띄기 때문에, 분석 플랫폼에서 제공해주는 일부 데이타만 볼 수 가 있고, 원본 데이타에 접근하는 것이 대부분 불가능 하다.

그래서 모바일 애플리케이션 서버에서 생성된 데이타나, 또는 광고 플랫폼등 외부 연동 플랫폼에서 온 데이타에 대한 연관 분석이 불가능하고, 원본 데이타를 통하여 여러가지 지표를 분석하는 것이 불가능하다.


파이어베이스 애널러틱스의 경우에는 구글의 데이타 분석 플랫폼이 빅쿼리 연동을 통하여 모든 데이타를 빅쿼리에 저장하여 간단하게 분석이 가능하다.

구글 빅쿼리에 대한 소개는 http://bcho.tistory.com/1116 를 참고하기 바란다.

구글의 빅쿼리는 아마존 S3나, 구글의 스토리지 서비스인 GCS 보다 저렴한 비용으로 데이타를 저장하면서도, 수천억 레코드에 대한 연산을 수십초만에 8~9000개의 CPU와 3~4000개의 디스크를 사용해서 끝낼만큼 어마어마한 성능을 제공하면서도, 사용료 매우 저렴하며 기존 SQL 문법을 사용하기 때문에, 매우 쉽게 접근이 가능하다.

모바일 데이타 분석을 쉽게 구현이 가능

보통 모바일 서비스에 대한 데이타 분석을 할때는 무료 서비스를 통해서 DAU나 세션과 같은 기본적인 정보 수집은 가능하지만, 추가적인 이벤트를 수집하여 저장 및 분석을 하거나 서버나 다른 시스템의 지표를 통합 분석 하는 것은 별도의 로그 수집 시스템을 모바일 앱과 서버에 만들어야 하였고, 이를 분석 및 저장하고 리포팅 하기 위해서 하둡이나 스파크와 같은 복잡한 빅데이타 기술을 사용하고 리포팅에도 많은 시간이 소요 되었다.


파이어베이스 애널러틱스를 이용하면, 손 쉽게, 추가 이벤트나 로그 정보를 기존의 로깅 프레임웍을 통하여 빅쿼리에 저장할 수 있고, 복잡한 하둡이나 스파크의 설치나 프로그래밍 없이 빅쿼리에서 간략하게 SQL만을 사용하여 분석을 하고 오픈소스 시각화 도구인 Jupyter 노트북이나 구글의 데이타스튜디오 (http://datastudio.google.com)을 통하여 시작화가 간단하기 때문에, 이제는 누구나 쉽게 빅데이타 로그를 수집하고 분석할 수 있게 된다.

실시간 데이타 분석은 지원하지 않음

파이어베이스 애널러틱스가 그러면 만능 도구이고 좋은 기능만 있는가? 그건 아니다. 파이어베이스 애널러틱스는 아직까지는 실시간 데이타 분석을 지원하고 있지 않다. 수집된 데이타는 보통 수시간이 지나야 대쉬 보드에 반영이 되기 때문에 현재 접속자나, 실시간 모니터링에는 적절하지 않다.

그래서 보완을 위해서 다른 모니터링 도구와 혼용해서 사용하는 게 좋다. 실시간 분석이 강한 서비스로는 트위터 fabric이나 Google analytics 등이 있다.

이러한 도구를 이용하여 데이타에 대한 실시간 분석을 하고, 정밀 지표에 대한 분석을 파이어베이스 애널러틱스를 사용 하는 것이 좋다.


파이어베이스 애널러틱스 적용해보기

백문이 불여일견이라고, 파이어베이스 애널러틱스를 직접 적용해보자.

https://firebase.google.com/ 사이트로 가서, 가입을 한 후에, “콘솔로 이동하기"를 통해서 파이어 베이스 콘솔로 들어가자.

프로젝트 생성하기

다음으로 파이어베이스 프로젝트를 생성한다. 상단 메뉴에서 “CREATE NEW PROJECT”를 선택하면 새로운 파이어 베이스 프로젝트를 생성할 수 있다. 만약에 기존에 사용하던 구글 클라우드 프로젝트등이 있으면 별도의 프로젝트를 생성하지 않고 “IMPORT GOOGLE PROJECT”를 이용하여 기존의 프로젝트를 불러와서 연결할 수 있다.



프로젝트가 생성되었으면 파이어베이스를 사용하고자 하는 앱을 등록해야 한다.

파이어베이스 화면에서 “ADD APP” 이라는 버튼을 누르면 앱을 추가할 수 있다.

아래는 앱을 추가하는 화면중 첫번째 화면으로 앱에 대한 기본 정보를 넣는 화면이다.

“Package name” 에, 파이어베이스와 연동하고자 하는 안드로이드 앱의 패키지 명을 넣는다.


ADD APP 버튼을 누르고 다음 단계로 넘어가면 google-services.json 이라는 파일이 자동으로 다운된다. 이 파일은 나중에 안드로이드 앱의 소스에 추가해야 하기 때문에 잘 보관한다.


Continue 버튼을 누르면 아래와 같이 다음 단계로 넘어간다. 다음 단계에서는 안드로이드 앱을 개발할때 파이어베이스를 연동하려면 어떻게 해야 하는지에 대한 가이드가 나오는데, 이 부분은 나중에 코딩 부분에서 설명할 예정이니 넘어가도록 하자.


자 이제 파이어베이스 콘솔에서, 프로젝트를 생성하고 앱을 추가하였다.

이제 연동을 할 안드로이드 애플리케이션을 만들어보자.

안드로이드 빌드 환경 설정

콘솔에서 앱이 추가되었으니, 이제 코드를 작성해보자, 아래 예제는 안드로이드 스튜디오 2.1.2 버전 (맥 OS 기준) 으로 작성되었다.


먼저 안드로이드 프로젝트를 생성하였다. 이때 반드시 안드로이드 프로젝트에서 앱 패키지 명은 앞에 파이어베이스 콘솔에서 지정한 com.terry.hellofirebase가 되어야 한다.

안드로이드 프로젝트에는 프로젝트 레벨의 build.gradle 파일과, 앱 레벨의 build.gradle 파일이 있는데



프로젝트 레벨의 build.gradle 파일에 classpath 'com.google.gms:google-services:3.0.0' 를 추가하여  다음과 같이 수정한다.


// Top-level build file where you can add configuration options common to all sub-projects/modules.


buildscript {

  repositories {

      jcenter()

  }

  dependencies {

      classpath 'com.android.tools.build:gradle:2.1.2'

      classpath 'com.google.gms:google-services:3.0.0'

      // NOTE: Do not place your application dependencies here; they belong

      // in the individual module build.gradle files

  }

}


allprojects {

  repositories {

      jcenter()

  }

}


task clean(type: Delete) {

  delete rootProject.buildDir

}



다음으로, 앱레벨의 build.gradle 파일도 dependencies 부분에    compile 'com.google.firebase:firebase-core:9.4.0' 를 추가하고, 파일 맨 아래 apply plugin: 'com.google.gms.google-services' 를 추가 하여 아래와 같이 수정한다.

apply plugin: 'com.android.application'


android {

  compileSdkVersion 24

  buildToolsVersion "24.0.2"


  defaultConfig {

      applicationId "com.terry.hellofirebase"

      minSdkVersion 16

      targetSdkVersion 24

      versionCode 1

      versionName "1.0"

  }

  buildTypes {

      release {

          minifyEnabled false

          proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'

      }

  }

}


dependencies {

  compile fileTree(dir: 'libs', include: ['*.jar'])

  testCompile 'junit:junit:4.12'

  compile 'com.android.support:appcompat-v7:24.2.0'

  compile 'com.google.firebase:firebase-core:9.4.0'

}

apply plugin: 'com.google.gms.google-services'



그리고 파이어베이스 콘솔에서 앱을 추가할때 다운된 google-services.json 파일을 app디렉토리에 복사한다.




이 예제의 경우에는 /Users/terrycho/AndroidStudioProjects/HelloFireBase에 프로젝트를 만들었기 때문에,  /Users/terrycho/AndroidStudioProjects/HelloFireBase/app 디렉토리에 복사하였다.


Gradle 파일 수정이 끝나고, google-services.json 파일을 복사하였으면 안드로이드 스튜디오는 gradle 파일이 변경이 되었음을 인지하고 sync를 하도록 아래 그림과 같이 “Sync now”라는 버튼이 상단에 표시된다.


“Sync now”를 눌러서 프로젝트를 동기화 한다.

예제 코드 만들기

이제 안드로이드 스튜디오의 프로젝트 환경 설정이 완료되었다. 이제, 예제 코드를 만들어 보자.

이 예제 코드는 단순하게, 텍스트 박스를 통해서 아이템 ID,이름, 그리고 종류를 입력 받아서, 파이어베이스 애널러틱스에 이벤트를 로깅하는 예제이다.

파이어베이스 애널러틱스 서버로 로그를 보낼 것이기 때문에, AndroidManifest 파일에 아래와 같이  수정하여 INTERNET과 ACCESS_NETWORK_STATE 권한을 추가한다.

<?xml version="1.0" encoding="utf-8"?>

<manifest xmlns:android="http://schemas.android.com/apk/res/android"

  package="com.terry.hellofirebase">

  <uses-permission android:name="android.permission.INTERNET" />

  <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />

 

  <application

      android:allowBackup="true"

      android:icon="@mipmap/ic_launcher"

      android:label="@string/app_name"

      android:supportsRtl="true"

      android:theme="@style/AppTheme">

      <activity android:name=".MainActivity">

          <intent-filter>

              <action android:name="android.intent.action.MAIN" />


              <category android:name="android.intent.category.LAUNCHER" />

          </intent-filter>

      </activity>

  </application>


</manifest>


다음으로 화면을 구성해야 하는데, 우리가 구성하려는 화면 레이아웃은 대략 다음과 같다.



각각의 EditText 컴포넌트는 tv_contentsId, tv_contentsName,tv_contentsCategory로 지정하였다.

위의 레이아웃을 정의한 activity_main.xml은 다음과 같다.


<?xml version="1.0" encoding="utf-8"?>

<RelativeLayout xmlns:android="http://schemas.android.com/apk/res/android"

  xmlns:tools="http://schemas.android.com/tools"

  android:layout_width="match_parent"

  android:layout_height="match_parent"

  android:paddingBottom="@dimen/activity_vertical_margin"

  android:paddingLeft="@dimen/activity_horizontal_margin"

  android:paddingRight="@dimen/activity_horizontal_margin"

  android:paddingTop="@dimen/activity_vertical_margin"

  tools:context="com.terry.hellofirebase.MainActivity">


  <LinearLayout

      android:orientation="vertical"

      android:layout_width="match_parent"

      android:layout_height="match_parent"

      android:layout_alignParentLeft="true"

      android:layout_alignParentStart="true">


      <TextView

          android:layout_width="wrap_content"

          android:layout_height="wrap_content"

          android:textAppearance="?android:attr/textAppearanceMedium"

          android:text="Contents ID"

          android:id="@+id/tv_contetnsId" />


      <EditText

          android:layout_width="match_parent"

          android:layout_height="wrap_content"

          android:id="@+id/txt_contentsId"

          android:layout_gravity="center_horizontal" />


      <TextView

          android:layout_width="wrap_content"

          android:layout_height="wrap_content"

          android:textAppearance="?android:attr/textAppearanceMedium"

          android:text="Contents Name"

          android:id="@+id/tv_contentsName" />


      <EditText

          android:layout_width="match_parent"

          android:layout_height="wrap_content"

          android:id="@+id/txt_contentsName" />


      <TextView

          android:layout_width="wrap_content"

          android:layout_height="wrap_content"

          android:textAppearance="?android:attr/textAppearanceMedium"

          android:text="Contents Category"

          android:id="@+id/tv_contentsCategory" />


      <EditText

          android:layout_width="match_parent"

          android:layout_height="wrap_content"

          android:id="@+id/txt_contentsCategory" />


      <Button

          android:layout_width="wrap_content"

          android:layout_height="wrap_content"

          android:text="Send Event"

          android:id="@+id/btn_sendEvent"

          android:layout_gravity="center_horizontal"

          android:onClick="onSendEvent" />

  </LinearLayout>

</RelativeLayout>


레이아웃 설계가 끝났으면, SEND EVENT 버튼을 눌렀을때, 이벤트를 파이어베이스 애널러틱스 서버로 보내는 코드를 만들어 보자.

MainActivity인 com.terry.hellofirebase.MainActivity 클래스의 코드는 다음과 같다.


package com.terry.hellofirebase;


import android.support.v7.app.AppCompatActivity;

import android.os.Bundle;

import android.view.View;

import android.widget.EditText;

import android.widget.Toast;


import com.google.firebase.analytics.FirebaseAnalytics;


public class MainActivity extends AppCompatActivity {


  // add firebase analytics object

  private FirebaseAnalytics mFirebaseAnalytics;


  @Override

  protected void onCreate(Bundle savedInstanceState) {

      super.onCreate(savedInstanceState);

      mFirebaseAnalytics = FirebaseAnalytics.getInstance(this);

      setContentView(R.layout.activity_main);

  }


  public void onSendEvent(View view){

      String contentsId;

      String contentsName;

      String contentsCategory;


      EditText txtContentsId = (EditText)findViewById(R.id.txt_contentsId);

      EditText txtContentsName = (EditText)findViewById(R.id.txt_contentsName);

      EditText txtContentsCategory = (EditText)findViewById(R.id.txt_contentsCategory);


      contentsId = txtContentsId.getText().toString();

      contentsName = txtContentsName.getText().toString();

      contentsCategory = txtContentsCategory.getText().toString();


      Bundle bundle = new Bundle();

      bundle.putString(FirebaseAnalytics.Param.ITEM_ID, contentsId);

      bundle.putString(FirebaseAnalytics.Param.ITEM_NAME, contentsName);

      bundle.putString(FirebaseAnalytics.Param.CONTENT_TYPE, contentsCategory);

      mFirebaseAnalytics.logEvent(FirebaseAnalytics.Event.SELECT_CONTENT, bundle);


      Toast.makeText(getApplicationContext(), "Sent event", Toast.LENGTH_LONG).show();

  }

}


MainActivity 클래스에 FirebaseAnalytics 객체를 mFirebaseAnalytics라는 이름으로 정의하고 onCreate메서드에서 FirebaseAnalytics.getInstance(this) 메서드를 이용하여 파이어베이스 애널러틱스 객체를 생성한다.


다음 onSendEvent라는 메서드를 구현한다. 이 메서드는 화면에서 “SEND EVENT”라는 버튼을 누르면 EditText 박스에서 입력된 값으로 SELECT_CONTENT라는 이벤트를 만들어서 파이어베이스 애널러틱스 서버로 보내는 기능을 한다.

컨텐츠 ID,NAME,CATEGORY를 EditText 박스에서 읽어온 후에, Bundle 이라는 객체를 만들어서 넣는다.

파이어베이스 애널러틱스 로그는 이벤트와 번들이라는 개념으로 구성이 된다.

이벤트는 로그인, 컨텐츠 보기, 물품 구매와 같은 이벤트이고, Bundle은 이벤트에 구체적인 인자를 묶어서 저장하는 객체이다. 위의 예제인 경우 SELECT_CONTENTS 라는 이벤트가 발생할때 컨텐츠 ID, 이름(Name), 종류(Category)를 인자로 하여, Bundle에 묶어서 전달하도록 하였다.

Bundle 클래스를 생성한후, bundle.putString(“인자명",”인자값") 형태로 Bundle 객체를 설정한 후에, mFirebaseAnalytics.logEvent(“이벤트명",”Bundle 객체") 메서드를 이용하여 SELECT_CONTENTS 이벤트에 앞서 작성한 Bundle을 통하여 인자를 전달하였다.


앱 개발이 모두 완료되었다. 이제 테스트를 해보자

실행하기

앱을 실행하고 아래와 같이 데이타를 넣어보자


컨텐츠 ID는 200, 컨텐츠 이름은 W, 그리고 컨텐츠 종류는 webtoon으로 입력하였다.

SEND EVENT 눌러서 이벤트를 보내서 파이어베이스 웹콘솔에 들어가서 Analytics 메뉴에 상단 메뉴인 “Events”를 선택하면 처음에는 아무런 값이 나오지 않는다.

앞에서 설명했듯이 파이어베이스 애널러틱스는 아직까지 실시간 분석을 지원하지 않기 때문에 수시간이 지난 후에야 그 값이 반영 된다.


본인의 경우 밤 12시에 테스트를 진행하고 아침 9시경에 확인을 하였더니 아래와 같은 결과를 얻을 수 있었다.



실제로 테스트 시에 select contents 이벤트를 3번을 보냈더니, Count가 3개로 나온다.

그러나 이벤트에 보낸 컨텐츠 ID, 이름 , 분류등은 나타나지 않는다. 기본 설정에서는 이벤트에 대한 디테일 정보를 얻기가 어렵다. 그래서 빅쿼리 연동이 필요한데 이는 후에 다시 다루도록 하겠다.


Dashboard 메뉴를 들어가면 다음과 같이 지역 분포나 단말명등 기본적인 정보를 얻을 수 있다.



이벤트와 이벤트 인자

앞서처럼 이벤트와 인자등을 정해줬음에도 불구하고 대쉬보드나 기타 화면에 수치들이 상세하지 않은 것을 인지할 수 있다. 정확한 데이타를 분석하려면 마찬가지로 정확한 데이타를 보내줘야 하는데, 화면 로그인이나 구매등과 같은 앱에서의 이벤트를 앱 코드내에 삽입해줘야 상세한 분석이 가능하다.

이벤트는 https://firebase.google.com/docs/reference/android/com/google/firebase/analytics/FirebaseAnalytics.Event 에 정의가 되어 있고, 각 이벤트별 인자에 대한 설명은 https://firebase.google.com/docs/reference/android/com/google/firebase/analytics/FirebaseAnalytics.Param 에 있는데, 이미 파이어베이스에서는 게임이나 미디어 컨텐츠, 쇼핑과 같은 주요 모바일 앱 시나리오에 대해서 이벤트와 인자들은 미리 정의해놓았다.

https://support.google.com/firebase/topic/6317484?hl=en&ref_topic=6386699

를 보면 모바일 앱의 종류에 따라서 어떠한 이벤트를 사용해야 하는지가 정의되어 있다.


또한 미리 정의되어 있는 이벤트 이외에도 사용자가 직접 이벤트를 정의해서 사용할 수 있다.  이러한 이벤트를 커스텀 이벤트라고 하는데 https://firebase.google.com/docs/analytics/android/events 를 참고하면 된다.


지금까지 간략하게 나마 파이어베이스 애널러틱스의 소개와 예제 코드를 통한 사용 방법을 알아보았다.

모바일 데이타 분석이나 빅데이타 분석에서 가장 중요한 것은 데이타를 모으는 것도 중요하지만, 모아진 데이타에 대한 지표 정의와 그 의미를 파악하는 것이 중요하다. 그래서 다음 글에서는 파이어베이스 애널러틱스에 정의된 이벤트의 종류와 그 의미 그리고, 대쉬 보드를 해석하는 방법에 대해서 설명하고, 그 후에 빅쿼리 연동을 통해서 상세 지표 분석을 하는 방법에 대해서 소개하고자 한다.


저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

세번째 책이 나왔습니다.

사는 이야기 | 2016.08.29 13:25 | Posted by 조대협

빠르게 훑어보는 구글 클라우드 플랫폼


오늘 세번째 책이 나왔습니다.



이번에 출간된 책은 구글 클라우드에 대해서 간략한 사용 방법을 소개한 "빠르게 훑어보는 구글 클라우드 플랫폼" 이라는 책입니다.

구글에 입사한지도 이제 3개월이 막 지났는데, 막상 사람들 이야기를 들어보니, 한글 자료가 없고, 기초적인 (SSH설정)에서 부터 막히는 분들이 많아서, 구글 한국 사용자 그룹분들과 함께 간략한 소개 서적을 만들었습니다.


한빛 미디어에서 보정 및 조판 작업을 도와주셨구요. (엔지니어 출신이신 이복연님이 꼼꼼하게 봐주신 덕분에 원고 품질이 많이 올라갔습니다.)


이책은 정보 공유 차원에서 무료 EBOOK 형태로 배포됩니다.

http://www.hanbit.co.kr/realtime/books/book_view.html?p_code=E5359426070


많이들 공유하시고, 구글 클라우드 관심 가져주세요.

다음권으로 모바일 빅데이타 분석 기술에 대한 책도 조만간 출간 예정입니다.


인세등 관련 수익금은 전액 사회 기부 됩니다. 






저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

데이타 플로우 프로그래밍 모델의 이해


조대협 (http://bcho.tistory.com)


앞의 글에서 스트리밍 프로세스의 개념과, 데이타 플로우의 스트리밍 처리 개념에 대해서 알아보았다. 그렇다면 실제로 이를 데이타 플로우를 이용해서 구현을 하기 위해서는 어떤 컴포넌트와 프로그래밍 모델을 사용하는지에 대해서 알아보자.


구글 데이타 플로우 프로그래밍 모델은 앞에서 설명한 바와 같이, 전체 데이타 파이프라인을 정의하는 Pipeline, 데이타를 저장하는 PCollections, 데이타를 외부 저장소에서 부터 읽거나 쓰는 Pipeline I/O, 그리고, 입력 데이타를 가공해서 출력해주는 Transforms , 총 4가지 컴포넌트로 구성이 되어 있다.


이번 글에서는 그 중에서 데이타를 가공하는  Transforms 컴포넌트들에 대해서 알아본다.

Transforms

Transforms는 데이타를 어떻게 가공하느냐에 따라서 다음 그림과 같이 세가지 형태로 분류된다.

Element-Wise

개별 데이타 엘리먼트를 단위로 연산을 수행하는 것을 Element-Wise Transform이라고 한다.

ParDo 함수가 이에 해당하며, 하나의 데이타를 입력 받아서, 연산을 한 후, 하나의 출력을 내보내는 연산이다.

ParDo 함수는 DoFn으로 정의된 연산 로직을 병렬로 여러개의 프로세스와 쓰레드에서 나눠서 처리를 해준다.


DoFn 클래스 는 다음과 같은 포맷으로 정의한다.


static class MyFunction extends DoFn<{입력 데이타 타입},{출력 데이타 타입}>{

     @Override

     public void processElement(ProcessContext c) {

       // do Something

     }

}


DoFn 클래스를 상속 받아서 클래스를 정의하고, 클래스 정의시 제네릭 타입 (Generic type)으로, 첫번째는 입력 데이타 타입을 두번째는 출력 데이타 타입을 정의한다.

그리고 processElement 함수를 오버라이드(Override)해서, 이 함수안에 연산 로직을 구현한다.

processElement는 ProcessContext를 인자로 받는데, 이 Context에는 입력 데이타를 포함하고 있다.

입력 데이타를 꺼내려면 c.element()라는 메서드를 이용하면, 입력 데이타 타입으로 정의된 입력 데이타를 꺼낼 수 있다.

데이타를 처리한 다음에 파이프라인상의 다음 컴포넌트로 데이타를 보내려면 c.output ({출력 데이타} ) 형태로 정의를 해주면 된다.


이렇게 정의된 DoFn함수는  ParDo를 이용하여 파이프라인 상에서 병렬로 실행될 수 있다. ParDo는, 파이프라인상에서 .apply 메서드를 이용하여 적용한다.


그러면 실제로 어떻게 적용이 되는지 다음 예제를 보자.

   p.apply(ParDo.named("toUpper").of(new DoFn<String, String>() {

     @Override

     public void processElement(ProcessContext c) {

       c.output(c.element().toUpperCase());

     }

   }))


String 인자를 입력으로 받아서, String 인자로 출력을 하는 DoFn 함수를 정의하였다.

processElement 부분에서, c.element()로 String 입력 값을 읽은 후, toUpperCase() 함수를 적용하여 대문자로 변환을 한 후, c.output을 이용하여 다음 파이프라인으로 출력 데이타를 넘겼다.


조금 더 디테일한 예제를 보자

p.apply(Create.of("key1,Hello", "key2,World","key1,hello","key3,boy","key4,hello","key2,girl"))

.apply(ParDo.named("Parse").of(new DoFn<String, KV<String,String>>() {

@Override

public void processElement(ProcessContext c) {

StringTokenizer st = new StringTokenizer(c.element(),",");

String key = st.nextToken();

String value = st.nextToken();


KV<String,String> outputValue =  KV.of(key,value);

c.output(outputValue);

}

}))

Create.of를 이용하여 “Key.Value” 문자열 형태로 데이타를 생성한 후, 이 문자열을 읽어서, DoFn<String,KV<String,String>> 클래스에서 이를 파싱하여, 문자열을 키밸류 데이타형인 KV 데이타 형으로 변환하여 리턴하는 예제이다. 아래 개념도를 보자


입력 값은 String 데이타 타입으로 “Key.Value”라는 형태의 문자열을 받는다.

DoFn에서 처리한 출력 값은 KV 형으로 KV 데이타 타입은 키와 값을 가지고 있는데, 키와 값의 타입도 제네릭 타입으로 키는 String, 값은 String 타입으로 정의하였다. 입력된 “Key.Value” 문자열은 “.” 전후로 분리가 되서, “.” 좌측은 키로, 우측은 값으로 해서, KV에 각각 들어간다.

processElement를 보면, c.element를 이용하여 String 문자열을 꺼낸 후, StringTokenizer를 이용하여 “.”을 분류 문자로 정의해서 파싱한다. 첫번째를 키로 저장하고, 두번째를 값으로 저장한다.

KV<String,String> outputValue =  KV.of(key,value)

를 이용하여, KV 객체를 생성한 후, c.output(outputValue); 을 이용하여 다음 파이프라인 컴포넌트로 값을 전달한다.


시스템내에서 수행되는 방법은 다음과 같이 된다. ParDo에 의해서 DoFn 클래스가 여러개의 워커에 의해서 분산 처리가된다.

어그리게이션(Aggregation)

어그리게이션 값을 특정 키 값을 이용하여 데이타를 그룹핑을 하는 개념이다.

이러한 어그리게이션 작업은 내부적으로 셔플링(Shuffling)이라는 개념을 이용해서 이루어지는데, 키별로 데이타를 모으거나 키별로 합산등의 계산을 하기 위해서는, 키별로 데이타를 모아서 특정 워커 프로세스로 보내야 한다.

ParDo를 이용하여 병렬 처리를 할 경우, 데이타가 키값에 상관 없이 여러 워커에 걸쳐서 분산되서 처리되기 때문에, 이를 재 정렬해야 하는데, 이 재 정렬 작업을 셔플링이라고 한다.


아래 그림을 보자, 파이프라인 상에서 첫번째 프로세스를 Worker 1과 Worker 2가 처리를 하였고, 결과는 Key1과  Key2를 키로 가지는 데이타라고 하자, 이를 어그리게이션 하면 아래 그림과 같이 Key1 데이타는 Worker 3로 모이고, Key 2 데이타는 Worker 4로 모인다. 이런 방식으로 셔플링이 이루어진다.


데이타 플로우의 어그리게이션 에는 특정 키별로 데이타를 묶어주는 Grouping과, 특정 키별로 데이타를 연산(합이나 평균을 내는)하는  Combine 두 가지가 있다.

Grouping

PCollection<KV<String, Integer>> wordsAndLines = ...;

에서 다음과 같이 String,Integer 페어로 KV 타입을 리턴한다고 하자.


 cat, 1
 dog, 5
 and, 1
 jump, 3
 tree, 2
 cat, 5
 dog, 2
 and, 2
 cat, 9
 and, 6
 ...


이를 다음과 같이 키에 따라서 밸류들을 그룹핑 하려면 GroupByKey를 사용해야 한다.


 cat, [1,5,9]
 dog, [5,2]
 and, [1,2,6]
 jump, [3]
 tree, [2]


PCollection<KV<String, Iterable<Integer>>> groupedWords = wordsAndLines.apply(

   GroupByKey.<String, Integer>create());


코드는 앞 단계에서 KV<String,Integer>로 들어온 wordLines 데이타를 GroupByKey를 이용하여 Key 단위로 그룹핑을 한후 이를 <String, Iterable<Integer>> 타입으로 리턴하는 Transformation이다.

<String, Iterable<Integer>>에서 앞의 String은 키가 되며, Iterable 다수의 값을 가지고 있는 밸류가 된다. 이 밸류는 Integer 타입으로 정의된다.

Combine

Grouping이 키 단위로 데이타를 묶어서 분류해주는 기능이라면, Combine은 키단위로 데이타를 묶어서 연산을 해주는 기능이다.

예를 들어 앞의 예제처럼, “cat”이라는 문자열 키로 된 데이타들이 [1,5,9] 가 있을때, 이에 대한 총합이나 평균등을 내는 것이 Combine 이다.


PCollection<Integer> pc = ...;

 PCollection<Integer> sum = pc.apply(

   Combine.globally(new Sum.SumIntegerFn()));


코드는 Integer로 들어오는 모든 값을 Combine에서 Sum 기능을 이용하여 모든 값을 더하는 코드이다.

전체 데이타에 대해서 적용하기 때문에, Combine.globally로 적용하였다.


아래와 같은 형태의 데이타가 있다고 가정하자. 키에 따라서 값이 그룹핑이 된 형태이다.

 cat, [1,5,9]
 dog, [5,2]
 and, [1,2,6]
 jump, [3]
 tree, [2]


PCollection<KV<String, Integer>> occurrences = ...;

 PCollection<KV<String, Iterable<Integer>>> grouped = pc.apply(GroupByKey.create());

 PCollection<KV<String, Integer>> firstOccurrences = pc.apply(

   Combine.groupedValues(new Min.MinIntegerFn()));


위의 데이타들이 PCollection<KV<String, Iterable<Integer>>> grouped

에 저장되었다고 할때, 각 키별로 최소값을 구하는 것을 Combine.groupedValue에서 Min을 호출하여 최소값을 구했다.


Transforms 컴포넌트의 기본적인 종류들을 알아보았다. 이외에도, 하나의 Transform 안에 여러개의 Transform을 집어 넣는 Composite Transform이나, 두개 이상의 데이타 스트림에서 데이타를 키에 따라 JOIN하는 기능들이 있는데, 이러한 고급 기능들은 뒤에 고급 프로그래밍 모델에서 설명하기로 한다.

PCollection

PCollection은 데이타 플로우 파이프라인 내에서 데이타를 저장하는 개념이다.

데이타는 이 PCollection 객체에 저장이되서, 파이프라인을 통해서 Transform으로 넘겨진다.

PCollection은 한번 생성이 되면, 그 데이타를 수정이 불가능하다. (데이타를 변경하거나 수정하기 위해서는 PCollection을 새로 생성해야 한다.)

Bounded & Unbounded PCollection

PCollection은 데이타의 종류에 따라 Bounded PCollection과 Unbounded PCollection 두가지로 나뉘어 진다.

Bounded PCollection

Bounded PCollection은 배치 처리 처럼, 데이타가 변화하지 않는 데이타로 파일이나, 업데이트가 더 이상 발생하지 않는 테이블등을 생각하면 된다.

TextIO,BigQueryIO,DataStoreIO등을 이용해서 데이타를 읽은 경우에는 Bounded PCollection으로 처리가 된다.

Bounded PCollection 데이타들은 배치 처리의 특성상 데이타를 한꺼번에 읽어서 처리한다.  

Unbounded PCollection

Unbounded PCollection은, 데이타가 계속 증가 하는 즉 흐르는 데이타를 표현한다. 트위터 타임 라인이나, 스마트 폰에서 서버로 올라오는 이벤트 로그등을 예로 들 수 있다.

이러한 Unbounded PCollection은 시간의 개념을 가지고 윈도우 처리등을 해야하기 때문에, PCollection 객체내에 타임 스탬프가 자동으로 붙는다.

UnBounded PCollection은 데이타를 BigQueryIO또는 Pub/Sub에서 읽을 때 생성된다.

특히 Unbounded PCollection에 Grouping이나, Combine등을 사용할 경우, 데이타가 파이프라인 상에서 언제 그룹핑된 데이타를 다음 Transform 컴포넌트로 넘겨야할지를 정의해야 하기 때문에, Window를 반드시 사용해야 한다.

데이타 타입

PCollection을 이용해서 정의할 수 있는 주요 데이타 타입은 다음과 같다.

일반 데이타 타입

PCollection<Integer> data

가장 기초적인 데이타 형으로, Integer,Float,String 등 자바의 일반 데이타 타입으로 정의되고 하나의 데이타 만을 저장한다.

KV 데이타 타입

PCollection< KV<String,Integer> key_value_data

키 밸류 데이타 타입으로, 키와 값으로 구성된다. 키와 값은 일반 데이타 타입과 같게, 자바의 일반 데이타 타입 사용이 가능하다.


PCollection<KV<String, Iterable<Integer>>>

키 밸류 데이타 타입중에 값에 여러개의 값을 넣을 수 있는 Iterable 타입이 있다.

앞의 Transform 예제에서 언급된것과 같이 키가 cat이고, 그 값이 2,6,7,9 와 같이 여러 값을 가지는 형이 이러한 타입에 해당한다.

커스텀 데이타 타입

단순한 데이타 타입 말고도, 복잡한 데이타 형을 처리하기 위해서  커스텀 데이타 타입을 지원한다.

커스텀 데이타 타입은 오픈 소스 Avro의 데이타 모델을 사용한다. http://avro.apache.org/


예를 들어 어떤 키에 대해서 카운트값,평균,총합 그리고 윈도우의 시작과 끝 시간을 저장하는 데이타 타입 Stat가 있다고 가정하자. 이 데이타 타입은 다음과 같이 정의된다.

자바에서 일반적인 Value Object 형태로 정의를 하면되고, 단 앞에 어노테이션으로 @DefaultCoder(AvroCoder.class) 와 같이 Avro 데이타 클래스임을 정의하면 된다.


@DefaultCoder(AvroCoder.class)

static class Stat{

Float sum;

Float avg;

Integer count;

Integer key;

Instant wStart; // windowStartTime

Instant wEnd; // windowEndTime

public Instant getwStart() {

return wStart;

}

public Instant getwEnd() {

return wEnd;

}


public Float getSum() {

return sum;

}

public Float getAvg() {

return avg;

}

public Integer getCount() {

return count;

}


public Integer getKey(){

return key;

}


public Stat(){}

public Stat(Integer k,Instant start,Instant end,Integer c,Float s,Float a){

this.key = k;

this.count = c;

this.sum = s;

this.avg = a;

this.wStart = start;

this.wEnd = end;

}


}



윈도우

스트리밍 데이타 처리를 할때 가장 중요한 개념이 윈도우이다.

특히나  Unbounded 데이타를 이용한 스트리밍 처리에서 Grouping이나 Combine 시에는 반드시 윈도우를 사용해야 한다.Grouping이나 Combine과 같은 aggregation을 하지 않으면, Unbounded 데이타라도 윈도우 처리가 필요없다.

또한 Bounded 데이타도, 데이타에 타임 스탬프를 추가하면 윈도우를 사용하여, 시간대별 데이타를 처리할 수 있다.

예를 들어 일일 배치를 돌리는 구매 로그가 있을때, 각 데이타의 구매 시간이 있으면, 이 구매시간을 타임 스탬프로 지정하여 배치라도 윈도우 단위로 연산을 할 수 있다.

고정 윈도우 적용

윈도우를 적용하는 방법은 정말 간단하다.

PCollection 객체에 apply 메소드를 이용하여 Window.into 메서드를 이용하여 적용하면된다.

예를 들어서 아래와 같이 PCollection<String> 형 데이타인 items 객체가 있을때, 여기에 윈도우를 적용하려면 다음과 같이 하면 된다.

 PCollection<String> items = ...;

 PCollection<String> fixed_windowed_items = items.apply(

   Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES)));

items.apply를 이용하여 윈도우를 적용하는데, 데이타 타입이 String이기 때문에, Window.<String>into 로 윈도우를 적용하였다.

윈도우 타입은 Fixed 윈도우이기 때문에, FixedWindows.of를 사용하였고, 윈도우 주기는 1분주기라서 1,TimeUnit.MINUTES를 적용하였다.

슬라이딩  윈도우 적용

슬라이딩 윈도우는 윈도우의 크기 (Duration)과 주기를 인자로 넘겨야 한다.

아래 코드는 5초 주기로 30분 크기의 윈도우를 생성하는 예제이다.

 PCollection<String> items = ...;

 PCollection<String> sliding_windowed_items = items.apply(    Window.<String>into(SlidingWindows.of(Duration.standardMinutes(30)).every(Duration.standardSeconds(5))));

윈도우가 5초마다 생성이되고, 각 윈도우는 30분 단위의 크기를 가지고 있게 된다.

세션 윈도우 적용

세션 윈도우는 HTTP 세션 처럼 특정 사용자가 일정 시간동안 데이타가 올라오지 않으면, 처음 데이타가 올라온 시간 부터 데이타가 올라오지 않은 시간 까지를 윈도우르 묶어주는 개념이다.

앞의 고정 윈도우나, 세션 윈도우와는 다르게 반드시 키별로 세션을 묶기 때문에 키가 필요하다.


아래 예제는 각 사용자 별로 세션당 점수를 계산해주는 예제이다.

userEvents

 .apply(Window.named("WindowIntoSessions")

       .<KV<String, Integer>>into(

             Sessions.withGapDuration(Duration.standardMinutes(Duration.standardMinutes(10))))

   .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))

 // For this use, we care only about the existence of the session, not any particular

 // information aggregated over it, so the following is an efficient way to do that.

 .apply(Combine.perKey(x -> 0))

 // Get the duration per session.

 .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))



다른 윈도우들과 마찬가지로 Window.into를 이용하여 윈도우를 적용하는데, 데이타 형을 잘 보면 <KV<String, Integer>> 형으로 정의된것을 확인할 수 있다.

Sessions.withGapDuration으로 세션 윈도우를 정의한다. 이때 얼마간의 시간 동안 데이타가 없으면 세션으로 짜를지를 지정해줘야 하는데, Duration.standardMinutes(10) 를 이용하여 10분간 해당 사용자에 대해서 데이타가 없으면 해당 사용자(키)에 대해서 세션 윈도우를 자르도록 하였다.

윈도우 시간 조회하기

윈도우를 사용하다보면, 이 윈도우의 시작과 종료 시간이 필요할때가 있다. 예를 들어 고정 크기 윈도우를 적용한다음에, 이를 데이타 베이스등에 저장할때, 이 윈도우가 언제시작해서 언제끝난 윈도우인지를 조회하여 윈도우 시작 시간과 함께 값을 저장하고자 하는 케이스이다. “1시에 몇건, 1시 10분에 몇건의 데이타가 저장되었다”와 같은 시나리오이다.


현재 윈도우에 대한 정보를 얻으려면 DoFn 클래스를 구현할때, com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess 인터페이스를 implement 해야, 윈도우에 대한 정보를 억세스 할 수 있다.


static class StaticsDoFn extends DoFn<KV<Integer,Iterable<Twit>>, KV<Integer,Stat>>

implements com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess

{

@Override

public void processElement(ProcessContext c) {

:


IntervalWindow w = (IntervalWindow) c.window();

Instant s = w.start();

Instant e = w.end();

DateTime sTime = s.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTime eTime = e.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTimeFormatter dtf = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss");

String str_stime = sTime.toString(dtf);

String str_etime = eTime.toString(dtf);

                                      :


윈도우에 대한 정보는 ProcessContext c에서, c.window()를 호출하면, IntervalWindow라는 클래스로 윈도우에 대한 정보를 보내주고, 윈도우의 시작 시간과 종료 시간은 IntervalWindow 클래스의 start()와 end() 메서드를 이용해서 조회할 수 있다. 이 조회된 윈도우의 시작과 종료 시간은 org.joda.time.Instant 타입으로 리턴되는데, 조금 더 친숙한 DateTime 포맷으로 변환을 하려면, Instant.toDate() 메서드를 사용하면 되고, 이때, TimeZone 을 지정해주면 로컬 타임으로 변환을 하여, 윈도우의 시작과 종료시간을 조회할 수 있다.

타임 스탬프

윈도우는 시간을 기준으로 처리를 하기 때문에, 이 시간을 정의하는 타임스탬프를 어떻게 다루는지를 이해할 필요가 있다.

타임 스탬프 생성

Pub/Sub을 이용하여 unbounded 데이타를 읽을 경우 타임스탬프가 Pub/Sub에 데이타가 들어간 시간을 Event time으로 하여, PCollection에 추가된다.

타임 스탬프 지정하기

Pub/Sub에서와 같이 자동으로 타임 스템프를 부여하는게 아니라, 모바일 디바이스에서 발생한 이벤트 타임이나, 애플리케이션 적으로 PCollection에 직접 타임스탬프를 부여할 수 있다.

PCollection에 타임 스탬프를 부여 하는 방법은 간단하다.

DoFn내에서,  ProcessContext를 다음 파이프라인 컴포넌트로 보낼때,  c.output(데이타) 대신, c.output(데이타, 타임 스탬프)를 사용하면 된다. 타임 스탬프의 데이타 타입은  org.joda.time.Instant 를 사용한다.

예를 들어서

c.outputWithTimestamp(c.element(), logTimeStamp);

와 같은 방법으로 사용을 한다.


모바일 서비스 분석등, 실제 시간에 근접한 분석을 하려면, 로그를 수집할때, 이벤트 발생 시간을 같이 REST API등을 통해서 수집하고, outputWithTimestamp를 이용하여, 수집된 이벤트 발생시간을  PCollection에 추가하는 방식으로 타임 스탬프를 반영 하는 것이 좋다.







저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License


데이타 플로우 개발환경 설정하기


조대협 (http://bcho.tistory.com)


데이타 플로우에 대한 이해가 끝났으면 이제 직접 코딩을 해보자. 데이타 플로우에 대한 개념등은 http://bcho.tistory.com/search/dataflow 를 참고하기 바란다.

데이타 플로우에서 지원하는 프로그래밍 언어는 자바와 파이썬이다. 파이썬은 아직 알파버전으로, 이 글에서는 자바를 이용해서 설명한다.


자바를 이용한 개발환경 설정은 이클립스 개발환경과 maven을 이용한 개발 환경 두가지가 있는데, 여기서는 조금 더 손 쉬운 이클립스 환경을 기준으로 설명한다.

메이븐 기반의 개발 환경 설정은 https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven 를 참고하기 바란다.


사전준비

클라우드 계정 생성 및 빌링 설정

구글 클라우드 계정 생성 및 빌링 설정 방법은 앞서 다른글에서도 많이 설명하였기 때문에 다시 설명하지 않는다. 자세한 내용은 http://bcho.tistory.com/1107 를 참고하기 바란다.

API 사용 설정하기

다음 데이타플로우와 기타 같이 사용할 제품들의 API를 사용하기 위해서 이를 설정해줘야 한다.

구글 클라우드 콘솔에서 API Manager를 선택한후 대쉬 보드에서 아래 서비스들을 선택하여 API를 Enable 해준다. Cloud Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, and Cloud Datastore APIs.





구글 Cloud SDK 설정

구글 데이타 플로우를 프로그래밍 하기 위해서, 데이타 플로우 API를 호출하기 위한 SDK와 조작을 위한 CLI (Command Line Interface)가 필요한데, 이는 구글 Cloud SDK를 설치하면 같이 설치가 된다.

클라우드 SDK 설치는 https://cloud.google.com/sdk/docs/ 를 참고하면 된다.

gcloud 인증하기

구글 Cloud SDK 설치가 끝났으면, gcloud 명령어를 사용하기 위해서 gcloud 명령어를 초기화 한다.

초기화는 어떤 구글 클라우드 프로젝트를 사용할것인지, 그리고 사용자 아이디등으로 인증을 하는 절차를 거친다.

프롬프트 상에서

%gcloud init

명령을 실행하여, 수행한다.

이클립스 환경 설정

이제 구글 클라우드 프로젝트 설정과, 이를 호출하기 위한 SDK 환경 설치가 끝났다. 이제 이클립스 기반의 개발 환경을 설정해보자.

이클립스 설치하기

이클립스는 4.4 버전 이상을 설치하고, JDK는 1.7 이상을 설정한다.

플러그인 설치하기

다음 구글 데이타 플로우 개발환경을 위한 이클립스 플러그인을 설치한다.

이클립스에서 Help > Install New Software를 선택한 다음에, Work with 텍스트 박스에  https://dl.google.com/dataflow/eclipse/  을 입력한다.


다음으로 Google Cloud Dataflow를 선택하여 설치를 진행한다.

설치가 끝난 후 확인은 이클립스에서 New > Project를 하면, 위자드를 선택하는 화면에서 아래와 같이 Google Cloud Platform이라는 폴더와 함께 그 안에 “Cloud Dataflow Java Project”를 선택할 수 있는 화면이 나온것을 볼 수 있다.



헬로우 데이타 플로우

개발 환경 설정이 끝났으니, 이제 간단한 데이타 플로우 프로그램을 하나 만들어보자.

이 프로그램은 단어들을 읽어드린 후에, 단어들의 발생 횟수를 카운트 해 주는 파이프라인이다.



단어들을 읽어드린 후 toUpper라는 트랜스폼에서, 각 단어들을 대문자로 변환한 후, Count라는 트랜스폼에서 단어별로 발생횔 수를 카운트 한후에, 이를 Key Value (단어:발생횟수)로 리턴한 후, Print라는 트랜스폼에서 화면으로 결과를 출력해주는 예제이다.


프로젝트 생성

예제 파이프라인을 만들기 위해서, 이클립스에서 프로젝트를 생성해보자. New > Project를 선택한 후 에, 아래 그림과 같이 Google Cloud Platform 폴더에서 Cloud Dataflow Java Project를 선택한다



다음 프로젝트에 대해서  Group ID, Artifact ID 그리고 패키지 명등을 입력한다.



다음 메뉴로 넘어가면 구글 데이타 플로우를 실행하기 위한 디테일한 정보를 넣어야 하는데,




프로젝트 명과, “Cloud Storage Staging Location”이라는 정보를 입력해야 한다. Cloud Storage Staging Location은 Google Cloud Storage 의 버킷명으로, 데이타 플로우 애플리케이션 코드가 로딩 되는 장소이다.

데이타플로우 애플리케이션을 구글 클라우드에서 실행하게 되면, 애플리케이션 코드와 애플리케이션을 실행하기 위한 라이브러리들이 각각의 워커 노드로 배포 되는데, 배포를 위해서 먼저 클라이언트에서 부터, 이러한 실행 코드를 Google Cloud Storage에 올려놓게 된다. 앞에서 정의하는 “Cloud Storage Staging Location”은, 이 클라우드 스토리지 버킷에 대한 경로 정의이다.

클라우드 스토리지 버킷은 아래와 같인 Google Cloud Storage 메뉴에서 아래와 같이 생성할 수 있다.


코드 제작

그러면 코드를 작성해 보자.



package com.terry.df;


import com.google.cloud.dataflow.sdk.Pipeline;

import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

import com.google.cloud.dataflow.sdk.transforms.Count;

import com.google.cloud.dataflow.sdk.transforms.Create;

import com.google.cloud.dataflow.sdk.transforms.DoFn;

import com.google.cloud.dataflow.sdk.transforms.ParDo;

import com.google.cloud.dataflow.sdk.transforms.DoFn.ProcessContext;

import com.google.cloud.dataflow.sdk.values.KV;


import org.slf4j.Logger;

import org.slf4j.LoggerFactory;


public class StarterPipeline {

 private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);


 public static void main(String[] args) {

   Pipeline p = Pipeline.create(

       PipelineOptionsFactory.fromArgs(args).withValidation().create());


   p.apply(Create.of("Hello", "World","hello","boy","hello","girl"))

   .apply(ParDo.named("toUpper").of(new DoFn<String, String>() {

     @Override

     public void processElement(ProcessContext c) {

       c.output(c.element().toUpperCase());

     }

   }))

   .apply(Count.<String>perElement())

   .apply(ParDo.named("Print").of(new DoFn<KV<String,Long>, Void>(){

@Override

public void processElement(ProcessContext c) throws Exception {

LOG.info(c.element().getKey() + " count:"+c.element().getValue());

}

   }));


   p.run();

 }

}



(참고 : 위의 소스코드는 https://github.com/bwcho75/googledataflow/tree/master/HelloDataFlow 에 있다.)


처음 p.apply(Create.of…)에서, 데이타를 생성하였다.

다음으로 .apply(ParDo.named("toUpper").of(new DoFn<String, String>() 에서 소문자를 대문자로 다 치완하는 데, ParDo는 이 작업을 여러 노드에서 병렬로 실행하겠다는 선언이고, named는 이 트랜스폼의 이름을 “toUpper”로 정의하겠다는 정의이다. (나중에 디버깅에 유용한다.) 다음으로, 트랜스폼 함수는 DoFn으로 정의했는데, <String,String>으로 정의되어 앞의 인자가 Input 그리고 뒤의 인자가 Output의 데이타 형으로 String 인자를 받아서, String 인자로 리턴하겠다는 것이다.


.apply(Count.<String>perElement()) 은 데이타플로우에서 미리 정의된, 트랜스폼으로,  <String>으로 된 데이타를 받아서 엘리먼트당 카운트를 해서 <String,Long> 형으로 리턴을 해준다. 즉 String형의 단어마다 카운트를 한 결과를 Long형으로 넣어서 이를 키밸류(KV)형식으로 묶어서 리턴해준다.

.apply(ParDo.named("Print").of(new DoFn<KV<String,Long>, Void>() 에서는 앞에서 전달해준  String,Long형이 키밸류형으로 정의된 KV<String,Long>형의 데이타를 받아서, 출력해주고, 마지막 트랜스폼이기 때문에 더 이상 뒤로 데이타를 넘기지 않을 것이기 때문에, Output의 인지 타입을 Void로 선언하였다.

실행

코드를 작성이 끝났으면 실제로 실행해보자 Run As에서 Dataflow Pipeline을 선택하면 실행을 할 수 있다.



이때 다음과 같이 실행환경을 설정할 수 있다.



여기서 Runner에 대한 개념을 짚고 넘어갈 필요가 있다.

Direct Pipeline Runner

Direct Pipeline Runner는 데이타플로우 코드를 로컬 개발 환경 (노트북이나 데스크탑)에서 실행하고자 할때 선택할 수 있는 러너이다. 주로 개발이나 테스트에서 사용할 수 있는데, 다른 클라우드 서비스 예를 들어  Pub/Sub이나 빅쿼리등이랑 연동이 되는 파이프라인의 경우에는 DirectPipelineRunner를 사용할 수 없으니 주의하기 바란다.

DataflowPipelineRunner

클라우드 환경에서 데이타 플로우를 실행하기 위해서는 DataflowPipelineRunner와  BlockingDataflowPipelineRunner 두 가지가 있다.

DataflowPipelineRunner는 데이타 플로우 애플리케이션을 구글 클라우드에서 실행을 해주는데, 데이타 플로우 잡을 클라우드에서 실행해놓고, 로컬 애플리케이션을 바로 종료 한다. (클라우드에 접수된 잡은 클라우드에서 계속 실행된다.)

BlockingDataflowPipelineRunner

BlockingDataflowPipelineRunner는 데이타 플로우잡을 구글 클라우드에서 실행해놓게 해놓고, 잡이 끝날때 까지 로컬 애플리케이션을 대기하도록 한다.  

배치와 같이 끝이 있는 경우에는 필요에 따라서 사용할 수 있다. 스트리밍의 경우 BlockingDataflowPipelineRunner를 사용하게 되면 스트리밍 잡을 명시적으로 끊지 않는 이상 계속해서 로컬 애플리케이션이 실행되는 상태가 된다.


DirectPipelineRunner로 실행을 해보면 다음과 같이 이클립스 콘솔에서 결과가 출력되는 것을 볼 수 있다.


BODY는 1,  GIRL 은 1, HELLO는 3개 그리고 WORLD는 1개가 출력되는 것을 볼 수 있다.


이번에는 클라우드에 배포를 하고 실행해보자, Run As에서, BlockingDataflowPipelineRunner를 선택하여 실행해보자.

실행을 하면 코드가 자동으로 클라우드로 배포 되서 실행되는 것을 확인할 수 있다. 구글 클라우드 콘솔의 데이타 플로우 메뉴를 보면, 새로운 잡이 생성된것을 확인할 수 있다.


해당 잡을 선택해서 들어가 보면 현재 잡의 실행 상황과 함께, 파이프라인에서 단계별 시간이나 기타 상세한 로그를 볼 수 있다.



데이타 플로우 애플리케이션이 기동이 완료되면, Logs 메뉴에서 Worker Logs라는 버튼을 누르면 각 워커 노드에서의 로그를 볼 수 있다.


Worker Logs를 누르면 다음과 같이  GIRL,WORLD,BOY,HELLO에 대한 count 수를 출력한 로그를 확인할 수 있다.


참고 : Logs 메뉴로 들어가서  Job Logs에서  Minimum serverity를 “All” 로 선택하면 전체 작업 상황을 알 수 있는데, 애플리케이션을 실행했다고 바로 데이타 플로우의 파이프라인이 실행되는 것이 아니라, 애플리케이션 코드가 구글 클라우드 스토리에 로드되고, 이 로드된 코드들이 각각의 워커 노드에 배포가 된후에, 워커 노드가 기동이 되야 잡이 실제로 수행된다.


워커가 제대로 기동되었는지는 Job Logs에서 Mimimum serverity를 All로 한후에 다음과 같이 “Worker have started successfully”라는 로그가 나오면 그때 부터 데이타 플로우 잡을 실행을 시작한다고 생각하면 된다.








저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

Fluentd + Bigquery + Jupyter를 이용한 초간단 BI 구축하기


조대협

얼마전에 빅데이타의 전문가로 유명한 김형준님이 "Presto + Zeppelin을 이용한 초간단 BI 구축 사례"라는 발표 자료를 보았다. http://www.slideshare.net/babokim/presto-zeppelin-bi 오픈 소스 기술들을 조합하여, 초간단하게 빅데이타 분석 플랫폼을 만든 사례 인데, 상당히 실용적이기도 하고, 좋은 조합인것 같아서, 마침 구글 빅쿼리에 대한 자료를 정리하던중 비슷한 시나리오로 BI 대쉬 보드를 만들어보았다.

Fluentd를 이용해서 실시간으로 데이타를 수집하고, 이를 빅쿼리에 저장한 다음에 iPython nodebook (aka Jupyter)로 대쉬보드를 만드는 예제이다. 일부 제품에 대한 지식이 없었음에도 불구하고 실제 설정은 대략 2시간 정도 걸렸다.


Fluentd 설치

예제는 Google Cloud에서 Ubuntu Linux 14.x VM에서 Fluentd를 이용하여 Twitter에서 데이타를 읽은 후, 빅쿼리에 데이타를 로딩하는 시나리오이다.

VM 생성

Fluentd를 설치할 VM을 생성해보자. 구글 클라우드 콘솔에서 아래 그림과 같이 VM을 생성할때, “Identity and API access” 부분에  “Allow full access to all Cloud APIs”를 선택한다. 이를 선택해서 이 VM이 모든 구글 클라우드 API에 대한 접근 권한 (BigQuery 포함)을 가지도록 한다.


tdagent 설치

생성한 VM에 fluentd의 로그 수집 에이전트인 tdagent를 설치한다.

tdagent는 OS나, 또는 같은 OS라도 OS 버전별로 설치 방법이 다르기 때문에, 버전별 설치 방법은 http://www.fluentd.org를 참고하기 바란다.

여기서는 Ubuntu 14.x를 기준으로 진행을 하였다.

다음 명령어를 실행하면 tdagent가 설치된다.

% curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent2.sh | sh

설치한 후 에이전트를 실행해서 확인해보자. 다음 명령으로 agent를 실행한 후에,

% sudo /etc/init.d/td-agent restart

실행이 끝난 후에 다음 명령으로 설치를 확인한다.

% sudo /etc/init.d/td-agent status


참고 (tdagent 관련 명령어)

tdagent 기동 - $sudo /etc/init.d/td-agent start
tdagent 정지 - $sudo /etc/init.d/td-agent stop
tdagent 재시작 - $sudo /etc/init.d/td-agent restart
tdagent 상태확인 - $sudo /etc/init.d/td-agent status




트위터 Input 설정하기

tdagent 에이전트 설치가 끝났으면 fluentd를 이용해서 트위터 피드를 읽어드리도록 해보자.

트위터 API 키 받기

트위터 피드는 트위터에서 제공하는 OPEN API를 통해서 읽어드린다. 그래서 이 OPEN API에 접근하기 위해서는 OPEN API키가 필요하다.

OPEN API 키는 https://apps.twitter.com/ 에 접속하고 Create New App 메뉴를 이용하면 새로운 앱을 등록할 수 있고, 여기에 Fluentd 앱을 정의해서 정보를 넣어주고 Key and secrect을 생성해주면 다음과 같이 키가 생성된 것을 웹에서 확인할 수 있다.


여기서 필요한 키값은 Consumer Key, Consumer Secret, Access Token, Access Token Secret 4가지가 필요하다.

트위터 플러그인 설치하기

API 접근을 위한 API Key를 모두 얻었으면 이제 fluentd에서 트위터 피드를 읽기 위한 트위터 플러그인을 설치해보자.

트위터 API는 libssl에 대한 의존성이 있기 때문에, libssl를 먼저 설치한다.

%sudo apt-get install build-essential libssl-dev

다음 트위터 플러그인이 사용하는 eventmachine 플러그인과, 트위터 플러그인을 설치한다.

% sudo td-agent-gem install eventmachine

% sudo td-agent-gem install fluent-plugin-twitter

설정하기

플러그인 설치가 끝났으면 설정을 해보자. 설정 파일은 /etc/td-agent/td-agent.conf 에 있다.

이 파일을 다음과 같이 편집하자.


<source>

 type twitter

 consumer_key        {앞서 트위터 콘솔에서 받은 Consumer Key}

 consumer_secret     {앞서 트위터 콘솔에서 받은 Consumer  secret}

 oauth_token         {앞서 트위터 콘솔에서 받은 Access token}

 oauth_token_secret {앞서 트위터 콘솔에서 받은 Access token secret}

 tag                 input.twitter.sampling  # Required

 timeline            sampling                # Required (tracking or sampling or location or userstream)

 keyword             galaxy,game        # 검색어

 output_format       nest                   # Optional (nest or flat or simple[default])

</source>

<match input.twitter.sampling>

 type stdout

</match>


이 설정 파일은 keyword에 등록된 “galaxy”와 “game” 이라는 키워드를 찾아서, 읽어드린후 <match input.twitter.sampling> 에 의해서, 읽어드린 내용을 stdout으로 출력해주는 설정이다.

테스트

설정이 끝났으면 확인을 해보자

% sudo /etc/init.d/td-agent restart

명령어를 수행하여, td-agent를 리스타트 해서 새로운 config 파일이 반영되도록 하고

% tail -f /var/log/td-agent/td-agent.log          

를 통해서 stdout으로 올라오는 로그를 확인하자. 제대로 데이타가 수집되는 것을 확인했으면 다음 명령어를 이용해서, td-agent를 정지 시키자.

% sudo /etc/init.d/td-agent stop


빅쿼리로 저장하기

twitter로 부터 피드를 읽어드리는 플러그인이 정상적으로 작동함을 확인하였으면, 이번에는 읽어드린 데이타를 빅쿼리로 저장해보자.

빅쿼리 플러그인 설치 및 테이블 생성

빅쿼리로 데이타를 쓰기 위해서 빅쿼리 플러그인을 설치한다.

% sudo td-agent-gem install fluent-plugin-bigquery


다음으로 빅쿼리 프로젝트에서 트위터 데이타를 저장할 데이타셋과 테이블을 생성한다.

데이타 셋 이름은 편의상 “twitter”라고 하고, 테이블은 “ timeline”이라고 하고 생성을 하겠다.

테이블의 스키마는 트위터 피드에 대한 데이타 구조를 빅쿼리 스키마로 만들어놓은 스키마가 이미 https://gist.github.com/Salinger/ef39b81ad2c48516b596

에 있기 때문에, 이 스키마 파일을 읽어서 빅쿼리 콘솔에서 아래 그림과 같이 Schema 부분에 Copy & Paste를 해서 붙이면 테이블이 생성된다.


설정하기

테이블이 생성이 되었으면 fluentd 설정 파일을 수정하여 트위터 피드를 이 테이블에 저장하도록 설정한다.


<source>
 type twitter
   consumer_key        {앞서 트위터 콘솔에서 받은 Consumer Key}

 consumer_secret     {앞서 트위터 콘솔에서 받은 Consumer  secret}

 oauth_token         {앞서 트위터 콘솔에서 받은 Access token}

 oauth_token_secret {앞서 트위터 콘솔에서 받은 Access token secret}

 tag                 input.twitter.sampling  # Required
 timeline            sampling                # Required (tracking or sampling or location or userstream)
 keyword             hillary,clinton,donald,trump
 output_format       nest                    # Optional (nest or flat or simple[default])
</source>

<match input.twitter.sampling>
 type copy
<store>
  type bigquery
  buffer_type file
  buffer_path /var/log/td-agent/buffer/twi.*.buf
  method insert

  auth_method compute_engine
  project useful-hour-138023
  dataset twitter
  table timeline

  flush_interval 1
  buffer_chunk_limit 1000000
  buffer_queue_limit 5000
  flush_interval 1
  try_flush_interval 0.05
  num_threads 4
  queue_chunk_flush_interval 0.01

  time_format %s
  time_field log_time
  schema_path /home/terrycho/bq_tweet.json
  log_level error
</store>
</match>


기존 설정 파일에서 <match input.twitter.sampling> 부분을 빅쿼리로 변경하였다. <store>에서 type을 bigquery로 변경하였다.

중요한 필드들을 살펴보면

  • buffer_type, buffer_path : fluentd는 트위터에서 읽어드리는 데이타를 건건이 bigquery에 저장하는게 아니라 일정 단위로 모아서 bigquery에 저장한다. 그래서 buffer를 사용하는데, buffer를 파일을 이용하고, 이 파일의 위치를 지정해주었다.

  • auth_method, project,dataset,table : 데이타를 저장한 bigquery의 project,dataset,table 명을 정한다. 그리고 auth_method를 통해서 인증 방법을 설정하는데, 일반적으로는 service account에 대한 json 파일을 사용하는데, 여기서는 구글 클라우드내에 VM을 생성하였고, 앞에서 VM 생성시에 Bigquery에 대한 접근 권한을 이미 주었기 때문에, 인증 방식을 compute_engine으로 설정하면 된다.

  • flush_interval 은 어떤 주기로 버퍼된 데이타를 bigquery로 저장할것인지를 정한다. 여기서는 1초 단위로 저장하도록 하였다.

  • 그리고 중요한것중 하나가 schema_path 인데, 저장하고자 하는 bigquery 테이블의 스키마이다. 앞에서 테이블 생성에서 사용한 https://gist.github.com/Salinger/ef39b81ad2c48516b596 에서 다운 받았던 *.json으로 정의된 스키마 파일의 경로를 지정해주면 된다.

실행하기

모든 설정이 끝났으면

%sudo /etc/init.d/td-agent restart

명령을 이용해서 tdagent를 재기동하자.

그리고 빅쿼리 콘솔에서 “select count(*) from 테이블명” 명령을 사용하면 아래와 같이 카운트 수가 매번 올라가면서 데이타가 저장되는 것을 확인할 수 있다.


Datalab으로 대쉬보드 만들기

datalab은 오픈소스 iPython note의 구글 클라우드 버전이다. 자동으로 구글 클라우드 내의 앱앤진 내에 설치해주고, 구글 클라우드의 빅데이타 인프라 (빅쿼리등)과 손쉽게 연동되며, 구글 차트를 내장하고 있어서 그래프도 손쉽게 그려줄 수 있다.


데이타랩 준비하기

데이타랩을 사용하기 위해서는 https://datalab.cloud.google.com/ 에 접속하고, 로그인을 하면 다음과 같이 프로젝트를 선택하는 화면이 나온다.


만약에 아직 데이타랩을 설치 하지 않았으면 가운데 Deploy 버튼만 활성화가 된다. Deploy 버튼을 누르면 자동으로 데이타랩이 설치된다. 설치가 끝나면 Start 버튼이 활성화 된다. Start 버튼을 누르면 데이타 랩으로 들어갈 수 있다.

새로운 노트 만들기

다음은 데이타랩의 초기화면이다.


우리는 여기서, 새로운 노트를 만들어서 앞서 빅쿼리로 읽어드린 데이타를  lang(언어)별로 그룹핑을 해서 카운트하는 쿼리를 실행하고, 그 결과를 그래프로 만들것이다.

위의 초기화면에서 “+Notebook” 버튼을 눌려서 새로운 노트북을 만들어보자


노트화면이 로딩되었으면 상단의 메뉴를 보자


+Add code와, +Add Markdown 버튼을 볼 수 있는데,  Add Code는 파이썬이나 SQL과 같은 프로그래밍 언어를 정의하고 실행할 수 있는 공간이고, +Add Markdown은 일반적인 텍스트나 이미지를 통해서 간단한 글을 쓸 수 있는 공간을 만들어준다.

이렇게 코드써가면서 직접 실행해보고 그 결과를 확인하면서 그에 대한 내용을 설명하는 내용을 Markdown으로 작성하는 것과 같이 마치 노트에 계산을 해나가는 것처럼 써 나가기 때문에 이런 류의 프로그램을 노트북이라고 한다. (유사한 프로그램으로는 zeppelin 등이 있다 https://zeppelin.apache.org/)

쿼리 실행하기

그러면 Add code를 통해서 코드 섹션을 추가하고 SQL 문장을 추가해보자. 다음은 빅쿼리 트위터 테이블에서 lang 별로 그룹핑을 해서 카운트를 하는 SQL 문장이다.


이 문장을 실행하려면 노트북 상단의 “Run” 버튼을 누르면 된다.

다음과 같이 결과가 쿼리 바로 아래에 출력되는 것을 볼 수 있다.




그래프 그리기

다음으로 결과로 그래프를 그려보자

다음과 같이 두개의 코드 블럭을 추가하자


첫번째 코드 블럭에는 SQL 문장을 수행하는데 이때 --module twitter라고 정의를 해주면 결과가 twitter라는 모듈에 저장이 된다.

두번째 코드 블럭은 그래프를 그리기 위해서 chart 명령어를 이용하고 차트 타입은 pie로, 그래프의 x,y 축은 lang과, lang_count로 지정하고, 데이타 소스는  --date를 이용해서 앞의 쿼리 결과를 저장한 twitter로 지정한다.

다음으로 Run 버튼을 이용해서 쿼리를 수행해보면 다음과 같은 결과 화면을 얻을 수 있다.





지금까지 간략하게 Fluentd를 통해서 데이타를 수집하고 빅쿼리에 저장한 후, 데이타랩을 통해서 분석 및 리포팅을 하는 간단한 시나리오를 살펴보았다. fluentd나 데이타랩에 대한 사전적인 지식이 없었는데, 필자의 경우 이를 만드는데 대략 2시간의 시간이 소요되었다. 2시간의 시간으로 수 PB급의 빅데이타를 수집할 수 있고 분석할 수 있는 시스템을 구축할 수 있었다. 예전 같으면 하둡과 스팍 인스톨과 몇시간이 걸렸는데, 요즘 드는 생각은 빅데이타에 대한 접근 장벽이 많이 무너졌다고나 할까.

참고 자료


저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

빅데이타 수집을 위한 데이타 수집 솔루션 Embulk 소개


조대협 (http://bcho.tistroy.com)


빅데이타 분석에 있어서, 아키텍쳐적으로 중요한 모듈중의 하나는 여러 서버로 부터 생성되는 데이타를 어떻게 모을 것인가이다. 얼마전에, 일본의 사례를 보다가 눈에 띄는 솔루션이 있어서 주말을 통해서 이런 저런 테스트를 해봤다.


Embulk 소개

Embulk라는 솔루션인데, fluentd를 만들었던 사람이 만들었다고 한다.

여러 종류의 데이타 소스에서 데이타를 읽어서 로딩을 할 수 있다. 주요 특징을 보면

  • 플러그인 형태로 여러개의 소스와 타겟을 지원한다.
    jRuby로 개발이 되어서 ruby gem을 이용하여 손쉽게 플러그인을 설치할 수 있다.

  • 병렬 로딩이 가능하다.
    예를 들어 여러개의 파일을 동시에 로딩하거나 또는 하나의 큰 파일이라도 자동으로 여러개의 파일로 쪼게서 병렬로 로딩을 함으로써, 로딩 속도를 올릴 수 있다.

  • 변환이 가능하다.
    파일 포맷 변환뿐 아니라, 각 필드에 대한 형 변환 그리고, 간단한 필드 맵핑 등이 가능하다.

  • 스키마 예측 (Schema guessing)
    입력 데이타를 보고, 자동으로 입력 데이타의 스키마(테이블 구조)를 예측한다. 일일이 설정을 하려면 귀찮은 일인데, 자동으로 스키마를 인식해주시기 때문에, 설정양을 줄일 수 있다.

전제적인 개념은 미니 ETL과 유사하다고 볼 수 있는데, 그 사용법이 매우 쉽다.

Embulk 설치

이 글에서는 로컬에 있는 CSV 포맷의 파일을 구글 클라우드의 빅쿼리로 로딩하는 예제를 통해서 어떻게 Embulk를 사용하는지를 알아보겠다.

VM 생성

테스트는 구글 클라우드 VM에서 진행한다. 4코어 Ubuntu VM을 생성하고 테스트 데이타를 복사하였다.

VM을 생성할때, 빅쿼리 API를 사용할 것이기 때문에, Cloud API access scopes에 BigQuery API access 권한을 반드시 부여해야 한다.


이 예제에서는 VM 생성시 모든 Cloud API에 대한 사용권한을 부여한체 생성하였다. VM을 생성한 후에, 콘솔에서 VM 상세 정보를 확인해보면 위의 그림과 같이 “This instance has full API access to all Google Cloud services.”로, 모든  구글  클라우드 API에 대한 권한을 가지고 있는 것을 확인할 수 있다.

자바 설치

구글 Ubuntu VM에는 디폴트로 자바가 설치되어있지 않기 때문에, JVM을 설치한다.

% sudo apt-get update

% sudo apt-get install default-jre

Embulk 설치

JVM 설치가 끝났으면 Embulk를 설치해보자. 다음 명령어를 실행하면 Embulk 가 설치된다.

% curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
% chmod +x ~/.embulk/bin/embulk
% echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
% source ~/.bashrc

Embulk는 ~/.embulk 디렉토리에 설치가 된다.

다음으로, 빅쿼리에 결과를 쓸 예정이기 때문에, 빅쿼리 Output 플러그인을 설치한다.

%embulk gem install embulk-output-bigquery


Embulk 로 빅쿼리에 CSV 파일 로딩하기

로딩할 데이타 살펴보기

로딩에 사용한 데이타는 게임 이벤트에 대한 데이타를 시뮬레이션 해놓은 것으로, 사용자가 NPC를 만나서 전투를 하는 각각의 이벤트를 기록해놓은 파일이다. 파일이름은 events000000000001 CSV 파일 포맷이고 총 1220395 레코드에, 243 MB의 크기이며 데이타 포맷은 다음과 같다.


파일 포맷은 다음과 같다.


eventTime,userId,sessionId,sessionStartTime,eventId,npcId,battleId,firstLogin,playerAttackPoints,playerHitPoints,playerMaxHitPoints,playerArmorClass,npcAttackPoints,npcHitPoints,npcMaxHitPoints,npcArmorClass,attackRoll,damageRoll,currentQuest

2015-11-29 01:31:10.017236 UTC,user875@example.com,688206d6-adc4-5e60-3848-b94e51c3707b,2015-11-29 01:29:20.017236 UTC,npcmissedplayer,boss15,6e4232df-26fa-22f1-fa04-465e85b34c1e,,15,3,15,15,15,15,15,15,11,,15

:


첫줄에, CSV 파일에 대한 컬럼명이 들어가고 두번째 줄 부터, “,” delimiter를 이용하여 각 컬럼을 구별하여 실 데이타가 들어가 있다.

스키마 예측을 통하여 자동으로 Config 파일 생성하기

이제, Embulk를 통해서 이 파일을 로딩하기 위해서, config 파일을 생성해보자.

Embulk에서 config 파일은 스키마 자동 예측을 통해서 자동으로 생성해낼 수 있다. Config 파일을 생성하기 위해서는 input과 output 에 대한 기본 정보를 기술해줘야 하는데, 다음과 같이 seed.yml 파일에 기본 정보를 기술한다.


in:  

 type: file  

 path_prefix: "/home/terrycho/data/events"

out:  

 type: bigquery


path_prefix에는 파일명을 정의하는데, /home/terrycho/data/events는 /home/terrycho/data/ 디렉토리내에 events*로 시작하는 모든 파일에 대해서 로딩을 하겠다는 정의이다.


seed.yml 파일 설정이 끝났으면 config 파일을 생성해보자

% embulk guess ./seed.yml -o config.yml

명령을 실행하면 아래와 같이 config.yml 파일이 생성된다.


in:

 type: file

 path_prefix: /home/terrycho/data/events

 parser:

   charset: UTF-8

   newline: CRLF

   type: csv

   delimiter: ','

   quote: '"'

   escape: '"'

   trim_if_not_quoted: false

   skip_header_lines: 1

   allow_extra_columns: false

   allow_optional_columns: false

   columns:

   - {name: eventTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: userId, type: string}

   - {name: sessionId, type: string}

   - {name: sessionStartTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: eventId, type: string}

   - {name: npcId, type: string}

   - {name: battleId, type: string}

   - {name: firstLogin, type: string}

   - {name: playerAttackPoints, type: long}

   - {name: playerHitPoints, type: long}

   - {name: playerMaxHitPoints, type: long}

   - {name: playerArmorClass, type: long}

   - {name: npcAttackPoints, type: long}

   - {name: npcHitPoints, type: long}

   - {name: npcMaxHitPoints, type: long}

   - {name: npcArmorClass, type: long}

   - {name: attackRoll, type: long}

   - {name: damageRoll, type: long}

   - {name: currentQuest, type: long}

out: {type: bigquery}


생성된 config.yml 파일을 보면 firstLogin 컬럼의 데이타 형이 string으로 되어 있는 것을 볼 수 있다. 빅쿼리 테이블에서 이 필드의 형은 실제로는 boolean이다. 아무래도 자동 인식이기 때문에, 이렇게 형들이 다르게 인식되는 경우가 있기 때문에, 생성 후에는 반드시 검토를 하고 알맞은 형으로 수정을 해줘야 한다.


다음으로 위의 파일에 데이타를 로딩할 빅쿼리에 대한 정보를 정의해줘야 한다.


in:

 type: file

 path_prefix: /home/terrycho/data/events000000000001

 parser:

   charset: UTF-8

   newline: CRLF

   type: csv

   delimiter: ','

   quote: '"'

   escape: '"'

   trim_if_not_quoted: false

   skip_header_lines: 1

   allow_extra_columns: false

   allow_optional_columns: false

   columns:

   - {name: eventTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: userId, type: string}

   - {name: sessionId, type: string}

   - {name: sessionStartTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: eventId, type: string}

   - {name: npcId, type: string}

   - {name: battleId, type: string}

   - {name: firstLogin, type: boolean}

   - {name: playerAttackPoints, type: long}

   - {name: playerHitPoints, type: long}

   - {name: playerMaxHitPoints, type: long}

   - {name: playerArmorClass, type: long}

   - {name: npcAttackPoints, type: long}

   - {name: npcHitPoints, type: long}

   - {name: npcMaxHitPoints, type: long}

   - {name: npcArmorClass, type: long}

   - {name: attackRoll, type: long}

   - {name: damageRoll, type: long}

   - {name: currentQuest, type: long}

out:

 type: bigquery

 mode: append

 auth_method: compute_engine

 project: useful-hour-138023

 dataset: gamedata

 table: game_event

 source_format: CSV


“out:” 부분을 위와 같이 수정하였다.

mode는 append 모드로, 기존 파일에 데이타를 붙이는 모드로 하였다. auth_method에는 빅쿼리 API 호출을 위한 인증 방식을 정의하는데, 구글 클라우드의 VM에서 호출하기 때문에, compute_engine이라는 인증 방식을 사용하였다. (구글 클라우드의 VM에서 같은 프로젝트 내의 빅쿼리 API를 호출할 경우에는 별도의 인증을 생략할 수 있다.) 다른 인프라드에서 호출할 경우에는 IAM에서 Service account를 생성한 후에, json  파일을 다운 받아서, json 파일 인증 방식으로 변경하고, 다운 로드 받은 json 파일을 지정해주면 된다.

다음으로, project,dataset,table에, 로딩할 빅쿼리 데이블에 대한 프로젝트명, 데이타셋명, 테이블명을 기술해주었다. 그리고 마지막으로 입력 포맷이 CSV임을 source_format에서 CSV로 정의하였다.


이제 데이타 로딩을 위한 모든 준비가 끝났다.

Config 파일 테스트

데이타 로딩을 하기 전에, 이 config 파일이 제대로 작동하는지 테스트를 해보자

%embulk preview config.yml

의 명령어는 데이타를 읽어서 제대로 파싱을 하는지 설정 파일은 문제가 없는지 테스트를 해주는 명령어이다.

명령을 실행하면 다음과 같이 일부 데이타를 읽어서 파싱을 하고 결과를 보여주는 것을 볼 수 있다.



실행하기

테스트가 끝났으면 실제로 데이타를 로딩해보자. 로딩은 아래와 같이 embulk run 명령어를 사용하면 된다.

%embulk run config.yml

실제로 실행한 결과 약 12분이 소요되었다.


멀티 쓰레드를 이용하여 로딩 속도 올리기

앞에서 설명하였듯이, Embulk는 패레럴 로딩이 지원된다. 아래와 같이 config.yml 파일에 exec이라는 부분에, max_threads수와, min_output_tasks 수를 정해주면 되는데, min_output_tasks 수는 최소로 동시 실행할 로딩 테스크 수이다. 5로 정했기 때문에, 이 시나리오에서는 하나의 CSV 파일을 업로드 하기 때문에, 이 파일을 5개의 작은 파일로 잘라서 동시에 5개의 쓰레드로 동시에 업로딩 한다.


exec:

 max_threads: 20

 min_output_tasks: 5

in:

 type: file

 path_prefix: /home/terrycho/data/events

 parser:

 :


실제로 테스트한 결과 디폴트 설정에서는 초당 약 1200줄을 업로드하였는데, 반하여, min_output_tasks를 5개로 하였을때는 초당 2000개 내외를 업로드 하였다. min_output_tasks를 10개,20개로 올려봤으나 성능은 비슷하였다. (아마 튜닝을 잘못한듯)

Parser-none으로 로딩 속도 올리기

앞의 시나리오는 데이타 라인을 각각 읽어서 컬럼을 일일이 파싱하고 이를 입력하도록 하는 시나리오였다.

만약에 CSV나 JSON 입력 파일이 빅쿼리 입력 포맷에 맞도록 이미 포매팅이 되어있다면, 일일이 파싱할 필요가 없다.

그냥 파일을 읽어서 파싱 없이 바로 빅쿼리에 insert만 하면되기 때문에, 이 경우에는 Parser를 제거하면 되는데, Parsing을 하지 않는 Parser로 embulk-parser-none이 있다.

이 Parser 다음과 같이 설치한다.

$ embulk gem install embulk-parser-none

다음 config 파일을 다음과 같이 수정한다.


in:

 type: file

 path_prefix: /home/terrycho/data/events000000000001_nohead

 parser:

   type: none

   column_name: payload

out:

 type: bigquery

 mode: append

 auth_method: compute_engine

 project: useful-hour-138023

 dataset: gamedata

 schema_file: /home/terrycho/data/gameevent.schema.json

 table: game_event

 payload_column_index: 0


이때 중요한것중 하나는 데이타 파일 (CSV)파일 첫줄에 데이타에 대한 컬럼 정보가 들어가 있으면 안된다.

그래서 아래와 같이 원본 데이타 파일에서 첫줄을 지운다.

eventTime,userId,sessionId,sessionStartTime,eventId,npcId,battleId,firstLogin,playerAttackPoints,playerHitPoints,playerMaxHitPoints,playerArmorClass,npcAttackPoints,npcHitPoints,npcMaxHitPoints,npcArmorClass,attackRoll,damageRoll,currentQuest

2015-11-29 01:31:10.017236 UTC,user875@example.com,688206d6-adc4-5e60-3848-b94e51c3707b,2015-11-29 01:29:20.017236 UTC,npcmissedplayer,boss15,6e4232df-26fa-22f1-fa04-465e85b34c1e,,15,3,15,15,15,15,15,15,11,,15

:


다음 embulk run을 이용하여 이 config 파일을 실행해보면 같은 데이타인데도 로딩 타임이 약 50초 정도 밖에 소요되지 않는 것을 확인할 수 있다.

빅쿼리 관련 몇가지 추가 옵션

이외에도 다양한 옵션이 존재하기 때문에, 빅쿼리 output 플러그인 페이지인 https://github.com/embulk/embulk-output-bigquery 를 참고하기 바란다.

자동으로 중복을 제거하는 기능이나, 로딩할때 마다 동적으로 빅쿼리 테이블을 생성하는 기능등이 있으니 반드시 참고하기 바란다.

GCS를 경유하는 업로딩

Embulk의 패레럴 로딩이 좋기는 하지만 의외의 문제가 발생할 수 있는 부분이 하나가 있는데, 하나의 파일을 로딩하는데 Embulk는 여러개의 태스크로 병렬 처리를 하기 때문에, 빅쿼리 입장에서는 각각의 태스크가 빅쿼리 로딩 JOB으로 인식이 될 수 있다. 일반적으로 빅쿼리 JOB은 하루에 10,000개만 실행할 수 있는 제약을 가지고 있다. 그래서 만약에 데이타 로딩이 많을 경우 이런 병렬 로딩은 JOB 수를 깍아 먹는 원인이 될 수 있는데, bigquery output 플러그인에서는 다음과 같은 해법을 제공한다.


빅쿼리로 데이타를 로딩할때 GCS (Google Cloud Storage)를 사용하여, 와일드카드 (*)를 사용할 경우에는 하나의 디렉토리에 있는 여러 파일을 병렬로 로딩할 수 있으며, 이때 와일드 카드를 사용한 JOB은 하나의 JOB으로 인식된다. (병렬로 여러 파일을 로딩하더라도)


그래서 out 옵션에 다음과 같이 GCS  관련 옵션을 설정해주면 파일을 직접 로컬에서 로딩하는 것이 아니라, 처리를 다 끝난 파일을 GCS 버킷으로 업로딩한 후에, GCS 버킷에서 로딩을 하게 되기 때문에, JOB수를 줄일 수 있다.


out:

 type: bigquery

 gcs_bucket: bucket_name

 auto_create_gcs_bucket: false


성능과 활용도에 대한 분석

각 시나리오에 대한 성능 테스트 결과 값은 다음과 같다.

CSV를 구글에서 제공되는 bq load 명령어를 이용해도 108초가 나오는데 반해서, non-parser를 이용하면 파일을 자동으로 쪼게서 보내기 때문에, bq load를 이용하여 하나의 파일로 업로드 하는 것보다 높은 성능이 나온다.


시나리오

성능

bq load 명령어를 이용한 로딩

108초

CSV 파서를 사용한 경우

12분

non parser를 사용한 경우

50초


하나 고려할 사항은 Parser나 Filter의 경우 ruby로 개발된 것이 있고, java로 개발된 것들이 있는데, ruby로 개발된 플러그인의 경우 성능이 java 대비 많이 느리기 때문에 가급적이면 java로 개발된것을 사용하도록 한다.


다양한 데이타 소스와 저장소가 지원이 되고, 설정이 매우 간단하며 간단한 포맷 변환등이 지원되는 만큼, 쉽고 빠르게 데이타 연동 파이프라인을 구축하는데 활용도가 매우 높다. 이와 유사한 솔루션으로는 fluentd등이 있는데, fluentd는 조금 더 실시간 즉 스트리밍 데이타에 초점이 맞춰져 있으며, Embulk는 배치성 분석에 맞춰져 있다.


참고 자료


저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

Google 앱앤진에 node.js 애플리케이션을 배포하기


조대협 (http://bcho.tistory.com)

PaaS 서비스란?

PaaS란 Platform as a service의 약자로, 간단하게 설명하면, Linux VM등에 직접 node.js나 기반 환경을 설치하고 디스크와 네트워크 구성, 오토스케일링등의 구성이 필요 없이, 개발자가 작성한 코드만 올려주면, 이 모든 것을 클라우드에서 대행해주는 서비스의 형태이다.

인프라 운영을 위한 별도의 작업이 필요하지 않기 때문에, 숫자가 적은 기업이나 개발에만 집중하고 싶은 기업에는 적절한 모델이라고 볼 수 있다.

근래에 스타트업 비지니스가 발달하고 또한 사용하는 기술 스택들이 복잡해짐에 따라 각각의 기술 스택에 대한 설치나 운영에 대한 인력을 두기보다는 PaaS와 같은 매니지드 서비스를 이용해서 애플리케이션을 개발하는  방식이 스타트업을 중심으로 많이 이루어지고 있다.

구글 클라우드 소개

구글 클라우드는 전세계에 걸쳐 유투브와 구글 검색, 이메일등의 서비스를 하는 구글의 인프라의 경험을 구글 외부의 개발자들에게 개방하기 위해서 개발된 클라우드 서비스이다.

글로벌을 대상으로 서비스를 하던 경험을 기반으로 글로벌 그리고 대용량 서비스에 적절한 특징들을 많이 가지고 있는데, 그 중에서 눈여겨 볼만한것이 네트워크와 빅데이타 서비스이다.

네트워크

구글은 자체 서비스를 위해서 전세계에 걸쳐서 많은 네트워크 망을 가지고 있다. 특히 전세계에 걸쳐 70개 이상의 Pop (Point of Presence)라는 서버들을 보유하고 있는데, 구글 클라우드에서 동작하는 애플리케이션은 클라이언트가 직접 그 데이타 센터의 서버로 인터넷을 통해서 접속하는 것이 아니라, 먼저 가장 가까운 Pop 서버로 연결이 된 후, Pop 서버와 구글 클라우드 데이타 센터간은 구글의 전용 광케이블을 이용해서 접속이 되기 때문에 빠른 네트워크 성능을 보장한다.

예를 들어 한국에서 미국의 서버를 접속하게 되면 일반 인터넷망을 타는 것이 아니라, 가까운 일본에 있는 구글 Pop 서버를 접속한 후에, 구글 광케이블망을 통해서 미국 데이타 센터에 접속하게 된다. 얼마전 7월에 일본-미국을 연결하는 20TB의 구글 클라우드 전용망이 오픈되었기 때문에, 훨씬 더 빠른 접속 속도를 보장받을 수 있다.

이런 특징 때문에, 구글 클라우드를 사용할 경우 여러 국가에 서비스를 제공하는 글로벌 서비스의 경우 이러한 망 가속의 잇점을 볼 수 있다.


또한 내부 네트워크 역시, 1 CPU당 2 GB의 대역폭을 제공한다. (실제 테스트해보면 1.6~1.8 GB정도가 나온다) 최대 8 CPU에 16 GB의 대역폭 까지 지원하기 때문에, 내부 노드간 연산이 많거나 또는 노드간 통신이 많은 클러스터링 솔루션을 사용할때 별도의 비용을 지불하지 않고도 넉넉한 네트워크 대역폭 사용이 가능하다.

빅데이타

구글은 사람들에게 알려진바와 같이 데이타를 기반으로 한 서비스에 강하며 특히 빅데이타를 처리하는 능력이 뛰어난 회사이다. 이러한 빅데이타 처리 기술이 클라우드 서비스로 제공되는데, 알파고와 같은 예측 통계학적인 머신러닝이나 딥러닝 이외에도, 분석 통계학적인 데이타 분석 서비스가 많은데 그중에서 특출난 서비스들로 Pub/Sub, Dataflow, BigQuery 등이 있다.

Pub/Sub은 전달 보장이 가능한 대용량 큐 시스템이다. 오픈소스 Kafka와 같은 서비스라고 보면 되고, 이러한 분산큐를 직접 깔아서 운영하기 어려운 사람들에게 쉽게 대용량 큐 서비스를 제공한다.

Dataflow는 스트리밍 및 배치 분석 시스템인데, Spark Streaming이나 Apache Flink와 같은 스트리밍 프레임웍과 유사하다고 보면된다. 윈도우,트리거,워터마크와 같은 스트리밍 서비스에서 발전된 개념을 이미 내장하고 있다.

마지막으로 BigQuery는 대용량 데이타 저장/분석 시스템으로 SQL과 같은 문법으로 데이타 분석이 가능하며, 데이타 저장 비용은 구글에서도 가장 싸다는 데이타 저장소인 CloudStorage 보다 저렴하며, 1000억개, 4TB의 레코드에 대해서 like 검색을 하는데 소요되는 시간이 불과 30여초로, 빠른 성능을 보장한다. (이 30초 동안에 CPU가 수천개, DISK역시 수천개가 사용된다.)


구글 클라우드 앱앤진

오늘 살펴볼 서비스는 구글 클라우드 중에서도 앱앤진이라는 PaaS 서비스이다. 구글은 처음 클라우드 서비스를 시작할때 PaaS 기반의 서비스를 제공하였고, 내부적으로도 PaaS 서비스를 오랫동안 제공해온 만큼, 진보된 형태의 PaaS 플랫폼을 가지고 있다.

그중에서 이번에는 새롭게 앱앤진 Flex environment 라는 새로운 서비스를 소개 하였다.

Flex environment에는 기존의 Java,PHP뿐 아니라, 요즘 스타트업등에서 많이 사용되는 Python, Ruby on rails 그리고 node.js를 지원한다.

그리고 재미있는 점 중의 하나는 일반적인 PaaS가 VM에 대한 로그인 (telnet 이나 SSH)를 지원하지 않는데 반하여, Flex environment 는 SSH 로그인을 제공함으로써, 고급 사용자들에게 많은 기능과 디버깅에 있어서 편의성을 제공한다.

계정 가입

자아 그러면 이제, 구글 앱앤진 Flex environment를 이용하여, node.js 애플리케이션을 배포해보도록 하자.

먼저 GCP 클라우드를 사용하기 위해서는 구글 계정에 가입한다. 기존에 gmail 계정이 있으면 gmail 계정을 사용하면 된다. http://www.google.com/cloud 로 가서, 좌측 상당에 Try it Free 버튼을 눌러서 구글 클라우드에 가입한다.




다음 콘솔에서 상단의 Google Cloud Platform 을 누르면 좌측에 메뉴가 나타나는데, 메뉴 중에서 “결제" 메뉴를 선택한후 결제 계정 추가를 통해서 개인 신용 카드 정보를 등록한다.


개인 신용 카드 정보를 등록해야 모든 서비스를 제한 없이 사용할 수 있다.  단 Trial의 경우 자동으로 한달간 300$의 비용을 사용할 수 있는 크레딧이 자동으로 등록되니, 이 범위를 넘지 않으면 자동으로 결제가 되는 일이 없으니 크게 걱정할 필요는 없다. (사용자가 Plan을 올려야 실제 카드에서 결재가 된다. 그전에는 사용자의 카드에서 결재가 되지 않으니 걱정하지 마시기를)


프로젝트 생성

계정 생성 및 결제 계정 세팅이 끝났으면 프로젝트를 생성한다.

프로젝트는 VM이나 네트워크 자원, SQL등 클라우드 내의 자원을 묶어서 관리하는 하나의 집합이다. 여러 사람이 하나의 클라우드를 사용할때 이렇게 프로젝트를 별도로 만들어서 별도로 과금을 하거나 각 시스템이나 팀별로 프로젝트를 나눠서 정의하면 관리하기가 용이하다.


화면 우측 상단에서 프로젝트 생성 메뉴를  선택하여 프로젝트를 생성한다.



프로젝트 생성 버튼을 누르면 아래와 같이 프로젝트 명을 입력 받는 창이 나온다. 여기에 프로젝트명을 넣으면 된다.


node.js 애플리케이션 준비

클라우드 프로젝트가 준비되었으면, 이제 구글 클라우드에 배포할 node.js 애플리케이션을 준비해보자.

Node.js 애플리케이션은 Express 프레임웍을 이용한 간단한 웹 애플리케이션을 작성해서 배포하도록 한다.

기본적으로 nodejs와 npm이 설치되어 있다고 가정한다. 이 글에서는 node.js 4.4.4 버전과 npm 2.15.1 버전을 기준으로 작성하였다.

Express generator 설치

Express 프로젝트는 Express 프로젝트 고유의 디렉토리 구조와 의존되는 파일들을 가지고 있다. 그래서 프로젝트를 생성하려면 Express generator가 필요하다. Express generator가 Express 프로젝트에 맞는 프로젝트 구조를 생성해주는데, Express generator를 설치하려면 아래 그림과 같이

% npm install express-generator -g

명령을 사용하면 설치가 된다.



프로젝트 생성

Express generator 설치가 끝났으면 이제, express 프로젝트를 생성해보자.

다음 명령어를 이용하여 helloappengine이라는 Express 프로젝트를 생성한다.

% express --session --ejs --css stylus helloappengine




Express 프로젝트 및 Express 프레임웍에 대해서는 http://bcho.tistory.com/887 http://bcho.tistory.com/888 글을 참고하기 바란다.

코드 수정

생성된 코드에서  ~/routes/index.js 파일을 다음과 같이 수정한다.


var express = require('express');

var router = express.Router();


/* GET home page. */

router.get('/', function(req, res, next) {

 res.render('index', { title: 'Hello Appengine' });

 console.log('Hello Appengine');

});


module.exports = router;


의존성 모듈 설치와 node.js 런타임 버전 정의

Express 애플리케이션 개발이 다 끝났다. Express 애플리케이션을 실행하려면, Express 프로젝트에 필요한 모듈들을 설치해야 하는데, 의존 모듈들은 ~/package.json에 이미 자동으로 생성되어 있다.

이 파일을 다음과 같이 수정한다.

“engines”라는 항목에 node.js의 버전을 아래와 같이 명기하는데, 이는 구글 앱앤진이 이 애플리케이션을 수행할때 어느 버전의 node.js를 가지고 수행을 할지를 지정한다.


{

 "name": "helloappengine",

 "version": "0.0.0",

 "private": true,

 "scripts": {

   "start": "node ./bin/www"

 },

 "engines":{

    "node":"4.4.4"

 },

 "dependencies": {

   "body-parser": "~1.15.1",

   "cookie-parser": "~1.4.3",

   "debug": "~2.2.0",

   "ejs": "~2.4.1",

   "express": "~4.13.4",

   "morgan": "~1.7.0",

   "serve-favicon": "~2.3.0",

   "stylus": "0.54.5"

 }

}




이 의존성 모듈 설치는 package.json 파일이 있는 디렉토리에서 아래와 같이

% npm install

명령어만 실행해주면 자동으로 설치된다.


테스트

샘플 애플리케이션 개발 및, 이를 실행하기 위한 환경 설정이 모두 끝났다.

그러면 이제 샘플 애플리케이션을 로컬에서 실행해보자.

실행은 ~/ 디렉토리에서 다음과 같은 명령을 실행하면된다.

% node ./bin/www



실행한 후, 결과를 확인하려면 http://localhost:3000 번으로 접속하면 아래와 같은 결과가 나오는 것을 확인할 수 있다.



구글 앱앤진으로 배포 준비

애플리케이션 개발 및 테스트가 끝났으면 이제 구글 앱앤진으로 이 애플리케이션을 배포해보자.

app.yaml 파일 작성

구글 앱앤진 Flex environment 로 애플리케이션을 배포하기 위해서는 애플리케이션에 대한 기본 정보를 app.yaml에 정의해야 한다. 이 파일은 배포할 node.js 애플리케이션이 있는 ~/ 디렉토리에 위치 시킨다.

runtime은 어떤 언어 (node.js , ruby 등)을 지정하고, VM으로 실행할지를 vm:true 로 정의한다.


runtime: nodejs
vm: true


Google Cloud SDK 설치

다음으로 배포를 하려면, gcloud라는 명령을 사용해야 하는데, 이 명령어는 Google Cloud SDK를 설치해야 사용할 수 있다. Google Cloud SDK는 https://cloud.google.com/sdk/downloads 에서 다운 받아서 설치한다.

Google cloud SDK 설정하기

Google cloud SDK 설치가 끝났으면, 혹시 google cloud SDK가 업데이트 되었을 수 있으니, 다음 명령어를 이용하여 최신 SDK로 업데이트 한다. (알파,베타 버전과 같은 신 기능이 들어가면 업데이트가 된다.)


% gcloud component update


라는 명령어를 수행하면 다음과 같이 업데이트 된 내용이 나오면서 업데이트 를 할 것인지 물어보고 “Y”를 선택하면 자동으로 Google Cloud SDK를 업데이트 한다.



Google Cloud SDK 배포가 끝났으면 gcloud 명령을 이용하기 위해서 초기화 작업을 해야 한다. 초기화 작업은 gcloud 명령을 사용했을때, 내 구글 클라우드 프로젝트 중 어느 프로젝트에 명령을 내릴 것인지, 그리고 어떠한 구글 계정을 사용할것인지 그리고 명령을 내릴때 어떤 리전을 디폴트로 해서 명령을 내릴것인지를 정하는 것이다. 사용자에 따라서 여러개의 프로젝트를 가질 수 도 있고, 또한 관리 목적상 여러개의 클라우드 계정을 가질 수 도 있기 때문이다.

초기화 방법은 다음과 같이 명령어를 실행하면 된다

% gcloud init

이 명령을 사용하면, 앞에서 언급한 바와 같이, gcloud 명령을 내릴 때 사용할 계정과, 디폴트 프로젝트 그리고, 리전을 선택하게 된다.


이제 배포를 위한 모든 준비가 끝났다.

구글 앱앤진으로 배포

이제 배포를 해보자. 배포는 매우매우 쉽다.

앞서 작성한 Express 애플리케이션이 있는 ~/ 디렉토리에 가서 다음 명령어를 수행하면 된다.

% gcloud app deploy

명령을 실행하면 다음과 같이 node.js 애플리케이션이 배포 되는 과정을 볼 수 있다.

배포는 수분이 소요된다. (내부적으로는 Docker 컨테이너를 이용하여 배포가 된다.)

서비스 기동확인

배포가 완료되면 자동으로 서비스가 기동이 된다.

서비스 기동 확인은 http:{프로젝트명}.appspot.com 으로 자동으로 서비스가 뜨게 된다.

해당 URL을 접속해보자. 여기서는 프로젝트명이 “useful-hour-138023”이다.


서비스가 정상적으로 기동 됨을 확인하였다.

모니터링

구글 클라우드 콘솔에 들어가 보면 현재 기동중인 node.js 앱을 확인할 수 있다.

콘솔에서 “App Engine”을 선택하면 다음과 같은 화면이 나온다.



아래 Instances 부분을 보면 Flexible 이라는 이름으로 현재 2개의 인스턴스가 기동되고 있고, 평균 QPS (Query Per Second : 초당 처리량)이 10.898 인것을 확인할 수 있다.


매니지드 서비스이기 때문에, 부하가 늘어나면 자동으로 인스턴스를 추가하고 줄어들면 자동으로 삭제하여 부하에 탄력적으로 대응을 자동으로 해준다.


로그 모니터링은 클라우드 콘솔에서 “Logging”이라는 메뉴를 선택하면 아래와 같은 화면이 나온다.


앱앤진 관련 request log 및 기타 로그들이 다 나오는데, 앞서 샘플코드에서 console.log(“Hello Appengine”) 명령어를 이용하여 Stdout으로 “Hello Appengine”이라는 문자열을 출력하도록 하였기 때문에, 이 문자열이 제대로 출력되었는지를 확인해보자.

App Engine 을 선택하고, 다음 “stdout”을 선택하면 위와 같이 앱앤진의 Stdout으로 출력한 로그들만 볼 수 있는데, 위와 같이 “Hello Appengine” 문자열이 출력된것을 확인할 수 있다.

수정 및 재 배포

그러면 다음으로 애플리케이션을 수정해서 배포해보자. routes/index.js를 다음과 같이 수정해보자


var express = require('express');

var router = express.Router();


/* GET home page. */

router.get('/', function(req, res, next) {

 res.render('index', { title: 'Hello Appengine is updated' });

 console.log('Hello Appengine is updated');

});


module.exports = router;



코드 수정이 끝났으면 배포를 해보자. 재 배포 역시 간단하게

%gcloud app deploy

명령어를 수행하면 간단하게 배포가 된다.

배포가 완료된 후 URL로 들어가서 확인을 해보면


애플리케이션이 업데이트 된것을 확인할 수 있다.

롤백

앱앤진의 장점 중에 하나가 배포도 쉽지만, 롤백도 매우 쉽게 가능하다는 것이다.

클라우드 콘솔에서 App Engine을 선택하고, 아래와 같이 Versions라는 메뉴를 선택하면 아래와 같은 그림을 볼 수 있다.




현재 두개의 버전이 배포되어 있고, 위의 최신 버전에 Traffic Allocation이 100%로 되어 있는 것을 확인할 수 있다. 이는 새로 배포된 버전으로만 트래픽이 100% 가고 있다는 의미인데, 롤백은 트래픽을 예전 버전으로 라우팅 해주고, 새버전의 서비스를 정지 시키면 된다.


아래 그림과 같이 예전 버전을 선택한 후에, 상단 메뉴에서 “Migrate Traffic”을 선택한다.



그러면 아래와 같이 트래픽이 이전 버전으로 100% 가고 있음을 확인할 수 있다.

그리고 나서 새 버전을 선택한 후 상단의 STOP 버튼을 눌러주면 아래 그림과 같이 새버전의 상태가 Serving 에서 Stopped로 변경된것을 확인할 수 있다.




예전 버전으로 롤백이 잘 되었는지를 확인해보자.

아래 그림처럼 예전 버전으로 롤백이 되었음을 확인할 수 있다.



버전별로 트래픽 분산하기

앱앤진의 장점 중의 하나가 여러 버전을 유지할 수 있고, 버전간에 트래픽을 자유롭게 조절할 수 있다. 예를 들어서 v1으로 90%, v2로 10% 의 트래픽을 보내는 것등이 가능하다.


이렇게 트래픽을 조절하는 것은 크게 2가지 방법으로 활용이 가능한데, 서버단의 A/B 테스팅과 카날리 테스팅이다.

A/B 테스팅은 사용자를 두개 이상의 집단으로 나눈후, 기능 A,B 에 대한 반응을 본 후, 반응이 좋은 기능을 선택하는 방식으로, 모바일 클라이언트단에서 많이 사용되고, 서버단에는 구현이 쉽지 않았는데, 트래픽을 버전별로 나눠서 주는 기능을 사용하면 손쉽게 A/B 테스팅을 수행할 수 있다.


다음으로 카날리 테스트는 옛날에 광부들이 광산에 들어갈때 카나리아 새를 데리고 들어가서 유독가스가 나오면 카나리아 새가 먼저 죽는 것을 보고 위험을 감지하는 방법에서 유래된것으로 서버 배포에서는 전체 사용자를 대상으로 새 버전을 배포하는 것이 아니라 1~2%의 사용자에게 배포해보고 문제가 없으면 전체 사용자에게 배포하는 개념으로, 마찬가지로 v2에 1~2%의 트래픽만 할당하고 나머지 98~99%는 v1에 할당하는 방식으로, 해서 v2에 대한 안정성 검증을 한 후에, v2를 전체 배포 하는데 활용할 수 있다.


버전간 트래픽 비율을 지정하는 방법은 클라우드 콘솔의 앱앤진 메뉴에서 Version을 선택하고 상단 메뉴에서 → 가 두개 겹쳐진 아이콘 (맨 우측)

를 선택하면 아래와 같이 Split traffic 이라는 메뉴가 나온다.

여기서 1개 이상의 Version을 추가한 후에, 각 버전으로 보낼 트래픽의 비중 (%)을 정의한다.






위의 예제에서는 xxx4403 버전으로 30%, xxxx3136 버전으로 70%의 트래픽을 보내도록 하였다.

설정을 완료한 후에 SAVE를 누르고, 다시 Version 부분을 보면, 수분 있다가 아래 그림과 같이 xxxx3136 버전에 70% 트래픽이 xxx4403 버전으로 가게 된다.


부가 기능에 대해서

구글 클라우드에서 앱앤진만 쓸 수 있는 몇가지 좋은 서비스 들이 있는데, 다음과 같다.

매니지드 memcached

앱앤진에서는 구글 매니지드 memcached 서비스를 사용할 수 있다 별도의 설정 없이 바로 사용이 가능하며, 최대 100GB 까지 사용이 가능하다. (100GB 이상도 요청하면 사용이 가능) Shared memcached의 경우에는 별도의 비용없이 일반적인 캐쉬로 사용이 가능하지만, 용량이나 성능등이 보장이 되지 않으니 주의가 필요하다.

https://cloud.google.com/appengine/docs/python/memcache/

보안 스캐닝 (Security Scanning)

매우 유용한 기능중의 하나가 보안 스캐닝 기능이다 앱앤진 메뉴에서 Security Scanning이라는 메뉴를 선택하면 앱앤진으로 개발한 웹 사이트에 대한 보안 스캐닝을 할 수 있는데, 한번만 스캐닝할 수 도 있고 자동 스케쥴 방식으로, 주기적으로 스캐닝 하는 것도 가능하다.


보안 정책이 수시로 업데이트 되기 때문에, 정기적으로 보안 스캔을 하는 것을 권장한다.

텍스트 검색 (Search)

node.js에는 지원하지 않지만, Python,Java,PHP,Go (앱앤진 Standard environment)에서만 지원되는 기능중 하나는 구글 검색 엔진과 같은 검색 엔진을 지원한다. 텍스트 검색 HTML 그리고 GEO_POINT (위/경도 기반 검색)이 가능하다.

https://cloud.google.com/appengine/docs/java/search/?hl=en_US

결론

지금까지 간략하게나마 구글 클라우드에 대한 소개, 앱앤진에 대한 개념 및, 간단한 node.js express 애플리케이션을 작성해서 배포해봤다. 다음글은 앱앤진에 대한 조금 더 구체적인 환경에 대해서 알아보도록 하겠다.


저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

데이타 스트리밍 분석 플랫폼 Dataflow 개념 잡기 #2/2

(트리거, 이벤트 타임, 워터마크 개념)


조대협 (http://bcho.tistory.com)


앞글 http://bcho.tistory.com/1122 에 의해서 Dataflow에 대한 개념에 대해서 계속 알아보자

트리거

윈도우와 더블어서 Dataflow 프로그래밍 개념중에서 유용한 개념중의 하나가 트리거이다. 트리거는 처리중인 데이타를 언제 다음 단계로 넘길지를 결정하는 개념이다. 특히 윈도우의 개념과 같이 생각하면 좋은데, 윈도우는 일반적으로 윈도우가 종료되는 시간에 그 데이타를 다음 Transform으로 넘기게 된다.


그런데 이런 의문이 생길 수 있다. “윈도우의 크기가 클때 (예를 들어 한시간), 한시간을 기다려야 데이타를 볼 수 있는 것인가? 그렇다면 한 시간 후에 결과를 본다면 이것을 실시간 분석이라고 할 수 있는가?”

그래서 여기서 트리거의 개념이 나온다.

예를 들어 한시간 윈도우가 있더라도, 윈도우가 끝나지 않더라도 현재 계산 값을 다음 Transform으로 넘겨서결과를 볼 수 있는 개념이다. 1분 단위로 트리거를 걸면 1분 결과를 저장하고, 2분째도 결과를 저장하고, 3분째도…. 60분째에도 매번 결과를 업데이트 함으로써, 윈도우가 종료되기 전에도 실시간으로 결과를 업데이트 할 수 있게 된다.


트리거의 종류

그렇다면 이러한 트리거는 앞에서 언급한 시간 단위의 트리거만 있을까? Dataflow는 상당히 여러 종류의 트리거를 지원한다.


  • Time trigger (시간 기반 트리거) : 시간 기반 트리거는 일정 시간 주기로 트리거링을 해주는 트리거 이다. 1분 단위, 1초 단위 같이 일정 주기를 지정하거나, “윈도우 시작후 2분후 한번과 윈도우 종료후 한번"과 같이 절대적인 시간을 기준으로도 정의가 가능하다.

  • Element Count (데이타 개수 기반 트리거) : 다음은 개수 기반인데, 예를 들어 “어떤 데이타가 100번 이상 들어오면 한번 트리거링을 해라” 또는 “매번 데이타가 100개씩 들어올때 마다 트리거링을 해라" 라는 형태로 정의가 가능하다.

  • Punctuations  (이벤트 기반 트리거) : Punctuations는 엄밀하게 번역하면 “구두점" 이라는 의미인데, 구두점 처럼 특정 데이타가 들어오는 순간에, 트리거링을 하는 방법이다.

트리거 조합

이러한 트리거는 하나의 트리거 뿐 아니라, 여러개의 트리거를 동시에 조합하여 사용이 가능하다.

  • AND : AND 조건으로 두개의 트리거의 조건이 만족해야 트리거링이 된다. 예를 들어, Time Trigger가 1분이고, Element Count 트리거가 100개이면, 윈도우가 시작된 1분 후에, Element Count가 100개가 되면 트리거링이 된다.

  • OR : OR 조건으로 두개의 트리거의 조건 중 하나만 만족하면 트리거링이 된다.

  • Repeat : Repeat는 트리거를 반복적으로 수행한다. Element Count 트리거 10개를 반복으로 수행하면, 매 10개 마다 트리거링이 된다. Time 트리거를 1분 단위로 반복하면, 매 1분 마다 트리거링이 된다.

  • Sequence : Sequence 트리거는 등록된 트리거를 순차적으로 실행한다. Time 트리거 1분을 걸고 Element count 트리거 100개를 걸면, 윈도우 시작후 1분 후 트리거링인 된후, 그 후 부터 Element 가 100개 들어오면 두번째 트리거링이 발생하고 트리거링이 종료 된다.


트리거 결과의 누적

그러면 트리거링이 될때 마다 전달 되는 데이타는 어떻게 될까라는 질문이 나올 수 있는데. 무슨 이야기인가 하면 윈도우 내에서 트리거가 발생할때, 이전 데이타에 대한 처리를 어떻게 할것인가이다.


데이타가 A,B,C,D,E,F 가 들어왔다고 가정하자. 트리거가 C 다음 발생했다고 했을때, 윈도우가 끝난 F에는 어떤 값이 리턴이 될까?

첫번째 트리거링에는 당연히 A,B,C 가 전달된다.

윈도우가 끝나면 A,B,C,D,E,F 가 전달되는 것이 맞을까 아니면 트리거링 된 이후의 값인 D,E,F 만 전달되는 것이 맞을까?

맞는 건 없고, 옵션으로 지정이 가능하다.

  • Accumulating
    Accumulating은 트리거링을 할때 마다 윈도우 내에서 그때까지의 값을 모두 리턴한다.

  • Discarding
    트리거링 한 후에, 이전 값은 더이상 리턴하지 않고, 그 이후 부터 다음 트리거링 할때까지의 값만을 리턴한다.

예를 들어서 보자


다음과 같은 윈도우가 있고, 3번, 23번, 10번에서 트리거링이 된다고 했을때,

Accumulating mode의 경우

  • 첫번째 트리거 후 : [5,8,3]

  • 두번째 트리거 후 : [5,8,3,15,19,23]

  • 세번째 트리거 후 : [5,8,3,15,19,239,13,10]

와 같이 값이 반환되고

Discarding mode의 경우

  • 첫번째 트리거 후 [5,8,3]

  • 두번째 트리거 후 [15,19,23]

  • 세번째 트리거 후 [9,13,10]

이 반환된다.

데이타 지연에 대한 처리 방법

실시간 데이타 분석은 특성상 데이타의 전달 시간이 중요한데, 데이타는 모바일 클라이언트 등에서 인터넷을 통해서 데이타가 서버로 전송되는 경우가 많기 때문에, 데이타의 실제 도달 시간이 들쭉날쭉 하다. 이러다 보니 데이타의 도착 순서나 지연등이 발생하는데, 이에 대한 처리가 필요하다. 먼저 데이타 도달 시간의 개념을 이해하려면, 이벤트 타임과 프로세싱 타임의 개념을 먼저 이해해야 한다.

이벤트 타임과 프로세싱 타임

모바일 단말에서 다음과 같이 A,B,C,D의 데이타를 1시1초, 1시2초,3초,5초에 보냈다고 하자.


서버에 도착해서 Dataflow에 도착하는 시간은 물리적으로 서버와 단말간의 거리 차이가 있기 때문에 도착 시간은 단말에서 데이타가 발생한 시간보다 느리게 되며, 또한 각 단말의 위치나 단말이 연결되어 있는 네트워크 상황이 다르기 때문에 순차적으로 도착하는 것이 아니라, 늦게 보낸 데이타가 더 빨리 도착할 수 도 있다.

아래 그림을 보면 A데이타는 1시1초에 단말에서 생성되었지만 서버에 도착한 시간은 1시2초가 된다. C,D의 경우, 순서가 바뀌어서 도착하였다.



이렇게 실제로 데이타가 발생한 시간을 이벤트 타임, 그리고 서버에 데이타가 도착한 시간을 프로세싱 타임이라고 정의한다.


이 프로세싱 타임은 네트워크 상황이나 데이타에 크기에 따라 가변적으로 변하기 때문에, 이벤트 타임과 프로세싱 타임의 상관 관계를 그래프로 표현해보면 다음과 같아진다.


가장 이상적인 결과는 이벤트 타임과 프로세싱 타임이 동일한 것이겠지만 불가능하고, 위의 그림처럼 이벤트 타임보다 프로세싱 타임이 항상 늦게 되고, 이벤트 타임과 프로세싱 타임의 차이는 매 순간 다르게 된다.

워터 마크 (Water Mark)

이렇게 위의 그림과 같이 실제 데이타가 시스템에 도착하는 시간을 예측 하게 되는데, 이를 워터 마크라고 한다. 위의 그림에서 “실제 처리 그래프"로 표시되는 부분을 워터마크라고 생각하면 된다. 이 예측된 시간을 기반으로 윈도우의 시스템상의 시작 시간과 종료 시간을 예측 하게 된다.

지연 데이타 처리 방법

윈도우 처리 관련해서, 실제 발생한 시간과 도착 시간이 달라서, 처리 시간내에 못 들어오는 경우가 발생할 수 있다. 아래 그림을 보면, 실제 윈도우는 1시1초~1시6초까지의 데이타를 처리하기를 바라고 정의했을 수 있는데, 시스템에서는 이 윈도우의 값이 프로세싱 타임 기준으로 (워터 마크를 기준으로 연산함) 1시2초~1시6초에 도착하기를 기대하고 있는데, 데이타 C의 경우에는 기대했던 프로세싱 타임에 도착하지 않았기 때문에 이 데이타는 연산에서 누락될 수 있다.



비단 늦게 도착한 데이타 뿐만 아니라, 시스템이 예측한 프로세싱 타임 보다 일찍 데이타가 도착할 수 있는데, 이런 조기 도착한 데이타와 지연 도착한 데이타에 대한 처리는 어떻게 해야 할까?

Dataflow에서는 이런 조기 도착이나 지연 데이타에 대한 처리 메카니즘을 제공한다.

윈도우를 생성할때, withAllowedLateness라는 메서드를 사용하면, 늦게 도착하는 데이타에 대한 처리 기간을 정의할 수 있다.


PCollection<String> items = ...;

 PCollection<String> fixed_windowed_items = items.apply(

   Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES))

         .withAllowedLateness(Duration.standardDays(2)));

https://cloud.google.com/dataflow/model/windowing#managing-time-skew-and-late-data


위의 예제는 1분 단위의 Fixed Window를 정의하고, 최대 2일까지 지연 도착한 데이타 까지 처리할 수 있도록 정의한 예제이다.


지금까지 간단하게 dataflow를 이용한 스트리밍 데이타 처리의 개념에 대해서 알아보았다. 다음 글에서는 이러한

저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

데이타 스트리밍 분석 플랫폼 Dataflow 개념 잡기 #1/2


조대협 (http://bcho.tistory.com)


실시간 데이타 처리에서는 들어오는 데이타를 바로 읽어서 처리 하는 스트리밍 프레임웍이 대세인데, 대표적인 프레임웍으로는 Aapche Spark등을 들 수 있다. 구글의 DataFlow는 구글 내부의 스트리밍 프레임웍을 Apache Beam이라는 형태의 오픈소스로 공개하고 이를 실행하기 위한 런타임을 구글 클라우드의 DataFlow라는 이름으로 제공하고 있는 서비스이다.


스트리밍 프레임웍 중에서 Apache Spark 보다 한 단계 앞선 개념을 가지고 있는 다음 세대의 스트리밍 프레임웍으로 생각할 수 있다. Apache Flink 역시 유사한 개념을 가지면서 Apache Spark의 다음 세대로 소개 되는데, 이번글에서는 이 DataFlow에 대한 전체적인 개념과 프로그래밍 모델등에 대해서 설명하고자 한다. 스트리밍 데이타 처리에 대한 개념은 http://bcho.tistory.com/1119 글을 참고하기 바란다.

개념 소개

dataflow에 대해서 이해하기 위해서 프로그래밍 모델을 먼저 이해해야 하는데, dataflow의 프로그래밍 모델은 얼마전에 Apache에 Beam이라는 오픈 소스 프로젝트로 기증 되었다. Apache Spark이나, Apache Flink와 유사한 스트리밍 처리 프레임웍이라고 생각하면 된다. dataflow는 이 Apache beam의 프로그래밍 모델을 실행할 수 있는 런타임 엔진이라고 생각하면 된다. 예를 들어 Apache beam으로 짠 코드를 Servlet이나 Spring 코드라고 생각하면, dataflow는 이를 실행하기 위한 Tomcat,Jetty,JBoss와 같은 런타임의 개념이다.


먼저 dataflow의 개념을 이해해보도록 하자. 아래 그림은 dataflow에 대한 컨셉이다.


데이타가 들어오면, Pipeline IO에서 데이타를 읽어드린다. 읽어드린 데이타는 PCollection이라는 데이타 형으로 생성이 되고, 이 PCollection 데이타는 여러개의 중첩된 PTransform을 통해서 변환 및 가공이 된다. 가공이 끝난 결과는 마지막으로 Pipeline IO의 Output을 통해서 데이타 저장소 (빅쿼리나 파일등)에 저장이 된다.  이 Pipeline IO에서 부터 PTransform을 걸친 일련의 프로세싱 과정을 Pipeline이라고 한다.


예를 들어 설명해보자, 문자열을 입력 받은 후에, 문자열에서 단어를 추출하여, 각 단어의 개수를 세어 주는 파이프라인이 있다고 하자.


첫번째 실행에서 “Hello my daddy”라는 문자열이 입력되었다. 첫번째 Transform인 Extract words Transform을 거치면서, “Hello my daddy” 라는 문자열은 “Hello”, “my”, “daddy” 라는 각각의 단어로 쪼게진다. 다음으로 Count Element 라는 Transform에 의해서, 각 단어의 수를 세어서 저장한다. “Hello”는 1번, “my”는 1번, “daddy”는 1번 의 값이 저장된다.


두번째 실행에서 “Hello my bro” 라는 문자열이 들어오면, Extract words 에 의해서 “Hello”, “my”, “bro”라는 각각의 단어로 쪼게지고, Count Element Transform에서 이전에 세어놓은 단어의 수와 합산하여 계산이 된 결과가 저장이 된다. “Hello”는 이전에 한번 카운트가 되었고 이번에도 들어왔기 때문에, 2가 되고, 같은 원리로 “my”라는 단어의 카운트도 2가된다. “bro” 라는 단어는 이번에 처음 들어왔기 때문에 새 값으로 1로 저장된다.




세번째 “Hello my mom” 이라는 문자열이 들어오면 앞의 두개의 문자열과 마찬가지로 간 단어로 쪼게진 다음 Count Element에 의해서 각 단어의 수가 카운트되어 기존의 값과 누적 합산된다. 모든 데이타를 다 읽어서 처리가 끝나면, 저장된 결과를 Pipeline IO를 통해서 파일에 그 결과를 쓰게 된다.

배치와 스트리밍 처리

dataflow는 위에서 설명한 파이프라인의 개념을 배치와 스트리밍 처리 두가지 개념 모두로 지원해서 처리가 가능하다. 데이타가 파일과 같이 이미 쓰여지고 더 이상 증가나 수정이 되지 않은 데이타에 대해서는 일괄로 데이타를 읽어서 결과를 내는 배치 처리가 가능하고, 계속해서 들어오고 있는 데이타 (트위터 피드, 로그 데이타)는 스트리밍으로 처리가 가능하다.

윈도우의 개념

배치 처리야, 데이타 처리가 모두 끝난 후에 결과를 내보낸다고 하지만, 그렇다면 스트리밍 데이타는 계속해서 데이타가 들어오고 있는데, 언제 결과를 내보내야 할까?

개별 데이타를 변환해서 저장하는 경우에야, 개별 데이타 처리가 끝난후에 각각 하나씩 저장한다고 하지만, 위와 같이 들어오는 데이타에서 특정데이타 들에 대한 합이나 평균과 같은 처리를 하는 경우 어느 기간 단위로 해야 할까? 스트리밍 처리에서는 이러한 개념을 다루기 위해서 윈도우라는 개념을 사용한다.


예를 들어, “1시~1시10분까지 들어온 문자열에 대해서 문자열에 들어 있는 각 단어의 수를 카운트해서 출력해주는 기능" 이나, 또는 “매 5분 단위로 현재 시간에서 10분전까지 들어온 문자열에 대해서 각 단어의 수를 카운트 해서 출력 해주는 기능" 과 같이 작은 시간 기간의 단위를 가지고 그 기간 단위로 계산 하는 방법이며, 이 시간 단위를 윈도우(Window)라고 한다.


Fixed Window (고정 크기 윈도우)

앞의 예에서 1시~1시10분, 1시10분~1시20분 과 같이 고정된 크기를 가지는 윈도우의 개념을 Fixed Window라고 한다.


Sliding Window (슬라이딩 윈도우)

앞의 예에서와 같이 윈도우가 상대적인 시간 (이전 10분까지)의 개념을 가지면서, 다른 윈도우와 중첩되는 윈도우를 슬라이딩 윈도우라고 한다.


그림과 같이 1시10분의 윈도우는 1시 10분의 10분전인 1시에서 부터, 현재 시간 까지인 1시10분까지 값을 읽어서 처리하고 윈도우가 끝나는 시점인 1:10분에 그 값을 저장한다. 윈도우의 간격은 5분 단위로, 1시 15분에는 1시 15분의 10분전인 1시05분 부터 현재 시간인 1시15분까지 들어온 데이타에 대해서 처리를 하고 그 결과 값을 1시15분에 저장한다.

Session window (세션 윈도우)

다음은 세션 윈도우라는 개념을 가지고 있는데, 이를 이해하기 위해서는 먼저 세션의 개념을 먼저 이해해야 한다.

세션이랑 사용자가 한번 시스템을 사용한 후, 사용이 끝날때 까지의 기간을 정의한다. 스트리밍 시스템에서는 사용자 로그인이나 로그 아웃을 별도의 이벤트로 잡는 것이 아니기 때문에, 데이타가 들어온 후에, 일정 시간 이후에 그 사용자에 대한 데이타가 들어오지 않으면, 세션이 종료 된것으로 판단한다.

일반 적인 웹 프로그램에서 HttpSession과 같은 원리인데, 웹 사이트에 접속한 후, Session time out 시간이 지날때 까지 사용자가 별도의 request를 보내지 않으면 세션을 끊는 것과 같은 원리이다.

아래 그림은 세션 윈도우의 개념을 설명하기 위한 윈도우인데, User A와 User B의 데이타가 들어오고 있다고 하자.


그리고 세션 타임 아웃이 10분으로 정의했다. 즉 같은 사용자에 대해서 데이타가 들어온 후, 10분 내에 추가 데이타가 들어오지 않으면 세션이 종료 된것으로 판단한다.


User A는 1:00 에 첫 데이타가 들어와서1:00~1:10 사이에 두번째 데이타가 들어왔고, 1:10~1:20 사이에 세번째 데이타가 들어온 후, 네번째 데이타는 10분이 지난 후에 들어왔다. 그래서 1:00~1:20 까지가 하나의 세션이 되고, 이것이 User A에 대한 1:00~1:20의 세션 윈도우가 된다. 네번째 데이타 부터는 새로운 윈도우로 처리가 되는데, 1:40~1:50 사이에 다섯번째 데이타가 도착한후, 그 이후로 도착하지 않았기 때문에 이게 두번째 윈도우가 되고, 1:30~1:50의 시간 간격을 가지는 User A의 두번째 윈도우가 된다.

각 윈도우의 값은 User A의 1:00~1:20 윈도우의 값은 (1+1+1)로 3이 되고, 두번째 윈도우인 1:30~1:50 윈도우는 (2.5+1)로 3.5가 된다.


User B는 1:10에 데이타가 들어오고, 10분 후인 1:20까지 데이타가 들어오지 않고 그 이후 1:30 분에 두번째 데이타가 들어왔기 때문에, 1:10~1:10 길이의 첫번째 세션 윈도우가 생성된다. 다음 으로 1:30분에 데이타가 들어왔기 때문에 두번째 세션 윈도우를 생성하고, 2:00까지 계속 데이타가 들어오다가 멈추고 2:10까지 새로운 데이타가 들어오지 않았기 때문에 1:30~2:00 까지 두번째 윈도우로 취급한다.


이 Session Window는 앞서 언급한 Fixed Window나, Sliding Window와는 다르게, User A, User B와 사용자 단위와 같이 어떤 키에 따라서 개별적으로 윈도우를 처리 한다.  즉 Session Window는 User A나 USer B처럼 특정 키에 종속된 윈도우만을 갖는다.


반대로 Fixed Window나 Sliding Window는 키단위의 윈도우가 아니라 그 시간 범위내에 들어 있는 모든 키에 대한 값을 처리한다..

Fixed Window의 경우에는 30분 사이즈를 갖는 윈도우라고 하면 아래 그림과 같이


1:00~1:30 윈도우는 User A의 값 = (1+1+1) 과 User B의 값 1을 합쳐서 총 4가 되고

1:30~2:00 윈도우는 User A값 = (2.5+1)과 User B의 값 = (2+2+2) 를 합쳐서 9.5가 된다.


Sliding Window의 경우에는 길이가 30분이고, 주기가 20분인 Sliding 윈도우라고 할때,


1:00~1:30, 1:20~1:50, 1:40~2:00 3개의 Sliding 윈도우가 생성된다.

1:00~1:30 윈도우는 User A의 값=(1+1+1)과 User B의 값 1을 합산하여 4가 되고

1:20~1:50 윈도우는 User A의 값 = (2.5+1)과 User B의 값 =(2+2)를 합산하여 7.5가 된다.

1:40~2:00 윈도우는 User A의 값 = (2.5+1)과 User B의 값 (2+2)를 합산하여 7.5가 된다.



저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License


구글 데이타 스트리밍 데이타 분석 플랫폼 dataflow - #1 소개


조대협 (http://bcho.tistory.com)


실시간 데이타 처리에서는 들어오는 데이타를 바로 읽어서 처리 하는 스트리밍 프레임웍이 대세인데, 대표적인 프레임웍으로는 Aapche Spark등을 들 수 있다. 구글의 DataFlow는 구글 내부의 스트리밍 프레임웍을 Apache Beam이라는 형태의 오픈소스로 공개하고 이를 실행하기 위한 런타임을 구글 클라우드의 DataFlow라는 이름으로 제공하고 있는 서비스이다.


스트리밍 프레임웍 중에서 Apache Spark 보다 한 단계 앞선 개념을 가지고 있는 다음 세대의 스트리밍 프레임웍으로 생각할 수 있다. Apache Flink 역시 유사한 개념을 가지면서 Apache Spark의 다음 세대로 소개 되는데, 이번글에서는 이 DataFlow에 대한 전체적인 개념과 프로그래밍 모델등에 대해서 설명하고자 한다.  스트리밍 데이타 처리에 대한 개념은 http://bcho.tistory.com/1119 글을 참고하기 바란다.

소개

dataflow에 대해서 이해하기 위해서 프로그래밍 모델을 먼저 이해해야 하는데, dataflow의 프로그래밍 모델은 얼마전에 Apache에 Beam이라는 오픈 소스 프로젝트로 기증 되었다. Apache Spark이나, Apache Flink와 유사한 스트리밍 처리 프레임웍이라고 생각하면 된다. dataflow는 이 Apache beam의 프로그래밍 모델을 실행할 수 있는 런타임 엔진이라고 생각하면 된다. 예를 들어 Apache beam으로 짠 코드를 Servlet이나 Spring 코드라고 생각하면, dataflow는 이를 실행하기 위한 Tomcat,Jetty,JBoss와 같은 런타임의 개념이다.


런타임

Apache Beam으로 작성된 코드는 여러개의 런타임에서 동작할 수 있다. 구글 클라우드의 Dataflow 서비스에서 돌릴 수 도 있고, Apache Flink나 Apache Spark 클러스터 위에서도 그 코드를 실행할 수 있으며, 로컬에서는 Direct Pipeline이라는 Runner를 이용해서 실행이 가능하다.


여러 런타임이 있지만 구글 클라우드의 Dataflow 런타임을 사용하면 다음과 같은 장점이 있다.


매니지드 서비스로 설정과 운영이 필요 없다.

스트리밍 처리는 하나의 노드에서 수행되는 것이 아니라, 여러개의 노드에서 동시에 수행이 되기 때문에, 이 환경을 설치하고 유지 보수 하는 것만 해도 많은 노력이 들지만, Dataflow는 클라우드 서비스이기 때문에 별도의 설치나 운영이 필요없고, 작성한 코드를 올려서 실행 하기만 하면 된다.

Apache Spark등을 운영해본 사람들은 알겠지만, Spark 코드를 만드는 것 이외에도, Spark 클러스터를 설치하고 운영 하는 것 자체가 일이기 때문에, 개발에 집중할 시간이 줄어든다.

오토 스케일링을 지원하기 때문에, 필요한 만큼 컴퓨팅 자원을 끌어다가 빠르게 연산을 끝낼 수 있다.

클라우드 컴퓨팅의 장점은 무한한 자원을 이용하여, 워크로드에 따라서 자원을 탄력적으로 배치가 가능한 것인데, Dataflow 역시, 이러한 클라우드의 장점을 이용하여, 들어오는 데이타량이나 처리 부하에 따라서 자동을 오토 스케일링이 가능하다.


그림처럼 오전에 800 QPS (Query per second)의 처리를 하다가 12시경에 부하가 5000 QPS로 늘어나면 그만한 양의 리소스 (컴퓨팅)를 더 투여해서 늘어나는 부하에 따라서 탄력적으로 대응이 가능하다.

리밸런싱(Rebalancing)기능을 이용하여 작업을 골고루 분배가 가능하다.

Spark이나 Hadoop Map & Reduce와 같은 대용량 분산 처리 시스템의 경우 문제가 특정 노드의 연산이 늦게 끝나서 전체 연산이 늦게 끝나는 경우가 많다. 예를 들어 1000개의 데이타를 10개씩 100개의 노드에서 분산하여 처리를 한후 그 결과를 모두 모아서 합치는 연산이 있다고 할때, 1~2개의 노드가 연산이 늦게 끝나더라도 그 결과가 있어야 전체 값을 합칠 수 있기 때문에, 다른 노드의 연산이 끝나도 다른 노드들은 기다려야 하고 전체 연산 시간이 느려 진다.


Dataflow의 경우는 이런 문제를 해결 하기 위해서, 리밸런싱(rebalancing)이라는 메카니즘을 발생하는데, 위의 그림(좌측의 그래프는 각 노드의 연산 시간이다.) 과 같이 특정 노드의 연산이 느려진 경우, 느려진 노드의 데이타를 다른 연산이 끝난 노드로 나눠서 재 배치하여 아래와 같이 전체 연산 시간을 줄일 수 있다.




저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

구글 클라우드의 대용량 분산 큐 서비스인 Pub/Sub 소개 #2


node.js를 통하여 메세지를 보내고 받기

조대협 (http://bcho.tistory.com)


node.js에서 메세지 보내고 받기


이번 글에서는 node.js를 이용하여 실제로 pub/sub에 메세지를 보내고 받도록 해보자


키 파일 준비 하기

Pub/Sub에 접속하기 위해서는 보안 인증을 위해서 키 파일이 필요하다.

키 파일은 구글 클라우드 콘솔에서, API manager 메뉴로 들어가서 Credential 부분에서 Create Credential을 선택하면 아래와 같은 화면이 나온다.

다음으로, 메뉴에서 Service account key를 선택하여 키를 생성한다.


키가 생성이 되면 json 파일로 다운로드가 된다.

여기서는 편의상 키 파일명을 “pubsub-key.json”이라고 하겠다.


메세지 보내기

node.js를 이용해서 메세지를 보내보자. 먼저 코드를 보자


var gcloud = require('gcloud');

var pubsub = gcloud.pubsub({

projectId:'terrycho-sandbox',

keyFilename: '/Users/terrycho/keys/pubsub-key.json'

});


var topic = pubsub.topic('projects/terrycho-sandbox/topics/repository-changes.default');


for(var i=0;i<3;i++){

topic.publish({

data:{

userId:process.argv[2]+i,

name:'terry.cho'

}

},function(err){

if(err != null) console.log("Error :"+err);

});

};


pub/sub을 사용하려면 구글 클라우드 라이브러리인 gcloud 모듈이 필요하다.

명령어 창에서 npm install gcloud를 이용해서, gcloud모듈을 먼저 인스톨 해놓자.

다음으로, gcloud라이브러리에서 pubsub 객체를 만든다. 여기서는 projectId와, keyFilename을 지정한다.

projectId는 사용하고자 하는 본인의 구글 프로젝트 ID를 넣으면 되고 (여기서는 ‘terrycho-sandbox’), 키 파일은 앞에서 준비한 키 JSON 파일의 경로를 설정하면 된다.


pubsub 객체가 생성되었으면, 메세지를 보낼 topic을 가져와야 한다. topic은 다음과 같은 pubsub.topic메서드를 이용해서 불러올 수 있다. 이때, topic의 경로를 아래와 같이 적어준다.

var topic = pubsub.topic('projects/terrycho-sandbox/topics/repository-changes.default');

여기서 사용한 topic은 projects/terrycho-sandbox/topics/repository-changes.default 이다.

topic을 받아왔으면, 이 topic에 실제로 메세지를 publish하면 되는데, topic.publish( {메세지},{error callback 함수}); 형태로 지정하면 된다.


pub/sub은 앞의 글에서 설명한바와 같이, message와, message attribute 두가지로 분리가 되는데,

message는

data :{

// 여기에 메세지 정의

}


형태로 정의해서 전달하고,message attribute는

attributes:{

  key1:’value1’,

  key2:’value2’
}


형태로 전달한다.

실제 사용예를 보면 다음과 같다.

var registerMessage = {
 data: {
   userId: 3,
   name: 'Stephen',
   event: 'new user'
 },
 attributes: {
   key: 'value',
   hello: 'world'
 }
};


위의 보내기 예제에서는 userId와, name 필드 두개만, 메세지로 3번 보내도록 하였다.

data:{

userId:process.argv[2]+i,

name:'terry.cho'

}

메세지 받기

메세지를 전달하였으면 이제 메세지를 읽어보도록 한다. 전체 코드는 다음과 같다.


var gcloud = require('gcloud');

var pubsub = gcloud.pubsub({

projectId:'terrycho-sandbox',

keyFilename: '/Users/terrycho/keys/pubsub-key.json'

});


var topic = pubsub.topic('projects/terrycho-sandbox/topics/repository-changes.default');

var options = {

 reuseExisting: true, // if the subscription is already exist reuse subscription, option is not changed

 interval:10,

 maxInProgress:5,

 autoAck:false

};


topic.subscribe('nodejs-subscription',options,function(err,subscription,apiResponse){

if(err != null){

console.log('subscription creation failed :'+err);

exit(1);

}

console.log('Subscription :'+subscription);

subscription.on('error',function(err){

console.log('error:'+err);

});


subscription.on('message',function(message){

// read message from queue

console.log(message);

// send ack

subscription.ack(message.ackId,function(err,apiResponse){

console.log('info:sent ack');

});

});

});

topic을 생성하는 것까지는 앞의 메세지 보내기 부분의 코드와 같다.

topic으로 부터 메세지를 받기 위해서, subscription에 대한 옵션을 설정한다.


var options = {

 reuseExisting: true,

 interval:10,

 maxInProgress:5,

 autoAck:false

};

reuseExisting은 별도로 subscription을 생성하지 않고, 기존의 subscription을 사용한다. 이 경우에는, 기존 subscription의 옵션이 그대로 적용되며, 새로운 option이 적용되지 않는다.

interval은 10초 단위로 subscription을 polling 하는 것이고, maxInProgress는 한번에 읽어올 수 있는 메세지 수를 정의한다.

autoAck는 메세지를 보낸 후에, 자동으로 ack를 보내는 옵션인데, 여기서는 false로 하였기 때문에, 수동으로 ack를 보내야 한다.


옵션을 정의하였으면 아래와 같이 topic에 대한 subscription에 대해서, 메세지를 subscription 한다.

topic.subscribe('nodejs-subscription',options,function(err,subscription,apiResponse){

‘nodejs-subscription’은 subscription 이름이고, options는 subscription에 대한 옵션 그리고 마지막은 콜백함수 있다.

메세지를 받았을때 처리 방법을 정의해야 하는데, 앞의 콜백 함수에서 전달되는 subscription객체에 “message”라는 이벤트에 대해서 핸들러를 작성하면 메세지를 받을 수 있다.


아래는 핸들러 코드이다.

subscription.on('message',function(message){

// read message from queue

console.log(message);

// send ack

subscription.ack(message.ackId,function(err,apiResponse){

console.log('info:sent ack');

});

});


메세지가 들어오면 console에 메세지를 출력하고, subscription.ack를 이용하여 ack를 보낸다. 이때, 메세지에서 들어오는 message.ackId를 인자로 하여, 그 메세지에 대해서 ack를 보낸다.


다음은 명령어를 실행하여 메세지를 보내는 실행화면이다.


그리고 다음은 메세지를 받는 프로그램을 수행하여 실제로 메세지를 받은 결과 화면이다.