# astar.py
# Written by Erez Sh, 22/07/07
import bisect, math
class PriorityQueue(object):
"""A queue in which x is returned before y if f(x) > f(y).
add(item) -- O(log2 n)
remove(item) -- O(log2 n)
__contains__(item) -> bool -- O(log2 n)
pop() --> item -- O(1)
Erez Sh, 22/07/07
"""
def __init__(self, key=lambda x: x):
self.l = []
self.key = key
def add(self, item):
bisect.insort(self.l, (self.key(item), item))
def remove(self, item):
if len(self.l)==0:
raise ValueError()
prioritized_item = self.key(item), item
pos = bisect.bisect_left(self.l, prioritized_item)
if self.l[pos] == prioritized_item:
del self.l[pos]
else:
raise ValueError()
def __contains__(self, item):
if len(self.l)==0:
return False
prioritized_item = self.key(item), item
pos = bisect.bisect_left(self.l, prioritized_item)
return self.l[pos] == prioritized_item
def __len__(self):
return len(self.l)
def pop(self):
return self.l.pop()[1]
class AStar(object):
"""A* implementation.
On __init__ accepts node_map, starting_node and target_node,
where node_map must be an object that provides these methods:
* get_priority(node) -> int
* arrival_cost(node) -> int
* parent(node) -> node
* neighbors(node) -> node iterable
* movement_cost(from_node, to_node) -> int
* set_node(node, parent, cost)
To solve call step() repeatedly while it returns None.
Eventually it will return the path as a reversed list of nodes.
Erez Sh, 22/07/07
"""
INFINITE_COST = 1.0e400 # Taken from AIMA
def __init__(self, node_map, starting_node, target_node):
# 'open' queue is used to determine next node. We negate the priority so that smaller values will pop first
self.open = PriorityQueue( lambda node: -node_map.get_priority(node) )
node_map.set_node(starting_node, None, 0)
self.open.add(starting_node)
self.closed = set() # Used to determine where we've already visited
self.node_map = node_map
self.target_node = target_node
def step(self):
node_map = self.node_map
current = self.open.pop()
# If reached target, return found path
if current == self.target_node:
return self.get_path_to_node(current)
self.closed.add(current)
cur_cost = node_map.arrival_cost(current)
for neighbor in node_map.neighbors(current):
if neighbor in self.closed: # This may cause problems with bad heuristics
continue
new_cost = cur_cost + node_map.movement_cost(current, neighbor)
if neighbor not in self.open or new_cost < node_map.arrival_cost(neighbor):
# Remove works because we haven't made any changes to neighbor yet
try:
self.open.remove(neighbor)
except ValueError:
pass # Well, if it's not there, then no biggie
node_map.set_node(neighbor, current, new_cost)
# Add to queue for later inspection, node_map determines the priority for us
self.open.add(neighbor)
def get_path_to_node(self, node):
l = []
while self.node_map.parent(node) is not None: # Only starting-node has no parent
l.append(node)
node = self.node_map.parent(node)
return l