CQRS (Command Query Responsibility Segregation)
CQRS adalah pola arsitektur yang memisahkan operasi read (query) dan write (command) untuk meningkatkan performa, skalabilitas, dan maintainability.
Contoh Masalah
Bagaimana cara:
- Implement CQRS pattern
- Separate read/write models
- Handle eventual consistency
- Scale read/write operations
Penyelesaian
1. Struktur Project
cqrsapp/
├── cmd/
│ └── server/
│ └── main.go
├── internal/
│ ├── commands/
│ │ ├── handler.go
│ │ └── types.go
│ ├── queries/
│ │ ├── handler.go
│ │ └── types.go
│ ├── domain/
│ │ └── product/
│ │ ├── aggregate.go
│ │ └── events.go
│ └── infrastructure/
│ ├── persistence/
│ │ ├── mysql/
│ │ │ └── write_repo.go
│ │ └── elasticsearch/
│ │ └── read_repo.go
│ └── messaging/
│ └── rabbitmq/
│ └── publisher.go
└── pkg/
└── eventbus/
└── eventbus.go
2. Command Types
internal/commands/types.go:
package commands
type Command interface {
CommandType() string
}
// Create Product Command
type CreateProduct struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Price float64 `json:"price"`
Stock int `json:"stock"`
}
func (c CreateProduct) CommandType() string {
return "create_product"
}
// Update Product Command
type UpdateProduct struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Price float64 `json:"price"`
Stock int `json:"stock"`
}
func (c UpdateProduct) CommandType() string {
return "update_product"
}
// Delete Product Command
type DeleteProduct struct {
ID string `json:"id"`
}
func (c DeleteProduct) CommandType() string {
return "delete_product"
}
3. Query Types
internal/queries/types.go:
package queries
type Query interface {
QueryType() string
}
// Get Product Query
type GetProduct struct {
ID string `json:"id"`
}
func (q GetProduct) QueryType() string {
return "get_product"
}
// List Products Query
type ListProducts struct {
Page int `json:"page"`
Limit int `json:"limit"`
}
func (q ListProducts) QueryType() string {
return "list_products"
}
// Search Products Query
type SearchProducts struct {
Query string `json:"query"`
Page int `json:"page"`
Limit int `json:"limit"`
}
func (q SearchProducts) QueryType() string {
return "search_products"
}
4. Command Handler
internal/commands/handler.go:
package commands
import (
"context"
"errors"
"cqrsapp/internal/domain/product"
"cqrsapp/pkg/eventbus"
)
type CommandHandler struct {
writeRepo product.WriteRepository
eventBus eventbus.EventBus
}
func NewCommandHandler(repo product.WriteRepository,
bus eventbus.EventBus) *CommandHandler {
return &CommandHandler{
writeRepo: repo,
eventBus: bus,
}
}
func (h *CommandHandler) Handle(ctx context.Context,
cmd Command) error {
switch c := cmd.(type) {
case CreateProduct:
return h.handleCreateProduct(ctx, c)
case UpdateProduct:
return h.handleUpdateProduct(ctx, c)
case DeleteProduct:
return h.handleDeleteProduct(ctx, c)
default:
return errors.New("unknown command")
}
}
func (h *CommandHandler) handleCreateProduct(ctx context.Context,
cmd CreateProduct) error {
// Create product aggregate
prod := product.NewProduct(cmd.ID, cmd.Name, cmd.Description,
cmd.Price, cmd.Stock)
// Save to write database
if err := h.writeRepo.Save(ctx, prod); err != nil {
return err
}
// Publish event
event := product.NewProductCreatedEvent(prod)
return h.eventBus.Publish(ctx, event)
}
func (h *CommandHandler) handleUpdateProduct(ctx context.Context,
cmd UpdateProduct) error {
// Get existing product
prod, err := h.writeRepo.FindByID(ctx, cmd.ID)
if err != nil {
return err
}
// Update product
prod.Update(cmd.Name, cmd.Description, cmd.Price, cmd.Stock)
// Save changes
if err := h.writeRepo.Save(ctx, prod); err != nil {
return err
}
// Publish event
event := product.NewProductUpdatedEvent(prod)
return h.eventBus.Publish(ctx, event)
}
func (h *CommandHandler) handleDeleteProduct(ctx context.Context,
cmd DeleteProduct) error {
// Delete from write database
if err := h.writeRepo.Delete(ctx, cmd.ID); err != nil {
return err
}
// Publish event
event := product.NewProductDeletedEvent(cmd.ID)
return h.eventBus.Publish(ctx, event)
}
5. Query Handler
internal/queries/handler.go:
package queries
import (
"context"
"errors"
"cqrsapp/internal/domain/product"
)
type QueryHandler struct {
readRepo product.ReadRepository
}
func NewQueryHandler(repo product.ReadRepository) *QueryHandler {
return &QueryHandler{
readRepo: repo,
}
}
func (h *QueryHandler) Handle(ctx context.Context,
query Query) (interface{}, error) {
switch q := query.(type) {
case GetProduct:
return h.handleGetProduct(ctx, q)
case ListProducts:
return h.handleListProducts(ctx, q)
case SearchProducts:
return h.handleSearchProducts(ctx, q)
default:
return nil, errors.New("unknown query")
}
}
func (h *QueryHandler) handleGetProduct(ctx context.Context,
query GetProduct) (*product.ProductView, error) {
return h.readRepo.FindByID(ctx, query.ID)
}
func (h *QueryHandler) handleListProducts(ctx context.Context,
query ListProducts) (*product.ProductList, error) {
return h.readRepo.FindAll(ctx, query.Page, query.Limit)
}
func (h *QueryHandler) handleSearchProducts(ctx context.Context,
query SearchProducts) (*product.ProductList, error) {
return h.readRepo.Search(ctx, query.Query, query.Page,
query.Limit)
}
6. Write Repository
internal/infrastructure/persistence/mysql/write_repo.go:
package mysql
import (
"context"
"database/sql"
"cqrsapp/internal/domain/product"
)
type productWriteRepo struct {
db *sql.DB
}
func NewProductWriteRepo(db *sql.DB) *productWriteRepo {
return &productWriteRepo{
db: db,
}
}
func (r *productWriteRepo) Save(ctx context.Context,
p *product.Product) error {
query := `
INSERT INTO products (
id, name, description, price, stock, created_at,
updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
name = VALUES(name),
description = VALUES(description),
price = VALUES(price),
stock = VALUES(stock),
updated_at = VALUES(updated_at)
`
_, err := r.db.ExecContext(ctx, query,
p.ID,
p.Name,
p.Description,
p.Price,
p.Stock,
p.CreatedAt,
p.UpdatedAt,
)
return err
}
func (r *productWriteRepo) Delete(ctx context.Context,
id string) error {
query := "DELETE FROM products WHERE id = ?"
_, err := r.db.ExecContext(ctx, query, id)
return err
}
7. Read Repository
internal/infrastructure/persistence/elasticsearch/read_repo.go:
package elasticsearch
import (
"context"
"encoding/json"
"github.com/elastic/go-elasticsearch/v8"
"cqrsapp/internal/domain/product"
)
type productReadRepo struct {
es *elasticsearch.Client
}
func NewProductReadRepo(es *elasticsearch.Client) *productReadRepo {
return &productReadRepo{
es: es,
}
}
func (r *productReadRepo) FindByID(ctx context.Context,
id string) (*product.ProductView, error) {
res, err := r.es.Get("products", id)
if err != nil {
return nil, err
}
defer res.Body.Close()
if !res.Found {
return nil, product.ErrProductNotFound
}
var prod product.ProductView
if err := json.NewDecoder(res.Body).Decode(&prod); err != nil {
return nil, err
}
return &prod, nil
}
func (r *productReadRepo) Search(ctx context.Context,
query string, page, limit int) (*product.ProductList, error) {
from := (page - 1) * limit
// Build search query
searchQuery := map[string]interface{}{
"query": map[string]interface{}{
"multi_match": map[string]interface{}{
"query": query,
"fields": []string{"name", "description"},
},
},
"from": from,
"size": limit,
}
// Perform search
res, err := r.es.Search(
r.es.Search.WithContext(ctx),
r.es.Search.WithIndex("products"),
r.es.Search.WithBody(
strings.NewReader(
mustToJSON(searchQuery),
),
),
)
if err != nil {
return nil, err
}
defer res.Body.Close()
// Parse response
var result struct {
Hits struct {
Total struct {
Value int `json:"value"`
} `json:"total"`
Hits []struct {
Source product.ProductView `json:"_source"`
} `json:"hits"`
} `json:"hits"`
}
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return nil, err
}
// Build product list
products := make([]product.ProductView, 0)
for _, hit := range result.Hits.Hits {
products = append(products, hit.Source)
}
return &product.ProductList{
Items: products,
Total: result.Hits.Total.Value,
Page: page,
Limit: limit,
}, nil
}
8. Event Bus
pkg/eventbus/eventbus.go:
package eventbus
import (
"context"
"encoding/json"
"github.com/streadway/amqp"
)
type Event interface {
EventType() string
Payload() interface{}
}
type EventBus interface {
Publish(context.Context, Event) error
Subscribe(string, EventHandler) error
}
type EventHandler func(context.Context, Event) error
type rabbitMQEventBus struct {
conn *amqp.Connection
channel *amqp.Channel
}
func NewRabbitMQEventBus(url string) (EventBus, error) {
conn, err := amqp.Dial(url)
if err != nil {
return nil, err
}
ch, err := conn.Channel()
if err != nil {
return nil, err
}
return &rabbitMQEventBus{
conn: conn,
channel: ch,
}, nil
}
func (b *rabbitMQEventBus) Publish(ctx context.Context,
event Event) error {
data, err := json.Marshal(event.Payload())
if err != nil {
return err
}
return b.channel.Publish(
"events", // exchange
event.EventType(), // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: data,
},
)
}
func (b *rabbitMQEventBus) Subscribe(eventType string,
handler EventHandler) error {
q, err := b.channel.QueueDeclare(
eventType, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}
msgs, err := b.channel.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return err
}
go func() {
for d := range msgs {
var event Event
if err := json.Unmarshal(d.Body, &event); err != nil {
// Log error
continue
}
if err := handler(context.Background(),
event); err != nil {
// Log error
}
}
}()
return nil
}
Penjelasan
CQRS Pattern
- Command handling
- Query handling
- Event publishing
- Data consistency
Components
- Write model
- Read model
- Event bus
- Repositories
Data Flow
- Command flow
- Query flow
- Event flow
- Synchronization
Best Practices
Design
- Clear boundaries
- Event sourcing
- Eventual consistency
- Error handling
Implementation
- Separate models
- Use interfaces
- Handle failures
- Monitor sync
Operations
- Monitor latency
- Track errors
- Backup data
- Scale independently
Tips
Architecture
- Start simple
- Evolve gradually
- Monitor complexity
- Document decisions
Development
- Use DTOs
- Handle timeouts
- Implement retries
- Test thoroughly
Deployment
- Scale reads
- Monitor writes
- Handle failures
- Backup regularly
Troubleshooting
Common Issues
- Sync delays
- Data inconsistency
- Performance issues
- System complexity
Solutions
- Monitor sync
- Implement retry
- Optimize queries
- Simplify design
Security
Data Security
- Access control
- Encryption
- Audit logging
- Data validation
System Security
- Authentication
- Authorization
- Rate limiting
- Monitoring