블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 


Data Preprocessing in ML Pipeline


본글은 구글 클라우드 블로그에 포스팅한 글을, 재 포스팅 허가를 받은 후 포스팅한 글입니다.

다른 좋은 글들도 많으니 아래 출처 링크를 참고해 주새요

출처 링크


머신러닝 파이프라인에서, 데이터는 모델 학습 및 서빙의 입력에 알맞게 가공되어야 한다. 이를 전처리라고 하는데, 이번 글에서는 전처리에 대한 개념과 이에 대한 구현 옵션등에 대해서 알아보도록 한다.

처리 단계별 데이터 분류

머신러닝에서 데이터 전처리는 모델 학습에 사용되는 데이터 형태로 데이터를 가공하는 과정을 이야기한다.

데이터 전처리는 여러 단계로 이루어지는데, 단계별로 처리된 데이터에 대해서 다음과 같이 명명한다. 

Raw data

초기에 수집된 원본 데이터로 분석이나, 머신러닝 학습 용도로 전혀 전처리가 되지 않은 데이터를 의미한다.

하둡과 같은 데이터 레이크에 저장된 데이터나, 기본적인 처리를 통해서 테이블 구조로 데이터 레이크에 저장된 데이터가 Raw 데이터에 해당한다.

Prepared data

Prepared data는 Data engineering 전처리에 의해서, 학습을 위한 데이터만 추출한 서브셋 데이터를 의미한다. 예를 들어 서울 20대 사용자의 구매 패턴을 머신러닝 모델로 만들고자 할때, 서울 20대 사용자 데이터만 추출한 경우 이 데이터를 Prepared data라고 한다. 단순하게 서브셋만을 추출하는 것이 아니라, 깨끗한 상태의 데이터로 정재된 데이터인데, 정재의 의미는 비어 있는 행이나 열을 삭제한 데이터를 의미한다. 

Engineered feature

이렇게 정제된 데이터는 머신러닝 학습과 서빙에 적절한 형태로 재가공 되어야 하는데 이를 Feature Engineering 이라고 한다. 예를 들어 숫자와 같은 값을 0~1 사이로 맵핑 시키거나 , 카테고리 밸류 예를 들어 남자/여자를 0,1과 같은 값으로 맵핑 시키고, 전체 데이터를 학습,평가용으로 7:3 분할하여 저장하는 것이 이에 해당 한다. 



<그림. 데이터 전처리 단계 및 단계별 생성된 데이터 >

데이터 전처리 기법

그러면, 이 데이터 전처리 과정에서 구체적으로 어떤 기법으로 데이터를 처리할까? 몇가지 대표적인 기법을 정리해보면 다음과 같다. 

  • Data cleansing : 데이터에서 값이 잘못되거나 타입이 맞지 않는 행이나 열을 제거하는 작업을 한다. 

  • Instance selection & partitioning : 데이터를 학습,평가,테스트용 데이터로 나누는 작업을 한다. 단순히 나누는 작업 뿐만 아니라, 데이터를 샘플링 할때, 그 분포를 맞추는 작업을 병행한다. 예를 들어 서울/대구/부산의 선거 투표 데이타가 있을때, 인구 비율이 9:2:3이라고 할때, 전체 인구를 랜덤하게 샘플링해서 데이타를 추출하는 것이 아니라, 서울/대구/부산의 인구 비율에 따라서 서울에서 9, 대구에서 2, 부산에서 3의 비율로 샘플링을 할 수 있다. 이를 stratified partitioning 이라고 한다. 또는 데이터 분포상에서 특정 카테고리의 데이터 비율이 적을때, 이 카테고리에 대해서 샘플의 비율을 높이는 minority classed oversampling 등의 기법을 이 과정에서 사용한다. 

  • Feature tuning : 머신러닝 피처의 품질을 높이기 위해서 0~1값으로 값을 normalization 시키거나, missing value를 제거 하거나, 아웃라이어등을 제거하는 등의 과정을 수행한다.

  • Representation transformation : 피처를 숫자로 맵핑 시키는 작업을 한다. 카레고리컬 피처를 one hot encoding 등을 통해서 숫자로 맵핑하거나, 텍스트를 embedding 을 통해서 숫자로 변환하는 작업등을 수행한다. 

  • Feature extraction : PCA와 같은 차원 감소 기법을 이용하여, 전체 피처의 수를 줄이는 작업을 수행하거나, 피처를 해시값으로 변환하여, 더 효율적인 피쳐를 사용하는 작업을 한다. . 

  • Feature selection : 여러개의 피처(컬럼)중에 머신러닝에 사용할 피처만을 선별한다. 

  • Feature construction : 기존의 피처를 기반으로 polynomial expansion 이나,  feature crossing 등의 기법을 이용하여 새로운 피처를 만들어낸다. 

데이터 전처리 단위

Instance level transformation & Full pass transformation

데이터 전처리를 할때 어떤 단위로 데이터를 전처리 할지에 대한 정의이다. 예를 들어 숫자 데이터의 값을 0~1 사이로 맵핑하고자 하면, 그 데이터의 최소/최대 값을 알아야 0~1사이로 맵핑할 수가 있는데, 최소/최대값을 추출하려면, 전체 데이터에 대한 스캔이 필요하다. 반대로 NULL 값을 0으로 변환하는 작업은 전체 데이터에 대한 스캔이 필요없고 개별 데이터만 변환하면 된다. 앞에 설명한 전체 데이터에 대한 스캔이 필요한 방식을 full pass transformation 이라고 하고, 전체 데이터를 볼 필요 없이 개별 데이터에 대해 변환하는 작업을 instance level transformation이라고 한다. 


Window aggregation

전체 데이터의 볼륨이 클 경우 이를 윈도우 단위로 잘라서 처리할 수 있는 방법이 있는데, 예를 들어 10분 단위로 데이터를 처리해서, 10분 단위로 최소/최대 값을 구하거나 또는 10분 단위로 어떤 값의 평균값을 대표값으로 사용하는 것들이 이에 해당한다. 

일반적으로 입력값은 (entity, timestamp, value) 형태가 되며, 전처리된 출력 값은 다음과 같이. (entity, time_index, aggregated_value_over_time_window) 엔터티(피쳐)에 대해서 윈도우별로 처리된 값을 저장하는 형태가 된다.  보통 이런 window aggregation 방식은 리얼 타임 스트리밍 데이터에서 시간 윈도우 단위로 데이터를 처리하는 경우에 많이 사용이 되며, Apache Beam과 같은 스트리밍 프레임워크를 이용하여 구현한다. 

구글 클라우드에서 데이터 전처리 방식

이러한 데이터 전처리는 다양한 컴포넌트를 이용해서 처리할 수 있는데, 어떤 방식이 있는지 살펴보기 전에 먼저 구글 클라우드 기반의 머신러닝 학습 파이프라인 아키텍처를 살펴보자.  아래는 일반적인 구글 클라우드 기반의 머신러닝 파이프라인 아키텍처이다. 


<그림. 구글 클라우드 플랫폼 기반의 일반적인 머신러닝 학습 파이프라인 아키텍처 >


  1. 원본 데이터는 빅쿼리에 저장된다. (테이블 형태의 데이터가 아닌 이미지나 텍스트등은 클라우드 스토리지(GCS)에 저장된다.)

  2. 저장된 원본 데이터는 Dataflow를 이용해서 머신러닝 학습에 알맞은 형태로 전처리 된다. 학습/평가/테스트 셋으로 나누는 것을 포함해서, 가능하면 텐서플로우 파일형태인 *.tfrecord 형태로 인코딩 된후에, GCS 에 저장된다. 

  3. 텐서플로우등으로 모델을 개발한 후에, trainer-package로 패키징을 하고, AI Platform 트레이닝에 이 모델을 업로드 한다. 업로드된 모델을 앞서 전처리된 데이터를 이용해서 학습이되고, 학습이 된 모델은 GCS에 저장된다. (텐서플로우에서 SavedModel로 저장한다.)

  4. GCS 에 저장된 모델은 AI Plaform 서빙 엔진에 배포되고 REST API를 이용하여 서빙된다.

  5. 클라이언트에서는 이 REST API를 이용하여 학습된 모델에 대한 서빙을 이용한다.

  6. 전체 워크플로우에 대한 파이프라인 관리는 Apache Airflow 매니지드 서비스인 Composer 를 이용한다. 또는 머신러닝에 특화된 파이프라인이기 때문에, AI Platform pipeline을 사용하는 것이 좋다.

Option A: 빅쿼리에서 데이터 전처리

일반적으로 빅쿼리를 이용한 전처리는 다음과 같은 시나리오에 유용하다.

  • Sampling : 데이터에서 랜덤하게 일부 데이터셋만 가지고 오는 용도

  • Filtering : 학습에 필요한 데이터만 WHERE 문을 이용해서 가지고 오는 용도

  • Partitioning : 데이터를 학습/평가/테스트 용도로 나누는 용도

주로 빅쿼리는 Dataflow로 데이터를 인입하기 전체 최초 전처리 용도로 사용이 되는데, 주의할점은 빅쿼리에 전처리 로직이 많을 경우 향후 서빙에서 재 구현이 필요할 수 있다. 무슨 이야기인가 하면, 서빙시에도 입력 데이터에 대한 동일한 전처리가 필요한데, 빅쿼리에서 SQL로 작성한 전처리 로직은 서빙시에는 사용할 수 없기 때문에, 자바나 파이썬으로 전처리 로직을 다시 구현해야 하는 이중작업이 될 수 있다. 물론 서빙이 빅쿼리에 있는 데이터를 사용하는 배치 서빙일 경우 문제가 없지만, 리얼타임으로 단건의 데이터에 대해서 서빙을 하는 경우에는 빅쿼리에서 서빙용 데이터를 전처리할 수 없다. 


그럼에도 불구하고 배치 서빙용인 경우 전처리를 빅쿼리를 이용할 경우 편리하고 특히 Dataflow 에 데이터를 입력하기전에 Full pass transformation 이 필요한 전체 통계 데이터 (예를 들어 평균,분산,최소/최대값)은 SQL을 통해서 쉽게 뽑아낼 수 있는 장점이 있다. 

Option B: Dataflow 에서 데이터 전처리

복잡한 데이터 변환 로직이 있는 경우등에 효율적으로 사용할 수 있는 방식인데, Instance level transformation 뿐만 아니라, full pass transformation, 그리고 window aggregation 타입 모두를 지원할 수 있다.

Dataflow는 Apache Beam 오픈소스 기반의 런타임이지만, 다양한 구현 방식을 지원하고 있다.

  • Apache Beam을 사용하는 방법 : 가장 일반적인 방식으로 Apache Beam Java/Python SDK 을 이용하여 데이터 변환 로직을 구현할 수 있다.  

  • Tensorflow Transformation 을 사용하는 방법 : 텐서플로우의 경우 Tensorflow Transformation (이하 TFT) 이라는 이름으로 데이터 변환 프레임워크를 제공한다. TFT는 Apache Beam 기반으로 동작하는데, 텐서플로우 코드를 기반으로 하기 때문에, 머신러닝 개발자 입장에서는 접근이 상대적으로 쉬운 장점이 있다. 

  • Dataflow SQL을 사용하는 방법 : 앞의 두 방식의 경우에는 Java나 Python 기반의 코딩이 필요한데, 이런 코딩 없이 Window aggregation이나, 기타 복잡한 로직을 구현하고자 할때 사용할 수 있는 방식이 Dataflow SQL이다.SQL을 사용하여 구현하지만, Dataflow의 함수등을 사용할 수 있는 장점이 있다. 

  • Dataflow Template + UDF를 사용 하는 방법 : 복잡한 변환이 아니라 단순한 맵핑이나 문자열 변환들을 어렵지 않게 구현하는 방식으로 Dataflow는 Pre-built in 된 Template을 제공한다. 이 템플릿 중에는 비즈니스 로직을 자바스크립트로 넣을 수 있는 UDF 라는 방식을 지원하는데, Apache Beam 형태로 구현할 필요 없이 단순한 변환 로직을 자바스크립트로 구현하여 GCS에 파일을 저장하고, 설정 정보에서 자바 스크립트 파일만 지정하면되기 때문에, 쉽게 사용할 수 있다. 


서빙시에도 다양한 아키텍처 구현이 가능한데, Pub/Sub 큐를 통해서 데이터를 실시간으로 인입한 데이터를 머신러닝 모델로 서빙한후에, Pub/Sub으로 내보내는 near realtime 서빙이 가능하고 또는 bigtable에 서빙 결과를 저장하여 마치 serving 결과에 대한 캐쉬식으로 사용하는 구조도 가능하다.




<그림. 스트림 데이터를 이용하여 서빙을 제공하는 아키텍처>

Option C: Tensorflow 모델 내에서 데이터 전처리

아니면 데이터 전처리를 Tensorflow 모델 코드내에서 하는 방식이 있다.

  • feature_column 를 이용하여 피처를 임베딩하거나, 버킷화 하는 방식이 있고

  • 아니면 데이터를 피딩하는  input functions(train_input_fn, eval_input_fn, and serving_input_fn) 안에 데이터 전처리 로직을 구현하는 방법이 있다. 

  • Custom estimator를 사용하는 경우에는 model_fn 자체에 데이터 전처리 로직을 넣을 수 있다. 

이렇게 텐서 플로우 코드단에 전처리 기능을 넣는 경우는 Instance level transformation은 가능하지만 다른 방식에 대해서는 불가능하다. 그렇지만 이미지 데이터를 학습전에 rotation하거나 flip 하는 argumentation 등은 텐서플로우 코드에서 하게 되면 동적으로 데이터를 학습 단계에 argumentation할 수 있기 때문에 효율이 좋은 장점이 있다. 

Option D: DataPrep을 이용한 데이터  전처리

구글 클라우드 플랫폼에서는 데이터의 특성을 분석하고 간단한 변환을 지원하기 위한 wrangling 도구로 DataPrep을 제공한다. Engineered feature 단계까지 데이터를 가공하는 것은 어려울 수 있겠지만, Raw data를 Prepared data 형태로 cleansing 하는 용도로는 충분히 사용할 수 있으며, 특히 시각화를 통한 데이터 분포나 아웃라이어 분석이나 단순 변환등에는 효과적으로 사용할 수 있다.


<그림 DataPrep 을 이용한 Wrangling 과정 예시> 

Option E: DataProc을 이용한 데이터 전처리

DataProc은 Hadoop/Spark 에 대한 구글 매니지드 서비스이다. Apache Beam을 사용하는 Dataflow와 같이 코딩을 기반으로 한다는 점은 같지만, 기존에 Hadoop/Spark 에코 시스템에 익숙한 사용자들의 경우에는 기존의 에코 시스템과 개발 코드를 재활용할 수 있다는 장점을 가지고 있다. 

데이터 전처리시 고려할점

그러면 이러한 기술을 이용해서 데이터를 전처리할때, 고려해야하는 점은 무엇이 있을까?

학습/서빙 데이터에 대한 스큐(skew)

모델을 학습하여, 서비스에 배포한후에, 향후 들어오는 데이터로 서빙을 하게 되는데, 이때 학습에서 사용한 데이터와 서빙시 사용한 데이터의 특성이 다를때 이를 training-serving skew 라고 한다. 

예를 들어 피처 A가 학습시에 범위가 1~255 였는데, 서빙시에 1~500 사이로 들어오게 되면 이 모델의 서빙 결과는 정확하지 못하게 된다.

(참고 : 이런 문제를 해결하기 위해서 데이터의 분포나, 수학적 통계값을 저장해 놓은 후에, 서빙전에 검증하는 방식을 사용할 수 있으며 이는 Tensorflow data validation으로 구현이 가능하다. )

Full pass transformation

Option C의 텐서플로우 모델내의 데이터 변환 로직은 Full pass transformation을 지원하지 않기 때문에, feature scaling이나, normalization 적용이 불가능하다. 이러한 전처리 기법은 최소/최대값등의 통계 데이터가 필요한데, 이러한 데이터는 모델 학습전에 계산되어야 하고, 계산된 데이터는 어디에든 저장되어 있어야 하며, 학습과/서빙 단계에 모두 일관되게 사용될 수 있어야 한다. 

성능 향상을 위한 Up front data loading 

Option C 텐서플로우 모델내에 데이터 변환 로직을 구현할때, 고려해야 하는 사항이다.

모델 코드 상에 데이터 전처리 로직이 있을 경우, 아래 그림과 같이 데이터 변환 작업이 끝나면, 그 데이터로 모델을 학습 시키는 구조가 된다. 


<그림. 데이터 전처리가 모델 학습전에 발생하여, 대기하는 현상>


이 경우에 데이터가 전처리되고 있는 동안에는 학습이 이루어지지 않기 때문에 자원이 낭비되는 문제가 발생하고, 모델의 학습 시간에 전처리 시간까지 포함되기 때문에 전체 학습시간이 상대적으로 오래걸린다. 


Option B의 데이터 플로우를 사용하는 것처럼 미리 여러 학습에 사용될 데이터를 전처리를 해놓거나 아니면 아래 그림과 같이 병렬적으로 데이터 플로우에서 데이터를 전처리하면서 모델은 학습에만 전념하도록 하면, 모델의 전체학습 시간을 줄일 수 있다. 


<그림. 병렬로 데이타 전처리를 해서 모델 학습을 최적화 하는 방식>

이를 up front data loading 이라고 하는데, 텐서플로우에서는 Prefetching, Interleave, Parallel mapping 등을 tf.data.DataSet에서 다양한 방식으로 이를 지원하고 있다. 


Tensorflow Transform

텐서플로우 프레임웍은 이러한 데이터 변환을 위해서 Tensorflow Transform (이하 TFT) 라는 프레임웍을 데이터 전처리 기능을 제공한다. 이 TFT를 구글 클라우드에서 실행하게 되면, Dataflow를 기반으로 실행할 수 있다. (Option B) 

tf.Transform 이라는 패키지로 제공된다. TFT는 instant level transformation 뿐만 아니라, full pass transformation, window aggregation 을 지원하는데, 특히 full pass transformation을 지원하기 위해서 데이터를 변환하기 전에 Analyze 라는 단계를 거치게 된다. 

아래 그림이 TFT가 작동하는 전반적인 구조를 기술한것인데,



Analyze 단계에서는 데이터의 통계적인 특성 (최소,최대,평균 값등)을 추출하고, Transform 단계에서는 이 값을 이용하여, 데이터 변환을 수행한다. 각 단계는 tft_beam.AnalyzeDataset , tft_beam.TransformDataset 로 실행될 수 있으며, 이 두 단계를 tft_beam.AnalyzeAndTransformDataset 로 합쳐서 한번에 실행하는 것도 가능하다. 


  • Analyze 단계 : Analyze 단계에서는 통계적인 값을 Full pass operation 을 통해서 계산해내는 것이외에도, transform_fn을 생성해내는 작업을 한다. transform_fn은 텐서플로우 그래프로, 데이터 변환에 대한 instance level operation 을 계산해낸 통계값을 사용해서 수행한다. 

  • Transform 단계 : 데이터 변환 단계에서는 transform fn을 인입 데이터에 적용하여, instance level로 데이터를 변환하는 작업을 수행한다. 


모델 학습시 데이터에 대한 전처리는 학습 데이터뿐만 아니라, 평가 (Eval) 데이터에도 동일하게 적용이 되어야 하는데, Analyze는 학습데이터에만 적용되서 데이터의 특성을 추출하고, 평가 데이터에는 별도로 Analyze를 수행하지 않고, 학습 데이터에서 추출된 데이터 특성을 그대로 사용한다

TFT pipeline export  

transform_fn으로 구성된 데이터 변환 파이프라인은 내부적으로 텐서 플로우 그래프로 변환이 되는데, 학습된 텐서플로우 모델을 export 하여 SavedModel로 저장할때, 이 transform_fn 그래프가  서빙용 데이터 입력함수인 serving_input_fn에 붙어서 같이 export 된다. 이 말은, 학습에서 사용한 데이터 전처리 로직인 transform_fn이 그대로 서빙단에도 같이 적용된다는 이야기이다. 물론 full-pass transformation에서 계산한 통계값도 상수형태로 저장하게 된다. 그래서 입력값에 대해서 학습과 서빙시 같은 변환 로직을 사용할 수 있게 된다.

데이터 전처리 옵션 정리

앞서 설명한 데이터 변환 전처리 옵션을 Instance level transformation, full pass level transformation, window aggregation 에 따라 정리해보면 다음과 같다. 


Disclaimer

본 글의 작성자는 Google 직원입니다. 그러나 본 글의 내용은 개인의 입장에서 작성된 글이며, Google의 입장을 대변하지 않으며, Google이 본 컨텐츠를 보장하지 않습니다.


References






Instance-level transformation

(stateless transformation)

Full pass during training

instance -level during serving

(stateful transformation)

Real-time (window) aggregations

during training and serving 

(streaming transformation)

배치 서빙

온라인 서빙

배치 서빙

온라인 서빙

배치 서빙

온라인 서빙

BigQuery (SQL)

OK

같은 데이터 변환 로직을 학습과 서빙 단계에 적용 가능

가능은 하지만 권장하지 않음


서빙시에는 BigQuery가 아니라 다른 방식으로 데이터 변환 로직을 구현해야 하기 때문에 결과적으로 학습/서빙 Skew를 유발할 수 있음

가능


BigQuery에서 수학적 통계값(최소/최대)를 계산하여, 이 값을 이용하면 가능하다.

그러나 계산된 값을 별도로 저장해서 학습/서빙시에 사용해야 하기 때문에 구현이 번거롭다.

N/A

가능은 하지만 권장하지 않음


BigQuery의 윈도우 함수등을 이용하여 구현은 가능하지만, 서빙시에는 BigQuery가 아닌 다른 툴로 구현을 해야 하기 때문에 학습/서빙 Skew가 발생할 수 있음

Dataflow (Apache Beam)

OK

서빙시 데이터가 Pub/sub을 통해서 데이터 블로우로 들어오면 가능하지만, 그렇지 않은 경우 학습/서빙 데이터간 Skew가 발생할 수 있음

가능


Dataflow에서 수학적 통계값(최소/최대)를 계산하여, 이 값을 이용하면 가능하다.

그러나 계산된 값을 별도로 저장해서 학습/서빙시에 사용해야 하기 때문에 구현이 번거롭다.

OK


동일한 Apache Beam 기반의 데이터 변환 로직이 학습을 서빙시 적용이 가능함

Dataflow (Apache Beam + TFT)

권장함


학습과 서빙의 Skew를 방지할 수 있고, 학습/서빙전 데이터를 미리 준비할 수 있음

권장함


데이터 변환 로직과, 모델 학습시에 계산된 통계 결과 텐서플로우 그래프 형태로 저장되서, 서빙 모델을 export할시에 같이 저장됨

Tensorflow
(input_fn & serving_input_fn)

가능은 하지만 권장하지 않음


학습과 서빙 효율성을 생각하면, 학습전에 데이터를 변환하는게 좋음

가능은 하지만 권장하지 않음


학습과 서빙 효율성을 생각하면, 학습전에 데이터를 변환하는게 좋음

불가능

불가능


본인은 구글 클라우드의 직원이며, 이 블로그에 있는 모든 글은 회사와 관계 없는 개인의 의견임을 알립니다.

댓글을 달아 주세요


컨테이너 기반의 워크플로우 솔루션 argo

조대협 (http://bcho.tistory.com)


argo는 컨테이너 워크플로우 솔루션이다.

컨테이너 기반으로 빅데이타 분석, CI/CD, 머신러닝 파이프라인을 만들때 유용하게 사용할 수 있는 오픈 소스 솔루션으로 개념은 다음과 같다.


워크플로우를 정의하되 워크플로우의 각각의 스텝을 컨테이너로 정의한다.

워크플로우 스펙은 YAML로 정의하면, 실행할때 마다 컨테이너를 생성해서, 작업을 수행하는 개념이다.


기존에 아파치 에어플로우 (https://airflow.apache.org/)등 많은 워크 플로우 솔루션이 있지만, 이러한 솔루션은 컴포넌트가 VM/컨테이너에서 이미 준비되서 돌고 있음을 전제로 하고, 각각의 컴포넌트를 흐름에 따라서 호출하는데 목적이 맞춰서 있다면, argo 의 경우는 워크플로우를 시작하면서 컨테이너를 배포하고, 워크플로우 작업이 끝나면 컨테이너가 종료되기 때문에, 실행할때만 컨테이너를 통해서 컴퓨팅 자원을 점유하기 때문에 자원 활용면에서 장점이 있다고 볼 수 있다.


argo 설치는 쿠버네티스 클러스터가 있는 상태라면 https://argoproj.github.io/docs/argo/demo.html 를 통해서 간단하게 설치가 가능하다. 설치와 사용법은 위의 문서링크를 활용하기 바란다.

HelloWorld

간단한 워크플로우 예제를 살펴보자. 워크 플로우를 실행하기 위해서는 워크플로우 스펙을 yaml 파일로 정의해야 한다. 아래는 helloworld 의 간단한 예제이다.


apiVersion: argoproj.io/v1alpha1

kind: Workflow

metadata:

 generateName: hello-world-

spec:

 entrypoint: whalesay

 templates:

 - name: whalesay

   container:

     image: docker/whalesay:latest

     command: [cowsay]

     args: ["hello world"]


워크플로우의 이름은 metadata 부분에 generateName에서 워크플로우 JOB의 이름을 정의할 수 있다. 여기서는 hello-world-로 정의했는데, 작업이 생성될때 마다 hello-world-xxx 이라는 이름으로 작업이 생성된다.

Templates 부분에 사용하고자 하는 컨테이너를 정의한다. 위의 예제에서는 docker/whalesay:latest 이미지로 컨테이너를 생성하도록 하였고, 생성후에는 “cosway”라는 명령어를 “hello world” 라는 인자를 줘서 실행하도록 하였다.

Template 부분에는 여러개의 컨테이너를 조합하여, 어떤 순서로 실행할지를 정의한다. 이 예제에서는 하나의 컨테이너만 실행하도록 정의하였다.

다음에 어느 컨테이너 부터 시작하게 할것인지는 sepc 부분에 정의하고, 시작 부분은 entrypoint라는 구문을 이용해서 정의할 수 있다.이 예제에서는 template에 정의한 whalesay라는 컨테이너부터 실행하도록 한다.

이렇게 생성된 워크플로우 스펙은 “argo submit”이라는 CLI 명령을 이용해서 실행한다.


%argo submit --watch {yaml filename}


워크플로우가 실행되면 각 단계별로 쿠버네티스 Pod 가 생성되고, 생성 결과는 argo logs {pod name}으로 확인할 수 있다.


%argo list

명령을 이용하면 argo 워크플로우의 상태를 확인할 수 있다.



위의 그림과 같이 hello-world는 hello-world-smjxq 라는 작업으로 생성되었다.

Pod 명은 이 {argo 작업이름}-xxx 식으로 명명이 된다.

%kubectl get pod

명령으로 확인해보면 아래 그림과 같이 hello-world-smjxq 라는 이름으로 pod가 생성된것을 확인할 수 있다.


이 pod의 실행 결과를 보기 위해서

%argo logs hello-world-smjxq

명령을 실행하면 된다.


위의 그림과 같이 고래 그림을 결과로 출력한것을 확인할 수 있다.

ArgoUI

워크플로우의 목록과 실행결과는 CLI뿐 아니라 웹 기반의 GUI에서도 확인이 가능하다.

argo ui는 argo라는 이름의 deployment에 생성이 되어 있는데, clusterIP (쿠버네티스 내부 IP)로 생성이 되어 있기 때문에 외부에서 접근이 불가능하다. 포트포워딩 기능을 이용해서 argo deployment의 8001 포트를 로컬 PC로 포워딩해서 접속할 수 있다.


% kubectl -n argo port-forward deployment/argo-ui 8001:8001


다음에 http://localhost:8001을 이용해서 접속해보면 다음과 같이 현재 등록되어 있는 워크플로우 목록을 확인할 수 있다.




이 목록에서 아까 수행한 hello-world-xxx 워크플로우를 확인해보자. 아래 그림과 같이 워크플로우의 구조를 보여준다.



hello-world-xxx 노드를 클릭하면 각 노드의 상세 내용을 볼 수 있다.


그림에서 Summary > Logs 부분을 선택하면 아래 그림과 같이 각 단계별로 실행한 결과 로그를 볼 수 있다.


연속된 작업의 실행

앞에서 간단한 설치 및 사용법에 대해서 알아봤는데, 앞에서 살펴본 예제는 하나의 태스크로 된 워크플로우이다. 워크플로우는 좀더 복잡하게 여러개의 태스크를 순차적으로 실행하거나 또는 병렬로 실행이 된다.


예제 원본 https://argoproj.github.io/docs/argo/examples/README.html


다음 워크플로우 정의를 보자

apiVersion: argoproj.io/v1alpha1

kind: Workflow

metadata:

 generateName: steps-

spec:

 entrypoint: hello-hello-hello


 # This spec contains two templates: hello-hello-hello and whalesay

 templates:

 - name: hello-hello-hello

   # Instead of just running a container

   # This template has a sequence of steps

   steps:

   - - name: hello1            #hello1 is run before the following steps

       template: whalesay

       arguments:

         parameters:

         - name: message

           value: "hello1"

   - - name: hello2a           #double dash => run after previous step

       template: whalesay

       arguments:

         parameters:

         - name: message

           value: "hello2a"

     - name: hello2b           #single dash => run in parallel with previous step

       template: whalesay

       arguments:

         parameters:

         - name: message

           value: "hello2b"


 # This is the same template as from the previous example

 - name: whalesay

   inputs:

     parameters:

     - name: message

   container:

     image: docker/whalesay

     command: [cowsay]

     args: ["{{inputs.parameters.message}}"]


구조를 살펴보면 다음과 같다.



  • Metadata 부분에 generateName으로 이 워크플로우의 이름을 정의했다. 워크플로우 작업이 실행될때마다 steps-xxx  라는 이름으로 생성이 된다.

  • 워크 플로우는 templates 부분에 spec 부분에 정의하는데, hello1 작업을 수행한 후에, hello2a,hello2b 를 동시에 실행한다. 동시 실행인지 순차 실행인지는 steps의 인덴트(탭으로 띄워쓰기 한부분)을 확인하면 되는데, hello1은 - -name: hello1으로 정의 하였고 다음 단계는 - -name: hello2a와, - name:hello2b로 지정하였다. 잘 보면, hello2b는 “-”가 두개가 아니고 한개로 되어 있고 hello2a와 같은 띄어 쓰기로 되어 있는 것을 볼 수 있다.

  • 마지막으로 이 워크플로우에서 사용되는 컨테이너 이미지를 정의하면 된다.


실행시에 --watch 옵션을 주면, 각 단계별 실행 상태와, 워크플로우 그래프의 구조를 볼 수 있다.


%argo submit --watch helloworld-seq.yaml


위의 그림을 보면 step-h44qf라는 이름으로 작업이 수행되는데, hello1이 먼저 실행되고 다음에 hello2a,hello2b가 동시에 실행된것을 확인할 수 있다.


실행후에 UI에서 실행 내역을 확인해보면 다음과 같이 hello1이 먼저 실행 된 후에,  hello2a,hello2b가 병렬로 동시에 실행된것을 확인할 수 있다.



DAG를 이용한 워크플로우 정의

Yaml 파일 형식이 워크플로우의 실행 순서를 정의할 경우 명시성이 떨어져서 가독성 측면에서 읽기에 불편할 수 있는데, DAG(Directed acyclic graph)를 이용하면 조금 더 명시적으로 워크플로우 정의가 가능하다.



apiVersion: argoproj.io/v1alpha1

kind: Workflow

metadata:

 generateName: dag-diamond-

spec:

 entrypoint: diamond

 templates:

 - name: echo

   inputs:

     parameters:

     - name: message

   container:

     image: alpine:3.7

     command: [echo, "{{inputs.parameters.message}}"]

 - name: diamond

   dag:

     tasks:

     - name: A

       template: echo

       arguments:

         parameters: [{name: message, value: A}]

     - name: B

       dependencies: [A]

       template: echo

       arguments:

         parameters: [{name: message, value: B}]

     - name: C

       dependencies: [A]

       template: echo

       arguments:

         parameters: [{name: message, value: C}]

     - name: D

       dependencies: [B, C]

       template: echo

       arguments:

         parameters: [{name: message, value: D}]


entrypoint는 diamond dag를 실행하도록 한다.

dag 정의 부분을 보면 맨 앞에 name: A인 task를 실행하도록 하고, 다음 B,C는 A에 의존성을 가지도록 한다.D는 B,C의 의존성을 가지게 해서 실행 순서는 A→ B,C → D형태가 된다.

다음과 같은 순서로 실행이 된다.



입력/출력값 전달

argo의 개념과 워크플로우의 개념을 이해했으면, 워크플로우에서 태스크간의 데이타를 어떻게 전달하는지 살펴보도록 하자.

입력값의 전달

argo에서 변수를 입력값으로 사용하는 방법은 간단하다. 먼저 변수를 정의한 다음에, 정의된 변수를 입력이나 출력으로 사용할지 워크플로우의 태스크에서 정의한후, 그 변수를 사용하면 된다.


apiVersion: argoproj.io/v1alpha1

kind: Workflow

metadata:

 generateName: hello-world-parameters-

spec:

 # invoke the whalesay template with

 # "hello world" as the argument

 # to the message parameter

 entrypoint: whalesay

 arguments:

   parameters:

   - name: message

     value: hello world


 templates:

 - name: whalesay

   inputs:

     parameters:

     - name: message       #parameter declaration

   container:

     # run cowsay with that message input parameter as args

     image: docker/whalesay

     command: [cowsay]

     args: ["{{inputs.parameters.message}}"]


위의 코드를 살펴보면 먼저 spec.arguments 부분에서 message라는 변수를 선언 하였고, 그 값은 “hello world”로 초기화를 했다.

그리고 워크플로우의 whalesay 태스크에서 message 변수를 input 변수로 사용하도록 선언하였다. 그 후에, args에서 input.parameters.message를 참조하여 message변수의 값을 도커 컨테이너의 실행 변수로 넘기도록 하였다.


만약의 변수의 값을 CLI에서 바꾸고자 한다면 다음과 같이 argo submit시에 -p 옵션을 주면 된다. argo submit {workflow yaml file name} -p {parameter name}={value}

아래는 message=”hello terry”로 바꿔서 실행한 예이다.

ex) argo submit argument.yaml -p message=”hello terry”

출력값 사용

아래 코드를 보자.

아래 코드는 whalesay 컨테이너의 결과를 print-message 컨테이너로 넘기는 코드이다.


apiVersion: argoproj.io/v1alpha1

kind: Workflow

metadata:

 generateName: output-parameter-

spec:

 entrypoint: output-parameter

 templates:

 - name: output-parameter

   steps:

   - - name: generate-parameter

       template: whalesay

   - - name: consume-parameter

       template: print-message

       arguments:

         parameters:

         # Pass the hello-param output from the generate-parameter step as the message input to print-message

         - name: message

           value: "{{steps.generate-parameter.outputs.parameters.hello-param}}"


 - name: whalesay

   container:

     image: docker/whalesay:latest

     command: [sh, -c]

     args: ["echo -n hello world > /tmp/hello_world.txt"]  #generate the content of hello_world.txt

   outputs:

     parameters:

     - name: hello-param       #name of output parameter

       valueFrom:

         path: /tmp/hello_world.txt    #set the value of hello-param to the contents of this hello-world.txt


 - name: print-message

   inputs:

     parameters:

     - name: message

   container:

     image: docker/whalesay:latest

     command: [cowsay]

     args: ["{{inputs.parameters.message}}"]


구조를 보면 다음과 같은데


먼제 whalesay 정의 부분을 보면 outputs.parameters에 hello-param이라는 이름으로 output 변수를 정의하였고, output의 내용은 /tmp/hello_world.txt 파일 내용으로 채워진다.


다음 print-message 컨테이너 정의 부분을 보면 input param으로 message라는 변수를 정의하였다.

steps를 보면, print-message를 실행할때, message 변수의 값을 {{steps.generate-parameter.outputs.parameters.hello-param}} 로 정의하여, print-message의 이전 스탭인 generate-parameter의 output param중에 hello-param이라는 변수의 값으로 채우는 것을 볼 수 있다.


이 흐름을 그림으로 도식화 해보면 다음과 같다.


Whalesay 컨테이너에서 /tmp/hello_world.txt 파일 내용을 hello-param이라는 output param으로 전달하고, print-message 컨테이너는 입력값으로 message라는 param을 받는데, 이 값을 앞단계의 hello-param의 값을 받도록 한것이다.

Artifact

워크플로우 태스크에 대한 입/출력값을 parameter로 전달할 수 도 있지만, CI/CD 빌드 파이프라인에서는 소스코드, 빌드 바이너리가 될 수 도 있고, 빅데이타 파이프라인에서는 데이타 파일과 같이 큰 사이즈의 파일이나 데이타가 될 수 있다. 이 경우 parameter 를 이용해서 넘기기에는 부담이 되는데, 이런 요구 사항을 위해서 제공되는 것이 artifact라는 기능이다. 태스크의 결과값을 로컬 스토리지가 아니라, AWS S3나 GCP GCS와 같은 외부 스토리지에 쓸 수 있게 하고, 반대로 태스크에 대한 입력 값을 외부 스토리지에서 읽어올 수 있게 하는 기능이다.


예를 들어 텐서플로우로 학습을 시키는 파이프라인이 있을때, 학습된 모델을 S3나 GCS에 저장하도록 하는 등의 작업을 할 수 있다.


아래 예제를 보면 앞에서 소개한 generate-artifact → consume-artifact 워크플로우에서 parameter로 값을 넘기는 방식이 artifact 방식으로 바뀐것을 확인할 수 있다. 단순하게 parameter 로 선언한 부분을 artifact로만 변경해주었다.


apiVersion: argoproj.io/v1alpha1

kind: Workflow

metadata:

 generateName: artifact-passing-

spec:

 entrypoint: artifact-example

 templates:

 - name: artifact-example

   steps:

   - - name: generate-artifact

       template: whalesay

   - - name: consume-artifact

       template: print-message

       arguments:

         artifacts:

         # bind message to the hello-art artifact

         # generated by the generate-artifact step

         - name: message

           from: "{{steps.generate-artifact.outputs.artifacts.hello-art}}"


 - name: whalesay

   container:

     image: docker/whalesay:latest

     command: [sh, -c]

     args: ["cowsay hello world | tee /tmp/hello_world.txt"]

   outputs:

     artifacts:

     # generate hello-art artifact from /tmp/hello_world.txt

     # artifacts can be directories as well as files

     - name: hello-art

       path: /tmp/hello_world.txt


 - name: print-message

   inputs:

     artifacts:

     # unpack the message input artifact

     # and put it at /tmp/message

     - name: message

       path: /tmp/message

   container:

     image: alpine:latest

     command: [sh, -c]

     args: ["cat /tmp/message"]


기타

간단하게 기본 개념만 설명했지만, 이외에도 여러가지 기능을 이용하여 좀 더 복잡한 워크플로우를 구현할 수 있다. 예를 들어서 Condition 기능을 이용해서, 조건에 따라서 워크플로우를 분기하거나 조건에 따라 재귀호출을 하는 Recursive 호출, 호출중에 조건에 따라 워크플로우를 종료할 수 있는 기능등을 이용할 수 있다.

이외에도 컨테이너 배포시 사이드카(http://bcho.tistory.com/1256) 를 삽입하여 컨테이너 배포시 로그 수집이나 기타 기능등을 동시에 수행하도록 할 수 있다.


본인은 구글 클라우드의 직원이며, 이 블로그에 있는 모든 글은 회사와 관계 없는 개인의 의견임을 알립니다.

댓글을 달아 주세요

데이타 플로우 프로그래밍 모델의 이해


조대협 (http://bcho.tistory.com)


앞의 글에서 스트리밍 프로세스의 개념과, 데이타 플로우의 스트리밍 처리 개념에 대해서 알아보았다. 그렇다면 실제로 이를 데이타 플로우를 이용해서 구현을 하기 위해서는 어떤 컴포넌트와 프로그래밍 모델을 사용하는지에 대해서 알아보자.


구글 데이타 플로우 프로그래밍 모델은 앞에서 설명한 바와 같이, 전체 데이타 파이프라인을 정의하는 Pipeline, 데이타를 저장하는 PCollections, 데이타를 외부 저장소에서 부터 읽거나 쓰는 Pipeline I/O, 그리고, 입력 데이타를 가공해서 출력해주는 Transforms , 총 4가지 컴포넌트로 구성이 되어 있다.


이번 글에서는 그 중에서 데이타를 가공하는  Transforms 컴포넌트들에 대해서 알아본다.

Transforms

Transforms는 데이타를 어떻게 가공하느냐에 따라서 다음 그림과 같이 세가지 형태로 분류된다.

Element-Wise

개별 데이타 엘리먼트를 단위로 연산을 수행하는 것을 Element-Wise Transform이라고 한다.

ParDo 함수가 이에 해당하며, 하나의 데이타를 입력 받아서, 연산을 한 후, 하나의 출력을 내보내는 연산이다.

ParDo 함수는 DoFn으로 정의된 연산 로직을 병렬로 여러개의 프로세스와 쓰레드에서 나눠서 처리를 해준다.


DoFn 클래스 는 다음과 같은 포맷으로 정의한다.


static class MyFunction extends DoFn<{입력 데이타 타입},{출력 데이타 타입}>{

     @Override

     public void processElement(ProcessContext c) {

       // do Something

     }

}


DoFn 클래스를 상속 받아서 클래스를 정의하고, 클래스 정의시 제네릭 타입 (Generic type)으로, 첫번째는 입력 데이타 타입을 두번째는 출력 데이타 타입을 정의한다.

그리고 processElement 함수를 오버라이드(Override)해서, 이 함수안에 연산 로직을 구현한다.

processElement는 ProcessContext를 인자로 받는데, 이 Context에는 입력 데이타를 포함하고 있다.

입력 데이타를 꺼내려면 c.element()라는 메서드를 이용하면, 입력 데이타 타입으로 정의된 입력 데이타를 꺼낼 수 있다.

데이타를 처리한 다음에 파이프라인상의 다음 컴포넌트로 데이타를 보내려면 c.output ({출력 데이타} ) 형태로 정의를 해주면 된다.


이렇게 정의된 DoFn함수는  ParDo를 이용하여 파이프라인 상에서 병렬로 실행될 수 있다. ParDo는, 파이프라인상에서 .apply 메서드를 이용하여 적용한다.


그러면 실제로 어떻게 적용이 되는지 다음 예제를 보자.

   p.apply(ParDo.named("toUpper").of(new DoFn<String, String>() {

     @Override

     public void processElement(ProcessContext c) {

       c.output(c.element().toUpperCase());

     }

   }))


String 인자를 입력으로 받아서, String 인자로 출력을 하는 DoFn 함수를 정의하였다.

processElement 부분에서, c.element()로 String 입력 값을 읽은 후, toUpperCase() 함수를 적용하여 대문자로 변환을 한 후, c.output을 이용하여 다음 파이프라인으로 출력 데이타를 넘겼다.


조금 더 디테일한 예제를 보자

p.apply(Create.of("key1,Hello", "key2,World","key1,hello","key3,boy","key4,hello","key2,girl"))

.apply(ParDo.named("Parse").of(new DoFn<String, KV<String,String>>() {

@Override

public void processElement(ProcessContext c) {

StringTokenizer st = new StringTokenizer(c.element(),",");

String key = st.nextToken();

String value = st.nextToken();


KV<String,String> outputValue =  KV.of(key,value);

c.output(outputValue);

}

}))

Create.of를 이용하여 “Key.Value” 문자열 형태로 데이타를 생성한 후, 이 문자열을 읽어서, DoFn<String,KV<String,String>> 클래스에서 이를 파싱하여, 문자열을 키밸류 데이타형인 KV 데이타 형으로 변환하여 리턴하는 예제이다. 아래 개념도를 보자


입력 값은 String 데이타 타입으로 “Key.Value”라는 형태의 문자열을 받는다.

DoFn에서 처리한 출력 값은 KV 형으로 KV 데이타 타입은 키와 값을 가지고 있는데, 키와 값의 타입도 제네릭 타입으로 키는 String, 값은 String 타입으로 정의하였다. 입력된 “Key.Value” 문자열은 “.” 전후로 분리가 되서, “.” 좌측은 키로, 우측은 값으로 해서, KV에 각각 들어간다.

processElement를 보면, c.element를 이용하여 String 문자열을 꺼낸 후, StringTokenizer를 이용하여 “.”을 분류 문자로 정의해서 파싱한다. 첫번째를 키로 저장하고, 두번째를 값으로 저장한다.

KV<String,String> outputValue =  KV.of(key,value)

를 이용하여, KV 객체를 생성한 후, c.output(outputValue); 을 이용하여 다음 파이프라인 컴포넌트로 값을 전달한다.


시스템내에서 수행되는 방법은 다음과 같이 된다. ParDo에 의해서 DoFn 클래스가 여러개의 워커에 의해서 분산 처리가된다.

어그리게이션(Aggregation)

어그리게이션 값을 특정 키 값을 이용하여 데이타를 그룹핑을 하는 개념이다.

이러한 어그리게이션 작업은 내부적으로 셔플링(Shuffling)이라는 개념을 이용해서 이루어지는데, 키별로 데이타를 모으거나 키별로 합산등의 계산을 하기 위해서는, 키별로 데이타를 모아서 특정 워커 프로세스로 보내야 한다.

ParDo를 이용하여 병렬 처리를 할 경우, 데이타가 키값에 상관 없이 여러 워커에 걸쳐서 분산되서 처리되기 때문에, 이를 재 정렬해야 하는데, 이 재 정렬 작업을 셔플링이라고 한다.


아래 그림을 보자, 파이프라인 상에서 첫번째 프로세스를 Worker 1과 Worker 2가 처리를 하였고, 결과는 Key1과  Key2를 키로 가지는 데이타라고 하자, 이를 어그리게이션 하면 아래 그림과 같이 Key1 데이타는 Worker 3로 모이고, Key 2 데이타는 Worker 4로 모인다. 이런 방식으로 셔플링이 이루어진다.


데이타 플로우의 어그리게이션 에는 특정 키별로 데이타를 묶어주는 Grouping과, 특정 키별로 데이타를 연산(합이나 평균을 내는)하는  Combine 두 가지가 있다.

Grouping

PCollection<KV<String, Integer>> wordsAndLines = ...;

에서 다음과 같이 String,Integer 페어로 KV 타입을 리턴한다고 하자.


 cat, 1
 dog, 5
 and, 1
 jump, 3
 tree, 2
 cat, 5
 dog, 2
 and, 2
 cat, 9
 and, 6
 ...


이를 다음과 같이 키에 따라서 밸류들을 그룹핑 하려면 GroupByKey를 사용해야 한다.


 cat, [1,5,9]
 dog, [5,2]
 and, [1,2,6]
 jump, [3]
 tree, [2]


PCollection<KV<String, Iterable<Integer>>> groupedWords = wordsAndLines.apply(

   GroupByKey.<String, Integer>create());


코드는 앞 단계에서 KV<String,Integer>로 들어온 wordLines 데이타를 GroupByKey를 이용하여 Key 단위로 그룹핑을 한후 이를 <String, Iterable<Integer>> 타입으로 리턴하는 Transformation이다.

<String, Iterable<Integer>>에서 앞의 String은 키가 되며, Iterable 다수의 값을 가지고 있는 밸류가 된다. 이 밸류는 Integer 타입으로 정의된다.

Combine

Grouping이 키 단위로 데이타를 묶어서 분류해주는 기능이라면, Combine은 키단위로 데이타를 묶어서 연산을 해주는 기능이다.

예를 들어 앞의 예제처럼, “cat”이라는 문자열 키로 된 데이타들이 [1,5,9] 가 있을때, 이에 대한 총합이나 평균등을 내는 것이 Combine 이다.


PCollection<Integer> pc = ...;

 PCollection<Integer> sum = pc.apply(

   Combine.globally(new Sum.SumIntegerFn()));


코드는 Integer로 들어오는 모든 값을 Combine에서 Sum 기능을 이용하여 모든 값을 더하는 코드이다.

전체 데이타에 대해서 적용하기 때문에, Combine.globally로 적용하였다.


아래와 같은 형태의 데이타가 있다고 가정하자. 키에 따라서 값이 그룹핑이 된 형태이다.

 cat, [1,5,9]
 dog, [5,2]
 and, [1,2,6]
 jump, [3]
 tree, [2]


PCollection<KV<String, Integer>> occurrences = ...;

 PCollection<KV<String, Iterable<Integer>>> grouped = pc.apply(GroupByKey.create());

 PCollection<KV<String, Integer>> firstOccurrences = pc.apply(

   Combine.groupedValues(new Min.MinIntegerFn()));


위의 데이타들이 PCollection<KV<String, Iterable<Integer>>> grouped

에 저장되었다고 할때, 각 키별로 최소값을 구하는 것을 Combine.groupedValue에서 Min을 호출하여 최소값을 구했다.


Transforms 컴포넌트의 기본적인 종류들을 알아보았다. 이외에도, 하나의 Transform 안에 여러개의 Transform을 집어 넣는 Composite Transform이나, 두개 이상의 데이타 스트림에서 데이타를 키에 따라 JOIN하는 기능들이 있는데, 이러한 고급 기능들은 뒤에 고급 프로그래밍 모델에서 설명하기로 한다.

PCollection

PCollection은 데이타 플로우 파이프라인 내에서 데이타를 저장하는 개념이다.

데이타는 이 PCollection 객체에 저장이되서, 파이프라인을 통해서 Transform으로 넘겨진다.

PCollection은 한번 생성이 되면, 그 데이타를 수정이 불가능하다. (데이타를 변경하거나 수정하기 위해서는 PCollection을 새로 생성해야 한다.)

Bounded & Unbounded PCollection

PCollection은 데이타의 종류에 따라 Bounded PCollection과 Unbounded PCollection 두가지로 나뉘어 진다.

Bounded PCollection

Bounded PCollection은 배치 처리 처럼, 데이타가 변화하지 않는 데이타로 파일이나, 업데이트가 더 이상 발생하지 않는 테이블등을 생각하면 된다.

TextIO,BigQueryIO,DataStoreIO등을 이용해서 데이타를 읽은 경우에는 Bounded PCollection으로 처리가 된다.

Bounded PCollection 데이타들은 배치 처리의 특성상 데이타를 한꺼번에 읽어서 처리한다.  

Unbounded PCollection

Unbounded PCollection은, 데이타가 계속 증가 하는 즉 흐르는 데이타를 표현한다. 트위터 타임 라인이나, 스마트 폰에서 서버로 올라오는 이벤트 로그등을 예로 들 수 있다.

이러한 Unbounded PCollection은 시간의 개념을 가지고 윈도우 처리등을 해야하기 때문에, PCollection 객체내에 타임 스탬프가 자동으로 붙는다.

UnBounded PCollection은 데이타를 BigQueryIO또는 Pub/Sub에서 읽을 때 생성된다.

특히 Unbounded PCollection에 Grouping이나, Combine등을 사용할 경우, 데이타가 파이프라인 상에서 언제 그룹핑된 데이타를 다음 Transform 컴포넌트로 넘겨야할지를 정의해야 하기 때문에, Window를 반드시 사용해야 한다.

데이타 타입

PCollection을 이용해서 정의할 수 있는 주요 데이타 타입은 다음과 같다.

일반 데이타 타입

PCollection<Integer> data

가장 기초적인 데이타 형으로, Integer,Float,String 등 자바의 일반 데이타 타입으로 정의되고 하나의 데이타 만을 저장한다.

KV 데이타 타입

PCollection< KV<String,Integer> key_value_data

키 밸류 데이타 타입으로, 키와 값으로 구성된다. 키와 값은 일반 데이타 타입과 같게, 자바의 일반 데이타 타입 사용이 가능하다.


PCollection<KV<String, Iterable<Integer>>>

키 밸류 데이타 타입중에 값에 여러개의 값을 넣을 수 있는 Iterable 타입이 있다.

앞의 Transform 예제에서 언급된것과 같이 키가 cat이고, 그 값이 2,6,7,9 와 같이 여러 값을 가지는 형이 이러한 타입에 해당한다.

커스텀 데이타 타입

단순한 데이타 타입 말고도, 복잡한 데이타 형을 처리하기 위해서  커스텀 데이타 타입을 지원한다.

커스텀 데이타 타입은 오픈 소스 Avro의 데이타 모델을 사용한다. http://avro.apache.org/


예를 들어 어떤 키에 대해서 카운트값,평균,총합 그리고 윈도우의 시작과 끝 시간을 저장하는 데이타 타입 Stat가 있다고 가정하자. 이 데이타 타입은 다음과 같이 정의된다.

자바에서 일반적인 Value Object 형태로 정의를 하면되고, 단 앞에 어노테이션으로 @DefaultCoder(AvroCoder.class) 와 같이 Avro 데이타 클래스임을 정의하면 된다.


@DefaultCoder(AvroCoder.class)

static class Stat{

Float sum;

Float avg;

Integer count;

Integer key;

Instant wStart; // windowStartTime

Instant wEnd; // windowEndTime

public Instant getwStart() {

return wStart;

}

public Instant getwEnd() {

return wEnd;

}


public Float getSum() {

return sum;

}

public Float getAvg() {

return avg;

}

public Integer getCount() {

return count;

}


public Integer getKey(){

return key;

}


public Stat(){}

public Stat(Integer k,Instant start,Instant end,Integer c,Float s,Float a){

this.key = k;

this.count = c;

this.sum = s;

this.avg = a;

this.wStart = start;

this.wEnd = end;

}


}



윈도우

스트리밍 데이타 처리를 할때 가장 중요한 개념이 윈도우이다.

특히나  Unbounded 데이타를 이용한 스트리밍 처리에서 Grouping이나 Combine 시에는 반드시 윈도우를 사용해야 한다.Grouping이나 Combine과 같은 aggregation을 하지 않으면, Unbounded 데이타라도 윈도우 처리가 필요없다.

또한 Bounded 데이타도, 데이타에 타임 스탬프를 추가하면 윈도우를 사용하여, 시간대별 데이타를 처리할 수 있다.

예를 들어 일일 배치를 돌리는 구매 로그가 있을때, 각 데이타의 구매 시간이 있으면, 이 구매시간을 타임 스탬프로 지정하여 배치라도 윈도우 단위로 연산을 할 수 있다.

고정 윈도우 적용

윈도우를 적용하는 방법은 정말 간단하다.

PCollection 객체에 apply 메소드를 이용하여 Window.into 메서드를 이용하여 적용하면된다.

예를 들어서 아래와 같이 PCollection<String> 형 데이타인 items 객체가 있을때, 여기에 윈도우를 적용하려면 다음과 같이 하면 된다.

 PCollection<String> items = ...;

 PCollection<String> fixed_windowed_items = items.apply(

   Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES)));

items.apply를 이용하여 윈도우를 적용하는데, 데이타 타입이 String이기 때문에, Window.<String>into 로 윈도우를 적용하였다.

윈도우 타입은 Fixed 윈도우이기 때문에, FixedWindows.of를 사용하였고, 윈도우 주기는 1분주기라서 1,TimeUnit.MINUTES를 적용하였다.

슬라이딩  윈도우 적용

슬라이딩 윈도우는 윈도우의 크기 (Duration)과 주기를 인자로 넘겨야 한다.

아래 코드는 5초 주기로 30분 크기의 윈도우를 생성하는 예제이다.

 PCollection<String> items = ...;

 PCollection<String> sliding_windowed_items = items.apply(    Window.<String>into(SlidingWindows.of(Duration.standardMinutes(30)).every(Duration.standardSeconds(5))));

윈도우가 5초마다 생성이되고, 각 윈도우는 30분 단위의 크기를 가지고 있게 된다.

세션 윈도우 적용

세션 윈도우는 HTTP 세션 처럼 특정 사용자가 일정 시간동안 데이타가 올라오지 않으면, 처음 데이타가 올라온 시간 부터 데이타가 올라오지 않은 시간 까지를 윈도우르 묶어주는 개념이다.

앞의 고정 윈도우나, 세션 윈도우와는 다르게 반드시 키별로 세션을 묶기 때문에 키가 필요하다.


아래 예제는 각 사용자 별로 세션당 점수를 계산해주는 예제이다.

userEvents

 .apply(Window.named("WindowIntoSessions")

       .<KV<String, Integer>>into(

             Sessions.withGapDuration(Duration.standardMinutes(Duration.standardMinutes(10))))

   .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))

 // For this use, we care only about the existence of the session, not any particular

 // information aggregated over it, so the following is an efficient way to do that.

 .apply(Combine.perKey(x -> 0))

 // Get the duration per session.

 .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))



다른 윈도우들과 마찬가지로 Window.into를 이용하여 윈도우를 적용하는데, 데이타 형을 잘 보면 <KV<String, Integer>> 형으로 정의된것을 확인할 수 있다.

Sessions.withGapDuration으로 세션 윈도우를 정의한다. 이때 얼마간의 시간 동안 데이타가 없으면 세션으로 짜를지를 지정해줘야 하는데, Duration.standardMinutes(10) 를 이용하여 10분간 해당 사용자에 대해서 데이타가 없으면 해당 사용자(키)에 대해서 세션 윈도우를 자르도록 하였다.

윈도우 시간 조회하기

윈도우를 사용하다보면, 이 윈도우의 시작과 종료 시간이 필요할때가 있다. 예를 들어 고정 크기 윈도우를 적용한다음에, 이를 데이타 베이스등에 저장할때, 이 윈도우가 언제시작해서 언제끝난 윈도우인지를 조회하여 윈도우 시작 시간과 함께 값을 저장하고자 하는 케이스이다. “1시에 몇건, 1시 10분에 몇건의 데이타가 저장되었다”와 같은 시나리오이다.


현재 윈도우에 대한 정보를 얻으려면 DoFn 클래스를 구현할때, com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess 인터페이스를 implement 해야, 윈도우에 대한 정보를 억세스 할 수 있다.


static class StaticsDoFn extends DoFn<KV<Integer,Iterable<Twit>>, KV<Integer,Stat>>

implements com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess

{

@Override

public void processElement(ProcessContext c) {

:


IntervalWindow w = (IntervalWindow) c.window();

Instant s = w.start();

Instant e = w.end();

DateTime sTime = s.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTime eTime = e.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTimeFormatter dtf = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss");

String str_stime = sTime.toString(dtf);

String str_etime = eTime.toString(dtf);

                                      :


윈도우에 대한 정보는 ProcessContext c에서, c.window()를 호출하면, IntervalWindow라는 클래스로 윈도우에 대한 정보를 보내주고, 윈도우의 시작 시간과 종료 시간은 IntervalWindow 클래스의 start()와 end() 메서드를 이용해서 조회할 수 있다. 이 조회된 윈도우의 시작과 종료 시간은 org.joda.time.Instant 타입으로 리턴되는데, 조금 더 친숙한 DateTime 포맷으로 변환을 하려면, Instant.toDate() 메서드를 사용하면 되고, 이때, TimeZone 을 지정해주면 로컬 타임으로 변환을 하여, 윈도우의 시작과 종료시간을 조회할 수 있다.

타임 스탬프

윈도우는 시간을 기준으로 처리를 하기 때문에, 이 시간을 정의하는 타임스탬프를 어떻게 다루는지를 이해할 필요가 있다.

타임 스탬프 생성

Pub/Sub을 이용하여 unbounded 데이타를 읽을 경우 타임스탬프가 Pub/Sub에 데이타가 들어간 시간을 Event time으로 하여, PCollection에 추가된다.

타임 스탬프 지정하기

Pub/Sub에서와 같이 자동으로 타임 스템프를 부여하는게 아니라, 모바일 디바이스에서 발생한 이벤트 타임이나, 애플리케이션 적으로 PCollection에 직접 타임스탬프를 부여할 수 있다.

PCollection에 타임 스탬프를 부여 하는 방법은 간단하다.

DoFn내에서,  ProcessContext를 다음 파이프라인 컴포넌트로 보낼때,  c.output(데이타) 대신, c.output(데이타, 타임 스탬프)를 사용하면 된다. 타임 스탬프의 데이타 타입은  org.joda.time.Instant 를 사용한다.

예를 들어서

c.outputWithTimestamp(c.element(), logTimeStamp);

와 같은 방법으로 사용을 한다.


모바일 서비스 분석등, 실제 시간에 근접한 분석을 하려면, 로그를 수집할때, 이벤트 발생 시간을 같이 REST API등을 통해서 수집하고, outputWithTimestamp를 이용하여, 수집된 이벤트 발생시간을  PCollection에 추가하는 방식으로 타임 스탬프를 반영 하는 것이 좋다.







본인은 구글 클라우드의 직원이며, 이 블로그에 있는 모든 글은 회사와 관계 없는 개인의 의견임을 알립니다.

댓글을 달아 주세요

  1. Ahn 2016.09.01 20:21  댓글주소  수정/삭제  댓글쓰기

    좋은 글 잘 보았습니다!