Node.js 비동기 처리 라이브러리: flowpipe

https://github.com/proin/flowpipe

Async 라이브러리를 사용하다보면 루프 관리나 병렬 작업을 할 때 흐름을 제어하기가 어렵고, async 콜백 내부에 async를 여러번 사용해야하는데 기본적으로 함수가 배열 형태로 들어가기 때문에 async 한번당 기본으로 인덴트가 2번이 추가된다. 콜백 지옥을 피해가기위해 async를 사용하는데 async 콜백에서 async를 사용해야되는 경우에는 더 위험한 지옥에 빠지게되는 것 같다. 이러한 문제를 어떻게 하면 해결 할 수 있을까 생각하다가, pipe 형태로 작업 흐름을 구현하여 함수에 따라 작업을 제어하면 어떨까 해서 flowpipe 라이브러리를 만들어보았다.


Async의 문제점

  • parallel, series, during 등의 함수는 독립적으로 실행되어 서로 연계 할 수 없다.
  • 각 모듈을 연계하기 위해서는 during 내부에서 series를 쓰는 등 async 콜백 안에서 async를 재사용 해야한다.
  • 기본적으로 async 함수에서 작업과 관련된 함수를 배열 형태로 받기 때문에 인덴트가 2번 들어간다.

Flowpipe Quick Start

npm을 통한 설치 

npm install flowpipe

bower를 통한 설치 

bower install flowpipe

사용 예제

스크린샷 2016-05-07 오후 11.21.01

위와 같은 구조로 동작하는 코드를 flowpipe로 구현하면 아래와 같다.

var flowpipe = require('flowpipe');
flowpipe
    .init(function (next) {
        var page = 1;
        next(null, page);
    })
    .pipe('getHtml', function (next, page) {
        var url = 'http://site.com/list?page=' + page;
        console.log('url', url);
        getHtml(url, function (html) {
            next(null, page, html);
        });
    })
    .pipe('parse', function (next, page, html) {
        console.log('parse');
        parse(html, function (list) {
            next(null, list, page);
        });
    })
    .parallel('insertDB', function (next, item, page) {
        console.log('insertDB');
        item = {title: item, page: page};
        next(null, item);
    })
    .loopback('pipe-getHtml', function (loop, next, instance, parallel, page) {
        console.log('loopback');
        if (!instance.result) instance.result = [];
        for (var i = 0; i < parallel.length; i++)
            instance.result.push(parallel[i]);
        if (page < 5)
            loop(null, page + 1);
        else {
            next(null, instance.result);
        }
    })
    .end(function (err, result) {
        console.log(result);
    }).graph('./result.html');

Documents

  • flowpipe.init(work)
    • 작업 시작시 변수 초기화 등을 할 수 있다.
    • 파라미터 
      • work: function(next)
        • next: function(err, arg1, arg2 …)
          • 맨 처음은 에러, arg1, arg2 등 처음 이후에 변수들은 다음 작업의 변수로 넘겨준다.
          • 에러가 있으면 작업이 중단되고 마지막 함수(end)로 넘어간다.
  • flowpipe.pipe(name, work)
    • 순차적으로 작업을 수행할 수 있다.
    • 파라미터
      • name: 현재 작업의 이름, loopback, jump 기능에서 사용된다.
      • work: function(next, arg1, arg2 …)
        • next: function(err, arg1, arg2)
        • args: 이전 작업에서 넘겨준 파라미터가 넘어온다.
  • flowpipe.parallel(name, work)
    • 병렬로 작업을 수행한 후 최종 결과를 동기시킨 후 다음 작업으로 넘어 갈 수 있다.
    • 파라미터
      • name: 현재 작업의 이름
      • work: function(next, item, args…)
        • next: function(err, data)
          • data: 현재 작업의 결과, 병렬 작업이 끝나면 다음 작업의 첫번째 파라미터에 동기시켜서 전달한다.
        • item
          • 이전 작업에서 넘겨준 첫번째 변수의 각 아이템
          • 이전 작업의 에러 이후 첫번째 파라미터는 배열 형태여야 한다.
  • flowpipe.loopback(target, work)
    • 특정 작업으로 루프를 돌려주면서 루프 관리를 할 수 있다.
    • 조건 처리를 통해 종료 조건을 수행 할 수도 있고 무한 루프를 돌릴 수도 있다.
    • 파라미터
      • target
        • 작업의 id
        • 함수명-작업명 형태로 작성하여야한다.
        • pipe-parse
      • work: function(loop, next, instance, args …)
        • loop, next: function(err, args …)
          • loop는 지정한 루프로 돌아가는 함수이고, next는 loopback을 하지 않고 다음으로 넘어가는 함수이다.
        • instance: 현재 루프에서 지속적으로 유지되는 변수이다. 루프가 돌아가는 동안 유지되고, 루프가 종료되어 다음 작업으로 넘어가면 사라진다.
  • flowpipe.jump(name, work)
    • 조건에 따라 특정 작업으로 점프 할 수 있다.
    • 파라미터
      • name: 현재 작업의 이름
      • work: function(next, args …)
        • next: function(jump_to, args)
          • jump_to:
            • 점프할 작업의 이름
            • 루프백의 타겟과 마찬가지로 함수명-작업명 형태로 작성한다.
            • pipe-parse
  • flowpipe.end(work)
    • 에러가 발생하거나, 마지막으로 실행되는 작업을 등록할 수 있다.
    • 전체 흐름을 작성하고 마지막에 무조건 호출해야 작업이 시작된다.
    • 파라미터
      • work: function(err, args …)
        • 작업 도중 에러가 발생하거나 작업이 완료되면 실행된다.
  • flowpipe.graph(savepath)
    • end 이후에 실행해야 한다.
    • 전체 작업 흐름을 네트워크 그래프 형태로 보여주는 html을 생성해준다.
    • 스크린샷 2016-05-08 오후 7.34.03

네이버 뉴스 수집 예제

모듈 설치

npm install flowpipe request cheerio mysql iconv

예제 코드

var flowpipe = require('flowpipe');
var request = require('request');
var cheerio = require('cheerio');
var mysql = require('mysql');

var category_list = [
    {title: '경제', id: 949986},
    {title: '정치', id: 950203},
    {title: '사회', id: 949987},
    {title: '생활/문화', id: 949988},
    {title: '세계', id: 949990},
    {title: 'IT/과학', id: 949984}
];

flowpipe
    .init(function (next) {
        var query = {};
        query.date = new Date('2015-01-01');
        next(null, query);
    })
    .pipe('dateloop', function (next, query) {
        query.category = 0;
        next(null, query);
    })
    .pipe('categoryloop', function (next, query) {
        query.page = 1;
        next(null, query);
    })
    .pipe('pageloop', function (next, query) {
        setTimeout(function () {
            var dateformat = query.date.format('yyyy-MM-dd 00:00:00');
            var category = category_list[query.category];
            var url = 'http://news.naver.com/main/mainNews.nhn?componentId=' + category.id + '&date=' + dateformat + '&page=' + query.page;
            next(null, query, url);
        }, 500)
    })
    .pipe('getHtml', function (next, query, url) {
        request.get({
            url: url,
            encoding: 'binary'
        }, function (err, res, body) {
            if (err) return next(err);

            try {
                var charSet = 'euc-kr';
                body = new Buffer(body, 'binary');
                var Iconv = require('iconv').Iconv;
                var ic = new Iconv(charSet, 'utf-8');
                body = ic.convert(body).toString();
            } catch (e) {
            }

            var data = JSON.parse(body).itemList;
            next(null, query, data);
        });
    })
    .pipe('parse', function (next, query, data) {
        var result = [];
        var category = category_list[query.category];
        for (var i = 0; i < data.length; i++)
            result.push({
                title: data[i].titleWithUnescapeHtml,
                category: category.title,
                articleDate: data[i].articleDate,
                articleId: data[i].articleId,
                officeId: data[i].officeId,
                href: 'http://news.naver.com/main/read.nhn?mode=LSD&mid=shm&sid1=105&oid=' + data[i].officeId + '&aid=' + data[i].articleId
            });

        next(null, query, result);
    })
    .loopback('pipe-pageloop', function (loop, next, instance, query, result) {
        if (!instance.result) instance.result = [];
        if (!instance.pre) instance.pre = [];
        for (var i = 0; i < result.length; i++)
            for (var j = 0; j < instance.pre.length; j++)
                if (result[i].articleId == instance.pre[j].articleId)
                    return next(null, instance.result, query);
        for (var i = 0; i < result.length; i++)
            instance.result.push(result[i]);
        instance.pre = result;

        query.page += 1;
        loop(null, query);
    })
    .parallel('getContent', function (next, data) {
        request.get({
            url: data.href,
            encoding: 'binary'
        }, function (err, res, body) {
            if (err) return next(err);

            try {
                var charSet = 'euc-kr';
                body = new Buffer(body, 'binary');
                var Iconv = require('iconv').Iconv;
                var ic = new Iconv(charSet, 'utf-8');
                body = ic.convert(body).toString();
            } catch (e) {
            }
            var $ = cheerio.load(body);
            data.text = $('#articleBodyContents').text().trim();
            next(null, data);
        });
    })
    .pipe('createDB', function (next, parsed, query) {
        var connection = mysql.createConnection({
            "host": "localhost",
            "user": "root",
            "password": "",
            "database": "async"
        });

        next(null, parsed, connection, query);
    })
    .parallel('insertDB', function (next, data, connection) {
        connection.query('INSERT INTO news VALUES(?,?,?,?,?,?,?)', [data.articleId, data.category, data.title, data.articleDate, data.officeId, data.href, data.text], function (err) {
            next(null, err ? err : 'no-error');
        });
    })
    .pipe('closeDB', function (next, errors, connection, query) {
        var errorCnt = 0;
        for (var i = 0; i < errors.length; i++)
            if (errors[i] != 'no-error') errorCnt++;

        console.log(
            '[' + query.date.format('yyyy-MM-dd') + ']',
            '[' + category_list[query.category].title + ']',
            errors.length, 'data parsed.',
            errorCnt, 'error at inserting.'
        );

        connection.end(function () {
            next(null, query);
        });
    })
    .loopback('pipe-categoryloop', function (loop, next, instance, query) {
        query.category += 1;
        if (category_list[query.category])
            loop(null, query);
        else {
            next(null, query);
        }
    })
    .loopback('pipe-dateloop', function (loop, next, instance, query) {
        if (query.date.getTime() < new Date('2015-12-31').getTime()) {
            query.date = query.date.day(+1);
            loop(null, query);
        } else {
            next(null);
        }
    })
    .end();

// 날짜 포맷 변경 관련
Date.prototype.day = function (val) {
    return new Date(this.getTime() + val * 1000 * 60 * 60 * 24);
};

Date.prototype.format = function (f) {
    if (!this.valueOf()) return " ";

    var weekName = ["일요일", "월요일", "화요일", "수요일", "목요일", "금요일", "토요일"];
    var d = this;

    return f.replace(/(yyyy|yy|MM|dd|E|hh|mm|ss|a\/p)/gi, function ($1) {
        switch ($1) {
            case "yyyy":
                return d.getFullYear();
            case "yy":
                return (d.getFullYear() % 1000).zf(2);
            case "MM":
                return (d.getMonth() + 1).zf(2);
            case "dd":
                return d.getDate().zf(2);
            case "E":
                return weekName[d.getDay()];
            case "HH":
                return d.getHours().zf(2);
            case "hh":
                return ((h = d.getHours() % 12) ? h : 12).zf(2);
            case "mm":
                return d.getMinutes().zf(2);
            case "ss":
                return d.getSeconds().zf(2);
            case "a/p":
                return d.getHours() < 12 ? "오전" : "오후";
            default:
                return $1;
        }
    });
};

String.prototype.string = function (len) {
    var s = '', i = 0;
    while (i++ < len) {
        s += this;
    }
    return s;
};

String.prototype.zf = function (len) {
    return "0".string(len - this.length) + this;
};

Number.prototype.zf = function (len) {
    return this.toString().zf(len);
};

댓글 남기기