Building Event-Driven Microservices with Kafka and Spring Boot
Event-driven architecture has become the backbone of modern distributed systems. In this article, I'll share my experience building event-driven microservices using Apache Kafka and Spring Boot.
Why Event-Driven Architecture?
Traditional request-response patterns have limitations:
- Tight Coupling: Services depend directly on each other
- Synchronous: Caller waits for response
- Limited Scalability: Bottlenecks in synchronous chains
- Poor Resilience: Failures propagate immediately
Event-driven architecture solves these problems by:
- Decoupling services through events
- Enabling asynchronous processing
- Supporting horizontal scaling
- Improving fault tolerance
Architecture Overview
┌─────────────┐      Events      ┌─────────────┐
│  Service A  │─────────────────>│    Kafka    │
└─────────────┘                  └──────┬──────┘
                                        │
                         ┌──────────────┼──────────────┐
                         ▼              ▼              ▼
                  ┌────────────┐ ┌────────────┐ ┌────────────┐
                  │ Service B  │ │ Service C  │ │ Service D  │
                  └────────────┘ └────────────┘ └────────────┘
Implementation with Spring Boot
1. Setup Dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
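One note: the testing section at the end relies on spring-kafka-test for the embedded broker, so add it with test scope if you want to follow along:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>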
2. Configure Kafka
@Configuration
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        config.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas
        config.put(ProducerConfig.RETRIES_CONFIG, 3);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // no duplicates on retry
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
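Spring Boot auto-configures a kafkaListenerContainerFactory, but since we configured the producer in code it's consistent to do the same for the consumer side. A minimal sketch that can sit alongside the beans above (the trusted-packages value is an assumption; point it at the package your event classes actually live in):
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    // Assumption: event classes live in com.example.events
    config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.events");
    return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
This is the factory the listeners later in this article reference by name.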
3. Define Events
@Data
@Builder // OrderService below constructs events via OrderCreatedEvent.builder()
@AllArgsConstructor
@NoArgsConstructor
public class OrderCreatedEvent {
    private String orderId;
    private String customerId;
    private BigDecimal amount;
    private LocalDateTime createdAt;
    private List<OrderItem> items;
}
@Data
public class OrderItem {
    private String productId;
    private int quantity;
    private BigDecimal price;
}
4. Produce Events
@Service
@Slf4j
public class OrderService {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    @Autowired
    private OrderRepository orderRepository;

    @Transactional
    public Order createOrder(OrderRequest request) {
        // Save order to database
        Order order = orderRepository.save(request.toEntity());

        // Publish event, keyed by order ID so all events for one order
        // share a partition and stay ordered
        OrderCreatedEvent event = OrderCreatedEvent.builder()
            .orderId(order.getId())
            .customerId(order.getCustomerId())
            .amount(order.getTotalAmount())
            .createdAt(order.getCreatedAt())
            .items(order.getItems())
            .build();

        // Note: the send is not part of the DB transaction; if it fails after
        // commit, the event is lost (the transactional outbox pattern fixes this)
        kafkaTemplate.send("order.created", order.getId(), event)
            .whenComplete((result, ex) -> { // CompletableFuture in Spring Kafka 3.x
                if (ex == null) {
                    log.info("Event published: {}", event);
                } else {
                    log.error("Failed to publish event", ex);
                }
            });
        return order;
    }
}
5. Consume Events
@Service
@Slf4j
public class NotificationService {
    @Autowired
    private EmailService emailService;

    @KafkaListener(
        topics = "order.created",
        groupId = "notification-service",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderCreated(OrderCreatedEvent event) {
        log.info("Received order created event: {}", event);
        try {
            // Send confirmation email
            emailService.sendOrderConfirmation(
                event.getCustomerId(),
                event.getOrderId(),
                event.getAmount()
            );
            log.info("Order confirmation sent for: {}", event.getOrderId());
        } catch (Exception e) {
            log.error("Failed to send notification for order: {}",
                event.getOrderId(), e);
            throw e; // Will trigger retry
        }
    }
}
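One caveat here: if a producer publishes a payload that can't be deserialized, the failure happens before handleOrderCreated ever runs, and the container will re-poll the same broken record forever. Spring Kafka's ErrorHandlingDeserializer exists for exactly this poison-pill case; it wraps the JSON deserializer and hands the failure to the error handler instead. A property-based sketch (the package name is an assumption):
# application.properties
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.events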
Best Practices
1. Event Schema Evolution
Use a schema registry (such as Confluent Schema Registry) so producers and consumers agree on event shape as it evolves. With Avro, backward-compatible evolution means every new field carries a default, so consumers on the new schema can still read old records. For example, v2 of the order event adds an optional discount:
{
  "type": "record",
  "name": "OrderCreatedEvent",
  "namespace": "com.example.events",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "discount", "type": "double", "default": 0.0}
  ]
}
The default on discount is what keeps v2 backward compatible: when a v2 consumer reads a v1 record, the field is simply filled in as 0.0. (For real money fields, prefer Avro's decimal logical type over double.)
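Wiring the registry into Spring Boot is then mostly a serializer swap. A sketch, assuming the Confluent Avro serde is on the classpath and the registry runs at its default local address:
# application.properties
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.properties.schema.registry.url=http://localhost:8081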
2. Idempotent Consumers
Ensure consumers can handle duplicate messages:
@Service
@Slf4j
public class PaymentService {
    @Autowired
    private PaymentRepository paymentRepository;

    @KafkaListener(topics = "order.created")
    @Transactional
    public void processPayment(OrderCreatedEvent event) {
        // Check if already processed (back this with a unique constraint on
        // orderId so concurrent duplicates cannot both pass the check)
        if (paymentRepository.existsByOrderId(event.getOrderId())) {
            log.warn("Payment already processed for order: {}",
                event.getOrderId());
            return;
        }
        // Process payment
        Payment payment = processPaymentLogic(event);
        paymentRepository.save(payment);
    }
}
3. Dead Letter Queue (DLQ)
Handle failed messages:
@Configuration
public class KafkaErrorHandler {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Bean
    public DefaultErrorHandler errorHandler() {
        BackOff backOff = new FixedBackOff(1000L, 3L); // retry 3 times, 1s apart
        DefaultErrorHandler handler = new DefaultErrorHandler(
            (record, exception) -> {
                // Retries exhausted: park the record on the DLQ topic
                kafkaTemplate.send(
                    "order.created.dlq",
                    (String) record.key(),
                    record.value()
                );
            },
            backOff
        );
        return handler;
    }
}
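Alternatively, Spring Kafka ships DeadLetterPublishingRecoverer, which does the same job out of the box and by default publishes failed records to "<original topic>.DLT" on the matching partition. A sketch of the same handler built on it, as a drop-in replacement for the bean above:
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> template) {
    // Failed records go to "<original topic>.DLT" after retries are exhausted
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
    return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3L));
}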
4. Monitoring & Observability
@Aspect
@Component
public class KafkaMetricsAspect {
    @Autowired
    private MeterRegistry meterRegistry;

    @Around("@annotation(org.springframework.kafka.annotation.KafkaListener)")
    public Object measureConsumer(ProceedingJoinPoint pjp) throws Throwable {
        Timer.Sample sample = Timer.start(meterRegistry);
        try {
            Object result = pjp.proceed();
            sample.stop(Timer.builder("kafka.consumer.duration")
                .tag("method", pjp.getSignature().getName())
                .tag("status", "success")
                .register(meterRegistry));
            return result;
        } catch (Exception e) {
            sample.stop(Timer.builder("kafka.consumer.duration")
                .tag("method", pjp.getSignature().getName())
                .tag("status", "error")
                .register(meterRegistry));
            throw e;
        }
    }
}
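Worth knowing before writing a custom aspect: spring-kafka has Micrometer support built in, and recent Spring Boot 3.x versions expose it as plain configuration (exact property availability depends on your Boot version, so treat this as a pointer rather than gospel):
# application.properties
spring.kafka.listener.observation-enabled=true
spring.kafka.template.observation-enabled=true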
Real-World Use Cases
1. Order Processing Pipeline
// Order Service
@PostMapping("/orders")
public ResponseEntity<Order> createOrder(@RequestBody OrderRequest request) {
    Order order = orderService.createOrder(request);
    // Publishes: order.created
    return ResponseEntity.ok(order);
}

// Inventory Service
@KafkaListener(topics = "order.created")
public void reserveInventory(OrderCreatedEvent event) {
    inventoryService.reserve(event.getItems());
    // Publishes: inventory.reserved or inventory.failed
}

// Payment Service
@KafkaListener(topics = "inventory.reserved")
public void processPayment(InventoryReservedEvent event) {
    paymentService.charge(event.getCustomerId(), event.getAmount());
    // Publishes: payment.completed or payment.failed
}

// Notification Service
@KafkaListener(topics = "payment.completed")
public void sendConfirmation(PaymentCompletedEvent event) {
    notificationService.sendEmail(event.getOrderId());
}
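The intermediate event types aren't shown above. As a sketch, each only needs to carry what the next hop consumes; the exact fields here are assumptions:
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class InventoryReservedEvent {
    private String orderId;
    private String customerId;  // carried forward so Payment needn't re-query
    private BigDecimal amount;
}

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class PaymentCompletedEvent {
    private String orderId;
}
A production pipeline would also subscribe to the inventory.failed and payment.failed topics to compensate: release reserved stock, mark the order failed.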
2. Data Synchronization
@KafkaListener(topics = "user.updated")
public void syncUserData(UserUpdatedEvent event) {
// Update read model
userReadRepository.save(event.toReadModel());
// Update search index
elasticsearchService.indexUser(event);
// Update cache
cacheService.invalidate("user:" + event.getUserId());
}
Performance Optimization
1. Batch Processing
@KafkaListener(topics = "transactions")
public void processTransactions(
List<TransactionEvent> events,
Acknowledgment acknowledgment
) {
try {
// Process in batch
transactionService.processBatch(events);
acknowledgment.acknowledge();
} catch (Exception e) {
// Handle error
log.error("Batch processing failed", e);
}
}
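Receiving a List<TransactionEvent> plus an Acknowledgment only works when the container runs in batch mode with manual acks; with Spring Boot's auto-configured factory that's two properties:
# application.properties
spring.kafka.listener.type=batch
spring.kafka.listener.ack-mode=manual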
2. Parallel Consumption
# application.properties
spring.kafka.listener.concurrency=10
spring.kafka.listener.poll-timeout=1000
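Keep in mind that concurrency is capped by partition count: consumer threads beyond the number of partitions sit idle. The setting can also go on an individual listener rather than globally (transactionService.process here is a stand-in):
@KafkaListener(topics = "transactions", groupId = "tx-service", concurrency = "10")
public void processTransaction(TransactionEvent event) {
    // Each of the 10 container threads owns a subset of the topic's partitions
    transactionService.process(event);
}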
Challenges & Solutions
| Challenge | Solution |
|---|---|
| Exactly-once semantics | Kafka transactions + idempotent consumers |
| Ordering guarantees | Partition keys based on entity ID |
| Schema evolution | Schema registry + backward compatibility |
| Monitoring | Kafka metrics + custom dashboards |
| Testing | Embedded Kafka for integration tests |
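On the exactly-once row: Spring Boot switches the producer into transactional mode once you set a transaction ID prefix, and KafkaTemplate can then publish several records atomically. A minimal sketch, e.g. inside OrderService.createOrder (order.audit is a made-up second topic for illustration):
# application.properties
spring.kafka.producer.transaction-id-prefix=order-tx-
spring.kafka.consumer.isolation-level=read_committed

// Both sends commit or abort together; read_committed consumers
// never observe a partial result
kafkaTemplate.executeInTransaction(ops -> {
    ops.send("order.created", order.getId(), event);
    ops.send("order.audit", order.getId(), event);
    return null;
});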
Testing
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"order.created"})
class OrderServiceTest {
    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;
    @Autowired
    private OrderService orderService;

    @Test
    void shouldPublishEventWhenOrderCreated() {
        // Given: a consumer subscribed to the topic before the event fires
        Map<String, Object> props =
            KafkaTestUtils.consumerProps("test-group", "true", embeddedKafka);
        Consumer<String, OrderCreatedEvent> consumer =
            new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                new JsonDeserializer<>(OrderCreatedEvent.class, false)).createConsumer();
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "order.created");

        // When
        Order order = orderService.createOrder(new OrderRequest());

        // Then
        ConsumerRecord<String, OrderCreatedEvent> record =
            KafkaTestUtils.getSingleRecord(consumer, "order.created");
        assertThat(record.value().getOrderId()).isEqualTo(order.getId());
        consumer.close();
    }
}
Conclusion
Event-driven architecture with Kafka and Spring Boot enables:
- Scalability: Throughput grows horizontally by adding partitions and consumer instances
- Resilience: Services continue working despite failures
- Flexibility: Easy to add new services
- Decoupling: Services evolve independently
The key is to design events carefully, handle failures gracefully, and monitor everything.
Want to discuss event-driven architectures? Connect with me on LinkedIn!

