What is the most efficient way to flatmap ClassA objects to ClassC objects before sending them to a Kafka producer in a high-throughput scenario?
I have the following class structure:
public class ClassA {
String prop1;
List<ClassB> classBList;
}
public class ClassB {
String prop2;
List<Integer> indexList;
List<Integer> valueList;
}
public class ClassC {
String prop1;
String prop2;
Integer index;
Integer value;
}
Currently, I’m using Java streams to convert ClassA objects to ClassC objects:
List<ClassC> flattenedList = classA.getClassBList().stream()
.flatMap(e -> {
List<ClassC> tempList = new ArrayList<>();
for(int i=0; i<e.getIndexList().size(); i++) {
ClassC temp = new ClassC(classA.getProp1(), e.getProp2(),
e.getIndexList().get(i), e.getValueList().get(i));
tempList.add(temp);
}
return tempList.stream(); // flatMap expects a Stream, not a List
}).collect(Collectors.toList());
However, this approach is too slow for my requirements. I need to process millions of ClassA objects (which can result in hundreds of millions of ClassC objects) efficiently. Currently, processing 1 million ClassA objects (resulting in approximately 860 million ClassC objects) takes about 1 hour on a 128-vCPU machine with 64 consumer threads and 64 partitions.
Increasing the number of partitions hasn’t improved performance. I’m open to custom serialization/deserialization solutions as well. The application is a Spring application running on AWS/Azure.
What are the best practices and optimization techniques to improve the performance of this flatmapping operation to achieve millions of operations per second?
The most efficient way to flatmap ClassA objects to ClassC objects for high-throughput Kafka production combines three things: a leaner transformation, less object-creation overhead, and a producer configured for throughput. For scale, 860 million ClassC records in one hour is roughly 239,000 records per second (860,000,000 / 3,600). Published benchmarks report Kafka handling over 1 million messages per second, so the bottleneck is more likely in the client-side transformation and serialization path than in the brokers.
Contents
- Core Optimization Strategies
- Stream Processing Enhancements
- Kafka Producer Optimization
- Memory and GC Management
- Alternative Implementation Approaches
- Benchmarking and Performance Monitoring
Core Optimization Strategies
Replace your current flatmap approach with these high-performance alternatives:
1. Manual Iteration with Direct Producer Calls
// Avoid collecting to intermediate list - send directly to Kafka
try (Producer<String, ClassC> producer = createKafkaProducer()) {
for (ClassA classA : classAList) {
for (ClassB classB : classA.getClassBList()) {
List<Integer> indices = classB.getIndexList();
List<Integer> values = classB.getValueList();
// Assumes indexList and valueList have the same length
for (int i = 0; i < indices.size(); i++) {
ClassC classC = new ClassC(
classA.getProp1(),
classB.getProp2(),
indices.get(i),
values.get(i)
);
// Send directly instead of collecting
ProducerRecord<String, ClassC> record =
new ProducerRecord<>("topic", classC);
producer.send(record);
}
}
}
}
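A stdlib-only sketch of the same direct-emission loop, with the Kafka call abstracted behind a `Consumer<ClassC>` so the loop can be exercised without a broker. `DirectFlatten` and the record versions of ClassA/ClassB/ClassC are hypothetical stand-ins for the question's classes; in production the sink could be `c -> producer.send(new ProducerRecord<>("topic", c))`.

```java
import java.util.List;
import java.util.function.Consumer;

final class DirectFlatten {
    // Hypothetical stand-ins for the question's classes (records for brevity).
    record ClassB(String prop2, List<Integer> indexList, List<Integer> valueList) {}
    record ClassA(String prop1, List<ClassB> classBList) {}
    record ClassC(String prop1, String prop2, int index, int value) {}

    // Emits every ClassC to the sink as soon as it is built; nothing is collected.
    // Returns the number of records emitted.
    static long flattenTo(List<ClassA> input, Consumer<ClassC> sink) {
        long emitted = 0;
        for (ClassA a : input) {
            String prop1 = a.prop1();
            for (ClassB b : a.classBList()) {
                List<Integer> indices = b.indexList();
                List<Integer> values = b.valueList();
                if (indices.size() != values.size()) {
                    throw new IllegalArgumentException("indexList and valueList differ in length");
                }
                for (int i = 0; i < indices.size(); i++) {
                    sink.accept(new ClassC(prop1, b.prop2(), indices.get(i), values.get(i)));
                    emitted++;
                }
            }
        }
        return emitted;
    }
}
```

The length check guards against silently dropped or failing records when the two lists disagree. If the lists could be `LinkedList`s, iterating both with iterators would avoid `get(i)`, which is O(n) per call on a linked list.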
2. Parallel Stream with Proper Configuration
List<ClassC> flattenedList = classAList.parallelStream()
.flatMap(classA -> classA.getClassBList().stream() // inner streams run sequentially inside flatMap anyway
.flatMap(classB -> IntStream.range(0, classB.getIndexList().size())
.mapToObj(i -> new ClassC(
classA.getProp1(),
classB.getProp2(),
classB.getIndexList().get(i),
classB.getValueList().get(i)
))
)
)
.collect(Collectors.toList());
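Nested `flatMap` calls build a new inner Stream for every ClassB. Since Java 16, `Stream.mapMulti` lets each element push its outputs straight into a downstream consumer; the JDK documentation describes it as preferable to `flatMap` when each element yields few results or when generating results imperatively is easier. A minimal sketch, assuming Java 16+ and hypothetical record stand-ins for the question's classes:

```java
import java.util.List;
import java.util.stream.Stream;

final class MapMultiFlatten {
    // Hypothetical stand-ins for the question's classes.
    record ClassB(String prop2, List<Integer> indexList, List<Integer> valueList) {}
    record ClassA(String prop1, List<ClassB> classBList) {}
    record ClassC(String prop1, String prop2, int index, int value) {}

    // Each ClassA pushes its ClassC elements directly into 'downstream',
    // so no inner Stream object is created per ClassB.
    static Stream<ClassC> flatten(Stream<ClassA> input) {
        return input.<ClassC>mapMulti((a, downstream) -> {
            for (ClassB b : a.classBList()) {
                List<Integer> indices = b.indexList();
                List<Integer> values = b.valueList();
                for (int i = 0; i < indices.size(); i++) {
                    downstream.accept(new ClassC(a.prop1(), b.prop2(), indices.get(i), values.get(i)));
                }
            }
        });
    }
}
```

For parallel sending, `MapMultiFlatten.flatten(classAList.parallelStream()).forEach(c -> producer.send(...))` avoids the final list entirely; `KafkaProducer` is documented as thread-safe, so one instance can be shared by the stream's worker threads.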
Stream Processing Enhancements
1. Reduce Object Creation Overhead
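// Use object pooling for frequently created objects (measure first: the JVM allocates
// short-lived objects cheaply). Requires a mutable ClassC with setters and a no-arg
// constructor, and each borrowed object must be returned after it has been serialized.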
private static final ClassCObjectPool classCPool = new ClassCObjectPool();
// In the stream:
.flatMap(classA -> classA.getClassBList().stream()
.flatMap(classB -> IntStream.range(0, classB.getIndexList().size())
.mapToObj(i -> {
ClassC classC = classCPool.borrowObject();
classC.setProp1(classA.getProp1());
classC.setProp2(classB.getProp2());
classC.setIndex(classB.getIndexList().get(i));
classC.setValue(classB.getValueList().get(i));
return classC;
})
)
)
2. Primitive Stream Optimization
// Convert each ClassB's lists to int[] once, then index the arrays. This adds a copy per
// ClassB, and ClassC's Integer fields re-box the values, so it only pays off if ClassC
// (and ideally ClassB) are changed to hold primitive ints
.flatMap(classA -> classA.getClassBList().stream()
.flatMap(classB -> {
int[] indices = classB.getIndexList().stream().mapToInt(Integer::intValue).toArray();
int[] values = classB.getValueList().stream().mapToInt(Integer::intValue).toArray();
return IntStream.range(0, indices.length)
.mapToObj(j -> new ClassC(
classA.getProp1(),
classB.getProp2(),
indices[j],
values[j]
));
})
)
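Keeping values primitive end to end is what actually removes boxing. A sketch assuming ClassB could hold `int[]` arrays and ClassC `int` fields; the `PrimitiveFlatten` records are hypothetical, not the question's classes:

```java
import java.util.List;
import java.util.function.Consumer;

final class PrimitiveFlatten {
    // Hypothetical primitive-friendly variants: int[] instead of List<Integer>,
    // int instead of Integer, so no boxing happens anywhere in the loop.
    record ClassB(String prop2, int[] indices, int[] values) {}
    record ClassA(String prop1, List<ClassB> classBList) {}
    record ClassC(String prop1, String prop2, int index, int value) {}

    static void flattenTo(List<ClassA> input, Consumer<ClassC> sink) {
        for (ClassA a : input) {
            for (ClassB b : a.classBList()) {
                int[] idx = b.indices();
                int[] val = b.values();
                if (idx.length != val.length) {
                    throw new IllegalArgumentException("indices and values differ in length");
                }
                for (int i = 0; i < idx.length; i++) {
                    sink.accept(new ClassC(a.prop1(), b.prop2(), idx[i], val[i]));
                }
            }
        }
    }
}
```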
3. Batch Processing
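// Process in batches to reduce memory pressure. At the question's ~860 ClassC per ClassA,
// 10,000 ClassA expand to roughly 8.6 million ClassC per batch, so size accordingly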
int batchSize = 10000;
for (int i = 0; i < classAList.size(); i += batchSize) {
List<ClassA> batch = classAList.subList(i, Math.min(i + batchSize, classAList.size()));
List<ClassC> flattenedBatch = batch.stream()
.flatMap(classA -> classA.getClassBList().stream()
.flatMap(classB -> IntStream.range(0, classB.getIndexList().size())
.mapToObj(j -> new ClassC(
classA.getProp1(),
classB.getProp2(),
classB.getIndexList().get(j),
classB.getValueList().get(j)
))
)
)
.collect(Collectors.toList());
// Send batch to Kafka
sendBatchToKafka(flattenedBatch);
}
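Batching by input count leaves memory dependent on how many ClassC records each ClassA expands into. A hypothetical `BatchingSink` that buffers by output count instead, sketched with the flush callback standing in for `sendBatchToKafka`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Buffers items and hands them to a flush callback in fixed-size batches, so memory
// is bounded by the output batch size no matter how many records one input expands into.
final class BatchingSink<T> implements Consumer<T> {
    private final int batchSize;
    private final Consumer<List<T>> flush; // e.g. a hypothetical sendBatchToKafka
    private List<T> buffer;

    BatchingSink(int batchSize, Consumer<List<T>> flush) {
        if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
        this.batchSize = batchSize;
        this.flush = flush;
        this.buffer = new ArrayList<>(batchSize);
    }

    @Override
    public void accept(T item) {
        buffer.add(item);
        if (buffer.size() == batchSize) {
            flushNow();
        }
    }

    // Call once at the end to send the final, possibly partial, batch.
    public void flushNow() {
        if (buffer.isEmpty()) return;
        List<T> full = buffer;
        buffer = new ArrayList<>(batchSize); // fresh list, since the callback may keep 'full'
        flush.accept(full);
    }
}
```

Paired with a direct-emission loop, the sink would be passed as the per-record consumer, with `flushNow()` called once after the last ClassA.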
Kafka Producer Optimization
1. Producer Configuration Tuning
Properties props = new Properties();
props.put("bootstrap.servers", "your-broker:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
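props.put("value.serializer", "com.example.ClassCSerializer"); // hypothetical package; "package" is a reserved word, so "com.your.package" is not a valid package name
props.put("batch.size", 16384); // 16 KB, the default; larger batches (e.g. 64-256 KB) usually raise throughput
props.put("linger.ms", 5); // Wait up to 5 ms to fill a batch; higher values trade latency for fuller batches
props.put("buffer.memory", 33554432); // 32 MB, the default; raise it if send() blocks waiting for buffer space
props.put("compression.type", "lz4"); // Fast compression
props.put("max.request.size", 1048576); // 1 MB, the default
props.put("acks", "1"); // Leader-only acknowledgment: faster, but records can be lost if the leader fails
props.put("retries", 3); // Retry transient send failures
props.put("max.in.flight.requests.per.connection", 5); // With retries, ordering can change unless idempotence is enabled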
2. Custom Serializer for ClassC
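import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
// JSON via Jackson: simple, but comparatively slow and verbose at hundreds of millions of records.
// Assumes ClassC exposes getters (or public fields) so Jackson can see its properties.
public class ClassCSerializer implements Serializer<ClassC> {
private final ObjectMapper objectMapper = new ObjectMapper(); // thread-safe once configured, so one instance is reused
@Override
public byte[] serialize(String topic, ClassC data) {
if (data == null) {
return null; // mirror the built-in serializers, which map null to null
}
try {
return objectMapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new SerializationException("Error serializing ClassC", e);
}
}
}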
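Since the question is open to custom serialization, a fixed binary layout avoids JSON's text encoding and reflection. The sketch below uses a hypothetical length-prefixed format (not a standard one) and assumes non-null `prop1`/`prop2`. A Kafka `Serializer<ClassC>` could delegate to it by returning `ClassCCodec.encode(data)` from `serialize(topic, data)`, with a matching `Deserializer` on the consumer side.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class ClassCCodec {
    record ClassC(String prop1, String prop2, int index, int value) {}

    // Hypothetical layout: [int len1][prop1 UTF-8][int len2][prop2 UTF-8][int index][int value]
    static byte[] encode(ClassC c) {
        byte[] p1 = c.prop1().getBytes(StandardCharsets.UTF_8);
        byte[] p2 = c.prop2().getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + p1.length + 4 + p2.length + 4 + 4);
        buf.putInt(p1.length).put(p1)
           .putInt(p2.length).put(p2)
           .putInt(c.index()).putInt(c.value());
        return buf.array();
    }

    static ClassC decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        String p1 = readString(buf);
        String p2 = readString(buf);
        // Java evaluates arguments left to right, so index is read before value.
        return new ClassC(p1, p2, buf.getInt(), buf.getInt());
    }

    private static String readString(ByteBuffer buf) {
        byte[] b = new byte[buf.getInt()];
        buf.get(b);
        return new String(b, StandardCharsets.UTF_8);
    }
}
```

Because prop1 and prop2 repeat for every ClassC produced from the same ClassA and ClassB, batch-level compression such as lz4 should recover much of that redundancy on the wire.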
3. Asynchronous Producer with Callbacks
// Send records asynchronously; closing the producer blocks until buffered records are sent
try (Producer<String, ClassC> producer = new KafkaProducer<>(props)) {
for (ClassC classC : flattenedList) {
ProducerRecord<String, ClassC> record = new ProducerRecord<>("topic", classC);
producer.send(record, (metadata, exception) -> {
// Callbacks run on the producer's I/O thread, so keep them cheap; printing a line
// for every successful send would dominate run time at this volume
if (exception != null) {
System.err.println("Error sending record: " + exception);
}
});
}
}
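Rather than printing per record, the callback can update counters. A minimal stdlib sketch; the `SendStats` class is hypothetical, and its `onCompletion(Exception)` mirrors the null-means-success convention of Kafka's `Callback.onCompletion(RecordMetadata, Exception)`:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;

// Cheap completion tracking for asynchronous sends: counters instead of
// per-record printing, plus the first failure kept for later inspection.
final class SendStats {
    private final LongAdder succeeded = new LongAdder();
    private final LongAdder failed = new LongAdder();
    private final AtomicReference<Exception> firstError = new AtomicReference<>();

    // A null exception means the send succeeded.
    void onCompletion(Exception exception) {
        if (exception == null) {
            succeeded.increment();
        } else {
            failed.increment();
            firstError.compareAndSet(null, exception); // keep only the first failure
        }
    }

    long succeeded() { return succeeded.sum(); }
    long failed() { return failed.sum(); }
    Exception firstError() { return firstError.get(); }
}
```

It would be wired in with `producer.send(record, (metadata, exception) -> stats.onCompletion(exception));`, checking `failed()` after the producer is closed.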
Memory and GC Management
1. JVM Configuration
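# Starting point for a high-throughput, allocation-heavy workload; verify with GC logs
-Xms8g -Xmx8g # Fixed heap size (size it to the largest batch held in memory at once)
-XX:+UseG1GC # Garbage-First collector (already the default on server-class machines since JDK 9)
-XX:MaxGCPauseMillis=200 # Target GC pause time (200 ms is G1's default)
-XX:ParallelGCThreads=8 # Caps parallel GC threads; the JVM sizes this from the CPU count, well above 8 on 128 vCPUs, so omit unless CPU must be reserved
-XX:ConcGCThreads=2 # Concurrent marking threads; usually best left to the JVM's default
-XX:InitiatingHeapOccupancyPercent=35 # Start concurrent marking at 35% heap occupancy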
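To check whether GC tuning matters at all, the JVM's management beans report collection counts and times at runtime. A small sketch using `java.lang.management` (the `GcReport` class is hypothetical); the uptime ratio is only a rough indicator, since concurrent collectors may account for their time differently:

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

final class GcReport {
    // Sums accumulated collection time (ms) across all collectors;
    // getCollectionTime() returns -1 when undefined, so non-positive values are skipped.
    static long totalGcMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime();
            if (t > 0) total += t;
        }
        return total;
    }

    // Rough fraction of JVM uptime spent in GC; a high value suggests allocation,
    // not the Kafka producer, is the bottleneck.
    static double gcShareOfUptime() {
        long uptime = ManagementFactory.getRuntimeMXBean().getUptime();
        return uptime <= 0 ? 0.0 : (double) totalGcMillis() / uptime;
    }

    public static void main(String[] args) {
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.printf("%s: %d collections, %d ms%n",
                gc.getName(), gc.getCollectionCount(), gc.getCollectionTime());
        }
        System.out.println("JVM flags: " + ManagementFactory.getRuntimeMXBean().getInputArguments());
        System.out.printf("GC share of uptime: %.2f%%%n", 100 * gcShareOfUptime());
    }
}
```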
2. Off-Heap Processing
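// Use memory-mapped files for large datasets (java.nio, java.nio.channels, java.nio.file)
MappedByteBuffer buffer;
try (FileChannel fileChannel = FileChannel.open(Paths.get("large-data.bin"),
StandardOpenOption.READ, StandardOpenOption.WRITE)) {
// One mapping covers at most Integer.MAX_VALUE bytes (~2 GB); map larger files in chunks.
// The mapping stays valid after the channel is closed.
buffer = fileChannel.map(
FileChannel.MapMode.READ_WRITE, 0, Math.min(fileChannel.size(), Integer.MAX_VALUE));
}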
3. Object Reuse Patterns
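// Reuse ClassC objects to reduce GC pressure. Not thread-safe: keep one pool per thread
// (for example via ThreadLocal), and return an object only after it has been serialized.
public class ClassCObjectPool {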
private final List<ClassC> objectPool = new ArrayList<>();
private final int maxPoolSize = 10000;
public ClassC borrowObject() {
if (objectPool.isEmpty()) {
return new ClassC();
}
return objectPool.remove(objectPool.size() - 1);
}
public void returnObject(ClassC obj) {
if (objectPool.size() < maxPoolSize) {
objectPool.add(obj);
}
}
}
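The pool above is not thread-safe, which matters with parallel streams or multiple producer threads. One way to avoid locking is a thread-confined pool; a hypothetical generic sketch using `ThreadLocal` and `ArrayDeque`:

```java
import java.util.ArrayDeque;
import java.util.function.Supplier;

// Thread-confined object pool: each thread gets its own ArrayDeque, so no locking
// is needed. An object must be returned by the thread that borrowed it, and only
// once nothing else (such as a pending serialization) still uses it.
final class ThreadLocalPool<T> {
    private final ThreadLocal<ArrayDeque<T>> free = ThreadLocal.withInitial(ArrayDeque::new);
    private final Supplier<T> factory;
    private final int maxPerThread;

    ThreadLocalPool(Supplier<T> factory, int maxPerThread) {
        this.factory = factory;
        this.maxPerThread = maxPerThread;
    }

    // Reuses a free instance from this thread's deque, or creates a new one.
    T borrow() {
        T obj = free.get().pollLast();
        return obj != null ? obj : factory.get();
    }

    // Keeps the instance for reuse unless this thread's deque is already full.
    void giveBack(T obj) {
        ArrayDeque<T> deque = free.get();
        if (deque.size() < maxPerThread) {
            deque.addLast(obj);
        }
    }
}
```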
Alternative Implementation Approaches
1. Direct Producer without Intermediate Collection
public void processAndSendDirectly(List<ClassA> classAList, Producer<String, ClassC> producer) {
for (ClassA classA : classAList) {
String prop1 = classA.getProp1(); // Cache to avoid repeated calls
for (ClassB classB : classA.getClassBList()) {
String prop2 = classB.getProp2();
List<Integer> indices = classB.getIndexList();
List<Integer> values = classB.getValueList();
// Direct iteration without streams
for (int i = 0; i < indices.size(); i++) {
ClassC classC = new ClassC(prop1, prop2, indices.get(i), values.get(i));
ProducerRecord<String, ClassC> record = new ProducerRecord<>("topic", classC);
producer.send(record);
}
}
}
}
2. Reactive Programming with Project Reactor
// Without parallel()/runOn() or publishOn(), the flattening below runs on the subscribing thread
Flux.fromIterable(classAList)
.flatMap(classA -> Flux.fromIterable(classA.getClassBList())
.flatMap(classB -> Flux.range(0, classB.getIndexList().size())
.map(i -> new ClassC(
classA.getProp1(),
classB.getProp2(),
classB.getIndexList().get(i),
classB.getValueList().get(i)
))
)
)
.bufferTimeout(1000, Duration.ofMillis(100)) // Batch every 100ms or 1000 items
.subscribe(batch -> {
// Send batch to Kafka
sendBatchToKafka(batch);
});
3. Fork-Join Framework for Parallel Processing
public List<ClassC> parallelFlatMap(List<ClassA> classAList) {
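// Parallel streams started from a task in this pool run in it (JDK behavior, not a documented guarantee).
// CPU-bound work gains little from more threads than cores.
ForkJoinPool customThreadPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());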
try {
return customThreadPool.submit(() ->
classAList.parallelStream()
.flatMap(classA -> classA.getClassBList().stream() // inner streams run sequentially inside flatMap anyway
.flatMap(classB -> IntStream.range(0, classB.getIndexList().size())
.mapToObj(i -> new ClassC(
classA.getProp1(),
classB.getProp2(),
classB.getIndexList().get(i),
classB.getValueList().get(i)
))
)
)
.collect(Collectors.toList())
).get();
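} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // preserve the interrupt status
throw new RuntimeException("Parallel processing interrupted", e);
} catch (ExecutionException e) {
throw new RuntimeException("Parallel processing failed", e.getCause());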
} finally {
customThreadPool.shutdown();
}
}
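An alternative that avoids both the custom ForkJoinPool and the final collected list is to split the input into contiguous slices and let a fixed thread pool process them, emitting records directly. A hypothetical stdlib sketch; in production the `work` callback would run a direct-emission loop against a shared KafkaProducer:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

final class SlicedParallelSend {
    // Runs 'work' concurrently on one contiguous slice of the input per worker.
    static <T> void runInSlices(List<T> input, int workers, Consumer<List<T>> work) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            int sliceSize = (input.size() + workers - 1) / workers; // ceiling division
            List<Future<?>> futures = new ArrayList<>();
            for (int from = 0; from < input.size(); from += sliceSize) {
                List<T> slice = input.subList(from, Math.min(from + sliceSize, input.size()));
                futures.add(pool.submit(() -> work.accept(slice)));
            }
            for (Future<?> f : futures) {
                try {
                    f.get(); // surfaces any worker failure
                } catch (ExecutionException e) {
                    throw new IllegalStateException("worker failed", e.getCause());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for workers", e);
                }
            }
        } finally {
            pool.shutdown();
        }
    }
}
```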
Benchmarking and Performance Monitoring
1. Performance Metrics Collection
public class PerformanceMonitor {
private final AtomicLong totalProcessed = new AtomicLong(0);
private final long startNanos = System.nanoTime(); // monotonic clock for elapsed time
// Call per batch, not per record: printf on every call would itself throttle throughput
public void recordProcess(int count) {
long total = totalProcessed.addAndGet(count); // use the updated value from the same atomic step
long elapsedNanos = Math.max(1, System.nanoTime() - startNanos); // avoid division by zero
double rate = total * 1_000_000_000.0 / elapsedNanos;
System.out.printf("Processed: %d, Rate: %.2f ops/sec%n", total, rate);
}
}
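With many sending threads, `LongAdder` is documented to scale better than `AtomicLong` for shared statistics counters under contention, and moving printing off the hot path keeps measurement from skewing the result. A hypothetical `ThroughputMeter` sketch:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Low-overhead throughput meter: worker threads only call record(), a LongAdder
// increment; printing happens elsewhere, on a schedule.
final class ThroughputMeter {
    private final LongAdder total = new LongAdder();
    private final long startNanos = System.nanoTime();

    void record(long count) {
        total.add(count);
    }

    long total() {
        return total.sum();
    }

    // Records per second since construction; elapsed time is clamped to avoid dividing by zero.
    double ratePerSecond() {
        long elapsedNanos = Math.max(1, System.nanoTime() - startNanos);
        return total.sum() * (double) TimeUnit.SECONDS.toNanos(1) / elapsedNanos;
    }
}
```

A reporter could then run `scheduler.scheduleAtFixedRate(() -> System.out.printf("%.0f records/s%n", meter.ratePerSecond()), 1, 1, TimeUnit.SECONDS)` on a `ScheduledExecutorService`.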
2. Microbenchmarking with JMH
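import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.openjdk.jmh.annotations.*;
@BenchmarkMode(Mode.Throughput)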
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Benchmark)
public class FlatMapBenchmark {
private List<ClassA> testData;
@Setup
public void setup() {
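// Generate test data (generateTestData is a helper you supply). At the question's
// ~860 ClassC per ClassA, 100K ClassA produce ~86 million ClassC per invocation,
// so use a smaller count or a large heap
testData = generateTestData(100000); // 100K ClassA objects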
}
@Benchmark
// Return the result so JMH consumes it and the JIT cannot discard the work as dead code
public List<ClassC> currentImplementation() {
return testData.stream()
.flatMap(classA -> classA.getClassBList().stream()
.flatMap(classB -> {
List<ClassC> tempList = new ArrayList<>();
for (int i = 0; i < classB.getIndexList().size(); i++) {
tempList.add(new ClassC(
classA.getProp1(),
classB.getProp2(),
classB.getIndexList().get(i),
classB.getValueList().get(i)
));
}
return tempList.stream();
})
)
.collect(Collectors.toList());
}
@Benchmark
public List<ClassC> optimizedImplementation() {
return testData.parallelStream()
.flatMap(classA -> classA.getClassBList().stream() // inner streams run sequentially inside flatMap anyway
.flatMap(classB -> IntStream.range(0, classB.getIndexList().size())
.mapToObj(i -> new ClassC(
classA.getProp1(),
classB.getProp2(),
classB.getIndexList().get(i),
classB.getValueList().get(i)
))
)
)
.collect(Collectors.toList());
}
}
3. Kafka Monitoring Integration
// Read key producer metrics directly from the client ("producer-metrics" group)
Set<String> wanted = Set.of("record-send-rate", "outgoing-byte-rate", "request-latency-avg", "batch-size-avg");
producer.metrics().forEach((name, metric) -> {
if ("producer-metrics".equals(name.group()) && wanted.contains(name.name())) {
System.out.println(name.name() + " = " + metric.metricValue());
}
});
// Optional: Confluent's metrics reporter (part of Confluent Platform, not Apache Kafka; its jar must be on the classpath)
Properties props = new Properties();
props.put("metrics.reporters", "io.confluent.metrics.reporter.ConfluentMetricsReporter");
props.put("confluent.metrics.reporter.bootstrap.servers", "your-metrics-broker:9092");
props.put("confluent.metrics.reporter.topic.replicas", "1");
props.put("confluent.metrics.reporter.client.id", "your-application");
Conclusion
To achieve millions of operations per second in your flatmap-to-Kafka scenario, implement these key optimizations:
- Eliminate intermediate collections by sending directly to Kafka instead of collecting to lists first
- Use parallel streams with proper configuration for CPU-intensive transformations
- Tune Kafka producer settings for batching, compression, and asynchronous sending
- Optimize memory usage through object pooling and JVM configuration tuning
- Consider alternative approaches like manual iteration or reactive programming for better performance
Start with the direct producer approach, as it typically performs best by eliminating the intermediate collection. Monitor your metrics and adjust to your workload. Partition count is unlikely to be the limit here: Confluent's partition-sizing guidance warns that going beyond roughly 2,000 partitions per broker can hurt performance, and the question's 64 partitions are far below that, which fits the observation that adding partitions did not help.
Sources
- How Kafka Works, by Neo Kim and Stanislav Kozlovski
- Comparing Kafka and Alternatives for Event Streaming Solutions, MoldStud
- Effective Strategies for Stream Processing with Apache Kafka, MoldStud
- Introduction to Apache Kafka for Beginners, DEV Community
- Java Stream map() vs flatMap(): Ultimate Guide to Usage and Examples, ReviewInsights.com