侧边栏壁纸
博主头像
阿里灰太狼博主等级

You have to believe in yourself . That's the secret of success.

  • 累计撰写 104 篇文章
  • 累计创建 50 个标签
  • 累计收到 12 条评论

目 录CONTENT

文章目录

编写—个最简单的 Netty 示例

阿里灰太狼
2022-02-20 / 0 评论 / 4 点赞 / 262 阅读 / 6,163 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-02-21,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。
  • Netty 实现通信的步骤:(客户端与服务器端基本一致)

    • 创建两个的 NIO 线程组,一个专门用于网络事件处理(接受客户端的连接),另一个则进行网络通信读写。
    • 创建—个 ServerBootstrap 对象,配置 Netty 的一系列参数,例如接受传岀数据的缓存大小等等。
    • 创建一个实际处理数据的类 Channellnitializer,进行初始化的准备工作,比如设置接受传出数据的字符集、格式、已经实际处理数据的接口。
    • 绑定端口,执行同步阻塞方法等待服务器端启动即可。

1. 引入依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.12.Final</version>
</dependency>

2. Netty Server

Netty Server 端需要编写 Server 与 ServerHandler 两个核心类

Server

package com.xieqingxin.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @Author: Mr.xie
 * @ModifiedBy:
 * @CreationDate: 2022/2/20 17:28
 * @ModificationDate: 2022/2/20 17:28
 * @Description: Server
 */
public class Server {

    public static void main(String[] args) throws InterruptedException {

        //1. 创建两个线程组: 一个用于进行网络连接接受的 另一个用于我们的实际处理(网络通信的读写)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        //2. 通过辅助类去构造 server/client
        ServerBootstrap b = new ServerBootstrap();

        //3. 进行 Nio Server 的基础配置
        //3.1 绑定两个线程组
        b.group(bossGroup, workGroup)
                //3.2 因为是 server 端,所以需要配置 NioServerSocketChannel
                .channel(NioServerSocketChannel.class)
                //3.3 设置链接超时时间
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                //3.4 设置 TCP backlog 参数 = sync 队列 + accept 队列
                .option(ChannelOption.SO_BACKLOG, 1024)
                //3.5 设置配置项 通信不延迟
                .childOption(ChannelOption.TCP_NODELAY, true)
                //3.6 设置配置项 接收与发送缓存区大小
                .childOption(ChannelOption.SO_RCVBUF, 1024 * 32)
                .childOption(ChannelOption.SO_SNDBUF, 1024 * 32)
                //3.7 进行初始化 ChannelInitializer , 用于构建双向链表 "pipeline" 添加业务 handler 处理
                .childHandler(new ChannelInitializer<SocketChannel>(){
                    @Override
                    protected void initChannel(SocketChannel ch){
                        //3.8 这里仅仅只是添加一个业务处理器:ServerHandler(后面要针对他进行编码)
                        ch.pipeline().addLast(new ServerHandler());
            }
        });

        //4. 服务器端绑定端口并启动服务;使用 channel 级别的监听 close 端口阻塞的方式
        ChannelFuture cf = b.bind(8765).sync();
        cf.channel().closeFuture().sync();

        //5. 释放资源
        bossGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }
}

ServerHandler

package com.xieqingxin.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @Author: Mr.xie
 * @ModifiedBy:
 * @CreationDate: 2022/2/20 17:49
 * @ModificationDate: 2022/2/20 17:49
 * @Description: ServerHandler
 */
public class ServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * channelActive
     * 通道激活方法
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("server channel active..");
    }

    /**
     * channelRead
     * 读写数据核心方法
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        //1. 读取客户端的数据(缓存中去取并打印到控制台)
        ByteBuf buf = (ByteBuf) msg;
        byte[] request = new byte[buf.readableBytes()];
        buf.readBytes(request);
        String requestBody = new String(request, "utf-8");
        System.err.println("Server: " + requestBody);

        //2. 返回响应数据
        String responseBody = "返回响应数据," + requestBody;
        ctx.writeAndFlush(Unpooled.copiedBuffer(responseBody.getBytes()));

    }

    /**
     * exceptionCaught
     * 捕获异常方法
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }

}

3. Netty Client

Netty Client 端需要编写 Client 与 ClientHandler 两个核心类

Client

package com.xieqingxin.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @Author: Mr.xie
 * @ModifiedBy:
 * @CreationDate: 2022/2/20 17:55
 * @ModificationDate: 2022/2/20 17:55
 * @Description: Client
 */
public class Client {

    public static void main(String[] args) throws InterruptedException {

        //1. 创建两个线程组: 只需要一个线程组用于我们的实际处理(网络通信的读写)
        EventLoopGroup workGroup = new NioEventLoopGroup();

        //2. 通过辅助类去构造client,然后进行配置响应的配置参数
        Bootstrap b = new Bootstrap();
        b.group(workGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .option(ChannelOption.SO_RCVBUF, 1024 * 32)
                .option(ChannelOption.SO_SNDBUF, 1024 * 32)
                //3. 初始化ChannelInitializer
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //3.1  添加客户端业务处理类
                        ch.pipeline().addLast(new ClientHandler());
                    }
                });
        //4. 服务器端绑定端口并启动服务; 使用channel级别的监听close端口 阻塞的方式
        ChannelFuture cf = b.connect("127.0.0.1", 8765).syncUninterruptibly();

        //5. 发送一条数据到服务器端
        cf.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty!".getBytes()));

        //6. 休眠一秒钟后再发送一条数据到服务端
        Thread.sleep(1000);
        cf.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty again!".getBytes()));

        //7. 同步阻塞关闭监听并释放资源
        cf.channel().closeFuture().sync();
        workGroup.shutdownGracefully();
    }
}

ClientHandler

package com.xieqingxin.client;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
 * @Author: Mr.xie
 * @ModifiedBy:
 * @CreationDate: 2022/2/20 17:57
 * @ModificationDate: 2022/2/20 17:57
 * @Description: ClientHandler
 */
public class ClientHandler extends ChannelInboundHandlerAdapter {

    /**
     *  channelActive
     *  客户端通道激活
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("client channel active..");
    }

    /**
     *  channelRead
     *  真正的数据最终会走到这个方法进行处理
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);

            String body = new String(req, "utf-8");
            System.out.println("Client :" + body );
            String response = "收到服务器端的返回信息:" + body;
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     *  exceptionCaught
     *  异常捕获方法
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

4. 完成服务端与客户端的搭建

先启动服务端,再启动客户端,客户端自动与服务端建立连接,会得到如下结果:

客户端控制台:
client channel active..
Client :返回响应数据,hello netty!
Client :返回响应数据,hello netty again!
服务端控制台:
server channel active..
Server: hello netty!
Server: hello netty again!
4

评论区