WorkItem 定义了一个task,他的返回类型是一个Result, 而Result只有两个变量,即,是否优化
1 2 3 4 5 6 7 8 9 10 11 12 struct WorkItem { enum class Result { kDoNotRunOptimization, kRunOptimization, }; std::chrono::steady_clock::time_point time; std::function<Result ()> task; }; using WorkQueue = std::deque<WorkItem>;
WorkItem使用位置 在后端最主要的类中 PoseGraph2D 中,定义了一个指向WorkQueue 的一个指针。
任何需要被执行的任务都需要添加到这个队列当中,不会直接执行,会被调度执行
1 2 3 4 std::unique_ptr<WorkQueue> work_queue_ GUARDED_BY (work_queue_mutex_) ;
AddWorkItem 添加任务,等待被执行,如果队列还没有初始化,则初始化。然后把任务添加到workqueue 队列中
这里workqueue 和线程池是没有关系的,通过DrainWorkQueue 把他们和线程池进行相连
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 void PoseGraph2D::AddWorkItem ( const std::function<WorkItem::Result()>& work_item) { absl::MutexLock locker (&work_queue_mutex_) ; if (work_queue_ == nullptr ) { work_queue_ = absl::make_unique<WorkQueue>(); auto task = absl::make_unique<common::Task>(); task->SetWorkItem ([this ]() { DrainWorkQueue (); }); thread_pool_->Schedule (std::move (task)); } const auto now = std::chrono::steady_clock::now (); work_queue_->push_back ({now, work_item}); kWorkQueueSizeMetric->Set (work_queue_->size ()); kWorkQueueDelayMetric->Set ( std::chrono::duration_cast<std::chrono::duration<double >>( now - work_queue_->front ().time) .count ()); }
DrainWorkQueue 他是在线程池里进行执行的
函数会一直取双端队列workqueue 的第一个函数,然后执行,一直执行到函数返回内容是kRunOptimization为止
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 void PoseGraph2D::DrainWorkQueue () { bool process_work_queue = true ; size_t work_queue_size; while (process_work_queue) { std::function<WorkItem::Result ()> work_item; { absl::MutexLock locker (&work_queue_mutex_) ; if (work_queue_->empty ()) { work_queue_.reset (); return ; } work_item = work_queue_->front ().task; work_queue_->pop_front (); work_queue_size = work_queue_->size (); kWorkQueueSizeMetric->Set (work_queue_size); } process_work_queue = work_item () == WorkItem::Result::kDoNotRunOptimization; } LOG (INFO) << "Remaining work items in queue: " << work_queue_size; constraint_builder_.WhenDone ( [this ](const constraints::ConstraintBuilder2D::Result& result) { HandleWorkQueue (result); }); }
那么,到底有什么函数是要在这里执行的呢,即保存在workqueue 中的
循环执行的分支的六个函数都是需要在这里被执行的
哪里添加workqueue数据 还有在AddWorkItem里面才有
1 2 work_queue_->push_back ({now, work_item});
那么哪里调用了AddworkItem
AddImuData 注意此函数里添加到WorkItem的返回值是 WorkItem::Result::kDoNotRunOptimization;
被调用处 于global_trajectory_data的AddImuData中,一份是给了前端一份是给了后端
1 2 3 4 5 6 7 8 9 10 11 void PoseGraph2D::AddImuData (const int trajectory_id, const sensor::ImuData& imu_data) { AddWorkItem ([=]() LOCKS_EXCLUDED (mutex_) { absl::MutexLock locker (&mutex_); if (CanAddWorkItemModifying (trajectory_id)) { optimization_problem_->AddImuData (trajectory_id, imu_data); } return WorkItem::Result::kDoNotRunOptimization; });
AddOdometryData 1 2 3 4 5 6 7 8 9 10 11 void PoseGraph2D::AddOdometryData (const int trajectory_id, const sensor::OdometryData& odometry_data) { AddWorkItem ([=]() LOCKS_EXCLUDED (mutex_) { absl::MutexLock locker (&mutex_); if (CanAddWorkItemModifying (trajectory_id)) { optimization_problem_->AddOdometryData (trajectory_id, odometry_data); } return WorkItem::Result::kDoNotRunOptimization; }); }
AddFixedFramePoseData 注意,在global_trajectory_builder中,GPS是只传入到后端了
1 2 3 4 5 6 7 8 9 10 11 12 13 void PoseGraph2D::AddFixedFramePoseData ( const int trajectory_id, const sensor::FixedFramePoseData& fixed_frame_pose_data) { AddWorkItem ([=]() LOCKS_EXCLUDED (mutex_) { absl::MutexLock locker (&mutex_); if (CanAddWorkItemModifying (trajectory_id)) { optimization_problem_->AddFixedFramePoseData (trajectory_id, fixed_frame_pose_data); } return WorkItem::Result::kDoNotRunOptimization; }); }
等等,形式都一样