Get scheduler reliably running and rescheduling tasks

This commit is contained in:
teknomunk 2024-04-14 13:11:28 +00:00
parent 09f034de16
commit 27fb96afdf
3 changed files with 121 additions and 54 deletions

View file

@ -15,28 +15,39 @@ end
-- Amoritized O(1) insert/delete functional First In, First Out (FIFO) queue
local fifo = Class()
mod.fifo = fifo
function fifo.insert(node)
node.next = fifo.inbox
fifo.inbox = node.next
function fifo:insert(node)
if not node then return end
node.next = self.inbox
self.inbox = node
end
function fifo.get()
function fifo:insert_many(nodes)
while nodes do
local node = nodes
nodes = nodes.next
node.next = self.inbox
self.inbox = node.next
end
end
function fifo:get()
if not fifo.outbox then
-- reverse inbox
local iter = fifo.inbox
fifo.inbox = nil
local iter = self.inbox
self.inbox = nil
while iter do
local i = iter
iter = iter.next
i.next = fifo.outbox
fifo.outbox = i
i.next = self.outbox
self.outbox = i
end
end
local res = fifo.outbox
local res = self.outbox
if res then
fifo.outbox = res.next
self.outbox = res.next
res.next = nil
end
return res

View file

@ -12,12 +12,13 @@ local run_queues = {}
for i = 1,4 do
run_queues[i] = mod.fifo:new()
end
local tasks = 0
local time = 0
local priority_queue = mod.queue:new()
local functions = {}
local function_id_from_name = {}
local table_unpack = table.unpack
local unpack = unpack
local minetest_get_us_time = minetest.get_us_time
local queue_add_task = mod.queue.add_task
local queue_get = mod.queue.get
@ -25,11 +26,24 @@ local queue_tick = mod.queue.tick
local fifo_insert = mod.fifo.insert
local fifo_get = mod.fifo.get
function mod.add_task(time, name)
function mod.add_task(time, name, priority, args)
if priority then
if priority > 4 then priority = 4 end
if priority < 1 then priority = 1 end
end
local fid = function_id_from_name[name]
if not fid then
print("Trying to add task with unknown function "..name)
return
end
local dtime = math.floor(time * 20) + 1
local task = {
time = time,
time = dtime, -- Used by scheduler to track how long until this task is dispatched
dtime = dtime, -- Original time amount
fid = fid,
priority = priority,
args = args,
}
queue_add_task(priority_queue, task)
end
@ -41,12 +55,22 @@ function mod.register_function(name, func)
name = name,
fid = fid,
}
function_id_from_name = name
function_id_from_name[name] = fid
print("Registering "..name.." as #"..tostring(fid))
end
mod.register_function("vl_scheduler:test",function(task)
print("game time="..tostring(minetest.get_gametime()))
-- Reschedule task
task.time = 0.25 * 20
return task
end)
mod.add_task(0, "vl_scheduler:test")
minetest.register_globalstep(function(dtime)
local start_time = minetest_get_us_time()
local end_time = start_time + 50000
time = time + dtime
-- Add tasks to the run queues
@ -56,34 +80,43 @@ minetest.register_globalstep(function(dtime)
iter = iter.next
local priority = task.priority or 3
fifo_insert(run_queues[priority], task)
tasks = tasks + 1
end
local task_time = minetest_get_us_time()
--print("Took "..tostring(task_time-start_time).." us to update task list")
-- Run tasks until we run out of timeslice
if tasks > 0 then
local i = 1
while i < 4 and (minetest_get_us_time() - start_time) < 50000 do
while i < 4 and minetest_get_us_time() < end_time do
local task = fifo_get(run_queues[i])
if task then
print("Running task "..dump(task))
tasks = tasks - 1
local func = functions[task.fid]
local cancel = false
if func then
local err
cancel,err = pcall(func, task.dtime, table_unpack(task.args))
if err then
minetest.log("error","Error while running task: err")
end
--print("Running task "..dump(task)..",func="..dump(func))
local ok,ret = pcall(func.func, task, unpack(task.args or {}))
if not ok then
minetest.log("error","Error while running task "..func.name..": "..tostring(ret))
end
-- Add periodic tasks back into the queue
if task.period and not cancel then
task.time = task.period
-- If the task was returned, reschedule it
if ret == task then
task.next = nil
queue_add_task(priority_queue, task)
end
local next_task_time = minetest_get_us_time()
print(func.name.." took "..(next_task_time-task_time).." us")
task_time = next_task_time
end
else
i = i + 1
end
end
end
print("Total scheduler time: "..tostring(minetest_get_us_time() - start_time).." microseconds")
--print("priority_queue="..dump(priority_queue))
end)

View file

@ -1,4 +1,5 @@
local mod = vl_scheduler
local DEBUG = false
--[[
@ -40,6 +41,7 @@ local inner_queue_init_slot_list
function inner_queue:construct(level)
self.level = level
self.slots = 4
--self.items = {}
self.unsorted_count = 0
@ -53,7 +55,7 @@ end
inner_queue_construct = inner_queue.construct
function inner_queue:get()
local slots = 4
local slots = self.slots
local slot = 5 - slots
if not self.items then
self.items = inner_queue_init_slot_list(self)
@ -69,11 +71,13 @@ function inner_queue:get()
if next_level_get then
self.items = next_level_get.items
else
self.items = {}
self.items = inner_queue_init_slot_list(self)
end
else
self.items = inner_queue_init_slot_list(self)
end
slots = 4
end
end
self.slots = slots
return ret
@ -86,8 +90,10 @@ function inner_queue:insert_task(task)
local level = self.level
local t = task.time
--task.log = tostring(t).."(1)<- "..(task.log or "")
--print("<"..tostring(self.level).."> t="..tostring(t)..",task.time="..tostring(task.time)..",time="..tostring(time))
if DEBUG then
task.log = tostring(t).."(1)<- "..(task.log or "")
print("<"..tostring(self.level).."> t="..tostring(t)..",task.time="..tostring(task.time)..",time="..tostring(time))
end
if not (t >= 1 ) then
error("Invalid time: task="..dump(task))
end
@ -113,12 +119,18 @@ function inner_queue:insert_task(task)
end
-- Task belongs in a slot on this level
--print("t="..tostring(t)..",slot_size="..tostring(slot_size)..",slots="..tostring(slots))
local slot = math.floor((t-1) / slot_size) + 1 -- + ( slots - 4 )
if DEBUG then
print("t="..tostring(t)..",slot_size="..tostring(slot_size)..",slots="..tostring(slots))
end
local slot = math.floor((t-1) / slot_size) + 1 + ( 4 - slots )
t = (t - 1) % slot_size + 1
--print("slot="..tostring(slot)..",t="..tostring(t))
if DEBUG then
print("slot="..tostring(slot)..",t="..tostring(t)..",slots="..tostring(slots))
end
task.time = t
--task.log = tostring(t).."(2)<- "..(task.log or "")
if DEBUG then
task.log = tostring(t).."(2)<- "..(task.log or "")
end
-- Lazily initialize items
if not self.items then
@ -127,27 +139,30 @@ function inner_queue:insert_task(task)
-- Get the sublist the item belongs in
local list = self.items[slot]
if not list then
print("self="..dump(self))
end
if level == 1 then
assert(task.time <= 20)
task.next = list[t]
list[t] = task
--print("list="..dump(list))
else
--print("list="..dump(list))
inner_queue_insert_task(list, task, 0)
end
end
inner_queue_insert_task = inner_queue.insert_task
function inner_queue:add_tasks(tasks, time)
--print("inner_queue<"..tostring(self.level)..">:add_tasks()")
if DEBUG then
print("inner_queue<"..tostring(self.level)..">:add_tasks()")
end
local task = tasks
local slots = self.slots
local slot_size = self.slot_size
--print("This queue handles times 1-"..tostring(slot_size*slots))
if DEBUG then
print("This queue handles times 1-"..tostring(slot_size*slots))
end
while task do
local curr_task = task
task = task.next
@ -157,7 +172,9 @@ function inner_queue:add_tasks(tasks, time)
inner_queue_insert_task(self, curr_task)
end
--print("self="..dump(self))
if DEBUG then
print("self="..dump(self))
end
end
inner_queue_add_tasks = inner_queue.add_tasks
@ -188,7 +205,9 @@ function queue:add_task(task)
local t = task.time
task.original_time = t
t = t + self.m_tick
--print("add_task({ time="..tostring(t).." })")
if DEBUG then
print("add_task({ time="..tostring(t).." })")
end
-- Handle task in current seccond
if t <= 20 then
@ -197,6 +216,10 @@ function queue:add_task(task)
return
end
-- Update task time
t = t - 20
task.time = t
local count = self.unsorted_count
if count > 20 then
-- Push to next level
@ -225,15 +248,15 @@ function queue:tick()
if self.m_tick == 21 then
-- Push items to next level
if self.first_unsorted then
inner_queue_add_tasks(self.next_level, self.first_unsorted, 20)
inner_queue_add_tasks(self.next_level, self.first_unsorted, 0)
self.first_unsorted = nil
self.unsorted_count = 0
end
self.items = inner_queue_get(self.next_level)
self.items = inner_queue_get(self.next_level) or {}
self.m_tick = 1
end
return ret or {}
return ret
end