http://blog.csdn.net/qq_36569036/article/details/53447122
com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo.selectOneMessageQueue(String)
- 启动负载均衡的服务:producer根据roundbin方式轮询topic下的所有队列来实现发送方的负载均衡。
//生产者默认负载均衡算法com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(Message, CommunicationMode, SendCallback) /** * 如果lastBrokerName不为null,则寻找与其不同的MessageQueue */ public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName != null) { int index = this.sendWhichQueue.getAndIncrement(); for (int i = 0; i < this.messageQueueList.size(); i++) { int pos = Math.abs(index++) % this.messageQueueList.size(); MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return null; } else { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); return this.messageQueueList.get(pos); } }com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy/** * Copyright (C) 2010-2013 Alibaba Group Holding Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package com.alibaba.rocketmq.client.consumer;import java.util.List;import com.alibaba.rocketmq.common.message.MessageQueue;SendResult result = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(Listmqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId);/** * 生产者队列选择器 * * @author shijia.wxr * @since 2013-7-25 */public interface MessageQueueSelector { public MessageQueue select(final List mqs, final Message msg, final Object arg);}/**消费者 * Consumer队列自动分配策略 * * @author shijia.wxr * @since 2013-7-24 */public interface AllocateMessageQueueStrategy { /** * 给当前的ConsumerId分配队列 * * chen.si 这里的使用场景如下: * 前提: 1. 一个topic对应多个分区队列 * 2. 这个topic有一个consumer group进行消费 * 3. 有多个consumer(1个进程内多个 或者 多个进程 或者多个机器),这些consumer属于同一个consumer group * 在同一个group中的多个consumer中,他们需要区分的均衡的唯一的消费某个分区队列,这时候还不能多个consumer重复消费某个队列。 * 需要一个算法,能唯一的确定均衡分配,将不同的队列分配给不同的consumer。 * 这个算法的难点: 必须保证不同的进程 独立的 计算属于自己的消费队列, 而且这里的消费队列不能重复 * * @param currentCID * 当前ConsumerId * @param mqAll * 当前Topic的所有队列集合,无重复数据,且有序 * @param cidAll * 当前订阅组的所有Consumer集合,无重复数据,且有序 * @return 分配结果,无重复数据 */ public List allocate(// final String currentCID,// final List mqAll,// final List cidAll// );}