Event-Driven Architecture
Event-Driven Architecture (EDA) adalah pola arsitektur yang berfokus pada produksi, deteksi, dan reaksi terhadap events. Tutorial ini akan menjelaskan implementasi EDA di Go.
Contoh Masalah
Bagaimana cara:
- Mengimplementasikan sistem event
- Menangani operasi asinkron
- Mengelola antrean event
- Menskalakan pemrosesan event
Penyelesaian
1. Struktur Project
eventapp/
├── cmd/
│   └── server/
│       └── main.go
├── internal/
│   ├── events/
│   │   ├── dispatcher.go
│   │   ├── event.go
│   │   └── handler.go
│   ├── domain/
│   │   └── order/
│   │       ├── events.go
│   │       └── service.go
│   └── infrastructure/
│       └── kafka/
│           └── producer.go
└── pkg/
    └── logger/
        └── logger.go
2. Event System
internal/events/event.go:
package events
import "time"
// Event is the contract implemented by everything that can be dispatched.
type Event interface {
	// EventName returns the name of the event.
	EventName() string
	// Payload returns the data carried by the event.
	Payload() interface{}
	// Timestamp returns the time associated with the event.
	Timestamp() time.Time
}

// BaseEvent is a general-purpose Event that keeps its name, payload and
// creation time in plain exported fields.
type BaseEvent struct {
	Name      string
	Data      interface{}
	CreatedAt time.Time
}

// NewEvent returns a *BaseEvent named name that carries data and is stamped
// with the current time.
func NewEvent(name string, data interface{}) Event {
	ev := new(BaseEvent)
	ev.Name = name
	ev.Data = data
	ev.CreatedAt = time.Now()
	return ev
}

// EventName returns the Name field.
func (b *BaseEvent) EventName() string {
	return b.Name
}

// Payload returns the Data field.
func (b *BaseEvent) Payload() interface{} {
	return b.Data
}

// Timestamp returns the CreatedAt field.
func (b *BaseEvent) Timestamp() time.Time {
	return b.CreatedAt
}
3. Event Dispatcher
internal/events/dispatcher.go:
package events
import (
"context"
"sync"
)
// Handler processes one Event and reports failure through its error result.
type Handler func(Event) error

// Dispatcher keeps the handlers registered for each event name.
type Dispatcher struct {
	// handlers maps an event name to the handlers subscribed to it.
	handlers map[string][]Handler
	// mu guards handlers.
	mu sync.RWMutex
}

// NewDispatcher returns a Dispatcher with an empty handler registry.
func NewDispatcher() *Dispatcher {
	d := new(Dispatcher)
	d.handlers = map[string][]Handler{}
	return d
}
// Subscribe registers handler for events named eventName. Handlers for the
// same name are appended, so they are kept in registration order.
//
// A nil handler is ignored: storing it would make a later dispatch call a nil
// function and panic. The registry is created on first use, so a zero-value
// Dispatcher can be subscribed to as well.
func (d *Dispatcher) Subscribe(eventName string, handler Handler) {
	if handler == nil {
		return
	}

	d.mu.Lock()
	defer d.mu.Unlock()

	if d.handlers == nil {
		// Writing to a nil map panics; allocate lazily for the zero value.
		d.handlers = make(map[string][]Handler)
	}
	d.handlers[eventName] = append(d.handlers[eventName], handler)
}
// Dispatch runs the handlers registered for event.EventName() one after
// another on the calling goroutine.
//
// ctx is checked before each handler; once it is done, Dispatch stops and
// returns ctx.Err(). The first handler error also stops the run and is
// returned unchanged. Dispatch returns nil when every handler succeeds or when
// no handler is registered for the event name.
func (d *Dispatcher) Dispatch(ctx context.Context, event Event) error {
	// Take the handler list under the read lock and release it before any
	// handler runs.
	d.mu.RLock()
	subscribed := d.handlers[event.EventName()]
	d.mu.RUnlock()

	for i := range subscribed {
		// Non-blocking cancellation check.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		if err := subscribed[i](event); err != nil {
			return err
		}
	}
	return nil
}
// AsyncDispatch starts one goroutine per handler registered for
// event.EventName() and returns without waiting for any of them.
//
// Handler errors are discarded; nothing is reported back to the caller.
func (d *Dispatcher) AsyncDispatch(event Event) {
	d.mu.RLock()
	subscribed := d.handlers[event.EventName()]
	d.mu.RUnlock()

	for _, handler := range subscribed {
		run := handler // per-iteration copy captured by the goroutine below
		go func() {
			// TODO: log the error; it is currently dropped.
			_ = run(event)
		}()
	}
}
4. Domain Events
internal/domain/order/events.go:
package order
import (
"time"
"encoding/json"
)
// Event names of the order domain.
const (
	EventOrderCreated = "order.created"
	EventOrderPaid    = "order.paid"
	EventOrderShipped = "order.shipped"
)

// OrderCreatedEvent describes a newly created order.
type OrderCreatedEvent struct {
	OrderID    string    `json:"orderId"`
	CustomerID string    `json:"customerId"`
	Amount     float64   `json:"amount"`
	CreatedAt  time.Time `json:"createdAt"`
}

// EventName returns EventOrderCreated.
func (c *OrderCreatedEvent) EventName() string {
	return EventOrderCreated
}

// Payload returns the event itself, so the whole struct is the payload.
func (c *OrderCreatedEvent) Payload() interface{} {
	return c
}

// Timestamp returns the CreatedAt field.
func (c *OrderCreatedEvent) Timestamp() time.Time {
	return c.CreatedAt
}

// OrderPaidEvent describes a payment recorded against an order.
type OrderPaidEvent struct {
	OrderID   string    `json:"orderId"`
	PaymentID string    `json:"paymentId"`
	Amount    float64   `json:"amount"`
	PaidAt    time.Time `json:"paidAt"`
}

// EventName returns EventOrderPaid.
func (p *OrderPaidEvent) EventName() string {
	return EventOrderPaid
}

// Payload returns the event itself, so the whole struct is the payload.
func (p *OrderPaidEvent) Payload() interface{} {
	return p
}

// Timestamp returns the PaidAt field.
func (p *OrderPaidEvent) Timestamp() time.Time {
	return p.PaidAt
}
5. Event Handlers
internal/events/handler.go:
package events
import (
"log"
"encoding/json"
"eventapp/internal/domain/order"
)
// OrderEventHandler handles order events by writing a summary of each one to
// its logger.
type OrderEventHandler struct {
	logger *log.Logger
}

// NewOrderEventHandler returns a handler that writes to logger.
//
// A nil logger is replaced by log.Default(), so the handler methods never
// call Printf on a nil *log.Logger.
func NewOrderEventHandler(logger *log.Logger) *OrderEventHandler {
	if logger == nil {
		logger = log.Default()
	}
	return &OrderEventHandler{logger: logger}
}
// HandleOrderCreated decodes the payload of event into an
// order.OrderCreatedEvent and logs the order ID and amount.
//
// The payload is converted by marshalling it to JSON and unmarshalling the
// result into the typed event. An error from either step is returned
// unchanged and nothing is logged.
func (h *OrderEventHandler) HandleOrderCreated(event Event) error {
	raw, err := json.Marshal(event.Payload())
	if err != nil {
		return err
	}

	created := order.OrderCreatedEvent{}
	if err = json.Unmarshal(raw, &created); err != nil {
		return err
	}

	h.logger.Printf("Order created: %s, Amount: %.2f", created.OrderID, created.Amount)
	// Additional processing...
	return nil
}
// HandleOrderPaid decodes the payload of event into an order.OrderPaidEvent
// and logs the order ID, payment ID and amount.
//
// The payload is converted by marshalling it to JSON and unmarshalling the
// result into the typed event. An error from either step is returned
// unchanged and nothing is logged.
func (h *OrderEventHandler) HandleOrderPaid(event Event) error {
	raw, err := json.Marshal(event.Payload())
	if err != nil {
		return err
	}

	paid := order.OrderPaidEvent{}
	if err = json.Unmarshal(raw, &paid); err != nil {
		return err
	}

	h.logger.Printf("Order paid: %s, Payment: %s, Amount: %.2f", paid.OrderID, paid.PaymentID, paid.Amount)
	// Additional processing...
	return nil
}
6. Kafka Integration
internal/infrastructure/kafka/producer.go:
package kafka
import (
"encoding/json"
"github.com/Shopify/sarama"
"eventapp/internal/events"
)
// Producer publishes events to Kafka through a sarama.SyncProducer.
type Producer struct {
	producer sarama.SyncProducer
}

// NewProducer creates a synchronous Kafka producer for brokers.
//
// The producer is configured with RequiredAcks set to sarama.WaitForAll and
// at most 5 retries per message. It returns the error from
// sarama.NewSyncProducer unchanged when the producer cannot be created.
func NewProducer(brokers []string) (*Producer, error) {
	cfg := sarama.NewConfig()
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Producer.Retry.Max = 5
	// NOTE(review): sarama's SyncProducer is documented to require
	// Return.Successes to be true — confirm against the sarama version in use.
	cfg.Producer.Return.Successes = true

	sp, err := sarama.NewSyncProducer(brokers, cfg)
	if err != nil {
		return nil, err
	}
	return &Producer{producer: sp}, nil
}
// PublishEvent marshals the payload of event to JSON and sends it with
// SendMessage to the Kafka topic named event.EventName().
//
// It returns the json.Marshal error if the payload cannot be encoded,
// otherwise the error reported by SendMessage (nil on success).
func (p *Producer) PublishEvent(event events.Event) error {
	data, err := json.Marshal(event.Payload())
	if err != nil {
		return err
	}

	msg := &sarama.ProducerMessage{
		Topic: event.EventName(),
		// data is already a []byte; ByteEncoder sends it as-is, without the
		// []byte-to-string conversion StringEncoder would require.
		Value: sarama.ByteEncoder(data),
	}

	// The partition and offset results are not used.
	_, _, err = p.producer.SendMessage(msg)
	return err
}
// Close closes the underlying sarama producer and returns the error it
// reports.
func (p *Producer) Close() error {
return p.producer.Close()
}
7. Main Application
cmd/server/main.go:
package main
import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"eventapp/internal/domain/order"
	"eventapp/internal/events"
	"eventapp/internal/infrastructure/kafka"
)
// main wires the dispatcher, the order handlers and the Kafka producer,
// dispatches and publishes one sample order.created event, then blocks until
// SIGINT or SIGTERM is received.
func main() {
	logger := log.New(os.Stdout, "[EVENT] ", log.LstdFlags)

	// Create event dispatcher
	dispatcher := events.NewDispatcher()

	// Create event handlers
	orderHandler := events.NewOrderEventHandler(logger)

	// Subscribe to events
	dispatcher.Subscribe(order.EventOrderCreated, orderHandler.HandleOrderCreated)
	dispatcher.Subscribe(order.EventOrderPaid, orderHandler.HandleOrderPaid)

	// Setup Kafka producer
	producer, err := kafka.NewProducer([]string{"localhost:9092"})
	if err != nil {
		log.Fatal(err)
	}
	// Report a failed Close instead of dropping its error.
	defer func() {
		if err := producer.Close(); err != nil {
			log.Printf("Error closing producer: %v", err)
		}
	}()

	// Create sample event
	event := &order.OrderCreatedEvent{
		OrderID:    "order_123",
		CustomerID: "cust_456",
		Amount:     99.99,
		CreatedAt:  time.Now(),
	}

	// Handle event locally
	ctx := context.Background()
	if err := dispatcher.Dispatch(ctx, event); err != nil {
		log.Printf("Error dispatching event: %v", err)
	}

	// Publish to Kafka
	if err := producer.PublishEvent(event); err != nil {
		log.Printf("Error publishing event: %v", err)
	}

	// Wait for interrupt signal. The channel is buffered so a signal that
	// arrives before the receive below is not lost.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan
}
Penjelasan
Event System
- Event interface
- Event dispatcher
- Event handlers
- Async processing
Integration
- Kafka producer
- Message serialization
- Error handling
- Retry logic
Domain Events
- Order events
- Event payload
- Event metadata
- Event types
Best Practices
Event Design
- Clear naming
- Versioning
- Schema evolution
- Documentation
Processing
- Idempotency
- Error handling
- Retry policies
- Dead letter queues
Monitoring
- Event tracking
- Performance metrics
- Error logging
- Alerting
Contoh Penggunaan
- Subscribe to events:
// Register a handler for the "order.created" event name.
// NOTE(review): Event is declared in package events; outside that package the
// parameter type has to be written events.Event.
dispatcher.Subscribe("order.created", func(e Event) error {
// Handle order created event
return nil
})
- Dispatch event:
// NOTE(review): OrderCreatedEvent is declared in package order; outside that
// package it has to be written order.OrderCreatedEvent.
// CreatedAt is not set here, so event.Timestamp() returns the zero time.Time.
event := &OrderCreatedEvent{
OrderID: "123",
Amount: 99.99,
}
// Each subscribed handler runs in its own goroutine; the call returns without
// waiting for them and handler errors are not reported back.
dispatcher.AsyncDispatch(event)
Tips
Design
- Event schema
- Failure handling
- Scaling strategy
- Monitoring plan
Implementation
- Use interfaces
- Handle timeouts
- Implement retries
- Monitor queues
Operations
- Monitor latency
- Track failures
- Set alerts
- Backup data
Troubleshooting
Common Issues
- Message loss
- Duplicate events
- Processing delays
- System overload
Solutions
- Event tracking
- Retry mechanism
- Circuit breakers
- Load shedding
Security
Event Security
- Event validation
- Access control
- Encryption
- Audit logging
System Security
- Network security
- Authentication
- Authorization
- Monitoring