hyperion-0.1.0.0
Safe Haskell: None
Language: Haskell2010

Hyperion.Concurrent

Description

This module contains copies of several utilities from Control.Concurrent.Async from the async package, with IO replaced by Process.

Documentation

race :: Process a -> Process b -> Process (Either a b)

Runs the two Processes concurrently and returns the first available result, killing the unfinished process. If either process throws an exception, both processes are killed and the exception is propagated out of race.

concurrently :: Process a -> Process b -> Process (a, b)

Runs the two Processes concurrently and returns both results. If either process throws an exception, both processes are killed and the exception is propagated out of concurrently.

concurrently' :: Process a -> Process b -> (MVar (Either SomeException (Either a b)) -> Process r) -> Process r

Runs two Processes concurrently in two new threads. Each thread computes its result and writes it to a shared MVar with putMVar. The user-supplied continuation is applied to this MVar and runs concurrently with the two threads. When the continuation returns a result, the new threads are killed and the result is returned from concurrently'. If a thread fails, its exception is written to the MVar as a Left value of type SomeException.

Note that the continuation can inspect the results of both threads by emptying the MVar when appropriate: each takeMVar makes room for the other thread to deliver its result.

TODO: This code was originally copied from the Control.Concurrent.Async module, with forkIO replaced by spawnLocal. As of async-2.1, the code for this function has changed. Have a look and figure out why, and whether the changes should be ported here.
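The mechanism described above can be sketched in plain IO using only base. This is an illustration, not the code of this module: it follows the older async implementation that the TODO refers to, with forkIO in place of spawnLocal, and the names concurrentlyIO', raceIO, concurrentlyIO and demoRace are hypothetical.

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (SomeException, catch, mask, onException, throwIO)

-- IO analogue of concurrently' (hypothetical name): fork both actions, let
-- each deliver its result or its exception to a shared MVar, and hand that
-- MVar to the continuation. Both threads are killed once the continuation
-- has returned or failed.
concurrentlyIO'
  :: IO a
  -> IO b
  -> (MVar (Either SomeException (Either a b)) -> IO r)
  -> IO r
concurrentlyIO' left right collect = do
  done <- newEmptyMVar
  mask $ \restore -> do
    let failed :: SomeException -> IO ()
        failed = putMVar done . Left
    lid <- forkIO $ restore (left >>= putMVar done . Right . Left) `catch` failed
    rid <- forkIO $ restore (right >>= putMVar done . Right . Right) `catch` failed
    let stop = killThread lid >> killThread rid
    r <- restore (collect done) `onException` stop
    stop
    return r

-- race: take the first value that arrives and rethrow it if it is an exception.
raceIO :: IO a -> IO b -> IO (Either a b)
raceIO left right = concurrentlyIO' left right collect
  where
    collect done = takeMVar done >>= either throwIO return

-- concurrently: empty the MVar repeatedly until both results have been seen.
concurrentlyIO :: IO a -> IO b -> IO (a, b)
concurrentlyIO left right = concurrentlyIO' left right (collect [])
  where
    collect [Left a, Right b] _ = return (a, b)
    collect [Right b, Left a] _ = return (a, b)
    collect seen done = do
      e <- takeMVar done
      case e of
        Left ex -> throwIO ex
        Right r -> collect (r : seen) done

-- Example: the fast branch wins and the slow branch is killed.
demoRace :: IO (Either String String)
demoRace = raceIO (pure "fast") (threadDelay 1000000 >> pure "slow")
```

In this sketch the continuation passed by concurrentlyIO calls takeMVar twice, which is the "emptying the MVar" pattern mentioned in the note above.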

Concurrently type

Concurrently m a represents a computation of type m a that can be run concurrently with other Concurrently-type computations. It is essentially a copy of the Concurrently applicative from Control.Concurrent.Async, with IO replaced by Process.

Concurrently is an instance of several type classes (see below), but most notable are the instances of Applicative and Alternative. The instances we define are for Concurrently (ReaderT r Process), which in practice means Concurrently Cluster and Concurrently Job.

Applicative instance

For computations f :: Concurrently m (a -> b) and x :: Concurrently m a, the Applicative instance defines

f <*> x

to be the Concurrently m b computation that is performed as follows: first, f and x are run concurrently using concurrently, yielding values of types a -> b and a. The function is then applied to the argument, giving a value of type b. The implementation of pure x is simply the computation that returns x.

In this way, we can define, for example,

doConcurrently' :: Applicative (Concurrently m) => [ Concurrently m a ] -> Concurrently m [a]
doConcurrently' [] = pure []
doConcurrently' (x:xs) = pure (:) <*> x <*> doConcurrently' xs

which takes a list of computations and returns a computation that performs these computations concurrently and combines their results into a list.

This definition of doConcurrently' works as follows. The first equation is the base case of the recursion: for an empty list, the result is the trivial computation that returns an empty list. The second equation starts from pure (:), a trivial concurrent computation that returns the function (:). Combining it with x using <*> gives a computation that returns the function (:) x', i.e. the function that prepends x' to a list, where x' is the result of the computation x. Combining this in turn with doConcurrently' xs means that (:) x' is computed concurrently with doConcurrently' xs and the results are then combined: doConcurrently' xs yields a list of values, and x' is prepended to it. Recursing into doConcurrently' xs, we see that doConcurrently' indeed takes a list of computations, performs them concurrently, and returns the list of their results.

We can also define a more convenient function doConcurrently as follows

doConcurrently :: Applicative (Concurrently m) => [ m a ] -> m [a]
doConcurrently xs = runConcurrently $ doConcurrently' $ map Concurrently xs

which works by wrapping each m a in the list in the Concurrently constructor, running doConcurrently' on the result, and finally removing the Concurrently constructor with runConcurrently. The actual definition of doConcurrently is more general, so that it works on any Traversable instance, not just on lists.
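The list-based definitions above can be tried out in plain IO with the following sketch. It is not the code of this module: the instance is written for Concurrently IO rather than for Process, and concurrentlyIO is a simplified, hypothetical stand-in for concurrently in which a failure of the forked action only surfaces after the other action has finished. The handshake example is also an assumption made for illustration.

```haskell
{-# LANGUAGE FlexibleContexts  #-}
{-# LANGUAGE FlexibleInstances #-}

import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (SomeException, onException, throwIO, try)

-- Simplified stand-in for concurrently: run the left action in a new thread
-- and the right action in the current one, then pair the results.
concurrentlyIO :: IO a -> IO b -> IO (a, b)
concurrentlyIO left right = do
  box <- newEmptyMVar
  tid <- forkIO (try left >>= putMVar box)
  b   <- right `onException` killThread tid
  ea  <- takeMVar box
  case ea of
    Left ex -> throwIO (ex :: SomeException)
    Right a -> pure (a, b)

newtype Concurrently m a = Concurrently { runConcurrently :: m a }

instance Functor m => Functor (Concurrently m) where
  fmap f (Concurrently x) = Concurrently (fmap f x)

-- f <*> x runs f and x concurrently, then applies the function to the argument.
instance Applicative (Concurrently IO) where
  pure = Concurrently . pure
  Concurrently f <*> Concurrently x =
    Concurrently (fmap (\(g, a) -> g a) (concurrentlyIO f x))

doConcurrently' :: Applicative (Concurrently m) => [Concurrently m a] -> Concurrently m [a]
doConcurrently' []       = pure []
doConcurrently' (x : xs) = pure (:) <*> x <*> doConcurrently' xs

doConcurrently :: Applicative (Concurrently m) => [m a] -> m [a]
doConcurrently = runConcurrently . doConcurrently' . map Concurrently

-- Two actions that each wait for the other. They can only both finish if
-- they really run at the same time.
handshake :: IO [String]
handshake = do
  a <- newEmptyMVar
  b <- newEmptyMVar
  doConcurrently
    [ putMVar a () >> takeMVar b >> pure "first"
    , putMVar b () >> takeMVar a >> pure "second"
    ]
```

The results come back in list order regardless of which action finishes first, because <*> only combines values after both sides have completed.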

Alternative instance

The Alternative instance of Concurrently is similar, except that it uses race instead of concurrently. Specifically,

a <|> b

performs the computations a and b concurrently and returns the result of whichever finishes earlier, using race. The empty implementation is a computation which never returns. In particular,

empty <|> empty

never returns, so it behaves like empty. We can also write, for example,

asum [ x, y, z ]

which will run x, y, and z concurrently and return the first returned result, killing the unfinished computations.
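The behaviour of <|>, empty and asum described above can be sketched in plain IO without the newtype wrapper. This is an illustration under assumptions, not the code of this module: raceIO is a compact stand-in for race, and orElse, never, firstOf and demoFirst are hypothetical names that play the roles of <|>, empty, asum and a usage example.

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (SomeException, mask, onException, throwIO, try)
import Control.Monad (forever)

-- Compact stand-in for race: both actions run in new threads, the first
-- outcome written to the MVar wins, and both threads are killed afterwards.
raceIO :: IO a -> IO b -> IO (Either a b)
raceIO left right = do
  box <- newEmptyMVar
  mask $ \restore -> do
    lt <- forkIO (try (restore left) >>= putMVar box . fmap Left)
    rt <- forkIO (try (restore right) >>= putMVar box . fmap Right)
    let stop = killThread lt >> killThread rt
    r <- restore (takeMVar box) `onException` stop
    stop
    either (\ex -> throwIO (ex :: SomeException)) pure r

-- Role of (<|>): race the two computations and keep whichever result
-- arrives first.
orElse :: IO a -> IO a -> IO a
orElse a b = either id id <$> raceIO a b

-- Role of empty: a computation that never returns.
never :: IO a
never = forever (threadDelay maxBound)

-- Role of asum on lists: fold with orElse, starting from never.
firstOf :: [IO a] -> IO a
firstOf = foldr orElse never

-- Example: the undelayed computation wins and the others are killed.
demoFirst :: IO String
demoFirst =
  firstOf
    [ threadDelay 300000 >> pure "slow"
    , pure "fast"
    , threadDelay 600000 >> pure "slower"
    ]
```

Because never loses every race it takes part in, adding it to the fold does not change which result is returned.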

newtype Concurrently m a

Constructors

Concurrently

Fields

runConcurrently :: m a

Instances

Functor m => Functor (Concurrently m)

Defined in Hyperion.Concurrent

Methods

fmap :: (a -> b) -> Concurrently m a -> Concurrently m b

(<$) :: a -> Concurrently m b -> Concurrently m a

(Functor m, Applicative (Concurrently m)) => Applicative (Concurrently (ReaderT r m))

Defined in Hyperion.Concurrent

Methods

pure :: a -> Concurrently (ReaderT r m) a

(<*>) :: Concurrently (ReaderT r m) (a -> b) -> Concurrently (ReaderT r m) a -> Concurrently (ReaderT r m) b

liftA2 :: (a -> b -> c) -> Concurrently (ReaderT r m) a -> Concurrently (ReaderT r m) b -> Concurrently (ReaderT r m) c

(*>) :: Concurrently (ReaderT r m) a -> Concurrently (ReaderT r m) b -> Concurrently (ReaderT r m) b

(<*) :: Concurrently (ReaderT r m) a -> Concurrently (ReaderT r m) b -> Concurrently (ReaderT r m) a

Applicative (Concurrently Process)

Defined in Hyperion.Concurrent

Methods

pure :: a -> Concurrently Process a

(<*>) :: Concurrently Process (a -> b) -> Concurrently Process a -> Concurrently Process b

liftA2 :: (a -> b -> c) -> Concurrently Process a -> Concurrently Process b -> Concurrently Process c

(*>) :: Concurrently Process a -> Concurrently Process b -> Concurrently Process b

(<*) :: Concurrently Process a -> Concurrently Process b -> Concurrently Process a

(Functor m, Alternative (Concurrently m)) => Alternative (Concurrently (ReaderT r m))

Defined in Hyperion.Concurrent

Methods

empty :: Concurrently (ReaderT r m) a

(<|>) :: Concurrently (ReaderT r m) a -> Concurrently (ReaderT r m) a -> Concurrently (ReaderT r m) a

some :: Concurrently (ReaderT r m) a -> Concurrently (ReaderT r m) [a]

many :: Concurrently (ReaderT r m) a -> Concurrently (ReaderT r m) [a]

Alternative (Concurrently Process)

Defined in Hyperion.Concurrent

Methods

empty :: Concurrently Process a

(<|>) :: Concurrently Process a -> Concurrently Process a -> Concurrently Process a

some :: Concurrently Process a -> Concurrently Process [a]

many :: Concurrently Process a -> Concurrently Process [a]

(Applicative (Concurrently m), Semigroup a) => Semigroup (Concurrently m a)

Defined in Hyperion.Concurrent

Methods

(<>) :: Concurrently m a -> Concurrently m a -> Concurrently m a

sconcat :: NonEmpty (Concurrently m a) -> Concurrently m a

stimes :: Integral b => b -> Concurrently m a -> Concurrently m a

(Applicative (Concurrently m), Semigroup a, Monoid a) => Monoid (Concurrently m a)

Defined in Hyperion.Concurrent

Utility functions using the Applicative instance of Concurrently

doConcurrently :: (Applicative (Concurrently m), Traversable t) => t (m a) -> m (t a)

Run several computations in a traversable structure concurrently and collect the results.

doConcurrently_ :: (Applicative (Concurrently m), Foldable t, Functor t) => t (m a) -> m ()

Run several computations in a foldable structure concurrently and discard the results.

mapConcurrently :: (Applicative (Concurrently m), Traversable t) => (a -> m b) -> t a -> m (t b)

Concurrently map a function over a traversable structure and collect the results.

mapConcurrently_ :: (Applicative (Concurrently m), Foldable t, Functor t) => (a -> m b) -> t a -> m ()

Concurrently map a function over a foldable structure and discard the results.

forConcurrently :: (Applicative (Concurrently m), Traversable t) => t a -> (a -> m b) -> m (t b)

Flipped version of mapConcurrently.

forConcurrently_ :: (Applicative (Concurrently m), Foldable t, Functor t) => t a -> (a -> m b) -> m ()

Flipped version of mapConcurrently_.

replicateConcurrently :: Applicative (Concurrently m) => Int -> m a -> m [a]

Concurrently run n copies of a computation, where n is the Int argument, and collect the results in a list.

replicateConcurrently_ :: Applicative (Concurrently m) => Int -> m a -> m ()

Concurrently run n copies of a computation, where n is the Int argument, and discard the results.
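One plausible way to obtain utilities with these signatures from the Applicative instance is sketched below in plain IO. The definitions are assumptions made for illustration and may differ from the ones in this module. The instance is written for Concurrently IO with simplified error handling, and demoSquares is a hypothetical usage example.

```haskell
{-# LANGUAGE FlexibleContexts  #-}
{-# LANGUAGE FlexibleInstances #-}

import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (SomeException, onException, throwIO, try)
import Data.Foldable (sequenceA_)

newtype Concurrently m a = Concurrently { runConcurrently :: m a }

instance Functor m => Functor (Concurrently m) where
  fmap f (Concurrently x) = Concurrently (fmap f x)

-- Simplified IO instance: the function side runs in a new thread, the
-- argument side in the current thread.
instance Applicative (Concurrently IO) where
  pure = Concurrently . pure
  Concurrently f <*> Concurrently x = Concurrently $ do
    box <- newEmptyMVar
    tid <- forkIO (try f >>= putMVar box)
    a   <- x `onException` killThread tid
    eg  <- takeMVar box
    case eg of
      Left ex -> throwIO (ex :: SomeException)
      Right g -> pure (g a)

-- Wrap every element, let traverse combine them with <*>, then unwrap.
doConcurrently :: (Applicative (Concurrently m), Traversable t) => t (m a) -> m (t a)
doConcurrently = runConcurrently . traverse Concurrently

-- Same idea, but only the effects are kept, so Foldable and Functor suffice.
doConcurrently_ :: (Applicative (Concurrently m), Foldable t, Functor t) => t (m a) -> m ()
doConcurrently_ = runConcurrently . sequenceA_ . fmap Concurrently

mapConcurrently :: (Applicative (Concurrently m), Traversable t) => (a -> m b) -> t a -> m (t b)
mapConcurrently f = doConcurrently . fmap f

mapConcurrently_ :: (Applicative (Concurrently m), Foldable t, Functor t) => (a -> m b) -> t a -> m ()
mapConcurrently_ f = doConcurrently_ . fmap f

forConcurrently :: (Applicative (Concurrently m), Traversable t) => t a -> (a -> m b) -> m (t b)
forConcurrently = flip mapConcurrently

replicateConcurrently :: Applicative (Concurrently m) => Int -> m a -> m [a]
replicateConcurrently n = doConcurrently . replicate n

-- Example: square five numbers, one computation per element.
demoSquares :: IO [Int]
demoSquares = forConcurrently [1 .. 5] (\n -> pure (n * n))
```

In this sketch the underscore variants need only Foldable and Functor because they never rebuild the structure, which matches the constraints in the signatures above.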