블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 

얼굴 인식 모델을 만들어보자 #5 학습된 모델을 Export 하기



조대협 (http://bcho.tistory.com)


앞의 글에서 CloudML을 이용하여 학습하는 부분까지 끝냈다. 그렇다면 학습된 모델을 이용하여 실제로 예측은 어떻게 할것인가? 여기에는 두가지 선택지가 있다.


첫번째는, 체크포인트로 저장된 파일을 이용하는 방식인데, 체크포인트에는 저장된 데이타는 텐서플로우 모델 그래프는 없고, 모델에서 사용된 변수 (Weight,bias etc) 만 저장하기 때문에, 이 데이타를 로딩하려면 텐서플로우 코드로 그래프를 그려준 다음에, 로딩을 해야한다. (상세 설명 http://bcho.tistory.com/1179 )


두번째는, 체크포인트처럼 변수만 저장하는 것이 아니라, 그래프를 함께 저장하는 방식으로 모델을 Protocol Buffer (http://bcho.tistory.com/1182) 타입으로 저장하는 방식이다. 이렇게 Protocol buffer (이하 pb)로 저장된 파일은 Prediction에 최적화된 엔진인 Tensorflow Serving (https://www.tensorflow.org/deploy/tfserve) 에 로딩하여 사용이 가능하다. 그런데, Tensorflow Serving의 경우 일일이 빌드를 해야 하는데, bazel 빌드 툴 (make,gradle과 같은 빌드툴. http://bcho.tistory.com/1160 )을 이용해서 빌드 및 배포를 해야 하는데 이 과정이 쉽지 않고, 또한 Tensorflow Serving에 배포된 모델을 호출하기 위해서는 Google protocol buffer (grpc)를 사용해야 한다.

이 과정에 많은 노력(삽질?)이 필요하고, 운영환경에 올리기 위해서는 모델 pb 파일에 대한 배포 프로세스 그리고 여러개의 Tensorflow Serving Cluster 설치 및 운영등의 이슈가 발생한다.


그래서 이를 플랫폼화하여 서비스로 만들어놓은 것이 Google CloudML Prediction 서비스이다. CloudML Prediction 서비스는 단순하게, 학습된 pb 파일만 배포하게 되면, 운영에 대한 이슈없이 대용량 서비스가 가능하고 grpc를 사용하지 않더라도 SDK를 이용하여 손쉽게 json으로 요청을 보냄으로써 prediction에 대한 구현이 가능하다.

모델 Export 하기

http://bcho.tistory.com/1180 에서 CloudML을 이용하여 얼굴 인식 모델을 학습 시켰다. 여기서 사용된 코드를 수정하여 학습이 끝나면, 모델(그래프와 변수값)을 Export 하는 코드를 추가해야 한다.

Export를 할때 주의할 점은 학습에 사용된 그래프를 그대로 Export 하는 것이 아니라 새로 그래프를 그려서 Export를 해야 한다. Export할 그래프는 Prediction을 위한 그래프이기 때문에, 학습에 사용된 그래프는 Dropout이나 또는 validation등을 위한 로직이 들어가 있기 때문에 이런 부분을 다 제거 하고 Prediction을 위한 그래프로 재정의하여 Export 해야한다.


얼굴 인식 모델에서 학습된 모델을 Export 하는 과정은

  1. 학습을 진행하고 학습 진행중에 체크포인트를 저장한다.

  2. 학습이 종료되면 Export를 위한 그래프를 새로 그린다.

  3. 체크포인트 파일에서 변수 값을 읽어서 2에서 그린 그래프에 채워넣는다.


자 그러면 코드를 보자. (전체 코드는 https://github.com/bwcho75/facerecognition/blob/master/CloudML%20Version/face_recog_model/model_localfile_export.py 에 저장되어 있다.)

체크 포인트 저장하기

코드 401 라인을 보면 아래와 같이 saver 객체를 이용하여 현재 학습이 종료된 세션의 값을 넘겨서 체크포인트 값으로 저장한다. 이 때 체크포인트 파일은 os.path.join(model_dir, 'face_recog') 에 저장한다.

       print('Save model')

       model_dir = os.path.join( FLAGS.base_dir , 'model')

       if not os.path.exists(model_dir):

           os.makedirs(model_dir)

       saver.save(sess, os.path.join(model_dir, 'face_recog'))

       print('Save model done '+model_dir)


다음으로 408 라인에서, 모델을 Export하는 함수 export_model 함수를 호출한다. 이때 첫번째 인자로는 체크포인트 파일 경로를 넘긴다.


       export_dir = os.path.join( FLAGS.base_dir , 'export')

       if  os.path.exists(export_dir):

          rmdir(export_dir)

       export_model(os.path.join(model_dir, 'face_recog'), export_dir)

Export용 그래프 그리기

274 라인의 def export_model(checkpoint, model_dir) 함수를 보자. 이 함수는 checkpoint 디렉토리를 입력받아서 model_dir에 모델을 export 해주는 함수이다.


앞에서도 설명했듯이 Export 용 그래프는 새롭게 그려줘야 하는데, 276~279 라인까지가 새롭게 그래프를 그리는 부분이다.

 with tf.Session(graph=tf.Graph()) as sess:


   images = tf.placeholder(tf.string)

   prediction = build_inference(images)


이미지를 입력할 input용 placeholer를 정의한다. 이때 중요한점이 우리가 학습에서는 float형 placeholder를 사용했는데, 여기서는 입력을 string으로 바꿨다. 이유는 모델을 학습한 후에 실제 운영 환경에 올렸을 때, 클라이언트 (웹이나 모바일)에서 이미지를 입력 받아서 학습된 모델을 호출할때 float 형 행렬로 넘기기에는 불편하고 데이타의 크기도 커진다. (행렬데이타를 [1,2,3,4…] 와 같은 문자열로 넘겨야 하기 때문에 ) 그래서 호출할때 데이타 전달을 쉽게 할 수 있도록 이미지를 문자열 바이너리로 입력 받도록 수정하였다.

다음 build_inference(images) 함수가 실제로 Export 용 그래프를 새로 그리는 부분인데

261 라인에 아래와 같이 정의 되어 있다.


def build_inference(image_bytes):

   # graph for prediction in CloudML

   #image_bytes = tf.placeholder(tf.string)

   rgb_image = tf.image.decode_jpeg(image_bytes[0],channels = FLAGS.image_color)

   rgb_image  = tf.image.convert_image_dtype(rgb_image, dtype=tf.float32)

   image_batch = tf.expand_dims(rgb_image, 0)

   #rgb_image_value = rgb_image.eval()

   #rgb_images = []

   #rgb_images.append(rgb_image_value)

   result = tf.nn.softmax(build_model(image_batch,keep_prob=1.0))

   

   return result


문자열로 입력받은 이미지 데이타는 배열형이기 때문에, [0] 로 첫번째 이미지를 골라내고 (이미지를 입력할때도 하나만 입력한다.) tf.image_decode_jpeg로 디코딩을 한후에, 타입을 tf_float32 형태의 행렬로 바꿔준다. 원래 우리가 사용했던 학습용 모델의 모양이 batch 형이기 때문에, tf.expand_dim으로 차원을 맞춰준다.

그 다음에 build_model() 함수를 이용하여 image_batch를 입력값으로 넣고 그래프를 그린다. dropout을 하지 않기 때문에, keep_prob=1.0 으로 한다. (build_model은 얼굴 인식 모델을 위해서 CNN 네트워크를 정의한 코드이다.)

build_model에 결과를 마지막에 softmax함수를 정의하여 result값을 리턴하도록 한다.

시그네쳐 정의하기

Tensorflow serving (CloudML inference)를 사용하기 위해서는 Tensorflow serving에 모델의 Input 과 Output 변수를 알려줘야 한다. 이를 시그네쳐라고 하는데,  SignatureDefs 를 이용하여 정의한다. (참고 https://github.com/tensorflow/serving/blob/master/tensorflow_serving/g3doc/signature_defs.md)


SignatureDefs는 용도에 따라서 Classification SignatureDef와 Predict SignatureDef 두가지로 나뉘어 진다. Cassification SignatureDef는 분류 모델에 최적화되어 정의된 시그네쳐로 출력값들이 클래스 종류나 클래스별 정확도등을 옵션으로 가질 수 있고, Predict SignatureDef는 분류 모델뿐 아니라 모든 모델에 범용적으로사용될 수 있는 형태로 입력과 출력값을 정의할 수 있다.


이 예제에서는 Predict Signature Def을 사용하였다.

   inputs = {'image': images}

   input_signatures = {}

   for key, val in inputs.iteritems():

     predict_input_tensor = meta_graph_pb2.TensorInfo()

     predict_input_tensor.name = val.name

     predict_input_tensor.dtype = val.dtype.as_datatype_enum

     input_signatures[key] = predict_input_tensor


코드에서는 images placeholder를 입력값으로 하여 “image”라는 이름의 입력 시그네쳐를 생성하였고, 마찬가지로 다음과 같이 출력 값은 prediction 변수를 “prediction”이라는 이름의 시그네쳐로 사용하여 정의하였다.

   outputs = {'prediction': prediction}

   output_signatures = {}

   for key, val in outputs.iteritems():

     predict_output_tensor = meta_graph_pb2.TensorInfo()

     predict_output_tensor.name = val.name

     predict_output_tensor.dtype = val.dtype.as_datatype_enum

     output_signatures[key] = predict_output_tensor


다음, 이렇게 생성한 시그네쳐 변수들을 ‘image’,’prediction’ 을 add_to_colleciton을 이용하여 텐서플로우 그래프에 추가하였다.


   inputs_name, outputs_name = {}, {}

   for key, val in inputs.iteritems():

     inputs_name[key] = val.name

   for key, val in outputs.iteritems():

     outputs_name[key] = val.name

   tf.add_to_collection('inputs', json.dumps(inputs_name))

   tf.add_to_collection('outputs', json.dumps(outputs_name))

체크포인트 데이타 로딩해서 Export 용 그래프에 채워넣기

Export할 그래프가 완성되었으면 여기에 학습된 값을 채워넣으면 된다.

학습된 값은 학습후에, 체크 포인트 파일에 저장되어있기 때문에, 이 체크 포인트 파일을 다시 로딩하자


init_op = tf.global_variables_initializer()

   sess.run(init_op)


   # Restore the latest checkpoint and save the model

   saver = tf.train.Saver()

   saver.restore(sess, checkpoint)


모델 저장

다음 최종적으로 모델을 저장하면 된다.

   predict_signature_def = signature_def_utils.build_signature_def(

       input_signatures, output_signatures,

       signature_constants.PREDICT_METHOD_NAME)


앞서 정의한 input,output 시그네쳐를 가지고, Predict Signature Def를 정의한다.

다음 SavedModelBuilder를 만들어서 디렉토리를 지정하고, add_meta_graph_and_variables 메서드를 이용하여, 정의한 시그네쳐를 넘겨주고, assets_collection을 통해서 그래프 값을 넘긴후, 최종적으로 save() 메서드를 이용하여 그 값을 저장한다.

   build = builder.SavedModelBuilder(model_dir)

   build.add_meta_graph_and_variables(

       sess, [tag_constants.SERVING],

       signature_def_map={

           signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:

               predict_signature_def

       },

       assets_collection=tf.get_collection(tf.GraphKeys.ASSET_FILEPATHS))

   build.save()


저장된 모델 확인

모델 저장이 완료되면 Export 디렉토리데 다음과 같이 파일들이 생성된다

  • saved_model.pb (파일) : 그래프를 저장하고 있는 모델 바이너리 파일이다.

  • variables (디렉토리) : 디렉토리로 변수 값을 저장하고 있는 파일들이 저장되어 있다.

정리

텐서플로우 자료나 튜토리얼을 보면 대부분 모델을 만들고 학습 하는 정도만 있고 Prediction(또는 Inference)는 대부분 체크포인트에 저장된 값을 그래프로 복원하는 방식을 사용하고 있지 Tensorflow Serving등을 사용하는 자료가 별로 없다. 그래서 정리를 해봤는데, 생각보다 어렵기는 하지만 코드를 찬찬히 살펴보니 Signature와 Graph Collection 을 개념을 이해하고 나면 여러 예제코드를 보면서 진행하면 어느정도 할 수 있지 않을까 싶다. 개념 자체가 어려운것 보다는 이를 지원하는 예제나 문서가 적기 때문이라고 보는데,이것도 텐서플로우가 활성화되는 중이니 많은 예제가 나오지 않을까 기대해 본다.


다음 글에서는 이번에 Export 한 모델 (*.pb)을 이용하여 구글 CloudML을 통해서 예측 (Inference) 하는 방법에 대해서 알아보겠다.


참고 자료


저작자 표시 비영리
신고

Wide and deep network 모델 활용하기

빅데이타/머신러닝 | 2017.07.20 17:12 | Posted by 조대협


Wide & deep model 알아보기

조대협 (http://bcho.tistory.com)

Wide & deep model

이글에 설명된 예제는 https://www.tensorflow.org/tutorials/wide_and_deep  문서에 있는 코드를 활용하였습니다. 음식 검색 키워드와 검색 결과를 학습 시킨 후에 이 결과를 기반으로 사용자에게 음식을 추천해주는 서비스가 있다고 하자.

Monetization and Wide model (기억과 와이드 모델)

로지스틱 회귀 모델을 이용하여 추천 알고리즘을 작성하여 학습을 시킨 경우, 학습 데이타를 기반으로 상세화된 예측 결과를 리턴해준다. 예를 들어 검색 키워드 (프라이드 치킨)으로 검색한 사용자가 (치킨과 와플)을 주문한 기록이 많았다면, 이 모델은 (프라이드 치킨)으로 검색한 사용자는 항상 (치킨과 와플)을 추천해주게 된다.  즉 예전에 기억된 값 (Memorization된 값)을 통해서 예측을 하는데, 이러한 모델을 와이드 모델이라고 한다.



<그림 와이드 모델 >

그러나 (프라이드 치킨)으로 검색한 사용자에게 같은 패스트 푸드 종류인 햄버거나 프렌치프라이등을 추천해도 잘 구매가 되지만 와이드 모델은 기존에 기억된 결과로만 추천을 하기 때문에 이러한 결과를 얻기가 어렵다.


Generalization and Deep model (일반화와 딥모델)

뉴럴네트워크 모델의 경우 프라이드 치킨을 햄버거, 프랜치 프라이등을 일반화 시켜서 패스트 푸드로 분류하여 프라이드 치킨으로 검색을 해도 이와 같은 종류의 햄버거를 추천해도 사용자가 택할 가능성이 높다.


<그림 딥 모델>


이러한 모델을 딥모델이라고 하는데, 딥 모델의 경우 문제점이, 너무 일반화가(under fitting)  되서 엉뚱한 결과가 나올 수 있다는 것인데, 예를 들어서 따뜻한 아메리카노를 검색했는데, 커피라는 일반화 범주에서 아이스 라떼를 추천해줄 수 있다는 것이다. 즉 커피라는 일반화 범주에서 라떼는 맞는 추천일 수 있지만, 따뜻한 음료를 원하는 사람에게 차가운 음료를 추천하는 지나친 일반화가 발생할 수 있다.


그래서 이런 문제를 해결하기 위해서 와이드 모델과 딥모델을 합친 “Wide & deep model”이라는 것을 구글이 개발하였고 이를 구글 플레이 스토어에 적용한 결과, 큰 효과를 얻었다고 한다. (https://arxiv.org/abs/1606.07792)


<그림 와이드 앤 딥모델 >


모델 사용 방법

이 모델이 텐서플로우에서 tf.contrib.learn 패키지에 라이브러리 형태로 공개가 되었다.

Classification 용은 tf.contrib.learn.DNNLinearCombinedClassifier

Regression 용은 tf.contrib.learn.DNNLinearCombinedRegressor

를 사용하면 된다.


이 라이브러리들은 텐서플로우의 Esimator API (https://www.tensorflow.org/extend/estimators)인데, 복잡한 알고리즘을 구현할 필요 없이 불러다 쓸 수 있는 하이레벨 API 이면서 학습에서 중요한 다음 두가지를 도와준다.

  • 분산러닝
    멀티 GPU나 멀티 머신에서 분산학습을 하려면 직접 텐서플로우 코드를 써서 작업 분산 및 취합 작업을 해줘야 하는데, Estimator API를 사용할 경우 Experiment API 를 통해서 Google CloudML 인프라 상에서 이런 작업을 자동으로 해준다.

  • 모델 EXPORT
    그리고 학습된 모델은 운영환경에서 예측용으로 사용할때, 모델을 Export 하여 Tensorflow Serving 과 같은 예측 엔진에 배포해야 하는데, 모델을 Export 하려면, 예측에 사용할 텐서플로우 그래프를 다시 그려주고 변수 값을 채워넣는 것에 대한 코드를 작성해야 하는데 (자세한 설명은 http://bcho.tistory.com/1183 문서 참조), 이 역시도 자동화를 해준다.


자 이제 머신러닝 모델은  있으니 여기에 데이타 즉 적절한 피쳐만 제대로 넣어서 학습을 시키면 되는데, 와이드 모델과 딥모델 각각 학습 하기 좋은 피쳐가 따로 있다.

와이드 모델 학습용 피쳐

와이드 모델에는 카테고리(분류)와 같은 비연속성을 가지는 데이타가 학습에 적절하다. 카테고리성 컬럼의 경우에는 다음과 같이 크게 두 가지가 있다.

Sparse based column

성별, 눈동자의 색깔과 같이 비연속성을 지니는 값으로 학습에 사용하려면 이를 벡터화를 해야 한다.

예를 들어 남자 = [1,0] 여자는 = [0,1] 식으로 또는 검정눈 = [1,0,0], 갈색눈 = [0,1,0], 푸른눈 = [0,0,1] 식으로 벡터화할 수 있다.

이때는 다음과 같이 sparse_column_with_keys라는 메서드를 써주면 위와 같은 방식으로 인코딩을 해준다.

gender = tf.contrib.layers.sparse_column_with_keys(column_name="gender", keys=["Female", "Male"])

만약에 나이와 같이 연속형 데이타라도 이를 10대,20대,30대와 같이 구간으로 나눠서 비연속성 분류 데이타로 바꾸고자 할 경우에는 다음과 같이 bucketized_column을 사용하면 된다.

age_buckets = tf.contrib.layers.bucketized_column(age, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65])

Crossed column

다음은 crossed column 이라는 피쳐인데, 예를 들어 교육 수준과, 직업이라는 피쳐가 있다고 하자. 이를 각각의 독립된 변수로 취급할 수 도 있지만, 교육수준과 직업에 상관 관계가 있다고 할때 이를 관계를 묶어서 피쳐로 사용할 수 있다. 예를 들어 대졸 사원의 연봉, 컴퓨터 프로그래머의 연봉과 같이 독립된 특징으로 보는것이 아니라 대졸 컴퓨터 프로그래머, 대학원졸 컴퓨터 프로그래머와 같은 상관 관계를 기반으로 피쳐를 사용할 수 있는데 이를 Crossed column이라고 한다. Cross column은 다음과 같이 crossed_colmn이라는 메서드를 이용해서 정의할 수 있다.

tf.contrib.layers.crossed_column([education, occupation], hash_bucket_size=int(1e4))

딥 모델 학습용 피쳐

딥 모델용 학습데이타는 연속성을 가지는 데이타가 적절하다.

Continuous column

Continuous column은 일반적인 연속형 데이타 변수이고 간단하게 real_valued_column 메서드를 정해서 다음과 같이 정의가 가능하다.

age = tf.contrib.layers.real_valued_column("age")

Embedding column

문장의 단어들을 학습 시키기 위해서 각 단어를 벡터로 표현하고자 할때 , 예를 들어 boy = [1,0,0,0..], girl=[0,1,0,...] 으로 단어 하나를 하나의 숫자로 1:1 맵핑을 시킬 수 있다. 그러나 이 경우 이 단어가 다른 단어와 어떤 상관 관계를 갖는지 표현이 불가능하다. 예를 들어 남자:소년=여자:?? 라는 관계식을 줬을때, 위의 방식으로는 단어간의 관계를 유추할 수 없기 때문에, ?? 를 찾아낼 수 없다. 즉 컴퓨터가 “단어가 다른 단어와 어떤 차이점과 공통점”을 가지는지 이해할 수가 없다는 단점이 존재한다.

이런 문제를 해결하기 위해서 단어를 다차원 공간에서 벡터로 표현하여 각 단어간의 관계를 표현할 수 있는 방법을 만들었다.

이와 같은 원리로 어떤 비연속된 카테고리 피쳐들을 숫자로 맵핑할때, 위의 boy,girl 과 같은 방식 (on_hot_encoding) 으로 의미없이 1:1 맵핑을 하는 것이 아니라, 각 카테고리들이 어떠한 연관 관계를 가질때 이 연관성을 표현하여 벡터값으로 변환하는 방법을 임베딩 (embedding)이라고 한다.


그래서 카테고리내의 값들이 서로 연관성을 가질때는 임베딩을 이용하여 벡터 값으로 변경을 한 후, 이 값을 딥모델에 넣어서 학습하면 좋은 결과를 얻을 수 있다. 카테고리화된 값을 임베딩하기 위해서는 아래와 같이 embedding_column이라는 메서드를 사용하면 된다.


tf.contrib.layers.embedding_column(education, dimension=8)

피쳐를 모델에 넣는 방법

위와 같은 방법으로 분리되고 변경된 피쳐는, Wide & deep model에서 각각 와이드 모델과, 딥모델로 주입되서 학습되게 된다.

아래와 같이 피쳐를 와이드 컬럼과 딥 컬럼으로 구별한 후에, 리스트에 넣는다.

wide_columns = [
 gender, native_country, education, occupation, workclass, relationship, age_buckets,
 tf.contrib.layers.crossed_column([education, occupation], hash_bucket_size=int(1e4)),
 tf.contrib.layers.crossed_column([native_country, occupation], hash_bucket_size=int(1e4)),
 tf.contrib.layers.crossed_column([age_buckets, education, occupation], hash_bucket_size=int(1e6))

deep_columns = [
 tf.contrib.layers.embedding_column(workclass, dimension=8),
 tf.contrib.layers.embedding_column(education, dimension=8),
 tf.contrib.layers.embedding_column(gender, dimension=8),
 tf.contrib.layers.embedding_column(relationship, dimension=8),
 tf.contrib.layers.embedding_column(native_country, dimension=8),
 tf.contrib.layers.embedding_column(occupation, dimension=8),
 age, education_num, capital_gain, capital_loss, hours_per_week]

다음 딥모델용 피쳐 리스트와 와이드 모델용 피쳐 리스트를 DNNLinearCombinedClassifier 에 각각 변수로 넣으면 된다. 이때 딥 모델은 뉴럴네트워크이기 때문에, 네트워크의 크기를 정해줘야 하는데 아래 코드에서는 각각 크기가 100인 히든 레이어와 50인 레이어 두개를 넣어서 구성하도록 하였다.

m = tf.contrib.learn.DNNLinearCombinedClassifier(
   model_dir=model_dir,
   linear_feature_columns=wide_columns,
   dnn_feature_columns=deep_columns,
   dnn_hidden_units=[100, 50])



지금 까지 아주 간단하게 나마 Wide & deep model에 대한 이론 적인 설명과 이에 대한 구현체인 DNNLinearCombinedRegressortf.contrib.learn.DNNLinearCombinedClassifier 에 대해서 알아보았다.  이 정도 개념만 있으면 실제 Wide & deep model 튜토리얼을 이해할 수 있으니, 다음은 직접 튜토리얼을 참고하기 바란다. https://www.tensorflow.org/tutorials/wide_and_deep


Reference


저작자 표시 비영리
신고

구글 클라우드 서버의 HTTP 포트를 SSH 로 터널링해서 로컬에서 접속하기


조대협 (http://bcho.tistory.com)


구글 클라우드 VM에 서버를 설치한 후 웹을 접근하고자 할때, 설치한 애플리케이션이 ACL (접근 제어) 처리가 안되어 있는 애플리케이션이 있을 수 있다. 특히 관리자 콘솔 같은 경우에 이런 경우가 많은데, 아파치 에어플로우 역시도 설치 후에 웹 서버를 띄우면 포트가 모두 퍼블릭으로 오픈되기 때문에 관리자만 액세스가 가능하도록 ACL 처리를 할 필요가 있다.


이를 위한 방법으로는 몇가지가 있는데


  1. 방화벽으로 특정 포트만 허용 하는 방법
  2. 앞에 nginx나 apache를 넣어서 HTTP BASIC AUTH등 인증 방식을 추가하는 방법
  3. Google Cloud Identity Aware Proxy를 이용하여, 구글 클라우드 계정 사용자에게 접근 권한을 부여 하는 방법
  4. 해당 HTTP 포트를 SSH로 터널링 하는 방법

1번 방법은 IP 가 바뀌면 접근 제어 하기가 번거로우니 패스, 2번은 웹서버 깔아야 하니 패스, 3번은 제일 좋은 방법인데, 로드밸런서등 구체적인 설정을 해야 해서 패스. 그래서 오늘은 가장 간단한 4번 방법을 설명한다.

4번 방법은 로컬에서 localhost:2222를 접속하면 구글 클라우드 상의 인스턴스:8080 으로 포워딩을 해준다. 이때 프로토콜을 SSH를 통해서 터널링이 된다.

매우 간단하게 할 수 있는데, 로컬에 gcloud SDK가 깔려 있을때

gcloud compute ssh {인스턴스명} --project {내프로젝트ID} --zone {인스턴스가 있는 존 이름} --ssh-flag="-L" --ssh-flag="{랩탑에서 접속할 포트번호}:localhost:{인스턴스의 포트 번호}"

로 기입해주면 된다.

예를 들어 terrycho-ml 프로젝트의 us-central1-f 존에 있는 hello-airflow 인스턴스의 8080 포트를 로컬에서  localhost:2222로 접근하도록 포워딩 설정을 할 경우 다음과 같이 하면 된다.


gcloud compute ssh hello-airflow --project terrycho-ml --zone us-central1-f --ssh-flag="-L" --ssh-flag="2222:localhost:8080"

저작자 표시 비영리
신고

블로그 400만 돌파

사는 이야기 | 2017.07.17 11:25 | Posted by 조대협


바뻐서. 400만 돌파도 로깅을 못했네.

2017년  7월 17일 405만명 돌파.

2016년10월20일300만 돌파

2015년12월16일 200만 돌파


저작자 표시 비영리
신고

'사는 이야기' 카테고리의 다른 글

지난 1년 회고  (0) 2017.09.20
블로그 400만 돌파  (2) 2017.07.17
2016년 업무 종료....  (3) 2016.12.29
세번째 책이 나왔습니다.  (3) 2016.08.29
2015.12.16 기록 블로그 방문자수 200만 돌파  (1) 2015.12.16
조직 문화에 대한 메모  (0) 2015.09.25

Apache airflow


조대협 (http://bcho.tistory.com)

배경

빅데이타 분석이나, 머신러닝 코드를 만들다 보면 필요한것중에 하나가 여러개의 태스크를 연결해서 수행해야 할 경우가 있다. 데이타 베이스의 ETL 작업과 비슷한 흐름이라고 보면 된다.


예를 들어 머신러닝의 학습 과정을 보면 데이타 전처리,학습,배포,예측과 같은 단계를 가지게 된다.


  • rawdata를 읽어서 preprocessing 단계를 거쳐서 학습에 적절한 training data로 변경하고,

  • 변경된 training data를 가지고 머신러닝 모델을 학습한후, 학습된 모델을 저장한다.

  • 학습된 모델을 가지고 예측을 해서 결과를 저장한다.


이렇게 머신러닝은 여러개의 단계를 거쳐서 수행이 되는데, 각 단계가 끝나면 다음 단계를 수행해야 한다. 단순하게 CRON+쉘로 순차적으로 수행하는 것등이 가능하지만, 에러가 났을때 재처리를 하거나 , 수행 결과에 따라 분기를 하는 등 조금 더 구조화된 도구가 필요하다.

데이타 워크 플로우 관리 도구

이런 요구 사항 때문에 여러가지 툴이 개발되었는데, 대표적인 도구로는 하둡 에코시스템에 우지(oozie ) 등이 있다.



<그림. Oozie eclipse 클라이언트 >


하둡의 여러 에코 시스템 솔루션들을 유기적으로 조합하기 위해서 개발된 도구로, 하둡 에코 시스템에 있는 여러가지 다양한 솔루션과 연동하기 위한 아답터를 가지고 있다.

이외에도 rundeck, luigi와 같은 유사한 솔루션들이 있다.

오늘 소개하고자 하고자하는 데이타 워크 플로우 관리도구는 아파치 오픈소스 airflow 이다. 원래 airbnb에서 개발된 도구로 현재 아파치 오픈소스에서 인큐베이터 단계에 있는 소프트웨어이다.


airflow를 소개하는 이유는 첫번째 파이썬 기반으로 태스크 코드를 작성할 수 있기 때문에, 데이타 분석이나 머신러닝을 개발하는 엔지니어들에게 익숙한 언어이고, 한대에서 동작하는게 아니라 여러 머신에 분산하여 수행 될 수 있는 장점을 가지고 있다.



<그림. Apache airflow 의 작업 그래프 구조 화면 >

airflow 시작하기

그러면 간단하게 airflow에 대한 개념과 사용법에 대해서 알아보자

airflow 설치

airflow는 실행되는 작업의 상태등을 저장하기 위해서 데이타 베이스 (MySQL이나 Postgres)등이 필요하며, 분산 환경을 위해서 여러대에 설치할 수 있다. 또한 로컬 환경에 sqlite와 함께 간단하게 설치할 수 있다. 여기서는 간단하게 개인 맥북환경에 로컬로 설치 및 실행하는 시나리오로 설명한다.


설치 방법은 매우 간단하다. 파이썬 2.7 환경에서 아래와 같이 간단학 “pip install airflow”만 실행해주면 된다.

%pip install airflow



airflow가 설치되었으면 데이타 베이스 설정을 해줘야 하는데, 이 튜토리얼에서는 개발 및 테스트를 위해서 sqlite를 사용한다. sqlite를 초기화 하기 위해서 다음과 같이 “airflow initdb” 명령을 실행한다.

% airflow initdb


자아 이제 설치가 끝나고 airflow를 사용할 준비가 되었다. 이제 airflow 웹콘솔을 기동해보자

“airflow webserver -p 8080” 을 실행하고 웹에 http://localhost:8080에 접근하면 airflow 콘솔을 볼 수 있다.

airflow 코드

airflow에서 워크플로우를 저장하기 위해서 몇가지 추상화된 개념을 사용한다.

Airflow DAG의 구조

DAG (Directed Acyclic Graph)

DAG는 하나의 워크 플로우라고 보면 된다. 위의 예제처럼, 머신러닝 이라는 DAG를 정의한다면, Preprocessing,Training,Prediction 워크플로우가 하나의 DAG가 된다.

Operator and Task

Operator는 DAG안에서 정의되는 작업 함수(함수임을 주의하자) 이야기 하는데, Pre processing, Training, Prediction 함수가 Operatorator 이다.

이 Operator 함수가 DAG 상에서 실제로 워크플로우 상에 정의되서 호출 되면 이것이 Task 이다.

객체지향 언어에서 Operator가 class 라면, Task는 object 라고 보면 된다.


이해가 잘안될 수 있는데, 코드를 보자


from airflow import DAG

from airflow.operators.bash_operator import BashOperator

from airflow.operators.dummy_operator import DummyOperator

from airflow.operators.python_operator import PythonOperator

from datetime import datetime,timedelta


dag = DAG('hello-airflow',description='Hello airflow DAG',

         schedule_interval = '*/5 0 * * *',

         start_date=datetime(2017,07,01),catchup=False)


def print_hello():

   return 'Hello Airflow'


python_task = PythonOperator(

                   task_id='python_operator',

                   python_callable = print_hello,

                   dag = dag)


bash_task = BashOperator(

       task_id='print_date',

       bash_command='date',

       dag=dag)


bash_task.set_downstream(python_task)


DAG 정의 부분을 보자. DAG 객체는 DAG에 대한 전체 컨택스를 저장 및 유지 관리한다.

DAG('hello-airflow',description='Hello airflow DAG', 에서 DAG를 이름을 ‘hello-airflow’로 정의하고 description에 설명을 적는다.

schedule_interval = '*/5 * * * *', 다음으로 이 DAG가 실행되는 주기를 정해야 하는데, cron 명령과 같은 노테이션으로 정의한다. 위 설정은 매 5분마다 실행되도록 하는 설정이다.

마지막으로, start_date=datetime(2017,07,01), ,DAG를 언제부터 시작할것인지 지정한다. DAG는 반드시 전역 변수로 지정한다. DAG안에서 다른 DAG를 부르는 sub DAG의 경우에는 지역 변수로 지정이 가능하다.


다음 task에 사용할 operator를 정의하는데, 파이썬 코드를 실행할 오퍼레이터인 PythonOperator와 쉘 커맨드를 실행할 BashOperator를 가지고 각각 파이썬 태스크 python_task와, 쉘 태스크 bash_task를 정의한다.


python_task = PythonOperator(

                   task_id='python_operator',

                   python_callable = print_hello,

                   dag = dag)


파이썬 태스크의 id는 “python_operator”라고 지정하였고, 실행시 print_hello를 호출하도록 하였다.

그리고 이 태스크는 DAG인 dag에 지정한다.


다음 쉘 태스크의 내용은 다음과 같다.

bash_task = BashOperator(

       task_id='print_date',

       bash_command='date',

       dag=dag)


print_data라는 이름으로 태스크를 정의하고, 쉘 명령어 date를 실행하도록 하였다.

등록

코드 작성이 끝나면 코드를 배포해보자. Dag 파일을 airflow에 등록해야 하는데, dag 파일을 저장하는 장소는 dags_folder 라는 변수로 $AIRFLOW_HOME/airflow.cfg 파일안에 정의 되어 있다. 디폴트 장소는 $AIRFLOW_HOME/dags/ 폴더이다. 위에서 작성한 코드를 해당 디렉토리에 복사하자

다음 dag이 제대로 등록되었는지를 확인한다. 커멘드 창에서 “airflow list_dags”라는 명령을 수행하면 현재 등록되어 있는 DAG 목록을 볼 수 있다. 아래 그림과 같이 hello-airflow dag가 등록된것을 확인할 수 있다.




hello-airflow dag안에 어떤 태스크들이 정의되어 있는지를 확인하려면 ‘airflow list_tasks hello-airflow’ 명령을 이용하면 hello-airflow 안에 등록된 태스크 목록을 출력해준다.


테스트

테스트를 하려면 태스크 단위로 테스트가 가능하다. airflow test {DAG ID} {태스크 ID} {실행날짜} 식으로 하면 된다.

, 예를 들어 print_date 태스크를 2017-07-01을 기준으로 실행하고자하면 airflow test hello-airflow print_date 2017-07-01

Hello-airflow DAG안에 print_date라는 태스크를 실행한다.



실행

DAG 코드 개발 등록과 테스트가 완료되었으면 이제 airflow scheduler 를 띄워준다. (일종의 데몬이다.) 스케쥴러는 DAG 코드에 정의된 스케쥴에 따라서 테스크를 실행해준다.

스케쥴러 실행은 간단하게 airflow scheduler 명령을 실행하면 된다.



스케쥴러가 실행되면, 각 DAG의 스케쥴에 따라서 자동으로 태스크들을 수행한다.


로그 모니터링

스케쥴러에 의해서 실행되는 DAG와 태스크들의 결과와 로그는 어떻게 모니터링 할까? airflow에 의해서 수행되는 태스크들은 $AIRFLOW_HOME/logs 디렉토리에 저장된다.

logs 디렉토리 아래에 각각 DAG 이름으로 저장이 되며, DAG 이름으로 된 디레토리안에는 태스크명으로 된 서브 디렉토리가 있고, 이 서브 디렉토리 아래에 시간대별 로그가 있다.

즉 hello-airflow DAG의 print_date 태스크에 대한 로그는 $AIRFLOW_HOME/logs/hello-airflow/print_date/{날짜및시간} 파일 명으로 저장된다.

웹 콘솔을 이용한 모니터링

airflow의 강력한 기능중의 하나는 웹 기반의 모니터링 콘솔을 제공한다. 뒤에서는 주요 웹 콘솔의 주요 기능에 대해서 알아보도록 한다.

Graph View

Graph View는 DAG의 구조를 그래프 형태로 보여주는 뷰이다.


복잡한 워크플로우의 경우 그 구조를 파악하는데 유용한다. 위의 그림은 앞서 만든 hello-airflow 에 대한 태스크간 그래프로 print_date를 호출한 후에, python_operator 태스크를 호출하는 것을 볼 수 있다.

Tree View


트리뷰를 보면, DAG의 구조를 트리 형태로 보여주고, DAG의 태스크가 각각 성공했는지 실패 했는지를 우측 그래프 처럼 표현해준다. 각 태스크를 로그를 보려면 각 태스크 실행 결과 그래프를 누르면 아래와 같이 세부 메뉴가 나온다.



여기서 View Log를 누르면 각 Task 별로 실행 당시의 로그를 볼 수 있다. 아래는 Python_Operator 태스크를 실행한 로그이다.



아래서 두번째 줄을 보면 Hello Airflow 라는 문자열을 리턴한것을 확인할 수 있다.


Task Duration

Task duration은 DAG에서 수행된 각 태스크의 수행 시간을 그래프 형태로 나타내준다.



어떤 태스크가 시간이 많이 걸리는지 그리고 수행시간이 매번 수행할때 마다 올바른지 (큰 변화가 없고 일정한지. 이건 매우 유용할듯) 등을 체크할 수 있다.

Task Tries


Task Tries 에서는 각 수행별로 각각의 태스크를 수행한 횟수를 그래프로 보여준다. 즉 재시도 (RETRY)횟수를 모니터링할 수 있다.


Gantt


Gantt 차트는 각 수행에 대해서 태스크들의 수행 순서에 따라서 소모된 시간과 함께 간트 차트로 표시해준다.

앞의 차트에서 이미 얻을 수 있는 뷰이지만, 각 태스크의 수행 순서와 태스크당 시간을 한꺼번에 보여주기 때문에 병목 구간 파악이 쉽다.


<그림 airflow gantt chart 그래프 예제 (출처 : https://www.agari.com/airflow-agari/) >


이미 링크드인의 Azkaban이나, 스포티파이의 Luigi, 하둡의 Oozie 등 여러가지 워크 플로우 관리 시스템이 있지만, 아직 인큐베이터 단계인 airflow를 주목하는 이유는 분산 환경 지원이 가능하고, 태스크에 대한 스크립트를 파이썬을 사용할 수 있기 때문에, 각종 빅데이타 분석 시스템이나 라이브러리 그리고 머신러닝 시스템과 연동이 쉽고, 파이썬 언어만 알면 쉽게  정교한 플로우 개발이 가능하기 때문에, ( XML등의 설정을 하지 않고도) 활용 가능성이 높다.

저작자 표시 비영리
신고