For future readers: it's been about a year and I have indeed added a good number of functions to my FSM (though some parts are still lacking). I don't have the time to fully document everything or write a separate tutorial post about it, so unfortunately you will have to power through this on your own.
If you need a maintained, documented and (I assume) action-oriented implementation, then I highly recommend looking into @prooheckcp’s FSM implementation instead.
Anyway, here’s my take on FSM:
local unpack = unpack or table.unpack
local SignalClass = require(game:GetService("ReplicatedStorage").Modules.Shared.lib.SignalClass)
local MaidClass = require(game:GetService("ReplicatedStorage").Modules.Shared.lib.MaidClass)
local FSM = {}

--- Instance lookup fallback for keys that are missing on the instance.
-- Keys containing "onEnter", "onBefore", "onAfter" or "onLeave" resolve to a
-- signal stored in `self.connectionsTbl`; the signal is created on first
-- access, so a caller can write `fsm.onEnterIdle:Connect(...)` without
-- registering the callback up front.
-- Every other key falls back to the FSM class table.
-- @param self the FSM instance being indexed
-- @param k the missing key
-- @return the signal for callback-style keys, otherwise the class member (or nil)
FSM.__index = function(self, k)
    -- Only strings can name a callback. Checking the type first keeps
    -- non-string keys (numbers, booleans, ...) from raising on `k:match`.
    if type(k) == "string"
        and (k:match("onEnter") or k:match("onBefore") or k:match("onAfter") or k:match("onLeave"))
    then
        local signal = rawget(self.connectionsTbl, k)
        if not signal then
            signal = SignalClass.new()
            rawset(self.connectionsTbl, k, signal)
        end
        return signal
    end
    return rawget(FSM, k)
end
-- Pseudo-state usable in an event's `from` list: the event may then be fired from any state.
-- It is also the default `from` when an event declares none.
FSM.Wildcard = "*"
-- NOTE(review): not referenced anywhere in the visible code; confirm whether it is still needed.
FSM.WildcardIndex = 1
-- Value returned by the `cancel` closure that a running transition installs on the instance.
FSM.CANCELLED = 1
-- NOTE(review): not referenced anywhere in the visible code; presumably a transition status code, confirm.
FSM.PENDING = 2
--- Fires `signal` with the unpacked contents of `args`.
-- Does nothing (and returns nothing) when `signal` is nil or false, which is
-- the case for callbacks nobody has registered.
-- @param signal object exposing a `:Fire(...)` method, or nil
-- @param args array of arguments forwarded to `:Fire`
-- @return whatever `signal:Fire` returns
local function fireConnection(signal, args)
    if not signal then
        return
    end
    return signal:Fire(unpack(args))
end
--- Fires the `key .. <parent>` signal for every parent of `state`.
-- The parent list comes from `self:_RetrieveAllParents(state)`.
-- A parent equal to `self.previous` is skipped: moving from a parent state
-- into one of its own sub-states must not re-fire the parent's callback.
-- Moving between two sub-states still fires it.
-- @param self FSM instance
-- @param key callback prefix, e.g. "onEnter"
-- @param state the state whose parents are notified
-- @param args argument array forwarded to each signal
local function fireParentConnections(self, key, state, args)
    local parents = self:_RetrieveAllParents(state)
    for _, parentState in ipairs(parents) do
        if self.previous ~= parentState then
            fireConnection(self.connectionsTbl[key .. parentState], args)
        end
    end
end
--- Finds the nearest ancestor of `state2` that also appears in `mainStateParentsTbl`.
-- Walks `self.parentState_Of_State` depth-first starting at `state2`: each
-- direct parent is tested first, then that parent's own ancestors.
-- Declared `local` so the helper no longer leaks into the global environment.
-- @param self FSM instance (reads `parentState_Of_State`)
-- @param main unused; kept so existing call sites stay valid
-- @param mainStateParentsTbl array of parent states to match against
-- @param state2 state whose ancestors are searched
-- @return the matching parent state, or `false` when there is none
local function getParentMatch(self, main, mainStateParentsTbl, state2)
    for _, parent in pairs(self.parentState_Of_State[state2] or {}) do
        if table.find(mainStateParentsTbl, parent) then
            return parent
        end
        local inherited = getParentMatch(self, main, mainStateParentsTbl, parent)
        if inherited then
            return inherited
        end
    end
    return false
end
--* Fires the "onLeave<Parent>" signals for parents of the state being left.
--* self: FSM instance; previous: the state being left; newState: the state being entered;
--* args: argument array forwarded to every fired signal.
--* Which parents are notified depends on whether newState is found among the sub-states
--* of the first parent returned by _RetrieveAllParents(previous) (see the branches below).
local function fireLeaveParentConnections(self, previous, newState, args)
-- Start time, only used by the timing print at the end of the function
local t = tick()
--* Fetch the parents of the previous state
--* (Only parent states are listed, never sub-states)
local prevStateParentsTbl = self:_RetrieveAllParents(previous)
-- A state without parents has no parent callbacks to fire
if #prevStateParentsTbl == 0 then return end
--! NOTE(review): _RetrieveAllParents inserts a direct parent BEFORE that parent's own ancestors,
--! so entry 1 is a direct parent of `previous`, which is only the top of the hierarchy when
--! the nesting is one level deep. Confirm this is the intended "top parent".
local prevStateTopParent = prevStateParentsTbl[1]
-- Debug output
print("Top parent: " .. prevStateTopParent, previous)
-- The new state is not one of the (nested) sub-states of that parent
if not table.find(self:_RetrieveAllSubStates(prevStateTopParent), newState) then
--* Fire onLeave for every parent of the previous state, then stop
for i, parentState in pairs(prevStateParentsTbl) do
fireConnection(self.connectionsTbl["onLeave" .. parentState], args)
end
return
end
-- The new state is a sub-state of that parent
--* Look for the parent shared by the previous and the new state
--? The new state may be a sub-state that sits under a different parent
--? (i.e. one that is not part of the previous state's parent list)
--! NOTE(review): table.find returns the INDEX of the match, not the state itself. When that
--! branch wins, parentMatch is a number and the `parentMatch == parentState` test below compares
--! it against list entries (presumably state-name strings), so it would never match. Confirm intent.
local parentMatch = table.find(prevStateParentsTbl, newState) or
getParentMatch(self, self.previous, prevStateParentsTbl, newState)
-- No shared parent found: nothing to fire
if not parentMatch then return end
local foundMatchedParent = false
--* Fire onLeave for the matched parent and for every entry that follows it in the list;
--* entries before the match are skipped
for i, parentState in pairs(prevStateParentsTbl) do
if parentMatch == parentState then foundMatchedParent = true end
if not foundMatchedParent then continue end
fireConnection(self.connectionsTbl["onLeave" .. parentState], args)
end
-- Debug timing; only reached on this last path because the earlier branches return
print("Time to fire leave parent CNs:" .. tostring(tick() - t))
end
--- Fires the "before" callbacks of an event: first the event-specific
-- `onBefore<event>` signal, then the catch-all `onBeforeEvent` signal.
-- The two unnamed parameters (from/to) are accepted but not used.
-- @param self FSM instance
-- @param event name of the event being fired
-- @param args argument array forwarded to each signal
local function beforeEvent(self, event, _, _, args)
    for _, key in ipairs({ "onBefore" .. event, "onBeforeEvent" }) do
        -- Looked up one at a time so a signal registered while the first
        -- one fires is still picked up
        fireConnection(self.connectionsTbl[key], args)
    end
end
--- Fires the "after" callbacks of an event: first the event-specific
-- `onAfter<event>` signal, then the catch-all `onAfterEvent` signal.
-- The two unnamed parameters (from/to) are accepted but not used.
-- @param self FSM instance
-- @param event name of the event that was fired
-- @param args argument array forwarded to each signal
local function afterEvent(self, event, _, _, args)
    for _, key in ipairs({ "onAfter" .. event, "onAfterEvent" }) do
        fireConnection(self.connectionsTbl[key], args)
    end
end
--- Fires the "enter" callbacks for state `to`, in this order:
-- the parents of `to` (via fireParentConnections), the state-specific
-- `onEnter<to>` signal, then the catch-all `onEnterState` signal.
-- The two unnamed parameters (event/from) are accepted but not used.
-- @param self FSM instance
-- @param to the state being entered
-- @param args argument array forwarded to each signal
local function enterState(self, _, _, to, args)
    fireParentConnections(self, "onEnter", to, args)
    for _, key in ipairs({ "onEnter" .. to, "onEnterState" }) do
        fireConnection(self.connectionsTbl[key], args)
    end
end
--- Fires the "leave" callbacks for state `from`, in this order:
-- the parent callbacks (via fireLeaveParentConnections), the state-specific
-- `onLeave<from>` signal, then the catch-all `onLeaveState` signal.
-- The unnamed parameter (event) is accepted but not used.
-- @param self FSM instance
-- @param from the state being left
-- @param to the state being entered
-- @param args argument array forwarded to each signal
local function leaveState(self, _, from, to, args)
    fireLeaveParentConnections(self, from, to, args)
    for _, key in ipairs({ "onLeave" .. from, "onLeaveState" }) do
        fireConnection(self.connectionsTbl[key], args)
    end
end
--- Releases the transition lock and resumes the next waiting transition.
-- Clears `self.isPending`, then pops threads off the front of `self.queue`
-- until one that can be resumed is found (or the queue is empty).
-- @param self FSM instance (writes `isPending`, consumes `queue`)
local function releaseQueue(self)
    self.isPending = false
    while true do
        -- Dequeue BEFORE resuming. The resumed transition may run to completion
        -- synchronously and call releaseQueue itself; if its own entry were
        -- still at the head, that nested call would drop it and this call
        -- would then drop the NEXT waiter without ever resuming it.
        local waiting = table.remove(self.queue, 1)
        if waiting == nil then
            return
        end
        -- Only a suspended thread can be resumed; anything else (e.g. a dead
        -- thread) is discarded so it cannot block the waiters behind it.
        if coroutine.status(waiting) == "suspended" then
            task.spawn(waiting)
            return
        end
    end
end
--- Builds the function that fires `event` on this FSM instance.
--
-- The returned function accepts any extra arguments and forwards them to the
-- callbacks, which all receive `(self, event, from, to, ...)`.
-- Behaviour of the returned function:
--   * event not allowed from the current state: returns nothing;
--   * another transition is in progress: the calling thread is queued and
--     yields until releaseQueue resumes it;
--   * `from == to`: fires the before/after callbacks and returns "NONE";
--   * otherwise: fires before -> leave, then confirms the transition
--     (state change -> after -> enter) and returns nothing.
-- While the leave callbacks run, `self.confirm` and `self.cancel` are set on
-- the instance. A leave callback that clears or calls them takes over the
-- decision: `confirm()` returns true, `cancel()` returns FSM.CANCELLED.
--
-- @param self FSM instance
-- @param event name of the event
-- @param statesTbl unused; kept so existing call sites stay valid
-- @return function(...) firing the event
local function buildTransition(self, event, statesTbl)
    return function(...)
        local from, to = internal_canEvent(self, event)
        if not (from and to) then
            return
        end

        if self.isPending then
            table.insert(self.queue, coroutine.running())
            coroutine.yield()
            -- The state may have changed while this thread was waiting, so the
            -- `from`/`to` computed before the wait can be stale. Re-validate
            -- against the state as it is now.
            from, to = internal_canEvent(self, event)
            if not (from and to) then
                -- Pass the turn on so the waiters behind us are not stranded
                releaseQueue(self)
                return
            end
        end
        self.isPending = true

        local args = {self, event, from, to, ...}
        beforeEvent(self, event, from, to, args)

        -- Self-transition: no state change, no leave/enter callbacks
        if from == to then
            afterEvent(self, event, from, to, args)
            releaseQueue(self)
            return "NONE"
        end

        --* [[ CANCEL & CONFIRM METHODS ]]
        -- Commits the transition: updates previous/current, then fires the
        -- after and enter callbacks and releases the lock.
        self.confirm = function()
            self.confirm = nil
            self.cancel = nil
            self.previous = self.current
            self.current = to
            afterEvent(self, event, from, to, args)
            enterState(self, event, from, to, args)
            releaseQueue(self)
            return true
        end
        -- Aborts the transition: the state is left untouched, the after
        -- callbacks still fire and the lock is released.
        self.cancel = function()
            self.confirm = nil
            self.cancel = nil
            afterEvent(self, event, from, to, args)
            releaseQueue(self)
            return FSM.CANCELLED
        end

        leaveState(self, event, from, to, args)
        -- Still set means no leave callback confirmed/cancelled on its own
        if self.confirm then
            self.confirm()
        end
    end
end
--[[
Creates and starts a state machine.

```lua
FSM.init({
    initial = "Idle",
    events = {
        {name = "foo", from = {"bar", "foo2"}, to = "bar2"},
        {name = "foo3", from = "bar2", to = "foo3", subStateOf = "bar2"},
        {name = "foo4", from = "*", to = "bar3", except = "bar"},
    },
    connections = {
    }
})
```

cfg fields:
    initial:     a state name, or a table {state = ..., event = ...}. The machine starts
                 in "none"; when `initial` is given, an event (named "startup" unless
                 `event` is set) going from "none" to that state is registered and fired
                 before init returns. Optional.
    events:      array of event descriptions (see below). Optional.
    connections: map of callback name -> function. Each entry is replaced by a signal
                 with the function connected to it. Optional.

event fields:
    name:       the event name; `fsm[name](...)` fires it
    from:       a state or array of states the event may fire from; defaults to "*"
    to:         the state the event leads to
    "*":        wildcard, the event may fire from any state
    subStateOf: parent state (or array of parent states) of the `to` state.
                A sub-state is a "child" of its parent: while we are IN the child we are
                also IN the parent, but not the other way around.
    except:     a state (or array of states) the event may NOT fire from; meant to be
                combined with the wildcard

target: optional existing table to turn into the FSM instance.
Returns the FSM instance.
]]
function FSM.init(cfg, target)
    local self = setmetatable(target or {}, FSM)
    self.current = "none"
    self.previous = "none"
    self.isPending = false

    --* FIFO list of threads waiting for the running transition to finish
    --* (threads are appended by the transition functions and resumed by releaseQueue)
    self.queue = {}

    -- `initial` is optional: fall back to an empty description instead of
    -- indexing nil when it is missing
    local initTab
    if type(cfg.initial) == "string" then
        initTab = {state = cfg.initial}
    else
        initTab = cfg.initial or {}
    end
    local initEvent = initTab.event or "startup"

    self.eventsTbl = cfg.events or {}
    self.connectionsTbl = cfg.connections or {}

    self.Maid = MaidClass.new()
    self.Maid:giveTask(self.connectionsTbl)

    self.statesForEvents = {} -- event name -> {to, except, fromTbl}; used to validate and execute transitions
    self.eventsForStates = {} -- state -> name of the (last registered) event leading to it
    self.parentState_Of_State = {} -- state -> array of its direct parent states
    self.subState_Of_State = {} -- state -> array of its direct sub-states

    -- Registers one event description in the lookup tables above
    local function add(eventTbl)
        local from = eventTbl.from
        local e = eventTbl.name
        local fromTbl = type(from) == "table" and from or (from and {from} or {FSM.Wildcard})
        local to = eventTbl.to
        local parentStateTbl = type(eventTbl.subStateOf) == "table" and eventTbl.subStateOf or {eventTbl.subStateOf}

        -- The first description seen for an event name fixes its `to`/`except`;
        -- later ones with the same name only add more `from` states
        self.statesForEvents[e] = self.statesForEvents[e] or {
            to = to,
            except = eventTbl.except,
            fromTbl = {}
        }

        if #parentStateTbl > 0 then --* the `to` state is a sub-state of every listed parent
            self.parentState_Of_State[to] = self.parentState_Of_State[to] or {}
            for _, parentState in pairs(parentStateTbl) do
                self.subState_Of_State[parentState] = self.subState_Of_State[parentState] or {}
                table.insert(self.parentState_Of_State[to], parentState)
                table.insert(self.subState_Of_State[parentState], to)
            end
        end

        self.eventsForStates[to] = e

        --* Store the `from` states as a set so validity checks are a single lookup
        for _, fromState in pairs(fromTbl) do
            self.statesForEvents[e].fromTbl[fromState] = true
        end
    end

    if cfg.initial then
        add({name = initEvent, from = "none", to = initTab.state or "none"})
    end

    -- initialize the events
    for _, tab in pairs(self.eventsTbl) do
        add(tab)
    end

    -- build the state transitions: one callable per event name, stored on the instance
    for name, stateTbl in pairs(self.statesForEvents) do
        self[name] = buildTransition(self, name, stateTbl)
    end

    -- turn the configured callbacks into signals (these are the default / permanent callbacks)
    for name, callback in pairs(self.connectionsTbl) do
        local signal = SignalClass.new()
        signal:Connect(callback)
        self.connectionsTbl[name] = signal
    end

    if cfg.initial then
        self[initEvent]()
    end

    return self
end
--- Tells whether `stateToCheck` is a parent, at any depth, of `inputState`.
-- A state may have several direct parents and parents may be nested, so the
-- search recurses through `self.parentState_Of_State`.
-- @param stateToCheck the candidate ancestor
-- @param inputState the state whose ancestors are searched
-- @return true when `stateToCheck` is an ancestor of `inputState`, false otherwise
function FSM:_deepCheckParentStateMatch(stateToCheck, inputState)
    local directParents = self.parentState_Of_State[inputState]
    if not directParents then
        return false
    end
    for _, parentState in pairs(directParents) do
        -- Keep looping on a miss: another direct parent may still match
        if parentState == stateToCheck or self:_deepCheckParentStateMatch(stateToCheck, parentState) then
            return true
        end
    end
    return false
end
--- Tells whether the machine is in the given state.
-- Being in a sub-state also counts as being in each of its parent states,
-- unless `strict` is set.
-- @param state a state, or a table of states (true if ANY of them matches)
-- @param strict when truthy, parent states are ignored and only the state itself is compared
-- @param checkingPrevState when truthy, the PREVIOUS state is tested instead of the current one
-- @return true on a match, false otherwise
function FSM:is(state, strict, checkingPrevState)
    local stateKey = self.current
    if checkingPrevState and self.previous then
        stateKey = self.previous
    end

    -- Single-candidate test shared by the scalar and the table form
    local function matches(candidate)
        local viaParent = not strict and self:_deepCheckParentStateMatch(candidate, stateKey)
        return stateKey == candidate or viaParent
    end

    if type(state) ~= "table" then
        return matches(state)
    end

    for _, candidate in pairs(state) do
        if matches(candidate) then
            return true
        end
    end
    return false
end
--- Tells whether the PREVIOUS state matches `state`.
-- Shorthand for `FSM:is(state, strict, true)`.
-- @param state a state, or a table of states
-- @param strict when truthy, parent states are ignored
-- @return the result of `FSM:is` evaluated against the previous state
function FSM:PreviousIs(state, strict)
    local checkingPrevState = true
    return self:is(state, strict, checkingPrevState)
end
--- Collects every parent of `inputState`, at any depth.
-- Order: each direct parent is immediately followed by its own ancestors
-- (depth-first), so the nearest parent comes first.
-- @param inputState the state whose parents are collected
-- @return array of parent states (empty when the state has no parent)
function FSM:_RetrieveAllParents(inputState)
    local collected = {}
    for _, parentState in pairs(self.parentState_Of_State[inputState] or {}) do
        table.insert(collected, parentState)
        -- Append the ancestors one by one. Passing an unpacked list to
        -- table.insert only works for a single value: two values are read as
        -- (position, value) and three or more are rejected.
        for _, ancestor in ipairs(self:_RetrieveAllParents(parentState)) do
            table.insert(collected, ancestor)
        end
    end
    return collected
end
--- Collects every sub-state of `inputState`, at any depth.
-- Order: each direct sub-state is immediately followed by its own
-- sub-states (depth-first).
-- @param inputState the state whose sub-states are collected
-- @return array of sub-states (empty when the state has none)
function FSM:_RetrieveAllSubStates(inputState)
    local collected = {}
    for _, subState in pairs(self.subState_Of_State[inputState] or {}) do
        table.insert(collected, subState)
        -- Append the nested sub-states one by one. Passing an unpacked list to
        -- table.insert only works for a single value: two values are read as
        -- (position, value) and three or more are rejected.
        for _, nested in ipairs(self:_RetrieveAllSubStates(subState)) do
            table.insert(collected, nested)
        end
    end
    return collected
end
--- Tells whether any parent of `inputState`, at any depth, is listed in `fromTbl`.
-- Used to let a sub-state fire events that are declared on one of its parents.
-- @param fromTbl set of allowed source states (state -> true)
-- @param inputState the state whose ancestors are tested
-- @return true when an ancestor is an allowed source state, false otherwise
function FSM:_deepCheckParentStateTransition(fromTbl, inputState)
    local directParents = self.parentState_Of_State[inputState]
    if not directParents then
        return false
    end
    for _, parentState in pairs(directParents) do
        -- Keep looping on a miss: another direct parent may still qualify
        if fromTbl[parentState] or self:_deepCheckParentStateTransition(fromTbl, parentState) then
            return true
        end
    end
    return false
end
--- Checks whether event `e` may fire right now and resolves its endpoints.
-- Used internally by the transition functions and by FSM:canEvent.
--
-- The event is allowed when its `from` set contains the current state, any
-- parent (at any depth) of the current state, or the wildcard, AND the
-- machine is not in one of the event's `except` states.
--! Being in a parent state does NOT allow events that only list one of its
--! sub-states as `from`; likewise, moving between two sub-states of the same
--! parent is only allowed when the event's `from` says so.
--
-- Intentionally left global: the transition builder defined earlier in this
-- file calls it by name, so a `local` declared here would not be visible there.
-- @param self FSM instance
-- @param e event name
-- @return from, to   the current state and the event's target state,
--                    or nil, nil when the event is unknown or not allowed
function internal_canEvent(self, e)
    local transition = self.statesForEvents[e]
    -- Unknown event: report "not allowed" instead of indexing nil
    if not transition then
        return nil, nil
    end

    local fromTbl = transition.fromTbl
    local validTrans = fromTbl[self.current]
        or self:_deepCheckParentStateTransition(fromTbl, self.current)
        or fromTbl[FSM.Wildcard]
    if not validTrans then
        return nil, nil
    end

    -- handle exceptions (a single state or an array of states)
    local exception = transition.except
    if exception then
        local exceptionTab = type(exception) == "table" and exception or {exception}
        for _, state in pairs(exceptionTab) do
            if self:is(state) then
                return nil, nil
            end
        end
    end

    return self.current, transition.to
end
--- Tells whether event `e` can be fired from the current state.
-- @param e event name
-- @return true when the event is currently allowed, false otherwise
function FSM:canEvent(e)
    -- Only the first result (the source state) matters here
    local from = internal_canEvent(self, e)
    return from ~= nil
end
--- Returns the name of the event that transitions into a state.
-- @param stateName the target state; defaults to the current state when omitted
-- @return the event name recorded for that state, or nil when there is none
function FSM:getEventForState(stateName)
    local target = stateName or self.current
    return self.eventsForStates[target]
end
--- Returns the state an event transitions to.
-- @param eventName event name
-- @return the event's target state, or nil when the event is not registered
function FSM:getToStateForEvent(eventName)
    local transition = self.statesForEvents[eventName]
    -- Unknown event: answer nil instead of raising on a nil index
    return transition and transition.to
end
--- Tells whether a transition is waiting to be confirmed or cancelled,
-- i.e. whether a `confirm` closure is currently set on the instance.
-- NOTE(review): FSM.init also assigns a boolean FIELD named `isPending` on
-- every instance, which shadows this method for `fsm:isPending()` calls;
-- as written it is only reachable as `FSM.isPending(fsm)`. Confirm which of
-- the two names should be kept.
-- @return true while `self.confirm` is set, false otherwise
function FSM:isPending()
    if self.confirm == nil then
        return false
    end
    return true
end
--- Tears the instance down: destroys its Maid, then detaches the FSM
-- metatable so the table no longer resolves FSM methods or callback signals.
function FSM:Destroy()
    local maid = self.Maid
    maid:Destroy()
    setmetatable(self, nil)
end
return FSM
Example:
-- Usage example: builds a state machine and stores it on the surrounding object.
self.FSM = FSM.init({
-- State targeted by the automatic start-up event; "none" is also the state every machine starts in
initial = "none",
events = {
-- from = "*" is the wildcard: the event may be fired from any state
{name = "beginIdle", from = "*", to = "Idle"},
{name = "Attack", from = "Idle", to = "Attacking"},
-- `from` may list several source states
{name = "Retreat", from = {"Idle", "Attacking"}, to = "Retreating"},
{name = "Stun", from = "*", to = "Stunned"},
-- "Knocked" is declared as a sub-state of "Stunned": while in "Knocked", is("Stunned") is true as well
{name = "Knock", from = "*", to = "Knocked", subStateOf = "Stunned"},
},
-- Permanent callbacks; each one receives (fsm, event, from, to, ...extra arguments passed when firing the event)
connections = {
-- Fired whenever any state is entered
onEnterState = function(_, event, from, to)
end,
-- Fired when "Stunned" is entered, and also when one of its sub-states is entered from outside "Stunned"
onEnterStunned = function(_, event, from, to)
end,
onEnterKnocked = function(_, event, from, to)
end,
onEnterIdle = function(_, event, from, to)
end,
}
})
-- Fire an event by calling the function stored under the event's name
self.FSM["beginIdle"]()
Note: You need the Maid and Signal libraries for this to work; make sure to replace the paths with your own.
The notable features are: wildcards, exceptions, sub-states and parent states
This FSM is designed for flow control (i.e. it handles and verifies state changes; actions should not be handled within the connections themselves).
There might be unexpected bugs since I haven’t tested this extensively yet.