package com. demo. studydemo. test ; import java. util. ArrayList ;
import java. util. List ;
import java. util. concurrent. CountDownLatch ;
import java. util. concurrent. ExecutorService ;
import java. util. concurrent. Executors ;
public class BatchDataProcessingExample { public static void main ( String [ ] args) throws InterruptedException { int dataSize = 1_000_000 ; int batchSize = 100_000 ; int threadCount = ( dataSize + batchSize - 1 ) / batchSize; System . out. println ( "需要启动" + threadCount + "个线程处理数据" ) ; List < Integer > data = new ArrayList < > ( dataSize) ; for ( int i = 0 ; i < dataSize; i++ ) { data. add ( i) ; } CountDownLatch latch = new CountDownLatch ( threadCount) ; ExecutorService executor = Executors . newFixedThreadPool ( threadCount) ; for ( int i = 0 ; i < threadCount; i++ ) { int start = i * batchSize; int end = Math . min ( start + batchSize, dataSize) ; List < Integer > subList = data. subList ( start, end) ; executor. submit ( new DataProcessor ( subList, latch) ) ; } latch. await ( ) ; System . out. println ( "所有数据处理完成" ) ; executor. shutdown ( ) ; } static class DataProcessor implements Runnable { private final List < Integer > dataList; private final CountDownLatch latch; public DataProcessor ( List < Integer > dataList, CountDownLatch latch) { this . dataList = dataList; this . latch = latch; } @Override public void run ( ) { processData ( dataList) ; latch. countDown ( ) ; System . out. println ( "当前线程" + Thread . currentThread ( ) . getName ( ) + "完成数据处理" ) ; } private void processData ( List < Integer > sublist) { System . out. println ( "线程 " + Thread . currentThread ( ) . getName ( ) + " 正在处理数据范围: " + sublist. get ( 0 ) + " 到 " + sublist. get ( sublist. size ( ) - 1 ) ) ; } }
}
在Java 中,通过`ExecutorService `接口创建的线程池提供了两种主要的方法来提交任务:`submit ( ) `和`execute ( ) `。这两种方法都可以用来执行`Runnable `或`Callable `任务,但它们之间存在一些差异:1. * * execute ( ) * * :- * * 参数* * :接受一个`Runnable `类型的任务。- * * 返回值* * :`void `,也就是说它不返回任何结果。- * * 异常处理* * :它不提供直接获取任务执行时抛出异常的机制。如果任务执行期间抛出了异常,它会被传递给线程池的`uncaughtExceptionHandler`处理,而不是直接暴露给调用者。- * * 用途* * :通常用于不需要关注任务执行结果的场景,或者通过其他方式(如Future 、回调等)来处理结果或异常。2. * * submit ( ) * * :- * * 参数* * :可以接受`Runnable `或`Callable `类型的任务。如果传入的是`Callable `,那么`submit ( ) `会返回一个`Future `对象。- * * 返回值* * :对于`Runnable `,返回一个表示任务执行完成的`Future < Void > `;对于`Callable `,返回一个`Future < T > `,其中`T `是`Callable `任务返回的结果类型。- * * 异常处理* * :如果任务执行期间抛出了异常,这个异常会被封装进返回的`Future `对象中,调用者可以通过`Future . get ( ) `方法来获取异常或结果,从而可以更精细地控制异常处理逻辑。- * * 用途* * :适用于需要获取任务执行结果或需要处理任务执行过程中可能抛出异常的场景。`Future `提供了检查任务是否完成、取消任务以及获取结果的方法。* * 总结* * :
- 如果你不需要关心任务的执行结果,或者任务没有结果需要返回,可以使用`execute ( ) `。
- 如果你需要获得任务的执行结果,或者需要能够取消任务、检查任务状态等高级功能,应该使用`submit ( ) `,尤其是当你提交的是`Callable `任务时。通过`submit ( ) `得到的`Future `对象为你提供了更多的灵活性和控制力。