A Jepsen client takes invocation operations and applies them to the system
being tested, returning corresponding completion operations. For our
Zookeeper test, we might model the system as a single register: a znode
holding an integer. The operations against that register might be read,
write, and compare-and-set, which we could model like so:
(defn r [_ _] {:type :invoke, :f :read, :value nil})
(defn w [_ _] {:type :invoke, :f :write, :value (rand-int 5)})
(defn cas [_ _] {:type :invoke, :f :cas, :value [(rand-int 5) (rand-int 5)]})
These functions can be used by jepsen.generator to construct new invocations
for reads, writes, and compare-and-set ops, respectively. Note that the read
value is nil: we don't know what value is being read until we actually
perform the read. When the client reads a particular value, it'll fill it in
for the completion op.
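The invocation/completion pairing can be shown in plain Clojure. The sketch below repeats r so it runs on its own. complete-read is a hypothetical helper, not part of Jepsen. It mimics what a client does when a read finishes: it copies the invocation and fills in the :type and the observed :value.

```clojure
;; The read-op constructor from above, repeated so this sketch is self-contained.
(defn r [_ _] {:type :invoke, :f :read, :value nil})

(defn complete-read
  "Hypothetical helper: turns a read invocation into an :ok completion
  carrying the value the client actually observed."
  [invocation observed-value]
  (assoc invocation :type :ok, :value observed-value))

(def invocation (r nil nil))
(def completion (complete-read invocation 3))
;; invocation => {:type :invoke, :f :read, :value nil}
;; completion => {:type :ok, :f :read, :value 3}
```

The completion keeps the same :f, so Jepsen can match it to its invocation.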
Now we need to take these operations and apply them to Zookeeper. We'll use
the avout library, which provides a zk-atom backed by Zookeeper. We'll
require avout.core and jepsen.client, and write a trivial implementation of
Jepsen's Client protocol...
(ns jepsen.zookeeper
  (:require [avout.core :as avout]
            [clojure.tools.logging :refer :all]
            [clojure.java.io :as io]
            [clojure.string :as str]
            [jepsen [db :as db]
                    [client :as client]
                    [control :as c]
                    [tests :as tests]]
            [jepsen.os.debian :as debian]))

(defn client
  "A client for a single compare-and-set register"
  []
  (reify client/Client
    (setup! [_ test node]
      (client))

    (invoke! [this test op])

    (teardown! [_ test])))

(defn zk-test
  [version]
  (assoc tests/noop-test
         :name   "zookeeper"
         :os     debian/os
         :db     (db version)
         :client (client)))
Clients have a three-part lifecycle. We begin with a single dormant client,
(client), whose setup! function initializes a fresh client and performs any
necessary setup work. We create fresh clients because each process needs an
independent client, talking to independent nodes. Once initialized, invoke!
lets our client handle operations. Finally, teardown! releases any resources
the client may be holding on to.
Our client doesn't hold any state yet, so we simply call (client) in setup!
to construct a fresh client for each process.
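The lifecycle can be sketched without Jepsen by defining a stand-in protocol. ToyClient and toy-client are hypothetical names. They only mirror the setup!/invoke!/teardown! shape described above and are not jepsen.client/Client itself. The dormant client acts as a factory: each setup! call returns an independent client bound to one node.

```clojure
(defprotocol ToyClient
  "Hypothetical stand-in for jepsen.client/Client with the same three
  lifecycle methods, so this sketch runs without Jepsen."
  (setup! [client test node] "Returns a fresh, initialized client for node.")
  (invoke! [client test op] "Applies op and returns a completion op.")
  (teardown! [client test] "Releases any resources the client holds."))

(defn toy-client
  "Dormant when node is nil; setup! returns an active client bound to node."
  [node]
  (reify ToyClient
    (setup! [_ test node] (toy-client node))            ; fresh client per call
    (invoke! [_ test op] (assoc op :type :ok, :node node)) ; uses the bound node
    (teardown! [_ test] nil)))                          ; nothing to release

;; One dormant client, then one independent active client per node.
(def dormant (toy-client nil))
(def actives (mapv #(setup! dormant {} %) [:n1 :n2 :n3]))
```

Each active client remembers its own node, and the dormant one never talks to anything. That separation is what lets every process hold its own connection.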
What state do we need for each client? In Avout, one makes a connection like so:
(def conn (avout/connect "some-host"))
And using that connection, Avout models a linearizable register like this:
; Construct a register at the path "/jepsen" with initial value 0.
(def a (avout/zk-atom conn "/jepsen" 0))
@a ; Read atom
(avout/reset!! a 1) ; Reset the atom's value to 1
(avout/swap!! a inc) ; Use the inc function to increment the atom's value
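For comparison, Clojure's built-in atoms have the same read/reset/swap shape as Avout's zk-atom, but their state lives in a single JVM. The local sketch below mirrors the Avout calls above using only clojure.core. Nothing in it is shared between processes or nodes, so it can't replace the Zookeeper-backed register in the real test.

```clojure
;; A process-local register: same shape as the zk-atom, but no Zookeeper.
(def local-a (atom 0))

@local-a            ; read => 0
(reset! local-a 1)  ; replace the value => 1
(swap! local-a inc) ; apply inc atomically => 2
```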
So we'll need two pieces of state in our client: a connection, and an atom.
(defn client
  "A client for a single compare-and-set register"
  [conn a]
  (reify client/Client
    (setup! [_ test node]
      (let [conn (avout/connect (name node))
            a    (avout/zk-atom conn "/jepsen" 0)]
        (client conn a)))

    (invoke! [this test op])

    (teardown! [_ test]
      (.close conn))))

(defn zk-test
  [version]
  (assoc tests/noop-test
         :name   "zookeeper"
         :os     debian/os
         :db     (db version)
         :client (client nil nil)))
Remember, the initial client has no connections. Like a stem cell, it has the
potential to become an active client but doesn't do any work directly. We
call (client nil nil) to construct that initial client; its conn and atom
will be filled in when Jepsen calls setup!.
Running lein test, we can see the ZK connections opening, one per node:
$ lein test
INFO org.apache.zookeeper.ZooKeeper - Client environment:zookeeper.version=3.4.0-1202560, built on 11/16/2011 07:18 GMT
...
INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=n5 sessionTimeout=5000 watcher=zookeeper.internal$make_watcher$reify__7183@719567fd
INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=n1 sessionTimeout=5000 watcher=zookeeper.internal$make_watcher$reify__7183@63a650df
INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=n3 sessionTimeout=5000 watcher=zookeeper.internal$make_watcher$reify__7183@78007b36
INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=n2 sessionTimeout=5000 watcher=zookeeper.internal$make_watcher$reify__7183@1e5a45c3
INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=n4 sessionTimeout=5000 watcher=zookeeper.internal$make_watcher$reify__7183@21701c04
INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server /192.168.122.13:2181
INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server /192.168.122.11:2181
INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server /192.168.122.14:2181
INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server /192.168.122.12:2181
INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server /192.168.122.15:2181
Now we have to actually do something with the client. Let's start with fifteen
seconds of reads, randomly staggered about a second apart. We'll pull in
jepsen.generator to schedule operations, and jepsen.util's timeout capability:
(ns jepsen.zookeeper
  (:require [avout.core :as avout]
            [clojure.tools.logging :refer :all]
            [clojure.java.io :as io]
            [clojure.string :as str]
            [jepsen [db :as db]
                    [client :as client]
                    [control :as c]
                    [generator :as gen]
                    [tests :as tests]
                    [util :refer [timeout]]]
            [jepsen.os.debian :as debian]))
... and write a simple generator: take reads, stagger them by about a second, give those operations to clients only (not the nemesis, which has other duties), and stop after 15 seconds.
(defn zk-test
  [version]
  (assoc tests/noop-test
         :name      "zookeeper"
         :os        debian/os
         :db        (db version)
         :client    (client nil nil)
         :generator (->> r
                         (gen/stagger 1)
                         (gen/clients)
                         (gen/time-limit 15))))
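To get a feel for what this generator produces, here is a plain-Clojure sketch of a staggered, time-limited schedule. It is not jepsen.generator. staggered-schedule is a hypothetical function, and treating "about a second apart" as a gap drawn uniformly from [0, 2*dt) seconds (mean dt) is an assumption made for illustration. A seeded java.util.Random keeps runs reproducible.

```clojure
(defn staggered-schedule
  "Hypothetical sketch, not jepsen.generator: repeats op at times separated by
  gaps drawn uniformly from [0, 2*dt) seconds (an assumed distribution with
  mean dt), stopping before limit seconds. Returns {:time t, :op op} maps."
  [^java.util.Random rng dt limit op]
  (loop [t 0.0, acc []]
    (let [t' (+ t (* 2.0 (double dt) (.nextDouble rng)))]
      (if (>= t' limit)
        acc
        (recur t' (conj acc {:time t', :op op}))))))

(def schedule
  (staggered-schedule (java.util.Random. 42) 1.0 15.0
                      {:type :invoke, :f :read, :value nil}))
```

With dt = 1 and a 15-second limit, the schedule holds roughly fifteen reads. Jepsen's real generator also spreads ops across concurrent processes, which this sketch ignores.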
Running lein test with this generator fails, because our invoke! returns nil instead of a completion op: we haven't told the client how to interpret these operations yet.
$ lein test
...
WARN jepsen.core - Process 24 indeterminate
java.lang.AssertionError: Assert failed: (= (:process op) (:process completion))
The client's invoke! function needs to return a corresponding completion op,
with type :ok if the operation succeeded, :fail if it didn't take place, or
:info if we're not sure. invoke! can also throw an exception, which is
automatically converted to an :info.
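Jepsen performs this conversion for us. The hypothetical wrapper below, invoke-or-info, is not Jepsen's code and only illustrates the idea. A generic exception doesn't tell us whether the operation took effect, so the safe completion type is :info. flaky-write is likewise a hypothetical handler.

```clojure
(defn invoke-or-info
  "Hypothetical wrapper, not Jepsen's implementation: calls (f op), and if that
  throws, returns op as an :info completion carrying the error message."
  [f op]
  (try
    (f op)
    (catch Exception e
      (assoc op :type :info, :error (.getMessage e)))))

(defn flaky-write
  "Hypothetical handler that always fails partway through, so we can't know
  whether the write reached the server."
  [_op]
  (throw (RuntimeException. "connection reset")))

(invoke-or-info flaky-write {:type :invoke, :f :write, :value 3})
;; => {:type :info, :f :write, :value 3, :error "connection reset"}
```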
(invoke! [this test op]
  (timeout 5000 (assoc op :type :info, :error :timeout)
    (case (:f op)
      :read (assoc op :type :ok, :value @a))))
timeout allows 5000 milliseconds for its body to run; if that time runs out,
it evaluates (assoc op :type :info, :error :timeout) and returns that default
value instead. We dispatch based on the :f field of the operation, and when
it's a :read, we take the invoke op and return a copy of it with :type :ok
and a value obtained by reading the register a.
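jepsen.util's timeout is a macro. The sketch below approximates the same behavior with a hypothetical function, call-with-timeout, built only on clojure.core's future, three-argument deref, and future-cancel. It is an illustration, not Jepsen's implementation. Because it is a function, its timeout-val argument is evaluated up front.

```clojure
(defn call-with-timeout
  "Hypothetical sketch: runs (f) on another thread and waits up to ms
  milliseconds for it. If time runs out, cancels the worker and returns
  timeout-val instead."
  [ms timeout-val f]
  (let [worker (future (f))
        result (deref worker ms ::timed-out)]
    (if (= ::timed-out result)
      (do (future-cancel worker)   ; interrupts the worker thread if still running
          timeout-val)
      result)))

(def op {:type :invoke, :f :read, :value nil})

;; A read that answers promptly completes normally...
(call-with-timeout 5000 (assoc op :type :info, :error :timeout)
                   #(assoc op :type :ok, :value 0))

;; ...while one that hangs past the deadline yields the :info default.
(call-with-timeout 100 (assoc op :type :info, :error :timeout)
                   #(do (Thread/sleep 10000)
                        (assoc op :type :ok, :value 0)))
```

A timed-out operation may still take effect on the server after we stop waiting. That is why the default completion is :info rather than :fail.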
$ lein test
...
INFO jepsen.util - 2 :invoke :read nil
INFO jepsen.util - 2 :ok :read 0
INFO jepsen.util - 2 :invoke :read nil
INFO jepsen.util - 2 :ok :read 0
INFO jepsen.util - 4 :invoke :read nil
INFO jepsen.util - 4 :ok :read 0
Much better! We're successfully reading the initial value of 0. Let's add some
writes to the generator by replacing r with (gen/mix [r w]):
(defn zk-test
  [version]
  (assoc tests/noop-test
         :name      "zookeeper"
         :os        debian/os
         :db        (db version)
         :client    (client nil nil)
         :generator (->> (gen/mix [r w])
                         (gen/stagger 1)
                         (gen/clients)
                         (gen/time-limit 15))))
To handle those writes, we'll use avout/reset!!, and return the op with
:type :ok. If reset!! fails, it'll throw, and Jepsen's machinery will
automatically convert it to an :info crash.
(invoke! [this test op]
  (timeout 5000 (assoc op :type :info, :error :timeout)
    (case (:f op)
      :read  (assoc op :type :ok, :value @a)
      :write (do (avout/reset!! a (:value op))
                 (assoc op :type :ok)))))
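The dispatch logic can be exercised without a Zookeeper cluster by swapping the zk-atom for a process-local clojure.core atom. local-invoke is a hypothetical name. It mirrors the :read and :write branches of invoke! (minus the timeout) and says nothing about how Zookeeper itself behaves.

```clojure
(defn local-invoke
  "Hypothetical in-memory stand-in for invoke!: reg is a clojure.core atom
  playing the role of the Avout zk-atom. Handles :read and :write only."
  [reg op]
  (case (:f op)
    :read  (assoc op :type :ok, :value @reg)
    :write (do (reset! reg (:value op))
               (assoc op :type :ok))))

(def reg (atom 0))

(local-invoke reg {:type :invoke, :f :write, :value 2})
;; => {:type :ok, :f :write, :value 2}
(local-invoke reg {:type :invoke, :f :read, :value nil})
;; => {:type :ok, :f :read, :value 2}
```

case has no default clause here, so an op with an unhandled :f throws an IllegalArgumentException. At this stage that includes :cas. Inside Jepsen, that exception would be recorded as an :info completion.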
We'll confirm writes work by watching the test:
$ lein test
...
INFO jepsen.util - 1 :ok :read 1
INFO jepsen.util - 1 :invoke :read nil
INFO jepsen.util - 1 :ok :read 1
INFO jepsen.util - 2 :invoke :write 0
INFO jepsen.util - 2 :ok :write 0
INFO jepsen.util - 4 :invoke :write 2
INFO jepsen.util - 4 :ok :write 2
INFO jepsen.util - 3 :invoke :read nil
INFO jepsen.util - 3 :ok :read 2
Seems reasonable! The final analysis is going to crash because we haven't told it how to check the system yet, but we'll get to that later. First, we'll finish the client by adding compare-and-set to the mix:
(gen/mix [r w cas])
Handling CaS is a little trickier. Avout gives us a swap!! function, which
takes a function f and atomically updates the atom's value to
(f current-value), returning the new value.
(invoke! [this test op]
  (timeout 5000 (assoc op :type :info, :error :timeout)
    (case (:f op)
      :read  (assoc op :type :ok, :value @a)
      :write (do (avout/reset!! a (:value op))
                 (assoc op :type :ok))
      :cas   (let [[value value'] (:value op)]
               (avout/swap!! a (fn [current]
                                 (if (= current value)
                                   value'
                                   current)))
               ; So... did it change or not?
               ))))
The let binding here uses destructuring: it breaks apart the
[old-value new-value] pair from the operation's :value field into value and
value'.
Our swap function compares the atom's current value to value, and if it's
equal, sets it to value'. Otherwise, it stays the same.
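The destructuring and the swap function can be tried on plain values. cas-fn is a hypothetical name for the anonymous function passed to swap!!. It destructures the [value value'] pair directly in its argument list, which is equivalent to the let binding above.

```clojure
(defn cas-fn
  "Hypothetical name for the anonymous swap function in invoke!. Takes the
  [value value'] pair from a CaS op and returns a function from the register's
  current value to its next value."
  [[value value']]   ; destructure the pair in the argument list
  (fn [current]
    (if (= current value)
      value'         ; comparison matched: install the new value
      current)))     ; no match: leave the register unchanged

((cas-fn [2 3]) 2) ; => 3, matched
((cas-fn [2 3]) 1) ; => 1, unchanged
```

Both ((cas-fn [2 3]) 2) and ((cas-fn [2 3]) 3) return 3. The first returns 3 because the comparison matched, and the second because it didn't. The return value alone can't distinguish the two cases.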
The tricky problem is that we have no way to know whether the CaS actually
took effect. We know the new value of the register, because swap!! returns
it, but we don't know whether that's the unchanged value (the comparison
failed) or the new value (the comparison succeeded). We need another piece of
mutable state to determine what happened. We'll use a local Clojure atom (not
an Avout atom, which is backed by Zookeeper!).
(invoke! [this test op]
  (timeout 5000 (assoc op :type :info, :error :timeout)
    (case (:f op)
      :read  (assoc op :type :ok, :value @a)
      :write (do (avout/reset!! a (:value op))
                 (assoc op :type :ok))
      :cas   (let [[value value'] (:value op)
                   type           (atom :fail)]
               (avout/swap!! a (fn [current]
                                 (if (= current value)
                                   (do (reset! type :ok)
                                       value')
                                   (do (reset! type :fail)
                                       current))))
               (assoc op :type @type)))))
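We bind type to a fresh atom, whose value is initially :fail. In each branch
of the swap function, we use reset! to set the atom to either :ok or :fail.
When it comes time to return a completion operation, we just read off type's
current value. Setting type in both branches, rather than only on success,
keeps it accurate even if the swap function runs more than once: type then
reflects the most recent run.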
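Here is the same side-channel technique run against a process-local clojure.core atom, where it can be tested under contention. The docstring of clojure.core's swap! says its function may be called multiple times. Because the flag is overwritten on every run, it ends up describing the run that actually committed. We assume, without asserting it, that Avout's swap!! may retry in a similar way. cas! is a hypothetical name.

```clojure
(defn cas!
  "Hypothetical sketch of the side-channel technique: performs a CaS op against
  reg (a clojure.core atom standing in for the zk-atom) and returns a
  completion op whose :type records whether the comparison matched."
  [reg op]
  (let [[value value'] (:value op)
        type           (atom :fail)]
    (swap! reg (fn [current]
                 (if (= current value)
                   (do (reset! type :ok)    ; matched: remember success
                       value')
                   (do (reset! type :fail)  ; no match: overwrite any earlier :ok
                       current))))
    (assoc op :type @type)))

;; 50 concurrent attempts to move the register from 0 to 1. Exactly one can
;; win, even though swap! may rerun the function when threads collide.
(def reg (atom 0))

(def results
  (->> (range 50)
       (mapv (fn [_] (future (cas! reg {:type :invoke, :f :cas, :value [0 1]}))))
       (mapv deref)))
```

Suppose the flag were set only on success. Then a thread whose first attempt matched but lost the race would still report :ok. For a purely local atom, Clojure 1.9's swap-vals! returns both the old and new values and would answer the question directly. The flag is needed here because, as described above, swap!! returns only the new value.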
$ lein test
INFO jepsen.util - 2 :invoke :write 2
INFO jepsen.util - 2 :ok :write 2
INFO jepsen.util - 0 :invoke :cas [2 3]
INFO jepsen.util - 0 :ok :cas [2 3]
INFO jepsen.util - 1 :invoke :cas [1 2]
INFO jepsen.util - 1 :fail :cas [1 2]
We expect some CaS ops to fail because their expected value doesn't match the current value, but a few (~1/5, since the expected value is one of 5 equally likely values) should succeed.
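The 1/5 estimate follows from independence. The expected value is drawn uniformly from 0 to 4, independently of whatever the register currently holds, so each CaS matches with probability 1/5. The hypothetical simulation below checks this sequentially against a local register using a seeded java.util.Random. Reads are left out because they don't change the register, and the 50/50 split between writes and CaS ops is an arbitrary assumption.

```clojure
(defn cas-success-rate
  "Hypothetical simulation: applies n sequential ops to a local register,
  choosing write or CaS with equal probability and drawing every value
  uniformly from 0-4 (like rand-int 5). Returns the fraction of CaS ops
  whose expected value matched the register."
  [seed n]
  (let [rng (java.util.Random. seed)]
    (loop [i 0, reg 0, cas-total 0, cas-ok 0]
      (if (= i n)
        (/ (double cas-ok) cas-total)
        (if (.nextBoolean rng)
          ;; write: overwrite the register with a fresh random value
          (recur (inc i) (long (.nextInt rng 5)) cas-total cas-ok)
          ;; cas: compare against an independently drawn expected value
          (let [expected  (long (.nextInt rng 5))
                new-value (long (.nextInt rng 5))]
            (if (= reg expected)
              (recur (inc i) new-value (inc cas-total) (inc cas-ok))
              (recur (inc i) reg (inc cas-total) cas-ok))))))))

(cas-success-rate 7 20000)
```

The observed rate should land close to 0.2 whatever the write/CaS split. The split only changes how the register's value wanders, not the chance that an independent guess matches it.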
With our client performing operations, it's time to analyze results using a checker.