From 27fb96afdf7a8c762cf6b1ed87b23ddc51238147 Mon Sep 17 00:00:00 2001 From: teknomunk Date: Sun, 14 Apr 2024 13:11:28 +0000 Subject: [PATCH] Get scheduler reliably running and rescheduling tasks --- mods/CORE/vl_scheduler/fifo.lua | 31 ++++++++---- mods/CORE/vl_scheduler/init.lua | 81 ++++++++++++++++++++++---------- mods/CORE/vl_scheduler/queue.lua | 63 +++++++++++++++++-------- 3 files changed, 121 insertions(+), 54 deletions(-) diff --git a/mods/CORE/vl_scheduler/fifo.lua b/mods/CORE/vl_scheduler/fifo.lua index 92cfabb3c..db864c69c 100644 --- a/mods/CORE/vl_scheduler/fifo.lua +++ b/mods/CORE/vl_scheduler/fifo.lua @@ -15,28 +15,39 @@ end -- Amoritized O(1) insert/delete functional First In, First Out (FIFO) queue local fifo = Class() mod.fifo = fifo -function fifo.insert(node) - node.next = fifo.inbox - fifo.inbox = node.next +function fifo:insert(node) + if not node then return end + + node.next = self.inbox + self.inbox = node end -function fifo.get() +function fifo:insert_many(nodes) + while nodes do + local node = nodes + nodes = nodes.next + + node.next = self.inbox + self.inbox = node.next + end +end +function fifo:get() if not fifo.outbox then -- reverse inbox - local iter = fifo.inbox - fifo.inbox = nil + local iter = self.inbox + self.inbox = nil while iter do local i = iter iter = iter.next - i.next = fifo.outbox - fifo.outbox = i + i.next = self.outbox + self.outbox = i end end - local res = fifo.outbox + local res = self.outbox if res then - fifo.outbox = res.next + self.outbox = res.next res.next = nil end return res diff --git a/mods/CORE/vl_scheduler/init.lua b/mods/CORE/vl_scheduler/init.lua index ff7a2b830..e9cd56fcf 100644 --- a/mods/CORE/vl_scheduler/init.lua +++ b/mods/CORE/vl_scheduler/init.lua @@ -12,12 +12,13 @@ local run_queues = {} for i = 1,4 do run_queues[i] = mod.fifo:new() end +local tasks = 0 local time = 0 local priority_queue = mod.queue:new() local functions = {} local function_id_from_name = {} -local table_unpack = table.unpack +local unpack = unpack local minetest_get_us_time = minetest.get_us_time local queue_add_task = mod.queue.add_task local queue_get = mod.queue.get @@ -25,11 +26,24 @@ local queue_tick = mod.queue.tick local fifo_insert = mod.fifo.insert local fifo_get = mod.fifo.get -function mod.add_task(time, name) +function mod.add_task(time, name, priority, args) + if priority then + if priority > 4 then priority = 4 end + if priority < 1 then priority = 1 end + end + local fid = function_id_from_name[name] + if not fid then + print("Trying to add task with unknown function "..name) + return + end + local dtime = math.floor(time * 20) + 1 local task = { - time = time, + time = dtime, -- Used by scheduler to track how long until this task is dispatched + dtime = dtime, -- Original time amount fid = fid, + priority = priority, + args = args, } queue_add_task(priority_queue, task) end @@ -41,12 +55,22 @@ function mod.register_function(name, func) name = name, fid = fid, } - function_id_from_name = name + function_id_from_name[name] = fid print("Registering "..name.." as #"..tostring(fid)) end +mod.register_function("vl_scheduler:test",function(task) + print("game time="..tostring(minetest.get_gametime())) + + -- Reschedule task + task.time = 0.25 * 20 + return task +end) +mod.add_task(0, "vl_scheduler:test") + minetest.register_globalstep(function(dtime) local start_time = minetest_get_us_time() + local end_time = start_time + 50000 time = time + dtime -- Add tasks to the run queues @@ -56,34 +80,43 @@ minetest.register_globalstep(function(dtime) iter = iter.next local priority = task.priority or 3 + fifo_insert(run_queues[priority], task) + tasks = tasks + 1 end + local task_time = minetest_get_us_time() + --print("Took "..tostring(task_time-start_time).." us to update task list") -- Run tasks until we run out of timeslice - local i = 1 - while i < 4 and (minetest_get_us_time() - start_time) < 50000 do - local task = fifo_get(run_queues[i]) - if task then - print("Running task "..dump(task)) - local func = functions[task.fid] - local cancel = false - if func then - local err - cancel,err = pcall(func, task.dtime, table_unpack(task.args)) - if err then - minetest.log("error","Error while running task: err") - end - end + if tasks > 0 then + local i = 1 + while i < 4 and minetest_get_us_time() < end_time do + local task = fifo_get(run_queues[i]) + if task then + tasks = tasks - 1 + local func = functions[task.fid] + if func then + --print("Running task "..dump(task)..",func="..dump(func)) + local ok,ret = pcall(func.func, task, unpack(task.args or {})) + if not ok then + minetest.log("error","Error while running task "..func.name..": "..tostring(ret)) + end - -- Add periodic tasks back into the queue - if task.period and not cancel then - task.time = task.period - queue_add_task(priority_queue, task) + -- If the task was returned, reschedule it + if ret == task then + task.next = nil + queue_add_task(priority_queue, task) + end + local next_task_time = minetest_get_us_time() + print(func.name.." took "..(next_task_time-task_time).." us") + task_time = next_task_time + end + else + i = i + 1 end - else - i = i + 1 end end print("Total scheduler time: "..tostring(minetest_get_us_time() - start_time).." microseconds") + --print("priority_queue="..dump(priority_queue)) end) diff --git a/mods/CORE/vl_scheduler/queue.lua b/mods/CORE/vl_scheduler/queue.lua index d7a4dd2ad..d981e8754 100644 --- a/mods/CORE/vl_scheduler/queue.lua +++ b/mods/CORE/vl_scheduler/queue.lua @@ -1,4 +1,5 @@ local mod = vl_scheduler +local DEBUG = false --[[ @@ -40,6 +41,7 @@ local inner_queue_init_slot_list function inner_queue:construct(level) self.level = level + self.slots = 4 --self.items = {} self.unsorted_count = 0 @@ -53,7 +55,7 @@ end inner_queue_construct = inner_queue.construct function inner_queue:get() - local slots = 4 + local slots = self.slots local slot = 5 - slots if not self.items then self.items = inner_queue_init_slot_list(self) @@ -69,10 +71,12 @@ function inner_queue:get() if next_level_get then self.items = next_level_get.items else - self.items = {} + self.items = inner_queue_init_slot_list(self) end - slots = 4 + else + self.items = inner_queue_init_slot_list(self) end + slots = 4 end self.slots = slots @@ -86,8 +90,10 @@ function inner_queue:insert_task(task) local level = self.level local t = task.time - --task.log = tostring(t).."(1)<- "..(task.log or "") - --print("<"..tostring(self.level).."> t="..tostring(t)..",task.time="..tostring(task.time)..",time="..tostring(time)) + if DEBUG then + task.log = tostring(t).."(1)<- "..(task.log or "") + print("<"..tostring(self.level).."> t="..tostring(t)..",task.time="..tostring(task.time)..",time="..tostring(time)) + end if not (t >= 1 ) then error("Invalid time: task="..dump(task)) end @@ -113,12 +119,18 @@ function inner_queue:insert_task(task) end -- Task belongs in a slot on this level - --print("t="..tostring(t)..",slot_size="..tostring(slot_size)..",slots="..tostring(slots)) - local slot = math.floor((t-1) / slot_size) + 1 -- + ( slots - 4 ) + if DEBUG then + print("t="..tostring(t)..",slot_size="..tostring(slot_size)..",slots="..tostring(slots)) + end + local slot = math.floor((t-1) / slot_size) + 1 + ( 4 - slots ) t = (t - 1) % slot_size + 1 - --print("slot="..tostring(slot)..",t="..tostring(t)) + if DEBUG then + print("slot="..tostring(slot)..",t="..tostring(t)..",slots="..tostring(slots)) + end task.time = t - --task.log = tostring(t).."(2)<- "..(task.log or "") + if DEBUG then + task.log = tostring(t).."(2)<- "..(task.log or "") + end -- Lazily initialize items if not self.items then @@ -127,27 +139,30 @@ function inner_queue:insert_task(task) -- Get the sublist the item belongs in local list = self.items[slot] - + if not list then + print("self="..dump(self)) + end if level == 1 then assert(task.time <= 20) task.next = list[t] list[t] = task - - --print("list="..dump(list)) else - --print("list="..dump(list)) inner_queue_insert_task(list, task, 0) end end inner_queue_insert_task = inner_queue.insert_task function inner_queue:add_tasks(tasks, time) - --print("inner_queue<"..tostring(self.level)..">:add_tasks()") + if DEBUG then + print("inner_queue<"..tostring(self.level)..">:add_tasks()") + end local task = tasks local slots = self.slots local slot_size = self.slot_size - --print("This queue handles times 1-"..tostring(slot_size*slots)) + if DEBUG then + print("This queue handles times 1-"..tostring(slot_size*slots)) + end while task do local curr_task = task task = task.next @@ -157,7 +172,9 @@ function inner_queue:add_tasks(tasks, time) inner_queue_insert_task(self, curr_task) end - --print("self="..dump(self)) + if DEBUG then + print("self="..dump(self)) + end end inner_queue_add_tasks = inner_queue.add_tasks @@ -188,7 +205,9 @@ function queue:add_task(task) local t = task.time task.original_time = t t = t + self.m_tick - --print("add_task({ time="..tostring(t).." })") + if DEBUG then + print("add_task({ time="..tostring(t).." })") + end -- Handle task in current seccond if t <= 20 then @@ -197,6 +216,10 @@ function queue:add_task(task) return end + -- Update task time + t = t - 20 + task.time = t + local count = self.unsorted_count if count > 20 then -- Push to next level @@ -225,15 +248,15 @@ function queue:tick() if self.m_tick == 21 then -- Push items to next level if self.first_unsorted then - inner_queue_add_tasks(self.next_level, self.first_unsorted, 20) + inner_queue_add_tasks(self.next_level, self.first_unsorted, 0) self.first_unsorted = nil self.unsorted_count = 0 end - self.items = inner_queue_get(self.next_level) + self.items = inner_queue_get(self.next_level) or {} self.m_tick = 1 end - return ret or {} + return ret end