netty做集群 channel如何共享?
方案一:
netty 集群,通过rocketmq等MQ 推送到所有netty服务端,channel 共享无非是要那个通道都可以发送消息向客户端
方案二:
MQ广播+ 多Netty ,Netty收到MQ消息后,如果本地存储有该channel,就发送,没有存储就忽略,完美解决,不需要做channel的共享。
这里使用rabbitmq的订阅发布的广播模式(如果有其他服务可以使用Topic)
1、添加配置文件
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.5.5</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.0.3.RELEASE</version>
</dependency>
server:
port: 8888
nettyEventExchange: netty.event.exchange
nettyQueue: netty.${server.port}.message.queue
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
listener:
retry:
####开启消费者(程序出现异常的情况下会)进行重试
enabled: false #默认为false
####最大重试次数
max-attempts: 5
####重试间隔次数
initial-interval: 3000
simple:
acknowledge-mode: manual
simple.concurrency: 10 #线程池大小,默认为10
simple.max-concurrency: 100 #最大线程池大小,默认为10
2、添加配置文件和实体
package yws.net.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class EventMessage extends Throwable implements Serializable {
/**
* 消息队列的消息id
*/
private String messageId;
/**
* 事件类型
*/
private String eventMessageType;
/**
* 业务id
*/
private String bizId;
/**
* 账号
*/
private Long accountNo;
/**
* 消息体
*/
private String content;
/**
* 备注
*/
private String remark;
}
package yws.net.config;
import lombok.Data;
import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 如果多集群部署需要使用
* 推送消息时候使用订阅发布,对所有服务都发送,每个服务都查询自己
* jvm里面是否有存储对应的channel,有的话推送,没有的话忽略
* */
@Configuration
@Data
public class RabbitMQConfig {
/**
* 交换机
*/
@Value("${nettyEventExchange}")
private String nettyEventExchange;
/**
* 队列
*/
@Value("${nettyQueue}")
private String nettyQueue;
/**
* 创建交换机 Fanout类型
* @return
*/
@Bean
public FanoutExchange nettyEventExchange(){
//durable:开启持久化,autoDelete:不自动删除
//return new TopicExchange(nettyEventExchange,true,false);
return new FanoutExchange(nettyEventExchange,true,false);
}
/**
* 消息转换器
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**
* 队列和交换机的绑定关系建立
*/
@Bean
public Binding nettyAddApiBinding() {
return BindingBuilder.bind(nettyQueue()).to(nettyEventExchange());
}
/**
* 普通队列,用于被监听
*/
@Bean
public Queue nettyQueue() {
return new Queue(nettyQueue);
}
}
3、添加监听
package yws.net.listener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import yws.net.enums.BizCodeEnum;
import yws.net.exception.BizException;
import yws.net.model.EventMessage;
import yws.net.util.redis.RedisUtils;
import java.io.IOException;
/**
* @author Yws
* @since 2022/8/26 集群消息推送消费者
*/
@Component
@Slf4j
@RabbitListener(queues = "${nettyQueue}")
public class NettyMsgMqListener {
@Autowired private RedisUtils redisUtils;
@Value("${server.port}")
public int port;
@RabbitHandler
public void nettyMsgHandler(EventMessage eventMessage, Message message, Channel channel)
throws IOException {
//测试多服务
log.info("测试端口:{}",port);
try {
//查看对应的netty服务的本地存储里边是否有该channel,有的话推送消息、没有的话忽略
} catch (Exception e) {
// 处理业务异常,还有进行其他操作,比如记录失败原因
log.error("消费失败1:" + eventMessage);
}
//确认消息消费成功,
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
4、启动多个springboot项目,启动多个消费者,多集群可以添加jvm参数
-Dserver.port=8910 -Dnetty.port=7001
5、添加测试类
package yws.net.biz;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import yws.net.MainApplication;
import yws.net.config.RabbitMQConfig;
import yws.net.model.EventMessage;
/**
* @author Yws
* @since 2023/1/4
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = MainApplication.class)
@Slf4j
public class NettyTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RabbitMQConfig rabbitMQConfig;
@Test
public void test(){
String nettyEventExchange = rabbitMQConfig.getNettyEventExchange();
System.out.println(nettyEventExchange);
//构建消息
EventMessage eventMessage = EventMessage.builder()
.content("test")
.build();
rabbitTemplate.convertAndSend(rabbitMQConfig.getNettyEventExchange(),
"", eventMessage);
}
}