NIO与Netty框架应用
哪吒 2024/1/1
# NIO与Netty框架应用
# 1. Java NIO深入解析
# 1.1 NIO核心组件
# Channel(通道)
- FileChannel:文件操作通道
- SocketChannel:TCP Socket通道
- ServerSocketChannel:TCP服务器通道
- DatagramChannel:UDP通道
# Buffer(缓冲区)
- ByteBuffer:字节缓冲区
- CharBuffer:字符缓冲区
- IntBuffer:整数缓冲区
- 其他类型Buffer
# Selector(选择器)
- 多路复用器:监控多个通道的I/O事件
- 事件驱动:基于事件的非阻塞I/O
# 1.2 NIO服务器实现
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
public class NIOServer {
private static final int PORT = 8080;
private static final int BUFFER_SIZE = 1024;
private ServerSocketChannel serverChannel;
private Selector selector;
private volatile boolean running = false;
// 客户端连接管理
private final ConcurrentHashMap<SocketChannel, ClientSession> clients = new ConcurrentHashMap<>();
private final AtomicLong connectionCounter = new AtomicLong(0);
public void start() throws IOException {
// 创建ServerSocketChannel
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.bind(new InetSocketAddress(PORT));
// 创建Selector
selector = Selector.open();
// 注册ServerSocketChannel到Selector
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
running = true;
System.out.println("NIO服务器启动,监听端口: " + PORT);
// 事件循环
while (running) {
try {
// 阻塞等待事件
int readyChannels = selector.select(1000);
if (readyChannels == 0) {
continue;
}
// 处理就绪的事件
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
handleAccept(key);
} else if (key.isReadable()) {
handleRead(key);
} else if (key.isWritable()) {
handleWrite(key);
}
}
} catch (IOException e) {
System.err.println("NIO服务器异常: " + e.getMessage());
}
}
}
private void handleAccept(SelectionKey key) throws IOException {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel clientChannel = serverChannel.accept();
if (clientChannel != null) {
clientChannel.configureBlocking(false);
// 创建客户端会话
ClientSession session = new ClientSession(
connectionCounter.incrementAndGet(),
System.currentTimeMillis()
);
clients.put(clientChannel, session);
// 注册读事件
clientChannel.register(selector, SelectionKey.OP_READ);
System.out.println("新客户端连接: " + clientChannel.getRemoteAddress() +
", 连接ID: " + session.getId());
// 发送欢迎消息
String welcomeMsg = "欢迎连接到NIO服务器!连接ID: " + session.getId() + "\n";
sendMessage(clientChannel, welcomeMsg);
}
}
private void handleRead(SelectionKey key) throws IOException {
SocketChannel clientChannel = (SocketChannel) key.channel();
ClientSession session = clients.get(clientChannel);
if (session == null) {
return;
}
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
try {
int bytesRead = clientChannel.read(buffer);
if (bytesRead > 0) {
buffer.flip();
byte[] data = new byte[buffer.remaining()];
buffer.get(data);
String message = new String(data).trim();
System.out.println("收到客户端[" + session.getId() + "]消息: " + message);
// 更新会话信息
session.updateLastActivity();
session.incrementMessageCount();
// 处理消息
handleClientMessage(clientChannel, session, message);
} else if (bytesRead == -1) {
// 客户端断开连接
handleDisconnect(clientChannel, session);
}
} catch (IOException e) {
System.err.println("读取客户端数据异常: " + e.getMessage());
handleDisconnect(clientChannel, session);
}
}
private void handleWrite(SelectionKey key) throws IOException {
SocketChannel clientChannel = (SocketChannel) key.channel();
ClientSession session = clients.get(clientChannel);
if (session != null && session.hasDataToWrite()) {
ByteBuffer buffer = session.getWriteBuffer();
try {
int bytesWritten = clientChannel.write(buffer);
if (!buffer.hasRemaining()) {
// 数据写完,取消写事件
key.interestOps(SelectionKey.OP_READ);
session.clearWriteBuffer();
}
} catch (IOException e) {
System.err.println("写入客户端数据异常: " + e.getMessage());
handleDisconnect(clientChannel, session);
}
}
}
private void handleClientMessage(SocketChannel clientChannel, ClientSession session, String message) {
try {
if (message.startsWith("echo ")) {
// Echo命令
String echoMsg = "Echo: " + message.substring(5) + "\n";
sendMessage(clientChannel, echoMsg);
} else if ("time".equals(message)) {
// 时间命令
String timeMsg = "当前时间: " + new java.util.Date() + "\n";
sendMessage(clientChannel, timeMsg);
} else if ("stats".equals(message)) {
// 统计信息
String statsMsg = String.format(
"连接统计:\n" +
"- 连接ID: %d\n" +
"- 连接时间: %s\n" +
"- 消息数量: %d\n" +
"- 在线客户端: %d\n",
session.getId(),
new java.util.Date(session.getConnectTime()),
session.getMessageCount(),
clients.size()
);
sendMessage(clientChannel, statsMsg);
} else if ("broadcast".equals(message)) {
// 广播消息
String broadcastMsg = "[广播] 客户端[" + session.getId() + "]发送了广播消息\n";
broadcastMessage(broadcastMsg, clientChannel);
} else if ("quit".equals(message)) {
// 退出命令
sendMessage(clientChannel, "再见!\n");
handleDisconnect(clientChannel, session);
} else {
// 未知命令
String helpMsg = "可用命令:\n" +
"- echo <message>: 回显消息\n" +
"- time: 获取当前时间\n" +
"- stats: 查看连接统计\n" +
"- broadcast: 发送广播消息\n" +
"- quit: 断开连接\n";
sendMessage(clientChannel, helpMsg);
}
} catch (Exception e) {
System.err.println("处理客户端消息异常: " + e.getMessage());
}
}
private void sendMessage(SocketChannel clientChannel, String message) {
try {
ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
while (buffer.hasRemaining()) {
int bytesWritten = clientChannel.write(buffer);
if (bytesWritten == 0) {
// 写缓冲区满,注册写事件
ClientSession session = clients.get(clientChannel);
if (session != null) {
session.setWriteBuffer(buffer);
SelectionKey key = clientChannel.keyFor(selector);
if (key != null) {
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
}
break;
}
}
} catch (IOException e) {
System.err.println("发送消息异常: " + e.getMessage());
}
}
private void broadcastMessage(String message, SocketChannel excludeChannel) {
for (SocketChannel channel : clients.keySet()) {
if (channel != excludeChannel && channel.isConnected()) {
sendMessage(channel, message);
}
}
}
private void handleDisconnect(SocketChannel clientChannel, ClientSession session) {
try {
System.out.println("客户端[" + session.getId() + "]断开连接");
clients.remove(clientChannel);
clientChannel.close();
} catch (IOException e) {
System.err.println("关闭客户端连接异常: " + e.getMessage());
}
}
public void stop() throws IOException {
running = false;
if (selector != null) {
selector.close();
}
if (serverChannel != null) {
serverChannel.close();
}
// 关闭所有客户端连接
for (SocketChannel channel : clients.keySet()) {
try {
channel.close();
} catch (IOException e) {
System.err.println("关闭客户端连接异常: " + e.getMessage());
}
}
clients.clear();
System.out.println("NIO服务器已停止");
}
// 客户端会话类
private static class ClientSession {
private final long id;
private final long connectTime;
private long lastActivity;
private int messageCount;
private ByteBuffer writeBuffer;
public ClientSession(long id, long connectTime) {
this.id = id;
this.connectTime = connectTime;
this.lastActivity = connectTime;
this.messageCount = 0;
}
public long getId() { return id; }
public long getConnectTime() { return connectTime; }
public long getLastActivity() { return lastActivity; }
public int getMessageCount() { return messageCount; }
public void updateLastActivity() {
this.lastActivity = System.currentTimeMillis();
}
public void incrementMessageCount() {
this.messageCount++;
}
public boolean hasDataToWrite() {
return writeBuffer != null && writeBuffer.hasRemaining();
}
public ByteBuffer getWriteBuffer() {
return writeBuffer;
}
public void setWriteBuffer(ByteBuffer buffer) {
this.writeBuffer = buffer;
}
public void clearWriteBuffer() {
this.writeBuffer = null;
}
}
public static void main(String[] args) {
NIOServer server = new NIOServer();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
server.stop();
} catch (IOException e) {
System.err.println("停止服务器异常: " + e.getMessage());
}
}));
try {
server.start();
} catch (IOException e) {
System.err.println("启动NIO服务器失败: " + e.getMessage());
}
}
}
# 2. Netty框架入门
# 2.1 Netty核心概念
# EventLoop和EventLoopGroup
- EventLoop:事件循环,处理I/O操作
- EventLoopGroup:EventLoop的集合
- Boss Group:处理连接请求
- Worker Group:处理I/O操作
# Channel和ChannelPipeline
- Channel:网络连接的抽象
- ChannelPipeline:处理器链
- ChannelHandler:业务逻辑处理器
# Bootstrap
- ServerBootstrap:服务器启动器
- Bootstrap:客户端启动器
# 2.2 Netty服务器实现
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
public class NettyServer {
private static final int PORT = 8081;
// 连接管理
private static final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private static final ConcurrentHashMap<String, ChannelHandlerContext> userChannels = new ConcurrentHashMap<>();
private static final AtomicLong connectionCounter = new AtomicLong(0);
public void start() throws InterruptedException {
// 创建EventLoopGroup
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 添加编解码器
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// 添加业务处理器
pipeline.addLast(new NettyServerHandler());
}
});
// 绑定端口并启动服务器
ChannelFuture future = bootstrap.bind(PORT).sync();
System.out.println("Netty服务器启动,监听端口: " + PORT);
// 等待服务器关闭
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
// 服务器处理器
public static class NettyServerHandler extends ChannelInboundHandlerAdapter {
private String userId;
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 客户端连接
userId = "user_" + connectionCounter.incrementAndGet();
userChannels.put(userId, ctx);
allChannels.add(ctx.channel());
System.out.println("客户端连接: " + ctx.channel().remoteAddress() + ", 用户ID: " + userId);
// 发送欢迎消息
ctx.writeAndFlush("欢迎连接到Netty服务器!您的用户ID: " + userId + "\n");
ctx.writeAndFlush("输入 'help' 查看可用命令\n");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String message = ((String) msg).trim();
System.out.println("收到客户端[" + userId + "]消息: " + message);
// 处理命令
handleCommand(ctx, message);
}
private void handleCommand(ChannelHandlerContext ctx, String command) {
if (command.startsWith("echo ")) {
// Echo命令
String echoMsg = "Echo: " + command.substring(5) + "\n";
ctx.writeAndFlush(echoMsg);
} else if ("time".equals(command)) {
// 时间命令
String timeMsg = "当前时间: " + new java.util.Date() + "\n";
ctx.writeAndFlush(timeMsg);
} else if ("users".equals(command)) {
// 在线用户列表
StringBuilder userList = new StringBuilder("在线用户列表:\n");
for (String user : userChannels.keySet()) {
userList.append("- ").append(user).append("\n");
}
ctx.writeAndFlush(userList.toString());
} else if (command.startsWith("msg ")) {
// 私聊消息: msg user_id message
String[] parts = command.split(" ", 3);
if (parts.length >= 3) {
String targetUser = parts[1];
String message = parts[2];
ChannelHandlerContext targetCtx = userChannels.get(targetUser);
if (targetCtx != null) {
targetCtx.writeAndFlush("[私聊来自 " + userId + "]: " + message + "\n");
ctx.writeAndFlush("消息已发送给 " + targetUser + "\n");
} else {
ctx.writeAndFlush("用户 " + targetUser + " 不存在或已离线\n");
}
} else {
ctx.writeAndFlush("私聊格式: msg <用户ID> <消息内容>\n");
}
} else if (command.startsWith("broadcast ")) {
// 广播消息
String message = command.substring(10);
String broadcastMsg = "[广播来自 " + userId + "]: " + message + "\n";
for (ChannelHandlerContext userCtx : userChannels.values()) {
if (userCtx != ctx) {
userCtx.writeAndFlush(broadcastMsg);
}
}
ctx.writeAndFlush("广播消息已发送\n");
} else if ("stats".equals(command)) {
// 服务器统计
String statsMsg = String.format(
"服务器统计:\n" +
"- 在线用户数: %d\n" +
"- 总连接数: %d\n" +
"- 您的用户ID: %s\n",
userChannels.size(),
connectionCounter.get(),
userId
);
ctx.writeAndFlush(statsMsg);
} else if ("quit".equals(command)) {
// 退出
ctx.writeAndFlush("再见!\n");
ctx.close();
} else if ("help".equals(command)) {
// 帮助信息
String helpMsg = "可用命令:\n" +
"- echo <message>: 回显消息\n" +
"- time: 获取当前时间\n" +
"- users: 查看在线用户\n" +
"- msg <用户ID> <消息>: 发送私聊\n" +
"- broadcast <消息>: 发送广播\n" +
"- stats: 查看服务器统计\n" +
"- quit: 断开连接\n" +
"- help: 显示此帮助\n";
ctx.writeAndFlush(helpMsg);
} else {
ctx.writeAndFlush("未知命令: " + command + ",输入 'help' 查看可用命令\n");
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
// 客户端断开连接
System.out.println("客户端断开连接: " + ctx.channel().remoteAddress() + ", 用户ID: " + userId);
if (userId != null) {
userChannels.remove(userId);
}
allChannels.remove(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.err.println("处理客户端异常: " + cause.getMessage());
ctx.close();
}
}
public static void main(String[] args) {
NettyServer server = new NettyServer();
try {
server.start();
} catch (InterruptedException e) {
System.err.println("Netty服务器启动失败: " + e.getMessage());
}
}
}
# 2.3 Netty客户端实现
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
public class NettyClient {
private static final String HOST = "localhost";
private static final int PORT = 8081;
public void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 添加编解码器
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// 添加客户端处理器
pipeline.addLast(new NettyClientHandler());
}
});
// 连接服务器
ChannelFuture future = bootstrap.connect(HOST, PORT).sync();
System.out.println("连接到Netty服务器: " + HOST + ":" + PORT);
// 启动用户输入线程
startUserInputThread(future.channel());
// 等待连接关闭
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
private void startUserInputThread(Channel channel) {
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
System.out.println("请输入命令(输入 'quit' 退出):");
while (channel.isActive()) {
try {
String input = scanner.nextLine();
if (input != null && !input.trim().isEmpty()) {
channel.writeAndFlush(input + "\n");
if ("quit".equals(input.trim())) {
break;
}
}
} catch (Exception e) {
System.err.println("输入异常: " + e.getMessage());
break;
}
}
scanner.close();
}, "UserInputThread").start();
}
// 客户端处理器
public static class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("已连接到服务器: " + ctx.channel().remoteAddress());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String message = (String) msg;
System.out.print(message);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("与服务器断开连接");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.err.println("客户端异常: " + cause.getMessage());
ctx.close();
}
}
public static void main(String[] args) {
NettyClient client = new NettyClient();
try {
client.start();
} catch (InterruptedException e) {
System.err.println("Netty客户端启动失败: " + e.getMessage());
}
}
}
# 3. Netty高级特性
# 3.1 自定义编解码器
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import java.util.List;
// 自定义消息协议
class CustomMessage {
private int type; // 消息类型
private int length; // 消息长度
private String content; // 消息内容
public CustomMessage(int type, String content) {
this.type = type;
this.content = content != null ? content : "";
this.length = this.content.getBytes().length;
}
// Getters and Setters
public int getType() { return type; }
public void setType(int type) { this.type = type; }
public int getLength() { return length; }
public void setLength(int length) { this.length = length; }
public String getContent() { return content; }
public void setContent(String content) {
this.content = content != null ? content : "";
this.length = this.content.getBytes().length;
}
@Override
public String toString() {
return "CustomMessage{type=" + type + ", length=" + length + ", content='" + content + "'}";
}
}
// 自定义编码器
class CustomMessageEncoder extends MessageToByteEncoder<CustomMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, CustomMessage msg, ByteBuf out) {
// 协议格式: [4字节类型][4字节长度][内容]
out.writeInt(msg.getType());
out.writeInt(msg.getLength());
if (msg.getLength() > 0) {
out.writeBytes(msg.getContent().getBytes());
}
}
}
// 自定义解码器
class CustomMessageDecoder extends ByteToMessageDecoder {
private static final int HEADER_SIZE = 8; // 4字节类型 + 4字节长度
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 检查是否有足够的字节读取头部
if (in.readableBytes() < HEADER_SIZE) {
return;
}
// 标记读取位置
in.markReaderIndex();
// 读取消息类型和长度
int type = in.readInt();
int length = in.readInt();
// 检查长度是否合理
if (length < 0 || length > 1024 * 1024) { // 限制最大1MB
throw new IllegalArgumentException("消息长度不合理: " + length);
}
// 检查是否有足够的字节读取内容
if (in.readableBytes() < length) {
// 重置读取位置,等待更多数据
in.resetReaderIndex();
return;
}
// 读取消息内容
String content = "";
if (length > 0) {
byte[] contentBytes = new byte[length];
in.readBytes(contentBytes);
content = new String(contentBytes);
}
// 创建消息对象并添加到输出列表
CustomMessage message = new CustomMessage(type, content);
out.add(message);
}
}
// 使用自定义编解码器的服务器
public class CustomProtocolServer {
private static final int PORT = 8082;
public void start() throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 添加自定义编解码器
pipeline.addLast(new CustomMessageDecoder());
pipeline.addLast(new CustomMessageEncoder());
// 添加业务处理器
pipeline.addLast(new CustomProtocolHandler());
}
});
ChannelFuture future = bootstrap.bind(PORT).sync();
System.out.println("自定义协议服务器启动,监听端口: " + PORT);
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
// 自定义协议处理器
public static class CustomProtocolHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("客户端连接: " + ctx.channel().remoteAddress());
// 发送欢迎消息
CustomMessage welcomeMsg = new CustomMessage(1, "欢迎使用自定义协议服务器!");
ctx.writeAndFlush(welcomeMsg);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof CustomMessage) {
CustomMessage message = (CustomMessage) msg;
System.out.println("收到消息: " + message);
// 根据消息类型处理
switch (message.getType()) {
case 1: // 文本消息
handleTextMessage(ctx, message);
break;
case 2: // 命令消息
handleCommandMessage(ctx, message);
break;
case 3: // 心跳消息
handleHeartbeatMessage(ctx, message);
break;
default:
CustomMessage errorMsg = new CustomMessage(999, "未知消息类型: " + message.getType());
ctx.writeAndFlush(errorMsg);
}
}
}
private void handleTextMessage(ChannelHandlerContext ctx, CustomMessage message) {
// 回显文本消息
CustomMessage response = new CustomMessage(1, "Echo: " + message.getContent());
ctx.writeAndFlush(response);
}
private void handleCommandMessage(ChannelHandlerContext ctx, CustomMessage message) {
String command = message.getContent();
String result;
switch (command) {
case "time":
result = "当前时间: " + new java.util.Date();
break;
case "version":
result = "服务器版本: 1.0.0";
break;
default:
result = "未知命令: " + command;
}
CustomMessage response = new CustomMessage(2, result);
ctx.writeAndFlush(response);
}
private void handleHeartbeatMessage(ChannelHandlerContext ctx, CustomMessage message) {
// 响应心跳
CustomMessage response = new CustomMessage(3, "pong");
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.err.println("处理异常: " + cause.getMessage());
ctx.close();
}
}
public static void main(String[] args) {
CustomProtocolServer server = new CustomProtocolServer();
try {
server.start();
} catch (InterruptedException e) {
System.err.println("服务器启动失败: " + e.getMessage());
}
}
}
# 3.2 Netty性能优化
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.Executors;
public class OptimizedNettyServer {
private static final int PORT = 8083;
public void start() throws InterruptedException {
// 根据操作系统选择最优的EventLoopGroup
EventLoopGroup bossGroup;
EventLoopGroup workerGroup;
Class<? extends ServerChannel> channelClass;
if (isLinux() && isEpollAvailable()) {
// Linux系统使用Epoll
bossGroup = new EpollEventLoopGroup(1, new DefaultThreadFactory("boss"));
workerGroup = new EpollEventLoopGroup(0, new DefaultThreadFactory("worker"));
channelClass = EpollServerSocketChannel.class;
System.out.println("使用Epoll传输");
} else {
// 其他系统使用NIO
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("boss"));
workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
channelClass = NioServerSocketChannel.class;
System.out.println("使用NIO传输");
}
// 流量整形处理器(可选)
GlobalTrafficShapingHandler trafficHandler = new GlobalTrafficShapingHandler(
Executors.newScheduledThreadPool(1),
1024 * 1024, // 写入限制: 1MB/s
1024 * 1024, // 读取限制: 1MB/s
1000 // 检查间隔: 1秒
);
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(channelClass)
// 服务器Socket选项
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
// 客户端Socket选项
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, 32 * 1024)
.childOption(ChannelOption.SO_RCVBUF, 32 * 1024)
// 使用池化的ByteBuf分配器
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
// 设置写缓冲区水位
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(8 * 1024, 32 * 1024))
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 添加流量整形(可选)
// pipeline.addLast(trafficHandler);
// 添加业务处理器
pipeline.addLast(new OptimizedServerHandler());
}
});
ChannelFuture future = bootstrap.bind(PORT).sync();
System.out.println("优化的Netty服务器启动,监听端口: " + PORT);
// 添加关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("正在关闭服务器...");
future.channel().close();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
trafficHandler.release();
}));
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
trafficHandler.release();
}
}
private boolean isLinux() {
return System.getProperty("os.name").toLowerCase().contains("linux");
}
private boolean isEpollAvailable() {
try {
Class.forName("io.netty.channel.epoll.Epoll");
return io.netty.channel.epoll.Epoll.isAvailable();
} catch (ClassNotFoundException e) {
return false;
}
}
// 优化的服务器处理器
public static class OptimizedServerHandler extends ChannelInboundHandlerAdapter {
private static final int MAX_MESSAGE_SIZE = 1024 * 1024; // 1MB
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 处理消息(这里简化处理)
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
try {
// 检查消息大小
if (buf.readableBytes() > MAX_MESSAGE_SIZE) {
System.err.println("消息过大,丢弃");
return;
}
// 处理消息内容
byte[] data = new byte[buf.readableBytes()];
buf.readBytes(data);
// 简单回显
ctx.writeAndFlush(ctx.alloc().buffer().writeBytes(data));
} finally {
// 释放ByteBuf
buf.release();
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.err.println("处理异常: " + cause.getMessage());
ctx.close();
}
}
public static void main(String[] args) {
OptimizedNettyServer server = new OptimizedNettyServer();
try {
server.start();
} catch (InterruptedException e) {
System.err.println("服务器启动失败: " + e.getMessage());
}
}
}
# 4. 总结
# 4.1 NIO vs Netty对比
特性 | Java NIO | Netty |
---|---|---|
学习曲线 | 陡峭 | 相对平缓 |
开发效率 | 低 | 高 |
代码复杂度 | 高 | 低 |
性能 | 高(需要优化) | 高(已优化) |
功能丰富度 | 基础 | 丰富 |
社区支持 | JDK内置 | 活跃社区 |
# 4.2 选择建议
使用Java NIO的场景:
- 学习网络编程原理
- 对框架依赖有严格限制
- 需要完全控制底层实现
使用Netty的场景:
- 快速开发网络应用
- 需要高性能和稳定性
- 复杂的协议处理
- 企业级应用开发
# 4.3 最佳实践
性能优化:
- 合理设置线程池大小
- 使用池化的ByteBuf
- 选择合适的传输实现
- 设置合理的缓冲区大小
资源管理:
- 及时释放ByteBuf
- 正确关闭Channel和EventLoopGroup
- 避免内存泄漏
异常处理:
- 实现完善的异常处理机制
- 记录详细的错误日志
- 优雅地处理连接断开
监控和调试:
- 添加适当的日志记录
- 监控连接数和性能指标
- 使用Netty的内置调试工具
NIO和Netty都是构建高性能网络应用的重要技术,理解它们的原理和最佳实践对于开发可靠的网络服务至关重要。