-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmodel_check.py
1405 lines (1267 loc) · 56.1 KB
/
model_check.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Created June 2020
@author: Colin
"""
import pickle
import subprocess
import numpy as np
import mdptoolbox.mdp as mdp
from copy import copy, deepcopy
from itertools import product
from igraph import *
from random_weighted_automaton import *
# TODO: refactor checkObligation, checkConditional, and generateFragments as Automaton class functions
# ^ the Obligation class could have validatedBy(Automaton)
# ^ also add a get_optimal_actions function to Automaton that returns the optimal actions
# TODO: asserts on the construction logic so malformed automata aren't constructed
# TODO: constructor that takes a list of "action classes" and associated probabilities
# ^ maybe even a gridworld constructor
# TODO: separate function of 'label', 'name' and atomic propositions
# ^ allow multiple propositions on a given vertex, and be able to check them.
# TODO: refactor lots of the functions just to simplify them
class Automaton(object):
def __init__(self, graph, initial_state, atomic_propositions=None):
"""
Create a new Automaton object from an igraph graph object and an initial state.
The graph should have vertex properties "label" and "name".
The graph should have the edge properties "action" and "weight.
The graph may optionally have the edge property "prob".
The initial_state parameter should the id of a vertex in graph.
Adds a "delete" property to the edges; if edge["delete"] = 1, then it is marked for deletion later.
:param graph:
:param initial_state:
"""
self.graph = graph
self.graph.es["delete"] = [0] * self.graph.ecount()
self.q0 = initial_state
self.qn = self.q0
self.t = 0
self.q_previous = []
self.t_previous = []
self.num_clones = 0
self.counter = False
self.propositions = atomic_propositions
self.prob = "prob" in self.graph.es.attribute_names()
@classmethod
def from_matrix(cls, adj_matrix, atomic_propositions, label_asg, weight_asg=None, initial_state=0):
"""
Create an automaton from an adjacency matrix and the labels assigned to each state
:param adj_matrix:
:param atomic_propositions: a list of strings, e.g. ['a', 'b', 'p', 'q']
:param label_asg: a list of label assignments in order of vertices, e.g. ["a, b", "b, q", "p, q"]
:param weight_asg: a list of weight assignments in order of edges, e.g. [1, 0.3, -2.5, 6, 8, 10]
:param initial_state: a vertex number to be the starting state of the automaton
:return:
"""
graph = Graph.Adjacency(adj_matrix)
if not weight_asg:
weight_asg = np.zeros(graph.ecount())
graph.es["weight"] = weight_asg
graph.es["label"] = weight_asg
state_names = [str(v.index) for v in graph.vs]
graph.vs["name"] = state_names
graph.vs["label"] = label_asg
return cls(graph, initial_state, atomic_propositions)
@classmethod
def with_actions(cls, graph, actions, q0=0, probs=None):
"""
graph is a directed igraph graph object
actions is a dictionary that maps actions to edges
key(action), values(list of edges)
e.g. {0:[0, 1, 3], 1:[2, 4], 2:[5], ... }
probs is a list of probabilities such that probs[k] is the
probability of following edge[k] when the action containing
edge[k] is taken.
:param graph:
:param actions:
:param q0:
:param probs:
"""
state_names = [str(v.index) for v in graph.vs]
graph.vs["name"] = state_names
graph.vs["label"] = state_names
for action in actions:
for edge in actions[action]:
edge = graph.es[edge]
edge["action"] = action
if probs:
graph.es["prob"] = probs
return cls(graph, q0)
# TODO: make a gridworld cell class?
@classmethod
def as_gridworld(cls, x_len, y_len, start=(0, 0), action_success=0.7, cells=None, default_reward=-1):
"""
Construct an automaton for an x_len-by-y_len gridworld, starting from the 'start' position, with actions up,
down, left, and right.
Each action, when taken, has a 'action_success' chance of effecting. Otherwise, another action is effected with
probability (1-action_success)/3. That is, by default, the 'up' action has a probability of 0.7 to transition
the automaton from (0,0) to (0,1). Taking the 'up' action has a probability of 0.1 to move the automaton down.
Because (0,0) is in a corner, however, moving down leaves the automaton in its same state.
The cells parameter is a list of tuples [(type, positions, reward, absorbing, accessible), ...].
Each tuple represents one class of cells in the gridworld, relays the positions of those cells, the reward
received for entering those cells, whether or not those cells can be exited, and whether or not those cells can
be entered.
The 'type' entry in the tuple is a string that denotes the class of cell; e.g. "goal", "pit", or "wall".
The 'positions' entry is a list of 2-tuples (x, y) that denotes the locations of the cells of the given type in
this gridworld. E.g. [(0,0), (2,2), (1,3)].
The 'reward' entry is a real value that denotes the reward for entering a cell of the given type; e.g. 10.7.
The 'absorbing' entry is a boolean value that denotes if cells of the given type are absorbing states.
The 'accessible' entry is a boolean value that denotes if cells of the given type can be entered.
If cells is left as None, then no cells are specified, and all cells are accessible, non-absorbing, and have a
reward of 'default_reward'.
An example cells input for a basic 4x3 gridworld is as follows:
cells=[("goal", [(3, 2)], 10, True, True),
("pit", [(2, 2)], -50, True, True),
("wall", [(1, 1)], -1, False, False)]
This places a goal in the upper-right of the grid with reward 10, and is an absorbing state,
a pit just below the goal with a reward of -50, and is an absorbing state,
and a wall just north-east of the starting position that is inaccessible. Note that because the wall is
not accessible, its 'reward' and 'absorbing' entries are irrelevant.
If a cell is not included among the positions in the 'cells' parameter, its type is "default", it is accessible,
and not absorbing, and the reward for entering it is 'default_reward'.
:param x_len:
:param y_len:
:param start:
:param action_success:
:param default_reward:
:param cells:
"""
n = x_len * y_len
g_new = Graph(directed=True)
pos_to_type = {}
type_to_spec = {}
default_spec = ("default", [], default_reward, False, True)
# cache information about cell positions and types for future use
for spec in cells:
cell_type = spec[0]
type_to_spec[cell_type] = spec
cell_poss = spec[1]
for cell_pos in cell_poss:
pos_to_type[cell_pos] = cell_type
positions = product(range(x_len), range(y_len))
# set up the attribute dictionary so all vertices can be added in one go
v_attr_dict = {"x": [], "y": [], "pos": [], "label": [], "name": [], "type": [],
"absorbing": [], "accessible": [], "reward": []}
k = 0
for y in range(y_len):
for x in range(x_len):
v_attr_dict["x"].append(x)
v_attr_dict["y"].append(y)
v_attr_dict["pos"].append((x, y))
v_attr_dict["label"].append(str((x, y)))
v_attr_dict["name"].append(str(k))
cell_type = pos_to_type.get((x, y), "default")
cell_spec = type_to_spec.get(cell_type, default_spec)
v_attr_dict["type"].append(cell_type)
v_attr_dict["reward"].append(cell_spec[2])
v_attr_dict["absorbing"].append(cell_spec[3])
v_attr_dict["accessible"].append(cell_spec[4])
k += 1
# add a vertex for every position, and set its attributes
g_new.add_vertices(n, attributes=v_attr_dict)
# set the four actions and the effect of actually following that action
actions = ["up", "down", "left", "right"]
effects = [(0, 1), (0, -1), (-1, 0), (1, 0)]
# define the probability that an effect is effected in the case that the effect is not the assumed consequence
# of a taken action
off_prob = (1.0 - action_success) / 3.0
# set up attribute dictionary and edge list so all edges can be added in one go
edge_tuples = []
signatures = []
e_attr_dict = {"action": [], "weight": [], "prob": []}
# for each vertex...
for v in g_new.vs:
effect_targets = []
# if that vertex is inaccessible
if not v["accessible"]:
# just add a self edge for consistency reasons
effect_targets = [v] * len(effects)
elif v["absorbing"]:
# if it's absorbing, set the target of each effect to itself
effect_targets = [v] * len(effects)
else:
# otherwise, for each effect...
for effect in effects:
# get the x and y positions that the automaton would enter under that effect
next_x = v["x"] + effect[0]
next_y = v["y"] + effect[1]
# if those x and y positions are inside the bounds...
if 0 <= next_x < x_len and 0 <= next_y < y_len:
# get the target of the effect...
next_v = g_new.vs.find(pos=(next_x, next_y))
# if the target isn't accessible...
if not next_v["accessible"]:
# then the effect leads to the same state
next_v = v
else:
# the target of the effect is out of bounds, so the effect would lead to the same state
next_v = v
# record the target of each effect
effect_targets.append(next_v)
# for each action that can be taken...
for i, action in enumerate(actions):
# for each effect that action can have...
for j, effect in enumerate(effects):
# get the target of this effect
next_v = effect_targets[j]
signature = (v.index, next_v.index, action)
# in the case that the effect matches the action taken...
if i == j:
# the probability of this effect is the action_success probability
prob = action_success
else:
# otherwise, the probability of this effect is the off_prob probability
prob = off_prob
if signature not in signatures:
signatures.append(signature)
# record an edge from v to next_v
edge_tuples.append((v.index, next_v.index))
# record the attributes of this edge
e_attr_dict["action"].append(action)
e_attr_dict["weight"].append(next_v["reward"])
e_attr_dict["prob"].append(prob)
else:
sig_index = signatures.index(signature)
e_attr_dict["prob"][sig_index] += prob
# add an edge for each (vertex*action*effect), with associated attributes
g_new.add_edges(edge_tuples, e_attr_dict)
v_0 = g_new.vs.find(pos=start)
return cls(g_new, v_0.index)
def k(self, i):
"""
get all actions available from vertex i.
:param i:
:return:
"""
es = self.graph.es.select(_source=i)
actions = []
for edge in es:
action = edge["action"]
if action not in actions:
actions.append(action)
return actions
def setCounter(self, var_name='c', start=0, term=1000):
"""
create a simple counter in this automaton
:param var_name:
:param start:
:param term:
:return:
"""
self.counter = (var_name, start, term)
def forceKn(self, kn, source=0):
"""
delete all edges from source vertex with edges in kn that are not
themselves in kn.
:param kn:
:param source:
:return:
"""
# find all edges with this source
candidates = self.graph.es.select(_source=source)
# find all candidates not in kn
selections = candidates.select(action_ne=kn)
# remove these edges from the graph
self.graph.delete_edges(selections)
return self
def forceEn(self, en, source=0):
"""
delete all edges from source vertex that are not
themselves in en.
:param en:
:param source:
:return:
"""
# find all edges with this source.
# when an index is deleted, the indices change
# so I need to delete all edges at once
# I can't query the edge indices here, so I'll give the edges temporary names
# and grab the ones with names different from en
candidates = self.graph.es.select(_source=source)
for edge in candidates:
if edge.index != en.index:
edge["delete"] = 1
else:
edge["delete"] = 0
candidates = candidates.select(delete=1)
self.graph.delete_edges(candidates)
return self
def forceQn(self, qn, source=0):
"""
delete all edges from source vertex with edges that do not lead to given
vertex.
:param qn:
:param source:
:return:
"""
# find all edges with this source
candidates = self.graph.es.select(_source=source)
# find all candidates not in qn
selections = candidates.select(_target_ne=qn)
# remove these edges from the graph
self.graph.delete_edges(selections)
return self
def union(self, g, target=0):
"""
modify this automaton such that transitions in itself to the target
state are replaced with transitions to automaton g.
:param g:
:param target:
:return:
"""
# recall certain properties of the given graphs
v_mod = self.graph.vcount() + target % g.graph.vcount()
# find the transitions to the target state not from previous state
if len(self.q_previous) > 0:
# es = self.graph.es.select(_target=target, _source_notin=[self.q_previous[-1]])
es = self.graph.es.select(_target=target)
else:
es = None
# if no
if not es:
return self
else:
self.num_clones += 1
labels = self.graph.vs["label"] + [label + "-" + str(self.num_clones)
for label in g.graph.vs["label"]]
names = self.graph.vs["name"] + g.graph.vs["name"]
weights = self.graph.es["weight"] + g.graph.es["weight"]
actions = self.graph.es["action"] + g.graph.es["action"]
if self.prob:
probs = self.graph.es["prob"] + g.graph.es["prob"]
else:
probs = None
# take the disjoint union of this graph and the given graph
self.graph = self.graph.disjoint_union(g.graph)
# reinstate edge and vertex attributes
self.graph.vs["label"] = labels
self.graph.vs["name"] = names
self.graph.es["weight"] = weights
self.graph.es["action"] = actions
if probs:
self.graph.es["prob"] = probs
# properties = [(e.source, e["action"], e["weight"]) for e in es]
# for each edge, make a replacement edge to new graph
for edge in es:
new_edge = self.graph.add_edge(edge.source, self.graph.vs[v_mod])
new_edge["action"] = edge["action"]
new_edge["weight"] = edge["weight"]
if probs:
new_edge["prob"] = edge["prob"]
# delete the edges
if len(self.q_previous) > 0:
# self.graph.delete_edges(_target=target, _source_notin=[self.q_previous[-1]])
self.graph.delete_edges(_target=target)
# self.graph.delete_vertices(VertexSeq(self.graph, target))
else:
self.graph.delete_edges(_target=target)
return self
def optimal(self, discount, best=True, punish=-1000, steps=100):
mod = 1
if not best:
mod = -1
tr = self.to_mdp(best, punish)
sol = mdp.ValueIteration(tr[0], tr[1], discount)
sol.run()
return sol.V[self.q0] * mod
def to_mdp(self, best=True, punish=-1000):
"""
solve graph as MDP for most (or least) optimal strategy and return value
:param best:
:param punish:
:return:
"""
vcount = self.graph.vcount()
ecount = self.graph.ecount()
# t represents the transition probabilities for each "action" from one
# "state" to another "state, where every action is associated with a
# transition, and every state is represented by a vertex.
# The matrix may be considered, along the "action" vertex, as specifying
# the probability that action has of moving the process from state A
# to state B. As we are treating each transition as a sure thing,
# in the case that we are evaluating a DAU automaton, all
# probabilities are 1. E.g. if edge 2 in the graph points from vertex 3
# to vertex 1, then the entry t[2, 3, 1] = 1. The t matrix must be row
# stochastic, so there must be an entry of 1 in each row; i.e. for each
# source state, there must be some effect on the process if the process
# takes that edge - even if the edge is not connected to the source.
# In the given example, t[2, 3, j] = 0 where j != 1, since it is clear
# that taking edge 2 from 3 to any edge other than 1 is impossible,
# however t[2, j, j] = 1 where j != 3, since there *must* be an action
# for each edge-state pair. Due to this discrepancy in representations,
# the only reasonable choice is to say that trying to take edges that do
# not begin from the current vertex leaves you in the current vertex.
# Letting the process "wait" by "taking" an edge not connected to the
# current state can be problematic when there are negative weights.
# If there are negative weights, the MDP seems to want to wait unless
# it gets punished for doing so, since 0 reward is better than -x.
# To prevent this behavior, rewards for taking actions that correspond
# with moving on edges not connected to the current vertex are deeply
# negative. If the range of rewards for the automaton are lower than
# this value, it will have to be changed, so it's a parameter with
# default of -1000.
# The r matrix, itself, has the same shape as the t matrix, and each
# entry in r provides the reward for taking the transition in t that has
# the same index. E.g. if moving from vertex 3 to vertex 1 via edge 2
# has the reward of 5, then r[2, 3, 1] = 5. For "wait" actions, the
# reward is equal to the punish value, e.g. r[2, j, j] = -1000 where
# j != 1. NOTE: Currently, all elements of r are set to a value (either
# a valid weight, or the punish value). Rewards associated with elements
# of t where the transition probability is 0 may also be set to 0 if we
# want to switch to sparse matrix representations.
if self.prob:
actions = set(self.graph.es.get_attribute_values("action"))
t = np.zeros((len(actions), vcount, vcount))
r = np.full((len(actions), vcount, vcount), punish)
else:
t = np.zeros((ecount, vcount, vcount))
r = np.full((ecount, vcount, vcount), punish)
# mod negates the weights of the system if we're looking for the worst
# possible execution (except punishment weights, otherwise the system
# would do nothing at all).
mod = 1
if not best:
mod = -1
if self.prob:
# we're dealing with an actual MDP! Construct the mdp differently
# start by getting a list of all actions
actions = set(self.graph.es.get_attribute_values("action"))
# for each action...
for i, action in enumerate(actions):
# find the edges that are in that action
edges = self.graph.es.select(action_eq=action)
# for each such edge...
for j, edge in enumerate(edges):
tup = edge.tuple
# set the transition probability for action i for this edge to its prob
t[i, tup[0], tup[1]] = edge["prob"]
r[i, tup[0], tup[1]] = edge["weight"] * mod
# sometimes an action may not be conditionally possible, so check for rows in t that are all zeros
# in which case create a self-transition with probability 1 and reward = punish
for k in range(vcount):
if sum(t[i, k]) == 0:
t[i, k, k] = 1
r[i, k, k] = punish
else:
# This loop iterates through the edges in the graph so each transition
# matrix can be provided for every edge.
# for each edge...
for i, edge in enumerate(self.graph.es):
tup = edge.tuple
# ... for each vertex considered as source...
for j in range(vcount):
# ... if this vertex actually is the source of this edge...
if j == tup[0]:
# ... the transition probability from source to target is 1
t[i, tup[0], tup[1]] = 1
else:
# ... otherwise, taking this edge is a "wait" action.
t[i, j, j] = 1
# ... change the reward corresponding to actually taking the edge.
r[i, tup[0], tup[1]] = edge["weight"] * mod
return (t, r)
def checkCTL(self, file, x, verbose=False):
"""
Checks the automaton for a given CTL specification
:param file:
:param x:
:param verbose:
:return:
"""
# convert graph to nuXmv model
self.convertToNuXmv(file, x)
# nuxmv = "nuXmv"
# TODO: extract this and make it easier to change
# nuxmv = "E:\\Programs\\nuXmv-2.0.0-win64\\bin\\nuXmv.exe"
# nuxmv = "/home/colin/Downloads/nuXmv-2.0.0-Linux/bin/nuXmv"
nuxmv = "nuXmv"
# with open("cmd.txt", 'w') as f:
# f.write("read_model -i " + file + "\n")
# f.write("flatten_hierarchy\n")
# f.write("encode_variables\n")
# f.write("build_model\n")
# f.write("check_ctlspec -p \"" + x + "\"")
# out = subprocess.run([nuxmv, "-source", "cmd.txt", file], shell=True, stdout=subprocess.PIPE)
# out = subprocess.run([nuxmv, file], shell=True, stdout=subprocess.PIPE)
out = subprocess.run([nuxmv, file], stdout=subprocess.PIPE)
check = "true" in str(out.stdout)
if verbose:
print(out.stdout)
return check
def checkLTL(self, file, x, verbose=False):
"""
Checks the automaton for a given LTL specification
:param file:
:param x:
:param verbose:
:return:
"""
# convert graph to nuXmv model
self.convertToNuXmv(file, x, lang="LTL")
# TODO: extract this and make it easier to change
# nuxmv = "E:\\Programs\\nuXmv-2.0.0-win64\\bin\\nuXmv.exe"
# nuxmv = "/home/colin/Downloads/nuXmv-2.0.0-Linux/bin/nuXmv"
nuxmv = "nuXmv"
out = subprocess.run([nuxmv, file], stdout=subprocess.PIPE)
# TODO: a more robust "api"
# - this shouldn't fail because nuXmv only prints "true" (lower case) when the spec is satisfied, or if a state
# name or label is called "true", but the way convertToNuXmv is written, no state, label, or name can be "true"
check = "true" in str(out.stdout)
if verbose:
print(out.stdout)
return check
def checkPCTL(self, m_file, p_file, x, verbose=False):
"""
Checks the automaton for a given PCTL specification
:param m_file:
:param p_file:
:param x:
:param verbose:
:return:
"""
# convert graph to PRISM model
# TODO: switch to Storm to minimize IO operations for (potential) massive speed-up
self.convertToPRISM(m_file, p_file, x)
# TODO: extract this and make it easier to change
prism = "/home/colin/prism-4.7-linux64/bin/prism"
out = subprocess.run([prism, m_file, p_file], stdout=subprocess.PIPE)
check = "true" in str(out.stdout)
if verbose:
print(out.stdout)
return check
# TODO: checkLTL by lang="LTL", tag a spec with CTL or LTL
def checkToCTL(self, file, x, negate=False, verbose=False):
"""
Checks an automaton for a CTL specification, given an LTL specification.
:param file:
:param x:
:param negate:
:param verbose:
:return:
"""
if negate:
return self.checkCTL(file, '!E' + x, verbose=verbose)
else:
return self.checkCTL(file, 'A' + x, verbose=verbose)
def convertToNuXmv(self, file, x=None, lang="CTL", return_string=False):
"""
Produces a NuXmv input file specifying this automaton.
:param file:
:param x:
:param return_string:
:return:
"""
open_mode = 'w'
out_str = ""
with open(file, open_mode) as f:
f.write("MODULE main\n\n")
self._writeStatesNuXmv(f)
self._writeNamesNuXmv(f)
self._writeVarsNuXmv(f)
# begin ASSIGN constraint for state and name transitions
f.write("ASSIGN\n")
# States:
self._writeStateTransNuXmv(f)
# Names:
self._writeNameTransNuXmv(f)
# Properties:
self._writePropTransNuXmv(f)
# Specification
if x:
f.write(lang.upper() + "SPEC " + x + ";")
f.write("\n")
if return_string:
with open(file, 'r') as f:
out_str = f.read()
return out_str
def _writeStatesNuXmv(self, f):
sep = ', '
# include each vertex as a state in the model
states = [str(v.index) for v in self.graph.vs]
states = sep.join(states)
f.write("VAR state: {" + states + "};\n\n")
def _writeNamesNuXmv(self, f):
sep = ', '
# since multiple states can be associated with the same state of a
# smaller original automaton, we want to track what that original
# state is with a name variable
names = self.graph.vs["name"]
# remove duplicates from names
names = list(set(names))
# add names variable to model
names = sep.join(names)
f.write("VAR name: {" + names + "};\n\n")
def _writeStateTransNuXmv(self, f):
sep = ', '
# set initial state
f.write(" init(state) := " + str(self.q0) + ";\n")
# define state transitions
f.write(" next(state) :=\n")
f.write(" case\n")
# for each vertex...
for v in self.graph.vs:
# ... get a string representation of all the vertex's successors
next_v = [str(vx.index) for vx in v.neighbors(mode=OUT)]
# and a string rep of this vertex
state = str(v.index)
# and write out the transitions to the case
if next_v:
next_v = sep.join(next_v)
f.write(" state = " + state + " : {" + next_v + "};\n")
# default case
f.write(" TRUE : state;\n")
f.write(" esac;\n")
f.write("\n")
def _writeNameTransNuXmv(self, f):
# set initial name
init_name = self.graph.vs["name"][self.q0]
f.write(" init(name) := " + str(init_name) + ";\n")
# define name transitions
f.write(" next(name) :=\n")
f.write(" case\n")
# for each vertex...
for v in self.graph.vs:
# ... get that vertex's name
v_name = v["name"]
# and a string rep of this vertex
state = str(v.index)
# and write out the transitions to the case based on next state
f.write(" next(state) = " + state + " : " + v_name + ";\n")
# default case
f.write(" TRUE : name;\n")
f.write(" esac;\n")
f.write("\n")
def _writeVarsNuXmv(self, f):
# if auto has a counter
if self.counter:
# ... then write the counter var
c_name = str(self.counter[0])
start = str(self.counter[1])
end = str(self.counter[2])
f.write("VAR " + c_name + " : " + start + " .. " + end + ";\n\n")
# if auto has labels
if 'label' in self.graph.vs.attributes():
# ... then write the label var
labels_size = str(len(self.propositions))
f.write("VAR label: unsigned word[" + labels_size + "];\n\n")
def _writePropTransNuXmv(self, f):
# if auto has a counter
if self.counter:
# ... then write the counter transitions
c = str(self.counter[0])
t = str(self.counter[2])
f.write(" init(" + c + ") := " + str(self.counter[1]) + ";\n")
f.write(" next(" + c + ") := (" + c + "<" + t + ")?(" + c + "+1):(" +
c + ");\n\n")
# if auto has labels
if 'label' in self.graph.vs.attributes():
# set up translation between label strings and bit strings.
# bit strings get used in nuXmv to represent which propositions a state has.
# e.g. if the language has 4 propositions ['p', 'q', 'r', 's'], and state 0 has propositions "p, s",
# then the bit string representing the labels of state 0 would be 0ub8_1001.
word_size = len(self.propositions)
no_labels = [0] * len(self.propositions)
prop_dict = {}
for i, prop in enumerate(self.propositions):
label_bits = copy(no_labels)
label_bits[i] = 1
prop_dict[prop] = label_bits
# set initial label
# get the string representation of state labels, e.g. "p, s"
init_props = self.graph.vs["label"][self.q0].split(', ')
# get a list of bit string representations for each proposition, e.g. [[1, 0, 0, 0], [0, 0, 0, 1]]
init_props_bits = [prop_dict[prop] for prop in init_props]
# combine those bit string representations into a single bit string, e.g. ['1', '0', '0', '1']
init_label = [str(sum(bits)) for bits in zip(*init_props_bits)]
# join those bits into a string, e.g. "1001"
init_label_str = ''.join(init_label)
f.write(" init(label) := 0ub" + str(word_size) + "_" + str(init_label_str) + ";\n")
# define label transitions
f.write(" next(label) :=\n")
f.write(" case\n")
# for each vertex...
for v in self.graph.vs:
# ... get that vertex's propositions
v_props = v["label"].split(', ')
# then a list of bits
v_props_bits = [prop_dict[prop] for prop in v_props]
# combine those into a bit string
v_label = [str(sum(bits)) for bits in zip(*v_props_bits)]
v_label_str = ''.join(v_label)
# and get a string rep of this vertex
state = str(v.index)
# and write out the transitions to the case based on next state
f.write(" next(state) = " + state + " : 0ub" + str(word_size) + "_" + v_label_str + ";\n")
# default case
f.write(" TRUE : label;\n")
f.write(" esac;\n")
f.write("\n")
# define relationship between bit words and original propositions
f.write("DEFINE\n")
for prop in self.propositions:
# build logical equivalence string
logical_bits = prop_dict[prop]
logical_bits = ''.join([str(bit) for bit in logical_bits])
f.write(" " + str(prop) + " := (0ub" + str(word_size) + "_" + logical_bits + " & label) = 0ub" +
str(word_size) + "_" + logical_bits + ";\n")
f.write("\n")
def convertToPRISM(self, m_file, p_file, x):
"""
Produces a PRISM model file specifying this automaton, and a properties file specifying the formula
:param m_file:
:param p_file:
:param x:
:return:
"""
with open(m_file, 'w') as f:
f.write("mdp\n")
f.write("module main\n")
states = [v.index for v in self.graph.vs]
names = self.graph.vs["name"]
names = list(set(names))
names = [int(namei) for namei in names]
# make a state variable that goes from 0 to number of states; init q0.id
f.write("state : [0.." + str(max(states)) + "] init " + str(self.q0) + ";\n")
# make a name variable that goes from 0 to maximum name; init q0.name
f.write("name : [0.." + str(max(names)) + "] init " + self.graph.vs["name"][self.q0] + ";\n")
# for each vertex...
for v in self.graph.vs:
# for each action at vertex...
for k in self.k(v):
# initialize command string
command = " [] state=" + str(v.index) + " -> "
plus = ""
# get the edges from this vertex that are part of this action
esi = self.graph.es.select(_source_eq=v, action_eq=k)
# for each edge in that action...
for e in esi:
prob = str(e["prob"])
tgt_id = str(e.target_vertex.index)
tgt_nm = str(e.target_vertex["name"])
command += plus + prob + " : (state'=" + tgt_id + ")&(name'=" + tgt_nm + ")"
if not plus:
plus = " + "
command += ";\n"
f.write(command)
f.write("\n")
f.write("endmodule")
with open(p_file, 'w') as f:
# Specification
f.write(x)
f.write("\n")
def convertToMatrix(self, labeled_mat=False):
"""
Get a matrix representation of the automaton augmented with the labels on each state; e.g. for a two state
automaton with two transitions: one from the first state to the second state, and one from the second state to
itself. The first state is labeled "a, b" and the second state is labeled "a, c".
if labeled_mat is True, then the output is like [['0', 'a, c'], ['0', 'a, c']].
otherwise: [("a, b", [0, 1]), ("a, c", [0, 1])]
:return:
"""
mat = np.array(self.graph.get_adjacency().data)
labels = self.graph.vs["label"]
if labeled_mat:
return np.where(np.array(mat) > 0, labels, 0)
mat = np.where(mat > 0, 1, 0).tolist()
# TODO: add a method to minimize output (e.g., remove quotes, brackets, parens)
return list(zip(labels, mat))
class Obligation(object):
"""
Contains an obligation in Dominance Act Utilitarian deontic logic
"""
def __init__(self, phi, is_ctls, is_neg, is_pctl=False):
"""
Creates an Obligation object
:param phi:
:param is_ctls:
:param is_neg:
"""
self.phi = phi
self.is_ctls = is_ctls
self.is_neg = is_neg
self.is_stit = not is_ctls
self.is_pctl = is_pctl
self.phi_neg = False
@classmethod
def fromCTL(cls, phi):
"""
Creates an Obligation object from a CTL string
:param phi:
:return:
"""
return cls(phi, True, False)
@classmethod
def fromPCTL(cls, phi):
"""
Creates an Obligation object from a PCTL string
:param phi:
:return:
"""
return cls(phi, False, False, True)
def isCTLS(self):
"""
Checks if obligation is a well-formed CTL* formula
:return:
"""
# TODO: use grammar to check this
return self.is_ctls
def isPCTL(self):
"""
Checks if obligation is a well-formed PCTL formula
:return:
"""
# TODO: use grammar to check this
return self.is_pctl
def isSTIT(self):
"""
Checks if obligation is a well-formed dstit statement
:return:
"""
# TODO: use grammar to check this
return self.is_stit
def isNegSTIT(self):
"""
Checks if obligation is of the form ![alpha dstit: phi]
:return:
"""
return self.is_stit and self.is_neg
def getPhi(self):
"""
Gets the inner formula of the obligation
:return:
"""
return self.phi
def checkObligation(g, a, verbose=False):
"""
Check an automaton for if it has a given obligation.
:param g:
:param a:
:param verbose:
:return:
"""
# return checkConditional with trivial condition params
return checkConditional(g, a, "TRUE", 0, verbose=verbose)
# TODO: refactor checkConditional into smaller functions so I can use some of the juicy bits elsewhere
def checkConditional(g, a, x, t, verbose=False):
"""
Check an automaton for if it has a given obligation under a given condition.
:param g:
:param a:
:param x:
:param t:
:param verbose:
:return:
"""
optimal = get_optimal_automata(g, t, x, verbose)
for m in optimal:
truth_n = True
if verbose:
print(m[0])
if a.isCTLS():
# truth_n = m[1].checkToCTL('temp.smv', a.getPhi(), a.phi_neg,
# verbose=verbose)
truth_n = m[1].checkCTL('temp.smv', a.getPhi(), a.phi_neg)
elif a.isPCTL():
truth_n = m[1].checkPCTL('temp.sm', 'temp.pctl', a.getPhi())
elif a.isSTIT():
phi = a.getPhi()
if not a.isNegSTIT():
delib = not g.checkToCTL('temp.smv', phi, a.phi_neg,
verbose=verbose)
guaranteed = m[1].checkToCTL('temp.smv', phi, a.phi_neg,
verbose=verbose)
if verbose:
print("deliberate: ", delib)
print("guaranteed: ", guaranteed)
truth_n = delib and guaranteed
else:
not_delib = g.checkToCTL('temp.smv', phi, a.phi_neg,
verbose=verbose)
guaranteed = m[1].checkToCTL('temp.smv', phi, a.phi_neg,
verbose=verbose)
if verbose:
print("not deliberate: ", not_delib)
print("not guaranteed: ", not guaranteed)
truth_n = not_delib or not guaranteed
else:
raise ValueError(
'The given obligation was not a well formed CTL* formula, ' +
'nor a well formed deliberative STIT statement.',
a)
if not truth_n:
return False
return True