C++实现RPC分布式⽹络通信框架(⼋)-------项⽬框架梳理及总结
⽂章⽬录
⼀、项⽬的⽂件架构
1.整体⽂件框架
在整个项⽬中,⽂件组织格式如上,分门别类的将proto⽂件,cc⽂件,h⽂件,txt⽂件都组织在上述各个⽂件夹中,在编写代码时需要使⽤到⾃动化编译环境,所以我们需要先了解⽂件组织形式,这样才可以包含⽂件以⾄于编译不⾄于出错,⽽且有条理的组织⽂件会让使⽤者也更好的理清如何使⽤。在接下来的博⽂中会将整体⽂件的思路理清。
2.各⽂件的作⽤
(1)bin⽂件夹
存放⽣成的可执⾏⽂件和配置⽂件当使⽤框架时需要进⼊该⽂件来执⾏相应的可执⾏⽂件,并读取配置⽂件。
(2)build⽂件夹
因为我们在整个项⽬的编译过程中使⽤到了Cmake,所以在整个项⽬的编码完成后,我们需要进⼊这个⽂件夹来执⾏⾃动编译,这样可以将⽂件的所有编译的中间⽂件都存放到build⽂件夹中,不会使得编译过程中的⽂件很乱的存在其他⽂件夹中⽽影响阅读或者使⽤。
(3)example⽂件夹
在该⽂件夹中我们提供了使⽤的⽰例,因为RPC项⽬需要将proto⽂件中⽣成的相应类的虚函数进⾏重写(作为本地业务调⽤),所以在example中提供了如何实现相应的⽅法或者说如何使⽤该框架。这部分会在下⾯详细叙述。
(4)lib⽂件夹部分地方绿码有星要隔离14天
在最终结束时,我们将该项⽬⽣成的所有⽂件打包成为⼀个静态库交付⽤户使⽤,该静态库就存放在lib⽂件夹中,⽽且我们将项⽬的⽇志也输出到了这个⽂件夹中。
(5)src⽂件夹
项⽬的所有源⽂件都存放在这个⽂件夹中,该⽂件夹在后续会进⾏细致的分析。
(6)test⽂件夹
存放项⽬的各种测试⽂件,在项⽬的实现过程中存放了protobuf的测试⽂件,如果需要可以看下proto⽂件⽣成的pb.h⽂件和pb.cc⽂件。
(7)项⽬的使⽤⽂件
<:⾃动化编译的⽂件,该⽂件的组织形式会在第三部分:相应的中间件的使⽤情况中叙述;
autobuild.sh:当使⽤时对所有⽂件进⾏⾃动化编译⽣成可执⾏⽂件的脚本,在本项⽬中已经为其加上了可执⾏权限,使⽤时可以直接使⽤如下命令
./autobuild.sh
<:项⽬的整体介绍,因为在Git上上传的⽐较早,所以该⽂件并没有完善,可以忽略。
3.src⽂件夹下的⽂件作⽤
src⽂件中保存的是我们整个项⽬最⼲的⼲货,所有很重要的⽂件都在保存该⽂件夹下,include⽂件夹包含了所有的头⽂件,因为lockqueue.h部分是使⽤模板实现的,所以没有写对应的cc⽂件,其余各个⽂件都是有相应的cc⽂件存在的。所以include⽂件夹不再赘述,直接开始其余⽂件的描述。
(1)include:包含各个cc⽂件的头⽂件
(2)
读取配置文件失败
还是那句话,设计了CMakeLists的都是为了编译更⽅便,依旧在第三部分:相应的中间件的使⽤情况中叙述;
(3)lockqueue.h
在我们项⽬⽇志部分的实现的过程中,从项⽬的全局出发来说,我们不可能让整个项⽬只有⼀个线程,如果只有⼀个线程,就会导致所有的业务逻辑阻塞,⽐如:在写⽇志的过程中我们也需要时间和资源,但如果我们的在全局只开⼀个线程,那写⽇志时就不可以进⾏正常的业务逻辑了,⽽且写⽇志这个操作是⼀个磁盘I/O操作,这样就会更加的消耗时间。所以综合多⽅⾯考虑,我们需要再开⼀个线程来进⾏写⽇志的操作,那么设计这个线程时,如果同时涌⼊多个⽇志信息,处理不当时依旧会导致其阻塞,毕竟⼀对多的模式很难不阻塞,那我们该如何处理?增加⼀个缓冲区,所以我们设计了⼀个
⽇志的队列,将线程⽣产的⽇志写⼊到队列中,然后在写⽇志线程从队列中取⽇志信息,再写⼊到磁盘中,但同时,我们也需要考虑到线程安全,所以在写⽇志时需要使⽤到条件变量和互斥锁来实现线程安全,此时只需要让应答链接的线程⼊队,⽽另⼀个线程出队写⽇志即可。
//异步写⽇志的⽇志队列
template<typename T>
class LockQueue
关于屈原的故事{
public:
//多个worker线程都会写⽇志queue
void Push(const T &data){};
//⼀个线程读⽇志queue
T Pop(){};
private:
std::queue<T> m_queue;
std::mutex m_mutex;
std::condition_variable m_condvariable;
};
(4)
该部分是我们的⽇志模块,在逻辑上思考,我们的⽇志对象只需要有⼀个即可,所以使⽤到了设计模式中的单例模式。另外我们的⽇志需要设置相应级别,毕竟从逻辑⽅⾯考虑我们的⽇志不可能只有⼀种正常状态,可能还会有错误信息,所以使⽤⼀个枚举类型来记录相应级别。在单例模式的设计上不再叙述了。在该类的构造函数中我们将写⽇志的线程设置为分离线程,这样就不会出现主线程和⼦线程的冲突情况了,并实现了两个成员⽅法:设置⽇志级别和写⽇志⽅法,写⽇志的⽅法负责将⽇志写到队列中。⽽为了⽅便使⽤,我们提供了两个宏来让⽤户使⽤更加简便,可以在实际应⽤中更便捷的写⽇志。
enum LogLevel
{
INFO,//普通⽇志信息
ERROR,//错误信息
};
//mprpc框架提供的⽇志系统
class Logger
{
public:
张卫健回应不拍戏//获取⽇志的单例
static Logger&GetInstance();
void StLogLevel(LogLevel level);//设置⽇志级别
void Log(std::string msg);//写⽇志
private:
int m_loglevel;//记录⽇志级别
LockQueue<std::string> m_lckQueue;//⽇志缓冲队列
//设置成单例模式,删除掉其拷贝构造,构造函数设置成私有
Logger();//设置为分离线程,如果使⽤宏的话则不会使⽤到该⽅法
Logger(const Logger&)=delete;
Logger(Logger&&)=delete;
};
//定义宏对⽤户实现⾃⼰写⽇志的效果
#define LOG_INFO(logmsgformat,...)
#define LOG_ERR(logmsgformat,...)
开机自动拨号(5)
MprpcApplication是我们项⽬的框架最重要的部分,也使⽤到了单例模式,负责初始化整个框架,在该部分中我们会读取配置⽂件,并和对⽅建⽴连接,
//mprpc框架的基础类,负责框架的⼀些初始化操作,使⽤单例模式设计
class MprpcApplication
{
public:
//初始化框架,按输⼊的命令进⾏命令解析,若解析成功则进⾏配置⽂件读取以及其他的逻辑
static void Init(int argc,char**argv);
//获取框架的单例
static MprpcApplication&GetInstance();
//读取配置⽂件
static MprpcConfig&GetConfig();
private:
static MprpcConfig m_config;
MprpcApplication(){};
MprpcApplication(const MprpcApplication&)=delete;
MprpcApplication(MprpcApplication&&)=delete;
};
//类外初始化静态变量
MprpcConfig MprpcApplication::m_config;
//在cc⽂件中实现成全局的⽅法,如果命令错误,则使⽤该⽅法输出帮助信息
void ShowArgHelp()
{
std::cout<<"format:command -i <configfile>"<<std::endl;
}
(6)
在上⼀步框架启动后需要读取配置⽂件,该部分就是实现配置⽂件加载的,根据配置⽂件的特性,在配置⽂件的存储中我们使⽤了unordered_map,但由于配置⽂件的书写中可能会有很多的空格回车等,所以我们⼜提供了去掉空格的成员⽅法供加载配置⽂件时调⽤,加载配置⽂件的功能由LoadConfigFile⽅法实现。⽽在我们提供RPC服务时还需要读取某个节点的地址等,所以再增加⼀个Load⽅法⽤于返回相应的值。
//配置⽂件为键值对,且不需要排序,所以使⽤⽆序哈希
//配置⽂件为:rpcserverip,rpcserverport,zookeeperip,zookeeperport
//框架读取配置⽂件类
class MprpcConfig
{
public:
//负责解析加载配置⽂件
void LoadConfigFile(const char*file);
//查询相应配置端
std::string Load(std::string const&key);
private:
std::unordered_map<std::string,std::string> m_configMap;
//去掉字符串前后的空格
void Trim(std::string &src_buf);
};
(7)
在我们使⽤RPC框架时,我们需要被调⽤⽅给调⽤⽅返回调⽤的情况,在protobuf中我们定义的⽅法都会⽣成对应的类,⽽所有⽅法最终都会被底层的MprpcChannel::CallMethod()⽅法调⽤,⽽该⽅法的参数中有⼀个google::protobuf::RpcController* controller,该参数是为了携带远程调⽤的控制(状态)信息的,包括是否失败和失败的原因,其实现如下:
class MprpcController:public google::protobuf::RpcController
{
public:
//构造函数什么时候放寒假
MprpcController();//初始化状态
void Reset();//重置状态
bool Failed()const;//⽤以查询成功或失败的状态
std::string ErrorText()const;//⽤于记录错误信息
void SetFailed(const std::string& reason);//设置是否失败
private:
bool m_failed;//RPC⽅法执⾏过程中的状态
std::string m_errtext;//RPC⽅法执⾏过程中的错误信息
};
(8)
所有通过stub代理对象调⽤的rpc⽅法都从这⾥处理,统⼀做⽅法调⽤的数据序列化和⽹络发送,在这部分中就使⽤到了muduo
库,muduo库实现了本项⽬中的所有⽹络业务,所以此部分也算是最核⼼的部分,CallMethod()⽅法的参数如下:
参数名意义
method RPC调⽤的⽅法
controller⽤于携带执⾏状态信息
request请求信息(参数等)
response响应信息,执⾏结果等
done回调函数
虽然只有⼀个⽅法,但代码很长,所以简化⼀下只写逻辑:
class MprpcChannel:public google::protobuf::RpcChannel
{
public:
//
void CallMethod(method,controller,request,response,done);
{
const google::protobuf::ServiceDescriptor* sd=method->service();//获取服务对象提供的服务
std::string service_name=sd->name();//service的name
std::string method_name=method->name();//method的name
//获取参数的序列化字符串长度 args_size
uint32_t args_size=0;
std::string args_str;
//已简化,从request中序列化参数,序列化成功后填充args_size,失败则向controller中填充错误信息//定义rpc的请求header
mprpc::RpcHeader rpcHeader;
//已简化,需要向相应变量中填充值
uint32_t header_size=0;
std::string rpc_header_str;
//已简化,从rpcHeader中序列化参数,序列化成功后填充header_size,失败则向controller中填充错误信息//组织待发送的rpc请求的字符串
std::string send_rpc_str;
send_rpc_str.insert(0,std::string((char*)&header_size,4));//header_size
send_rpc_str+=rpc_header_str;//rpcheader
send_rpc_str+=args_str;//args
//使⽤TCP编程,完成rpc⽅法的远程调⽤
int clientfd=socket(AF_INET,SOCK_STREAM,0);
//已简化,创建socket失败则填充错误信息到controller中
//rpc调⽤⽅向调⽤service_name服务,需要查询zk上该服务所在的host信息
ZkClient zkCli;
zkCli.Start();
std::string method_path="/"+service_name+"/"+method_name;
//获取ip地址和端⼝号
std::string host_data=zkCli.GetData(method_path.c_str());
//已简化,主机获取失败则填充错误信息到controller中
int idx=host_data.find(":");//分割符
//已简化,IP地址获取失败则填充错误信息到controller中
std::string ip=host_data.substr(0,idx);
uint32_t port=atoi(host_data.substr(idx+1,host_data.size()-idx).c_str());
struct sockaddr_in server_addr;
/
/已简化,向地址中填充响应的参数
//链接rpc服务节点
if(-1==connect(clientfd,(struct sockaddr*)&server_addr,sizeof(server_addr)))
//已简化,链接失败则填充错误信息到controller中
//发送rpc请求
if(-1==send(clientfd,send_rpc_str.c_str(),send_rpc_str.size(),0))
//已简化,发送失败则填充错误信息到controller中
//接受rpc请求的响应值
char recv_buf[1024]={0};
int recv_size=0;
if(-1==(recv_size=recv(clientfd,recv_buf,1024,0)))
/
/已简化,响应失败则填充错误信息到controller中
std::cout<<recv_buf<<std::endl;
//反序列化rpc调⽤的响应数据
std::string response_str(recv_buf,0,recv_size);
if(!response->ParsePartialFromArray(recv_buf,recv_size))
//已简化,响应反序列化失败则填充错误信息到controller中
close(clientfd);
}
};