Socket编程实战案例
哪吒 2024/1/1
# Socket编程实战案例
## 1. 文件传输系统
### 1.1 文件传输协议设计
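协议格式(上传请求):
[命令类型(1字节)][文件名长度(4字节)][文件名][文件大小(8字节)][文件内容]
下载请求只包含 [命令类型(1字节)][文件名长度(4字节)][文件名],文件列表请求只包含 [命令类型(1字节)]。
服务器响应格式: [响应类型(1字节)][消息长度(4字节)][消息];下载成功时,响应之后紧跟 [文件大小(8字节)][文件内容]。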
命令类型:
- 0x01: 文件上传请求
- 0x02: 文件下载请求
- 0x03: 文件列表请求
- 0x04: 响应成功
- 0x05: 响应失败
### 1.2 文件传输服务器
import java.io.*;
import java.net.*;
import java.nio.file.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Multi-threaded file transfer server for the tutorial's binary protocol.
 *
 * <p>Each accepted connection is served by a {@link FileTransferHandler} running on a fixed
 * pool of ten threads. A connection carries a sequence of commands: {@code 0x01} upload,
 * {@code 0x02} download, {@code 0x03} list. Responses start with {@code 0x04} (success) or
 * {@code 0x05} (failure), followed by a 4-byte message length and the message bytes.
 *
 * <p>File names and messages are encoded with the platform default charset, so client and
 * server must run with the same default charset.
 */
public class FileTransferServer {
    private static final int PORT = 9001;
    private static final String UPLOAD_DIR = "./uploads";
    private static final String DOWNLOAD_DIR = "./downloads";
    private static final int BUFFER_SIZE = 8192;
    /** Largest upload accepted, in bytes (100 MB). */
    private static final long MAX_FILE_SIZE = 100L * 1024 * 1024;
    /** Upper bound for the encoded file name; protects against absurd length prefixes. */
    private static final int MAX_FILE_NAME_BYTES = 1024;
    /** Progress is logged each time another megabyte has been transferred. */
    private static final long PROGRESS_STEP = 1024L * 1024;

    // Written by start() and read by stop(), which runs on the shutdown-hook thread.
    private volatile ServerSocket serverSocket;
    private final ExecutorService threadPool;
    private volatile boolean running = false;
    private final AtomicLong transferCounter = new AtomicLong(0);

    /** Creates the worker pool and makes sure the upload and download directories exist. */
    public FileTransferServer() {
        this.threadPool = Executors.newFixedThreadPool(10);
        createDirectories();
    }

    private void createDirectories() {
        try {
            Files.createDirectories(Paths.get(UPLOAD_DIR));
            Files.createDirectories(Paths.get(DOWNLOAD_DIR));
        } catch (IOException e) {
            System.err.println("创建目录失败: " + e.getMessage());
        }
    }

    /**
     * Binds the listening socket and accepts connections until {@link #stop()} is called.
     * This method blocks the calling thread.
     *
     * @throws IOException if the listening socket cannot be opened
     */
    public void start() throws IOException {
        serverSocket = new ServerSocket(PORT);
        running = true;
        System.out.println("文件传输服务器启动,监听端口: " + PORT);
        System.out.println("上传目录: " + UPLOAD_DIR);
        System.out.println("下载目录: " + DOWNLOAD_DIR);
        while (running) {
            try {
                Socket clientSocket = serverSocket.accept();
                threadPool.submit(new FileTransferHandler(clientSocket));
            } catch (IOException e) {
                // Closing the server socket in stop() makes accept() throw; only report
                // failures that happen while the server is supposed to be running.
                if (running) {
                    System.err.println("接受连接异常: " + e.getMessage());
                }
            }
        }
    }

    /**
     * Stops accepting connections and shuts the worker pool down.
     *
     * @throws IOException if closing the listening socket fails
     */
    public void stop() throws IOException {
        running = false;
        ServerSocket listener = serverSocket;
        if (listener != null && !listener.isClosed()) {
            listener.close();
        }
        threadPool.shutdown();
        System.out.println("文件传输服务器已停止");
    }

    /**
     * Tells whether a client-supplied file name is a plain name that cannot escape the
     * directory it is resolved against.
     *
     * @param fileName name received from the client; may be {@code null}
     * @return {@code false} for {@code null}, empty names, {@code "."}, {@code ".."} and
     *     names containing a path separator or a NUL character; {@code true} otherwise
     */
    static boolean isSafeFileName(String fileName) {
        if (fileName == null || fileName.isEmpty()) {
            return false;
        }
        if (fileName.equals(".") || fileName.equals("..")) {
            return false;
        }
        for (int i = 0; i < fileName.length(); i++) {
            char c = fileName.charAt(i);
            if (c == '/' || c == '\\' || c == '\0') {
                return false;
            }
        }
        return true;
    }

    /**
     * Resolves {@code fileName} as a direct child of {@code baseDir}.
     *
     * @return the resolved file, or {@code null} when the name is unsafe or would resolve
     *     to anything other than a direct child of the base directory
     */
    private static File resolveInside(String baseDir, String fileName) {
        if (!isSafeFileName(fileName)) {
            return null;
        }
        try {
            Path base = Paths.get(baseDir).toAbsolutePath().normalize();
            Path resolved = base.resolve(fileName).normalize();
            // Second line of defence for platform-specific syntax such as drive prefixes.
            return base.equals(resolved.getParent()) ? resolved.toFile() : null;
        } catch (InvalidPathException e) {
            return null;
        }
    }

    /** Serves one client connection: reads commands in a loop until the client disconnects. */
    private class FileTransferHandler implements Runnable {
        private final Socket clientSocket;
        private final long transferId;

        public FileTransferHandler(Socket clientSocket) {
            this.clientSocket = clientSocket;
            this.transferId = transferCounter.incrementAndGet();
        }

        @Override
        public void run() {
            System.out.println("[" + transferId + "] 客户端连接: " + clientSocket.getRemoteSocketAddress());
            try (DataInputStream dis = new DataInputStream(clientSocket.getInputStream());
                 DataOutputStream dos = new DataOutputStream(clientSocket.getOutputStream())) {
                while (true) {
                    byte command = dis.readByte();
                    switch (command) {
                        case 0x01:
                            handleFileUpload(dis, dos);
                            break;
                        case 0x02:
                            handleFileDownload(dis, dos);
                            break;
                        case 0x03:
                            handleFileList(dos);
                            break;
                        default:
                            // The stream cannot be resynchronised after an unknown command.
                            System.err.println("[" + transferId + "] 未知命令: " + command);
                            sendErrorResponse(dos, "未知命令");
                            return;
                    }
                }
            } catch (EOFException e) {
                System.out.println("[" + transferId + "] 客户端断开连接");
            } catch (IOException e) {
                System.err.println("[" + transferId + "] 处理客户端异常: " + e.getMessage());
            } finally {
                try {
                    clientSocket.close();
                } catch (IOException e) {
                    System.err.println("[" + transferId + "] 关闭连接异常: " + e.getMessage());
                }
            }
        }

        /**
         * Reads the length-prefixed file name of a request.
         *
         * @throws IOException if the length prefix is not in {@code 1..MAX_FILE_NAME_BYTES};
         *     the connection is then unusable because the request cannot be parsed further
         */
        private String readFileName(DataInputStream dis, DataOutputStream dos) throws IOException {
            int fileNameLength = dis.readInt();
            if (fileNameLength <= 0 || fileNameLength > MAX_FILE_NAME_BYTES) {
                sendErrorResponse(dos, "非法的文件名长度");
                throw new IOException("非法的文件名长度: " + fileNameLength);
            }
            byte[] fileNameBytes = new byte[fileNameLength];
            dis.readFully(fileNameBytes);
            return new String(fileNameBytes);
        }

        /** Reads and discards exactly {@code count} bytes so the command stream stays in sync. */
        private void discardFully(DataInputStream dis, long count) throws IOException {
            byte[] scratch = new byte[BUFFER_SIZE];
            long remaining = count;
            while (remaining > 0) {
                int n = dis.read(scratch, 0, (int) Math.min(scratch.length, remaining));
                if (n == -1) {
                    throw new EOFException();
                }
                remaining -= n;
            }
        }

        /**
         * Handles an upload: file name, 8-byte size, then the content.
         *
         * <p>A rejected upload (too large or unsafe name) is answered with an error and its
         * content is read and discarded, because the client streams the content without
         * waiting for a reply.
         */
        private void handleFileUpload(DataInputStream dis, DataOutputStream dos) throws IOException {
            String fileName = readFileName(dis, dos);
            long fileSize = dis.readLong();
            System.out.println("[" + transferId + "] 开始上传文件: " + fileName + ", 大小: " + fileSize + " 字节");

            if (fileSize < 0) {
                sendErrorResponse(dos, "非法的文件大小");
                throw new IOException("非法的文件大小: " + fileSize);
            }

            String rejection = null;
            File uploadFile = null;
            if (fileSize > MAX_FILE_SIZE) {
                rejection = "文件过大,限制100MB";
            } else {
                uploadFile = resolveInside(UPLOAD_DIR, fileName);
                if (uploadFile == null) {
                    rejection = "非法的文件名";
                }
            }
            if (rejection != null) {
                sendErrorResponse(dos, rejection);
                discardFully(dis, fileSize);
                return;
            }

            try (BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(uploadFile))) {
                byte[] buffer = new byte[BUFFER_SIZE];
                long totalRead = 0;
                long nextReport = PROGRESS_STEP;
                while (totalRead < fileSize) {
                    int toRead = (int) Math.min(buffer.length, fileSize - totalRead);
                    int bytesRead = dis.read(buffer, 0, toRead);
                    if (bytesRead == -1) {
                        throw new IOException("连接意外断开");
                    }
                    bos.write(buffer, 0, bytesRead);
                    totalRead += bytesRead;
                    // Socket reads are not aligned to megabytes, so compare against a
                    // moving threshold instead of testing for an exact multiple.
                    if (totalRead >= nextReport || totalRead == fileSize) {
                        System.out.printf("[%d] 上传进度: %.2f%%\n",
                                transferId, (double) totalRead / fileSize * 100);
                        nextReport = (totalRead / PROGRESS_STEP + 1) * PROGRESS_STEP;
                    }
                }
                bos.flush();
                System.out.println("[" + transferId + "] 文件上传完成: " + fileName);
                sendSuccessResponse(dos, "文件上传成功");
            } catch (IOException e) {
                // Remove the incomplete file; isFile() keeps a same-named directory intact.
                if (uploadFile.isFile() && !uploadFile.delete()) {
                    System.err.println("[" + transferId + "] 无法删除不完整的文件: " + fileName);
                }
                throw e;
            }
        }

        /**
         * Handles a download: replies with success, a zero-length message, the 8-byte file
         * size and then exactly that many content bytes.
         */
        private void handleFileDownload(DataInputStream dis, DataOutputStream dos) throws IOException {
            String fileName = readFileName(dis, dos);
            System.out.println("[" + transferId + "] 请求下载文件: " + fileName);

            File downloadFile = resolveInside(DOWNLOAD_DIR, fileName);
            if (downloadFile == null || !downloadFile.isFile()) {
                sendErrorResponse(dos, "文件不存在");
                return;
            }

            // Open the file before announcing success so an unreadable file is still
            // reported as an error instead of leaving the client waiting for content.
            FileInputStream fis;
            try {
                fis = new FileInputStream(downloadFile);
            } catch (FileNotFoundException e) {
                sendErrorResponse(dos, "文件不存在");
                return;
            }

            try (BufferedInputStream bis = new BufferedInputStream(fis)) {
                long fileSize = downloadFile.length();
                dos.writeByte(0x04);
                dos.writeInt(0);
                dos.writeLong(fileSize);
                System.out.println("[" + transferId + "] 开始下载文件: " + fileName + ", 大小: " + fileSize + " 字节");

                byte[] buffer = new byte[BUFFER_SIZE];
                long totalSent = 0;
                long nextReport = PROGRESS_STEP;
                // Send exactly the announced number of bytes even if the file changes size.
                while (totalSent < fileSize) {
                    int toRead = (int) Math.min(buffer.length, fileSize - totalSent);
                    int bytesRead = bis.read(buffer, 0, toRead);
                    if (bytesRead == -1) {
                        throw new IOException("文件在传输过程中被截断: " + fileName);
                    }
                    dos.write(buffer, 0, bytesRead);
                    totalSent += bytesRead;
                    if (totalSent >= nextReport || totalSent == fileSize) {
                        System.out.printf("[%d] 下载进度: %.2f%%\n",
                                transferId, (double) totalSent / fileSize * 100);
                        nextReport = (totalSent / PROGRESS_STEP + 1) * PROGRESS_STEP;
                    }
                }
                dos.flush();
                System.out.println("[" + transferId + "] 文件下载完成: " + fileName);
            }
        }

        /** Sends the names and sizes of the regular files in the download directory. */
        private void handleFileList(DataOutputStream dos) throws IOException {
            System.out.println("[" + transferId + "] 请求文件列表");
            File[] files = new File(DOWNLOAD_DIR).listFiles();
            StringBuilder fileList = new StringBuilder();
            if (files != null) {
                for (File file : files) {
                    if (file.isFile()) {
                        fileList.append(file.getName())
                                .append(" (")
                                .append(file.length())
                                .append(" 字节)\n");
                    }
                }
            }
            if (fileList.length() == 0) {
                fileList.append("目录为空");
            }
            sendSuccessResponse(dos, fileList.toString());
        }

        private void sendSuccessResponse(DataOutputStream dos, String message) throws IOException {
            writeResponse(dos, 0x04, message);
        }

        private void sendErrorResponse(DataOutputStream dos, String error) throws IOException {
            writeResponse(dos, 0x05, error);
        }

        /** Writes {@code [type][4-byte length][message bytes]} and flushes. */
        private void writeResponse(DataOutputStream dos, int type, String message) throws IOException {
            dos.writeByte(type);
            byte[] messageBytes = message.getBytes();
            dos.writeInt(messageBytes.length);
            dos.write(messageBytes);
            dos.flush();
        }
    }

    /** Starts the server and registers a shutdown hook that stops it. */
    public static void main(String[] args) {
        FileTransferServer server = new FileTransferServer();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                server.stop();
            } catch (IOException e) {
                System.err.println("停止服务器异常: " + e.getMessage());
            }
        }));
        try {
            server.start();
        } catch (IOException e) {
            System.err.println("启动文件传输服务器失败: " + e.getMessage());
        }
    }
}
### 1.3 文件传输客户端
import java.io.*;
import java.net.*;
import java.util.Scanner;
/**
 * Interactive command-line client for the tutorial's file transfer protocol.
 *
 * <p>Requests start with a command byte ({@code 0x01} upload, {@code 0x02} download,
 * {@code 0x03} list). Responses start with {@code 0x04} (success) or {@code 0x05} (failure),
 * followed by a 4-byte message length and the message bytes.
 *
 * <p>File names and messages use the platform default charset, so client and server must run
 * with the same default charset. Instances are not designed for use from multiple threads.
 */
public class FileTransferClient {
    private static final String SERVER_HOST = "localhost";
    private static final int SERVER_PORT = 9001;
    private static final int BUFFER_SIZE = 8192;
    /** Upper bound for a response message; guards against corrupt length prefixes. */
    private static final int MAX_MESSAGE_BYTES = 16 * 1024 * 1024;

    private Socket socket;
    private DataInputStream dis;
    private DataOutputStream dos;

    /**
     * Opens the connection to the server.
     *
     * @return {@code true} on success, {@code false} if the connection could not be set up
     */
    public boolean connect() {
        try {
            socket = new Socket(SERVER_HOST, SERVER_PORT);
            dis = new DataInputStream(socket.getInputStream());
            dos = new DataOutputStream(socket.getOutputStream());
            System.out.println("连接到文件传输服务器: " + SERVER_HOST + ":" + SERVER_PORT);
            return true;
        } catch (IOException e) {
            System.err.println("连接服务器失败: " + e.getMessage());
            disconnect();
            return false;
        }
    }

    /** Closes the connection if it is open; safe to call repeatedly. */
    public void disconnect() {
        try {
            if (socket != null && !socket.isClosed()) {
                socket.close();
            }
        } catch (IOException e) {
            System.err.println("断开连接异常: " + e.getMessage());
        }
    }

    /** Reports and returns {@code false} when there is no open connection to use. */
    private boolean ensureConnected() {
        if (socket == null || socket.isClosed() || dis == null || dos == null) {
            System.err.println("未连接到服务器");
            return false;
        }
        return true;
    }

    /**
     * Uploads a local file and prints the server's reply.
     *
     * <p>The file is opened before anything is sent, so a local open failure does not
     * disturb the connection. A failure after the request header has been written leaves the
     * stream in an unknown state, so the connection is closed.
     *
     * @param filePath path of the local file to upload
     */
    public void uploadFile(String filePath) {
        File file = new File(filePath);
        if (!file.exists() || !file.isFile()) {
            System.err.println("文件不存在: " + filePath);
            return;
        }
        if (!ensureConnected()) {
            return;
        }
        String fileName = file.getName();
        long fileSize = file.length();
        try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file))) {
            System.out.println("开始上传文件: " + fileName + ", 大小: " + fileSize + " 字节");
            dos.writeByte(0x01);
            byte[] fileNameBytes = fileName.getBytes();
            dos.writeInt(fileNameBytes.length);
            dos.write(fileNameBytes);
            dos.writeLong(fileSize);

            byte[] buffer = new byte[BUFFER_SIZE];
            long totalSent = 0;
            // Send exactly the announced number of bytes even if the file changes size.
            while (totalSent < fileSize) {
                int toRead = (int) Math.min(buffer.length, fileSize - totalSent);
                int bytesRead = bis.read(buffer, 0, toRead);
                if (bytesRead == -1) {
                    throw new IOException("文件在传输过程中被截断: " + fileName);
                }
                dos.write(buffer, 0, bytesRead);
                totalSent += bytesRead;
                if (totalSent % (1024 * 1024) == 0 || totalSent == fileSize) {
                    System.out.printf("上传进度: %.2f%%\n", (double) totalSent / fileSize * 100);
                }
            }
            dos.flush();
            readResponse();
        } catch (FileNotFoundException e) {
            // Thrown only while opening the local file, i.e. before anything was sent.
            System.err.println("上传文件异常: " + e.getMessage());
        } catch (IOException e) {
            System.err.println("上传文件异常: " + e.getMessage());
            disconnect();
        }
    }

    /**
     * Downloads a file from the server into a local directory.
     *
     * <p>A partially written local file is deleted when the transfer fails, and the
     * connection is closed after any I/O failure because the stream position is unknown.
     *
     * @param fileName name of the file on the server
     * @param savePath local directory to save into
     */
    public void downloadFile(String fileName, String savePath) {
        if (!ensureConnected()) {
            return;
        }
        try {
            System.out.println("开始下载文件: " + fileName);
            dos.writeByte(0x02);
            byte[] fileNameBytes = fileName.getBytes();
            dos.writeInt(fileNameBytes.length);
            dos.write(fileNameBytes);
            dos.flush();

            byte responseType = dis.readByte();
            // Both response kinds carry a length-prefixed message; consume it so the
            // stream stays aligned even when a success response includes one.
            byte[] messageBytes = readMessage(dis.readInt());
            if (responseType == 0x05) {
                System.err.println("下载失败: " + new String(messageBytes));
                return;
            }
            if (responseType != 0x04) {
                throw new IOException("未知的响应类型: " + responseType);
            }

            long fileSize = dis.readLong();
            if (fileSize < 0) {
                throw new IOException("非法的文件大小: " + fileSize);
            }
            System.out.println("文件大小: " + fileSize + " 字节");
            receiveFile(new File(savePath, fileName), fileSize);
        } catch (IOException e) {
            System.err.println("下载文件异常: " + e.getMessage());
            disconnect();
        }
    }

    /** Copies exactly {@code fileSize} bytes from the connection into {@code saveFile}. */
    private void receiveFile(File saveFile, long fileSize) throws IOException {
        try (BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(saveFile))) {
            byte[] buffer = new byte[BUFFER_SIZE];
            long totalReceived = 0;
            while (totalReceived < fileSize) {
                int toRead = (int) Math.min(buffer.length, fileSize - totalReceived);
                int bytesRead = dis.read(buffer, 0, toRead);
                if (bytesRead == -1) {
                    throw new IOException("连接意外断开");
                }
                bos.write(buffer, 0, bytesRead);
                totalReceived += bytesRead;
                if (totalReceived % (1024 * 1024) == 0 || totalReceived == fileSize) {
                    System.out.printf("下载进度: %.2f%%\n",
                            (double) totalReceived / fileSize * 100);
                }
            }
            bos.flush();
            System.out.println("文件下载完成: " + saveFile.getAbsolutePath());
        } catch (IOException e) {
            // Do not leave a truncated file behind.
            if (saveFile.isFile() && !saveFile.delete()) {
                System.err.println("无法删除不完整的文件: " + saveFile.getAbsolutePath());
            }
            throw e;
        }
    }

    /** Requests the server's file list and prints the reply. */
    public void listFiles() {
        if (!ensureConnected()) {
            return;
        }
        try {
            dos.writeByte(0x03);
            dos.flush();
            readResponse();
        } catch (IOException e) {
            System.err.println("获取文件列表异常: " + e.getMessage());
            disconnect();
        }
    }

    /** Reads one {@code [type][length][message]} response and prints a non-empty message. */
    private void readResponse() throws IOException {
        byte responseType = dis.readByte();
        byte[] messageBytes = readMessage(dis.readInt());
        if (messageBytes.length > 0) {
            String message = new String(messageBytes);
            if (responseType == 0x04) {
                System.out.println("服务器响应: " + message);
            } else if (responseType == 0x05) {
                System.err.println("服务器错误: " + message);
            }
        }
    }

    /**
     * Reads a message body of the given length.
     *
     * @throws IOException if the length is negative or exceeds {@code MAX_MESSAGE_BYTES}
     */
    private byte[] readMessage(int messageLength) throws IOException {
        if (messageLength < 0 || messageLength > MAX_MESSAGE_BYTES) {
            throw new IOException("非法的消息长度: " + messageLength);
        }
        byte[] messageBytes = new byte[messageLength];
        dis.readFully(messageBytes);
        return messageBytes;
    }

    /** Reads commands from standard input until {@code quit} or end of input. */
    public void startInteractiveMode() {
        Scanner scanner = new Scanner(System.in);
        System.out.println("\n文件传输客户端交互模式");
        System.out.println("可用命令:");
        System.out.println(" upload <文件路径> - 上传文件");
        System.out.println(" download <文件名> <保存目录> - 下载文件");
        System.out.println(" list - 列出服务器文件");
        System.out.println(" quit - 退出");
        while (true) {
            System.out.print("\n> ");
            // End of input (closed or redirected stdin) ends the session like "quit".
            if (!scanner.hasNextLine()) {
                System.out.println("退出客户端");
                return;
            }
            String input = scanner.nextLine().trim();
            if (input.isEmpty()) {
                continue;
            }
            String[] parts = input.split("\\s+", 3);
            String command = parts[0].toLowerCase(java.util.Locale.ROOT);
            switch (command) {
                case "upload":
                    // Everything after the command is the path, so paths may contain spaces.
                    String uploadPath = input.substring(parts[0].length()).trim();
                    if (!uploadPath.isEmpty()) {
                        uploadFile(uploadPath);
                    } else {
                        System.err.println("用法: upload <文件路径>");
                    }
                    break;
                case "download":
                    if (parts.length >= 3) {
                        downloadFile(parts[1], parts[2]);
                    } else {
                        System.err.println("用法: download <文件名> <保存目录>");
                    }
                    break;
                case "list":
                    listFiles();
                    break;
                case "quit":
                    System.out.println("退出客户端");
                    return;
                default:
                    System.err.println("未知命令: " + command);
            }
        }
    }

    /** Connects to the server and runs the interactive loop. */
    public static void main(String[] args) {
        FileTransferClient client = new FileTransferClient();
        if (client.connect()) {
            try {
                client.startInteractiveMode();
            } finally {
                client.disconnect();
            }
        }
    }
}
## 2. 多人聊天室
### 2.1 聊天室服务器
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import java.text.SimpleDateFormat;
/**
 * Line-based multi-room chat server.
 *
 * <p>Every connection is served by a {@link ClientHandler} on a fixed thread pool. After
 * choosing a unique user name, a client can talk in its current room, switch rooms, list
 * rooms and users, and send private messages through {@code /}-prefixed commands.
 *
 * <p>Text is exchanged with the platform default charset, so clients must use the same one.
 */
public class ChatRoomServer {
    private static final int PORT = 9002;
    private static final int MAX_CLIENTS = 100;
    // DateTimeFormatter is immutable and thread-safe; broadcasts run on many handler threads.
    private static final java.time.format.DateTimeFormatter TIME_FORMAT =
            java.time.format.DateTimeFormatter.ofPattern("HH:mm:ss");

    // Written by start() and read by stop(), which runs on the shutdown-hook thread.
    private volatile ServerSocket serverSocket;
    private final ExecutorService threadPool;
    private volatile boolean running = false;

    /** Logged-in clients keyed by user name. */
    private final Map<String, ClientHandler> clients = new ConcurrentHashMap<>();
    /** Room name to the set of user names currently in that room. */
    private final Map<String, Set<String>> chatRooms = new ConcurrentHashMap<>();

    /** Creates the worker pool and the three default rooms. */
    public ChatRoomServer() {
        this.threadPool = Executors.newFixedThreadPool(MAX_CLIENTS);
        chatRooms.put("大厅", ConcurrentHashMap.<String>newKeySet());
        chatRooms.put("技术讨论", ConcurrentHashMap.<String>newKeySet());
        chatRooms.put("闲聊", ConcurrentHashMap.<String>newKeySet());
    }

    /**
     * Binds the listening socket and accepts connections until {@link #stop()} is called.
     * This method blocks the calling thread.
     *
     * @throws IOException if the listening socket cannot be opened
     */
    public void start() throws IOException {
        serverSocket = new ServerSocket(PORT);
        running = true;
        System.out.println("聊天室服务器启动,监听端口: " + PORT);
        System.out.println("最大客户端数: " + MAX_CLIENTS);
        while (running) {
            try {
                Socket clientSocket = serverSocket.accept();
                if (clients.size() >= MAX_CLIENTS) {
                    // Server full: tell the client and drop the connection.
                    try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true)) {
                        out.println("服务器已满,请稍后再试");
                    }
                    clientSocket.close();
                    continue;
                }
                threadPool.submit(new ClientHandler(clientSocket));
            } catch (IOException e) {
                // Closing the server socket in stop() makes accept() throw; only report
                // failures that happen while the server is supposed to be running.
                if (running) {
                    System.err.println("接受连接异常: " + e.getMessage());
                }
            }
        }
    }

    /**
     * Announces the shutdown, disconnects every client and stops accepting connections.
     *
     * @throws IOException if closing the listening socket fails
     */
    public void stop() throws IOException {
        running = false;
        broadcastMessage("系统", "服务器即将关闭", null);
        for (ClientHandler client : clients.values()) {
            client.disconnect();
        }
        ServerSocket listener = serverSocket;
        if (listener != null && !listener.isClosed()) {
            listener.close();
        }
        threadPool.shutdown();
        System.out.println("聊天室服务器已停止");
    }

    private static String timestamp() {
        return java.time.LocalTime.now().format(TIME_FORMAT);
    }

    /**
     * Sends a time-stamped message to everyone ({@code room == null}) or to the members of
     * one room; an unknown room name is ignored.
     */
    private void broadcastMessage(String sender, String message, String room) {
        String formattedMessage = String.format("[%s] %s: %s", timestamp(), sender, message);
        if (room == null) {
            for (ClientHandler client : clients.values()) {
                client.sendMessage(formattedMessage);
            }
            return;
        }
        Set<String> roomUsers = chatRooms.get(room);
        if (roomUsers == null) {
            return;
        }
        for (String username : roomUsers) {
            ClientHandler client = clients.get(username);
            if (client != null) {
                client.sendMessage("[" + room + "] " + formattedMessage);
            }
        }
    }

    /** Delivers a private message and echoes a confirmation (or an offline notice) to the sender. */
    private void sendPrivateMessage(String sender, String receiver, String message) {
        ClientHandler receiverClient = clients.get(receiver);
        ClientHandler senderClient = clients.get(sender);
        String timestamp = timestamp();
        if (receiverClient != null) {
            receiverClient.sendMessage(
                    String.format("[%s] [私聊来自 %s]: %s", timestamp, sender, message));
            if (senderClient != null) {
                senderClient.sendMessage(
                        String.format("[%s] [私聊发送给 %s]: %s", timestamp, receiver, message));
            }
        } else if (senderClient != null) {
            senderClient.sendMessage("用户 " + receiver + " 不在线");
        }
    }

    /** Serves one client: login, then a read loop dispatching chat lines and commands. */
    private class ClientHandler implements Runnable {
        private final Socket socket;
        private BufferedReader reader;
        private PrintWriter writer;
        private String username;
        private String currentRoom = "大厅";
        // Set by /quit so the read loop ends without reading from the closed socket.
        private boolean quitRequested = false;

        public ClientHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try {
                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                writer = new PrintWriter(socket.getOutputStream(), true);
                if (handleLogin()) {
                    joinRoom(currentRoom);
                    sendWelcomeMessage();
                    handleMessages();
                }
            } catch (IOException e) {
                // A socket closed by disconnect() (server shutdown) is not an error.
                if (!socket.isClosed()) {
                    System.err.println("处理客户端异常: " + e.getMessage());
                }
            } finally {
                cleanup();
            }
        }

        /**
         * Asks for a user name, allowing three attempts.
         *
         * @return {@code true} once a valid, unused name has been registered; {@code false}
         *     if the client disconnected or used up its attempts
         */
        private boolean handleLogin() throws IOException {
            writer.println("欢迎来到聊天室!请输入您的用户名:");
            for (int attempts = 0; attempts < 3; attempts++) {
                String line = reader.readLine();
                if (line == null) {
                    // Client disconnected before logging in.
                    return false;
                }
                String candidate = line.trim();
                if (candidate.isEmpty()) {
                    writer.println("用户名不能为空,请重新输入:");
                    continue;
                }
                if (candidate.length() > 20) {
                    writer.println("用户名过长(最多20字符),请重新输入:");
                    continue;
                }
                // putIfAbsent claims the name atomically; a separate containsKey/put pair
                // would let two clients register the same name concurrently.
                if (clients.putIfAbsent(candidate, this) != null) {
                    writer.println("用户名已存在,请选择其他用户名:");
                    continue;
                }
                this.username = candidate;
                System.out.println("用户 " + username + " 加入聊天室 (" + socket.getRemoteSocketAddress() + ")");
                broadcastMessage("系统", username + " 加入了聊天室", null);
                return true;
            }
            writer.println("登录失败,连接将被关闭");
            return false;
        }

        private void sendWelcomeMessage() {
            writer.println("\n=== 欢迎来到聊天室," + username + "! ===");
            writer.println("当前在线用户数: " + clients.size());
            writer.println("当前聊天室: " + currentRoom);
            writer.println("\n可用命令:");
            writer.println(" /rooms - 查看所有聊天室");
            writer.println(" /join <房间名> - 加入聊天室");
            writer.println(" /users - 查看在线用户");
            writer.println(" /pm <用户名> <消息> - 发送私聊");
            writer.println(" /help - 显示帮助");
            writer.println(" /quit - 退出聊天室");
            writer.println("直接输入消息即可在当前聊天室发言\n");
        }

        /** Reads lines until the client disconnects or quits. */
        private void handleMessages() throws IOException {
            String message;
            while (!quitRequested && (message = reader.readLine()) != null) {
                message = message.trim();
                if (message.isEmpty()) {
                    continue;
                }
                if (message.startsWith("/")) {
                    handleCommand(message);
                } else {
                    broadcastMessage(username, message, currentRoom);
                }
            }
        }

        private void handleCommand(String command) {
            String[] parts = command.split(" ", 3);
            String cmd = parts[0].toLowerCase(java.util.Locale.ROOT);
            switch (cmd) {
                case "/rooms":
                    showRooms();
                    break;
                case "/join":
                    // An empty name (e.g. "/join  x" with two spaces) must not create a room.
                    if (parts.length >= 2 && !parts[1].isEmpty()) {
                        joinRoom(parts[1]);
                    } else {
                        sendMessage("用法: /join <房间名>");
                    }
                    break;
                case "/users":
                    showUsers();
                    break;
                case "/pm":
                    if (parts.length >= 3) {
                        sendPrivateMessage(username, parts[1], parts[2]);
                    } else {
                        sendMessage("用法: /pm <用户名> <消息>");
                    }
                    break;
                case "/help":
                    sendWelcomeMessage();
                    break;
                case "/quit":
                    sendMessage("再见!");
                    quitRequested = true;
                    disconnect();
                    break;
                default:
                    sendMessage("未知命令: " + cmd + ",输入 /help 查看帮助");
            }
        }

        private void showRooms() {
            StringBuilder roomList = new StringBuilder("可用聊天室:\n");
            for (Map.Entry<String, Set<String>> entry : chatRooms.entrySet()) {
                roomList.append(" ").append(entry.getKey())
                        .append(" (").append(entry.getValue().size()).append(" 人)\n");
            }
            sendMessage(roomList.toString());
        }

        /** Moves this user into {@code roomName}, creating the room if it does not exist. */
        private void joinRoom(String roomName) {
            if (currentRoom != null) {
                Set<String> currentRoomUsers = chatRooms.get(currentRoom);
                // Announce the departure only if the user really was a member; on the
                // first join after login the user is not in any room yet.
                if (currentRoomUsers != null && currentRoomUsers.remove(username)) {
                    broadcastMessage("系统", username + " 离开了聊天室", currentRoom);
                }
            }
            Set<String> newRoomUsers = chatRooms.computeIfAbsent(
                    roomName, name -> ConcurrentHashMap.<String>newKeySet());
            newRoomUsers.add(username);
            currentRoom = roomName;
            sendMessage("已加入聊天室: " + roomName);
            broadcastMessage("系统", username + " 加入了聊天室", currentRoom);
        }

        private void showUsers() {
            Set<String> roomUsers = chatRooms.get(currentRoom);
            if (roomUsers != null && !roomUsers.isEmpty()) {
                StringBuilder userList = new StringBuilder("聊天室 [" + currentRoom + "] 在线用户:\n");
                for (String user : roomUsers) {
                    userList.append(" ").append(user).append("\n");
                }
                sendMessage(userList.toString());
            } else {
                sendMessage("当前聊天室没有用户");
            }
        }

        /** Sends one line to this client; a no-op before the writer has been created. */
        public void sendMessage(String message) {
            if (writer != null) {
                writer.println(message);
            }
        }

        /** Closes this client's socket, which also ends its read loop. */
        public void disconnect() {
            try {
                if (socket != null && !socket.isClosed()) {
                    socket.close();
                }
            } catch (IOException e) {
                System.err.println("关闭连接异常: " + e.getMessage());
            }
        }

        /** Unregisters the user, announces the departure and closes the socket. */
        private void cleanup() {
            if (username != null) {
                clients.remove(username);
                if (currentRoom != null) {
                    Set<String> roomUsers = chatRooms.get(currentRoom);
                    if (roomUsers != null) {
                        roomUsers.remove(username);
                    }
                }
                System.out.println("用户 " + username + " 离开聊天室");
                broadcastMessage("系统", username + " 离开了聊天室", null);
            }
            disconnect();
        }
    }

    /** Starts the server and registers a shutdown hook that stops it. */
    public static void main(String[] args) {
        ChatRoomServer server = new ChatRoomServer();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                server.stop();
            } catch (IOException e) {
                System.err.println("停止服务器异常: " + e.getMessage());
            }
        }));
        try {
            server.start();
        } catch (IOException e) {
            System.err.println("启动聊天室服务器失败: " + e.getMessage());
        }
    }
}
### 2.2 聊天室客户端
import java.io.*;
import java.net.*;
import java.util.Scanner;
/**
 * Console client for the chat room server.
 *
 * <p>A daemon thread prints every line received from the server while the calling thread
 * forwards lines typed on standard input. Typing {@code /quit} ends the session.
 */
public class ChatRoomClient {
    private static final String SERVER_HOST = "localhost";
    private static final int SERVER_PORT = 9002;

    private Socket socket;
    private BufferedReader reader;
    private PrintWriter writer;
    // Shared between the input loop and the receive thread.
    private volatile boolean connected = false;

    /**
     * Opens the connection and its line-based reader and writer.
     *
     * @return {@code true} if the connection was established, {@code false} otherwise
     */
    public boolean connect() {
        try {
            Socket opened = new Socket(SERVER_HOST, SERVER_PORT);
            socket = opened;
            reader = new BufferedReader(new InputStreamReader(opened.getInputStream()));
            writer = new PrintWriter(opened.getOutputStream(), true);
        } catch (IOException e) {
            System.err.println("连接服务器失败: " + e.getMessage());
            return false;
        }
        connected = true;
        System.out.println("连接到聊天室服务器: " + SERVER_HOST + ":" + SERVER_PORT);
        return true;
    }

    /** Marks the client as disconnected and closes the socket if it is still open. */
    public void disconnect() {
        connected = false;
        if (socket == null || socket.isClosed()) {
            return;
        }
        try {
            socket.close();
        } catch (IOException e) {
            System.err.println("断开连接异常: " + e.getMessage());
        }
    }

    /**
     * Runs the chat session: starts the receive thread, then sends each typed line to the
     * server until {@code /quit} is entered, input fails, or the connection is lost.
     */
    public void startChat() {
        Thread receiver = new Thread(this::receiveMessages, "ReceiveThread");
        receiver.setDaemon(true);
        receiver.start();

        Scanner console = new Scanner(System.in);
        while (connected) {
            try {
                String line = console.nextLine();
                if (line == null) {
                    continue;
                }
                writer.println(line);
                if (line.trim().equals("/quit")) {
                    break;
                }
            } catch (Exception e) {
                System.err.println("输入异常: " + e.getMessage());
                break;
            }
        }
        console.close();
    }

    /** Prints server lines until the stream ends or the client is disconnected. */
    private void receiveMessages() {
        try {
            while (connected) {
                String incoming = reader.readLine();
                if (incoming == null) {
                    break;
                }
                System.out.println(incoming);
            }
        } catch (IOException e) {
            // A read failure after disconnect() is expected and stays silent.
            if (connected) {
                System.err.println("接收消息异常: " + e.getMessage());
            }
        } finally {
            connected = false;
        }
    }

    /** Connects to the server and runs the chat session. */
    public static void main(String[] args) {
        ChatRoomClient client = new ChatRoomClient();
        if (!client.connect()) {
            return;
        }
        try {
            client.startChat();
        } finally {
            client.disconnect();
        }
    }
}
## 3. HTTP代理服务器
### 3.1 简单HTTP代理实现
import java.io.*;
import java.net.*;
import java.util.concurrent.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Minimal forward HTTP proxy supporting plain HTTP requests and {@code CONNECT} tunnels.
 *
 * <p>Each client connection is handled by a {@link ProxyHandler} on a fixed thread pool and
 * carries exactly one request: plain HTTP requests are forwarded with
 * {@code Connection: close}, so the end of the response is signalled by the target closing
 * its connection.
 */
public class SimpleHTTPProxy {
    private static final int PROXY_PORT = 8888;
    private static final int BUFFER_SIZE = 8192;
    private static final int CONNECT_TIMEOUT = 10000;
    private static final int READ_TIMEOUT = 30000;
    /** Longest request or header line accepted from a client, in bytes. */
    private static final int MAX_HEADER_LINE_BYTES = 16 * 1024;
    /** HTTP requires CRLF line endings regardless of the platform line separator. */
    private static final String CRLF = "\r\n";
    /** Byte-preserving charset used for the HTTP head so header bytes pass through unchanged. */
    private static final java.nio.charset.Charset HEADER_CHARSET =
            java.nio.charset.StandardCharsets.ISO_8859_1;

    // Written by start() and read by stop(), which runs on the shutdown-hook thread.
    private volatile ServerSocket serverSocket;
    private final ExecutorService threadPool;
    private volatile boolean running = false;

    /** Creates the proxy with a pool of 50 handler threads. */
    public SimpleHTTPProxy() {
        this.threadPool = Executors.newFixedThreadPool(50);
    }

    /**
     * Binds the listening socket and accepts connections until {@link #stop()} is called.
     * This method blocks the calling thread.
     *
     * @throws IOException if the listening socket cannot be opened
     */
    public void start() throws IOException {
        serverSocket = new ServerSocket(PROXY_PORT);
        running = true;
        System.out.println("HTTP代理服务器启动,监听端口: " + PROXY_PORT);
        System.out.println("请配置浏览器代理: localhost:" + PROXY_PORT);
        while (running) {
            try {
                Socket clientSocket = serverSocket.accept();
                threadPool.submit(new ProxyHandler(clientSocket));
            } catch (IOException e) {
                // Closing the server socket in stop() makes accept() throw; only report
                // failures that happen while the proxy is supposed to be running.
                if (running) {
                    System.err.println("接受连接异常: " + e.getMessage());
                }
            }
        }
    }

    /**
     * Stops accepting connections and shuts the handler pool down.
     *
     * @throws IOException if closing the listening socket fails
     */
    public void stop() throws IOException {
        running = false;
        ServerSocket listener = serverSocket;
        if (listener != null && !listener.isClosed()) {
            listener.close();
        }
        threadPool.shutdown();
        System.out.println("HTTP代理服务器已停止");
    }

    /**
     * Builds the origin-form request target sent to the target server.
     *
     * @param path URL path; {@code null} or empty is replaced by {@code "/"}
     * @param query URL query without the leading {@code ?}, or {@code null} if absent
     * @return the path followed by {@code ?query} when a query is present
     */
    static String requestTarget(String path, String query) {
        String target = (path == null || path.isEmpty()) ? "/" : path;
        return query == null ? target : target + "?" + query;
    }

    /** Handles one client connection carrying a single proxied request. */
    private static final class ProxyHandler implements Runnable {
        private final Socket clientSocket;

        ProxyHandler(Socket clientSocket) {
            this.clientSocket = clientSocket;
        }

        @Override
        public void run() {
            try {
                handleRequest();
            } catch (IOException e) {
                System.err.println("处理代理请求异常: " + e.getMessage());
            } finally {
                try {
                    clientSocket.close();
                } catch (IOException e) {
                    System.err.println("关闭客户端连接异常: " + e.getMessage());
                }
            }
        }

        /** Reads the request line and dispatches to the CONNECT or plain-HTTP handler. */
        private void handleRequest() throws IOException {
            // The head is read byte-wise from this one buffered stream, which is also the
            // stream relayed afterwards, so bytes following the head are never lost.
            InputStream clientIn = new BufferedInputStream(clientSocket.getInputStream(), BUFFER_SIZE);
            String requestLine = readLine(clientIn);
            if (requestLine == null || requestLine.isEmpty()) {
                return;
            }
            System.out.println("代理请求: " + requestLine);
            String[] parts = requestLine.split(" ");
            if (parts.length != 3) {
                sendErrorResponse("400 Bad Request");
                return;
            }
            String method = parts[0];
            String url = parts[1];
            String version = parts[2];
            if ("CONNECT".equals(method)) {
                handleConnect(url, clientIn);
            } else {
                handleHTTP(method, url, version, clientIn);
            }
        }

        /** Opens a raw tunnel to {@code host:port} for HTTPS and similar protocols. */
        private void handleConnect(String url, InputStream clientIn) throws IOException {
            // Split at the last colon so bracketed IPv6 literals such as [::1]:443 work.
            int colon = url.lastIndexOf(':');
            if (colon <= 0 || colon == url.length() - 1) {
                sendErrorResponse("400 Bad Request");
                return;
            }
            String targetHost = url.substring(0, colon);
            int targetPort;
            try {
                targetPort = Integer.parseInt(url.substring(colon + 1));
            } catch (NumberFormatException e) {
                sendErrorResponse("400 Bad Request");
                return;
            }
            if (targetPort < 1 || targetPort > 65535) {
                sendErrorResponse("400 Bad Request");
                return;
            }

            // The remaining request headers are irrelevant for a tunnel.
            String line;
            while ((line = readLine(clientIn)) != null && !line.isEmpty()) {
                // skip
            }

            try (Socket targetSocket = new Socket()) {
                try {
                    targetSocket.connect(new InetSocketAddress(targetHost, targetPort), CONNECT_TIMEOUT);
                } catch (IOException e) {
                    System.err.println("连接目标服务器失败: " + e.getMessage());
                    sendErrorResponse("502 Bad Gateway");
                    return;
                }
                OutputStream clientOut = clientSocket.getOutputStream();
                clientOut.write(("HTTP/1.1 200 Connection Established" + CRLF + CRLF)
                        .getBytes(HEADER_CHARSET));
                clientOut.flush();
                System.out.println("建立隧道: " + targetHost + ":" + targetPort);
                startTunneling(clientIn, targetSocket, false);
            }
        }

        /**
         * Forwards a plain HTTP request and relays the response.
         *
         * <p>The request head is rewritten to origin form with a {@code Host} header derived
         * from the URL and {@code Connection: close}; the client's own {@code Host},
         * {@code Connection} and {@code Proxy-Connection} headers are dropped.
         */
        private void handleHTTP(String method, String url, String version,
                                InputStream clientIn) throws IOException {
            URL targetURL;
            try {
                if (!url.startsWith("http://")) {
                    url = "http://" + url;
                }
                targetURL = new URL(url);
            } catch (MalformedURLException e) {
                sendErrorResponse("400 Bad Request");
                return;
            }
            String targetHost = targetURL.getHost();
            int targetPort = targetURL.getPort();
            if (targetPort == -1) {
                targetPort = 80;
            }

            StringBuilder head = new StringBuilder();
            head.append(method).append(' ')
                    .append(requestTarget(targetURL.getPath(), targetURL.getQuery()))
                    .append(' ').append(version).append(CRLF);
            head.append("Host: ").append(targetHost);
            if (targetPort != 80) {
                head.append(':').append(targetPort);
            }
            head.append(CRLF);
            String line;
            while ((line = readLine(clientIn)) != null && !line.isEmpty()) {
                String lower = line.toLowerCase(java.util.Locale.ROOT);
                if (lower.startsWith("host:")
                        || lower.startsWith("connection:")
                        || lower.startsWith("proxy-connection:")) {
                    continue;
                }
                head.append(line).append(CRLF);
            }
            // Without this the target may keep the connection alive and the relay would
            // only end when the read timeout fires.
            head.append("Connection: close").append(CRLF).append(CRLF);

            try (Socket targetSocket = new Socket()) {
                try {
                    targetSocket.connect(new InetSocketAddress(targetHost, targetPort), CONNECT_TIMEOUT);
                    targetSocket.setSoTimeout(READ_TIMEOUT);
                    OutputStream targetOut = targetSocket.getOutputStream();
                    targetOut.write(head.toString().getBytes(HEADER_CHARSET));
                    targetOut.flush();
                } catch (IOException | IllegalArgumentException e) {
                    // IllegalArgumentException: port in the URL outside the valid range.
                    System.err.println("处理HTTP请求异常: " + e.getMessage());
                    sendErrorResponse("502 Bad Gateway");
                    return;
                }
                System.out.println("代理HTTP请求: " + method + " " + url);
                // The relay also carries the request body, whatever its framing.
                startTunneling(clientIn, targetSocket, true);
            }
        }

        /**
         * Relays bytes in both directions until the target stops sending, then closes both
         * sockets.
         *
         * <p>A helper thread copies client to target. It is a dedicated thread rather than a
         * pool task so a full handler pool cannot stall the relay.
         *
         * @param clientIn buffered client stream, positioned after the request head
         * @param targetSocket connected socket to the target server
         * @param reportGatewayError whether to answer {@code 502 Bad Gateway} when the relay
         *     fails before any response byte reached the client
         */
        private void startTunneling(InputStream clientIn, Socket targetSocket,
                                    boolean reportGatewayError) {
            Thread upstream = new Thread(() -> {
                try {
                    forwardData(clientIn, targetSocket.getOutputStream());
                    // Client finished sending: pass the end-of-stream on to the target
                    // while still allowing it to send the rest of its data.
                    targetSocket.shutdownOutput();
                } catch (IOException e) {
                    // Either side failed; closing the target ends the downstream copy too.
                    closeQuietly(targetSocket);
                }
            }, "proxy-upstream");
            upstream.setDaemon(true);
            upstream.start();

            long forwarded = 0;
            IOException failure = null;
            try {
                InputStream targetIn = targetSocket.getInputStream();
                OutputStream clientOut = clientSocket.getOutputStream();
                byte[] buffer = new byte[BUFFER_SIZE];
                int bytesRead;
                while ((bytesRead = targetIn.read(buffer)) != -1) {
                    clientOut.write(buffer, 0, bytesRead);
                    clientOut.flush();
                    forwarded += bytesRead;
                }
            } catch (IOException e) {
                failure = e;
            }

            // An error page is only valid if no part of a response has been sent yet.
            if (failure != null && forwarded == 0 && reportGatewayError) {
                System.err.println("处理HTTP请求异常: " + failure.getMessage());
                try {
                    sendErrorResponse("502 Bad Gateway");
                } catch (IOException ignored) {
                    // The client is gone; nothing left to report to.
                }
            }

            // Closing both sockets unblocks the upstream thread if it is still reading.
            closeQuietly(targetSocket);
            closeQuietly(clientSocket);
            try {
                upstream.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        /** Copies {@code input} to {@code output} until end of stream, flushing each chunk. */
        private void forwardData(InputStream input, OutputStream output) throws IOException {
            byte[] buffer = new byte[BUFFER_SIZE];
            int bytesRead;
            while ((bytesRead = input.read(buffer)) != -1) {
                output.write(buffer, 0, bytesRead);
                output.flush();
            }
        }

        /**
         * Reads one line terminated by LF or CRLF without consuming anything beyond it.
         *
         * @return the line without its terminator, or {@code null} at end of stream
         * @throws IOException if the line exceeds {@code MAX_HEADER_LINE_BYTES}
         */
        private String readLine(InputStream in) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream(128);
            boolean sawAnyByte = false;
            int b;
            while ((b = in.read()) != -1) {
                sawAnyByte = true;
                if (b == '\n') {
                    break;
                }
                if (bytes.size() >= MAX_HEADER_LINE_BYTES) {
                    throw new IOException("HTTP header line too long");
                }
                bytes.write(b);
            }
            if (!sawAnyByte) {
                return null;
            }
            String line = new String(bytes.toByteArray(), HEADER_CHARSET);
            return line.endsWith("\r") ? line.substring(0, line.length() - 1) : line;
        }

        /** Sends a small HTML error page with the given status, e.g. {@code "400 Bad Request"}. */
        private void sendErrorResponse(String error) throws IOException {
            byte[] body = ("<html><body><h1>" + error + "</h1></body></html>")
                    .getBytes(HEADER_CHARSET);
            String head = "HTTP/1.1 " + error + CRLF
                    + "Content-Type: text/html" + CRLF
                    + "Content-Length: " + body.length + CRLF
                    + "Connection: close" + CRLF
                    + CRLF;
            OutputStream out = clientSocket.getOutputStream();
            out.write(head.getBytes(HEADER_CHARSET));
            out.write(body);
            out.flush();
        }

        private static void closeQuietly(Socket socket) {
            try {
                socket.close();
            } catch (IOException ignored) {
                // Best effort: the socket is being discarded anyway.
            }
        }
    }

    /** Starts the proxy and registers a shutdown hook that stops it. */
    public static void main(String[] args) {
        SimpleHTTPProxy proxy = new SimpleHTTPProxy();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                proxy.stop();
            } catch (IOException e) {
                System.err.println("停止代理服务器异常: " + e.getMessage());
            }
        }));
        try {
            proxy.start();
        } catch (IOException e) {
            System.err.println("启动HTTP代理服务器失败: " + e.getMessage());
        }
    }
}
## 4. 总结
### 4.1 实战案例总结
本章通过三个完整的Socket编程实战案例,展示了不同类型网络应用的实现:
文件传输系统:
- 自定义二进制协议
- 大文件传输处理
- 进度显示和错误处理
- 多线程并发支持
多人聊天室:
- 文本协议设计
- 多客户端管理
- 聊天室功能
- 私聊和广播消息
HTTP代理服务器:
- HTTP协议解析
- HTTPS隧道支持
- 数据转发机制
- 并发连接处理
### 4.2 关键技术点
协议设计:
- 明确的消息格式
- 错误处理机制
- 版本兼容性考虑
并发处理:
- 线程池管理
- 连接状态维护
- 资源同步控制
性能优化:
- 缓冲区大小调优
- 超时设置
- 内存使用优化
错误处理:
- 网络异常处理
- 优雅的连接关闭
- 日志记录
### 4.3 最佳实践
设计原则:
- 简单明确的协议
- 模块化的代码结构
- 完善的错误处理
性能考虑:
- 合理的线程池配置
- 适当的缓冲区大小
- 及时的资源释放
安全性:
- 输入验证
- 资源限制
- 访问控制
可维护性:
- 清晰的代码注释
- 详细的日志记录
- 模块化的设计
这些实战案例为开发实际的网络应用提供了完整的参考实现,可以根据具体需求进行扩展和优化。