本篇主要是多线程的基础知识,代码示例较多,有时间的可以逐个分析,具体细节都放在代码注释中了。
1. 理解线程:多任务执行的基石
1.1 什么是线程?
在现代操作系统中,进程是资源分配的基本单位,而线程是CPU调度的最小单位。可以把进程想象成一家公司,线程就是公司里的员工。
/*** 演示Java程序天生就是多线程程序* 即使最简单的main方法也会启动多个系统线程*/
public class MultiThread {public static void main(String[] args) {// 获取Java线程管理MXBeanThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();// 不需要获取同步的monitor和synchronizer信息,仅获取线程和线程堆栈信息ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);// 遍历线程信息System.out.println("=== Java程序启动的线程列表 ===");for (ThreadInfo threadInfo : threadInfos) {System.out.println("[" + threadInfo.getThreadId() + "] " + threadInfo.getThreadName());}}
}
输出示例:
=== Java程序启动的线程列表 ===
[4] Signal Dispatcher // 分发处理发送给JVM信号的线程
[3] Finalizer // 调用对象finalize方法的线程
[2] Reference Handler // 清除Reference的线程
[1] main // main线程,用户程序入口
1.2 为什么需要多线程?
三大核心优势:
- 充分利用多核处理器 - 避免CPU资源闲置
- 提升响应速度 - 后台任务不阻塞用户操作
- 更好的编程模型 - Java提供一致的多线程API
1.3 线程状态生命周期
新建(NEW) → 可运行(RUNNABLE) → 运行中↓
超时等待(TIMED_WAITING) ← 等待(WAITING) ← 阻塞(BLOCKED)↓终止(TERMINATED)
2. 线程的启动与安全终止
2.1 正确启动线程
/*** 线程启动最佳实践示例* 重点:设置有意义的线程名称,合理设置守护线程标志*/
public class ThreadStartExample {public static void main(String[] args) {// 推荐:为线程设置有意义的名称,便于问题排查Thread worker = new Thread(new Task(), "Data-Processor-1");worker.setDaemon(false); // 明确设置是否为守护线程worker.start(); // 正确启动方式,不要直接调用run()System.out.println("主线程继续执行,不会等待worker线程");}static class Task implements Runnable {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + " 开始执行");try {// 模拟工作任务Thread.sleep(1000);} catch (InterruptedException e) {System.out.println("任务被中断");}System.out.println(Thread.currentThread().getName() + " 执行完成");}}
}
2.2 安全终止线程的两种方式
方式一:使用中断机制
/*** 使用中断机制安全终止线程* 重点:理解中断异常处理的最佳实践*/
public class InterruptExample {public static void main(String[] args) throws InterruptedException {Thread worker = new Thread(new InterruptibleTask(), "Interruptible-Worker");worker.start();// 主线程等待2秒后中断工作线程TimeUnit.SECONDS.sleep(2);System.out.println("主线程发送中断信号");worker.interrupt(); // 发送中断信号// 等待工作线程完全退出worker.join();System.out.println("工作线程已安全退出");}static class InterruptibleTask implements Runnable {@Overridepublic void run() {while (!Thread.currentThread().isInterrupted()) {try {// 模拟工作 - 这里可能抛出InterruptedExceptionSystem.out.println("Working...");TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {/*** 关键理解点:为什么需要重新设置中断状态?* * 当线程在阻塞状态(如sleep、wait、join)时被中断,* Java会做两件事:* 1. 抛出InterruptedException* 2. 清除线程的中断状态(设为false)* * 这导致循环条件 !Thread.currentThread().isInterrupted() * 会继续为true,线程无法退出。* * 因此我们需要在捕获异常后重新设置中断状态,* 这样循环条件就能检测到中断,安全退出。*/System.out.println("捕获到中断异常,重新设置中断状态");Thread.currentThread().interrupt(); // 重新设置中断标志}}System.out.println("线程安全退出,中断状态: " + Thread.currentThread().isInterrupted());}}
}
方式二:使用标志位
/*** 使用volatile标志位安全终止线程* 适用于没有阻塞调用或需要更复杂退出逻辑的场景*/
public class FlagShutdownExample {// volatile保证可见性,确保所有线程看到最新的值private volatile boolean running = true;private final Thread workerThread;public FlagShutdownExample() {this.workerThread = new Thread(this::doWork, "Flag-Controlled-Worker");}public void start() {workerThread.start();}/*** 优雅停止工作线程*/public void stop() {System.out.println("请求停止工作线程");running = false;// 同时发送中断,处理可能存在的阻塞情况workerThread.interrupt();}/*** 工作线程的主循环* 同时检查标志位和中断状态,提供双重保障*/private void doWork() {try {while (running && !Thread.currentThread().isInterrupted()) {// 执行工作任务processData();}} finally {// 无论何种方式退出,都执行清理工作cleanup();}System.out.println("工作线程已安全退出");}private void processData() {try {// 模拟数据处理System.out.println("处理数据中...");Thread.sleep(300);} catch (InterruptedException e) {System.out.println("处理数据时被中断");// 收到中断,但可能还想继续处理,所以不重新设置中断// 让循环条件来检查running标志}}private void cleanup() {System.out.println("执行资源清理工作...");// 关闭文件、数据库连接等资源}public static void main(String[] args) throws InterruptedException {FlagShutdownExample example = new FlagShutdownExample();example.start();// 运行3秒后停止Thread.sleep(3000);example.stop();// 等待工作线程退出example.workerThread.join();}
}
3. 线程间通信:协作的艺术
3.1 volatile关键字:共享状态可见性
/*** volatile关键字示例* 保证多线程间的可见性,但不保证原子性*/
public class VolatileExample {// volatile确保shutdownRequested的修改对所有线程立即可见private volatile boolean shutdownRequested = false;private int operationCount = 0; // 非volatile,不保证可见性public void shutdown() {shutdownRequested = true; // 所有线程立即可见System.out.println("关闭请求已设置");}public void doWork() {while (!shutdownRequested) {// 正常工作循环operationCount++; // 非原子操作,可能有问题try {Thread.sleep(100);} catch (InterruptedException e) {System.out.println("工作被中断");Thread.currentThread().interrupt();break;}}System.out.println("工作线程退出,操作次数: " + operationCount);}
}
3.2 synchronized关键字:互斥访问
/*** synchronized关键字示例* 保证原子性和可见性,但可能影响性能*/
public class SynchronizedCounter {private int count = 0;/*** 同步方法 - 锁对象是当前实例(this)*/public synchronized void increment() {count++; // 原子操作}/*** 同步块 - 可以更细粒度控制锁的范围*/public void decrement() {// 只同步关键部分,减少锁持有时间synchronized (this) {count--;}// 这里可以执行非同步操作}/*** 同步的get方法,保证看到最新值*/public synchronized int getCount() {return count;}/*** 静态同步方法 - 锁对象是类的Class对象*/public static synchronized void staticMethod() {// 静态同步方法使用Class对象作为锁}
}
3.3 等待/通知机制:经典生产者-消费者模式
/*** 生产者-消费者模式示例* 演示wait/notify机制的正确使用*/
public class WaitNotifyExample {private final Object lock = new Object(); // 共享锁对象private final Queue<String> queue = new LinkedList<>();private final int MAX_SIZE = 5;/*** 生产者方法*/public void produce(String data) throws InterruptedException {synchronized (lock) {// 必须使用while循环检查条件,避免虚假唤醒while (queue.size() >= MAX_SIZE) {System.out.println("队列已满(" + queue.size() + "),生产者等待");lock.wait(); // 释放锁并等待}queue.offer(data);System.out.println("生产: " + data + ",队列大小: " + queue.size());// 通知所有等待的消费者lock.notifyAll();}}/*** 消费者方法*/public String consume() throws InterruptedException {synchronized (lock) {// 必须使用while循环检查条件while (queue.isEmpty()) {System.out.println("队列为空,消费者等待");lock.wait(); // 释放锁并等待}String data = queue.poll();System.out.println("消费: " + data + ",队列大小: " + queue.size());// 通知所有等待的生产者lock.notifyAll();return data;}}/*** 测试生产者消费者模式*/public static void main(String[] args) {WaitNotifyExample example = new WaitNotifyExample();// 启动生产者线程Thread producer = new Thread(() -> {try {for (int i = 0; i < 10; i++) {example.produce("Data-" + i);Thread.sleep(200);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}, "Producer");// 启动消费者线程Thread consumer = new Thread(() -> {try {for (int i = 0; i < 10; i++) {example.consume();Thread.sleep(300);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}, "Consumer");producer.start();consumer.start();}
}
等待/通知经典范式:
// 消费者范式 - 永远在循环中调用wait()
synchronized(锁对象) {while(条件不满足) {锁对象.wait(); // 等待时会释放锁}// 条件满足,处理业务逻辑
}// 生产者范式
synchronized(锁对象) {改变条件; // 改变等待条件锁对象.notifyAll(); // 通知所有等待线程
}
3.4 Thread.join():线程依赖执行
/*** Thread.join()使用示例* 实现线程间的顺序执行依赖*/
public class JoinExample {public static void main(String[] args) throws InterruptedException {System.out.println("主线程开始");Thread previous = Thread.currentThread();// 创建5个有依赖关系的线程for (int i = 0; i < 5; i++) {Thread thread = new Thread(new DependentTask(previous), "Worker-" + i);thread.start();previous = thread; // 设置依赖链}// 主线程先做一些工作TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName() + " 完成初始化工作");// 等待所有线程完成(实际上由最后一个Worker-4 join主线程)}static class DependentTask implements Runnable {private final Thread dependency; // 依赖的线程public DependentTask(Thread dependency) {this.dependency = dependency;}@Overridepublic void run() {try {// 等待依赖的线程执行完成System.out.println(Thread.currentThread().getName() + " 等待 " + dependency.getName());dependency.join();// 依赖线程完成后开始自己的工作System.out.println(Thread.currentThread().getName() + " 开始工作");TimeUnit.MILLISECONDS.sleep(500); // 模拟工作System.out.println(Thread.currentThread().getName() + " 完成工作");} catch (InterruptedException e) {System.out.println(Thread.currentThread().getName() + " 被中断");Thread.currentThread().interrupt();}}}
}
3.5 ThreadLocal深入解析:线程局部变量
/*** ThreadLocal深度解析* 理解原理、使用场景和内存泄漏防护*/
public class ThreadLocalExample {/*** ThreadLocal基本使用:每个线程独立的SimpleDateFormat* 避免SimpleDateFormat的线程安全问题*/private static final ThreadLocal<SimpleDateFormat> DATE_FORMATTER =ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));/*** ThreadLocal用于用户上下文传递* 在Web应用中非常有用,避免在方法参数中传递用户信息*/private static final ThreadLocal<UserContext> USER_CONTEXT = new ThreadLocal<>();/*** ThreadLocal用于事务上下文*/private static final ThreadLocal<TransactionContext> TRANSACTION_CONTEXT =new ThreadLocal<>();/*** 可继承的ThreadLocal:子线程可以继承父线程的值*/private static final InheritableThreadLocal<String> INHERITABLE_CONTEXT =new InheritableThreadLocal<>();/*** 处理用户请求的示例方法*/public void processRequest(User user) {// 设置用户上下文到当前线程USER_CONTEXT.set(new UserContext(user));try {// 使用线程安全的日期格式化String timestamp = DATE_FORMATTER.get().format(new Date());System.out.println(Thread.currentThread().getName() + " - 用户: " + user.getName() + ", 时间: " + timestamp);// 执行业务逻辑 - 任何方法都可以获取用户上下文,无需传递参数doBusinessLogic();} finally {/*** 关键:必须清理ThreadLocal,防止内存泄漏!* * 原因:* 1. ThreadLocalMap的key是弱引用,会被GC回收* 2. 但value是强引用,不会被自动回收* 3. 如果线程长时间存活(如线程池中的线程),会导致value无法释放* 4. 调用remove()方法显式清理*/USER_CONTEXT.remove();DATE_FORMATTER.remove(); // 清理所有使用的ThreadLocal}}private void doBusinessLogic() {// 在任何地方都可以获取用户上下文,无需方法参数传递UserContext context = USER_CONTEXT.get();if (context != null) {System.out.println("执行业务逻辑,用户: " + context.getUser().getName());}// 使用线程安全的日期格式化String now = DATE_FORMATTER.get().format(new Date());System.out.println("业务执行时间: " + now);}/*** 演示ThreadLocal的内存泄漏问题*/public void demonstrateMemoryLeak() {// 错误的用法:不清理ThreadLocalThreadLocal<byte[]> leakyLocal = new ThreadLocal<>();leakyLocal.set(new byte[1024 * 1024]); // 1MB数据// 如果没有调用 leakyLocal.remove(), 即使leakyLocal=null,// 线程的ThreadLocalMap中仍然保留着这个Entry// 在线程池场景下,线程重用会导致内存不断增长}/*** ThreadLocal最佳实践:使用try-finally确保清理*/public void bestPractice(User user) {USER_CONTEXT.set(new UserContext(user));try {// 业务处理doBusinessLogic();} finally {// 确保清理,即使在业务逻辑中发生异常USER_CONTEXT.remove();}}/*** 测试多线程环境下的ThreadLocal*/public static void main(String[] args) throws InterruptedException {ThreadLocalExample example = new ThreadLocalExample();// 创建多个线程,每个线程有独立的ThreadLocal值Thread[] threads = new Thread[3];for (int i = 0; i < threads.length; i++) {final int userId = i;threads[i] = new Thread(() -> {User user = new User("User-" + userId);example.processRequest(user);}, "Thread-" + i);threads[i].start();}// 等待所有线程完成for (Thread thread : threads) {thread.join();}System.out.println("所有线程执行完成");}// 辅助类定义static class UserContext {private final User user;public UserContext(User user) { this.user = user; }public User getUser() { return user; }}static class User {private final String name;public User(String name) { this.name = name; }public String getName() { return name; }}static class TransactionContext {// 事务相关信息}
}/*** ThreadLocal高级用法:自定义ThreadLocal子类*/
class AdvancedThreadLocal<T> extends ThreadLocal<T> {/*** 初始值 - 当线程第一次调用get()时,如果还没有设置值,会调用此方法*/@Overrideprotected T initialValue() {System.out.println(Thread.currentThread().getName() + " - 初始化ThreadLocal值");return null; // 返回默认初始值}/*** 子线程值继承 - 仅对InheritableThreadLocal有效* 当创建新线程时,可以控制如何从父线程继承值*/protected T childValue(T parentValue) {System.out.println("子线程继承父线程的值: " + parentValue);return parentValue; // 直接继承,也可以进行转换}
}
4. 线程应用实例:从理论到实践
4.1 等待超时模式:避免无限期等待
/*** 等待超时模式实现* 在等待/通知机制基础上增加超时控制*/
public class TimeoutWait<T> {private T result;/*** 带超时的获取方法* @param timeoutMs 超时时间(毫秒)* @return 结果,超时返回null*/public synchronized T get(long timeoutMs) throws InterruptedException {long endTime = System.currentTimeMillis() + timeoutMs;long remaining = timeoutMs;// 循环检查条件和剩余时间while (result == null && remaining > 0) {wait(remaining); // 等待剩余时间remaining = endTime - System.currentTimeMillis(); // 更新剩余时间}return result; // 可能为null(超时)}/*** 设置结果并通知所有等待线程*/public synchronized void set(T value) {this.result = value;notifyAll(); // 通知所有等待的线程}/*** 演示超时等待的使用*/public static void main(String[] args) throws InterruptedException {TimeoutWait<String> waitObject = new TimeoutWait<>();// 消费者线程 - 等待结果,最多等3秒Thread consumer = new Thread(() -> {try {System.out.println("消费者开始等待结果...");String result = waitObject.get(3000);if (result != null) {System.out.println("消费者收到结果: " + result);} else {System.out.println("消费者等待超时");}} catch (InterruptedException e) {System.out.println("消费者被中断");}});// 生产者线程 - 2秒后产生结果Thread producer = new Thread(() -> {try {Thread.sleep(2000); // 模拟生产耗时waitObject.set("生产完成的数据");System.out.println("生产者完成工作");} catch (InterruptedException e) {System.out.println("生产者被中断");}});consumer.start();producer.start();consumer.join();producer.join();}
}
4.2 数据库连接池实现
/*** 简易数据库连接池实现* 演示资源池化和等待超时模式的实际应用*/
public class SimpleConnectionPool {private final LinkedList<Connection> pool = new LinkedList<>();private final int maxSize;private int createdCount = 0;public SimpleConnectionPool(int initialSize, int maxSize) {this.maxSize = maxSize;// 初始化连接池for (int i = 0; i < initialSize; i++) {pool.add(createConnection());}System.out.println("连接池初始化完成,初始连接数: " + initialSize);}/*** 获取连接,支持超时*/public Connection getConnection(long timeoutMs) throws InterruptedException, TimeoutException {synchronized (pool) {// 如果池中有可用连接,立即返回if (!pool.isEmpty()) {return pool.removeFirst();}// 池为空,但还可以创建新连接if (createdCount < maxSize) {Connection conn = createConnection();System.out.println("创建新连接,当前连接数: " + createdCount);return conn;}// 等待可用连接long endTime = System.currentTimeMillis() + timeoutMs;long remaining = timeoutMs;while (pool.isEmpty() && remaining > 0) {System.out.println(Thread.currentThread().getName() + " 等待连接,剩余时间: " + remaining + "ms");pool.wait(remaining);remaining = endTime - System.currentTimeMillis();}if (!pool.isEmpty()) {return pool.removeFirst();}throw new TimeoutException("获取连接超时,等待 " + timeoutMs + "ms");}}/*** 归还连接到池中*/public void releaseConnection(Connection conn) {if (conn != null) {synchronized (pool) {if (pool.size() < maxSize) {pool.addLast(conn);pool.notifyAll(); // 通知等待的线程System.out.println("连接已归还,当前池大小: " + pool.size());} else {// 连接数超过上限,关闭连接closeConnection(conn);createdCount--;System.out.println("连接池已满,关闭连接");}}}}/*** 创建新连接*/private Connection createConnection() {createdCount++;// 这里应该是真实的数据库连接创建逻辑System.out.println("创建第 " + createdCount + " 个连接");return new MockConnection();}/*** 关闭连接*/private void closeConnection(Connection conn) {try {conn.close();} catch (Exception e) {System.err.println("关闭连接失败: " + e.getMessage());}}/*** 获取连接池状态*/public synchronized void printStatus() {System.out.println("连接池状态 - 池中连接: " + pool.size() + ", 总创建数: " + createdCount + ", 最大限制: " + maxSize);}// 模拟数据库连接static class MockConnection implements Connection {private final String id = UUID.randomUUID().toString().substring(0, 8);@Overridepublic void close() {System.out.println("关闭连接: " + id);}@Overridepublic String toString() {return "MockConnection{" + "id='" + id + '\'' + '}';}// 其他Connection接口方法...@Override public void commit() {}@Override public void rollback() {}// ... 简化实现}static class TimeoutException extends Exception {public TimeoutException(String message) { super(message); }}
}
4.3 线程池核心技术实现
/*** 简易线程池实现* 理解线程池的核心原理和工作机制*/
public class SimpleThreadPool implements Executor {private final BlockingQueue<Runnable> workQueue;private final List<WorkerThread> workers;private volatile boolean isShutdown = false;private final int poolSize;/*** 创建线程池*/public SimpleThreadPool(int poolSize) {this.poolSize = poolSize;this.workQueue = new LinkedBlockingQueue<>();this.workers = new ArrayList<>(poolSize);System.out.println("初始化线程池,大小: " + poolSize);// 创建工作线程for (int i = 0; i < poolSize; i++) {WorkerThread worker = new WorkerThread("Pool-Worker-" + i);workers.add(worker);worker.start();}}/*** 提交任务到线程池*/@Overridepublic void execute(Runnable task) {if (isShutdown) {throw new RejectedExecutionException("线程池已关闭,拒绝新任务");}if (task == null) {throw new NullPointerException("任务不能为null");}try {workQueue.put(task); // 阻塞直到有空间System.out.println("任务已提交,队列大小: " + workQueue.size());} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RejectedExecutionException("提交任务时被中断", e);}}/*** 优雅关闭线程池*/public void shutdown() {System.out.println("开始关闭线程池...");isShutdown = true;// 中断所有工作线程for (WorkerThread worker : workers) {worker.interrupt();}}/*** 强制关闭线程池*/public void shutdownNow() {shutdown();workQueue.clear(); // 清空等待队列}/*** 等待线程池完全终止*/public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {long endTime = System.currentTimeMillis() + unit.toMillis(timeout);for (WorkerThread worker : workers) {long remaining = endTime - System.currentTimeMillis();if (remaining <= 0) {return false; // 超时}worker.join(remaining);}return true;}/*** 获取线程池状态*/public void printStatus() {System.out.println("线程池状态 - 工作线程: " + workers.size() + ", 等待任务: " + workQueue.size() + ", 已关闭: " + isShutdown);}/*** 工作线程实现*/private class WorkerThread extends Thread {public WorkerThread(String name) {super(name);}@Overridepublic void run() {System.out.println(getName() + " 开始运行");while (!isShutdown || !workQueue.isEmpty()) {try {// 从队列获取任务,支持超时以便检查关闭状态Runnable task = workQueue.poll(1, TimeUnit.SECONDS);if (task != null) {System.out.println(getName() + " 开始执行任务");task.run();System.out.println(getName() + " 任务执行完成");}} catch (InterruptedException e) {// 响应中断,退出线程System.out.println(getName() + " 收到中断信号");break;} catch (Exception e) {// 捕获任务执行异常,避免工作线程退出System.err.println(getName() + " 任务执行异常: " + e.getMessage());}}System.out.println(getName() + " 退出");}}/*** 测试线程池*/public static void main(String[] args) throws InterruptedException {SimpleThreadPool pool = new SimpleThreadPool(3);// 提交10个任务for (int i = 0; i < 10; i++) {final int taskId = i;pool.execute(() -> {System.out.println(Thread.currentThread().getName() + " 执行任务 " + taskId);try {Thread.sleep(1000); // 模拟任务执行} catch (InterruptedException e) {Thread.currentThread().interrupt();}});}// 查看状态pool.printStatus();// 等待任务执行Thread.sleep(5000);// 关闭线程池pool.shutdown();if (pool.awaitTermination(3, TimeUnit.SECONDS)) {System.out.println("线程池已完全关闭");} else {System.out.println("线程池关闭超时,强制关闭");pool.shutdownNow();}}
}
4.4 基于线程池的Web服务器
/*** 基于线程池的简易Web服务器* 演示线程池在实际项目中的应用*/
public class SimpleHttpServer {private final ExecutorService threadPool;private final ServerSocket serverSocket;private final String basePath;private volatile boolean isRunning = false;/*** 创建HTTP服务器*/public SimpleHttpServer(int port, int poolSize, String basePath) throws IOException {this.threadPool = Executors.newFixedThreadPool(poolSize);this.serverSocket = new ServerSocket(port);this.basePath = basePath;// 确保基础路径存在File baseDir = new File(basePath);if (!baseDir.exists() || !baseDir.isDirectory()) {throw new IllegalArgumentException("基础路径不存在或不是目录: " + basePath);}}/*** 启动服务器*/public void start() {if (isRunning) {throw new IllegalStateException("服务器已经在运行");}isRunning = true;System.out.println("HTTP服务器启动,端口: " + serverSocket.getLocalPort() + ", 基础路径: " + basePath);// 主接受循环Thread acceptorThread = new Thread(this::acceptConnections, "Server-Acceptor");acceptorThread.setDaemon(false);acceptorThread.start();}/*** 接受客户端连接*/private void acceptConnections() {while (isRunning) {try {Socket clientSocket = serverSocket.accept();System.out.println("接受客户端连接: " + clientSocket.getInetAddress().getHostAddress());// 提交到线程池处理threadPool.execute(new HttpHandler(clientSocket, basePath));} catch (IOException e) {if (isRunning) {System.err.println("接受连接错误: " + e.getMessage());}// 服务器关闭时的异常是正常的}}System.out.println("服务器停止接受新连接");}/*** 停止服务器*/public void stop() {System.out.println("正在停止服务器...");isRunning = false;try {serverSocket.close();} catch (IOException e) {System.err.println("关闭ServerSocket错误: " + e.getMessage());}// 优雅关闭线程池threadPool.shutdown();try {if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) {System.out.println("强制关闭线程池");threadPool.shutdownNow();}} catch (InterruptedException e) {threadPool.shutdownNow();Thread.currentThread().interrupt();}System.out.println("服务器已停止");}/*** HTTP请求处理器*/private static class HttpHandler implements Runnable {private final Socket socket;private final String basePath;public HttpHandler(Socket socket, String basePath) {this.socket = socket;this.basePath = basePath;}@Overridepublic void run() {// 使用ThreadLocal记录请求上下文ThreadLocal<String> requestId = ThreadLocal.withInitial(() -> UUID.randomUUID().toString().substring(0, 8));try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter out = new PrintWriter(socket.getOutputStream())) {String requestIdValue = requestId.get();System.out.println("[" + requestIdValue + "] 开始处理请求");// 解析HTTP请求String requestLine = in.readLine();if (requestLine == null || requestLine.isEmpty()) {sendError(out, 400, "Bad Request");return;}String[] parts = requestLine.split(" ");if (parts.length < 2) {sendError(out, 400, "Bad Request");return;}String method = parts[0];String path = parts[1];System.out.println("[" + requestIdValue + "] " + method + " " + path);// 只处理GET请求if (!"GET".equals(method)) {sendError(out, 405, "Method Not Allowed");return;}// 处理请求路径handleRequest(path, out, requestIdValue);} catch (IOException e) {System.err.println("处理请求IO错误: " + e.getMessage());} finally {// 清理ThreadLocalrequestId.remove();try {socket.close();} catch (IOException e) {// 忽略关闭异常}}}private void handleRequest(String path, PrintWriter out, String requestId) {try {// 简单路径安全校验if (path.contains("..")) {sendError(out, 403, "Forbidden");return;}// 默认页面if ("/".equals(path)) {path = "/index.html";}File file = new File(basePath + path);// 文件不存在if (!file.exists() || !file.isFile()) {sendError(out, 404, "Not Found");return;}// 安全检查:确保文件在基础路径内if (!file.getCanonicalPath().startsWith(new File(basePath).getCanonicalPath())) {sendError(out, 403, "Forbidden");return;}// 根据文件类型设置Content-TypeString contentType = getContentType(file.getName());// 读取文件内容byte[] content = Files.readAllBytes(file.toPath());// 发送HTTP响应out.println("HTTP/1.1 200 OK");out.println("Server: SimpleHttpServer");out.println("Content-Type: " + contentType);out.println("Content-Length: " + content.length);out.println("Connection: close");out.println(); // 空行分隔头部和主体out.flush();// 发送文件内容socket.getOutputStream().write(content);socket.getOutputStream().flush();System.out.println("[" + requestId + "] 响应发送完成,文件: " + file.getName());} catch (IOException e) {System.err.println("[" + requestId + "] 处理请求错误: " + e.getMessage());sendError(out, 500, "Internal Server Error");}}private String getContentType(String filename) {if (filename.endsWith(".html") || filename.endsWith(".htm")) {return "text/html; charset=UTF-8";} else if (filename.endsWith(".css")) {return "text/css";} else if (filename.endsWith(".js")) {return "application/javascript";} else if (filename.endsWith(".jpg") || filename.endsWith(".jpeg")) {return "image/jpeg";} else if (filename.endsWith(".png")) {return "image/png";} else {return "application/octet-stream";}}private void sendError(PrintWriter out, int code, String message) {out.println("HTTP/1.1 " + code + " " + message);out.println("Content-Type: text/html");out.println("Connection: close");out.println();out.println("<html><body><h1>" + code + " " + message + "</h1></body></html>");out.flush();}}/*** 启动服务器示例*/public static void main(String[] args) {try {// 创建服务器,端口8080,线程池大小10,基础路径为当前目录SimpleHttpServer server = new SimpleHttpServer(8080, 10, ".");server.start();System.out.println("服务器已启动,访问 http://localhost:8080/");System.out.println("按Enter键停止服务器...");// 等待用户输入停止服务器System.in.read();server.stop();} catch (Exception e) {System.err.println("服务器启动失败: " + e.getMessage());e.printStackTrace();}}
}
5. 性能优化与最佳实践
5.1 线程池大小配置策略
/*** 线程池配置策略* 根据任务类型合理配置线程池参数*/
public class ThreadPoolConfig {/*** CPU密集型任务配置* 特点:大量计算,很少IO等待* 策略:线程数 ≈ CPU核心数,避免过多线程竞争CPU*/public static ExecutorService newCpuIntensivePool() {int coreCount = Runtime.getRuntime().availableProcessors();int threadCount = coreCount + 1; // +1 确保CPU不会空闲System.out.println("CPU密集型线程池: " + threadCount + " 线程");return Executors.newFixedThreadPool(threadCount);}/*** IO密集型任务配置* 特点:大量等待(网络、磁盘IO)* 策略:线程数 ≈ CPU核心数 * (1 + 等待时间/计算时间)*/public static ExecutorService newIoIntensivePool() {int coreCount = Runtime.getRuntime().availableProcessors();int threadCount = coreCount * 2; // 经验值,可根据实际情况调整System.out.println("IO密集型线程池: " + threadCount + " 线程");return Executors.newFixedThreadPool(threadCount);}/*** 混合型任务配置* 根据CPU和IO比例动态调整*/public static ExecutorService newMixedPool(double cpuRatio, double ioRatio) {int coreCount = Runtime.getRuntime().availableProcessors();int threadCount = (int) (coreCount * cpuRatio + ioRatio);threadCount = Math.max(1, Math.min(threadCount, 100)); // 合理范围限制System.out.println("混合型线程池: " + threadCount + " 线程");return Executors.newFixedThreadPool(threadCount);}/*** 自定义线程池 - 更精细的控制*/public static ThreadPoolExecutor newCustomPool(int corePoolSize, int maxPoolSize,long keepAliveTime,int queueSize) {return new ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveTime,TimeUnit.SECONDS,new LinkedBlockingQueue<>(queueSize),new CustomThreadFactory(),new CustomRejectionPolicy());}/*** 自定义线程工厂,设置更有意义的线程名称*/static class CustomThreadFactory implements ThreadFactory {private final AtomicInteger counter = new AtomicInteger(1);@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r, "CustomPool-Thread-" + counter.getAndIncrement());thread.setDaemon(false);thread.setPriority(Thread.NORM_PRIORITY);return thread;}}/*** 自定义拒绝策略*/static class CustomRejectionPolicy implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.err.println("任务被拒绝,当前活跃线程: " + executor.getActiveCount() + ", 队列大小: " + executor.getQueue().size());// 可以记录日志、发送告警等throw new RejectedExecutionException("线程池已满,拒绝新任务");}}
}
5.2 避免常见陷阱
1. 死锁预防与检测
/*** 死锁预防示例* 演示如何避免和检测死锁*/
public class DeadlockPrevention {/*** 死锁产生示例 - 错误的锁顺序*/public static class DeadlockExample {private final Object lock1 = new Object();private final Object lock2 = new Object();public void method1() {synchronized (lock1) {System.out.println(Thread.currentThread().getName() + " 获得 lock1");try { Thread.sleep(100); } catch (InterruptedException e) {}synchronized (lock2) { // 可能死锁System.out.println(Thread.currentThread().getName() + " 获得 lock2");}}}public void method2() {synchronized (lock2) { // 不同的锁顺序System.out.println(Thread.currentThread().getName() + " 获得 lock2");try { Thread.sleep(100); } catch (InterruptedException e) {}synchronized (lock1) { // 可能死锁System.out.println(Thread.currentThread().getName() + " 获得 lock1");}}}}/*** 死锁预防 - 统一的锁顺序*/public static class DeadlockPreventionExample {private final Object lock1 = new Object();private final Object lock2 = new Object();/*** 使用统一的锁获取顺序来预防死锁* 总是先获取lock1,再获取lock2*/public void method1() {synchronized (lock1) {System.out.println(Thread.currentThread().getName() + " 获得 lock1");try { Thread.sleep(100); } catch (InterruptedException e) {}synchronized (lock2) {System.out.println(Thread.currentThread().getName() + " 获得 lock2");// 业务逻辑}}}public void method2() {synchronized (lock1) { // 相同的锁顺序System.out.println(Thread.currentThread().getName() + " 获得 lock1");try { Thread.sleep(100); } catch (InterruptedException e) {}synchronized (lock2) {System.out.println(Thread.currentThread().getName() + " 获得 lock2");// 业务逻辑}}}}/*** 使用tryLock避免死锁*/public static class TryLockExample {private final Lock lock1 = new ReentrantLock();private final Lock lock2 = new ReentrantLock();public boolean tryDoWork() {// 尝试获取第一个锁if (lock1.tryLock()) {try {System.out.println(Thread.currentThread().getName() + " 获得 lock1");// 尝试获取第二个锁if (lock2.tryLock()) {try {System.out.println(Thread.currentThread().getName() + " 获得 lock2");// 执行业务逻辑return true;} finally {lock2.unlock();}}} finally {lock1.unlock();}}return false; // 获取锁失败}}/*** 死锁检测工具*/public static void detectDeadlock() {ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();long[] threadIds = threadBean.findDeadlockedThreads();if (threadIds != null) {System.err.println("检测到死锁!");ThreadInfo[] threadInfos = threadBean.getThreadInfo(threadIds);for (ThreadInfo threadInfo : threadInfos) {System.err.println("死锁线程: " + threadInfo.getThreadName());System.err.println("等待锁: " + threadInfo.getLockName());System.err.println("被线程持有: " + threadInfo.getLockOwnerName());}} else {System.out.println("未检测到死锁");}}
}
2. 资源清理最佳实践
/*** 资源清理最佳实践* 演示如何正确管理和清理多线程资源*/
public class ResourceCleanup implements AutoCloseable {private final ExecutorService executor;private final List<AutoCloseable> resources;public ResourceCleanup(int threadPoolSize) {this.executor = Executors.newFixedThreadPool(threadPoolSize);this.resources = new ArrayList<>();System.out.println("资源管理器初始化完成,线程池大小: " + threadPoolSize);}/*** 提交任务*/public <T> Future<T> submit(Callable<T> task) {return executor.submit(task);}/*** 注册需要管理的资源*/public void registerResource(AutoCloseable resource) {synchronized (resources) {resources.add(resource);}}/*** 实现AutoCloseable,支持try-with-resources*/@Overridepublic void close() {System.out.println("开始清理资源...");// 1. 关闭线程池shutdownExecutor();// 2. 关闭所有注册的资源closeRegisteredResources();System.out.println("资源清理完成");}/*** 优雅关闭线程池*/private void shutdownExecutor() {executor.shutdown(); // 停止接受新任务try {// 等待现有任务完成if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {System.out.println("线程池关闭超时,尝试强制关闭");// 取消所有未开始的任务executor.shutdownNow();// 再次等待if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {System.err.println("线程池无法完全关闭");}}} catch (InterruptedException e) {// 重新中断并强制关闭executor.shutdownNow();Thread.currentThread().interrupt();}}/*** 关闭所有注册的资源*/private void closeRegisteredResources() {synchronized (resources) {for (AutoCloseable resource : resources) {try {resource.close();} catch (Exception e) {System.err.println("关闭资源时出错: " + e.getMessage());// 继续关闭其他资源,不抛出异常}}resources.clear();}}/*** 使用示例*/public static void main(String[] args) {// 使用try-with-resources确保资源清理try (ResourceCleanup manager = new ResourceCleanup(3)) {// 注册一些资源manager.registerResource(() -> System.out.println("关闭数据库连接"));manager.registerResource(() -> System.out.println("关闭网络连接"));// 提交任务Future<String> future = manager.submit(() -> {Thread.sleep(1000);return "任务完成";});// 获取结果String result = future.get();System.out.println("任务结果: " + result);} catch (Exception e) {System.err.println("执行出错: " + e.getMessage());}// 这里会自动调用close()方法清理资源}
}
6. 总结与核心要点
6.1 关键知识点回顾
1. 中断异常处理的核心理解
try {Thread.sleep(1000);
} catch (InterruptedException e) {/*** 必须重新设置中断状态的原因:* 1. 当阻塞方法抛出InterruptedException时,会清除线程的中断状态* 2. 如果不重新设置,调用者无法知道线程曾被中断* 3. 这破坏了中断的传播机制*/Thread.currentThread().interrupt(); // 恢复中断状态// 或者直接抛出异常:throw new RuntimeException(e);
}
2. ThreadLocal内存管理
public void usingThreadLocal() {try {threadLocal.set(someValue);// 使用threadLocal} finally {/*** 必须清理ThreadLocal的原因:* 1. ThreadLocalMap使用弱引用作为key,但value是强引用* 2. 如果线程长时间存活(线程池),value不会被GC回收* 3. 导致内存泄漏,特别是存储大对象时*/threadLocal.remove(); // 必须调用!}
}
6.2 最佳实践清单
- 线程命名:为所有线程设置有意义的名字
- 异常处理:在Runnable.run()中捕获所有异常
- 资源清理:使用try-finally或try-with-resources
- 中断响应:合理处理InterruptedException
- 锁顺序:统一锁获取顺序避免死锁
- 线程池:优先使用线程池而非直接创建线程
- volatile:仅用于简单的状态标志
- ThreadLocal清理:使用后必须调用remove()
6.3 性能调优建议
| 场景 | 推荐配置 | 说明 |
|---|---|---|
| CPU密集型 | 线程数 = CPU核心数 + 1 | 减少线程切换开销 |
| IO密集型 | 线程数 = CPU核心数 × 2 | 充分利用等待时间 |
| 混合型 | 根据监控动态调整 | 结合实际负载 |
6.4 常见问题排查
- 死锁检测:使用jstack或ThreadMXBean
- 内存泄漏:检查ThreadLocal使用,特别是线程池场景
- CPU过高:检查是否存在忙等待或过多线程竞争
- 响应慢:检查锁竞争、IO阻塞或线程池配置
掌握这些Java并发编程的基础知识和最佳实践,能够帮助开发者构建出高性能、高可靠的多线程应用程序。记住,并发编程的核心在于正确的同步、合理的资源管理和清晰的线程通信。