赞
踩
转载请注明出处,谢谢!
花了几天时间学习了下 D* 算法,只能说熟悉了一下流程,远不能说掌握,算法实在是非常巧妙
《制造车间无人搬运系统调度方法研究》
《基于D*Lite算法的移动机器人路径规划研究》
https://github.com/zhm-real/PathPlanning
这几天的教训告诉我阅读原文真的很重要,第一遍读很难受,看了很多乱七八糟的中文论文以及博客
有用倒确实有些用,后面再读论文时就没有起初那么抗拒,能读得下去
大概就是:原论文 → 中文论文+博客 → 原论文 → 论文 + 代码 → 论文 + 博客
这么反复折腾几遍,再跟着实际 PPT 内容推着走一遍,总归能掌握一些吧
制造车间那篇论文是 18 年大连理工的,对 D* 介绍的部分基本就是原文翻译,可以对着原文一起读
下面对原文中重要的部分进行摘录和翻译,有助于后面对算法的理解
Like A*, D* maintains an OPEN list of states.
The OPEN list is used to propagate information about changes to the arc cost function and to calculate path costs to states in the space.Every state X has an associated tag t ( X ) , such that t(X) = NEW if X has never been on the OPEN list, t(X) = OPEN if X is currently on the OPEN list, and t(X) = CLOSED if X is no longer on the OPEN list.


For each state X , D* maintains an estimate of
the sum of the arc costsfrom X to G given by the path cost functionh(G, X ). Given the proper conditions, this estimate is equivalent to the optimal (minimal) cost from state X to G, given by the implicit function o(G, X) . For each state X on the OPEN list (i.e., r(X) = OPEN), the key function,k(G, X), is defined to be equal to the minimum of h(G, X) before modification and all values assumed by h(G, X) since X was placed on the OPEN list.

The key function classifies a state X on the OPEN list into one of two types: a
RAISEstate ifk(G, X) < h(G, X ), and aLOWERState ifk(G, X) = h(G, X). D uses RAISE states on the OPEN list to propagate information about path cost increases (e.g., due to an increased arc cost) and LOWER states to propagate information about path cost reductions (e.g., due to a reduced arc cost or new path to the goal).*

The propagation takes place
through the repeated removal of states from the OPEN list. Each time a state is removed from the list, it is expanded to pass cost changes to its neighbors.These neighbors are in turn placed on the OPEN list to continue the process.
传播通过从OPEN列表中反复删除状态来进行。每次从列表中删除一个状态时,它都会展开以将成本更改传递给它的邻居。这些邻居依次被加入OPEN列表,以继续这一过程
States on the OPEN list are sorted by their key function value. The parameter kmin is defined to be min(k(X)) for all X such that t(X) = OPEN.
The parameter kmin represents an important threshold in D*: path costs less than or equal to kmin are optimal, and those greater than kmin may not be optimal. The parameter kold is defined to be equal to kmin prior to most recent removal of a state from the OPEN list. If no states have been removed, kold is undefined.

The D* algorithm consists primarily of two functions:
PROCESS - STATEis used to compute optimal path costs to the goal, andMODIFY - COSTis used to change the arc cost function c(O) and enter affected states on the OPEN list. Initially, t(O) is set to NEW for all states, h(G) is set to zero, and G is placed on the OPEN list. The first function, PROCESS- STATE, is repeatedly called until the robot’s state, X , is removed from the OPEN list (i.e., t(X) = CLOSED) or a value of -1 is returned, at which point either the sequence { X } has been computed or does not exist respectively. The robot then proceeds to follow the backpointers in the sequence { X )until it either reaches the goal or discovers an error in the arc cost function c(") (e.g., due to a detected obstacle). The second function, MUDIFY- COST, is immediately called to correct c(O) and place affected states on the OPEN list. Let Y be the robot’s state at which it discovers an error in c(O). By calling PROCESS - STATE until it returnskmin ≥ h(Y), the cost changes are propagated to state Y such that h(Y) = o(Y). At this point, a possibly new sequence { Y) has been constructed, and the robot continues to follow the backpointers in the sequence toward the goal.

The role of RAISE ans LOWER states is central to the operation of the algorithm.
The RAISE states (i.e., k(X) < h(X)) propagate cost increases, and the LOWER states (i.e., k(x) = h(X)) propagate cost reductions.When the cost of traversing an arc is increased, an affected neighbor state is placed on the OPEN list, and the cost increase is propagated via RAISE states through all state sequences containing the arc. As the RAISE states come in contact with neighboring states of lower cost,these LOWER states are placed on the OPEN list, and they subsequently decrease the cost of previously raised states wherever possible. If the cost of traversing an arc is decreased, the reduction is propagated via LOWER states through all state sequences containing the arc, as well as neighboring states whose cost can also be lowered.
RAISE和LOWER状态的作用对算法的操作至关重要。RAISE状态(即k(X) <h(X))传播成本增加,而LOWER状态(即k(X) = h(X))传播成本降低。当遍历一个弧线的开销增加时,受影响的邻居状态被放到OPEN列表中,开销的增加通过RAISE状态传播到包含该弧线的所有状态序列。当RAISE状态与相邻的低成本状态接触时,这些lower状态被放在OPEN列表中,它们随后尽可能地降低先前升高状态的成本。如果穿越一条弧线的成本降低了,那么这种降低将通过LOWER状态传播到包含该弧线的所有状态序列,以及成本也可以降低的相邻状态。
原话:The name of the algorithm, D*, was chosen because it resembles A*
解释:就是觉着像 A* 干脆起名 D*,说是Dynamic A* 不太严谨
1、有向图:D* 算法以有向图作为前提,即在A、B节点间,“A到B” 与 “B到A” 表示两个弧,因此对应的权重不一定一样
2、X、Y 、Z、S、G等:称为节点,也称为state,可理解为一个坐标position,其中S表示Start,G表示Goal
3、{S,……,G}:表示一条路径,该路径以S为起点,G为终点,简记为{S}
4、c(X, Y):表示“Y->X”节点的成本/权重(cost),具体计算逻辑如下
c(X, Y) == c(Y, X)5、b(X) = Y: 用来记录反向指针(backpointer),即X->Y,当处于X位置时可以根据b(X)获得下一个要到达的节点Y,以使得路径最优
6、t(X):节点的访问标记位(tag),支持3个类型:NEW、OPEN、CLOSED
NEW:节点从未访问过,最初的设置,可能存在部分节点永远处于改状态OPEN:节点正在访问中,存储在OPEN_LIST(使用优先队列PriorityQueue实现)中,也称为OPEN_QUEUE,以待访问扩展CLOSED:节点已经被访问过,从优先队列中移除,并完成了相邻节点的扩展,一般找到了到达重点G的最优路径7、h(X):用来存储路径代价,指从X到达终点G的路径({X,……G},简记为{X})代价,不一定是全局最优,第一次搜索到起点时时,所有点的h会被更新,计算方式同Dijkstra算法,是用相邻两点的代价+上一个点的代价累加得到
8、k(X):用来记录自X节点被加入到OPEN_LIST中后的最小h(X)值(具体计算方式由Insert函数决定),也是优先队列OPEN_LIST的排序依据,k将会保持到最小,它表示了本点在全图环境中到G点的最小代价
反向构建路径的方法(以G为起点的Dijkstra算法),在PROCESS_STATE()函数的部分分支逻辑中实现这里的伪代码实际上就是上面拆解后的过程
//S is the start State //G is the goal State h(G) = 0; put_into_OPEN_QUEUE(G); //将G加入到OPEN_QUEUE/OPEN_LIST中 do { k_min = PROCESS_STATE(); //主要是减少cost,寻找最优路径 }while(k_min != -1 && S_not_removed_from_open_list); if(k_min == -1){ G_unreachable; exit; }else{ do{ do{ trace_optimal_path(); //沿着最优路径行驶{S, ……, G},根据具体情形来实现 }while(G_is_not_reached && map == environment); //map != environment 遇到了异常,塌陷、障碍物等 if(G_is_reached){ //到达终点 exit; }else{ X = state_of_discrepancy_reached_trying_to_move_from_some_state_Y; MODIFY_COST(X, Y, new_c(X, Y)); do{ k_min = PROCESS_STATE(); //减少cost 和 异常信息传播 }while(k_min != -1 && k_min<h(Y)); //k_min>=h(Y)时表示已经找到了最优路径,不可能有更优 if(k_min == -1){ exit; } } }while(1); }

PROCESS_STATE()主要解决两大问题:
1、计算最优路径(传播cost减少)
2、传播异常信息(传播cost增加)
以上两种情况分别用k(X)与h(X)的关系来进行控制
假设X是从优先队列OPEN_QUEUE中弹出来的最优节点
前序必要准备工作已经完成:
具体伪代码如下:
// PROCESS_STATE() X = MIN_STATE() if(X==NULL){ return -1 } k_old = GET_KMIN(); DELETE(X); if( k_old < h(X) ){ for( each neighbor Y of X ){ if( h(Y) <= k_old and h(X) > h(Y) + c(Y, X) ){ //尝试在现有路径条件下,先更新当前cost b(X) = Y; h(X) = h(Y) + c(Y, X); } } } if( k_old == h(X) ){ for( each neighbor Y of X ){ if( t(Y) == NEW or ( b(Y) == X and h(Y) != h(X)+c(X,Y) ) or ( b(Y) != X and h(Y) > h(X)+c(X,Y) ) ){ //正常扩展 b(Y) = X; INSERT( Y, h(X)+c(X,Y) ); } } }else{ //k_old < h(X) condition for( each neighbor Y of X){ if( t(Y) == NEW or ( b(Y) == X and h(Y) != h(X)+c(X,Y) ) ){ //传递异常信息c(X,Y)带来的变化 b(Y) == X; INSERT( Y, h(X)+c(X,Y) ) }else{ if( b(Y) != X and h(Y) > h(X) + c(X, Y) ){ //将X再次插入队列,从RAISE模式变成了LOWER模式来扩展修正Y INSERT( X, h(X) ); }else{ if( b(Y) != X and h(X) > h(Y)+c(Y,X) and t(Y) == CLOSED and h(Y) > k_old ){ //寻找次优解,新的路径 INSERT( Y, h(Y) ); } } } } } return GET_KMIN()
一些辅助函数,都是基于OPEN_QUEUE进行的操作:
// MIN_STATE() Return X if k(X) is minimum in the OPEN_QUEUE // GET_KMIN() Return the minimum value of k in the OPEN_QUEUE // DELETE(X) delete X state from the OPEN_QUEUE // INSERT(X, h_new) // 隐含着将X节点加入到OPEN_QUOPEN(如果不存在的的话) if( t(X) == NEW ){ k(X) = h_new; }else if( t(X) == OPEN ){ k(X) = min( k(X), h_new ); }else{ // t(X) == CLOSED k(X) = min( h(X), h_new ); //可使得X节点从RAISE进入LOWER模式 } h(X) = h_new; t(X) = OPEN; SORT_OPEN_QUEUE_BY_k() //对OPEN_QUEUE按k进行排序

LOWER,那么它的路径代价为最优的,因为 h(X)==koldLOWER 状态的传播。发生变化的 Y 节点会重新放入 openlist,再由这些节点传播路径成本的变化RAISE 状态,路径开销增大,h(X) 变为 inf,经过 X 的路径不是最优的,在节点 X 将这种路径开销的变化传播给其邻居节点前,看看能不能通过其邻居中已经处于最优开销(即 h(Y) < kold)得到节点来优化 X 的路径开销,如果存在,调整 b(X) = Y,此时已调整路径为最优(optimal)的RAISE 和 LOWER 两种状态,其实这里主要是 RAISE 状态的传播CLOSED 的次优邻居节点 Y 减小路径开销,则将 Y 重新放入 openlist 以优化 X代价的修改是在发现障碍物所处的 state 时进行的,假设机器人在 Y 正准备前往 X 之前(即Y->X链接关系),发现了 X 处突然塌陷或者出现了障碍物则进行 cost 的修正,其实主要就是修正 Y 处的 h 值,Y 处的 k 值暂时不发生变化
下面的伪代码和实际编程时有一些区别,到代码部分再说
// MODIFY_COST( X, Y, new_c(X,Y) )
c(X, Y) = new_c(X, Y); //做cost修正
if( t(X) == CLOSED ){
INSERT( X, h(X) ); //将X重新插入队列,X处于LOWER模式,进行扩展,进而引起Y的路径发生变动,Y变成RAISE模式
}
return GET_KMIN();
""" D_star 2D @author: huiming zhou """ import os import sys import math import matplotlib.pyplot as plt sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../Search_based_Planning/") from Search_2D import plotting, env class DStar: def __init__(self, s_start, s_goal): self.s_start, self.s_goal = s_start, s_goal self.Env = env.Env() self.Plot = plotting.Plotting(self.s_start, self.s_goal) self.u_set = self.Env.motions self.obs = self.Env.obs self.x = self.Env.x_range self.y = self.Env.y_range self.fig = plt.figure() self.OPEN = set() self.t = dict() self.PARENT = dict() self.h = dict() self.k = dict() self.path = [] self.visited = set() self.count = 0 def init(self): for i in range(self.Env.x_range): for j in range(self.Env.y_range): self.t[(i, j)] = 'NEW' self.k[(i, j)] = 0.0 self.h[(i, j)] = float("inf") self.PARENT[(i, j)] = None self.h[self.s_goal] = 0.0 def run(self, s_start, s_end): self.init() self.insert(s_end, 0) while True: self.process_state() if self.t[s_start] == 'CLOSED': break self.path = self.extract_path(s_start, s_end) self.Plot.plot_grid("Dynamic A* (D*)") self.plot_path(self.path) self.fig.canvas.mpl_connect('button_press_event', self.on_press) plt.show() def on_press(self, event): x, y = event.xdata, event.ydata if x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1: print("Please choose right area!") else: x, y = int(x), int(y) if (x, y) not in self.obs: print("Add obstacle at: s =", x, ",", "y =", y) self.obs.add((x, y)) self.Plot.update_obs(self.obs) s = self.s_start self.visited = set() self.count += 1 while s != self.s_goal: if self.is_collision(s, self.PARENT[s]): self.modify(s) continue s = self.PARENT[s] self.path = self.extract_path(self.s_start, self.s_goal) plt.cla() self.Plot.plot_grid("Dynamic A* (D*)") self.plot_visited(self.visited) self.plot_path(self.path) self.fig.canvas.draw_idle() def extract_path(self, s_start, s_end): path = [s_start] s = s_start while True: s = self.PARENT[s] path.append(s) if s == s_end: return path def process_state(self): s = self.min_state() # get node in OPEN set with min k value self.visited.add(s) if s is None: return -1 # OPEN set is empty k_old = self.get_k_min() # record the min k value of this iteration (min path cost) self.delete(s) # move state s from OPEN set to CLOSED set # k_min < h[s] --> s: RAISE state (increased cost) if k_old < self.h[s]: for s_n in self.get_neighbor(s): if self.h[s_n] <= k_old and \ self.h[s] > self.h[s_n] + self.cost(s_n, s): # update h_value and choose parent self.PARENT[s] = s_n self.h[s] = self.h[s_n] + self.cost(s_n, s) # s: k_min >= h[s] -- > s: LOWER state (cost reductions) if k_old == self.h[s]: for s_n in self.get_neighbor(s): if self.t[s_n] == 'NEW' or \ (self.PARENT[s_n] == s and self.h[s_n] != self.h[s] + self.cost(s, s_n)) or \ (self.PARENT[s_n] != s and self.h[s_n] > self.h[s] + self.cost(s, s_n)): # Condition: # 1) t[s_n] == 'NEW': not visited # 2) s_n's parent: cost reduction # 3) s_n find a better parent self.PARENT[s_n] = s self.insert(s_n, self.h[s] + self.cost(s, s_n)) else: for s_n in self.get_neighbor(s): if self.t[s_n] == 'NEW' or \ (self.PARENT[s_n] == s and self.h[s_n] != self.h[s] + self.cost(s, s_n)): # Condition: # 1) t[s_n] == 'NEW': not visited # 2) s_n's parent: cost reduction self.PARENT[s_n] = s self.insert(s_n, self.h[s] + self.cost(s, s_n)) else: if self.PARENT[s_n] != s and \ self.h[s_n] > self.h[s] + self.cost(s, s_n): # Condition: LOWER happened in OPEN set (s), s should be explored again self.insert(s, self.h[s]) else: if self.PARENT[s_n] != s and \ self.h[s] > self.h[s_n] + self.cost(s_n, s) and \ self.t[s_n] == 'CLOSED' and \ self.h[s_n] > k_old: # Condition: LOWER happened in CLOSED set (s_n), s_n should be explored again self.insert(s_n, self.h[s_n]) return self.get_k_min() def min_state(self): """ choose the node with the minimum k value in OPEN set. :return: state """ if not self.OPEN: return None return min(self.OPEN, key=lambda x: self.k[x]) def get_k_min(self): """ calc the min k value for nodes in OPEN set. :return: k value """ if not self.OPEN: return -1 return min([self.k[x] for x in self.OPEN]) def insert(self, s, h_new): """ insert node into OPEN set. :param s: node :param h_new: new or better cost to come value """ if self.t[s] == 'NEW': self.k[s] = h_new elif self.t[s] == 'OPEN': self.k[s] = min(self.k[s], h_new) elif self.t[s] == 'CLOSED': self.k[s] = min(self.h[s], h_new) self.h[s] = h_new self.t[s] = 'OPEN' self.OPEN.add(s) def delete(self, s): """ delete: move state s from OPEN set to CLOSED set. :param s: state should be deleted """ if self.t[s] == 'OPEN': self.t[s] = 'CLOSED' self.OPEN.remove(s) def modify(self, s): """ start processing from state s. :param s: is a node whose status is RAISE or LOWER. """ self.modify_cost(s) while True: k_min = self.process_state() if k_min >= self.h[s]: break def modify_cost(self, s): # if node in CLOSED set, put it into OPEN set. # Since cost may be changed between s - s.parent, calc cost(s, s.p) again if self.t[s] == 'CLOSED': self.insert(s, self.h[self.PARENT[s]] + self.cost(s, self.PARENT[s])) def get_neighbor(self, s): nei_list = set() for u in self.u_set: s_next = tuple([s[i] + u[i] for i in range(2)]) if s_next not in self.obs: nei_list.add(s_next) return nei_list def cost(self, s_start, s_goal): """ Calculate Cost for this motion :param s_start: starting node :param s_goal: end node :return: Cost for this motion :note: Cost function could be more complicate! """ if self.is_collision(s_start, s_goal): return float("inf") return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1]) def is_collision(self, s_start, s_end): if s_start in self.obs or s_end in self.obs: return True if s_start[0] != s_end[0] and s_start[1] != s_end[1]: if s_end[0] - s_start[0] == s_start[1] - s_end[1]: s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1])) s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1])) else: s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1])) s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1])) if s1 in self.obs or s2 in self.obs: return True return False def plot_path(self, path): px = [x[0] for x in path] py = [x[1] for x in path] plt.plot(px, py, linewidth=2) plt.plot(self.s_start[0], self.s_start[1], "bs") plt.plot(self.s_goal[0], self.s_goal[1], "gs") def plot_visited(self, visited): color = ['gainsboro', 'lightgray', 'silver', 'darkgray', 'bisque', 'navajowhite', 'moccasin', 'wheat', 'powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue'] if self.count >= len(color) - 1: self.count = 0 for x in visited: plt.plot(x[0], x[1], marker='s', color=color[self.count]) def main(): s_start = (5, 5) s_goal = (45, 25) dstar = DStar(s_start, s_goal) dstar.run(s_start, s_goal) if __name__ == '__main__': main()

在 main() 函数中首先调用 DStar() 函数构建环境地图
dstar = DStar(s_start, s_goal) def __init__(self, s_start, s_goal): self.s_start, self.s_goal = s_start, s_goal self.Env = env.Env() self.Plot = plotting.Plotting(self.s_start, self.s_goal) self.u_set = self.Env.motions self.obs = self.Env.obs self.x = self.Env.x_range self.y = self.Env.y_range self.fig = plt.figure() self.OPEN = set() self.t = dict() self.PARENT = dict() self.h = dict() self.k = dict() self.path = [] self.visited = set() self.count = 0
然后直接调用 run() 函数模拟 D* 算法的流程,函数代码和上文的主逻辑伪代码是一致的
dstar.run(s_start, s_goal) def run(self, s_start, s_end): self.init() self.insert(s_end, 0) while True: self.process_state() if self.t[s_start] == 'CLOSED': break self.path = self.extract_path(s_start, s_end) self.Plot.plot_grid("Dynamic A* (D*)") self.plot_path(self.path) self.fig.canvas.mpl_connect('button_press_event', self.on_press) plt.show()
init() 函数初始化地图信息s_endprocess_state() 函数直到找到起点extract_path() 函数提取路径信息,并调用 plot_grid() 和 plot_path() 函数绘图on_press() 函数模拟二次遍历过程中路径点前方突然出现障碍物precess_state() 函数是最关键的函数,其实现和论文中的伪代码完全一致
def process_state(self): s = self.min_state() # get node in OPEN set with min k value self.visited.add(s) if s is None: return -1 # OPEN set is empty k_old = self.get_k_min() # record the min k value of this iteration (min path cost) self.delete(s) # move state s from OPEN set to CLOSED set # k_min < h[s] --> s: RAISE state (increased cost) if k_old < self.h[s]: for s_n in self.get_neighbor(s): if self.h[s_n] <= k_old and \ self.h[s] > self.h[s_n] + self.cost(s_n, s): # update h_value and choose parent self.PARENT[s] = s_n self.h[s] = self.h[s_n] + self.cost(s_n, s) # s: k_min >= h[s] -- > s: LOWER state (cost reductions) if k_old == self.h[s]: for s_n in self.get_neighbor(s): if self.t[s_n] == 'NEW' or \ (self.PARENT[s_n] == s and self.h[s_n] != self.h[s] + self.cost(s, s_n)) or \ (self.PARENT[s_n] != s and self.h[s_n] > self.h[s] + self.cost(s, s_n)): # Condition: # 1) t[s_n] == 'NEW': not visited # 2) s_n's parent: cost reduction # 3) s_n find a better parent self.PARENT[s_n] = s self.insert(s_n, self.h[s] + self.cost(s, s_n)) else: for s_n in self.get_neighbor(s): if self.t[s_n] == 'NEW' or \ (self.PARENT[s_n] == s and self.h[s_n] != self.h[s] + self.cost(s, s_n)): # Condition: # 1) t[s_n] == 'NEW': not visited # 2) s_n's parent: cost reduction self.PARENT[s_n] = s self.insert(s_n, self.h[s] + self.cost(s, s_n)) else: if self.PARENT[s_n] != s and \ self.h[s_n] > self.h[s] + self.cost(s, s_n): # Condition: LOWER happened in OPEN set (s), s should be explored again self.insert(s, self.h[s]) else: if self.PARENT[s_n] != s and \ self.h[s] > self.h[s_n] + self.cost(s_n, s) and \ self.t[s_n] == 'CLOSED' and \ self.h[s_n] > k_old: # Condition: LOWER happened in CLOSED set (s_n), s_n should be explored again self.insert(s_n, self.h[s_n]) return self.get_k_min()
on_press() 函数是模拟路径重规划的关键函数
def on_press(self, event): x, y = event.xdata, event.ydata if x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1: print("Please choose right area!") else: x, y = int(x), int(y) if (x, y) not in self.obs: print("Add obstacle at: s =", x, ",", "y =", y) self.obs.add((x, y)) self.Plot.update_obs(self.obs) s = self.s_start self.visited = set() self.count += 1 while s != self.s_goal: if self.is_collision(s, self.PARENT[s]): self.modify(s) continue s = self.PARENT[s] self.path = self.extract_path(self.s_start, self.s_goal) plt.cla() self.Plot.plot_grid("Dynamic A* (D*)") self.plot_visited(self.visited) self.plot_path(self.path) self.fig.canvas.draw_idle()
手动在路径上放置障碍物,然后先 modify(s) ,然后再反复调用 precess_state() 函数进行重规划
while s != self.s_goal:
if self.is_collision(s, self.PARENT[s]):
self.modify(s)
continue
s = self.PARENT[s]
def modify(self, s): """ start processing from state s. :param s: is a node whose status is RAISE or LOWER. """ self.modify_cost(s) while True: k_min = self.process_state() if k_min >= self.h[s]: break def modify_cost(self, s): # if node in CLOSED set, put it into OPEN set. # Since cost may be changed between s - s.parent, calc cost(s, s.p) again if self.t[s] == 'CLOSED': self.insert(s, self.h[self.PARENT[s]] + self.cost(s, self.PARENT[s]))
从代码中可以看出,二次遍历中走到路径点 Y,发现下一路径点 X 是障碍物,则对 Y 处的路径开销进行修正,h(Y) = h(X) + c(X,Y),此时 Y 变为 RAISE 状态,将其加入 openlist 以传播该状态
另外代码中在轨迹重规划绘图时加了一些小细节,体现在 plot_visited() 函数上,会根据已添加障碍物的个数,然后给本次重规划时访问过的节点上色
s = self.s_start self.visited = set() self.count += 1 self.Plot.plot_grid("Dynamic A* (D*)") self.plot_visited(self.visited) self.plot_path(self.path) def plot_visited(self, visited): color = ['gainsboro', 'lightgray', 'silver', 'darkgray', 'bisque', 'navajowhite', 'moccasin', 'wheat', 'powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue'] if self.count >= len(color) - 1: self.count = 0 for x in visited: plt.plot(x[0], x[1], marker='s', color=color[self.count])
光是看讲解是体会不到算法的精妙的,结合实例看下吧
再贴一下 PROCESS-STATE() 的伪代码

1、下面是一开始的地图

2、将终点加入 openlist,开始一次遍历,该过程可以说是反向 Dijkstra


这个过程就是反复执行伪代码中的 L8 ~ L13,直到起点也被加入 openlist
有一点需要注意,一次遍历后,所有访问过的节点都保存了到终点的路径以及距离信息,相当于建立了一个路径场,这在遇到障碍物后的二次规划时是很重要的,能够减少运算量
3、从起点开始二次遍历,遍历过程中出现新的障碍物

此时立刻调用 MODIFY_COST() 函数更改 (3,2) 节点的路径开销,更改如下

此时的 openlist 如下
| state | k |
|---|---|
| (3,2) | 5.6 |
| (1,2) | 7.6 |
| (1,3) | 8.0 |
| (1,1) | 8.0 |
4、反复调用 PROCESS_STATE() 函数重新规划路径

此时 (3,2) 节点处于 RAISE 状态,在传播状态变化之前,先看能不能通过其邻居中已经处于最优开销(即 h(Y) < kold)得到节点来优化 X 的路径开销(对应 L4 ~ L7),显然不存在满足条件的邻居
那么此时就将 (3,2) 节点的状态变化进行传播,对应 L15 ~ L18,将其子节点(将 b(Y) = X 中的 Y 定义为子节点)的开销扩大,子节点有 (2,1)、(2,2)、(3,1),将这些节点都加入 openlist 中
此时节点 (4,1) 有可能减小节点 (3,2) 的路径开销,对应 L23 ~ L25,将其加入 openlist 中以优化 (3,2) 节点
此时发生变化的节点如下图

此时的 openlist 如下
| (4,1) | 6.2 |
|---|---|
| (2,2) | 6.6 |
| (3,1) | 6.6 |
| (2,1) | 7.0 |
| (1,2) | 7.6 |
| (1,3) | 8.0 |
此时 h([3,2]) = 10000,kmin = 6.2,不满足 kmin ≥ h[s],继续 process_state()
将 (4,1) 节点弹出 openlist,该节点处于 LOWER 状态,那么则将该 LOWER 状态传播,对应 L8 ~ L13,调整其邻居节点 (3,1)、(3,2) 的路径开销,并加入 openlist

注意到节点 (3,2) 经过调整已经处于 LOWER 状态,将其将入 openlist 可以继续传播该状态
此时对于下面这句话应该有更深的认识
The propagation takes place
through the repeated removal of states from the OPEN list. Each time a state is removed from the list, it is expanded to pass cost changes to its neighbors.These neighbors are in turn placed on the OPEN list to continue the process.
此时发生变化的节点如下

此时的 openlist 如下
| (3,1) | 6.6 |
|---|---|
| (2,2) | 6.6 |
| (2,1) | 7.0 |
| (1,2) | 7.6 |
| (3,2) | 7.6 |
| (1,3) | 8.0 |
此时 h([3,2]) = 7.6,kmin = 6.6,不满足 kmin ≥ h[s],继续 process_state()
将 (3,1) 节点弹出 openlist,该节点此时处于 RAISE 状态,在传播 RAISE 状态前,依然会先执行 L4 ~ L7,依旧没没有满足条件的邻居节点
但是 (3,1) 节点能够减少其邻居节点的路径开销,对应 L20 ~ L21,所以将其重新加入 openlist,注意每次加入时都会更新一下节点的 k 值和 h 值,贴一下加入的代码
def insert(self, s, h_new): """ insert node into OPEN set. :param s: node :param h_new: new or better cost to come value """ if self.t[s] == 'NEW': self.k[s] = h_new elif self.t[s] == 'OPEN': self.k[s] = min(self.k[s], h_new) elif self.t[s] == 'CLOSED': self.k[s] = min(self.h[s], h_new) self.h[s] = h_new self.t[s] = 'OPEN' self.OPEN.add(s)
此时 (3,1) 节点已经处于 LOWER 状态,将其加入 openlist 可以传播该状态
此时发生变化的节点如下

此时的 openlist 如下
| (2,2) | 6.6 |
|---|---|
| (2,1) | 7.0 |
| (3,1) | 7.2 |
| (1,2) | 7.6 |
| (3,2) | 7.6 |
| (1,3) | 8.0 |
此时 h([3,2]) = 7.6,kmin = 6.6,不满足 kmin ≥ h[s],继续 process_state()
将 (2,2) 节点弹出 openlist,按照 process_state() 走一遍没有发生变化的节点

此时的 openlist 如下
| (2,1) | 7.0 |
|---|---|
| (3,1) | 7.2 |
| (1,2) | 7.6 |
| (3,2) | 7.6 |
| (1,3) | 8.0 |
此时 h([3,2]) = 7.6,kmin = 7.0,不满足 kmin ≥ h[s],继续 process_state()
将 (2,1) 节点弹出 openlist,按照 process_state() 走一遍同样没有发生变化的节点

此时的 openlist 如下
| (3,1) | 7.2 |
|---|---|
| (1,2) | 7.6 |
| (3,2) | 7.6 |
| (1,3) | 8.0 |
此时 h([3,2]) = 7.6,kmin = 7.2,不满足 kmin ≥ h[s],继续 process_state()
将 (3,1) 节点弹出 openlist,该节点此时处于 LOWER 状态,将该状态传播,对应 L4 ~ L7,调整其邻居节点的开销,经过调整,其邻居节点 (2,1)、(2,2) 都由 RAISE 状态转变为 LOWER 状态

此时发生变化的节点如下

此时的 openlist 如下
| (1,2) | 7.6 |
|---|---|
| (3,2) | 7.6 |
| (1,3) | 8.0 |
| (1,1) | 8.0 |
此时 h([3,2]) = 7.6,kmin = 7.6,满足 kmin ≥ h[s],说明已经规划出了一条合适的路径

机器人从当前位置继续向终点行进

至此,轨迹重规划结束,相信经过上面的推演,对原文中的下述表达有更深刻的认识
The role of RAISE ans LOWER states is central to the operation of the algorithm.
The RAISE states (i.e., k(X) < h(X)) propagate cost increases, and the LOWER states (i.e., k(x) = h(X)) propagate cost reductions.When the cost of traversing an arc is increased, an affected neighbor state is placed on the OPEN list, and the cost increase is propagated via RAISE states through all state sequences containing the arc. As the RAISE states come in contact with neighboring states of lower cost,these LOWER states are placed on the OPEN list, and they subsequently decrease the cost of previously raised states wherever possible. If the cost of traversing an arc is decreased, the reduction is propagated via LOWER states through all state sequences containing the arc, as well as neighboring states whose cost can also be lowered.
D* 算法先采用 Dijkstra 算法从目标点 G 向起始点 S 反向搜索,在搜索过程中访问过的每个节点到终点 G 的路径信息以及距离信息,构造出一个路径场,在遇到临时障碍物时,路径场信息能够避免不必要的重新规划路径带来的庞大运算量
RAISE和LOWER状态的作用对算法的操作至关重要。RAISE状态(即k(X) <h(X))传播成本增加,而LOWER状态(即k(X) = h(X))传播成本降低。当遍历一个节点的开销增加时,受影响的邻居状态被放到OPEN列表中,开销的增加通过RAISE状态传播到其相邻节点。当RAISE状态与相邻的LOWER状态节点接触时,这些LOWER状态节点被加入OPEN列表中,它们随后尽可能地降低之前RAISE状态节点的路径开销。如果某一节点的路径开销降低了,那么这种降低将通过LOWER状态传播到该节点的所有相邻节点
状态的传播通过从OPEN列表中反复删除状态来进行。每次从列表中删除一个状态时,它都会展开以将成本更改传递给它的邻居。这些邻居依次被加入OPEN列表,以继续这一过程
通过 h(X) 和 k(X) 的大小关系切换节点的 RAISE 和 LOWER 状态,通过状态切换和状态传播实现轨迹的重规划
适用于动态环境(/路网)中的路径/运动规划(motion planning),与Dijkstra、A* 形成对比(两者适用于静态环境/路网)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。