package com.terry.rabbitmq.queue.threadpool;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
/**
 * Starts a group of RabbitMQ consumers that share one connection, with one
 * channel per consumer, backed by a fixed-size thread pool.
 */
public class QueueListener {
    static final Logger log = LoggerFactory.getLogger(QueueListener.class);

    /**
     * Connects to the broker and registers {@code maxthread} consumers on the queue.
     *
     * <p>The method returns once the consumers are registered; deliveries are then
     * handled by {@link MyQueueConsumer} instances on the connection's executor.
     * If any step of the start-up fails, the connection (when already opened) and
     * the thread pool are released before the exception propagates.
     *
     * @param uri       AMQP URI of the broker
     * @param queue     name of the queue to consume from
     * @param maxthread size of the thread pool and number of channels/consumers to create
     * @throws Exception if the URI cannot be applied, the connection cannot be
     *                   opened, or a channel/consumer cannot be set up
     */
    public void invoke(String uri, String queue, int maxthread) throws Exception {
        log.info("QueueListener has been started");
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(uri);

        ExecutorService es = Executors.newFixedThreadPool(maxthread);
        Connection conn = null;
        boolean started = false;
        try {
            conn = factory.newConnection(es);
            // Create as many channels as there are threads so that each
            // consumer works on a channel of its own.
            for (int i = 0; i < maxthread; i++) {
                Channel channel = conn.createChannel();
                channel.basicQos(1);
                // autoAck is false: the consumer acknowledges each message itself.
                channel.basicConsume(queue, false, new MyQueueConsumer(channel));
            }
            started = true;
        } finally {
            if (!started) {
                // Start-up failed part-way: do not leave the connection open or
                // the pool alive, since the caller has no handle to clean them up.
                if (conn != null) {
                    conn.abort();
                }
                es.shutdownNow();
            }
        }
        log.info("Invoke {} thread and wait for listening", maxthread);
    } //invoke

    final static String host = "127.0.0.1";
    final static String vhost = "";
    final static int port = 5672;
    final static String user = "rabbitmq";
    final static String password = "rabbitmq";
    final static String queue = "simplequeue";

    /**
     * Example entry point: listens on {@link #queue} of the local broker with 5 consumers.
     *
     * @param args unused
     * @throws Exception if the listener cannot be started
     */
    public static void main(String[] args) throws Exception {
        QueueListener ql = new QueueListener();
        // The virtual host segment ("/" + vhost) is intentionally left out of the URI.
        String uri = "amqp://" + user + ":" + password + "@" + host + ":" + port;
        ql.invoke(uri, queue, 5);
    }
}
package com.terry.rabbitmq.queue.threadpool;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
 * Consumer that logs every delivered message at debug level and then
 * acknowledges it on the channel it was created with.
 */
public class MyQueueConsumer extends DefaultConsumer {
    static final Logger log = LoggerFactory.getLogger(MyQueueConsumer.class);
    final Channel channel;

    /**
     * @param channel channel this consumer is registered on; also used to send acks
     */
    public MyQueueConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    /**
     * Handles one delivery: decodes the body, logs it, and acknowledges the message.
     *
     * @param consumeTag consumer tag associated with this consumer
     * @param envelope   delivery metadata; supplies the delivery tag used for the ack
     * @param properties message properties (not used here)
     * @param body       raw message payload
     * @throws IOException if the acknowledgement cannot be sent
     */
    @Override
    public void handleDelivery(String consumeTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body)
            throws IOException {
        long deliveryTag = envelope.getDeliveryTag();

        // message handling logic here
        // Decode with an explicit charset instead of the platform default so the
        // result does not vary between hosts.
        // NOTE(review): assumes publishers send UTF-8 text - confirm against the sender.
        String msg = new String(body, StandardCharsets.UTF_8);

        if (log.isDebugEnabled()) {
            // The UUID only tags this log line, so skip generating it when debug is off.
            UUID uuid = UUID.randomUUID();
            log.debug(uuid + " S Channel :" + channel + " Thread:" + Thread.currentThread() + " msg:" + msg);
        }

        // multiple = false: acknowledge only the message with this delivery tag,
        // not every earlier unacknowledged message on the channel.
        this.channel.basicAck(deliveryTag, false);
    }
}
'클라우드 컴퓨팅 & NoSQL > RabbitMq' 카테고리의 다른 글
RabbitMQ 기본 기동 (1) | 2014.01.02 |
---|---|
RabbitMQ 공부 노트 (0) | 2013.09.03 |
RabbitMQ + Spring (0) | 2013.08.27 |
RabbitMQ - Receive Message (0) | 2013.08.27 |
RabbitMQ - Send Message (0) | 2013.08.27 |