Proto | efficient process scheduling library

The license is MIT-0 (No Attribution).

Description


proto is a concurrent scheduling library built on the native coroutine library. It prioritizes speed and memory use through thread pooling: reusing the same threads to run many processes. The library mirrors the API of the task library and adds process management features such as cancellation, chaining, awaiting, and status checks.
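
To make the thread pooling idea concrete, here is a minimal sketch in plain Lua. It is not proto's source: `freeThreads`, `worker`, `spawn`, and `created` are hypothetical names, and it assumes jobs run to completion without yielding.

```lua
-- Minimal thread-pool sketch (hypothetical names; not proto's source).
local freeThreads = {} -- stack of idle coroutines waiting for a job
local created = 0      -- how many coroutines were ever created

local function runJob(fn, ...)
	fn(...)
end

local function worker(...)
	-- Run the first job this coroutine was created for.
	runJob(...)
	while true do
		-- Park this coroutine in the pool, then wait for the next job.
		freeThreads[#freeThreads + 1] = coroutine.running()
		runJob(coroutine.yield())
	end
end

local function spawn(fn, ...)
	-- Reuse an idle coroutine if one exists; create one only when the pool is empty.
	local thread = table.remove(freeThreads)
	if thread == nil then
		thread = coroutine.create(worker)
		created = created + 1
	end
	local ok, err = coroutine.resume(thread, fn, ...)
	if not ok then
		error(err, 2)
	end
end

local results = {}
for i = 1, 100 do
	spawn(function(n)
		results[#results + 1] = n * 2
	end, i)
end

-- created is 1 and #results is 100: one coroutine served all 100 jobs.
print(created, #results)
```

A job that errors kills its coroutine, so that coroutine never returns to the pool and the next call simply creates a fresh one.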

Originally, the main goal of this library was to be a light wrapper over the task library that implemented thread pooling to reduce calling overhead. The overhead drops because a completely new thread no longer has to be created for every call. Currently, it is what it originally set out to be, as well as an alternative to promise library implementations. Comparing the source code shows my library is fundamentally not promise-like, but it can solve the same problems with better performance.

It is important to note this library has some warnings for debugging, but it also assumes you are using it correctly. For example, if a function expects a table as an argument then passing any table that does not fit the format of the table it expects will not work and may error.

Process


In my library, a process is defined as a table containing data to execute and manage some function to be run in a different thread than the invocation. In other words, it’s a wrapper over raw threads that allows for process management. There are two additional types of processes: parent and child. For example, some of the factory functions such as proto.all implement a parent-child system. The returned parent process will not terminate until all child processes finish.
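
As a rough illustration of the process and parent-child idea, here is a sketch in plain Lua. The field and function names (`newProcess`, `step`, `all`, `status`, `onDone`, `remaining`) are made up for this example and are not proto's API; it also assumes no child has finished before `all` is called.

```lua
-- Hypothetical process wrapper: a table that tracks a coroutine and its state.
local function newProcess(fn)
	local process = { status = "suspended", result = nil, onDone = {} }
	process.thread = coroutine.create(function(...)
		process.result = fn(...)
		process.status = "done"
		-- Tell anyone waiting on this process (for example a parent) that it finished.
		for _, callback in ipairs(process.onDone) do
			callback(process)
		end
	end)
	return process
end

-- Resume a process unless it has already finished.
local function step(process, ...)
	if process.status ~= "suspended" then
		return
	end
	assert(coroutine.resume(process.thread, ...))
end

-- Parent process: reports "done" only after every child has finished.
local function all(children)
	local parent = { status = "suspended", remaining = #children }
	for _, child in ipairs(children) do
		table.insert(child.onDone, function()
			parent.remaining = parent.remaining - 1
			if parent.remaining == 0 then
				parent.status = "done"
			end
		end)
	end
	return parent
end

local a = newProcess(function()
	coroutine.yield() -- pause once before finishing
	return "a"
end)
local b = newProcess(function()
	return "b"
end)

local parent = all({ a, b })
step(a) -- a pauses at its yield
step(b) -- b finishes
local statusWhileWaiting = parent.status -- "suspended": a is still pending
step(a) -- a finishes, so the parent finishes too
print(statusWhileWaiting, parent.status)
```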

Performance


Throughout the development of the library I have been running tests and benchmarks, but have not saved any. I will post benchmarks in the future, but at the moment I am too lazy. Feel free to try out tests yourself or improve anything.

Future Ideas


  • Two separate modules for ‘debug’ and ‘release’ modes. For example, you could use the ‘debug’ version when editing in Studio, which adds overhead in exchange for more useful debugging features, and the ‘release’ version when the game is running live, without the debugging overhead.


Report any issues or concerns in the comments or repo.


What types of cases would this apply to? I feel like listing out a lot of use cases can really show its potential to a lot of developers as well as when not to use this library. I really like the numbers for the speed it takes.

The code is also very neat and readable so I really like that as well along with comments stating what it does as an added bonus for newer developers, pretty professional.


Have been following this library’s progress for a long time. Glad to see it come out! Excited to see what it’s capable of in time.


Sounds like a great idea. I’ll update the post in the next few days when I can.

**IMPORTANT**


I just discovered an issue with defer that completely breaks the system. It is an issue on my part and I will update the post as soon as possible.

The issue has been resolved. I published changes to the repo.

pranked

scheduler code
-- Scheduler
-- Validark with modifications by pobammer
-- September 11, 2020

local RunService = game:GetService("RunService")
local Signal = require(script.Signal)

local Heartbeat = RunService.Heartbeat

local Queue = {}
local CurrentLength = 0
local Connection

local Scheduler = {}

local function HeartbeatStep()
	local ClockTick = os.clock()

	repeat
		local Current = Queue[1]
		if Current == nil or Current.EndTime > ClockTick then
			break
		end

		local Done = CurrentLength == 1

		if Done then
			Queue[1] = nil
			CurrentLength = 0
			Connection = Connection:Disconnect()
		else
			local LastNode = Queue[CurrentLength]
			Queue[CurrentLength] = nil
			CurrentLength -= 1
			local TargetIndex = 1

			while true do
				local ChildIndex = 2 * TargetIndex
				if ChildIndex > CurrentLength then
					break
				end

				local MinChild = Queue[ChildIndex]
				local RightChildIndex = ChildIndex + 1

				if RightChildIndex <= CurrentLength then
					local RightChild = Queue[RightChildIndex]
					if RightChild.EndTime < MinChild.EndTime then
						ChildIndex = RightChildIndex
						MinChild = RightChild
					end
				end

				if LastNode.EndTime < MinChild.EndTime then
					break
				end

				Queue[TargetIndex] = MinChild
				TargetIndex = ChildIndex
			end

			Queue[TargetIndex] = LastNode
		end

		local Arguments = Current.Arguments
		local Function = Current.Function

		if type(Function) == "function" then
			local BindableEvent = Signal.new()

			if Arguments then
				BindableEvent:Connect(function()
					Function(table.unpack(Arguments, 2, Arguments[1]))
				end)
			else
				BindableEvent:Connect(Function)
			end

			BindableEvent:Fire(os.clock() - Current.StartTime)
			BindableEvent:Destroy()
		else
			if Arguments then
				Function:Fire(table.unpack(Arguments, 2, Arguments[1]))
			else
				Function:Fire(os.clock() - Current.StartTime)
			end
		end
	until Done
end

function Scheduler.Delay<T...>(Seconds: number?, Function: (T...) -> (), ...: T...)
	-- If Seconds is nil, zero or negative (including -INF), or INF, fall back to a delay of 0.
	if Seconds == nil or Seconds <= 0 or Seconds == math.huge then
		Seconds = 0
	end

	local StartTime = os.clock()
	local EndTime = StartTime + Seconds
	local Length = select("#", ...)

	if Connection == nil then -- the queue was idle, so start listening to Heartbeat again
		Connection = Heartbeat:Connect(HeartbeatStep)
	end

	local Node = {
		Function = Function;
		StartTime = StartTime;
		EndTime = EndTime;
		Arguments = Length > 0 and {Length + 1, ...};
	}

	local TargetIndex = CurrentLength + 1
	CurrentLength = TargetIndex

	while true do
		local ParentIndex = (TargetIndex - TargetIndex % 2) / 2
		if ParentIndex < 1 then
			break
		end

		local ParentNode = Queue[ParentIndex]
		if ParentNode.EndTime < Node.EndTime then
			break
		end

		Queue[TargetIndex] = ParentNode
		TargetIndex = ParentIndex
	end

	Queue[TargetIndex] = Node
end

function Scheduler.Defer<T...>(Function: (T...) -> (), ...: T...)
	local StartTime = os.clock()
	local EndTime = StartTime + 0.03
	local Length = select("#", ...)

	if Connection == nil then -- the queue was idle, so start listening to Heartbeat again
		Connection = Heartbeat:Connect(HeartbeatStep)
	end

	local Node = {
		Function = Function;
		StartTime = StartTime;
		EndTime = EndTime;
		Arguments = Length > 0 and {Length + 1, ...};
	}

	local TargetIndex = CurrentLength + 1
	CurrentLength = TargetIndex

	while true do
		local ParentIndex = (TargetIndex - TargetIndex % 2) / 2
		if ParentIndex < 1 then
			break
		end

		local ParentNode = Queue[ParentIndex]
		if ParentNode.EndTime < Node.EndTime then
			break
		end

		Queue[TargetIndex] = ParentNode
		TargetIndex = ParentIndex
	end

	Queue[TargetIndex] = Node
end

function Scheduler.Spawn<T...>(Function: (T...) -> (), ...: T...)
	local StartTime = os.clock()
	local EndTime = StartTime
	local Length = select("#", ...)

	if Connection == nil then -- the queue was idle, so start listening to Heartbeat again
		Connection = Heartbeat:Connect(HeartbeatStep)
	end

	local Node = {
		Function = Function;
		StartTime = StartTime;
		EndTime = EndTime;
		Arguments = Length > 0 and {Length + 1, ...};
	}

	local TargetIndex = CurrentLength + 1
	CurrentLength = TargetIndex

	while true do
		local ParentIndex = (TargetIndex - TargetIndex % 2) / 2
		if ParentIndex < 1 then
			break
		end

		local ParentNode = Queue[ParentIndex]
		if ParentNode.EndTime < Node.EndTime then
			break
		end

		Queue[TargetIndex] = ParentNode
		TargetIndex = ParentIndex
	end

	Queue[TargetIndex] = Node
end

function Scheduler.Wait(Seconds: number?)
	local BindableEvent = Signal.new()
	Scheduler.Delay(math.max(Seconds or 0.03, 0.029), BindableEvent)

	local DeltaTime = BindableEvent:Wait()
	BindableEvent:Destroy()
	return DeltaTime
end

return Scheduler

signal code
--!optimize 2
type Connection = {
	Disconnect: (self: Connection) -> (),
	Destroy: (self: Connection) -> (),
}

export type Signal<T...> = {
	Fire: (self: Signal<T...>, T...) -> (),
	FireDeferred: (self: Signal<T...>, T...) -> (),
	Connect: (self: Signal<T...>, fn: (T...) -> ()) -> Connection,
	Once: (self: Signal<T...>, fn: (T...) -> ()) -> Connection,
	DisconnectAll: (self: Signal<T...>) -> (),
	GetConnections: (self: Signal<T...>) -> { Connection },
	Destroy: (self: Signal<T...>) -> (),
	Wait: (self: Signal<T...>) -> T...,
}

local freeRunnerThread = nil
local function acquireRunnerThreadAndCallEventHandler(fn, ...)
	local acquiredRunnerThread = freeRunnerThread
	freeRunnerThread = nil
	fn(...)
	-- The handler finished running, this runner thread is free again.
	freeRunnerThread = acquiredRunnerThread
end

local function runEventHandlerInFreeThread(...)
	acquireRunnerThreadAndCallEventHandler(...)
	while true do
		acquireRunnerThreadAndCallEventHandler(coroutine.yield())
	end
end

-- Connection class
local Connection = {}
Connection.__index = Connection

function Connection.new(signal, fn)
	return setmetatable({
		Connected = true,
		_signal = signal,
		_fn = fn,
		_next = false,
	}, Connection)
end

function Connection:Disconnect()
	if not self.Connected then
		return
	end

	self.Connected = false
	if self._signal._handlerListHead == self then
		self._signal._handlerListHead = self._next
	else
		local prev = self._signal._handlerListHead
		while prev and prev._next ~= self do
			prev = prev._next
		end
		if prev then
			prev._next = self._next
		end
	end
end

Connection.Destroy = Connection.Disconnect

local Signal = {}
Signal.__index = Signal

function Signal.new<T...>(): Signal<T...>
	return setmetatable({
		_handlerListHead = false,
		_proxyHandler = nil,
	}, Signal)
end

function Signal.Is(obj)
	return type(obj) == "table" and getmetatable(obj) == Signal
end

function Signal:Connect(fn)
	local connection = Connection.new(self, fn)
	if self._handlerListHead then
		connection._next = self._handlerListHead
		self._handlerListHead = connection
	else
		self._handlerListHead = connection
	end

	return connection
end

function Signal:Once(fn)
	local connection
	local done = false
	connection = self:Connect(function(...)
		if done then
			return
		end

		done = true
		connection:Disconnect()
		fn(...)
	end)

	return connection
end

function Signal:GetConnections()
	local items = {}
	local item = self._handlerListHead
	while item do
		table.insert(items, item)
		item = item._next
	end

	return items
end

function Signal:DisconnectAll()
	local item = self._handlerListHead
	while item do
		item.Connected = false
		item = item._next
	end

	self._handlerListHead = false
end

function Signal:Fire(...)
	local item = self._handlerListHead
	while item do
		if item.Connected then
			if not freeRunnerThread then
				freeRunnerThread = coroutine.create(runEventHandlerInFreeThread)
			end

			task.spawn(freeRunnerThread, item._fn, ...)
		end

		item = item._next
	end
end

function Signal:Wait()
	local waitingCoroutine = coroutine.running()
	local connection
	local done = false
	connection = self:Connect(function(...)
		if done then
			return
		end

		done = true
		connection:Disconnect()
		task.spawn(waitingCoroutine, ...)
	end)

	return coroutine.yield()
end

function Signal:Destroy()
	self:DisconnectAll()
	local proxyHandler = rawget(self, "_proxyHandler")
	if proxyHandler then
		proxyHandler:Disconnect()
	end
end

return Signal

Would you mind also showing the benchmark code you ran to get those results?

same as yours with scheduler.spawn for the function

Your spawn appears to resume on the next heartbeat, which is inaccurate compared to the task library, which resumes immediately. My library is modeled after the task library.

For example, this is expected behavior:

[image: expected output]

But your library produces this result:

[image: output produced by the scheduler library]

Also it is a bit unfair to compare the efficiency of your spawn with my normal spawn as yours does not support process management. However, it is valid to compare it with my “fast” spawn, but the results of those are essentially the same, for example: (same benchmark with yours added)

[image: spawn benchmark results]

Your defer also seems a bit inaccurate, since task.defer resumes at the next resumption point. A common misconception is that it resumes on the next heartbeat. It could, since heartbeat is a resumption point, but there are other resumption points within a frame.
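
The three behaviors discussed here can be compared with a small plain Lua simulation. This is only a sketch: the frame is faked, and `point-1`, `spawnNow`, `defer`, `spawnOnHeartbeat`, and `resumptionPoint` are made-up names for illustration, not engine or library API.

```lua
local log = {}
local deferred = {}       -- flushed at every resumption point
local heartbeatQueue = {} -- flushed only on the (simulated) heartbeat

local function record(text)
	log[#log + 1] = text
end

-- Immediate spawn: the function runs before the caller continues.
local function spawnNow(fn)
	assert(coroutine.resume(coroutine.create(fn)))
end

-- Deferred: runs at the next resumption point, whichever comes first.
local function defer(fn)
	deferred[#deferred + 1] = fn
end

-- Heartbeat-queued: waits for the heartbeat even if other points come earlier.
local function spawnOnHeartbeat(fn)
	heartbeatQueue[#heartbeatQueue + 1] = fn
end

local function flush(queue)
	-- Copy first so jobs queued during the flush wait for the next one.
	local count = #queue
	local pending = {}
	for i = 1, count do
		pending[i] = queue[i]
		queue[i] = nil
	end
	for i = 1, count do
		pending[i]()
	end
end

local function resumptionPoint(name)
	record(name)
	flush(deferred)
end

local function runFrame()
	resumptionPoint("point-1") -- some earlier resumption point in the frame
	resumptionPoint("heartbeat")
	flush(heartbeatQueue)
end

record("start")
spawnNow(function() record("spawn") end)
defer(function() record("defer") end)
spawnOnHeartbeat(function() record("heartbeat-queued") end)
record("end")
runFrame()

-- prints: start, spawn, end, point-1, defer, heartbeat, heartbeat-queued
print(table.concat(log, ", "))
```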

Here is the benchmark for defer:

[image: defer benchmark results]

And here is the benchmark for delay with the seconds argument set to 0:

[image: delay benchmark results]

I will have to look more at your implementation to make any further comments.


yeah it was made before the task library existed, so it’s definitely not going to reproduce the task library at all

also you got some impressive knowledge of the task library


Another thing worth mentioning is that the benchmarks we did, which measure only the overhead of calling the spawn function, are deceiving. The way you schedule it causes more work to be done at a later time, which those results do not measure. Here is an example using the microprofiler to show that my library is actually generally faster once we also account for the work your library does later for spawn. My library doesn’t have to do any later work for spawn.

Source Code
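-- Assumes `proto` and `scheduler` have already been required above.
local N = 10 -- spawn calls per library per frame

-- Empty job; the EXEC label marks where the spawned function actually runs.
local fn = function()
	debug.profilebegin("EXEC")
	debug.profileend()
end

while true do
	-- proto: each CALL label covers one spawn call.
	debug.profilebegin("PROTO")
	for i = 1, N do
		debug.profilebegin("CALL")
		proto.spawn(fn)
		debug.profileend()
	end
	debug.profileend()

	-- scheduler: each CALL label covers only the enqueue; the queued work runs later in its Heartbeat step.
	debug.profilebegin("SCHEDULER")
	for i = 1, N do
		debug.profilebegin("CALL")
		scheduler.Spawn(fn)
		debug.profileend()
	end
	debug.profileend()

	task.wait()
end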

I also wrapped your heartbeat step function in a debug.profilebegin/debug.profileend pair.
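
Outside the microprofiler, the same point can be sketched in plain Lua with `os.clock`. Neither library is used here: `scheduledSpawn`, `step`, and `immediateSpawn` are stand-ins invented for this example, and the timings will vary by machine. The idea is only that a queue-based spawn has two costs (the call and the later step), while an immediate spawn has one.

```lua
local queue = {}

-- Stand-in for a spawn that only enqueues: cheap to call, work happens later.
local function scheduledSpawn(fn)
	queue[#queue + 1] = fn
end

-- Stand-in for the later step that actually runs the queued functions.
local function step()
	for i = 1, #queue do
		local fn = queue[i]
		queue[i] = nil
		fn()
	end
end

-- Stand-in for a spawn that runs the function right away.
local function immediateSpawn(fn)
	fn()
end

local function timeIt(fn)
	local start = os.clock()
	fn()
	return os.clock() - start
end

local N = 100000
local ran = 0
local job = function()
	ran = ran + 1
end

local callCost = timeIt(function()
	for _ = 1, N do
		scheduledSpawn(job)
	end
end)
local laterCost = timeIt(step)
local immediateCost = timeIt(function()
	for _ = 1, N do
		immediateSpawn(job)
	end
end)

-- A fair comparison sets callCost + laterCost against immediateCost.
print(string.format("scheduled: call %.4fs + later %.4fs", callCost, laterCost))
print(string.format("immediate: %.4fs", immediateCost))
```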

I do later work for my defer and delay functions. Here are the same microprofiler tests, but comparing the defer and delay between our two libraries. The “PROTO-UPDATE” is wrapping my internal update loop for executing deferred and delayed processes.

proto.defer vs scheduler.Defer

proto.fdefer vs scheduler.Defer

proto.delay(0, fn) vs scheduler.Delay(0, fn)

proto.fdelay(0, fn) vs scheduler.Delay(0, fn)

Post Update


I updated the structure of the post and added some extra categories. Mainly, I added benchmarks from my own tests, with the source and results, under the Performance category. Please let me know if you see any issues with the source or results of the tests. I also added a little information to the GitHub repo README.

And finally, I made some small changes to the source code but later I will probably add more useful comments to it.

I apologize for the messy release. This happened at the same time as an unexpected event in my life which is why documentation will take a bit longer to be posted. I plan to only have documentation on this post, in the README, and in the source code itself for now. In the future, I will dedicate a unique API page for this library on the domain of a website for a brand I am building.

Let me know if you have any comments, concerns, or ideas. Thanks!

What does proto.fdefer do, and how is it different from proto.defer?

Defer will schedule your process to execute at the next resumption point which can happen multiple times in a single frame. It behaves almost exactly like the task.defer equivalent except under my system it implements thread pooling to improve memory usage and speed.

The return value of proto.defer is a reference to a “process” table, which can be used with the process management functions such as await or cancel.

proto.fdefer is generally faster because fewer instructions running in the same environment will run faster. It returns nothing because it lacks the extra setup and instructions that come with supporting process management. The f stands for fast because its only goal is to schedule and execute the data you pass as quickly as possible.
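
The trade-off can be sketched in plain Lua as follows. This is not how proto implements it: the functions below only reuse the names `defer` and `fdefer` for illustration, and `cancel`, `flush`, and the `status` field are assumptions made for this example.

```lua
local unpack = table.unpack or unpack -- Lua 5.1 and Luau have a global unpack

local queue = {}

-- "Fast" variant: store the function and arguments, return nothing.
local function fdefer(fn, ...)
	queue[#queue + 1] = { fn = fn, n = select("#", ...), ... }
end

-- Managed variant: the same scheduling plus a process table to inspect or cancel.
local function defer(fn, ...)
	local process = { status = "scheduled" }
	fdefer(function(...)
		if process.status == "cancelled" then
			return
		end
		process.status = "running"
		process.result = fn(...)
		process.status = "done"
	end, ...)
	return process
end

local function cancel(process)
	if process.status == "scheduled" then
		process.status = "cancelled"
	end
end

-- Stand-in for the next resumption point: run everything queued so far.
local function flush()
	local pending = queue
	queue = {}
	for _, entry in ipairs(pending) do
		entry.fn(unpack(entry, 1, entry.n))
	end
end

local out = {}
fdefer(function(x) out[#out + 1] = x end, "fast")
local kept = defer(function(x)
	out[#out + 1] = x
	return x
end, "managed")
local dropped = defer(function(x) out[#out + 1] = x end, "never runs")
cancel(dropped)
flush()

-- prints: fast, managed	done	cancelled
print(table.concat(out, ", "), kept.status, dropped.status)
```

The managed variant allocates an extra table and closure per call, which is the kind of additional setup the fast variant skips.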

The same idea applies to fspawn and fdelay, although they schedule differently. Spawn resumes immediately, and Delay resumes on the next heartbeat after the given number of seconds has passed.

I use this for clearing sounds and for camera recoil. I think it’s useful when, for example, the player is firing every 0.04 seconds and a new thread is made each time to reset recoil or do something else.


I haven’t done much for this library publicly, but I have been working on it on my own every now and then. At the moment I don’t have much time with uni finals coming up, but I’ve been working on some slightly improved optimizations and on structuring my code better in general. In the next release of the library I will try to attach more useful information, explanation, and documentation.

Here is a sneak peek of the results I am getting, comparing the old library with my new one.

BTW: I use the term “fast” to describe the least amount of overhead between calling the function and achieving its effect. For example, the overhead of spawning a thread is the time between calling the function that invokes the operation and the point where the new thread exists and the next operation is ready to run.

Benchmark Code
--!optimize 2
local proto_old = require(...)
local proto_new = require(...)

local fn = function()end
local N = 100

return {
	ParameterGenerator = function()
		return
	end,

	Functions = {
		["proto-new"] = function(Profiler)
			for i = 1, N do
				-- new call here
			end
		end,

		["proto-old"] = function(Profiler)
			for i = 1, N do
				-- proto_old call here
			end
		end,
		
		["tasklib"] = function()
			for i = 1, N do
				-- task library call here
			end
		end,
	},
}


Regular Spawn

The spawn functions in my library support full process management: cancellation, awaiting, chaining.
[image: regular spawn benchmark results]


Faster Spawn

The fast spawn functions in my library support some process management: cancellation.
[image: faster spawn benchmark results]


Fastest Spawn

The fastest spawn functions in my library have no process management.
[image: fastest spawn benchmark results]