Skip to content
forked from tonsky/net.async

Network commucations with clojure.core.async interface

License

Notifications You must be signed in to change notification settings

dpetran/net.async

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

11 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Network communications with clojure.core.async interface

This library is aimed at providing reliable, high-level, bidirectional message-oriented communication over TCP/IP.

  1. Send/receive messages, not data streams. It's what you need most of the time, anyway.
  2. Automatic reconnections when connection is lost.
  3. Stuck connection detection via heartbeats.
  4. Stable interface: chans stay valid to use no matter what socket/network conditions are. net.async will handle underlying resource/socket management for you.
  5. No exceptions: disconnects are normal state of network connection and are exposed explicitly at top-level interface.

Here’s how it works:

Lein dependency

[net.async/async "0.1.1"]

Build Status

Types

<event-loop>    is { : running? <atom [true/false]> }
<client-socket> is { :read-chan  <chan>
                     :write-chan <chan> }
<accept-socket> is { :accept-chan <chan> }
<payload>       is byte[]

Preparation

(require '[clojure.core.async :refer [<!! >!! close!]])
(use 'net.async.tcp)

(event-loop) → <event-loop>

Client

(connect <event-loop> {:host "127.0.0.1" :port 9988}) → <client-socket>
(<!! read-chan) → :connected | :disconnected | :closed | <payload>
(>!! write-chan <payload>)
(close! write-chan)

Client will automatically reconnect if it loses its connection to the server. During that period, you'll get :disconnected messages from read-chan. Once connection is established, you'll get :connected message and then normal communication will resume.

As a side-effect of this, you can start client before you start server. Once server becomes live, client will establish the connection and start sending/receiving messages, all of this without any effort from your side.

Server

(accept <event-loop> {:port 9988}) → <accept-socket>
(<!! accept-chan)                  → <client-socket>
(close! accept-chan)

Shutting down

(shutdown! <event-loop>)

Configuring channels and heartbeats

(connect <event-loop> <addr>
  :read-chan        (chan (dropping-buffer 10))
  :write-chan       (chan (sliding-buffer 7))
  :reconnect-period  5000
  :heartbeat-period  8000
  :heartbeat-timeout 24000)

(accept <event-loop> <addr>
  :accept-chan      (chan 2)
  :read-chan-fn    #(chan (sliding-buffer 7))
  :write-chan-fn   #(chan (dropping-buffer 10))
  :heartbeat-period  8000
  :heartbeat-timeout 24000)

Note thatn you'd better configure heartbeats the same way at both connecting and accepting sides.

Sample echo server/client

(defn echo-server []
  (let [acceptor (accept (event-loop) {:port 8899})]
    (loop []
      (when-let [server (<!! (:accept-chan acceptor))]
        (go
          (loop []
            (when-let [msg (<! (:read-chan server))]
              (when-not (keyword? msg)
                (>! (:write-chan server) (.getBytes (str "ECHO/" (String. msg)))))
              (recur))))
        (recur)))))

(defn echo-client []
  (let [client (connect (event-loop) {:host "127.0.0.1" :port 8899})]
    (loop []
      (go (>! (:write-chan client) (.getBytes (str (rand-int 100000)))))
      (loop []
        (let [read (<!! (:read-chan client))]
          (when (and (keyword? read)
                     (not= :connected read))
            (recur))))
      (Thread/sleep (rand-int 3000))
      (recur))))

About

Network commucations with clojure.core.async interface

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Clojure 100.0%