Skip to content

Commit

Permalink
Working Lean Refinement Proof dump
Browse files Browse the repository at this point in the history
  • Loading branch information
Eric-Vin committed Jan 22, 2025
1 parent d5d5f80 commit f7dd545
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 65 deletions.
45 changes: 26 additions & 19 deletions examples/contracts/dev.contract
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ component ControlSystem(target_dist, max_speed, min_dist, max_brake):
heading: float

outputs:
foo: float
control_action: RegulatedControlAction

compose:
Expand All @@ -188,9 +187,6 @@ component ControlSystem(target_dist, max_speed, min_dist, max_brake):
pid_ss = PIDSteeringSystem()
ag = ActionGenerator()

# DEBUG
connect dist to foo

# Connect sensors inputss
connect dist to pid_ts.dist
connect speed to pid_ts.speed
Expand Down Expand Up @@ -284,7 +280,7 @@ contract AccurateRelativeSpeeds(perception_distance, abs_dist_err=0.5, abs_speed

# Lead speed can be roughly calculated by comparing two timesteps
relative_speed = ((next dist) - dist)/self.timestep
true_relative_speed = lead_car.speed - self.speed
true_relative_speed = self.speed - lead_car.speed

assumptions:
# Assume we are in a lane and have relatively accurate distance
Expand All @@ -299,7 +295,7 @@ contract AccurateRelativeSpeeds(perception_distance, abs_dist_err=0.5, abs_speed
# Guarantee that the perceived speed of the lead car is relatively accurate
always (behind_car implies (relative_speed-abs_speed_err <= true_relative_speed <= relative_speed+abs_speed_err))

contract SafeThrottleFilter(perception_distance, min_dist, abs_dist_err=0.5, abs_speed_err=0.5, max_brake=float("inf")):
contract SafeThrottleFilter(perception_distance, min_dist, max_speed, abs_dist_err=0.5, abs_speed_err=0.5, max_brake=float("inf"), max_accel=float("inf")):
""" A contract stating that if inputs indicate we are too close for safety, we issue a braking action."""
inputs:
dist: float
Expand All @@ -310,17 +306,23 @@ contract SafeThrottleFilter(perception_distance, min_dist, abs_dist_err=0.5, abs
modulated_throttle: float

definitions:
cars = [obj for obj in objects if hasattr(obj, "isVehicle") and obj.isVehicle and obj.position is not self.position]
lead_distances = {car: leadDistance(self, car, workspace.network, maxDistance=2*perception_distance) for car in cars}
lead_car = sorted(cars, key=lambda x: lead_distances[x])[0]

# Lead speed can be roughly calculated by comparing two timesteps
relative_speed = ((next dist) - dist)/self.timestep if (next(next dist)) else ((next dist) - dist)/self.timestep
true_relative_speed = lead_car.speed - speed
true_relative_speed = self.speed - lead_car.speed

stopping_time = ceil(speed/max_brake)
rel_dist_covered = stopping_time*(speed + abs_speed_err)
buffer_dist = min_dist + rel_dist_covered
stopping_time = ceil(self.speed/max_brake)
rel_dist_covered = stopping_time * (true_relative_speed+abs_speed_err)
delta_stopping_time = ceil((self.speed+max_accel)/max_brake)
max_rdc_delta = delta_stopping_time * (true_relative_speed + max_brake + max_accel + abs_speed_err) - rel_dist_covered
buffer_dist = min_dist + (max(0, max_rdc_delta + rel_dist_covered)) + max_speed + 1

guarantees:
# Guarantee that if we sense we are too close to the car in front of us, we brake with max force.
always (dist < buffer_dist+abs_dist_error) implies (modulated_throttle == -1)
always (dist <= buffer_dist+abs_dist_err) implies (modulated_throttle == -1)

contract PassthroughBrakingActionGenerator():
""" A contract stating that the outputted action represents the input throttle"""
Expand All @@ -347,7 +349,7 @@ contract MaxBrakingForce(max_brake):

guarantees:
always ((no_throttle and full_brake)
implies (self.speed == 0 or ((next self.speed) == self.speed - max_brake))
implies (((next self.speed) == 0) or ((next self.speed) == self.speed - max_brake))
)

## Overall System Spec ##
Expand All @@ -364,13 +366,13 @@ contract KeepsDistance(perception_distance, min_dist, max_speed, abs_dist_err, a
lead_dist = lead_distances[lead_car]
behind_car = lead_dist <= perception_distance

true_relative_speed = lead_car.speed - self.speed
true_relative_speed = self.speed - lead_car.speed

stopping_time = ceil(self.speed/max_brake)
rel_dist_covered = stopping_time*self.speed + (true_relative_speed)
rel_dist_covered = stopping_time * (true_relative_speed+abs_speed_err)
delta_stopping_time = ceil((self.speed+max_accel)/max_brake)
max_rdc_delta = delta_stopping_time*self.speed + (true_relative_speed + max_brake + max_accel + abs_speed_err)
buffer_dist = min_dist + (max(0, max_rdc_delta + rel_dist_covered))
max_rdc_delta = delta_stopping_time * (true_relative_speed + max_brake + max_accel + abs_speed_err) - rel_dist_covered
buffer_dist = min_dist + (max(0, max_rdc_delta + rel_dist_covered)) + max_speed + 1

assumptions:
# Assume we are in a lane
Expand All @@ -389,14 +391,16 @@ contract KeepsDistance(perception_distance, min_dist, max_speed, abs_dist_err, a
always -max_brake <= (next lead_car.speed) - lead_car.speed <= max_accel

# Assume we start stopped outside the danger zone
(lead_dist > (buffer_dist+10)) and (self.speed == 0)
(lead_dist > buffer_dist) and (self.speed == 0)

# Assume no one cuts in front us too closely
always ((next lead_dist) == (lead_dist - true_relative_speed))

always (self.timestep == 0.1)

guarantees:
# Guarantee that we always stay at least min_dist behind the lead car
always behind_car implies (lead_dist > min_dist)
always lead_dist > min_dist

## Verification Steps ##
# Constants
Expand Down Expand Up @@ -438,8 +442,11 @@ cs_safety = compose over car.cs:
assume car.cs.tsf satisfies SafeThrottleFilter(
PERCEPTION_DISTANCE,
MIN_DIST,
MAX_SPEED,
abs_dist_err=ABS_DIST_ERR,
abs_speed_err=ABS_SPEED_ERR,
max_brake=MAX_BRAKE)
max_brake=MAX_BRAKE,
max_accel=MAX_ACCEL)

# TODO: Replace with proof
assume car.cs.ag satisfies PassthroughBrakingActionGenerator()
Expand Down
36 changes: 33 additions & 3 deletions src/scenic/contracts/composition.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from copy import deepcopy
from functools import cached_property
import itertools
from math import prod

from scenic.contracts.components import ActionComponent, BaseComponent, ComposeComponent
from scenic.contracts.contracts import ContractResult, VerificationTechnique
import scenic.contracts.specifications as specs
from scenic.syntax.compiler import NameFinder, NameSwapTransformer


Expand Down Expand Up @@ -135,7 +137,9 @@ def __init__(
assumption_decoding_transformer = NameSwapTransformer(assumption_decoding_map)

guarantee_decoding_map = {}
for port in self.component.outputs_types.keys():
for port in itertools.chain(
self.component.outputs_types.keys(), self.component.inputs_types.keys()
):
guarantee_decoding_map[self.encoding_map[((port, None))]] = port
guarantee_decoding_transformer = NameSwapTransformer(guarantee_decoding_map)

Expand Down Expand Up @@ -223,15 +227,41 @@ def __init__(
assert len(self.extractTempVars(spec.getAtomicNames())) == 0

# TODO: Replace this with assertion above when PACTI implemented
tl_guarantees = [
valid_guarantees = [
spec
for spec in tl_guarantees
if len(self.extractTempVars(spec.getAtomicNames())) == 0
]

## TODO: Deprecate below with PACTI integration
## Try to salvage other guarantees
dropped_guarantees = [
spec
for spec in tl_guarantees
if len(self.extractTempVars(spec.getAtomicNames())) != 0
]

while dropped_guarantees:
g1 = dropped_guarantees.pop(0)

if isinstance(g1, specs.Always) and isinstance(g1.sub, specs.Implies):
for g2 in filter(
lambda x: isinstance(x, specs.Always)
and isinstance(x.sub, specs.Implies),
dropped_guarantees,
):
g1_a, g1_b = g1.sub.sub1, g1.sub.sub2
g2_a, g2_b = g2.sub.sub1, g2.sub.sub2

if g1_b == g2_a:
new_g = specs.Always(specs.Implies(g1_a, g2_b))

if len(self.extractTempVars(new_g.getAtomicNames())) == 0:
valid_guarantees.append(new_g)

## Store assumptions and guarantees
self._assumptions = tl_assumptions
self._guarantees = tl_guarantees
self._guarantees = valid_guarantees

# TODO: Clear atomic source syntax strings?

Expand Down
123 changes: 87 additions & 36 deletions src/scenic/contracts/refinement.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from copy import deepcopy
from functools import cached_property
import os
from pathlib import Path

from scenic.contracts.composition import Composition
Expand Down Expand Up @@ -61,7 +62,7 @@ def __init__(self, proof_loc):
self.proof_loc = proof_loc

def check(self, stmt, contract):
## Extract and standardize intermediate assumptions and guarantees
## Extract and standardize internal assumptions and guarantees
i_assumptions = []
i_guarantees = []
for sub_stmt in stmt.sub_stmts:
Expand All @@ -77,21 +78,21 @@ def check(self, stmt, contract):
## Add specs to accumulated i specs
i_guarantees += guarantees

## Simplify assumptions and guarantees, removing duplicates
new_i_assumptions = []
for spec in i_assumptions:
if not any(spec == e_spec for e_spec in new_i_assumptions):
new_i_assumptions.append(spec)
i_assumptions = new_i_assumptions

new_i_guarantees = []
for spec in i_guarantees:
if not (
any(spec == e_spec for e_spec in new_i_guarantees)
and not any(spec == e_spec for e_spec in new_i_assumptions)
):
new_i_guarantees.append(spec)
i_guarantees = new_i_guarantees
## Simplify assumptions and guarantees, removing duplicates
new_i_assumptions = []
for spec in i_assumptions:
if not any(spec == e_spec for e_spec in new_i_assumptions):
new_i_assumptions.append(spec)
i_assumptions = new_i_assumptions

new_i_guarantees = []
for spec in i_guarantees:
if not (
any(spec == e_spec for e_spec in new_i_guarantees)
and not any(spec == e_spec for e_spec in new_i_assumptions)
):
new_i_guarantees.append(spec)
i_guarantees = new_i_guarantees

## Extract and standardize refinement assumptions and guarantees
tl_assumptions = [deepcopy(spec) for spec in contract.assumptions]
Expand All @@ -114,27 +115,22 @@ def check(self, stmt, contract):
## Extract defs from all specs
defs = {}
for spec in i_assumptions + i_guarantees + tl_assumptions + tl_guarantees:
for d_name, d_spec in spec.getDefs():
raw_new_defs = spec.getDefs()

for d_name, d_spec in [
d for _, d in sorted(raw_new_defs, key=lambda x: x[0])
]:
if d_name not in defs:
defs[d_name] = d_spec
else:
if defs[d_name] != d_spec:
# TODO: Handle renames
assert False

print("Atomics:")
for a in atomics:
print(f" {a[0]}: {a[1]}")
print()

print("Defs:")
for d_name, d_spec in defs.items():
print(f" {d_name}: {d_spec}")
print()

## Setup proof directory
Path(self.proof_loc).mkdir(parents=True, exist_ok=True)

## Output Lean Data File
with open(self.proof_loc / "Lib.lean", "w") as f:
# Imports
f.write("import Sceniclean.Basic\n\n")
Expand All @@ -144,24 +140,24 @@ def check(self, stmt, contract):
f.write(" -- Props\n")
for i, a in enumerate(prop_atomics):
f.write(f" P{i}: Prop\n")
f.write(" -- Reals\n")
f.write(" -- Numbers\n")
for i, a in enumerate(real_atomics):
f.write(f" N{i}: Real\n")
f.write(f" N{i}: \n")
f.write("\n")

# Prop Signals
f.write("-- Prop Signals\n")
for i, a in enumerate(prop_atomics):
f.write(
f" abbrev {a.toLean()} : TraceSet TraceState := TraceSet.of (·.P{i})\n"
f"abbrev {a.toLean()} : TraceSet TraceState := TraceSet.of (·.P{i})\n"
)
f.write("\n")

# Real Signals
f.write("-- Real Signals\n")
# Numerical Signals
f.write("-- Numerical Signals\n")
for i, a in enumerate(real_atomics):
f.write(
f" abbrev {a.toLean()} : TraceSet TraceState := TraceSet.of (·.N{i})\n"
f"abbrev {a.toLean()} : TraceFun TraceState := TraceFun.of (·.N{i})\n"
)
f.write("\n")

Expand All @@ -171,11 +167,66 @@ def check(self, stmt, contract):
f.write(f"abbrev {d_name} := FLTL[{d_spec.toLean()}]\n")
f.write("\n")

breakpoint()
TOP = "\u22a4"

# Top Level Assumptions
f.write("-- Top Level Assumptions \n")
for i, a in enumerate(tl_assumptions):
f.write(f"abbrev A{i} := FLTL[{a.toLean()}]\n")
f.write("\n")
f.write(
f"abbrev assumptions : TraceSet TraceState := FLTL[{' ∧ '.join(f'A{i}' for i in range(len(tl_assumptions))) if tl_assumptions else TOP}]\n"
)
f.write("\n")

## Output Lean Library File
# Internal Assumptions
f.write("-- Internal Assumptions \n")
for i, a in enumerate(i_assumptions):
f.write(f"abbrev IA{i} := FLTL[{a.toLean()}]\n")
f.write("\n")
f.write(
f"abbrev i_assumptions : TraceSet TraceState := FLTL[{' ∧ '.join(f'IA{i}' for i in range(len(i_assumptions))) if i_assumptions else TOP}]\n"
)
f.write("\n")

breakpoint()
# Internal Guarantees
f.write("-- Internal Guarantees \n")
for i, g in enumerate(i_guarantees):
f.write(f"abbrev IG{i} := FLTL[{g.toLean()}]\n")
f.write("\n")
f.write(
f"abbrev i_guarantees : TraceSet TraceState := FLTL[{' ∧ '.join(f'IG{i}' for i in range(len(i_guarantees))) if i_guarantees else TOP}]\n"
)
f.write("\n")

# Guarantees
f.write("-- Top Level Guarantees \n")
for i, g in enumerate(tl_guarantees):
f.write(f"abbrev G{i} := FLTL[{g.toLean()}]\n")
f.write("\n")
f.write(
f"abbrev guarantees : TraceSet TraceState := FLTL[{' ∧ '.join(f'G{i}' for i in range(len(tl_guarantees))) if tl_guarantees else TOP}]\n"
)
f.write("\n")

## Output Lean Proof Files
if not os.path.exists(self.proof_loc / "AssumptionProof.lean"):
with open(self.proof_loc / "AssumptionProof.lean", "w") as f:
f.write("import Lib\n")
f.write("\n")
f.write(
"theorem imp_assumptions : FLTL[(assumptions)] ⇒ FLTL[i_assumptions] := by\n"
)
f.write(" sorry\n")

if not os.path.exists(self.proof_loc / "GuaranteesProof.lean"):
with open(self.proof_loc / "GuaranteesProof.lean", "w") as f:
f.write("import Lib\n")
f.write("\n")
f.write(
"theorem imp_guarantees : FLTL[(assumptions ∧ i_guarantees)] ⇒ FLTL[guarantees] := by\n"
)
f.write(" sorry\n")

def __str__(self):
return f"LeanProof ('{self.proof_loc}')"
Loading

0 comments on commit f7dd545

Please sign in to comment.