登录
注册
node.js 学习社区
使用nodejs和redis 搭建消息队列MQ

桌椅童鞋

2018-03-27 11:54

MQ全称为Message Queue,消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBM WEBSPHERE MQ。

以上介绍仍旧来自百度百科.

消息队列产品对比

目前比较流行的MQ有2种,ActiveMQ 以及 RabbitMQ , RabbitMQ性能号称能够达到每秒10000,而REDIS官方的压力测试值在7-8万之间,而且是去掉了网络IO操作,真实情况我估计在每秒2-3万的并发操作,但这个数目对于一般的应用应该足够了.

Redis如何支持消息队列?

在新版本的redis v2.6以上以及以上版本开始支持 subscribe 以及 publish 操作,  subscribe订阅一个频道,publish可以像频道广播消息. 这个机制最老的应用应该是算是聊天室了.

Sub/Pub 模式固然很好用,但是同样有一个问题,就是如果有多个人订阅了同一频道,而这个频道的数据只能被一个接收方处理,不能够重复处理,这时该怎么办?

解决方法有2种,

1. publish  将数据写入到一个list or sorted list 队列,写完成后开始给终端广播消息,告诉大家,有新的数据等待处理,这个时候,谁能pop到数据,就是谁处理,这个操作是原子性的,也就是说不会被重复处理.

2. 使用阻塞模式, redis提供了blpop brpop这种操作,也就是一直阻塞一个队列,直到有数据来. 这种模式保证了数据的原子性,而且使应用程序可以支持分布式多台机器部署.

Sub/Pub模式 (sub.js):

var redis = require("redis");
var client = redis.createClient(6379, '127.0.0.1', {connect_timeout: 1});

//订阅一个频道
var sub = function(c) {
    var c = c || 'roban:test:channel';
    client.subscribe(c,function(e){
        console.log('starting subscribe channel:'+c);
    });
};

//订阅一个频道
sub();

//处理错误,如果出现错误,或者服务器断开了链接,等待恢复时,继续订阅这个频道
client.on('error', function(error) {
    console.log(error);
    sub();
});


//订阅处理函数
client.on('message',function(err,response){
    console.log(response);
});

打开redis命令行,输入以下命令:

publish roban:test:channel hello

发布这条信息后,sub端会输出以下信息:

Robans-Pro:node robanlee$ node demo.js 
starting subscribe channel:roban:test:channel
hello


原文地址:https://www.cnblogs.com/jkll/p/4550100.html

回复 · 2

  • 补充:

    一个Redis client发布消息,其他多个redis client订阅消息,发布的消息“即发即失”,redis不会持久保存发布的消息;消息订阅者也将只能得到订阅之后的消息,通道中此前的消息将无从获得。这就类似于JMS中“非持久”类型的消息。

    消息发布者,即publish客户端,无需独占链接,你可以在publish消息的同时,使用同一个redis-client链接进行其他操作(例如:INCR等)

    消息订阅者,即subscribe客户端,需要独占链接,即进行subscribe期间,redis-client无法穿插其他操作,此时client以阻塞的方式等待“publish端”的消息;这一点很好理解,因此subscribe端需要使用单独的链接,甚至需要在额外的线程中使用。

    一旦subscribe端断开链接,将会失去部分消息,即链接失效期间的消息将会丢失。

    如果你非常关注每个消息,那么你应该考虑使用JMS或者基于Redis做一些额外的补充工作,如果你期望订阅是持久的,那么如下的设计思路可以借鉴(如下原理基于JMS):

    1) subscribe端首先向一个Set集合中增加“订阅者ID”,此Set集合保存了“活跃订阅”者,订阅者ID标记每个唯一的订阅者,例如:sub:email,sub:web。此SET称为“活跃订阅者集合”

    2) subcribe端开启订阅操作,并基于Redis创建一个以“订阅者ID”为KEY的LIST数据结构,此LIST中存储了所有的尚未消费的消息。此LIST称为“订阅者消息队列”

    3) publish端:每发布一条消息之后,publish端都需要遍历“活跃订阅者集合”,并依次向每个“订阅者消息队列”尾部追加此次发布的消息。

    4) 到此为止,我们可以基本保证,发布的每一条消息,都会持久保存在每个“订阅者消息队列”中。

    5) subscribe端,每收到一个订阅消息,在消费之后,必须删除自己的“订阅者消息队列”头部的一条记录。

    6) subscribe端启动时,如果发现自己的自己的“订阅者消息队列”有残存记录,那么将会首先消费这些记录,然后再去订阅。

    0

  • 首先,redis的队列实际在代码逻辑中不需要由我们自己实现,因此一个所谓的 RedisMQ 对象实际是一个 redis key以及对其操作的一些封装。

    发布订阅模式:

    redis 从 2.0.0 版本开始支持 pub/sub 指令。详情见 http://redis.io/topics/pubsub

    实现思想很简单,Publisher调用redis的publish方法往特定的channel发送消息,Subscriber在初始化的时候要subscribe到该channel,一旦有消息就会立即接收。

    比较简单的demo可参见:http://shift-alt-ctrl.iteye.com/blog/1867454 ,此链接博客中写得已较详细,本文便不再赘述。


    生产消费模式:

    该方法是借助redis的list结构实现的。

    Producer调用redis的lpush往特定key里塞入消息,Consumer调用brpop去不断监听该key。

    producer:

    // producer code
    String key = "demo:mq:test";
    String msg = "hello world";
    redisDao.lpush(key, msg);

    consumer:

    // consumer code
    String key = "demo:mq:test";
    while (true) {
     // block invoke
     List<String> msgs = redisDao.brpop(BLOCK_TIMEOUT, listKey);
     if (msgs == null) continue;
     String jobMsg = msgs.get(1);
     processMsg(jobMsg);
    }

    当有多个consumers的时候,它会按照brpop调用的顺序分派消息,并非随机。

    BLOCK_TIMEOUT不建议设成infinity(有些redis驱动也直接不支持inifinity),我们目前设成30(单位是秒)情况良好。



    0

发表回复

你可以在回复中 @ 其他人