Skip to content

Commit

Permalink
Merge pull request zhm-real#1 from sldai/master
Browse files Browse the repository at this point in the history
unify bfs, dfs, Dijkstra, best_first, Astar
  • Loading branch information
zhm-real authored Aug 21, 2020
2 parents 538e218 + cfed232 commit a53646b
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 326 deletions.
4 changes: 3 additions & 1 deletion Search_based_Planning/Search_2D/Astar.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
"/../../Search_based_Planning/")

from Search_based_Planning.Search_2D import plotting, env
from Search_2D import plotting, env


class AStar:
"""AStar set the cost + heuristics as the priority
"""
def __init__(self, s_start, s_goal, heuristic_type):
self.s_start = s_start
self.s_goal = s_goal
Expand Down
103 changes: 20 additions & 83 deletions Search_based_Planning/Search_2D/Best_First.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,116 +11,53 @@
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
"/../../Search_based_Planning/")

from Search_based_Planning.Search_2D import plotting, env
from Search_2D import plotting, env
from Search_2D.Astar import AStar


class BestFirst:
def __init__(self, s_start, s_goal):
self.s_start = s_start
self.s_goal = s_goal

self.Env = env.Env()
self.plotting = plotting.Plotting(self.s_start, self.s_goal)

self.u_set = self.Env.motions # feasible input set
self.obs = self.Env.obs # position of obstacles

self.OPEN = [] # OPEN set: visited nodes
self.CLOSED = [] # CLOSED set / visited order
self.PARENT = dict() # recorded parent

class BestFirst(AStar):
"""BestFirst set the heuristics as the priority
"""
def searching(self):
"""
Best-first Searching
:return: planning path, visited order
Breadth-first Searching.
:return: path, visited order
"""

self.PARENT[self.s_start] = self.s_start
self.g[self.s_start] = 0
self.g[self.s_goal] = math.inf
heapq.heappush(self.OPEN,
(self.heuristic(self.s_start), self.s_start))

while self.OPEN:
_, s = heapq.heappop(self.OPEN)
self.CLOSED.append(s)

if s == self.s_goal:
break
self.CLOSED.append(s)

for s_n in self.get_neighbor(s):
if self.is_collision(s, s_n):
continue

if s_n not in self.PARENT: # node not explored
heapq.heappush(self.OPEN, (self.heuristic(s_n), s_n))
self.PARENT[s_n] = s

return self.extract_path(), self.CLOSED

def heuristic(self, s):
"""
estimated distance between current state and goal state.
:param s: current state
:return: Euclidean distance
"""

return math.hypot(s[0] - self.s_goal[0], s[1] - self.s_goal[1])

def get_neighbor(self, s):
"""
find neighbors of state s that not in obstacles.
:param s: state
:return: neighbors
"""

return [(s[0] + u[0], s[1] + u[1]) for u in self.u_set]

def is_collision(self, s_start, s_end):
"""
check if the line segment (s_start, s_end) is collision.
:param s_start: start node
:param s_end: end node
:return: True: is collision / False: not collision
"""

if s_start in self.obs or s_end in self.obs:
return True

if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
else:
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
new_cost = self.g[s] + self.cost(s, s_n)

if s1 in self.obs or s2 in self.obs:
return True
if s_n not in self.g:
self.g[s_n] = math.inf

return False

def extract_path(self):
"""
Extract the path based on the relationship of nodes.
:return: The planning path
"""

path = [self.s_goal]
s = self.s_goal
if new_cost < self.g[s_n]: # conditions for updating Cost
self.g[s_n] = new_cost
self.PARENT[s_n] = s

while True:
s = self.PARENT[s]
path.append(s)
if s == self.s_start:
break
# best first set the heuristics as the priority
heapq.heappush(self.OPEN, (self.heuristic(s_n), s_n))

return list(path)
return self.extract_path(self.PARENT), self.CLOSED


def main():
s_start = (5, 5)
s_goal = (45, 25)

BF = BestFirst(s_start, s_goal)
BF = BestFirst(s_start, s_goal, 'euclidean')
plot = plotting.Plotting(s_start, s_goal)

path, visited = BF.searching()
Expand Down
100 changes: 15 additions & 85 deletions Search_based_Planning/Search_2D/Dijkstra.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,25 @@
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
"/../../Search_based_Planning/")

from Search_based_Planning.Search_2D import plotting, env
from Search_2D import plotting, env

from Search_2D.Astar import AStar

class Dijkstra:
def __init__(self, s_start, s_goal):
self.s_start = s_start
self.s_goal = s_goal

self.Env = env.Env()
self.plotting = plotting.Plotting(self.s_start, self.s_goal)

self.u_set = self.Env.motions # feasible input set
self.obs = self.Env.obs # position of obstacles

self.OPEN = [] # priority queue / OPEN set
self.CLOSED = [] # closed set & visited
self.PARENT = dict() # record parent
self.g = dict() # Cost to come

class Dijkstra(AStar):
"""Dijkstra set the cost as the priority
"""
def searching(self):
"""
Dijkstra Searching.
Breadth-first Searching.
:return: path, visited order
"""

self.PARENT[self.s_start] = self.s_start
self.g[self.s_start] = 0
self.g[self.s_goal] = math.inf
heapq.heappush(self.OPEN, (0, self.s_start))
heapq.heappush(self.OPEN,
(0, self.s_start))

while self.OPEN:
_, s = heapq.heappop(self.OPEN)
Expand All @@ -50,85 +40,25 @@ def searching(self):

for s_n in self.get_neighbor(s):
new_cost = self.g[s] + self.cost(s, s_n)

if s_n not in self.g:
self.g[s_n] = math.inf
if new_cost < self.g[s_n]:

if new_cost < self.g[s_n]: # conditions for updating Cost
self.g[s_n] = new_cost
heapq.heappush(self.OPEN, (new_cost, s_n))
self.PARENT[s_n] = s

return self.extract_path(), self.CLOSED

def get_neighbor(self, s):
"""
find neighbors of state s that not in obstacles.
:param s: state
:return: neighbors
"""

return [(s[0] + u[0], s[1] + u[1]) for u in self.u_set]

def extract_path(self):
"""
Extract the path based on PARENT set.
:return: The planning path
"""

path = [self.s_goal]
s = self.s_goal

while True:
s = self.PARENT[s]
path.append(s)

if s == self.s_start:
break

return list(path)

def cost(self, s_start, s_goal):
"""
Calculate Cost for this motion
:param s_start: starting node
:param s_goal: end node
:return: Cost for this motion
"""

if self.is_collision(s_start, s_goal):
return math.inf

return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])

def is_collision(self, s_start, s_end):
"""
check if the line segment (s_start, s_end) is collision.
:param s_start: start node
:param s_end: end node
:return: True: is collision / False: not collision
"""

if s_start in self.obs or s_end in self.obs:
return True

if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
else:
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))

if s1 in self.obs or s2 in self.obs:
return True
# best first set the heuristics as the priority
heapq.heappush(self.OPEN, (new_cost, s_n))

return False
return self.extract_path(self.PARENT), self.CLOSED


def main():
s_start = (5, 5)
s_goal = (45, 25)

dijkstra = Dijkstra(s_start, s_goal)
dijkstra = Dijkstra(s_start, s_goal, 'None')
plot = plotting.Plotting(s_start, s_goal)

path, visited = dijkstra.searching()
Expand Down
Loading

0 comments on commit a53646b

Please sign in to comment.