当前位置:   article > 正文

LPA* 路径搜索算法介绍及完整代码_lpa*算法

lpa*算法

1.简介

LPA*算法(全称Lifelong Planning A*算法)是由Koenig和Likhachev在2004年提出的,是一种参考人工智能的增量搜索进行改进的A*算法。它被用来处理动态环境下从给定起始点到给定目标点的最短路径问题.

Koenig和Likhachev原论文

A*算法在此不再赘述, 请参考另一篇文章A* 路径搜索算法介绍及完整代码

2. 算法描述

网上有不少博客详细的介绍了论文中的符号和基本算法逻辑, 如:

LPA* 路径搜索算法介绍

终身规划A*算法(LPA*):Lifelong Planning A*

在此就不重复介绍论文中的内容, 只强调几个可能困惑的点:

1) 父节点和子节点

子节点 (successors): 节点s的后续节点集合,代表s可以到达的点.

父节点 (predecessors):节点s的前代节点,代表可以到达s的点.

在论文举例中, 由于是无向图, 所以都指向相邻节点.

2) 论文举例中的h (heuristic) 计算

论文中没有明确列出h值的计算, 从例子中可以看出移动时可以走斜线, 所以相邻点为八个方向:

[(0, 1), (0, -1), (1, 0), (-1, 0), (1, 1), (1, -1), (-1, 1), (-1, -1)]

h值的计算应该取x, y 差值的最大值

h = max(abs(a[0]-b[0]), abs(a[1]-b[1]))

3. 伪代码

4. Python代码实现

  1. import numpy as np
  2. import matplotlib.pyplot as plt
  3. from collections import OrderedDict
  4. class PriorityQueue:
  5. def __init__(self):
  6. self.queue = OrderedDict()
  7. def insert(self, vertex, key):
  8. self.queue[vertex] = key
  9. self.queue = OrderedDict(sorted(self.queue.items(), key=lambda kv: (kv[1][0], kv[1][1])))
  10. def popvertex(self):
  11. first_item = self.queue.popitem(last=False)
  12. return first_item[0]
  13. def topkey(self):
  14. if self.queue:
  15. item = next(iter(self.queue.items()))
  16. return item[1]
  17. else:
  18. return np.inf, np.inf
  19. def remove(self, k):
  20. if k in self.queue:
  21. print('(%s, %s) to remove from priority queue' % (k[0], k[1]))
  22. del self.queue[k]
  23. g = dict()
  24. rhs = dict()
  25. h = dict()
  26. U = PriorityQueue()
  27. # Eight Directions
  28. neighbor_direction = [(0, 1), (0, -1), (1, 0), (-1, 0), (1, 1), (1, -1), (-1, 1), (-1, -1)]
  29. def heuristic(a, b):
  30. return max(abs(a[0]-b[0]), abs(a[1]-b[1]))
  31. # # Four Directions
  32. # neighbor_direction = [(0, 1), (0, -1), (1, 0), (-1, 0)]
  33. # def heuristic(a, b):
  34. # return abs(a[0]-b[0]) + abs(a[1]-b[1])
  35. def get_neighbor(current):
  36. neighbors = list()
  37. for i, j in neighbor_direction:
  38. neighbor = current[0] + i, current[1] + j
  39. if 0 <= neighbor[0] < S.shape[0]:
  40. if 0 <= neighbor[1] < S.shape[1]:
  41. if S[neighbor[0]][neighbor[1]] == 1:
  42. # print('neighbor %s hit wall' % str(neighbor))
  43. continue
  44. else:
  45. # array bound y walls
  46. # print('neighbor %s hit y walls' % str(neighbor))
  47. continue
  48. else:
  49. # array bound x walls
  50. # print('neighbor %s hit x walls' % str(neighbor))
  51. continue
  52. neighbors.append(neighbor)
  53. return neighbors
  54. def update_vertex(s):
  55. print('---update_vertex for (%s, %s)' % (s[0], s[1]))
  56. if s != start:
  57. neighbors = get_neighbor(s)
  58. print('(%s, %s) get neighbors:' % (s[0], s[1]))
  59. print(neighbors)
  60. neighbor_g_c_s = list()
  61. for nb in neighbors:
  62. g_c = g[nb] + heuristic(nb, s)
  63. neighbor_g_c_s.append(g_c)
  64. rhs[s] = min(neighbor_g_c_s)
  65. print('neighbor g_c: %s' % neighbor_g_c_s)
  66. print(rhs[s])
  67. U.remove(s)
  68. if g[s] != rhs[s]:
  69. U.insert(s, calculate_key(s))
  70. def initialize():
  71. print('start initialize...')
  72. for i in range(S.shape[0]):
  73. for j in range(S.shape[1]):
  74. if S[(i, j)] != 1:
  75. g[(i, j)] = np.inf
  76. rhs[(i, j)] = np.inf
  77. rhs[start] = 0
  78. init_h()
  79. U.insert(start, (h[start], 0))
  80. print('Initial U:')
  81. print(U.queue)
  82. print('finish initialize...')
  83. def init_h():
  84. S_shape = S.shape
  85. for i in range(S_shape[0]):
  86. for j in range(S_shape[1]):
  87. if S[(i, j)] != 1:
  88. node = (i, j)
  89. h_calc = heuristic(node, goal)
  90. h[node] = h_calc
  91. def calculate_key(s):
  92. return min(g[s], rhs[s]) + h[s], min(g[s], rhs[s])
  93. def computer_shortest_path():
  94. computer_iteration = 1
  95. while U.topkey() < calculate_key(goal) or rhs[goal] != g[goal]:
  96. print('----------------Iteration #%d----------------' % computer_iteration)
  97. u = U.popvertex()
  98. print('top_vertex (%s, %s)' % (u[0], u[1]))
  99. if g[u] > rhs[u]:
  100. print('overconsistent as (%s, %s) g_value %s > rhs_value %s' % (u[0], u[1], g[u], rhs[u]))
  101. g[u] = rhs[u]
  102. print('set (%s, %s) g_value %s same as rhs_value %s' % (u[0], u[1], g[u], rhs[u]))
  103. neighbors = get_neighbor(u)
  104. for nb in neighbors:
  105. update_vertex(nb)
  106. else:
  107. g[u] = np.inf
  108. neighbors = get_neighbor(u)
  109. for nb in neighbors:
  110. update_vertex(nb)
  111. update_vertex(u)
  112. print(U.queue)
  113. computer_iteration += 1
  114. print('---exit computer shortest path as reach terminate condition---')
  115. def get_path_node():
  116. path_node = list()
  117. path_node.append(goal)
  118. current = goal
  119. while current != start:
  120. nbs = get_neighbor(current)
  121. nbs_rhs_dict = OrderedDict()
  122. for nb in nbs:
  123. nbs_rhs_dict[nb] = rhs[nb]
  124. sorted_nbs_rhs_dict = OrderedDict(sorted(nbs_rhs_dict.items(), key=lambda item: item[1]))
  125. trace_back_node_rhs = sorted_nbs_rhs_dict.popitem(last=False)
  126. trace_back_node = trace_back_node_rhs[0]
  127. path_node.append(trace_back_node)
  128. current = trace_back_node
  129. return path_node
  130. if __name__ == '__main__':
  131. # S = np.array([
  132. # [1, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 1, 1, 0, 0, 1, 0, 1, 1, 1],
  133. # [0, 0, 1, 0, 1, 1, 1, 0, 0, 1, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0],
  134. # [1, 0, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 1, 1, 0],
  135. # [0, 0, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 1, 1, 0, 0, 1],
  136. # [0, 0, 0, 1, 0, 0, 0, 1, 1, 0, 1, 0, 1, 1, 0, 1, 0, 1, 0, 0],
  137. # [0, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 0, 0, 1, 0, 1, 0, 1],
  138. # [0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0, 0, 1, 0, 0],
  139. # [0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1],
  140. # [0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1],
  141. # [1, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 1, 0, 0],
  142. # [1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0],
  143. # [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1],
  144. # [1, 0, 0, 0, 0, 1, 0, 1, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0],
  145. # [0, 0, 0, 0, 1, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0],
  146. # [1, 0, 0, 1, 0, 1, 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 1, 0, 1, 0],
  147. # ])
  148. #
  149. # start = (7, 3)
  150. # goal = (7, 17)
  151. # S[(7, 12)] = 1
  152. # S[(7, 13)] = 1
  153. # S[(7, 16)] = 1
  154. S = np.array([
  155. [0, 0, 0, 0],
  156. [1, 0, 1, 0],
  157. [1, 0, 1, 0],
  158. [1, 0, 1, 0],
  159. [1, 0, 1, 0],
  160. [0, 0, 0, 0],
  161. ])
  162. start = (0, 3)
  163. goal = (5, 0)
  164. # # # test if there is no route
  165. # # S[(4, 1)] = 1
  166. # # S[(5, 1)] = 1
  167. # below start run
  168. initialize()
  169. computer_shortest_path()
  170. # # --------------------------------------------
  171. # # this section to test if the environment change
  172. # print()
  173. # print()
  174. # print('--------after update:')
  175. #
  176. # # change_point = (3, 1)
  177. # change_point = (7, 12)
  178. # S[change_point] = 1
  179. # g[change_point] = np.inf
  180. # update_vertex(change_point)
  181. #
  182. # print(U.queue)
  183. # computer_shortest_path()
  184. # # ----------------------------------------------
  185. if rhs[goal] == np.inf:
  186. path_exist = False
  187. else:
  188. path_exist = True
  189. if path_exist:
  190. print()
  191. route = get_path_node()
  192. print('route:')
  193. print(route)
  194. plot_map_and_path = True
  195. if plot_map_and_path:
  196. # plot map and path
  197. fig, ax = plt.subplots(figsize=(20, 20))
  198. ax.imshow(S, cmap=plt.cm.Dark2)
  199. ax.scatter(start[1], start[0], marker="o", color="red", s=200)
  200. ax.scatter(goal[1], goal[0], marker="*", color="green", s=200)
  201. if path_exist:
  202. # extract x and y coordinates from route list
  203. x_coords = []
  204. y_coords = []
  205. for k in (range(0, len(route))):
  206. x = route[k][0]
  207. y = route[k][1]
  208. x_coords.append(x)
  209. y_coords.append(y)
  210. ax.plot(y_coords, x_coords, color="black")
  211. ax.xaxis.set_ticks(np.arange(0, S.shape[1], 1))
  212. ax.yaxis.set_ticks(np.arange(0, S.shape[0], 1))
  213. ax.xaxis.tick_top()
  214. plt.grid()
  215. plt.show()

5. 运算结果

以下大约进行大约40多次的迭代

当如论文中所描述环境发生变化, 点(7, 12)有障碍, 如果重新计算大约需要53次迭代.

如果已经计算出环境改变之前的路径, 在此基础上继续计算改变的环境, 则需要大约28次迭代, 可以看出要比重新计算所需要的迭代次数更少.

代码中还包含另一个例子, 读者只需要comment掉上图的代码S矩阵, uncomment另一个S矩阵.

 同时代码提供了如果只允许向四个方向移动, 不允许斜线的相邻矩阵和h值计算.

6. 算法总结

LPA*算法提供了一种解决动态环境下的最短路径搜索方法,但是它针对的是起始点和目标点固定的情况,相比A*算法, 主要保留了能判断环境变化的rhs值.

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/213934
推荐阅读
相关标签
  

闽ICP备14008679号