Skip to content

Commit

Permalink
Merge pull request #4 from catcherwong/dev
Browse files Browse the repository at this point in the history
Fix some known issue of redis barrier
  • Loading branch information
catcherwong authored Feb 28, 2022
2 parents a07838a + 8e11c1c commit aef4a2e
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 19 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/build_and_it.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ jobs:
name: build on ${{ matrix.os }}
runs-on: ${{ matrix.os }}
services:
redis6379:
image: redis
ports:
- 6379:6379
mysql:
image: 'mysql:8.0'
ports:
Expand Down
14 changes: 9 additions & 5 deletions src/DtmSERedisBarrier/RedisBranchBarrier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ public static async Task RedisCheckAdjustAmount(this BranchBarrier bb, StackExch
}
catch (System.Exception ex)
{
bb.Logger?.LogWarning(ex, "RedisCheckAdjustAmount lua return :{0}", result);
bb.Logger?.LogWarning(ex, "RedisCheckAdjustAmount lua error");
throw;
}

if (result.IsNull && bb.Op == Constant.TYPE_MSG && result.Equals(Constant.ResultDuplicated))
bb.Logger?.LogDebug("RedisCheckAdjustAmount, k0={0},k1={1},k2={2},v0={3},v1={4},v2={5} lua return={6}", key, bkey1, bkey2, amount, originOp, barrierExpire, result);

if (!result.IsNull && bb.Op == Constant.TYPE_MSG && ((string)result).Equals(Constant.ResultDuplicated))
throw new DtmDuplicatedException();

if (result.IsNull && result.Equals(Constant.ResultFailure))
if (!result.IsNull && ((string)result).Equals(Constant.ResultFailure))
throw new DtmFailureException();
}

Expand All @@ -49,11 +51,13 @@ public static async Task RedisQueryPrepared(this BranchBarrier bb, StackExchange
}
catch (System.Exception ex)
{
bb.Logger?.LogWarning(ex, "RedisQueryPrepared lua return :{0}", result);
bb.Logger?.LogWarning(ex, "RedisQueryPrepared lua error");
throw;
}

if (result.IsNull && result.Equals(Constant.ResultFailure))
bb.Logger?.LogDebug("RedisQueryPrepared, key={0} lua return={1}", bkey1, result);

if (!result.IsNull && ((string)result).Equals(Constant.ResultFailure))
throw new DtmFailureException();
}
}
Expand Down
85 changes: 75 additions & 10 deletions tests/BusiGrpcService/Services/BusiApiService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,21 +140,64 @@ public override async Task<BusiReply> QueryPrepared(BusiReq request, ServerCallC
throw Dtmgrpc.DtmGImp.Utils.DtmError2GrpcError(ex);
}

public override async Task<Empty> QueryPreparedRedis(BusiReq request, ServerCallContext context)
public override async Task<Empty> TransInRedis(BusiReq request, ServerCallContext context)
{
_logger.LogInformation("TransInRedis req={req}", JsonSerializer.Serialize(request));

var barrier = _barrierFactory.CreateBranchBarrier(context);

await DoSomethingWithgRpcException(async () =>
{
await barrier.RedisCheckAdjustAmount(await GetRedis(), GetRedisAccountKey(TransInUID), (int)request.Amount, 86400);
});

try
return new Empty();
}

public override async Task<Empty> TransInRevertRedis(BusiReq request, ServerCallContext context)
{
var barrier = _barrierFactory.CreateBranchBarrier(context);

await DoSomethingWithgRpcException(async () =>
{
// NOTE: this redis connection code is only for sample, don't use in production
var config = StackExchange.Redis.ConfigurationOptions.Parse("localhost:6379");
var conn = await StackExchange.Redis.ConnectionMultiplexer.ConnectAsync(config);
await barrier.RedisQueryPrepared(conn.GetDatabase(), 86400);
}
catch (Exception ex)
await barrier.RedisCheckAdjustAmount(await GetRedis(), GetRedisAccountKey(TransInUID), -(int)request.Amount, 86400);
});

return new Empty();
}

public override async Task<Empty> TransOutRedis(BusiReq request, ServerCallContext context)
{
var barrier = _barrierFactory.CreateBranchBarrier(context);

await DoSomethingWithgRpcException(async () =>
{
Dtmgrpc.DtmGImp.Utils.DtmError2GrpcError(ex);
}
await barrier.RedisCheckAdjustAmount(await GetRedis(), GetRedisAccountKey(TransOutUID), -(int)request.Amount, 86400);
});

return new Empty();
}

public override async Task<Empty> TransOutRevertRedis(BusiReq request, ServerCallContext context)
{
var barrier = _barrierFactory.CreateBranchBarrier(context);

await DoSomethingWithgRpcException(async () =>
{
await barrier.RedisCheckAdjustAmount(await GetRedis(), GetRedisAccountKey(TransOutUID), (int)request.Amount, 86400);
});

return new Empty();
}

public override async Task<Empty> QueryPreparedRedis(BusiReq request, ServerCallContext context)
{
var barrier = _barrierFactory.CreateBranchBarrier(context);

await DoSomethingWithgRpcException(async () =>
{
await barrier.RedisQueryPrepared(await GetRedis(), 86400);
});

return new Empty();
}
Expand Down Expand Up @@ -225,6 +268,14 @@ await barrier.Call(conn, async (tx) =>

private MySqlConnection GetBarrierConn() => new("Server=localhost;port=3306;User ID=root;Password=123456;Database=dtm_barrier");

private async Task<StackExchange.Redis.IDatabase> GetRedis()
{
// NOTE: this redis connection code is only for sample, don't use in production
var config = StackExchange.Redis.ConfigurationOptions.Parse("localhost:6379");
var conn = await StackExchange.Redis.ConnectionMultiplexer.ConnectAsync(config);
return conn.GetDatabase();
}

private async Task SagaGrpcAdjustBalance(DbConnection conn, DbTransaction tx, int uid, int amount, string result)
{
_logger.LogInformation("SagaGrpcAdjustBalance uid={uid}, amount={amount}, result={result}", uid, amount, result);
Expand All @@ -239,5 +290,19 @@ await conn.ExecuteAsync(
param: new { balance = amount, user_id = uid },
transaction: tx);
}

private string GetRedisAccountKey(int uid) => $"dtm:busi:redis-account-key-{uid}";

private async Task DoSomethingWithgRpcException(Func<Task> func)
{
try
{
await func();
}
catch (Exception ex)
{
Dtmgrpc.DtmGImp.Utils.DtmError2GrpcError(ex);
}
}
}
}
3 changes: 2 additions & 1 deletion tests/BusiGrpcService/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
"Microsoft.AspNetCore": "Warning",
"Dtm": "Debug"
}
},
"AllowedHosts": "*",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

<ItemGroup>
<ProjectReference Include="..\..\src\Dtmgrpc\Dtmgrpc.csproj" />
<ProjectReference Include="..\..\src\DtmSERedisBarrier\DtmSERedisBarrier.csproj" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion tests/Dtmgrpc.IntegrationTests/GrpcCallInvokerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public async Task CallInvokerAsync()
Assert.NotNull(resp.Gid);
}

[Fact]
[Fact(Skip = "")]
public async Task DtmTimoutTest()
{
var provider = ITTestHelper.AddDtmGrpc(1);
Expand Down
60 changes: 58 additions & 2 deletions tests/Dtmgrpc.IntegrationTests/ITTestHelper.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Xunit.Abstractions;
using Xunit.Sdk;

namespace Dtmgrpc.IntegrationTests
{
Expand Down Expand Up @@ -47,11 +50,11 @@ public class TransGlobalStore
public TransGlobalStore Transaction { get; set; }
}

public static busi.BusiReq GenBusiReq(bool outFailed, bool inFailed)
public static busi.BusiReq GenBusiReq(bool outFailed, bool inFailed, int amount = 30)
{
return new busi.BusiReq
{
Amount = 30,
Amount = amount,
TransOutResult = outFailed ? "FAILURE" : "",
TransInResult = inFailed ? "FAILURE" : ""
};
Expand All @@ -71,5 +74,58 @@ public static ServiceProvider AddDtmGrpc(int dtmTimout = 10000)
var provider = services.BuildServiceProvider();
return provider;
}

public static string GetRedisAccountKey(int uid) => $"dtm:busi:redis-account-key-{uid}";

public static async Task<StackExchange.Redis.IDatabase> GetRedis()
{
// NOTE: this redis connection code is only for sample, don't use in production
var config = StackExchange.Redis.ConfigurationOptions.Parse("localhost:6379");
var conn = await StackExchange.Redis.ConnectionMultiplexer.ConnectAsync(config);
return conn.GetDatabase();
}
}

[AttributeUsage(AttributeTargets.Method, AllowMultiple = false)]
public class TestPriorityAttribute : Attribute
{
public int Priority { get; private set; }

public TestPriorityAttribute(int priority) => Priority = priority;
}

public class PriorityOrderer : ITestCaseOrderer
{
public IEnumerable<TTestCase> OrderTestCases<TTestCase>(
IEnumerable<TTestCase> testCases) where TTestCase : ITestCase
{
string assemblyName = typeof(TestPriorityAttribute).AssemblyQualifiedName!;
var sortedMethods = new SortedDictionary<int, List<TTestCase>>();
foreach (TTestCase testCase in testCases)
{
int priority = testCase.TestMethod.Method
.GetCustomAttributes(assemblyName)
.FirstOrDefault()
?.GetNamedArgument<int>(nameof(TestPriorityAttribute.Priority)) ?? 0;

GetOrCreate(sortedMethods, priority).Add(testCase);
}

foreach (TTestCase testCase in
sortedMethods.Keys.SelectMany(
priority => sortedMethods[priority].OrderBy(
testCase => testCase.TestMethod.Method.Name)))
{
yield return testCase;
}
}

private static TValue GetOrCreate<TKey, TValue>(
IDictionary<TKey, TValue> dictionary, TKey key)
where TKey : struct
where TValue : new() =>
dictionary.TryGetValue(key, out TValue result)
? result
: (dictionary[key] = new TValue());
}
}
100 changes: 100 additions & 0 deletions tests/Dtmgrpc.IntegrationTests/MsgGrpcRedisBarrierTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
using DtmSERedisBarrier;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Threading.Tasks;
using Xunit;

namespace Dtmgrpc.IntegrationTests
{
public class MsgGrpcRedisBarrierTest
{
[Fact, TestPriority(0)]
public async Task Submit_Should_Succeed()
{
var provider = ITTestHelper.AddDtmGrpc();
var transFactory = provider.GetRequiredService<IDtmTransFactory>();

// init balance
var b1 = 1000;
var b2 = 0;
await SetBalanceByUID(1, b1);
await SetBalanceByUID(2, b2);

var gid = "msg-Submit_Should_Succeed" + DateTime.Now.ToString("MMddHHmmss");
var msg = transFactory.NewMsgGrpc(gid);
var req = ITTestHelper.GenBusiReq(false, false, 30);
var busiGrpc = ITTestHelper.BuisgRPCUrl;
msg.Add(busiGrpc + "/busi.Busi/TransInRedis", req);

await msg.DoAndSubmit(busiGrpc + "/busi.Busi/QueryPreparedRedis", async bb =>
{
await bb.RedisCheckAdjustAmount(await ITTestHelper.GetRedis(), ITTestHelper.GetRedisAccountKey(1), -30, 86400);
});

await Task.Delay(2000);
var status = await ITTestHelper.GetTranStatus(gid);
Assert.Equal("succeed", status);

var b11 = await GetBalanceByUID(1);
var b21 = await GetBalanceByUID(2);

Assert.NotEqual(b1, b11);
Assert.Equal(b1 + b2, b11 + b21);
}

[Fact, TestPriority(1)]
public async Task Submit_Should_Failed()
{
var provider = ITTestHelper.AddDtmGrpc();
var transFactory = provider.GetRequiredService<IDtmTransFactory>();

// init balance
var b1 = 1000;
var b2 = 0;
await SetBalanceByUID(1, b1);
await SetBalanceByUID(2, b2);

var gid = "msg-Submit_Should_Failed" + DateTime.Now.ToString("MMddHHmmss");
var msg = transFactory.NewMsgGrpc(gid);
var req = ITTestHelper.GenBusiReq(false, false, 30);
var busiGrpc = ITTestHelper.BuisgRPCUrl;
msg.Add(busiGrpc + "/busi.Busi/TransInRedis", req);

await Assert.ThrowsAnyAsync<Exception>(async () =>
{
await msg.DoAndSubmit(busiGrpc + "/busi.Busi/QueryPreparedRedis", async bb =>
{
await Task.CompletedTask;
throw new Exception("ex");
});
});

var b11 = await GetBalanceByUID(1);
var b21 = await GetBalanceByUID(2);

Assert.Equal(b1, b11);
Assert.Equal(b2, b21);
}


private async Task<int> GetBalanceByUID(int uid)
{
var db = await ITTestHelper.GetRedis();
var key = ITTestHelper.GetRedisAccountKey(uid);
var res = await db.StringGetAsync(key);
if (res.TryParse(out int val))
{
return val;
}

throw new Exception("ex ex ex");
}

private async Task SetBalanceByUID(int uid, int amount)
{
var db = await ITTestHelper.GetRedis();
var key = ITTestHelper.GetRedisAccountKey(uid);
await db.StringSetAsync(key, amount);
}
}
}

0 comments on commit aef4a2e

Please sign in to comment.