OrderedMultiQueue 类,位于cartographer/sensor/internal/ordered_muti_queue.h
维护排序后的传感器数据的多个队列, 并按合并排序的顺序进行调度,
它将等待为每个未完成的队列查看至少一个值, 然后再在所有队列中分派下一个按时间排序的值.
在OrderedMultiQueue(一)-cartographer中对简单的函数做了讲解,再次继续
OrderedMultiQueue
Dispatch
数据分发,即把队列中的数据放入回调函数进行处理,分发下去。
他其实是一个大的while(true)的死循环
1 | /** |
代码解析
首先给出一个例子
1 | queues_: |
前面是一个QueueKey(轨迹ID, topic名字)
后面代表此Key的队列,imu最快,在时间1,3,5处都有数据,其实是odom,在数据2,6处有数据
const在*的左边,表示指针指向的内容不可以变化,但是地址是可以变的,例子看const_ptr.cpp
1 | const Data* next_data = nullptr; |
for循环每一个队列,找到每一个队列中最老的数据,即peek()
如果该队列的peek数据为nullptr,且标记为finish了,说明队列已经结束,删除
1 | // Step: 1 遍历所有的数据队列, 找到所有数据队列的第一个数据中时间最老的一个数据 |
否则,该队列为空,但是又不是完成的状态, 就直接返回,并且标记阻塞.
表示消费者消耗 的太快了,把队列中的数据都用完了,直接返回。对本次dispatch结束,等一等数据的加入
1 | // 退出条件1: 某个话题的数据队列为空同时又不是完成状态, 就先退出, 发布log并标记为阻塞者 |
更新数据,第一次到达这里,记录此数据为next_data=data,即该队列中最早的那个数据, 并且记录对应的队列next_queue和QueueKey。
若是之后的队列最早的数据的时间,比next_data的更早,则next_data进行替换。
总之找出所有队列中,时间最早的队列和QueueKey
- next_data:所有队列中,时间最早的数据
1 | // 第一次进行到这里或者data的时间比next_data的时间小(老数据) |
time_point(duration::min()); 见time_test.cpp
判断找出来的最老的数据是不是比上次数据分发的时间要晚
1 | // 数据的时间戳不是按顺序的, 就报错 |
如果所有队列中最早的数据为空的话,说明所有数据全部分发完毕,结束
1 | if (next_data == nullptr) { |
// 如果我们还没有为这个轨迹分配任何数据, 快进这个轨迹的所有队列, 直到达到一个共同的开始时间
// Step: 2 获取对应轨迹id的所有数据队列中的最小共同时间戳, 作为轨迹开始的时间
1 | const common::Time common_start_time = |
// Step: 3 将 next_queue 的时间最老的一个数据传入回调函数进行处理,开始分发数据
next_data 的时间大于共同开始时间的:
更新last_dispatched_time_ ,发放数据(调用callback)
1
2
3
4
5
6
7if (next_data->GetTime() >= common_start_time) {
// Happy case, we are beyond the 'common_start_time' already.
// 更新分发数据的时间
last_dispatched_time_ = next_data->GetTime();
// 将数据传入 callback() 函数进行处理,并将这个数据从数据队列中删除
next_queue->callback(next_queue->queue.Pop());
}
时间小于共同时间的,并且当前队列的数据少于2个的
- 如果队列不是完成状态,调用CannotMakeProgress
- 否则直接处理,更新last_dispatched_time_ ,更新last_dispatched_time_ ,发放数据(调用callback)
时间小于共同时间的,并且当前队列还长着,
只处理比common_start_time 早一帧的数据,其他的数据不要
1
2
3
4
5
6std::unique_ptr<Data> next_data_owner = next_queue->queue.Pop();
if (next_queue->queue.Peek<Data>()->GetTime() > common_start_time) {
// 更新分发数据的时间,将数据传入 callback() 进行处理
last_dispatched_time_ = next_data->GetTime();
next_queue->callback(std::move(next_data_owner));
}next_data_owner 就是这一帧的数据,Pop出来之后,直接看栈顶的数据,有没有大于common_start_time。如果大于的画,则处理Pop出来的数据
CannotMakeProgress
- 做好此为阻塞队列的标记
- 发布log,若有其他队列长度大于kMaxQueueSize的topic数据,则发布log,等待这个队列为0的消息
1 | // 标记queue_key为阻塞者,并按条件发布log,等等这个数据 |
此错误,一个队列为0,另一个又太多
一般情况下,就是topic名字没有设置正确才有会有这样的情况
GetCommonStartTime
找到此轨迹的所有数据队列所有第一帧的最大时间(共同时间)
1 | /** |
代码解析
首先对common_start_time_per_trajectory_ 插入一个此轨迹的最小时间
只有第一次才可以插入成功,插入成功之后遍历所有的队列
找出所有此轨迹的队列,并且对比最老的数据,即时间最早的数据(Peek),
找出时间最大的第一帧数据