-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathastar.py
More file actions
202 lines (160 loc) · 5.69 KB
/
astar.py
File metadata and controls
202 lines (160 loc) · 5.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import math
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from time import time
from PIL import Image, ImageDraw
from heapq import heappop, heappush
from map import Map
from Constraints import Constraint_step, Constraints
import typing as tp
def compute_cost(i1: int, j1: int, i2: int, j2: int) -> float:
    '''
    Computes the cost of a single move between two grid cells.

    - cardinal move (exactly one coordinate changes by 1): cost 1
    - diagonal move (both coordinates change by 1): cost sqrt(2)
    - staying in the same cell: cost 0

    Raises ValueError (a subclass of Exception) for any other pair of cells.
    '''
    di = abs(i1 - i2)
    dj = abs(j1 - j2)
    if di + dj == 1:  # cardinal move
        return 1
    if di == 1 and dj == 1:  # diagonal move
        return np.sqrt(2)
    if di == 0 and dj == 0:  # no movement
        return 0
    # The message lists every move kind that is actually accepted above.
    raise ValueError(
        'Trying to compute the cost of non-supported move! '
        'ONLY cardinal, diagonal and stay-in-place moves are supported: '
        f'({i1}, {j1}) -> ({i2}, {j2}).')
def distance(i1: int, j1: int, i2: int, j2: int) -> float:
    '''
    Distance between (i1, j1) and (i2, j2) when a straight step costs 1
    and a diagonal step costs sqrt(2).
    '''
    di = abs(i1 - i2)
    dj = abs(j1 - j2)
    # The smaller offset is covered by diagonal steps, the rest by straight ones.
    diagonal_steps = min(di, dj)
    straight_steps = max(di, dj) - diagonal_steps
    return straight_steps + diagonal_steps * np.sqrt(2)
def octile(i1: int, j1: int, i2: int, j2: int) -> float:
    '''
    Octile distance between (i1, j1) and (i2, j2): straight steps cost 1,
    diagonal steps cost sqrt(2).
    '''
    row_gap = abs(i2 - i1)
    col_gap = abs(j2 - j1)
    if row_gap < col_gap:
        shorter, longer = row_gap, col_gap
    else:
        shorter, longer = col_gap, row_gap
    # (longer - shorter) straight steps plus `shorter` diagonal steps.
    return (longer - shorter) + np.sqrt(2) * shorter
class Node:
    '''
    Node class represents a search node of a time-expanded grid search.

    - i, j: coordinates of corresponding grid element
    - g: g-value of the node
    - h: h-value of the node
    - time: 0 for a node without a parent, parent.time + 1 otherwise
    - f: priority key of the node; time + h when a parent is given,
         0 for a node without a parent
    - parent: pointer to the parent-node

    Two nodes are considered the same search state when i, j and time match.
    '''

    def __init__(self, i, j, g=0, h=0, f=None, parent=None, tie_breaking_func=None):
        # NOTE: `f` and `tie_breaking_func` are accepted so existing call
        # sites keep working, but neither value is used by this class.
        self.i = i
        self.j = j
        self.g = g
        self.h = h
        self.time = 0
        self.f = 0
        if parent is not None:
            self.time = parent.time + 1
            self.f = self.time + h
        self.parent = parent

    def __eq__(self, other):
        '''
        Estimating whether the two search nodes are the same,
        which is needed to detect duplicates in the search tree.

        Returns NotImplemented for anything that is not a Node, so that
        comparisons such as `node == None` / `node != None` evaluate to
        False / True instead of raising AttributeError.
        '''
        if not isinstance(other, Node):
            return NotImplemented
        return self.i == other.i and self.j == other.j and self.time == other.time

    def __hash__(self):
        '''
        To implement CLOSED as set of nodes we need Node to be hashable.
        The hash uses the same (i, j, time) triple that __eq__ compares.
        '''
        return hash((self.i, self.j, self.time))

    def __lt__(self, other):
        '''
        Comparing the keys (i.e. the f-values) of two nodes,
        which is needed to sort/extract the best element from OPEN.
        Equal f-values are ordered by the smaller g-value.
        '''
        if self.f == other.f:
            return self.g < other.g
        return self.f < other.f

    def __repr__(self) -> str:
        return f"{self.i} {self.j} {self.time}"
class SearchTreePQS:
    '''
    Search tree that keeps OPEN as a binary heap (priority queue)
    and CLOSED as a set of already expanded nodes.
    '''

    def __init__(self):
        self._closed = set()  # expanded nodes = CLOSED
        self._open = []       # heap-ordered list of generated nodes = OPEN

    def __len__(self):
        # Total number of stored entries, OPEN and CLOSED together.
        open_size = len(self._open)
        closed_size = len(self._closed)
        return open_size + closed_size

    def open_is_empty(self):
        '''
        Tells whether OPEN is exhausted or not.
        When it is, the main loop of the search should be interrupted.
        '''
        return len(self._open) == 0

    def add_to_open(self, item):
        '''Pushes item onto the OPEN heap.'''
        heappush(self._open, item)

    def get_best_node_from_open(self):
        '''
        Pops entries from OPEN until one that was not expanded yet shows up
        and returns it. Returns None when OPEN runs out first.
        '''
        while self._open:
            candidate = heappop(self._open)
            if self.was_expanded(candidate):
                continue  # stale entry, already in CLOSED
            return candidate
        return None

    def add_to_closed(self, item):
        '''Marks item as expanded.'''
        self._closed.add(item)

    def was_expanded(self, item):
        '''True when item is already in CLOSED.'''
        return item in self._closed

    @property
    def OPEN(self):
        return self._open

    @property
    def CLOSED(self):
        return self._closed
def astar(
        grid_map: Map,
        start_i: int,
        start_j: int,
        goal_i: int,
        goal_j: int,
        agent_index: int,
        constraints: Constraints,
        heuristic_func: tp.Callable,
        search_tree: tp.Type[SearchTreePQS]):
    '''
    Time-expanded A* search for a single agent under per-step constraints.

    Search states are (i, j, time) nodes ordered by f = time + h. A successor
    is dropped when any entry of
    `constraints.get_constraints(agent_index, time)` has the same (i, j),
    or when an equal node was already expanded.

    The goal cell is accepted only at a time strictly greater than
    `constraints.get_max_step(agent_index)`; earlier arrivals are expanded
    like any other node.

    Returns a tuple (found, last, steps, goal_duplicates):
    - found: True when an accepted goal node was popped from OPEN
    - last: that goal node (follow `.parent` to recover the path), or None
    - steps: number of nodes popped from OPEN
    - goal_duplicates: [last] followed by the OPEN entries equal to `last`
      that were popped right after it; [None] when nothing was found

    NOTE(review): states that differ only in time are never equal, so
    nothing in this function bounds the search when the goal cannot be
    reached - confirm that callers guarantee reachability.
    '''
    tree = search_tree()
    steps = 0
    found = False
    last = None
    tree.add_to_open(Node(start_i, start_j))
    max_constraint_path = constraints.get_max_step(agent_index)

    while not tree.open_is_empty():
        current_node = tree.get_best_node_from_open()
        if current_node is None:
            # OPEN only held entries that had already been expanded.
            break
        steps += 1

        at_goal = current_node.i == goal_i and current_node.j == goal_j
        if at_goal and current_node.time > max_constraint_path:
            found = True
            last = current_node
            break

        for (i, j) in grid_map.get_neighbors(current_node.i, current_node.j):
            # The constructor already sets time = current_node.time + 1,
            # which both the constraint lookup and the CLOSED check rely on.
            new_node = Node(i, j, parent=current_node)
            blocked = any(
                c.i == i and c.j == j
                for c in constraints.get_constraints(agent_index, new_node.time))
            if blocked or tree.was_expanded(new_node):
                continue
            new_node.g = current_node.g + \
                compute_cost(current_node.i, current_node.j, i, j)
            new_node.h = heuristic_func(i, j, goal_i, goal_j)
            new_node.f = new_node.time + new_node.h
            tree.add_to_open(new_node)
        tree.add_to_closed(current_node)

    goal_duplicates = [last]
    if found:
        while True:
            leftover = tree.get_best_node_from_open()
            # get_best_node_from_open() may return None once OPEN is
            # exhausted; stop before comparing it against a node.
            if leftover is None or last != leftover:
                break
            assert last.f == leftover.f
            goal_duplicates.append(leftover)
    return found, last, steps, goal_duplicates