Eventually-decentralized project hosting and management platform

[[ 🗃 ^WvWbo vervis ]] :: [📥 Inbox] [📤 Outbox] [🐤 Followers] [🤝 Collaborators] [🛠 Changes]

Clone

HTTPS: darcs clone https://vervis.peers.community/repos/WvWbo

SSH: darcs clone USERNAME@vervis.peers.community:WvWbo

Tags

TODO

src / Control / Concurrent /

Actor.hs

{- This file is part of Vervis.
 -
 - Written in 2019, 2020, 2023 by fr33domlover <fr33domlover@riseup.net>.
 -
 - ♡ Copying is an act of love. Please copy, reuse and share.
 -
 - The author(s) have dedicated all copyright and related and neighboring
 - rights to this software to the public domain worldwide. This software is
 - distributed without any warranty.
 -
 - You should have received a copy of the CC0 Public Domain Dedication along
 - with this software. If not, see
 - <http://creativecommons.org/publicdomain/zero/1.0/>.
 -}

module Control.Concurrent.Actor
    ( Stage (..)
    , TheaterFor ()
    , ActFor ()
    , MonadActor (..)
    , asksEnv
    , Next ()
    , Message (..)
    , startTheater
    , callIO
    , call
    --, sendIO
    , send
    , sendManyIO
    , sendMany
    , spawnIO
    , spawn
    , done
    , doneAnd
    , stop
    )
where

import Control.Concurrent
import Control.Concurrent.STM.TVar
import Control.Monad
import Control.Monad.Fail
import Control.Monad.IO.Unlift
import Control.Monad.Logger.CallStack
import Control.Monad.STM
import Control.Monad.Trans.Class
import Control.Monad.Trans.Except
import Control.Monad.Trans.Maybe
import Control.Monad.Trans.Reader
import Data.Foldable
import Data.Hashable
import Data.HashMap.Strict (HashMap)
import Data.HashSet (HashSet)
import Data.Text (Text)
import Data.Traversable
import UnliftIO.Exception

import qualified Control.Exception.Annotated as AE
import qualified Control.Monad.Trans.RWS.Lazy as RWSL
import qualified Data.HashMap.Strict as HM
import qualified Data.HashSet as HS
import qualified Data.Text as T

import Control.Concurrent.Return

type LogFunc = Loc -> LogSource -> LogLevel -> LogStr -> IO ()

-- PROBLEM: I'm stuck with how App can hold the (TheaterFor Env) while Env
-- needs to somehow hold the route rendering function (Route App -> Text) so
-- there's a cyclic reference
--
-- And right now the classes below are weird:
--
-- * Stage and Env terms used interchangeably, it's cnfusing, Stage is weird
-- * The main type everything's keyed on is the Env, which is merely parameters
--   for the actor, perhaps we can key on an abstact type where Env is just one
--   of the things keyed on it?
--
-- And that change into abstract type can also help with the cyclic reference?

class Stage a where
    type StageKey a
    type StageMessage a
    type StageReturn a

newtype Actor m r = Actor (Chan (m, Either SomeException r -> IO ()))

callIO' :: Actor m r -> m -> IO r
callIO' (Actor chan) msg = do
    (returx, wait) <- newReturn
    writeChan chan (msg, returx)
    result <- wait
    case result of
        Left e -> AE.checkpointCallStack $ throwIO e
        Right r -> return r

sendIO' :: Actor m r -> m -> IO ()
sendIO' (Actor chan) msg = writeChan chan (msg, const $ pure ())

-- | A set of live actors responding to messages
data TheaterFor s = TheaterFor
    { theaterMap :: TVar (HashMap (StageKey s) (Actor (StageMessage s) (StageReturn s)))
    , theaterLog :: LogFunc
    }

-- | Actor monad in which message reponse actions are executed. Supports
-- logging, a read-only environment, and IO.
newtype ActFor s a = ActFor
    { unActFor :: LoggingT (ReaderT (s, TheaterFor s) IO) a
    }
    deriving
        ( Functor, Applicative, Monad, MonadFail, MonadIO, MonadLogger
        , MonadLoggerIO
        )

instance MonadUnliftIO (ActFor s) where
    askUnliftIO =
        ActFor $ withUnliftIO $ \ u ->
            return $ UnliftIO $ unliftIO u . unActFor
    withRunInIO inner =
        ActFor $ withRunInIO $ \ run -> inner (run . unActFor)

runActor :: TheaterFor s -> s -> ActFor s a -> IO a
runActor theater env (ActFor action) =
    runReaderT (runLoggingT action $ theaterLog theater) (env, theater)

class Monad m => MonadActor m where
    type ActorEnv m
    askEnv :: m (ActorEnv m)
    liftActor :: ActFor (ActorEnv m) a -> m a

instance MonadActor (ActFor s) where
    type ActorEnv (ActFor s) = s
    askEnv = ActFor $ lift $ asks fst
    liftActor = id

instance MonadActor m => MonadActor (ReaderT r m) where
    type ActorEnv (ReaderT r m) = ActorEnv m
    askEnv                      = lift askEnv
    liftActor                   = lift . liftActor

instance MonadActor m => MonadActor (MaybeT m) where
    type ActorEnv (MaybeT m) = ActorEnv m
    askEnv                   = lift askEnv
    liftActor                = lift . liftActor

instance MonadActor m => MonadActor (ExceptT e m) where
    type ActorEnv (ExceptT e m) = ActorEnv m
    askEnv                      = lift askEnv
    liftActor                   = lift . liftActor

instance (Monoid w, MonadActor m) => MonadActor (RWSL.RWST r w s m) where
    type ActorEnv (RWSL.RWST r w s m) = ActorEnv m
    askEnv                            = lift askEnv
    liftActor                         = lift . liftActor

asksEnv :: MonadActor m => (ActorEnv m -> a) -> m a
asksEnv f = f <$> askEnv

data Next = Stop | Proceed

class Message a where
    summarize :: a -> Text
    refer :: a -> Text

launchActorThread
    :: ( StageKey s ~ k, StageMessage s ~ m, StageReturn s ~ r
       , Hashable k, Eq k, Show k, Message m, Show r
       )
    => Chan (m, Either SomeException r -> IO ())
    -> TheaterFor s
    -> k
    -> s
    -> (m -> ActFor s (r, ActFor s (), Next))
    -> IO ()
launchActorThread chan theater actor env behavior =
    void $ forkIO $ runActor theater env $ do
        logInfo $ prefix <> "starting"
        loop
        logInfo $ prefix <> "bye"
    where
    prefix = T.concat ["[Actor '", T.pack $ show actor, "'] "]
    loop = do
        (message, respond) <- liftIO $ readChan chan
        logInfo $ T.concat [prefix, "received: ", summarize message]
        result <- try $ behavior message
        proceed <-
            case result of
                Left e -> do
                    logError $ T.concat [prefix, "on ", refer message, " exception: ", T.pack $ displayException (e :: SomeException)]
                    liftIO $ respond $ Left e
                    return True
                Right (value, after, next) -> do
                    logInfo $ T.concat [prefix, "on ", refer message, " result: ", T.pack $ show value]
                    liftIO $ respond $ Right value
                    after
                    case next of
                        Stop -> do
                            logInfo $ T.concat [prefix, "on ", refer message, " stopping"]
                            let tvar = theaterMap theater
                            liftIO $ atomically $ modifyTVar' tvar $ HM.delete actor
                            return False
                        Proceed -> do
                            logInfo $ T.concat [prefix, "on ", refer message, " done"]
                            return True
        when proceed loop

-- | Launch the actor system
startTheater
    :: ( StageKey s ~ k, StageMessage s ~ m, StageReturn s ~ r
       , Hashable k, Eq k, Show k, Message m, Show r
       )
    => LogFunc
    -> [(k, s, m -> ActFor s (r, ActFor s (), Next))]
    -> IO (TheaterFor s)
startTheater logFunc actors = do
    actorsWithChans <- for actors $ \ (key, env, behavior) -> do
        chan <- newChan
        return ((key, Actor chan), (env, behavior))
    tvar <- newTVarIO $ HM.fromList $ map fst actorsWithChans
    let theater = TheaterFor tvar logFunc
    for_ actorsWithChans $ \ ((key, Actor chan), (env, behavior)) ->
        launchActorThread chan theater key env behavior
    return theater

askTheater :: ActFor s (TheaterFor s)
askTheater = ActFor $ lift $ asks snd

lookupActor
    :: ( StageKey s ~ k, StageMessage s ~ m, StageReturn s ~ r
       , Eq k, Hashable k
       )
    => TheaterFor s
    -> k
    -> IO (Maybe (Actor m r))
lookupActor (TheaterFor tvar _) actor = HM.lookup actor <$> readTVarIO tvar

-- | Same as 'call', except it takes the theater as a parameter.
callIO
    :: ( StageKey s ~ k, StageMessage s ~ m, StageReturn s ~ r
       , Eq k, Hashable k
       )
    => TheaterFor s -> k -> m -> IO (Maybe r)
callIO theater key msg = do
    maybeActor <- lookupActor theater key
    for maybeActor $ \ actor -> callIO' actor msg

-- | Send a message to an actor, and wait for the result to arrive. Return
-- 'Nothing' if actor doesn't exist, otherwise 'Just' the result.
--
-- If the called method throws an exception, it is rethrown, wrapped with an
-- annotation, in the current thread.
call
    :: ( MonadActor n, ActorEnv n ~ s
       , StageKey s ~ k, StageMessage s ~ m, StageReturn s ~ r
       , Eq k, Hashable k
       )
    => k -> m -> n (Maybe r)
call key msg = liftActor $ do
    theater <- askTheater
    liftIO $ callIO theater key msg

-- | Like 'send', except it takes the theater as a parameter.
sendIO
    :: (StageKey s ~ k, StageMessage s ~ m, Eq k, Hashable k)
    => TheaterFor s -> k -> m -> IO Bool
sendIO theater key msg = do
    maybeActor <- lookupActor theater key
    case maybeActor of
        Nothing -> return False
        Just actor -> do
            sendIO' actor msg
            return True

-- | Send a message to an actor, without waiting for a result. Return 'True' if
-- the given actor exists, 'False' otherwise.
send
    :: ( MonadActor n, ActorEnv n ~ s
       , StageKey s ~ k, StageMessage s ~ m
       , Eq k, Hashable k
       )
    => k -> m -> n Bool
send key msg = liftActor $ do
    theater <- askTheater
    liftIO $ sendIO theater key msg

-- | Like 'sendMany', except it takes the theater as a parameter.
sendManyIO
    :: (StageKey s ~ k, StageMessage s ~ m, Eq k, Hashable k)
    => TheaterFor s -> HashSet k -> m -> IO ()
sendManyIO (TheaterFor tvar _) recips msg = do
    allActors <- readTVarIO tvar
    for_ (HM.intersection allActors (HS.toMap recips)) $
        \ actor -> sendIO' actor msg

-- | Send a message to each actor in the set that exists in the system,
-- without waiting for results.
sendMany
    :: ( MonadActor n, ActorEnv n ~ s
       , StageKey s ~ k, StageMessage s ~ m
       , Eq k, Hashable k
       )
    => HashSet k -> m -> n ()
sendMany keys msg = liftActor $ do
    theater <- askTheater
    liftIO $ sendManyIO theater keys msg

-- | Same as 'spawn', except it takes the theater as a parameter.
spawnIO
    :: ( StageKey s ~ k, StageMessage s ~ m, StageReturn s ~ r
       , Eq k, Hashable k, Show k, Message m, Show r
       )
    => TheaterFor s
    -> k
    -> IO s
    -> (m -> ActFor s (r, ActFor s (), Next))
    -> IO Bool
spawnIO theater@(TheaterFor tvar _) key mkEnv behavior = do
    chan <- newChan
    added <- atomically $ stateTVar tvar $ \ hm ->
        let hm' = HM.alter (create $ Actor chan) key hm
        in  ( not (HM.member key hm) && HM.member key hm'
            , hm'
            )
    when added $ do
        env <- mkEnv
        launchActorThread chan theater key env behavior
    return added
    where
    create actor Nothing    = Just actor
    create _     j@(Just _) = j

-- | Launch a new actor with the given ID and behavior. Return 'True' if the ID
-- was unused and the actor has been launched. Return 'False' if the ID is
-- already in use, thus a new actor hasn't been launched.
spawn
    :: ( MonadActor n, ActorEnv n ~ s
       , StageKey s ~ k, StageMessage s ~ m, StageReturn s ~ r
       , Eq k, Hashable k, Show k, Message m, Show r
       )
    => k
    -> IO s
    -> (m -> ActFor s (r, ActFor s (), Next))
    -> n Bool
spawn key mkEnv behavior = liftActor $ do
    theater <- askTheater
    liftIO $ spawnIO theater key mkEnv behavior

done :: Monad n => a -> n (a, ActFor s (), Next)
done msg = return (msg, return (), Proceed)

doneAnd :: Monad n => a -> ActFor s () -> n (a, ActFor s (), Next)
doneAnd msg act = return (msg, act, Proceed)

stop :: Monad n => a -> n (a, ActFor s (), Next)
stop msg = return (msg, return (), Stop)
[See repo JSON]