빅데이타 & 머신러닝/스트리밍 데이타 처리

Apache Storm을 이용한 실시간 데이타 처리 #5 –Storm의 병렬/분산 처리

Terry Cho 2015. 1. 25. 15:08

대충보는 Storm #5-Apache Storm 병렬 분산 처리 이해하기

 조대협 (http://bcho.tistory.com)

 

Storm에 있는 Spout Bolt들은 여러개의 머신에서 어떻게 나눠서 처리될까? Storm 클러스터는 여러대의 분산된 서버에서 운용되기 때문에, 당연히 Spout Bolt도 나눠서 처리된다 그렇다면 이런 Storm의 병렬 처리 구조는 어떻게 되는 것일까?

이 글에서는 Spout Bolt를 병렬로 처리하는 Storm의 구조에 대해서 알아보도록 한다.

Storm의 병렬 처리를 이해하기 위한 개념

Storm의 병렬 처리를 이해하기 위해서는 몇가지 개념을 정리해야 한다. Node,Worker,Exectutor,Task 이 네 가지 개념을 이해해야 한다.


Node

Node는 물리적인 서버이다. Nimbus Supervisor 프로세스가 기동되는 물리적인 서버이다.

Nimbus는 전체 노드에 하나의 프로세스만 기동하며, Supervisor는 일반적으로 하나의 노드에 하나만 기동한다. 여러대를 기동시킬 수 도 있지만, Supervisor의 역할 자체가 해당 노드를 관리하는 역할이기 때문에 하나의 노드에 여러개의 Supervisor를 기동할 필요는 없다.


Worker

Worker Supervisor가 기동되어 있는 노드에서 기동되는 자바 프로세스로 실제로 Spout Bolt를 실행하는 역할을 한다.


Executor

Executor Worker내에서 수행되는 하나의 자바 쓰레드를 지칭한다.


Task

Task Bolt Spout의 객체를 지칭한다. Task Executor (쓰레드)에 의해서 수행된다.

이 개념을 다시 정리해보면 다음과 같은 그림이 된다.



<그림. Node,Worker,Executor,Task 의 개념>

각 슬레이브 노드에는 Supervisor 프로세스가 하나씩 떠있고, conf/storm.yaml에 정의된 설정에 따라서 worker 프로세스를 띄운다.supervisor.slots.ports에 각 Worker가 사용할 TCP 포트를 정해주면 된다. 아래는 5개의 Worker 프로세스를 사용하도록 한 설정이다.



<그림. Storm 설정에서 Supervisor 5개 띄우도록한 설정>

 

그리고 난후에, Topology를 생성할때, Topology에 상세 Worker,Executor,Task의 수를 정의한다. 앞에서 예제로 사용했던 HelloTopology 클래스 코드를 다시 살펴보자. 아래 코드는 Worker,Executor,Task등을 설정한 예이다.

package com.terry.storm.hellostorm;

 

import backtype.storm.Config;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.topology.TopologyBuilder;

 

public class HelloTopology {

        public static void main(String args[]){

               TopologyBuilder builder = new TopologyBuilder();

               builder.setSpout("HelloSpout", new HelloSpout(),2);

               builder.setBolt("HelloBolt", new HelloBolt(),2)

                       .setNumTasks(4)

                       .shuffleGrouping("HelloSpout");

              

              

               Config conf = new Config();

               conf.setNumWorkers(5);

               // Submit topology to cluster

               try{

                       StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

               }catch(AlreadyAliveException ae){

                       System.out.println(ae);

               }catch(InvalidTopologyException ie){

                       System.out.println(ie);

               }

              

        }

 

}

<코드. Worker,Executor,Task 수를 설정한 HelloTopology 예제>

     Topology가 사용할 Worker 프로세스의 수 설정
Config
에서 setNumWorkers(5)를 이용해서 이 토폴로지에서 사용한 Worker 프로세스 수를 5개로 지정했다.

     Spout Executor(쓰레드 수) 설정
다음으로 setSpout에서 3번째 인자로 “2”라는 숫자를 넘겼는데, setSpout에 마지막 인자는 Executor의 수이다. 이를 Parallelism 힌트라고 하는데, Spout 컴포넌트가 수행될 쓰레드의 수이다. 여기서는 Spout Task (객체의 수)를 정의하지 않았는데, 정의하지 않은 경우 디폴트로 Executor의 수와 같이 설정된다.

     Bolt Executor(쓰레드 수)Task(객체)수 설정
Bolt
도 마찬가지로 setBolt 3번째 마지막 인자가 Parallelism 힌트인데, 역시 2개로 지정하였다. 여기서는 Task수를 별도로 지정하였는데, setTaskNum(4)을 이용해서 지정한다. 이렇게 설정하면 HelloBolt 객체는 총 4개가 생기고 2개의 Thread에서 번갈아 가면서 실행하게 된다.

자아 그러면 실제로 설정하는데로 동작하는 지 몇가지 확인을 해보자. 자바의 jps 명령을 이용하면 현재 동작중인 자바 프로세스 수를 볼 수 있다.



<그림 Worker 프로세스 수의 확인>

위의 테스트는 하나의 환경에서 nimbus,zookeeper,supervisor,worker를 모두 띄워놓은 형태인데,worker가 설정대로 5개의 프로세스가 떠있고, nimbus,supervisor가 떠 있는 것이 확인되고, QuorumPeerMainzookeeper 프로세스이다.

실제로 Executor가 지정한데로 Thread가 뜨는지 확인을 해보자. 여러개의 Worker 프로세스에 나눠서 뜨면 모니터링하기가 복잡하니 편의상 conf.setNumer(1)로 해서, 하나의 Worker 프로세스에서 모든 Executor가 뜨도록 Topology를 변경한후, Worker 프로세스의 쓰레드를 모니터링 하니 다음과 같은 결과를 얻었다.

코드상에서 HelloSpout에 대한 Parallelism 힌트를 2로 설정하고, HelloBolt에 대한 Parallelism 힌트도 2로 설정하였다.



<그림. Worker 프로세스의 쓰레드 덤프>

실제로 Worker 프로세스내의 쓰레드를 보면 HelloSpout용 쓰레드가 2, HelloBolt용 쓰레드가 2개가 기동됨을 확인할 수 있다.


리밸런싱

Storm 운영중에 노드를 추가 삭제 하거나 또는 성능 튜닝을 위해서 운영중인 환경에 Worker, Executor의 수를 재 조정이 가능하다. 이를 rebalance라고 하는데, 다음과 같은 명령어를 이용해서 가능하다.

% bin/storm rebalance [TopologyName] -n [NumberOfWorkers] -e [Spout]=[NumberOfExecutos] -e [Bolt1]=[NumberOfExecutos] [Bolt2]=[NumberOfExecutos]

미들웨어 엔지니어로써 본 Storm 튜닝

본인의 경우 경력이 톰캣이나 오라클社의 웹로직에 대해 장애진단과 성능 튜닝을 한 경력을 가지고 있어서 JVM이나 미들웨어 튜닝에 많은 관심을 가지고 있는데, 이 미들웨어 튜닝이라는 것이 대부분 JVM과 쓰레드 수등의 튜닝에 맞춰 있다보니, Storm의 병렬성 부분을 공부하다 보니, Executor Worker,Task의 수에 따라서 성능이 많이 차이가 나겠다는 생각이 든다.

특히나 하나의 토폴리지만 기동하는 것이 아니라, 여러개의 토폴로지를 하나의 클러스터에서 구동 할 경우 더 많은 변수가 작용할 수 있는데, 쓰레드란 것의 특성이 동시에 하나의 코어를 차지하고 돌기 때문에, 쓰레드수가 많다고 시스템의 성능이 좋아지지 않으며 반대로 적으면 성능을 극대화할 수 없기 때문에, 이 쓰레드의 수와 이 쓰레드에서 돌아가는 객체(Task)의 수에 따라서 성능 차이가 많이 날것으로 생각된다. 아마도 주요 튜닝 포인트가 되지 않을까 싶은데, 예전에는 보통 JVM당 적정 쓰레드 수는 50~100개 정도로 책정했는데 (톰캣과 같은 WAS 미들웨어 기준). 요즘은 코어수도 많아져서 조금 더 많은 쓰레드를 책정해도 되지 않을까 싶다. 쓰레드 수 뿐 아니라, 프로세스수도 영향을 미치는데, JVM 프로세스의 컨텐스트 스위칭은 쓰레드의 컨텐스트 스위칭보다 길기 때문에, 프로세스를 적게 띄우는 것이 좋을것으로 예상 되지만, JVM 프로세스는 메모리 GC에 의한 pausing 시간이 발생하기 때문에 이 GC 시간을 적절하게 나눠주기 위해서 적절 수 의 프로세스를 찾는 것도 숙제가 아닐까 싶다. 디폴트 worker의 옵션을 보니 768M의 힙 메모리를 가지고 기동하게 되어 있는데, 메모리를 많이 사용하는 연산는 다소 부족하지 않을까 하는 느낌이 든다.

Bolt가 데이타 베이스, 파일 또는 네트워크를 통해서 데이타를 주고 받는 연산을 얼마나 하느냐에 따라서도 CPU 사용률이 차이가 날것이기 때문에 (IO작업중에는 쓰레드가 idle 상태로 빠지고 CPU가 노는 상태가 되기 때문에) IO 작업이 많은 경우에는 쓰레드의 수를 늘리는 것이 어떨까 한다.

Bolt Spout와 같은 통신은 내부적으로 ZeroMQ를 사용하는 것으로 알고 있는데, 아직 내부 구조는 제대로 살펴보지는 않았지만, 같은 프로세스내에서는 네트워크 호출 없이 call-by-reference를 이용해서 통신 효율을 높이고, 통신이 잦은 컴포넌트는 같은 프로세스에 배치 하는 affinity와 같은 속성(?)이 있지 않을까 예측을 해본다.

결과적으로 튜닝 포인트는, Worker,Executor,Task 수의 적절한 산정과, 만약에 옵션이 있다면 리모트 호출을 줄이기 위한 Bolt Spout 컴포넌트의 배치 전략에 있지 않을까 한다.


다음 글에서는 이런 병렬 처리를 기반으로 각 컴포넌트간에 메세지를 보낼때, 여러 Task간에 어떻게 메세지를 라우팅을 하는지에 대한 정리한 그룹핑(Grouping)에 대한 개념에 대해서 알아보도록한다.