构建可扩展媒体智能平台的实时数据摄取架构

本文介绍了一个基于Spring Boot、Kafka和Elasticsearch的AI增强微服务流式架构,能够每天处理864万篇实时媒体文章,实现了从传统ETL模型到智能流式架构的转变。

可扩展媒体智能平台的实时数据摄取架构

在24/7全天候媒体和持续数字噪音的时代,实时处理信息并采取行动的能力至关重要。对于任何旨在监控、分类和增强媒体内容的系统来说,可扩展的摄取管道是骨干。本文概述了一个重新设计的实时摄取管道,成功扩展到每天处理超过800万篇文章,展示了从传统ETL模型到AI增强流式架构的转变。

问题空间:高速媒体流

媒体监控平台必须从无数提供商处吸收多样化的内容格式,并近乎实时地对它们进行分类。传统的单体系统或批处理ETL作业无法满足此类延迟和可靠性需求。

挑战在于构建一个容错、高可用且智能的摄取架构,能够:

  • 每天摄取数百万篇文章
  • 使用强大的匹配规则对它们进行分类
  • 使用语义AI增强可发现性

解决方案:微服务架构

我们采用微服务架构来确保可扩展性、容错性和模块化。通过将管道分解为独立服务,我们提高了可维护性,并能够根据工作负载实现无缝扩展。

为应对这些挑战,我们采用了微服务架构,将数据摄取管道分解为三个关键服务:

  • 调度器服务:从内容提供商处检索文章并将其推送到Kafka主题
  • 渗透器服务:从Kafka消费文章,并使用Elasticsearch查询将它们映射到相关类别
  • 监听器服务(AI赋能器):使用OpenAI嵌入丰富文章,以改进语义搜索能力

系统架构

以下图示说明了从第三方提供商通过Kafka到处理和丰富阶段的消息流。

1. 调度器服务

调度器服务充当数据摄取管道的入口点,负责按计划间隔从内容提供商的API获取文章。该服务的一个关键方面是能够处理批处理并确保失败记录得到重试。

关键功能

  • 按计划间隔(例如每5秒)调用内容提供商API
  • 以批次接收文章(例如每批500篇文章)
  • 将原始数据持久化到MongoDB进行临时存储和可审计性
  • 将整个批次作为列表发布到Kafka主题(articles-topic)
  • 为未能完成完整管道处理的文章实现重试机制

实现代码片段

1
2
3
4
5
6
@Scheduled(fixedRate = 5000)
public void fetchArticles() {
    List<Article> articles = contentProviderClient.fetchArticles();
    articleRepository.saveAll(articles);
    kafkaTemplate.send("articles-topic", articles);
}

此代码片段演示了Spring Boot中的@Scheduled注解如何用于以固定速率触发fetchArticles方法。然后该方法获取文章,将它们保存到MongoDB,并将它们发布到Kafka。

2. 渗透器服务:实时分类

渗透器服务是分类过程的核心。它利用Elasticsearch的渗透器功能,基于Lucene布尔查询将传入文章映射到预定义类别。

关键功能

  • 从Kafka的articles主题消费文章
  • 将转换后的数据保存到临时Elasticsearch索引
  • 获取类别映射规则,这些规则作为Lucene布尔字符串存储在Elasticsearch中
  • 在Elasticsearch中执行UpdateByQuery API,将文章映射到适当的类别
  • 从临时索引检索更新的文章并将它们移动到主文章索引
  • 处理后清除临时索引以保持效率
  • 将处理后的文章发布到另一个Kafka主题(processed-articles-topic)以进行进一步的AI增强

实现代码片段

1
2
3
4
5
6
7
public void processArticles(List<Article> articles) {
    saveToElasticsearchTempIndex(articles);
    List<String> categories = fetchCategoryMappings(articles);
    updateArticlesWithCategories(articles, categories);
    moveArticlesToMainIndex(articles);
    kafkaTemplate.send("processed-articles-topic", articles);
}

此代码片段概述了渗透器服务中处理文章的核心步骤。它将文章保存到临时索引,获取类别映射,用类别更新文章,将它们移动到主索引,并将它们发布到Kafka。

3. 监听器(AI赋能器)服务:语义丰富

监听器服务使用OpenAI增强文章的语义嵌入,实现更复杂的搜索能力。

关键功能

  • 从Kafka的processed-articles-topic消费文章
  • 使用OpenAI的API为每篇文章生成语义嵌入,但仅当文章满足特定条件时(例如最小长度、特定关键词)
  • 使用生成的嵌入更新Elasticsearch索引
  • 通过在MongoDB中更新处理状态来跟踪AI丰富情况

实现代码片段

1
2
3
4
5
6
7
public void processAIEnhancements(Article article) {
    if (shouldGenerateEmbedding(article)) {
        String embedding = openAiClient.generateEmbedding(article.getContent());
        elasticsearchClient.updateEmbedding(article.getId(), embedding);
        articleRepository.updateStatus(article.getId(), "AI Enhanced");
    }
}

此片段显示了监听器服务如何有条件地使用OpenAI为文章生成嵌入,使用这些嵌入更新Elasticsearch,并更新文章在MongoDB中的状态。

技术栈

  • Spring Boot:轻松高效地构建微服务
  • Kafka:实时数据流式传输和解耦服务
  • MongoDB:临时存储原始文章并跟踪处理状态
  • Elasticsearch 8.x:索引文章、存储类别映射和执行渗透器查询
  • OpenAI嵌入:生成语义嵌入以增强搜索能力

成果和影响

新的实时数据摄取管道带来了显著改进:

  • 可扩展性:系统现在每天摄取864万篇文章,与之前的系统相比大幅增加
  • 可靠性:重试机制确保即使在面临瞬时错误时,文章也能完全处理
  • 可搜索性:文章准确映射到多个类别,提高了内容可发现性
  • AI驱动发现:OpenAI嵌入显著增强了语义搜索,允许用户基于含义而不仅仅是关键词找到相关文章

最终思考:用于实时洞察的智能管道

本案例研究展示了精心设计的微服务架构,结合Spring Boot、Kafka和Elasticsearch等正确技术,如何能够实现大规模实时内容摄取和分类。AI驱动的语义嵌入进一步增强了摄取数据的价值,使其成为媒体监控和公关分析的强大资产。这种方法提供了一个强大、可扩展且智能的内容处理管道,使现代媒体组织在持续信息过载的时代保持竞争优势。

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计