[ReactiveX] RxJS

본 블로그 글에서는 반응형 프로그래밍의 핵심인 RxJS를 소개하고, Observable, Operator, Observer 등의 구성 요소와 다양한 활용법을 자세히 설명합니다.

Contents


자료 링크

const { interval } = rxjs;
const { ajax } = rxjs.ajax
const { filter, take, map, delay } = rxjs.operators;

// setInterval
interval(500).pipe(
    filter(n => n%2 === 0),
    take(5),
    map(n => Math.pow(2)),
);
// 유저의 클릭 이벤트
fromEvent(document, "click").pipe(
    map(e => e.x),
    filter(x => x < 400),
    take(4),
);
// AJAX
ajax("http://abcd.com").pipe(
    map(result => result.response.first_name)
).subscribe(console.log);

// delay
fromEvent(document, "click").pipe(
    // 1 초간 Delay 시킨다
    delay(1000),
    // 0.5초간 추가적은 input 이 없으면, 발행시킨다.
    debounceTime(500),
);



Observable

Convention : Observable 변수는 변수 뒤에 $ 를 붙인다.

const { of, from, range, generate } = rxjs

const obs1$ = of(1, 2, 3, 4, 5)
const obs2$ = from([6, 7, 8, 9, 10])
const obs3$ = range(11, 5)
const obs4$ = generate(
  15, x => x < 30, x => x + 2
)

//obs1$.subscribe(item => console.log(`of: ${item}`))
//obs2$.subscribe(item => console.log(`from: ${item}`))
//obs3$.subscribe(item => console.log(`range: ${item}`))
//obs4$.subscribe(item => console.log(`generate: ${item}`))
const { interval, timer } = rxjs

const obs1$ = interval(1000)
const obs2$ = timer(3000)

//obs1$.subscribe(item => console.log(`interval: ${item}`))
//obs2$.subscribe(item => console.log(`timer: ${item}`))
const { fromEvent } = rxjs

const obs1$ = fromEvent(document, 'click')
const obs2$ = fromEvent(document.getElementById('myInput'), 'keypress')

obs1$.subscribe(item => console.log(item))
obs2$.subscribe(item => console.log(item))
const { ajax } = rxjs.ajax

const obs$ = ajax(`http://127.0.0.1:3000/people/1`)
obs$.subscribe(result => console.log(result.response))
const { Observable } = rxjs

const obs$ = new Observable(subscriber => {
  subscriber.next(1)
  subscriber.next(2)
  subscriber.next(3)

  // 값을 다 발행한 뒤에는 compelte를 실행하여 메모리 해제 
  subscriber.complete()
})

obs$.subscribe(item => console.log(item))




Observer

Observer 은 크게 다음과 같은 구조로 설정된다.

const observer = {
    // Operator
    next: Function,
    // Error Handler
    error: Function,
    // When Stream Terminates
    complete: Function,
}
// Object 를 넘겨주면서 subscribe 가능
observable$.subscribe(observer)
// Object 형태가 아니라, 다음과 같이도 subscribe 가능 
// (error, complete) 는 생략 가능 
observable$.subscribe(next, error, complete)

중간에 에러가 발생한다면, Operator 은 동작을 멈추고 중단된다.

import { Observable } from "rxjs";

const observable$ = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  throw new Error("test");
  subscriber.next(3);
  subscriber.complete();
});

observable$.subscribe({
  next: console.log,
  error: console.error,
  complete: console.log,
});
// 실행해 보면, Stream 2 이후로는 실행되지 않는다.

Observable 이 Complete 된다면, 그 이후의 next 는 실행되지 않는다.

import { Observable } from "rxjs";

const observable$ = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
  subscriber.next(3);
});

observable$.subscribe({
  next: console.log,
  error: console.error,
  complete: console.log,
});
// 1, 2 만 출력된다.

사용이 다 된 Observable 은 complete 시켜야 메모리가 낭비되지 않는다. 중간에 구독을 멈추려면 unsubscribe() 를 실행시켜야 한다.



Operator

  1. Creation Operator Observable 을 생성
of, from, range, fromEvent, timeout, interval


  1. Pipable Operator Observable 의 데이터를 순수 함수로 가공 (Observable의 데이터를 수정하지 않음. Immutable) Operator 목록 (공식 문서)

Image

Marvel Diagram 읽는 법



선택 관련 Operator

const { from } = rxjs
const { first, last, elementAt, filter, distinct } = rxjs.operators

const obs$ = from([
    9, 3, 10, 5, 1, 10, 9, 9, 1, 4, 1, 8, 6, 2, 7, 2, 5, 5, 10, 2
]);

obs$.pipe(first()).subscribe(x => console.log('first: ' + x));
obs$.pipe(last()).subscribe(x => console.log('last: ' + x));
obs$.pipe(elementAt(5)).subscribe(x => console.log('elementAt: ' + x));
obs$.pipe(distinct()).subscribe(x => console.log('distinct: ' + x));
obs$.pipe(
    filter(x => x % 2 === 1)
).subscribe(x => console.log('filter: ' + x));

const { from } = rxjs
const { tap, filter, distinct } = rxjs.operators

from([
    9, 3, 10, 5, 1, 10, 9, 9, 1, 4, 1, 8, 6, 2, 7, 2, 5, 5, 10, 2
]).pipe(
    tap(x => console.log('-------------- 처음 탭: ' + x)),
    filter(x => x % 2 === 0),
    tap(x => console.log('--------- 필터 후: ' + x)),
    distinct(),
    tap(x => console.log('중복 제거 후: ' + x)),
).subscribe(x => console.log('발행물: ' + x))



Transformation 연산자들

const { from } = rxjs
const { pluck } = rxjs.operators

const obs$ = from([
    { name: 'apple', price: 1200, info: { category: 'fruit' } },
    { name: 'carrot', price: 800, info: { category: 'vegetable' } },
    { name: 'pork', price: 5000, info: { category: 'meet' } },
    { name: 'milk', price: 2400, info: { category: 'drink' } }
]);

obs$.pipe(
    // pluck 연산자는 특정 항목만 뽑아준다.
    pluck('price')
).subscribe(console.log);

obs$.pipe(
    pluck('info'),
    pluck('category'),
).subscribe(console.log);

obs$.pipe(
    pluck('info', 'category')
).subscribe(console.log);

const obs$ = ajax(`http://api.github.com/search/users?q=user:mojombo`).pipe(
    // 배열이 끼어있는 경우, number 값을 입력해 주면 index 로 이해하고 분해해준다.
    pluck('response', 'items', 0, 'html_url')
)
obs$.subscribe(console.log);


// toArray
// 그냥 subscribe 하면 하나씩 값으로 나오지만, toArray() 를 사용하면 전체 결과가 하나의 list 로 나오게 된다.
range(1, 10).pipe(
    filter(x => x % 3 === 0),
    filter(x => x % 2 === 1),
    toArray()
).subscribe(console.log)
// [3, 9]```
// scan e 는 모든 연산이 끝난 결과를 반환해 주지만,
// reduce 는 모든 연산이 끝난 결과를 반환해 주지만,
// scan 은 연산 과정을 모두 출력해 준다.
const obs$ = of(1,2,3,4,5);
obs$.pipe( cv) => ac + cv)
    scan((ac, cv) => ac + cv)
).subscribe(console.log);
const obs1$ = from([1, 2, 3, 4, 5, 6])
const obs2$ = from(['a', 'b', 'c', 'd', 'e'])

zip(obs1$, obs2$).subscribe(console.log)
// [1, "a"]
// [2, "b"]
// [3, "c"]
// [4, "d"]
// [5, "e"]
// zip 응용 예시 
const obs4$ = interval(1000)
const obs5$ = fromEvent(document, 'click').pipe(pluck('x'))

zip(obs4$, obs5$).subscribe(console.log)

어떻게 동작할까 ? 만약 클릭수가 경과한 초보다 많다면 : 1초마다 하나씩 zip stream 에서 하나씩 발행된다. 만약 클릭보다 경과한 초가 더 많다면 : 클릭할 때마다 zip stream 에서 하나씩 발행된다.

fromEvent(document, 'click').pipe(
    pluck('x'),
    filter(x => x < 200),
    take(5),
).subscribe(
    console.log,
    err => console.error(err),
    _ => console.log('COMPLETE')
)


fromEvent(document, 'click').pipe(
    pluck('x'),
    takeWhile(x => x < 200),
).subscribe(
    console.log,
    err => console.error(err),
    _ => console.log('COMPLETE')
)

위의 예시에서, x 좌표가 200보다 큰 부분들은 아무리 클릭해도 발행되지 않는다. x좌표가 200보다 작은 이벤트 5건만 잡아서 발행되는 스트림이 생성된다.

takeUntil 은 또다른 Stream 을 인자로 받는다. 인자로 받은 Stream 이 첫 번째 값을 발행할 때 까지만 값을 발행한다.

obs1$ = fromEvent(document, 'click')
obs2$ = timer(5000)

obs1$.pipe(
    pluck('x'),
    takeUntil(obs2$)
).subscribe(
    console.log,
    err => console.error(err),
    _ => console.log('COMPLETE')
)

위의 예시는 첫 5초동안만 클릭 이벤트를 입력받고, 5초가 지나면 Complete 시킨다.

skip, skipLast, skipWhile, skipUntil 도 take 와 의미만 반대인 상태이고 동작은 유사하다.


시간 관련 Operator

interval(1000).pipe(
    take(5),
    tap(x => console.log(x + ' 발행시작')),
    delay(1500)
).subscribe(x => console.log(x + ' 발행완료'))
fromEvent(document, 'click').pipe(
    pluck('x'),
    timeInterval()
).subscribe(console.log)
/*
{
    interval: 1402,
    value: 149
}
*/
fromEvent(document, "click").pipe(
    timeout(3000)
).subscribe(
    _ => console.log("OK"),
    err => console.error(err),
)
ajax('http://127.0.0.1:3000/people/name/random').pipe(
    pluck('response'),
    timeoutWith(500, of({
        id: 0,
        first_name: 'Hong',
        last_name: 'Gildong',
        role: 'substitute'
    }))
).subscribe(console.log, console.error)

시간을 다루는 심화 연산자들

const { debounceTime } = rxjs.operators

clicks$.pipe(
    debounceTime(1000)
).subscribe(x => console.log('OUTPUT: -------- ' + x))

Image 유저의 키보드 input 을 받을 때, 사용자로부터 너무 잦은 input 이 있을 때, debounceTime 을 활용하면 적절하다.

const { auditTime } = rxjs.operators

clicks$.pipe(
    auditTime(1000)
).subscribe(x => console.log('OUTPUT: -------- ' + x))
// 첫번째 클릭으로부터 1초 후를 기준으로, 가장 최근에 들어온 input 값으로 발행된다.

Image


const { throttleTime } = rxjs.operators

clicks$.pipe(
    throttleTime(1000, undefined, { 
        leading: true, trailing: false 
    })
).subscribe(x => console.log('OUTPUT: -------- ' + x))

// 클릭하자마자 발행된다. 발행된 후 1초가 지나기 전까진, 다른 값들이 발행되지 않는다.

Image

const { throttleTime } = rxjs.operators

clicks$.pipe(
    throttleTime(1000, undefined, { 
        leading: false, trailing: true 
    })
).subscribe(x => console.log('OUTPUT: -------- ' + x))
// 1초 간격을 기다렸다가 발행한다.

Image


스트림 결합 Operator

const interval$ = interval(1000).pipe(map(_ => "interval"));
const click$ = fromEvent(document, "click").pipe(map(_ => "click"));

merge(interval$, click$).subscribe(console.log);
const { merge, interval } = rxjs
const { map, take } = rxjs.operators

const intv1$ = interval(1000).pipe(
    map(_ => 'INTERVAL 1'), take(3))
const intv2$ = interval(1000).pipe(
    map(_ => 'INTERVAL 2'), take(6))
const intv3$ = interval(1000).pipe(
    map(_ => 'INTERVAL 3'), take(9))
const intv4$ = interval(1000).pipe(
    map(_ => 'INTERVAL 4'), take(9))
const intv5$ = interval(1000).pipe(
    map(_ => 'INTERVAL 5'), take(9))

// merge 함수의 마지막 인자로 number 를 주게 되면,
// 한 번에 몇 개씩 합칠지 나타낸다.
// 위의 예시로는 INTERVAL (1,2,3) 이 먼저 발행되다가, 
// (1,2,3) => (2,3,4) => (3,4,5) => (4,5) => (5) 의 형태로 발행된다.
merge(intv1$, intv2$, intv3$, intv4$, intv5$, 3)
.subscribe(console.log)


const intv1$ = interval(1000).pipe(
    map(_ => "INTERVAL 1"),
    take(3),
);
const intv2$ = interval(1000).pipe(
    map(_ => "INTERVAL 2"),
    take(3),
);
const intv3$ = interval(1000).pipe(
    map(_ => "INTERVAL 3"),
    take(3),
);

concat(intv1$, intv2$, intv3$)
    .subscribe(console.log);

// INTERVAL 1 => 2 => 3 이 각각 3회씩 반복되어서 발행된다.
const interval$ = interval(1000).pipe(
    map(_ => "interval"),
    take(5),
);
const click$ = fromEvent(document, "click").pipe(
    map(_ => "click")
);

concat(interval$, click$).subscribe(console.log);

// interval 이 동작하고 있을 때에는, 
// click 이벤트 스트림 자체가 동작하지 않는다.
// (interval 을 subscribe 중일 때에는, click 스트림은 동작하지 않는다.)


mergeMap(e => interval(1000)).pipe(
        take(5)
    )
).subscribe(console.log);
// 위의 예제는 클릭할 때마다, 
// 1초에 한번씩 이벤트가 발행되는 Stream 을 생성한다.
of(1, 2, 3, 4, 5).pipe(
    mergeMap(keyword => ajax(
            `localhost:3000/people/${keyword}`
        ).pipe(
            pluck("response", "first_name")
        )
    )
).subscribe(console.log);
// 위의 예제는 ajax 요청을 5회 실시한 뒤에, 
// first_name 을 뽑아서 발행해준다.


of(1, 2, 3, 4, 5).pipe(
    concatMap(keyword => ajax(
            `localhost:3000/people/${keyword}`
        ).pipe(
            pluck("response", "first_name")
        )
    )
).subscribe(console.log);
// mergeMap 과는 달리, 
// concatMap 을 사용하면 항상 이름이 순서대로 출력되게 된다.


fromEvent(document, 'click').pipe(
    switchMap(e => interval(1000).pipe(
        map(i => e.x + ' : ' + i),
        take(5)
    ))
).subscribe(console.log)
// 클릭할 때마다, 과거에 있던 stream 은 삭제되게 된다.



기타 유용한 연산자들

const num$ = from([3, 1, 4, 7, 5, 8, 2]);
const key$ = fromEvent(document, "keyup").pipe(
    map(e => Number(e.code.replace("Digit", ""))),
    take(7),
    sequenceEqual(num$),
).subscribe(console.log);
of(1,1,2,2,2,1,1,2,3,3,3,4,4,1).pipe(
    distinctUntilChanged(),
).subscribe(console.log);

distinctUntilChange 는 Object 도 비교할 수 있다. (같은 이름이면, 같은 Object 라고 판단하는 예)

distinctUntilChanged((a, b) => a.name === b.name)
combineLatest(
    fromEvent(document, "click"),
    interval(1000).pipe(pluck("x")),
).subscribe(console.log);
interval(1000).pipe(
    buffer(fromEvent(document, "click"))
).subscribe(console.log);
fromEvent(document, "click").pipe(
    bufferCount(3)
).subscribe(_ => console.log("Clicked 3 Times"));
range(1, 50).pipe(
    groupBy(x => x%3),
    mergeMap(groups$ => groups$.pipe(toArray())),
).subscribe(console.log);
const obs$ = of(1,2,3);
// -1, 0, 1, 2, 3
obs$.pipe(startWith(-1, 0)).subscribe(console.log);
// 1, 2, 3, 4, 5
obs$.pipe(endWith(4, 5)).subscribe(console.log);
fromEvent(document, "click").pipe(
    takeUntil(timer(5000)),
    pluck("x"),
    defaultIfEmpty("NO CLICK")
).subscribe(console.log)
range(1, 20).pipe(
    mergeMap(keyword => ajax(`localhost:3000/${keyword}`)).pipe(
        pluck("response", "first_name"),
        retry(3),
    )
).subscribe(console.log);
fromEvent(document.querySelector("#checked"), "change").pipe(
    pluck("target", "checked")
).subscribe(checked => {
    defer(_ => checked ? of("CHECKED") : of("UNCHECKED")).subscribe(console.log);
})
empty().subscribe(console.log, console.error, _ => console.log("COMPLETE"));
throwError().subscribe(console.log, console.error, _ => console.log("COMPLETE"));
const obs$ = interval(1000).pipe(
    take(20),
    // share 을 사용하면, side effect 도 한 번만 실행된다.
    tap(e => console.log("side effect : " + e)),
    share()
);
obs$.subscribe(console.log);
setTimeout(_ => obs$.subscribe(console.log), 2000);



// Observable 은 구독자가 구독할 때 각각의 데이터가 발행되지만,
// Subject 는 모든 구독자에게 같은 데이터가 발행된다.
const {Subject} = rxjs;
const subject = new Subject();

setTimeout(() => {
    let x = 0;
  setInterval(() => {
        subject.next(x++);
  }, 2000)
})

subject.subscribe(x => console.log("바로 구독 : " + x));
setTimeout(() => {
    subject.subscribe(x => console.log("3초 후 구독 : " + x));
}, 3000)
setTimeout(() => {
    subject.subscribe(x => console.log("7초 후 구독 : " + x));
}, 7000)
const {BehaviorSubject} = rxjs;
const subject = new BehaviorSubject(0);

subject.subscribe(x => console.log(`A : ${x}`));

subject.next(1);
subject.next(2);
subject.next(3);

subject.subscribe(x => console.log(`B : ${x}`));

subject.next(4);
subject.next(5);
subject.next(6);

/*
"A : 0"
"A : 1"
"A : 2"
"A : 3"
"B : 3"
"A : 4"
"B : 4"
"A : 5"
"B : 5"
"A : 6"
"B : 6"
*/
const { ReplaySubject } = rxjs
const subject = new ReplaySubject(3) // 마지막 3개 값 저장

subject.subscribe((x) => console.log('A: ' + x))

subject.next(1)
subject.next(2)
subject.next(3)
subject.next(4)
subject.next(5)

subject.subscribe((x) => console.log('B: ' + x))

subject.next(6)
subject.next(7)
/*
"A: 1"
"A: 2"
"A: 3"
"A: 4"
"A: 5"
"B: 3"
"B: 4"
"B: 5"
"A: 6"
"B: 6"
"A: 7"
"B: 7"
*/
const { AsyncSubject } = rxjs
const subject = new AsyncSubject()

subject.subscribe((x) => console.log('A: ' + x))

subject.next(1)
subject.next(2)
subject.next(3)

subject.subscribe((x) => console.log('B: ' + x))

subject.next(4)
subject.next(5)

subject.subscribe((x) => console.log('C: ' + x))

subject.next(6)
subject.next(7)
subject.complete()

이것도 읽어보세요