블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 

Apache Beam (Dataflow)를 이용하여, 이미지 파일을 tfrecord로 컨버팅 하기


조대협 (http://bcho.tistory.com)



개요

텐서플로우 학습에 있어서 데이타 포맷은 학습의 성능을 결정 짓는 중요한 요인중의 하나이다. 특히 이미지 파일의 경우 이미지 목록과 이미지 파일이 분리되어 있어서 텐서플로우에서 학습시 이미지 목록을 읽으면서, 거기에 있는 이미지 파일을 매번 읽어야 하기 때문에, 코딩이 다소 지저분해지고,IO 성능이 떨어질 수 있다

텐서플로우에서는 이러한 학습 데이타를 쉽게 읽을 수 있도록 tfrecord (http://bcho.tistory.com/1190)라는 파일 포맷을 지원한다.


이 글에서는 이미지 데이타를 읽어서 tfrecord 로 컨버팅하는 방법을 설명하며, 분산 데이타 처리 프레임웍인 오픈소스 Apache Beam을 기준으로 설명하나, tfrecord 변환 부분은 Apache Beam과 의존성이 없이 사용이 가능하기 때문에, 필요한 부분만 참고해도 된다. 이 Apache Beam을 구글의 Apache Beam 런타임 (매니지드 서비스)인 구글 클라우드의 Dataflow를 이용하여, 클러스터를 이용하여 빠르게 데이타를 처리하는 방법에 대해서 알아보도록 한다.


전체 코드는 https://github.com/bwcho75/cifar-10/blob/master/pre-processing/4.%20Convert%20Pickle%20file%20to%20TFRecord%20by%20using%20Apache%20Beam.ipynb 에 있다.


이 코드는 CIFAR-10 이미지 데이타를 Apache Beam 오픈 소스를 이용하여, 텐서플로우 학습용 데이타 포맷인  tfrecord 형태로 변환 해주는 코드이다.


Apache Beam은 데이타 처리를 위한 프레임웍으로, 구글 클라우드 상에서 실행하거나 또는 개인 PC나 Spark 클러스터상 여러 환경에서 실행이 가능하며, 구글 클라우드 상에서 실행할 경우 오토스케일링이나 그래프 최적화 기능등으로 최적화된 성능을 낼 수 있다.


CIFAR-10 데이타 셋은 32x32 PNG 이미지 60,000개로 구성된 데이타 셋으로 해당 코드 실행시 최적화가 되지 않은 상태에서 약 16분 정도의 처리 시간이 소요된다. 이 중 6분 정도는 Apache Beam 코드를 구글 클라우드로 업로드 하는데 소요되는 시간이고 실제 처리시간은 10분정도가 소요된다. 전처리 과정에 Apache Beam을 사용하기 전에 고려해야 할 요소는 다음과 같다.

  • 데이타가 아주 많아서 전처리 시간이 수시간 이상 소요될 경우 Apache Beam + Google Cloud를 고려하여 여러 머신에서 동시에 처리하여 빠른 시간내에 수행되도록 할 수 있다.

  • 데이타가 그다지 많지 않고 싱글 머신에서 멀티 쓰레드로 처리를 원할 경우에는 Apache Beam으로 멀티 쓰레드 기반의 병렬 처리를 하는 방안을 고려할 수 있다. 이 경우 클라우드에 대한 의존성을 줄일 수 있다.

  • 다른 대안으로는 Spark/Hadoop 등의 오픈소스를 사용하여, On Prem에서 여러 머신을 이용하여 전처리 하는 방안을 고려할 수 있다.

여기서는 아주 많은 대량의 이미지 데이타에 대한 처리를 하는 것을 시나리오로 가정하였다.

전처리 파이프라인

Apache Beam을 이용한 데이타 전처리 파이프라인의 구조는 다음과 같다.

이미지 파일 준비

CIFAR-10 데이타셋 원본은 이미지 파일 형태가 아니라 PICKLE이라는 파일 포맷으로 되어 있기 때문에,  실제 개발 환경에서는 원본데이타가 이미지인것으로 가정하기 위해서 https://github.com/bwcho75/cifar-10/tree/master/pre-processing 의 1~2번 코드를 통해서 Pickle 파일을 이미지 파일로 변경하고, *.csv 파일에 {파일명},{레이블} 형태로 인덱스 데이타를 생성하였다.

생성된 이미지 파일과 *.csv 파일은 gsutil 명령어를 이용하여 Google Cloud Storage (aka GCS)에 업로드 하였다. 업로드 명령은 https://github.com/bwcho75/cifar-10/blob/master/pre-processing/2.%20Convert%20CIFAR-10%20Pickle%20files%20to%20image%20file.ipynb 에 설명되어 있다.


전처리 파이프라인의 구조

Apache Beam으로 구현된 파이프라인의 구조는 다음과 같다.


1. TextIO의 ReadFromText로 CSV 파일에서 한 라인 단위로 문자열을 읽는다.

2. parseLine에서 라인을 ,로 구분하여 filename과 label을 추출한다.

3. readImage 에서 filename을 가지고, 이미지 파일을 읽어서, binary array 형태로 변환한다.

4. TFExampleFromImageDoFn에서 이미지 바이너리와 label을 가지고 TFRecord 데이타형인 TFExample 형태로 변환한다.

5. 마지막으로 TFRecordIOWriter를 통해서 TFExample을 *.tfrecord 파일에 쓴다.

코드 주요 부분 설명

환경 설정 부분

이 코드는 구글 클라우드와 로컬 환경 양쪽에서 모두 실행이 가능하도록 구현되었다.

SRC_DIR_DEV는 로컬환경에서 이미지와 CSV 파일이 위치한 위치이고, DES_DIR_DEV는 로컬환경에서 tfrecord 파일이 써지는 위치이다.

구글 클라우드에서 실행할 경우 파일 저장소를  GCS (Google Cloud Storage)를 사용한다. DES_BUCKET은 GCS 버킷 이름이다. 코드 실행전에 반드시 구글 클라우드 콘솔에서 GCS 버킷을 생성하기 바란다.  SRC_DIR_PRD와 DES_DIR_PRD는 GCS 버킷내의 각각 image,csv 파일의 경로와 tfrecord 파일이 써질 경로 이다. 이 경로에 맞춰서 구글 클라우드 콘솔에서 디렉토리를 먼저 생성해 놓기를 바란다.




PROJECT는 구글 클라우드 프로젝트 명이고, 마지막으로 DEV_MODE가 True이면 로컬에서 수행이되고 False이면 구글 클라우드에서 실행하도록 하는 환경 변수이다.

의존성 설정 부분

로컬에서 실행할 경우필요한  파이썬 라이브러리가 이미 설치되어야 있어야 한다.

만약에 구글 클라우드에서 실행할 경우 이 Apache Beam 코드가 사용하는 파이썬 모듈을 명시적으로 정의해놔야 한다. 클라우드에서 실행시에는 Apache Beam 코드만 업로드가 되기 때문에(의존성 라이브러리를 같이 업로드 하는 방법도 있는데, 이는 추후에 설명한다.), 의존성 라이브는 구글 클라우드에서 Dataflow 실행시 자동으로 설치할 수 있도록 할 수 있는데, 이를 위해서는 requirements.txt 파일에 사용하는 파이썬 모듈들을 정의해줘야 한다. 다음은 requirements.txt에 의존성이 있는 파이썬 모듈등을 정의하고 저장하는 부분이다.


Apache Beam 코드

Apache Beam의 코드 부분은 크게 복잡하지 않기 때문에 주요 부분만 설명하도록 한다.

Service account 설정

Apache Beam 코드를 구글 클라우드에서 실행하기 위해서는 코드 실행에 대한 권한을 줘야 한다. 구글 클라우드에서는 사용자가 아니라 애플리케이션에 권한을 부여하는 방법이 있는데, Service account라는 것을 사용한다. Service account는 json 파일로 실행 가능한 권한을 정의하고 있다.

Service account 파일을 생성하는 방법은 http://bcho.tistory.com/1166 를 참고하기 바란다.

Service account 파일이 생성되었으면, 이 파일을 적용해야 하는데 GOOGLE_APPLICATION_CREDENTIALS 환경 변수에 Service account  파일의 경로를 정의해주면 된다. 파이썬 환경에서 환경 변수를 설정하는 방법은 os.envorin[‘환경변수명']에 환경 변수 값을 지정해주면 된다.

Jobname 설정

구글 클라우드에서 Apache Beam 코드를 실행하면, 하나의 실행이 하나의 Job으로 생성되는데, 이 Job을 구별하기 위해서 Job 마다 ID 를 설정할 수 있다. 아래는 Job ID를 ‘cifar-10’+시간 형태로 지정하는 부분이다


환경 설정

Apache Beam 코드를 구글 클라우드에서 실행하기 위해서는 몇가지 환경을 지정해줘야 한다.


  • staging_location은 클라우드 상에서 실행시 Apache Beam 코드등이 저장되는 위치이다. GCS 버킷 아래 /staging이라는 디렉토리로 지정했는데, 실행 전에 반드시 버킷아래 디렉토리를 생성하기 바란다.

  • temp_location은 기타 실행중 필요한 파일이 저장되는 위치이다. 실행 전에 반드시 버킷아래 디렉토리를 생성하기 바란다.

  • zone은 dataflow worker가 실행되는 존으로 여기서는 asia-northeast1-c  (일본 리전의 c 존)으로 지정하였다.


DEV_MODE 에 따른 환경 설정

로컬 환경이나 클라우드 환경에서 실행이냐에 따라서 환경 변수 설정이 다소 달라져야 한다.


디렉토리 경로를 바꿔서 지정해야 하고, 중요한것은 RUNNER인데, 로컬에서 실행하기 위해서는 DirectRunner를 구글 클라우드 DataFlow 서비스를 사용하기 위해서는 DataflowRunner를 사용하면 된다.


readImage 부분

Read Image는 이미지 파일을 읽어서 byte[] 로 리턴하는 부분인데, 로컬 환경이냐, 클라우드 환경이냐에 따라서 동작 방식이 다소 다르다.

클라우드 환경에서는 이미지 파일이 GCS에 저장되어 있기 때문에 파이썬의 일반 파일 open 명령등을 사용할 수 없다.

그래서 클라우드 환경에서 동작할 경우에는 GCS에서 파일을 읽어서 Worker의 로컬 디스크에 복사를 해놓고 이미지를 읽어서 byte[]로 변환한 후에, 해당 파일을 지우는 방식을 사용한다.


아래 코드에서 보면 DEV_MODE가 False 인경우 GCS에서 파일을 읽어서 로컬에 저장하는 코드가 있다.


storageClient는 GCS 클라이언트이고 bucket 을 얻어온후, bucket에서 파일을 get_blob 명령어를 이용하여 경로를 저장하여 blob.download_to_file을 이용하여 로컬 파일에 저장하였다.

실행

코드 작성이 끝났으면 실행을 한다. 실행 상태는 구글 클라우드 콘솔의 Dataflow  메뉴에서 확인이 가능하다.

아래와 같이 실행중인 그리고 실행이 끝난 Job 리스트들이 출력된다.




코드 실행중에, 파이프라인 실행 상황 디테일을 Job 을 선택하면 볼 수 있다.


여기서 주목할만한 점은 우측 그래프인데, 우측 그래프는 Worker의 수를 나타낸다. 초기에 1대로 시작했다가 오토 스케일링에 의해서 9대 까지 증가한것을 볼 수 있다.

처음 실행이었기 때문에 적정한 인스턴스수를 몰랐기 때문에 디폴트로 1로 시작하고 오토스케일링을 하도록 했지만, 어느정도 테스트를 한후에 적정 인스턴수를 알면 오토 스케일링을 기다릴 필요없이 디폴트 인스턴스 수를 알면 처음부터 그 수만큼 인스턴스 수로 시작하도록 하면 실행 시간을 줄일 수 있다.

만약에 파이프라인 실행시 에러가 나면 우측 상단에 LOGS 버튼을 누르면 상세 로그를 볼 수 있다.


아래 그림은 파이프라인 실행이 실패한 예에서 STACK TRACES를 통해서 에러 내용을 확인하는 화면이다.



해당 로그를 클릭하면 Stack Driver (구글의 모니터링 툴)의 Error Reporting 시스템 화면으로 이동하게 된다.

여기서 디테일한 로그를 볼 수 있다.

아래 화면을 보면 ReadImage 단계에서 file_path라는 변수명을 찾을 수 없어서 나는 에러를 확인할 수 있다.


TFRecord 파일 검증

파이프라인 실행이 끝나면, GCS 버킷에 tfrecord 파일이 생성된것을 확인할 수 있다.


해당 파일을 클릭하면 다운로드 받을 수 있다.

노트북 아래 코드 부분이 TFRecord를 읽어서 확인하는 부분이다. 노트북에서 tfrecord 파일의 경로를 다운로드 받은 경로로 변경하고 실행을 하면 파일이 제대로 읽히는 지 확인할 수 있다.


파일 경로 부분은 코드상에서 다음과 같다.



정상적으로 실행이 된 경우, 다음과 같이 tfrecord에서 읽은 이미지와 라벨값이 출력됨을 확인할 수 있다.


라벨 값은 Label 줄에 values 부분에 출력된다. 위의 그림에서는 순서대로 라벨 값이 4와 2가 된다.



오토 인코더를 이용한 신용카드 비정상 거래 검출 

#3 학습 데이타 전처리


조대협 (http://bcho.tistory.com)




앞의 글들 (http://bcho.tistory.com/1198 http://bcho.tistory.com/1197 ) 에서 신용카드 이상 검출을 하기 위한 데이타에 대한 분석과, 오토 인코더에 대한 기본 원리 그리고 오토 인코더에 대한 샘플 코드를 살펴보았다.


이제 실제 모델을 만들기에 앞서 신용카드 거래 데이타를 학습에 적절하도록 전처리를 하도록한다.

데이타양이 그리 크지 않기 때문에, 데이타 전처리는 파이썬 데이타 라이브러리인 pandas dataframe을 사용하였다. 여기서 사용된 전처리 코드는 https://github.com/bwcho75/tensorflowML/blob/master/autoencoder/creditcard_fraud_detection/2.data_normalization.ipynb 에 공개되어 있다.


데이타 전처리 과정

신용카드 거래 데이타를 머신러닝 학습의 검증과 테스트에 적절하도록 다음과 같은 절차를 통하여 데이타를 전처리하여 CSV 파일로 저장하였다.

데이타 정규화

학습 데이타에 여러가지 피쳐를 사용하는데, 예를 들어 피쳐 V1의 범위가 -10000~10000이고, 피쳐 V2의 범위가 10~20 이라면, 각 피쳐의 범위가 차이가 매우 크기 때문에, 경사 하강법등을 이용할때, 학습 시간이 더디거나 또는 제대로 학습이 되지 않을 수 있다. 자세한 내용은 김성훈 교수님의 모두를 위한 딥러닝 강좌중 정규화 부분  https://www.youtube.com/watch?v=1jPjVoDV_uo&feature=youtu.be 을 참고하기 바란다.

그래서 피쳐의 범위를 보정(정규화)하여 학습을 돕는 과정을 데이타 정규화라고 하는데, 정규화에는 여러가지 방법이 있다. 여기서 사용한 방법은 Fearture scaling이라는 방법으로, 모든 피쳐의 값들을 0~1사이로 변환하는 방법이다. 위에서 언급한 V1은 -10000~10000의 범위가 0~1사이로 사상되는 것이고, V2도 10~20의 범위가 0~1사이로 사상된다.

공식은 아래와 같은데



참고 https://en.wikipedia.org/wiki/Normalization_(statistics)


정규화된 값은 = (원본값 - 피쳐의 최소값) / (피쳐의 최대값 - 피쳐의 최소값)


으로 계산한다.

앞의 V1값에서 0의 경우는 (0 - (-10000)) / (10000 - (-10000)) = 0.5 로 사상이 되는것이다.


그러면 신용카드 데이타에서 V1~V28 컬럼을 Feature scaling을 위해서 정규화를 하려면

df_csv = pd.read_csv('./data/creditcard.csv')

CSV에서 원본 데이타를 읽는다.

읽어드린 데이타의 일부를 보면 다음과 같다.


df_csv 는 데이타의 원본값을 나타내고,  df_csv.min() 각 컬럼의 최소값, df_csv.max()는 각 컬럼의 최대값을 나타낸다. 이 값들을 이용하여 위의 Feature Scaling 공식으로 구현하면 아래와 같이 된다


df_norm = (df_csv - df_csv.min() ) / (df_csv.max() - df_csv.min() )


이렇게 정규화된 값을 출력해보면 다음과 같다.




V1 컬럼의 -1.359807이 정규화후에 0.935192 로 변경된것을 확인할 수 있고 다른 필드들도 변경된것을 확인할 수 있다.

데이타 분할

전체 데이타를 정규화 하였으면 데이타를 학습용, 검증용, 테스트용 데이타로 나눠야 하는데, 오토 인코더의 원리는 정상적인 데이타를 학습 시킨후에, 데이타를 넣어서 오토인코더가 학습되어 있는 정상적인 패턴과 얼마나 다른가를 비교하는 것이기 때문에 학습 데이타에는 이상거래를 제외하고 정상적인 거래만으로 학습을 한다.

이를 위해서 먼저 데이타를 정상과 비정상 데이타셋 두가지로 분리한다.

아래 코드는 Class=1이면 비정상, Class=0이면 정상인 데이타로 분리가 되는데, 정상 데이타는 df_norm_nonfraud에 저장하고, 비정상 데이타는 df_norm_fraud에 저장하는 코드이다.

# split normalized data by label
df_norm_fraud=df_norm[ df_norm.Class==1.0] #fraud
df_norm_nonfraud=df_norm[ df_norm.Class==0.0] #non_fraud


정상 데이타를 60:20:20 비율로 학습용, 테스트용, 검증용으로 나누고, 비정상 데이타는 학습에는 사용되지 않고 테스트용 및 검증용에만 사용되기 때문에, 테스트용 및 검증용으로 50:50 비율로 나눈다.


# split non_fraudfor 60%,20%,20% (training,validation,test)
df_norm_nonfraud_train,df_norm_nonfraud_validate,df_norm_nonfraud_test = \
   np.split(df_norm_nonfraud,[int(.6*len(df_norm_nonfraud)),int(.8*len(df_norm_nonfraud))])


numpy의 split 함수를 쓰면 쉽게 데이타를 분할 할 수 있다. [int(.6*len(df_norm_nonfraud)),int(.8*len(df_norm_nonfraud))] 가 데이타를 분할하는 구간을 정의하는데,  데이타 프레임의 60%, 80% 구간을 데이타 분할 구간으로 하면 0~60%, 60~80%, 80~100% 구간 3가지로 나누어서 데이타를 분할하여 리턴한다. 같은 방식으로 아래와 같이 비정상 거래 데이타도 50% 구간을 기준으로 하여 두 덩어리로 데이타를 나눠서 리턴한다.


# split fraud data to 50%,50% (validation and test)
df_norm_fraud_validate,df_norm_fraud_test = \
   np.split(df_norm_fraud,[int(0.5*len(df_norm_fraud))])

데이타 합치기

다음 이렇게 나눠진 데이타를 테스트용 데이타는 정상과 비정상 거래 데이타를 합치고, 검증용 데이타 역시 정상과 비정상 거래를 합쳐서 각각 테스트용, 검증용 데이타셋을 만들어 낸다.

두개의 데이타 프레임을 합치는 것은 아래와 같이 .append() 메서드를 이용하면 된다.


df_train = df_norm_nonfraud_train.sample(frac=1)
df_validate = df_norm_nonfraud_validate.append(df_norm_fraud_validate).sample(frac=1)
df_test = df_norm_nonfraud_test.append(df_norm_fraud_test).sample(frac=1)

셔플링

데이타를 합치게 되면, 테스트용과 검증용 데이타 파일에서 처음에는 정상데이타가 나오다가 뒷부분에 비정상 데이타가 나오는 형태가 되기 때문에 테스트 결과가 올바르지 않을 수 있는 가능성이 있다. 그래서, 순서를 무작위로 섞는 셔플링(Shuffling) 작업을 수행한다.

셔플링은 위의 코드에서 .sample(frac=1)에 의해서 수행되는데, .sample은 해당 데이타 프레임에서 샘플 데이타를 추출하는 명령으로 frac은 샘플링 비율을 정의한다 1이면 100%로, 전체 데이타를 가져오겠다는 이야기 인데, sample()함수는 데이타를 가지고 오면서 순서를 바꾸기 때문에, 셔플링된 결과를 리턴하게 된다.


전체 파이프라인을 정리해서 도식화 해보면 다음과 같다.


다음글에서는 이렇게 정재된 데이타를 가지고 학습할 오토인코더 모델을 구현해보도록 한다.


연예인 얼굴 인식 서비스를 만들어보자 #1 - 데이타 준비하기

 

CNN 에 대한 이론 공부와 텐서 플로우에 대한 기본 이해를 끝내서 실제로 모델을 만들어보기로 하였다.

CNN을 이용한 이미지 인식중 대중적인 주제로 얼굴 인식 (Face recognition)을 주제로 잡아서, 이 모델을 만들기로 하고 아직 실력이 미흡하여 호주팀에서 일하고 있는 동료인 Win woo 라는 동료에게 모델과 튜토리얼 개발을 부탁하였다.

 

이제 부터 연재하는 연예인 얼굴 인식 서비스는 Win woo 가 만든 코드를 기반으로 하여 설명한다. (코드 원본 주소 : https://github.com/wwoo/tf_face )

 

얼굴 데이타를 구할 수 있는곳

먼저 얼굴 인식 모델을 만들려면, 학습을 시킬 충분한 데이타가 있어야 한다. 사람 얼굴을 일일이 구할 수 도 없고, 구글이나 네이버에서 일일이 저장할 수 도 없기 때문에, 공개된 데이타셋을 활용하였는데, PubFig (Public Figures Face Database - http://www.cs.columbia.edu/CAVE/databases/pubfig/) 를 사용하였다.


 

이 데이타셋에는 약 200명에 대한 58,000여장의 이미지를 저장하고 있는데, 이 중의 일부만을 사용하였다.

Download 페이지로 가면, txt 파일 형태 (http://www.cs.columbia.edu/CAVE/databases/pubfig/download/dev_urls.txt) 로 아래와 같이

 

Abhishek Bachan 1 http://1.bp.blogspot.com/_Y7rzCyUABeI/SNIltEyEnjI/AAAAAAAABOg/E1keU_52aFc/s400/ash_abhishek_365x470.jpg 183,60,297,174 f533da9fbd1c770428c8961f3fa48950
Abhishek Bachan 2 http://1.bp.blogspot.com/_v9nTKD7D57Q/SQ3HUQHsp_I/AAAAAAAAQuo/DfPcHPX2t_o/s400/normal_14thbombaytimes013.jpg 49,71,143,165 e36a8b24f0761ec75bdc0489d8fd570b
Abhishek Bachan 3 http://2.bp.blogspot.com/_v9nTKD7D57Q/SL5KwcwQlRI/AAAAAAAANxM/mJPzEHPI1rU/s400/ERTYH.jpg 32,68,142,178 583608783525c2ac419b41e538a6925d

 

사람이름, 이미지 번호, 다운로드 URL, 사진 크기, MD5 체크섬을 이 필드로 저장되어 있다.

이 파일을 이용하여 다운로드 URL에서 사진을 다운받아서, 사람이름으로된 폴더에 저장한다.

물론 수동으로 할 수 없으니 HTTP Client를 이용하여, URL에서 사진을 다운로드 하게 하고, 이를 사람이름 폴더 별로 저장하도록 해야 한다.

 

HTTP Client를 이용하여 파일을 다운로드 받는 코드는 일반적인 코드이기 때문에 별도로 설명하지 않는다.

본인의 경우에는 Win이 만든 https://github.com/wwoo/tf_face/blob/master/tf/face_extract/pubfig_get.py 코드를 이용하여 데이타를 다운로드 받았다.

사용법은  https://github.com/wwoo/tf_face 에 나와 있는데,

 

$> python tf/face_extract/pubfig_get.py tf/face_extract/eval_urls.txt ./data

를 실행하면 ./data 디렉토리에 이미지를 다운로드 받아서 사람 이름별 폴더에 저장해준다.

evals_urls.txt에는 위에서 언급한 dev_urls.txt 형태의 데이타가 들어간다.


사람 종류가 너무 많으면 데이타를 정재하는 작업이 어렵고, (왜 어려운지는 뒤에 나옴) 학습 시간이 많이 걸리기 때문에, 약 47명의 데이타를 다운로드 받아서 작업하였다.

학습 데이타 준비에 있어서 경험

쓰레기 데이타 골라내기

데이타를 다운받고 나니, 아뿔사!! PubFig 데이타셋이 오래되어서 없는 이미지도 있고 학습에 적절하지 않은 이미지도 있다.


주로 학습에 적절하지 않은 데이타는 한 사진에 두사람 이상의 얼굴이 있거나, 이미지가 사라져서 위의 우측 그림처럼, 이미지가 없는 형태로 나오는 경우인데, 이러한 데이타는 어쩔 수 없이 눈으로 한장한장 다 걸러내야만 했는데, 이런 간단한 데이타 필터링 처리는 Google Cloud Vision API를 이용하여, 얼굴이 하나만 있는 사진만을 사용하도록 하여 필터링을 하였다.

학습 데이타의 분포

처음에 학습을 시작할때, 분류별로 데이타의 수를 다르게 하였다. 어렵게 모은 데이타를 버리기가 싫어서 모두 다 넣고 학습 시켰는데, 그랬더니 학습이 쏠리는 현상이 발생하였다.

예를 들어 안젤리나 졸리 300장, 브래드피트 100장, 제시카 알바 100장 이런식으로 학습을 시켰더니, 이미지 예측에서 안젤리나 졸리로 예측하는 경우가 많아졌다. 그래서 학습을 시킬때는 데이타수가 작은 쪽으로 맞춰서 각 클래스당 학습 데이타수가 같도록 하였다. 즉 위의 데이타의 경우에는 안젤리나 졸리 100장, 브래드피트 100장, 제시카 알바 100장식으로 데이타 수를 같게 해야했다.

라벨은 숫자로

라벨의 가독성을 높이기 위해서 라벨을 영문 이름으로 사용했는데, CNN 알고리즘에서 최종 분류를 하는 알고리즘은 softmax 로 그 결과 값을 0,1,2…,N식으로 라벨을 사용하기 때문에, 정수형으로 변환을 해줘야 하는데, 텐서 플로우 코드에서는 이게 그리 쉽지않았다. 그래서 차라리 처음 부터 학습 데이타를 만들때는 라벨을 정수형으로 만드는것이 더 효과적이다

얼굴 각도, 표정,메이크업, 선글라스 도 중요하다

CNN 알고리즘을 마법처럼 생각해서였을까? 데이타만 있다면 어떻게든 학습이 될 줄 알았다. 그러나 얼굴의 각도가 많이 다르거나 표정이 심하게 차이가 난 경우에는 다른 사람으로 인식이 되기 때문에 가능하면 비슷한 표정에 비슷한 각도의 사진으로 학습 시키는 것이 정확도를 높일 수 있다.


 

얼굴 각도의 경우 구글 클라우드 VISION API를 이용하면 각도를 추출할 수 있기 때문에 20도 이상 차이가 나는 사진은 필터링 하였고, 표정 부분도 VISION API를 이용하면 감정도를 분석할 수 있기 때문에 필터링이 가능하다. (아래서 설명하는 코드에서는 감정도 분석 부분은 적용하지 않았다)

또한 선글라스를 쓴 경우에도 다른 사람으로 인식할 수 있기 때문에 VISION API에서 물체 인식 기능을 이용하여 선글라스가 검출된 경우에는 학습 데이타에서 제거하였다.

이외에도 헤어스타일이나 메이크업이 심하게 차이가 나는 경우에는 다른 사람으로 인식되는 확률이 높기 때문에 이런 데이타도 가급적이면 필터링을 하는것이 좋다.

웹 크라울링의 문제점

데이타를 쉽게 수집하려고 웹 크라울러를 이용해서 구글 이미지 검색에서 이미지를 수집해봤지만, 정확도는 매우 낮게 나왔다.


 

https://www.youtube.com/watch?v=k5ioaelzEBM

<그림. 설현 얼굴을 웹 크라울러를 이용하여 수집하는 화면>

 

아래는 웹 크라울러를 이용하여 EXO 루한의 사진을 수집한 결과중 일부이다.


웹크라울러로 수집한 데이타는, 앞에서 언급한 쓰레기 데이타들이 너무 많다. 메이크업, 표정, 얼굴 각도, 두명 이상 있는 사진들이 많았고, 거기에 더해서 그 사람이 아닌 사람의 얼굴 사진까지 같이 수집이 되는 경우가 많았다.

웹 크라울링을 이용한 학습 데이타 수집은 적어도 얼굴 인식용 데이타 수집에 있어서는 좋은 방법은 아닌것 같다. 혹여나 웹크라울러를 사용하더라도 반드시 수동으로 직접 데이타를 검증하는 것이 좋다.

학습 데이타의 양도 중요하지만 질도 매우 중요하다

아이돌 그룹인 EXO와 레드벨벳의 사진을 웹 크라울러를 이용해서 수집한 후에 학습을 시켜보았다. 사람당 약 200장의 데이타로 8개 클래스 정도를 테스트해봤는데 정확도가 10%가 나오지를 않았다.

대신 데이타를 학습에 좋은 데이타를 일일이 눈으로 확인하여 클래스당 30장 정도를 수집해서 학습 시킨 결과 60% 정도의 정확도를 얻을 수 있었다.  양도 중요하지만 학습 데이타의 질적인 면도 중요하다.

중복데이타 처리 문제

데이타를 수집해본 결과, 중복되는 데이타가 생각보다 많았다. 중복 데이타를 걸러내기 위해서 파일의 MD5 해쉬 값을 추출해낸 후 이를 비교해서 중복되는 파일을 제거하였는데, 어느정도 효과를 볼 수 있었지만, 아래 이미지와 같이 같은 이미지지만, 편집이나 리사이즈가 된 이미지의 경우에는 다른 파일로 인식되서 중복 체크에서 검출되지 않았다.


연예인 얼굴 인식은 어렵다

얼굴 인식 예제를 만들면서 재미를 위해서 한국 연예인 얼굴을 수집하여 학습에 사용했는데, 제대로 된 학습 데이타를 구하기가 매우 어려웠다. 앞에서 언급한데로 메이크업이나 표정 변화가 너무 심했고, 어렸을때나 나이먹었을때의 차이등이 심했다. 간단한 공부용으로 사용하기에는 좋은 데이타는 아닌것 같다.

그러면 학습에 좋은 데이타는?

그러면 얼굴 인식 학습에 좋은 데이타는 무엇일까? 테스트를 하면서 내린 자체적인 결론은 정면 프로필 사진류가 제일 좋다. 특히 스튜디오에서 찍은 사진은 같은 조명에 같은 메이크업과 헤어스타일로 찍은 경우가 많기 때문에 학습에 적절하다. 또는 동영상의 경우에는 프레임을 잘라내면 유사한 표정과 유사한 각도, 조명등에 대한 데이타를 많이 얻을 수 있기 때문에 좋은 데이타 된다.

얼굴 추출하기

그러면 앞의 내용을 바탕으로 해서, 적절한 학습용 얼굴 이미지를 추출하는 프로그램을 만들어보자

포토샵으로 일일이 할 수 없기 때문에 얼굴 영역을 인식하는 API를 사용하기로한다. OPEN CV와 같은 오픈소스 라이브러리를 사용할 수 도 있지만 구글의 VISION API의 경우 얼굴 영역을 아주 잘 잘라내어주고,  얼굴의 각도나 표정을 인식해서 필터링 하는 기능까지 코드 수십줄만 가지고도 구현이 가능했기 때문에, VISION API를 사용하였다. https://cloud.google.com/vision/

VISION API ENABLE 하기

VISION API를 사용하기 위해서는 해당 구글 클라우드 프로젝트에서 VISION API를 사용하도록 ENABLE 해줘야 한다.

VISION API를 ENABLE하기 위해서는 아래 화면과 같이 구글 클라우드 콘솔 > API Manager 들어간후


 

+ENABLE API를 클릭하여 아래 그림과 같이 Vision API를 클릭하여 ENABLE 시켜준다.

 



 

SERVICE ACCOUNT 키 만들기

다음으로 이 VISION API를 호출하기 위해서는 API 토큰이 필요한데, SERVICE ACCOUNT 라는 JSON 파일을 다운 받아서 사용한다.

구글 클라우드 콘솔에서 API Manager로 들어간후 Credentials 메뉴에서 Create creadential 메뉴를 선택한후, Service account key 메뉴를 선택한다


 

다음 Create Service Account key를 만들도록 하고, accountname과 id와 같은 정보를 넣는다. 이때 중요한것이 이 키가 가지고 있는 사용자 권한을 설정해야 하는데, 편의상 모든 권한을 가지고 있는  Project Owner 권한으로 키를 생성한다.

 

(주의. 실제 운영환경에서 전체 권한을 가지는 키는 보안상의 위험하기 때문에 특정 서비스에 대한 접근 권한만을 가지도록 지정하여 Service account를 생성하기를 권장한다.)

 


 

Service account key가 생성이 되면, json 파일 형태로 다운로드가 된다.

여기서는 terrycho-ml-80abc460730c.json 이름으로 저장하였다.

 

예제 코드

그럼 예제를 보자 코드의 전문은 https://github.com/bwcho75/facerecognition/blob/master/com/terry/face/extract/crop_face.py 에 있다.

 

이 코드는 이미지 파일이 있는 디렉토리를 지정하고, 아웃풋 디렉토리를 지정해주면 이미지 파일을 읽어서 얼굴이 있는지 없는지를 체크하고 얼굴이 있으면, 얼굴 부분만 잘라낸 후에, 얼굴 사진을 96x96 사이즈로 리사즈 한후에,

70%의 파일들은 학습용으로 사용하기 위해서 {아웃풋 디렉토리/training/} 디렉토리에 저장하고

나머지 30%의 파일들은 검증용으로 사용하기 위해서 {아웃풋 디렉토리/validate/} 디렉토리에 저장한다.

 

그리고 학습용 파일 목록은 다음과 같이 training_file.txt에 파일 위치,사람명(라벨) 형태로 저장하고

/Users/terrycho/traning_datav2/training/wsmith.jpg,Will Smith

/Users/terrycho/traning_datav2/training/wsmith061408.jpg,Will Smith

/Users/terrycho/traning_datav2/training/wsmith1.jpg,Will Smith

 

검증용 파일들은 validate_file.txt에 마찬가지로  파일위치와, 사람명(라벨)을 저장한다.

사용 방법은 다음과 같다.

python com/terry/face/extract/crop_face.py “원본 파일이있는 디렉토리" “아웃풋 디렉토리"

(원본 파일 디렉토리안에는 {사람이름명} 디렉토리 아래에 사진들이 쭈욱 있는 구조라야 한다.)

 

자 그러면, 코드의 주요 부분을 살펴보자

 

VISION API 초기화 하기

  def __init__(self):

       # initialize library

       #credentials = GoogleCredentials.get_application_default()

       scopes = ['https://www.googleapis.com/auth/cloud-platform']

       credentials = ServiceAccountCredentials.from_json_keyfile_name(

                       './terrycho-ml-80abc460730c.json', scopes=scopes)

       self.service = discovery.build('vision', 'v1', credentials=credentials)

 

초기화 부분은 Google Vision API를 사용하기 위해서 OAuth 인증을 하는 부분이다.

scope를 googleapi로 정해주고, 인증 방식을 Service Account를 사용한다. credentials 부분에 service account key 파일인 terrycho-ml-80abc460730c.json를 지정한다.

 

얼굴 영역 찾아내기

다음은 이미지에서 얼굴을 인식하고, 얼굴 영역(사각형) 좌표를 리턴하는 함수를 보자

 

   def detect_face(self,image_file):

       try:

           with io.open(image_file,'rb') as fd:

               image = fd.read()

               batch_request = [{

                       'image':{

                           'content':base64.b64encode(image).decode('utf-8')

                           },

                       'features':[

                           {

                           'type':'FACE_DETECTION',

                           'maxResults':MAX_FACE,

                           },

                           {

                           'type':'LABEL_DETECTION',

                           'maxResults':MAX_LABEL,

                           }

                                   ]

                       }]

               fd.close()

       

           request = self.service.images().annotate(body={

                           'requests':batch_request, })

           response = request.execute()

           if 'faceAnnotations' not in response['responses'][0]:

                print('[Error] %s: Cannot find face ' % image_file)

                return None

               

           face = response['responses'][0]['faceAnnotations']

           label = response['responses'][0]['labelAnnotations']

           

           if len(face) > 1 :

               print('[Error] %s: It has more than 2 faces in a file' % image_file)

               return None

           

           roll_angle = face[0]['rollAngle']

           pan_angle = face[0]['panAngle']

           tilt_angle = face[0]['tiltAngle']

           angle = [roll_angle,pan_angle,tilt_angle]

           

           # check angle

           # if face skew angle is greater than > 20, it will skip the data

           if abs(roll_angle) > MAX_ROLL or abs(pan_angle) > MAX_PAN or abs(tilt_angle) > MAX_TILT:

               print('[Error] %s: face skew angle is big' % image_file)

               return None

           

           # check sunglasses

           for l in label:

               if 'sunglasses' in l['description']:

                 print('[Error] %s: sunglass is detected' % image_file)  

                 return None

           

           box = face[0]['fdBoundingPoly']['vertices']

           left = box[0]['x']

           top = box[1]['y']

               

           right = box[2]['x']

           bottom = box[2]['y']

               

           rect = [left,top,right,bottom]

               

           print("[Info] %s: Find face from in position %s and skew angle %s" % (image_file,rect,angle))

           return rect

       except Exception as e:

           print('[Error] %s: cannot process file : %s' %(image_file,str(e)) )

           

 

 

맨 처음에는 얼굴 영역을 추출하기전에, 같은 파일이 예전에 사용되었는지를 확인한다.

           image = Image.open(fd)  

 

           # extract hash from image to check duplicated image

           m = hashlib.md5()

           with io.BytesIO() as memf:

               image.save(memf, 'PNG')

               data = memf.getvalue()

               m.update(data)

 

           if image_hash in global_image_hash:

               print('[Error] %s: Duplicated image' %(image_file) )

               return None

           global_image_hash.append(image_hash)

 

이미지에서 md5 해쉬를 추출한후에, 이 해쉬를 이용하여 학습 데이타로 사용된 파일들의 해쉬와 비교한다. 만약에 중복되는 것이 없으면 이 해쉬를 리스트에 추가하고 다음 과정을 수행한다.

 

VISION API를 이용하여, 얼굴 영역을 추출하는데, 위의 코드에서 처럼 image_file을 읽은후에, batch_request라는 문자열을 만든다. JSON 형태의 문자열이 되는데, 이때 image라는 항목에 이미지 데이타를 base64 인코딩 방식으로 인코딩해서 전송한다. 그리고 VISION API는 얼굴인식뿐 아니라 사물 인식, 라벨인식등 여러가지 기능이 있기 때문에 그중에서 타입을 ‘FACE_DETECTION’으로 정의하여 얼굴 영역만 인식하도록 한다.

 

request를 만들었으면, VISION API로 요청을 보내면 응답이 오는데, 이중에서 response 엘리먼트의 첫번째 인자 ( [‘responses’][0] )은 첫번째 얼굴은 뜻하는데, 여기서 [‘faceAnnotation’]을 하면 얼굴에 대한 정보만을 얻을 수 있다. 이중에서  [‘fdBoundingPoly’] 값이 얼굴 영역을 나타내는 사각형이다. 이 갑ㄱㅅ을 읽어서 left,top,right,bottom 값에 세팅한 후 리턴한다.

 

그리고 얼굴의 각도 (상하좌우옆)를 추출하여, 얼국 각도가 각각 20도 이상 더 돌아간 경우에는 학습 데이타로 사용하지 않고 필터링을 해냈다.

다음은 각도를 추출하고 필터링을 하는 부분이다.

           roll_angle = face[0]['rollAngle']

           pan_angle = face[0]['panAngle']

           tilt_angle = face[0]['tiltAngle']

           angle = [roll_angle,pan_angle,tilt_angle]

           

           # check angle

           # if face skew angle is greater than > 20, it will skip the data

           if abs(roll_angle) > MAX_ROLL or abs(pan_angle) > MAX_PAN or abs(tilt_angle) > MAX_TILT:

               print('[Error] %s: face skew angle is big' % image_file)

               return None

 

 

VISION API에서 추가로 “FACE DETECTION” 뿐만 아니라 “LABEL_DETECTION” 을 같이 수행했는데 이유는 선글라스를 쓰고 있는 사진을 필터링하기 위해서 사용하였다. 아래는 선글라스 있는 사진을 검출하는  코드이다.

           # check sunglasses

           for l in label:

               if 'sunglasses' in l['description']:

                 print('[Error] %s: sunglass is detected' % image_file)  

                 return None

 

얼굴 잘라내고 리사이즈 하기

앞의 detect_face에서 필터링하고 찾아낸 얼굴 영역을 가지고 그 부분만 전체 사진에서 잘라내고, 잘라낸 얼굴을 학습에 적합하도록 같은 크기 (96x96)으로 리사이즈 한다.

이런 이미지 처리를 위해서 PIL (Python Imaging Library - http://www.pythonware.com/products/pil/)를 사용하였다.

   def crop_face(self,image_file,rect,outputfile):

       try:

           fd = io.open(image_file,'rb')

           image = Image.open(fd)  

           crop = image.crop(rect)

           im = crop.resize(IMAGE_SIZE,Image.ANTIALIAS)

           im.save(outputfile,"JPEG")

           fd.close()

           print('[Info] %s: Crop face %s and write it to file : %s' %(image_file,rect,outputfile) )

       except Exception as e:

           print('[Error] %s: Crop image writing error : %s' %(image_file,str(e)) )

image_file을 인자로 받아서 , rect 에 정의된 사각형 영역 만큼 crop를 해서 잘라내고, resize 함수를 이용하여 크기를 96x96으로 조정한후 (참고 IMAGE_SIZE = 96,96 로 정의되어 있다.) outputfile 경로에 저장하게 된다.        

 

실행을 해서 정재된 데이타는 다음과 같다.



  

생각해볼만한점들

이 코드는 간단한 토이 프로그램이기 때문에 간단하게 작성했지만 실제 운영환경에 적용하기 위해서는 몇가지 고려해야 할 사항이 있다.

먼저, 이 코드는 싱글 쓰레드로 돌기 때문에 속도가 상대적으로 느리다 그래서 멀티 쓰레드로 코드를 수정할 필요가 있으며, 만약에 수백만장의 사진을 정재하기 위해서는 한대의 서버로 되지 않기 때문에, 원본 데이타를 여러 서버로 나눠서 처리할 수 있는 분산 처리 구조가 고려되어야 한다.

또한, VISION API로 사진을 전송할때는 BASE64 인코딩된 구조로 서버에 이미지를 직접 전송하기 때문에, 자칫 이미지 사이즈들이 크면 네트워크 대역폭을 많이 잡아먹을 수 있기 때문에 가능하다면 식별이 가능한 크기에서 리사이즈를 한 후에, 서버로 전송하는 것이 좋다. 실제로 필요한 얼굴 크기는 96x96 픽셀이기 때문에 필요없이 1000만화소 고화질의 사진들을 전송해서 네트워크 비용을 낭비하지 않기를 바란다.

 

다음은 이렇게 정재한 파일들을 텐서플로우에서 읽어서 실제로 학습하는 모델을 만들어보겠다.


위의 코드를 멀티 프로세스&멀티쓰레드로 돌리는 아키텍쳐와 코드는 http://bcho.tistory.com/1177 글을 참고하기 바란다.

 

머신러닝 모델 개발 삽질기

빅데이타/머신러닝 | 2017.04.24 14:27 | Posted by 조대협

머신러닝 모델 개발 삽질 경험기


조대협 (http://bcho.tistory.com)


딥러닝을 공부하고 CNN 모델을 기반으로 무언가를 만들어보겠다는 생각에, 해외 유명 연예인 얼굴 사진을 가져다가 분류하는 얼굴 인식 모델을 만들어 보기로 하였다.

아직도 진행중이지만, 많은 시행 착오를 겪었는데 같은 시행 착오를 겪지 않고 경험을 공유하기 위해서 겪었던 시행 착오들을 정리해 본다.

학습 데이타 확보 및 분류

먼저 학습용 데이타를 수집 하는 것이 가장 문제 였다. 인터넷에서 사진을 모아서 학습 데이타로 사용해도 되겠지만, 아무래도 저작권 및 초상권 문제가 있고, 일일이 사진을 하나씩 받아서 수집하거나 또는 별도의 수집기를 만드는 것도 부담이 되었다.

그래서 찾은 것이 pubfig라는 셀럽 얼굴 데이타인데 http://www.cs.columbia.edu/CAVE/databases/pubfig/

상용 목적이 아니라 연구용 목적이면 사용이 가능하다. 이 데이타는 파일 URL, 셀럽 이름 형태로 라벨링이 되어 있기 때문에, 학습에 적합하리라고 생각하고, 이 파일을 기반으로 데이타를 수집하였다.


여기서 생긴 문제는, 이 데이타가 오래된 데이타라서 존재하지 않는 파일이 다수 있었고, 이 경우 파일을 저장하고 있는 사이트에서, 404 Not found와 같은 이미지를 리턴하였기 때문에, 이를 필터링해야 하였고, 같은 사진이 중복되서 오는 문제등이 있었기 때문에,상당량을 일일이 필터링을 해야 했다.


그리고, 사진상에, 여러 얼굴이 있는 이미지가 많았기 때문에, VISION API로 얼굴을 인식해서 얼굴 사진만 잘라낼 요량이었기 때문에, 독사진만을 일일이 보고 골라내야 했다. 나중에 생각해보니 VISION API로 얼굴이 한명만 인식이 되는 사진만 필터링을 했으면 됐을텐데. 불필요한 작업이 많았다.

라벨을 문자열로 쓴 문제

학습 데이타에 대한 라벨을 생성할때, 괜히 가독성을 높힌다고 라벨을 문자열로 해서 각 사람의 이름을 사용하였다.

CNN에서 마지막은 Softmax는 matrix이기 때문에, 라벨 문자열을 나중에 list.indexOf를 이용하여 배열로 변경할 예정이었는데, 파이썬에서는 쉽게 될지 몰라고, 텐서플로우 코드에서는 이 과정이 쉽지 않았다.

그래서..

결국은 라벨 데이타를 문자열이 아니라, 0~44의 int 값으로 재 생성한후에,


   batch_label_on_hot=tf.one_hot(tf.to_int64(batch_label),

       FLAGS.num_classes, on_value=1.0, off_value=0.0)


tf.one_hot 함수를 이용하여, 1*45 행렬로 바뀌어서 사용하였다.

학습용 및 검증용 데이타를 초기에 분류하지 않았던 문제

학습데이타를 준비할때, 학습 데이타를 학습용과 검증용으로 따로 분류를 해놨어야 하는데, 이 작업을 안해서, 결국 모델을 만들다가 다시 학습 데이타를 7:3 비율로 학습 데이타와 검증용 데이타로 분류하는 작업을 진행하였다.

학습 데이타의 분포가 골고르지 못했던 문제

사진을 모으는 과정에서 필터링 되서 버려지는 데이타가 많았고, 원본 데이타 역시 사람별로 사진 수가 고르지 못했기 때문에, 결과적으로 모여진 학습 데이타의 분포가 사람별로 고르지 못했다.

학습데이타가 많은 셀럽은 200~250장, 적은 사람은 50장으로 편차가 컸다.


이로 인해서 첫번째 모델의 학습이 끝난 후에, 모델을 검증해보니, 학습 데이타를 많이 준 사람으로 대부분 분류를 해냈다. 47개의 클래스 약 6000장의 사진으로 5시간 학습 시킨 결과, 예측을 검증하는 과정에서 90%이상을 모두 브래드피트로 인식해내는 문제가 생겼다. (내 맥북이 브레드피트를 좋아하는가??)


그래서 결과적으로 학습데이타와 검증 데이타를 클래스별로 분포를 같게 하기 위해서, 클래스당 약 50 장의 샘플 사진으로 맞춰서 예측 결과가 편중되는 현상을 해결하려고 하였다.

학습 순서가 클래스별로 된 문제

클래스별 학습 데이타의 양을 균일하게 맞췄음에도 불구하고, 모델의 학습 결과가 특정 클래스들로 편향되는 현상이 발생하였다.

이는 학습을 시킬때, 골고루 학습을 시켜야 하는데, 학습 데이타를 순서대로 학습을 시켰기 때문에 발생한 문제이다. 즉 풀어서 말하자면, “브래드 피트"를 20번 학습 시키고, “안젤리나 졸리"를 20분 학습 시키고, “브루스 윌리스”를 20번 학습 시켜서 모델이 첫 학습데이타 쪽으로 편향되는 현상이 발생한것인데, 이를 해결하려면 학습 데이타를 랜덤으로 만들어서 학습시켜야 한다.

예를 들어 “브래드 피트”,”안젤리나 졸리",”브루스 윌리스",”안젤리나 졸리",”브루스 윌리스", ”안젤리나 졸리",“브래드 피트” …. 이런식으로 말이다.

즉 코드 상에서 배치 데이타를 읽어올때 셔플 처리를 하면되는데 이를 위해서 데이타를 읽는 부분을 다음과 같이 변경 하였다.


def get_input_queue(csv_file_name,num_epochs = None):

   train_images = []

   train_labels = []

   for line in open(csv_file_name,'r'):

       cols = re.split(',|\n',line)

       train_images.append(cols[0])

       # 3rd column is label and needs to be converted to int type

       train_labels.append(int(cols[2]) )

                           

   input_queue = tf.train.slice_input_producer([train_images,train_labels],

                                              num_epochs = num_epochs,shuffle = True)

   

   return input_queue


get_input_queue 함수에, csv_file_name을 인자로 주면, 이 파일을 한줄 단위로 읽어서, 첫번째는 파일명, 세번째는 라벨로 읽은 후에, 각각 train_images와  train_lables에 각각 string과 int 형으로 저장한다

그 다음이 배열을 가지고 tf.train.slice_input_producer를 사용하면 배열에서 데이타를 읽어 드리는 input queue 를 생성하는데, 이때 인자로 shuffle = True로 주면 데이타를 리턴 할때 순차적으로 리턴하지 않고 셔플된 형태로 랜덤하게 리턴한다.


def read_data(input_queue):

   image_file = input_queue[0]

   label = input_queue[1]

   

   image =  tf.image.decode_jpeg(tf.read_file(image_file),channels=FLAGS.image_color)

   

   return image,label,image_file


다음으로, 이 큐를 이용하여 이미지 파일명과, 라벨을 읽어서 이미지 파일 데이타(텐서)와 라벨로 읽는 코드를 read_data라는 함수로 구현하였다. 입력값은 input_queue인데, input queue에서 데이타를 읽으면 첫번째는 이미지 파일명, 두번째는 라벨이 되는데, 첫번째 파일명을 tf.image.decode_jpeg함수를 이용하여 텐서로 읽은후, 읽은 이미지 데이타와 라벨을 리턴하였다.


def read_data_batch(csv_file_name,batch_size=FLAGS.batch_size):

   input_queue = get_input_queue(csv_file_name)

   image,label,file_name= read_data(input_queue)

   image = tf.reshape(image,[FLAGS.image_size,FLAGS.image_size,FLAGS.image_color])

   

   batch_image,batch_label,batch_file = tf.train.batch([image,label,file_name],batch_size=batch_size)

                                                      #,enqueue_many=True)

   batch_file = tf.reshape(batch_file,[batch_size,1])


   batch_label_on_hot=tf.one_hot(tf.to_int64(batch_label),

       FLAGS.num_classes, on_value=1.0, off_value=0.0)

   return batch_image,batch_label_on_hot,batch_file


마지막으로, 배치로 데이타를 읽는 함수 부분에서 앞에 정의한 get_input_queue와 read_data 함수를 이용하여 데이타를 shuffle 된 상태로 읽은 후에, tf.train.batch를 이용하여 일정한 개수 (배치) 형태로 리턴하도록 하였다.


그 결과 예측 결과가 한쪽으로 편향되는 현상을 없앨 수 는 있었다.

샘플 데이타의 부족

데이타 편향 현상은 잡았지만, 클래스의 수(45)에 대비하여, 샘플데이타의 수(클래스당 50개)로 부족하여, 학습을 계속 진행해도 cross entropy 함수는 4~7 사이에서 왔다갔다 하면서 더 이상 0으로 수렴하지 않았고 정확도되 0~35% 사이를 왔다갔다 하면서 수렴을 하지 않았다.


그래서, 학습 이미지의 색이나, 방향등을 변경하는 방법으로 데이타를 뻥튀기 하려고 하는데, 이 부분은 아직 작업중.

그외에 자잘한 삽질

모 그외에도 엄청 여러가지 삽질을 하고 있다. 그래도 모델 하나 제대로 만들어봐야 겠다는 생각에 끝까지 우격다짐으로 진행하고 있지만, 학습을 돌다가 스크린 세이버나, 절전 모드로 들어가서 학습이 중단된 사례. 모델을 개발하다가 중간에 텐서 플로우 버전이 올라가서 코드를 수정한 일. 맥에서 개발하다가 윈도우 머신에 GPU로 바꿨더니, 파이썬 2.7이 아니라 파이썬 3.5만 지원을 해서, 2.7 코드를 모두 다시 고친일등.


머신러닝이 과학이나 수학보다 노가다라는데, 몸소 느끼는 중.


연예인 얼굴 인식 서비스를 만들어보자 #1 - 학습데이타 준비하기


조대협 (http://bcho.tistory.com)


CNN 에 대한 이론 공부와 텐서 플로우에 대한 기본 이해를 끝내서 실제로 모델을 만들어보기로 하였다.

CNN을 이용한 이미지 인식중 대중적인 주제로 얼굴 인식 (Face recognition)을 주제로 잡아서, 이 모델을 만들기로 하고 아직 실력이 미흡하여 호주팀에서 일하고 있는 동료인 Win woo 라는 동료에게 모델과 튜토리얼 개발을 부탁하였다.


이제 부터 연재하는 연예인 얼굴 인식 서비스는 Win woo 가 만든 코드를 기반으로 하여 설명한다. (코드 원본 주소 : https://github.com/wwoo/tf_face )

얼굴 데이타를 내려 받자

먼저 얼굴 인식 모델을 만들려면, 학습을 시킬 충분한 데이타가 있어야 한다. 사람 얼굴을 일일이 구할 수 도 없고, 구글이나 네이버에서 일일이 저장할 수 도 없기 때문에, 공개된 데이타셋을 활용하였는데, PubFig (Public Figures Face Database - http://www.cs.columbia.edu/CAVE/databases/pubfig/) 를 사용하였다.



이 데이타셋에는 약 200명에 대한 58,000여장의 이미지를 저장하고 있는데, 이 중의 일부만을 사용하였다.

Download 페이지로 가면, txt 파일 형태 (http://www.cs.columbia.edu/CAVE/databases/pubfig/download/dev_urls.txt) 로 아래와 같이


Abhishek Bachan 1 http://1.bp.blogspot.com/_Y7rzCyUABeI/SNIltEyEnjI/AAAAAAAABOg/E1keU_52aFc/s400/ash_abhishek_365x470.jpg 183,60,297,174 f533da9fbd1c770428c8961f3fa48950
Abhishek Bachan 2 http://1.bp.blogspot.com/_v9nTKD7D57Q/SQ3HUQHsp_I/AAAAAAAAQuo/DfPcHPX2t_o/s400/normal_14thbombaytimes013.jpg 49,71,143,165 e36a8b24f0761ec75bdc0489d8fd570b
Abhishek Bachan 3 http://2.bp.blogspot.com/_v9nTKD7D57Q/SL5KwcwQlRI/AAAAAAAANxM/mJPzEHPI1rU/s400/ERTYH.jpg 32,68,142,178 583608783525c2ac419b41e538a6925d


사람이름, 이미지 번호, 다운로드 URL, 사진 크기, MD5 체크섬을 이 필드로 저장되어 있다.

이 파일을 이용하여 다운로드 URL에서 사진을 다운받아서, 사람이름으로된 폴더에 저장한다.

물론 수동으로 할 수 없으니 HTTP Client를 이용하여, URL에서 사진을 다운로드 하게 하고, 이를 사람이름 폴더 별로 저장하도록 해야 한다.


HTTP Client를 이용하여 파일을 다운로드 받는 코드는 일반적인 코드이기 때문에 별도로 설명하지 않는다.

본인의 경우에는 Win이 만든 https://github.com/wwoo/tf_face/blob/master/tf/face_extract/pubfig_get.py 코드를 이용하여 데이타를 다운로드 받았다.

사용법은  https://github.com/wwoo/tf_face 에 나와 있는데,


$> python tf/face_extract/pubfig_get.py tf/face_extract/eval_urls.txt ./data

를 실행하면 ./data 디렉토리에 이미지를 다운로드 받아서 사람 이름별 폴더에 저장해준다.

evals_urls.txt에는 위에서 언급한 dev_urls.txt 형태의 데이타가 들어간다.


사람 종류가 너무 많으면 데이타를 정재하는 작업이 어렵고, (왜 어려운지는 뒤에 나옴) 학습 시간이 많이 걸리기 때문에, 약 47명의 데이타를 다운로드 받아서 작업하였다.

쓰레기 데이타 골라내기

데이타를 다운받고 나니, 아뿔사!! PubFig 데이타셋이 오래되어서 없는 이미지도 있고 학습에 적절하지 않은 이미지도 있다.


주로 학습에 적절하지 않은 데이타는 한 사진에 두사람 이상의 얼굴이 있거나, 이미지가 사라져서 위의 우측 그림처럼, 이미지가 없는 형태로 나오는 경우인데, 이러한 데이타는 어쩔 수 없이 눈으로 한장한장 다 걸러내야만 하였다.

아마 이 작업이 가장 오랜 시간이 걸린 작업이 아닐까도 한다. 더불어서 머신러닝이 정교한 수학이나 알고리즘이 아니라 노가다라고 불리는 이유를 알았다.

얼굴 추출하기

다음 학습에 가능한 데이타를 잘 골라내었으면, 학습을 위해서 사진에서 얼굴만을 추출해내야 한다. 포토샵으로 일일이 할 수 없기 때문에 얼굴 영역을 인식하는 API를 사용하기로한다. OPEN CV와 같은 오픈소스 라이브러리를 사용할 수 도 있지만 구글의 VISION API의 경우 얼굴 영역을 아주 잘 잘라내어주고, 코드 수십줄만 가지고도 얼굴 영역을 알아낼 수 있기 때문에 구글 VISION API를 사용하였다.

https://cloud.google.com/vision/




VISION API ENABLE 하기

VISION API를 사용하기 위해서는 해당 구글 클라우드 프로젝트에서 VISION API를 사용하도록 ENABLE 해줘야 한다.

VISION API를 ENABLE하기 위해서는 아래 화면과 같이 구글 클라우드 콘솔 > API Manager 들어간후




+ENABLE API를 클릭하여 아래 그림과 같이 Vision API를 클릭하여 ENABLE 시켜준다.




SERVICE ACCOUNT 키 만들기

다음으로 이 VISION API를 호출하기 위해서는 API 토큰이 필요한데, SERVICE ACCOUNT 라는 JSON 파일을 다운 받아서 사용한다.

구글 클라우드 콘솔에서 API Manager로 들어간후 Credentials 메뉴에서 Create creadential 메뉴를 선택한후, Service account key 메뉴를 선택한다



다음 Create Service Account key를 만들도록 하고, accountname과 id와 같은 정보를 넣는다. 이때 중요한것이 이 키가 가지고 있는 사용자 권한을 설정해야 하는데, 편의상 모든 권한을 가지고 있는  Project Owner 권한으로 키를 생성한다.


(주의. 실제 운영환경에서 전체 권한을 가지는 키는 보안상의 위험하기 때문에 특정 서비스에 대한 접근 권한만을 가지도록 지정하여 Service account를 생성하기를 권장한다.)




Service account key가 생성이 되면, json 파일 형태로 다운로드가 된다.

여기서는 terrycho-ml-80abc460730c.json 이름으로 저장하였다.


예제 코드

그럼 예제를 보자 코드의 전문은 https://github.com/bwcho75/facerecognition/blob/master/com/terry/face/extract/crop_face.py 에 있다.


이 코드는 이미지 파일이 있는 디렉토리를 지정하고, 아웃풋 디렉토리를 지정해주면 이미지 파일을 읽어서 얼굴이 있는지 없는지를 체크하고 얼굴이 있으면, 얼굴 부분만 잘라낸 후에, 얼굴 사진을 96x96 사이즈로 리사즈 한후에,

70%의 파일들은 학습용으로 사용하기 위해서 {아웃풋 디렉토리/training/} 디렉토리에 저장하고

나머지 30%의 파일들은 검증용으로 사용하기 위해서 {아웃풋 디렉토리/validate/} 디렉토리에 저장한다.


그리고 학습용 파일 목록은 다음과 같이 training_file.txt에 파일 위치,사람명(라벨) 형태로 저장하고

/Users/terrycho/traning_datav2/training/wsmith.jpg,Will Smith

/Users/terrycho/traning_datav2/training/wsmith061408.jpg,Will Smith

/Users/terrycho/traning_datav2/training/wsmith1.jpg,Will Smith


검증용 파일들은 validate_file.txt에 마찬가지로  파일위치와, 사람명(라벨)을 저장한다.

사용 방법은 다음과 같다.

python com/terry/face/extract/crop_face.py “원본 파일이있는 디렉토리" “아웃풋 디렉토리"

(원본 파일 디렉토리안에는 {사람이름명} 디렉토리 아래에 사진들이 쭈욱 있는 구조라야 한다.)


자 그러면, 코드의 주요 부분을 살펴보자


VISION API 초기화 하기

  def __init__(self):

       # initialize library

       #credentials = GoogleCredentials.get_application_default()

       scopes = ['https://www.googleapis.com/auth/cloud-platform']

       credentials = ServiceAccountCredentials.from_json_keyfile_name(

                       './terrycho-ml-80abc460730c.json', scopes=scopes)

       self.service = discovery.build('vision', 'v1', credentials=credentials)


초기화 부분은 Google Vision API를 사용하기 위해서 OAuth 인증을 하는 부분이다.

scope를 googleapi로 정해주고, 인증 방식을 Service Account를 사용한다. credentials 부분에 service account key 파일인 terrycho-ml-80abc460730c.json를 지정한다.


얼굴 영역 찾아내기

다음은 이미지에서 얼굴을 인식하고, 얼굴 영역(사각형) 좌표를 리턴하는 함수를 보자


   def detect_face(self,image_file):

       try:

           with io.open(image_file,'rb') as fd:

               image = fd.read()

               batch_request = [{

                       'image':{

                           'content':base64.b64encode(image).decode('utf-8')

                           },

                       'features':[{

                           'type':'FACE_DETECTION',

                           'maxResults':MAX_RESULTS,

                           }]

                       }]

               fd.close()

       

           request = self.service.images().annotate(body={

                           'requests':batch_request, })

           response = request.execute()

           if 'faceAnnotations' not in response['responses'][0]:

                print('[Error] %s: Cannot find face ' % image_file)

                return None

               

           face = response['responses'][0]['faceAnnotations']

           box = face[0]['fdBoundingPoly']['vertices']

           left = box[0]['x']

           top = box[1]['y']

               

           right = box[2]['x']

           bottom = box[2]['y']

               

           rect = [left,top,right,bottom]

               

           print("[Info] %s: Find face from in position %s" % (image_file,rect))

           return rect

       except Exception as e:

           print('[Error] %s: cannot process file : %s' %(image_file,str(e)) )

 

VISION API를 이용하여, 얼굴 영역을 추출하는데, 위의 코드에서 처럼 image_file을 읽은후에, batch_request라는 문자열을 만든다. JSON 형태의 문자열이 되는데, 이때 image라는 항목에 이미지 데이타를 base64 인코딩 방식으로 인코딩해서 전송한다. 그리고 VISION API는 얼굴인식뿐 아니라 사물 인식, 라벨인식등 여러가지 기능이 있기 때문에 그중에서 타입을 ‘FACE_DETECTION’으로 정의하여 얼굴 영역만 인식하도록 한다.


request를 만들었으면, VISION API로 요청을 보내면 응답이 오는데, 이중에서 response 엘리먼트의 첫번째 인자 ( [‘responses’][0] )은 첫번째 얼굴은 뜻하는데, 여기서 [‘faceAnnotation’]을 하면 얼굴에 대한 정보만을 얻을 수 있다. 이중에서  [‘fdBoundingPoly’] 값이 얼굴 영역을 나타내는 사각형이다. 이 갑ㄱㅅ을 읽어서 left,top,right,bottom 값에 세팅한 후 리턴한다.


얼굴 잘라내고 리사이즈 하기

앞의 detect_face에서 찾아낸 얼굴 영역을 가지고 그 부분만 전체 사진에서 잘라내고, 잘라낸 얼굴을 학습에 적합하도록 같은 크기 (96x96)으로 리사이즈 한다.

이런 이미지 처리를 위해서 PIL (Python Imaging Library - http://www.pythonware.com/products/pil/)를 사용하였다.

   def crop_face(self,image_file,rect,outputfile):

       try:

           fd = io.open(image_file,'rb')

           image = Image.open(fd)  

           crop = image.crop(rect)

           im = crop.resize(IMAGE_SIZE,Image.ANTIALIAS)

           im.save(outputfile,"JPEG")

           fd.close()

           print('[Info] %s: Crop face %s and write it to file : %s' %(image_file,rect,outputfile) )

       except Exception as e:

           print('[Error] %s: Crop image writing error : %s' %(image_file,str(e)) )

image_file을 인자로 받아서 , rect 에 정의된 사각형 영역 만큼 crop를 해서 잘라내고, resize 함수를 이용하여 크기를 96x96으로 조정한후 (참고 IMAGE_SIZE = 96,96 로 정의되어 있다.) outputfile 경로에 저장하게 된다.        


실행을 해서 정재된 데이타는 다음과 같다.


생각해볼만한점들

이 코드는 간단한 토이 프로그램이기 때문에 간단하게 작성했지만 실제 운영환경에 적용하기 위해서는 몇가지 고려해야 할 사항이 있다.

먼저, 이 코드는 싱글 쓰레드로 돌기 때문에 속도가 상대적으로 느리다 그래서 멀티 쓰레드로 코드를 수정할 필요가 있으며, 만약에 수백만장의 사진을 정재하기 위해서는 한대의 서버로 되지 않기 때문에, 원본 데이타를 여러 서버로 나눠서 처리할 수 있는 분산 처리 구조가 고려되어야 한다.

또한, VISION API로 사진을 전송할때는 BASE64 인코딩된 구조로 서버에 이미지를 직접 전송하기 때문에, 자칫 이미지 사이즈들이 크면 네트워크 대역폭을 많이 잡아먹을 수 있기 때문에 가능하다면 식별이 가능한 크기에서 리사이즈를 한 후에, 서버로 전송하는 것이 좋다. 실제로 필요한 얼굴 크기는 96x96 픽셀이기 때문에 필요없이 1000만화소 고화질의 사진들을 전송해서 네트워크 비용을 낭비하지 않기를 바란다.


다음은 이렇게 정재한 파일들을 텐서플로우에서 읽어서 학습 데이타로 활용하는 방법에 대해서 알아보겠다.


텐서플로우 - 파일에서 학습데이타를 읽어보자#2


CSV 파일을 읽어보자

조대협 (http://bcho.tistory.com)


이 글은 http://bcho.tistory.com/1163 의 두번째 글이다. 앞의 글을 먼저 읽고 읽기를 권장한다.

앞의 글에서는 트레이닝 파일명의 목록을 읽어서 큐에 넣고, 파일명을 하나씩 읽어오는 처리 방법에 대해서 알아보았다. 이번 글에서는 그 파일들에 있는 데이타를 읽어서 파싱한 후, 실제 트레이닝 세션에 학습용 데이타로 불러들이는 방법을 설명하도록 한다.

파일에서 데이타 읽기 (Reader)

finename_queue에 파일명이 저장되었으면, 이 파일들을 하나씩 읽어서 처리하는 방법을 알아본다.

파일에서 데이타를 읽어오는 컴포넌트를 Reader라고 한다. 이 Reader들은 filename_queue에 저장된 파일들을 하나씩 읽어서, 그 안에 있는 데이타를 읽어서 리턴한다.


예를 들어 TextLineReader의 경우에는 , 텍스트 파일에서, 한줄씩 읽어서 문자열을 리턴한다.


꼭 텐서플로우에서 미리 정해져있는 Reader 들을 사용할 필요는 없지만, 미리 정의된 Reader를 쓰면 조금 더 편리하다.

미리 정의된 Reader로는 Text File에서, 각 필드가 일정한 길이를 가지고 있을때 사용할 수 있는, FixedLengthRecordReader 그리고, 텐서플로우 데이타를 바이너리 포맷으로 저장하는 TFRecord 포맷에 대한 리더인 TFRecordReader 등이 있다.


Reader를 사용하는 방법은 다음과 같다.

reader = tf.TextLineReader()

key,value = reader.read(filename_queue)


먼저 Reader 변수를 지정한 다음, reader.read를 이용하여 filename_queue 로 부터 파일을 읽게 하면 value에 파일에서 읽은 값이 리턴이 된다

예를 들어 csv 파일에 아래와 같은 문자열이 들어가 있다고 할때


167c9599-c97d-4d42-bdb1-027ddaed07c0,1,2016,REG,3:54

67ea7e52-333e-43f3-a668-6d7893baa8fb,1,2016,REG,2:11

9e44593b-a870-446e-aed5-90a22ab0c952,1,2016,REG,2:32

48832a52-e56c-467f-a1ef-c6f8c6e908ea,1,2016,REG,2:17


위의 코드 처럼, TextLineReader를 이용하여 파일을 읽게 되면 value에는

처음에는 “167c9599-c97d-4d42-bdb1-027ddaed07c0,1,2016,REG,3:54”이, 다음에는 “67ea7e52-333e-43f3-a668-6d7893baa8fb,1,2016,REG,2:11” 문자열이 순차적으로 리턴된다.

읽은 데이타를 디코딩 하기 (Decoder)

Reader에서 읽은 값은 파일의 원시 데이타 (raw)데이타이다. 아직 파싱(해석)이 된 데이타가 아닌데,

예를 들어 Reader를 이용해서 csv 파일을 읽었을 때, Reader에서 리턴되는 값은 csv 파일의 각 줄인 문자열이지, csv 파일의 각 필드 데이타가 아니다.


즉 우리가 학습에서 사용할 데이타는

167c9599-c97d-4d42-bdb1-027ddaed07c0,1,2016,REG,3:54

하나의 문자열이 아니라

Id = “167c9599-c97d-4d42-bdb1-027ddaed07c0”,

Num  = 1

Year = 2016

rType = “REG”

rTime = “3:54”

과 같이 문자열이 파싱된 각 필드의 값이 필요하다.


이렇게 읽어드린 데이타를 파싱 (해석) 하는 컴포넌트를 Decoder라고 한다.


Reader와 마찬가지로, Decoder 역시 미리 정해진 Decoder 타입이 있는데, JSON,CSV 등 여러가지 데이타 포맷에 대한 Decoder를 지원한다.

위의 CSV 문자열을 csv 디코더를 이용하여 파싱해보자


record_defaults = [ ["null"],[1],[1900],["null"],["null"]]

id, num, year, rtype , rtime = tf.decode_csv(

   value, record_defaults=record_defaults,field_delim=',')


csv decoder를 사용하기 위해서는 각 필드의 디폴트 값을 지정해줘야 한다. record_default는 각 필드의 디폴트 값을 지정해 주는 것은 물론이고, 각 필드의 데이타 타입을 (string,int,float etc)를 정의 하는 역할을 한다.

디폴트 값은 csv 데이타에서 해당 필드가 비워져 있을때 채워 진다.

위에서는 record_deafult에서 첫번째 필드는 string 형이고 디폴트는 “null”로, 두번째 필드는 integer 형이고, 디폴트 값은 1로, 세번째 필드는 integer 형이고 디폴트는 1900 으로, 네번째와 다섯번째 필드는 모두 string형이고, 디폴트 값을 “null” 로 지정하였다.

이 디폴트 값 세팅을 가지고 tf.decode_csv를 이용하여 파싱 한다.

value는 앞에서 읽어 드린 CSV 문자열이다. record_defaults= 를 이용하여 레코드의 형과 디폴트 값을 record_defaults에 정해진 값으로 지정하였고, CSV 파일에서 각 필드를 구분하기 위한 구분자를 ‘,’를 사용한다는 것을 명시 하였다.

다음 Session을 실행하여, 이 Decoder를 실행하면 csv의 각 행을 파싱하여, 각 필드를 id,num,year,rtype,rtime이라는 필드에 리턴하게 된다.


이를 정리해보면 다음과 같은 구조를 가지게 된다.


예제

위에서 설명한 CSV 파일명을 받아서 TextLineReader를 이용하여 각 파일을 읽고, 각 파일에서 CSV 포맷의 데이타를 읽어서 출력하는 예제의 전체 코드를 보면 다음과 같다.


import tensorflow as tf

from numpy.random.mtrand import shuffle


#define filename queue

filename_queue = tf.train.string_input_producer(['/Users/terrycho/training_datav2/queue_test_data/b1.csv'

                                                ,'/Users/terrycho/training_datav2/queue_test_data/c2.csv']

                                                ,shuffle=False,name='filename_queue')

# define reader

reader = tf.TextLineReader()

key,value = reader.read(filename_queue)


#define decoder

record_defaults = [ ["null"],[1],[1900],["null"],["null"]]

id, num, year, rtype , rtime = tf.decode_csv(

   value, record_defaults=record_defaults,field_delim=',')


with tf.Session() as sess:

   

   coord = tf.train.Coordinator()

   threads = tf.train.start_queue_runners(sess=sess, coord=coord)

   

   for i in range(100):

       print(sess.run([id, num, year, rtype , rtime]))

   

   coord.request_stop()

   coord.join(threads)                                        


지금까지 파일에서 데이타를 읽어서 학습 데이타로 사용하는 방법에 대해서 알아보았다.

다음에는 이미지 기반의 CNN 모델을 학습 시키기 위해서 이미지 데이타를 전처리 하고 읽는 방법에 대해서 설명하도록 하겠다.

머신러닝에서 학습용 데이타양 늘리기


머신러닝에 대해서 공부하다가 강연을 들은적이 있었는데, 그때 많이 들었던 이야기가 데이타 뻥튀기에 대한 이야기 였다.

확보할 수 있는 원본 데이타의 양이 한정되어 있으니, 현재의 데이타를 가지고 그 양을 늘리는 방법인데. 어떻게 하나 사실 궁금했는데.

(얼굴의 경우 선글라스를 씌우거나 기타의 방법을 생각했는데..)


오늘 튜토리얼을 보다보니, 구체적인 그 방법이 나와 있어서 잠깐 메모 해놓는다

https://www.tensorflow.org/tutorials/deep_cnn


여기서 소개된 방법은

  • 이미지의 좌/우를 바꾼다거나, 
  • 이미지의 밝기나 선명도를 바꾸는 방법을 사용한다.




파이어베이스 애널러틱스를 이용한 모바일 데이타 분석

#2-분석 지표와 대쉬 보드 이해하기


조대협 (http://bcho.tistory.com)


파이어베이스 애널러틱스로 지표를 수집하게 되면, 몬가 아름다워(?) 보이는 대쉬 보드와 그래프들을 볼 수 있다. 그러나 정작 각 그래프의 항목과 수치가 무엇을 의미하는지를 이해하지 못한다면 무용 지물이나 다름없다.


비단 파이어베이스 애널러틱스 뿐 아니라, 일반적인 데이타 분석에서도 많이 겪는 실수중에 하나인데, 이번에는 파이어베이스 애널러틱스에 의해서 분석되어 리포트로 제공되는 각종 지표와 이와 연관된 이벤트들에 대해서 알아보도록 한다.

대쉬 보드

파이어베이스 애널러틱스를 사용하게 되면 리포트는 대쉬보드를 통하여 출력되게 된다. 대쉬 보드는 대략 아래와 같이 생겼는데, 각 항목을 살펴보도록 하자



출처 https://support.google.com/firebase/answer/6317517?hl=en&ref_topic=6317489

기준 시간

분석 지표에 대한 이해를 하기 위해서는 먼저 기준 시간에 대한 이해를 할 필요가 있다. 파이어베이스 애널러틱스 콘솔의 우측 상단의 보면 분석 기간을 선택할 수 있다. 분석 기간은 오늘, 어제, 이번주, 지난 7일, 지난 30일 등 미리 정해진 기간이나 Custom을 이용하여, 기간을 정의할 수 있다.


1. Active User (활성 사용자수)

가장 처음에 나오는 지표는 활성 사용자 수 이다. 가장 많이 보는 지표중의 하나인데, 일,월,주별 방문자 수 이다.


  • Monthly Active User (MAU:월별 활성 사용자 수)
    그래프의 X축의 날짜에서 부터 부터 전 30일까지의 앱을 사용한 총 일일 사용자 수.

  • Weekly Active User (WAU:주별 활성 사용자 수)
    그래프의 X축의 날짜에서 부터 전 7일 까지 앱을 사용한 총 일일 사용자의 수

  • Daily Active User (DAU : 주별 활성 사용자 수)
    그래프의 X축 날짜의 앱을 사용한 일일 사용자의 수


위의 그래프를 보면 WAU와 DAU는 수평을 그리고 있는데, 반하여 MAU가 올라가고 있음을 볼 수 있다. 이 그래프는 파이어베이스 애널러틱스를 설치한지 얼마 되지 않는 기간에 뽑은 리포트인데, DAU는 일정하기 때문에, MAU는 누적되서 그래프가 상승 곡선을 띄게 되는 것이다.

예를 들어 8월1일에 설치했다고 했을때, 8월2일의 MAU는 7월3일~8월2일 DAU의 합이 되는데, 8월 1일에 설치를 했기 때문에 7월3일~7월30일까지의 데이타는 없다. 8월 30일의 MAU는 8월1일~8월30일까지 합이고, 8월1~30일까지는 데이타가 있기 때문에 누적되서 상승 곡선을 그리게 된다.

2. Average Revenue (평균 수익)

다음 지표는 수익 지표이다. 크게 ARPU와 ARPPU로 표현되는데 그 내용은 다음과 같다.

  • ARPU (Average revenue per User)
    사용자별 수익률로, 전체 수익을 전체 사용자 수로 나눠서 계산한다.

  • ARPPU (Average revenue per purchased user)
    유료 사용자별 수익률로, 전체수익을 비용을 지불한 사용자로 나눠서 계산한다.

전체 서비스가 유료가 아닌 이상, 커머스의 경우 일부 사용자만 물건을 구매하거나, 게임이나 서비스 앱인 경우에는 일부 사용자만 인앱구매등을 통해서 비용을 지불하기 때문에 다른 두개의 지표가 나온다.

ARPU는 서비스에서 사용자가 증가하는 당 수익률이 어떻게 올라가는지를 알 수 있고, ARPPU는 유료 사용자당 얼마의 금액을 사용하는지를 이해할 수 있다.


이 지표는 파이어베이스 애널러틱스에서  ecommerce_purchase (쇼핑몰 이벤트 중, 구매 이벤트)와 in_app_purchase (일반 이벤트중 인앱 구매) 이벤트에 의해서 추적되기 때문에, ARPU와 ARPPU를 구하고 싶으면, 상품구매나 인앱 구매가 발생하였을때, 위의 이벤트를 통해서 파이어베이스 애널러틱스에 이벤트를 로깅해줘야 한다.  


3. first_open attribution (앱실행 빈도)

다음 지표는 첫 앱 실행을 추적하는 지표이다.

기준 시간 기간 동안 인스톨 또는 재 인스톨이 된후, 처음으로 앱이 실행된 횟수를 추적하는 지표이다.

이 지표는 다양한 의미를 가지고 있는데, 앱 다운로드가 캠페인등을 통해서 많이 일어났다고 하더라도, 앱을 한번도 실행을 해보지 않고 삭제하는 경우도 많기 때문에, 앱 다운로드 대비, 얼마나 많은 사용자가 실제로 앱을 실행했는 가를 추적할 수 있다.

앱 다운로드 횟수는 구글 플레이 스토어나 애플 앱스토어의 사용자 콘솔에서 그 값을 추적할 수 있다.


또한 “NETWORK SETTING”에서 광고 서비스 네트워크를 연동할 수 있는데, 광고 네트워크를 연동하게 되면 앱의 설치가 사용자가 앱스토어에서 그냥 자발적으로 설치를 한것인지 아니면 광고 네트워크의 특정 광고 캠페인을 통해서 인입된 사용자인지를 판단할 수 있다.



<그림 광고 네트워크를 연동하는 화면 >


이를 통해서, 광고 마케팅의 효율과, 성과를 측정하여 효율적인 광고 집행이 가능하다.

앱 첫실행을 기록하는 first_open 이벤트는 개발자가 별도로 코드 상에 정의하지 않더라도 자동으로 로깅 된다.

아래 예제를 보자, 광고 네트워크를 통하지 않고, 앱을 처음 사용한 것이 150K 정도 되고, 다음은 구글을 통해서 들어온 비중이 38K  정도가 된다.



맨뒤에, LTV 라는 수치가 있는데, LTV는 Life Time Value의 약자로 사용자가 앱을 설치 한 후, 초기 120일 동안에 일으킨 매출의 수의 총합이다. 매출은 ARPU와 같이   ecommerce_purchase (쇼핑몰 이벤트 중, 구매 이벤트)와 in_app_purchase (일반 이벤트중 인앱 구매) 이벤트에 의해서 추적된다.

이를 통해서 광고 네트워크별로 얼마만큼의 사용자가 들어오고, 유입된 사용자가 발생 시킨 매출을 추적하여 광고의 효율을 측정할 수 있다.


여기서 포스트백 (PostBack)이라는 기능을 잠깐 짚고 넘어갈 필요가 있는데, 쇼핑몰에서 광고 네트워크를 통해서 광고를 집행하고 있다고 하자, 사용자가 호텔 예약을 하고 싶어하는 니즈를 파악하고 사용자에게 호텔 예약 광고를 계속 내보냈다. 광고를 통해서 사용자는 호텔을 예약했다고 하자. 그렇다면 이제 더이상 해당 사용자에게 호텔 광고가 계속 나가면 안된다. (이미 팔았기 때문에) 이를 막기 위해서 광고 네트워크에 해당 물건을 사용자가 구매했으니, 더 이상 같은 광고를 내보내지 말라고 알려줘야 한다. 이를 포스트 백(Postback)이라고 한다. 파이어베이스 애널러틱스에서 포스트백을 설정하는 방법은 https://support.google.com/firebase/answer/6317518?hl=en&utm_id=ad#postbacks 를 참고하기 바란다.

4. Retention cohort (사용자 잔존율 코호트 분석)

다음 지표는 사용자 잔존율을 코호트 분석을 통해서 분석해낸 결과로, 사용자가 처음 앱을 사용한 후 얼마나 많은 사용자가 지속적으로 남아 있느냐를 나타내는 중요한 지표이다. 주 단위 잔존율을 기준으로 통계를 잡아주는데, 잔존 사용자가 많을 수록, 그래프가 더 진하게 표시 되는데, 다음 예제를 보면, 7월17일~7월23일에 가입한 사용자는 총 19481명으로 첫주에는 100% 사용자가 잔존하였으나, 1주 후에는 23.5%만 남았고, 2 주후에는 12.2%만 남았다가 5주후에는 6.4%만 남았다.

7월31~8월6일에 가입한 사용자의 경우 1주차에 23.7%가 남아 있어서 다른 주 대비 잔존율이 높아서 조금 더 진한 색깔이 그래프로 표현되었다.



5. User engagement (사용자 활동 지표)

사용자 활동 지표란, 사용자들이 기간동안 얼마나 앱을 사용했느냐에 대한 기간과 횟수등을 표현하는 지표들이다. 아래 그래프 예제로 설명하면




  • Daily engagement (총 사용시간)
    통계 기간 (기준 시간 기간) 동안 모든 사용자들이 앱을 사용한 총 시간의 합이다. 위의 예에서는 1년 34일 14시간을 사용한것으로 집게 되었다.

  • Daily engagement per user (사용자당 평균 사용 시간)
    통계 기간중, 사용자 1인당 평균 사용시간이다. Daily engagement를 그 기간 동안 총 활성 사용자 수로 나눈 값이다.

  • Session per user (사용자당 평균 세션 수 )
    사용자당 평균 세션 수 인데, 세션은 사용자가 기간동안 앱을 사용한 횟수로 보면 되다. 위의 예제에서는 사용자당 평균 3.7 회 정도 사용하였다.

  • Avg. session duration (사용자당 평균 세션 길이)
    사용자당 세션의 길이로, 한번 사용할때 평균 얼마 정도의 시간을 사용하느냐인데, 여기서는 사용자당 한번 사용에 7분 8초 정도를 사용한것으로 집게 되었다.


이런 통계 분석에서 주의할점은 이는 어디까지나 평균 값일 뿐이다. 특정 사용자는 기간동안 평균값이 3.7회가 넘는 10회 20회를 사용할 수 도 있고, 어떤 사용자 층은 한번 밖에 사용하지 않을 수 도 있다. 일반적으로 모바일 서비스 앱은 그 사용횟수나 사용 시간에 대한 분포가 특정 사용자군 (헤비유저)에게 몰리는 경향이 있기 때문에, 이러한 평균 지표보다는 정규 분포형의 지표를 따라서 분석하는 것이 조금 더 정확한데, 이를 위해서는 파이어베이스 애널러틱스의 지표만으로는 불가능하고, 원본 데이타를 기반으로 분석을 할 필요가 있다. 이를 위해서 원본 데이타를 빅쿼리에 저장한 후 분석하는 것이 좋은데, 이 방법은 나중에 다시 설명하도록 하겠다.

6. In-App purchase (인앱 구매)

이 지표는 인앱 구매에 대한 지표로, in_app_purchase 이벤트에 의해서 수집된 정보를 기반으로 통계를 계산한다. 총 얼마 만큼의 사용자가, 인앱 구매를 했는지를 출력하고, 이를 통해서 발생된 매출을 출력한다.

아울러 아래 그림과 같이 최고 매출을 일으킨 인앱 구매 상품들을 구매 횟수와 총 매출액을 통계로 표시해준다.



아래의 “VIEW IN-APP PURCHASE DETAILS” 탭을 클릭하면, 모든 인앱 상품의 매출 정보와 판매 추이,  사용자 연령대별 매출 발생 비중등 자세한 정보를 볼 수 있다.


<그림. 인앱 구매 이벤트 집게 화면에서 상세 화면중 성별 및 연령 별 구매 비율 >


7. App version (앱 버전)

통계 기간 동안 모든 사용자가 사용한 앱 버전에 대한 통계를 보여준다. 상위 3개의 버전을 보여주고, 나머지는 Others로 묶어서 통계로 보여준다.


앱 버전 역시 모바일 서비스에서 매우 중요한 지표중의 하나인데, 신기능이나 신규 컨텐츠가 올라가더라도 버전이 옛날 버전이 많이 깔려 있을 경우 신규 기능이나 컨텐츠가 동작하지 않을 수 도 있기 때문에, 얼마나 사용자들이 새 버전으로 업데이트했는지 추적하는 것이 중요한 지표가 되며, 아울러 경우에 따라서 예전 버전이 많을 경우에는 강제 업데이트를 해야 하는 경우도 있기 때문에, 앱 버전에 대한 추적 역시 매우 중요한 지표로 작용하낟.

8. Devices (디바이스)

통계 기간동안에 사용자가 앱을 사용하는데 사용한 주요 디바이스명과, OS 버전에 대한 통계이다.

디바이스명은 테스트 환경을 만들때 사용자들이 주로 어떤 디바이스를 사용하는지를 알면 테스트 디바이스를 준비하기가 편리하기 때문이고, OS version의 경우, 낮은 버전의 OS에서는 특정 SDK나 기능이 작동하지 않을 수 있기 때문에 앱 개발시 어느 OS 버전 부터 지원을 해야 할지, 그리고 사용 빈도가 낮은 OS는 언제 지원을 중단할 수 있을지등을 결정할 수 있는 지표로 활용이 가능하다.


9. Location(위치)

이해는 쉽지만 가장 중요한 지표중의 하나이다. 해당 기간동안 주로 어느 국가에서 앱이 많이 사용되었는 가를 리포팅 해주는 지표이다.


국내나 특정 국가 한정 서비스인 경우가 아닌 글로벌 서비스인 경우 서비스가 어느 나라에서 인기가 있는 가에 따라서, 그 나라에 맞도록 앱을 현지화 하거나, 앱에 대한 마케팅 자원등을 선택과 집중할 수 있다.

10. Demographics (데모그래픽 정보)

데모 그래픽 정보는 사용자의 연령과 성별등을 나타내는 정보이다.

이를 통하여 앱 사용자가 누구인지를 파악할 수 있고, 이를 기반으로 앱 서비스를 타케팅할 수 있는 대상을 식별하여, 제공할 컨텐츠, 마케팅 캠페인 대상등을 정할 수 있다.  



11. Interest (사용자 흥미)

마지막으로 이 앱 서비스를 사용하는 사용자가 어떤 흥미를 가지고 있는지를 분석 해주는 기능인데,

이러한 모바일 분석 플랫폼을 무료로 제공하는 서비스 제공자는 구글뿐아니라 야후, 트위터와 같이 광고를 통해서 수익을 창출하는 경우가 많다. 이러한 사업자등은 자사의 서비스에서 사용자들이 어떤 서비스나 어떤 컨텐츠를 선호 하는지를 분석한 후에, 이를 기반으로 모바일 데이타 분석 플랫폼을 사용하는 앱 개발사들의 사용자들이 어떤 컨텐츠나 서비스를 선호하는지를 추적 분석해주는데, 이것이 Interest 분석이다.


위의 그림과 같이 이 앱을 사용하는 사용자들은 TV나 온라인 비디오에 관심이 많은 사용자들이 7.6%, 그리고 음악에 관심이 있는 사용자들이 6.7%, 카메라나 전자 제품에 관심 있는 사용자들이 3.6% 정도이다.

이를 통해서 앱 사용자들을 대상으로 한 타겟 광고나 서비스 개선등에 활용할 수 있다.


지금까지 간략하게나마 파이어베이스 애널러틱스 대쉬보드의 주요 지표에 대해서 설명하였다.

여기에 나오는 지표들은 파이어베이스뿐 아니라 일반적인 모바일 앱 서비스 분석 지표로도 사용되는 만큼, 잘 이해해놓으면 모바일 서비스 빅데이타 분석에 유용하게 활용할 수 있다.


다음 글에서는 파이어베이스 애널러틱스의 주요 이벤트들에 대해서 설명하도록 하겠다.


빅데이타 수집을 위한 데이타 수집 솔루션 Embulk 소개


조대협 (http://bcho.tistroy.com)


빅데이타 분석에 있어서, 아키텍쳐적으로 중요한 모듈중의 하나는 여러 서버로 부터 생성되는 데이타를 어떻게 모을 것인가이다. 얼마전에, 일본의 사례를 보다가 눈에 띄는 솔루션이 있어서 주말을 통해서 이런 저런 테스트를 해봤다.


Embulk 소개

Embulk라는 솔루션인데, fluentd를 만들었던 사람이 만들었다고 한다.

여러 종류의 데이타 소스에서 데이타를 읽어서 로딩을 할 수 있다. 주요 특징을 보면

  • 플러그인 형태로 여러개의 소스와 타겟을 지원한다.
    jRuby로 개발이 되어서 ruby gem을 이용하여 손쉽게 플러그인을 설치할 수 있다.

  • 병렬 로딩이 가능하다.
    예를 들어 여러개의 파일을 동시에 로딩하거나 또는 하나의 큰 파일이라도 자동으로 여러개의 파일로 쪼게서 병렬로 로딩을 함으로써, 로딩 속도를 올릴 수 있다.

  • 변환이 가능하다.
    파일 포맷 변환뿐 아니라, 각 필드에 대한 형 변환 그리고, 간단한 필드 맵핑 등이 가능하다.

  • 스키마 예측 (Schema guessing)
    입력 데이타를 보고, 자동으로 입력 데이타의 스키마(테이블 구조)를 예측한다. 일일이 설정을 하려면 귀찮은 일인데, 자동으로 스키마를 인식해주시기 때문에, 설정양을 줄일 수 있다.

전제적인 개념은 미니 ETL과 유사하다고 볼 수 있는데, 그 사용법이 매우 쉽다.

Embulk 설치

이 글에서는 로컬에 있는 CSV 포맷의 파일을 구글 클라우드의 빅쿼리로 로딩하는 예제를 통해서 어떻게 Embulk를 사용하는지를 알아보겠다.

VM 생성

테스트는 구글 클라우드 VM에서 진행한다. 4코어 Ubuntu VM을 생성하고 테스트 데이타를 복사하였다.

VM을 생성할때, 빅쿼리 API를 사용할 것이기 때문에, Cloud API access scopes에 BigQuery API access 권한을 반드시 부여해야 한다.


이 예제에서는 VM 생성시 모든 Cloud API에 대한 사용권한을 부여한체 생성하였다. VM을 생성한 후에, 콘솔에서 VM 상세 정보를 확인해보면 위의 그림과 같이 “This instance has full API access to all Google Cloud services.”로, 모든  구글  클라우드 API에 대한 권한을 가지고 있는 것을 확인할 수 있다.

자바 설치

구글 Ubuntu VM에는 디폴트로 자바가 설치되어있지 않기 때문에, JVM을 설치한다.

% sudo apt-get update

% sudo apt-get install default-jre

Embulk 설치

JVM 설치가 끝났으면 Embulk를 설치해보자. 다음 명령어를 실행하면 Embulk 가 설치된다.

% curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
% chmod +x ~/.embulk/bin/embulk
% echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
% source ~/.bashrc

Embulk는 ~/.embulk 디렉토리에 설치가 된다.

다음으로, 빅쿼리에 결과를 쓸 예정이기 때문에, 빅쿼리 Output 플러그인을 설치한다.

%embulk gem install embulk-output-bigquery


Embulk 로 빅쿼리에 CSV 파일 로딩하기

로딩할 데이타 살펴보기

로딩에 사용한 데이타는 게임 이벤트에 대한 데이타를 시뮬레이션 해놓은 것으로, 사용자가 NPC를 만나서 전투를 하는 각각의 이벤트를 기록해놓은 파일이다. 파일이름은 events000000000001 CSV 파일 포맷이고 총 1220395 레코드에, 243 MB의 크기이며 데이타 포맷은 다음과 같다.


파일 포맷은 다음과 같다.


eventTime,userId,sessionId,sessionStartTime,eventId,npcId,battleId,firstLogin,playerAttackPoints,playerHitPoints,playerMaxHitPoints,playerArmorClass,npcAttackPoints,npcHitPoints,npcMaxHitPoints,npcArmorClass,attackRoll,damageRoll,currentQuest

2015-11-29 01:31:10.017236 UTC,user875@example.com,688206d6-adc4-5e60-3848-b94e51c3707b,2015-11-29 01:29:20.017236 UTC,npcmissedplayer,boss15,6e4232df-26fa-22f1-fa04-465e85b34c1e,,15,3,15,15,15,15,15,15,11,,15

:


첫줄에, CSV 파일에 대한 컬럼명이 들어가고 두번째 줄 부터, “,” delimiter를 이용하여 각 컬럼을 구별하여 실 데이타가 들어가 있다.

스키마 예측을 통하여 자동으로 Config 파일 생성하기

이제, Embulk를 통해서 이 파일을 로딩하기 위해서, config 파일을 생성해보자.

Embulk에서 config 파일은 스키마 자동 예측을 통해서 자동으로 생성해낼 수 있다. Config 파일을 생성하기 위해서는 input과 output 에 대한 기본 정보를 기술해줘야 하는데, 다음과 같이 seed.yml 파일에 기본 정보를 기술한다.


in:  

 type: file  

 path_prefix: "/home/terrycho/data/events"

out:  

 type: bigquery


path_prefix에는 파일명을 정의하는데, /home/terrycho/data/events는 /home/terrycho/data/ 디렉토리내에 events*로 시작하는 모든 파일에 대해서 로딩을 하겠다는 정의이다.


seed.yml 파일 설정이 끝났으면 config 파일을 생성해보자

% embulk guess ./seed.yml -o config.yml

명령을 실행하면 아래와 같이 config.yml 파일이 생성된다.


in:

 type: file

 path_prefix: /home/terrycho/data/events

 parser:

   charset: UTF-8

   newline: CRLF

   type: csv

   delimiter: ','

   quote: '"'

   escape: '"'

   trim_if_not_quoted: false

   skip_header_lines: 1

   allow_extra_columns: false

   allow_optional_columns: false

   columns:

   - {name: eventTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: userId, type: string}

   - {name: sessionId, type: string}

   - {name: sessionStartTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: eventId, type: string}

   - {name: npcId, type: string}

   - {name: battleId, type: string}

   - {name: firstLogin, type: string}

   - {name: playerAttackPoints, type: long}

   - {name: playerHitPoints, type: long}

   - {name: playerMaxHitPoints, type: long}

   - {name: playerArmorClass, type: long}

   - {name: npcAttackPoints, type: long}

   - {name: npcHitPoints, type: long}

   - {name: npcMaxHitPoints, type: long}

   - {name: npcArmorClass, type: long}

   - {name: attackRoll, type: long}

   - {name: damageRoll, type: long}

   - {name: currentQuest, type: long}

out: {type: bigquery}


생성된 config.yml 파일을 보면 firstLogin 컬럼의 데이타 형이 string으로 되어 있는 것을 볼 수 있다. 빅쿼리 테이블에서 이 필드의 형은 실제로는 boolean이다. 아무래도 자동 인식이기 때문에, 이렇게 형들이 다르게 인식되는 경우가 있기 때문에, 생성 후에는 반드시 검토를 하고 알맞은 형으로 수정을 해줘야 한다.


다음으로 위의 파일에 데이타를 로딩할 빅쿼리에 대한 정보를 정의해줘야 한다.


in:

 type: file

 path_prefix: /home/terrycho/data/events000000000001

 parser:

   charset: UTF-8

   newline: CRLF

   type: csv

   delimiter: ','

   quote: '"'

   escape: '"'

   trim_if_not_quoted: false

   skip_header_lines: 1

   allow_extra_columns: false

   allow_optional_columns: false

   columns:

   - {name: eventTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: userId, type: string}

   - {name: sessionId, type: string}

   - {name: sessionStartTime, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}

   - {name: eventId, type: string}

   - {name: npcId, type: string}

   - {name: battleId, type: string}

   - {name: firstLogin, type: boolean}

   - {name: playerAttackPoints, type: long}

   - {name: playerHitPoints, type: long}

   - {name: playerMaxHitPoints, type: long}

   - {name: playerArmorClass, type: long}

   - {name: npcAttackPoints, type: long}

   - {name: npcHitPoints, type: long}

   - {name: npcMaxHitPoints, type: long}

   - {name: npcArmorClass, type: long}

   - {name: attackRoll, type: long}

   - {name: damageRoll, type: long}

   - {name: currentQuest, type: long}

out:

 type: bigquery

 mode: append

 auth_method: compute_engine

 project: useful-hour-138023

 dataset: gamedata

 table: game_event

 source_format: CSV


“out:” 부분을 위와 같이 수정하였다.

mode는 append 모드로, 기존 파일에 데이타를 붙이는 모드로 하였다. auth_method에는 빅쿼리 API 호출을 위한 인증 방식을 정의하는데, 구글 클라우드의 VM에서 호출하기 때문에, compute_engine이라는 인증 방식을 사용하였다. (구글 클라우드의 VM에서 같은 프로젝트 내의 빅쿼리 API를 호출할 경우에는 별도의 인증을 생략할 수 있다.) 다른 인프라드에서 호출할 경우에는 IAM에서 Service account를 생성한 후에, json  파일을 다운 받아서, json 파일 인증 방식으로 변경하고, 다운 로드 받은 json 파일을 지정해주면 된다.

다음으로, project,dataset,table에, 로딩할 빅쿼리 데이블에 대한 프로젝트명, 데이타셋명, 테이블명을 기술해주었다. 그리고 마지막으로 입력 포맷이 CSV임을 source_format에서 CSV로 정의하였다.


이제 데이타 로딩을 위한 모든 준비가 끝났다.

Config 파일 테스트

데이타 로딩을 하기 전에, 이 config 파일이 제대로 작동하는지 테스트를 해보자

%embulk preview config.yml

의 명령어는 데이타를 읽어서 제대로 파싱을 하는지 설정 파일은 문제가 없는지 테스트를 해주는 명령어이다.

명령을 실행하면 다음과 같이 일부 데이타를 읽어서 파싱을 하고 결과를 보여주는 것을 볼 수 있다.



실행하기

테스트가 끝났으면 실제로 데이타를 로딩해보자. 로딩은 아래와 같이 embulk run 명령어를 사용하면 된다.

%embulk run config.yml

실제로 실행한 결과 약 12분이 소요되었다.


멀티 쓰레드를 이용하여 로딩 속도 올리기

앞에서 설명하였듯이, Embulk는 패레럴 로딩이 지원된다. 아래와 같이 config.yml 파일에 exec이라는 부분에, max_threads수와, min_output_tasks 수를 정해주면 되는데, min_output_tasks 수는 최소로 동시 실행할 로딩 테스크 수이다. 5로 정했기 때문에, 이 시나리오에서는 하나의 CSV 파일을 업로드 하기 때문에, 이 파일을 5개의 작은 파일로 잘라서 동시에 5개의 쓰레드로 동시에 업로딩 한다.


exec:

 max_threads: 20

 min_output_tasks: 5

in:

 type: file

 path_prefix: /home/terrycho/data/events

 parser:

 :


실제로 테스트한 결과 디폴트 설정에서는 초당 약 1200줄을 업로드하였는데, 반하여, min_output_tasks를 5개로 하였을때는 초당 2000개 내외를 업로드 하였다. min_output_tasks를 10개,20개로 올려봤으나 성능은 비슷하였다. (아마 튜닝을 잘못한듯)

Parser-none으로 로딩 속도 올리기

앞의 시나리오는 데이타 라인을 각각 읽어서 컬럼을 일일이 파싱하고 이를 입력하도록 하는 시나리오였다.

만약에 CSV나 JSON 입력 파일이 빅쿼리 입력 포맷에 맞도록 이미 포매팅이 되어있다면, 일일이 파싱할 필요가 없다.

그냥 파일을 읽어서 파싱 없이 바로 빅쿼리에 insert만 하면되기 때문에, 이 경우에는 Parser를 제거하면 되는데, Parsing을 하지 않는 Parser로 embulk-parser-none이 있다.

이 Parser 다음과 같이 설치한다.

$ embulk gem install embulk-parser-none

다음 config 파일을 다음과 같이 수정한다.


in:

 type: file

 path_prefix: /home/terrycho/data/events000000000001_nohead

 parser:

   type: none

   column_name: payload

out:

 type: bigquery

 mode: append

 auth_method: compute_engine

 project: useful-hour-138023

 dataset: gamedata

 schema_file: /home/terrycho/data/gameevent.schema.json

 table: game_event

 payload_column_index: 0


이때 중요한것중 하나는 데이타 파일 (CSV)파일 첫줄에 데이타에 대한 컬럼 정보가 들어가 있으면 안된다.

그래서 아래와 같이 원본 데이타 파일에서 첫줄을 지운다.

eventTime,userId,sessionId,sessionStartTime,eventId,npcId,battleId,firstLogin,playerAttackPoints,playerHitPoints,playerMaxHitPoints,playerArmorClass,npcAttackPoints,npcHitPoints,npcMaxHitPoints,npcArmorClass,attackRoll,damageRoll,currentQuest

2015-11-29 01:31:10.017236 UTC,user875@example.com,688206d6-adc4-5e60-3848-b94e51c3707b,2015-11-29 01:29:20.017236 UTC,npcmissedplayer,boss15,6e4232df-26fa-22f1-fa04-465e85b34c1e,,15,3,15,15,15,15,15,15,11,,15

:


다음 embulk run을 이용하여 이 config 파일을 실행해보면 같은 데이타인데도 로딩 타임이 약 50초 정도 밖에 소요되지 않는 것을 확인할 수 있다.

빅쿼리 관련 몇가지 추가 옵션

이외에도 다양한 옵션이 존재하기 때문에, 빅쿼리 output 플러그인 페이지인 https://github.com/embulk/embulk-output-bigquery 를 참고하기 바란다.

자동으로 중복을 제거하는 기능이나, 로딩할때 마다 동적으로 빅쿼리 테이블을 생성하는 기능등이 있으니 반드시 참고하기 바란다.

GCS를 경유하는 업로딩

Embulk의 패레럴 로딩이 좋기는 하지만 의외의 문제가 발생할 수 있는 부분이 하나가 있는데, 하나의 파일을 로딩하는데 Embulk는 여러개의 태스크로 병렬 처리를 하기 때문에, 빅쿼리 입장에서는 각각의 태스크가 빅쿼리 로딩 JOB으로 인식이 될 수 있다. 일반적으로 빅쿼리 JOB은 하루에 10,000개만 실행할 수 있는 제약을 가지고 있다. 그래서 만약에 데이타 로딩이 많을 경우 이런 병렬 로딩은 JOB 수를 깍아 먹는 원인이 될 수 있는데, bigquery output 플러그인에서는 다음과 같은 해법을 제공한다.


빅쿼리로 데이타를 로딩할때 GCS (Google Cloud Storage)를 사용하여, 와일드카드 (*)를 사용할 경우에는 하나의 디렉토리에 있는 여러 파일을 병렬로 로딩할 수 있으며, 이때 와일드 카드를 사용한 JOB은 하나의 JOB으로 인식된다. (병렬로 여러 파일을 로딩하더라도)


그래서 out 옵션에 다음과 같이 GCS  관련 옵션을 설정해주면 파일을 직접 로컬에서 로딩하는 것이 아니라, 처리를 다 끝난 파일을 GCS 버킷으로 업로딩한 후에, GCS 버킷에서 로딩을 하게 되기 때문에, JOB수를 줄일 수 있다.


out:

 type: bigquery

 gcs_bucket: bucket_name

 auto_create_gcs_bucket: false


성능과 활용도에 대한 분석

각 시나리오에 대한 성능 테스트 결과 값은 다음과 같다.

CSV를 구글에서 제공되는 bq load 명령어를 이용해도 108초가 나오는데 반해서, non-parser를 이용하면 파일을 자동으로 쪼게서 보내기 때문에, bq load를 이용하여 하나의 파일로 업로드 하는 것보다 높은 성능이 나온다.


시나리오

성능

bq load 명령어를 이용한 로딩

108초

CSV 파서를 사용한 경우

12분

non parser를 사용한 경우

50초


하나 고려할 사항은 Parser나 Filter의 경우 ruby로 개발된 것이 있고, java로 개발된 것들이 있는데, ruby로 개발된 플러그인의 경우 성능이 java 대비 많이 느리기 때문에 가급적이면 java로 개발된것을 사용하도록 한다.


다양한 데이타 소스와 저장소가 지원이 되고, 설정이 매우 간단하며 간단한 포맷 변환등이 지원되는 만큼, 쉽고 빠르게 데이타 연동 파이프라인을 구축하는데 활용도가 매우 높다. 이와 유사한 솔루션으로는 fluentd등이 있는데, fluentd는 조금 더 실시간 즉 스트리밍 데이타에 초점이 맞춰져 있으며, Embulk는 배치성 분석에 맞춰져 있다.


참고 자료




구글 빅데이타 플랫폼 빅쿼리 소개


조대협 (http://bcho.tistory.com)


구글의 클라우드 관련 기술중 무엇이 좋은게 있을까 살펴 보면서 기술을 하나하나씩 보다 보니, 구글 클라우드의 특징은 여러가지가 있겠지만, 데이타 회사 답게 빅데이타 및 머신 러닝 플랫폼이 상당히 강하다.


그중에서 빅데이타 플랫폼의 중심에 BIG QUERY라는 빅데이타 플랫폼이 있어서, 몇 회에 걸쳐서 빅쿼리에 대해서 소개해보고자 한다.

구글 빅데이타 분석의 역사

구글은 빅데이타를 다루면서, 그 근간이 되는 기술들의 논문들을 공개했다. 하둡 파일 시스템의 시초가 되는 GFS나, 하둡의 시초인 MapReduce 논문, 그리고 Hive를 통해 오픈소스화가 된 Big Table등의 논문들이 있다. 구글의 빅쿼리는 Dremel 이라는 논문을 근간으로 한다.

빅쿼리랑 무엇인가?

빅쿼리는 페타 바이트급의 데이타 저장 및 분석용 클라우드 서비스이다.

요즘은 페타바이트급의 data warehouse로 부르는데, 쉽게 말해서 페타바이트급의 데이타를 저장해놓고, 쿼리를 통해서 조회나 통계 작업등을 할 수 있는 DB(라고 보기에는 약간 애매하지만)이다.

빅쿼리의 특징

대략적인 특징을 살펴보면 다음과 같다.

클라우드 서비스로 설치/운영이 필요 없음 (NoOps)

어디에 설치해서 사용하는 서비스가 아니라 구글 클라우드 서비스를 통해서 제공되는 빅데이타 저장 분석 서비스이다. 클릭 몇번으로 서비스 사용이 가능하고, 별도의 설정이나 운영이 필요 없다.

SQL 언어 사용

기존의 RDBMS에서 사용되는 SQL언어를 그대로 사용하기 때문에, 사용이 매우 쉽다.

클라우드 스케일의 인프라를 통한 대용량 지원과 빠른 성능

빅쿼리의 성능이나 스케일을 보려면 다음 예제를 보는게 좋다.

https://cloud.google.com/blog/big-data/2016/01/anatomy-of-a-bigquery-query


위키피디아에서 100 billion record (1000억개)의 레코드를 스캐닝해서 regular expression으로 “G.*o.*o.*g”) 문자열을 찾아내서 그 문서의 뷰수를 카운트 하는 예제이다.

대략 4TB 용량의 데이타가 핸들링 되고, 약 30초가 소요된다.

30초 동안, 약 3,300개의 CPU와, 330개의 하드 디스크, 330 Gigabit의 네트웍이 사용된다.

(자료 : https://cloud.google.com/blog/big-data/2016/01/bigquery-under-the-hood)

이 쿼리를 수행하는데 소요되는 비용은 딱 $20가 소요된다.

일반적인 인프라에서 빅데이타 연산을 하는데, 3300개의 CPU를 동시에 사용하기란 쉽지 않은 일이고, 이런 대용량 연산을 20$에 할 수 있는 것은 대용량 인프라를 공유하는 클라우드 서비스이기 때문에 가능하다.

데이타 복제를 통한 안정성

데이타는 3개의 복제본이 서로 다른 3개의 데이타 센터에 분산되어 저장되기 때문에 데이타에 대한 유실 위험이 적다.

배치와 스트리밍 모두 지원

한꺼번에 데이타를 로딩하는 배치 이외에도 REST API등을 통해서 실시간으로 데이타를 입력할 수 있는 스트리밍 기능을 제공하며, 스트리밍시에는 초당 100,000개의 행(row)의 데이타를 입력할 수 있다.

비용 정책

비용 정책 역시 클라우드 서비스 답게, DB 인스턴스와 같은 과금 방식이 아니라서 큰 데이타를 핸들링 하기 위해서 큰 인스턴스를 쓰고 사용하지 않는 동안에도 과금이 되는 정책이 아니라,

딱  저장되는 데이타 사이즈와, 쿼리시에 발생하는 트렌젝션 비용만큼만 과금이 된다.  데이타 저장 요금은 GB당 0.02$이고, 90일이 지나서 사용하지 않는 데이타는 자동으로 0.01$로 가격이 떨어진다.

클라우드 서비스에서 가격이 싸다는 일반적인 오브젝트 스토리지 (Google Cloud Storage : GB당 0.026$)보다 싸다. 트렌젝션 비용은 쿼리 수행시 스캔되는 데이타를 기준으로 TB당 $5 이다.  (월  1TB는 무료)
(나중에 자세하게 설명하겠지만, 스캔되는 컬럼당 비용이 나오기 때문에 사실상 비용을 계산해보면 그리 높지 않다)

가격 정책 : https://cloud.google.com/bigquery/pricing


빅쿼리가 기존의 빅데이타 플랫폼과 다른점은?

그렇다면 빅쿼리가 기존의 빅데이타 분석 플랫폼인 Hadoop, Spark등과의 차이가 무엇일까? 앞의 장점을 기반으로 그 차이점을 정리하자면 크게 다음과 같은 3가지를 들 수 있다.

쉽다.

보통 Hadoop이나 Spark등을 사용하게 되면, Map&Reduce(이하 MR) 로직을 사용하거나 SparkSQL을 사용하더라도 일정 수준 이상의 전문성이 필요하다. 또한 MR 로직의 경우 전문성이 있는 개발자가 분석 로직을 개발해야 하기 때문에 시간이 상대적으로 많이 소요되지만 빅쿼리는 로그인 후 SQL만 수행하면 되기 때문에, 상대적으로 빅데이타 분석이 쉽다.

운영이 필요 없다

Hadoop이나 Spark과 같은 빅데이타 솔루션의 경우에는 인스톨과 설정 그리고 클러스터의 유지 보수가 보통 일이 아니다. 그래서 별도의 운영 조직이 필요하고 여기에 많은 리소스가 소요되지만, 빅쿼리는 클라우드 서비스 이기 때문에, 별도의 운영등에 신경을 쓸 필요가 없이 개발과 분석에만 집중하면 된다.  

인프라에 대한 투자없이 막강한 컴퓨팅 자원을 활용

앞의 예에서 본것과 같이, 빅쿼리를 이용하면 수천개의 CPU와 수백/수천개의 컴퓨팅 자원을 사용할 수 있다. 물론 기존 빅데이타 플랫폼도 클라우드 환경에 올리면 수천개의 CPU를 사용하는 것이 가능은 하지만, 그 설정 작업과 비용적인 측면에서 차이가 크다.

빅쿼리 맛보기

그러면 직접 빅쿼리를 사용해보자. 빅쿼리 버전 HelloWorld라고 생각하면 된다.

가입 하기

http://cloud.google.com 으로 들어가서 구글 클라우드 서비스에 가입을 한후에,  로그인을 해서 아래 그림 처럼 결재 메뉴에서 빌링 정보를 입력한다 (신용 카드 정보 입력)



계정이 생성되면 자동으로 $300 의 무료 사용권이 생성되고, 이 금액은 60일동안 사용할 수 있다. (60일이 지나면 자동으로 소멸된다. ).

신용 카드 정보를 넣었더라도, 사용자가 직접 과금이 되는 플랜으로 업그레이드를 하지 않는 이상 과금이 되지 않으니 이 부분은 걱정하지 말기 바란다.

프로젝트 생성

구글 클라우드는 VM이나 각종 자원들을 프로젝트라는 개념으로 묶어서 사용한다. 처음 계정을 생성했으면 프로젝트가 없기 때문에 프로젝트를 생성하자.

아래 그림과 같이 상단 우측 메뉴에 프로젝트 생성 메뉴가 있다.


프로젝트 생성을 선택한 후 아래와 같이 프로젝트 이름을 입력하면 프로젝트가 생성된다.


빅쿼리 콘솔로 이동하기

프로젝트가 생성되었으면 메뉴에서 아래 그림과 같이 BigQuery 메뉴를 선택하게 되면 빅쿼리 웹 콘솔로 이동이 된다.




빅쿼리 메뉴로 들어가면 다음과 같은 작업 창이 나온다.




좌측은 프로젝트와 프로젝트에 속한 데이타셋과 테이블 리스트가 나온다.

나중에 데이타 모델을 다시 설명하겠지만, 데이타 셋 (dataset)은 RDBMS의 db와 같은 개념으로 테이블의 집합이라고 보면 되고, 그 안에 개별 테이블들이 들어가 있다.

우측 상단 쿼리 입력창에는 SQL을 입력해서 쿼리를 실행하고, 우측 아래에는 쿼리 결과를 볼 수 있다.

쿼리 실행

그러면 실제로 간단한 쿼리를 수행해보자

빅쿼리에서는 테스트를 위해서 몇가지 데이타 셋을 공개로 해놓았는데, bigquery-samples라는 데이타 셋에서 1000억개의 레코드를 가지고 있는  wikipedia_benchmark.Wiki100B 테이블에서, 위키 페이지 제목이 “Seoul”또는 “seoul”인 페이지의 제목과 뷰수를 쿼리를 해본다.


다음과 같이 쿼리를 입력하고


select title,sum(views) as views

from [bigquery-samples:wikipedia_benchmark.Wiki100B]

where regexp_match(title,'[Ss]eoul')

group by title

order by views desc;


쿼리 입력창 하단에 체크 마크를 누르면 다음과 같은 화면이 출력된다.


쿼리를 수행하기 전에, 쿼리가 제대로 되었는지 확인을 해주고, 위와 같이

Valid: This query will process 3.64 TB when run.”

3.64 TB를 스캐닝 할것임을 알려준다. (이를 통해서 쿼리 수행 비용을 예측해볼 수 있다.)


“Run Query” 버튼을 눌러서 쿼리를 수행하면 다음과 같은 결과를 얻을 수 있다.


RUN QUERY 버튼 가장 우측에 총 3.64TB를 처리했고, 총 수행 시간은 38.9초가 걸렸음을 확인할 수 있다.

그리고, 아래 쿼리 결과가 나온다.

Seoul 로 된 페이지가 11258720회 조회되었고, Seoul_National_University가 다음으로 894040회, FC_Seoul이 802570회 조회 된것을 확인할 수 있다.


지금까지 간략하게나마 빅쿼리에 대한 소개와 주요 특징 그리고 간단한 사용법을 소개했다.

다음 글에서는 빅쿼리의 내부 아키텍쳐에 대해서 설명하도록 한다.


Storm을 이용한 실시간 데이타 처리 #1-데이타 스트림 개념 이해하기

조대협(http://bcho.tistory.com)

 


Apache Storm 은 실시간 데이타 처리를 위한 시스템이다.. 나온지는 꽤 오래됬지만 요즘 빅데이타와 실시간 분석과 함께 다시 화두가 되고 있는데, Apache Storm에 대해서 알아보도록 하자.


 

스트리밍 처리에 대한 개념

Storm을 이해하기 위해서는 먼저 데이타 스트리밍9(Data Streaming) 에 대한 개념을 이해 해야 한다. 비유를 하자면 데이타가 흘러가는 강(river)와 같은 의미라고나 할까?

예를 들어보면, 트위터에서 생성되는 데이타 피드들은 일종의 데이타 스트림이다, 시간이 지나감에 따라서 끊임 없이 데이타 들이 생성된다.



<그림. 트위터의 트윗 데이타 스트림>

또는 공장의 팬베이어 벨트의 센서를 통해서 생성되는 각종 계측 값들도 데이타 스트림으로 볼 수 있다. 신용카드 트랜젝션, 전자 상거래 사이트의 구매 트렌젝션등. 이러한 데이타는 시간을 축으로 하여, 계속해서 생성되는 데이타 이다.

그렇다면 이런 스트림 데이타로 무엇을 할 수 있는가? 다시 트위터를 예를 들어보자


트위터 스트림을 이용한 소셜 마케팅 반응 분석

트위터 데이타 피드 스트림을 모니러링 하면서, 스마트폰과 관련된 키워드가 나오면 A社와 B社에 대한 스트림만을 수집하여, 각사별로 피드의 내용이 긍정적인지 부정적인지를 판단하는 시스템이 있다고 하자

이를 데이타 스트림 분석 개념으로 표현하면 다음과 같다.



<그림. 트위터 데이타 스트림 분석을 통한 제품 반응 분석>

먼저 스마트폰 관련 데이타 스트림을 모두 수집한다.

그 중에서, A사가 언급된 피드와, B사가 언급된 피드를 분리한다.

각 피드에 대해서, 문자의 구문이 긍정인지 부정인지를 판단한다. 예를 들어, Good,Awesome 등의 긍정적인 단어가 들어가 있으면 긍정적인 것으로 판단하고, Bad,fuck 등 부정적인 단어가 들어가 있으면 부정적인 반응으로 판단한다.

그후에, 각각의 반응에 대해서 카운트를 한다음에, 시계열(Time series) 그래프로 대쉬 보드에 표현한다.

정리해보면, 계속해서 들어오는 데이타들의 흐름을 여러 방향으로 분류하고 처리해서 계속해서 결과를 업데이트해주는 일종의 work flow와 같은 개념을 가지는 것이 데이타 스트림 처리이다.

이러한 데이타 스트림 처리는 일종의 중첩 함수의 개념으로도 볼수 있는데,

A사 제품에 대한 긍정적인 반응 = [“긍정적인 단어 필터 함수”(
[“A
사 제품에 대한 필터 함수”](“트위터의 스마트폰 관련 피드”)
);

와 같은 형태로 나타내어 줄 수 있다. (결국 실시간 데이타 스트림 처리는 실시간으로 들어오는 데이타 에 대해서 중첩으로 여러개의 함수 처리를 순차적으로 하는 것이다.)

물론 이러한 작업들은 데이타를 모두 모아서 수집해놓고 데이타 베이스에 저장해놓고 주기적으로 배치르 통해서 분석하는 것도 가능하다. 그러나 선거나 마케팅과 같이 실시간 대응이 중요한 경우에는 실시간 데이타 분석이 중요하기 때문에, 데이타 스트림을 실시간으로 분석하는 것이 필요하다.


이벤트 감지를 통한 신용 카드 이상 거래 감지

이번에는 데이타 스트림 처리에 이벤트 처리라는 개념을 추가해보자.

신용 카드 결재시, 결재 내용을 저장하는 것 뿐만 아니라, 사용자의 결재 내역이 문제가 없는지를 검출하는 시나리오이다.

이상거래 검출 요건은, 사용자가 평상시 결재 액보다 많은 금액이 결재되거나, 사용자가 결재할때, 물리적으로 이상한 곳에서 결재가 이루어 졌을때, 여를 들어 서울에서 카드 결재 후에, 10분 후에 부산에서 같은 카드로 결재가 이루어진 경우는 이상 거래로 검출하는 시스템이다.

다음 플로우를 보자



<그림. 실시간 신용 카드 이상 거래 검출>

신용 카드 결재 정보 데이타 스트림이 들어오면 결재 내용을 저장하는 스트림과 이상 거래를 검출하는 스트림 두가지로 분리되서 스트림이 처리된다.

이상 거래 검출 스트림에서는, 사용자의 구매 패턴을 분석하고 계속해서 학습한 후에, 이를 통해서 사용자 구매 패턴 검증단계에서 구매 금액이 일상적인 구매 패턴인지를 판단한다.

판단의 기준은 머신 러닝을 이용하여, 해당 사용자의 구매 패턴을 알고리즘화 해놓을 수 있다.

다음으로 결재 위치를 저장한 다음에 사용자 결재 위치 검증단계에서 지난 결재 가맹점 정보를 기반으로 이번 결재의 가맹점의 위치가 시간상으로 이동이 가능한 곳인지를 판단한다.

서울 강남에서 결재 한 후에, 30분후 서울 잠실에서 결재한 것은 정상 거래로 보지만 서울에서 결재했다가 10분후에 부산에서 결재 하는 것등은 이상거래로 취급한다.

이러한 시나리오는 결재가 이루지는 동시에 같이 실행되어야 하기 때문에 데이타 베이스 기반의 배치 처리로는 불가능하면 실시간 데이타 스트림을 분석해야 한다.

이외에도 시스템의 로그 스트림을 이용한 분석이라던지, 이벤트 처리, 데이타 가공, 머신러닝등 아주 다양한 부분의 실시간 처리에 데이타 스트리밍 처리는 사용될 수 있다.


데이타 스트림 처리를 이루는 기술들

앞의 신용카드 이상 거래 검출 시나리오를 보면 데이타 스트림을 분석하는데, 몇가지 추가적인 기술일 사용되었음을 볼 수 있다.


스트리밍 처리

먼저 데이타 스트림을 처리하기 위한 스트리밍 시스템이 필요하다, 데이타 스트림을 여러가지 경로로 분산하여, 각각의 단계별로 처리(사용자 구매 패턴 분석, 구매 패턴 검증,…)하는 워크 플로우 기능이 필요하다.

이러한 프레임웍은 앞으로 살펴보고자 하는 Apache Storm이 대표적이고 근래에는 Apache Spark도 많이 사용되고 있다.


대용량 분산 큐

다음으로 대용량으로 여러 경로를 통해서 들어오는 데이타를 수집하기 위한 터널(수집용 깔때기)이 필요한데, 비동기 처리를 위한 큐가 적절하다 그중에서도, 많은 데이타를 동시에 처리하기 위한 대용량 지원성이 필수적인데, 이를 위해서는 Kafka와 같은 대용량 분산 큐 솔루션이 적절하다.


머신러닝

사용자의 거래 패턴을 분석하여, 이상거래인지 검출을 하려면, 컴퓨터에서 사용자의 거래 패턴을 알려줄 필요가 있는데 이는 기존의 사용자 거래 내역을 학습하여 하여 패턴을 분석해내는 머신러닝 기술이 필요하며, 이는 Apache Mahout, Microsoft Azure ML (Machine Learning), Spark ML등이 있다.


이벤트 처리

마지막으로 이벤트 처리 부분이 있는데

카드 결재 장소가 지난 장소에 비해서 시간*20km 이상이면 이상거래로 판단해라”. 라는 이벤트를 정의할 수 있다. (※ 한시간에 20km를 이동할 수 있다고 가정)

특정 조건에 대해서 이벤트를 발생 시키는 것을 이벤트 처리라고 이런 처리를 CEP (Complex Event Processing) 라고 부르고, 이를 구현한 아키텍쳐를 EDA (Event Driven Architecture)라고 한다.

이러한 이벤트 처리 프레임웍으로는 JBoss Drool이나, Esper와 같은 프레임웍이 있다.


이외에도 데이타 스트림 분석을 위해서는 시나리오에 따라 더 많은 기술이 사용된다. 추가 사용되는 기술은 기회가 되면 향후에 추가로 소개하도록 한다. (아직도 공부 中)

지금까지 간략하게 나마 데이타 스트림의 개념과 이를 처리 하는 방법에 대해서 알아보았다.

그러면 다음에는 이런 데이타 스트림을 처리하기 위한 대표적인 프레임웍중의 하나인 apache storm에 대해서 소개해도록 한다.