1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
# ----- Orchestrator with decisions, timeouts, and logging ---------------------------
class KYCOrchestrator:
    """Coordinate the KYC pipeline with per-step timeouts and logging.

    The injected agents are run in a fixed order: onboarding, document
    processing, risk profiling, compliance decision and, for approved cases
    only, monitoring initialization. Every step is awaited with the same
    ``step_timeout_sec`` limit.
    """

    def __init__(
        self,
        *,
        onboarding: OnboardingAgent,
        documents: DocumentAgent,
        risk: RiskAgent,
        compliance: ComplianceAgent,
        monitoring: MonitoringAgent,
        step_timeout_sec: float = 30.0,
        logger: Optional[logging.Logger] = None,
    ) -> None:
        """Store the collaborating agents and runtime settings.

        Args:
            onboarding: Agent whose ``determine_requirements`` is awaited first.
            documents: Agent whose ``process_documents`` is awaited second.
            risk: Agent whose ``calculate_risk_profile`` is awaited third.
            compliance: Agent whose ``make_final_decision`` yields the decision.
            monitoring: Agent whose ``initialize_monitoring`` is awaited when
                the decision status is ``"approved"``.
            step_timeout_sec: Time limit, in seconds, applied to each step.
            logger: Logger to use; defaults to the ``"kyc.orchestrator"`` logger.
        """
        self.onboarding = onboarding
        self.documents = documents
        self.risk = risk
        self.compliance = compliance
        self.monitoring = monitoring
        self.step_timeout_sec = step_timeout_sec
        self.log = logger or logging.getLogger("kyc.orchestrator")

    async def _await_step(self, step: Any) -> Any:
        """Await one pipeline step under the configured per-step timeout."""
        return await asyncio.wait_for(step, timeout=self.step_timeout_sec)

    async def process_kyc_application(self, customer_data: Dict[str, Any]):
        """Run one KYC application through the pipeline and return a decision.

        Args:
            customer_data: Applicant data. ``customer_id`` is read (when
                present) for logging, and the mapping is forwarded to the
                onboarding, risk and monitoring agents.

        Returns:
            The decision object produced by the compliance agent. If any step
            (including monitoring initialization) times out or raises an
            ``Exception``, a ``ComplianceDecision`` with status
            ``"manual_review"`` is returned instead.
        """
        case_id = str(uuid.uuid4())
        self.log.info(
            "KYC案例已创建",
            extra={"case_id": case_id, "customer_id": customer_data.get("customer_id")},
        )
        try:
            onboarding_result = await self._await_step(
                self.onboarding.determine_requirements(customer_data, case_id)
            )
            self.log.debug(
                "入职完成",
                extra={"case_id": case_id, "kyc_level": onboarding_result.kyc_level},
            )

            verification_result = await self._await_step(
                self.documents.process_documents(
                    onboarding_result.submitted_documents, case_id
                )
            )

            risk_result = await self._await_step(
                self.risk.calculate_risk_profile(
                    verification_result, customer_data, case_id
                )
            )
            self.log.debug(
                "风险评估完成",
                extra={"case_id": case_id, "risk_score": risk_result.risk_score},
            )

            decision = await self._await_step(
                self.compliance.make_final_decision(
                    verification_result, risk_result, case_id
                )
            )
            self.log.info(
                "合规决策",
                extra={
                    "case_id": case_id,
                    "status": decision.status,
                    "rationale": decision.rationale,
                },
            )

            if decision.status == "approved":
                # Monitoring is bounded like every other step so that a hung
                # monitoring agent cannot stall the whole case indefinitely.
                await self._await_step(
                    self.monitoring.initialize_monitoring(
                        customer_data, risk_result.risk_score, case_id
                    )
                )
            return decision
        except asyncio.TimeoutError:
            self.log.error("KYC流程超时", extra={"case_id": case_id})
            return ComplianceDecision(
                status="manual_review",
                rationale="处理超时。路由至L3审核。",
                case_id=case_id,
            )
        except Exception as e:
            # Catch-all so that no case is lost; keeps the audit trail intact.
            self.log.exception("KYC流程中的未处理错误", extra={"case_id": case_id})
            return ComplianceDecision(
                status="manual_review",
                rationale=f"意外错误:{type(e).__name__}。路由至L3审核。",
                case_id=case_id,
            )
|