Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#52 Expand channel documentation #53

Merged
merged 2 commits into from
Dec 14, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 43 additions & 13 deletions threading/channels.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
# This Channel implementation is a shared memory concurrent queue using
# This Channel implementation is a shared memory, fixed-size, concurrent queue using
# a circular buffer for data. Based on channels implementation[1]_ by
# Mamy André-Ratsimbazafy (@mratsim), which is a C to Nim translation of the
# original[2]_ by Andreas Prell (@aprell)
Expand All @@ -20,14 +20,18 @@
##
## This module implements multi-producer multi-consumer channels - a concurrency
## primitive with a high-level interface intended for communication and
## synchronization between threads. It allows sending and receiving typed data,
## enabling safe and efficient concurrency.
## synchronization between threads. It allows sending and receiving typed, isolated
## data, enabling safe and efficient concurrency.
##
## The `Chan` type represents a generic channel object that internally manages
## The `Chan` type represents a generic fixed-size channel object that internally manages
## the underlying resources and synchronization. It has to be initialized using
## the `newChan` proc. Sending and receiving operations are provided by the
## blocking `send` and `recv` procs, and non-blocking `trySend` and `tryRecv`
## procs.
## procs. Send operations add messages to the channel, receiving operations
## remove them.
##
## See also:
## * [std/isolation](https://nim-lang.org/docs/isolation.html)
##
## The following is a simple example of two different ways to use channels:
## blocking and non-blocking.
Expand Down Expand Up @@ -266,22 +270,37 @@ proc `=copy`*[T](dest: var Chan[T], src: Chan[T]) =
dest.d = src.d

proc trySend*[T](c: Chan[T], src: sink Isolated[T]): bool {.inline.} =
## Sends item to the channel (non-blocking).
## Tries to send a message to a channel.
##
## The memory `src` is moved, not copied. Doesn't block.
##
## Returns `false` if the message was not sent because the number of pending
## items in the channel exceeded its capacity.
var data = src.extract
result = channelSend(c.d, data.unsafeAddr, sizeof(T), false)
if result:
wasMoved(data)

template trySend*[T](c: Chan[T], src: T): bool =
## Helper template for `trySend`.
## Helper template for `trySend <#trySend,Chan[T],sinkIsolated[T]>`_.
trySend(c, isolate(src))

proc tryRecv*[T](c: Chan[T], dst: var T): bool {.inline.} =
## Receives item from the channel (non-blocking).
## Tries to receive a message from the channel `c` and fill `dst` with its value.
## This returns immediately even if no message is found. Doesn't block.
##
## This can fail for all sort of reasons, including a lack of messages in the channel
## to receive or contention.
##
## If it fails it returns `false`. Otherwise it returns `true`.
channelReceive(c.d, dst.addr, sizeof(T), false)

proc send*[T](c: Chan[T], src: sink Isolated[T]) {.inline.} =
## Sends item to the channel (blocking).
## Sends item to the channel.
## This blocks the sending thread until the item was successfully sent.
##
## If the channel is already full with items this will block the thread until
## items from the channel are removed.
var data = src.extract
when defined(gcOrc) and defined(nimSafeOrcSend):
GC_runOrc()
Expand All @@ -293,24 +312,35 @@ template send*[T](c: Chan[T]; src: T) =
send(c, isolate(src))

proc recv*[T](c: Chan[T], dst: var T) {.inline.} =
## Receives item from the channel (blocking).
## Receives an item from the channel.
## Fills `dist` with the item.
## This blocks the receiving thread until an item was successfully received.
##
## If the channel does not contain any items this will block the thread until
## items get sent to the channel.
discard channelReceive(c.d, dst.addr, sizeof(T), true)

proc recv*[T](c: Chan[T]): T {.inline.} =
## Receives item from the channel (blocking).
## Receives an item from the channel.
## A version of `recv`_ that returns the item.
discard channelReceive(c.d, result.addr, sizeof(result), true)

proc recvIso*[T](c: Chan[T]): Isolated[T] {.inline.} =
## Receives an item from the channel.
## A version of `recv`_ that returns the item and isolates it.
var dst: T
discard channelReceive(c.d, dst.addr, sizeof(T), true)
result = isolate(dst)

func peek*[T](c: Chan[T]): int {.inline.} =
## Returns an estimation of current number of items held by the channel.
## Returns an estimation of the current number of items held by the channel.
numItems(c.d)

proc newChan*[T](elements: Positive = 30): Chan[T] =
## An initialization procedure, necessary for acquiring resources and
## initializing internal state of the channel.
## initializing internal state of the channel.
##
## `elements` is the capacity of the channel and thus how many items it can hold
## before it refuses to accept any further items.
assert elements >= 1, "Elements must be positive!"
result = Chan[T](d: allocChannel(sizeof(T), elements))
Loading