Event Sourcing in practice: Lessons from the trenches

Event Sourcing is often seen as a complex pattern that requires substantial changes to application architecture. However, with careful implementation, it can be introduced incrementally to existing systems, providing significant benefits without the need for a complete overhaul. This article shares practical experiences and techniques for applying Event Sourcing in production systems, particularly in the context of evolving legacy applications.

Understanding event sourcing fundamentals

At its core, Event Sourcing is about storing changes to application state as a sequence of events rather than just the current state. Each event represents a fact that happened in the system - something that cannot be denied or changed. By replaying these events, we can reconstruct the state of an entity at any point in time.
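
As a minimal sketch of the replay idea, the Ruby snippet below folds a list of events into a state hash and can stop at a chosen point in time. The `RecordedEvent` struct, the `replay` helper, and the sample categories are illustrative assumptions and do not come from any particular library.

```ruby
# Hypothetical event record: a name, a payload, and the time it was recorded.
RecordedEvent = Struct.new(:name, :data, :recorded_at, keyword_init: true)

# Folds events into a state hash, ignoring anything recorded after `as_of`.
def replay(events, as_of: nil)
  relevant = events.select { |event| as_of.nil? || event.recorded_at <= as_of }

  relevant.sort_by(&:recorded_at).reduce({ category: nil, classification_method: nil }) do |state, event|
    case event.name
    when "TransactionCategorized", "CategoryManuallyChanged"
      state.merge(event.data)
    else
      state # Unknown event types leave the state unchanged
    end
  end
end

history = [
  RecordedEvent.new(
    name: "TransactionCategorized",
    data: { category: "groceries", classification_method: "automatic" },
    recorded_at: Time.utc(2024, 1, 10)
  ),
  RecordedEvent.new(
    name: "CategoryManuallyChanged",
    data: { category: "restaurants", classification_method: "manual" },
    recorded_at: Time.utc(2024, 2, 1)
  )
]

puts replay(history).inspect                               # state after all events
puts replay(history, as_of: Time.utc(2024, 1, 31)).inspect # state as of the end of January
```

The same event list answers both questions; only the cut-off passed to `replay` changes.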

The main components in an Event Sourcing architecture typically include:

  1. Events - immutable records of something that happened
  2. Event Store - a database for storing ordered sequences of events (an ordinary PostgreSQL database can fill this role)
  3. Aggregates - domain objects that enforce business rules and emit events
  4. Projections - read models built from events for efficient querying
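
To make the Event Store component less abstract, here is a small in-memory sketch with one append-only list per stream. The class name `InMemoryEventStore` and its `append`/`read` methods are assumptions made for illustration; a production store would persist to a database table instead of a Hash.

```ruby
# Hypothetical in-memory event store: one append-only array per stream.
class InMemoryEventStore
  def initialize
    @streams = Hash.new { |hash, stream_id| hash[stream_id] = [] }
  end

  # Appends an event to the end of the stream. The event is (shallowly)
  # frozen so callers cannot alter a recorded fact afterwards.
  def append(stream_id, event)
    @streams[stream_id] << event.freeze
    self
  end

  # Returns a copy of the stream's events in the order they were appended.
  def read(stream_id)
    @streams.fetch(stream_id, []).dup
  end
end

store = InMemoryEventStore.new
store.append("Transaction-1", { type: "TransactionCategorized", category: "groceries" })
store.append("Transaction-1", { type: "CategoryManuallyChanged", category: "restaurants" })

puts store.read("Transaction-1").map { |event| event[:type] }.inspect
```

There is deliberately no update or delete method: the only write operation is appending.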

What makes Event Sourcing powerful is that it preserves the history of changes, enables temporal queries, and provides a clear audit trail. However, implementing it effectively requires understanding when and how to apply it.

Introducing event sourcing to legacy systems

One of the biggest challenges is integrating Event Sourcing into an existing application. The approach shared here focuses on incrementally adopting Event Sourcing for specific domains within the application rather than attempting a wholesale migration.

Consider a system that handles financial transaction classification. In the legacy implementation, transactions were stored in a relational database with columns for category and classification method. The goal was to extract this logic into a properly modeled domain without disrupting existing functionality.

The approach taken was to:

  1. Create a new domain model with Event Sourcing for transaction classification
  2. Continue using the existing database tables as read models
  3. Implement event handlers to synchronize the new event-sourced domain with the legacy tables
  4. Gradually transition application code to use the new domain model

This hybrid approach allowed for maintaining backward compatibility while introducing more robust domain modeling with proper business rules.
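
Step 3 can be sketched as a small handler that copies event data into the legacy table. In this sketch a plain Hash stands in for the relational table, and the `LegacyTableSynchronizer` class and the shape of the `TransactionCategorized` struct are assumptions for illustration only.

```ruby
# Hypothetical event carrying the id of the transaction it concerns.
TransactionCategorized = Struct.new(
  :transaction_id, :category, :classification_method, keyword_init: true
)

# Keeps the legacy table (here a Hash keyed by transaction id) in sync
# with events emitted by the new event-sourced domain.
class LegacyTableSynchronizer
  def initialize(legacy_table)
    @legacy_table = legacy_table
  end

  def call(event)
    case event
    when TransactionCategorized
      row = @legacy_table.fetch(event.transaction_id)
      row[:category] = event.category
      row[:classification_method] = event.classification_method
    end
  end
end

legacy_table = { 42 => { category: nil, classification_method: nil } }
synchronizer = LegacyTableSynchronizer.new(legacy_table)

synchronizer.call(
  TransactionCategorized.new(
    transaction_id: 42, category: "groceries", classification_method: "automatic"
  )
)

puts legacy_table[42].inspect # the row now mirrors the event
```

Existing code that reads the legacy table keeps working unchanged, because the table is now just a projection of the events.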

The migration event technique

When introducing Event Sourcing to a system that already holds data, a common challenge is initializing the event streams for entities that existed before the change. This is where the "migration event" pattern proves useful.

The migration event is a special event that represents the initial state of an entity based on data imported from the legacy system. It differs from regular events in that it doesn't represent a domain action but rather establishes a baseline state.

Here's a simplified example of implementing a migration event:

module Transactions
  class CategoryAggregate
    # `Event` is assumed to be a base class defined elsewhere that accepts
    # keyword arguments and exposes them as reader methods.

    # Regular domain events
    TransactionCategorized = Class.new(Event)
    CategoryManuallyChanged = Class.new(Event)

    # Special migration event
    ImportedFromLegacySystem = Class.new(Event)

    # Events recorded by this aggregate, in order
    attr_reader :events

    def initialize
      @events = []
      @category = nil
      @classification_method = nil
    end

    # Method specifically for migrating legacy data
    def import_from_legacy_system(category:, classification_method:)
      apply_event(ImportedFromLegacySystem.new(
        category: category,
        classification_method: classification_method
      ))
    end

    # Regular domain methods
    def categorize_automatically(category:)
      # Business rule: automatic classification must not overwrite a manual one
      raise "Cannot overwrite manual classification" if @classification_method == "manual"

      apply_event(TransactionCategorized.new(
        category: category,
        classification_method: "automatic"
      ))
    end

    # A manual change always wins over any earlier classification
    def categorize_manually(category:, user:)
      apply_event(CategoryManuallyChanged.new(
        category: category,
        classification_method: "manual",
        changed_by: user.id
      ))
    end

    private

    # Updates the in-memory state and records the event
    def apply_event(event)
      case event
      when ImportedFromLegacySystem, TransactionCategorized, CategoryManuallyChanged
        @category = event.category
        @classification_method = event.classification_method
      end

      @events << event
    end
  end
end

The migration event serves several important purposes:

  1. It clearly marks the entity as having been initialized from legacy data
  2. It provides context for future debugging (if unexpected behavior occurs)
  3. It establishes the starting point for the event stream without losing information

This approach is particularly valuable when transitioning gradually, as it maintains the full context of the entity's history while enabling the new domain model.
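
A backfill that writes one migration event per legacy row might look like the sketch below. The `backfill` helper, the `Transaction-<id>` stream naming, and the Hash used as an event store are assumptions for illustration; the point is that the import can be re-run safely because already-migrated streams are skipped.

```ruby
# Hypothetical migration event holding the baseline state of one entity.
ImportedFromLegacySystem = Struct.new(:category, :classification_method, keyword_init: true)

# Writes one migration event per legacy row into `streams`
# (a Hash of stream id => list of events standing in for the event store).
def backfill(legacy_rows, streams)
  legacy_rows.each do |row|
    stream_id = "Transaction-#{row[:id]}"
    next if streams.key?(stream_id) # Already migrated, so re-running is harmless

    streams[stream_id] = [
      ImportedFromLegacySystem.new(
        category: row[:category],
        classification_method: row[:classification_method]
      )
    ]
  end

  streams
end

legacy_rows = [
  { id: 1, category: "groceries", classification_method: "manual" },
  { id: 2, category: "rent", classification_method: "automatic" }
]

streams = backfill(legacy_rows, {})
puts streams.keys.inspect
```

Every migrated stream then starts with exactly one `ImportedFromLegacySystem` event, and regular domain events follow it.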

Event upcasting: evolving your events over time

As your understanding of the domain evolves, you might need to change the structure or naming of events. However, existing events in the store cannot be modified without losing data integrity. This is where "event upcasting" becomes essential.

Event upcasting is the process of transforming older event versions into newer formats when they're loaded from the Event Store. This allows the application to evolve its events without breaking existing event streams.

Consider a scenario where an event initially named BankAccountSynced was later found to be more accurately described as ConnectorSynced:

module EventUpcasting
  class EventUpcastingTransformer
    def transform(event)
      case event.type
      when "BankAccountSynced"
        # Transform old event to new format
        ConnectorSynced.new(
          connector_id: event.data[:bank_account_id],
          synchronized_at: event.data[:synced_at]
        )
      else
        # No transformation needed for other events
        event
      end
    end
  end
  
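  class EventStore
    # `database` is any object that responds to fetch_events(stream_id)
    def initialize(database)
      @database = database
      @transformer = EventUpcastingTransformer.new
    end

    # Loads the raw events and upcasts each one before returning them
    def load_events(stream_id)
      raw_events = @database.fetch_events(stream_id)
      raw_events.map { |event| @transformer.transform(event) }
    end
  end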
end

The process for safely evolving events typically involves several steps:

  1. Deploy the new event type and the upcasting logic to transform old events into the new format
  2. Verify that the system works correctly with both old and new events
  3. Optionally, perform a stream rewrite to convert all old events to the new format
  4. If you rewrote the streams, remove the upcasting logic once no old-format events remain; otherwise keep it for as long as old events can still be loaded

This approach ensures backward compatibility while allowing your event model to evolve alongside your understanding of the domain.
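
The optional stream rewrite from step 3 can reuse the same transformation. The sketch below is one possible shape, assuming events are simple structs with a `type` and a `data` hash; `StoredEvent`, `upcast`, and `rewrite_stream` are hypothetical names.

```ruby
# Hypothetical stored event: a type name plus a data hash.
StoredEvent = Struct.new(:type, :data, keyword_init: true)

# Converts an old-format event into its current equivalent.
def upcast(event)
  case event.type
  when "BankAccountSynced"
    StoredEvent.new(
      type: "ConnectorSynced",
      data: {
        connector_id: event.data[:bank_account_id],
        synchronized_at: event.data[:synced_at]
      }
    )
  else
    event # Already in the current format
  end
end

# Returns a rewritten copy of the stream. The original list is left
# untouched so it can be kept until the new stream has been verified.
def rewrite_stream(events)
  events.map { |event| upcast(event) }
end

old_stream = [
  StoredEvent.new(type: "BankAccountSynced", data: { bank_account_id: 5, synced_at: "2024-01-01" })
]

puts rewrite_stream(old_stream).map(&:type).inspect
```

Writing the result to a new stream rather than overwriting the old one keeps the rewrite reversible.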

Finding the right balance with aggregates

Domain-Driven Design, introduced by Eric Evans, provides guidance on designing aggregates, and Vaughn Vernon later condensed much of that guidance into a short set of aggregate design rules. However, not all rules are equally critical in all contexts. Understanding which rules can be bent depending on your specific circumstances is crucial for practical implementation.

The two core rules that should rarely be broken are:

  1. Reference other aggregates by identity only
  2. Maintain invariants within a single aggregate's boundary

However, rules about keeping aggregates small or enforcing eventual consistency for updates outside the aggregate can sometimes be relaxed in specific contexts, especially in systems with lower throughput or when simplifying the implementation significantly outweighs the benefits of strict adherence.
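
The two core rules can be illustrated with a deliberately small aggregate. This is a sketch only: `TransactionClassification`, `InvariantViolation`, and the `account_id` attribute are hypothetical, and event recording is left out to keep the focus on the rules.

```ruby
# Raised when a command would break a business rule.
InvariantViolation = Class.new(StandardError)

class TransactionClassification
  attr_reader :account_id, :category, :classification_method

  def initialize(account_id:)
    @account_id = account_id # Rule 1: keep the other aggregate's id, not the object
    @category = nil
    @classification_method = nil
  end

  def categorize_manually(category:)
    @category = category
    @classification_method = "manual"
  end

  # Rule 2: the invariant is checked inside the aggregate that owns the state.
  def categorize_automatically(category:)
    if @classification_method == "manual"
      raise InvariantViolation, "Cannot overwrite manual classification"
    end

    @category = category
    @classification_method = "automatic"
  end
end

classification = TransactionClassification.new(account_id: 7)
classification.categorize_manually(category: "rent")

begin
  classification.categorize_automatically(category: "groceries")
rescue InvariantViolation => error
  puts "Rejected: #{error.message}"
end
```

Because the aggregate stores only `account_id`, loading it never drags the account aggregate into the same consistency boundary.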

Consider the example of updating both an aggregate and a read model in the same transaction:

module Transactions
  class UpdateTransactionCategory
    def call(transaction_id:, category:, user:)
      # Load aggregate from event store
      aggregate = repository.load_aggregate(transaction_id)
      
      # Apply domain logic through aggregate
      aggregate.categorize_manually(category: category, user: user)
      
      # Save events in transaction with read model update
      ActiveRecord::Base.transaction do
        # Save events to event store
        repository.save_aggregate(aggregate)
        
        # Update read model directly
        transaction_record = Transaction.find(transaction_id)
        transaction_record.update!(
          category: category,
          classification_method: "manual",
          updated_by: user.id
        )
      end
    end
  end
end

While this approach violates the strict separation between command and query sides, it can be appropriate in systems where:

  1. The extra complexity of eventual consistency outweighs its benefits
  2. The performance impact of additional synchronization is acceptable
  3. The system doesn't require high concurrency for these operations

The key is making these trade-offs deliberately, with full awareness of what you're gaining and sacrificing.

Stream management and event lifecycles

As event streams grow over time, they can become unwieldy. While snapshots are one solution, they're often just a technical workaround for a more fundamental modeling issue. Instead, consider business-driven approaches to manage stream size:

  1. Temporal boundaries: Split streams based on business time periods, such as accounting quarters or fiscal years
  2. Process boundaries: When a business process completes, start a new stream for the next instance
  3. State transitions: Major state changes can mark natural boundaries for streams

Here's an example of implementing temporal boundaries for financial accounts:

module Accounts
  class AccountService
    def close_fiscal_year(account_id:, year:)
      # Load the current account
      account = repository.load_account(account_id)
      
      # Record year closing
      account.close_fiscal_year(year: year)
      repository.save(account)
      
      # Create next year's account with starting balance
      next_year_account = Account.new
      next_year_account.initialize_fiscal_year(
        year: year + 1,
        starting_balance: account.closing_balance
      )
      
      repository.save(next_year_account)
      
      next_year_account
    end
  end
end

This approach maintains the integrity of your domain model while preventing individual streams from growing indefinitely.
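
One way to picture the effect on stream size: if stream names include the fiscal year, computing the current balance only ever touches the current year's events. The naming scheme and the event shapes below are assumptions for illustration.

```ruby
# Builds a per-fiscal-year stream name, for example "Account-7-FY2024".
def fiscal_year_stream(account_id, year)
  "Account-#{account_id}-FY#{year}"
end

# Computes a balance from a single year's events. The opening event
# carries the balance handed over from the previous year.
def balance(events)
  events.reduce(0) do |total, event|
    case event[:type]
    when "FiscalYearInitialized" then event[:starting_balance]
    when "Deposited" then total + event[:amount]
    when "Withdrawn" then total - event[:amount]
    else total
    end
  end
end

streams = {
  fiscal_year_stream(7, 2023) => [
    { type: "FiscalYearInitialized", starting_balance: 0 },
    { type: "Deposited", amount: 500 }
  ],
  fiscal_year_stream(7, 2024) => [
    { type: "FiscalYearInitialized", starting_balance: 500 },
    { type: "Withdrawn", amount: 120 }
  ]
}

puts balance(streams.fetch(fiscal_year_stream(7, 2024)))
```

The 2023 stream stays available for history and audits, but it no longer needs to be loaded for day-to-day operations.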

Practical benefits beyond architectural purity

Event Sourcing provides numerous practical benefits that extend beyond architectural considerations:

Enhanced analytics and business insights

By capturing the complete history of domain events, Event Sourcing enables powerful analytics. For example, in the transaction classification system, you can analyze how users manually classify transactions to improve automatic classification algorithms. This continuous feedback loop improves the system over time.
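
As a rough sketch of such an analysis, the snippet below counts how often an automatic category was later changed by hand, grouped by the pair of categories involved. The event shapes (hashes with `:type`, `:transaction_id`, and `:category`) and the `manual_override_counts` helper are assumptions for illustration.

```ruby
# Counts manual corrections of automatic classifications.
# Returns a Hash of [automatic_category, manual_category] => occurrences.
def manual_override_counts(events)
  last_automatic = {}
  counts = Hash.new(0)

  events.each do |event|
    id = event[:transaction_id]

    case event[:type]
    when "TransactionCategorized"
      last_automatic[id] = event[:category]
    when "CategoryManuallyChanged"
      from = last_automatic[id]
      counts[[from, event[:category]]] += 1 if from && from != event[:category]
    end
  end

  counts
end

events = [
  { type: "TransactionCategorized", transaction_id: 1, category: "groceries" },
  { type: "CategoryManuallyChanged", transaction_id: 1, category: "restaurants" },
  { type: "TransactionCategorized", transaction_id: 2, category: "groceries" },
  { type: "CategoryManuallyChanged", transaction_id: 2, category: "restaurants" }
]

manual_override_counts(events).each do |(from, to), count|
  puts "#{from} -> #{to}: #{count}"
end
```

Pairs with high counts point to categories where the automatic classifier is most often wrong.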

Simplified audit trails

Audit requirements are largely satisfied as a by-product of Event Sourcing: the complete history of changes is preserved, and as long as each event records who triggered it and when, the stream doubles as the audit log.
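
For instance, if events carry a user id and a timestamp, an audit report is little more than a formatted listing of the stream. The metadata keys (`:user_id`, `:recorded_at`) and the `audit_trail` helper are assumptions for illustration.

```ruby
require "time" # Provides Time#iso8601 on Ruby versions where it is not built in

# Renders one human-readable line per event, oldest first.
def audit_trail(events)
  events.sort_by { |event| event[:recorded_at] }.map do |event|
    "#{event[:recorded_at].iso8601} user=#{event[:user_id]} " \
      "#{event[:type]} category=#{event[:category]}"
  end
end

events = [
  { type: "CategoryManuallyChanged", category: "restaurants", user_id: 12,
    recorded_at: Time.utc(2024, 2, 1, 14, 0) },
  { type: "TransactionCategorized", category: "groceries", user_id: 0,
    recorded_at: Time.utc(2024, 1, 10, 9, 30) }
]

puts audit_trail(events)
```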

Experimentation with minimal risk

New features can be developed by consuming existing events without modifying the core system. This allows for experimentation with new models or visualizations based on the same underlying events.

Debugging complex issues

When unexpected behavior occurs, the event stream provides a complete history that can be replayed to pinpoint exactly what happened and why.

Getting started with event sourcing incrementally

Based on practical experience, here are recommendations for getting started with Event Sourcing:

  1. Start small: Apply Event Sourcing to a bounded context with clear business value rather than attempting to implement it system-wide.
  2. Focus on domain modeling: Concentrate on capturing the business language and rules correctly in your events and aggregates.
  3. Simplify infrastructure: Don't immediately introduce complex CQRS architectures with separate read and write databases.
  4. Test thoroughly: Event-sourced systems must be well-tested, especially around business rules enforced by aggregates.
  5. Be pragmatic about implementation: Bend the rules when appropriate for your specific context.
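
For the testing recommendation, event-sourced aggregates lend themselves to a "given past events, when a command runs, then expect new events or a rejection" style. The sketch below uses plain Ruby with no test framework; the simplified `CategoryAggregate` and its hash-based events are assumptions for illustration.

```ruby
class CategoryAggregate
  attr_reader :new_events

  # Rebuilds state from past events without recording them again.
  def initialize(history = [])
    @classification_method = nil
    @new_events = []
    history.each { |event| apply(event) }
  end

  def categorize_automatically(category:)
    raise "Cannot overwrite manual classification" if @classification_method == "manual"

    record({ type: "TransactionCategorized", category: category, classification_method: "automatic" })
  end

  private

  def record(event)
    apply(event)
    @new_events << event
  end

  def apply(event)
    @classification_method = event[:classification_method]
  end
end

# Given no history, when categorizing automatically, then one event is emitted.
aggregate = CategoryAggregate.new
aggregate.categorize_automatically(category: "groceries")
raise "expected one event" unless aggregate.new_events.map { |e| e[:type] } == ["TransactionCategorized"]

# Given a manual classification, when categorizing automatically, then it is rejected.
history = [{ type: "CategoryManuallyChanged", category: "rent", classification_method: "manual" }]
rejected = begin
  CategoryAggregate.new(history).categorize_automatically(category: "groceries")
  false
rescue RuntimeError
  true
end
raise "expected the business rule to reject the command" unless rejected

puts "All checks passed"
```

Because the inputs and outputs are just lists of events, such tests need no database or other infrastructure.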

A simple example to start with might be capturing user feedback when canceling subscriptions:

module Subscriptions
  class CancellationService
    def cancel_subscription(subscription_id:, feedback:, user_id:)
      # Perform the cancellation in the external system
      external_service.cancel(subscription_id)
      
      # Record the cancellation event with feedback
      event_store.publish(
        "SubscriptionCancelled",
        subscription_id: subscription_id,
        user_id: user_id,
        feedback: feedback,
        cancelled_at: Time.current
      )
    end
  end
  
  class CancellationNotifier
    def initialize(slack_client)
      @slack_client = slack_client
    end
    
    def subscription_cancelled(event)
      @slack_client.send_message(
        channel: "#customer-feedback",
        text: "Subscription #{event.subscription_id} cancelled. Feedback: #{event.feedback}"
      )
    end
  end
end

This simple example captures valuable business information without requiring complex infrastructure, providing immediate value while introducing Event Sourcing concepts.
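
How the published event reaches a subscriber depends on the event store in use. As one possible wiring, the sketch below uses a hypothetical in-process store that records each event and then calls registered handlers; `InProcessEventStore` and `FakeSlackClient` are stand-ins invented for this example, and the handler receives a plain Hash rather than an event object.

```ruby
# Hypothetical in-process store: records every published event and
# notifies the handlers subscribed to that event name.
class InProcessEventStore
  attr_reader :events

  def initialize
    @events = []
    @subscribers = Hash.new { |hash, name| hash[name] = [] }
  end

  def subscribe(event_name, &handler)
    @subscribers[event_name] << handler
  end

  def publish(event_name, **data)
    event = { name: event_name, data: data }
    @events << event # Record the fact before notifying anyone
    @subscribers[event_name].each { |handler| handler.call(event) }
    event
  end
end

# Stand-in for a Slack client so the sketch runs without network access.
class FakeSlackClient
  attr_reader :messages

  def initialize
    @messages = []
  end

  def send_message(channel:, text:)
    @messages << { channel: channel, text: text }
  end
end

event_store = InProcessEventStore.new
slack = FakeSlackClient.new

event_store.subscribe("SubscriptionCancelled") do |event|
  data = event[:data]
  slack.send_message(
    channel: "#customer-feedback",
    text: "Subscription #{data[:subscription_id]} cancelled. Feedback: #{data[:feedback]}"
  )
end

event_store.publish("SubscriptionCancelled", subscription_id: 7, user_id: 3, feedback: "Too expensive")
puts slack.messages.first[:text]
```

Adding another consumer later, such as a report of cancellation reasons, only requires another `subscribe` call; the publishing code stays as it is.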

Summary

Event Sourcing is more than an architectural pattern: it is a powerful tool for capturing business knowledge, enabling analytics, and creating systems that can evolve alongside your understanding of the domain.