Federated forge server

[[ 🗃 ^rjQ3E vervis ]] :: [📥 Inbox] [📤 Outbox] [🐤 Followers] [🤝 Collaborators] [🛠 Commits]

Clone

HTTPS: git clone https://vervis.peers.community/repos/rjQ3E

SSH: git clone USERNAME@vervis.peers.community:rjQ3E

Branches

Tags

main :: src / Web / Actor /

Deliver.hs

{- This file is part of Vervis.
 -
 - Written in 2023, 2024 by fr33domlover <fr33domlover@riseup.net>.
 -
 - ♡ Copying is an act of love. Please copy, reuse and share.
 -
 - The author(s) have dedicated all copyright and related and neighboring
 - rights to this software to the public domain worldwide. This software is
 - distributed without any warranty.
 -
 - You should have received a copy of the CC0 Public Domain Dedication along
 - with this software. If not, see
 - <http://creativecommons.org/publicdomain/zero/1.0/>.
 -}

{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE DerivingVia #-}

-- | Should eventually turn into an internal module for use only by
-- 'Web.Actor'.
--
-- System of local utility-actors that do the actual HTTP POSTing of
-- activities to remote actors.
module Web.Actor.Deliver
    ( DeliveryActor
    , DeliveryStage
    , DeliveryTheater ()
    , startDeliveryTheater
    , DeliveryMethod (..)
    , sendHttp
    )
where

import Control.Concurrent.STM
import Control.Concurrent.STM.TVar
import Control.Exception.Base hiding (handle)
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Logger.CallStack
import Control.Monad.Trans.Class
import Control.Monad.Trans.Except
import Control.Retry
import Data.ByteString (ByteString)
import Data.Foldable
import Data.Hashable
import Data.HashMap.Strict (HashMap)
import Data.HList (HList (..))
import Data.List
import Data.List.NonEmpty (NonEmpty)
import Data.Maybe
import Data.Proxy
import Data.Text (Text)
import Data.Time.Clock
import Data.Time.Interval
import Data.Traversable
import Database.Persist.Sql
import Network.HTTP.Client (Manager, HttpException (..), HttpExceptionContent (..), responseStatus)
import Network.HTTP.Types.Header (HeaderName)
import Network.HTTP.Types.URI (urlEncode, urlDecode)
import Network.HTTP.Types.Status
import System.FilePath
import System.Directory
import Web.Hashids

import qualified Data.Aeson as A
import qualified Data.ByteString.Lazy as BL
import qualified Data.HashMap.Strict as HM
import qualified Data.HashSet as HS
import qualified Data.HList as H
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE

import Control.Concurrent.Actor
import Database.Persist.Box
import Network.FedURI

import qualified Web.ActivityPub as AP

import Vervis.Settings

data RemoteActor = RemoteActor
    { raInbox      :: Maybe LocalURI
    , _raErrorSince :: Maybe UTCTime
    }
    deriving (Show, Read)

instance BoxableVia RemoteActor where
    type BV RemoteActor = BoxableShow

data DeliveryActor u
data DeliveryStage u

instance UriMode u => Actor (DeliveryActor u) where
    type ActorStage (DeliveryActor u) = DeliveryStage u
    type ActorIdentity (DeliveryActor u) = ObjURI u
    type ActorInterface (DeliveryActor u) =
        [ "deliver-local" ::: AP.Envelope u :-> Bool :-> Return ()
        , "forward-remote" ::: AP.Errand u :-> Return ()
        ]

instance UriMode u => ActorLaunch (DeliveryActor u) where
    actorBehavior _ =
        (handleMethod @"deliver-local" := \ uri envelope fwd -> do
            Env _ (manager, headers, micros) <- askEnv
            behavior manager headers micros uri $ Left (envelope, fwd)
        )
        `HCons`
        (handleMethod @"forward-remote" := \ uri errand -> do
            Env _ (manager, headers, micros) <- askEnv
            behavior manager headers micros uri $ Right errand
        )
        `HCons`
        HNil

instance UriMode u => Stage (DeliveryStage u) where
    data StageEnv (DeliveryStage u) = Env
        { envBox :: Box RemoteActor
        , envInit :: (Manager, NonEmpty HeaderName, Int)
        }
    type StageActors (DeliveryStage u) = '[DeliveryActor u]
    type StageSpawn (DeliveryStage u) = AllowSpawn

{-
migrations :: [Migration SqlBackend IO]
migrations =
    [ -- 1
      addEntities [entities|
        RemoteActor
            inbox      LocalURI Maybe
            errorSince UTCTime  Maybe
        |]
    ]
-}

instance UriMode u => MonadBox (ActFor (DeliveryStage u)) where
    type BoxType (ActFor (DeliveryStage u)) = RemoteActor
    askBox = asksEnv envBox

data DeliveryTheater u = DeliveryTheater
    { _dtManager :: Manager
    , _dtHeaders :: NonEmpty HeaderName
    , _dtDelay   :: Int
    , _dtLog     :: LogFunc
    , _dtDir     :: OsPath
    , _dtTheater :: TheaterFor (DeliveryStage u)
    , _dtMap     :: TVar (HashMap (ObjURI u) (Ref (DeliveryActor u)))
    }

data IdMismatch = IdMismatch deriving Show

instance Exception IdMismatch

behavior
    :: UriMode u
    => Manager
    -> NonEmpty HeaderName
    -> Int
    -> ObjURI u
    -> Either (AP.Envelope u, Bool) (AP.Errand u)
    -> ActFor (DeliveryStage u) ((), ActFor (DeliveryStage u) (), Next)
behavior manager postSignedHeaders micros (ObjURI h lu) = \case
    Left (envelope, fwd) -> do
        ra@(RemoteActor mluInbox _mError) <- runBox obtain
        uInbox <- getInbox
        let mluFwd = if fwd then Just lu else Nothing
        _resp <-
            liftIO $ retry shouldRetry toException $
                AP.deliver manager postSignedHeaders envelope mluFwd uInbox
        done ()
    Right errand -> do
        uInbox <- getInbox
        _resp <-
            liftIO $ retry shouldRetry toException $
                AP.forward manager postSignedHeaders errand uInbox
        done ()
    where
    shouldRetry = \case
        AP.APPostErrorHTTP (HttpExceptionRequest _ (StatusCodeException resp _))
            | noRetry (responseStatus resp) -> False
        _ -> True
        where
        noRetry s =
            status200 <= s && s < status300 ||
            status400 <= s && s < status500
    retry :: (e -> Bool) -> (e -> SomeException) -> IO (Either e a) -> IO a
    retry shouldRetry' toE action = do
        errorOrResult <-
            runExceptT $
                retryOnError
                    (exponentialBackoff micros)
                    (\ _ e -> pure $ shouldRetry' e)
                    (const $ ExceptT action)
        case errorOrResult of
            Left e -> throwIO $ toE e
            Right r -> return r
    getInbox = do
        ra@(RemoteActor mluInbox _mError) <- runBox obtain
        luInbox <-
            case mluInbox of
                Just luInb -> return luInb
                Nothing -> do
                    AP.Actor local _detail <-
                        liftIO $
                            retry
                                (const True)
                                (maybe (toException IdMismatch) toException)
                                (AP.fetchAPID' manager (AP.actorId . AP.actorLocal) h lu)
                    let luInb = AP.actorInbox local
                    runBox $ bestow $ ra { raInbox = Just luInb }
                    return luInb
        return $ ObjURI h luInbox

mkEnv
    :: (Manager, NonEmpty HeaderName, Int)
    -> LogFunc
    -> OsPath
    -> IO (StageEnv (DeliveryStage u))
mkEnv env logFunc path = flip runLoggingT logFunc $ do
    box <- loadBox {-migrations-} path (RemoteActor Nothing Nothing)
    return $ Env box env

type OsPath = FilePath
encodeUtf = pure
decodeUtf = pure

startDeliveryTheater
    :: UriMode u
    => FilePath
    -> NonEmpty HeaderName
    -> Int
    -> Manager
    -> LogFunc
    -> OsPath
    -> IO (DeliveryTheater u)
startDeliveryTheater avarBoxPath headers micros manager logFunc dbRootDir = do

    -- We first add the sqlite3 extension as needed
    entries <- listDirectory dbRootDir
    for_ entries $ \ path ->
        if "-shm" `isSuffixOf` path || "-wal" `isSuffixOf` path
            then return ()
        else if takeExtension path == ".sqlite3"
            then return ()
        else renameFile (dbRootDir </> path) (dbRootDir </> path <.> "sqlite3")

    entries <- listDirectory dbRootDir
    let dbs = filter ((== ".sqlite3") . takeExtension) entries
    actors <- for dbs $ \ path -> do
        path' <- T.pack <$> decodeUtf (dropExtension path)
        path'' <- either throwIO pure $ TE.decodeUtf8' $ urlDecode False $ TE.encodeUtf8 path'
        u <-
            case parseObjURI path'' of
                Left e ->
                    error $
                        "Failed to parse URI-named SQLite db filename: " ++ e
                Right uri -> return uri
        env <- mkEnv (manager, headers, micros) logFunc (dbRootDir </> path)
        return (u, env)
    (theater, actorMap `HCons` HNil) <- startTheater avarBoxPath logFunc (actors `H.HCons` H.HNil)
    actorMapVar <- newTVarIO $ HM.fromList actorMap
    return $ DeliveryTheater manager headers micros logFunc dbRootDir theater actorMapVar

data DeliveryMethod u
    = MethodDeliverLocal (AP.Envelope u) Bool
    | MethodForwardRemote (AP.Errand u)

-- Since sendManyIO isn't available right now, we're using many sendIO
sendHttp :: UriMode u => DeliveryTheater u -> DeliveryMethod u -> [ObjURI u] -> IO ()
sendHttp (DeliveryTheater manager headers micros logFunc root theater actorMapVar) method recips =
    case method of
        MethodDeliverLocal envelope fwd ->
            for_ recips $ \ u -> do
                ref <- getRef u
                void $ sendIO' @"deliver-local" theater Proxy ref $ envelope `HCons` fwd `HCons` HNil
        MethodForwardRemote errand ->
            for_ recips $ \ u -> do
                ref <- getRef u
                void $ sendIO' @"forward-remote" theater Proxy ref $ errand `HCons` HNil
    where
    makeEnv u =
        either throwIO pure (TE.decodeUtf8' $ urlEncode False $ TE.encodeUtf8 $ renderObjURI u) >>=
        encodeUtf . (<.> "sqlite3") . (root </>) . T.unpack >>=
        mkEnv (manager, headers, micros) logFunc
    getRef u = do
        mref <- HM.lookup u <$> readTVarIO actorMapVar
        case mref of
            Just r -> pure r
            Nothing -> do
                r <- spawnIO theater u (makeEnv u)
                atomically $ modifyTVar' actorMapVar $ HM.insert u r
                return r
[See repo JSON]