{-# LANGUAGE DeriveAnyClass      #-}
{-# LANGUAGE DeriveGeneric       #-}
{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE LambdaCase          #-}
{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE RecordWildCards     #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StaticPointers      #-}
{-# LANGUAGE TypeApplications    #-}

module Hyperion.Remote where

import           Control.Concurrent.MVar             (MVar, isEmptyMVar,
                                                      newEmptyMVar, putMVar,
                                                      readMVar, takeMVar)
import           Control.Distributed.Process         hiding (bracket, catch,
                                                      try)
import           Control.Distributed.Process.Async   (AsyncResult (..), async,
                                                      task, wait)
import           Control.Distributed.Process.Closure (SerializableDict (..))
import qualified Control.Distributed.Process.Node    as Node
import           Control.Distributed.Static          (registerStatic,
                                                      staticLabel)
import           Control.Monad.Catch                 (Exception, SomeException,
                                                      bracket, catch, throwM,
                                                      try)
import           Control.Monad.Extra                 (whenM)
import           Control.Monad.Trans.Maybe           (MaybeT (..))
import           Data.Binary                         (Binary)
import           Data.Constraint                     (Dict (..))
import           Data.Data                           (Typeable)
import           Data.Foldable                       (asum)
import           Data.Rank1Dynamic                   (toDynamic)
import           Data.Text                           (Text, pack)
import qualified Data.Text.Encoding                  as E
import           Data.Time.Clock                     (NominalDiffTime)
import           GHC.Generics                        (Generic)
import           Hyperion.CallClosure                (call')
import qualified Hyperion.Log                        as Log
import           Hyperion.Static                     (Serializable, ptrAp)
import           Hyperion.Util                       (nominalDiffTimeToMicroseconds,
                                                      newUnique)
import           Network.BSD                         (HostEntry (..),
                                                      getHostEntries,
                                                      getHostName)
import           Network.Socket                      (hostAddressToTuple)
import           Network.Transport                   (EndPointAddress (..))
import qualified Network.Transport.TCP               as NT

-- * Types

-- | Type for service id. 'ServiceId' is typically a random string that
-- is assigned to a worker. (Maybe to other things too?)
newtype ServiceId = ServiceId String
  deriving (ServiceId -> ServiceId -> Bool
(ServiceId -> ServiceId -> Bool)
-> (ServiceId -> ServiceId -> Bool) -> Eq ServiceId
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ServiceId -> ServiceId -> Bool
$c/= :: ServiceId -> ServiceId -> Bool
== :: ServiceId -> ServiceId -> Bool
$c== :: ServiceId -> ServiceId -> Bool
Eq, Int -> ServiceId -> ShowS
[ServiceId] -> ShowS
ServiceId -> String
(Int -> ServiceId -> ShowS)
-> (ServiceId -> String)
-> ([ServiceId] -> ShowS)
-> Show ServiceId
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ServiceId] -> ShowS
$cshowList :: [ServiceId] -> ShowS
show :: ServiceId -> String
$cshow :: ServiceId -> String
showsPrec :: Int -> ServiceId -> ShowS
$cshowsPrec :: Int -> ServiceId -> ShowS
Show, (forall x. ServiceId -> Rep ServiceId x)
-> (forall x. Rep ServiceId x -> ServiceId) -> Generic ServiceId
forall x. Rep ServiceId x -> ServiceId
forall x. ServiceId -> Rep ServiceId x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep ServiceId x -> ServiceId
$cfrom :: forall x. ServiceId -> Rep ServiceId x
Generic, Get ServiceId
[ServiceId] -> Put
ServiceId -> Put
(ServiceId -> Put)
-> Get ServiceId -> ([ServiceId] -> Put) -> Binary ServiceId
forall t. (t -> Put) -> Get t -> ([t] -> Put) -> Binary t
putList :: [ServiceId] -> Put
$cputList :: [ServiceId] -> Put
get :: Get ServiceId
$cget :: Get ServiceId
put :: ServiceId -> Put
$cput :: ServiceId -> Put
Binary)

serviceIdToText :: ServiceId -> Text
serviceIdToText :: ServiceId -> Text
serviceIdToText = String -> Text
pack (String -> Text) -> (ServiceId -> String) -> ServiceId -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ServiceId -> String
serviceIdToString

serviceIdToString :: ServiceId -> String
serviceIdToString :: ServiceId -> String
serviceIdToString (ServiceId String
s) = String
s

-- | Type for basic master to worker messaging
data WorkerMessage = Connected | ShutDown
  deriving (ReadPrec [WorkerMessage]
ReadPrec WorkerMessage
Int -> ReadS WorkerMessage
ReadS [WorkerMessage]
(Int -> ReadS WorkerMessage)
-> ReadS [WorkerMessage]
-> ReadPrec WorkerMessage
-> ReadPrec [WorkerMessage]
-> Read WorkerMessage
forall a.
(Int -> ReadS a)
-> ReadS [a] -> ReadPrec a -> ReadPrec [a] -> Read a
readListPrec :: ReadPrec [WorkerMessage]
$creadListPrec :: ReadPrec [WorkerMessage]
readPrec :: ReadPrec WorkerMessage
$creadPrec :: ReadPrec WorkerMessage
readList :: ReadS [WorkerMessage]
$creadList :: ReadS [WorkerMessage]
readsPrec :: Int -> ReadS WorkerMessage
$creadsPrec :: Int -> ReadS WorkerMessage
Read, Int -> WorkerMessage -> ShowS
[WorkerMessage] -> ShowS
WorkerMessage -> String
(Int -> WorkerMessage -> ShowS)
-> (WorkerMessage -> String)
-> ([WorkerMessage] -> ShowS)
-> Show WorkerMessage
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [WorkerMessage] -> ShowS
$cshowList :: [WorkerMessage] -> ShowS
show :: WorkerMessage -> String
$cshow :: WorkerMessage -> String
showsPrec :: Int -> WorkerMessage -> ShowS
$cshowsPrec :: Int -> WorkerMessage -> ShowS
Show, (forall x. WorkerMessage -> Rep WorkerMessage x)
-> (forall x. Rep WorkerMessage x -> WorkerMessage)
-> Generic WorkerMessage
forall x. Rep WorkerMessage x -> WorkerMessage
forall x. WorkerMessage -> Rep WorkerMessage x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep WorkerMessage x -> WorkerMessage
$cfrom :: forall x. WorkerMessage -> Rep WorkerMessage x
Generic, Get WorkerMessage
[WorkerMessage] -> Put
WorkerMessage -> Put
(WorkerMessage -> Put)
-> Get WorkerMessage
-> ([WorkerMessage] -> Put)
-> Binary WorkerMessage
forall t. (t -> Put) -> Get t -> ([t] -> Put) -> Binary t
putList :: [WorkerMessage] -> Put
$cputList :: [WorkerMessage] -> Put
get :: Get WorkerMessage
$cget :: Get WorkerMessage
put :: WorkerMessage -> Put
$cput :: WorkerMessage -> Put
Binary)

data RemoteError = RemoteError ServiceId RemoteErrorType
  deriving (Int -> RemoteError -> ShowS
[RemoteError] -> ShowS
RemoteError -> String
(Int -> RemoteError -> ShowS)
-> (RemoteError -> String)
-> ([RemoteError] -> ShowS)
-> Show RemoteError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [RemoteError] -> ShowS
$cshowList :: [RemoteError] -> ShowS
show :: RemoteError -> String
$cshow :: RemoteError -> String
showsPrec :: Int -> RemoteError -> ShowS
$cshowsPrec :: Int -> RemoteError -> ShowS
Show, Show RemoteError
Typeable RemoteError
Typeable RemoteError
-> Show RemoteError
-> (RemoteError -> SomeException)
-> (SomeException -> Maybe RemoteError)
-> (RemoteError -> String)
-> Exception RemoteError
SomeException -> Maybe RemoteError
RemoteError -> String
RemoteError -> SomeException
forall e.
Typeable e
-> Show e
-> (e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> Exception e
displayException :: RemoteError -> String
$cdisplayException :: RemoteError -> String
fromException :: SomeException -> Maybe RemoteError
$cfromException :: SomeException -> Maybe RemoteError
toException :: RemoteError -> SomeException
$ctoException :: RemoteError -> SomeException
$cp2Exception :: Show RemoteError
$cp1Exception :: Typeable RemoteError
Exception)

-- | Detailed type for 'RemoteError'. The constructors correspond to various
-- possible 'AsyncResult's.
data RemoteErrorType
  = RemoteAsyncFailed DiedReason
  | RemoteAsyncLinkFailed DiedReason
  | RemoteAsyncCancelled
  | RemoteAsyncPending
  | RemoteException String
  deriving (Int -> RemoteErrorType -> ShowS
[RemoteErrorType] -> ShowS
RemoteErrorType -> String
(Int -> RemoteErrorType -> ShowS)
-> (RemoteErrorType -> String)
-> ([RemoteErrorType] -> ShowS)
-> Show RemoteErrorType
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [RemoteErrorType] -> ShowS
$cshowList :: [RemoteErrorType] -> ShowS
show :: RemoteErrorType -> String
$cshow :: RemoteErrorType -> String
showsPrec :: Int -> RemoteErrorType -> ShowS
$cshowsPrec :: Int -> RemoteErrorType -> ShowS
Show)

data WorkerConnectionTimeout = WorkerConnectionTimeout ServiceId
  deriving (Int -> WorkerConnectionTimeout -> ShowS
[WorkerConnectionTimeout] -> ShowS
WorkerConnectionTimeout -> String
(Int -> WorkerConnectionTimeout -> ShowS)
-> (WorkerConnectionTimeout -> String)
-> ([WorkerConnectionTimeout] -> ShowS)
-> Show WorkerConnectionTimeout
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [WorkerConnectionTimeout] -> ShowS
$cshowList :: [WorkerConnectionTimeout] -> ShowS
show :: WorkerConnectionTimeout -> String
$cshow :: WorkerConnectionTimeout -> String
showsPrec :: Int -> WorkerConnectionTimeout -> ShowS
$cshowsPrec :: Int -> WorkerConnectionTimeout -> ShowS
Show, Show WorkerConnectionTimeout
Typeable WorkerConnectionTimeout
Typeable WorkerConnectionTimeout
-> Show WorkerConnectionTimeout
-> (WorkerConnectionTimeout -> SomeException)
-> (SomeException -> Maybe WorkerConnectionTimeout)
-> (WorkerConnectionTimeout -> String)
-> Exception WorkerConnectionTimeout
SomeException -> Maybe WorkerConnectionTimeout
WorkerConnectionTimeout -> String
WorkerConnectionTimeout -> SomeException
forall e.
Typeable e
-> Show e
-> (e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> Exception e
displayException :: WorkerConnectionTimeout -> String
$cdisplayException :: WorkerConnectionTimeout -> String
fromException :: SomeException -> Maybe WorkerConnectionTimeout
$cfromException :: SomeException -> Maybe WorkerConnectionTimeout
toException :: WorkerConnectionTimeout -> SomeException
$ctoException :: WorkerConnectionTimeout -> SomeException
$cp2Exception :: Show WorkerConnectionTimeout
$cp1Exception :: Typeable WorkerConnectionTimeout
Exception)

-- | 'WorkerLauncher' type parametrized by a type for job id.
data WorkerLauncher j = WorkerLauncher
  { -- | A function that launches a worker for the given 'ServiceId' on
    -- the master 'NodeId' and supplies its job id to the given
    -- continuation
    WorkerLauncher j
-> forall b. NodeId -> ServiceId -> (j -> Process b) -> Process b
withLaunchedWorker :: forall b . NodeId -> ServiceId -> (j -> Process b) -> Process b
    -- | Timeout for the worker to connect. If the worker is launched
    -- into a Slurm queue, it may take a very long time to connect. In
    -- that case, it is recommended to set 'connectionTimeout' =
    -- 'Nothing'.
  , WorkerLauncher j -> Maybe NominalDiffTime
connectionTimeout :: Maybe NominalDiffTime
    -- | A handler for 'RemoteError's.  'onRemoteError' can do things
    -- like blocking before rerunning the remote process, or simply
    -- rethrowing the error.
  , WorkerLauncher j -> forall b. RemoteError -> Process b -> Process b
onRemoteError     :: forall b . RemoteError -> Process b -> Process b
  }

-- * Functions for running a worker

-- | Run a Process locally using the default
-- 'RemoteTable'. Additionally allows a return value for the
-- 'Process'.
runProcessLocal :: Process a -> IO a
runProcessLocal :: Process a -> IO a
runProcessLocal = RemoteTable -> Process a -> IO a
forall a. RemoteTable -> Process a -> IO a
runProcessLocalWithRT RemoteTable
Node.initRemoteTable

-- | Run a Process locally using the specified
-- 'RemoteTable'. Additionally allows a return value for the
-- 'Process'.
runProcessLocalWithRT :: RemoteTable -> Process a -> IO a
runProcessLocalWithRT :: RemoteTable -> Process a -> IO a
runProcessLocalWithRT RemoteTable
rt Process a
process = do
  MVar a
resultVar <- IO (MVar a)
forall a. IO (MVar a)
newEmptyMVar
  RemoteTable -> Process () -> IO ()
runProcessLocalWithRT_ RemoteTable
rt (Process () -> IO ()) -> Process () -> IO ()
forall a b. (a -> b) -> a -> b
$ Process a
process Process a -> (a -> Process ()) -> Process ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= IO () -> Process ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> (a -> IO ()) -> a -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MVar a -> a -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar a
resultVar
  MVar a -> IO a
forall a. MVar a -> IO a
takeMVar MVar a
resultVar

-- | Spawns a new local "Control.Distributed.Process.Node" and runs
-- the given 'Process' on it. Waits for the process to finish.
--
-- Binds to the first available port by specifying port 0.
runProcessLocalWithRT_ :: RemoteTable -> Process () -> IO ()
runProcessLocalWithRT_ :: RemoteTable -> Process () -> IO ()
runProcessLocalWithRT_ RemoteTable
rtable Process ()
process = do
  -- This doesn't work on expanse. Comment this line out on expanse:
  String
host <- IO String
getHostName
  -- This does work on expanse, but not anywhere else. Uncomment this
  -- line on expanse:
  --host <- getExternalHostName
  TCPAddr -> TCPParameters -> IO (Either IOException Transport)
NT.createTransport (String -> String -> TCPAddr
NT.defaultTCPAddr String
host String
"0") TCPParameters
NT.defaultTCPParameters IO (Either IOException Transport)
-> (Either IOException Transport -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Left IOException
e -> IOException -> IO ()
forall (m :: * -> *) e a.
(MonadThrow m, MonadIO m, Exception e) =>
e -> m a
Log.throw IOException
e
    Right Transport
t -> do
      LocalNode
node <- Transport -> RemoteTable -> IO LocalNode
Node.newLocalNode Transport
t RemoteTable
rtable
      Text -> NodeId -> IO ()
forall a (m :: * -> *). (Show a, MonadIO m) => Text -> a -> m ()
Log.info Text
"Running on node" (LocalNode -> NodeId
Node.localNodeId LocalNode
node)
      LocalNode -> Process () -> IO ()
Node.runProcess LocalNode
node Process ()
process

-- | Get a hostname for the current machine that does not correspond
-- to a local network address (127.* or 10.*)
getExternalHostName :: IO String
getExternalHostName :: IO String
getExternalHostName = do
  -- TODO: Here 'stayopen' is set to True. Is this an ok choice?
  [HostEntry]
entries <- Bool -> IO [HostEntry]
getHostEntries Bool
True
  case (HostEntry -> Bool) -> [HostEntry] -> [HostEntry]
forall a. (a -> Bool) -> [a] -> [a]
filter ((HostAddress -> Bool) -> [HostAddress] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
any (Bool -> Bool
not (Bool -> Bool) -> (HostAddress -> Bool) -> HostAddress -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. HostAddress -> Bool
isLocal) ([HostAddress] -> Bool)
-> (HostEntry -> [HostAddress]) -> HostEntry -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. HostEntry -> [HostAddress]
hostAddresses) [HostEntry]
entries of
    HostEntry
e : [HostEntry]
_ -> String -> IO String
forall (m :: * -> *) a. Monad m => a -> m a
return (String -> IO String) -> String -> IO String
forall a b. (a -> b) -> a -> b
$ HostEntry -> String
hostName HostEntry
e
    []    -> String -> IO String
forall (m :: * -> *) a. MonadIO m => String -> m a
Log.throwError (String -> IO String) -> String -> IO String
forall a b. (a -> b) -> a -> b
$ String
"Cannot find external network address among host entries: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ [HostEntry] -> String
forall a. Show a => a -> String
show [HostEntry]
entries
  where
    isLocal :: HostAddress -> Bool
isLocal HostAddress
a = case HostAddress -> (Word8, Word8, Word8, Word8)
hostAddressToTuple HostAddress
a of
      (Word8
127, Word8
_, Word8
_, Word8
_) -> Bool
True
      (Word8
10,  Word8
_, Word8
_, Word8
_) -> Bool
True
      (Word8, Word8, Word8, Word8)
_              -> Bool
False

-- | Convert a 'Text' representation of 'EndPointAddress' to 'NodeId'.
-- The format for the end point address is \"TCP host:TCP port:endpoint id\"
addressToNodeId :: Text -> NodeId
addressToNodeId :: Text -> NodeId
addressToNodeId = EndPointAddress -> NodeId
NodeId (EndPointAddress -> NodeId)
-> (Text -> EndPointAddress) -> Text -> NodeId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> EndPointAddress
EndPointAddress (ByteString -> EndPointAddress)
-> (Text -> ByteString) -> Text -> EndPointAddress
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> ByteString
E.encodeUtf8

-- | Inverse to 'addressToNodeId'
nodeIdToAddress :: NodeId -> Text
nodeIdToAddress :: NodeId -> Text
nodeIdToAddress (NodeId (EndPointAddress ByteString
addr)) = ByteString -> Text
E.decodeUtf8 ByteString
addr

masterNodeIdLabel :: String
masterNodeIdLabel :: String
masterNodeIdLabel = String
"masterNodeIdLabel"

masterNodeIdStatic :: Static (Maybe NodeId)
masterNodeIdStatic :: Static (Maybe NodeId)
masterNodeIdStatic = String -> Static (Maybe NodeId)
forall a. String -> Static a
staticLabel String
masterNodeIdLabel

getMasterNodeId :: Process (Maybe NodeId)
getMasterNodeId :: Process (Maybe NodeId)
getMasterNodeId = Static (Maybe NodeId) -> Process (Maybe NodeId)
forall a. Typeable a => Static a -> Process a
unStatic Static (Maybe NodeId)
masterNodeIdStatic

registerMasterNodeId :: Maybe NodeId -> RemoteTable -> RemoteTable
registerMasterNodeId :: Maybe NodeId -> RemoteTable -> RemoteTable
registerMasterNodeId Maybe NodeId
nid = String -> Dynamic -> RemoteTable -> RemoteTable
registerStatic String
masterNodeIdLabel (Maybe NodeId -> Dynamic
forall a. Typeable a => a -> Dynamic
toDynamic Maybe NodeId
nid)

initWorkerRemoteTable :: Maybe NodeId -> RemoteTable
initWorkerRemoteTable :: Maybe NodeId -> RemoteTable
initWorkerRemoteTable Maybe NodeId
nid = Maybe NodeId -> RemoteTable -> RemoteTable
registerMasterNodeId Maybe NodeId
nid RemoteTable
Node.initRemoteTable

-- | The main worker process.
--
-- Repeatedly (at most 5 times) send our own 'ProcessId' and the send end of a typed channel
-- ('SendPort' 'WorkerMessage') to a master node until it replies
-- 'Connected' (timeout 10 seconds for each attempt).
-- Then 'expect' a 'ShutDown' signal.
--
-- While waiting, other processes will be run in a different thread,
-- invoked by master through our 'NodeId' (which it extracts from 'ProcessId')
worker
  :: NodeId     -- ^ 'NodeId' of the master node
  -> ServiceId  -- ^ 'ServiceId' of master 'Process' (should be 'register'ed)
  -> Process ()
worker :: NodeId -> ServiceId -> Process ()
worker NodeId
masterNode serviceId :: ServiceId
serviceId@(ServiceId String
masterService) = do
  ProcessId
self <- Process ProcessId
getSelfPid
  (SendPort WorkerMessage
sendPort, ReceivePort WorkerMessage
receivePort) <- Process (SendPort WorkerMessage, ReceivePort WorkerMessage)
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan

  let connectToMaster :: MaybeT Process WorkerMessage
connectToMaster = Process (Maybe WorkerMessage) -> MaybeT Process WorkerMessage
forall (m :: * -> *) a. m (Maybe a) -> MaybeT m a
MaybeT (Process (Maybe WorkerMessage) -> MaybeT Process WorkerMessage)
-> Process (Maybe WorkerMessage) -> MaybeT Process WorkerMessage
forall a b. (a -> b) -> a -> b
$ do
        Text -> NodeId -> Process ()
forall a (m :: * -> *). (Show a, MonadIO m) => Text -> a -> m ()
Log.info Text
"Connecting to master" NodeId
masterNode
        NodeId
-> String
-> (ProcessId, ServiceId, SendPort WorkerMessage)
-> Process ()
forall a. Serializable a => NodeId -> String -> a -> Process ()
nsendRemote NodeId
masterNode String
masterService (ProcessId
self, ServiceId
serviceId, SendPort WorkerMessage
sendPort)
        Int -> ReceivePort WorkerMessage -> Process (Maybe WorkerMessage)
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout (Int
10Int -> Int -> Int
forall a. Num a => a -> a -> a
*Int
1000Int -> Int -> Int
forall a. Num a => a -> a -> a
*Int
1000) ReceivePort WorkerMessage
receivePort

  MaybeT Process WorkerMessage -> Process (Maybe WorkerMessage)
forall (m :: * -> *) a. MaybeT m a -> m (Maybe a)
runMaybeT ([MaybeT Process WorkerMessage] -> MaybeT Process WorkerMessage
forall (t :: * -> *) (f :: * -> *) a.
(Foldable t, Alternative f) =>
t (f a) -> f a
asum (Int
-> MaybeT Process WorkerMessage -> [MaybeT Process WorkerMessage]
forall a. Int -> a -> [a]
replicate Int
5 MaybeT Process WorkerMessage
connectToMaster)) Process (Maybe WorkerMessage)
-> (Maybe WorkerMessage -> Process ()) -> Process ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Just WorkerMessage
Connected -> do
      Text -> Process ()
forall (m :: * -> *). MonadIO m => Text -> m ()
Log.text Text
"Successfully connected to master."
      Process WorkerMessage
forall a. Serializable a => Process a
expect Process WorkerMessage
-> (WorkerMessage -> Process ()) -> Process ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        WorkerMessage
Connected -> String -> Process ()
forall (m :: * -> *) a. MonadIO m => String -> m a
Log.throwError String
"Unexpected 'Connected' received."
        WorkerMessage
ShutDown  -> Text -> Process ()
forall (m :: * -> *). MonadIO m => Text -> m ()
Log.text Text
"Shutting down."
    Maybe WorkerMessage
_ -> Text -> Process ()
forall (m :: * -> *). MonadIO m => Text -> m ()
Log.text Text
"Couldn't connect to master" Process () -> Process () -> Process ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> () -> Process ()
forall a b. Serializable a => a -> Process b
die ()

-- | Registers ('register') the current process under a random 'ServiceId', then
-- passes the 'ServiceId' to the given continuation.
-- After the continuation returns, unregisters ('unregister') the 'ServiceId'.
withServiceId :: (ServiceId -> Process a) -> Process a
withServiceId :: (ServiceId -> Process a) -> Process a
withServiceId = Process ServiceId
-> (ServiceId -> Process ())
-> (ServiceId -> Process a)
-> Process a
forall (m :: * -> *) a c b.
MonadMask m =>
m a -> (a -> m c) -> (a -> m b) -> m b
bracket Process ServiceId
newServiceId (\(ServiceId String
s) -> String -> Process ()
unregister String
s)
  where
    newServiceId :: Process ServiceId
newServiceId = do
      String
s <- IO String -> Process String
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO String -> Process String) -> IO String -> Process String
forall a b. (a -> b) -> a -> b
$ Unique -> String
forall a. Show a => a -> String
show (Unique -> String) -> IO Unique -> IO String
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO Unique
newUnique
      Process ProcessId
getSelfPid Process ProcessId -> (ProcessId -> Process ()) -> Process ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= String -> ProcessId -> Process ()
register String
s
      ServiceId -> Process ServiceId
forall (m :: * -> *) a. Monad m => a -> m a
return (String -> ServiceId
ServiceId String
s)

-- | Start a new remote worker using 'WorkerLauncher' and call a continuation
-- with the 'NodeId' and 'ServiceId' of that worker. The continuation is
-- run in the process that is registered under the 'ServiceId' (see 'withServiceId').
--
-- Throws ('throwM') a 'WorkerConnectionTimeout' if worker times out (timeout
-- described in 'WorkerLauncher')
--
-- The call to the user function is 'bracket'ed by worker startup and shutdown
-- procedures.
withService
  :: Show j
  => WorkerLauncher j
  -> (NodeId -> ServiceId -> Process a)
  -> Process a
withService :: WorkerLauncher j -> (NodeId -> ServiceId -> Process a) -> Process a
withService WorkerLauncher{Maybe NominalDiffTime
forall b. NodeId -> ServiceId -> (j -> Process b) -> Process b
forall b. RemoteError -> Process b -> Process b
onRemoteError :: forall b. RemoteError -> Process b -> Process b
connectionTimeout :: Maybe NominalDiffTime
withLaunchedWorker :: forall b. NodeId -> ServiceId -> (j -> Process b) -> Process b
onRemoteError :: forall j.
WorkerLauncher j -> forall b. RemoteError -> Process b -> Process b
connectionTimeout :: forall j. WorkerLauncher j -> Maybe NominalDiffTime
withLaunchedWorker :: forall j.
WorkerLauncher j
-> forall b. NodeId -> ServiceId -> (j -> Process b) -> Process b
..} NodeId -> ServiceId -> Process a
go = (ServiceId -> Process a) -> Process a
forall a. (ServiceId -> Process a) -> Process a
withServiceId ((ServiceId -> Process a) -> Process a)
-> (ServiceId -> Process a) -> Process a
forall a b. (a -> b) -> a -> b
$ \ServiceId
serviceId -> do
  NodeId
nid <- Process NodeId
getSelfNode
  -- fire up a remote worker with instructions to contact this node
  NodeId -> ServiceId -> (j -> Process a) -> Process a
forall b. NodeId -> ServiceId -> (j -> Process b) -> Process b
withLaunchedWorker NodeId
nid ServiceId
serviceId ((j -> Process a) -> Process a) -> (j -> Process a) -> Process a
forall a b. (a -> b) -> a -> b
$ \j
jobId -> do
    Text -> (ServiceId, j) -> Process ()
forall a (m :: * -> *). (Show a, MonadIO m) => Text -> a -> m ()
Log.info Text
"Deployed worker" (ServiceId
serviceId, j
jobId)
    -- Wait for the worker to connect and send its id
    let
      awaitWorker :: Process ProcessId
awaitWorker = do
        Maybe (ProcessId, ServiceId, SendPort WorkerMessage)
connectionResult <- case Maybe NominalDiffTime
connectionTimeout of
          Just NominalDiffTime
t  -> Int
-> Process (Maybe (ProcessId, ServiceId, SendPort WorkerMessage))
forall a. Serializable a => Int -> Process (Maybe a)
expectTimeout (NominalDiffTime -> Int
nominalDiffTimeToMicroseconds NominalDiffTime
t)
          Maybe NominalDiffTime
Nothing -> ((ProcessId, ServiceId, SendPort WorkerMessage)
 -> Maybe (ProcessId, ServiceId, SendPort WorkerMessage))
-> Process (ProcessId, ServiceId, SendPort WorkerMessage)
-> Process (Maybe (ProcessId, ServiceId, SendPort WorkerMessage))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (ProcessId, ServiceId, SendPort WorkerMessage)
-> Maybe (ProcessId, ServiceId, SendPort WorkerMessage)
forall a. a -> Maybe a
Just Process (ProcessId, ServiceId, SendPort WorkerMessage)
forall a. Serializable a => Process a
expect
        case Maybe (ProcessId, ServiceId, SendPort WorkerMessage)
connectionResult of
          Maybe (ProcessId, ServiceId, SendPort WorkerMessage)
Nothing -> WorkerConnectionTimeout -> Process ProcessId
forall (m :: * -> *) e a.
(MonadThrow m, MonadIO m, Exception e) =>
e -> m a
Log.throw (ServiceId -> WorkerConnectionTimeout
WorkerConnectionTimeout ServiceId
serviceId)
          Just (ProcessId
workerId :: ProcessId, ServiceId
workerServiceId :: ServiceId, SendPort WorkerMessage
_ :: SendPort WorkerMessage)
            | ServiceId
workerServiceId ServiceId -> ServiceId -> Bool
forall a. Eq a => a -> a -> Bool
/= ServiceId
serviceId -> do
                Text -> (ServiceId, j, ProcessId) -> Process ()
forall a (m :: * -> *). (Show a, MonadIO m) => Text -> a -> m ()
Log.info Text
"Ignoring message from unknown worker" (ServiceId
workerServiceId, j
jobId, ProcessId
workerId)
                Process ProcessId
awaitWorker
          Just (ProcessId
workerId :: ProcessId, ServiceId
_ :: ServiceId, SendPort WorkerMessage
replyTo :: SendPort WorkerMessage)
            | Bool
otherwise -> do
                Text -> (ServiceId, j, ProcessId) -> Process ()
forall a (m :: * -> *). (Show a, MonadIO m) => Text -> a -> m ()
Log.info Text
"Acquired worker" (ServiceId
serviceId, j
jobId, ProcessId
workerId)
                -- Confirm that the worker connected successfully
                SendPort WorkerMessage -> WorkerMessage -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan SendPort WorkerMessage
replyTo WorkerMessage
Connected
                ProcessId -> Process ProcessId
forall (m :: * -> *) a. Monad m => a -> m a
return ProcessId
workerId
      shutdownWorker :: ProcessId -> Process ()
shutdownWorker ProcessId
workerId = do
        Text -> (ServiceId, j) -> Process ()
forall a (m :: * -> *). (Show a, MonadIO m) => Text -> a -> m ()
Log.info Text
"Worker finished" (ServiceId
serviceId, j
jobId)
        ProcessId -> WorkerMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
send ProcessId
workerId WorkerMessage
ShutDown
    Process ProcessId
-> (ProcessId -> Process ())
-> (ProcessId -> Process a)
-> Process a
forall (m :: * -> *) a c b.
MonadMask m =>
m a -> (a -> m c) -> (a -> m b) -> m b
bracket Process ProcessId
awaitWorker ProcessId -> Process ()
shutdownWorker (\ProcessId
wId -> NodeId -> ServiceId -> Process a
go (ProcessId -> NodeId
processNodeId ProcessId
wId) ServiceId
serviceId)

-- * Functions related to running remote functions

-- | A process that generates the 'Closure' to be run on a remote machine.
data SerializableClosureProcess a = SerializableClosureProcess
  { -- | Process to generate a closure. This process will be run when
    -- a remote location has been identified that the closure can be
    -- sent to.
    SerializableClosureProcess a
-> Process (Closure (Process (Either String a)))
runClosureProcess :: Process (Closure (Process (Either String a)))
    -- | Dict for seralizing the result.
  , SerializableClosureProcess a
-> Closure (SerializableDict (Either String a))
staticSDict       :: Closure (SerializableDict (Either String a))
    -- | If a remote computation fails, it may be added to the 'HoldMap'
    -- to be tried again. In that case, we don't want to evaluate
    -- 'runClosureProcess' again, so we use an 'MVar' to memoize the
    -- result of 'runClosureProcess'.
  , SerializableClosureProcess a
-> MVar (Closure (Process (Either String a)))
closureVar        :: MVar (Closure (Process (Either String a)))
  }

-- | Get closure and memoize the result
getClosure :: SerializableClosureProcess a -> Process (Closure (Process (Either String a)))
getClosure :: SerializableClosureProcess a
-> Process (Closure (Process (Either String a)))
getClosure SerializableClosureProcess a
s = do
  Process Bool -> Process () -> Process ()
forall (m :: * -> *). Monad m => m Bool -> m () -> m ()
whenM (IO Bool -> Process Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (MVar (Closure (Process (Either String a))) -> IO Bool
forall a. MVar a -> IO Bool
isEmptyMVar (SerializableClosureProcess a
-> MVar (Closure (Process (Either String a)))
forall a.
SerializableClosureProcess a
-> MVar (Closure (Process (Either String a)))
closureVar SerializableClosureProcess a
s))) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$ do
    Closure (Process (Either String a))
c <- SerializableClosureProcess a
-> Process (Closure (Process (Either String a)))
forall a.
SerializableClosureProcess a
-> Process (Closure (Process (Either String a)))
runClosureProcess SerializableClosureProcess a
s
    IO () -> Process ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MVar (Closure (Process (Either String a)))
-> Closure (Process (Either String a)) -> IO ()
forall a. MVar a -> a -> IO ()
putMVar (SerializableClosureProcess a
-> MVar (Closure (Process (Either String a)))
forall a.
SerializableClosureProcess a
-> MVar (Closure (Process (Either String a)))
closureVar SerializableClosureProcess a
s) Closure (Process (Either String a))
c
  IO (Closure (Process (Either String a)))
-> Process (Closure (Process (Either String a)))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Closure (Process (Either String a)))
 -> Process (Closure (Process (Either String a))))
-> IO (Closure (Process (Either String a)))
-> Process (Closure (Process (Either String a)))
forall a b. (a -> b) -> a -> b
$ MVar (Closure (Process (Either String a)))
-> IO (Closure (Process (Either String a)))
forall a. MVar a -> IO a
readMVar (SerializableClosureProcess a
-> MVar (Closure (Process (Either String a)))
forall a.
SerializableClosureProcess a
-> MVar (Closure (Process (Either String a)))
closureVar SerializableClosureProcess a
s)

-- | The type of a function that takes a 'SerializableClosureProcess'
-- and runs the 'Closure' on a remote machine. In case the remote
-- machine returns a 'Left' value (i.e. an error), throws this value
-- wrapped in 'RemoteError' of type 'RemoteException'. May throw other
-- 'RemoteError's if remote execution fails in any way.
type RemoteProcessRunner =
  forall a . (Binary a, Typeable a) => SerializableClosureProcess a -> Process a

-- | Starts a new remote worker and runs a user function, which is
-- supplied with 'RemoteProcessRunner' for running closures on that
-- worker. If a 'RemoteError' occurs, it is handled by the
-- 'onRemoteError' supplied in the 'WorkerLauncher'.
withRemoteRunProcess
  :: Show j
  => WorkerLauncher j
  -> (RemoteProcessRunner -> Process a)
  -> Process a
withRemoteRunProcess :: WorkerLauncher j -> (RemoteProcessRunner -> Process a) -> Process a
withRemoteRunProcess WorkerLauncher j
workerLauncher RemoteProcessRunner -> Process a
go =
  Process a
goWithRemote Process a -> (RemoteError -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \RemoteError
e -> WorkerLauncher j -> RemoteError -> Process a -> Process a
forall j.
WorkerLauncher j -> forall b. RemoteError -> Process b -> Process b
onRemoteError WorkerLauncher j
workerLauncher RemoteError
e (WorkerLauncher j -> (RemoteProcessRunner -> Process a) -> Process a
forall j a.
Show j =>
WorkerLauncher j -> (RemoteProcessRunner -> Process a) -> Process a
withRemoteRunProcess WorkerLauncher j
workerLauncher RemoteProcessRunner -> Process a
go)
  where
    goWithRemote :: Process a
goWithRemote =
      WorkerLauncher j -> (NodeId -> ServiceId -> Process a) -> Process a
forall j a.
Show j =>
WorkerLauncher j -> (NodeId -> ServiceId -> Process a) -> Process a
withService WorkerLauncher j
workerLauncher ((NodeId -> ServiceId -> Process a) -> Process a)
-> (NodeId -> ServiceId -> Process a) -> Process a
forall a b. (a -> b) -> a -> b
$ \NodeId
workerNodeId ServiceId
serviceId ->
      RemoteProcessRunner -> Process a
go (RemoteProcessRunner -> Process a)
-> RemoteProcessRunner -> Process a
forall a b. (a -> b) -> a -> b
$ \SerializableClosureProcess a
s -> do
      Closure (Process (Either String a))
c <- SerializableClosureProcess a
-> Process (Closure (Process (Either String a)))
forall a.
SerializableClosureProcess a
-> Process (Closure (Process (Either String a)))
getClosure SerializableClosureProcess a
s
      AsyncResult (Either String a)
a <- Async (Either String a) -> Process (AsyncResult (Either String a))
forall a. Async a -> Process (AsyncResult a)
wait (Async (Either String a)
 -> Process (AsyncResult (Either String a)))
-> Process (Async (Either String a))
-> Process (AsyncResult (Either String a))
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< AsyncTask (Either String a) -> Process (Async (Either String a))
forall a. Serializable a => AsyncTask a -> Process (Async a)
async (Process (Either String a) -> AsyncTask (Either String a)
forall a. Process a -> AsyncTask a
task (Process (Either String a) -> AsyncTask (Either String a))
-> Process (Either String a) -> AsyncTask (Either String a)
forall a b. (a -> b) -> a -> b
$ Closure (SerializableDict (Either String a))
-> NodeId
-> Closure (Process (Either String a))
-> Process (Either String a)
forall a.
(Binary a, Typeable a) =>
Closure (SerializableDict a)
-> NodeId -> Closure (Process a) -> Process a
call' (SerializableClosureProcess a
-> Closure (SerializableDict (Either String a))
forall a.
SerializableClosureProcess a
-> Closure (SerializableDict (Either String a))
staticSDict SerializableClosureProcess a
s) NodeId
workerNodeId Closure (Process (Either String a))
c)
      case AsyncResult (Either String a)
a of
        AsyncDone (Right a
result) -> a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
result
        -- By throwing an exception out of withService, we ensure that
        -- the offending worker will be sent the ShutDown signal,
        -- since withService uses 'bracket'
        AsyncDone (Left String
err)     -> RemoteError -> Process a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM (RemoteError -> Process a) -> RemoteError -> Process a
forall a b. (a -> b) -> a -> b
$ ServiceId -> RemoteErrorType -> RemoteError
RemoteError ServiceId
serviceId (RemoteErrorType -> RemoteError) -> RemoteErrorType -> RemoteError
forall a b. (a -> b) -> a -> b
$ String -> RemoteErrorType
RemoteException String
err
        AsyncFailed DiedReason
reason       -> RemoteError -> Process a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM (RemoteError -> Process a) -> RemoteError -> Process a
forall a b. (a -> b) -> a -> b
$ ServiceId -> RemoteErrorType -> RemoteError
RemoteError ServiceId
serviceId (RemoteErrorType -> RemoteError) -> RemoteErrorType -> RemoteError
forall a b. (a -> b) -> a -> b
$ DiedReason -> RemoteErrorType
RemoteAsyncFailed DiedReason
reason
        AsyncLinkFailed DiedReason
reason   -> RemoteError -> Process a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM (RemoteError -> Process a) -> RemoteError -> Process a
forall a b. (a -> b) -> a -> b
$ ServiceId -> RemoteErrorType -> RemoteError
RemoteError ServiceId
serviceId (RemoteErrorType -> RemoteError) -> RemoteErrorType -> RemoteError
forall a b. (a -> b) -> a -> b
$ DiedReason -> RemoteErrorType
RemoteAsyncLinkFailed DiedReason
reason
        AsyncResult (Either String a)
AsyncCancelled           -> RemoteError -> Process a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM (RemoteError -> Process a) -> RemoteError -> Process a
forall a b. (a -> b) -> a -> b
$ ServiceId -> RemoteErrorType -> RemoteError
RemoteError ServiceId
serviceId (RemoteErrorType -> RemoteError) -> RemoteErrorType -> RemoteError
forall a b. (a -> b) -> a -> b
$ RemoteErrorType
RemoteAsyncCancelled
        AsyncResult (Either String a)
AsyncPending             -> RemoteError -> Process a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM (RemoteError -> Process a) -> RemoteError -> Process a
forall a b. (a -> b) -> a -> b
$ ServiceId -> RemoteErrorType -> RemoteError
RemoteError ServiceId
serviceId (RemoteErrorType -> RemoteError) -> RemoteErrorType -> RemoteError
forall a b. (a -> b) -> a -> b
$ RemoteErrorType
RemoteAsyncPending

-- | Catch any exception, log it, and return as a string. In this
-- way, errors will be logged by the worker where they occurred,
-- and also sent up the tree.
tryLogException :: Process b -> Process (Either String b)
tryLogException :: Process b -> Process (Either String b)
tryLogException Process b
go = Process b -> Process (Either SomeException b)
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try Process b
go Process (Either SomeException b)
-> (Either SomeException b -> Process (Either String b))
-> Process (Either String b)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
  Left (SomeException
e :: SomeException) -> do
    SomeException -> Process ()
forall a (m :: * -> *). (Show a, MonadIO m) => a -> m ()
Log.err SomeException
e
    Either String b -> Process (Either String b)
forall (m :: * -> *) a. Monad m => a -> m a
return (String -> Either String b
forall a b. a -> Either a b
Left (SomeException -> String
forall a. Show a => a -> String
show SomeException
e))
  Right b
b ->
    Either String b -> Process (Either String b)
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> Either String b
forall a b. b -> Either a b
Right b
b)

-- | Construct a SerializableClosureProcess for evaluating the closure
-- 'mb' on a remote machine. The action 'mb' that produces the
-- 'Closure' will only be run once -- when a worker first becomes
-- available. The MVar 'v' caches the resulting Closure (so it can be
-- re-used in the event of an error and retry), which will then be
-- sent across the network.
--
-- Note that we wrap the given closure in tryLogException. Thus,
-- exception handling is added by catching any exception @e@, logging
-- @e@ with 'Log.err' (on the worker), and returning a 'Left' result
-- with the textual representation of @e@ from 'Show' instance.
mkSerializableClosureProcess
  :: Typeable b
  => Closure (Dict (Serializable b))
  -> Process (Closure (Process b))
  -> Process (SerializableClosureProcess b)
mkSerializableClosureProcess :: Closure (Dict (Serializable b))
-> Process (Closure (Process b))
-> Process (SerializableClosureProcess b)
mkSerializableClosureProcess Closure (Dict (Serializable b))
bDict Process (Closure (Process b))
mb = do
  MVar (Closure (Process (Either String b)))
v <- IO (MVar (Closure (Process (Either String b))))
-> Process (MVar (Closure (Process (Either String b))))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (MVar (Closure (Process (Either String b))))
forall a. IO (MVar a)
newEmptyMVar
  SerializableClosureProcess b
-> Process (SerializableClosureProcess b)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SerializableClosureProcess b
 -> Process (SerializableClosureProcess b))
-> SerializableClosureProcess b
-> Process (SerializableClosureProcess b)
forall a b. (a -> b) -> a -> b
$ SerializableClosureProcess :: forall a.
Process (Closure (Process (Either String a)))
-> Closure (SerializableDict (Either String a))
-> MVar (Closure (Process (Either String a)))
-> SerializableClosureProcess a
SerializableClosureProcess
    { runClosureProcess :: Process (Closure (Process (Either String b)))
runClosureProcess = (Closure (Process b) -> Closure (Process (Either String b)))
-> Process (Closure (Process b))
-> Process (Closure (Process (Either String b)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (static Process b -> Process (Either String b)
forall b. Process b -> Process (Either String b)
tryLogException StaticPtr (Process b -> Process (Either String b))
-> Closure (Process b) -> Closure (Process (Either String b))
forall a b.
(Typeable a, Typeable b) =>
StaticPtr (a -> b) -> Closure a -> Closure b
`ptrAp`) Process (Closure (Process b))
mb
    , staticSDict :: Closure (SerializableDict (Either String b))
staticSDict       = static (\Dict (Serializable b)
Dict -> SerializableDict (Either String b)
forall a. Serializable a => SerializableDict a
SerializableDict) StaticPtr
  (Dict (Serializable b) -> SerializableDict (Either String b))
-> Closure (Dict (Serializable b))
-> Closure (SerializableDict (Either String b))
forall a b.
(Typeable a, Typeable b) =>
StaticPtr (a -> b) -> Closure a -> Closure b
`ptrAp` Closure (Dict (Serializable b))
bDict
    , closureVar :: MVar (Closure (Process (Either String b)))
closureVar        = MVar (Closure (Process (Either String b)))
v
    }