블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 

Apache Beam (Dataflow)를 이용하여, 이미지 파일을 tfrecord로 컨버팅 하기


조대협 (http://bcho.tistory.com)



개요

텐서플로우 학습에 있어서 데이타 포맷은 학습의 성능을 결정 짓는 중요한 요인중의 하나이다. 특히 이미지 파일의 경우 이미지 목록과 이미지 파일이 분리되어 있어서 텐서플로우에서 학습시 이미지 목록을 읽으면서, 거기에 있는 이미지 파일을 매번 읽어야 하기 때문에, 코딩이 다소 지저분해지고,IO 성능이 떨어질 수 있다

텐서플로우에서는 이러한 학습 데이타를 쉽게 읽을 수 있도록 tfrecord (http://bcho.tistory.com/1190)라는 파일 포맷을 지원한다.


이 글에서는 이미지 데이타를 읽어서 tfrecord 로 컨버팅하는 방법을 설명하며, 분산 데이타 처리 프레임웍인 오픈소스 Apache Beam을 기준으로 설명하나, tfrecord 변환 부분은 Apache Beam과 의존성이 없이 사용이 가능하기 때문에, 필요한 부분만 참고해도 된다. 이 Apache Beam을 구글의 Apache Beam 런타임 (매니지드 서비스)인 구글 클라우드의 Dataflow를 이용하여, 클러스터를 이용하여 빠르게 데이타를 처리하는 방법에 대해서 알아보도록 한다.


전체 코드는 https://github.com/bwcho75/cifar-10/blob/master/pre-processing/4.%20Convert%20Pickle%20file%20to%20TFRecord%20by%20using%20Apache%20Beam.ipynb 에 있다.


이 코드는 CIFAR-10 이미지 데이타를 Apache Beam 오픈 소스를 이용하여, 텐서플로우 학습용 데이타 포맷인  tfrecord 형태로 변환 해주는 코드이다.


Apache Beam은 데이타 처리를 위한 프레임웍으로, 구글 클라우드 상에서 실행하거나 또는 개인 PC나 Spark 클러스터상 여러 환경에서 실행이 가능하며, 구글 클라우드 상에서 실행할 경우 오토스케일링이나 그래프 최적화 기능등으로 최적화된 성능을 낼 수 있다.


CIFAR-10 데이타 셋은 32x32 PNG 이미지 60,000개로 구성된 데이타 셋으로 해당 코드 실행시 최적화가 되지 않은 상태에서 약 16분 정도의 처리 시간이 소요된다. 이 중 6분 정도는 Apache Beam 코드를 구글 클라우드로 업로드 하는데 소요되는 시간이고 실제 처리시간은 10분정도가 소요된다. 전처리 과정에 Apache Beam을 사용하기 전에 고려해야 할 요소는 다음과 같다.

  • 데이타가 아주 많아서 전처리 시간이 수시간 이상 소요될 경우 Apache Beam + Google Cloud를 고려하여 여러 머신에서 동시에 처리하여 빠른 시간내에 수행되도록 할 수 있다.

  • 데이타가 그다지 많지 않고 싱글 머신에서 멀티 쓰레드로 처리를 원할 경우에는 Apache Beam으로 멀티 쓰레드 기반의 병렬 처리를 하는 방안을 고려할 수 있다. 이 경우 클라우드에 대한 의존성을 줄일 수 있다.

  • 다른 대안으로는 Spark/Hadoop 등의 오픈소스를 사용하여, On Prem에서 여러 머신을 이용하여 전처리 하는 방안을 고려할 수 있다.

여기서는 아주 많은 대량의 이미지 데이타에 대한 처리를 하는 것을 시나리오로 가정하였다.

전처리 파이프라인

Apache Beam을 이용한 데이타 전처리 파이프라인의 구조는 다음과 같다.

이미지 파일 준비

CIFAR-10 데이타셋 원본은 이미지 파일 형태가 아니라 PICKLE이라는 파일 포맷으로 되어 있기 때문에,  실제 개발 환경에서는 원본데이타가 이미지인것으로 가정하기 위해서 https://github.com/bwcho75/cifar-10/tree/master/pre-processing 의 1~2번 코드를 통해서 Pickle 파일을 이미지 파일로 변경하고, *.csv 파일에 {파일명},{레이블} 형태로 인덱스 데이타를 생성하였다.

생성된 이미지 파일과 *.csv 파일은 gsutil 명령어를 이용하여 Google Cloud Storage (aka GCS)에 업로드 하였다. 업로드 명령은 https://github.com/bwcho75/cifar-10/blob/master/pre-processing/2.%20Convert%20CIFAR-10%20Pickle%20files%20to%20image%20file.ipynb 에 설명되어 있다.


전처리 파이프라인의 구조

Apache Beam으로 구현된 파이프라인의 구조는 다음과 같다.


1. TextIO의 ReadFromText로 CSV 파일에서 한 라인 단위로 문자열을 읽는다.

2. parseLine에서 라인을 ,로 구분하여 filename과 label을 추출한다.

3. readImage 에서 filename을 가지고, 이미지 파일을 읽어서, binary array 형태로 변환한다.

4. TFExampleFromImageDoFn에서 이미지 바이너리와 label을 가지고 TFRecord 데이타형인 TFExample 형태로 변환한다.

5. 마지막으로 TFRecordIOWriter를 통해서 TFExample을 *.tfrecord 파일에 쓴다.

코드 주요 부분 설명

환경 설정 부분

이 코드는 구글 클라우드와 로컬 환경 양쪽에서 모두 실행이 가능하도록 구현되었다.

SRC_DIR_DEV는 로컬환경에서 이미지와 CSV 파일이 위치한 위치이고, DES_DIR_DEV는 로컬환경에서 tfrecord 파일이 써지는 위치이다.

구글 클라우드에서 실행할 경우 파일 저장소를  GCS (Google Cloud Storage)를 사용한다. DES_BUCKET은 GCS 버킷 이름이다. 코드 실행전에 반드시 구글 클라우드 콘솔에서 GCS 버킷을 생성하기 바란다.  SRC_DIR_PRD와 DES_DIR_PRD는 GCS 버킷내의 각각 image,csv 파일의 경로와 tfrecord 파일이 써질 경로 이다. 이 경로에 맞춰서 구글 클라우드 콘솔에서 디렉토리를 먼저 생성해 놓기를 바란다.




PROJECT는 구글 클라우드 프로젝트 명이고, 마지막으로 DEV_MODE가 True이면 로컬에서 수행이되고 False이면 구글 클라우드에서 실행하도록 하는 환경 변수이다.

의존성 설정 부분

로컬에서 실행할 경우필요한  파이썬 라이브러리가 이미 설치되어야 있어야 한다.

만약에 구글 클라우드에서 실행할 경우 이 Apache Beam 코드가 사용하는 파이썬 모듈을 명시적으로 정의해놔야 한다. 클라우드에서 실행시에는 Apache Beam 코드만 업로드가 되기 때문에(의존성 라이브러리를 같이 업로드 하는 방법도 있는데, 이는 추후에 설명한다.), 의존성 라이브는 구글 클라우드에서 Dataflow 실행시 자동으로 설치할 수 있도록 할 수 있는데, 이를 위해서는 requirements.txt 파일에 사용하는 파이썬 모듈들을 정의해줘야 한다. 다음은 requirements.txt에 의존성이 있는 파이썬 모듈등을 정의하고 저장하는 부분이다.


Apache Beam 코드

Apache Beam의 코드 부분은 크게 복잡하지 않기 때문에 주요 부분만 설명하도록 한다.

Service account 설정

Apache Beam 코드를 구글 클라우드에서 실행하기 위해서는 코드 실행에 대한 권한을 줘야 한다. 구글 클라우드에서는 사용자가 아니라 애플리케이션에 권한을 부여하는 방법이 있는데, Service account라는 것을 사용한다. Service account는 json 파일로 실행 가능한 권한을 정의하고 있다.

Service account 파일을 생성하는 방법은 http://bcho.tistory.com/1166 를 참고하기 바란다.

Service account 파일이 생성되었으면, 이 파일을 적용해야 하는데 GOOGLE_APPLICATION_CREDENTIALS 환경 변수에 Service account  파일의 경로를 정의해주면 된다. 파이썬 환경에서 환경 변수를 설정하는 방법은 os.envorin[‘환경변수명']에 환경 변수 값을 지정해주면 된다.

Jobname 설정

구글 클라우드에서 Apache Beam 코드를 실행하면, 하나의 실행이 하나의 Job으로 생성되는데, 이 Job을 구별하기 위해서 Job 마다 ID 를 설정할 수 있다. 아래는 Job ID를 ‘cifar-10’+시간 형태로 지정하는 부분이다


환경 설정

Apache Beam 코드를 구글 클라우드에서 실행하기 위해서는 몇가지 환경을 지정해줘야 한다.


  • staging_location은 클라우드 상에서 실행시 Apache Beam 코드등이 저장되는 위치이다. GCS 버킷 아래 /staging이라는 디렉토리로 지정했는데, 실행 전에 반드시 버킷아래 디렉토리를 생성하기 바란다.

  • temp_location은 기타 실행중 필요한 파일이 저장되는 위치이다. 실행 전에 반드시 버킷아래 디렉토리를 생성하기 바란다.

  • zone은 dataflow worker가 실행되는 존으로 여기서는 asia-northeast1-c  (일본 리전의 c 존)으로 지정하였다.


DEV_MODE 에 따른 환경 설정

로컬 환경이나 클라우드 환경에서 실행이냐에 따라서 환경 변수 설정이 다소 달라져야 한다.


디렉토리 경로를 바꿔서 지정해야 하고, 중요한것은 RUNNER인데, 로컬에서 실행하기 위해서는 DirectRunner를 구글 클라우드 DataFlow 서비스를 사용하기 위해서는 DataflowRunner를 사용하면 된다.


readImage 부분

Read Image는 이미지 파일을 읽어서 byte[] 로 리턴하는 부분인데, 로컬 환경이냐, 클라우드 환경이냐에 따라서 동작 방식이 다소 다르다.

클라우드 환경에서는 이미지 파일이 GCS에 저장되어 있기 때문에 파이썬의 일반 파일 open 명령등을 사용할 수 없다.

그래서 클라우드 환경에서 동작할 경우에는 GCS에서 파일을 읽어서 Worker의 로컬 디스크에 복사를 해놓고 이미지를 읽어서 byte[]로 변환한 후에, 해당 파일을 지우는 방식을 사용한다.


아래 코드에서 보면 DEV_MODE가 False 인경우 GCS에서 파일을 읽어서 로컬에 저장하는 코드가 있다.


storageClient는 GCS 클라이언트이고 bucket 을 얻어온후, bucket에서 파일을 get_blob 명령어를 이용하여 경로를 저장하여 blob.download_to_file을 이용하여 로컬 파일에 저장하였다.

실행

코드 작성이 끝났으면 실행을 한다. 실행 상태는 구글 클라우드 콘솔의 Dataflow  메뉴에서 확인이 가능하다.

아래와 같이 실행중인 그리고 실행이 끝난 Job 리스트들이 출력된다.




코드 실행중에, 파이프라인 실행 상황 디테일을 Job 을 선택하면 볼 수 있다.


여기서 주목할만한 점은 우측 그래프인데, 우측 그래프는 Worker의 수를 나타낸다. 초기에 1대로 시작했다가 오토 스케일링에 의해서 9대 까지 증가한것을 볼 수 있다.

처음 실행이었기 때문에 적정한 인스턴스수를 몰랐기 때문에 디폴트로 1로 시작하고 오토스케일링을 하도록 했지만, 어느정도 테스트를 한후에 적정 인스턴수를 알면 오토 스케일링을 기다릴 필요없이 디폴트 인스턴스 수를 알면 처음부터 그 수만큼 인스턴스 수로 시작하도록 하면 실행 시간을 줄일 수 있다.

만약에 파이프라인 실행시 에러가 나면 우측 상단에 LOGS 버튼을 누르면 상세 로그를 볼 수 있다.


아래 그림은 파이프라인 실행이 실패한 예에서 STACK TRACES를 통해서 에러 내용을 확인하는 화면이다.



해당 로그를 클릭하면 Stack Driver (구글의 모니터링 툴)의 Error Reporting 시스템 화면으로 이동하게 된다.

여기서 디테일한 로그를 볼 수 있다.

아래 화면을 보면 ReadImage 단계에서 file_path라는 변수명을 찾을 수 없어서 나는 에러를 확인할 수 있다.


TFRecord 파일 검증

파이프라인 실행이 끝나면, GCS 버킷에 tfrecord 파일이 생성된것을 확인할 수 있다.


해당 파일을 클릭하면 다운로드 받을 수 있다.

노트북 아래 코드 부분이 TFRecord를 읽어서 확인하는 부분이다. 노트북에서 tfrecord 파일의 경로를 다운로드 받은 경로로 변경하고 실행을 하면 파일이 제대로 읽히는 지 확인할 수 있다.


파일 경로 부분은 코드상에서 다음과 같다.



정상적으로 실행이 된 경우, 다음과 같이 tfrecord에서 읽은 이미지와 라벨값이 출력됨을 확인할 수 있다.


라벨 값은 Label 줄에 values 부분에 출력된다. 위의 그림에서는 순서대로 라벨 값이 4와 2가 된다.



텐서플로우 하이레벨 API Estimator를 이용한 모델 정의 방법


조대협 (http://bcho.tistory.com)


텐서플로우의 하이레벨 API를 이용하기 위해서는 Estimator 를 사용하는데, Estimator 는 Predefined model 도 있지만, 직접 모델을 구현할 수 있다. 하이레벨 API와 Estimator에 대한 설명은 http://bcho.tistory.com/1195 글을 참고하기 바란다.


이 문서는 Custom Estimator를 이용하여 Estimator를 구현하는 방법에 대해서 설명하고 있으며, 대부분 https://www.tensorflow.org/extend/estimators 의 내용을 참고하여 작성하였다.

Custom Estimator

Estimator의 스켈레톤 코드는 다음과 같다. 모델을 정의하는 함수는 학습을 할 feature와, label을 입력 받고, 모델의 모드 (학습, 테스트, 예측) 모드를 인자로 받아서 모드에 따라서 모델을 다르게 정의할 수 있다. 예를 들어 학습의 경우 드롭 아웃을 사용하지만 테스트 모드에서는 드롭 아웃을 사용하지 않는다.

def model_fn(features, labels, mode, params):
  # Logic to do the following:
  # 1. Configure the model via TensorFlow operations
  # 2. Define the loss function for training/evaluation
  # 3. Define the training operation/optimizer
  # 4. Generate predictions
  # 5. Return predictions/loss/train_op/eval_metric_ops in EstimatorSpec object
  return EstimatorSpec(mode, predictions, loss, train_op, eval_metric_ops)

입력 인자에 대한 설명

그러면 각 인자를 구체적으로 살펴보자

  • features : input_fn을 통해서 입력되는 feature로 dict 형태가 된다.

  • labels : input_fn을 통해서 입력되는 label 값으로 텐서 형태이고, predict (예측) 모드 일 경우에는 비어 있게 된다.

  • mode : 모드는 모델의 모드로, tf.estimator.ModeKeys 중 하나를 사용하게 된다.

    • tf.estimator.ModeKeys.TRAIN : 학습 모드로 Estimator의 train()을 호출하였을 경우 사용되는 모드이다.

    • tf.estimator.ModeKeys.EVAL : 테스트 모드로, evaluate() 함수를 호출하였을 경우 사용되는 모드이다.

    • tf.estimator.ModeKeys.PREDICT : 예측모드로,  predict() 함수를 호출하였을 경우에 사용되는 모드이다.  

  • param : 추가적으로 입력할 수 있는 패러미터로, dict 포맷을 가지고 있으며, 하이퍼 패러미터등을 이 변수를 통해서 넘겨 받는다.

Estimator 에서 하는 일

Estimator 를 구현할때, Estimator 내의 내용은 모델을 설정하고, 모델의 그래프를 그린 다음에, 모델에 대한 loss 함수를 정의하고, Optimizer를 정의하여 loss 값의 최소값을 찾는다. 그리고 prediction 값을 계산한다.


Estimator의 리턴값

Estimator에서 리턴하는 값은 tf.estimator.EstimatorSpec 객체를 리턴하는데, 이 객체는 다음과 같은 값을 갖는다.

  • mode : Estimator가 수행한 모드. 보통 입력값으로 받은 모드 값이 그대로 리턴된다.

  • prediction (PREDICT 모드에서만 사용됨) : PREDICT 모드에서 예측을 수행하였을 경우, 예측된 값을 dict 형태로 리턴한다.

  • loss (EVAL 또는, TRAIN 모드에서 사용됨) : 학습과 테스트중에 loss 값을 리턴한다.

  • train_op (트레이닝 모드에서만 필요함) : 한 스텝의 학습을 수행하기 위해서 호출하는 함수를 리턴한다. 보통 옵티마이져의  minimize()와 같은 함수가 사용된다.
           optimizer = tf.train.AdamOptimizer(learning_rate=0.001)
           train_op = optimizer.minimize(loss, global_step=global_step)
           return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

  • eval_metrics_ops (optional) : EVAL (테스트) 모드에서 테스트를 위해서 사용된 인자들을 dict 형태로 리턴한다. tf.metrics에는 미리 정의된 일반적인 메트릭들이 정의되어 있는데, 예를 들어 accuracy 등이 이에 해당한다. 아래는 tf.metrics.accuracy를 이용하여 예측값 (predictions)과 라벨(labels)의 값을 계산하여, 메트릭으로 리턴하는 방법이다.

    eval_metric_ops = {
    "accuracy": tf.metrics.accuracy(labels, predictions) }

    만약 rmse를 evaluation metric으로 사용하고자 하면 다음과 같이 정의한다.
    eval_metric_ops = {
       "rmse": tf.metrics.root_mean_squared_error(
           tf.cast(labels, tf.float64), predictions)
    }

    만약에 별도의 메트릭을 정의하지 않으면, 디폴트로 loss 값만 EVAL 단계에서 계산되게 된다.

데이타 입력 처리

모델로의 데이타 입력은 Esitmator의 모델 함수로 입력되는 features 변수를 통해서 입력 된다.

features는 컬럼명으로된 키와, 컬럼 값으로 이루어진 dict 형태의 데이타 형으로, 뉴럴 네트워크 모델에 데이타를 입력하기 위해서는 이중에서 학습에 사용할 컬럼만을 추출하여, 입력 레이어에 넣어 줘야 한다.

이 features 에서 특정 컬럼만을 지정하여 추출한 후에, 그 컬럼의 값을 넣어주는 것은 tf.feature_column.input_layer 함수를 사용하면 된다.


예제를 보자

input_layer = tf.feature_column.input_layer(
 features=features, feature_columns=[age, height, weight])


위의 예제는 features 에서 age,height,weight 컬럼을 추출하여 input layer로 넣는 코드이다.

네트워크 정의

데이타를 읽었으면 이제 뉴럴네트워크를 구성해야 한다. 네트워크의 레이어는 tf.layers 로 간단하게 구현할 수 있다. tf.layer에는 풀링,드롭아웃,일반적인 뉴럴네트워크의 히든 레이어, 컨볼루셔널 네트워크들이 함수로 구현되어 있기 때문에 각 레이어를 하나의 함수로 간단하게 정의가 가능하다.


아래는 히든레이어를 구현하는 tf.layers.dense 함수이다.


tf.layers.dense( inputs, units, activation)


  • inputs는 앞의 레이어를 정의하고

  • units는 이 레이어에 크기를 정의하고

  • 마지막으로 activation은 sigmoid나,ReLu와 같은 Activation 함수를 정의한다.


다음 예제는 5개의 히든 레이어를 가지는 오토 인코더 네트워크를 정의한 예이다.

 input_layer = features['inputs'] # 784 pixels
   dense1 = tf.layers.dense(inputs=input_layer, units=256, activation=tf.nn.relu)
   dense2 = tf.layers.dense(inputs=dense1, units=128, activation=tf.nn.relu)
   dense3 = tf.layers.dense(inputs=dense2, units=16, activation=tf.nn.relu)
   dense4 = tf.layers.dense(inputs=dense3, units=128, activation=tf.nn.relu)
   dense5 = tf.layers.dense(inputs=dense4, units=256, activation=tf.nn.relu)
   output_layer = tf.layers.dense(inputs=dense5, units=784, activation=tf.nn.sigmoid)


5개의 히든 레이어는 각각 256,128,16,128,256 개의 노드를 가지고 있고, 각각 ReLu를 Activation 함수로 사용하였다.

그리고 마지막 output layer는 784개의 노드를 가지고 sigmoid 함수를 activation 함수로 사용하였다.

Loss 함수 정의

다음 모델에 대한 비용함수(loss/cost function)을 정의한다. 이 글을 읽을 수준이면 비용함수에 대해서 별도로 설명하지 않아도 되리라고 보는데, 비용함수는 예측값과 원래 라벨에 대한 차이의 합을 나타내는 것이 비용함수이다.


 # Connect the output layer to second hidden layer (no activation fn)

 output_layer = tf.layers.dense(second_hidden_layer, 1)
 # Reshape output layer to 1-dim Tensor to return predictions
 predictions = tf.reshape(output_layer, [-1])
 predictions_dict = {"ages": predictions}

 # Calculate loss using mean squared erro
 loss = tf.losses.mean_squared_error(labels, predictions)

코드를 보면, 최종 예측된 값은 predictions에 저장되고, 학습 데이타로 부터 받은 라벨 값은 labels에 저장된다. 이 차이를 계산할때, MSE (mean square error)를 사용하였다.

Training Op 정의

비용 함수가 적용되었으면, 이 비용함수의 값을 최적화 하는 것이 학습이기 때문에, 옵티마이저를 정의하고, 옵티마이저를 이용하여 비용함수의 최적화가 되도록 한다.

아래 코드는  Optimizer를 GradientDescentOptimizer로 정의하고, 이 옵티마이저를 이용하여 이용하여 loss 값을 최소화 하도록 하였다.

optimizer = tf.train.GradientDescentOptimizer(
   learning_rate=params["learning_rate"])

train_op = optimizer.minimize(
   loss=loss, global_step=tf.train.get_global_step())

전체 코드

그러면 위의 내용을 모두 합쳐서 model_fn으로 모아서 해보자.

def model_fn(features, labels, mode, params):
 """Model function for Estimator."""
 # Connect the first hidden layer to input layer
 # (features["x"]) with relu activation
 first_hidden_layer = tf.layers.dense(features["x"], 10, activation=tf.nn.relu)

 # Connect the second hidden layer to first hidden layer with relu
 second_hidden_layer = tf.layers.dense(
     first_hidden_layer, 10, activation=tf.nn.relu)

 # Connect the output layer to second hidden layer (no activation fn)
 output_layer = tf.layers.dense(second_hidden_layer, 1)


 # Reshape output layer to 1-dim Tensor to return predictions
 predictions = tf.reshape(output_layer, [-1])

 # Provide an estimator spec for `ModeKeys.PREDICT`.
 if mode == tf.estimator.ModeKeys.PREDICT:
   return tf.estimator.EstimatorSpec(
       mode=mode,
       predictions={"ages": predictions})

 # Calculate loss using mean squared error
 loss = tf.losses.mean_squared_error(labels, predictions)

 # Calculate root mean squared error as additional eval metric
 eval_metric_ops = {
     "rmse": tf.metrics.root_mean_squared_error(
         tf.cast(labels, tf.float64), predictions)
 }

 optimizer = tf.train.GradientDescentOptimizer(
  learning_rate=params["learning_rate"])

 train_op = optimizer.minimize(
     loss=loss, global_step=tf.train.get_global_step())

 # Provide an estimator spec for `ModeKeys.EVAL` and `ModeKeys.TRAIN` modes.

 return tf.estimator.EstimatorSpec(
     mode=mode,
     loss=loss,
     train_op=train_op,
     eval_metric_ops=eval_metric_ops)

데이타 입력

 first_hidden_layer = tf.layers.dense(features["x"], 10, activation=tf.nn.relu)

네트워크 정의

 # Connect the second hidden layer to first hidden layer with relu
 second_hidden_layer = tf.layers.dense(
     first_hidden_layer, 10, activation=tf.nn.relu)

 # Connect the output layer to second hidden layer (no activation fn)
 output_layer = tf.layers.dense(second_hidden_layer, 1)

first_hidden_layer의 입력값을 가지고 네트워크를 구성한다. 두번째 레이어는 first_hidden_layer를 입력값으로 하여, 10개의 노드를 가지고, ReLu를 activation 레이어로 가지도록 하였다.  

마지막 계층은 두번째 계층에서 나온 결과를 하나의 노드를 이용하여 합쳐서 activation 함수 없이 결과를 냈다.

 # Reshape output layer to 1-dim Tensor to return predictions
 predictions = tf.reshape(output_layer, [-1])

 # Provide an estimator spec for `ModeKeys.PREDICT`.
 if mode == tf.estimator.ModeKeys.PREDICT:
   return tf.estimator.EstimatorSpec(
       mode=mode,
       predictions={"ages": predictions})

예측 모드에서는 prediction 값을 리턴해야 하기 때문에, 먼저 예측값을 output_layer에서 나온 값으로, 행렬 차원을 변경하여 저장하고, 만약에 예측 모드 tf.estimator.ModeKeys.PREDICT일 경우 EstimatorSpec에 predction 값을 넣어서 리턴한다. 이때 dict 형태로 prediction 결과 이름을 age로 값을 predictions 값으로 채워서 리턴한다.

Loss 함수 정의

다음 비용 함수를 정의하고, 테스트 단계(EVAL)에서 사용할 evaluation metrics에 rmse를 테스트 기준으로 메트릭으로 정의한다.

 # Calculate loss using mean squared error
 loss = tf.losses.mean_squared_error(labels, predictions)

 # Calculate root mean squared error as additional eval metric
 eval_metric_ops = {
     "rmse": tf.metrics.root_mean_squared_error(
         tf.cast(labels, tf.float64), predictions)
 }

Training OP 정의

비용 함수를 정했으면, 비용 함수를 최적화 하기 위한 옵티마이져를 정의한다. 아래와 같이 GradientDescentOptimzer를 이용하여 loss 함수를 최적화 하도록 하였다.

 optimizer = tf.train.GradientDescentOptimizer(
  learning_rate=params["learning_rate"])

 train_op = optimizer.minimize(
     loss=loss, global_step=tf.train.get_global_step())

 # Provide an estimator spec for `ModeKeys.EVAL` and `ModeKeys.TRAIN` modes.

마지막으로, PREDICTION이 아니고, TRAIN,EVAL인 경우에는 EstimatorSpec을 다음과 같이 리턴한다.

Loss 함수와, Training Op를 정의하고 평가용 매트릭스를 정의하여 리턴한다.

 return tf.estimator.EstimatorSpec(
     mode=mode,
     loss=loss,
     train_op=train_op,
     eval_metric_ops=eval_metric_ops)

실행

그러면 완성된 Estimator를 사용해보자

train_input_fn = tf.estimator.inputs.numpy_input_fn(
   x={"x": np.array(training_set.data)},
   y=np.array(training_set.target),
   num_epochs=None,
   shuffle=True)

# Train

nn.train(input_fn=train_input_fn, steps=5000)

# Score accuracy

test_input_fn = tf.estimator.inputs.numpy_input_fn(
   x={"x": np.array(test_set.data)},
   y=np.array(test_set.target),
   num_epochs=1,
   shuffle=False)

ev = nn.evaluate(input_fn=test_input_fn)
print("Loss: %s" % ev["loss"])
print("Root Mean Squared Error: %s" % ev["rmse"])

각 코드를 보면

train_input_fn = tf.estimator.inputs.numpy_input_fn(
   x={"x": np.array(training_set.data)},
   y=np.array(training_set.target),
   num_epochs=None,
   shuffle=True)

를 이용하여 numpy 의 데이타로 input_fn 함수를 만들었다. training_set.data는 학습 데이타, training_set.target을 학습용 라벨로 설정하고, epoch는 무제한, 그리고 데이타는 셔플 하도록 하였다.

nn.train(input_fn=train_input_fn, steps=5000)

앞서 정의된 모델에 train_input_fn을 넣어서 총 5000 번 학습을 하도록 하였다.

학습이 끝난 모델을 테스트 해야 하는데, 같은 방법으로 test_input_fn을 정의하고

ev = nn.evaluate(input_fn=test_input_fn)

evaluate를 이용하여, 학습된 모델을 평가한다.

평가된 결과를 보기 위해서 loss 값과 rmse 값을 ev[‘loss’], ev[‘rmse’]로 출력하였다.

지금까지 Estimator를 만드는 방법에 대해서 알아보았다. 다음 글에서는 Auto Encoder 네트워크를 Estimator로 구현해보도록 하겠다.





TFRecord


조대협 (http://bcho.tistory.com)


텐서플로우를 접하게 다 보면 필히 만나는 부분이 텐서플로우 학습 데이타 포맷인 TFRecord라는 파일 포맷이다. 마침 얼굴 인식 모델을 이번에는 텐서플로우에서 미리 개발되어 제공되는 물체 인식 API인 Tensorflow Object Detection API를 사용해서 얼굴을 학습시켜보려고 하니 데이타 포맷을 TFRecord 포맷으로 변경해야 한다. 그래서, TFRecord 파일을 만들어보고, 테스트를 위해서 데이타 내용도 직접 읽는 코드를 작성해보았다. (전체 코드는 https://github.com/bwcho75/objectdetection/tree/master/tfrecord 에 다.)

TFRecord 파일 포맷이란

TFRecord 파일은 텐서플로우의 학습 데이타 등을 저장하기 위한 바이너리 데이타 포맷으로, 구글의 Protocol Buffer 포맷으로 데이타를 파일에 Serialize 하여 저장한다.

CSV 파일에서와 같이 숫자나 텍스트 데이타를 읽을때는 크게 지장이 없지만, 이미지를 데이타를 읽을 경우 이미지는 JPEG나 PNG 형태의 파일로 저장되어 있고 이에 대한 메타 데이타와 라벨은 별도의 파일에 저장되어 있기 때문에, 학습 데이타를 읽을때 메타데이타나 라벨용 파일 하나만 읽는 것이 아니라 이미지 파일도 별도로 읽어야 하기 때문에, 코드가 복잡해진다.


또한 이미지를 JPG나 PNG 포맷으로 읽어서 매번 디코딩을 하게 되면, 그 성능이 저하되서 학습단계에서 데이타를 읽는 부분에서 많은 성능 저하가 발생한다.


이와 같이 성능과 개발의 편의성을 이유로 TFRecord 파일 포맷을 이용하는 것이 좋다.


그러면 간단한 예제를 통해서 TFRecord 파일을 쓰고 읽는 방법에 대해서 알아보도록 하자

본 예제는 http://warmspringwinds.github.io/tensorflow/tf-slim/2016/12/21/tfrecords-guide/ 글과 https://github.com/tensorflow/models/blob/master/object_detection/g3doc/using_your_own_dataset.md 글을 참고하였다.


TFRecord 파일 생성

TFRecord 파일 생성은 tf.train.Example에 Feature를 딕셔너리 형태로 정의한 후에, tf.train.Example 객체를 TFRecord 파일 포맷 Writer인 tf.python_io.TFRecordWriter를 통해서 파일로 저장하면 된다.


다음 코드를 보자, 이 코드는 Tensorflow Object Detection API를 자신의 데이타로 학습시키기 위해서 데이타를 TFRecord 형태로 변환하여 저장하는 코드의 내용이다.

이미지를 저장할때 사물의 위치를 사각형 좌표로 지정하고 저장한다.


def create_cat_tf_example(encoded_image_data):


 height = 1032

 width = 1200

 filename = 'example_cat.jpg'

 image_format = 'jpg'


 xmins = [322.0 / 1200.0]

 xmaxs = [1062.0 / 1200.0]

 ymins = [174.0 / 1032.0]

 ymaxs = [761.0 / 1032.0]

 classes_text = ['Cat']

 classes = [1]


 tf_example = tf.train.Example(features=tf.train.Features(feature={

     'image/height': dataset_util.int64_feature(height),

     'image/width': dataset_util.int64_feature(width),

     'image/filename': dataset_util.bytes_feature(filename),

     'image/source_id': dataset_util.bytes_feature(filename),

     'image/encoded': dataset_util.bytes_feature(encoded_image_data),

     'image/format': dataset_util.bytes_feature(image_format),

     'image/object/bbox/xmin': dataset_util.float_list_feature(xmins),

     'image/object/bbox/xmax': dataset_util.float_list_feature(xmaxs),

     'image/object/bbox/ymin': dataset_util.float_list_feature(ymins),

     'image/object/bbox/ymax': dataset_util.float_list_feature(ymaxs),

     'image/object/class/text': dataset_util.bytes_list_feature(classes_text),

     'image/object/class/label': dataset_util.int64_list_feature(classes),

 }))

 return tf_example


저장되는 내용은 이미지의 높이와 너비 (height,weight), 파일명 (filename), 인코딩 포맷 (format), 이미지 바이너리 (encoded), 이미지내에서 물체의 위치를 가르키는 사각형 위치 (xmin,ymin,xmax,ymax) 와 라벨 값등이 저장된다.


코드를 유심이 살표보면 생각보다 이해하기어렵지 않다.

tf.train.Example 객체를 만들고 이때 인자로 features에 TFRecord에 저장될 갚들의 목록을 딕셔너리 형태로 저장한다.

이때 저장되는 데이타의 이름과 값을 지정해야 하는데


'image/height': dataset_util.int64_feature(height),


를 보면 'image/height' 이 데이타의 이름이 되고, dataset_util.int64_feature(height), 가 height 값을 텐서플로우용 리스트형으로 변형하여 이를 학습용 피쳐 타입으로 변환하여 저장한다.

이 예제는 Object Detection API의 일부이기 때문에, dataset_util이라는 모듈을 사용했지만, 실제로 이 함수의 내부를 보면  tf.train.Feature(int64_list=tf.train.Int64List(value=values)) 로 구현이 되어 있다.


다음 이렇게 생성된 tf.train.Example 객체를 tf.python_io.TFRecordWriter 를 이용해서 다음과 같이 파일에 써주면 된다.

   writer = tf.python_io.TFRecordWriter(tfrecord_filename)

   writer.write(tf_example.SerializeToString())


다음은 코드 전체이다.


import tensorflow as tf

from PIL import Image

from object_detection.utils import dataset_util


def create_cat_tf_example(encoded_image_data):


 height = 1032

 width = 1200

 filename = 'example_cat.jpg'

 image_format = 'jpg'


 xmins = [322.0 / 1200.0]

 xmaxs = [1062.0 / 1200.0]

 ymins = [174.0 / 1032.0]

 ymaxs = [761.0 / 1032.0]

 classes_text = ['Cat']

 classes = [1]


 tf_example = tf.train.Example(features=tf.train.Features(feature={

     'image/height': dataset_util.int64_feature(height),

     'image/width': dataset_util.int64_feature(width),

     'image/filename': dataset_util.bytes_feature(filename),

     'image/source_id': dataset_util.bytes_feature(filename),

     'image/encoded': dataset_util.bytes_feature(encoded_image_data),

     'image/format': dataset_util.bytes_feature(image_format),

     'image/object/bbox/xmin': dataset_util.float_list_feature(xmins),

     'image/object/bbox/xmax': dataset_util.float_list_feature(xmaxs),

     'image/object/bbox/ymin': dataset_util.float_list_feature(ymins),

     'image/object/bbox/ymax': dataset_util.float_list_feature(ymaxs),

     'image/object/class/text': dataset_util.bytes_list_feature(classes_text),

     'image/object/class/label': dataset_util.int64_list_feature(classes),

 }))

 return tf_example


def read_imagebytes(imagefile):

   file = open(imagefile,'rb')

   bytes = file.read()


   return bytes

   

def main():

   print ('Converting example_cat.jpg to example_cat.tfrecord')

   tfrecord_filename = 'example_cat.tfrecord'

   bytes = read_imagebytes('example_cat.jpg')

   tf_example = create_cat_tf_example(bytes)


   writer = tf.python_io.TFRecordWriter(tfrecord_filename)

   writer.write(tf_example.SerializeToString())


main()


참고로 이 예제는 앞에서도 언급하였듯이 Object Detection API에 대한 의존성을 가지고 있기 때문에 일반적인 텐서플로우 개발환경에서는 실행이 되지 않는다. Tensorflow Object Detection API 를 인스톨해야 dataset_util 를 사용할 수 있기 때문에 Object Detection API 설치가 필요하다.

만약에 Object Detection API 설치 없이 TFRecord Writer를 짜보고 싶은 경우에는 http://warmspringwinds.github.io/tensorflow/tf-slim/2016/12/21/tfrecords-guide/ 문서에 예제가 간단하게 잘 정리되어 있으니 참고하기 바란다.

TFRecord에서 데이타 읽기

데이타를 읽는 방법도 크게 다르지 않다. 쓰는 순서의 반대라고 보면 되는데, TFReader를 통해서 Serialized 된 데이타를 읽고, 이를 Feature 목록을 넣어서 파싱한 후에, 파싱된 데이타셋에서 각 피쳐를 하나하나 읽으면 된다.


코드를 보자


def readRecord(filename_queue):

   reader = tf.TFRecordReader()

   _,serialized_example = reader.read(filename_queue)

   

   #'''

   keys_to_features = {

       'image/height': tf.FixedLenFeature((), tf.int64, 1),

       'image/width': tf.FixedLenFeature((), tf.int64, 1),

       'image/filename': tf.FixedLenFeature((), tf.string, default_value=''),

       #'image/key/sha256': tf.FixedLenFeature((), tf.string, default_value=''),

       'image/source_id': tf.FixedLenFeature((), tf.string, default_value=''),

       'image/encoded': tf.FixedLenFeature((), tf.string, default_value=''),

: (중략)

   }

   

   features = tf.parse_single_example(serialized_example,features= keys_to_features)

   

   height = tf.cast(features['image/height'],tf.int64)

   width = tf.cast(features['image/width'],tf.int64)

   filename = tf.cast(features['image/filename'],tf.string)

   source_id = tf.cast(features['image/source_id'],tf.string)

   encoded = tf.cast(features['image/encoded'],tf.string)

: (중략)

   return height,width,filename,source_id,encoded,image_format


TFRecoderReader를 이용하여 파일을 읽는데, 파일을 직접읽지 않고 filename_queue를 이용해서 읽는다. 코드 전체를 보면, 이 큐는

filename_queue = tf.train.string_input_producer([tfrecord_filename])

로, 파일 이름 목록을 가지고 리턴하는 string_input_producer를 사용하였다.

파일을 읽으면 데이타는 직렬화 된 상태로 리턴이 되어 serialized_example 에 저장된다.

이를 각 개별 피쳐로 디코딩 하기 위해서 피쳐 목록을 keys_to_features 딕셔너리에 저장한다.

이때, 각 피쳐의 이름과, 타입을 정의한다.

다음은 ‘image/height’ 피쳐를 정의하여 int 64 타입으로 읽어드리는 부분이다.


       'image/height': tf.FixedLenFeature((), tf.int64, 1),


피쳐 목록과, 데이타 타입은 TFRecord 파일을 쓸때 사용한 이름과 데이타 타입을 그대로 사용하면 된다.

피쳐 목록이 정의되었으면


   features = tf.parse_single_example(serialized_example,features= keys_to_features)


를 통해서 피쳐를 파싱해낸다. 파싱된 피쳐는 features에 딕셔너리 형태로 저장된다.

다음 리턴 받은 텐서를 각 변수에 저장한다. 이때 타입 캐스팅을 해서 저장한다. (이미 타입을 맞춰서 데이타를 꺼냈기 때문에, 별도의 캐스팅은 필요없지만 확실히 하기 위해서 캐스팅을 한다.)

다음은 ‘image/height’ 를 피쳐를 배열에서 뽑아서, int64 타입으로 변환하여 height 에 저장하는 부분이다.


   height = tf.cast(features['image/height'],tf.int64)


리턴값은 텐서가 되는데, 이 값을 출력하는 코드를 보자

당연히 텐서이기 때문에 Session 을 시작해야 하는데, 먼저 파일에서 데이타를 읽기 위해서 filename queue를 정의한다.

    filename_queue = tf.train.string_input_producer([tfrecord_filename])

다음 filename_queue를 인자로 넘겨서 height,source_id 등의 값을 *.tfrecord 파일에서 읽어서 텐서로 리턴 받는다.


    height,width,filename,source_id,encoded,image_format = readRecord(filename_queue)


다음 세션을 시작하고, readRecord 에서 리턴된 값을 받아온다.         vheight,vwidth,vfilename,vsource_id,vencoded,vimage_format = sess.run([height,width,filename,source_id,encoded,image_format])


전체 코드는 https://github.com/bwcho75/objectdetection/tree/master/tfrecord/reader.py 를 참고하기 바란다.


마지막으로 받은 값을 화면에 출력한다.



간단하게 tfRecord 파일 포맷으로 학습용 데이타를 쓰는 방법을 알아보았다. 텐서플로우 코드가 간단해 지고 성능에 도움이 되는 만큼 데이타 전처리 단계에서 가급적이면 학습 데이타를 tfrecord 타입으로 바꿔서 학습하는 것을 권장한다. (특히 이미지 데이타!!)


참고 자료


텐서플로우에서 이미지 데이타 처리 성능 향상방법


이미지 인식 모델을 만들다가 파일 포맷 성능 향상 관련해서 좋은 팁을 찾아서 메모


if you are working with >O(1000) JPEG images, keep in mind that it is extremely inefficient to individually ready 1000's of small files. This will slow down your training quite a bit.

A more robust and faster solution to convert a dataset of images to a sharded TFRecord of Example protos. Here is a fully worked script for converting the ImageNet data set to such a format. And here is a set of instructions for running a generic version of this preprocessing script on an arbitrary directory containing JPEG images.

http://stackoverflow.com/questions/37126108/how-to-read-data-into-tensorflow-batches-from-example-queue

1000개 이상의 JPEG나 PNG 이미지를 매번 읽어서 트레이닝을 시킬 경우, 트레이닝 성능이 낮아진다.

그래서 JPEG 포맷을 사용하지 말고 TFRecord 포맷을 사용하고, TFRecord 포맷에서 한 파일에 하나의 데이타를 넣지말고 여러 데이타를 넣는 방법을 사용해야 한다.