使用AWS EventBridge和Lambda构建事件驱动无噪声告警管道

本文详细介绍如何使用AWS EventBridge和Lambda构建事件驱动的告警管道,解决传统告警系统噪声过多的问题,包含完整架构设计和实现代码。

为什么传统告警系统不再适用

我数不清有多少次看到告警系统无缘无故地轰炸我的收件箱——一个快速的流量高峰、轻微的延迟,突然间就变得混乱不堪。但当真正出现故障时,却什么通知都没有。大多数这些系统都是多年前设计的,适用于静态服务器和可预测的负载,而不是我们今天处理的动态云环境。

为什么选择EventBridge + Lambda?

EventBridge和Lambda共同使得构建既实时又低维护的事件驱动告警系统变得容易。

  • EventBridge 充当智能事件总线,接收原生AWS事件或自定义事件,通过JSON规则过滤它们,在失败时重试,并将它们路由到正确的目标(Lambda、SNS、Firehose等)
  • Lambda 运行路由、丰富和修复代码,可以自动扩展以处理每秒数千个事件

这种设置在事件发生时进行处理,支持选择性过滤,并通过重试或DLQ处理故障。由于按调用计费,即使在大规模情况下成本也保持较低。

架构概述

以下是基本的告警流程:

来源

  • 原生AWS事件(例如S3对象创建、ECS任务停止、Step Functions失败)
  • 更改状态的CloudWatch告警
  • 通过PutEvents或API目标发送的自定义应用程序事件

处理

Lambda首先检查事件结构,然后添加一些标签,如关联ID、租户和严重性。如果已经看到事件ID或域键,则跳过它。

扇出操作

  • 分页:对于Sev 1或Sev 2事件,触发SNS通知或在事件管理工具中打开票据
  • 聊天:在Slack或Teams中快速发送消息,让人们知道发生了什么并可以快速介入
  • 可观察性:将指标发送到CloudWatch,并将原始事件转储到S3,便于以后深入分析

实践:构建简单告警管道

我们将从简单的事件流开始,逐步添加路由、通知、可观察性等组件。

步骤1:创建EventBridge规则

从S3 ObjectCreated事件的EventBridge规则开始,过滤到incoming/前缀:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {
      "name": ["vishal-alerts-bucket"]
    },
    "object": {
      "key": [{
        "prefix": "incoming/"
      }]
    }
  }
}

将此规则定位到Lambda函数,并附加SQS DLQ以捕获任何无法传递的事件。

步骤2:编写Lambda

以下是使用Python编写的最小化、幂等的Lambda路由器。它检测重复项,丰富传入事件,然后按严重性路由它们。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
log = logging.getLogger()
log.setLevel(logging.INFO)

sns = boto3.client("sns")
ddb = boto3.client("dynamodb")

TOPIC_ARN = os.environ["ALERT_TOPIC_ARN"]
IDEM_TABLE = os.environ["IDEM_TABLE"]
IDEM_TTL_SECONDS = int(os.environ.get("IDEM_TTL_SECONDS", "21600"))  # 6h

def _now_epoch():
    return int(time.time())

def _ttl_epoch(seconds):
    return _now_epoch() + seconds

def is_duplicate(correlation_id: str) -> bool:
    # 条件放置;如果项目已存在,我们知道它是重复的
    try:
        ddb.put_item(
            TableName=IDEM_TABLE,
            Item={
                "correlationId": {"S": correlation_id},
                "created_at": {"N": str(_now_epoch())},
                "expires_at": {"N": str(_ttl_epoch(IDEM_TTL_SECONDS))}
            },
            ConditionExpression="attribute_not_exists(correlationId)"
        )
        return False
    except ClientError as e:
        if e.response["Error"]["Code"] in ("ConditionalCheckFailedException",):
            return True
        raise

def derive_servity(evt_detail) -> str:
    # 示例启发式:如果未提供,从详细信息内容派生严重性
    src = evt_detail.get("bucket", {}).get("name", "")
    key = evt_detail.get("object", {}).get("key", "") or ""
    # 示例规则
    if key.startswith("incoming/critical/"):
        return "high"
    if key.endswith(".err") or key.endswith(".failed"):
        return "high"
    return "info"

def handler(event, context):
    # EventBridge信封(每次调用单个事件)
    log.debug(json.dumps(event))
    eid = event.get("id")
    detail = event.get("detail") or {}
    sev = (detail.get("severity") or derive_servity(detail)).lower()
    tenant = detail.get("tenant") or "unknown"
    log.info(json.dumps({"received": True, "correlationId": eid, "severity": sev, "tenant": tenant}))

    # 关联ID:优先使用EventBridge event.id;否则使用详细信息的确定性哈希
    correlation_id = eid or hashlib.md5(json.dumps(detail, sort_keys=True).encode()).hexdigest()

    if is_duplicate(correlation_id):
        log.info(json.dumps({"skipped": "duplicate", "correlationId": correlation_id}))
        return {"skipped": "duplicate"}

    msg = {
        "correlationId": correlation_id,
        "tenant": tenant,
        "severity": sev,
        "event": detail
    }

    # 发出EMF指标(如果提供DurationMs示例)
    if "job" in detail and "durationMs" in detail:
        log.info(json.dumps({
          "_aws": {
            "Timestamp": int(time.time() * 1000),
            "CloudWatchMetrics": [{
              "Namespace": "App/ETL",
              "Dimensions": [["job","tenant"]],
              "Metrics": [{"Name": "DurationMs", "Unit": "Milliseconds"}]
            }]
          },
          "job": detail["job"],
          "tenant": tenant,
          "DurationMs": float(detail["durationMs"])
        }))

    try:
        if sev in ("critical", "high", "sev1", "sev2"):
            sns.publish(
                TopicArn=TOPIC_ARN,
                Message=json.dumps(msg),
                Subject=f"[{tenant}] {sev.upper()} event"
            )
        # else: 您可以在这里推送到SQS/Firehose/Webhook

        log.info(json.dumps({"ok": True, "correlationId": correlation_id}))
        return {"ok": True}
    except ClientError as e:
        log.error(json.dumps({"error": str(e), "correlationId": correlation_id}))
        raise

提示:使用具有短TTL的DynamoDB或Redis实现is_duplicate(),以防止对同一事件重新告警。

步骤3:将告警路由到正确的渠道

每个渠道都有其用途:

  • 电子邮件:适用于每日摘要或低优先级提醒
  • 聊天(Slack或Teams):最适合实时分类。放入关联ID或运行手册链接,以便人们可以直接介入
  • 分页:仅用于严重事件,如Sev1或Sev2

步骤4:持久化指标和事件以实现可观察性

CloudWatch指标(EMF)

您可以使用嵌入式指标格式(EMF)直接从Lambda记录结构化指标。

为以下内容创建告警:

  • 错误计数
  • 持续时间(P95延迟)
  • 心跳指标(检测静默故障)

将缺失数据视为违规。没有数据并不意味着没有问题。

将原始事件流式传输到S3

使用Firehose以Parquet格式将原始事件持久化到S3,分区如: dt=YYYY-MM-DD/tenant=acme/

然后使用AWS Glue进行爬网,通过Athena查询,或在QuickSight中可视化数量和错误率。为了更快地查找,将热字段(如租户、严重性)镜像到OpenSearch。

SLA心跳检查器(DynamoDB + 调度器)

为每个作业存储心跳记录,EventBridge调度程序每几分钟运行一次,触发Lambda检查过期作业,并在错过SLA时发送单个合并告警。

步骤5:测试、演练和部署

在部署之前,先破坏一些东西:

  • 触发一些假的S3事件,并从头到尾跟踪correlationId
  • 故意终止目标。确保EventBridge和Lambda DLQ都捕获未通过的内容
  • 通过CI/CD推送部署,不要忘记EventBridge规则、IAM策略和Lambda包本身
  • 为DLQ增长或错误指标添加告警
  • 定期运行"消防演练"。测试告警管道的最佳时间是在中断之前

如何扩展

基础设置就绪后,可以开始添加一些额外功能:

  • 异常检测:连接SageMaker的Random Cut Forest以捕获运行时间或错误率的异常峰值
  • 工作流遥测:让每个阶段(摄取、转换、加载)发送事件。如果其中一个从未将控制权传递给下一个,则触发告警
  • 跨账户设置:将多个AWS账户指向一个共享的EventBridge总线,使它们都将告警放入同一管道
  • 多租户扩展:为每个租户提供自己的总线和DLQ
  • 成本控制:在总线级别过滤或采样低优先级事件,并在存储成本上升之前将旧数据移动到Glacier

经验教训

经过多次迭代,一些事情脱颖而出:

有效的方法

  • 三层设计(指标、原始事件、心跳)早期捕获大多数问题
  • 将缺失数据视为告警条件
  • 将告警合并到线程或事件中

无效的方法

  • 仅依赖电子邮件告警,您可能会发现自己需要筛选大量不相关的电子邮件才能找到相关的那一封
  • 过度分页会产生持续噪音,并在真正出现严重问题时削弱紧迫感
  • 忽略DLQ运行状况,未处理的事件会悄悄堆积

安全和合规性说明

保持事件系统安全和合规:

  • 在EventBridge、Lambda、SNS和其他服务上强制执行最小权限IAM
  • 使用KMS进行加密(EventBridge、S3、DynamoDB)
  • 将聊天和票据集成令牌存储在AWS Secrets Manager中,而不是环境变量中
  • 避免将PII发送到聊天或日志,仅发送调试所需的内容

结论:更简单、更智能的告警骨干

使用EventBridge和Lambda,您可以构建一个能够快速响应、在必要时扩展并在其余时间保持安静的告警设置。

从小处开始:

  • 一个流程,如S3 → EventBridge → Lambda → SNS
  • 添加指标和心跳
  • 随着增长,逐步添加仪表板、异常检测和跨账户路由

事件驱动告警不是"可有可无"的功能。它让团队在用户注意到问题之前就能得到预警。

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计