# @Transactional多线程环境失效详解

# 为什么在多线程环境下会失效?

Spring 的事务管理是基于 ThreadLocal 实现的:

// Spring 使用 ThreadLocal 存储事务上下文
public abstract class TransactionSynchronizationManager {
    private static final ThreadLocal<Map<Object, Object>> resources = 
        new NamedThreadLocal<>("Transactional resources");
    
    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = 
        new NamedThreadLocal<>("Transaction synchronizations");
    
    private static final ThreadLocal<String> currentTransactionName = 
        new NamedThreadLocal<>("Current transaction name");
}
1
2
3
4
5
6
7
8
9
10
11

核心原理:每个线程都有自己的事务上下文,线程间不共享。

# 案例1:异步线程中事务失效

@Service
public class UserService {
    
    @Autowired
    private UserRepository userRepository;
    
    @Transactional
    public void createUserInNewThread() {
        new Thread(() -> {
            // 注意:这个操作不在事务中!
            userRepository.save(new User("异步用户"));
            
            // 这里会抛出异常,但事务不会回滚
            if (true) {
                throw new RuntimeException("测试回滚");
            }
        }).start();
    }
    
    @Transactional
    public void updateWithAsync() {
        // 主线程的操作在事务中
        userRepository.updateUser(1L, "主线程更新");
        
        // 异步任务
        CompletableFuture.runAsync(() -> {
            // 注意:异步线程无法继承主线程的事务上下文
            userRepository.save(new User("异步线程用户"));
            throw new RuntimeException("异步异常"); // 不会回滚主事务
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 案例2:线程池中的事务传播问题

@Service
public class OrderService {
    
    @Autowired
    private OrderRepository orderRepository;
    @Autowired
    private TransactionTemplate transactionTemplate;
    
    @Transactional
    public void batchProcessOrders(List<Order> orders) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        
        for (Order order : orders) {
            executor.submit(() -> {
                // 注意: 每个线程都在独立的事务中,无法参与外部事务
                processSingleOrder(order);
            });
        }
    }
    
    @Transactional(propagation = Propagation.REQUIRED)
    public void processSingleOrder(Order order) {
        // 这个方法会在新线程中调用
        // 会开启新的事务,而不是加入外部事务
        orderRepository.save(order);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 案例3:@Async 注解的事务问题

@Service
public class ReportService {
    
    @Autowired
    private ReportRepository reportRepository;
    
    @Transactional
    @Async  // Spring 的 @Async 会创建代理,在新线程执行
    public void generateReport(Long reportId) {
        // 注意: 虽然方法有 @Transactional,但可能在新线程中失效
        // 因为事务管理器可能没有正确配置
        
        Report report = reportRepository.findById(reportId);
        report.setStatus("PROCESSING");
        reportRepository.save(report);
        
        // 长时间处理...
        
        report.setStatus("COMPLETED");
        reportRepository.save(report);
        
        throw new RuntimeException("测试"); // 可能不会回滚
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# 解决方案1)使用编程式事务管理

@Service
public class UserService {
    
    @Autowired
    private PlatformTransactionManager transactionManager;
    @Autowired
    private UserRepository userRepository;
    
    public void createUserInNewThread() {
        new Thread(() -> {
            // 在新线程中手动管理事务
            DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
            definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
            
            TransactionStatus status = transactionManager.getTransaction(definition);
            
            try {
                userRepository.save(new User("异步用户"));
                transactionManager.commit(status);
            } catch (Exception e) {
                transactionManager.rollback(status);
                throw e;
            }
        }).start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 解决方案2)使用 TransactionTemplate

@Service
public class OrderService {
    
    @Autowired
    private TransactionTemplate transactionTemplate;
    @Autowired
    private OrderRepository orderRepository;
    
    public void processInThreadPool() {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        
        for (int i = 0; i < 10; i++) {
            final int index = i;
            executor.submit(() -> {
                // 每个线程使用独立的事务
                transactionTemplate.execute(status -> {
                    Order order = new Order("订单-" + index);
                    orderRepository.save(order);
                    
                    if (index == 5) {
                        throw new RuntimeException("回滚测试");
                    }
                    return null;
                });
            });
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 解决方案3)配置 @Async 的事务传播

@Configuration
@EnableAsync
@EnableTransactionManagement
public class AsyncConfig {
    
    @Bean(name = "taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        executor.setThreadNamePrefix("async-transaction-");
        executor.initialize();
        return executor;
    }
    
    @Bean
    public AsyncUncaughtExceptionHandler asyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }
}

@Service
public class ReportService {
    
    @Autowired
    private ReportRepository reportRepository;
    
    @Transactional(propagation = Propagation.REQUIRES_NEW)  // 使用 REQUIRES_NEW
    @Async("taskExecutor")
    public CompletableFuture<Report> generateReport(Long reportId) {
        return CompletableFuture.supplyAsync(() -> {
            Report report = reportRepository.findById(reportId);
            // ... 业务逻辑
            return reportRepository.save(report);
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

# 解决方案4)使用分布式事务(复杂场景)

// 使用分布式事务框架,如 Seata
@GlobalTransactional  // Seata 的全局事务注解
public void distributedTransaction() {
    // 主业务
    orderService.createOrder();
    
    // 异步调用其他服务
    CompletableFuture<Void> future1 = CompletableFuture.runAsync(
        () -> inventoryService.reduceStock());
    
    CompletableFuture<Void> future2 = CompletableFuture.runAsync(
        () -> accountService.deductBalance());
    
    CompletableFuture.allOf(future1, future2).join();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 解决方案5)设计模式-工作单元模式

这个使用场景比较多。值得记!

@Component
public class TransactionalWorker {
    
    @Autowired  // 自动注入用户数据仓库,用于数据库操作
    private UserRepository userRepository;
    
    /**
     * 使用独立事务处理单个用户
     * Propagation.REQUIRES_NEW 表示每次调用都会开启一个新的事务
     * 即使当前存在事务,也会挂起当前事务,创建新事务
     * @param user 要处理的用户对象
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void processWithTransaction(User user) {
        userRepository.save(user);  // 保存用户到数据库
        // 这里可以添加其他业务逻辑
    }
}

@Service
public class BatchService {
    
    @Autowired  // 自动注入事务处理器
    private TransactionalWorker transactionalWorker;
    
    /**
     * 批量处理用户列表(使用多线程)
     * @param users 要处理的用户列表
     */
    public void batchProcess(List<User> users) {
        // 创建固定大小的线程池(5个线程)
        ExecutorService executor = Executors.newFixedThreadPool(5);
        
        // ==== 核心代码解析开始 =====
        List<Callable<Void>> tasks = users.stream()  // 将用户列表转换为Stream
            .map(user -> (Callable<Void>) () -> {    // 关键:将每个用户映射为一个任务
                // 每个任务在独立的事务中执行
                transactionalWorker.processWithTransaction(user);
                return null;  // Callable需要返回值,这里返回null
            })
            .collect(Collectors.toList());  // 收集所有任务到List中
        // ==== 核心代码解析结束 ====
        
        try {
            // 提交所有任务到线程池,并等待所有任务完成
            executor.invokeAll(tasks);
        } catch (InterruptedException e) {
            // 如果线程被中断,恢复中断状态
            Thread.currentThread().interrupt();
        } finally {
            // 实际代码中应该关闭线程池
            executor.shutdown();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

最终逻辑代码是怎么样执行的呢:

// 假设我们有一个User列表:[user1, user2, user3]

// map转换过程:
user1 → 任务1:保存user1到数据库
user2 → 任务2:保存user2到数据库  
user3 → 任务3:保存user3到数据库

// 最终得到:
tasks = [任务1, 任务2, 任务3]

// 每个任务的执行代码:
任务1 = {
    开启新事务();
    userRepository.save(user1);
    提交或回滚事务();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

关键点总结

  1. map:将一个 Stream<User> 转换为 Stream<Callable<Void>>
  2. 嵌套Lambda:外层Lambda将User转换为任务,内层Lambda定义任务的具体执行逻辑
  3. 类型转换(Callable<Void>) 帮助编译器确定Lambda类型
  4. 闭包:每个任务都能记住自己对应的user
  5. 延迟执行:任务定义时不执行,提交到线程池后才执行
user ->  // 外层Lambda:接收User参数,返回Callable
    (Callable<Void>)  // 告诉编译器返回类型是Callable<Void>
    () -> {           // 内层Lambda:Callable的call()方法实现
        transactionalWorker.processWithTransaction(user);
        return null;
    }
1
2
3
4
5
6

完整的嵌套Lambda类型的代码?

# 最佳实践建议

# 1. 明确事务边界

// 好:事务边界清晰
@Transactional
public void mainProcess() {
    // 同步操作
    syncOperation();
    
    // 异步操作 - 独立事务
    CompletableFuture.runAsync(() -> asyncOperation());
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void asyncOperation() {
    // 独立事务
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 2. 合理使用事务传播级别

// 根据不同场景选择传播级别
@Transactional(propagation = Propagation.NESTED)      // 嵌套事务(如果支持)
@Transactional(propagation = Propagation.REQUIRES_NEW) // 总是新事务
@Transactional(propagation = Propagation.NOT_SUPPORTED) // 无事务运行
1
2
3
4

# 3. 监控和调试

// 添加事务监控
@Component
public class TransactionMonitor implements TransactionSynchronization {
    
    @Override
    public void afterCompletion(int status) {
        // 记录事务完成状态
        System.out.println("Transaction completed with status: " + 
            (status == STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK"));
    }
}

// 在事务方法中添加监控
@Transactional
public void monitoredMethod() {
    TransactionSynchronizationManager.registerSynchronization(
        new TransactionMonitor());
    // 业务逻辑
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 4. 配置线程池事务传播

@Configuration
public class TransactionAwarePoolConfig {
    
    @Bean
    public Executor transactionAwareExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        
        // 使用事务感知的任务装饰器
        executor.setTaskDecorator(new ContextCopyingTaskDecorator());
        return executor;
    }
}

// 自定义任务装饰器
public class ContextCopyingTaskDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
        // 复制事务上下文到新线程
        Map<Object, Object> context = TransactionSynchronizationManager
            .getResourceMap();
        return () -> {
            try {
                // 在新线程中恢复上下文
                context.forEach((key, value) -> 
                    TransactionSynchronizationManager.bindResource(key, value));
                runnable.run();
            } finally {
                // 清理
                context.keySet().forEach(
                    TransactionSynchronizationManager::unbindResource);
            }
        };
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

# 总结

关键点

  1. 根本原因:Spring 事务基于 ThreadLocal,线程间不共享
  2. 典型场景:异步任务、线程池、CompletableFuture、@Async
  3. 解决方案
    • 编程式事务管理
    • TransactionTemplate
    • 配置合理的传播级别
    • 使用工作单元模式
    • 考虑分布式事务(复杂场景)

选择建议

  • 简单场景 → TransactionTemplate
  • 异步任务 → @Async + REQUIRES_NEW
  • 批量处理 → 工作单元模式 + 线程池
  • 分布式系统 → 分布式事务框架

记住:多线程事务本质上是"多个独立事务",而不是"一个事务跨多个线程"。

Last Updated: 12/4/2025, 10:23:47 AM