CQRS (Command Query Responsibility Segregation)

CQRS adalah pola arsitektur yang memisahkan operasi read (query) dan write (command) untuk meningkatkan performa, skalabilitas, dan maintainability.

Contoh Masalah

Bagaimana cara:

  1. Implement CQRS pattern
  2. Separate read/write models
  3. Handle eventual consistency
  4. Scale read/write operations

Penyelesaian

1. Struktur Project

cqrsapp/
├── cmd/
│   └── server/
│       └── main.go
├── internal/
│   ├── commands/
│   │   ├── handler.go
│   │   └── types.go
│   ├── queries/
│   │   ├── handler.go
│   │   └── types.go
│   ├── domain/
│   │   └── product/
│   │       ├── aggregate.go
│   │       └── events.go
│   └── infrastructure/
│       ├── persistence/
│       │   ├── mysql/
│       │   │   └── write_repo.go
│       │   └── elasticsearch/
│       │       └── read_repo.go
│       └── messaging/
│           └── rabbitmq/
│               └── publisher.go
└── pkg/
    └── eventbus/
        └── eventbus.go

2. Command Types

internal/commands/types.go:

package commands

type Command interface {
    CommandType() string
}

// Create Product Command
type CreateProduct struct {
    ID          string  `json:"id"`
    Name        string  `json:"name"`
    Description string  `json:"description"`
    Price       float64 `json:"price"`
    Stock       int     `json:"stock"`
}

func (c CreateProduct) CommandType() string {
    return "create_product"
}

// Update Product Command
type UpdateProduct struct {
    ID          string  `json:"id"`
    Name        string  `json:"name"`
    Description string  `json:"description"`
    Price       float64 `json:"price"`
    Stock       int     `json:"stock"`
}

func (c UpdateProduct) CommandType() string {
    return "update_product"
}

// Delete Product Command
type DeleteProduct struct {
    ID string `json:"id"`
}

func (c DeleteProduct) CommandType() string {
    return "delete_product"
}

3. Query Types

internal/queries/types.go:

package queries

type Query interface {
    QueryType() string
}

// Get Product Query
type GetProduct struct {
    ID string `json:"id"`
}

func (q GetProduct) QueryType() string {
    return "get_product"
}

// List Products Query
type ListProducts struct {
    Page  int `json:"page"`
    Limit int `json:"limit"`
}

func (q ListProducts) QueryType() string {
    return "list_products"
}

// Search Products Query
type SearchProducts struct {
    Query string `json:"query"`
    Page  int    `json:"page"`
    Limit int    `json:"limit"`
}

func (q SearchProducts) QueryType() string {
    return "search_products"
}

4. Command Handler

internal/commands/handler.go:

package commands

import (
    "context"
    "errors"
    
    "cqrsapp/internal/domain/product"
    "cqrsapp/pkg/eventbus"
)

type CommandHandler struct {
    writeRepo product.WriteRepository
    eventBus  eventbus.EventBus
}

func NewCommandHandler(repo product.WriteRepository,
    bus eventbus.EventBus) *CommandHandler {
    return &CommandHandler{
        writeRepo: repo,
        eventBus:  bus,
    }
}

func (h *CommandHandler) Handle(ctx context.Context,
    cmd Command) error {
    switch c := cmd.(type) {
    case CreateProduct:
        return h.handleCreateProduct(ctx, c)
    case UpdateProduct:
        return h.handleUpdateProduct(ctx, c)
    case DeleteProduct:
        return h.handleDeleteProduct(ctx, c)
    default:
        return errors.New("unknown command")
    }
}

func (h *CommandHandler) handleCreateProduct(ctx context.Context,
    cmd CreateProduct) error {
    // Create product aggregate
    prod := product.NewProduct(cmd.ID, cmd.Name, cmd.Description,
        cmd.Price, cmd.Stock)

    // Save to write database
    if err := h.writeRepo.Save(ctx, prod); err != nil {
        return err
    }

    // Publish event
    event := product.NewProductCreatedEvent(prod)
    return h.eventBus.Publish(ctx, event)
}

func (h *CommandHandler) handleUpdateProduct(ctx context.Context,
    cmd UpdateProduct) error {
    // Get existing product
    prod, err := h.writeRepo.FindByID(ctx, cmd.ID)
    if err != nil {
        return err
    }

    // Update product
    prod.Update(cmd.Name, cmd.Description, cmd.Price, cmd.Stock)

    // Save changes
    if err := h.writeRepo.Save(ctx, prod); err != nil {
        return err
    }

    // Publish event
    event := product.NewProductUpdatedEvent(prod)
    return h.eventBus.Publish(ctx, event)
}

func (h *CommandHandler) handleDeleteProduct(ctx context.Context,
    cmd DeleteProduct) error {
    // Delete from write database
    if err := h.writeRepo.Delete(ctx, cmd.ID); err != nil {
        return err
    }

    // Publish event
    event := product.NewProductDeletedEvent(cmd.ID)
    return h.eventBus.Publish(ctx, event)
}

5. Query Handler

internal/queries/handler.go:

package queries

import (
    "context"
    "errors"
    
    "cqrsapp/internal/domain/product"
)

type QueryHandler struct {
    readRepo product.ReadRepository
}

func NewQueryHandler(repo product.ReadRepository) *QueryHandler {
    return &QueryHandler{
        readRepo: repo,
    }
}

func (h *QueryHandler) Handle(ctx context.Context,
    query Query) (interface{}, error) {
    switch q := query.(type) {
    case GetProduct:
        return h.handleGetProduct(ctx, q)
    case ListProducts:
        return h.handleListProducts(ctx, q)
    case SearchProducts:
        return h.handleSearchProducts(ctx, q)
    default:
        return nil, errors.New("unknown query")
    }
}

func (h *QueryHandler) handleGetProduct(ctx context.Context,
    query GetProduct) (*product.ProductView, error) {
    return h.readRepo.FindByID(ctx, query.ID)
}

func (h *QueryHandler) handleListProducts(ctx context.Context,
    query ListProducts) (*product.ProductList, error) {
    return h.readRepo.FindAll(ctx, query.Page, query.Limit)
}

func (h *QueryHandler) handleSearchProducts(ctx context.Context,
    query SearchProducts) (*product.ProductList, error) {
    return h.readRepo.Search(ctx, query.Query, query.Page,
        query.Limit)
}

6. Write Repository

internal/infrastructure/persistence/mysql/write_repo.go:

package mysql

import (
    "context"
    "database/sql"
    
    "cqrsapp/internal/domain/product"
)

type productWriteRepo struct {
    db *sql.DB
}

func NewProductWriteRepo(db *sql.DB) *productWriteRepo {
    return &productWriteRepo{
        db: db,
    }
}

func (r *productWriteRepo) Save(ctx context.Context,
    p *product.Product) error {
    query := `
        INSERT INTO products (
            id, name, description, price, stock, created_at,
            updated_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?)
        ON DUPLICATE KEY UPDATE
            name = VALUES(name),
            description = VALUES(description),
            price = VALUES(price),
            stock = VALUES(stock),
            updated_at = VALUES(updated_at)
    `

    _, err := r.db.ExecContext(ctx, query,
        p.ID,
        p.Name,
        p.Description,
        p.Price,
        p.Stock,
        p.CreatedAt,
        p.UpdatedAt,
    )

    return err
}

func (r *productWriteRepo) Delete(ctx context.Context,
    id string) error {
    query := "DELETE FROM products WHERE id = ?"
    _, err := r.db.ExecContext(ctx, query, id)
    return err
}

7. Read Repository

internal/infrastructure/persistence/elasticsearch/read_repo.go:

package elasticsearch

import (
    "context"
    "encoding/json"
    
    "github.com/elastic/go-elasticsearch/v8"
    "cqrsapp/internal/domain/product"
)

type productReadRepo struct {
    es *elasticsearch.Client
}

func NewProductReadRepo(es *elasticsearch.Client) *productReadRepo {
    return &productReadRepo{
        es: es,
    }
}

func (r *productReadRepo) FindByID(ctx context.Context,
    id string) (*product.ProductView, error) {
    res, err := r.es.Get("products", id)
    if err != nil {
        return nil, err
    }
    defer res.Body.Close()

    if !res.Found {
        return nil, product.ErrProductNotFound
    }

    var prod product.ProductView
    if err := json.NewDecoder(res.Body).Decode(&prod); err != nil {
        return nil, err
    }

    return &prod, nil
}

func (r *productReadRepo) Search(ctx context.Context,
    query string, page, limit int) (*product.ProductList, error) {
    from := (page - 1) * limit

    // Build search query
    searchQuery := map[string]interface{}{
        "query": map[string]interface{}{
            "multi_match": map[string]interface{}{
                "query":  query,
                "fields": []string{"name", "description"},
            },
        },
        "from": from,
        "size": limit,
    }

    // Perform search
    res, err := r.es.Search(
        r.es.Search.WithContext(ctx),
        r.es.Search.WithIndex("products"),
        r.es.Search.WithBody(
            strings.NewReader(
                mustToJSON(searchQuery),
            ),
        ),
    )
    if err != nil {
        return nil, err
    }
    defer res.Body.Close()

    // Parse response
    var result struct {
        Hits struct {
            Total struct {
                Value int `json:"value"`
            } `json:"total"`
            Hits []struct {
                Source product.ProductView `json:"_source"`
            } `json:"hits"`
        } `json:"hits"`
    }

    if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
        return nil, err
    }

    // Build product list
    products := make([]product.ProductView, 0)
    for _, hit := range result.Hits.Hits {
        products = append(products, hit.Source)
    }

    return &product.ProductList{
        Items: products,
        Total: result.Hits.Total.Value,
        Page:  page,
        Limit: limit,
    }, nil
}

8. Event Bus

pkg/eventbus/eventbus.go:

package eventbus

import (
    "context"
    "encoding/json"
    
    "github.com/streadway/amqp"
)

type Event interface {
    EventType() string
    Payload() interface{}
}

type EventBus interface {
    Publish(context.Context, Event) error
    Subscribe(string, EventHandler) error
}

type EventHandler func(context.Context, Event) error

type rabbitMQEventBus struct {
    conn    *amqp.Connection
    channel *amqp.Channel
}

func NewRabbitMQEventBus(url string) (EventBus, error) {
    conn, err := amqp.Dial(url)
    if err != nil {
        return nil, err
    }

    ch, err := conn.Channel()
    if err != nil {
        return nil, err
    }

    return &rabbitMQEventBus{
        conn:    conn,
        channel: ch,
    }, nil
}

func (b *rabbitMQEventBus) Publish(ctx context.Context,
    event Event) error {
    data, err := json.Marshal(event.Payload())
    if err != nil {
        return err
    }

    return b.channel.Publish(
        "events",        // exchange
        event.EventType(), // routing key
        false,          // mandatory
        false,          // immediate
        amqp.Publishing{
            ContentType: "application/json",
            Body:       data,
        },
    )
}

func (b *rabbitMQEventBus) Subscribe(eventType string,
    handler EventHandler) error {
    q, err := b.channel.QueueDeclare(
        eventType, // name
        true,      // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )
    if err != nil {
        return err
    }

    msgs, err := b.channel.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    if err != nil {
        return err
    }

    go func() {
        for d := range msgs {
            var event Event
            if err := json.Unmarshal(d.Body, &event); err != nil {
                // Log error
                continue
            }

            if err := handler(context.Background(),
                event); err != nil {
                // Log error
            }
        }
    }()

    return nil
}

Penjelasan

  1. CQRS Pattern

    • Command handling
    • Query handling
    • Event publishing
    • Data consistency
  2. Components

    • Write model
    • Read model
    • Event bus
    • Repositories
  3. Data Flow

    • Command flow
    • Query flow
    • Event flow
    • Synchronization

Best Practices

  1. Design

    • Clear boundaries
    • Event sourcing
    • Eventual consistency
    • Error handling
  2. Implementation

    • Separate models
    • Use interfaces
    • Handle failures
    • Monitor sync
  3. Operations

    • Monitor latency
    • Track errors
    • Backup data
    • Scale independently

Tips

  1. Architecture

    • Start simple
    • Evolve gradually
    • Monitor complexity
    • Document decisions
  2. Development

    • Use DTOs
    • Handle timeouts
    • Implement retries
    • Test thoroughly
  3. Deployment

    • Scale reads
    • Monitor writes
    • Handle failures
    • Backup regularly

Troubleshooting

  1. Common Issues

    • Sync delays
    • Data inconsistency
    • Performance issues
    • System complexity
  2. Solutions

    • Monitor sync
    • Implement retry
    • Optimize queries
    • Simplify design

Security

  1. Data Security

    • Access control
    • Encryption
    • Audit logging
    • Data validation
  2. System Security

    • Authentication
    • Authorization
    • Rate limiting
    • Monitoring