클라우드 컴퓨팅 & NoSQL/RabbitMq

RabbitMQ multi-threaded message consumer

Terry Cho 2013. 8. 27. 22:26



package com.terry.rabbitmq.queue.threadpool;

 

import java.io.IOException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

 

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DefaultConsumer;

import com.rabbitmq.client.Envelope;

import com.rabbitmq.client.QueueingConsumer;

 

public class QueueListener {

 

        Logger log = LoggerFactory.getLogger(QueueListener.class);

 

        public void invoke(String uri, String queue, int maxthread ) throws Exception{

              

               log.info("QueueListener has been started");

               ConnectionFactory factory = new ConnectionFactory();

               factory.setUri(uri);

              

               ExecutorService es = Executors.newFixedThreadPool(maxthread);

               Connection conn = factory.newConnection(es);

              

    // Thread 당 다른 Channel 을 사용하기 위해서 Thread수 만큼 별도의 채널을 생성하낟.

               for(int i=0;i<maxthread;i++){

                       Channel channel = conn.createChannel();     

                       channel.basicQos(1);

                       channel.basicConsume(queue,false,new MyQueueConsumer(channel));

               }

               log.info("Invoke "+maxthread+" thread and wait for listening");

 

 

              

        } //invoke

       

        final static String host = "127.0.0.1";

        final static String vhost = "";

        final static int port = 5672;

        final static String user = "rabbitmq";

        final static String password = "rabbitmq";

        final static String queue = "simplequeue";

       

        public static void main(String args[]) throws Exception{

               QueueListener ql = new QueueListener();

               String uri = "amqp://"+user+":"+password+"@"+host+":"+port;//+"/"+vhost;

               ql.invoke(uri, "simplequeue", 5);

        }

}


package com.terry.rabbitmq.queue.threadpool;

 

import java.io.IOException;

import java.util.UUID;

 

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

 

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DefaultConsumer;

import com.rabbitmq.client.Envelope;

 

public class MyQueueConsumer extends DefaultConsumer {

       

        Logger log = LoggerFactory.getLogger(MyQueueConsumer.class);

        Channel channel;

        public MyQueueConsumer(Channel channel) {

               super(channel);

               // TODO Auto-generated constructor stub

               this.channel = channel;

        }

 

        @Override

        public void handleDelivery(String consumeTag,

                                   Envelope envelope,

                                   AMQP.BasicProperties properties,

                                                        byte[] body)

               throws IOException

        {

               String routingKey = envelope.getRoutingKey();

               String contentType = properties.getContentType();

               long deliveryTag = envelope.getDeliveryTag();

              

               // message handling logic here

               String msg = new String(body);

               UUID uuid = UUID.randomUUID();

               log.debug(uuid+" S Channel :"+channel+" Thread:"+Thread.currentThread()+" msg:"+msg);

              

               // multiple - false if we are acknowledging multiple messages with the same delivery tag

               this.channel.basicAck(deliveryTag, false);

        }

}

 

 


'클라우드 컴퓨팅 & NoSQL > RabbitMq' 카테고리의 다른 글

RabbitMQ 기본 기동  (1) 2014.01.02
RabbitMQ 공부 노트  (0) 2013.09.03
RabbitMQ + Spring  (0) 2013.08.27
RabbitMQ - Receive Message  (0) 2013.08.27
RabbitMQ - Send Message  (0) 2013.08.27