使用Logstash实现PostgreSQL到Elasticsearch的数据摄取指南

本文详细介绍了如何使用Logstash构建数据摄取管道,将PostgreSQL数据同步到Elasticsearch。包含增量数据同步配置、过滤器使用、性能优化技巧及实际代码示例,帮助开发者掌握企业级数据集成方案。

使用Logstash实现PostgreSQL到Elasticsearch的数据摄取

什么是Logstash?

Logstash是Elastic提供的开源数据处理管道工具,用于摄取、转换和将数据发送到不同源,包括Elasticsearch、Kafka、平面文件等。

Logstash管道包含三个不同过程:

  • 输入:数据来源,用于收集要摄取的数据
  • 过滤器:使用Grok、Mutate、Date等插件转换(清理、聚合等)数据
  • 输出:摄取目的地(Elasticsearch、平面文件、数据库等)

将数据使用Logstash发送到Elasticsearch的先决条件:

  • 系统上安装了Logstash和Postgres的JDBC驱动程序
  • 具有要同步的表或函数的Postgres数据库
  • 正在运行的Elasticsearch实例

Logstash设置(Windows)

以下是本地安装和运行Logstash的简要步骤:

1. 安装Java

从官方Oracle网站下载JDK包(Java 8或更高版本)。下载完成后,将文件解压缩到首选位置。

添加环境变量:

  • 新建变量JAVA_HOME,指向Java文件所在目录
  • %JAVA_HOME%\bin添加到Path中

验证安装:

1
java -version

2. 安装Logstash

从官方Elastic网站下载包并解压缩到首选位置。

测试安装:

1
logstash -e "input { stdin {} } output { stdout {} }"

Logstash摄取管道

1. 安装所需的JDBC驱动程序

从官方PostgreSQL网站下载Postgres驱动程序,将jar文件放在可访问的位置。

2. 创建Logstash管道

示例管道配置:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
input {
    jdbc {
        jdbc_driver_library => "c:/logstash/jdbc/postgresql.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_connection_string => "${JDBC_HOST}"
        jdbc_user => "${DB_USER}"
        jdbc_password => "${DB_PWD}"
        jdbc_paging_enabled => true
        jdbc_page_size => 1000
        schedule => "* * * * *"  # 每分钟运行一次
        statement => "SELECT * FROM employee WHERE updated_at > :sql_last_value"
        use_column_value => true
        tracking_column => "updated_at"
        tracking_column_type => "timestamp"
        last_run_metadata_path => "c:/logstash/employee.tracker"
    }
}

filter {
    mutate {
        remove_field => ["date", "@timestamp", "host"]
    }

    # 如果需要解析JSON字段的示例
    json {
         source => "first_name"
         target => "name"
    }
}

output {
    stdout { codec => json_lines }
    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "my_table_index"
        custom_headers => {
                "Authorization" => "${AUTH_KEY}"
            }
        document_id => "%{table_id}" # 表中的唯一标识符
        timeout => 120
    }
}

上述管道用于增量摄取,跟踪最后一次运行并从最后一次运行开始记录,按计划摄取数据。

关键概念说明:

输入配置

  • jdbc_driver_library - JDBC驱动程序文件(.jar)的存储位置
  • jdbc_driver_class - 使用的驱动程序类
  • jdbc_connection_string - Postgres数据库连接字符串
  • jdbc_user - 数据库用户名
  • jdbc_password - 用户密码
  • paging - 数据将以多页形式发送,页面大小为1000,提高管道性能并帮助跟踪发送到Elasticsearch的记录数
  • schedule - 管道每分钟运行一次
  • statement - 管道将执行的SQL语句

增量摄取配置:

1
2
3
4
use_column_value => true
tracking_column => "updated_dt"
tracking_column_type => "timestamp"
last_run_metadata_path => "c:/project/logstash/date.tracker"
  • use_column_value设置为true,让Logstash跟踪updated_at列的实际值
  • 最后一次运行时间将保存在last_run_metadata_path指定的文件中

过滤器 这是可选的部分,用于在将数据发送到目的地之前操作数据。

在上述管道中,日期字段从摄取中移除,并将数据中的first_name发送到目的地中的name字段。

输出 定义数据的目的地。在本例中,是Elasticsearch端点、授权密钥、elastic索引和document_id。

document_id是索引中elastic文档的唯一标识符。如果未定义此字段,Elasticsearch将自动为文档分配唯一标识符。

在增量摄取的情况下,建议定义此字段。在摄取期间,Elasticsearch将在索引中查找此字段;如果匹配,它将更新同一文档。如果未定义该字段,它会在索引中创建新文档,导致重复记录。

运行管道

打开命令提示符,转到Logstash文件夹,运行以下命令:

1
bin/logstash -f c:/logstash/sample_pipeline.conf

Elasticsearch索引的输出示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
{
    "took": 1,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 3,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "testing",
                "_id": "1",
                "_score": 1.0,
                "_source": {
                    "name": "James",
                    "id": 1,
                    "last_name": "Smith",
                    "updated_dt": "2024-12-12T16:10:57.349Z",
                    "@version": "1",
                    "@timestamp": "2025-06-25T20:41:02.167442600Z"
                }
            },
            {
                "_index": "testing",
                "_id": "2",
                "_score": 1.0,
                "_source": {
                    "name": "John",
                    "id": 2,
                    "last_name": "Doe",
                    "updated_dt": "2024-12-12T16:10:57.349Z",
                    "@version": "1",
                    "@timestamp": "2025-06-25T20:41:02.169021400Z"
                }
            },
            {
                "_index": "testing",
                "_id": "3",
                "_score": 1.0,
                "_source": {
                    "name": "Kate",
                    "id": 3,
                    "last_name": "Williams",
                    "updated_dt": "2024-12-12T16:10:57.349Z",                    
                    "@version": "1",
                    "@timestamp": "2025-06-25T20:41:02.170098800Z"
                }
            }
        ]
    }
}

优势与劣势

优势

  • Logstash是开源工具,易于实现
  • 有200多个可用于数据转换的插件
  • 数据源和Elasticsearch之间的解耦架构
  • 与Elasticsearch无缝集成

劣势

  • 延迟问题:不适用于需要极低延迟或实时数据的应用程序
  • 错误处理:除非显式监控,否则难以跟踪错误,可能导致数据丢失
  • 如果管道定义不当,可能创建重复项
  • 与其他工具相比,启动时间较长
  • 使用YAML样式配置文件,可能复杂且难以维护
  • 资源利用率:在重负载和复杂管道下可能使用更多资源

结论

上述管道适用于需要更强大和集中式数据流管道的场景,但不适合实时数据传送。

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计