Skip to content

Commit

Permalink
Merge pull request #631 from nhsconnect/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
ChristopherJamesMorris authored Jun 20, 2024
2 parents 9052df8 + 25d0502 commit 79f4634
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 108 deletions.
2 changes: 1 addition & 1 deletion modules/api/src/Controllers/ReportingController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public async Task<IActionResult> RouteReportRequest([FromBody] RouteReportReques
}

[HttpPost("createinteractionmessage")]
public IActionResult SendMessageToCreateInteractionReportContentAsync([FromBody] List<ReportInteractionRequest> reportInteractionRequest)
public IActionResult SendMessageToCreateInteractionReportContentAsync([FromBody] ReportInteractionRequest reportInteractionRequest)
{
var result = _service.SendMessageToCreateInteractionReportContent(reportInteractionRequest);
return Ok(result);
Expand Down
2 changes: 1 addition & 1 deletion modules/api/src/Service/Interfaces/IReportingService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public interface IReportingService
public Task<Stream> ExportReport(ReportRequest reportRequest);
public Task<Stream> CreateInteractionReport(ReportCreationRequest reportCreationRequest);
public Task<string> RouteReportRequest(RouteReportRequest routeReportRequest);
public HttpStatusCode SendMessageToCreateInteractionReportContent(List<ReportInteractionRequest> reportInteractionRequest);
public Task SendMessageToCreateInteractionReportContent(ReportInteractionRequest reportInteractionRequest);
public Task<List<Report>> GetReports();
public Task<List<CapabilityReport>> GetCapabilityReports();
public Task<MemoryStream> ExportBySpineMessage(int spineMessageId, string reportName);
Expand Down
23 changes: 1 addition & 22 deletions modules/api/src/Service/MessageService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,35 +28,14 @@ public async Task<HttpStatusCode> SendMessageToQueue(SendMessageRequest sendMess
return HttpStatusCode.ServiceUnavailable;
}


public async Task<HttpStatusCode> SendMessageBatchToQueue(SendMessageBatchRequest sendMessageBatchRequest)
{
sendMessageBatchRequest.QueueUrl = _sqsClientFactory.GetSqsQueue();
var sqsClient = _sqsClientFactory.GetSqsClient();
if (sqsClient != null)
{
var response = await sqsClient.SendMessageBatchAsync(sendMessageBatchRequest, CancellationToken.None);

if (response.Successful.Count > 0)
{
foreach (var success in response.Successful)
{
_logger.LogInformation(success.Id);
_logger.LogInformation(success.MessageId);
_logger.LogInformation(success.MD5OfMessageBody);
}
}

if (response.Failed.Count > 0)
{
foreach (var fail in response.Failed)
{
_logger.LogInformation(fail.Id);
_logger.LogInformation(fail.Code);
_logger.LogInformation(fail.Message);
_logger.LogInformation(fail.SenderFault.ToString());
}
}

return response.HttpStatusCode;
}
return HttpStatusCode.ServiceUnavailable;
Expand Down
21 changes: 10 additions & 11 deletions modules/api/src/Service/ReportingService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
using Newtonsoft.Json;
using System.Data;
using System.Linq.Dynamic.Core;
using System.Net;

namespace GpConnect.AppointmentChecker.Api.Service;

Expand All @@ -32,19 +31,19 @@ public ReportingService(ILogger<ReportingService> logger, IMessageService messag
_interactionService = interactionService ?? throw new ArgumentNullException(nameof(interactionService));
}

public HttpStatusCode SendMessageToCreateInteractionReportContent(List<ReportInteractionRequest> reportInteractionRequest)
public async Task SendMessageToCreateInteractionReportContent(ReportInteractionRequest reportInteractionRequest)
{
Parallel.ForEach(reportInteractionRequest, new ParallelOptions() { MaxDegreeOfParallelism = 30 }, request =>
var request = JsonConvert.SerializeObject(reportInteractionRequest, new JsonSerializerSettings
{
var json = JsonConvert.SerializeObject(request, new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore,
Formatting = Formatting.Indented
});
var sendMessageRequest = new SendMessageRequest() { MessageBody = json, MessageGroupId = request.MessageGroupId.ToString() };
_messageService.SendMessageToQueue(sendMessageRequest);
NullValueHandling = NullValueHandling.Ignore,
Formatting = Formatting.Indented
});

await _messageService.SendMessageToQueue(new SendMessageRequest()
{
MessageGroupId = reportInteractionRequest.MessageGroupId.ToString(),
MessageBody = request
});
return HttpStatusCode.OK;
}

public async Task<Stream> ExportReport(ReportRequest reportRequest)
Expand Down
85 changes: 44 additions & 41 deletions modules/function/src/CapabilityReportEventFunction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using Microsoft.AspNetCore.Http.Extensions;
using Newtonsoft.Json;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Net;
Expand All @@ -26,6 +25,7 @@ public class CapabilityReportEventFunction
private readonly StorageConfiguration _storageConfiguration;
private ILambdaContext _lambdaContext;
private Stopwatch _stopwatch;
private ParallelOptions _parallelOptions;

public CapabilityReportEventFunction()

Check warning on line 30 in modules/function/src/CapabilityReportEventFunction.cs

View workflow job for this annotation

GitHub Actions / build

Non-nullable field '_storageConfiguration' must contain a non-null value when exiting constructor. Consider declaring the field as nullable.

Check warning on line 30 in modules/function/src/CapabilityReportEventFunction.cs

View workflow job for this annotation

GitHub Actions / build

Non-nullable field '_lambdaContext' must contain a non-null value when exiting constructor. Consider declaring the field as nullable.
{
Expand All @@ -34,6 +34,11 @@ public CapabilityReportEventFunction()
_endUserConfiguration = JsonConvert.DeserializeObject<EndUserConfiguration>(_secretManager.Get("enduser-configuration"));

Check warning on line 34 in modules/function/src/CapabilityReportEventFunction.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference assignment.
_storageConfiguration = JsonConvert.DeserializeObject<StorageConfiguration>(_secretManager.Get("storage-configuration"));

Check warning on line 35 in modules/function/src/CapabilityReportEventFunction.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference assignment.

_parallelOptions = new ParallelOptions
{
MaxDegreeOfParallelism = Convert.ToInt32(Math.Ceiling((Environment.ProcessorCount * 0.75) * 2.0))
};

var apiUrl = _endUserConfiguration?.ApiBaseUrl ?? throw new ArgumentNullException("ApiBaseUrl");

_httpClient = new HttpClient
Expand All @@ -53,16 +58,12 @@ public async Task<HttpStatusCode> FunctionHandler(ILambdaContext lambdaContext)
{
_stopwatch.Start();
_lambdaContext = lambdaContext;
_lambdaContext.Logger.LogLine("Removing current transaction objects");
await Reset(Objects.Transient, Objects.Key, Objects.Completion);
_lambdaContext.Logger.LogLine("Obtaining list of roles");
var rolesSource = await StorageManager.Get<List<string>>(new StorageDownloadRequest { BucketName = _storageConfiguration.BucketName, Key = _storageConfiguration.RolesObject });
_lambdaContext.Logger.LogLine("Obtaining organisational ods data");
var odsList = await GetOdsData(rolesSource.ToArray());
_lambdaContext.Logger.LogLine("Setting up messages for queue");
var messages = await AddMessagesToQueue(odsList);
_lambdaContext.Logger.LogLine("Processing messages in queue");
return await ProcessMessages(messages);
var statusCode = await GenerateMessages(messages);
return statusCode;
}

private async Task<string[]> GetOdsData(string[] roles)
Expand All @@ -82,23 +83,32 @@ private async Task<string[]> GetOdsData(string[] roles)
return JsonConvert.DeserializeObject<string[]>(body, _options);
}

private async Task<List<List<MessagingRequest>>?> AddMessagesToQueue(string[] odsList)
private async Task<IEnumerable<MessagingRequest>> AddMessagesToQueue(string[] odsList)
{
var codesSuppliers = await LoadDataSource();
var dataSource = codesSuppliers.Where(x => odsList.Contains(x.OdsCode)).ToList();

var messages = new ConcurrentDictionary<MessagingRequest, int>();

if (dataSource != null && dataSource.Any())
{
var dataSourceCount = dataSource.Count;
var capabilityReports = await GetCapabilityReports();

var bag = new ConcurrentBag<List<MessagingRequest>>();
var tasks = capabilityReports.Select(async capabilityReport =>
var batchSize = 50;
var iterationCount = dataSourceCount / batchSize;

await Parallel.ForEachAsync(capabilityReports, _parallelOptions, async (capabilityReport, ct) =>
{
var x = 0;
var y = 0;

var messageGroupId = Guid.NewGuid();

var interactionRequest = new InteractionRequest
{
WorkflowId = capabilityReport.Workflow?.FirstOrDefault(),
InteractionId = capabilityReport.Interaction?.FirstOrDefault(),
WorkflowId = capabilityReport.Workflow != null ? capabilityReport.Workflow.FirstOrDefault() : null,
InteractionId = capabilityReport.Interaction != null ? capabilityReport.Interaction.FirstOrDefault() : null,
ReportName = capabilityReport.ReportName,
ReportId = capabilityReport.ReportId
};
Expand All @@ -112,49 +122,42 @@ await StorageManager.Post(new StorageUploadRequest
InputBytes = Encoding.UTF8.GetBytes(interactionBytes)
});

var start = 0;
var finish = dataSourceCount;
var increment = 100;

while (start < finish)
while (y <= iterationCount)
{
var messages = from request in dataSource.GetRange(start, !((start + increment) > finish) ? increment : finish - start)
select new MessagingRequest
{
DataSource = new DataSource() { OdsCode = request.OdsCode, SupplierName = request.SupplierName },
ReportName = capabilityReport.ReportName,
ReportId = capabilityReport.ReportId,
Interaction = capabilityReport.Interaction,
Workflow = capabilityReport.Workflow,
MessageGroupId = capabilityReport.MessageGroupId
};
bag.Add(messages.ToList());
start += increment;
messages.TryAdd(new MessagingRequest()
{
DataSource = dataSource.GetRange(x, x + batchSize > dataSourceCount ? dataSourceCount - x : batchSize),
ReportName = capabilityReport.ReportName,
Interaction = capabilityReport.Interaction,
Workflow = capabilityReport.Workflow,
MessageGroupId = messageGroupId,
ReportId = capabilityReport.ReportId
}, Environment.CurrentManagedThreadId);
x += batchSize;
y++;
}
});
await Task.WhenAll(tasks);
return bag.ToList();
}
return null;
return messages.Select(x => x.Key);
}

private async Task<HttpStatusCode> ProcessMessages(List<List<MessagingRequest>> messages)
private async Task<HttpStatusCode> GenerateMessages(IEnumerable<MessagingRequest> messagingRequests)
{
_lambdaContext.Logger.LogLine("Processing messages: " + messages.Count);
Parallel.ForEach(messages, new ParallelOptions() { MaxDegreeOfParallelism = 3 }, message =>
await Parallel.ForEachAsync(messagingRequests, _parallelOptions, async (messagingRequest, ct) =>
{
_lambdaContext.Logger.LogLine("Number of messages being sent is " + message.Count);
var json = new StringContent(JsonConvert.SerializeObject(message, null, _options),
var json = new StringContent(JsonConvert.SerializeObject(messagingRequest, null, _options),
Encoding.UTF8,
MediaTypeHeaderValue.Parse("application/json").MediaType);
var response = _httpClient.PostWithHeadersAsync("/reporting/createinteractionmessage", new Dictionary<string, string>()

await _httpClient.PostWithHeadersAsync("/reporting/createinteractionmessage", new Dictionary<string, string>()
{
[Headers.UserId] = _endUserConfiguration.UserId,
[Headers.ApiKey] = _endUserConfiguration.ApiKey
}, json).Result;
}, json);
});
_lambdaContext.Logger.LogLine("Completed Processing messages: " + messages.Count);

_stopwatch.Stop();
_lambdaContext.Logger.LogLine($"Completed generation of {messagingRequests.Count()} messages in {_stopwatch.Elapsed:mm':'ss' minutes'}");
return HttpStatusCode.OK;
}

Expand Down
5 changes: 1 addition & 4 deletions modules/function/src/CompletionFunction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@
using GpConnect.AppointmentChecker.Function.DTO.Request;
using GpConnect.AppointmentChecker.Function.Helpers;
using GpConnect.AppointmentChecker.Function.Helpers.Constants;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Net.Http.Headers;
Expand Down Expand Up @@ -93,7 +90,7 @@ private async Task BundleUpJsonResponsesAndSendReport()
responses.Add(jsonData);
}

var responseObject = responses.Select(JArray.Parse).SelectMany(token => token).OrderBy(x => x["ODS_Code"]);
var responseObject = responses.Select(JArray.Parse).SelectMany(token => token).DistinctBy(x => x["ODS_Code"]).OrderBy(x => x["ODS_Code"]);
string combinedJson = JsonConvert.SerializeObject(responseObject, Formatting.Indented);

var interactionObject = await StorageManager.Get<ReportInteraction>(new StorageDownloadRequest { BucketName = keyObject.BucketName, Key = keyObject.Key });
Expand Down
2 changes: 1 addition & 1 deletion modules/function/src/DTO/Request/MessagingRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

public class MessagingRequest
{
public DataSource DataSource { get; set; }
public List<DataSource> DataSource { get; set; }
public List<string>? Interaction { get; set; }
public List<string>? Workflow { get; set; }
public string? ReportName { get; set; } = null;
Expand Down
2 changes: 1 addition & 1 deletion modules/function/src/DTO/Request/ReportInteraction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace GpConnect.AppointmentChecker.Function.DTO.Request;

public class ReportInteraction
{
public ReportSource ReportSource { get; set; } = null;
public List<ReportSource> ReportSource { get; set; } = null;
public List<string>? Interaction { get; set; }
public List<string>? Workflow { get; set; }
public string ObjectKeyJson => $"{Helpers.Constants.Objects.Transient}_{ReportName?.ReplaceNonAlphanumeric()}_{DateTime.Now.ToString("s").ReplaceNonAlphanumeric()}_{GetTypeLabel(Interaction, Workflow)}_{ Guid.NewGuid() }.json".ToLower();
Expand Down
45 changes: 19 additions & 26 deletions modules/function/src/SQSEventFunction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContex
var reportRequest = await ProcessMessageAsync(message);
var response = await RouteReportRequest(reportRequest);

_lambdaContext.Logger.LogLine("RouteReportRequest IsSuccessStatusCode " + response.IsSuccessStatusCode);

if (response.IsSuccessStatusCode)
{
await GenerateTransientJsonForReport(reportRequest.ObjectKeyJson, response);
Expand All @@ -84,17 +82,26 @@ public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContex
var messageRequest = JsonConvert.DeserializeObject<MessagingRequest>(message.Body);
if (messageRequest != null)
{
var hierarchy = await GetOrganisationHierarchy(messageRequest.DataSource.OdsCode);
if (hierarchy != null)
{
reportInteraction = new()
var json = new StringContent(JsonConvert.SerializeObject(messageRequest.DataSource.Select(x => x.OdsCode).ToList(), null, _options),
Encoding.UTF8,
MediaTypeHeaderValue.Parse("application/json").MediaType);

var response = await _httpClient.PostWithHeadersAsync("/hierarchy", new Dictionary<string, string>()
{
[Headers.UserId] = _endUserConfiguration.UserId,
[Headers.ApiKey] = _endUserConfiguration.ApiKey
}, json);

response.EnsureSuccessStatusCode();

if (response != null)
{
var body = await response.Content.ReadAsStringAsync();
var hierarchySource = JsonConvert.DeserializeObject<List<OrganisationHierarchy>>(body, _options);

reportInteraction = new ReportInteraction()
{
ReportSource = new()
{
OdsCode = messageRequest.DataSource.OdsCode,
SupplierName = messageRequest.DataSource.SupplierName,
OrganisationHierarchy = hierarchy
},
ReportSource = messageRequest.DataSource.Select(x => new ReportSource() { OdsCode = x.OdsCode, SupplierName = x.SupplierName, OrganisationHierarchy = hierarchySource.FirstOrDefault(y => y.OdsCode == x.OdsCode) }).ToList(),
ReportName = messageRequest.ReportName,
Interaction = messageRequest.Interaction,
Workflow = messageRequest.Workflow,
Expand All @@ -111,20 +118,6 @@ public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContex
}
}

private async Task<OrganisationHierarchy> GetOrganisationHierarchy(string odsCode)
{
var response = await _httpClient.GetWithHeadersAsync($"/hierarchy/{odsCode}", new Dictionary<string, string>()
{
[Headers.UserId] = _endUserConfiguration.UserId,
[Headers.ApiKey] = _endUserConfiguration.ApiKey
});
_lambdaContext.Logger.LogLine("GetOrganisationHierarchy.StatusCode");
_lambdaContext.Logger.LogLine(response.StatusCode.ToString());
response.EnsureSuccessStatusCode();
var body = await response.Content.ReadAsStringAsync();
return JsonConvert.DeserializeObject<OrganisationHierarchy> (body, _options);
}

private async Task<HttpResponseMessage?> RouteReportRequest(ReportInteraction reportInteraction)
{
try
Expand Down

0 comments on commit 79f4634

Please sign in to comment.