{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StaticPointers #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
module Hyperion.Job where
import qualified Control.Concurrent.Async as Async
import Control.Distributed.Process (NodeId, Process, spawnLocal)
import Control.Lens (lens)
import Control.Monad.Catch (throwM, try)
import Control.Monad.Except
import Control.Monad.Reader
import Control.Monad.Trans.Cont (ContT (..), evalContT)
import Data.Binary (Binary)
import qualified Data.Map as Map
import Data.Maybe (catMaybes)
import qualified Data.Text as T
import Data.Typeable (Typeable)
import Hyperion.Cluster
import Hyperion.Command (hyperionWorkerCommand)
import qualified Hyperion.Database as DB
import Hyperion.HasWorkers (HasWorkerLauncher (..),
remoteEvalM)
import qualified Hyperion.Log as Log
import Hyperion.Remote
import Hyperion.Slurm (JobId (..))
import qualified Hyperion.Slurm as Slurm
import Hyperion.Static (Closure, Static (..), cAp, cPtr,
cPure, ptrAp)
import Hyperion.Util (myExecutable)
import Hyperion.WorkerCpuPool (NumCPUs (..), SSHCommand,
SSHError, WorkerAddr)
import qualified Hyperion.WorkerCpuPool as WCP
import System.FilePath.Posix (dropExtension, (<.>), (</>))
import System.Process (callProcess)
-- | Environment available to every 'Job': database settings, CPU counts,
-- program information and a way to obtain worker launchers.
data JobEnv = JobEnv
  { jobDatabaseConfig :: DB.DatabaseConfig
    -- ^ Database configuration, obtained from the 'ProgramInfo'
    -- via @dbConfigFromProgramInfo@ by the job runners in this module.
  , jobNodeCpus       :: NumCPUs
    -- ^ CPUs available per node. 'runJobSlurm' sets this from
    -- @Slurm.getNTasksPerNode@; 'runJobLocal' sets it to 1.
  , jobTaskCpus       :: NumCPUs
    -- ^ CPUs requested for each task launched by this job (see
    -- 'setTaskCpus'). Both runners start with 1.
  , jobProgramInfo    :: ProgramInfo
    -- ^ Information about the running program.
  , jobTaskLauncher   :: NumCPUs -> WorkerLauncher JobId
    -- ^ Produces a worker launcher for tasks needing the given CPU count.
  }
-- | The 'ProgramInfo' of a job is the one stored in its 'JobEnv'.
instance HasProgramInfo JobEnv where
  toProgramInfo JobEnv{jobProgramInfo = info} = info
-- | Settings shared by the per-node worker launchers built in
-- 'withNodeLauncher'.
data NodeLauncherConfig = NodeLauncherConfig
  { nodeLogDir :: FilePath
    -- ^ Directory in which each worker's @<serviceId>.log@ file is written.
  , nodeSshCmd :: SSHCommand
    -- ^ SSH command handed to @WCP.sshRunCmd@ to reach remote nodes.
  }
-- | A lens onto the 'jobDatabaseConfig' field of a 'JobEnv'.
instance DB.HasDB JobEnv where
  dbConfigLens = lens jobDatabaseConfig setDbConfig
    where
      -- Replace only the database configuration, keeping every other field.
      setDbConfig env newConfig = env { jobDatabaseConfig = newConfig }
-- | The default launcher of a job is its task launcher specialised to the
-- job's current per-task CPU count.
instance HasWorkerLauncher JobEnv where
  toWorkerLauncher env = jobTaskLauncher env (jobTaskCpus env)
-- | A 'Process' computation with read-only access to a 'JobEnv'.
type Job = ReaderT JobEnv Process
-- | Return a copy of the environment whose tasks request the given number
-- of CPUs. All other fields are left untouched.
setTaskCpus :: NumCPUs -> JobEnv -> JobEnv
setTaskCpus cpus env = env { jobTaskCpus = cpus }
-- | Run a 'Job' using the nodes of the current Slurm allocation.
--
-- This reads the database configuration from the 'ProgramInfo'. It then
-- discovers the allocated nodes ('WCP.getSlurmAddrs') and the tasks-per-node
-- count ('Slurm.getNTasksPerNode'). After starting a launcher pool over
-- those nodes with 'withPoolLauncher', it runs the job with one CPU per
-- task.
runJobSlurm :: ProgramInfo -> Job a -> Process a
runJobSlurm programInfo go = do
  dbConfig     <- liftIO (dbConfigFromProgramInfo programInfo)
  nodes        <- liftIO WCP.getSlurmAddrs
  nodeCpus     <- liftIO Slurm.getNTasksPerNode
  maybeLogFile <- Log.getLogFile
  let -- Worker logs live in a directory named after this job's log file
      -- (extension dropped). Without a log file, they fall back to
      -- <programLogDir>/workers/workers.
      workerLogDir =
        maybe (programLogDir programInfo </> "workers" </> "workers")
              dropExtension
              maybeLogFile
      launcherConfig = NodeLauncherConfig
        { nodeLogDir = workerLogDir
        , nodeSshCmd = programSSHCommand programInfo
        }
      mkEnv poolLauncher = JobEnv
        { jobDatabaseConfig = dbConfig
        , jobNodeCpus       = NumCPUs nodeCpus
        , jobTaskCpus       = NumCPUs 1
        , jobProgramInfo    = programInfo
        , jobTaskLauncher   = poolLauncher
        }
  withPoolLauncher launcherConfig nodes (runReaderT go . mkEnv)
-- | Run a 'Job' entirely inside a local process ('runProcessLocal').
--
-- Every worker is spawned in this same process with 'spawnLocal',
-- whatever CPU count is requested (the task launcher ignores its argument).
-- Node and task CPU counts are both 1. The launcher has no connection
-- timeout and rethrows any 'RemoteError'.
runJobLocal :: ProgramInfo -> Job a -> IO a
runJobLocal programInfo go = runProcessLocal $ do
  dbConfig <- liftIO (dbConfigFromProgramInfo programInfo)
  let localLauncher :: WorkerLauncher JobId
      localLauncher = WorkerLauncher
        { withLaunchedWorker = \nid serviceId goJobId -> do
            _ <- spawnLocal (worker nid serviceId)
            goJobId (JobName (serviceIdToText serviceId))
        , connectionTimeout  = Nothing
        , onRemoteError      = \e _ -> throwM e
        }
      env = JobEnv
        { jobDatabaseConfig = dbConfig
        , jobNodeCpus       = NumCPUs 1
        , jobTaskCpus       = NumCPUs 1
        , jobTaskLauncher   = const localLauncher
        , jobProgramInfo    = programInfo
        }
  runReaderT go env
-- | Build a 'WorkerLauncher' that starts each worker by handing a
-- 'hyperionWorkerCommand' (for this executable) to the supplied runner.
--
-- Each worker logs to @logDir/<serviceId>.log@. It is reported to the
-- continuation as a 'JobName' derived from its 'ServiceId'. The launcher
-- has no connection timeout and rethrows any 'RemoteError'.
workerLauncherWithRunCmd
  :: MonadIO m
  => FilePath
  -> ((String, [String]) -> Process ())
  -> m (WorkerLauncher JobId)
workerLauncherWithRunCmd logDir runCmd = liftIO $ do
  hyperionExec <- myExecutable
  let launch :: NodeId -> ServiceId -> (JobId -> Process b) -> Process b
      launch nid serviceId goJobId = do
        let serviceText = serviceIdToText serviceId
            logFile     = logDir </> T.unpack serviceText <.> "log"
        runCmd (hyperionWorkerCommand hyperionExec nid serviceId logFile)
        goJobId (JobName serviceText)
  pure WorkerLauncher
    { withLaunchedWorker = launch
    , connectionTimeout  = Nothing
    , onRemoteError      = \e _ -> throwM e
    }
-- | Build a 'WorkerLauncher' for a single node and pass it to the
-- continuation.
--
-- * For a 'WCP.RemoteAddr', a helper process is first started on the node
--   over SSH (via 'withRemoteRunProcess'). Worker commands are then shipped
--   to that process as closures and executed there with 'runCmdLocalLog'.
--   If this SSH-based setup fails with an 'SSHError', a warning is logged
--   and the continuation is called with 'Nothing'.
-- * For 'WCP.LocalHost', worker commands run directly on this machine via
--   'runCmdLocalAsync'.
--
-- Only 'SSHError's raised while /setting up/ the launcher trigger the
-- 'Nothing' fallback. An 'SSHError' that escapes the continuation itself is
-- rethrown, so the continuation (which, under 'withPoolLauncher', is the
-- rest of the pool setup plus the whole job) is never run a second time.
withNodeLauncher
  :: NodeLauncherConfig
  -> WorkerAddr
  -> (Maybe (WorkerAddr, WorkerLauncher JobId) -> Process a)
  -> Process a
withNodeLauncher NodeLauncherConfig{..} addr' go = case addr' of
  WCP.RemoteAddr addr -> do
    sshLauncher <-
      workerLauncherWithRunCmd nodeLogDir (liftIO . WCP.sshRunCmd addr nodeSshCmd)
    eitherResult <- try @Process @SSHError $
      withRemoteRunProcess sshLauncher $ \remoteRunNode -> do
        let runCmdOnNode :: (String, [String]) -> Process ()
            runCmdOnNode cmd = do
              scp <- mkSerializableClosureProcess closureDict $ pure $
                static (liftIO . runCmdLocalLog) `ptrAp` cPure cmd
              remoteRunNode scp
        launcher <- workerLauncherWithRunCmd nodeLogDir runCmdOnNode
        -- Capture SSH errors from the continuation separately so they are
        -- not mistaken for a failure to start this node's launcher.
        try @Process @SSHError (go (Just (addr', launcher)))
    case eitherResult of
      Right (Right result) -> return result
      Right (Left goErr)   -> throwM goErr
      Left err -> do
        Log.warn "Couldn't start launcher" err
        go Nothing
  WCP.LocalHost _ -> do
    launcher <- workerLauncherWithRunCmd nodeLogDir (liftIO . runCmdLocalAsync)
    go (Just (addr', launcher))
-- | Run @(executable, arguments)@ with 'callProcess' in a background
-- thread. The thread is linked to the caller with 'Async.link', so an
-- exception from the command is re-thrown in the calling thread.
runCmdLocalAsync :: (String, [String]) -> IO ()
runCmdLocalAsync cmd = do
  thread <- Async.async (uncurry callProcess cmd)
  Async.link thread
-- | Log the command at info level, then start it with 'runCmdLocalAsync'.
runCmdLocalLog :: (String, [String]) -> IO ()
runCmdLocalLog cmd = Log.info "Running command" cmd >> runCmdLocalAsync cmd
-- | Start a launcher on every address (via nested 'withNodeLauncher's),
-- build a CPU pool over the nodes whose launcher started, and hand the
-- continuation a function producing a pooled 'WorkerLauncher'.
--
-- The pooled launcher for @n@ CPUs reserves @n@ CPUs on some node with
-- 'WCP.withWorkerAddr', then delegates to that node's launcher. It has no
-- connection timeout and rethrows any 'RemoteError'.
withPoolLauncher
  :: NodeLauncherConfig
  -> [WorkerAddr]
  -> ((NumCPUs -> WorkerLauncher JobId) -> Process a)
  -> Process a
withPoolLauncher cfg addrs' go = evalContT $ do
  -- Nodes whose launcher failed to start come back as 'Nothing' and are
  -- dropped from the map.
  launcherMap <- Map.fromList . catMaybes
    <$> traverse (ContT . withNodeLauncher cfg) addrs'
  let liveAddrs = Map.keys launcherMap
  cpuPool <- liftIO (WCP.newJobPool liveAddrs)
  Log.info "Started worker launchers at" liveAddrs
  let poolLauncher nCpus = WorkerLauncher
        { withLaunchedWorker = \nodeId serviceId goJobId ->
            WCP.withWorkerAddr cpuPool nCpus $ \addr ->
              -- NOTE(review): assumes withWorkerAddr only yields addresses
              -- given to newJobPool (the keys of launcherMap), so this
              -- partial lookup should not fail; confirm in WorkerCpuPool.
              withLaunchedWorker (launcherMap Map.! addr) nodeId serviceId goJobId
        , connectionTimeout  = Nothing
        , onRemoteError      = \e _ -> throwM e
        }
  lift (go poolLauncher)
-- | Evaluate a 'Job' on a remote worker.
--
-- The closure produced by the given 'Cluster' action is wrapped so that,
-- remotely, it is run with 'runJobSlurm' and the current
-- 'clusterProgramInfo'. The resulting process closure is evaluated with
-- 'remoteEvalM'.
remoteEvalJobM
  :: (Static (Binary b), Typeable b)
  => Cluster (Closure (Job b))
  -> Cluster b
remoteEvalJobM mc = do
  programInfo <- asks clusterProgramInfo
  let wrapJob jobClosure =
        cPtr (static runJobSlurm) `cAp` cPure programInfo `cAp` jobClosure
  remoteEvalM (fmap wrapJob mc)
-- | Evaluate an already-built 'Job' closure on a remote worker
-- (see 'remoteEvalJobM').
remoteEvalJob
  :: (Static (Binary b), Typeable b)
  => Closure (Job b)
  -> Cluster b
remoteEvalJob job = remoteEvalJobM (pure job)