빅데이타 & 머신러닝/스트리밍 데이타 처리

Apache Storm을 이용한 실시간 데이타 처리 #4 –소개와 기본 개념

Terry Cho 2015. 1. 25. 11:18

 

대충보는 Storm #4-Apache Storm 특징과 기본 개념

 조대협 (http://bcho.tistory.com)


지금까지 Storm에 대해서 이해하기 위해서, 실시간 스트리밍 서비스의 개념에 대해서 알아보고 간단한 HelloStorm 애플리케이션을 제작해서, 싱글 클러스터 노드에 배포해봤다. 대략 실시간 스트리밍이 무엇이고, Storm을 이용해서 어떻게 개발하는지에 대해서는 어느정도 이해를 했을 것이라고 생각한다.

그러면 지금까지의 경험을 조금 더 체졔적으로 정리해서 Storm에 대해서 이해해보도록 하자. 이번에는 Storm에 대한 개념과 아키텍쳐 구조에 대해서 알아보겠다.


Storm의 특징

Storm을 실시간 스트리밍을 처리하기 위한 서버이자 프레임웍이다. 그렇다면 이 Storm이 다른 스트리밍 처리 솔루션에 비해 가지는 특징은 무엇일까?


확장성

Storm은 클러스터링 기능을 이용해서 수평으로 확장이 가능하다. 그래서 많은 데이타를 다루어야 하는 빅데이타이 데이타 스트림 서비스에서도 가능한데, Storm홈페이지에 포스팅된 자료를 보면, 2x2.4GHz CPU 24GB 메모리 머신을 기반으로 초당 100바이트짜리 메세지를 100만개 정도 처리가 가능하다고 한다. (https://storm.apache.org/about/scalable.html) TPS로 환산하면 100 TPS이다. (Wow!!)


장애 대응성

Storm의 다른 특징 중의 하나는 Fault tolerant 구조를 통한 장애 대응 능력이다. ZooKeeper를 통해서 전체 클러스터의 운영 상태를 감지 하면서, 특정 노드가 장애가 나더라도, 시스템이 전혀 문제 없이 작업을 진행할 수 있으며, 장애가 난 노드에 할당된 작업은 다른 노드에 할당해서 처리하고, 장애 노드에 대해서는 복구 처리를 자동으로 수행해준다.


메세지 전달 보장

Storm은 메세지 처리에 안정성을 제공하는데, 장애가 나건 문제가 있건간에,유실 없이 최소한 한번 메세지가 처리될 수 있게 지원한다. (at least once : 이말은 반대로 이야기 하면, 1번 이상 같은 메세지가 중복 처리될 수 있다는 이야기이다.)

만약에 정확하게 메세지가 한번만 처리가 되기를 원하면 Trident (https://storm.apache.org/documentation/Trident-tutorial.html) 를 통해서 Storm을 확장하면, 정확히 하나의 메세지가 한번만 처리되도록 할 수 있다.


쉬운 배포

Storm은 메뉴얼에 따르면(?) 배포와 설정이 매우 쉽다. 실제로 클러스터를 구성해보면 분산 시스템인데 비해서 별로 어려움 없이 배포와 설정이 가능하다. 그리고 메뉴얼에 따르면(?) 배포 후에, 크게 많은 관리 없이 운영이 가능하다고는 하는데, 이것은 실제로 해보지 않았기 때문에 패스

일단, 오픈소스인데도 설치 후 웹 기반의 모니터링 콘솔을 제공하기 때문에 시스템의 상태를 쉽게 모니터링 하고 운영하는데 도움을 준다.


여러 프로그래밍 언어 지원

Storm은 기본적으로 JVM (Java Virtual Machine)위에서 동작하기는 하지만, Twitter Thrift 프로토콜을 기반으로 하기 때문에, 다양한 언어로 구현이 가능하다. Java 뿐 아니라, JVM을 사용하지 않는 경우에 대해서는 stdin/stdout을 통해서 데이타를 주고 받음으로써, Ruby,Python,Javascript,Perl 등 다양한 언어를 사용할 수 있다.


다양한 시스템 연계

Storm은 다양한 다른 솔루션과 통합이 가능하다. 데이타를 수집하는 부분에서는 Kestrel (http://robey.github.io/kestrel/), RabbitMQ (http://www.rabbitmq.com/) , Kafka (http://kafka.apache.org/), JMS 프로토콜, mazon Kinesis (http://aws.amazon.com/kinesis/)

등이 연동이 가능하며, 다양한 데이타 베이스 (RDBMS, Cassandra, MongoDB )에도 쉽게 연계가가능하다. CEP(Complex Event Processing을 지원하는) 이벤트 처리 분야에서는  Drools (http://www.drools.org/),  Esper (http://esper.codehaus.org/등이 연계 가능하고. 그외에도 Elastic Search (http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/master/storm.html

) , node.js (https://github.com/paralect/storm-nodejs-starterkit)등 다양한 솔루션과 연동을 통해서 시스템을 확장해 나갈 수 있다.


오픈소스

마지막으로 Storm은 오픈소스이다. 상업적 활용이 가능한 Apache License 2.0을 따르고 있다.

Apache License 2.0 http://ko.wikipedia.org/wiki/%EC%95%84%ED%8C%8C%EC%B9%98_%EB%9D%BC%EC%9D%B4%EC%84%A0%EC%8A%A4

Apache License 2.0 한국 번역본 http://yesarang.tistory.com/272

Storm은 유사한 특징을 가지고 있는 Apache Spark에 비해서, 개발이 된지 오래되어서 안정성이 높고 특유의 구조상 장애 대처 능력과 메세지 전달 보장 능력등이 좋다. 반대로 Spark은 최근에 만들어진 만큼 머신 러닝등 더 많은 기능을 가지고 있다.

Storm의 기본 개념

자아 그러면 이제 Storm의 개념을 다시 정립해보자. Storm의 개념을 이해하려면 필수적으로 먼저 이해해야 하는것이 Spout Bolt의 개념이다.

Spout Bolt

Spout Storm 클러스터로 데이타를 읽어들이는 데이타 소스이다. 외부의 로그 파일이나, 트위터 타임 피드와 같인 데이타 스트림, 큐등에서 데이타를 읽어드린다. 이렇게 읽어 드린 데이타를 다른 Bolt로 전달한다. Spout에는 크게 4가지 중요한 메서드가 있다.

Ÿ   open() : 이 메서드는 Spout이 처음 초기화 될때 한번만 호출되는 메서드로, 데이타 소스로 부터의 연결을 초기화 하는 등의 역할을 한다.

Ÿ   nextTuple() : 이 메서드는 데이타 스트림 하나를 읽고 나서, 다음 데이타 스트림을 읽을 때 호출 되는 메서드 이다.

Ÿ   ack(Object msgId) : 이 메서드는 데이타 스트림이 성공적으로 처리되었을때 호출되는데, 이 메서드에서는 성공 처리된 메세지를 지우는 등, 성공 처리에 대한 후처리를 구현한다.

Ÿ   fail(Object msgId) : 이 메서드는 해당 데이타 스트림이 Storm 토폴로지를 수행하던중에, 에러가 나거나 타임아웃등이 걸렸을때 호출되는데,이때에는 사용자가 에러에 대한 에처 처리 로직을 명시해야 한다. 흔히 재처리 로직을 구현하거나 또는 에러 로깅등의 처리를 하게 된다.

Bolt는 이렇게 읽어 드린 데이타를 처리하는 함수이다. 입력 값으로 데이타 스트림을 받고, 그 데이타를 내부의 비지니스 로직에 따라서 가공한 다음에 데이타 스트림으로 다른 Bolt로 넘겨주거나 종료 한다. Bolt에서 정의되는 주요한 메서드는 다음과 같다.

Ÿ   prepare (Map stormConf, TopologyContext context, OutputCollector collector): 이 메서드는 Bolt 객체가 생성될때 한번 호출 된다. 각종 설정 정보나 컨텍스트등 초기 설정에 필요한 부분을 세팅하게 된다.

Ÿ   execute(Tuple input): 가장 필수적인 메서드로, Bolt에 들어온 메세지를 처리하는 로직을 갖는다. 종단 Bolt가 아닌 경우에는 다음 Bolt로 메세지를 전달하기도 한다.

Storm 클러스터내에는 여러개의 Spout Bolt가 존재하게 된다.



<그림. Storm Spout Bolt의 개념>

Topology

이렇게 여러 개의 Spout Bolt간의 연관 관계를 정의해서 데이타 흐름을 정의하는 것을 토폴로지(Topology)라고 한다. 아래 그림과 같이 데이타가 어디로 들어와서 어디로 나가는지를 정의하는 것인데, 아래 그림은 두 개의 Spout에 대해서 각각의 토폴로지를 정의하였다.



<그림. Storm Topology>

Spout Bolt간의 연결 토폴로지는 TopologyBuilder라는 클래스를 통해서 정의한다. 그러면 간략하게, Spout Bolt, Bolt간의 데이타 흐름 관계를 어떻게 정의하는지 살펴보도록 하자.

다음과 같은 토폴로지 흐름을 정의한다고 가정하자



<그림. 간단한 토폴로지 정의 예제>


HelloSpout은 앞서 예제에서 만든것과 같은 Spout이고,

EchoBoltA 는 각각 들어온 메세지에 “Hello I am BoltA :”+메세지를 붙여서 화면에 출력한 후 전송하고 EechoBoltB는 들어온 메세지에 “Hello I am BoltB :”+메세지를 붙여서 전송한다.


package com.terry.storm.hellostorm;

 

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

 

public class EchoBoltA extends BaseBasicBolt{

 

        public void execute(Tuple tuple, BasicOutputCollector collector) {

               // TODO Auto-generated method stub

               String value = tuple.getStringByField("say");

               System.out.println("Hello I am Bolt A: "+value);

               collector.emit(new Values("Hello I am Bolt A :"+value));

        }

 

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

               // TODO Auto-generated method stub

                 declarer.declare(new Fields("say"));

        }

 

}

<그림 EchoBoltA 클래스>


※ 참고 EchoBoltB 클래스도 클래스명만 다르고 내부 구현 내용은 동일하다

토폴로지를 정의할때, HelloSpout hs라는 ID로 생성을 할것이고, EchoBoltA eba라는 ID, EchoBoltB ebb라는 이름으로 생성을 할것이다.

토폴로지 생성 코드를 보자. 아래 노랑색으로 표시된 부분이 실제 토폴로지는 구성하는 부분이다.


package com.terry.storm.hellostorm;

 

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.utils.Utils;

 

import com.terry.storm.hellostorm.EchoBoltB;

import com.terry.storm.hellostorm.EchoBoltA;

 

public class ToplogySequence {

        public static void main(String args[]){

               TopologyBuilder builder = new TopologyBuilder();

               builder.setSpout("hs", new HelloSpout(),1);

               builder.setBolt("eba", new EchoBoltA(),1).shuffleGrouping("hs");

               builder.setBolt("ebb", new EchoBoltB(),1).shuffleGrouping("eba");

              

              

               Config conf = new Config();

               conf.setDebug(true);

               LocalCluster cluster = new LocalCluster();

              

               cluster.submitTopology("ToplogySequence", conf,builder.createTopology());

               Utils.sleep(1000);

               // kill the LearningStormTopology

               cluster.killTopology("ToplogySequence");

               // shutdown the storm test cluster

               cluster.shutdown();          

        }

}

 

<그림. 위의 그림에 있는 토폴로리지를 실제로 구현한 >

 

Spout을 구현한 부분을 보자


builder.setSpout("hs", new HelloSpout(),1);

 

를 통해서 Spout을 생성하는데, setSpout(“{id}”,”{Spout 객체}”,”{Parallelism 힌트}”) 로 이루어진다. 여기서 id“hs”로 정의했고, Spout 객체는 HelloSpout을 지정했다.

    Paralleism 힌트는 나중에 병령 처리와 그룹핑 개념을 설명할때 다시 설명하도록 한다.

다음으로 Bolt를 생성하는데,

builder.setBolt("eba", new EchoBoltA(),1).shuffleGrouping("hs");

builder.setBolt("ebb", new EchoBoltB(),1).shuffleGrouping("eba");

 

로 각각의 Bolt를 생성했다. 이때 주목해야 하는 점이 뒤에 붙어 있는 shufflerGrouping이라는 메서드인데, Spout Bolt간의 연관 관계는 이 Grouping이라는 개념을 이용해서 생성한다. Grouping에는 여러가지 종류와 개념이 있지만 여기서는 간단한 shuuflerGrouping만을 사용했다.첫번째 EchoBoltA에서 자신을 “eba” 라는 id로 생성을 한후에, suffelerGrouping(“hs”)를 선언했는데, 이는 “hs”라는 ID를 가지고 있는 Spout이나 Bolt로 부터 메세지를 받아들이겠다는 이야기이다. 두번째 EchoBoltBsuffelerGrouping(“eba”)를 통해서, id“eba” Spout이나 Bolt, 즉 앞서 생성한 EchBoltA로 부터 메세지를 받아들이겠다는 이야기이다.

자 그러면 이 토폴로지를 실행해 보자.

실행하면 다음과 같은 로그를 얻을 수 있다.

5399 [Thread-12-hs] INFO  backtype.storm.daemon.task - Emitting: hs default [hello world 1]

5400 [Thread-8-eba] INFO  backtype.storm.daemon.executor - Processing received message source: hs:4, stream: default, id: {}, [hello world 1]

Hello I am Bolt A: hello world 1

5401 [Thread-8-eba] INFO  backtype.storm.daemon.task - Emitting: eba default [Hello I am Bolt A :hello world 1]

5409 [Thread-10-ebb] INFO  backtype.storm.daemon.executor - Processing received message source: eba:2, stream: default, id: {}, [Hello I am Bolt A :hello world 1]

Hello I am Bolt B: Hello I am Bolt A :hello world 1

 

     5399 번 라인에서 12번 쓰레드에서 수행되는 “hs” 라는 이름의 Spout, “hello world 1” 이라는 문자열을 emit (제출) 하였다

     5400 번 라인에서 8번 쓰레드에서 수행되는 “eba”라는 Bolt“hs”라는 Spout또는 Bolt에서 “hello world”라는 문자열을 받았다. 그 다음 라인에 “Hello I am Bolt A: hello world 1”가 출력되는 것을 확인할 수 있다.

     5401 라인에서 8번 쓰레드에서 수행되는 “eba” 볼트가 “Hello I am Bolt A :hello world 1” 라는 문자열을 제출하였다.

     5409 라인세서 10번 쓰레드에서 수행되는 “ebb”라는 id의 볼트가 “eba”로 부터 “Hello I am Bolt A :hello world 1” 라는 메세지를 받았다. 다음 행에 EchoBoltB에 의해서 처리되어 “Hello I am Bolt B: Hello I am Bolt A :hello world 1” 문자열이 출력되었음을 확인할 수 있다.

Stream Tuple

다음으로 데이타 Stream Tuple에 대한 개념을 이해해야 한다.

Storm에서 데이타는 Stream이라는 개념으로 정의되는데, Stream이란, Spout Bolt간 또는 Bolt간을 이동하는 데이타들의 집합을 이야기 한다.

각각의 Stream은 하나의 Tuple로 이루어 지는데, Tuple형태로 정의된다.



<그림. Storm Stream Tuple 개념>


앞에 예제에서는 하나의 키만 있는 Tuple을 사용하였다.

앞에서 사용한 HelloSpout 클래스를 다시 한번 살펴보면


public class HelloSpout extends BaseRichSpout {

          private static final long serialVersionUID = 1L;

          private static int count=0;

          private SpoutOutputCollector collector;

         

          public void open(Map conf,TopologyContext context,SpoutOutputCollector collector){

               this.collector = collector; 

          }

         

          public void nextTuple(){

                 if(count++<10) this.collector.emit(new Values("hello world "+count));

          }

         

          public void declareOutputFields(OutputFieldsDeclarer declarer){

                 declarer.declare(new Fields("say"));

          }

         

}

<그림. HelloSpout>

nextTuple 부분에서 newValue로 하나의 값을 보내는 것을 볼 수 있다. 그리고 이 Tuple의 키 구조는 아래 declareOutputFields 메서드에서 “say” 라는 필드이름으로 정의된것을 볼 수 있다.

실제로 HelloSpout에서 생성하는 데이타 스트림은 다음과 같다.



<그림 데이타 Stream Turple 구조>

이번 글에서는 간단하게 Storm의 특징과 기본 개념에 대해서 알아보았다. 다음글에서는 조금더 상세한 Storm의 아키텍쳐의 개념과 병렬 처리 개념에 대해서 알아보도록 하겠다.

 

 

그리드형