{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StaticPointers #-}
{-# LANGUAGE TypeApplications #-}
module Hyperion.Remote where
import Control.Concurrent.MVar (MVar, isEmptyMVar,
newEmptyMVar, putMVar,
readMVar, takeMVar)
import Control.Distributed.Process hiding (bracket, catch,
try)
import Control.Distributed.Process.Async (AsyncResult (..), async,
task, wait)
import Control.Distributed.Process.Closure (SerializableDict (..))
import qualified Control.Distributed.Process.Node as Node
import Control.Distributed.Static (registerStatic,
staticLabel)
import Control.Monad.Catch (Exception, SomeException,
bracket, catch, throwM,
try)
import Control.Monad.Extra (whenM)
import Control.Monad.Trans.Maybe (MaybeT (..))
import Data.Binary (Binary)
import Data.Constraint (Dict (..))
import Data.Data (Typeable)
import Data.Foldable (asum)
import Data.Rank1Dynamic (toDynamic)
import Data.Text (Text, pack)
import qualified Data.Text.Encoding as E
import Data.Time.Clock (NominalDiffTime)
import GHC.Generics (Generic)
import Hyperion.CallClosure (call')
import qualified Hyperion.Log as Log
import Hyperion.Static (Serializable, ptrAp)
import Hyperion.Util (nominalDiffTimeToMicroseconds,
newUnique)
import Network.BSD (HostEntry (..),
getHostEntries,
getHostName)
import Network.Socket (hostAddressToTuple)
import Network.Transport (EndPointAddress (..))
import qualified Network.Transport.TCP as NT
newtype ServiceId = ServiceId String
deriving (ServiceId -> ServiceId -> Bool
(ServiceId -> ServiceId -> Bool)
-> (ServiceId -> ServiceId -> Bool) -> Eq ServiceId
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ServiceId -> ServiceId -> Bool
$c/= :: ServiceId -> ServiceId -> Bool
== :: ServiceId -> ServiceId -> Bool
$c== :: ServiceId -> ServiceId -> Bool
Eq, Int -> ServiceId -> ShowS
[ServiceId] -> ShowS
ServiceId -> String
(Int -> ServiceId -> ShowS)
-> (ServiceId -> String)
-> ([ServiceId] -> ShowS)
-> Show ServiceId
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ServiceId] -> ShowS
$cshowList :: [ServiceId] -> ShowS
show :: ServiceId -> String
$cshow :: ServiceId -> String
showsPrec :: Int -> ServiceId -> ShowS
$cshowsPrec :: Int -> ServiceId -> ShowS
Show, (forall x. ServiceId -> Rep ServiceId x)
-> (forall x. Rep ServiceId x -> ServiceId) -> Generic ServiceId
forall x. Rep ServiceId x -> ServiceId
forall x. ServiceId -> Rep ServiceId x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep ServiceId x -> ServiceId
$cfrom :: forall x. ServiceId -> Rep ServiceId x
Generic, Get ServiceId
[ServiceId] -> Put
ServiceId -> Put
(ServiceId -> Put)
-> Get ServiceId -> ([ServiceId] -> Put) -> Binary ServiceId
forall t. (t -> Put) -> Get t -> ([t] -> Put) -> Binary t
putList :: [ServiceId] -> Put
$cputList :: [ServiceId] -> Put
get :: Get ServiceId
$cget :: Get ServiceId
put :: ServiceId -> Put
$cput :: ServiceId -> Put
Binary)
serviceIdToText :: ServiceId -> Text
serviceIdToText :: ServiceId -> Text
serviceIdToText = String -> Text
pack (String -> Text) -> (ServiceId -> String) -> ServiceId -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ServiceId -> String
serviceIdToString
serviceIdToString :: ServiceId -> String
serviceIdToString :: ServiceId -> String
serviceIdToString (ServiceId String
s) = String
s
data WorkerMessage = Connected | ShutDown
deriving (ReadPrec [WorkerMessage]
ReadPrec WorkerMessage
Int -> ReadS WorkerMessage
ReadS [WorkerMessage]
(Int -> ReadS WorkerMessage)
-> ReadS [WorkerMessage]
-> ReadPrec WorkerMessage
-> ReadPrec [WorkerMessage]
-> Read WorkerMessage
forall a.
(Int -> ReadS a)
-> ReadS [a] -> ReadPrec a -> ReadPrec [a] -> Read a
readListPrec :: ReadPrec [WorkerMessage]
$creadListPrec :: ReadPrec [WorkerMessage]
readPrec :: ReadPrec WorkerMessage
$creadPrec :: ReadPrec WorkerMessage
readList :: ReadS [WorkerMessage]
$creadList :: ReadS [WorkerMessage]
readsPrec :: Int -> ReadS WorkerMessage
$creadsPrec :: Int -> ReadS WorkerMessage
Read, Int -> WorkerMessage -> ShowS
[WorkerMessage] -> ShowS
WorkerMessage -> String
(Int -> WorkerMessage -> ShowS)
-> (WorkerMessage -> String)
-> ([WorkerMessage] -> ShowS)
-> Show WorkerMessage
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [WorkerMessage] -> ShowS
$cshowList :: [WorkerMessage] -> ShowS
show :: WorkerMessage -> String
$cshow :: WorkerMessage -> String
showsPrec :: Int -> WorkerMessage -> ShowS
$cshowsPrec :: Int -> WorkerMessage -> ShowS
Show, (forall x. WorkerMessage -> Rep WorkerMessage x)
-> (forall x. Rep WorkerMessage x -> WorkerMessage)
-> Generic WorkerMessage
forall x. Rep WorkerMessage x -> WorkerMessage
forall x. WorkerMessage -> Rep WorkerMessage x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep WorkerMessage x -> WorkerMessage
$cfrom :: forall x. WorkerMessage -> Rep WorkerMessage x
Generic, Get WorkerMessage
[WorkerMessage] -> Put
WorkerMessage -> Put
(WorkerMessage -> Put)
-> Get WorkerMessage
-> ([WorkerMessage] -> Put)
-> Binary WorkerMessage
forall t. (t -> Put) -> Get t -> ([t] -> Put) -> Binary t
putList :: [WorkerMessage] -> Put
$cputList :: [WorkerMessage] -> Put
get :: Get WorkerMessage
$cget :: Get WorkerMessage
put :: WorkerMessage -> Put
$cput :: WorkerMessage -> Put
Binary)
data RemoteError = RemoteError ServiceId RemoteErrorType
deriving (Int -> RemoteError -> ShowS
[RemoteError] -> ShowS
RemoteError -> String
(Int -> RemoteError -> ShowS)
-> (RemoteError -> String)
-> ([RemoteError] -> ShowS)
-> Show RemoteError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [RemoteError] -> ShowS
$cshowList :: [RemoteError] -> ShowS
show :: RemoteError -> String
$cshow :: RemoteError -> String
showsPrec :: Int -> RemoteError -> ShowS
$cshowsPrec :: Int -> RemoteError -> ShowS
Show, Show RemoteError
Typeable RemoteError
Typeable RemoteError
-> Show RemoteError
-> (RemoteError -> SomeException)
-> (SomeException -> Maybe RemoteError)
-> (RemoteError -> String)
-> Exception RemoteError
SomeException -> Maybe RemoteError
RemoteError -> String
RemoteError -> SomeException
forall e.
Typeable e
-> Show e
-> (e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> Exception e
displayException :: RemoteError -> String
$cdisplayException :: RemoteError -> String
fromException :: SomeException -> Maybe RemoteError
$cfromException :: SomeException -> Maybe RemoteError
toException :: RemoteError -> SomeException
$ctoException :: RemoteError -> SomeException
$cp2Exception :: Show RemoteError
$cp1Exception :: Typeable RemoteError
Exception)
data RemoteErrorType
= RemoteAsyncFailed DiedReason
| RemoteAsyncLinkFailed DiedReason
| RemoteAsyncCancelled
| RemoteAsyncPending
| RemoteException String
deriving (Int -> RemoteErrorType -> ShowS
[RemoteErrorType] -> ShowS
RemoteErrorType -> String
(Int -> RemoteErrorType -> ShowS)
-> (RemoteErrorType -> String)
-> ([RemoteErrorType] -> ShowS)
-> Show RemoteErrorType
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [RemoteErrorType] -> ShowS
$cshowList :: [RemoteErrorType] -> ShowS
show :: RemoteErrorType -> String
$cshow :: RemoteErrorType -> String
showsPrec :: Int -> RemoteErrorType -> ShowS
$cshowsPrec :: Int -> RemoteErrorType -> ShowS
Show)
data WorkerConnectionTimeout = WorkerConnectionTimeout ServiceId
deriving (Int -> WorkerConnectionTimeout -> ShowS
[WorkerConnectionTimeout] -> ShowS
WorkerConnectionTimeout -> String
(Int -> WorkerConnectionTimeout -> ShowS)
-> (WorkerConnectionTimeout -> String)
-> ([WorkerConnectionTimeout] -> ShowS)
-> Show WorkerConnectionTimeout
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [WorkerConnectionTimeout] -> ShowS
$cshowList :: [WorkerConnectionTimeout] -> ShowS
show :: WorkerConnectionTimeout -> String
$cshow :: WorkerConnectionTimeout -> String
showsPrec :: Int -> WorkerConnectionTimeout -> ShowS
$cshowsPrec :: Int -> WorkerConnectionTimeout -> ShowS
Show, Show WorkerConnectionTimeout
Typeable WorkerConnectionTimeout
Typeable WorkerConnectionTimeout
-> Show WorkerConnectionTimeout
-> (WorkerConnectionTimeout -> SomeException)
-> (SomeException -> Maybe WorkerConnectionTimeout)
-> (WorkerConnectionTimeout -> String)
-> Exception WorkerConnectionTimeout
SomeException -> Maybe WorkerConnectionTimeout
WorkerConnectionTimeout -> String
WorkerConnectionTimeout -> SomeException
forall e.
Typeable e
-> Show e
-> (e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> Exception e
displayException :: WorkerConnectionTimeout -> String
$cdisplayException :: WorkerConnectionTimeout -> String
fromException :: SomeException -> Maybe WorkerConnectionTimeout
$cfromException :: SomeException -> Maybe WorkerConnectionTimeout
toException :: WorkerConnectionTimeout -> SomeException
$ctoException :: WorkerConnectionTimeout -> SomeException
$cp2Exception :: Show WorkerConnectionTimeout
$cp1Exception :: Typeable WorkerConnectionTimeout
Exception)
data WorkerLauncher j = WorkerLauncher
{
WorkerLauncher j
-> forall b. NodeId -> ServiceId -> (j -> Process b) -> Process b
withLaunchedWorker :: forall b . NodeId -> ServiceId -> (j -> Process b) -> Process b
, WorkerLauncher j -> Maybe NominalDiffTime
connectionTimeout :: Maybe NominalDiffTime
, WorkerLauncher j -> forall b. RemoteError -> Process b -> Process b
onRemoteError :: forall b . RemoteError -> Process b -> Process b
}
runProcessLocal :: Process a -> IO a
runProcessLocal :: Process a -> IO a
runProcessLocal = RemoteTable -> Process a -> IO a
forall a. RemoteTable -> Process a -> IO a
runProcessLocalWithRT RemoteTable
Node.initRemoteTable
runProcessLocalWithRT :: RemoteTable -> Process a -> IO a
runProcessLocalWithRT :: RemoteTable -> Process a -> IO a
runProcessLocalWithRT RemoteTable
rt Process a
process = do
MVar a
resultVar <- IO (MVar a)
forall a. IO (MVar a)
newEmptyMVar
RemoteTable -> Process () -> IO ()
runProcessLocalWithRT_ RemoteTable
rt (Process () -> IO ()) -> Process () -> IO ()
forall a b. (a -> b) -> a -> b
$ Process a
process Process a -> (a -> Process ()) -> Process ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= IO () -> Process ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> (a -> IO ()) -> a -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MVar a -> a -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar a
resultVar
MVar a -> IO a
forall a. MVar a -> IO a
takeMVar MVar a
resultVar
runProcessLocalWithRT_ :: RemoteTable -> Process () -> IO ()
runProcessLocalWithRT_ :: RemoteTable -> Process () -> IO ()
runProcessLocalWithRT_ RemoteTable
rtable Process ()
process = do
String
host <- IO String
getHostName
TCPAddr -> TCPParameters -> IO (Either IOException Transport)
NT.createTransport (String -> String -> TCPAddr
NT.defaultTCPAddr String
host String
"0") TCPParameters
NT.defaultTCPParameters IO (Either IOException Transport)
-> (Either IOException Transport -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left IOException
e -> IOException -> IO ()
forall (m :: * -> *) e a.
(MonadThrow m, MonadIO m, Exception e) =>
e -> m a
Log.throw IOException
e
Right Transport
t -> do
LocalNode
node <- Transport -> RemoteTable -> IO LocalNode
Node.newLocalNode Transport
t RemoteTable
rtable
Text -> NodeId -> IO ()
forall a (m :: * -> *). (Show a, MonadIO m) => Text -> a -> m ()
Log.info Text
"Running on node" (LocalNode -> NodeId
Node.localNodeId LocalNode
node)
LocalNode -> Process () -> IO ()
Node.runProcess LocalNode
node Process ()
process
getExternalHostName :: IO String
getExternalHostName :: IO String
getExternalHostName = do
[HostEntry]
entries <- Bool -> IO [HostEntry]
getHostEntries Bool
True
case (HostEntry -> Bool) -> [HostEntry] -> [HostEntry]
forall a. (a -> Bool) -> [a] -> [a]
filter ((HostAddress -> Bool) -> [HostAddress] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
any (Bool -> Bool
not (Bool -> Bool) -> (HostAddress -> Bool) -> HostAddress -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. HostAddress -> Bool
isLocal) ([HostAddress] -> Bool)
-> (HostEntry -> [HostAddress]) -> HostEntry -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. HostEntry -> [HostAddress]
hostAddresses) [HostEntry]
entries of
HostEntry
e : [HostEntry]
_ -> String -> IO String
forall (m :: * -> *) a. Monad m => a -> m a
return (String -> IO String) -> String -> IO String
forall a b. (a -> b) -> a -> b
$ HostEntry -> String
hostName HostEntry
e
[] -> String -> IO String
forall (m :: * -> *) a. MonadIO m => String -> m a
Log.throwError (String -> IO String) -> String -> IO String
forall a b. (a -> b) -> a -> b
$ String
"Cannot find external network address among host entries: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ [HostEntry] -> String
forall a. Show a => a -> String
show [HostEntry]
entries
where
isLocal :: HostAddress -> Bool
isLocal HostAddress
a = case HostAddress -> (Word8, Word8, Word8, Word8)
hostAddressToTuple HostAddress
a of
(Word8
127, Word8
_, Word8
_, Word8
_) -> Bool
True
(Word8
10, Word8
_, Word8
_, Word8
_) -> Bool
True
(Word8, Word8, Word8, Word8)
_ -> Bool
False
addressToNodeId :: Text -> NodeId
addressToNodeId :: Text -> NodeId
addressToNodeId = EndPointAddress -> NodeId
NodeId (EndPointAddress -> NodeId)
-> (Text -> EndPointAddress) -> Text -> NodeId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> EndPointAddress
EndPointAddress (ByteString -> EndPointAddress)
-> (Text -> ByteString) -> Text -> EndPointAddress
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> ByteString
E.encodeUtf8
nodeIdToAddress :: NodeId -> Text
nodeIdToAddress :: NodeId -> Text
nodeIdToAddress (NodeId (EndPointAddress ByteString
addr)) = ByteString -> Text
E.decodeUtf8 ByteString
addr
masterNodeIdLabel :: String
masterNodeIdLabel :: String
masterNodeIdLabel = String
"masterNodeIdLabel"
masterNodeIdStatic :: Static (Maybe NodeId)
masterNodeIdStatic :: Static (Maybe NodeId)
masterNodeIdStatic = String -> Static (Maybe NodeId)
forall a. String -> Static a
staticLabel String
masterNodeIdLabel
getMasterNodeId :: Process (Maybe NodeId)
getMasterNodeId :: Process (Maybe NodeId)
getMasterNodeId = Static (Maybe NodeId) -> Process (Maybe NodeId)
forall a. Typeable a => Static a -> Process a
unStatic Static (Maybe NodeId)
masterNodeIdStatic
registerMasterNodeId :: Maybe NodeId -> RemoteTable -> RemoteTable
registerMasterNodeId :: Maybe NodeId -> RemoteTable -> RemoteTable
registerMasterNodeId Maybe NodeId
nid = String -> Dynamic -> RemoteTable -> RemoteTable
registerStatic String
masterNodeIdLabel (Maybe NodeId -> Dynamic
forall a. Typeable a => a -> Dynamic
toDynamic Maybe NodeId
nid)
initWorkerRemoteTable :: Maybe NodeId -> RemoteTable
initWorkerRemoteTable :: Maybe NodeId -> RemoteTable
initWorkerRemoteTable Maybe NodeId
nid = Maybe NodeId -> RemoteTable -> RemoteTable
registerMasterNodeId Maybe NodeId
nid RemoteTable
Node.initRemoteTable
worker
:: NodeId
-> ServiceId
-> Process ()
worker :: NodeId -> ServiceId -> Process ()
worker NodeId
masterNode serviceId :: ServiceId
serviceId@(ServiceId String
masterService) = do
ProcessId
self <- Process ProcessId
getSelfPid
(SendPort WorkerMessage
sendPort, ReceivePort WorkerMessage
receivePort) <- Process (SendPort WorkerMessage, ReceivePort WorkerMessage)
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan
let connectToMaster :: MaybeT Process WorkerMessage
connectToMaster = Process (Maybe WorkerMessage) -> MaybeT Process WorkerMessage
forall (m :: * -> *) a. m (Maybe a) -> MaybeT m a
MaybeT (Process (Maybe WorkerMessage) -> MaybeT Process WorkerMessage)
-> Process (Maybe WorkerMessage) -> MaybeT Process WorkerMessage
forall a b. (a -> b) -> a -> b
$ do
Text -> NodeId -> Process ()
forall a (m :: * -> *). (Show a, MonadIO m) => Text -> a -> m ()
Log.info Text
"Connecting to master" NodeId
masterNode
NodeId
-> String
-> (ProcessId, ServiceId, SendPort WorkerMessage)
-> Process ()
forall a. Serializable a => NodeId -> String -> a -> Process ()
nsendRemote NodeId
masterNode String
masterService (ProcessId
self, ServiceId
serviceId, SendPort WorkerMessage
sendPort)
Int -> ReceivePort WorkerMessage -> Process (Maybe WorkerMessage)
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout (Int
10Int -> Int -> Int
forall a. Num a => a -> a -> a
*Int
1000Int -> Int -> Int
forall a. Num a => a -> a -> a
*Int
1000) ReceivePort WorkerMessage
receivePort
MaybeT Process WorkerMessage -> Process (Maybe WorkerMessage)
forall (m :: * -> *) a. MaybeT m a -> m (Maybe a)
runMaybeT ([MaybeT Process WorkerMessage] -> MaybeT Process WorkerMessage
forall (t :: * -> *) (f :: * -> *) a.
(Foldable t, Alternative f) =>
t (f a) -> f a
asum (Int
-> MaybeT Process WorkerMessage -> [MaybeT Process WorkerMessage]
forall a. Int -> a -> [a]
replicate Int
5 MaybeT Process WorkerMessage
connectToMaster)) Process (Maybe WorkerMessage)
-> (Maybe WorkerMessage -> Process ()) -> Process ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just WorkerMessage
Connected -> do
Text -> Process ()
forall (m :: * -> *). MonadIO m => Text -> m ()
Log.text Text
"Successfully connected to master."
Process WorkerMessage
forall a. Serializable a => Process a
expect Process WorkerMessage
-> (WorkerMessage -> Process ()) -> Process ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
WorkerMessage
Connected -> String -> Process ()
forall (m :: * -> *) a. MonadIO m => String -> m a
Log.throwError String
"Unexpected 'Connected' received."
WorkerMessage
ShutDown -> Text -> Process ()
forall (m :: * -> *). MonadIO m => Text -> m ()
Log.text Text
"Shutting down."
Maybe WorkerMessage
_ -> Text -> Process ()
forall (m :: * -> *). MonadIO m => Text -> m ()
Log.text Text
"Couldn't connect to master" Process () -> Process () -> Process ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> () -> Process ()
forall a b. Serializable a => a -> Process b
die ()
withServiceId :: (ServiceId -> Process a) -> Process a
withServiceId :: (ServiceId -> Process a) -> Process a
withServiceId = Process ServiceId
-> (ServiceId -> Process ())
-> (ServiceId -> Process a)
-> Process a
forall (m :: * -> *) a c b.
MonadMask m =>
m a -> (a -> m c) -> (a -> m b) -> m b
bracket Process ServiceId
newServiceId (\(ServiceId String
s) -> String -> Process ()
unregister String
s)
where
newServiceId :: Process ServiceId
newServiceId = do
String
s <- IO String -> Process String
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO String -> Process String) -> IO String -> Process String
forall a b. (a -> b) -> a -> b
$ Unique -> String
forall a. Show a => a -> String
show (Unique -> String) -> IO Unique -> IO String
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO Unique
newUnique
Process ProcessId
getSelfPid Process ProcessId -> (ProcessId -> Process ()) -> Process ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= String -> ProcessId -> Process ()
register String
s
ServiceId -> Process ServiceId
forall (m :: * -> *) a. Monad m => a -> m a
return (String -> ServiceId
ServiceId String
s)
withService
:: Show j
=> WorkerLauncher j
-> (NodeId -> ServiceId -> Process a)
-> Process a
withService :: WorkerLauncher j -> (NodeId -> ServiceId -> Process a) -> Process a
withService WorkerLauncher{Maybe NominalDiffTime
forall b. NodeId -> ServiceId -> (j -> Process b) -> Process b
forall b. RemoteError -> Process b -> Process b
onRemoteError :: forall b. RemoteError -> Process b -> Process b
connectionTimeout :: Maybe NominalDiffTime
withLaunchedWorker :: forall b. NodeId -> ServiceId -> (j -> Process b) -> Process b
onRemoteError :: forall j.
WorkerLauncher j -> forall b. RemoteError -> Process b -> Process b
connectionTimeout :: forall j. WorkerLauncher j -> Maybe NominalDiffTime
withLaunchedWorker :: forall j.
WorkerLauncher j
-> forall b. NodeId -> ServiceId -> (j -> Process b) -> Process b
..} NodeId -> ServiceId -> Process a
go = (ServiceId -> Process a) -> Process a
forall a. (ServiceId -> Process a) -> Process a
withServiceId ((ServiceId -> Process a) -> Process a)
-> (ServiceId -> Process a) -> Process a
forall a b. (a -> b) -> a -> b
$ \ServiceId
serviceId -> do
NodeId
nid <- Process NodeId
getSelfNode
NodeId -> ServiceId -> (j -> Process a) -> Process a
forall b. NodeId -> ServiceId -> (j -> Process b) -> Process b
withLaunchedWorker NodeId
nid ServiceId
serviceId ((j -> Process a) -> Process a) -> (j -> Process a) -> Process a
forall a b. (a -> b) -> a -> b
$ \j
jobId -> do
Text -> (ServiceId, j) -> Process ()
forall a (m :: * -> *). (Show a, MonadIO m) => Text -> a -> m ()
Log.info Text
"Deployed worker" (ServiceId
serviceId, j
jobId)
let
awaitWorker :: Process ProcessId
awaitWorker = do
Maybe (ProcessId, ServiceId, SendPort WorkerMessage)
connectionResult <- case Maybe NominalDiffTime
connectionTimeout of
Just NominalDiffTime
t -> Int
-> Process (Maybe (ProcessId, ServiceId, SendPort WorkerMessage))
forall a. Serializable a => Int -> Process (Maybe a)
expectTimeout (NominalDiffTime -> Int
nominalDiffTimeToMicroseconds NominalDiffTime
t)
Maybe NominalDiffTime
Nothing -> ((ProcessId, ServiceId, SendPort WorkerMessage)
-> Maybe (ProcessId, ServiceId, SendPort WorkerMessage))
-> Process (ProcessId, ServiceId, SendPort WorkerMessage)
-> Process (Maybe (ProcessId, ServiceId, SendPort WorkerMessage))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (ProcessId, ServiceId, SendPort WorkerMessage)
-> Maybe (ProcessId, ServiceId, SendPort WorkerMessage)
forall a. a -> Maybe a
Just Process (ProcessId, ServiceId, SendPort WorkerMessage)
forall a. Serializable a => Process a
expect
case Maybe (ProcessId, ServiceId, SendPort WorkerMessage)
connectionResult of
Maybe (ProcessId, ServiceId, SendPort WorkerMessage)
Nothing -> WorkerConnectionTimeout -> Process ProcessId
forall (m :: * -> *) e a.
(MonadThrow m, MonadIO m, Exception e) =>
e -> m a
Log.throw (ServiceId -> WorkerConnectionTimeout
WorkerConnectionTimeout ServiceId
serviceId)
Just (ProcessId
workerId :: ProcessId, ServiceId
workerServiceId :: ServiceId, SendPort WorkerMessage
_ :: SendPort WorkerMessage)
| ServiceId
workerServiceId ServiceId -> ServiceId -> Bool
forall a. Eq a => a -> a -> Bool
/= ServiceId
serviceId -> do
Text -> (ServiceId, j, ProcessId) -> Process ()
forall a (m :: * -> *). (Show a, MonadIO m) => Text -> a -> m ()
Log.info Text
"Ignoring message from unknown worker" (ServiceId
workerServiceId, j
jobId, ProcessId
workerId)
Process ProcessId
awaitWorker
Just (ProcessId
workerId :: ProcessId, ServiceId
_ :: ServiceId, SendPort WorkerMessage
replyTo :: SendPort WorkerMessage)
| Bool
otherwise -> do
Text -> (ServiceId, j, ProcessId) -> Process ()
forall a (m :: * -> *). (Show a, MonadIO m) => Text -> a -> m ()
Log.info Text
"Acquired worker" (ServiceId
serviceId, j
jobId, ProcessId
workerId)
SendPort WorkerMessage -> WorkerMessage -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan SendPort WorkerMessage
replyTo WorkerMessage
Connected
ProcessId -> Process ProcessId
forall (m :: * -> *) a. Monad m => a -> m a
return ProcessId
workerId
shutdownWorker :: ProcessId -> Process ()
shutdownWorker ProcessId
workerId = do
Text -> (ServiceId, j) -> Process ()
forall a (m :: * -> *). (Show a, MonadIO m) => Text -> a -> m ()
Log.info Text
"Worker finished" (ServiceId
serviceId, j
jobId)
ProcessId -> WorkerMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
send ProcessId
workerId WorkerMessage
ShutDown
Process ProcessId
-> (ProcessId -> Process ())
-> (ProcessId -> Process a)
-> Process a
forall (m :: * -> *) a c b.
MonadMask m =>
m a -> (a -> m c) -> (a -> m b) -> m b
bracket Process ProcessId
awaitWorker ProcessId -> Process ()
shutdownWorker (\ProcessId
wId -> NodeId -> ServiceId -> Process a
go (ProcessId -> NodeId
processNodeId ProcessId
wId) ServiceId
serviceId)
data SerializableClosureProcess a = SerializableClosureProcess
{
SerializableClosureProcess a
-> Process (Closure (Process (Either String a)))
runClosureProcess :: Process (Closure (Process (Either String a)))
, SerializableClosureProcess a
-> Closure (SerializableDict (Either String a))
staticSDict :: Closure (SerializableDict (Either String a))
, SerializableClosureProcess a
-> MVar (Closure (Process (Either String a)))
closureVar :: MVar (Closure (Process (Either String a)))
}
getClosure :: SerializableClosureProcess a -> Process (Closure (Process (Either String a)))
getClosure :: SerializableClosureProcess a
-> Process (Closure (Process (Either String a)))
getClosure SerializableClosureProcess a
s = do
Process Bool -> Process () -> Process ()
forall (m :: * -> *). Monad m => m Bool -> m () -> m ()
whenM (IO Bool -> Process Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (MVar (Closure (Process (Either String a))) -> IO Bool
forall a. MVar a -> IO Bool
isEmptyMVar (SerializableClosureProcess a
-> MVar (Closure (Process (Either String a)))
forall a.
SerializableClosureProcess a
-> MVar (Closure (Process (Either String a)))
closureVar SerializableClosureProcess a
s))) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$ do
Closure (Process (Either String a))
c <- SerializableClosureProcess a
-> Process (Closure (Process (Either String a)))
forall a.
SerializableClosureProcess a
-> Process (Closure (Process (Either String a)))
runClosureProcess SerializableClosureProcess a
s
IO () -> Process ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MVar (Closure (Process (Either String a)))
-> Closure (Process (Either String a)) -> IO ()
forall a. MVar a -> a -> IO ()
putMVar (SerializableClosureProcess a
-> MVar (Closure (Process (Either String a)))
forall a.
SerializableClosureProcess a
-> MVar (Closure (Process (Either String a)))
closureVar SerializableClosureProcess a
s) Closure (Process (Either String a))
c
IO (Closure (Process (Either String a)))
-> Process (Closure (Process (Either String a)))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Closure (Process (Either String a)))
-> Process (Closure (Process (Either String a))))
-> IO (Closure (Process (Either String a)))
-> Process (Closure (Process (Either String a)))
forall a b. (a -> b) -> a -> b
$ MVar (Closure (Process (Either String a)))
-> IO (Closure (Process (Either String a)))
forall a. MVar a -> IO a
readMVar (SerializableClosureProcess a
-> MVar (Closure (Process (Either String a)))
forall a.
SerializableClosureProcess a
-> MVar (Closure (Process (Either String a)))
closureVar SerializableClosureProcess a
s)
type RemoteProcessRunner =
forall a . (Binary a, Typeable a) => SerializableClosureProcess a -> Process a
withRemoteRunProcess
:: Show j
=> WorkerLauncher j
-> (RemoteProcessRunner -> Process a)
-> Process a
withRemoteRunProcess :: WorkerLauncher j -> (RemoteProcessRunner -> Process a) -> Process a
withRemoteRunProcess WorkerLauncher j
workerLauncher RemoteProcessRunner -> Process a
go =
Process a
goWithRemote Process a -> (RemoteError -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \RemoteError
e -> WorkerLauncher j -> RemoteError -> Process a -> Process a
forall j.
WorkerLauncher j -> forall b. RemoteError -> Process b -> Process b
onRemoteError WorkerLauncher j
workerLauncher RemoteError
e (WorkerLauncher j -> (RemoteProcessRunner -> Process a) -> Process a
forall j a.
Show j =>
WorkerLauncher j -> (RemoteProcessRunner -> Process a) -> Process a
withRemoteRunProcess WorkerLauncher j
workerLauncher RemoteProcessRunner -> Process a
go)
where
goWithRemote :: Process a
goWithRemote =
WorkerLauncher j -> (NodeId -> ServiceId -> Process a) -> Process a
forall j a.
Show j =>
WorkerLauncher j -> (NodeId -> ServiceId -> Process a) -> Process a
withService WorkerLauncher j
workerLauncher ((NodeId -> ServiceId -> Process a) -> Process a)
-> (NodeId -> ServiceId -> Process a) -> Process a
forall a b. (a -> b) -> a -> b
$ \NodeId
workerNodeId ServiceId
serviceId ->
RemoteProcessRunner -> Process a
go (RemoteProcessRunner -> Process a)
-> RemoteProcessRunner -> Process a
forall a b. (a -> b) -> a -> b
$ \SerializableClosureProcess a
s -> do
Closure (Process (Either String a))
c <- SerializableClosureProcess a
-> Process (Closure (Process (Either String a)))
forall a.
SerializableClosureProcess a
-> Process (Closure (Process (Either String a)))
getClosure SerializableClosureProcess a
s
AsyncResult (Either String a)
a <- Async (Either String a) -> Process (AsyncResult (Either String a))
forall a. Async a -> Process (AsyncResult a)
wait (Async (Either String a)
-> Process (AsyncResult (Either String a)))
-> Process (Async (Either String a))
-> Process (AsyncResult (Either String a))
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< AsyncTask (Either String a) -> Process (Async (Either String a))
forall a. Serializable a => AsyncTask a -> Process (Async a)
async (Process (Either String a) -> AsyncTask (Either String a)
forall a. Process a -> AsyncTask a
task (Process (Either String a) -> AsyncTask (Either String a))
-> Process (Either String a) -> AsyncTask (Either String a)
forall a b. (a -> b) -> a -> b
$ Closure (SerializableDict (Either String a))
-> NodeId
-> Closure (Process (Either String a))
-> Process (Either String a)
forall a.
(Binary a, Typeable a) =>
Closure (SerializableDict a)
-> NodeId -> Closure (Process a) -> Process a
call' (SerializableClosureProcess a
-> Closure (SerializableDict (Either String a))
forall a.
SerializableClosureProcess a
-> Closure (SerializableDict (Either String a))
staticSDict SerializableClosureProcess a
s) NodeId
workerNodeId Closure (Process (Either String a))
c)
case AsyncResult (Either String a)
a of
AsyncDone (Right a
result) -> a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
result
AsyncDone (Left String
err) -> RemoteError -> Process a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM (RemoteError -> Process a) -> RemoteError -> Process a
forall a b. (a -> b) -> a -> b
$ ServiceId -> RemoteErrorType -> RemoteError
RemoteError ServiceId
serviceId (RemoteErrorType -> RemoteError) -> RemoteErrorType -> RemoteError
forall a b. (a -> b) -> a -> b
$ String -> RemoteErrorType
RemoteException String
err
AsyncFailed DiedReason
reason -> RemoteError -> Process a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM (RemoteError -> Process a) -> RemoteError -> Process a
forall a b. (a -> b) -> a -> b
$ ServiceId -> RemoteErrorType -> RemoteError
RemoteError ServiceId
serviceId (RemoteErrorType -> RemoteError) -> RemoteErrorType -> RemoteError
forall a b. (a -> b) -> a -> b
$ DiedReason -> RemoteErrorType
RemoteAsyncFailed DiedReason
reason
AsyncLinkFailed DiedReason
reason -> RemoteError -> Process a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM (RemoteError -> Process a) -> RemoteError -> Process a
forall a b. (a -> b) -> a -> b
$ ServiceId -> RemoteErrorType -> RemoteError
RemoteError ServiceId
serviceId (RemoteErrorType -> RemoteError) -> RemoteErrorType -> RemoteError
forall a b. (a -> b) -> a -> b
$ DiedReason -> RemoteErrorType
RemoteAsyncLinkFailed DiedReason
reason
AsyncResult (Either String a)
AsyncCancelled -> RemoteError -> Process a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM (RemoteError -> Process a) -> RemoteError -> Process a
forall a b. (a -> b) -> a -> b
$ ServiceId -> RemoteErrorType -> RemoteError
RemoteError ServiceId
serviceId (RemoteErrorType -> RemoteError) -> RemoteErrorType -> RemoteError
forall a b. (a -> b) -> a -> b
$ RemoteErrorType
RemoteAsyncCancelled
AsyncResult (Either String a)
AsyncPending -> RemoteError -> Process a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM (RemoteError -> Process a) -> RemoteError -> Process a
forall a b. (a -> b) -> a -> b
$ ServiceId -> RemoteErrorType -> RemoteError
RemoteError ServiceId
serviceId (RemoteErrorType -> RemoteError) -> RemoteErrorType -> RemoteError
forall a b. (a -> b) -> a -> b
$ RemoteErrorType
RemoteAsyncPending
tryLogException :: Process b -> Process (Either String b)
tryLogException :: Process b -> Process (Either String b)
tryLogException Process b
go = Process b -> Process (Either SomeException b)
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try Process b
go Process (Either SomeException b)
-> (Either SomeException b -> Process (Either String b))
-> Process (Either String b)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left (SomeException
e :: SomeException) -> do
SomeException -> Process ()
forall a (m :: * -> *). (Show a, MonadIO m) => a -> m ()
Log.err SomeException
e
Either String b -> Process (Either String b)
forall (m :: * -> *) a. Monad m => a -> m a
return (String -> Either String b
forall a b. a -> Either a b
Left (SomeException -> String
forall a. Show a => a -> String
show SomeException
e))
Right b
b ->
Either String b -> Process (Either String b)
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> Either String b
forall a b. b -> Either a b
Right b
b)
mkSerializableClosureProcess
:: Typeable b
=> Closure (Dict (Serializable b))
-> Process (Closure (Process b))
-> Process (SerializableClosureProcess b)
mkSerializableClosureProcess :: Closure (Dict (Serializable b))
-> Process (Closure (Process b))
-> Process (SerializableClosureProcess b)
mkSerializableClosureProcess Closure (Dict (Serializable b))
bDict Process (Closure (Process b))
mb = do
MVar (Closure (Process (Either String b)))
v <- IO (MVar (Closure (Process (Either String b))))
-> Process (MVar (Closure (Process (Either String b))))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (MVar (Closure (Process (Either String b))))
forall a. IO (MVar a)
newEmptyMVar
SerializableClosureProcess b
-> Process (SerializableClosureProcess b)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SerializableClosureProcess b
-> Process (SerializableClosureProcess b))
-> SerializableClosureProcess b
-> Process (SerializableClosureProcess b)
forall a b. (a -> b) -> a -> b
$ SerializableClosureProcess :: forall a.
Process (Closure (Process (Either String a)))
-> Closure (SerializableDict (Either String a))
-> MVar (Closure (Process (Either String a)))
-> SerializableClosureProcess a
SerializableClosureProcess
{ runClosureProcess :: Process (Closure (Process (Either String b)))
runClosureProcess = (Closure (Process b) -> Closure (Process (Either String b)))
-> Process (Closure (Process b))
-> Process (Closure (Process (Either String b)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (static Process b -> Process (Either String b)
forall b. Process b -> Process (Either String b)
tryLogException StaticPtr (Process b -> Process (Either String b))
-> Closure (Process b) -> Closure (Process (Either String b))
forall a b.
(Typeable a, Typeable b) =>
StaticPtr (a -> b) -> Closure a -> Closure b
`ptrAp`) Process (Closure (Process b))
mb
, staticSDict :: Closure (SerializableDict (Either String b))
staticSDict = static (\Dict (Serializable b)
Dict -> SerializableDict (Either String b)
forall a. Serializable a => SerializableDict a
SerializableDict) StaticPtr
(Dict (Serializable b) -> SerializableDict (Either String b))
-> Closure (Dict (Serializable b))
-> Closure (SerializableDict (Either String b))
forall a b.
(Typeable a, Typeable b) =>
StaticPtr (a -> b) -> Closure a -> Closure b
`ptrAp` Closure (Dict (Serializable b))
bDict
, closureVar :: MVar (Closure (Process (Either String b)))
closureVar = MVar (Closure (Process (Either String b)))
v
}