赞
踩
Elasticsearch、Logstash 和 Kibana 组成了一个开源的日志分析系统简称 ELK Stack。
在现代化的信息系统中需要收集、存储和分析庞大的日志数据,以便更好地理解系统的运行状态和诊断问题。而使用传统的方法如 grep、tail 和 awk 来分析日志数据,很容易出错并且不够灵活。通过使用 ELK Stack可以更快捷、更准确地进行日志分析和数据可视化,从而提高信息系统的稳定性和可靠性。
本文主要介绍日志分析系统的技术架构,包括Logstash、Elasticsearch和Kibana三部分。
Logstash是一个开源数据收集引擎。它能够动态地从不同的来源(如文件、网络流、系统日志等)收集数据,并对数据进行过滤、转换和输出。在Logstash中,输入、过滤器和输出是三个重要的模块。
下面是一个使用Java编写的Logstash配置文件示例,其中包含从文件读取数据、利用grok正则表达式过滤数据并输出到Elasticsearch的案例
input{ file{ path => "/usr/share/logstash/sample.log" //文件路径 start_position => "beginning" //从头开始读取 } } filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } //使用grok正则表达式过滤数据 } date { match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"] //解析时间戳 } } output { elasticsearch { hosts => ["localhost:9200"] //指定Elasticsearch的地址 index => "logstash-%{+YYYY.MM.dd}" //动态创建索引 } }
Elasticsearch是一个分布式、RESTful风格的搜索和分析引擎。它可以帮助用户存储、搜索、分析和可视化大量的数据。
Elasticsearch的核心概念是索引、文档和节点
下面是一个Java编写的Elasticsearch配置文件示例,包含索引的设置、分片和副本的设置以及相应的mappings和settings等配置:
PUT /my-index //创建索引 { "settings": { "number_of_shards": 2, //设置分片数量 "number_of_replicas": 1 //设置副本数量 }, "mappings": { //设置mappings "properties": { "title": { "type": "text" }, "content": { "type": "text" }, "create_at": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" } } } }
Kibana是一个基于Elasticsearch的开源分析和可视化平台。它提供了各种数据探索、查询和可视化功能,使用户能够更好地理解和分析自己的数据。
Kibana主要有如下几个功能:
下面是一个使用Java编写的Kibana配置文件示例,其中包含应用程序、查询和可视化的配置:
POST my-index/_search //查询
{
"query": {
"match": {
"content": "some words"
}
}
}
PUT /my-dashboard //创建Dashboard
{
"title": "My Dashboard",
"panelsJSON": "[..., ...]" //设置可视化组件
}
总的来说Logstash+Elasticsearch+Kibana的架构可以帮助我们搭建一个完整的日志分析系统,方便我们快速分析所需的数据,并对其进行合理的可视化展示和查询操作
Elasticsearch Logstash Kibana 是三个不同的工具需要分别下载并安装
Elasticsearch是一个开源搜索引擎它的功能包括全文检索、结构化搜索、分布式搜索、分析 可以从官网下载最新版本的Elasticsearch
Logstash是一个开源的数据处理工具,通常用于日志收集、过滤、格式化和转发 可以从官网下载最新版本的Logstash
Kibana是一个开源的数据可视化工具,它可以将存储在Elasticsearch中的数据进行可视化展示可以从官网下载最新版本的Kibana
在完成安装后需要启动三个工具
在Linux系统下可以通过以下命令启动Elasticsearch:
$ cd elasticsearch-x.x.x/bin/
$ ./elasticsearch
在Windows系统下可以通过运行bin/elasticsearch.bat文件启动Elasticsearch
在Linux系统下可以通过以下命令启动Logstash:
$ cd logstash-x.x.x/bin/
$ ./logstash -f /path/to/config.conf
在Windows系统下可以通过运行bin/logstash.bat文件启动Logstash
在Linux系统下可以通过以下命令启动Kibana:
$ cd kibana-x.x.x/bin/
$ ./kibana
在Windows系统下可以通过运行bin/kibana.bat文件启动Kibana
在启动三个工具后需要进行一些基础的配置以便这些工具能够正常工作
具体的配置方法可以参考各自的官方文档。通常需要进行的配置包括Elasticsearch的集群配置、Logstash的Pipeline配置和Kibana的索引配置等
输入数据源可以是各种系统生成的日志文件例如 Apache、Nginx 的访问日志,应用程序的运行日志等。可以通过配置文件等方式将日志文件发送到 Logstash 进行处理。
以下是示例代码:
input { // 指定输入源,这里假设使用的是 filebeat beats { port => "5044" type => "logs" } } filter { // 通过正则表达式进行过滤,提取所需信息 grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } // 时间格式化,将默认格式转换为 ISO 标准格式 date { match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ] } } output { // 输出到 Elasticsearch 指定索引中 elasticsearch { hosts => ["localhost:9200"] index => "logs-%{+YYYY.MM.dd}" } }
在 Logstash 的配置文件中需要指定输入源(如上述代码中的 beats),就是从哪个通道读取数据。然后使用 filter 对数据进行过滤和处理,比如使用正则表达式对日志进行格式化。最后使用 output 将数据输出到 Elasticsearch 指定的索引中。
在看到数据实时输出到 Elasticsearch 中就可以在 Kibana 中对数据进行可视化处理了。首先需要在 Kibana 中创建索引以便展示至页面上。然后可以使用多种图表、面板等方式对数据进行分析、呈现和展示,例如使用柱状图展示日志访问量 TopN、使用仪表盘展示系统运行状态等。Kibana 的可视化方式非常灵活,完全可以根据不同的应用场景做出定制化的展示效果。
// 停止 Elasticsearch
service elasticsearch stop
// 启动 Elasticsearch
service elasticsearch start
// 重启 Elasticsearch
service elasticsearch restart
// 停止 Logstash
service logstash stop
// 启动 Logstash
service logstash start
// 重启 Logstash
service logstash restart
// 停止 Kibana
service kibana stop
// 启动 Kibana
service kibana start
// 重启 Kibana
service kibana restart
为了提高 Logstash 的性能可以进行如下配置文件优化:
要想提高 Elasticsearch 的性能可以进行如下优化:
ELK 是一款用于收集、存储、搜索和可视化日志的开源平台,它由 Elasticsearch、Logstash 和 Kibana 三个开源项目组成
以下是使用 ELK 收集服务器日志并分析的具体步骤:
input { file { path => "/var/log/messages" codec => plain { charset => "ISO-8859-1" } } } filter { json { source => "message" } date { match => [ "timestamp", "ISO8601" ] } } output { elasticsearch { hosts => [ "localhost:9200" ] index => "myserver-%{+YYYY.MM.dd}" } stdout { codec => rubydebug } }
bin/logstash -f /path/to/config.conf --config.reload.automatic
以上是一个简要的 ELK 收集服务器日志并分析的流程,具体实现要根据不同场景进行调整。
可以使用 Rsyslog 或者其他日志采集器来轻松地将日志推送到 ELK 应用程序。以下是一些构建远程日志系统的步骤:
module(load="imfile")
input(type="imfile"
File="/var/log/syslog"
Tag="system"
severity="warning"
Facility="local0")
action(type="omelasticsearch"
template="json"
searchIndex="syslog-%Y.%m.%d"
dynSearchIndex="on"
servers="127.0.0.1:9200")
curl -X PUT "localhost:9200/syslog-2023.05.30" -H 'Content-Type: application/json' -d' { "mappings": { "properties": { "@timestamp": { "type": "date" }, "hostname": { "type": "keyword" }, "message": { "type": "text" }, "program": { "type": "keyword" }, "priority": { "type": "keyword" }, "severity": { "type": "keyword" } } } } '
logger -t system -p local0.warning "this is a test message"
curl http://localhost:9200/syslog-2023.05.30/_search?pretty
以上是一个简单的搭建远程日志系统并实时监控的流程,具体实现要根据不同场景进行调整。
为了避免数据在Elasticsearch中出现重复可以使用Logstash中的aggregate插件。该插件可以将经过特定字段排序后,相同字段值的事件合并为一个。由于处理过程中需要在内存中保存数据,所以需要根据实际情况调整max_map_size和timeout参数避免意外的内存溢出。
示例代码:
input { file { path => "/log/path" start_position => "beginning" } } filter { # 针对某个字段进行排序 aggregate { task_id => "%{task_id}" # 字段名 code => " map['message'] ||= [] map['message'].push(event.get('message')) # 收集字段值 event.cancel() # 取消本行日志的索引 " push_map_as_event_on_timeout => true # 所有事件被组合为一个事件,放到Logstash的输出队列 timeout_task_id_field => "task_id" timeout => 3 # 等待时间,单位秒 } } output { elasticsearch { hosts => ["localhost:9200"] } }
Elasticsearch提供了批量API操作来提升性能。减少单个文档的请求次数可以降低网络开销,并减少Elasticsearch节点中内存的使用量。
默认情况下Elasticsearch会自动为每个新索引分配一个主分片和一个副本。这个操作通常非常耗费时间,磁盘空间和节点资源。所以当需要大量数据导入Elasticsearch时,建议关闭该功能,先导入数据后再手动建立索引。
设置方法:在elasticsearch.yml文件中增加以下内容
action.auto_create_index: false
在使用正则表达式时假如表达式存在语法错误,则过滤器将被卡住,不再适用于任何日志。这可能会导致Logstash停止收集而无法将日志发送到Elasticsearch中。在写正则表达式时建议先使用在线正则表达式工具进行测试。另外还可以在Logstash启动前运行一个自动化的正则表达式测试过程,以确保配置文件的完整性。
示例代码:
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
if "_grokparsefailure" in [tags] {
drop { } # 发现Grok失败时,该事件会被地毯式丢弃
}
}
应用程序在给Logstash发送日志时很可能发生数据丢失。对于这种情况建议首先排除网络故障的可能性,并检查发送端和接收端之间的所有设备。
如果经过排除后数据仍然丢失,则可能是由于Logstash无法处理大量数据而导致的,建议将日志文件划分为使用相同过滤器预处理的较小块。另一个常见的解决方案是增加Elasticsearch节点来容纳更多数据,或者升级硬件配置以提高性能。
以上方式均无效考虑采用kafka等高并发系统进行数据缓冲,以及定时备份进行数据恢复。
示例代码:
input {
# 从Kafka主题my_topic中读取数据,该主题包含了在其他数据输入源中收集的数据
kafka {
bootstrap_servers => "localhost:9092"
topics => ["my_topic"]
client_id => "my_id"
group_id => "logstash"
auto_commit_interval_ms => 1000
codec => json_lines
}
}
output {
elasticsearch { hosts => ["localhost:9200"] }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。