模块化管道迎来重大升级:自定义输出目标全面支持
某数据平台现已正式支持自定义输出目标功能。该功能允许将数据导出到任意目的地,无论是本地文件、云存储、REST API还是自定义系统。这项新能力为将平台集成到数据管道提供了前所未有的灵活性,使用户能够像搭积木一样构建数据流。
管道即积木:插拔式组合架构
该平台的设计理念是让数据管道像乐高积木一样可组合、可互换且易于构建。平台内置了常见数据源、输出目标和转换操作的本地支持。无论从本地目录、某云存储服务还是SQL表提取数据,或是导出到云存储或向量数据库,接口都保持一致的声明式风格。
在大多数情况下,只需一行代码即可切换组件:
数据源示例:
1
2
3
|
flow_builder.add_source(cocoindex.sources.S3(...))
# 单行切换至本地文件
flow_builder.add_source(cocoindex.sources.LocalFile(...))
|
输出目标示例:
1
2
3
4
5
6
7
8
9
10
11
|
doc_embeddings.export(
"doc_embeddings",
cocoindex.targets.Postgres(),
primary_key_fields=["id"],
)
# 单行切换至其他数据库
doc_embeddings.export(
"doc_embeddings",
cocoindex.targets.Qdrant(collection_name=QDRANT_COLLECTION),
primary_key_fields=["id"],
)
|
这种标准化接口不仅加速了迭代过程,还促进了清晰、模块化和可重用的流定义。
🚀 什么是自定义输出目标?
自定义输出目标允许数据流将数据导出到任何位置,超越内置连接器的限制。需要定义两个组件:
- 目标规格 - 配置目标的方式(如设置文件路径或API密钥)
- 目标连接器 - 向目标写入数据的逻辑
1. 目标规格
定义使用自定义目标所需的配置:
1
2
3
|
class CustomTarget(cocoindex.op.TargetSpec):
param1: str
param2: int | None = None
|
2. 目标连接器
实现数据导出逻辑的连接器类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@cocoindex.op.target_connector(spec_cls=CustomTarget)
class CustomTargetConnector:
# 设置方法
@staticmethod
def get_persistent_key(spec: CustomTarget, target_name: str) -> PersistentKey:
"""返回唯一标识此目标实例的持久键"""
...
@staticmethod
def apply_setup_change(
key: PersistentKey, previous: CustomTarget | None, current: CustomTarget | None
) -> None:
"""向目标应用设置变更"""
...
# 数据方法
@staticmethod
def mutate(
*all_mutations: tuple[PreparedCustomTarget, dict[DataKeyType, DataValueType | None]],
) -> None:
"""向目标应用数据变更"""
...
|
✨ 示例:将Markdown文件导出为本地HTML
步骤1:文件摄取
1
2
3
4
5
6
7
8
|
@cocoindex.flow_def(name="CustomOutputFiles")
def custom_output_files(
flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
) -> None:
data_scope["documents"] = flow_builder.add_source(
cocoindex.sources.LocalFile(path="data", included_patterns=["*.md"]),
refresh_interval=timedelta(seconds=5),
)
|
步骤2:文件处理转换
1
2
3
4
5
6
7
8
|
@cocoindex.op.function()
def markdown_to_html(text: str) -> str:
return _markdown_it.render(text)
output_html = data_scope.add_collector()
with data_scope["documents"].row() as doc:
doc["html"] = doc["content"].transform(markdown_to_html)
output_html.collect(filename=doc["filename"], html=doc["html"])
|
步骤3:定义自定义目标
目标规格:
1
2
|
class LocalFileTarget(cocoindex.op.TargetSpec):
directory: str
|
连接器实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
@cocoindex.op.target_connector(spec_cls=LocalFileTarget)
class LocalFileTargetConnector:
@staticmethod
def get_persistent_key(spec: LocalFileTarget, target_name: str) -> str:
return spec.directory
@staticmethod
def describe(key: str) -> str:
return f"Local directory {key}"
@staticmethod
def apply_setup_change(
key: str, previous: LocalFileTarget | None, current: LocalFileTarget | None
) -> None:
if previous is None and current is not None:
os.makedirs(current.directory, exist_ok=True)
if previous is not None and current is None:
if os.path.isdir(previous.directory):
for filename in os.listdir(previous.directory):
if filename.endswith(".html"):
os.remove(os.path.join(previous.directory, filename))
os.rmdir(previous.directory)
@staticmethod
def mutate(
*all_mutations: tuple[LocalFileTarget, dict[str, LocalFileTargetValues | None]],
) -> None:
for spec, mutations in all_mutations:
for filename, mutation in mutations.items():
full_path = os.path.join(spec.directory, filename) + ".html"
if mutation is None:
try:
os.remove(full_path)
except FileNotFoundError:
pass
else:
with open(full_path, "w") as f:
f.write(mutation.html)
|
在流中使用:
1
2
3
4
5
|
output_html.export(
"OutputHtml",
LocalFileTarget(directory="output_html"),
primary_key_fields=["filename"],
)
|
运行示例
1
2
|
pip install -e .
cocoindex update --setup main.py
|
实时更新模式:
1
|
cocoindex update --setup -L main.py
|
最佳实践
- 幂等性重要:apply_setup_change()和mutate()方法应可安全多次运行
- 一次准备,多次变更:如需设置(如建立连接),使用prepare()避免重复工作
- 使用结构化类型:支持简单类型、数据类和命名元组