Logstash Cluster Deployment and Configuration Management
A single Logstash node can't keep up with your traffic? In this section we look at deploying Logstash as a cluster and managing its configuration files.
1. Why deploy a cluster?
Single-node Logstash:
┌─────────────────────┐
│ Logstash Node │
│ │
│ Input → Filter → Output
└─────────────────────┘
          ↑
          │ bottleneck
          │
   limited throughput
Clustered Logstash:
┌─────────────────────┐
│ Load Balancer │
└────────┬────────────┘
│
┌────┴────┐
│ │
┌───▼───┐ ┌───▼───┐ ┌───▼───┐
│ LS 1 │ │ LS 2 │ │ LS 3 │
│ │ │ │ │ │
└───┬───┘ └───┬───┘ └───┬───┘
│ │ │
└─────────┴─────────┘
↓
     Elasticsearch

Advantages of clustering:
- Horizontal scaling: add nodes to increase throughput
- High availability: a single node failure does not take the whole pipeline down
- Load balancing: traffic is spread across multiple nodes
- Fault tolerance: failed nodes are automatically taken out of rotation
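Clustering only helps if clients actually spread their traffic across the nodes. Filebeat, for example, can load-balance on the client side; a minimal sketch (host names are placeholders):

```yaml
# filebeat.yml: client-side load balancing across Logstash nodes
output.logstash:
  hosts: ["ls1:5044", "ls2:5044", "ls3:5044"]
  loadbalance: true   # distribute batches across all hosts, not just one
  worker: 2           # connections per host
```

Without `loadbalance: true`, Filebeat picks one host at a time and only fails over when it becomes unreachable.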
2. Multi-pipeline configuration
2.1 pipelines.yml
```yaml
# /etc/logstash/pipelines.yml
- pipeline.id: main-logs
  path.config: "/etc/logstash/conf.d/main/*.conf"
  pipeline.workers: 4
  pipeline.batch.size: 250
- pipeline.id: nginx-logs
  path.config: "/etc/logstash/conf.d/nginx/*.conf"
  pipeline.workers: 2
  pipeline.batch.size: 500
- pipeline.id: jdbc-sync
  path.config: "/etc/logstash/conf.d/jdbc/*.conf"
  pipeline.workers: 1
  pipeline.batch.size: 100
  queue.type: persisted
```
2.2 Pipeline prioritization
Note that pipelines.yml has no `priority` or `execution` setting; Logstash does not schedule pipelines by priority. The practical lever is resource allocation: give the important pipeline more workers and a larger batch size.
```yaml
- pipeline.id: high-priority
  path.config: "/etc/logstash/conf.d/high/*.conf"
  pipeline.workers: 4        # more workers → larger share of the CPU
  pipeline.batch.size: 500
- pipeline.id: normal
  path.config: "/etc/logstash/conf.d/normal/*.conf"
  pipeline.workers: 1
  pipeline.batch.size: 125
```
2.3 Pipeline-to-pipeline communication
Pipelines can hand events to one another through the pipeline input/output plugins:
```conf
# Pipeline A: parse incoming requests
input { file { path => "/var/log/access.log" } }
filter {
  grok { match => { "message" => "%{COMBINEDAPACHELOG}" } }
  mutate { add_field => { "[@metadata][pipeline]" => "A" } }
}
output {
  pipeline { send_to => ["parsed_logs"] }
}

# Pipeline B: receive the parsed events
input { pipeline { address => "parsed_logs" } }
filter {
  geoip { source => "client_ip" }
  useragent { source => "agent" }
}
output {
  elasticsearch { hosts => ["http://localhost:9200"] }
}
```
3. Architecture design
3.1 Common architecture patterns
Pattern 1: Beats → Logstash → Elasticsearch
Filebeat   ──┐
Metricbeat ──┼──→ Logstash Cluster → Elasticsearch ←── Kibana
Heartbeat  ──┘

Pattern 2: Kafka as a buffer
Filebeat → Kafka → Logstash Cluster → Elasticsearch
             ↑
             │
     peak shaving (absorbs traffic bursts)

Pattern 3: tiered architecture
┌─────────────┐
│    Beats    │
└──────┬──────┘
       │
┌──────▼──────┐
│  Light LS   │  (parsing tier: parse, enrich)
└──────┬──────┘
       │
┌──────▼──────┐
│    Kafka    │
└──────┬──────┘
       │
┌──────▼──────┐
│  Heavy LS   │  (aggregation tier: aggregate, analyze)
└──────┬──────┘
       │
┌──────▼──────┐
│Elasticsearch│
└─────────────┘
3.2 Node role assignment
```yaml
# parsing-tier node
pipeline.workers: 4
pipeline.batch.size: 500

# aggregation-tier node
pipeline.workers: 2
pipeline.batch.size: 125
queue.type: persisted
```
4. Configuration management
4.1 Configuration file layout
/etc/logstash/
├── logstash.yml          # main settings
├── pipelines.yml         # pipeline definitions
├── jvm.options           # JVM settings
├── log4j2.properties     # logging settings
│
├── conf.d/               # pipeline config directory
│   ├── common/           # shared configs
│   │   ├── 01-inputs.conf
│   │   ├── 02-filters.conf
│   │   └── 03-outputs.conf
│   │
│   ├── nginx/            # Nginx log pipeline
│   │   └── pipeline.conf
│   │
│   ├── app/              # application log pipeline
│   │   └── pipeline.conf
│   │
│   └── jdbc/             # database sync pipeline
│       └── pipeline.conf
│
└── patterns/             # custom grok patterns
    ├── myapp
    └── nginx
4.2 Common configuration patterns
```conf
# conf.d/common/01-inputs.conf
input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/ssl/logstash.crt"
    ssl_key => "/etc/logstash/ssl/logstash.key"
  }
}
```
```conf
# conf.d/common/03-outputs.conf
output {
  if "nginx" in [tags] {
    elasticsearch {
      hosts => ["http://es-node1:9200", "http://es-node2:9200"]
      index => "nginx-%{+YYYY.MM.dd}"
    }
  } else if "app" in [tags] {
    elasticsearch {
      hosts => ["http://es-node1:9200", "http://es-node2:9200"]
      index => "app-%{+YYYY.MM.dd}"
    }
  }
}
```
4.3 Environment variables
```yaml
# logstash.yml
---
node.name: "${HOSTNAME}"
path.data: /var/lib/logstash
path.logs: /var/log/logstash

# read from environment variables
xpack.monitoring.elasticsearch.hosts: ["${ES_HOSTS}"]
xpack.monitoring.enabled: true

# ${VAR:default} supplies a default value
var.destination: "${ES_DESTINATION:localhost:9200}"
```
```bash
# set environment variables at startup
ES_HOSTS="es1:9200,es2:9200" bin/logstash -f pipeline.conf
```
4.4 Configuration version control
```
# Git repository layout
logstash-config/
├── pipelines.yml
├── conf.d/
│   ├── nginx/
│   │   ├── pipeline.conf
│   │   └── pipeline.conf.bak
│   └── app/
│       └── pipeline.conf
├── patterns/
│   └── myapp
└── templates/
    └── elasticsearch-template.json
```
5. High availability
5.1 Heartbeat monitoring
```yaml
# logstash.yml
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.hosts: ["http://es:9200"]

# tie this node's metrics to a monitored cluster
# (the value should be an Elasticsearch cluster UUID)
monitoring.cluster_uuid: "my-logstash-cluster"
```
5.2 Health checks
```bash
# node health and stats
curl -X GET "localhost:9600/_node/stats?pretty"

# pipeline status
curl -X GET "localhost:9600/_node/pipelines?pretty"

# validate configuration files (there is no HTTP endpoint for this; use the CLI)
bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/
```
5.3 Failover
┌──────────────┐
│ Load Balancer│
└──────┬───────┘
       │
┌──────▼──────┐
│  LS Node 1  │ ←── removed automatically on failure
└──────┬──────┘
       │
┌──────▼──────┐
│  LS Node 2  │ ←── takes over the traffic
└─────────────┘

Use HAProxy for load balancing:
```
# /etc/haproxy/haproxy.cfg
listen logstash
    bind *:5044
    mode tcp
    balance roundrobin
    server ls1 192.168.1.10:5044 check inter 5s rise 2 fall 3
    server ls2 192.168.1.11:5044 check inter 5s rise 2 fall 3
```
6. Deployment practice
6.1 Docker Compose deployment
```yaml
# docker-compose.yml
version: '3'
services:
  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    volumes:
      - ./pipeline:/usr/share/logstash/pipeline
      - ./patterns:/usr/share/logstash/patterns
      - ./config/logstash.yml:/usr/share/logstash/config/logstash.yml
      - ./data:/usr/share/logstash/data
    ports:
      - "5044:5044"
      - "9600:9600"
    environment:
      - "LS_JAVA_OPTS=-Xmx2g -Xms2g"
    deploy:
      replicas: 3        # takes effect under Docker Swarm (docker stack deploy)
    restart: always
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    environment:
      - discovery.type=single-node
```
6.2 Kubernetes deployment
```yaml
# logstash-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: logstash
spec:
  replicas: 3
  selector:
    matchLabels:
      app: logstash
  template:
    metadata:
      labels:
        app: logstash
    spec:
      containers:
        - name: logstash
          image: docker.elastic.co/logstash/logstash:8.11.0
          ports:
            - containerPort: 5044
            - containerPort: 9600
          resources:
            requests:
              cpu: "1"
              memory: "2Gi"
            limits:
              cpu: "2"
              memory: "4Gi"
          volumeMounts:
            - name: pipeline
              mountPath: /usr/share/logstash/pipeline
          env:
            - name: "LS_JAVA_OPTS"
              value: "-Xmx2g -Xms2g"
      volumes:
        - name: pipeline
          configMap:
            name: logstash-pipeline
---
apiVersion: v1
kind: Service
metadata:
  name: logstash
spec:
  ports:
    - port: 5044
      name: beats
    - port: 9600
      name: api
  selector:
    app: logstash
```
7. Monitoring and alerting
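The raw counters exposed by the stats API only become meaningful as rates. A small hypothetical helper that turns two snapshots of a pipeline's `events` counters into a throughput figure (the dict shape mirrors the per-pipeline `events` object in the node stats response):

```python
# Hypothetical helper: events/sec between two samples of
# GET localhost:9600/_node/stats/pipelines (per-pipeline "events" object).
def throughput(prev: dict, curr: dict, interval_s: float) -> float:
    """Events emitted per second between two stats snapshots."""
    delta = curr["events"]["out"] - prev["events"]["out"]
    return delta / interval_s

prev = {"events": {"out": 1_000}}
curr = {"events": {"out": 4_000}}
print(throughput(prev, curr, 60.0))  # prints 50.0
```

Poll the API on a fixed interval and alert when this rate drops while the input side keeps growing; that gap is a backlog forming.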
7.1 Key metrics
```bash
# Key metrics (replace "main" with your pipeline.id)

# throughput
curl -s "localhost:9600/_node/stats/pipelines" | jq '.pipelines.main.events'

# queue depth
curl -s "localhost:9600/_node/stats/pipelines" | jq '.pipelines.main.queue'

# per-filter processing time
curl -s "localhost:9600/_node/stats/pipelines" | jq '.pipelines.main.plugins.filters[].events.duration_in_millis'

# JVM memory
curl -s "localhost:9600/_node/stats/jvm" | jq '.jvm.mem'
```
7.2 Prometheus monitoring
```yaml
# logstash.yml (ships stats to Elasticsearch, not to Prometheus)
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.hosts: ["http://es:9200"]
```
Logstash does not expose Prometheus-format metrics natively. A common approach is to run an exporter (for example a community logstash_exporter, which polls the :9600 stats API) and scrape that:
```yaml
# prometheus.yml
scrape_configs:
  - job_name: 'logstash'
    static_configs:
      - targets: ['logstash-exporter:9198']   # exporter address, adjust to yours
```
7.3 Alerting rules
```yaml
# Prometheus alerting rules (metric names depend on the exporter you run)
groups:
  - name: logstash
    rules:
      - alert: LogstashDown
        expr: up{job="logstash"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Logstash instance down"
      - alert: LogstashQueueFull
        expr: logstash_queue_size > 0.9 * logstash_queue_max_size
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Logstash queue almost full"
      - alert: LogstashHighLatency
        expr: rate(logstash_pipeline_filter_duration_seconds_sum[5m]) / rate(logstash_pipeline_filter_events[5m]) > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Logstash filter latency high"
```
8. Operations checklist
A checklist for operating a Logstash cluster:
□ Node count matches traffic volume
□ Pipeline configs distributed correctly
□ Configuration files under version control
□ Monitoring metrics healthy
□ Log files healthy
□ Queues not backing up
□ Resource usage within limits
□ Failover verified
□ Backup of configuration up to date
Summary
Key takeaways for deploying Logstash as a cluster:
- Multiple pipelines: separate pipelines by function for easier management and tuning
- Architecture: pick the pattern that matches your traffic (direct, buffered, tiered)
- Configuration management: a consistent file layout plus version control
- High availability: multiple nodes + load balancing + health checks
- Monitoring and alerting: watch the key metrics so problems surface early
A question to take away:
Suppose your system ingests 10 GB of logs per day. How many Logstash nodes would you deploy, and how would you configure them?
Things to consider:
- per-node processing capacity
- availability requirements
- guarantees against data loss
Adjust the design to your actual situation.
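For a rough answer, start from the arithmetic. The sketch below assumes an average event size of 500 bytes and a 10x burst factor; both figures are placeholders you should replace with measurements from your own logs:

```python
# Back-of-envelope sizing for ~10 GB/day of logs.
GB = 1024 ** 3
bytes_per_day = 10 * GB
avg_event_bytes = 500            # assumption: measure your real logs
burst_factor = 10                # assumption: peak vs. average traffic

events_per_sec = bytes_per_day / avg_event_bytes / 86_400
peak_events_per_sec = events_per_sec * burst_factor

print(f"average: {events_per_sec:.0f} events/s, peak: {peak_events_per_sec:.0f} events/s")
```

At roughly 250 events/s on average, throughput alone would fit comfortably on one node; with these numbers the node count is driven by the high-availability requirement (at least two nodes behind a load balancer) and by a persisted queue for the no-data-loss guarantee.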
