Functional Reactive Programming with RxJS

Master RxJS: Observables, operators, subjects, and reactive patterns for async data streams.

Tags: RxJS, FRP, JavaScript, Reactive

By MinhVo

Introduction

Reactive programming fundamentally changes how we think about data flow in applications. Instead of imperatively pulling data and managing state changes manually, reactive programming lets you declare how data should flow through your system, and the framework handles the plumbing. RxJS—the Reactive Extensions for JavaScript—is the most mature and widely adopted reactive library in the JavaScript ecosystem, powering Angular's HTTP client, managing complex UI state in enterprise applications, and enabling real-time data processing pipelines.

The core insight of Functional Reactive Programming (FRP) is that almost everything can be modeled as a stream: user clicks, WebSocket messages, HTTP responses, form input changes, timers, and even errors. Once you embrace this mental model, complex asynchronous patterns that would require nested callbacks, tangled promise chains, or elaborate state machines become elegant, composable pipelines of operators. RxJS provides over 120 operators that let you filter, transform, combine, retry, throttle, and cache these streams with declarative precision.

This guide dives deep into RxJS fundamentals, the Observable pattern, essential and advanced operators, Subject types, error handling strategies, and real-world patterns used in production applications. Whether you are building a real-time dashboard, managing complex form interactions, or orchestrating microservice communication, RxJS provides the tools to handle asynchronous complexity with clarity and confidence.

[Figure: Reactive programming stream visualization]

Understanding Observables: Core Concepts

An Observable is the foundation of RxJS. It represents a lazy push collection of values over time. Unlike a Promise, which resolves once, an Observable can emit multiple values over its lifetime—zero, one, or many. It can also emit errors or completion signals, giving you fine-grained control over asynchronous data flows.

Observables vs Promises vs Callbacks

Promises are eager: they start executing immediately upon creation and settle with a single value. Callbacks can fire any number of times, but they offer no built-in composition, cancellation, or completion signal. Observables combine the best of both worlds: they are lazy (nothing happens until you subscribe), they can emit multiple values, and they are highly composable through operators.

import { Observable } from "rxjs";
 
// Creating an Observable from scratch
const clicks$ = new Observable<number>((subscriber) => {
  let count = 0;
  const interval = setInterval(() => {
    subscriber.next(count++);
    if (count >= 5) {
      subscriber.complete();
    }
  }, 1000);
 
  // Cleanup function: called on unsubscribe
  return () => clearInterval(interval);
});
 
// Lazy: nothing happens until subscribe is called
const subscription = clicks$.subscribe({
  next: (value) => console.log(`Click #${value}`),
  error: (err) => console.error("Error:", err),
  complete: () => console.log("Stream completed"),
});
 
// Unsubscribe to prevent memory leaks
setTimeout(() => subscription.unsubscribe(), 3500);
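The eager/lazy distinction can be made concrete with a small side-by-side sketch. The `log` and `received` arrays are illustrative names, not part of the original example; they only record the order in which things run.

```typescript
import { Observable } from "rxjs";

const log: string[] = [];

// A Promise executor runs as soon as the Promise is constructed.
const eager = new Promise<number>((resolve) => {
  log.push("promise executor ran");
  resolve(1);
});
eager.then((value) => console.log("promise resolved with", value));

// An Observable's producer runs only on subscribe, once per subscriber.
const lazy$ = new Observable<number>((subscriber) => {
  log.push("observable producer ran");
  subscriber.next(1);
  subscriber.next(2); // more than one value is allowed
  subscriber.complete();
});

log.push("before subscribe");

const received: number[] = [];
lazy$.subscribe((value) => received.push(value));
lazy$.subscribe(); // a second subscription triggers a second execution

console.log(log);
console.log(received);
```

The producer entry appears in `log` only after "before subscribe", and it appears twice because each subscription runs the producer again.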

Hot vs Cold Observables

Cold Observables create a new producer for each subscriber—each subscription gets its own independent execution. Hot Observables share a single producer among all subscribers. Understanding this distinction is critical for performance and correctness.

import { interval, Subject } from "rxjs";
 
// Cold: each subscriber gets its own timer
const cold$ = interval(1000);
// Subscriber A gets: 0, 1, 2, 3...
// Subscriber B (subscribes 2s later) gets: 0, 1, 2... (its own sequence)
 
// Hot: shared producer via Subject
const hot$ = new Subject<number>();
let i = 0;
setInterval(() => hot$.next(i++), 1000);
// Subscriber A gets: 0, 1, 2, 3...
// Subscriber B (subscribes 2s later) gets: 2, 3, 4... (joins the running sequence)

The Marble Diagram Mental Model

Marble diagrams are the standard way to visualize Observable behavior. Time flows left to right, values are shown as circles, errors as crosses, and completion as a vertical line. This mental model helps you reason about how operators transform streams before writing any code.
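As a rough illustration, a marble diagram can be written in ASCII next to the pipeline it describes. In the sketch below, `-` is a unit of time and `|` is completion (RxJS's testing syntax uses `#` for an error). Because `of` emits synchronously, the spacing in the comments is illustrative rather than a measurement of real time.

```typescript
import { of } from "rxjs";
import { filter, map } from "rxjs/operators";

// source:             --1--2--3--4--|
// filter(even):       -----2-----4--|
// map(n => n * 10):   -----20----40-|
const out: number[] = [];

of(1, 2, 3, 4)
  .pipe(
    filter((n) => n % 2 === 0), // drop odd values
    map((n) => n * 10) // scale what remains
  )
  .subscribe((n) => out.push(n));

console.log(out); // [20, 40]
```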

[Figure: Async data flow architecture diagram]

Architecture and Design Patterns

RxJS enables a reactive architecture where components communicate through streams rather than direct method calls. This decouples producers from consumers and makes the system more flexible, testable, and resilient to change.

The Reactive Store Pattern

Instead of pulling in a dedicated state-management library, you can build a small reactive store in which state is derived from a stream of actions. Each state value is computed from the previous state and the incoming action, much like a reducer.

import { Subject } from "rxjs";
import { map, scan, distinctUntilChanged, shareReplay } from "rxjs/operators";
 
interface Todo {
  id: number;
  text: string;
  completed: boolean;
}
 
type TodoAction =
  | { type: "ADD"; text: string }
  | { type: "TOGGLE"; id: number }
  | { type: "DELETE"; id: number };
 
// Event stream
const actions$ = new Subject<TodoAction>();
 
// Incrementing ids: Date.now() can collide when two actions arrive in the same millisecond
let nextId = 1;
 
// State derived from event stream using scan (reduce for streams)
const todos$ = actions$.pipe(
  scan<TodoAction, Todo[]>((state, action) => {
    switch (action.type) {
      case "ADD":
        return [...state, { id: nextId++, text: action.text, completed: false }];
      case "TOGGLE":
        return state.map((t) =>
          t.id === action.id ? { ...t, completed: !t.completed } : t
        );
      case "DELETE":
        return state.filter((t) => t.id !== action.id);
    }
  }, []),
  shareReplay(1)
);
 
// Derived state: completed count
const completedCount$ = todos$.pipe(
  map((todos) => todos.filter((t) => t.completed).length),
  distinctUntilChanged()
);
 
// Subscribe before dispatching: a Subject does not replay actions sent earlier
completedCount$.subscribe((count) => console.log(`Completed: ${count}`));
 
// Dispatch actions
actions$.next({ type: "ADD", text: "Learn RxJS" }); // gets id 1
actions$.next({ type: "ADD", text: "Build reactive app" }); // gets id 2
actions$.next({ type: "TOGGLE", id: 1 });

Component Communication Pattern

RxJS provides a clean pattern for component communication using services that expose Observables. Components subscribe to streams they care about and emit events through the service, completely decoupling producers from consumers.
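A minimal sketch of this pattern follows. The `CartEventsService` class, the event shapes, and the SKU values are hypothetical names chosen for illustration; in an Angular application the class would typically be an injectable service, but nothing here depends on a framework.

```typescript
import { Observable, Subject } from "rxjs";
import { filter, map } from "rxjs/operators";

// Hypothetical event shapes for illustration.
interface CartAdded { kind: "cart/added"; sku: string; }
interface LoggedOut { kind: "user/loggedOut"; }
type AppEvent = CartAdded | LoggedOut;

class CartEventsService {
  private readonly events = new Subject<AppEvent>();

  // Read-only view: consumers can subscribe but cannot call next().
  readonly events$: Observable<AppEvent> = this.events.asObservable();

  // A narrowed stream for consumers that only care about cart additions.
  readonly addedSkus$: Observable<string> = this.events$.pipe(
    filter((e): e is CartAdded => e.kind === "cart/added"),
    map((e) => e.sku)
  );

  emit(event: AppEvent): void {
    this.events.next(event);
  }
}

const service = new CartEventsService();
const addedSkus: string[] = [];

// "Cart badge" consumer: subscribes without knowing who produces events.
const sub = service.addedSkus$.subscribe((sku) => addedSkus.push(sku));

// "Product page" producer: emits without knowing who listens.
service.emit({ kind: "cart/added", sku: "A-1" });
service.emit({ kind: "user/loggedOut" });
service.emit({ kind: "cart/added", sku: "B-2" });

sub.unsubscribe();
service.emit({ kind: "cart/added", sku: "C-3" }); // no subscriber is left to receive this

console.log(addedSkus); // ["A-1", "B-2"]
```

Exposing `asObservable()` rather than the Subject itself keeps the ability to emit inside the service, so the only way to produce an event is through `emit`.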

Step-by-Step Implementation

Let us build a comprehensive RxJS toolkit covering the most essential operators and patterns for production applications.

Creation Operators

Creation operators generate Observables from various data sources. Choosing the right creation operator is the first step in building an effective reactive pipeline.

import {
  of, from, fromEvent, interval, timer,
  EMPTY, throwError, combineLatest, forkJoin
} from "rxjs";
 
// of: emits values synchronously, then completes
const numbers$ = of(1, 2, 3, 4, 5);
 
// from: converts arrays, promises, iterables to Observables
const fromArray$ = from([10, 20, 30]);
const fromPromise$ = from(fetch("/api/data").then((r) => r.json()));
 
// fromEvent: DOM events as streams
const click$ = fromEvent<MouseEvent>(document, "click");
const keyup$ = fromEvent<KeyboardEvent>(document, "keyup");
 
// interval: emits incrementing numbers at fixed intervals
const heartbeat$ = interval(1000); // 0, 1, 2, 3... every second
 
// timer: delay or periodic emissions
const delayed$ = timer(3000); // emits 0 after 3 seconds, then completes
const periodic$ = timer(2000, 1000); // starts after 2s, then every 1s
 
// EMPTY: completes immediately without emitting
const empty$ = EMPTY;
 
// throwError: emits error immediately
const error$ = throwError(() => new Error("Something went wrong"));

Transformation Operators

Transformation operators modify the values emitted by an Observable. They are the workhorses of reactive data processing.

import { of, from, fromEvent } from "rxjs";
import {
  map, switchMap, mergeMap, concatMap, exhaustMap, scan,
  debounceTime, distinctUntilChanged
} from "rxjs/operators";
 
// searchInput, submitBtn, selectedFiles$, updateQueue$, uploadFile, saveToServer,
// and submitForm are assumed to be defined elsewhere in the application
 
// map: transform each emitted value
const doubled$ = of(1, 2, 3).pipe(map((n) => n * 2));
 
// switchMap: unsubscribe from the previous inner Observable on new emission
// Perfect for type-ahead search: only care about the latest query
const searchResults$ = fromEvent<InputEvent>(searchInput, "input").pipe(
  map((e) => (e.target as HTMLInputElement).value),
  debounceTime(300),
  distinctUntilChanged(),
  switchMap((query) =>
    query.length > 2
      ? from(fetch(`/api/search?q=${encodeURIComponent(query)}`).then((r) => r.json()))
      : of([])
  )
);
 
// mergeMap: run inner Observables concurrently
// Good for independent parallel operations
const fileUploads$ = selectedFiles$.pipe(
  mergeMap((file) => uploadFile(file), 3) // max 3 concurrent uploads
);
 
// concatMap: run inner Observables sequentially
// Good for order-dependent operations
const sequentialUpdates$ = updateQueue$.pipe(
  concatMap((update) => saveToServer(update))
);
 
// exhaustMap: ignore new emissions while inner Observable is active
// Perfect for preventing double-clicks on submit buttons
const submitClicks$ = fromEvent(submitBtn, "click").pipe(
  exhaustMap(() => submitForm())
);
 
// scan: accumulate values over time (like reduce but emits each intermediate)
const runningTotal$ = of(1, 2, 3, 4, 5).pipe(
  scan((acc, val) => acc + val, 0)
);
// Emits: 1, 3, 6, 10, 15

Filtering Operators

Filtering operators control which values pass through the stream. They are essential for reducing noise and focusing on relevant data.

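import { of, interval } from "rxjs";
import {
  filter, take, takeUntil, takeWhile, first, last,
  debounceTime, throttleTime, distinctUntilChanged,
  sampleTime, auditTime, skip, skipUntil
} from "rxjs/operators";
 
// input$, scroll$, componentDestroyed$, event$, and the ValidEvent type
// are assumed to be defined elsewhere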
 
// debounceTime: wait for pause in emissions
const debouncedInput$ = input$.pipe(debounceTime(300));
 
// throttleTime: emit first value, then ignore for duration
const throttledScroll$ = scroll$.pipe(throttleTime(100));
 
// distinctUntilChanged: only emit when value changes
const uniqueValues$ = of(1, 1, 2, 2, 3, 3, 2, 2).pipe(
  distinctUntilChanged()
);
// Emits: 1, 2, 3, 2
 
// takeUntil: complete when notifier emits (for cleanup)
const data$ = interval(1000).pipe(
  takeUntil(componentDestroyed$)
);
 
// filter with type narrowing
const validEvents$ = event$.pipe(
  filter((e): e is ValidEvent => e.type === "valid")
);

Combination Operators

Combination operators merge multiple streams together. They are essential for coordinating data from different sources.

import { combineLatest, merge, race } from "rxjs";
import { map, withLatestFrom } from "rxjs/operators";
 
// combineLatest: emits when any source emits (needs all to have emitted at least once)
const formValid$ = combineLatest([
  nameValid$,
  emailValid$,
  passwordValid$,
]).pipe(
  map(([name, email, password]) => name && email && password)
);
 
// merge: interleave emissions from multiple sources
const allEvents$ = merge(
  click$.pipe(map(() => "click")),
  keypress$.pipe(map(() => "keypress")),
  scroll$.pipe(map(() => "scroll"))
);
 
// withLatestFrom: combine trigger stream with latest value from others
const submitWithFormData$ = submitClick$.pipe(
  withLatestFrom(name$, email$, password$),
  map(([_, name, email, password]) => ({ name, email, password }))
);
 
// race: first source to emit wins, others are ignored
const fastestResponse$ = race(
  fetchFromPrimary$,
  fetchFromBackup$
);

[Figure: Reactive streams data transformation visualization]

Real-World Use Cases

Use Case 1: Type-Ahead Search with Cancellation

One of the most common RxJS use cases is implementing a search input that fetches results from an API while handling debouncing, deduplication, and request cancellation.

import { fromEvent, EMPTY } from "rxjs";
import { fromFetch } from "rxjs/fetch";
import {
  map, debounceTime, distinctUntilChanged,
  switchMap, catchError, filter
} from "rxjs/operators";
 
const searchInput = document.getElementById("search") as HTMLInputElement;
 
const searchResults$ = fromEvent<InputEvent>(searchInput, "input").pipe(
  map((e) => (e.target as HTMLInputElement).value.trim()),
  debounceTime(300),
  filter((query) => query.length >= 2),
  distinctUntilChanged(),
  switchMap((query) =>
    // fromFetch aborts the in-flight request when switchMap unsubscribes;
    // from(fetch(...)) would only ignore the stale response
    fromFetch(`/api/search?q=${encodeURIComponent(query)}`, {
      selector: (response) => response.json(),
    }).pipe(
      catchError((err) => {
        console.error("Search failed:", err);
        return EMPTY; // Drop this failed search; the outer stream keeps running
      })
    )
  )
);
 
// Display results (renderSearchResults is assumed to be defined elsewhere)
searchResults$.subscribe((results) => {
  renderSearchResults(results);
});

Use Case 2: WebSocket Real-Time Dashboard

RxJS excels at managing WebSocket connections with automatic reconnection, heartbeat monitoring, and data aggregation for real-time dashboards.

import { webSocket } from "rxjs/webSocket";
import { retry, map, scan, share } from "rxjs/operators";
import { timer } from "rxjs";
 
interface MetricData {
  timestamp: number;
  cpu: number;
  memory: number;
  requests: number;
}
 
const ws$ = webSocket<MetricData>("wss://metrics.example.com/live");
 
// Auto-reconnect on error with exponential backoff
const metrics$ = ws$.pipe(
  retry({
    delay: (error, retryCount) => {
      const backoff = Math.min(1000 * Math.pow(2, retryCount), 30000);
      console.log(`Reconnecting in ${backoff}ms (attempt ${retryCount})`);
      return timer(backoff);
    },
    resetOnSuccess: true, // restart the backoff sequence once a message arrives
  }),
  share() // Share single connection among multiple subscribers
);
 
// Rolling average over last 10 data points
const rollingAvg$ = metrics$.pipe(
  scan<MetricData, MetricData[]>(
    (window, data) => [...window.slice(-9), data],
    []
  ),
  map((window) => ({
    avgCpu: window.reduce((s, d) => s + d.cpu, 0) / window.length,
    avgMemory: window.reduce((s, d) => s + d.memory, 0) / window.length,
    avgRequests: window.reduce((s, d) => s + d.requests, 0) / window.length,
  }))
);

Use Case 3: Complex Form Orchestration

Managing dependent fields, cross-field validation, and async validation in complex forms is where RxJS truly shines.

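import { combineLatest, from, fromEvent, of } from "rxjs";
import { map, switchMap, debounceTime, distinctUntilChanged, startWith } from "rxjs/operators";
 
// usernameInput, emailInput, passwordValue$, confirmPasswordValue$, emailValid$,
// and calculatePasswordStrength are assumed to be defined elsewhere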
 
interface RegistrationForm {
  username: string;
  email: string;
  password: string;
  confirmPassword: string;
}
 
const username$ = fromEvent<InputEvent>(usernameInput, "input").pipe(
  map((e) => (e.target as HTMLInputElement).value),
  debounceTime(400),
  distinctUntilChanged()
);
 
const email$ = fromEvent<InputEvent>(emailInput, "input").pipe(
  map((e) => (e.target as HTMLInputElement).value),
  debounceTime(400),
  distinctUntilChanged()
);
 
// Async validation: check username availability
const usernameAvailable$ = username$.pipe(
  switchMap((username) =>
    username.length >= 3
      ? from(
          fetch(`/api/check-username/${encodeURIComponent(username)}`).then((r) => r.json())
        ).pipe(map((res) => ({ available: res.available, username })))
      : of({ available: false, username })
  )
);
 
// Cross-field validation: passwords match
const passwordsMatch$ = combineLatest([
  passwordValue$,
  confirmPasswordValue$,
]).pipe(
  map(([password, confirm]) => ({
    match: password === confirm && password.length > 0,
    strength: calculatePasswordStrength(password),
  }))
);
 
// Overall form validity
const formValid$ = combineLatest([
  usernameAvailable$.pipe(map((r) => r.available)),
  emailValid$,
  passwordsMatch$.pipe(map((r) => r.match && r.strength >= 2)),
]).pipe(
  map(([usernameOk, emailOk, passwordOk]) => usernameOk && emailOk && passwordOk),
  startWith(false)
);

Use Case 4: Drag and Drop with Constraints

Building a drag-and-drop system that respects boundaries, snaps to grids, and supports undo/redo is a perfect application for RxJS streams.
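The core of such a system might look like the sketch below, which covers boundary clamping and grid snapping but leaves out undo/redo. The grid size, the bounds, and the `down$`, `move$`, and `up$` Subjects are assumptions for illustration; in a browser the three streams would usually come from `fromEvent` on pointer events.

```typescript
import { Subject } from "rxjs";
import { distinctUntilChanged, map, switchMap, takeUntil } from "rxjs/operators";

interface Point { x: number; y: number; }

// Assumed constraints for illustration.
const GRID = 10;
const BOUNDS = { minX: 0, maxX: 100, minY: 0, maxY: 100 };

const clamp = (v: number, min: number, max: number): number =>
  Math.min(max, Math.max(min, v));
const snap = (v: number): number => Math.round(v / GRID) * GRID;

// Snap to the grid first, then keep the result inside the bounds.
const constrain = (p: Point): Point => ({
  x: clamp(snap(p.x), BOUNDS.minX, BOUNDS.maxX),
  y: clamp(snap(p.y), BOUNDS.minY, BOUNDS.maxY),
});

// Stand-ins for pointerdown / pointermove / pointerup streams.
const down$ = new Subject<void>();
const move$ = new Subject<Point>();
const up$ = new Subject<void>();

// Each pointer-down starts a new drag that lasts until the next pointer-up.
const drag$ = down$.pipe(
  switchMap(() =>
    move$.pipe(
      map(constrain),
      // Skip moves that land on the same grid cell as the previous one.
      distinctUntilChanged((a, b) => a.x === b.x && a.y === b.y),
      takeUntil(up$)
    )
  )
);

const positions: Point[] = [];
drag$.subscribe((p) => positions.push(p));

down$.next();
move$.next({ x: 12, y: 7 }); // snaps to (10, 10)
move$.next({ x: 14, y: 8 }); // same cell, suppressed
move$.next({ x: 130, y: -20 }); // clamped to (100, 0)
up$.next();
move$.next({ x: 50, y: 50 }); // ignored: no drag in progress

console.log(positions);
```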

Best Practices for Production

  1. Always unsubscribe: Use takeUntil, take, or the AsyncPipe in Angular to prevent memory leaks. Every subscription that is not cleaned up is a potential memory leak.
  2. Use shareReplay for shared streams: When multiple components need the same data, use shareReplay(1) to cache the latest emission and share the underlying subscription.
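  3. Prefer switchMap for HTTP reads: switchMap unsubscribes from the previous inner Observable when a new emission arrives, which prevents stale responses from overwriting newer ones and, with a cancellable source such as Angular's HttpClient or fromFetch, aborts the pending request. For writes where every request must complete, concatMap or exhaustMap is usually the safer choice.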
  4. Use BehaviorSubject for state: BehaviorSubjects emit their current value to new subscribers, making them ideal for state management where components need immediate access to current state.
  5. Avoid nested subscribes: Nested subscriptions are the RxJS equivalent of callback hell. Use higher-order mapping operators (switchMap, mergeMap, concatMap) instead.
  6. Handle errors at the right level: Errors propagate through the Observable chain and terminate it by default. Use catchError at appropriate boundaries to handle errors without killing the entire stream.
  7. Use the AsyncPipe in templates: In Angular, the AsyncPipe automatically subscribes and unsubscribes, eliminating manual subscription management and preventing leaks.
  8. Test with marble diagrams: Use TestScheduler and marble syntax for deterministic testing of time-dependent Observable behavior.
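Practices 1, 5, and 6 can be sketched together: a flattening operator instead of a nested subscribe, catchError on the inner stream so that one failure does not end the outer stream, and takeUntil for cleanup. The `loadUser` function, the failing id, and the `destroyed$` notifier are hypothetical stand-ins.

```typescript
import { Observable, Subject, of, throwError } from "rxjs";
import { catchError, switchMap, takeUntil } from "rxjs/operators";

// Hypothetical lookup that fails for one particular id.
const loadUser = (id: number): Observable<string> =>
  id === 2 ? throwError(() => new Error("not found")) : of(`user-${id}`);

const ids$ = new Subject<number>();
const destroyed$ = new Subject<void>();
const seen: string[] = [];

ids$
  .pipe(
    switchMap((id) =>
      loadUser(id).pipe(
        // Handled on the inner stream, so the outer stream survives the error.
        catchError(() => of("fallback"))
      )
    ),
    // Placed last so it also tears down any inner subscription.
    takeUntil(destroyed$)
  )
  .subscribe((value) => seen.push(value));

ids$.next(1);
ids$.next(2); // fails, replaced by "fallback"
ids$.next(3); // still delivered because the outer stream is alive
destroyed$.next();
ids$.next(4); // ignored after teardown

console.log(seen); // ["user-1", "fallback", "user-3"]
```

Moving the same catchError outside the switchMap would still emit "fallback", but the stream would then complete and id 3 would never be processed.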

Common Pitfalls and Solutions

| Pitfall | Impact | Solution |
| --- | --- | --- |
| Forgetting to unsubscribe | Memory leaks, duplicate event handlers | Use takeUntil pattern, AsyncPipe, or automatic cleanup |
| Using mergeMap for HTTP requests | Race conditions from out-of-order responses | Use switchMap or concatMap to control concurrency |
| Subscribing inside subscribe | Callback hell, hard to manage lifecycle | Use switchMap, mergeMap, or combineLatest instead |
| Not handling errors in streams | Stream terminates silently on first error | Add catchError at appropriate boundaries |
| Creating Observables inside components | Untestable, tightly coupled logic | Extract to services with injected dependencies |
| Using Subject when BehaviorSubject is needed | New subscribers miss the current state | Use BehaviorSubject for state that late subscribers need |
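The last pitfall in the table is easy to reproduce. In this small sketch (the theme names are arbitrary), both subjects receive a value before anyone subscribes, and only the BehaviorSubject hands that value to the late subscriber.

```typescript
import { BehaviorSubject, Subject } from "rxjs";

const plain$ = new Subject<string>();
const state$ = new BehaviorSubject<string>("light"); // requires an initial value

// Values emitted before anyone has subscribed.
plain$.next("dark");
state$.next("dark");

const fromPlain: string[] = [];
const fromState: string[] = [];

// Late subscribers.
plain$.subscribe((v) => fromPlain.push(v));
state$.subscribe((v) => fromState.push(v)); // receives "dark" immediately

plain$.next("sepia");
state$.next("sepia");

console.log(fromPlain); // ["sepia"]
console.log(fromState); // ["dark", "sepia"]
```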

Performance Optimization

RxJS performance in production depends on choosing the right operators and managing subscription lifecycles carefully.

import { combineLatest, from, fromEvent } from "rxjs";
import { auditTime, distinctUntilChanged, map, shareReplay } from "rxjs/operators";
 
// allItems$, scrollPosition$, viewportHeight$, ITEM_HEIGHT, and processLargeDataset
// are assumed to be defined elsewhere
 
// Rate-limit high-frequency events to prevent excessive processing
const optimizedScroll$ = fromEvent(window, "scroll").pipe(
  auditTime(16), // ~60fps
  map(() => window.scrollY),
  distinctUntilChanged()
);
 
// Share expensive computations across subscribers
const expensiveData$ = from(fetch("/api/large-dataset").then((r) => r.json())).pipe(
  map((data) => processLargeDataset(data)),
  shareReplay({ bufferSize: 1, refCount: true })
);
 
// Virtual scrolling with windowed data
const visibleItems$ = combineLatest([allItems$, scrollPosition$, viewportHeight$]).pipe(
  map(([items, scrollY, viewportHeight]) => {
    const startIndex = Math.floor(scrollY / ITEM_HEIGHT);
    const endIndex = Math.ceil((scrollY + viewportHeight) / ITEM_HEIGHT);
    const buffer = 5; // Render 5 extra items above and below
    return items.slice(
      Math.max(0, startIndex - buffer),
      Math.min(items.length, endIndex + buffer)
    );
  }),
  // slice() returns a new array every time, so compare contents rather than references
  distinctUntilChanged(
    (prev, curr) => prev.length === curr.length && prev.every((item, i) => item === curr[i])
  )
);

Comparison with Alternatives

| Feature | RxJS | Promises/Async-Await | EventEmitter | Redux/Zustand |
| --- | --- | --- | --- | --- |
| Multiple values | Yes (streams) | Single value only | Yes | Single state |
| Lazy evaluation | Yes | No (eager) | N/A | N/A |
| Cancellation | Built-in (unsubscribe) | AbortController | Manual | N/A |
| Operator library | 120+ operators | Limited | None | Middleware |
| Learning curve | Steep | Low | Low | Moderate |
| Backpressure handling | Manual | None | None | N/A |
| Testing | Marble diagrams | Standard async | Standard | Time-travel |
| Best for | Complex async, real-time | Simple one-shot requests | Simple events | Global state |

Advanced Patterns

Custom Operators

Creating custom operators lets you encapsulate reusable stream logic and build domain-specific abstractions.

import { Observable, EMPTY, timer } from "rxjs";
import { catchError, retry } from "rxjs/operators";
 
// fetchFromApi$ and showNotification are assumed to be defined elsewhere
 
// Custom operator: retry with exponential backoff
const retryWithBackoff = <T>(maxRetries = 3, baseDelay = 1000) =>
  (source: Observable<T>): Observable<T> =>
    source.pipe(
      retry({
        count: maxRetries,
        delay: (error, retryCount) => {
          const backoff = baseDelay * Math.pow(2, retryCount - 1);
          console.log(`Retry ${retryCount}/${maxRetries} after ${backoff}ms`);
          return timer(backoff);
        },
      })
    );
 
// Custom operator: log emissions for debugging
const logStream = <T>(label: string) =>
  (source: Observable<T>): Observable<T> =>
    new Observable((subscriber) => {
      return source.subscribe({
        next: (value) => {
          console.log(`[${label}] next:`, value);
          subscriber.next(value);
        },
        error: (err) => {
          console.error(`[${label}] error:`, err);
          subscriber.error(err);
        },
        complete: () => {
          console.log(`[${label}] complete`);
          subscriber.complete();
        },
      });
    });
 
// Usage
const resilientApi$ = fetchFromApi$.pipe(
  retryWithBackoff(5, 500),
  logStream("API Response"),
  catchError((err) => {
    showNotification("API unavailable");
    return EMPTY;
  })
);

State Machines with RxJS

You can model finite state machines using RxJS by defining states as discriminated unions and transitions as streams.
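One way to sketch this is a pure transition function folded over an event stream with scan. The state and event names below (a small fetch lifecycle) are illustrative assumptions; events that are not valid in the current state return the same state object, which lets distinctUntilChanged drop them.

```typescript
import { Subject } from "rxjs";
import { distinctUntilChanged, scan, startWith } from "rxjs/operators";

type State =
  | { status: "idle" }
  | { status: "loading" }
  | { status: "success"; data: string }
  | { status: "failure"; error: string };

type MachineEvent =
  | { type: "FETCH" }
  | { type: "RESOLVE"; data: string }
  | { type: "REJECT"; error: string }
  | { type: "RESET" };

// Pure transition function: invalid events return the current state unchanged.
const transition = (state: State, event: MachineEvent): State => {
  switch (state.status) {
    case "idle":
      return event.type === "FETCH" ? { status: "loading" } : state;
    case "loading":
      if (event.type === "RESOLVE") return { status: "success", data: event.data };
      if (event.type === "REJECT") return { status: "failure", error: event.error };
      return state;
    case "success":
    case "failure":
      return event.type === "RESET" ? { status: "idle" } : state;
  }
};

const events$ = new Subject<MachineEvent>();
const initial: State = { status: "idle" };

const state$ = events$.pipe(
  scan(transition, initial),
  startWith(initial),
  distinctUntilChanged() // reference equality filters out no-op transitions
);

const history: string[] = [];
state$.subscribe((s) => history.push(s.status));

events$.next({ type: "RESOLVE", data: "early" }); // invalid while idle, ignored
events$.next({ type: "FETCH" });
events$.next({ type: "FETCH" }); // invalid while loading, ignored
events$.next({ type: "RESOLVE", data: "ok" });
events$.next({ type: "RESET" });

console.log(history); // ["idle", "loading", "success", "idle"]
```

Keeping `transition` free of RxJS makes it testable as a plain function, while the stream only handles sequencing.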

Testing Strategies

RxJS provides TestScheduler for deterministic, synchronous testing of time-dependent Observable behavior using marble diagram syntax.

import { TestScheduler } from "rxjs/testing";
import { debounceTime, map } from "rxjs/operators";
 
describe("Search pipeline", () => {
  let scheduler: TestScheduler;
 
  beforeEach(() => {
    scheduler = new TestScheduler((actual, expected) => {
      expect(actual).toEqual(expected);
    });
  });
 
  it("debounces input and maps to search results", () => {
    scheduler.run(({ cold, expectObservable }) => {
      const input$ = cold("abc 300ms d 100ms e|", {
        a: "r",
        b: "rx",
        c: "rxj",
        d: "rxjs",
        e: "rxjs ",
      });
 
      const result$ = input$.pipe(debounceTime(100), map((q) => `results:${q}`));
 
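      // c arrives at frame 2 and is emitted 100ms later (frame 102); d arrives at 303
      // and is emitted at 403; e arrives at 404 and is flushed on completion at 405
      expectObservable(result$).toBe("102ms c 300ms d 1ms (e|)", {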
        c: "results:rxj",
        d: "results:rxjs",
        e: "results:rxjs ",
      });
    });
  });
});

Future Outlook

The RxJS ecosystem continues to evolve. The TC39 Observable proposal has stalled at an early stage, and standardization effort has shifted to the WICG Observable API proposal for the web platform, which may eventually bring native Observable support to browsers. Angular continues to integrate RxJS deeply, and newer frameworks are adopting reactive patterns. Signals (Angular, Solid, Vue) are emerging as a complementary reactivity primitive for synchronous state; they handle the simpler cases rather than replacing RxJS, which remains well suited to complex asynchronous orchestration.

Conclusion

RxJS transforms how you handle asynchronous complexity in JavaScript applications. By modeling everything as streams, you gain powerful composition, automatic resource management, and declarative data flow that scales from simple event handling to complex real-time systems. The key is to start with simple patterns—debouncing input, transforming HTTP responses, combining form fields—and gradually adopt more advanced operators as your comfort grows.

The essential takeaways are: always unsubscribe to prevent leaks, prefer switchMap for HTTP operations, use shareReplay for shared data streams, and test with marble diagrams for deterministic results. Master these patterns, and you will find that even the most complex asynchronous requirements become manageable, readable, and maintainable with RxJS.