블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 

Serveless를 위한 오픈소스 KNative #2 Eventing


조대협 (http://bcho.tistory.com)


knative의 다른 모듈로써는 비동기 메세지 처리를 위한 eventing 이라는 모듈이 있다. 카프카나, 구글 클라우드 Pub/Sub, AWS SQS와 같은 큐에서 메시지를 받거나 또는 Cron과 같은 타이머에서 이벤트가 발생하면 이를 받아서 처리할 수 있는 비동기 메커니즘을 제공하는 모듈이라고 보면 된다.


메시지 큐나 cron 과 같이 이벤트를 발생 시키는 자원들은 knative에 event source 라는 Custom Resource로 등록이 되고, 등록된 event source는 이벤트가 발생되면 지정된 knative 서비스로 이벤트 메시지를 HTTP로 전송한다. 이때 이벤트를 받는 knative 서비스는 앞에서 언급한 knative serving의 서비스이다. 이때 이벤트에 대한 스펙은 CNCF Serverless WG 에서 정의한 CloudEvents 스펙에 기반한다.

Hello Eventing

자세하게 Eventing에 대해서 알아보기 전에 간단한 예제를 살펴보자. 예제는 knative.dev의 cronjob  예제이다.  Crontab으로 이벤트를 생성하면, event-display 라는 서비스에서 이 이벤트를 받아서 이벤트의 내용을 간략하게 로그로 출력하는 예제이다.


먼저 이벤트를 읽어드릴 event-display 서비스를 배포하자. 해당 서비스는 HTTP post로 받은 이벤트의 내용을 log로 출력해주는 코드로 이벤트의 포맷은 앞에서 설명한 CloudEvent의 포맷을 따른다.

Go 로 구현된 코드이며, 코드 원본은 여기에 있다.

 해당 컨테이너를 배포하기 위해서 아래와 같이 service.yaml 파일을 만들고, kubectl apply -f service.yaml 을 이용해서 배포하면, crontab 에서 이벤트를 받는 serving 인스턴스가 준비된다.

apiVersion: serving.knative.dev/v1alpha1

kind: Service

metadata:

 name: event-display

spec:

 runLatest:

   configuration:

     revisionTemplate:

       spec:

         container:

           image: gcr.io/knative-releases/github.com/knative/eventing-sources/cmd/event_display

<그림. Event consumer용 knative 서비스 배포>


다음 Crontab event 소스를 아래와 같이 yaml로 정의한다.


apiVersion: sources.eventing.knative.dev/v1alpha1

kind: CronJobSource

metadata:

 name: test-cronjob-source

spec:

 schedule: "*/2 * * * *"

 data: '{"message": "Hello world!"}'

 sink:

   apiVersion: serving.knative.dev/v1alpha1

   kind: Service

   name: event-display

<그림. Crontab event source 정의>


spec>schedule 부분에 이벤트 주기에 대한 설정을 crontab 포맷을 따라서 하고, data 부분에 cron 이벤트가 발생할때 마다 보낼 데이타를 정의한다.

데이타를 보낼 목적지는 sink 부분에 지정하는데, kind에 타입을 정의하고 (여기서는 knative의 Service로 지정) 그리고 service 의 이름을 name에 정의한다. 앞에서 knative serving 서비스를 event-display로 지정하였기 때문에, 서비스명을 event-display로 정의한다.

yaml 파일 설정이 끝났으면 kubectl apply -f  명령을 이용해서 이벤트 소스를 등록하고, 동작을 하는지 확인해보도록 하자.


%kubectl logs -l serving.knative.dev/service=event-display -c user-container --since=10m


명령을 이용하면 앞에서 배포한 event-display 서비스의 로그를 볼 수 있는데, 결과를 보면 다음과 같다.



Data 부분에서 crontab 이벤트 소스에서 보내온 “message”:”Hello world!” 문자열이 도착한것을 확인할 수 있다.

Eventing detail

이벤트는 앞의 예제에서 본것과 같이 이벤트 소스에서 바로 Knative 서빙에서 받아서 처리하는 가장 기본적인 비동기 이벤트 처리 패턴이다.


Broker & Trigger

이러한 패턴이외에도 좀 더 다양한 패턴 구현이 가능한데, 두번째가 Broker와 Trigger이다. Broker는 이벤트 소스로 부터 메시지를 받아서 저장하는 버킷 역할을 하고, Broker에는 Trigger를 달 수 있는데, Trigger에는 메시지 조건을 넣어서, 특정 메시지 패턴만 서비스로 보낼 수 있다. 위의 패턴에서 필터를 추가한 패턴으로 보면 된다.



이해를 돕기 위해서 예제를 보자. 다음은 knative.dev 공식 사이트에 나와 있는 예제중에, Google Cloud Pub/Sub Source를 Broker로 연동하는 예제이다.


# Replace the following before applying this file:

#   MY_GCP_PROJECT: Replace with the GCP Project's ID.


apiVersion: sources.eventing.knative.dev/v1alpha1

kind: GcpPubSubSource

metadata:

 name: testing-source

spec:

 gcpCredsSecret:  # A secret in the knative-sources namespace

   name: google-cloud-key

   key: key.json

 googleCloudProject: MY_GCP_PROJECT  # Replace this

 topic: testing

 sink:

   apiVersion: eventing.knative.dev/v1alpha1

   kind: Broker

   name: default

<그림. github-pubsub-source.yaml>


위의 코드는 GCP Pub/Sub Source를 등록하는 부분인데, sink 부분은 이 소스에서 오는 메시지를 어디로 보낼지를 정하는 부분이다. 위에 보면 Broker로 보내는것을 볼 수 있다. Broker는 Default Broker로 보낸다.


다음은 Broker에서 받은 메시지를 Trigger 조건에 따라서 Knative Serving 서비스로 보내는 설정이다.


apiVersion: serving.knative.dev/v1alpha1

kind: Service

metadata:

 name: event-display

spec:

 template:

   spec:

     containers:

     - # This corresponds to

       # https://github.com/knative/eventing-sources/blob/release-0.5/cmd/event_display/main.go           

       image: gcr.io/knative-releases/github.com/knative/eventing-sources/cmd/event_display@sha256:bf45b3eb1e7fc4cb63d6a5a6416cf696295484a7662e0cf9ccdf5c080542c21d


---


# The GcpPubSubSource's output goes to the default Broker. This Trigger subscribes to events in the

# default Broker.


apiVersion: eventing.knative.dev/v1alpha1

kind: Trigger

metadata:

 name: gcppubsub-source-sample

spec:

 subscriber:

   ref:

     apiVersion: serving.knative.dev/v1alpha1

     kind: Service

     name: event-display


< 그림. Trigger와 이벤트 메시지를 수신하는 Service를 정의한 부분>


서비스는 event-display라는 서비스를 정의하였고, 그 아래 Trigger 부분을 보면 gcppubsub-source-sample 이라는 이름으로 Trigger를 정의하였다. Broker 명을 정의하지 않으면 이 Trigger는 default broker에 적용된다. 별다른 조건이 없기 때문에, Broker의 모든 메시지를 대상 서비스인 event-display로 전달한다.

Channel & subscription

다음 개념은 Channel과 subscription 이라는 개념인데, Channel을 메시지를 저장 후에, Channel에 저장된 메시지는 메시지를 수신하는 Subscription을 통해서 다른 Channel로 포워딩 되거나 또는 Service로 전달 될 수 있다.



<그림. Channel과 Subscription 개념도>


앞에서 Channel에서는 메시지를 저장한다고 했는데, 그러면 저장할 장소가 필요하다. 저장할 장소는 설정으로 다양한 메시지 저장소를 사용할 수 있는데, 현재 메모리, Apache Kafka 또는 NATS Streaming을 지원한다.


간단한 예제를 살펴보자 예제는 이 문서를 참고하였다

먼저 아래 설정을 보자


apiVersion: sources.eventing.knative.dev/v1alpha1

kind: GcpPubSubSource

metadata:

 name: testing-source

spec:

 gcpCredsSecret:  # A secret in the knative-sources namespace

   name: google-cloud-key

   key: key.json

 googleCloudProject: knative-atamel  # Replace this

 topic: testing

 sink:

   apiVersion: eventing.knative.dev/v1alpha1

   kind: Channel

   name: pubsub-test



< 그림. GCPPubSub Event Source 정의한 코드>


위 설정은 GCP Pub/Sub을 Event source로 등록하는 부분이다. 이벤트 소스로 등록 한후에, 이벤트를 sink 부분에서 pubsub-test라는 Channel로 전달하도록 하였다.

다음 아래는 Channel을 정의한 부분인데, pubsub-test 라는 이름으로 Channel을 정의하고 "provisioner” 부분에, 메시지 저장소를 "in-memory-channel” 로 지정해서 메모리에 메시지를 저장하도록 하였다.

apiVersion: eventing.knative.dev/v1alpha1

kind: Channel

metadata:

 name: pubsub-test

spec:

 provisioner:

   apiVersion: eventing.knative.dev/v1alpha1

   kind: ClusterChannelProvisioner

   name: in-memory-channel

< 그림. Channel 정의한 코드>



apiVersion: serving.knative.dev/v1alpha1

kind: Service

metadata:

 name: message-dumper-csharp

spec:

 runLatest:

   configuration:

     revisionTemplate:

       spec:

         container:

           # Replace {username} with your actual DockerHub

           image: docker.io/{username}/message-dumper-csharp:v1

---

apiVersion: eventing.knative.dev/v1alpha1

kind: Subscription

metadata:

 name: gcppubsub-source-sample-csharp

spec:

 channel:

   apiVersion: eventing.knative.dev/v1alpha1

   kind: Channel

   name: pubsub-test

 subscriber:

   ref:

     apiVersion: serving.knative.dev/v1alpha1

     kind: Service

     name: message-dumper-csharp

< 그림. Serving과 subscription을 정의 코드>


Channel에 저장된 메시지를 다른 Channel로 보내거나 또는 Service로 보내려면 Subscription을 거쳐야 한다. 위에서 gcppubsub-source-sample-charp이라는 subscription을 정의하였고, 이 subscription이 연결되는 Channel은 spec > channel 부분에 아래와 같이 정의 하였다.


aspec:

 channel:

   apiVersion: eventing.knative.dev/v1alpha1

   kind: Channel

   name: pubsub-test

< 그림. 위의 Subscription 정의에서 Channel 정의 부분>


그리고 그 채널에서 받은 메시지를 subscriber > ref 부분에서 아래와 같이 message-dumper-charp이라는 서비스로 포워딩 하도록 하였다.

 subscriber:

   ref:

     apiVersion: serving.knative.dev/v1alpha1

     kind: Service

     name: message-dumper-csharp

< 그림.위의 Subscription 정의에서 Service 정의 부분>


전체적으로 Eventing 모듈을 이해하는데 시간이 많이 걸렸는데, Eventing 모듈은 Serving 모듈에 비해서 예제가 적고, 공식 문서에 아직 설명이 부족하다. 예를 들어서 소스 → 서빙으로 메시지를 보낼때 스케일링할 경우 문제가 없는지. Channel → subscription 으로 메시지를 보낼때 Trigger를 사용할 수 있는지 등 정보가 아직 부족해서 자세한 분석이 어려웠다. Knative는 현재 0.5 버전으로 버전이고, Event Source 들도 아직 개발 단계가 아니라 PoC (Proof Of Concept : 기술적으로 가능한지 테스트를 하는 단계) 단계 이기 때문에 제대로 사용하기에는 시간이 더 걸릴 듯 하다.

본인은 구글 클라우드의 직원이며, 이 블로그에 있는 모든 글은 회사와 관계 없는 개인의 의견임을 알립니다.

댓글을 달아 주세요

로그 시스템 #1 - 자바 로그 프레임웍

조대협 (http://bcho.tistory.com)

로그 시스템

로그 시스템은 소프트웨어의 이벤트를 기록 함으로써, 소프트웨어 동작 상태를 파악하고 문제가 발생했을때 이 동작 파악을 통해서 소프트웨어의 문제를 찾아내고 해결하기 위해서 디자인 되었다.

주로 로그 파일이라는 형태로 하나의 파일에 이벤트들을 기록하였다.


그러나 소프트웨어 스택이 OS, 미들웨어, 사용자 애플리케이션 (자바나 파이썬등으로 구현된 애플리케이션)으로 점점 다중화되고 시스템이 대형화 되면서 한대가 아니라 여러대의 서버에 로그를 기록하고 또한 마이크로 서비스 아키텍처로 인하여 서버 컴포넌트가 분산됨에 따라서 로그를 수집해야할 포인트가 많아지게 되었다. 이로 인해서 로그 시스템이 분산 환경을 지원해야 할 필요가 되었고, 단순히 파일로 로그를 기록하는 것만으로는 이러한 여러 시스템과 다중 계층에 대한 모니터링이 불가능하게 되었다.


또한 데이터 분석의 중요성이 대두됨에 따라서, 에러등의 동작 파악성의 로그 뿐만 아니라 사용자의 액티버티를 수집하여 데이터 분석에 사용하기 위해서 데이터 수집 역시 로그 시스템을 통하기 시작하였다.


그래서 몇개의 글에 걸쳐서 좋은 로그 시스템을 개발하기 위한 아키텍처에 대해서 설명하고자 한다.

좋은 로그 시스템이란

먼저 좋은 로그 시스템의 기본 개념을 정의 해보면 다음과 같다.

  • 로그 메시지는 애플리케이션의 동작을 잘 이해할 수 있도록 충분히 구체적이어야 한다.

  • 로그 메시지를 기록하는데 성능 저하가 없어야 한다.

  • 어떤 배포 환경이라도 로그를 수집하고 저장할 수 있도록 충분히 유연해야 한다. (분산 환경 지원, 대용량 데이타 지원등)

자바 로깅 프레임워크

각 프로그래밍 언어마다 고유의 로깅 프레임워크을 지원하지만, 특히 자바의 경우에는 그 프레임웍 수가 많고 발전된 모델이 많아서 자바 프레임워크를 살펴보고 넘어가고자 한다.  

자바는 역사가 오래된 만큼 많은 로깅 프레임웍을 가지고 있다. log4j, logback, log4j2,apache common logging, SLF4J 등 다양한 프레임워크 들이 있는데, 그 개념과 장단점을 알아보도록 하자.

SLF4J

SLF4J는 (Simple Logging Facade for Java)의 약자로 이름이 뜻하는 것과 같이 로깅에 대한 Facade 패턴이다. SLF4J는 자체가 로깅 프레임웍이 아니라, 다양한 로깅 프레임웍을 같은 API를 사용해서 접근할 수 있도록 해주는 추상화 계층이다. 그래서 다른 로그프레임웍과 같이 사용해야 하는데, 보통 Log4J, Logback, Log4J2등이 많이 사용된다. 즉 애플리케이션은 SLF4J API 인터페이스를 통해서 호출하지만, 실제로 호출되는 로깅 프레임웍은 다른 프레임웍이 호출된다는 이야기이다. 이렇게 추상화를 통해서 용도와 목적에 맞게 다른 로깅 프레임워크 으로 쉽게 전환이 가능함은 물론이고, 로깅에 필요한 코드들을 추상화해주기 때문에, 훨씬 쉽고 간단하게 로깅이 가능하다. apache common logging 역시, SLF4J와 같이 다른 로깅 프레임워크 들을 추상화 해주는 기능을 제공한다.



<그림 : SLF4J 가 다른 로깅 프레임웍을 추상화 하는 개념도 >

출처 source


그러나 SLF4J 이전에 개발된 레거시 시스템들의 경우에는 이러한 추상화 계층이 없어서 로그 프레임웍을 변경하고 있기 때문에 로깅 프레임웍을 교체하기가 어렵다. 이런 상황을 해결하기 위해서 SLF4J는 기존 로그 프레임웍에 대한 브릿지를 제공한다. 예를 들어 log4J로 개발된 로깅을 브릿지를 이용해서 SLF4J를 사용하도록 전환할 수 있다. 이런 구조는 레거시 로깅 시스템을 사용해서 개발된 시스템에 대해서, 로그 프레임웍에 대한 코드를 변경하지 않고, 뒷단에 로그 프레임웍을 변경할 수 있게 해주기 때문에, 로깅 프레임웍에 대한 마이그레이션을 쉽게 해준다.



<그림 : SLF4J 브릿지를 이용해서, 기존 로그 시스템을 연동 하는 개념도 >


자바 로깅 프레임워크

자바 로그 프레임웍에는 여러가지 종류가 있지만 그중에서 대표적을 사용되는 로그 프레임웍은 log4j,logback,log4j2 세가지 이다.

Log4J

Log4J는 이 중에서 가장 오래된 로그프레임웍으로 로그 프레임웍에 대한 초반 개념을 설정했다고 볼 수 있다. 현재는 개발이 중지되고, Log4J2로 새로운 버전으로 변경되었다.

Logback

아마 현재 국내에서 가장 널리 많이 사용되고 있는 로그 프레임워크일것이다. Log4J 개발자가 개발한 로그 프레임워크로 주로 Log4J 성능 부분에 대한 개선 작업이 많이 이루어 졌다. SLF4J와 네이티브로 연동이 가능하다.

Log4J2

가장 근래에 나온 프레임워크로 Logback 보다 후에 나오고, 가장 빠른 성능을 제공한다. Logback과 SLF4J사이의 연동 문제를 해결하였으며 비동기 로깅 ( asynchronous logging ) 을 제공하여, 특히 멀티 쓰레드 환경에서 높은 성능을 제공한다.



(source : https://logging.apache.org/log4j/2.x/performance.html )


또한 근래의 로깅 시스템들은 로그를 파일로 기록하기 보다는 ELK(Elastic Search)나 Kafka 등 외부 시스템으로 로그를 전송하여 모으는 형태를 많이 취하기 때문에 이에 대한 연동을 Appender를 통해서 제공한다.


제공되는 Appender는 다음과 같다.

  • Console

  • File, RollingFile, MemoryMappedFile

  • Flume, Kafka, JDBC, JMS, Socket, ZeroMQ

  • SMTP (emails on errors, woo!)

  • … much more


만약에 새로운 시스템을 개발한다면, Logback 보다는 그 다음 세대인 격인 Lob4j2를 사용하는 것을 권장한다.

본인은 구글 클라우드의 직원이며, 이 블로그에 있는 모든 글은 회사와 관계 없는 개인의 의견임을 알립니다.

댓글을 달아 주세요

  1. 2019.04.03 09:29  댓글주소  수정/삭제  댓글쓰기

    비밀댓글입니다

빠르게 훝어 보는 node.js - redis 사용하기 (ioredis 클라이언트 버전)


조대협 (http://bcho.tistory.com)


지난 포스팅에서 http://bcho.tistory.com/1098 node.js에서 redis 사용에 있어서 node-redis 클라이언트를 사용했는데, 조금 더 리서치를 해보니, node.js의 redis 클라이언트는 지난번에 포스팅한 node-redis 클라이언트와 ioredis라는 클라이언트가 가장 많이 사용된다. ioredis 클라이언트가 조금 더 최근에 나온 클라이언트인데, https://github.com/luin/ioredis


Bluebird promise 지원, 트렌젝션 지원등 훨씬 더 많은 기능을 제공하고, 사용법이 node-redis와 거의 유사하여 마이그레이션이 어렵지 않다.

아래 코드는 어제 작성 했던 코드를 ioredis 버전으로 변경한것인데, 코드를 보면 변경 내용이 거의 없음을 확인할 수 있다.


mongodb, redis, mysql 지원 모듈을 살펴보다가 느낀건데, 대부분의 모듈들이 Promise를 지원하고, 특히 bluebird를 지원한다는 것이다.

얼마전에 Async framework에 대해서 Async,bluebird, Q등을 고려했는데, 지금까지 인사이트로 봐서는 bluebird를 표준 프레임웍으로 해서 개발하는게 답이 아닐까 한다.


 

// redis example

var Redis = require('ioredis');

var redis = new Redis(6379,'127.0.0.1');

var JSON = require('JSON');

 

app.use(function(req,res,next){

      req.redis = redis;

      next();

});

app.post('/profile',function(req,res,next){

      req.accepts('application/json');

     

      var key = req.body.name;

      var value = JSON.stringify(req.body);

     

      req.redis.set(key,value,function(err,data){

           if(err){

                 console.log(err);

                 res.send("error "+err);

                 return;

           }

           req.redis.expire(key,10);

           res.json(value);

           //console.log(value);

      });

});

app.get('/profile/:name',function(req,res,next){

      var key = req.params.name;

     

      req.redis.get(key,function(err,data){

           if(err){

                 console.log(err);

                 res.send("error "+err);

                 return;

           }

 

           var value = JSON.parse(data);

           res.json(value);

      });

});

 

// catch 404 and forward to error handler

app.use(function(req, res, next) {

  var err = new Error('Not Found');

  err.status = 404;

  next(err);

});

 


본인은 구글 클라우드의 직원이며, 이 블로그에 있는 모든 글은 회사와 관계 없는 개인의 의견임을 알립니다.

댓글을 달아 주세요

Promise를 이용한 node.js에서 콜백헬의 처리


조대협 (http://bcho.tistory.com)


앞의 글(http://bcho.tistory.com/1083) 에서 async 프레임웍을 이용한 콜백헬을 처리 하는 방법에 대해서 알아보았다.

async 프레임웍 이외에, 콜백헬을 해결할 수 있는 프레임웍으로 promise가 있다.

Promise는 원래 콜백헬을 해결하기 위한 프레임웍이 아니라, 프로그래밍 패턴중의 하나로 지연 응답을 통해서 동시성을 제어 하기 위한 목적으로 만들어졌다. 자바스크립트에서는 JqueryDeferred, CommonJS에 구현되어 있고, ECMAScript5 표준에 포함되서 크롬,파이어폭스,인터넷익스플로러 9 버전등에 포함되어 있다.

구현체가 많아서 설치해야 한다.

node.js는 크롬의 자바스크립트 엔진을 기반으로 하기 때문에, promise가 내장되어 있다.

 

프로미스의 개념

 

asyncfunction이라는 비동기 함수가 있다고 가정하자. 이 함수는 param1,param2를 인자로 받아서 비동기로 처리하는 함수이다. promise 패턴에서는 이 asyncfunction을 호출하면, promise라는 것을 리턴한다. promise란 미래 결과에 대한 약속이다. 그리고 promise의 결과가 성공인지 실패인지에 따라서 이를 핸들링하기 위한 로직을 정의해놓는다. asyncfunction이 처리를 끝내고 결과를 리턴하면 promise에 의해 정의된 로직에 따라 결과값을 처리한다.

약간 말이 복잡한데, 이를 풀어서 설명해보면 다음과 같다.

 

·         프로그램      : asyncfunction에게 “param1param2로 처리해줘라고 부탁한다.

·         asyncfunction : “알았어 처리해줄께, 대신 시간이 걸리니 바로 답은 줄 수 없고, 나중에 답을 줄게. 이게 그 약속(promise)라고 하고, 약속(promise) 객체를 리턴한다.

·         프로그램      : ‘언제 끝날지 모르는 작업이구나그러면 이렇게 해줘. 작업이 성공하면 결과 처리 로직을 실행하게 하고, 만약에 실패하면 에러 처리 로직을 처리하게 하자. 이 내용을 니가 준 약속(promise)에 추가로 적어 넣을께

·         asyncfunction : 실행이 성공적으로 종료되었어. 아 아까 준 약속에 성공시에 처리하는 로직이 정의되어 있군. “결과처리로직를 실행하자

 

이런 내용이 어떻게 코드로 구체화 되는지를 살펴보자

 

var promise = asyncfunction(param1,param2);

promise.then(function(result){

      //결과처리로직

},function(err){

      //에러처리로직

}

 

Figure 1 promise를 이용한 비동기 호출 처리 예제

 

var promise = asyncfunction(param1,param2);

첫번째 코드에서 asyncfunction은 앞서 언급한것과 같이 비동기 함수이다. asyncfunction이 호출되고 나서 결과 값이 아니라, 나중에 결과를 주겠다는 약속(promise) 객체를 리턴한다.

 

다음으로 비동기 함수의 처리가 끝났을때 성공과 실패의 경우 어떻게 처리를 할지를 비동기 함수가 리턴한 약속(promise)에 기술해놓는다.이를 위해서 then이라는 키워드를 사용하는데 다음과 같은 포맷을 사용한다.

 

promise.then(결과처리함수(결과값) ,에러처리 함수(err) )

Figure 2 promise.then의 문법

 

비동기 함수 실행이 끝나면, then에 정의된 첫번째 함수 결과처리함수를 실행하여 비동기 함수 실행 결과를 처리한다.

이때 결과처리함수는 비동기 함수가 처리한 내용에 대한 결과 값을 인자로 갖는다.

만약에 에러가 발생하였을 경우에는 then에 두번째 인자로 정의된 에러처리함수를 실행하여 에러를 처리한다. “에러처리함수는 에러의 내용을 err이라는 인자를 통해서 받는다.

 

앞의 예제에서 2~6줄은 then을 이용하여, 첫번째 인자로 결과처리로직을 가지는 함수를 정의하고, 두번째 인자로에러처리로직을 갖는 함수를 정의했다.

 

promise.then(function(result){

      //결과처리로직

},function(err){

      //에러처리로직

}

Figure 3 프로미스에 결과 및 에러 처리를 지정하는 방법

 

 첫번째 결과처리로직을 갖는 함수는 비동기 함수가 리턴해준 결과값인 “result”를 인자로 받고, 두번째 에러처리로직을 갖는 함수는 에러내용을 “err”라는 인자로 받는다.

 

그렇다면 약속(promise)를 리턴하는 비동기 함수는 어떤 형태로 정의되어야 할까?

promise를 지원하는 비동기 함수는 아래와 같은 형태와 같다. 리턴시에 new Promise를 이용하여 promise 객체를 만들어서 리턴하는데, 이때 두가지 인자를 받는다. resolved reject인데, 성공적으로 실행이 되었으면 이 resolved함수 를 호출하고 이때 인자로 결과값을 넣어서 넘긴다. 반대로 실패했을 경우에는 인자로 받은 rejected 함수를 호출하되 호출 인자로 에러 내용을 담고 있는 err을 넣어서 넘긴다.

 

function asyncfunction(param1,param2){

     

      return new Promise(resolved,rejected){

           if(성공하였는가?){

                 // 성공하였을 경우

                 resolved ("결과");

           }else{

                 rejected(Error(err));

           }

      }

}

Figure 4 프로미스 지원 비동기 함수 정의 방법


프로미스 예제

 

그러면 위의 개념에 따라 실제로 작동하는 코드를 작성하자

 

var Promise = require('promise');

 

var asyncfunction = function(param){

      return new Promise(function(resolved,rejected){

           setTimeout(

                 function(){

                       resolved('hello'+param);

                 },2000);

      });

         

}

 

var promise = asyncfunction(' terry ');

promise.then(console.log,console.err); // 여기가 비동기 결과에 대한 콜백함

 

Figure 5 간단한 프로미스 함수 및 사용 예제

 

promise를 사용하기 위해서는 promise 모듈을 require 이용하여 불러들인다.

다음으로 asyncfunction을 정의하고 리턴값으로 Promise객체를 리턴한다. Promise 객체 안에서는 처리할 비지니스 로직이 정의되어 있다. 위의 예제에서는  setTimeout을 이용하여 2초를 기다리도록 하였고, 2초후에 콜백함수에서 resolved 함수를 호출하여 promise를 종료하도록 하였다.

 

다음은 이 promise를 리턴하는 비동기 함수를 실제로 호출하고, 이 비동기 함수에 대해서 성공 및 실패에 대한 처리 함수를 then으로 정의한 부분이다.

var promise = asyncfunction(' terry ');

promise.then(console.log,console.err); // 여기가 비동기 결과에 대한 콜백함


then을 이용하여, 성공시 console.log 함수를 호출하도록 하였고, 실패시에는 console.err를 통해서 에러 메시지를 출력하도록 하였다.

 

프로미스 체이닝 (promise chainning)

여러개의 비동기 함수를 순차적으로 실행하는 방법에 대해서 알아보자.

async 프레임웍의 waterfall과 같은 흐름 제어이다.

다음은 asyncfunction1,2,3를 순차적으로 실행하고, 앞 비동기 함수의 결과를 뒤에 따라오는 비동기 함수의 입력값으로 받아서 처리하는 예제이다.


 

var Promise = require('promise');

 

var asyncfunction1 = function(param){

      return new Promise(function(fullfilled,rejected){

           setTimeout(

                 function(){

                       fullfilled('result 1:'+param);

                 },1000);

      });

}

var asyncfunction2 = function(param){

      return new Promise(function(fullfilled,rejected){

           setTimeout(

                 function(){

                       fullfilled('result 2:'+param);

                 },1000);

      });

}

var asyncfunction3 = function(param){

      return new Promise(function(fullfilled,rejected){

           setTimeout(

                 function(){

                       fullfilled('result 3:'+param);

                 },1000);

      });

}

 

var promise = asyncfunction1(' terry ');

promise

.then(asyncfunction2)

.then(asyncfunction3)

.then(console.log);

 

Figure 6 프로미스 태스크 체이닝 예제

 

promise를 리턴하는 3개의 비동기 함수를 정의하였고, 첫번째 함수로 promise를 만든다음. 실행을 하였다. 다음 then을 이용하여, 다음번에 실행해야하는 비동기 함수 asyncfunction2, asyncfunction3를 순차적으로 정의하였고, 마지막의 최종 결과를 출력하기 위해서 최종 then console.log를 지정하여, 결과값을 출력하도록 하였다.

 

result 3:result 2:result 1: terry

 

하나의 예제를 더 살펴보자

다음 예제는 파일을 읽어서 읽은 내용을 다른 파일에 쓰는 내용이다.

 

 

var Promise = require('promise');

 

var fs = require('fs');

var src = '/tmp/myfile.txt';

var des = '/tmp/myfile_promise2.txt';

 

var fread = Promise.denodeify(fs.readFile);

var fwrite = Promise.denodeify(fs.writeFile);

 

fread(src,'utf-8')

.then(function(text){

           console.log('Read done');

           console.log(text);

           return fwrite(des,text); // 체이닝을 하려면 return 해줘야 .

      })

.then(function(){           

           console.log('Write done');

      })

.catch(function(reason){               

           console.log('Read or Write file error');

           console.log(reason);

});

 

console.log('Promise example');

 

Figure 7 프로미스를 이용해서 파일을 읽어서 다른 파일에 쓰는 예제

 

이 코드에서 먼저 주의 깊게 봐야 하는 부분은 denodeify 부분이다.


var fread = Promise.denodeify(fs.readFile);

var fwrite = Promise.denodeify(fs.writeFile);

 

node.js의 비동기 함수들은 프로미스패턴을 지원하지 않는 경우가 많다. 그래서 프로미스 패턴을 지원하지 않는 일반 함수들을 프로미스를 지원할 수 있는 형태로 변경을 해야 하는데, 이 변경을 해주는 함수가 Promise.denodeify이다.

프로미스화가 끝났으면 이 함수를 프로미스를 사용해서 호출할 수 있다.

 

fread(src,'utf-8')

.then(function(text){

           console.log('Read done');

           console.log(text);

           return fwrite(des,text); // 체이닝을 하려면 return 해줘야 .

      })

 


fread를 수행한 후에, then에서 return시 다음 비동기 함수인 fwrite를 수행한다. 이렇게 하면 task들을 체이닝할 수 있다.


프로미스 에러 핸들링

프로미스에서 에러를 핸들링하는 방법에 대해서 알아보자. 앞의 예제에서 then 중간에 catch라는 구문을 사용했는데, catch가 에러핸들러이다.

아래 코드를 보자 아래 코드는 비동기 함수에서 인위적으로 에러를 발생시켜서 처리 하는 코드이다.


 

var Promise = require('promise');

 

var asyncfunction = function(param){

      return new Promise(function(fullfilled,rejected){

           setTimeout(

                 function(){

                       rejected(Error('this is err '+param));

                 },2000);

      });

         

}

 

asyncfunction(' terry ')

.then(console.log,console.error);

 

asyncfunction('cath')

.then(console.log)

.catch(console.error);

 

Figure 8 프로미스에서 에러처리를 하는 예제

 

asyncfunction내의 프로미스에서 setTimeout으로 2초가 지나면, rejected를 이용하여 에러를 리턴하였다.

첫번째 asyncfunction호출에서는 then에 두개의 인자를 넘겼는데, 두번째 console.error가 에러 핸들러이다. 그래서 에러를 console.error로 출력하게 된다.

두번째 asyncfunction 호출에서는 다른 문법의 에러 핸들링을 사용했는데, then에 두개의 인자를 넘기는 대신, catch를 이용해서 에러 핸들러를 정의하였다.

이 예제를 실행하면 다음과 같은 결과를 얻게 된다.

 

[Error: this is err  terry ]

[Error: this is err cath]

Figure 9 프로미스에서 에러처리를 하는 예제 실행 결과

 

만약에 여러개의 태스크가 연결된 비동기 함수 체인을 호출할때 에러 처리는 어떻게 될까? 아래 코드를 보자.

asyncfunction1,2,3,4,5 가 정의되어 있고, 2 4에서 에러를 발생 시키도록 하였다.

그리고 3번과 5번 뒤에 catch를 넣어서 에러 처리를 하도록 하였는데, 그러면 에러 처리 흐름은 어떻게 될까?


var Promise = require('promise');

 

var asyncfunction1 = function(param){

      return new Promise(function(resolved,rejected){

           setTimeout(

                 function(){

                       console.log('func1');

                       resolved('func 1 success:'+param+'\n');

                 },500);

      });

}

var asyncfunction2 = function(param){

      return new Promise(function(resolved,rejected){

           setTimeout(

                 function(){

                       console.log('func2');

                       rejected(new Error('func 2 error:'+param+'\n'));

                 },500);

      });

}

var asyncfunction3 = function(param){

      return new Promise(function(resolved,rejected){

           setTimeout(

                 function(){

                       console.log('func3');

                       resolved('func 3 success:'+param+'\n');

                 },500);

      });

}

var asyncfunction4 = function(param){

      return new Promise(function(resolved,rejected){

           setTimeout(

                 function(){

                       console.log('func4');

                       rejected(Error('func 4 error:'+param+'\n'));

                 },500);

      });

}

var asyncfunction5 = function(param){

      return new Promise(function(resolved,rejected){

           setTimeout(

                 function(){

                       console.log('func5');

                       resolved('func 5 success:'+param+'\n');

                 },500);

      });

}

 

var promise = asyncfunction1(' terry ');

promise

.then(asyncfunction2)

.then(asyncfunction3)

.catch(console.error) // errorhandler1

.then(asyncfunction4)

.then(asyncfunction5)

.catch(console.error)  // errorhandler2

.then(console.log);

 

 

Figure 10 프로미스 태스크 체인에서 에러 처리를 하는 예제

 

3,5번 뒤에 붙은 catch는 어느 비동기 함수들의 에러를 처리할까? 다음 그림을 보자


Figure 11 프로미스 태스크 체인에서 에러 처리를 하는 예제의 에러 처리 흐름

 

1,2,3 번 뒤에 catch를 정의 했기 때문에, 1,2,3번을 수행하던중 에러가 발생하면 수행을 멈추고 첫번째 에러핸들러인 //errorhandler1으로 가서 에러를 처리한다. 여기서 중요한 점은 에러처리 후에, 다시 원래 제어 흐름으로 복귀한다는 것이다. 흐름을 끝내지 않고, 다음 에러핸들러에 의해서 통제 되는 4,5번을 수행한다. 4,5번의 에러는 4,5번 호출 뒤에 붙어 있는 catch //errorhandler2에 의해서 처리 된다. 마찬가지로 //errorhandler2에 의해서 실행이 된후에 맨 마지막 비동기 함수인 console.log를 실행하게 된다.

 

앞에서 2,4번에 에러를 냈으니 실제 흐름이 어떻게 되는지 확인해보자



Figure 12 프로미스 태스크 체인에서 에러 처리를 하는 예제의 실제 수행 흐름


asyncfunction 1,2가 실행되고 에러를 만나서 첫번째 catch에 의해서 에러 처리가 되고, 에러 처리 후 4번이 실행된후 에러를 만나서 두번째 catch가 실행이 되고 마지막에 정의된 console.log가 실행이 된다.

이 흐름을 그림으로 표현해보면 다음과 같다.

 

실행 결과는 다음과 같다.

 

func1

func2

[Error: func 2 error:func 1 success: terry

 

]

func4

[Error: func 4 error:undefined

]

undefined

 

Figure 13 프로미스 태스크 체인에서 에러 처리를 하는 예제를 실행한 결과 


프로미스 지원 프레임웍

지금까지 promise 모듈을 이용하여 promise 패턴을 이용한 비동기 패턴 처리를 알아보았다. 앞에서 살펴보았듯이 쉽게 콜백헬을 해결할 수 있다. async waterfall 흐름 제어와 동일한 흐름 제어 부분만 살펴보았지만, promise 역시, asyncseries, parallel등과 같은 다양한 흐름 제어 알고리즘을 지원한다.

 이 글에서는 promise 모듈을 사용하였지만, 이 프로미스 패턴을 지원하는 모듈은 이외에도 Q (https://github.com/kriskowal/q) , bluebird (http://bluebirdjs.com/docs/getting-started.html) 등 다양한 프레임웍이 있다.

근래에는 성능이나 기능 확장성이 좋은 bluebird가 많이 사용되고 있으니, 실제 운영 코드를 작성하기 위해서는 다른 프로미스 프레임웍도 검토하기 바란다.

 

참고

https://davidwalsh.name/promises

https://github.com/stackp/promisejs 예제가 좋음

http://programmingsummaries.tistory.com/325 정리가 잘되어 있음 추천.

본인은 구글 클라우드의 직원이며, 이 블로그에 있는 모든 글은 회사와 관계 없는 개인의 의견임을 알립니다.

댓글을 달아 주세요

  1. zealot71@naver.com 2016.07.26 05:08  댓글주소  수정/삭제  댓글쓰기

    프로미스 체이닝에서 fullfilled('result 1:'+param); 부분을 fullfilled(param1, param2); 로 2가지 값을 전달할 수는 없나요?
    funtion(param1, param2) {} 로 받으려고 테스트해보니 첫 번째만 인자로 받아지는데...

    비동기 함수1에서 리턴값a, 비동기 함수2에서 비동기 함수1 값인 a+ 비동기 함수2 리턴값b를 비동기 함수3에 전달하려고 할때 어떻게 해야 하는지 궁금합니다.

  2. gun 2018.05.21 14:57  댓글주소  수정/삭제  댓글쓰기

    promise 이해가 잘안되서, 외국, 한국 사이트 포함해서 정말 많은 사이트를 보고 예제도 해봤는데 안되다가, 올려주신글 보고나서야 제코드에 적용 성공하였습니다.ㅠ 정말 너무나 감사합니다 ^^ 아직 초보 단계인데 뭔가 개념적으로 막힐때마다 찾게된답니다^^ 항상 감사드려요

빠르게 훝어 보는 node.js - async 프레임웍을 이용한 콜백헬의 해결


조대협 (http://bcho.tistory.com)


콜백헬의 정의

 

node.js는 자바스크립트의 콜백 패턴을 사용한다. 그래서 함수들을 순차적으로 실행하고자 할때 콜백 함수들의 중첩이 생겨서 코드가 복잡해지는 문제가 생긴다. 코드가 복잡해지고, 코드의 가독성이 떨어져서 유지 보수가 매우 힘들어진다.

 

파일을 읽어서 쓰는 코드를 보자

 

var fs = require('fs');

var src = '/tmp/myfile.txt';

var des = '/tmp/myfile_async.txt';

 

fs.readFile(src,'utf-8',function(err,data){

     

      console.log(data);

      if(err){

           console.log("Read file error");

      }else{

           console.log("Read file is done");

           fs.writeFile(des,data,function(err){

                 if(err){

                       console.log("Write file error");

                       return;

                 }

                 console.log("Write file is done");

           });

      }

});

 

Figure 1 파일을 읽어서 쓰는 코드에서 콜백이 중첩된 예제

 

파일을 읽은후에, 파일을 쓰려면, 파일을 읽는 함수 readFile에서 파일을 다 읽은 후에 호출되는 콜백 함수에서 writeFile 함수를 호출해야 한다.

만약에 위의 예제처럼 두개의 비동기 함수가 아니라, 여러개의 비동기 함수를 순차적으로 실행해야 한다면?

아래코드를 보면 6개의 비동기 함수를 순차적으로 호출하기 위한 코드인데, 콜백 함수가 6번 중첩이 되었음을 볼 수 있다.

알고리즘을 제외한 코드인데, 알고리즘이 들어가 있다면 코드는 훨씬 복잡해지게 된다.

 

 

asyncfunction(params,function(){

      asyncfunction(params,function(){

           asyncfunction(params,function(){

                 asyncfunction(params,function(){

                       asyncfunction(params,function(){

                             asyncfunction(params,function(){

                             });

                       });

                 });

           });

      });

});

 

Figure 2 callback hell의 개념

 

이러한 복잡성을 해결해주기 위해서, 자바스크립트에서는 몇몇 프레임웍을 제공하는데 대표적으로 사용되는 프레임웍으로는 async (https://github.com/caolan/async) promise.js (https://www.promisejs.org/) 가 있다.

Async

asyncnode.js의 콜백헬 문제를 풀기 위해서 개발되었지만, 현재는 브라우져에서도 사용이 가능하며 자바스크립 기반의 애플리케이션의 콜백헬 문제를 푸는데도 사용이 가능하다. 콜백헬 뿐 아니라, 20여가지의 추가 함수를 지원하고 있고, parallel과 같은 동시 수행이 가능한 코드의 동시성 제어로도 다양하게 사용이 가능하다.

여기서는 async에서 자주 사용되는 동시성 제어 흐름에 대해서 알아보도록 한다.

 

waterfall

waterfall은 흐름제어에 있어서 여러개의 비동기 함수를 순차적으로 실행하되, 앞의 비동기 함수의 결과 값을 뒤의 비동기 함수에 인자로 전달하는 흐름이다.

 



Figure 3 waterfall 흐름 제어의 개념


이 그림은 비동기 함수 asyncfunctionaA, asyncfunctionB,asyncfunctionC 를 순차적으로 실행하고, 각 단계에서 다온 리턴값을 다음 단계로 넘기는 waterfall 흐름의 개념을 표현하고 있다. 각각의 단계에서 처리되는 함수를 async에서는 task라고 정의한다.

task가 모두 수행이 끝나면, 맨 마지막에 정의된 callback 함수가 수행된다.

만약task 수행도중에 에러가 발생하면, task 수행을 멈추고 callback 함수를 바로 호출하는데, 이때 err라는 인자에 에러 내용을 채워서 넘긴다. 맨 마지막 callback함수는 errnull이면 정상적으로 모든 task들이 성공적으로 호출된것으로 처리하고, 만약에 null이 아닌경우 task 수행도중에 에러가 난것으로 파악하여 에러 처리를 한다.

 

waterfall의 문법을 살펴보자 


waterfall(tasks,[callback])

Figure 4 waterfall 제어 흐름 문법


waterfall에는 두가지 인자를 넘기도록 되어 있다.

·         첫번째 인자 tasks는 배열로, 순차적으로 실행될 함수들을 배열로 정의한다.

·         두번째 인자는 callbac(err,[result])으로, 모든 함수가 순차적으로 끝난후에 맨 마지막에 수행되는 함수이다. 또한 tasks를 실행하다가 에러가 발생하면 이 최종 callback을 호출한다.
task
를 실행하다가 에러가 발생하면 실행을 멈추고 이 최종callback으로 첫번째 인자인 err에 에러 내용을 넣어서 전달한다., 만약에 에러가 발생하지 않았을 경우에는 모든 task 완료한 후에 err‘null’을 전달한다. 두번째 인자는 생략이 가능한데, 마지막 tasks에서 넘어온 결과에 대한 값을 저장하고 있는 변수 이다.
선택적으로 result 인자를 정의할 수 있는데, 이 경우 waterfall 에 정의된 task의 맨마지막 task (최종callback 이전에 바로 실행된 task)의 리턴값을 넘겨 받는다.

 

이해를 돕기 위해서 코드를 보자. 아래 코드는 위의 그림에 표현된 asyncfunctionA,B,C를 순차적으로 호출하는 흐름을 waterfall로 표현한 슈도 코드이다. (개념을 돕기위한 코드로 실제로 실행이 되지는 않는다).

var async = require('async');

 

async.waterfall([

              function(callback){

                 asyncfunctionA(param,callback);

              },

              function(resultA,callback){

                 asyncfunctionB(resultA,callback);

              },

              function(resultB,callback){

                 asyncfunctionC(resultB,callback);

              }

             ],

             function(err,resultC){

                       if(err) errorHandler(err);
                             // handle resultC

                  }

);

 

Figure 5 waterfall 제어 흐름의 사용 방법 (psedo code)

 

waterfall함수에 첫번째 인자는 배열 형태로 function(callback), function(resultA,callback),function(resultB,callback)을 기술하였다. 각 함수에서는 우리가 호출할 비동기 함수 asyncfunctionA,B,C를 각각 호출하였다.

배열에 인자로 들어가 있는 각 함수는 맨마지막 인자로 callback을 전달 받는데, callback은 다음 호출한 함수를 지칭한다.

맨 처음 호출한 function(callback)에서 이 callbackfunction(resultA,callback)을 지칭하고, 여기에 있는 callback은 다음  function(resultB,callback)을 지칭한다.

 

function(callback){

                 asyncfunctionA(param,callback);

              }

 

에서 asyncfunctionA에서 callback을 인자로 넘겼는데, 원래 asyncfunctionA의 함수 정의가 다음과 같다.

asyncfunctionA = function(param,function(resultA){

                 }

 

asyncfunctioA는 비동기 함수로 실행이 끝나면 callback함수를 호출하게 되어 있는데, callback함수의 인자는 resultA를 받게되는 있는 형태이다.

그런 이유로waterfall 로 넘겨지는 함수 배열중 두번째 함수의 형이 function(resultA,callback) 형태를 띄게 되는 것이다.

 

실제로 작동하는 코드를 구현해 보자. 아래 코드는 앞서 async없이 작성했던 파일을 읽어서 다른 파일에 쓰는 코드를 asyncwaterfall을 이용하여 구현한 예이다.

 

아래 예제를 실행하기 위해서는 package.json“async” 의존성을 추가하거나, 또는 실행 환경에서

%npm install async

를 실행해서 async 모듈을 설치해야 한다.

 

var async = require('async');

 

var fs = require('fs');

var src = '/tmp/myfile.txt';

var des = '/tmp2/myfile_async.txt';

 

async.waterfall([

              function(callback){

                 fs.readFile(src,callback);

              },

              function(data,callback){

                 fs.writeFile(des,data,callback);

              }

             ],

             function(err){

                       if(err) console.log(err);

                  }

);

 

 

Figure 6 waterfall 흐름제어를 이용하여 파일을 읽어서 다른 파일에 쓰는 예제

 

waterfall에서 처음 호출하는 함수에서는 fs.readFile을 이용해서 파일을 읽었다.

function(callback){

                 fs.readFile(src,callback);

              },

다음으로 fs.readFile에 대한 콜백 함수를 waterfall에서 넘겨주는 callback의 형태는 fs.readFile의 포맷이 fs.readFile( filename, function(err,data)) 형태이기 때문에 앞의 err 인자이외에 ‘data’ 인자만 필요하다.

그래서 다음에 오는 함수가 다음과 같이 ‘data 인자를 갖는 function(data,callback)이다.

function(data,callback){

                 fs.writeFile(des,data,callback);

              }

 

인자로 받은 datafs.writeFile에 넘겨서 파일을 쓰게 한다.

마지막 부분은 최종 콜백 함수로, 모든 함수가 실행이 끝나면 실행이 되고 또는 waterfall에 정의된 task 실행중에 에러가 나도 실행이 되는 부분이다. 첫번째 인자로 항상 err를 받는다. 에러가 없을 경우에는 이 값은 null 이된다.

function(err){

                       if(err) console.log(err);

                  }

 


본 예제에서는 파일 쓰기가 완료된 후에, 별도의 액션은 취하지 않아서 별다른 코드가 없지만, 에러가 발생했을때 처리하기 위해서 if(err)를 통해서 에러가 있으면 콘솔로 출력하도록 하였다.

 

series

series 흐름은, waterfall가 유사하게 정의된 task를 순차적으로 실행한다.

차이는 waterfall은 각 task에서 나온 결과를 다음 task의 입력으로 넘겼다면,

series는 각 task의 결과를 취합하여, 최종 callback에 배열 형태로 넘겨준다.

 

개념도를 보면 다음과 같다. series 흐름에 전달된 asyncfunctionA,B,C를 순차적으로 실행하고, 그 결과를 취합해서 맨 마지막 callbackresults라는 배열로 넘긴다.

waterfall과 마찬가지로 task수행중에 에러가 나면 실행을 중단하고, 최종 callback로 흐름을 옮기고, err에 에러에 대한 디테일한 내용을 기술해놓는다.



Figure 7 series 흐름 제어의 개념

이때 results 배열에는 task들의 결과값이 실행 순서대로 들어간다. 위의 그림에서 최종 callback으로 전달되는 results  배열에 asyncfunctionA에 대한 결과값 resultA, 두번째 인자는 asyncfunctionB의 결과값 B, 그리고 마지막 세번째 인자는 asyncfunctionC의 결과값 C가 들어간다.

 

series 흐름제어의 문법을 보자


series(tasks, [callback] )

·         tasks : 동시에 수행할 함수들을 배열로 정의

·         callback : 최종 callback으로, callback(err,results) 형태로 정의된다. 에러가 발생하면 err 변수에 에러에 대한 내용이 넘어오고, 정상적인 수행 완료인 경우에는 errnull로 전달되고, task의 실행 결과가 results 변수에 배열로 정의되서 리턴된다.

 

아래 예제는 위의 series 흐름을 async 프레임웍을 통해서 구현한 예제이다.


var async = require('async');

 

async.series([

              function(callback){

                 callback(null,'resultA');

              },

              function(callback){

                 callback(null,'resultB');

              },

              function(callback){

                 callback(null,'resultC');

              }

             ],

             function(err,results){

                       if(err) console.log(err);

                       console.log(results)

                             // handle resultC

                  }

);

 

Figure 8 sereis 흐름 제어를 사용한 예

실행하면 다음과 같은 결과가 나온다.

 




 

series 흐름은 서로 데이타에 대한 의존성은 없지만 순차적으로 실행이 되어야 하는 경우등에 활용이 될 수 있다.

예를 들어 사용자 정보가 MySQLMongoDB에 분산 저장되어 있고, MySQL에는 사용자ID와 암호화된 비밀번호를 저장하고, 기타 다른 정보를  MongoDB에 저장한다고 가정할때, 새로운 사용자 생성은 MySQL에 사용자ID등의 정보를 저장한 후에, MongoDB에 순차적으로 저장해야 한다면, series 흐름이 유용하게 사용될 수 있다.

 

parallel

async 모듈에서 마지막으로 살펴볼 흐름제어는 parallel이다

이름에서도 볼 수 있듯이 동시에 여러개의 task를 실행하는 방법으로, 마치 멀티 쓰레드와 같은 효과를 낼 수 있어서, 실행 시간을 단축시킬 수 있다.

 

아래 개념 그림을 보자. 3개의 task를 병렬로 동시에 수행하는 개념이다.

asyncfunctionA,B,C를 동시에 수행하고 모든 작업이 끝나면 최종 callback을 수행한다.

수행결과는 최종 callback에 배열 형태로 전달된다.



Figure 9 parallel 흐름 제어의 개념

 

에러 처리는 parallel로 수행중이던 task중에 에러가 발생하면, 바로 최종 callback에 에러를 넘긴다. 단 이때 에러가 발생하지 않은 task들은 수행을 멈추지 않고 끝까지 수행되다.

 

parallel흐름 제어를 사용할때 주의해야 할점은 멀티 쓰레드처럼 작업을 수행해주는 것이지 실제 멀티 쓰레드가 아니다. IO작업등이 있는 task들의 경우 IO 요청을 보내놓고, 응답이 올때 까지 다른 task를 실행해서 병렬로 실행하는 것과 같은 효과를 주는 것이다. 만약에 task자체가 IO작업등이 없고 계속해서 CPU를 사용한다면, 그 작업이 끝난후에 다음 task로 넘어가기 때문에, 병렬 처리가 일어나지 않는다. (이런 경우에는 series를 쓰는게 나음)

 

parallel이 효과적으로 사용될 수 있는 곳은 IO쪽인데, 원격으로 여러개의 REST API를 동시 호출하거나, 또는 동시에 여러개의 쿼리를 조회하는 것들에 효과적으로 사용할 수 있다.

 

parallel 흐름 제어의 문법은 다음과 같다. series 흐름 제어 문법과 거의 동일하다고 보면 된다.

parallel(tasks,[callback])

Figure 10 parallel 흐름 제어 문법

·         tasks : 동시에 수행할 함수들을 배열로 정의

·         callback : 최종 callback으로, callback(err,results) 형태로 정의된다. 에러가 발생하면 err 변수에 에러에 대한 내용이 넘어오고, 정상적인 수행 완료인 경우에는 errnull로 전달되고, task의 실행 결과가 results 변수에 배열로 정의되서 리턴된다.

 

예제 코드를 살펴보자.

var async = require('async');

 

async.parallel([

              function(callback){

                 callback(null,'resultA');

              },

              function(callback){

                 callback(null,'resultB');

              },

              function(callback){

                 callback(null,'resultC');

              }

             ],

             function(err,results){

                       if(err) console.log(err);

                       console.log(results)

                             // handle resultC

                  }

);

 

Figure 11 parallel 흐름 제어 예제

 

parallel 흐름 제어의 문법은 series와 다르지 않다. 단지 내부 수행에 있어서 순차적으로 수행을 하는지 아니면 병렬로 동시에 수행을 하는지에 따른 차이만 있다. 위의 코드는 resultA, resultB, resultC를 내는 3개의 task를 동시에 수행하고, 수행이 끝나면, 최종 콜백 함수인 function(err,results)에서 results 배열에 결과를 출력하는 코드이다. 코드를 실행하면 다음과 같이 callback(null, 결과값)으로 넘긴 resultA,resultB,resultC 문자열이 출력 되는 것을 확인할 수 있다.

 



Figure 12 parallel 흐름 제어 예제 코드 실행 결과

 

실제 코드를 보면서 이해를 돕자.다음은 소스 코드 저장소인githubbwcho75라는 사용자 정보를 조회하는 REST API, bwcho75사용자의 follower를 조회하는 REST API 두개를 parallel을 이용해서 동시에 호출하여 결과를 화면에 출력하는 코드이다.

간단하게 REST 호출을 도와주는 모듈로 unirest를 사용하였다. http://unirest.io/

모듈을 사용하기 위해서 npm을 이용하여 코드를 작성하기 전에 unirest 모듈을 설치한다.

%npm install unirest

 

var async = require('async');

var unirest = require('unirest');

 

var start = new Date().getTime();

async.parallel([

                function(callback){

                      unirest.get('https://api.github.com/users/bwcho75')

                      .header('Accept', 'application/json')

                      .header('User-Agent','mynodeapplication')

                      .end(function(response){

                           callback(null,response.body);

                      })

                     

                },

                function(callback){

                      unirest.get('https://api.github.com/users/bwcho75/followers')

                      .header('Accept', 'application/json')

                      .header('User-Agent','mynodeapplication')

                      .end(function(response){

                           callback(null,response.body);

                      })                  

                }

                ],

                function(err,results){

                             console.log('Result 1 -------');

                             console.log(results[0]);

                             console.log('Result 2 -------');

                             console.log(results[1]);

                             console.log('elapsed time : '+(new Date().getTime() - start));

     

});

 

 

Figure 13 asyncparallel 흐름 제어를 이용하여 두개의 github REST API를 호출하는 예제

 

코드를 실행하면 다음과 같이 Result1, Result2에 대한 결과를 얻은것을 볼 수 있다.

Result 1 -------

{ login: 'bwcho75',

  id: 3168358,

  avatar_url: 'https://avatars.githubusercontent.com/u/3168358?v=3',

  gravatar_id: '',

  url: 'https://api.github.com/users/bwcho75',

  html_url: 'https://github.com/bwcho75',

  : 중략

  followers: 7,

  following: 0,

  created_at: '2013-01-02T10:33:11Z',

  updated_at: '2016-02-02T02:38:53Z' }

Result 2 -------

[ { login: 'yshu0307',

    id: 1740343,

  : 중략

    type: 'User',

    site_admin: false },

  { login: 'kmoonki',

    id: 1725366,

  : 중략

    type: 'User',

    site_admin: false },

  { login: 'z-n',

    id: 5715797,

  : 중략

    type: 'User',

    site_admin: false },

  : 중략

]

elapsed time : 1915

Figure 14 parallel 흐름 제어를 이용하여 두개의 github REST API를 실행한 결과


병렬로 API가 호출되었는지를 확인 하기 위해서, 맨 마지막 부분에 API 호출에 소요된 시간을 출력하였는데, 예제 코드에서 async.parallelasync.series로 바꿔서 호출해보면 순차 호출로 바뀌게 되는데, 수행시간이 본인이 테스트한 경우 200ms정도 더 나왔다. 즉 병렬 호출을 통해서 200ms 정도의 수행 시간을 절약한것이다.

 

지금까지 async 모듈을 이용하여 콜백헬을 해결하고, 제어 흐름을 컨트롤할 수 있는 방법에 대해서 알아보았다. async 모듈에서 위의 3가지 흐름제어가 많이 사용되기는 하지만 이외에도 많은 흐름 제어 방식이 있기 때문에, https://github.com/caolan/async 를 참고하기 바란다.

 


 

 

본인은 구글 클라우드의 직원이며, 이 블로그에 있는 모든 글은 회사와 관계 없는 개인의 의견임을 알립니다.

댓글을 달아 주세요

  1. 나무나무 2016.09.06 13:27  댓글주소  수정/삭제  댓글쓰기

    좋은 포스팅 정말 감사합니다~!!!!

  2. 트라리 2017.02.23 10:57 신고  댓글주소  수정/삭제  댓글쓰기

    와 진짜 경이롭다;

Vert.x Worker Concept

클라우드 컴퓨팅 & NoSQL/Vert.x & Node.js | 2014. 1. 28. 23:01 | Posted by 조대협

Worker 대한 개념 설명

[개인 공부 노트이기 때문에 설명이 매우 어렵습니다. 나중에 이해하면 다시 개념 정리해서 올리도록 하겠습니다.]

관련 코드 : https://github.com/bwcho75/vertx_study/tree/master/worker_sample




앞단의 Network 핸들러 (TCP,HTTP)등에서 request 읽은 후에, Event Bus 통해서 Backend Worker 보낸다. 개념은 JMS MQ등을 이용해서 뒷단에서 Message Consumer 들이 처리하는 Q 기반의 Async 기반의 개념과 매우 유사하다.

그럼 Vert.x에서 차이점은 Worker 작업을 처리한 후에, 작업을 끝내면 작업 완료 메시지가 Message Producer ( Network Handler)에게 Call back 형태로 전달 된다.

Async – Call back Pattern. 일반적인 메시지 기반의 시스템이 Async-Fire & Forget 패턴을 사용하는 것에 비하면 상당히 메리트가 많다.

Network Handler입장에서는 Connection 물고 있기는 하지만, 작업은 하지 않고 뒷단의 Worker에서 처리하기 때문에, 많은 request 받아드릴 있고, 아주 많은 작업이 온다하더라도 뒷단의 Queue 통해서 비동기 처리되기 때문에, Timeout 제약이 없는 , 많은 양의 request 처리할 있다.

Call back패턴이기 때문에, Async 메시지를 보낼 , Callback function 바인딩 해야 한다.

EventBus.send('call_bus',key,reply_handler)

// reply_handler call back이다.

만약에 http 경우 reply handler에서 해당 connection reply 메시지를 보내고 싶을때는 http handler request 객체를 reply_hanlder pass해야 하는데, reply_handler 인터페이스 규격에는 message parameter 되어 있지, request response 객체와 같은 다른 parameter 추가적으로 넘길 수가 없다. 이를 해결 하는 방법은 inner function 사용하는 방법이 있다.

def url_handler(req):

  # read parameter from URI

  key = req.params['key']

  def reply_handler(message):

          reply_handler_logic(req,message) 

  print 'Im url handler. I just sent message to event bus '

EventBus.send('call_bus',key,reply_handler)

 

위의 코드를 보면, reply_handler 함수 내에서 reply_handler_logic이라는 함수를 호출할 때, “req” 객체를 넘기는 것을 볼 수 있다. reply_handlerurl_handler안에 있는 inner function이기 때문에, url_handler 범위 안의 변수를 모두 사용할 수 있는 것이고, 결과적으로 request 객체를 넘길 수 있는 것이 되낟.

Vertx에서는 앞단의 Message Handler 경우 하나의 Thread 독립된 Class Loader에서 실행된다. 이를 instance라고 하고, 하나의 JVM에서는 여러 개의 instance 수행할 있으나, Thread 수를 CPU Core 수보다 많이 경우 Thread Context Switchin 대한 부하가 생기기 때문에 일반적으로 core 보다 작게 설정한다. Network Handler Verticle 반드시 같은 쓰레드에서 수행된다.

그러나 Worker 경우에는 특정 Thread에서 같은 Veticle 생성되는 것이 아니라 WAS 같은 Thread Pool 형식을 사용한다. 하나의 Worker Verticle 여러 개의 Thread에서 동시에 수행될 있다.  

http://purplefox.github.io/vert.x/manual.html#worker-verticles 보면, worker 동시에 하나의 thread에서만 수행된다고 나와 있는데, “Worker verticles are never executed concurrently by more than one thread. Worker verticles are also not allowed to use TCP or HTTP clients or servers. Worker verticles normally communicate with other verticles using the vert.x event bus, e.g. receiving work to process.

실제 테스트해보면 instance 수만큼 동시 수행되는 듯하다.

앞단의 NetworkHandler에서 Worker로의 메시지는 Eventbus(일종의큐) 통해서 전달되낟. EventBus 내부적으로 DataGrid Hazle Cast 사용한다.

vertx.deploy_verticle('http_server.py') 통해서 기동하고 worker 경우
vertx.deploy_worker_verticle('worker.py')
이용해서 기동한다.

만약에 worker verticle 수를 조정하고 싶으면

vertx.deploy_worker_verticle('worker.py','{"dummy":"dummy"}',10)

같이 한다.(Vertx 내부적으로 Worker thread pool 설정하는 코드는 https://github.com/eclipse/vert.x/blob/master/vertx-core/src/main/java/org/vertx/java/core/impl/VertxExecutorFactory.java)

 

본인은 구글 클라우드의 직원이며, 이 블로그에 있는 모든 글은 회사와 관계 없는 개인의 의견임을 알립니다.

댓글을 달아 주세요