I am trying to implement a generic Lock class that can protect concurrent access to variables, to prevent race conditions like the one in this code:
-- I want to protect concurrent access to this
local data = {key = 10}

coroutine.wrap(function()
    while wait() do
        local x = data["key"]
        wait()
        data["key"] = nil
        wait()
        data["key"] = x
    end
end)()

while wait() do
    local x = data["key"]
    wait() -- the scheduler can switch to the coroutine here
    if data["key"] ~= x then
        print("Race condition")
    end
end
This code prints “Race condition” continually, which indicates that the Lua scheduler switches between the coroutine loop and the second loop at the wait() between reading data["key"] and checking it again.
Technically this is not a race condition, since Lua is single-threaded. But the heart of what I want is a class I could use to protect access to the data table for a block of code, with a guarantee that nothing else can touch the table even if the Lua scheduler switches threads in the middle of the block. This would change the above code to:
-- lock to protect access to data
local lock = Lock.new()
-- I want to protect concurrent access to this
local data = {key = 10}

coroutine.wrap(function()
    while wait() do
        lock:acquire() -- busy-waits until the lock is free
        local x = data["key"]
        wait()
        data["key"] = nil
        wait()
        data["key"] = x
        lock:release()
    end
end)()

while wait() do
    lock:acquire() -- busy-waits until the lock is free
    local x = data["key"]
    wait()
    if data["key"] ~= x then
        print("Race condition")
    end
    lock:release()
end
My issue is with the implementation of the acquire method. To ensure the correctness of the lock, acquire has to guarantee that only one thread holds the lock at a time. I know that Lua is single-threaded and that the scheduler should not switch threads unless a thread yields, but I still don’t know whether the following implementation is correct. Could the Heartbeat:Wait() in acquire lead to a scenario where two threads calling acquire() are both granted the lock? Does Heartbeat:Wait() yield the current thread? Where is this documented? Thanks.
Lock code:
local RunService = game:GetService("RunService")

local Lock
do
    Lock = setmetatable({}, {})
    Lock.__index = Lock

    function Lock.new(...)
        local self = setmetatable({}, Lock)
        self.locked = false
        return self
    end

    function Lock:acquire()
        while self.locked do
            RunService.Heartbeat:Wait()
        end
        self.locked = true
    end

    function Lock:release()
        self.locked = false
    end
end
A better approach might be having your lock contain a queue of threads. When a thread tries to acquire the lock and the queue is empty, it takes the lock; otherwise it adds itself to the queue and yields. When the owning thread is done, release pops the next thread off the queue, if any were added while it ran, and resumes it; this repeats until the queue is exhausted.
That’s a much better idea. Using evaera’s Promise implementation, I’ve come up with this solution. The queue of threads you mentioned becomes a queue of resolve functions that resolve promises to grant the lock to waiting threads. I’m using Osyris’s TypeScript compiler here, but it essentially follows regular Promise semantics from JavaScript:
local TS = require(...)
local RunService = game:GetService("RunService")

local Lock
do
    Lock = setmetatable({}, {})
    Lock.__index = Lock

    function Lock.new(...)
        local self = setmetatable({}, Lock)
        self.locked = false
        self.queue = {}
        return self
    end

    function Lock:acquire()
        if self.locked then
            return TS.Promise.new(function(resolve)
                -- add the function that resolves the promise to the queue
                self.queue[#self.queue + 1] = resolve
            end)
        else -- give the caller the lock right away
            self.locked = true
            return TS.Promise.resolve()
        end
    end

    function Lock:release()
        if #self.queue > 0 then
            -- keep the lock locked if there are "threads" waiting for it;
            -- resolve the oldest promise to give the oldest "thread" the lock
            local resolve = table.remove(self.queue, 1)
            resolve()
        else
            -- if no "threads" are waiting for this lock, simply unlock it
            self.locked = false
        end
    end
end
-- lock to protect access to data
local lock = Lock.new()
-- I want to protect concurrent access to this
local data = {key = 10}

coroutine.wrap(function()
    while wait() do
        -- TS.await will wait for the lock to be free
        -- via a BindableEvent that the Promise library uses
        TS.await(lock:acquire():andThen(function()
            local x = data["key"]
            wait()
            data["key"] = nil
            wait()
            data["key"] = x
            lock:release()
        end))
    end
end)()

while wait() do
    -- TS.await will wait for the lock to be free
    -- via a BindableEvent that the Promise library uses
    TS.await(lock:acquire():andThen(function()
        local x = data["key"]
        wait()
        if data["key"] ~= x then
            print("Race condition")
        end
        lock:release()
    end))
end
Anything that acquires the lock must release it within the callback as shown. What do you think?
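For what it’s worth, the resolve-queue idea doesn’t depend on the Promise library itself. Here is a hypothetical sketch in plain Lua, with bare callbacks standing in for promises (the Lock name and acquire(fn) signature are mine, not the roblox-ts API):

```lua
-- Sketch of the same resolve-queue lock with plain callbacks instead of
-- promises: acquire(fn) runs fn once the lock has been granted.
local Lock = {}
Lock.__index = Lock

function Lock.new()
    return setmetatable({locked = false, queue = {}}, Lock)
end

function Lock:acquire(fn)
    if self.locked then
        self.queue[#self.queue + 1] = fn -- run later, on release
    else
        self.locked = true
        fn()
    end
end

function Lock:release()
    local fn = table.remove(self.queue, 1)
    if fn then
        fn() -- hand the still-held lock to the oldest waiter
    else
        self.locked = false
    end
end

local lock = Lock.new()
local order = {}
lock:acquire(function() table.insert(order, "first") end)
lock:acquire(function() table.insert(order, "second") end) -- queued
table.insert(order, "between")
lock:release() -- grants the lock to the queued waiter
lock:release()
print(table.concat(order, ", ")) -- first, between, second
```

The second callback only runs once the first holder releases, which is the same ordering the promise version enforces.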
I think it’s very over-engineered. You don’t need to pull in an entire library or two to accomplish this.
local MutexMt = {__index = {}}

function MutexMt.__index:acquire()
    local thread = coroutine.running()
    table.insert(self.queue, thread)
    if #self.queue ~= 1 then
        coroutine.yield()
    end
end

function MutexMt.__index:release()
    table.remove(self.queue, 1)
    if #self.queue ~= 0 then
        coroutine.resume(self.queue[1])
    end
end

function MutexMt.new()
    local self = {}
    self.queue = {}
    return setmetatable(self, MutexMt)
end

return MutexMt
This is a quickly put-together example implementation; I haven’t tested it, but the point is that you can get this behavior easily with Lua coroutines and far less machinery.
Example usage:
local mutex = Mutex.new()
mutex:acquire()
-- code
mutex:release()
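Since this implementation is plain Lua (no Roblox APIs), the serializing behavior can be checked in a small self-contained demo; this is my sketch, with the mutex inlined so it runs standalone, and a coroutine.resume(b) call simulating the scheduler switching threads in the middle of a critical section:

```lua
-- Demo: two coroutines contend for the mutex; their critical sections
-- never interleave. Standard Lua 5.1+, no Roblox APIs assumed.
local MutexMt = {__index = {}}
function MutexMt.__index:acquire()
    table.insert(self.queue, coroutine.running())
    if #self.queue ~= 1 then
        coroutine.yield() -- block until release() resumes us
    end
end
function MutexMt.__index:release()
    table.remove(self.queue, 1)
    if #self.queue ~= 0 then
        coroutine.resume(self.queue[1])
    end
end
function MutexMt.new()
    return setmetatable({queue = {}}, MutexMt)
end

local mutex = MutexMt.new()
local log = {}
local a, b
a = coroutine.create(function()
    mutex:acquire()
    table.insert(log, "A enter")
    coroutine.resume(b) -- simulate the scheduler switching mid-section
    table.insert(log, "A leave")
    mutex:release()
end)
b = coroutine.create(function()
    mutex:acquire() -- A holds the lock, so this yields
    table.insert(log, "B enter")
    table.insert(log, "B leave")
    mutex:release()
end)
coroutine.resume(a)
print(table.concat(log, ", ")) -- A enter, A leave, B enter, B leave
```

Even though B is resumed while A is between its enter and leave, B parks itself in the queue and only runs once A releases.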
Promises are JavaScript’s way of getting around its lack of coroutines. I’m not saying they’re bad; they’re a really useful tool in many cases. But because Lua has coroutines, it’s not always necessary to use promises or callback chaining. Mutexes are a well-established way of working with coroutines, and at their heart they are often represented with just an integer. I believe @Autterfly’s solution is perfect and very clear.
The only drawback is the raw coroutine.resume, which swallows any error raised in the resumed thread. Here’s a version that uses events so that errors propagate correctly. It also has a Wrap method for convenience.
-- Mutex is a mutual exclusion lock.
local Mutex = {__index = {}}

-- Lock locks the mutex. If the lock is already in use, then the calling
-- thread blocks until the lock is available.
function Mutex.__index:Lock()
    local blocker = Instance.new("BoolValue")
    table.insert(self.blockers, blocker)
    if #self.blockers > 1 then
        blocker.Changed:Wait() -- Yield
    end
end

-- Unlock unlocks the mutex. If threads are blocked on the mutex, then the
-- next blocked thread is resumed.
function Mutex.__index:Unlock()
    local blocker = table.remove(self.blockers, 1)
    if not blocker then
        error("attempt to unlock non-locked mutex", 2)
    end
    if #self.blockers == 0 then
        return
    end
    blocker = self.blockers[1]
    blocker.Value = not blocker.Value -- Resume
end

-- Wrap returns a function that, when called, locks the mutex before func is
-- called, and unlocks it after func returns. The new function receives and
-- returns the same parameters as func.
function Mutex.__index:Wrap(func)
    return function(...)
        self:Lock()
        local results = table.pack(func(...))
        self:Unlock()
        return table.unpack(results, 1, results.n)
    end
end

-- NewMutex returns a new mutex.
local function NewMutex()
    return setmetatable({blockers = {}}, Mutex)
end

return {
    new = NewMutex,
}
Something to keep in mind with either implementation is that, because of how coroutines work, releasing the mutex resumes the next waiting thread immediately, before the release call itself returns.
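That immediate hand-off is observable in standard Lua. In this sketch (my own, repeating the coroutine-queue mutex so the snippet runs standalone), the waiter’s entire critical section runs inside the holder’s release call:

```lua
-- Demo of the immediate hand-off: the waiting thread's critical section
-- runs before the releasing thread's release() call returns.
local Mutex = {__index = {}}
function Mutex.__index:acquire()
    table.insert(self.queue, coroutine.running())
    if #self.queue > 1 then
        coroutine.yield()
    end
end
function Mutex.__index:release()
    table.remove(self.queue, 1)
    if self.queue[1] then
        coroutine.resume(self.queue[1])
    end
end
local mutex = setmetatable({queue = {}}, Mutex)

local order = {}
local waiter, holder
waiter = coroutine.create(function()
    mutex:acquire() -- blocks: holder owns the lock
    table.insert(order, "waiter ran")
    mutex:release()
end)
holder = coroutine.create(function()
    mutex:acquire()
    coroutine.resume(waiter) -- waiter queues up and yields
    mutex:release()          -- hand-off happens inside this call
    table.insert(order, "release returned")
end)
coroutine.resume(holder)
print(order[1] .. " before " .. order[2])
-- waiter ran before release returned
```

If the code after release assumes it still has exclusive access, this ordering can surprise you, so treat release as a potential switch point.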
This solution is quite simple and elegant. I opted to use promises since I’m using them to model the rest of asynchronous behavior in my codebase, but under the hood I believe they just use events like @Anaminus’s solution. I’m marking your post as the solution, but I also think Anaminus’s code would work well. Thanks everyone for helping with this.