diff --git a/csrc/chan.c b/csrc/chan.c index a354b57..bbc1b82 100644 --- a/csrc/chan.c +++ b/csrc/chan.c @@ -15,6 +15,7 @@ typedef struct chan { int readoff; int writeoff; chan_state_t state; + int last_read_ok; } *chan_t; chan_t chan_new(int elem_size, int max_elems) { @@ -29,43 +30,47 @@ chan_t chan_new(int elem_size, int max_elems) { c->elems = malloc(c->nbytes); c->readoff = c->writeoff = 0; c->state = CHAN_OPEN; + c->last_read_ok = 1; pthread_mutex_init(&c->mutex, NULL); pthread_cond_init(&c->cond, NULL); return c; } -chan_state_t chan_read(chan_t c, void *buf) { - if(c->state == CHAN_CLOSED && c->cur_elems == 0) { - return CHAN_CLOSED; - } +void chan_read(chan_t c, void *buf) { pthread_mutex_lock(&c->mutex); - while(c->cur_elems == 0) { + while(c->cur_elems == 0 && c->state == CHAN_OPEN) { pthread_cond_wait(&c->cond, &c->mutex); } - memcpy(buf, c->elems+c->readoff, c->elem_size); - c->readoff = (c->readoff + c->elem_size) % c->nbytes; - --c->cur_elems; - pthread_cond_signal(&c->cond); + if(c->state == CHAN_CLOSED && c->cur_elems == 0) { + c->last_read_ok = 0; + } else { + memcpy(buf, c->elems+c->readoff, c->elem_size); + c->readoff = (c->readoff + c->elem_size) % c->nbytes; + --c->cur_elems; + pthread_cond_signal(&c->cond); + } pthread_mutex_unlock(&c->mutex); - return CHAN_OPEN; } -chan_state_t chan_write(chan_t c, void *buf) { +int chan_write(chan_t c, void *buf) { if(c->state == CHAN_CLOSED) { - return CHAN_CLOSED; + return 0; + } else { + pthread_mutex_lock(&c->mutex); + while(c->cur_elems == c->max_elems) { + pthread_cond_wait(&c->cond, &c->mutex); + } + memcpy(c->elems+c->writeoff, buf, c->elem_size); + c->writeoff = (c->writeoff + c->elem_size) % c->nbytes; + ++c->cur_elems; + pthread_cond_signal(&c->cond); + pthread_mutex_unlock(&c->mutex); + return 1; } - pthread_mutex_lock(&c->mutex); - while(c->cur_elems == c->max_elems) { - pthread_cond_wait(&c->cond, &c->mutex); - } - memcpy(c->elems+c->writeoff, buf, c->elem_size); - c->writeoff = (c->writeoff + c->elem_size) % c->nbytes; - ++c->cur_elems; - pthread_cond_signal(&c->cond); - pthread_mutex_unlock(&c->mutex); - return CHAN_OPEN; } void chan_close(chan_t c) { c->state = CHAN_CLOSED; + pthread_cond_signal(&c->cond); } +int chan_last_read_ok(chan_t c) {return c->last_read_ok;} diff --git a/examples/Concurrent.hs b/examples/Concurrent.hs index acca06a..9c01a53 100644 --- a/examples/Concurrent.hs +++ b/examples/Concurrent.hs @@ -1,9 +1,11 @@ module Main where +import Prelude hiding (break) import Language.Embedded.Imperative import Language.Embedded.Concurrent import Language.Embedded.Expr import Language.Embedded.Backend.C () import Control.Applicative +import Control.Monad -- For compilation import Language.C.Monad @@ -29,18 +31,28 @@ deadlock = do -- happen in separate threads. mapFile :: (Expr Float -> Expr Float) -> FilePath -> Program L () mapFile f i = do - c1 <- newChan 5 - c2 <- newChan 5 - t1 <- fork $ while (pure true) $ do - readChan c1 >>= writeChan c2 . f - t2 <- fork $ while (pure true) $ do - readChan c2 >>= printf "%f\n" + c1 <- newCloseableChan 5 + c2 <- newCloseableChan 5 fi <- fopen i ReadMode + + t1 <- fork $ do + while (return true) $ do + x <- readChan c1 + readOK <- lastChanReadOK c1 + iff readOK + (void $ writeChan c2 (f x)) + (closeChan c2 >> break) + + t2 <- fork $ do + while (lastChanReadOK c2) $ do + readChan c2 >>= printf "%f\n" + t3 <- fork $ do while (Not <$> feof fi) $ do - fget fi >>= writeChan c1 + fget fi >>= void . writeChan c1 fclose fi - waitThread t3 + closeChan c1 + waitThread t2 -- | Waiting for thread completion. waiting :: Program L () diff --git a/include/chan.h b/include/chan.h index ca08e4b..63a981c 100644 --- a/include/chan.h +++ b/include/chan.h @@ -27,23 +27,27 @@ void chan_close(chan_t c); /* Read an element from a channel into the given buffer. In the open state, attempting to read from an empty channel will block. - Upon resumption, chan_read will return CHAN_OPEN, to indicate that the - channel was open when the read was initiated. - In the closed state, reading from an empty channel will *not* block, but - immediately return CHAN_CLOSED, indicating that the channel has been closed, - and that no new data will be written to it. Reading from a non-empty channel - will return CHAN_OPEN until the channel becomes empty. + In the closed state, reading from an empty channel will *not* block. + Consumers should use chan_last_read_ok to check whether a read succeeded + or not before using the contents of the buffer. */ -chan_state_t chan_read(chan_t c, void *buf); +void chan_read(chan_t c, void *buf); /* Write an element from the given buffer into a channel. Writing to a full channel in the open state will block until the channel is - no longer full, and chan_write will return CHAN_OPEN upon resumption. + no longer full, and chan_write will return nonzero upon resumption. Writing to a channel in the closed state will always be a non-blocking no-op - which returns CHAN_CLOSED. + which returns 0. If a write is blocking on a full channel when chan_close is + called, the write will happen as soon as the channel is not full anymore. + Any subsequent writes will still be discarded. */ -chan_state_t chan_write(chan_t c, void *buf); +int chan_write(chan_t c, void *buf); + +/* Returns 0 if this channel was closed and empty at the last attempted + read, otherwise returns nonzero. +*/ +int chan_last_read_ok(chan_t c); #endif /* __CHAN_H__ */ diff --git a/src/Language/Embedded/Concurrent.hs b/src/Language/Embedded/Concurrent.hs index 3b4c50b..bcf84bc 100644 --- a/src/Language/Embedded/Concurrent.hs +++ b/src/Language/Embedded/Concurrent.hs @@ -5,8 +5,10 @@ module Language.Embedded.Concurrent ( CID, Chan (..), ThreadCMD (..), ChanCMD (..), + Closeable, Uncloseable, fork, killThread, waitThread, - newChan, readChan, writeChan + newChan, newCloseableChan, readChan, writeChan, + closeChan, lastChanReadOK ) where #if __GLASGOW_HASKELL__ < 710 import Control.Applicative @@ -63,9 +65,12 @@ instance Show ThreadId where show (TIDEval tid _) = show tid show (TIDComp tid) = show tid +data Closeable +data Uncloseable + -- | A bounded channel. -data Chan a - = ChanEval (Bounded.BoundedChan a) +data Chan t a + = ChanEval (Bounded.BoundedChan a) (IORef Bool) (IORef Bool) | ChanComp CID data ThreadCMD (prog :: * -> *) a where @@ -74,9 +79,13 @@ data ThreadCMD (prog :: * -> *) a where Wait :: ThreadId -> ThreadCMD prog () data ChanCMD exp (prog :: * -> *) a where - NewChan :: VarPred exp a => exp Int -> ChanCMD exp prog (Chan a) - ReadChan :: VarPred exp a => Chan a -> ChanCMD exp prog (exp a) - WriteChan :: VarPred exp a => Chan a -> exp a -> ChanCMD exp prog () + NewChan :: VarPred exp a => exp Int -> ChanCMD exp prog (Chan t a) + ReadChan :: VarPred exp a => Chan t a -> ChanCMD exp prog (exp a) + WriteChan :: (VarPred exp a, VarPred exp Bool) + => Chan t a -> exp a -> ChanCMD exp prog (exp Bool) + CloseChan :: Chan Closeable a -> ChanCMD exp prog () + ReadOK :: VarPred exp Bool + => Chan Closeable a -> ChanCMD exp prog (exp Bool) instance MapInstr ThreadCMD where imap f (Fork p) = Fork (f p) @@ -87,6 +96,8 @@ instance MapInstr (ChanCMD exp) where imap _ (NewChan sz) = NewChan sz imap _ (ReadChan c) = ReadChan c imap _ (WriteChan c x) = WriteChan c x + imap _ (CloseChan c) = CloseChan c + imap _ (ReadOK c) = ReadOK c type instance IExp (ThreadCMD :+: i) = IExp i @@ -110,10 +121,29 @@ runChanCMD :: forall exp a. EvalExp exp => ChanCMD exp IO a -> IO a runChanCMD (NewChan sz) = ChanEval <$> Bounded.newBoundedChan (evalExp sz) -runChanCMD (ReadChan (ChanEval c)) = - litExp <$> Bounded.readChan c -runChanCMD (WriteChan (ChanEval c) x) = - Bounded.writeChan c (evalExp x) + <*> newIORef False + <*> newIORef True +runChanCMD (ReadChan (ChanEval c closedref lastread)) = do + closed <- readIORef closedref + mval <- Bounded.tryReadChan c + case mval of + Just x -> do + return $ litExp x + Nothing + | closed -> do + writeIORef lastread False + return undefined + | otherwise -> do + litExp <$> Bounded.readChan c +runChanCMD (WriteChan (ChanEval c closedref _) x) = do + closed <- readIORef closedref + if closed + then return (litExp False) + else Bounded.writeChan c (evalExp x) >> return (litExp True) +runChanCMD (CloseChan (ChanEval _ closedref _)) = do + writeIORef closedref True +runChanCMD (ReadOK (ChanEval _ _ lastread)) = do + litExp <$> readIORef lastread instance Interp ThreadCMD IO where interp = runThreadCMD @@ -141,27 +171,55 @@ waitThread = singleton . inj . Wait -- into the queue instead of sharing them across threads. newChan :: (VarPred (IExp instr) a, ChanCMD (IExp instr) :<: instr) => IExp instr Int - -> ProgramT instr m (Chan a) + -> ProgramT instr m (Chan Uncloseable a) newChan = singleE . NewChan +newCloseableChan :: (VarPred (IExp instr) a, ChanCMD (IExp instr) :<: instr) + => IExp instr Int + -> ProgramT instr m (Chan Closeable a) +newCloseableChan = singleE . NewChan + -- | Read an element from a channel. If channel is empty, blocks until there -- is an item available. +-- If 'closeChan' has been called on the channel *and* if the channel is +-- empty, @readChan@ returns an undefined value immediately. readChan :: (VarPred (IExp instr) a, ChanCMD (IExp instr) :<: instr) - => Chan a + => Chan t a -> ProgramT instr m (IExp instr a) readChan = singleE . ReadChan -- | Write a data element to a channel. -writeChan :: (VarPred (IExp instr) a, ChanCMD (IExp instr) :<: instr) - => Chan a +-- If 'closeChan' has been called on the channel, all calls to @writeChan@ +-- become non-blocking no-ops and return @False@, otherwise returns @True@. +writeChan :: (VarPred (IExp instr) a, + VarPred (IExp instr) Bool, + ChanCMD (IExp instr) :<: instr) + => Chan t a -> IExp instr a - -> ProgramT instr m () + -> ProgramT instr m (IExp instr Bool) writeChan c = singleE . WriteChan c +-- | When 'readChan' was last called on the given channel, did the read +-- succeed? +-- Always returns @True@ unless 'closeChan' has been called on the channel. +-- Always returns @True@ if the channel has never been read. +lastChanReadOK :: (VarPred (IExp instr) Bool, ChanCMD (IExp instr) :<: instr) + => Chan Closeable a + -> ProgramT instr m (IExp instr Bool) +lastChanReadOK = singleE . ReadOK + +-- | Close a channel. All subsequent write operations will be no-ops. +-- After the channel is drained, all subsequent read operations will be +-- no-ops as well. +closeChan :: (ChanCMD (IExp instr) :<: instr) + => Chan Closeable a + -> ProgramT instr m () +closeChan = singleE . CloseChan + instance ToIdent ThreadId where toIdent (TIDComp tid) = C.Id $ "t" ++ show tid -instance ToIdent (Chan a) where +instance ToIdent (Chan t a) where toIdent (ChanComp c) = C.Id $ "chan" ++ show c -- | Compile `ThreadCMD`. @@ -198,13 +256,21 @@ compChanCMD cmd@(NewChan sz) = do compChanCMD (WriteChan c x) = do x' <- compExp x (v,name) <- freshVar + (ok,okname) <- freshVar let _ = v `asTypeOf` x addStm [cstm| $id:name = $x'; |] - addStm [cstm| chan_write($id:c, &$id:name); |] + addStm [cstm| $id:okname = chan_write($id:c, &$id:name); |] + return ok compChanCMD (ReadChan c) = do (var,name) <- freshVar addStm [cstm| chan_read($id:c, &$id:name); |] return var +compChanCMD (CloseChan c) = do + addStm [cstm| chan_close($id:c); |] +compChanCMD (ReadOK c) = do + (var,name) <- freshVar + addStm [cstm| $id:name = chan_last_read_ok($id:c); |] + return var instance Interp ThreadCMD CGen where interp = compThreadCMD