基于Oracle CDC、Flink与MongoDB的AI驱动实时产品推荐系统

本文详细介绍了如何利用Oracle CDC、Kafka、Flink、OpenAI和MongoDB构建端到端的实时产品推荐引擎,实现从传统数据库到现代AI系统的实时数据流处理与语义搜索。

AI驱动的产品推荐:基于Oracle CDC、Flink与MongoDB

我将展示如何利用Oracle、Kafka、Flink、OpenAI和MongoDB从零开始构建实时个性化推荐引擎。

真实问题:过时的个性化系统

许多个性化系统仍依赖于批处理ETL管道,数据在客户行为发生数小时甚至数天后才被处理。这种静态推荐引擎无法满足用户对实时建议的期望。

系统架构:实时语义搜索产品个性化

核心目标是基于用户浏览行为与商品关联性提供真正相关的产品推荐。采用语义搜索技术,通过实时管道实现类似检索增强生成(RAG)的工作流程:

CDC → 数据增强 → 嵌入生成 → 实时交付

系统内部工作流程

  1. 新增产品数据

    • 产品数据首先被添加到Oracle数据库
  2. Oracle变更数据捕获

    • Oracle XStream CDC源连接器实时捕获变更并推送到Kafka
    • 实现从传统批处理系统到高吞吐、低延迟流处理的转换
  3. Kafka作为骨干

    • 所有变更、产品更新、用户点击都通过Kafka主题流动
    • 作为连接系统各部分的中央事件总线
  4. 产品数据增强

    • HTTP接收器连接器监听产品主题并将记录转发到增强服务
    • 服务爬取产品网站并添加额外产品详情
  5. 实时增强与嵌入生成

    • Apache Flink处理数据流,使用OpenAI模型进行增强和嵌入

Flink作业1:定义产品描述模型

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
CREATE MODEL product_description_model
INPUT(message STRING)
OUTPUT(response STRING)
WITH (
  'provider' = 'openai',
  'task' = 'text_generation',
  'openai.connection' = 'openai-connection',
  'openai.model_version' = 'gpt-4',
  'openai.system_prompt' = 'You are a helpful AI assistant that specializes in writing product descriptions...'
);

Flink作业2:为每个产品生成摘要

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
INSERT INTO product_as_documents
SELECT
  CAST(CAST(ID AS STRING) AS BYTES) AS `key`,
  ID,
  product_summary.response
FROM enriched_running_products
CROSS JOIN LATERAL TABLE (
  ml_predict(
    'product_description_model',
    CONCAT_WS(
      ' ',
      'Name: ', `NAME`,
      'Rating: ', CAST(`RATINGS` AS STRING),
      'Price: ', `ACTUAL_PRICE`,
      'About: ', REGEXP_REPLACE(CAST(about_this_item AS STRING), '\\[|\\]', ''),
      'Product Description: ', product_description,
      'Product Details: ', REGEXP_REPLACE(CAST(product_details AS STRING), '\\{|\}', '')
    )
  )
) AS product_summary;

Flink作业3:生成向量嵌入

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
CREATE MODEL vector_encoding
INPUT (input STRING)
OUTPUT (vector ARRAY<FLOAT>)
WITH (
  'TASK' = 'embedding',
  'PROVIDER' = 'openai',
  'OPENAI.CONNECTION' = 'openai-embedding-connection'
);

INSERT INTO product_embeddings
SELECT
  CAST(CAST(id AS STRING) AS BYTES) AS `key`,
  embedding,
  id,
  product_summary
FROM product_as_documents,
LATERAL TABLE (
  ml_predict(
    'vector_encoding',
    product_summary
  )
) AS T(embedding);
  1. 启用语义搜索

    • 嵌入向量存储在配置了向量索引的MongoDB中
    • 支持基于语义的快速相似项查找
  2. 跟踪点击与产品推荐

    • 用户交互事件流式传输到Kafka
    • 基于点击产品的嵌入向量触发语义搜索查询
    • 平均客户交互向量用于向量存储搜索
  3. 前端展示

    • Next.js应用作为店面展示产品目录
    • 实时跟踪交互并显示个性化推荐

系统价值与未来发展

该方案证明了可以从传统Oracle数据库出发,构建能够实时响应、提供智能建议的系统。未来可扩展功能包括:

  • 让用户通过点击"不感兴趣"或"更多类似"来塑造推荐
  • 添加简单用户画像以随时间调整结果
  • 跟踪浏览会话而不仅仅是单次点击
  • 构建代理标记低库存商品或发现被忽视的产品
  • 将相同模式重用于支持机器人、价格警报或内部工具

River Runners虽然是虚构商店,但其背后的技术理念是真实可行的,并且可以立即开始构建。

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计