结构化并发

结构化并发:让并发编程重回简单与可靠

什么是结构化并发?

结构化并发是一种编程范式,它要求并发任务(如线程、协程)必须在明确的作用域内开始和结束,其中外部作用域的生命周期决定其内部所有并发子任务的生命周期。

核心思想:父作用域在所有子任务完成之前不会结束,任何子任务都不会超出父作用域的生命周期。

为什么需要结构化并发?

传统并发编程的问题

先看一个典型的非结构化并发示例:

java

1
2
3
4
5
6
7
8
9
10
11
12
// 问题代码:任务泄漏
void processOrder() {
// 启动后台线程发送通知
new Thread(() -> {
sendEmailNotification(); // 可能执行很长时间
}).start();

processPayment();
updateInventory();
System.out.println("Order processed!");
// 方法结束,但邮件线程可能仍在运行!
}

非结构化并发的主要缺陷

  1. 资源泄漏:无人管理的线程持续消耗系统资源
  2. 任务泄漏:父任务结束后,子任务像”幽灵”一样继续运行
  3. 难以维护:无法清晰追踪系统中存在的并发任务
  4. 取消困难:需要手动管理线程中断,容易出错
  5. 调试困难:并发流程不清晰,问题难以定位

结构化并发如何解决问题?

核心机制

结构化并发通过作用域绑定来解决这些问题:

  • 父作用域作为管理者
  • 子任务作为工作者
  • 严格的父子关系确保完整的生命周期管理

生动比喻:幼儿园老师

  • 非结构化并发:老师下班回家,孩子们在操场无人看管
  • 结构化并发:老师全程监督,确保所有孩子集合后才一起离开

实际代码示例

Java 中的结构化并发(JDK 19+)

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.concurrent.*;

public class StructuredConcurrencyExample {

public OrderResult processOrderStructured(Order order)
throws ExecutionException, InterruptedException {

// 创建明确的作用域
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

// 在作用域内启动并发任务
Future<PaymentResult> paymentFuture =
scope.fork(() -> processPayment(order));
Future<InventoryUpdate> inventoryFuture =
scope.fork(() -> updateInventory(order));
Future<NotificationResult> notificationFuture =
scope.fork(() -> sendNotification(order));

// 等待所有任务完成或任何一个失败
scope.join();
scope.throwIfFailed(); // 如果有任务失败,抛出异常

// 安全地获取结果
PaymentResult payment = paymentFuture.resultNow();
InventoryUpdate inventory = inventoryFuture.resultNow();
NotificationResult notification = notificationFuture.resultNow();

return new OrderResult(payment, inventory, notification);

} // 作用域结束,自动确保所有任务完成
}
}

Kotlin 协程示例

kotlin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
suspend fun processOrder(order: Order): OrderResult = coroutineScope {
// 所有并发任务都在这个作用域内
val paymentDeferred = async { processPayment(order) }
val inventoryDeferred = async { updateInventory(order) }
val notificationDeferred = async { sendNotification(order) }

// 自动等待所有任务完成
OrderResult(
payment = paymentDeferred.await(),
inventory = inventoryDeferred.await(),
notification = notificationDeferred.await()
)
// 作用域结束,所有子协程自动结束
}

关键特性与优势

1. 自动生命周期管理

java

1
2
3
4
5
6
7
try (var scope = new StructuredTaskScope<>()) {
Future<String> task1 = scope.fork(() -> doWork1());
Future<String> task2 = scope.fork(() -> doWork2());

scope.join();
// 明确的任务边界 - 不会有意外的后台任务
}

2. 可靠的错误处理

  • 自动传播:子任务失败自动向上传播
  • 协同取消:一个任务失败,自动取消相关任务
  • 统一处理:集中的异常处理点

3. 资源安全

java

1
2
3
4
// 使用 try-with-resources 确保资源清理
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 并发任务...
} // 自动清理所有资源

4. 可观测性

  • 清晰的调用链路
  • 明确的任务关系
  • 简化的监控和调试

不同语言的实现

Java

  • 机制StructuredTaskScope (JDK 19+)
  • 特点:显式作用域管理,强类型
  • 适用:服务端应用,大型系统

Kotlin

  • 机制:协程 + coroutineScope 构建器
  • 特点:语言级原生支持,语法简洁
  • 适用:Android,全栈开发

Go

  • 机制:goroutine + context + sync.WaitGroup
  • 特点:轻量级,需手动实现模式
  • 适用:网络服务,CLI工具

Swift

  • 机制:Async/Await + TaskGroup
  • 特点:Apple生态系统,类型安全
  • 适用:iOS/macOS应用

最佳实践

1. 合理设计作用域

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 根据业务逻辑划分作用域
public class OrderService {
public CompletableFuture<OrderResult> processComplexOrder(Order order) {
try (var outerScope = new StructuredTaskScope.ShutdownOnFailure()) {

// 第一阶段:验证和基础处理
try (var validationScope = new StructuredTaskScope<>()) {
Future<Boolean> stockCheck = validationScope.fork(() -> checkStock(order));
Future<Boolean> fraudCheck = validationScope.fork(() -> fraudDetection(order));
validationScope.join();
}

// 第二阶段:执行核心业务
Future<Payment> payment = outerScope.fork(() -> processPayment(order));
Future<Shipping> shipping = outerScope.fork(() -> arrangeShipping(order));

outerScope.join();
return CompletableFuture.completedFuture(combineResults(payment, shipping));
}
}
}

2. 合理的超时和取消

java

1
2
3
4
5
6
7
8
9
10
11
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<String> result = scope.fork(() -> callExternalService());

// 设置超时
scope.joinUntil(Instant.now().plusSeconds(30));

if (!result.isDone()) {
scope.shutdown(); // 触发取消
throw new TimeoutException("Operation timed out");
}
}

3. 错误处理策略

  • 快速失败:使用 ShutdownOnFailure
  • 首个成功:使用 ShutdownOnSuccess
  • 自定义策略:根据业务需求定制

应用场景

1. Web 请求处理

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@RestController
public class OrderController {

@PostMapping("/orders")
public ResponseEntity<OrderResponse> createOrder(@RequestBody OrderRequest request) {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<ValidationResult> validation = scope.fork(() -> validateRequest(request));
Future<PriceCalculation> pricing = scope.fork(() -> calculatePrice(request));
Future<InventoryCheck> inventory = scope.fork(() -> checkInventory(request));

scope.join();
scope.throwIfFailed();

return ResponseEntity.ok(buildResponse(validation, pricing, inventory));
} catch (Exception e) {
return ResponseEntity.badRequest().build();
}
}
}

2. 批量数据处理

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class BatchProcessor {
public void processBatch(List<DataItem> items) {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

List<Future<ProcessResult>> futures = items.stream()
.map(item -> scope.fork(() -> processItem(item)))
.collect(Collectors.toList());

scope.join();

List<ProcessResult> results = futures.stream()
.map(Future::resultNow)
.collect(Collectors.toList());

// 处理结果...
}
}
}

总结

结构化并发通过简单的原则——将并发任务的生命周期与代码作用域绑定,解决了传统并发编程中的根本问题:

带来的变革

  • 消除任务泄漏 - 没有意外的后台任务
  • 自动化资源管理 - 无需手动清理
  • 简化错误处理 - 统一的异常传播机制
  • 提升代码可读性 - 清晰的并发结构
  • 增强系统可靠性 - 确定性的行为

核心价值

结构化并发让并发编程重新变得简单可靠,使开发者能够像编写顺序代码一样自然地编写并发代码,同时享受并发带来的性能优势。

关键洞见:通过约束带来自由。结构化的约束反而让复杂的并发编程变得简单可控。


结构化并发
http://example.com/2025/11/11/architect/结构化并发/
作者
Holy
发布于
2025年11月11日
许可协议