블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 

텐서플로우 - 파일에서 학습데이타를 읽어보자#1


조대협 (http://bcho.tistory.com)


텐서플로우를 학습하면서 실제 모델을 만들어보려고 하니 생각보다 데이타 처리에 대한 부분에서 많은 노하우가 필요하다는 것을 알게되었다. MNIST와 같은 예제는 데이타가 다 이쁘게 정리되어서 학습 하기 좋은 형태로 되어 있지만, 실제로 내 모델을 만들고 학습을 하기 위해서는 데이타에 대한 정재와 분류 작업등이 많이 필요하다.


이번글에서는 학습에 필요한 데이타를 파일에서 읽을때 필요한 큐에 대한 개념에 대해서 알아보도록 한다.


피딩 (Feeding) 개념 복습


텐서플로우에서 모델을 학습 시킬때, 학습 데이타를 모델에 적용하는 방법은 일반적으로 피딩 (feeding)이라는 방법을 사용한다. 메모리상의 어떤 변수 리스트 형태로 값을 저장한 후에, 모델을 세션에서 실행할 때, 리스트에서 값을 하나씩 읽어서 모델에 집어 넣는 방식이다.



위의 그림을 보면, y=W*x라는 모델에서 학습 데이타 x는 [1,2,3,4,5]로, 첫번째 학습에는 1, 두번째 학습에는 2를 적용하는 식으로 피딩이 된다.

그런데, 이렇게 피딩을 하려면, 학습 데이타 [1,2,3,4,5]가 메모리에 모두 적재되어야 하는데, 실제로 모델을 만들어서 학습을할때는 데이타의 양이 많기 때문에 메모리에 모두 적재하고 학습을 할 수 가 없고, 파일에서 읽어드리면서 학습을 해야 한다.


텐서플로우 큐에 대해서

이러한 문제를 해결하기 위해서는 파일에서 데이타를 읽어가면서, 읽은 데이타를 순차적으로 모델에 피딩하면 되는데, 이때 큐를 사용한다.


파일에서 데이타를 읽는 방법에 앞서서 큐를 설명하면, 큐에 데이타를 넣는 것(Enqueue) 은 Queue Runner 라는 것이 한다.

이 Queue Runner가 큐에 어떤 데이타를 어떻게 넣을지를 정의 하는 것이 Enqueue_operation인데, 데이타를 읽어서 실제로 어떻게 Queue에 Enqueue 하는지를 정의한다.


이 Queue Runner는 멀티 쓰레드로 작동하는데, Queue Runner 안의 쓰레드들을 관리해주기 위해서 별도로 Coordinator라는 것을 사용한다.


이 개념을 정리해서 도식화 해주면 다음과 같다.


=


Queue Runner 는 여러개의 쓰레드 (T)를 가지고 있고, 이 쓰레드들은 Coordinator들에 의해서 관리된다. Queue Runner 가 Queue에 데이타를 넣을때는 Enqueue_op이라는 operation에 의해 정의된 데로 데이타를 Queue에 집어 넣는다.


위의 개념을 코드로 구현해보자


import tensorflow as tf


QUEUE_LENGTH = 20

q = tf.FIFOQueue(QUEUE_LENGTH,"float")

enq_ops = q.enqueue_many(([1.0,2.0,3.0,4.0],) )

qr = tf.train.QueueRunner(q,[enq_ops,enq_ops,enq_ops])


sess = tf.Session()

# Create a coordinator, launch the queue runner threads.

coord = tf.train.Coordinator()

threads = qr.create_threads(sess, coord=coord, start=True)


for step in xrange(20):

   print(sess.run(q.dequeue()))


coord.request_stop()

coord.join(threads)


sess.close()


Queue 생성

tf.FIFOQUEUE를 이용해서 큐를 생성한다.

q = tf.FIFOQueue(QUEUE_LENGTH,"float")

첫번째 인자는 큐의 길이를 정하고, 두번째는 dtype으로 큐에 들어갈 데이타형을 지정한다.

Queue Runner 생성

다음은 Queue Runner를 만들기 위해서 enqueue_operation 과, QueueRunner를 생성한다.

enq_ops = q.enqueue_many(([1.0,2.0,3.0,4.0],) )

qr = tf.train.QueueRunner(q,[enq_ops,enq_ops,enq_ops])

enqueue operation인 enq_ops는 위와 같이 한번에 [1.0,2.0,3.0,4.0] 을 큐에 넣는 operation으로 지정한다.

그리고 Queue Runner를 정의하는데, 앞에 만든 큐에 데이타를 넣을것이기 때문에 인자로 큐 ‘q’를 넘기고 list 형태로 enq_ops를 3개를 넘긴다. 3개를 넘기는 이유는 Queue Runner가 멀티쓰레드 기반이기 때문에 각 쓰레드에서 Enqueue시 사용할 Operation을 넘기는 것으로, 3개를 넘긴것은 3개의 쓰레드에 Enqueue 함수를 각각 지정한 것이다.

만약 동일한 enqueue operation을 여러개의 쓰레드로 넘길 경우 위 코드처럼 일일이 enqueue operation을 쓸 필요 없이

qr = tf.train.QueueRunner(q,[enq_ops]*NUM_OF_THREAD)

[enq_ops] 에 쓰레드 수 (NUM_OF_THREAD)를 곱해주면 된다.

Coordinator 생성

이제 Queue Runner에서 사용할 쓰레드들을 관리할 Coordinator를 생성하자

coord = tf.train.Coordinator()

Queue Runner용 쓰레드 생성

Queue Runner와 쓰레드를 관리할 Coordinator 가 생성되었으면, Queue Runner에서 사용할 쓰레드들을 생성하자

threads = qr.create_threads(sess, coord=coord, start=True)

생성시에는 세션과, Coordinator를 지정하고, start=True로 해준다.

start=True로 설정하지 않으면, 쓰레드가 생성은 되었지만, 동작을 하지 않기 때문에, 큐에 메세지를 넣지 않는다.

큐 사용

이제 큐에서 데이타를 꺼내와 보자. 아래코드는 큐에서 20번 데이타를 꺼내와서 출력하는 코드이다.

for step in xrange(20):

   print(sess.run(q.dequeue()))


큐가 비워지면, QueueRunner를 이용하여 계속해서 데이타를 채워 넣는다. 즉 큐가 비기전에 계속해서 [1.0,2.0,3.0,4.0] 데이타가 큐에 계속 쌓인다.

쓰레드 정지

큐 사용이 끝났으면 Queue Runner의 쓰레드들을 모두 정지 시켜야 한다.

coord.request_stop()

을 이용하면 모든 쓰레드들을 정지 시킨다.

coord.join(threads)

는 다음 코드를 진행하기전에, Queue Runner의 모든 쓰레드들이 정지될때 까지 기다리는 코드이다.

멀티 쓰레드

Queue Runner가 멀티 쓰레드라고 하는데, 그렇다면 쓰레드들이 어떻게 데이타를 큐에 넣고 enqueue 연산은 어떻게 동작할까?

그래서, 간단한 테스트를 해봤다. 3개의 쓰레드를 만든 후에, 각 쓰레드에 따른 enqueue operation을 다르게 지정해봤다.

import tensorflow as tf


QUEUE_LENGTH = 20

q = tf.FIFOQueue(QUEUE_LENGTH,"float")

enq_ops1 = q.enqueue_many(([1.0,2.0,3.0],) )

enq_ops2 = q.enqueue_many(([4.0,5.0,6.0],) )

enq_ops3 = q.enqueue_many(([6.0,7.0,8.0],) )

qr = tf.train.QueueRunner(q,[enq_ops1,enq_ops2,enq_ops3])


sess = tf.Session()

# Create a coordinator, launch the queue runner threads.

coord = tf.train.Coordinator()

threads = qr.create_threads(sess, coord=coord, start=True)


for step in xrange(20):

   print(sess.run(q.dequeue()))


coord.request_stop()

coord.join(threads)


sess.close()


실행을 했더니, 다음과 같은 결과를 얻었다.


첫번째 실행 결과

1.0

2.0

3.0

4.0

5.0

6.0

6.0

7.0

8.0



두번째 실행결과

1.0

2.0

3.0

1.0

2.0

3.0

4.0

5.0

6.0


결과에서 보는것과 같이 Queue Runner의 3개의 쓰레드중 하나가 무작위로 (순서에 상관없이) 실행되서 데이타가 들어가는 것을 볼 수 있었다.


파일에서 데이타 읽기


자 그러면 이 큐를 이용해서, 파일 목록을 읽고, 파일을 열어서 학습 데이타를 추출해서 학습 파이프라인에 데이타를 넣어주면 된다.

텐서 플로우에서는 파일에서 데이타를 읽는 처리를 위해서 앞에서 설명한 큐 뿐만 아니라 Reader와 Decoder와 같은 부가적인 기능을 제공한다.


  1. 파일 목록을 읽는다.

  2. 읽은 파일목록을 filename queue에 저장한다.

  3. Reader 가 finename queue 에서 파일명을 하나씩 읽어온다.

  4. Decoder에서 해당 파일을 열어서 데이타를 읽어들인다.

  5. 필요하면 읽어드린 데이타를 텐서플로우 모델에 맞게 정재한다. (이미지를 리사이즈 하거나, 칼라 사진을 흑백으로 바꾸거나 하는 등의 작업)

  6. 텐서 플로우에 맞게 정재된 학습 데이타를 학습 데이타 큐인 Example Queue에 저장한다.

  7. 모델에서 Example Queue로 부터 학습 데이타를 읽어서 학습을 한다.


먼저 파일 목록을 읽는 부분은 파일 목록을 읽어서 각 파일명을  큐에 넣은 부분을 살펴보자.

다음 예제코드는 파일명 목록을 받은 후에, filename queue에 파일명을 넣은후에, 파일명을 하나씩 꺼내는 예제이다.

import tensorflow as tf


filename_queue = tf.train.string_input_producer(["1","2","3"],shuffle=False)


with tf.Session() as sess:

   

   coord = tf.train.Coordinator()

   threads = tf.train.start_queue_runners(coord=coord,sess=sess)

   

   for step in xrange(10):

       print(sess.run(filename_queue.dequeue()) )


   coord.request_stop()

   coord.join(threads)


코드를 보면 큐 생성이나, enqueue operation 처리들이 다소 다른것을 볼 수 있는데, 이는 텐서플로우에서는  학습용 파일 목록을 편리하게 처리 하기 위해서 조금 더 추상화된 함수들을 제공하기 때문이다.


filename_queue = tf.train.string_input_producer(["1","2","3"],shuffle=False)


train.xx_input_producer() 함수는 입력 받은 큐를 만드는 역할을 한다.

위의 명령을 수행하면, filename queue 가 FIFO (First In First Out)형태로 생긴다.


큐가 생기기는 하지만, 실제로 큐에 파일명이 들어가지는 않는다. (아직 Queue Runner와 쓰레드들을 생성하지 않았기 때문에)

다음으로 쓰레드를 관리하기 위한 Coordinator 를 생성한다.

   coord = tf.train.Coordinator()

Coordinator 가 생성이 되었으면 Queue Runner와 Queue Runner에서 사용할 Thread들을 생성해주는데,  start_queue_runner 라는 함수로, 이 기능들을 모두 구현해놨다.

   threads = tf.train.start_queue_runners(coord=coord,sess=sess)

이 함수는 Queue Runner와, 쓰레드 생성 및 시작 뿐 만 아니라 Queue Runner 쓰레드가 사용하는 enqueue operation 까지 파일형태에 맞춰서 자동으로 생성 및 지정해준다.






Queue, Queue Runner, Coordinator와 Queue Runner가 사용할 쓰레드들이 생성되고 시작되었기 때문에,Queue Runner는 filename queue에 파일명을 enqueue 하기 시작한다.

파일명 Shuffling

위의 예제를 실행하면 파일명이 다음과 같이 1,2,3 이 순차적으로 반복되서 나오는 것을 볼 수 있다.

실행 결과

1

2

3

1

2

3

1

2

3

1


만약에 파일명을 랜덤하게 섞어서 나오게 하려면 어떻게해야 할까? (매번 학습시 학습데이타가 일정 패턴으로 몰려서 편향되지 않고, 랜덤하게 나와서 학습 효과를 높이고자 할때)

filename_queue = tf.train.string_input_producer(["1","2","3"],shuffle=False)

큐를 만들때, 다음과 같이 셔플 옵션을 True로 주면 된다.

filename_queue = tf.train.string_input_producer(["1","2","3"],shuffle=True)

실행 결과

2

1

3

2

3

1

2

3

1

1

지금까지 파일명을 지정해서 이 파일명들을 filename queue에 넣는 방법에 대해서 알아보았다.

다음은 이 file name queue에서 파일을 순차적으로 꺼내서

  • 파일을 읽어드리고

  • 각 파일을 파싱해서 학습 데이타를 만들고

  • 학습 데이타용 큐 (example queue)에 넣는 방법

에 대해서 설명하도록 한다.




구글 클라우드의 대용량 메세지 큐 Pub/Sub 소개

조대협 (http://bcho.tistory.com)




구글 클라우드의 Pub/Sub 은 클라우드 기반의 대용량 메세지 큐이다.

흔히들 사용하는 RabbitMQ, JMS나 Kafka의 클라우드 버전으로 보면 된다. Rabbit MQ와 같은 설치형 큐가 작은 메세지에 대해서 세심한 컨트롤을 제공한다고 하면, Kafka나 Pub/Sub은 대용량 스케일의 메세지를 처리하기 위해서 설계 되었고, 자잘한 기능보다는 용량에 목적을 둔다.

그중에서 Pub/Sub은 클라우드 기반의 서비스로 비동기 메세징이 필요한 기능을 매니지드 서비스 형태로 제공함으로써, 별도의 설치나 운영이 필요 없이 손쉽게, 사용이 가능하다.

보통 특정 클라우드 벤더의 매지니드 솔루션은 Lock in 이슈 (한번 개발하면 다른 플랫폼으로 옮기기가 어려운)가 있어서 쉽사리 권하기는 어렵지만, 사용법이 간단하고 Lock in을 감수하고도 기능이 막강한 서비스나, 타 서비스로 전환이 쉬운 서비스일 경우에는 적극적으로 권장하는 편이다.


Pub/Sub의 경우 대용량 큐 서비스이기 때문에 Kafka 처럼 설치나 운영이 필요없음에도 불구하고 대용량 처리를 지원하면서 사용이 매우 쉽고, 코딩 양이 매우 적어서 차후에 다른 솔루션으로 교체가 용이하고 또한 대용량 장점과, 운영 대행의 장점으로 Lock in에 대한 단점을 충분히 커버하리라고 본다.

특징

주요 특징을 살펴보면, 글로벌 스케일 큐로, 전세계 어느 데이타 센터에서 접속하던지 구글 자체 광케이블망을 이용하여 빠른 접근을 제공한다.

메세지 전달 보장을 기능이 있으며, 큐에서 메세지를 PULLING 하는 기능뿐만 아니라, 큐가 메세지를 받는 쪽으로 HTTP를 이용하여 PUSH 해줄 수 있다.

토폴로지

구글 Pub/Sub 은 Message Provider (보내는쪽)과 Message Consumer (받는쪽)이 1:1 관계가 아니라. 1:N 관계이다.


Pub/Sub에는 Topic과 Subscription이라는 개념이 존재하는데,  Topic 을 큐로 생각하면 된다.

Message Provider가 Topic으로 메세지를 보내게 되고, 메세지를 읽으려면 Subscription 이라는 구독 채널을 설정해야 한다. Subscription은 하나의 Topic에 대해서 1..N개가 생성될 수 있다.

클라이언트는 각각의 Subscription에 붙어서 메세지를 받을 수 있다.

예를 들어서 하나의 메세지를 로그 시스템과 데이타 베이스 양쪽에 저장하고 싶을때는 Topic을 만든 후에, 로그 시스템용 Subscription, 데이타 베이스용 Subscription을 각각 만들어서 데이타를 읽으면 된다.


클라이언트 인터페이스

구글 Pub/Sub의 연동은 크게 다음과 같이 3가지 방법으로 접근이 가능하다.

메세지 구조와 생명 주기

Pub/Sub에 넣을 수 있는 메세지는 간단하다. String 형태의 메세지를 넣을 수 있으며, 메세지의 크기는 base64-encoding이 된 기준으로 최대 10M까지 지원이 된다.

메세지는 Message 와, Message Attribute  두가지 블럭으로 구분된다. 비교해서 이해하자면 Message 는 HTTP BODY, Message Attribute는 HTTP Header와 같은 개념으로 생각하면 되는데, Message는 통째로 TEXT가 들어가고, Message Attribute는 Key/Value 형태로 각각의 필드가 들어간다.  

생명주기 및 재처리 정책

메세지 생명 주기가 재미있는데, 먼저 Push로 받거나 Pull로 받은 메세지는 큐에서는 일단은 보이지 않는다. (다시 가지고 올 수 없다는 이야기). 메세지 처리가 끝난 후에는 클라이언트는 Pub/Sub으로 Acknowlege를 보내야 하는데, 만약에 정해진 시간 (이를 message acknowlegement deadline이라고 하고 디폴트는 10초)내에 ack를 주지 않으면, 그 메세지는 다시 Pub/Sub으로 들어간다.  이 acknowlegement를 통해서 메세지 전달 보장이 가능하다.


다시 Pub/Sub으로 돌아간 메세지는 Publishing time으로 부터 최대 7일까지 보관이 되서 클라이언트에서 다시  읽어드릴 수 있다.


https://cloud.google.com/pubsub/subscriber#ack_deadline


순서 보장

Pub/Sub 큐에 들어온 메세지는 Consumer에서 읽어드릴때, Pub/Sub에서 보낸 순서대로 읽을 수 없고, 랜덤한 순서로 전달된다. 즉 전달 순서 보장이 되지 않는다. 이는 Pub/Sub이 기본적으로 분산형 아키텍쳐를 띄고 있기 때문에, 내부에 어떤 노드로 데이타가 전달되는지, 그리고 각 노드중 어느 노드에서 데이타를 읽는지 예측이 불가능하기 때문이다.

메세지 전달 방식 (Message delivery type)

Pub/Sub은 일반적은 큐 시스템과 다르게 메세지를 Subscriber가 읽어오는 Pull 방식 이외에, Pub/Sub이 직접 Subscriber에게 메세지를 쏴주는 Push 방식을 같이 지원한다.

Pub/Sub 테스트 하기

대략적인 개념 이해가 끝났으면, 이제 실제 테스트를 해보자

Topic 생성하기

구글 클라우드 콘솔에서 Pub/Sub을 선택한 후, 아래 그림과 같이 메뉴에 들어오면, Create Topic 메뉴를 선택하여 Pub/Sub Topic을 생성한다.


여기서는 아래 그림과 같이 “mytopic”이라는 이름으로 토픽을 생성하겠다.


토픽명은 “projects/{프로젝트명}/topcis/{토픽명}” 식으로 생성된다. 이 예제에서 사용한 프로젝트명은 terrycho-sandbox이고, 지정한 토픽명은 “mytopic”이기 때문에, topic의 전체 이름은 “projects/terrycho-sandbox/topcis/mytopic”이 된다.

Subscription 생성하기

이제 앞에서 생성한 Topic으로 부터 메세지를 읽어드리기 위해서 Subscription을 생성해보자.

Pub/Sub 메뉴에서 아래와 같이 앞서 생성한 Topic을 확인할 수 있다. 이 메뉴에서 생성한 Topic의 “+New subscrition”이라는 버튼을 선택하면 새로운 Subscription을 생성할 수 있다.


아래 그림과 같이 subscription 생성화면에서 subscription 이름을  mysubscription으로 지정하자.

topic과 마찬가지로 subscription의 full name 역시 “projects/{프로젝트명}/subscriptions/{서브스크립션명}” 이 된다.


그리고 Delivery type (메세지 전달 방식)은 Pull을 선택한다.

아래 그림과 같이 Advanced option에서  Acknowlegement deadline을 설정할 수 있는데, 건들지 말고 디폴트 10초로 놔둔다.



메시지 보내보기

메세지를 보내는 테스트를 하기 위해서는 클라우드 콘솔 Pub/Sub 메뉴에서 앞에서 생성한 Topic을 선택하면 아래 그림과 같이 “Publish” 버튼이 나온다.


Publish 버튼을 누르면 아래와 같이 메세지를 직접 입력할 수 있는 창이 나온다.


위의 그림과 같이 Message 창에 보내고 싶은 메세지를 적고 Publish버튼을 누르면 Pub/Sub에 메세지가 퍼블리슁 된다.

보낸 메세지 읽어드리기

이제 퍼블리슁된 메세지를 읽어보자. 메세지는 gcloud라는 구글 클라우드 클라이언트를 이용해서 할것인데, 설치 방법은 https://cloud.google.com/sdk/gcloud/ 를 참고하면된다.

설치가 귀찮은 경우에는 아래 그림과 같이 구글 클라우드 콘솔의 상단 부분에 우측에 “>.” 이렇게 생긴 아이콘을 누르면 된다.



클라우드 쉘이라는 것인데, 구글 클라우드에 대해서 Command Line으로 명령을 내릴 수 있는 기본 명령어들이 미리 깔려있는 Linux 접속창이다.



pub/sub 은 아직 알파 버전이기 때문에, gcloud를 업그레이드 해서 alpha 버전 명령어가 수행이 가능하도록 해야 한다.

다음 명령어를 실행하자

%gcloud components install alpha

이제 gcloud 명령어가 업데이트 되었다. 이제 Pub/Sub topic에서 데이타를 읽어와보자

다음 명령어를 실행하면 mysubscrition에서 메세지를 읽어올 수 있다.

gcloud alpha pubsub subscriptions pull projects/terrycho-sandbox/subscriptions/mysubscription

다음은 명령을 실행해서 데이타를 읽어온 결과 이다.




10초 정도 후에 같은 명령어 실행해보면 같은 메세지가 리턴되는 것을 볼 수 있는데, 이는 ack를 주지 않았기 때문이다. gcloud에서 자동으로 ack를 보내는 방법은 명령어에 --auto-ack라는 옵션을 추가하면 된다.

옵션을 추가하고 명령을 실행해보자

gcloud alpha pubsub subscriptions pull projects/terrycho-sandbox/subscriptions/mysubscription --auto-ack

아래 결과와 같이, 첫번째 실행에서는 메세지가 도착하지만, 두번째 실행에서는 같은 메세지가 도착 하지 않는 것을 확인할 수 있다.


이 밖에도 gcloud 명령으로 하나의 메세지 뿐 아니라 한번에 여러개의 메세지를 리턴받을 수 도 있고, 여러개의 메세지를  pagination을 통해서 리턴 받을 수 도 있다. 자세한 옵션은 https://cloud.google.com/sdk/gcloud/reference/alpha/pubsub/subscriptions/pull 를 참고하기 바란다.


클라우드 웹 콘솔과, gcloud 명령어를 이용해서, 메세지를 퍼블리슁하고 읽어들이 것을 알아보았다. 다음 글에서는 실제로 SDK를 이용해서 메세지를 퍼블리슁하고 읽어들이는 예제를 소개하도록 하겠다.

분산 대용량 큐-Apache Kafka에 대한 검토 내용 정리


실시간 빅데이타 분석 아키텍쳐를 검토하다가 아파치 스톰을 보다보니, 실시간 데이타 스트림은 큐를 이용해서 수집하는 경우가 많은데, 데이타의 양이 많다 보니 기존의 큐 솔루션으로는 한계가 있어서 분산 대용량 큐로 아파치 카프카(Kafka)가 많이 언급된다.

그래서, 아키텍쳐를 대략 보고, 실효성에 대해서 고민을 해봤는데, 큐의 기능은 기존의 JMS나 AMQP 기반의 RabbitMQ(데이타 기반 라우팅,페데레이션 기능등)등에 비해서는 많이 부족하지만 대용량 메세지를 지원할 수 있는 것이 가장 큰 특징이다. 특히 분산 환경에서 용량 뿐 아니라, 복사본을 다른 노드에 저장함으로써 노드 장애에 대한 장애 대응 성을 가지고 있기 때문에 용량에는 확실하게 강점을 보인다.

실제로 마이크로소프트 社의 엔지니어가 쓴 논문을 보면http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf

 


카프카의 경우 10만 TPS 이상의 성능을 RabbitMQ는 2만 TPS 정도의 성능을 내는 것으로 나와 있는데, 여기서 생각해볼 문제가 큐는 비동기 처리 솔루션이다. 즉 응답 시간에 그렇게 민감 하지 않다는 것이다.

그리고 일반적인 웹 시스템의 성능이 1500~2000 TPS (엔터프라이즈 시스템의 경우) 내외인 것이 일반적이기 때문에, Rabbit MQ의 2만 TPS의 성능은 충분하다고 볼 수 있지 않을까 한다.

물론 네이버나 해외의 대형 SNS 서비스의 경우에는 충분히 저정도의 용량이 필요하겠지만, 현재로써는 일반적인 시스템에서는 카프카의 용량과 성능은 약간 오버 디자인이 아닌가 하는 생각이 든다.


Rabbit MQ is scalable and




RabbitMQ - Receive Message

클라우드 컴퓨팅 & NoSQL/RabbitMq | 2013.08.27 22:23 | Posted by 조대협

※ simplequeue 라는 이름으로 큐를 먼저 만들고 시작할것


package com.terry.rabbitmq.queue;

 

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.QueueingConsumer;

 

public class MessageReceiver {

       

        Logger log = LoggerFactory.getLogger(MessageReceiver.class);

 

        public String receive(String uri,String queue) throws Exception{

               ConnectionFactory factory = new ConnectionFactory();

               factory.setUri(uri);

              

               log.debug("Connect to :"+uri);

              

               Connection conn = factory.newConnection();

               Channel channel = conn.createChannel();     

               String msg = null;

               try{

                       QueueingConsumer consumer = new QueueingConsumer(channel);

                       channel.basicQos(1);

                       channel.basicConsume(queue,false,consumer);

                       log.debug("Reading msg from (queue:"+queue+")");

                      

                       QueueingConsumer.Delivery delivery = consumer.nextDelivery();

                       msg = new String(delivery.getBody());

                       channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

                       log.debug("Recieve message from (queue:"+queue+"):"+msg);

                      

               }catch(Exception e){

                       e.printStackTrace();

                       throw(e);

               }finally{

                       channel.close();

                       conn.close();

               }

               return msg;

        }

}


단위테스트 코드

package com.terry.rabbitmq.queue.test;

 

import static org.junit.Assert.assertNotNull;

 

import org.junit.Test;

 

import com.terry.rabbitmq.queue.MessageReceiver;

 

 

public class QueueReceiverTest {

        final static String host = "127.0.0.1";

        final static String vhost = "";

        final static int port = 5672;

        final static String user = "rabbitmq";

        final static String password = "rabbitmq";

        final static String queue = "simplequeue";

       

        @Test

        public void MessageReceiverTest() throws Exception{

               MessageReceiver receiver = new MessageReceiver();

               String uri = "amqp://"+user+":"+password+"@"+host+":"+port;//+"/"+vhost;

               String msg = receiver.receive(uri, queue);

               assertNotNull(msg);

        }

       

 

       

}

 

 

'클라우드 컴퓨팅 & NoSQL > RabbitMq' 카테고리의 다른 글

RabbitMQ 기본 기동  (1) 2014.01.02
RabbitMQ 공부 노트  (0) 2013.09.03
RabbitMQ + Spring  (0) 2013.08.27
RabbitMQ multi threaded read message consumer  (0) 2013.08.27
RabbitMQ - Receive Message  (0) 2013.08.27
RabbitMQ - Send Message  (0) 2013.08.27

RabbitMQ - Send Message

클라우드 컴퓨팅 & NoSQL/RabbitMq | 2013.08.27 22:21 | Posted by 조대협

※ simplequeue 라는 이름으로 rabbitmq 콘솔에서 먼저 큐를 만들고 시작할것





package com.terry.rabbitmq.queue;

 

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

 

public class MessageSender {

 

 

       

        Logger log = LoggerFactory.getLogger(MessageSender.class);

 

        public boolean sendMessage(String uri, String queue,String msg) throws Exception{

               ConnectionFactory factory = new ConnectionFactory();

               factory.setUri(uri);

              

               log.debug("Connect to :"+uri);

              

               Connection conn = factory.newConnection();

               Channel channel = conn.createChannel();

               try{

                       byte[] messageBodyBytes = msg.getBytes();

                       log.debug("Send msg (queue:"+queue+") msg:"+msg);

                       channel.basicPublish("", queue, null, messageBodyBytes);

               }catch(Exception e){

                       e.printStackTrace();

                       return false;

               }finally{

                       channel.close();

                       conn.close();

               }

               return true;

        }

 

}



다음은 단위 테스트 코드

package com.terry.rabbitmq.queue.test;

 

import static org.junit.Assert.assertTrue;

 

import org.junit.Test;

 

import com.terry.rabbitmq.queue.MessageSender;

 

 

public class QueueSenderTest {

        final static String host = "127.0.0.1";

        final static String vhost = "";

        final static int port = 5672;

        final static String user = "rabbitmq";

        final static String password = "rabbitmq";

        final static String queue = "simplequeue";

       

        @Test

        public void MessageSenderTest() throws Exception{

               MessageSender sender = new MessageSender();

               String uri = "amqp://"+user+":"+password+"@"+host+":"+port;//+"/"+vhost;

               String msg = "hello world" + System.currentTimeMillis();

              

               for(int i=0;i<1000;i++)

               assertTrue(sender.sendMessage(uri, queue, msg));

        }

       

 

       

}

 

'클라우드 컴퓨팅 & NoSQL > RabbitMq' 카테고리의 다른 글

RabbitMQ 기본 기동  (1) 2014.01.02
RabbitMQ 공부 노트  (0) 2013.09.03
RabbitMQ + Spring  (0) 2013.08.27
RabbitMQ multi threaded read message consumer  (0) 2013.08.27
RabbitMQ - Receive Message  (0) 2013.08.27
RabbitMQ - Send Message  (0) 2013.08.27
http://wiki.secondlife.com/wiki/Message_Queue_Evaluation_Notes

RabitMQ가 Amazon SQS에 비해 40배 가량 빠름.
AMQ나 RabitMQ를 클라우드에서 서비스하기 위해서는
1. SQS나 Azure Queue Service와의 호환성 문제 --> jCloud등 사용
2. Multitanent 문제 --> Pending Message가 장애를 발생시켜서 다른 업무나 사용자에게 방해를 줄 수 있다.
3. Global Scale Deployment --> DR과 Data Center간 Synchroization

이거 재미는 있겠는데.... 난이도가 무지 높겠다.. ROI가 쉽지 않겠어