Skip to content

Commit

Permalink
Worker restart on function timeout (Azure#5794)
Browse files Browse the repository at this point in the history
Restart worker process(es) on function timeout for out-of-proc languages instead of shutting down host.
  • Loading branch information
yojagad authored Mar 25, 2020
1 parent bf96005 commit b2af164
Show file tree
Hide file tree
Showing 20 changed files with 303 additions and 26 deletions.
15 changes: 15 additions & 0 deletions sample/Node/HttpTrigger-Timeout/function.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"bindings": [
{
"type": "httpTrigger",
"name": "req",
"direction": "in",
"methods": [ "get" ]
},
{
"type": "http",
"name": "$return",
"direction": "out"
}
]
}
35 changes: 35 additions & 0 deletions sample/Node/HttpTrigger-Timeout/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
var test = require('../Shared/test');

module.exports = function (context, req) {
context.log('Node.js HTTP trigger function processed a request. Name=%s', req.query.name);

var headerValue = req.headers['test-header'];
if (headerValue) {
context.log('test-header=' + headerValue);
}

var res;
if (typeof req.query.name === 'undefined') {
res = {
status: 400,
body: "Please pass a name on the query string",
headers: {
'Content-Type': 'text/plain'
}
};
}
else {
var e = new Date().getTime() + (30 * 1000);
while (new Date().getTime() <= e) {}
res = {
status: 200,
body: test.greeting(req.query.name),
headers: {
'Content-Type': 'text/plain',
'Shared-Module': test.timestamp
}
};
}

context.done(null, res);
};
2 changes: 1 addition & 1 deletion sample/Node/host.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"healthCheckThreshold": 6,
"counterThreshold": 0.80
},
"functionTimeout": "00:05:00",
"functionTimeout": "00:00:10",
"logging": {
"fileLoggingMode": "always"
},
Expand Down
26 changes: 21 additions & 5 deletions src/WebJobs.Script.WebHost/WebScriptHostExceptionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Microsoft.AspNetCore.Hosting;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Timers;
using Microsoft.Azure.WebJobs.Script.Workers;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Script.WebHost
Expand All @@ -15,10 +16,12 @@ public class WebScriptHostExceptionHandler : IWebJobsExceptionHandler
{
private readonly IApplicationLifetime _applicationLifetime;
private readonly ILogger _logger;
private readonly IFunctionInvocationDispatcherFactory _functionInvocationDispatcherFactory;

public WebScriptHostExceptionHandler(IApplicationLifetime applicationLifetime, ILogger<WebScriptHostExceptionHandler> logger)
public WebScriptHostExceptionHandler(IApplicationLifetime applicationLifetime, ILogger<WebScriptHostExceptionHandler> logger, IFunctionInvocationDispatcherFactory functionInvocationDispatcherFactory)
{
_applicationLifetime = applicationLifetime ?? throw new ArgumentNullException(nameof(applicationLifetime));
_functionInvocationDispatcherFactory = functionInvocationDispatcherFactory;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

Expand All @@ -39,17 +42,30 @@ public async Task OnTimeoutExceptionAsync(ExceptionDispatchInfo exceptionInfo, T
}
}

LogErrorAndFlush("A function timeout has occurred. Host is shutting down.", exceptionInfo.SourceException);

// We can't wait on this as it may cause a deadlock if the timeout was fired
// by a Listener that cannot stop until it has completed.
// TODO: DI (FACAVAL) The shutdown call will invoke the host stop... but we may need to do this
// explicitly in order to pass the timeout.
// Task ignoreTask = _hostManager.StopAsync();
// Give the manager and all running tasks some time to shut down gracefully.
//await Task.Delay(timeoutGracePeriod);

_applicationLifetime.StopApplication();
IFunctionInvocationDispatcher functionInvocationDispatcher = _functionInvocationDispatcherFactory.GetFunctionDispatcher();
if (functionInvocationDispatcher.State.Equals(FunctionInvocationDispatcherState.Initialized))
{
_logger.LogWarning($"A function timeout has occurred. Restarting worker process executing invocationId '{timeoutException.InstanceId}'.", exceptionInfo.SourceException);
bool result = await functionInvocationDispatcher.RestartWorkerWithInvocationIdAsync(timeoutException.InstanceId.ToString());
if (!result)
{
_logger.LogWarning($"Restarting all language worker processes since invocation Id '{timeoutException.InstanceId}' was not found.", exceptionInfo.SourceException);
await functionInvocationDispatcher.RestartAllWorkersAsync();
}
_logger.LogWarning("Restart of language worker process(es) completed.", exceptionInfo.SourceException);
}
else
{
LogErrorAndFlush("A function timeout has occurred. Host is shutting down.", exceptionInfo.SourceException);
_applicationLifetime.StopApplication();
}
}

public Task OnUnhandledExceptionAsync(ExceptionDispatchInfo exceptionInfo)
Expand Down
1 change: 0 additions & 1 deletion src/WebJobs.Script/ScriptHostBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ public static IHostBuilder AddScriptHostCore(this IHostBuilder builder, ScriptAp

//Worker Function Invocation dispatcher
services.AddSingleton<IFunctionInvocationDispatcherFactory, FunctionInvocationDispatcherFactory>();

services.AddSingleton<IScriptJobHost>(p => p.GetRequiredService<ScriptHost>());
services.AddSingleton<IJobHost>(p => p.GetRequiredService<ScriptHost>());
services.AddSingleton<IFunctionMetadataManager, FunctionMetadataManager>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,5 +185,16 @@ public Task ShutdownAsync()
{
return Task.CompletedTask;
}

public async Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId)
{
await DisposeAndRestartWorkerChannel(_httpWorkerChannel.Id); // Since there's only one channel for httpworker
return true;
}

public async Task RestartAllWorkersAsync()
{
await DisposeAndRestartWorkerChannel(_httpWorkerChannel.Id); // Since there's only one channel for httpworker
}
}
}
4 changes: 4 additions & 0 deletions src/WebJobs.Script/Workers/IFunctionInvocationDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,9 @@ public interface IFunctionInvocationDispatcher : IDisposable
Task InitializeAsync(IEnumerable<FunctionMetadata> functions, CancellationToken cancellationToken = default);

Task ShutdownAsync();

Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId);

Task RestartAllWorkersAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public async Task InvokeAsync(ScriptInvocationContext invocationContext)
}
}

internal async Task<IEnumerable<IRpcWorkerChannel>> GetInitializedWorkerChannelsAsync()
internal async Task<IEnumerable<IRpcWorkerChannel>> GetAllWorkerChannelsAsync()
{
Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>> webhostChannelDictionary = _webHostLanguageWorkerChannelManager.GetChannels(_workerRuntime);
List<IRpcWorkerChannel> webhostChannels = null;
Expand All @@ -264,11 +264,18 @@ internal async Task<IEnumerable<IRpcWorkerChannel>> GetInitializedWorkerChannels
}
}
IEnumerable<IRpcWorkerChannel> workerChannels = webhostChannels == null ? _jobHostLanguageWorkerChannelManager.GetChannels() : webhostChannels.Union(_jobHostLanguageWorkerChannelManager.GetChannels());
return workerChannels;
}

internal async Task<IEnumerable<IRpcWorkerChannel>> GetInitializedWorkerChannelsAsync()
{
IEnumerable<IRpcWorkerChannel> workerChannels = await GetAllWorkerChannelsAsync();
IEnumerable<IRpcWorkerChannel> initializedWorkers = workerChannels.Where(ch => ch.IsChannelReadyForInvocations());
if (initializedWorkers.Count() > _maxProcessCount)
{
throw new InvalidOperationException($"Number of initialized language workers exceeded:{initializedWorkers.Count()} exceeded maxProcessCount: {_maxProcessCount}");
}

return initializedWorkers;
}

Expand Down Expand Up @@ -399,5 +406,33 @@ public void Dispose()
State = FunctionInvocationDispatcherState.Disposing;
Dispose(true);
}

public async Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId)
{
// Dispose and restart errored channel with the particular invocation id
State = FunctionInvocationDispatcherState.WorkerProcessRestarting;
var channels = await GetInitializedWorkerChannelsAsync();
foreach (var channel in channels)
{
if (channel.IsExecutingInvocation(invocationId))
{
_logger.LogInformation($"Restarting channel '{channel.Id}' that is executing invocation '{invocationId}' and timed out.");
await DisposeAndRestartWorkerChannel(_workerRuntime, channel.Id);
return true;
}
}
return false;
}

public async Task RestartAllWorkersAsync()
{
State = FunctionInvocationDispatcherState.WorkerProcessRestarting;
var channels = await GetAllWorkerChannelsAsync();
foreach (var channel in channels)
{
_logger.LogInformation($"Restarting channel '{channel.Id}' that is as part of restarting all channels.");
await DisposeAndRestartWorkerChannel(_workerRuntime, channel.Id);
}
}
}
}
2 changes: 2 additions & 0 deletions src/WebJobs.Script/Workers/Rpc/IRpcWorkerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,7 @@ public interface IRpcWorkerChannel
Task StartWorkerProcessAsync();

Task DrainInvocationsAsync();

bool IsExecutingInvocation(string invocationId);
}
}
7 changes: 6 additions & 1 deletion src/WebJobs.Script/Workers/Rpc/RpcWorkerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ internal RpcWorkerChannel(

public bool IsChannelReadyForInvocations()
{
return _state.HasFlag(RpcWorkerChannelState.InvocationBuffersInitialized | RpcWorkerChannelState.Initialized);
return !_disposing && !_disposed && _state.HasFlag(RpcWorkerChannelState.InvocationBuffersInitialized | RpcWorkerChannelState.Initialized);
}

public async Task StartWorkerProcessAsync()
Expand Down Expand Up @@ -500,5 +500,10 @@ public async Task DrainInvocationsAsync()
await currContext.ResultSource.Task;
}
}

public bool IsExecutingInvocation(string invocationId)
{
return _executingInvocations.ContainsKey(invocationId);
}
}
}
10 changes: 5 additions & 5 deletions src/WebJobs.Script/Workers/Rpc/RpcWorkerChannelState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,21 @@ public enum RpcWorkerChannelState
/// <summary>
/// The Default state of LanguageWorkerChannel.
/// </summary>
Default = 0,
Default = 1 << 0,

/// <summary>
/// LanguageWorkerChannel is created. InvocationBuffers per function are setup
/// </summary>
InvocationBuffersInitialized = 1,
InvocationBuffersInitialized = 1 << 1,

/// <summary>
/// The LanguageWorkerChannel is created.Worker process is starting
/// The LanguageWorkerChannel is created. Worker process is starting
/// </summary>
Initializing = 2,
Initializing = 1 << 2,

/// <summary>
/// LanguageWorkerChannel is created. Worker process is Initialized. Rpc Channel is established.
/// </summary>
Initialized = 4,
Initialized = 1 << 3,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,28 @@ await Assert.ThrowsAsync<ApplicationException>(() => TestHelpers.Await(() => !(j
}
}

private async Task<TestFunctionHost> RunTimeoutExceptionTest(bool handleCancellation)
[Fact]
public async Task OnTimeoutException_OOP_HasExpectedLogs()
{
using (var host = await RunTimeoutExceptionTest(handleCancellation: false, timeoutFunctionName: "TimeoutSync", path: @"TestScripts\Node"))
{
var jobHostManager = host.WebHostServices.GetService<IScriptHostManager>();

// wait a few seconds to make sure the manager doesn't die
await Assert.ThrowsAsync<ApplicationException>(() => TestHelpers.Await(() => !(jobHostManager.State == ScriptHostState.Running),
timeout: 3000, throwWhenDebugging: true, userMessageCallback: () => "Expected host manager not to die"));

var messages = host.GetScriptHostLogMessages().Where(t => t?.FormattedMessage != null);
Assert.Contains(messages, t => t.FormattedMessage.StartsWith("A function timeout has occurred. Restarting worker process executing invocationId "));
Assert.Contains(messages, t => t.FormattedMessage.StartsWith("Restarting channel"));
Assert.Contains(messages, t => t.FormattedMessage == "Restart of language worker process(es) completed.");
}
}

private async Task<TestFunctionHost> RunTimeoutExceptionTest(bool handleCancellation, string timeoutFunctionName = "TimeoutToken", string path = @"TestScripts\CSharp")
{
TimeSpan gracePeriod = TimeSpan.FromMilliseconds(5000);
var host = CreateAndStartWebScriptHost();
var host = CreateAndStartWebScriptHost(timeoutFunctionName, path);

string scenarioName = handleCancellation ? "useToken" : "ignoreToken";

Expand All @@ -70,17 +88,17 @@ private async Task<TestFunctionHost> RunTimeoutExceptionTest(bool handleCancella

var jobHost = host.JobHostServices.GetService<IJobHost>();

await Assert.ThrowsAsync<FunctionTimeoutException>(() => jobHost.CallAsync("TimeoutToken", args));
await Assert.ThrowsAsync<FunctionTimeoutException>(() => jobHost.CallAsync(timeoutFunctionName, args));

return host;
}

private TestFunctionHost CreateAndStartWebScriptHost()
private TestFunctionHost CreateAndStartWebScriptHost(string timeoutFunctionName, string path)
{
var functions = new Collection<string> { "TimeoutToken" };
var functions = new Collection<string> { timeoutFunctionName };

return new TestFunctionHost(
@"TestScripts\CSharp",
path,
Path.Combine(TestHelpers.FunctionsTestDirectory, "Logs", Guid.NewGuid().ToString(), @"Functions"),
configureWebHostServices: b =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"bindings": [
{
"type": "manualTrigger",
"name": "inputData",
"name": "input",
"direction": "in"
}
]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module.exports = function (context, timerInfo) {
context.log(context.bindings.inputData);
context.log(context.bindings.input);

// Run for 30 seconds before closing. Test will timeout in 3 seconds but this cleans up.
var stop = new Date().getTime() + 10000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ protected EndToEndTestFixture(string rootPath, string testId, string functionsWo

public TestEventGenerator EventGenerator { get; private set; } = new TestEventGenerator();

public string HostInstanceId => Host.JobHostServices.GetService<IOptions<ScriptJobHostOptions>>().Value.InstanceId;

public string MasterKey { get; private set; }

protected virtual ExtensionPackageReference[] GetExtensionsToInstall()
Expand Down
Loading

0 comments on commit b2af164

Please sign in to comment.