{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE StaticPointers #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
module Hyperion.Cluster where
import Control.Distributed.Process (NodeId, Process)
import Control.Distributed.Process.Node (initRemoteTable)
import Control.Lens (lens)
import Control.Monad.Catch (try, MonadCatch)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Reader (ReaderT, asks, runReaderT, MonadReader)
import Data.Aeson (FromJSON, ToJSON)
import Data.Binary (Binary)
import Data.Constraint (Dict (..))
import Data.Text (Text)
import qualified Data.Text as Text
import Data.Time.Clock (NominalDiffTime)
import Data.Typeable (Typeable)
import GHC.Generics (Generic)
import Hyperion.Command (hyperionWorkerCommand)
import qualified Hyperion.Database as DB
import Hyperion.HasWorkers (HasWorkerLauncher (..))
import Hyperion.HoldServer (HoldMap, blockUntilRetried)
import Hyperion.LockMap (LockMap, registerLockMap)
import qualified Hyperion.Log as Log
import Hyperion.ObjectId (getObjectId,
objectIdToString)
import Hyperion.ProgramId (ProgramId, programIdToText)
import Hyperion.Remote (RemoteError (..), ServiceId,
WorkerLauncher (..),
registerMasterNodeId,
runProcessLocalWithRT,
serviceIdToString,
serviceIdToText)
import Hyperion.Slurm (JobId (..), SbatchError,
SbatchOptions (..),
sbatchCommand)
import Hyperion.TokenPool (TokenPool, withToken)
import Hyperion.Static (Static (..), cPtr)
import Hyperion.Util (emailError, retryExponential)
import Hyperion.WorkerCpuPool (SSHCommand)
import System.Directory (createDirectoryIfMissing)
import System.FilePath.Posix ((<.>), (</>))
-- | Static description of a Hyperion program run.
--
-- Within this module: 'programDatabase' is handed to 'DB.newDefaultPool',
-- 'programLogDir' receives per-worker log files, and 'programDataDir' is the
-- root under which 'newWorkDir' creates directories.
data ProgramInfo = ProgramInfo
  { programId         :: ProgramId
    -- ^ Program identifier; also exposed as the database program id.
  , programDatabase   :: FilePath
    -- ^ Path given to 'DB.newDefaultPool'.
  , programLogDir     :: FilePath
    -- ^ Directory holding worker logs named @\<serviceId\>.log@.
  , programDataDir    :: FilePath
    -- ^ Root directory for work directories made by 'newWorkDir'.
  , programSSHCommand :: SSHCommand
    -- ^ SSH command setting; not read anywhere in this module.
  }
  deriving stock (Eq, Ord, Show, Generic)
  deriving anyclass (Binary, FromJSON, ToJSON)
-- | Closure-wrapped @Binary ProgramInfo@ dictionary, built from a static
-- pointer with 'cPtr' (presumably so 'ProgramInfo' values can travel inside
-- remote closures — confirm against "Hyperion.Static").
instance Static (Binary ProgramInfo) where
  closureDict = cPtr (static Dict)
-- | Runtime environment shared by every 'Cluster' computation.
data ClusterEnv = ClusterEnv
  { clusterWorkerLauncher  :: SbatchOptions -> ProgramInfo -> WorkerLauncher JobId
    -- ^ Builds a launcher; 'toWorkerLauncher' applies it to
    -- 'clusterJobOptions' and 'clusterProgramInfo'.
  , clusterProgramInfo     :: ProgramInfo
    -- ^ Program metadata, exposed through 'HasProgramInfo'.
  , clusterJobOptions      :: SbatchOptions
    -- ^ Current sbatch options, edited by 'modifyJobOptions' and friends.
  , clusterDatabasePool    :: DB.Pool
    -- ^ Database pool, exposed through 'DB.dbConfigLens'.
  , clusterDatabaseRetries :: Int
    -- ^ Retry count, exposed as @dbRetries@ through 'DB.dbConfigLens'.
  , clusterLockMap         :: LockMap
    -- ^ Registered into the remote table by 'runCluster'.
  }
-- | Environments from which a 'ProgramInfo' can be projected.
class HasProgramInfo a where
  -- | Extract the program metadata carried by the environment.
  toProgramInfo :: a -> ProgramInfo
-- | A 'ClusterEnv' stores its program metadata in 'clusterProgramInfo'.
instance HasProgramInfo ClusterEnv where
  toProgramInfo env = clusterProgramInfo env
-- | Monad for cluster programs: a reader over 'ClusterEnv' layered on the
-- distributed-process 'Process' monad.
type Cluster = ReaderT ClusterEnv Process
-- | Views the database-related parts of a 'ClusterEnv' as a
-- 'DB.DatabaseConfig'.
--
-- Reading gathers the pool, the program id (taken from
-- 'clusterProgramInfo') and the retry count. Writing stores the pool and
-- retry count back and replaces only 'programId' inside
-- 'clusterProgramInfo'; the remaining 'ProgramInfo' fields are kept.
instance DB.HasDB ClusterEnv where
  dbConfigLens = lens toDbConfig fromDbConfig
    where
      -- The empty record patterns force the constructors, matching the
      -- strictness of a wildcard pattern match.
      toDbConfig env@ClusterEnv{} = DB.DatabaseConfig
        { DB.dbPool      = clusterDatabasePool env
        , DB.dbProgramId = programId (clusterProgramInfo env)
        , DB.dbRetries   = clusterDatabaseRetries env
        }
      fromDbConfig env cfg@DB.DatabaseConfig{} =
        let oldInfo = clusterProgramInfo env
        in env
          { clusterDatabasePool    = DB.dbPool cfg
          , clusterProgramInfo     = oldInfo { programId = DB.dbProgramId cfg }
          , clusterDatabaseRetries = DB.dbRetries cfg
          }
-- | The launcher is 'clusterWorkerLauncher' applied to the environment's
-- current job options and program info. Edits made through
-- 'modifyJobOptions' are therefore reflected in the launcher returned here.
instance HasWorkerLauncher ClusterEnv where
  toWorkerLauncher env =
    clusterWorkerLauncher env (clusterJobOptions env) (clusterProgramInfo env)
-- | Node/task layout of an MPI-style Slurm job, applied to the sbatch
-- options by 'setJobType'.
data MPIJob = MPIJob
  { mpiNodes         :: Int
    -- ^ Copied into the sbatch @nodes@ option.
  , mpiNTasksPerNode :: Int
    -- ^ Copied into the sbatch @nTasksPerNode@ option.
  }
  deriving stock (Eq, Ord, Show, Generic)
  deriving anyclass (Binary, FromJSON, ToJSON)
  deriving (Typeable)
-- | Run a 'Cluster' computation from 'IO'.
--
-- The remote table starts from 'initRemoteTable'. The environment's
-- 'clusterLockMap' is added with 'registerLockMap', and the result is then
-- passed through @registerMasterNodeId Nothing@. The reader computation is
-- executed against that table with 'runProcessLocalWithRT'.
runCluster :: ClusterEnv -> Cluster a -> IO a
runCluster env cluster =
  runProcessLocalWithRT remoteTable (runReaderT cluster env)
  where
    remoteTable =
      (registerMasterNodeId Nothing . registerLockMap (clusterLockMap env))
        initRemoteTable
-- | Transform the 'SbatchOptions' stored in a 'ClusterEnv', leaving every
-- other field untouched. The @setJob*@ / @setSlurm*@ helpers build on this.
modifyJobOptions :: (SbatchOptions -> SbatchOptions) -> ClusterEnv -> ClusterEnv
modifyJobOptions f env =
  let current = clusterJobOptions env
  in env { clusterJobOptions = f current }
-- | Replace the 'SbatchOptions' of a 'ClusterEnv' wholesale.
setJobOptions :: SbatchOptions -> ClusterEnv -> ClusterEnv
setJobOptions newOpts = modifyJobOptions (\_ -> newOpts)
-- | Set the @time@ field of the job's 'SbatchOptions' (presumably the Slurm
-- wall-time limit — confirm in "Hyperion.Slurm").
setJobTime :: NominalDiffTime -> ClusterEnv -> ClusterEnv
setJobTime t = modifyJobOptions withTime
  where
    withTime o = o { time = t }
-- | Set the @mem@ field of the job's 'SbatchOptions' to @Just m@. The text is
-- stored verbatim; its expected format is not checked here.
setJobMemory :: Text -> ClusterEnv -> ClusterEnv
setJobMemory m = modifyJobOptions withMem
  where
    withMem o = o { mem = Just m }
-- | Copy an 'MPIJob' layout into the job's 'SbatchOptions': 'mpiNodes' goes
-- to @nodes@ and 'mpiNTasksPerNode' goes to @nTasksPerNode@.
setJobType :: MPIJob -> ClusterEnv -> ClusterEnv
setJobType (MPIJob nodeCount tasksPerNode) =
  modifyJobOptions (\o -> o { nodes = nodeCount, nTasksPerNode = tasksPerNode })
-- | Set the @partition@ field of the job's 'SbatchOptions' to @Just p@.
setSlurmPartition :: Text -> ClusterEnv -> ClusterEnv
setSlurmPartition p = modifyJobOptions (\o -> o { partition = Just p })
-- | Set the @constraint@ field of the job's 'SbatchOptions' to @Just c@.
setSlurmConstraint :: Text -> ClusterEnv -> ClusterEnv
setSlurmConstraint c = modifyJobOptions (\o -> o { constraint = Just c })
-- | Set the @account@ field of the job's 'SbatchOptions' to @Just a@.
setSlurmAccount :: Text -> ClusterEnv -> ClusterEnv
setSlurmAccount a = modifyJobOptions (\o -> o { account = Just a })
-- | Set the @qos@ field of the job's 'SbatchOptions' to @Just a@.
setSlurmQos :: Text -> ClusterEnv -> ClusterEnv
setSlurmQos a = modifyJobOptions (\o -> o { qos = Just a })
-- | Retry count placed in the 'DB.DatabaseConfig' produced by
-- 'dbConfigFromProgramInfo'.
defaultDBRetries :: Int
defaultDBRetries = 20
-- | Build a 'DB.DatabaseConfig' for a program.
--
-- Each call runs 'DB.newDefaultPool' on 'programDatabase', so every call
-- gets its own pool. The program id is copied from the 'ProgramInfo' and the
-- retry count is 'defaultDBRetries'.
dbConfigFromProgramInfo :: ProgramInfo -> IO DB.DatabaseConfig
dbConfigFromProgramInfo pInfo =
  toConfig <$> DB.newDefaultPool (programDatabase pInfo)
  where
    toConfig pool = DB.DatabaseConfig
      { DB.dbPool      = pool
      , DB.dbProgramId = programId pInfo
      , DB.dbRetries   = defaultDBRetries
      }
-- | Run a database action against a config freshly built by
-- 'dbConfigFromProgramInfo', and therefore against a newly created pool.
--
-- NOTE(review): the pool is not explicitly released here; confirm that
-- 'DB.newDefaultPool' reaps idle connections on its own.
runDBWithProgramInfo :: ProgramInfo -> ReaderT DB.DatabaseConfig IO a -> IO a
runDBWithProgramInfo pInfo m = runReaderT m =<< dbConfigFromProgramInfo pInfo
-- | Build a 'WorkerLauncher' that starts each worker as a Slurm job.
--
-- Arguments, in order:
--
--   * @emailAddr@: when @Just addr@, errors are also sent with 'emailError';
--     with 'Nothing' they are only logged.
--   * @hyperionExec@: executable handed to 'hyperionWorkerCommand'.
--   * @holdMap@: used by 'blockUntilRetried' to park a failed service.
--   * @holdPort@: port quoted in the "how to retry" message.
--   * @sbatchTokenPool@: a token is taken with 'withToken' for the whole
--     'withLaunchedWorker' call, including its continuation.
--   * @opts@: base 'SbatchOptions'; @jobName@ is overridden per worker.
--   * @progInfo@: supplies the program id and the log directory.
--
-- Submission through 'sbatchCommand' is retried with 'retryExponential' on
-- 'SbatchError'; each failure is logged and emailed. On a 'RemoteError' the
-- launcher logs and emails, blocks until the service is retried through the
-- hold map, and then re-runs the given action. No connection timeout is set.
slurmWorkerLauncher
  :: Maybe Text
  -> FilePath
  -> HoldMap
  -> Int
  -> TokenPool
  -> SbatchOptions
  -> ProgramInfo
  -> WorkerLauncher JobId
slurmWorkerLauncher emailAddr hyperionExec holdMap holdPort sbatchTokenPool opts progInfo =
  WorkerLauncher
    { withLaunchedWorker = launchSlurmWorker
    , connectionTimeout  = Nothing
    , onRemoteError      = holdOnRemoteError
    }
  where
    -- Email the report when an address was configured; otherwise do nothing.
    alertByEmail :: (MonadIO m, Show e) => e -> m ()
    alertByEmail e = maybe (return ()) (\toAddr -> emailError toAddr e) emailAddr

    -- Instructions telling the user how to resume a held service.
    retryMessage :: ServiceId -> Text
    retryMessage sId = Text.concat
      [ "This remote process has been put on hold because of an error. "
      , "To retry it, run 'curl localhost:"
      , Text.pack (show holdPort)
      , "/retry/"
      , serviceIdToText sId
      , "'"
      ]

    -- Log and email the error, wait for a retry request on the hold map,
    -- then run the supplied action again.
    holdOnRemoteError :: forall b . RemoteError -> Process b -> Process b
    holdOnRemoteError err@(RemoteError sId _) retryAction = do
      let errInfo = (err, progInfo, retryMessage sId)
      Log.err errInfo
      alertByEmail errInfo
      blockUntilRetried holdMap (serviceIdToText sId)
      Log.info "Retrying" sId
      retryAction

    -- Submit the worker with sbatch and hand the resulting JobId on.
    launchSlurmWorker :: forall b . NodeId -> ServiceId -> (JobId -> Process b) -> Process b
    launchSlurmWorker nid serviceId withJobId =
      -- NOTE(review): the token stays held while withJobId runs, not just
      -- during submission; presumably this caps concurrent jobs — confirm.
      withToken sbatchTokenPool (submitJob >>= withJobId)
      where
        submitJob :: Process JobId
        submitJob =
          liftIO . retryExponential (try @IO @SbatchError) reportSbatchError $
            sbatchCommand jobOpts cmd (Text.pack <$> args)

        reportSbatchError e = do
          Log.err e
          alertByEmail (e, progInfo, nid, serviceId)

        jobOpts = opts { jobName = Just jobLabel }

        jobLabel =
          programIdToText (programId progInfo) <> "-" <> serviceIdToText serviceId

        logFile = programLogDir progInfo </> serviceIdToString serviceId <.> "log"

        (cmd, args) = hyperionWorkerCommand hyperionExec nid serviceId logFile
-- | Return a working directory dedicated to the given object.
--
-- Results are memoized in the @"workDirectories"@ key-value map through
-- 'DB.memoizeWithMap'. When the memoized action actually runs, the path is
-- @programDataDir </> objectIdToString objId@, where @objId@ comes from
-- 'getObjectId'. The directory is created together with any missing parents
-- via @createDirectoryIfMissing True@.
--
-- Presumably a memo hit returns the stored path without touching the
-- filesystem; confirm against 'DB.memoizeWithMap'.
newWorkDir
  :: ( Binary a
     , Typeable a
     , ToJSON a
     , HasProgramInfo env
     , DB.HasDB env
     , MonadReader env m
     , MonadIO m
     , MonadCatch m
     )
  => a -> m FilePath
newWorkDir =
  DB.memoizeWithMap (DB.KeyValMap "workDirectories") $ \obj -> do
    dataDir <- asks (programDataDir . toProgramInfo)
    workDir <- (dataDir </>) . objectIdToString <$> getObjectId obj
    liftIO (createDirectoryIfMissing True workDir)
    pure workDir