Warren - 另一种RabbitMQ集群

warren是RabbitMQ的一种主/备节点集群,通过HAProxy等负载均衡器,我们可以很容易的实现该类型的集群

主/备节点之间没有任何的协作关系,所有影响到主服务器的操作都不会被自动转义到备服务器,你甚至可以让主备服务器的RabbitMQ版本不一致,比如你需要更新RabbitMQ的版本,用这样的方式来实现再好不过了,如果主服务器出现了问题可以切换到备服务器,并且备服务器不被影响

Warren为什么出现

RabbitMQ中,拥有持久化队列的节点发生故障时,该队列无法被重新创建,任何尝试重新声明该队列的客户端都会收到一个404 NOT_FOUND AMQP错误

当故障节点恢复时,持久化队列及其内容也会跟着恢复,但是在节点恢复前任何投递到该队列的消息要么丢失了,要么由于设置了mandatory=true,因消息无法路由到对应队列而导致客户端收到错误

为了避免节点故障导致的一系列问题(消息丢失,消息重发等),需要有一个良好的解决方案,即Warren

Warren的实现方式

基于pacemaker的warren实现方式

主备服务器使用共同的存储(元数据,内容,状态),当主服务器发生故障时,使用pacemaker将RabbitMQ的IP转移到备用节点并启动

这种方式有几个问题

  1. 由于共享存储,那么主节点如果发生错误,这样的错误也同样会发生在备节点
  2. 需要保证主节点和备节点RabbitMQ拥有相同的节点名称和UID,否则备节点将无法访问共享存储中的数据

基于负载均衡器的warren实现方式

我们这里使用这种方式来搭建Warren

启动两个RabbitMQ节点

# 启动第一个节点
RABBITMQ_NODE_PORT=5675 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15675}]" RABBITMQ_NODENAME=rabbit_4 rabbitmq-server -detached

# 启动第二个节点
RABBITMQ_NODE_PORT=5676 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15676}]" RABBITMQ_NODENAME=rabbit_5 rabbitmq-server -detached

新增一个HAProxy配置文件

global
    log 127.0.0.1   local0 info
    maxconn 4096
    daemon
    stats socket /tmp/haproxy.socket uid haproxy mode 770 level admin

defaults
    log     global
    mode    tcp
    option  tcplog
    option  dontlognull
    retries 3
    option  redispatch
    maxconn 2000
    timeout connect 5s
    timeout client  120s
    timeout server  120s

listen  warren_cluster
    bind 0.0.0.0:5690
    mode    tcp
    balance roundrobin
    server  rabbit_4 127.0.0.1:5675 check inter 5000 rise 2 fall 3
    # backup, 备用服务器,只有当非备用服务器不可用时才会使用该备用服务器
    server  rabbit_5 127.0.0.1:5676 backup check inter 5000 rise 2 fall 3

listen  private_monitoring
    bind 0.0.0.0:8101
    mode    http
    option  httplog
    stats   enable
    stats   uri /stats
    stats   refresh 5s

启动HAProxy

haproxy -f haproxy_warren.cfg

访问localhost:8101/stats

界面中应该有两个节点,分别为rabbit_4rabbit_5,其中rabbit_4是亮绿色,表示它是一个主服务器,rabbit_5是亮蓝色,表示它是一个备用服务器

测试warren

构造消费者
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(AMQP_HOST);
factory.setPort(AMQP_PORT);
factory.setUsername(AMQP_USER);
factory.setPassword(AMQP_PASS);

try {
    Connection connection = factory.newConnection();

    Channel channel = connection.createChannel();

    channel.exchangeDeclare("cluster-test", BuiltinExchangeType.DIRECT, true, false, null);
    channel.queueDeclare("cluster-test", true, false, false, null);
    channel.queueBind("cluster-test", "cluster-test", "cluster-test");

    System.out.println("Ready for testing!");

    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            JSONObject message = JSONUtil.parseObj(new String(body, StandardCharsets.UTF_8));

            System.out.println("Received: [" + message.getStr("content") + "] at [" + message.getStr("time") + "]");

            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    };

    channel.basicConsume("cluster-test", false, "cluster-test", consumer);
} catch (IOException | TimeoutException e) {
    e.printStackTrace();
}
构造生产者
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(AMQP_HOST);
factory.setPort(AMQP_PORT);
factory.setUsername(AMQP_USER);
factory.setPassword(AMQP_PASS);

try {
    Connection connection = factory.newConnection();

    Channel channel = connection.createChannel();

    JSONObject object = new JSONObject();
    object.set("content", "hello, haProxy");
    object.set("time", DateUtil.now());

    AMQP.BasicProperties props = new AMQP.BasicProperties().builder().contentType("application/json").build();

    channel.basicPublish("cluster-test", "cluster-test", props, JSONUtil.toJsonStr(object).getBytes());

    System.out.println("Sent cluster test message.");

    channel.close();
    connection.close();
} catch (TimeoutException | IOException e) {
    e.printStackTrace();
}
测试步骤
  1. 启动消费者
  2. 启动生产者,发送消息
  3. 消费者成功接收到消息
  4. 使用cli将主服务器停止, rabbitmqctl -n rabbit_4 stop_app
  5. 消费者出现连接失败
  6. 15s后,消费者重新连接到备用服务器
  7. 启动生产者,发送消息
  8. 消费者成功接收到消息

如果以上步骤都符合预期,那么Warren就搭建完成了