{-# LANGUAGE FlexibleContexts  #-}
{-# LANGUAGE FlexibleInstances #-}

{-| This module contains copies of several utilities from
"Control.Concurrent.Async" from the @async@ package, with 'IO'
replaced by 'Process'.
-}
module Hyperion.Concurrent where

import           Control.Applicative         (Alternative (..), liftA2)
import           Control.Concurrent
import           Control.Distributed.Process (Process, kill, liftIO, spawnLocal)
import           Control.Exception           (SomeException, throwIO)
import           Control.Monad               (forever)
import           Control.Monad.Catch         (catch, mask, onException)
import           Control.Monad.Reader        (ReaderT (..))
import           Data.Foldable               (sequenceA_)


-- | Runs the two 'Process'es concurrently and returns the first available result,
-- 'kill'ing the unfinished process. If any of the processes throw an exception,
-- the processes are 'kill'ed and the exception is propagated out of 'race'.
--
-- The result is 'Left' if it came from the first process and 'Right' if it
-- came from the second.
race :: Process a -> Process b -> Process (Either a b)
race left right = concurrently' left right collect
  where
    -- Wait for the first report in the shared 'MVar': either rethrow the
    -- exception a worker reported, or return the value it produced.
    collect m = do
      e <- liftIO $ takeMVar m
      case e of
        Left ex -> liftIO $ throwIO ex
        Right r -> return r

-- | Runs the two 'Process'es concurrently and returns both results. If any of the
-- processes throw an exception, the processes are 'kill'ed and the
-- exception is propagated out of 'concurrently'.
concurrently :: Process a -> Process b -> Process (a, b)
concurrently left right = concurrently' left right (collect [])
  where
    -- Accumulate reported results until one from each side is present; the
    -- two list patterns cover both possible arrival orders.
    collect [Left a, Right b] _ = return (a, b)
    collect [Right b, Left a] _ = return (a, b)
    collect xs m = do
      e <- liftIO $ takeMVar m
      case e of
        -- A worker reported an exception: rethrow it in the caller.
        Left ex -> liftIO $ throwIO ex
        Right r -> collect (r : xs) m

-- | Runs two 'Process'es concurrently in two new threads. Each process will
-- compute the result and 'putMVar' it into an 'MVar'. The user-supplied
-- continuation is applied to this 'MVar' concurrently with the two threads.
-- When the continuation returns a result, the new threads are 'kill'ed and the
-- result is returned from 'concurrently''. If a thread fails, the 'MVar' is
-- filled with 'Left' 'SomeException'.
--
-- Note that the continuation can inspect the results of both threads by emptying
-- the 'MVar' when appropriate.
--
-- TODO: This code was originally copied from the
-- "Control.Concurrent.Async" module, with 'forkIO' replaced by
-- 'spawnLocal'. As of @async-2.1@, the code for this function has
-- changed. Have a look and figure out why, and whether the changes
-- should be ported here?
concurrently' :: Process a -> Process b
              -> (MVar (Either SomeException (Either a b)) -> Process r)
              -> Process r
concurrently' left right collect = do
  -- Single shared slot through which both workers report back.
  done <- liftIO newEmptyMVar
  mask $ \restore -> do
    -- Each worker runs its action under 'restore' and reports either its
    -- tagged result ('Right') or the exception it threw ('Left').
    lid <- spawnLocal $ restore (left >>= liftIO . putMVar done . Right . Left)
                          `catch` (liftIO . putMVar done . Left)
    rid <- spawnLocal $ restore (right >>= liftIO . putMVar done . Right . Right)
                          `catch` (liftIO . putMVar done . Left)
    let stop :: Process ()
        stop = kill lid "process died" >> kill rid "process died"
    -- If the continuation throws, kill both workers before the exception
    -- propagates; on success they are killed by the explicit 'stop' below.
    r <- restore (collect done) `onException` stop
    stop
    return r

-- * 'Concurrently' type
-- $ 
-- @'Concurrently' m a@ represents a computation of type @m a@ that
-- can be run concurrently with other 'Concurrently'-type
-- computations. It is essentially a copy of the 'Concurrently'
-- applicative from "Control.Concurrent.Async", with 'IO' replaced by
-- 'Process'.
--
-- 'Concurrently' is an instance of several type classes (see below), but
-- most notable are the instances of 'Applicative' and 'Alternative'. 
-- The instances we define are for @'Concurrently' ('ReaderT' r 'Process')@, 
-- which in practice means @'Concurrently' 'Hyperion.Cluster.Cluster'@ and 
-- @'Concurrently' 'Hyperion.Job.Job'@.
--
-- = 'Applicative' instance
-- 
-- The 'Applicative' instance defines for @f :: 'Concurrently' m (a->b)@ and
-- @x :: 'Concurrently' m a@ computations
--
-- > f <*> x
--
-- to be the @'Concurrently' m b@ computation which is performed as follows:
-- first, @f@ and @x@ are computed concurrently using 'concurrently', yielding values of the types
-- @a->b@ and @a@. Then these values are combined in the obvious way into a value
-- of type @b@. The implementation of @'pure' x@ is simply the computation which
-- returns @x@.
--
-- In this way, we can define, for example,
--
-- > doConcurrently' :: [ Concurrently m a ] -> Concurrently m [a]
-- > doConcurrently' [] = pure []
-- > doConcurrently' (x:xs) = pure (:) <*> x <*> doConcurrently' xs
--
-- which takes a list of computations and returns a computation that performs 
-- these computations concurrently and combines their results into a list. 
-- 
-- This definition of 'doConcurrently'' works in the following way. The first line
-- is the base for our recursive definition -- an empty list is a trivial computation.
-- The second line will first compute @'pure' (:)@ -- this is a rather trivial
-- concurrent computation that returns the function @'(:)'@. We combine this 
-- calculation with @x@ using '<*>', which makes it into a calculation that computes
-- the function @(:) x'@, i.e. the function that prepends @x'@ to a list. Here 
-- @x'@ is the result of the calculation @x@. Then we combine this calculation
-- with @doConcurrently' xs@, which means that we compute @(:) x'@ concurrently
-- with @doConcurrently' xs@, and then combine the results. The result of 
-- @doConcurrently' xs@ will be a list of values, and we prepend @x'@ to it.
-- Recursing into @doConcurrently' xs@ we see that @doConcurrently'@ indeed works
-- by taking a list of computations, performing the calculations concurrently, and
-- returning the list of results. 
--
-- We can also define a more convenient function 'doConcurrently' as follows
-- 
-- > doConcurrently :: [ m a ] -> m [a]
-- > doConcurrently xs = runConcurrently $ doConcurrently' $ map Concurrently xs
--
-- which works by dressing @m a@ in the list in 'Concurrently', performing 
-- @doConcurrently'@ and then finally removing the 'Concurrently' constructor using
-- 'runConcurrently'. The actual definition of 'doConcurrently' is more general,
-- so that it works on any 'Traversable' instance, not just on lists.
--
-- = 'Alternative' instance
-- $
-- The 'Alternative' instance of 'Concurrently' is similar, except it uses
-- 'race' instead of 'concurrently'. Specifically,
--
-- > a <|> b
-- 
-- performs the computations @a@ and @b@ concurrently and returns the result of
-- whichever finishes earlier, using 'race'. The 'empty' implementation is a 
-- computation which never returns. In particular, 
--
-- > empty <|> empty
--
-- never returns (so is the same as 'empty', to some extent). We can do, for example
-- 
-- > asum [ x, y, z ]
--
-- which will run @x@, @y@, and @z@ concurrently and return the first returned 
-- result, 'kill'ing the unfinished computations.

-- | Wrapper marking a computation of type @m a@ as one whose 'Applicative'
-- and 'Alternative' combinators run their arguments concurrently.
-- 'runConcurrently' unwraps the underlying computation.
newtype Concurrently m a = Concurrently { runConcurrently :: m a }

-- | Maps over the result of the wrapped computation using the 'Functor'
-- instance of the underlying @m@.
instance Functor m => Functor (Concurrently m) where
  fmap f (Concurrently m) = Concurrently (fmap f m)

-- | '<*>' runs the function-producing and argument-producing processes with
-- 'concurrently' and applies the resulting function to the resulting argument.
instance Applicative (Concurrently Process) where
  pure = Concurrently . pure
  Concurrently f <*> Concurrently a =
    Concurrently $ uncurry ($) <$> concurrently f a

-- | '<|>' runs both processes with 'race' and returns whichever result
-- arrives first. 'empty' never returns: it loops forever on
-- @'threadDelay' 'maxBound'@.
instance Alternative (Concurrently Process) where
  empty = Concurrently $ liftIO $ forever (threadDelay maxBound)
  Concurrently as <|> Concurrently bs =
    Concurrently $ either id id <$> race as bs

-- | Lifts the 'Applicative' instance of @'Concurrently' m@ through 'ReaderT':
-- both sides of '<*>' are run with the same environment and combined with
-- the underlying @'Concurrently' m@ instance.
instance (Functor m, Applicative (Concurrently m)) => Applicative (Concurrently (ReaderT r m)) where
  pure = Concurrently . ReaderT . const . runConcurrently . pure
  Concurrently (ReaderT f) <*> Concurrently (ReaderT a) =
    Concurrently $ ReaderT $ \r ->
      runConcurrently $ Concurrently (f r) <*> Concurrently (a r)

-- | Lifts the 'Alternative' instance of @'Concurrently' m@ through 'ReaderT':
-- both sides of '<|>' are run with the same environment and combined with
-- the underlying @'Concurrently' m@ instance. 'empty' ignores the environment.
instance (Functor m, Alternative (Concurrently m)) => Alternative (Concurrently (ReaderT r m)) where
  empty = Concurrently $ ReaderT $ const $ runConcurrently empty
  Concurrently (ReaderT as) <|> Concurrently (ReaderT bs) =
    Concurrently $ ReaderT $ \r ->
      runConcurrently $ Concurrently (as r) <|> Concurrently (bs r)

-- | Combines the results of two computations with the '<>' of the result
-- type, sequencing them through the 'Applicative' instance of
-- @'Concurrently' m@.
instance (Applicative (Concurrently m), Semigroup a) => Semigroup (Concurrently m a) where
  (<>) = liftA2 (<>)

-- | 'mempty' is the computation that immediately returns the 'mempty' of the
-- result type; 'mappend' is the 'Semigroup' '<>'.
instance (Applicative (Concurrently m), Semigroup a, Monoid a) => Monoid (Concurrently m a) where
  mempty = pure mempty
  mappend = (<>)

-- * Utility functions using 'Applicative' instance of 'Concurrently'
-- $

-- | Run several computations in a traversable structure concurrently and collect the results.
--
-- Each computation is wrapped in 'Concurrently', the structure is sequenced
-- with the 'Applicative' instance of @'Concurrently' m@, and the wrapper is
-- removed again.
doConcurrently :: (Applicative (Concurrently m), Traversable t) => t (m a) -> m (t a)
doConcurrently = runConcurrently . sequenceA . fmap Concurrently

-- | Run several computations in a traversable structure concurrently and forget the results.
--
-- Same as 'doConcurrently', but sequenced with 'sequenceA_' so the
-- individual results are discarded.
doConcurrently_ :: (Applicative (Concurrently m), Foldable t, Functor t) => t (m a) -> m ()
doConcurrently_ = runConcurrently . sequenceA_ . fmap Concurrently

-- | Concurrently map a function over a traversable structure.
--
-- Applies @f@ to every element and runs the resulting computations with
-- 'doConcurrently'.
mapConcurrently :: (Applicative (Concurrently m), Traversable t) => (a -> m b) -> t a -> m (t b)
mapConcurrently f = doConcurrently . fmap f

-- | Concurrently map a function over a traversable structure and forget the results.
--
-- Applies @f@ to every element and runs the resulting computations with
-- 'doConcurrently_'.
mapConcurrently_ :: (Applicative (Concurrently m), Foldable t, Functor t) => (a -> m b) -> t a -> m ()
mapConcurrently_ f = doConcurrently_ . fmap f

-- | Flipped version of 'mapConcurrently': the structure comes first and the
-- function second.
forConcurrently :: (Applicative (Concurrently m), Traversable t) => t a -> (a -> m b) -> m (t b)
forConcurrently = flip mapConcurrently

-- | Flipped version of 'mapConcurrently_': the structure comes first and the
-- function second.
forConcurrently_ :: (Applicative (Concurrently m), Foldable t, Functor t) => t a -> (a -> m b) -> m ()
forConcurrently_ = flip mapConcurrently_

-- | Concurrently run @n@ copies of a computation and collect the results in a list.
--
-- The copies are produced with 'replicate' and run with 'doConcurrently'.
replicateConcurrently :: Applicative (Concurrently m) => Int -> m a -> m [a]
replicateConcurrently n = doConcurrently . replicate n

-- | Concurrently run @n@ copies of a computation and forget the results.
--
-- The copies are produced with 'replicate' and run with 'doConcurrently_'.
replicateConcurrently_ :: Applicative (Concurrently m) => Int -> m a -> m ()
replicateConcurrently_ n = doConcurrently_ . replicate n