{-# LANGUAGE DeriveAnyClass             #-}
{-# LANGUAGE DerivingStrategies         #-}
{-# LANGUAGE FlexibleContexts           #-}
{-# LANGUAGE FlexibleInstances          #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase                 #-}
{-# LANGUAGE MultiParamTypeClasses      #-}
{-# LANGUAGE OverloadedStrings          #-}
{-# LANGUAGE RankNTypes                 #-}
{-# LANGUAGE RecordWildCards            #-}
{-# LANGUAGE ScopedTypeVariables        #-}
{-# LANGUAGE StaticPointers             #-}
{-# LANGUAGE TypeApplications           #-}
{-# LANGUAGE TypeFamilies               #-}

module Hyperion.Job where

import qualified Control.Concurrent.Async    as Async
import           Control.Distributed.Process (NodeId, Process, spawnLocal)
import           Control.Lens                (lens)
import           Control.Monad.Catch         (throwM, try)
import           Control.Monad.Except
import           Control.Monad.Reader
import           Control.Monad.Trans.Cont    (ContT (..), evalContT)
import           Data.Binary                 (Binary)
import           Data.IORef                  (newIORef, readIORef, writeIORef)
import qualified Data.Map                    as Map
import           Data.Maybe                  (catMaybes)
import qualified Data.Text                   as T
import           Data.Typeable               (Typeable)
import           Hyperion.Cluster
import           Hyperion.Command            (hyperionWorkerCommand)
import qualified Hyperion.Database           as DB
import           Hyperion.HasWorkers         (HasWorkerLauncher (..),
                                              remoteEvalM)
import qualified Hyperion.Log                as Log
import           Hyperion.Remote
import           Hyperion.Slurm              (JobId (..))
import qualified Hyperion.Slurm              as Slurm
import           Hyperion.Static             (Closure, Static (..), cAp, cPtr,
                                              cPure, ptrAp)
import           Hyperion.Util               (myExecutable)
import           Hyperion.WorkerCpuPool      (NumCPUs (..), SSHCommand,
                                              SSHError, WorkerAddr)
import qualified Hyperion.WorkerCpuPool      as WCP
import           System.FilePath.Posix       (dropExtension, (<.>), (</>))
import           System.Process              (callProcess)

-- * General comments
-- $
-- In this module we define the 'Job' monad. It is nothing more than 'Process'
-- together with a 'JobEnv' environment.
--
-- The 'JobEnv' environment represents the environment of a job running under
-- @SLURM@. We should think about a computation in 'Job' as being run on a
-- node allocated for the job by @SLURM@ and running remote computations on the
-- resources allocated to the job. The 'JobEnv' environment
-- contains
--
--     * information about the master program that scheduled the job,
--     * information about the database used for recording results of the calculations,
--     * number of CPUs available per node, as well as the number of CPUs to
--       use for remote computations spawned from the 'Job' computation ('jobTaskCpus'),
--     * 'jobTaskLauncher', which allocates 'jobTaskCpus' CPUs on some node from
--       the resources available to the job and launches a worker on that node.
--       That worker is then allowed to use the allocated number of CPUs.
--       Thanks to 'jobTaskLauncher', 'Job' is an instance of 'Hyperion.Remote.HasWorkers' and
--       we can use functions such as 'Hyperion.Remote.remoteEval'.
--
-- The common use case is that the 'Job' computation is spawned from a 'Cluster'
-- calculation on the login node via, e.g., 'remoteEvalJob' (which acquires job
-- resources from @SLURM@). The 'Job' computation then manages the job resources
-- and runs remote computations in the allocation via, e.g., 'Hyperion.Remote.remoteEval'.

-- * Documentation
-- $

-- | Read-only environment of the 'Job' monad: the resources and
-- configuration available to a computation running inside a job.
data JobEnv = JobEnv
  { jobDatabaseConfig :: DB.DatabaseConfig
    -- ^ 'DB.DatabaseConfig' of the database the job should use
  , jobNodeCpus       :: NumCPUs
    -- ^ How many CPUs each node of the job provides
  , jobTaskCpus       :: NumCPUs
    -- ^ How many CPUs to use when running remote functions
  , jobProgramInfo    :: ProgramInfo
    -- ^ The 'ProgramInfo' handed down from the master program
  , jobTaskLauncher   :: NumCPUs -> WorkerLauncher JobId
    -- ^ Produces a 'WorkerLauncher' whose workers are allocated the given
    -- number of CPUs
  }

-- | A 'JobEnv' exposes the 'ProgramInfo' it inherited from the master,
-- stored in 'jobProgramInfo'.
instance HasProgramInfo JobEnv where
  toProgramInfo env = jobProgramInfo env

-- | Settings consumed by 'withNodeLauncher'.
data NodeLauncherConfig = NodeLauncherConfig
  { nodeLogDir :: FilePath
    -- ^ Directory in which the launched workers write their logs.
  , nodeSshCmd :: SSHCommand
    -- ^ How to invoke @ssh@; see 'SSHCommand' for details.
  }

-- | Make 'JobEnv' an instance of 'DB.HasDB'. The lens reads and
-- replaces the 'jobDatabaseConfig' field.
instance DB.HasDB JobEnv where
  dbConfigLens = lens jobDatabaseConfig setDbConfig
    where
      setDbConfig env newConfig = env { jobDatabaseConfig = newConfig }

-- | Make 'JobEnv' an instance of 'HasWorkerLauncher'. The launcher is
-- obtained by applying 'jobTaskLauncher' to 'jobTaskCpus', so every
-- worker it starts gets 'jobTaskCpus' CPUs.
--
-- Thanks to this instance 'Job' is an instance of 'HasWorkers', which
-- gives access to the functions in "Hyperion.Remote".
instance HasWorkerLauncher JobEnv where
  toWorkerLauncher env = jobTaskLauncher env (jobTaskCpus env)

-- | The 'Job' monad: 'Process' extended with a read-only 'JobEnv'
-- environment, provided through 'ReaderT'.
type Job = ReaderT JobEnv Process

-- | Changes 'jobTaskCpus' in a 'JobEnv', i.e. the number of CPUs that
-- 'toWorkerLauncher' requests for each worker it launches.
setTaskCpus :: NumCPUs -> JobEnv -> JobEnv
setTaskCpus newTaskCpus env = env { jobTaskCpus = newTaskCpus }

-- | Runs the 'Job' monad, assuming we are inside a SLURM job. It builds
-- a 'JobEnv' and hands it to 'runReaderT'. Most of the environment comes
-- from @SLURM@ environment variables and the given 'ProgramInfo'. The
-- exceptions are 'jobTaskCpus', which is fixed to @'NumCPUs' 1@, and
-- 'jobTaskLauncher', which is supplied by 'withPoolLauncher'.
--
-- Worker logs go into a directory derived from this node's log file by
-- dropping its extension: for a log file
-- \"\/a\/b\/c\/progid\/serviceid.log\" the directory is
-- \"\/a\/b\/c\/progid\/serviceid\".
runJobSlurm :: ProgramInfo -> Job a -> Process a
runJobSlurm programInfo job = do
  dbConfig     <- liftIO (dbConfigFromProgramInfo programInfo)
  nodes        <- liftIO WCP.getSlurmAddrs
  nodeCpus     <- liftIO Slurm.getNTasksPerNode
  maybeLogFile <- Log.getLogFile
  let
    workerLogDir = maybe fallbackLogDir dropExtension maybeLogFile
    -- Fallback case for when Log.currentLogFile has not been
    -- set. This should never happen.
    fallbackLogDir = programLogDir programInfo </> "workers" </> "workers"
    launcherConfig = NodeLauncherConfig
      { nodeLogDir = workerLogDir
      , nodeSshCmd = programSSHCommand programInfo
      }
    mkEnv poolLauncher = JobEnv
      { jobDatabaseConfig = dbConfig
      , jobNodeCpus       = NumCPUs nodeCpus
      , jobTaskCpus       = NumCPUs 1
      , jobTaskLauncher   = poolLauncher
      , jobProgramInfo    = programInfo
      }
  withPoolLauncher launcherConfig nodes (runReaderT job . mkEnv)

-- | Runs a 'Job' in 'IO' on the local machine, without reading anything
-- from a SLURM environment. Both 'jobNodeCpus' and 'jobTaskCpus' are
-- @'NumCPUs' 1@. Every worker the job requests is started with
-- 'spawnLocal', whatever number of CPUs was asked for. Intended mainly
-- for testing.
runJobLocal :: ProgramInfo -> Job a -> IO a
runJobLocal programInfo job = runProcessLocal $ do
  dbConfig <- liftIO (dbConfigFromProgramInfo programInfo)
  let
    launchLocal :: forall b . NodeId -> ServiceId -> (JobId -> Process b) -> Process b
    launchLocal nid serviceId withJobId = do
      _ <- spawnLocal (worker nid serviceId)
      withJobId (JobName (serviceIdToText serviceId))
    localLauncher = WorkerLauncher
      { withLaunchedWorker = launchLocal
      , connectionTimeout  = Nothing
      , onRemoteError      = \err _ -> throwM err
      }
    env = JobEnv
      { jobDatabaseConfig = dbConfig
      , jobNodeCpus       = NumCPUs 1
      , jobTaskCpus       = NumCPUs 1
      , jobTaskLauncher   = const localLauncher
      , jobProgramInfo    = programInfo
      }
  runReaderT job env

-- | Builds a 'WorkerLauncher' that starts each worker by passing the
-- hyperion worker command to the supplied command runner. The resulting
-- launcher has 'connectionTimeout' set to 'Nothing' and rethrows remote
-- errors. The 'ServiceId' given to 'withLaunchedWorker' determines both
-- the 'JobId' (through 'JobName') and the worker's log file, which is
-- @\<serviceid\>.log@ inside the supplied log directory.
workerLauncherWithRunCmd
  :: MonadIO m
  => FilePath
  -> ((String, [String]) -> Process ())
  -> m (WorkerLauncher JobId)
workerLauncherWithRunCmd logDir runCmd = liftIO $ do
  hyperionExec <- myExecutable
  let
    launchWithCmd :: forall b . NodeId -> ServiceId -> (JobId -> Process b) -> Process b
    launchWithCmd nid serviceId withJobId = do
      let serviceName = serviceIdToText serviceId
          logFile     = logDir </> T.unpack serviceName <.> "log"
      runCmd (hyperionWorkerCommand hyperionExec nid serviceId logFile)
      withJobId (JobName serviceName)
  pure WorkerLauncher
    { withLaunchedWorker = launchWithCmd
    , connectionTimeout  = Nothing
    , onRemoteError      = \err _ -> throwM err
    }

-- | Given a 'NodeLauncherConfig' and a 'WorkerAddr', runs the
-- continuation, passing it either @'Just' ('WorkerAddr', 'WorkerLauncher'
-- 'JobId')@ or 'Nothing'. 'Nothing' represents an @ssh@ failure while
-- starting the launcher for that address.
--
-- The 'WorkerAddr' is passed through unchanged, and the accompanying
-- 'WorkerLauncher' launches workers on the node at that address. The
-- launcher is built with 'workerLauncherWithRunCmd'. Its command runner
-- is either the local shell (if 'WorkerAddr' is 'LocalHost'), or a
-- remote closure that runs the local shell on 'WorkerAddr' via
-- 'withRemoteRunProcess' and related functions (if 'WorkerAddr' is
-- 'RemoteAddr').
--
-- Launching workers on a remote node actually spawns a \"utility\"
-- worker there, which then launches all new workers in the
-- continuation. This utility worker logs to the log dir under some
-- random 'ServiceId' and writes messages like \"Running command ...\".
--
-- Utility workers are used on each Job node to minimize the number of
-- calls to @ssh@ or @srun@. The naive way to launch workers in the 'Job'
-- monad would be to pick a node for each one and run the hyperion worker
-- command there via @ssh@. Unfortunately, many clusters have flaky
-- @ssh@ configurations that start throwing errors if @ssh@ is called too
-- many times in quick succession. @ssh@ also has to perform
-- authentication. Experience shows that @srun@ is not a good solution
-- either: it talks to @SLURM@ to manage resources, which can take a long
-- time and hurts performance. Instead, we @ssh@ exactly once to each
-- node in the Job (besides the head node) and start utility workers
-- there. These workers then communicate with the head node via the usual
-- machinery of @hyperion@ --- effectively, we keep a connection open to
-- each node so that we no longer have to use @ssh@.
--
-- Only an 'SSHError' raised before the continuation is entered counts as
-- a failure to start the launcher. One raised from inside the
-- continuation is rethrown, so the continuation never runs twice.
withNodeLauncher
  :: NodeLauncherConfig
  -> WorkerAddr
  -> (Maybe (WorkerAddr, WorkerLauncher JobId) -> Process a)
  -> Process a
withNodeLauncher NodeLauncherConfig{..} addr' go = case addr' of
  WCP.RemoteAddr addr -> do
    sshLauncher <- workerLauncherWithRunCmd nodeLogDir (liftIO . WCP.sshRunCmd addr nodeSshCmd)
    -- Set to True just before the continuation is entered, so that an
    -- 'SSHError' thrown by the continuation itself is not mistaken for a
    -- failure to start the utility worker.
    enteredGo <- liftIO (newIORef False)
    eitherResult <- try @Process @SSHError $
      withRemoteRunProcess sshLauncher $ \remoteRunNode -> do
        let runCmdOnNode cmd = do
              scp <- mkSerializableClosureProcess closureDict $ pure $
                static (liftIO . runCmdLocalLog) `ptrAp` cPure cmd
              remoteRunNode scp
        launcher <- workerLauncherWithRunCmd nodeLogDir runCmdOnNode
        liftIO (writeIORef enteredGo True)
        go (Just (addr', launcher))
    case eitherResult of
      Right result -> return result
      Left err -> do
        entered <- liftIO (readIORef enteredGo)
        if entered
          -- The error escaped from the continuation: propagate it rather
          -- than running the continuation a second time with 'Nothing'.
          then throwM err
          else do
            Log.warn "Couldn't start launcher" err
            go Nothing
  WCP.LocalHost _ -> do
    launcher <- workerLauncherWithRunCmd nodeLogDir (liftIO . runCmdLocalAsync)
    go (Just (addr', launcher))

-- | Runs the given command (program name and arguments) in a child
-- thread created with 'Async.async'. Linking the child with 'Async.link'
-- makes any exception it throws be re-raised in the calling thread.
--
-- NB: An earlier version used 'System.Process.createProcess' and threw
-- away the resulting 'ProcessHandle'. That could lead to \"insufficient
-- resource\" errors for OS threads; this implementation is meant to
-- avoid that problem.
runCmdLocalAsync :: (String, [String]) -> IO ()
runCmdLocalAsync cmd = do
  child <- Async.async (uncurry callProcess cmd)
  Async.link child

-- | Logs the command with 'Log.info' and then starts it with
-- 'runCmdLocalAsync'. Meant for remote machines, so the logs record
-- which commands ran where.
runCmdLocalLog :: (String, [String]) -> IO ()
runCmdLocalLog cmd =
  Log.info "Running command" cmd >> runCmdLocalAsync cmd

-- | Takes a 'NodeLauncherConfig' and a list of addresses, and tries to
-- start \"worker-launcher\" workers on these addresses (see
-- 'withNodeLauncher'). Addresses on which this fails are discarded, and
-- a worker CPU pool is built from the remaining ones. The continuation
-- is then passed a function that launches workers in this pool with the
-- requested number of CPUs. The 'WorkerLauncher's the continuation gets
-- have 'connectionTimeout' set to 'Nothing' and rethrow remote errors.
withPoolLauncher
  :: NodeLauncherConfig
  -> [WorkerAddr]
  -> ((NumCPUs -> WorkerLauncher JobId) -> Process a)
  -> Process a
withPoolLauncher cfg addrs' go = evalContT $ do
  mLaunchers <- mapM (ContT . withNodeLauncher cfg) addrs'
  let launcherMap = Map.fromList (catMaybes mLaunchers)
      addrs = Map.keys launcherMap
      -- The pool is built from the keys of launcherMap, so this lookup is
      -- not expected to fail. If it ever does, report which address was
      -- missing instead of the generic Map.! error.
      launcherFor addr = case Map.lookup addr launcherMap of
        Just launcher -> launcher
        Nothing ->
          error $ "withPoolLauncher: no worker launcher for address " ++ show addr
  workerCpuPool <- liftIO (WCP.newJobPool addrs)
  Log.info "Started worker launchers at" addrs
  lift $ go $ \nCpus -> WorkerLauncher
    { withLaunchedWorker = \nodeId serviceId goJobId ->
        WCP.withWorkerAddr workerCpuPool nCpus $ \addr ->
          withLaunchedWorker (launcherFor addr) nodeId serviceId goJobId
    , connectionTimeout = Nothing
    , onRemoteError     = \e _ -> throwM e
    }

-- | Evaluates a 'Job' from a 'Cluster' computation. The 'Closure'
-- produced by the given 'Cluster' action is wrapped with 'runJobSlurm',
-- using this cluster's 'ProgramInfo'. The wrapped closure is then
-- evaluated remotely with 'remoteEvalM'.
remoteEvalJobM
  :: (Static (Binary b), Typeable b)
  => Cluster (Closure (Job b))
  -> Cluster b
remoteEvalJobM mc = do
  programInfo <- asks clusterProgramInfo
  let wrap jobClosure =
        cPtr (static runJobSlurm) `cAp` cPure programInfo `cAp` jobClosure
  remoteEvalM (fmap wrap mc)

-- | Like 'remoteEvalJobM', but takes the 'Job' closure directly
-- instead of a 'Cluster' action that produces it.
remoteEvalJob
  :: (Static (Binary b), Typeable b)
  => Closure (Job b)
  -> Cluster b
remoteEvalJob jobClosure = remoteEvalJobM (pure jobClosure)