Apollo源码分析学习笔记

Apollo 源码剖析学习笔记

Apollo 项目介绍

Cyber RT 代码分析

cyber base

xz@xiaqiu:~/study/apollo/cyber/base$ tree
.
├── atomic_hash_map.h
├── atomic_hash_map_test.cc
├── atomic_rw_lock.h
├── atomic_rw_lock_test.cc
├── bounded_queue.h
├── bounded_queue_test.cc
├── build
├── BUILD
├── CMakeLists.txt
├── concurrent_object_pool.h
├── for_each.h
├── for_each_test.cc
├── macros.h
├── main_test.cc
├── object_pool.h
├── object_pool_test.cc
├── reentrant_rw_lock.h
├── rw_lock_guard.h
├── signal.h
├── signal_test.cc
├── thread_pool.h
├── thread_safe_queue.h
├── unbounded_queue.h
├── unbounded_queue_test.cc
└── wait_strategy.h

1 directory, 24 files

测试的CMakeLists.txt

cmake_minimum_required(VERSION 3.5)
project(CyberBase)

set(CMAKE_CXX_STANDARD 17) 
set(PROJECT_SOURCE_DIR "../../")

INCLUDE_DIRECTORIES(${PROJECT_SOURCE_DIR})

message(${PROJECT_SOURCE_DIR})

add_executable(atomic_hash_map_test main_test.cc atomic_hash_map_test.cc)
target_link_libraries(atomic_hash_map_test -lgtest -lpthread)
#target_link_libraries(signal_test Cyber::signal)

build目录为代码生成测试目录

cyber 入口

cyber 的入口在"cyber/mainboard"目录中:

xz@xiaqiu:~/study/apollo/cyber/mainboard$ tree
.
├── BUILD
├── mainboard.cc //主函数
├── module_argument.cc //模块输入参数
├── module_argument.h
├── module_controller.cc //模块加载,卸载
└── module_controller.h

0 directories, 6 files

mainboard 中的文件比较少,也很好理解,我们先从"mainboard.cc"中开始分析:

#include "cyber/common/global_data.h"
#include "cyber/common/log.h"
#include "cyber/init.h"
#include "cyber/mainboard/module_argument.h"
#include "cyber/mainboard/module_controller.h"
#include "cyber/state.h"

using apollo::cyber::mainboard::ModuleArgument;
using apollo::cyber::mainboard::ModuleController;

int main(int argc, char **argv)
{
    // parse the argument 解析参数
    ModuleArgument module_args;
    module_args.ParseArgument(argc, argv);

    // initialize cyber 初始化 cyber
    apollo::cyber::Init(argv[0]);

    // start module 加载模块
    ModuleController controller(module_args);
    if (!controller.Init())
    {
        controller.Clear();
        AERROR << "module start error.";
        return -1;
    }
	// 等待 cyber 关闭
    apollo::cyber::WaitForShutdown();
    controller.Clear();
    AINFO << "exit mainboard.";

    return 0;
}

上述是“mainboard.cc”的主函数,下面我们重点介绍下具体的过程。

解析参数

解析参数是在“ModuleArgument”类中实现的,主要是解析加载 DAG 文件时候带的参数。

void ModuleArgument::DisplayUsage()
{
    AINFO << "Usage: \n    " << binary_name_ << " [OPTION]...\n"
          << "Description: \n"
          << "    -h, --help : help information \n"
          << "    -d, --dag_conf=CONFIG_FILE : module dag config file\n"
          << "    -p, --process_group=process_group: the process "
          "namespace for running this module, default in manager process\n"
          << "    -s, --sched_name=sched_name: sched policy "
          "conf for hole process, sched_name should be conf in cyber.pb.conf\n"
          << "Example:\n"
          << "    " << binary_name_ << " -h\n"
          << "    " << binary_name_ << " -d dag_conf_file1 -d dag_conf_file2 "
          << "-p process_group -s sched_name\n";
}

void ModuleArgument::ParseArgument(const int argc, char *const argv[])
{
    // 二进制模块名称
    binary_name_ = std::string(basename(argv[0]));
    //解析参数   
    GetOptions(argc, argv);
	// 如果没有 process_group_和 sched_name_,则赋值为虚认值
    if (process_group_.empty())
    {
        process_group_ = DEFAULT_process_group_;
    }
	//如果有,则设置对应的参数
    if (sched_name_.empty())
    {
        sched_name_ = DEFAULT_sched_name_;
    }

    GlobalData::Instance()->SetProcessGroup(process_group_);
    GlobalData::Instance()->SetSchedName(sched_name_);
    AINFO << "binary_name_ is " << binary_name_ << ", process_group_ is "
          << process_group_ << ", has " << dag_conf_list_.size() << " dag conf";
	//打印 dag_conf配置,这里的 dag 是否可以设置多个?
    for (std::string &dag : dag_conf_list_)
    {
        AINFO << "dag_conf: " << dag;
    }
}

模块加载

在“ModuleController”实现 cyber 模块的加载在“ModuleController:Init0”中调用“LoadAlI() ”来加载所有模块,我们接着看 cyber 是如何加载模块。

首先是找到模块的路径。

if (module_config.module_library().front() == '/')
{
    load_path = module_config.module_library();
}
else
{
    load_path =
        common::GetAbsolutePath(work_root, module_config.module_library());
}

if (!common::PathExists(load_path))
{
    AERROR << "Path does not exist: " << load_path;
    return false;
}

通过“class_loader_manager_ ”加载模块,后面我们会接着分析“ClassLoaderManager”的具体实现,加载好对应的类之后在创建对应的允象,并且初始化对象.〈调用对象的 Initialize(方法,也就是说所有的 cyber 模块都是通过 Initialize()方法启动的,后面们会接团分析 Initialize 具体干了什么) 。这里的“classloader” 其实类似 java 中的 classloader,即 java 虚拟机在运行时加载对应的类,并且实例化人_” 对象。cyber 中其实也是实现了类型通过动态加载并且实例化类的功能,好处是可以动态加载和关闭单个 cyber模块(定位,感知,规划等),也就是在 dreamview 中的模块开关按钮,实际上就是动态的加载和印载对应的模块。

//通过类加载器加载.1oad_path 下的模class_1oader_manager_.LoadLibrary(load path);
//加载模块
for (auto &component : module_config.components())
{
    const std::string &class_name = component.class_name();
    //创建对象
    std::shared_ptr<ComponentBase> base =
        class_loader_manager_.CreateClassObj<ComponentBase>(class_name);
     调用对象的 Initialize 方法
    if (base == nullptr || !base->Initialize(component.config()))
    {
        return false;
    }
    component_list_.emplace_back(std::move(base));
}
//加载定时器模块
for (auto &component : module_config.timer_components())
{
    const std::string &class_name = component.class_name();
    std::shared_ptr<ComponentBase> base =
        class_loader_manager_.CreateClassObj<ComponentBase>(class_name);
    if (base == nullptr || !base->Initialize(component.config()))
    {
        return false;
    }
    component_list_.emplace_back(std::move(base));
}

上述就是 cyber mainboard 的整个流程,cyber main 函数中先解析 dag 参数滞然后根据解析的参数,通过类加载器动态的加载对应的模块,然后调用 Initialize方法初始化模块。

动态加载对象

类加载器(class_loader)类加载器的作用就是动态的加载动态库然后实例化对象。我们先来解释下, 首先 apollo 中的各个 module都会编译为一个动态库,拿 planning 模块来举例子,在“planning/dag/planning.dag”中,会加载:

module_config {
  module_library : "/apollo/bazel-bin/modules/planning/libplanning_component.so"

也就是说,apollo 中的模块都会通过类加载器以动态库的方式加载,然后实例化,之后再调用 Initialize 方法初始化。也就是说,我们讲清楚下面 3 个问题,也就是讲清楚了类加载器的原理。

1、cyber 如何加载 apollo 模块?

2、如何实例化模块?

3、如何初始化模块?

类加载器的实现在“cyber/class_loader”目录中,通过“Poco/SharedLibraryh”库来实现动态库的加载,关于 Poco 动态库的加载可以[参考](Class Poco::SharedLibrary)

xz@xiaqiu:~/study/apollo/cyber/class_loader$ tree
.
├── BUILD //编译文件
├── class_loader.cc//类加载器
├── class_loader.h
├── class_loader_manager.cc//类加载器管理
├── class_loader_manager.h
├── class_loader_register_macro.h//类加载器注册宏定义
├── class_loader_test.cc
├── shared_library
│   ├── BUILD
│   ├── exceptions.h
│   ├── sample.cc
│   ├── sample.h
│   ├── shared_library.cc
│   ├── shared_library.h
│   └── shared_library_test.cc
├── test
│   ├── base.h
│   ├── BUILD
│   ├── plugin1.cc
│   └── plugin2.cc
└── utility
    ├── BUILD
    ├── class_factory.cc //类工厂
    ├── class_factory.h
    ├── class_loader_utility.cc//类加载器工具类
    └── class_loader_utility.h

3 directories, 23 files

我们先从“class_loaderh”开始看起,首先我们分析下“class_loader”实现的具体方法:

/** *  for library load,createclass object */class ClassLoader { public:  explicit ClassLoader(const std::string& library_path);  virtual ~ClassLoader();	  bool IsLibraryLoaded();//库是否已经加载  bool LoadLibrary();// 贡载库  int UnloadLibrary();// 卸载库  const std::string GetLibraryPath() const;// 获取库路径  template <typename Base>  std::vector<std::string> GetValidClassNames();// 获取类名称  template <typename Base>  std::shared_ptr<Base> CreateClassObj(const std::string& class_name);// 实例化类对象  template <typename Base>  bool IsClassValid(const std::string& class_name);//判断类是否有效 private:  template <typename Base>  void OnClassObjDeleter(Base* obj); private:     std::string library_path_;// 类路径  int loadlib_ref_count_;// 类加载引用次数  std::mutex loadlib_ref_count_mutex_;// 类加载引用次数锁  int classobj_ref_count_;// 类引用次数  std::mutex classobj_ref_count_mutex_;// 类引用次数锁};

可以看到类加载器主要是提供了加载类, 邱载类和实例化类的接口。实际上加载类和印载类的实现都比较简单,都是调用“utility ”类中的实现,我们暂时先放一边,先看下实例化对象的实现。

template <typename Base>std::shared_ptr<Base> ClassLoader::CreateClassObj(    const std::string &class_name){    // 加载库    if (!IsLibraryLoaded())    {        LoadLibrary();    }	// 根据类名称创建对象    Base *class_object = utility::CreateClassObj<Base>(class_name, this);    if (class_object == nullptr)    {        AWARN << "CreateClassObj failed, ensure class has been registered. "              << "classname: " << class_name << ",lib: " << GetLibraryPath();        return std::shared_ptr<Base>();    }	// 类引用计数加1    std::lock_guard<std::mutex> lck(classobj_ref_count_mutex_);    classobj_ref_count_ = classobj_ref_count_ + 1;    // 指定类的析构函数    std::shared_ptr<Base> classObjSharePtr(        class_object, std::bind(&ClassLoader::OnClassObjDeleter<Base>, this,                                std::placeholders::_1));    return classObjSharePtr;}

可以看到创建类的时候,类引用计数加13. 并且绑定类的析构函数(OnClassObjDeleteD ,删除对象的时候计娄引用计数减1

template <typename Base>void ClassLoader::OnClassObjDeleter(Base *obj){    if (nullptr == obj)    {        return;    }    std::lock_guard<std::mutex> lck(classobj_ref_count_mutex_);    delete obj;    --classobj_ref_count_;}

我们先简单的分析下 ClassLoaderManager,最后再分析 utility 。

类加载管理器 (ClassLoaderManager)

类加载器管理实际上是管理不同的 classloader ,而不同的libpath 对应不同的 classloader 。ClassLoaderManager 主要的数据结构其实如下:

std::map<std::string, ClassLoader *> libpath_loader_map_;

其中“libpath_loader_map_”为 map 结构,在“LoadLibrary”的时候赋值,key 为 library_path,而 value为 ClassLoader,

bool ClassLoaderManager::LoadLibrary(const std::string &library_path){    std::lock_guard<std::mutex> lck(libpath_loader_map_mutex_);    if (!IsLibraryValid(library_path))    {        libpath_loader_map_[library_path] =            new class_loader::ClassLoader(library_path);    }    return IsLibraryValid(library_path);}

也就是说“ClassLoaderManager”对 ClassLoader 进行保存和管理。最后我们分析下 utility 具体的实现,utility 分为 2 部分,一部分为 ClassFactory, 一部分为工具函数《class_loader_utility.cc)

ClassFactory

可以看到有如下继承关系“ ClassFaetory -> AbstractClassFactory -> AbstractClassFactoryBase ”,其中“ClassFactory“和”AbstractClassFactory ”为模板类,主要的实现在“AbstractClassFactoryBase”中,我们逐个分析:首先是类初始化,指定了“relative_library path”,“base_class_name”,“class_name_”

AbstractClassFactoryBase::AbstractClassFactoryBase(    const std::string &class_name, const std::string &base_class_name)    : relative_library_path_(""),      base_class_name_(base_class_name),      class_name_(class_name) {}

设置 OwnedClassLoader,而“RemoveOwnedClassLoader”同理。

void AbstractClassFactoryBase::AddOwnedClassLoader(ClassLoader *loader){    if (std::find(relative_class_loaders_.begin(), relative_class_loaders_.end(),                  loader) == relative_class_loaders_.end())    {        relative_class_loaders_.emplace_back(loader);    }}

classloader 是否属于该 classFactory 。

bool AbstractClassFactoryBase::IsOwnedBy(const ClassLoader *loader){    std::vector<ClassLoader *>::iterator itr = std::find(                relative_class_loaders_.begin(), relative_class_loaders_.end(), loader);    return itr != relative_class_loaders_.end();}

也是说 ClassFactory 能够生产一个路径下的所有类,一个 ClassFactory 可能有好几个 ClassLoader,分为base_class_name 和 class_name。

接下来我们看“class_loader_utility.cc”的实现,文件中实现了很多函数,这个分析如下;

创建对象(CreateClassObj)的具体实现如下,先找到类对应的 factory,然后通过 factory 创建对象。

template <typename Base>
std::shared_ptr<Base> ClassLoaderManager::CreateClassObj(
    const std::string &class_name)
{
    std::vector<ClassLoader *> class_loaders = GetAllValidClassLoaders();
    for (auto class_loader : class_loaders)
    {
        if (class_loader->IsClassValid<Base>(class_name))
        {
            return (class_loader->CreateClassObj<Base>(class_name));
        }
    }
    AERROR << "Invalid class name: " << class_name;
    return std::shared_ptr<Base>();
}

注册类到 factory 。

template <typename Derived, typename Base>
void RegisterClass(const std::string &class_name,
                   const std::string &base_class_name)
{
    AINFO << "registerclass:" << class_name << "," << base_class_name << ","
          << GetCurLoadingLibraryName();

    utility::AbstractClassFactory<Base> *new_class_factrory_obj =
        new utility::ClassFactory<Derived, Base>(class_name, base_class_name);
    new_class_factrory_obj->AddOwnedClassLoader(GetCurActiveClassLoader());
    new_class_factrory_obj->SetRelativeLibraryPath(GetCurLoadingLibraryName());

    GetClassFactoryMapMapMutex().lock();
    ClassClassFactoryMap &factory_map =
        GetClassFactoryMapByBaseClass(typeid(Base).name());
    factory_map[class_name] = new_class_factrory_obj;
    GetClassFactoryMapMapMutex().unlock();
}

查找 classloader 中所有类的名称。

template <typename Base>std::vector<std::string> GetValidClassNames(ClassLoader *loader){    std::lock_guard<std::recursive_mutex> lck(GetClassFactoryMapMapMutex());    ClassClassFactoryMap &factoryMap =        GetClassFactoryMapByBaseClass(typeid(Base).name());    std::vector<std::string> classes;    for (auto &class_factory : factoryMap)    {        AbstractClassFactoryBase *factory = class_factory.second;        if (factory && factory->IsOwnedBy(loader))        {            classes.emplace_back(class_factory.first);        }    }    return classes;}

加载类,通过指定的 classloader 加载指定路径下的库

bool LoadLibrary(const std::string &library_path, ClassLoader *loader)
{
    // 类是否已经被加载,如果被加载则对应的 class_factory 加上依赖的class_loader
    if (IsLibraryLoadedByAnybody(library_path))
    {
        AINFO << "lib has been loaded by others,only attach to class factory obj."
              << library_path;
        ClassFactoryVector lib_class_factory_objs =
            GetAllClassFactoryObjectsOfLibrary(library_path);
        for (auto &class_factory_obj : lib_class_factory_objs)
        {
            class_factory_obj->AddOwnedClassLoader(loader);
        }
        return true;
    }

    SharedLibraryPtr shared_library = nullptr;
    static std::recursive_mutex loader_mutex;
    {
        std::lock_guard<std::recursive_mutex> lck(loader_mutex);
        try
        {
            //设置当前激活的classloader,当前加载库路径
            SetCurActiveClassLoader(loader);
            SetCurLoadingLibraryName(library_path);
            shared_library = SharedLibraryPtr(new SharedLibrary(library_path));
        }
        catch (const LibraryLoadException &e)
        {
            SetCurLoadingLibraryName("");
            SetCurActiveClassLoader(nullptr);
            AERROR << "LibraryLoadException: " << e.what();
        }
        catch (const LibraryAlreadyLoadedException &e)
        {
            SetCurLoadingLibraryName("");
            SetCurActiveClassLoader(nullptr);
            AERROR << "LibraryAlreadyLoadedException: " << e.what();
        }
        catch (const SymbolNotFoundException &e)
        {
            SetCurLoadingLibraryName("");
            SetCurActiveClassLoader(nullptr);
            AERROR << "SymbolNotFoundException: " << e.what();
        }

        SetCurLoadingLibraryName("");
        SetCurActiveClassLoader(nullptr);
    }

    if (shared_library == nullptr)
    {
        AERROR << "shared library failed: " << library_path;
        return false;
    }

    auto num_lib_objs = GetAllClassFactoryObjectsOfLibrary(library_path).size();
    if (num_lib_objs == 0)
    {
        AWARN << "Class factory objs counts is 0, maybe registerclass failed.";
    }

    std::lock_guard<std::recursive_mutex> lck(GetLibPathSharedLibMutex());
    LibPathSharedLibVector &opened_libraries = GetLibPathSharedLibVector();
	// 保存加载路径和对应的 poco_library
    opened_libraries.emplace_back(
        std::pair<std::string, SharedLibraryPtr>(library_path, shared_library));
    return true;
}

上面我们分析了 classloader 动态的加载并月创建类对象, 而在 mainhoard 中通过动态的加载 module, 并且调用模块的 Initialize 方法,实现模块的初始化。下面我们看下模块的初始化过程。

模块初始化

component 概述(cyber 组件)

我们先看下 component 的目录结构。可以看到 cyber 组件分为 2 类:普通组件和定时组件,而二者都继承至基础组件;

xz@xiaqiu:~/study/apollo/cyber/component$ tree.├── BUILD├── component_base.h //基础组件├── component.h //组件├── component_test.cc├── timer_component.cc //定时组件├── timer_component.h└── timer_component_test.cc0 directories, 7 files

基础组件 component_base

我们先看下基础组件中实现了什么,也就是“component_base.h”中实现了什么。“component_base.h”实现了“ComponentBase”类,下面我们逐步分析“ComponentBase”类的 public 方法。”1、Initialize 方法Initialize 方法在派生类中重写了,这里有 2 个 Initialize 方法,分别对应上述所说的 2 种类型的组件。

  virtual bool Initialize(const ComponentConfig& config) { return false; }  virtual bool Initialize(const TimerComponentConfig& config) { return false; }

2、Shutdown 方法

用于关闭 cyber 模块

virtual void Shutdown(){    if (is_shutdown_.exchange(true))    {        return;    }    Clear();    for (auto &reader : readers_)    {        reader->Shutdown();    }    scheduler::Instance()->RemoveTask(node_->Name());}

3、GetProtoConfig 方法

获取 protobuf 格式的配置。

template <typename T>bool GetProtoConfig(T *config) const{    return common::GetProtoFromFile(config_file_path_, config);}

看完公有方法,下面我们看下私有方法.有些简单的方法这里就不详细说了,主要看下“LoadConfigFiies方法,有2 个“LoadConfigFiles” 方法这里只介绍第一个:

void LoadConfigFiles(const ComponentConfig &config){    if (!config.config_file_path().empty())    {        if (config.config_file_path()[0] != '/')        {            config_file_path_ = common::GetAbsolutePath(common::WorkRoot(),                                config.config_file_path());        }        else        {            config_file_path_ = config.config_file_path();        }    }    if (!config.flag_file_path().empty())    {        std::string flag_file_path = config.flag_file_path();        if (flag_file_path[0] != '/')        {            flag_file_path =                common::GetAbsolutePath(common::WorkRoot(), flag_file_path);        }        google::SetCommandLineOption("flagfile", flag_file_path.c_str());    }}

5、私有成员变量

最后我们在分析下私有成员变量,也就是说每个组件(componenb)会自动创建一个节点(node),并且可以挂载多个 reader。

    std::atomic<bool> is_shutdown_ = {false};    std::shared_ptr<Node> node_ = nullptr;    std::string config_file_path_ = "";    std::vector<std::shared_ptr<ReaderBase>> readers_;

下面我们开始分析 component 组件,也就是 Component 类。

Component 组件

Component 类都需要实现“Initialize”和“Process“2 个方法,所以 planning.fouting,perception 等模块都需要实现这 2? 个方法。

template <typename M0>
class Component<M0, NullType, NullType, NullType> : public ComponentBase
{
public:
    Component() {}
    ~Component() override {}
    bool Initialize(const ComponentConfig &config) override;
    bool Process(const std::shared_ptr<M0> &msg);
private:
    virtual bool Proc(const std::shared_ptr<M0> &msg) = 0;
};

我们接着看下这 2 个方法是如何实现的,先看“Process”方法。

1、Process 方法

可以看到 Process 方法比较简单,先判断模块是否关闭,然后执行“Proc”方法。

template <typename M0, typename M1>
bool Component<M0, M1, NullType, NullType>::Process(
    const std::shared_ptr<M0> &msg0, const std::shared_ptr<M1> &msg1)
{
    if (is_shutdown_.load())
    {
        return true;
    }
    return Proc(msg0, msg1);
}

2、Initialize 方法

template <typename M0, typename M1>
bool Component<M0, M1, NullType, NullType>::Initialize(
    const ComponentConfig &config)
{
    // 创建node 节点
    node_.reset(new Node(config.name()));
    //加载配置
    LoadConfigFiles(config);
	// 订阅消息数和 reader 个数要匹配
    if (config.readers_size() < 2)
    {
        AERROR << "Invalid config file: too few readers.";
        return false;
    }
	//初始化,在基类(ComponentBase)中实现
    if (!Init())
    {
        AERROR << "Component Init() failed.";
        return false;
    }

    bool is_reality_mode = GlobalData::Instance()->IsRealityMode();
	//创建 readerl
    ReaderConfig reader_cfg;
    reader_cfg.channel_name = config.readers(1).channel();
    reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile());
    reader_cfg.pending_queue_size = config.readers(1).pending_queue_size();
	
    auto reader1 = node_->template CreateReader<M1>(reader_cfg);
	//创建 reader0
    reader_cfg.channel_name = config.readers(0).channel();
    reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());
    reader_cfg.pending_queue_size = config.readers(0).pending_queue_size();

    std::shared_ptr<Reader<M0>> reader0 = nullptr;
    // is _reality_mode 模式则直接创建
    if (cyber_likely(is_reality_mode))
    {
        reader0 = node_->template CreateReader<M0>(reader_cfg);
    }
    else
    {
        // 如果不是则创建回调函数
        std::weak_ptr<Component<M0, M1>> self =
                                          std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());

        auto blocker1 = blocker::BlockerManager::Instance()->GetBlocker<M1>(
                            config.readers(1).channel());

        auto func = [self, blocker1](const std::shared_ptr<M0> &msg0)
        {
            auto ptr = self.lock();
            if (ptr)
            {
                if (!blocker1->IsPublishedEmpty())
                {
                    auto msg1 = blocker1->GetLatestPublishedPtr();
                    ptr->Process(msg0, msg1);
                }
            }
            else
            {
                AERROR << "Component object has been destroyed.";
            }
        };

        reader0 = node_->template CreateReader<M0>(reader_cfg, func);
    }
    if (reader0 == nullptr || reader1 == nullptr)
    {
        AERROR << "Component create reader failed.";
        return false;
    }
    // 保存 readers
    readers_.push_back(std::move(reader0));
    readers_.push_back(std::move(reader1));

    if (cyber_unlikely(!is_reality_mode))
    {
        return true;
    }

    auto sched = scheduler::Instance();
    std::weak_ptr<Component<M0, M1>> self =
                                      std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());
    auto func = [self](const std::shared_ptr<M0> &msg0,
                       const std::shared_ptr<M1> &msg1)
    {
        auto ptr = self.lock();
        if (ptr)
        {
            ptr->Process(msg0, msg1);
        }
        else
        {
            AERROR << "Component object has been destroyed.";
        }
    };

    std::vector<data::VisitorConfig> config_list;
    for (auto &reader : readers_)
    {
        config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
    }
    auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list);
    // 创建执程类
    croutine::RoutineFactory factory =
        croutine::CreateRoutineFactory<M0, M1>(func, dv);
    return sched->CreateTask(factory, node_->Name());
}

3、component 动态加载

Cyber 主 函数在 “ModuleController:Imit() ”进行模块的加载,具体的加载过程在“ModuleController::LoadModule”中。

bool ModuleController::LoadModule(const DagConfig &dag_config)
{
    const std::string work_root = common::WorkRoot();

    for (auto module_config : dag_config.module_config())
    {	
        // 1. 加载动态库
        std::string load_path;
        if (module_config.module_library().front() == '/')
        {
            load_path = module_config.module_library();
        }
        else
        {
            load_path =
                common::GetAbsolutePath(work_root, module_config.module_library());
        }

        if (!common::PathExists(load_path))
        {
            AERROR << "Path does not exist: " << load_path;
            return false;
        }

        class_loader_manager_.LoadLibrary(load_path);

        for (auto &component : module_config.components())
        {
            const std::string &class_name = component.class_name();
            std::shared_ptr<ComponentBase> base =
                class_loader_manager_.CreateClassObj<ComponentBase>(class_name);
            if (base == nullptr || !base->Initialize(component.config()))
            {
                return false;
            }
            component_list_.emplace_back(std::move(base));
        }

        for (auto &component : module_config.timer_components())
        {
            const std::string &class_name = component.class_name();
            std::shared_ptr<ComponentBase> base =
                class_loader_manager_.CreateClassObj<ComponentBase>(class_name);
            if (base == nullptr || !base->Initialize(component.config()))
            {
                return false;
            }
            component_list_.emplace_back(std::move(base));
        }
    }
    return true;
}

模块首先通过 classloader 加载到内存,然后创建对象,并且调用模块的初始化方法。component 中每个模块都设计为可以动态加载和卸载, 可以实时在线的开启和关闭模块, 实现的方式是通过 classloader 来进行动态的加载动态库。

4、component 初始化

component 一共有 4 个模板类,分别对应接收 0-3 个消息。我们这里主要分析 2 个消息的情况, 其它的可以类推。

template <typename M0, typename M1>
bool Component<M0, M1, NullType, NullType>::Initialize(
    const ComponentConfig &config)
{
    node_.reset(new Node(config.name()));
    LoadConfigFiles(config);

    if (config.readers_size() < 2)
    {
        AERROR << "Invalid config file: too few readers.";
        return false;
    }

    if (!Init())
    {
        AERROR << "Component Init() failed.";
        return false;
    }

    bool is_reality_mode = GlobalData::Instance()->IsRealityMode();

    ReaderConfig reader_cfg;
    reader_cfg.channel_name = config.readers(1).channel();
    reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile());
    reader_cfg.pending_queue_size = config.readers(1).pending_queue_size();

    auto reader1 = node_->template CreateReader<M1>(reader_cfg);

    reader_cfg.channel_name = config.readers(0).channel();
    reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());
    reader_cfg.pending_queue_size = config.readers(0).pending_queue_size();

    std::shared_ptr<Reader<M0>> reader0 = nullptr;
    if (cyber_likely(is_reality_mode))
    {
        reader0 = node_->template CreateReader<M0>(reader_cfg);
    }
    else
    {
        std::weak_ptr<Component<M0, M1>> self =
                                          std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());

        auto blocker1 = blocker::BlockerManager::Instance()->GetBlocker<M1>(
                            config.readers(1).channel());

        auto func = [self, blocker1](const std::shared_ptr<M0> &msg0)
        {
            auto ptr = self.lock();
            if (ptr)
            {
                if (!blocker1->IsPublishedEmpty())
                {
                    auto msg1 = blocker1->GetLatestPublishedPtr();
                    ptr->Process(msg0, msg1);
                }
            }
            else
            {
                AERROR << "Component object has been destroyed.";
            }
        };

        reader0 = node_->template CreateReader<M0>(reader_cfg, func);
    }
    if (reader0 == nullptr || reader1 == nullptr)
    {
        AERROR << "Component create reader failed.";
        return false;
    }
    readers_.push_back(std::move(reader0));
    readers_.push_back(std::move(reader1));

    if (cyber_unlikely(!is_reality_mode))
    {
        return true;
    }

    auto sched = scheduler::Instance();
    std::weak_ptr<Component<M0, M1>> self =
                                      std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());
    auto func = [self](const std::shared_ptr<M0> &msg0,
                       const std::shared_ptr<M1> &msg1)
    {
        auto ptr = self.lock();
        if (ptr)
        {
            ptr->Process(msg0, msg1);
        }
        else
        {
            AERROR << "Component object has been destroyed.";
        }
    };

    std::vector<data::VisitorConfig> config_list;
    for (auto &reader : readers_)
    {
        config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
    }
    auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list);
    croutine::RoutineFactory factory =
        croutine::CreateRoutineFactory<M0, M1>(func, dv);
    return sched->CreateTask(factory, node_->Name());
}

总结 component 流程

对以上 componet 的流程总结如下:

1、创建node 节点〈1 个 component 只能有 1 个 node 节点,之后用户可以用 node 在 init 中自己创建 reader 或writer) 。

2、调用用户自定义的初始化函数 Init0〈子类的 Init方法

3、创建 reader,订阅几个消息就创建几个 reader。

4、创建回调函数,实际上是执行用户定义算法 Proc()函数

5、创建数据访问器,数据访问器的用途为接收数据融合多个通道的数据) ,唤醒对应的协程执行任务。

6、创建协程任务绑定回调函数,并且绑定数据访问器到对应的协程任务,用于唤醒对应的任务。对 cyber 数据的收发流程有了一个简单的介绍,接下去我们会分别介绍如何创建协程、如何在 scheduler

注册任务并且绑定 Notify。也就是说,为了方便理解,你可以认为数据通过 DataDispatcher 已经分发到了对应的 DataVisitor 中,接下来我们只分析如何从 DataVisitor 中取数据,并且触发对应的协程执行回调任务。

创建协程

创建协程对应上述代码。

    croutine::RoutineFactory factory =        croutine::CreateRoutineFactory<M0>(func, dv);

接下来我们查看下如何创建协程: 协程通过工厂模式方法创建,里面包含一个回调函数和一个 dy 〈数据访问器) 。

template <typename M0, typename F>RoutineFactory CreateRoutineFactory(    F &&f, const std::shared_ptr<data::DataVisitor<M0>> &dv){    RoutineFactory factory;    factory.SetDataVisitor(dv);    factory.create_routine = [ = ]()    {        return [ = ]()        {            std::shared_ptr<M0> msg;            for (;;)            {                CRoutine::GetCurrentRoutine()->set_state(RoutineState::DATA_WAIT);                if (dv->TryFetch(msg))                {                    f(msg);                    CRoutine::Yield(RoutineState::READY);                }                else                {                    CRoutine::Yield();                }            }        };    };    return factory;}

上述过程总结如下:

1、工厂中设置 DataVisitor

2、工厂中创建设置协程执行函数,回调包括 3 个步骤 从 DataVisitor 中获取数据,执行回调函数,继续休限。

创建调度任务

创建调度任务是在过程“Component:Initialize”中完成。

sched->CreateTask(factory, node_->Name());

我们接着分析如何在 Scheduler 中创建任务。

bool Scheduler::CreateTask(std::function<void()> &&func,                           const std::string &name,                           std::shared_ptr<DataVisitorBase> visitor){    if (cyber_unlikely(stop_.load()))    {        ADEBUG << "scheduler is stoped, cannot create task!";        return false;    }	// 1. 根据名称创建任务 ID    auto task_id = GlobalData::RegisterTaskName(name);    auto cr = std::make_shared<CRoutine>(func);    cr->set_id(task_id);    cr->set_name(name);    AINFO << "create croutine: " << name;	//2. 分发协程任务    if (!DispatchTask(cr))    {        return false;    }	//3. 注册 Notify 隐醒任务    if (visitor != nullptr)    {        visitor->RegisterNotifyCallback([this, task_id]()        {            if (cyber_unlikely(stop_.load()))            {                return;            }            this->NotifyProcessor(task_id);        });    }    return true;}

TimerComponent

实际上 Component 分为 2 类: 一类是上面介绍的消息驱动的 Component,第二类是定时调用的TimerComponent。定时调度模块没有绑定消息收发,需要用户自己创建 reader 来读取消息, 如果需要读取多个消息,可以创建多个 reader。

bool TimerComponent::Initialize(const TimerComponentConfig &config){    if (!config.has_name() || !config.has_interval())    {        AERROR << "Missing required field in config file.";        return false;    }    //1. 创建node    node_.reset(new Node(config.name()));    LoadConfigFiles(config);    //2. 调用用户自定义初始化函数    if (!Init())    {        return false;    }    std::shared_ptr<TimerComponent> self =        std::dynamic_pointer_cast<TimerComponent>(shared_from_this());	// 3. 创建定时器,定时调用"Proc() "函数    auto func = [self]() { self->Proc(); };    timer_.reset(new Timer(config.interval(), func, false));    timer_->Start();    return true;}

总结一下 TimerComponent 的执行流程如下。
1、创建Node
2、调用用户自定义初始化函数
3、创建定时器,定时调用"Proc()"函数
上述就是 Component 模块的调用流程。为了于清楚消息的调用过程,下面我们分析“DataDispatcher”和
“DataVisitor”

DataVisitor 和DataDispatcher

DataDispather (消息分发器) 发布消息,DataDispather 是一个单例,-所有的数据分发都在数据分发器中进行, DataDispather 会把数据放到对应的缓存中六然后 Notify(通知)对应的协程(实际上这里调用的是 DataVisitor中注册的 Notify ) 去处理消息。 DataVisitor (消息访问器) 是–个辅助的类,一个数据处理过程对应一个DataVisitor,通过在 DataVisitor 中注册 Notify, (唤醒对应的协程,协程执行绑定的回调函数) ,并且注册对应的 Buffer 到 DataDispather, 这样在:DataDispather 的时候会通知对应的 DataVisitor 去唤醒对应的协程。 也就是说 DataDispather (消息分发器) 发布对应的消息到 Datavisitor,DataVisitor (消息访问器) 唤醒对应的协程,协程中执行绑定的数据处理回调函数。

DataVisitor 数据访问器

DataVisitor 继承至 DataVisitorBase 类,先看 DataVisitorBase 类的实现。

class DataVisitorBase{public:    //1. 初始化的时候创建一个 Notifier    DataVisitorBase() : notifier_(new Notifier()) {}	//2. 设置注册回调    void RegisterNotifyCallback(std::function<void()> &&callback)    {        notifier_->callback = callback;    }protected:    DataVisitorBase(const DataVisitorBase &) = delete;    DataVisitorBase &operator=(const DataVisitorBase &) = delete;	// 3. 下一次消息的下标    uint64_t next_msg_index_ = 0;    // 4. DataNotifier 单例    DataNotifier *data_notifier_ = DataNotifier::Instance();    std::shared_ptr<Notifier> notifier_;};

可以看到 DataVisitorBase 创建了一个“Notifier”类,并且提供注册回调的接口。同时还引用了“DataNotifier::Instance()”单例。接下来看“DataVisitor”类的实现。

template <typename M0, typename M1 = NullType, typename M2 = NullType,          typename M3 = NullType>class DataVisitor : public DataVisitorBase{public:    explicit DataVisitor(const std::vector<VisitorConfig> &configs)        : buffer_m0_(configs[0].channel_id,                     new BufferType<M0>(configs[0].queue_size)),          buffer_m1_(configs[1].channel_id,                     new BufferType<M1>(configs[1].queue_size)),          buffer_m2_(configs[2].channel_id,                     new BufferType<M2>(configs[2].queue_size)),          buffer_m3_(configs[3].channel_id,                     new BufferType<M3>(configs[3].queue_size))    {        //在DataDispatcher 中增加 ChannelBuffer        DataDispatcher<M0>::Instance()->AddBuffer(buffer_m0_);        DataDispatcher<M1>::Instance()->AddBuffer(buffer_m1_);        DataDispatcher<M2>::Instance()->AddBuffer(buffer_m2_);        DataDispatcher<M3>::Instance()->AddBuffer(buffer_m3_);         //2. 在DataNotifier::Instance()中增加创建好的 Notifier        data_notifier_->AddNotifier(buffer_m0_.channel_id(), notifier_);        //3. 对接收到的消息进行数据融合        data_fusion_ = new fusion::AllLatest<M0, M1, M2, M3>(            buffer_m0_, buffer_m1_, buffer_m2_, buffer_m3_);    }    ~DataVisitor()    {        if (data_fusion_)        {            delete data_fusion_;            data_fusion_ = nullptr;        }    }    bool TryFetch(std::shared_ptr<M0> &m0, std::shared_ptr<M1> &m1,    // NOLINT                  std::shared_ptr<M2> &m2, std::shared_ptr<M3> &m3)    // NOLINT    {        //4: 获取融合数据        if (data_fusion_->Fusion(&next_msg_index_, m0, m1, m2, m3))        {            next_msg_index_++;            return true;        }        return false;    }private:    fusion::DataFusion<M0, M1, M2, M3> *data_fusion_ = nullptr;    ChannelBuffer<M0> buffer_m0_;    ChannelBuffer<M1> buffer_m1_;    ChannelBuffer<M2> buffer_m2_;    ChannelBuffer<M3> buffer_m3_;};

总结一下 DataVisitor 中实现的功能。
1、在 DataDispatcher 中添加订阅的 ChannelBuffer
2、在 DataNotifier 中增加对应通道的 Notifier
3、通过 DataVisitor 获取数据并进行融合
这里注意:

  1. 、如果 DataVisitor 只访问一个消息,则不会对消息进行融合,如果 DataVisitor 访问 2 个以上的数据,
    那么需要进行融合,并且注册融合回调。之后 CacheBuffer 中会调用融合回调进行数据处理,而不会把数据放
    入 CacheBuffer中。
// 1. 只有一个消息的时候直接从 Buffer 中获取消息bool TryFetch(std::shared_ptr<M0> &m0, std::shared_ptr<M1> &m1,  // NOLINT              std::shared_ptr<M2> &m2)                           // NOLINT{    if (data_fusion_->Fusion(&next_msg_index_, m0, m1, m2))    {        next_msg_index_++;        return true;    }    return false;}// 2. 当有 2 个消息的时候,从融合 buffer 中读取消息bool TryFetch(std::shared_ptr<M0> &m0, std::shared_ptr<M1> &m1)    // NOLINT{    if (data_fusion_->Fusion(&next_msg_index_, m0, m1))    {        next_msg_index_++;        return true;    }    return false;}
  1. 实际上如果有多个消息的时候,会以第 1 个消息为基准,然后把其它消息的最新消息一起放入融合好的 buffer_fusion_ 。
AllLatest(const ChannelBuffer<M0> &buffer_0,          const ChannelBuffer<M1> &buffer_1,          const ChannelBuffer<M2> &buffer_2,          const ChannelBuffer<M3> &buffer_3)    : buffer_m0_(buffer_0),      buffer_m1_(buffer_1),      buffer_m2_(buffer_2),      buffer_m3_(buffer_3),      buffer_fusion_(buffer_m0_.channel_id(),                     new CacheBuffer<std::shared_ptr<FusionDataType>>(                         buffer_0.Buffer()->Capacity() - uint64_t(1))){    buffer_m0_.Buffer()->SetFusionCallback(        [this](const std::shared_ptr<M0> &m0)    {        std::shared_ptr<M1> m1;        std::shared_ptr<M2> m2;        std::shared_ptr<M3> m3;        if (!buffer_m1_.Latest(m1) || !buffer_m2_.Latest(m2) ||                !buffer_m3_.Latest(m3))        {            return;        }        auto data = std::make_shared<FusionDataType>(m0, m1, m2, m3);        std::lock_guard<std::mutex> lg(buffer_fusion_.Buffer()->Mutex());        buffer_fusion_.Buffer()->Fill(data);    });}
  1. DataFusion 类是一个虚类,定义了数据融合的接口“Fusion0”; Apollo 里只提供了一种数据融合的方式,即以第一个消息的时间为基准,取其它最新的消息,当然也可以在这里实现共它的数据融台方式。

DataDispatcher 数据分发器

接下来我们看 DataDispatcher 的实现。

template <typename T>class DataDispatcher{public:    using BufferVector =        std::vector<std::weak_ptr<CacheBuffer<std::shared_ptr<T>>>>;    ~DataDispatcher() {}	//1. 添加 ChannelBuffer 到buffers_map    void AddBuffer(const ChannelBuffer<T> &channel_buffer);	// 2. 分发通道中的消息    bool Dispatch(const uint64_t channel_id, const std::shared_ptr<T> &msg);private:    //3. DataNotifier 单例    DataNotifier *notifier_ = DataNotifier::Instance();    std::mutex buffers_map_mutex_;    // 4 险希表,key 为通道 id,value 为订阅通道消息的 CacheBuffer 数组。    AtomicHashMap<uint64_t, BufferVector> buffers_map_;	// 5. 单例    DECLARE_SINGLETON(DataDispatcher)};

总结一下 DataDispatcher 的实现。
1、添加 ChannelBuffer 到 buffers_map_,key 为通道 id (topic) ,value 为订阅通道消息的 CacheBuffer 数组。
2、分发通道中的消息。根据通道id,把消息放入对应的 CacheBuffer。然后通过 DataNotifier:Instance0通知对应的通道。如果一个通道(topic)有 3 个 CacheBuffer订阅,那么每次都会往这 3 个 CacheBuffer 中写入当前消息的指针。因为消息是共享的,消息访问的时候需要加锁。那么 DataNotifier 如何通知对应的 Channel 的呢? 理解清楚了 DataNotifier 的数据结构,那么也就理解了DataNotifier 的原理。

class DataNotifier{public:    using NotifyVector = std::vector<std::shared_ptr<Notifier>>;    ~DataNotifier() {}    void AddNotifier(uint64_t channel_id,                     const std::shared_ptr<Notifier> &notifier);    bool Notify(const uint64_t channel_id);private:    std::mutex notifies_map_mutex_;    //1. 险希表,key 为通道 id,value 为 Notify 数组    AtomicHashMap<uint64_t, NotifyVector> notifies_map_;    DECLARE_SINGLETON(DataNotifier)};

DataNotifier 中包含一个哈希表,, 表的 key 为通道 it,表的值为 Notify 数组,每个 DataVisitorBase 在初始化的时候会创建一个 Notify 。接着我们看 下CacheBuffer 的实现,CacheBuffer 实际上实现了一个缓存队列,主要关注下 Fill 函数。

void Fill(const T &value){    //1., 融合回调    if (fusion_callback_)    {        fusion_callback_(value);    }    else    {        //2. 如果 Buffer 满,实现循环队列        if (Full())        {            buffer_[GetIndex(head_)] = value;            ++head_;            ++tail_;        }        else        {            buffer_[GetIndex(tail_ + 1)] = value;            ++tail_;        }    }}

ChannelBuffer 是 CacheBuffer 的封装,主要看下获取值。

template <typename T>bool ChannelBuffer<T>::Fetch(uint64_t *index,                             std::shared_ptr<T> &m)    // NOLINT{    std::lock_guard<std::mutex> lock(buffer_->Mutex());    if (buffer_->Empty())    {        return false;    }    if (*index == 0)    {        *index = buffer_->Tail();    }    else if (*index == buffer_->Tail() + 1)    {        return false;    }    else if (*index < buffer_->Head())    {        auto interval = buffer_->Tail() - *index;        AWARN << "channel[" << GlobalData::GetChannelById(channel_id_) << "] "              << "read buffer overflow, drop_message[" << interval << "] pre_index["              << *index << "] current_index[" << buffer_->Tail() << "] ";        *index = buffer_->Tail();    }    m = buffer_->at(*index);    return true;}

data 目录总结

通过上述的分析,实际上数据的访问都是通过“DataVisitor”来实现,数据的分发通过“DataDispatcher”来实现。reader 中也是通过 DataVisitor 来访问数据,在 reader 中订阅对应的 DataDispatcher。也就是说如果你要订阅一个通道,首先是在 reader 中注册消息的 topic,绑定 DataDispatcher,之后对应通道的消息到来之后,触发 DataDispatcher 分发消息,而 DataDispatcher 通过 DataVisitor 中的 Notify 唤醒协程,从 DataVisitor 中获取消息,并执行协程中绑定的回调函数,以上就是整个消息的收发过程。疑问:Reader 中还拷贝了一份数据到 Blocker 中,实际上数据的处理过程并不需要缓存数据,参考“Planning”模块中的实现都是在回调函数中把数据拷贝到指针中。看注释是说 Blocker 是用来仿真的? 后面需要确实下。以下是 Planning 模块中回调函数中拷贝数据的实现。


版权声明:本文为TM1695648164原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
THE END
< <上一篇
下一篇>>