1. Logstash 是什么?
Logstash 是 Elastic Stack (ELK/Elastic Stack) 的核心成员之一,扮演着数据处理管道的角色。你可以把它想象成一个功能强大的“数据加工厂”。
它的主要工作流程是:
这个 Input -> Filter -> Output 的流程被称为一个管道 (Pipeline)。
2.核心概念
- 事件 (Event):在 Logstash 中,每一条数据(比如一行日志)都被视为一个“事件”。事件是一个 JSON 对象,包含了原始数据以及 Logstash 处理过程中添加的元数据。
- 插件 (Plugin):Logstash 的功能都是通过插件实现的。插件分为三种类型,对应管道的三个阶段:
- 输入插件 (Input Plugins):负责数据收集。
- 过滤插件 (Filter Plugins):负责数据处理。
- 输出插件 (Output Plugins):负责数据发送。
- 配置文件 (Configuration File):定义 Logstash 管道行为的文件。通常以 .conf 结尾。
3. 安装与运行
前提条件
- 安装 Java 8 或 Java 11。你可以通过 java -version 命令检查。
安装
运行
Logstash 的运行非常简单,核心命令是:
# -f 参数指定配置文件
bin/logstash -f <your_config_file.conf>
在开始编写复杂配置前,先来一个最简单的“Hello World”示例。
创建一个名为 first-pipeline.conf 的文件:
# first-pipeline.conf
# 输入:从标准输入(你的键盘)接收数据
input {
stdin {}
}
# 输出:将数据格式化后输出到标准输出(你的屏幕)
output {
stdout {
codec => rubydebug # rubydebug 是一种格式,让输出更易读
}
}
运行它:
bin/logstash -f first-pipeline.conf
启动后,在控制台输入 “hello logstash”,然后按回车。你会看到类似下面的输出:
{
"message" => "hello logstash",
"@version" => "1",
"host" => "your-hostname",
"@timestamp" => 2023-10-27T08:30:00.123Z
}
这证明你的 Logstash 管道已经成功运行。message 是你输入的内容,@version, host, @timestamp 是 Logstash 自动为每个事件添加的元数据。
4. 配置文件详解
Logstash 配置文件由三个主要部分组成:input, filter, output。
# 输入段
input {
# … 一个或多个输入插件配置 …
}
# 过滤段(可选)
filter {
# … 一个或多个过滤插件配置 …
}
# 输出段
output {
# … 一个或多个输出插件配置 …
}
4.1 输入插件 (Input)
作用:定义数据从哪里来。
常用插件示例:
-
file: 从文件中读取数据,非常适合读取日志文件。它会记录读取位置,不会重复读取。
input {
file {
path => "/var/log/nginx/access.log" # 要读取的文件路径
start_position => "beginning" # 从文件开头开始读取
sincedb_path => "/dev/null" # (可选) 在开发测试时,禁止记录读取位置,方便重复测试
}
} -
beats: 接收来自 Filebeat、Metricbeat 等 Beats 家族成员发送的数据。这是生产环境中非常推荐的组合。
input {
beats {
port => 5044 # 监听的端口
}
} -
tcp / udp: 监听一个 TCP 或 UDP 端口来接收数据。
input {
tcp {
port => 9999
codec => "json" # 假设传入的数据是 JSON 格式
}
} -
kafka: 从 Kafka 主题中消费数据。
input {
kafka {
bootstrap_servers => "kafka-server:9092"
topics => ["log-topic"]
}
} -
jdbc: 通过 JDBC 定期查询数据库。
input {
jdbc {
jdbc_driver_library => "/path/to/mysql-connector-java.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://db-server:3306/mydatabase"
jdbc_user => "user"
jdbc_password => "password"
statement => "SELECT * from logs WHERE timestamp > :sql_last_value"
schedule => "*/5 * * * *" # 每5分钟执行一次
}
}
4.2 过滤插件 (Filter)
作用:数据处理的核心,对事件进行解析、转换、丰富。
常用插件示例:
-
grok: (最重要) 用于解析非结构化文本数据,将其转换为结构化数据。它使用预定义的正则表达式模式。
- 场景:解析 Nginx 或 Apache 的访问日志。
- 示例: # 原始日志: 127.0.0.1 – – [27/Oct/2023:12:00:00 +0800] "GET /index.html HTTP/1.1" 200 1234
filter {
grok {
# %{COMBINEDAPACHELOG} 是一个预定义的 grok 模式
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
解析后,事件会增加 clientip, verb, request, response, bytes 等字段。技巧:使用 Kibana Grok Debugger 工具可以非常方便地测试和创建你的 Grok 模式。
-
mutate: 用于对字段进行通用操作,如重命名、删除、替换和类型转换。
filter {
mutate {
# 将 "response" 字段的值转换为整数
convert => { "response" => "integer" }
# 重命名 "clientip" 字段为 "client_address"
rename => { "clientip" => "client_address" }
# 删除不需要的字段
remove_field => ["field_to_remove_1", "field_to_remove_2"]
# 添加新字段
add_field => { "processed_by" => "logstash-server-1" }
}
} -
date: 解析字段中的时间字符串,并用它来替换 Logstash 事件的默认 @timestamp 字段。这对于确保事件时间准确性至关重要。
# Nginx 日志中的时间格式: [27/Oct/2023:12:00:00 +0800]
filter {
grok {
match => { "message" => "%{HTTPDATE:log_timestamp}" }
}
date {
# 匹配 log_timestamp 字段中的时间
match => [ "log_timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
# 成功解析后,删除原始的 log_timestamp 字段
remove_field => [ "log_timestamp" ]
}
} -
json: 解析值为 JSON 字符串的字段。
# 如果原始 message 是: { "user": "john", "action": "login" }
filter {
json {
source => "message" # 指定要解析的字段
}
}
# 解析后,事件会增加 user 和 action 两个字段 -
geoip: 根据 IP 地址信息,添加地理位置数据(如国家、城市、经纬度)。
filter {
geoip {
source => "client_address" # 指定包含 IP 地址的字段
}
}
# 成功后,会添加一个 geoip 字段,里面包含 location, country_name 等信息
条件判断 (Conditionals)
filter 块支持 if, else if, else 语句,可以根据不同类型的日志执行不同的处理逻辑。
filter {
if [type] == "nginx-access" {
grok { … }
} else if [type] == "app-json" {
json { … }
} else {
# 其他处理
}
}
4.3 输出插件 (Output)
作用:定义处理好的数据发送到哪里。
常用插件示例:
-
elasticsearch: (最常用) 将数据发送到 Elasticsearch 集群。
output {
elasticsearch {
hosts => ["http://es-node1:9200", "http://es-node2:9200"] # ES 集群地址
index => "logstash-%{+YYYY.MM.dd}" # 索引名称,按天创建
user => "elastic"
password => "your_password"
}
} -
stdout: 输出到标准输出,主要用于调试。
output {
stdout {
codec => rubydebug # 格式化输出,便于查看
}
} -
file: 将数据写入文件。
output {
file {
path => "/tmp/logstash_output.log"
}
} -
kafka: 将数据发送到 Kafka。
output {
kafka {
bootstrap_servers => "kafka-server:9092"
topic_id => "processed_logs"
}
}
5. 实战案例:解析 Nginx 访问日志并存入 Elasticsearch
目标:
创建配置文件 nginx-pipeline.conf:
# nginx-pipeline.conf
input {
file {
path => "/path/to/your/nginx/access.log"
start_position => "beginning"
# 在生产环境中,应去掉下面这行或指定一个真实路径
sincedb_path => "/dev/null"
}
}
filter {
# 1. 使用 grok 解析非结构化日志
grok {
# COMBINEDAPACHELOG 是一个内置的模式,适用于常见的 Nginx/Apache 日志格式
# 示例日志: 192.168.1.1 – – [27/Oct/2023:14:10:30 +0800] "GET /api/v1/users HTTP/1.1" 200 512 "-" "Mozilla/5.0…"
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
# 2. 如果 grok 解析成功,则继续处理
if "_grokparsefailure" not in [tags] {
# 2.1 解析时间戳,并设置为事件的主时间戳
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
}
# 2.2 使用 geoip 插件,通过 clientip 字段获取地理位置信息
geoip {
source => "clientip"
}
# 2.3 转换字段类型
mutate {
convert => { "bytes" => "integer" }
convert => { "response" => "integer" }
}
# 2.4 (可选) 解析成功后,可以删除原始的 message 字段以节省空间
mutate {
remove_field => [ "message", "timestamp" ]
}
}
}
output {
# 输出到 Elasticsearch
elasticsearch {
hosts => ["http://localhost:9200"]
index => "nginx-access-%{+YYYY.MM.dd}"
}
# 同时输出到控制台,方便调试
stdout {
codec => rubydebug
}
}
运行与测试:
检查配置:在运行前,先检查配置文件语法是否正确。
bin/logstash -f nginx-pipeline.conf –config.test_and_exit
如果看到 Configuration OK,说明配置没问题。
运行管道:
bin/logstash -f nginx-pipeline.conf
生成日志 (可选):向你的 Nginx 访问日志文件中追加一行新日志,观察 Logstash 控制台的输出。 处理后的输出应该类似这样 (rubydebug 格式):
{
"agent" => "\\"Mozilla/5.0…\\"",
"clientip" => "192.168.1.1",
"verb" => "GET",
"request" => "/api/v1/users",
"httpversion" => "1.1",
"response" => 200,
"bytes" => 512,
"geoip" => {
"ip" => "192.168.1.1",
"country_name" => "China",
"city_name" => "Beijing",
"location" => { "lon" => 116.3883, "lat" => 39.9289 }
},
"@timestamp" => 2023-10-27T06:10:30.000Z, // 注意时间已经转为 UTC
…
}
同时,这些结构化数据也会被发送到 Elasticsearch 中名为 nginx-access-YYYY.MM.DD 的索引里。
6. 高级主题与最佳实践
- 多管道 (Multiple Pipelines):不要把所有逻辑都放在一个巨大的配置文件里。使用 pipelines.yml 文件来定义和管理多个独立的管道,提高模块化和可维护性。
- 持久化队列 (Persistent Queues):当输出端(如 Elasticsearch)不可用时,Logstash 默认将事件缓存在内存中。开启持久化队列 (queue.type: persisted) 可以将队列写入磁盘,防止 Logstash 重启或崩溃时数据丢失。
- 死信队列 (Dead Letter Queues – DLQ):当事件无法被处理(如 JSON 解析失败)或无法发送到输出端时,可以将其发送到死信队列,以便后续分析和处理,而不是直接丢弃。
- 性能调优:
- pipeline.workers:增加 worker 数量可以并行处理事件,提高吞吐量(受 CPU 核心数限制)。
- pipeline.batch.size:调整每个批次处理的事件数量。
- 监控:通过 Logstash 的监控 API (curl -XGET 'localhost:9600/_node/stats') 来获取其运行状态、性能指标和插件信息。
网硕互联帮助中心





评论前必须登录!
注册