使用AWS EventBridge和Lambda构建事件驱动无噪音告警管道

本文详细介绍了如何使用AWS EventBridge和Lambda构建事件驱动的无噪音告警系统,包括架构设计、代码实现和最佳实践,帮助解决传统告警系统的误报和漏报问题。

为什么传统告警系统不再适用

我数不清有多少次看到告警系统无缘无故地轰炸我的收件箱,一个小小的流量峰值或轻微延迟就会引发混乱。但当真正出现故障时,却收不到任何通知。大多数这些系统都是多年前为静态服务器和可预测负载设计的,不适合我们今天处理的动态云环境。

为什么选择EventBridge + Lambda?

EventBridge和Lambda共同使得构建事件驱动的告警系统变得简单,既实时又易于维护。

  • EventBridge 作为智能事件总线,接收原生AWS事件或自定义事件,通过JSON规则过滤,失败时重试,并将事件路由到正确目标
  • Lambda 运行路由、丰富和修复代码,可自动扩展以处理每秒数千个事件

架构概述

基本告警流程

事件源

  • 原生AWS事件(如S3对象创建、ECS任务停止、Step Functions失败)
  • CloudWatch告警状态变更
  • 通过PutEvents或API目的地发送的自定义应用事件

处理流程

Lambda首先检查事件结构,然后添加相关标签(如关联ID、租户和严重性)。如果已看到事件ID或域键,则跳过处理。

分发操作

  • 分页:针对Sev 1或Sev 2事件,触发SNS通知或在事件管理工具中创建工单
  • 聊天:在Slack或Teams中发送快速消息
  • 可观测性:将指标发送到CloudWatch,原始事件存储到S3

构建简单告警管道

步骤1:创建EventBridge规则

为S3 ObjectCreated事件创建EventBridge规则,过滤incoming/前缀:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {
      "name": ["vishal-alerts-bucket"]
    },
    "object": {
      "key": [{
        "prefix": "incoming/"
      }]
    }
  }
}

步骤2:编写Lambda函数

以下是使用Python编写的最小化、幂等的Lambda路由器:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
log = logging.getLogger()
log.setLevel(logging.INFO)

sns = boto3.client("sns")
ddb = boto3.client("dynamodb")

TOPIC_ARN = os.environ["ALERT_TOPIC_ARN"]
IDEM_TABLE = os.environ["IDEM_TABLE"]
IDEM_TTL_SECONDS = int(os.environ.get("IDEM_TTL_SECONDS", "21600"))  # 6h

def _now_epoch():
    return int(time.time())

def _ttl_epoch(seconds):
    return _now_epoch() + seconds

def is_duplicate(correlation_id: str) -> bool:
    # 条件写入;如果项目已存在,我们知道它是重复的
    try:
        ddb.put_item(
            TableName=IDEM_TABLE,
            Item={
                "correlationId": {"S": correlation_id},
                "created_at": {"N": str(_now_epoch())},
                "expires_at": {"N": str(_ttl_epoch(IDEM_TTL_SECONDS))}
            },
            ConditionExpression="attribute_not_exists(correlationId)"
        )
        return False
    except ClientError as e:
        if e.response["Error"]["Code"] in ("ConditionalCheckFailedException",):
            return True
        raise

def derive_servity(evt_detail) -> str:
    # 示例启发式方法:如果未提供,从详细信息内容派生严重性
    src = evt_detail.get("bucket", {}).get("name", "")
    key = evt_detail.get("object", {}).get("key", "") or ""
    # 示例规则
    if key.startswith("incoming/critical/"):
        return "high"
    if key.endswith(".err") or key.endswith(".failed"):
        return "high"
    return "info"

def handler(event, context):
    # EventBridge信封(每次调用单个事件)
    log.debug(json.dumps(event))
    eid = event.get("id")
    detail = event.get("detail") or {}
    sev = (detail.get("severity") or derive_servity(detail)).lower()
    tenant = detail.get("tenant") or "unknown"
    log.info(json.dumps({"received": True, "correlationId": eid, "severity": sev, "tenant": tenant}))

    # 关联ID:优先使用EventBridge event.id;否则使用详细信息的确定性哈希
    correlation_id = eid or hashlib.md5(json.dumps(detail, sort_keys=True).encode()).hexdigest()

    if is_duplicate(correlation_id):
        log.info(json.dumps({"skipped": "duplicate", "correlationId": correlation_id}))
        return {"skipped": "duplicate"}

    msg = {
        "correlationId": correlation_id,
        "tenant": tenant,
        "severity": sev,
        "event": detail
    }

    # 发出EMF指标(如果提供DurationMs)
    if "job" in detail and "durationMs" in detail:
        log.info(json.dumps({
          "_aws": {
            "Timestamp": int(time.time() * 1000),
            "CloudWatchMetrics": [{
              "Namespace": "App/ETL",
              "Dimensions": [["job","tenant"]],
              "Metrics": [{"Name": "DurationMs", "Unit": "Milliseconds"}]
            }]
          },
          "job": detail["job"],
          "tenant": tenant,
          "DurationMs": float(detail["durationMs"])
        }))

    try:
        if sev in ("critical", "high", "sev1", "sev2"):
            sns.publish(
                TopicArn=TOPIC_ARN,
                Message=json.dumps(msg),
                Subject=f"[{tenant}] {sev.upper()} event"
            )
        # 否则:可以推送到SQS/Firehose/Webhook

        log.info(json.dumps({"ok": True, "correlationId": correlation_id}))
        return {"ok": True}
    except ClientError as e:
        log.error(json.dumps({"error": str(e), "correlationId": correlation_id}))
        raise

步骤3:将告警路由到正确渠道

每个渠道都有其用途:

  • 电子邮件:适用于每日摘要或低优先级提醒
  • 聊天:适用于实时分类
  • 分页:仅用于严重事件

步骤4:持久化指标和事件用于可观测性

CloudWatch指标(EMF)

使用嵌入式指标格式直接从Lambda记录结构化指标。

将原始事件流式传输到S3

使用Firehose以Parquet格式将原始事件持久化到S3,分区如:dt=YYYY-MM-DD/tenant=acme/

SLA心跳检查器(DynamoDB + Scheduler)

EventBridge调度程序每隔几分钟运行一次,触发检查逾期作业的Lambda。

步骤5:测试、演练和部署

在部署前:

  • 发送一些模拟S3事件并跟踪correlationId
  • 故意终止目标,确保EventBridge和Lambda DLQ都能捕获未通过的事件
  • 通过CI/CD推送部署
  • 为DLQ增长或错误指标添加告警
  • 定期运行"消防演练"

扩展方案

基础设置就绪后,可以添加:

  • 异常检测
  • 工作流遥测
  • 跨账户设置
  • 多租户扩展
  • 成本控制

经验教训

有效的方法

  • 三层设计(指标、原始事件、心跳)能早期捕获大多数问题
  • 将缺失数据视为告警条件
  • 将告警合并到线程或事件中

无效的方法

  • 仅依赖电子邮件告警
  • 过度分页会产生持续噪音
  • 忽略DLQ运行状况

安全和合规说明

  • 在EventBridge、Lambda、SNS和其他服务上实施最小权限IAM
  • 使用KMS进行加密
  • 将聊天和工单集成的令牌存储在AWS Secrets Manager中
  • 避免将PII发送到聊天或日志

结论

使用EventBridge和Lambda,您可以构建一个快速响应、必要时扩展且大部分时间保持安静的告警设置。从小规模开始,随着增长逐步添加仪表板、异常检测和跨账户路由。

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计