Skip to content

Commit

Permalink
Add rabbitmq source
Browse files Browse the repository at this point in the history
  • Loading branch information
paulkr committed Jun 27, 2023
1 parent 160a8da commit 41e4ce1
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 0 deletions.
12 changes: 12 additions & 0 deletions catalog/sources/rabbitmq/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# 📣 Change Log
All notable changes to the `RabbitMQ` Connection will be documented in this file.

The format followed is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/).

---

## [1.0.0] - 2022-08-02

⚡️ Initial Version

---
99 changes: 99 additions & 0 deletions catalog/sources/rabbitmq/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
{
"title": "RabbitMQ",
"description": "An open-source message broker that implements the Advanced Message Queuing Protocol (AMQP).",
"apiVersion": "2023-02-07",
"type": "rabbitmq",
"category": "broker",
"image": "https://assets.buildable.dev/catalog/node-templates/rabbitmq.svg",
"tags": ["queue", "message broker"],
"authentication": [
{
"name": "RABBITMQ_CONNECTION_URI",
"label": "Enter your RabbitMQ connection URI",
"placeholder": "amqps://username:password@host/ytbqyshy"
},
{
"name": "RABBITMQ_QUEUES",
"label": "Enter your comma separated RabbitMQ queues",
"placeholder": "payments_queue, customers_queue"
}
],
"eventSchema": {},
"settings": {
"autoSubscribeAllEvents": true,
"showEvents": true,
"secureAuthFields": true
},
"paths": {
"id": null,
"event": "_.body.event",
"payload": "_.body",
"secret": null,
"signature": null
},
"events": [
{
"name": "connection.established",
"description": "Fired when a connection is established to RabbitMQ.",
"group": "connection"
},
{
"name": "connection.failed",
"description": "Fired when a connection to RabbitMQ fails.",
"group": "connection"
},
{
"name": "connection.disconnected",
"description": "Fired when a connection to RabbitMQ is disconnected.",
"group": "connection"
},
{
"name": "channel.connection.established",
"description": "Fired when a connection is established to the channel.",
"group": "channel"
},
{
"name": "channel.connection.failed",
"description": "Fired when a connection to the channel fails.",
"group": "channel"
},
{
"name": "consumer.connection.established",
"description": "Fired when a connection is established to the consumer.",
"group": "consumer"
},
{
"name": "consumer.connection.failed",
"description": "Fired when a connection to the consumer fails.",
"group": "consumer"
},
{
"name": "queue.connection.established",
"description": "Fired when a connection is established to the queue.",
"group": "queue"
},
{
"name": "queue.connection.failed",
"description": "Fired when a connection to the queue fails.",
"group": "queue"
},
{
"name": "heartbeat.received",
"description": "Fired when a message is received.",
"group": "message"
},
{
"name": "message.received",
"description": "Fired when a message is received.",
"group": "message"
}
],
"connectionTypes": ["source"],
"testConnection": true,
"classifications": {
"dataIngestion": {
"streamTypes": ["manual"],
"extractionTypes": []
}
}
}
11 changes: 11 additions & 0 deletions catalog/sources/rabbitmq/docs/connect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
## Connect

Receive events from listening to RabbitMQ queues.

### Securely Encrypted

Rest assured, your credentials are securely encrypted to keep your information safe.

### Need Help?

Start a conversation in our [Discord Server](https://discord.com/invite/47AJ42Wzys) or send an email to [[email protected]](mailto:https://discord.com/invite/47AJ42Wzys). Our Data Engineers are available 24/7 to help if you ever get stuck.
3 changes: 3 additions & 0 deletions catalog/sources/rabbitmq/docs/setup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## RabbitMQ Source Setup

Check out our [quick start guide](https://docs.buildable.dev/) for more information.
80 changes: 80 additions & 0 deletions catalog/sources/rabbitmq/rabbitmq.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import * as amqp from 'amqplib';
import { TestConnection } from "../../../types/sourceClassDefinition";

export default class RabbitMQ {
RABBITMQ_CONNECTION_URI: string;

RABBITMQ_QUEUES: string;

constructor({
RABBITMQ_CONNECTION_URI,
RABBITMQ_QUEUES,
}: {
RABBITMQ_CONNECTION_URI: string;
RABBITMQ_QUEUES: string;
}) {
this.RABBITMQ_CONNECTION_URI = RABBITMQ_CONNECTION_URI;
this.RABBITMQ_QUEUES = RABBITMQ_QUEUES;
}

async init({ webhookUrl, events }) {
return {
webhookData: {},
events,
};
}

async verifyWebhookSignature({ request, signature, secret }) {
// Validation falls on the user to implement
return true;
}

async subscribe({ webhookId, events }) {
return {
webhook: {},
events: [],
};
}

async unsubscribe({ webhookId, events }) {
return {
events: [],
webhook: {},
};
}

async getWebhooks() {
return [];
}

async getSubscribedEvents({ webhookId }) {
return [];
}

async deleteWebhookEndpoint({ webhookId }) {
return true;
}

async testConnection(): Promise<TestConnection> {
let connection;

try {
connection = await amqp.connect(this.RABBITMQ_CONNECTION_URI);

const channel = await connection.createChannel();

await connection.close();

return {
success: true,
message: "Connection tested successfully!",
};
} catch (error) {
if (connection) await connection.close();

throw new Error(
`Unable to connect to RabbitMQ: ${error.message}`,
);
}
}
}

0 comments on commit 41e4ce1

Please sign in to comment.