ArchitectureKafkaEvent-DrivenMicroservices

Building Event-Driven Microservices with Kafka and Spring Boot

December 20, 2024
5 min read
Share:
Building Event-Driven Microservices with Kafka and Spring Boot

Building Event-Driven Microservices with Kafka and Spring Boot

Event-driven architecture has become the backbone of modern distributed systems. In this article, I'll share my experience building event-driven microservices using Apache Kafka and Spring Boot.

Why Event-Driven Architecture?

Traditional request-response patterns have limitations:

  • Tight Coupling: Services depend directly on each other
  • Synchronous: Caller waits for response
  • Limited Scalability: Bottlenecks in synchronous chains
  • Poor Resilience: Failures propagate immediately

Event-driven architecture solves these problems by:

  • Decoupling services through events
  • Enabling asynchronous processing
  • Supporting horizontal scaling
  • Improving fault tolerance

Architecture Overview

┌─────────────┐      Events      ┌─────────────┐
│   Service A │─────────────────>│    Kafka    │
└─────────────┘                   └──────┬──────┘
                                         │
                          ┌──────────────┼──────────────┐
                          ▼              ▼              ▼
                   ┌────────────┐ ┌────────────┐ ┌────────────┐
                   │ Service B  │ │ Service C  │ │ Service D  │
                   └────────────┘ └────────────┘ └────────────┘

Implementation with Spring Boot

1. Setup Dependencies

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

2. Configure Kafka

@Configuration
public class KafkaConfig {
    
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.RETRIES_CONFIG, 3);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        return new DefaultKafkaProducerFactory<>(config);
    }
    
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

3. Define Events

@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderCreatedEvent {
    private String orderId;
    private String customerId;
    private BigDecimal amount;
    private LocalDateTime createdAt;
    private List<OrderItem> items;
}

@Data
public class OrderItem {
    private String productId;
    private int quantity;
    private BigDecimal price;
}

4. Produce Events

@Service
@Slf4j
public class OrderService {
    
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    @Autowired
    private OrderRepository orderRepository;
    
    @Transactional
    public Order createOrder(OrderRequest request) {
        // Save order to database
        Order order = orderRepository.save(request.toEntity());
        
        // Publish event
        OrderCreatedEvent event = OrderCreatedEvent.builder()
            .orderId(order.getId())
            .customerId(order.getCustomerId())
            .amount(order.getTotalAmount())
            .createdAt(order.getCreatedAt())
            .items(order.getItems())
            .build();
            
        kafkaTemplate.send("order.created", order.getId(), event)
            .addCallback(
                result -> log.info("Event published: {}", event),
                ex -> log.error("Failed to publish event", ex)
            );
            
        return order;
    }
}

5. Consume Events

@Service
@Slf4j
public class NotificationService {
    
    @Autowired
    private EmailService emailService;
    
    @KafkaListener(
        topics = "order.created",
        groupId = "notification-service",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderCreated(OrderCreatedEvent event) {
        log.info("Received order created event: {}", event);
        
        try {
            // Send confirmation email
            emailService.sendOrderConfirmation(
                event.getCustomerId(),
                event.getOrderId(),
                event.getAmount()
            );
            
            log.info("Order confirmation sent for: {}", event.getOrderId());
        } catch (Exception e) {
            log.error("Failed to send notification for order: {}", 
                event.getOrderId(), e);
            throw e; // Will trigger retry
        }
    }
}

Best Practices

1. Event Schema Evolution

Use schema registry (like Confluent Schema Registry) to manage event schemas:

@Data
@Schema(namespace = "com.example.events", version = "1.0")
public class OrderCreatedEvent {
    @Schema(required = true)
    private String orderId;
    
    @Schema(required = true)
    private String customerId;
    
    @Schema(required = false, defaultValue = "0")
    private BigDecimal discount; // New field in v2
}

2. Idempotent Consumers

Ensure consumers can handle duplicate messages:

@Service
public class PaymentService {
    
    @Autowired
    private PaymentRepository paymentRepository;
    
    @KafkaListener(topics = "order.created")
    @Transactional
    public void processPayment(OrderCreatedEvent event) {
        // Check if already processed
        if (paymentRepository.existsByOrderId(event.getOrderId())) {
            log.warn("Payment already processed for order: {}", 
                event.getOrderId());
            return;
        }
        
        // Process payment
        Payment payment = processPaymentLogic(event);
        paymentRepository.save(payment);
    }
}

3. Dead Letter Queue (DLQ)

Handle failed messages:

@Configuration
public class KafkaErrorHandler {
    
    @Bean
    public DefaultErrorHandler errorHandler() {
        BackOff backOff = new FixedBackOff(1000L, 3L); // 3 retries with 1s delay
        
        DefaultErrorHandler handler = new DefaultErrorHandler(
            (record, exception) -> {
                // Send to DLQ
                kafkaTemplate.send(
                    "order.created.dlq", 
                    record.key(), 
                    record.value()
                );
            },
            backOff
        );
        
        return handler;
    }
}

4. Monitoring & Observability

@Aspect
@Component
public class KafkaMetricsAspect {
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    @Around("@annotation(org.springframework.kafka.annotation.KafkaListener)")
    public Object measureConsumer(ProceedingJoinPoint pjp) throws Throwable {
        Timer.Sample sample = Timer.start(meterRegistry);
        
        try {
            Object result = pjp.proceed();
            sample.stop(Timer.builder("kafka.consumer.duration")
                .tag("method", pjp.getSignature().getName())
                .tag("status", "success")
                .register(meterRegistry));
            return result;
        } catch (Exception e) {
            sample.stop(Timer.builder("kafka.consumer.duration")
                .tag("method", pjp.getSignature().getName())
                .tag("status", "error")
                .register(meterRegistry));
            throw e;
        }
    }
}

Real-World Use Cases

1. Order Processing Pipeline

// Order Service
@PostMapping("/orders")
public ResponseEntity<Order> createOrder(@RequestBody OrderRequest request) {
    Order order = orderService.createOrder(request);
    // Publishes: order.created
    return ResponseEntity.ok(order);
}

// Inventory Service
@KafkaListener(topics = "order.created")
public void reserveInventory(OrderCreatedEvent event) {
    inventoryService.reserve(event.getItems());
    // Publishes: inventory.reserved or inventory.failed
}

// Payment Service
@KafkaListener(topics = "inventory.reserved")
public void processPayment(InventoryReservedEvent event) {
    paymentService.charge(event.getCustomerId(), event.getAmount());
    // Publishes: payment.completed or payment.failed
}

// Notification Service
@KafkaListener(topics = "payment.completed")
public void sendConfirmation(PaymentCompletedEvent event) {
    notificationService.sendEmail(event.getOrderId());
}

2. Data Synchronization

@KafkaListener(topics = "user.updated")
public void syncUserData(UserUpdatedEvent event) {
    // Update read model
    userReadRepository.save(event.toReadModel());
    
    // Update search index
    elasticsearchService.indexUser(event);
    
    // Update cache
    cacheService.invalidate("user:" + event.getUserId());
}

Performance Optimization

1. Batch Processing

@KafkaListener(topics = "transactions")
public void processTransactions(
    List<TransactionEvent> events,
    Acknowledgment acknowledgment
) {
    try {
        // Process in batch
        transactionService.processBatch(events);
        acknowledgment.acknowledge();
    } catch (Exception e) {
        // Handle error
        log.error("Batch processing failed", e);
    }
}

2. Parallel Consumption

# application.properties
spring.kafka.consumer.concurrency=10
spring.kafka.listener.poll-timeout=1000

Challenges & Solutions

ChallengeSolution
Exactly-once semanticsKafka transactions + idempotent consumers
Ordering guaranteesPartition keys based on entity ID
Schema evolutionSchema registry + backward compatibility
MonitoringKafka metrics + custom dashboards
TestingEmbedded Kafka for integration tests

Testing

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"order.created"})
class OrderServiceTest {
    
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    @Autowired
    private OrderService orderService;
    
    @Test
    void shouldPublishEventWhenOrderCreated() {
        // Given
        OrderRequest request = new OrderRequest();
        
        // When
        Order order = orderService.createOrder(request);
        
        // Then
        ConsumerRecord<String, OrderCreatedEvent> record = 
            KafkaTestUtils.getSingleRecord(consumer, "order.created");
            
        assertThat(record.value().getOrderId())
            .isEqualTo(order.getId());
    }
}

Conclusion

Event-driven architecture with Kafka and Spring Boot enables:

  • Scalability: Handle millions of events per second
  • Resilience: Services continue working despite failures
  • Flexibility: Easy to add new services
  • Decoupling: Services evolve independently

The key is to design events carefully, handle failures gracefully, and monitor everything.


Want to discuss event-driven architectures? Connect with me on LinkedIn!

Shahariar Hossen

Shahariar Hossen

Senior Full Stack Engineer with 6+ years of experience in building scalable systems. Specialist in Spring Boot, Microservices, and AI/ML.

Continue Reading