Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/dotnetcore/CAP
Browse files Browse the repository at this point in the history
  • Loading branch information
yang-xiaodong committed Oct 28, 2024
2 parents c4e8872 + 1cafc93 commit 83b4217
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 79 deletions.
4 changes: 2 additions & 2 deletions src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int ba
var connection = new MySqlConnection(_options.Value.ConnectionString);
await using var _ = connection.ConfigureAwait(false);
return await connection.ExecuteNonQueryAsync(
$@"DELETE FROM `{table}` WHERE ExpiresAt < @timeout AND (StatusName='{StatusName.Succeeded}' OR StatusName='{StatusName.Failed}') limit @batchCount;",
$@"DELETE FROM `{table}` WHERE ExpiresAt < @timeout AND StatusName IN ('{StatusName.Succeeded}','{StatusName.Failed}') LIMIT @batchCount;",
null,
new MySqlParameter("@timeout", timeout), new MySqlParameter("@batchCount", batchCount))
.ConfigureAwait(false);
Expand Down Expand Up @@ -318,7 +318,7 @@ private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(strin
var fourMinAgo = DateTime.Now.Subtract(lookbackSeconds);
var sql =
$"SELECT `Id`,`Content`,`Retries`,`Added` FROM `{tableName}` WHERE `Retries`<@Retries " +
$"AND `Version`=@Version AND `Added`<@Added AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
$"AND `Version`=@Version AND `Added`<@Added AND `StatusName` IN ('{StatusName.Failed}','{StatusName.Scheduled}') LIMIT 200;";

object[] sqlParams =
{
Expand Down
38 changes: 19 additions & 19 deletions src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,16 @@ public async Task<PagedQueryResult<MessageDto>> GetMessagesAsync(MessageQueryDto
{
var tableName = queryDto.MessageType == MessageType.Publish ? _pubName : _recName;
var where = string.Empty;
if (!string.IsNullOrEmpty(queryDto.StatusName)) where += " and StatusName=@StatusName";
if (!string.IsNullOrEmpty(queryDto.StatusName)) where += " AND StatusName=@StatusName";

if (!string.IsNullOrEmpty(queryDto.Name)) where += " and Name=@Name";
if (!string.IsNullOrEmpty(queryDto.Name)) where += " AND Name=@Name";

if (!string.IsNullOrEmpty(queryDto.Group)) where += " and `Group`=@Group";
if (!string.IsNullOrEmpty(queryDto.Group)) where += " AND `Group`=@Group";

if (!string.IsNullOrEmpty(queryDto.Content)) where += " and Content like CONCAT('%',@Content,'%')";
if (!string.IsNullOrEmpty(queryDto.Content)) where += " AND Content LIKE CONCAT('%',@Content,'%')";

var sqlQuery =
$"select * from `{tableName}` where 1=1 {where} order by Added desc limit @Limit offset @Offset";
$"SELECT * FROM `{tableName}` WHERE 1=1 {where} ORDER BY Added DESC LIMIT @Limit OFFSET @Offset";

object[] sqlParams =
{
Expand Down Expand Up @@ -177,10 +177,10 @@ public ValueTask<int> ReceivedSucceededCount()

private async ValueTask<int> GetNumberOfMessage(string tableName, string statusName)
{
var sqlQuery = $"select count(Id) from `{tableName}` where StatusName = @state";
var sqlQuery = $"SELECT COUNT(Id) FROM `{tableName}` WHERE StatusName = @State";
var connection = new MySqlConnection(_options.ConnectionString);
await using var _ = connection.ConfigureAwait(false);
return await connection.ExecuteScalarAsync<int>(sqlQuery, new MySqlParameter("@state", statusName))
return await connection.ExecuteScalarAsync<int>(sqlQuery, new MySqlParameter("@State", statusName))
.ConfigureAwait(false);
}

Expand All @@ -205,22 +205,22 @@ private async Task<Dictionary<DateTime, int>> GetTimelineStats(
IDictionary<string, DateTime> keyMaps)
{
var sqlQuery = $@"
SELECT aggr.*
SELECT Aggr.*
FROM (
SELECT date_format(`Added`, '%Y-%m-%d-%H') AS `Key`,
count(id) `Count`
SELECT DATE_FORMAT(`Added`, '%Y-%m-%d-%H') AS `Key`,
COUNT(`Id`) AS `Count`
FROM `{tableName}`
WHERE StatusName = @statusName
GROUP BY date_format(`Added`, '%Y-%m-%d-%H')
) aggr
WHERE `Key` >= @minKey
AND `Key` <= @maxKey;";
WHERE `StatusName` = @StatusName
GROUP BY DATE_FORMAT(`Added`, '%Y-%m-%d-%H')
) AS Aggr
WHERE `Key` >= @MinKey
AND `Key` <= @MaxKey;";

object[] sqlParams =
{
new MySqlParameter("@statusName", statusName),
new MySqlParameter("@minKey", keyMaps.Keys.Min()),
new MySqlParameter("@maxKey", keyMaps.Keys.Max())
new MySqlParameter("@StatusName", statusName),
new MySqlParameter("@MinKey", keyMaps.Keys.Min()),
new MySqlParameter("@MaxKey", keyMaps.Keys.Max())
};

Dictionary<string, int> valuesMap;
Expand Down Expand Up @@ -258,7 +258,7 @@ GROUP BY date_format(`Added`, '%Y-%m-%d-%H')

private async Task<MediumMessage?> GetMessageAsync(string tableName, long id)
{
var sql = $@"SELECT `Id` as DbId, `Content`,`Added`,`ExpiresAt`,`Retries` FROM `{tableName}` WHERE Id={id};";
var sql = $"SELECT `Id` as DbId, `Content`,`Added`,`ExpiresAt`,`Retries` FROM `{tableName}` WHERE Id={id};";

var connection = new MySqlConnection(_options.ConnectionString);
await using var _ = connection.ConfigureAwait(false);
Expand Down
7 changes: 4 additions & 3 deletions src/DotNetCore.CAP.MySql/IStorageInitializer.MySql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ protected virtual string CreateDbTablesScript()
`ExpiresAt` datetime DEFAULT NULL,
`StatusName` varchar(50) NOT NULL,
PRIMARY KEY (`Id`),
INDEX `IX_ExpiresAt`(`ExpiresAt`)
INDEX `IX_Version_ExpiresAt_StatusName` (`Version`, `ExpiresAt`, `StatusName`),
INDEX `IX_ExpiresAt_StatusName` (`ExpiresAt`, `StatusName`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE IF NOT EXISTS `{GetPublishedTableName()}` (
Expand All @@ -111,9 +112,9 @@ PRIMARY KEY (`Id`),
`ExpiresAt` datetime DEFAULT NULL,
`StatusName` varchar(40) NOT NULL,
PRIMARY KEY (`Id`),
INDEX `IX_ExpiresAt`(`ExpiresAt`)
INDEX `IX_Version_ExpiresAt_StatusName` (`Version`, `ExpiresAt`, `StatusName`),
INDEX `IX_ExpiresAt_StatusName` (`ExpiresAt`, `StatusName`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
";
if (_capOptions.Value.UseStorageLock)
batchSql += $@"
Expand Down
4 changes: 2 additions & 2 deletions src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int ba
var connection = _options.Value.CreateConnection();
await using var _ = connection.ConfigureAwait(false);
return await connection.ExecuteNonQueryAsync(
$"DELETE FROM {table} WHERE \"Id\" IN (SELECT \"Id\" FROM {table} WHERE \"ExpiresAt\" < @timeout AND (\"StatusName\"='{StatusName.Succeeded}' OR \"StatusName\"='{StatusName.Failed}') LIMIT @batchCount);",
$"DELETE FROM {table} WHERE \"Id\" IN (SELECT \"Id\" FROM {table} WHERE \"ExpiresAt\" < @timeout AND \"StatusName\" IN ('{StatusName.Succeeded}','{StatusName.Failed}') LIMIT @batchCount);",
null,
new NpgsqlParameter("@timeout", timeout), new NpgsqlParameter("@batchCount", batchCount))
.ConfigureAwait(false);
Expand Down Expand Up @@ -318,7 +318,7 @@ private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(strin
var fourMinAgo = DateTime.Now.Subtract(lookbackSeconds);
var sql =
$"SELECT \"Id\",\"Content\",\"Retries\",\"Added\" FROM {tableName} WHERE \"Retries\"<@Retries " +
$"AND \"Version\"=@Version AND \"Added\"<@Added AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
$"AND \"Version\"=@Version AND \"Added\"<@Added AND \"StatusName\" IN ('{StatusName.Failed}','{StatusName.Scheduled}') LIMIT 200;";

object[] sqlParams =
{
Expand Down
39 changes: 19 additions & 20 deletions src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,21 @@ public async Task<PagedQueryResult<MessageDto>> GetMessagesAsync(MessageQueryDto
var tableName = queryDto.MessageType == MessageType.Publish ? _pubName : _recName;
var where = string.Empty;

if (!string.IsNullOrEmpty(queryDto.StatusName)) where += " and Lower(\"StatusName\") = Lower(@StatusName)";
if (!string.IsNullOrEmpty(queryDto.StatusName)) where += " AND Lower(\"StatusName\") = Lower(@StatusName)";

if (!string.IsNullOrEmpty(queryDto.Name)) where += " and Lower(\"Name\") = Lower(@Name)";
if (!string.IsNullOrEmpty(queryDto.Name)) where += " AND Lower(\"Name\") = Lower(@Name)";

if (!string.IsNullOrEmpty(queryDto.Group)) where += " and Lower(\"Group\") = Lower(@Group)";
if (!string.IsNullOrEmpty(queryDto.Group)) where += " AND Lower(\"Group\") = Lower(@Group)";

if (!string.IsNullOrEmpty(queryDto.Content)) where += " and \"Content\" ILike @Content";
if (!string.IsNullOrEmpty(queryDto.Content)) where += " AND \"Content\" ILike @Content";

var sqlQuery =
$"select * from {tableName} where 1=1 {where} order by \"Added\" desc offset @Offset limit @Limit";
$"SELECT * FROM {tableName} WHERE 1=1 {where} ORDER BY \"Added\" DESC OFFSET @Offset LIMIT @Limit";

var connection = _options.CreateConnection();
await using var _ = connection.ConfigureAwait(false);

var count = await connection.ExecuteScalarAsync<int>($"select count(1) from {tableName} where 1=1 {where}",
var count = await connection.ExecuteScalarAsync<int>($"SELECT COUNT(1) FROM {tableName} WHERE 1=1 {where}",
new NpgsqlParameter("@StatusName", queryDto.StatusName ?? string.Empty),
new NpgsqlParameter("@Group", queryDto.Group ?? string.Empty),
new NpgsqlParameter("@Name", queryDto.Name ?? string.Empty),
Expand Down Expand Up @@ -180,11 +180,11 @@ public async Task<IDictionary<DateTime, int>> HourlyFailedJobs(MessageType type)
private async ValueTask<int> GetNumberOfMessage(string tableName, string statusName)
{
var sqlQuery =
$"select count(\"Id\") from {tableName} where Lower(\"StatusName\") = Lower(@state)";
$"SELECT COUNT(\"Id\") FROM {tableName} WHERE Lower(\"StatusName\") = Lower(@State)";

var connection = _options.CreateConnection();
await using var _ = connection.ConfigureAwait(false);
return await connection.ExecuteScalarAsync<int>(sqlQuery, new NpgsqlParameter("@state", statusName))
return await connection.ExecuteScalarAsync<int>(sqlQuery, new NpgsqlParameter("@State", statusName))
.ConfigureAwait(false);
}

Expand All @@ -210,20 +210,20 @@ private async Task<Dictionary<DateTime, int>> GetTimelineStats(
{
var sqlQuery =
$@"
with aggr as (
select to_char(""Added"",'yyyy-MM-dd-HH') as ""Key"",
count(""Id"") as ""Count""
from {tableName}
where ""StatusName"" = @statusName
group by to_char(""Added"", 'yyyy-MM-dd-HH')
WITH Aggr AS (
SELECT to_char(""Added"",'yyyy-MM-dd-HH') AS ""Key"",
COUNT(""Id"") AS ""Count""
FROM {tableName}
WHERE ""StatusName"" = @StatusName
GROUP BY to_char(""Added"", 'yyyy-MM-dd-HH')
)
select ""Key"",""Count"" from aggr where ""Key"" >= @minKey and ""Key"" <= @maxKey;";
SELECT ""Key"",""Count"" from Aggr WHERE ""Key"" >= @MinKey AND ""Key"" <= @MaxKey;";

object[] sqlParams =
{
new NpgsqlParameter("@statusName", statusName),
new NpgsqlParameter("@minKey", keyMaps.Keys.Min()),
new NpgsqlParameter("@maxKey", keyMaps.Keys.Max())
new NpgsqlParameter("@StatusName", statusName),
new NpgsqlParameter("@MinKey", keyMaps.Keys.Min()),
new NpgsqlParameter("@MaxKey", keyMaps.Keys.Max())
};

Dictionary<string, int> valuesMap;
Expand All @@ -245,8 +245,7 @@ group by to_char(""Added"", 'yyyy-MM-dd-HH')

foreach (var key in keyMaps.Keys)
{
if (!valuesMap.ContainsKey(key))
valuesMap.Add(key, 0);
valuesMap.TryAdd(key, 0);
}

var result = new Dictionary<DateTime, int>();
Expand Down
19 changes: 19 additions & 0 deletions src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ protected virtual string CreateDbTablesScript(string schema)
""StatusName"" VARCHAR(50) NOT NULL
);
CREATE INDEX IF NOT EXISTS ""idx_received_ExpiresAt_StatusName"" ON {GetReceivedTableName()} (""ExpiresAt"",""StatusName"");
CREATE TABLE IF NOT EXISTS {GetPublishedTableName()}(
""Id"" BIGINT PRIMARY KEY NOT NULL,
""Version"" VARCHAR(20) NOT NULL,
Expand All @@ -86,6 +88,23 @@ protected virtual string CreateDbTablesScript(string schema)
""ExpiresAt"" TIMESTAMP NULL,
""StatusName"" VARCHAR(50) NOT NULL
);
CREATE INDEX IF NOT EXISTS ""idx_published_ExpiresAt_StatusName"" ON {GetPublishedTableName()}(""ExpiresAt"",""StatusName"");
DO $$
DECLARE
major_version INT;
BEGIN
SELECT split_part(current_setting('server_version'), '.', 1)::int INTO major_version;
IF major_version >= 11 THEN
CREATE INDEX IF NOT EXISTS ""idx_received_Version_ExpiresAt_StatusName"" ON {GetReceivedTableName()} (""Version"",""ExpiresAt"",""StatusName"") INCLUDE (""Id"", ""Content"", ""Retries"", ""Added"");
CREATE INDEX IF NOT EXISTS ""idx_published_Version_ExpiresAt_StatusName"" ON {GetPublishedTableName()} (""Version"",""ExpiresAt"",""StatusName"") INCLUDE (""Id"", ""Content"", ""Retries"", ""Added"");
ELSE
CREATE INDEX IF NOT EXISTS ""idx_received_Version_ExpiresAt_StatusName"" ON {GetReceivedTableName()} (""Version"",""ExpiresAt"",""StatusName"");
CREATE INDEX IF NOT EXISTS ""idx_published_Version_ExpiresAt_StatusName"" ON {GetPublishedTableName()} (""Version"",""ExpiresAt"",""StatusName"");
END IF;
END $$;
";
if (_capOptions.Value.UseStorageLock)
batchSql += $@"
Expand Down
16 changes: 10 additions & 6 deletions src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int ba
var connection = new SqlConnection(_options.Value.ConnectionString);
await using var _ = connection.ConfigureAwait(false);
return await connection.ExecuteNonQueryAsync(
$"DELETE TOP (@batchCount) FROM {table} WITH (readpast) WHERE ExpiresAt < @timeout AND (StatusName='{StatusName.Succeeded}' OR StatusName='{StatusName.Failed}');",
$"DELETE TOP (@batchCount) FROM {table} WITH (READPAST) WHERE ExpiresAt < @timeout AND StatusName IN('{StatusName.Succeeded}','{StatusName.Failed}');",
null,
new SqlParameter("@timeout", timeout), new SqlParameter("@batchCount", batchCount)).ConfigureAwait(false);
}
Expand All @@ -226,9 +226,12 @@ public Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan
public async Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<MediumMessage>, Task> scheduleTask,
CancellationToken token = default)
{
var sql =
$"SELECT Id,Content,Retries,Added,ExpiresAt FROM {_pubName} WITH (UPDLOCK,READPAST) WHERE Version=@Version " +
$"AND ((ExpiresAt< @TwoMinutesLater AND StatusName = '{StatusName.Delayed}') OR (ExpiresAt< @OneMinutesAgo AND StatusName = '{StatusName.Queued}'))";
var sql = $@"
SELECT Id, Content, Retries, Added, ExpiresAt FROM {_pubName} WITH (UPDLOCK, READPAST)
WHERE Version = @Version AND StatusName = '{StatusName.Delayed}' AND ExpiresAt < @TwoMinutesLater
UNION ALL
SELECT Id, Content, Retries, Added, ExpiresAt FROM {_pubName} WITH (UPDLOCK, READPAST)
WHERE Version = @Version AND StatusName = '{StatusName.Queued}' AND ExpiresAt < @OneMinutesAgo;";

object[] sqlParams =
{
Expand Down Expand Up @@ -310,9 +313,10 @@ private async Task StoreReceivedMessage(object[] sqlParams)
private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string tableName, TimeSpan lookbackSeconds)
{
var fourMinAgo = DateTime.Now.Subtract(lookbackSeconds);

var sql =
$"SELECT TOP (200) Id, Content, Retries, Added FROM {tableName} WITH (readpast) WHERE Retries<@Retries " +
$"AND Version=@Version AND Added<@Added AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
$"SELECT TOP (200) Id, Content, Retries, Added FROM {tableName} WITH (READPAST) " +
$"WHERE Retries < @Retries AND Version = @Version AND Added < @Added AND StatusName IN ('{StatusName.Failed}', '{StatusName.Scheduled}');";

object[] sqlParams =
{
Expand Down
Loading

0 comments on commit 83b4217

Please sign in to comment.