블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 


Machine Learning Pipeline


조대협 (http://bcho.tistory.com)

대부분 모델 개발과 알고리즘에 집중

머신러닝을 공부하고 나서는 주로 통계학이나, 모델 자체에 많은 공부를 하는 노력을 드렸었다. 선형대수나 미적분 그리고 방정식에 까지 기본으로 돌아가려고 노력을 했었고, 그 중간에 많은 한계에도 부딪혔지만, 김성훈 교수님의 모두를 위한 딥러닝 강의를 접하고 나서, 수학적인 지식도 중요하지만 수학적인 깊은 지식이 없어도 모델 자체를 이해하고 근래에 발전된 머신러닝 개발 프레임웍을 이용하면 모델 개발이 가능하다는 것을 깨달았다.

 

계속해서 모델을 공부하고, 머신러닝을 공부하는 분들을 관심있게 지켜보고 실제 머신러닝을 사용하는 업무들을 살펴보니 재미있는 점이 모두 모델 자체 개발에만 집중한다는 것이다. 커뮤니티에 올라오는 글의 대부분은 어떻게 모델을 구현하는지 어떤 알고리즘을 사용하는지에 대한 내용들이 많았고, 실 업무에 적용하는 분들을 보면 많은 곳들이 R을 이용하여 데이타를 분석하고 모델링을 하는데, 데이타를 CSV 파일 형태로 다운 받아서 정재하고 데이타를 분석하고 모델을 개발하는 곳이 많은 것을 보았다. 데이타의 수집 및 전처리 및 개발된 모델에 대한 서비스에 대해서는 상대적으로 많은 정보를 접하지 못했는데, 예상하기로 대부분 모델 개발에 집중하기 때문이 아닌가 싶다.

 

엔지니어 백그라운드를 가진 나로써는 CSV로 데이타를 끌어다가 정재하고 분석하는 것이 매우 불편해 보이고 이해가 되지 않았다. 빅데이타 분석 시스템에 바로 연결을 하면, CSV로 덤프 받고 업로드 하는 시간등에 대한 고민이 없을텐데.” 왜 그렇게 할까 ?”라는 의문이 계속 생기기 시작하였다.

미니 프로젝트를 시작하다

이런 의문을 가지던중 CNN 네트워크 모델에 대한 대략적인 학습이 끝나고, 실제로 적용하면서 경험을 쌓아보기로 하였다. 그래서 얼굴 인식 모델 개발을 시작하였다. CNN 모델이라는 마법을 사용하면 쉽게 개발이 될줄 알았던 프로젝트가 벌써 몇달이 되어 간다. 학습용 데이타를 구하고, 이를 학습에 적절하도록 전처리 하는 과정에서 많은 실수가 있었고, 그 과정에서 많은 재시도가 있었다.

 

(자세한 내용은 http://bcho.tistory.com/1174 , https://www.slideshare.net/Byungwook/ss-76098082 를 참조)

 

특히나 데이타 자체를 다시 처리해야 하는 일이 많았기 때문에, 데이타 전처리 코드를 지속적으로 개선하였고 개선된 코드를 이용하여 데이타를 지속적으로 다시 처리해서 데이타의 품질을 높여나갔는데, 처리 시간이 계속해서 많이 걸렸다.

자동화와 스케일링의 필요성

특히 이미지 전처리 부분은 사진에서 얼굴이 하나만 있는 사진을 골라내고 얼굴의 각도와 선글라스 유무등을 확인한후 사용 가능한 사진에서 얼굴을 크롭핑하고 학습용 크기로 리사이즈 하는 코드였는데 (자세한 내용 http://bcho.tistory.com/1176) 싱글 쓰레드로 만들다 보니 아무래도 시간이 많이 걸렸다. 실제 운영환경에서는 멀티 쓰레드 또는 멀티 서버를 이용하여 스케일링을 할 필요가 있다고 느꼈다.

 

또한 이미지 수집에서 부터 필터링, 그리고 학습 및 학습된 모델의 배포와 서비스 까지 이 전 과정을 순차적으로 진행을 하되 반복적인 작업이기 때문에 자동화할 필요성이 있다고 생각했다.

아이 체중 예측 모델을 통한 파이프라인에 대한 이해

그러던 중에 팀 동료로 부터 좋은 예제 하나를 전달 받게 되었다.

미국 아기들의 환경에 따른 출생 체중을 예측하는 간단한 선형 회귀 모델을 구현한 파이썬 노트북인데 (https://github.com/GoogleCloudPlatform/training-data-analyst/blob/master/blogs/babyweight/babyweight.ipynb) 하나의 노트북에 전체 단계를 모두 구현해놓았다.

 


 

데이타에 대한 분석을 통한 데이타 특성 추출, 추출된 특성을 통한 모델 개발, 모델 학습을 위한 데이타 전처리 그리고 학습 및 학습된 모델을 통한 예측 서비스 까지 모든 과정을 하나의 노트북에 구현해놓았다.

(시간이 있으면 꼭 보기를 강력 추천한다.)

 

흥미로운 점이 데이타 전처리를 Apache Beam이라는 데이타 처리 플랫폼을 썼고, 그 전처리 코드를 파이썬 노트북에 하나로 다 정리한것이다. (실제로 수행은 로컬에서도 가능하지만, 클라우드에서도 실행이 가능해서 충분한 스케일링을 지원한다.)

 

Apache Beam의 구글의 빅데이타 분석 프레임웍으로 Apache Spark 과 같은 프레임웍이라고 보면된다. Google Dataflow라는 이름으로 구글 클라우드에서 서비스가 되는데, Apache Beam이라는 오픈소스로 공개가 되었다. ( http://bcho.tistory.com/1123 http://bcho.tistory.com/1122 http://bcho.tistory.com/1124 )

 

아 이렇게 하는구나 하는 생각이 들었고, 그즘 실무에서 이와 같은 흐름으로 실제로 머신러닝을 수행하는 것을 볼 기회가 있었다.

데이타 전처리를 스케일링하다.

서비스가 가능한 수준의 전체 머신러닝 서비스 파이프라인을 만들어보고 싶어졌다. 마침 또 Apache Beam의 경우에는 예전에 Java 코드로 실시간 분석을 해본 경험이 있고 이번에 2.0 버전이 릴리즈 되서 이번에는 2.0에서 파이썬을 공부해보기로 하고 개발에 들어갔다.

 

특히 기존의 데이타 전처리 코드는 싱글 쓰레드로 돌기 때문에 스케일링에 문제가 있었지만, Apache Beam을 사용할 경우 멀티 쓰레드 뿐만 아니라 동시에 여러대의 머신에서 돌릴 수 있고 이러한 병렬성에 대해서는 크게 고민을 하지 않아도 Apache Beam이 이 기능을 다 제공해준다. 또한 이 데이타 전처리 코드를 돌릴 런타임도 별도 설치할 필요가 없이 커멘드 하나로 구글 클라우드에서 돌릴 수 가 있다. (직업 특성상 클라우드 자원을 비교적 자유롭게 사용할 수 있었다.)

 

Apache Beam으로 전처리 코드를 컨버팅 한결과 기존 싱글 쓰레드 파이썬 코드가 400~500장의 이미지 전처리에 1~2시간이 걸렸던 반면, 전환후에 대략 15~17분이면 끝낼 수 있었다. 전처리 중에는 서버의 대수가 1대에서 시작해서 부하가 많아지자 자동으로 5대까지 늘어났다. 이제는 아무리 많은 데이타가 들어오더라도 서버의 대수만 단순하게 늘리면 수분~수십분내에 수십,수만장의 데이타 처리가 가능하게 되었다.


<그림. Apache Beam 기반의 이미지 전처리 시스템 실행 화면 >

 

Apache Beam 기반의 이미지 전처리 코드는 https://github.com/bwcho75/facerecognition/blob/master/Preprocess%2Bface%2Brecognition%2Bdata%2Band%2Bgenerate%2Btraining%2Bdata.ipynb 에 공개해 놨다.

 

머신러닝 파이프라인 아키텍쳐와 프로세스

이번 과정을 통해서 머신러닝의 학습 및 예측 시스템 개발이 어느 정도 정형화된 프로세스화가 가능하고 시스템 역시 비슷한 패턴의 아키텍쳐를 사용할 수 있지 않을까 하는 생각이 들었고, 그 내용을 아래와 같이 정리한다.

파이프라인 개발 프로세스

지금까지 경험한 머신러닝 개발 프로세스는 다음과 같다.

 

  1. 데이타 분석
    먼저 머신러닝에 사용할 전체 데이타셋을 분석한다. 그래프도 그려보고 각 변수간의 연관 관계나 분포도를 분석하여, 학습에 사용할 변수를 정의하고 어떤 모델을 사용할지 판단한다.

  2. 모델 정의
    분석된 데이타를 기반으로 모델을 정의하고, 일부 데이타를 샘플링하여 모델을 돌려보고 유효한 모델인지를 체크한다. 모델이 유효하지 않다면 변수와 모델을 바꿔 가면서 최적의 모델을 찾는다.

  3. 데이타 추출 및 전처리
    유효한 모델이 개발이 되면, 일부 데이타가 아니라 전체 데이타를 가지고 학습을 한다. 전체 데이타를 추출해서 모델에 넣어서 학습을 하려면 데이타의 크기가 크면 매번 매뉴얼로 하기가 어렵기 때문에 데이타 추출 및 전처리 부분을 자동화 한다.   

  4. 전체 데이타를 이용한 반복 학습 및 튜닝
    모델 자체가 유효하다고 하더라도 전체 데이타를 가지고 학습 및 검증을 한것이 아니기 때문에 의외의 데이타가 나오거나 전처리에 의해서 필터링되지 않은 데이타가 있을 수 있기 때문에 지속적으로 데이타 추출 및 전처리 모듈을 수정해야 하고, 마찬가지로 모델 역시 정확도를 높이기 위해서 지속적으로 튜닝을 한다. 이 과정에서 전체 데이타를 다루기 때문에 모델 역시 성능을 위해서 분산형 구조로 개선되어야 한다.

  5. 모델 배포
    학습 모델이 완성되었으면 학습된 모델을 가지고 예측을 할 수 있는 시스템을 개발하고 이를 배포한다.

  6. 파이프라인 연결 및 자동화
    머신러닝의 모델은 위의 과정을 통해서 만들었지만, 데이타가 앞으로도 지속적으로 들어올 것이고 지속적인 개선이 필요하기 때문에 이 전과정을 자동화 한다. 이때 중요한것은 데이타 전처리, 학습, 튜닝, 배포등의 각 과정을 물 흐르듯이 연결하고 자동화를 해야 하는데 이렇게 데이타를 흐르는 길을 데이타 플로우라고 한다. (흔히 Luigi, Rundeck, Airflow와 같은 데이타플로우 오케스트레이션 툴을 이용한다)

 

전체적인 프로세스에 대해서 좋은 영상이 있어서 공유한다.


아키텍쳐

위의 프로세스를 기반으로한 머신러닝 파이프라인 아키텍쳐 는 다음과 같다.


 

 

Inputs

머신 러닝 파이프라인의 가장 처음단은 데이타를 수집하고 이 수집된 데이타를 저장하는 부분이다.

데이타 수집은 시간,일,주,월과 같이 주기적으로 데이타를 수집하는 배치 프로세싱과, 실시간으로 데이타를 수집하는 리얼타임 프로세싱 두가지로 나뉘어 진다. 이 두 파이프라인을 통해서 데이타 소스로 부터 데이타를 수집하고 필터링하고 정재하여, 데이타 레이크에 저장한다. 이 구조는 일반적인 빅데이타 분석 시스템의 구조와 유사하다. (참고 자료 http://bcho.tistory.com/984 http://bcho.tistory.com/671 )

 

개인적으로 머신러닝을 위해서 중요한 부분 중 하나는 데이타 레이크를 얼마나 잘 구축하느냐이다. 데이타 레이크는 모든 데이타가 모여 있는 곳으로 보통 데이타 레이크를 구축할때는 많은 데이타를 모으는 데만 집중하는데, 데이타를 잘 모으는 것은 기본이고 가장 중요한 점은 이 모여 있는 데이타에 대한 접근성을 제공하는 것이다.

 

무슨 이야기인가 하면, 보통 머신러닝 학습을 위해서 학습 데이타를 받거나 또는 데이타에 대한 연관성 분석등을 하기 위해서는 데이타 레이크에서 데이타를 꺼내오는데, 데이타 레이크를 개발 운영 하는 사람과 데이타를 분석하고 머신러닝 모델을 만드는 사람은 보통 다르기 때문에, 모델을 만드는 사람이 데이타 레이크를 운영하는 사람에게 “무슨 무슨 데이타를 뽑아서 CSV로 전달해 주세요.” 라고 이야기 하는 것이 보통이다. 그런데 이 과정이 번거롭기도 하고 시간이 많이 걸린다.

가장 이상적인 방법은 데이타를 분석하고 모델링 하는 사람이 데이타 레이크 운영팀에 부탁하지 않고서도 손쉽고 빠르게 데이타에 접근해서 데이타를 읽어오고 분석을 할 수 있어야 한다.

직업 특성상 구글의 빅쿼리를 많이 접하게 되는데, 빅쿼리는 대용량 데이타를 저장할 수 있을 뿐만 아니라 파이썬 노트북이나 R 스튜디오 플러그인을 통해서 바로 데이타를 불러와서 분석할 수 있다.  


<그림 INPUT 계층의 빅데이타 저장 분석 아키텍쳐>

Pre processing & Asset creation

Pre processing은 수집한 데이타를 학습 시스템에 넣기 위해서 적절한 데이타만 필터링하고 맞는 포맷으로 바꾸는 작업을 한다. 작은 모델이나 개발등에서는 샘플링된 데이타를 로컬에서 내려 받아서 R이나 numpy/pandas등으로 작업이 가능하지만, 데이타가 수테라에서 수백테라이상이 되는 빅데이타라면 로컬에서는 작업이 불가능하기 때문에, 데이타 전처리 컴포넌트를 만들어야 한다.

일반적으로 빅데이타 분석에서 사용되는 기술을 사용하면 되는데, 배치성 전처리는 하둡이나 스파크와 같은 기술이 보편적으로 사용되고 실시간 스트리밍 분석은 스파크 스트리밍등이 사용된다.


Train

학습은 전처리된 데이타를 시스템에 넣어서 모델을 학습 시키는 단계이다. 이 부분에서 생각해야 할점은 첫번째는 성능 두번째는 튜닝이다. 성능 부분에서는 GPU등을 이용하여 학습속도를 늘리고 여러대의 머신을 연결하여 학습을 할 수 있는 병렬성이 필요하다. 작은 모델의 경우에는 수시간에서 하루 이틀 정도 소요되겠지만 모델이 크면 한달 이상이 걸리기 때문에 고성능 하드웨어와 병렬 처리를 통해서 학습 시간을 줄이는 접근이 필요하다. 작은 모델의 경우에는 NVIDIA GPU를 데스크탑에 장착해놓고 로컬에서 돌리는 것이 가성비 적으로 유리하고, 큰 모델을 돌리거나 동시에 여러 모델을 학습하고자 할때는 클라우드를 사용하는 것이 절대 적으로 유리하다 특히 구글 클라우드의 경우에는  알파고에서 사용된 GPU의 다음 세대인 TPU (텐서플로우 전용 딥러닝 CPU)를 제공한다. https://cloud.google.com/tpu/ CPU나 GPU대비 최대 15~30배 정도의 성능 차이가 난다.

 

 

학습 단계에서는 세부 변수를 튜닝할 필요가 있는데, 예를 들어 학습 속도나 뉴럴 네트워크의 폭이나 깊이, 드롭 아웃의 수, 컨볼루셔널 필터의 크기등등이 있다. 이러한 변수들을 하이퍼 패러미터라고 하는데, 학습 과정에서 모델의 정확도를 높이기 위해서 이러한 변수들을 자동으로 튜닝할 수 있는 아키텍쳐를 가지는 것이 좋다.

 

텐서플로우등과 같은 머신러닝 전용 프레임웍을 사용하여 직접 모델을 구현하는 방법도 있지만, 모델의 난이도가 그리 높지 않다면 SparkML등과 같이 미리 구현된 모델의 런타임을 사용하는 방법도 있다.

Predict

Predict에서는 학습된 모델을 이용하여 예측 기능을 서비스 하는데, 텐서플로우에서는  Tensorflow Serv를 사용하면 되지만, Tensorflow Serv의 경우에는 bazel 빌드를 이용하여 환경을 구축해야 하고, 대규모 서비를 이용한 분산 환경 서비스를 따로 개발해야 한다. 거기다가 인터페이스가 gRPC이다. (귀찮다.)

구글 CloudML의 경우에는 별도의 빌드등도 필요 없고 텐서 플로우 모델만 배포하면 대규모 서비스를 할 수 있는 런타임이 바로 제공되고 무엇보다 gRPC 인터페이스뿐만 아니라 HTTP/REST 인터페이스를 제공한다. 만약에 Production에서 머신러닝 모델을 서비스하고자 한다면 구글 CloudML을 고려해보기를 권장한다.

Dataflow Orchestration

이 전과정을 서로 유기적으로 묶어 주는 것을 Dataflow Orchestration이라고 한다.

예를 들어 하루에 한번씩 이 파이프라인을 실행하도록 하고, 파이프라인에서 데이타 전처리 과정을 수행하고, 이 과정이 끝나면 자동으로 학습을 진행하고 학습 정확도가 정해진 수준을 넘으면 자동으로 학습된 모델은 서비스 시스템에 배포하는 이 일련의 과정을 자동으로 순차적으로 수행할 수 있도록 엮어 주는 과정이다.

airbnb에서 개발한 Airflow나 luigi 등의 솔루션을 사용하면 된다.

아직도 갈길은 멀다.

얼굴 인식이라는 간단한 모델을 개발하고 있지만, 전체를 자동화 하고, 클라우드 컴퓨팅을 통해서 학습 시간을 단축 시키고 예측 서비스를 할 수 있는 컴포넌트를 개발해야 하고, 향후에는 하이퍼 패러미터 튜닝을 자동으로 할 수 있는 수준까지 가보려고 한다. 그 후에는 GAN을 통한 얼굴 합성들도 도전하려고 하는데, node.js 공부하는데도 1~2년을 투자한후에나 조금이나마 이해할 수 있게 되었는데, 머신러닝을 시작한지 이제 대략 8개월 정도. 길게 보고 해야 하겠다.

 



 

 

저작자 표시 비영리
신고


구글 데이타 스트리밍 데이타 분석 플랫폼 dataflow - #1 소개


조대협 (http://bcho.tistory.com)


실시간 데이타 처리에서는 들어오는 데이타를 바로 읽어서 처리 하는 스트리밍 프레임웍이 대세인데, 대표적인 프레임웍으로는 Aapche Spark등을 들 수 있다. 구글의 DataFlow는 구글 내부의 스트리밍 프레임웍을 Apache Beam이라는 형태의 오픈소스로 공개하고 이를 실행하기 위한 런타임을 구글 클라우드의 DataFlow라는 이름으로 제공하고 있는 서비스이다.


스트리밍 프레임웍 중에서 Apache Spark 보다 한 단계 앞선 개념을 가지고 있는 다음 세대의 스트리밍 프레임웍으로 생각할 수 있다. Apache Flink 역시 유사한 개념을 가지면서 Apache Spark의 다음 세대로 소개 되는데, 이번글에서는 이 DataFlow에 대한 전체적인 개념과 프로그래밍 모델등에 대해서 설명하고자 한다.  스트리밍 데이타 처리에 대한 개념은 http://bcho.tistory.com/1119 글을 참고하기 바란다.

소개

dataflow에 대해서 이해하기 위해서 프로그래밍 모델을 먼저 이해해야 하는데, dataflow의 프로그래밍 모델은 얼마전에 Apache에 Beam이라는 오픈 소스 프로젝트로 기증 되었다. Apache Spark이나, Apache Flink와 유사한 스트리밍 처리 프레임웍이라고 생각하면 된다. dataflow는 이 Apache beam의 프로그래밍 모델을 실행할 수 있는 런타임 엔진이라고 생각하면 된다. 예를 들어 Apache beam으로 짠 코드를 Servlet이나 Spring 코드라고 생각하면, dataflow는 이를 실행하기 위한 Tomcat,Jetty,JBoss와 같은 런타임의 개념이다.


런타임

Apache Beam으로 작성된 코드는 여러개의 런타임에서 동작할 수 있다. 구글 클라우드의 Dataflow 서비스에서 돌릴 수 도 있고, Apache Flink나 Apache Spark 클러스터 위에서도 그 코드를 실행할 수 있으며, 로컬에서는 Direct Pipeline이라는 Runner를 이용해서 실행이 가능하다.


여러 런타임이 있지만 구글 클라우드의 Dataflow 런타임을 사용하면 다음과 같은 장점이 있다.


매니지드 서비스로 설정과 운영이 필요 없다.

스트리밍 처리는 하나의 노드에서 수행되는 것이 아니라, 여러개의 노드에서 동시에 수행이 되기 때문에, 이 환경을 설치하고 유지 보수 하는 것만 해도 많은 노력이 들지만, Dataflow는 클라우드 서비스이기 때문에 별도의 설치나 운영이 필요없고, 작성한 코드를 올려서 실행 하기만 하면 된다.

Apache Spark등을 운영해본 사람들은 알겠지만, Spark 코드를 만드는 것 이외에도, Spark 클러스터를 설치하고 운영 하는 것 자체가 일이기 때문에, 개발에 집중할 시간이 줄어든다.

오토 스케일링을 지원하기 때문에, 필요한 만큼 컴퓨팅 자원을 끌어다가 빠르게 연산을 끝낼 수 있다.

클라우드 컴퓨팅의 장점은 무한한 자원을 이용하여, 워크로드에 따라서 자원을 탄력적으로 배치가 가능한 것인데, Dataflow 역시, 이러한 클라우드의 장점을 이용하여, 들어오는 데이타량이나 처리 부하에 따라서 자동을 오토 스케일링이 가능하다.


그림처럼 오전에 800 QPS (Query per second)의 처리를 하다가 12시경에 부하가 5000 QPS로 늘어나면 그만한 양의 리소스 (컴퓨팅)를 더 투여해서 늘어나는 부하에 따라서 탄력적으로 대응이 가능하다.

리밸런싱(Rebalancing)기능을 이용하여 작업을 골고루 분배가 가능하다.

Spark이나 Hadoop Map & Reduce와 같은 대용량 분산 처리 시스템의 경우 문제가 특정 노드의 연산이 늦게 끝나서 전체 연산이 늦게 끝나는 경우가 많다. 예를 들어 1000개의 데이타를 10개씩 100개의 노드에서 분산하여 처리를 한후 그 결과를 모두 모아서 합치는 연산이 있다고 할때, 1~2개의 노드가 연산이 늦게 끝나더라도 그 결과가 있어야 전체 값을 합칠 수 있기 때문에, 다른 노드의 연산이 끝나도 다른 노드들은 기다려야 하고 전체 연산 시간이 느려 진다.


Dataflow의 경우는 이런 문제를 해결 하기 위해서, 리밸런싱(rebalancing)이라는 메카니즘을 발생하는데, 위의 그림(좌측의 그래프는 각 노드의 연산 시간이다.) 과 같이 특정 노드의 연산이 느려진 경우, 느려진 노드의 데이타를 다른 연산이 끝난 노드로 나눠서 재 배치하여 아래와 같이 전체 연산 시간을 줄일 수 있다.




저작자 표시 비영리
신고


데이타 스트리밍 처리에 대한 이해


조대협 (http://bcho.tistory.com)


근래에 Apache Beam 프로젝트를 공부하게 되서, 그간 묵혀놨던 데이타 스트리밍 처리에 대해서 다시 정리중인데, 예전에 Apache Storm을 봤을때 보다 트리거나, 윈도우등 많은 개념들이 들어가 있어서 데이타 스트리밍에 대한 개념 부터 다시 정리를 시작을 하고자한다.


Apache Storm에서 부터, Apache Spark 기반의 데이타 스트림 처리뿐 아니라 근래에는 Apache Flink와 같은 새로운 스트리밍 프레임웍크과 구글이 이미 클라우드를 통해서 서비스 하고 있는  google cloud dataflow (Apache Beam이라는 프로젝트로 오픈소스화 되었고, 현재 인큐베이션 단계에 있다.) 까지 빅데이타에 대한 실시간 처리성이 강조되면서 근래에 데이타 스트리밍 처리가 다시 주목 받는 것 같다. 이 문서는 구글이 개발한 dataflow에 대한 개념을 이해하기 위함이다.


본 문서의 내용과 그림은 https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101 를 참고하였다.


사전 개념 이해

스트리밍 데이타 처리를 이해하기 위해서는 몇몇 용어와 개념을 사전에 이해해야 하는 부분이 있다.

Bounded data 와 Unbounded data

먼저 스트리밍 데이타 처리를 이해하려면 데이타의 종류에 대해서 먼저 이해해야 한다.

  • Unbounded data 는 데이타의 수가 정해져있지 않고 계속해서 추가되는, 즉 끊임 없이 흘러 들어오는 데이타라고 볼 수 있다. 예를 들어서 모바일 디바이스에서 계속 올라오는 로그, 페이스북이나 트위터의 타임 피드, 증권 거래 주문 같이 계속 해서 들어와서 쌓이는 데이타를 Unbounded data 라고 한다.

  • Bounded data는 데이타가 딱 저장되고 더 이상 증거나 변경이 없는 형태로 계속 유지되는 데이타를 뜻한다. 1월의 정산 데이타.

Event time과 Processing time

데이타의 발생 시간과 시스템에서 처리되는 시간이 차이가 있는데, 이를 각각 Event time과 Processing time 이라고 정의한다.

예를 들어, 게임에서 사용자가 공격을 한 이벤트를 서버에 전달해서 처리하여 저장하는 시나리오가 있다고 가정할때, 공격 이벤트가 1:00:00에 발생했으면, 이 데이타가 네트워크를 타고 서버로 도달하여 프로그램 로직을 수행하고 저장하는데 소요된 시간을 2초라고 가정하면, Event time 은 1:00가 되고, Processing time은 1:00:02가 된다.

이상적으로는 Event time과 Processing time이 동일하면 좋겠지만, 네트워크 시간이나 처리 시간에 따라 Processing time이 Event time 보다 늦고, 또한 Processing time에서 소요되는 실제 처리 시간은 일정하지 않고 아래 그림의 파란색 그래프(실제 처리 그래프) 처럼 들쭉 날쭉하다. 네트워크 상황이나, 서버의 CPU, IO 상황이 그때마다 차이가 나기 때문이다.


아래 그림을 통해서 개념을 다시 정리해보면,

X축은 Event time, Y축은 Processing Time이다. 0초에 발생한 데이타가 서버에 도착해서 처리하는 시간이 소요 되기 때문에, 아래 그림과 같이 Processing Time은 2초 부터 시작한다. Skew는 Event time과 Processing time간의 간격이다. 아래 그림에서 보면, Processing time에서 3초때에는 Event time 1초에서 발생한 데이타를 처리하고 있는데, 실제 Event time에서는 3초 시간의 데이타가 발생하고 있기 때문에, Processing time과 Event time은 약 2초의 지연이 발생하고 있고, 이를 Skew 라고 한다.



Bounded data의 처리

Bounded data는 이미 저장되어 있는 데이타를 처리하는 것이기 때문에 별다른 처리 패턴이 필요없다



데이타를 읽어서 한번에 처리해서 저장 하면 된다.

UnBounded data 처리

복잡한 것은 스트리밍 데이타 즉, Unbounded data 를 처리하는 방법인데, Unbounded data 는 크게 Batch와 Streaming 두 가지 방식으로 처리할 수 있다.

Batch로 처리

배치로 Unbounded data를 처리 하는 방식은 아래와 같이 두 가지 방식이 있다.

Fixed Windows

Fixed Windows 방식은 스트리밍으로 들어오는 데이타를 일정 시간 단위로 모은 후, 배치로 처리 하는 방식이다. 예를 들어서 아래 그림과 같이 10~11시 까지 데이타를 수집한후, 11시 이후에, 10~11시까지 들어온 데이타를 처리해서 정리 하는 방식이다.



이 방식은 구현이 간단하다는 장점이 있지만, 데이타가 수집 된 후 처리를 시작하기 때문에, 실시간성이 떨어진다. (근 실시간)

Streaming 처리

Unbounded 데이타를 제대로 처리하려면 스트리밍 처리를 하는 것이 좋은데, 스트리밍 처리 방법에는 아래와 같이 크게 Time agnostic, Filtering, Inner Join, Windowing 방식등이 있다.


스트리밍 처리는 배치 처리에 비해서 복잡한 것이, Unbounded 데이타는 기본적으로 특성이 Skew가 환경에 따라 변화가 심하고 그래서 데이타가 시스템에 도착하는 순서 역시 순차적으로 도착하지 않고 들쭉 날쭉 하다.

Time agnostic

Time agnostic 이란, 데이타가 시간 속성을 가지고 있지 않는 데이타 이다. 들어오는 데로 처리를 하면 되기 때문에, 별다른 노하우가 필요 없지만, 하나의 데이타 형이기 때문에 간단하게 언급만 한다.

Filtering

다음으로 많이 사용 되는 것이 필터링인데, 들어오는 데이타 중 특정 데이타만 필터링 해서 저장 하는 구조이다.


예를 들면, 웹 로깅 데이타를 수집해서, 특정 IP나 국가 대역에서 들어오는 데이타만 필터링해서 저장하는 시나리오등이 될 수 있다.

Inner joins (교집합)

Inner join은 두개의 Unbounded 데이타에서 들어오는 값을 서로 비교하여 매칭 시켜서 값을 구하는 방식이다.



모바일 뉴스 앱이 있다고 가정할때, 뉴스 앱에서는 사용자가 어떤 컨텐츠를 보는지에 대한 데이타를 수집 전송하고, 지도 앱에서는 현재 사용자의 위치를 수집해서 전송한다고 하자.

이 경우 사용자별 뉴스 뷰에 대한 Unbounded data 와, 사용자별 위치에 대한 Unbounded data 가 있게 되는데, 이 두개의 데이타 스트림을 사용자로 Inner Join을 하면 사용자가 어떤 위치에서 어떤 뉴스를 보는지에 대해서 분석을 할 수 있다.

Inner join을 구현하기 위해서는 양쪽 스트림에서 데이타가 항상 같은 시간에 도착하는 것이 아니기 때문에, 반대쪽 데이타가 도착할때 까지 먼저 도착한 데이타를 임시로 저장할 버퍼 영역이 필요하고, 이 영역에 임시로 일정 기간 데이타를 저장하고 있다가 반대쪽 스트림에서 데이타가 도착 하면 이를 조인해서 결과를 저장하고, 버퍼 영역에서 두개의 데이타를 삭제한다.

만약에 반대쪽의 데이타가 도착하지 않으면, 이 버퍼 영역에 데이타가 계속 쌓이기 때문에, 일정 기간이 지나면 반대쪽 스트림에서 데이타가 도착하지 않은 데이타를 주기적으로 삭제 해주는 (garbage collection) 정책이 필요하다.


cf. Inner join (교집합), Outer join (합집합)

Approximation algorithms (근사치 추정)

근사치 추정 방식은 실시간 데이타 분석에서 많이 사용되는데, 실시간 분석에서는 전체 데이타를 모두 분석할 수 있는 시간이 없는 경우가 많고, 시급한 분석이 필요한 경우가 있기 때문에, 전체 데이타를 분석하지 않고 일부만 분석하거나 또는 대략적인 데이타의 근사값만을 구하는 방법으로 해서, 빠르게 데이타를 분석하는 경우가 있다. 이를 근사치 추정 방식이라고 하는데, 예를 들어 VOD 서비스에서 지금 10분간 인기있는 비디오 목록, 12시간 동안 가장 인기 있는 판매 제품등 과 같은 시나리오인데, 이런 시나리오에서 데이타는 아주 정확하지 않아도 근사 값만 있으면 되고, 데이타를 그 시간에 보는 시급성이 중요하다.  이러한 시나리오에서는 전체 데이타를 다 보고 분석이 어렵기 때문에, 샘플링을 하거나 대략적인 근사 값만을 구해서 결과를 낸다.


이런 근사치를 추정하는 알고르즘은 K-means나 Approximate Top-N등이 이미 정의되어 있는 알고리즘이 많다.


참고 자료 :

Storm을 이용한 근사치 구하기 : https://pkghosh.wordpress.com/2014/09/10/realtime-trending-analysis-with-approximate-algorithms/

Apache Spark에서 K means로 근사치 구하기 :

https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html


Windowing

실시간 스트리밍 데이타 처리에서 중요한 개념중의 하나는 Windowing 인데, Windowing 이란 스트리밍 데이타를 처리할때 일정 시간 간격으로 처리하는 것을 정의한다.

예를 들어, 10분 단위의 Windowing의 경우 1시~2시까지 들어온 데이타를 1:10, 1:20,1:30, …  단위로 모아서 처리한다.

윈도우에는 자르는 방법에 따라서 다음과 같이 몇가지 방법이 있다.

Fixed Windows

정확하게 일정 시간 단위로 시간 윈도우를 쪼게는 개념이다. 앞에서 언급한 예와 같이 윈도우 사이즈가 10분 일때, 1시 10분은 1시00분~1시10분까지의 데이타를, 1시 20분은 1시10분~1시20분까지의 데이타를 처리한다.

Sliding Windows

Sliding Window 방식은 윈도우가 움직이는 개념이다.

슬라이딩 윈도우의 개념은 현재 시간으로 부터 +-N 시간 전후의 데이타를 매 M 시간 마다 추출 하는 것을 슬라이딩 윈도우라고 하고, 이 윈도우들은 서로 겹치게 된다.

예를 들면 현재시간으로부터 10분 전에서 부터  측정시간까지의 접속자를 1분 단위로 측정하는 시나리오가 될 수 있다. 매 1분 간격으로, 데이타를 추출하고, 매번 그 시간으로부터 10분전의 데이타를 추출하기 때문에 데이타가 중첩이 된다.  

이렇게 추출하는 간격을 Period (앞에서 1분), 그리고 추출하는 기간을 Length 또는 Size (앞에서 10분)라고 한다.



출처 : https://cloud.google.com/dataflow/model/windowing#sliding-time-windows

Session

다음으로는 Session Window의 개념이다.

Session Window에는 사용자가 일정 기간동안 반응이 없는 경우(데이타가 올라오지 않는 경우)에 세션 시작에서 부터, 반응이 없어지는 시간 까지를 한 세션으로 묶어서 처리한다

예를 들어서 세션 타임 아웃이 20분이라고 하고 데이타가 1:00 부터 올라오고 있는데,  1:01, 1:15에 데이타가 올라오고, 1:40분에 데이타가 올라오면 1:15 이후에 20분동안 (1:35까지) 데이타가 올라오지 않았기 때문에, 1:00,1:01,1:15은 하나의 세션으로 되고, 1:40은 새로운 세션 시작이 된다.



출처 : https://cloud.google.com/dataflow/model/windowing#session-windows


시간대별 Window 처리 방식

스트리밍 데이타에서 윈도우를 사용할때, 어느 시간을 기준 시간으로 할것인가를 정해야 하는데, 데이타가 시스템에 도착하는 Processing time을 기준으로 할 수 있고 또는  데이타가 실제 발생한 시간인 Event time을 기준으로도 할 수 있다.

Processing time based windowing

Processing time을 기준으로 데이타를 처리하는 것은 크게 어렵지 않다. 데이타가 도착한 순서대로 처리해서 저장하면 된다.


Event time based windowing

문제는 Event time을 기준으로 데이타를 처리 하는 경우인데, 데이타가 들어오는 것이 순서대로 들어오지 않는 경우가 많고, 또한 데이타의 도착 시간또한 일정하지 않다.




이 그림은 Event time을 기준으로 데이타를 처리하는 개념인데, 좌측 하얀색 화살표 처럼 12:00~13:00에 도착한 데이타가 11:00~12:00에 발생한 데이타 일 경우, 11:00~12:00 윈도우에 데이타를 반영해줘야 한다.

이러한 Event time 기반의 스트리밍 처리는 아래와 같이 기술적으로 두가지 주요 고려 사항이 필요하다.

  • Buffering
    늦게 도착한 데이타를 처리해야 하기 때문에. 윈도우를 일정시간동안 유지해야 한다. 이를 위해서 메모리나 별도의 디스크 공간을 사용한다.

  • Completeness
    Buffering을 적용했으면 다른 문제가 얼마 동안 버퍼를 유지해야 하는가?
    즉 해당 시간에 발생한 모든 데이타는 언제 모두 도착이 완료(Completeness) 되는가? 를 결정하는 것이다. 정확한 완료 시점을 갖는 것은 사실 현실적으로 힘들다. 버퍼를 아주 크게 잡으면 거의 모든 데이타를 잡아낼 수 있겠지만, 버퍼를 아주 크게 잡는 것이 어렵기 때문에, 데이타가 언제 도착할 것이라는 것을 어림 잡아 짐작할 수 있는 방법들이 많다. (예를 들어 워터마크 기법 같은 것이 있는데, 이는 다음글에서 설명하도록 한다.)


지금까지 실시간 데이타 분석에 사용되는 대략적인 개념을 알아보았다. 다음 글에서는 Apache Beam을 이용하여 이러한 실시간 데이타 분석을 어떻게 구현하는지 알아보도록 하겠다.



참고 자료

http://data-artisans.com/how-apache-flink-enables-new-streaming-applications-part-1/

https://cloud.google.com/dataflow/blog/dataflow-beam-and-spark-comparison#game-stats-advanced-stream-processing


저작자 표시 비영리
신고