1.mq集群(必须要hostname与hosts有对应关系,所以改本机的配置文件)
//全部设置主机名称,例如改为 node1
vim /etc/hostname
//设置host映射关系的配置文件(就可以不用ip而用主机名简单的访问别的服务器了)
vim /etc/hosts
10.211.55.74 node1
10.211.55.75 node2
10.211.55.76 node3
//确保每个节点cookie一致,使用scp复制命令
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
//启动mq服务和erlang虚拟机
rabbitmq-server -detached
//节点2,3关闭mq服务不改变erlang服务
rabbitmqctl stop_app
rabbitmqctl reset
//加入集群
rabbitmqctl join_cluster rabbit@node2
//启动mq
rabbitmqctl start_app
//查看集群状态(每个节点都可以查看到一样的信息)
rabbitmqctl cluster_status
//删除节点
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl cluster_status
rabbitmqctl forget_cluster_node rabbit@node2(node1 机器上执行)
//需要重新设置账号角色和权限
rabbitmqctl add_user admin 123
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
2.镜像队列(队列在几个节点都有备份)一台机器down会消息丢失
//最好备份1份,多份 的话很浪费资源 mq界面admin ->Add / update a policy
ha-mode exactly #指定份数
ha-params 2 #几份 备份2份
ha-sync-mode automatic #同步模式,自动同步到其他机器
3.高可用负载均衡 haproxy+keepalive询问是否有心跳 ip转移到另外一个机器,那个ip还可以访问使用(略过,作者搭建起来使用不了,文档也没有信息步骤,以后好好学习nginx,体验一下负载均衡,以前搭建过,但是那时候什么都不懂…)
1.官网下载tar.gz包
#解压 tar -zxvf xxx
#内核版本要3.10
uname -r
2.进入目录编译c语言文件,cent7使用linux31
make TARGET=linux31
make install PREFIX=/usr/local/haproxy
3.配置文件9188是可以改的前端管理页面端口(这个启动起来了,但是不知道怎么使用)
4.启动 /usr/local/haproxy/sbin/haproxy -f /usr/local/haproxy/conf/haproxy.cfg
5.访问网页http://192.168.10.106:9188/haproxy
global
daemon
chroot /usr/local/haproxy
defaults
log 127.0.0.1 local0 err #[err warning info debug]
mode http
retries 2
option redispatch
option abortonclose
option dontlognull
maxconn 4096
timeout connect 5000ms
timeout client 30000ms
timeout server 30000ms
#timeout check 2000
listen admin_status
bind 0.0.0.0:9188
mode http
stats uri /haproxy
stats realm welcome login/ Haproxy
listen proxy-private
bind 0.0.0.0:8088
mode tcp
balance leastconn
option tcplog
server s1 192.168.10.106:8087 check weight 1 maxconn 2000
4.federationExchange联邦交换机 (解决远距离网络延迟问题,如北京和深圳距离比较远,北京用户访问北京mq,深圳用户访问深圳mq,但是北京和深圳必须要同步消息,因为相互依赖)(主要是在admin管理界面设置参数,没有代码)
#删除之前的policy
#全部机器启用联邦插件
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management
#admin查看到联邦 Federation Status
#upstream 上游交换机的数据流向下游,下游不流向上游
1.消费者创建交换机(随便类型)Dirrect
feder_exchange
2.node3的 admin-> Federation upStream 格式amqp://账号:密码@节点ip
名字: your_name
uri: amqp://admin:123@node1
3.node3设置policy Add / update a policy
Pattern: ^fed.* #apply to Exchanges,交换机 的名字
Definition: federation-upstream your_name
#点击界面的 feder status查看状态
–消费者–
public class Feder {
public static final String EXCHANGE_NAME="fed_exchange";
public static void main(String[] args) throws Exception {
Channel channel = MQRabbitUtil.getChannel();
//得到临时队列
String QName="fed_queue1";
channel.queueDeclare(QName,false,false,false,null);
//交换机的名字和类型
// channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
channel.queueBind(QName,EXCHANGE_NAME,"wrong");
DeliverCallback deliverCallback=(tag, delivery)->{
System.out.println("consumer1"+new String(delivery.getBody(),"UTF-8"));
};
CancelCallback nCallback=(tag)->{
System.out.println("失败应答");
};
boolean IsAck=true;
channel.basicConsume(QName,IsAck,deliverCallback,(tag)->{});
}
}
–提供者—
public class FederexchangeProducer {
public static final String EXCHANGE_NAME="fed_exchange";
public static void main(String[] args) throws Exception {
Channel channel = MQRabbitUtil.getChannel();
//交换机的名字和类型
// channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String next = scanner.next();
//交换机
//队列名
//设置消息持久化
//二进制
//fanout交换机模式
// channel.basicPublish(EXCHANGE_NAME,"", MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes());
//direct交换机
channel.basicPublish(EXCHANGE_NAME,"wrong", MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes());
// boolean flag=channel.waitForConfirms();
// if(flag){
// System.out.println("消息已经写入磁盘的确认");
//
// }
}
}
}
5.联邦队列 Queue (和上面同理)(有重复消费的问题)
1.upstream同上
2.policy 也选择上游名称,注意这里选择Queue就可以了
6.Shovel(铲子) 将数据从队列移到另外一个队列 源端到目的端,也是同步数据
#比使用联邦更清晰,貌似很简单粗暴,
#如果在node2的q1数据转发到node1的q2,直接给node2的加一个q2队列增加数据,然后给node1增加q1q2队列并同步数据
#集群所有机器开启插件
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management
#admin界面出现shovel management,添加界面 写source队列和destination队列,用AMQP0.9.1比较符合之前的习惯