-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgame.py
329 lines (278 loc) · 12.8 KB
/
game.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
#!/usr/bin/python3
import re, logging
from six.moves import cStringIO
from pysmt.shortcuts import *
from pysmt.parsing import parse
from pysmt.smtlib.parser import SmtLibParser
from subprocess import Popen, PIPE
class Game:
def solve(self,n):
#Initialize result and dynamic init variable
result = FALSE()
new_init = self.init
self.print_debug('New Subgame')
self.print_debug('New init',new_init)
self.print_debug('New goal',self.goal)
#Intersect init and goal, check whether there are init state that are not goal
if is_sat(And(new_init,self.goal), solver_name='msat'):
#There are states that are init and goal, update result
result = And(new_init,self.goal)
if is_unsat(And(new_init,Not(self.goal)), solver_name='msat'):
#init subsetminus goal is empty, we can stop computation
self.print_debug(descr='Return',formula=new_init)
#it holds that (new_init -> goal)
return new_init, n+1
else:
#init subsetminus goal is not empty, continue computation with new dynamic init
new_init = And(new_init,Not(self.goal)).simplify()
#Use specified interpolation procedure
interpolant = self.interpolate(new_init,self.goal)
necessary_reach = self.instantiate(self.reach,interpolant).simplify()
necessary_safe = self.instantiate(self.safe,interpolant).simplify()
self.print_debug(descr='Interpolant',formula=interpolant)
#Check whether SAFE controls the necessary subgoal in stages
if is_unsat(necessary_reach, solver_name='msat'):
self.print_debug(descr='no reach')
#REACH has no moves through the subgoal, check partial CPre of SAFE (we can keep quantifiers here if we use z3)
controlled_safe = And(necessary_safe,Not(Exists(self.next.values(),And(self.safe,Not(interpolant).substitute(self.next)))))
if is_unsat(controlled_safe, solver_name='z3'):
#CPre at the interpolant slice is empty, it follows that SAFE has a strategy here and we stop
self.print_debug(descr='Return',formula=result)
return result, n+1
else:
necessary_subgoal = necessary_safe
self.print_debug(descr='safe cant control')
else:
#REACH has moves through the subgoal, simply continue with full subgoal
necessary_subgoal = Or(necessary_reach,necessary_safe)
self.print_debug(descr='Necessary subgoal',formula=necessary_subgoal)
#Solve restricted game starting in the successor states of the subgoal
first_subgame = Game(self.next,
self.post_project(necessary_subgoal).substitute(self.prev),
And(self.safe,interpolant),
And(self.reach,interpolant),
self.goal,
self.depth+1,
self.id + '.0')
bad_states, n_first = first_subgame.solve(n)
#Check which subset of the sufficient subgoal can ensure the post-condition
f = self.enforcable(And(necessary_subgoal,bad_states.substitute(self.next)))
#Eliminate Quantifiers and primed variables
pre_f = self.pre_project(f)
#Check sufficient and enforcable subgoal for emptiness
if is_unsat(pre_f, solver_name='msat'):
self.print_debug(descr='Return',formula=result)
return result, n_first+1
#Check whether safety player can escape the restricted subgame
if is_sat(And(Or(self.safe,self.reach),interpolant,Not(self.goal),Not(interpolant.substitute(self.next)))):
#If he can escape use the union of goal and pre_f to ensure we keep a necessary subgoal
f = self.enforcable(And(Or(self.safe,self.reach),Or(bad_states,self.goal).substitute(self.next)))
pre_f = self.pre_project(f)
second_subgame = Game(self.next,
new_init.simplify(),
And(self.safe,Not(Or(bad_states,self.goal).substitute(self.next))).simplify(),
And(self.reach,Not(Or(bad_states,self.goal).substitute(self.next))).simplify(),
pre_f.simplify(),
self.depth+1,
self.id + '.1')
else:
#If he cannot escape, pre_f fully qualifies as necessary subgoal already
second_subgame = Game(self.next,
new_init.simplify(),
And(self.safe,Not(interpolant),Not(And(necessary_subgoal,bad_states.substitute(self.next)))).simplify(),
And(self.reach,Not(interpolant),Not(And(necessary_subgoal,bad_states.substitute(self.next)))).simplify(),
pre_f.simplify(),
self.depth+1,
self.id + '.1')
#Reach wins from those initial states that reach the computed necessary and sufficient subgoal
subset, n_last = second_subgame.solve(n_first)
self.print_debug(descr='Return',formula=Or(result,subset))
return Or(result,subset), n_last+1
def interpolate(self, a, b):
'''
Computes Craig interpolant between a and b.
Parameters:
a (FNode) -- Left operand of interpolation;
b (FNode) -- Right operand of interpolation.
Returns:
(FNode) -- Craig interpolant between a and b.
'''
b_np = qelim(Exists([Symbol('r',BOOL)],b))
a_np = qelim(Exists([Symbol('r',BOOL)],a))
if is_unsat(And(a_np,b_np)):
self.print_debug('Abstracted r interpolant')
return binary_interpolant(b_np,a_np, solver_name='msat')
else:
self.print_debug('r interpolant')
return binary_interpolant(b,a, solver_name='msat')
def instantiate(self, t, phi):
'''
Instantiates all transitions bridging interpolant phi.
Parameters:
t (FNode) -- Full (both safe and reach) transition relation;
phi (FNode) -- Interpolant phi.
Returns:
(FNode) -- All transitions bridging phi, i.e., And(t,phi,Not(phi')).
'''
return And(t,Not(phi),phi.substitute(self.next))
def enforcable(self, phi):
'''
For some set of transitions phi, returns the transitions that can be enforced by the reachability player
Parameters:
phi (FNode) -- Predicate characterizing the set of transitions.
Returns:
(FNode) -- Subset of phi that can be enforced by REACH in the predecessor states.
'''
return And(phi,Not(Exists(self.next.values(),And(self.safe,Not(phi)))))
def cpre(self, phi):
'''
Applies the controlled predecessor operator starting from phi.
Eliminates quantifiers and simplifies on return.
Parameters:
phi (FNode) -- Predicate characterizing the set of goal states.
Returns:
(FNode) -- Controlled predecessor, i.e., Qelim(Forall safe -> phi Or Exists reach /\ phi).
'''
p = phi.substitute(self.next)
safe_forall = And(self.pre_project(self.safe),Not(Exists(self.next.values(),And(self.safe,Not(p)))))
reach_exists = Exists(self.next.values(),And(self.reach,p))
cpre = Or(safe_forall, reach_exists).simplify()
self.print_debug(descr='Cpre',formula=cpre)
c = qelim(cpre, solver_name='msat_lw')
self.print_debug(descr='Finished qelim')
return c
def pre_project(self, phi):
'''
Projects the formula onto the normal/prev variables by abstracting the primed/next variables.
Applies an existential quantifier to abstract the primed variables and eliminates it after.
Parameters:
phi (FNode) -- Formula to be projected.
Returns:
(FNode) -- Qelim(Exists[X].F).
'''
r = Exists(self.next.values(),phi)
res = qelim(r, solver_name='msat_lw')
return res
def post_project(self, phi):
'''
Projects the formula onto the primed/next variables by abstracting the normal/prev variables.
Applies an existential quantifier to abstract the prev variables and eliminates it after.
Parameters:
phi (FNode) -- Formula to be projected.
Returns:
(Fnode) -- Qelim(Exists[x].F).
'''
r = Exists(self.prev.values(),phi)
res = qelim(r, solver_name='msat_lw')
return res
@classmethod
def read(cls,filename):
'''
Reads in the reachability game from a given file.
Parameters:
filename (.rg file) -- Game description in rg-format.
Returns:
(Game) -- Game in PySMT description (alters formulas to enforce alternation).
'''
with open(filename) as file:
contents = file.read()
tokens = re.split('\n|:',contents)
mode = ''
ints = []
bools = []
reals = []
init = ''
safe = ''
reach = ''
goal = ''
modes = re.compile('int|bool|real|init|safe|reach|goal')
for tok in tokens:
t = tok.rstrip()
if modes.match(t):
mode = t
else:
if mode == 'int':
ints.extend(re.split(',|\[|\]',t.replace(' ','')))
if mode == 'bool':
bools.extend(re.split(',',t.replace(' ','')))
if mode == 'real':
reals.extend(re.split(',|\[|\]',t.replace(' ','')))
elif mode == 'init':
init += t
elif mode == 'safe':
safe += t
elif mode == 'reach':
reach += t
elif mode == 'goal':
goal += t
# Handle boolean variables (implicitly bounded)
bools.append('r')
bool_next = {Symbol(v,BOOL): Symbol(v.upper(),BOOL) for v in bools}
# Parse integer variables (might be bounded)
var_int, bnd_int = parse_bounds(ints,INT)
int_next = {v: Next(v) for v in var_int}
# Parse real variables (might be bounded)
var_real, bnd_real = parse_bounds(reals,REAL)
real_next = {v: Next(v) for v in var_real}
# Merge int and real
nexts = {**int_next,**bool_next,**real_next}
bnds = And(bnd_int + bnd_real)
alt_init = And(parse(init),Iff(Symbol('r',BOOL),Bool(False)),bnds)
alt_safe = And(parse(safe),Iff(Symbol('r',BOOL),Bool(False)),Iff(Symbol('R',BOOL),Bool(True)),bnds,bnds.substitute(nexts))
alt_reach = And(parse(reach),Iff(Symbol('r',BOOL),Bool(True)),Iff(Symbol('R',BOOL),Bool(False)),bnds,bnds.substitute(nexts))
alt_goal = And(parse(goal),Iff(Symbol('r',BOOL),Bool(False)),bnds)
return Game(nexts,alt_init,alt_safe,alt_reach,alt_goal)
def print_debug(self,descr,formula=None):
if logging.DEBUG >= logging.root.level:
if formula==None:
logging.debug(self.depth*'>' + descr + '(' + self.id + ')')
else:
logging.debug(self.depth*'>' + descr + '(' + self.id + ')' + ':' + '%s' % formula.serialize(self.formula_print_depth))
def __init__(self,var_next,init,safe,reach,goal,depth=0,id='0'):
self.next = var_next
self.prev = {n: v for v, n in self.next.items()}
self.init = init
self.safe = safe
self.reach = reach
self.goal = goal
self.parser = SmtLibParser()
self.depth=depth
self.id=id
self.formula_print_depth = 10
def Next(v):
'''
Returns the next/primed version of a variable.
Parameters:
v (Symbol) -- variable to be primed.
Returns:
(Symbol) -- primed variable.
'''
return Symbol(v.symbol_name().upper(),v.symbol_type())
def parse_bounds(tokens,typ):
'''
Parses a list of variable tokens with optional value intervals.
Parameters:
tokens ([string]) -- token list;
typ (INT/REAL) -- type of the variables.
Returns:
var ([Symbol]) -- list of variables;
bnds ([FNode]) -- list of bound constraints.
'''
bnds = []
var = []
lower = ''
for t in tokens:
if str.isalpha(t):
var.append(Symbol(t,typ))
else:
if lower == '':
lower = t
else:
if typ == INT:
bnds.append(GE(var[-1],Int(int(lower))))
bnds.append(LE(var[-1],Int(int(t))))
elif typ == REAL:
bnds.append(GE(var[-1],Real(float(lower))))
bnds.append(LE(var[-1],Real(float(t))))
lower = ''
return var, bnds