-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueue.txt
528 lines (409 loc) · 13.4 KB
/
queue.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
--@name Queue
--@author
--@shared
--@include lkl/middleclass_extras.txt
--@include lkl/destroyable.txt
--@include lkl/table_clone_deep.txt
-- Load guard: skip the rest of this file if a global Queue already exists.
if Queue then return end
require( "lkl/middleclass_extras.txt" )
require( "lkl/destroyable.txt" )
require( "lkl/table_clone_deep.txt" )
-- Queue is declared as a class named "Queue" that extends Destroyable.
-- NOTE(review): `class` and `Destroyable` are presumably supplied by the files required above -- confirm.
Queue = class( "Queue", Destroyable )
-- Every live Queue: appended in :initialize() and removed again in :onDestroyed().
local allQueues = {}
-- Forward declaration; the function body is assigned further down in the PRIVATE FUNCTIONS section.
local getPerfLimit
-- Local aliases for the table functions used throughout this file.
-- NOTE(review): removeByValue, add and empty are not part of stock Lua; presumably provided by the host environment -- confirm.
local tableInsert = table.insert
local tableRemove = table.remove
local tableRemoveByValue = table.removeByValue
local tableAdd = table.add
local tableEmpty = table.empty
----- STATIC FUNCTIONS -----
--[[
- Creates a Queue to safely process lots of things in batches over time.
- When a Queue finishes processing all of its entries, it will call the onComplete function if one was provided, and then :stop() itself.
- If the Queue is stopped before it finishes processing all of its entries, it will not call the onComplete function.
- When a Queue is created, it is initially stopped, and must be started with :start() before it will process entries.
entryFunc: (function)
- The function to call when an entry is being processed.
- Has the form function( queue, entry ) return shouldpause end
- queue: (Queue)
- The queue the entry is being processed in. Can be used for queue:stop(), etc.
- entry: (any)
- The entry being processed.
- shouldpause: (nil or boolean or number)
- If nil, the queue will continue processing entries.
- If true, the batch of entries will stop being processed until the next interval, and the current entry will REMAIN in the queue.
- If false, the batch of entries will stop being processed until the next interval, and the current entry will be REMOVED from the queue.
- If a number, the queue will pause for that many seconds, and the current entry will REMAIN in the queue.
onComplete: (optional) (function)
- The function to call when the queue is finished processing all entries.
- The queue will be stopped before this is called.
- Has the form function( queue ) return end
- queue: (Queue)
- The queue that has finished processing its entries.
entries: (optional) (table)
- The initial entries for the queue.
- If provided, must be numerically indexed.
- Each entry will be processed in order, and can be of any type.
interval: (optional) (number)
- The time in seconds between each batch of entries to process.
- If not provided, defaults to 0.1
batchSize: (optional) (number)
- The number of entries to process at once.
- A value of 0 will try to process all remaining entries in one go.
- If not provided, defaults to 0.
perfLimit: (optional) (number)
- The fraction of the CPU limit to use before breaking out of the batch early.
- If not provided, defaults to 0.9
name: (optional) (string)
- Gives a name to the Queue for easy identification.
- Only changes results of tostring() and :getName().
- Names don't have to be unique.
--]]
function Queue:initialize( entryFunc, onComplete, entries, interval, batchSize, perfLimit, name )
    -- Route every constructor argument through its setter so that validation
    -- and default handling live in exactly one place.
    self:setEntryFunction( entryFunc )
    self:setOnComplete( onComplete )
    self:setEntries( entries )
    self:setInterval( interval )
    self:setBatchSize( batchSize )
    self:setPerfLimit( perfLimit )
    self:setName( name )

    -- A freshly created Queue is idle until :start() is called.
    self._running = false
    self:setPerfCooldown( 0.5 )

    -- Register the instance so the think hook can find it.
    allQueues[#allQueues + 1] = self
end
-- Returns a new list containing every registered Queue.
-- The list is a shallow copy, so callers may modify it freely.
function Queue.static:getAllQueues()
    local snapshot = {}

    for index = 1, #allQueues do
        snapshot[index] = allQueues[index]
    end

    return snapshot
end
----- INSTANCE FUNCTIONS -----
-- Sets the function invoked for each entry as it is processed.
-- Raises an error unless entryFunc is a function.
function Queue:setEntryFunction( entryFunc )
    if type( entryFunc ) == "function" then
        self._entryFunction = entryFunc
        return
    end

    error( "entryFunc must be a function." )
end
-- Returns the function stored by :setEntryFunction().
-- Returns nil once the Queue has been destroyed, because :onDestroyed() clears the field.
function Queue:getEntryFunction()
return self._entryFunction
end
-- Sets (or, when given nil, clears) the callback run once the Queue has no entries left.
-- Raises an error if onComplete is neither nil nor a function.
function Queue:setOnComplete( onComplete )
    if onComplete ~= nil and type( onComplete ) ~= "function" then
        error( "onComplete must be nil or a function." )
    end

    self._onComplete = onComplete
end
-- Returns the completion callback stored by :setOnComplete().
-- Note that this will return nil if no OnComplete function has been set.
function Queue:getOnComplete()
return self._onComplete
end
-- Replaces the Queue's entries.
-- nil: the Queue gets a fresh, empty entries table.
-- table: the Queue gets a deep clone of it; the table must be numerically indexed.
-- Anything else raises an error.
function Queue:setEntries( entries )
    if entries == nil then
        self._entries = {}
    elseif type( entries ) == "table" then
        self._entries = table.cloneDeep( entries )
    else
        error( "entries must be nil or a table." )
    end
end
-- Returns the exact entries table (not a copy). Modifying it will modify the queue.
-- Returns nil once the Queue has been destroyed, because :onDestroyed() clears the field.
function Queue:getEntries()
return self._entries
end
-- Returns the number of entries left in the Queue by delegating to :__len().
-- Provided as an explicit method because __len and thus custom # is broken in Lua 5.1
function Queue:getLength()
return self:__len()
end
-- Shorter alias of :getLength(); also delegates to :__len().
function Queue:len()
return self:__len()
end
-- Sets the time in seconds between batches.
-- nil selects the default of 0.1; any other value must be a non-negative number.
function Queue:setInterval( interval )
    if interval ~= nil then
        if type( interval ) ~= "number" then
            error( "interval must be nil or a number." )
        elseif interval < 0 then
            error( "interval must be greater than or equal to 0." )
        end

        self._interval = interval
    else
        self._interval = 0.1
    end
end
-- Returns the time in seconds between batches, as stored by :setInterval().
function Queue:getInterval()
return self._interval
end
-- Sets how many entries are processed per batch.
-- nil selects the default of 0, which :processBatch() treats as "no limit".
-- Any other value must be a non-negative number.
function Queue:setBatchSize( batchSize )
    if batchSize ~= nil then
        if type( batchSize ) ~= "number" then
            error( "batchSize must be nil or a number." )
        elseif batchSize < 0 then
            error( "batchSize must be greater than or equal to 0." )
        end

        self._batchSize = batchSize
    else
        self._batchSize = 0
    end
end
-- Returns the configured batch size; 0 means :processBatch() applies no per-batch limit.
function Queue:getBatchSize()
return self._batchSize
end
-- Sets the fraction of the CPU limit a batch may use before it breaks out early.
-- nil selects the default of 0.9; any other value must be a number between 0 and 1.
function Queue:setPerfLimit( perfLimit )
    if perfLimit ~= nil then
        if type( perfLimit ) ~= "number" then
            error( "perfLimit must be nil or a number." )
        elseif perfLimit < 0 or perfLimit > 1 then
            error( "perfLimit must be between 0 and 1." )
        end

        self._perfLimit = perfLimit
    else
        self._perfLimit = 0.9
    end
end
-- Returns the configured perf limit as a fraction (0 to 1), exactly as stored by :setPerfLimit().
-- Not to be confused with the file-local getPerfLimit( frac ) helper, which converts this fraction into a CPU budget.
function Queue:getPerfLimit()
return self._perfLimit
end
--[[
    - Sets how long, in seconds, the Queue pauses after a batch exceeds the perf limit.
    - A cooldown of zero means the Queue does not pause.
    - The constructor sets this to 0.5 seconds.
    - Raises an error unless cooldown is a non-negative number.
--]]
function Queue:setPerfCooldown( cooldown )
    if type( cooldown ) ~= "number" then
        error( "cooldown must be a number." )
    elseif cooldown < 0 then
        error( "cooldown must be greater than or equal to 0." )
    end

    self._perfCooldown = cooldown
end
-- Returns the pause duration, in seconds, applied when a batch exceeds the perf limit.
function Queue:getPerfCooldown()
return self._perfCooldown
end
-- Sets (or, when given nil, clears) the Queue's display name.
-- Raises an error if name is neither nil nor a string.
function Queue:setName( name )
    if name ~= nil and type( name ) ~= "string" then
        error( "name must be nil or a string." )
    end

    self._name = name
end
-- Returns the name stored by :setName().
-- Note that this may return nil if the name is not set.
function Queue:getName()
return self._name
end
-- Marks the Queue as running and makes its next batch due immediately.
-- Calling this on a Queue that is already running resets any pending pause.
function Queue:start()
    self._nextBatchTime = timer.realtime()
    self._running = true
end
-- Marks the Queue as not running, so the think hook skips it.
-- Entries are left untouched; call :start() to resume processing.
function Queue:stop()
self._running = false
end
-- Sets the next batch process time to the current time + duration (in seconds).
-- Does effectively nothing if the Queue is already stopped, since :start() resets the next batch time.
-- Raises an error unless duration is a non-negative number.
function Queue:pause( duration )
    if type( duration ) ~= "number" then
        error( "duration must be a number." )
    elseif duration < 0 then
        error( "duration must be greater than or equal to 0." )
    end

    self._nextBatchTime = timer.realtime() + duration
end
-- Returns true while the Queue is started: false after construction, :stop() or destruction, true after :start().
function Queue:isRunning()
return self._running
end
--[[
    - Inserts a single entry into the Queue.
    entry: (any)
        - The entry to insert. Must not be nil.
    index: (optional) (number)
        - The position to insert the entry at.
        - When omitted, the entry is appended to the end of the Queue (queue:len() + 1).
--]]
function Queue:addEntry( entry, index )
    if entry == nil then
        error( "entry cannot be nil." )
    end

    local entries = self._entries

    if not index then
        tableInsert( entries, entry )
        return
    end

    tableInsert( entries, index, entry )
end
-- Same as :addEntry(), and additionally starts the Queue when it is not already running.
function Queue:addEntryAndStart( entry, index )
    self:addEntry( entry, index )

    if self:isRunning() then return end

    self:start()
end
-- Inserts several entries into the Queue at once.
-- entries must be a sequential table.
-- With startIndex, the entries are inserted at that position in their original order;
-- without it, they are appended to the end of the Queue.
function Queue:addEntries( entries, startIndex )
    if type( entries ) ~= "table" then
        error( "entries must be a table." )
    end

    local target = self._entries

    if not startIndex then
        tableAdd( target, entries )
        return
    end

    -- Walk the source backwards: inserting each item at the same index leaves
    -- the items in their original order once all of them are in place.
    local cursor = #entries
    while cursor >= 1 do
        tableInsert( target, startIndex, entries[cursor] )
        cursor = cursor - 1
    end
end
-- Same as :addEntries(), and additionally starts the Queue when it is not already running.
function Queue:addEntriesAndStart( entries, startIndex )
    self:addEntries( entries, startIndex )

    if self:isRunning() then return end

    self:start()
end
--[[
    - Removes the entry at a given position from the Queue.
    index: (number or nil)
        - The position to remove.
        - When omitted, the first entry (1) is removed.
--]]
function Queue:removeEntry( index )
    if not index then
        index = 1
    end

    if type( index ) ~= "number" then
        error( "index must be a number." )
    end

    tableRemove( self._entries, index )
end
--[[
    - Removes the first entry in the Queue that matches the specified value.
    val: (any)
        - The value to look for and remove. Must not be nil.
--]]
function Queue:removeEntryByValue( val )
    if val ~= nil then
        tableRemoveByValue( self._entries, val )
        return
    end

    error( "val cannot be nil." )
end
--[[
- Removes all entries from the Queue.
- Does not stop the Queue. A running Queue that finds no entries on its next batch stops itself and calls onComplete, if one is set.
- NOTE(review): relies on table.empty clearing the existing table in place -- confirm against the host environment.
--]]
function Queue:clear()
tableEmpty( self._entries )
end
--[[
    - Takes an entry out of its current position and re-adds it at the end of the Queue.
    index: (number or nil)
        - The position of the entry to move.
        - When omitted, the first entry (1) is moved.
    - Raises an error if index is not a number or if there is no entry at that position.
--]]
function Queue:moveEntryToEnd( index )
    if not index then
        index = 1
    end

    if type( index ) ~= "number" then
        error( "index must be a number." )
    end

    local list = self._entries
    local moved = list[index]

    if moved == nil then
        error( "No entry at index " .. index )
    end

    tableRemove( list, index )
    list[#list + 1] = moved
end
--[[
    - Force a batch to be processed, ignoring the interval and overriding the batch size, if provided.
    - Will run even if the Queue is stopped or paused.
    - Will set the next batch time automatically, so be careful about calling this while the batch is paused.
    - The batch ends early when:
        - cpu usage exceeds the perf limit (the Queue then pauses for the perf cooldown),
        - the entry function returns true, false or a number,
        - the entry function destroys the Queue,
        - the entry function stops a Queue that was running when the batch began.
    - When no entries are left, the Queue is stopped and onComplete is called, if one is set.
    batchSize: (optional) (number)
        - The maximum number of entries to process in this batch. 0 means no limit.
        - Defaults to the Queue's own batch size.
--]]
function Queue:processBatch( batchSize )
    batchSize = batchSize or self:getBatchSize()
    if batchSize == 0 then
        batchSize = math.huge
    end

    -- Remembered so that a :stop() issued from inside the entry function can end the batch,
    -- while a forced batch on an already-stopped Queue still runs as documented.
    local wasRunning = self:isRunning()

    local now = timer.realtime()
    local perfLimit = getPerfLimit( self:getPerfLimit() )
    self._nextBatchTime = now + self:getInterval()

    for _ = 1, batchSize do
        if cpuAverage() > perfLimit then
            self:pause( self:getPerfCooldown() )
            return
        end

        -- Re-read the entries table on every iteration: the entry function may have called :setEntries().
        local entry = self:getEntries()[1]
        if entry == nil then
            self:stop()
            if self._onComplete then
                self:_onComplete()
            end
            return
        end

        local shouldpause = self:_entryFunction( entry )

        -- :onDestroyed() clears _entries, so a missing table here means the entry function
        -- destroyed this Queue. There is nothing left to remove or schedule.
        local entries = self._entries
        if entries == nil then
            return
        end

        if shouldpause == nil then
            tableRemove( entries, 1 )

            -- The entry function stopped the Queue mid-batch. Honor that instead of draining
            -- the remaining entries and firing onComplete on a Queue the caller stopped.
            if wasRunning and not self:isRunning() then
                return
            end
        elseif shouldpause == true then
            -- Keep the entry and retry it on the next interval.
            return
        elseif shouldpause == false then
            -- Drop the entry, but end this batch.
            tableRemove( entries, 1 )
            return
        elseif type( shouldpause ) == "number" then
            -- Keep the entry and wait the requested number of seconds.
            self:pause( shouldpause )
            return
        else
            error( "EntryFunction must return nil, true, false, or a number." )
        end
    end
end
----- META FUNCTIONS -----
-- Concatenation metamethod: joins the string forms of both operands, left operand first.
function Queue:__concat( other )
    local left = tostring( self )
    local right = tostring( other )

    return left .. right
end
-- Length metamethod: returns the number of entries left in the Queue.
-- :getLength() and :len() call this directly, for environments where # does not dispatch to __len.
function Queue:__len() -- Get the number of entries left in the Queue
return #self:getEntries()
end
----- IMPLEMENTED FUNCTIONS -----
-- Cleanup for a destroyed Queue: drops its references, marks it as not running
-- and removes it from the list of all Queues.
-- NOTE(review): presumably invoked by the Destroyable base class on destruction -- confirm.
function Queue:onDestroyed()
    local clearedFields = {
        "_entryFunction",
        "_onComplete",
        "_entries",
        "_interval",
        "_batchSize",
        "_perfLimit",
        "_name",
    }

    for i = 1, #clearedFields do
        self[clearedFields[i]] = nil
    end

    self._running = false
    tableRemoveByValue( allQueues, self )
end
-- Returns the Queue's name (possibly nil).
-- NOTE(review): presumably consumed by the base class when building the tostring() result -- confirm.
function Queue:tostringInner()
return self:getName()
end
----- PRIVATE FUNCTIONS -----
-- Converts a fraction of the cpu limit into a budget for this chip alone.
-- The cpu usage of other owned chips is subtracted up front, so loops can then compare against
-- the cheap cpuAverage() instead of calling the slow cpuTotalAverage() on every iteration.
-- Returns 0 when the total usage has already reached the limit.
function getPerfLimit( frac )
    local budget = cpuMax() * frac
    local totalUsage = cpuTotalAverage()

    if totalUsage >= budget then
        return 0
    end

    local usageByOthers = totalUsage - cpuAverage()

    return budget - usageByOthers
end
----- SETUP -----
-- Drives every running Queue: once per think, processes a batch for each Queue whose next batch time has arrived.
hook.add( "think", "LKL_QueueCreator_ProcessQueues", function()
    local now = timer.realtime()

    -- Iterate over a snapshot. Entry and onComplete functions run user code that may create or
    -- destroy Queues, and :onDestroyed() removes from allQueues. Looping over the live list could
    -- then index past its end (nil queue) or skip Queues that shifted down.
    local queues = {}
    for i = 1, #allQueues do
        queues[i] = allQueues[i]
    end

    for i = 1, #queues do
        local queue = queues[i]

        -- A Queue destroyed earlier in this tick reports not running (:onDestroyed() resets the flag), so it is skipped.
        -- NOTE(review): assumes methods remain callable on a destroyed Queue -- confirm against Destroyable.
        if queue:isRunning() then
            local nextBatchTime = queue._nextBatchTime or 0
            if now >= nextBatchTime then
                queue:processBatch()
            end
        end
    end
end )