NIO与Netty框架应用

2024/1/1

# NIO与Netty框架应用

# 1. Java NIO深入解析

# 1.1 NIO核心组件

# Channel(通道)

  • FileChannel:文件操作通道
  • SocketChannel:TCP Socket通道
  • ServerSocketChannel:TCP服务器通道
  • DatagramChannel:UDP通道

# Buffer(缓冲区)

  • ByteBuffer:字节缓冲区
  • CharBuffer:字符缓冲区
  • IntBuffer:整数缓冲区
  • 其他类型Buffer

# Selector(选择器)

  • 多路复用器:监控多个通道的I/O事件
  • 事件驱动:基于事件的非阻塞I/O

# 1.2 NIO服务器实现

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class NIOServer {
    private static final int PORT = 8080;
    private static final int BUFFER_SIZE = 1024;
    
    private ServerSocketChannel serverChannel;
    private Selector selector;
    private volatile boolean running = false;
    
    // 客户端连接管理
    private final ConcurrentHashMap<SocketChannel, ClientSession> clients = new ConcurrentHashMap<>();
    private final AtomicLong connectionCounter = new AtomicLong(0);
    
    public void start() throws IOException {
        // 创建ServerSocketChannel
        serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(PORT));
        
        // 创建Selector
        selector = Selector.open();
        
        // 注册ServerSocketChannel到Selector
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        
        running = true;
        System.out.println("NIO服务器启动,监听端口: " + PORT);
        
        // 事件循环
        while (running) {
            try {
                // 阻塞等待事件
                int readyChannels = selector.select(1000);
                
                if (readyChannels == 0) {
                    continue;
                }
                
                // 处理就绪的事件
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
                
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    keyIterator.remove();
                    
                    if (!key.isValid()) {
                        continue;
                    }
                    
                    if (key.isAcceptable()) {
                        handleAccept(key);
                    } else if (key.isReadable()) {
                        handleRead(key);
                    } else if (key.isWritable()) {
                        handleWrite(key);
                    }
                }
            } catch (IOException e) {
                System.err.println("NIO服务器异常: " + e.getMessage());
            }
        }
    }
    
    private void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel clientChannel = serverChannel.accept();
        
        if (clientChannel != null) {
            clientChannel.configureBlocking(false);
            
            // 创建客户端会话
            ClientSession session = new ClientSession(
                connectionCounter.incrementAndGet(),
                System.currentTimeMillis()
            );
            clients.put(clientChannel, session);
            
            // 注册读事件
            clientChannel.register(selector, SelectionKey.OP_READ);
            
            System.out.println("新客户端连接: " + clientChannel.getRemoteAddress() + 
                             ", 连接ID: " + session.getId());
            
            // 发送欢迎消息
            String welcomeMsg = "欢迎连接到NIO服务器!连接ID: " + session.getId() + "\n";
            sendMessage(clientChannel, welcomeMsg);
        }
    }
    
    private void handleRead(SelectionKey key) throws IOException {
        SocketChannel clientChannel = (SocketChannel) key.channel();
        ClientSession session = clients.get(clientChannel);
        
        if (session == null) {
            return;
        }
        
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        
        try {
            int bytesRead = clientChannel.read(buffer);
            
            if (bytesRead > 0) {
                buffer.flip();
                byte[] data = new byte[buffer.remaining()];
                buffer.get(data);
                
                String message = new String(data).trim();
                System.out.println("收到客户端[" + session.getId() + "]消息: " + message);
                
                // 更新会话信息
                session.updateLastActivity();
                session.incrementMessageCount();
                
                // 处理消息
                handleClientMessage(clientChannel, session, message);
                
            } else if (bytesRead == -1) {
                // 客户端断开连接
                handleDisconnect(clientChannel, session);
            }
        } catch (IOException e) {
            System.err.println("读取客户端数据异常: " + e.getMessage());
            handleDisconnect(clientChannel, session);
        }
    }
    
    private void handleWrite(SelectionKey key) throws IOException {
        SocketChannel clientChannel = (SocketChannel) key.channel();
        ClientSession session = clients.get(clientChannel);
        
        if (session != null && session.hasDataToWrite()) {
            ByteBuffer buffer = session.getWriteBuffer();
            
            try {
                int bytesWritten = clientChannel.write(buffer);
                
                if (!buffer.hasRemaining()) {
                    // 数据写完,取消写事件
                    key.interestOps(SelectionKey.OP_READ);
                    session.clearWriteBuffer();
                }
            } catch (IOException e) {
                System.err.println("写入客户端数据异常: " + e.getMessage());
                handleDisconnect(clientChannel, session);
            }
        }
    }
    
    private void handleClientMessage(SocketChannel clientChannel, ClientSession session, String message) {
        try {
            if (message.startsWith("echo ")) {
                // Echo命令
                String echoMsg = "Echo: " + message.substring(5) + "\n";
                sendMessage(clientChannel, echoMsg);
                
            } else if ("time".equals(message)) {
                // 时间命令
                String timeMsg = "当前时间: " + new java.util.Date() + "\n";
                sendMessage(clientChannel, timeMsg);
                
            } else if ("stats".equals(message)) {
                // 统计信息
                String statsMsg = String.format(
                    "连接统计:\n" +
                    "- 连接ID: %d\n" +
                    "- 连接时间: %s\n" +
                    "- 消息数量: %d\n" +
                    "- 在线客户端: %d\n",
                    session.getId(),
                    new java.util.Date(session.getConnectTime()),
                    session.getMessageCount(),
                    clients.size()
                );
                sendMessage(clientChannel, statsMsg);
                
            } else if ("broadcast".equals(message)) {
                // 广播消息
                String broadcastMsg = "[广播] 客户端[" + session.getId() + "]发送了广播消息\n";
                broadcastMessage(broadcastMsg, clientChannel);
                
            } else if ("quit".equals(message)) {
                // 退出命令
                sendMessage(clientChannel, "再见!\n");
                handleDisconnect(clientChannel, session);
                
            } else {
                // 未知命令
                String helpMsg = "可用命令:\n" +
                               "- echo <message>: 回显消息\n" +
                               "- time: 获取当前时间\n" +
                               "- stats: 查看连接统计\n" +
                               "- broadcast: 发送广播消息\n" +
                               "- quit: 断开连接\n";
                sendMessage(clientChannel, helpMsg);
            }
        } catch (Exception e) {
            System.err.println("处理客户端消息异常: " + e.getMessage());
        }
    }
    
    private void sendMessage(SocketChannel clientChannel, String message) {
        try {
            ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
            
            while (buffer.hasRemaining()) {
                int bytesWritten = clientChannel.write(buffer);
                if (bytesWritten == 0) {
                    // 写缓冲区满,注册写事件
                    ClientSession session = clients.get(clientChannel);
                    if (session != null) {
                        session.setWriteBuffer(buffer);
                        SelectionKey key = clientChannel.keyFor(selector);
                        if (key != null) {
                            key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                        }
                    }
                    break;
                }
            }
        } catch (IOException e) {
            System.err.println("发送消息异常: " + e.getMessage());
        }
    }
    
    private void broadcastMessage(String message, SocketChannel excludeChannel) {
        for (SocketChannel channel : clients.keySet()) {
            if (channel != excludeChannel && channel.isConnected()) {
                sendMessage(channel, message);
            }
        }
    }
    
    private void handleDisconnect(SocketChannel clientChannel, ClientSession session) {
        try {
            System.out.println("客户端[" + session.getId() + "]断开连接");
            
            clients.remove(clientChannel);
            clientChannel.close();
            
        } catch (IOException e) {
            System.err.println("关闭客户端连接异常: " + e.getMessage());
        }
    }
    
    public void stop() throws IOException {
        running = false;
        
        if (selector != null) {
            selector.close();
        }
        
        if (serverChannel != null) {
            serverChannel.close();
        }
        
        // 关闭所有客户端连接
        for (SocketChannel channel : clients.keySet()) {
            try {
                channel.close();
            } catch (IOException e) {
                System.err.println("关闭客户端连接异常: " + e.getMessage());
            }
        }
        clients.clear();
        
        System.out.println("NIO服务器已停止");
    }
    
    // 客户端会话类
    private static class ClientSession {
        private final long id;
        private final long connectTime;
        private long lastActivity;
        private int messageCount;
        private ByteBuffer writeBuffer;
        
        public ClientSession(long id, long connectTime) {
            this.id = id;
            this.connectTime = connectTime;
            this.lastActivity = connectTime;
            this.messageCount = 0;
        }
        
        public long getId() { return id; }
        public long getConnectTime() { return connectTime; }
        public long getLastActivity() { return lastActivity; }
        public int getMessageCount() { return messageCount; }
        
        public void updateLastActivity() {
            this.lastActivity = System.currentTimeMillis();
        }
        
        public void incrementMessageCount() {
            this.messageCount++;
        }
        
        public boolean hasDataToWrite() {
            return writeBuffer != null && writeBuffer.hasRemaining();
        }
        
        public ByteBuffer getWriteBuffer() {
            return writeBuffer;
        }
        
        public void setWriteBuffer(ByteBuffer buffer) {
            this.writeBuffer = buffer;
        }
        
        public void clearWriteBuffer() {
            this.writeBuffer = null;
        }
    }
    
    public static void main(String[] args) {
        NIOServer server = new NIOServer();
        
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                server.stop();
            } catch (IOException e) {
                System.err.println("停止服务器异常: " + e.getMessage());
            }
        }));
        
        try {
            server.start();
        } catch (IOException e) {
            System.err.println("启动NIO服务器失败: " + e.getMessage());
        }
    }
}

# 2. Netty框架入门

# 2.1 Netty核心概念

# EventLoop和EventLoopGroup

  • EventLoop:事件循环,处理I/O操作
  • EventLoopGroup:EventLoop的集合
  • Boss Group:处理连接请求
  • Worker Group:处理I/O操作

# Channel和ChannelPipeline

  • Channel:网络连接的抽象
  • ChannelPipeline:处理器链
  • ChannelHandler:业务逻辑处理器

# Bootstrap

  • ServerBootstrap:服务器启动器
  • Bootstrap:客户端启动器

# 2.2 Netty服务器实现

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class NettyServer {
    private static final int PORT = 8081;
    
    // 连接管理
    private static final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private static final ConcurrentHashMap<String, ChannelHandlerContext> userChannels = new ConcurrentHashMap<>();
    private static final AtomicLong connectionCounter = new AtomicLong(0);
    
    public void start() throws InterruptedException {
        // 创建EventLoopGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            
                            // 添加编解码器
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            
                            // 添加业务处理器
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });
            
            // 绑定端口并启动服务器
            ChannelFuture future = bootstrap.bind(PORT).sync();
            System.out.println("Netty服务器启动,监听端口: " + PORT);
            
            // 等待服务器关闭
            future.channel().closeFuture().sync();
            
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    
    // 服务器处理器
    public static class NettyServerHandler extends ChannelInboundHandlerAdapter {
        private String userId;
        
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            // 客户端连接
            userId = "user_" + connectionCounter.incrementAndGet();
            userChannels.put(userId, ctx);
            allChannels.add(ctx.channel());
            
            System.out.println("客户端连接: " + ctx.channel().remoteAddress() + ", 用户ID: " + userId);
            
            // 发送欢迎消息
            ctx.writeAndFlush("欢迎连接到Netty服务器!您的用户ID: " + userId + "\n");
            ctx.writeAndFlush("输入 'help' 查看可用命令\n");
        }
        
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            String message = ((String) msg).trim();
            System.out.println("收到客户端[" + userId + "]消息: " + message);
            
            // 处理命令
            handleCommand(ctx, message);
        }
        
        private void handleCommand(ChannelHandlerContext ctx, String command) {
            if (command.startsWith("echo ")) {
                // Echo命令
                String echoMsg = "Echo: " + command.substring(5) + "\n";
                ctx.writeAndFlush(echoMsg);
                
            } else if ("time".equals(command)) {
                // 时间命令
                String timeMsg = "当前时间: " + new java.util.Date() + "\n";
                ctx.writeAndFlush(timeMsg);
                
            } else if ("users".equals(command)) {
                // 在线用户列表
                StringBuilder userList = new StringBuilder("在线用户列表:\n");
                for (String user : userChannels.keySet()) {
                    userList.append("- ").append(user).append("\n");
                }
                ctx.writeAndFlush(userList.toString());
                
            } else if (command.startsWith("msg ")) {
                // 私聊消息: msg user_id message
                String[] parts = command.split(" ", 3);
                if (parts.length >= 3) {
                    String targetUser = parts[1];
                    String message = parts[2];
                    
                    ChannelHandlerContext targetCtx = userChannels.get(targetUser);
                    if (targetCtx != null) {
                        targetCtx.writeAndFlush("[私聊来自 " + userId + "]: " + message + "\n");
                        ctx.writeAndFlush("消息已发送给 " + targetUser + "\n");
                    } else {
                        ctx.writeAndFlush("用户 " + targetUser + " 不存在或已离线\n");
                    }
                } else {
                    ctx.writeAndFlush("私聊格式: msg <用户ID> <消息内容>\n");
                }
                
            } else if (command.startsWith("broadcast ")) {
                // 广播消息
                String message = command.substring(10);
                String broadcastMsg = "[广播来自 " + userId + "]: " + message + "\n";
                
                for (ChannelHandlerContext userCtx : userChannels.values()) {
                    if (userCtx != ctx) {
                        userCtx.writeAndFlush(broadcastMsg);
                    }
                }
                ctx.writeAndFlush("广播消息已发送\n");
                
            } else if ("stats".equals(command)) {
                // 服务器统计
                String statsMsg = String.format(
                    "服务器统计:\n" +
                    "- 在线用户数: %d\n" +
                    "- 总连接数: %d\n" +
                    "- 您的用户ID: %s\n",
                    userChannels.size(),
                    connectionCounter.get(),
                    userId
                );
                ctx.writeAndFlush(statsMsg);
                
            } else if ("quit".equals(command)) {
                // 退出
                ctx.writeAndFlush("再见!\n");
                ctx.close();
                
            } else if ("help".equals(command)) {
                // 帮助信息
                String helpMsg = "可用命令:\n" +
                               "- echo <message>: 回显消息\n" +
                               "- time: 获取当前时间\n" +
                               "- users: 查看在线用户\n" +
                               "- msg <用户ID> <消息>: 发送私聊\n" +
                               "- broadcast <消息>: 发送广播\n" +
                               "- stats: 查看服务器统计\n" +
                               "- quit: 断开连接\n" +
                               "- help: 显示此帮助\n";
                ctx.writeAndFlush(helpMsg);
                
            } else {
                ctx.writeAndFlush("未知命令: " + command + ",输入 'help' 查看可用命令\n");
            }
        }
        
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            // 客户端断开连接
            System.out.println("客户端断开连接: " + ctx.channel().remoteAddress() + ", 用户ID: " + userId);
            
            if (userId != null) {
                userChannels.remove(userId);
            }
            allChannels.remove(ctx.channel());
        }
        
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            System.err.println("处理客户端异常: " + cause.getMessage());
            ctx.close();
        }
    }
    
    public static void main(String[] args) {
        NettyServer server = new NettyServer();
        try {
            server.start();
        } catch (InterruptedException e) {
            System.err.println("Netty服务器启动失败: " + e.getMessage());
        }
    }
}

# 2.3 Netty客户端实现

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

public class NettyClient {
    private static final String HOST = "localhost";
    private static final int PORT = 8081;
    
    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            
                            // 添加编解码器
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            
                            // 添加客户端处理器
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            
            // 连接服务器
            ChannelFuture future = bootstrap.connect(HOST, PORT).sync();
            System.out.println("连接到Netty服务器: " + HOST + ":" + PORT);
            
            // 启动用户输入线程
            startUserInputThread(future.channel());
            
            // 等待连接关闭
            future.channel().closeFuture().sync();
            
        } finally {
            group.shutdownGracefully();
        }
    }
    
    private void startUserInputThread(Channel channel) {
        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            System.out.println("请输入命令(输入 'quit' 退出):");
            
            while (channel.isActive()) {
                try {
                    String input = scanner.nextLine();
                    if (input != null && !input.trim().isEmpty()) {
                        channel.writeAndFlush(input + "\n");
                        
                        if ("quit".equals(input.trim())) {
                            break;
                        }
                    }
                } catch (Exception e) {
                    System.err.println("输入异常: " + e.getMessage());
                    break;
                }
            }
            
            scanner.close();
        }, "UserInputThread").start();
    }
    
    // 客户端处理器
    public static class NettyClientHandler extends ChannelInboundHandlerAdapter {
        
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("已连接到服务器: " + ctx.channel().remoteAddress());
        }
        
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            String message = (String) msg;
            System.out.print(message);
        }
        
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            System.out.println("与服务器断开连接");
        }
        
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            System.err.println("客户端异常: " + cause.getMessage());
            ctx.close();
        }
    }
    
    public static void main(String[] args) {
        NettyClient client = new NettyClient();
        try {
            client.start();
        } catch (InterruptedException e) {
            System.err.println("Netty客户端启动失败: " + e.getMessage());
        }
    }
}

# 3. Netty高级特性

# 3.1 自定义编解码器

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;

import java.util.List;

// 自定义消息协议
class CustomMessage {
    private int type;        // 消息类型
    private int length;      // 消息长度
    private String content;  // 消息内容
    
    public CustomMessage(int type, String content) {
        this.type = type;
        this.content = content != null ? content : "";
        this.length = this.content.getBytes().length;
    }
    
    // Getters and Setters
    public int getType() { return type; }
    public void setType(int type) { this.type = type; }
    
    public int getLength() { return length; }
    public void setLength(int length) { this.length = length; }
    
    public String getContent() { return content; }
    public void setContent(String content) {
        this.content = content != null ? content : "";
        this.length = this.content.getBytes().length;
    }
    
    @Override
    public String toString() {
        return "CustomMessage{type=" + type + ", length=" + length + ", content='" + content + "'}";
    }
}

// 自定义编码器
class CustomMessageEncoder extends MessageToByteEncoder<CustomMessage> {
    
    @Override
    protected void encode(ChannelHandlerContext ctx, CustomMessage msg, ByteBuf out) {
        // 协议格式: [4字节类型][4字节长度][内容]
        out.writeInt(msg.getType());
        out.writeInt(msg.getLength());
        if (msg.getLength() > 0) {
            out.writeBytes(msg.getContent().getBytes());
        }
    }
}

// 自定义解码器
class CustomMessageDecoder extends ByteToMessageDecoder {
    private static final int HEADER_SIZE = 8; // 4字节类型 + 4字节长度
    
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // 检查是否有足够的字节读取头部
        if (in.readableBytes() < HEADER_SIZE) {
            return;
        }
        
        // 标记读取位置
        in.markReaderIndex();
        
        // 读取消息类型和长度
        int type = in.readInt();
        int length = in.readInt();
        
        // 检查长度是否合理
        if (length < 0 || length > 1024 * 1024) { // 限制最大1MB
            throw new IllegalArgumentException("消息长度不合理: " + length);
        }
        
        // 检查是否有足够的字节读取内容
        if (in.readableBytes() < length) {
            // 重置读取位置,等待更多数据
            in.resetReaderIndex();
            return;
        }
        
        // 读取消息内容
        String content = "";
        if (length > 0) {
            byte[] contentBytes = new byte[length];
            in.readBytes(contentBytes);
            content = new String(contentBytes);
        }
        
        // 创建消息对象并添加到输出列表
        CustomMessage message = new CustomMessage(type, content);
        out.add(message);
    }
}

// 使用自定义编解码器的服务器
public class CustomProtocolServer {
    private static final int PORT = 8082;
    
    public void start() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            
                            // 添加自定义编解码器
                            pipeline.addLast(new CustomMessageDecoder());
                            pipeline.addLast(new CustomMessageEncoder());
                            
                            // 添加业务处理器
                            pipeline.addLast(new CustomProtocolHandler());
                        }
                    });
            
            ChannelFuture future = bootstrap.bind(PORT).sync();
            System.out.println("自定义协议服务器启动,监听端口: " + PORT);
            
            future.channel().closeFuture().sync();
            
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    
    // 自定义协议处理器
    public static class CustomProtocolHandler extends ChannelInboundHandlerAdapter {
        
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("客户端连接: " + ctx.channel().remoteAddress());
            
            // 发送欢迎消息
            CustomMessage welcomeMsg = new CustomMessage(1, "欢迎使用自定义协议服务器!");
            ctx.writeAndFlush(welcomeMsg);
        }
        
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            if (msg instanceof CustomMessage) {
                CustomMessage message = (CustomMessage) msg;
                System.out.println("收到消息: " + message);
                
                // 根据消息类型处理
                switch (message.getType()) {
                    case 1: // 文本消息
                        handleTextMessage(ctx, message);
                        break;
                    case 2: // 命令消息
                        handleCommandMessage(ctx, message);
                        break;
                    case 3: // 心跳消息
                        handleHeartbeatMessage(ctx, message);
                        break;
                    default:
                        CustomMessage errorMsg = new CustomMessage(999, "未知消息类型: " + message.getType());
                        ctx.writeAndFlush(errorMsg);
                }
            }
        }
        
        private void handleTextMessage(ChannelHandlerContext ctx, CustomMessage message) {
            // 回显文本消息
            CustomMessage response = new CustomMessage(1, "Echo: " + message.getContent());
            ctx.writeAndFlush(response);
        }
        
        private void handleCommandMessage(ChannelHandlerContext ctx, CustomMessage message) {
            String command = message.getContent();
            String result;
            
            switch (command) {
                case "time":
                    result = "当前时间: " + new java.util.Date();
                    break;
                case "version":
                    result = "服务器版本: 1.0.0";
                    break;
                default:
                    result = "未知命令: " + command;
            }
            
            CustomMessage response = new CustomMessage(2, result);
            ctx.writeAndFlush(response);
        }
        
        private void handleHeartbeatMessage(ChannelHandlerContext ctx, CustomMessage message) {
            // 响应心跳
            CustomMessage response = new CustomMessage(3, "pong");
            ctx.writeAndFlush(response);
        }
        
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            System.err.println("处理异常: " + cause.getMessage());
            ctx.close();
        }
    }
    
    public static void main(String[] args) {
        CustomProtocolServer server = new CustomProtocolServer();
        try {
            server.start();
        } catch (InterruptedException e) {
            System.err.println("服务器启动失败: " + e.getMessage());
        }
    }
}

# 3.2 Netty性能优化

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;

import java.util.concurrent.Executors;

public class OptimizedNettyServer {
    private static final int PORT = 8083;
    
    public void start() throws InterruptedException {
        // 根据操作系统选择最优的EventLoopGroup
        EventLoopGroup bossGroup;
        EventLoopGroup workerGroup;
        Class<? extends ServerChannel> channelClass;
        
        if (isLinux() && isEpollAvailable()) {
            // Linux系统使用Epoll
            bossGroup = new EpollEventLoopGroup(1, new DefaultThreadFactory("boss"));
            workerGroup = new EpollEventLoopGroup(0, new DefaultThreadFactory("worker"));
            channelClass = EpollServerSocketChannel.class;
            System.out.println("使用Epoll传输");
        } else {
            // 其他系统使用NIO
            bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("boss"));
            workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
            channelClass = NioServerSocketChannel.class;
            System.out.println("使用NIO传输");
        }
        
        // 流量整形处理器(可选)
        GlobalTrafficShapingHandler trafficHandler = new GlobalTrafficShapingHandler(
            Executors.newScheduledThreadPool(1),
            1024 * 1024, // 写入限制: 1MB/s
            1024 * 1024, // 读取限制: 1MB/s
            1000 // 检查间隔: 1秒
        );
        
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(channelClass)
                    
                    // 服务器Socket选项
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    
                    // 客户端Socket选项
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_SNDBUF, 32 * 1024)
                    .childOption(ChannelOption.SO_RCVBUF, 32 * 1024)
                    
                    // 使用池化的ByteBuf分配器
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    
                    // 设置写缓冲区水位
                    .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, 
                        new WriteBufferWaterMark(8 * 1024, 32 * 1024))
                    
                    .childHandler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            
                            // 添加流量整形(可选)
                            // pipeline.addLast(trafficHandler);
                            
                            // 添加业务处理器
                            pipeline.addLast(new OptimizedServerHandler());
                        }
                    });
            
            ChannelFuture future = bootstrap.bind(PORT).sync();
            System.out.println("优化的Netty服务器启动,监听端口: " + PORT);
            
            // 添加关闭钩子
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                System.out.println("正在关闭服务器...");
                future.channel().close();
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
                trafficHandler.release();
            }));
            
            future.channel().closeFuture().sync();
            
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            trafficHandler.release();
        }
    }
    
    private boolean isLinux() {
        return System.getProperty("os.name").toLowerCase().contains("linux");
    }
    
    private boolean isEpollAvailable() {
        try {
            Class.forName("io.netty.channel.epoll.Epoll");
            return io.netty.channel.epoll.Epoll.isAvailable();
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
    
    // 优化的服务器处理器
    public static class OptimizedServerHandler extends ChannelInboundHandlerAdapter {
        private static final int MAX_MESSAGE_SIZE = 1024 * 1024; // 1MB
        
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // 处理消息(这里简化处理)
            if (msg instanceof ByteBuf) {
                ByteBuf buf = (ByteBuf) msg;
                
                try {
                    // 检查消息大小
                    if (buf.readableBytes() > MAX_MESSAGE_SIZE) {
                        System.err.println("消息过大,丢弃");
                        return;
                    }
                    
                    // 处理消息内容
                    byte[] data = new byte[buf.readableBytes()];
                    buf.readBytes(data);
                    
                    // 简单回显
                    ctx.writeAndFlush(ctx.alloc().buffer().writeBytes(data));
                    
                } finally {
                    // 释放ByteBuf
                    buf.release();
                }
            }
        }
        
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            System.err.println("处理异常: " + cause.getMessage());
            ctx.close();
        }
    }
    
    public static void main(String[] args) {
        OptimizedNettyServer server = new OptimizedNettyServer();
        try {
            server.start();
        } catch (InterruptedException e) {
            System.err.println("服务器启动失败: " + e.getMessage());
        }
    }
}

# 4. 总结

# 4.1 NIO vs Netty对比

特性 Java NIO Netty
学习曲线 陡峭 相对平缓
开发效率
代码复杂度
性能 高(需要优化) 高(已优化)
功能丰富度 基础 丰富
社区支持 JDK内置 活跃社区

# 4.2 选择建议

  1. 使用Java NIO的场景

    • 学习网络编程原理
    • 对框架依赖有严格限制
    • 需要完全控制底层实现
  2. 使用Netty的场景

    • 快速开发网络应用
    • 需要高性能和稳定性
    • 复杂的协议处理
    • 企业级应用开发

# 4.3 最佳实践

  1. 性能优化

    • 合理设置线程池大小
    • 使用池化的ByteBuf
    • 选择合适的传输实现
    • 设置合理的缓冲区大小
  2. 资源管理

    • 及时释放ByteBuf
    • 正确关闭Channel和EventLoopGroup
    • 避免内存泄漏
  3. 异常处理

    • 实现完善的异常处理机制
    • 记录详细的错误日志
    • 优雅地处理连接断开
  4. 监控和调试

    • 添加适当的日志记录
    • 监控连接数和性能指标
    • 使用Netty的内置调试工具

NIO和Netty都是构建高性能网络应用的重要技术,理解它们的原理和最佳实践对于开发可靠的网络服务至关重要。