PHP实现Saga模式:微服务分布式事务的优雅协调方案

本文深入探讨了在PHP微服务架构中使用Saga模式管理分布式事务的完整方案,涵盖理论基础、实现方法、高级技巧和监控策略,提供了详细的代码示例和架构设计思路。

Saga模式与PHP:微服务中分布式事务的精妙协调

阅读时间:约45分钟

想象一下通过移动应用订购披萨的场景。在幕后,一系列微服务协同工作:验证您的账户、在卡上预留资金、在餐厅系统中创建订单、通知配送服务、扣除积分。当其中一个步骤出现问题时会发生什么?这就是Saga模式登场的时候——一个优雅的分布式事务管理解决方案。

理论基础

CAP定理与分布式系统

在深入了解Saga之前,理解我们所处的理论约束至关重要。由Eric Brewer提出的CAP定理指出,在任何分布式数据存储中,您只能同时保证以下三个属性中的两个:

  • 一致性(C):所有节点同时看到相同的数据
  • 可用性(A):系统保持运行和响应
  • 分区容错性(P):系统在网络故障时继续运行

在微服务架构中,网络分区是不可避免的,因此我们必须在一致性和可用性之间做出选择。Saga模式本质上是通过提供最终一致性同时保持高可用性来管理这种权衡。

ACID vs BASE属性

传统数据库提供ACID保证:

  • 原子性:全有或全无执行
  • 一致性:有效状态转换
  • 隔离性:并发操作互不干扰
  • 持久性:提交的更改持久存在

分布式系统通常采用BASE属性:

  • 基本可用:系统在故障时保持可用
  • 软状态:状态可能随时间变化而无需输入
  • 最终一致性:系统最终将变得一致

Saga模式通过以即时一致性换取可用性和分区容错性来体现BASE原则。

分布式系统中的事务模型

两阶段提交(2PC)

传统的2PC协议试图在分布式系统中维护ACID属性:

  • 准备阶段:协调者要求所有参与者准备
  • 提交阶段:如果全部同意,协调者告诉所有参与者提交

2PC的问题:

  • 阻塞协议(单点故障)
  • 由于同步性质导致性能差
  • 无法优雅处理协调者故障
  • 长时间锁定资源

三阶段提交(3PC)

3PC添加"预提交"阶段以减少阻塞场景,但引入了额外的复杂性和网络开销。

Saga事务

Saga模式采用根本不同的方法:

  • 补偿事务:使用可逆操作而不是锁
  • 前向恢复:完成saga或补偿已完成的步骤
  • 无全局锁:每个步骤都是本地事务
  • 异步:非阻塞执行模型

为什么ACID在微服务中不起作用

在单体应用中,我们习惯于ACID事务的舒适性。数据库保证要么所有操作成功,要么都不成功。但在微服务架构中,每个服务都有自己的数据库,传统事务变得无能为力。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// 这在分布式系统中不起作用
try {
    $db->beginTransaction();

    $userService->debitAccount($userId, $amount);     // 数据库A
    $inventoryService->reserveItem($itemId);          // 数据库B  
    $orderService->createOrder($orderData);           // 数据库C
    $notificationService->sendConfirmation($email);   // 外部API

    $db->commit(); // 无法控制所有服务!
} catch (Exception $e) {
    $db->rollback(); // 只回滚本地更改
}

问题很明显:我们无法保证跨不同系统分布的操作的原子性。

Saga模式:分而治之

Saga模式优雅地解决了这个问题:不是使用一个大型事务,而是创建一系列本地事务,每个事务在失败时都可以被补偿。

核心概念

1. 可补偿事务

可以使用补偿操作"撤销"的操作:

  • 预留资金 ↔ 释放资金
  • 创建订单 ↔ 取消订单
  • 发送邮件 ↔ 发送取消邮件

2. 枢轴事务(不归点)

在此之后saga必须成功完成的操作。通常是不可逆的操作,如扣款信用卡或发货。

3. 可重试事务

可以安全重复直到成功完成的幂等操作。

4. 关键部分

必须在单个服务边界内原子执行的saga部分。

数学模型

Saga S可以表示为事务序列:

1
S = T₁, T₂, T₃, ..., Tₙ

每个事务Tᵢ都有相应的补偿事务Cᵢ:

1
S = {T₁, T₂, ..., Tₙ} with {C₁, C₂, ..., Cₙ}

对于成功执行:

1
T₁ • T₂ • T₃ • ... • Tₙ = Success

对于步骤k的失败执行:

1
T₁ • T₂ • ... • Tₖ (失败) → Cₖ₋₁ • Cₖ₋₂ • ... • C₁

其中•表示顺序组合。

实现方法

1. 编排:服务的舞蹈

在编排方法中,服务通过事件交互,没有中央协调器:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 订单服务
class OrderService 
{
    public function createOrder(array $orderData): void
    {
        $orderId = $this->repository->create($orderData);

        // 发布下一步事件
        $this->eventBus->publish(new OrderCreatedEvent($orderId, $orderData));
    }

    // 补偿操作
    public function cancelOrder(string $orderId): void
    {
        $this->repository->markAsCancelled($orderId);
        $this->eventBus->publish(new OrderCancelledEvent($orderId));
    }
}

// 支付服务
class PaymentService
{
    public function handleOrderCreated(OrderCreatedEvent $event): void
    {
        try {
            $paymentId = $this->processPayment($event->getAmount(), $event->getCardToken());
            $this->eventBus->publish(new PaymentProcessedEvent($event->getOrderId(), $paymentId));
        } catch (PaymentException $e) {
            $this->eventBus->publish(new PaymentFailedEvent($event->getOrderId(), $e->getMessage()));
        }
    }

    public function handleOrderCancelled(OrderCancelledEvent $event): void
    {
        // 补偿支付
        $this->refundPayment($event->getOrderId());
    }
}

编排优点:

  • 无单点故障
  • 简单的小型系统
  • 高性能
  • 自然解耦

编排缺点:

  • 随着系统增长调试复杂
  • 循环依赖风险
  • 测试困难
  • 难以维护全局不变量

2. 协调:指挥管理乐团

在协调方法中,中央协调器管理整个过程:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class OrderSagaOrchestrator
{
    private array $steps = [];
    private array $compensations = [];

    public function __construct(
        private OrderService $orderService,
        private PaymentService $paymentService,
        private InventoryService $inventoryService,
        private NotificationService $notificationService,
        private SagaRepository $sagaRepository
    ) {
        $this->defineSteps();
    }

    private function defineSteps(): void
    {
        $this->steps = [
            'reserve_inventory' => [$this->inventoryService, 'reserveItems'],
            'process_payment' => [$this->paymentService, 'processPayment'],
            'create_order' => [$this->orderService, 'createOrder'], // 枢轴点
            'send_confirmation' => [$this->notificationService, 'sendOrderConfirmation']
        ];

        $this->compensations = [
            'reserve_inventory' => [$this->inventoryService, 'releaseItems'],
            'process_payment' => [$this->paymentService, 'refundPayment'],
            'create_order' => [$this->orderService, 'cancelOrder'],
            'send_confirmation' => null // 不需要补偿
        ];
    }

    public function executeOrderSaga(array $orderData): SagaResult
    {
        $sagaId = $this->generateSagaId();
        $saga = new OrderSaga($sagaId, $orderData);

        try {
            foreach ($this->steps as $stepName => $callable) {
                $this->executeStep($saga, $stepName, $callable);
                $this->sagaRepository->updateProgress($saga);
            }

            $saga->markAsCompleted();
            return new SagaResult(true, '订单处理成功');

        } catch (SagaException $e) {
            $this->compensateFailedSaga($saga);
            return new SagaResult(false, $e->getMessage());
        }
    }

    private function executeStep(OrderSaga $saga, string $stepName, callable $step): void
    {
        try {
            $result = call_user_func($step, $saga->getData());
            $saga->markStepCompleted($stepName, $result);

        } catch (Exception $e) {
            $saga->markStepFailed($stepName, $e->getMessage());
            throw new SagaException("步骤 {$stepName} 失败: " . $e->getMessage());
        }
    }

    private function compensateFailedSaga(OrderSaga $saga): void
    {
        $completedSteps = array_reverse($saga->getCompletedSteps());

        foreach ($completedSteps as $stepName => $stepResult) {
            if ($compensation = $this->compensations[$stepName]) {
                try {
                    call_user_func($compensation, $saga->getData(), $stepResult);
                    $saga->markStepCompensated($stepName);
                } catch (Exception $e) {
                    // 记录补偿错误但继续
                    $this->logger->error("步骤 {$stepName} 补偿失败: " . $e->getMessage());
                }
            }
        }

        $saga->markAsCompensated();
        $this->sagaRepository->update($saga);
    }
}

协调优点:

  • 集中控制和可见性
  • 更容易调试和测试
  • 清晰的业务流程
  • 更好地处理复杂工作流

协调缺点:

  • 单点故障(协调器)
  • 潜在性能瓶颈
  • 协调器与服务之间的紧耦合

高级实现技术

Saga状态和持久化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class SagaState
{
    const STATUS_RUNNING = 'running';
    const STATUS_COMPLETED = 'completed';
    const STATUS_COMPENSATING = 'compensating';
    const STATUS_COMPENSATED = 'compensated';
    const STATUS_FAILED = 'failed';

    public function __construct(
        private string $sagaId,
        private string $sagaType,
        private array $data,
        private string $status = self::STATUS_RUNNING,
        private array $completedSteps = [],
        private array $compensatedSteps = [],
        private ?string $currentStep = null,
        private array $metadata = []
    ) {}

    public function toArray(): array
    {
        return [
            'saga_id' => $this->sagaId,
            'saga_type' => $this->sagaType,
            'data' => json_encode($this->data),
            'status' => $this->status,
            'completed_steps' => json_encode($this->completedSteps),
            'compensated_steps' => json_encode($this->compensatedSteps),
            'current_step' => $this->currentStep,
            'metadata' => json_encode($this->metadata),
            'created_at' => date('Y-m-d H:i:s'),
            'updated_at' => date('Y-m-d H:i:s')
        ];
    }
}

class DatabaseSagaRepository implements SagaRepository
{
    public function save(SagaState $saga): void
    {
        $data = $saga->toArray();

        $sql = "INSERT INTO sagas (saga_id, saga_type, data, status, completed_steps, 
                compensated_steps, current_step, metadata, created_at, updated_at) 
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";

        $this->db->execute($sql, array_values($data));
    }

    public function findPendingSagas(): array
    {
        $sql = "SELECT * FROM sagas WHERE status IN (?, ?) AND updated_at < ?";
        $timeoutThreshold = date('Y-m-d H:i:s', strtotime('-5 minutes'));

        return $this->db->fetchAll($sql, [
            SagaState::STATUS_RUNNING,
            SagaState::STATUS_COMPENSATING,
            $timeoutThreshold
        ]);
    }
}

处理超时和恢复

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class SagaRecoveryService
{
    public function __construct(
        private SagaRepository $repository,
        private array $orchestrators = []
    ) {}

    public function recoverPendingSagas(): void
    {
        $pendingSagas = $this->repository->findPendingSagas();

        foreach ($pendingSagas as $sagaData) {
            $saga = SagaState::fromArray($sagaData);
            $orchestrator = $this->getOrchestratorForType($saga->getType());

            if ($saga->getStatus() === SagaState::STATUS_RUNNING) {
                // 尝试继续执行
                $orchestrator->resumeSaga($saga);
            } elseif ($saga->getStatus() === SagaState::STATUS_COMPENSATING) {
                // 继续补偿
                $orchestrator->continueCompensation($saga);
            }
        }
    }

    private function getOrchestratorForType(string $sagaType): SagaOrchestrator
    {
        if (!isset($this->orchestrators[$sagaType])) {
            throw new RuntimeException("未找到saga类型的协调器: {$sagaType}");
        }

        return $this->orchestrators[$sagaType];
    }
}

幂等性和去重

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class IdempotentSagaStep
{
    public function __construct(
        private string $stepId,
        private callable $operation,
        private RedisAdapter $cache
    ) {}

    public function execute(array $data): mixed
    {
        $cacheKey = "saga_step:{$this->stepId}:" . md5(serialize($data));

        // 检查操作是否之前执行过
        if ($result = $this->cache->get($cacheKey)) {
            return unserialize($result);
        }

        $result = call_user_func($this->operation, $data);

        // 缓存结果以备可能重新执行
        $this->cache->setex($cacheKey, 3600, serialize($result));

        return $result;
    }
}

// 使用
$idempotentPayment = new IdempotentSagaStep(
    'process_payment',
    [$this->paymentService, 'processPayment'],
    $this->redis
);

$paymentResult = $idempotentPayment->execute($paymentData);

理论挑战和解决方案

数据一致性异常

ABA问题

在分布式系统中,值可能在观察之间从A变为B再变回A。Saga模式通过以下方式解决此问题:

  • 版本向量:跟踪因果关系
  • 逻辑时间戳:一致地排序事件
  • 乐观锁:检测并发修改
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class VersionedEntity
{
    private array $vectorClock = [];

    public function updateVector(string $nodeId): void
    {
        $this->vectorClock[$nodeId] = ($this->vectorClock[$nodeId] ?? 0) + 1;
    }

    public function compareVector(array $otherVector): int
    {
        // 如果this < other返回-1,如果this > other返回1,如果并发返回0
        $thisGreater = false;
        $otherGreater = false;

        $allNodes = array_unique(array_merge(
            array_keys($this->vectorClock),
            array_keys($otherVector)
        ));

        foreach ($allNodes as $node) {
            $thisValue = $this->vectorClock[$node] ?? 0;
            $otherValue = $otherVector[$node] ?? 0;

            if ($thisValue > $otherValue) {
                $thisGreater = true;
            } elseif ($thisValue < $otherValue) {
                $otherGreater = true;
            }
        }

        if ($thisGreater && !$otherGreater) return 1;
        if ($otherGreater && !$thisGreater) return -1;
        return 0; // 并发
    }
}

丢失更新问题

当多个saga并发修改同一资源时: 解决方案:语义锁定

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class SemanticLock
{
    public function __construct(private RedisAdapter $redis) {}

    public function acquireLock(string $resource, string $sagaId, int $ttl = 300): bool
    {
        $lockKey = "lock:{$resource}";
        $lockValue = "{$sagaId}:" . time();

        // 带TTL的原子设置(如果不存在)
        return $this->redis->set($lockKey, $lockValue, ['NX', 'EX' => $ttl]);
    }

    public function releaseLock(string $resource, string $sagaId): bool
    {
        $lockKey = "lock:{$resource}";
        $expectedValue = "{$sagaId}:" . time();

        // 用于原子比较和删除的Lua脚本
        $script = "
            if redis.call('GET', KEYS[1]) == ARGV[1] then
                return redis.call('DEL', KEYS[1])
            else
                return 0
            end
        ";

        return $this->redis->eval($script, [$lockKey], [$expectedValue]) === 1;
    }
}

class InventoryService
{
    public function reserveItems(array $items, string $sagaId): array
    {
        $locks = [];
        $reserved = [];

        try {
            // 获取所有项目的语义锁
            foreach ($items as $item) {
                $resourceId = "inventory:{$item['sku']}";
                if ($this->semanticLock->acquireLock($resourceId, $sagaId)) {
                    $locks[] = $resourceId;
                } else {
                    throw new ResourceLockedException("无法锁定项目: {$item['sku']}");
                }
            }

            // 预留项目
            foreach ($items as $item) {
                $this->repository->reserveItem($item['sku'], $item['quantity'], $sagaId);
                $reserved[] = $item['sku'];
            }

            return ['reserved_items' => $reserved, 'locks' => $locks];

        } catch (Exception $e) {
            // 释放已获取的锁
            foreach ($locks as $resource) {
                $this->semanticLock->releaseLock($resource, $sagaId);
            }

            throw $e;
        }
    }
}

脏读问题

从其他saga读取未提交的数据: 解决方案:可交换更新

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class AccountService
{
    // 使用操作而不是直接余额更改
    public function debitAccount(string $accountId, float $amount, string $sagaId): void
    {
        $operation = new AccountOperation([
            'account_id' => $accountId,
            'type' => 'DEBIT',
            'amount' => $amount,
            'saga_id' => $sagaId,
            'timestamp' => microtime(true)
        ]);

        $this->operationQueue->push($operation);
    }

    public function creditAccount(string $accountId, float $amount, string $sagaId): void
    {
        $operation = new AccountOperation([
            'account_id' => $accountId,
            'type' => 'CREDIT',
            'amount' => $amount,
            'saga_id' => $sagaId,
            'timestamp' => microtime(true)
        ]);

        $this->operationQueue->push($operation);
    }

    // 操作处理器以正确顺序应用它们
    public function processAccountOperations(string $accountId): void
    {
        $operations = $this->operationQueue->getForAccount($accountId);

        // 按时间戳排序以获得正确顺序
        usort($operations, fn($a, $b) => $a['timestamp'] <=> $b['timestamp']);

        $balance = $this->getAccountBalance($accountId);

        foreach ($operations as $operation) {
            if ($operation['type'] === 'DEBIT') {
                $balance -= $operation['amount'];
            } else {
                $balance += $operation['amount'];
            }
        }

        $this->updateAccountBalance($accountId, $balance);
        $this->operationQueue->clearForAccount($accountId);
    }
}

高级理论模式

嵌套Saga

复杂的业务流程可能需要分层saga结构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
abstract class NestedSaga extends BaseSaga
{
    protected array $childSagas = [];

    protected function executeChildSaga(string $sagaType, array $data): SagaResult
    {
        $childSaga = $this->sagaFactory->create($sagaType, $data);
        $this->childSagas[] = $childSaga;

        return $childSaga->execute();
    }

    protected function compensateChildSagas(): void
    {
        foreach (array_reverse($this->childSagas) as $childSaga) {
            if ($childSaga->getStatus() === SagaState::STATUS_COMPLETED) {
                $childSaga->compensate();
            }
        }
    }
}

带确认的Saga模式

需要外部批准的操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class ConfirmationRequiredSaga extends BaseSaga
{
    const STATUS_PENDING_CONFIRMATION = 'pending_confirmation';

    protected function defineSteps(): array
    {
        return [
            'prepare_order' => new PrepareOrderStep(),
            'await_confirmation' => new AwaitConfirmationStep(),
            'finalize_order' => new FinalizeOrderStep()
        ];
    }

    public function requestUserConfirmation(): void
    {
        $this->status = self::STATUS_PENDING_CONFIRMATION;

        // 向用户发送确认请求
        $this->notificationService->sendConfirmationRequest(
            $this->data['customer']['email'],
            $this->sagaId
        );

        // 设置超时
        $this->scheduleTimeout(300); // 5分钟
    }

    public function handleUserConfirmation(bool $confirmed): void
    {
        if ($confirmed) {
            $this->continueExecution();
        } else {
            $this->startCompensation();
        }
    }

    public function handleTimeout(): void
    {
        // 超时时,假设用户拒绝
        $this->handleUserConfirmation(false);
    }
}

Saga中的并行分支

可以并发执行的操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class ParallelSagaOrchestrator
{
    public function executeParallelSteps(SagaState $saga, array $parallelSteps): array
    {
        $promises = [];

        foreach ($parallelSteps as $stepName => $step) {
            $promises[$stepName] = $this->asyncExecutor->execute(
                fn() => $step->execute($saga)
            );
        }

        // 等待所有并行操作完成
        $results = [];
        $failures = [];

        foreach ($promises as $stepName => $promise) {
            try {
                $results[$stepName] = $promise->wait();
            } catch (Exception $e) {
                $failures[$stepName] = $e;
            }
        }

        // 如果有失败,补偿成功操作
        if (!empty($failures)) {
            foreach ($results as $stepName => $result) {
                $parallelSteps[$stepName]->compensate($saga, $result);
            }

            throw new ParallelStepFailureException($failures);
        }

        return $results;
    }
}

形式验证和测试

基于属性的测试

Saga实现应满足某些不变量:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class SagaPropertyTests extends TestCase
{
    /**
     * 属性:如果saga成功完成,所有步骤都已执行
     */
    public function testCompletenessProperty(): void
    {
        $this->forAll(
            Generator\elements(['order', 'payment', 'shipping']),
            Generator\associative(['amount' => Generator\positive_float()])
        )->then(function ($sagaType, $data) {
            $saga = $this->createSaga($sagaType, $data);
            $result = $saga->execute();

            if ($result->isSuccess()) {
                $this->assertAllStepsCompleted($saga);
            }
        });
    }

    /**
     * 属性:如果saga失败,所有完成的步骤都被补偿
     */
    public function testCompensationProperty(): void
    {
        $this->forAll(
            Generator\elements(['order', 'payment', 'shipping']),
            Generator\associative(['amount' => Generator\positive_float()])
        )->then(function ($sagaType, $data) {
            $saga = $this->createFailingSaga($sagaType, $data);
            $result = $saga->execute();

            if (!$result->isSuccess()) {
                $this->assertAllCompletedStepsCompensated($saga);
            }
        });
    }

    /**
     * 属性:Saga执行是幂等的
     */
    public function testIdempotencyProperty(): void
    {
        $this->forAll(
            Generator\elements(['order', 'payment', 'shipping']),
            Generator\associative(['amount' => Generator\positive_float()])
        )->then(function ($sagaType, $data) {
            $saga1 = $this->createSaga($sagaType, $data);
            $saga2 = $this->createSaga($sagaType, $data);

            $result1 = $saga1->execute();
            $result2 = $saga2->execute();

            $this->assertEquals($result1->isSuccess(), $result2->isSuccess());
        });
    }
}

监控和可观察性

全面的Saga跟踪

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class SagaTracker
{
    public function __construct(
        private LoggerInterface $logger,
        private MetricsCollector $metrics,
        private EventBus $eventBus
    ) {}

    public function trackSagaStarted(string $sagaId, string $sagaType, array $data): void
    {
        $this->logger->info("Saga已开始", [
            'saga_id' => $sagaId,
            'saga_type' => $sagaType,
            'data' => $data
        ]);

        $this->metrics->increment('saga.started', ['type' => $sagaType]);

        $this->eventBus->publish(new SagaStartedEvent($sagaId, $sagaType, $data));
    }

    public function trackStepCompleted(string $sagaId, string $stepName, $result, float $duration): void
    {
        $this->logger->info("Saga步骤已完成", [
            'saga_id' => $sagaId,
            'step' => $stepName,
            'duration' => $duration
        ]);

        $this->metrics->histogram('saga.step.duration', $duration, [
            'step' => $stepName
        ]);
    }

    public function trackSagaFailed(string $sagaId, string $reason, array $context): void
    {
        $this->logger->error("Saga失败", [
            'saga_id' => $sagaId,
            'reason' => $reason,
            'context' => $context
        ]);

        $this->metrics->increment('saga.failed');

        // 发送警报到监控系统
        $this->eventBus->publish(new SagaFailedEvent($sagaId, $reason, $context));
    }
}

class SagaDashboard
{
    public function getSagaStatistics(): array
    {
        return [
            'total_sagas' => $this->repository->getTotalCount(),
            'running_sagas' => $this->repository->getCountByStatus('running'),
            'completed_sagas' => $this->repository->getCountByStatus('completed'),
            'failed_sagas' => $this->repository->getCountByStatus('failed'),
            'average_duration' => $this->repository->getAverageDuration(),
            'success_rate' => $this->calculateSuccessRate(),
            'most_failed_steps' => $this->repository->getMostFailedSteps(10)
        ];
    }

    public function getSagaDetails(string $sagaId): array
    {
        $saga = $this->repository->findById($sagaId);
        $timeline = $this->repository->getSagaTimeline($sagaId);

        return [
            'saga' => $saga,
            'timeline' => $timeline,
            'duration' => $this->calculateDuration($timeline),
            'current_status' => $saga->getStatus()
        ];
    }
}
comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计