블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 

'빅데이타'에 해당되는 글 43

  1. 2017.01.09 딥러닝을 이용한 숫자 이미지 인식 #1/2-학습 (4)
  2. 2016.11.15 파이어베이스를 이용한 유니티 게임 로그 분석 (2)
  3. 2016.10.05 수학포기자를 위한 딥러닝-#3 텐서플로우로 선형회귀 학습을 구현해보자 (2)
  4. 2016.10.04 수학포기자를 위한 딥러닝-#1 머신러닝과 딥러닝 개요 (4)
  5. 2016.09.20 노트7의 소셜 반응을 분석해 보았다. - #2 구현하기
  6. 2016.09.20 노트7의 소셜 반응을 분석해 보았다. (3)
  7. 2016.09.09 트위터 피드 실시간 분석 시스템 디자인
  8. 2016.09.01 파이어베이스 애널러틱스를 이용한 모바일 데이타 분석- #3 빅쿼리에 연동하여 모든 데이타를 분석하기
  9. 2016.08.30 파이어베이스 애널러틱스를 이용한 모바일 데이타 분석 #2-분석 지표 이해하기 (1)
  10. 2016.08.29 파이어베이스 애널러틱스를 이용한 모바일 데이타 분석 #1-Hello Firebase (2)
  11. 2016.08.25 실시간 데이타 분석 플랫폼 Dataflow - #5 데이타 플로우 프로그래밍 모델 (1)
  12. 2016.08.09 실시간 데이타 분석 플랫폼 Dataflow - #4 개발환경 설정하기
  13. 2016.08.01 빅쿼리를 이용하여 두시간만에 트위터 실시간 데이타를 분석하는 대쉬보드 만들기 (3)
  14. 2016.07.31 빅데이타 수집을 위한 데이타 수집 솔루션 Embulk 소개
  15. 2016.07.22 데이타 스트리밍 분석 플랫폼 DataFlow - #2 개념 소개 (2/2) (1)
  16. 2016.07.17 데이타 스트리밍 분석 플랫폼 DataFlow - #2 개념 소개 (1/2)
  17. 2016.07.17 데이타 스트리밍 분석 플랫폼 dataflow - #1. 소개
  18. 2016.07.04 실시간 빅데이타 처리를 위한 스트리밍 처리의 개념 (1)
  19. 2016.06.18 빅쿼리-#3 데이타 구조와 접근(공유) (3)
  20. 2016.06.16 구글 빅데이타 플랫폼 빅쿼리 아키텍쳐 소개
 

딥러닝을 이용한 숫자 이미지 인식 #1/2


조대협 (http://bcho.tistory.com)


지난 글(http://bcho.tistory.com/1154 ) 을 통해서 소프트맥스 회귀를 통해서, 숫자를 인식하는 모델을 만들어서 학습 시켜 봤다.

이번글에서는 소프트맥스보다 정확성이 높은 컨볼루셔널 네트워크를 이용해서 숫자 이미지를 인식하는 모델을 만들어 보겠다.


이 글의 목적은 CNN 자체의 설명이나, 수학적 이론에 대한 이해가 목적이 아니다. 최소한의 수학적 지식만 가지고, CNN 네트워크 모델을 텐서플로우로 구현하는데에 그 목적을 둔다. CNN을 이해하기 위해서는 Softmax 등의 함수를 이해하는게 좋기 때문에 가급적이면 http://bcho.tistory.com/1154 예제를 먼저 보고 이 문서를 보는게 좋다. 그 다음에 CNN 모델에 대한 개념적인 이해를 위해서 http://bcho.tistory.com/1149  문서를 참고하고 이 문서를 보는 것이 좋다.


이번 글은 CNN을 적용하는 것 이외에, 다음과 같은 몇가지 팁을 추가로 소개한다.

  • 학습이 된 모델을 저장하고 다시 로딩 하는 방법

  • 학습된 모델을 이용하여 실제로 주피터 노트북에서 글씨를 써보고 인식하는 방법

MNIST CNN 모델


우리가 만들고자 하는 모델은 두개의 컨볼루셔널 레이어(Convolutional layer)과, 마지막에 풀리 커넥티드 레이어 (fully connected layer)을 가지고 있는 컨볼루셔널 네트워크 모델(CNN) 이다.

모델의 모양을 그려보면 다음과 같다.


입력 데이타

입력으로 사용되는 데이타는 앞의 소프트맥스 예제에서 사용한 데이타와 동일한 손으로 쓴 숫자들이다. 각 숫자 이미지는 28x28 픽셀로 되어 있고, 흑백이미지이기 때문에 데이타는 28x28x1 행렬이 된다. (만약에 칼라 RGB라면 28x28x3이 된다.)

컨볼루셔널 계층

총 두 개의 컨볼루셔널 계층을 사용했으며, 각 계층에서 컨볼루셔널 필터를 사용해서, 특징을 추출한다음에, 액티베이션 함수 (Activation function)으로, ReLu를 적용한 후, 맥스풀링 (Max Pooling)을 이용하여, 주요 특징을 정리해낸다.

이와 같은 컨볼루셔널 필터를 두개를 중첩하여 적용하였다.

마지막 풀리 커넥티드 계층

컨볼루셔널 필터를 통해서 추출된 특징은 풀리 커넥티드 레이어(Fully connected layer)에 의해서 분류 되는데, 풀리 커넥티드 레이어는 하나의 뉴럴 네트워크를 사용하고, 그 뒤에 드롭아웃 (Dropout) 계층을 넣어서, 오버피팅(Overfitting)이 발생하는 것을 방지한다.  마지막으로 소프트맥스 (Softmax) 함수를 이용하여 0~9 열개의 숫자로 분류를 한다.


학습(트레이닝) 코드

이를 구현하기 위한 코드는 다음과 같다.


코드

import tensorflow as tf

import numpy as np

import matplotlib.pyplot as plt

from tensorflow.examples.tutorials.mnist import input_data



tf.reset_default_graph()


np.random.seed(20160704)

tf.set_random_seed(20160704)


# load data

mnist = input_data.read_data_sets("/tmp/data/", one_hot=True)


# define first layer

num_filters1 = 32


x = tf.placeholder(tf.float32, [None, 784])

x_image = tf.reshape(x, [-1,28,28,1])


W_conv1 = tf.Variable(tf.truncated_normal([5,5,1,num_filters1],

                                         stddev=0.1))

h_conv1 = tf.nn.conv2d(x_image, W_conv1,

                      strides=[1,1,1,1], padding='SAME')


b_conv1 = tf.Variable(tf.constant(0.1, shape=[num_filters1]))

h_conv1_cutoff = tf.nn.relu(h_conv1 + b_conv1)


h_pool1 = tf.nn.max_pool(h_conv1_cutoff, ksize=[1,2,2,1],

                        strides=[1,2,2,1], padding='SAME')


# define second layer

num_filters2 = 64


W_conv2 = tf.Variable(

           tf.truncated_normal([5,5,num_filters1,num_filters2],

                               stddev=0.1))

h_conv2 = tf.nn.conv2d(h_pool1, W_conv2,

                      strides=[1,1,1,1], padding='SAME')


b_conv2 = tf.Variable(tf.constant(0.1, shape=[num_filters2]))

h_conv2_cutoff = tf.nn.relu(h_conv2 + b_conv2)


h_pool2 = tf.nn.max_pool(h_conv2_cutoff, ksize=[1,2,2,1],

                        strides=[1,2,2,1], padding='SAME')


# define fully connected layer

h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*num_filters2])


num_units1 = 7*7*num_filters2

num_units2 = 1024


w2 = tf.Variable(tf.truncated_normal([num_units1, num_units2]))

b2 = tf.Variable(tf.constant(0.1, shape=[num_units2]))

hidden2 = tf.nn.relu(tf.matmul(h_pool2_flat, w2) + b2)


keep_prob = tf.placeholder(tf.float32)

hidden2_drop = tf.nn.dropout(hidden2, keep_prob)


w0 = tf.Variable(tf.zeros([num_units2, 10]))

b0 = tf.Variable(tf.zeros([10]))

k = tf.matmul(hidden2_drop, w0) + b0

p = tf.nn.softmax(k)


#define loss (cost) function

t = tf.placeholder(tf.float32, [None, 10])

loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(k,t))

train_step = tf.train.AdamOptimizer(0.0001).minimize(loss)

correct_prediction = tf.equal(tf.argmax(p, 1), tf.argmax(t, 1))

accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))


# prepare session

sess = tf.InteractiveSession()

sess.run(tf.global_variables_initializer())

saver = tf.train.Saver()


# start training

i = 0

for _ in range(1000):

   i += 1

   batch_xs, batch_ts = mnist.train.next_batch(50)

   sess.run(train_step,

            feed_dict={x:batch_xs, t:batch_ts, keep_prob:0.5})

   if i % 500 == 0:

       loss_vals, acc_vals = [], []

       for c in range(4):

           start = len(mnist.test.labels) / 4 * c

           end = len(mnist.test.labels) / 4 * (c+1)

           loss_val, acc_val = sess.run([loss, accuracy],

               feed_dict={x:mnist.test.images[start:end],

                          t:mnist.test.labels[start:end],

                          keep_prob:1.0})

           loss_vals.append(loss_val)

           acc_vals.append(acc_val)

       loss_val = np.sum(loss_vals)

       acc_val = np.mean(acc_vals)

       print ('Step: %d, Loss: %f, Accuracy: %f'

              % (i, loss_val, acc_val))


saver.save(sess, 'cnn_session')

sess.close()



데이타 로딩 파트

그러면 코드를 하나씩 살펴보도록 하자.

맨 처음 블럭은 데이타를 로딩하고 각종 변수를 초기화 하는 부분이다.

import tensorflow as tf

import numpy as np

import matplotlib.pyplot as plt

from tensorflow.examples.tutorials.mnist import input_data


#Call tf.reset_default_graph() before you build your model (and the Saver). This will ensure that the variables get the names you intended, but it will invalidate previously-created graphs.


tf.reset_default_graph()


np.random.seed(20160704)

tf.set_random_seed(20160704)


# load data

mnist = input_data.read_data_sets("/tmp/data/", one_hot=True)


Input_data 는 텐서플로우에 내장되어 있는 MNIST (손으로 쓴 숫자 데이타)셋으로, read_data_sets 메서드를 이요하여 데이타를 읽었다. 데이타 로딩 부분은 앞의 소프트맥스 MNIST와 같으니 참고하기 바란다.


여기서 특히 주목해야 할 부분은 tf.reset_default_graph()  인데, 주피터 노트북과 같은 환경에서 실행을 하게 되면, 주피터 커널을 리스타트하지 않는 이상 변수들의 컨택스트가 그대로 유지 되기 때문에, 위의 코드를 같은 커널에서 tf.reset_default_graph() 없이, 두 번 이상 실행하게 되면 에러가 난다. 그 이유는 텐서플로우 그래프를 만들어놓고, 그 그래프가 지워지지 않은 상태에서 다시 같은 그래프를 생성하면서 나오는 에러인데, tf.reset_default_graph() 메서드는 기존에 생성된 디폴트 그래프를 모두 삭제해서 그래프가 중복되는 것을 막아준다. 일반적인 파이썬 코드에서는 크게 문제가 없지만, 컨택스트가 계속 유지되는 주피터 노트북 같은 경우에는 발생할 수 있는 문제이니, 반드시 디폴트 그래프를 리셋해주도록 하자

첫번째 컨볼루셔널 계층

필터의 정의

다음은 첫번째 컨볼루셔널 계층을 정의 한다. 컨볼루셔널 계층을 이해하려면 컨볼루셔널 필터에 대한 개념을 이해해야 하는데, 다시 한번 되짚어 보자.

컨볼루셔널 계층에서 하는 일은 입력 데이타에 필터를 적용하여, 특징을 추출해 낸다.


이 예제에서 입력 받는 이미지 데이타는  28x28x1 행렬로 표현된 흑백 숫자 이미지이고, 예제 코드에서는 5x5x1 사이즈의 필터를 적용한다.

5x5x1 사이즈의 필터 32개를 적용하여, 총 32개의 특징을 추출할것이다.


코드

필터 정의 부분까지 코드로 살펴보면 다음과 같다.

# define first layer

num_filters1 = 32


x = tf.placeholder(tf.float32, [None, 784])

x_image = tf.reshape(x, [-1,28,28,1])


W_conv1 = tf.Variable(tf.truncated_normal([5,5,1,num_filters1],


x는 입력되는 이미지 데이타로, 2차원 행렬(28x28)이 아니라, 1차원 벡터(784)로 되어 있고, 데이타의 수는 무제한으로 정의하지 않았다. 그래서 placeholder정의에서 shape이 [None,784] 로 정의 되어 있다.  

예제에서는 연산을 편하게 하기 위해서 2차원 행렬을 사용할것이기 때문에, 784 1차원 벡터를 28x28x1 행렬로 변환을 해준다.

x_image는 784x무한개인 이미지 데이타 x를 , (28x28x1)이미지의 무한개 행렬로  reshape를 이용하여 변경하였다. [-1,28,28,1]은 28x28x1 행렬을 무한개(-1)로 정의하였다.


필터를 정의하는데, 필터는 앞서 설명한것과 같이 5x5x1 필터를 사용할것이고, 필터의 수는 32개이기 때문에, 필터 W_conv1의 차원(shape)은 [5,5,1,32] 가된다. (코드에서 32는 num_filters1 이라는 변수에 저장하여 사용하였다.) 그리고 W_conv1의 초기값은 [5,5,1,32] 차원을 가지는 난수를 생성하도록 tf.truncated_normal을 사용해서 임의의 수가 지정되도록 하였다.

필터 적용

필터를 정의했으면 필터를 입력 데이타(이미지)에 적용한다.


h_conv1 = tf.nn.conv2d(x_image, W_conv1,

                      strides=[1,1,1,1], padding='SAME')


필터를 적용하는 방법은 tf.nn.conv2d를 이용하면 되는데, 28x28x1 사이즈의 입력 데이타인 x_image에 앞에서 정의한 필터 W_conv1을 적용하였다.

스트라이드 (Strides)

필터는 이미지의 좌측 상단 부터 아래 그림과 같이 일정한 간격으로 이동하면서 적용된다.


이를 개념적으로 표현하면 다음과 같은 모양이 된다.


이렇게 필터를 움직이는 간격을 스트라이드 (Stride)라고 한다.

예제에서는 우측으로 한칸 그리고 끝까지 이동하면 아래로 한칸을 이동하도록 각각 가로와 세로의 스트라이드 값을 1로 세팅하였다.

코드에서 보면

h_conv1 = tf.nn.conv2d(x_image, W_conv1,

                      strides=[1,1,1,1], padding='SAME')

에서 strides=[1,1,1,1] 로 정의한것을 볼 수 있다. 맨앞과 맨뒤는 통상적으로 1을 쓰고, 두번째 1은 가로 스트라이드 값, 그리고 세번째 1은 세로 스트라이드 값이 된다.

패딩 (Padding)

위의 그림과 같이 필터를 적용하여 추출된 특징 행렬은 원래 입력된 이미지 보다 작게 된다.

연속해서 필터를 이런 방식으로 적용하다 보면 필터링 된 특징들이  작아지게되는데, 만약에 특징을  다 추출하기 전에 특징들이 의도하지 않게 유실되는 것을 막기 위해서 패딩이라는 것을 사용한다.


패딩이란, 입력된 데이타 행렬 주위로, 무의미한 값을 감싸서 원본 데이타의 크기를 크게 해서, 필터를 거치고 나온 특징 행렬의 크기가 작아지는 것을 방지한다.

또한 무의미한 값을 넣음으로써, 오버피팅이 발생하는 것을 방지할 수 있다. 코드상에서 padding 변수를 이용하여 패딩 방법을 정의하였다.


h_conv1 = tf.nn.conv2d(x_image, W_conv1,

                      strides=[1,1,1,1], padding='SAME')



padding=’SAME’을 주게 되면, 텐서플로우가 자동으로 패딩을 삽입하여 입력값과 출력값 (특징 행렬)의 크기가 같도록 한다. padding=’VALID’를 주게 되면, 패딩을 적용하지 않고 필터를 적용하여 출력값 (특징 행렬)의 크기가 작아진다.

활성함수 (Activation function)의 적용

필터 적용이 끝났으면, 이 필터링된 값에 활성함수를 적용한다. 컨볼루셔널 네트워크에서 일반적으로 사용하는 활성함수는 ReLu 함수이다.


코드

b_conv1 = tf.Variable(tf.constant(0.1, shape=[num_filters1]))

h_conv1_cutoff = tf.nn.relu(h_conv1 + b_conv1)


먼저 bias 값( y=WX+b 에서 b)인 b_conv1을 정의하고, tf.nn.relu를 이용하여, 필터된 결과(h_conv1)에 bias 값을 더한 값을 ReLu 함수로 적용하였다.

Max Pooling

추출된 특징 모두를 가지고 특징을 판단할 필요가 없이, 일부 특징만을 가지고도 특징을 판단할 수 있다. 즉 예를 들어서 고해상도의 큰 사진을 가지고도 어떤 물체를 식별할 수 있지만, 작은 사진을 가지고도 물체를 식별할 수 있다. 이렇게 특징의 수를 줄이는 방법을 서브샘플링 (sub sampling)이라고 하는데, 서브샘플링을 해서 전체 특징의 수를 의도적으로 줄이는 이유는 데이타의 크기를 줄이기 때문에, 컴퓨팅 파워를 절약할 수 있고, 데이타가 줄어드는 과정에서 데이타가 유실이 되기 때문에, 오버 피팅을 방지할 수 있다.


이러한 서브 샘플링에는 여러가지 방법이 있지만 예제에서는 맥스 풀링 (max pooling)이라는 방법을 사용했는데, 맥스 풀링은 풀링 사이즈 (mxn)로 입력데이타를 나눈후 그 중에서 가장 큰 값만을 대표값으로 추출하는 것이다.


아래 그림을 보면 원본 데이타에서 2x2 사이즈로 맥스 풀링을 해서 결과를 각 셀별로 최대값을 뽑아내었고, 이 셀을 가로 2칸씩 그리고 그다음에는 세로로 2칸씩 이동하는 stride 값을 적용하였다.


코드

h_pool1 = tf.nn.max_pool(h_conv1_cutoff, ksize=[1,2,2,1],

                        strides=[1,2,2,1], padding='SAME')


Max pooling은 tf.nn.max_pool이라는 함수를 이용해서 적용할 수 있는데, 첫번째 인자는 활성화 함수 ReLu를 적용하고 나온 결과 값인 h_conv1_cutoff 이고, 두 번째 인자인 ksize는 풀링 필터의 사이즈로 [1,2,2,1]은 2x2 크기로 묶어서 풀링을 한다는 의미이다.


다음 stride는 컨볼루셔널 필터 적용과 마찬가지로 풀링 필터를 가로와 세로로 얼마만큼씩 움직일 것인데, strides=[1,2,2,1]로, 가로로 2칸, 세로로 2칸씩 움직이도록 정의하였다.


행렬의 차원 변환

텐서플로우를 이용해서 CNN을 만들때 각각 개별의 알고리즘을 이해할 필요는 없지만 각 계층을 추가하거나 연결하기 위해서는 행렬의 차원이 어떻게 바뀌는지는 이해해야 한다.

다음 그림을 보자


첫번째 컨볼루셔널 계층은 위의 그림과 같이, 처음에 28x28x1 의 이미지가 들어가면 32개의 컨볼루셔널 필터 W를 적용하게 되고, 각각은 28x28x1의 결과 행렬을 만들어낸다. 컨볼루셔널 필터를 거치게 되면 결과 행렬의 크기는 작아져야 정상이지만, 결과 행렬의 크기를 입력 행렬의 크기와 동일하게 유지하도록 padding=’SAME’으로 설정하였다.

다음으로 bias 값 b를 더한후 (위의 그림에는 생략하였다) 에 이 값에 액티베이션 함수 ReLu를 적용하고 나면 행렬 크기에 변화 없이 28x28x1 행렬 32개가 나온다. 이 각각의 행렬에 size가 2x2이고, stride가 2인 맥스풀링 필터를 적용하게 되면 각각의 행렬의 크기가 반으로 줄어들어 14x14x1 행렬 32개가 리턴된다.


두번째 컨볼루셔널 계층


이제 두번째 컨볼루셔널 계층을 살펴보자. 첫번째 컨볼루셔널 계층과 다를 것이 없다.


코드

# define second layer

num_filters2 = 64


W_conv2 = tf.Variable(

           tf.truncated_normal([5,5,num_filters1,num_filters2],

                               stddev=0.1))

h_conv2 = tf.nn.conv2d(h_pool1, W_conv2,

                      strides=[1,1,1,1], padding='SAME')


b_conv2 = tf.Variable(tf.constant(0.1, shape=[num_filters2]))

h_conv2_cutoff = tf.nn.relu(h_conv2 + b_conv2)


h_pool2 = tf.nn.max_pool(h_conv2_cutoff, ksize=[1,2,2,1],

                        strides=[1,2,2,1], padding='SAME')


단 필터값인 W_conv2의 차원이 [5,5,32,64] ([5,5,num_filters1,num_filters2] 부분 )로 변경되었다.


W_conv2 = tf.Variable(

           tf.truncated_normal([5,5,num_filters1,num_filters2],

                               stddev=0.1))


필터의 사이즈가 5x5이고, 입력되는 값이 32개이기 때문에, 32가 들어가고, 총 64개의 필터를 적용하기 때문에 마지막 부분이 64가 된다.

첫번째 필터와 똑같이 stride를 1,1을 줘서 가로,세로로 각각 1씩 움직이고, padding=’SAME’으로 입력과 출력 사이즈를 같게 하였다.


h_pool2 = tf.nn.max_pool(h_conv2_cutoff, ksize=[1,2,2,1],

                        strides=[1,2,2,1], padding='SAME')


맥스풀링 역시 첫번째 필터와 마찬가지로 2,2 사이즈의 필터(ksize=[1,2,2,1]) 를 적용하고 stride값을 2,2로 줘서 (strides=[1,2,2,1]) 가로 세로로 두칸씩 움직이게 하여 결과의 크기가 반으로 줄어들게 하였다.


14x14 크기의 입력값 32개가 들어가서, 7x7 크기의 행렬 64개가 리턴된다.

풀리 커넥티드 계층

두개의 컨볼루셔널 계층을 통해서 특징을 뽑아냈으면, 이 특징을 가지고 입력된 이미지가 0~9 중 어느 숫자인지를 풀리 커넥티드 계층 (Fully connected layer)를 통해서 판단한다.


코드

# define fully connected layer

h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*num_filters2])


num_units1 = 7*7*num_filters2

num_units2 = 1024


w2 = tf.Variable(tf.truncated_normal([num_units1, num_units2]))

b2 = tf.Variable(tf.constant(0.1, shape=[num_units2]))

hidden2 = tf.nn.relu(tf.matmul(h_pool2_flat, w2) + b2)


keep_prob = tf.placeholder(tf.float32)

hidden2_drop = tf.nn.dropout(hidden2, keep_prob)


w0 = tf.Variable(tf.zeros([num_units2, 10]))

b0 = tf.Variable(tf.zeros([10]))

k = tf.matmul(hidden2_drop, w0) + b0

p = tf.nn.softmax(k)


입력된 64개의 7x7 행렬을 1차원 행렬로 변환한다.


h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*num_filters2])


다음으로 풀리 커넥티드 레이어에 넣는데, 이때 입력값은 64x7x7 개의 벡터 값을 1024개의 뉴런을 이용하여 학습한다.


w2 = tf.Variable(tf.truncated_normal([num_units1, num_units2]))

b2 = tf.Variable(tf.constant(0.1, shape=[num_units2]))


그래서 w2의 값은 [num_units1,num_units2]로 num_units1은 64x7x7 로 입력값의 수를, num_unit2는 뉴런의 수를 나타낸다. 다음 아래와 같이 이 뉴런으로 계산을 한 후 액티베이션 함수 ReLu를 적용한다.


hidden2 = tf.nn.relu(tf.matmul(h_pool2_flat, w2) + b2)


다음 레이어에서는 드롭 아웃을 정의하는데, 드롭 아웃은 오버피팅(과적합)을 막기 위한 계층으로, 원리는 다음 그림과 같이 몇몇 노드간의 연결을 끊어서 학습된 데이타가 도달하지 않도록 하여서 오버피팅이 발생하는 것을 방지하는 기법이다.


출처 : http://cs231n.github.io/neural-networks-2/


텐서 플로우에서 드롭 아웃을 적용하는 것은 매우 간단하다. 아래 코드와 같이 tf.nn.dropout 이라는 함수를 이용하여, 앞의 네트워크에서 전달된 값 (hidden2)를 넣고 keep_prob에, 연결 비율을 넣으면 된다.

keep_prob = tf.placeholder(tf.float32)

hidden2_drop = tf.nn.dropout(hidden2, keep_prob)


연결 비율이란 네트워크가 전체가 다 연결되어 있으면 1.0, 만약에 50%를 드롭아웃 시키면 0.5 식으로 입력한다.

드롭 아웃이 끝난후에는 결과를 가지고 소프트맥스 함수를 이용하여 10개의 카테고리로 분류한다.


w0 = tf.Variable(tf.zeros([num_units2, 10]))

b0 = tf.Variable(tf.zeros([10]))

k = tf.matmul(hidden2_drop, w0) + b0

p = tf.nn.softmax(k)

비용 함수 정의

여기까지 모델 정의가 끝났다. 이제 이 모델을 학습 시키기 위해서 비용함수(코스트 함수)를 정의해보자.

코스트 함수는 크로스엔트로피 함수를 이용한다.

#define loss (cost) function

t = tf.placeholder(tf.float32, [None, 10])

loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(k,t))

train_step = tf.train.AdamOptimizer(0.0001).minimize(loss)


k는 앞의 모델에 의해서 앞의 모델에서

k = tf.matmul(hidden2_drop, w0) + b0

p = tf.nn.softmax(k)


으로 softmax를 적용하기 전의 값이다.  Tf.nn.softmax_cross_entropy_with_logits 는 softmax가 포함되어 있는 함수이기 때문에, p를 적용하게 되면 softmax 함수가 중첩 적용되기 때문에, softmax 적용전의 값인 k 를 넣었다.


WARNING: This op expects unscaled logits, since it performs a softmax on logits internally for efficiency. Do not call this op with the output of softmax, as it will produce incorrect results

https://github.com/tensorflow/tensorflow/blob/master/tensorflow/g3doc/api_docs/python/functions_and_classes/shard7/tf.nn.softmax_cross_entropy_with_logits.md


t는 플레이스 홀더로 정의하였는데, 나중에 학습 데이타 셋에서 읽을 라벨 (그 그림이 0..9 중 어느 숫자인지)이다.


그리고 이 비용 함수를 최적화 하기 위해서 최적화 함수 AdamOptimizer를 사용하였다.

(앞의 소프트맥스 예제에서는 GradientOptimizer를 사용하였는데, 일반적으로 AdamOptimizer가 좀 더 무난하다.)

학습

이제 모델 정의와, 모델의 비용함수와 최적화 함수까지 다 정의하였다. 그러면 이 그래프들을 데이타를 넣어서 학습 시켜보자.  학습은 배치 트레이닝을 이용할것이다.


학습 도중 학습의 진행상황을 보기 위해서 학습된 모델을 중간중간 테스트할것이다. 테스트할때마다 학습의 정확도를 측정하여 출력하는데, 이를 위해서 정확도를 계산하는 함수를 아래와 같이 정의한다.


#define validation function

correct_prediction = tf.equal(tf.argmax(p, 1), tf.argmax(t, 1))

accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))


correct_prediction은 학습 결과와 입력된 라벨(정답)을 비교하여 맞았는지 틀렸는지를 리턴한다.

argmax는 인자에서 가장 큰 값의 인덱스를 리턴하는데, 0~9 배열이 들어가 있기 때문에 가장 큰 값이 학습에 의해 예측된 숫자이다. p는 예측에 의한 결과 값이고, t는 라벨 값이다 이 두 값을 비교하여 가장 큰 값이 있는 인덱스가 일치하면 예측이 성공한것이다.

correct_pediction은 bool 값이기 때문에, 이 값을 숫자로 바꾸기 위해서 tf.reduce_mean을 사용하여, accuracy에 저장하였다.


이제 학습을 세션을 시작하고, 변수들을 초기화 한다.

# prepare session

sess = tf.InteractiveSession()

sess.run(tf.global_variables_initializer())

saver = tf.train.Saver()


다음 배치 학습을 시작한다.

# start training

i = 0

for _ in range(10000):

   i += 1

   batch_xs, batch_ts = mnist.train.next_batch(50)

   sess.run(train_step,

            feed_dict={x:batch_xs, t:batch_ts, keep_prob:0.5})

   if i % 500 == 0:

       loss_vals, acc_vals = [], []

       for c in range(4):

           start = len(mnist.test.labels) / 4 * c

           end = len(mnist.test.labels) / 4 * (c+1)

           loss_val, acc_val = sess.run([loss, accuracy],

               feed_dict={x:mnist.test.images[start:end],

                          t:mnist.test.labels[start:end],

                          keep_prob:1.0})

           loss_vals.append(loss_val)

           acc_vals.append(acc_val)

       loss_val = np.sum(loss_vals)

       acc_val = np.mean(acc_vals)

       print ('Step: %d, Loss: %f, Accuracy: %f'

              % (i, loss_val, acc_val))


학습은 10,000번 루프를 돌면서 한번에 50개씩 배치로 데이타를 읽어서 학습을 진행하고, 500 번째 마다 중각 학습 결과를 출력한다. 중간 학습 결과에서는 10,000 중 몇번째 학습인지와, 비용값 그리고 정확도를 출력해준다.


코드를 보자


   batch_xs, batch_ts = mnist.train.next_batch(50)


MNIST 학습용 데이타 셋에서 50개 단위로 데이타를 읽는다. batch_xs에는 학습에 사용할 28x28x1 사이즈의 이미지와, batch_ts에는 그 이미지에 대한 라벨 (0..9중 어떤 수인지) 가 들어 있다.

읽은 데이타를 feed_dict를 통해서 피딩(입력)하고 트레이닝 세션을 시작한다.


  sess.run(train_step,

            feed_dict={x:batch_xs, t:batch_ts, keep_prob:0.5})


이때 마지막 인자에 keep_prob를 0.5로 피딩하는 것을 볼 수 있는데, keep_prob는 앞의 드롭아웃 계층에서 정의한 변수로 드롭아웃을 거치지 않을 비율을 정의한다. 여기서는 0.5 즉 50%의 네트워크를 인위적으로 끊도록 하였다.


배치로 학습을 진행하다가 500번 마다 중간중간 정확도와 학습 비용을 계산하여 출력한다.

   if i % 500 == 0:

       loss_vals, acc_vals = [], []


여기서 주목할 점은 아래 코드 처럼 한번에 검증을 하지 않고 테스트 데이타를 4등분 한후, 1/4씩 테스트 데이타를 로딩해서 학습비용(loss)와 학습 정확도(accuracy)를 계산하는 것을 볼 수 있다.


       for c in range(4):

           start = len(mnist.test.labels) / 4 * c

           end = len(mnist.test.labels) / 4 * (c+1)

           loss_val, acc_val = sess.run([loss, accuracy],

               feed_dict={x:mnist.test.images[start:end],

                          t:mnist.test.labels[start:end],

                          keep_prob:1.0})

           loss_vals.append(loss_val)

           acc_vals.append(acc_val)


이유는 한꺼번에 많은 데이타를 로딩해서 검증을 할 경우 메모리 문제가 생길 수 있기 때문에, 4번에 나눠 걸쳐서 읽고 검증한 다음에 아래와 같이 학습 비용은 4번의 학습 비용을 합하고, 정확도는 4번의 학습 정확도를 평균으로 내어 출력하였다.


       loss_val = np.sum(loss_vals)

       acc_val = np.mean(acc_vals)

       print ('Step: %d, Loss: %f, Accuracy: %f'

              % (i, loss_val, acc_val))

학습 결과 저장

학습을 통해서 최적의 W와 b값을 구했으면 이 값을 예측에 이용해야 하는데, W 값들이 많고, 이를 일일이 출력해서 파일로 저장하는 것도 번거롭고 해서, 텐서플로우에서는 학습된 모델을 저장할 수 있는 기능을 제공한다. 학습을 통해서 계산된 모든 변수 값을 저장할 수 있는데,  앞에서 세션을 생성할때 생성한 Saver (saver = tf.train.Saver())를 이용하면 현재 학습 세션을  저장할 수 있다.


코드

saver.save(sess, 'cnn_session')

sess.close()


이렇게 하면 현재 디렉토리에 cnn_session* 형태의 파일로 학습된 세션 값들이 저장된다.

그래서 추후 예측을 할때 다시 학습할 필요 없이 이 파일을 로딩해서, 모델의 값들을 복귀한 후에, 예측을 할 수 있다. 이 파일을 읽어서 예측을 하는 것은 다음글에서 다루기로 한다.


예제 코드 : https://github.com/bwcho75/tensorflowML/blob/master/MNIST_CNN_Training.ipynb


저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

파이어베이스를 이용한 유니티 게임 로그 분석


조대협 (http://bcho.tistory.com)

모바일 로그 분석

일반적으로 모바일 로그 분석은 클라우드 기반의 무료 솔루션을 이용하다가 자체 구축으로 가는 경우가 많다.

클라우드 기반의 무료 로그 분석 솔루션으로는 구글 애널러틱스, 야후의 플러리, 트위터의 패브릭 그리고 구글의 파이어베이스 등이 있다.

이런 무료 로그 분석 솔루션들을 사용이 매우 간편하고, 핵심 지표를 쉽게 뽑아 줄 수 있으며, 별도의 운영이 필요 없다는 장점을 가지고 있다.

그러나 이런 클라우드 기반의 무료 솔루션의 경우에는 요약된 정보들만 볼 수 있고 또한 내가 원하는 지표를 마음대로 지정을 할 수 없기 때문에, 어느정도 서비스가 성장하고 팀의 여력이 되면 별도의 로그 수집 및 분석 솔루션을 만드는 것이 일반적이다.

오픈 소스 기반의 분석 솔루션

오픈 소스를 조합해서 모바일 로그 수집 시스템을 만들면 대략 다음과 같은 모양이 된다.


API 서버에서 로그를 수집해서 카프카등의 큐를 통해서 로그를 모으고, 실시간은 스파크 스트리밍, 배치는 하둡이나 스파크 스트리밍 프레임웍을 이용합니다. 대쉬 보드는 만드는 곳도 있지만, 주피터 노트북이나 제플린 노트북과 같은 노트북을 이용한다.

요즘은 데이타 저장 및 분석에 ELK (Elastic Search + Logstash + Kibana)와 같은 솔루션도 많이 사용하고 있다.


그런데 이런 오픈 소스 솔루션 기반으로 로그 분석 시스템을 개발하면 몇가지 문제가 발생한다.

  • 개발에 드는 노력
    이런 오픈소스 스택으로 시스템을 개발하려면, 이 프레임웍에 대해서 잘 아는 전문가가 필요합다. 일반적인 스타트업에서는 구하기도 힘들고, 기업이 어느정도 규모가 되더라도 빅데이타 관련 기술을 다룰 줄 아는 엔지니어는 여전히 귀한 엔지니어이고, 이런 엔지니어들이 있다하더라도, 시스템 설계및 구현에는 수개월의 기간이 소요 되게 된다.

  • 시스템 구매와 운영
    다음 문제는 모바일 데이타는 양이 많기 때문에, 위에서 언급한 빅데이타 관련 오픈 소스를 사용하게 되는데, 이러한 시스템은 하드웨어 자원이 수십에서 수백대가 필요하거니와, 이를 설치하고 운영하는 것 역시 쉽지 않다.
    로그를 수집하고 분석하는 로직을 만들어야 하는 엔지니어들이 정작 데이타 분석 보다는 시스템 운영과 유지보수에 많은 시간을 낭비해야 한다는 문제가 발생한다.
    규모가 작은 스타트업이나 엔지니어링 능력이 되지 않는 기업들은 이런 빅데이타 분석은 엄두도 내지 못하는 상황이 되고, 디테일한 데이타 분석을 하지 못하게 되니 자연히 경쟁력이 떨어지게 될 수 있다.

  • 연산 시간
    그리고 수집 수백대의 서버를 가지고 있다하더라도, 데이타 연산 시간은 수십분에서 수시간이 소요된다. 특히 데이타 분석 서버들이 분석을 하고 있을때는 다른 분석을 하고 싶은 사람들은 연산이 끝날때 까지 기다려야 하고, 수시간을 들여서 연산한 결과라도 연산이 잘못되었으면 다시 로직을 수정해서 수시간 동안 다시 연산을 해야 한다.
    비지니스 조직 입장에서는 지표 분석 결과를 얻는데, 수시간이 걸리니 의사 결정의 민첩성이 떨어지게 된다.

클라우드 기반의 분석 솔루션

근래에 이런 빅데이타 분석이 클라우드 컴퓨팅 기술과 만나면서 한번의 큰 변화를 겪게 되는데, 흔히들 빅데이타의 민주화라고 이야기 한다.  빅데이타 분석이 클라우드 컴퓨팅과 만나면서 겪은 큰 변화는 다음과 같다 .

클라우드 스케일의 연산

먼저 스케일이 달라집니다. 클라우드의 대용량 자원을 이용하여, 연산을 하기 때문에, 훨씬 더 빠른 연산을 저 비용에 할 수 있다.

예를 들어 구글의 빅쿼리의 경우에는 1000억개의 문자열(ROW)를  Regular expression을 이용하여 스트링 Like 검색을 하고 이를 group by 로 그룹핑하여 연산 하는 쿼리를 수행할때


“8600개의 CPU, 3600개의 디스크, 350GB의 네트워크 대역폭"


이 사용이 되고, 쿼리 수행 시간은 약 20~30초, 클라우드 사용 비용은 20$ (2만원) 정도가 소요 된다.

오픈 소스 기반으로 왠만한 규모로는 동시에 단일 연산으로 이렇게 수천개의 CPU를 같이 돌릴 수 있는 인프라를 사내에 가지고 있기도 힘들뿐 더러, 이만한 리소스를 20$라는 저렴한 비용에 사용하기란 거의 불가능에 가깝다.

이런 빠른 연산으로 인해서, 현업에서는 연산 결과를 기다리지 않고 바로바로 볼 수 있고, 비용 역시 저렴하기 때문에, 어느정도 자금력과 개발력이 있는 기업이 아니더라도 고성능의 빅데이타 분석 시스템 구현이 가능하게 된다.

NoOPS

다음 장점으로는 운영이 필요 없다는 것인데, 앞에서도 설명했듯이, 오픈 소스를 이용해서 빅데이타 분석 시스템을 직접 구축한 경우에는 시스템 인스톨과, 구성, 그리고 운영에 많은 시간이 소요 되는데, 클라우드 기반의 빅데이타 솔루션은 설정과 운영을 클라우드 서비스 제공자가 대행을 하기 때문에, 엔지니어링 팀은 별도의 설정과 유지보수 없이 본연의 역할인 데이타 분석에만 집중할 수 있게 된다. (아마 직접 하둡이나 스파크 클러스터를 운영해본 사람이라면 이 의미를 잘 이해하리라 본다.)


이렇게 클라우드가 빅데이타 영역에 도입되면서 이제는 빅데이타 분석이 뛰어난 엔지니어링 지식과 자금력이 없더라도 단시간내에 저비용으로 효율적인 데이타 분석이 가능하게 되었기 때문에, 이를 빅데이타의 민주화라고 부른다.

파이어베이스 애널러틱스

파이어베이스는 얼마전에 구글이 인수해서 클라우드 서비스 형태로 제공하고 있는 통합 모바일 개발 프레임웍이다. 웹은 지원하지 않고 모바일만 지원하는 형태의 프레임웍이며, 리얼타임 데이타 베이스, 광고 네트워크 통합, 푸쉬 서비스, 사용자 개인 인증 서비스등 여러가지 기능을 가지고 있는데, 그 중에서, 파이어베이스 애널러틱스는 모바일 빅데이타 분석에 최적화된 시스템이다.

빅쿼리와 파이어베이스의 조합

게임 체인저

파이어베이스는 모바일 데이타 분석에서 거의 게임 체인저라고 할만한 기술인데, 기존의 클라우드 기반의 모바일 데이타 분석 솔루션은 가장 큰 문제점이, 개발자가 정의한 로그 이벤트 (커스텀 로그)를 수집할 수 없다는 문제와  그리고 수집한 원본 데이타를 볼 수 없기 때문에, 원하는 지표를 마음대로 수집하고 분석하는 것이 불가능했다.

그런데 파이어베이스 애널러틱스는 이 두가지 기능을 지원하기 시작하였다.

커스텀 이벤트 정의를 통해서 개발자가 원하는 로그를 손쉽게 정의해서 수집이 가능하고, 또한 수집한 로그는 모두 구글의 빅데이타 저장 및 분석 플랫폼인 빅쿼리에 저장되고 바로 분석이 가능하다.

빅쿼리

파이어베이스 애널러틱스의 데이타는 빅쿼리에 저장이 되는데, 앞에서 예를 든것과 같이, 빅쿼리는 한번 연산에 수천개의 CPU와 디스크를 사용하여, 하둡이나 스파크에서 수시간이 걸리는 연산을 불과 수십초만에 처리가 가능하다.

빅쿼리의 또 다른 장점중의 하나는 이런 연산 속도 뿐만 아니라 RDBMS와는 다르게 JSON과 같이 트리형 (계층 구조를 가지는) 데이타형을 그대로 저장하고 쿼리가 가능하다는 것이다.


빅쿼리에 대한 자세한 설명은

를 참고하기 바란다.

파이어베이스 기반의 로그 분석

파이어베이스 애널러틱스는 뒤로는 빅쿼리 연동을 통해서 모든 원본 데이타의 수집과 분석을 지원하고 앞으로는 파이어베이스 에이전트를 모바일 디바이스에 탑재 하는 방식으로 최소한의 코드 개발로 모바일 앱으로 부터 모든 데이타를 수집할 수 있다.  파이어베이스 애널러틱스는 안드로이드와 iOS 플랫폼을 지원한다.

게임 프레임웍 지원

반가운 소식중의 하나는 파이어베이스 애널러틱스가 이제 유니티3D나, 언리얼(C++) 과 같은 게임 엔진을 지원한다. 현재 두 플랫폼에 대한 지원은 베타로 공개되어 있다.

코드 예제

그러면 파이어베이스 애널러틱스를 이용해서 로그를 수집하는 코드는 어떻게 삽입을 할까? 안드로이드와 유니티 3D의 예를 들어서 보자.

안드로이드 예제 코드

상세한 코드는 http://bcho.tistory.com/1131 를 참고하기 바란다.

코드 부분을 발췌해서 보면 다음과 같다.


//생략

:


import com.google.firebase.analytics.FirebaseAnalytics;


public class MainActivity extends AppCompatActivity {


 // add firebase analytics object

 private FirebaseAnalytics mFirebaseAnalytics;


   public void onSendEvent(View view){

     // 중간 생략

     Bundle bundle = new Bundle();

     bundle.putString(FirebaseAnalytics.Param.ITEM_ID, contentsId);

     bundle.putString(FirebaseAnalytics.Param.ITEM_NAME, contentsName);

     bundle.putString(FirebaseAnalytics.Param.CONTENT_TYPE, contentsCategory);

     mFirebaseAnalytics.logEvent(FirebaseAnalytics.Event.SELECT_CONTENT, bundle);


 }

}



기본적으로 gradle 빌드 스크립트에 파이어베이스 애널러틱스 모듈을 import 하고, FirebaseAnalytics 객체만 선언해주면 기본적인 사용자 로그 (앱 실행, 종료등), 일일 방문자, 동시 접속자, 접속 디바이스 종류, 사용자 연령과 성별들을 모두 수집해준다.

빌드 스크립트 수정 및 소스코드에 한줄의 코드만 추가해주면 된다.

다음으로, 각각의 이벤트를 추가하고자 한다면, 위와 같이 Bundle 객체를 정의해서, 넘기고자 하는 인자를 정의해주고 logEvent라는 메서드를 호출해주면 파이어베이스로 로그가 전달된다.

유니티 3D 예제 코드

유니티 3D에서 파이어베이스에 로그를 남기는 것도 다르지 않다.

다음 코드를 보자


       Firebase.Analytics.Parameter[] param = {

           new Firebase.Analytics.Parameter("sessionid", sessionid),

           new Firebase.Analytics.Parameter("score", (string)ApplicationModel.score.ToString())

       };

       Firebase.Analytics.FirebaseAnalytics.LogEvent(ApplicationModel.EVENT.END_SESSION, param);


Parameter라는 배열로, 파이어베이스에 남길 로그의 인자들을 정의한후에, LogEvent 메서드를 이용하여 이벤트 명과, 앞에서 정의된 인자들 (Parameter)를 남겨주면 로그는 자동으로 파이어베이스로 전달된다.


파이어베이스 애널러틱스를 이용한 모바일 데이타 분석

그러면 파이어베이스를 이용하여 모바일 로그 분석을 어떻게 할 수 있는지 알아보자. 마침 유니티 3D가 얼마전 부터 베타로 지원이 되기 때문에, 간단한 게임을 이용한 로그 수집을 설명한다.

샘플 게임 설명

샘플에 사용한 게임은 간단한 RPG 형태의 게임으로 다음과 같이 구성된다.



시작 화면

시작화면에서는 로그 분석을 위해서, 사용자의 나이와 성별을 입력 받는다.


게임 화면

다음 게임이 시작되면, 화면을 터치하여 토끼 캐릭터를 이동 시키고, 돼지를 클릭하면 돼지를 공격한다.

돼지를 공격할때 마다 데미지는 돼지의 종류에 따라 일정 값 범위내에서 랜덤으로 판정되고, 생명 값이 남아있지 않으면 돼지가 죽게 된다.

맵내에 돼지는 7개가 유지되도록 되어 있으며, 돼지가 줄면, 돼지는 하늘에서 부터 떨어지게 되어 있다.

게임은 120초 동안 진행되며, 120초가 지나면 자동으로 종료된다.

종료 화면

게임이 종료되면 점수를 표시한다.

데이타  분석 지표 디자인

그러면 이 게임으로 어떻게 데이타를 분석할것인지에 대해서 고민해보자.

일일 접속 사용자나 사용자에 대한 사용 시간,횟수등은 파이어베이스 애널러틱스에서 기본적으로 수집이 되기 때문에, 조금 더 의미 있는 데이타를 수집해보도록 한다.

캐릭터 이동 히트맵

이 예제에서 다소 중점을 둔 부분중의 하나는 캐릭터 이동 히트맵이다.

게임에서 난이도 조정등에 사용할 수 있는 정보중의 하나가 NPC 캐릭터의 이동 동선과, 플레이어 캐릭터의 이동 동선이다. 주로 플레이어가 죽는 위치를 데드존 (Dead zone)이라고 하면, 이 데드존 위치를 찾아낼 수 있고, 이 데드존에서 플레이어와 NPC의 타입,레벨 등을 조사하여 난이도를 조정한다거나, 또는 AI(인공지능) 플레이어 캐릭터의 경우에는 이동 동선을 추적함으로써 맵 내에서 AI가 원하는 데로 잘 움직이는지를 추적해볼 수 있다.

아래는 데드존을 기반으로 캐릭터와 NPC의 레벨을 분석해놓은 예제이다.


<그림. 게임맵상에서 데드존의 플레이어와 NPC 캐릭터간의 레벨 분석 >


아래는 흥미로운 분석중의 한예인데, 게임맵에서, 각 위치별로 자주 발생하는 채팅 메세지를 표시한 내용이다.




<그림. 게임맵상에서 자주 사용되는 채팅 메세지 분석>


그림 출처 : http://www.cs.cornell.edu/courses/cs4152/2013sp/sessions/15-GameAnalytics.pdf


이런 시스템 역시 쉽게 개발이 가능한데, 파이어베이스 애널러틱스를 이용하여 채팅 로그를 수집한 후, 자연어 분석 API를 이용하면, 명사와 형용사등을 추출하여 자주 오가는 말들을 통계를 낼 수 있다.

http://bcho.tistory.com/1136 는 구글의 자연어 분석 API를 이용하여 트위터의 내용을 실시간으로 분석한 내용이다.

나이별  점수 분포

다음으로 일반적인 분석 시스템에서 수집되지 않는 커스텀 로그 분석 시나리오중 사용자 나이별 점수대를 분석해본다.

게임실행에서 종료까지 실행한 사용자

마지막으로 유용하게 사용되는 퍼널 분석의 예로 게임을 시작해서 종료할때까지의 도달율을 측정해봤다.

게임을 인스톨하고 시작한다음, 캐릭터를 움직이고, 캐릭터를 이용하여 공격을하고, 2분동안 플레이해서 게임을 종료한 사용자의 비율을 분석해본다.

로그 메세지 디자인

그러면 이러한 게임 로그를 분석하기 위해서 수집할 로그 메세지는 어떤 형태가 될지 디자인을 해보자.

로그 이벤트는 아래와 같이 7가지로 정의한다.

  • START_SESSION,END_SESSION 은 게임을 시작과 끝날때 발생하는 이벤트이다.

  • NPC_CREATE,NPC_MOVE,NPC_DIE 는 NPC(돼지)를 생성하고 이동하고, 그리고 죽었을때 각각 발생하는 이벤트이다. 이동은 이벤트의 수가 많기 때문에, 10초 단위로 수집하였다.

  • PLAYER_MOVE,PLAYER_ATTACK 은 플레이어 캐릭터의 이동과 NPC를 공격하는 이벤트를 수집한다.


각 이벤트를 플레이하는 판과 연결하기 위해서 각 플레이는 고유의 sessionid가 생성되서 게임이 시작될때부터 끝날때 까지 모든 이벤트에 저장된다.



Event name

Param

Key

Value

Type

Note


START_SESSION

This event is triggered when player press “START” button after submitting player’s age & gender

sessionid

Unique session Id for this play

String


age

Player’s age

String


sex

Player’s gender

String

true : man

false : woman

PLAYER_MOVE

It record location of player in game map periodically (every 2sec)

sessionid




Pos_X




Pox_Z




PLAYER_ATTACK

This event is occurred when player attack NPC.

sessionid

Unique session Id for this play



npc_id

Attacked NPC ID



type

Type of NPC



pos_X

NPC location X



pos_Z

NPC location Y



damage

Damage that NPC get in this attack



life

Left life for this NPC



NPC_CREATE

When new NPC is created, this event is logged.

sessionid

Unique session Id for this play



npc_id

Attacked NPC ID



type

Type of NPC



pos_X

NPC location X



pos_Y

NPC location Y



NPC_MOVE

Every 2sec for each NPC, it records the location of NPC.

sessionid

Unique session Id for this play



npc_id

Attacked NPC ID



type

Type of NPC



pos_X

NPC location X



pos_Y

NPC location Y



NPC_DIE

It is triggered when NPC is dead by attack

sessionid

Unique session Id for this play



npc_id

Attacked NPC ID



type

Type of NPC



pos_X

NPC location X



pos_Y

NPC location Y



END_SCENE

It is triggered when game stage(session) is over

sessionid

Unique session Id for this play



score

Score for this play




이렇게 정의된 로그는 파이어베이스 애널러틱스에 의해서 빅쿼리로 자동으로 저장되게 된다.

실시간 디버깅

이런 로깅을 삽입하면, 로그가 제대로 저장이 되는지 확인이 필요한데, 파이어베이스 애널러틱스는 특성상 로그 이벤트가 1000개가 쌓이거나 또는 컨버전 이벤트가 발생하거나 또는 1시간 주기로 로그를 서버에 전송하기 때문에 바로 올라오는 로그 메세지를 확인할 수 없다.

그래서 이번에 새로 소개되니 기능이 “DEBUG VIEW”라는 기능인데, 이 특정 디바이스에 디버깅 옵션을 지정하면, 실시간으로 올라오는 로그를 확인할 수 있다.

로그는 모바일앱에서 업로드한 후 약 10~20초 후에, 화면에 반영된다.



대쉬 보드를 이용한 지표 분석

대쉬 보드는 파이어 베이스 애널러틱스에서 기본으로 제공되는 지표로 모바일 서비스에 공통적으로 필요한 지표들을 분석하여 웹으로 출력해준다.

DAU/WAU/MAU 분석

가장 기본적인 지표로는 월간,주간,일간 방문자 수로를 그래프로 출력해준다.

평균 플레이 시간 분석

다음은 평균 플레이 시간으로, 사용자가 하루에 평균 얼마나 앱을 사용하였는지, 동시 접속자수 (Session)과,  한번 접속했을때 얼마나 오래 앱을 사용 하였는지 (Session duration)등을 분석하여 그래프로 출력해준다.


국가별 접속 내역 분석

다음은 국가별 접속 내용으로, 글로벌 서비스에는 필수로 필요한 분석 내용이다.


사용자 데모그래픽 정보 분석

사용자에 대한 데모 그래픽 정보 즉 성별과, 나이를 분석해주는데, 앱에 별도로 사용자 로그인 기능이 없거나, 사용자 정보를 추적하는 기능이 없더라도, 파이어베이스 애널러틱스는 여러군데에서 수집한 로그를 기반으로 사용자의 성별과 나이를 분석해 준다.



특정 이벤트에 대한 분석

다음은 특정 이벤트에 대한 분석이 가능하다. 게임에서 사용자가 스테이지를 넘어가는 이벤트등 파이어베이스에 정의된 이벤트 이외에도 사용자가 정의한 이벤트에 대한 분석이 가능하다.

또한 이벤트가 발생한 사용자에 대한 데모 그래픽 정보 (연령,성별,국가)를 같이 분석해서 해당 이벤트가 어떤 사용자 층에서 발생하였는지를 분석해 준다.


예를 들어 게임의 보너스 스테이지를 많이 클리어한 사용자의 통계만을 볼 수 있고, 그 보너스 스테이지를 클리어한 사용자의 나이,성별, 국가 정보등을 볼 수 있다.



게임 플레이 완료율에 대한 퍼널 분석

다음은 앞에서 데이타 분석 모델을 정의할때 정의한 문제로 사용자가 게임을 시작해서 플레이를 끝낸 사용자 까지를 퍼널(깔때기) 분석을 적용한 예이다.

해당 시간에 총 93번의 게임이 플레이 되었으며, 캐릭터까지는 이동하였으나, 공격을 하지 않은 플레이는 3번, 그리고 끝까지 게임 플레이를 끝낸 사용자는 총 62번으로 측정되었다.



이외에도 상품 구매에 대한(인앱)에 대한 분석이나, 디바이스 종류, 앱 버전, 그리고 어느 광고 네트워크에서 사용자가 인입되었는지 등의 분석등 다양한 분석이 가능한데, 대쉬보드의 자세한 지표에 대해서는 http://bcho.tistory.com/1132 를 참고하기 바란다.

노트북을 이용한 커스텀 로그 분석

앞에서는 파이어베이스에서 제공되는 로그와 분석 방법에 대해서만 분석을 진행하였다. 이번에는 커스텀 로그와 원본(raw)데이타를 이용한 데이타 분석에 대해서 알아보자.


모든 원본 데이타는 앞에서도 언급했듯이 구글의 빅쿼리에 저장되기 때문에, SQL 쿼리를 이용하여 자유롭게 데이타 분석이 가능하고 그래프로도 표현이 가능하다.

별도의 개발이 없이 자유롭게 쿼리를 실행하고 그래프로 표현할 수 있는 도구로는 노트북이 있는데, 빅쿼리는 주피터 노트북과 제플린이 지원된다. 주피처 노트북 오픈소스를 구글 클라우드에 맞춘 버전은 Google Cloud Datalab이라는 것이 있는데, 여기서는 데이타랩을 이용하여 분석하였다.

캐릭터 이동 히트맵 분석

앞에서 NPC_MOVE와 PLAYER_ATTACK을 이용하여, NPC의 이동 동선과, PLAYER가 공격을 한 위치를 수집하였다.

이를 히트맵으로 그려보면 다음과 같다.


좌측은 NPC가 주로 이동하는 경로이고 우측은 플레이어가 NPC를 주로 공격한 위치로, 많이 간곳일 수록 진하게 칠해진다.

NPC 캐릭터는 전체 맵에 걸쳐서 이동을 하는 것을 볼 수 있고, 주로 우측 나무 근처를 많이 움직이는 것을 볼 수 있다. 오른쪽 사용자가 공격한 위치를 보면 주로 중앙에 모여 있기 때문에 우측 나무 근처로 움직인 NPC는 생존 확률이 높았을 것으로 생각해볼 수 있다.

그리고 NPC 이동 맵에서 중간중간에 진하게 보이는 점은 NPC 가 생성되는 위치이기 때문에, 이동이 많이 관측되었다.

연령별 플레이 점수 분석

다음으로 플레이어 연령별 점수대를 보면, 최고 점수는 30대가 기록하였고, 대략 4900점대인데 반해서, 전체적인 평균 점수는 40대가 높은 것을 볼 수 있다. (이 데이타는 연령별로 수집된 데이타의 양이 그리 많지 않기 때문에 정확하지는 않다. 어디까지나 분석 예제용으로만 이해하기 바란다.)



분석에 사용된 코드는 아래에 있다. 이 코드는 데모용이고 최적화가 되어있지 않기 때문에, 운영 환경에서는 반드시 최적화를 해서 사용하기 바란다.


https://github.com/bwcho75/bigquery/blob/master/GameData/Game%20Data%20Demo.ipynb


참고로, 모든 데이타 분석은 주로 파이썬을 이용하였는데, 근래에 빅데이타 분석용 언어로 파이썬이 많이 사용되기 때문에, 파이썬을 공부해놓으면 좀 더 쉽게 데이타 분석이 가능하다. 또한 파이썬으로 데이타를 분석할때 많이 쓰이는 프레임웍으로는 팬다스 (pandas)와 넘파이 (numpy)가 있는데, 이 둘 역시 같이 익혀놓는것이 좋다.

파이어베이스 노티피케이션 서비스를 통한 이벤트 기반의 푸쉬 타게팅

파이어베이스 애널러틱스와 연계해서 유용하게 사용할 수 있는 기능은 파이어베이스 노티피케이션 이라는 서비스가 있다.


파이어 베이스 노티피케이션 서비스는 파이어베이스에서 제공되는 웹 콘솔을 이용하여 관리자가 모바일 서비스에 손쉽게 푸쉬 메세지를 보낼 수 있는 서비스이다.

푸쉬 타게팅을 위한 별도의 서버 시스템을 개발하지 않고도 마케팅이나 기획자등 비 개발인력이 타게팅된 푸쉬 메세지를 손쉽게 보낼 수 있게 디자인된 서비스인데, 특히 파이어 베이스 애널러틱스와 연계가 되면 세세한 타게팅이 가능하다.


이벤트 로그 기반의 타케팅

푸쉬 타겟을 정할때, 파이어베이스 애널러틱스에서 수집한 이벤트를 조건으로 해서 푸쉬를 타게팅할 수 있다.

예를 들어

  • 게임 스테이지 3 이상을 클리어한 플레이어한 푸쉬를 보낸다.

  • NPC를 10,000개 이상 죽인 플레이어에게 푸쉬를 보낸다.

  • 아이템을 100개이상 구매한 사용자에게 푸쉬를 보낸다.

와 같이 서비스에서 수집된 이벤트에 따라서 다양한 조건을 정의할 수 있다.



<그림. 파이어베이스 노티피케이션에서 특정 사용자 층을 타게팅 해서 보내는 화면 >


이런 타게팅은 파이어베이스 애널러틱스에서 Audience로 사용자 군을 정의한 후에, (로그 이벤트 조건이나 사용자 이벤트 조건 등), 이 조건에 타겟해서 푸쉬를 파이어베이스 노티피케이션 서비스에서 정의한다.

사용자 정보 기반의 타게팅

서비스의 로그 이벤트 정보뿐 아니라, 사용자에 대해서도 푸쉬 타게팅이 가능한데, 특정 성별이나 나이에 대해 푸쉬를 보내거나, 특정 단말을 사용하는 사용자, 특정 국가에 있는 사용자등 다양한 사용자 관련 정보로 푸쉬를 보낼 수 있다.

사용자 정보 역시 앞의 이벤트 로그 정보처럼 개발자가 커스텀 필드를 추가하여 사용자 정보를 로그에 수집할 수 있다.


스케쥴링

이런 타게팅 푸쉬는 바로 웹에서 보낼 수 도 있지만, 특정 시간에 맞춰서 미리 예약을 해놓는 것도 가능하다.  




비용 정책 분석

파이어베이스 애널러틱스에서 원본 데이타를 수집 및 분석 하려면 빅쿼리를 연동해야 하는데, 빅쿼리 연동은 파이어베이스의 무료 플랜으로는 사용이 불가능하다. Blaze 플랜으로 업그레이드 해야 하는데, Blaze 플랜은 사용한 만큼 비용을 내는 정책으로 다른 서비스를 사용하지 않고, 파이어베이스 애널러틱스와 빅쿼리 연동만을 사용할 경우에는 파이어베이스에 추가로 과금되는 금액은 없다. (0원이다.)

단 빅쿼리에 대한 저장 가격과 쿼리 비용은 과금이 되는데,  빅쿼리 저장 가격은 GB당 월 0.02$ 이고, 90일동안 테이블의 데이타가 변하지 않으면 자동으로 0.01$로 50%가 할인된다.

그리고 쿼리당 비용을 받는데, 쿼리는 GB 스캔당 0.005$가 과금된다.


자세한 가격 정책 및, 파이어베이스 애널러틱스에 대한 데이타 구조는 http://bcho.tistory.com/1133 를 참고하기 바란다.

저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

수포자를 위한 딥러닝


#3 - 텐서플로우로 선형회귀 학습을 구현해보자


조대협 (http://bcho.tistory.com)


앞에서 살펴본 선형 회귀(Linear regression) 머신 러닝 모델을 실제 프로그래밍 코드를 만들어서 학습을 시켜보자. 여러가지 언어를 사용할 수 있지만, 이 글에서는 텐서플로우를 기반으로 설명한다.

텐서플로우 개발 환경 셋업

텐서 플로우 개발 환경을 설정하는 방법은 여러가지가 있지만, 구글 클라우드의 데이타랩 (datalab)환경을 사용하기로 한다. 텐서플로우 환경을 설정하려면 파이썬 설치 및 연관된 수학 라이브러리를 설치해야 하는 등 설치가 까다롭기 때문에, 구글 클라우드에서 제공하는 파이썬 노트북 (Jupyter 노트북 : http://jupyter.org/ ) 이 패키징 된 도커 이미지를 사용하기로 한다.

파이썬 노트북은 일종의 위키나 연습장 같은 개념으로 연산등에 필요한 메모를 해가면서 텐서 플로우나 파이썬 코드도 적어넣고 실행도 할 수 있기 때문에 데이타 관련 작업을 하기 매우 편리하다.

또한 도커로 패키징된 데이타랩 환경은 로컬에서나 클라우드 등 아무곳에서나 실행할 수 있기 때문에 편리하고 별도의 과금이 되지 않기 때문에 편리하게 사용할 수 있다.

구글 클라우드 계정 및 프로젝트 생성

GCP 클라우드를 사용하기 위해서는 구글 계정에 가입한다. 기존에 gmail 계정이 있으면 gmail 계정을 사용하면 된다. http://www.google.com/cloud 로 가서, 좌측 상당에 Try it Free 버튼을 눌러서 구글 클라우드에 가입한다.





다음 콘솔에서 상단의 Google Cloud Platform 을 누르면 좌측에 메뉴가 나타나는데, 메뉴 중에서 “결제" 메뉴를 선택한후 결제 계정 추가를 통해서 개인 신용 카드 정보를 등록한다.



개인 신용 카드 정보를 등록해야 모든 서비스를 제한 없이 사용할 수 있다.  단 Trial의 경우 자동으로 한달간 300$의 비용을 사용할 수 있는 크레딧이 자동으로 등록되니, 이 범위를 넘지 않으면 자동으로 결제가 되는 일이 없으니 크게 걱정할 필요는 없다.

프로젝트 생성

계정 생성 및 결제 계정 세팅이 끝났으면 프로젝트를 생성한다.

프로젝트는 VM이나 네트워크 자원, SQL등 클라우드 내의 자원을 묶어서 관리하는 하나의 집합이다. 여러 사람이 하나의 클라우드를 사용할때 이렇게 프로젝트를 별도로 만들어서 별도로 과금을 하거나 각 시스템이나 팀별로 프로젝트를 나눠서 정의하면 관리하기가 용이하다.


화면 우측 상단에서 프로젝트 생성 메뉴를  선택하여 프로젝트를 생성한다.




프로젝트 생성 버튼을 누르면 아래와 같이 프로젝트 명을 입력 받는 창이 나온다. 여기에 프로젝트명을 넣으면 된다.

도커 설치

이 글에서는 로컬 맥북 환경에 데이타랩을 설치하는 방법을 설명한다.

데이타 랩은 앞에서 언급한것과 같이 구글 클라우드 플랫폼 상의 VM에 설치할 수 도 있고, 맥,윈도우 기반의 로컬 데스크탑에도 설치할 수 있다. 각 플랫폼별 설치 가이드는  https://cloud.google.com/datalab/docs/quickstarts/quickstart-local 를 참고하기 바란다. 이 문서에서는 맥 OS를 기반으로 설치하는 방법을 설명한다.


데이타 랩은 컨테이너 솔루션인 도커로 패키징이 되어 있다. 그래서 도커 런타임을 설치해야 한다.

https://www.docker.com/products/docker 에서 도커 런타임을 다운 받아서 설치한다.

도커 런타임을 설치하면 애플리케이션 목록에 다음과 같이 고래 모양의 도커 런타임 아이콘이 나오는 것을 확인할 수 있다.

하나 주의할점이라면 맥에서 예전의 도커 런타임은 오라클의 버추얼 박스를 이용했었으나, 제반 설정등이 복잡하기 때문에, 이미 오라클 버추얼 박스 기반의 도커 런타임을 설치했다면 이 기회에, 도커 런타임을 새로 설치하기를 권장한다.

다음으로 도커 사용을 도와주는 툴로 Kitematic 이라는 툴을 설치한다. (https://kitematic.com/) 이 툴은 도커 컨테이너에 관련한 명령을 내리거나 이미지를 손쉽게 관리할 수 있는 GUI 환경을 제공한다.


구글 클라우드 데이타 랩 설치

Kitematic의 설치가 끝났으면 데이타랩 컨테이너 이미지를 받아서 실행해보자, Kitematic 좌측 하단의 “Dokcer CLI” 버튼을 누르면, 도커 호스트 VM의 쉘 스크립트를 수행할 수 있는 터미널이 구동된다.



터미널에서 다음 명령어를 실행하자


docker run -it -p 8081:8080 -v "${HOME}:/content" \

 -e "PROJECT_ID=terrycho-firebase" \

 gcr.io/cloud-datalab/datalab:local


데이타랩은 8080 포트로 실행이 되고 있는데, 위에서 8081:8080은  도커 컨테이너안에서 8080으로 실행되고 있는 데이타 랩을 외부에서 8081로 접속을 하겠다고 정의하였고, PROJECT_ID는 데이타랩이 접속할 구글 클라우드 프로젝트의 ID를 적어주면 된다. 여기서는 terrycho-firebase를 사용하였다.

명령을 실행하면, 데이타랩 이미지가 다운로드 되고 실행이 될것이다.

실행이 된 다음에는 브라우져에서 http://localhost:8081로 접속하면 다음과 같이 데이타랩이 수행된 것을 볼 수 있다.


학습하기

이제 텐서 플로우 기반의 머신러닝을 위한 개발 환경 설정이 끝났다. 이제 선형 회귀 모델을 학습 시켜보자

테스트 데이타 만들기

학습을 하려면 데이타가 있어야 하는데, 여기서는 랜덤으로 데이타를 생성해내도록 하겠다. 다음은 데이타를 생성하는 텐서 플로우코드이다.

텐서 플로우 자체에 대한 설명과 문법은 나중에 기회가 되면 별도로 설명하도록 하겠다.


import numpy as np

num_points = 200

vectors_set = []

for i in xrange(num_points):

 x = np.random.normal(5,5)+15

 y =  x*1000+ (np.random.normal(0,3))*1000

 vectors_set.append([x,y])

 

x_data = [v[0] for v in vectors_set ]

y_data = [v[1] for v in vectors_set ]


for 루프에서 xrange로 200개의 샘플 데이타를 생성하도록 하였다.

x는 택시 주행거리로,  정규 분포를 따르는 난수를 생성하되 5를 중심으로 표준편차가 5인 데이타를 생성하도록 하였다. 그래프를 양수로 만들기 위해서 +15를 해주었다.

다음으로 y값은 택시비인데, 주행거리(x) * 1000 + 정규 분포를 따르는 난수로 중심값은 0, 그리고 표준편차를 3으로 따르는 난수를 생성한후, 이 값에 1000을 곱하였다.

x_data에는 x 값들을, 그리고 y_data에는 y값들을 배열형태로 저장하였다.


값들이 제대로 나왔는지 그래프를 그려서 확인해보자. 아래는 그래프를 그리는 코드이다.

Pyplot이라는 모듈을 이용하여 plot이라는 함수를 이용하여 그래프를 그렸다. Y축은 0~40000, X축은 0~35까지의 범위를 갖도록 하였다.

import matplotlib.pyplot as plt

plt.plot(x_data,y_data,'ro')

plt.ylim([0,40000])

plt.xlim([0,35])

plt.xlabel('x')

plt.ylabel('y')

plt.legend()

plt.show()

그려진 그래프의 모양은 다음과 같다.


학습 로직 구현 

이제 앞에서 생성한 데이타를 기반으로해서 선형 회귀 학습을 시작해보자. 코드는 다음과 같다.


import tensorflow as tf


W = tf.Variable(tf.random_uniform([1],-1.0,1.0))

b = tf.Variable(tf.zeros([1]))

y = W * x_data + b


loss = tf.reduce_mean(tf.square(y-y_data))

optimizer = tf.train.GradientDescentOptimizer(0.0015)

train = optimizer.minimize(loss)


init = tf.initialize_all_variables()

sess = tf.Session()

sess.run(init)


for step in xrange(10):

 sess.run(train)

 print(step,sess.run(W),sess.run(b))

 print(step,sess.run(loss))

 

 plt.plot(x_data,y_data,'ro')

 plt.plot(x_data,sess.run(W)*x_data + sess.run(b))

 plt.xlabel('x')

 plt.ylabel('y')

 plt.legend()

 plt.show()


W의 초기값은 random_uniform으로 생성을 한다. 초기값은 -1.0~1.0 사이의 값으로 생성하도록 하였다.

( radom_uniform 에서 첫번째 인자 [1]은 텐서의 차원을 설명하는데, 1은 1차원으로 배열과 같은 형태가 2는 2차원으로 행렬과 같은 형태, 3은 3차원 행렬 행태가 된다.)

다음 b는 tf.zeros([1])으로 정의했는데, 1차원 텐서로 값이 0이 된다. (zeros)

학습을 하고자 하는 공식 (가설은) y = W * x_data + b 이 된다.


다음으로 코스트 함수와 옵티마이져를 지정하는데, 코스트 함수는 앞 글에서 설명한것과 같이 

가설에 의해 계산된 값 y에서 측정값 y_data를 뺀후에, 이를 제곱하여 평균한 값이다. 코드로 옮기면 다음과 같다.

loss = tf.reduce_mean(tf.square(y-y_data))


코스트 함수에서 최소 값을 구하기 위해서 옵티마이저로 경사하강법 (Gradient descent) 알고리즘을 사용하기 때문에, 옵티마이저로 tf.train.GradientDescentOptimizer(0.0015) 과 같이 지정하였다. 인자로 들어가는 0.0015는 경사 하강법에서 학습 단계별로 움직이는 학습 속도를 정의하는 것으로 러닝 레이트 (Learning rate라고 한다)) 이 내용은 뒤에서 다시 자세하게 설명하겠다.

코스트 함수와 옵티마이져(Gradient descent)가 정의되었으면 트레이닝 모델에 적용한다.

train = optimizer.minimize(loss)

는 경사 하강법(Gradient descent) 을 이용하여 코스트 함수 (loss)가 최소가 되는 값을 찾으라는 이야기이다.


다음 코드에서는 for loop로 학습을 10번을 반복해가면서 학습을 하라는 이야기로,

for step in xrange(10):

 sess.run(train)

 print(step,sess.run(W),sess.run(b))

 print(step,sess.run(loss))


학습 단계별로, W,b값 그리고 loss의 값을 화면으로 출력하도록 하였다.

그리고 학습이 어떻게 되는지 그래프로 표현하기 위해서

 plt.plot(x_data,sess.run(W)*x_data + sess.run(b))

X_data를 가로축으로 하고, W*x_data + b의 값을 그래프로 출력하도록 하였다.


이렇게 해서 학습을 진행하면 다음과 같은 그래프가 순차적으로 출력되는 것을 확인할 수 있다.


그래프가 점점 데이타의 중앙에 수렴하면서 조정되는 것을 확인할 수 있다.

이렇게 해서 맨 마직막에 다음과 같은 결과가 출력된다.



W는 1018, b는 51 그리고 코스트의 값은 10272684.0이 됨을 확인할 수 있다.

이렇게 학습이 끝났고, 이제 거리에 따른 택시비는

(택시비) = 1018 * (거리) + 51로 

이 공식을 가지고 거리에 따른 택시비를 예측할 수 있다.


테스트에 사용한 모든 데이타는 링크를 참고하면 얻을 수 있다.

https://github.com/bwcho75/tensorflowML/blob/master/1.%20Linear%20Regression.ipynb

학습 속도(러닝 레이트 / Learning Rate) 조정하기 

앞의 예제에서 optimizer를  tf.train.GradientDescentOptimizer(0.0015) 에서 0.0015로 학습 속도를 지정하였다. 그렇다면 학습 속도란 무엇인가?


선형 회귀 분석의 알고리즘을 되 짚어보면, 가설에 의한 값과 원래값의 차이를 최소화 하는 값을 구하는 것이 이 알고리즘의 내용이고, 이를 코스트 함수를의 최소값을 구하는 것을 통해서 해결한다.

W의 값을 조정해 가면서 코스트의 값이 최소가 되는 값을 찾는데, 이때 경사 하강법 (Gradient descent)방법을 사용하고 경사의 방향에 따라서 W의 값을 조정하는데, 다음 W의 값이 되는 부분으로 이동하는 폭이 학습 속도 즉 러닝 레이트이다. (아래 그림)


이 예제에서는 학습 속도를 0.0015로 설정하고, 매번 학습 마다 W를 경사 방향으로 0.0015씩 움직이도록 하였다.  그러면 적정 학습 속도를 어떻게 구할까?

오퍼 슈팅 (Over shooting)

먼저 학습 속도가 크면 어떤일이 벌어지는지를 보자

학습 속도를 0.1로 주고 학습을 시키면 어떤 결과가 생길까?

W,b 그리고 cost 함수를 찍어보면 다음과 같은 결과가 나온다.

(0, array([ 86515.3671875], dtype=float32), array([ 4038.51806641], dtype=float32))
(0, 3.1747764e+12) ← cost
(1, array([-7322238.], dtype=float32), array([-341854.6875], dtype=float32))
(1, 2.3281766e+16)
(2, array([  6.27127488e+08], dtype=float32), array([ 29278710.], dtype=float32))
(2, 1.7073398e+20)
(3, array([ -5.37040691e+10], dtype=float32), array([ -2.50728218e+09], dtype=float32))
(3, 1.252057e+24)
(4, array([  4.59895629e+12], dtype=float32), array([  2.14711517e+11], dtype=float32))
(4, 9.1818105e+27)
(5, array([ -3.93832261e+14], dtype=float32), array([ -1.83868557e+13], dtype=float32))
(5, 6.7333667e+31)
(6, array([  3.37258807e+16], dtype=float32), array([  1.57456078e+15], dtype=float32))
(6, 4.9378326e+35)
(7, array([ -2.88812128e+18], dtype=float32), array([ -1.34837741e+17], dtype=float32))
(7, inf)
(8, array([  2.47324691e+20], dtype=float32), array([  1.15468523e+19], dtype=float32))
(8, inf)
(9, array([ -2.11796860e+22], dtype=float32), array([ -9.88816316e+20], dtype=float32))
(9, inf)

Cost 값이 3.1e+12,2.3e+16,1.7e+20 ... 오히려 커지다가 7,8,9에서는 inf(무한대)로 가버리는 것을 볼 수 있다.


그래프를 보면 다음과 같은 형태의 그래프가 나온다.


학습이 진행될 수 록, 코스트 함수의 결과 값이 작아지면서 수렴이 되어야 하는데,  그래프의 각이 서로 반대로 왔다갔다 하면서 발산을 하는 모습을 볼 수 있다.

코스트 함수의 그래프를 보고 생각해보면 그 원인을 알 수 있다.


학습 속도의 값이 크다 보니, 값이 아래 골짜기로 수렴하지 않고 오히려 반대편으로 넘어가면서 점점 오히려 그래프 바깥 방향으로 발산하면서, W값이 발산을 해서 결국은 무한대로 간다. 이를 오버 슈팅 문제라고 한다.

그래서, 학습 과정에서 코스트 값이 수렴하지 않고 점점 커지면서 inf(무한대)로 발산하게 되면, 학습 속도가 지나치게 큰것으로 판단할 수 있다.

스몰 러닝 레이트(Small Learning Rate)

반대로 학습 속도가 매우 작을때는 어떤일이 발생할까?

학습속도를 0.0001로 작게 설정을 해보자.


(0, array([ 86.40672302], dtype=float32), array([ 4.03895712], dtype=float32))
(0, 3.6995174e+08)
(1, array([ 165.43540955], dtype=float32), array([ 7.72794485], dtype=float32))
(1, 3.1007162e+08)
(2, array([ 237.61743164], dtype=float32), array([ 11.09728241], dtype=float32))
(2, 2.6011749e+08)
(3, array([ 303.54595947], dtype=float32), array([ 14.17466259], dtype=float32))
(3, 2.18444e+08)
(4, array([ 363.76275635], dtype=float32), array([ 16.98538017], dtype=float32))
(4, 1.8367851e+08)
(5, array([ 418.76269531], dtype=float32), array([ 19.55253601], dtype=float32))
(5, 1.5467589e+08)
(6, array([ 468.99768066], dtype=float32), array([ 21.89723206], dtype=float32))
(6, 1.304809e+08)
(7, array([ 514.8805542], dtype=float32), array([ 24.03874016], dtype=float32))
(7, 1.1029658e+08)
(8, array([ 556.78839111], dtype=float32), array([ 25.99466515], dtype=float32))
(8, 93458072.0)
(9, array([ 595.06555176], dtype=float32), array([ 27.78108406], dtype=float32))
(9, 79410816.0)

 

코스트값이 점점 작은 값으로 작아지는 것을 볼 수 있지만 계속 감소할 뿐 어떤 값에서 정체 되거나 수렴이 되는 형태가 아니다.

그래프로 표현해보면 아래 그래프와 같이 점점 입력 데이타에 그래프가 가까워 지는 것을 볼 수 있지만, 입력 데이타에 그래프가 겹쳐지기 전에 학습이 중지 됨을 알 수 있다.


이런 문제는 학습속도가 너무 작을 경우 아래 그림 처럼, 코스트 값의 최소 값에 도달하기전에, 학습이 끝나버리는 문제로 Small learning rate 라고 한다.




이 경우에는 학습 횟수를 느리거나 또는 학습 속도를 조절함으로써 해결이 가능하다.


다음글에서는 분류 문제의 대표적인 알고리즘인 로지스틱 회귀 (Logistic Regression)에 대해서 알아보도록 한다.


저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

수포자를 위한 딥러닝

#1 - 머신러닝의 개요

조대협(http://bcho.tistory.com)

들어가기에 앞서서 

몇년전부터 빅데이타와 머신러닝이 유행하면서 이분야를 공부해야겠다고 생각을 하고 코세라의 Andrew.NG 교수님의 강의도 듣고, 통계학 책도 보고, 수학적인 지식이 부족해서 고등학교 수학 참고서도 봤지만, 도저히 답이 나오지 않는다. 머신 러닝에 사용되는 알고리즘은 복잡도가 높고 일반적인 수학 지식으로 이해조차 어려운데, 실제 운영 시스템에 적용할 수 있는 수준의 알고리즘은 석박사급의 전문가적인 지식이 아니면 쉽게 만들 수 없는 것으로 보였다. 예를 들어 인공지능망(뉴럴네트워크:Neural Network) 알고리즘에 대한 원리는 이해할 수 있지만, 실제로 서비스에 사용되는 알고르즘을 보니 보통 60~90개의 계층으로 이루어져 있는데, (그냥 복잡하다는 이야기로 이해하면 됨) 이런 복잡한 알고리즘을 수학 초보자인 내가 만든다는 것은 거의 불가능에 가까워 보였고, 이런것을 만들기 위해서 몇년의 시간을 투자해서 머신러닝 전문가로 커리어패스를 전환할 수 는 있겠지만 많은 시간과 노력이 드는데 반해서, 이미 나에게는 소프트웨어 개발과 백앤드 시스템이라는 전문분야가 있어싸.

그래도 조금씩 보다보니, 머신 러닝에서 소개되는 알고리즘은 주로 사용되는 것은 약 20개 내외였고, 이미 다 정형화 되어 있어서 그 알고리즘을 만들어내기보다는, 가져다 쓰기만 하면 될 것 같다는 느낌이 들었다. 아직 많이 보지는 못했지만, 실제로 머신 러닝 기반의 시스템들은 나와 있는 알고리즘을 코드로 옮겨서 운영 환경에 올리는 경우가 대부분이었다.

비유를 하자면 우리가 복잡한 해쉬 리스트나, 소팅 알고리즘을 모르고도 간단하게 프로그래밍 언어에 있는 라이브러리를 가져다 쓰는 것과 같은 원리라고나 할까? 그래서, 완벽하게 이해하고 만들기 보다는 기본적인 원리를 파악하고 이미 공개된 알고리즘과 특히 레퍼런스 코드를 가져다가 운영환경에다 쓸 수 있는 정도의 수준을 목표로 하기로 했다.

이제 아주 아주 초보적인 수준의 이해를 가지고, 구글의 텐서플로우 기반으로 머신러닝과 딥러닝을 공부하면서 내용을 공유하고자 한다. 글을 쓰는 나역시도 수포자이며 머신러닝에 대한 초보자이기 때문에, 설명이 부족할 수 도 있고, 틀린 내용이 있을 수 있음을 미리 알리고 시작한다. (틀린 내용은 알려주세요)

머신러닝

머신 러닝은 데이타를 기반으로 학습을 시켜서 몬가를 예측하게 만드는 기법이다.

통계학적으로는 추측 통계학 (Inferential statistics)에 해당하는 영역인데, 근래에 들어서 알파고와 같은 인공지능이나 자동 주행 자동차, 로봇 기술등을 기반으로 주목을 받고 있다.



<그림. 구글의 자동 주행 자동차>


간단한 활용 사례를 보면

  • 학습된 컴퓨터에 의한 이메일 스팸 필터링

  • 편지지의 우편번호 글자 인식

  • 쇼핑몰이나 케이블 TV의 추천 시스템

  • 자연어 인식

  • 자동차 자율 주행

등을 볼 수 있다.


이러한 시나리오는 지속적인 샘플 데이타를 수집 및 정제하고 지속적으로 알고리즘을 학습해나감에 따라서 최적의 알고리즘을 찾아나가도록 한다.

쇼핑몰의 추천 시스템의 경우 사용자의 구매 패턴을 군집화하여 유사한 패턴을 찾아냄으로써 적절한 상품을 추천하는데, 예를 들어 30대 남성/미혼/연수입 5000만원/차량 보유한 사용자가 카메라,배낭등을 구매했을 경우 여행 상품을 구매할 확률이 높다는 것을 학습하였을때, 이러한 패턴의 사용자에게 여행 상품을 추천해주는 것과 같은 답을 제공할 수 있다.

지도 학습과 비지도 학습

머신러닝은 학습 방법에 따라서 지도 학습 (Supervised Learning)과 비지도 학습 (Unsupervised Learning)으로 분류될 수 있다.

지도 학습 (Supervised Learning)



예를 들어 학생에게 곱셈을 학습 시킬때,

“2*3=6이고, 2*4=8이야, 그러면 2*5= 얼마일까? “

처럼 문제에 대한 정답을 주고 학습을 한 후, 나중에 문제를 줬을때 정답을 구하도록 하는 것이 지도 학습 (Supervised Learning)이다.

비지도 학습 (Unsupervised learning)

반대로 비지도 학습은 정답을 주지않고 문제로만 학습을 시키는 방식을 비지도 학습이라고 한다.

예를 들어 영화에 대한 종류를 학습 시키기 위해서, 연령,성별과 영화의 종류 (액션, 드라마, SF)를 학습 시켰을때, 이를 군집화 해보면 20대 남성은 액션 영화를 좋아하고 20대 여성은 드라마 영화를 좋아 하는 것과 같은 군집된 결과를 얻을 수 있고, 이를 기반으로 20대 남성이 좋아하는 영화의 종류는 유사한 군집의 결과인 ”액션 영화" 라는 답을 내게 되낟.


여기서 문제에 대한 답을 전문적인 용어로 이야기 하면 라벨된 데이타 (Labeled data)라고 한다.


머신러닝의 대표적인 문제 Regression과 Classification 문제

머신러닝을 이용해서 해결하는 문제의 타입은 크게 regression과 classification 문제 두가지로 대표가 된다.

Classification

Classification은 입력값에 대한 결과값이 연속적이지 않고 몇개의 종류로 딱딱 나눠서 끊어지는 결과가 나오는 것을 이야기 한다. 예를 들어 종양의 크기가 0.3cm 이상이고 20대이면, 암이 양성, 또는 종양의 크기가 0.2cm 이하이고 30대이면, 암이 음성과 같이 결과 값이 ”양성암/음성암"과 같이 두개의 결과를 갖는 것이 예가 된다.


<종양 크기에 따른, 암의 양성/음성 여부에 대한 그래프>

또 다른 예로는 사진을 업로드 했을때, 사진의 물체를 인식할때 ”이사진은 개이다.” “이사진은 고양이이다.” 처럼 특정 종류에 대한 결과값이 나오는 것 역시 Classification 문제로 볼 수 있다.


Regression

Regression 문제는 결과값이 연속성을 가지고 있을때 Regression 문제라고 한다. 즉 택시의 주행거리에 따른 요금과 같은 문제인데, 변수 택시 주행 거리에 대해서, 결과 택시 값이 기대 되는 경우로 변수와 결과값이 연속적으로 이루어 지는 경우를 말한다.


<그림. 주행 거리에 따른 택시비 >

머신 러닝과 딥러닝

이러한 머신 러닝의 분야중, 인공 지능망 (뉴럴 네트워크 / Artificial neural network)라는 기법이 있는데, 사람의 뇌의 구조를 분석하여, 사람 뇌의 모양이 여러개의 뉴런이 모여서 이루어진것 처럼, 머신 러닝의 학습 모델을 두뇌의 모양과 같이 여러개의 계산 노드를 여러 층으로 연결해서 만들어낸 모델이다.


<알파고에 사용된 뉴럴네트워크 구조>


이 모델은 기존에 다른 기법으로 풀지 못하였던 복잡한 문제를 풀어낼 수 있었지만, 계층을 깊게 하면 계산이 복잡하여 연산이 불가능하다는  이유로 그간 관심을 가지고 있지 못했다가

캐나다의 CIFAR (Canadian Institute for Advanced Research) 연구소에서 2006년에 Hinton 교수가 ”A fast learning algorithm for deep belifef nets” 논문을 발표하게 되는데,  이 논문을 통해서 뉴럴네트워크에 입력하는 초기값을 제대로 입력하면 여러 계층의 레이어에서도 연산이 가능하다는 것을 증명하였고,  2007년 Yosua Bengio 라는 분이 ”Greedy Layer-Wise training of deep network” 라는 논문에서 깊게 신경망을 구축하면 굉장히 복잡한 문제를 풀 수 있다는 것을 증명해냈다.


이때 부터 뉴럴네트워크가 다시 주목을 받기 시작했는데,  이때 뉴럴 네트워크라는 모델을 사람들에게 부정적인 인식이 있었기 때문에, 다시 이 뉴럴 네트워크를 딥러닝 (Deep learning)이라는 이름으로 다시 브랜딩을 하였다.

그 이후에 IMAGENET 챌린지라는 머신러닝에 대한 일종의 컨테스트가 있는데, 이 대회는 이미지를 입력하고 머신 러닝을 통해서 컴퓨터가 이미지의 물체등을 인식할 수 있게 하는 대회로, 머신 러닝 알고리즘의 정확도를 측정하는 대회이다. 이 대회에서 2012년   Hinton 교수님 랩에 있던 Alex 라는 박사 과정의 학생이 딥러닝 기반의 머신 러닝 알고리즘으로 혁신 적인 결과를 내었고 지금은 이 딥러닝이 머신 러닝의 큰 주류중의 하나로 자리잡게 되었다.


<이미지넷에서 사용되는 이미지>



저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License


노트7의 소셜 반응을 분석해 보았다. 


#2 구현하기


조대협 (http://bcho.tistory.com)

지난번 글 http://bcho.tistory.com/1136에 이어서, 트위터를 통한 소셜 반응을 분석하는 시 스템을 구축하는 방법에 대해서 알아본다. 

시나리오 및 아키텍쳐

스트리밍 처리와 데이타 플로우에 대한 개념 이해가 끝났으면 이제 실제로 실시간 분석 애플리케이션을 만들어보자.

SNS를 이용한 마케팅 분석에서 대표적인 시나리오중 하나는 트위터 피드를 분석하여, 사람들의 반응을 분석하는 시나리오이다. 자주 언급 되는 단어나 형용사를 분석함으로써, 특정 제품이나 서비스에 대한 소셜 네트워크상의 바이럴 반응을 분석할 수 있는데, 여기서  구현하고자 하는 시나리오는 다음과 같다. 트위터 피드에서 특정 키워드로 트윗 문자열들을 수집한 후에, 구글의 자연어 분석 API를 통하여 트윗 문자열에서 명사와 형용사를 추출한다

추출한 명사와 형용사의 발생 횟수를 통계내어서 대쉬보드에 출력하는 시나리오이다.


이를 구현하기 위한 솔루션 아키텍쳐는 다음과 같다.


fluentd를 이용하여 트위터의 특정 키워드를 기반으로 트위터 피드를 수집하고, 수집된 피드들은 구글 클라우드의 큐 서비스인 Pub/Sub으로 전달된다. 전달된 데이타는 데이타 플로우에서 읽어서 필요한 데이타만 필터링한 후, 구글의 자연어 분석 API를 통해서 명사와 형용사를 분리한다.

분리된 명사와 형용사는 데이타플로우에서 30초 주기의 고정윈도우(Fixed Window) 단위로, 명사에서 발생한 단어의 수와, 형용사에서 발생한 단어의 수를 카운트 한 다음에, 빅쿼리에 명사 테이블과 형용사 테이블에 저장한다.

저장된 데이타는 구글의 리포팅 도구인 데이타 스튜디오를 통해서 그래프로 출력한다.


구현

그러면 위에서 설명한 아키텍쳐대로 시스템을 하나씩 구현해보자.

전체 예제 코드와 설정 파일은 https://github.com/bwcho75/googledataflow/tree/master/twitter 에서 받아볼 수 있다.

트위터 피드 수집 서버 설정

먼저 트위터에서 피드를 수집하기 위해서 fluentd 에이전트를 설정한다. 구글 컴퓨트 엔진에서 VM을 생성한 후에, 앞의 빅쿼리 예제에서 한것과 마찬가지로 fluentd 에이전트를 설치한다.

VM을 설치할때, 반드시 Cloud API access scopes를 full API access로 설정해야 하는데, 이 VM에서 fluentd를 통해서 수집한 피드를 Pub/Sub으로 전달할때, Pub/Sub API를 사용하기 때문이다.


Fluentd 가 설치되었으면 Pub/Sub으로 데이타를 전달하기 때문에,Fluentd pub/sub 에이전트를 추가설치 한다.

에이전트명은 “fluent-plugin-gcloud-pubsub”로

% sudo td-agent-gem install fluent-plugin-gcloud-pubsub

명령을 이용해서 설치한다.


에이전트 설치가 끝났으면 fluentd 에이전트 설정을 해야 한다.

다음은 트위터에서 “note7”에 관련된 피드를 읽어서 pub/sub 큐로 피드를 전송하는 fluentd 설정 예제이다.


<source>

 type twitter

 consumer_key        트위터 Consumer Key

 consumer_secret     트위터 Consumer Secrect

 oauth_token         트위터 Access Token

 oauth_token_secret  트위터 Access Token Secrect

 tag                 input.twitter.sampling  # Required

 timeline            sampling                # Required (tracking or sampling or location or userstream)

 keyword             note7

 output_format       nest                    # Optional (nest or flat or simple[default])

</source>

<match input.twitter.sampling>

 type gcloud_pubsub

 project 본인의 프로젝트명

 topic projects/본인의 프로젝트명/topics/twitter

 key 다운로드받은 구글 클라우드 억세스 토큰 JSON 파일

 flush_interval 10

 autocreate_topic false

</match>


Fluentd 설정이 끝났다.

Pub/Sub 큐 설정

다음으로는 fluentd 읽어드린 트위터 피드를 받아드를 Pub/Sub 큐를 생성한다.

큐 생성 방법에 대해서는 앞의 Pub/Sub 챕터를 참고하기 바란다. (http://bcho.tistory.com/1120)

큐 이름은 twitter라고 한다. 전체 큐 이름은 “projects/본인 프로젝트명/twitter” 가 된다.

데이타 플로우 프로젝트 생성

큐까지 데이타를 읽어드렸으면, 이 데이타를 처리할 데이타 플로우 파이프라인을 구현한다.

이클립스에서 데이타 플로우 파이프라인 프로젝트를 생성하자. 프로젝트 생성은 앞장의 “데이타 플로우 개발환경 설정" 부분을 참고하기 바란다. (http://bcho.tistory.com/1128)


프로젝트가 생성되었으면, 이 프로젝트에서 사용할 의존성 라이브러리들을 메이븐 (maven) 빌드 스크립트인 pom.xml에 추가해준다.

추가해야 하는 API는 JSON 파싱을 위한 javax.json-api와, javax.json 그리고 구글의 자연서 분석 API를 호출하기 위한 google-api-client와 google-api-service-language 모듈이다.


다음 코드 블럭을 <dependencies> 엘리먼트 아래 하부 엘리먼트로 추가해준다


   <dependency>

   <groupId>javax.json</groupId>

   <artifactId>javax.json-api</artifactId>

   <scope>provided</scope>

   <version>1.0</version>

</dependency>

<dependency>

   <groupId>org.glassfish</groupId>

   <artifactId>javax.json</artifactId>

   <version>1.0.4</version>

</dependency>

<!-- NL API dependency -->

<dependency>

     <groupId>com.google.apis</groupId>

     <artifactId>google-api-services-language</artifactId>

     <version>v1beta1-rev7-1.22.0</version>

   </dependency>

   <dependency>

     <groupId>com.google.api-client</groupId>

     <artifactId>google-api-client</artifactId>

     <version>1.22.0</version>

   </dependency>


데이타 플로우 코드 작성

전체 파이프라인 흐름

파이프라인 코드 작성에 앞서서 전체 파이프라인 흐름을 살펴보자

전체 흐름은 다음과 같다.


  1. Read From PubSub
    PubSub의 “twitter” 큐에서 JSON 형태의 트위터 메세지를 읽는다.

  2. Parse Twitter
    트위터 JSON 메세지를 파싱한 후, 전체 메세지에서 트윗 메세지를 저장하고 있는 “text” 필드와 언어셋을 정의하고 있는 “lang” 필드만 추출한다.
    자연어 분석 API가 아직 영어, 스페인어, 일본어만 지원하기 때문에, 이 예제에서는 영어로 트윗만 추출하도록 한다.

  3. NL Processing
    앞에서 추출한 트윗 메세지를 구글의 자연어 분석 API에 분석을 요청하여 명사와 형용사만 추출해낸다.

  4. 명사 처리 파이프라인
    다양한 처리 방식을 보여주기 위해서, 이 예제에서는 하나의 데이타 스트림을 분기 처리하여 두개의 데이타 파이프라인에서 처리하는 방식으로 구현하였다. 명사 처리 파이프라인은 다음과 같은 단계를 거친다.

    1. Noun Filter
      명사와 형용사 리스트로 들어온 데이타 중에서 명사만 필터링 한다.

    2. Window 적용
      고정 크기 윈도우 (Fixed Window) 30초를 적용하여, 30초 단위로 데이타를 분석하도록 한다.

    3. Count.PerElement
      명사 단어와, 각 단어별 발생횟 수를 30초 단위로 모아서 카운트 한다.

    4. Noun Formating
      카운트된 결과를 빅쿼리에 쓰도록, [윈도우 시작 시간,명사 단어, 발생횟수] 형태의 빅쿼리 ROW(행) 데이타 타입으로 포매팅 한다.

    5. Write Noun Count to BQ
      포매팅 된 데이타를 빅쿼리에 쓴다.

  5. 형용사 처리 파이프라인
    형용사를 처리하는 파이프라인도 내용은 명사를 처리한 파이프라인과 다르지 않고 동일하게 다음과 같은 순서를 따른다.

    1. Adj Filter

    2. Window 적용

    3. Count.PerElement

    4. Adj Formating

    5. Write Adj Count to BQ

빅쿼리 데이타 구조

빅쿼리에는 두개의 테이블에 데이타를 나눠서 저장하였다.

명사와 형용사 테이블로 각각의 테이블 명과 구조는 다음과 같다.


명사 테이블 : noun

필드명

데이타 타입

date

TIMESTAMP

noun

STRING

count

INTEGER


형용사 테이블 : adj

필드명

데이타 타입

date

TIMESTAMP

adj

STRING

count

INTEGER

자연어 분석 클래스 작성

전체 데이타 흐름과 저장 구조가 이해되었으면, 파이프라인 코드 작성에 앞서서 자연어 처리 API를 호출하는 로직을 만들어보자


우리가 사용할 API는 String으로 문자열을 주면 다음과 같이 NLAnalyzeVO 객체로 분석 결과를 리턴해주는 코드이다.


package com.terry.nl;


import java.util.ArrayList;

import java.util.List;


public class NLAnalyzeVO {

List<String> nouns = new ArrayList<String>();

List<String> adjs = new ArrayList<String>();

List<String> emoticons = new ArrayList<String>();

float sentimental;


public List<String> getNouns() {

return nouns;

}


public List<String> getAdjs() {

return adjs;

}


public List<String> getEmoticons() {

return emoticons;

}


public float getSentimental() {

return sentimental;

}


public void setSentimental(float sentimental) {

this.sentimental = sentimental;

}

public void addNouns(String n){

nouns.add(n);

}

public void addAdj(String a){

adjs.add(a);

}

public void addEmoticons(String e){

emoticons.add(e);

}

}

<NLAnalyzeVO.java>


분석 결과로는 List<String> 타입으로 명사들의 목록을 nouns 로, 형용사들의 목록을 adj로 리턴해준다. float형으로 sentimental 이라는 필드에는 입력된 문장의 감정도를 리턴하도록 되어 있다. 음수값일 때는 부정적, 양수값일 경우에는 긍정을 의미한다.

VO안에는 List<String> emoticons 라는 필드가 있는데, 이는 트위터 메세지 내의 이모티콘을 추출하여 저장하기 위한 필드인데, 이 예제에서는 사용하지 않으니 신경 쓰지 않아도 된다.


package com.terry.nl;


import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;

import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;

import com.google.api.client.http.HttpRequest;

import com.google.api.client.http.HttpRequestInitializer;

import com.google.api.client.json.JsonFactory;

import com.google.api.client.json.jackson2.JacksonFactory;

import com.google.api.services.language.v1beta1.CloudNaturalLanguageAPI;

import com.google.api.services.language.v1beta1.CloudNaturalLanguageAPI.Documents.AnnotateText;

import com.google.api.services.language.v1beta1.CloudNaturalLanguageAPIScopes;

import com.google.api.services.language.v1beta1.model.AnalyzeEntitiesRequest;

import com.google.api.services.language.v1beta1.model.AnalyzeEntitiesResponse;

import com.google.api.services.language.v1beta1.model.AnalyzeSentimentRequest;

import com.google.api.services.language.v1beta1.model.AnalyzeSentimentResponse;

import com.google.api.services.language.v1beta1.model.AnnotateTextRequest;

import com.google.api.services.language.v1beta1.model.AnnotateTextResponse;

import com.google.api.services.language.v1beta1.model.Document;

import com.google.api.services.language.v1beta1.model.Entity;

import com.google.api.services.language.v1beta1.model.Features;

import com.google.api.services.language.v1beta1.model.Sentiment;

import com.google.api.services.language.v1beta1.model.Token;


import java.io.IOException;

import java.io.PrintStream;

import java.security.GeneralSecurityException;

import java.util.List;

import java.util.Map;


/**

*

* Google Cloud NL API wrapper

*/



@SuppressWarnings("serial")

public class NLAnalyze {


public static NLAnalyze getInstance() throws IOException,GeneralSecurityException {


return new NLAnalyze(getLanguageService());

}


public NLAnalyzeVO analyze(String text) throws IOException, GeneralSecurityException{

Sentiment  s = analyzeSentiment(text);

List <Token> tokens = analyzeSyntax(text);

NLAnalyzeVO vo = new NLAnalyzeVO();


for(Token token:tokens){

String tag = token.getPartOfSpeech().getTag();

String word = token.getText().getContent();


if(tag.equals("NOUN")) vo.addNouns(word);

else if(tag.equals("ADJ")) vo.addAdj(word);

}


vo.setSentimental(s.getPolarity());


return vo;

}



/**

* Be sure to specify the name of your application. If the application name is {@code null} or

* blank, the application will log a warning. Suggested format is "MyCompany-ProductName/1.0".

*/

private static final String APPLICATION_NAME = "Google-LanguagAPISample/1.0";


/**

* Connects to the Natural Language API using Application Default Credentials.

*/

public static CloudNaturalLanguageAPI getLanguageService()

throws IOException, GeneralSecurityException {

GoogleCredential credential =

GoogleCredential.getApplicationDefault().createScoped(CloudNaturalLanguageAPIScopes.all());

JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();

return new CloudNaturalLanguageAPI.Builder(

GoogleNetHttpTransport.newTrustedTransport(),

jsonFactory, new HttpRequestInitializer() {

@Override

public void initialize(HttpRequest request) throws IOException {

credential.initialize(request);

}

})

.setApplicationName(APPLICATION_NAME)

.build();

}


private final CloudNaturalLanguageAPI languageApi;


/**

* Constructs a {@link Analyze} which connects to the Cloud Natural Language API.

*/

public NLAnalyze(CloudNaturalLanguageAPI languageApi) {

this.languageApi = languageApi;

}


public List<Token> analyzeSyntax(String text) throws IOException{

AnnotateTextRequest request =

new AnnotateTextRequest()

.setDocument(new Document().setContent(text).setType("PLAIN_TEXT"))

.setFeatures(new Features().setExtractSyntax(true))

.setEncodingType("UTF16");

AnnotateText analyze =

languageApi.documents().annotateText(request);


AnnotateTextResponse response = analyze.execute();


return response.getTokens();


}

/**

* Gets {@link Sentiment} from the string {@code text}.

*/

public Sentiment analyzeSentiment(String text) throws IOException {

AnalyzeSentimentRequest request =

new AnalyzeSentimentRequest()

.setDocument(new Document().setContent(text).setType("PLAIN_TEXT"));

CloudNaturalLanguageAPI.Documents.AnalyzeSentiment analyze =

languageApi.documents().analyzeSentiment(request);


AnalyzeSentimentResponse response = analyze.execute();

return response.getDocumentSentiment();

}


}


<NLAnalyze.java>


코드 상의 주요 부분을 살펴보자

public NLAnalyzeVO analyze(String text)

메서느가 주요 메서드로, 트윗 문자열을 text 인자로 넘겨주면 분석 결과를 NLAnalyzeVO로 리턴한다.

이 메서드 안에서는 두개의 메서드를 호출하는데, analyzeSentiment(text) 와, analyzeSyntax(text)

를 두개 호출한다.

analyzeSentiment(text) 메서드는 text 를 넣으면 float 타입으로 감정도인 Sentinetal 지수를 리턴한다.

analyzeSyntax(text)는 구문을 분석하여, 명사,형용사,접속사,조사 등과 단어간의 의존 관계등을 분석해서 리턴해주는데, Token 이라는 데이타 타입의 리스트 형태로 다음과 같이 리턴한다.

List <Token> tokens = analyzeSyntax(text);


여기서 단어의 형(명사,형용사)는 token에서 tag 라는 필드를 통해서 리턴되는데, 우리가 필요한것은 명사와 형용사만 필요하기 때문에, tag가 NOUN (명사)와 ADJ (형용사)로 된 단어만 추출해서 NLAnalyzeVO 객체에 넣어서 리턴한다. (태그의 종류는 https://cloud.google.com/natural-language/reference/rest/v1beta1/documents/annotateText#Tag ) 를 참고하기 바란다.


중요

이 코드를 이용해서 구글 클라우드의 자연어 분석 API를 호출할때 그러면 API 인증은 어떻게 할까? 보통 구글 클라우드 콘솔에서 다운 받는 서비스 어카운트 키 (Service Account Key) JSON 파일을 사용하는데, 구글 자연어 분석 API를 호출하기 위해서도 서비스 어카운트 키가 필요하다.

이 키를 콘솔에서 다운로드 받은 후에, GOOGLE_APPLICATION_CREDENTIALS 라는 환경 변수에 서비스 어카운트 키의 경로를 지정해주면 된다.


예) export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your-project-credentials.json


자연어 분석 클래스를 다 만들었으면 테스트 코드를 만들어서 테스트를 해보자.

다음은 JUnit 4.X를 이용한 간단한 테스트 코드 이다.


package com.terry.nl.test;


import static org.junit.Assert.*;


import java.io.IOException;

import java.security.GeneralSecurityException;

import java.util.List;


import org.junit.Test;


import com.terry.nl.NLAnalyze;

import com.terry.nl.NLAnalyzeVO;


public class NLAnalyzeTest {


@Test

public void test() {

try {

NLAnalyze instance = NLAnalyze.getInstance();

String text="Larry Page, Google's co-founder, once described the 'perfect search engine' as something that 'understands exactly what you mean and gives you back exactly what you want.'";

NLAnalyzeVO vo = instance.analyze(text);

List<String> nouns = vo.getNouns();

List<String> adjs = vo.getAdjs();

System.out.println("### NOUNS");

for(String noun:nouns){

System.out.println(noun);

}

System.out.println("### ADJS");

for(String adj:adjs){

System.out.println(adj);

}

} catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

fail("API call error");

} catch (GeneralSecurityException e) {

// TODO Auto-generated catch block

e.printStackTrace();

fail("Security exception");

}

}


}


"Larry Page, Google's co-founder, once described the 'perfect search engine' as something that 'understands exactly what you mean and gives you back exactly what you want.'" 문자열을 분석하여,  명사와 형용사를 추출하여 다음과 같이 결과를 출력해준다.

### NOUNS

Larry

Page

Google

co-founder

search

engine

something

### ADJS

perfect

파이프라인 코드 작성

이제 메인 파이프라인 개발을 위한 준비가 다 되었다. 이제 TwitterPipeline 이라는 이름으로 파이프라인을 구현해보자. 전체 코드는 다음과 같다.

package com.terry.dataflow;


import java.io.IOException;

import java.io.StringReader;

import java.security.GeneralSecurityException;

import java.util.ArrayList;

import java.util.List;


import javax.json.Json;

import javax.json.JsonObject;

import javax.json.JsonReader;


import org.joda.time.DateTime;

import org.joda.time.Duration;

import org.joda.time.Instant;

import org.joda.time.format.DateTimeFormat;

import org.joda.time.format.DateTimeFormatter;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;


import com.google.api.services.bigquery.model.TableFieldSchema;

import com.google.api.services.bigquery.model.TableRow;

import com.google.api.services.bigquery.model.TableSchema;

import com.google.cloud.dataflow.sdk.Pipeline;

import com.google.cloud.dataflow.sdk.io.BigQueryIO;

import com.google.cloud.dataflow.sdk.io.PubsubIO;

import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

import com.google.cloud.dataflow.sdk.transforms.Count;

import com.google.cloud.dataflow.sdk.transforms.Create;

import com.google.cloud.dataflow.sdk.transforms.DoFn;

import com.google.cloud.dataflow.sdk.transforms.ParDo;

import com.google.cloud.dataflow.sdk.transforms.ParDo.Bound;

import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;

import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;

import com.google.cloud.dataflow.sdk.transforms.windowing.Window;

import com.google.cloud.dataflow.sdk.values.KV;

import com.terry.nl.NLAnalyze;

import com.terry.nl.NLAnalyzeVO;


import com.google.cloud.dataflow.sdk.values.PCollection;


/**

* A starter example for writing Google Cloud Dataflow programs.

*

* <p>The example takes two strings, converts them to their upper-case

* representation and logs them.

*

* <p>To run this starter example locally using DirectPipelineRunner, just

* execute it without any additional parameters from your favorite development

* environment.

*

* <p>To run this starter example using managed resource in Google Cloud

* Platform, you should specify the following command-line options:

*   --project=<YOUR_PROJECT_ID>

*   --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>

*   --runner=BlockingDataflowPipelineRunner

*/

public class TwitterPipeline {

private static final Logger LOG = LoggerFactory.getLogger(TwitterPipeline.class);

private static final String NOWN_TABLE=

"useful-hour-138023:twitter.noun";

private static final String ADJ_TABLE=

"useful-hour-138023:twitter.adj";


// Read Twitter feed as a JSON format

// extract twitt feed string and pass into next pipeline

static class ParseTwitterFeedDoFn extends DoFn<String,String>{


private static final long serialVersionUID = 3644510088969272245L;


@Override

public void processElement(ProcessContext c){

String text = null;

String lang = null;

try {

JsonReader reader = Json.createReader(new StringReader(c.element()));

JsonObject json = reader.readObject();

text = (String) json.getString("text");

lang = (String) json.getString("lang");


if(lang.equals("en")){

c.output(text.toLowerCase());

}


} catch (Exception e) {

LOG.debug("No text element");

LOG.debug("original message is :" + c.element());

}  

}

}


// Parse Twitter string into

// - list of nouns

// - list of adj

// - list of emoticon


static class NLAnalyticsDoFn extends DoFn<String,KV<String,Iterable<String>>>{ /**

*

*/

private static final long serialVersionUID = 3013780586389810713L;


// return list of NOUN,ADJ,Emoticon

@Override

public void processElement(ProcessContext c) throws IOException, GeneralSecurityException{

String text = (String)c.element();


NLAnalyze nl = NLAnalyze.getInstance();

NLAnalyzeVO vo = nl.analyze(text);


List<String> nouns = vo.getNouns();

List<String> adjs = vo.getAdjs();


KV<String,Iterable<String>> kv_noun=  KV.of("NOUN", (Iterable<String>)nouns);

KV<String,Iterable<String>> kv_adj =  KV.of("ADJ", (Iterable<String>)adjs);


c.output(kv_noun);

c.output(kv_adj);

}


}



static class NounFilter extends DoFn<KV<String,Iterable<String>>,String>{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey();

if(!key.equals("NOUN")) return;

List<String> values = (List<String>) c.element().getValue();

for(String value:values){

// Filtering #

if(value.equals("#")) continue;

else if(value.startsWith("http")) continue;

c.output(value);

}

}

}


static class AddTimeStampNoun extends DoFn<KV<String,Long>,TableRow>

implements com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess

{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey(); // get Word

Long value = c.element().getValue();// get count of the word

IntervalWindow w = (IntervalWindow) c.window();

Instant s = w.start();

DateTime sTime = s.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTimeFormatter dtf = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

String str_stime = sTime.toString(dtf);


TableRow row =  new TableRow()

.set("date", str_stime)

.set("noun", key)

.set("count", value);


c.output(row);

}


}


static class AddTimeStampAdj extends DoFn<KV<String,Long>,TableRow>

implements com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess

{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey(); // get Word

Long value = c.element().getValue();// get count of the word

IntervalWindow w = (IntervalWindow) c.window();

Instant s = w.start();

DateTime sTime = s.toDateTime(org.joda.time.DateTimeZone.forID("Asia/Seoul"));

DateTimeFormatter dtf = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

String str_stime = sTime.toString(dtf);


TableRow row =  new TableRow()

.set("date", str_stime)

.set("adj", key)

.set("count", value);


c.output(row);

}


}

static class AdjFilter extends DoFn<KV<String,Iterable<String>>,String>{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey();

if(!key.equals("ADJ")) return;

List<String> values = (List<String>) c.element().getValue();

for(String value:values){

c.output(value);

}

}

}


static class Echo extends DoFn<KV<String,Iterable<String>>,Void>{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey();

List<String> values = (List<String>) c.element().getValue();

for(String value:values){

}

}


}

public static void main(String[] args) {

Pipeline p = Pipeline.create(

PipelineOptionsFactory.fromArgs(args).withValidation().create());


@SuppressWarnings("unchecked")

PCollection <KV<String,Iterable<String>>> nlprocessed

=  (PCollection<KV<String,Iterable<String>>>) p.apply(PubsubIO.Read.named("ReadFromPubSub").topic("projects/useful-hour-138023/topics/twitter"))

.apply(ParDo.named("Parse Twitter").of(new ParseTwitterFeedDoFn()))

.apply(ParDo.named("NL Processing").of(new NLAnalyticsDoFn()));



// Noun handling sub-pipeline

List<TableFieldSchema> fields = new ArrayList<>();

fields.add(new TableFieldSchema().setName("date").setType("TIMESTAMP"));

fields.add(new TableFieldSchema().setName("noun").setType("STRING"));

fields.add(new TableFieldSchema().setName("count").setType("INTEGER"));

TableSchema schema = new TableSchema().setFields(fields);


nlprocessed.apply(ParDo.named("NounFilter").of(new NounFilter()))

.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30))))

.apply(Count.<String>perElement())

.apply(ParDo.named("Noun Formating").of(new AddTimeStampNoun()) )

.apply(BigQueryIO.Write

.named("Write Noun Count to BQ")

.to( NOWN_TABLE)

.withSchema(schema)

.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)

.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));


// Adj handling sub-pipeline

fields = new ArrayList<>();

fields.add(new TableFieldSchema().setName("date").setType("TIMESTAMP"));

fields.add(new TableFieldSchema().setName("adj").setType("STRING"));

fields.add(new TableFieldSchema().setName("count").setType("INTEGER"));

schema = new TableSchema().setFields(fields);


nlprocessed.apply(ParDo.named("AdjFilter").of(new AdjFilter()))

.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30))))

.apply(Count.<String>perElement())

.apply(ParDo.named("Adj Formating").of(new AddTimeStampAdj()) )

.apply(BigQueryIO.Write

.named("Write Adj Count to BQ")

.to( ADJ_TABLE)

.withSchema(schema)

.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)

.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));





p.run();

}


}

<TwitterPipeline.java>

코드를 하나씩 분석해보자.

먼저 main함수 부분을 보자

PCollection <KV<String,Iterable<String>>> nlprocessed

=  (PCollection<KV<String,Iterable<String>>>) p.apply(PubsubIO.Read.named("ReadFromPubSub").topic("projects/useful-hour-138023/topics/twitter"))

.apply(ParDo.named("Parse Twitter").of(new ParseTwitterFeedDoFn()))

.apply(ParDo.named("NL Processing").of(new NLAnalyticsDoFn()));

<TwitterPipeline.java에서 main() 함수 일부 >


파이프 라인이 시작되면, PubSubIO를 이용하여 “projects/useful-hour-138023/topics/twitter” 이름의 큐에서 데이타를 읽는다. 읽은 데이타는 ParseTwitterFeedDoFn() 라는 함수에서 파싱이 된다.

ParseTwitterFeedDoFn() 은 다음과 같다.


static class ParseTwitterFeedDoFn extends DoFn<String,String>{

private static final long serialVersionUID = 3644510088969272245L;


@Override

public void processElement(ProcessContext c){

String text = null;

String lang = null;

try {

JsonReader reader = Json.createReader(new StringReader(c.element()));

JsonObject json = reader.readObject();

text = (String) json.getString("text");

lang = (String) json.getString("lang");


if(lang.equals("en")){

c.output(text.toLowerCase());

}


} catch (Exception e) {

LOG.debug("No text element");

LOG.debug("original message is :" + c.element());

}  

}

}

<TwitterPipeline.java 에서 ParseTwitterFeedDoFn 클래스 구현부>


PubSub에서 읽어드린 데이타는 문자열로 안에 JSON 데이타를 가지고 있다. 이 JSON 문자열을 파싱해서 “text”와 “lang” 엘리먼트만 추출한 후에, “lang”이 “en”(영어) 인 경우에만 다음 파이프라인으로 “text”에서 추출한 문자열을 보내고, 영어가 아닌 경우에는 데이타를 무시한다.


다음은 NLAnalyticsDoFn에서 트윗 문자열을 받아서 자연어 분석을 한다.


static class NLAnalyticsDoFn extends DoFn<String,KV<String,Iterable<String>>>{

// return list of NOUN,ADJ,Emoticon

@Override

public void processElement(ProcessContext c) throws IOException, GeneralSecurityException{

String text = (String)c.element();


NLAnalyze nl = NLAnalyze.getInstance();

NLAnalyzeVO vo = nl.analyze(text);


List<String> nouns = vo.getNouns();

List<String> adjs = vo.getAdjs();


KV<String,Iterable<String>> kv_noun=  KV.of("NOUN", (Iterable<String>)nouns);

KV<String,Iterable<String>> kv_adj =  KV.of("ADJ", (Iterable<String>)adjs);


c.output(kv_noun);

c.output(kv_adj);

}


}

<TwitterPipeline.java 에서 NLAnalyticsDoFn 클래스 구현부>


앞에서 작성한 자연어 분석 클래스인 NLAnalyze 클래스를 이용하여 text를 넘기고, 리턴 값으로 NLAnalyzeVO를 리턴 값으로 받은 후, 명사는 KV<String,Iterable<String>> 타입으로 다음과 같을 저장해서 c.output을 이용해서 다음 파이프라인으로 넘기고

“NOUN”

명사1,명사2,명사3,...


마찬가지 방법으로 형용사도 같은 데이타 형인 KV<String,Iterable<String>> 타입으로 저장하여 다음 파이프라인으로 넘긴다.


이 데이타를 각각 명사와 형용사 두개의 처리 파이프라인으로 전달하는데, 두개의 파이프라인으로 단일 데이타를 보내는 방법은 다음과 같다.


nlprocessed.apply(ParDo.named("NounFilter").of(new NounFilter()))

: (중략)


nlprocessed.apply(ParDo.named("AdjFilter").of(new AdjFilter()))

: (중략)

nlprocessed는 PCollection 타입으로, NLAnalyticsDoFn에 의해서 처리된 결과이다.

이 결과 값에 두 개의 각각 다른 트랜스폼 (NounFilter와 AdjFilter)를 적용하였다.

이렇게 하나의 PCollection 값에 두 개의 트랜스폼을 각각 적용하면 적용된 각각의 파이프라인은 다른 파이프라인으로 아래 그림 처럼 분기 처리가 된다.


자아 그러면, 명사 처리 파이프라인 흐름을 따라가 보자

nlprocessed.apply(ParDo.named("NounFilter").of(new NounFilter()))

.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30))))

.apply(Count.<String>perElement())

.apply(ParDo.named("Noun Formating").of(new AddTimeStampNoun()) )

.apply(BigQueryIO.Write

.named("Write Noun Count to BQ")

.to( NOWN_TABLE)

.withSchema(schema)

.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

< TwitterPipeline.java의 main() 함수 일부>


첫번째 NounFilter에서는 앞에 파이프라인에서 들어온 명사와 형용사중에서 명사만 필터링 해서 다음 파이프라인으로 전달한다.


static class NounFilter extends DoFn<KV<String,Iterable<String>>,String>{

@Override

public void processElement(ProcessContext c) {

String key = c.element().getKey();

if(!key.equals("NOUN")) return;

List<String> values = (List<String>) c.element().getValue();

for(String value:values){

// Filtering #

if(value.equals("#")) continue;

else if(value.startsWith("http")) continue;

c.output(value);

}


}

}

<TwitterPipeline.java 에서 NounFilter 클래스 구현부>

명사 인지 형용사 인지는 앞에서 넘어오는 데이타 형이 KV<String, .. > 인데, 키 부분의 값이 “NOUN” 일 경우에 명사이기 때문에, 이 값이 아니면 무시한다. 명사인경우에도 종종 쓰레기 값이 들어오는데, 예를 들어 트위터 특성상 해쉬 태그등을 위해서 “#”이 사용되고, 링크를 위해서 “http…” 링크가 들어가기도 하는데 이는 명사가 아니기 때문에 이 내용은 모두 필터링해서 무시한다.


이렇게 정재된 데이타는 파이프라인의 다음 단계인 .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30)))) 를 통해서 30초 단위의 고정 윈도우가 적용되고, 다음  .apply(Count.<String>perElement()) 을 통해서 단어별로 그룹핑되서 카운트 되고 그 결과는 앞서 적용한 30초 윈도우 시간 단위로 다음 파이프 라인으로 전달된다.  전달되는 데이타의 모양은 대략 다음과 같다.

Key (String)

Value (Long)

airplane

100

boy

29

india

92


이렇게 전달된 데이타는 빅쿼리에 저장하기 위해서 빅쿼리의 ROW 데이타 타입은 TableRow로 변환한다.

.apply(ParDo.named("Noun Formating").of(new AddTimeStampNoun()) )

.apply(BigQueryIO.Write

.named("Write Noun Count to BQ")

.to( NOWN_TABLE)

.withSchema(schema)