Progress belajar
Modul 71 dari 73
0% 0/73 modul selesai
Setelah selesai, tandai modul ini agar progres kursus tetap rapi.
Progress disimpan lokal di browser ini.
Event-Driven Architecture
dengan SQS
Di modul ini, checkout tetap cepat karena order disimpan dulu, sementara email, inventory, shipping, dan notifikasi diproses lewat event.
Kenapa Event-Driven?
Di React, kita sering memisahkan state update dari side effect lewat event handler dan effect. Di backend, pola yang mirip dipakai untuk memisahkan transaksi inti dari pekerjaan sampingan.
Pada checkout skincare shop, core flow yang wajib cepat dan konsisten adalah membuat order, menghitung total, mengunci harga, dan menyimpan status awal. Setelah itu ada banyak side effect: kirim email, kurangi stok fisik, request shipping, update analytics, dan trigger notifikasi. Bila semuanya dijalankan sinkron di request POST /v1/checkout, pelanggan menunggu lebih lama dan satu integrasi lambat bisa menggagalkan order yang sebenarnya valid.
Pola arsitektur di mana perubahan penting dalam domain dinyatakan sebagai event, lalu komponen lain bereaksi terhadap event itu secara asynchronous.
Fakta bisnis yang sudah terjadi, dinamai dengan bentuk lampau, misalnya OrderCreated, PaymentSucceeded, StockReserved, dan NotificationRequested.
Di Laravel kamu mungkin memakai event dan listener. Di Go, kita biasanya membuat struct event eksplisit, interface publisher, dan worker consumer yang membaca queue.
- Request checkout menunggu payment hook, email, inventory, shipping, dan analytics.
- Error email bisa ikut menggagalkan response walau order sudah valid.
- Latency makin buruk ketika downstream bertambah.
- Request checkout cukup menyimpan order dan event.
- Worker memproses side effect di luar request utama.
- Downstream gagal bisa retry tanpa membuat pelanggan menunggu.
Di AWS, Amazon SQS adalah queue terkelola untuk memisahkan producer dan consumer. Producer mengirim message, consumer mengambil message, memprosesnya, lalu menghapus message setelah sukses. Untuk routing event ke banyak subscriber, Amazon EventBridge lebih cocok karena satu rule dapat mengirim event ke beberapa target.
Dari pemanggilan service langsung ke event
Outcome modul ini adalah kamu bisa berpindah dari memanggil service lain secara langsung menjadi alur berbasis event ketika memang dibutuhkan. Mari lihat titik awalnya. Handler checkout versi sinkron memanggil email, inventory, dan shipping secara berurutan di dalam request, semuanya menempel pada satu transaksi mental yang sama.
internal/order/handler.go (SEBELUM: semua sinkron)func (h *Handler) Checkout(w http.ResponseWriter, r *http.Request) { order, err := h.orders.CreateOrder(r.Context(), input) if err != nil { writeError(w, err) return } // Semua side effect menempel di request checkout. if err := h.email.SendOrderConfirmation(r.Context(), order.ID); err != nil { writeError(w, err) // email lambat atau down menggagalkan order yang valid return } if err := h.inventory.Reserve(r.Context(), order.ID); err != nil { writeError(w, err) return } if err := h.shipping.RequestPickup(r.Context(), order.ID); err != nil { writeError(w, err) return } writeJSON(w, http.StatusCreated, order) }
Versi event-driven membalik tanggung jawab. Handler hanya membuat order dan mencatat fakta bahwa order itu terjadi. Email, inventory, dan shipping bereaksi belakangan, di luar jalur request, dan boleh gagal lalu retry tanpa membuat pelanggan menunggu.
internal/order/handler.go (SESUDAH: fakta dulu, reaksi belakangan)func (h *Handler) Checkout(w http.ResponseWriter, r *http.Request) { // CreateOrder menyimpan order DAN event order.created dalam satu transaksi. order, err := h.orders.CreateOrder(r.Context(), input) if err != nil { writeError(w, err) return } // Selesai. Tidak ada email, inventory, atau shipping di sini. writeJSON(w, http.StatusCreated, order) }
Refactor ini hanya untuk side effect yang boleh telat sedikit. Validasi stok yang harus menolak checkout di tempat, atau perhitungan total, tetap sinkron di dalam request. Event untuk pekerjaan yang boleh diproses ulang, bukan untuk keputusan yang harus terjadi sebelum balasan ke pelanggan.
Domain Event di Go
Event di Go sebaiknya bukan map[string]any liar. Buat kontrak eksplisit agar payload mudah dites, divalidasi, dan diubah secara aman.
Event harus menjawab empat hal: apa yang terjadi, kapan terjadi, entity mana yang berubah, dan data minimal apa yang dibutuhkan subscriber. Hindari memasukkan seluruh row database ke event, karena itu membuat event rapuh terhadap perubahan schema internal.
Pakailah event envelope kecil dan payload typed. Envelope memudahkan metadata umum, payload typed menjaga business meaning tetap jelas.
internal/events/event.gopackage events import ( "encoding/json" "time" ) type Type string const ( OrderCreated Type = "order.created" PaymentSucceeded Type = "payment.succeeded" StockReserved Type = "stock.reserved" NotificationRequested Type = "notification.requested" ) type Event struct { ID string `json:"id"` Type Type `json:"type"` AggregateType string `json:"aggregate_type"` AggregateID string `json:"aggregate_id"` OccurredAt time.Time `json:"occurred_at"` Payload json.RawMessage `json:"payload"` } type OrderCreatedPayload struct { OrderID string `json:"order_id"` CustomerID string `json:"customer_id"` Total int64 `json:"total"` Currency string `json:"currency"` } type PaymentSucceededPayload struct { OrderID string `json:"order_id"` PaymentID string `json:"payment_id"` TransactionID string `json:"transaction_id"` PaidAmount int64 `json:"paid_amount"` PaidAt time.Time `json:"paid_at"` } type StockReservedPayload struct { OrderID string `json:"order_id"` ReservationID string `json:"reservation_id"` SKU string `json:"sku"` Quantity int `json:"quantity"` } type NotificationRequestedPayload struct { Channel string `json:"channel"` // email, whatsapp, push Template string `json:"template"` // order_confirmation, payment_success CustomerID string `json:"customer_id"` OrderID string `json:"order_id"` }
Nama event memakai dotted name seperti payment.succeeded untuk message body dan queue. Nama struct Go tetap PaymentSucceededPayload karena idiom Go lebih mudah dibaca dengan PascalCase untuk exported type. Keempat payload di atas memetakan langsung ke empat event di katalog (OrderCreated, PaymentSucceeded, StockReserved, NotificationRequested), jadi tiap event punya bentuk data yang jelas, bukan kantong serbaguna.
Di JS payload sering object polos dan di PHP sering array asosiatif, keduanya tanpa pengecekan saat compile. Di Go, Payload di-bungkus json.RawMessage di envelope (decode ditunda sampai handler yang tepat), lalu di-unmarshal ke *StockReservedPayload atau *PaymentSucceededPayload yang typed. Hasilnya: salah ketik field ketahuan saat compile, dan evolusi schema lebih aman karena kontraknya eksplisit, bukan map[string]any liar.
OrderCreated dan PaymentSucceeded kita pakai end to end di sepanjang modul. StockReserved dan NotificationRequested punya payload dan akan dipakai di handler inventory dan notification (Section 10), serta diperdalam di Chapter 5 saat membahas konsistensi inventory. Mendefinisikan payload-nya sekarang menjaga katalog tetap utuh.
Wrapper metadata umum event, misalnya id, type, aggregate_id, occurred_at, dan payload.
Entity domain utama yang menjadi pusat perubahan, misalnya order, payment, stock reservation, atau notification request.
Event adalah kontrak antar bagian sistem. Bila payload berisi semua kolom internal, setiap perubahan schema bisa memecahkan consumer.
Katalog Event Skincare Shop
Event catalog adalah daftar event resmi yang boleh dipublish oleh domain. Ini mirip API contract, tetapi untuk komunikasi asynchronous.
OrderCreated
Diterbitkan setelah order valid tersimpan. Dipakai untuk email konfirmasi awal, analytics, dan proses payment instruction.
PaymentSucceeded
Diterbitkan setelah webhook payment valid dan idempotent. Dipakai untuk update order, inventory, email, dan shipping.
StockReserved
Diterbitkan setelah stok berhasil direservasi untuk order. Dipakai untuk audit inventory dan notifikasi stok rendah.
NotificationRequested
Diterbitkan ketika sistem ingin mengirim email, WhatsApp, atau push notification melalui worker terpisah.
Event tidak selalu berarti microservice. Di modular monolith Go, event tetap berguna untuk memisahkan module order, payment, inventory, dan notification tanpa memanggil semuanya secara langsung dari handler checkout.
- cmd/
- api/
- main.go HTTP API
- worker/
- main.go SQS consumer
- internal/
- events/
- event.go envelope dan payload event
- publisher.go interface publisher
- sqs_publisher.go implementasi AWS SQS
- order/
- service.go create order + outbox event
- payment/
- webhook_handler.go validasi payment webhook
- event_handler.go handle payment.succeeded
- inventory/
- event_handler.go handle stock update
- notification/
- event_handler.go kirim email atau WhatsApp
- outbox/
- dispatcher.go publish event tersimpan ke SQS
/v1/checkout Membuat order dan event `order.created` /v1/webhooks/payment Memvalidasi webhook dan membuat event `payment.succeeded` OrderCreated berarti order sudah terjadi. CreateOrder adalah perintah untuk melakukan sesuatu. Jangan campur keduanya.
Publish Event ke SQS
Publisher adalah boundary. Service domain cukup tahu ada Publisher, bukan tahu detail AWS SDK.
Pertama, buat interface kecil. Pola ini konsisten dengan gaya Go: dependency menerima interface yang dibutuhkan, sedangkan implementasi konkret berada di package infrastructure.
internal/events/publisher.gopackage events import "context" type Publisher interface { Publish(ctx context.Context, event Event) error }
Implementasi SQS memakai AWS SDK for Go V2. Message body berisi envelope event. Kita juga melampirkan message attributes event_type dan aggregate_id. Catatan penting: attributes ini hanya berguna bila consumer secara eksplisit memintanya saat receive (lihat Section 05). Tanpa itu, SQS tidak mengembalikan attributes dan publisher sia-sia melampirkannya.
internal/events/sqs_publisher.gopackage events import ( "context" "encoding/json" "fmt" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" ) type SQSPublisher struct { client *sqs.Client queueURL string } func NewSQSPublisher(client *sqs.Client, queueURL string) *SQSPublisher { return &SQSPublisher{client: client, queueURL: queueURL} } func (p *SQSPublisher) Publish(ctx context.Context, event Event) error { body, err := json.Marshal(event) if err != nil { return fmt.Errorf("marshal event %s: %w", event.Type, err) } _, err = p.client.SendMessage(ctx, &sqs.SendMessageInput{ QueueUrl: aws.String(p.queueURL), MessageBody: aws.String(string(body)), MessageAttributes: map[string]types.MessageAttributeValue{ "event_type": { DataType: aws.String("String"), StringValue: aws.String(string(event.Type)), }, "aggregate_id": { DataType: aws.String("String"), StringValue: aws.String(event.AggregateID), }, }, }) if err != nil { return fmt.Errorf("send event %s to sqs: %w", event.Type, err) } return nil }
Konfigurasi client SQS biasanya dilakukan di cmd/api/main.go atau wiring layer. Jangan membuat client SQS baru setiap publish, karena client didesain untuk dipakai ulang.
cmd/api/main.gopackage main import ( "context" "log" "os" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/kamu/skincare-backend/internal/events" ) func main() { ctx := context.Background() awsConfig, err := config.LoadDefaultConfig(ctx) if err != nil { log.Fatalf("load aws config: %v", err) } sqsClient := sqs.NewFromConfig(awsConfig) publisher := events.NewSQSPublisher(sqsClient, os.Getenv("ORDER_EVENTS_QUEUE_URL")) _ = publisher }
Bedanya, di Go kamu menulis interface dan implementasi queue secara eksplisit. Tidak ada facade global, sehingga dependency lebih mudah dites.
Subscriber Worker
Subscriber adalah proses worker yang hidup terpisah dari API. Ia melakukan long polling ke SQS, memproses event, lalu delete message hanya setelah sukses.
SQS bekerja dengan model receive dan delete. Saat message diterima, message menjadi invisible selama visibility timeout. Bila worker sukses, worker menghapus message. Bila worker gagal atau mati sebelum delete, message akan muncul lagi setelah timeout dan bisa diproses ulang.
stateDiagram-v2 [*] --> Visible: SendMessage Visible --> InFlight: ReceiveMessage (mulai visibility timeout) InFlight --> Deleted: handler sukses, DeleteMessage InFlight --> Visible: timeout habis / worker crash (redeliver) Visible --> DLQ: receive count > maxReceiveCount Deleted --> [*] DLQ --> [*]
Gambar 1. Siklus hidup satu message SQS. Selama InFlight (sudah di-receive, belum di-delete) message tidak terlihat consumer lain. Kalau handler tidak sempat delete sebelum visibility timeout habis, message kembali Visible dan dikirim ulang. Setelah gagal melebihi maxReceiveCount, ia pindah ke DLQ.
internal/events/handler.gopackage events import "context" type Handler interface { Handle(ctx context.Context, event Event) error } type HandlerFunc func(ctx context.Context, event Event) error func (fn HandlerFunc) Handle(ctx context.Context, event Event) error { return fn(ctx, event) }
Dua hal penting di worker. Pertama, kalau publisher melampirkan message attributes, consumer wajib meminta MessageAttributeNames saat receive, kalau tidak SQS tidak mengembalikannya. Kedua, kita bedakan error yang retriable (downstream 5xx, timeout jaringan) dari error non-retriable (JSON rusak, tipe event tak dikenal). Hanya error retriable yang dibiarkan kembali ke queue untuk dikirim ulang. Error non-retriable adalah poison message: percuma diulang ribuan kali, lebih baik segera dipindahkan agar tidak menyumbat queue.
internal/events/sqs_worker.gopackage events import ( "context" "encoding/json" "errors" "fmt" "log/slog" "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" ) // ErrNonRetriable menandai message yang tidak boleh dikirim ulang // (JSON rusak, tipe event tak dikenal). Message seperti ini langsung di-delete. var ErrNonRetriable = errors.New("non-retriable message") var ErrNoHandler = fmt.Errorf("%w: no handler registered", ErrNonRetriable) type SQSWorker struct { client *sqs.Client queueURL string handlers map[Type]Handler log *slog.Logger } func NewSQSWorker(client *sqs.Client, queueURL string, handlers map[Type]Handler, log *slog.Logger) *SQSWorker { return &SQSWorker{client: client, queueURL: queueURL, handlers: handlers, log: log} } func (w *SQSWorker) Run(ctx context.Context) error { for { if err := ctx.Err(); err != nil { return err } output, err := w.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ QueueUrl: aws.String(w.queueURL), MaxNumberOfMessages: 10, WaitTimeSeconds: 20, // long polling, maksimum 20 detik VisibilityTimeout: 60, MessageAttributeNames: []string{"All"}, // tanpa ini, attributes publisher hilang }) if err != nil { w.log.Error("receive sqs messages", slog.Any("error", err)) time.Sleep(2 * time.Second) continue } for _, msg := range output.Messages { err := w.handleMessage(ctx, msg) if err != nil && !errors.Is(err, ErrNonRetriable) { // Error retriable: JANGAN delete. Biarkan visibility timeout habis // agar SQS mengirim ulang, lalu eskalasi ke DLQ via maxReceiveCount. w.log.Warn("retriable handler failure, leaving message for redelivery", slog.String("message_id", aws.ToString(msg.MessageId)), slog.Any("error", err)) continue } if err != nil { // Error non-retriable (poison): delete agar tidak diulang selamanya. w.log.Error("poison message, deleting", slog.String("message_id", aws.ToString(msg.MessageId)), slog.Any("error", err)) } if _, delErr := w.client.DeleteMessage(ctx, &sqs.DeleteMessageInput{ QueueUrl: aws.String(w.queueURL), ReceiptHandle: msg.ReceiptHandle, }); delErr != nil { w.log.Error("delete sqs message", slog.Any("error", delErr)) } } } } func (w *SQSWorker) handleMessage(ctx context.Context, msg types.Message) error { var event Event if err := json.Unmarshal([]byte(aws.ToString(msg.Body)), &event); err != nil { return fmt.Errorf("%w: decode event: %v", ErrNonRetriable, err) } // Attributes hanya terisi karena ReceiveMessage meminta MessageAttributeNames. log := w.log.With( slog.String("event_id", event.ID), slog.String("event_type", string(event.Type)), slog.String("aggregate_id", event.AggregateID), ) handler, ok := w.handlers[event.Type] if !ok { return fmt.Errorf("%w: %s", ErrNoHandler, event.Type) } if err := handler.Handle(ctx, event); err != nil { log.Error("handle event failed", slog.Any("error", err)) return fmt.Errorf("handle event %s: %w", event.Type, err) } log.Info("event handled") return nil }
Handler worker wajib idempotent. Jangan asumsikan satu event pasti hanya diproses satu kali. Cara menutup celah ini ada di Section 08 (inbox).
Di AWS SDK Go v2, kebanyakan field opsional bertipe pointer (QueueUrl *string, makanya pakai aws.String). Tapi ketiga field ini bertipe int32 nilai, jadi literal 10, 20, 60 bisa langsung ditulis. Kalau nilainya kamu simpan di variabel, variabel itu harus bertipe int32, bukan int. Long polling WaitTimeSeconds maksimum 20 detik.
Worker di atas memproses message satu per satu. Karena MaxNumberOfMessages bisa sampai 10, satu batch berisi hingga 10 message. Di produksi, kamu bisa memproses batch itu paralel dengan errgroup atau worker pool kecil agar throughput naik, asalkan tiap message tetap punya ReceiptHandle sendiri saat delete. Untuk modul ini sekuensial lebih mudah dibaca.
Kalau kamu kenal BullMQ (Node) atau queue:work (Laravel), polanya mirip: message = job, worker = consumer, maxReceiveCount = attempts, visibility timeout = lock/timeout job, dan DLQ = failed_jobs di Laravel. Bedanya di Go semua loop receive/handle/delete kamu tulis eksplisit, tidak ada framework yang menyembunyikannya.
cmd/worker/main.gopackage main import ( "context" "errors" "log/slog" "os" "os/signal" "syscall" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/kamu/skincare-backend/internal/events" ) func main() { logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() awsConfig, err := config.LoadDefaultConfig(ctx) if err != nil { logger.Error("load aws config", slog.Any("error", err)) os.Exit(1) } sqsClient := sqs.NewFromConfig(awsConfig) handlers := buildHandlers(logger) worker := events.NewSQSWorker(sqsClient, os.Getenv("ORDER_EVENTS_QUEUE_URL"), handlers, logger) if err := worker.Run(ctx); err != nil && !errors.Is(err, context.Canceled) { logger.Error("run worker", slog.Any("error", err)) os.Exit(1) } } func buildHandlers(logger *slog.Logger) map[events.Type]events.Handler { return map[events.Type]events.Handler{ events.OrderCreated: events.HandlerFunc(func(ctx context.Context, event events.Event) error { logger.Info("order created", slog.String("aggregate_id", event.AggregateID)) return nil }), events.PaymentSucceeded: events.HandlerFunc(func(ctx context.Context, event events.Event) error { logger.Info("payment succeeded", slog.String("aggregate_id", event.AggregateID)) return nil }), } }
log/slog (standar sejak Go 1.21) menulis log sebagai key-value, mirip pino di Node atau Monolog dengan context di Laravel, bukan string mentah seperti console.log. Field seperti event_id dan event_type jadi bisa di-query di CloudWatch atau Loki. Jebakan “Tidak ada observability” di Section 12 justru ditutup dengan kebiasaan ini.
Alur Order Service ke Worker
SQS membantu request utama selesai cepat, tetapi penting memahami bahwa satu queue bukan broadcast ke semua consumer.
flowchart LR
Client["Frontend React"] -->|POST /v1/checkout| API["Go API"]
API --> OrderSvc["Order Service"]
OrderSvc --> DB[("PostgreSQL")]
OrderSvc --> SQS["SQS order-events"]
SQS --> W1["Worker replica 1"]
SQS --> W2["Worker replica 2"]
SQS --> W3["Worker replica 3"]
W1 --> Email["Email Provider"]
W2 --> Inventory["Inventory Module"]
W3 --> Analytics["Analytics"]Gambar 2. Worker di satu SQS queue adalah consumer pool. Mereka membagi pekerjaan, bukan masing-masing menerima salinan event yang sama.
Untuk fanout sungguhan, gunakan EventBridge atau SNS dengan beberapa SQS queue. Satu queue cocok untuk banyak replica dari worker yang sama. Banyak queue cocok untuk banyak tipe subscriber yang masing-masing harus menerima salinan event.
- Beberapa worker bersaing mengambil message.
- Cocok untuk scale-out satu jenis pekerjaan.
- Tidak cocok bila email, inventory, dan shipping semuanya wajib menerima event yang sama.
- Event dicocokkan rule lalu dikirim ke target berbeda.
- Cocok untuk fanout
payment.succeededke beberapa subscriber. - Setiap subscriber punya queue sendiri dan retry policy sendiri.
Mulai dengan satu queue untuk satu kategori pekerjaan. Tambahkan EventBridge ketika fanout dan routing event mulai nyata, bukan sejak hari pertama.
Outbox Pattern
Masalah klasik event-driven adalah dual write: transaksi database sukses, tetapi publish ke SQS gagal, atau sebaliknya.
Tanpa outbox, service order biasanya melakukan dua hal berurutan: insert order ke PostgreSQL, lalu publish event ke SQS. Bila insert order sukses tetapi publish gagal, downstream tidak pernah tahu ada order baru. Bila publish sukses tetapi commit database gagal, downstream menerima event untuk order yang tidak ada.
flowchart TB
subgraph A["a. Insert dulu, publish belakangan"]
A1["INSERT order OK"] --> A2["publish SQS GAGAL"]
A2 --> A3["Event hilang: downstream tak tahu"]
end
subgraph B["b. Publish dulu, commit belakangan"]
B1["publish SQS OK"] --> B2["COMMIT db GAGAL"]
B2 --> B3["Phantom event: order tak ada"]
end
subgraph C["c. Outbox: satu transaksi"]
C1["BEGIN"] --> C2["INSERT order"]
C2 --> C3["INSERT outbox_events"]
C3 --> C4["COMMIT"]
C4 --> C5["Dispatcher publish belakangan"]
endGambar 3. Dua kelas bug dual write. Outbox menyatukan perubahan domain dan event ke dalam satu transaksi, sehingga keduanya commit bersama atau gagal bersama. Publish ke broker baru terjadi setelah commit, lewat dispatcher terpisah.
Pola yang menyimpan event ke tabel database dalam transaksi yang sama dengan perubahan domain, lalu proses terpisah menerbitkan event itu ke broker.
migrations/202606060904_create_outbox_events.sqlCREATE EXTENSION IF NOT EXISTS pgcrypto; CREATE TABLE outbox_events ( id uuid PRIMARY KEY DEFAULT gen_random_uuid(), event_type text NOT NULL, aggregate_type text NOT NULL, aggregate_id uuid NOT NULL, payload jsonb NOT NULL, occurred_at timestamptz NOT NULL DEFAULT now(), published_at timestamptz, attempts integer NOT NULL DEFAULT 0, next_attempt_at timestamptz NOT NULL DEFAULT now(), last_error text ); CREATE INDEX idx_outbox_events_ready ON outbox_events (next_attempt_at, occurred_at) WHERE published_at IS NULL;
Sekarang order dan event disimpan dalam satu transaksi PostgreSQL. Request checkout tidak perlu publish langsung ke SQS.
internal/order/service.gopackage order import ( "context" "encoding/json" "fmt" "github.com/jackc/pgx/v5/pgxpool" "github.com/kamu/skincare-backend/internal/events" ) type Service struct { db *pgxpool.Pool } func NewService(db *pgxpool.Pool) *Service { return &Service{db: db} } type CreateOrderInput struct { CustomerID string Total int64 Currency string } type Order struct { ID string CustomerID string Total int64 Currency string } func (s *Service) CreateOrder(ctx context.Context, input CreateOrderInput) (Order, error) { // TxOptions kosong, jadi Begin lebih idiomatik daripada BeginTx(ctx, pgx.TxOptions{}). tx, err := s.db.Begin(ctx) if err != nil { return Order{}, fmt.Errorf("begin create order tx: %w", err) } defer tx.Rollback(ctx) var order Order err = tx.QueryRow(ctx, ` INSERT INTO orders (customer_id, total, currency, status, created_at) VALUES ($1, $2, $3, 'pending_payment', now()) RETURNING id, customer_id, total, currency `, input.CustomerID, input.Total, input.Currency).Scan(&order.ID, &order.CustomerID, &order.Total, &order.Currency) if err != nil { return Order{}, fmt.Errorf("insert order: %w", err) } payload, err := json.Marshal(events.OrderCreatedPayload{ OrderID: order.ID, CustomerID: order.CustomerID, Total: order.Total, Currency: order.Currency, }) if err != nil { return Order{}, fmt.Errorf("marshal order.created payload: %w", err) } // occurred_at memakai DEFAULT now() dari kolom: satu sumber waktu (DB), // jadi tidak ada dua jam yang bersaing antara Go dan PostgreSQL. _, err = tx.Exec(ctx, ` INSERT INTO outbox_events (event_type, aggregate_type, aggregate_id, payload) VALUES ($1, 'order', $2, $3) `, events.OrderCreated, order.ID, payload) if err != nil { return Order{}, fmt.Errorf("insert outbox event: %w", err) } if err := tx.Commit(ctx); err != nil { return Order{}, fmt.Errorf("commit create order tx: %w", err) } return order, nil }
Dispatcher berjalan di worker terpisah. Ia mengambil event yang belum dipublish, mengirim ke SQS, lalu menandai published_at.
internal/outbox/dispatcher.gopackage outbox import ( "context" "encoding/json" "fmt" "time" "github.com/jackc/pgx/v5/pgxpool" "github.com/kamu/skincare-backend/internal/events" ) type Dispatcher struct { db *pgxpool.Pool publisher events.Publisher } func NewDispatcher(db *pgxpool.Pool, publisher events.Publisher) *Dispatcher { return &Dispatcher{db: db, publisher: publisher} } type rowEvent struct { ID string EventType events.Type AggregateType string AggregateID string Payload json.RawMessage OccurredAt time.Time } func (d *Dispatcher) DispatchOnce(ctx context.Context, limit int) error { tx, err := d.db.Begin(ctx) if err != nil { return fmt.Errorf("begin outbox tx: %w", err) } defer tx.Rollback(ctx) rows, err := tx.Query(ctx, ` SELECT id, event_type, aggregate_type, aggregate_id, payload, occurred_at FROM outbox_events WHERE published_at IS NULL AND next_attempt_at <= now() ORDER BY occurred_at LIMIT $1 FOR UPDATE SKIP LOCKED `, limit) if err != nil { return fmt.Errorf("select outbox events: %w", err) } defer rows.Close() for rows.Next() { var item rowEvent if err := rows.Scan(&item.ID, &item.EventType, &item.AggregateType, &item.AggregateID, &item.Payload, &item.OccurredAt); err != nil { return fmt.Errorf("scan outbox event: %w", err) } event := events.Event{ ID: item.ID, Type: item.EventType, AggregateType: item.AggregateType, AggregateID: item.AggregateID, OccurredAt: item.OccurredAt, Payload: item.Payload, } if err := d.publisher.Publish(ctx, event); err != nil { _, updateErr := tx.Exec(ctx, ` UPDATE outbox_events SET attempts = attempts + 1, next_attempt_at = now() + interval '30 seconds', last_error = $2 WHERE id = $1 `, item.ID, err.Error()) if updateErr != nil { return fmt.Errorf("record publish failure: %w", updateErr) } continue } _, err = tx.Exec(ctx, ` UPDATE outbox_events SET published_at = now(), last_error = NULL WHERE id = $1 `, item.ID) if err != nil { return fmt.Errorf("mark outbox event published: %w", err) } } if err := rows.Err(); err != nil { return fmt.Errorf("iterate outbox events: %w", err) } if err := tx.Commit(ctx); err != nil { return fmt.Errorf("commit outbox tx: %w", err) } return nil }
Bila publish sukses tetapi update published_at gagal, dispatcher bisa mengirim event yang sama lagi. Consumer tetap harus aman terhadap duplikasi. Inilah pasangan wajib outbox, dibahas di Section 08.
Saat insert, kolom id tidak diisi dari Go; ia diisi gen_random_uuid() oleh PostgreSQL. Dispatcher membaca id itu kembali ke Event.ID. Karena nilainya stabil dan ikut terbawa ke message, Event.ID inilah kunci yang dipakai consumer untuk dedup di tabel inbox. Satu event, satu UUID, dari outbox sampai inbox.
FOR UPDATE SKIP LOCKED mengunci baris yang sedang diproses. Di kode di atas, publish() (panggilan jaringan ke SQS) terjadi sambil baris masih terkunci dan transaksi terbuka. Untuk volume kecil ini aman dan mudah dibaca. Saat throughput naik, batasi limit, atau publish dulu lalu update status agar jendela kunci tidak ikut menunggu latency broker. Lock-hold-time yang panjang menghambat dispatcher lain.
Idempotency dan Inbox
Outbox menjamin event tidak hilang. Inbox menjamin event tidak diproses dua kali. Keduanya pasangan, bukan pilihan.
SQS Standard adalah at-least-once: satu event bisa sampai lebih dari sekali, entah karena dispatcher mengirim ulang (publish sukses tapi published_at gagal di-update), karena worker crash sebelum delete, atau karena SQS sendiri sesekali menduplikasi. Karena itu, “consumer wajib idempotent” yang kita ulang sejak Section 05 harus benar-benar diwujudkan, bukan sekadar diharapkan.
Operasi yang aman diulang: menjalankannya dua kali dengan input sama memberi hasil akhir yang sama dengan menjalankannya sekali.
Tabel di sisi consumer yang mencatat event_id yang sudah diproses dengan unique constraint, dicek di dalam transaksi yang sama dengan side effect, sehingga duplikat ditolak diam-diam.
Idenya sederhana: sebelum mengerjakan side effect, coba insert event_id ke tabel processed_events. Bila berhasil, ini event baru, kerjakan side effect di transaksi yang sama lalu commit. Bila kena unique violation, event ini sudah pernah diproses, lewati saja dan hapus message dari queue.
migrations/202606061015_create_processed_events.sqlCREATE TABLE processed_events ( event_id uuid PRIMARY KEY, handler text NOT NULL, processed_at timestamptz NOT NULL DEFAULT now() );
Kunci primernya adalah event_id, yaitu UUID baris outbox yang dibawa di Event.ID. Bila satu event punya beberapa handler (inventory dan notification), tambahkan handler ke primary key agar tiap handler punya catatan dedup sendiri.
migrations (varian: dedup per handler)ALTER TABLE processed_events DROP CONSTRAINT processed_events_pkey; ALTER TABLE processed_events ADD PRIMARY KEY (event_id, handler);
flowchart TD
Recv["Event tiba (event.ID dari outbox)"] --> Begin["BEGIN tx"]
Begin --> Ins["INSERT event_id ke processed_events"]
Ins --> Conflict{"unique conflict?"}
Conflict -->|ya, sudah diproses| Skip["ROLLBACK, DeleteMessage (skip)"]
Conflict -->|tidak| Work["jalankan side effect di tx yang sama"]
Work --> Commit["COMMIT"]
Commit --> Del["DeleteMessage"]Gambar 4. Alur inbox dedup. Insert event_id dan side effect berada di satu transaksi, jadi keduanya sukses bersama atau gagal bersama. Inilah yang mengubah at-least-once menjadi effectively-once di sisi efek.
Berikut helper yang membungkus pola ini. Handler cukup memanggilnya, lalu menaruh side effect di dalam callback. Kuncinya INSERT ... ON CONFLICT DO NOTHING: kalau baris penanda sudah ada, Exec melaporkan RowsAffected() == 0, dan itulah sinyal duplikat. Tidak perlu query kedua.
internal/events/inbox.gopackage events import ( "context" "fmt" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" ) // ProcessOnce menjalankan fn tepat sekali untuk pasangan (eventID, handler). // Insert penanda dan side effect berada di transaksi yang sama, sehingga // keduanya commit bersama atau gagal bersama. func ProcessOnce(ctx context.Context, db *pgxpool.Pool, eventID, handler string, fn func(pgx.Tx) error) error { tx, err := db.Begin(ctx) if err != nil { return fmt.Errorf("begin inbox tx: %w", err) } defer tx.Rollback(ctx) tag, err := tx.Exec(ctx, ` INSERT INTO processed_events (event_id, handler) VALUES ($1, $2) ON CONFLICT (event_id, handler) DO NOTHING `, eventID, handler) if err != nil { return fmt.Errorf("claim event %s: %w", eventID, err) } if tag.RowsAffected() == 0 { // Penanda sudah ada: event ini sudah diproses handler ini. // Commit kosong, biarkan worker delete message tanpa mengulang side effect. return tx.Commit(ctx) } if err := fn(tx); err != nil { return fmt.Errorf("run handler side effect: %w", err) } return tx.Commit(ctx) }
Sekarang handler payment dari Section 10 cukup membungkus side effect-nya. Karena MarkPaid berjalan di tx yang sama dengan insert penanda, dua delivery untuk satu Event.ID hanya menghasilkan satu update order.
internal/payment/event_handler.go (dengan inbox)func (h *PaymentSucceededHandler) Handle(ctx context.Context, event events.Event) error { var payload events.PaymentSucceededPayload if err := json.Unmarshal(event.Payload, &payload); err != nil { return fmt.Errorf("%w: decode payment.succeeded: %v", events.ErrNonRetriable, err) } return events.ProcessOnce(ctx, h.db, event.ID, "payment.mark_paid", func(tx pgx.Tx) error { return h.orders.MarkPaidTx(ctx, tx, payload.OrderID, payload.PaymentID) }) }
Kalau penanda di-insert di transaksi terpisah dari side effect, kamu kembali ke dual write: bisa saja penanda commit tapi side effect gagal, lalu event yang sebenarnya belum selesai dianggap sudah selesai. Taruh keduanya dalam satu tx.
Di Laravel, ShouldBeUnique mencegah job kembar antri, tapi tidak menjamin efek hanya terjadi sekali bila job tetap lolos dua kali. Inbox di sisi consumer adalah jaminan yang lebih kuat: keputusan “sudah pernah?” diambil di dalam transaksi database yang sama dengan efeknya, bukan di lapisan antrian.
Routing dengan EventBridge
SQS adalah queue. EventBridge adalah router event. Gunakan sesuai masalahnya.
Jika hanya ada satu worker untuk memproses order.created, SQS cukup. Jika payment.succeeded harus diterima oleh inventory, email, shipping, dan analytics secara terpisah, EventBridge membuat routing lebih bersih. Order service publish ke EventBridge, lalu rule mengirim event ke queue masing-masing subscriber.
flowchart LR PaymentSvc["Payment Module"] -->|payment.succeeded| EB["EventBridge event bus"] EB -->|rule: inventory| Q1["SQS inventory-events"] EB -->|rule: notification| Q2["SQS notification-events"] EB -->|rule: shipping| Q3["SQS shipping-events"] EB -->|rule: analytics| Q4["SQS analytics-events"] Q1 --> W1["Inventory Worker"] Q2 --> W2["Notification Worker"] Q3 --> W3["Shipping Worker"] Q4 --> W4["Analytics Worker"]
Gambar 5. EventBridge cocok saat satu event perlu dirutekan ke banyak target tanpa membuat publisher tahu semua subscriber.
SQS saja
Cocok untuk satu kategori pekerjaan dengan banyak replica worker, misalnya notification-events.
EventBridge + SQS
Cocok untuk fanout dan routing event berdasarkan source, detail-type, atau isi payload.
Jangan terlalu cepat
Untuk monolith kecil, outbox + satu queue sudah cukup. EventBridge masuk saat kebutuhan routing nyata.
Contoh event untuk EventBridge biasanya memakai field source, detail-type, dan detail. Detail tetap bisa memakai payload event yang sama agar kontrak domain tidak bercabang terlalu jauh.
eventbridge-payment-succeeded.json{ "source": "skincare.payment", "detail-type": "payment.succeeded", "detail": { "event_id": "2fb6d62a-1d20-4c50-bc0d-18e2abf40b7a", "order_id": "9faad6d5-f191-4d62-97e4-574ffb506d39", "payment_id": "3da133b5-9b0b-4c4e-b801-a8a7e3c35c1a", "transaction_id": "MID-20260606-0001", "paid_amount": 299000, "paid_at": "2026-06-06T12:00:00Z" } }
Karena publisher hanyalah boundary (Section 04), kita bisa membuat implementasi kedua dari interface Publisher yang sama untuk EventBridge. Dispatcher outbox tidak peduli ujungnya SQS atau EventBridge, ia cukup memanggil Publish.
internal/events/eventbridge_publisher.gopackage events import ( "context" "encoding/json" "fmt" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/eventbridge" "github.com/aws/aws-sdk-go-v2/service/eventbridge/types" ) type EventBridgePublisher struct { client *eventbridge.Client busName string source string } func NewEventBridgePublisher(client *eventbridge.Client, busName, source string) *EventBridgePublisher { return &EventBridgePublisher{client: client, busName: busName, source: source} } func (p *EventBridgePublisher) Publish(ctx context.Context, event Event) error { // Detail WAJIB string JSON, bukan objek. Marshal dulu, lalu aws.String. detail, err := json.Marshal(event) if err != nil { return fmt.Errorf("marshal event %s: %w", event.Type, err) } _, err = p.client.PutEvents(ctx, &eventbridge.PutEventsInput{ Entries: []types.PutEventsRequestEntry{ { EventBusName: aws.String(p.busName), Source: aws.String(p.source), // mis. "skincare.payment" DetailType: aws.String(string(event.Type)), // mis. "payment.succeeded" Detail: aws.String(string(detail)), // string JSON, bukan map Resources: []string{event.AggregateID}, }, }, }) if err != nil { return fmt.Errorf("put event %s to eventbridge: %w", event.Type, err) } return nil }
Trap paling sering di EventBridge SDK: Detail bertipe *string, isinya harus string JSON yang sudah di-marshal (aws.String(string(detail))), bukan struct atau map. EventBridge memperlakukan Detail sebagai dokumen JSON mentah. Salah di sini menyebabkan rule pattern tidak pernah cocok karena bentuk yang dikirim bukan JSON yang valid.
EventEmitter di Node atau event bus di React itu in-process dan fire-and-forget: kalau tidak ada listener saat event terbit, event hilang tanpa jejak. SQS dan EventBridge adalah lintas proses dengan durabilitas at-least-once: event tersimpan sampai consumer berhasil. Justru karena lebih kuat, kontraknya harus serius, versioning, idempotency, dan observability wajib dipikirkan, bukan opsional seperti di frontend.
Contoh PaymentSucceeded
PaymentSucceeded adalah event paling penting di shop, karena uang sudah diterima dan beberapa perubahan bisnis harus terjadi.
Alur yang aman: webhook payment divalidasi, payment record dibuat idempotent, event payment.succeeded disimpan ke outbox, lalu worker subscriber memproses side effect sesuai tanggung jawabnya.
sequenceDiagram participant Gateway as Payment Gateway participant API as Go API participant DB as PostgreSQL participant Outbox as Outbox Dispatcher participant Bus as SQS/EventBridge participant Worker as Workers Gateway->>API: POST /v1/webhooks/payment API->>API: verify signature + replay protection API->>DB: BEGIN API->>DB: insert payment if transaction_id unique API->>DB: insert outbox payment.succeeded API->>DB: COMMIT API-->>Gateway: 200 OK Outbox->>DB: read unpublished events Outbox->>Bus: publish payment.succeeded Bus->>Worker: deliver event Worker->>DB: update order, inventory, notification, shipping
Gambar 6. Webhook tidak langsung melakukan semua side effect. Ia hanya membuat fakta bisnis yang bisa diproses ulang.
Handler payment menggabungkan dua hal yang sudah kita bangun: dedup inbox dari Section 08 dan side effect domain. MarkPaidTx menerima pgx.Tx agar update order berada di transaksi yang sama dengan penanda dedup, sehingga dua delivery hanya menghasilkan satu perubahan.
internal/payment/event_handler.gopackage payment import ( "context" "encoding/json" "fmt" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/kamu/skincare-backend/internal/events" ) type OrderRepository interface { MarkPaidTx(ctx context.Context, tx pgx.Tx, orderID, paymentID string) error } type PaymentSucceededHandler struct { db *pgxpool.Pool orders OrderRepository } func NewPaymentSucceededHandler(db *pgxpool.Pool, orders OrderRepository) *PaymentSucceededHandler { return &PaymentSucceededHandler{db: db, orders: orders} } func (h *PaymentSucceededHandler) Handle(ctx context.Context, event events.Event) error { var payload events.PaymentSucceededPayload if err := json.Unmarshal(event.Payload, &payload); err != nil { // JSON rusak tidak akan membaik dengan retry: tandai non-retriable. return fmt.Errorf("%w: decode payment.succeeded: %v", events.ErrNonRetriable, err) } return events.ProcessOnce(ctx, h.db, event.ID, "payment.mark_paid", func(tx pgx.Tx) error { return h.orders.MarkPaidTx(ctx, tx, payload.OrderID, payload.PaymentID) }) }
Untuk inventory, email, dan shipping, buat handler terpisah. Jangan masukkan semua side effect ke satu handler raksasa karena retry dan ownership menjadi sulit.
Ada dua gaya untuk notifikasi, dan keduanya sah. Gaya pertama: handler notification ikut mendengarkan payment.succeeded dan langsung mengirim email. Sederhana, cocok bila notifikasi hanya dipicu satu jenis event.
internal/notification/event_handler.gopackage notification import ( "context" "encoding/json" "fmt" "github.com/kamu/skincare-backend/internal/events" ) type EmailSender interface { SendPaymentSuccessEmail(ctx context.Context, orderID string) error } type PaymentSucceededHandler struct { email EmailSender } func NewPaymentSucceededHandler(email EmailSender) *PaymentSucceededHandler { return &PaymentSucceededHandler{email: email} } func (h *PaymentSucceededHandler) Handle(ctx context.Context, event events.Event) error { var payload events.PaymentSucceededPayload if err := json.Unmarshal(event.Payload, &payload); err != nil { return fmt.Errorf("%w: decode payment.succeeded: %v", events.ErrNonRetriable, err) } if err := h.email.SendPaymentSuccessEmail(ctx, payload.OrderID); err != nil { return fmt.Errorf("send payment success email: %w", err) } return nil }
Gaya kedua menjadikan NotificationRequested event kelas satu. Handler payment tidak tahu cara kirim email; ia hanya menerbitkan notification.requested ke outbox, lalu satu notification worker yang memutuskan channel dan template. Ini berguna saat banyak event berbeda (order dibuat, payment sukses, refund) semuanya berakhir sebagai notifikasi, sehingga logika kirim terpusat di satu tempat.
internal/notification/requested_handler.gofunc (h *RequestedHandler) Handle(ctx context.Context, event events.Event) error { var payload events.NotificationRequestedPayload if err := json.Unmarshal(event.Payload, &payload); err != nil { return fmt.Errorf("%w: decode notification.requested: %v", events.ErrNonRetriable, err) } switch payload.Channel { case "email": return h.email.Send(ctx, payload.Template, payload.CustomerID, payload.OrderID) case "whatsapp": return h.wa.Send(ctx, payload.Template, payload.CustomerID, payload.OrderID) default: // channel tak dikenal tidak akan membaik dengan retry. return fmt.Errorf("%w: unknown channel %q", events.ErrNonRetriable, payload.Channel) } }
Gaya pertama (dengar payment.succeeded langsung) lebih sedikit kode. Gaya kedua (terbitkan notification.requested) memusatkan kebijakan notifikasi dan memungkinkan banyak sumber memicu notifikasi yang sama. Mulai dari gaya pertama, naik ke kedua saat jenis pemicu notifikasi bertambah.
Untuk amount final, order total, dan ownership customer, worker boleh mengambil ulang dari database sendiri. Event cukup membawa identifier dan fakta minimal.
Hands-on Ringan
Hands-on ini membuat satu jalur minimum: create order, simpan outbox, publish ke SQS, worker membaca event, lalu dedup dengan inbox.
Tambahkan internal/events/event.go, publisher.go, dan sqs_publisher.go.
Jalankan migration outbox_events dan processed_events di database lokal dan CI.
CreateOrder menyimpan event order.created di transaksi yang sama dengan insert order.
Worker memanggil DispatchOnce berkala untuk publish event yang belum terkirim.
Worker lain membaca queue, dispatch event ke handler sesuai event.Type, lalu delete message setelah sukses atau saat poison.
Handler memanggil ProcessOnce(event.ID, ...) agar side effect aman terhadap duplikat.
Terminalgo get github.com/aws/aws-sdk-go-v2/config@latest go get github.com/aws/aws-sdk-go-v2/service/sqs@latest go get github.com/aws/aws-sdk-go-v2/service/eventbridge@latest go get github.com/jackc/pgx/v5@latest go test ./...
Untuk local development, kamu bisa memakai LocalStack, ElasticMQ, atau antrian in-memory kecil untuk test unit. Jangan jadikan LocalStack syarat unit test, karena unit test harus tetap cepat. Pakai LocalStack untuk integration test worker dan pipeline CI terpisah.
internal/events/memory_publisher_test.gopackage events import "context" type MemoryPublisher struct { Events []Event } func (p *MemoryPublisher) Publish(ctx context.Context, event Event) error { p.Events = append(p.Events, event) return nil }
Unit test service cukup memastikan event masuk outbox. Integration test memastikan dispatcher benar-benar mengirim ke SQS compatible queue.
Jebakan Umum
Event-driven membuat sistem lebih scalable, tetapi juga menambah failure mode baru. Jangan hanya mengejar kata asynchronous.
Menganggap SQS sebagai broadcast
Satu queue dengan banyak consumer adalah work queue. Untuk semua subscriber menerima event yang sama, pakai beberapa queue melalui EventBridge atau SNS.
Tidak idempotent
SQS Standard bisa mengirim message lebih dari sekali. Implementasikan inbox (processed_events) yang dicek di dalam transaksi handler, jangan cuma berniat idempotent di komentar.
Publish sebelum commit
Consumer bisa membaca event sebelum data order benar-benar committed. Outbox menghindari kelas bug ini.
Payload terlalu besar
Event bukan snapshot seluruh database. Simpan data minimal dan ambil ulang detail sensitif dari database bila perlu.
Handler terlalu besar
Satu handler yang melakukan email, inventory, shipping, dan analytics membuat retry tidak presisi. Pisahkan ownership side effect.
Tidak ada observability
Log event_id, event_type, aggregate_id, attempts, dan error dengan slog. Tanpa ini, debugging async flow akan melelahkan.
FIFO vs Standard
Standard queue adalah default: throughput tinggi, at-least-once, urutan best-effort, dan kadang duplikat. FIFO queue menjaga urutan dan exactly-once processing, tetapi exactly-once-nya hanya berlaku di dalam jendela deduplikasi 5 menit, bukan jaminan global selamanya. Pilih berdasarkan apakah urutan per-aggregate penting.
- Throughput sangat tinggi, urutan best-effort.
- At-least-once, bisa duplikat, jadi inbox tetap wajib.
- Cocok untuk side effect yang tidak peduli urutan, mis. kirim email.
MessageGroupIdmenjaga urutan per grup, mis.order_id.MessageDeduplicationIdmenolak duplikat dalam jendela 5 menit.- Cocok saat transisi status order harus berurutan (pending, paid, shipped).
Pakai FIFO hanya saat urutan per-aggregate benar-benar penting, dan jadikan MessageGroupId = order_id agar order berbeda tetap diproses paralel sementara satu order tetap berurutan. Untuk sebagian besar side effect skincare shop, Standard plus inbox sudah cukup.
Visibility timeout dan dua lapis retry
VisibilityTimeout: 60 di worker harus lebih panjang dari durasi terburuk handler. Kalau handler memerlukan 80 detik tapi visibility cuma 60, SQS mengira worker mati dan mengirim ulang message yang sebenarnya masih diproses, menghasilkan eksekusi ganda. Untuk handler yang kadang lama, perpanjang VisibilityTimeout, atau panggil ChangeMessageVisibility untuk memperpanjang sambil bekerja.
flowchart LR
subgraph L1["Lapis 1: retry outbox relay"]
OB["outbox_events"] -->|next_attempt_at backoff| DSP["Dispatcher"]
DSP -->|publish gagal| OB
end
DSP -->|SendMessage| SQS["SQS queue"]
subgraph L2["Lapis 2: redelivery SQS"]
SQS -->|ReceiveMessage| WK["Worker handler"]
WK -->|gagal, visibility timeout habis| SQS
end
SQS -->|receiveCount > maxReceiveCount| DLQ["Dead-letter queue"]Gambar 7. Ada dua loop retry yang sering tertukar. Lapis 1 adalah dispatcher outbox mengirim ulang ke SQS pakai next_attempt_at. Lapis 2 adalah SQS mengirim ulang ke worker lewat visibility timeout. Keduanya independen; DLQ hanya menampung kegagalan lapis 2.
DLQ dipasang lewat RedrivePolicy di source queue dengan maxReceiveCount, misalnya 5. Setelah message gagal di-receive lebih dari itu, SQS memindahkannya ke DLQ. Setel retention DLQ lebih panjang dari source agar ada waktu investigasi, dan ingat source FIFO butuh DLQ FIFO juga. DLQ bukan tempat sampah, melainkan sinyal ada bug atau data yang perlu dilihat manusia.
Untuk perubahan yang harus atomic dalam satu database, tetap gunakan transaksi PostgreSQL. Event dipakai untuk side effect dan integrasi antar boundary.
Peta padanan dari Laravel
| Laravel | Modul ini (Go + SQS) |
|---|---|
| Event + Listener | events.Event + Handler |
dispatch() / ShouldQueue | SendMessage ke SQS |
queue:work / Horizon | loop consumer di cmd/worker |
failed_jobs | Dead-letter queue (DLQ) |
attempts / backoff | maxReceiveCount + visibility timeout |
ShouldBeUnique | inbox processed_events |
Di Laravel banyak orang memakai DB::afterCommit atau $event->afterCommit agar job baru di-enqueue setelah commit. Itu memperkecil celah, tapi tidak menutupnya: kalau proses mati setelah commit tapi sebelum enqueue, job hilang. Outbox menutup celah ini karena event ikut ter-commit di transaksi yang sama, dan pengiriman ke broker dijamin oleh dispatcher yang membaca dari tabel, bukan dari memori proses.
Ringkasan & Poin Penting
Event-driven architecture membuat backend skincare lebih responsif dan siap scaling, asalkan event diperlakukan sebagai kontrak bisnis yang serius.
Yang Wajib Menempel
- Berpindah ke event berarti handler checkout cukup mencatat fakta (order dibuat), lalu side effect bereaksi di luar request, dan ini hanya untuk pekerjaan yang boleh telat.
- Domain event adalah fakta bisnis lampau dengan payload typed, misalnya
OrderCreated,PaymentSucceeded,StockReserved, danNotificationRequested. - Publisher adalah boundary: implementasi SQS dan EventBridge memenuhi interface yang sama, dan
DetailEventBridge wajib string JSON. - Worker melakukan long polling, membedakan error retriable dari poison, lalu delete message hanya saat sukses atau saat poison.
- Outbox menutup dual write antara PostgreSQL dan broker; inbox (
processed_events) menutup duplikasi at-least-once di sisi consumer. - Ada dua lapis retry: relay outbox (
next_attempt_at) dan redelivery SQS (visibility timeout), dengan DLQ viamaxReceiveCount. - Standard plus inbox cukup untuk kebanyakan kasus; FIFO dengan
MessageGroupIddipakai saat urutan per-aggregate penting.
Untuk proyek online shop skincare, modul ini mengubah checkout dari flow sinkron yang rapuh menjadi flow yang lebih tahan gangguan. Order dibuat dan tersimpan lebih dulu, lalu event menggerakkan pekerjaan lanjutan. Langkah berikutnya di Roadmap 9 adalah melihat bagaimana pola scaling ini berhubungan dengan observability, retry policy, DLQ, dan keputusan kapan sebuah modular monolith perlu dipecah menjadi service terpisah.
Progress disimpan lokal di browser ini.