Skip to content

Commit

Permalink
"removed build artifacts"
Browse files Browse the repository at this point in the history
  • Loading branch information
John-Memphis committed Jan 16, 2024
2 parents 32514de + c9b6e69 commit 341320e
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 18 deletions.
25 changes: 21 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ const consumer = await memphisConnection.consumer({
consumerGroup: '<group-name>', // defaults to the consumer name.
pullIntervalMs: 1000, // defaults to 1000
batchSize: 10, // defaults to 10
batchMaxTimeToWaitMs: 5000, // defaults to 5000
batchMaxTimeToWaitMs: 100, // defaults to 100
maxAckTimeMs: 30000, // defaults to 30000
maxMsgDeliveries: 2, // defaults to 2
startConsumeFromSequence: 1, // start consuming from a specific sequence. defaults to 1
Expand Down Expand Up @@ -835,7 +835,7 @@ async function consumerBatched(){
consumerName: "newConsumer",
pullIntervalMs: 10000,
batchSize: 100,
batchMaxTimeToWaitMs: 15000
batchMaxTimeToWaitMs: 100
});
}catch(exception){
// Handle exception
Expand All @@ -855,7 +855,7 @@ async function consumerMaxMessages(){
consumerName: "newConsumer",
pullIntervalMs: 10000,
batchSize: 100,
batchMaxTimeToWaitMs: 15000,
batchMaxTimeToWaitMs: 100,
maxMsgDeliveries: 2
});
}catch(exception){
Expand Down Expand Up @@ -903,7 +903,7 @@ const msgs = await memphis.fetchMessages({
consumerName: '<consumer-name>',
consumerGroup: '<group-name>', // defaults to the consumer name.
batchSize: 10, // defaults to 10
batchMaxTimeToWaitMs: 5000, // defaults to 5000
batchMaxTimeToWaitMs: 100, // defaults to 100
maxAckTimeMs: 30000, // defaults to 30000
maxMsgDeliveries: 2, // defaults to 2
startConsumeFromSequence: 1, // start consuming from a specific sequence. defaults to 1
Expand Down Expand Up @@ -971,6 +971,23 @@ Acknowledge a message indicates the Memphis server to not re-send the same messa
message.ack();
```

### Nacking a Message

Mark the message as not acknowledged - the broker will resend the message immediately to the same consumers group, instead of waiting to the max ack time configured.

```js
msg.nack();
```

### Sending a message to the dead-letter

Sending the message to the dead-letter station (DLS) - the broker won't resend the message again to the same consumers group and will place the message inside the dead-letter station (DLS) with the given reason.
The message will still be available to other consumer groups

```js
message.deadLetter("reason");
```

### Delay and resend the message after a given duration

Delay the message and tell the Memphis server to re-send the same message again to the same consumer group. The message will be redelivered only in case `Consumer.maxMsgDeliveries` is not reached yet.
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "memphis-dev",
"version": "1.3.0",
"version": "1.3.1",
"description": "A powerful messaging platform for modern developers",
"exports": {
".": "./lib/index.js"
Expand Down
12 changes: 9 additions & 3 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { Message } from './message';
import { MemphisError } from './utils'

const maxBatchSize = 5000
const DlsMessagePartitionNumber = -1;

export class Consumer {
private connection: Memphis;
Expand Down Expand Up @@ -62,7 +63,7 @@ export class Consumer {
this.internalConsumerGroupName = this.consumerGroup.replace(/\./g, '#');
this.pullIntervalMs = pullIntervalMs;
this.batchSize = batchSize;
this.batchMaxTimeToWaitMs = batchMaxTimeToWaitMs < 1000 ? 1000 : batchMaxTimeToWaitMs;
this.batchMaxTimeToWaitMs = batchMaxTimeToWaitMs < 100 ? 100 : batchMaxTimeToWaitMs;
this.maxAckTimeMs = maxAckTimeMs;
this.maxMsgDeliveries = maxMsgDeliveries;
this.eventEmitter = new events.EventEmitter();
Expand Down Expand Up @@ -138,21 +139,26 @@ export class Consumer {
}
let streamName = `${this.internalStationName}`;
let stationPartitions = this.connection.stationPartitions.get(this.internalStationName);
let messagePartitionNumber = 0;
if (stationPartitions != null && stationPartitions.length === 1) {
let partitionNumber = stationPartitions[0]
messagePartitionNumber = partitionNumber;
streamName = `${this.internalStationName}$${partitionNumber.toString()}`
} else if (stationPartitions != null && stationPartitions.length > 0) {
if (consumerPartitionNumber > 0 && consumerPartitionKey != null) {
throw MemphisErrors.GivenBothPartitionNumAndKey;
}
if (consumerPartitionKey != null) {
const partitionNumberKey = this.connection._getPartitionFromKey(consumerPartitionKey, this.internalStationName);
messagePartitionNumber = partitionNumberKey;
streamName = `${this.internalStationName}$${partitionNumberKey.toString()}`;
} else if (consumerPartitionNumber > 0) {
this.connection._validatePartitionNumber(consumerPartitionNumber, this.internalStationName)
messagePartitionNumber = consumerPartitionNumber;
streamName = `${this.internalStationName}$${consumerPartitionNumber.toString()}`
} else {
let partitionNumber = this.partitionsGenerator.Next();
messagePartitionNumber = partitionNumber;
streamName = `${this.internalStationName}$${partitionNumber.toString()}`;
}
}
Expand All @@ -174,7 +180,7 @@ export class Consumer {
{ batch: batchSize, expires: this.batchMaxTimeToWaitMs });

for await (const m of batch)
messages.push(new Message(m, this.connection, this.consumerGroup, this.internalStationName));
messages.push(new Message(m, this.connection, this.consumerGroup, this.internalStationName, messagePartitionNumber));

return messages;
} catch (ex) {
Expand All @@ -189,7 +195,7 @@ export class Consumer {
if (this.dlsCurrentIndex >= 10000) {
indexToInsert %= 10000;
}
this.dlsMessages[indexToInsert] = new Message(m, this.connection, this.consumerGroup, this.internalStationName);
this.dlsMessages[indexToInsert] = new Message(m, this.connection, this.consumerGroup, this.internalStationName, DlsMessagePartitionNumber);
this.dlsCurrentIndex++;
}
}
Expand Down
12 changes: 6 additions & 6 deletions src/memphis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ class Memphis {
* @param {String} consumerGroup - consumer group name, defaults to the consumer name.
* @param {Number} pullIntervalMs - interval in miliseconds between pulls, default is 1000.
* @param {Number} batchSize - pull batch size.
* @param {Number} batchMaxTimeToWaitMs - max time in miliseconds to wait between pulls, defauls is 5000. The smallest possible value is 1000(1 second), and if the value is smaller than 1000, it will be set to 1000.
* @param {Number} batchMaxTimeToWaitMs - max time in miliseconds to wait between pulls, defauls is 100. The smallest possible value is 100, and if the value is smaller than 100, it will be set to 100.
* @param {Number} maxAckTimeMs - max time for ack a message in miliseconds, in case a message not acked in this time period the Memphis broker will resend it untill reaches the maxMsgDeliveries value
* @param {Number} maxMsgDeliveries - max number of message deliveries, by default is 2
* @param {String} genUniqueSuffix - Deprecated: will be stopped to be supported after November 1'st, 2023. Indicates memphis to add a unique suffix to the desired producer name.
Expand All @@ -907,7 +907,7 @@ class Memphis {
consumerGroup = '',
pullIntervalMs = 1000,
batchSize = 10,
batchMaxTimeToWaitMs = 5000,
batchMaxTimeToWaitMs = 100,
maxAckTimeMs = 30000,
maxMsgDeliveries = 2,
genUniqueSuffix = false,
Expand Down Expand Up @@ -1003,8 +1003,8 @@ class Memphis {
}
this.stationPartitions.set(internal_station, partitions);

// the least possible value for batchMaxTimeToWaitMs is 1000 (1 second)
batchMaxTimeToWaitMs = batchMaxTimeToWaitMs < 1000 ? 1000 : batchMaxTimeToWaitMs;
// the least possible value for batchMaxTimeToWaitMs is 100
batchMaxTimeToWaitMs = batchMaxTimeToWaitMs < 100 ? 100 : batchMaxTimeToWaitMs;
const consumer = new Consumer(
this,
stationName,
Expand Down Expand Up @@ -1118,7 +1118,7 @@ class Memphis {
* @param {String} genUniqueSuffix - Deprecated: will be stopped to be supported after November 1'st, 2023. Indicates memphis to add a unique suffix to the desired consumer name.
* @param {Number} batchSize - pull batch size.
* @param {Number} maxAckTimeMs - max time for ack a message in miliseconds, in case a message not acked in this time period the Memphis broker will resend it until reaches the maxMsgDeliveries value
* @param {Number} batchMaxTimeToWaitMs - max time in miliseconds to wait between pulls, default is 5000. The smallest possible value is 1000(1 second), and if the value is smaller than 1000, it will be set to 1000.
* @param {Number} batchMaxTimeToWaitMs - max time in miliseconds to wait between pulls, default is 100. The smallest possible value is 100, and if the value is smaller than 100, it will be set to 100.
* @param {Number} maxMsgDeliveries - max number of message deliveries, by default is 2
* @param {Number} startConsumeFromSequence - start consuming from a specific sequence. defaults to 1
* @param {Number} lastMessages - consume the last N messages, defaults to -1 (all messages in the station)
Expand All @@ -1132,7 +1132,7 @@ class Memphis {
genUniqueSuffix = false,
batchSize = 10,
maxAckTimeMs = 30000,
batchMaxTimeToWaitMs = 5000,
batchMaxTimeToWaitMs = 100,
maxMsgDeliveries = 2,
startConsumeFromSequence = 1,
lastMessages = -1,
Expand Down
49 changes: 47 additions & 2 deletions src/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,19 @@ export class Message {
private stationName: string;
private internal_station: string;
private station: Station;
constructor(message: broker.JsMsg, connection: Memphis, cgName: string, internalStationName: string) {
private partition_number: number;

constructor(message: broker.JsMsg, connection: Memphis, cgName: string, internalStationName: string, partition_number: number) {
this.message = message;
this.connection = connection;
this.cgName = cgName;
this.internal_station = internalStationName;
this.station = new Station(connection, internalStationName);
this.partition_number = partition_number;
}

private _isInDls() {
return this.partition_number == -1;
}

/**
Expand All @@ -38,6 +45,44 @@ export class Message {
}
}

/**
* nack - not ack for a message, meaning that the message will be redelivered again to the same consumers group without waiting to its ack wait time.
*/
nack() {
if (this.message.nak)
this.message.nak();
}

/**
* deadLetter - Sending the message to the dead-letter station (DLS). the broker won't resend the message again to the same consumers group and will place the message inside the dead-letter station (DLS) with the given reason.
* The message will still be available to other consumer groups
* @param reason - the reason for the dead-lettering
* @returns void
*/
deadLetter(reason: string) {
if (this._isInDls())
return;
try {
if (this.message.term)
this.message.term();
else
return;

const data = {
station_name: this.internal_station,
error: reason,
partition: this.partition_number,
cg_name: this.cgName,
seq: this.message.seq
}
const requestPayload = this.connection.JSONC.encode(data);
this.connection.brokerManager.publish('$memphis_nacked_dls', requestPayload);
}
catch (ex) {
throw MemphisError(ex);
}
}

/**
* Returns the message payload.
*/
Expand Down Expand Up @@ -135,7 +180,7 @@ export class Message {
/**
* Returns time when the message was sent.
*/
getTimeSent(){
getTimeSent() {
const timestampNanos = this.message.info.timestampNanos;
let timestampMillis = timestampNanos / 1000000;
return new Date(timestampMillis);
Expand Down
2 changes: 1 addition & 1 deletion version-beta.conf
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.1.19
1.1.20
2 changes: 1 addition & 1 deletion version.conf
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.3.0
1.3.1

0 comments on commit 341320e

Please sign in to comment.