-
-
Notifications
You must be signed in to change notification settings - Fork 3.4k
/
Copy pathLeanOptimizer.cs
447 lines (391 loc) · 17.2 KB
/
LeanOptimizer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using QuantConnect.Util;
using QuantConnect.Logging;
using QuantConnect.Configuration;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using QuantConnect.Optimizer.Objectives;
using QuantConnect.Optimizer.Parameters;
using QuantConnect.Optimizer.Strategies;
namespace QuantConnect.Optimizer
{
/// <summary>
/// Base Lean optimizer class in charge of handling an optimization job packet
/// </summary>
public abstract class LeanOptimizer : IDisposable
{
private readonly int _optimizationUpdateInterval = Config.GetInt("optimization-update-interval", 10);
private DateTime _startedAt = DateTime.UtcNow;
private DateTime _lastUpdate;
private int _failedBacktest;
private int _completedBacktest;
private volatile bool _disposed;
/// <summary>
/// The total completed backtests count
/// </summary>
protected int CompletedBacktests => _failedBacktest + _completedBacktest;
/// <summary>
/// Lock to update optimization status
/// </summary>
private object _statusLock = new object();
/// <summary>
/// The current optimization status
/// </summary>
protected OptimizationStatus Status { get; private set; } = OptimizationStatus.New;
/// <summary>
/// The optimization target
/// </summary>
protected Target OptimizationTarget { get; }
/// <summary>
/// Collection holding <see cref="ParameterSet"/> for each backtest id we are waiting to finish
/// </summary>
protected ConcurrentDictionary<string, ParameterSet> RunningParameterSetForBacktest { get; init; }
/// <summary>
/// Collection holding <see cref="ParameterSet"/> for each backtest id we are waiting to launch
/// </summary>
/// <remarks>We can't launch 1 million backtests at the same time</remarks>
protected ConcurrentQueue<ParameterSet> PendingParameterSet { get; init; }
/// <summary>
/// The optimization strategy being used
/// </summary>
protected IOptimizationStrategy Strategy { get; init; }
/// <summary>
/// The optimization packet
/// </summary>
protected OptimizationNodePacket NodePacket { get; init; }
/// <summary>
/// Indicates whether optimizer was disposed
/// </summary>
protected bool Disposed => _disposed;
/// <summary>
/// Event triggered when the optimization work ended
/// </summary>
public event EventHandler<OptimizationResult> Ended;
/// <summary>
/// Creates a new instance
/// </summary>
/// <param name="nodePacket">The optimization node packet to handle</param>
protected LeanOptimizer(OptimizationNodePacket nodePacket)
{
if (nodePacket.OptimizationParameters.IsNullOrEmpty())
{
throw new ArgumentException("Cannot start an optimization job with no parameter to optimize");
}
if (string.IsNullOrEmpty(nodePacket.Criterion?.Target))
{
throw new ArgumentException("Cannot start an optimization job with no target to optimize");
}
NodePacket = nodePacket;
OptimizationTarget = NodePacket.Criterion;
OptimizationTarget.Reached += (s, e) =>
{
// we've reached the optimization target
TriggerOnEndEvent();
};
Strategy = (IOptimizationStrategy)Activator.CreateInstance(Type.GetType(NodePacket.OptimizationStrategy));
RunningParameterSetForBacktest = new ConcurrentDictionary<string, ParameterSet>();
PendingParameterSet = new ConcurrentQueue<ParameterSet>();
Strategy.Initialize(OptimizationTarget, nodePacket.Constraints, NodePacket.OptimizationParameters, NodePacket.OptimizationStrategySettings);
Strategy.NewParameterSet += (s, parameterSet) =>
{
if (parameterSet == null)
{
// shouldn't happen
Log.Error($"Strategy.NewParameterSet({GetLogDetails()}): generated a null {nameof(ParameterSet)} instance");
return;
}
LaunchLeanForParameterSet(parameterSet);
};
}
/// <summary>
/// Starts the optimization
/// </summary>
public virtual void Start()
{
lock (RunningParameterSetForBacktest)
{
Strategy.PushNewResults(OptimizationResult.Initial);
// if after we started there are no running parameter sets means we have failed to start
if (RunningParameterSetForBacktest.Count == 0)
{
SetOptimizationStatus(OptimizationStatus.Aborted);
throw new InvalidOperationException($"LeanOptimizer.Start({GetLogDetails()}): failed to start");
}
Log.Trace($"LeanOptimizer.Start({GetLogDetails()}): start ended. Waiting on {RunningParameterSetForBacktest.Count + PendingParameterSet.Count} backtests");
}
SetOptimizationStatus(OptimizationStatus.Running);
ProcessUpdate(forceSend: true);
}
/// <summary>
/// Triggers the optimization job end event
/// </summary>
protected virtual void TriggerOnEndEvent()
{
if (_disposed)
{
return;
}
SetOptimizationStatus(OptimizationStatus.Completed);
var result = Strategy.Solution;
if (result != null)
{
var constraint = NodePacket.Constraints != null ? $"Constraints: ({string.Join(",", NodePacket.Constraints)})" : string.Empty;
Log.Trace($"LeanOptimizer.TriggerOnEndEvent({GetLogDetails()}): Optimization has ended. " +
$"Result for {OptimizationTarget}: was reached using ParameterSet: ({result.ParameterSet}) backtestId '{result.BacktestId}'. " +
$"{constraint}");
}
else
{
Log.Trace($"LeanOptimizer.TriggerOnEndEvent({GetLogDetails()}): Optimization has ended. Result was not reached");
}
// we clean up before we send an update so that the runtime stats are updated
CleanUpRunningInstance();
ProcessUpdate(forceSend: true);
Ended?.Invoke(this, result);
}
/// <summary>
/// Handles starting Lean for a given parameter set
/// </summary>
/// <param name="parameterSet">The parameter set for the backtest to run</param>
/// <param name="backtestName">The backtest name to use</param>
/// <returns>The new unique backtest id</returns>
protected abstract string RunLean(ParameterSet parameterSet, string backtestName);
/// <summary>
/// Get's a new backtest name
/// </summary>
protected virtual string GetBacktestName(ParameterSet parameterSet)
{
return "OptimizationBacktest";
}
/// <summary>
/// Handles a new backtest json result matching a requested backtest id
/// </summary>
/// <param name="jsonBacktestResult">The backtest json result</param>
/// <param name="backtestId">The associated backtest id</param>
protected virtual void NewResult(string jsonBacktestResult, string backtestId)
{
lock (RunningParameterSetForBacktest)
{
ParameterSet parameterSet;
// we take a lock so that there is no race condition with launching Lean adding the new backtest id and receiving the backtest result for that id
// before it's even in the collection 'ParameterSetForBacktest'
if (!RunningParameterSetForBacktest.TryRemove(backtestId, out parameterSet))
{
Interlocked.Increment(ref _failedBacktest);
Log.Error(
$"LeanOptimizer.NewResult({GetLogDetails()}): Optimization compute job with id '{backtestId}' was not found");
return;
}
// we got a new result if there are any pending parameterSet to run we can now trigger 1
// we do this before 'Strategy.PushNewResults' so FIFO is respected
if (PendingParameterSet.TryDequeue(out var pendingParameterSet))
{
LaunchLeanForParameterSet(pendingParameterSet);
}
var result = new OptimizationResult(null, parameterSet, backtestId);
if (string.IsNullOrEmpty(jsonBacktestResult))
{
Interlocked.Increment(ref _failedBacktest);
Log.Error(
$"LeanOptimizer.NewResult({GetLogDetails()}): Got null/empty backtest result for backtest id '{backtestId}'");
}
else
{
Interlocked.Increment(ref _completedBacktest);
result = new OptimizationResult(jsonBacktestResult, parameterSet, backtestId);
}
// always notify the strategy
Strategy.PushNewResults(result);
// strategy could of added more
if (RunningParameterSetForBacktest.Count == 0)
{
TriggerOnEndEvent();
}
else
{
ProcessUpdate();
}
}
}
/// <summary>
/// Disposes of any resources
/// </summary>
public virtual void Dispose()
{
if (_disposed)
{
return;
}
_disposed = true;
CleanUpRunningInstance();
}
/// <summary>
/// Returns the current optimization status and strategy estimates
/// </summary>
public int GetCurrentEstimate()
{
return Strategy.GetTotalBacktestEstimate();
}
/// <summary>
/// Get the current runtime statistics
/// </summary>
public Dictionary<string, string> GetRuntimeStatistics()
{
var completedCount = _completedBacktest;
var totalEndedCount = completedCount + _failedBacktest;
var runtime = DateTime.UtcNow - _startedAt;
var result = new Dictionary<string, string>
{
{ "Completed", $"{completedCount}"},
{ "Failed", $"{_failedBacktest}"},
{ "Running", $"{RunningParameterSetForBacktest.Count}"},
{ "In Queue", $"{PendingParameterSet.Count}"},
{ "Average Length", $"{(totalEndedCount > 0 ? new TimeSpan(runtime.Ticks / totalEndedCount) : TimeSpan.Zero).ToString(@"hh\:mm\:ss", CultureInfo.InvariantCulture)}"},
{ "Total Runtime", $"{runtime.ToString(@"hh\:mm\:ss", CultureInfo.InvariantCulture)}" }
};
return result;
}
/// <summary>
/// Helper method to have pretty more informative logs
/// </summary>
protected string GetLogDetails()
{
if (NodePacket.UserId == 0)
{
return $"OID {NodePacket.OptimizationId}";
}
return $"UI {NodePacket.UserId} PID {NodePacket.ProjectId} OID {NodePacket.OptimizationId} S {Status}";
}
/// <summary>
/// Handles stopping Lean process
/// </summary>
/// <param name="backtestId">Specified backtest id</param>
protected abstract void AbortLean(string backtestId);
/// <summary>
/// Sends an update of the current optimization status to the user
/// </summary>
protected abstract void SendUpdate();
/// <summary>
/// Sets the current optimization status
/// </summary>
/// <param name="optimizationStatus">The new optimization status</param>
protected virtual void SetOptimizationStatus(OptimizationStatus optimizationStatus)
{
lock (_statusLock)
{
// we never come back from an aborted/ended status
if (Status != OptimizationStatus.Aborted && Status != OptimizationStatus.Completed)
{
Status = optimizationStatus;
}
}
}
/// <summary>
/// Clean up any pending or running lean instance
/// </summary>
private void CleanUpRunningInstance()
{
PendingParameterSet.Clear();
lock (RunningParameterSetForBacktest)
{
foreach (var backtestId in RunningParameterSetForBacktest.Keys)
{
ParameterSet parameterSet;
if (RunningParameterSetForBacktest.TryRemove(backtestId, out parameterSet))
{
Interlocked.Increment(ref _failedBacktest);
try
{
AbortLean(backtestId);
}
catch
{
// pass
}
}
}
}
}
/// <summary>
/// Will determine if it's right time to trigger an update call
/// </summary>
/// <param name="forceSend">True will force send, skipping interval, useful on start and end</param>
private void ProcessUpdate(bool forceSend = false)
{
if (!forceSend && Status == OptimizationStatus.New)
{
// don't send any update until we finish the Start(), will be creating a bunch of backtests don't want to send partial/multiple updates
return;
}
try
{
var now = DateTime.UtcNow;
if (forceSend || (now - _lastUpdate > TimeSpan.FromSeconds(_optimizationUpdateInterval)))
{
_lastUpdate = now;
Log.Debug($"LeanOptimizer.ProcessUpdate({GetLogDetails()}): start sending update...");
SendUpdate();
Log.Debug($"LeanOptimizer.ProcessUpdate({GetLogDetails()}): finished sending update successfully.");
}
}
catch (Exception e)
{
Log.Error(e, "Failed to send status update");
}
}
private void LaunchLeanForParameterSet(ParameterSet parameterSet)
{
if (_disposed || Status == OptimizationStatus.Completed || Status == OptimizationStatus.Aborted)
{
return;
}
lock (RunningParameterSetForBacktest)
{
if (NodePacket.MaximumConcurrentBacktests != 0 && RunningParameterSetForBacktest.Count >= NodePacket.MaximumConcurrentBacktests)
{
// we hit the limit on the concurrent backtests
PendingParameterSet.Enqueue(parameterSet);
return;
}
try
{
var backtestName = GetBacktestName(parameterSet);
var backtestId = RunLean(parameterSet, backtestName);
if (!string.IsNullOrEmpty(backtestId))
{
Log.Trace($"LeanOptimizer.LaunchLeanForParameterSet({GetLogDetails()}): launched backtest '{backtestId}' with parameters '{parameterSet}'");
RunningParameterSetForBacktest.TryAdd(backtestId, parameterSet);
}
else
{
Interlocked.Increment(ref _failedBacktest);
// always notify the strategy
Strategy.PushNewResults(new OptimizationResult(null, parameterSet, backtestId));
Log.Error($"LeanOptimizer.LaunchLeanForParameterSet({GetLogDetails()}): Initial/null optimization compute job could not be placed into the queue");
}
ProcessUpdate();
}
catch (Exception ex)
{
Log.Error($"LeanOptimizer.LaunchLeanForParameterSet({GetLogDetails()}): Error encountered while placing optimization message into the queue: {ex.Message}");
}
}
}
}
}