Java并发包 java.util.concurrent
深入解析
简介
在多线程编程的领域中,Java提供了强大的 java.util.concurrent
包(以下简称 concurrent
包),它包含了一系列用于处理并发编程的工具和类。这个包极大地简化了开发人员在编写多线程应用程序时面临的复杂任务,例如线程池管理、并发集合操作、同步控制等。本文将深入探讨 java.util.concurrent
的基础概念、使用方法、常见实践以及最佳实践,帮助读者更好地理解和运用这一强大的工具集。
目录
- 基础概念
- 线程池
- 并发集合
- 同步工具
- 使用方法
- 线程池的使用
- 并发集合的操作
- 同步工具的应用
- 常见实践
- 任务并行化
- 生产者 - 消费者模型
- 最佳实践
- 合理使用线程池
- 避免死锁
- 选择合适的并发集合
- 小结
- 参考资料
基础概念
线程池
线程池是 concurrent
包中一个核心的概念。它预先创建一定数量的线程,并将这些线程放入一个线程池中进行管理。当有任务提交时,线程池会从池中取出一个空闲线程来执行该任务。这样可以避免频繁创建和销毁线程带来的性能开销,提高系统的并发处理能力。
并发集合
concurrent
包提供了一系列线程安全的集合类,称为并发集合。这些集合在多线程环境下可以安全地进行读写操作,无需额外的同步机制。常见的并发集合包括 ConcurrentHashMap
、CopyOnWriteArrayList
等。
同步工具
同步工具用于协调多个线程之间的执行顺序和共享资源的访问。例如,CountDownLatch
可以让一个或多个线程等待直到一组操作完成;CyclicBarrier
可以使一组线程互相等待,直到所有线程都到达某个屏障点。
使用方法
线程池的使用
以下是一个使用 ThreadPoolExecutor
创建线程池并提交任务的简单示例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExample {
public static void main(String[] args) {
// 创建一个固定大小的线程池,包含3个线程
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
final int taskNumber = i;
executorService.submit(() -> {
System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + taskNumber + " has finished");
});
}
// 关闭线程池
executorService.shutdown();
}
}
并发集合的操作
以 ConcurrentHashMap
为例,以下是在多线程环境下使用它的示例:
import java.util.concurrent.ConcurrentHashMap;
public class ConcurrentHashMapExample {
public static void main(String[] args) {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 10; i++) {
map.put("key" + i, i);
}
});
Thread thread2 = new Thread(() -> {
for (int i = 10; i < 20; i++) {
map.put("key" + i, i);
}
});
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(map);
}
}
同步工具的应用
使用 CountDownLatch
实现让主线程等待所有子线程完成任务:
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) {
int threadCount = 3;
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int taskNumber = i;
new Thread(() -> {
System.out.println("Task " + taskNumber + " started");
try {
// 模拟任务执行
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + taskNumber + " finished");
latch.countDown();
}).start();
}
try {
// 主线程等待所有子线程完成
latch.await();
System.out.println("All tasks have finished");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
常见实践
任务并行化
在处理大量独立任务时,可以将任务分配到线程池中并行执行,以提高处理效率。例如,对一个大数组中的每个元素进行独立的计算操作:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class TaskParallelizationExample {
public static void main(String[] args) {
int[] numbers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int number : numbers) {
final int num = number;
executorService.submit(() -> {
int result = num * num;
System.out.println(num + " squared is " + result);
});
}
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
生产者 - 消费者模型
使用 BlockingQueue
实现生产者 - 消费者模型:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
class Producer implements Runnable {
private final BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
queue.put(i);
System.out.println("Produced: " + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
Integer item = queue.take();
System.out.println("Consumed: " + item);
if (item == 9) {
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class ProducerConsumerExample {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
Thread producerThread = new Thread(new Producer(queue));
Thread consumerThread = new Thread(new Consumer(queue));
producerThread.start();
consumerThread.start();
try {
producerThread.join();
consumerThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
最佳实践
合理使用线程池
- 根据任务的类型(CPU 密集型、I/O 密集型)和数量来选择合适的线程池类型和大小。
- 避免创建过多线程导致系统资源耗尽,也不要线程数量过少导致任务处理效率低下。
避免死锁
- 确保线程获取锁的顺序一致,避免循环依赖锁。
- 使用
Lock
接口的tryLock
方法来尝试获取锁,设置合理的超时时间,防止线程无限期等待。
选择合适的并发集合
- 根据实际需求选择合适的并发集合,例如
ConcurrentHashMap
适用于高并发读写的场景,CopyOnWriteArrayList
适用于读多写少的场景。
小结
java.util.concurrent
包为 Java 开发者提供了丰富的工具和类,用于处理多线程编程中的各种复杂问题。通过深入理解线程池、并发集合和同步工具等基础概念,并掌握它们的使用方法和常见实践,以及遵循最佳实践原则,开发人员可以编写出高效、安全且易于维护的多线程应用程序。
参考资料
- Java官方文档 - java.util.concurrent
- 《Effective Java》第三版
- 《Java并发编程实战》