블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 


Machine Learning Pipeline


조대협 (http://bcho.tistory.com)

대부분 모델 개발과 알고리즘에 집중

머신러닝을 공부하고 나서는 주로 통계학이나, 모델 자체에 많은 공부를 하는 노력을 드렸었다. 선형대수나 미적분 그리고 방정식에 까지 기본으로 돌아가려고 노력을 했었고, 그 중간에 많은 한계에도 부딪혔지만, 김성훈 교수님의 모두를 위한 딥러닝 강의를 접하고 나서, 수학적인 지식도 중요하지만 수학적인 깊은 지식이 없어도 모델 자체를 이해하고 근래에 발전된 머신러닝 개발 프레임웍을 이용하면 모델 개발이 가능하다는 것을 깨달았다.

 

계속해서 모델을 공부하고, 머신러닝을 공부하는 분들을 관심있게 지켜보고 실제 머신러닝을 사용하는 업무들을 살펴보니 재미있는 점이 모두 모델 자체 개발에만 집중한다는 것이다. 커뮤니티에 올라오는 글의 대부분은 어떻게 모델을 구현하는지 어떤 알고리즘을 사용하는지에 대한 내용들이 많았고, 실 업무에 적용하는 분들을 보면 많은 곳들이 R을 이용하여 데이타를 분석하고 모델링을 하는데, 데이타를 CSV 파일 형태로 다운 받아서 정재하고 데이타를 분석하고 모델을 개발하는 곳이 많은 것을 보았다. 데이타의 수집 및 전처리 및 개발된 모델에 대한 서비스에 대해서는 상대적으로 많은 정보를 접하지 못했는데, 예상하기로 대부분 모델 개발에 집중하기 때문이 아닌가 싶다.

 

엔지니어 백그라운드를 가진 나로써는 CSV로 데이타를 끌어다가 정재하고 분석하는 것이 매우 불편해 보이고 이해가 되지 않았다. 빅데이타 분석 시스템에 바로 연결을 하면, CSV로 덤프 받고 업로드 하는 시간등에 대한 고민이 없을텐데.” 왜 그렇게 할까 ?”라는 의문이 계속 생기기 시작하였다.

미니 프로젝트를 시작하다

이런 의문을 가지던중 CNN 네트워크 모델에 대한 대략적인 학습이 끝나고, 실제로 적용하면서 경험을 쌓아보기로 하였다. 그래서 얼굴 인식 모델 개발을 시작하였다. CNN 모델이라는 마법을 사용하면 쉽게 개발이 될줄 알았던 프로젝트가 벌써 몇달이 되어 간다. 학습용 데이타를 구하고, 이를 학습에 적절하도록 전처리 하는 과정에서 많은 실수가 있었고, 그 과정에서 많은 재시도가 있었다.

 

(자세한 내용은 http://bcho.tistory.com/1174 , https://www.slideshare.net/Byungwook/ss-76098082 를 참조)

 

특히나 데이타 자체를 다시 처리해야 하는 일이 많았기 때문에, 데이타 전처리 코드를 지속적으로 개선하였고 개선된 코드를 이용하여 데이타를 지속적으로 다시 처리해서 데이타의 품질을 높여나갔는데, 처리 시간이 계속해서 많이 걸렸다.

자동화와 스케일링의 필요성

특히 이미지 전처리 부분은 사진에서 얼굴이 하나만 있는 사진을 골라내고 얼굴의 각도와 선글라스 유무등을 확인한후 사용 가능한 사진에서 얼굴을 크롭핑하고 학습용 크기로 리사이즈 하는 코드였는데 (자세한 내용 http://bcho.tistory.com/1176) 싱글 쓰레드로 만들다 보니 아무래도 시간이 많이 걸렸다. 실제 운영환경에서는 멀티 쓰레드 또는 멀티 서버를 이용하여 스케일링을 할 필요가 있다고 느꼈다.

 

또한 이미지 수집에서 부터 필터링, 그리고 학습 및 학습된 모델의 배포와 서비스 까지 이 전 과정을 순차적으로 진행을 하되 반복적인 작업이기 때문에 자동화할 필요성이 있다고 생각했다.

아이 체중 예측 모델을 통한 파이프라인에 대한 이해

그러던 중에 팀 동료로 부터 좋은 예제 하나를 전달 받게 되었다.

미국 아기들의 환경에 따른 출생 체중을 예측하는 간단한 선형 회귀 모델을 구현한 파이썬 노트북인데 (https://github.com/GoogleCloudPlatform/training-data-analyst/blob/master/blogs/babyweight/babyweight.ipynb) 하나의 노트북에 전체 단계를 모두 구현해놓았다.

 


 

데이타에 대한 분석을 통한 데이타 특성 추출, 추출된 특성을 통한 모델 개발, 모델 학습을 위한 데이타 전처리 그리고 학습 및 학습된 모델을 통한 예측 서비스 까지 모든 과정을 하나의 노트북에 구현해놓았다.

(시간이 있으면 꼭 보기를 강력 추천한다.)

 

흥미로운 점이 데이타 전처리를 Apache Beam이라는 데이타 처리 플랫폼을 썼고, 그 전처리 코드를 파이썬 노트북에 하나로 다 정리한것이다. (실제로 수행은 로컬에서도 가능하지만, 클라우드에서도 실행이 가능해서 충분한 스케일링을 지원한다.)

 

Apache Beam의 구글의 빅데이타 분석 프레임웍으로 Apache Spark 과 같은 프레임웍이라고 보면된다. Google Dataflow라는 이름으로 구글 클라우드에서 서비스가 되는데, Apache Beam이라는 오픈소스로 공개가 되었다. ( http://bcho.tistory.com/1123 http://bcho.tistory.com/1122 http://bcho.tistory.com/1124 )

 

아 이렇게 하는구나 하는 생각이 들었고, 그즘 실무에서 이와 같은 흐름으로 실제로 머신러닝을 수행하는 것을 볼 기회가 있었다.

데이타 전처리를 스케일링하다.

서비스가 가능한 수준의 전체 머신러닝 서비스 파이프라인을 만들어보고 싶어졌다. 마침 또 Apache Beam의 경우에는 예전에 Java 코드로 실시간 분석을 해본 경험이 있고 이번에 2.0 버전이 릴리즈 되서 이번에는 2.0에서 파이썬을 공부해보기로 하고 개발에 들어갔다.

 

특히 기존의 데이타 전처리 코드는 싱글 쓰레드로 돌기 때문에 스케일링에 문제가 있었지만, Apache Beam을 사용할 경우 멀티 쓰레드 뿐만 아니라 동시에 여러대의 머신에서 돌릴 수 있고 이러한 병렬성에 대해서는 크게 고민을 하지 않아도 Apache Beam이 이 기능을 다 제공해준다. 또한 이 데이타 전처리 코드를 돌릴 런타임도 별도 설치할 필요가 없이 커멘드 하나로 구글 클라우드에서 돌릴 수 가 있다. (직업 특성상 클라우드 자원을 비교적 자유롭게 사용할 수 있었다.)

 

Apache Beam으로 전처리 코드를 컨버팅 한결과 기존 싱글 쓰레드 파이썬 코드가 400~500장의 이미지 전처리에 1~2시간이 걸렸던 반면, 전환후에 대략 15~17분이면 끝낼 수 있었다. 전처리 중에는 서버의 대수가 1대에서 시작해서 부하가 많아지자 자동으로 5대까지 늘어났다. 이제는 아무리 많은 데이타가 들어오더라도 서버의 대수만 단순하게 늘리면 수분~수십분내에 수십,수만장의 데이타 처리가 가능하게 되었다.


<그림. Apache Beam 기반의 이미지 전처리 시스템 실행 화면 >

 

Apache Beam 기반의 이미지 전처리 코드는 https://github.com/bwcho75/facerecognition/blob/master/Preprocess%2Bface%2Brecognition%2Bdata%2Band%2Bgenerate%2Btraining%2Bdata.ipynb 에 공개해 놨다.

 

머신러닝 파이프라인 아키텍쳐와 프로세스

이번 과정을 통해서 머신러닝의 학습 및 예측 시스템 개발이 어느 정도 정형화된 프로세스화가 가능하고 시스템 역시 비슷한 패턴의 아키텍쳐를 사용할 수 있지 않을까 하는 생각이 들었고, 그 내용을 아래와 같이 정리한다.

파이프라인 개발 프로세스

지금까지 경험한 머신러닝 개발 프로세스는 다음과 같다.

 

  1. 데이타 분석
    먼저 머신러닝에 사용할 전체 데이타셋을 분석한다. 그래프도 그려보고 각 변수간의 연관 관계나 분포도를 분석하여, 학습에 사용할 변수를 정의하고 어떤 모델을 사용할지 판단한다.

  2. 모델 정의
    분석된 데이타를 기반으로 모델을 정의하고, 일부 데이타를 샘플링하여 모델을 돌려보고 유효한 모델인지를 체크한다. 모델이 유효하지 않다면 변수와 모델을 바꿔 가면서 최적의 모델을 찾는다.

  3. 데이타 추출 및 전처리
    유효한 모델이 개발이 되면, 일부 데이타가 아니라 전체 데이타를 가지고 학습을 한다. 전체 데이타를 추출해서 모델에 넣어서 학습을 하려면 데이타의 크기가 크면 매번 매뉴얼로 하기가 어렵기 때문에 데이타 추출 및 전처리 부분을 자동화 한다.   

  4. 전체 데이타를 이용한 반복 학습 및 튜닝
    모델 자체가 유효하다고 하더라도 전체 데이타를 가지고 학습 및 검증을 한것이 아니기 때문에 의외의 데이타가 나오거나 전처리에 의해서 필터링되지 않은 데이타가 있을 수 있기 때문에 지속적으로 데이타 추출 및 전처리 모듈을 수정해야 하고, 마찬가지로 모델 역시 정확도를 높이기 위해서 지속적으로 튜닝을 한다. 이 과정에서 전체 데이타를 다루기 때문에 모델 역시 성능을 위해서 분산형 구조로 개선되어야 한다.

  5. 모델 배포
    학습 모델이 완성되었으면 학습된 모델을 가지고 예측을 할 수 있는 시스템을 개발하고 이를 배포한다.

  6. 파이프라인 연결 및 자동화
    머신러닝의 모델은 위의 과정을 통해서 만들었지만, 데이타가 앞으로도 지속적으로 들어올 것이고 지속적인 개선이 필요하기 때문에 이 전과정을 자동화 한다. 이때 중요한것은 데이타 전처리, 학습, 튜닝, 배포등의 각 과정을 물 흐르듯이 연결하고 자동화를 해야 하는데 이렇게 데이타를 흐르는 길을 데이타 플로우라고 한다. (흔히 Luigi, Rundeck, Airflow와 같은 데이타플로우 오케스트레이션 툴을 이용한다)

 

전체적인 프로세스에 대해서 좋은 영상이 있어서 공유한다.


아키텍쳐

위의 프로세스를 기반으로한 머신러닝 파이프라인 아키텍쳐 는 다음과 같다.


 

 

Inputs

머신 러닝 파이프라인의 가장 처음단은 데이타를 수집하고 이 수집된 데이타를 저장하는 부분이다.

데이타 수집은 시간,일,주,월과 같이 주기적으로 데이타를 수집하는 배치 프로세싱과, 실시간으로 데이타를 수집하는 리얼타임 프로세싱 두가지로 나뉘어 진다. 이 두 파이프라인을 통해서 데이타 소스로 부터 데이타를 수집하고 필터링하고 정재하여, 데이타 레이크에 저장한다. 이 구조는 일반적인 빅데이타 분석 시스템의 구조와 유사하다. (참고 자료 http://bcho.tistory.com/984 http://bcho.tistory.com/671 )

 

개인적으로 머신러닝을 위해서 중요한 부분 중 하나는 데이타 레이크를 얼마나 잘 구축하느냐이다. 데이타 레이크는 모든 데이타가 모여 있는 곳으로 보통 데이타 레이크를 구축할때는 많은 데이타를 모으는 데만 집중하는데, 데이타를 잘 모으는 것은 기본이고 가장 중요한 점은 이 모여 있는 데이타에 대한 접근성을 제공하는 것이다.

 

무슨 이야기인가 하면, 보통 머신러닝 학습을 위해서 학습 데이타를 받거나 또는 데이타에 대한 연관성 분석등을 하기 위해서는 데이타 레이크에서 데이타를 꺼내오는데, 데이타 레이크를 개발 운영 하는 사람과 데이타를 분석하고 머신러닝 모델을 만드는 사람은 보통 다르기 때문에, 모델을 만드는 사람이 데이타 레이크를 운영하는 사람에게 “무슨 무슨 데이타를 뽑아서 CSV로 전달해 주세요.” 라고 이야기 하는 것이 보통이다. 그런데 이 과정이 번거롭기도 하고 시간이 많이 걸린다.

가장 이상적인 방법은 데이타를 분석하고 모델링 하는 사람이 데이타 레이크 운영팀에 부탁하지 않고서도 손쉽고 빠르게 데이타에 접근해서 데이타를 읽어오고 분석을 할 수 있어야 한다.

직업 특성상 구글의 빅쿼리를 많이 접하게 되는데, 빅쿼리는 대용량 데이타를 저장할 수 있을 뿐만 아니라 파이썬 노트북이나 R 스튜디오 플러그인을 통해서 바로 데이타를 불러와서 분석할 수 있다.  


<그림 INPUT 계층의 빅데이타 저장 분석 아키텍쳐>

Pre processing & Asset creation

Pre processing은 수집한 데이타를 학습 시스템에 넣기 위해서 적절한 데이타만 필터링하고 맞는 포맷으로 바꾸는 작업을 한다. 작은 모델이나 개발등에서는 샘플링된 데이타를 로컬에서 내려 받아서 R이나 numpy/pandas등으로 작업이 가능하지만, 데이타가 수테라에서 수백테라이상이 되는 빅데이타라면 로컬에서는 작업이 불가능하기 때문에, 데이타 전처리 컴포넌트를 만들어야 한다.

일반적으로 빅데이타 분석에서 사용되는 기술을 사용하면 되는데, 배치성 전처리는 하둡이나 스파크와 같은 기술이 보편적으로 사용되고 실시간 스트리밍 분석은 스파크 스트리밍등이 사용된다.


Train

학습은 전처리된 데이타를 시스템에 넣어서 모델을 학습 시키는 단계이다. 이 부분에서 생각해야 할점은 첫번째는 성능 두번째는 튜닝이다. 성능 부분에서는 GPU등을 이용하여 학습속도를 늘리고 여러대의 머신을 연결하여 학습을 할 수 있는 병렬성이 필요하다. 작은 모델의 경우에는 수시간에서 하루 이틀 정도 소요되겠지만 모델이 크면 한달 이상이 걸리기 때문에 고성능 하드웨어와 병렬 처리를 통해서 학습 시간을 줄이는 접근이 필요하다. 작은 모델의 경우에는 NVIDIA GPU를 데스크탑에 장착해놓고 로컬에서 돌리는 것이 가성비 적으로 유리하고, 큰 모델을 돌리거나 동시에 여러 모델을 학습하고자 할때는 클라우드를 사용하는 것이 절대 적으로 유리하다 특히 구글 클라우드의 경우에는  알파고에서 사용된 GPU의 다음 세대인 TPU (텐서플로우 전용 딥러닝 CPU)를 제공한다. https://cloud.google.com/tpu/ CPU나 GPU대비 최대 15~30배 정도의 성능 차이가 난다.

 

 

학습 단계에서는 세부 변수를 튜닝할 필요가 있는데, 예를 들어 학습 속도나 뉴럴 네트워크의 폭이나 깊이, 드롭 아웃의 수, 컨볼루셔널 필터의 크기등등이 있다. 이러한 변수들을 하이퍼 패러미터라고 하는데, 학습 과정에서 모델의 정확도를 높이기 위해서 이러한 변수들을 자동으로 튜닝할 수 있는 아키텍쳐를 가지는 것이 좋다.

 

텐서플로우등과 같은 머신러닝 전용 프레임웍을 사용하여 직접 모델을 구현하는 방법도 있지만, 모델의 난이도가 그리 높지 않다면 SparkML등과 같이 미리 구현된 모델의 런타임을 사용하는 방법도 있다.

Predict

Predict에서는 학습된 모델을 이용하여 예측 기능을 서비스 하는데, 텐서플로우에서는  Tensorflow Serv를 사용하면 되지만, Tensorflow Serv의 경우에는 bazel 빌드를 이용하여 환경을 구축해야 하고, 대규모 서비를 이용한 분산 환경 서비스를 따로 개발해야 한다. 거기다가 인터페이스가 gRPC이다. (귀찮다.)

구글 CloudML의 경우에는 별도의 빌드등도 필요 없고 텐서 플로우 모델만 배포하면 대규모 서비스를 할 수 있는 런타임이 바로 제공되고 무엇보다 gRPC 인터페이스뿐만 아니라 HTTP/REST 인터페이스를 제공한다. 만약에 Production에서 머신러닝 모델을 서비스하고자 한다면 구글 CloudML을 고려해보기를 권장한다.

Dataflow Orchestration

이 전과정을 서로 유기적으로 묶어 주는 것을 Dataflow Orchestration이라고 한다.

예를 들어 하루에 한번씩 이 파이프라인을 실행하도록 하고, 파이프라인에서 데이타 전처리 과정을 수행하고, 이 과정이 끝나면 자동으로 학습을 진행하고 학습 정확도가 정해진 수준을 넘으면 자동으로 학습된 모델은 서비스 시스템에 배포하는 이 일련의 과정을 자동으로 순차적으로 수행할 수 있도록 엮어 주는 과정이다.

airbnb에서 개발한 Airflow나 luigi 등의 솔루션을 사용하면 된다.

아직도 갈길은 멀다.

얼굴 인식이라는 간단한 모델을 개발하고 있지만, 전체를 자동화 하고, 클라우드 컴퓨팅을 통해서 학습 시간을 단축 시키고 예측 서비스를 할 수 있는 컴포넌트를 개발해야 하고, 향후에는 하이퍼 패러미터 튜닝을 자동으로 할 수 있는 수준까지 가보려고 한다. 그 후에는 GAN을 통한 얼굴 합성들도 도전하려고 하는데, node.js 공부하는데도 1~2년을 투자한후에나 조금이나마 이해할 수 있게 되었는데, 머신러닝을 시작한지 이제 대략 8개월 정도. 길게 보고 해야 하겠다.

 



 

 

저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License


데이타 플로우 개발환경 설정하기


조대협 (http://bcho.tistory.com)


데이타 플로우에 대한 이해가 끝났으면 이제 직접 코딩을 해보자. 데이타 플로우에 대한 개념등은 http://bcho.tistory.com/search/dataflow 를 참고하기 바란다.

데이타 플로우에서 지원하는 프로그래밍 언어는 자바와 파이썬이다. 파이썬은 아직 알파버전으로, 이 글에서는 자바를 이용해서 설명한다.


자바를 이용한 개발환경 설정은 이클립스 개발환경과 maven을 이용한 개발 환경 두가지가 있는데, 여기서는 조금 더 손 쉬운 이클립스 환경을 기준으로 설명한다.

메이븐 기반의 개발 환경 설정은 https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven 를 참고하기 바란다.


사전준비

클라우드 계정 생성 및 빌링 설정

구글 클라우드 계정 생성 및 빌링 설정 방법은 앞서 다른글에서도 많이 설명하였기 때문에 다시 설명하지 않는다. 자세한 내용은 http://bcho.tistory.com/1107 를 참고하기 바란다.

API 사용 설정하기

다음 데이타플로우와 기타 같이 사용할 제품들의 API를 사용하기 위해서 이를 설정해줘야 한다.

구글 클라우드 콘솔에서 API Manager를 선택한후 대쉬 보드에서 아래 서비스들을 선택하여 API를 Enable 해준다. Cloud Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, and Cloud Datastore APIs.





구글 Cloud SDK 설정

구글 데이타 플로우를 프로그래밍 하기 위해서, 데이타 플로우 API를 호출하기 위한 SDK와 조작을 위한 CLI (Command Line Interface)가 필요한데, 이는 구글 Cloud SDK를 설치하면 같이 설치가 된다.

클라우드 SDK 설치는 https://cloud.google.com/sdk/docs/ 를 참고하면 된다.

gcloud 인증하기

구글 Cloud SDK 설치가 끝났으면, gcloud 명령어를 사용하기 위해서 gcloud 명령어를 초기화 한다.

초기화는 어떤 구글 클라우드 프로젝트를 사용할것인지, 그리고 사용자 아이디등으로 인증을 하는 절차를 거친다.

프롬프트 상에서

%gcloud init

명령을 실행하여, 수행한다.

이클립스 환경 설정

이제 구글 클라우드 프로젝트 설정과, 이를 호출하기 위한 SDK 환경 설치가 끝났다. 이제 이클립스 기반의 개발 환경을 설정해보자.

이클립스 설치하기

이클립스는 4.4 버전 이상을 설치하고, JDK는 1.7 이상을 설정한다.

플러그인 설치하기

다음 구글 데이타 플로우 개발환경을 위한 이클립스 플러그인을 설치한다.

이클립스에서 Help > Install New Software를 선택한 다음에, Work with 텍스트 박스에  https://dl.google.com/dataflow/eclipse/  을 입력한다.


다음으로 Google Cloud Dataflow를 선택하여 설치를 진행한다.

설치가 끝난 후 확인은 이클립스에서 New > Project를 하면, 위자드를 선택하는 화면에서 아래와 같이 Google Cloud Platform이라는 폴더와 함께 그 안에 “Cloud Dataflow Java Project”를 선택할 수 있는 화면이 나온것을 볼 수 있다.



헬로우 데이타 플로우

개발 환경 설정이 끝났으니, 이제 간단한 데이타 플로우 프로그램을 하나 만들어보자.

이 프로그램은 단어들을 읽어드린 후에, 단어들의 발생 횟수를 카운트 해 주는 파이프라인이다.



단어들을 읽어드린 후 toUpper라는 트랜스폼에서, 각 단어들을 대문자로 변환한 후, Count라는 트랜스폼에서 단어별로 발생횔 수를 카운트 한후에, 이를 Key Value (단어:발생횟수)로 리턴한 후, Print라는 트랜스폼에서 화면으로 결과를 출력해주는 예제이다.


프로젝트 생성

예제 파이프라인을 만들기 위해서, 이클립스에서 프로젝트를 생성해보자. New > Project를 선택한 후 에, 아래 그림과 같이 Google Cloud Platform 폴더에서 Cloud Dataflow Java Project를 선택한다



다음 프로젝트에 대해서  Group ID, Artifact ID 그리고 패키지 명등을 입력한다.



다음 메뉴로 넘어가면 구글 데이타 플로우를 실행하기 위한 디테일한 정보를 넣어야 하는데,




프로젝트 명과, “Cloud Storage Staging Location”이라는 정보를 입력해야 한다. Cloud Storage Staging Location은 Google Cloud Storage 의 버킷명으로, 데이타 플로우 애플리케이션 코드가 로딩 되는 장소이다.

데이타플로우 애플리케이션을 구글 클라우드에서 실행하게 되면, 애플리케이션 코드와 애플리케이션을 실행하기 위한 라이브러리들이 각각의 워커 노드로 배포 되는데, 배포를 위해서 먼저 클라이언트에서 부터, 이러한 실행 코드를 Google Cloud Storage에 올려놓게 된다. 앞에서 정의하는 “Cloud Storage Staging Location”은, 이 클라우드 스토리지 버킷에 대한 경로 정의이다.

클라우드 스토리지 버킷은 아래와 같인 Google Cloud Storage 메뉴에서 아래와 같이 생성할 수 있다.


코드 제작

그러면 코드를 작성해 보자.



package com.terry.df;


import com.google.cloud.dataflow.sdk.Pipeline;

import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

import com.google.cloud.dataflow.sdk.transforms.Count;

import com.google.cloud.dataflow.sdk.transforms.Create;

import com.google.cloud.dataflow.sdk.transforms.DoFn;

import com.google.cloud.dataflow.sdk.transforms.ParDo;

import com.google.cloud.dataflow.sdk.transforms.DoFn.ProcessContext;

import com.google.cloud.dataflow.sdk.values.KV;


import org.slf4j.Logger;

import org.slf4j.LoggerFactory;


public class StarterPipeline {

 private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);


 public static void main(String[] args) {

   Pipeline p = Pipeline.create(

       PipelineOptionsFactory.fromArgs(args).withValidation().create());


   p.apply(Create.of("Hello", "World","hello","boy","hello","girl"))

   .apply(ParDo.named("toUpper").of(new DoFn<String, String>() {

     @Override

     public void processElement(ProcessContext c) {

       c.output(c.element().toUpperCase());

     }

   }))

   .apply(Count.<String>perElement())

   .apply(ParDo.named("Print").of(new DoFn<KV<String,Long>, Void>(){

@Override

public void processElement(ProcessContext c) throws Exception {

LOG.info(c.element().getKey() + " count:"+c.element().getValue());

}

   }));


   p.run();

 }

}



(참고 : 위의 소스코드는 https://github.com/bwcho75/googledataflow/tree/master/HelloDataFlow 에 있다.)


처음 p.apply(Create.of…)에서, 데이타를 생성하였다.

다음으로 .apply(ParDo.named("toUpper").of(new DoFn<String, String>() 에서 소문자를 대문자로 다 치완하는 데, ParDo는 이 작업을 여러 노드에서 병렬로 실행하겠다는 선언이고, named는 이 트랜스폼의 이름을 “toUpper”로 정의하겠다는 정의이다. (나중에 디버깅에 유용한다.) 다음으로, 트랜스폼 함수는 DoFn으로 정의했는데, <String,String>으로 정의되어 앞의 인자가 Input 그리고 뒤의 인자가 Output의 데이타 형으로 String 인자를 받아서, String 인자로 리턴하겠다는 것이다.


.apply(Count.<String>perElement()) 은 데이타플로우에서 미리 정의된, 트랜스폼으로,  <String>으로 된 데이타를 받아서 엘리먼트당 카운트를 해서 <String,Long> 형으로 리턴을 해준다. 즉 String형의 단어마다 카운트를 한 결과를 Long형으로 넣어서 이를 키밸류(KV)형식으로 묶어서 리턴해준다.

.apply(ParDo.named("Print").of(new DoFn<KV<String,Long>, Void>() 에서는 앞에서 전달해준  String,Long형이 키밸류형으로 정의된 KV<String,Long>형의 데이타를 받아서, 출력해주고, 마지막 트랜스폼이기 때문에 더 이상 뒤로 데이타를 넘기지 않을 것이기 때문에, Output의 인지 타입을 Void로 선언하였다.

실행

코드를 작성이 끝났으면 실제로 실행해보자 Run As에서 Dataflow Pipeline을 선택하면 실행을 할 수 있다.



이때 다음과 같이 실행환경을 설정할 수 있다.



여기서 Runner에 대한 개념을 짚고 넘어갈 필요가 있다.

Direct Pipeline Runner

Direct Pipeline Runner는 데이타플로우 코드를 로컬 개발 환경 (노트북이나 데스크탑)에서 실행하고자 할때 선택할 수 있는 러너이다. 주로 개발이나 테스트에서 사용할 수 있는데, 다른 클라우드 서비스 예를 들어  Pub/Sub이나 빅쿼리등이랑 연동이 되는 파이프라인의 경우에는 DirectPipelineRunner를 사용할 수 없으니 주의하기 바란다.

DataflowPipelineRunner

클라우드 환경에서 데이타 플로우를 실행하기 위해서는 DataflowPipelineRunner와  BlockingDataflowPipelineRunner 두 가지가 있다.

DataflowPipelineRunner는 데이타 플로우 애플리케이션을 구글 클라우드에서 실행을 해주는데, 데이타 플로우 잡을 클라우드에서 실행해놓고, 로컬 애플리케이션을 바로 종료 한다. (클라우드에 접수된 잡은 클라우드에서 계속 실행된다.)

BlockingDataflowPipelineRunner

BlockingDataflowPipelineRunner는 데이타 플로우잡을 구글 클라우드에서 실행해놓게 해놓고, 잡이 끝날때 까지 로컬 애플리케이션을 대기하도록 한다.  

배치와 같이 끝이 있는 경우에는 필요에 따라서 사용할 수 있다. 스트리밍의 경우 BlockingDataflowPipelineRunner를 사용하게 되면 스트리밍 잡을 명시적으로 끊지 않는 이상 계속해서 로컬 애플리케이션이 실행되는 상태가 된다.


DirectPipelineRunner로 실행을 해보면 다음과 같이 이클립스 콘솔에서 결과가 출력되는 것을 볼 수 있다.


BODY는 1,  GIRL 은 1, HELLO는 3개 그리고 WORLD는 1개가 출력되는 것을 볼 수 있다.


이번에는 클라우드에 배포를 하고 실행해보자, Run As에서, BlockingDataflowPipelineRunner를 선택하여 실행해보자.

실행을 하면 코드가 자동으로 클라우드로 배포 되서 실행되는 것을 확인할 수 있다. 구글 클라우드 콘솔의 데이타 플로우 메뉴를 보면, 새로운 잡이 생성된것을 확인할 수 있다.


해당 잡을 선택해서 들어가 보면 현재 잡의 실행 상황과 함께, 파이프라인에서 단계별 시간이나 기타 상세한 로그를 볼 수 있다.



데이타 플로우 애플리케이션이 기동이 완료되면, Logs 메뉴에서 Worker Logs라는 버튼을 누르면 각 워커 노드에서의 로그를 볼 수 있다.


Worker Logs를 누르면 다음과 같이  GIRL,WORLD,BOY,HELLO에 대한 count 수를 출력한 로그를 확인할 수 있다.


참고 : Logs 메뉴로 들어가서  Job Logs에서  Minimum serverity를 “All” 로 선택하면 전체 작업 상황을 알 수 있는데, 애플리케이션을 실행했다고 바로 데이타 플로우의 파이프라인이 실행되는 것이 아니라, 애플리케이션 코드가 구글 클라우드 스토리에 로드되고, 이 로드된 코드들이 각각의 워커 노드에 배포가 된후에, 워커 노드가 기동이 되야 잡이 실제로 수행된다.


워커가 제대로 기동되었는지는 Job Logs에서 Mimimum serverity를 All로 한후에 다음과 같이 “Worker have started successfully”라는 로그가 나오면 그때 부터 데이타 플로우 잡을 실행을 시작한다고 생각하면 된다.








저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

데이타 스트리밍 분석 플랫폼 Dataflow 개념 잡기 #2/2

(트리거, 이벤트 타임, 워터마크 개념)


조대협 (http://bcho.tistory.com)


앞글 http://bcho.tistory.com/1122 에 의해서 Dataflow에 대한 개념에 대해서 계속 알아보자

트리거

윈도우와 더블어서 Dataflow 프로그래밍 개념중에서 유용한 개념중의 하나가 트리거이다. 트리거는 처리중인 데이타를 언제 다음 단계로 넘길지를 결정하는 개념이다. 특히 윈도우의 개념과 같이 생각하면 좋은데, 윈도우는 일반적으로 윈도우가 종료되는 시간에 그 데이타를 다음 Transform으로 넘기게 된다.


그런데 이런 의문이 생길 수 있다. “윈도우의 크기가 클때 (예를 들어 한시간), 한시간을 기다려야 데이타를 볼 수 있는 것인가? 그렇다면 한 시간 후에 결과를 본다면 이것을 실시간 분석이라고 할 수 있는가?”

그래서 여기서 트리거의 개념이 나온다.

예를 들어 한시간 윈도우가 있더라도, 윈도우가 끝나지 않더라도 현재 계산 값을 다음 Transform으로 넘겨서결과를 볼 수 있는 개념이다. 1분 단위로 트리거를 걸면 1분 결과를 저장하고, 2분째도 결과를 저장하고, 3분째도…. 60분째에도 매번 결과를 업데이트 함으로써, 윈도우가 종료되기 전에도 실시간으로 결과를 업데이트 할 수 있게 된다.


트리거의 종류

그렇다면 이러한 트리거는 앞에서 언급한 시간 단위의 트리거만 있을까? Dataflow는 상당히 여러 종류의 트리거를 지원한다.


  • Time trigger (시간 기반 트리거) : 시간 기반 트리거는 일정 시간 주기로 트리거링을 해주는 트리거 이다. 1분 단위, 1초 단위 같이 일정 주기를 지정하거나, “윈도우 시작후 2분후 한번과 윈도우 종료후 한번"과 같이 절대적인 시간을 기준으로도 정의가 가능하다.

  • Element Count (데이타 개수 기반 트리거) : 다음은 개수 기반인데, 예를 들어 “어떤 데이타가 100번 이상 들어오면 한번 트리거링을 해라” 또는 “매번 데이타가 100개씩 들어올때 마다 트리거링을 해라" 라는 형태로 정의가 가능하다.

  • Punctuations  (이벤트 기반 트리거) : Punctuations는 엄밀하게 번역하면 “구두점" 이라는 의미인데, 구두점 처럼 특정 데이타가 들어오는 순간에, 트리거링을 하는 방법이다.

트리거 조합

이러한 트리거는 하나의 트리거 뿐 아니라, 여러개의 트리거를 동시에 조합하여 사용이 가능하다.

  • AND : AND 조건으로 두개의 트리거의 조건이 만족해야 트리거링이 된다. 예를 들어, Time Trigger가 1분이고, Element Count 트리거가 100개이면, 윈도우가 시작된 1분 후에, Element Count가 100개가 되면 트리거링이 된다.

  • OR : OR 조건으로 두개의 트리거의 조건 중 하나만 만족하면 트리거링이 된다.

  • Repeat : Repeat는 트리거를 반복적으로 수행한다. Element Count 트리거 10개를 반복으로 수행하면, 매 10개 마다 트리거링이 된다. Time 트리거를 1분 단위로 반복하면, 매 1분 마다 트리거링이 된다.

  • Sequence : Sequence 트리거는 등록된 트리거를 순차적으로 실행한다. Time 트리거 1분을 걸고 Element count 트리거 100개를 걸면, 윈도우 시작후 1분 후 트리거링인 된후, 그 후 부터 Element 가 100개 들어오면 두번째 트리거링이 발생하고 트리거링이 종료 된다.


트리거 결과의 누적

그러면 트리거링이 될때 마다 전달 되는 데이타는 어떻게 될까라는 질문이 나올 수 있는데. 무슨 이야기인가 하면 윈도우 내에서 트리거가 발생할때, 이전 데이타에 대한 처리를 어떻게 할것인가이다.


데이타가 A,B,C,D,E,F 가 들어왔다고 가정하자. 트리거가 C 다음 발생했다고 했을때, 윈도우가 끝난 F에는 어떤 값이 리턴이 될까?

첫번째 트리거링에는 당연히 A,B,C 가 전달된다.

윈도우가 끝나면 A,B,C,D,E,F 가 전달되는 것이 맞을까 아니면 트리거링 된 이후의 값인 D,E,F 만 전달되는 것이 맞을까?

맞는 건 없고, 옵션으로 지정이 가능하다.

  • Accumulating
    Accumulating은 트리거링을 할때 마다 윈도우 내에서 그때까지의 값을 모두 리턴한다.

  • Discarding
    트리거링 한 후에, 이전 값은 더이상 리턴하지 않고, 그 이후 부터 다음 트리거링 할때까지의 값만을 리턴한다.

예를 들어서 보자


다음과 같은 윈도우가 있고, 3번, 23번, 10번에서 트리거링이 된다고 했을때,

Accumulating mode의 경우

  • 첫번째 트리거 후 : [5,8,3]

  • 두번째 트리거 후 : [5,8,3,15,19,23]

  • 세번째 트리거 후 : [5,8,3,15,19,239,13,10]

와 같이 값이 반환되고

Discarding mode의 경우

  • 첫번째 트리거 후 [5,8,3]

  • 두번째 트리거 후 [15,19,23]

  • 세번째 트리거 후 [9,13,10]

이 반환된다.

데이타 지연에 대한 처리 방법

실시간 데이타 분석은 특성상 데이타의 전달 시간이 중요한데, 데이타는 모바일 클라이언트 등에서 인터넷을 통해서 데이타가 서버로 전송되는 경우가 많기 때문에, 데이타의 실제 도달 시간이 들쭉날쭉 하다. 이러다 보니 데이타의 도착 순서나 지연등이 발생하는데, 이에 대한 처리가 필요하다. 먼저 데이타 도달 시간의 개념을 이해하려면, 이벤트 타임과 프로세싱 타임의 개념을 먼저 이해해야 한다.

이벤트 타임과 프로세싱 타임

모바일 단말에서 다음과 같이 A,B,C,D의 데이타를 1시1초, 1시2초,3초,5초에 보냈다고 하자.


서버에 도착해서 Dataflow에 도착하는 시간은 물리적으로 서버와 단말간의 거리 차이가 있기 때문에 도착 시간은 단말에서 데이타가 발생한 시간보다 느리게 되며, 또한 각 단말의 위치나 단말이 연결되어 있는 네트워크 상황이 다르기 때문에 순차적으로 도착하는 것이 아니라, 늦게 보낸 데이타가 더 빨리 도착할 수 도 있다.

아래 그림을 보면 A데이타는 1시1초에 단말에서 생성되었지만 서버에 도착한 시간은 1시2초가 된다. C,D의 경우, 순서가 바뀌어서 도착하였다.



이렇게 실제로 데이타가 발생한 시간을 이벤트 타임, 그리고 서버에 데이타가 도착한 시간을 프로세싱 타임이라고 정의한다.


이 프로세싱 타임은 네트워크 상황이나 데이타에 크기에 따라 가변적으로 변하기 때문에, 이벤트 타임과 프로세싱 타임의 상관 관계를 그래프로 표현해보면 다음과 같아진다.


가장 이상적인 결과는 이벤트 타임과 프로세싱 타임이 동일한 것이겠지만 불가능하고, 위의 그림처럼 이벤트 타임보다 프로세싱 타임이 항상 늦게 되고, 이벤트 타임과 프로세싱 타임의 차이는 매 순간 다르게 된다.

워터 마크 (Water Mark)

이렇게 위의 그림과 같이 실제 데이타가 시스템에 도착하는 시간을 예측 하게 되는데, 이를 워터 마크라고 한다. 위의 그림에서 “실제 처리 그래프"로 표시되는 부분을 워터마크라고 생각하면 된다. 이 예측된 시간을 기반으로 윈도우의 시스템상의 시작 시간과 종료 시간을 예측 하게 된다.

지연 데이타 처리 방법

윈도우 처리 관련해서, 실제 발생한 시간과 도착 시간이 달라서, 처리 시간내에 못 들어오는 경우가 발생할 수 있다. 아래 그림을 보면, 실제 윈도우는 1시1초~1시6초까지의 데이타를 처리하기를 바라고 정의했을 수 있는데, 시스템에서는 이 윈도우의 값이 프로세싱 타임 기준으로 (워터 마크를 기준으로 연산함) 1시2초~1시6초에 도착하기를 기대하고 있는데, 데이타 C의 경우에는 기대했던 프로세싱 타임에 도착하지 않았기 때문에 이 데이타는 연산에서 누락될 수 있다.



비단 늦게 도착한 데이타 뿐만 아니라, 시스템이 예측한 프로세싱 타임 보다 일찍 데이타가 도착할 수 있는데, 이런 조기 도착한 데이타와 지연 도착한 데이타에 대한 처리는 어떻게 해야 할까?

Dataflow에서는 이런 조기 도착이나 지연 데이타에 대한 처리 메카니즘을 제공한다.

윈도우를 생성할때, withAllowedLateness라는 메서드를 사용하면, 늦게 도착하는 데이타에 대한 처리 기간을 정의할 수 있다.


PCollection<String> items = ...;

 PCollection<String> fixed_windowed_items = items.apply(

   Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES))

         .withAllowedLateness(Duration.standardDays(2)));

https://cloud.google.com/dataflow/model/windowing#managing-time-skew-and-late-data


위의 예제는 1분 단위의 Fixed Window를 정의하고, 최대 2일까지 지연 도착한 데이타 까지 처리할 수 있도록 정의한 예제이다.


지금까지 간단하게 dataflow를 이용한 스트리밍 데이타 처리의 개념에 대해서 알아보았다.

저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

스파크 성능이 안나오면, 우리 회사 데이타팀 팀장왈. 먼저 파이썬으로 짰는지 확인 부터 해보라길래, 파이썬과 스칼라로 만들어진 스파크 성능 차이가 얼마나 나는지 찾아봤더니 다음과 같은 수치가 나왔다.


http://emptypipes.org/2015/01/17/python-vs-scala-vs-spark/ (원본 출처)


일단 스파크를 할려면 스칼라는 필수인듯 하다. 

간단한 프로토타입핑등에는 파이썬을 사용할 수 있겠지만 결국 프로적션은 스칼라로 최적화해야 할듯.

근데. 자바대 스칼라 성능 비교는 없네

저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License
Spark의 전체적인 스택 구조

조대협 (http://bcho.tistory.com)

스파크의  전체적인 스택 구조를 보면 다음과 같다.





  • 인프라 계층 : 먼저 스파크가 기동하기 위한 인프라는 스파크가 독립적으로 기동할 수 있는 Standalone Scheudler가 있고 (그냥 스팍만 OS위에 깔아서 사용한다고 생각하면 된다). 또는 하둡 종합 플랫폼인 YARN 위에서 기동될 수 있고 또는 Docker 가상화 플랫폼인 Mesos 위에서 기동될 수 있다.
  • 스파크 코어 : 메모리 기반의 분산 클러스터 컴퓨팅 환경인 스팍 코어가 그 위에 올라간다. 
  • 스파크 라이브러리  : 다음으로는 이 스파크 코어를 이용하여 특정한 기능에 목적이 맞추어진 각각의 라이브러리가 돌아간다. 빅데이타를 SQL로 핸들링할 수 있게 해주는 Spark SQL, 실시간으로 들어오는 데이타에 대한 리얼타임 스트리밍 처리를 해주는 Spark Streaming, 그리고 머신러닝을 위한 MLib, 그래프 데이타 프로세싱이 가능한 GraphX가 있다.

현재 글에서 설명하고 있는 부분은 먼저 스파크에 대한 기본을 이해하기 위해서 Spark Core 부분을 중점적으로 설명하고 있다. 


저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License



Apache Spark Cluster 구조

스팍의 기본 구조는 다음과 같다.
스팍 프로그램은 일반적으로 “Driver Program”이라고 하는데, 이 Driver Program 은 여러개의 병렬적인 작업으로 나뉘어져사 Spark의 Worker Node(서버)에 있는  Executor(프로세스)에서 실행된다.



1. SparkContext가 SparkClusterManager에 접속한다. 이 클러스터 메니져는 스팍 자체의 클러스터 메니져가 될 수 도 있고 Mesos,YARN 등이 될 수 있다. 이 클러스터 메니저를 통해서 가용한 Excutor 들을 할당 받는다
2. Excutor를 할당 받으면, 각각의 Executor들에게 수행할 코드를 보낸다.
3. 다음으로 각 Excutor 안에서 Task에서 로직을 수행한다.


  • Executor : Process
  • Task : A Unit of work that will sent to one executor

cf. Storm 과 개념이 헷갈릴 수 있는데, 
Storm 은 Node가 하드웨어 서버, Worker가 프로세스,Executor가 쓰레드
Spark 은 Worker Node가 하드웨어 서버, Executor가 프로세스 이다.  


저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License



Apache Spark 설치 하기


조대협 (http://bcho.tistory.com)


Spark 설치 하기


1. 스팍 홈페이지에서 다운로드.


다운로드시 Pre-built in Spark을 골라야 함. 여기서는 Hadoop 2.6용으로 빌드된 스팍을 선택한다.








2. 스팍 쉘을 실행 해보자


인스톨 디렉토리에서, 

%./bin/pyspark





을 실행하면, 위와 같이 파이썬 기반의 스팍 쉘이 실행됨을 확인할 수 있다.


3. 로깅 레벨 조정 및 간단한 스팍 예제


디폴트 로깅은 INFO 레벨로 되어 있기 때문에, 쉘에서 명령어를 하나라도 실행하면 INFO 메세지가 우루루 나온다. (몬가 할때 결과 값보다, 오히려 INFO 메세지가 많이 나온다.)

그래서, conf/log4j.properties 파일을 conf/log4j.properties.templates 파일을 복사해서 만든후 

log4j.rootCategory를 Info에서 WARN 레벨로 다음과 같이 수정한다.


log4j.rootCategory=WARN, console






환경 설정이 끝났으면 간단한 예제를 돌려보자

$SPARK_HOME 디렉토리에 있는  README.md 파일을 읽어서, 라인 수를 카운트 하는 예제이다.





스팍은 자체적으로 클러스터를 모니터링 할 수 있는 차체적인 Web UI가 있다. 

http://localhost:4040에 접속하면 다음과 같이 스팍 클러스터에 대한 모니터링 화명을 얻을 수 있다.









저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

분산 대용량 큐-Apache Kafka에 대한 검토 내용 정리


실시간 빅데이타 분석 아키텍쳐를 검토하다가 아파치 스톰을 보다보니, 실시간 데이타 스트림은 큐를 이용해서 수집하는 경우가 많은데, 데이타의 양이 많다 보니 기존의 큐 솔루션으로는 한계가 있어서 분산 대용량 큐로 아파치 카프카(Kafka)가 많이 언급된다.

그래서, 아키텍쳐를 대략 보고, 실효성에 대해서 고민을 해봤는데, 큐의 기능은 기존의 JMS나 AMQP 기반의 RabbitMQ(데이타 기반 라우팅,페데레이션 기능등)등에 비해서는 많이 부족하지만 대용량 메세지를 지원할 수 있는 것이 가장 큰 특징이다. 특히 분산 환경에서 용량 뿐 아니라, 복사본을 다른 노드에 저장함으로써 노드 장애에 대한 장애 대응 성을 가지고 있기 때문에 용량에는 확실하게 강점을 보인다.

실제로 마이크로소프트 社의 엔지니어가 쓴 논문을 보면http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf

 


카프카의 경우 10만 TPS 이상의 성능을 RabbitMQ는 2만 TPS 정도의 성능을 내는 것으로 나와 있는데, 여기서 생각해볼 문제가 큐는 비동기 처리 솔루션이다. 즉 응답 시간에 그렇게 민감 하지 않다는 것이다.

그리고 일반적인 웹 시스템의 성능이 1500~2000 TPS (엔터프라이즈 시스템의 경우) 내외인 것이 일반적이기 때문에, Rabbit MQ의 2만 TPS의 성능은 충분하다고 볼 수 있지 않을까 한다.

물론 네이버나 해외의 대형 SNS 서비스의 경우에는 충분히 저정도의 용량이 필요하겠지만, 현재로써는 일반적인 시스템에서는 카프카의 용량과 성능은 약간 오버 디자인이 아닌가 하는 생각이 든다.


Rabbit MQ is scalable and




저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

대충보는 Storm #5-Apache Storm 병렬 분산 처리 이해하기

 조대협 (http://bcho.tistory.com)

 

Storm에 있는 Spout Bolt들은 여러개의 머신에서 어떻게 나눠서 처리될까? Storm 클러스터는 여러대의 분산된 서버에서 운용되기 때문에, 당연히 Spout Bolt도 나눠서 처리된다 그렇다면 이런 Storm의 병렬 처리 구조는 어떻게 되는 것일까?

이 글에서는 Spout Bolt를 병렬로 처리하는 Storm의 구조에 대해서 알아보도록 한다.

Storm의 병렬 처리를 이해하기 위한 개념

Storm의 병렬 처리를 이해하기 위해서는 몇가지 개념을 정리해야 한다. Node,Worker,Exectutor,Task 이 네 가지 개념을 이해해야 한다.


Node

Node는 물리적인 서버이다. Nimbus Supervisor 프로세스가 기동되는 물리적인 서버이다.

Nimbus는 전체 노드에 하나의 프로세스만 기동하며, Supervisor는 일반적으로 하나의 노드에 하나만 기동한다. 여러대를 기동시킬 수 도 있지만, Supervisor의 역할 자체가 해당 노드를 관리하는 역할이기 때문에 하나의 노드에 여러개의 Supervisor를 기동할 필요는 없다.


Worker

Worker Supervisor가 기동되어 있는 노드에서 기동되는 자바 프로세스로 실제로 Spout Bolt를 실행하는 역할을 한다.


Executor

Executor Worker내에서 수행되는 하나의 자바 쓰레드를 지칭한다.


Task

Task Bolt Spout의 객체를 지칭한다. Task Executor (쓰레드)에 의해서 수행된다.

이 개념을 다시 정리해보면 다음과 같은 그림이 된다.



<그림. Node,Worker,Executor,Task 의 개념>

각 슬레이브 노드에는 Supervisor 프로세스가 하나씩 떠있고, conf/storm.yaml에 정의된 설정에 따라서 worker 프로세스를 띄운다.supervisor.slots.ports에 각 Worker가 사용할 TCP 포트를 정해주면 된다. 아래는 5개의 Worker 프로세스를 사용하도록 한 설정이다.



<그림. Storm 설정에서 Supervisor 5개 띄우도록한 설정>

 

그리고 난후에, Topology를 생성할때, Topology에 상세 Worker,Executor,Task의 수를 정의한다. 앞에서 예제로 사용했던 HelloTopology 클래스 코드를 다시 살펴보자. 아래 코드는 Worker,Executor,Task등을 설정한 예이다.

package com.terry.storm.hellostorm;

 

import backtype.storm.Config;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.topology.TopologyBuilder;

 

public class HelloTopology {

        public static void main(String args[]){

               TopologyBuilder builder = new TopologyBuilder();

               builder.setSpout("HelloSpout", new HelloSpout(),2);

               builder.setBolt("HelloBolt", new HelloBolt(),2)

                       .setNumTasks(4)

                       .shuffleGrouping("HelloSpout");

              

              

               Config conf = new Config();

               conf.setNumWorkers(5);

               // Submit topology to cluster

               try{

                       StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

               }catch(AlreadyAliveException ae){

                       System.out.println(ae);

               }catch(InvalidTopologyException ie){

                       System.out.println(ie);

               }

              

        }

 

}

<코드. Worker,Executor,Task 수를 설정한 HelloTopology 예제>

     Topology가 사용할 Worker 프로세스의 수 설정
Config
에서 setNumWorkers(5)를 이용해서 이 토폴로지에서 사용한 Worker 프로세스 수를 5개로 지정했다.

     Spout Executor(쓰레드 수) 설정
다음으로 setSpout에서 3번째 인자로 “2”라는 숫자를 넘겼는데, setSpout에 마지막 인자는 Executor의 수이다. 이를 Parallelism 힌트라고 하는데, Spout 컴포넌트가 수행될 쓰레드의 수이다. 여기서는 Spout Task (객체의 수)를 정의하지 않았는데, 정의하지 않은 경우 디폴트로 Executor의 수와 같이 설정된다.

     Bolt Executor(쓰레드 수)Task(객체)수 설정
Bolt
도 마찬가지로 setBolt 3번째 마지막 인자가 Parallelism 힌트인데, 역시 2개로 지정하였다. 여기서는 Task수를 별도로 지정하였는데, setTaskNum(4)을 이용해서 지정한다. 이렇게 설정하면 HelloBolt 객체는 총 4개가 생기고 2개의 Thread에서 번갈아 가면서 실행하게 된다.

자아 그러면 실제로 설정하는데로 동작하는 지 몇가지 확인을 해보자. 자바의 jps 명령을 이용하면 현재 동작중인 자바 프로세스 수를 볼 수 있다.



<그림 Worker 프로세스 수의 확인>

위의 테스트는 하나의 환경에서 nimbus,zookeeper,supervisor,worker를 모두 띄워놓은 형태인데,worker가 설정대로 5개의 프로세스가 떠있고, nimbus,supervisor가 떠 있는 것이 확인되고, QuorumPeerMainzookeeper 프로세스이다.

실제로 Executor가 지정한데로 Thread가 뜨는지 확인을 해보자. 여러개의 Worker 프로세스에 나눠서 뜨면 모니터링하기가 복잡하니 편의상 conf.setNumer(1)로 해서, 하나의 Worker 프로세스에서 모든 Executor가 뜨도록 Topology를 변경한후, Worker 프로세스의 쓰레드를 모니터링 하니 다음과 같은 결과를 얻었다.

코드상에서 HelloSpout에 대한 Parallelism 힌트를 2로 설정하고, HelloBolt에 대한 Parallelism 힌트도 2로 설정하였다.



<그림. Worker 프로세스의 쓰레드 덤프>

실제로 Worker 프로세스내의 쓰레드를 보면 HelloSpout용 쓰레드가 2, HelloBolt용 쓰레드가 2개가 기동됨을 확인할 수 있다.


리밸런싱

Storm 운영중에 노드를 추가 삭제 하거나 또는 성능 튜닝을 위해서 운영중인 환경에 Worker, Executor의 수를 재 조정이 가능하다. 이를 rebalance라고 하는데, 다음과 같은 명령어를 이용해서 가능하다.

% bin/storm rebalance [TopologyName] -n [NumberOfWorkers] -e [Spout]=[NumberOfExecutos] -e [Bolt1]=[NumberOfExecutos] [Bolt2]=[NumberOfExecutos]

미들웨어 엔지니어로써 본 Storm 튜닝

본인의 경우 경력이 톰캣이나 오라클社의 웹로직에 대해 장애진단과 성능 튜닝을 한 경력을 가지고 있어서 JVM이나 미들웨어 튜닝에 많은 관심을 가지고 있는데, 이 미들웨어 튜닝이라는 것이 대부분 JVM과 쓰레드 수등의 튜닝에 맞춰 있다보니, Storm의 병렬성 부분을 공부하다 보니, Executor Worker,Task의 수에 따라서 성능이 많이 차이가 나겠다는 생각이 든다.

특히나 하나의 토폴리지만 기동하는 것이 아니라, 여러개의 토폴로지를 하나의 클러스터에서 구동 할 경우 더 많은 변수가 작용할 수 있는데, 쓰레드란 것의 특성이 동시에 하나의 코어를 차지하고 돌기 때문에, 쓰레드수가 많다고 시스템의 성능이 좋아지지 않으며 반대로 적으면 성능을 극대화할 수 없기 때문에, 이 쓰레드의 수와 이 쓰레드에서 돌아가는 객체(Task)의 수에 따라서 성능 차이가 많이 날것으로 생각된다. 아마도 주요 튜닝 포인트가 되지 않을까 싶은데, 예전에는 보통 JVM당 적정 쓰레드 수는 50~100개 정도로 책정했는데 (톰캣과 같은 WAS 미들웨어 기준). 요즘은 코어수도 많아져서 조금 더 많은 쓰레드를 책정해도 되지 않을까 싶다. 쓰레드 수 뿐 아니라, 프로세스수도 영향을 미치는데, JVM 프로세스의 컨텐스트 스위칭은 쓰레드의 컨텐스트 스위칭보다 길기 때문에, 프로세스를 적게 띄우는 것이 좋을것으로 예상 되지만, JVM 프로세스는 메모리 GC에 의한 pausing 시간이 발생하기 때문에 이 GC 시간을 적절하게 나눠주기 위해서 적절 수 의 프로세스를 찾는 것도 숙제가 아닐까 싶다. 디폴트 worker의 옵션을 보니 768M의 힙 메모리를 가지고 기동하게 되어 있는데, 메모리를 많이 사용하는 연산는 다소 부족하지 않을까 하는 느낌이 든다.

Bolt가 데이타 베이스, 파일 또는 네트워크를 통해서 데이타를 주고 받는 연산을 얼마나 하느냐에 따라서도 CPU 사용률이 차이가 날것이기 때문에 (IO작업중에는 쓰레드가 idle 상태로 빠지고 CPU가 노는 상태가 되기 때문에) IO 작업이 많은 경우에는 쓰레드의 수를 늘리는 것이 어떨까 한다.

Bolt Spout와 같은 통신은 내부적으로 ZeroMQ를 사용하는 것으로 알고 있는데, 아직 내부 구조는 제대로 살펴보지는 않았지만, 같은 프로세스내에서는 네트워크 호출 없이 call-by-reference를 이용해서 통신 효율을 높이고, 통신이 잦은 컴포넌트는 같은 프로세스에 배치 하는 affinity와 같은 속성(?)이 있지 않을까 예측을 해본다.

결과적으로 튜닝 포인트는, Worker,Executor,Task 수의 적절한 산정과, 만약에 옵션이 있다면 리모트 호출을 줄이기 위한 Bolt Spout 컴포넌트의 배치 전략에 있지 않을까 한다.


다음 글에서는 이런 병렬 처리를 기반으로 각 컴포넌트간에 메세지를 보낼때, 여러 Task간에 어떻게 메세지를 라우팅을 하는지에 대한 정리한 그룹핑(Grouping)에 대한 개념에 대해서 알아보도록한다.


저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

 

대충보는 Storm #4-Apache Storm 특징과 기본 개념

 조대협 (http://bcho.tistory.com)


지금까지 Storm에 대해서 이해하기 위해서, 실시간 스트리밍 서비스의 개념에 대해서 알아보고 간단한 HelloStorm 애플리케이션을 제작해서, 싱글 클러스터 노드에 배포해봤다. 대략 실시간 스트리밍이 무엇이고, Storm을 이용해서 어떻게 개발하는지에 대해서는 어느정도 이해를 했을 것이라고 생각한다.

그러면 지금까지의 경험을 조금 더 체졔적으로 정리해서 Storm에 대해서 이해해보도록 하자. 이번에는 Storm에 대한 개념과 아키텍쳐 구조에 대해서 알아보겠다.


Storm의 특징

Storm을 실시간 스트리밍을 처리하기 위한 서버이자 프레임웍이다. 그렇다면 이 Storm이 다른 스트리밍 처리 솔루션에 비해 가지는 특징은 무엇일까?


확장성

Storm은 클러스터링 기능을 이용해서 수평으로 확장이 가능하다. 그래서 많은 데이타를 다루어야 하는 빅데이타이 데이타 스트림 서비스에서도 가능한데, Storm홈페이지에 포스팅된 자료를 보면, 2x2.4GHz CPU 24GB 메모리 머신을 기반으로 초당 100바이트짜리 메세지를 100만개 정도 처리가 가능하다고 한다. (https://storm.apache.org/about/scalable.html) TPS로 환산하면 100 TPS이다. (Wow!!)


장애 대응성

Storm의 다른 특징 중의 하나는 Fault tolerant 구조를 통한 장애 대응 능력이다. ZooKeeper를 통해서 전체 클러스터의 운영 상태를 감지 하면서, 특정 노드가 장애가 나더라도, 시스템이 전혀 문제 없이 작업을 진행할 수 있으며, 장애가 난 노드에 할당된 작업은 다른 노드에 할당해서 처리하고, 장애 노드에 대해서는 복구 처리를 자동으로 수행해준다.


메세지 전달 보장

Storm은 메세지 처리에 안정성을 제공하는데, 장애가 나건 문제가 있건간에,유실 없이 최소한 한번 메세지가 처리될 수 있게 지원한다. (at least once : 이말은 반대로 이야기 하면, 1번 이상 같은 메세지가 중복 처리될 수 있다는 이야기이다.)

만약에 정확하게 메세지가 한번만 처리가 되기를 원하면 Trident (https://storm.apache.org/documentation/Trident-tutorial.html) 를 통해서 Storm을 확장하면, 정확히 하나의 메세지가 한번만 처리되도록 할 수 있다.


쉬운 배포

Storm은 메뉴얼에 따르면(?) 배포와 설정이 매우 쉽다. 실제로 클러스터를 구성해보면 분산 시스템인데 비해서 별로 어려움 없이 배포와 설정이 가능하다. 그리고 메뉴얼에 따르면(?) 배포 후에, 크게 많은 관리 없이 운영이 가능하다고는 하는데, 이것은 실제로 해보지 않았기 때문에 패스

일단, 오픈소스인데도 설치 후 웹 기반의 모니터링 콘솔을 제공하기 때문에 시스템의 상태를 쉽게 모니터링 하고 운영하는데 도움을 준다.


여러 프로그래밍 언어 지원

Storm은 기본적으로 JVM (Java Virtual Machine)위에서 동작하기는 하지만, Twitter Thrift 프로토콜을 기반으로 하기 때문에, 다양한 언어로 구현이 가능하다. Java 뿐 아니라, JVM을 사용하지 않는 경우에 대해서는 stdin/stdout을 통해서 데이타를 주고 받음으로써, Ruby,Python,Javascript,Perl 등 다양한 언어를 사용할 수 있다.


다양한 시스템 연계

Storm은 다양한 다른 솔루션과 통합이 가능하다. 데이타를 수집하는 부분에서는 Kestrel (http://robey.github.io/kestrel/), RabbitMQ (http://www.rabbitmq.com/) , Kafka (http://kafka.apache.org/), JMS 프로토콜, mazon Kinesis (http://aws.amazon.com/kinesis/)

등이 연동이 가능하며, 다양한 데이타 베이스 (RDBMS, Cassandra, MongoDB )에도 쉽게 연계가가능하다. CEP(Complex Event Processing을 지원하는) 이벤트 처리 분야에서는  Drools (http://www.drools.org/),  Esper (http://esper.codehaus.org/등이 연계 가능하고. 그외에도 Elastic Search (http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/master/storm.html

) , node.js (https://github.com/paralect/storm-nodejs-starterkit)등 다양한 솔루션과 연동을 통해서 시스템을 확장해 나갈 수 있다.


오픈소스

마지막으로 Storm은 오픈소스이다. 상업적 활용이 가능한 Apache License 2.0을 따르고 있다.

Apache License 2.0 http://ko.wikipedia.org/wiki/%EC%95%84%ED%8C%8C%EC%B9%98_%EB%9D%BC%EC%9D%B4%EC%84%A0%EC%8A%A4

Apache License 2.0 한국 번역본 http://yesarang.tistory.com/272

Storm은 유사한 특징을 가지고 있는 Apache Spark에 비해서, 개발이 된지 오래되어서 안정성이 높고 특유의 구조상 장애 대처 능력과 메세지 전달 보장 능력등이 좋다. 반대로 Spark은 최근에 만들어진 만큼 머신 러닝등 더 많은 기능을 가지고 있다.

Storm의 기본 개념

자아 그러면 이제 Storm의 개념을 다시 정립해보자. Storm의 개념을 이해하려면 필수적으로 먼저 이해해야 하는것이 Spout Bolt의 개념이다.

Spout Bolt

Spout Storm 클러스터로 데이타를 읽어들이는 데이타 소스이다. 외부의 로그 파일이나, 트위터 타임 피드와 같인 데이타 스트림, 큐등에서 데이타를 읽어드린다. 이렇게 읽어 드린 데이타를 다른 Bolt로 전달한다. Spout에는 크게 4가지 중요한 메서드가 있다.

Ÿ   open() : 이 메서드는 Spout이 처음 초기화 될때 한번만 호출되는 메서드로, 데이타 소스로 부터의 연결을 초기화 하는 등의 역할을 한다.

Ÿ   nextTuple() : 이 메서드는 데이타 스트림 하나를 읽고 나서, 다음 데이타 스트림을 읽을 때 호출 되는 메서드 이다.

Ÿ   ack(Object msgId) : 이 메서드는 데이타 스트림이 성공적으로 처리되었을때 호출되는데, 이 메서드에서는 성공 처리된 메세지를 지우는 등, 성공 처리에 대한 후처리를 구현한다.

Ÿ   fail(Object msgId) : 이 메서드는 해당 데이타 스트림이 Storm 토폴로지를 수행하던중에, 에러가 나거나 타임아웃등이 걸렸을때 호출되는데,이때에는 사용자가 에러에 대한 에처 처리 로직을 명시해야 한다. 흔히 재처리 로직을 구현하거나 또는 에러 로깅등의 처리를 하게 된다.

Bolt는 이렇게 읽어 드린 데이타를 처리하는 함수이다. 입력 값으로 데이타 스트림을 받고, 그 데이타를 내부의 비지니스 로직에 따라서 가공한 다음에 데이타 스트림으로 다른 Bolt로 넘겨주거나 종료 한다. Bolt에서 정의되는 주요한 메서드는 다음과 같다.

Ÿ   prepare (Map stormConf, TopologyContext context, OutputCollector collector): 이 메서드는 Bolt 객체가 생성될때 한번 호출 된다. 각종 설정 정보나 컨텍스트등 초기 설정에 필요한 부분을 세팅하게 된다.

Ÿ   execute(Tuple input): 가장 필수적인 메서드로, Bolt에 들어온 메세지를 처리하는 로직을 갖는다. 종단 Bolt가 아닌 경우에는 다음 Bolt로 메세지를 전달하기도 한다.

Storm 클러스터내에는 여러개의 Spout Bolt가 존재하게 된다.



<그림. Storm Spout Bolt의 개념>

Topology

이렇게 여러 개의 Spout Bolt간의 연관 관계를 정의해서 데이타 흐름을 정의하는 것을 토폴로지(Topology)라고 한다. 아래 그림과 같이 데이타가 어디로 들어와서 어디로 나가는지를 정의하는 것인데, 아래 그림은 두 개의 Spout에 대해서 각각의 토폴로지를 정의하였다.



<그림. Storm Topology>

Spout Bolt간의 연결 토폴로지는 TopologyBuilder라는 클래스를 통해서 정의한다. 그러면 간략하게, Spout Bolt, Bolt간의 데이타 흐름 관계를 어떻게 정의하는지 살펴보도록 하자.

다음과 같은 토폴로지 흐름을 정의한다고 가정하자



<그림. 간단한 토폴로지 정의 예제>


HelloSpout은 앞서 예제에서 만든것과 같은 Spout이고,

EchoBoltA 는 각각 들어온 메세지에 “Hello I am BoltA :”+메세지를 붙여서 화면에 출력한 후 전송하고 EechoBoltB는 들어온 메세지에 “Hello I am BoltB :”+메세지를 붙여서 전송한다.


package com.terry.storm.hellostorm;

 

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

 

public class EchoBoltA extends BaseBasicBolt{

 

        public void execute(Tuple tuple, BasicOutputCollector collector) {

               // TODO Auto-generated method stub

               String value = tuple.getStringByField("say");

               System.out.println("Hello I am Bolt A: "+value);

               collector.emit(new Values("Hello I am Bolt A :"+value));

        }

 

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

               // TODO Auto-generated method stub

                 declarer.declare(new Fields("say"));

        }

 

}

<그림 EchoBoltA 클래스>


※ 참고 EchoBoltB 클래스도 클래스명만 다르고 내부 구현 내용은 동일하다

토폴로지를 정의할때, HelloSpout hs라는 ID로 생성을 할것이고, EchoBoltA eba라는 ID, EchoBoltB ebb라는 이름으로 생성을 할것이다.

토폴로지 생성 코드를 보자. 아래 노랑색으로 표시된 부분이 실제 토폴로지는 구성하는 부분이다.


package com.terry.storm.hellostorm;

 

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.utils.Utils;

 

import com.terry.storm.hellostorm.EchoBoltB;

import com.terry.storm.hellostorm.EchoBoltA;

 

public class ToplogySequence {

        public static void main(String args[]){

               TopologyBuilder builder = new TopologyBuilder();

               builder.setSpout("hs", new HelloSpout(),1);

               builder.setBolt("eba", new EchoBoltA(),1).shuffleGrouping("hs");

               builder.setBolt("ebb", new EchoBoltB(),1).shuffleGrouping("eba");

              

              

               Config conf = new Config();

               conf.setDebug(true);

               LocalCluster cluster = new LocalCluster();

              

               cluster.submitTopology("ToplogySequence", conf,builder.createTopology());

               Utils.sleep(1000);

               // kill the LearningStormTopology

               cluster.killTopology("ToplogySequence");

               // shutdown the storm test cluster

               cluster.shutdown();          

        }

}

 

<그림. 위의 그림에 있는 토폴로리지를 실제로 구현한 >

 

Spout을 구현한 부분을 보자


builder.setSpout("hs", new HelloSpout(),1);

 

를 통해서 Spout을 생성하는데, setSpout(“{id}”,”{Spout 객체}”,”{Parallelism 힌트}”) 로 이루어진다. 여기서 id“hs”로 정의했고, Spout 객체는 HelloSpout을 지정했다.

    Paralleism 힌트는 나중에 병령 처리와 그룹핑 개념을 설명할때 다시 설명하도록 한다.

다음으로 Bolt를 생성하는데,

builder.setBolt("eba", new EchoBoltA(),1).shuffleGrouping("hs");

builder.setBolt("ebb", new EchoBoltB(),1).shuffleGrouping("eba");

 

로 각각의 Bolt를 생성했다. 이때 주목해야 하는 점이 뒤에 붙어 있는 shufflerGrouping이라는 메서드인데, Spout Bolt간의 연관 관계는 이 Grouping이라는 개념을 이용해서 생성한다. Grouping에는 여러가지 종류와 개념이 있지만 여기서는 간단한 shuuflerGrouping만을 사용했다.첫번째 EchoBoltA에서 자신을 “eba” 라는 id로 생성을 한후에, suffelerGrouping(“hs”)를 선언했는데, 이는 “hs”라는 ID를 가지고 있는 Spout이나 Bolt로 부터 메세지를 받아들이겠다는 이야기이다. 두번째 EchoBoltBsuffelerGrouping(“eba”)를 통해서, id“eba” Spout이나 Bolt, 즉 앞서 생성한 EchBoltA로 부터 메세지를 받아들이겠다는 이야기이다.

자 그러면 이 토폴로지를 실행해 보자.

실행하면 다음과 같은 로그를 얻을 수 있다.

5399 [Thread-12-hs] INFO  backtype.storm.daemon.task - Emitting: hs default [hello world 1]

5400 [Thread-8-eba] INFO  backtype.storm.daemon.executor - Processing received message source: hs:4, stream: default, id: {}, [hello world 1]

Hello I am Bolt A: hello world 1

5401 [Thread-8-eba] INFO  backtype.storm.daemon.task - Emitting: eba default [Hello I am Bolt A :hello world 1]

5409 [Thread-10-ebb] INFO  backtype.storm.daemon.executor - Processing received message source: eba:2, stream: default, id: {}, [Hello I am Bolt A :hello world 1]

Hello I am Bolt B: Hello I am Bolt A :hello world 1

 

     5399 번 라인에서 12번 쓰레드에서 수행되는 “hs” 라는 이름의 Spout, “hello world 1” 이라는 문자열을 emit (제출) 하였다

     5400 번 라인에서 8번 쓰레드에서 수행되는 “eba”라는 Bolt“hs”라는 Spout또는 Bolt에서 “hello world”라는 문자열을 받았다. 그 다음 라인에 “Hello I am Bolt A: hello world 1”가 출력되는 것을 확인할 수 있다.

     5401 라인에서 8번 쓰레드에서 수행되는 “eba” 볼트가 “Hello I am Bolt A :hello world 1” 라는 문자열을 제출하였다.

     5409 라인세서 10번 쓰레드에서 수행되는 “ebb”라는 id의 볼트가 “eba”로 부터 “Hello I am Bolt A :hello world 1” 라는 메세지를 받았다. 다음 행에 EchoBoltB에 의해서 처리되어 “Hello I am Bolt B: Hello I am Bolt A :hello world 1” 문자열이 출력되었음을 확인할 수 있다.

Stream Tuple

다음으로 데이타 Stream Tuple에 대한 개념을 이해해야 한다.

Storm에서 데이타는 Stream이라는 개념으로 정의되는데, Stream이란, Spout Bolt간 또는 Bolt간을 이동하는 데이타들의 집합을 이야기 한다.

각각의 Stream은 하나의 Tuple로 이루어 지는데, Tuple형태로 정의된다.



<그림. Storm Stream Tuple 개념>


앞에 예제에서는 하나의 키만 있는 Tuple을 사용하였다.

앞에서 사용한 HelloSpout 클래스를 다시 한번 살펴보면


public class HelloSpout extends BaseRichSpout {

          private static final long serialVersionUID = 1L;

          private static int count=0;

          private SpoutOutputCollector collector;

         

          public void open(Map conf,TopologyContext context,SpoutOutputCollector collector){

               this.collector = collector; 

          }

         

          public void nextTuple(){

                 if(count++<10) this.collector.emit(new Values("hello world "+count));

          }

         

          public void declareOutputFields(OutputFieldsDeclarer declarer){

                 declarer.declare(new Fields("say"));

          }

         

}

<그림. HelloSpout>

nextTuple 부분에서 newValue로 하나의 값을 보내는 것을 볼 수 있다. 그리고 이 Tuple의 키 구조는 아래 declareOutputFields 메서드에서 “say” 라는 필드이름으로 정의된것을 볼 수 있다.

실제로 HelloSpout에서 생성하는 데이타 스트림은 다음과 같다.



<그림 데이타 Stream Turple 구조>

이번 글에서는 간단하게 Storm의 특징과 기본 개념에 대해서 알아보았다. 다음글에서는 조금더 상세한 Storm의 아키텍쳐의 개념과 병렬 처리 개념에 대해서 알아보도록 하겠다.

 

 

저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

대충보는 Storm #3-Storm 싱글 클러스터 노드 설치 및 배포

조대협(http://bcho.tistory.com)

 

지난번에는 간략하게, Storm을 이용한 HelloStorm 애플리케이션을 개발용 클러스터인 Local Cluster에서 구동해봤다. 이번에는 운영용 클러스터를 설정하고, 이 운영 클러스터에 지난번에 작성한 HelloStorm 토폴리지를 배포해보도록 한다.

Storm 클러스터의 기본 구조

Storm 클러스터를 기동하기 전에, 클러스터가 어떤 노드들로 구성이 되는지 먼저 알아보도록 하자 Storm 클러스터는 기본적으로 아래와 같은 3가지 구성요소로 구성이 되어 있다.

먼저 주요 노드인 Nimbus Supervior 노드에 대해서 알아보자, Nimbus Supervisor 노드는 각각 하나의 물리 서버로 생각하면 된다.


Nimbus

Nimbus는 마스터 노드로 주요 설정 정보를 가지고 있으며, Nimbus 노드를 통해서 프로그래밍 된 토폴로지를 Supervisor 노드로 배포한다. 일종의 중앙 컨트롤러로 생각하면 된다. Storm에서는 중앙의 하나의 Nimbus 노드만을 유지한다.

Supervisor

Supervisor 노드는 실제 워커 노드로, Nimbus로 부터 프로그램을 배포 받아서 탑재하고, Nimbus로 부터 배정된 작업을 실행하는 역할을 한다. 하나의 클러스터에는 여러개의 Supervisor 노드를 가질 수 있으며, 이를 통해서 여러개의 서버를 통해서 작업을 분산 처리할 수 있다.

Zookeeper

이렇게 여러개의 Supervisor를 관리하기 위해서, Storm Zookeeper를 통해서 각 노드의 상태를 모니터링 하고, 작업의 상태들을 공유한다.

Zookeeper는 아파치 오픈소스 프로젝트의 하나로, 분산 클러스터의 노드들의 상태를 체크하고 공유 정보를 관리하기 위한 분산 코디네이터 솔루션이다.

전체적인 클러스터의 구조를 살펴보면 다음과 같다.



<그림. Storm 클러스터의 구조>

하나의 싱글 머신에 Nimbus가 설치되고, 다른 각각의 머신에 Supervisor가 하나씩 설치된다. 그리고, ZooKeeper 클러스터를 통해서 기동이 된다.

※ 실제 물리 배포 구조에서는 Nimbus Supervisor, ZooKeeper등을 하나의 서버에 분산배포할 수 도 있고, 여러가지 다양한 배포구조를 취할 수 있으나, Supervisor의 경우에는 하나의 서버에 하나의 Supervisor만을 설치하는 것을 권장한다. Supervisor의 역할이 하나의 물리서버에 대한 작업을 관리하는 역할이기 때문에, 한 서버에 여러 Supervisor를 설치하는 것은 적절하지 않다.

 

설치와 기동

Storm 클러스터를 기동하기 위해서는 앞에서 설명한바와 같이 ZooKeeper가 필요하다. Zookeeper를 다운로드 받은 후에, ~/conf/zoo_sample.cfg 파일을 ~/conf/zoo.cfg로 복사한다.

다음으로 ZooKeeper를 실행한다.

% $ZooKeeper_HOME/bin/zkServer.cmd


<그림. 주키퍼 기동 로드>


다음으로 Nimbus 노드를 실행해보자. Storm을 다운 받은 후 압축을 푼다.

다음 $APACHE_STORM/bin 디렉토리에서

%storm nimbus

를 실행하면 nimbus 노드가 실행된다. 실행 결과나 에러는 $APACHE_STORM/logs 디렉토리에 nimbus.log라는 파일로 기록이 된다.

정상적으로 nimbus 노드가 기동이 되었으면 이번에는 supervisor 노드를 기동한다.

%storm supervisor

Supervisor에 대한 노드는 $APACHE_STORM/logs/supervisor.log 라는 이름으로 기록된다.

Storm은 자체적으로 클러서터를 모니터링 할 수 있는 UI 콘솔을 가지고 있다. UI를 기동하기 위해서는

%storm ui

로 실행을 해주면 UI가 기동이 되며 http://localhost:8080 에 접속을 하면 관리 콘솔을 통해서 현재 storm의 작동 상태를 볼 수 있다.



<그림. Storm UI를 이용한 기동 상태 모니터링>


현재 하나의 PC nimbus supervior,UI를 모두 배포하였기 때문에 다음과 같은 물리적인 토폴로지가 된다.



<그림. 싱글 서버에 nimbus supervisor를 같이 설치한 예>

싱글 클러스터 노드에 배포 하기

싱글 노드 클러스터를 구축했으니, 앞의 1장에서 만든 HelloStorm 토폴로지를 이 클러스터에 배포해보도록 하자.

전장의 예제에서 만든 토폴로지는 Local Cluster를 생성해서, 자체적으로 개발 테스트용 클러스터에 토폴로지를 배포하도록 하는 코드였다면, 이번에는 앞에서 생성한 Storm 클러스터에 배포할 수 있는 토폴로지를 다시 만들어야 한다. 이 코드의 차이는 기존 코드와는 다르게 LocalCluster를 생성하지 않고, 기동중인 클러스터에 HelloTopoloy Submit하도록 한다.

package com.terry.storm.hellostorm;

 

import backtype.storm.Config;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.topology.TopologyBuilder;

 

public class HelloTopology {

        public static void main(String args[]){

               TopologyBuilder builder = new TopologyBuilder();

               builder.setSpout("HelloSpout", new HelloSpout(),2);

               builder.setBolt("HelloBolt", new HelloBolt(),4).shuffleGrouping("HelloSpout");

              

               Config conf = new Config();

               // Submit topology to cluster

               try{

                       StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

               }catch(AlreadyAliveException ae){

                       System.out.println(ae);

               }catch(InvalidTopologyException ie){

                       System.out.println(ie);

               }

              

        }

 

}

<그림. HelloTopology.java>


토폴로지 클래스를 만들었으면 이를 빌드해보자

%mvn clean install

을 실행하면 ~/target 디렉토리에 토폴로지 jar가 생성된것을 확인할 수 있다.



jar 파일을 배포해보도록 하자.

배포는 storm {jar} {jar파일명} {토폴로지 클래스명} {토폴로지 이름} 명령을 실행하면 된다.

% storm jar hellostorm-0.0.1-SNAPSHOT.jar com.terry.storm.hellostorm.HelloTopology HelloTopology

배포 명령을 내리면 $APACHE_STORM_HOME/logs/nimbus.log에 다음과 같이 HelloTopology가 배포되는 것을 확인할 수 있다.


2015-01-25T07:35:03.352+0900 b.s.d.nimbus [INFO] Uploading file from client to storm-local\nimbus\inbox/stormjar-8c25c678-23f5-436c-b64e-b354da9a3746.jar

2015-01-25T07:35:03.365+0900 b.s.d.nimbus [INFO] Finished uploading file from client: storm-local\nimbus\inbox/stormjar-8c25c678-23f5-436c-b64e-b354da9a3746.jar

2015-01-25T07:35:03.443+0900 b.s.d.nimbus [INFO] Received topology submission for HelloTopology with conf {"topology.max.task.parallelism" nil, "topology.acker.executors" nil, "topology.kryo.register" nil, "topology.kryo.decorators" (), "topology.name" "HelloTopology", "storm.id" "HelloTopology-1-1422138903"}

2015-01-25T07:35:03.507+0900 b.s.d.nimbus [INFO] Activating HelloTopology: HelloTopology-1-1422138903

2015-01-25T07:35:03.606+0900 b.s.s.EvenScheduler [INFO] Available slots: (["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6702] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6701] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6700])

2015-01-25T07:35:03.652+0900 b.s.d.nimbus [INFO] Setting new assignment for topology id HelloTopology-1-1422138903: #backtype.storm.daemon.common.Assignment{:master-code-dir "storm-local\\nimbus\\stormdist\\HelloTopology-1-1422138903", :node->host {"226ceb74-c1a3-4b1a-aab5-2384e68124c5" "terry-PC"}, :executor->node+port {[3 3] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [6 6] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [5 5] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [4 4] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [7 7] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [2 2] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703], [1 1] ["226ceb74-c1a3-4b1a-aab5-2384e68124c5" 6703]}, :executor->start-time-secs {[7 7] 1422138903, [6 6] 1422138903, [5 5] 1422138903, [4 4] 1422138903, [3 3] 1422138903, [2 2] 1422138903, [1 1] 1422138903}}

2015-01-25T07:37:48.901+0900 b.s.d.nimbus [INFO] Updated HelloTopology-1-1422138903 with status {:type :inactive}

해당 토폴로지가 배포되었는지를 확인하려면 storm list라는 명령어를 사용하면 현재 기동되고 있는 토폴로지 목록을 확인할 수 있다.




<그림. storm list 명령으로 기동중인 토폴로지를 확인>


실행 결과는 $APACHE_STORM_HOME/logs 디렉토리를 보면 worker-xxx.logs 라는 파일이 생긴것을 확인해 볼 수 있는데, 파일 내용을 보면 다음과 같다.

2015-01-25T07:35:08.908+0900 STDIO [INFO] Tuple value ishello world

2015-01-25T07:35:08.908+0900 STDIO [INFO] Tuple value ishello world

우리가 앞서 구현한 Bolt에서 System.out으로 출력한 내용이 출력된다.

동작을 확인하였으면, 이제 기동중인 토폴로지를 정지 시켜 보자. 토폴로지의 정지는 storm deactivate {토폴로지명} 을 사용하면 된다. 아까 배포한 토폴로지 명이 HelloTopology였기 때문에 다음과 같이 토폴로지를 정지 시킨다.

%storm deactivate HelloTopology

그 후에 다시 storm list 명령을 이용해서 토폴로지 동작 상태를 확인해보면 다음과 같다.



< 그림. Storm  토폴로지 정지와 확인 >


만약에 Topology를 재 배포 하려면 storm kill로 해당 토폴로지를 삭제한 후에, 다시 배포 해야 한다. 이때 주의할점은 storm kill로 삭제해도 바로 삭제가 안되고 시간 텀이 있으니 약간 시간을 두고 재 배포를 해야 한다.


지금까지 간단하게, 운영용 클러스터를 구성하고, 운영 클러스터에 토폴로지를 배포 하는 것에 대해서 알아보았다.

HelloStorm 코드 구현, 클러스터 노드 구축 및 배포를 통해서 간단하게 스톰이 무엇을 하는 것인지는 파악했을 것으로 안다. 다음 장에는 조금 더 구체적으로 Storm의 개념과 스톰의 아키텍쳐등에 대해서 살펴보도록 하겠다.


 


저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

대충보는 Storm #2-Storm 설치와 HelloStorm 작성하기

조대협(http://bcho.tistory.com)


Apache Storm Spark

앞서 데이타 스트리밍 처리에 대해서 설명했다. 스트리밍 처리에 대표적인 오픈소스 프레임웍으로는 Apache Storm Apache Spark이 있는데ㅔ, Spark은 최근에 나온 것으로 스트리밍 처리뿐 만 아니라 조금 더 보편적인 분산 컴퓨팅을 지원하는데, Storm의 경우 나온지도 오래되었고 무엇보다 안정성 부분에서 아직까지는 Spark보다 우위에 있기 때문에, Storm을 중심으로 설명하고자 한다

HelloStorm

Storm의 내부 구조 개념등을 설명하기에 앞서, 일단 깔아서 코드부터 돌려보고 개념을 잡아보자


HelloStorm 구조

HelloWorld 처럼 간단한 HelloStorm을 만들어보자. 만들어보려고 하는 Storm 프로그램은 다음과 같다.



<그림. HelloStorm 개념 구조>


HelloSpout 이라는 클래스는, Storm에 데이타를 읽어오는 클래스로 이 예제에서는 자체적으로 데이타를 생성해낸다. Storm으로 들어오는 데이타는 Tuple이라는 형식을 따르는데, Key/Value 형식의 데이타 형을 따른다. 여기서는 키(필드명)“say”, 데이타는 “Hello” 라는 문자열을 가지고 있는 데이타 tuple을 생성한다.

HelloSpout에서 생성된 데이타는 HelloBolt라는 곳으로 전달이 되는데, HelloBolt 클래스는 데이타를 받아서 처리하는 부분으로 간단하게 들어온 데이타에서 “say” 라는 필드의 데이타 값을 System.out으로 출력해주는 역할만을 한다.


개발하기

이클립스를 사용하여, maven project를 생성한다.



다음으로 pom.xml을 작성한다.


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

 

  <groupId>com.terry.storm</groupId>

  <artifactId>hellostorm</artifactId>

  <version>0.0.1-SNAPSHOT</version>

  <packaging>jar</packaging>

 

  <name>hellostorm</name>

  <url>http://maven.apache.org</url>

 

  <properties>

    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

  </properties>

 

<dependencies>

  <dependency>

    <groupId>junit</groupId>

    <artifactId>junit</artifactId>

    <version>3.8.1</version>

    <scope>test</scope>

  </dependency>

  <dependency>

        <groupId>org.apache.storm</groupId>

        <artifactId>storm-core</artifactId>

        <version>0.9.3</version>

  </dependency>

 

</dependencies>

 

<build>

  <plugins>

    <plugin>

      <artifactId>maven-assembly-plugin</artifactId>

      <version>2.2.1</version>

      <configuration>

        <descriptorRefs>

          <descriptorRef>jar-with-dependencies</descriptorRef>

        </descriptorRefs>

        <archive>

          <manifest>

            <mainClass />

          </manifest>

        </archive>

      </configuration>

      <executions>

        <execution>

          <id>make-assembly</id>

          <phase>package</phase>

          <goals>

            <goal>single</goal>

          </goals>

        </execution>

      </executions>

    </plugin>

  </plugins>

</build>

 

</project>

<그림. pom.xml>


이 예제에서는 storm 0.9.3을 사용했기 때문에 위와 같이 storm-core 0.9.3 dependency 부분에 정의하였다.

다음으로 데이타를 생성하는 HelloSpout을 구현하자


package com.terry.storm.hellostorm;

 

import java.util.Map;

 

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichSpout;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

 

public class HelloSpout extends BaseRichSpout {

          private static final long serialVersionUID = 1L;

          private SpoutOutputCollector collector;

         

          public void open(Map conf,TopologyContext context,SpoutOutputCollector collector){

               this.collector = collector; 

          }

         

          public void nextTuple(){

                 this.collector.emit(new Values("hello world"));

          }

         

          public void declareOutputFields(OutputFieldsDeclarer declarer){

                 declarer.declare(new Fields("say"));

          }

         

}

<그림. HelloSpout.java>


HelloSpout 실행이 되면, 필드 “say” 값이 “hello world” 데이타를 생성해서 다음 워크 플로우로 보낸다.

nextTuple() 이라는 함수에서 외부에서 데이타를 받아들여서 다음 워크 플로우로 보내는 일을 하는데, 여기서는 외부에서 데이타를 받아들이지 않고 자체적으로 데이타를 생성하도록 한다. 데이타를 뒤에 워크플로우에 보내는 함수는 emit인데, emmit부분에 “hello world”라는 value 넣어서 보내도록 하였다. 그렇다면 필드의 값은 어떻게 정의 하느냐? 필드값은 declareOutputField라는 함수에 정의하는데, 데이타의 필드는 “say” 정의하였다.


다음으로 이 HelloSpout에서 생성할 데이타를 처리한 HelloBolt를 구현해보자


package com.terry.storm.hellostorm;

 

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Tuple;

 

public class HelloBolt extends BaseBasicBolt{

 

        public void execute(Tuple tuple, BasicOutputCollector collector) {

               // TODO Auto-generated method stub

               String value = tuple.getStringByField("say");

               System.out.println("Tuple value is"+value);

        }

 

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

               // TODO Auto-generated method stub

              

        }

 

}

<그림. HelloBolt.java>


HelloSpout에서 생성된 데이타는 HelloBolt 들어오는데, 데이타가 들어오면 execute라는 메서드가 자동으로 수행된다. 이때, Tuple 통해서 데이타가 전달된다. 여기서는 tuple에서 필드이름이 “say” 값을 tuple.getStringByField(“say”) 이용해서 꺼내서 System.out으로 출력했다.

눈치가 빠른 사람이라면 벌써 알아차렸겠지만, 데이타를 다음 플로우로 보내고자 할때는 앞의 HelloSpout에서 한것처럼, execute 메서드내에서 데이타 처리가 끝난후에, collector.emit 이용해서 다음 플로우로 보내고, delcareOutputField에서 데이타에 대한 필드를 정의하면 된다.

데이타를 생성하는 Spout 데이타를 처리 하는 Bolt 구현했으면 둘을 연결 시켜줘야 한다. 이를 연결시켜주는 것이 Topology인데, HelloTopologyLocal 클래스를 구현해 보자


package com.terry.storm.hellostorm;

 

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.utils.Utils;

 

public class HelloTopologyLocal {

        public static void main(String args[]){

               TopologyBuilder builder = new TopologyBuilder();

               builder.setSpout("HelloSpout", new HelloSpout(),2);

               builder.setBolt("HelloBolt", new HelloBolt(),4).shuffleGrouping("HelloSpout");

              

               Config conf = new Config();

               conf.setDebug(true);

               LocalCluster cluster = new LocalCluster();

              

               cluster.submitTopology("HelloTopologyLocal", conf,builder.createTopology());

               Utils.sleep(10000);

               // kill the LearningStormTopology

               cluster.killTopology("HelloTopologyLocal");

               // shutdown the storm test cluster

               cluster.shutdown();          

        }

 

}

<그림. HelloTolologyLocal.java>


나중에 개념에서 자세하 설명하겠지만, Topology 데이타를 생성하는 Spout 처리하는 Bolt간에 토폴로지 데이타 흐름을 정의하는 부분이다. Spout Bolt들을 묶어 주는 부분이다.

먼저 TopologyBuilder 이용해서 Topology 생성하고, setSpout 이용해서 앞에서 구현한 HelloSpout 연결한다.

다음으로, setBolt 이용해서 Bolt Topology 연결한다. 후에, HelloSpout HelloBolt 연결해야 하는데, setBolt시에, SuffleGrouping 메서드를 이용하여, HelloBolt HelloSpout으로 부터 생성되는 데이타를 읽어들임을 명시한다.

builder.setBolt("HelloBolt", new HelloBolt(),4).shuffleGrouping("HelloSpout");

이렇게 Topology 구성되었으면이 Topology 실제로 실행해야 하는데, Topology 어떤 서버에서 어떤 포트등을 이용해서 실행될지는 Config 정의할 있지만, 여기서는 간단한 테스트이기  때문에 별도의 복잡한 Config 정보는 기술하지 않았다.

다음으로 이렇게 만들어진 Topology Storm 클러스터에 배포해야 하는데, Storm 개발의 편의를 위해서 두가지 형태의 클러스터를 제공한다. 개발용 클러스터와 실운영 환경용 클러스터를 제공하는데, 여기서는 LocalCluster cluster = new LocalCluster();

라는  것을 사용하였다.

LocalCluster 개발환경용 클러스터로, 개발자의 환경에서 최소한의 서버들만을 기동하여 개발한 토폴로지를 테스트할 있게 해준다. 이렇게 Cluster 생성했으면 cluster.submitTopology 이용하여 개발한 토폴로지를 배포한다. 토폴로지가 배포되면 자동으로 토폴로지가 실행이 된다. HelloSpout 계속해서 데이타를 생성하고, HelloBolt 생성된 데이타를 받아서 System.out.println으로 출력하게 되는데, 10초후에 멈추게 하기 위해서, Sleep 10초를 준다. 토폴로지 코드를 실행하는 쓰레드는 Sleep으로 빠질지 모르지만 토폴로지에서 생성된 HelloSpout HelloBolt 쓰레드는 백그라운드에서 작업을 계속 진행한다.

10초후에는 killTopology 이용해서 해당 토폴로지를 제거하고 shutdown 이용해서 Storm 클러스터를 종료시킨다.

실행하기

여기까지 구현했으면 첫번째 Storm 프로그램을 기동해보자. 다음과 같이 maven 명령어를 이용하면 실행이 가능하다.

C:\dev\ws\java_workspace\com.terry.storm>mvn exec:java -Dexec.mainClass=com.terry.storm.hellostorm.HelloTopologyLocal -Dexec.classpath.Scope=compile

실행을 해보면, HelloSpout 데이타를 생성하고, HelloBolt 이를 받아서 화면에 출력하는 것을 있다.


6292 [Thread-16-HelloSpout] INFO  backtype.storm.daemon.task - Emitting: HelloSpout default [hello world]

6292 [Thread-22-HelloBolt] INFO  backtype.storm.daemon.executor - Processing received message source: HelloSpout:5, stream: default, id: {}, [hello world]

Tuple value ishello world

6292 [Thread-10-HelloBolt] INFO  backtype.storm.daemon.executor - Processing received message source: HelloSpout:6, stream: default, id: {}, [hello world]

Tuple value ishello world

ZooKeeper 에러 대응하기

종종 환경에 따라서 실행이 안되면서 다음과 같은 에러가 출력되는 경우가 있는데


3629 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@7bfd25ce

3649 [main-SendThread(0:0:0:0:0:0:0:1:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (java.lang.SecurityException: 로그인 구성을 찾을 없습니다.)

3650 [main-SendThread(0:0:0:0:0:0:0:1:2000)] ERROR org.apache.storm.zookeeper.ClientCnxnSocketNIO - Unable to open socket to 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2000

3655 [main-SendThread(0:0:0:0:0:0:0:1:2000)] WARN  org.apache.storm.zookeeper.ClientCnxn - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect

java.net.SocketException: Address family not supported by protocol family: connect

        at sun.nio.ch.Net.connect(Native Method) ~[na:1.6.0_37]

        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:532) ~[na:1.6.0_37]

        at org.apache.storm.zookeeper.ClientCnxnSocketNIO.registerAndConnect(ClientCnxnSocketNIO.java:277) ~[storm-core-0.9.3.jar:0.9.3]

        at org.apache.storm.zookeeper.ClientCnxnSocketNIO.connect(ClientCnxnSocketNIO.java:287) ~[storm-core-0.9.3.jar:0.9.3]

        at org.apache.storm.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:967) ~[storm-core-0.9.3.jar:0.9.3]

        at org.apache.storm.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1003) ~[storm-core-0.9.3.jar:0.9.3]


이 에러는 Storm Zookeeper와 연결을 할 수 없어서 나는 에러인데, LocalCluster 모드로 기동할 경우,Storm embedded Zookeeper를 기동해서 이 Zookeeper와 연결되어야 하나. IPV6로 연결을 시도하기 때문에, (ZK IPV4 Listen하는데) 발생하는 문제로

java로 실행할때 "-Djava.net.preferIPv4Stack=true" 옵션을 주면, JVM IPV6를 사용하지 않고, V4를 사용하기 때문에, ZooKeeper IPV4로 뜨고, Storm IPV4로 연결을 시도하기 때문에 문제가 없어진다.

지금까지 간단하게나마 첫번째 Storm 프로그램을 작성해서 실행해보았다.

다음에는 Storm 이루는 컴포넌트 구조와 아키텍쳐에 대해서 설명하도록 한다

 

저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

Spark 노트

클라우드 컴퓨팅 & NoSQL | 2014.12.18 00:03 | Posted by 조대협

Spark을 살펴보기전에, 단순하게 빅데이터 실시간 분석을 위한 스트리밍 플랫폼 정도로 생각했다.

CEP 영역에 해당하는 Apache Storm 과 같은 개념으로 생각했었는데, Spark의 특징은 스트리밍은 하나의 특징일 뿐이고 조금 더 일반적인 클러스터링 플랫폼이다.

여러개의 컴퓨터를 묶어서 무언가(?)를 할 수 있는 플랫폼의 개념으로, 무엇인가 처리를 클러스터에 분산하여 실행하도록 해준다.

무언가가 데이타 분석도 될 수 있고, 머신 러닝이나 기타 여러가지가 될 수 있다.


Spark은 기존의 하둡의 Map & Reduce의 성능 문제, MR 기반의 복잡성을 제거하고자 탄생했다.

 메모리 기반의 처리를 통해서 기존의 MR에 비해 성능을 올리고 조금더 쉬운 데이타 접근을  목적을 두고 탄생했다.


Spark은 이미 몇가지 기본 라이브러리들을 탑재하고 있는데, 머신러닝을 위한 MLib, 그래프 계산을 위한 GraphX, 스트리밍 처리를 위한 Spark Streaming 그리고, 분산 데이타 베이스에 대해서 SQL 문장같은 처리를 가능하게 해주는 Spark SQL 등이 있다.

SparkSQL등은 Cassandra와 같은 분산 NoSQL에 SQL 문장으로 처리가 가능하게 해주고, 특히나 분산된 노드에 Map&Reduce와 같이 쿼리를 보내서 모아서 리턴해주는 기능을 프레임웍 차원에서 수행해준다.


주요 프로그래밍 언어로는 Java,Scala,Python을 지원하고 있다.


먼저 RDD의 개념을 이해해야 한다.


Spark tutorial

http://cdn.liber118.com/workshop/itas_workshop.pdf



저작자 표시 비영리
신고
크리에이티브 커먼즈 라이선스
Creative Commons License
TAG apache, Spark

Spring for Apache Hadoop Project #2

(Hive Integration)

Hive Apache 오픈 소스 프로젝트의 하나로, Hadoop 관련 프로젝트이다.

HDFS에 저장된 데이타를 마치 RDMS SQL처럼 쿼리하기 위한 솔루션으로, 복잡한 데이타 쿼리 연산에 있어서, Hadoop과 함께 사용하면 매우 유용하게 이용할 수 있다.

SHDP에서도 이 Hive를 지원한다. 크게 Hive의 기동과, Hive Script의 실행 그리고, Hive에서 제공하는 API를 수행할 수 있도록 지원하며, Hadoop 지원과 마찬가지로, Tasklet을 제공하여 Spring Batch와의 통합을 지원한다.

Hive Server의 기동

hive-server 엘리먼트로 정의하며, configuration file을 읽어서 기동할 수 있으며, 추가되는 configuration hive-server엘리먼트 안에 value로써 지정이 가능하다.

<hdp:hive-server host="some-other-host" port="10001" properties-location="classpath:hive-dev.properties" configuration-ref="hadoopConfiguration">
  someproperty=somevalue
  hive.exec.scratchdir=/tmp/mydir
</hdp:hive-server>

Thrift Client 를 이용한 Hive Script의 수행

Hive를 사용하기 위해서는 Hive Server에 접속하는 클라이언트를 생성해야 하는데, 첫번째 방법이 Thrift Client를 이용하는 방법이 있다. Thrift Client의 경우에는 Thread Safe 하지 않기 때문에, client factory를 리턴한다.

아래 설정을 보면 hive-client-factory hive서버의 ip,port를 지정하여 client를 생성하였다.

그리고, script 실행을 위해서 runner 를 지정한후에, 앞서 생성한 clientfactory reference하였다. 그리고 hive-runner에서 script location을 지정하여,password-analysis.hal 파일에 정의된 script가 실행되도록 정의하였다.

<hdp:hive-client-factory host="some-other-host" port="10001" />
<hdp:hive-runner id=”hiveRunner”hive-client-ref=”hiveClientFactory” run-at-startup=”false” pre-action=”hdfsScript”>
  <script location=”password-analysis.hal”/>
</hdp:/hiverunner>

실제 위의 Configuration을 가지고 수행하는 자바 코드를 보면 다음과 같다.

public class HiveAppWithApacheLogs {
 
         private static final Log log = LogFactory.getLog(HiveAppWithApacheLogs.class);
 
         public static void main(String[] args) throws Exception {
                 AbstractApplicationContext context = new ClassPathXmlApplicationContext(
                                   "/META-INF/spring/hive-apache-log-context.xml"
, HiveAppWithApacheLogs.class);
                 log.info("Hive Application Running");
                 context.registerShutdownHook();    
 
 
                 HiveRunner runner = context.getBean(HiveRunner.class);                
                 runner.call();
 
         }
}

Hive client를 만들때는 각 client가 생성될때마다 자동으로 initialize script를 실행할 수 있다.

<hive-client-factory host="some-host" port="some-port" xmlns="http://www.springframework.org/schema/hadoop">
   <hdp:script>
     DROP TABLE IF EXITS testHiveBatchTable; 
     CREATE TABLE testHiveBatchTable (key int, value string);
   </hdp:script>
   <hdp:script location="classpath:org/company/hive/script.q">
       <arguments>ignore-case=true</arguments>
   </hdp:script>
</hive-client-factory>

위의 설정은 client가 생성될때 마다 DROP TABLE xx 스크립트와, script.q에 지정된 스크립트 두개를 자동으로 수행하도록 한다.

마찬가지로, runner에서도 순차적으로 여러개의 쿼리가 수행되도록 설정할 수 있다.

JDBC 를 이용한 스크립트 수행

Hive Thrift 이외에도, RDBMS에 사용하는 JDBC 드라이버를 사용할 수 있다. Spring에서도 이 JDBC를 통한 Hive 통합을 지원한다.

사용 방법은 일반적인 JDBC Template을 사용하는 방법과 동일하다.

먼저 hive-driver Hive JDBC 드라이버를 지정한후, 이를 이용하여, hive data source를 정의한후, Jdbc template을 이 data source와 연결하여 사용한다. (아래 예제 참고)

<beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns:c="http://www.springframework.org/schema/c"
         xmlns:context="http://www.springframework.org/schema/context"
         xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
         
    <!-- basic Hive driver bean -->
    <bean id="hive-driver" class="org.apache.hadoop.hive.jdbc.HiveDriver"/>
 
    <!-- wrapping a basic datasource around the driver -->
    <!-- notice the 'c:' namespace (available in Spring 3.1+) for inlining constructor arguments, 
         in this case the url (default is 'jdbc:hive://localhost:10000/default') -->
    <bean id="hive-ds" class="org.springframework.jdbc.datasource.SimpleDriverDataSource"
       c:driver-ref="hive-driver" c:url="${hive.url}"/>
 
    <!-- standard JdbcTemplate declaration -->
    <bean id=" jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate" c:data-source-ref="hive-ds"/>
         
    <context:property-placeholder location="hive.properties"/>
</beans>

위의 Configuration을 수행하는 자바 코드는 다음과 같다.

Hive template을 이용한 Hive API 실행

JDBC Template과 유사하게 Hive 실행도 Template을 제공한다.

다음과 같이 context 파일에서, hive-template을 만든후에, 해당 template SomeClass라는 클래스에 someBean이란 이름으로 생성해서 weaving하였다.

<hdp:hive-client-factory ... />
<!-- Hive template wires automatically to 'hiveClientFactory'-->
<hdp:hive-template />
         
<!-- wire hive template into a bean -->
<bean id="someBean" class="org.SomeClass" p:hive-template-ref="hiveTemplate"/>

SomeClass에서는 template을 받아서, hivetemplate.execute() 메서드를 수행한다.

public class SomeClass {
 
private HiveTemplate template;
 
public void setHiveTemplate(HiveTemplate template) { this.template = template; }
 
public List<String> getDbs() {
    return hiveTemplate.execute(new HiveClientCallback<List<String>>() {
       @Override
       public List<String> doInHive(HiveClient hiveClient) throws Exception {
          return hiveClient.get_all_databases();
       }
    }));
}}

 

Spring Batch Integration

마지막으로 Hadoop integration등과 마찬가지로 Spring Batch 통합을 위하여, tasklet을 제공한다.

<hdp:hive-tasklet id="hive-script">
   <hdp:script>
     DROP TABLE IF EXITS testHiveBatchTable; 
     CREATE TABLE testHiveBatchTable (key int, value string);
   </hdp:script>
   <hdp:script location="classpath:org/company/hive/script.q" />
</hdp:hive-tasklet>

 

 

 

 

저작자 표시
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

Apache Tomcat Tuning

성능과 튜닝/WAS 튜닝 | 2013.03.13 23:47 | Posted by 조대협

Tomcat Tuning Guide

 

Tomcat configuration $Tomcat/conf/server.xml

Assumption

This configuration is optimized for REST/HTTP API call. And it doesn’t use any reverse proxy like Apache, NginX etc. We will reside simple L4 switch infront of tomcat groups.

In addition we will not use Tomcat Clustering, Session etc. So the clustering configuration is omitted.

Listener Setting

 <Listener className="org.apache.catalina.security.SecurityListener" checkedOsUsers="root" /> 

checkedOsUser setting means Unix system user “root” cannot start Tomcat. If user starts tomcat as a root user it makes log file as a root user permission. In that case tomcat user cannot delete the log file.

<Listener className="org.apache.catalina.core.JreMemoryLeakPreventionListener" /> 

This makes detect memory leak.

Connector Setting

protocol="org.apache.coyote.http11.Http11Protocol" 

It makes tomcat use BIO. Tomcat has options for IO (BIO,NIO,APR). APR is fastest IO setting. It uses Apache web server IO module, so it is fastest. But it uses C code (JNI call), it can have a risk to kill tomcat instance. (with core dump). APR is more faster about 10% than BIO. But BIO is more stable. Use BIO. (Default is BIO)

acceptCount="10"

It specifies server request queue length. If message is queued in the request queue, it means server cannot handle incoming message (it is overloaded). It will wait for idle thead and the request message will be pending. This setting reduce total size of request queue to 10. If the queue has been overflowed, client will get a error. It can protect server from high overload and let system manager to know the server has been overloaded.

enableLookups="false"

In Java Servlet Code, user can look up request message origin (IP or URL). For example user in yahoo.com send request to server, and Tomcat try to resolve incoming request IP address. “enableLooksups” option enables return DNS name not a IP address. During this processing Tomcat look up DNS. It brings performance degradation. This option removes DNS look up stage and increase performance.

compression="off" 

We are using REST protocol not a normal web contents like HTML,Image etc. This options allows to compress HTTP message. It consumes computing power but it can reduce network payload. In our environment compression is not required. It is better to save computing power. And in some particular Telco network, compression is not supported.

 connectionTimeout="10000"

It is HTTP Connection time out (client to server). It is milliseconds. (10,000 = 10 sec).

If server cannot make a connection from client til 10 sec. It will throw HTTP time out error. In normal situation, our API response time is under 5 sec. So 10 sec means, server has been overloaded. The reason why I increased the time up to 10 sec is, depends on network condition, connection time will be deferred.

maxConnections="8192"

The maximum number of connection, tomcat can handle. It means tomcat can handle maximum 8192 socket connection in a time. This value is restricted by Unix system parameter “ulimit –f” (You can check up in unix console)

maxKeepAliveRequests="1"

As I mentioned above, this configuration is optimized to REST API request not a common web system. It means client will send REST API call only. It sends the request and get a response. Client will not send request in a short time. It means we cannot reuse the connection from the client. So this setting turn of HTTP Keep Alive. (After response the request from client, tomcat disconnect the connection immediately)

maxThreads="100"

This defines total number of thread in Tomcat. It represents max number of active user at that time. Usually 50~500 is good for performance. And 100~200 is best (it is different depends on use case scenario).

Please test with 100 and 200 values and find value for performance. This parameter also get a impact from DB connection pool setting, even if we have a lot of thread , and the total number of db connection is not enough, the thread will wait to acquire the connection. 

tcpNoDelay="true"

This allows us to use TCP_NO_DELAY in tcp/ip layer. It makes send small packet without delay. In TCP, to reduce small package congestion, it gathers small packet to tcp buffer until it has been filled and send the packet. TCP_NO_DELAY option makes send small packet immediately even though TCP buffer is not full.

 

JVM Tuning

Java Virtual Machine tuning is also very important factor to run Tomcat

The focus of JVM tuning is reducing Full GC time.

-server

This option makes JVM to optimize server application. It tunes HotSpot compiler etc internally. This option is very important and mandatory in server side application

-Xmx1024m –Xms1024m -XX:MaxNewSize=384m -XX:MaxPermSize=128m

This memory tuning options, our infrastructure is using c1.mediuem amazon instance, so the available memory is about 1.7 gb total. Heap size is 1G and let them to have fixed size. It defines max 1Gb, min 1Gb heap size. The NewSize is 384mb (1/3 size of total heap size). 1/3 New Size is best performance usually. Perm size is defines area of memory to load class. 64mb is enough. But we will use 128m first time and tune based on gc log analysis later.

Total physical memory consumption is 1G heap + 128mb perm = 1.128 GB and JVM internally uses memory to run JVM itself. It consumes about 350~500mb. So total estimated required memory is about 1.128GB+500m = 1.5 GB.

As I mentioned, c1.mediuem size has only 1.7GB physical memory. If consumed memory exceeds actual physical memory, it makes disk swapping. If JVM memory is swapped out to disk, the performance is significantly degraded. Please take care swapping is not occurred.

-XX:-HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=./java_pid<pid>.hprof

These options are for trouble shooting “OOM (Java Out Of Memory Error”. If out of memory error has been occurred. The memory layout will be dumped to disk. The location of dumpfile is specified by “-XX:HeapDumpPath” option

 -XX:ParallelGCThreads=2 -XX:-UseConcMarkSweepGC

These options specify GC strategy. It uses ParallelGC for Minor collection and 2 threads will be used for the Minor GC. And for Old area, concurrent gc will be used. It will reduce Full gc time

-XX:-PrintGC -XX:-PrintGCDetails -XX:-PrintGCTimeStamps -XX:-TraceClassUnloading -XX:-TraceClassLoading

These option specifies GC logging. It logs the GC log detail to stderr (console output). It shows usage trend os Java Heap memory, time stamp etc. (it contains old,new & perm area usage).

Especially, ClassLoading & UnLoading option show what class is loaded and unloaded to memory. It helps us to trace Perm Out of memory error.

 

Reference : http://www.oracle.com/technetwork/java/javase/tech/vmoptions-jsp-140102.html

Logging

1.     모든 log catalina.out 하나의 파일에 모두 쌓이게 할것 à LogAppender Console Appender로 변경하면 된다.

     Tomcat 자체가 쓰는 로그 (별도로 지정 안하면 원래 Console로 나옴)

     Application에서 LogBack을 이용해서 로깅 되는 로그 (별도로 ConsoleAppender를 개발단에서 정의해줘야 함)

     GC 로그 (별도로 지정 안하면 원래 Console로 나옴)

DB Connection Pool Setting

Please use tomcat dbpcp connection pool. Apache-common connection pool is not updated frequently. Tomcat dbcp connection pool is updated well.

The basic setting guide is “Let number of connection in the pool to keep exact number of connection”. It can be done by set min conn and max conn to same number.

Total number of connections (including read + write) should be around. 70~80. We are using 100 threads in one tomcat instance. The 70~80% will use db connection at the same time.

Library Setting

Developer packaged java lib inside war file (WEB-INF/lib) it can increase usage of perm memory. And sometime can bring confusion about “which lib is actually used”. It means if same lib(jar file) resides in $TOMCAT_HOME/lib and WEB-INF/lib. The lib in $TOMCAT_HOME/lib wil be used and lib is WEB-INF/lib will be ignored.

To solve this problem. Remove common library like my-sql-jdbc driver and dbcp lib jar file from WEB-INF/lib and move it into $TOMCAT_HOME/lib

저작자 표시
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

Apache Camel Overview

아키텍쳐 /EAI | 2013.02.17 00:43 | Posted by 조대협

조대협 (http://bcho.tistory.com)

서문

예전 BEA나 오라클 시절에, EAI, ESB 등을 가지고 시스템간의 연계 업무를 많이 해왔던 나로써는 오픈 소스 기반의 EAI 프레임웍인 Apache Camel의 경우 상당히 흥미로운 주제였다. 과연 상용 제품 대비 얼마나 현실성있는 integration 기능을 제공할 것인가? 가 가장 큰 궁금증이었다.

BEA WebLogic EAI, Oracle Service Bus, AIA 등 여러 제품을 이용해서 직접 시스템간의 연계 시나리오도 구현해보고, BMT에서 타의 솔루션을 테스트도 해봤지만, 먼저 상용 솔루션의 약점은, 솔루션에서 제공하는 시스템간의 연계에 있어서 성능적인 제약이 매우 많이 따른 다는 것이다. 대부분 Message Queue 구조의 비동기 구조를 통해서, 양단간에 데이타 연계를 하는 시나리오가 많은데, 이 경우 비주얼하게 각 단계에 대해서 모니터링은 가능 하지만, 대규모 처리에 있어서 비동기의 약점 때문에 성능적인 제약이 따르고 특히 대규모 메세지 처리에 있어서 그리 매끄럽지 못했던 경험이 있다. BPEL과 같은 제품의 경우 양단간의 연동 시나리오중, 개개별 메세지를 콘솔을 통해서 트레이스할 수 있는데, 하루에 수백,수천건을 연계하는 거래의 경우에는 GUI 콘솔을 통해서 메세지를 전달 내용이나 에러 내용을 추적한다는 것은 애초 부터 불가능한 일이다.

 그래서 실제 프로젝트 때에는 송수신 시스템을 연동하는 양쪽의 아답터 (JMS, 메인프레임, TP 모니터, ERP, CRM 등 다양한 시스템과 연동이 필요하기 때문에, 상용 제품에서 지원되는 아답터는 매우 중요하다)와 기본적인 메세지 플로우 이외의 부분은 대부분 직접 구현하였다. 그래서, EAI와 같은 제품에서 필요한 구조와 이런 것을 요구 사항에 맞게 빠르게 만들었으면 하는 프레임웍이 있었으면 했는데, Apache Camel을 보니, EAI와 같은 연계 업무성 프레임웍으로 적합해 보인다.

Apache Camel의 특징을 보면

Message Integration Framework

Camel은 기본적으로 Message Integration Framework이다. Framework임을 강조하는 것은, ESB EAI 제품과 같은 솔루션이라기 보다는 이를 개발하기 위한 Framework으로 보는 것이 적절하다고 판단된다.

이유는 ESB EAI는 메세지 연동 기능을 수행하기 위한 컨테이너가 있다. (즉 서버가 있다) 아무런 연동 인터페이스가 없더라도, 연동 인터페이스를 수용할 수 있는 컨테이너가 있고, 이 컨테이너들은 모니터링, 로깅등의 관리 기능을 제공한다. 그러나 CamelLibrary이다. 자체적으로 Container를 가지고 있지 않다. 다만, OSGI 컨테이너나 WAS, Spring등에 탑재 되서 돌아갈 수 있다. 쉽게 이야기 하면, EAI ESB와 같은 솔루션 제품은 서버를 설치해야 하고, 서버 관리를 위한 관리 콘솔 UI를 제공한다. Camel Jar로 된 라이브러리 이다.

상용 제품 연계에는 적절하지 않다.

Camel을 보면, 타 솔루션을 연동하기 위한 아답터, (Camel에서는 Component라고 부른다)가 있다. 50여개의 아답터가 있기는 하지만, 기업에서 많이 사용되는 솔루션의 아답터는 턱없이 부족하다. 예를 들어 SAP ERP, Sieble CRM, Tuxedo, AS/400 과 같은 애플리케이션에 대한 아답터 지원이 없다. 상용 제품의 경우 이런 아답터 지원이 강력하다.

반면, DB,FTP,HTTP,JMS와 같이 일반적인 애플리케이션 개발에 사용되는 기술에 대한 아답터는 많이 제공된다.

Apache Camel의 컨셉

Apache Camel의 컨셉을 대략적으로 도식화 해보면 다음과 같다.


Route

먼저 Route 라는 개념을 이해해야 하는데, Route하나의 시스템간의 연동 인터페이스를 정의한다. 예를 들어 시스템 A에서 B로 웹서비스를 이용해서 연동을 했다면 이것이 하나의 Route가 된다. Route 1:1 관계뿐만 아니라 1:N의 관계도 지원 하는데, A 시스템에서 B 시스템으로 JMS로 메세지를 보내고, 그후에 C 시스템으로 FTP 파일 전송하는 인터페이스가 있다면, 이 역시 하나의 Route로 정의할 수 있다.

Component

Route는 크게 Component Processor로 정의 된다. 연계 하고자 하는 송신 시스템과 수신 시스템이 있을때, 각 송신,수신 시스템의 주소(IP) end point라고 정의하며, Component는 일종의 아답터의 개념으로, 송수신 시스템의 프로토콜에 맞는 컴포넌트를 선택해야 한다. 예를 들어 송신 시스템을 FTP로 연동하고 싶다면, FTP 컴포넌트를 , JDBC로 연동하고 싶다면, JDBC 컴포넌트를 사용해야 한다. 컴포넌트는 송신 시스템으로 부터 메세지를 읽어드리고, 수신 시스템으로 메세지를 전송하는 역할을 한다.

Processor

메세지를 읽고 그냥 보내기만 한다면 별 문제가 없겠지만, 시스템간의 연동에는 메세지를 받은 후에 수신 시스템으로 보내기전에 하다못해 로깅을 남기더라도 무엇인가 항상 처리를 한다. 이렇게 송신 시스템으로 부터 받은 메세지를 수신 시스템에 보내기전에 무엇인가 처리를 하는 부분을 Processor라고 하는데, Processor는 그 특징에 따라서 몇가지로 나뉘어 질 수 있다.

1. Message Transformation

Message Format Transformation

메세지의 포맷을 변경하는 작업을 수행한다. 예를 들어 JSON으로 들어온 데이타를 XML로 변경하는 것들이 이에 해당한다.

Message Type Transformation

메세지의 데이타 타입을 변경한다. String으로 들어온 메세지를 jms:TextMessageType으로 변경하는 등의 작업을 수행한다.

이러한 메세지 변환은 Java Class를 정의해서할 수 있으며, 이외에도 Camel에서 미리 제공되는 Converter, XSLT를 이용한 변환 Apache Velocity의 같은 Template 엔진등 다양한 방법을 이용해서 변환이 가능하다.

2. Routing

메세지 라우팅은 들어온 메세지를 다수의 수신 시스템에 조건에 따라서 라우팅할 수 있는 기능이다.


Processor 단계는 쉽게 생각하면, 메세지를 받은 후, 보내기전에 무엇인가.. 를 하는 곳이다. 앞서 설명한것처럼, 메세지를 변환하거나, 라우팅할 수 도 있고, 로깅을 할 수도 있다. 들어온 메세지에 대해서 유효성 검증을 할 수 도 있다. 이러한 Processor는 자주 사용되는 메세지 변환등의 패턴은 Camel에 의해서 제공되지만, Java 클래스를 구현하면, 무엇이든지 가능하기 때문에 메세지에 대한 거의 모든 처리를 구현할 수 있는 단계이다.

 

이렇게, “송신 Component à Processor à 수신 Component”로 하나의 Route가 정의되는데, 이렇게 Route를 정의하여 객체화 시키는 것이 RouteBuilder이다. 이렇게 RouteBuilder에 의해서 생성된 Route CamelContext에 바인딩이 된다. CamelContext SpringContext와 유사한 개념으로 생각하면 된다. Route에 대한 집합이며, Route에 대한 라이플 사이클 (Start up,Stop)등을 관리한다.

간단한 개발 예제

이쯤에서 간단한 예제를 하나 보자. 다음은 FTP로 원격지 디렉토리의 파일을 읽어서 Local Directory에 쓰는 Camel Application이다.

먼저 Maven을 이용해서 다음과 같이 Camel Project를 만든 후에,

mvn archetype:create -DarchetypeGroupId=org.apache.camel.archetypes -DarchetypeArtifactId=camel-archetype-java -DarchetypeVersion=2.5.0 -DgroupId=camelinaction -DartifactId=order-router

다음과 같은 코드를 구현한다.

public class MyRouteBuilder extends RouteBuilder {

    public static void main(String... args) throws Exception {

        Main.main(args);

    }



    public void configure() {

        from("ftp://userid@168.1.2.3/camel/src/?password=password&stepwise=false")

         .to("file:c:/temp/");

 

    }

}

위의 코드는 RouteBuilder를 구현한 코드로 configure 메서드에서 FTP로 파일을 읽어와서 Local에 저장하는 route를 정의한 예이다.

DSL

눈치가 빠른 사람이라면, 위의 코드가 몬가 이상하다는 것을 느꼈을지도 모르겠다. configure 메서드안에 from(“….”).to(“….”).??

함수().함수() 호출? Java에서 이런 문법이 있었던가?

정확히 이야기 하면 이건 일반적인 Java Coding이 아니라 DSL (Domain Specific Language)이다. DSL은 특수목적으로 정의된 언어를 이야기 하는데, 여기서 사용된 Java DSL Route를 효율적으로 정의하기 위해서 Camel에 정의된 내장 스크립트 언어이다.

JavaDSL을 이용하면 상당히 쉽게 Route를 구성할 수 있는데, Camel은 이 Java DSL 뿐만 아니라, 다음과 같이 상당히 다양한 DSL을 제공한다.

Ÿ   Spring XML : XML 기반으로 정의된 DSL , Spring xml configuration 파일에 정의

Ÿ   Groovy,Scala DSL : Groovy 언어, Scala 언어 기반의 DSL

Ÿ   Annotation DSL : Java Annotation 기반의 DSL

Ÿ   기타 (Kotlin DSL, Bluprint XML etc)

(참고: http://camel.apache.org/dsl.html)

DSL Route Processor 부분을 정의하는데 주로 사용되는데, 송수신, Component만 정의되면, 사실상 시스템 연계에서 구현해야 되는 부분은 거의 Processor이며, Processor의 로직 대부분은 메세지 처리, 변환,라우팅에 해당하는 내용이기 때문에 특수목적의 DSL을 사용할 수 있으며, 또한 DSL 사용을 통해서 개발 생산성이나 코드양을 획기적으로 줄일 수 있다.

예를 메세지를 스트링으로 받아서 reverse하여, 화면에 출력하는 경우 Java DSL을 이용하면

from("direct:test")

      .transform(new Expression() {

         @Override

         public Object evaluate(Exchange e) {

            return new StringBuffer(e.getIn().getBody().toString()).reverse().toString();

         }

      })

      .process(new Processor() {

         @Override

         public void process(Exchange e) {

           System.out.println(e.getIn().getBody());

         }

      });

인데 반해서, Groovy DSL을 사용하면

from('direct:test')

      .transform { it.in.body.reverse() }

      .process { println it.in.body }

같이 단지 3줄이면 간단하게 끝난다.

Camel EIP

시스템 연동에는 사실 많은 패턴들이 있다. 메세지 변환, 라우팅, 로그 추적을 위한 글로벌 트렌젝션 ID, 장애시 재처리등등 여러가지 방법이 있는데, 이를 패턴화 시켜놓은 것이 Enterprise Integration Pattern (EIP)이다. EIP


책에 잘 정의 되어 있으니 참고하기 바란다.

사실 Camel에서 반영 및 설계되어 있는 EIP도 이 책에 있는 내용을 대부분 바탕으로 하여 구현되어 있다. (Camel을 쓸려면 이책은 꼭 한번 읽어봐야 하지 않을까 싶다.)

 

참고 : http://camel.apache.org/enterprise-integration-patterns.html 에 간단하게 대표적인 EIP 들이 정리되어 있다.

상용 지원

오픈소스가 무료이라서 저 비용의 장점을 가지고 있지만, 반대로, 오픈소스는 기술지원이나 교육 부분에서 매우 취약하다. 그래서 RedHat과 같이 오픈소스 제품에 대해서 subscription base로 기술 지원이나 교육 및 컨설팅을 제공하는 회사들이 존재하는데, Apache Camel 역시 http://fusesource.com/ 는 곳에서 상용 기술 지원을 받을 수 있다. (얼마전에 Redhat에 인수되었다.) Fuse Source의 경우, Camel을 이용하여 제품을 만들어서 판매하고 있으며, Camel 프로젝트에 참여하고 있는 Commiter 들을 많이 보유하고 있다고 한다.

http://fusesource.com/products/enterprise-camel/ 들어가면 Fuse Source에서 개발 및 판매하는 상용 Camel을 다운로드 받을 수 있으며, 개발 관련 및 트레이닝 자료들을 살펴볼 수 있다.

결론

짧은 시간내에 살펴본 제품이라서 아직 완벽한 특성은 파악하지는 못했다. 그러나 EAI, ESB와 같이 메세지 기반의 연동 처리를 하기 위한 시스템을 개발한다면, 개발 프레임웍으로 충분히 활용할만 하다. 특히 엔터프라이즈의 특정 애플리케이션 (ERP,CRM)등이 아니라 일반적인 프로토콜 (HTTP,JMS,TCP)등을 사용하여 그리 복잡하지 않으면서 고속의 연동 처리를 필요로 한다면 아주 유용하게 사용할 수 있을 것이라 판단된다.

시간 관계상 에러처리, 모니터링, 배포 및 확장성등 운영 관점에 대한 부분은 깊게 살펴보지 못했지만, 그냥 개발 프레임웍으로 본다면, 어짜피 운영 관련 부분은 직접 구현해야 하니까는 괜찮지 않을까 싶다.

다만 단순히 프레임웍이기 때문에, 클러스터링 기반의 HA나 부하 분산등의 기능을 제공하지 않는 것이 아쉬운 점이다.

시스템간의 연동의 중요성과 편이성을 경험한 나한테는, Apache Camel물건은 물건이다.” 라는 결론이 적절하다고나 할까?

 

ErrorHandling - http://bcho.tistory.com/716


참고:ErrorHandling에 대한 관련글 -

http://www.consulting-notes.com/2010/08/camel-exception-handling-overview.html 

정리가 잘되어 있네...

 

저작자 표시
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

'아키텍쳐  > EAI' 카테고리의 다른 글

Apache Camel Error Handling  (0) 2013.02.20
Apache Camel Overview  (0) 2013.02.17
EAI (Enterprise Application Integration) 추진 전략  (1) 2009.07.16
ETL vs EAI  (0) 2009.06.16
EAI 도입 전략  (0) 2007.08.21

Hadoop Architecture Overview

요즘 클라우드와 빅데이타 그리고 분산 컴퓨팅이 유행하면서 가장 많은 언급 되는 솔루션중하나가 Hadoop이다. Hadoop 이 무엇이길래 이렇게 여기저기서 언급될까? 본 글에서는 Hadoop에 대한 소개와 함께, Hadoop의 내부 동작 아키텍쳐에 대해서 간략하게 소개 한다.

What is Hadoop?

Hadoop의 공식 소개를 홈페이지에서 찾아보면 다음과 같다.

The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using a simple programming model. ’

정의를 요약하면, Hadoop은 여러 컴퓨터로 구성된 클러스터를 이용하여 큰 사이즈의 데이타를 처리하기 위한 분산 처리 프레임웍이다.

구조를 보면 엔진 형태로 되어 있는 미들웨어와 소프트웨어 개발 프레임웍의 형태를 띄고 있고, 대용량 데이타를 분산처리를 통해서 처리할 수 있는 솔루션으로, OLTP성의 트렌젝션 처리 (웹과 같이 즉시 응답이 필요한 시스템)보다는 OLAP성의 처리 (데이타를 모아서 처리 후 일정 시간 이후에 응답을 주는 형태)를 위해 디자인된 시스템으로 수분~수일이 소요되는  대규모 데이타 처리에 적합한 프레임웍이다.

Hadoop을 기반으로 한 여러가지 솔루션이 있으나 본 글에서는 Hadoop 자체에 대해서만 설명하도록 하겠다.

Map & Reduce의 기본

Hadoop의 분산 처리 방식은 기본적으로 “Map & Reduce”라는 아키텍쳐를 이용한다. Map & Reduce는 하나의 큰 데이타를 여러개의 조각으로 나눠서 처리 하는 단계 (Map), 처리 결과를 모아서 하나로 합쳐서 결과를 내는 단계 (Reduce)로 나뉘어 진다.

 

 

<그림 Map & Reduce의 기본 개념 >

예를 들어 한국에 사는 모든 사람의 수입의 합을 더한다고 하자. 우리한테는 한국에 있는 모든 사람의 수입이 적혀 있는 텍스트 파일이 하나 있다. 이 파일을 이용해서 각각 사람의 수입을 순차적으로 더해서할 수 도 있지만, 전체 파일을 10조각, 100조각으로 나눠서 각 조각의 수입합을 계산한후, 그 결과를 하나로 더하면, 조각을 나눈만큼의 계산을 병렬로 처리할 수 있기 때문에 조금 더 빠른 결과를 가질 수 있다. 이렇게 하나의 파일을 여러 조각으로 나눠서 계산 하는 것을 Map, 그리고 합치는 과정을 Reduce라고 한다.

이러한 Map & Reduce를 하기 위해서는 Input 데이타를 저장하고, 이 데이타를 조각으로 나눠서 저장하고, 조각에서 처리된 임시 결과를 저장 그리고 합쳐서 저장할 각각의 저장 공간이 필요하다. 이 공간은 전체 분산 처리 시스템에 걸쳐서 접근이 가능해야 하고, 대용량의 데이타를 저장할 수 있어야 하는데, Hadoop에서는 이 공간으로 분산 파일 시스템 (Distributed File System)을 사용하며, 이를 HDFS (Hadoop Distributed File System)이라고 한다. 그리고 위에서 설명한 것 처럼 MR을 위해서 데이타를 처리하는 부분을 Map & Reduce 모듈이라고 한다.

사용 시나리오

Hadoop Map & Reduce는 대용량 데이타에 대한 분석에 대해서 최적화 되어 있다.

즉 웹 시스템이나 OLTP (Online Transaction Processing)과 같은 1~5초내에 응답이 오는 형태의 Request/Response 방식의 Synchronous 형식의 시나리오에는 적합하지 않고, Input을 넣은 다음 수분 후에 결과를 받아서 보는 비동기 (Asynchronous) Deferred 형태의 시스템에 적절하다.

 수학적 계산을 위한 연산 처리, 대규모 저장 및 분석, BI (Business Intelligence)와 같은 데이타 분석과 같은 후처리(지연처리) 중심의 분석 시나리오에 적합하다.

Hadoop 구성

앞에서 설명한 것과 같이 Hadoop은 크게 분산 데이타 처리를 하기 위한 Map & Reduce 모듈(이하 MR) MR Input/Output 데이타를 저장하는 파일시스템인 HDFS (Hadoop Distributed File System)으로 구성되어 있다.

HDFS (Hadoop Distributed File System)

HDFS는 분산 데이타 처리를 위해서 데이타를 저장하기 위한 파일 시스템으로, 다음과 같은 특징을 가지고 있다.

  • 대용량의 파일 (페타 바이트)을 저장할 수 있다.

  • 많은 수의 파일을 저장할 수 있다.

  •  Streaming Data Access

  •  Commodity Hardware

  • Multiple writers, arbitrary file modifications

HDFS의 구성 컴포넌트는 크게 두가지로 나뉘어 진다. (Namenodes, Datanodes).

Namenodes

Namenodes는 일종의 Master node로 파일에 대한 메타 데이타를 저장하는 노드로, 디렉토리 구조, 파일에 대한 각종 메타 데이타, 그리고 물리적 파일이 저장되어 있는 위치등을 저장한다.

Datanodes

Datanodes는 실제 파일을 저장하고 읽어온다. 하나의 파일을 블록이라는 단위로 나눠서 저장하는 역할을 수행한다. 그리고 Namenodes와 주기적으로 통신하여 저장하고 있는 블록에 대한 정보를 Namenodes에 저장하도록 한다.

Namenodes에는 모든 블록에 대한 메타정보가 들어와 있기 때문에, Namenodes가 장애가 나면 전체 HDFS 이 장애가 나는 SFP ( Single Failure Point )가 된다. Namenodes에 대한 이중화가 필요하다. Namenodes에 대한 이중화 방안에 대해서는 추후에 설명하도록 한다. Namenodes SFP로 작용하는 것은 Hadoop 운영상에 아주 중요한 운영 포인트로 존재한다. 근래에는 HDFS의 약점을 보완하기 위해서 GlusterFS Hadoop을 지원하면서, 파일 시스템으로 HDFS를 사용하는 대신 GlusterFS로 대처할 수 있다.

블럭 (Block)

블럭은 HDFS에서 Read Write를 하는 최소 단위이다. 하나의 파일을 여러개의 블럭으로 나눠서 저장된다. Hadoop의 블럭 사이즈는 일반적인 파일 시스템의 블럭사이즈 ( Kilobytes – 일반적으로 파일시스템에서는 512 KB )에 비해서 큰 블럭사이즈를 사용한다. Hadoop에서는 디폴트로 64MB를 사용하고, 보통 128MB를 사용한다.

클 블럭사이즈를 사용하는 이유는, MR 처리에 있어서 Map Task가 하나의 파일 사이즈 단위로 처리하기 때문에, 작은 파일 억세스 보다는 Map Task 단위로 처리할 수 있는 단위가 필요하다. (이것이 블럭) 이를 위해서 큰 사이즈 단위로 파일 처리를 할 수 있는 블럭을 지정하는 것이다.

또한 블럭 크기를 크게 함으로써, 해당 파일에 대한 Seeking Time을 줄일 수 있고, 블럭 사이즈를 적게 하면 Master Node에서 저장 및 처리해야 하는 데이타의 양이 많아지기 때문에 Master Node에 많은 부하가 걸리기 때문에 큰 블럭 사이즈를 사용한다. 이런 이유로 반대로 블럭 사이즈가 작거나 사이즈가 작은 데이타 처리의 경우 Hadoop에서는 충분한 분산 처리 성능을 기대하기 어렵다.

HDFS는 대규모 분산 처리에 필요한 대용량 input 데이타를 저장하고 output 데이타를 저장할 대용량 파일 시스템을 지원한다. HDFS 시절 이전에는 고가의 SAN (Storage Area Network)장비나 NFS 장비를 사용했어야 했는데, Hadoop은 일반 x86 서버에 Disk를 붙인 형태의 저가의 서버 여러개를 연결하여 대규모 분산 파일 시스템을 구축할 수 있게 해줌으로써, 값비싼 파일 시스템 장비 없이 분산 처리를 가능하게 해주는 것이다

Read/Write Operation

1) Read Operation

Client에서 Read를 수행하면, 먼저 Client SDK Namenode에 해당 파일의 데이타 블럭들이 어디에 있는지 블록의 위치를 먼저 물어온 다음에, 순차적으로 해당 데이타 블록이 저장되어 있는 datanodes로 부터 데이타를 블록을 읽어온다.

 

<그림. HDFS Read Operation >

이 과정에서 Namenode라는 놈이 아주 기특한 일을 하는데, 기본적으로 HDFS는 하나의 파일 블록을 저장할때 하나의 datanode에 저장하는 것이 아니라 N개의 datanode에 복제해서 저장한다. (장애 대응을 위하여). Namenode가 블록의 위치를 리턴할때, Hadoop Client로 부터 가까운 곳에 있는 datanode (같은 서버, 같은 Rack, 같은 데이타 센터 순서로..)를 우선으로 리턴하여 효율적인 Read Operation을 할 수 있도록 한다.

2) Write Operation

 

<그림. HDFS Write Operation>

파일 블록 저장은 약간 더 복잡한 과정을 거치는데

파일을 Write를 요청하면,

     먼저 Namenode에서 File Write에 대한 권한 체크등을 수행한다.

     Namenode에서 파일이 Writing Block의 위치한 Datanode를 리턴한다. (1)
파일을 쓰다가 해당 블록이 다 차면, Namenode에 다음 Block이 저장되는 Datanode의 위치를 물어본다.

     Client Stream을 통하여 해당 Datanode Block에 파일을 Write한다. (2)

     Write된 파일을 복제 Datanodes (※ 여기서는 복제노드와 원본 노드를 포함하여 총 3개의 노드가 있다고 가정하자)로 복제(복사)한다. (3,4)

     복제되는 Datanodes들 쌍을 pipeline이라고 하는데, 내부 정책에 따라서 서로 복제할 Datanodes들을 미리 정해놓고 이를 통해서 복제한다. (pipeline Datanode의 물리적 위치 – Rack, 데이타 센터 등을 고려해서 자동으로 결정된다.)

     복제가 모두 끝나면 ACK를 보낸다. (5,6,7)

     파일 Writing을 완료한다.

 

이번 글에서는 간단하게 Hadoop에 대한 개념 소개와 Hadoop의 하부 파일 시스템인 HDFS에 대해서 알아보았다. 다음 글에서는 Hadoop의 Map & Reduce Framework에 대해서 살펴보기로 한다.

 

저작자 표시
신고
크리에이티브 커먼즈 라이선스
Creative Commons License
http://libcloud.apache.org/

Public Cloud의 Management 기능들을 Abstract해놓은 Lib. OpenSource.
CF. JCloud.

저작자 표시
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

'클라우드 컴퓨팅 & NoSQL > 분산컴퓨팅&클라우드' 카테고리의 다른 글

분산 처리 오픈 소스 Gearman 퀵리뷰  (0) 2011.10.24
Message Queue Comparision  (0) 2011.06.03
Apache LibCloud  (0) 2011.05.26
IOMeter  (0) 2011.03.30
BOOK-The Cloud At Your Service  (0) 2011.03.23
예전에 정리해놓은 IP TV 아키텍쳐  (0) 2011.03.02

Introduction of Cassandra

카산드라는 구글의 BigTable 컬럼 기반의 데이타 모델과 FaceBook에서 만든 Dynamo의 분산 모델을 기반으로 하여 제작되어 Facebook에 의해 2008년에 아파치 오픈소스로 공개된 분산 데이타 베이스 입니다. 기존의 관계형 데이타 베이스와 다르게 SQL을 사용하지 않는 NoSQL의 제품중의 하나이며, 대용량의 데이타 트렌젝션에 대해서 고성능 처리가 가능한 시스템이다.(High-Scale). 노드를 추가함으로써 성능을 낮추지 않고 횡적으로 용량을 확장할 수 있다.

 얼마전에 트위터도 MySQL에서 Cassandra로 데이타베이스를 전환하였다고 한다..

자바로 작성되었음에도 불구하고, 데이타베이스라는 명칭에 걸맞게 여러 프로그래밍 언어를 지원합니다. Ruby,Perl,Python,Scala,Java,PHP,C# 

데이타간의 복잡한 관계 정의(Foreign Key)등이 필요없고, 대용량과 고성능 트렌젝션을 요구하는 SNS (Social Networking Service)에 많이 사용되고 있습니다. 성능이나 확장성과 안정성이 뛰어나지만 안타깝게도 Global Scale (여러 국가에 데이타 센터를 분리 배치하여 배포하고, 데이타 센타간 데이타를 동기화 하는 요구사항) 은 지원하지 않습니다. Global Scale이 필요하다면, MySQL기반의 geo replication Sharding이 아직까지는 가장 널리 쓰이는 아키텍쳐 같습니다

Data Model

카산드라의 데이타 모델은 다음과 같다.

전통적인 관계형 데이타 베이스와 다른 구조를 가지고 있다.먼저 데이타 모델에 대한 개념을 잡아보면

Column
컬럼은 컬럼 이름과, 값으로 이루어진 데이타 구조체이다.

{name: “emailAddress”, value:”cassandra@apache.org”}
{name:”age” , value:”20”}

Column Family

컬럼 패밀리는 컬럼들의 집합이다. 관계형 데이타 베이스의 테이블을 생각하면 되는데, 약간 그 개념이 다르다. 차이점은 나중에 설명하기로 하고, 컬럼 패밀리는 하나의 ROW를 식별하기 위한 Key를 갖는다. 하나의 Key에 여러개의 컬럼이 달려 있는 형태가 컬럼 패밀리이다.

하나의 Row를 예를 들어보면

Cassandra = { emailAddress:”casandra@apache.org” , age:”20”}

과 같은 형태이다. Cassandra가 해당 Row에 대한 Key가 되고, emailAddress age라는 이름의 두개의 컬럼을 가지고 있으며 각 컬럼의 값은 “casandra@apache.org” “20”이다.

여러개의 Row를 가지고 UserProfile이라는 이름의 컬럼 패밀리를 보면

UserProfile={
  Cassandra={ emailAddress:”casandra@apache.org” , age:”20”}
  TerryCho= { emailAddress:”terry.cho@apache.org” , gender:”male”}
  Cath= { emailAddress:”cath@apache.org” , age:”20”,gender:”female”,address:”Seoul”}
}

과 같이 표현할 수 있다. 여기서 주목할만한 점이 각 Row의 데이타 스키마가 다르다는 것이다. Cassandra Row emaillAddress age라는 컬럼을 가지고 있고, Terry.Cho emaillAddress gender라는 컬럼을 가지고 있다. 이 처럼 카산드라는 각 Row마다 다른 형태의 데이타 스키마를 가질 수 있는데, 이러한 특징은 “Schemeless”라고 한다.(키에 바인딩되는 데이타 구조는 같은 컬럼 패밀리라도 각 키별로 다를 수 있다.)

KeySpace

KeySpace는 논리적으로 ColumnFamily를 묶어주는 개념입니다. 단지 묶어만 줄뿐 데이타 구조나 관계에서는 별다른 영향을 주지 않습니다.

Super Column & Supper Column Family

앞에서 설명드렸던 컬럼에서 컬럼의 Value String이나 Integer와 같은 Primitive형 뿐만 아니라 컬럼 자체가 다시 들어갈 수 있습니다. 예를 들어 이런 구조입니다.

{name:”username” 
 value: firstname{name:”firstname”,value=”Terry”} 
 value: lastname{name:”lastname”,value=”Cho”} 
}

username이라는 컬럼 안에 firstname lastname이라는 두개의 컬럼이 들어가 있는 구조입니다.

마찬가지 형태로 Column Family 안에도 Column Family가 들어가는 Super 구조가 가능합니다.

UserList={ 
   Cath:{ 
       username:{firstname:”Cath”,lastname:”Yoon”}
       address:{city:”Seoul”,postcode:”1234”}
           }
    Terry:{ 
       username:{firstname:”Terry”,lastname:”Cho”}
       account:{bank:”hana”,accounted:”1234”} 
           }
 }

UserList라는 Column Family 안에, 각각 Cath Key username address라는 Column Family를 가지고 있고, Terry라는 Key username account라는 Column Family를 가지고 있습니다.  

Data Model for Java Developer

간단하게 카산드라의 데이타 구조에 대해서 살펴보았는데, 자바 개발자분이시라면 HashTable이 떠오를겁니다. 데이타 모델을 HashTable과 비교해서 설명해보면 다음과 같은 형태가 됩니다.코드로 이야기 하면 대략 다음과 같은 형태가 되겠지요


앞서 들었던 Column Family의 데이타 구조를 자바 코드로 표현하면 다음과 같은 구조가 됩니다.

UserProfile={
  Cassandra={ emailAddress:”casandra@apache.org” , age:”20”}
  TerryCho= { emailAddress:”terry.cho@apache.org” , gender:”male”}
  Cath= { emailAddress:”cath@apache.org” , age:”20”,gender:”female”,address:”Seoul”}
}

자바 코드

class Keyspace{
           HashTable keyspaces = new HashTable();          

           createColumnFamily(String name){
                     keyspaces.put(name,new HashTable);
           }

           putValue(String columnFamily,String key,Object value){
                     Hashtable cf = keyspaces.get(columnFamily);
                     cf.put(key,value);
           }
}

 

class TerryVO{ // Terry is a Key
           String emailAddress; // each column
           String gender;
           // setter & getter
}

 class CathVO{ // Cath is a Key

           String emailAddress;
           String age;
           String gender;
           // setter & getter 
}

KeySpace myspace;
myspace.createColumnFamily("UserProfile");
myspace.putValue("UserProfile","TerryCho",new TerryVO("terry.cho@apache.org","male");
myspace.putValue("UserProfile","Cath",new CathVO("cath@apache.org","20","female")

 자바 개발자분들이시라면 쉽게 이해하실 수 있을것 같고
구조를 분석하다보니 오라클의 데이타 그리드 솔루션은 Coherence와 데이타 구조가 매우 유사합니다. 요즘 이게 유행인가 보네요

Cassandra Test

개념을 이해했으면 실제 테스트를 한번 해보도록 하겠습니다.

먼저 아파치 카산드라 프로젝트(http://incubator.apache.org/cassandra/) 에서 카산드라를 다운 받습니다. 압축을 푼후에 bin/cassandra.bat를 실행시킵니다. (클러스터로 기동할 수 도 있으나 여기서는 단순하게 하나의 노드만 뛰어보도록 합니다.)

이제 카산드라 커맨드 라인 인터페이스(CLI)를 시키고(/bin/cassandra-cli.bat) 다음 카산드라 노드에 연결합니다. 포트는 디폴트로 9160 포트가 지정되어 있으며 /conf/storage-conf.xml에서 Listen Address Port를 변경할 수 있습니다.  

/conf/storage-conf.xml 파일에는 default Keyspace1이라는 이름으로 Keyspace가 정의되어 있습니다. Keyspace1에 지정되어 있는 Column Family(CF) 형식은 다음과 같습니다.


Standard2 CF Terry이라는 Key Gender라는 Column Male이라는 값을 넣고 다시 조회해보겠습니다.


다음번에는 Java Code를 이용하여 카산드라에 접근하는 방법에 대해서 알아보도록 하겠습니다.

참고 할만한 자료

저작자 표시
신고
크리에이티브 커먼즈 라이선스
Creative Commons License
server.xml에 AJP 프로토콜 설정하는 부분의 샘플이 빠져 있어서 삽질을 좀 했네.
아래와 같이 server.xml에서 AJP Port를 열어줘야함.

<Server port="8000" shutdown="SHUTDOWN" debug="0">
    <Service name="Tomcat-Standalone">
        <Connector className="org.apache.coyote.tomcat4.CoyoteConnector" port="8080" minProcessors="5"
                   maxProcessors="75"
                   enableLookups="false" redirectPort="8443" acceptCount="10" debug="0" connectionTimeout="20000"
                   useURIValidationHack="false" URIEncoding="UTF-8"/>
        <Connector port="8009" protocol="AJP/1.3" redirectPort="8443"
                   minProcessors="50" maxProcessors="150"/>

        <Engine name="Standalone" defaultHost="localhost" debug="0">
        <!--
        <Listener className="org.apache.jk.config.ApacheConfig" modJk="/usr/local/apache2/modules/mod_jk.so" />
        -->


            <Host name="localhost" debug="0" appBase="webapps" unpackWARs="true" autoDeploy="false">

                <Context path="" docBase="../confluence" debug="0" reloadable="false">
                    <!-- Logger is deprecated in Tomcat 5.5. Logging configuration for Confluence is specified in confluence/WEB-INF/classes/log4j.properties -->
                    <Manager pathname="" />
                </Context>
            </Host>

        </Engine>

Apache에서는 mod_jk 설치한후

LoadModule jk_module modules/mod_jk.so
<IfModule jk_module>
 JkWorkersFile /usr/local/apache2/conf/workers.properties
 JkLogFile logs/mod_jk.log
 JkLogLevel info
</IfModule>

# First Virtual Host.
#
<VirtualHost 61.109.254.15:80>
DocumentRoot "/usr/local/confluence-2.10.3-std/conf"
ServerName wiki.javastudy.co.kr
JkMount /* confluence
</VirtualHost>

workers.propertis에 다음가 같이 기술
==
worker.list=confluence
worker.confluence.port=8009
worker.confluence.host=61.109.254.15
worker.confluence.type=ajp13

==


저작자 표시
신고
크리에이티브 커먼즈 라이선스
Creative Commons License
 

티스토리 툴바