블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 

'Tutorial'에 해당되는 글 43

  1. 2018.07.16 쿠버네티스 #13 - 모니터링 (1/2)
  2. 2018.06.24 쿠버네티스 #9 - HealthCheck (1)
  3. 2018.06.11 쿠버네티스 #5 - 디스크 (볼륨/Volume) (1)
  4. 2018.01.01 Apache Beam (Dataflow)를 이용하여, 이미지 파일을 tfrecord로 컨버팅 하기
  5. 2017.12.20 HBase와 구글의 빅테이블 #1 - 아키텍쳐 (2)
  6. 2017.10.20 수학포기자를 위한 딥러닝과 텐서플로우의 이해 (14)
  7. 2017.09.20 오토인코더를 이용한 비정상 거래 검출 모델의 구현 #3 - 데이타 전처리 (1)
  8. 2017.09.11 오토 인코더를 이용한 비정상 거래 검출 모델의 구현 #1 (2)
  9. 2017.09.10 텐서플로우 하이레벨 API를 Estimator를 이용한 모델 정의 방법
  10. 2017.08.30 Tensorflow Object Detection API를 이용한 물체 인식 #3-얼굴은 학습시켜보자
  11. 2017.08.21 Tensorflow Object Detection API를 이용한 물체 인식 #2-동물 사진을 학습 시켜보자 (1)
  12. 2017.08.16 Tensorflow Object Detection API를 이용한 물체 인식 #1-설치와 사용하기
  13. 2017.08.10 텐서플로우 트레이닝 데이타 포맷인 *.tfrecord 파일 읽고 쓰기 (2)
  14. 2017.07.15 데이타 워크플로우 관리를 위한 Apache Airflow #1 - 소개
  15. 2017.06.22 얼굴 인식 모델을 만들어보자 #4 -클라우드를 이용하여 학습 시키기
  16. 2017.06.15 연예인 얼굴 인식 모델을 만들어보자 - #2. CNN 모델을 만들고 학습시켜 보자 (11)
  17. 2017.06.10 머신러닝 시스템 프로세스와 아키텍쳐 (7)
  18. 2017.04.03 텐서플로우의 세션,그래프 그리고 함수의 개념 (1)
  19. 2017.04.03 텐서플로우-배치 처리에 대해서 이해하자 (2)
  20. 2017.01.09 딥러닝을 이용한 숫자 이미지 인식 #2/2-예측 (12)
 


쿠버네티스 #13

모니터링 1/2


조대협 (http://bcho.tistory.com)


시스템을 운영하는데 있어서 운영 관점에 있어서 가장 중요한 기능중의 하나는 시스템에 대한 모니터링이다. 시스템 자원의 사용량이나 에러등에 대한 모니터링을 통해서, 시스템을 안정적으로 운영하고 문제 발생시 원인 파악과 대응을 할 수 있다.

이번 글에서는 쿠버네티스 모니터링 시스템에 대한 개념과, 아키텍쳐 그리고 구축 방법에 대해서 소개하고자 한다.

쿠버네티스 모니터링 컨셉

쿠버네티스에 대한 모니터링을 보면 많은 툴과 지표들이 있어서 혼돈하기 쉬운데, 먼저 모니터링 컨셉에 대한 이해를 할 필요가 있다.

쿠버네티스 기반의 시스템을 모니터링하기 위해서는 크게 아래와 같이 4가지 계층을 모니터링해야 한다.



1. 호스트 (노드)

먼저 쿠버네티스 컨테이너를 실행하는 하드웨어 호스트 즉 노드에 대한 지표 모니터링이 필요하다. 노드의 CPU,메모리, 디스크, 네트워크 사용량과, 노드 OS와 커널에 대한 모니터링이 이에 해당한다.

2. 컨테이너

다음은 노드에서 기동되는 각각의 컨테이너에 대한 정보이다. 컨테이너의 CPU,메모리, 디스크, 네트워크 사용량등을 모니터링 한다.

3. 애플리케이션

컨테이너안에서 구동되는 개별 애플리케이션의 지표를 모니터링 한다. 예를 들어, 컨테이너에서 기동되는 node.js 기반의 애플리케이션의 응답시간, HTTP 에러 빈도등을 모니터링한다.

4. 쿠버네티스

마지막으로, 컨테이너를 컨트롤 하는 쿠버네티스 자체에 대한 모니터링을한다. 쿠버네티스의 자원인 서비스나 POD, 계정 정보등이 이에 해당한다.

쿠버네티스 기반의 시스템 모니터링에 대해서 혼돈이 오는 부분중의 하나가 모니터링이라는 개념이 포괄적이기 때문이다. 우리가 여기서 다루는 모니터링은 자원에 대한 지표 대한 모니터링이다. 포괄적인 의미의 모니터링은 로그와, 에러 모니터링등 다양한 내용을 포괄한다.  

쿠버네티스 로깅

지표 모니터링과 함께 중요한 모니터링 기능중 하나는 로그 수집 및 로그 모니터링이다.

로그 수집 및 로그 모니터링 방법은 여러가지 방법이 있지만, 오픈소스 로그 수집 및 모니터링 조합인 EFK (Elastic search + FluentD + Kibina) 스택을 이용하는 경우가 대표적이다.

Fluentd 에이전트를 이용하여, 각종 로그를 수집하여, Elastic search에 저장하고, 저장된 지표를 Kibana 대쉬 보들르 이용하여 시작화 해서 나타내는 방법이 있다.

이에 대한 자세한 설명을 생략한다.

쿠버네티스 모니터링 시스템 구축

그러면 이러한 모니터링 시스템을 어떻게 구축할 것인가?

쿠버네티스 모니터링은 버전업 과정에서 많은 변화를 겪고 있다. 기존 모니터링 시스템의 아키텍쳐는 cAdvisor,Heapster를 이용하는 구조였으나, 이 아키텍쳐는 곧 deprecated 될 예정이고, Prometheus등 다양한 모니터링 아키텍쳐가 후보로 고려 되고 있다.

아래 그래프를 보면 재미있는 통계 결과가 있는데, cAdvisor,Heapster,Promethus 를 이용하는 방법도 있지만, 클라우드의 경우에는 클라우드 벤더에서 제공하는 쿠버네티스 모니터링 솔루션을 그대로 사용하거나 (18%) 또는 데이타독이나 뉴렐릭 (Datadog, newRelic)과 같이 전문화된 모니터링 클라우드을 사용하는 비율 (26%) 도 꽤 높다.



<그림. 쿠버네티스 모니터링 솔루션 분포 >

출처 :  https://thenewstack.io/5-tools-monitoring-kubernetes-scale-production/


개인적인 의견으로는 직접 모니터링 솔루션을 구축해서 사용하는 것보다는 비용은 약간 들지만 클라우드 벤더에서 제공되는 모니터링 도구나 또는 데이타독과 같은 전문 모니터링 솔루션을 이용하는 것을 추천한다.


직접 모니터링 솔루션을 구축할 경우 구축과 운영에 드는 노력도 꽤 크고, 또한 어떠한 지표를 모니터링해야할지 등에 대한 추가적인 노하우가 필요하다. 또한 cAdvisor,Heapster,Promethues 조합은 호스트와 컨테이너 그리고 쿠버네티스에 대한 모니터링은 제공하지만 애플리케이션 지표에 대한 모니터링과 로깅 기능은 제공하지 않기 때문에 별도의 구축이 필요하다. 이런 노력을 들이는 것 보다는 모든 기능이 한번에 제공되고 운영을 대행해주는 데이타독이나 클라우드에서 제공해주는 모니터링 솔루션을 사용하는 것을 추천한다.

Heapster 기반 모니터링 아키텍처

이러한 모니터링 요건을 지원하기 위해서, 쿠버네티스는 자체적인 모니터링 컴포넌트를 가지고 있는데, 그 구조는 다음과 같다.



<그림. 쿠버네티스 모니터링 시스템 아키텍쳐>

출처 Source : https://www.datadoghq.com/blog/how-to-collect-and-graph-kubernetes-metrics/


cAdvisor

cAdvisor는 모니터링 에이전트로, 각 노드마다 설치되서 노드에 대한 정보와 컨테이너 (Pod)에 대한 지표를 수집하여, Kubelet으로 전달한다.

Heapster

cAdvisor에 의해 수집된 지표는 Heapster 라는 중앙 집중화된 지표 수집 시스템에 모이게 되고, Heapster는 수집된 지표를 스토리지 백앤드에 저장한다.

Storage backend

Heapster가 지표를 저장하는 데이타베이스를 스토리지 백앤드라고 하는데, Heapster는 확장성을 위해서 다양한 스토리지 백앤드를 플러그인 구조를 선택하여 연결할 수 있다.

현재 제공되는 대표적인 스토리지 백앤드는 구글 클라우드의 모니터링 시스템인 스택드라이버 (stackdriver), 오픈 소스 시계열 데이타베이스인 인플럭스 디비 (InfluxDB) 등을 지원한다.

그래프 대쉬 보드

이렇게 저장된 모니터링 지표는 그래프와 같은 형태로 시각화 될필요가 있는데, 스토리지 백앤드를 지원하는 다양한 시각화 도구를 사용할 수 있다. 구글의 모니터링 시스템인 스택드라이버의 경우에는 자체적인 대쉬보드 및 그래프 인터페이스가 있고, 인플럭스 디비나 프로메테우스의 경우에는 오픈소스 시각화 도구인 그라파나(Grafana)를 사용할 수 있다.


<그림. 그라파나와 프로메테우스를 연결하여, 지표 모니터링을 시각화 한 예제>


그러나 이 아키텍쳐는 deprecation 계획이 시작되서 1.13 버전 부터는 완전히 제거될 예정이다.

https://github.com/kubernetes/heapster/blob/master/docs/deprecation.md


쿠버네티스 대시보드

다른 방법으로는 쿠버네티스를 모니터링 하고 관리할 수 있는 쉬운 방법이 하나 있는데, 쿠버네티스 대시보드를 사용하는 방법이다. 쿠버네티스는 기본적으로 kubectl이라는 커맨드 라인 인터페이스 (이하 CLI : Command Line Interface)를 사용하지만, 추가적으로 웹 기반의 관리 콘솔을 제공한다. 이를 쿠버네티스 대시보드라고 한다. (https://github.com/kubernetes/dashboard)

대시 보드 설치

쿠버네티스 대시 보드 설치 방법은 간단하다. 아래와 같이 대시보드 설정 yaml 파일을 이용하면 간단하게 대시 보드를 쿠버네티스 클러스터에 설치할 수 있다.


% kubectl create -f https://raw.githubusercontent.com/kubernetes/dashboard/master/src/deploy/recommended/kubernetes-dashboard.yaml


일반적인 경우에는 위의 스크립트로 설치가 가능하지만, 구글 클라우드 쿠버네티스 엔진의의 경우에는 설치 중에 권한 관련 에러가 나올 수 있는데, 구글 클라우드 쿠버네티스 엔진의 경우에는 보안을 이유로 일반적인 쿠버네티스보다 권한 설정 레벨이 높게 설정되어 있기 때문이다. 구글 클라우드 쿠버네티스 엔진에서 대시보드를 설치하고자할때에는 위의 스크립트를 실행하기 전에 먼저 아래 명령어를 이용해서, 현재 사용자 계정에 대해서 cluster-admin 롤을 부여해줘야 한다.  


%kubectl create clusterrolebinding cluster-admin-binding \
--clusterrole cluster-admin --user $(gcloud config get-value account)

대시 보드 접속

대시보드 설치가 끝났으면, 대시보드를 접속해보자

대시보드는 외부 서비스로 제공되지 않고, 내부 IP로만 접속이 가능한데, 클러스터 외부에서 접근하려면 kubectl proxy를 이용하면, 간단하게 접근이 가능하다.

kubectl proxy는 로컬 머신 (예를 들어 노트북)과 쿠버네티스 클러스터간의 통신을 프록싱해줘서, 로컬 머신에서 쿠버네티스 클러스터내의 HTTP 서비스를 접근할 수 있도록 해준다.

사용 방법은 로컬 머신에서 간단하게

%kubectl proxy

명령을 실행해주면 localhost:8001 포트를 통해서 쿠버네티스 클러스터로 트래픽을 프록시 해준다.

위와 같이 proxy를 실행한후에,  아래 URL로 접근을 하면, 대시보드 콘솔에 접근할 수 있다.

http://localhost:8001/api/v1/namespaces/kube-system/services/https:kubernetes-dashboard:/proxy/


URL에 접근하면 아래와 같이 로그인 창이 나타난다.



사용자 계정 및 토큰등에 대해서는 보안 부분에서 별도로 다루기로 하겠다.

대쉬보드를 사용하기 위해서는 사용자 인증이 필요한데, 간단하게 인증을 위한 토큰을 사용하는 방법을 이용하도록 하겠다.

토큰은 쿠버네티스 API 인증 메커니즘중의 하나로, 여기서는 admin-user라는 계정을 하나 만든후에, 그 계정에, 클러스터 관리자롤을 부여한 후에, 그 사용자의 토큰을 사용하는 방법을 사용하겠다.


먼저 아래 스크립트를 이용해서 admin-user 라는 사용자를 생성한다.

admin-user.yaml 파일

apiVersion: v1

kind: ServiceAccount

metadata:

 name: admin-user

 namespace: kube-system


다음 아래 스크립트를 이용해서 cluster-admin 롤을 앞에서 생성한 admin-user에 부여한다.

admin-rolebinding.yaml 파일

apiVersion: rbac.authorization.k8s.io/v1beta1

kind: ClusterRoleBinding

metadata:

 name: admin-user

roleRef:

 apiGroup: rbac.authorization.k8s.io

 kind: ClusterRole

 name: cluster-admin

subjects:

- kind: ServiceAccount

 name: admin-user

 namespace: kube-system


다음 아래 명령어를 이용하면 admin-user의 토큰 값을 알 수 있다.

% kubectl -n kube-system describe secret $(kubectl -n kube-system get secret | grep admin-user | awk '{print $1}')


명령을 실행하면 아래와 같이 토큰이 출력된다.


이 토큰 값을 앞의 로그인 창에 입력하면, 대시보드에 로그인할 수 있다.

대시 보드에 로그인하면 아래와 같이 노드나, Pod, 서비스등 쿠버네티스의 자원의 대부분의 정보에 대한 모니터링이 가능하다.




또한 kubectl CLI 명령을 사용하지 않고도 손쉽게 Deployment 등 각종 자원을 생성할 수 있다.


로그 부분에 들어가면 아래와 같이 로그 정보를 볼 수 있다



재미있는 기능중 하나는 아래 그림과 같이 특정 Pod의 컨테이너를 선택하면, 웹콘솔상에서 해당 컨테이너로 SSH 로그인이 가능하다.



여기서 다룬 쿠버네티스 대시보드 설정 및 로그인 부분은 프록시 사용, 로그인을 토큰을 사용하는 등, 운영환경에는 적절하지 않은 방법이다. 개발환경이나 테스트 용도로만 사용하도록 하고, 운영 환경에서는 사용자 계정 시스템 생성과 적절한 권한 배정을 한 후에, 적절한 보안 인증 시스템을 마련한 후에 적용하도록 하자.




쿠버네티스 #9

Health Check


조대협 (http://bcho.tistory.com)


쿠버네티스는 각 컨테이너의 상태를 주기적으로 체크해서, 문제가 있는 컨테이너를 자동으로 재시작하거나 또는 문제가 있는 컨테이너(Pod를) 서비스에서 제외할 수 있다. 이러한 기능을 헬쓰 체크라고 하는데, 크게 두가지 방법이 있다.

컨테이너가 살아 있는지 아닌지를 체크하는 방법이 Liveness probe 그리고 컨테이너가 서비스가 가능한 상태인지를 체크하는 방법을 Readiness probe 라고 한다.


Probe types

Liveness probe와 readiness probe는 컨테이너가 정상적인지 아닌지를 체크하는 방법으로 다음과 같이 3가지 방식을 제공한다.

  • Command probe

  • HTTP probe

  • TCP probe


그럼 각각에 대해서 살펴보자

Command probe

Command probe는 컨테이너의 상태 체크를 쉘 명령을 수행하고 나서, 그 결과를 가지고 컨테이너의 정상여부를 체크한다. 쉘 명령어를 수행한 후, 결과값이 0 이면 성공, 0이 아니면 실패로 간주한다.

아래는 command probe 를 사용한 예이다.

apiVersion: v1

kind: Pod

metadata:

 name: liveness-pod

spec:

 containers:

 - name: liveness

   image: gcr.io/terrycho-sandbox/liveness:v1

   imagePullPolicy: Always

   ports:

   - containerPort: 8080

   livenessProbe:

     exec:

       command:

       - cat

       - /tmp/healthy


Readiness probe 또는 liveness probe 부분에 exec: 으로 정의하고, command: 아래에 실행하고자 하는 쉘 명령어에 대한 인자를 기술한다.

이 쉘명령이 성공적으로 실행되서 0을 리턴하면, probe를 정상으로 판단한다.


HTTP probe

가장 많이 사용하는 probe 방식으로 HTTP GET을 이용하여, 컨테이너의 상태를 체크한다.

지정된 URL로 HTTP GET 요청을 보내서 리턴되는 HTTP 응답 코드가 200~300 사이면 probe를 정상으로 판단하고, 그 이외의 값일 경우에는 비정상으로 판단한다.

아래는 HTTP probe를 이용한 readiness probe를 정의한 예제이다.


metadata:

 name: readiness-rc

spec:

 replicas: 2

 selector:

   app: readiness

 template:

   metadata:

     name: readiness-pod

     labels:

       app: readiness

   spec:

     containers:

     - name: readiness

       image: gcr.io/terrycho-sandbox/readiness:v1

       imagePullPolicy: Always

       ports:

       - containerPort: 8080

       readinessProbe:

         httpGet:

           path: /readiness

           port: 8080


liveness 또는 readinessProbe  항목 아래에 httpGet이라는 이름으로 정의하고, path에  HTTP GET을 보낼 URL을 그리고, port에는 HTTP GET을 보낼 port 를 지정한다.

일반적인 HTTP 서비스를 보내는 port와 HTTP readiness를 서비스 하는 포트를 분리할 수 있는데, HTTP GET 포트가 외부에 노출될 경우에는 DDos 공격등을 받을 수 있는 가능성이 있기 때문에, 필요하다면 서비스 포트와 probe 포트를 분리해서 구성할 수 있다.

TCP probe

마지막으로 TCP probe는 지정된 포트에 TCP 연결을 시도하여, 연결이 성공하면, 컨테이너가 정상인것으로 판단한다. 다음은 tcp probe를 적용한 liveness probe의 예제이다.


apiVersion: v1

kind: Pod

metadata:

 name: liveness-pod-tcp

spec:

 containers:

 - name: liveness

   image: gcr.io/terrycho-sandbox/liveness:v1

   imagePullPolicy: Always

   ports:

   - containerPort: 8080

   livenessProbe:

     tcpSocket:

       port: 8080

     initialDelaySeconds: 5

     periodSeconds: 5


Tcp probe는 간단하게, livenessProbe나 readinessProbe 아래 tcpSocket이라는 항목으로 정의하고 그 아래 port 항목에 tcp port를 지정하면 된다. 이 포트로 TCP 연결을 시도하고, 이 연결이 성공하면 컨테이너가 정상인것으로 실패하면 비정상으로 판단한다.


그러면 실제로 Liveness Probe와 Readiness Probe를 예제를 통해서 조금 더 상세하게 살펴보도록 하자.

Liveness Probe

Liveness probe는 컨테이너의 상태를 주기적으로 체크해서, 응답이 없으면 컨테이너를 자동으로 재시작해준다. 컨테이너가 정상적으로 기동중인지를 체크하는 기능이다.


Liveness probe는 Pod의 상태를 체크하다가, Pod의 상태가 비정상인 경우 kubelet을 통해서 재 시작한다.



이해를 돕기 위해서 예제를 하나 살펴보자.

node.js 애플리케이션을 기동하는 컨테이너를 만들어서 배포 하도록 한다. node.js는 앞에서 사용한 애플리케이션과 동일한 server.js  애플리케이션을 사용한다.

헬쓰 체크를 하는 방법은 여러가지가 있지만, 컨터이너에서 “cat /tmp/healthy” 명령어를 실행해서 성공하면 컨테이너를 정상으로 판단하고 실패하면 비정상으로 판단하도록 하겠다.

이를 위해서 컨테이너 생성시에 /tmp/ 디렉토리에 healthy 파일을 복사해 놓도록 한다.

heatlhy 파일의 내용은 아래와 같다.

i'm healthy


파일만 존재하면 되기 때문에 내용은 크게 중요하지 않다.

다음 Dockerfile을 다음과 같이 작성하자

FROM node:carbon

EXPOSE 8080

COPY server.js .

COPY healthy /tmp/

CMD node server.js > log.out


앞서 작성한 healthy 파일을 /tmp 디렉토리에 복사하였다.


이제 pod를 정의해보자 다음은 liveness-pod.yaml 파일이다.

여기에 cat /tmp/healthy 명령을 이용하여 컨테이너의 상태를 체크하도록 하였다.


apiVersion: v1

kind: Pod

metadata:

 name: liveness-pod

spec:

 containers:

 - name: liveness

   image: gcr.io/terrycho-sandbox/liveness:v1

   imagePullPolicy: Always

   ports:

   - containerPort: 8080

   livenessProbe:

     exec:

       command:

       - cat

       - /tmp/healthy

     initialDelaySeconds: 5

     periodSeconds: 5



컨테이너가 기동 된후 initialDelaySecond에 설정된 값 만큼 대기를 했다가 periodSecond 에 정해진 주기 단위로 컨테이너의 헬스 체크를 한다. initialDelaySecond를 주는 이유는, 컨테이너가 기동 되면서 애플리케이션이 기동될텐데, 설정 정보나 각종 초기화 작업이 필요하기 때문에, 컨테이너가 기동되자 마자 헬스 체크를 하게 되면, 서비스할 준비가 되지 않았기 때문에 헬스 체크에 실패할 수 있기 때문에, 준비 기간을 주는 것이다. 준비 시간이 끝나면, periodSecond에 정의된 주기에 따라 헬스 체크를 진행하게 된다.


헬스 체크 방식은 여러가지가 있는데, HTTP 를 이용하는 방식 TCP를 이용하는 방식 쉘 명령어를 이용하는 방식 3가지가 있다. 이 예제에서는 쉘 명령을 이용하는 방식을 사용하였다.

“cat /tmp/healty” 라는 명령을 사용하였고, 이 명령 실행이 성공하면 이 컨테이너를 정상이라고 판단하고, 만약 이 명령이 실패하면 컨테이너가 비정상이라고 판단한다.


앞서 작성한 Dockerfile을 이용해서 컨테이너를 생성한 후, 이 컨테이너를 리파지토리에 등록하자.

다음 앞에서 작성한 liveness-prod.yaml 파일을 이용하여 Pod를 생성해보자.




다음, 테스트를 위해서 /tmp/healthy 파일을 인위적으로 삭제해보자



파일을 삭제하면 위의 그림과 같이 cat /tmp/healthy 는 exit code 1 을 내면서 에러로 종료된다.

수초 후에, 해당 컨테이너가 재 시작되는데, kubectl get pod 명령을 이용하여 pod의 상태를 확인해보면 다음과 같다.


liveness-pod는 정상적으로 실행되고는 있지만, RESTARTS 항목을 보면 한번 리스타트가 된것을 볼 수 있다.

상세 정보를 보기 위해서 kubectl describe pod liveness-pod 명령을 실행해보면 다음과 같다.


위의 그림과 같이 중간에, “Killing container with id docker://liveness:Container failed liveness probe.. Container will be killed and recreated.” 메세지가 나오면서 liveness probe 체크가 실패하고, 컨테이너를 재 시작하는 것을 확인할 수 있다.

Readiness probe

컨테이너의 상태 체크중에 liveness의 경우에는 컨테이너가 비정상적으로 작동이 불가능한 경우도 있지만, Configuration을 로딩하거나, 많은 데이타를 로딩하거나, 외부 서비스를 호출하는 경우에는 일시적으로 서비스가 불가능한 상태가 될 수 있다. 이런 경우에는 컨테이너를 재시작한다 하더라도 정상적으로 서비스가 불가능할 수 있다. 이런 경우에는 컨테이너를 일시적으로 서비스가 불가능한 상태로 마킹해주면 되는데, 이러한 기능은 쿠버네티스의 서비스와 함께 사용하면 유용하게 이용할 수 있다.


예를 들어 쿠버네티스 서비스에서 아래와 같이 3개의 Pod를 로드밸런싱으로 서비스를 하고 있을때, Readiness probe 를 이용해서 서비스 가능 여부를 주기적으로 체크한다고 하자. 이 경우 하나의 Pod가 서비스가 불가능한 상태가 되었을때, 즉 Readiness Probe에 대해서 응답이 없거나 실패 응답을 보냈을때는 해당 Pod를 사용 불가능한 상태로 체크하고 서비스 목록에서 제외한다.



Liveness probe와 차이점은 Liveness probe는 컨테이너의 상태가 비정상이라고 판단하면, 해당 Pod를 재시작하는데 반해, Readiness probe는 컨테이너가 비정상일 경우에는 해당 Pod를 사용할 수 없음으로 표시하고, 서비스등에서 제외한다.


간단한 예제를 보자. 아래 server.js 코드는 /readiness 를 호출하면 파일 시스템내에  /tmp/healthy라는 파일이 있으면 HTTP 응답코드 200 정상을 리턴하고, 파일이 없으면 HTTP 응답코드 500 비정상을 리턴하는 코드이다.


server.js 파일

var os = require('os');

var fs = require('fs');


var http = require('http');

var handleRequest = function(request, response) {

 if(request.url == '/readiness') {

   if(fs.existsSync('/tmp/healthy')){

     // healthy

     response.writeHead(200);

     response.end("Im ready I'm  "+os.hostname() +" \n");

   }else{

     response.writeHead(500);

     response.end("Im not ready I'm  "+os.hostname() +" \n");

   }

 }else{

   response.writeHead(200);

   response.end("Hello World! I'm  "+os.hostname() +" \n");

 }


 //log

 console.log("["+

Date(Date.now()).toLocaleString()+

"] "+os.hostname());

}

var www = http.createServer(handleRequest);

www.listen(8080);


다음은 replication controller를 다음과 같이 정의한다.


readiness-rc.yaml 파일

apiVersion: v1

kind: ReplicationController

metadata:

 name: readiness-rc

spec:

 replicas: 2

 selector:

   app: readiness

 template:

   metadata:

     name: readiness-pod

     labels:

       app: readiness

   spec:

     containers:

     - name: readiness

       image: gcr.io/terrycho-sandbox/readiness:v1

       imagePullPolicy: Always

       ports:

       - containerPort: 8080

       readinessProbe:

         httpGet:

           path: /readiness

           port: 8080

         initialDelaySeconds: 5

         periodSeconds: 5


앞의 Liveness probe와 다르게,  이번에는 Command probe가 아니라 HTTP 로 체크를 하는 HTTP Probe를 적용해보자 HTTP Probe는 , HTTP GET으로 /readiness URL로 5초마다 호출을 해서 HTTP 응답 200을 받으면 해당 컨테이너를 정상으로 판단하고 200~300 범위를 벗어난 응답 코드를 받으면 비정상으로 판단하여, 서비스 불가능한 상태로 인식해서 쿠버네티스 서비스에서 제외한다.


Replication Controller 로 의해서 Pod들을 생성하였으면 이에 대한 로드 밸런서 역할을할 서비스를 배포한다.


readiness-svc.yaml 파일

apiVersion: v1

kind: Service

metadata:

 name: readiness-svc

spec:

 selector:

   app: readiness

 ports:

   - name: http

     port: 80

     protocol: TCP

     targetPort: 8080

 type: LoadBalancer


서비스가 기동되고 Pod들이 정상적으로 기동된 상태에서 kubectl get pod 명령을 이용해서 현재 Pod 리스트를 출력해보면 다음과 같다.


2개의 Pod가 기동중인것을 확인할 수 있다.


서비스가 기동중인 상태에서 인위적으로 하나의 컨테이너를 서비스 불가 상태로 만들어보자.

앞에서만든 server.js가, 컨테이너 내의 /tmp/healthy 파일의 존재 여부를 체크하기 때문에,  /tmp/healthy 파일을 삭제하면 된다.

아래와 같이

%kubectl exec  -it readiness-rc-5v64f -- rm /tmp/healthy

명령을 이용해서 readiness-rc-5v64f pod의 /tmp/healthy 파일을 삭제해보자

다음 kubectl describe pod readiness-rc-5v64f 명령을 이용해서 해당 Pod의 상태를 확인할 수 있는데, 아래 그림과 같이 HTTP probe가 500 상태 코드를 리턴받고 Readniess probe가 실패한것을 확인할 수 있다.


이 상태에서 kubectl get pod로 pod 목록을 확인해보면 다음과 같다.




Readiness probe가 실패한 readiness-rc-5v64f 의 상태가 Running이기는 하지만 Ready 상태가 0/1인것으로 해당 컨테이너가 준비 상태가 아님을 확인할 수 있다.

이 Pod들에 연결된 서비스를 여러번 호출해보면 다음과 같은 결과를 얻을 수 있다.



모든 호출이 readiness-rc-5v64f 로 가지않고, 하나 남은 정상적인 Pod인 readiness-rc-89d89 로만 가는 것을 확인할 수 있다.  


다음글에서는 Deployment에 대해서 알아보도록 하겠다.



쿠버네티스 #4

Volume (디스크)

조대협 (http://bcho.tistory.com)


이번 글에서는 쿠버네티스의 디스크 서비스인 볼륨에 대해서 알아보도록 하겠다.

쿠버네티스에서 볼륨이란 Pod에 종속되는 디스크이다. (컨테이너 단위가 아님). Pod 단위이기 때문에, 그 Pod에 속해 있는 여러개의 컨테이너가 공유해서 사용될 수 있다.

볼륨 종류

쿠버네티스의 볼륨은 여러가지 종류가 있는데,  로컬 디스크 뿐 아니라, NFS, iSCSI, Fiber Channel과 같은 일반적인 외장 디스크 인터페이스는 물론, GlusterFS나, Ceph와 같은 오픈 소스 파일 시스템, AWS EBS, GCP Persistent 디스크와 같은 퍼블릭 클라우드에서 제공되는 디스크, VsphereVolume과 같이 프라이비트 클라우드 솔루션에서 제공하는 디스크 볼륨까지 다양한 볼륨을 지원한다.

자세한 볼륨 리스트는 https://kubernetes.io/docs/concepts/storage/volumes/#types-of-volumes 를 참고하기 바란다.


이 볼륨 타입을 구별해보면 크게 임시 디스크, 로컬 디스크 그리고 네트워크 디스크 등으로 분류할 수 있다.


Temp

Local

Network

emptyDir

hostPath

GlusterFS

gitRepo

NFS

iSCSI

gcePersistentDisk

AWS EBS

azureDisk

Fiber Channel

Secret

VshereVolume


그럼 각각에 대해서 알아보도록 하자

emptyDir

emptyDir은 Pod가 생성될때 생성되고, Pod가 삭제 될때 같이 삭제되는 임시 볼륨이다.

단 Pod 내의 컨테이너 크래쉬되어 삭제되거나 재시작 되더라도 emptyDir의 생명주기는 컨테이너 단위가 아니라, Pod 단위이기 때문에, emptyDir은 삭제 되지 않고 계속해서 사용이 가능하다.

생성 당시에는 디스크에 아무 내용이 없기 때문에, emptyDir  이라고 한다.

emptyDir의 물리적으로 노드에서 할당해주는 디스크에 저장이 되는데, (각 환경에 따라 다르다. 노드의 로컬 디스크가 될 수 도 있고, 네트워크 디스크등이 될 수 도 있다.) emptyDir.medium 필드에 “Memory”라고 지정해주면, emptyDir의 내용은 물리 디스크 대신 메모리에 저장이 된다.


다음은 하나의 Pod에 nginx와 redis 컨테이너를 기동 시키고, emptyDir 볼륨을 생성하여 이를 공유하는 설정이다.


apiVersion: v1

kind: Pod

metadata:

 name: shared-volumes

spec:

 containers:

 - name: redis

   image: redis

   volumeMounts:

   - name: shared-storage

     mountPath: /data/shared

 - name: nginx

   image: nginx

   volumeMounts:

   - name: shared-storage

     mountPath: /data/shared

 volumes:

 - name : shared-storage

   emptyDir: {}


shared-storage라는 이름으로 emptyDir 기반의 볼륨을 만든 후에, nginx와 redis 컨테이너의 /data/shared 디렉토리에 마운트를 하였다.


Pod를 기동 시킨후에, redis 컨테이너의 /data/shared 디렉토리에 들어가 보면 당연히 아무 파일도 없는 것을 확인할 수 있다.

이 상태에서 아래와 같이 file.txt 파일을 생성하였다.



다음 nginx 컨테이너로 들어가서 /data/shared 디렉토리를 살펴보면 file.txt 파일이 있는 것을 확인할 수 있다.



이 파일은 redis 컨테이너에서 생성이 되어 있지만, 같은 Pod 내이기 때문에, nginx 컨테이너에서도 접근이 가능하게 된다.

hostPath

다음은 hostPath 라는 볼륨 타입인데, hostPath는 노드의 로컬 디스크의 경로를 Pod에서 마운트해서 사용한다. 같은 hostPath에 있는 볼륨은 여러 Pod 사이에서 공유되어 사용된다.

또한  Pod가 삭제 되더라도 hostPath에 있는 파일들은 삭제되지 않고 다른 Pod가 같은 hostPath를 마운트하게 되면, 남아 있는 파일을 액세스할 수 있다.


주의할점 중의 하나는 Pod가 재시작되서 다른 노드에서 기동될 경우, 그 노드의 hostPath를 사용하기 때문에, 이전에 다른 노드에서 사용한 hostPath의 파일 내용은 액세스가 불가능하다.


hostPath는 노드의 파일 시스템을 접근하는데 유용한데, 예를 들어 노드의 로그 파일을 읽어서 수집하는 로그 에이전트를 Pod로 배포하였을 경우, 이 Pod에서 노드의 파일 시스템을 접근해야 한다. 이러한 경우에 유용하게 사용할 수 있다.


아래는 노드의 /tmp 디렉토리를 hostPath를 이용하여 /data/shared 디렉토리에 마운트 하여 사용하는 예제이다.


apiVersion: v1

kind: Pod

metadata:

 name: hostpath

spec:

 containers:

 - name: redis

   image: redis

   volumeMounts:

   - name: terrypath

     mountPath: /data/shared

 volumes:

 - name : terrypath

   hostPath:

     path: /tmp

     type: Directory



이 Pod를 배포해서 Pod를 Id를 얻어보았다.


Pod Id를 통해서 VM을 아래와 같이 확인하였다.


VM에 SSH로 접속해서 /tmp/에 hello.txt 파일을 생성하였다.




다음, Pod의 컨테이너에서 마운트된 /data/shared 디렉토리를 확인해보면 아래와 같이 노드의 /tmp 디렉토리의 내용이 그대로 보이는 것을 볼 수 있다.


gitRepo

볼륨 타입중에 gitRepo라는 유용한 볼륨 타입이 하나 있어서 소개한다.

이 볼륨은 생성시에 지정된 git 리파지토리의 특정 리비전의 내용을 clone을 이용해서 내려 받은후에 디스크 볼륨을 생성하는 방식이다. 물리적으로는 emptyDir이 생성되고, git 레파지토리 내용을 clone으로 다운 받는다.




HTML과 같은 정적 파일이나 Ruby on rails, PHP, node.js 와 같은 스크립트 언어 기반의 코드들은 gitRepo 볼륨을 이용하여 손쉽게 배포할 수 있다.


apiVersion: v1

kind: Pod

metadata:

name: gitrepo-volume-pod

spec:

containers:

- image: nginx:alpine

  name: web-server

  volumeMounts:

  - name: html

    mountPath: /usr/share/nginx/html

    readOnly: true

  ports:

  - containerPort: 80

    protocol: TCP

volumes:

- name: html

  gitRepo:

       repository: https://github.com/luksa/kubia-website-example.git

       revision: master

       directory: .


이 설정은 https://github.com/luksa/kubia-website-example.git 의 master 리비전을 클론으로 다운받아서 /usr/share/nginx/html에 마운트 시키는 설정이다.


PersistentVolume and PersistentVolumeClaim

일반적으로 디스크 볼륨을 설정하려면 물리적 디스크를 생성해야 하고, 이러한 물리적 디스크에 대한 설정을 자세하게 이해할 필요가 있다.

쿠버네티스는 인프라에 대한 복잡성을 추상화를 통해서 간단하게 하고, 개발자들이 손쉽게 필요한 인프라 (컨테이너,디스크, 네트워크)를 설정할 수 있도록 하는 개념을 가지고 있다

그래서 인프라에 종속적인 부분은 시스템 관리자가 설정하도록 하고, 개발자는 이에 대한 이해 없이 간단하게 사용할 수 있도록 디스크 볼륨 부분에 PersistentVolumeClaim (이하 PVC)와 PersistentVolume (이하 PV)라는 개념을 도입하였다.


시스템 관리자가 실제 물리 디스크를 생성한 후에, 이 디스크를 PersistentVolume이라는 이름으로 쿠버네티스에 등록한다.

개발자는 Pod를 생성할때, 볼륨을 정의하고, 이 볼륨 정의 부분에 물리적 디스크에 대한 특성을 정의하는 것이 아니라 PVC를 지정하여, 관리자가 생성한 PV와 연결한다.


그림으로 정리해보면 다음과 같다.


시스템 관리자가 생성한 물리 디스크를 쿠버네티스 클러스터에 표현한것이 PV이고, Pod의 볼륨과 이 PV를 연결하는 관계가 PVC가 된다.


이때 주의할점은 볼륨은 생성된후에, 직접 삭제하지 않으면 삭제되지 않는다. PV의 생명 주기는 쿠버네티스 클러스터에 의해서 관리되면 Pod의 생성 또는 삭제에 상관없이 별도로 관리 된다. (Pod와 상관없이 직접 생성하고 삭제해야 한다.)

PersistentVolume

PV는 물리 디스크를 쿠버네티스에 정의한 예제로, NFS 파일 시스템 5G를 pv0003이라는 이름으로 정의하였다.




PV를 설정하는데 여러가지 설정 옵션이 있는데, 간략하게 그 내용을 살펴보면 다음과 같다.

  • Capacity
    볼륨의 용량을 정의한다. 현재는 storage 항목을 통해서 용량만을 지정하는데 향후에는 필요한 IOPS나 Throughput등을 지원할 예정이다.

  • VolumeMode
    VolumeMode는 Filesystem (default)또는 raw를 설정할 수 있는데, 볼륨이 일반 파일 시스템인데, raw 볼륨인지를 정의한다.

  • Reclaim Policy
    PV는 연결된 PVC가 삭제된 후 다시 다른 PVC에 의해서 재 사용이 가능한데, 재 사용시에 디스크의 내용을 지울지 유지할지에 대한 정책을 Reclaim Policy를 이용하여 설정이 가능하다.

    • Retain : 삭제하지 않고 PV의 내용을 유지한다.

    • Recycle : 재 사용이 가능하며, 재 사용시에는 데이타의 내용을 자동으로 rm -rf 로 삭제한 후 재사용이 된다.

    • Delete : 볼륨의 사용이 끝나면, 해당 볼륨은 삭제 된다. AWS EBS, GCE PD,Azure Disk등이 이에 해당한다.

Reclaim Policy은 모든 디스크에 적용이 가능한것이 아니라, 디스크의 특성에 따라서 적용이 가능한 Policy가 있고, 적용이 불가능한 Policy 가 있다.

  • AccessMode
    AccessMode는 PV에 대한 동시에 Pod에서 접근할 수 있는 정책을 정의한다.

    • ReadWriteOnce (RWO)
      해당 PV는 하나의 Pod에만 마운트되고 하나의 Pod에서만 읽고 쓰기가 가능하다.

    • ReadOnlyMany(ROX)
      여러개의 Pod에 마운트가 가능하며, 여러개의 Pod에서 동시에 읽기가 가능하다. 쓰기는 불가능하다.

    • ReadWriteMany(RWX)
      여러개의 Pod에 마운트가 가능하고, 동시에 여러개의 Pod에서 읽기와 쓰기가 가능하다.

위와 같이 여러개의 모드가 있지만, 모든 디스크에 사용이 가능한것은 아니고 디스크의 특성에 따라서 선택적으로 지원된다.


PV의 라이프싸이클

PV는 생성이 되면, Available 상태가 된다. 이 상태에서 PVC에 바인딩이 되면 Bound 상태로 바뀌고 사용이 되며, 바인딩된 PVC가 삭제 되면, PV가 삭제되는 것이 아니라  Released 상태가 된다. (Available이 아니면 사용은 불가능하고 보관 상태가 된다.)

PV 생성 (Provisioning)

PV의 생성은 앞에서 봤던것 처럼 yaml 파일등을 이용하여, 수동으로 생성을 할 수 도 있지만, 설정에 따라서 필요시마다 자동으로 생성할 수 있게 할 수 있다. 이를 Dynamic Provisioning (동적 생성)이라고 하는데, 이에 대해서는 PVC를 설명하면서 같이 설명하도록 하겠다.

PersistentVolumeClaim

PVC는 Pod의 볼륨과 PVC를 연결(바인딩/Bind)하는 관계 선언이다.

아래 예제를 보자 아래 예제는 PVC의 예제이다.



(출처 : https://kubernetes.io/docs/concepts/storage/persistent-volumes/#persistentvolumeclaims)


  • accessMode, VolumeMode는 PV와 동일하다.

  • resources는 PV와 같이, 필요한 볼륨의 사이즈를 정의한다.

  • selector를 통해서 볼륨을 선택할 수 있는데, label selector 방식으로 이미 생성되어 있는 PV 중에, label이 매칭되는 볼륨을 찾아서 연결하게 된다.


PV/PVC 예제

그러면 예제를 통해서 PV를 생성하고, 이 PV를 PVC에 연결한후에, PVC를 Pod에 할당하여 사용하는 방법을 살펴보도록 하자. 예제는 구글 클라우드 환경을 사용하였다.

1.물리 디스크 생성

먼저 구글 클라우드 콘솔에서 Compute Engine 부분에서 아래와 같이 Disks 부분에서 물리 디스크를 생성한다.


디스크를 pv-demo-disk라는 이름으로 생성하였다.

이때 주의할점은 디스크의 region과 zone이 쿠베네티스 클러스터가 배포된 region과 zone에 동일해야 한다.


2.생성된 디스크로 PV를 선언

생성된 디스크를 이용하여 PV를 생성한다. 아래는 PV를 생성하기 위한 yaml 파일이다.


existing-pd.yaml

apiVersion: v1

kind: PersistentVolume

metadata:

 name: pv-demo

spec:

 storageClassName:

 capacity:

   storage: 20G

 accessModes:

   - ReadWriteOnce

 gcePersistentDisk:

   pdName: pv-demo-disk

   fsType: ext4


PV의이름은 pv-demo이고, gcePersistentDisk에서 앞에서 생성한 pv-demo-disk 를 사용하도록 정의하였다.

파일을 실행하면, 아래와 같이 pv-demo로 PV가 생성된것을 확인할 수 있다.

3. 다음 PVC를 생성한다.

아래는 앞에서 생성한 pv-demo PV를 사용하는 PVC를 생성하는 yaml 파일이다. 하나의 Pod에서만 액세스가 가능하도록 accessMode를 ReadWriteOnce로 설정하였다.


existing-pvc.yaml

apiVersion: v1

kind : PersistentVolumeClaim

metadata:

 name: pv-claim-demo

spec:

 storageClassName: ""

 volumeName: pv-demo

 accessModes:

   - ReadWriteOnce

 resources:

   requests:

     storage: 20G


4. Pod를 생성하여, PVC를 바인딩

그러면 앞에서 생성한 PV와 PVC를 Pod에 생성해서 연결하자


existing-pod-redis.yaml

apiVersion: v1

kind: Pod

metadata:

 name: redis

spec:

 containers:

 - name: redis

   image: redis

   volumeMounts:

   - name: terrypath

     mountPath: /data

 volumes:

 - name : terrypath

   persistentVolumeClaim:

     claimName: pv-claim-demo


앞에서 생성한 PVC pv-claim-demo를 Volume에 연결한후, 이 볼륨을 /data 디렉토리에 마운트 하였다.

Pod를 생성한후에, 생성된 Pod에 df -k 로 디스크 연결 상태를 확인해 보면 다음과 같다.



/dev/sdb 가 20G로 생성되어 /data 디렉토리에 마운트 된것을 확인할 수 있다.

Dynamic Provisioning

앞에서 본것과 같이 PV를 수동으로 생성한후 PVC에 바인딩 한 후에, Pod에서 사용할 수 있지만, 쿠버네티스 1.6에서 부터 Dynamic Provisioning (동적 생성) 기능을 지원한다. 이 동적 생성 기능은 시스템 관리자가 별도로 디스크를 생성하고 PV를 생성할 필요 없이 PVC만 정의하면 이에 맞는 물리 디스크 생성 및 PV 생성을 자동화해주는 기능이다.




PVC를 정의하면, PVC의 내용에 따라서 쿠버네티스 클러스터가 물리 Disk를 생성하고, 이에 연결된 PV를 생성한다.

실 환경에서는 성능에 따라 다양한 디스크(nVME, SSD, HDD, NFS 등)를 사용할 수 있다. 그래서 디스크를 생성할때, 필요한 디스크의 타입을 정의할 수 있는데, 이를 storageClass 라고 하고, PVC에서 storage class를 지정하면, 이에 맞는 디스크를 생성하도록 한다.

Storage class를 지정하지 않으면, 디폴트로 설정된 storage class 값을 사용하게 된다.


동적 생성 방법은 어렵지 않다. PVC에 필요한 디스크 용량을 지정해놓으면, 자동으로 이에 해당하는 물리 디스크 및 PV가 생성이 된다. 아래는 동적으로 PV를 생성하는 PVC 예제이다.


dynamic-pvc.yaml

apiVersion: v1

kind: PersistentVolumeClaim

metadata:

 name: mydisk

spec:

 accessModes:

   - ReadWriteOnce

 resources:

   requests:

     storage: 30Gi


다음 Pod를 생성한다.

apiVersion: v1

kind: Pod

metadata:

 name: redis

spec:

 containers:

 - name: redis

   image: redis

   volumeMounts:

   - name: terrypath

     mountPath: /data/shared

 volumes:

 - name : terrypath

   persistentVolumeClaim:

     claimName: mydisk


Pod를 생성한후에, kubectl get pvc 명령어를 이용하여, 생성된 PVC와 PV를 확인할 수 있다.

PVC는 위에서 정의한것과 같이 mydisk라는 이름으로 생성되었고, Volume (PV)는 pvc-4a…. 식으로 새롭게 생성되었다.

Storage class

스토리지 클래스를 살펴보자,

아래는  AWS EBS 디스크에 대한 스토리지 클래스를 지정한 예로, slow 라는 이름으로 스토리지 클래스를 지정하였다. EBS 타입은 io1을 사용하고, GB당 IOPS는 10을 할당하도록 하였고, 존은 us-east-1d와 us-east-1c에 디스크를 생성하도록 하였다.



아래는 구글 클라우드의 Persistent Disk (pd)의 예로, slow라는 이름으로 스토리지 클래스를 지정하고, pd-standard (HDD)타입으로 디스크를 생성하되 us-central1-a와 us-central1-b 존에 디스크를 생성하도록 하였다.



이렇게 정의한 스토리지 클래스는  PVC 정의시에, storageClassName에 적으면 PVC에 연결이 되고, 스토리지 클래스에 정해진 스펙에 따라서 물리 디스크와 PV를 생성하게 된다.

Apache Beam (Dataflow)를 이용하여, 이미지 파일을 tfrecord로 컨버팅 하기


조대협 (http://bcho.tistory.com)



개요

텐서플로우 학습에 있어서 데이타 포맷은 학습의 성능을 결정 짓는 중요한 요인중의 하나이다. 특히 이미지 파일의 경우 이미지 목록과 이미지 파일이 분리되어 있어서 텐서플로우에서 학습시 이미지 목록을 읽으면서, 거기에 있는 이미지 파일을 매번 읽어야 하기 때문에, 코딩이 다소 지저분해지고,IO 성능이 떨어질 수 있다

텐서플로우에서는 이러한 학습 데이타를 쉽게 읽을 수 있도록 tfrecord (http://bcho.tistory.com/1190)라는 파일 포맷을 지원한다.


이 글에서는 이미지 데이타를 읽어서 tfrecord 로 컨버팅하는 방법을 설명하며, 분산 데이타 처리 프레임웍인 오픈소스 Apache Beam을 기준으로 설명하나, tfrecord 변환 부분은 Apache Beam과 의존성이 없이 사용이 가능하기 때문에, 필요한 부분만 참고해도 된다. 이 Apache Beam을 구글의 Apache Beam 런타임 (매니지드 서비스)인 구글 클라우드의 Dataflow를 이용하여, 클러스터를 이용하여 빠르게 데이타를 처리하는 방법에 대해서 알아보도록 한다.


전체 코드는 https://github.com/bwcho75/cifar-10/blob/master/pre-processing/4.%20Convert%20Pickle%20file%20to%20TFRecord%20by%20using%20Apache%20Beam.ipynb 에 있다.


이 코드는 CIFAR-10 이미지 데이타를 Apache Beam 오픈 소스를 이용하여, 텐서플로우 학습용 데이타 포맷인  tfrecord 형태로 변환 해주는 코드이다.


Apache Beam은 데이타 처리를 위한 프레임웍으로, 구글 클라우드 상에서 실행하거나 또는 개인 PC나 Spark 클러스터상 여러 환경에서 실행이 가능하며, 구글 클라우드 상에서 실행할 경우 오토스케일링이나 그래프 최적화 기능등으로 최적화된 성능을 낼 수 있다.


CIFAR-10 데이타 셋은 32x32 PNG 이미지 60,000개로 구성된 데이타 셋으로 해당 코드 실행시 최적화가 되지 않은 상태에서 약 16분 정도의 처리 시간이 소요된다. 이 중 6분 정도는 Apache Beam 코드를 구글 클라우드로 업로드 하는데 소요되는 시간이고 실제 처리시간은 10분정도가 소요된다. 전처리 과정에 Apache Beam을 사용하기 전에 고려해야 할 요소는 다음과 같다.

  • 데이타가 아주 많아서 전처리 시간이 수시간 이상 소요될 경우 Apache Beam + Google Cloud를 고려하여 여러 머신에서 동시에 처리하여 빠른 시간내에 수행되도록 할 수 있다.

  • 데이타가 그다지 많지 않고 싱글 머신에서 멀티 쓰레드로 처리를 원할 경우에는 Apache Beam으로 멀티 쓰레드 기반의 병렬 처리를 하는 방안을 고려할 수 있다. 이 경우 클라우드에 대한 의존성을 줄일 수 있다.

  • 다른 대안으로는 Spark/Hadoop 등의 오픈소스를 사용하여, On Prem에서 여러 머신을 이용하여 전처리 하는 방안을 고려할 수 있다.

여기서는 아주 많은 대량의 이미지 데이타에 대한 처리를 하는 것을 시나리오로 가정하였다.

전처리 파이프라인

Apache Beam을 이용한 데이타 전처리 파이프라인의 구조는 다음과 같다.

이미지 파일 준비

CIFAR-10 데이타셋 원본은 이미지 파일 형태가 아니라 PICKLE이라는 파일 포맷으로 되어 있기 때문에,  실제 개발 환경에서는 원본데이타가 이미지인것으로 가정하기 위해서 https://github.com/bwcho75/cifar-10/tree/master/pre-processing 의 1~2번 코드를 통해서 Pickle 파일을 이미지 파일로 변경하고, *.csv 파일에 {파일명},{레이블} 형태로 인덱스 데이타를 생성하였다.

생성된 이미지 파일과 *.csv 파일은 gsutil 명령어를 이용하여 Google Cloud Storage (aka GCS)에 업로드 하였다. 업로드 명령은 https://github.com/bwcho75/cifar-10/blob/master/pre-processing/2.%20Convert%20CIFAR-10%20Pickle%20files%20to%20image%20file.ipynb 에 설명되어 있다.


전처리 파이프라인의 구조

Apache Beam으로 구현된 파이프라인의 구조는 다음과 같다.


1. TextIO의 ReadFromText로 CSV 파일에서 한 라인 단위로 문자열을 읽는다.

2. parseLine에서 라인을 ,로 구분하여 filename과 label을 추출한다.

3. readImage 에서 filename을 가지고, 이미지 파일을 읽어서, binary array 형태로 변환한다.

4. TFExampleFromImageDoFn에서 이미지 바이너리와 label을 가지고 TFRecord 데이타형인 TFExample 형태로 변환한다.

5. 마지막으로 TFRecordIOWriter를 통해서 TFExample을 *.tfrecord 파일에 쓴다.

코드 주요 부분 설명

환경 설정 부분

이 코드는 구글 클라우드와 로컬 환경 양쪽에서 모두 실행이 가능하도록 구현되었다.

SRC_DIR_DEV는 로컬환경에서 이미지와 CSV 파일이 위치한 위치이고, DES_DIR_DEV는 로컬환경에서 tfrecord 파일이 써지는 위치이다.

구글 클라우드에서 실행할 경우 파일 저장소를  GCS (Google Cloud Storage)를 사용한다. DES_BUCKET은 GCS 버킷 이름이다. 코드 실행전에 반드시 구글 클라우드 콘솔에서 GCS 버킷을 생성하기 바란다.  SRC_DIR_PRD와 DES_DIR_PRD는 GCS 버킷내의 각각 image,csv 파일의 경로와 tfrecord 파일이 써질 경로 이다. 이 경로에 맞춰서 구글 클라우드 콘솔에서 디렉토리를 먼저 생성해 놓기를 바란다.




PROJECT는 구글 클라우드 프로젝트 명이고, 마지막으로 DEV_MODE가 True이면 로컬에서 수행이되고 False이면 구글 클라우드에서 실행하도록 하는 환경 변수이다.

의존성 설정 부분

로컬에서 실행할 경우필요한  파이썬 라이브러리가 이미 설치되어야 있어야 한다.

만약에 구글 클라우드에서 실행할 경우 이 Apache Beam 코드가 사용하는 파이썬 모듈을 명시적으로 정의해놔야 한다. 클라우드에서 실행시에는 Apache Beam 코드만 업로드가 되기 때문에(의존성 라이브러리를 같이 업로드 하는 방법도 있는데, 이는 추후에 설명한다.), 의존성 라이브는 구글 클라우드에서 Dataflow 실행시 자동으로 설치할 수 있도록 할 수 있는데, 이를 위해서는 requirements.txt 파일에 사용하는 파이썬 모듈들을 정의해줘야 한다. 다음은 requirements.txt에 의존성이 있는 파이썬 모듈등을 정의하고 저장하는 부분이다.


Apache Beam 코드

Apache Beam의 코드 부분은 크게 복잡하지 않기 때문에 주요 부분만 설명하도록 한다.

Service account 설정

Apache Beam 코드를 구글 클라우드에서 실행하기 위해서는 코드 실행에 대한 권한을 줘야 한다. 구글 클라우드에서는 사용자가 아니라 애플리케이션에 권한을 부여하는 방법이 있는데, Service account라는 것을 사용한다. Service account는 json 파일로 실행 가능한 권한을 정의하고 있다.

Service account 파일을 생성하는 방법은 http://bcho.tistory.com/1166 를 참고하기 바란다.

Service account 파일이 생성되었으면, 이 파일을 적용해야 하는데 GOOGLE_APPLICATION_CREDENTIALS 환경 변수에 Service account  파일의 경로를 정의해주면 된다. 파이썬 환경에서 환경 변수를 설정하는 방법은 os.envorin[‘환경변수명']에 환경 변수 값을 지정해주면 된다.

Jobname 설정

구글 클라우드에서 Apache Beam 코드를 실행하면, 하나의 실행이 하나의 Job으로 생성되는데, 이 Job을 구별하기 위해서 Job 마다 ID 를 설정할 수 있다. 아래는 Job ID를 ‘cifar-10’+시간 형태로 지정하는 부분이다


환경 설정

Apache Beam 코드를 구글 클라우드에서 실행하기 위해서는 몇가지 환경을 지정해줘야 한다.


  • staging_location은 클라우드 상에서 실행시 Apache Beam 코드등이 저장되는 위치이다. GCS 버킷 아래 /staging이라는 디렉토리로 지정했는데, 실행 전에 반드시 버킷아래 디렉토리를 생성하기 바란다.

  • temp_location은 기타 실행중 필요한 파일이 저장되는 위치이다. 실행 전에 반드시 버킷아래 디렉토리를 생성하기 바란다.

  • zone은 dataflow worker가 실행되는 존으로 여기서는 asia-northeast1-c  (일본 리전의 c 존)으로 지정하였다.


DEV_MODE 에 따른 환경 설정

로컬 환경이나 클라우드 환경에서 실행이냐에 따라서 환경 변수 설정이 다소 달라져야 한다.


디렉토리 경로를 바꿔서 지정해야 하고, 중요한것은 RUNNER인데, 로컬에서 실행하기 위해서는 DirectRunner를 구글 클라우드 DataFlow 서비스를 사용하기 위해서는 DataflowRunner를 사용하면 된다.


readImage 부분

Read Image는 이미지 파일을 읽어서 byte[] 로 리턴하는 부분인데, 로컬 환경이냐, 클라우드 환경이냐에 따라서 동작 방식이 다소 다르다.

클라우드 환경에서는 이미지 파일이 GCS에 저장되어 있기 때문에 파이썬의 일반 파일 open 명령등을 사용할 수 없다.

그래서 클라우드 환경에서 동작할 경우에는 GCS에서 파일을 읽어서 Worker의 로컬 디스크에 복사를 해놓고 이미지를 읽어서 byte[]로 변환한 후에, 해당 파일을 지우는 방식을 사용한다.


아래 코드에서 보면 DEV_MODE가 False 인경우 GCS에서 파일을 읽어서 로컬에 저장하는 코드가 있다.


storageClient는 GCS 클라이언트이고 bucket 을 얻어온후, bucket에서 파일을 get_blob 명령어를 이용하여 경로를 저장하여 blob.download_to_file을 이용하여 로컬 파일에 저장하였다.

실행

코드 작성이 끝났으면 실행을 한다. 실행 상태는 구글 클라우드 콘솔의 Dataflow  메뉴에서 확인이 가능하다.

아래와 같이 실행중인 그리고 실행이 끝난 Job 리스트들이 출력된다.




코드 실행중에, 파이프라인 실행 상황 디테일을 Job 을 선택하면 볼 수 있다.


여기서 주목할만한 점은 우측 그래프인데, 우측 그래프는 Worker의 수를 나타낸다. 초기에 1대로 시작했다가 오토 스케일링에 의해서 9대 까지 증가한것을 볼 수 있다.

처음 실행이었기 때문에 적정한 인스턴스수를 몰랐기 때문에 디폴트로 1로 시작하고 오토스케일링을 하도록 했지만, 어느정도 테스트를 한후에 적정 인스턴수를 알면 오토 스케일링을 기다릴 필요없이 디폴트 인스턴스 수를 알면 처음부터 그 수만큼 인스턴스 수로 시작하도록 하면 실행 시간을 줄일 수 있다.

만약에 파이프라인 실행시 에러가 나면 우측 상단에 LOGS 버튼을 누르면 상세 로그를 볼 수 있다.


아래 그림은 파이프라인 실행이 실패한 예에서 STACK TRACES를 통해서 에러 내용을 확인하는 화면이다.



해당 로그를 클릭하면 Stack Driver (구글의 모니터링 툴)의 Error Reporting 시스템 화면으로 이동하게 된다.

여기서 디테일한 로그를 볼 수 있다.

아래 화면을 보면 ReadImage 단계에서 file_path라는 변수명을 찾을 수 없어서 나는 에러를 확인할 수 있다.


TFRecord 파일 검증

파이프라인 실행이 끝나면, GCS 버킷에 tfrecord 파일이 생성된것을 확인할 수 있다.


해당 파일을 클릭하면 다운로드 받을 수 있다.

노트북 아래 코드 부분이 TFRecord를 읽어서 확인하는 부분이다. 노트북에서 tfrecord 파일의 경로를 다운로드 받은 경로로 변경하고 실행을 하면 파일이 제대로 읽히는 지 확인할 수 있다.


파일 경로 부분은 코드상에서 다음과 같다.



정상적으로 실행이 된 경우, 다음과 같이 tfrecord에서 읽은 이미지와 라벨값이 출력됨을 확인할 수 있다.


라벨 값은 Label 줄에 values 부분에 출력된다. 위의 그림에서는 순서대로 라벨 값이 4와 2가 된다.



HBase 와 구글의 빅테이블

#1 아키텍쳐


조대협 (http://bcho.tistory.com)

HBase

HBase 는 아파치 오픈소스 NoSQL 솔루션으로 구글의 빅테이블  (https://research.google.com/archive/bigtable.html) 논문을 기반으로 개발되었다.

Key/Value Store 기반의 NoSQL이며, 대용량 데이타를 빠르게 처리할 수 있는 기능을 가지고 있다.

데이타 모델

HBase는 컬럼 패밀리라는 데이타 모델을 사용하는데, 대략적인 구조를 보면 다음과 같다.

각 행은 하나의 로우키(rowkey)를 가지고 있다. 이 키는 RDBMS의 프라이머리 키와 같은 키라고 보면 된다.

각각의 행에는 컬럼이 정의되어 있는데, RDBMS 테이블의 일반 컬럼과 같은 개념이라고 보면 된다. 특이 사항은 이 컬럼들이 컬럼 패밀리 (Column family)라는 것으로 묶이게 되는데, 이렇게 컬럼 패밀리로 묶인 컬럼의 데이타는 물리적으로 같은곳에 저장이 된다. 그래서, 데이타 접근시에 한꺼번에 접근되는 데이타의 경우에는 컬럼 패밀리로 묶는 것이 유리하다.

위의 그림은 name과 contact, 그리고 company라는 컬럼 패밀리를 가지고 있고,

  • name 컬럼 패밀리는 lastname,firstname 컬럼

  • contact 컬럼 패밀리는 phone, mobile, email 컬럼

  • company 컬럼 패밀리는 company라는 컬럼

을 가지고 있다.

내부적으로 데이타는 rowkey에 의해서 오름차순으로 정렬이 되서 저장이 된다.



각 컬럼의 값을 셀이라고 하는데, 데이타 셀에는 timestamp가 있어서 이전의 값이 같이 저장되며, 일정 기간까지 그 값을 유지하도록 한다.


조인이나 인덱스등을 지원하지는 않지만 대용량 데이타를 안전하고 빠르게 저장 및 억세스가 가능하기 때문에, 광고 클릭 데이타나, 사용자 행동 데이타 수집, 로그 수집, IOT의 센서 데이타, 금융에서 시계열 데이타 등을 저장하는데 유용하게 사용할 수 있다.

아키텍쳐

아래 아키텍쳐는 HBase의 원조인 빅테이블의 아키텍쳐이다.




주키퍼등 몇몇 시스템들이 빠져 있지만, 큰 구조는 유사하다고 보면 된다.

데이타 노드에 SSTable 이라는 파일 형태로 데이타가 저장되어 있고, 위에 연산 노드가 붙어서 클러스터를 이룬다. 각 노드는 데이타를 저장하고 있는데, 로우키에 따라서 그 데이타가 분산되어 저장된다. 예를 들어 키가 1~3000의 범위를 가지고 노드가 3개이면 1번 노드는 1~1000, 2번은 2~2000, 3번은 3~3000 데이타를 저장하고 처리하게 된다.


각 노드의 구조는 다음과 같다.

쓰기 연산이 들어오면, 쓰기에 대한 로그를 tablet log 라는 파일에 남긴다. RDBMS의 백로그와 같은 개념으로 보면 되는데, 장애가 나더라도, tablet log가 남아 있기 때문에 이를 통해서 디스크에 쓰여지지 않은 데이타를 복구할 수 있게 된다.




데이타 로그를 쓰고 나면, 실제 데이타는 memtable 이라는 메모리 기반의 중간 저장소에 저장이 되고, 이 memtable이 꽉차게 되면, 데이타를 SSTable로 플러슁하고, tablet log에 있는 데이타를 지우게 된다. 이 과정을 Minor compaction이라고 한다.


읽기 연산이 들어오면, 먼저 memtable을 뒤져보고, 없을 경우 SSTable을 뒤져서 데이타를 읽게되는데, SSTable은 물리적으로 다음과 같은 모양을 하고 있다.

name,address,gender 라는 컬럼은 실제 SSTable 내에서 각 셀단위로 쪼게 져서 셀단위로 row key와 컬럼패밀리, 컬럼 명을 키로 하고, 그 안에 값을 저장한다. 만약에 같은 키의 셀을 업데이트 하더라도 그 데이타 셀을 업데이트 하는것이 아니라 새로운 시간 timestamp를 달아서, append 하는 방식으로 데이타를 저장한다.


계속 append 만하면, 저장 공간이 부족해지기 때문에, 어느 일정 시간이 되면 오래된 데이타를 지워야 하는데 이를  compaction이라고 하며 주기적으로 이 작업이 일어나게 된다.

핫스팟

아키텍쳐를 이해하면, 데이타가 어떻게 분산되는지를 이해할 수 있는데, 그래서 생기는 문제가 HOTKEY라는 문제가 발생한다.

예를 들어 주민등록 번호를 로우키로 사용하는 서비스가 있는데 98년생~08년생들에게 특히 인기가 있다고 하면, 그 키 범위내에 데이타가 다른 연령대에 비해서 많을 것이고, 98~08년 로우키 범위를 담당하는 노드에 부하가 많이 갈것이기 때문에 제대로 된 성능을 내기 어려워진다. 이와 같이 특정 로우키범위에 데이타가 볼리는 곳을 핫스팟이라고 하는데, 이를 방지하기 위해서는 키의 값을 UUID와 같은 랜덤 스트링이나 해쉬값등을 사용하여 전체적으로 분포가 골고를 키를 사용하는 것이 좋다.

구글 빅테이블

구글의 빅테이블은 HBase의 원조가 되는 서비스로, 구글 내부에서 지메일과 광고플랫폼등 여러 분야에 사용되고 있으며, 현존하는 단일 데이타베이스 시스템중 가장 큰 데이타 시스템이다.

개발 초기 당시에는 GFS (하둡파일 시스템 HDFS의 전신)을 사용하였으나, 콜러서스라는 고속 파일 시스템으로 변경하면서 매우 빠른 성능을 낼 수 있게 되었다.

구글 빅테이블은 구글 클라우드 (http://cloud.google.com)을 통해서 서비스가 제공되며, HBase API와 호환이 되기 때문에, 별도의 변경 없이 기존 HBase 애플리케이션 및 HBase 관련 도구를 사용할 수 있다는 장점이 있다.

성능은 HBase에 비해서, 초당 처리 성능은 대략 2.5배, 응답 시간은 50배 정도 빠르다.


(성능 비교 자료 http://www.i-programmer.info/news/197-data-mining/8594-google-cloud-bigtable-beta.html)


수십 페타의 데이타를 저장하더라도 일반적인 읽기나 쓰기의 경우 한자리 ms (~9ms)내의 응답성을 보장하기 때문에 빅데이타 핸들링에 매우 유리하며, 안정적인 구조로 서비스가 가능하다. 빠른 응답 시간 때문에 앞단에 캐쉬 서버를 두지 않아도 되서 전체 시스템 아키텍쳐를 단순화할 수 있는 장점을 가지고 있다.


빅테이블의 내부 아키텍쳐는 다음과 같은 모양으로 되어 있다.





성능 저하 없는 안정적인 확장

HBase와 유사한 구조이지만, 큰 특징이 데이타 노드와 연산노드 (Bigtable node)가 물리적으로 분리되어 있고, 하단의 파일시스템이 무제한 확장 구조를 갖는 구조를 가지고 있다.


즉 무슨 이야기인가 하면 데이타 파일이 콜로서스 파일 시스템 내에 아래와 같이 배치 되어 있고, 연산 노드는 자기가 관리한 SSTable 파일을 포인팅 하는 구조로 되어 있다.


이게 무슨 장점을 가지냐 하면, 보통 NoSQL이나 분산 시스템은 리밸런성이라는 작업을 하게 되는데, 데이타가 점점 쌓여가면, 그 연산노드와 데이타 노드에 데이타가 부하가 몰리기 때문에 그 데이타 청크를 나눠야 하는 일이 발생을 하게 되는데 이를 리밸런싱이라고 한다. 이때 물리적으로 데이타 노드의 데이타를 다른 노드로 이동해야 하기 때문에, 이 이동/복사 작업이 부하를 일으켜서 전체 시스템의 성능에 영향을 주게 된다.


아래 그림을 예로 들어보자




첫번째 노드에서 데이타 파티션, A,B,C를 관리하고 있다가 데이타가 많아져서 C를 두번째 노드로 이동하고자 하면, C데이타를 물리적으로 복사하여 두번째 노드의 저장소로 옮겨줘야 한다.

그러나 빅테이블의 경우 콜로서스에 파일이 공유 스토리지 형식으로 저장되어 있기 때문에, 물리적인 파일 이동은 필요없고,  연산 노드에서 어떤 데이타 파티션을 처리할지에 대한 포인터만 변경해주면 된다.



이런 이유 때문에, 내부적으로 리밸런싱이 일어나더라도, 리밸런싱에 의한 성능 저하나 충격이 전혀 없다는 장점을 가지고 있다.

마찬가지 원리로 연산 노드가 추가되거나 삭제될때도 HBase의 경우에는 리밸런싱에 의한 실제 데이타 이동 부하가 발생하는데, 빅테이블의 경우에는 새로운 노드를 추가하고, 각 노드에서 처리하는 데이타 포인트만 변경하면 되기 때문에, 성능 저하 없이 안정적으로 확장이 가능하다.


다음글에서는 HBase 를 로컬에 설치하는 방법과, 구글 클라우드 빅테이블을 설정하는 방법을 알아보고 CLI 명령을 이용하여 간단한 테이블 생성 및 데이타를 조작하는 방법에 대해서 알아보겠다.



글은 제가 텐서플로우와 딥러닝을 공부하면서 블로그에 메모해놨던 내용을 모아놓은 글입니다.

혼자 공부하면서 어려웠던 점도 있었기 때문에, 저처럼 텐서플로우와 딥러닝을 공부하시는 분들께 도움이 되고자 자료를 공개합니다.

텐서플로우 초기버전부터 작성하였기 때문에, 다소 코드가 안맞는 부분이 있을 있으니 양해 부탁드리며, 글은 개인이 스터디용으로 자유롭게 사용하실 있으며, 단체나 기타 상용 목적으로 사용은 금지 됩니다.


머신러닝 이북-수포자를 위한 머신러닝.pdf.zip


혹시 이 교재로 공부하시다가 잘못된 부분을 수정하셨으면 다른분들을 위해서 친절하게 댓글을 달아주시면 감사하겠습니다.


그리고 오프라인 스터디 그룹을 진행하시는 분들을 위해서 지원을 해드립니다.

  • 발표용 프리젠테이션 파일
  • 실습 자료
  • 온라인 실습용 https://google.qwiklabs.com/catalog 토큰
스터디 지원을 위해서는 
1. https://www.facebook.com/groups/googlecloudkorea/ 구글 클라우드 사용자 그룹에 가입 하신후
2. https://www.meetup.com/GDG-Cloud-Korea 에 가입하신 후에, 스터디 모임을 매주 진행하실때 마다 밋업을 여시면 됩니다.
그후에, 저한테 페이스북으로 연락 주시면 https://www.facebook.com/terry.cho.7 제가 자료와 함께 실습 토큰 (무료 크레딧)을 제공해 드립니다.


오토 인코더를 이용한 신용카드 비정상 거래 검출 

#3 학습 데이타 전처리


조대협 (http://bcho.tistory.com)




앞의 글들 (http://bcho.tistory.com/1198 http://bcho.tistory.com/1197 ) 에서 신용카드 이상 검출을 하기 위한 데이타에 대한 분석과, 오토 인코더에 대한 기본 원리 그리고 오토 인코더에 대한 샘플 코드를 살펴보았다.


이제 실제 모델을 만들기에 앞서 신용카드 거래 데이타를 학습에 적절하도록 전처리를 하도록한다.

데이타양이 그리 크지 않기 때문에, 데이타 전처리는 파이썬 데이타 라이브러리인 pandas dataframe을 사용하였다. 여기서 사용된 전처리 코드는 https://github.com/bwcho75/tensorflowML/blob/master/autoencoder/creditcard_fraud_detection/2.data_normalization.ipynb 에 공개되어 있다.


데이타 전처리 과정

신용카드 거래 데이타를 머신러닝 학습의 검증과 테스트에 적절하도록 다음과 같은 절차를 통하여 데이타를 전처리하여 CSV 파일로 저장하였다.

데이타 정규화

학습 데이타에 여러가지 피쳐를 사용하는데, 예를 들어 피쳐 V1의 범위가 -10000~10000이고, 피쳐 V2의 범위가 10~20 이라면, 각 피쳐의 범위가 차이가 매우 크기 때문에, 경사 하강법등을 이용할때, 학습 시간이 더디거나 또는 제대로 학습이 되지 않을 수 있다. 자세한 내용은 김성훈 교수님의 모두를 위한 딥러닝 강좌중 정규화 부분  https://www.youtube.com/watch?v=1jPjVoDV_uo&feature=youtu.be 을 참고하기 바란다.

그래서 피쳐의 범위를 보정(정규화)하여 학습을 돕는 과정을 데이타 정규화라고 하는데, 정규화에는 여러가지 방법이 있다. 여기서 사용한 방법은 Fearture scaling이라는 방법으로, 모든 피쳐의 값들을 0~1사이로 변환하는 방법이다. 위에서 언급한 V1은 -10000~10000의 범위가 0~1사이로 사상되는 것이고, V2도 10~20의 범위가 0~1사이로 사상된다.

공식은 아래와 같은데



참고 https://en.wikipedia.org/wiki/Normalization_(statistics)


정규화된 값은 = (원본값 - 피쳐의 최소값) / (피쳐의 최대값 - 피쳐의 최소값)


으로 계산한다.

앞의 V1값에서 0의 경우는 (0 - (-10000)) / (10000 - (-10000)) = 0.5 로 사상이 되는것이다.


그러면 신용카드 데이타에서 V1~V28 컬럼을 Feature scaling을 위해서 정규화를 하려면

df_csv = pd.read_csv('./data/creditcard.csv')

CSV에서 원본 데이타를 읽는다.

읽어드린 데이타의 일부를 보면 다음과 같다.


df_csv 는 데이타의 원본값을 나타내고,  df_csv.min() 각 컬럼의 최소값, df_csv.max()는 각 컬럼의 최대값을 나타낸다. 이 값들을 이용하여 위의 Feature Scaling 공식으로 구현하면 아래와 같이 된다


df_norm = (df_csv - df_csv.min() ) / (df_csv.max() - df_csv.min() )


이렇게 정규화된 값을 출력해보면 다음과 같다.




V1 컬럼의 -1.359807이 정규화후에 0.935192 로 변경된것을 확인할 수 있고 다른 필드들도 변경된것을 확인할 수 있다.

데이타 분할

전체 데이타를 정규화 하였으면 데이타를 학습용, 검증용, 테스트용 데이타로 나눠야 하는데, 오토 인코더의 원리는 정상적인 데이타를 학습 시킨후에, 데이타를 넣어서 오토인코더가 학습되어 있는 정상적인 패턴과 얼마나 다른가를 비교하는 것이기 때문에 학습 데이타에는 이상거래를 제외하고 정상적인 거래만으로 학습을 한다.

이를 위해서 먼저 데이타를 정상과 비정상 데이타셋 두가지로 분리한다.

아래 코드는 Class=1이면 비정상, Class=0이면 정상인 데이타로 분리가 되는데, 정상 데이타는 df_norm_nonfraud에 저장하고, 비정상 데이타는 df_norm_fraud에 저장하는 코드이다.

# split normalized data by label
df_norm_fraud=df_norm[ df_norm.Class==1.0] #fraud
df_norm_nonfraud=df_norm[ df_norm.Class==0.0] #non_fraud


정상 데이타를 60:20:20 비율로 학습용, 테스트용, 검증용으로 나누고, 비정상 데이타는 학습에는 사용되지 않고 테스트용 및 검증용에만 사용되기 때문에, 테스트용 및 검증용으로 50:50 비율로 나눈다.


# split non_fraudfor 60%,20%,20% (training,validation,test)
df_norm_nonfraud_train,df_norm_nonfraud_validate,df_norm_nonfraud_test = \
   np.split(df_norm_nonfraud,[int(.6*len(df_norm_nonfraud)),int(.8*len(df_norm_nonfraud))])


numpy의 split 함수를 쓰면 쉽게 데이타를 분할 할 수 있다. [int(.6*len(df_norm_nonfraud)),int(.8*len(df_norm_nonfraud))] 가 데이타를 분할하는 구간을 정의하는데,  데이타 프레임의 60%, 80% 구간을 데이타 분할 구간으로 하면 0~60%, 60~80%, 80~100% 구간 3가지로 나누어서 데이타를 분할하여 리턴한다. 같은 방식으로 아래와 같이 비정상 거래 데이타도 50% 구간을 기준으로 하여 두 덩어리로 데이타를 나눠서 리턴한다.


# split fraud data to 50%,50% (validation and test)
df_norm_fraud_validate,df_norm_fraud_test = \
   np.split(df_norm_fraud,[int(0.5*len(df_norm_fraud))])

데이타 합치기

다음 이렇게 나눠진 데이타를 테스트용 데이타는 정상과 비정상 거래 데이타를 합치고, 검증용 데이타 역시 정상과 비정상 거래를 합쳐서 각각 테스트용, 검증용 데이타셋을 만들어 낸다.

두개의 데이타 프레임을 합치는 것은 아래와 같이 .append() 메서드를 이용하면 된다.


df_train = df_norm_nonfraud_train.sample(frac=1)
df_validate = df_norm_nonfraud_validate.append(df_norm_fraud_validate).sample(frac=1)
df_test = df_norm_nonfraud_test.append(df_norm_fraud_test).sample(frac=1)

셔플링

데이타를 합치게 되면, 테스트용과 검증용 데이타 파일에서 처음에는 정상데이타가 나오다가 뒷부분에 비정상 데이타가 나오는 형태가 되기 때문에 테스트 결과가 올바르지 않을 수 있는 가능성이 있다. 그래서, 순서를 무작위로 섞는 셔플링(Shuffling) 작업을 수행한다.

셔플링은 위의 코드에서 .sample(frac=1)에 의해서 수행되는데, .sample은 해당 데이타 프레임에서 샘플 데이타를 추출하는 명령으로 frac은 샘플링 비율을 정의한다 1이면 100%로, 전체 데이타를 가져오겠다는 이야기 인데, sample()함수는 데이타를 가지고 오면서 순서를 바꾸기 때문에, 셔플링된 결과를 리턴하게 된다.


전체 파이프라인을 정리해서 도식화 해보면 다음과 같다.


다음글에서는 이렇게 정재된 데이타를 가지고 학습할 오토인코더 모델을 구현해보도록 한다.


오토인코더를 이용한 비정상 거래 검출 모델의 구현 #1

신용카드 거래 데이타 분석


조대협 (http://bcho.tistory.com)


이미지 인식 모델은 만들어봤고, 아무래도 실제로 짜봐야 하는지라 좋은 시나리오를 고민하고 있는데, 추천 시스템도 좋지만, 이상 거래 감지에 대해 접할 기회가 있어서 이상 거래 감지 (Fraud Detection System)  시스템을 만들어 보기로 하였다


데이타셋

샘플 데이타를 구해야 하는데, 마침 kaggle.com 에 크레딧 카드 이상거래 감지용 데이타가 있었다.

https://www.kaggle.com/dalpozz/creditcardfraud 에서 데이타를 다운 받을 수 있다.




CSV 형태로 되어 있으며, 2013년 유럽 카드사의 실 데이타 이다. 2일간의 데이타 이고, 총 284,807건의 트렌젝션 로그중에, 492건이 비정상 데이타이고, 데이타 분포는 비정상 데이타가 0.172%로 심하게 불균형적이다.


전체 31개의 컬럼중, 첫번째 컬럼은 시간,30번째 컬럼은 비정상 거래 유무 (1이면 비정상, 0이면 정상) 그리고 마지막 31번째 컬럼은 결재 금액을 나타낸다 2~29번째 컬럼이 특징 데이타 인데, V1~V28로 표현되고 데이타 컬럼명은 보안을 이유로 모두 삭제 되었다.


데이타 분석

어떤 컬럼들을 피쳐로 정할것인가를 결정하기 위해서 데이타 분석을 시작한다.

데이타 분석 방법은  https://www.kaggle.com/currie32/predicting-fraud-with-tensorflow 를 참고하였다.


시간대별 트렌젝션양을 분석해보면 별다른 상관 관계를 찾을 수 없다.


트렌젝션 금액별로 비교를 한 그림이다.


위의 비정상 데이타를 보면, 작은 금액에서 비정상 거래가 많이 일어난것을 볼 수 있지만, 정상 거래군과 비교를 해서 다른 특징을 찾아낼 수 없다.


다음은 트랜젝션 금액을 기준으로 V1~V28 피쳐를 비교 분석해봤다.


붉은 점은 비정상, 파란점이 정상 거래이고, 가로축이 금액, 새로축이 V1 값이다. 이런 방법으로 V1~V8에 대한 그래프를 그려봤으나, 비정상 거래가 항상 정상거래의 부분집합형으로 별다른 특이점을 찾아낼 수 없었다.


다음으로 V1~V28 각 컬럼간의 값 분포를 히스토 그램으로 표현한 결과이다.

아래는 V2 피쳐의 값을 히스토그램으로 표현한 결과로 파란색이 정상, 붉은 색이 비정상 거래인데, 히스토그램이 차이가 나는 것을 확인할 수 있다.


V4 피쳐 역시 아래 그림과 같이 차이가 있는 것을 볼 수 있다.


V22 피쳐의 경우에는 정상과 비정상 거래의 패턴이 거의 유사하여 변별력이 없는것을 볼 수 있다.



이런식으로, V1~V28중에 비정상과 정상거래에 차이를 보이는 피쳐들만 선정한다.

위의 그래프들은 생성하는 코드는 https://github.com/bwcho75/tensorflowML/blob/master/autoencoder/Credit%20card%20fraud%20detection%20(Data%20Analytics).ipynb 에 있다.


모델 선택

정상거래와 비정상 거래가 라벨링이 되어 있기 때문에, 로지스틱 회귀나 일반적인 뉴럴네트워크를 사용해도 되지만, 비정상 거래 검출 로직의 경우 비정상 거래를 분별해서 라벨링한 데이타를 구하기가 매우 어렵다.

그래서 라벨된 데이타를 전제로 하는 지도학습보다 비지도학습 알고리즘을 선택하기로 한다.


비지도 학습 모델 중에서 오토 인코더라는 모델을 사용할 예정이다.

오토인코더 (AutoEncoder)

오토 인코더는 딥네트워크 기반의 비지도 학습 모델로, 뉴럴네트워크 두개를 뒤집어서 붙여놓은 형태이다.





<그림 출처 : https://deeplearning4j.org/deepautoencoder >

앞에 있는 뉴럴네트워크는 인코더, 뒤에 붙은 네트워크는 디코더가 된다.

인코더를 통해서 입력 데이타에 대한 특징을 추출해내고, 이 결과를 가지고 뉴럴 네트워크를 역으로 붙여서 원본 데이타를 생성해낸다.




이 과정에서 입력과 출력값이 최대한 같아지도록 튜닝함으로써, Feature를 잘 추출할 수 있게 하는것이 오토 인코더의 원리이다.


비정상 거래 검출에 있어서 이를 활용하는 방법은 학습이 되지 않은 데이타의 경우 디코더에 의해 복원이 제대로 되지 않고 원본 데이타와 비교했을때 차이값이 크기 때문에, 정상 거래로 학습된 모델은 비정상 거래가 들어왔을때 결과값이 입력값보다 많이 다를것이라는 것을 가정한다.


그러면 입력값 대비 출력값이 얼마나 다르면 비정상 거래로 판단할것인가에 대한 임계치 설정이 필요한데, 이는 실제 데이타를 통한 설정이나 또는 통계상의 데이타에 의존할 수 밖에 없다. 예를 들어 전체 신용카드 거래의 0.1%가 비정상 거래라는 것을 가정하면, 입력 값들 중에서 출력값과 차이가 큰 순서대로 데이타를 봤을때 상위 0.1%만을 비정상 거래로 판단한다.


또는 비지도 학습이기 때문에, 나온 데이타로 정상/비정상을 판단하기 보다는 비정상 거래일 가능성을 염두해놓고, 그 거래들을 비정상 거래일 것이라고 예측하고 이 비정상 거래 후보에 대해서 실제 확인이나 다른 지표에 대한 심층 분석을 통해서 비정상 거래를 판별한다.


이러한 과정을 거쳐서 비정상 거래가 판별이 되면, 비정상 거래에 대한 데이타를 라벨링하고 이를 통해서 다음 모델 학습시 임계치 값을 설정하거나 다른 지도 학습 알고리즘으로 변경하는 방법등을 고민해볼 수 있다.


다음글에서는 실제로 오토인코더 모델을 텐서플로우를 이용해서 구현해보겠다.


텐서플로우 하이레벨 API Estimator를 이용한 모델 정의 방법


조대협 (http://bcho.tistory.com)


텐서플로우의 하이레벨 API를 이용하기 위해서는 Estimator 를 사용하는데, Estimator 는 Predefined model 도 있지만, 직접 모델을 구현할 수 있다. 하이레벨 API와 Estimator에 대한 설명은 http://bcho.tistory.com/1195 글을 참고하기 바란다.


이 문서는 Custom Estimator를 이용하여 Estimator를 구현하는 방법에 대해서 설명하고 있으며, 대부분 https://www.tensorflow.org/extend/estimators 의 내용을 참고하여 작성하였다.

Custom Estimator

Estimator의 스켈레톤 코드는 다음과 같다. 모델을 정의하는 함수는 학습을 할 feature와, label을 입력 받고, 모델의 모드 (학습, 테스트, 예측) 모드를 인자로 받아서 모드에 따라서 모델을 다르게 정의할 수 있다. 예를 들어 학습의 경우 드롭 아웃을 사용하지만 테스트 모드에서는 드롭 아웃을 사용하지 않는다.

def model_fn(features, labels, mode, params):
  # Logic to do the following:
  # 1. Configure the model via TensorFlow operations
  # 2. Define the loss function for training/evaluation
  # 3. Define the training operation/optimizer
  # 4. Generate predictions
  # 5. Return predictions/loss/train_op/eval_metric_ops in EstimatorSpec object
  return EstimatorSpec(mode, predictions, loss, train_op, eval_metric_ops)

입력 인자에 대한 설명

그러면 각 인자를 구체적으로 살펴보자

  • features : input_fn을 통해서 입력되는 feature로 dict 형태가 된다.

  • labels : input_fn을 통해서 입력되는 label 값으로 텐서 형태이고, predict (예측) 모드 일 경우에는 비어 있게 된다.

  • mode : 모드는 모델의 모드로, tf.estimator.ModeKeys 중 하나를 사용하게 된다.

    • tf.estimator.ModeKeys.TRAIN : 학습 모드로 Estimator의 train()을 호출하였을 경우 사용되는 모드이다.

    • tf.estimator.ModeKeys.EVAL : 테스트 모드로, evaluate() 함수를 호출하였을 경우 사용되는 모드이다.

    • tf.estimator.ModeKeys.PREDICT : 예측모드로,  predict() 함수를 호출하였을 경우에 사용되는 모드이다.  

  • param : 추가적으로 입력할 수 있는 패러미터로, dict 포맷을 가지고 있으며, 하이퍼 패러미터등을 이 변수를 통해서 넘겨 받는다.

Estimator 에서 하는 일

Estimator 를 구현할때, Estimator 내의 내용은 모델을 설정하고, 모델의 그래프를 그린 다음에, 모델에 대한 loss 함수를 정의하고, Optimizer를 정의하여 loss 값의 최소값을 찾는다. 그리고 prediction 값을 계산한다.


Estimator의 리턴값

Estimator에서 리턴하는 값은 tf.estimator.EstimatorSpec 객체를 리턴하는데, 이 객체는 다음과 같은 값을 갖는다.

  • mode : Estimator가 수행한 모드. 보통 입력값으로 받은 모드 값이 그대로 리턴된다.

  • prediction (PREDICT 모드에서만 사용됨) : PREDICT 모드에서 예측을 수행하였을 경우, 예측된 값을 dict 형태로 리턴한다.

  • loss (EVAL 또는, TRAIN 모드에서 사용됨) : 학습과 테스트중에 loss 값을 리턴한다.

  • train_op (트레이닝 모드에서만 필요함) : 한 스텝의 학습을 수행하기 위해서 호출하는 함수를 리턴한다. 보통 옵티마이져의  minimize()와 같은 함수가 사용된다.
           optimizer = tf.train.AdamOptimizer(learning_rate=0.001)
           train_op = optimizer.minimize(loss, global_step=global_step)
           return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

  • eval_metrics_ops (optional) : EVAL (테스트) 모드에서 테스트를 위해서 사용된 인자들을 dict 형태로 리턴한다. tf.metrics에는 미리 정의된 일반적인 메트릭들이 정의되어 있는데, 예를 들어 accuracy 등이 이에 해당한다. 아래는 tf.metrics.accuracy를 이용하여 예측값 (predictions)과 라벨(labels)의 값을 계산하여, 메트릭으로 리턴하는 방법이다.

    eval_metric_ops = {
    "accuracy": tf.metrics.accuracy(labels, predictions) }

    만약 rmse를 evaluation metric으로 사용하고자 하면 다음과 같이 정의한다.
    eval_metric_ops = {
       "rmse": tf.metrics.root_mean_squared_error(
           tf.cast(labels, tf.float64), predictions)
    }

    만약에 별도의 메트릭을 정의하지 않으면, 디폴트로 loss 값만 EVAL 단계에서 계산되게 된다.

데이타 입력 처리

모델로의 데이타 입력은 Esitmator의 모델 함수로 입력되는 features 변수를 통해서 입력 된다.

features는 컬럼명으로된 키와, 컬럼 값으로 이루어진 dict 형태의 데이타 형으로, 뉴럴 네트워크 모델에 데이타를 입력하기 위해서는 이중에서 학습에 사용할 컬럼만을 추출하여, 입력 레이어에 넣어 줘야 한다.

이 features 에서 특정 컬럼만을 지정하여 추출한 후에, 그 컬럼의 값을 넣어주는 것은 tf.feature_column.input_layer 함수를 사용하면 된다.


예제를 보자

input_layer = tf.feature_column.input_layer(
 features=features, feature_columns=[age, height, weight])


위의 예제는 features 에서 age,height,weight 컬럼을 추출하여 input layer로 넣는 코드이다.

네트워크 정의

데이타를 읽었으면 이제 뉴럴네트워크를 구성해야 한다. 네트워크의 레이어는 tf.layers 로 간단하게 구현할 수 있다. tf.layer에는 풀링,드롭아웃,일반적인 뉴럴네트워크의 히든 레이어, 컨볼루셔널 네트워크들이 함수로 구현되어 있기 때문에 각 레이어를 하나의 함수로 간단하게 정의가 가능하다.


아래는 히든레이어를 구현하는 tf.layers.dense 함수이다.


tf.layers.dense( inputs, units, activation)


  • inputs는 앞의 레이어를 정의하고

  • units는 이 레이어에 크기를 정의하고

  • 마지막으로 activation은 sigmoid나,ReLu와 같은 Activation 함수를 정의한다.


다음 예제는 5개의 히든 레이어를 가지는 오토 인코더 네트워크를 정의한 예이다.

 input_layer = features['inputs'] # 784 pixels
   dense1 = tf.layers.dense(inputs=input_layer, units=256, activation=tf.nn.relu)
   dense2 = tf.layers.dense(inputs=dense1, units=128, activation=tf.nn.relu)
   dense3 = tf.layers.dense(inputs=dense2, units=16, activation=tf.nn.relu)
   dense4 = tf.layers.dense(inputs=dense3, units=128, activation=tf.nn.relu)
   dense5 = tf.layers.dense(inputs=dense4, units=256, activation=tf.nn.relu)
   output_layer = tf.layers.dense(inputs=dense5, units=784, activation=tf.nn.sigmoid)


5개의 히든 레이어는 각각 256,128,16,128,256 개의 노드를 가지고 있고, 각각 ReLu를 Activation 함수로 사용하였다.

그리고 마지막 output layer는 784개의 노드를 가지고 sigmoid 함수를 activation 함수로 사용하였다.

Loss 함수 정의

다음 모델에 대한 비용함수(loss/cost function)을 정의한다. 이 글을 읽을 수준이면 비용함수에 대해서 별도로 설명하지 않아도 되리라고 보는데, 비용함수는 예측값과 원래 라벨에 대한 차이의 합을 나타내는 것이 비용함수이다.


 # Connect the output layer to second hidden layer (no activation fn)

 output_layer = tf.layers.dense(second_hidden_layer, 1)
 # Reshape output layer to 1-dim Tensor to return predictions
 predictions = tf.reshape(output_layer, [-1])
 predictions_dict = {"ages": predictions}

 # Calculate loss using mean squared erro
 loss = tf.losses.mean_squared_error(labels, predictions)

코드를 보면, 최종 예측된 값은 predictions에 저장되고, 학습 데이타로 부터 받은 라벨 값은 labels에 저장된다. 이 차이를 계산할때, MSE (mean square error)를 사용하였다.

Training Op 정의

비용 함수가 적용되었으면, 이 비용함수의 값을 최적화 하는 것이 학습이기 때문에, 옵티마이저를 정의하고, 옵티마이저를 이용하여 비용함수의 최적화가 되도록 한다.

아래 코드는  Optimizer를 GradientDescentOptimizer로 정의하고, 이 옵티마이저를 이용하여 이용하여 loss 값을 최소화 하도록 하였다.

optimizer = tf.train.GradientDescentOptimizer(
   learning_rate=params["learning_rate"])

train_op = optimizer.minimize(
   loss=loss, global_step=tf.train.get_global_step())

전체 코드

그러면 위의 내용을 모두 합쳐서 model_fn으로 모아서 해보자.

def model_fn(features, labels, mode, params):
 """Model function for Estimator."""
 # Connect the first hidden layer to input layer
 # (features["x"]) with relu activation
 first_hidden_layer = tf.layers.dense(features["x"], 10, activation=tf.nn.relu)

 # Connect the second hidden layer to first hidden layer with relu
 second_hidden_layer = tf.layers.dense(
     first_hidden_layer, 10, activation=tf.nn.relu)

 # Connect the output layer to second hidden layer (no activation fn)
 output_layer = tf.layers.dense(second_hidden_layer, 1)


 # Reshape output layer to 1-dim Tensor to return predictions
 predictions = tf.reshape(output_layer, [-1])

 # Provide an estimator spec for `ModeKeys.PREDICT`.
 if mode == tf.estimator.ModeKeys.PREDICT:
   return tf.estimator.EstimatorSpec(
       mode=mode,
       predictions={"ages": predictions})

 # Calculate loss using mean squared error
 loss = tf.losses.mean_squared_error(labels, predictions)

 # Calculate root mean squared error as additional eval metric
 eval_metric_ops = {
     "rmse": tf.metrics.root_mean_squared_error(
         tf.cast(labels, tf.float64), predictions)
 }

 optimizer = tf.train.GradientDescentOptimizer(
  learning_rate=params["learning_rate"])

 train_op = optimizer.minimize(
     loss=loss, global_step=tf.train.get_global_step())

 # Provide an estimator spec for `ModeKeys.EVAL` and `ModeKeys.TRAIN` modes.

 return tf.estimator.EstimatorSpec(
     mode=mode,
     loss=loss,
     train_op=train_op,
     eval_metric_ops=eval_metric_ops)

데이타 입력

 first_hidden_layer = tf.layers.dense(features["x"], 10, activation=tf.nn.relu)

네트워크 정의

 # Connect the second hidden layer to first hidden layer with relu
 second_hidden_layer = tf.layers.dense(
     first_hidden_layer, 10, activation=tf.nn.relu)

 # Connect the output layer to second hidden layer (no activation fn)
 output_layer = tf.layers.dense(second_hidden_layer, 1)

first_hidden_layer의 입력값을 가지고 네트워크를 구성한다. 두번째 레이어는 first_hidden_layer를 입력값으로 하여, 10개의 노드를 가지고, ReLu를 activation 레이어로 가지도록 하였다.  

마지막 계층은 두번째 계층에서 나온 결과를 하나의 노드를 이용하여 합쳐서 activation 함수 없이 결과를 냈다.

 # Reshape output layer to 1-dim Tensor to return predictions
 predictions = tf.reshape(output_layer, [-1])

 # Provide an estimator spec for `ModeKeys.PREDICT`.
 if mode == tf.estimator.ModeKeys.PREDICT:
   return tf.estimator.EstimatorSpec(
       mode=mode,
       predictions={"ages": predictions})

예측 모드에서는 prediction 값을 리턴해야 하기 때문에, 먼저 예측값을 output_layer에서 나온 값으로, 행렬 차원을 변경하여 저장하고, 만약에 예측 모드 tf.estimator.ModeKeys.PREDICT일 경우 EstimatorSpec에 predction 값을 넣어서 리턴한다. 이때 dict 형태로 prediction 결과 이름을 age로 값을 predictions 값으로 채워서 리턴한다.

Loss 함수 정의

다음 비용 함수를 정의하고, 테스트 단계(EVAL)에서 사용할 evaluation metrics에 rmse를 테스트 기준으로 메트릭으로 정의한다.

 # Calculate loss using mean squared error
 loss = tf.losses.mean_squared_error(labels, predictions)

 # Calculate root mean squared error as additional eval metric
 eval_metric_ops = {
     "rmse": tf.metrics.root_mean_squared_error(
         tf.cast(labels, tf.float64), predictions)
 }

Training OP 정의

비용 함수를 정했으면, 비용 함수를 최적화 하기 위한 옵티마이져를 정의한다. 아래와 같이 GradientDescentOptimzer를 이용하여 loss 함수를 최적화 하도록 하였다.

 optimizer = tf.train.GradientDescentOptimizer(
  learning_rate=params["learning_rate"])

 train_op = optimizer.minimize(
     loss=loss, global_step=tf.train.get_global_step())

 # Provide an estimator spec for `ModeKeys.EVAL` and `ModeKeys.TRAIN` modes.

마지막으로, PREDICTION이 아니고, TRAIN,EVAL인 경우에는 EstimatorSpec을 다음과 같이 리턴한다.

Loss 함수와, Training Op를 정의하고 평가용 매트릭스를 정의하여 리턴한다.

 return tf.estimator.EstimatorSpec(
     mode=mode,
     loss=loss,
     train_op=train_op,
     eval_metric_ops=eval_metric_ops)

실행

그러면 완성된 Estimator를 사용해보자

train_input_fn = tf.estimator.inputs.numpy_input_fn(
   x={"x": np.array(training_set.data)},
   y=np.array(training_set.target),
   num_epochs=None,
   shuffle=True)

# Train

nn.train(input_fn=train_input_fn, steps=5000)

# Score accuracy

test_input_fn = tf.estimator.inputs.numpy_input_fn(
   x={"x": np.array(test_set.data)},
   y=np.array(test_set.target),
   num_epochs=1,
   shuffle=False)

ev = nn.evaluate(input_fn=test_input_fn)
print("Loss: %s" % ev["loss"])
print("Root Mean Squared Error: %s" % ev["rmse"])

각 코드를 보면

train_input_fn = tf.estimator.inputs.numpy_input_fn(
   x={"x": np.array(training_set.data)},
   y=np.array(training_set.target),
   num_epochs=None,
   shuffle=True)

를 이용하여 numpy 의 데이타로 input_fn 함수를 만들었다. training_set.data는 학습 데이타, training_set.target을 학습용 라벨로 설정하고, epoch는 무제한, 그리고 데이타는 셔플 하도록 하였다.

nn.train(input_fn=train_input_fn, steps=5000)

앞서 정의된 모델에 train_input_fn을 넣어서 총 5000 번 학습을 하도록 하였다.

학습이 끝난 모델을 테스트 해야 하는데, 같은 방법으로 test_input_fn을 정의하고

ev = nn.evaluate(input_fn=test_input_fn)

evaluate를 이용하여, 학습된 모델을 평가한다.

평가된 결과를 보기 위해서 loss 값과 rmse 값을 ev[‘loss’], ev[‘rmse’]로 출력하였다.

지금까지 Estimator를 만드는 방법에 대해서 알아보았다. 다음 글에서는 Auto Encoder 네트워크를 Estimator로 구현해보도록 하겠다.





Object Detection API를 이용하여 커스텀 데이타 학습하기

얼굴인식 모델 만들기


조대협 (http://bcho.tistory.com)


이번글에서는 Tensorflow Object Detection API를 이용하여 직접 이미지를 인식할 수 있는 방법에 대해서 알아보자. 이미 가지고 있는 데이타를 가지고 다양한 상품에 대한 인식이나, 사람 얼굴에 대한 인식 모델을 머신러닝에 대한 전문적인 지식 없이도 손쉽게 만들 수 있다.


Object Detection API 설치

Object Detection API 설치는 http://bcho.tistory.com/1193http://bcho.tistory.com/1192 에서 이미 다뤘기 때문에 별도로 언급하지 않는다.

학습용 데이타 데이타 생성 및 준비

Object Detection API를 학습 시키기 위해서는 http://bcho.tistory.com/1193 예제와 같이 TFRecord 형태로 학습용 파일과 테스트용 파일이 필요하다. TFRecord 파일 포맷에 대한 설명은 http://bcho.tistory.com/1190 를 참고하면 된다.


이미지 파일을 TFRecord로 컨버팅하는 전체 소스 코드는 https://github.com/bwcho75/objectdetection/blob/master/custom/create_face_data.py 를 참고하기 바란다.

구글 클라우드 VISION API를 이용하여,얼굴이 있는지 여부를 파악하고, 얼굴 각도가 너무 많이 틀어진 경우에는 필터링 해낸후에,  얼굴의 위치 좌표를 추출하여 TFRecord 파일에 쓰는 흐름이다.

VISION API를 사용하기 때문에 반드시 서비스 어카운트 (Service Account/JSON 파일)를 구글 클라우드 콘솔에서 만들어서 설치하고 실행하기 바란다.


사용 방법은

python create_face_data.py {이미지 소스 디렉토리} {이미지 아웃풋 디렉토리} {TFRECORD 파일명}


형태로 사용하면 된다.

예) python ./custom/create_face_data.py /Users/terrycho/trainingdata_source /Users/terrycho/trainingdata_out


{이미지 소스 디렉토리} 구조는 다음과 같다.

{이미지 소스 디렉토리}/{라벨1}

{이미지 소스 디렉토리}/{라벨2}

{이미지 소스 디렉토리}/{라벨3}

:

예를 들어

/Users/terrycho/trainingdata_source/Alba

/Users/terrycho/trainingdata_source/Jessica

/Users/terrycho/trainingdata_source/Victoria

:

이런식이 된다.



명령을 실행하면, {이미지 아웃풋 디렉토리} 아래

  • 학습 파일은 face_training.record

  • 테스트 파일은 face_evaluation.record

  • 라벨맵은 face_label_map.pbtxt

로 생성된다. 이 세가지 파일이 Object Detection API를 이용한 학습에 필요하고 부가적으로 생성되는  csv 파일이 있는데

  • all_files.csv : 소스 디렉토리에 있는 모든 이미지 파일 목록

  • filtered_files.csv : 각 이미지명과, 라벨, 얼굴 위치 좌표 (사각형), 이미지 전체 폭과 높이

  • converted_result_files.csv : filtered_files에 있는 이미지중, 얼굴의 각도등이 이상한 이미지를 제외하고 학습과 테스트용 데이타 파일에 들어간 이미지 목록으로, 이미지 파일명, 라벨 (텍스트), 라벨 (숫자), 얼굴 좌표 (사각형) 을 저장한다.


여기서 사용한 코드는 간단한 테스트용 코드로, 싱글 쓰레드에 싱글 프로세스 모델로 대규모의 이미지를 처리하기에는 적절하지 않기 때문에, 운영환경으로 올리려면, Apache Beam등 분산 프레임웍을 이용하여 병렬 처리를 하는 것을 권장한다. http://bcho.tistory.com/1177 를 참고하기 바란다.


여기서는 학습하고자 하는 이미지의 바운드리(사각형 경계)를 추출하는 것을 VISION API를 이용해서 자동으로 했지만, 일반적인 경우는 이미지에서 각 경계를 수동으로 추출해서 학습데이타로 생성해야 한다




이런 용도로 사용되는 툴은 https://medium.com/towards-data-science/how-to-train-your-own-object-detector-with-tensorflows-object-detector-api-bec72ecfe1d9 문서에 따르면 FastAnnotationTool이나 ImageMagick 과 같은 툴을 추천하고 있다.



이렇게 학습용 파일을 생성하였으면 다음 과정은 앞의  http://bcho.tistory.com/1193 에서 언급한 절차와 크게 다르지 않다.

체크포인트 업로드

학습 데이타가 준비 되었으면 학습을 위한 준비를 하는데, 트랜스퍼 러닝 (Transfer learning)을 위해서 기존의 학습된 체크포인트 데이타를 다운 받아서 이를 기반으로 학습을 한다.

Tensorflow Object Detection API는 경량이고 단순한 모델에서 부터 정확도가 비교적 높은 복잡한 모델까지 지원하고 있지만, 복잡도가 높다고 해서 정확도가 꼭 높지는 않을 수 있다. 복잡한 모델일 수 록 학습 데이타가 충분해야 하기 때문에, 학습하고자 하는 데이타의 양과 클래스의 종류에 따라서 적절한 모델을 선택하기를 권장한다.


여기서는 faster_rcnn_inception_resnet_v2 모델을 이용했기 때문에 아래와 같이 해당 모델의 체크포인트 데이타를 다운로드 받는다.


curl -O http://download.tensorflow.org/models/object_detection/faster_rcnn_inception_resnet_v2_atrous_coco_11_06_2017.tar.gz


파일의 압축을 푼 다음 체크 포인트 파일을 학습 데이타용 Google Cloud Storage (GCS) 버킷으로 업로드 한다.

gsutil cp faster_rcnn_inception_resnet_v2_atrous_coco_11_06_2017/model.ckpt.* gs://${YOUR_GCS_BUCKET}/data/





설정 파일 편집 및 업로드

다음 학습에 사용할 모델의 설정을 해야 하는데,  object_detection/samples/configs/ 디렉토리에 각 모델별 설정 파일이 들어 있으며, 여기서는 faster_rcnn_inception_resnet_v2_atrous_pets.config 파일을 사용한다.


이 파일에서 수정해야 하는 부분은 다음과 같다.

클래스의 수

클래스 수를 정의한다. 이 예제에서는 총 5개의 클래스로 분류를 하기 때문에 아래와 같이 5로 변경하였다.

 8 model {

 9   faster_rcnn {

10     num_classes: 5

11     image_resizer {

학습 데이타 파일 명 및 라벨명

학습에 사용할 학습데이타 파일 (tfrecord)와 라벨 파일명을 지정한다.

126 train_input_reader: {

127   tf_record_input_reader {

128     input_path: "gs://terrycho-facedetection/data/face_training.record"

129   }

130   label_map_path: "gs://terrycho-facedetection/data/face_label_map.pbtxt"

131 }


테스트 데이타 파일명 및 라벨 파일명

학습후 테스트에 사용할 테스트 파일 (tfrecord)과 라벨 파일명을 지정한다

140 eval_input_reader: {

141   tf_record_input_reader {

142     input_path: "gs://terrycho-facedetection/data/face_evaluation.record"

143   }

144   label_map_path: "gs://terrycho-facedetection/data/face_label_map.pbtxt"

145   shuffle: false

146   num_readers: 1


만약에 학습 횟수(스탭)을 조정하고 싶으면 num_steps 값을 조정한다. 디폴트 설정은 20만회인데, 여기서는 5만회로 수정하였다.

117   # never decay). Remove the below line to train indefinitely.
118   # num_steps: 200000
119   num_steps: 50000
120   data_augmentation_options {
121     random_horizontal_flip {
122     }


설정 파일 수정이 끝났으면 gsutil cp 명령을 이용하여 해당 파일을 GCS 버킷에 다음과 같이 업로드 한다.

gsutil cp object_detection/samples/configs/faster_rcnn_inception_resnet_v2_atrous_pets.config gs://${YOUR_GCS_BUCKET}/data/faster_rcnn_inception_resnet_v2_atrous_pets.config

코드 패키징

models/ 디렉토리에서 다음 명령을 수행하여, 모델 코드를 패키징한다.

python setup.py sdist

(cd slim && python setup.py sdist)



학습


gcloud ml-engine jobs submit training `whoami`_object_detection_`date +%s` \

   --job-dir=gs://${YOUR_GCS_BUCKET}/train \

   --packages dist/object_detection-0.1.tar.gz,slim/dist/slim-0.1.tar.gz \

   --module-name object_detection.train \

   --region asia-east1 \

   --config object_detection/samples/cloud/cloud.yml \

   -- \

   --train_dir=gs://${YOUR_GCS_BUCKET}/train \

   --pipeline_config_path=gs://${YOUR_GCS_BUCKET}/data/faster_rcnn_resnet101_pets.config

모니터링

학습이 진행되면 텐서보드를 이용하여 학습 진행 상황을 모니터링할 수 있고, 또한 테스트 트레이닝을 수행하여, 모델에 대한 테스트를 동시 진행할 수 있다. http://bcho.tistory.com/1193 와 방법이 동일하니 참고하기 바란다.


학습을 시작하면 텐서보드를 통해서, Loss 값이 수렴하는 것을 확인할 수 있다.



결과

학습이 끝나면 텐서보드에서 테스트된 결과를 볼 수 있다. 이 예제의 경우 모델을 가장 복잡한 모델을 사용했는데 반하여, 총 5개의 클래스에 대해서 클래스당 약 40개정도의 학습 데이타를 사용했는데, 상대적으로 정확도가 낮았다. 실 서비스에서는 더 많은 데이타를 사용하기를 권장한다.



활용

학습된 모델을 활용하는 방법은 학습된 모델을 export 한후에, (Export 하는 방법은  http://bcho.tistory.com/1193 참고) export 된 모델을 로딩하여, 코드에서 불러서 사용하면 된다.

http://bcho.tistory.com/1192 참고



Object Detection API에 애완동물 사진을 학습 시켜 보자


조대협 (http://bcho.tistory.com)


Object Detection API에 이번에는 애완동물 사진 데이타를 학습시켜 보도록 한다.

애완 동물 학습 데이타의 원본은  Oxford-IIIT Pets lives  http://www.robots.ox.ac.uk/~vgg/data/pets/ 에 있다. 약 37개의 클래스에, 클래스당 200개 정도의 이미지를 가지고 있다.



이번 글에서는 이 애완동물 데이타를 다운 받아서, Object Detection API에 학습 시키는 것까지 진행을 한다.

데이타를 다운로드 받은 후, Object Detection API에 학습 시키기 위해서, 데이타 포맷을 TFRecord 형태로 변환한 후, 학습을 하는 과정을 설명한다.


주의할점 : 이 튜토리얼은 총 37개의 클래스 약 7000장의 이미지를 학습시키는데, 17시간 이상이 소요되며, 구글 클라우 CloudML의 텐서플로우 클러스터에서 분산 러닝을 하도록 설명하고 있는데, 많은 비용이 들 수 있다. 전체 흐름과 과정을 이해하기 위해서는 17시간을 풀 트레이닝 시키지 말고 학습 횟수를 줄이거나 아니면 중간에서 학습을 멈춰서 비용이 많이 나오지 않도록 하는 것을 권장한다.

학습 데이타 다운로드 받기

%curl -O http://www.robots.ox.ac.uk/~vgg/data/pets/data/images.tar.gz

%curl -O http://www.robots.ox.ac.uk/~vgg/data/pets/data/annotations.tar.gz

※ 맥이기 때문에, curl -O 를 사용했는데, Linux의 경우에는 wget을 사용하면 된다.

파일을 다운로드 받았으면 압축을 풀어보자

  • images.tar.gz에는 애완동물의 학습용 이미지가 들어가 있다.

  • annotations.tar.gz 는 각 이미지에 대한 메타 데이타가 들어있다. 이미지 마다 나타난 동물의 종류, 사진상 동물의 위치 (박스)

TFRecord 파일 포맷으로  컨버팅 하기

압축을 푼 메타데이타와 이미지 파일을 이용해서 tfrecord 파일 형태로 컨버팅을 해야 한다. Tfrecord 내에는 이미지 바이너리, 이미지에 대한 정보 (이미지 크기, 인식할 물체의 위치, 라벨)등이 들어간다. 상세 데이타 포맷에 대해서는 다음글에서 설명하도록 한다.

이 데이타를 가지고 tfrecord 타입으로 컨버팅 하는 코드는 object_detection/create_pet_tf_record.py

에 이미 작성되어 있다. 아래 코드를 이용해서 실행해주면 자동으로 pet_train.record에 학습용 데이타를 pet_val.record에 테스트용 데이타를 생성해준다.


python object_detection/create_pet_tf_record.py \
   --label_map_path=object_detection/data/pet_label_map.pbtxt \
   --data_dir=`pwd` \
   --output_dir=`pwd`

학습 환경 준비하기

데이타가 준비되었으면 학습을 위한 환경을 준비해야 한다.

학습은 구글 클라우드 플랫폼의 CloudML을 사용한다. CloudML은 구글 클라우드 플랫폼의 Tensorflow managed 서비스로, Tensorflow 클러스터 설치나 운영 필요 없이 간단하게 명령어 만으로 여러대의 머신에서 학습을 가능하게 해준다.

CloudML을 사용하기 위해서는 몇가지 환경 설정을 해줘야 한다.

  • 먼저 학습용 데이타 (tfrecord)파일을 구글 클라우드 스토리지 (GCS)로 업로드 해야 한다.

  • Object Detection API에서 사물 인식에 사용된 모델의 체크 포인트를 업로드 해야 한다.

  • 클라우드에서 학습을 하기 때문에, 텐서플로우 코드를 패키징해서 업로드해야 한다.

학습 데이타 업로드 하기

데이타를 업로드하기전에, 구글 클라우드 콘솔에서 구글 클라우드 스토리지 버킷을 생성한다.

생성된 버킷명을 YOUR_GCS_BUCKET 환경 변수에 저장한다.

export YOUR_GCS_BUCKET=${YOUR_GCS_BUCKET}


다음 gsutil 유틸리티를 이용하여 YOUR_GCS_BUCKET 버킷으로 학습용 데이타와, 라벨맵 데이타를 업로드 한다.


gsutil cp pet_train.record gs://${YOUR_GCS_BUCKET}/data/pet_train.record
gsutil cp pet_val.record gs://${YOUR_GCS_BUCKET}/data/pet_val.record
gsutil cp object_detection/data/pet_label_map.pbtxt gs://${YOUR_GCS_BUCKET}/data/pet_label_map.pbtxt


학습된 모델 다운로드 받아서 업로드 하기

다음은 학습된 모델을 받아서, 그중에서 체크포인트를  GCS에 올린다.


curl -O http://storage.googleapis.com/download.tensorflow.org/models/object_detection/faster_rcnn_resnet101_coco_11_06_2017.tar.gz

tar -xvf faster_rcnn_resnet101_coco_11_06_2017.tar.gz
gsutil cp faster_rcnn_resnet101_coco_11_06_2017/model.ckpt.* gs://${YOUR_GCS_BUCKET}/data/


체크 포인트를 다운받아서 업로드 하는 이유는, 트랜스퍼 러닝 (Transfer Learning)을 하기 위함인데, 하나도 학습이 되지 않은 모델을 학습을 시키는데는 시간이 많이 들어간다. 트랜서퍼러닝은 이미 학습이 되어 있는 모델로 다른 데이타를 학습 시키는 방법인데, 사물을 인식하는 상태로 학습되어 있는 모델을 다른 물체 (여기서는 애완동물)를 학습하는데 사용하면 학습 시간을 많이 줄 일 수 있다. 이런 이유로, 사물 인식용으로 학습된 체크포인트를 로딩해서 이 체크포인트 부터 학습을 하기 위함이다.

설정 파일 변경하기

Object Detection API를 사용하기 위해서는 학습에 대한 설정 정보를 정의해야 한다.

이 설정 파일안에는 학습 데이타의 위치, 클래스의 수 및 각종 하이퍼 패러미터들이 정의되어 있다. 패러미터에 대한 자세한 설명은  https://github.com/tensorflow/models/blob/master/object_detection/g3doc/configuring_jobs.md를 참고하기 바란다. 이 예제에서는 설정 파일을 따로 만들지 않고 애완동물 사진 학습을 위해서 미리 정의되어 있는 템플릿 설정 파일을 이용하도록 한다.  설정 파일은 미리 정의된 모델에 따라 다른데, 여기서는 faster_rcnn_resnet101_pets 모델을 사용하기 때문에 object_detection/samples/configs/faster_rcnn_resnet101_pets.config 파일을 사용한다.


파일의 위치가 PATH_TO_BE_CONFIGURED 문자열로 정의되어 있는데, 이를 앞에서 만든 GCS 버킷명으로 변경해야 하기 때문에, 아래와 같이 sed 명령을 이용하여 해당 문자열을 변경하자


Linux : sed -i "s|PATH_TO_BE_CONFIGURED|"gs://${YOUR_GCS_BUCKET}"/data|g" object_detection/samples/configs/faster_rcnn_resnet101_pets.config


Max : sed -i ‘’ -e "s|PATH_TO_BE_CONFIGURED|"gs://${YOUR_GCS_BUCKET}"/data|g" object_detection/samples/configs/faster_rcnn_resnet101_pets.config


설정 파일 작성이 끝났으면 이를 GCS 버킷에 올린 후에, 학습시에 사용하도록 한다. 다음 명령어는 설정 파일을 GCS 버킷에 올리는 명령이다.

gsutil cp object_detection/samples/configs/faster_rcnn_resnet101_pets.config \
   gs://${YOUR_GCS_BUCKET}/data/faster_rcnn_resnet101_pets.config


텐서플로우 코드 패키징 및 업로드

학습에 사용할 데이타와 체크포인트등을 업로드 했으면, 다음 텐서플로우 코드를 패키징 해야 한다. 이 글에서는 학습을 로컬 머신이 아니라 구글 클라우드의 텐서플로우 메니지드 서비스인 CloudML을 사용하는데, 이를 위해서는 텐서플로우코드와 코드에서 사용하는 파이썬 라이브러리들을 패키징해서 올려야 한다.


Object Detection API 모델 디렉토리에서 다음 명령어를 실행하면, model 디렉토리와 model/slim 디렉토리에 있는 텐서플로우 코드 및 관련 라이브러리를 같이 패키징하게된다.


# From tensorflow/models/
python setup.py sdist
(cd slim && python setup.py sdist)


명령을 실행하고 나면 패키징된 파일들은 dist/object_detection-0.1.tar.gzslim/dist/slim-0.1.tar.gz 에 저장되게 된다.

학습하기

구글 CloudML을 이용하여 학습하기. 그러면 학습을 시작해보자. 학습은 200,000 스탭에 총 17시간 정도가 소요되며, 비용이 3000$ 이상이 소요되니, 비용이 넉넉하지 않다면, 학습을 중간에 중단 시키기를 권장한다. 테스트 목적이라면 약 10~20분 정도면 충분하지 않을까 한다. 아니면 앞의 config 파일에서 trainning step을 작게 낮춰서 실행하기 바란다.


# From tensorflow/models/
gcloud ml-engine jobs submit training `whoami`_object_detection_`date +%s` \
   --job-dir=gs://${YOUR_GCS_BUCKET}/train \
   --packages dist/object_detection-0.1.tar.gz,slim/dist/slim-0.1.tar.gz \
   --module-name object_detection.train \
   --region asia-east1 \
   --config object_detection/samples/cloud/cloud.yml \
   -- \
   --train_dir=gs://${YOUR_GCS_BUCKET}/train \
   --pipeline_config_path=gs://${YOUR_GCS_BUCKET}/data/faster_rcnn_resnet101_pets.config


학습을 시킬 텐서플로우 클러스터에 대한 정보는 object_detection/samples/cloud/cloud.yml 에 들어 있다. 내용을 보면,

trainingInput:

 runtimeVersion: "1.0"

 scaleTier: CUSTOM

 masterType: standard_gpu

 workerCount: 5

 workerType: standard_gpu

 parameterServerCount: 3

 parameterServerType: standard


scaleTier로 클러스터의 종류를 정의할 수 있는데, 서버 1대에서 부터 여러대의 클러스터까지 다양하게 적용이 가능하다. 여기서는 모델이 크기가 다소 크기 때문에, Custom으로 설정하였다.


역할

서버 타입

댓수

Master server

standard_gpu

1

Worker

standard_gpu

5

Parameter Server

standard

5


각 서버의 스펙은 상세 스펙은 나와있지 않고, 상대값으로 정의되어 있는데 대략 내용이 다음과 같다.



출처 https://cloud.google.com/ml-engine/docs/concepts/training-overview#machine_type_table




학습을 시작하고 나면 CloudML 콘솔에서 실행중인 Job을 볼 수 있고, Job을 클릭하면 자원의 사용 현황을 볼 수 있다. (CPU와 메모리 사용량)



학습을 시작한 후에, 학습된 모델을 Evaluate할 수 있는데, Object Detection API에서는 학습 말고 Evaluation 모델을 별도로 나눠서, 잡을 나눠서 수행하도록 하였다. 학습중에 생성되는 체크포인트 파일을 읽어서 Evaluation을 하는 형태이다.

다음을 Evaluation을 실행하는 명령어인데, 위의 학습 작업이 시작한 후에, 한시간 정도 후부터 실행해도 실행 상태를 볼 수 있다.


# From tensorflow/models/
gcloud ml-engine jobs submit training `whoami`_object_detection_eval_`date +%s` \
   --job-dir=gs://${YOUR_GCS_BUCKET}/train \
   --packages dist/object_detection-0.1.tar.gz,slim/dist/slim-0.1.tar.gz \
   --module-name object_detection.eval \
   --region asia-east1 \
   --scale-tier BASIC_GPU \
   -- \
   --checkpoint_dir=gs://${YOUR_GCS_BUCKET}/train \
   --eval_dir=gs://${YOUR_GCS_BUCKET}/eval \
  --pipeline_config_path=gs://${YOUR_GCS_BUCKET}/data/faster_rcnn_resnet101_pets.config


학습 진행 상황 확인하기

학습이 진행중에도, Evaluation을 시작했으면, Tensorboard를 이용하여 학습 진행 상황을 모니터링 할 수 있다. 학습 진행 데이타가 gs://${YOUR_GCS_BUCKET} 에 저장되기 때문에, 이 버킷에 있는 데이타를 Tensorboard로 모니터링 하면 된다.

실행 방법은 먼저 GCS 에 접속이 가능하도록 auth 정보를 설정하고, Tensorboard에 로그 파일 경로를

GCS 버킷으로 지정하면 된다.

gcloud auth application-default login
tensorboard --logdir=gs://${YOUR_GCS_BUCKET}


아래는 실제 실행 결과이다.



Evaluataion이 끝났으면, 테스트된 이미지도 IMAGES 탭에서 확인이 가능하다.



학습된 모델을 Export 하기

학습이 완료되었으면, 이 모델을 예측 (Prediction)에 사용하기 위해서 Export 할 수 있다. 이렇게 Export 된 이미지는 나중에 다시 로딩하여 예측(Prediction)코드에서 로딩을 하여 사용이 가능하다.

${YOUR_GCS_BUCKET}에 가면 체크 포인트 파일들이 저장되어 있는데, 이 체크 포인트를 이용하여 모델을 Export 한다.



GCS 버킷에서 Export 하고자 하는 Check Point 번호를 선택한 후에 Export 하면 된다, 여기서는 200006 Check Point를 Export 해보겠다.


${CHECKPOINT_NUMBER} 환경 변수를

export CHECKPOINT_NUMBER=200006

으로 설정한 다음에 다음 명령어를 실행한다.


# From tensorflow/models
gsutil cp gs://${YOUR_GCS_BUCKET}/train/model.ckpt-${CHECKPOINT_NUMBER}.* .
python object_detection/export_inference_graph.py \

   --input_type image_tensor \

   --pipeline_config_path object_detection/samples/configs/faster_rcnn_resnet101_pets.config \

   --trained_checkpoint_prefix model.ckpt-${CHECKPOINT_NUMBER} \

   --output_directory output_inference_graph.pb


명령을 실행하고 나면 output_inference_graph.pb 디렉토리에 모델이 Export 된것을 확인할 수 있다.

다음 글에서는 직접 자신의 사진 데이타만을 가지고 학습과 예측을 하는 방법에 대해서 알아보겠다.


참고 자료




Tensorflow Object Detection API


조대협 ( http://bcho.tistory.com)


Tensorflow Object Detection API는, Tensorflow 를 이용하여 이미지를 인식할 수 있도록 개발된 모델로, 라이브러리 형태로 제공되며, 각기 다른 정확도와 속도를 가지고 있는 5개의 모델을 제공한다. 머신러닝이나 텐서플로우에 대한 개념이 거의 없더라도 라이브러리 형태로 손쉽게 사용할 수 있으며, 직접 사용자 데이타를 업로드해서 학습을 하여, 내 시나리오에 맞는 Object Detection System을 손쉽게 만들 수 있다.


Object Detection API를 설치하기 위해서는 텐서플로우 1.x 와 파이썬 2.7x 버전이 사전 설치되어 있어야 한다. 이 글에서는 파이썬 2.7.13과 텐서플로우 2.7.13 버전을 기준으로 하고, 맥에 설치하는 것을 기준으로 한다. 리눅스나 다른 플랫폼 설치는 원본 설치 문서 https://github.com/tensorflow/models/blob/master/object_detection/g3doc/installation.md 를 참고하기 바란다.


설치 및 테스팅

Protocol Buffer 설치

Object Detection API는 내부적으로 Protocol Buffer를 사용한다. MAC에서 Protocol Buffer를 설치 하는 방법은 https://github.com/google/protobuf/tree/master/pythonhttp://bcho.tistory.com/1182 를 참고하기 바란다.

설치가 되었는지를 확인하려면, 프롬프트 상에서 protoc 명령을 실행해보면 된다.

파이썬 라이브러리 설치

프로토콜 버퍼 설치가 끝났으면, 필요한 파이썬 라이브러리를 설치한다.

% pip install pillow

% pip install lxml

% pip install jupyter

% pip install matplotlib

Object Detection API 다운로드 및 설치

Object Detection API 설치는 간단하게, 라이브러리를 다운 받으면 된다. 설치할 디렉토리로 들어가서 git clone 명령어를 통해서, 라이브러리를 다운로드 받자

% git clone https://github.com/tensorflow/models

Protocol Buffer 컴파일

다음 프로토콜 버퍼를 사용하기 위해서 protoc로 proto 파일을 컴파일 한데, Object Detection API를 설치한 디렉토리에서 models 디렉토리로 들어간 후에, 다음 명령어를 수행한다.


protoc object_detection/protos/*.proto --python_out=.

PATH 조정하기

설치가 끝났으면 Object Detection API를 PATH와 파이썬 라이브러리 경로인 PYTHONPATH에 추가한다. 맥에서는 사용자 홈디렉토리의 .bash_profile 에 추가 하면되낟.

PYTHONPATH 환경 변수에 {Object Detection API 설치 디렉토리}/models/slim 디렉토리와 Object Detection API 설치 디렉토리}/models/models 디렉토리를 추가한다.

같은 디렉토리를 PATH에도 추가해준다.


export PYTHONPATH=$PYTHONPATH:/Users/terrycho/dev/workspace/objectdetection/models:/Users/terrycho/dev/workspace/objectdetection/models/slim

export PATH=$PATH:/Users/terrycho/dev/workspace/objectdetection/models:/Users/terrycho/dev/workspace/objectdetection/models/slim

테스팅

설치가 제대로 되었는지를 확인하기 위해서 {Object Detection API 설치 디렉토리}/models/ 디렉토리에서 다음 명령을 실행해보자


% python object_detection/builders/model_builder_test.py


문제 없이 실행이 되었으면 제대로 설치가 된것이다.


사용하기

설치가 끝났으면 실제로 사용해 보자, Object Detection API를 인스톨한 디렉토리 아래 models/object_detection/object_detection_tutorial.ipynb 에 테스트용 노트북 파일이 있다. 이 파일을 주피터 노트북 (http://jupyter.org/)을 이용하여 실행해보자.
(원본 코드 https://github.com/tensorflow/models/blob/master/object_detection/object_detection_tutorial.ipynb)


실행을 하면 결과로 아래와 같이 물체를 인식한 결과를 보여준다.




이 중에서 중요한 부분은 Model Preparation이라는 부분으로,

여기서 하는 일은 크게 아래 3가지와 같다.

  • Export 된 모델 다운로드

  • 다운로드된 모델 로딩

  • 라벨맵 로딩


Export 된 모델 다운로드

Object Detection API는 여러가지 종류의 미리 훈련된 모델을 가지고 있다.

모델 종류는 https://github.com/tensorflow/models/blob/master/object_detection/g3doc/detection_model_zoo.md 를 보면 되는데,  다음과 같은 모델들을 지원하고 있다.  COCO mAP가 높을 수 록 정확도가 높은 모델인데, 대신 예측에 걸리는 속도가 더 느리다.


Model name

Speed

COCO mAP

Outputs

ssd_mobilenet_v1_coco

fast

21

Boxes

ssd_inception_v2_coco

fast

24

Boxes

rfcn_resnet101_coco

medium

30

Boxes

faster_rcnn_resnet101_coco

medium

32

Boxes

faster_rcnn_inception_resnet_v2_atrous_coco

slow

37

Boxes


모델은 *.gz 형태로 다운로드가 되는데, 이 파일안에는 다음과 같은 내용들이 들어있다.

  • Check point (model.ckpt.data-00000-of-00001, model.ckpt.index, model.ckpt.meta)
    텐서플로우 학습 체크 포인트로, 나중에, 다른 데이타를 학습 시킬때 Transfer Learning을 이용할때, 텐서플로우 그래프에 이 체크포인트를 로딩하여, 그 체크포인트 당시의 상태로 학습 시켜놓을 수 있다. 이 예제에서는 사용하지 않지만, 다른 데이타를 이용하여 학습할때 사용한다.

  • 학습된 모델 그래프 (frozen_inference_graph.pb)
    학습이 완료된 그래프에 대한 내용을 Export 해놓은 파일이다. 이 예제에서는 이 모델 파일을 다시 로딩하여 Prediction을 수행한다.

  • Graph proto (pgrah.pbtxt)


기타 파일들

이외에도 기타 다른 파일들이 있는데, 다른 파일들은 이미 Object Detection API 안에 이미 다운로드 되어 있다.

  • 라벨맵
    라벨맵은 {Object Detection API 설치 디렉토리}/models/object_detection/data  디렉토리 안에 몇몇 샘플 모델에 대한 라벨맵이 저장되어 있다. 라벨맵은 모델에서 사용한 분류 클래스에 대한 정보로 name,id,display_name 식으로 정의되며, name은 텍스트 라벨, id는 라벨을 숫자로 표현한 값 (반드시 1부터 시작해야 한다.), display_name은 Prediction 결과를 원본 이미지에서 인식한 물체들을 박스처리해서 출력하는데 이때 박스에 어떤 물체인지 출력해주는 문자열에 들어가는 텍스트 이다.
    여기서 사용한 라벨맵은 mscoco_label_map.pbtxt 파일이 사용되었다.

  • 학습 CONFIG 파일
    모델 학습과 예측에 사용되는 각종 설정 정보를 저장한 파일로 위에서 미리 정의된 모델별로 각각 다른 설정 파일을 가지고 있으며 설정 파일의 위치는  {Object Detection API 설치 디렉토리}/models/object_detection/samples/configs 에 {모델명}.config 에 저장되어 있다.


다운로드된 모델과 라벨맵 로딩

위에서 많은 파일이 다운되고 언급되었지만 예측 (Prediction)에는 학습된 그래프 모델을 저장한 frozen_inference_graph.pb 파일과, 분류 라벨이 저장된 mscoco_label_map.pbtxt 두 개만 사용된다.


다음 코드 부분에서 모델을 다운 로드 받고, 모델 파일과 라벨 파일의 경로를 지정하였다.


# What model to download.

MODEL_NAME = 'ssd_mobilenet_v1_coco_11_06_2017'

MODEL_FILE = MODEL_NAME + '.tar.gz'

DOWNLOAD_BASE = 'http://download.tensorflow.org/models/object_detection/'


# Path to frozen detection graph. This is the actual model that is used for the object detection.

PATH_TO_CKPT = MODEL_NAME + '/frozen_inference_graph.pb'


# List of the strings that is used to add correct label for each box.

PATH_TO_LABELS = os.path.join('data', 'mscoco_label_map.pbtxt')


NUM_CLASSES = 90


그리고 마지막 부분에 분류 클래스의 수를 설정한다. 여기서는 90개의 클래스로 정의하였다.

만약에 모델을 바꾸고자 한다면 PATH_TO_CKPT를 다른 모델 파일로 경로만 변경해주면 된다.


다음으로  frozen_inference_graph.pb  로 부터 모델을 읽어서 그래프를 재생성하였다.


detection_graph = tf.Graph()

with detection_graph.as_default():

 od_graph_def = tf.GraphDef()

 with tf.gfile.GFile(PATH_TO_CKPT, 'rb') as fid:

   serialized_graph = fid.read()

   od_graph_def.ParseFromString(serialized_graph)

   tf.import_graph_def(od_graph_def, name='')


나머지 부분은 이미지를 읽어서, 로딩된 모델을 이용하여 물체를 Detection 하는 코드이다.


여기까지 간단하게 Tensorflow Object Detection API를 설치 및 사용하는 방법에 대해서 알아보았다.

다음 글에서는 다른 데이타로 모델을 학습해서 예측하는 부분에 대해서 알아보도록 하겠다.


참고 자료





TFRecord


조대협 (http://bcho.tistory.com)


텐서플로우를 접하게 다 보면 필히 만나는 부분이 텐서플로우 학습 데이타 포맷인 TFRecord라는 파일 포맷이다. 마침 얼굴 인식 모델을 이번에는 텐서플로우에서 미리 개발되어 제공되는 물체 인식 API인 Tensorflow Object Detection API를 사용해서 얼굴을 학습시켜보려고 하니 데이타 포맷을 TFRecord 포맷으로 변경해야 한다. 그래서, TFRecord 파일을 만들어보고, 테스트를 위해서 데이타 내용도 직접 읽는 코드를 작성해보았다. (전체 코드는 https://github.com/bwcho75/objectdetection/tree/master/tfrecord 에 다.)

TFRecord 파일 포맷이란

TFRecord 파일은 텐서플로우의 학습 데이타 등을 저장하기 위한 바이너리 데이타 포맷으로, 구글의 Protocol Buffer 포맷으로 데이타를 파일에 Serialize 하여 저장한다.

CSV 파일에서와 같이 숫자나 텍스트 데이타를 읽을때는 크게 지장이 없지만, 이미지를 데이타를 읽을 경우 이미지는 JPEG나 PNG 형태의 파일로 저장되어 있고 이에 대한 메타 데이타와 라벨은 별도의 파일에 저장되어 있기 때문에, 학습 데이타를 읽을때 메타데이타나 라벨용 파일 하나만 읽는 것이 아니라 이미지 파일도 별도로 읽어야 하기 때문에, 코드가 복잡해진다.


또한 이미지를 JPG나 PNG 포맷으로 읽어서 매번 디코딩을 하게 되면, 그 성능이 저하되서 학습단계에서 데이타를 읽는 부분에서 많은 성능 저하가 발생한다.


이와 같이 성능과 개발의 편의성을 이유로 TFRecord 파일 포맷을 이용하는 것이 좋다.


그러면 간단한 예제를 통해서 TFRecord 파일을 쓰고 읽는 방법에 대해서 알아보도록 하자

본 예제는 http://warmspringwinds.github.io/tensorflow/tf-slim/2016/12/21/tfrecords-guide/ 글과 https://github.com/tensorflow/models/blob/master/object_detection/g3doc/using_your_own_dataset.md 글을 참고하였다.


TFRecord 파일 생성

TFRecord 파일 생성은 tf.train.Example에 Feature를 딕셔너리 형태로 정의한 후에, tf.train.Example 객체를 TFRecord 파일 포맷 Writer인 tf.python_io.TFRecordWriter를 통해서 파일로 저장하면 된다.


다음 코드를 보자, 이 코드는 Tensorflow Object Detection API를 자신의 데이타로 학습시키기 위해서 데이타를 TFRecord 형태로 변환하여 저장하는 코드의 내용이다.

이미지를 저장할때 사물의 위치를 사각형 좌표로 지정하고 저장한다.


def create_cat_tf_example(encoded_image_data):


 height = 1032

 width = 1200

 filename = 'example_cat.jpg'

 image_format = 'jpg'


 xmins = [322.0 / 1200.0]

 xmaxs = [1062.0 / 1200.0]

 ymins = [174.0 / 1032.0]

 ymaxs = [761.0 / 1032.0]

 classes_text = ['Cat']

 classes = [1]


 tf_example = tf.train.Example(features=tf.train.Features(feature={

     'image/height': dataset_util.int64_feature(height),

     'image/width': dataset_util.int64_feature(width),

     'image/filename': dataset_util.bytes_feature(filename),

     'image/source_id': dataset_util.bytes_feature(filename),

     'image/encoded': dataset_util.bytes_feature(encoded_image_data),

     'image/format': dataset_util.bytes_feature(image_format),

     'image/object/bbox/xmin': dataset_util.float_list_feature(xmins),

     'image/object/bbox/xmax': dataset_util.float_list_feature(xmaxs),

     'image/object/bbox/ymin': dataset_util.float_list_feature(ymins),

     'image/object/bbox/ymax': dataset_util.float_list_feature(ymaxs),

     'image/object/class/text': dataset_util.bytes_list_feature(classes_text),

     'image/object/class/label': dataset_util.int64_list_feature(classes),

 }))

 return tf_example


저장되는 내용은 이미지의 높이와 너비 (height,weight), 파일명 (filename), 인코딩 포맷 (format), 이미지 바이너리 (encoded), 이미지내에서 물체의 위치를 가르키는 사각형 위치 (xmin,ymin,xmax,ymax) 와 라벨 값등이 저장된다.


코드를 유심이 살표보면 생각보다 이해하기어렵지 않다.

tf.train.Example 객체를 만들고 이때 인자로 features에 TFRecord에 저장될 갚들의 목록을 딕셔너리 형태로 저장한다.

이때 저장되는 데이타의 이름과 값을 지정해야 하는데


'image/height': dataset_util.int64_feature(height),


를 보면 'image/height' 이 데이타의 이름이 되고, dataset_util.int64_feature(height), 가 height 값을 텐서플로우용 리스트형으로 변형하여 이를 학습용 피쳐 타입으로 변환하여 저장한다.

이 예제는 Object Detection API의 일부이기 때문에, dataset_util이라는 모듈을 사용했지만, 실제로 이 함수의 내부를 보면  tf.train.Feature(int64_list=tf.train.Int64List(value=values)) 로 구현이 되어 있다.


다음 이렇게 생성된 tf.train.Example 객체를 tf.python_io.TFRecordWriter 를 이용해서 다음과 같이 파일에 써주면 된다.

   writer = tf.python_io.TFRecordWriter(tfrecord_filename)

   writer.write(tf_example.SerializeToString())


다음은 코드 전체이다.


import tensorflow as tf

from PIL import Image

from object_detection.utils import dataset_util


def create_cat_tf_example(encoded_image_data):


 height = 1032

 width = 1200

 filename = 'example_cat.jpg'

 image_format = 'jpg'


 xmins = [322.0 / 1200.0]

 xmaxs = [1062.0 / 1200.0]

 ymins = [174.0 / 1032.0]

 ymaxs = [761.0 / 1032.0]

 classes_text = ['Cat']

 classes = [1]


 tf_example = tf.train.Example(features=tf.train.Features(feature={

     'image/height': dataset_util.int64_feature(height),

     'image/width': dataset_util.int64_feature(width),

     'image/filename': dataset_util.bytes_feature(filename),

     'image/source_id': dataset_util.bytes_feature(filename),

     'image/encoded': dataset_util.bytes_feature(encoded_image_data),

     'image/format': dataset_util.bytes_feature(image_format),

     'image/object/bbox/xmin': dataset_util.float_list_feature(xmins),

     'image/object/bbox/xmax': dataset_util.float_list_feature(xmaxs),

     'image/object/bbox/ymin': dataset_util.float_list_feature(ymins),

     'image/object/bbox/ymax': dataset_util.float_list_feature(ymaxs),

     'image/object/class/text': dataset_util.bytes_list_feature(classes_text),

     'image/object/class/label': dataset_util.int64_list_feature(classes),

 }))

 return tf_example


def read_imagebytes(imagefile):

   file = open(imagefile,'rb')

   bytes = file.read()


   return bytes

   

def main():

   print ('Converting example_cat.jpg to example_cat.tfrecord')

   tfrecord_filename = 'example_cat.tfrecord'

   bytes = read_imagebytes('example_cat.jpg')

   tf_example = create_cat_tf_example(bytes)


   writer = tf.python_io.TFRecordWriter(tfrecord_filename)

   writer.write(tf_example.SerializeToString())


main()


참고로 이 예제는 앞에서도 언급하였듯이 Object Detection API에 대한 의존성을 가지고 있기 때문에 일반적인 텐서플로우 개발환경에서는 실행이 되지 않는다. Tensorflow Object Detection API 를 인스톨해야 dataset_util 를 사용할 수 있기 때문에 Object Detection API 설치가 필요하다.

만약에 Object Detection API 설치 없이 TFRecord Writer를 짜보고 싶은 경우에는 http://warmspringwinds.github.io/tensorflow/tf-slim/2016/12/21/tfrecords-guide/ 문서에 예제가 간단하게 잘 정리되어 있으니 참고하기 바란다.

TFRecord에서 데이타 읽기

데이타를 읽는 방법도 크게 다르지 않다. 쓰는 순서의 반대라고 보면 되는데, TFReader를 통해서 Serialized 된 데이타를 읽고, 이를 Feature 목록을 넣어서 파싱한 후에, 파싱된 데이타셋에서 각 피쳐를 하나하나 읽으면 된다.


코드를 보자


def readRecord(filename_queue):

   reader = tf.TFRecordReader()

   _,serialized_example = reader.read(filename_queue)

   

   #'''

   keys_to_features = {

       'image/height': tf.FixedLenFeature((), tf.int64, 1),

       'image/width': tf.FixedLenFeature((), tf.int64, 1),

       'image/filename': tf.FixedLenFeature((), tf.string, default_value=''),

       #'image/key/sha256': tf.FixedLenFeature((), tf.string, default_value=''),

       'image/source_id': tf.FixedLenFeature((), tf.string, default_value=''),

       'image/encoded': tf.FixedLenFeature((), tf.string, default_value=''),

: (중략)

   }

   

   features = tf.parse_single_example(serialized_example,features= keys_to_features)

   

   height = tf.cast(features['image/height'],tf.int64)

   width = tf.cast(features['image/width'],tf.int64)

   filename = tf.cast(features['image/filename'],tf.string)

   source_id = tf.cast(features['image/source_id'],tf.string)

   encoded = tf.cast(features['image/encoded'],tf.string)

: (중략)

   return height,width,filename,source_id,encoded,image_format


TFRecoderReader를 이용하여 파일을 읽는데, 파일을 직접읽지 않고 filename_queue를 이용해서 읽는다. 코드 전체를 보면, 이 큐는

filename_queue = tf.train.string_input_producer([tfrecord_filename])

로, 파일 이름 목록을 가지고 리턴하는 string_input_producer를 사용하였다.

파일을 읽으면 데이타는 직렬화 된 상태로 리턴이 되어 serialized_example 에 저장된다.

이를 각 개별 피쳐로 디코딩 하기 위해서 피쳐 목록을 keys_to_features 딕셔너리에 저장한다.

이때, 각 피쳐의 이름과, 타입을 정의한다.

다음은 ‘image/height’ 피쳐를 정의하여 int 64 타입으로 읽어드리는 부분이다.


       'image/height': tf.FixedLenFeature((), tf.int64, 1),


피쳐 목록과, 데이타 타입은 TFRecord 파일을 쓸때 사용한 이름과 데이타 타입을 그대로 사용하면 된다.

피쳐 목록이 정의되었으면


   features = tf.parse_single_example(serialized_example,features= keys_to_features)


를 통해서 피쳐를 파싱해낸다. 파싱된 피쳐는 features에 딕셔너리 형태로 저장된다.

다음 리턴 받은 텐서를 각 변수에 저장한다. 이때 타입 캐스팅을 해서 저장한다. (이미 타입을 맞춰서 데이타를 꺼냈기 때문에, 별도의 캐스팅은 필요없지만 확실히 하기 위해서 캐스팅을 한다.)

다음은 ‘image/height’ 를 피쳐를 배열에서 뽑아서, int64 타입으로 변환하여 height 에 저장하는 부분이다.


   height = tf.cast(features['image/height'],tf.int64)


리턴값은 텐서가 되는데, 이 값을 출력하는 코드를 보자

당연히 텐서이기 때문에 Session 을 시작해야 하는데, 먼저 파일에서 데이타를 읽기 위해서 filename queue를 정의한다.

    filename_queue = tf.train.string_input_producer([tfrecord_filename])

다음 filename_queue를 인자로 넘겨서 height,source_id 등의 값을 *.tfrecord 파일에서 읽어서 텐서로 리턴 받는다.


    height,width,filename,source_id,encoded,image_format = readRecord(filename_queue)


다음 세션을 시작하고, readRecord 에서 리턴된 값을 받아온다.         vheight,vwidth,vfilename,vsource_id,vencoded,vimage_format = sess.run([height,width,filename,source_id,encoded,image_format])


전체 코드는 https://github.com/bwcho75/objectdetection/tree/master/tfrecord/reader.py 를 참고하기 바란다.


마지막으로 받은 값을 화면에 출력한다.



간단하게 tfRecord 파일 포맷으로 학습용 데이타를 쓰는 방법을 알아보았다. 텐서플로우 코드가 간단해 지고 성능에 도움이 되는 만큼 데이타 전처리 단계에서 가급적이면 학습 데이타를 tfrecord 타입으로 바꿔서 학습하는 것을 권장한다. (특히 이미지 데이타!!)


참고 자료


Apache airflow


조대협 (http://bcho.tistory.com)

배경

빅데이타 분석이나, 머신러닝 코드를 만들다 보면 필요한것중에 하나가 여러개의 태스크를 연결해서 수행해야 할 경우가 있다. 데이타 베이스의 ETL 작업과 비슷한 흐름이라고 보면 된다.


예를 들어 머신러닝의 학습 과정을 보면 데이타 전처리,학습,배포,예측과 같은 단계를 가지게 된다.


  • rawdata를 읽어서 preprocessing 단계를 거쳐서 학습에 적절한 training data로 변경하고,

  • 변경된 training data를 가지고 머신러닝 모델을 학습한후, 학습된 모델을 저장한다.

  • 학습된 모델을 가지고 예측을 해서 결과를 저장한다.


이렇게 머신러닝은 여러개의 단계를 거쳐서 수행이 되는데, 각 단계가 끝나면 다음 단계를 수행해야 한다. 단순하게 CRON+쉘로 순차적으로 수행하는 것등이 가능하지만, 에러가 났을때 재처리를 하거나 , 수행 결과에 따라 분기를 하는 등 조금 더 구조화된 도구가 필요하다.

데이타 워크 플로우 관리 도구

이런 요구 사항 때문에 여러가지 툴이 개발되었는데, 대표적인 도구로는 하둡 에코시스템에 우지(oozie ) 등이 있다.



<그림. Oozie eclipse 클라이언트 >


하둡의 여러 에코 시스템 솔루션들을 유기적으로 조합하기 위해서 개발된 도구로, 하둡 에코 시스템에 있는 여러가지 다양한 솔루션과 연동하기 위한 아답터를 가지고 있다.

이외에도 rundeck, luigi와 같은 유사한 솔루션들이 있다.

오늘 소개하고자 하고자하는 데이타 워크 플로우 관리도구는 아파치 오픈소스 airflow 이다. 원래 airbnb에서 개발된 도구로 현재 아파치 오픈소스에서 인큐베이터 단계에 있는 소프트웨어이다.


airflow를 소개하는 이유는 첫번째 파이썬 기반으로 태스크 코드를 작성할 수 있기 때문에, 데이타 분석이나 머신러닝을 개발하는 엔지니어들에게 익숙한 언어이고, 한대에서 동작하는게 아니라 여러 머신에 분산하여 수행 될 수 있는 장점을 가지고 있다.



<그림. Apache airflow 의 작업 그래프 구조 화면 >

airflow 시작하기

그러면 간단하게 airflow에 대한 개념과 사용법에 대해서 알아보자

airflow 설치

airflow는 실행되는 작업의 상태등을 저장하기 위해서 데이타 베이스 (MySQL이나 Postgres)등이 필요하며, 분산 환경을 위해서 여러대에 설치할 수 있다. 또한 로컬 환경에 sqlite와 함께 간단하게 설치할 수 있다. 여기서는 간단하게 개인 맥북환경에 로컬로 설치 및 실행하는 시나리오로 설명한다.


설치 방법은 매우 간단하다. 파이썬 2.7 환경에서 아래와 같이 간단학 “pip install airflow”만 실행해주면 된다.

%pip install airflow



airflow가 설치되었으면 데이타 베이스 설정을 해줘야 하는데, 이 튜토리얼에서는 개발 및 테스트를 위해서 sqlite를 사용한다. sqlite를 초기화 하기 위해서 다음과 같이 “airflow initdb” 명령을 실행한다.

% airflow initdb


자아 이제 설치가 끝나고 airflow를 사용할 준비가 되었다. 이제 airflow 웹콘솔을 기동해보자

“airflow webserver -p 8080” 을 실행하고 웹에 http://localhost:8080에 접근하면 airflow 콘솔을 볼 수 있다.

airflow 코드

airflow에서 워크플로우를 저장하기 위해서 몇가지 추상화된 개념을 사용한다.

Airflow DAG의 구조

DAG (Directed Acyclic Graph)

DAG는 하나의 워크 플로우라고 보면 된다. 위의 예제처럼, 머신러닝 이라는 DAG를 정의한다면, Preprocessing,Training,Prediction 워크플로우가 하나의 DAG가 된다.

Operator and Task

Operator는 DAG안에서 정의되는 작업 함수(함수임을 주의하자) 이야기 하는데, Pre processing, Training, Prediction 함수가 Operatorator 이다.

이 Operator 함수가 DAG 상에서 실제로 워크플로우 상에 정의되서 호출 되면 이것이 Task 이다.

객체지향 언어에서 Operator가 class 라면, Task는 object 라고 보면 된다.


이해가 잘안될 수 있는데, 코드를 보자


from airflow import DAG

from airflow.operators.bash_operator import BashOperator

from airflow.operators.dummy_operator import DummyOperator

from airflow.operators.python_operator import PythonOperator

from datetime import datetime,timedelta


dag = DAG('hello-airflow',description='Hello airflow DAG',

         schedule_interval = '*/5 0 * * *',

         start_date=datetime(2017,07,01),catchup=False)


def print_hello():

   return 'Hello Airflow'


python_task = PythonOperator(

                   task_id='python_operator',

                   python_callable = print_hello,

                   dag = dag)


bash_task = BashOperator(

       task_id='print_date',

       bash_command='date',

       dag=dag)


bash_task.set_downstream(python_task)


DAG 정의 부분을 보자. DAG 객체는 DAG에 대한 전체 컨택스를 저장 및 유지 관리한다.

DAG('hello-airflow',description='Hello airflow DAG', 에서 DAG를 이름을 ‘hello-airflow’로 정의하고 description에 설명을 적는다.

schedule_interval = '*/5 * * * *', 다음으로 이 DAG가 실행되는 주기를 정해야 하는데, cron 명령과 같은 노테이션으로 정의한다. 위 설정은 매 5분마다 실행되도록 하는 설정이다.

마지막으로, start_date=datetime(2017,07,01), ,DAG를 언제부터 시작할것인지 지정한다. DAG는 반드시 전역 변수로 지정한다. DAG안에서 다른 DAG를 부르는 sub DAG의 경우에는 지역 변수로 지정이 가능하다.


다음 task에 사용할 operator를 정의하는데, 파이썬 코드를 실행할 오퍼레이터인 PythonOperator와 쉘 커맨드를 실행할 BashOperator를 가지고 각각 파이썬 태스크 python_task와, 쉘 태스크 bash_task를 정의한다.


python_task = PythonOperator(

                   task_id='python_operator',

                   python_callable = print_hello,

                   dag = dag)


파이썬 태스크의 id는 “python_operator”라고 지정하였고, 실행시 print_hello를 호출하도록 하였다.

그리고 이 태스크는 DAG인 dag에 지정한다.


다음 쉘 태스크의 내용은 다음과 같다.

bash_task = BashOperator(

       task_id='print_date',

       bash_command='date',

       dag=dag)


print_data라는 이름으로 태스크를 정의하고, 쉘 명령어 date를 실행하도록 하였다.

등록

코드 작성이 끝나면 코드를 배포해보자. Dag 파일을 airflow에 등록해야 하는데, dag 파일을 저장하는 장소는 dags_folder 라는 변수로 $AIRFLOW_HOME/airflow.cfg 파일안에 정의 되어 있다. 디폴트 장소는 $AIRFLOW_HOME/dags/ 폴더이다. 위에서 작성한 코드를 해당 디렉토리에 복사하자

다음 dag이 제대로 등록되었는지를 확인한다. 커멘드 창에서 “airflow list_dags”라는 명령을 수행하면 현재 등록되어 있는 DAG 목록을 볼 수 있다. 아래 그림과 같이 hello-airflow dag가 등록된것을 확인할 수 있다.




hello-airflow dag안에 어떤 태스크들이 정의되어 있는지를 확인하려면 ‘airflow list_tasks hello-airflow’ 명령을 이용하면 hello-airflow 안에 등록된 태스크 목록을 출력해준다.


테스트

테스트를 하려면 태스크 단위로 테스트가 가능하다. airflow test {DAG ID} {태스크 ID} {실행날짜} 식으로 하면 된다.

, 예를 들어 print_date 태스크를 2017-07-01을 기준으로 실행하고자하면 airflow test hello-airflow print_date 2017-07-01

Hello-airflow DAG안에 print_date라는 태스크를 실행한다.



실행

DAG 코드 개발 등록과 테스트가 완료되었으면 이제 airflow scheduler 를 띄워준다. (일종의 데몬이다.) 스케쥴러는 DAG 코드에 정의된 스케쥴에 따라서 테스크를 실행해준다.

스케쥴러 실행은 간단하게 airflow scheduler 명령을 실행하면 된다.



스케쥴러가 실행되면, 각 DAG의 스케쥴에 따라서 자동으로 태스크들을 수행한다.


로그 모니터링

스케쥴러에 의해서 실행되는 DAG와 태스크들의 결과와 로그는 어떻게 모니터링 할까? airflow에 의해서 수행되는 태스크들은 $AIRFLOW_HOME/logs 디렉토리에 저장된다.

logs 디렉토리 아래에 각각 DAG 이름으로 저장이 되며, DAG 이름으로 된 디레토리안에는 태스크명으로 된 서브 디렉토리가 있고, 이 서브 디렉토리 아래에 시간대별 로그가 있다.

즉 hello-airflow DAG의 print_date 태스크에 대한 로그는 $AIRFLOW_HOME/logs/hello-airflow/print_date/{날짜및시간} 파일 명으로 저장된다.

웹 콘솔을 이용한 모니터링

airflow의 강력한 기능중의 하나는 웹 기반의 모니터링 콘솔을 제공한다. 뒤에서는 주요 웹 콘솔의 주요 기능에 대해서 알아보도록 한다.

Graph View

Graph View는 DAG의 구조를 그래프 형태로 보여주는 뷰이다.


복잡한 워크플로우의 경우 그 구조를 파악하는데 유용한다. 위의 그림은 앞서 만든 hello-airflow 에 대한 태스크간 그래프로 print_date를 호출한 후에, python_operator 태스크를 호출하는 것을 볼 수 있다.

Tree View


트리뷰를 보면, DAG의 구조를 트리 형태로 보여주고, DAG의 태스크가 각각 성공했는지 실패 했는지를 우측 그래프 처럼 표현해준다. 각 태스크를 로그를 보려면 각 태스크 실행 결과 그래프를 누르면 아래와 같이 세부 메뉴가 나온다.



여기서 View Log를 누르면 각 Task 별로 실행 당시의 로그를 볼 수 있다. 아래는 Python_Operator 태스크를 실행한 로그이다.



아래서 두번째 줄을 보면 Hello Airflow 라는 문자열을 리턴한것을 확인할 수 있다.


Task Duration

Task duration은 DAG에서 수행된 각 태스크의 수행 시간을 그래프 형태로 나타내준다.



어떤 태스크가 시간이 많이 걸리는지 그리고 수행시간이 매번 수행할때 마다 올바른지 (큰 변화가 없고 일정한지. 이건 매우 유용할듯) 등을 체크할 수 있다.

Task Tries


Task Tries 에서는 각 수행별로 각각의 태스크를 수행한 횟수를 그래프로 보여준다. 즉 재시도 (RETRY)횟수를 모니터링할 수 있다.