Skip to main content

2 posts tagged with "Go"

View All Tags

Golang에는 Set이 없어요...

· 2 min read

image

Go 언어는 기본적으로 set 데이터 타입을 제공하지 않습니다.

그래서 편리하게 사용하던 set을 사용하고 싶다면, 직접 구현해야 합니다!

막상 갑자기 직접 구현하려니 마음이 답답한데, Go에서 제공하는 Map 데이터 타입을 이용해 쉽게 구현할 수 있습니다.

Map은 키, 값을 한쌍으로 갖기 때문에, 키가 중복되면 안됩니다. 

이 원리를 이용하면 쉽게 구현됩니다.

map[type]struct 를 이용하여 구현하게 됩니다. struct 는 빈 구조체로서 메모리를 차지하지 않게 됩니다. 

그래서 map[type]struct 를 이용하면 값 없이 키만 저장할 때 사용하기에 제격입니다.

이 때, type에 대해 T comparable을 이용하면 여러 타입의 Set을 소화할 수 있도록 만들 수 있습니다.

아래는 구현한 내용입니다. 

package main

type Set[T comparable] map[T]struct{}

func NewSet[T comparable](values []T) Set[T] {
set := make(Set[T])
set.Add(values)
return set
}

func (s Set[T]) Add(values []T) {
for _, value := range values {
s[value] = struct{}{}
}
}

func (s Set[T]) Remove(value T) {
delete(s, value)
}

func (s Set[T]) Contains(value T) bool {
_, exists := s[value]
return exists
}

func (s Set[T]) Size() int {
return len(s)
}

func (s Set[T]) ToSlice() []T {
slice := make([]T, 0, len(s))
for value := range s {
slice = append(slice, value)
}
return slice
}

간단히 사용하려면 사실 설명을 보지 않고 바로 복붙해서 사용해도 됩니다.

NewSet은 slice(array)를 입력 받아 Add를 통해 Set 구조체에 넣어줍니다.

Add도 slice(array)를 입력 받아 해당 Set 구조체에 value를 키값으로 한 빈 구조체를 설정해 줍니다.

Remove는 키 값을 제거해 주고 Contains는 키 값이 존재하는지 확인해 true, false로 반환하고 Size는 키의 개수를 확인해 줍니다.

그리고 나중에 다시 slice(array)로 변경하고자 할 때는 ToSlice를 이용해 slice로 변환합니다.

아래는 실제 사용 예시입니다.

package main

func main() {
// 빈 Set 생성 후 슬라이스로 추가
s := NewSet([]int{1, 2, 3}) // 여러 값 추가

// 추가적인 값들을 슬라이스로 추가
moreValues := []int{4, 5, 6}
s.Add(moreValues) // 슬라이스를 전달하여 추가

fmt.Println(s.Contains(1)) // true
fmt.Println(s.Contains(6)) // true
fmt.Println(s.Contains(7)) // false

fmt.Println(s.Size()) // 6

s.Remove(1)
fmt.Println(s.Contains(1)) // false

// 현재 Set의 값을 배열로 변환하여 출력
fmt.Println(s.ToSlice()) // [2 3 4 5 6]
}

Set을 사용하는 것은 본인이 편리한 방식으로 커스텀 해서 구현해 사용해도 됩니다.

도움이 되었으면 좋겠네요. 감사합니다!

Go의 이벤트 중심 아키텍처

· 13 min read

RabbitMQ, 도메인 중심 설계 및 클린 아키텍처 사용

EDA(이벤트 중심 아키텍처)는 애플리케이션의 느슨한 결합, 확장성, 독립성 및 복원성을 활성화하기 위해 이벤트의 생성, 감지 및 소비를 강조하는 소프트웨어 디자인 패턴입니다.

image

이벤트란 무엇인가요?

주황색 포스트로 표시되는 이벤트는 과거 에 발생한 중요한 사실이며 일반적으로 UserCreated, OrderPaid, InvoiceCanceled 등과 같은 명령을 실행한 결과입니다.

명령이란 무엇입니까?

파란색 포스트로 표시되는 명령은 시스템의 작업 또는 의도이며 항상 명령형 모드입니다(예: CreateUser, PlaceOrder, PayOrder, GetProducts 등).

배우란 무엇인가?

노란색 포스트로 표시되는 행위자는 명령을 실행하는 사람입니다(예: 사용자, 고객, 판매자, 구매자, 시스템 등).

어디서부터 시작하나요?

이벤트 모델링 !!!

문제:

image

내가 제안한 솔루션:

image

직접 체험해 보세요

폴더 구조(여기서는 Clean Arch 및 DDD 레이어 개념을 사용하고 있습니다)

image

이벤트 만들기

내부/도메인/이벤트 폴더 에 이벤트 order_created_event.go를 생성합니다.

package event

type OrderCreatedEvent struct {
Id string
Items []OrderItem
TotalPrice float64
Status string
}

type OrderItem struct {
ProductName string
Quantity int
TotalPrice float64
}

내부/도메인/이벤트 폴더 에 이벤트 order_paid_event.go 생성

package event

import (
"time"
)

type OrderPaidEvent struct {
OrderId string
PaidValue float64
PaymentDate time.Time
}

엔터티 만들기

먼저 uuid 패키지를 사용하여 ID를 생성하고 셸에서 실행합니다.

go mod init eda-example
go get github.com/google/uuid

내부/도메인/엔티티 폴더 에 엔터티 order_item_entity.go를 생성합니다.

package entity

type OrderItemEntity struct {
productName string
productPrice float64
quantity int
}

func NewOrderItemEntity(productName string, productPrice float64, quantity int) *OrderItemEntity {
return &OrderItemEntity{
productName: productName,
productPrice: productPrice,
quantity: quantity,
}
}

// getters
func (o *OrderItemEntity) GetProductName() string {
return o.productName
}

func (o *OrderItemEntity) GetProductPrice() float64 {
return o.productPrice
}

func (o *OrderItemEntity) GetQuantity() int {
return o.quantity
}

func (o *OrderItemEntity) GetTotalPrice() float64 {
return o.productPrice * float64(o.quantity)
}

내부/도메인/엔티티 폴더 에 엔터티 order_entity.go를 생성합니다.

package entity

import (
"errors"
"github.com/google/uuid"
)

const (
OrderStatusPending = "pending"
OrderStatusPaid = "paid"
)

type OrderEntity struct {
id string
status string
items []*OrderItemEntity
totalPrice float64
paidValue float64
}

func NewOrderEntity() (*OrderEntity, error) {
return &OrderEntity{
id: uuid.New().String(),
status: OrderStatusPending,
}, nil
}

// to populate with an existing order
func RestoreOrderEntity(id, status string) (*OrderEntity, error) {
return &OrderEntity{
id: id,
status: status,
}, nil
}

func (o *OrderEntity) AddItem(item *OrderItemEntity) {
o.items = append(o.items, item)
o.totalPrice += item.GetTotalPrice()
}

func (o *OrderEntity) Pay(value float64) error {
if value < o.totalPrice {
return errors.New("value is less than the total price")
}
o.paidValue = value
o.status = OrderStatusPaid
return nil
}

// getters
func (o *OrderEntity) GetItems() []*OrderItemEntity {
return o.items
}

func (o *OrderEntity) GetTotalPrice() float64 {
return o.totalPrice
}

func (o *OrderEntity) GetID() string {
return o.id
}

func (o *OrderEntity) GetStatus() string {
return o.status
}

게시자 인터페이스 만들기

내부/도메인/대기열 폴더 에 인터페이스 publisher**.go를 생성합니다.**

package queue

import "context"

type Publisher interface {
Publish(ctx context.Context, body interface{}) error
}

DTO 생성

내부/응용 프로그램/dto 폴더 에 dto create_order_dto.go를 생성합니다.

package dto

type CreateOrderDTO struct {
Items []Item `json:"items"`
}

type Item struct {
ProductId string `json:"product_id"`
Qtd int `json:"qtd"`
}

명령(또는 사용 사례) 만들기

내부/애플리케이션/유스케이스 폴더 에 유스케이스 create_order_usecase.go 생성

package usecase

import (
"context"
"fmt"
"eda-example/internal/application/dto"
"eda-example/internal/domain/entity"
"eda-example/internal/domain/event"
"eda-example/internal/domain/queue"
)

type CreateOrderUseCase struct {
publisher queue.Publisher
}

func NewCreateOrderUseCase(publisher queue.Publisher) *CreateOrderUseCase {
return &CreateOrderUseCase{
publisher: publisher,
}
}

func (u *CreateOrderUseCase) Execute(ctx context.Context, input dto.CreateOrderDTO) error {
fmt.Println("--- CreateOrderUseCase ---")

// create order
order, err := entity.NewOrderEntity()
if err != nil {
return err
}

for _, item := range input.Items {
// TODO: find product in the repository database here
fakeProductName := "Product " + item.ProductId
fakeProductPrice := 10.50

// create fake order item
i := entity.NewOrderItemEntity(fakeProductName, fakeProductPrice, item.Qtd)

// add items to order
order.AddItem(i)
}

// TODO: save the order in the repository database here

var eventItems []event.OrderItem
for _, item := range order.GetItems() {
eventItems = append(eventItems, event.OrderItem{
ProductName: item.GetProductName(),
TotalPrice: item.GetTotalPrice(),
Quantity: item.GetQuantity(),
})
}

// publish event OrderCreatedEvent passing the order data
err = u.publisher.Publish(ctx, event.OrderCreatedEvent{
Id: order.GetID(),
TotalPrice: order.GetTotalPrice(),
Status: order.GetStatus(),
Items: eventItems,
})
if err != nil {
return err
}
return nil
}

컨트롤러 생성

내부/응용 프로그램/컨트롤러 폴더 에 컨트롤러 order_controller.go를 생성합니다.

package controller

import (
"eda-example/internal/application/dto"
"eda-example/internal/application/usecase"
"encoding/json"
"net/http"
)

type OrderController struct {
createOrderUserCase *usecase.CreateOrderUseCase
}

func NewOrderController(createOrderUserCase *usecase.CreateOrderUseCase) *OrderController {
return &OrderController{
createOrderUserCase: createOrderUserCase,
}
}

func (u *OrderController) CreateOrder(w http.ResponseWriter, r *http.Request) {
var requestData dto.CreateOrderDTO
json.NewDecoder(r.Body).Decode(&requestData)
err := u.createOrderUserCase.Execute(r.Context(), requestData)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusCreated)
}

MemoryQueueAdapter 만들기

내부/ 인프라 /queue 폴더 에 memory_queue_adapter.go 생성

package queue

import (
"context"
"encoding/json"
"log"
"reflect"
)

type MemoryQueueAdapter struct {
}

func NewMemoryQueueAdapter() *MemoryQueueAdapter {
return &MemoryQueueAdapter{}
}

func (eb *MemoryQueueAdapter) Publish(ctx context.Context, eventPayload interface{}) error {
eventType := reflect.TypeOf(eventPayload)
payloadJson, _ := json.Marshal(eventPayload)
log.Printf("** [Publish] %s: %v ---", eventType, payloadJson)
return nil
}

애플리케이션 진입점/시작 생성

cmd/api 폴더 에 main.go 생성

package main

import (
"eda-example/internal/application/controller"
"eda-example/internal/application/usecase"
"eda-example/internal/infra/queue"
"fmt"
"net/http"
)

func main() {
// initialize dependencies and implementations
queue := queue.NewMemoryQueueAdapter()
createOrderUseCase := usecase.NewCreateOrderUseCase(queue)
orderController := controller.NewOrderController(createOrderUseCase)

// register routes
http.HandleFunc("POST /create-order", orderController.CreateOrder)

// start server
fmt.Println("Server is running on port 8080")
http.ListenAndServe(":8080", nil)
}

애플리케이션 실행

image

/create-order 엔드포인트 테스트

우편 배달부에서 요청 보내기:

image

게시된 로깅 이벤트를 확인하세요.

image

지금까지의 요약

주문을 생성하기 위해 엔드포인트를 생성합니다. 주문을 생성한 직후에는 주문이 생성되었음을 알리는 이벤트가 게시됩니다.

이제 이 이벤트를 수신 하고 여기에서 다른 명령(ProcessOrderPayment, StockMovement 및 SendOrderEmail)을 실행 해야 합니다.

리스너 명령 만들기

내부/애플리케이션/유즈케이스 폴더 에 process_order_pay_usecase.go 생성

package usecase

import (
"context"
"fmt"
"time"
"eda-example/internal/domain/entity"
"eda-example/internal/domain/event"
"eda-example/internal/domain/queue"
)

type ProcessOrderPaymentUseCase struct {
publisher queue.Publisher
}

func NewProcessOrderPaymentUseCase(publisher queue.Publisher) *ProcessOrderPaymentUseCase {
return &ProcessOrderPaymentUseCase{
publisher: publisher,
}
}

func (h *ProcessOrderPaymentUseCase) Execute(ctx context.Context, payload *event.OrderCreatedEvent) error {
fmt.Println("--- ProcessOrderPaymentUseCase ---")

// TODO: find order by id in the repository database here
order, err := entity.RestoreOrderEntity(payload.Id, payload.Status)
if err != nil {
return err
}

for _, i := range payload.Items {
item := entity.NewOrderItemEntity(i.ProductName, i.TotalPrice/float64(i.Quantity), i.Quantity)
order.AddItem(item)
}

paymentValue := payload.TotalPrice
err = order.Pay(paymentValue)
if err != nil {
return err
}

fmt.Printf("Order Paid. Value: %f \n", payload.TotalPrice)
err = h.publisher.Publish(ctx, event.OrderPaidEvent{OrderId: payload.Id, PaidValue: paymentValue, PaymentDate: time.Now()})
if err != nil {
return err
}
return nil
}

내부/애플리케이션/유스케이스 폴더 에 stock_movement_usecase.go 생성

package usecase

import (
"context"
"eda-example/internal/domain/event"
"fmt"
)

type StockMovementUseCase struct {
}

func NewStockMovementUseCase() *StockMovementUseCase {
return &StockMovementUseCase{}
}

func (h *StockMovementUseCase) Execute(ctx context.Context, payload *event.OrderCreatedEvent) error {
fmt.Println("--- StockMovementUseCase ---")
for _, item := range payload.Items {
fmt.Printf("Removing %d items of product %s from stock\n", item.Quantity, item.ProductName)
}
return nil
}

내부/애플리케이션/유즈케이스 폴더 에 send_orderemail_usecase.go 생성

package usecase

import (
"context"
"eda-example/internal/domain/event"
"fmt"
)

type SendOrderEmailUseCase struct {
}

func NewSendOrderEmailUseCase() *SendOrderEmailUseCase {
return &SendOrderEmailUseCase{}
}

func (h *SendOrderEmailUseCase) Execute(ctx context.Context, payload *event.OrderCreatedEvent) error {
fmt.Println("--- SendOrderEmailUseCase ---")
fmt.Printf("--- MAIL Order Created: R$ %f \n", payload.TotalPrice)
return nil
}

OrderController에서 리스너 핸들러 생성

내부/응용 프로그램/컨트롤러 폴더 의 order_controller.go 에 ProcessOrderPayment , StockMovement 및 SendOrderEmail 메서드를 만듭니다 .

package controller

import (
"encoding/json"
"net/http"
"eda-example/internal/application/dto"
"eda-example/internal/application/usecase"
"eda-example/internal/domain/event"
)

type OrderController struct {
createOrderUserCase *usecase.CreateOrderUseCase
processOrderPaymentUseCase *usecase.ProcessOrderPaymentUseCase
stockMovementUseCase *usecase.StockMovementUseCase
sendOrderEmailUseCase *usecase.SendOrderEmailUseCase
}

func NewOrderController(createOrderUserCase *usecase.CreateOrderUseCase,
processOrderPaymentUseCase *usecase.ProcessOrderPaymentUseCase,
stockMovementUseCase *usecase.StockMovementUseCase,
sendOrderEmailUseCase *usecase.SendOrderEmailUseCase) *OrderController {
return &OrderController{
createOrderUserCase: createOrderUserCase,
processOrderPaymentUseCase: processOrderPaymentUseCase,
stockMovementUseCase: stockMovementUseCase,
sendOrderEmailUseCase: sendOrderEmailUseCase,
}
}

func (u *OrderController) CreateOrder(w http.ResponseWriter, r *http.Request) {
var requestData dto.CreateOrderDTO
json.NewDecoder(r.Body).Decode(&requestData)
err := u.createOrderUserCase.Execute(r.Context(), requestData)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusCreated)
}

func (u *OrderController) ProcessOrderPayment(w http.ResponseWriter, r *http.Request) {
var body event.OrderCreatedEvent
json.NewDecoder(r.Body).Decode(&body)
err := u.processOrderPaymentUseCase.Execute(r.Context(), &body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusCreated)
}

func (u *OrderController) StockMovement(w http.ResponseWriter, r *http.Request) {
var body event.OrderCreatedEvent
json.NewDecoder(r.Body).Decode(&body)
err := u.stockMovementUseCase.Execute(r.Context(), &body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusCreated)
}

func (u *OrderController) SendOrderEmail(w http.ResponseWriter, r *http.Request) {
var body event.OrderCreatedEvent
json.NewDecoder(r.Body).Decode(&body)
err := u.sendOrderEmailUseCase.Execute(r.Context(), &body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusCreated)
}

리스너 구조체 생성

내부/ 인프라 /queue 폴더 에 listener.go 생성

package queue

import (
"net/http"
"reflect"
)

type Listener struct {
eventType reflect.Type
callback func(w http.ResponseWriter, r *http.Request)
}

QueueResponseWriter 구조체 생성

내부/ 인프라 /queue 폴더 에 response_writer.go 생성

package queue

import (
"fmt"
"net/http"
)

type QueueResponseWriter struct {
body []byte
statusCode int
header http.Header
}

func NewQueueResponseWriter() *QueueResponseWriter {
return &QueueResponseWriter{
header: http.Header{},
}
}

func (w *QueueResponseWriter) Header() http.Header {
return w.header
}

func (w *QueueResponseWriter) Write(b []byte) (int, error) {
w.body = b
return 0, nil
}

func (w *QueueResponseWriter) WriteHeader(statusCode int) {
w.statusCode = statusCode
}

var okFn = func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}

func main() {
r := &http.Request{
Method: http.MethodPost,
}
w := NewQueueResponseWriter()
okFn(w, r)
fmt.Println(w.statusCode)
}

대기열 인터페이스 생성

내부/도메인/queue 폴더 에 queue.go 생성

package queue

import (
"context"
"net/http"
"reflect"
)

type Queue interface {
ListenerRegister(eventType reflect.Type, handler func(w http.ResponseWriter, r *http.Request))
Connect(ctx context.Context) error
Disconnect(ctx context.Context) error
Publish(ctx context.Context, body interface{}) error
StartConsuming(ctx context.Context, queueName string) error
}

MemoryQueueAdapter에서 메서드 구현

내부/ 인프라 /queue 폴더 의 memory_queue_adapter.go 에 메소드 구현

package queue

import (
"bytes"
"context"
"encoding/json"
"log"
"net/http"
"reflect"
)

type MemoryQueueAdapter struct {
listeners map[string][]Listener
}

func NewMemoryQueueAdapter() *MemoryQueueAdapter {
return &MemoryQueueAdapter{
listeners: make(map[string][]Listener),
}
}

func (eb *MemoryQueueAdapter) ListenerRegister(eventType reflect.Type, handler func(w http.ResponseWriter, r *http.Request)) {
eb.listeners[eventType.Name()] = append(eb.listeners[eventType.Name()], Listener{eventType, handler})
}

func (eb *MemoryQueueAdapter) Publish(ctx context.Context, eventPayload interface{}) error {
eventType := reflect.TypeOf(eventPayload)
payloadJson, _ := json.Marshal(eventPayload)

log.Printf("--- Publish %s ---", eventType)

for _, listener := range eb.listeners[eventType.Name()] {
w := NewQueueResponseWriter()
body := bytes.NewBuffer(payloadJson)
r, err := http.NewRequestWithContext(ctx, http.MethodPost, eventType.Name(), body)
if err != nil {
return err
}

listener.callback(w, r)
if err != nil {
return err
}
}

return nil
}

func (eb *MemoryQueueAdapter) Connect(ctx context.Context) error {
log.Println("--- MemoryQueueAdapter connected ---")
return nil
}

func (eb *MemoryQueueAdapter) Disconnect(ctx context.Context) error {
log.Println("--- MemoryQueueAdapter disconnected ---")
return nil
}

func (eb *MemoryQueueAdapter) StartConsuming(ctx context.Context, queueName string) error {
log.Printf("--- MemoryQueueAdapter StartConsuming queue %s ---", queueName)
return nil
}

리스너 핸들러를 사용하여 이벤트 바인딩

cmd/api 폴더 의 main.go 파일 에 리스너를 등록합니다.

package main

import (
"context"
"eda-example/internal/application/controller"
"eda-example/internal/application/usecase"
"eda-example/internal/domain/event"
"eda-example/internal/infra/queue"
"fmt"
"log"
"net/http"
"reflect"
)

func main() {
ctx := context.Background()

// initialize queue
queue := queue.NewMemoryQueueAdapter()

// use cases
createOrderUseCase := usecase.NewCreateOrderUseCase(queue)
processPaymentUseCase := usecase.NewProcessOrderPaymentUseCase(queue)
stockMovementUseCase := usecase.NewStockMovementUseCase()
sendOrderEmailUseCase := usecase.NewSendOrderEmailUseCase()

// controllers
orderController := controller.NewOrderController(createOrderUseCase, processPaymentUseCase, stockMovementUseCase, sendOrderEmailUseCase)

// register routes
http.HandleFunc("POST /create-order", orderController.CreateOrder)

// mapping listeners
var list map[reflect.Type][]func(w http.ResponseWriter, r *http.Request) = map[reflect.Type][]func(w http.ResponseWriter, r *http.Request){
reflect.TypeOf(event.OrderCreatedEvent{}): {
orderController.ProcessOrderPayment,
orderController.StockMovement,
orderController.SendOrderEmail,
},
}

// register listeners
for eventType, handlers := range list {
for _, handler := range handlers {
queue.ListenerRegister(eventType, handler)
}
}

// connect queue
err := queue.Connect(ctx)
if err != nil {
log.Fatalf("Error connect queue %s", err)
}
defer queue.Disconnect(ctx)

// start consuming queues
OrderCreatedEvent := reflect.TypeOf(event.OrderCreatedEvent{}).Name()

go func(ctx context.Context, queueName string) {
err = queue.StartConsuming(ctx, queueName)
if err != nil {
log.Fatalf("Error running consumer %s: %s", queueName, err)
}
}(ctx, OrderCreatedEvent)

// start server
fmt.Println("Server is running on port 8080")
http.ListenAndServe(":8080", nil)
}

이벤트 소비

애플리케이션을 다시 시작하세요.

image

엔드포인트 POST /create-order를 다시 호출하고 콘솔 로그를 확인하세요.

image

추가(RabbitMQ로 구현)

Rabbitmq lib를 설치합니다:

go get github.com/rabbitmq/amqp091-go

내부/ 인프라 /queue 폴더 에 Rabbitmq_adapter.go를 생성합니다 .

package queue

import (
"bytes"
"context"
"encoding/json"
"errors"
"log"
"net/http"
"reflect"
"time"

amqp "github.com/rabbitmq/amqp091-go"
)

type RabbitMQAdapter struct {
uri string
conn *amqp.Connection
listeners map[string][]Listener
}

type QueueMessage struct {
Body []byte
}

func NewRabbitMQAdapter(uri string) *RabbitMQAdapter {
return &RabbitMQAdapter{
uri: uri,
listeners: make(map[string][]Listener),
}
}

func (r *RabbitMQAdapter) Connect(ctx context.Context) error {
conn, err := amqp.Dial(r.uri)
if err != nil {
return err
}
r.conn = conn
return nil
}

func (r *RabbitMQAdapter) Disconnect(ctx context.Context) error {
return r.conn.Close()
}

func (r *RabbitMQAdapter) Publish(ctx context.Context, eventPayload interface{}) error {
eventName := reflect.TypeOf(eventPayload).Name()

ch, err := r.conn.Channel()
if err != nil {
return err
}
defer ch.Close()

q, err := ch.QueueDeclare(
eventName, // queue name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

eventJson, err := json.Marshal(eventPayload)
if err != nil {
return errors.New("error converting struct to json")
}

err = ch.PublishWithContext(ctx,
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(eventJson),
})
if err != nil {
return err
}
log.Printf(" [x] Sent to queue %s: %s\n", eventName, eventJson)
return nil
}

func (r *RabbitMQAdapter) StartConsuming(ctx context.Context, queueName string) error {
ch, err := r.conn.Channel()
if err != nil {
return err
}
defer ch.Close()

q, err := ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}

msgs, err := ch.ConsumeWithContext(
ctx,
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return err
}

go func() {
for d := range msgs {
log.Printf("Received a message on queue %s: %s", queueName, d.Body)
hasError := false
for _, listener := range r.listeners[queueName] {
w := NewQueueResponseWriter()
body := bytes.NewBuffer(d.Body)
r, err := http.NewRequestWithContext(ctx, http.MethodPost, queueName, body)
if err != nil {
log.Printf("Error processing message: %s", err)
hasError = true
break
}

listener.callback(w, r)
if w.statusCode >= 400 {
log.Printf("Error processing message: %s", string(w.body))
hasError = true
break
}
}

if !hasError {
d.Ack(false)
}
}
}()

var forever chan struct{}
log.Printf(" [*] Waiting for messages on queue %s. To exit press CTRL+C", queueName)
<-forever
return nil
}

func (r *RabbitMQAdapter) ListenerRegister(eventType reflect.Type, handler func(w http.ResponseWriter, r *http.Request)) {
r.listeners[eventType.Name()] = append(r.listeners[eventType.Name()], Listener{eventType, handler})
}

Docker를 사용하여 RabbitMQ 서버 가동

루트 폴더에 docker-compose.yml 파일 생성

services:
rabbitmq:
image: "rabbitmq:3.8-management-alpine"
hostname: rabbitmq
ports:
- "15672:15672"
- "5672:5672"
volumes:
- "rabbitmq_data:/var/lib/rabbitmq/mnesia"
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest

volumes:
rabbitmq_data:

RabbitMQ 서버 실행

docker compose up -d

image

RabbitMQAdapter를 사용하도록 main.go 파일을 변경하세요 .

package main

import (
"context"
"eda-example/internal/application/controller"
"eda-example/internal/application/usecase"
"eda-example/internal/domain/event"
"eda-example/internal/infra/queue"
"fmt"
"log"
"net/http"
"reflect"
)

func main() {
ctx := context.Background()

// initialize queue
queue := queue.NewRabbitMQAdapter("amqp://guest:guest@localhost:5672/")

// use cases
createOrderUseCase := usecase.NewCreateOrderUseCase(queue)
processPaymentUseCase := usecase.NewProcessOrderPaymentUseCase(queue)
stockMovementUseCase := usecase.NewStockMovementUseCase()
sendOrderEmailUseCase := usecase.NewSendOrderEmailUseCase()

// controllers
orderController := controller.NewOrderController(createOrderUseCase, processPaymentUseCase, stockMovementUseCase, sendOrderEmailUseCase)

// register routes
http.HandleFunc("POST /create-order", orderController.CreateOrder)

// mapping listeners
var list map[reflect.Type][]func(w http.ResponseWriter, r *http.Request) = map[reflect.Type][]func(w http.ResponseWriter, r *http.Request){
reflect.TypeOf(event.OrderCreatedEvent{}): {
orderController.ProcessOrderPayment,
orderController.StockMovement,
orderController.SendOrderEmail,
},
}

// register listeners
for eventType, handlers := range list {
for _, handler := range handlers {
queue.ListenerRegister(eventType, handler)
}
}

// connect queue
err := queue.Connect(ctx)
if err != nil {
log.Fatalf("Error connect queue %s", err)
}
defer queue.Disconnect(ctx)

// start consuming queues
OrderCreatedEvent := reflect.TypeOf(event.OrderCreatedEvent{}).Name()

go func(ctx context.Context, queueName string) {
err = queue.StartConsuming(ctx, queueName)
if err != nil {
log.Fatalf("Error running consumer %s: %s", queueName, err)
}
}(ctx, OrderCreatedEvent)

// start server
fmt.Println("Server is running on port 8080")
http.ListenAndServe(":8080", nil)
}

서버를 실행하고 주문 생성 엔드포인트를 호출합니다.

image

RabbitMQ 대시보드에서 생성 및 사용된 대기열을 확인하세요.

image

전체 코드: https://github.com/hanhyeonkyu/event-driven-golang