Skip to content

Commit

Permalink
Introduce the concept of open and closed chans.
Browse files Browse the repository at this point in the history
Currently unused by code generation.
  • Loading branch information
valderman committed Apr 21, 2015
1 parent 155493f commit eb644be
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 5 deletions.
19 changes: 16 additions & 3 deletions csrc/chan.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ typedef struct chan {
void *elems;
int readoff;
int writeoff;
chan_state_t state;
} *chan_t;

chan_t chan_new(int elem_size, int max_elems) {
Expand All @@ -27,12 +28,16 @@ chan_t chan_new(int elem_size, int max_elems) {
c->nbytes = elem_size * max_elems;
c->elems = malloc(c->nbytes);
c->readoff = c->writeoff = 0;
c->state = CHAN_OPEN;
pthread_mutex_init(&c->mutex, NULL);
pthread_cond_init(&c->cond, NULL);
return c;
}

void chan_read(chan_t c, void *buf) {
chan_state_t chan_read(chan_t c, void *buf) {
if(c->state == CHAN_CLOSED && c->cur_elems == 0) {
return CHAN_CLOSED;
}
pthread_mutex_lock(&c->mutex);
while(c->cur_elems == 0) {
pthread_cond_wait(&c->cond, &c->mutex);
Expand All @@ -42,10 +47,13 @@ void chan_read(chan_t c, void *buf) {
--c->cur_elems;
pthread_cond_signal(&c->cond);
pthread_mutex_unlock(&c->mutex);
return CHAN_OPEN;
}

void chan_write(chan_t c, void *buf) {
int nextwrite;
chan_state_t chan_write(chan_t c, void *buf) {
if(c->state == CHAN_CLOSED) {
return CHAN_CLOSED;
}
pthread_mutex_lock(&c->mutex);
while(c->cur_elems == c->max_elems) {
pthread_cond_wait(&c->cond, &c->mutex);
Expand All @@ -55,4 +63,9 @@ void chan_write(chan_t c, void *buf) {
++c->cur_elems;
pthread_cond_signal(&c->cond);
pthread_mutex_unlock(&c->mutex);
return CHAN_OPEN;
}

void chan_close(chan_t c) {
c->state = CHAN_CLOSED;
}
42 changes: 40 additions & 2 deletions include/chan.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,48 @@
#define __CHAN_H__
#include <pthread.h>

#define CHAN_OPEN 1
#define CHAN_CLOSED 0

typedef int chan_state_t;

/* Bounded, blocking channels.
A channel may be in either of two states: open or closed. A closed channel
will not admit any new elements, and reading from a closed, empty channel
will return immediately rather than block, as the reading thread would
otherwise be blocked indefinitely due to no new elements being able to
enter the channel.
*/
typedef struct chan *chan_t;

/* Create a new channel with space for max_elems elements, each elem_size
bytes in size. Elements are not cache aligned.
*/
chan_t chan_new(int elem_size, int max_elems);
void chan_read(chan_t c, void *buf);
void chan_write(chan_t c, void *buf);

/* Put a channel into the closed state. A closed channel can not be reopened.
*/
void chan_close(chan_t c);

/* Read an element from a channel into the given buffer.
In the open state, attempting to read from an empty channel will block.
Upon resumption, chan_read will return CHAN_OPEN, to indicate that the
channel was open when the read was initiated.
In the closed state, reading from an empty channel will *not* block, but
immediately return CHAN_CLOSED, indicating that the channel has been closed,
and that no new data will be written to it. Reading from a non-empty channel
will return CHAN_OPEN until the channel becomes empty.
*/
chan_state_t chan_read(chan_t c, void *buf);

/* Write an element from the given buffer into a channel.
Writing to a full channel in the open state will block until the channel is
no longer full, and chan_write will return CHAN_OPEN upon resumption.
Writing to a channel in the closed state will always be a non-blocking no-op
which returns CHAN_CLOSED.
*/
chan_state_t chan_write(chan_t c, void *buf);

#endif /* __CHAN_H__ */

0 comments on commit eb644be

Please sign in to comment.