基于Oracle CDC、Flink和MongoDB的实时AI产品推荐系统

本文详细介绍了如何利用Oracle CDC、Kafka、Flink、OpenAI和MongoDB构建实时个性化推荐引擎,包括数据流处理、语义搜索和向量嵌入技术实现。

基于Oracle CDC、Flink和MongoDB的实时AI产品推荐系统

我将展示如何使用Oracle、Kafka、Flink、OpenAI和MongoDB从零开始构建实时个性化引擎。

计划周末徒步旅行?River Runners为您提供轻便裤、越野鞋,以及现在异常准确的产品推荐。 好吧,River Runners并不是真实的。这是我创建的一个虚构户外跑步公司,用来展示实时AI如何将任何商店转变为智能和个性化的体验。这种体验让网站似乎在你需要之前就知道你需要什么。

但在幕后,这个演示解决了一个实际问题:如何从传统的Oracle数据库中获取最新数据,并将其传递给现代AI系统?并且如何足够快地推荐合适的装备,在用户点击的瞬间?

在这篇文章中,我将带您了解如何使用Oracle、Kafka、Flink、OpenAI和MongoDB从零开始构建实时个性化引擎。

注意:想直接查看GitHub?在这里找到它。

真正的问题:过时的个性化

当我构建River Runners时,我不希望它仅仅看起来像一个现代的在线跑步商店,我希望它感觉智能。就像它实际上知道你喜欢什么,并能快速帮你找到它。

这比听起来更难。

许多个性化系统停留在过去。它们依赖于批处理ETL管道,在客户执行操作数小时或数天后才移动数据。点击在夜间处理。产品更新需要很长时间才能显示。推荐通常基于某人上周喜欢的内容,而不是他们刚刚查看的内容。

而这已经不再适用。

人们期望实时建议。如果我正在浏览运动鞋,立即向我展示跑步袜,而不是明天。如果我刚刚点击了越野鞋,不要向我展示皮划艇。标准已经提高,静态推荐引擎无法跟上。

真正的问题是架构性的。大多数系统围绕从未设计用于实时处理的数据库构建。Oracle是一个完美的例子。它无处不在——在各行业运行关键任务系统——但它并非为个性化而设计。

然而,这并不意味着它无用。

通过正确的方法,您甚至可以将传统的Oracle数据库转变为快速、响应迅速的AI系统的核心。但您必须改变使用方式。这意味着跳过夜间批处理作业,并在更改发生时流式传输它们。这意味着将数据视为实时信号,而不是历史文物。

这正是我为River Runners设定的目标:从传统数据库开始,构建一些感觉像魔法的东西。

那么,如何使其工作?

我们正在构建的内容:用于产品个性化的实时语义搜索

这个项目的核心是一个简单目标:基于某人的浏览行为及其与其他产品的关系,提供实际上感觉相关的产品推荐。

为此,我使用语义搜索,由一个实时管道提供支持,该管道看起来很像检索增强生成(RAG)。

RAG通常意味着将相关上下文输入语言模型以生成响应。但在这种情况下,我不使用LLM来回答问题。我使用它来丰富产品数据并生成嵌入,然后将其存储在向量数据库中。

因此,虽然我在前端没有进行完整的RAG,但支持它的管道正在进行RAG风格的工作:CDC → 丰富 → 嵌入 → 实时交付。

没有REST调用。没有批处理作业。只是实时产品数据转换为AI(和您的应用程序)实际上可以即时使用的东西。

让我们看看系统如何工作。

系统内部

将产品数据从Oracle转换为实时推荐需要几个关键步骤。主要思想是将数据视为活的东西,流经系统,而不是我们稍后批处理和清理的东西。

以下是River Runners管道的工作原理,端到端:

添加新产品数据

当新产品数据添加到Oracle数据库时,过程开始。在data-ingestion/项目目录中提供了一个脚本,用于为演示添加虚假产品数据。

从Oracle流式传输更改

一旦添加新数据,Oracle XStream CDC源连接器实时捕获更改并将其推送到Kafka。该连接器利用Oracle的XStream Out从传统的基于批处理的系统中提供高吞吐量、低延迟的流式传输。

Kafka作为骨干

每个更改、产品更新、用户点击、丰富结果都通过Kafka主题流动。Kafka充当连接系统不同部分的中央事件总线。

产品丰富

HTTP接收器连接器监听PROD.SAMPLE.RUNNING_PRODUCTS主题,并将传入的产品记录转发到丰富服务。该服务抓取产品网站并添加其他产品详细信息。

实时丰富和嵌入

Apache Flink处理数据。它使用OpenAI模型在流中处理丰富和嵌入。

Flink作业1:定义产品描述模型

此作业设置一个使用GPT-4将结构化产品数据转换为清晰、自然描述的模型:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
CREATE MODEL product_description_model
INPUT(message STRING)
OUTPUT(response STRING)
WITH (
  'provider' = 'openai',
  'task' = 'text_generation',
  'openai.connection' = 'openai-connection',
  'openai.model_version' = 'gpt-4',
  'openai.system_prompt' = 'You are a helpful AI assistant that specializes in writing product descriptions...'
);

Flink作业2:为每个产品生成摘要

此SQL语句将模型应用于传入的产品数据,并将摘要写入product_as_documents主题:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
INSERT INTO product_as_documents
SELECT
  CAST(CAST(ID AS STRING) AS BYTES) AS `key`,
  ID,
  product_summary.response
FROM enriched_running_products
CROSS JOIN LATERAL TABLE (
  ml_predict(
    'product_description_model',
    CONCAT_WS(
      ' ',
      'Name: ', `NAME`,
      'Rating: ', CAST(`RATINGS` AS STRING),
      'Price: ', `ACTUAL_PRICE`,
      'About: ', REGEXP_REPLACE(CAST(about_this_item AS STRING), '\\[|\\]', ''),
      'Product Description: ', product_description,
      'Product Details: ', REGEXP_REPLACE(CAST(product_details AS STRING), '\\{|\}', '')
    )
  )
) AS product_summary;

Flink作业3:生成向量嵌入

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
CREATE MODEL vector_encoding
INPUT (input STRING)
OUTPUT (vector ARRAY<FLOAT>)
WITH (
  'TASK' = 'embedding',
  'PROVIDER' = 'openai',
  'OPENAI.CONNECTION' = 'openai-embedding-connection'
);

INSERT INTO product_embeddings
SELECT
  CAST(CAST(id AS STRING) AS BYTES) AS `key`,
  embedding,
  id,
  product_summary
FROM product_as_documents,
LATERAL TABLE (
  ml_predict(
    'vector_encoding',
    product_summary
  )
) AS T(embedding);

启用语义搜索

嵌入存储在MongoDB中,该数据库配置了向量索引。这使得能够快速查找语义相似的项目,因此当用户点击产品时,我们可以轻松地基于含义找到相关产品。

跟踪点击和推荐产品

当用户与网站交互时,点击事件流式传输到Kafka。这些用于基于点击产品的嵌入触发语义搜索查询。表示最后一组客户交互的向量被平均,并使用平均向量搜索向量存储,基于语义搜索和历史交互找到最相关的产品。

前端显示

Next.js应用程序充当店面。它显示产品目录,跟踪交互,并根据Oracle中的产品详细信息、MongoDB中的嵌入和搜索结果实时显示个性化推荐。

就是这样。没有批处理作业。没有每日重建。只是一个实时管道,从Oracle流式传输产品数据,丰富它,嵌入它,并将其转换为前端的实时推荐。

为什么这很重要以及下一步是什么

大多数推荐系统仍然在延迟上运行。它们依赖于夜间作业、陈旧数据和不适应的规则。如果您在图书馆推荐书籍,那没问题。但在现实世界中,人们点击、滚动并期望快速响应,这就崩溃了。

这个项目表明您可以做得更好。您可以从传统的Oracle数据库开始,仍然构建一些实时响应的东西。提出更智能的建议。感觉它实际上在关注。

而且不必止步于此。

您可以:

  • 让用户通过点击“不感兴趣”或“显示更多类似内容”来塑造推荐
  • 添加简单的用户配置文件以随时间推移引导结果
  • 跟踪浏览会话,而不仅仅是单个点击
  • 构建代理,标记低库存物品或显示您忽略的产品
  • 重用相同模式用于支持机器人、价格警报或内部工具

重点不是花哨。而是有用。一旦您掌握了基础知识,流式传输更改、实时处理它们以及按含义搜索,您就可以在上面构建任何您需要的东西。

River Runners是一个虚构的商店。但其背后的想法是真实的,您今天就可以构建它。

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计