Implementing SAGA Orchestration with Spring Boot State Machine and Kafka: A Production-Ready Guide
Introduction
Managing distributed transactions across microservices is one of the most challenging aspects of modern system architecture. In my previous article on SAGA Design Pattern, I covered the fundamentals of SAGA — what it is, when to use it, and the differences between orchestration and choreography approaches.
In this article, we’ll take those concepts and build a production-ready implementation using Spring Boot, Spring State Machine, and Apache Kafka. We’ll focus on the orchestration approach and introduce a messaging abstraction layer that keeps business logic independent of the underlying messaging technology.
What you’ll learn:
How to use Spring State Machine for SAGA orchestration
Building a messaging abstraction layer for technology independence
Implementing compensating transactions and idempotency
Testing strategies for distributed sagas
Production deployment considerations
Prerequisites:
Understanding of SAGA fundamentals (covered in my previous article on the SAGA Design Pattern)
Familiarity with Spring State Machine (covered in my earlier Spring State Machine article)
Basic knowledge of Kafka
Let’s dive into the implementation!
Our Use Case: E-commerce Order Processing
We’ll implement a realistic order processing system with four microservices:
Order Service (Orchestrator) — Coordinates the entire saga
Payment Service — Processes payments and refunds
Inventory Service — Reserves and releases inventory
Shipping Service — Schedules shipments
Failure Scenarios:
Payment fails → Cancel order
Inventory unavailable → Refund payment → Cancel order
Shipping fails → Release inventory → Refund payment → Cancel order
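The three scenarios above follow one rule: undo the steps that already completed, newest first, then cancel the order. Here is a minimal plain-Java sketch of that rule. `SagaStep` and `CompensationPlanner` are hypothetical names used only for illustration and are not part of the project.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names for illustration; step order mirrors the saga in this article.
enum SagaStep {
    PAYMENT("Refund payment"),
    INVENTORY("Release inventory"),
    SHIPPING("Cancel shipment");

    final String compensation;

    SagaStep(String compensation) {
        this.compensation = compensation;
    }
}

final class CompensationPlanner {
    private CompensationPlanner() {}

    /**
     * Returns the rollback actions for a saga that failed at failedStep.
     * Steps before the failed one have completed, so they are undone
     * newest-first; the failed step itself changed nothing and is skipped.
     */
    static List<String> planFor(SagaStep failedStep) {
        List<String> plan = new ArrayList<>();
        for (int i = failedStep.ordinal() - 1; i >= 0; i--) {
            plan.add(SagaStep.values()[i].compensation);
        }
        plan.add("Cancel order");
        return plan;
    }
}
```

For example, `CompensationPlanner.planFor(SagaStep.SHIPPING)` yields the same sequence as the third scenario: release inventory, refund payment, cancel order.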
Architecture: Multi-Module Spring Boot Project
spring-saga-orchestrator/
├── common-library/ # Shared components
│ ├── commands/ # BaseCommand and service commands
│ ├── events/ # BaseEvent and service events
│ ├── constant/ # KafkaTopics
│ ├── publisher/ # MessagePublisher interface
│ └── listener/ # DomainEventListener, KafkaListenerRegistrar
├── order-service/ # Orchestrator (Port: 8081)
├── payment-service/ # Payment processing (Port: 8082)
├── inventory-service/ # Inventory management (Port: 8083)
└── shipping-service/ # Shipping operations (Port: 8084)
Key Decisions:
Kafka for reliable async messaging
Spring State Machine for workflow orchestration
Messaging abstraction layer for technology portability
Common library for shared domain models
The Game-Changer: Kafka Abstraction Layer
The most useful part of this implementation is an abstraction layer that keeps service code free of Kafka-specific APIs. Moving to another messaging system (Pulsar, RabbitMQ, etc.) then means providing new publisher and listener-registration implementations, with no changes to business logic.
1. Centralized Topic Management
// common-library/src/main/java/com/codeexpert/common/constant/KafkaTopics.java
public class KafkaTopics {
    public static final String PAYMENT_COMMAND = "payment-command-topic";
    public static final String PAYMENT_EVENT = "payment-event-topic";
    public static final String INVENTORY_COMMAND = "inventory-command-topic";
    public static final String INVENTORY_EVENT = "inventory-event-topic";
    public static final String SHIPPING_COMMAND = "shipping-command-topic";
    public static final String SHIPPING_EVENT = "shipping-event-topic";
}
2. Generic Message Publisher
// Interface
public interface MessagePublisher {
    <T extends DomainEvent> void publish(String topic, String key, T message);
}
// Kafka Implementation
@Component
@RequiredArgsConstructor // Lombok: generates the constructor that injects kafkaTemplate
public class KafkaMessagePublisher implements MessagePublisher {
    private final KafkaTemplate<String, Object> kafkaTemplate;
    @Override
    public <T extends DomainEvent> void publish(String topic, String key, T message) {
        // Messages with the same key go to the same partition, so one order's messages stay ordered
        kafkaTemplate.send(topic, key, message);
    }
}
Why This Matters: Services depend on MessagePublisher, not on Kafka. To switch to Pulsar, add a PulsarMessagePublisher that implements the same interface (and a matching listener registrar); the business logic does not change.
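One practical consequence of depending on the interface is that business code can be exercised without any broker at all. The sketch below is a minimal in-memory test double, written in plain Java. The `DomainEvent` and `MessagePublisher` shapes are repeated from the article so the block compiles on its own. `InMemoryMessagePublisher`, `OrderPlaced`, `OrderNotifier`, and the `order-event-topic` name are hypothetical and only illustrate the idea.

```java
import java.util.ArrayList;
import java.util.List;

// Same shapes as the article's types, repeated so the sketch compiles on its own.
interface DomainEvent {}

interface MessagePublisher {
    <T extends DomainEvent> void publish(String topic, String key, T message);
}

// Hypothetical test double: records messages instead of sending them to a broker.
final class InMemoryMessagePublisher implements MessagePublisher {

    static final class Sent {
        final String topic;
        final String key;
        final DomainEvent message;

        Sent(String topic, String key, DomainEvent message) {
            this.topic = topic;
            this.key = key;
            this.message = message;
        }
    }

    private final List<Sent> sent = new ArrayList<>();

    @Override
    public synchronized <T extends DomainEvent> void publish(String topic, String key, T message) {
        sent.add(new Sent(topic, key, message));
    }

    /** Snapshot of everything published so far, in publish order. */
    synchronized List<Sent> sent() {
        return List.copyOf(sent);
    }
}

// Hypothetical event and service standing in for real business code.
final class OrderPlaced implements DomainEvent {
    final String orderId;

    OrderPlaced(String orderId) {
        this.orderId = orderId;
    }
}

final class OrderNotifier {
    private final MessagePublisher publisher;

    OrderNotifier(MessagePublisher publisher) {
        this.publisher = publisher;
    }

    void orderPlaced(String orderId) {
        // Only the interface is referenced here, never a broker client.
        publisher.publish("order-event-topic", orderId, new OrderPlaced(orderId));
    }
}
```

A unit test can construct `OrderNotifier` with the in-memory publisher and assert on `sent()`, which is the same substitution a Pulsar or RabbitMQ implementation would rely on.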
3. Domain Event Hierarchy
public interface DomainEvent {} // Marker interface
@Data
@SuperBuilder
@NoArgsConstructor
public abstract class BaseCommand implements DomainEvent {
    private String sagaId;
    private String orderId;
    private Instant timestamp;
}
@Data
@SuperBuilder
@NoArgsConstructor // lets a JSON deserializer instantiate the command on the consumer side
@EqualsAndHashCode(callSuper = true)
public class ProcessPaymentCommand extends BaseCommand {
    private String userId;
    private BigDecimal amount;
}
4. Programmatic Listener Registration
No more @KafkaListener annotations! Services register listeners programmatically:
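@Component
@RequiredArgsConstructor // Lombok: generates the constructor that injects the factory
public class KafkaListenerRegistrar {
    private final ConcurrentKafkaListenerContainerFactory<String, Object> factory;
    public <T extends DomainEvent> void registerListener(
            String topic, String groupId,
            DomainEventListener listener, Class<T> eventType) {
        // createContainer takes topic names; group ID and listener are set on the container's properties
        ConcurrentMessageListenerContainer<String, Object> container =
                factory.createContainer(topic);
        container.getContainerProperties().setGroupId(groupId);
        container.getContainerProperties().setMessageListener(
                (MessageListener<String, Object>) record -> {
                    // A topic can carry several message types; skip those this listener did not register for
                    if (eventType.isInstance(record.value())) {
                        listener.onEvent(eventType.cast(record.value()));
                    }
                });
        container.start();
    }
}
Service Registration: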
@SpringBootApplication
public class PaymentServiceApplication {
@Autowired
private KafkaListenerRegistrar registrar;
@Autowired
private PaymentCommandListener listener;
@PostConstruct
public void registerListeners() {
registrar.registerListener(
KafkaTopics.PAYMENT_COMMAND,
"payment-service-group",
listener,
ProcessPaymentCommand.class
);
}
}
SAGA Orchestrator with Spring State Machine
Since I’ve covered Spring State Machine basics in my previous article, I’ll focus on the SAGA-specific implementation.
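As a mental model for the configuration that follows, the saga is a lookup from (current state, event) to next state. The plain-Java sketch below mirrors the transitions configured in this article without any Spring dependency. `SagaTransitions` is a hypothetical helper, and the enums list only the states and events that the transitions actually use.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Optional;

// Only the states and events that the article's transitions actually use.
enum OrderState { PENDING, PAYMENT_PROCESSING, PAYMENT_COMPLETED, INVENTORY_RESERVED, COMPLETED, CANCELLED }

enum OrderEvent {
    ORDER_CREATED, PAYMENT_PROCESSED, PAYMENT_FAILED,
    INVENTORY_RESERVED, INVENTORY_FAILED,
    SHIPPING_SCHEDULED, SHIPPING_FAILED
}

// Hypothetical helper, not part of Spring State Machine or the article's project.
final class SagaTransitions {
    private static final Map<OrderState, Map<OrderEvent, OrderState>> TABLE =
            new EnumMap<>(OrderState.class);

    static {
        // Forward flow
        add(OrderState.PENDING, OrderEvent.ORDER_CREATED, OrderState.PAYMENT_PROCESSING);
        add(OrderState.PAYMENT_PROCESSING, OrderEvent.PAYMENT_PROCESSED, OrderState.PAYMENT_COMPLETED);
        add(OrderState.PAYMENT_COMPLETED, OrderEvent.INVENTORY_RESERVED, OrderState.INVENTORY_RESERVED);
        add(OrderState.INVENTORY_RESERVED, OrderEvent.SHIPPING_SCHEDULED, OrderState.COMPLETED);
        // Compensation flows: every failure ends in CANCELLED
        add(OrderState.PAYMENT_PROCESSING, OrderEvent.PAYMENT_FAILED, OrderState.CANCELLED);
        add(OrderState.PAYMENT_COMPLETED, OrderEvent.INVENTORY_FAILED, OrderState.CANCELLED);
        add(OrderState.INVENTORY_RESERVED, OrderEvent.SHIPPING_FAILED, OrderState.CANCELLED);
    }

    private SagaTransitions() {}

    private static void add(OrderState from, OrderEvent on, OrderState to) {
        TABLE.computeIfAbsent(from, s -> new EnumMap<>(OrderEvent.class)).put(on, to);
    }

    /** Next state, or empty when the event is not accepted in the current state. */
    static Optional<OrderState> next(OrderState current, OrderEvent event) {
        return Optional.ofNullable(TABLE.getOrDefault(current, Map.of()).get(event));
    }
}
```

An event that arrives in the wrong state, such as a late `PAYMENT_PROCESSED` for an order that is still `PENDING`, returns an empty result here. The state machine gives the orchestrator the same protection against out-of-order messages.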
State Machine Configuration
@Configuration
@EnableStateMachine
public class OrderSagaStateMachineConfig
extends StateMachineConfigurerAdapter<OrderState, OrderEvent> {
@Autowired
private MessagePublisher messagePublisher;
@Override
public void configure(StateMachineStateConfigurer<OrderState, OrderEvent> states)
throws Exception {
states
.withStates()
.initial(OrderState.PENDING)
.state(OrderState.PAYMENT_PROCESSING)
.state(OrderState.PAYMENT_COMPLETED)
.state(OrderState.INVENTORY_RESERVING)
.state(OrderState.INVENTORY_RESERVED)
.state(OrderState.SHIPPING_SCHEDULING)
.end(OrderState.COMPLETED)
.end(OrderState.CANCELLED);
}
@Override
public void configure(StateMachineTransitionConfigurer<OrderState, OrderEvent> transitions)
throws Exception {
transitions
// Forward flow
.withExternal()
.source(OrderState.PENDING)
.target(OrderState.PAYMENT_PROCESSING)
.event(OrderEvent.ORDER_CREATED)
.action(initiatePaymentAction())
.and()
.withExternal()
.source(OrderState.PAYMENT_PROCESSING)
.target(OrderState.PAYMENT_COMPLETED)
.event(OrderEvent.PAYMENT_PROCESSED)
.action(reserveInventoryAction())
.and()
.withExternal()
.source(OrderState.PAYMENT_COMPLETED)
.target(OrderState.INVENTORY_RESERVED)
.event(OrderEvent.INVENTORY_RESERVED)
.action(scheduleShippingAction())
.and()
.withExternal()
.source(OrderState.INVENTORY_RESERVED)
.target(OrderState.COMPLETED)
.event(OrderEvent.SHIPPING_SCHEDULED)
// Compensation flows
.and()
.withExternal()
.source(OrderState.PAYMENT_PROCESSING)
.target(OrderState.CANCELLED)
.event(OrderEvent.PAYMENT_FAILED)
.and()
.withExternal()
.source(OrderState.PAYMENT_COMPLETED)
.target(OrderState.CANCELLED)
.event(OrderEvent.INVENTORY_FAILED)
.action(compensatePaymentAction())
.and()
.withExternal()
.source(OrderState.INVENTORY_RESERVED)
.target(OrderState.CANCELLED)
.event(OrderEvent.SHIPPING_FAILED)
.action(compensateInventoryAndPaymentAction());
}
@Bean
public Action<OrderState, OrderEvent> initiatePaymentAction() {
return context -> {
CreateOrderCommand order = (CreateOrderCommand) context
.getExtendedState().getVariables().get("orderCommand");
ProcessPaymentCommand cmd = ProcessPaymentCommand.builder()
.sagaId(order.getSagaId())
.orderId(order.getOrderId())
.userId(order.getUserId())
.amount(order.getAmount())
.timestamp(Instant.now())
.build();
messagePublisher.publish(
KafkaTopics.PAYMENT_COMMAND,
cmd.getOrderId(),
cmd
);
};
}
@Bean
public Action<OrderState, OrderEvent> compensatePaymentAction() {
return context -> {
CreateOrderCommand order = (CreateOrderCommand) context
.getExtendedState().getVariables().get("orderCommand");
RefundPaymentCommand cmd = RefundPaymentCommand.builder()
.sagaId(order.getSagaId())
.orderId(order.getOrderId())
.userId(order.getUserId())
.amount(order.getAmount())
.timestamp(Instant.now())
.build();
messagePublisher.publish(
KafkaTopics.PAYMENT_COMMAND,
cmd.getOrderId(),
cmd
);
};
}
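// reserveInventoryAction(), scheduleShippingAction() and
// compensateInventoryAndPaymentAction() are referenced above but not shown here.
}
Orchestrator Service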
@Service
@RequiredArgsConstructor // Lombok: generates the constructor that injects the state machine
public class OrderSagaOrchestrator {
    // Note: @EnableStateMachine provides a single shared machine. Handling concurrent orders
    // requires one machine per saga, for example obtained from a StateMachineFactory by sagaId.
    private final StateMachine<OrderState, OrderEvent> stateMachine;
    public void startSaga(CreateOrderCommand command) {
        stateMachine.getExtendedState()
                .getVariables()
                .put("orderCommand", command);
        stateMachine.sendEvent(OrderEvent.ORDER_CREATED);
    }
    public void handlePaymentSuccess(PaymentProcessedEvent event) {
        stateMachine.sendEvent(OrderEvent.PAYMENT_PROCESSED);
    }
    public void handlePaymentFailure(PaymentFailedEvent event) {
        stateMachine.sendEvent(OrderEvent.PAYMENT_FAILED);
    }
    public void handleInventoryReserved(InventoryReservedEvent event) {
        stateMachine.sendEvent(OrderEvent.INVENTORY_RESERVED);
    }
    public void handleInventoryFailed(InventoryFailedEvent event) {
        stateMachine.sendEvent(OrderEvent.INVENTORY_FAILED);
    }
}
Implementing Services with Idempotency
Payment Service
@Service
@RequiredArgsConstructor // Lombok: generates the constructor that injects messagePublisher
public class PaymentService {
private final MessagePublisher messagePublisher;
private final Set<String> processedSagaIds = ConcurrentHashMap.newKeySet();
private final Map<String, BigDecimal> userBalances = new ConcurrentHashMap<>();
public void processPayment(ProcessPaymentCommand command) {
// Idempotency check
String key = command.getSagaId() + "-payment";
if (processedSagaIds.contains(key)) {
return; // Already processed
}
try {
BigDecimal balance = userBalances.getOrDefault(
command.getUserId(), BigDecimal.ZERO);
if (balance.compareTo(command.getAmount()) >= 0) {
// Deduct amount
userBalances.put(command.getUserId(),
balance.subtract(command.getAmount()));
processedSagaIds.add(key);
// Publish success
messagePublisher.publish(
KafkaTopics.PAYMENT_EVENT,
command.getOrderId(),
PaymentProcessedEvent.builder()
.sagaId(command.getSagaId())
.orderId(command.getOrderId())
.build()
);
} else {
// Publish failure
messagePublisher.publish(
KafkaTopics.PAYMENT_EVENT,
command.getOrderId(),
PaymentFailedEvent.builder()
.sagaId(command.getSagaId())
.orderId(command.getOrderId())
.reason("Insufficient balance")
.build()
);
}
} catch (Exception e) {
// Publish failure
messagePublisher.publish(
KafkaTopics.PAYMENT_EVENT,
command.getOrderId(),
PaymentFailedEvent.builder()
.sagaId(command.getSagaId())
.orderId(command.getOrderId())
.reason(e.getMessage())
.build()
);
}
}
public void refundPayment(RefundPaymentCommand command) {
String key = command.getSagaId() + "-refund";
if (processedSagaIds.contains(key)) {
return;
}
BigDecimal balance = userBalances.getOrDefault(
command.getUserId(), BigDecimal.ZERO);
userBalances.put(command.getUserId(),
balance.add(command.getAmount()));
processedSagaIds.add(key);
}
}
Production Note: In production, store processedSagaIds in Redis or a database with a TTL for cleanup. The in-memory set shown here is lost on restart and is not shared between service instances.
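One way to read the production note is to claim the idempotency key atomically and let the claim expire after a retention window. A check with contains() followed by a later add() leaves a gap in which two concurrent duplicates can both pass. The sketch below closes that gap in memory. `IdempotencyStore`, `tryClaim`, and `evictExpired` are hypothetical names, and the current time is passed in explicitly so that the behaviour is deterministic. A Redis- or database-backed store would replace the map in a multi-instance deployment.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory stand-in for a Redis or database-backed store.
final class IdempotencyStore {
    private final ConcurrentMap<String, Instant> expiryByKey = new ConcurrentHashMap<>();
    private final Duration ttl;

    IdempotencyStore(Duration ttl) {
        this.ttl = ttl;
    }

    /**
     * Returns true for exactly one caller per key while the claim is live.
     * compute() runs atomically per key, so concurrent duplicates cannot
     * both observe "not processed yet".
     */
    boolean tryClaim(String key, Instant now) {
        boolean[] claimed = {false};
        expiryByKey.compute(key, (k, expiry) -> {
            if (expiry == null || !expiry.isAfter(now)) {
                claimed[0] = true;          // no live claim: take it
                return now.plus(ttl);
            }
            return expiry;                  // live claim exists: keep it
        });
        return claimed[0];
    }

    /** Drops expired claims; intended to be called periodically. */
    void evictExpired(Instant now) {
        expiryByKey.values().removeIf(expiry -> !expiry.isAfter(now));
    }

    int size() {
        return expiryByKey.size();
    }
}
```

In `processPayment`, the guard would then become a single call such as `if (!store.tryClaim(key, Instant.now())) return;` placed before any balance change.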
Testing Your SAGA
Unit Testing State Transitions
@SpringBootTest
public class OrderSagaStateMachineTest {
@Autowired
private StateMachine<OrderState, OrderEvent> stateMachine;
@Test
public void testSuccessfulOrderFlow() {
stateMachine.start();
assertEquals(OrderState.PENDING, stateMachine.getState().getId());
stateMachine.sendEvent(OrderEvent.ORDER_CREATED);
assertEquals(OrderState.PAYMENT_PROCESSING, stateMachine.getState().getId());
stateMachine.sendEvent(OrderEvent.PAYMENT_PROCESSED);
assertEquals(OrderState.PAYMENT_COMPLETED, stateMachine.getState().getId());
stateMachine.sendEvent(OrderEvent.INVENTORY_RESERVED);
assertEquals(OrderState.INVENTORY_RESERVED, stateMachine.getState().getId());
stateMachine.sendEvent(OrderEvent.SHIPPING_SCHEDULED);
assertEquals(OrderState.COMPLETED, stateMachine.getState().getId());
}
@Test
public void testInventoryFailureCompensation() {
stateMachine.start();
stateMachine.sendEvent(OrderEvent.ORDER_CREATED);
stateMachine.sendEvent(OrderEvent.PAYMENT_PROCESSED);
stateMachine.sendEvent(OrderEvent.INVENTORY_FAILED);
assertEquals(OrderState.CANCELLED, stateMachine.getState().getId());
}
}
Integration Testing with Testcontainers
@SpringBootTest
@Testcontainers
public class OrderSagaIntegrationTest {
    @Container
    static KafkaContainer kafka = new KafkaContainer(
            DockerImageName.parse("confluentinc/cp-kafka:latest")
    );
    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers",
                kafka::getBootstrapServers);
    }
    // Assumed to be the order service's REST controller; its class is not shown in this article
    @Autowired
    private OrderController orderController;
    @Test
    public void testCompleteOrderSaga() throws Exception {
        CreateOrderCommand command = CreateOrderCommand.builder()
                .sagaId(UUID.randomUUID().toString())
                .orderId(UUID.randomUUID().toString())
                .userId("user123")
                .productId("product456")
                .quantity(2)
                .amount(new BigDecimal("100.00"))
                .build();
        orderController.createOrder(command);
        // Wait and verify (polling until the expected status appears is more reliable than a fixed sleep)
        Thread.sleep(3000);
        // Assert order status is COMPLETED
    }
}
Monitoring and Observability
State Transition Logging
@Component
public class SagaMonitoringListener
        extends StateMachineListenerAdapter<OrderState, OrderEvent> {
    // SLF4J logger
    private static final Logger logger = LoggerFactory.getLogger(SagaMonitoringListener.class);
    @Autowired
    private MeterRegistry meterRegistry;
    @Override
    public void stateChanged(State<OrderState, OrderEvent> from,
                             State<OrderState, OrderEvent> to) {
        // from is null when the machine enters its initial state
        String fromName = from == null ? "NONE" : from.getId().name();
        String toName = to.getId().name();
        logger.info("State transition: {} -> {}", fromName, toName);
        meterRegistry.counter("saga.state.transition",
                "from", fromName,
                "to", toName
        ).increment();
    }
}
Key Metrics to Track
@Component
@RequiredArgsConstructor // Lombok: generates the constructor that injects the registry
public class SagaMetrics {
    private final MeterRegistry registry;
    public void recordSagaCompleted(long durationMs) {
        registry.counter("saga.completed").increment();
        registry.timer("saga.duration")
                .record(durationMs, TimeUnit.MILLISECONDS);
    }
    public void recordSagaFailed(String reason) {
        // Keep reason to a small fixed set of values; free-text messages create unbounded tag cardinality
        registry.counter("saga.failed", "reason", reason).increment();
    }
    public void recordCompensation(String step) {
        registry.counter("saga.compensation", "step", step).increment();
    }
}
Running the System
Build and Run
# Build
./gradlew clean build
# Run services (separate terminals)
java -jar order-service/build/libs/order-service-1.0-SNAPSHOT.jar
java -jar payment-service/build/libs/payment-service-1.0-SNAPSHOT.jar
java -jar inventory-service/build/libs/inventory-service-1.0-SNAPSHOT.jar
java -jar shipping-service/build/libs/shipping-service-1.0-SNAPSHOT.jar
Test API
curl -X POST http://localhost:8081/api/orders \
  -H "Content-Type: application/json" \
  -d '{
    "userId": "user123",
    "productId": "product456",
    "quantity": 2,
    "amount": 100.0
  }'
Production Best Practices
✅ Essential Practices
Idempotency Everywhere — Use unique keys for all operations
Proper Error Handling — Catch and publish failure events
Comprehensive Logging — Include saga ID in all logs
Timeout Management — Set timeouts for all operations
Dead Letter Queues — Handle poison messages
State Persistence — Store saga state for recovery
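Timeout management, for instance, can be sketched as one deadline per in-flight saga step: if no reply arrives before the deadline, the orchestrator treats the step as failed and starts compensation. The class below is a minimal plain-Java illustration. `StepDeadlines` and its method names are hypothetical, and the current time is passed in explicitly so that the logic is deterministic.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper; tracks one outstanding command per saga.
final class StepDeadlines {
    private final Map<String, Instant> deadlineBySagaId = new ConcurrentHashMap<>();

    /** Called when a command is published and a reply is expected within timeout. */
    void commandSent(String sagaId, Instant sentAt, Duration timeout) {
        deadlineBySagaId.put(sagaId, sentAt.plus(timeout));
    }

    /** Called when the reply event arrives in time. */
    void replyReceived(String sagaId) {
        deadlineBySagaId.remove(sagaId);
    }

    /**
     * Returns and forgets the sagas whose deadline has passed. A scheduler would
     * call this periodically and send a failure event for each returned saga ID.
     */
    List<String> drainExpired(Instant now) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Instant> entry : deadlineBySagaId.entrySet()) {
            boolean overdue = !entry.getValue().isAfter(now);
            // remove(key, value) succeeds only if the deadline was not replaced meanwhile
            if (overdue && deadlineBySagaId.remove(entry.getKey(), entry.getValue())) {
                expired.add(entry.getKey());
            }
        }
        return expired;
    }
}
```

Each saga ID returned by `drainExpired` would be mapped to the failure event of the step it was waiting on, for example `PAYMENT_FAILED`, so the existing compensation transitions handle the timeout.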
❌ Avoid These Pitfalls
Business Logic in Orchestrator — Keep it in services
Synchronous HTTP Calls — Use async messaging only
Missing Compensation — Every action needs compensation
Ignoring Duplicates — Always implement idempotency
Poor Monitoring — Track all metrics from day one
Migration to Other Messaging Systems
Thanks to our abstraction layer, switching from Kafka to Pulsar mostly comes down to adding a new publisher implementation:
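@Component
@ConditionalOnProperty(name = "messaging.provider", havingValue = "pulsar")
@RequiredArgsConstructor // Lombok: generates the constructor that injects pulsarClient
public class PulsarMessagePublisher implements MessagePublisher {
    private final PulsarClient pulsarClient;
    @Override
    @SuppressWarnings("unchecked")
    public <T extends DomainEvent> void publish(String topic, String key, T message) {
        // getClass() returns Class<? extends DomainEvent>; the cast recovers Class<T> for the schema
        Class<T> type = (Class<T>) message.getClass();
        // One producer per call keeps the example short; a real publisher would cache producers per topic
        try (Producer<T> producer = pulsarClient
                .newProducer(Schema.JSON(type))
                .topic(topic)
                .create()) {
            producer.newMessage().key(key).value(message).send();
        } catch (PulsarClientException e) {
            throw new IllegalStateException("Failed to publish to " + topic, e);
        }
    }
}
Configuration change only:
messaging:
  provider: pulsar # Changed from 'kafka'
Zero business logic changes required!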
Conclusion
We’ve built a production-ready SAGA orchestration system that demonstrates:
✨ Clean Architecture — Abstraction layer for technology independence
✨ Spring State Machine — Natural workflow modeling
✨ Kafka Integration — Reliable async messaging
✨ Idempotency — Handling duplicate messages
✨ Compensation Logic — Proper rollback mechanisms
✨ Comprehensive Testing — Unit and integration tests
✨ Production Monitoring — Metrics and observability
Key Takeaways
Abstraction is Power — Decouple from infrastructure for flexibility
State Machines are Natural — Perfect for modeling SAGA workflows
Design for Failure — Compensating transactions are critical
Test Thoroughly — Both success and failure scenarios
Monitor Everything — You can’t fix what you can’t see
Next Steps
Get hands-on:
git clone https://github.com/codeexpert07/spring-saga-orchestrator
cd spring-saga-orchestrator
./gradlew clean build
Extend the implementation:
Add notification service
Implement saga history visualization
Add parallel execution branches
Build saga replay mechanism
Final Thoughts
Building distributed systems is challenging, but patterns like SAGA orchestration provide structure for managing that complexity. The abstractions we’ve built today aren’t just for SAGAs — they’re principles of good software design that will serve you across many projects.
Remember: eventual consistency is not a limitation; it’s a design choice that enables scalability and resilience.
Found this helpful?
⭐ Star the GitHub repository
💬 Share your implementation stories below
🔗 Connect with me for more distributed systems deep dives
Thank you for reading! Drop a comment if you have questions or want to share your SAGA implementation experiences.
Happy coding! 🚀
About the Author
Software architect passionate about distributed systems, microservices patterns, and building scalable solutions.
Connect:
GitHub: @codeexpert07
Substack: @codeexperts07

