使用AWS OpenSearch、EventBridge和WebSockets构建实时仪表板

本文详细介绍如何使用AWS服务构建无服务器实时仪表板,通过EventBridge事件流、OpenSearch数据索引和WebSocket实时推送,消除轮询机制,实现大规模即时数据更新。

构建实时仪表板:AWS OpenSearch、EventBridge和WebSockets

如果您尝试过构建仪表板,那么您一定熟悉轮询的麻烦。您每隔几秒调用一次API,获取更新,并祈祷数据不会变得过时。但老实说,轮询效率低下、浪费资源且过时。在现代时代,用户期望数据是动态流动的。我们作为开发人员,应该在不拖垮服务器的情况下满足这一期望。

在本文中,我将带您了解一种无服务器、事件驱动的架构,我利用这种架构使用AWS构建了实时仪表板。该架构将EventBridge、OpenSearch和API Gateway WebSockets与一些Lambda和DynamoDB结合在一起。到最后,您将了解如何将所有部分组合在一起,创建一个可以扩展、成本友好且对最终用户来说真正快速的实时仪表板数据管道。

让我们开始吧!

为什么不直接轮询?

传统仪表板依赖于每隔几秒对某个数据库进行查询。虽然这相当简单,但它有一个主要缺点:

  • 延迟:数据总是感觉过时,总是落后一步。
  • 成本:每次轮询都会产生过多不必要的查询,冲击您的后端。
  • 用户体验:用户看到过时的图表,他们盯着一个感觉不动态的图表会感到沮丧。

与其强迫UI不断检查“我们到了吗?”,我们改变这一点——事件将在发生时推送到仪表板。

现在我们考虑AWS三件套:

  • Amazon EventBridge – 架构的骨干,因为它捕获领域事件。
  • Amazon OpenSearch Service – 提供基于事件查询的快速索引和返回。
  • Amazon API Gateway (WebSocket) + Lambda + DynamoDB – 提供一个实时通信层,将更新实时推送到每个客户端。

听起来不错吧?让我们回顾一下架构是如何协同工作的。

架构概述

下图显示了事件如何通过架构推送的端到端流程:

  1. 下游服务发出一个事件 → EventBridge捕获该事件。
  2. EventBridge将事件路由到索引Lambda,该Lambda对事件进行规范化并将其存储到OpenSearch。
  3. 事件索引成功后,Lambda创建一个“增量”事件回到EventBridge。
  4. EventBridge中的该事件触发广播Lambda,该Lambda在DynamoDB中查找活动的WebSocket连接。
  5. 广播Lambda通过API Gateway WebSockets将更新推送到客户端。
  6. 客户端立即呈现更改——无需刷新,无需轮询。

流程的图示(非常ASCII):

1
2
3
4
5
6
7
Service → EventBridge → Indexing Lambda → OpenSearch
EventBridge (delta)
Broadcast Lambda
API Gateway (WebSocket) → Clients (UI)

步骤1:将数据索引到OpenSearch

在将事件推送到仪表板之前,最好有一个索引策略。以下是一个索引模板的示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
{
  "index_patterns": ["metrics-*"],
  "template": {
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 1
    },
    "mappings": {
      "dynamic": "false",
      "properties": {
        "@timestamp": { "type": "date" },
        "service": { "type": "keyword" },
        "eventType": { "type": "keyword" },
        "latencyMs": { "type": "long" },
        "message": {
          "type": "text",
          "fields": {
            "raw": { "type": "keyword" }
          }
        }
      }
    }
  }
}

我通过艰难方式学到的一些最佳实践:

  • 对文档使用稳定的ID(eventId)以消除重复。
  • 使用索引状态管理(ISM)按日轮转索引。
  • 使用ISM在14天(或您想要的任何时间)后自动使过时数据过期。

为什么?因为仪表板不是数据湖。您应该能够查询所需的数据,而不必翻阅可能很大的索引。

步骤2:WebSocket的基础设施

首先,您需要一个无服务器的WebSocket API。我们将使用API Gateway创建WebSocket API。您需要关心三个重要的路由:

  • $connect – 这将在DynamoDB中保存connectionId。
  • $disconnect – 在客户端终止时移除它。
  • $default – 这只需要返回来自客户端的可选消息(例如,当用户订阅频道时)。

以下是DynamoDB表设置的示例(通过SAM/CDK):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
Resources:
  ConnTable:
    Type: AWS::DynamoDB::Table
    Properties:
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: connectionId
          AttributeType: S
      KeySchema:
        - AttributeName: connectionId
          KeyType: HASH

对于每个活动客户端,我们在此表中放入一行。这就是广播Lambda知道谁在线的方式。

步骤3:广播事件

一旦新事件被索引,我们就开始行动。我们毫不犹豫地将其发送给所有活动客户端。

这是一个示例Lambda函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import { DynamoDBClient, ScanCommand } from "@aws-sdk/client-dynamodb";
import { ApiGatewayManagementApiClient, PostToConnectionCommand } from "@aws-sdk/client-apigatewaymanagementapi";

const ddb = new DynamoDBClient();
const api = new ApiGatewayManagementApiClient({ endpoint: process.env.WS_ENDPOINT });
const TABLE = process.env.TABLE;

export const broadcast = async (event) => {
  const payload = event.detail || event;
  
  const conns = await ddb.send(new ScanCommand({ TableName: TABLE }));
  const targets = conns.Items.map(i => i.connectionId.S);

  const msg = JSON.stringify({
    type: "hits",
    hits: payload.hits || [payload.doc]
  });

  await Promise.allSettled(
    targets.map(id =>
      api.send(
        new PostToConnectionCommand({ ConnectionId: id, Data: Buffer.from(msg) })
      )
    )
  );

  return { sent: targets.length };
};

如您所见:

  • 扫描DynamoDB以获取活动连接。
  • 将有效负载序列化为JSON消息。
  • 使用Promise.allSettled处理多个承诺,以便如果一个连接中断,批处理的其余部分仍然通过。

步骤4:客户端

让我们在这里保持极其简单:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
<script>
  const ws = new WebSocket('wss://<api-id>.execute-api.<region>.amazonaws.com/prod');

  ws.onmessage = (evt) => {
    const msg = JSON.parse(evt.data);

    if (msg.type === 'hits') {
      console.log("Live update:", msg.hits);
      // 在此处更新您的图表或表格
    }
  };
</script>

这就是让数据活起来所需的全部魔法,没有cron作业,没有重新加载,只有即时反馈。

经验教训(艰难方式)

以下是我希望有人早点告诉我的关键点。

  • 幂等性 – 始终、始终、始终对文档使用稳定的ID以避免重复分配。
  • 不要向所有人发送垃圾邮件 – 利用DynamoDB中的通道属性,这将允许您仅将事件推送给正确的客户端。
  • 存在大小限制 – 每个WebSocket消息限制在32 KB;如果超过,请对有效负载进行批处理或采样。
  • 不要忘记成本 – 在OpenSearch查询中进行过滤,仅通过WebSocket推送增量。

安全第一

  • 将Lambda授权器附加到$connect以验证JWT或API密钥。
  • 您考虑过吗;您真的希望每个用户都能看到每个事件吗?或者他们更愿意按团队、服务或位置过滤事件?

常见问题解答

问:没有OpenSearch这个能工作吗? 是的,您可以直接将原始事件推送到客户端。但仪表板界面通常需要查询、过滤和分析。OpenSearch可以做到这一点。

问:API Gateway可以处理多少个WebSocket客户端? 数万个。对于非常大规模的场景,对连接进行分片并分布到多个WebSocket API端点。

问:推送失败的重试怎么办? 在您的广播Lambda中有DLQ(死信队列)或重试。您应该获取任何失败的连接并迅速从DynamoDB中移除它们。

问:对于小型应用程序来说这是否过度设计? 老实说。是的。如果您正在构建一个业余项目,每五秒轮询一次是可以的。一旦您开始向用户或仪表板上的用户提供实时指标,这就值得了。

结论

借助EventBridge、OpenSearch和WebSockets,我们可以摆脱轮询,构建感觉实时活跃的仪表板。最重要的是,这个堆栈是完全无服务器的,可以随您的流量扩展,而无需您操心任何事情。

下次有人要求您将指标、日志或KPI流式传输到仪表板界面时,请考虑使用此管道而不是cron作业。

那么您怎么看,您会用这个选项替换仪表板中的轮询吗?或者您觉得可能存在我没有提到的挑战?

无论如何,我很想听听您如何将这种体验应用到您自己的项目中。

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计