Event-Driven Architecture

Event-Driven Architecture (EDA) adalah pola arsitektur yang berfokus pada produksi, deteksi, dan reaksi terhadap events. Tutorial ini akan menjelaskan implementasi EDA di Go.

Contoh Masalah

Bagaimana cara:

  1. Implement event system
  2. Handle async operations
  3. Manage event queues
  4. Scale event processing

Penyelesaian

1. Struktur Proyek

eventapp/
├── cmd/
│   └── server/
│       └── main.go
├── internal/
│   ├── events/
│   │   ├── dispatcher.go
│   │   ├── event.go
│   │   └── handler.go
│   ├── domain/
│   │   └── order/
│   │       ├── events.go
│   │       └── service.go
│   └── infrastructure/
│       └── kafka/
│           └── producer.go
└── pkg/
    └── logger/
        └── logger.go

2. Event System

internal/events/event.go:

package events

import "time"

// Event is the contract an event must satisfy to be passed to handlers.
type Event interface {
    // EventName returns the name that identifies the kind of event.
    EventName() string
    // Payload returns the data carried by the event.
    Payload() interface{}
    // Timestamp returns the time associated with the event.
    Timestamp() time.Time
}

// BaseEvent is a general-purpose event that keeps its name, payload and
// creation time in plain exported fields.
type BaseEvent struct {
    // Name is the event name.
    Name      string
    // Data is the arbitrary payload attached to the event.
    Data      interface{}
    // CreatedAt is the time recorded for the event.
    CreatedAt time.Time
}

// NewEvent builds a BaseEvent with the given name and data, stamps it with
// the current time, and returns it as an Event.
func NewEvent(name string, data interface{}) Event {
    ev := new(BaseEvent)
    ev.Name = name
    ev.Data = data
    ev.CreatedAt = time.Now()
    return ev
}

// EventName returns the Name field.
func (e *BaseEvent) EventName() string {
    return e.Name
}

// Payload returns the Data field unchanged.
func (e *BaseEvent) Payload() interface{} {
    return e.Data
}

// Timestamp returns the CreatedAt field.
func (e *BaseEvent) Timestamp() time.Time {
    return e.CreatedAt
}

3. Event Dispatcher

internal/events/dispatcher.go:

package events

import (
    "context"
    "sync"
)

// Handler is a callback that receives an event and reports failure by
// returning a non-nil error.
type Handler func(Event) error

// Dispatcher stores the handlers registered per event name and invokes them
// when an event is dispatched.
type Dispatcher struct {
    // handlers maps an event name to its handlers in subscription order.
    handlers map[string][]Handler
    // mu guards access to handlers.
    mu       sync.RWMutex
}

// NewDispatcher returns a Dispatcher whose handler registry is empty.
func NewDispatcher() *Dispatcher {
    registry := make(map[string][]Handler)
    return &Dispatcher{handlers: registry}
}

// Subscribe registers handler to be called for events named eventName.
// Handlers for the same name are kept in the order they were subscribed.
//
// A nil handler is ignored, because calling it during dispatch would panic.
// The registry is allocated on first use, so a zero-value Dispatcher can be
// subscribed to without going through a constructor.
func (d *Dispatcher) Subscribe(eventName string, handler Handler) {
    if handler == nil {
        return
    }

    d.mu.Lock()
    defer d.mu.Unlock()

    // Assigning to an entry of a nil map panics, so create it lazily.
    if d.handlers == nil {
        d.handlers = make(map[string][]Handler)
    }
    d.handlers[eventName] = append(d.handlers[eventName], handler)
}

// Dispatch calls, in subscription order, every handler registered for the
// event's name. Before each handler it checks ctx and returns ctx.Err() if
// the context is done. It stops at the first handler error and returns it.
// It returns nil when every handler succeeds or none is registered.
func (d *Dispatcher) Dispatch(ctx context.Context, event Event) error {
    // Take the handler list under the read lock, then run handlers without it.
    d.mu.RLock()
    subscribers := d.handlers[event.EventName()]
    d.mu.RUnlock()

    for _, handle := range subscribers {
        // Non-blocking cancellation check before running the next handler.
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        if err := handle(event); err != nil {
            return err
        }
    }

    return nil
}

// AsyncDispatch starts one goroutine per handler registered for the event's
// name and returns without waiting for any of them to finish. Errors
// returned by handlers are discarded.
func (d *Dispatcher) AsyncDispatch(event Event) {
    // Take the handler list under the read lock, then run handlers without it.
    d.mu.RLock()
    subscribers := d.handlers[event.EventName()]
    d.mu.RUnlock()

    for i := range subscribers {
        run := subscribers[i]
        go func() {
            // The handler error is dropped; TODO: log it.
            _ = run(event)
        }()
    }
}

4. Domain Events

internal/domain/order/events.go:

package order

import (
    "time"
    "encoding/json"
)

// Event names for the order domain.
const (
    // EventOrderCreated is the name returned by OrderCreatedEvent.EventName.
    EventOrderCreated = "order.created"
    // EventOrderPaid is the name returned by OrderPaidEvent.EventName.
    EventOrderPaid    = "order.paid"
    // EventOrderShipped names the order-shipped event.
    EventOrderShipped = "order.shipped"
)

// OrderCreatedEvent carries the data of an order-created event. Its fields
// are serialized to JSON under camelCase keys.
type OrderCreatedEvent struct {
    OrderID    string    `json:"orderId"`
    CustomerID string    `json:"customerId"`
    Amount     float64   `json:"amount"`
    CreatedAt  time.Time `json:"createdAt"`
}

// EventName returns EventOrderCreated.
func (e *OrderCreatedEvent) EventName() string {
    return EventOrderCreated
}

// Payload returns the event itself, so the whole struct is the payload.
func (e *OrderCreatedEvent) Payload() interface{} {
    return e
}

// Timestamp returns the CreatedAt field.
func (e *OrderCreatedEvent) Timestamp() time.Time {
    return e.CreatedAt
}

// OrderPaidEvent carries the data of an order-paid event. Its fields are
// serialized to JSON under camelCase keys.
type OrderPaidEvent struct {
    OrderID     string    `json:"orderId"`
    PaymentID   string    `json:"paymentId"`
    Amount      float64   `json:"amount"`
    PaidAt      time.Time `json:"paidAt"`
}

// EventName returns EventOrderPaid.
func (e *OrderPaidEvent) EventName() string {
    return EventOrderPaid
}

// Payload returns the event itself, so the whole struct is the payload.
func (e *OrderPaidEvent) Payload() interface{} {
    return e
}

// Timestamp returns the PaidAt field.
func (e *OrderPaidEvent) Timestamp() time.Time {
    return e.PaidAt
}

5. Event Handlers

internal/events/handler.go:

package events

import (
    "log"
    "encoding/json"
    
    "eventapp/internal/domain/order"
)

// OrderEventHandler groups the handlers for order events and the logger
// they write to.
type OrderEventHandler struct {
    // logger receives one line per handled event.
    logger *log.Logger
}

// NewOrderEventHandler returns an OrderEventHandler that logs to logger.
func NewOrderEventHandler(logger *log.Logger) *OrderEventHandler {
    h := new(OrderEventHandler)
    h.logger = logger
    return h
}

// HandleOrderCreated converts the event payload into an
// order.OrderCreatedEvent through a JSON marshal/unmarshal round trip and
// logs the order ID and amount. It returns any encoding or decoding error.
func (h *OrderEventHandler) HandleOrderCreated(event Event) error {
    raw, err := json.Marshal(event.Payload())
    if err != nil {
        return err
    }

    created := order.OrderCreatedEvent{}
    err = json.Unmarshal(raw, &created)
    if err != nil {
        return err
    }

    h.logger.Printf("Order created: %s, Amount: %.2f", created.OrderID, created.Amount)

    // Further processing of the created order would go here.
    return nil
}

// HandleOrderPaid converts the event payload into an order.OrderPaidEvent
// through a JSON marshal/unmarshal round trip and logs the order ID, payment
// ID and amount. It returns any encoding or decoding error.
func (h *OrderEventHandler) HandleOrderPaid(event Event) error {
    raw, err := json.Marshal(event.Payload())
    if err != nil {
        return err
    }

    paid := order.OrderPaidEvent{}
    err = json.Unmarshal(raw, &paid)
    if err != nil {
        return err
    }

    h.logger.Printf("Order paid: %s, Payment: %s, Amount: %.2f", paid.OrderID, paid.PaymentID, paid.Amount)

    // Further processing of the paid order would go here.
    return nil
}

6. Kafka Integration

internal/infrastructure/kafka/producer.go:

package kafka

import (
    "encoding/json"
    "github.com/Shopify/sarama"
    "eventapp/internal/events"
)

// Producer publishes events to Kafka through a sarama SyncProducer.
type Producer struct {
    // producer is the underlying sarama client used to send messages.
    producer sarama.SyncProducer
}

// NewProducer creates a sarama SyncProducer for the given brokers and wraps
// it in a Producer. The producer is configured with RequiredAcks set to
// WaitForAll, at most 5 retries, and success reporting enabled. It returns
// the error from sarama if the producer cannot be created.
func NewProducer(brokers []string) (*Producer, error) {
    cfg := sarama.NewConfig()
    cfg.Producer.Return.Successes = true
    cfg.Producer.Retry.Max = 5
    cfg.Producer.RequiredAcks = sarama.WaitForAll

    syncProducer, err := sarama.NewSyncProducer(brokers, cfg)
    if err != nil {
        return nil, err
    }

    return &Producer{producer: syncProducer}, nil
}

// PublishEvent JSON-encodes the event's payload and sends it through the
// underlying SyncProducer to the topic named after the event. It returns the
// marshalling error or the send error, if any.
func (p *Producer) PublishEvent(event events.Event) error {
    data, err := json.Marshal(event.Payload())
    if err != nil {
        return err
    }

    msg := &sarama.ProducerMessage{
        Topic: event.EventName(),
        // json.Marshal already yields a []byte; ByteEncoder sends it as-is
        // instead of first copying it into a string.
        Value: sarama.ByteEncoder(data),
    }

    _, _, err = p.producer.SendMessage(msg)
    return err
}

// Close closes the underlying sarama producer and returns its error.
func (p *Producer) Close() error {
    return p.producer.Close()
}

7. Main Application

cmd/server/main.go:

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "eventapp/internal/domain/order"
    "eventapp/internal/events"
    "eventapp/internal/infrastructure/kafka"
)

// main wires the dispatcher, the order handlers and the Kafka producer
// together, dispatches one sample OrderCreatedEvent locally, publishes it to
// Kafka, and then blocks until SIGINT or SIGTERM is received.
func main() {
    logger := log.New(os.Stdout, "[EVENT] ", log.LstdFlags)

    // Create event dispatcher
    dispatcher := events.NewDispatcher()

    // Create event handlers
    orderHandler := events.NewOrderEventHandler(logger)

    // Subscribe to events
    dispatcher.Subscribe(order.EventOrderCreated,
        orderHandler.HandleOrderCreated)
    dispatcher.Subscribe(order.EventOrderPaid,
        orderHandler.HandleOrderPaid)

    // Setup Kafka producer
    producer, err := kafka.NewProducer([]string{"localhost:9092"})
    if err != nil {
        log.Fatal(err)
    }
    // Report a failed shutdown instead of silently dropping the error.
    defer func() {
        if err := producer.Close(); err != nil {
            log.Printf("Error closing producer: %v", err)
        }
    }()

    // Create sample event (time.Now requires the "time" import)
    event := &order.OrderCreatedEvent{
        OrderID:    "order_123",
        CustomerID: "cust_456",
        Amount:     99.99,
        CreatedAt:  time.Now(),
    }

    // Handle event locally
    ctx := context.Background()
    if err := dispatcher.Dispatch(ctx, event); err != nil {
        log.Printf("Error dispatching event: %v", err)
    }

    // Publish to Kafka
    if err := producer.PublishEvent(event); err != nil {
        log.Printf("Error publishing event: %v", err)
    }

    // Wait for interrupt signal
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    <-sigChan
}

Penjelasan

  1. Event System

    • Event interface
    • Event dispatcher
    • Event handlers
    • Async processing
  2. Integration

    • Kafka producer
    • Message serialization
    • Error handling
    • Retry logic
  3. Domain Events

    • Order events
    • Event payload
    • Event metadata
    • Event types

Best Practices

  1. Event Design

    • Clear naming
    • Versioning
    • Schema evolution
    • Documentation
  2. Processing

    • Idempotency
    • Error handling
    • Retry policies
    • Dead letter queues
  3. Monitoring

    • Event tracking
    • Performance metrics
    • Error logging
    • Alerting

Contoh Penggunaan

  1. Subscribe to events:
// Register an inline handler under the "order.created" event name.
dispatcher.Subscribe("order.created", func(e Event) error {
    // Handle order created event
    return nil
})
  2. Dispatch event:
// Build an order-created event and pass it to the dispatcher's asynchronous
// dispatch method.
event := &OrderCreatedEvent{
    OrderID: "123",
    Amount:  99.99,
}
dispatcher.AsyncDispatch(event)

Tips

  1. Design

    • Event schema
    • Failure handling
    • Scaling strategy
    • Monitoring plan
  2. Implementation

    • Use interfaces
    • Handle timeouts
    • Implement retries
    • Monitor queues
  3. Operations

    • Monitor latency
    • Track failures
    • Set alerts
    • Backup data

Troubleshooting

  1. Common Issues

    • Message loss
    • Duplicate events
    • Processing delays
    • System overload
  2. Solutions

    • Event tracking
    • Retry mechanism
    • Circuit breakers
    • Load shedding

Security

  1. Event Security

    • Event validation
    • Access control
    • Encryption
    • Audit logging
  2. System Security

    • Network security
    • Authentication
    • Authorization
    • Monitoring