MinhVo

Minh Vo

rss feed

Slaying code & making it lit fr fr 🔥 tagline

Hey there 👋 I'm an AI Engineer with 7 years of experience building scalable web and mobile applications. Currently at Neurond AI (May 2025 — present), architecting an Enterprise AI Assistant Platform with multi-tenant RAG on pgvector, multi-provider LLM orchestration, and Azure-native infrastructure. Previously spent 5+ years at SNAPTEC (Sep 2019 — Apr 2025), leading SaaS themes, admin dashboards, and e-commerce platforms — earned the Hero of the Year award in 2021. I specialize in TypeScript, React, Next.js, and AI-Native engineering with Claude Code and Cursor.bio

Back to blogs

Kafka Streams: Real-Time Data Processing

Process streams with Kafka: topology, state stores, windowing, and exactly-once.

KafkaStreamsReal-TimeData

By MinhVo

Introduction

Real-time data processing has become a cornerstone of modern application architectures. Whether you are building fraud detection systems, live dashboards, recommendation engines, or IoT data pipelines, the ability to process and react to data as it arrives is a competitive advantage. Apache Kafka has established itself as the de facto standard for distributed event streaming, but processing those events efficiently requires more than just a message broker—it requires a stream processing framework.

Kafka Streams is a client library for building applications and microservices that process data stored in Kafka. Unlike Apache Flink or Apache Spark Streaming, Kafka Streams does not require a separate cluster or processing framework. It runs as a standard Java application, deployed alongside your existing services, and leverages Kafka's own partitioning and fault-tolerance mechanisms. This simplicity makes Kafka Streams uniquely appealing for teams that want real-time processing without the operational overhead of managing a dedicated stream processing cluster.

In this guide, we will explore Kafka Streams in depth, covering its core abstractions, practical implementation patterns, stateful processing with state stores, windowing strategies, and the exactly-once processing guarantees that make it suitable for mission-critical applications.

Real-time data processing

Understanding Kafka Streams: Core Concepts

The Streams Processing Model

Kafka Streams processes data as an unbounded sequence of records. Each record consists of a key, a value, and a timestamp. The processing model is based on two core abstractions: KStream and KTable.

A KStream represents an append-only stream of records. Each record is an independent event—inserting a new record does not affect previous records with the same key. Think of it as a log of events: user clicks, sensor readings, or transaction records.

A KTable represents a changelog stream where each record represents an update to a key. If a record arrives with a key that already exists, the new record replaces the old one. Think of it as a materialized view of the latest state for each key: user profiles, inventory counts, or account balances.

Understanding the distinction between KStream and KTable is fundamental to designing correct Kafka Streams applications. Processing a click stream as a KTable would lose earlier clicks for the same user, while processing a user profile stream as a KStream would accumulate stale entries.

Topology: The Processing Graph

A Kafka Streams application defines a topology—a directed acyclic graph (DAG) of processing nodes. Each node performs a specific operation: filtering, mapping, grouping, aggregating, or joining. The topology reads from one or more source topics, processes the records through the node graph, and writes results to one or more sink topics.

StreamsBuilder builder = new StreamsBuilder();
 
// Source: read from topic
KStream<String, String> source = builder.stream("input-topic");
 
// Processing: transform and filter
KStream<String, String> processed = source
    .filter((key, value) -> value != null)
    .mapValues(value -> value.toUpperCase())
    .peek((key, value) -> System.out.println("Processed: " + value));
 
// Sink: write to topic
processed.to("output-topic");

Application Architecture

Kafka Streams applications are distributed across multiple instances. Each instance processes a subset of the topic's partitions. When an instance starts or stops, Kafka automatically redistributes partitions among the remaining instances. This rebalancing happens through Kafka's consumer group protocol and requires no external coordination.

Data pipeline architecture

Architecture and Design Patterns

Stateless Processing

Stateless operations process each record independently, without referencing previous records. They are the simplest and most performant type of processing:

// Filter: remove records that don't match criteria
KStream<String, Order> validOrders = orders
    .filter((key, order) -> order.getAmount() > 0);
 
// Map: transform record values
KStream<String, OrderSummary> summaries = orders
    .mapValues(order -> new OrderSummary(
        order.getId(),
        order.getAmount(),
        order.getTimestamp()
    ));
 
// FlatMap: one input record to zero or more output records
KStream<String, LineItem> items = orders
    .flatMap((key, order) -> order.getLineItems().stream()
        .map(item -> KeyValue.pair(order.getId(), item))
        .collect(Collectors.toList()));
 
// Branch: split stream into multiple sub-streams
KStream<String, Order>[] branches = orders.branch(
    (key, order) -> order.getType() == OrderType.PURCHASE,
    (key, order) -> order.getType() == OrderType.REFUND,
    (key, order) -> true  // default branch
);

Stateful Processing with State Stores

Stateful operations maintain local state that is updated as records are processed. Kafka Streams stores this state in state stores backed by Kafka topics for fault tolerance:

// Aggregate: accumulate values per key
KTable<String, Long> orderCounts = orders
    .groupByKey()
    .aggregate(
        () -> 0L,                    // Initializer
        (key, order, count) -> count + 1,  // Aggregator
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(
            "order-counts-store"
        ).withValueSerde(Serdes.Long())
    );
 
// Reduce: combine values per key
KTable<String, Order> largestOrders = orders
    .groupByKey()
    .reduce(
        (current, incoming) -> incoming.getAmount() > current.getAmount()
            ? incoming : current,
        Materialized.as("largest-orders-store")
    );

Joining Streams and Tables

Kafka Streams supports three types of joins: stream-stream, stream-table, and table-table. Each has different semantics regarding event time, retention, and output behavior:

// Stream-Table Join: enrich orders with customer data
KStream<String, EnrichedOrder> enriched = orders.join(
    customers,                          // KTable to join with
    (order, customer) -> new EnrichedOrder(order, customer),
    Joined.with(Serdes.String(), orderSerde, customerSerde)
);
 
// Stream-Stream Join: correlate events within a time window
KStream<String, CorrelatedEvent> correlated = pageViews.join(
    purchases,
    (pageView, purchase) -> new CorrelatedEvent(pageView, purchase),
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(30)),
    StreamJoined.with(Serdes.String(), pageViewSerde, purchaseSerde)
);

Step-by-Step Implementation

Setting Up a Kafka Streams Application

Start by configuring the Kafka Streams application with the essential properties:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
 
import java.util.Properties;
 
public class OrderProcessor {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
            Serdes.StringSerde.class);
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
            SpecificAvroSerde.class);
        config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
            "exactly_once_v2");
 
        StreamsBuilder builder = new StreamsBuilder();
        buildTopology(builder);
 
        KafkaStreams streams = new KafkaStreams(builder.build(), config);
 
        // Graceful shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
 
        streams.start();
    }
}

Building a Real-Time Aggregation Pipeline

A common pattern is aggregating events in real time to produce running totals or counts:

public static void buildTopology(StreamsBuilder builder) {
    // Source: order events
    KStream<String, Order> orders = builder.stream("orders");
 
    // Filter valid orders only
    KStream<String, Order> validOrders = orders
        .filter((key, order) -> order.getStatus() != OrderStatus.CANCELLED)
        .filter((key, order) -> order.getAmount().compareTo(BigDecimal.ZERO) > 0);
 
    // Group by customer ID
    KGroupedStream<String, Order> grouped = validOrders
        .groupBy(
            (key, order) -> order.getCustomerId(),
            Grouped.with(Serdes.String(), orderSerde)
        );
 
    // Aggregate: compute running totals per customer
    KTable<String, CustomerStats> stats = grouped.aggregate(
        CustomerStats::new,
        (customerId, order, current) -> current.addOrder(order),
        Materialized.<String, CustomerStats, KeyValueStore<Bytes, byte[]>>as(
            "customer-stats-store"
        )
        .withRetention(Duration.ofDays(90))
        .withKeySerde(Serdes.String())
        .withValueSerde(customerStatsSerde)
    );
 
    // Output: publish stats to downstream topic
    stats.toStream().to("customer-stats");
}

Implementing Windowed Aggregations

Windowing groups records into time-based buckets for aggregation:

// Tumbling window: non-overlapping fixed-size windows
KTable<Windowed<String>, Long> hourlyCounts = orders
    .groupByKey()
    .windowedBy(
        TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1))
    )
    .count(Materialized.as("hourly-counts-store"));
 
// Sliding window: overlapping windows that slide by a fixed increment
KTable<Windowed<String>, OrderStats> slidingStats = orders
    .groupByKey()
    .windowedBy(
        SlidingWindows.ofTimeDifferenceAndGrace(
            Duration.ofMinutes(5),
            Duration.ofSeconds(30)
        )
    )
    .aggregate(
        OrderStats::new,
        (key, order, stats) -> stats.add(order),
        Materialized.as("sliding-stats-store")
    );
 
// Session window: dynamic windows based on activity gaps
KTable<Windowed<String>, Long> sessionCounts = orders
    .groupByKey()
    .windowedBy(
        SessionWindows.ofInactivityGapAndGrace(
            Duration.ofMinutes(30),
            Duration.ofMinutes(5)
        )
    )
    .count(Materialized.as("session-counts-store"));

Time-based processing

Real-World Use Cases

Fraud Detection System

A real-time fraud detection system processes credit card transactions, flags suspicious patterns, and alerts analysts within seconds:

public static void fraudDetectionTopology(StreamsBuilder builder) {
    KStream<String, Transaction> transactions = builder.stream("transactions");
 
    // Rule 1: Multiple transactions from same card in short window
    KTable<Windowed<String>, Long> rapidTransactions = transactions
        .groupBy(
            (key, tx) -> tx.getCardId(),
            Grouped.with(Serdes.String(), transactionSerde)
        )
        .windowedBy(SessionWindows.ofInactivityGapAndGrace(
            Duration.ofMinutes(5),
            Duration.ofMinutes(1)
        ))
        .count(Materialized.as("rapid-tx-count"));
 
    // Rule 2: Transaction amount exceeds threshold
    KStream<String, Transaction> largeTransactions = transactions
        .filter((key, tx) -> tx.getAmount().compareTo(
            new BigDecimal("10000")) > 0);
 
    // Combine alerts
    KStream<String, FraudAlert> alerts = largeTransactions
        .mapValues(tx -> new FraudAlert(tx, AlertType.LARGE_AMOUNT));
 
    alerts.to("fraud-alerts");
}

IoT Sensor Data Pipeline

Processing sensor data from thousands of devices, computing rolling averages, and detecting anomalies:

public static void sensorProcessingTopology(StreamsBuilder builder) {
    KStream<String, SensorReading> readings = builder.stream("sensor-readings");
 
    // Filter out-of-range readings
    KStream<String, SensorReading> validReadings = readings
        .filter((key, reading) ->
            reading.getTemperature() > -50 &&
            reading.getTemperature() < 150
        );
 
    // Compute 5-minute rolling average
    KTable<Windowed<String>, Double> rollingAvg = validReadings
        .groupBy(
            (key, reading) -> reading.getSensorId(),
            Grouped.with(Serdes.String(), sensorSerde)
        )
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
        .aggregate(
            () -> new double[]{0.0, 0.0},
            (sensorId, reading, acc) -> {
                acc[0] += reading.getTemperature();
                acc[1] += 1;
                return acc;
            },
            Materialized.as("sensor-avg-store")
        )
        .mapValues(acc -> acc[0] / acc[1]);
 
    // Detect anomalies: reading deviates from average
    rollingAvg.toStream()
        .filter((key, avg) -> avg > 100 || avg < -10)
        .mapValues(avg -> new AnomalyAlert(key.key(), avg))
        .to("anomaly-alerts");
}

Live Dashboard Updates

Aggregating user activity data for real-time dashboard displays:

public static void dashboardTopology(StreamsBuilder builder) {
    KStream<String, UserEvent> events = builder.stream("user-events");
 
    // Count events by type per minute
    KTable<Windowed<String>, Map<String, Long>> metrics = events
        .groupBy(
            (key, event) -> event.getEventType(),
            Grouped.with(Serdes.String(), userEventSerde)
        )
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
        .count(Materialized.as("event-counts-store"))
        .toStream()
        .groupByKey()
        .aggregate(
            HashMap::new,
            (type, count, map) -> {
                map.put(type, count);
                return map;
            },
            Materialized.as("dashboard-metrics")
        );
}

Best Practices for Production

  1. Enable exactly-once semantics — Set processing.guarantee=exactly_once_v2 to ensure each record is processed exactly once, even during failures and rebalances.

  2. Use Avro or Protobuf for serialization — Schema-aware serialization formats prevent compatibility issues when evolving your data model. Use a Schema Registry for schema management.

  3. Monitor consumer lag — Track the lag between the latest offset in each partition and the current processing offset. High lag indicates your application cannot keep up with the incoming data rate.

  4. Size state stores appropriately — Configure RocksDB block cache size, write buffer size, and compaction style based on your workload. Monitor state store disk usage.

  5. Handle deserialization errors gracefully — Configure a DeserializationExceptionHandler to route malformed records to a dead-letter topic instead of crashing the application.

  6. Use named operators for topology clarity — Give each processing step a descriptive name using .withName() to make the topology easier to understand in monitoring tools.

  7. Test topologies with TopologyTestDriver — Use Kafka's built-in test driver to unit test your processing logic without a running Kafka cluster.

  8. Plan for state store recovery — When a Streams instance starts, it may need to restore its state store from the changelog topic. Plan for this recovery time in your deployment strategy.

Common Pitfalls and Solutions

PitfallImpactSolution
Using wrong stream/table semanticsData loss or incorrect aggregationsUse KStream for append-only events, KTable for upserts
Not configuring serialization error handlingApplication crashes on malformed inputSet default.deserialization.exception.handler
Unbounded state store growthDisk space exhaustionConfigure retention period or use windowed stores
Ignoring consumer lagFalling behind real-time processing requirementsMonitor lag and scale instances accordingly
Joining streams with different partition countsData mismatch — records with same key may be on different partitionsRe-key or repartition before joining
Not handling null keysNullPointerException in groupByKeyFilter out null keys before grouping

Performance Optimization

Kafka Streams performance depends on parallelism, serialization efficiency, and state store configuration:

// Configure for high throughput
Properties config = new Properties();
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
 
// Optimize RocksDB state store
config.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
    CustomRocksDBConfig.class);
 
public class CustomRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(String storeName, Options options,
                          Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(256 * 1024 * 1024); // 256MB
        tableConfig.setBlockSize(16 * 1024); // 16KB
        options.setTableFormatConfig(tableConfig);
        options.setMaxWriteBufferNumber(4);
        options.setWriteBufferSize(64 * 1024 * 1024); // 64MB
    }
}

Comparison with Alternatives

FeatureKafka StreamsApache FlinkApache Spark StreamingAkka Streams
Deployment modelLibrary (embedded)ClusterClusterLibrary (embedded)
State managementBuilt-in (RocksDB)Built-inExternalExternal
Exactly-onceYes (v2)YesMicro-batchNo (at-least-once)
WindowingTumbling, sliding, sessionRich windowingMicro-batch onlyCustom
LatencyLow (milliseconds)Low (milliseconds)Medium (seconds)Low (milliseconds)
Operational overheadLow (no cluster)High (cluster)High (cluster)Low
Language supportJava, ScalaJava, Scala, Python, SQLJava, Scala, Python, RScala, Java
Backpressure handlingAutomaticAutomaticLimitedAutomatic
Best forKafka-native processingComplex event processingBatch + stream unificationReactive systems

Kafka Streams is the best choice when your data is already in Kafka and you want to avoid the operational complexity of a separate stream processing cluster.

Advanced Patterns

Interactive Queries

Kafka Streams allows you to query state stores directly from your application, enabling real-time lookups without writing results to a separate database:

KafkaStreams streams = new KafkaStreams(topology, config);
 
// Query the state store for a specific customer's stats
ReadOnlyKeyValueStore<String, CustomerStats> store =
    streams.store(
        StoreQueryParameters.fromNameAndType(
            "customer-stats-store",
            QueryableStoreTypes.keyValueStore()
        )
    );
 
CustomerStats stats = store.get("customer-123");
System.out.println("Total orders: " + stats.getOrderCount());
 
// Range query: iterate over all customers
KeyValueIterator<String, CustomerStats> allStats = store.all();
while (allStats.hasNext()) {
    KeyValue<String, CustomerStats> entry = allStats.next();
    System.out.println(entry.key + ": " + entry.value);
}
allStats.close();

Global KTable

A Global KTable replicates the entire topic to every Streams instance, enabling lookups against a shared dataset without partitioning constraints:

GlobalKTable<String, Product> products =
    builder.globalTable("products");
 
// Join: enrich orders with product data using Global KTable
KStream<String, EnrichedOrder> enriched = orders.join(
    products,
    (key, order) -> order.getProductId(),  // Map order to product key
    (order, product) -> new EnrichedOrder(order, product)
);

Dead Letter Queue Pattern

Route processing errors to a separate topic for investigation:

public class DLQDeserializationHandler implements DeserializationExceptionHandler {
    @Override
    public DeserializationHandlerResponse handle(
        ProcessorContext<byte[], byte[]> context,
        byte[] data,
        Exception exception
    ) {
        // Send to dead letter queue
        context.forward("dlq-key", data,
            To.all().withTimestamp(context.timestamp()));
        return DeserializationHandlerResponse.CONTINUE;
    }
}

Testing Strategies

Kafka Streams provides a TopologyTestDriver for unit testing without a running Kafka cluster:

import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
 
@Test
void shouldProcessOrders() {
    TopologyTestDriver testDriver = new TopologyTestDriver(topology);
    TestInputTopic<String, Order> input = testDriver.createInputTopic(
        "orders", stringSerde.serializer(), orderSerde.serializer()
    );
    TestOutputTopic<String, OrderSummary> output = testDriver.createOutputTopic(
        "order-summaries", stringSerde.deserializer(), summarySerde.deserializer()
    );
 
    // Send test records
    input.pipeInput("order-1", new Order("order-1", "customer-1",
        new BigDecimal("100.00")));
    input.pipeInput("order-2", new Order("order-2", "customer-1",
        new BigDecimal("250.00")));
 
    // Verify output
    List<KeyValue<String, OrderSummary>> results = output.readKeyValuesToList();
    assertEquals(2, results.size());
    assertEquals("customer-1", results.get(0).key);
}

Future Outlook

Kafka Streams continues to evolve with the broader Kafka ecosystem. Recent improvements include better exactly-once semantics, improved scalability for large state stores, and enhanced interactive queries. The introduction of KIP-848 (the new consumer group protocol) will further improve rebalancing behavior and reduce processing latency during group membership changes.

The trend toward event-driven architectures and microservices ensures that Kafka Streams will remain a critical tool in the data engineering landscape. As organizations move away from batch processing toward real-time analytics, the demand for stream processing libraries that are easy to deploy and operate will only grow.

Conclusion

Kafka Streams is a powerful, embeddable stream processing library that brings real-time data processing to any Java or Scala application. Its tight integration with Kafka, built-in state management, and exactly-once semantics make it an excellent choice for teams that want stream processing without the operational overhead of a dedicated cluster.

Key takeaways:

  1. Kafka Streams runs as a library in your application—no separate cluster required
  2. KStream represents append-only event streams; KTable represents changelog streams with upsert semantics
  3. State stores backed by RocksDB provide local state with fault tolerance through Kafka changelog topics
  4. Windowed aggregations enable time-based computations with tumbling, sliding, and session windows
  5. Exactly-once processing guarantees ensure records are processed exactly once during failures
  6. Interactive queries allow direct state store lookups without external databases
  7. The TopologyTestDriver enables comprehensive unit testing without a running Kafka cluster
  8. Monitor consumer lag and state store size for production health

Start by building a simple filter-map-aggregate pipeline, then progressively add windowing, joins, and stateful processing as your requirements grow. For deeper exploration, see the Kafka Streams documentation and the Kafka Streams developer guide.