云计算百科
云计算领域专业知识百科平台

Logstash详解配置使用

1. Logstash 是什么?

Logstash 是 Elastic Stack (ELK/Elastic Stack) 的核心成员之一,扮演着数据处理管道的角色。你可以把它想象成一个功能强大的“数据加工厂”。

它的主要工作流程是:

  • 收集 (Input):从各种来源(如日志文件、数据库、消息队列、网络端口等)收集数据。
  • 处理 (Filter):对收集到的数据进行解析、转换、丰富和结构化。这是 Logstash 最强大的部分。
  • 输出 (Output):将处理好的数据发送到一个或多个目的地(如 Elasticsearch、数据库、文件、消息队列等)。
  • 这个 Input -> Filter -> Output 的流程被称为一个管道 (Pipeline)。

    2.核心概念

    • 事件 (Event):在 Logstash 中,每一条数据(比如一行日志)都被视为一个“事件”。事件是一个 JSON 对象,包含了原始数据以及 Logstash 处理过程中添加的元数据。
    • 插件 (Plugin):Logstash 的功能都是通过插件实现的。插件分为三种类型,对应管道的三个阶段:
      • 输入插件 (Input Plugins):负责数据收集。
      • 过滤插件 (Filter Plugins):负责数据处理。
      • 输出插件 (Output Plugins):负责数据发送。
    • 配置文件 (Configuration File):定义 Logstash 管道行为的文件。通常以 .conf 结尾。

    3. 安装与运行

    前提条件
    • 安装 Java 8 或 Java 11。你可以通过 java -version 命令检查。
    安装
  • 从 Elastic 官网下载 Logstash。
  • 解压下载的压缩包。
  • 运行

    Logstash 的运行非常简单,核心命令是:

    # -f 参数指定配置文件
    bin/logstash -f <your_config_file.conf>

    在开始编写复杂配置前,先来一个最简单的“Hello World”示例。

    创建一个名为 first-pipeline.conf 的文件:

    # first-pipeline.conf

    # 输入:从标准输入(你的键盘)接收数据
    input {
    stdin {}
    }

    # 输出:将数据格式化后输出到标准输出(你的屏幕)
    output {
    stdout {
    codec => rubydebug # rubydebug 是一种格式,让输出更易读
    }
    }

    运行它:

    bin/logstash -f first-pipeline.conf

    启动后,在控制台输入 “hello logstash”,然后按回车。你会看到类似下面的输出:

    {
    "message" => "hello logstash",
    "@version" => "1",
    "host" => "your-hostname",
    "@timestamp" => 2023-10-27T08:30:00.123Z
    }

    这证明你的 Logstash 管道已经成功运行。message 是你输入的内容,@version, host, @timestamp 是 Logstash 自动为每个事件添加的元数据。


    4. 配置文件详解

    Logstash 配置文件由三个主要部分组成:input, filter, output。

    # 输入段
    input {
    # … 一个或多个输入插件配置 …
    }

    # 过滤段(可选)
    filter {
    # … 一个或多个过滤插件配置 …
    }

    # 输出段
    output {
    # … 一个或多个输出插件配置 …
    }

    4.1 输入插件 (Input)

    作用:定义数据从哪里来。

    常用插件示例:

    • file: 从文件中读取数据,非常适合读取日志文件。它会记录读取位置,不会重复读取。

      input {
      file {
      path => "/var/log/nginx/access.log" # 要读取的文件路径
      start_position => "beginning" # 从文件开头开始读取
      sincedb_path => "/dev/null" # (可选) 在开发测试时,禁止记录读取位置,方便重复测试
      }
      }

    • beats: 接收来自 Filebeat、Metricbeat 等 Beats 家族成员发送的数据。这是生产环境中非常推荐的组合。

      input {
      beats {
      port => 5044 # 监听的端口
      }
      }

    • tcp / udp: 监听一个 TCP 或 UDP 端口来接收数据。

      input {
      tcp {
      port => 9999
      codec => "json" # 假设传入的数据是 JSON 格式
      }
      }

    • kafka: 从 Kafka 主题中消费数据。

      input {
      kafka {
      bootstrap_servers => "kafka-server:9092"
      topics => ["log-topic"]
      }
      }

    • jdbc: 通过 JDBC 定期查询数据库。

      input {
      jdbc {
      jdbc_driver_library => "/path/to/mysql-connector-java.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_connection_string => "jdbc:mysql://db-server:3306/mydatabase"
      jdbc_user => "user"
      jdbc_password => "password"
      statement => "SELECT * from logs WHERE timestamp > :sql_last_value"
      schedule => "*/5 * * * *" # 每5分钟执行一次
      }
      }

    4.2 过滤插件 (Filter)

    作用:数据处理的核心,对事件进行解析、转换、丰富。

    常用插件示例:

    • grok: (最重要) 用于解析非结构化文本数据,将其转换为结构化数据。它使用预定义的正则表达式模式。

      • 场景:解析 Nginx 或 Apache 的访问日志。
      • 示例: # 原始日志: 127.0.0.1 – – [27/Oct/2023:12:00:00 +0800] "GET /index.html HTTP/1.1" 200 1234
        filter {
        grok {
        # %{COMBINEDAPACHELOG} 是一个预定义的 grok 模式
        match => { "message" => "%{COMBINEDAPACHELOG}" }
        }
        }
        解析后,事件会增加 clientip, verb, request, response, bytes 等字段。

        技巧:使用 Kibana Grok Debugger 工具可以非常方便地测试和创建你的 Grok 模式。

    • mutate: 用于对字段进行通用操作,如重命名、删除、替换和类型转换。

      filter {
      mutate {
      # 将 "response" 字段的值转换为整数
      convert => { "response" => "integer" }
      # 重命名 "clientip" 字段为 "client_address"
      rename => { "clientip" => "client_address" }
      # 删除不需要的字段
      remove_field => ["field_to_remove_1", "field_to_remove_2"]
      # 添加新字段
      add_field => { "processed_by" => "logstash-server-1" }
      }
      }

    • date: 解析字段中的时间字符串,并用它来替换 Logstash 事件的默认 @timestamp 字段。这对于确保事件时间准确性至关重要。

      # Nginx 日志中的时间格式: [27/Oct/2023:12:00:00 +0800]
      filter {
      grok {
      match => { "message" => "%{HTTPDATE:log_timestamp}" }
      }
      date {
      # 匹配 log_timestamp 字段中的时间
      match => [ "log_timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      # 成功解析后,删除原始的 log_timestamp 字段
      remove_field => [ "log_timestamp" ]
      }
      }

    • json: 解析值为 JSON 字符串的字段。

      # 如果原始 message 是: { "user": "john", "action": "login" }
      filter {
      json {
      source => "message" # 指定要解析的字段
      }
      }
      # 解析后,事件会增加 user 和 action 两个字段

    • geoip: 根据 IP 地址信息,添加地理位置数据(如国家、城市、经纬度)。

      filter {
      geoip {
      source => "client_address" # 指定包含 IP 地址的字段
      }
      }
      # 成功后,会添加一个 geoip 字段,里面包含 location, country_name 等信息

    条件判断 (Conditionals)

    filter 块支持 if, else if, else 语句,可以根据不同类型的日志执行不同的处理逻辑。

    filter {
    if [type] == "nginx-access" {
    grok { … }
    } else if [type] == "app-json" {
    json { … }
    } else {
    # 其他处理
    }
    }

    4.3 输出插件 (Output)

    作用:定义处理好的数据发送到哪里。

    常用插件示例:

    • elasticsearch: (最常用) 将数据发送到 Elasticsearch 集群。

      output {
      elasticsearch {
      hosts => ["http://es-node1:9200", "http://es-node2:9200"] # ES 集群地址
      index => "logstash-%{+YYYY.MM.dd}" # 索引名称,按天创建
      user => "elastic"
      password => "your_password"
      }
      }

    • stdout: 输出到标准输出,主要用于调试。

      output {
      stdout {
      codec => rubydebug # 格式化输出,便于查看
      }
      }

    • file: 将数据写入文件。

      output {
      file {
      path => "/tmp/logstash_output.log"
      }
      }

    • kafka: 将数据发送到 Kafka。

      output {
      kafka {
      bootstrap_servers => "kafka-server:9092"
      topic_id => "processed_logs"
      }
      }


    5. 实战案例:解析 Nginx 访问日志并存入 Elasticsearch

    目标:

  • 读取 Nginx 访问日志文件 /var/log/nginx/access.log。
  • 使用 Grok 解析日志内容。
  • 根据客户端 IP 添加地理位置信息。
  • 将处理后的数据存入 Elasticsearch。
  • 在开发调试时,同时在控制台打印输出。
  • 创建配置文件 nginx-pipeline.conf:

    # nginx-pipeline.conf

    input {
    file {
    path => "/path/to/your/nginx/access.log"
    start_position => "beginning"
    # 在生产环境中,应去掉下面这行或指定一个真实路径
    sincedb_path => "/dev/null"
    }
    }

    filter {
    # 1. 使用 grok 解析非结构化日志
    grok {
    # COMBINEDAPACHELOG 是一个内置的模式,适用于常见的 Nginx/Apache 日志格式
    # 示例日志: 192.168.1.1 – – [27/Oct/2023:14:10:30 +0800] "GET /api/v1/users HTTP/1.1" 200 512 "-" "Mozilla/5.0…"
    match => { "message" => "%{COMBINEDAPACHELOG}" }
    }

    # 2. 如果 grok 解析成功,则继续处理
    if "_grokparsefailure" not in [tags] {
    # 2.1 解析时间戳,并设置为事件的主时间戳
    date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }

    # 2.2 使用 geoip 插件,通过 clientip 字段获取地理位置信息
    geoip {
    source => "clientip"
    }

    # 2.3 转换字段类型
    mutate {
    convert => { "bytes" => "integer" }
    convert => { "response" => "integer" }
    }

    # 2.4 (可选) 解析成功后,可以删除原始的 message 字段以节省空间
    mutate {
    remove_field => [ "message", "timestamp" ]
    }
    }
    }

    output {
    # 输出到 Elasticsearch
    elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "nginx-access-%{+YYYY.MM.dd}"
    }

    # 同时输出到控制台,方便调试
    stdout {
    codec => rubydebug
    }
    }

    运行与测试:

  • 检查配置:在运行前,先检查配置文件语法是否正确。

    bin/logstash -f nginx-pipeline.conf –config.test_and_exit

    如果看到 Configuration OK,说明配置没问题。

  • 运行管道:

    bin/logstash -f nginx-pipeline.conf

  • 生成日志 (可选):向你的 Nginx 访问日志文件中追加一行新日志,观察 Logstash 控制台的输出。 处理后的输出应该类似这样 (rubydebug 格式):

    {
    "agent" => "\\"Mozilla/5.0…\\"",
    "clientip" => "192.168.1.1",
    "verb" => "GET",
    "request" => "/api/v1/users",
    "httpversion" => "1.1",
    "response" => 200,
    "bytes" => 512,
    "geoip" => {
    "ip" => "192.168.1.1",
    "country_name" => "China",
    "city_name" => "Beijing",
    "location" => { "lon" => 116.3883, "lat" => 39.9289 }
    },
    "@timestamp" => 2023-10-27T06:10:30.000Z, // 注意时间已经转为 UTC

    }

    同时,这些结构化数据也会被发送到 Elasticsearch 中名为 nginx-access-YYYY.MM.DD 的索引里。


  • 6. 高级主题与最佳实践

    • 多管道 (Multiple Pipelines):不要把所有逻辑都放在一个巨大的配置文件里。使用 pipelines.yml 文件来定义和管理多个独立的管道,提高模块化和可维护性。
    • 持久化队列 (Persistent Queues):当输出端(如 Elasticsearch)不可用时,Logstash 默认将事件缓存在内存中。开启持久化队列 (queue.type: persisted) 可以将队列写入磁盘,防止 Logstash 重启或崩溃时数据丢失。
    • 死信队列 (Dead Letter Queues – DLQ):当事件无法被处理(如 JSON 解析失败)或无法发送到输出端时,可以将其发送到死信队列,以便后续分析和处理,而不是直接丢弃。
    • 性能调优:
      • pipeline.workers:增加 worker 数量可以并行处理事件,提高吞吐量(受 CPU 核心数限制)。
      • pipeline.batch.size:调整每个批次处理的事件数量。
    • 监控:通过 Logstash 的监控 API (curl -XGET 'localhost:9600/_node/stats') 来获取其运行状态、性能指标和插件信息。
    赞(0)
    未经允许不得转载:网硕互联帮助中心 » Logstash详解配置使用
    分享到: 更多 (0)

    评论 抢沙发

    评论前必须登录!