블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 

클러스터 상에서 하둡 배포 아키텍쳐


조대협 (http://bcho.tistory.com)


오늘 빅데이타 관련 교육을 받다가 클라우드 상에서 하둡 클러스터 활용에 대한 영감을 받은 부분이 있어서 정리해보고자 한다. 하둡의 경우에는 On-prem 환경에 적절하게 디자인이 된 오픈 소스라서, 이걸 클라우드에서 사용할 경우에도 on-prem에서 사용하는 형태와 유사하게 사용하는 경우가 많다. 일종의 습관 또는 관성이라고 해야 하나? 인프라가 바뀌면 그 장점에 맞는 아키텍쳐를 선택해야 하는데, 이 부분을 놓치고 있지 않았나 싶다.

Job별 클러스터를 생성하는 아키텍쳐

job을 수행하는 방법을 보면, 일반적으로 On-Prem에서 사용하는 방법은 하나의 하둡 클러스터에 Job을 실행하고 Job이 끝나면 다음 Job을 수행하는 방식을 사용한다.



이러한 경험(습관)때문인지 보통 클라우드 상에 하둡 클러스터를 만들어놓고 그 클러스터에 On-prem과 마찬가지 방식으로 Job을 넣고 그 작업이 끝나면 같은 클러스터에 Job을 넣는 방식을 사용한다.

그런데 클라우드의 특성상 자원이 거의 무한대인데, 여러개의 Job을 하나의 클러스터에서 순차적으로 실행할 필요가 있을까?


Job별로 클러스터를 생성해서 연산을 하고 Job이 끝나면 클러스터를 끄는 모델을 사용하면, 동시에 여러 작업을 빠르게 처리할 수 있지 않을까?



이 경우 문제가 될 수 있는 것이 같은 데이타셋을 가지고 여러가지 Job을 실행할 경우 기존의 On-Prem의 경우에는 동시에 같은 데이타셋에 접근할 경우 오버로드가 심하기 때문에, 어렵지만 클라우드의 경우에는 바로 HDFS를 사용하는 것보다는 GCS와 같은 클라우드 스토리지를 사용하는데, 여기서 오는 장점이 이런 클라우드 스토리지에 저장된 데이타는 동시에 여러 하둡 클러스터에서 접근이 가능하다.


이러한 아키텍쳐를 생각하게된 이유는 구글 클라우드의 특성 때문인데, 구글 클라우드의 과금 모델은 분당 과금 모델을 사용한다. 그래서 Job별로 클러스터를 전개했다가 작업이 끝난 다음에 끄더라도 비용 낭비가 없다. 시간당 과금인 경우에는 클러스터를 껐다켰다 하면 비용 낭비가 발생할 수 있기 때문에 바람직하지 않을 수 있다. 예를 들어서 1시간 10분이 걸리는 작업과 20분이 걸리는 작업을 각각 다른 클러스터에서 돌린다면 2시간, 그리고 1시간 비용이 과금되기 때문에 총 3시간이 과금된다.

그러나 구글의 분당 과금을 적용하면 총 1시간30분이 과금되기 때문에 비용이 1/2로 절감된다.


또한 구글 클라우드의 경우 하둡 클러스터 배포시간이 통상 90초이내로 빠르기때문에, Job마다 클러스터를 생성하고 끈다고 해도, 실행 시간이 크게 늘어나지 않는다.


클러스터별로 인프라를 최적화

만약에 위에처럼 Job별로 클러스터를 나눈다면, Job 의 특성에 맞춰서 클러스터의 인스턴스 수나 구조(?)를 변경하는 것도 가능하다.

즉 많은 컴퓨팅 작업이 필요한 Job이라면 클러스터에 많은 인스턴스를 넣고, 그렇지 않다면 인스턴스 수를 줄일 수 있다.




조금 더 나가면, 상태 정보 같이 HDFS에 저장하는 데이타가 적을 경우 워커노드가 중간에 정지되더라도 전체 연산에 크게 문제가 없을 경우에는 Preemptible VM과 같이 가격은 싸지만 저장 기능이 없는 VM의 수를 늘려서 전체 비용을 낮출 수 있다.




여기서 한걸음 더 나가면, 구글 클라우드의 인스턴스 타입중의 하나가 Cutomizable 인스턴스로 CPU와 메모리 수를 마음대로 조정할 수 있다. 즉 Job이 메모리를 많이 사용하는 Spark과 같은 Job일 경우 클러스터를 구성하는 인스턴스에 CPU보다 메모리를 상대적으로 많이 넣고 구성하는 것들이 가능하다.


아키텍쳐 설계도 의미가 있었지만, 그동안 간과하고 있었던 점이 기술이나 인프라가 바뀌면 활용 하는 방법도 다시 고민해야 하는데, 그동안의 관성 때문인지 기존 시스템을 활용하는 방법을 그대로 의심 없이 사용하려 했다는 것에 다시한번 생각할 필요가 있었던 주제였다.









스팍에 대한 간단한 개념과 장점 소개


조대협 (http://bcho.tistory.com)



스팍의 개념과 주요 기능


요즘 주변에서 아파치 스팍을 공부하는 사람도 많고, 스팍을 기반으로한 Zeppelin을 이용하여 데이타 분석을 하는 경우도 많아서, 오늘부터 다시 Spark을 들여다 보기 시작했습니다.


스팍은 예전에도 몇번 관심을 가진적이 있는데, Storm과 같은 데이타 스트리밍 프레임웍에서 Storm과 같이 언급 되기도 하고, 머신 러닝 프레임웍을 볼때도  스팍 ML  라이브러리 기능이 언급 되기도 하고, 예전 모 회사의 데이타 분석 아키텍쳐를 보니, 카산드라에 저장된 데이타를 스팍/Shark라는 프레임웍으로 분석을 하더군요. 또 누구는 메모리 기반의 하둡이라고도 합니다.


스팍의 정의를 내려보면 한마디로

범용적 목적의 분산 고성능 클러스터링 플랫폼 (General purpose high performance distributed platform)

입니다 말이 정말 길고 모호한데, 달리 설명할만한 단어가 없습니다.


스팍은 범용 분산 플랫폼입니다. 하둡과 같이 Map & Reduce 만 돌리는 것도 아니고, Storm 과 같이 스트리밍 처리만 하는게 아니라, 그냥 분산된 여러대의 노드에서 연산을 할 수 있게 해주는 범용 분산 클러스터링 플랫폼으로, 이 위에, Map & Reduce나, 스트리밍 처리등의 모듈을 추가 올려서 그 기능을 수행하게 하는 기능을 제공합니다.


특히나, 메모리 하둡이라고도 불리는데, 이 스팍은 기존의 하둡이 MR(aka. Map & Reduce) 작업을 디스크 기반으로 수행하기 때문에 느려지는 성능을 메모리 기반으로 옮겨서 고속화 하고자 하는데서 출발하였습니다.


그 플랫폼 위에 SQL 기반으로 쿼리를 할 수 있는 기능이나, 스트리밍 기능등등을 확장하여 현재의 스팍과 같은 모습을 가지게 되었습니다.

스팍의 주요 기능은 앞에서 언급하였듯이 

  • Map & Reduce (cf. Hadoop)
  • Streaming 데이타 핸들링 (cf. Apache Storm)
  • SQL 기반의 데이타 쿼리 (cf. Hadoop의 Hive)
  • 머신 러닝 라이브러리 (cf. Apache Mahout)
등을 제공합니다.

스팍의 장점

빠른 속도라는 장점은 필수 이고, 왜 다들 스팍! 스팍 하는가 봤더니, 플랫폼으로써의 특성이 장점으로 작용합니다.
예를 들어 기존의 빅데이타 분석 플랫폼의 구현 구조를 보면, 배치 분석시 ETL 등을 써서 데이타를 로딩한 후에, Hadoop의 MR을 이용하여 데이타를 정재한후, 이 데이타를 OLAP 등의 데이타 베이스에 넣은 후에, 리포팅 툴로 그래프로 표현했습니다. 여기에 만약 실시간 분석을 요한다면 Storm을 연결해서 실시간 데이타 분석 내용을 더하는 일을 했습니다. 
사용되는 프레임웍만해도 몇가지가 되고, 이를 공부하는 시간과 시스템을 배포 운영하는데 여러가지 노력이 들어갔습니다만, 스팍은 하나로 이 모든것이 가능하다는 겁니다.

즉 한번 배워놓으면, 하나의 플랫폼내에서 여러가지가 가능한데다가 속도까지 빠릅니다.
그리고 한 플랫폼안에 배치,스트리밍, 머신 러닝등 다양한 처리를 제공하기 때문에, 하나의 데이타로 여러가지 형태로 데이타를 처리할 수 있는 기능을 가집니다.

연동성 부분에서도 장점을 볼 수 가 있는데, 스팍은 스칼라로 구현되었지만, 파이썬,자바,스칼라등 다양한 언어를 지원하기 위한 SDK를 가지고 있고, 데이타 저장단으로는 하둡, 아마존  S3, 카산드라등 다양한 데이트 스토리지를 지원합니다.
이런 연동성으로 다양한 언어로 다양한 데이타 저장소를 연동하여 데이타를 분석 및 처리할 수 있는 장점을 가지고 있습니다.


ZooKeeper란 무엇인가?

조대협 (http://bcho.tistory.com)


소개


분산 시스템을 설계 하다보면, 가장 문제점 중의 하나가 분산된 시스템간의 정보를 어떻게 공유할것이고, 클러스터에 있는 서버들의 상태를 체크할 필요가 있으며 또한, 분산된 서버들간에 동기화를 위한 락(lock)을 처리하는 것들이 문제로 부딪힌다.


이러한 문제를 해결하는 시스템을 코디네이션 서비스 시스템 (coordination service)라고 하는데, Apache Zookeeper가 대표적이다. 이 코디네이션 서비스는 분산 시스템 내에서 중요한 상태 정보나 설정 정보등을 유지하기 때문에, 코디네이션 서비스의 장애는 전체 시스템의 장애를 유발하기 때문에, 이중화등을 통하여 고가용성을 제공해야 한다. ZooKeeper는 이러한 특성을 잘 제공하고 있는데, 그런 이유로 이미 유명한 분산 솔루션에 많이 사용되고 있다. NoSQL의 한종류인 Apache HBase, 대용량 분산 큐 시스템인 Kafka등이 그 대표적인 사례이다.

분산 시스템을 코디네이션 하는 용도로 디자인이 되었기 때문에, 데이타 억세스가 빨라야 하며, 자체적으로 장애에 대한 대응성을 가져야 한다. 그래서 Zookeeper는 자체적으로 클러스터링을 제공하며, 장애에도 데이타 유실 없이 fail over/fail back이 가능하다.


Apache Zookeeper의 기능은 사실상 별로 없다. 디렉토리 구조기반으로 znode라는 데이타 저장 객체를 제공하고, (key-value식). 이 객체에 데이타를 넣고 빼는 기능만을 제공한다. 일단 디렉토리 형식을 사용하기 때문에 데이타를 계층화된 구조로 저장하기 용이하다.


데이타 모델


데이타 모델은 간단하다. 디렉토리 구조의 각 노드에 데이타를 저장할 수 있다.



 

노드는 아래와 같이 기능에 따라 몇가지 종류로 나뉘어 지는데 


  • Persistent Node : 노드에 데이타를 저장하면 일부러 삭제하지 않는 이상 삭제되지 않고 영구히 저장된다.

  • Ephemeral Node : 노드를 생성한 클라이언트의 세션이 연결되어 있을 경우만 유효하다. 즉 클라이언트 연결이 끊어지는 순간 삭제 된다. 이를 통해서 클라이언트가 연결이 되어 있는지 아닌지를 판단하는데 사용할 수 있다. (클러스터를 구성할때 클러스터내에 서버가 들어오면, 이 Ephemeral Node로 등록하면 된다.)

  • Sequence Node : 노드를 생성할때 자동으로 sequence 번호가 붙는 노드이다. 주로 분산락을 구현하는데 이용된다.


Watcher


Watch 기능은 ZooKeeper 클라이언트가 특정 znode에 watch를 걸어놓으면, 해당 znode가 변경이 되었을때, 클라이언트로 callback 호출을 날려서 클라이언트에 해당 znode가 변경이 되었음을 알려준다. 그리고 해당 watcher는 삭제 된다. 


ZooKeeper 활용 시나리오


Zookeeper는 단순히, 디렉토리 형태의 데이타 저장소이지만, 노드의 종류별 특성과, Watcher 기능들을 활용하면 다양한 시나리오에 사용할 수 있다. 


  1. 큐 : Watcher와 Sequence node를 이용하면, 큐를 구현할 수 있는데, Queue 라는 Node를 만든 후에, 이 노드의 Child node를 sequence node로 구성하면, 새롭게 생성되는 메세지들은 이 sequence node로 순차적으로 생성된다. 이 큐를 읽는 클라이언트가 이 큐 node를 watch 하도록 설정하면, 이 클라이언트는 새로운 메세지가 들어올 때 마다 call back을 받아서, 마치 메세지 Queue의 pub/sub 과 같은 형태의 효과를 낼 수 있다. 대용량 메세지나 애플리케이션 성격상의 메세지는 일반적인 큐 솔루션인 Rabbit MQ등을 활용하고, ZooKeeper는 클러스터간 통신용 큐로 활용하는 것을 고려해볼 수 있다.

  2. 서버 설정 정보 : 가장 일반적인 용도로는 클러스터 내의 각 서버들의 설정 정보(Configuration)를 저장하는 저장소로 쓸 수 있다. 정보가 안정적으로 저장이 될 뿐 아니라. Watch 기능을 이용하면, 설정 정보가 저장될 경우, 각 서버로 알려서 바로 반영을 할 수 있다.

  3. 클러스터 정보 : 현재 클러스터에서 기동중인 서버 목록을 유지할 수 있다. Ephemeral Node는 Zookeeper 클라이언트가 살아 있을 경우에만 유효하기 때문에, 클러스터내의 각 서버가 Ephemeral Node를 등록하도록 하면, 해당 서버가 죽으면 Ephemeral Node 가 삭제 되기 때문에 클러스터 내의 살아 있는 Node 리스트만 유지할 수 있다. 조금 더 발전하면, 클러스터가 master/slave 구조일때, master 서버가 죽었을 때, 다른 master 서버를 선출하는 election logic 도 구현이 가능하다. (https://zookeeper.apache.org/doc/r3.4.6/recipes.html#sc_recipes_Queues 참고)

  4. 글로벌 락 : Zookeeper를 이용하여 많이 사용되는 시나리오 중의 하나인데, 여러개의 서버로 구성된 분산 서버가 공유 자원을 접근하려고 했을때, 동시에 하나의 작업만이 발생해야 한다고 할때, 그 작업에 Lock을 걸고 작업을 할 수 있는 기능을 구현할때 사용한다. 자바 멀티 쓰레드 프로그램에서 synchronized를 생각하면 쉽게 이해가 가능할 것이다.

자세한 구현 방식과 시나리오에 대해서는 https://zookeeper.apache.org/doc/r3.4.6/recipes.html 를 참고하기 바란다.


설치

https://zookeeper.apache.org 에서 gz 파일을 다운 받아서, 압축만 풀면 간단하게 설치는 끝난다. 여기서는 안정 버전인 3.4.6 버전을 사용하였다. 그리고 c:\dev\tools\zookeeper 디렉토리에 압축을 풀었다.


간단한 테스트 하기

$ZooKeeper_home/conf/zoo.conf를 다음과 같이 작성하자

clientPort=2181

tickTime=2000

dataDir=c:\\temp\\


그리고

c:\dev\tools\zookeeper>.\bin\zkServer.cmd로 서버를 구동한다. 이때 하나의 서버만 구동하기 때문에, 이를 standalone 모드라고 한다. (원래 Production에서는 데이타 복제를 통한 페일오버를 지원하기 위해서 여러개의 서버를 띄운다.)

다음으로 zookeepr에 접속해보자

C:\dev\tools\zookeeper>bin\zkCli -server 127.0.0.1:2181


다음과 같이 쉘이 실행됨을 확인할 수 있다.



 

다음으로, 새로운 znode를 하나 만들어보자. 다음과 같이 create 명령을 이용하여 zk_test 노드를 “hello”라는 값으로 생성한다. 다음에, get 명령을 이용하여 데이타를 조회한다.

 

다음으로, my_server1 라는 노드를 추가로 만들자 그리고, ls / 명령어를 실행하면 다음과 같이 아까 만든 zk_test 노드와, my_server1 노드가 생성됨을 확인할 수 있다.

 


참고 자료

https://zookeeper.apache.org/doc/r3.4.6/zookeeperStarted.html

http://helloworld.naver.com/helloworld/textyle/294797 이 글은 레디스 클러스터를 주키퍼를 이용하여 구현한 실 사례로, 읽어볼 여지가 있다.


람다 아키텍쳐의 소개와 해석

조대협 (http://bcho.tistory.com)


람다 아키텍쳐란

람다 아키텍쳐는 트위터에서 스트리밍 컴퓨팅에 있었던Nathan Marz에 의해서 소개된 아키텍쳐로, 실시간 분석을 지원하는 빅데이타 아키텍쳐이다.

아키텍쳐에 대한 자세한 내용은 http://lambda-architecture.net/ 에 소개되어 있다.


문제의 정의

아키텍쳐에 대한 이해를 돕기 위해서 예를 들어 설명해보자.

 페이스북과 SNS 애플리케이션 SNS가 있다고 가정하자. 이 애플리케이션은 모바일 애플리케이션이며, 글쓰기, 읽기, 댓글 달기, 스크롤 하기, 페이지 넘기기등 약 1000여개의 사용자 이벤트가 있다고 가정하자.

 사용자 수는 대략 1억명이며, 매일 이 각 사용자의 행동 패턴을 서버에 저장하여, 일별로, 사용자 이벤트의 개수를 통계로 추출한다고 하자.

클라이언트 디바이스로 부터 올라오는 데이타는 다음과 같다

  • 사용자 : 조대협

  • 날짜 : 2015년 1월 5일



<그림 1. 클라이언트에서 올라오는 데이타 포맷>

이런 환경에서, 기간별 특정 이벤트 추이, 가장 많이 활용되는 이벤트 TOP5 등의 통계 정보를 실시간으로 보고 싶다고 가정하자

가장 단순한 접근은 RDBMS에 저장하고 쿼리를 수행하는 방법이다.


<그림 2. 로그 데이타를 RDBMS에 저장한 포맷>

RDBM에 저장하고 SQL 쿼리문을 돌리면 되겠지만, 문제는 간단하지 않다. 1000개의 컬럼에, 1억명이 사용하는 시스템이다. 즉. 하루에 최대 1000개의 컬럼 짜리, 1억개의 레코드가 생성이 된다것이다.한달이면 30억개의 레코드이다.

이런 많은 데이타를 동적 SQL로 실행하였을때 그 수행시간이 많이 걸린다.


배치를 활용

그러면 이런 시간이 많이 걸리는 문제를 어떻게 해결하면 좋을까? 이를 위한 전통적인 접근 방식은 배치(BATCH)를 활용하는 것이다. 배치는, 어떤 특정 정해진 시간에, 계산을 미리 해놓는 것이다.

즉 데이타를 모아 놓았다가.밤마다.그날짜의 사용자들의 이벤트들의 합을 매일 계산해놓은 테이블을 만들어 놓으면 된다.



<그림 3. 일별 배치로 생성된 이벤트 데이타 테이블>

자아, 이렇게 배치로 테이블을 만들어 놓으면, 특정 기간에 각 이벤트별 통계를 내기가 쉬워 진다. 1년분의 데이타라하더라도 365 행 밖에 되지 않기 때문에, 속도 문제가 해결이 된다.

실시간 데이타의 반영

테이블 조인

이렇게 배치 테이블을 생성하면, 성능에 대한 문제는 해결이 되지만, 데이타가 배치 주기에 따라 최대 1일의 편차를 두게 된다. 즉 실시간 반영에 대한 문제가 발생한다.

그렇다면 어떻게 해결을 해야 할까? 해결은 배치 테이블과 그날의 데이타 테이블을 두개를 같이 사용하면 된다.

즉 어제까지의 데이타는 일별 배치로 생성된 테이블을 사용하고, 오늘 데이타 부분은 사용자별로 기록된 로그 테이블을 사용하여 두 테이블을 조인 하면, 오늘의 지금 순간의 통계값까지 볼 수 있다.

 


<그림 4. 테이블 조인을 이용한 실시간 데이타 통계 추출 >


실시간 집계 테이블의 활용

하루에 쌓이는 데이타량이 얼마 되지 않는다면 문제가 되지 않겠지만, 이 시나리오에서 하루에 쌓이는 데이타는 일 최대 1억건이 된다. 즉, 오늘 쌓이는 데이타 테이블을 조인 하면 1억개의 행에 대한 연산이 발생하여 적절한 성능을 기대하기 어렵다. 

그렇다면, 배치는 매일 돌리되, 오늘 데이타에 대한 통계 값을 실시간으로 업데이트 하는 방법을 생각해볼 수 있다. 

아래 그림과 같이 로그서버에서 클라이언트에서 받은 로그를 원본 데이타 테이블에 계속 저장을 하고, 오늘 통계에 대한 실시간 집계 테이블에, 글쓰기, 글 읽기 등 개별 이벤트의 값을 계산해서 더해 주면 된다.

 


<그림 5. 실시간 집계 테이블>

이렇게 하면, 실시간 집계 테이블과, 배치 테이블을 조인하여 빠르게 실시간 통계를 볼 수 있다.

즉 일별 실시간 통계는 다음 그림과 같이 당일전의 배치뷰와 당일의 실시간뷰를 합쳐서 통계를 낸 형태가 된다.

 


<그림 6. 실시간 통계를 뽑기 위한 테이블들의 관계>


람다 아키텍쳐를 활용

이 개념을 람다 아키텍쳐로 해석해보자. 데이타 흐름을 도식화 해보면 다음과 같다.

 


<그림 7. 람다 아키텍쳐의 개념>


먼저 배치 처리를 위해서, 로그 서버는 모든 로그 데이타를 저장소에 저장하고, 배치 처리 계층에서 일일 또는 일정한 시간을 주기로 배치 처리로 계산을 해서 배치 뷰(배치 테이블)을 만든다.

그리고 다른 흐름으로 실시간 처리쪽에 데이타를 전송해서 실시간 집계를 해서 실시간 집계 테이블을 만든다.

마지막으로, 이 두개의 뷰를 합쳐서 통계를 만든다.

배치뷰는 배치로 돌때만 쓰기가 가능하고 평상시에는 데이타를 읽기만 가능하게 한다. 이를 통해서 데이타가 변경되거나 오염(Corrupt)되는 것을 막을 수 있다.

실시간 뷰는 실시간으로 데이타를 쓰고, 읽을 수 있는 시스템을 사용한다.

위의 문제 정의 예제에서는 컬럼의 개수를 카운트 정도하는 간단한 예를 들었지만, 실제 빅데이타 분석에서는 단순 통계뿐 아니라 복잡한 수식이나 다단계를 거쳐야 하는 데이타 파일의 가공이 필요하기 때문에 복잡한 프로그래밍이 가능한 처리(배치/실시간)이 필요한데, 이 처리 계층에는 프로그램을 이용하려 알고리즘을 삽입할 수 있어야 한다.

이러한 특성에 맞춰서 각 데이타 처리 흐름에 솔루션을 맵핑 해보면 다음과 같다.



<그림 8. 람다 아키텍쳐에 대한 솔루션 맵핑> 


저장소는 대량의 데이타를 저비용으로, 안정성 있게 (유실이 없게) 저장할 수 있는 것이 필요하다. 그리고 이런 대량의 데이타를 배치로 처리할 때 되도록이면 빠른 시간내에 복잡한 알고리즘을 적용해서 계산할 수 있는 계층이 필요한데, 이러한 솔루션으로 제시되는 솔루션이 하둡의 HDFS(Hadoop File System)과 하둡의 MR (Map & Reduce)이다.

이렇게 계산된 배치 데이타를 저장할 장소가 필요한데, 하둡에서는 이런 데이타를 저장하고 고속으로 액세스할 수 있도록 HBase라는 NoSQL을 제공한다.

실시간 처리는 복잡한 알고지즘을 빠르게 데이타를 처리할 수 있는 솔루션이 필요한데, 대표적으로 Apache Storm등이 있으며, 빠른 읽기와 쓰기를 지원해야 하기 때문에, Redis와 같은 In-memory 기반의 NoSQL이 적절하게 추천되고 있다.

일반적으로 람다 아키텍쳐를 소개할때, 제안되는 솔루션의 형태이기는 하나, 람다 아키텍쳐는 특정 솔루션을 제안하는 아키텍쳐이기 보다는 데이타의 처리 기법을 소개하는 솔루션에 종속성이 없는 레퍼런스 아키텍쳐이다.

그래서 다른 솔루션 조합을 고려해볼 수 있는데, Dr.dobbs (http://www.drdobbs.com/database/applying-the-big-data-lambda-architectur/240162604)

에 소개된 솔루션 조합과 필자가 추천하는 조합을 추가해서 보면 다음과 같다.


<그림 9. 람다 아키텍쳐의 솔루션 조합>


여기서,필자가 Dr.Dobbs의 추천 솔루션 이외에, 배치 뷰와 실시간 뷰 쪽에, RDBMS를 추가하였는데, 배치뷰에 추가한 Amazon RedShift의 경우 아마존 클라우드 서비스에서 제공되는 Postgres 기반이 서비스로, 최대 16PB(페타바이트)까지의 용량을 지원한다. 이미 빅데이타라고 부를만큼의 충분한 데이타 사이즈를 지원할 뿐더라, RDBMS 기반의 SQL을 이용하여 유연한 데이타 조회가 가능하며, 리포트를 출력하기 위한 기존의 BI 툴과도 호환이 잘되서 많은 개발에 관련된 부분을 덜 수 있다. 실제로 통계 리포팅에서 가장 많은 시간이 소요되는 작업이, 비즈니스쪽 요구에 맞는 리포트를 만드는 작업이다.어떤 테이블과 그래프를 이용해서 데이터에 대한 의미를 보여줄 지는 단순한 리포팅 작업이라고 치부하기에는 매우 중요한 작업이며, 다양한 비즈니스 요건에 맞는 뷰를 보여 주기 위해서는 BI툴과의 연동은 많은 장점을 제공한다.

위에서 설명한 람다 아키텍쳐를 계층(Layer)로 나눠서 소개 하면 다음 그림과 같다.

실시간 데이터를 처리하는 부분을 스피드 레이어라고 부르며, 배치 처리는 배치 저장소와 배치 처리 부분을 배치 레이어라고 명명하고, 배치에 의해서 처리된 요약 데이터를 제공하는 부분을 서빙 레이어(Serving Layer)라고 한다.

 


<그림 10. 계층별로 추상화된 람다 아키텍쳐>

배치 레이어의 의미

배치 레이어의 저장소에는 가공전의 원본 데이터를 모두 저장한다. 데이터가 처리된 후에도 저장소에 데이터를 삭제 하지 않는다.

이렇게 원본 데이터를 저장함으로써, 배치 뷰의 데이터가 잘못 계산되었거나, 유실 되었을때, 복구가 가능하고, 현재 데이터 분석에서 없었던 새로운 뷰(통계)를 제공하고자 할 때 기존의 원본 데이터를 가지고 있음으로써, 기존 데이터에 대해서도 새로운 뷰의 통계 분석이 가능하다.


람다 아키텍쳐의 재구성

RDBMS를 활용한 유연성 증대 방안

이러한 람다 아키텍쳐는 대용량 데이터 처리와 실시간 정보 제공을 위한 장점을 가지고 있음에도 불구하고 대부분 하둡이나 NOSQL등의 솔루션을 조합해서 구현하는 경우가 대부분이기 때문에, 유연성 측면에서 문제점을 가지고 있다.

예를 들어 배치 뷰를 HBase를 사용하고, 실시간 뷰를 Redis를 사용할 경우, 상호 솔루션간 데이터 조인이 불가능할 뿐더러, 인덱스나 조인,그룹핑, 소팅 등이 어렵다. 이러한 기능이 필요하다면 각각 배치 처리와 실시간 처리 단계에 추가적으로 로직을 추가해서 새로운 뷰를 만들어야 한다.

쉽게 설명하면, 일반적인 NoSQL은 키-밸류 스토어의 개념을 가지고 있다.

그래서, 위의 그림3과 같은 테이블이 생성되었다 하더라도, 특정 컬럼 별로 데이터를 소팅해서 보여줄수 가 없다. 만약 소팅된 데이터를 표현하고자 한다면, 소팅이 된 테이블 뷰를 별도로 생성해야 한다.

참고 : NoSQL 데이터 모델링 패턴

http://bcho.tistory.com/665 , http://bcho.tistory.com/666

그래서 이런 문제점을 보강하기 위해서는 위에서도 잠깐 언급하였듯이 실시간 뷰와 배치 뷰 부분을 RDBMS를 사용하는 것을 고려해볼 수 있다. 쿼리에 특화된 OLAP 데이터 베이스를 활용하는 방법도 있고, 또는 HP Vertica 등을 활용할 수 있다. (HP Vertica는 SQL을 지원하지만, 전통적인RDBMS가 데이터를 행 단위로 처리하는데 반하여, Vertica는 데이터를 열 단위로 처리해서 통계나 쿼리에 성능이 매우 뛰어나다. 유료이지만 1테라까지는 무료로 사용할 수 있으니 뷰 테이블 용도 정도로 사용하는데는 크게 문제가 없다.)


데이터 분석 도구를 이용한 새로운 분석 모델 개발

분석 통계 데이터를 제공하다 보면, 저장소에 저장된 원본 데이터를 재 분석함으로써 추가적인 의미를 찾아낼 수 있는데, 이 영역은 데이터 과학자의 영역으로, 저장소에 있는 데이터를 통해서 새로운 데이터 모델을 추출해 내는 방식이다.

예를 들어, 글읽기 이벤트와 글쓰기 이벤트간의 상관 관계를 파악해내거나, 요일별 이벤트 변화량등을 분석해낼 수 있는데,

  1. 이 저장소에 R이나 MetLab과 같은 데이터 분석 도구를 이용하여, 샘플(표본) 데이터를 추출해서 데이터의 상관 관계를 파악해보고,

  2. 이러한 분석을 통해서 새로운 통계 모델을 설계하고 검증해볼 수 있다.

  3. 만약 이러한 모델이 적절하다면 알고리즘을 구현하고 이를 빅데이타 엔지니어에게 넘겨 준다.

  4. 빅데이타 엔지니어는 데이터 과학자에게서 받은 알고리즘을 람다 아키텍쳐의 각 레이어에 배치된 솔루션에 알맞은 형태로 구현한다.


 


<그림 11. 새로운 데이터 모델의 개발>

이러한 과정의 반복을 통해서, 분석 시스템은 지속적으로 발전되어가면서 데이터에 대한 더 많은 인사이트를 제공할 수 있게 된다.


결론

간단하게나마 람다 아키텍쳐에 대해서 알아보았다.

람다 아키텍쳐는 꼭 빅데이타에 적용하거나, 또는 하둡을 이용해야 하는 아키텍쳐가 아니다. RDBMS나 CSV 파일 등, 어떤 데이터 형태라도 기본은 배치를 이용한 집계 테이블과 실시간 뷰 테이블을 조인한다는 개념이기 때문에, 솔루션에 억메이지 말고, 적절한 시나리오를 찾아서 적용할 수 있도록 하면 좋겠다.


참고 : 

http://www.drdobbs.com/database/applying-the-big-data-lambda-architectur/240162604

http://www.infoq.com/articles/lambda-architecture-scalable-big-data-solutions



Spring for Apache Hadoop Project #2

(Hive Integration)

Hive Apache 오픈 소스 프로젝트의 하나로, Hadoop 관련 프로젝트이다.

HDFS에 저장된 데이타를 마치 RDMS SQL처럼 쿼리하기 위한 솔루션으로, 복잡한 데이타 쿼리 연산에 있어서, Hadoop과 함께 사용하면 매우 유용하게 이용할 수 있다.

SHDP에서도 이 Hive를 지원한다. 크게 Hive의 기동과, Hive Script의 실행 그리고, Hive에서 제공하는 API를 수행할 수 있도록 지원하며, Hadoop 지원과 마찬가지로, Tasklet을 제공하여 Spring Batch와의 통합을 지원한다.

Hive Server의 기동

hive-server 엘리먼트로 정의하며, configuration file을 읽어서 기동할 수 있으며, 추가되는 configuration hive-server엘리먼트 안에 value로써 지정이 가능하다.

<hdp:hive-server host="some-other-host" port="10001" properties-location="classpath:hive-dev.properties" configuration-ref="hadoopConfiguration">
  someproperty=somevalue
  hive.exec.scratchdir=/tmp/mydir
</hdp:hive-server>

Thrift Client 를 이용한 Hive Script의 수행

Hive를 사용하기 위해서는 Hive Server에 접속하는 클라이언트를 생성해야 하는데, 첫번째 방법이 Thrift Client를 이용하는 방법이 있다. Thrift Client의 경우에는 Thread Safe 하지 않기 때문에, client factory를 리턴한다.

아래 설정을 보면 hive-client-factory hive서버의 ip,port를 지정하여 client를 생성하였다.

그리고, script 실행을 위해서 runner 를 지정한후에, 앞서 생성한 clientfactory reference하였다. 그리고 hive-runner에서 script location을 지정하여,password-analysis.hal 파일에 정의된 script가 실행되도록 정의하였다.

<hdp:hive-client-factory host="some-other-host" port="10001" />
<hdp:hive-runner id=”hiveRunner”hive-client-ref=”hiveClientFactory” run-at-startup=”false” pre-action=”hdfsScript”>
  <script location=”password-analysis.hal”/>
</hdp:/hiverunner>

실제 위의 Configuration을 가지고 수행하는 자바 코드를 보면 다음과 같다.

public class HiveAppWithApacheLogs {
 
         private static final Log log = LogFactory.getLog(HiveAppWithApacheLogs.class);
 
         public static void main(String[] args) throws Exception {
                 AbstractApplicationContext context = new ClassPathXmlApplicationContext(
                                   "/META-INF/spring/hive-apache-log-context.xml"
, HiveAppWithApacheLogs.class);
                 log.info("Hive Application Running");
                 context.registerShutdownHook();    
 
 
                 HiveRunner runner = context.getBean(HiveRunner.class);                
                 runner.call();
 
         }
}

Hive client를 만들때는 각 client가 생성될때마다 자동으로 initialize script를 실행할 수 있다.

<hive-client-factory host="some-host" port="some-port" xmlns="http://www.springframework.org/schema/hadoop">
   <hdp:script>
     DROP TABLE IF EXITS testHiveBatchTable; 
     CREATE TABLE testHiveBatchTable (key int, value string);
   </hdp:script>
   <hdp:script location="classpath:org/company/hive/script.q">
       <arguments>ignore-case=true</arguments>
   </hdp:script>
</hive-client-factory>

위의 설정은 client가 생성될때 마다 DROP TABLE xx 스크립트와, script.q에 지정된 스크립트 두개를 자동으로 수행하도록 한다.

마찬가지로, runner에서도 순차적으로 여러개의 쿼리가 수행되도록 설정할 수 있다.

JDBC 를 이용한 스크립트 수행

Hive Thrift 이외에도, RDBMS에 사용하는 JDBC 드라이버를 사용할 수 있다. Spring에서도 이 JDBC를 통한 Hive 통합을 지원한다.

사용 방법은 일반적인 JDBC Template을 사용하는 방법과 동일하다.

먼저 hive-driver Hive JDBC 드라이버를 지정한후, 이를 이용하여, hive data source를 정의한후, Jdbc template을 이 data source와 연결하여 사용한다. (아래 예제 참고)

<beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns:c="http://www.springframework.org/schema/c"
         xmlns:context="http://www.springframework.org/schema/context"
         xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
         
    <!-- basic Hive driver bean -->
    <bean id="hive-driver" class="org.apache.hadoop.hive.jdbc.HiveDriver"/>
 
    <!-- wrapping a basic datasource around the driver -->
    <!-- notice the 'c:' namespace (available in Spring 3.1+) for inlining constructor arguments, 
         in this case the url (default is 'jdbc:hive://localhost:10000/default') -->
    <bean id="hive-ds" class="org.springframework.jdbc.datasource.SimpleDriverDataSource"
       c:driver-ref="hive-driver" c:url="${hive.url}"/>
 
    <!-- standard JdbcTemplate declaration -->
    <bean id=" jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate" c:data-source-ref="hive-ds"/>
         
    <context:property-placeholder location="hive.properties"/>
</beans>

위의 Configuration을 수행하는 자바 코드는 다음과 같다.

Hive template을 이용한 Hive API 실행

JDBC Template과 유사하게 Hive 실행도 Template을 제공한다.

다음과 같이 context 파일에서, hive-template을 만든후에, 해당 template SomeClass라는 클래스에 someBean이란 이름으로 생성해서 weaving하였다.

<hdp:hive-client-factory ... />
<!-- Hive template wires automatically to 'hiveClientFactory'-->
<hdp:hive-template />
         
<!-- wire hive template into a bean -->
<bean id="someBean" class="org.SomeClass" p:hive-template-ref="hiveTemplate"/>

SomeClass에서는 template을 받아서, hivetemplate.execute() 메서드를 수행한다.

public class SomeClass {
 
private HiveTemplate template;
 
public void setHiveTemplate(HiveTemplate template) { this.template = template; }
 
public List<String> getDbs() {
    return hiveTemplate.execute(new HiveClientCallback<List<String>>() {
       @Override
       public List<String> doInHive(HiveClient hiveClient) throws Exception {
          return hiveClient.get_all_databases();
       }
    }));
}}

 

Spring Batch Integration

마지막으로 Hadoop integration등과 마찬가지로 Spring Batch 통합을 위하여, tasklet을 제공한다.

<hdp:hive-tasklet id="hive-script">
   <hdp:script>
     DROP TABLE IF EXITS testHiveBatchTable; 
     CREATE TABLE testHiveBatchTable (key int, value string);
   </hdp:script>
   <hdp:script location="classpath:org/company/hive/script.q" />
</hdp:hive-tasklet>

 

 

 

 

Spring for Apache Hadoop Project


얼마전에, Spring에서 Hadoop과 통합을 지원하는 프로젝트를 발표하였습니다. Hadoop 자체뿐만 아니라, Hadoop echo system Hive, Pig, Cascade등을 함께 지원하며, 기존 Spring Spring Batch(배치 작업 수행 및 워크 플로우 관리)와의 통합을 지원합니다.

이번 글에서는 Spring Data Apache Hadoop (이하 SHDP-Spring for Apache Hadoop Project)에 대해 설명한다 ( Spring Hadoop에 대한 기본적인 이해가 선행되어야 한다. )

전체적으로의 느낌은 Spring을 컨테이너의 개념으로 보고, Hadoop을 그 컨테이너 안에서 실행 시키는 것과 같은 느낌을 준다. Hadoop 자체와 아주 Tight하게 통합 된것은 아니면서, 외부 명령어나 Library invoke 하듯이 Hadoop을 실행하며, 단순히 Map & Reduce Job 뿐만 아니라, Job을 수행하기 위해서 통상적으로 Hadoop에서 수행하는 Shell Script 처리까지 프레임웍에서 지원하도록 했다. 또한 각 Job이나 이런 Script들을 순차적으로 batch처럼 수행하기 위해서 Tasklet을 제공함으로써 Spring Batch와의 통합을 지원한다. Hadoop job, tool,script들을 모두 spring configuration내의 element로 다 맵핑을 한후에, Spring batch와 통합을 통하여, hadoop을 수행하는데 모든 필요한 순차적인 작업을 shell command나 여타의 작업이 필요 없이 spring만 가지고도 수행할 수 있도록 지원한다.


현재 지원 버전 참고

Spring for Apache Hadoop requires JDK level 6.0 (just like Hadoop) and above, Spring Framework 3.0 (3.2 recommended) and above and Apache Hadoop0.20.2 (1.0.4 recommended) and above. SHDP supports and is tested daily against various Hadoop distributions, such as Cloudera CDH3 (CD3u5) and CDH4 (CDH4.1u3 MRv1) distributions and Greenplum HD (1.2). Any distro compatible with Apache Hadoop 1.0.x should be supported

 Job 기반의 기본적인 Hadoop Map & Reduce 실행

Hadoop Map & Reduce 에 대한 통합은 크게 3가지 관점으로 이루어 진다. Job,Tool 그리고 Hadoop jar 파일 자체

Job Map & Reduce의 단위로, 당연히 지원되어야 하고, Tool Hadoop이 지원하는 각종 유틸리티 들이다. 이 역시 Spring element로 지정이 되어 있다. 마지막으로 hadoop.jar 자체를 수행할 수 있는 기능을 지원한다. 3가지 기능을 모두 지원함으로써 hadoop으로 할 수 있는 거의 모든 것을 spring으로 mapping을 할 수 있다. 또한 이 각각에 대해서 Spring Batch 통합을 위하여 Tasklet을 제공한다.

Java 기반의 job 정의

<hdp:job id="mr-job"

  input-path="/input/" output-path="/ouput/"

  mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper"

  reducer="org.apache.hadoop.examples.WordCount.IntSumReducer"/>

 

위의 예제는 default configuration을 사용하는 것으로 되어 있는데, 별도로 hadoop configuration을 명시적으로 지정할 수 도 있고, 아울러, Job 마다 custom property 를 이용하여, configuration을 명시적으로 지정할 수 있음. 아래 예제는 특정 job에 대해서 configuration property 파일로 지정한 형태. 또한 아래에서 볼 것중 하나는 jar 파일을 명시적으로 지정할 수 있다. spring load되는 classloader가 아니라, 별도의 class loader를 이용하여 job에 이용되는 클래스들을 로딩함으로써, 같은 클래스가 있을때 중복이나 충돌이 발생함을 방지해준다.

<hdp:job id="mr-job" 
  input-path="/input/" output-path="/ouput/"
  mapper="mapper class" reducer="reducer class"
  jar-by-class="class used for jar detection"
  properties-location="classpath:special-job.properties">
    electric=sea
</hdp:job>

 

Streaming job 정의

Streaming Job (일반 Command 명령어 방식)은 다음과 같이 정의한다.

<hdp:streaming id="streaming-env" 
  input-path="/input/" output-path="/ouput/"
  mapper="${path.cat}" reducer="${path.wc}">
  <hdp:cmd-env>
     EXAMPLE_DIR=/home/example/dictionaries/
     ...
  </hdp:cmd-env>
</hdp:streaming>

mapperreducer에 직접 command를 정의한다.

그리고 만약에, command에 넘겨야 하는 parameter들이 있다면 <hdp:cmd-env> 엘리먼트에 지정해서 넘긴다.

 Running a Hadoop Job

앞서와 같이 Hadoop Job의 정의가 끝나면, Spring에서는 “job-runner” 엘리먼트를 이용해서 특정 Job을 구동 시킬 수 있다.

<hdp:job-runner id="myjob-runner" pre-action="cleanup-script" post-action="export-results" job="myjob" run-at-startup="true"/>
 
<hdp:job id="myjob"  input-path="/input/" output-path="/output/"
         mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper"
         reducer="org.apache.hadoop.examples.WordCount.IntSumReducer" />

위의 예제는 구동과 동시에 TokenizerMappe라는 java mapper InitSumReducer라는 java reducer를 가지고 있는 “myjob”이라는 job을 구동 시키는 설정이다. “myjob”을 구동하기 전에 “cleanup-script” 를 수행하고, job 실행이 끝나면 “export-results”라는 스크립트를 자동으로 수행한다.

또한 <hdp:job-runner> 엘리먼트에 job=”” 속성에, 순차적으로 1개 이상의 job id를 적으면, 여러개의 job을 순차적으로 수행하게 된다.

Job 실행의 Spring Batch와 통합

SHDPHadoop job 실행을 Spring Batch와 통합하기 위해서 hadoop job을 실행 시키는 tasklet을 제공한다.

<hdp:job-tasklet id="hadoop-tasklet" job-ref="mr-job" wait-for-completion="true" />

위와 같이 job tasklet으로 지정하여 spring batch work flow에 통합할 수 있다.

Hadoop Tool 실행

Hadoop에는 내부적으로 Map & reduce job을 수행 시키는 것 이외에도, 여러가지 툴(유틸리티)들이 포함되어 있는데, SHDP에서는 이러한 유틸리티를 수행할 수 있도록, “tool-runner” element를 지원한다.

<hdp:tool-runner id="someTool" tool-class="org.foo.SomeTool" run-at-startup="true">
   <hdp:arg value="data/in.txt"/>
   <hdp:arg value="data/out.txt"/>
   
   property=value
</hdp:tool-runner>

위와 같이 tool-runner element에서 수행하고자 하는 클래스를 tool-class로 지정하고, 필요한 parameter hdp:arg 엘리먼트로 정의하면 된다.

또한 하나의 tool만 실행하는 것이 아니라, 순차적으로 여러개의 tool tool-runner를 이용해서 수행할 수 있다.

<hdp:tool-runner id="job1" tool-class="job1.Tool" jar="job1.jar" files="fullpath:props.properties" properties-location="config.properties"/>
<hdp:tool-runner id="job2" jar="job2.jar">
   <hdp:arg value="arg1"/>
   <hdp:arg value="arg2"/>
</hdp:tool-runner>
<hdp:tool-runner id="job3" jar="job3.jar"/>
...

위와 같이 job1,job2,job3의 툴을 순차적으로 수행함으로써, hadoop 수행시 shell script에서 여러 도구를 수행하는 절차를 모두 Spring으로 통합할 수 있다.

Hadoop ToolSpring Batch 통합

Job과 마찬가지로 Tool 실행 역시 tasklet을 제공하여, Spring Batch과 통합 될 수 있도록 지원한다.

<hdp:tool-tasklet id="tool-tasklet" tool-ref="some-tool" />

 

Hadoop jar의 실행

마지막으로, hadoop.jar 자체를 수행할 수 있도록 지원한다.

<hdp:jar-runner id="wordcount" jar="hadoop-examples.jar" run-at-startup="true">
    <hdp:arg value="wordcount"/>
    <hdp:arg value="/wordcount/input"/>
    <hdp:arg value="/wordcount/output"/>
</hdp:jar-runner>

위의 예제는 아래 shell command를 그대로 수행하는 기능을 한다.

bin/hadoop jar hadoop-examples.jar wordcount /wordcount/input /wordcount/output

 

Hadoop jar 실행과 Spring Batch의 통합

Job이나 Hadoop tool과 마찬가지로 Spring 통합을 위해서 tasklet을 제공한다.

<hdp:jar-tasklet id="jar-tasklet" jar="some-jar.jar" />

 

다음 글에서는 Hive, Pig, Cascading, HDFS에 대한 SHDP 지원에 대해서 소개하고자 한다.

참고 자료 : http://static.springsource.org/spring-hadoop/docs/current/reference/html/hadoop.html

 

Hadoop Architecture Overview

요즘 클라우드와 빅데이타 그리고 분산 컴퓨팅이 유행하면서 가장 많은 언급 되는 솔루션중하나가 Hadoop이다. Hadoop 이 무엇이길래 이렇게 여기저기서 언급될까? 본 글에서는 Hadoop에 대한 소개와 함께, Hadoop의 내부 동작 아키텍쳐에 대해서 간략하게 소개 한다.

What is Hadoop?

Hadoop의 공식 소개를 홈페이지에서 찾아보면 다음과 같다.

The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using a simple programming model. ’

정의를 요약하면, Hadoop은 여러 컴퓨터로 구성된 클러스터를 이용하여 큰 사이즈의 데이타를 처리하기 위한 분산 처리 프레임웍이다.

구조를 보면 엔진 형태로 되어 있는 미들웨어와 소프트웨어 개발 프레임웍의 형태를 띄고 있고, 대용량 데이타를 분산처리를 통해서 처리할 수 있는 솔루션으로, OLTP성의 트렌젝션 처리 (웹과 같이 즉시 응답이 필요한 시스템)보다는 OLAP성의 처리 (데이타를 모아서 처리 후 일정 시간 이후에 응답을 주는 형태)를 위해 디자인된 시스템으로 수분~수일이 소요되는  대규모 데이타 처리에 적합한 프레임웍이다.

Hadoop을 기반으로 한 여러가지 솔루션이 있으나 본 글에서는 Hadoop 자체에 대해서만 설명하도록 하겠다.

Map & Reduce의 기본

Hadoop의 분산 처리 방식은 기본적으로 “Map & Reduce”라는 아키텍쳐를 이용한다. Map & Reduce는 하나의 큰 데이타를 여러개의 조각으로 나눠서 처리 하는 단계 (Map), 처리 결과를 모아서 하나로 합쳐서 결과를 내는 단계 (Reduce)로 나뉘어 진다.

 

 

<그림 Map & Reduce의 기본 개념 >

예를 들어 한국에 사는 모든 사람의 수입의 합을 더한다고 하자. 우리한테는 한국에 있는 모든 사람의 수입이 적혀 있는 텍스트 파일이 하나 있다. 이 파일을 이용해서 각각 사람의 수입을 순차적으로 더해서할 수 도 있지만, 전체 파일을 10조각, 100조각으로 나눠서 각 조각의 수입합을 계산한후, 그 결과를 하나로 더하면, 조각을 나눈만큼의 계산을 병렬로 처리할 수 있기 때문에 조금 더 빠른 결과를 가질 수 있다. 이렇게 하나의 파일을 여러 조각으로 나눠서 계산 하는 것을 Map, 그리고 합치는 과정을 Reduce라고 한다.

이러한 Map & Reduce를 하기 위해서는 Input 데이타를 저장하고, 이 데이타를 조각으로 나눠서 저장하고, 조각에서 처리된 임시 결과를 저장 그리고 합쳐서 저장할 각각의 저장 공간이 필요하다. 이 공간은 전체 분산 처리 시스템에 걸쳐서 접근이 가능해야 하고, 대용량의 데이타를 저장할 수 있어야 하는데, Hadoop에서는 이 공간으로 분산 파일 시스템 (Distributed File System)을 사용하며, 이를 HDFS (Hadoop Distributed File System)이라고 한다. 그리고 위에서 설명한 것 처럼 MR을 위해서 데이타를 처리하는 부분을 Map & Reduce 모듈이라고 한다.

사용 시나리오

Hadoop Map & Reduce는 대용량 데이타에 대한 분석에 대해서 최적화 되어 있다.

즉 웹 시스템이나 OLTP (Online Transaction Processing)과 같은 1~5초내에 응답이 오는 형태의 Request/Response 방식의 Synchronous 형식의 시나리오에는 적합하지 않고, Input을 넣은 다음 수분 후에 결과를 받아서 보는 비동기 (Asynchronous) Deferred 형태의 시스템에 적절하다.

 수학적 계산을 위한 연산 처리, 대규모 저장 및 분석, BI (Business Intelligence)와 같은 데이타 분석과 같은 후처리(지연처리) 중심의 분석 시나리오에 적합하다.

Hadoop 구성

앞에서 설명한 것과 같이 Hadoop은 크게 분산 데이타 처리를 하기 위한 Map & Reduce 모듈(이하 MR) MR Input/Output 데이타를 저장하는 파일시스템인 HDFS (Hadoop Distributed File System)으로 구성되어 있다.

HDFS (Hadoop Distributed File System)

HDFS는 분산 데이타 처리를 위해서 데이타를 저장하기 위한 파일 시스템으로, 다음과 같은 특징을 가지고 있다.

  • 대용량의 파일 (페타 바이트)을 저장할 수 있다.

  • 많은 수의 파일을 저장할 수 있다.

  •  Streaming Data Access

  •  Commodity Hardware

  • Multiple writers, arbitrary file modifications

HDFS의 구성 컴포넌트는 크게 두가지로 나뉘어 진다. (Namenodes, Datanodes).

Namenodes

Namenodes는 일종의 Master node로 파일에 대한 메타 데이타를 저장하는 노드로, 디렉토리 구조, 파일에 대한 각종 메타 데이타, 그리고 물리적 파일이 저장되어 있는 위치등을 저장한다.

Datanodes

Datanodes는 실제 파일을 저장하고 읽어온다. 하나의 파일을 블록이라는 단위로 나눠서 저장하는 역할을 수행한다. 그리고 Namenodes와 주기적으로 통신하여 저장하고 있는 블록에 대한 정보를 Namenodes에 저장하도록 한다.

Namenodes에는 모든 블록에 대한 메타정보가 들어와 있기 때문에, Namenodes가 장애가 나면 전체 HDFS 이 장애가 나는 SFP ( Single Failure Point )가 된다. Namenodes에 대한 이중화가 필요하다. Namenodes에 대한 이중화 방안에 대해서는 추후에 설명하도록 한다. Namenodes SFP로 작용하는 것은 Hadoop 운영상에 아주 중요한 운영 포인트로 존재한다. 근래에는 HDFS의 약점을 보완하기 위해서 GlusterFS Hadoop을 지원하면서, 파일 시스템으로 HDFS를 사용하는 대신 GlusterFS로 대처할 수 있다.

블럭 (Block)

블럭은 HDFS에서 Read Write를 하는 최소 단위이다. 하나의 파일을 여러개의 블럭으로 나눠서 저장된다. Hadoop의 블럭 사이즈는 일반적인 파일 시스템의 블럭사이즈 ( Kilobytes – 일반적으로 파일시스템에서는 512 KB )에 비해서 큰 블럭사이즈를 사용한다. Hadoop에서는 디폴트로 64MB를 사용하고, 보통 128MB를 사용한다.

클 블럭사이즈를 사용하는 이유는, MR 처리에 있어서 Map Task가 하나의 파일 사이즈 단위로 처리하기 때문에, 작은 파일 억세스 보다는 Map Task 단위로 처리할 수 있는 단위가 필요하다. (이것이 블럭) 이를 위해서 큰 사이즈 단위로 파일 처리를 할 수 있는 블럭을 지정하는 것이다.

또한 블럭 크기를 크게 함으로써, 해당 파일에 대한 Seeking Time을 줄일 수 있고, 블럭 사이즈를 적게 하면 Master Node에서 저장 및 처리해야 하는 데이타의 양이 많아지기 때문에 Master Node에 많은 부하가 걸리기 때문에 큰 블럭 사이즈를 사용한다. 이런 이유로 반대로 블럭 사이즈가 작거나 사이즈가 작은 데이타 처리의 경우 Hadoop에서는 충분한 분산 처리 성능을 기대하기 어렵다.

HDFS는 대규모 분산 처리에 필요한 대용량 input 데이타를 저장하고 output 데이타를 저장할 대용량 파일 시스템을 지원한다. HDFS 시절 이전에는 고가의 SAN (Storage Area Network)장비나 NFS 장비를 사용했어야 했는데, Hadoop은 일반 x86 서버에 Disk를 붙인 형태의 저가의 서버 여러개를 연결하여 대규모 분산 파일 시스템을 구축할 수 있게 해줌으로써, 값비싼 파일 시스템 장비 없이 분산 처리를 가능하게 해주는 것이다

Read/Write Operation

1) Read Operation

Client에서 Read를 수행하면, 먼저 Client SDK Namenode에 해당 파일의 데이타 블럭들이 어디에 있는지 블록의 위치를 먼저 물어온 다음에, 순차적으로 해당 데이타 블록이 저장되어 있는 datanodes로 부터 데이타를 블록을 읽어온다.

 

<그림. HDFS Read Operation >

이 과정에서 Namenode라는 놈이 아주 기특한 일을 하는데, 기본적으로 HDFS는 하나의 파일 블록을 저장할때 하나의 datanode에 저장하는 것이 아니라 N개의 datanode에 복제해서 저장한다. (장애 대응을 위하여). Namenode가 블록의 위치를 리턴할때, Hadoop Client로 부터 가까운 곳에 있는 datanode (같은 서버, 같은 Rack, 같은 데이타 센터 순서로..)를 우선으로 리턴하여 효율적인 Read Operation을 할 수 있도록 한다.

2) Write Operation

 

<그림. HDFS Write Operation>

파일 블록 저장은 약간 더 복잡한 과정을 거치는데

파일을 Write를 요청하면,

     먼저 Namenode에서 File Write에 대한 권한 체크등을 수행한다.

     Namenode에서 파일이 Writing Block의 위치한 Datanode를 리턴한다. (1)
파일을 쓰다가 해당 블록이 다 차면, Namenode에 다음 Block이 저장되는 Datanode의 위치를 물어본다.

     Client Stream을 통하여 해당 Datanode Block에 파일을 Write한다. (2)

     Write된 파일을 복제 Datanodes (※ 여기서는 복제노드와 원본 노드를 포함하여 총 3개의 노드가 있다고 가정하자)로 복제(복사)한다. (3,4)

     복제되는 Datanodes들 쌍을 pipeline이라고 하는데, 내부 정책에 따라서 서로 복제할 Datanodes들을 미리 정해놓고 이를 통해서 복제한다. (pipeline Datanode의 물리적 위치 – Rack, 데이타 센터 등을 고려해서 자동으로 결정된다.)

     복제가 모두 끝나면 ACK를 보낸다. (5,6,7)

     파일 Writing을 완료한다.

 

이번 글에서는 간단하게 Hadoop에 대한 개념 소개와 Hadoop의 하부 파일 시스템인 HDFS에 대해서 알아보았다. 다음 글에서는 Hadoop의 Map & Reduce Framework에 대해서 살펴보기로 한다.