使用Logstash实现PostgreSQL到Elasticsearch的数据摄取
什么是Logstash?
Logstash是Elastic提供的开源数据处理管道工具,用于摄取、转换和将数据发送到不同源,包括Elasticsearch、Kafka、平面文件等。
Logstash管道包含三个不同过程:
- 输入:数据来源,用于收集要摄取的数据
- 过滤器:使用Grok、Mutate、Date等插件转换(清理、聚合等)数据
- 输出:摄取目的地(Elasticsearch、平面文件、数据库等)
将数据使用Logstash发送到Elasticsearch的先决条件:
- 系统上安装了Logstash和Postgres的JDBC驱动程序
- 具有要同步的表或函数的Postgres数据库
- 正在运行的Elasticsearch实例
Logstash设置(Windows)
以下是本地安装和运行Logstash的简要步骤:
1. 安装Java
从官方Oracle网站下载JDK包(Java 8或更高版本)。下载完成后,将文件解压缩到首选位置。
添加环境变量:
- 新建变量
JAVA_HOME,指向Java文件所在目录 - 将
%JAVA_HOME%\bin添加到Path中
验证安装:
|
|
2. 安装Logstash
从官方Elastic网站下载包并解压缩到首选位置。
测试安装:
|
|
Logstash摄取管道
1. 安装所需的JDBC驱动程序
从官方PostgreSQL网站下载Postgres驱动程序,将jar文件放在可访问的位置。
2. 创建Logstash管道
示例管道配置:
|
|
上述管道用于增量摄取,跟踪最后一次运行并从最后一次运行开始记录,按计划摄取数据。
关键概念说明:
输入配置:
jdbc_driver_library- JDBC驱动程序文件(.jar)的存储位置jdbc_driver_class- 使用的驱动程序类jdbc_connection_string- Postgres数据库连接字符串jdbc_user- 数据库用户名jdbc_password- 用户密码paging- 数据将以多页形式发送,页面大小为1000,提高管道性能并帮助跟踪发送到Elasticsearch的记录数schedule- 管道每分钟运行一次statement- 管道将执行的SQL语句
增量摄取配置:
|
|
use_column_value设置为true,让Logstash跟踪updated_at列的实际值- 最后一次运行时间将保存在
last_run_metadata_path指定的文件中
过滤器 这是可选的部分,用于在将数据发送到目的地之前操作数据。
在上述管道中,日期字段从摄取中移除,并将数据中的first_name发送到目的地中的name字段。
输出 定义数据的目的地。在本例中,是Elasticsearch端点、授权密钥、elastic索引和document_id。
document_id是索引中elastic文档的唯一标识符。如果未定义此字段,Elasticsearch将自动为文档分配唯一标识符。
在增量摄取的情况下,建议定义此字段。在摄取期间,Elasticsearch将在索引中查找此字段;如果匹配,它将更新同一文档。如果未定义该字段,它会在索引中创建新文档,导致重复记录。
运行管道
打开命令提示符,转到Logstash文件夹,运行以下命令:
|
|
Elasticsearch索引的输出示例:
|
|
优势与劣势
优势:
- Logstash是开源工具,易于实现
- 有200多个可用于数据转换的插件
- 数据源和Elasticsearch之间的解耦架构
- 与Elasticsearch无缝集成
劣势:
- 延迟问题:不适用于需要极低延迟或实时数据的应用程序
- 错误处理:除非显式监控,否则难以跟踪错误,可能导致数据丢失
- 如果管道定义不当,可能创建重复项
- 与其他工具相比,启动时间较长
- 使用YAML样式配置文件,可能复杂且难以维护
- 资源利用率:在重负载和复杂管道下可能使用更多资源
结论
上述管道适用于需要更强大和集中式数据流管道的场景,但不适合实时数据传送。