Node.js Stream 조작하기

본 블로그 글에서는 대용량 주소 데이터 다운로드 및 저장 시 Node.js 스트림 사용으로 인해 발생한 문제점과 해결 방안을 제시합니다.

Contents


배경


배치에서 대량의 데이터를 다운로드받아, RDBMS 에 저장하는 배치 시스템을 개발해야 하는 일이 생겼다. 기가 단위의 텍스트 파일을 다운로드 받아, rx.js 파이프라인과 BullMQ 메세지 큐를 사용하여 약 700만 건의 데이터를 적재하는 작업이었다. 다운로드 받아야 하는 텍스트 파일의 크기 자체가 크다 보니, 파일을 다운로드하고 읽을 때에도 Node.js 의 Stream 기능을 이용하였다. 다운로드 하려는 파일은 주소기반산업지원서비스에서 공개하는 주소 데이터인 "도로명주소 한글" 데이터 파일이다. 파일의 데이터 형식은 다음과 같다.

51150350200002300029100006|5115035023|강원특별자치도|강릉시|옥계면|주수리|0|214|3|511502000023|동해대로|0|291|6|5115035000|옥계면|25635||20230611|0|63|||

31110101430743900001500002|3111010100|울산광역시|중구|학성동||0|401|5|311104307439|학성6길|0|15|2|3111051000|학성동|44517||20110729|0|63|||

41480350320607500036900000|4148035021|경기도|파주시|광탄면|분수리|0|226|4|414803206075|장지산로|0|369|0|4148035000|광탄면|10954||20110729|0|63|||

46170400465217300003100000|4617040021|전라남도|나주시|산포면|매성리|0|65|0|461704652173|마성2길|0|31|0|4617040000|산포면|58214||20110729|0|63|||

47210107330900500005000000|4721010700|경상북도|영주시|고현동||0|423|1|472103309005|고현로|0|50|0|4721063000|가흥2동|36053||20110729|0|63|||

51150350200002300031200000|5115035023|강원특별자치도|강릉시|옥계면|주수리|0|131|1|511502000023|동해대로|0|312|0|5115035000|옥계면|25635||20230611|0|63|||


줄마다 특정 문자(|)를 기준으로, 각각의 인덱스가 특정한 정보를 지니는 전문 형태이다.

도로명주소 한글 레이아웃


문제상황


위의 "배경" 에서 설명했듯이, 전문 데이터에서 중요한 역할을 하는 문자는 두 가지이다.

| 문자 | 설명 | | --- | --- | | \| | 주소 데이터의 column을 구분하는 역할을 한다 | | | 주소 데이터의 row 를 구분하는 역할을 한다 |

따라서, File Stream 을 읽을 때, 두 문자를 기준으로 데이터를 토크나이징하고, 구조화하였다. 하지만 이 때 문제가 발생하였다. 한 주소 데이터에는 24개의 컬럼이 있어야 하는데, line 별로 데이터를 파싱해보니 가끔 24개보다 적은 갯수의 컬럼을 지닌 주소데이터가 생기는 것이었다. 이러한 데이터는 주소 검증 로직에 걸려, 에러가 발생하였다. 무엇이 잘못되었던 것일까 ?

원인은 Node.js 의 Stream 을 사용했기 때문에 발생했다. 서버로부터 파일을 다운로드 받을 때, HTTP 패킷은 chunk 단위로 Node.js Stream 으로 변환되어 데이터가 흐른다. 별도로 chunk size 를 설정하지 않으면, Node.js 는 16kb 로 Readable Stream 의 chunk size 를 결정한다. Stream 의 highWaterMark 옵션을 통해 chunk size 를 커스텀하게 설정할 수 있다. 파일을 다운로드 받는 과정에서 다음과 같은 상황이 발생했다고 가정해 보자.


한 주소가 Stream 의 chunk 와 chunk 사이의 경계에 위치할 경우, 다음 chunk 가 오기 전까지 딜레이가 생길 수 있다. 만약 이 딜레이 기간에 주소 파싱이 이루어지게 되면, 주소의 컬럼이 부족한 상태로 데이터 파싱이 이루어지게 된다. 실제 데이터 형태와 같이 줄 단위로 데이터를 스트림에 흐르게 한다면 문제가 발생하지 않을 것이라는 생각이 들었다.


해결방법


구글링을 해보니, Node.js 의 Stream에서 동적으로 chunk size 를 결정하게 하는 방법은 지원되지 않았다. 다만, 추가적인 pipe 를 꽂아넣음으로써 후속 pipe 에서 받는 데이터의 형태를 조절할 수 있었다.

Input 으로 들어온 chunk 를 line 단위로 잘라서 다음 pipe 에 전달해주는 LineByLinePipe 를 구현하였다. 구현의 간편성을 위해, Readable Stream 과 Writable Stream 이 합쳐진 형태인 Transform(Duplex) 를 이용하였다.


import { Transform } from 'node:stream';

function LineByLinePipeFactory(): Transform {
  // 현재 담겨있는 데이터를 나타내는 클로저 변수
  let buffer = Buffer.alloc(0);

  return new Transform({
    // newline 이 나올 때까지 버퍼에 담아두고, newline 이 나오면 push 한다
    transform(chunk: Buffer, encoding: string, callback: () => void) {
      buffer = Buffer.concat([buffer, chunk]);

      let newlineIndex;
      while ((newlineIndex = buffer.indexOf('
')) !== -1) {
        const line = buffer.subarray(0, newlineIndex + 1);
        this.push(line);
        buffer = buffer.subarray(newlineIndex + 1);
      }

      callback();
    },

    // stream 이 끝나게 되면, 버퍼에 남아있는 데이터를 push 한다 
    flush(callback: () => void) {
      if (buffer.length > 0) {
        this.push(buffer);
        buffer = Buffer.alloc(0);
      }

      callback();
    }
  });
}

...

readStream
  .pipe(LineByLinePipeFactory())
  .on('error', reject)
  .pipe(writeStream)
  .on('error', reject)
  .on('finish', resolve)


위와 같이, readStream 과 writeStream 사이에 line 별로 데이터를 나누어 주는 pipeline 을 추가하면, 항상 정합성 있는 주소 데이터가 파이프라인에 흐르도록 기능을 개선할 수 있다.


이것도 읽어보세요