-
Netty 实现通信的步骤:(客户端与服务器端基本一致)
- 创建两个的 NIO 线程组,一个专门用于网络事件处理(接受客户端的连接),另一个则进行网络通信读写。
- 创建—个 ServerBootstrap 对象,配置 Netty 的一系列参数,例如接受传岀数据的缓存大小等等。
- 创建一个实际处理数据的类 Channellnitializer,进行初始化的准备工作,比如设置接受传出数据的字符集、格式、已经实际处理数据的接口。
- 绑定端口,执行同步阻塞方法等待服务器端启动即可。
1. 引入依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.12.Final</version>
</dependency>
2. Netty Server
Netty Server 端需要编写 Server 与 ServerHandler 两个核心类
Server
package com.xieqingxin.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @Author: Mr.xie
* @ModifiedBy:
* @CreationDate: 2022/2/20 17:28
* @ModificationDate: 2022/2/20 17:28
* @Description: Server
*/
public class Server {
public static void main(String[] args) throws InterruptedException {
//1. 创建两个线程组: 一个用于进行网络连接接受的 另一个用于我们的实际处理(网络通信的读写)
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
//2. 通过辅助类去构造 server/client
ServerBootstrap b = new ServerBootstrap();
//3. 进行 Nio Server 的基础配置
//3.1 绑定两个线程组
b.group(bossGroup, workGroup)
//3.2 因为是 server 端,所以需要配置 NioServerSocketChannel
.channel(NioServerSocketChannel.class)
//3.3 设置链接超时时间
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
//3.4 设置 TCP backlog 参数 = sync 队列 + accept 队列
.option(ChannelOption.SO_BACKLOG, 1024)
//3.5 设置配置项 通信不延迟
.childOption(ChannelOption.TCP_NODELAY, true)
//3.6 设置配置项 接收与发送缓存区大小
.childOption(ChannelOption.SO_RCVBUF, 1024 * 32)
.childOption(ChannelOption.SO_SNDBUF, 1024 * 32)
//3.7 进行初始化 ChannelInitializer , 用于构建双向链表 "pipeline" 添加业务 handler 处理
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch){
//3.8 这里仅仅只是添加一个业务处理器:ServerHandler(后面要针对他进行编码)
ch.pipeline().addLast(new ServerHandler());
}
});
//4. 服务器端绑定端口并启动服务;使用 channel 级别的监听 close 端口阻塞的方式
ChannelFuture cf = b.bind(8765).sync();
cf.channel().closeFuture().sync();
//5. 释放资源
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
ServerHandler
package com.xieqingxin.server;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* @Author: Mr.xie
* @ModifiedBy:
* @CreationDate: 2022/2/20 17:49
* @ModificationDate: 2022/2/20 17:49
* @Description: ServerHandler
*/
public class ServerHandler extends ChannelInboundHandlerAdapter {
/**
* channelActive
* 通道激活方法
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.err.println("server channel active..");
}
/**
* channelRead
* 读写数据核心方法
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//1. 读取客户端的数据(缓存中去取并打印到控制台)
ByteBuf buf = (ByteBuf) msg;
byte[] request = new byte[buf.readableBytes()];
buf.readBytes(request);
String requestBody = new String(request, "utf-8");
System.err.println("Server: " + requestBody);
//2. 返回响应数据
String responseBody = "返回响应数据," + requestBody;
ctx.writeAndFlush(Unpooled.copiedBuffer(responseBody.getBytes()));
}
/**
* exceptionCaught
* 捕获异常方法
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
3. Netty Client
Netty Client 端需要编写 Client 与 ClientHandler 两个核心类
Client
package com.xieqingxin.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @Author: Mr.xie
* @ModifiedBy:
* @CreationDate: 2022/2/20 17:55
* @ModificationDate: 2022/2/20 17:55
* @Description: Client
*/
public class Client {
public static void main(String[] args) throws InterruptedException {
//1. 创建两个线程组: 只需要一个线程组用于我们的实际处理(网络通信的读写)
EventLoopGroup workGroup = new NioEventLoopGroup();
//2. 通过辅助类去构造client,然后进行配置响应的配置参数
Bootstrap b = new Bootstrap();
b.group(workGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.SO_RCVBUF, 1024 * 32)
.option(ChannelOption.SO_SNDBUF, 1024 * 32)
//3. 初始化ChannelInitializer
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//3.1 添加客户端业务处理类
ch.pipeline().addLast(new ClientHandler());
}
});
//4. 服务器端绑定端口并启动服务; 使用channel级别的监听close端口 阻塞的方式
ChannelFuture cf = b.connect("127.0.0.1", 8765).syncUninterruptibly();
//5. 发送一条数据到服务器端
cf.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty!".getBytes()));
//6. 休眠一秒钟后再发送一条数据到服务端
Thread.sleep(1000);
cf.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty again!".getBytes()));
//7. 同步阻塞关闭监听并释放资源
cf.channel().closeFuture().sync();
workGroup.shutdownGracefully();
}
}
ClientHandler
package com.xieqingxin.client;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
/**
* @Author: Mr.xie
* @ModifiedBy:
* @CreationDate: 2022/2/20 17:57
* @ModificationDate: 2022/2/20 17:57
* @Description: ClientHandler
*/
public class ClientHandler extends ChannelInboundHandlerAdapter {
/**
* channelActive
* 客户端通道激活
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.err.println("client channel active..");
}
/**
* channelRead
* 真正的数据最终会走到这个方法进行处理
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
System.out.println("Client :" + body );
String response = "收到服务器端的返回信息:" + body;
} finally {
ReferenceCountUtil.release(msg);
}
}
/**
* exceptionCaught
* 异常捕获方法
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.fireExceptionCaught(cause);
}
}
4. 完成服务端与客户端的搭建
先启动服务端,再启动客户端,客户端自动与服务端建立连接,会得到如下结果:
客户端控制台:
client channel active..
Client :返回响应数据,hello netty!
Client :返回响应数据,hello netty again!
服务端控制台:
server channel active..
Server: hello netty!
Server: hello netty again!
评论区