模块化数据管道重大升级:自定义输出目标全面支持

本文详细介绍某数据平台最新支持的自定义输出目标功能,通过Python代码示例展示如何实现本地文件导出、增量更新和数据管道构建,包含完整的架构设计和最佳实践指南。

模块化管道迎来重大升级:自定义输出目标全面支持

某数据平台现已正式支持自定义输出目标功能。该功能允许将数据导出到任意目的地,无论是本地文件、云存储、REST API还是自定义系统。这项新能力为将平台集成到数据管道提供了前所未有的灵活性,使用户能够像搭积木一样构建数据流。

管道即积木:插拔式组合架构

该平台的设计理念是让数据管道像乐高积木一样可组合、可互换且易于构建。平台内置了常见数据源、输出目标和转换操作的本地支持。无论从本地目录、某云存储服务还是SQL表提取数据,或是导出到云存储或向量数据库,接口都保持一致的声明式风格。

在大多数情况下,只需一行代码即可切换组件:

数据源示例:

1
2
3
flow_builder.add_source(cocoindex.sources.S3(...))
# 单行切换至本地文件
flow_builder.add_source(cocoindex.sources.LocalFile(...))

输出目标示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
doc_embeddings.export(
    "doc_embeddings",
    cocoindex.targets.Postgres(),
    primary_key_fields=["id"],
)
# 单行切换至其他数据库
doc_embeddings.export(
    "doc_embeddings",
    cocoindex.targets.Qdrant(collection_name=QDRANT_COLLECTION),
    primary_key_fields=["id"],
)

这种标准化接口不仅加速了迭代过程,还促进了清晰、模块化和可重用的流定义。

🚀 什么是自定义输出目标?

自定义输出目标允许数据流将数据导出到任何位置,超越内置连接器的限制。需要定义两个组件:

  1. 目标规格 - 配置目标的方式(如设置文件路径或API密钥)
  2. 目标连接器 - 向目标写入数据的逻辑

1. 目标规格

定义使用自定义目标所需的配置:

1
2
3
class CustomTarget(cocoindex.op.TargetSpec):
    param1: str
    param2: int | None = None

2. 目标连接器

实现数据导出逻辑的连接器类:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
@cocoindex.op.target_connector(spec_cls=CustomTarget)
class CustomTargetConnector:
    # 设置方法
    @staticmethod
    def get_persistent_key(spec: CustomTarget, target_name: str) -> PersistentKey:
        """返回唯一标识此目标实例的持久键"""
        ...

    @staticmethod
    def apply_setup_change(
        key: PersistentKey, previous: CustomTarget | None, current: CustomTarget | None
    ) -> None:
        """向目标应用设置变更"""
        ...

    # 数据方法
    @staticmethod
    def mutate(
        *all_mutations: tuple[PreparedCustomTarget, dict[DataKeyType, DataValueType | None]],
    ) -> None:
        """向目标应用数据变更"""
        ...

✨ 示例:将Markdown文件导出为本地HTML

步骤1:文件摄取

1
2
3
4
5
6
7
8
@cocoindex.flow_def(name="CustomOutputFiles")
def custom_output_files(
    flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
) -> None:
    data_scope["documents"] = flow_builder.add_source(
        cocoindex.sources.LocalFile(path="data", included_patterns=["*.md"]),
        refresh_interval=timedelta(seconds=5),
    )

步骤2:文件处理转换

1
2
3
4
5
6
7
8
@cocoindex.op.function()
def markdown_to_html(text: str) -> str:
    return _markdown_it.render(text)

output_html = data_scope.add_collector()
with data_scope["documents"].row() as doc:
    doc["html"] = doc["content"].transform(markdown_to_html)
    output_html.collect(filename=doc["filename"], html=doc["html"])

步骤3:定义自定义目标

目标规格:

1
2
class LocalFileTarget(cocoindex.op.TargetSpec):
    directory: str

连接器实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@cocoindex.op.target_connector(spec_cls=LocalFileTarget)
class LocalFileTargetConnector:
    @staticmethod
    def get_persistent_key(spec: LocalFileTarget, target_name: str) -> str:
        return spec.directory

    @staticmethod
    def describe(key: str) -> str:
        return f"Local directory {key}"

    @staticmethod
    def apply_setup_change(
        key: str, previous: LocalFileTarget | None, current: LocalFileTarget | None
    ) -> None:
        if previous is None and current is not None:
            os.makedirs(current.directory, exist_ok=True)
        if previous is not None and current is None:
            if os.path.isdir(previous.directory):
                for filename in os.listdir(previous.directory):
                    if filename.endswith(".html"):
                        os.remove(os.path.join(previous.directory, filename))
                os.rmdir(previous.directory)

    @staticmethod
    def mutate(
        *all_mutations: tuple[LocalFileTarget, dict[str, LocalFileTargetValues | None]],
    ) -> None:
        for spec, mutations in all_mutations:
            for filename, mutation in mutations.items():
                full_path = os.path.join(spec.directory, filename) + ".html"
                if mutation is None:
                    try:
                        os.remove(full_path)
                    except FileNotFoundError:
                        pass
                else:
                    with open(full_path, "w") as f:
                        f.write(mutation.html)

在流中使用:

1
2
3
4
5
output_html.export(
    "OutputHtml",
    LocalFileTarget(directory="output_html"),
    primary_key_fields=["filename"],
)

运行示例

1
2
pip install -e .
cocoindex update --setup main.py

实时更新模式:

1
cocoindex update --setup -L main.py

最佳实践

  • 幂等性重要:apply_setup_change()和mutate()方法应可安全多次运行
  • 一次准备,多次变更:如需设置(如建立连接),使用prepare()避免重复工作
  • 使用结构化类型:支持简单类型、数据类和命名元组
comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计