引言:为什么需要多线程?
在当今多核处理器普及的时代,充分利用计算资源已成为提升程序性能的关键。Java多线程允许程序同时执行多个任务,显著提高应用程序的响应速度和处理能力。无论是Web服务器处理并发请求,还是大数据处理中的并行计算,多线程技术都扮演着重要角色。
一、线程基础概念
1.1 进程与线程的区别
// 简单比喻:进程是工厂,线程是工厂中的流水线 public class ProcessVsThread { public static void main(String[] args) { // 进程:拥有独立的内存空间 // 线程:共享进程的内存空间,轻量级执行单元 System.out.println("进程 - 独立内存空间,重量级"); System.out.println("线程 - 共享内存空间,轻量级"); } }1.2 Java线程的生命周期
新建(New) → 就绪(Runnable) → 运行(Running) → 阻塞(Blocked) ↓ ↑ └────── 等待(Waiting) ←──────┘ ↓ 超时等待(Timed Waiting) ↓ 终止(Terminated)
二、创建线程的三种方式
2.1 继承Thread类(不推荐)只能继承一个类
public class MyThread extends Thread { @Override public void run() { System.out.println("线程执行: " + Thread.currentThread().getName()); } public static void main(String[] args) { MyThread thread1 = new MyThread(); MyThread thread2 = new MyThread(); thread1.start(); // 启动线程 thread2.start(); } }2.2 实现Runnable接口(推荐)可以有多个接口也能继承类
public class MyRunnable implements Runnable { @Override public void run() { for (int i = 0; i < 5; i++) { System.out.println(Thread.currentThread().getName() + ": " + i); try { Thread.sleep(500); // 模拟耗时操作 } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { Thread thread1 = new Thread(new MyRunnable(), "线程A"); Thread thread2 = new Thread(new MyRunnable(), "线程B"); thread1.start(); thread2.start(); } }2.3 实现Callable接口(带返回值)
import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; public class MyCallable implements Callable<Integer> { private final int number; public MyCallable(int number) { this.number = number; } @Override public Integer call() throws Exception { System.out.println("计算中... " + Thread.currentThread().getName()); Thread.sleep(1000); return number * number; // 返回计算结果 } public static void main(String[] args) throws Exception { FutureTask<Integer> futureTask = new FutureTask<>(new MyCallable(5)); Thread thread = new Thread(futureTask, "计算线程"); thread.start(); // 获取计算结果(会阻塞直到计算完成) Integer result = futureTask.get(); System.out.println("计算结果: " + result); } }三、线程同步与线程安全
3.1 同步问题示例:银行取款
public class BankAccount { private double balance; public BankAccount(double initialBalance) { this.balance = initialBalance; } // 线程不安全的方法 public void unsafeWithdraw(double amount) { if (balance >= amount) { try { Thread.sleep(100); // 模拟处理时间 } catch (InterruptedException e) { e.printStackTrace(); } balance -= amount; System.out.println(Thread.currentThread().getName() + " 取款 " + amount + ",余额: " + balance); } } // 使用synchronized保证线程安全 public synchronized void safeWithdraw(double amount) { if (balance >= amount) { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } balance -= amount; System.out.println(Thread.currentThread().getName() + " 取款 " + amount + ",余额: " + balance); } } }3.2 同步方法对比
public class SynchronizationDemo { private int counter = 0; // 1. 同步实例方法 public synchronized void incrementSyncMethod() { counter++; } // 2. 同步代码块 public void incrementSyncBlock() { synchronized(this) { counter++; } } // 3. 同步静态方法 public static synchronized void staticSyncMethod() { // 类级别的锁 } // 4. 使用Lock对象(更灵活) private final Lock lock = new ReentrantLock(); public void incrementWithLock() { lock.lock(); try { counter++; } finally { lock.unlock(); // 确保锁被释放 } } }3.3 volatile关键字
public class VolatileDemo { // volatile保证可见性,不保证原子性 private volatile boolean flag = true; public void stop() { flag = false; } public void work() { while (flag) { // 工作逻辑 } System.out.println("线程停止"); } }四、线程间通信
4.1 wait() 和 notify() 机制
public class ProducerConsumerExample { private final List<Integer> buffer = new LinkedList<>(); private final int CAPACITY = 5; public void produce() throws InterruptedException { int value = 0; while (true) { synchronized (this) { // 缓冲区满时等待 while (buffer.size() == CAPACITY) { wait(); } System.out.println("生产: " + value); buffer.add(value++); // 通知消费者 notify(); Thread.sleep(1000); } } } public void consume() throws InterruptedException { while (true) { synchronized (this) { // 缓冲区空时等待 while (buffer.isEmpty()) { wait(); } int value = buffer.remove(0); System.out.println("消费: " + value); // 通知生产者 notify(); Thread.sleep(1000); } } } }五、线程池(Thread Pool)
5.1 Executors工具类
import java.util.concurrent.*; public class ThreadPoolDemo { public static void main(String[] args) { // 1. 固定大小线程池 ExecutorService fixedPool = Executors.newFixedThreadPool(5); // 2. 单线程线程池 ExecutorService singleThreadPool = Executors.newSingleThreadExecutor(); // 3. 缓存线程池 ExecutorService cachedPool = Executors.newCachedThreadPool(); // 4. 定时任务线程池 ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(3); // 提交任务 for (int i = 0; i < 10; i++) { final int taskId = i; fixedPool.execute(() -> { System.out.println("执行任务 " + taskId + ",线程: " + Thread.currentThread().getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); } // 优雅关闭线程池 fixedPool.shutdown(); try { if (!fixedPool.awaitTermination(60, TimeUnit.SECONDS)) { fixedPool.shutdownNow(); } } catch (InterruptedException e) { fixedPool.shutdownNow(); } } }5.2 自定义线程池
public class CustomThreadPool { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, // 核心线程数 5, // 最大线程数 60L, // 空闲线程存活时间 TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), // 任务队列 Executors.defaultThreadFactory(), // 线程工厂 new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略 ); // 监控线程池状态 ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1); monitor.scheduleAtFixedRate(() -> { System.out.println("活跃线程数: " + executor.getActiveCount()); System.out.println("队列大小: " + executor.getQueue().size()); System.out.println("完成任务数: " + executor.getCompletedTaskCount()); }, 0, 1, TimeUnit.SECONDS); } }六、高级并发工具类
6.1 CountDownLatch(倒计时门闩)
public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { int workerCount = 5; CountDownLatch startLatch = new CountDownLatch(1); CountDownLatch endLatch = new CountDownLatch(workerCount); for (int i = 0; i < workerCount; i++) { new Thread(() -> { try { startLatch.await(); // 等待开始信号 System.out.println(Thread.currentThread().getName() + " 开始工作"); Thread.sleep((long) (Math.random() * 1000)); System.out.println(Thread.currentThread().getName() + " 完成工作"); endLatch.countDown(); // 任务完成 } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } System.out.println("准备开始所有工作..."); Thread.sleep(1000); startLatch.countDown(); // 发出开始信号 endLatch.await(); // 等待所有任务完成 System.out.println("所有工作完成!"); } }6.2 CyclicBarrier(循环屏障)
public class CyclicBarrierDemo { public static void main(String[] args) { int threadCount = 3; CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> System.out.println("所有线程到达屏障,继续执行")); for (int i = 0; i < threadCount; i++) { final int threadId = i; new Thread(() -> { try { System.out.println("线程" + threadId + " 准备阶段1"); Thread.sleep(1000); barrier.await(); System.out.println("线程" + threadId + " 准备阶段2"); Thread.sleep(1000); barrier.await(); System.out.println("线程" + threadId + " 完成"); } catch (Exception e) { e.printStackTrace(); } }).start(); } } }6.3 Semaphore(信号量)
public class SemaphoreDemo { public static void main(String[] args) { // 模拟3个停车位 Semaphore semaphore = new Semaphore(3); // 10辆车需要停车 for (int i = 1; i <= 10; i++) { final int carId = i; new Thread(() -> { try { System.out.println("车辆" + carId + " 等待停车"); semaphore.acquire(); System.out.println("车辆" + carId + " 停车成功"); Thread.sleep(2000); // 停车时间 System.out.println("车辆" + carId + " 离开"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } }七、并发集合
7.1 常用并发集合类
public class ConcurrentCollections { public static void main(String[] args) { // 1. ConcurrentHashMap(线程安全的HashMap) ConcurrentHashMap<String, Integer> concurrentMap = new ConcurrentHashMap<>(); concurrentMap.put("key1", 1); concurrentMap.putIfAbsent("key1", 2); // 不存在时才添加 // 2. CopyOnWriteArrayList(读多写少的场景) CopyOnWriteArrayList<String> copyOnWriteList = new CopyOnWriteArrayList<>(); copyOnWriteList.add("item1"); copyOnWriteList.addIfAbsent("item1"); // 线程安全的添加 // 3. ConcurrentLinkedQueue(无界线程安全队列) ConcurrentLinkedQueue<String> concurrentQueue = new ConcurrentLinkedQueue<>(); concurrentQueue.offer("task1"); String task = concurrentQueue.poll(); // 4. BlockingQueue(阻塞队列) BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(10); try { blockingQueue.put("element"); // 阻塞直到有空间 String element = blockingQueue.take(); // 阻塞直到有元素 } catch (InterruptedException e) { e.printStackTrace(); } } }八、最佳实践与常见问题
8.1 多线程最佳实践
public class BestPractices { // 1. 使用线程池而不是直接创建线程 private final ExecutorService executor = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() ); // 2. 使用局部变量,避免共享状态 public void process() { String localVariable = "安全"; // 线程安全 // ... } // 3. 使用不可变对象 public final class ImmutablePerson { private final String name; private final int age; public ImmutablePerson(String name, int age) { this.name = name; this.age = age; } // 只有getter,没有setter } // 4. 避免死锁 public void avoidDeadlock() { // 按固定顺序获取锁 Object lock1 = new Object(); Object lock2 = new Object(); Thread thread1 = new Thread(() -> { synchronized (lock1) { try { Thread.sleep(100); } catch (InterruptedException e) {} synchronized (lock2) { // 操作 } } }); Thread thread2 = new Thread(() -> { synchronized (lock1) { // 与thread1相同的顺序 try { Thread.sleep(100); } catch (InterruptedException e) {} synchronized (lock2) { // 操作 } } }); } }8.2 常见问题与调试
public class ThreadDebugging { // 1. 线程死锁检测 public static void detectDeadlock() { ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); long[] threadIds = threadMXBean.findDeadlockedThreads(); if (threadIds != null) { ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadIds); for (ThreadInfo threadInfo : threadInfos) { System.out.println("死锁线程: " + threadInfo.getThreadName()); } } } // 2. 线程堆栈分析 public static void printAllStackTraces() { Map<Thread, StackTraceElement[]> allStackTraces = Thread.getAllStackTraces(); for (Map.Entry<Thread, StackTraceElement[]> entry : allStackTraces.entrySet()) { Thread thread = entry.getKey(); System.out.println("\n线程: " + thread.getName() + " - 状态: " + thread.getState()); for (StackTraceElement element : entry.getValue()) { System.out.println(" " + element); } } } // 3. 使用ThreadLocal private static final ThreadLocal<SimpleDateFormat> dateFormatter = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd")); public static String formatDate(Date date) { return dateFormatter.get().format(date); } }九、CompletableFuture(Java 8+)
public class CompletableFutureDemo { public static void main(String[] args) throws Exception { // 1. 创建异步任务 CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "结果1"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "结果2"; }); // 2. 组合多个Future CompletableFuture<String> combined = future1.thenCombine(future2, (result1, result2) -> result1 + " + " + result2); // 3. 异步回调 combined.thenAccept(result -> { System.out.println("最终结果: " + result); }); // 4. 异常处理 CompletableFuture.supplyAsync(() -> { if (Math.random() > 0.5) { throw new RuntimeException("模拟异常"); } return "成功"; }).exceptionally(ex -> { System.out.println("处理异常: " + ex.getMessage()); return "默认值"; }); // 等待所有任务完成 CompletableFuture.allOf(future1, future2).join(); } }十、实战案例:并发下载器
public class ConcurrentDownloader { private final ExecutorService executor; private final int maxConcurrentDownloads; public ConcurrentDownloader(int maxConcurrentDownloads) { this.maxConcurrentDownloads = maxConcurrentDownloads; this.executor = Executors.newFixedThreadPool(maxConcurrentDownloads); } public void downloadFiles(List<String> urls) { CountDownLatch latch = new CountDownLatch(urls.size()); List<Future<File>> futures = new ArrayList<>(); for (String url : urls) { Callable<File> downloadTask = () -> { try { System.out.println("开始下载: " + url); // 模拟下载过程 Thread.sleep((long) (Math.random() * 3000)); System.out.println("下载完成: " + url); return new File(url.substring(url.lastIndexOf('/') + 1)); } finally { latch.countDown(); } }; futures.add(executor.submit(downloadTask)); } try { // 等待所有下载完成 latch.await(); System.out.println("所有文件下载完成"); // 处理下载结果 for (Future<File> future : futures) { try { File file = future.get(); // 处理文件 } catch (ExecutionException e) { System.err.println("下载失败: " + e.getCause()); } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } public void shutdown() { executor.shutdown(); try { if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { executor.shutdownNow(); } } catch (InterruptedException e) { executor.shutdownNow(); } } }结语
Java多线程编程是一个强大但需要谨慎使用的工具。掌握多线程技术需要理解:
基础概念:线程生命周期、同步机制
高级特性:线程池、并发工具类
最佳实践:避免死锁、减少锁竞争
调试技巧:分析线程转储、监控线程状态
在实际开发中,建议:
优先使用高层并发工具(如Executor框架)
尽量使用不可变对象和线程安全集合
合理设置线程池参数
编写可测试的并发代码
记住:始终将线程安全放在首位。