diff --git a/dc/dc.go b/dc/dc.go index ebb72d0..ffabb0c 100644 --- a/dc/dc.go +++ b/dc/dc.go @@ -12,14 +12,14 @@ import ( type Env struct { ms.Env - PROVIDER string - RABBITMQ_URI string - RABBITMQ_EXCHANGE string `default:"ingress"` - RABBITMQ_CLIENT string + PROVIDER string + MQ_URI string + MQ_EXCHANGE string `default:"ingress"` + MQ_CLIENT string } func PubFromEnv(e Env) (chan<- dto.RawAny, error) { - return Pub(e.RABBITMQ_URI, e.RABBITMQ_EXCHANGE, e.RABBITMQ_CLIENT) + return Pub(e.MQ_URI, e.MQ_EXCHANGE, e.MQ_CLIENT) } func Pub(uri string, client string, exchange string) (chan<- dto.RawAny, error) { diff --git a/mq/mq.go b/mq/mq.go index f23200e..47848ad 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -51,6 +51,9 @@ func Connect(uri string, client string) (R, error) { return r, err } + prefetch := 3 + ch.Qos(prefetch, 0, true) + r.Ch = ch r.Con = con diff --git a/tr/tr.go b/tr/tr.go index ace22a3..bc75730 100644 --- a/tr/tr.go +++ b/tr/tr.go @@ -22,12 +22,12 @@ import ( type Env struct { ms.Env - RABBITMQ_URI string - RABBITMQ_EXCHANGE string `default:"routed"` - RABBITMQ_CLIENT string - RABBITMQ_QUEUE string - RABBITMQ_KEY string - MONGO_URI string + MQ_URI string + MQ_EXCHANGE string `default:"routed"` + MQ_CLIENT string + MQ_QUEUE string + MQ_KEY string + MONGO_URI string } func getMongo[Raw any](uri string, m dto.Notification) (*dto.Raw[Raw], error) { @@ -67,7 +67,7 @@ func msgReject(d *amqp091.Delivery) { // Default configuration for transformers with one queue func ListenFromEnv[Raw any](e Env, handler func(*dto.Raw[Raw]) error) error { - return Listen(e.RABBITMQ_URI, e.RABBITMQ_CLIENT, e.RABBITMQ_EXCHANGE, e.RABBITMQ_QUEUE, e.RABBITMQ_KEY, e.MONGO_URI, handler) + return Listen(e.MQ_URI, e.MQ_CLIENT, e.MQ_EXCHANGE, e.MQ_QUEUE, e.MQ_KEY, e.MONGO_URI, handler) } // Configurable listen for transformers with multiple queues