使⽤RabbitMQ传输⼤⽂件,保证其完整性
最近计划⽤RabbitMQ传输⽂件,对于容量超过1G的⼤⽂件,肯定需要对⽂件进⾏分块传输;如果某⼀块丢失了,或者有损坏,必须有⼀种机制,通知发送⽅重新发送。Direct和Topic模式都可以⽤。下⾯是我的研习和设计思路。
RabbitMQ本⾝提供的确认机制
RabbitMQ通过Publish Confirm和Consumer Acknowledgement机制,让发送⽅和接收⽅分别与broker产⽣确认关系;由于是异步的,发送⽅和接收⽅并不需要互相确认。下⾯简单说明⼏种场景:
1. ⽣产者Publisher通过confirm.select⽅法设置channel(通道)confirm,消息到达broker后被放⼊指定的队列Queueu_A,broker发
送basic.ack⽅法给⽣产者Publisher,代表消息已被broker正确接收。如下图所⽰:
2. 开启Publish Confirm机制,message A 到达broker后,Erlang进程发⽣错误,导致message A⽆法转发到指定队列queue_A
中,此时broker会发送basic.nack⽅法给发送⽅,表⽰⽆法处理该消息。如下图所⽰:
3. 消息message A要求持久化到磁盘,broker要等到消息存盘完成后,才会给⽣产者发送basic.ack⽅法,告知消息已经被正确接收。
如下图所⽰:
4. ⽣产者调⽤basic.Publish发送消息,设置mandatory标志位为true,当broker发现指定队列Queue_A与Exchange并未绑定,导致
⽆法路由,⽆法投递时,broker会给⽣产者发送urn⽅法告知不到对应的队列,⽣产者就不会认为message A 被接收了,它会选择重发,或者其他处理。值得说明的是mandatory不需要启动Publish Confirm机制。如下图所⽰:
5. RabbitMQ官⽹上探讨了下⾯⼀种情景,说明开启了Publish Confirm的好处。如下图所⽰。
1. ⽣产者Publisher发送持久化的消息message A,broker接收到后投送到队列Queue_A中。
2. 此时broker将该消息推送给消费者和存盘这2个动作可能同时发⽣, 消费者接收到该消息后发送了确认acknowledge。
3. 但是broker异常重启了没有收到,此时message A的存盘动作还未完成
4. 结果是消息message A丢失了。broker重启后,消费者重连。
5. 事实上消费者已经收到message A 了,但逻辑上broker由于没有收到acknowledge,应当重新deliver message A的,不过
该消息已经丢失了。
6. 设置了Publish Confirm机制后,⽣产者也不会收到basic.ack⽅法,所以它知道该消息可能没有到达broker,它可以重发⼀
次,从⽽避免了broker⽆消息可deliver的尴尬。
还得根据业务情况分析
RabbitMQ的确保到达机制⽐较完善,设计也很合理,但使⽤过程中它涉及到confirm.select,basic.ack,basic.nack,urn⽅法以及consumer的acknowledge,⽐较⿇烦,⽽且这是MQ层⾯的机制,并不能保证⼤⽂件传输的数据完整性。
例如:传递8K⼤⼩的⽂件分块,是否丢失bit数据的校验;即使消息完全正确接收,但由于校验失败,如何通知发送端重发该分块;所以⾯对⼤⽂件的传输不能依靠RabbitMQ本⾝提供的消息确认机制;⽽要从业务层⾯考虑。
确保到达的核⼼是合理的重传机制
⽂件传输业务对⼤⽂件进⾏分块,编号,多线程并发传输,哈希效验是必然的选择,断点续传需要记录当前的发送状态也必不可少。那么下⾯来重点考虑重传;
为了保证⽂件传输性能,采⽤多线程多通道并发传输;发送分块与分块确认还必须是异步的;这⼀层
⾯就不要考虑RabbitMQ本⾝的确认机制,可以关掉。
可以模仿流媒体传输协议RTSP,创建控制通道,和数据通道;将⽂件分块抽象成任务,创建任务池,发送线程从任务池中获取任务,通过控制通道发送任务相关信息,通过数据通道发送任务;接收线程接收任务并验证,存盘;如果必要通过控制通道告知发送⽅需要重发,下⾯是程序基本结构图:(以2线程为例)
为了提升效率,发送线程会提前领取(n=(int)(任务总数/线程数))个任务,发送线程只负责发送任务,由控制线程来确定该任务是否完成,是否需要重新分配发送。由于是并发发送的,因此可能是
乱序的,所以通过Id号进⾏批量确认应该不可⾏。
大文件发送下图为了表⽰运⾏过程特意设计为顺序的,实际上发送⽅⽆须收到block1的确认消息,即可发送block2。
具体开发过程中还有很多细节需要优化
如果某⼀个任务始终发送不成功,需要考虑发送时间和次数限制,如果超过限制,就应该保存状态,退出发送,断开连接;等待下次续传。
接收⽅每⼀个任务成功都发送确认消息,控制线程检查任务是否发送成功,采⽤轮询,必然会消耗CPU时间。是不是可以采⽤不发成功确认,只发失败确认,控制线程根据失败任务Id取得该任务,分发给发送线程重发。
接收⽅如何知道,其实5号任务,发送⽅已经忘了发,或者由于逻辑的原因跳过了发送。
余之拙见,欢迎批评指正。
发布评论