{-# LANGUAGE DeriveAnyClass        #-}
{-# LANGUAGE DeriveGeneric         #-}
{-# LANGUAGE DerivingStrategies    #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings     #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE RecordWildCards       #-}
{-# LANGUAGE StaticPointers        #-}
{-# LANGUAGE TypeApplications      #-}
{-# LANGUAGE TypeFamilies          #-}

module Hyperion.Cluster where

import           Control.Distributed.Process      (NodeId, Process)
import           Control.Distributed.Process.Node (initRemoteTable)
import           Control.Lens                     (lens)
import           Control.Monad.Catch              (try, MonadCatch)
import           Control.Monad.IO.Class           (MonadIO, liftIO)
import           Control.Monad.Reader             (ReaderT, asks, runReaderT, MonadReader)
import           Data.Aeson                       (FromJSON, ToJSON)
import           Data.Binary                      (Binary)
import           Data.Constraint                  (Dict (..))
import           Data.Text                        (Text)
import qualified Data.Text                        as Text
import           Data.Time.Clock                  (NominalDiffTime)
import           Data.Typeable                    (Typeable)
import           GHC.Generics                     (Generic)
import           Hyperion.Command                 (hyperionWorkerCommand)
import qualified Hyperion.Database                as DB
import           Hyperion.HasWorkers              (HasWorkerLauncher (..))
import           Hyperion.HoldServer              (HoldMap, blockUntilRetried)
import           Hyperion.LockMap                 (LockMap, registerLockMap)
import qualified Hyperion.Log                     as Log
import           Hyperion.ObjectId                (getObjectId,
                                                   objectIdToString)
import           Hyperion.ProgramId               (ProgramId, programIdToText)
import           Hyperion.Remote                  (RemoteError (..), ServiceId,
                                                   WorkerLauncher (..),
                                                   registerMasterNodeId,
                                                   runProcessLocalWithRT,
                                                   serviceIdToString,
                                                   serviceIdToText)
import           Hyperion.Slurm                   (JobId (..), SbatchError,
                                                   SbatchOptions (..),
                                                   sbatchCommand)
import           Hyperion.TokenPool               (TokenPool, withToken)
import           Hyperion.Static                  (Static (..), cPtr)
import           Hyperion.Util                    (emailError, retryExponential)
import           Hyperion.WorkerCpuPool           (SSHCommand)
import           System.Directory                 (createDirectoryIfMissing)
import           System.FilePath.Posix            ((<.>), (</>))

-- * General comments
-- $
-- In this module we define the 'Cluster' monad. It is nothing more than a
-- 'Process' with an environment 'ClusterEnv'.
--
-- The 'ClusterEnv' environment contains information about
--
--     * the 'ProgramId' of the current run,
--     * the paths to database and log/data directories that we should use,
--     * options to use when using @sbatch@ to spawn cluster jobs,
--     * data equivalent to 'DB.DatabaseConfig' to handle the database,
--     * a 'WorkerLauncher' to launch remote workers. More precisely, a function
--       'clusterWorkerLauncher' that takes 'SbatchOptions' and 'ProgramInfo' to
--       produce a 'WorkerLauncher'.
--
-- A 'ClusterEnv' may be initialized with 'Hyperion.Config.newClusterEnv', which
-- uses 'slurmWorkerLauncher' to initialize 'clusterWorkerLauncher'. In this
-- scenario the 'Cluster' monad will operate in the following way. It will perform
-- the calculations in the master process until some remote function is invoked,
-- typically through 'Hyperion.HasWorkers.remoteEval', at which point it will
-- use @sbatch@ and the current 'SbatchOptions' to allocate a new job and then
-- it will run a single worker in that allocation.
--
-- This has the following consequences.
--
--     * Each time 'Cluster' runs a remote function, it will schedule
--       a new job with @SLURM@. If you run a lot of small remote
--       functions (e.g., using "Hyperion.Concurrently") in 'Cluster'
--       monad, it means that you will schedule a lot of small jobs
--       with @SLURM@. If your cluster's scheduling prioritizes small
--       jobs, this may be a fine mode of operation (for example, this
--       was the case on the now-defunct @Hyperion@ cluster at IAS).
--       More likely though, it will lead to your jobs pending and the
--       computation running slowly, especially if the remote
--       functions are not run at the same time, but new ones are run
--       when old ones finish (for example, if you try to perform a
--       lot of parallel binary searches). For such cases
--       'Hyperion.Job.Job' monad should be used.
--     * One should use 'Hyperion.Slurm.Sbatch.nodes' greater than 1
--       if either: (1) The job runs an external program that uses MPI
--       or something similar and therefore can access all of the
--       resources allocated by @SLURM@, or (2) the remote function
--       spawns new @hyperion@ workers using the 'Job' monad.  If your
--       remote function does spawn new workers, then it may make
--       sense to use 'Hyperion.Slurm.Sbatch.nodes' greater than 1,
--       but your remote function needs to take into account the fact
--       that the nodes are already allocated. For example, from the
--       'Cluster' monad, we can run a remote computation in the
--       'Job', allocating it more than 1 node. The 'Job' computation
--       will automagically detect the nodes available to it, the
--       number of CPUs on each node, and will create a
--       'WorkerCpuPool' that will manage these resources
--       independently of @SLURM@. One can then run remote functions
--       on these resources from the 'Job' computation without having
--       to wait for @SLURM@ scheduling. See "Hyperion.Job" for
--       details.
--
-- The common use case is that a 'Cluster' computation is run on the login node.
-- It then schedules a job with a bunch of resources with @SLURM@. When the job
-- starts, a 'Job' calculation runs on one of the allocated nodes. It then spawns
-- 'Process' computations on the resources available to the job, which it manages
-- via 'Hyperion.WorkerCpuPool.WorkerCpuPool'.
--
-- Besides the 'Cluster' monad, this module defines 'slurmWorkerLauncher' and
-- some utility functions for working with 'ClusterEnv' and 'ProgramInfo', along
-- with a few others.


-- * Documentation
-- $

-- | Type containing information about our program: the identifier of
-- the current run and the locations of the files it uses.
data ProgramInfo = ProgramInfo
  { programId         :: ProgramId
    -- ^ The 'ProgramId' of the current run.
  , programDatabase   :: FilePath
    -- ^ Path to the database; 'dbConfigFromProgramInfo' passes it to
    -- 'DB.newDefaultPool'.
  , programLogDir     :: FilePath
    -- ^ Directory for log files; 'slurmWorkerLauncher' places one
    -- @\<serviceId\>.log@ file per worker here.
  , programDataDir    :: FilePath
    -- ^ Directory for data; 'newWorkDir' creates its working
    -- directories underneath it.
  , programSSHCommand :: SSHCommand
    -- ^ SSH command configuration. Not consumed in this module;
    -- presumably used by "Hyperion.WorkerCpuPool" -- TODO confirm.
  } deriving (Eq, Ord, Show, Generic, Binary, FromJSON, ToJSON)

-- | Static evidence of the 'Binary' instance for 'ProgramInfo': a
-- static pointer to the instance dictionary ('Dict'), wrapped into a
-- closure with 'cPtr'.
instance Static (Binary ProgramInfo) where
  closureDict = cPtr (static Dict)

-- | The environment for 'Cluster' monad.
data ClusterEnv = ClusterEnv
  { clusterWorkerLauncher  :: SbatchOptions -> ProgramInfo -> WorkerLauncher JobId
    -- ^ Builds the 'WorkerLauncher' used to launch remote workers from
    -- the current job options and program info.
  , clusterProgramInfo     :: ProgramInfo
    -- ^ Information about the current run.
  , clusterJobOptions      :: SbatchOptions
    -- ^ Options to use when spawning cluster jobs with @sbatch@.
  , clusterDatabasePool    :: DB.Pool
    -- ^ Database pool; becomes 'DB.dbPool' of the 'DB.DatabaseConfig'.
  , clusterDatabaseRetries :: Int
    -- ^ Retry count; becomes 'DB.dbRetries' of the 'DB.DatabaseConfig'.
  , clusterLockMap         :: LockMap
    -- ^ Lock map that 'runCluster' registers in the remote table.
  }

-- | Environments from which a 'ProgramInfo' can be extracted.
class HasProgramInfo a where
  -- | Extract the 'ProgramInfo' from the environment.
  toProgramInfo :: a -> ProgramInfo

-- | The 'ProgramInfo' of a 'ClusterEnv' is its 'clusterProgramInfo' field.
instance HasProgramInfo ClusterEnv where
  toProgramInfo = clusterProgramInfo

-- | The 'Cluster' monad. It is simply 'Process' with a 'ClusterEnv'
-- environment, i.e. a 'ReaderT' over 'Process'. Use 'runCluster' to
-- run it in 'IO'.
type Cluster = ReaderT ClusterEnv Process

-- | 'ClusterEnv' is an instance of 'HasDB' since it contains info that is
-- sufficient to build a 'DB.DatabaseConfig'.
--
-- The lens reads the pool, program id and retry count out of the
-- environment; writing a 'DB.DatabaseConfig' back updates those three
-- pieces and leaves every other field of the environment untouched.
instance DB.HasDB ClusterEnv where
  dbConfigLens = lens getConfig setConfig
    where
      -- Assemble a DatabaseConfig from the relevant environment fields.
      getConfig env = DB.DatabaseConfig
        { dbPool      = clusterDatabasePool env
        , dbProgramId = programId (clusterProgramInfo env)
        , dbRetries   = clusterDatabaseRetries env
        }
      -- Store a DatabaseConfig back; the program id lives inside the
      -- nested ProgramInfo, so only that field of it is replaced.
      setConfig env DB.DatabaseConfig {..} = env
        { clusterDatabasePool    = dbPool
        , clusterProgramInfo     = (clusterProgramInfo env) { programId = dbProgramId }
        , clusterDatabaseRetries = dbRetries
        }

-- | We make 'ClusterEnv' an instance of 'HasWorkerLauncher'. This makes
-- 'Cluster' an instance of 'HasWorkers' and gives us access to functions in
-- "Hyperion.Remote".
--
-- The launcher is obtained by applying 'clusterWorkerLauncher' to the
-- environment's current 'clusterJobOptions' and 'clusterProgramInfo'.
instance HasWorkerLauncher ClusterEnv where
  toWorkerLauncher env =
    clusterWorkerLauncher env (clusterJobOptions env) (clusterProgramInfo env)

-- | Type representing resources for an MPI job.
data MPIJob = MPIJob
  { mpiNodes         :: Int
    -- ^ Number of nodes; 'setJobType' copies it into the @nodes@
    -- sbatch option.
  , mpiNTasksPerNode :: Int
    -- ^ Number of tasks per node; 'setJobType' copies it into the
    -- @nTasksPerNode@ sbatch option.
  } deriving (Eq, Ord, Show, Generic, Binary, FromJSON, ToJSON, Typeable)

-- | Run a 'Cluster' computation in 'IO'.
--
-- The computation receives the given 'ClusterEnv' as its reader
-- environment and is executed with 'runProcessLocalWithRT'. The remote
-- table starts from 'initRemoteTable', has the environment's
-- 'clusterLockMap' registered in it, and then has the master node id
-- registered as 'Nothing'.
runCluster :: ClusterEnv -> Cluster a -> IO a
runCluster clusterEnv h = runProcessLocalWithRT rtable (runReaderT h clusterEnv)
  where
    rtable = registerMasterNodeId Nothing
           $ registerLockMap (clusterLockMap clusterEnv) initRemoteTable

-- | Apply a function to the 'clusterJobOptions' of a 'ClusterEnv',
-- leaving all other fields unchanged.
modifyJobOptions :: (SbatchOptions -> SbatchOptions) -> ClusterEnv -> ClusterEnv
modifyJobOptions f cfg = cfg { clusterJobOptions = f (clusterJobOptions cfg) }

-- | Replace the 'clusterJobOptions' of a 'ClusterEnv' wholesale.
setJobOptions :: SbatchOptions -> ClusterEnv -> ClusterEnv
setJobOptions c = modifyJobOptions (const c)

-- | Set the @time@ field of the job options.
setJobTime :: NominalDiffTime -> ClusterEnv -> ClusterEnv
setJobTime t = modifyJobOptions $ \opts -> opts { time = t }

-- | Set the @mem@ field of the job options to @'Just' m@.
setJobMemory :: Text -> ClusterEnv -> ClusterEnv
setJobMemory m = modifyJobOptions $ \opts -> opts { mem = Just m }

-- | Configure the job options for the given 'MPIJob': @nodes@ is set
-- from 'mpiNodes' and @nTasksPerNode@ from 'mpiNTasksPerNode'.
setJobType :: MPIJob -> ClusterEnv -> ClusterEnv
setJobType MPIJob{..} = modifyJobOptions $ \opts -> opts
  { nodes = mpiNodes
  , nTasksPerNode = mpiNTasksPerNode
  }

-- | Set the @partition@ field of the job options to @'Just' p@.
setSlurmPartition :: Text -> ClusterEnv -> ClusterEnv
setSlurmPartition p = modifyJobOptions $ \opts -> opts { partition = Just p }

-- | Set the @constraint@ field of the job options to @'Just' c@.
setSlurmConstraint :: Text -> ClusterEnv -> ClusterEnv
setSlurmConstraint c = modifyJobOptions $ \opts -> opts { constraint = Just c }

-- | Set the @account@ field of the job options to @'Just' a@.
setSlurmAccount :: Text -> ClusterEnv -> ClusterEnv
setSlurmAccount a = modifyJobOptions $ \opts -> opts { account = Just a }

-- | Set the @qos@ field of the job options to @'Just' a@.
setSlurmQos :: Text -> ClusterEnv -> ClusterEnv
setSlurmQos a = modifyJobOptions $ \opts -> opts { qos = Just a }

-- | The default number of retries to use in 'withConnectionRetry'. Set to 20.
defaultDBRetries :: Int
defaultDBRetries = 20 -- update haddock if changing the value

-- | Build a 'DB.DatabaseConfig' from a 'ProgramInfo'.
--
-- A new pool is created with 'DB.newDefaultPool' for the
-- 'programDatabase' path; the program id is taken from 'programId'
-- and the retry count is 'defaultDBRetries'.
dbConfigFromProgramInfo :: ProgramInfo -> IO DB.DatabaseConfig
dbConfigFromProgramInfo pInfo = do
  pool <- DB.newDefaultPool (programDatabase pInfo)
  return DB.DatabaseConfig
    { dbPool      = pool
    , dbProgramId = programId pInfo
    , dbRetries   = defaultDBRetries
    }

-- | Run a database computation in 'IO', using a 'DB.DatabaseConfig'
-- freshly built from the given 'ProgramInfo' by
-- 'dbConfigFromProgramInfo' as its reader environment.
runDBWithProgramInfo :: ProgramInfo -> ReaderT DB.DatabaseConfig IO a -> IO a
runDBWithProgramInfo pInfo m =
  dbConfigFromProgramInfo pInfo >>= runReaderT m

-- | A 'WorkerLauncher' that starts each worker by submitting a new
-- job with @sbatch@.
--
-- * 'withLaunchedWorker' takes a token from the 'TokenPool', submits
--   the worker command with 'sbatchCommand' (retrying on
--   'SbatchError' via 'retryExponential'), and passes the resulting
--   'JobId' to the continuation.
-- * 'onRemoteError' logs the error, emails it if an address was
--   given, blocks with 'blockUntilRetried' on the worker's service id,
--   and then re-runs the supplied action.
-- * 'connectionTimeout' is 'Nothing'.
slurmWorkerLauncher
  :: Maybe Text      -- ^ Email address to send notifications to if sbatch
                     -- fails or there is an error in a remote
                     -- job. 'Nothing' means no emails will be sent.
  -> FilePath        -- ^ Path to this hyperion executable
  -> HoldMap         -- ^ HoldMap used by the HoldServer
  -> Int             -- ^ Port used by the HoldServer (needed for error messages)
  -> TokenPool       -- ^ TokenPool for throttling the number of submitted jobs
  -> SbatchOptions   -- ^ Options passed to @sbatch@; the job name is
                     -- overridden for each worker
  -> ProgramInfo     -- ^ Supplies the program id (for job names) and
                     -- the log directory (for worker log files)
  -> WorkerLauncher JobId
slurmWorkerLauncher emailAddr hyperionExec holdMap holdPort sbatchTokenPool opts progInfo =
  WorkerLauncher {..}
  where
    connectionTimeout = Nothing

    -- Email the given value to the configured address, if any.
    emailAlertUser :: (MonadIO m, Show e) => e -> m ()
    emailAlertUser e = case emailAddr of
      Just toAddr -> emailError toAddr e
      Nothing     -> return ()

    onRemoteError :: forall b . RemoteError -> Process b -> Process b
    onRemoteError e@(RemoteError sId _) go = do
      let
        errInfo = (e, progInfo, msg)
        msg = mconcat
          [ "This remote process has been put on hold because of an error. "
          , "To retry it, run 'curl localhost:"
          , Text.pack (show holdPort)
          , "/retry/"
          , serviceIdToText sId
          , "'"
          ]
      Log.err errInfo
      emailAlertUser errInfo
      -- Wait here until a retry for this service id is requested.
      blockUntilRetried holdMap (serviceIdToText sId)
      Log.info "Retrying" sId
      go

    withLaunchedWorker :: forall b . NodeId -> ServiceId -> (JobId -> Process b) -> Process b
    withLaunchedWorker nid serviceId goJobId = withToken sbatchTokenPool $ do
      jobId <- liftIO $
        -- Repeatedly run sbatch, with exponentially increasing time
        -- intervals between failures. Email the user on each failure
        -- (see logSbatchError). We do not allow an SbatchError to
        -- propagate up from here because there is no obvious way to
        -- recover. TODO: maybe use the HoldServer?
        retryExponential (try @IO @SbatchError) logSbatchError $
        sbatchCommand opts' cmd (map Text.pack args)
      goJobId jobId
      where
        progId = programId progInfo
        -- Each worker logs to <programLogDir>/<serviceId>.log
        logFile = programLogDir progInfo </> serviceIdToString serviceId <.> "log"
        -- The job is named <programId>-<serviceId>
        opts' = opts
          { jobName = Just $ programIdToText progId <> "-" <> serviceIdToText serviceId
          }
        (cmd, args) = hyperionWorkerCommand hyperionExec nid serviceId logFile
        -- Called on each sbatch failure: log it and email the user.
        logSbatchError e = do
          Log.err e
          emailAlertUser (e, progInfo, nid, serviceId)

-- | Construct a working directory for the given object, using its
-- ObjectId. Will be a subdirectory of 'programDataDir'. Created
-- automatically, and saved in the database.
--
-- The result is memoized with 'DB.memoizeWithMap' under the
-- @workDirectories@ 'DB.KeyValMap', keyed by the object. The directory
-- (including missing parents) is created with
-- 'createDirectoryIfMissing'.
newWorkDir
  :: ( Binary a
     , Typeable a
     , ToJSON a
     , HasProgramInfo env
     , DB.HasDB env
     , MonadReader env m
     , MonadIO m
     , MonadCatch m
     )
  => a -> m FilePath
newWorkDir = DB.memoizeWithMap (DB.KeyValMap "workDirectories") $ \obj -> do
  dataDir <- asks (programDataDir . toProgramInfo)
  objId <- getObjectId obj
  let workDir = dataDir </> objectIdToString objId
  liftIO $ createDirectoryIfMissing True workDir
  return workDir