Skip to content

Commit

Permalink
Merge pull request #240 from memphisdev/bugfix-RND-412-permissions-vi…
Browse files Browse the repository at this point in the history
…olation-for-subscription-to-memphis-schema-updates-on-authorized-pattern-when-creating-default-station

fix call for await _scemaUpdatesListener
  • Loading branch information
shay23b authored Jan 3, 2024
2 parents 45139f2 + 720131f commit 00f9bc0
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions src/memphis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -864,16 +864,16 @@ class Memphis {
'send_notification',
createRes.send_notification
);
await this._scemaUpdatesListener(stationName, createRes.schema_update);
var partitions: number[]
if (createRes?.partitions_update === undefined || createRes?.partitions_update === null || createRes?.partitions_update?.partitions_list === null) {
partitions = [];
} else {
partitions = createRes.partitions_update.partitions_list;
}
this.stationPartitions.set(internal_station, partitions);

const producer = new Producer(this, producerName, stationName, realName, partitions);
await this._scemaUpdatesListener(stationName, createRes.schema_update);
this.setCachedProducer(producer);

return producer;
Expand Down Expand Up @@ -990,7 +990,6 @@ class Memphis {
let partitions = []
try {
createRes = this.JSONC.decode(createRes.data);
await this._scemaUpdatesListener(stationName, createRes.schema_update);
if (createRes.error != '') {
throw MemphisError(new Error(createRes.error));
}
Expand All @@ -1006,7 +1005,7 @@ class Memphis {
}
}
this.stationPartitions.set(internal_station, partitions);

// the least possible value for batchMaxTimeToWaitMs is 1000 (1 second)
batchMaxTimeToWaitMs = batchMaxTimeToWaitMs < 1000 ? 1000 : batchMaxTimeToWaitMs;
const consumer = new Consumer(
Expand All @@ -1025,7 +1024,8 @@ class Memphis {
partitions,
consumerPartitionKey,
consumerPartitionNumber
);
);
await this._scemaUpdatesListener(stationName, createRes.schema_update);
this.setCachedConsumer(consumer);

return consumer;
Expand Down

0 comments on commit 00f9bc0

Please sign in to comment.