빅데이타 & 머신러닝/스트리밍 데이타 처리

Apache Storm을 이용한 실시간 데이타 처리 #2-Storm 설치와 HelloStorm 작성하기

Terry Cho 2015. 1. 12. 17:50

대충보는 Storm #2-Storm 설치와 HelloStorm 작성하기

조대협(http://bcho.tistory.com)


Apache Storm Spark

앞서 데이타 스트리밍 처리에 대해서 설명했다. 스트리밍 처리에 대표적인 오픈소스 프레임웍으로는 Apache Storm Apache Spark이 있는데ㅔ, Spark은 최근에 나온 것으로 스트리밍 처리뿐 만 아니라 조금 더 보편적인 분산 컴퓨팅을 지원하는데, Storm의 경우 나온지도 오래되었고 무엇보다 안정성 부분에서 아직까지는 Spark보다 우위에 있기 때문에, Storm을 중심으로 설명하고자 한다

HelloStorm

Storm의 내부 구조 개념등을 설명하기에 앞서, 일단 깔아서 코드부터 돌려보고 개념을 잡아보자


HelloStorm 구조

HelloWorld 처럼 간단한 HelloStorm을 만들어보자. 만들어보려고 하는 Storm 프로그램은 다음과 같다.



<그림. HelloStorm 개념 구조>


HelloSpout 이라는 클래스는, Storm에 데이타를 읽어오는 클래스로 이 예제에서는 자체적으로 데이타를 생성해낸다. Storm으로 들어오는 데이타는 Tuple이라는 형식을 따르는데, Key/Value 형식의 데이타 형을 따른다. 여기서는 키(필드명)“say”, 데이타는 “Hello” 라는 문자열을 가지고 있는 데이타 tuple을 생성한다.

HelloSpout에서 생성된 데이타는 HelloBolt라는 곳으로 전달이 되는데, HelloBolt 클래스는 데이타를 받아서 처리하는 부분으로 간단하게 들어온 데이타에서 “say” 라는 필드의 데이타 값을 System.out으로 출력해주는 역할만을 한다.


개발하기

이클립스를 사용하여, maven project를 생성한다.



다음으로 pom.xml을 작성한다.


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

 

  <groupId>com.terry.storm</groupId>

  <artifactId>hellostorm</artifactId>

  <version>0.0.1-SNAPSHOT</version>

  <packaging>jar</packaging>

 

  <name>hellostorm</name>

  <url>http://maven.apache.org</url>

 

  <properties>

    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

  </properties>

 

<dependencies>

  <dependency>

    <groupId>junit</groupId>

    <artifactId>junit</artifactId>

    <version>3.8.1</version>

    <scope>test</scope>

  </dependency>

  <dependency>

        <groupId>org.apache.storm</groupId>

        <artifactId>storm-core</artifactId>

        <version>0.9.3</version>

  </dependency>

 

</dependencies>

 

<build>

  <plugins>

    <plugin>

      <artifactId>maven-assembly-plugin</artifactId>

      <version>2.2.1</version>

      <configuration>

        <descriptorRefs>

          <descriptorRef>jar-with-dependencies</descriptorRef>

        </descriptorRefs>

        <archive>

          <manifest>

            <mainClass />

          </manifest>

        </archive>

      </configuration>

      <executions>

        <execution>

          <id>make-assembly</id>

          <phase>package</phase>

          <goals>

            <goal>single</goal>

          </goals>

        </execution>

      </executions>

    </plugin>

  </plugins>

</build>

 

</project>

<그림. pom.xml>


이 예제에서는 storm 0.9.3을 사용했기 때문에 위와 같이 storm-core 0.9.3 dependency 부분에 정의하였다.

다음으로 데이타를 생성하는 HelloSpout을 구현하자


package com.terry.storm.hellostorm;

 

import java.util.Map;

 

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichSpout;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

 

public class HelloSpout extends BaseRichSpout {

          private static final long serialVersionUID = 1L;

          private SpoutOutputCollector collector;

         

          public void open(Map conf,TopologyContext context,SpoutOutputCollector collector){

               this.collector = collector; 

          }

         

          public void nextTuple(){

                 this.collector.emit(new Values("hello world"));

          }

         

          public void declareOutputFields(OutputFieldsDeclarer declarer){

                 declarer.declare(new Fields("say"));

          }

         

}

<그림. HelloSpout.java>


HelloSpout 실행이 되면, 필드 “say” 값이 “hello world” 데이타를 생성해서 다음 워크 플로우로 보낸다.

nextTuple() 이라는 함수에서 외부에서 데이타를 받아들여서 다음 워크 플로우로 보내는 일을 하는데, 여기서는 외부에서 데이타를 받아들이지 않고 자체적으로 데이타를 생성하도록 한다. 데이타를 뒤에 워크플로우에 보내는 함수는 emit인데, emmit부분에 “hello world”라는 value 넣어서 보내도록 하였다. 그렇다면 필드의 값은 어떻게 정의 하느냐? 필드값은 declareOutputField라는 함수에 정의하는데, 데이타의 필드는 “say” 정의하였다.


다음으로 이 HelloSpout에서 생성할 데이타를 처리한 HelloBolt를 구현해보자


package com.terry.storm.hellostorm;

 

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Tuple;

 

public class HelloBolt extends BaseBasicBolt{

 

        public void execute(Tuple tuple, BasicOutputCollector collector) {

               // TODO Auto-generated method stub

               String value = tuple.getStringByField("say");

               System.out.println("Tuple value is"+value);

        }

 

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

               // TODO Auto-generated method stub

              

        }

 

}

<그림. HelloBolt.java>


HelloSpout에서 생성된 데이타는 HelloBolt 들어오는데, 데이타가 들어오면 execute라는 메서드가 자동으로 수행된다. 이때, Tuple 통해서 데이타가 전달된다. 여기서는 tuple에서 필드이름이 “say” 값을 tuple.getStringByField(“say”) 이용해서 꺼내서 System.out으로 출력했다.

눈치가 빠른 사람이라면 벌써 알아차렸겠지만, 데이타를 다음 플로우로 보내고자 할때는 앞의 HelloSpout에서 한것처럼, execute 메서드내에서 데이타 처리가 끝난후에, collector.emit 이용해서 다음 플로우로 보내고, delcareOutputField에서 데이타에 대한 필드를 정의하면 된다.

데이타를 생성하는 Spout 데이타를 처리 하는 Bolt 구현했으면 둘을 연결 시켜줘야 한다. 이를 연결시켜주는 것이 Topology인데, HelloTopologyLocal 클래스를 구현해 보자


package com.terry.storm.hellostorm;

 

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.utils.Utils;

 

public class HelloTopologyLocal {

        public static void main(String args[]){

               TopologyBuilder builder = new TopologyBuilder();

               builder.setSpout("HelloSpout", new HelloSpout(),2);

               builder.setBolt("HelloBolt", new HelloBolt(),4).shuffleGrouping("HelloSpout");

              

               Config conf = new Config();

               conf.setDebug(true);

               LocalCluster cluster = new LocalCluster();

              

               cluster.submitTopology("HelloTopologyLocal", conf,builder.createTopology());

               Utils.sleep(10000);

               // kill the LearningStormTopology

               cluster.killTopology("HelloTopologyLocal");

               // shutdown the storm test cluster

               cluster.shutdown();          

        }

 

}

<그림. HelloTolologyLocal.java>


나중에 개념에서 자세하 설명하겠지만, Topology 데이타를 생성하는 Spout 처리하는 Bolt간에 토폴로지 데이타 흐름을 정의하는 부분이다. Spout Bolt들을 묶어 주는 부분이다.

먼저 TopologyBuilder 이용해서 Topology 생성하고, setSpout 이용해서 앞에서 구현한 HelloSpout 연결한다.

다음으로, setBolt 이용해서 Bolt Topology 연결한다. 후에, HelloSpout HelloBolt 연결해야 하는데, setBolt시에, SuffleGrouping 메서드를 이용하여, HelloBolt HelloSpout으로 부터 생성되는 데이타를 읽어들임을 명시한다.

builder.setBolt("HelloBolt", new HelloBolt(),4).shuffleGrouping("HelloSpout");

이렇게 Topology 구성되었으면이 Topology 실제로 실행해야 하는데, Topology 어떤 서버에서 어떤 포트등을 이용해서 실행될지는 Config 정의할 있지만, 여기서는 간단한 테스트이기  때문에 별도의 복잡한 Config 정보는 기술하지 않았다.

다음으로 이렇게 만들어진 Topology Storm 클러스터에 배포해야 하는데, Storm 개발의 편의를 위해서 두가지 형태의 클러스터를 제공한다. 개발용 클러스터와 실운영 환경용 클러스터를 제공하는데, 여기서는 LocalCluster cluster = new LocalCluster();

라는  것을 사용하였다.

LocalCluster 개발환경용 클러스터로, 개발자의 환경에서 최소한의 서버들만을 기동하여 개발한 토폴로지를 테스트할 있게 해준다. 이렇게 Cluster 생성했으면 cluster.submitTopology 이용하여 개발한 토폴로지를 배포한다. 토폴로지가 배포되면 자동으로 토폴로지가 실행이 된다. HelloSpout 계속해서 데이타를 생성하고, HelloBolt 생성된 데이타를 받아서 System.out.println으로 출력하게 되는데, 10초후에 멈추게 하기 위해서, Sleep 10초를 준다. 토폴로지 코드를 실행하는 쓰레드는 Sleep으로 빠질지 모르지만 토폴로지에서 생성된 HelloSpout HelloBolt 쓰레드는 백그라운드에서 작업을 계속 진행한다.

10초후에는 killTopology 이용해서 해당 토폴로지를 제거하고 shutdown 이용해서 Storm 클러스터를 종료시킨다.

실행하기

여기까지 구현했으면 첫번째 Storm 프로그램을 기동해보자. 다음과 같이 maven 명령어를 이용하면 실행이 가능하다.

C:\dev\ws\java_workspace\com.terry.storm>mvn exec:java -Dexec.mainClass=com.terry.storm.hellostorm.HelloTopologyLocal -Dexec.classpath.Scope=compile

실행을 해보면, HelloSpout 데이타를 생성하고, HelloBolt 이를 받아서 화면에 출력하는 것을 있다.


6292 [Thread-16-HelloSpout] INFO  backtype.storm.daemon.task - Emitting: HelloSpout default [hello world]

6292 [Thread-22-HelloBolt] INFO  backtype.storm.daemon.executor - Processing received message source: HelloSpout:5, stream: default, id: {}, [hello world]

Tuple value ishello world

6292 [Thread-10-HelloBolt] INFO  backtype.storm.daemon.executor - Processing received message source: HelloSpout:6, stream: default, id: {}, [hello world]

Tuple value ishello world

ZooKeeper 에러 대응하기

종종 환경에 따라서 실행이 안되면서 다음과 같은 에러가 출력되는 경우가 있는데


3629 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@7bfd25ce

3649 [main-SendThread(0:0:0:0:0:0:0:1:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (java.lang.SecurityException: 로그인 구성을 찾을 없습니다.)

3650 [main-SendThread(0:0:0:0:0:0:0:1:2000)] ERROR org.apache.storm.zookeeper.ClientCnxnSocketNIO - Unable to open socket to 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2000

3655 [main-SendThread(0:0:0:0:0:0:0:1:2000)] WARN  org.apache.storm.zookeeper.ClientCnxn - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect

java.net.SocketException: Address family not supported by protocol family: connect

        at sun.nio.ch.Net.connect(Native Method) ~[na:1.6.0_37]

        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:532) ~[na:1.6.0_37]

        at org.apache.storm.zookeeper.ClientCnxnSocketNIO.registerAndConnect(ClientCnxnSocketNIO.java:277) ~[storm-core-0.9.3.jar:0.9.3]

        at org.apache.storm.zookeeper.ClientCnxnSocketNIO.connect(ClientCnxnSocketNIO.java:287) ~[storm-core-0.9.3.jar:0.9.3]

        at org.apache.storm.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:967) ~[storm-core-0.9.3.jar:0.9.3]

        at org.apache.storm.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1003) ~[storm-core-0.9.3.jar:0.9.3]


이 에러는 Storm Zookeeper와 연결을 할 수 없어서 나는 에러인데, LocalCluster 모드로 기동할 경우,Storm embedded Zookeeper를 기동해서 이 Zookeeper와 연결되어야 하나. IPV6로 연결을 시도하기 때문에, (ZK IPV4 Listen하는데) 발생하는 문제로

java로 실행할때 "-Djava.net.preferIPv4Stack=true" 옵션을 주면, JVM IPV6를 사용하지 않고, V4를 사용하기 때문에, ZooKeeper IPV4로 뜨고, Storm IPV4로 연결을 시도하기 때문에 문제가 없어진다.

지금까지 간단하게나마 첫번째 Storm 프로그램을 작성해서 실행해보았다.

다음에는 Storm 이루는 컴포넌트 구조와 아키텍쳐에 대해서 설명하도록 한다