高并发与性能优化:构建高性能的分布式事务系统
2025/9/1大约 12 分钟
高并发与性能优化:构建高性能的分布式事务系统
在互联网时代,高并发场景下的系统性能优化成为了每个技术团队必须面对的挑战。分布式事务系统由于其复杂的协调机制和跨服务调用特性,在高并发场景下面临着更大的性能压力。本章将深入探讨分布式事务系统在高并发环境下的性能优化策略,包括锁粒度优化、批量提交、异步补偿、数据分片等关键技术。
事务锁粒度优化
锁粒度对性能的影响
在分布式事务中,锁的粒度直接影响系统的并发性能。过粗的锁粒度会导致大量请求阻塞,降低系统吞吐量;过细的锁粒度虽然能提高并发性,但会增加锁管理的复杂度和开销。
数据库锁优化
行级锁优化
@Repository
public class OptimizedOrderRepository {
/**
* 优化前:可能锁定整个表
*/
@Transactional
public void updateOrderStatusOld(String orderId, OrderStatus status) {
// 这种写法可能导致表级锁
String sql = "UPDATE orders SET status = ? WHERE order_id = ?";
jdbcTemplate.update(sql, status.name(), orderId);
}
/**
* 优化后:使用行级锁
*/
@Transactional
public Order updateOrderStatus(String orderId, OrderStatus status) {
// 使用主键查询,确保行级锁
String selectSql = "SELECT * FROM orders WHERE order_id = ? FOR UPDATE";
Order order = jdbcTemplate.queryForObject(selectSql,
new BeanPropertyRowMapper<>(Order.class), orderId);
// 更新状态
String updateSql = "UPDATE orders SET status = ?, updated_time = ? WHERE order_id = ?";
jdbcTemplate.update(updateSql, status.name(), new Date(), orderId);
return order;
}
}
索引优化
-- 优化前:缺少合适的索引
CREATE TABLE orders (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
order_id VARCHAR(64) NOT NULL,
user_id VARCHAR(64) NOT NULL,
status VARCHAR(20) NOT NULL,
created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 优化后:添加合适的索引
CREATE TABLE orders (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
order_id VARCHAR(64) NOT NULL UNIQUE,
user_id VARCHAR(64) NOT NULL,
status VARCHAR(20) NOT NULL,
created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_user_id (user_id),
INDEX idx_status (status),
INDEX idx_created_time (created_time)
);
分布式锁优化
锁粒度控制
@Component
public class FineGrainedDistributedLock {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 粗粒度锁:锁定整个业务类型
*/
public boolean acquireCoarseLock(String businessType, long timeout) {
String lockKey = "lock:" + businessType;
return redisTemplate.opsForValue().setIfAbsent(lockKey, "1", timeout, TimeUnit.SECONDS);
}
/**
* 细粒度锁:锁定具体的业务实例
*/
public boolean acquireFineLock(String businessType, String businessId, long timeout) {
String lockKey = "lock:" + businessType + ":" + businessId;
return redisTemplate.opsForValue().setIfAbsent(lockKey, "1", timeout, TimeUnit.SECONDS);
}
/**
* 资源级锁:锁定具体的资源
*/
public boolean acquireResourceLock(String resourceType, String resourceId, long timeout) {
String lockKey = "lock:resource:" + resourceType + ":" + resourceId;
return redisTemplate.opsForValue().setIfAbsent(lockKey, "1", timeout, TimeUnit.SECONDS);
}
}
锁超时与重试机制
@Service
public class LockWithRetryService {
@Autowired
private FineGrainedDistributedLock distributedLock;
public <T> T executeWithLock(String lockKey, int maxRetries, long retryDelay,
Supplier<T> operation) {
for (int i = 0; i < maxRetries; i++) {
if (distributedLock.acquireFineLock(lockKey, 30)) {
try {
return operation.get();
} finally {
// 释放锁
distributedLock.releaseLock(lockKey);
}
}
// 等待后重试
try {
Thread.sleep(retryDelay + new Random().nextInt(100));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Operation interrupted", e);
}
}
throw new RuntimeException("Failed to acquire lock after " + maxRetries + " retries");
}
}
批量提交与异步补偿
批量事务处理
批量处理是提高系统吞吐量的重要手段,通过将多个小事务合并为一个大事务,可以显著减少事务协调的开销。
批量订单处理示例
@Service
public class BatchOrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private InventoryService inventoryService;
@Autowired
private AccountService accountService;
/**
* 批量创建订单
*/
@GlobalTransactional
public List<Order> batchCreateOrders(List<CreateOrderRequest> requests) {
List<Order> orders = new ArrayList<>();
try {
// 1. 批量预留库存
Map<String, Integer> inventoryRequirements = requests.stream()
.collect(Collectors.toMap(
CreateOrderRequest::getProductId,
CreateOrderRequest::getQuantity,
Integer::sum
));
inventoryService.batchReserve(inventoryRequirements);
// 2. 批量创建订单
for (CreateOrderRequest request : requests) {
Order order = new Order();
order.setOrderId(request.getOrderId());
order.setUserId(request.getUserId());
order.setProductId(request.getProductId());
order.setQuantity(request.getQuantity());
order.setAmount(request.getAmount());
order.setStatus(OrderStatus.CREATED);
orderRepository.save(order);
orders.add(order);
}
// 3. 批量扣减账户余额
Map<String, BigDecimal> accountDebits = requests.stream()
.collect(Collectors.toMap(
CreateOrderRequest::getUserId,
CreateOrderRequest::getAmount,
BigDecimal::add
));
accountService.batchDebit(accountDebits);
return orders;
} catch (Exception e) {
// 批量回滚已在@GlobalTransactional中处理
throw new BatchOrderProcessingException("批量创建订单失败", e);
}
}
}
批量数据库操作优化
@Repository
public class OptimizedBatchRepository {
@Autowired
private JdbcTemplate jdbcTemplate;
/**
* 批量插入优化
*/
public void batchInsertOrders(List<Order> orders) {
String sql = "INSERT INTO orders (order_id, user_id, product_id, quantity, amount, status) " +
"VALUES (?, ?, ?, ?, ?, ?)";
// 使用批处理
jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
Order order = orders.get(i);
ps.setString(1, order.getOrderId());
ps.setString(2, order.getUserId());
ps.setString(3, order.getProductId());
ps.setInt(4, order.getQuantity());
ps.setBigDecimal(5, order.getAmount());
ps.setString(6, order.getStatus().name());
}
@Override
public int getBatchSize() {
return orders.size();
}
});
}
/**
* 批量更新优化
*/
public void batchUpdateOrderStatus(List<String> orderIds, OrderStatus status) {
String sql = "UPDATE orders SET status = ?, updated_time = ? WHERE order_id = ?";
// 分批处理,避免单次SQL过长
int batchSize = 1000;
for (int i = 0; i < orderIds.size(); i += batchSize) {
int endIndex = Math.min(i + batchSize, orderIds.size());
List<String> batch = orderIds.subList(i, endIndex);
Object[][] batchArgs = batch.stream()
.map(id -> new Object[]{status.name(), new Date(), id})
.toArray(Object[][]::new);
jdbcTemplate.batchUpdate(sql, batchArgs);
}
}
}
异步补偿机制
异步补偿可以显著提高系统的响应速度,将耗时的补偿操作放到后台执行。
异步补偿队列设计
@Component
public class AsyncCompensationQueue {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String COMPENSATION_QUEUE = "compensation:queue";
private static final String COMPENSATION_PROCESSING = "compensation:processing";
/**
* 添加补偿任务
*/
public void addCompensationTask(CompensationTask task) {
// 序列化任务
String taskJson = toJson(task);
// 添加到队列
redisTemplate.opsForList().leftPush(COMPENSATION_QUEUE, taskJson);
// 记录监控指标
meterRegistry.counter("compensation.task.added").increment();
}
/**
* 获取补偿任务
*/
public CompensationTask getCompensationTask() {
String taskJson = (String) redisTemplate.opsForList().rightPop(COMPENSATION_QUEUE);
if (taskJson == null) {
return null;
}
// 标记为处理中
redisTemplate.opsForSet().add(COMPENSATION_PROCESSING, taskJson);
return fromJson(taskJson, CompensationTask.class);
}
/**
* 完成补偿任务
*/
public void completeCompensationTask(CompensationTask task) {
String taskJson = toJson(task);
redisTemplate.opsForSet().remove(COMPENSATION_PROCESSING, taskJson);
meterRegistry.counter("compensation.task.completed").increment();
}
/**
* 处理超时的补偿任务
*/
@Scheduled(fixedDelay = 60000) // 每分钟检查一次
public void handleTimeoutTasks() {
// 获取处理中超时的任务
Set<String> processingTasks = redisTemplate.opsForSet().members(COMPENSATION_PROCESSING);
if (processingTasks == null || processingTasks.isEmpty()) {
return;
}
for (String taskJson : processingTasks) {
CompensationTask task = fromJson(taskJson, CompensationTask.class);
if (System.currentTimeMillis() - task.getStartTime() > 300000) { // 5分钟超时
// 重新加入队列
redisTemplate.opsForList().leftPush(COMPENSATION_QUEUE, taskJson);
redisTemplate.opsForSet().remove(COMPENSATION_PROCESSING, taskJson);
meterRegistry.counter("compensation.task.timeout").increment();
}
}
}
}
异步补偿处理器
@Component
public class AsyncCompensationProcessor {
@Autowired
private AsyncCompensationQueue compensationQueue;
@Autowired
private CompensationService compensationService;
@Autowired
private AlertService alertService;
/**
* 异步处理补偿任务
*/
@Async("compensationTaskExecutor")
@Scheduled(fixedDelay = 1000) // 每秒检查一次
public void processCompensationTasks() {
while (true) {
CompensationTask task = compensationQueue.getCompensationTask();
if (task == null) {
break;
}
try {
// 执行补偿
compensationService.executeCompensation(task);
// 完成任务
compensationQueue.completeCompensationTask(task);
} catch (Exception e) {
// 处理失败
handleCompensationFailure(task, e);
}
}
}
private void handleCompensationFailure(CompensationTask task, Exception e) {
task.setRetryCount(task.getRetryCount() + 1);
if (task.getRetryCount() < task.getMaxRetryCount()) {
// 重新加入队列
compensationQueue.addCompensationTask(task);
log.warn("Compensation task failed, requeued: " + task.getTaskId(), e);
} else {
// 超过最大重试次数
log.error("Compensation task failed permanently: " + task.getTaskId(), e);
alertService.sendAlert("Compensation task failed permanently: " + task.getTaskId());
// 记录失败任务
meterRegistry.counter("compensation.task.failed").increment();
}
}
}
数据分片与事务隔离
数据分片策略
数据分片是提高系统扩展性和性能的重要手段,通过将数据分散到多个节点上,可以有效缓解单点压力。
水平分片实现
@Component
public class ShardingService {
private static final int SHARD_COUNT = 16;
/**
* 根据用户ID计算分片
*/
public int calculateShardByUserId(String userId) {
return userId.hashCode() % SHARD_COUNT;
}
/**
* 根据订单ID计算分片
*/
public int calculateShardByOrderId(String orderId) {
return orderId.hashCode() % SHARD_COUNT;
}
/**
* 获取分片数据源
*/
public DataSource getShardDataSource(String key) {
int shard = key.hashCode() % SHARD_COUNT;
return dataSourceMap.get("shard_" + shard);
}
}
@Repository
public class ShardedOrderRepository {
@Autowired
private ShardingService shardingService;
@Autowired
private Map<String, DataSource> dataSourceMap;
/**
* 分片保存订单
*/
public Order save(Order order) {
int shard = shardingService.calculateShardByUserId(order.getUserId());
DataSource dataSource = dataSourceMap.get("shard_" + shard);
try (Connection conn = dataSource.getConnection()) {
String sql = "INSERT INTO orders (order_id, user_id, product_id, quantity, amount, status) " +
"VALUES (?, ?, ?, ?, ?, ?)";
try (PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, order.getOrderId());
ps.setString(2, order.getUserId());
ps.setString(3, order.getProductId());
ps.setInt(4, order.getQuantity());
ps.setBigDecimal(5, order.getAmount());
ps.setString(6, order.getStatus().name());
ps.executeUpdate();
}
} catch (SQLException e) {
throw new DataAccessException("Failed to save order to shard " + shard, e);
}
return order;
}
/**
* 分片查询订单
*/
public Order findByOrderId(String orderId) {
int shard = shardingService.calculateShardByOrderId(orderId);
DataSource dataSource = dataSourceMap.get("shard_" + shard);
try (Connection conn = dataSource.getConnection()) {
String sql = "SELECT * FROM orders WHERE order_id = ?";
try (PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, orderId);
try (ResultSet rs = ps.executeQuery()) {
if (rs.next()) {
return mapResultSetToOrder(rs);
}
}
}
} catch (SQLException e) {
throw new DataAccessException("Failed to find order from shard " + shard, e);
}
return null;
}
}
事务隔离优化
在分布式环境中,合理的事务隔离级别可以平衡数据一致性和系统性能。
读写分离实现
@Configuration
public class ReadWriteSeparationConfig {
@Bean
public DataSource routingDataSource() {
RoutingDataSource routingDataSource = new RoutingDataSource();
Map<Object, Object> dataSourceMap = new HashMap<>();
dataSourceMap.put("master", masterDataSource());
dataSourceMap.put("slave1", slaveDataSource1());
dataSourceMap.put("slave2", slaveDataSource2());
routingDataSource.setTargetDataSources(dataSourceMap);
routingDataSource.setDefaultTargetDataSource(masterDataSource());
return routingDataSource;
}
@Bean
public DataSource masterDataSource() {
// 主库配置
HikariDataSource dataSource = new HikariDataSource();
dataSource.setJdbcUrl("jdbc:mysql://master-db:3306/order_db");
dataSource.setUsername("root");
dataSource.setPassword("password");
return dataSource;
}
@Bean
public DataSource slaveDataSource1() {
// 从库1配置
HikariDataSource dataSource = new HikariDataSource();
dataSource.setJdbcUrl("jdbc:mysql://slave1-db:3306/order_db");
dataSource.setUsername("root");
dataSource.setPassword("password");
return dataSource;
}
@Bean
public DataSource slaveDataSource2() {
// 从库2配置
HikariDataSource dataSource = new HikariDataSource();
dataSource.setJdbcUrl("jdbc:mysql://slave2-db:3306/order_db");
dataSource.setUsername("root");
dataSource.setPassword("password");
return dataSource;
}
}
public class RoutingDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
return DataSourceContextHolder.getDataSourceType();
}
}
public class DataSourceContextHolder {
private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
public static void setDataSourceType(String dataSourceType) {
contextHolder.set(dataSourceType);
}
public static String getDataSourceType() {
return contextHolder.get();
}
public static void clearDataSourceType() {
contextHolder.remove();
}
}
@Aspect
@Component
public class DataSourceRoutingAspect {
/**
* 读操作路由到从库
*/
@Before("@annotation(ReadOnly)")
public void routeToSlave() {
// 负载均衡选择从库
List<String> slaves = Arrays.asList("slave1", "slave2");
String selectedSlave = slaves.get(new Random().nextInt(slaves.size()));
DataSourceContextHolder.setDataSourceType(selectedSlave);
}
/**
* 写操作路由到主库
*/
@Before("@annotation(WriteOnly)")
public void routeToMaster() {
DataSourceContextHolder.setDataSourceType("master");
}
@After("@annotation(ReadOnly) || @annotation(WriteOnly)")
public void clearDataSourceType() {
DataSourceContextHolder.clearDataSourceType();
}
}
缓存优化
@Service
public class CachedOrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String ORDER_CACHE_PREFIX = "order:";
private static final long CACHE_EXPIRE_TIME = 300; // 5分钟
/**
* 带缓存的订单查询
*/
@ReadOnly
public Order getOrder(String orderId) {
String cacheKey = ORDER_CACHE_PREFIX + orderId;
// 先从缓存查询
Order order = (Order) redisTemplate.opsForValue().get(cacheKey);
if (order != null) {
return order;
}
// 缓存未命中,从数据库查询
order = orderRepository.findByOrderId(orderId);
if (order != null) {
// 写入缓存
redisTemplate.opsForValue().set(cacheKey, order, CACHE_EXPIRE_TIME, TimeUnit.SECONDS);
}
return order;
}
/**
* 更新订单时清除缓存
*/
@WriteOnly
public Order updateOrder(Order order) {
// 更新数据库
Order updatedOrder = orderRepository.save(order);
// 清除缓存
String cacheKey = ORDER_CACHE_PREFIX + order.getOrderId();
redisTemplate.delete(cacheKey);
return updatedOrder;
}
}
性能测试与调优
压力测试设计
合理的性能测试可以帮助我们发现系统瓶颈并进行针对性优化。
JMeter测试脚本
<!-- order_creation_test.jmx -->
<jmeterTestPlan version="1.2" properties="5.0" jmeter="5.4.1">
<hashTree>
<TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="订单创建性能测试" enabled="true">
<elementProp name="TestPlan.arguments" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="用户定义的变量" enabled="true">
<collectionProp name="Arguments.arguments"/>
</elementProp>
<stringProp name="TestPlan.functional_mode">false</stringProp>
<stringProp name="TestPlan.serialize_threadgroups">false</stringProp>
<stringProp name="TestPlan.user_define_classpath"></stringProp>
</TestPlan>
<hashTree>
<ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="订单创建线程组" enabled="true">
<stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
<elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="循环控制器" enabled="true">
<boolProp name="LoopController.continue_forever">false</boolProp>
<stringProp name="LoopController.loops">1000</stringProp>
</elementProp>
<stringProp name="ThreadGroup.num_threads">50</stringProp>
<stringProp name="ThreadGroup.ramp_time">60</stringProp>
<boolProp name="ThreadGroup.scheduler">false</boolProp>
<stringProp name="ThreadGroup.duration"></stringProp>
<stringProp name="ThreadGroup.delay"></stringProp>
<boolProp name="ThreadGroup.same_user_on_next_iteration">true</boolProp>
</ThreadGroup>
<hashTree>
<HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="创建订单" enabled="true">
<elementProp name="HTTPsampler.Arguments" elementType="Arguments" guiclass="HTTPArgumentsPanel" testclass="Arguments" testname="用户定义的变量" enabled="true">
<collectionProp name="Arguments.arguments">
<elementProp name="orderId" elementType="HTTPArgument">
<boolProp name="HTTPArgument.always_encode">false</boolProp>
<stringProp name="Argument.value">ORDER_${__time(YMDHMS)}_${__threadNum}_${__counter(TRUE,)}</stringProp>
<stringProp name="Argument.name">orderId</stringProp>
</elementProp>
<elementProp name="userId" elementType="HTTPArgument">
<boolProp name="HTTPArgument.always_encode">false</boolProp>
<stringProp name="Argument.value">USER_${__Random(1,1000)}</stringProp>
<stringProp name="Argument.name">userId</stringProp>
</elementProp>
<elementProp name="productId" elementType="HTTPArgument">
<boolProp name="HTTPArgument.always_encode">false</boolProp>
<stringProp name="Argument.value">PRODUCT_${__Random(1,100)}</stringProp>
<stringProp name="Argument.name">productId</stringProp>
</elementProp>
<elementProp name="quantity" elementType="HTTPArgument">
<boolProp name="HTTPArgument.always_encode">false</boolProp>
<stringProp name="Argument.value">${__Random(1,10)}</stringProp>
<stringProp name="Argument.name">quantity</stringProp>
</elementProp>
</collectionProp>
</elementProp>
<stringProp name="HTTPSampler.domain">localhost</stringProp>
<stringProp name="HTTPSampler.port">8080</stringProp>
<stringProp name="HTTPSampler.protocol">http</stringProp>
<stringProp name="HTTPSampler.contentEncoding"></stringProp>
<stringProp name="HTTPSampler.path">/order</stringProp>
<stringProp name="HTTPSampler.method">POST</stringProp>
<boolProp name="HTTPSampler.follow_redirects">true</boolProp>
<boolProp name="HTTPSampler.auto_redirects">false</boolProp>
<boolProp name="HTTPSampler.use_keepalive">true</boolProp>
<boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
<stringProp name="HTTPSampler.embedded_url_re"></stringProp>
<stringProp name="HTTPSampler.connect_timeout"></stringProp>
<stringProp name="HTTPSampler.response_timeout"></stringProp>
</HTTPSamplerProxy>
<hashTree>
<ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="响应断言" enabled="true">
<collectionProp name="Asserion.test_strings">
<stringProp name="1627894047">"success":true</stringProp>
</collectionProp>
<stringProp name="Assertion.custom_message"></stringProp>
<stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
<boolProp name="Assertion.assume_success">false</boolProp>
<intProp name="Assertion.test_type">16</intProp>
</ResponseAssertion>
<hashTree/>
</hashTree>
<BackendListener guiclass="BackendListenerGui" testclass="BackendListener" testname="后端监听器" enabled="true">
<elementProp name="arguments" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="参数" enabled="true">
<collectionProp name="Arguments.arguments">
<elementProp name="influxdbUrl" elementType="Argument">
<stringProp name="Argument.name">influxdbUrl</stringProp>
<stringProp name="Argument.value">http://localhost:8086/write?db=jmeter</stringProp>
</elementProp>
</collectionProp>
</elementProp>
<stringProp name="BackendListener.classname">org.apache.jmeter.visualizers.backend.influxdb.InfluxdbBackendListenerClient</stringProp>
</BackendListener>
<hashTree/>
</hashTree>
</hashTree>
</hashTree>
</jmeterTestPlan>
性能监控指标
@Component
public class PerformanceMetricsCollector {
private final MeterRegistry meterRegistry;
// 事务处理时间直方图
private final Timer transactionTimer;
// 并发用户数
private final Gauge concurrentUsers;
private final AtomicInteger currentUserCount = new AtomicInteger(0);
// 系统资源使用率
private final Gauge cpuUsage;
private final Gauge memoryUsage;
public PerformanceMetricsCollector(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.transactionTimer = Timer.builder("transaction.processing.time")
.description("事务处理时间分布")
.register(meterRegistry);
this.concurrentUsers = Gauge.builder("concurrent.users")
.description("并发用户数")
.register(meterRegistry, currentUserCount, AtomicInteger::get);
this.cpuUsage = Gauge.builder("system.cpu.usage")
.description("CPU使用率")
.register(meterRegistry, this, PerformanceMetricsCollector::getCpuUsage);
this.memoryUsage = Gauge.builder("system.memory.usage")
.description("内存使用率")
.register(meterRegistry, this, PerformanceMetricsCollector::getMemoryUsage);
}
public void recordTransactionProcessing(long startTime) {
Timer.Sample sample = Timer.start(meterRegistry);
sample.stop(transactionTimer);
}
public void incrementUserCount() {
currentUserCount.incrementAndGet();
}
public void decrementUserCount() {
currentUserCount.decrementAndGet();
}
private double getCpuUsage() {
// 获取系统CPU使用率
OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
return osBean.getSystemCpuLoad();
}
private double getMemoryUsage() {
// 获取系统内存使用率
MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage();
return (double) heapUsage.getUsed() / heapUsage.getMax();
}
}
最佳实践总结
1. 分层优化策略
@Component
public class LayeredOptimizationService {
/**
* 应用层优化:缓存、异步处理
*/
public void applicationLayerOptimization() {
// 使用缓存减少数据库访问
// 使用异步处理提高响应速度
// 合理设计API减少网络调用
}
/**
* 服务层优化:批量处理、连接池
*/
public void serviceLayerOptimization() {
// 批量处理提高吞吐量
// 优化数据库连接池配置
// 使用读写分离
}
/**
* 数据层优化:索引、分片
*/
public void dataLayerOptimization() {
// 添加合适的索引
// 实施数据分片策略
// 优化SQL查询
}
}
2. 性能调优配置
# application.yml
server:
tomcat:
max-threads: 200
min-spare-threads: 10
accept-count: 100
max-connections: 8192
spring:
datasource:
hikari:
maximum-pool-size: 50
minimum-idle: 10
connection-timeout: 30000
idle-timeout: 600000
max-lifetime: 1800000
leak-detection-threshold: 60000
seata:
client:
rm:
async-commit-buffer-limit: 1000
lock:
retry-interval: 10
retry-times: 30
tm:
commit-retry-count: 5
rollback-retry-count: 5
transport:
shutdown:
wait: 3
3. 监控告警配置
# prometheus-rules.yml
groups:
- name: performance-alerts
rules:
# 高延迟告警
- alert: HighTransactionLatency
expr: histogram_quantile(0.95, sum(rate(transaction_processing_time_bucket[5m])) by (le)) > 2000
for: 2m
labels:
severity: warning
annotations:
summary: "事务处理延迟过高"
description: "95%的事务处理时间超过2秒: {{ $value }}ms"
# 低吞吐量告警
- alert: LowTransactionThroughput
expr: rate(transaction_processed[5m]) < 100
for: 2m
labels:
severity: warning
annotations:
summary: "事务处理吞吐量过低"
description: "过去5分钟事务处理速率低于100 TPS: {{ $value }}"
# 高错误率告警
- alert: HighErrorRate
expr: rate(transaction_failed[5m]) / rate(transaction_processed[5m]) > 0.05
for: 1m
labels:
severity: critical
annotations:
summary: "事务错误率过高"
description: "事务错误率超过5%: {{ $value }}"
总结
高并发与性能优化是分布式事务系统面临的重要挑战。通过合理的锁粒度优化、批量处理、异步补偿、数据分片等技术手段,我们可以显著提升系统的性能和可扩展性。
在实际应用中,我们需要:
- 识别性能瓶颈:通过性能测试和监控识别系统的瓶颈点
- 分层优化:从应用层、服务层、数据层等多个层面进行优化
- 合理配置:优化系统配置参数,如线程池、连接池等
- 监控告警:建立完善的监控告警体系,及时发现性能问题
- 持续优化:根据业务发展和系统演进持续进行性能优化
通过系统性的性能优化工作,我们可以构建出高性能、高可用的分布式事务系统,为业务的快速发展提供强有力的技术支撑。