网络编程基础
OSI七层模型
应用层:Http协议、电子文件传输、文件服务器等
表示层:解决我们不同系统之间语法的通讯
会话层:建立与应用程序之间的通讯
传输层:提供了端口号和接口协议TPC/UDP
网络层:为数据包选择路由 路由器、交换机
定义了ip地址,可以根据ip地址找到对应的服务器
数据链路层:传输有地址的帧以及错误检测功能
物理层:以二进制形式,在物理机器上实现传输
(光纤、各种物理介质传输)
一个域名底层是如何解析
单原理实现:浏览器访问域名,根据域名先从本地host文件
C:\Windows\System32\drivers\etc\hosts文件 查找匹配对应的ip与域名,如果本地
Host文件 没有的情况下,则联网去电信运营商查找。
Socket网络通讯技术
TCP与UDP协议
Socket
Socket(套接字)是两个程序之间通过双向信道进行数据交换的端,可以理解为接口。使用Socket编程也称为网络编程,Socket只是接口并不是网络通信协议。
TCP与UDP区别
TCP是一种面向连接的、可靠的、基于字节流的传输层通信协议
TCP协议应用场景:HTTP、HTTPS、FTP协议
UDP是面向无连接通讯协议,udp通讯时不需要接受方确定,属于不可靠传输,可能会存在丢包的现象。
UDP协议应用场景:QQ语音、QQ视频
三次握手和四次挥手(分手)概念
首先我们要知道在tcp建立连接中,有一些名词表示:
比如:syn就是建立连接、ack就是确认标志、fin终止标志
第一次握手:客户端会向服务器端发送码为syn=1,随机产生一个seq_number=x的数据包到服务器端 (syn)
第二次握手:服务端接受到客户端请求之后,确认ack=x+1, 于是就向客户端发送syn(服务端独立生成 随机生成数字Y)+ack
第三次握手:客户端接受syn(随机数Y)+ack,向服务器端发送ack=y+1,此包发送完毕即可 建立tcp连接。
白话文翻译:
第一次握手:客户端向服务器端发送 问服务器你在不在?
第二次握手:服务器端回应客户端说:我在的。
第三次握手:客户端发送给服务器端:ok,那我开始建立连接的
关闭连接:
第一次挥手: 客户端向服务器端发送释放的报文,停止发送数据 fin=1、生成一个序列号seq=u;
第二次挥手: 服务器端接受到释放的报文后,发送ack=u+1;随机生成的seq=v给客户端;当前状态为关闭等待状态
客户端收到了服务器确认通知之后,此时客户端就会进入到终止状态,等待服务器端发送释放报文。
第三次挥手:服务器端最后数据发送完毕之后,就向客户端发送连接释放报文,FIN=1,ack=u+1 当前为半关闭状态,随机生成一个随机树w
第四次挥手,客户端必须发出确认,ACK=1,ack=w+1,而自己的序列号是seq=u+1,此时,客户端就进入了TIME-WAIT(时间等待)状态。注意此时TCP连接还没有释放,必须经过2∗∗MSL(最长报文段寿命)的时间后,当客户端撤销相应的TCB后,才进入CLOSED状态。
服务器只要收到了客户端发出的确认,立即进入CLOSED状态。同样,撤销TCB后,就结束了这次的TCP连接。可以看到,服务器结束TCP连接的时间要比客户端早一些。
白话文翻译四次挥手:
第一次挥手 客户端向服务端发送一个释放连接通知;
第二次挥手 服务端接受到释放通知之后,告诉给客户端说等待一下,因为可能存在有其他的数据没有发送完毕,等待数据全部传输完毕之后就开始 关闭连接;
第三次挥手 服务器端所有的数据发送完毕之后,就告诉客户端说现在可以释放连接了。
第四次挥手: 客户端确认是最终释放连接通知,ok 就开始 就向服务区端发送我们可以开始关闭连接啦;
Socket Tcp通讯代码
服务端
public static void main(String[] args) throws IOException {
// 创建Server Socket
ServerSocket serverSocket = new ServerSocket();
// 创建我们的 Socket 监听连接地址和端口号
SocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
// 绑定我们的监听地址
serverSocket.bind(address);
// 等待接受请求
System.out.println("等待客户端发送消息..");
Socket accept = serverSocket.accept();
// 获取OutputStream流
PrintWriter socketOut = new PrintWriter(accept.getOutputStream());
byte buf[] = new byte[1024];
if (accept.getInputStream().read(buf) > 0) {
System.out.println("服务器端接受到客户端消息:" + new String(buf));
}
// 服务器端响应消息
String sendStr = "我是高腾飞";
socketOut.write(sendStr);
socketOut.flush();
// 关闭所有连接
socketOut.close();
accept.close();
serverSocket.close();
}
客户端
public static void main(String[] args) throws IOException {
final Socket socket = new Socket();
// 创建socket地址
SocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
socket.connect(address);
// 创建PrintWriter
PrintWriter socketOut = new PrintWriter(socket.getOutputStream());
BufferedReader socketIn = new BufferedReader(
new InputStreamReader(socket.getInputStream()));
// 向服务器发送的内容
String sendStr = "客户端问服务器端: 你是高腾飞么?";
socketOut.write(sendStr);
socketOut.flush();
String receiveStr = socketIn.readLine();
System.out.println("服务器端回复:: " + receiveStr);
// 关闭连接
socketOut.close();
socketIn.close();
socket.close();
}
Socket Udp通讯代码
服务端
public static void main(String[] args) throws IOException {
DatagramSocket socket = new DatagramSocket(8800);
//2.创建数据报,用于接收客户端发送的数据
byte[] data = new byte[1024];
//创建字节数组,指定接收的数据包的大小
DatagramPacket packet = new DatagramPacket(data, data.length);
//3.接收客户端发送的数据
System.out.println("****服务器端已经启动,等待客户端发送数据");
//此方法在接收到数据报之前会一直阻塞
socket.receive(packet);
//4.读取数据
String info = new String(data, 0, packet.getLength());
System.out.println("我是服务器,客户端说:" + info);
/*
* 向客户端响应数据
*/
//1.定义客户端的地址、端口号、数据
InetAddress address = packet.getAddress();
int port = packet.getPort();
byte[] data2 = "我是高腾飞~~".getBytes();
//2.创建数据报,包含响应的数据信息
DatagramPacket packet2 = new DatagramPacket(data2, data2.length, address, port);
//3.响应客户端
socket.send(packet2);
//4.关闭资源
socket.close();
}
客户端
public static void main(String[] args) throws IOException {
InetAddress address = InetAddress.getByName("localhost");
int port = 8800;
byte[] data = "你是高腾飞?".getBytes();
//2.创建数据报,包含发送的数据信息
DatagramPacket packet = new DatagramPacket(data, data.length, address, port);
//3.创建DatagramSocket对象
DatagramSocket socket = new DatagramSocket();
//4.向服务器端发送数据报
socket.send(packet);
/*
* 接收服务器端响应的数据
*/
//1.创建数据报,用于接收服务器端响应的数据
byte[] data2 = new byte[1024];
DatagramPacket packet2 = new DatagramPacket(data2, data2.length);
//2.接收服务器响应的数据
socket.receive(packet2);
//3.读取数据
String reply = new String(data2, 0, packet2.getLength());
System.out.println("我是客户端,服务器说:" + reply);
//4.关闭资源
socket.close();
}
Http协议7个请求过程
Http协议一种超文本传输的协议,基于TCP/IP协议的包装,包含:img、css、js、html等。
Http协议的特征:
- 无状态
- 请求与响应模型
- 简单快速
- 灵活可以传输任何类型
Http分为 请求与响应
请求:
请求行
请求头
请求方法 Get/Post
响应:
响应行
响应头
响应体
BIO,NIO,AIO 模型
输入IO与输出IO原理
内核态: CPU可以访问内存所有数据, 包括外围设备, 例如硬盘, 网卡;
用户态: (独立创建应用程序) 只能受限的访问内存, 且不允许访问外围设备. 占用CPU的能力被剥夺, CPU资源可以被其他程序获
1、BIO(Blocking I O) 同步阻塞模型,一个线程对应一个客户端连接。
应用场景:
BIO 方式适用于连接数目比较小且固定的架构, 这种方式对服务器资源要求比较高, 但程序简单易理解。
2、NIO(Non Blockin g IO) 同步非阻塞,
服务器实现模式为一个线程可以处理多个请求(连接),客户端发送的连接请求都会注册到 多路复用器selector上,多路复用器轮询到连接有IO请求就进行处理。
应用场景:
NIO方式适用于连接数目多且连接比较短(轻操作) 的架构, 比如聊天服务器, 弹幕系统, 服务器间通讯,编程比较复杂, JDK1.4 开始支持
3、AIO(NIO 2.0) 异步非阻塞,
由操作系统完成后回调通知服务端程序启动线程去处理, 一般适用于连接数较多且连接时间较长的应用。是在NIO的基础上进一步封装的。
应用场景:
AIO方式适用于连接数目多且连接比较长(重操作) 的架构,JDK7 开始支持
同步和异步的区别
同步也就是程序从上往下实现执行;
异步从新开启一个新分支,相互不会影响;
站在Http协议上分析同步与异步区别:
我们的Http协议请求默认情况下同步形式调用,如果调用过程非常耗时的情况下 客户端等待时间就非常长, 这种形式我们可以理解阻塞式;
解决办法:耗时的代码我们可以使用多线程或者MQ实现处理,但是不能立马获取结果; 客户端可以主动查询
阻塞与非阻塞的区别
阻塞:如果我没有获取到结果的情况下,当前线程从运行状态切换为阻塞状态 内核角度分析:用户空间切换到内核空间
非阻塞:如果我没有获取到结果的情况下,当前的线程不会阻塞。
BIO(Blocking IO) 同步阻塞模型
一个线程处理一个客户端请求;
缺点:
1、 IO代码里read操作是阻塞操作,如果获取不到数据的情况下,则会阻塞;
如果线程使用过多的情况下,非常消耗服务器端cpu的资源;
应用场景:
BIO 方式适用于连接数目比较小且固定的架构, 这种方式对服务器资源要求比较高
NIO(Non Blocking IO) 同步非阻塞
NIO同步非阻塞的原理:多个客户端发送连接请求注册到(多路复用器)selector中,
多路复用器使用轮训机制实现检测每个io请求有数据就进行处理。
底层实现原理:
I/O多路复用底层一般用的Linux API(select,poll,epoll)来实现
NIO 有三大核心组件: Channel(通道), Buffer(缓冲区),Selector(选择器)
1.Channel(通道) :称之为通道,和IO相连,通信双方进行数据交流的通道,需要和buffer结合使用。
2.Buffer(缓冲区) :对数据的读取/写入需要使用buffer,buffer本质就是一个数组。
3.Selector(选择器): IO多路复用 一个线程Thread使用选择器Selector通过轮询的方式去监听多个通道Channel上的事件,从而让一个线程可以处理多个事件。
Netty实战
初始Netty
什么是Netty
Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,对比于BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高。
异步非阻塞IO
BIO 同步阻塞IO
NIO 同步非阻塞IO linux操作系统内核
AIO异步非阻塞IO linux服务器内核支持不是很完善
为什么需要使用Netty
1.传统的NIO 的类库和 API 繁杂, 使用麻烦: 需要熟练掌握Selector、 ServerSocketChannel、 SocketChannel、 ByteBuffer等。
2.开发工作量和难度都非常大: 例如客户端面临断连重连、 网络闪断、心跳处理、半包读写、 网络拥塞和异常流的处理等等。
3.Netty 对 JDK 自带的 NIO 的 API 进行了良好的封装,解决了上述问题。且Netty拥有高性能、 吞吐量更高,延迟更低,减少资源消耗,最小化不必要的内存复制等优点。
线程模型
注意:大多数企业都在使用Netty4,Netty5已经被废弃
为什么Netty使用NIO而不是AIO
原因:在Linux系统上,AIO的底层实现仍使用EPOLL,与NIO相同,因此在性能上没有明显的优势;Windows的AIO底层实现良好,但是Netty开发人员并没有把Windows作为主要使用平台考虑。
为什么要使用netty
- 异步非阻塞通讯 aio
- 高效的线程模型
- 无锁化的设计
- 高性能序列化框架
- 零拷贝、内存池
- 灵活的TCP协议参数设置
Netty使用场景
- RPC 框架Dubbo+动态代理设计模式
- Rocketmq netty 数据结构模式
- 聊天室
- 游戏
- Xxl-job分布式任务调度平台
TCP协议粘包与拆包
产生的背景
1.要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包;
2.接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包;
3.要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包;
4.待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包。即TCP报文长度-TCP头部长度>MSS。
MSS (最大报文段长度):
最大报文段长度(MSS)是TCP协议的一个选项,用于在TCP连接建立时,收发双方协商通信时每一个报文段所能承载的最大数据长度(不包括文段头)。
基于Netty手写mq组件
NettyMQServer
public class NettyMQServer {
public void bind(int port) throws Exception {
/**
* Netty 抽象出两组线程池BossGroup和WorkerGroup
* BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写。
*/
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
try {
bootstrap.group(bossGroup, workerGroup)
// 设定NioServerSocketChannel 为服务器端
.channel(NioServerSocketChannel.class)
//BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,
//用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
.option(ChannelOption.SO_BACKLOG, 100)
// 服务器端监听数据回调Handler
.childHandler(new ChildChannelHandler());
//绑定端口, 同步等待成功;
ChannelFuture future = bootstrap.bind(port).sync();
System.out.println("当前服务器端启动成功...");
//等待服务端监听端口关闭
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
//优雅关闭 线程组
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 设置异步回调监听
ch.pipeline().addLast(new MayiktServerHandler());
////// 1. 演示LineBasedFrameDecoder编码器
// ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
// ch.pipeline().addLast(new StringDecoder());
}
}
public static void main(String[] args) throws Exception {
int port = 9008;
new NettyMQServer().bind(port);
}
private static final String type_consumer = "consumer";
private static final String type_producer = "producer";
/**
* mq缓存消息
*/
private static LinkedBlockingQueue<String> msgs = new LinkedBlockingQueue<String>();
private static ArrayList<ChannelHandlerContext> consumerChannels = new ArrayList();
// 生产者投递消息的:topicName
public class MayiktServerHandler extends SimpleChannelInboundHandler<Object> {
/**
* 服务器接收客户端请求
*
* @param ctx
* @param data
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object data)
throws Exception {
String body = byteBufToString(data);
JSONObject jsonObject = JSONObject.parseObject(body);
String type = jsonObject.getString("type");
switch (type) {
case type_consumer:
consumer(ctx);
return;
case type_producer:
String msg = jsonObject.getString("msg");
producer(msg);
return;
}
}
private void consumer(ChannelHandlerContext ctx) {
// 保存连接
consumerChannels.add(ctx);
String poll = msgs.poll();
if (!StringUtils.isEmpty(poll)) {
// 从MQ服务器端获取消息给消费者
ByteBuf resp = Unpooled.copiedBuffer(poll.getBytes());
ctx.writeAndFlush(resp);
}
// 思路
}
/**
* 缓存生产者投递消息
*
* @param msg
*/
private void producer(String msg) {
// 缓存消息
msgs.offer(msg);
consumerChannels.forEach((ctx) -> {
String poll = msgs.poll();
if (StringUtils.isEmpty(poll)) {
return;
}
// 发送数据给消费者
ByteBuf resp = Unpooled.copiedBuffer(poll.getBytes());
ctx.writeAndFlush(resp);
});
// 分组消息
}
private String byteBufToString(Object msg) throws UnsupportedEncodingException {
if (msg == null) {
return null;
}
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
return body;
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
consumerChannels.remove(ctx);
ctx.close();
}
}
}
NettyMQProducer
public class NettyMQProducer {
public void connect(int port, String host) throws Exception {
//配置客户端NIO 线程组
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(group)
// 设置为Netty客户端
.channel(NioSocketChannel.class)
/**
* ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。
* Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。
* 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。
*/
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler());
//// 1. 演示LineBasedFrameDecoder编码器
// ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
// ch.pipeline().addLast(new StringDecoder());
}
});
//绑定端口, 异步连接操作
ChannelFuture future = client.connect(host, port).sync();
//等待客户端连接端口关闭
future.channel().closeFuture().sync();
} finally {
//优雅关闭 线程组
group.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = 9008;
NettyMQProducer client = new NettyMQProducer();
try {
client.connect(port, "127.0.0.1");
} catch (Exception e) {
e.printStackTrace();
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
JSONObject data = new JSONObject();
data.put("type", "producer");
JSONObject msg = new JSONObject();
msg.put("userId", "123456");
msg.put("age", "23");
data.put("msg", msg);
// 生产发送数据
byte[] req = data.toJSONString().getBytes();
ByteBuf firstMSG = Unpooled.buffer(req.length);
firstMSG.writeBytes(req);
ctx.writeAndFlush(firstMSG);
}
/**
* 客户端读取到服务器端数据
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("客户端接收到服务器端请求:" + body);
}
// tcp属于双向传输
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
NettyMQConsumer
public class NettyMQConsumer {
public void connect(int port, String host) throws Exception {
//配置客户端NIO 线程组
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(group)
// 设置为Netty客户端
.channel(NioSocketChannel.class)
/**
* ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。
* Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。
* 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。
*/
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyMQConsumerHandler());
}
});
//绑定端口, 异步连接操作
ChannelFuture future = client.connect(host, port).sync();
//等待客户端连接端口关闭
future.channel().closeFuture().sync();
} finally {
//优雅关闭 线程组
group.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = 9008;
NettyMQConsumer client = new NettyMQConsumer();
try {
client.connect(port, "127.0.0.1");
} catch (Exception e) {
e.printStackTrace();
}
}
public class NettyMQConsumerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
JSONObject data = new JSONObject();
data.put("type", "consumer");
// 消费者发送数据
byte[] req = data.toJSONString().getBytes();
ByteBuf firstMSG = Unpooled.buffer(req.length);
firstMSG.writeBytes(req);
ctx.writeAndFlush(firstMSG);
}
/**
* 客户端读取到服务器端数据
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("消费者读取数据:" + body);
}
// tcp属于双向传输
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
基于netty手写服务注册
依赖文件
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.23.Final</version>
</dependency>
<!--jboss-marshalling-serial -->
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>1.4.11.Final</version>
</dependency>
封装实体类
package com.gtf.NETTYserver.entity;
import java.io.Serializable;
public class AddresDto implements Serializable {
private String addres;
public AddresDto(String addres) {
this.addres = addres;
}
public AddresDto() {
}
public String getAddres() {
return addres;
}
public void setAddres(String addres) {
this.addres = addres;
}
@Override
public String toString() {
return "AddresDto{" +
"addres='" + addres + '\'' +
'}';
}
}
package com.gtf.NETTYserver.entity;
import io.netty.channel.ChannelHandlerContext;
import java.io.Serializable;
public class AddresEntity implements Serializable {
private String addres;
// 生产的连接
private ChannelHandlerContext ctx;
public AddresEntity() {
}
public AddresEntity(String addres, ChannelHandlerContext ctx) {
this.addres = addres;
this.ctx = ctx;
}
public String getAddres() {
return addres;
}
public void setAddres(String addres) {
this.addres = addres;
}
public ChannelHandlerContext getCtx() {
return ctx;
}
public void setCtx(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public String toString() {
return "AddresEntity{" +
"addres='" + addres + '\'' +
'}';
}
}
package com.gtf.NETTYserver.entity;
import java.io.Serializable;
public class AgreementEntity implements Serializable {
// 0为生产者 1为消费者
private Integer type;
// 服务id
private String serviceId;
// 服务地址
private String addres;
public AgreementEntity(Integer type, String serviceId, String addres) {
this.type = type;
this.serviceId = serviceId;
this.addres = addres;
}
public Integer getType() {
return type;
}
public String getServiceId() {
return serviceId;
}
public String getAddres() {
return addres;
}
public void setType(Integer type) {
this.type = type;
}
public void setServiceId(String serviceId) {
this.serviceId = serviceId;
}
public void setAddres(String addres) {
this.addres = addres;
}
}
package com.gtf.NETTYserver.entity;
import java.io.Serializable;
import java.util.List;
public class RespEntity implements Serializable {
private Integer code;
private String msg;
private Object data;
public RespEntity(Integer code, String msg) {
this.code = code;
this.msg = msg;
}
public RespEntity(Integer code, String msg, Object data) {
this.code = code;
this.msg = msg;
this.data = data;
}
@Override
public String toString() {
return "RespEntity{" +
"code=" + code +
", msg='" + msg + '\'' +
", data=" + data +
'}';
}
}
服务端
package com.gtf.NETTYserver.server;
import com.gtf.NETTYserver.MarshallingCodeCFactory;
import com.gtf.NETTYserver.entity.AddresDto;
import com.gtf.NETTYserver.entity.AddresEntity;
import com.gtf.NETTYserver.entity.AgreementEntity;
import com.gtf.NETTYserver.entity.RespEntity;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.http.ResponseEntity;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/**
* 注册中心
*/
public class RegistryServer {
/**
* 生产者
*/
public static final int type_producer = 0;
/**
* 消费者
*/
public static final int type_consumer = 1;
private static ConcurrentHashMap<String, List<AddresEntity>> keyaddresses = new ConcurrentHashMap<String, List<AddresEntity>>();
//key 为连接 value 服务名称
private static ConcurrentHashMap<ChannelHandlerContext, String> ctxs = new ConcurrentHashMap<>();
public void bind(int port) throws InterruptedException {
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1280)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 新增Marshaling编码器
ch.pipeline().addLast(
MarshallingCodeCFactory.buildMarshallingDecoder()
);
ch.pipeline().addLast(
MarshallingCodeCFactory.buildMarshallingEncoder()
);
ch.pipeline().addLast(
new ServerHandler()
);
}
});
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
int port = 8006;
new RegistryServer().bind(port);
}
@ChannelHandler.Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
AgreementEntity agreementEntity= (AgreementEntity) msg;
switch (agreementEntity.getType()){
case type_producer:
//生产者
producer(agreementEntity,ctx);
break;
case type_consumer:
consumer(agreementEntity,ctx);
break;
}
System.out.println("服务器端接收到响应内容" + msg);
}
/**
* 生产者
* @param agreementEntity
* @param ctx
*/
private void producer(AgreementEntity agreementEntity,ChannelHandlerContext ctx){
String serviceId = agreementEntity.getServiceId();
//根据服务名称查找之前是否有缓存地址
List<AddresEntity> listAddress = keyaddresses.get(serviceId);
if (listAddress == null) {
listAddress=new ArrayList<>();
keyaddresses.put(serviceId, listAddress);
ctxs.put(ctx, agreementEntity.getServiceId());
}
String addres = agreementEntity.getAddres();
AddresEntity addresEntity = new AddresEntity(addres, ctx);
listAddress.add(addresEntity);
}
private void consumer(AgreementEntity agreementEntity,ChannelHandlerContext ctx){
String serviceId = agreementEntity.getServiceId();
List<AddresEntity> list = keyaddresses.get(serviceId);
if (list == null || list.size() ==0 ) {
ctx.writeAndFlush(new RespEntity(500, "没有接口列表"));
return ;
}
ArrayList<AddresDto> objects = new ArrayList<AddresDto>();
list.forEach((t)->{
objects.add(new AddresDto(t.getAddres()));
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
//如果客户端主动断开链接。回调这个方法
String serviceId = ctxs.get(ctx);
// 根据服务名称查找缓存地址
List<AddresEntity> addresEntities = keyaddresses.get(serviceId);
// 遍历缓存地址剔除
addresEntities.forEach((t) -> {
if (t.getCtx() == ctx) {
//剔除
addresEntities.remove(t);
}
});
ctx.close();
}
}
}
生产者
package com.gtf.NETTYserver.producer;
import com.gtf.NETTYserver.MarshallingCodeCFactory;
import com.gtf.NETTYserver.entity.AgreementEntity;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class ProducerClient {
public void connect(int port, String host) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.option(ChannelOption.TCP_NODELAY, true)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 设置 Marshalling 编码
ch.pipeline().addLast(
MarshallingCodeCFactory.buildMarshallingDecoder()
);
ch.pipeline().addLast(
MarshallingCodeCFactory.buildMarshallingEncoder()
);
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
int port = 8006;
String host = "127.0.0.1";
new ProducerClient().connect(port, host);
}
// 客户端Handler
public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发送数据给服务端
ctx.writeAndFlush(new AgreementEntity(0,"gtf02-producer","127.0.0.1:8080"));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
}
消费者
package com.gtf.NETTYserver.sumer;
import com.gtf.NETTYserver.MarshallingCodeCFactory;
import com.gtf.NETTYserver.entity.AgreementEntity;
import com.gtf.NETTYserver.entity.RespEntity;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.List;
public class ConsumerClient {
public void connect(int port, String host) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.option(ChannelOption.TCP_NODELAY, true)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 设置 Marshalling 编码
ch.pipeline().addLast(
MarshallingCodeCFactory.buildMarshallingDecoder()
);
ch.pipeline().addLast(
MarshallingCodeCFactory.buildMarshallingEncoder()
);
ch.pipeline().addLast(new ConsumerClientHandler());
}
});
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
int port = 8006;
String host = "127.0.0.1";
new ConsumerClient().connect(port, host);
}
// 客户端Handler
public class ConsumerClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
AgreementEntity agreementEntity = new AgreementEntity(1,
"gtf01", null);
ctx.writeAndFlush(agreementEntity);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/**
* 客户端读取到服务器端数据
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
RespEntity respEntity = (RespEntity) msg;
System.out.println("消费者读取数据:" + respEntity.toString());
}
}
}
基于netty手写rpc框架
rpc框架例如dubbo,实现服务的发现注册以及调用。
忽略