proto is a concurrent scheduling library built on the native coroutine library. It prioritizes speed and memory usage by implementing thread pooling: reusing the same threads to run many processes. The library mirrors the API of the task library, but also offers additional features such as process management: cancellation, chaining, awaiting, status, and more.
Originally, the main goal of this library was to be a light wrapper over the task library that implemented thread pooling for reduced calling overhead; the overhead drops because a brand-new thread no longer has to be created for every call. Currently, it is what it originally set out to be, as well as an alternative to the promise library implementations. Comparing the source code shows my library is fundamentally not promise-like, but it can solve the same problems with better performance.
It is important to note this library has some warnings for debugging, but it also assumes you are using it correctly. For example, if a function expects a table in a particular format, passing a table that does not fit that format will not work and may error.
Process
In my library, a process is defined as a table containing the data needed to execute and manage a function run in a different thread than the invoking one. In other words, it’s a wrapper over raw threads that allows for process management. There are two additional kinds of processes: parent and child. For example, some of the factory functions such as proto.all implement a parent-child system: the returned parent process will not terminate until all child processes finish.
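As an illustration of the parent-child system described above, here is a hedged sketch; the exact shapes of `proto.spawn`, `proto.all`, and `proto.await` are my assumptions based on this post, not confirmed signatures:

```lua
local proto = require(path.to.proto) -- path is a placeholder

-- Two child processes that finish at different times.
local childA = proto.spawn(function()
    task.wait(1)
end)
local childB = proto.spawn(function()
    task.wait(2)
end)

-- proto.all returns a parent process that does not terminate
-- until every child process has finished.
local parent = proto.all({childA, childB})
proto.await(parent) -- resumes roughly when the slower child ends
```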
Performance
Throughout the development of the library I have been running tests and benchmarks, but have not saved any. I will post benchmarks in the future, but at the moment I am too lazy. Feel free to run the tests yourself or improve anything.
Future Ideas
Two separate modules for ‘debug’ and ‘release’ modes. For example, you could use the ‘debug’ version when editing in Studio, which adds overhead in exchange for more useful debugging features, and the ‘release’ version when the game is running live, without the debugging overhead.
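The idea above could be sketched like this; `proto_debug` and `proto_release` are hypothetical module names for the two builds:

```lua
local RunService = game:GetService("RunService")

-- Pick the build at require time: extra validation in Studio,
-- no debugging overhead in a live game.
local proto = RunService:IsStudio()
    and require(script.proto_debug)
    or require(script.proto_release)
```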
Report any issues or concerns in the comments or repo.
What types of cases would this apply to? I feel like listing out a lot of use cases could really show its potential to a lot of developers, as well as when not to use this library. I really like the speed numbers.
The code is also very neat and readable, which I really like, and the comments stating what it does are an added bonus for newer developers. Pretty professional.
-- Scheduler
-- Validark with modifications by pobammer
-- September 11, 2020
local RunService = game:GetService("RunService")
local Signal = require(script.Signal)
local Heartbeat = RunService.Heartbeat
local Queue = {}
local CurrentLength = 0
local Connection
local Scheduler = {}
local function HeartbeatStep()
    local ClockTick = os.clock()

    repeat
        local Current = Queue[1]
        if Current == nil or Current.EndTime > ClockTick then
            break
        end

        local Done = CurrentLength == 1
        if Done then
            Queue[1] = nil
            CurrentLength = 0
            Connection = Connection:Disconnect() -- Disconnect returns nil, clearing the stored connection.
        else
            -- Pop the root of the binary min-heap and sift the last node down.
            local LastNode = Queue[CurrentLength]
            Queue[CurrentLength] = nil
            CurrentLength -= 1

            local TargetIndex = 1
            while true do
                local ChildIndex = 2 * TargetIndex
                if ChildIndex > CurrentLength then
                    break
                end

                local MinChild = Queue[ChildIndex]
                local RightChildIndex = ChildIndex + 1
                if RightChildIndex <= CurrentLength then
                    local RightChild = Queue[RightChildIndex]
                    if RightChild.EndTime < MinChild.EndTime then
                        ChildIndex = RightChildIndex
                        MinChild = RightChild
                    end
                end

                if LastNode.EndTime < MinChild.EndTime then
                    break
                end

                Queue[TargetIndex] = MinChild
                TargetIndex = ChildIndex
            end

            Queue[TargetIndex] = LastNode
        end

        local Arguments = Current.Arguments
        local Function = Current.Function
        if type(Function) == "function" then
            local BindableEvent = Signal.new()
            if Arguments then
                BindableEvent:Connect(function()
                    Function(table.unpack(Arguments, 2, Arguments[1]))
                end)
            else
                BindableEvent:Connect(Function)
            end
            BindableEvent:Fire(os.clock() - Current.StartTime)
            BindableEvent:Destroy()
        else
            -- Function is a Signal; fire it directly.
            if Arguments then
                Function:Fire(table.unpack(Arguments, 2, Arguments[1]))
            else
                Function:Fire(os.clock() - Current.StartTime)
            end
        end
    until Done
end
function Scheduler.Delay<T...>(Seconds: number?, Function: (T...) -> (), ...: T...)
    -- If Seconds is nil, NaN, non-positive, or infinite, schedule for the next heartbeat step.
    if Seconds == nil or Seconds <= 0 or Seconds ~= Seconds or Seconds == math.huge then
        Seconds = 0
    end

    local StartTime = os.clock()
    local EndTime = StartTime + Seconds
    local Length = select("#", ...)

    if Connection == nil then
        -- Lazily connect to Heartbeat only while the queue is non-empty.
        Connection = Heartbeat:Connect(HeartbeatStep)
    end

    local Node = {
        Function = Function;
        StartTime = StartTime;
        EndTime = EndTime;
        -- Store the argument count at index 1 so nils in ... survive.
        Arguments = Length > 0 and {Length + 1, ...};
    }

    -- Sift the new node up the binary min-heap keyed on EndTime.
    local TargetIndex = CurrentLength + 1
    CurrentLength = TargetIndex
    while true do
        local ParentIndex = (TargetIndex - TargetIndex % 2) / 2
        if ParentIndex < 1 then
            break
        end
        local ParentNode = Queue[ParentIndex]
        if ParentNode.EndTime < Node.EndTime then
            break
        end
        Queue[TargetIndex] = ParentNode
        TargetIndex = ParentIndex
    end
    Queue[TargetIndex] = Node
end
function Scheduler.Defer<T...>(Function: (T...) -> (), ...: T...)
    local StartTime = os.clock()
    local EndTime = StartTime + 0.03
    local Length = select("#", ...)

    if Connection == nil then
        -- Lazily connect to Heartbeat only while the queue is non-empty.
        Connection = Heartbeat:Connect(HeartbeatStep)
    end

    local Node = {
        Function = Function;
        StartTime = StartTime;
        EndTime = EndTime;
        Arguments = Length > 0 and {Length + 1, ...};
    }

    -- Sift the new node up the binary min-heap keyed on EndTime.
    local TargetIndex = CurrentLength + 1
    CurrentLength = TargetIndex
    while true do
        local ParentIndex = (TargetIndex - TargetIndex % 2) / 2
        if ParentIndex < 1 then
            break
        end
        local ParentNode = Queue[ParentIndex]
        if ParentNode.EndTime < Node.EndTime then
            break
        end
        Queue[TargetIndex] = ParentNode
        TargetIndex = ParentIndex
    end
    Queue[TargetIndex] = Node
end
function Scheduler.Spawn<T...>(Function: (T...) -> (), ...: T...)
    local StartTime = os.clock()
    local EndTime = StartTime
    local Length = select("#", ...)

    if Connection == nil then
        -- Lazily connect to Heartbeat only while the queue is non-empty.
        Connection = Heartbeat:Connect(HeartbeatStep)
    end

    local Node = {
        Function = Function;
        StartTime = StartTime;
        EndTime = EndTime;
        Arguments = Length > 0 and {Length + 1, ...};
    }

    -- Sift the new node up the binary min-heap keyed on EndTime.
    local TargetIndex = CurrentLength + 1
    CurrentLength = TargetIndex
    while true do
        local ParentIndex = (TargetIndex - TargetIndex % 2) / 2
        if ParentIndex < 1 then
            break
        end
        local ParentNode = Queue[ParentIndex]
        if ParentNode.EndTime < Node.EndTime then
            break
        end
        Queue[TargetIndex] = ParentNode
        TargetIndex = ParentIndex
    end
    Queue[TargetIndex] = Node
end
function Scheduler.Wait(Seconds: number?)
    local BindableEvent = Signal.new()
    Scheduler.Delay(math.max(Seconds or 0.03, 0.029), BindableEvent)
    local DeltaTime = BindableEvent:Wait()
    BindableEvent:Destroy()
    return DeltaTime
end
return Scheduler
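A minimal usage sketch of the Scheduler module above (the require path is an assumption; the elapsed-time argument follows from `HeartbeatStep`, which fires `os.clock() - StartTime` when no extra arguments were queued):

```lua
local Scheduler = require(script.Parent.Scheduler) -- path is an assumption

-- Runs the callback on the first heartbeat at least 1.5 seconds from now;
-- with no extra arguments, the callback receives the actual elapsed time.
Scheduler.Delay(1.5, function(elapsed)
    print("resumed after", elapsed, "seconds")
end)

-- Spawn queues for the next heartbeat step and forwards arguments.
Scheduler.Spawn(print, "hello")

-- Wait yields the current thread and returns the elapsed time.
local dt = Scheduler.Wait(0.5)
```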
--!optimize 2
type Connection = {
    Disconnect: (self: Connection) -> (),
    Destroy: (self: Connection) -> (),
}

export type Signal<T...> = {
    Fire: (self: Signal<T...>, T...) -> (),
    FireDeferred: (self: Signal<T...>, T...) -> (),
    Connect: (self: Signal<T...>, fn: (T...) -> ()) -> Connection,
    Once: (self: Signal<T...>, fn: (T...) -> ()) -> Connection,
    DisconnectAll: (self: Signal<T...>) -> (),
    GetConnections: (self: Signal<T...>) -> { Connection },
    Destroy: (self: Signal<T...>) -> (),
    Wait: (self: Signal<T...>) -> T...,
}

local freeRunnerThread = nil

local function acquireRunnerThreadAndCallEventHandler(fn, ...)
    local acquiredRunnerThread = freeRunnerThread
    freeRunnerThread = nil
    fn(...)
    -- The handler finished running, this runner thread is free again.
    freeRunnerThread = acquiredRunnerThread
end

local function runEventHandlerInFreeThread(...)
    acquireRunnerThreadAndCallEventHandler(...)
    while true do
        acquireRunnerThreadAndCallEventHandler(coroutine.yield())
    end
end

-- Connection class
local Connection = {}
Connection.__index = Connection

function Connection.new(signal, fn)
    return setmetatable({
        Connected = true,
        _signal = signal,
        _fn = fn,
        _next = false,
    }, Connection)
end

function Connection:Disconnect()
    if not self.Connected then
        return
    end
    self.Connected = false

    if self._signal._handlerListHead == self then
        self._signal._handlerListHead = self._next
    else
        local prev = self._signal._handlerListHead
        while prev and prev._next ~= self do
            prev = prev._next
        end
        if prev then
            prev._next = self._next
        end
    end
end

Connection.Destroy = Connection.Disconnect
local Signal = {}
Signal.__index = Signal

function Signal.new<T...>(): Signal<T...>
    return setmetatable({
        _handlerListHead = false,
        _proxyHandler = nil,
    }, Signal)
end

function Signal.Is(obj)
    return type(obj) == "table" and getmetatable(obj) == Signal
end

function Signal:Connect(fn)
    local connection = Connection.new(self, fn)
    if self._handlerListHead then
        connection._next = self._handlerListHead
        self._handlerListHead = connection
    else
        self._handlerListHead = connection
    end
    return connection
end

function Signal:Once(fn)
    local connection
    local done = false
    connection = self:Connect(function(...)
        if done then
            return
        end
        done = true
        connection:Disconnect()
        fn(...)
    end)
    return connection
end

function Signal:GetConnections()
    local items = {}
    local item = self._handlerListHead
    while item do
        table.insert(items, item)
        item = item._next
    end
    return items
end

function Signal:DisconnectAll()
    local item = self._handlerListHead
    while item do
        item.Connected = false
        item = item._next
    end
    self._handlerListHead = false
end

function Signal:Fire(...)
    local item = self._handlerListHead
    while item do
        if item.Connected then
            if not freeRunnerThread then
                freeRunnerThread = coroutine.create(runEventHandlerInFreeThread)
            end
            task.spawn(freeRunnerThread, item._fn, ...)
        end
        item = item._next
    end
end

function Signal:Wait()
    local waitingCoroutine = coroutine.running()
    local connection
    local done = false
    connection = self:Connect(function(...)
        if done then
            return
        end
        done = true
        connection:Disconnect()
        task.spawn(waitingCoroutine, ...)
    end)
    return coroutine.yield()
end

function Signal:Destroy()
    self:DisconnectAll()
    local proxyHandler = rawget(self, "_proxyHandler")
    if proxyHandler then
        proxyHandler:Disconnect()
    end
end

return Signal
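And a small sketch of the Signal module on its own (standalone usage; the require path is an assumption, matching where the Scheduler expects it):

```lua
local Signal = require(script.Signal) -- path is an assumption

local signal = Signal.new()

local connection = signal:Connect(function(message)
    print("got:", message)
end)

signal:Fire("hello") -- handlers run on pooled runner threads via task.spawn
connection:Disconnect()

-- Wait yields the calling thread until the next Fire and returns its arguments.
task.spawn(function()
    local message = signal:Wait()
end)
signal:Fire("resumes the waiter")

signal:Destroy()
```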
Your spawn appears to resume on the next heartbeat, which is inaccurate compared to the task library, which resumes immediately. My library models itself after the task library.
For example, this is expected behavior:
But your library produces this result:
Also, it is a bit unfair to compare the efficiency of your spawn with my normal spawn, as yours does not support process management. However, it is valid to compare it with my “fast” spawn, though the results of those are essentially the same, for example: (same benchmark with yours added)
Your defer also seems a bit inaccurate, as task.defer will resume at the next resumption point. A common misconception is that it will resume on the next heartbeat; it could, since heartbeat is a resumption point, but there are other resumption points within a frame.
Here is the benchmark for defer:
And here is the benchmark for delay with the seconds argument set to 0:
I will have to look more at your implementation to make any further comments.
Another thing worth mentioning: the benchmarks we did measuring just the overhead of calling the spawn function are deceiving. The way you are scheduling causes more work to be done at a later time, which is not measured in those results. Here is an example using the MicroProfiler to show my library is actually generally faster if we also account for the work your library does later for spawn. My library doesn’t have to do any later work for spawn.
local N = 10

local fn = function()
    debug.profilebegin("EXEC")
    debug.profileend()
end

while true do
    debug.profilebegin("PROTO")
    for i = 1, N do
        debug.profilebegin("CALL")
        proto.spawn(fn)
        debug.profileend()
    end
    debug.profileend()

    debug.profilebegin("SCHEDULER")
    for i = 1, N do
        debug.profilebegin("CALL")
        scheduler.Spawn(fn)
        debug.profileend()
    end
    debug.profileend()

    task.wait()
end
I also wrapped your heartbeat step function in a debug.profilebegin/profileend pair.
I do later work for my defer and delay functions. Here are the same MicroProfiler tests, but comparing the defer and delay between our two libraries. The “PROTO-UPDATE” label wraps my internal update loop for executing deferred and delayed processes.
I updated the structure of the post and added some extra categories. I mainly added some benchmark tests using my own tests providing the source and results of those tests under the Performance category. Please let me know if you see any issues with the source or results of the tests. I also added a little information to the GitHub repo README.
And finally, I made some small changes to the source code but later I will probably add more useful comments to it.
I apologize for the messy release. It coincided with an unexpected event in my life, which is why documentation will take a bit longer to be posted. I plan to have documentation only on this post, in the README, and in the source code itself for now. In the future, I will dedicate a unique API page to this library on the website of a brand I am building.
Let me know if you have any comments, concerns, or ideas. Thanks!
Defer will schedule your process to execute at the next resumption point, which can happen multiple times in a single frame. It behaves almost exactly like its task.defer equivalent, except that my system implements thread pooling to improve memory usage and speed.
proto.defer returns a “process” table reference, which can be used with the process management functions such as await or cancel.
proto.fdefer is generally faster simply because fewer instructions running in the same environment take less time. It returns nothing because it lacks the extra setup and instructions that come with supporting process management. The f stands for fast, because its only goal is to schedule and execute the data you pass as quickly as possible.
The same idea applies to fspawn and fdelay, although they schedule differently: spawn resumes immediately, and delay resumes on the first heartbeat after the given number of seconds has passed.
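A quick sketch of the difference between the managed and fast variants; the call shapes follow the descriptions above, and `proto.await`/`proto.cancel` are my reading of the post, not confirmed signatures:

```lua
-- Managed defer: returns a process handle for management.
local process = proto.defer(function(x)
    print(x)
end, 123)
proto.await(process) -- block until the deferred function has run
-- proto.cancel(process) would instead stop it before it runs.

-- Fast defer: no handle, no management, least overhead.
proto.fdefer(function(x)
    print(x)
end, 123)
```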
I use this for clearing sounds and for camera recoil. I think it’s useful when, for example, the player is firing every 0.04 seconds and a new thread is made each time to reset recoil or something else.
I haven’t done much for this library publicly, but I have been working on it on my own every now and then. At the moment I don’t have much time, with uni finals coming up, but I’ve been working on some slight optimizations and on structuring my code better in general. In the next release of the library I will try to attach more useful information, explanation, and documentation.
Here is a sneak peek of the results I am getting, comparing the old library with my new one.
BTW: I use the term “fast” to describe the least amount of overhead between calling the function and achieving its effect. For example, the overhead of spawning a thread is the time between calling the function that invokes the operation and the new thread existing, ready to run its next operation.
Benchmark Code
--!optimize 2
local proto_old = require(...)
local proto_new = require(...)

local fn = function() end
local N = 100

return {
    ParameterGenerator = function()
        return
    end,

    Functions = {
        ["proto-new"] = function(Profiler)
            for i = 1, N do
                -- new call here
            end
        end,

        ["proto-old"] = function(Profiler)
            for i = 1, N do
                -- proto_old call here
            end
        end,

        ["tasklib"] = function()
            for i = 1, N do
                -- task library call here
            end
        end,
    },
}
Regular Spawn
The spawn functions in my library support full process management: cancellation, awaiting, chaining.
Faster Spawn
The fast spawn functions in my library support some process management: cancellation.
Fastest Spawn
The fastest spawn functions in my library have no process management.