NeuroAgent

Optimize Java FlatMap for High-Throughput Kafka Production

Learn efficient techniques to optimize Java flatmap operations for high-throughput Kafka production. Achieve millions of operations per second with stream processing enhancements and Kafka producer tuning.

Question

What is the most efficient way to flatmap ClassA objects to ClassC objects before sending them to a Kafka producer in a high-throughput scenario?

I have the following class structure:

java
public class ClassA {
  String prop1;
  List<ClassB> classBList;
}

public class ClassB {
  String prop2;
  List<Integer> indexList;
  List<Integer> valueList;
}

public class ClassC {
  String prop1;
  String prop2;
  Integer index;
  Integer value;
}

Currently, I’m using Java streams to convert ClassA objects to ClassC objects:

java
List<ClassC> flattenedList = classA.getClassBList().stream()
    .flatMap(e -> {
       List<ClassC> tempList = new ArrayList<>();
       for(int i=0; i<e.getIndexList().size(); i++) {
          ClassC temp = new ClassC(classA.getProp1(), e.getProp2(), 
                                  e.getIndexList().get(i), e.getValueList().get(i));
          tempList.add(temp);
       }
       return tempList.stream(); // flatMap expects a Stream, not a List
    }).collect(Collectors.toList());

However, this approach is too slow for my requirements. I need to process millions of ClassA objects (which can result in hundreds of millions of ClassC objects) efficiently. Currently, processing 1 million ClassA objects (resulting in approximately 860 million ClassC objects) takes about 1 hour on a 128 VCPU machine with 64 consumer threads and partitions.

Increasing the number of partitions hasn’t improved performance. I’m open to custom serialization/deserialization solutions as well. The application is a Spring application running on AWS/Azure.

What are the best practices and optimization techniques to improve the performance of this flatmapping operation to achieve millions of operations per second?

NeuroAgent

The most efficient way to flatmap ClassA objects to ClassC objects for high-throughput Kafka production involves optimizing the stream transformation, reducing object creation overhead, and configuring the Kafka producer properly. Published benchmarks have shown Kafka handling over 1 million messages per second, so your bottleneck may well be in the data transformation phase rather than in Kafka itself; profiling the producer side will confirm where the time actually goes.
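The question's own figures give a baseline to measure against. Taking the roughly 860 million ClassC records per hour at face value, the current end-to-end rate works out to:

$$
\frac{860 \times 10^{6}\ \text{records}}{3600\ \text{s}} \approx 2.39 \times 10^{5}\ \text{records/s},
\qquad
\frac{2.39 \times 10^{5}\ \text{records/s}}{64\ \text{threads}} \approx 3.7 \times 10^{3}\ \text{records/s per thread}
$$

Reaching 1 million records per second therefore means at least a four-fold improvement ($10^{6} / 2.39 \times 10^{5} \approx 4.2$), and a few thousand records per second per thread is slow enough that the per-record cost somewhere in the pipeline (object creation, serialization, or sending) is worth profiling before tuning.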


Core Optimization Strategies

Consider these alternatives to your current flatmap approach:

1. Manual Iteration with Direct Producer Calls

java
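// Avoid collecting to an intermediate list: send each record directly to Kafka.
// createKafkaProducer() is a placeholder for your own producer factory; closing the
// producer (try-with-resources) waits for buffered records to be sent.
try (Producer<String, ClassC> producer = createKafkaProducer()) {
    for (ClassA classA : classAList) {
        for (ClassB classB : classA.getClassBList()) {
            List<Integer> indices = classB.getIndexList();
            List<Integer> values = classB.getValueList();
            
            // Build and send one record at a time; nothing is buffered in application memory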
            for (int i = 0; i < indices.size(); i++) {
                ClassC classC = new ClassC(
                    classA.getProp1(), 
                    classB.getProp2(),
                    indices.get(i), 
                    values.get(i)
                );
                
                // Send directly instead of collecting
                ProducerRecord<String, ClassC> record = 
                    new ProducerRecord<>("topic", classC);
                producer.send(record);
            }
        }
    }
}
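To make the direct-send idea testable without a broker, here is a minimal sketch in which a `Consumer<ClassC>` sink stands in for `producer.send(...)`. The class name `DirectSendSketch`, the `flattenTo` method, and the use of Java records (Java 16+) for ClassA/ClassB/ClassC are assumptions for illustration. Like the question's loop, it assumes `indexList` and `valueList` have equal lengths.

```java
import java.util.List;
import java.util.function.Consumer;

// Sketch: flatten ClassA -> ClassC and hand each record to a sink as soon as it is built.
// The Consumer<ClassC> sink is a hypothetical stand-in for producer.send(new ProducerRecord<>(topic, c)).
final class DirectSendSketch {
    record ClassB(String prop2, List<Integer> indexList, List<Integer> valueList) {}
    record ClassA(String prop1, List<ClassB> classBList) {}
    record ClassC(String prop1, String prop2, Integer index, Integer value) {}

    /** Emits every ClassC derived from the input and returns how many were emitted. */
    static long flattenTo(List<ClassA> classAList, Consumer<ClassC> sink) {
        long emitted = 0;
        for (ClassA a : classAList) {
            String prop1 = a.prop1();                 // read once per ClassA
            for (ClassB b : a.classBList()) {
                String prop2 = b.prop2();             // read once per ClassB
                List<Integer> indices = b.indexList();
                List<Integer> values = b.valueList(); // assumed to be the same length as indices
                for (int i = 0; i < indices.size(); i++) {
                    sink.accept(new ClassC(prop1, prop2, indices.get(i), values.get(i)));
                    emitted++;
                }
            }
        }
        return emitted;
    }
}
```

In a real producer loop the sink could be `c -> producer.send(new ProducerRecord<>("topic", c))`, so no list of ClassC objects is ever materialized.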

2. Parallel Stream with Proper Configuration

java
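// Collecting hundreds of millions of ClassC objects into one List needs a very large heap
List<ClassC> flattenedList = classAList.parallelStream()
    // Parallelism comes from the outer stream: in OpenJDK, flatMap consumes each inner
    // stream sequentially, so a nested parallelStream() adds nothing
    .flatMap(classA -> classA.getClassBList().stream()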
        .flatMap(classB -> IntStream.range(0, classB.getIndexList().size())
            .mapToObj(i -> new ClassC(
                classA.getProp1(),
                classB.getProp2(),
                classB.getIndexList().get(i),
                classB.getValueList().get(i)
            ))
        )
    )
    .collect(Collectors.toList());
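The Kafka producer is documented as thread-safe, and its Javadoc notes that sharing one instance across threads is generally faster than using several, so parallelism can come from the outer collection while results go straight to the producer instead of into a list. A sketch under that assumption, again with a hypothetical `Consumer<ClassC>` sink standing in for `producer.send` and Java 16+ records for the data classes:

```java
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;

// Sketch: parallelize over ClassA objects and push results straight into a thread-safe sink
// instead of collecting hundreds of millions of ClassC objects into one List.
final class ParallelSendSketch {
    record ClassB(String prop2, List<Integer> indexList, List<Integer> valueList) {}
    record ClassA(String prop1, List<ClassB> classBList) {}
    record ClassC(String prop1, String prop2, Integer index, Integer value) {}

    // Hypothetical: `sink` must be safe to call from many threads at once.
    static long flattenParallel(List<ClassA> classAList, Consumer<ClassC> sink) {
        LongAdder emitted = new LongAdder();      // low-contention counter shared by worker threads
        classAList.parallelStream().forEach(a -> {
            for (ClassB b : a.classBList()) {     // inner loops stay sequential per ClassA
                List<Integer> indices = b.indexList();
                List<Integer> values = b.valueList();
                for (int i = 0; i < indices.size(); i++) {
                    sink.accept(new ClassC(a.prop1(), b.prop2(), indices.get(i), values.get(i)));
                    emitted.increment();
                }
            }
        });
        return emitted.sum();
    }
}
```

Records from different ClassA objects may reach the sink in any order, since `forEach` on a parallel stream makes no ordering guarantee.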

Stream Processing Enhancements

1. Reduce Object Creation Overhead

java
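// Use object pooling for frequently created objects.
// ClassCObjectPool is your own pool class (a pool like the one in Object Reuse Patterns).
// It needs ClassC to have a no-arg constructor and setters, and each object must be returned
// once it has been serialized; do not combine pooled objects with collect(), or they get overwritten.
private static final ClassCObjectPool classCPool = new ClassCObjectPool();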

// In the stream:
.flatMap(classA -> classA.getClassBList().stream()
    .flatMap(classB -> IntStream.range(0, classB.getIndexList().size())
        .mapToObj(i -> {
            ClassC classC = classCPool.borrowObject();
            classC.setProp1(classA.getProp1());
            classC.setProp2(classB.getProp2());
            classC.setIndex(classB.getIndexList().get(i));
            classC.setValue(classB.getValueList().get(i));
            return classC;
        })
    )
)

2. Primitive Stream Optimization

java
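// Unbox each ClassB's lists into int[] once (one extra pass per list).
// ClassC holds Integer fields, so the ints are re-boxed when ClassC is constructed;
// the gain is limited unless ClassC stores primitives.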
.flatMap(classA -> {
    List<ClassB> classBList = classA.getClassBList();
    return IntStream.range(0, classBList.size())
        .mapToObj(i -> {
            ClassB classB = classBList.get(i);
            int[] indices = classB.getIndexList().stream().mapToInt(Integer::intValue).toArray();
            int[] values = classB.getValueList().stream().mapToInt(Integer::intValue).toArray();
            return IntStream.range(0, indices.length)
                .mapToObj(j -> new ClassC(
                    classA.getProp1(),
                    classB.getProp2(),
                    indices[j],
                    values[j]
                ));
        })
        .flatMap(Function.identity());
})
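If the data can be held in primitive arrays from the start, the unboxing step disappears entirely. The sketch below assumes hypothetical `PrimitiveClassB` (holding `int[]`) and `PrimitiveClassC` (holding `int` fields) variants of the question's classes, written as Java 16+ records, with a `Consumer` sink standing in for the producer.

```java
import java.util.function.Consumer;

// Sketch: keep index/value data primitive end to end. This only avoids boxing because the
// hypothetical PrimitiveClassB stores int[] and PrimitiveClassC stores int fields.
final class PrimitiveSketch {
    record PrimitiveClassB(String prop2, int[] indices, int[] values) {}
    record PrimitiveClassC(String prop1, String prop2, int index, int value) {}

    static long flatten(String prop1, PrimitiveClassB[] classBs, Consumer<PrimitiveClassC> sink) {
        long emitted = 0;
        for (PrimitiveClassB b : classBs) {
            int[] idx = b.indices();
            int[] val = b.values();   // assumed to be the same length as idx
            for (int i = 0; i < idx.length; i++) {
                sink.accept(new PrimitiveClassC(prop1, b.prop2(), idx[i], val[i])); // no Integer boxing
                emitted++;
            }
        }
        return emitted;
    }
}
```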

3. Batch Processing

java
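// Process in batches to reduce memory pressure.
// With ~860 ClassC per ClassA (the ratio in the question), 10,000 ClassA yield ~8.6 million ClassC per batch.
int batchSize = 10000;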
for (int i = 0; i < classAList.size(); i += batchSize) {
    List<ClassA> batch = classAList.subList(i, Math.min(i + batchSize, classAList.size()));
    List<ClassC> flattenedBatch = batch.stream()
        .flatMap(classA -> classA.getClassBList().stream()
            .flatMap(classB -> IntStream.range(0, classB.getIndexList().size())
                .mapToObj(j -> new ClassC(
                    classA.getProp1(),
                    classB.getProp2(),
                    classB.getIndexList().get(j),
                    classB.getValueList().get(j)
                ))
            )
        )
        .collect(Collectors.toList());
    
    // Send batch to Kafka (sendBatchToKafka is your own helper)
    sendBatchToKafka(flattenedBatch);
}
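The batch size above counts ClassA objects, so the number of ClassC objects per batch depends on how many each ClassA fans out to. A sketch that batches on the output side instead, assuming a hypothetical `batchSink` callback in place of `sendBatchToKafka` and Java 16+ records for the data classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch: cap memory by batching on the output side. Records are flattened one at a time and
// flushed whenever batchSize ClassC objects have accumulated, so a large fan-out from a single
// ClassA cannot produce an oversized batch. batchSink stands in for the hypothetical sendBatchToKafka.
final class ChunkedFlattenSketch {
    record ClassB(String prop2, List<Integer> indexList, List<Integer> valueList) {}
    record ClassA(String prop1, List<ClassB> classBList) {}
    record ClassC(String prop1, String prop2, Integer index, Integer value) {}

    static int flattenInChunks(List<ClassA> classAList, int batchSize, Consumer<List<ClassC>> batchSink) {
        if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
        int batches = 0;
        List<ClassC> buffer = new ArrayList<>(batchSize);
        for (ClassA a : classAList) {
            for (ClassB b : a.classBList()) {
                for (int i = 0; i < b.indexList().size(); i++) {
                    buffer.add(new ClassC(a.prop1(), b.prop2(), b.indexList().get(i), b.valueList().get(i)));
                    if (buffer.size() == batchSize) {
                        batchSink.accept(buffer);
                        batches++;
                        buffer = new ArrayList<>(batchSize); // hand off the full list; start a fresh one
                    }
                }
            }
        }
        if (!buffer.isEmpty()) {   // flush the final partial batch
            batchSink.accept(buffer);
            batches++;
        }
        return batches;
    }
}
```

Each full list is handed to the sink and a new one started, so the sink may keep or send it asynchronously without it being overwritten.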

Kafka Producer Optimization

1. Producer Configuration Tuning

java
Properties props = new Properties();
props.put("bootstrap.servers", "your-broker:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.your.package.ClassCSerializer"); // the serializer class shown below
props.put("batch.size", 16384);  // 16 KB is the default; throughput-oriented producers usually raise it
props.put("linger.ms", 5);       // Wait up to 5 ms for a batch to fill before sending
props.put("buffer.memory", 33554432); // 32 MB (the default) for records waiting to be sent
props.put("compression.type", "lz4");  // Fast compression
props.put("max.request.size", 1048576); // 1 MB max request (the default)
props.put("acks", "1");          // Leader-only acknowledgment (trades durability for latency)
props.put("retries", 3);         // With acks=1, idempotence is off, so retries can reorder records
props.put("max.in.flight.requests.per.connection", 5);

2. Custom Serializer for ClassC

java
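import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

// JSON via Jackson: simple, but writes field names into every record.
// ClassC needs getters (or public fields) for Jackson to discover its properties.
public class ClassCSerializer implements Serializer<ClassC> {
    private final ObjectMapper objectMapper = new ObjectMapper(); // thread-safe once configured; reuse it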
    
    @Override
    public byte[] serialize(String topic, ClassC data) {
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Error serializing ClassC", e);
        }
    }
}
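Since the question is open to custom serialization, a fixed binary layout is one alternative to JSON: it skips field names and number-to-text conversion. The layout, the `ClassCBinaryCodec` name, and the non-null `int` fields are assumptions of this sketch, not a standard format. In a Kafka application, `encode` would become the body of `Serializer<ClassC>.serialize(topic, data)` and `decode` the body of the matching `Deserializer`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of a compact binary layout for ClassC:
// [len(prop1)][prop1 UTF-8][len(prop2)][prop2 UTF-8][index][value], all ints big-endian.
// Null handling is omitted: every field is assumed to be non-null.
final class ClassCBinaryCodec {
    record ClassC(String prop1, String prop2, int index, int value) {}

    static byte[] encode(ClassC c) {
        byte[] p1 = c.prop1().getBytes(StandardCharsets.UTF_8);
        byte[] p2 = c.prop2().getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + p1.length + 4 + p2.length + 4 + 4);
        buf.putInt(p1.length).put(p1);      // length-prefixed string
        buf.putInt(p2.length).put(p2);
        buf.putInt(c.index()).putInt(c.value());
        return buf.array();
    }

    static ClassC decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte[] p1 = new byte[buf.getInt()];
        buf.get(p1);
        byte[] p2 = new byte[buf.getInt()];
        buf.get(p2);
        int index = buf.getInt();
        int value = buf.getInt();
        return new ClassC(new String(p1, StandardCharsets.UTF_8), new String(p2, StandardCharsets.UTF_8), index, value);
    }
}
```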

3. Asynchronous Producer with Callbacks

java
Producer<String, ClassC> producer = new KafkaProducer<>(props);
LongAdder failures = new LongAdder(); // java.util.concurrent.atomic.LongAdder

// Send records asynchronously; callbacks run on the producer's I/O thread, so keep them cheap
for (ClassC classC : flattenedList) {
    ProducerRecord<String, ClassC> record = new ProducerRecord<>("topic", classC);
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            // Count and log failures only; printing on every success would dominate the runtime
            failures.increment();
            System.err.println("Error sending record: " + exception);
        }
    });
}
producer.flush();  // Block until all buffered records have been sent
producer.close();

Memory and GC Management

1. JVM Configuration

bash
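# Example starting point for a high-throughput JVM; size heap and GC threads to your instance
-Xms8g -Xmx8g              # Fixed heap size (min = max avoids heap resizing)
-XX:+UseG1GC               # Garbage-First collector (the default since JDK 9)
-XX:MaxGCPauseMillis=200   # Target GC pause time (200 ms is the G1 default)
-XX:ParallelGCThreads=8    # Parallel GC worker threads (by default derived from the CPU count)
-XX:ConcGCThreads=2        # Concurrent GC (marking) threads
-XX:InitiatingHeapOccupancyPercent=35 # Start G1 concurrent marking at 35% heap occupancy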

2. Off-Heap Processing

java
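// Use memory-mapped files for large datasets: the contents live in the OS page cache, not on the Java heap.
// A single MappedByteBuffer covers at most Integer.MAX_VALUE bytes (~2 GB); map larger files in chunks.
FileChannel fileChannel = FileChannel.open(Paths.get("large-data.bin"),
    StandardOpenOption.READ, StandardOpenOption.WRITE);
MappedByteBuffer buffer = fileChannel.map(
    FileChannel.MapMode.READ_WRITE, 0, fileChannel.size());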
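A self-contained version of the memory-mapped approach, assuming the bulk of the data is the (index, value) integer pairs. The temp file, the interleaved int layout, and the `MappedPairsSketch` name are illustrative choices; the point is only that the pairs live in a mapped file rather than as objects on the heap.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.IntBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch: write (index, value) pairs into a memory-mapped temp file, read them back,
// and return the sum of index * value as a simple check.
final class MappedPairsSketch {
    static long roundTrip(int[] indices, int[] values) {
        try {
            Path file = Files.createTempFile("classc-pairs", ".bin");
            file.toFile().deleteOnExit();                          // demo file; removed when the JVM exits
            try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                long size = (long) indices.length * 2 * Integer.BYTES; // one mapping must be <= Integer.MAX_VALUE bytes
                MappedByteBuffer buf = ch.map(FileChannel.MapMode.READ_WRITE, 0, size); // extends the file to size
                IntBuffer ints = buf.asIntBuffer();
                for (int i = 0; i < indices.length; i++) {
                    ints.put(indices[i]).put(values[i]);           // interleave (index, value)
                }
                ints.flip();                                       // switch from writing to reading
                long sum = 0;
                while (ints.hasRemaining()) {
                    sum += (long) ints.get() * ints.get();         // index * value
                }
                return sum;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```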

3. Object Reuse Patterns

java
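// Reuse ClassC objects to reduce GC pressure. Measure first: young-generation allocation is
// cheap, and pooling short-lived objects often does not pay off.
// Not thread-safe (use one pool per thread); requires ClassC to have a no-arg constructor and setters.
public class ClassCObjectPool {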
    private final List<ClassC> objectPool = new ArrayList<>();
    private final int maxPoolSize = 10000;
    
    public ClassC borrowObject() {
        if (objectPool.isEmpty()) {
            return new ClassC();
        }
        return objectPool.remove(objectPool.size() - 1);
    }
    
    public void returnObject(ClassC obj) {
        if (objectPool.size() < maxPoolSize) {
            objectPool.add(obj);
        }
    }
}
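An alternative to a shared pool, sketched under the assumption that ClassC can be made mutable (the hypothetical `MutableClassC` below): each thread keeps one instance in a `ThreadLocal` and overwrites it for every record. This is only safe if the consumer has finished with the object when `accept` returns, for example because it serializes immediately, as the Apache Kafka Java client's `send` does with the value serializer on the calling thread. Any code that keeps a reference, such as a list, would see the object change underneath it.

```java
import java.util.function.Consumer;

// Sketch: one mutable scratch object per thread, overwritten for every record.
// Safe only when the sink is done with the object before accept() returns.
final class PerThreadReuseSketch {
    // Hypothetical mutable variant of ClassC; the question's ClassC may not have setters.
    static final class MutableClassC {
        String prop1; String prop2; int index; int value;

        MutableClassC set(String p1, String p2, int i, int v) {
            prop1 = p1; prop2 = p2; index = i; value = v;
            return this;
        }
    }

    private static final ThreadLocal<MutableClassC> SCRATCH = ThreadLocal.withInitial(MutableClassC::new);

    static void emit(String prop1, String prop2, int[] indices, int[] values, Consumer<MutableClassC> sink) {
        MutableClassC scratch = SCRATCH.get();   // same instance for every record on this thread
        for (int i = 0; i < indices.length; i++) {
            sink.accept(scratch.set(prop1, prop2, indices[i], values[i]));
        }
    }
}
```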

Alternative Implementation Approaches

1. Direct Producer without Intermediate Collection

java
public void processAndSendDirectly(List<ClassA> classAList, Producer<String, ClassC> producer) {
    for (ClassA classA : classAList) {
        String prop1 = classA.getProp1(); // Cache to avoid repeated calls
        for (ClassB classB : classA.getClassBList()) {
            String prop2 = classB.getProp2();
            List<Integer> indices = classB.getIndexList();
            List<Integer> values = classB.getValueList();
            
            // Direct iteration without streams
            for (int i = 0; i < indices.size(); i++) {
                ClassC classC = new ClassC(prop1, prop2, indices.get(i), values.get(i));
                ProducerRecord<String, ClassC> record = new ProducerRecord<>("topic", classC);
                producer.send(record);
            }
        }
    }
}

2. Reactive Programming with Project Reactor

java
Flux.fromIterable(classAList)
    .flatMap(classA -> Flux.fromIterable(classA.getClassBList())
        .flatMap(classB -> Flux.range(0, classB.getIndexList().size())
            .map(i -> new ClassC(
                classA.getProp1(),
                classB.getProp2(),
                classB.getIndexList().get(i),
                classB.getValueList().get(i)
            ))
        )
    )
    .bufferTimeout(1000, Duration.ofMillis(100)) // Emit a batch at 1000 items or after 100 ms, whichever comes first
    .subscribe(batch -> {
        // Send batch to Kafka (sendBatchToKafka is your own helper)
        sendBatchToKafka(batch);
    });

3. Fork-Join Framework for Parallel Processing

java
public List<ClassC> parallelFlatMap(List<ClassA> classAList) {
    ForkJoinPool customThreadPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2);
    
    try {
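        // A parallel stream started from inside a ForkJoinPool task runs in that pool; this is
        // OpenJDK behavior rather than a documented guarantee
        return customThreadPool.submit(() -> 
            classAList.parallelStream()
                .flatMap(classA -> classA.getClassBList().stream()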
                    .flatMap(classB -> IntStream.range(0, classB.getIndexList().size())
                        .mapToObj(i -> new ClassC(
                            classA.getProp1(),
                            classB.getProp2(),
                            classB.getIndexList().get(i),
                            classB.getValueList().get(i)
                        ))
                    )
                )
                .collect(Collectors.toList())
        ).get();
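    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        throw new RuntimeException("Parallel processing interrupted", e);
    } catch (ExecutionException e) {
        throw new RuntimeException("Parallel processing failed", e);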
    } finally {
        customThreadPool.shutdown();
    }
}

Benchmarking and Performance Monitoring

1. Performance Metrics Collection

java
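import java.util.concurrent.atomic.LongAdder;

public class PerformanceMonitor {
    private final LongAdder totalProcessed = new LongAdder(); // cheap to update from many threads
    private final long startNanos = System.nanoTime();        // monotonic clock for elapsed time

    // Call from worker threads; it does not print, so it stays cheap on the hot path
    public void recordProcess(int count) {
        totalProcessed.add(count);
    }

    // Call periodically (e.g. from a scheduled task) to report the average rate so far
    public void report() {
        long processed = totalProcessed.sum();
        double elapsedSeconds = Math.max(System.nanoTime() - startNanos, 1) / 1e9;
        System.out.printf("Processed: %d, Rate: %.2f ops/sec%n", processed, processed / elapsedSeconds);
    }
}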

2. Microbenchmarking with JMH

java
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Benchmark)
public class FlatMapBenchmark {
    
    private List<ClassA> testData;
    
    @Setup
    public void setup() {
        // generateTestData is your own helper. At ~860 ClassC per ClassA, 100K ClassA objects
        // produce ~86 million ClassC per invocation, so size the heap (or the data) accordingly
        testData = generateTestData(100000); // 100K ClassA objects
    }
    
    // Returning the result lets JMH consume it, which prevents dead-code elimination
    @Benchmark
    public List<ClassC> currentImplementation() {
        return testData.stream()
            .flatMap(classA -> classA.getClassBList().stream()
                .flatMap(classB -> {
                    List<ClassC> tempList = new ArrayList<>();
                    for (int i = 0; i < classB.getIndexList().size(); i++) {
                        tempList.add(new ClassC(
                            classA.getProp1(),
                            classB.getProp2(),
                            classB.getIndexList().get(i),
                            classB.getValueList().get(i)
                        ));
                    }
                    return tempList.stream();
                })
            )
            .collect(Collectors.toList());
    }
    
    @Benchmark
    public List<ClassC> optimizedImplementation() {
        return testData.parallelStream()
            .flatMap(classA -> classA.getClassBList().stream()
                .flatMap(classB -> IntStream.range(0, classB.getIndexList().size())
                    .mapToObj(i -> new ClassC(
                        classA.getProp1(),
                        classB.getProp2(),
                        classB.getIndexList().get(i),
                        classB.getValueList().get(i)
                    ))
                )
            )
            .collect(Collectors.toList());
    }
}
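The benchmark calls a `generateTestData` helper that the answer does not define. One possible, entirely hypothetical, implementation produces deterministic data with configurable fan-out, so a benchmark can keep the question's ~860 ClassC-per-ClassA ratio while choosing a heap-friendly number of ClassA objects. It takes explicit sizes and a seed, so the one-argument call above would need adapting; the data classes are Java 16+ records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical generateTestData: classACount ClassA objects, each with classBPerA ClassB objects,
// each holding entriesPerB index/value pairs. The fixed seed gives identical data on every run.
final class TestDataSketch {
    record ClassB(String prop2, List<Integer> indexList, List<Integer> valueList) {}
    record ClassA(String prop1, List<ClassB> classBList) {}

    static List<ClassA> generateTestData(int classACount, int classBPerA, int entriesPerB, long seed) {
        Random rnd = new Random(seed);
        List<ClassA> out = new ArrayList<>(classACount);
        for (int a = 0; a < classACount; a++) {
            List<ClassB> bs = new ArrayList<>(classBPerA);
            for (int b = 0; b < classBPerA; b++) {
                List<Integer> idx = new ArrayList<>(entriesPerB);
                List<Integer> val = new ArrayList<>(entriesPerB);
                for (int e = 0; e < entriesPerB; e++) {
                    idx.add(e);
                    val.add(rnd.nextInt(1_000));
                }
                bs.add(new ClassB("b" + b, idx, val));
            }
            out.add(new ClassA("a" + a, bs));
        }
        return out;
    }
}
```

For example, `generateTestData(1_000, 10, 86, 42L)` yields 1,000 ClassA objects that each flatten to 860 ClassC objects.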

3. Kafka Monitoring Integration

java
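// Read the producer's built-in metrics (group "producer-metrics").
// Assumes `producer` is your KafkaProducer; MetricName and Metric are in org.apache.kafka.common
for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
    MetricName name = entry.getKey();
    if ("producer-metrics".equals(name.group())
            && List.of("record-send-rate", "outgoing-byte-rate", "request-latency-avg").contains(name.name())) {
        System.out.println(name.name() + " = " + entry.getValue().metricValue());
    }
}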

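// Optional, Confluent Platform only: requires the Confluent metrics reporter on the classpath
// (verify the property names against Confluent's documentation)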
Properties props = new Properties();
props.put("metrics.reporters", "io.confluent.metrics.reporter.ConfluentMetricsReporter");
props.put("confluent.metrics.reporter.bootstrap.servers", "your-metrics-broker:9092");
props.put("confluent.metrics.reporter.topic.replicas", "1");
props.put("confluent.metrics.reporter.client.id", "your-application");

Conclusion

To achieve millions of operations per second in your flatmap-to-Kafka scenario, implement these key optimizations:

  1. Eliminate intermediate collections by sending directly to Kafka instead of collecting to lists first
  2. Parallelize over the outer ClassA collection (for example with a parallel stream) for CPU-intensive transformations
  3. Tune Kafka producer settings for batching, compression, and asynchronous sending
  4. Reduce memory pressure through JVM/GC tuning and, only where profiling shows allocation is the bottleneck, object reuse
  5. Consider alternative approaches like manual iteration or reactive programming for better performance

Start with the direct producer approach, since it removes the intermediate collection entirely and is typically the biggest single win. Monitor your metrics and adjust based on your specific workload. Keep in mind that adding partitions is not a free throughput gain: each partition carries its own overhead, and more partitions cannot help while the per-record work in the producer application is the bottleneck, which matches what you observed.

Sources

  1. How Kafka Works - by Neo Kim and Stanislav Kozlovski
  2. Comparing Kafka and Alternatives for Event Streaming Solutions | MoldStud
  3. Effective Strategies for Stream Processing with Apache Kafka | MoldStud
  4. Introduction to Apache Kafka for Beginners - DEV Community
  5. Java Stream map() vs flatMap(): Ultimate Guide to Usage and Examples - ReviewInsights.com