Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 1639 - LambdaTestTool - Sync Invoke Endpoint #1640

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading.Tasks;
Expand All @@ -13,13 +15,14 @@ public class RuntimeApiController : ControllerBase
private const string HEADER_BREAK = "-----------------------------------";
private readonly IRuntimeApiDataStore _runtimeApiDataStore;

private static readonly ConcurrentDictionary<string, string> _registeredExtensions = new ();

public RuntimeApiController(IRuntimeApiDataStore runtimeApiDataStore)
{
_runtimeApiDataStore = runtimeApiDataStore;
}

[HttpPost("/runtime/test-event")]
[HttpPost("/2015-03-31/functions/function/invocations")]
public async Task<IActionResult> PostTestEvent()
{
using var reader = new StreamReader(Request.Body);
Expand All @@ -28,6 +31,124 @@ public async Task<IActionResult> PostTestEvent()

return Accepted();
}

[HttpPost("/2020-01-01/extension/register")]
public async Task<IActionResult> PostRegisterExtension()
{
using var reader = new StreamReader(Request.Body);
// Read and discard - do not need the body
await reader.ReadToEndAsync();

var extensionId = Guid.NewGuid().ToString();
var extensionName = Request.Headers["Lambda-Extension-Name"];
_registeredExtensions.TryAdd(extensionId, extensionName);

Response.Headers["Lambda-Extension-Identifier"] = extensionId;
return Ok();
}

[HttpGet("/2020-01-01/extension/event/next")]
public async Task<IActionResult> GetNextExtensionEvent()
{
var extensionId = Request.Headers["Lambda-Extension-Identifier"];
if (_registeredExtensions.ContainsKey(extensionId)) {
await Task.Delay(TimeSpan.FromSeconds(15));
Console.WriteLine(HEADER_BREAK);
Console.WriteLine($"Extension ID: {extensionId} - Returning NoContent");
return NoContent();
} else {
Console.WriteLine(HEADER_BREAK);
Console.Error.WriteLine($"Extension ID: {extensionId} - Was not found - Returning 404 - ");
return NotFound();
}
}

[HttpPost("/2015-03-31/functions/{functionName}/invocations")]
public async Task<IActionResult> PostTestInvokeEvent(string functionName)
{
using var reader = new StreamReader(Request.Body);
var testEvent = await reader.ReadToEndAsync();
var eventContainer = _runtimeApiDataStore.QueueEvent(testEvent);

// Need a task completion source so we can block until the event is executed.
var tcs = new TaskCompletionSource();

eventContainer.OnSuccess += () =>
{
try {
tcs.SetResult();
} catch (InvalidOperationException) {
// This can happen if both OnSuccess and OnError are called
// Oddly, this does happen 1 time in 50 million requests
// We can't check a variable because it's a race condition
}
};

eventContainer.OnError += () =>
{
try {
tcs.SetResult();
} catch (InvalidOperationException) {
// See note above
}
};

// Wait for our event to process
// TODO: This is where we can timeout if the event was not processed in time
try {
await tcs.Task.WaitAsync(TimeSpan.FromSeconds(60));
} catch (TimeoutException e) {
eventContainer.ReportErrorResponse("Task Timed Out", "Error");

return Ok(new {
StatusCode = 202,
FunctionError = "Unhandled",
ExecutedVersion = "$LATEST",
Payload = "{\"errorMessage\":\"Task Timed Out\",\"errorType\":\"Error\"}"
});
} catch (Exception e) {
// This is a catch all for any other exceptions
// We should not get here
eventContainer.ReportErrorResponse("Unhandled Error", "Error");

return Ok(new {
StatusCode = 202,
FunctionError = "Unhandled",
ExecutedVersion = "$LATEST",
Payload = "{\"errorMessage\":\"Unhandled Error\",\"errorType\":\"Error\"}"
});
}

if (eventContainer.ErrorResponse != null)
{
if (eventContainer.ErrorType == "Throttled")
{
return Ok(new {
StatusCode = 429,
FunctionError = "Throttled",
ExecutedVersion = "$LATEST",
Payload = "{\"errorMessage\":\"Rate Exceeded.\"}"
});
}
return Ok(new {
StatusCode = 200,
FunctionError = "Unhandled",
ExecutedVersion = "$LATEST",
Payload = "{\"errorMessage\":\"An error occurred.\",\"errorType\":\"Error\"}"
});
}

var response = new
{
StatusCode = 200, // Accepted
// FunctionError = null, // TODO: Set this if there was an error
// LogResult = null, // TODO: Set this to the base64-encoded last 4 KB of log data produced by the function
Payload = eventContainer.Response, // Set this to the response from the function
// ExecutedVersion = null // TODO: Set this to the version of the function that was executed
};

return Ok(response);
}

[HttpPost("/2018-06-01/runtime/init/error")]
public IActionResult PostInitError([FromHeader(Name = "Lambda-Runtime-Function-Error-Type")] string errorType, [FromBody] string error)
Expand All @@ -50,6 +171,8 @@ public async Task GetNextInvocation()
Console.WriteLine(HEADER_BREAK);
Console.WriteLine($"Next invocation returned: {activeEvent.AwsRequestId}");

// Set Deadline to 5 minutes from now in unix epoch ms
Response.Headers["Lambda-Runtime-Deadline-Ms"] = DateTimeOffset.UtcNow.AddMinutes(5).ToUnixTimeMilliseconds().ToString();
Response.Headers["Lambda-Runtime-Aws-Request-Id"] = activeEvent.AwsRequestId;
Response.Headers["Lambda-Runtime-Trace-Id"] = Guid.NewGuid().ToString();
Response.Headers["Lambda-Runtime-Invoked-Function-Arn"] = activeEvent.FunctionArn;
Expand All @@ -64,6 +187,11 @@ public async Task GetNextInvocation()
await Response.Body.WriteAsync(buffer, 0, buffer.Length);
Response.Body.Close();
}

// TODO: The response from this function is the active event
// The event gets run by the lambda that called this endpoint

// This is where we need to setup a timeout for the event
}

[HttpPost("/2018-06-01/runtime/invocation/{awsRequestId}/response")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

<p>
For Lambda functions written as executable assemblies, i.e. custom runtimes functions and top level statement functions, this page can be used for testing the functions locally.
Set the <b>AWS_LAMBDA_RUNTIME_API</b> environment variable to <b>@httpContextAccessor.HttpContext.Request.Host</b> while debugging executable assembly
Set the <b>AWS_LAMBDA_RUNTIME_API</b> environment variable to <b>@httpContextAccessor?.HttpContext?.Request?.Host</b> while debugging executable assembly
Lambda function. More information can be found in the <a href="/documentation">documentation</a>.
</p>

Expand Down Expand Up @@ -75,6 +75,13 @@
<pre class="form-control" style="@Constants.ResponseErrorStyleSizeConstraint" >@RuntimeApiModel.ActiveEvent.ErrorResponse</pre>
</p>
}
else if (RuntimeApiModel.ActiveEvent.Response == null)
{
<p>
<b>Response:</b>
<pre class="form-control" style="@Constants.ResponseErrorStyleSizeConstraint" >No Response Received</pre>
</p>
}
else
{
<p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.ObjectModel;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Amazon.Lambda.TestTool.BlazorTester.Services
{
Expand Down Expand Up @@ -47,6 +48,23 @@ public EventContainer QueueEvent(string eventBody)
{
_queuedEvents.Add(evnt);
}

// Start a task that will cancel the event after a timeout or dispatch
Task.Run(async () => {
// Wait for the event to be dispatched or timeout
try {
await evnt.DispatchedTCS.Task.WaitAsync(evnt.TimedOutCTS.Token);
} catch (TaskCanceledException) {
// If the event was cancelled then it timed out
lock(_lock) {
_queuedEvents.Remove(evnt);
}
evnt.Cancel("Lambda throttled response error");
return;
}

// If the event was not cancelled then it was dispatched, and we're good
});

RaiseStateChanged();
return evnt;
Expand All @@ -57,22 +75,28 @@ public bool TryActivateEvent(out IEventContainer activeEvent)
activeEvent = null;
lock(_lock)
{
if (!_queuedEvents.Any())
{
return false;
}
while (true) {
if (!_queuedEvents.Any())
{
return false;
}

var evnt = _queuedEvents[0];
_queuedEvents.RemoveAt(0);
evnt.EventStatus = IEventContainer.Status.Executing;
if (ActiveEvent != null)
{
_executedEvents.Add(ActiveEvent as EventContainer);
var evnt = _queuedEvents[0];
_queuedEvents.RemoveAt(0);
if (evnt.MarkExecuting()) {
if (ActiveEvent != null)
{
_executedEvents.Add(ActiveEvent as EventContainer);
}
ActiveEvent = evnt;
activeEvent = ActiveEvent;
RaiseStateChanged();
return true;
}

// If we get here there was an event but it was already failed (timed out)
// Loop around and check if there are more events
}
ActiveEvent = evnt;
activeEvent = ActiveEvent;
RaiseStateChanged();
return true;
}
}

Expand Down Expand Up @@ -213,7 +237,10 @@ public enum Status {Queued, Executing, Success, Failure}

public class EventContainer : IEventContainer
{

public Action OnSuccess { get; set; }

public Action OnError { get; set; }

private const string defaultFunctionArn = "arn:aws:lambda:us-west-2:123412341234:function:Function";
public string AwsRequestId { get; }
public string EventJson { get; }
Expand All @@ -223,13 +250,31 @@ public class EventContainer : IEventContainer

public string Response { get; private set; }

public bool MarkExecuting()
{
lock(_statusLock){
if (EventStatus == IEventContainer.Status.Queued)
{
EventStatus = IEventContainer.Status.Executing;

// Mark that the event has been dispatched
DispatchedTCS.TrySetResult();
return true;
}
}
return false;
}

public DateTime LastUpdated { get; private set; }

public TaskCompletionSource DispatchedTCS { get; private set; } = new ();
public CancellationTokenSource TimedOutCTS { get; private set; }
private readonly object _statusLock = new();
private IEventContainer.Status _status = IEventContainer.Status.Queued;
public IEventContainer.Status EventStatus
{
get => _status;
set
private set
{
_status = value;
LastUpdated = DateTime.Now;
Expand All @@ -243,6 +288,8 @@ public EventContainer(RuntimeApiDataStore dataStore, int eventCount, string even
this._dataStore = dataStore;
this.AwsRequestId = eventCount.ToString("D12");
this.EventJson = eventJson;
// TODO: Parse the JSON so we can get the timeout value
this.TimedOutCTS = new (1000);
}

public string FunctionArn
Expand All @@ -253,18 +300,42 @@ public string FunctionArn
public void ReportSuccessResponse(string response)
{
LastUpdated = DateTime.Now;
this.Response = response;
this.EventStatus = IEventContainer.Status.Success;
_dataStore.RaiseStateChanged();
lock (_statusLock) {
if (this.EventStatus == IEventContainer.Status.Executing) {
this.EventStatus = IEventContainer.Status.Success;
this.Response = response;
OnSuccess?.Invoke();
_dataStore.RaiseStateChanged();
}
}
}

public void ReportErrorResponse(string errorType, string errorBody)
{
LastUpdated = DateTime.Now;
this.ErrorType = errorType;
this.ErrorResponse = errorBody;
this.EventStatus = IEventContainer.Status.Failure;
_dataStore.RaiseStateChanged();
lock(_statusLock) {
if (EventStatus == IEventContainer.Status.Queued || this.EventStatus == IEventContainer.Status.Executing) {
this.ErrorType = errorType;
this.ErrorResponse = errorBody;
this.EventStatus = IEventContainer.Status.Failure;
OnError?.Invoke();
_dataStore.RaiseStateChanged();
}
}
}

public void Cancel(string errorBody)
{
lock(_statusLock) {
if (EventStatus == IEventContainer.Status.Queued)
{
ReportErrorResponse("Throttled", errorBody);
}
else
{
ReportErrorResponse("Failed", errorBody);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public void ConfigureServices(IServiceCollection services)
services.AddControllers();
services.AddRazorPages();
services.AddServerSideBlazor()
.AddHubOptions(options => options.MaximumReceiveMessageSize = null);
.AddHubOptions(options => options.MaximumReceiveMessageSize = null)
.AddHubOptions(options => options.ClientTimeoutInterval = TimeSpan.FromMinutes(10));
services.AddHttpContextAccessor();

services.AddBlazoredModal();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ public static async Task<TestSession> CreateSessionAsync(string host, int port)
var uriString = Utils.DetermineLaunchUrl(session.Host, session.Port, Constants.DEFAULT_HOST);
session.Client = new HttpClient()
{
BaseAddress = new Uri(uriString)
BaseAddress = new Uri(uriString),
Timeout = TimeSpan.FromMinutes(15)
};

session.Store = session.WebHost.Services.GetService<IRuntimeApiDataStore>();
Expand Down