클라우드 컴퓨팅 & NoSQL/google cloud

실시간 데이타 분석 플랫폼 Dataflow - #4 개발환경 설정하기

Terry Cho 2016. 8. 9. 12:40


데이타 플로우 개발환경 설정하기


조대협 (http://bcho.tistory.com)


데이타 플로우에 대한 이해가 끝났으면 이제 직접 코딩을 해보자. 데이타 플로우에 대한 개념등은 http://bcho.tistory.com/search/dataflow 를 참고하기 바란다.

데이타 플로우에서 지원하는 프로그래밍 언어는 자바와 파이썬이다. 파이썬은 아직 알파버전으로, 이 글에서는 자바를 이용해서 설명한다.


자바를 이용한 개발환경 설정은 이클립스 개발환경과 maven을 이용한 개발 환경 두가지가 있는데, 여기서는 조금 더 손 쉬운 이클립스 환경을 기준으로 설명한다.

메이븐 기반의 개발 환경 설정은 https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven 를 참고하기 바란다.


사전준비

클라우드 계정 생성 및 빌링 설정

구글 클라우드 계정 생성 및 빌링 설정 방법은 앞서 다른글에서도 많이 설명하였기 때문에 다시 설명하지 않는다. 자세한 내용은 http://bcho.tistory.com/1107 를 참고하기 바란다.

API 사용 설정하기

다음 데이타플로우와 기타 같이 사용할 제품들의 API를 사용하기 위해서 이를 설정해줘야 한다.

구글 클라우드 콘솔에서 API Manager를 선택한후 대쉬 보드에서 아래 서비스들을 선택하여 API를 Enable 해준다. Cloud Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, and Cloud Datastore APIs.





구글 Cloud SDK 설정

구글 데이타 플로우를 프로그래밍 하기 위해서, 데이타 플로우 API를 호출하기 위한 SDK와 조작을 위한 CLI (Command Line Interface)가 필요한데, 이는 구글 Cloud SDK를 설치하면 같이 설치가 된다.

클라우드 SDK 설치는 https://cloud.google.com/sdk/docs/ 를 참고하면 된다.

gcloud 인증하기

구글 Cloud SDK 설치가 끝났으면, gcloud 명령어를 사용하기 위해서 gcloud 명령어를 초기화 한다.

초기화는 어떤 구글 클라우드 프로젝트를 사용할것인지, 그리고 사용자 아이디등으로 인증을 하는 절차를 거친다.

프롬프트 상에서

%gcloud init

명령을 실행하여, 수행한다.

이클립스 환경 설정

이제 구글 클라우드 프로젝트 설정과, 이를 호출하기 위한 SDK 환경 설치가 끝났다. 이제 이클립스 기반의 개발 환경을 설정해보자.

이클립스 설치하기

이클립스는 4.4 버전 이상을 설치하고, JDK는 1.7 이상을 설정한다.

플러그인 설치하기

다음 구글 데이타 플로우 개발환경을 위한 이클립스 플러그인을 설치한다.

이클립스에서 Help > Install New Software를 선택한 다음에, Work with 텍스트 박스에  https://dl.google.com/dataflow/eclipse/  을 입력한다.


다음으로 Google Cloud Dataflow를 선택하여 설치를 진행한다.

설치가 끝난 후 확인은 이클립스에서 New > Project를 하면, 위자드를 선택하는 화면에서 아래와 같이 Google Cloud Platform이라는 폴더와 함께 그 안에 “Cloud Dataflow Java Project”를 선택할 수 있는 화면이 나온것을 볼 수 있다.



헬로우 데이타 플로우

개발 환경 설정이 끝났으니, 이제 간단한 데이타 플로우 프로그램을 하나 만들어보자.

이 프로그램은 단어들을 읽어드린 후에, 단어들의 발생 횟수를 카운트 해 주는 파이프라인이다.



단어들을 읽어드린 후 toUpper라는 트랜스폼에서, 각 단어들을 대문자로 변환한 후, Count라는 트랜스폼에서 단어별로 발생횔 수를 카운트 한후에, 이를 Key Value (단어:발생횟수)로 리턴한 후, Print라는 트랜스폼에서 화면으로 결과를 출력해주는 예제이다.


프로젝트 생성

예제 파이프라인을 만들기 위해서, 이클립스에서 프로젝트를 생성해보자. New > Project를 선택한 후 에, 아래 그림과 같이 Google Cloud Platform 폴더에서 Cloud Dataflow Java Project를 선택한다



다음 프로젝트에 대해서  Group ID, Artifact ID 그리고 패키지 명등을 입력한다.



다음 메뉴로 넘어가면 구글 데이타 플로우를 실행하기 위한 디테일한 정보를 넣어야 하는데,




프로젝트 명과, “Cloud Storage Staging Location”이라는 정보를 입력해야 한다. Cloud Storage Staging Location은 Google Cloud Storage 의 버킷명으로, 데이타 플로우 애플리케이션 코드가 로딩 되는 장소이다.

데이타플로우 애플리케이션을 구글 클라우드에서 실행하게 되면, 애플리케이션 코드와 애플리케이션을 실행하기 위한 라이브러리들이 각각의 워커 노드로 배포 되는데, 배포를 위해서 먼저 클라이언트에서 부터, 이러한 실행 코드를 Google Cloud Storage에 올려놓게 된다. 앞에서 정의하는 “Cloud Storage Staging Location”은, 이 클라우드 스토리지 버킷에 대한 경로 정의이다.

클라우드 스토리지 버킷은 아래와 같인 Google Cloud Storage 메뉴에서 아래와 같이 생성할 수 있다.


코드 제작

그러면 코드를 작성해 보자.



package com.terry.df;


import com.google.cloud.dataflow.sdk.Pipeline;

import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

import com.google.cloud.dataflow.sdk.transforms.Count;

import com.google.cloud.dataflow.sdk.transforms.Create;

import com.google.cloud.dataflow.sdk.transforms.DoFn;

import com.google.cloud.dataflow.sdk.transforms.ParDo;

import com.google.cloud.dataflow.sdk.transforms.DoFn.ProcessContext;

import com.google.cloud.dataflow.sdk.values.KV;


import org.slf4j.Logger;

import org.slf4j.LoggerFactory;


public class StarterPipeline {

 private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);


 public static void main(String[] args) {

   Pipeline p = Pipeline.create(

       PipelineOptionsFactory.fromArgs(args).withValidation().create());


   p.apply(Create.of("Hello", "World","hello","boy","hello","girl"))

   .apply(ParDo.named("toUpper").of(new DoFn<String, String>() {

     @Override

     public void processElement(ProcessContext c) {

       c.output(c.element().toUpperCase());

     }

   }))

   .apply(Count.<String>perElement())

   .apply(ParDo.named("Print").of(new DoFn<KV<String,Long>, Void>(){

@Override

public void processElement(ProcessContext c) throws Exception {

LOG.info(c.element().getKey() + " count:"+c.element().getValue());

}

   }));


   p.run();

 }

}



(참고 : 위의 소스코드는 https://github.com/bwcho75/googledataflow/tree/master/HelloDataFlow 에 있다.)


처음 p.apply(Create.of…)에서, 데이타를 생성하였다.

다음으로 .apply(ParDo.named("toUpper").of(new DoFn<String, String>() 에서 소문자를 대문자로 다 치완하는 데, ParDo는 이 작업을 여러 노드에서 병렬로 실행하겠다는 선언이고, named는 이 트랜스폼의 이름을 “toUpper”로 정의하겠다는 정의이다. (나중에 디버깅에 유용한다.) 다음으로, 트랜스폼 함수는 DoFn으로 정의했는데, <String,String>으로 정의되어 앞의 인자가 Input 그리고 뒤의 인자가 Output의 데이타 형으로 String 인자를 받아서, String 인자로 리턴하겠다는 것이다.


.apply(Count.<String>perElement()) 은 데이타플로우에서 미리 정의된, 트랜스폼으로,  <String>으로 된 데이타를 받아서 엘리먼트당 카운트를 해서 <String,Long> 형으로 리턴을 해준다. 즉 String형의 단어마다 카운트를 한 결과를 Long형으로 넣어서 이를 키밸류(KV)형식으로 묶어서 리턴해준다.

.apply(ParDo.named("Print").of(new DoFn<KV<String,Long>, Void>() 에서는 앞에서 전달해준  String,Long형이 키밸류형으로 정의된 KV<String,Long>형의 데이타를 받아서, 출력해주고, 마지막 트랜스폼이기 때문에 더 이상 뒤로 데이타를 넘기지 않을 것이기 때문에, Output의 인지 타입을 Void로 선언하였다.

실행

코드를 작성이 끝났으면 실제로 실행해보자 Run As에서 Dataflow Pipeline을 선택하면 실행을 할 수 있다.



이때 다음과 같이 실행환경을 설정할 수 있다.



여기서 Runner에 대한 개념을 짚고 넘어갈 필요가 있다.

Direct Pipeline Runner

Direct Pipeline Runner는 데이타플로우 코드를 로컬 개발 환경 (노트북이나 데스크탑)에서 실행하고자 할때 선택할 수 있는 러너이다. 주로 개발이나 테스트에서 사용할 수 있는데, 다른 클라우드 서비스 예를 들어  Pub/Sub이나 빅쿼리등이랑 연동이 되는 파이프라인의 경우에는 DirectPipelineRunner를 사용할 수 없으니 주의하기 바란다.

DataflowPipelineRunner

클라우드 환경에서 데이타 플로우를 실행하기 위해서는 DataflowPipelineRunner와  BlockingDataflowPipelineRunner 두 가지가 있다.

DataflowPipelineRunner는 데이타 플로우 애플리케이션을 구글 클라우드에서 실행을 해주는데, 데이타 플로우 잡을 클라우드에서 실행해놓고, 로컬 애플리케이션을 바로 종료 한다. (클라우드에 접수된 잡은 클라우드에서 계속 실행된다.)

BlockingDataflowPipelineRunner

BlockingDataflowPipelineRunner는 데이타 플로우잡을 구글 클라우드에서 실행해놓게 해놓고, 잡이 끝날때 까지 로컬 애플리케이션을 대기하도록 한다.  

배치와 같이 끝이 있는 경우에는 필요에 따라서 사용할 수 있다. 스트리밍의 경우 BlockingDataflowPipelineRunner를 사용하게 되면 스트리밍 잡을 명시적으로 끊지 않는 이상 계속해서 로컬 애플리케이션이 실행되는 상태가 된다.


DirectPipelineRunner로 실행을 해보면 다음과 같이 이클립스 콘솔에서 결과가 출력되는 것을 볼 수 있다.


BODY는 1,  GIRL 은 1, HELLO는 3개 그리고 WORLD는 1개가 출력되는 것을 볼 수 있다.


이번에는 클라우드에 배포를 하고 실행해보자, Run As에서, BlockingDataflowPipelineRunner를 선택하여 실행해보자.

실행을 하면 코드가 자동으로 클라우드로 배포 되서 실행되는 것을 확인할 수 있다. 구글 클라우드 콘솔의 데이타 플로우 메뉴를 보면, 새로운 잡이 생성된것을 확인할 수 있다.


해당 잡을 선택해서 들어가 보면 현재 잡의 실행 상황과 함께, 파이프라인에서 단계별 시간이나 기타 상세한 로그를 볼 수 있다.



데이타 플로우 애플리케이션이 기동이 완료되면, Logs 메뉴에서 Worker Logs라는 버튼을 누르면 각 워커 노드에서의 로그를 볼 수 있다.


Worker Logs를 누르면 다음과 같이  GIRL,WORLD,BOY,HELLO에 대한 count 수를 출력한 로그를 확인할 수 있다.


참고 : Logs 메뉴로 들어가서  Job Logs에서  Minimum serverity를 “All” 로 선택하면 전체 작업 상황을 알 수 있는데, 애플리케이션을 실행했다고 바로 데이타 플로우의 파이프라인이 실행되는 것이 아니라, 애플리케이션 코드가 구글 클라우드 스토리에 로드되고, 이 로드된 코드들이 각각의 워커 노드에 배포가 된후에, 워커 노드가 기동이 되야 잡이 실제로 수행된다.


워커가 제대로 기동되었는지는 Job Logs에서 Mimimum serverity를 All로 한후에 다음과 같이 “Worker have started successfully”라는 로그가 나오면 그때 부터 데이타 플로우 잡을 실행을 시작한다고 생각하면 된다.