- · 《财讯 》栏目设置[06/28]
- · 《财讯 》投稿方式[06/28]
- · 《财讯 》征稿要求[06/28]
- · 《财讯 》刊物宗旨[06/28]
上海财经大学消息中心点亮智慧校园(3)
作者:网站采编关键词:
摘要:/* *RabbitMQ 生产者,将待办完成数据放入RabbitMQ 队列并更新推送状态 */JSONObject joTodo_complete = new JSONObject(); joTodo_("datatype", "complete"); joTodo_("app_id", (app_id
/*
*RabbitMQ 生产者,将待办完成数据放入RabbitMQ 队列并更新推送状态
*/JSONObject joTodo_complete = new JSONObject();
joTodo_("datatype", "complete");
joTodo_("app_id", (app_id));
joTodo_("app_key", app_key);
joTodo_("refno", refno);
Boolean result = ("task_queue_todo", oTodo_());
2.消费者
作为消费者的轮询程序实现的功能包括:(1)读取消息队列中的待办数据并调用消息汇聚中心接口,将数据写入消息汇聚中心数据库表;(2)“Retry Send”功能,定时拉取推送MQ 失败的消息,重新发送给RabbitMQ;(3)“Rewrite”功能,定时比较源头与消息汇聚中心数据的差异(消息落地数据库表与消息汇聚中心数据库表),调用消息汇聚中心接口将差异数据重新写入。主要程序实现(java 代码)如下:
/*
*RabbitMQ 消费者,从RabbitMQ 队
列获取待办数据并写入消息汇聚中心
*/try {
JSONObject jsonObj = (message);
long app_id = (("app_id").toString());
String app_key = ("app_key").toString();
String refno = ("refno").toString();
if ((app_key)) {
app_key = (String)((app_id));
}
if((null != ("datatype")) && ("push".equals(jsonObj.
get("datatype").toString()))) {
String message_type_code = ("message_type_
code").toString();
String target_type = ("target_type").toString();
String target_ids = ("target_ids").toString();
String content = ("content").toString();
String url = ("url").toString();
String do_step = ("do_step").toString();
表1 验证结果对比平均延迟(毫秒)2000 2000 1 0 100% 495.45 5000 5000 3 1 100% 444.27 7 3 100% 543.48消息发送总数消息接收总数Retry Send机制触发次数Rewrite 机制触发次数成功率
ret = (portalServ
erUrl, app_id, app_key, refno, message_type_code,
target_type, target_ids, content, url, do_step);//调用
消息汇聚中心接口
} else if ((null != ("datatype")) && ("complete".
equals(("datatype").toString())))
{
ret = (portalServerUrl,
app_id, app_key, refno);//调用消息汇聚中心接口
}
}
/*
*定时Retry send
*/
try
{
retrySendTodoPushFailureFromDb();//重发推送mq 失败的
未完成待办
retrySendTodoStateFailureFromDb();//重发推送mq 失败的
已完成待办
}
/*
*定时Rewrite
*/
try
{
matchTodoPushFailureFromDb();//重写未写入消息汇聚中心的待办
matchTodoCompleteFailureFromDb();//重写完成状态不一致的待办
}
系统验证
在方案验证环节,对系统可靠性(消息接收成功率)与及时性(消息的平均延迟时间(毫秒))进行了测试与考察,定义如下:
2.第i 条消息的延迟=第i 条消息的数据库写入时间-第i 条消息的发送时间
(n 为接收消息总数)
同时将“Retry Send 机制”的触发时间设定为5 分钟,时间窗口设定为1 分钟,“Rewrite 机制”的触发时间设定为10 分钟,时间窗口设定为5 分钟。java 程序循环调用待办推送接口API,分别发送待办2000条、5000 条、 条。在每条消息发出时,记录其发送时间,并和数据库记录生成时间做比较,得到每条消息的延迟时间。三次验证结果分别统计见表1。
验证过程并没有考虑程序本身执行时间以及网络延迟的影响,可见随着消息发送的增加,消息的平均延迟时间差别并不大;另外,在验证过程中,遇到了RabbitMQ 因网络连接超时等情况而发送失败的情况,但方案中“Retry send 机制”与“Rewrite 机制”保证了消息仍然被准确接收,验证了方案的可靠性。
验证过程未包含RabbitMQ 的吞吐量测试,有资料表明,RabbitMQ 吞吐量可达到5.95w/s,在消息持久化场景下,吞吐量也能达到2.6w/s 左右。这也说明AMQP 协议为了保证消息的可靠性在吞吐量上做了一定程度的取舍。
文章来源:《财讯》 网址: http://www.caixunbjb.cn/qikandaodu/2021/0223/598.html
上一篇:媒体融合时代的财经新媒体运营思考
下一篇:加强财经管理创建节约型军营