블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 


컨테이너 기반의 워크플로우 솔루션 argo

조대협 (http://bcho.tistory.com)


argo는 컨테이너 워크플로우 솔루션이다.

컨테이너 기반으로 빅데이타 분석, CI/CD, 머신러닝 파이프라인을 만들때 유용하게 사용할 수 있는 오픈 소스 솔루션으로 개념은 다음과 같다.


워크플로우를 정의하되 워크플로우의 각각의 스텝을 컨테이너로 정의한다.

워크플로우 스펙은 YAML로 정의하면, 실행할때 마다 컨테이너를 생성해서, 작업을 수행하는 개념이다.


기존에 아파치 에어플로우 (https://airflow.apache.org/)등 많은 워크 플로우 솔루션이 있지만, 이러한 솔루션은 컴포넌트가 VM/컨테이너에서 이미 준비되서 돌고 있음을 전제로 하고, 각각의 컴포넌트를 흐름에 따라서 호출하는데 목적이 맞춰서 있다면, argo 의 경우는 워크플로우를 시작하면서 컨테이너를 배포하고, 워크플로우 작업이 끝나면 컨테이너가 종료되기 때문에, 실행할때만 컨테이너를 통해서 컴퓨팅 자원을 점유하기 때문에 자원 활용면에서 장점이 있다고 볼 수 있다.


argo 설치는 쿠버네티스 클러스터가 있는 상태라면 https://argoproj.github.io/docs/argo/demo.html 를 통해서 간단하게 설치가 가능하다. 설치와 사용법은 위의 문서링크를 활용하기 바란다.

HelloWorld

간단한 워크플로우 예제를 살펴보자. 워크 플로우를 실행하기 위해서는 워크플로우 스펙을 yaml 파일로 정의해야 한다. 아래는 helloworld 의 간단한 예제이다.


apiVersion: argoproj.io/v1alpha1

kind: Workflow

metadata:

 generateName: hello-world-

spec:

 entrypoint: whalesay

 templates:

 - name: whalesay

   container:

     image: docker/whalesay:latest

     command: [cowsay]

     args: ["hello world"]


워크플로우의 이름은 metadata 부분에 generateName에서 워크플로우 JOB의 이름을 정의할 수 있다. 여기서는 hello-world-로 정의했는데, 작업이 생성될때 마다 hello-world-xxx 이라는 이름으로 작업이 생성된다.

Templates 부분에 사용하고자 하는 컨테이너를 정의한다. 위의 예제에서는 docker/whalesay:latest 이미지로 컨테이너를 생성하도록 하였고, 생성후에는 “cosway”라는 명령어를 “hello world” 라는 인자를 줘서 실행하도록 하였다.

Template 부분에는 여러개의 컨테이너를 조합하여, 어떤 순서로 실행할지를 정의한다. 이 예제에서는 하나의 컨테이너만 실행하도록 정의하였다.

다음에 어느 컨테이너 부터 시작하게 할것인지는 sepc 부분에 정의하고, 시작 부분은 entrypoint라는 구문을 이용해서 정의할 수 있다.이 예제에서는 template에 정의한 whalesay라는 컨테이너부터 실행하도록 한다.

이렇게 생성된 워크플로우 스펙은 “argo submit”이라는 CLI 명령을 이용해서 실행한다.


%argo submit --watch {yaml filename}


워크플로우가 실행되면 각 단계별로 쿠버네티스 Pod 가 생성되고, 생성 결과는 argo logs {pod name}으로 확인할 수 있다.


%argo list

명령을 이용하면 argo 워크플로우의 상태를 확인할 수 있다.



위의 그림과 같이 hello-world는 hello-world-smjxq 라는 작업으로 생성되었다.

Pod 명은 이 {argo 작업이름}-xxx 식으로 명명이 된다.

%kubectl get pod

명령으로 확인해보면 아래 그림과 같이 hello-world-smjxq 라는 이름으로 pod가 생성된것을 확인할 수 있다.


이 pod의 실행 결과를 보기 위해서

%argo logs hello-world-smjxq

명령을 실행하면 된다.


위의 그림과 같이 고래 그림을 결과로 출력한것을 확인할 수 있다.

ArgoUI

워크플로우의 목록과 실행결과는 CLI뿐 아니라 웹 기반의 GUI에서도 확인이 가능하다.

argo ui는 argo라는 이름의 deployment에 생성이 되어 있는데, clusterIP (쿠버네티스 내부 IP)로 생성이 되어 있기 때문에 외부에서 접근이 불가능하다. 포트포워딩 기능을 이용해서 argo deployment의 8001 포트를 로컬 PC로 포워딩해서 접속할 수 있다.


% kubectl -n argo port-forward deployment/argo-ui 8001:8001


다음에 http://localhost:8001을 이용해서 접속해보면 다음과 같이 현재 등록되어 있는 워크플로우 목록을 확인할 수 있다.




이 목록에서 아까 수행한 hello-world-xxx 워크플로우를 확인해보자. 아래 그림과 같이 워크플로우의 구조를 보여준다.



hello-world-xxx 노드를 클릭하면 각 노드의 상세 내용을 볼 수 있다.


그림에서 Summary > Logs 부분을 선택하면 아래 그림과 같이 각 단계별로 실행한 결과 로그를 볼 수 있다.


연속된 작업의 실행

앞에서 간단한 설치 및 사용법에 대해서 알아봤는데, 앞에서 살펴본 예제는 하나의 태스크로 된 워크플로우이다. 워크플로우는 좀더 복잡하게 여러개의 태스크를 순차적으로 실행하거나 또는 병렬로 실행이 된다.


예제 원본 https://argoproj.github.io/docs/argo/examples/README.html


다음 워크플로우 정의를 보자

apiVersion: argoproj.io/v1alpha1

kind: Workflow

metadata:

 generateName: steps-

spec:

 entrypoint: hello-hello-hello


 # This spec contains two templates: hello-hello-hello and whalesay

 templates:

 - name: hello-hello-hello

   # Instead of just running a container

   # This template has a sequence of steps

   steps:

   - - name: hello1            #hello1 is run before the following steps

       template: whalesay

       arguments:

         parameters:

         - name: message

           value: "hello1"

   - - name: hello2a           #double dash => run after previous step

       template: whalesay

       arguments:

         parameters:

         - name: message

           value: "hello2a"

     - name: hello2b           #single dash => run in parallel with previous step

       template: whalesay

       arguments:

         parameters:

         - name: message

           value: "hello2b"


 # This is the same template as from the previous example

 - name: whalesay

   inputs:

     parameters:

     - name: message

   container:

     image: docker/whalesay

     command: [cowsay]

     args: ["{{inputs.parameters.message}}"]


구조를 살펴보면 다음과 같다.



  • Metadata 부분에 generateName으로 이 워크플로우의 이름을 정의했다. 워크플로우 작업이 실행될때마다 steps-xxx  라는 이름으로 생성이 된다.

  • 워크 플로우는 templates 부분에 spec 부분에 정의하는데, hello1 작업을 수행한 후에, hello2a,hello2b 를 동시에 실행한다. 동시 실행인지 순차 실행인지는 steps의 인덴트(탭으로 띄워쓰기 한부분)을 확인하면 되는데, hello1은 - -name: hello1으로 정의 하였고 다음 단계는 - -name: hello2a와, - name:hello2b로 지정하였다. 잘 보면, hello2b는 “-”가 두개가 아니고 한개로 되어 있고 hello2a와 같은 띄어 쓰기로 되어 있는 것을 볼 수 있다.

  • 마지막으로 이 워크플로우에서 사용되는 컨테이너 이미지를 정의하면 된다.


실행시에 --watch 옵션을 주면, 각 단계별 실행 상태와, 워크플로우 그래프의 구조를 볼 수 있다.


%argo submit --watch helloworld-seq.yaml


위의 그림을 보면 step-h44qf라는 이름으로 작업이 수행되는데, hello1이 먼저 실행되고 다음에 hello2a,hello2b가 동시에 실행된것을 확인할 수 있다.


실행후에 UI에서 실행 내역을 확인해보면 다음과 같이 hello1이 먼저 실행 된 후에,  hello2a,hello2b가 병렬로 동시에 실행된것을 확인할 수 있다.



DAG를 이용한 워크플로우 정의

Yaml 파일 형식이 워크플로우의 실행 순서를 정의할 경우 명시성이 떨어져서 가독성 측면에서 읽기에 불편할 수 있는데, DAG(Directed acyclic graph)를 이용하면 조금 더 명시적으로 워크플로우 정의가 가능하다.



apiVersion: argoproj.io/v1alpha1

kind: Workflow

metadata:

 generateName: dag-diamond-

spec:

 entrypoint: diamond

 templates:

 - name: echo

   inputs:

     parameters:

     - name: message

   container:

     image: alpine:3.7

     command: [echo, "{{inputs.parameters.message}}"]

 - name: diamond

   dag:

     tasks:

     - name: A

       template: echo

       arguments:

         parameters: [{name: message, value: A}]

     - name: B

       dependencies: [A]

       template: echo

       arguments:

         parameters: [{name: message, value: B}]

     - name: C

       dependencies: [A]

       template: echo

       arguments:

         parameters: [{name: message, value: C}]

     - name: D

       dependencies: [B, C]

       template: echo

       arguments:

         parameters: [{name: message, value: D}]


entrypoint는 diamond dag를 실행하도록 한다.

dag 정의 부분을 보면 맨 앞에 name: A인 task를 실행하도록 하고, 다음 B,C는 A에 의존성을 가지도록 한다.D는 B,C의 의존성을 가지게 해서 실행 순서는 A→ B,C → D형태가 된다.

다음과 같은 순서로 실행이 된다.



입력/출력값 전달

argo의 개념과 워크플로우의 개념을 이해했으면, 워크플로우에서 태스크간의 데이타를 어떻게 전달하는지 살펴보도록 하자.

입력값의 전달

argo에서 변수를 입력값으로 사용하는 방법은 간단하다. 먼저 변수를 정의한 다음에, 정의된 변수를 입력이나 출력으로 사용할지 워크플로우의 태스크에서 정의한후, 그 변수를 사용하면 된다.


apiVersion: argoproj.io/v1alpha1

kind: Workflow

metadata:

 generateName: hello-world-parameters-

spec:

 # invoke the whalesay template with

 # "hello world" as the argument

 # to the message parameter

 entrypoint: whalesay

 arguments:

   parameters:

   - name: message

     value: hello world


 templates:

 - name: whalesay

   inputs:

     parameters:

     - name: message       #parameter declaration

   container:

     # run cowsay with that message input parameter as args

     image: docker/whalesay

     command: [cowsay]

     args: ["{{inputs.parameters.message}}"]


위의 코드를 살펴보면 먼저 spec.arguments 부분에서 message라는 변수를 선언 하였고, 그 값은 “hello world”로 초기화를 했다.

그리고 워크플로우의 whalesay 태스크에서 message 변수를 input 변수로 사용하도록 선언하였다. 그 후에, args에서 input.parameters.message를 참조하여 message변수의 값을 도커 컨테이너의 실행 변수로 넘기도록 하였다.


만약의 변수의 값을 CLI에서 바꾸고자 한다면 다음과 같이 argo submit시에 -p 옵션을 주면 된다. argo submit {workflow yaml file name} -p {parameter name}={value}

아래는 message=”hello terry”로 바꿔서 실행한 예이다.

ex) argo submit argument.yaml -p message=”hello terry”

출력값 사용

아래 코드를 보자.

아래 코드는 whalesay 컨테이너의 결과를 print-message 컨테이너로 넘기는 코드이다.


apiVersion: argoproj.io/v1alpha1

kind: Workflow

metadata:

 generateName: output-parameter-

spec:

 entrypoint: output-parameter

 templates:

 - name: output-parameter

   steps:

   - - name: generate-parameter

       template: whalesay

   - - name: consume-parameter

       template: print-message

       arguments:

         parameters:

         # Pass the hello-param output from the generate-parameter step as the message input to print-message

         - name: message

           value: "{{steps.generate-parameter.outputs.parameters.hello-param}}"


 - name: whalesay

   container:

     image: docker/whalesay:latest

     command: [sh, -c]

     args: ["echo -n hello world > /tmp/hello_world.txt"]  #generate the content of hello_world.txt

   outputs:

     parameters:

     - name: hello-param       #name of output parameter

       valueFrom:

         path: /tmp/hello_world.txt    #set the value of hello-param to the contents of this hello-world.txt


 - name: print-message

   inputs:

     parameters:

     - name: message

   container:

     image: docker/whalesay:latest

     command: [cowsay]

     args: ["{{inputs.parameters.message}}"]


구조를 보면 다음과 같은데


먼제 whalesay 정의 부분을 보면 outputs.parameters에 hello-param이라는 이름으로 output 변수를 정의하였고, output의 내용은 /tmp/hello_world.txt 파일 내용으로 채워진다.


다음 print-message 컨테이너 정의 부분을 보면 input param으로 message라는 변수를 정의하였다.

steps를 보면, print-message를 실행할때, message 변수의 값을 {{steps.generate-parameter.outputs.parameters.hello-param}} 로 정의하여, print-message의 이전 스탭인 generate-parameter의 output param중에 hello-param이라는 변수의 값으로 채우는 것을 볼 수 있다.


이 흐름을 그림으로 도식화 해보면 다음과 같다.


Whalesay 컨테이너에서 /tmp/hello_world.txt 파일 내용을 hello-param이라는 output param으로 전달하고, print-message 컨테이너는 입력값으로 message라는 param을 받는데, 이 값을 앞단계의 hello-param의 값을 받도록 한것이다.

Artifact

워크플로우 태스크에 대한 입/출력값을 parameter로 전달할 수 도 있지만, CI/CD 빌드 파이프라인에서는 소스코드, 빌드 바이너리가 될 수 도 있고, 빅데이타 파이프라인에서는 데이타 파일과 같이 큰 사이즈의 파일이나 데이타가 될 수 있다. 이 경우 parameter 를 이용해서 넘기기에는 부담이 되는데, 이런 요구 사항을 위해서 제공되는 것이 artifact라는 기능이다. 태스크의 결과값을 로컬 스토리지가 아니라, AWS S3나 GCP GCS와 같은 외부 스토리지에 쓸 수 있게 하고, 반대로 태스크에 대한 입력 값을 외부 스토리지에서 읽어올 수 있게 하는 기능이다.


예를 들어 텐서플로우로 학습을 시키는 파이프라인이 있을때, 학습된 모델을 S3나 GCS에 저장하도록 하는 등의 작업을 할 수 있다.


아래 예제를 보면 앞에서 소개한 generate-artifact → consume-artifact 워크플로우에서 parameter로 값을 넘기는 방식이 artifact 방식으로 바뀐것을 확인할 수 있다. 단순하게 parameter 로 선언한 부분을 artifact로만 변경해주었다.


apiVersion: argoproj.io/v1alpha1

kind: Workflow

metadata:

 generateName: artifact-passing-

spec:

 entrypoint: artifact-example

 templates:

 - name: artifact-example

   steps:

   - - name: generate-artifact

       template: whalesay

   - - name: consume-artifact

       template: print-message

       arguments:

         artifacts:

         # bind message to the hello-art artifact

         # generated by the generate-artifact step

         - name: message

           from: "{{steps.generate-artifact.outputs.artifacts.hello-art}}"


 - name: whalesay

   container:

     image: docker/whalesay:latest

     command: [sh, -c]

     args: ["cowsay hello world | tee /tmp/hello_world.txt"]

   outputs:

     artifacts:

     # generate hello-art artifact from /tmp/hello_world.txt

     # artifacts can be directories as well as files

     - name: hello-art

       path: /tmp/hello_world.txt


 - name: print-message

   inputs:

     artifacts:

     # unpack the message input artifact

     # and put it at /tmp/message

     - name: message

       path: /tmp/message

   container:

     image: alpine:latest

     command: [sh, -c]

     args: ["cat /tmp/message"]


기타

간단하게 기본 개념만 설명했지만, 이외에도 여러가지 기능을 이용하여 좀 더 복잡한 워크플로우를 구현할 수 있다. 예를 들어서 Condition 기능을 이용해서, 조건에 따라서 워크플로우를 분기하거나 조건에 따라 재귀호출을 하는 Recursive 호출, 호출중에 조건에 따라 워크플로우를 종료할 수 있는 기능등을 이용할 수 있다.

이외에도 컨테이너 배포시 사이드카(http://bcho.tistory.com/1256) 를 삽입하여 컨테이너 배포시 로그 수집이나 기타 기능등을 동시에 수행하도록 할 수 있다.


본인은 구글 클라우드의 직원이며, 이 블로그에 있는 모든 글은 회사와 관계 없는 개인의 의견임을 알립니다.

댓글을 달아 주세요

Apache airflow


조대협 (http://bcho.tistory.com)

배경

빅데이타 분석이나, 머신러닝 코드를 만들다 보면 필요한것중에 하나가 여러개의 태스크를 연결해서 수행해야 할 경우가 있다. 데이타 베이스의 ETL 작업과 비슷한 흐름이라고 보면 된다.


예를 들어 머신러닝의 학습 과정을 보면 데이타 전처리,학습,배포,예측과 같은 단계를 가지게 된다.


  • rawdata를 읽어서 preprocessing 단계를 거쳐서 학습에 적절한 training data로 변경하고,

  • 변경된 training data를 가지고 머신러닝 모델을 학습한후, 학습된 모델을 저장한다.

  • 학습된 모델을 가지고 예측을 해서 결과를 저장한다.


이렇게 머신러닝은 여러개의 단계를 거쳐서 수행이 되는데, 각 단계가 끝나면 다음 단계를 수행해야 한다. 단순하게 CRON+쉘로 순차적으로 수행하는 것등이 가능하지만, 에러가 났을때 재처리를 하거나 , 수행 결과에 따라 분기를 하는 등 조금 더 구조화된 도구가 필요하다.

데이타 워크 플로우 관리 도구

이런 요구 사항 때문에 여러가지 툴이 개발되었는데, 대표적인 도구로는 하둡 에코시스템에 우지(oozie ) 등이 있다.



<그림. Oozie eclipse 클라이언트 >


하둡의 여러 에코 시스템 솔루션들을 유기적으로 조합하기 위해서 개발된 도구로, 하둡 에코 시스템에 있는 여러가지 다양한 솔루션과 연동하기 위한 아답터를 가지고 있다.

이외에도 rundeck, luigi와 같은 유사한 솔루션들이 있다.

오늘 소개하고자 하고자하는 데이타 워크 플로우 관리도구는 아파치 오픈소스 airflow 이다. 원래 airbnb에서 개발된 도구로 현재 아파치 오픈소스에서 인큐베이터 단계에 있는 소프트웨어이다.


airflow를 소개하는 이유는 첫번째 파이썬 기반으로 태스크 코드를 작성할 수 있기 때문에, 데이타 분석이나 머신러닝을 개발하는 엔지니어들에게 익숙한 언어이고, 한대에서 동작하는게 아니라 여러 머신에 분산하여 수행 될 수 있는 장점을 가지고 있다.



<그림. Apache airflow 의 작업 그래프 구조 화면 >

airflow 시작하기

그러면 간단하게 airflow에 대한 개념과 사용법에 대해서 알아보자

airflow 설치

airflow는 실행되는 작업의 상태등을 저장하기 위해서 데이타 베이스 (MySQL이나 Postgres)등이 필요하며, 분산 환경을 위해서 여러대에 설치할 수 있다. 또한 로컬 환경에 sqlite와 함께 간단하게 설치할 수 있다. 여기서는 간단하게 개인 맥북환경에 로컬로 설치 및 실행하는 시나리오로 설명한다.


설치 방법은 매우 간단하다. 파이썬 2.7 환경에서 아래와 같이 간단학 “pip install airflow”만 실행해주면 된다.

%pip install airflow



airflow가 설치되었으면 데이타 베이스 설정을 해줘야 하는데, 이 튜토리얼에서는 개발 및 테스트를 위해서 sqlite를 사용한다. sqlite를 초기화 하기 위해서 다음과 같이 “airflow initdb” 명령을 실행한다.

% airflow initdb


자아 이제 설치가 끝나고 airflow를 사용할 준비가 되었다. 이제 airflow 웹콘솔을 기동해보자

“airflow webserver -p 8080” 을 실행하고 웹에 http://localhost:8080에 접근하면 airflow 콘솔을 볼 수 있다.

airflow 코드

airflow에서 워크플로우를 저장하기 위해서 몇가지 추상화된 개념을 사용한다.

Airflow DAG의 구조

DAG (Directed Acyclic Graph)

DAG는 하나의 워크 플로우라고 보면 된다. 위의 예제처럼, 머신러닝 이라는 DAG를 정의한다면, Preprocessing,Training,Prediction 워크플로우가 하나의 DAG가 된다.

Operator and Task

Operator는 DAG안에서 정의되는 작업 함수(함수임을 주의하자) 이야기 하는데, Pre processing, Training, Prediction 함수가 Operatorator 이다.

이 Operator 함수가 DAG 상에서 실제로 워크플로우 상에 정의되서 호출 되면 이것이 Task 이다.

객체지향 언어에서 Operator가 class 라면, Task는 object 라고 보면 된다.


이해가 잘안될 수 있는데, 코드를 보자


from airflow import DAG

from airflow.operators.bash_operator import BashOperator

from airflow.operators.dummy_operator import DummyOperator

from airflow.operators.python_operator import PythonOperator

from datetime import datetime,timedelta


dag = DAG('hello-airflow',description='Hello airflow DAG',

         schedule_interval = '*/5 0 * * *',

         start_date=datetime(2017,07,01),catchup=False)


def print_hello():

   return 'Hello Airflow'


python_task = PythonOperator(

                   task_id='python_operator',

                   python_callable = print_hello,

                   dag = dag)


bash_task = BashOperator(

       task_id='print_date',

       bash_command='date',

       dag=dag)


bash_task.set_downstream(python_task)


DAG 정의 부분을 보자. DAG 객체는 DAG에 대한 전체 컨택스를 저장 및 유지 관리한다.

DAG('hello-airflow',description='Hello airflow DAG', 에서 DAG를 이름을 ‘hello-airflow’로 정의하고 description에 설명을 적는다.

schedule_interval = '*/5 * * * *', 다음으로 이 DAG가 실행되는 주기를 정해야 하는데, cron 명령과 같은 노테이션으로 정의한다. 위 설정은 매 5분마다 실행되도록 하는 설정이다.

마지막으로, start_date=datetime(2017,07,01), ,DAG를 언제부터 시작할것인지 지정한다. DAG는 반드시 전역 변수로 지정한다. DAG안에서 다른 DAG를 부르는 sub DAG의 경우에는 지역 변수로 지정이 가능하다.


다음 task에 사용할 operator를 정의하는데, 파이썬 코드를 실행할 오퍼레이터인 PythonOperator와 쉘 커맨드를 실행할 BashOperator를 가지고 각각 파이썬 태스크 python_task와, 쉘 태스크 bash_task를 정의한다.


python_task = PythonOperator(

                   task_id='python_operator',

                   python_callable = print_hello,

                   dag = dag)


파이썬 태스크의 id는 “python_operator”라고 지정하였고, 실행시 print_hello를 호출하도록 하였다.

그리고 이 태스크는 DAG인 dag에 지정한다.


다음 쉘 태스크의 내용은 다음과 같다.

bash_task = BashOperator(

       task_id='print_date',

       bash_command='date',

       dag=dag)


print_data라는 이름으로 태스크를 정의하고, 쉘 명령어 date를 실행하도록 하였다.

등록

코드 작성이 끝나면 코드를 배포해보자. Dag 파일을 airflow에 등록해야 하는데, dag 파일을 저장하는 장소는 dags_folder 라는 변수로 $AIRFLOW_HOME/airflow.cfg 파일안에 정의 되어 있다. 디폴트 장소는 $AIRFLOW_HOME/dags/ 폴더이다. 위에서 작성한 코드를 해당 디렉토리에 복사하자

다음 dag이 제대로 등록되었는지를 확인한다. 커멘드 창에서 “airflow list_dags”라는 명령을 수행하면 현재 등록되어 있는 DAG 목록을 볼 수 있다. 아래 그림과 같이 hello-airflow dag가 등록된것을 확인할 수 있다.




hello-airflow dag안에 어떤 태스크들이 정의되어 있는지를 확인하려면 ‘airflow list_tasks hello-airflow’ 명령을 이용하면 hello-airflow 안에 등록된 태스크 목록을 출력해준다.


테스트

테스트를 하려면 태스크 단위로 테스트가 가능하다. airflow test {DAG ID} {태스크 ID} {실행날짜} 식으로 하면 된다.

, 예를 들어 print_date 태스크를 2017-07-01을 기준으로 실행하고자하면 airflow test hello-airflow print_date 2017-07-01

Hello-airflow DAG안에 print_date라는 태스크를 실행한다.



실행

DAG 코드 개발 등록과 테스트가 완료되었으면 이제 airflow scheduler 를 띄워준다. (일종의 데몬이다.) 스케쥴러는 DAG 코드에 정의된 스케쥴에 따라서 테스크를 실행해준다.

스케쥴러 실행은 간단하게 airflow scheduler 명령을 실행하면 된다.



스케쥴러가 실행되면, 각 DAG의 스케쥴에 따라서 자동으로 태스크들을 수행한다.


로그 모니터링

스케쥴러에 의해서 실행되는 DAG와 태스크들의 결과와 로그는 어떻게 모니터링 할까? airflow에 의해서 수행되는 태스크들은 $AIRFLOW_HOME/logs 디렉토리에 저장된다.

logs 디렉토리 아래에 각각 DAG 이름으로 저장이 되며, DAG 이름으로 된 디레토리안에는 태스크명으로 된 서브 디렉토리가 있고, 이 서브 디렉토리 아래에 시간대별 로그가 있다.

즉 hello-airflow DAG의 print_date 태스크에 대한 로그는 $AIRFLOW_HOME/logs/hello-airflow/print_date/{날짜및시간} 파일 명으로 저장된다.

웹 콘솔을 이용한 모니터링

airflow의 강력한 기능중의 하나는 웹 기반의 모니터링 콘솔을 제공한다. 뒤에서는 주요 웹 콘솔의 주요 기능에 대해서 알아보도록 한다.

Graph View

Graph View는 DAG의 구조를 그래프 형태로 보여주는 뷰이다.


복잡한 워크플로우의 경우 그 구조를 파악하는데 유용한다. 위의 그림은 앞서 만든 hello-airflow 에 대한 태스크간 그래프로 print_date를 호출한 후에, python_operator 태스크를 호출하는 것을 볼 수 있다.

Tree View


트리뷰를 보면, DAG의 구조를 트리 형태로 보여주고, DAG의 태스크가 각각 성공했는지 실패 했는지를 우측 그래프 처럼 표현해준다. 각 태스크를 로그를 보려면 각 태스크 실행 결과 그래프를 누르면 아래와 같이 세부 메뉴가 나온다.



여기서 View Log를 누르면 각 Task 별로 실행 당시의 로그를 볼 수 있다. 아래는 Python_Operator 태스크를 실행한 로그이다.



아래서 두번째 줄을 보면 Hello Airflow 라는 문자열을 리턴한것을 확인할 수 있다.


Task Duration

Task duration은 DAG에서 수행된 각 태스크의 수행 시간을 그래프 형태로 나타내준다.



어떤 태스크가 시간이 많이 걸리는지 그리고 수행시간이 매번 수행할때 마다 올바른지 (큰 변화가 없고 일정한지. 이건 매우 유용할듯) 등을 체크할 수 있다.

Task Tries


Task Tries 에서는 각 수행별로 각각의 태스크를 수행한 횟수를 그래프로 보여준다. 즉 재시도 (RETRY)횟수를 모니터링할 수 있다.


Gantt


Gantt 차트는 각 수행에 대해서 태스크들의 수행 순서에 따라서 소모된 시간과 함께 간트 차트로 표시해준다.

앞의 차트에서 이미 얻을 수 있는 뷰이지만, 각 태스크의 수행 순서와 태스크당 시간을 한꺼번에 보여주기 때문에 병목 구간 파악이 쉽다.


<그림 airflow gantt chart 그래프 예제 (출처 : https://www.agari.com/airflow-agari/) >


이미 링크드인의 Azkaban이나, 스포티파이의 Luigi, 하둡의 Oozie 등 여러가지 워크 플로우 관리 시스템이 있지만, 아직 인큐베이터 단계인 airflow를 주목하는 이유는 분산 환경 지원이 가능하고, 태스크에 대한 스크립트를 파이썬을 사용할 수 있기 때문에, 각종 빅데이타 분석 시스템이나 라이브러리 그리고 머신러닝 시스템과 연동이 쉽고, 파이썬 언어만 알면 쉽게  정교한 플로우 개발이 가능하기 때문에, ( XML등의 설정을 하지 않고도) 활용 가능성이 높다.

본인은 구글 클라우드의 직원이며, 이 블로그에 있는 모든 글은 회사와 관계 없는 개인의 의견임을 알립니다.

댓글을 달아 주세요

  1. 이진형 2018.10.22 06:51  댓글주소  수정/삭제  댓글쓰기

    좋을 글 감사합니다. Airflow를 써보려는데큰도움이 됐습니다.