diff --git a/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs b/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs index 35a7f91ef..19d63ae27 100644 --- a/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs +++ b/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs @@ -210,7 +210,7 @@ public async Task DeleteExpiresAsync(string table, DateTime timeout, int ba var connection = new MySqlConnection(_options.Value.ConnectionString); await using var _ = connection.ConfigureAwait(false); return await connection.ExecuteNonQueryAsync( - $@"DELETE FROM `{table}` WHERE ExpiresAt < @timeout AND (StatusName='{StatusName.Succeeded}' OR StatusName='{StatusName.Failed}') limit @batchCount;", + $@"DELETE FROM `{table}` WHERE ExpiresAt < @timeout AND StatusName IN ('{StatusName.Succeeded}','{StatusName.Failed}') LIMIT @batchCount;", null, new MySqlParameter("@timeout", timeout), new MySqlParameter("@batchCount", batchCount)) .ConfigureAwait(false); @@ -318,7 +318,7 @@ private async Task> GetMessagesOfNeedRetryAsync(strin var fourMinAgo = DateTime.Now.Subtract(lookbackSeconds); var sql = $"SELECT `Id`,`Content`,`Retries`,`Added` FROM `{tableName}` WHERE `Retries`<@Retries " + - $"AND `Version`=@Version AND `Added`<@Added AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;"; + $"AND `Version`=@Version AND `Added`<@Added AND `StatusName` IN ('{StatusName.Failed}','{StatusName.Scheduled}') LIMIT 200;"; object[] sqlParams = { diff --git a/src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs b/src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs index 9b415123b..3d6168bde 100644 --- a/src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs +++ b/src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs @@ -87,16 +87,16 @@ public async Task> GetMessagesAsync(MessageQueryDto { var tableName = queryDto.MessageType == MessageType.Publish ? _pubName : _recName; var where = string.Empty; - if (!string.IsNullOrEmpty(queryDto.StatusName)) where += " and StatusName=@StatusName"; + if (!string.IsNullOrEmpty(queryDto.StatusName)) where += " AND StatusName=@StatusName"; - if (!string.IsNullOrEmpty(queryDto.Name)) where += " and Name=@Name"; + if (!string.IsNullOrEmpty(queryDto.Name)) where += " AND Name=@Name"; - if (!string.IsNullOrEmpty(queryDto.Group)) where += " and `Group`=@Group"; + if (!string.IsNullOrEmpty(queryDto.Group)) where += " AND `Group`=@Group"; - if (!string.IsNullOrEmpty(queryDto.Content)) where += " and Content like CONCAT('%',@Content,'%')"; + if (!string.IsNullOrEmpty(queryDto.Content)) where += " AND Content LIKE CONCAT('%',@Content,'%')"; var sqlQuery = - $"select * from `{tableName}` where 1=1 {where} order by Added desc limit @Limit offset @Offset"; + $"SELECT * FROM `{tableName}` WHERE 1=1 {where} ORDER BY Added DESC LIMIT @Limit OFFSET @Offset"; object[] sqlParams = { @@ -177,10 +177,10 @@ public ValueTask ReceivedSucceededCount() private async ValueTask GetNumberOfMessage(string tableName, string statusName) { - var sqlQuery = $"select count(Id) from `{tableName}` where StatusName = @state"; + var sqlQuery = $"SELECT COUNT(Id) FROM `{tableName}` WHERE StatusName = @State"; var connection = new MySqlConnection(_options.ConnectionString); await using var _ = connection.ConfigureAwait(false); - return await connection.ExecuteScalarAsync(sqlQuery, new MySqlParameter("@state", statusName)) + return await connection.ExecuteScalarAsync(sqlQuery, new MySqlParameter("@State", statusName)) .ConfigureAwait(false); } @@ -205,22 +205,22 @@ private async Task> GetTimelineStats( IDictionary keyMaps) { var sqlQuery = $@" -SELECT aggr.* +SELECT Aggr.* FROM ( - SELECT date_format(`Added`, '%Y-%m-%d-%H') AS `Key`, - count(id) `Count` + SELECT DATE_FORMAT(`Added`, '%Y-%m-%d-%H') AS `Key`, + COUNT(`Id`) AS `Count` FROM `{tableName}` - WHERE StatusName = @statusName - GROUP BY date_format(`Added`, '%Y-%m-%d-%H') - ) aggr -WHERE `Key` >= @minKey - AND `Key` <= @maxKey;"; + WHERE `StatusName` = @StatusName + GROUP BY DATE_FORMAT(`Added`, '%Y-%m-%d-%H') + ) AS Aggr +WHERE `Key` >= @MinKey + AND `Key` <= @MaxKey;"; object[] sqlParams = { - new MySqlParameter("@statusName", statusName), - new MySqlParameter("@minKey", keyMaps.Keys.Min()), - new MySqlParameter("@maxKey", keyMaps.Keys.Max()) + new MySqlParameter("@StatusName", statusName), + new MySqlParameter("@MinKey", keyMaps.Keys.Min()), + new MySqlParameter("@MaxKey", keyMaps.Keys.Max()) }; Dictionary valuesMap; @@ -258,7 +258,7 @@ GROUP BY date_format(`Added`, '%Y-%m-%d-%H') private async Task GetMessageAsync(string tableName, long id) { - var sql = $@"SELECT `Id` as DbId, `Content`,`Added`,`ExpiresAt`,`Retries` FROM `{tableName}` WHERE Id={id};"; + var sql = $"SELECT `Id` as DbId, `Content`,`Added`,`ExpiresAt`,`Retries` FROM `{tableName}` WHERE Id={id};"; var connection = new MySqlConnection(_options.ConnectionString); await using var _ = connection.ConfigureAwait(false); diff --git a/src/DotNetCore.CAP.MySql/IStorageInitializer.MySql.cs b/src/DotNetCore.CAP.MySql/IStorageInitializer.MySql.cs index 84854d6c1..d376bf7fc 100644 --- a/src/DotNetCore.CAP.MySql/IStorageInitializer.MySql.cs +++ b/src/DotNetCore.CAP.MySql/IStorageInitializer.MySql.cs @@ -98,7 +98,8 @@ protected virtual string CreateDbTablesScript() `ExpiresAt` datetime DEFAULT NULL, `StatusName` varchar(50) NOT NULL, PRIMARY KEY (`Id`), - INDEX `IX_ExpiresAt`(`ExpiresAt`) + INDEX `IX_Version_ExpiresAt_StatusName` (`Version`, `ExpiresAt`, `StatusName`), + INDEX `IX_ExpiresAt_StatusName` (`ExpiresAt`, `StatusName`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; CREATE TABLE IF NOT EXISTS `{GetPublishedTableName()}` ( @@ -111,9 +112,9 @@ PRIMARY KEY (`Id`), `ExpiresAt` datetime DEFAULT NULL, `StatusName` varchar(40) NOT NULL, PRIMARY KEY (`Id`), - INDEX `IX_ExpiresAt`(`ExpiresAt`) + INDEX `IX_Version_ExpiresAt_StatusName` (`Version`, `ExpiresAt`, `StatusName`), + INDEX `IX_ExpiresAt_StatusName` (`ExpiresAt`, `StatusName`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; - "; if (_capOptions.Value.UseStorageLock) batchSql += $@" diff --git a/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs index 8a62956c1..b157aebdd 100644 --- a/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs @@ -208,7 +208,7 @@ public async Task DeleteExpiresAsync(string table, DateTime timeout, int ba var connection = _options.Value.CreateConnection(); await using var _ = connection.ConfigureAwait(false); return await connection.ExecuteNonQueryAsync( - $"DELETE FROM {table} WHERE \"Id\" IN (SELECT \"Id\" FROM {table} WHERE \"ExpiresAt\" < @timeout AND (\"StatusName\"='{StatusName.Succeeded}' OR \"StatusName\"='{StatusName.Failed}') LIMIT @batchCount);", + $"DELETE FROM {table} WHERE \"Id\" IN (SELECT \"Id\" FROM {table} WHERE \"ExpiresAt\" < @timeout AND \"StatusName\" IN ('{StatusName.Succeeded}','{StatusName.Failed}') LIMIT @batchCount);", null, new NpgsqlParameter("@timeout", timeout), new NpgsqlParameter("@batchCount", batchCount)) .ConfigureAwait(false); @@ -318,7 +318,7 @@ private async Task> GetMessagesOfNeedRetryAsync(strin var fourMinAgo = DateTime.Now.Subtract(lookbackSeconds); var sql = $"SELECT \"Id\",\"Content\",\"Retries\",\"Added\" FROM {tableName} WHERE \"Retries\"<@Retries " + - $"AND \"Version\"=@Version AND \"Added\"<@Added AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; + $"AND \"Version\"=@Version AND \"Added\"<@Added AND \"StatusName\" IN ('{StatusName.Failed}','{StatusName.Scheduled}') LIMIT 200;"; object[] sqlParams = { diff --git a/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs index e7c994508..e84ab18a3 100644 --- a/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs @@ -87,21 +87,21 @@ public async Task> GetMessagesAsync(MessageQueryDto var tableName = queryDto.MessageType == MessageType.Publish ? _pubName : _recName; var where = string.Empty; - if (!string.IsNullOrEmpty(queryDto.StatusName)) where += " and Lower(\"StatusName\") = Lower(@StatusName)"; + if (!string.IsNullOrEmpty(queryDto.StatusName)) where += " AND Lower(\"StatusName\") = Lower(@StatusName)"; - if (!string.IsNullOrEmpty(queryDto.Name)) where += " and Lower(\"Name\") = Lower(@Name)"; + if (!string.IsNullOrEmpty(queryDto.Name)) where += " AND Lower(\"Name\") = Lower(@Name)"; - if (!string.IsNullOrEmpty(queryDto.Group)) where += " and Lower(\"Group\") = Lower(@Group)"; + if (!string.IsNullOrEmpty(queryDto.Group)) where += " AND Lower(\"Group\") = Lower(@Group)"; - if (!string.IsNullOrEmpty(queryDto.Content)) where += " and \"Content\" ILike @Content"; + if (!string.IsNullOrEmpty(queryDto.Content)) where += " AND \"Content\" ILike @Content"; var sqlQuery = - $"select * from {tableName} where 1=1 {where} order by \"Added\" desc offset @Offset limit @Limit"; + $"SELECT * FROM {tableName} WHERE 1=1 {where} ORDER BY \"Added\" DESC OFFSET @Offset LIMIT @Limit"; var connection = _options.CreateConnection(); await using var _ = connection.ConfigureAwait(false); - var count = await connection.ExecuteScalarAsync($"select count(1) from {tableName} where 1=1 {where}", + var count = await connection.ExecuteScalarAsync($"SELECT COUNT(1) FROM {tableName} WHERE 1=1 {where}", new NpgsqlParameter("@StatusName", queryDto.StatusName ?? string.Empty), new NpgsqlParameter("@Group", queryDto.Group ?? string.Empty), new NpgsqlParameter("@Name", queryDto.Name ?? string.Empty), @@ -180,11 +180,11 @@ public async Task> HourlyFailedJobs(MessageType type) private async ValueTask GetNumberOfMessage(string tableName, string statusName) { var sqlQuery = - $"select count(\"Id\") from {tableName} where Lower(\"StatusName\") = Lower(@state)"; + $"SELECT COUNT(\"Id\") FROM {tableName} WHERE Lower(\"StatusName\") = Lower(@State)"; var connection = _options.CreateConnection(); await using var _ = connection.ConfigureAwait(false); - return await connection.ExecuteScalarAsync(sqlQuery, new NpgsqlParameter("@state", statusName)) + return await connection.ExecuteScalarAsync(sqlQuery, new NpgsqlParameter("@State", statusName)) .ConfigureAwait(false); } @@ -210,20 +210,20 @@ private async Task> GetTimelineStats( { var sqlQuery = $@" -with aggr as ( - select to_char(""Added"",'yyyy-MM-dd-HH') as ""Key"", - count(""Id"") as ""Count"" - from {tableName} - where ""StatusName"" = @statusName - group by to_char(""Added"", 'yyyy-MM-dd-HH') +WITH Aggr AS ( + SELECT to_char(""Added"",'yyyy-MM-dd-HH') AS ""Key"", + COUNT(""Id"") AS ""Count"" + FROM {tableName} + WHERE ""StatusName"" = @StatusName + GROUP BY to_char(""Added"", 'yyyy-MM-dd-HH') ) -select ""Key"",""Count"" from aggr where ""Key"" >= @minKey and ""Key"" <= @maxKey;"; +SELECT ""Key"",""Count"" from Aggr WHERE ""Key"" >= @MinKey AND ""Key"" <= @MaxKey;"; object[] sqlParams = { - new NpgsqlParameter("@statusName", statusName), - new NpgsqlParameter("@minKey", keyMaps.Keys.Min()), - new NpgsqlParameter("@maxKey", keyMaps.Keys.Max()) + new NpgsqlParameter("@StatusName", statusName), + new NpgsqlParameter("@MinKey", keyMaps.Keys.Min()), + new NpgsqlParameter("@MaxKey", keyMaps.Keys.Max()) }; Dictionary valuesMap; @@ -245,8 +245,7 @@ group by to_char(""Added"", 'yyyy-MM-dd-HH') foreach (var key in keyMaps.Keys) { - if (!valuesMap.ContainsKey(key)) - valuesMap.Add(key, 0); + valuesMap.TryAdd(key, 0); } var result = new Dictionary(); diff --git a/src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs index 580a26c37..966aa6b92 100644 --- a/src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs @@ -76,6 +76,8 @@ protected virtual string CreateDbTablesScript(string schema) ""StatusName"" VARCHAR(50) NOT NULL ); +CREATE INDEX IF NOT EXISTS ""idx_received_ExpiresAt_StatusName"" ON {GetReceivedTableName()} (""ExpiresAt"",""StatusName""); + CREATE TABLE IF NOT EXISTS {GetPublishedTableName()}( ""Id"" BIGINT PRIMARY KEY NOT NULL, ""Version"" VARCHAR(20) NOT NULL, @@ -86,6 +88,23 @@ protected virtual string CreateDbTablesScript(string schema) ""ExpiresAt"" TIMESTAMP NULL, ""StatusName"" VARCHAR(50) NOT NULL ); + +CREATE INDEX IF NOT EXISTS ""idx_published_ExpiresAt_StatusName"" ON {GetPublishedTableName()}(""ExpiresAt"",""StatusName""); + +DO $$ +DECLARE + major_version INT; +BEGIN + SELECT split_part(current_setting('server_version'), '.', 1)::int INTO major_version; + + IF major_version >= 11 THEN + CREATE INDEX IF NOT EXISTS ""idx_received_Version_ExpiresAt_StatusName"" ON {GetReceivedTableName()} (""Version"",""ExpiresAt"",""StatusName"") INCLUDE (""Id"", ""Content"", ""Retries"", ""Added""); + CREATE INDEX IF NOT EXISTS ""idx_published_Version_ExpiresAt_StatusName"" ON {GetPublishedTableName()} (""Version"",""ExpiresAt"",""StatusName"") INCLUDE (""Id"", ""Content"", ""Retries"", ""Added""); + ELSE + CREATE INDEX IF NOT EXISTS ""idx_received_Version_ExpiresAt_StatusName"" ON {GetReceivedTableName()} (""Version"",""ExpiresAt"",""StatusName""); + CREATE INDEX IF NOT EXISTS ""idx_published_Version_ExpiresAt_StatusName"" ON {GetPublishedTableName()} (""Version"",""ExpiresAt"",""StatusName""); + END IF; +END $$; "; if (_capOptions.Value.UseStorageLock) batchSql += $@" diff --git a/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs index 3c888e390..f775709b6 100644 --- a/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs @@ -208,7 +208,7 @@ public async Task DeleteExpiresAsync(string table, DateTime timeout, int ba var connection = new SqlConnection(_options.Value.ConnectionString); await using var _ = connection.ConfigureAwait(false); return await connection.ExecuteNonQueryAsync( - $"DELETE TOP (@batchCount) FROM {table} WITH (readpast) WHERE ExpiresAt < @timeout AND (StatusName='{StatusName.Succeeded}' OR StatusName='{StatusName.Failed}');", + $"DELETE TOP (@batchCount) FROM {table} WITH (READPAST) WHERE ExpiresAt < @timeout AND StatusName IN('{StatusName.Succeeded}','{StatusName.Failed}');", null, new SqlParameter("@timeout", timeout), new SqlParameter("@batchCount", batchCount)).ConfigureAwait(false); } @@ -226,9 +226,12 @@ public Task> GetReceivedMessagesOfNeedRetry(TimeSpan public async Task ScheduleMessagesOfDelayedAsync(Func, Task> scheduleTask, CancellationToken token = default) { - var sql = - $"SELECT Id,Content,Retries,Added,ExpiresAt FROM {_pubName} WITH (UPDLOCK,READPAST) WHERE Version=@Version " + - $"AND ((ExpiresAt< @TwoMinutesLater AND StatusName = '{StatusName.Delayed}') OR (ExpiresAt< @OneMinutesAgo AND StatusName = '{StatusName.Queued}'))"; + var sql = $@" + SELECT Id, Content, Retries, Added, ExpiresAt FROM {_pubName} WITH (UPDLOCK, READPAST) + WHERE Version = @Version AND StatusName = '{StatusName.Delayed}' AND ExpiresAt < @TwoMinutesLater + UNION ALL + SELECT Id, Content, Retries, Added, ExpiresAt FROM {_pubName} WITH (UPDLOCK, READPAST) + WHERE Version = @Version AND StatusName = '{StatusName.Queued}' AND ExpiresAt < @OneMinutesAgo;"; object[] sqlParams = { @@ -310,9 +313,10 @@ private async Task StoreReceivedMessage(object[] sqlParams) private async Task> GetMessagesOfNeedRetryAsync(string tableName, TimeSpan lookbackSeconds) { var fourMinAgo = DateTime.Now.Subtract(lookbackSeconds); + var sql = - $"SELECT TOP (200) Id, Content, Retries, Added FROM {tableName} WITH (readpast) WHERE Retries<@Retries " + - $"AND Version=@Version AND Added<@Added AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')"; + $"SELECT TOP (200) Id, Content, Retries, Added FROM {tableName} WITH (READPAST) " + + $"WHERE Retries < @Retries AND Version = @Version AND Added < @Added AND StatusName IN ('{StatusName.Failed}', '{StatusName.Scheduled}');"; object[] sqlParams = { diff --git a/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs index 41be73c4e..b2ffe81ba 100644 --- a/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs @@ -97,7 +97,7 @@ public async Task> GetMessagesAsync(MessageQueryDto if (!string.IsNullOrEmpty(queryDto.Content)) where += " AND [Content] LIKE @Content"; var sqlQuery2008 = - $@"SELECT * FROM (SELECT p.*, ROW_NUMBER() OVER(ORDER BY p.Added DESC) AS RowNum FROM {tableName} as p WHERE 1=1 {where}) as tbl WHERE tbl.RowNum BETWEEN @Offset AND @Offset + @Limit"; + $"SELECT * FROM (SELECT p.*, ROW_NUMBER() OVER(ORDER BY p.Added DESC) AS RowNum FROM {tableName} AS p WHERE 1=1 {where}) as tbl WHERE tbl.RowNum BETWEEN @Offset AND @Offset + @Limit"; var sqlQuery = $"SELECT * FROM {tableName} WHERE 1=1 {where} ORDER BY Added DESC OFFSET @Offset ROWS FETCH NEXT @Limit ROWS ONLY"; @@ -147,7 +147,7 @@ public async Task> GetMessagesAsync(MessageQueryDto }, sqlParams: sqlParams).ConfigureAwait(false); return new PagedQueryResult - { Items = items, PageIndex = queryDto.CurrentPage, PageSize = queryDto.PageSize, Totals = count }; + { Items = items, PageIndex = queryDto.CurrentPage, PageSize = queryDto.PageSize, Totals = count }; } public ValueTask PublishedFailedCount() @@ -183,10 +183,10 @@ public ValueTask ReceivedSucceededCount() private async ValueTask GetNumberOfMessage(string tableName, string statusName) { var sqlQuery = - $"select count(Id) from {tableName} with (nolock) where StatusName = @state"; + $"SELECT COUNT(Id) FROM {tableName} WITH (NOLOCK) WHERE StatusName = @StatusName"; var connection = new SqlConnection(_options.ConnectionString); await using var _ = connection.ConfigureAwait(false); - return await connection.ExecuteScalarAsync(sqlQuery, new SqlParameter("@state", statusName)) + return await connection.ExecuteScalarAsync(sqlQuery, new SqlParameter("@StatusName", statusName)) .ConfigureAwait(false); } @@ -205,37 +205,34 @@ private Task> GetHourlyTimelineStats(string tableName, return GetTimelineStats(tableName, statusName, keyMaps); } - private async Task> GetTimelineStats( - string tableName, - string statusName, - IDictionary keyMaps) + private async Task> GetTimelineStats(string tableName, string statusName, IDictionary keyMaps) { var sqlQuery2008 = $@" -with aggr as ( - select replace(convert(varchar, Added, 111), '/','-') + '-' + CONVERT(varchar, DATEPART(hh, Added)) as [Key], - count(Id) [Count] - from {tableName} - where StatusName = @statusName - group by replace(convert(varchar, Added, 111), '/','-') + '-' + CONVERT(varchar, DATEPART(hh, Added)) +WITH Aggr AS ( +SELECT REPLACE(CONVERT(varchar, Added, 111), '/','-') + '-' + CONVERT(varchar, DATEPART(hh, Added)) AS [Key], + COUNT(Id) [Count] +FROM {tableName} +WHERE StatusName = @StatusName +GROUP BY REPLACE(CONVERT(varchar, Added, 111), '/','-') + '-' + CONVERT(varchar, DATEPART(hh, Added)) ) -select [Key], [Count] from aggr with (nolock) where [Key] >= @minKey and [Key] <= @maxKey;"; +SELECT [Key], [Count] FROM Aggr WITH (NOLOCK) WHERE [Key] >= @MinKey AND [Key] <= @MaxKey;"; //SQL Server 2012+ var sqlQuery = $@" -with aggr as ( - select FORMAT(Added,'yyyy-MM-dd-HH') as [Key], - count(Id) [Count] - from {tableName} - where StatusName = @statusName - group by FORMAT(Added,'yyyy-MM-dd-HH') +WITH Aggr AS ( +SELECT FORMAT(Added,'yyyy-MM-dd-HH') AS [Key], + COUNT(Id) [Count] +FROM {tableName} +WHERE StatusName = @StatusName +GROUP BY FORMAT(Added,'yyyy-MM-dd-HH') ) -select [Key], [Count] from aggr with (nolock) where [Key] >= @minKey and [Key] <= @maxKey;"; +SELECT [Key], [Count] FROM Aggr WITH (NOLOCK) WHERE [Key] >= @MinKey AND [Key] <= @MaxKey;"; object[] sqlParams = { - new SqlParameter("@statusName", statusName), - new SqlParameter("@minKey", keyMaps.Keys.Min()), - new SqlParameter("@maxKey", keyMaps.Keys.Max()) + new SqlParameter("@StatusName", statusName), + new SqlParameter("@MinKey", keyMaps.Keys.Min()), + new SqlParameter("@MaxKey", keyMaps.Keys.Max()) }; Dictionary valuesMap; @@ -273,8 +270,7 @@ group by FORMAT(Added,'yyyy-MM-dd-HH') private async Task GetMessageAsync(string tableName, long id) { - var sql = - $@"SELECT TOP 1 Id AS DbId, Content, Added, ExpiresAt, Retries FROM {tableName} WITH (readpast) WHERE Id={id}"; + var sql = $"SELECT TOP 1 Id AS DbId, Content, Added, ExpiresAt, Retries FROM {tableName} WITH (READPAST) WHERE Id={id}"; var connection = new SqlConnection(_options.ConnectionString); await using var _ = connection.ConfigureAwait(false); diff --git a/src/DotNetCore.CAP.SqlServer/IStorageInitializer.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IStorageInitializer.SqlServer.cs index d1a8ad654..0d22717af 100644 --- a/src/DotNetCore.CAP.SqlServer/IStorageInitializer.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/IStorageInitializer.SqlServer.cs @@ -86,6 +86,12 @@ [StatusName] [nvarchar](50) NOT NULL, [Id] ASC )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] ) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY] + +CREATE NONCLUSTERED INDEX [IX_{GetReceivedTableName()}_Version_ExpiresAt_StatusName] ON {GetReceivedTableName()} ([Version] ASC,[ExpiresAt] ASC,[StatusName] ASC) +INCLUDE ([Id], [Content], [Retries], [Added]) + +CREATE NONCLUSTERED INDEX [IX_{GetReceivedTableName()}_ExpiresAt_StatusName] ON {GetReceivedTableName()} ([ExpiresAt] ASC,[StatusName] ASC) + END; IF OBJECT_ID(N'{GetPublishedTableName()}',N'U') IS NULL @@ -104,6 +110,12 @@ [StatusName] [nvarchar](50) NOT NULL, [Id] ASC )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] ) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY] + +CREATE NONCLUSTERED INDEX [IX_{GetPublishedTableName()}_Version_ExpiresAt_StatusName] ON {GetPublishedTableName()} ([Version] ASC,[ExpiresAt] ASC,[StatusName] ASC) +INCLUDE ([Id], [Content], [Retries], [Added]) + +CREATE NONCLUSTERED INDEX [IX_{GetPublishedTableName()}_ExpiresAt_StatusName] ON {GetPublishedTableName()} ([ExpiresAt] ASC,[StatusName] ASC) + END; "; if (_capOptions.Value.UseStorageLock)