# @Transactional多线程环境失效详解
# 为什么在多线程环境下会失效?
Spring 的事务管理是基于 ThreadLocal 实现的:
// Spring 使用 ThreadLocal 存储事务上下文
public abstract class TransactionSynchronizationManager {
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<>("Transaction synchronizations");
private static final ThreadLocal<String> currentTransactionName =
new NamedThreadLocal<>("Current transaction name");
}
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
核心原理:每个线程都有自己的事务上下文,线程间不共享。
# 案例1:异步线程中事务失效
@Service
public class UserService {
@Autowired
private UserRepository userRepository;
@Transactional
public void createUserInNewThread() {
new Thread(() -> {
// 注意:这个操作不在事务中!
userRepository.save(new User("异步用户"));
// 这里会抛出异常,但事务不会回滚
if (true) {
throw new RuntimeException("测试回滚");
}
}).start();
}
@Transactional
public void updateWithAsync() {
// 主线程的操作在事务中
userRepository.updateUser(1L, "主线程更新");
// 异步任务
CompletableFuture.runAsync(() -> {
// 注意:异步线程无法继承主线程的事务上下文
userRepository.save(new User("异步线程用户"));
throw new RuntimeException("异步异常"); // 不会回滚主事务
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 案例2:线程池中的事务传播问题
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private TransactionTemplate transactionTemplate;
@Transactional
public void batchProcessOrders(List<Order> orders) {
ExecutorService executor = Executors.newFixedThreadPool(5);
for (Order order : orders) {
executor.submit(() -> {
// 注意: 每个线程都在独立的事务中,无法参与外部事务
processSingleOrder(order);
});
}
}
@Transactional(propagation = Propagation.REQUIRED)
public void processSingleOrder(Order order) {
// 这个方法会在新线程中调用
// 会开启新的事务,而不是加入外部事务
orderRepository.save(order);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 案例3:@Async 注解的事务问题
@Service
public class ReportService {
@Autowired
private ReportRepository reportRepository;
@Transactional
@Async // Spring 的 @Async 会创建代理,在新线程执行
public void generateReport(Long reportId) {
// 注意: 虽然方法有 @Transactional,但可能在新线程中失效
// 因为事务管理器可能没有正确配置
Report report = reportRepository.findById(reportId);
report.setStatus("PROCESSING");
reportRepository.save(report);
// 长时间处理...
report.setStatus("COMPLETED");
reportRepository.save(report);
throw new RuntimeException("测试"); // 可能不会回滚
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 解决方案1)使用编程式事务管理
@Service
public class UserService {
@Autowired
private PlatformTransactionManager transactionManager;
@Autowired
private UserRepository userRepository;
public void createUserInNewThread() {
new Thread(() -> {
// 在新线程中手动管理事务
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
TransactionStatus status = transactionManager.getTransaction(definition);
try {
userRepository.save(new User("异步用户"));
transactionManager.commit(status);
} catch (Exception e) {
transactionManager.rollback(status);
throw e;
}
}).start();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 解决方案2)使用 TransactionTemplate
@Service
public class OrderService {
@Autowired
private TransactionTemplate transactionTemplate;
@Autowired
private OrderRepository orderRepository;
public void processInThreadPool() {
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
final int index = i;
executor.submit(() -> {
// 每个线程使用独立的事务
transactionTemplate.execute(status -> {
Order order = new Order("订单-" + index);
orderRepository.save(order);
if (index == 5) {
throw new RuntimeException("回滚测试");
}
return null;
});
});
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 解决方案3)配置 @Async 的事务传播
@Configuration
@EnableAsync
@EnableTransactionManagement
public class AsyncConfig {
@Bean(name = "taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
executor.setThreadNamePrefix("async-transaction-");
executor.initialize();
return executor;
}
@Bean
public AsyncUncaughtExceptionHandler asyncUncaughtExceptionHandler() {
return new SimpleAsyncUncaughtExceptionHandler();
}
}
@Service
public class ReportService {
@Autowired
private ReportRepository reportRepository;
@Transactional(propagation = Propagation.REQUIRES_NEW) // 使用 REQUIRES_NEW
@Async("taskExecutor")
public CompletableFuture<Report> generateReport(Long reportId) {
return CompletableFuture.supplyAsync(() -> {
Report report = reportRepository.findById(reportId);
// ... 业务逻辑
return reportRepository.save(report);
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 解决方案4)使用分布式事务(复杂场景)
// 使用分布式事务框架,如 Seata
@GlobalTransactional // Seata 的全局事务注解
public void distributedTransaction() {
// 主业务
orderService.createOrder();
// 异步调用其他服务
CompletableFuture<Void> future1 = CompletableFuture.runAsync(
() -> inventoryService.reduceStock());
CompletableFuture<Void> future2 = CompletableFuture.runAsync(
() -> accountService.deductBalance());
CompletableFuture.allOf(future1, future2).join();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 解决方案5)设计模式-工作单元模式
这个使用场景比较多。值得记!
@Component
public class TransactionalWorker {
@Autowired // 自动注入用户数据仓库,用于数据库操作
private UserRepository userRepository;
/**
* 使用独立事务处理单个用户
* Propagation.REQUIRES_NEW 表示每次调用都会开启一个新的事务
* 即使当前存在事务,也会挂起当前事务,创建新事务
* @param user 要处理的用户对象
*/
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void processWithTransaction(User user) {
userRepository.save(user); // 保存用户到数据库
// 这里可以添加其他业务逻辑
}
}
@Service
public class BatchService {
@Autowired // 自动注入事务处理器
private TransactionalWorker transactionalWorker;
/**
* 批量处理用户列表(使用多线程)
* @param users 要处理的用户列表
*/
public void batchProcess(List<User> users) {
// 创建固定大小的线程池(5个线程)
ExecutorService executor = Executors.newFixedThreadPool(5);
// ==== 核心代码解析开始 =====
List<Callable<Void>> tasks = users.stream() // 将用户列表转换为Stream
.map(user -> (Callable<Void>) () -> { // 关键:将每个用户映射为一个任务
// 每个任务在独立的事务中执行
transactionalWorker.processWithTransaction(user);
return null; // Callable需要返回值,这里返回null
})
.collect(Collectors.toList()); // 收集所有任务到List中
// ==== 核心代码解析结束 ====
try {
// 提交所有任务到线程池,并等待所有任务完成
executor.invokeAll(tasks);
} catch (InterruptedException e) {
// 如果线程被中断,恢复中断状态
Thread.currentThread().interrupt();
} finally {
// 实际代码中应该关闭线程池
executor.shutdown();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
最终逻辑代码是怎么样执行的呢:
// 假设我们有一个User列表:[user1, user2, user3]
// map转换过程:
user1 → 任务1:保存user1到数据库
user2 → 任务2:保存user2到数据库
user3 → 任务3:保存user3到数据库
// 最终得到:
tasks = [任务1, 任务2, 任务3]
// 每个任务的执行代码:
任务1 = {
开启新事务();
userRepository.save(user1);
提交或回滚事务();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
关键点总结
map:将一个Stream<User>转换为Stream<Callable<Void>>- 嵌套Lambda:外层Lambda将User转换为任务,内层Lambda定义任务的具体执行逻辑
- 类型转换:
(Callable<Void>)帮助编译器确定Lambda类型 - 闭包:每个任务都能记住自己对应的user
- 延迟执行:任务定义时不执行,提交到线程池后才执行
user -> // 外层Lambda:接收User参数,返回Callable
(Callable<Void>) // 告诉编译器返回类型是Callable<Void>
() -> { // 内层Lambda:Callable的call()方法实现
transactionalWorker.processWithTransaction(user);
return null;
}
1
2
3
4
5
6
2
3
4
5
6
完整的嵌套Lambda类型的代码?
# 最佳实践建议
# 1. 明确事务边界
// 好:事务边界清晰
@Transactional
public void mainProcess() {
// 同步操作
syncOperation();
// 异步操作 - 独立事务
CompletableFuture.runAsync(() -> asyncOperation());
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void asyncOperation() {
// 独立事务
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
# 2. 合理使用事务传播级别
// 根据不同场景选择传播级别
@Transactional(propagation = Propagation.NESTED) // 嵌套事务(如果支持)
@Transactional(propagation = Propagation.REQUIRES_NEW) // 总是新事务
@Transactional(propagation = Propagation.NOT_SUPPORTED) // 无事务运行
1
2
3
4
2
3
4
# 3. 监控和调试
// 添加事务监控
@Component
public class TransactionMonitor implements TransactionSynchronization {
@Override
public void afterCompletion(int status) {
// 记录事务完成状态
System.out.println("Transaction completed with status: " +
(status == STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK"));
}
}
// 在事务方法中添加监控
@Transactional
public void monitoredMethod() {
TransactionSynchronizationManager.registerSynchronization(
new TransactionMonitor());
// 业务逻辑
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 4. 配置线程池事务传播
@Configuration
public class TransactionAwarePoolConfig {
@Bean
public Executor transactionAwareExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
// 使用事务感知的任务装饰器
executor.setTaskDecorator(new ContextCopyingTaskDecorator());
return executor;
}
}
// 自定义任务装饰器
public class ContextCopyingTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
// 复制事务上下文到新线程
Map<Object, Object> context = TransactionSynchronizationManager
.getResourceMap();
return () -> {
try {
// 在新线程中恢复上下文
context.forEach((key, value) ->
TransactionSynchronizationManager.bindResource(key, value));
runnable.run();
} finally {
// 清理
context.keySet().forEach(
TransactionSynchronizationManager::unbindResource);
}
};
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 总结
关键点:
- 根本原因:Spring 事务基于 ThreadLocal,线程间不共享
- 典型场景:异步任务、线程池、CompletableFuture、@Async
- 解决方案:
- 编程式事务管理
- TransactionTemplate
- 配置合理的传播级别
- 使用工作单元模式
- 考虑分布式事务(复杂场景)
选择建议:
- 简单场景 → TransactionTemplate
- 异步任务 → @Async + REQUIRES_NEW
- 批量处理 → 工作单元模式 + 线程池
- 分布式系统 → 分布式事务框架
记住:多线程事务本质上是"多个独立事务",而不是"一个事务跨多个线程"。