
librdkafka: Kafka Producer Consumer Tuning for High Throughput

Optimize Kafka producers and consumers with librdkafka/rdkafka in Rust for max throughput on small JSON messages. Tune batching, compression, memory limits, and backpressure to handle tens of GB/hour without crashes.


How should I configure Kafka consumers and producers (librdkafka / rdkafka in Rust) to maximize throughput for very small (≤100 bytes) JSON messages (tens to hundreds of GB/hour) while preventing excessive memory growth?

Context:
I have a data-processing pipeline that sends and receives very small JSON messages (typically ≤100 bytes). With default client settings throughput is low (defaults optimize latency). After tuning the client options I’ve achieved ~100× higher throughput and CPU usage rises to ~60–70%, but the process eventually consumes all memory (64 GB) and crashes after about an hour.

Consumer config I’m using:

rust
let mut consumer_config = rdkafka::config::ClientConfig::new();
consumer_config.set("bootstrap.servers", "192.168.0.2:9092");
consumer_config.set("client.id", "client");
consumer_config.set("group.id", "group");
consumer_config.set("enable.auto.commit", "false");
consumer_config.set("auto.offset.reset", "earliest");
consumer_config.set("fetch.min.bytes", "2097152"); // 2MB
consumer_config.set("fetch.max.bytes", "104857600"); // 100MB
consumer_config.set("max.partition.fetch.bytes", "104857600"); // 100MB

let consumer = consumer_config.create::<rdkafka::consumer::BaseConsumer>().unwrap();

Producer config I’m using:

rust
let mut producer_config = rdkafka::config::ClientConfig::new();
producer_config.set("bootstrap.servers", "192.168.0.2:9092");
producer_config.set("batch.size", "200000");
producer_config.set("queue.buffering.max.messages", "300000");
producer_config.set("linger.ms", "100");
producer_config.set("compression.type", "lz4");
producer_config.set("acks", "1");
producer_config.set("max.in.flight.requests.per.connection", "100");

let producer = producer_config.create::<rdkafka::producer::BaseProducer>().unwrap();

Producer throttling / polling loop I’m using:

rust
let producer_poll_timeout = std::time::Duration::from_millis(50); // 50 ms
let producer_in_flight_count_max = 100000;

while producer.in_flight_count() > producer_in_flight_count_max {
 producer.poll(producer_poll_timeout);
}

Assumptions I want to confirm:

  • I believe batch.size is in bytes.
  • I believe queue.buffering.max.messages is a count of messages (not bytes).
  • I believe max.in.flight.requests.per.connection is a count of in-flight requests.
  • I’m unsure whether I should throttle based on producer.in_flight_count() or producer.len() (or some other metric/API).

Specific questions:

  1. Are my assumptions about the units/semantics of batch.size, queue.buffering.max.messages, and max.in.flight.requests.per.connection correct?
  2. Which client configuration options most strongly affect throughput for very small messages, and how should I tune them (typical tuning direction and example values) for a latency-insensitive, high-throughput workload? Please cover consumer settings (e.g., fetch.min.bytes, fetch.max.bytes, max.partition.fetch.bytes, fetch.wait.max.ms/max.poll.records) and producer settings (e.g., batch.size, linger.ms / queue.buffering.max.ms, queue.buffering.max.messages / queue.buffering.max.kbytes, compression.type, acks, max.in.flight.requests.per.connection).
  3. How can I bound the producer’s memory usage and prevent unbounded buffering? Which settings provide firm limits (by messages or bytes), and what are good practices to apply backpressure? Is checking producer.in_flight_count() and polling an appropriate approach, or should I use producer.len(), blocking sends, a bounded channel, or different librdkafka APIs?
  4. What is the recommended polling strategy for the producer (poll frequency, thread model, use of a dedicated poll thread) to ensure delivery reports are processed and memory is reclaimed promptly?
  5. What are the trade-offs between batching, compression (lz4), linger.ms, and acks for small messages? Which combinations typically maximize throughput without causing memory pressure?
  6. Are there rdkafka/librdkafka-specific metrics, stats callbacks, or debugging techniques to determine whether memory growth is due to queued-but-unsent messages, in-flight requests/acks, or an application-level leak?
  7. Any architecture-level suggestions (partition count, producer parallelism, increasing broker resources, flush/acks patterns) to safely increase throughput for this workload?

Code snippets and parameter names above are from rdkafka (Rust wrapper around librdkafka). Concrete recommended settings or patterns for this environment (small messages, very high throughput, latency not critical) would be most helpful.

For very-small (≤100 B) JSON messages with librdkafka/rdkafka, the fastest wins are aggressive batching + compression (increase linger.ms and batch bytes) combined with hard producer-side caps (queue.buffering.max.kbytes and queue.buffering.max.messages) and application-level backpressure so the client never buffers indefinitely. Tune consumer fetch sizes so you receive big batches without multiplying memory per partition, run a dedicated poll thread (or use ThreadedProducer/FutureProducer), and instrument librdkafka stats to see queued vs in-flight counts while you iterate on limits.


Assumptions: units & semantics (batch.size, queue.buffering.*, max.in.flight)

Short answers to the three unit/semantics items: yes, all three assumptions are correct.

  • batch.size is in bytes (the maximum size of a single message batch/request).
  • queue.buffering.max.messages is a count of locally queued messages; its byte-based counterpart is queue.buffering.max.kbytes.
  • max.in.flight.requests.per.connection is a count of outstanding requests per broker connection (not messages or bytes).

A few caveats and clarifications:

  • librdkafka/rust-rdkafka keep messages in a C-side buffer until they are sent and acknowledged; the library is asynchronous by design (see the producer docs: https://docs.rs/rdkafka/latest/rdkafka/producer/index.html). That means the library’s internal byte and message accounting (the queue.buffering.* knobs) and the application’s own queued copies both matter.
  • producer.in_flight_count() counts messages already sent to brokers and awaiting ACKs (useful for monitoring). producer.len() (or the queue-length API) reflects messages queued locally waiting to be sent. Rely on the library’s “QueueFull” response as the authoritative signal to stop enqueueing more — use in_flight_count() and len() for metrics and heuristics. See the rdkafka producer docs: https://docs.rs/rdkafka/latest/rdkafka/producer/struct.BaseProducer.html.

Producer tuning for tiny messages (librdkafka / rdkafka)

What actually moves the throughput needle for ≤100‑byte JSON messages:

  • Batching (linger + batch size / num-messages): bigger batches amortize per-record overhead.
  • Compression: compressing large batches reduces network I/O and broker CPU work per record.
  • Reducing per-request latency (fewer tiny requests) by increasing linger.ms and collecting many messages.
  • Keeping producer-side memory bounded so you don’t build an unbounded backlog.

Practical tuning directions (typical starting values and reasoning):

  • linger.ms — increase from the default 5 ms to 50–200 ms (100 ms is a common sweet spot). Confluent documentation notes defaults favor latency and recommends >50 ms for throughput — throughput often keeps improving up to 100–1000 ms depending on message size and pattern: https://docs.confluent.io/platform/current/clients/librdkafka/html/md_INTRODUCTION.html.

  • batch.size — measured in bytes (see the librdkafka discussion on batch-size semantics, source 9). For very small messages the effective batch should be large enough to hold many messages; set this to a few hundred KB to 1 MB (e.g., 256 KB–1 MB). If your librdkafka version doesn't expose a Java-style batch.size, rely on linger.ms plus the count-based batch.num.messages and the queue.buffering.* limits to form big batches.

  • queue.buffering.max.kbytes / queue.buffering.max.messages — use these to limit the absolute producer memory footprint (see previous section and https://dsinecos.github.io/blog/Learning-Kafka-Configuring-Kafka-Producer-for-High-Throughput). If you want to cap producer buffers to, say, 128 MB, set queue.buffering.max.kbytes=131072 (128*1024).

  • compression.type — LZ4 is a low-CPU, low-latency compressor that often wins for tiny JSON when batches are large. ZSTD gives better compression but costs more CPU; use it if network, not CPU, is the bottleneck.

  • acks — acks=1 yields lower end-to-end latency and higher throughput; acks=all increases durability at the cost of throughput. Choose based on durability needs.

  • max.in.flight.requests.per.connection — higher values increase parallelism but also increase the number of outstanding requests (and memory tied up in wire buffers). If you enable idempotence you must keep this ≤5; otherwise 5–25 is a pragmatic range. See the request-per-connection explanation: https://stackoverflow.com/questions/62210167/kafka-max-in-flight-requests-per-connection-is-per-producer-or-session.

  • message.timeout.ms — set a sensible expiry for messages that can’t be delivered; this prevents messages from sitting forever in the queue if the brokers are overwhelmed.

Concrete example (start point; tune from there — explanation follows in memory section):

rust
let mut producer_cfg = rdkafka::config::ClientConfig::new();
producer_cfg.set("bootstrap.servers", "192.168.0.2:9092");
producer_cfg.set("linger.ms", "100"); // more batching
producer_cfg.set("batch.size", "524288"); // 512 KB (if supported)
producer_cfg.set("queue.buffering.max.kbytes", "131072"); // 128 MB cap
producer_cfg.set("queue.buffering.max.messages", "200000");//
producer_cfg.set("compression.type", "lz4");
producer_cfg.set("acks", "1");
producer_cfg.set("max.in.flight.requests.per.connection", "10");
producer_cfg.set("message.timeout.ms", "300000"); // 5 minutes
producer_cfg.set("statistics.interval.ms", "1000"); // enable stats JSON

Why these numbers? For tiny payloads you want bigger linger and batch bytes so each request carries many messages; but you also need a firm queue.buffering.* limit to avoid unbounded memory growth (see next section).
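
A rough sizing check (illustrative arithmetic only, assuming ~100 B messages and the 300k msgs/s example rate used in the memory-budget section below):

rust
// ~100 B messages: a 512 KB batch holds roughly 5,000 of them, so each produce
// request amortizes per-message overhead by a few thousand times (before compression)
let msgs_per_batch = 524_288 / 100; // ≈ 5_242

// at 300_000 msg/s, a 100 ms linger window accumulates ~30_000 messages,
// comfortably filling several batches per flush across partitions
let msgs_per_linger_window = 300_000 * 100 / 1000; // = 30_000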


Consumer tuning for batching (librdkafka / rdkafka)

On the consumer side, your goal is to receive big batches without exploding memory because per-partition limits multiply.

Important knobs and starting ideas:

  • fetch.min.bytes — raise from default to something like 128 KB–2 MB so the broker sends larger chunks instead of many small responses. This improves throughput but increases the delay until the broker sends data (it will wait up to fetch.wait.max.ms).

  • fetch.wait.max.ms (the Java client's equivalent is fetch.max.wait.ms) — set to 50–200 ms so the broker won't wait indefinitely to fill the fetch.min.bytes bucket.

  • max.partition.fetch.bytes — set a sensible per-partition cap. Beware: if you have many partitions assigned, per-partition caps multiply into large memory footprints on the client. For example, max.partition.fetch.bytes=104857600 (100 MB) × 100 partitions = 10 GB potential memory. For tiny messages you often want something like 1 MB–4 MB per partition, depending on partition count.

  • fetch.max.bytes — a global cap per broker request; use this to bound worst-case memory from a single fetch.

  • Processing model — poll frequently and hand batches to a bounded worker pool (so the processing backlog is limited). Commit offsets in batches after processing; a poll-and-commit sketch follows the example settings below.

Example consumer settings to start with:

rust
consumer_cfg.set("fetch.min.bytes", "262144"); // 256 KB
consumer_cfg.set("fetch.wait.max.ms", "100");
consumer_cfg.set("max.partition.fetch.bytes", "1048576"); // 1 MB per partition
consumer_cfg.set("fetch.max.bytes", "67108864"); // 64 MB global cap
consumer_cfg.set("enable.auto.commit", "false"); // manual batched commits

If you have tens or hundreds of partitions, reduce max.partition.fetch.bytes proportionally.
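
A minimal poll-and-commit sketch to pair with those settings (COMMIT_EVERY and handle() are placeholders, not rdkafka APIs; in a real pipeline, commit only after the batch has actually been processed):

rust
use std::time::Duration;
use rdkafka::Message;
use rdkafka::consumer::{CommitMode, Consumer};

const COMMIT_EVERY: usize = 10_000; // illustrative batch-commit threshold

fn handle(_payload: &[u8]) {
    // hand off to a bounded worker pool / process the record here
}

let mut since_commit = 0usize;
loop {
    match consumer.poll(Duration::from_millis(100)) {
        Some(Ok(msg)) => {
            if let Some(payload) = msg.payload() {
                handle(payload);
            }
            since_commit += 1;
            if since_commit >= COMMIT_EVERY {
                // commit the consumer's current position for all assigned partitions
                if let Err(e) = consumer.commit_consumer_state(CommitMode::Async) {
                    eprintln!("commit failed: {e}");
                }
                since_commit = 0;
            }
        }
        Some(Err(e)) => eprintln!("kafka error: {e}"),
        None => {} // no message within the timeout
    }
}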

Useful reference: Karafka consumer tuning discussion: https://karafka.io/docs/Latency-and-Throughput/.


Bounding producer memory & backpressure (practical patterns)

Why did your process grow to 64 GB? Common causes:

  • The producer was allowed to accumulate very large queues (either via queue.buffering.* or because your app kept copies of payloads).
  • A slow broker or network bottleneck caused the backlog to grow slowly while you kept enqueuing.
  • Delivery reports / polling weren’t processed frequently enough, so the library couldn’t reclaim buffers.

Firm limits you can set (enforced by librdkafka):

  • queue.buffering.max.messages — hard max number of messages queued locally (message-count).
  • queue.buffering.max.kbytes — hard max total kilobytes queued locally (byte-size).

If those limits are set, produce calls fail with a QueueFull error when they are hit. Treat that failure as a backpressure signal.

Backpressure strategies (pick one or combine):

  1. Prefer a bounded application queue (recommended)
  • Use a bounded channel (sync_channel / crossbeam / tokio bounded mpsc) between producers and the rdkafka sending thread. The bounded channel capacity should be chosen from your memory budget and expected message size. This prevents the rest of your process from allocating until the producer has drained some messages.
  2. Rely on produce() returning QueueFull and block/retry
  • When produce()/send() returns QueueFull, call producer.poll() in a tight-but-yielding loop and retry after a short sleep. Don't busy-spin; use small sleeps or exponential backoff.
  3. Use the library's caps + bounded channel together
  • Set queue.buffering.max.kbytes to a known cap and put a bounded channel in front of the producer so the application is back-pressured before hitting the cap.

Which metric to throttle on?

  • Use producer.len() (queued local messages) to make active throttling decisions if you need a fast, predictive heuristic.
  • Use producer.in_flight_count() to monitor outstanding network activity — useful for health checks.
  • Most importantly, treat QueueFull returned by produce as the authoritative signal: when queuing fails, stop adding new messages until you poll/drain.

Minimal example pattern (a sketch of the bounded-channel approach using rdkafka's BaseProducer send API; `producer` and `topic` come from earlier setup, and the payload type here is String):

rust
use std::time::Duration;
use rdkafka::error::KafkaError;
use rdkafka::producer::BaseRecord;
use rdkafka::types::RDKafkaErrorCode;

// bounded channel approach: senders block once 100_000 payloads are queued
let (tx, rx) = crossbeam_channel::bounded::<String>(100_000);

// sender thread: drain the channel, retry on QueueFull while polling
std::thread::spawn(move || {
    for payload in rx.iter() {
        loop {
            // no key: keyless messages are spread across partitions
            match producer.send(BaseRecord::<(), _>::to(&topic).payload(&payload)) {
                Ok(()) => break,
                Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), _)) => {
                    producer.poll(Duration::from_millis(10)); // service delivery reports
                    std::thread::sleep(Duration::from_millis(2));
                }
                Err((e, _)) => {
                    eprintln!("produce failed: {e}"); // handle other errors
                    break;
                }
            }
        }
    }
});

Key points:

  • Don’t keep huge Rust-side buffers (Strings/Vectors) around — move ownership into the produce call if possible to avoid duplicate copies.
  • Use queue.buffering.max.kbytes to convert a desired memory budget into a firm limit. Compute it from estimated average message size plus overhead and desired seconds-of-backlog:
  • throughput_bytes_per_sec ≈ msgs_per_sec × avg_payload_plus_overhead_bytes
  • desired_backlog_seconds × throughput_bytes_per_sec / 1024 ≈ queue.buffering.max.kbytes
  • Example: if you expect 300k msgs/s, avg effective size 300 B → ~90 MB/s. For a 2-second buffer, set ~180 MB → queue.buffering.max.kbytes=184320.
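
The same arithmetic as a tiny helper (illustrative only; buffer_kbytes is not an rdkafka API):

rust
// Convert an expected message rate and a desired backlog window into a
// queue.buffering.max.kbytes value.
fn buffer_kbytes(msgs_per_sec: u64, avg_msg_bytes: u64, backlog_secs: u64) -> u64 {
    msgs_per_sec * avg_msg_bytes * backlog_secs / 1024
}

// 300_000 msg/s x 300 B x 2 s ≈ 175_781 KB; round up to ~184_320 (180 * 1024) for headroom
let cap_kbytes = buffer_kbytes(300_000, 300, 2);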

Producer polling strategy (how often, thread model)

Polling is how librdkafka processes delivery reports, retries and reclaims buffers. If you only poll when buffers are full you’ll see memory pile up.

Recommendations:

  • Use ThreadedProducer or FutureProducer if you want the library to manage polling automatically (a ThreadedProducer sketch appears at the end of this section). Otherwise, create a dedicated poll thread.
  • Poll interval: ~5–50 ms for high-throughput workloads. If you can't dedicate a thread, poll frequently in the hot path with a zero or near-zero timeout (cheap), but avoid blocking the critical path for long.
  • Do not poll only when in_flight_count() > threshold. Poll continuously at a fixed cadence so delivery reports are processed promptly.

Example poll thread (the producer must be shared with the sending side, e.g. behind an Arc, or use ThreadedProducer, which runs this loop for you):

rust
use std::time::Duration;

std::thread::spawn(move || {
    loop {
        // poll() blocks for up to the timeout while it services delivery
        // callbacks, so the loop paces itself without an extra sleep
        producer.poll(Duration::from_millis(10));
    }
});

If using a dedicated thread you can call producer.poll() with small timeouts and avoid putting polling in the path that enqueues messages (keeps latency predictable).

Reference: rdkafka producer docs describe the async buffer and the need to poll to service delivery reports: https://docs.rs/rdkafka/latest/rdkafka/producer/index.html.
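
If you prefer to let the crate manage polling, a minimal ThreadedProducer sketch (assuming the default producer context; topic name and settings are abridged to the ones relevant here):

rust
use std::time::Duration;
use rdkafka::config::ClientConfig;
use rdkafka::producer::{BaseRecord, DefaultProducerContext, Producer, ThreadedProducer};

// ThreadedProducer spawns its own polling thread, so delivery reports are
// serviced without an explicit poll loop in application code.
let producer: ThreadedProducer<DefaultProducerContext> = ClientConfig::new()
    .set("bootstrap.servers", "192.168.0.2:9092")
    .set("linger.ms", "100")
    .set("compression.type", "lz4")
    .create()
    .expect("producer creation failed");

// enqueueing is still non-blocking and can return QueueFull, so the same
// backpressure handling as with BaseProducer applies
producer
    .send(BaseRecord::to("topic").key("key").payload("{\"v\":1}"))
    .map_err(|(e, _)| e)
    .expect("enqueue failed");

// flush blocks until outstanding messages are delivered or time out
let _ = producer.flush(Duration::from_secs(10));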


Trade‑offs between batching, compression, linger.ms and acks (practical guidance)

  • Bigger batching (higher linger.ms, larger batch bytes) → higher throughput, better compression efficiency, lower broker-side per-message overhead. Cost: higher end-to-end latency and larger transient memory use while the batch is being built.

  • Compression:

  • LZ4 — low CPU cost and moderate compression; often the best choice when CPU is already loaded and you need better network efficiency for small messages once they are batched.

  • ZSTD — better compression ratio (fewer bytes on wire), higher CPU; good if network is the bottleneck and you have CPU headroom.

  • For tiny messages, compression only helps once you batch messages together.

  • acks:

  • acks=1 → fastest, less memory tied to retries, but risk of data loss on broker failure.

  • acks=all → safer but heavier on broker and will slightly reduce throughput.

  • If you need exactly-once or no-duplicates, enable idempotence/transactions (which constrains max.in.flight.requests.per.connection and impacts throughput).

Typical best-for-throughput combo (latency-insensitive):

  • linger.ms = 100 ms
  • batch.size = 256k–1M (bytes)
  • compression.type = lz4 (or zstd-level-1 if CPU fine)
  • acks = 1 (unless durability requires all)
  • queue.buffering.max.kbytes set to a cap you’re willing to accept

That combo gives good throughput while still allowing a firm cap on memory.


Diagnostics: librdkafka/rust-rdkafka metrics & debugging (how to find what’s growing)

Tools and signals to differentiate queued vs in-flight vs app leaks:

  • Enable librdkafka JSON stats (set statistics.interval.ms) and implement a stats callback. Stats include queued bytes, number of messages queued, in-flight requests, tx/rx counters and broker-level connection state. This is the first place to look to see whether messages are queued locally or stuck waiting for ACKs (a stats-callback sketch follows at the end of this section). See the librdkafka config reference: https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md.

  • Use rdkafka methods for live probes:

  • producer.in_flight_count() — how many messages are currently in flight awaiting ACKs.

  • producer.len() or an equivalent queue-length API — how many messages are queued locally. (Check the crate docs for exact method names: https://docs.rs/rdkafka/latest/rdkafka/producer/struct.BaseProducer.html.)

  • Broker-side view: monitor broker produce/receive rates, network utilization and ISR state. If the broker is the bottleneck you’ll see increasing network tx or request queueing on the broker.

  • Heap / allocator tools for native leaks:

  • Use jemalloc, tcmalloc or OS-level tools (pmap/smaps, perf, heaptrack) to see which allocator regions grow.

  • Remember that rust-rdkafka wraps librdkafka (a C library); leaks can be on the Rust side (holding onto payloads) or inside librdkafka's buffers if delivery reports aren't polled.

  • Debug logging: enable librdkafka's debug property (e.g. debug=msg,broker; very noisy in production) to trace queueing and requests. Use it briefly to confirm flow.

  • Practical checks:

  • Do stats show large queued bytes? Then producer buffer is backing up — reduce queue.buffering.max.kbytes and add backpressure.

  • Do in-flight counts stay large while queued bytes are low? Then requests are in flight and the broker or network is the slow part.

  • No queued/in-flight growth but RSS grows? Likely application-level heap growth (retained buffers, caches, etc.); use a heap profiler.
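
To act on the first bullet above, a minimal stats-callback sketch (StatsContext is an illustrative name; msg_cnt/msg_size mirror librdkafka's top-level JSON stats fields):

rust
use rdkafka::{ClientConfig, ClientContext};
use rdkafka::producer::{BaseProducer, DeliveryResult, ProducerContext};
use rdkafka::statistics::Statistics;

// Context whose stats() callback fires every statistics.interval.ms.
struct StatsContext;

impl ClientContext for StatsContext {
    fn stats(&self, stats: Statistics) {
        // msg_cnt / msg_size: messages and bytes currently sitting in the
        // client's queues; per-broker breakdowns live in stats.brokers
        println!("queued msgs={} queued bytes={}", stats.msg_cnt, stats.msg_size);
    }
}

impl ProducerContext for StatsContext {
    type DeliveryOpaque = ();
    fn delivery(&self, _result: &DeliveryResult<'_>, _opaque: ()) {}
}

let producer: BaseProducer<StatsContext> = ClientConfig::new()
    .set("bootstrap.servers", "192.168.0.2:9092")
    .set("statistics.interval.ms", "1000")
    .create_with_context(StatsContext)
    .expect("producer creation failed");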


Architecture-level suggestions to safely increase throughput (partitions, parallelism, packing)

  • Increase partition count to parallelize consumption and broker-side ingest (more partitions → more parallel fetch/produce throughput). But watch per-partition consumer memory (max.partition.fetch.bytes multiplies).

  • Scale horizontally: run more producer instances/threads each with modest local buffers rather than one huge producer instance.

  • Broker resources: add brokers, improve NICs, use SSDs and tune broker replica.fetch.* and network settings if brokers become the bottleneck.

  • Message packing: if your application allows it, pack multiple JSON records into one Kafka message (application-level batching). That reduces per-message overhead dramatically and is often the single best improvement for tiny messages. It does add serialization/aggregation complexity and increases consumer work to unpack (a small packing sketch follows this list).

  • Use client-side rate limiting/backpressure: if the cluster can’t keep up, drop or persist to another store, or slow producers. Don’t let clients buffer forever.

  • Reconsider durability settings: if you can tolerate occasional loss, acks=1 + lower retries increases throughput. If you need strong durability, plan to scale brokers accordingly.
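
For the message-packing idea above, a small sketch using newline-delimited JSON (serde / serde_json are assumed dependencies; Record is an illustrative type):

rust
use serde::Serialize;

#[derive(Serialize)]
struct Record {
    id: u64,
    value: f64,
}

// Pack many small records into one newline-delimited JSON payload; the
// consumer splits on '\n' and deserializes each line to unpack.
fn pack_ndjson(records: &[Record]) -> String {
    records
        .iter()
        .map(|r| serde_json::to_string(r).expect("serialize"))
        .collect::<Vec<_>>()
        .join("\n")
}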


Concrete example configs & patterns (ready-to-adapt)

Producer (example balanced for high throughput + bounded memory):

rust
let mut producer_cfg = rdkafka::config::ClientConfig::new();
producer_cfg.set("bootstrap.servers", "192.168.0.2:9092");
producer_cfg.set("linger.ms", "100");
producer_cfg.set("batch.size", "524288"); // 512 KB
producer_cfg.set("queue.buffering.max.kbytes", "131072"); // 128 MB cap
producer_cfg.set("queue.buffering.max.messages", "200000");
producer_cfg.set("compression.type", "lz4");
producer_cfg.set("acks", "1");
producer_cfg.set("max.in.flight.requests.per.connection", "10");
producer_cfg.set("message.timeout.ms", "300000");
producer_cfg.set("statistics.interval.ms", "1000"); // JSON stats every second
let producer = producer_cfg.create::<rdkafka::producer::BaseProducer>()?;

Consumer (example to get larger batches without exploding per-partition memory):

rust
let mut consumer_cfg = rdkafka::config::ClientConfig::new();
consumer_cfg.set("bootstrap.servers", "192.168.0.2:9092");
consumer_cfg.set("group.id", "group");
consumer_cfg.set("enable.auto.commit", "false");
consumer_cfg.set("fetch.min.bytes", "262144"); // 256 KB
consumer_cfg.set("fetch.wait.max.ms", "100"); // don't wait forever
consumer_cfg.set("max.partition.fetch.bytes", "1048576"); // 1 MB per partition
consumer_cfg.set("fetch.max.bytes", "67108864"); // 64 MB global cap
let consumer = consumer_cfg.create::<rdkafka::consumer::BaseConsumer>()?;

Backpressure pattern summary:

  • Put a bounded channel between producers and the rdkafka sender thread.
  • The sender thread calls produce() and loops on QueueFull with small producer.poll() calls; do not busy-wait.
  • Always run a dedicated poll thread (or use ThreadedProducer) to process delivery reports and free C-side buffers.

Sources

  1. Learning Kafka - Configuring Kafka Producer for High Throughput
  2. Tuning for Performance — WarpStream Kafka client
  3. librdkafka/CONFIGURATION.md (Confluent GitHub)
  4. librdkafka: INTRODUCTION (Confluent docs)
  5. Karafka — Latency and Throughput
  6. rdkafka::producer - Rust docs
  7. BaseProducer in rdkafka::producer - Rust docs
  8. rust-rdkafka GitHub repo (fede1024/rust-rdkafka)
  9. Meaning of ‘Batch size’ in rdkafka_performance tool (librdkafka issue)
  10. Stack Overflow — max.in.flight.requests.per.connection semantics

Conclusion

For tiny (≤100 B) JSON messages with rdkafka/librdkafka, the fastest reliable approach is to: (1) batch aggressively (increase linger.ms and batch bytes), (2) compress batches (lz4 or zstd depending on CPU vs network), and (3) enforce firm producer-side caps (queue.buffering.max.kbytes / queue.buffering.max.messages) plus application-level backpressure (bounded channel or blocking/retry on QueueFull). Also tune consumer fetch sizes so you receive efficient batches without multiplying per-partition memory, run a dedicated poll thread (or ThreadedProducer/FutureProducer), and use librdkafka stats + producer.in_flight_count() / queue-length checks to debug and tune. Those measures let you hit very high throughput while preventing unbounded memory growth from the producer or consumer.
