-
Notifications
You must be signed in to change notification settings - Fork 201
/
Copy pathDisruptor_MPMC.tla
221 lines (187 loc) · 8.93 KB
/
Disruptor_MPMC.tla
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
--------------------------- MODULE Disruptor_MPMC --------------------------
(***************************************************************************)
(* Models a Multi Producer, Multi Consumer Disruptor (MPMC). *)
(* *)
(* The producers publish their claimed sequence number as value into *)
(* the RingBuffer and the model verifies that all consumers read all *)
(* published values. *)
(* *)
(* The model also verifies that no data races occur between the producers *)
(* and consumers and that all consumers eventually read all published *)
(* values (in a Multicast fashion - i.e. all consumers read all events). *)
(***************************************************************************)
EXTENDS Integers, FiniteSets, Sequences
CONSTANTS
MaxPublished, (* Max number of published events. Bounds the model. *)
Writers, (* Writer/producer thread ids. *)
Readers, (* Reader/consumer thread ids. *)
Size, (* Ringbuffer size. *)
NULL
ASSUME Writers /= {}
ASSUME Readers /= {}
ASSUME Size \in Nat \ {0}
ASSUME MaxPublished \in Nat \ {0}
VARIABLES
ringbuffer,
next_sequence, (* Shared counter for claiming a sequence for a Writer. *)
claimed_sequence, (* Claimed sequence by each Writer. *)
published, (* Encodes whether each slot is published. *)
read, (* Read Cursors. One per Reader. *)
pc, (* Program Counter for each Writer/Reader. *)
consumed (* Sequence of all read events by the Readers. *)
(* This is a history variable used for liveliness *)
(* checking. *)
vars == <<
ringbuffer,
next_sequence,
claimed_sequence,
published,
read,
consumed,
pc
>>
(***************************************************************************)
(* Each producer/consumer can be in one of two states: *)
(* 1. Accessing a slot in the Disruptor or *)
(* 2. Advancing to the next slot. *)
(***************************************************************************)
Access == "Access"
Advance == "Advance"
Transition(t, from, to) ==
/\ pc[t] = from
/\ pc' = [ pc EXCEPT ![t] = to ]
Buffer == INSTANCE RingBuffer WITH Values <- Int
Xor(A, B) == A = ~B
Range(f) ==
{ f[x] : x \in DOMAIN(f) }
MinReadSequence ==
CHOOSE min \in Range(read) :
\A r \in Readers : min <= read[r]
MinClaimedSequence ==
CHOOSE min \in Range(claimed_sequence) :
\A w \in Writers : min <= claimed_sequence[w]
(***************************************************************************)
(* Encode whether an index is published by tracking if the slot was *)
(* published in an even or odd index. This works because producers *)
(* cannot overtake consumers. *)
(***************************************************************************)
IsPublished(sequence) ==
LET
index == Buffer!IndexOf(sequence)
\* Round number starts at 0.
round == sequence \div Size
is_even == round % 2 = 0
IN
\* published[index] is true if published in an even round otherwise false
\* as it was published in an odd round number.
published[index] = is_even
Publish(sequence) ==
LET index == Buffer!IndexOf(sequence)
\* Flip whether we're at an even or odd round.
IN published' = [ published EXCEPT ![index] = Xor(TRUE, @) ]
(***************************************************************************)
(* Computes the highest published sequence number that can be read. *)
(* This might seem strange but e.g. a producer P1 can be about to publish *)
(* sequence 5 while producer P2 has published sequence 6 and thus *)
(* consumers can neither read sequence 5 nor 6 (yet). *)
(***************************************************************************)
AvailablePublishedSequence ==
LET guaranteed_published == MinClaimedSequence - 1
candidate_sequences == {guaranteed_published} \union Range(claimed_sequence)
IN CHOOSE max \in candidate_sequences :
IsPublished(max) => ~ \E w \in Writers :
/\ claimed_sequence[w] = max + 1
/\ IsPublished(claimed_sequence[w])
(***************************************************************************)
(* Producer Actions: *)
(***************************************************************************)
BeginWrite(writer) ==
LET
seq == next_sequence
index == Buffer!IndexOf(seq)
min_read == MinReadSequence
IN
\* Are we clear of all consumers? (Potentially a full cycle behind).
/\ min_read >= seq - Size
/\ claimed_sequence' = [ claimed_sequence EXCEPT ![writer] = seq ]
/\ next_sequence' = seq + 1
/\ Transition(writer, Advance, Access)
/\ Buffer!Write(index, writer, seq)
/\ UNCHANGED << consumed, published, read >>
EndWrite(writer) ==
LET
seq == claimed_sequence[writer]
index == Buffer!IndexOf(seq)
IN
/\ Transition(writer, Access, Advance)
/\ Buffer!EndWrite(index, writer)
/\ Publish(seq)
/\ UNCHANGED << claimed_sequence, next_sequence, consumed, read >>
(***************************************************************************)
(* Consumer Actions: *)
(***************************************************************************)
BeginRead(reader) ==
LET
next == read[reader] + 1
index == Buffer!IndexOf(next)
IN
/\ IsPublished(next)
/\ Transition(reader, Advance, Access)
/\ Buffer!BeginRead(index, reader)
\* Track what we read from the ringbuffer.
/\ consumed' = [ consumed EXCEPT ![reader] = Append(@, Buffer!Read(index)) ]
/\ UNCHANGED << claimed_sequence, next_sequence, published, read >>
EndRead(reader) ==
LET
next == read[reader] + 1
index == Buffer!IndexOf(next)
IN
/\ Transition(reader, Access, Advance)
/\ Buffer!EndRead(index, reader)
/\ read' = [ read EXCEPT ![reader] = next ]
/\ UNCHANGED << claimed_sequence, next_sequence, consumed, published >>
(***************************************************************************)
(* Spec: *)
(***************************************************************************)
Init ==
/\ Buffer!Init
/\ next_sequence = 0
/\ claimed_sequence = [ w \in Writers |-> -1 ]
/\ published = [ i \in 0..Buffer!LastIndex |-> FALSE ]
/\ read = [ r \in Readers |-> -1 ]
/\ consumed = [ r \in Readers |-> << >> ]
/\ pc = [ a \in Writers \union Readers |-> Advance ]
Next ==
\/ \E w \in Writers : BeginWrite(w)
\/ \E w \in Writers : EndWrite(w)
\/ \E r \in Readers : BeginRead(r)
\/ \E r \in Readers : EndRead(r)
Fairness ==
/\ \A r \in Readers : WF_vars(BeginRead(r))
/\ \A r \in Readers : WF_vars(EndRead(r))
Spec ==
Init /\ [][Next]_vars /\ Fairness
(***************************************************************************)
(* State constraint - bounds model: *)
(***************************************************************************)
StateConstraint == next_sequence <= MaxPublished
(***************************************************************************)
(* Invariants: *)
(***************************************************************************)
NoDataRaces == Buffer!NoDataRaces
TypeOk ==
/\ Buffer!TypeOk
/\ next_sequence \in Nat
/\ claimed_sequence \in [ Writers -> Int ]
/\ published \in [ 0..Buffer!LastIndex -> { TRUE, FALSE } ]
/\ read \in [ Readers -> Int ]
/\ consumed \in [ Readers -> Seq(Nat) ]
/\ pc \in [ Writers \union Readers -> { Access, Advance } ]
(***************************************************************************)
(* Properties: *)
(***************************************************************************)
(* Eventually always, consumers must have read all published values. *)
Liveliness ==
\A r \in Readers : \A i \in 0..(MaxPublished - 1) :
<>[](i \in 0..AvailablePublishedSequence => Len(consumed[r]) >= i + 1 /\ consumed[r][i + 1] = i)
=============================================================================