블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 

빠르게 훝어 보는 node.js - redis 사용하기 (ioredis 클라이언트 버전)


조대협 (http://bcho.tistory.com)


지난 포스팅에서 http://bcho.tistory.com/1098 node.js에서 redis 사용에 있어서 node-redis 클라이언트를 사용했는데, 조금 더 리서치를 해보니, node.js의 redis 클라이언트는 지난번에 포스팅한 node-redis 클라이언트와 ioredis라는 클라이언트가 가장 많이 사용된다. ioredis 클라이언트가 조금 더 최근에 나온 클라이언트인데, https://github.com/luin/ioredis


Bluebird promise 지원, 트렌젝션 지원등 훨씬 더 많은 기능을 제공하고, 사용법이 node-redis와 거의 유사하여 마이그레이션이 어렵지 않다.

아래 코드는 어제 작성 했던 코드를 ioredis 버전으로 변경한것인데, 코드를 보면 변경 내용이 거의 없음을 확인할 수 있다.


mongodb, redis, mysql 지원 모듈을 살펴보다가 느낀건데, 대부분의 모듈들이 Promise를 지원하고, 특히 bluebird를 지원한다는 것이다.

얼마전에 Async framework에 대해서 Async,bluebird, Q등을 고려했는데, 지금까지 인사이트로 봐서는 bluebird를 표준 프레임웍으로 해서 개발하는게 답이 아닐까 한다.


 

// redis example

var Redis = require('ioredis');

var redis = new Redis(6379,'127.0.0.1');

var JSON = require('JSON');

 

app.use(function(req,res,next){

      req.redis = redis;

      next();

});

app.post('/profile',function(req,res,next){

      req.accepts('application/json');

     

      var key = req.body.name;

      var value = JSON.stringify(req.body);

     

      req.redis.set(key,value,function(err,data){

           if(err){

                 console.log(err);

                 res.send("error "+err);

                 return;

           }

           req.redis.expire(key,10);

           res.json(value);

           //console.log(value);

      });

});

app.get('/profile/:name',function(req,res,next){

      var key = req.params.name;

     

      req.redis.get(key,function(err,data){

           if(err){

                 console.log(err);

                 res.send("error "+err);

                 return;

           }

 

           var value = JSON.parse(data);

           res.json(value);

      });

});

 

// catch 404 and forward to error handler

app.use(function(req, res, next) {

  var err = new Error('Not Found');

  err.status = 404;

  next(err);

});

 


Promise를 이용한 node.js에서 콜백헬의 처리


조대협 (http://bcho.tistory.com)


앞의 글(http://bcho.tistory.com/1083) 에서 async 프레임웍을 이용한 콜백헬을 처리 하는 방법에 대해서 알아보았다.

async 프레임웍 이외에, 콜백헬을 해결할 수 있는 프레임웍으로 promise가 있다.

Promise는 원래 콜백헬을 해결하기 위한 프레임웍이 아니라, 프로그래밍 패턴중의 하나로 지연 응답을 통해서 동시성을 제어 하기 위한 목적으로 만들어졌다. 자바스크립트에서는 JqueryDeferred, CommonJS에 구현되어 있고, ECMAScript5 표준에 포함되서 크롬,파이어폭스,인터넷익스플로러 9 버전등에 포함되어 있다.

구현체가 많아서 설치해야 한다.

node.js는 크롬의 자바스크립트 엔진을 기반으로 하기 때문에, promise가 내장되어 있다.

 

프로미스의 개념

 

asyncfunction이라는 비동기 함수가 있다고 가정하자. 이 함수는 param1,param2를 인자로 받아서 비동기로 처리하는 함수이다. promise 패턴에서는 이 asyncfunction을 호출하면, promise라는 것을 리턴한다. promise란 미래 결과에 대한 약속이다. 그리고 promise의 결과가 성공인지 실패인지에 따라서 이를 핸들링하기 위한 로직을 정의해놓는다. asyncfunction이 처리를 끝내고 결과를 리턴하면 promise에 의해 정의된 로직에 따라 결과값을 처리한다.

약간 말이 복잡한데, 이를 풀어서 설명해보면 다음과 같다.

 

·         프로그램      : asyncfunction에게 “param1param2로 처리해줘라고 부탁한다.

·         asyncfunction : “알았어 처리해줄께, 대신 시간이 걸리니 바로 답은 줄 수 없고, 나중에 답을 줄게. 이게 그 약속(promise)라고 하고, 약속(promise) 객체를 리턴한다.

·         프로그램      : ‘언제 끝날지 모르는 작업이구나그러면 이렇게 해줘. 작업이 성공하면 결과 처리 로직을 실행하게 하고, 만약에 실패하면 에러 처리 로직을 처리하게 하자. 이 내용을 니가 준 약속(promise)에 추가로 적어 넣을께

·         asyncfunction : 실행이 성공적으로 종료되었어. 아 아까 준 약속에 성공시에 처리하는 로직이 정의되어 있군. “결과처리로직를 실행하자

 

이런 내용이 어떻게 코드로 구체화 되는지를 살펴보자

 

var promise = asyncfunction(param1,param2);

promise.then(function(result){

      //결과처리로직

},function(err){

      //에러처리로직

}

 

Figure 1 promise를 이용한 비동기 호출 처리 예제

 

var promise = asyncfunction(param1,param2);

첫번째 코드에서 asyncfunction은 앞서 언급한것과 같이 비동기 함수이다. asyncfunction이 호출되고 나서 결과 값이 아니라, 나중에 결과를 주겠다는 약속(promise) 객체를 리턴한다.

 

다음으로 비동기 함수의 처리가 끝났을때 성공과 실패의 경우 어떻게 처리를 할지를 비동기 함수가 리턴한 약속(promise)에 기술해놓는다.이를 위해서 then이라는 키워드를 사용하는데 다음과 같은 포맷을 사용한다.

 

promise.then(결과처리함수(결과값) ,에러처리 함수(err) )

Figure 2 promise.then의 문법

 

비동기 함수 실행이 끝나면, then에 정의된 첫번째 함수 결과처리함수를 실행하여 비동기 함수 실행 결과를 처리한다.

이때 결과처리함수는 비동기 함수가 처리한 내용에 대한 결과 값을 인자로 갖는다.

만약에 에러가 발생하였을 경우에는 then에 두번째 인자로 정의된 에러처리함수를 실행하여 에러를 처리한다. “에러처리함수는 에러의 내용을 err이라는 인자를 통해서 받는다.

 

앞의 예제에서 2~6줄은 then을 이용하여, 첫번째 인자로 결과처리로직을 가지는 함수를 정의하고, 두번째 인자로에러처리로직을 갖는 함수를 정의했다.

 

promise.then(function(result){

      //결과처리로직

},function(err){

      //에러처리로직

}

Figure 3 프로미스에 결과 및 에러 처리를 지정하는 방법

 

 첫번째 결과처리로직을 갖는 함수는 비동기 함수가 리턴해준 결과값인 “result”를 인자로 받고, 두번째 에러처리로직을 갖는 함수는 에러내용을 “err”라는 인자로 받는다.

 

그렇다면 약속(promise)를 리턴하는 비동기 함수는 어떤 형태로 정의되어야 할까?

promise를 지원하는 비동기 함수는 아래와 같은 형태와 같다. 리턴시에 new Promise를 이용하여 promise 객체를 만들어서 리턴하는데, 이때 두가지 인자를 받는다. resolved reject인데, 성공적으로 실행이 되었으면 이 resolved함수 를 호출하고 이때 인자로 결과값을 넣어서 넘긴다. 반대로 실패했을 경우에는 인자로 받은 rejected 함수를 호출하되 호출 인자로 에러 내용을 담고 있는 err을 넣어서 넘긴다.

 

function asyncfunction(param1,param2){

     

      return new Promise(resolved,rejected){

           if(성공하였는가?){

                 // 성공하였을 경우

                 resolved ("결과");

           }else{

                 rejected(Error(err));

           }

      }

}

Figure 4 프로미스 지원 비동기 함수 정의 방법


프로미스 예제

 

그러면 위의 개념에 따라 실제로 작동하는 코드를 작성하자

 

var Promise = require('promise');

 

var asyncfunction = function(param){

      return new Promise(function(resolved,rejected){

           setTimeout(

                 function(){

                       resolved('hello'+param);

                 },2000);

      });

         

}

 

var promise = asyncfunction(' terry ');

promise.then(console.log,console.err); // 여기가 비동기 결과에 대한 콜백함

 

Figure 5 간단한 프로미스 함수 및 사용 예제

 

promise를 사용하기 위해서는 promise 모듈을 require 이용하여 불러들인다.

다음으로 asyncfunction을 정의하고 리턴값으로 Promise객체를 리턴한다. Promise 객체 안에서는 처리할 비지니스 로직이 정의되어 있다. 위의 예제에서는  setTimeout을 이용하여 2초를 기다리도록 하였고, 2초후에 콜백함수에서 resolved 함수를 호출하여 promise를 종료하도록 하였다.

 

다음은 이 promise를 리턴하는 비동기 함수를 실제로 호출하고, 이 비동기 함수에 대해서 성공 및 실패에 대한 처리 함수를 then으로 정의한 부분이다.

var promise = asyncfunction(' terry ');

promise.then(console.log,console.err); // 여기가 비동기 결과에 대한 콜백함


then을 이용하여, 성공시 console.log 함수를 호출하도록 하였고, 실패시에는 console.err를 통해서 에러 메시지를 출력하도록 하였다.

 

프로미스 체이닝 (promise chainning)

여러개의 비동기 함수를 순차적으로 실행하는 방법에 대해서 알아보자.

async 프레임웍의 waterfall과 같은 흐름 제어이다.

다음은 asyncfunction1,2,3를 순차적으로 실행하고, 앞 비동기 함수의 결과를 뒤에 따라오는 비동기 함수의 입력값으로 받아서 처리하는 예제이다.


 

var Promise = require('promise');

 

var asyncfunction1 = function(param){

      return new Promise(function(fullfilled,rejected){

           setTimeout(

                 function(){

                       fullfilled('result 1:'+param);

                 },1000);

      });

}

var asyncfunction2 = function(param){

      return new Promise(function(fullfilled,rejected){

           setTimeout(

                 function(){

                       fullfilled('result 2:'+param);

                 },1000);

      });

}

var asyncfunction3 = function(param){

      return new Promise(function(fullfilled,rejected){

           setTimeout(

                 function(){

                       fullfilled('result 3:'+param);

                 },1000);

      });

}

 

var promise = asyncfunction1(' terry ');

promise

.then(asyncfunction2)

.then(asyncfunction3)

.then(console.log);

 

Figure 6 프로미스 태스크 체이닝 예제

 

promise를 리턴하는 3개의 비동기 함수를 정의하였고, 첫번째 함수로 promise를 만든다음. 실행을 하였다. 다음 then을 이용하여, 다음번에 실행해야하는 비동기 함수 asyncfunction2, asyncfunction3를 순차적으로 정의하였고, 마지막의 최종 결과를 출력하기 위해서 최종 then console.log를 지정하여, 결과값을 출력하도록 하였다.

 

result 3:result 2:result 1: terry

 

하나의 예제를 더 살펴보자

다음 예제는 파일을 읽어서 읽은 내용을 다른 파일에 쓰는 내용이다.

 

 

var Promise = require('promise');

 

var fs = require('fs');

var src = '/tmp/myfile.txt';

var des = '/tmp/myfile_promise2.txt';

 

var fread = Promise.denodeify(fs.readFile);

var fwrite = Promise.denodeify(fs.writeFile);

 

fread(src,'utf-8')

.then(function(text){

           console.log('Read done');

           console.log(text);

           return fwrite(des,text); // 체이닝을 하려면 return 해줘야 .

      })

.then(function(){           

           console.log('Write done');

      })

.catch(function(reason){               

           console.log('Read or Write file error');

           console.log(reason);

});

 

console.log('Promise example');

 

Figure 7 프로미스를 이용해서 파일을 읽어서 다른 파일에 쓰는 예제

 

이 코드에서 먼저 주의 깊게 봐야 하는 부분은 denodeify 부분이다.


var fread = Promise.denodeify(fs.readFile);

var fwrite = Promise.denodeify(fs.writeFile);

 

node.js의 비동기 함수들은 프로미스패턴을 지원하지 않는 경우가 많다. 그래서 프로미스 패턴을 지원하지 않는 일반 함수들을 프로미스를 지원할 수 있는 형태로 변경을 해야 하는데, 이 변경을 해주는 함수가 Promise.denodeify이다.

프로미스화가 끝났으면 이 함수를 프로미스를 사용해서 호출할 수 있다.

 

fread(src,'utf-8')

.then(function(text){

           console.log('Read done');

           console.log(text);

           return fwrite(des,text); // 체이닝을 하려면 return 해줘야 .

      })

 


fread를 수행한 후에, then에서 return시 다음 비동기 함수인 fwrite를 수행한다. 이렇게 하면 task들을 체이닝할 수 있다.


프로미스 에러 핸들링

프로미스에서 에러를 핸들링하는 방법에 대해서 알아보자. 앞의 예제에서 then 중간에 catch라는 구문을 사용했는데, catch가 에러핸들러이다.

아래 코드를 보자 아래 코드는 비동기 함수에서 인위적으로 에러를 발생시켜서 처리 하는 코드이다.


 

var Promise = require('promise');

 

var asyncfunction = function(param){

      return new Promise(function(fullfilled,rejected){

           setTimeout(

                 function(){

                       rejected(Error('this is err '+param));

                 },2000);

      });

         

}

 

asyncfunction(' terry ')

.then(console.log,console.error);

 

asyncfunction('cath')

.then(console.log)

.catch(console.error);

 

Figure 8 프로미스에서 에러처리를 하는 예제

 

asyncfunction내의 프로미스에서 setTimeout으로 2초가 지나면, rejected를 이용하여 에러를 리턴하였다.

첫번째 asyncfunction호출에서는 then에 두개의 인자를 넘겼는데, 두번째 console.error가 에러 핸들러이다. 그래서 에러를 console.error로 출력하게 된다.

두번째 asyncfunction 호출에서는 다른 문법의 에러 핸들링을 사용했는데, then에 두개의 인자를 넘기는 대신, catch를 이용해서 에러 핸들러를 정의하였다.

이 예제를 실행하면 다음과 같은 결과를 얻게 된다.

 

[Error: this is err  terry ]

[Error: this is err cath]

Figure 9 프로미스에서 에러처리를 하는 예제 실행 결과

 

만약에 여러개의 태스크가 연결된 비동기 함수 체인을 호출할때 에러 처리는 어떻게 될까? 아래 코드를 보자.

asyncfunction1,2,3,4,5 가 정의되어 있고, 2 4에서 에러를 발생 시키도록 하였다.

그리고 3번과 5번 뒤에 catch를 넣어서 에러 처리를 하도록 하였는데, 그러면 에러 처리 흐름은 어떻게 될까?


var Promise = require('promise');

 

var asyncfunction1 = function(param){

      return new Promise(function(resolved,rejected){

           setTimeout(

                 function(){

                       console.log('func1');

                       resolved('func 1 success:'+param+'\n');

                 },500);

      });

}

var asyncfunction2 = function(param){

      return new Promise(function(resolved,rejected){

           setTimeout(

                 function(){

                       console.log('func2');

                       rejected(new Error('func 2 error:'+param+'\n'));

                 },500);

      });

}

var asyncfunction3 = function(param){

      return new Promise(function(resolved,rejected){

           setTimeout(

                 function(){

                       console.log('func3');

                       resolved('func 3 success:'+param+'\n');

                 },500);

      });

}

var asyncfunction4 = function(param){

      return new Promise(function(resolved,rejected){

           setTimeout(

                 function(){

                       console.log('func4');

                       rejected(Error('func 4 error:'+param+'\n'));

                 },500);

      });

}

var asyncfunction5 = function(param){

      return new Promise(function(resolved,rejected){

           setTimeout(

                 function(){

                       console.log('func5');

                       resolved('func 5 success:'+param+'\n');

                 },500);

      });

}

 

var promise = asyncfunction1(' terry ');

promise

.then(asyncfunction2)

.then(asyncfunction3)

.catch(console.error) // errorhandler1

.then(asyncfunction4)

.then(asyncfunction5)

.catch(console.error)  // errorhandler2

.then(console.log);

 

 

Figure 10 프로미스 태스크 체인에서 에러 처리를 하는 예제

 

3,5번 뒤에 붙은 catch는 어느 비동기 함수들의 에러를 처리할까? 다음 그림을 보자


Figure 11 프로미스 태스크 체인에서 에러 처리를 하는 예제의 에러 처리 흐름

 

1,2,3 번 뒤에 catch를 정의 했기 때문에, 1,2,3번을 수행하던중 에러가 발생하면 수행을 멈추고 첫번째 에러핸들러인 //errorhandler1으로 가서 에러를 처리한다. 여기서 중요한 점은 에러처리 후에, 다시 원래 제어 흐름으로 복귀한다는 것이다. 흐름을 끝내지 않고, 다음 에러핸들러에 의해서 통제 되는 4,5번을 수행한다. 4,5번의 에러는 4,5번 호출 뒤에 붙어 있는 catch //errorhandler2에 의해서 처리 된다. 마찬가지로 //errorhandler2에 의해서 실행이 된후에 맨 마지막 비동기 함수인 console.log를 실행하게 된다.

 

앞에서 2,4번에 에러를 냈으니 실제 흐름이 어떻게 되는지 확인해보자



Figure 12 프로미스 태스크 체인에서 에러 처리를 하는 예제의 실제 수행 흐름


asyncfunction 1,2가 실행되고 에러를 만나서 첫번째 catch에 의해서 에러 처리가 되고, 에러 처리 후 4번이 실행된후 에러를 만나서 두번째 catch가 실행이 되고 마지막에 정의된 console.log가 실행이 된다.

이 흐름을 그림으로 표현해보면 다음과 같다.

 

실행 결과는 다음과 같다.

 

func1

func2

[Error: func 2 error:func 1 success: terry

 

]

func4

[Error: func 4 error:undefined

]

undefined

 

Figure 13 프로미스 태스크 체인에서 에러 처리를 하는 예제를 실행한 결과 


프로미스 지원 프레임웍

지금까지 promise 모듈을 이용하여 promise 패턴을 이용한 비동기 패턴 처리를 알아보았다. 앞에서 살펴보았듯이 쉽게 콜백헬을 해결할 수 있다. async waterfall 흐름 제어와 동일한 흐름 제어 부분만 살펴보았지만, promise 역시, asyncseries, parallel등과 같은 다양한 흐름 제어 알고리즘을 지원한다.

 이 글에서는 promise 모듈을 사용하였지만, 이 프로미스 패턴을 지원하는 모듈은 이외에도 Q (https://github.com/kriskowal/q) , bluebird (http://bluebirdjs.com/docs/getting-started.html) 등 다양한 프레임웍이 있다.

근래에는 성능이나 기능 확장성이 좋은 bluebird가 많이 사용되고 있으니, 실제 운영 코드를 작성하기 위해서는 다른 프로미스 프레임웍도 검토하기 바란다.

 

참고

https://davidwalsh.name/promises

https://github.com/stackp/promisejs 예제가 좋음

http://programmingsummaries.tistory.com/325 정리가 잘되어 있음 추천.

빠르게 훝어 보는 node.js - async 프레임웍을 이용한 콜백헬의 해결


조대협 (http://bcho.tistory.com)


콜백헬의 정의

 

node.js는 자바스크립트의 콜백 패턴을 사용한다. 그래서 함수들을 순차적으로 실행하고자 할때 콜백 함수들의 중첩이 생겨서 코드가 복잡해지는 문제가 생긴다. 코드가 복잡해지고, 코드의 가독성이 떨어져서 유지 보수가 매우 힘들어진다.

 

파일을 읽어서 쓰는 코드를 보자

 

var fs = require('fs');

var src = '/tmp/myfile.txt';

var des = '/tmp/myfile_async.txt';

 

fs.readFile(src,'utf-8',function(err,data){

     

      console.log(data);

      if(err){

           console.log("Read file error");

      }else{

           console.log("Read file is done");

           fs.writeFile(des,data,function(err){

                 if(err){

                       console.log("Write file error");

                       return;

                 }

                 console.log("Write file is done");

           });

      }

});

 

Figure 1 파일을 읽어서 쓰는 코드에서 콜백이 중첩된 예제

 

파일을 읽은후에, 파일을 쓰려면, 파일을 읽는 함수 readFile에서 파일을 다 읽은 후에 호출되는 콜백 함수에서 writeFile 함수를 호출해야 한다.

만약에 위의 예제처럼 두개의 비동기 함수가 아니라, 여러개의 비동기 함수를 순차적으로 실행해야 한다면?

아래코드를 보면 6개의 비동기 함수를 순차적으로 호출하기 위한 코드인데, 콜백 함수가 6번 중첩이 되었음을 볼 수 있다.

알고리즘을 제외한 코드인데, 알고리즘이 들어가 있다면 코드는 훨씬 복잡해지게 된다.

 

 

asyncfunction(params,function(){

      asyncfunction(params,function(){

           asyncfunction(params,function(){

                 asyncfunction(params,function(){

                       asyncfunction(params,function(){

                             asyncfunction(params,function(){

                             });

                       });

                 });

           });

      });

});

 

Figure 2 callback hell의 개념

 

이러한 복잡성을 해결해주기 위해서, 자바스크립트에서는 몇몇 프레임웍을 제공하는데 대표적으로 사용되는 프레임웍으로는 async (https://github.com/caolan/async) promise.js (https://www.promisejs.org/) 가 있다.

Async

asyncnode.js의 콜백헬 문제를 풀기 위해서 개발되었지만, 현재는 브라우져에서도 사용이 가능하며 자바스크립 기반의 애플리케이션의 콜백헬 문제를 푸는데도 사용이 가능하다. 콜백헬 뿐 아니라, 20여가지의 추가 함수를 지원하고 있고, parallel과 같은 동시 수행이 가능한 코드의 동시성 제어로도 다양하게 사용이 가능하다.

여기서는 async에서 자주 사용되는 동시성 제어 흐름에 대해서 알아보도록 한다.

 

waterfall

waterfall은 흐름제어에 있어서 여러개의 비동기 함수를 순차적으로 실행하되, 앞의 비동기 함수의 결과 값을 뒤의 비동기 함수에 인자로 전달하는 흐름이다.

 



Figure 3 waterfall 흐름 제어의 개념


이 그림은 비동기 함수 asyncfunctionaA, asyncfunctionB,asyncfunctionC 를 순차적으로 실행하고, 각 단계에서 다온 리턴값을 다음 단계로 넘기는 waterfall 흐름의 개념을 표현하고 있다. 각각의 단계에서 처리되는 함수를 async에서는 task라고 정의한다.

task가 모두 수행이 끝나면, 맨 마지막에 정의된 callback 함수가 수행된다.

만약task 수행도중에 에러가 발생하면, task 수행을 멈추고 callback 함수를 바로 호출하는데, 이때 err라는 인자에 에러 내용을 채워서 넘긴다. 맨 마지막 callback함수는 errnull이면 정상적으로 모든 task들이 성공적으로 호출된것으로 처리하고, 만약에 null이 아닌경우 task 수행도중에 에러가 난것으로 파악하여 에러 처리를 한다.

 

waterfall의 문법을 살펴보자 


waterfall(tasks,[callback])

Figure 4 waterfall 제어 흐름 문법


waterfall에는 두가지 인자를 넘기도록 되어 있다.

·         첫번째 인자 tasks는 배열로, 순차적으로 실행될 함수들을 배열로 정의한다.

·         두번째 인자는 callbac(err,[result])으로, 모든 함수가 순차적으로 끝난후에 맨 마지막에 수행되는 함수이다. 또한 tasks를 실행하다가 에러가 발생하면 이 최종 callback을 호출한다.
task
를 실행하다가 에러가 발생하면 실행을 멈추고 이 최종callback으로 첫번째 인자인 err에 에러 내용을 넣어서 전달한다., 만약에 에러가 발생하지 않았을 경우에는 모든 task 완료한 후에 err‘null’을 전달한다. 두번째 인자는 생략이 가능한데, 마지막 tasks에서 넘어온 결과에 대한 값을 저장하고 있는 변수 이다.
선택적으로 result 인자를 정의할 수 있는데, 이 경우 waterfall 에 정의된 task의 맨마지막 task (최종callback 이전에 바로 실행된 task)의 리턴값을 넘겨 받는다.

 

이해를 돕기 위해서 코드를 보자. 아래 코드는 위의 그림에 표현된 asyncfunctionA,B,C를 순차적으로 호출하는 흐름을 waterfall로 표현한 슈도 코드이다. (개념을 돕기위한 코드로 실제로 실행이 되지는 않는다).

var async = require('async');

 

async.waterfall([

              function(callback){

                 asyncfunctionA(param,callback);

              },

              function(resultA,callback){

                 asyncfunctionB(resultA,callback);

              },

              function(resultB,callback){

                 asyncfunctionC(resultB,callback);

              }

             ],

             function(err,resultC){

                       if(err) errorHandler(err);
                             // handle resultC

                  }

);

 

Figure 5 waterfall 제어 흐름의 사용 방법 (psedo code)

 

waterfall함수에 첫번째 인자는 배열 형태로 function(callback), function(resultA,callback),function(resultB,callback)을 기술하였다. 각 함수에서는 우리가 호출할 비동기 함수 asyncfunctionA,B,C를 각각 호출하였다.

배열에 인자로 들어가 있는 각 함수는 맨마지막 인자로 callback을 전달 받는데, callback은 다음 호출한 함수를 지칭한다.

맨 처음 호출한 function(callback)에서 이 callbackfunction(resultA,callback)을 지칭하고, 여기에 있는 callback은 다음  function(resultB,callback)을 지칭한다.

 

function(callback){

                 asyncfunctionA(param,callback);

              }

 

에서 asyncfunctionA에서 callback을 인자로 넘겼는데, 원래 asyncfunctionA의 함수 정의가 다음과 같다.

asyncfunctionA = function(param,function(resultA){

                 }

 

asyncfunctioA는 비동기 함수로 실행이 끝나면 callback함수를 호출하게 되어 있는데, callback함수의 인자는 resultA를 받게되는 있는 형태이다.

그런 이유로waterfall 로 넘겨지는 함수 배열중 두번째 함수의 형이 function(resultA,callback) 형태를 띄게 되는 것이다.

 

실제로 작동하는 코드를 구현해 보자. 아래 코드는 앞서 async없이 작성했던 파일을 읽어서 다른 파일에 쓰는 코드를 asyncwaterfall을 이용하여 구현한 예이다.

 

아래 예제를 실행하기 위해서는 package.json“async” 의존성을 추가하거나, 또는 실행 환경에서

%npm install async

를 실행해서 async 모듈을 설치해야 한다.

 

var async = require('async');

 

var fs = require('fs');

var src = '/tmp/myfile.txt';

var des = '/tmp2/myfile_async.txt';

 

async.waterfall([

              function(callback){

                 fs.readFile(src,callback);

              },

              function(data,callback){

                 fs.writeFile(des,data,callback);

              }

             ],

             function(err){

                       if(err) console.log(err);

                  }

);

 

 

Figure 6 waterfall 흐름제어를 이용하여 파일을 읽어서 다른 파일에 쓰는 예제

 

waterfall에서 처음 호출하는 함수에서는 fs.readFile을 이용해서 파일을 읽었다.

function(callback){

                 fs.readFile(src,callback);

              },

다음으로 fs.readFile에 대한 콜백 함수를 waterfall에서 넘겨주는 callback의 형태는 fs.readFile의 포맷이 fs.readFile( filename, function(err,data)) 형태이기 때문에 앞의 err 인자이외에 ‘data’ 인자만 필요하다.

그래서 다음에 오는 함수가 다음과 같이 ‘data 인자를 갖는 function(data,callback)이다.

function(data,callback){

                 fs.writeFile(des,data,callback);

              }

 

인자로 받은 datafs.writeFile에 넘겨서 파일을 쓰게 한다.

마지막 부분은 최종 콜백 함수로, 모든 함수가 실행이 끝나면 실행이 되고 또는 waterfall에 정의된 task 실행중에 에러가 나도 실행이 되는 부분이다. 첫번째 인자로 항상 err를 받는다. 에러가 없을 경우에는 이 값은 null 이된다.

function(err){

                       if(err) console.log(err);

                  }

 


본 예제에서는 파일 쓰기가 완료된 후에, 별도의 액션은 취하지 않아서 별다른 코드가 없지만, 에러가 발생했을때 처리하기 위해서 if(err)를 통해서 에러가 있으면 콘솔로 출력하도록 하였다.

 

series

series 흐름은, waterfall가 유사하게 정의된 task를 순차적으로 실행한다.

차이는 waterfall은 각 task에서 나온 결과를 다음 task의 입력으로 넘겼다면,

series는 각 task의 결과를 취합하여, 최종 callback에 배열 형태로 넘겨준다.

 

개념도를 보면 다음과 같다. series 흐름에 전달된 asyncfunctionA,B,C를 순차적으로 실행하고, 그 결과를 취합해서 맨 마지막 callbackresults라는 배열로 넘긴다.

waterfall과 마찬가지로 task수행중에 에러가 나면 실행을 중단하고, 최종 callback로 흐름을 옮기고, err에 에러에 대한 디테일한 내용을 기술해놓는다.



Figure 7 series 흐름 제어의 개념

이때 results 배열에는 task들의 결과값이 실행 순서대로 들어간다. 위의 그림에서 최종 callback으로 전달되는 results  배열에 asyncfunctionA에 대한 결과값 resultA, 두번째 인자는 asyncfunctionB의 결과값 B, 그리고 마지막 세번째 인자는 asyncfunctionC의 결과값 C가 들어간다.

 

series 흐름제어의 문법을 보자


series(tasks, [callback] )

·         tasks : 동시에 수행할 함수들을 배열로 정의

·         callback : 최종 callback으로, callback(err,results) 형태로 정의된다. 에러가 발생하면 err 변수에 에러에 대한 내용이 넘어오고, 정상적인 수행 완료인 경우에는 errnull로 전달되고, task의 실행 결과가 results 변수에 배열로 정의되서 리턴된다.

 

아래 예제는 위의 series 흐름을 async 프레임웍을 통해서 구현한 예제이다.


var async = require('async');

 

async.series([

              function(callback){

                 callback(null,'resultA');

              },

              function(callback){

                 callback(null,'resultB');

              },

              function(callback){

                 callback(null,'resultC');

              }

             ],

             function(err,results){

                       if(err) console.log(err);

                       console.log(results)

                             // handle resultC

                  }

);

 

Figure 8 sereis 흐름 제어를 사용한 예

실행하면 다음과 같은 결과가 나온다.

 




 

series 흐름은 서로 데이타에 대한 의존성은 없지만 순차적으로 실행이 되어야 하는 경우등에 활용이 될 수 있다.

예를 들어 사용자 정보가 MySQLMongoDB에 분산 저장되어 있고, MySQL에는 사용자ID와 암호화된 비밀번호를 저장하고, 기타 다른 정보를  MongoDB에 저장한다고 가정할때, 새로운 사용자 생성은 MySQL에 사용자ID등의 정보를 저장한 후에, MongoDB에 순차적으로 저장해야 한다면, series 흐름이 유용하게 사용될 수 있다.

 

parallel

async 모듈에서 마지막으로 살펴볼 흐름제어는 parallel이다

이름에서도 볼 수 있듯이 동시에 여러개의 task를 실행하는 방법으로, 마치 멀티 쓰레드와 같은 효과를 낼 수 있어서, 실행 시간을 단축시킬 수 있다.

 

아래 개념 그림을 보자. 3개의 task를 병렬로 동시에 수행하는 개념이다.

asyncfunctionA,B,C를 동시에 수행하고 모든 작업이 끝나면 최종 callback을 수행한다.

수행결과는 최종 callback에 배열 형태로 전달된다.



Figure 9 parallel 흐름 제어의 개념

 

에러 처리는 parallel로 수행중이던 task중에 에러가 발생하면, 바로 최종 callback에 에러를 넘긴다. 단 이때 에러가 발생하지 않은 task들은 수행을 멈추지 않고 끝까지 수행되다.

 

parallel흐름 제어를 사용할때 주의해야 할점은 멀티 쓰레드처럼 작업을 수행해주는 것이지 실제 멀티 쓰레드가 아니다. IO작업등이 있는 task들의 경우 IO 요청을 보내놓고, 응답이 올때 까지 다른 task를 실행해서 병렬로 실행하는 것과 같은 효과를 주는 것이다. 만약에 task자체가 IO작업등이 없고 계속해서 CPU를 사용한다면, 그 작업이 끝난후에 다음 task로 넘어가기 때문에, 병렬 처리가 일어나지 않는다. (이런 경우에는 series를 쓰는게 나음)

 

parallel이 효과적으로 사용될 수 있는 곳은 IO쪽인데, 원격으로 여러개의 REST API를 동시 호출하거나, 또는 동시에 여러개의 쿼리를 조회하는 것들에 효과적으로 사용할 수 있다.

 

parallel 흐름 제어의 문법은 다음과 같다. series 흐름 제어 문법과 거의 동일하다고 보면 된다.

parallel(tasks,[callback])

Figure 10 parallel 흐름 제어 문법

·         tasks : 동시에 수행할 함수들을 배열로 정의

·         callback : 최종 callback으로, callback(err,results) 형태로 정의된다. 에러가 발생하면 err 변수에 에러에 대한 내용이 넘어오고, 정상적인 수행 완료인 경우에는 errnull로 전달되고, task의 실행 결과가 results 변수에 배열로 정의되서 리턴된다.

 

예제 코드를 살펴보자.

var async = require('async');

 

async.parallel([

              function(callback){

                 callback(null,'resultA');

              },

              function(callback){

                 callback(null,'resultB');

              },

              function(callback){

                 callback(null,'resultC');

              }

             ],

             function(err,results){

                       if(err) console.log(err);

                       console.log(results)

                             // handle resultC

                  }

);

 

Figure 11 parallel 흐름 제어 예제

 

parallel 흐름 제어의 문법은 series와 다르지 않다. 단지 내부 수행에 있어서 순차적으로 수행을 하는지 아니면 병렬로 동시에 수행을 하는지에 따른 차이만 있다. 위의 코드는 resultA, resultB, resultC를 내는 3개의 task를 동시에 수행하고, 수행이 끝나면, 최종 콜백 함수인 function(err,results)에서 results 배열에 결과를 출력하는 코드이다. 코드를 실행하면 다음과 같이 callback(null, 결과값)으로 넘긴 resultA,resultB,resultC 문자열이 출력 되는 것을 확인할 수 있다.

 



Figure 12 parallel 흐름 제어 예제 코드 실행 결과

 

실제 코드를 보면서 이해를 돕자.다음은 소스 코드 저장소인githubbwcho75라는 사용자 정보를 조회하는 REST API, bwcho75사용자의 follower를 조회하는 REST API 두개를 parallel을 이용해서 동시에 호출하여 결과를 화면에 출력하는 코드이다.

간단하게 REST 호출을 도와주는 모듈로 unirest를 사용하였다. http://unirest.io/

모듈을 사용하기 위해서 npm을 이용하여 코드를 작성하기 전에 unirest 모듈을 설치한다.

%npm install unirest

 

var async = require('async');

var unirest = require('unirest');

 

var start = new Date().getTime();

async.parallel([

                function(callback){

                      unirest.get('https://api.github.com/users/bwcho75')

                      .header('Accept', 'application/json')

                      .header('User-Agent','mynodeapplication')

                      .end(function(response){

                           callback(null,response.body);

                      })

                     

                },

                function(callback){

                      unirest.get('https://api.github.com/users/bwcho75/followers')

                      .header('Accept', 'application/json')

                      .header('User-Agent','mynodeapplication')

                      .end(function(response){

                           callback(null,response.body);

                      })                  

                }

                ],

                function(err,results){

                             console.log('Result 1 -------');

                             console.log(results[0]);

                             console.log('Result 2 -------');

                             console.log(results[1]);

                             console.log('elapsed time : '+(new Date().getTime() - start));

     

});

 

 

Figure 13 asyncparallel 흐름 제어를 이용하여 두개의 github REST API를 호출하는 예제

 

코드를 실행하면 다음과 같이 Result1, Result2에 대한 결과를 얻은것을 볼 수 있다.

Result 1 -------

{ login: 'bwcho75',

  id: 3168358,

  avatar_url: 'https://avatars.githubusercontent.com/u/3168358?v=3',

  gravatar_id: '',

  url: 'https://api.github.com/users/bwcho75',

  html_url: 'https://github.com/bwcho75',

  : 중략

  followers: 7,

  following: 0,

  created_at: '2013-01-02T10:33:11Z',

  updated_at: '2016-02-02T02:38:53Z' }

Result 2 -------

[ { login: 'yshu0307',

    id: 1740343,

  : 중략

    type: 'User',

    site_admin: false },

  { login: 'kmoonki',

    id: 1725366,

  : 중략

    type: 'User',

    site_admin: false },

  { login: 'z-n',

    id: 5715797,

  : 중략

    type: 'User',

    site_admin: false },

  : 중략

]

elapsed time : 1915

Figure 14 parallel 흐름 제어를 이용하여 두개의 github REST API를 실행한 결과


병렬로 API가 호출되었는지를 확인 하기 위해서, 맨 마지막 부분에 API 호출에 소요된 시간을 출력하였는데, 예제 코드에서 async.parallelasync.series로 바꿔서 호출해보면 순차 호출로 바뀌게 되는데, 수행시간이 본인이 테스트한 경우 200ms정도 더 나왔다. 즉 병렬 호출을 통해서 200ms 정도의 수행 시간을 절약한것이다.

 

지금까지 async 모듈을 이용하여 콜백헬을 해결하고, 제어 흐름을 컨트롤할 수 있는 방법에 대해서 알아보았다. async 모듈에서 위의 3가지 흐름제어가 많이 사용되기는 하지만 이외에도 많은 흐름 제어 방식이 있기 때문에, https://github.com/caolan/async 를 참고하기 바란다.

 


 

 

Worker 대한 개념 설명

[개인 공부 노트이기 때문에 설명이 매우 어렵습니다. 나중에 이해하면 다시 개념 정리해서 올리도록 하겠습니다.]

관련 코드 : https://github.com/bwcho75/vertx_study/tree/master/worker_sample




앞단의 Network 핸들러 (TCP,HTTP)등에서 request 읽은 후에, Event Bus 통해서 Backend Worker 보낸다. 개념은 JMS MQ등을 이용해서 뒷단에서 Message Consumer 들이 처리하는 Q 기반의 Async 기반의 개념과 매우 유사하다.

그럼 Vert.x에서 차이점은 Worker 작업을 처리한 후에, 작업을 끝내면 작업 완료 메시지가 Message Producer ( Network Handler)에게 Call back 형태로 전달 된다.

Async – Call back Pattern. 일반적인 메시지 기반의 시스템이 Async-Fire & Forget 패턴을 사용하는 것에 비하면 상당히 메리트가 많다.

Network Handler입장에서는 Connection 물고 있기는 하지만, 작업은 하지 않고 뒷단의 Worker에서 처리하기 때문에, 많은 request 받아드릴 있고, 아주 많은 작업이 온다하더라도 뒷단의 Queue 통해서 비동기 처리되기 때문에, Timeout 제약이 없는 , 많은 양의 request 처리할 있다.

Call back패턴이기 때문에, Async 메시지를 보낼 , Callback function 바인딩 해야 한다.

EventBus.send('call_bus',key,reply_handler)

// reply_handler call back이다.

만약에 http 경우 reply handler에서 해당 connection reply 메시지를 보내고 싶을때는 http handler request 객체를 reply_hanlder pass해야 하는데, reply_handler 인터페이스 규격에는 message parameter 되어 있지, request response 객체와 같은 다른 parameter 추가적으로 넘길 수가 없다. 이를 해결 하는 방법은 inner function 사용하는 방법이 있다.

def url_handler(req):

  # read parameter from URI

  key = req.params['key']

  def reply_handler(message):

          reply_handler_logic(req,message) 

  print 'Im url handler. I just sent message to event bus '

EventBus.send('call_bus',key,reply_handler)

 

위의 코드를 보면, reply_handler 함수 내에서 reply_handler_logic이라는 함수를 호출할 때, “req” 객체를 넘기는 것을 볼 수 있다. reply_handlerurl_handler안에 있는 inner function이기 때문에, url_handler 범위 안의 변수를 모두 사용할 수 있는 것이고, 결과적으로 request 객체를 넘길 수 있는 것이 되낟.

Vertx에서는 앞단의 Message Handler 경우 하나의 Thread 독립된 Class Loader에서 실행된다. 이를 instance라고 하고, 하나의 JVM에서는 여러 개의 instance 수행할 있으나, Thread 수를 CPU Core 수보다 많이 경우 Thread Context Switchin 대한 부하가 생기기 때문에 일반적으로 core 보다 작게 설정한다. Network Handler Verticle 반드시 같은 쓰레드에서 수행된다.

그러나 Worker 경우에는 특정 Thread에서 같은 Veticle 생성되는 것이 아니라 WAS 같은 Thread Pool 형식을 사용한다. 하나의 Worker Verticle 여러 개의 Thread에서 동시에 수행될 있다.  

http://purplefox.github.io/vert.x/manual.html#worker-verticles 보면, worker 동시에 하나의 thread에서만 수행된다고 나와 있는데, “Worker verticles are never executed concurrently by more than one thread. Worker verticles are also not allowed to use TCP or HTTP clients or servers. Worker verticles normally communicate with other verticles using the vert.x event bus, e.g. receiving work to process.

실제 테스트해보면 instance 수만큼 동시 수행되는 듯하다.

앞단의 NetworkHandler에서 Worker로의 메시지는 Eventbus(일종의큐) 통해서 전달되낟. EventBus 내부적으로 DataGrid Hazle Cast 사용한다.

vertx.deploy_verticle('http_server.py') 통해서 기동하고 worker 경우
vertx.deploy_worker_verticle('worker.py')
이용해서 기동한다.

만약에 worker verticle 수를 조정하고 싶으면

vertx.deploy_worker_verticle('worker.py','{"dummy":"dummy"}',10)

같이 한다.(Vertx 내부적으로 Worker thread pool 설정하는 코드는 https://github.com/eclipse/vert.x/blob/master/vertx-core/src/main/java/org/vertx/java/core/impl/VertxExecutorFactory.java)