Filebeat 轻量级日志采集:Prospector、Harvester、多行日志处理
如果说 Logstash 是一个强大的数据处理工厂,那 Filebeat 就是一位不知疲倦的「搬运工」——它轻量、快速、可靠,专门负责把日志从服务器搬到 Logstash 或 ES。
1. Filebeat 简介
1.1 什么是 Filebeat?
Filebeat 是 Elastic 官方提供的轻量级日志采集器,它的特点是:
- 轻量级:资源消耗极低
- 可靠:内置重试和缓冲机制
- 简单:配置简单,部署容易
- 可观察:内置监控和调试工具
1.2 Filebeat vs Logstash
┌─────────────────────────────────────────────────────────────┐
│ Filebeat vs Logstash │
│ │
│ Filebeat Logstash │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ 轻量级采集 │ │ 重量级处理 │ │
│ │ 资源占用低 │ │ 资源占用高 │ │
│ │ 采集+发送 │ →→→ │ 采集+处理+输出│ │
│ └──────────────┘ └──────────────┘ │
│ │
│ Filebeat 适合: Logstash 适合: │
│ ✓ 日志采集 ✓ 复杂数据转换 │
│ ✓ 多行日志处理 ✓ 多数据源聚合 │
│ ✓ 轻量级部署 ✓ 高级分析 │
│ │
└─────────────────────────────────────────────────────────────┘2. 核心概念
2.1 Prospector(勘探者)
Prospector 负责「找到」要采集的文件。
java
# filebeat.yml
filebeat.inputs:
# 方式一:直接配置
- type: log
enabled: true
paths:
- /var/log/nginx/*.log
# 方式二:使用 glob 模式
- type: log
enabled: true
paths:
- /var/log/**/*.log # 递归匹配
- /var/log/*.log # 单层匹配2.2 Harvester(收割者)
Harvester 负责「读取」文件内容。每个文件对应一个 Harvester。
文件 → Harvester(读取) → 缓存 → 发送Harvester 的工作流程:
1. 打开文件
2. 按行读取
3. 发送到 output
4. 记录读取位置
5. 等待新内容
6. 重复 2-52.3 内部队列
Filebeat 内置了一个小队列,用于缓冲和重试:
Harvester → Queue → Output (Logstash/ES)
↑ ↓
└── 重试 ◀─┘3. 配置详解
3.1 基础配置
java
# filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/access.log
# 添加额外字段
fields:
environment: production
service: nginx
fields_under_root: true # 字段放在根级别
# 输出配置
output.logstash:
hosts: ["logstash:5044"]
# 进程配置
filebeat.config.inputs:
enabled: true
path: /etc/filebeat/conf.d/*.yml3.2 多数据源配置
java
# 采集多种日志
filebeat.inputs:
# Nginx 访问日志
- type: log
enabled: true
paths:
- /var/log/nginx/access.log
fields:
log_type: nginx_access
multiline:
pattern: '^\d{4}-\d{2}-\d{2}'
negate: true
match: after
# 应用日志
- type: log
enabled: true
paths:
- /var/log/app/*.log
fields:
log_type: application
multiline:
pattern: '^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}'
negate: true
match: after
# 系统日志
- type: log
enabled: true
paths:
- /var/log/syslog
fields:
log_type: system3.3 Registry(注册表)
Filebeat 使用 Registry 记录文件读取位置:
java
# Registry 配置
filebeat.inputs:
- type: log
paths:
- /var/log/app/*.log
# Registry 文件位置
registry_file: /var/lib/filebeat/registry
# 清理已删除文件记录
clean_removed: true
# 忽略旧文件的时间
ignore_older: 24h4. 多行日志处理
多行日志是日志采集的难点,Filebeat 内置了 multiline 配置。
4.1 基本配置
java
# 采集 Java 堆栈日志
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/application.log
multiline:
# 匹配新日志的开始(时间戳开头)
pattern: '^\d{4}-\d{2}-\d{2}'
negate: true
match: after # 与上一行合并4.2 模式解释
pattern: '^%{TIMESTAMP_ISO8601}'
negate: true
match: after
含义:
- pattern: 匹配时间戳开头的行
- negate: true 表示取反,即「不以时间戳开头的行」
- match: after 表示合并到上一行
执行效果:
┌────────────────────────────────────────────┐
│ 2024-01-15 10:00:00 INFO 开始了 ← 开始 │
│ 这是日志内容 │
│ 这是更多内容 │
│ at com.example.Service.run(Service.java:10)│
│ ← 以上所有行都与第一行合并 │
│ │
│ 2024-01-15 10:00:01 INFO 完成了 ← 新的开始│
└────────────────────────────────────────────┘4.3 常见模式
Java 堆栈异常:
java
multiline:
pattern: '^[[:space:]]+(at|Caused by:|...)|^Exception'
negate: false
match: afterPython 错误:
java
multiline:
pattern: '^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}'
negate: true
match: afterXML 格式:
java
multiline:
pattern: '^<\\?xml'
negate: true
match: after5. 数据处理
5.1 Processors
Filebeat 支持在发送前处理数据:
java
# 添加元信息
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
# 字段操作
processors:
- rename:
fields:
- from: "host"
to: "server.name"
ignore_missing: false
fail_on_error: true
# 删除字段
- drop_fields:
fields: ["log.offset", "agent.version"]
ignore_missing: true
# 条件过滤
- drop_event:
when:
regexp:
message: "^DBG:"
# 添加字段
- add_fields:
target: ''
fields:
environment: production
version: 1.0.05.2 常用 Processors
java
# 解析 JSON
processors:
- decode_json_fields:
fields: ["message"]
target: ""
overwrite_keys: true
add_error_key: true
# 添加时间戳
processors:
- timestamp:
field: message
layouts:
- '2006-01-02T15:04:05Z07:00'
- '2006-01-02 15:04:05'
test:
- '2024-01-15T10:00:00Z'
# DNS 反向解析
processors:
- dns:
fields:
source.ip: source.hostname
type: reverse6. 输出配置
6.1 输出到 Logstash
java
# filebeat.yml
output.logstash:
hosts: ["logstash1:5044", "logstash2:5044", "logstash3:5044"]
# 负载均衡
loadbalance: true
# 重试配置
backoff.init: 1s
backoff.max: 60s
# SSL 配置
ssl.enabled: true
ssl.certificate_authorities: ["/etc/filebeat/ca.crt"]
ssl.certificate: "/etc/filebeat/filebeat.crt"
ssl.key: "/etc/filebeat/filebeat.key"6.2 输出到 Elasticsearch
java
# 直连 ES(不经过 Logstash)
output.elasticsearch:
hosts: ["es1:9200", "es2:9200", "es3:9200"]
index: "logs-%{+yyyy.MM.dd}"
# 索引模板
setup.template.enabled: true
setup.template.name: "filebeat"
setup.template.pattern: "filebeat-*"
# ILM 配置
ilm.enabled: true
ilm.rollover_alias: "filebeat"
ilm.pattern: "{now/d}-000001"
ilm.policy_name: "filebeat-policy"6.3 输出到 Kafka
java
output.kafka:
hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
topic: "logs-%{[fields.log_type]}"
# 分区键
key: "%{[fields.log_type]}-%{[host.name]}"
# 序列化
codec.json:
include_rtt: false
# acks 配置
required_acks: 17. 可靠性配置
7.1 发送保证
java
# 确保至少发送成功一次
filebeat.inputs:
- type: log
paths:
- /var/log/app/*.log
# 发货单模式
harvester_buffer_size: 16384
output:
# 批量发送
bulk_max_size: 2048
# 超时配置
timeout: 90
# 重试配置
max_retries: 37.2 持久化队列
java
# Filebeat 7.13+ 内置持久化队列
filebeat.inputs:
- type: log
paths:
- /var/log/app/*.log
queue:
type: persisted
path: /var/lib/filebeat/queue
size: 4096
page_capacity: 2567.3 死信队列
java
# 配置死信队列
output.elasticsearch:
hosts: ["es:9200"]
bulk_max_size: 50
# 死信配置
dead_letter:
enabled: true
path: /var/lib/filebeat/dead_letter
max_bytes: 10485760 # 10MB8. 性能调优
8.1 资源限制
java
# filebeat.yml
filebeat.inputs:
- type: log
paths:
- /var/log/**/*.log
# Harvester 数量限制
close_inactive: 5m
# 扫描间隔
scan_frequency: 10s
# 文件句柄限制
harvester_limit: 10248.2 内存配置
java
# jvm.options
-Xms256m
-Xmx256m8.3 并发配置
java
# filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/**/*.log
# 单文件并发限制
harvester_concurrency: 29. 监控与调试
9.1 调试模式
bash
# 调试运行
filebeat -e -d "*"
# 只显示特定模块
filebeat -e -d "harvester"9.2 测试配置
bash
# 测试配置文件
filebeat test config -c filebeat.yml
# 测试输出连接
filebeat test output -c filebeat.yml9.3 监控接口
java
# 启用监控
monitoring:
enabled: true
elasticsearch:
hosts: ["es:9200"]
# HTTP 接口
http.enabled: true
http.host: "localhost"
http.port: 506610. 常见问题
Q1:文件被 logrotate 轮转后重复读取?
答案:配置 close_inactive 和 ignore_older。
java
filebeat.inputs:
- type: log
close_inactive: 5m
ignore_older: 24hQ2:日志丢失?
答案:
- 检查 harvester 是否正常读取
- 检查 output 是否正常发送
- 启用持久化队列
Q3:CPU 占用率高?
答案:
- 减少 harvester_concurrency
- 增大 close_inactive
- 检查 multiline 配置
总结
Filebeat 的核心要点:
- Prospector:找到要采集的文件
- Harvester:读取文件内容
- Registry:记录读取位置,保证不丢失
- Processors:发送前处理数据
- 多行处理:multiline 配置
Filebeat 适合作为日志采集的第一站,轻量且可靠。
留给你的问题:
假设你负责采集一个 Java 微服务集群的日志,每个服务有多个实例,日志量约 50GB/天。
你会如何设计 Filebeat 部署方案?
- 每个节点一个 Filebeat?
- Filebeat 配置如何设计?
- 如何处理多行日志?
