Skip to content

Commit

Permalink
Closeable channel API, fix funny deadlocks.
Browse files Browse the repository at this point in the history
  • Loading branch information
valderman committed Apr 23, 2015
1 parent eb644be commit 3cfec34
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 57 deletions.
49 changes: 27 additions & 22 deletions csrc/chan.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ typedef struct chan {
int readoff;
int writeoff;
chan_state_t state;
int last_read_ok;
} *chan_t;

chan_t chan_new(int elem_size, int max_elems) {
Expand All @@ -29,43 +30,47 @@ chan_t chan_new(int elem_size, int max_elems) {
c->elems = malloc(c->nbytes);
c->readoff = c->writeoff = 0;
c->state = CHAN_OPEN;
c->last_read_ok = 1;
pthread_mutex_init(&c->mutex, NULL);
pthread_cond_init(&c->cond, NULL);
return c;
}

chan_state_t chan_read(chan_t c, void *buf) {
if(c->state == CHAN_CLOSED && c->cur_elems == 0) {
return CHAN_CLOSED;
}
void chan_read(chan_t c, void *buf) {
pthread_mutex_lock(&c->mutex);
while(c->cur_elems == 0) {
while(c->cur_elems == 0 && c->state == CHAN_OPEN) {
pthread_cond_wait(&c->cond, &c->mutex);
}
memcpy(buf, c->elems+c->readoff, c->elem_size);
c->readoff = (c->readoff + c->elem_size) % c->nbytes;
--c->cur_elems;
pthread_cond_signal(&c->cond);
if(c->state == CHAN_CLOSED && c->cur_elems == 0) {
c->last_read_ok = 0;
} else {
memcpy(buf, c->elems+c->readoff, c->elem_size);
c->readoff = (c->readoff + c->elem_size) % c->nbytes;
--c->cur_elems;
pthread_cond_signal(&c->cond);
}
pthread_mutex_unlock(&c->mutex);
return CHAN_OPEN;
}

chan_state_t chan_write(chan_t c, void *buf) {
int chan_write(chan_t c, void *buf) {
if(c->state == CHAN_CLOSED) {
return CHAN_CLOSED;
return 0;
} else {
pthread_mutex_lock(&c->mutex);
while(c->cur_elems == c->max_elems) {
pthread_cond_wait(&c->cond, &c->mutex);
}
memcpy(c->elems+c->writeoff, buf, c->elem_size);
c->writeoff = (c->writeoff + c->elem_size) % c->nbytes;
++c->cur_elems;
pthread_cond_signal(&c->cond);
pthread_mutex_unlock(&c->mutex);
return 1;
}
pthread_mutex_lock(&c->mutex);
while(c->cur_elems == c->max_elems) {
pthread_cond_wait(&c->cond, &c->mutex);
}
memcpy(c->elems+c->writeoff, buf, c->elem_size);
c->writeoff = (c->writeoff + c->elem_size) % c->nbytes;
++c->cur_elems;
pthread_cond_signal(&c->cond);
pthread_mutex_unlock(&c->mutex);
return CHAN_OPEN;
}

void chan_close(chan_t c) {
c->state = CHAN_CLOSED;
pthread_cond_signal(&c->cond);
}
int chan_last_read_ok(chan_t c) {return c->last_read_ok;}
28 changes: 20 additions & 8 deletions examples/Concurrent.hs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
module Main where
import Prelude hiding (break)
import Language.Embedded.Imperative
import Language.Embedded.Concurrent
import Language.Embedded.Expr
import Language.Embedded.Backend.C ()
import Control.Applicative
import Control.Monad

-- For compilation
import Language.C.Monad
Expand All @@ -29,18 +31,28 @@ deadlock = do
-- happen in separate threads.
mapFile :: (Expr Float -> Expr Float) -> FilePath -> Program L ()
mapFile f i = do
c1 <- newChan 5
c2 <- newChan 5
t1 <- fork $ while (pure true) $ do
readChan c1 >>= writeChan c2 . f
t2 <- fork $ while (pure true) $ do
readChan c2 >>= printf "%f\n"
c1 <- newCloseableChan 5
c2 <- newCloseableChan 5
fi <- fopen i ReadMode

t1 <- fork $ do
while (return true) $ do
x <- readChan c1
readOK <- lastChanReadOK c1
iff readOK
(void $ writeChan c2 (f x))
(closeChan c2 >> break)

t2 <- fork $ do
while (lastChanReadOK c2) $ do
readChan c2 >>= printf "%f\n"

t3 <- fork $ do
while (Not <$> feof fi) $ do
fget fi >>= writeChan c1
fget fi >>= void . writeChan c1
fclose fi
waitThread t3
closeChan c1
waitThread t2

-- | Waiting for thread completion.
waiting :: Program L ()
Expand Down
24 changes: 14 additions & 10 deletions include/chan.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,27 @@ void chan_close(chan_t c);

/* Read an element from a channel into the given buffer.
In the open state, attempting to read from an empty channel will block.
Upon resumption, chan_read will return CHAN_OPEN, to indicate that the
channel was open when the read was initiated.
In the closed state, reading from an empty channel will *not* block, but
immediately return CHAN_CLOSED, indicating that the channel has been closed,
and that no new data will be written to it. Reading from a non-empty channel
will return CHAN_OPEN until the channel becomes empty.
In the closed state, reading from an empty channel will *not* block.
Consumers should use chan_last_read_ok to check whether a read succeeded
or not before using the contents of the buffer.
*/
chan_state_t chan_read(chan_t c, void *buf);
void chan_read(chan_t c, void *buf);

/* Write an element from the given buffer into a channel.
Writing to a full channel in the open state will block until the channel is
no longer full, and chan_write will return CHAN_OPEN upon resumption.
no longer full, and chan_write will return nonzero upon resumption.
Writing to a channel in the closed state will always be a non-blocking no-op
which returns CHAN_CLOSED.
which returns 0. If a write is blocking on a full channel when chan_close is
called, the write will happen as soon as the channel is not full anymore.
Any subsequent writes will still be discarded.
*/
chan_state_t chan_write(chan_t c, void *buf);
int chan_write(chan_t c, void *buf);

/* Returns 0 if this channel was closed and empty at the last attempted
read, otherwise returns nonzero.
*/
int chan_last_read_ok(chan_t c);

#endif /* __CHAN_H__ */
100 changes: 83 additions & 17 deletions src/Language/Embedded/Concurrent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ module Language.Embedded.Concurrent (
CID, Chan (..),
ThreadCMD (..),
ChanCMD (..),
Closeable, Uncloseable,
fork, killThread, waitThread,
newChan, readChan, writeChan
newChan, newCloseableChan, readChan, writeChan,
closeChan, lastChanReadOK
) where
#if __GLASGOW_HASKELL__ < 710
import Control.Applicative
Expand Down Expand Up @@ -63,9 +65,12 @@ instance Show ThreadId where
show (TIDEval tid _) = show tid
show (TIDComp tid) = show tid

data Closeable
data Uncloseable

-- | A bounded channel.
data Chan a
= ChanEval (Bounded.BoundedChan a)
data Chan t a
= ChanEval (Bounded.BoundedChan a) (IORef Bool) (IORef Bool)
| ChanComp CID

data ThreadCMD (prog :: * -> *) a where
Expand All @@ -74,9 +79,13 @@ data ThreadCMD (prog :: * -> *) a where
Wait :: ThreadId -> ThreadCMD prog ()

data ChanCMD exp (prog :: * -> *) a where
NewChan :: VarPred exp a => exp Int -> ChanCMD exp prog (Chan a)
ReadChan :: VarPred exp a => Chan a -> ChanCMD exp prog (exp a)
WriteChan :: VarPred exp a => Chan a -> exp a -> ChanCMD exp prog ()
NewChan :: VarPred exp a => exp Int -> ChanCMD exp prog (Chan t a)
ReadChan :: VarPred exp a => Chan t a -> ChanCMD exp prog (exp a)
WriteChan :: (VarPred exp a, VarPred exp Bool)
=> Chan t a -> exp a -> ChanCMD exp prog (exp Bool)
CloseChan :: Chan Closeable a -> ChanCMD exp prog ()
ReadOK :: VarPred exp Bool
=> Chan Closeable a -> ChanCMD exp prog (exp Bool)

instance MapInstr ThreadCMD where
imap f (Fork p) = Fork (f p)
Expand All @@ -87,6 +96,8 @@ instance MapInstr (ChanCMD exp) where
imap _ (NewChan sz) = NewChan sz
imap _ (ReadChan c) = ReadChan c
imap _ (WriteChan c x) = WriteChan c x
imap _ (CloseChan c) = CloseChan c
imap _ (ReadOK c) = ReadOK c

type instance IExp (ThreadCMD :+: i) = IExp i

Expand All @@ -110,10 +121,29 @@ runChanCMD :: forall exp a. EvalExp exp
=> ChanCMD exp IO a -> IO a
runChanCMD (NewChan sz) =
ChanEval <$> Bounded.newBoundedChan (evalExp sz)
runChanCMD (ReadChan (ChanEval c)) =
litExp <$> Bounded.readChan c
runChanCMD (WriteChan (ChanEval c) x) =
Bounded.writeChan c (evalExp x)
<*> newIORef False
<*> newIORef True
runChanCMD (ReadChan (ChanEval c closedref lastread)) = do
closed <- readIORef closedref
mval <- Bounded.tryReadChan c
case mval of
Just x -> do
return $ litExp x
Nothing
| closed -> do
writeIORef lastread False
return undefined
| otherwise -> do
litExp <$> Bounded.readChan c
runChanCMD (WriteChan (ChanEval c closedref _) x) = do
closed <- readIORef closedref
if closed
then return (litExp False)
else Bounded.writeChan c (evalExp x) >> return (litExp True)
runChanCMD (CloseChan (ChanEval _ closedref _)) = do
writeIORef closedref True
runChanCMD (ReadOK (ChanEval _ _ lastread)) = do
litExp <$> readIORef lastread

instance Interp ThreadCMD IO where
interp = runThreadCMD
Expand Down Expand Up @@ -141,27 +171,55 @@ waitThread = singleton . inj . Wait
-- into the queue instead of sharing them across threads.
newChan :: (VarPred (IExp instr) a, ChanCMD (IExp instr) :<: instr)
=> IExp instr Int
-> ProgramT instr m (Chan a)
-> ProgramT instr m (Chan Uncloseable a)
newChan = singleE . NewChan

newCloseableChan :: (VarPred (IExp instr) a, ChanCMD (IExp instr) :<: instr)
=> IExp instr Int
-> ProgramT instr m (Chan Closeable a)
newCloseableChan = singleE . NewChan

-- | Read an element from a channel. If channel is empty, blocks until there
-- is an item available.
-- If 'closeChan' has been called on the channel *and* if the channel is
-- empty, @readChan@ returns an undefined value immediately.
readChan :: (VarPred (IExp instr) a, ChanCMD (IExp instr) :<: instr)
=> Chan a
=> Chan t a
-> ProgramT instr m (IExp instr a)
readChan = singleE . ReadChan

-- | Write a data element to a channel.
writeChan :: (VarPred (IExp instr) a, ChanCMD (IExp instr) :<: instr)
=> Chan a
-- If 'closeChan' has been called on the channel, all calls to @writeChan@
-- become non-blocking no-ops and return @False@, otherwise returns @True@.
writeChan :: (VarPred (IExp instr) a,
VarPred (IExp instr) Bool,
ChanCMD (IExp instr) :<: instr)
=> Chan t a
-> IExp instr a
-> ProgramT instr m ()
-> ProgramT instr m (IExp instr Bool)
writeChan c = singleE . WriteChan c

-- | When 'readChan' was last called on the given channel, did the read
-- succeed?
-- Always returns @True@ unless 'closeChan' has been called on the channel.
-- Always returns @True@ if the channel has never been read.
lastChanReadOK :: (VarPred (IExp instr) Bool, ChanCMD (IExp instr) :<: instr)
=> Chan Closeable a
-> ProgramT instr m (IExp instr Bool)
lastChanReadOK = singleE . ReadOK

-- | Close a channel. All subsequent write operations will be no-ops.
-- After the channel is drained, all subsequent read operations will be
-- no-ops as well.
closeChan :: (ChanCMD (IExp instr) :<: instr)
=> Chan Closeable a
-> ProgramT instr m ()
closeChan = singleE . CloseChan

instance ToIdent ThreadId where
toIdent (TIDComp tid) = C.Id $ "t" ++ show tid

instance ToIdent (Chan a) where
instance ToIdent (Chan t a) where
toIdent (ChanComp c) = C.Id $ "chan" ++ show c

-- | Compile `ThreadCMD`.
Expand Down Expand Up @@ -198,13 +256,21 @@ compChanCMD cmd@(NewChan sz) = do
compChanCMD (WriteChan c x) = do
x' <- compExp x
(v,name) <- freshVar
(ok,okname) <- freshVar
let _ = v `asTypeOf` x
addStm [cstm| $id:name = $x'; |]
addStm [cstm| chan_write($id:c, &$id:name); |]
addStm [cstm| $id:okname = chan_write($id:c, &$id:name); |]
return ok
compChanCMD (ReadChan c) = do
(var,name) <- freshVar
addStm [cstm| chan_read($id:c, &$id:name); |]
return var
compChanCMD (CloseChan c) = do
addStm [cstm| chan_close($id:c); |]
compChanCMD (ReadOK c) = do
(var,name) <- freshVar
addStm [cstm| $id:name = chan_last_read_ok($id:c); |]
return var

instance Interp ThreadCMD CGen where
interp = compThreadCMD
Expand Down

0 comments on commit 3cfec34

Please sign in to comment.