클라우드 컴퓨팅 & NoSQL/도커 & 쿠버네티스

컨테이너 기반 워크플로우 솔루션 Argo

Terry Cho 2018. 12. 10. 20:47


컨테이너 기반의 워크플로우 솔루션 argo

조대협 (http://bcho.tistory.com)


argo는 컨테이너 워크플로우 솔루션이다.

컨테이너 기반으로 빅데이타 분석, CI/CD, 머신러닝 파이프라인을 만들때 유용하게 사용할 수 있는 오픈 소스 솔루션으로 개념은 다음과 같다.


워크플로우를 정의하되 워크플로우의 각각의 스텝을 컨테이너로 정의한다.

워크플로우 스펙은 YAML로 정의하면, 실행할때 마다 컨테이너를 생성해서, 작업을 수행하는 개념이다.


기존에 아파치 에어플로우 (https://airflow.apache.org/)등 많은 워크 플로우 솔루션이 있지만, 이러한 솔루션은 컴포넌트가 VM/컨테이너에서 이미 준비되서 돌고 있음을 전제로 하고, 각각의 컴포넌트를 흐름에 따라서 호출하는데 목적이 맞춰서 있다면, argo 의 경우는 워크플로우를 시작하면서 컨테이너를 배포하고, 워크플로우 작업이 끝나면 컨테이너가 종료되기 때문에, 실행할때만 컨테이너를 통해서 컴퓨팅 자원을 점유하기 때문에 자원 활용면에서 장점이 있다고 볼 수 있다.


argo 설치는 쿠버네티스 클러스터가 있는 상태라면 https://argoproj.github.io/docs/argo/demo.html 를 통해서 간단하게 설치가 가능하다. 설치와 사용법은 위의 문서링크를 활용하기 바란다.

HelloWorld

간단한 워크플로우 예제를 살펴보자. 워크 플로우를 실행하기 위해서는 워크플로우 스펙을 yaml 파일로 정의해야 한다. 아래는 helloworld 의 간단한 예제이다.


apiVersion: argoproj.io/v1alpha1

kind: Workflow

metadata:

 generateName: hello-world-

spec:

 entrypoint: whalesay

 templates:

 - name: whalesay

   container:

     image: docker/whalesay:latest

     command: [cowsay]

     args: ["hello world"]


워크플로우의 이름은 metadata 부분에 generateName에서 워크플로우 JOB의 이름을 정의할 수 있다. 여기서는 hello-world-로 정의했는데, 작업이 생성될때 마다 hello-world-xxx 이라는 이름으로 작업이 생성된다.

Templates 부분에 사용하고자 하는 컨테이너를 정의한다. 위의 예제에서는 docker/whalesay:latest 이미지로 컨테이너를 생성하도록 하였고, 생성후에는 “cosway”라는 명령어를 “hello world” 라는 인자를 줘서 실행하도록 하였다.

Template 부분에는 여러개의 컨테이너를 조합하여, 어떤 순서로 실행할지를 정의한다. 이 예제에서는 하나의 컨테이너만 실행하도록 정의하였다.

다음에 어느 컨테이너 부터 시작하게 할것인지는 sepc 부분에 정의하고, 시작 부분은 entrypoint라는 구문을 이용해서 정의할 수 있다.이 예제에서는 template에 정의한 whalesay라는 컨테이너부터 실행하도록 한다.

이렇게 생성된 워크플로우 스펙은 “argo submit”이라는 CLI 명령을 이용해서 실행한다.


%argo submit --watch {yaml filename}


워크플로우가 실행되면 각 단계별로 쿠버네티스 Pod 가 생성되고, 생성 결과는 argo logs {pod name}으로 확인할 수 있다.


%argo list

명령을 이용하면 argo 워크플로우의 상태를 확인할 수 있다.



위의 그림과 같이 hello-world는 hello-world-smjxq 라는 작업으로 생성되었다.

Pod 명은 이 {argo 작업이름}-xxx 식으로 명명이 된다.

%kubectl get pod

명령으로 확인해보면 아래 그림과 같이 hello-world-smjxq 라는 이름으로 pod가 생성된것을 확인할 수 있다.


이 pod의 실행 결과를 보기 위해서

%argo logs hello-world-smjxq

명령을 실행하면 된다.


위의 그림과 같이 고래 그림을 결과로 출력한것을 확인할 수 있다.

ArgoUI

워크플로우의 목록과 실행결과는 CLI뿐 아니라 웹 기반의 GUI에서도 확인이 가능하다.

argo ui는 argo라는 이름의 deployment에 생성이 되어 있는데, clusterIP (쿠버네티스 내부 IP)로 생성이 되어 있기 때문에 외부에서 접근이 불가능하다. 포트포워딩 기능을 이용해서 argo deployment의 8001 포트를 로컬 PC로 포워딩해서 접속할 수 있다.


% kubectl -n argo port-forward deployment/argo-ui 8001:8001


다음에 http://localhost:8001을 이용해서 접속해보면 다음과 같이 현재 등록되어 있는 워크플로우 목록을 확인할 수 있다.




이 목록에서 아까 수행한 hello-world-xxx 워크플로우를 확인해보자. 아래 그림과 같이 워크플로우의 구조를 보여준다.



hello-world-xxx 노드를 클릭하면 각 노드의 상세 내용을 볼 수 있다.


그림에서 Summary > Logs 부분을 선택하면 아래 그림과 같이 각 단계별로 실행한 결과 로그를 볼 수 있다.


연속된 작업의 실행

앞에서 간단한 설치 및 사용법에 대해서 알아봤는데, 앞에서 살펴본 예제는 하나의 태스크로 된 워크플로우이다. 워크플로우는 좀더 복잡하게 여러개의 태스크를 순차적으로 실행하거나 또는 병렬로 실행이 된다.


예제 원본 https://argoproj.github.io/docs/argo/examples/README.html


다음 워크플로우 정의를 보자

apiVersion: argoproj.io/v1alpha1

kind: Workflow

metadata:

 generateName: steps-

spec:

 entrypoint: hello-hello-hello


 # This spec contains two templates: hello-hello-hello and whalesay

 templates:

 - name: hello-hello-hello

   # Instead of just running a container

   # This template has a sequence of steps

   steps:

   - - name: hello1            #hello1 is run before the following steps

       template: whalesay

       arguments:

         parameters:

         - name: message

           value: "hello1"

   - - name: hello2a           #double dash => run after previous step

       template: whalesay

       arguments:

         parameters:

         - name: message

           value: "hello2a"

     - name: hello2b           #single dash => run in parallel with previous step

       template: whalesay

       arguments:

         parameters:

         - name: message

           value: "hello2b"


 # This is the same template as from the previous example

 - name: whalesay

   inputs:

     parameters:

     - name: message

   container:

     image: docker/whalesay

     command: [cowsay]

     args: ["{{inputs.parameters.message}}"]


구조를 살펴보면 다음과 같다.



  • Metadata 부분에 generateName으로 이 워크플로우의 이름을 정의했다. 워크플로우 작업이 실행될때마다 steps-xxx  라는 이름으로 생성이 된다.

  • 워크 플로우는 templates 부분에 spec 부분에 정의하는데, hello1 작업을 수행한 후에, hello2a,hello2b 를 동시에 실행한다. 동시 실행인지 순차 실행인지는 steps의 인덴트(탭으로 띄워쓰기 한부분)을 확인하면 되는데, hello1은 - -name: hello1으로 정의 하였고 다음 단계는 - -name: hello2a와, - name:hello2b로 지정하였다. 잘 보면, hello2b는 “-”가 두개가 아니고 한개로 되어 있고 hello2a와 같은 띄어 쓰기로 되어 있는 것을 볼 수 있다.

  • 마지막으로 이 워크플로우에서 사용되는 컨테이너 이미지를 정의하면 된다.


실행시에 --watch 옵션을 주면, 각 단계별 실행 상태와, 워크플로우 그래프의 구조를 볼 수 있다.


%argo submit --watch helloworld-seq.yaml


위의 그림을 보면 step-h44qf라는 이름으로 작업이 수행되는데, hello1이 먼저 실행되고 다음에 hello2a,hello2b가 동시에 실행된것을 확인할 수 있다.


실행후에 UI에서 실행 내역을 확인해보면 다음과 같이 hello1이 먼저 실행 된 후에,  hello2a,hello2b가 병렬로 동시에 실행된것을 확인할 수 있다.



DAG를 이용한 워크플로우 정의

Yaml 파일 형식이 워크플로우의 실행 순서를 정의할 경우 명시성이 떨어져서 가독성 측면에서 읽기에 불편할 수 있는데, DAG(Directed acyclic graph)를 이용하면 조금 더 명시적으로 워크플로우 정의가 가능하다.



apiVersion: argoproj.io/v1alpha1

kind: Workflow

metadata:

 generateName: dag-diamond-

spec:

 entrypoint: diamond

 templates:

 - name: echo

   inputs:

     parameters:

     - name: message

   container:

     image: alpine:3.7

     command: [echo, "{{inputs.parameters.message}}"]

 - name: diamond

   dag:

     tasks:

     - name: A

       template: echo

       arguments:

         parameters: [{name: message, value: A}]

     - name: B

       dependencies: [A]

       template: echo

       arguments:

         parameters: [{name: message, value: B}]

     - name: C

       dependencies: [A]

       template: echo

       arguments:

         parameters: [{name: message, value: C}]

     - name: D

       dependencies: [B, C]

       template: echo

       arguments:

         parameters: [{name: message, value: D}]


entrypoint는 diamond dag를 실행하도록 한다.

dag 정의 부분을 보면 맨 앞에 name: A인 task를 실행하도록 하고, 다음 B,C는 A에 의존성을 가지도록 한다.D는 B,C의 의존성을 가지게 해서 실행 순서는 A→ B,C → D형태가 된다.

다음과 같은 순서로 실행이 된다.



입력/출력값 전달

argo의 개념과 워크플로우의 개념을 이해했으면, 워크플로우에서 태스크간의 데이타를 어떻게 전달하는지 살펴보도록 하자.

입력값의 전달

argo에서 변수를 입력값으로 사용하는 방법은 간단하다. 먼저 변수를 정의한 다음에, 정의된 변수를 입력이나 출력으로 사용할지 워크플로우의 태스크에서 정의한후, 그 변수를 사용하면 된다.


apiVersion: argoproj.io/v1alpha1

kind: Workflow

metadata:

 generateName: hello-world-parameters-

spec:

 # invoke the whalesay template with

 # "hello world" as the argument

 # to the message parameter

 entrypoint: whalesay

 arguments:

   parameters:

   - name: message

     value: hello world


 templates:

 - name: whalesay

   inputs:

     parameters:

     - name: message       #parameter declaration

   container:

     # run cowsay with that message input parameter as args

     image: docker/whalesay

     command: [cowsay]

     args: ["{{inputs.parameters.message}}"]


위의 코드를 살펴보면 먼저 spec.arguments 부분에서 message라는 변수를 선언 하였고, 그 값은 “hello world”로 초기화를 했다.

그리고 워크플로우의 whalesay 태스크에서 message 변수를 input 변수로 사용하도록 선언하였다. 그 후에, args에서 input.parameters.message를 참조하여 message변수의 값을 도커 컨테이너의 실행 변수로 넘기도록 하였다.


만약의 변수의 값을 CLI에서 바꾸고자 한다면 다음과 같이 argo submit시에 -p 옵션을 주면 된다. argo submit {workflow yaml file name} -p {parameter name}={value}

아래는 message=”hello terry”로 바꿔서 실행한 예이다.

ex) argo submit argument.yaml -p message=”hello terry”

출력값 사용

아래 코드를 보자.

아래 코드는 whalesay 컨테이너의 결과를 print-message 컨테이너로 넘기는 코드이다.


apiVersion: argoproj.io/v1alpha1

kind: Workflow

metadata:

 generateName: output-parameter-

spec:

 entrypoint: output-parameter

 templates:

 - name: output-parameter

   steps:

   - - name: generate-parameter

       template: whalesay

   - - name: consume-parameter

       template: print-message

       arguments:

         parameters:

         # Pass the hello-param output from the generate-parameter step as the message input to print-message

         - name: message

           value: "{{steps.generate-parameter.outputs.parameters.hello-param}}"


 - name: whalesay

   container:

     image: docker/whalesay:latest

     command: [sh, -c]

     args: ["echo -n hello world > /tmp/hello_world.txt"]  #generate the content of hello_world.txt

   outputs:

     parameters:

     - name: hello-param       #name of output parameter

       valueFrom:

         path: /tmp/hello_world.txt    #set the value of hello-param to the contents of this hello-world.txt


 - name: print-message

   inputs:

     parameters:

     - name: message

   container:

     image: docker/whalesay:latest

     command: [cowsay]

     args: ["{{inputs.parameters.message}}"]


구조를 보면 다음과 같은데


먼제 whalesay 정의 부분을 보면 outputs.parameters에 hello-param이라는 이름으로 output 변수를 정의하였고, output의 내용은 /tmp/hello_world.txt 파일 내용으로 채워진다.


다음 print-message 컨테이너 정의 부분을 보면 input param으로 message라는 변수를 정의하였다.

steps를 보면, print-message를 실행할때, message 변수의 값을 {{steps.generate-parameter.outputs.parameters.hello-param}} 로 정의하여, print-message의 이전 스탭인 generate-parameter의 output param중에 hello-param이라는 변수의 값으로 채우는 것을 볼 수 있다.


이 흐름을 그림으로 도식화 해보면 다음과 같다.


Whalesay 컨테이너에서 /tmp/hello_world.txt 파일 내용을 hello-param이라는 output param으로 전달하고, print-message 컨테이너는 입력값으로 message라는 param을 받는데, 이 값을 앞단계의 hello-param의 값을 받도록 한것이다.

Artifact

워크플로우 태스크에 대한 입/출력값을 parameter로 전달할 수 도 있지만, CI/CD 빌드 파이프라인에서는 소스코드, 빌드 바이너리가 될 수 도 있고, 빅데이타 파이프라인에서는 데이타 파일과 같이 큰 사이즈의 파일이나 데이타가 될 수 있다. 이 경우 parameter 를 이용해서 넘기기에는 부담이 되는데, 이런 요구 사항을 위해서 제공되는 것이 artifact라는 기능이다. 태스크의 결과값을 로컬 스토리지가 아니라, AWS S3나 GCP GCS와 같은 외부 스토리지에 쓸 수 있게 하고, 반대로 태스크에 대한 입력 값을 외부 스토리지에서 읽어올 수 있게 하는 기능이다.


예를 들어 텐서플로우로 학습을 시키는 파이프라인이 있을때, 학습된 모델을 S3나 GCS에 저장하도록 하는 등의 작업을 할 수 있다.


아래 예제를 보면 앞에서 소개한 generate-artifact → consume-artifact 워크플로우에서 parameter로 값을 넘기는 방식이 artifact 방식으로 바뀐것을 확인할 수 있다. 단순하게 parameter 로 선언한 부분을 artifact로만 변경해주었다.


apiVersion: argoproj.io/v1alpha1

kind: Workflow

metadata:

 generateName: artifact-passing-

spec:

 entrypoint: artifact-example

 templates:

 - name: artifact-example

   steps:

   - - name: generate-artifact

       template: whalesay

   - - name: consume-artifact

       template: print-message

       arguments:

         artifacts:

         # bind message to the hello-art artifact

         # generated by the generate-artifact step

         - name: message

           from: "{{steps.generate-artifact.outputs.artifacts.hello-art}}"


 - name: whalesay

   container:

     image: docker/whalesay:latest

     command: [sh, -c]

     args: ["cowsay hello world | tee /tmp/hello_world.txt"]

   outputs:

     artifacts:

     # generate hello-art artifact from /tmp/hello_world.txt

     # artifacts can be directories as well as files

     - name: hello-art

       path: /tmp/hello_world.txt


 - name: print-message

   inputs:

     artifacts:

     # unpack the message input artifact

     # and put it at /tmp/message

     - name: message

       path: /tmp/message

   container:

     image: alpine:latest

     command: [sh, -c]

     args: ["cat /tmp/message"]


기타

간단하게 기본 개념만 설명했지만, 이외에도 여러가지 기능을 이용하여 좀 더 복잡한 워크플로우를 구현할 수 있다. 예를 들어서 Condition 기능을 이용해서, 조건에 따라서 워크플로우를 분기하거나 조건에 따라 재귀호출을 하는 Recursive 호출, 호출중에 조건에 따라 워크플로우를 종료할 수 있는 기능등을 이용할 수 있다.

이외에도 컨테이너 배포시 사이드카(http://bcho.tistory.com/1256) 를 삽입하여 컨테이너 배포시 로그 수집이나 기타 기능등을 동시에 수행하도록 할 수 있다.