博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rocketmq 负载均衡源码
阅读量:6694 次
发布时间:2019-06-25

本文共 3529 字,大约阅读时间需要 11 分钟。

hot3.png

http://blog.csdn.net/qq_36569036/article/details/53447122

 

com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo.selectOneMessageQueue(String)

  • 启动负载均衡的服务:producer根据roundbin方式轮询topic下的所有队列来实现发送方的负载均衡。
//生产者默认负载均衡算法com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(Message, CommunicationMode, SendCallback) /**     * 如果lastBrokerName不为null,则寻找与其不同的MessageQueue     */    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {        if (lastBrokerName != null) {            int index = this.sendWhichQueue.getAndIncrement();            for (int i = 0; i < this.messageQueueList.size(); i++) {                int pos = Math.abs(index++) % this.messageQueueList.size();                MessageQueue mq = this.messageQueueList.get(pos);                if (!mq.getBrokerName().equals(lastBrokerName)) {                    return mq;                }            }            return null;        }        else {            int index = this.sendWhichQueue.getAndIncrement();            int pos = Math.abs(index) % this.messageQueueList.size();            return this.messageQueueList.get(pos);        }    }com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy/** * Copyright (C) 2010-2013 Alibaba Group Holding Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package com.alibaba.rocketmq.client.consumer;import java.util.List;import com.alibaba.rocketmq.common.message.MessageQueue;SendResult result = producer.send(msg, new MessageQueueSelector() {                @Override                public MessageQueue select(List
mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId);/** * 生产者队列选择器 * * @author shijia.wxr
* @since 2013-7-25 */public interface MessageQueueSelector { public MessageQueue select(final List
mqs, final Message msg, final Object arg);}/**消费者 * Consumer队列自动分配策略 * * @author shijia.wxr
* @since 2013-7-24 */public interface AllocateMessageQueueStrategy { /** * 给当前的ConsumerId分配队列 *
* chen.si 这里的使用场景如下: * 前提: 1. 一个topic对应多个分区队列 * 2. 这个topic有一个consumer group进行消费 * 3. 有多个consumer(1个进程内多个 或者 多个进程 或者多个机器),这些consumer属于同一个consumer group * 在同一个group中的多个consumer中,他们需要区分的均衡的唯一的消费某个分区队列,这时候还不能多个consumer重复消费某个队列。 * 需要一个算法,能唯一的确定均衡分配,将不同的队列分配给不同的consumer。 * 这个算法的难点: 必须保证不同的进程 独立的 计算属于自己的消费队列, 而且这里的消费队列不能重复 * * @param currentCID * 当前ConsumerId * @param mqAll * 当前Topic的所有队列集合,无重复数据,且有序 * @param cidAll * 当前订阅组的所有Consumer集合,无重复数据,且有序 * @return 分配结果,无重复数据 */ public List
allocate(// final String currentCID,// final List
mqAll,// final List
cidAll// );}

 

转载于:https://my.oschina.net/xiaominmin/blog/1592252

你可能感兴趣的文章
我的友情链接
查看>>
页面内容显示不全问题的解决办法
查看>>
交换机的原理及VTP
查看>>
我的友情链接
查看>>
【JSP】JSP与Servlet的区别
查看>>
Lapm+xcache安装wordpress
查看>>
Nginx 负载均衡-加权轮询策略剖析
查看>>
mysql 全量备份和增量备份
查看>>
我的友情链接
查看>>
你必须知道的ADO.NET(三) 连接字符串,你小觑了吗?
查看>>
VUE页面渲染问题
查看>>
我的友情链接
查看>>
路由器故障可以通过Ping命令来排除
查看>>
jvm方法调用
查看>>
RPMBUILD
查看>>
个人常用Markdow语法代码备用
查看>>
假以时日,win7将替代XP系统
查看>>
iphone软件开发让更多人获得成功
查看>>
nginx log每小时轮循
查看>>
刷题-5
查看>>