线程池一共有两个类组成,两者的联系于PoseGraph2D::AddWorkItem函数中,初始化双端队列,workqueue 的过程中
1 2 3 4 5 6 7 8 if (work_queue_ == nullptr ) { work_queue_ = absl::make_unique<WorkQueue>(); auto task = absl::make_unique<common::Task>(); task->SetWorkItem ([this ]() { DrainWorkQueue (); }); thread_pool_->Schedule (std::move (task)); }
线程池的初始化 在map_builder的构造函数中,根据配置文件进行初始化
1 2 MapBuilder::MapBuilder (const proto::MapBuilderOptions& options) : options_ (options), thread_pool_ (options.num_background_threads ())
Task Task类位于文件cartographer/common/task.h
中
成员变量 workitem 表示需要被执行的任务
1 2 // 需要执行的任务 WorkItem work_item_ GUARDED_BY(mutex_);
本任务的依赖的任务个数
1 2 unsigned int uncompleted_dependencies_ GUARDED_BY (mutex_) = 0 ;
被依赖的个数
1 2 std::set<Task*> dependent_tasks_ GUARDED_BY (mutex_) ;
Thread_pool 线程池接口头文件,有一个Schedule的纯虚函数,和两个函数Execute, SetThreadPool
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class ThreadPoolInterface { public : ThreadPoolInterface () {} virtual ~ThreadPoolInterface () {} virtual std::weak_ptr<Task> Schedule (std::unique_ptr<Task> task) = 0 ; protected : void Execute (Task* task) ; void SetThreadPool (Task* task) ; private : friend class Task ; virtual void NotifyDependenciesCompleted (Task* task) = 0 ; };
ThreadPool 实现了Schedule函数,还有一些其他的函数。
线程池放在了一个std::vector<std::thread> pool_ GUARDED_BY(mutex_);
中,这里放了好几个线程
接下来是两个队列,tasksnot_ready 这个map存的是Schedule调用完之后不能被直接执行的任务,因为他有可能有依赖的任务还没有执行完,得先要去执行依赖的任务
1 2 3 4 5 std::deque<std::shared_ptr<Task>> task_queue_ GUARDED_BY (mutex_) ; absl::flat_hash_map<Task*, std::shared_ptr<Task>> tasks_not_ready_ GUARDED_BY (mutex_);
全部头文件代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class ThreadPool : public ThreadPoolInterface { public : explicit ThreadPool (int num_threads) ; ~ThreadPool (); ThreadPool (const ThreadPool&) = delete ; ThreadPool& operator =(const ThreadPool&) = delete ; std::weak_ptr<Task> Schedule (std::unique_ptr<Task> task) LOCKS_EXCLUDED (mutex_) override ; private : void DoWork () ; void NotifyDependenciesCompleted (Task* task) LOCKS_EXCLUDED (mutex_) override ; absl::Mutex mutex_; bool running_ GUARDED_BY (mutex_) = true ; std::vector<std::thread> pool_ GUARDED_BY (mutex_) ; std::deque<std::shared_ptr<Task>> task_queue_ GUARDED_BY (mutex_) ; absl::flat_hash_map<Task*, std::shared_ptr<Task>> tasks_not_ready_ GUARDED_BY (mutex_); };
构造函数,什么是线程池 线程池的构造函数,向线程池中推入num_threads个DoWork。默认是4个
1 2 3 4 5 6 7 8 ThreadPool::ThreadPool (int num_threads) { CHECK_GT (num_threads, 0 ) << "ThreadPool requires a positive num_threads!" ; absl::MutexLock locker (&mutex_) ; for (int i = 0 ; i != num_threads; ++i) { pool_.emplace_back ([this ]() { ThreadPool::DoWork (); }); } }
DoWork是一个循环,始终执行, 直到running_为false时停止执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 void ThreadPool::DoWork () {#ifdef __linux__ CHECK_NE (nice (10 ), -1 ); #endif const auto predicate = [this ]() EXCLUSIVE_LOCKS_REQUIRED (mutex_) { return !task_queue_.empty () || !running_; }; for (;;) { std::shared_ptr<Task> task; { absl::MutexLock locker (&mutex_) ; mutex_.Await (absl::Condition (&predicate)); if (!task_queue_.empty ()) { task = std::move (task_queue_.front ()); task_queue_.pop_front (); } else if (!running_) { return ; } } CHECK (task); CHECK_EQ (task->GetState (), common::Task::DEPENDENCIES_COMPLETED); Execute (task.get ()); } }
Schedule执行 会直接把任务添加到tasksnot_ready 中,默认他是有依赖的,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 std::weak_ptr<Task> ThreadPool::Schedule (std::unique_ptr<Task> task) { std::shared_ptr<Task> shared_task; { absl::MutexLock locker (&mutex_) ; auto insert_result = tasks_not_ready_.insert (std::make_pair (task.get (), std::move (task))); CHECK (insert_result.second) << "Schedule called twice" ; shared_task = insert_result.first->second; } SetThreadPool (shared_task.get ()); return shared_task; }
SetThreadPool 连接线程池和任务,设置任务的状态从 NEW->DISPATCHED, 如果本任务依赖的任务都完成了,那么就进入DEPENDENCIES_COMPLETED状态,执行NotifyDependenciesCompleted
1 2 3 4 void ThreadPoolInterface::SetThreadPool (Task* task) { task->SetThreadPool (this ); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void Task::SetThreadPool (ThreadPoolInterface* thread_pool) { absl::MutexLock locker (&mutex_) ; CHECK_EQ (state_, NEW); state_ = DISPATCHED; thread_pool_to_notify_ = thread_pool; if (uncompleted_dependencies_ == 0 ) { state_ = DEPENDENCIES_COMPLETED; CHECK (thread_pool_to_notify_); thread_pool_to_notify_->NotifyDependenciesCompleted (this ); } }
NotifyDependenciesCompleted 从tasksnot_ready 中找到任务,删除,添加到taskqueue 中进行执行。taskqueue 队列的任务会在四个线程中不同的调用执行
1 2 3 4 5 6 void NotifyDependenciesCompleted (Task* task) override { auto it = tasks_not_ready_.find (task); ASSERT_NE (it, tasks_not_ready_.end ()); task_queue_.push_back (it->second); tasks_not_ready_.erase (it); }
最后在线程中会调用Task::Execute函数
Execute 执行任务,设置任务状态为COMPLETED,结束后,循环依赖本任务的任务,将他们的未完成的依赖任务减一
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 void Task::Execute () { { absl::MutexLock locker (&mutex_) ; CHECK_EQ (state_, DEPENDENCIES_COMPLETED); state_ = RUNNING; } if (work_item_) { work_item_ (); } absl::MutexLock locker (&mutex_) ; state_ = COMPLETED; for (Task* dependent_task : dependent_tasks_) { dependent_task->OnDependenyCompleted (); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 void Task::OnDependenyCompleted () { absl::MutexLock locker (&mutex_) ; CHECK (state_ == NEW || state_ == DISPATCHED); --uncompleted_dependencies_; if (uncompleted_dependencies_ == 0 && state_ == DISPATCHED) { state_ = DEPENDENCIES_COMPLETED; CHECK (thread_pool_to_notify_); thread_pool_to_notify_->NotifyDependenciesCompleted (this ); } }
完整的流程图
上述的流程表示下图左半边的流程,不涉及到依赖的task。右半边的流程是先设置的依赖。
注意,第一个任务是DrainWorkQueue, 即初始化任务队列的时候添加的,它本身就是while循环,所以第一个线程就相当于是堵塞的,他就一直执行work_