블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 

구글 클라우드의 대용량 분산 큐 서비스인 Pub/Sub 소개 #2


node.js를 통하여 메세지를 보내고 받기

조대협 (http://bcho.tistory.com)


node.js에서 메세지 보내고 받기


이번 글에서는 node.js를 이용하여 실제로 pub/sub에 메세지를 보내고 받도록 해보자


키 파일 준비 하기

Pub/Sub에 접속하기 위해서는 보안 인증을 위해서 키 파일이 필요하다.

키 파일은 구글 클라우드 콘솔에서, API manager 메뉴로 들어가서 Credential 부분에서 Create Credential을 선택하면 아래와 같은 화면이 나온다.

다음으로, 메뉴에서 Service account key를 선택하여 키를 생성한다.


키가 생성이 되면 json 파일로 다운로드가 된다.

여기서는 편의상 키 파일명을 “pubsub-key.json”이라고 하겠다.


메세지 보내기

node.js를 이용해서 메세지를 보내보자. 먼저 코드를 보자


var gcloud = require('gcloud');

var pubsub = gcloud.pubsub({

projectId:'terrycho-sandbox',

keyFilename: '/Users/terrycho/keys/pubsub-key.json'

});


var topic = pubsub.topic('projects/terrycho-sandbox/topics/repository-changes.default');


for(var i=0;i<3;i++){

topic.publish({

data:{

userId:process.argv[2]+i,

name:'terry.cho'

}

},function(err){

if(err != null) console.log("Error :"+err);

});

};


pub/sub을 사용하려면 구글 클라우드 라이브러리인 gcloud 모듈이 필요하다.

명령어 창에서 npm install gcloud를 이용해서, gcloud모듈을 먼저 인스톨 해놓자.

다음으로, gcloud라이브러리에서 pubsub 객체를 만든다. 여기서는 projectId와, keyFilename을 지정한다.

projectId는 사용하고자 하는 본인의 구글 프로젝트 ID를 넣으면 되고 (여기서는 ‘terrycho-sandbox’), 키 파일은 앞에서 준비한 키 JSON 파일의 경로를 설정하면 된다.


pubsub 객체가 생성되었으면, 메세지를 보낼 topic을 가져와야 한다. topic은 다음과 같은 pubsub.topic메서드를 이용해서 불러올 수 있다. 이때, topic의 경로를 아래와 같이 적어준다.

var topic = pubsub.topic('projects/terrycho-sandbox/topics/repository-changes.default');

여기서 사용한 topic은 projects/terrycho-sandbox/topics/repository-changes.default 이다.

topic을 받아왔으면, 이 topic에 실제로 메세지를 publish하면 되는데, topic.publish( {메세지},{error callback 함수}); 형태로 지정하면 된다.


pub/sub은 앞의 글에서 설명한바와 같이, message와, message attribute 두가지로 분리가 되는데,

message는

data :{

// 여기에 메세지 정의

}


형태로 정의해서 전달하고,message attribute는

attributes:{

  key1:’value1’,

  key2:’value2’
}


형태로 전달한다.

실제 사용예를 보면 다음과 같다.

var registerMessage = {
 data: {
   userId: 3,
   name: 'Stephen',
   event: 'new user'
 },
 attributes: {
   key: 'value',
   hello: 'world'
 }
};


위의 보내기 예제에서는 userId와, name 필드 두개만, 메세지로 3번 보내도록 하였다.

data:{

userId:process.argv[2]+i,

name:'terry.cho'

}

메세지 받기

메세지를 전달하였으면 이제 메세지를 읽어보도록 한다. 전체 코드는 다음과 같다.


var gcloud = require('gcloud');

var pubsub = gcloud.pubsub({

projectId:'terrycho-sandbox',

keyFilename: '/Users/terrycho/keys/pubsub-key.json'

});


var topic = pubsub.topic('projects/terrycho-sandbox/topics/repository-changes.default');

var options = {

 reuseExisting: true, // if the subscription is already exist reuse subscription, option is not changed

 interval:10,

 maxInProgress:5,

 autoAck:false

};


topic.subscribe('nodejs-subscription',options,function(err,subscription,apiResponse){

if(err != null){

console.log('subscription creation failed :'+err);

exit(1);

}

console.log('Subscription :'+subscription);

subscription.on('error',function(err){

console.log('error:'+err);

});


subscription.on('message',function(message){

// read message from queue

console.log(message);

// send ack

subscription.ack(message.ackId,function(err,apiResponse){

console.log('info:sent ack');

});

});

});

topic을 생성하는 것까지는 앞의 메세지 보내기 부분의 코드와 같다.

topic으로 부터 메세지를 받기 위해서, subscription에 대한 옵션을 설정한다.


var options = {

 reuseExisting: true,

 interval:10,

 maxInProgress:5,

 autoAck:false

};

reuseExisting은 별도로 subscription을 생성하지 않고, 기존의 subscription을 사용한다. 이 경우에는, 기존 subscription의 옵션이 그대로 적용되며, 새로운 option이 적용되지 않는다.

interval은 10초 단위로 subscription을 polling 하는 것이고, maxInProgress는 한번에 읽어올 수 있는 메세지 수를 정의한다.

autoAck는 메세지를 보낸 후에, 자동으로 ack를 보내는 옵션인데, 여기서는 false로 하였기 때문에, 수동으로 ack를 보내야 한다.


옵션을 정의하였으면 아래와 같이 topic에 대한 subscription에 대해서, 메세지를 subscription 한다.

topic.subscribe('nodejs-subscription',options,function(err,subscription,apiResponse){

‘nodejs-subscription’은 subscription 이름이고, options는 subscription에 대한 옵션 그리고 마지막은 콜백함수 있다.

메세지를 받았을때 처리 방법을 정의해야 하는데, 앞의 콜백 함수에서 전달되는 subscription객체에 “message”라는 이벤트에 대해서 핸들러를 작성하면 메세지를 받을 수 있다.


아래는 핸들러 코드이다.

subscription.on('message',function(message){

// read message from queue

console.log(message);

// send ack

subscription.ack(message.ackId,function(err,apiResponse){

console.log('info:sent ack');

});

});


메세지가 들어오면 console에 메세지를 출력하고, subscription.ack를 이용하여 ack를 보낸다. 이때, 메세지에서 들어오는 message.ackId를 인자로 하여, 그 메세지에 대해서 ack를 보낸다.


다음은 명령어를 실행하여 메세지를 보내는 실행화면이다.


그리고 다음은 메세지를 받는 프로그램을 수행하여 실제로 메세지를 받은 결과 화면이다.




참고

  • node.js pub/sub 라이브러리 레퍼런스 https://googlecloudplatform.github.io/gcloud-node/#/docs/v0.36.0/pubsub


분산 대용량 큐-Apache Kafka에 대한 검토 내용 정리


실시간 빅데이타 분석 아키텍쳐를 검토하다가 아파치 스톰을 보다보니, 실시간 데이타 스트림은 큐를 이용해서 수집하는 경우가 많은데, 데이타의 양이 많다 보니 기존의 큐 솔루션으로는 한계가 있어서 분산 대용량 큐로 아파치 카프카(Kafka)가 많이 언급된다.

그래서, 아키텍쳐를 대략 보고, 실효성에 대해서 고민을 해봤는데, 큐의 기능은 기존의 JMS나 AMQP 기반의 RabbitMQ(데이타 기반 라우팅,페데레이션 기능등)등에 비해서는 많이 부족하지만 대용량 메세지를 지원할 수 있는 것이 가장 큰 특징이다. 특히 분산 환경에서 용량 뿐 아니라, 복사본을 다른 노드에 저장함으로써 노드 장애에 대한 장애 대응 성을 가지고 있기 때문에 용량에는 확실하게 강점을 보인다.

실제로 마이크로소프트 社의 엔지니어가 쓴 논문을 보면http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf

 


카프카의 경우 10만 TPS 이상의 성능을 RabbitMQ는 2만 TPS 정도의 성능을 내는 것으로 나와 있는데, 여기서 생각해볼 문제가 큐는 비동기 처리 솔루션이다. 즉 응답 시간에 그렇게 민감 하지 않다는 것이다.

그리고 일반적인 웹 시스템의 성능이 1500~2000 TPS (엔터프라이즈 시스템의 경우) 내외인 것이 일반적이기 때문에, Rabbit MQ의 2만 TPS의 성능은 충분하다고 볼 수 있지 않을까 한다.

물론 네이버나 해외의 대형 SNS 서비스의 경우에는 충분히 저정도의 용량이 필요하겠지만, 현재로써는 일반적인 시스템에서는 카프카의 용량과 성능은 약간 오버 디자인이 아닌가 하는 생각이 든다.


Rabbit MQ is scalable and




http://wiki.secondlife.com/wiki/Message_Queue_Evaluation_Notes

RabitMQ가 Amazon SQS에 비해 40배 가량 빠름.
AMQ나 RabitMQ를 클라우드에서 서비스하기 위해서는
1. SQS나 Azure Queue Service와의 호환성 문제 --> jCloud등 사용
2. Multitanent 문제 --> Pending Message가 장애를 발생시켜서 다른 업무나 사용자에게 방해를 줄 수 있다.
3. Global Scale Deployment --> DR과 Data Center간 Synchroization

이거 재미는 있겠는데.... 난이도가 무지 높겠다.. ROI가 쉽지 않겠어