블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 


구글 클라우드의 대용량 메세지 큐 Pub/Sub 소개

조대협 (http://bcho.tistory.com)




구글 클라우드의 Pub/Sub 은 클라우드 기반의 대용량 메세지 큐이다.

흔히들 사용하는 RabbitMQ, JMS나 Kafka의 클라우드 버전으로 보면 된다. Rabbit MQ와 같은 설치형 큐가 작은 메세지에 대해서 세심한 컨트롤을 제공한다고 하면, Kafka나 Pub/Sub은 대용량 스케일의 메세지를 처리하기 위해서 설계 되었고, 자잘한 기능보다는 용량에 목적을 둔다.

그중에서 Pub/Sub은 클라우드 기반의 서비스로 비동기 메세징이 필요한 기능을 매니지드 서비스 형태로 제공함으로써, 별도의 설치나 운영이 필요 없이 손쉽게, 사용이 가능하다.

보통 특정 클라우드 벤더의 매지니드 솔루션은 Lock in 이슈 (한번 개발하면 다른 플랫폼으로 옮기기가 어려운)가 있어서 쉽사리 권하기는 어렵지만, 사용법이 간단하고 Lock in을 감수하고도 기능이 막강한 서비스나, 타 서비스로 전환이 쉬운 서비스일 경우에는 적극적으로 권장하는 편이다.


Pub/Sub의 경우 대용량 큐 서비스이기 때문에 Kafka 처럼 설치나 운영이 필요없음에도 불구하고 대용량 처리를 지원하면서 사용이 매우 쉽고, 코딩 양이 매우 적어서 차후에 다른 솔루션으로 교체가 용이하고 또한 대용량 장점과, 운영 대행의 장점으로 Lock in에 대한 단점을 충분히 커버하리라고 본다.

특징

주요 특징을 살펴보면, 글로벌 스케일 큐로, 전세계 어느 데이타 센터에서 접속하던지 구글 자체 광케이블망을 이용하여 빠른 접근을 제공한다.

메세지 전달 보장을 기능이 있으며, 큐에서 메세지를 PULLING 하는 기능뿐만 아니라, 큐가 메세지를 받는 쪽으로 HTTP를 이용하여 PUSH 해줄 수 있다.

토폴로지

구글 Pub/Sub 은 Message Provider (보내는쪽)과 Message Consumer (받는쪽)이 1:1 관계가 아니라. 1:N 관계이다.


Pub/Sub에는 Topic과 Subscription이라는 개념이 존재하는데,  Topic 을 큐로 생각하면 된다.

Message Provider가 Topic으로 메세지를 보내게 되고, 메세지를 읽으려면 Subscription 이라는 구독 채널을 설정해야 한다. Subscription은 하나의 Topic에 대해서 1..N개가 생성될 수 있다.

클라이언트는 각각의 Subscription에 붙어서 메세지를 받을 수 있다.

예를 들어서 하나의 메세지를 로그 시스템과 데이타 베이스 양쪽에 저장하고 싶을때는 Topic을 만든 후에, 로그 시스템용 Subscription, 데이타 베이스용 Subscription을 각각 만들어서 데이타를 읽으면 된다.


클라이언트 인터페이스

구글 Pub/Sub의 연동은 크게 다음과 같이 3가지 방법으로 접근이 가능하다.

메세지 구조와 생명 주기

Pub/Sub에 넣을 수 있는 메세지는 간단하다. String 형태의 메세지를 넣을 수 있으며, 메세지의 크기는 base64-encoding이 된 기준으로 최대 10M까지 지원이 된다.

메세지는 Message 와, Message Attribute  두가지 블럭으로 구분된다. 비교해서 이해하자면 Message 는 HTTP BODY, Message Attribute는 HTTP Header와 같은 개념으로 생각하면 되는데, Message는 통째로 TEXT가 들어가고, Message Attribute는 Key/Value 형태로 각각의 필드가 들어간다.  

생명주기 및 재처리 정책

메세지 생명 주기가 재미있는데, 먼저 Push로 받거나 Pull로 받은 메세지는 큐에서는 일단은 보이지 않는다. (다시 가지고 올 수 없다는 이야기). 메세지 처리가 끝난 후에는 클라이언트는 Pub/Sub으로 Acknowlege를 보내야 하는데, 만약에 정해진 시간 (이를 message acknowlegement deadline이라고 하고 디폴트는 10초)내에 ack를 주지 않으면, 그 메세지는 다시 Pub/Sub으로 들어간다.  이 acknowlegement를 통해서 메세지 전달 보장이 가능하다.


다시 Pub/Sub으로 돌아간 메세지는 Publishing time으로 부터 최대 7일까지 보관이 되서 클라이언트에서 다시  읽어드릴 수 있다.


https://cloud.google.com/pubsub/subscriber#ack_deadline


순서 보장

Pub/Sub 큐에 들어온 메세지는 Consumer에서 읽어드릴때, Pub/Sub에서 보낸 순서대로 읽을 수 없고, 랜덤한 순서로 전달된다. 즉 전달 순서 보장이 되지 않는다. 이는 Pub/Sub이 기본적으로 분산형 아키텍쳐를 띄고 있기 때문에, 내부에 어떤 노드로 데이타가 전달되는지, 그리고 각 노드중 어느 노드에서 데이타를 읽는지 예측이 불가능하기 때문이다.

메세지 전달 방식 (Message delivery type)

Pub/Sub은 일반적은 큐 시스템과 다르게 메세지를 Subscriber가 읽어오는 Pull 방식 이외에, Pub/Sub이 직접 Subscriber에게 메세지를 쏴주는 Push 방식을 같이 지원한다.

Pub/Sub 테스트 하기

대략적인 개념 이해가 끝났으면, 이제 실제 테스트를 해보자

Topic 생성하기

구글 클라우드 콘솔에서 Pub/Sub을 선택한 후, 아래 그림과 같이 메뉴에 들어오면, Create Topic 메뉴를 선택하여 Pub/Sub Topic을 생성한다.


여기서는 아래 그림과 같이 “mytopic”이라는 이름으로 토픽을 생성하겠다.


토픽명은 “projects/{프로젝트명}/topcis/{토픽명}” 식으로 생성된다. 이 예제에서 사용한 프로젝트명은 terrycho-sandbox이고, 지정한 토픽명은 “mytopic”이기 때문에, topic의 전체 이름은 “projects/terrycho-sandbox/topcis/mytopic”이 된다.

Subscription 생성하기

이제 앞에서 생성한 Topic으로 부터 메세지를 읽어드리기 위해서 Subscription을 생성해보자.

Pub/Sub 메뉴에서 아래와 같이 앞서 생성한 Topic을 확인할 수 있다. 이 메뉴에서 생성한 Topic의 “+New subscrition”이라는 버튼을 선택하면 새로운 Subscription을 생성할 수 있다.


아래 그림과 같이 subscription 생성화면에서 subscription 이름을  mysubscription으로 지정하자.

topic과 마찬가지로 subscription의 full name 역시 “projects/{프로젝트명}/subscriptions/{서브스크립션명}” 이 된다.


그리고 Delivery type (메세지 전달 방식)은 Pull을 선택한다.

아래 그림과 같이 Advanced option에서  Acknowlegement deadline을 설정할 수 있는데, 건들지 말고 디폴트 10초로 놔둔다.



메시지 보내보기

메세지를 보내는 테스트를 하기 위해서는 클라우드 콘솔 Pub/Sub 메뉴에서 앞에서 생성한 Topic을 선택하면 아래 그림과 같이 “Publish” 버튼이 나온다.


Publish 버튼을 누르면 아래와 같이 메세지를 직접 입력할 수 있는 창이 나온다.


위의 그림과 같이 Message 창에 보내고 싶은 메세지를 적고 Publish버튼을 누르면 Pub/Sub에 메세지가 퍼블리슁 된다.

보낸 메세지 읽어드리기

이제 퍼블리슁된 메세지를 읽어보자. 메세지는 gcloud라는 구글 클라우드 클라이언트를 이용해서 할것인데, 설치 방법은 https://cloud.google.com/sdk/gcloud/ 를 참고하면된다.

설치가 귀찮은 경우에는 아래 그림과 같이 구글 클라우드 콘솔의 상단 부분에 우측에 “>.” 이렇게 생긴 아이콘을 누르면 된다.



클라우드 쉘이라는 것인데, 구글 클라우드에 대해서 Command Line으로 명령을 내릴 수 있는 기본 명령어들이 미리 깔려있는 Linux 접속창이다.



pub/sub 은 아직 알파 버전이기 때문에, gcloud를 업그레이드 해서 alpha 버전 명령어가 수행이 가능하도록 해야 한다.

다음 명령어를 실행하자

%gcloud components install alpha

이제 gcloud 명령어가 업데이트 되었다. 이제 Pub/Sub topic에서 데이타를 읽어와보자

다음 명령어를 실행하면 mysubscrition에서 메세지를 읽어올 수 있다.

gcloud alpha pubsub subscriptions pull projects/terrycho-sandbox/subscriptions/mysubscription

다음은 명령을 실행해서 데이타를 읽어온 결과 이다.




10초 정도 후에 같은 명령어 실행해보면 같은 메세지가 리턴되는 것을 볼 수 있는데, 이는 ack를 주지 않았기 때문이다. gcloud에서 자동으로 ack를 보내는 방법은 명령어에 --auto-ack라는 옵션을 추가하면 된다.

옵션을 추가하고 명령을 실행해보자

gcloud alpha pubsub subscriptions pull projects/terrycho-sandbox/subscriptions/mysubscription --auto-ack

아래 결과와 같이, 첫번째 실행에서는 메세지가 도착하지만, 두번째 실행에서는 같은 메세지가 도착 하지 않는 것을 확인할 수 있다.


이 밖에도 gcloud 명령으로 하나의 메세지 뿐 아니라 한번에 여러개의 메세지를 리턴받을 수 도 있고, 여러개의 메세지를  pagination을 통해서 리턴 받을 수 도 있다. 자세한 옵션은 https://cloud.google.com/sdk/gcloud/reference/alpha/pubsub/subscriptions/pull 를 참고하기 바란다.


클라우드 웹 콘솔과, gcloud 명령어를 이용해서, 메세지를 퍼블리슁하고 읽어들이 것을 알아보았다. 다음 글에서는 실제로 SDK를 이용해서 메세지를 퍼블리슁하고 읽어들이는 예제를 소개하도록 하겠다.


IBM 블루믹스 소개

 

PaaS

IBM 블루믹스는 IBM에서 제공하는 PaaS(Platform As A Service) 클라우드 서비스이다. 아마존과 같은 서비스가 VM을 제공하는 IaaS(Infra as a service)라면, 블루믹스는 node.js, Java와 같은 런타임을 미리 깔아놓고, 거기에 소스코드를 넣어서 돌리는 구조이다. IaaS의 경우 Linux Windows Server와 같은 OS VM 기반으로 제공하기 때문에 직접 미들웨어를 설치해서 사용해야 하지만, PaaS의 경우 이미 설치된 미들웨어 위에 코드만 돌리면되기 때문에, 아무래도 관리가 편리하다. 

그러면 왜 PaaS인가?

얼마전까지만 해도, 개발 트렌드의 중심은 기업체에서 개발하는 B2C서비스였다. 페이스북이나 네이버와 같은 서비스들이 대표적인데, B2C 서비스들은 대용량의 사용자를 커버해야 하고, 세세한 튜닝이나 설정 변경이 필요하고 다소 복잡한 아키텍쳐 구조를 가지기 때문에, 직접 인프라를 세팅하고 미들웨어를 설치하는 것이 오히려 유리했다. 그래서 IaaS를 많이 사용했는데,

근래에 들어서 개발의 중심이 모바일 앱이 되고 스타트업이 중심이 되면서, 적은 인원으로 빠르게 개발하고 관리할 수 있는 플랫폼이 필요하게되었고, 그로 인해서, Google App Engine이나, Heroku와 같은 PaaS 서비스가 각광받게 되었다.

IBM의 블루 믹스는?

지원 플랫폼

블루믹스는 APPS라는 개념을 가지고 있는데 이는 하나의 서비스로 보면된다. 이 안에, 서비스를 기동하기 위한 node.js mongodb와 같은 미들웨어를 묶어서 배포 할 수 있다. 아래 그림은 실제로 Terry라는 App mongodb 서비스를 추가하는 화면이다.



<App 에 추가할 서비스를 선택하는 화면>

매우 편하다. 클릭 몇번만으로, 내가 원하는 플랫폼을 쉽게 설치할 수 있다.

아래 화면은 node.js mongodb,redis로 구성된 서비스 환경이다.



<node.js mongodb,redis로 구성된 App>

현재 지원되는 플랫폼은 Java, Node.JS, Ruby on rials, Ruby Sinatra등을 지원한다.

부가 서비스 들은, 앞에서 언급한 mongodb, redis이외에도, rabbitMQ, IBM MQ, memcached,Work flow engine, Cloudant (CouchDB 계열) 등의 미들웨어 서비스 이외에도 Single Sign On, IOT (사물인터넷)등의 서비스를 부가로 지원한다. (꽤 많음)


코드 저장 및 반영

런타임에 적용되는 코드들은, 블루믹스에서 제공되는 git 저장소를 사용하면 된다.



<블루믹스 git 저장소>

재미있는 것중의 하나는, 웹브라우져상에서 코드 개발 에디터 기능 자체도 제공한다. 아래는 node.js의 코드를 웹 개발환경에서 편집하는 화면이다.



<블루믹스내에서 코드 편집하는 화면>

그리고, Atlassian JIRA와 같은 이슈 트랙킹 시스템을 제공한다. 공동 프로젝트를 관리하기 위해서는 태스크를 관리할 수 있는 시스템이 필요한데, 블루믹스에서는 IBM Jazz를 기반으로한 태스크 관리 시스템을 제공하고 있다. 개인적으로 예전에 Jazz를 사용했을때 상당히 무겁고 복잡하다는 느낌을 받았는데.. 어떤지는 조금 더 써봐야 알 수 있겠다.



<Jazz를 이용한 Task 정의 화면>

블루믹스는 앞에서 본것과 같이 서비스를 제공하기 위한 플랫폼만을 제공하는 것이 아니라, 형상관리,태스크 관리 및 빌드/배포 까지 자동화한 ALM (Application Life cycle management) End2End 기능을 제공한다.


서비스 관리

아래 화면은 node.js의 인스턴스 수를 조정하는 화면인데, 정말 쉽다. 아래 인스턴스 개수 숫자를 올려주면, 그만큼의 인스턴스가 가동되고, 각 인스턴스별 메모리양을 설정할 수 있다.



<그림. Node.js의 인스턴스 수를 조정하는 화면>

좀 특이한 점이 아마존처럼 VM단위로 과금을 하는게 아니라, 나한테 정해진 메모리 용량에 따라서, 이 안에서 인스턴스를 마음대로 만들 수 있는 개념인데, 구체적인 과금에 개념에 대해서는 향후에 조금 더 테스트를 해보고 올리도록 하겠다.

 

지금까지 간략하게나마 IBM PaaS 클라우드 블루믹스에 대해서 알아보았다. 특징은 무엇보다 쉽다!! 이다. 블루믹스 클라우드는 가입하면 무료 평가기간 동안 사용할 수 있으며 다른 클라우드 처럼 신용카드 번호를 넣지 않아도 된다. (URL : https://ace.ng.bluemix.net)

서버 개발 환경이 필요한 사람이 있으면 꼭 한번 사용해보기를 추천한다.

알림 : 본글은 IBM 블루믹스로 부터, 스폰서를 받는 글이 아닙니다!!! 혹시나 오해하지 마시기를..


import pika
import ast
import pymongo
import datetime
import logging
import time
import sys,traceback,socket,threading
from datetime import datetime
from time import sleep

# configuration
MONGODB_NAME = "terrydb"
HOSTNAME = ':'+socket.gethostname()
QUEUE_NAME = 'hello'
MONGODB_URL= 'mongodb://localhost'
RABBITMQ_URL='localhost'

LOG_FORMAT = ('[%(levelname)s] %(asctime)s %(name)s : %(message)s')
LOGGER = logging.getLogger(__name__)
              
class WorkerThread(threading.Thread):
    def __init__(self,threadID,name,counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter
        # init rabbitmq
        # init mongodb

    # make rabbitMQ connection and create channel
    def initRabbitMQ(self):
        self.q_conn = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_URL))
        self.q_channel =self.q_conn.channel()
        self.q_channel.queue_declare(queue=QUEUE_NAME) # create queue
    
    # make mongo db connection
    def initMongoDB(self):
        self.mongo_conn = pymongo.MongoClient(MONGODB_URL)
        self.mongo_db = self.mongo_conn[MONGODB_NAME]

    def onMessage(self,ch,method,properties,body):
        try:
            LOGGER.info(str(self.name)+" recevied "+body)
            #print str(self.name) + "[x] recevied %r" % (body,)
            json_dict = ast.literal_eval(body) # convert string to dictionary
            ## need to be fixed
            ## specify board name here
            self.writeToMongoDB('MYBOARD',json_dict)
        except ValueError:
            print 'String parsing error'
        except:
            print 'unknown error'
            traceback.print_exc(file=sys.stdout)
    
    def writeToMongoDB(self,boardname,post):
        # get board name
        s = self.mongo_db[boardname]
        # generate uuid for the posting
        post['_id'] = self.genPostId()
        try:
            s.insert(post)
        except:
            LOGGER.error(" mongodb insert fail" + str(sys.exc_info()[0]) )
            traceback.print_exc(file=sys.stdout)

    # generate post unique id with
    # format : YYMM{microsecond from this month 1}:{hostname}
    def genPostId(self):
        time.sleep(0.001) # intentionally sleep to remove key duplication
        dt = datetime.now()
        year = str(dt.year)[-2:]
        mon = dt.month
        if mon < 10 :
            mon = '0'+str(mon)
        else:
            mon = str(mon)
        print dt.second
        print dt.microsecond
        uid = year+mon+ str ( int(dt.day * 24 * 60 * 60 + dt.second) * 1000 + dt.microsecond / 1000.0)
        uid = uid + HOSTNAME
        return uid
                
    def run(self):
        LOGGER.info(str(self.name)+" has been started")
        self.initRabbitMQ()
        self.initMongoDB()
        self.q_channel.basic_consume(self.onMessage,queue=QUEUE_NAME,no_ack=True)
        self.q_channel.start_consuming()
        #while 1:
        # time.sleep(0.01)


         
    #savePostToDB('helloboard',j)
                    

def print_usage():
    print 'usage : python worker_multithread {number of thread}'
    exit()
    
def main(argv):
    if len(argv) <2:
        print_usage()
    if argv[1].isdigit() ==False :
        print_usage()
    max_thread = int(argv[1])
    logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
    
    LOGGER.info('Create '+str(max_thread)+' threads ')

    threadList =[]
    for i in range(max_thread):
        t_name = "WorkerThread-"+str(i)
        t = WorkerThread(i,t_name,i)
        t.start()
        threadList.append(t)
    
if __name__ == '__main__':
    main(sys.argv)


RabbitMQ 기본 기동

클라우드 컴퓨팅 & NoSQL/RabbitMq | 2014.01.02 01:32 | Posted by 조대협

1. 설치

- Erlang 설치

- Rabbit MQ 다운로드 후 설치


2. 기본 명령

  • 서버 기동 : sbin/rabbitmq-server start. 윈도우즈에서는 services.msc에서 서비스로 시작
  • 상태 체크 : sbin/rabbitmqctl status
  • 서버 중지 : sbin/rabbitmqctl stop
3. Web Admin 모듈
  • web admin 모듈 enable - "rabbitmq-plugins enable rabbitmq_management" (이다음 서비스를 restart해야 함)
  • http://localhost:15672/ 접속후 guest/guest로 로그인




'클라우드 컴퓨팅 & NoSQL > RabbitMq' 카테고리의 다른 글

RabbitMQ 기본 기동  (1) 2014.01.02
RabbitMQ 공부 노트  (0) 2013.09.03
RabbitMQ + Spring  (0) 2013.08.27
RabbitMQ multi threaded read message consumer  (0) 2013.08.27
RabbitMQ - Receive Message  (0) 2013.08.27
RabbitMQ - Send Message  (0) 2013.08.27

RabbitMQ + Spring

클라우드 컴퓨팅 & NoSQL/RabbitMq | 2013.08.27 22:51 | Posted by 조대협

pom.xml

 

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

 

  <groupId>com.terry</groupId>

  <artifactId>rabbitmq</artifactId>

  <version>1.0-SNAPSHOT</version>

  <packaging>jar</packaging>

 

  <name>rabbitmq</name>

  <url>http://maven.apache.org</url>

  <repositories>

         <repository>

             <id>spring-release</id>

             <name>Spring Maven Release Repository</name>

             <url>http://repo.springsource.org/libs-release</url>

         </repository>

  </repositories>

 

  <properties>

    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

  </properties>

 

  <dependencies>

         <dependency>

           <groupId>com.rabbitmq</groupId>

           <artifactId>amqp-client</artifactId>

           <version>3.1.4</version>

         </dependency>

         <!--  spring framework -->

    <dependency>

          <groupId>org.springframework</groupId>

          <artifactId>spring-core</artifactId>

          <version>3.2.2.RELEASE</version>

    </dependency>

    <dependency>

          <groupId>org.springframework</groupId>

          <artifactId>spring-context</artifactId>

          <version>3.2.2.RELEASE</version>

    </dependency>

    <dependency>

          <groupId>org.springframework</groupId>

          <artifactId>spring-beans</artifactId>

          <version>3.2.2.RELEASE</version>

    </dependency>

         <!-- spring rabbit mq -->

                  <dependency>

                  <groupId>org.springframework.amqp</groupId>

                  <artifactId>spring-amqp</artifactId>

                  <version>1.2.0.RELEASE</version>

                  <exclusions>

                           <exclusion>

                                   <groupId>com.sun.jmx</groupId>

                                   <artifactId>jmxri</artifactId>

                           </exclusion>

                  </exclusions>

         </dependency>

         <dependency>

                  <groupId>org.springframework.amqp</groupId>

                  <artifactId>spring-rabbit</artifactId>

                  <version>1.2.0.RELEASE</version>

         </dependency>

         <dependency>

                  <groupId>org.springframework.amqp</groupId>

                  <artifactId>spring-erlang</artifactId>

                  <version>1.2.0.RELEASE</version>

         </dependency>

 

        

         <!--  JUnit 4 -->

         <dependency>

                  <groupId>junit</groupId>

                  <artifactId>junit</artifactId>

                  <version>4.10</version>

         </dependency>

                 

         <!--  logging framework -->

         <dependency>

           <groupId>org.slf4j</groupId>

           <artifactId>slf4j-api</artifactId>

           <version>1.7.5</version>

          </dependency>

          <dependency>

           <groupId>ch.qos.logback</groupId>

           <artifactId>logback-classic</artifactId>

           <version>1.0.13</version>

          </dependency>

          <dependency>

           <groupId>ch.qos.logback</groupId>

           <artifactId>logback-core</artifactId>

           <version>1.0.13</version>

          </dependency>

        

  </dependencies>

</project> 


applicationContext.xml

 

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

    xmlns:rabbit="http://www.springframework.org/schema/rabbit"

    xsi:schemaLocation="http://www.springframework.org/schema/rabbit

http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd

http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

 

    <!-- A reference to the org.springframework.amqp.rabbit.connection.ConnectionFactory -->

    <rabbit:connection-factory id="connectionFactory"/>

 

    <!-- Creates a org.springframework.amqp.rabbit.core.RabbitTemplate for access to the broker -->

    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>

 

    <!-- Creates a org.springframework.amqp.rabbit.core.RabbitAdmin  to manage exchanges, queues and bindings -->

    <rabbit:admin connection-factory="connectionFactory"/>

 

    <!-- Creates a queue for consumers to retrieve messages -->

    <rabbit:queue name="simplequeue"/>

</beans>


Producer.java

 

package com.terry.rabbitmq.springframework;

 

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.context.ApplicationContext;

import org.springframework.context.support.ClassPathXmlApplicationContext;

 

public class Producer {

    public static void main(String[] args) {

        ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");

        AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);

        amqpTemplate.convertAndSend("simplequeue", "Hello World");

    }

}


Consumer.java

package com.terry.rabbitmq.springframework;

 

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.context.ApplicationContext;

import org.springframework.context.support.ClassPathXmlApplicationContext;

import org.springframework.context.support.GenericXmlApplicationContext;

 

public class Consumer {

    public static void main(String[] args) {

        ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");

        AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);

        System.out.println(amqpTemplate.receive("simplequeue"));

        //System.exit(1);

    }

}

 

'클라우드 컴퓨팅 & NoSQL > RabbitMq' 카테고리의 다른 글

RabbitMQ 기본 기동  (1) 2014.01.02
RabbitMQ 공부 노트  (0) 2013.09.03
RabbitMQ + Spring  (0) 2013.08.27
RabbitMQ multi threaded read message consumer  (0) 2013.08.27
RabbitMQ - Receive Message  (0) 2013.08.27
RabbitMQ - Send Message  (0) 2013.08.27



package com.terry.rabbitmq.queue.threadpool;

 

import java.io.IOException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

 

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DefaultConsumer;

import com.rabbitmq.client.Envelope;

import com.rabbitmq.client.QueueingConsumer;

 

public class QueueListener {

 

        Logger log = LoggerFactory.getLogger(QueueListener.class);

 

        public void invoke(String uri, String queue, int maxthread ) throws Exception{

              

               log.info("QueueListener has been started");

               ConnectionFactory factory = new ConnectionFactory();

               factory.setUri(uri);

              

               ExecutorService es = Executors.newFixedThreadPool(maxthread);

               Connection conn = factory.newConnection(es);

              

    // Thread 당 다른 Channel 을 사용하기 위해서 Thread수 만큼 별도의 채널을 생성하낟.

               for(int i=0;i<maxthread;i++){

                       Channel channel = conn.createChannel();     

                       channel.basicQos(1);

                       channel.basicConsume(queue,false,new MyQueueConsumer(channel));

               }

               log.info("Invoke "+maxthread+" thread and wait for listening");

 

 

              

        } //invoke

       

        final static String host = "127.0.0.1";

        final static String vhost = "";

        final static int port = 5672;

        final static String user = "rabbitmq";

        final static String password = "rabbitmq";

        final static String queue = "simplequeue";

       

        public static void main(String args[]) throws Exception{

               QueueListener ql = new QueueListener();

               String uri = "amqp://"+user+":"+password+"@"+host+":"+port;//+"/"+vhost;

               ql.invoke(uri, "simplequeue", 5);

        }

}


package com.terry.rabbitmq.queue.threadpool;

 

import java.io.IOException;

import java.util.UUID;

 

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

 

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DefaultConsumer;

import com.rabbitmq.client.Envelope;

 

public class MyQueueConsumer extends DefaultConsumer {

       

        Logger log = LoggerFactory.getLogger(MyQueueConsumer.class);

        Channel channel;

        public MyQueueConsumer(Channel channel) {

               super(channel);

               // TODO Auto-generated constructor stub

               this.channel = channel;

        }

 

        @Override

        public void handleDelivery(String consumeTag,

                                   Envelope envelope,

                                   AMQP.BasicProperties properties,

                                                        byte[] body)

               throws IOException

        {

               String routingKey = envelope.getRoutingKey();

               String contentType = properties.getContentType();

               long deliveryTag = envelope.getDeliveryTag();

              

               // message handling logic here

               String msg = new String(body);

               UUID uuid = UUID.randomUUID();

               log.debug(uuid+" S Channel :"+channel+" Thread:"+Thread.currentThread()+" msg:"+msg);

              

               // multiple - false if we are acknowledging multiple messages with the same delivery tag

               this.channel.basicAck(deliveryTag, false);

        }

}

 

 


'클라우드 컴퓨팅 & NoSQL > RabbitMq' 카테고리의 다른 글

RabbitMQ 기본 기동  (1) 2014.01.02
RabbitMQ 공부 노트  (0) 2013.09.03
RabbitMQ + Spring  (0) 2013.08.27
RabbitMQ multi threaded read message consumer  (0) 2013.08.27
RabbitMQ - Receive Message  (0) 2013.08.27
RabbitMQ - Send Message  (0) 2013.08.27

RabbitMQ - Receive Message

클라우드 컴퓨팅 & NoSQL/RabbitMq | 2013.08.27 22:23 | Posted by 조대협

※ simplequeue 라는 이름으로 큐를 먼저 만들고 시작할것


package com.terry.rabbitmq.queue;

 

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.QueueingConsumer;

 

public class MessageReceiver {

       

        Logger log = LoggerFactory.getLogger(MessageReceiver.class);

 

        public String receive(String uri,String queue) throws Exception{

               ConnectionFactory factory = new ConnectionFactory();

               factory.setUri(uri);

              

               log.debug("Connect to :"+uri);

              

               Connection conn = factory.newConnection();

               Channel channel = conn.createChannel();     

               String msg = null;

               try{

                       QueueingConsumer consumer = new QueueingConsumer(channel);

                       channel.basicQos(1);

                       channel.basicConsume(queue,false,consumer);

                       log.debug("Reading msg from (queue:"+queue+")");

                      

                       QueueingConsumer.Delivery delivery = consumer.nextDelivery();

                       msg = new String(delivery.getBody());

                       channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

                       log.debug("Recieve message from (queue:"+queue+"):"+msg);

                      

               }catch(Exception e){

                       e.printStackTrace();

                       throw(e);

               }finally{

                       channel.close();

                       conn.close();

               }

               return msg;

        }

}


단위테스트 코드

package com.terry.rabbitmq.queue.test;

 

import static org.junit.Assert.assertNotNull;

 

import org.junit.Test;

 

import com.terry.rabbitmq.queue.MessageReceiver;

 

 

public class QueueReceiverTest {

        final static String host = "127.0.0.1";

        final static String vhost = "";

        final static int port = 5672;

        final static String user = "rabbitmq";

        final static String password = "rabbitmq";

        final static String queue = "simplequeue";

       

        @Test

        public void MessageReceiverTest() throws Exception{

               MessageReceiver receiver = new MessageReceiver();

               String uri = "amqp://"+user+":"+password+"@"+host+":"+port;//+"/"+vhost;

               String msg = receiver.receive(uri, queue);

               assertNotNull(msg);

        }

       

 

       

}

 

 

'클라우드 컴퓨팅 & NoSQL > RabbitMq' 카테고리의 다른 글

RabbitMQ 기본 기동  (1) 2014.01.02
RabbitMQ 공부 노트  (0) 2013.09.03
RabbitMQ + Spring  (0) 2013.08.27
RabbitMQ multi threaded read message consumer  (0) 2013.08.27
RabbitMQ - Receive Message  (0) 2013.08.27
RabbitMQ - Send Message  (0) 2013.08.27

RabbitMQ - Send Message

클라우드 컴퓨팅 & NoSQL/RabbitMq | 2013.08.27 22:21 | Posted by 조대협

※ simplequeue 라는 이름으로 rabbitmq 콘솔에서 먼저 큐를 만들고 시작할것





package com.terry.rabbitmq.queue;

 

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

 

public class MessageSender {

 

 

       

        Logger log = LoggerFactory.getLogger(MessageSender.class);

 

        public boolean sendMessage(String uri, String queue,String msg) throws Exception{

               ConnectionFactory factory = new ConnectionFactory();

               factory.setUri(uri);

              

               log.debug("Connect to :"+uri);

              

               Connection conn = factory.newConnection();

               Channel channel = conn.createChannel();

               try{

                       byte[] messageBodyBytes = msg.getBytes();

                       log.debug("Send msg (queue:"+queue+") msg:"+msg);

                       channel.basicPublish("", queue, null, messageBodyBytes);

               }catch(Exception e){

                       e.printStackTrace();

                       return false;

               }finally{

                       channel.close();

                       conn.close();

               }

               return true;

        }

 

}



다음은 단위 테스트 코드

package com.terry.rabbitmq.queue.test;

 

import static org.junit.Assert.assertTrue;

 

import org.junit.Test;

 

import com.terry.rabbitmq.queue.MessageSender;

 

 

public class QueueSenderTest {

        final static String host = "127.0.0.1";

        final static String vhost = "";

        final static int port = 5672;

        final static String user = "rabbitmq";

        final static String password = "rabbitmq";

        final static String queue = "simplequeue";

       

        @Test

        public void MessageSenderTest() throws Exception{

               MessageSender sender = new MessageSender();

               String uri = "amqp://"+user+":"+password+"@"+host+":"+port;//+"/"+vhost;

               String msg = "hello world" + System.currentTimeMillis();

              

               for(int i=0;i<1000;i++)

               assertTrue(sender.sendMessage(uri, queue, msg));

        }

       

 

       

}

 

'클라우드 컴퓨팅 & NoSQL > RabbitMq' 카테고리의 다른 글

RabbitMQ 기본 기동  (1) 2014.01.02
RabbitMQ 공부 노트  (0) 2013.09.03
RabbitMQ + Spring  (0) 2013.08.27
RabbitMQ multi threaded read message consumer  (0) 2013.08.27
RabbitMQ - Receive Message  (0) 2013.08.27
RabbitMQ - Send Message  (0) 2013.08.27