Skip to content

Commit

Permalink
Change fix for avoiding event loss to opt-in
Browse files Browse the repository at this point in the history
  • Loading branch information
Leh2 authored and oskardudycz committed Oct 24, 2021
1 parent 22f6cea commit 68da2d1
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 7 deletions.
4 changes: 2 additions & 2 deletions src/Marten/Events/AppendEventFunction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ public override void Write(DdlRules rules, StringWriter writer)
body = bodies[index];
insert into {databaseSchema}.mt_events
(seq_id, id, stream_id, version, data, type, tenant_id, {DocumentMapping.DotNetTypeColumn}, tx_id)
(seq_id, id, stream_id, version, data, type, tenant_id, {DocumentMapping.DotNetTypeColumn}{(_events.Options.UseTransactionIdFixToAvoidEventLossInProjectionDaemon ? ", tx_id" : "")})
values
(seq, event_id, stream, event_version, body, event_type, tenantid, dotnet_types[index], txid_current());
(seq, event_id, stream, event_version, body, event_type, tenantid, dotnet_types[index]{(_events.Options.UseTransactionIdFixToAvoidEventLossInProjectionDaemon ? ", txid_current()" : "")});
index := index + 1;
end loop;
Expand Down
6 changes: 5 additions & 1 deletion src/Marten/Events/EventsTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ public EventsTable(EventGraph events) : base(new DbObjectName(events.DatabaseSch
AddColumn("timestamp", "timestamptz", "default (now()) NOT NULL");
AddColumn<TenantIdColumn>();
AddColumn(new DotNetTypeColumn { Directive = "NULL" });
AddColumn("tx_id", "bigint", "default 0 NOT NULL");

if (events.Options.UseTransactionIdFixToAvoidEventLossInProjectionDaemon)
{
AddColumn("tx_id", "bigint", "default 0 NOT NULL");
}

if (events.TenancyStyle == TenancyStyle.Conjoined)
{
Expand Down
14 changes: 10 additions & 4 deletions src/Marten/Events/Projections/Async/Fetcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,18 @@ public Fetcher(IDocumentStore store, DaemonSettings settings, AsyncOptions optio

EventTypeNames = eventTypes.Select(x => store.Events.EventMappingFor(x).Alias).ToArray();

var transactionIdCondition = "";
if (store.Events.Options.UseTransactionIdFixToAvoidEventLossInProjectionDaemon)
{
transactionIdCondition = "and tx_id < txid_snapshot_xmin(txid_current_snapshot())";
}

_sql =
$@"
select seq_id from {_selector.Events.DatabaseSchemaName}.mt_events where seq_id > :last and seq_id <= :limit and extract(epoch from age(transaction_timestamp(), {_selector.Events.DatabaseSchemaName}.mt_events.timestamp)) >= :buffer and tx_id < txid_snapshot_xmin(txid_current_snapshot()) order by seq_id;
{_selector.ToSelectClause(null)} where seq_id > :last and seq_id <= :limit and type = ANY(:types) and extract(epoch from age(transaction_timestamp(), {_selector.Events.DatabaseSchemaName}.mt_events.timestamp)) >= :buffer and tx_id < txid_snapshot_xmin(txid_current_snapshot()) order by seq_id;
select min(seq_id) from {_selector.Events.DatabaseSchemaName}.mt_events where seq_id > :limit and type = ANY(:types) and extract(epoch from age(transaction_timestamp(), {_selector.Events.DatabaseSchemaName}.mt_events.timestamp)) >= :buffer and tx_id < txid_snapshot_xmin(txid_current_snapshot());
select max(seq_id) from {_selector.Events.DatabaseSchemaName}.mt_events where seq_id >= :limit and extract(epoch from age(transaction_timestamp(), {_selector.Events.DatabaseSchemaName}.mt_events.timestamp)) >= :buffer and tx_id < txid_snapshot_xmin(txid_current_snapshot());
select seq_id from {_selector.Events.DatabaseSchemaName}.mt_events where seq_id > :last and seq_id <= :limit and extract(epoch from age(transaction_timestamp(), {_selector.Events.DatabaseSchemaName}.mt_events.timestamp)) >= :buffer {transactionIdCondition} order by seq_id;
{_selector.ToSelectClause(null)} where seq_id > :last and seq_id <= :limit and type = ANY(:types) and extract(epoch from age(transaction_timestamp(), {_selector.Events.DatabaseSchemaName}.mt_events.timestamp)) >= :buffer {transactionIdCondition} order by seq_id;
select min(seq_id) from {_selector.Events.DatabaseSchemaName}.mt_events where seq_id > :limit and type = ANY(:types) and extract(epoch from age(transaction_timestamp(), {_selector.Events.DatabaseSchemaName}.mt_events.timestamp)) >= :buffer {transactionIdCondition};
select max(seq_id) from {_selector.Events.DatabaseSchemaName}.mt_events where seq_id >= :limit and extract(epoch from age(transaction_timestamp(), {_selector.Events.DatabaseSchemaName}.mt_events.timestamp)) >= :buffer {transactionIdCondition};
".Replace(" as d", "");
}

Expand Down
7 changes: 7 additions & 0 deletions src/Marten/StoreOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,13 @@ public EnumStorage DuplicatedFieldEnumStorage
[Obsolete("Please use only for migration from Marten 2.*. It might be removed in the next major versions")]
public bool DuplicatedFieldUseTimestampWithoutTimeZoneForDateTime { get; set; } = true;

/// <summary>
/// Ensures the projection daemon runs without event loss.
/// Requires db migrations for existing databases.
/// See https://github.com/JasperFx/marten/pull/1880 for details
/// </summary>
public bool UseTransactionIdFixToAvoidEventLossInProjectionDaemon => false;

internal void CreatePatching()
{
if (PLV8Enabled)
Expand Down

0 comments on commit 68da2d1

Please sign in to comment.