ELK04 ELK综合案例, logstash写入mysql, kibana访问验证 ubuntu使用

news/2024/10/21 21:35:59

6 ELK 综合实战案例

6.1 Filebeat 收集Nginx日志利用 Redis 缓存发送至 Elasticsearch

 图上ip地址仅供参考

6.1.2.2 修改 Filebeat 配置

#安装redis(访问0.0.0.0和密码123456),nginx(访问日志json格式)

[root@ubuntu ~]#vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: logenabled: truepaths:- /var/log/nginx/access_json.logjson.keys_under_root: true #默认False会将json数据存储至message,改为true则会独立message外存储json.overwrite_keys: true  #设为true,覆盖默认的message字段,使用自定义json格式中的keytags: ["nginx-access"]- type: logenabled: truepaths:- /var/log/nginx/error.logtags: ["nginx-error"]- type: logenabled: truepaths:- /var/log/syslogtags: ["syslog"]output.redis:hosts: ["10.0.0.154:6379"]password: "123456"db: "0"key: "filebeat" #所有日志都存放在key名称为filebeat的列表中,llen filebeat可查看长度,即日志记录数
  
[root@ubuntu ~]#systemctl restart filebeat.service#查看redis
[root@ubuntu ~]#redis-cli -a 123456
127.0.0.1:6379> keys *
1) "filebeat"
127.0.0.1:6379> llen filebeat    #多少条数据
(integer) 91

6.1.4 安装并配置logstash收集Redis数据发送至Elasticsearch

#把之前的配置移走,防干扰
[root@ubuntu conf.d]#mv app_filebeat_filter_es.conf bak#注意:必须删除下面的文件中注释
[root@logstash ~]#vim /etc/logstash/conf.d/redis-to-es.conf 
input {redis {host => "10.0.0.154"port => "6379"password => "123456"db => "0"key => "filebeat"data_type => "list"}
}
filter {if "nginx-access" in [tags] {geoip {source => "clientip"      #日志必须是json格式,且有一个clientip的keytarget => "geoip"#database => "/etc/logstash/conf.d/GeoLite2-City.mmdb"   #指定数据库文件,可选(可下最新数据)add_field => ["[geoip][coordinates]","%{[geoip][geo][location][lon]}"] 
#8.X添加经纬度字段包括经度,使用时删除注释(配合kibana画地图用,格式要求,不用就不用加)add_field => ["[geoip][coordinates]","%{[geoip][geo][location][lat]}"] 
#8.X添加经纬度字段包括纬度(配合kibana画地图用,格式要求,不用就不用加)#add_field => ["[geoip][coordinates]", "%{[geoip][longitude]}"]         
#7,X添加经纬度字段包括经度#add_field => ["[geoip][coordinates]", "%{[geoip][latitude]}"]         
#7,X添加经纬度字段包括纬度
   }#转换经纬度为浮点数,注意:8X必须做,7.X 可不做此步(配合kibana画地图用,不用就不用加)
   mutate {convert => [ "[geoip][coordinates]", "float"]       }}mutate {    #不重要,不转不影响convert => ["upstreamtime","float"]}
}
output {if "syslog" in [tags] {elasticsearch {hosts => ["10.0.0.151:9200","10.0.0.152:9200","10.0.0.153:9200"]index => "syslog-%{+YYYY.MM.dd}"}}if "nginx-access" in [tags] {elasticsearch {hosts => ["10.0.0.151:9200","10.0.0.152:9200","10.0.0.153:9200"]index => "nginxaccess-%{+YYYY.MM.dd}"#注意:7.x之前版本地图功能要求必须使用 logstash 开头的索引名称template_overwrite => true    #防止名称相同,覆盖
       }stdout {    #有没有无所谓codec => "rubydebug"}}if "nginx-error" in [tags] {elasticsearch {hosts => ["10.0.0.151:9200","10.0.0.152:9200","10.0.0.153:9200"]index => "nginxerrorlog-%{+YYYY.MM.dd}"template_overwrite => true}}
}#logstash取一条数据,redis会少一条数据
[root@logstash ~]#systemctl restart logstash#redis的数据全部被取走,清空了

6.2 Filebeat 收集Nginx日志并写入 Kafka 缓存发送至 Elasticsearch

6.2.2 Filebeat 收集 Nginx 的访问和错误日志并发送至 kafka

[root@web1 ~]#vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: logenabled: truepaths:- /var/log/nginx/access_json.logjson.keys_under_root: truejson.overwrite_keys: truetags: ["nginx-access"]- type: logenabled: truepaths:- /var/log/nginx/error.log tags: ["nginx-error"]- type: logenabled: truepaths:- /var/log/syslog tags: ["syslog"]output.kafka:    #这测试单机kafka能写入topic但没法读,集群没问题hosts: ["10.0.0.156:9092", "10.0.0.157:9092", "10.0.0.158:9092"]topic: filebeat-log   #指定kafka的topic
  partition.round_robin:reachable_only: true #true表示只发布到可用的分区,false时表示所有分区,如果一个节点down会导致blockrequired_acks: 1  #如果为0,表示不确认,错误消息可能会丢失,1等待写入主分区(默认),-1等待写入副本分区
  compression: gzip  max_message_bytes: 1000000 #每条消息最大长度,以字节为单位,如果超过将丢弃

[root@web1 ~]#systemctl restart filebeat.service#查看kafka是否有topic
[root@node1 ~]#/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 10.0.0.156:9092
filebeat-log
#在kafka查看是否有数据
[root@node1 ~]#/usr/local/kafka/bin/kafka-console-consumer.sh --topic filebeat-log --bootstrap-server 10.0.0.156:9092 --from-beginning

6.2.3 配置 Logstash 读取 Kafka 日志发送到 Elasticsearch

[root@ubuntu conf.d]#mv redis-to-es.conf bak
[root@logstash ~]#vim /etc/logstash/conf.d/kafka-to-es.conf
input {kafka {bootstrap_servers => "10.0.0.156:9092,10.0.0.157:9092,10.0.0.158:9092"topics => "filebeat-log"codec => "json"#group_id => "logstash" #消费者组的名称(多个logstash要分组,防止重复消费)#consumer_threads => "3" #建议设置为和kafka的分区相同的值为线程数#topics_pattern => "nginx-.*" #通过正则表达式匹配topic,而非用上面topics=>指定固定值
 }
}
filter {if "nginx-access" in [tags] {geoip {source => "clientip"target => "geoip"add_field => ["[geoip][coordinates]","%{[geoip][geo][location][lon]}"]add_field => ["[geoip][coordinates]","%{[geoip][geo][location][lat]}"]}mutate {convert => [ "[geoip][coordinates]", "float"]}}mutate {convert => ["upstreamtime","float"]}
}
output {#stdout {}   #调试使用if "nginx-access" in [tags] {elasticsearch {hosts => ["10.0.0.151:9200"]index => "logstash-kafka-nginx-accesslog-%{+YYYY.MM.dd}"}}
if "nginx-error" in [tags] {elasticsearch {hosts => ["10.0.0.151:9200"]index => "logstash-kafka-nginx-errorlog-%{+YYYY.MM.dd}"}}
if "syslog" in [tags] {elasticsearch {hosts => ["10.0.0.151:9200"]index => "logstash-kafka-syslog-%{+YYYY.MM.dd}"}}
}
[root@logstash ~]#systemctl restart logstash.service 

 

6.3 通过坐标地图统计网站访问用户的 IP 所在城市

6.3.7 Kibana 创建地图数据

6.3.7.1 查看 Kibana 样例地图

kibana有样例数据

kibana首页下使用样例数据,点击其他样例数据集,下sample web logs,点击添加
在左侧分析下发现可以查看添加的样例数据集(参考格式)
左侧分析下点击maps,样例数据已经画好地图,点击即可查看

6.3.7.2 创建地图

kibana上画地图

左侧分析下点击maps,点击右侧创建地图,点击添加图层,选择数据(得是地理空间字段)
#7版本,地理数据直接可以使用了,但是8版本需要把地理数据转格式#7.x支持 float格式,8.x支持geo_point类型。

8版本需要对地理数据进行转换

点击左侧management下的开发工具
在控制台下输入索引名,去get获取返回
GET /logstash-kafka-nginx-accesslog-2023.02.19
点击三角箭头,把返回的内容复制出来,做下修改
#下方类型应该是point,但是现在是float类型
"geoip": {"properties": {"coordinates": {"type": "float"    #只修改此行,原值为float改为geo_point
            },#必须要先删除旧的索引数据,下面修改才能生效

执行下面操作生成索引模板,将上面的mappings部分内容(一直到"settings"前面)复制到下面,只修改"coordinates": { "type": "geo_point" } 部分
#在mappings上方追加下面的内容
PUT /_template/template_nginx_accesslog   #模板名称可以自定义
{"index_patterns" : ["logstash-kafka-nginx-accesslog-*"  #8.X数据视图对应的索引模式,匹配索引名称的模式
 ],"order" : 0,"aliases" : { },    #在mappings上面追加这些内容"mappings": {                 #复制mappings开始的行到settings行之前结束,并最后再加一
个 }"properties": {......."geoip": {"properties": {"coordinates": {"type": "geo_point",      #只修改此行,原值为float改为geo_point
         ......."xff": {"type": "text","fields": {"keyword": {"type": "keyword","ignore_above": 256}}}}}
}#在重新生成索引(源头日志生成数据),看索引内容后面生成的就是geoip类型#再查一次,看看geoip字段是否改过来了
GET logstash-kafka-nginx-accesslog-2023.02.19

kibana上画地图

左侧分析下点击maps,点击右侧创建地图,点击添加图层
先点文档,选择数据视图,点击地理空间字段,添加,点保存添加到仪表板
分享链接给别人

6.4 Logstash收集日志写入 MySQL 数据库

一般es都会定期删除旧数据,想保留的数据就存到mysql中

6.4.1 安装 MySQL 数据库

[root@es-node1 ~]#apt install mysql-server
[root@es-node1 ~]#sed -i '/127.0.0.1/s/^/#/' /etc/mysql/mysql.conf.d/mysqld.cnf
#[root@es-node1 ~]#vim /etc/mysql/mysql.conf.d/mysqld.cnf
#bind-address        = 0.0.0.0
#mysqlx-bind-address = 0.0.0.0
[root@es-node1 ~]#systemctl restart mysql.service 

6.4.2 创建库和表并授权用户登录

[root@es-node1 ~]#mysql
mysql> create database elk ;                                      #MySQL8.0以上默认就支持
#mysql> create database elk character set utf8 collate utf8_bin;  #MySQL5.7以下需要显示指定(默认Latin)
mysql> create user elk@"10.0.0.%" identified by '123456';
mysql> grant all privileges on elk.* to elk@"10.0.0.%";
mysql> flush privileges;#创建表,字段对应需要保存的数据字段
mysql> use elk
mysql> create table elklog (clientip varchar(39),responsetime float(10,3),uri varchar(256),status  char(3),time timestamp default current_timestamp );
mysql> desc elklog;
+--------------+--------------+------+-----+-------------------+-------+
| Field       | Type         | Null | Key | Default           | Extra |
+--------------+--------------+------+-----+-------------------+-------+
| clientip     | varchar(39) | YES |     | NULL             |       |
| responsetime | float(10,3) | YES |     | NULL             |       |
| uri         | varchar(256) | YES |     | NULL             |       |
| status       | char(3)     | YES |     | NULL             |       |
| time         | timestamp   | NO   |     | CURRENT_TIMESTAMP |       |
+--------------+--------------+------+-----+-------------------+-------+
5 rows in set (0.00 sec)

6.4.3 测试用户从 Logstash 主机远程登录(可选)

[root@logstash1 ~]#apt -y install mysql-client
[root@logstash1 ~]#mysql -uelk -p123456 -h10.0.0.155 -e 'show databases'
mysql: [Warning] Using a password on the command line interface can be insecure.
+--------------------+
| Database           |
+--------------------+
| information_schema |
| elk               |
+--------------------+

6.4.4 Logstash 配置 mysql-connector-java 包

logstash连接mysql需要这个包
官方下载地址:https://dev.mysql.com/downloads/connector/ 
#点击Connetor/J,选择对应mysql版本,系统版本#下载mysql-connector-j_8.0.33-1ubuntu22.04_all.deb
[root@logstash-ubuntu2204 ~]#dpkg -i mysql-connector-j_8.0.33-1ubuntu22.04_all.deb

[root@logstash-ubuntu2204 ~]#dpkg -L mysql-connector-j 
...
/usr/share/java
/usr/share/java/mysql-connector-j-8.0.33.jar#2)复制jar文件到logstash指定的目录下
[root@logstash-ubuntu2204 ~]#mkdir -p /usr/share/logstash/vendor/jar/jdbc 
[root@logstash-ubuntu2204 ~]#cp /usr/share/java/mysql-connector-j-8.0.33.jar /usr/share/logstash/vendor/jar/jdbc/

6.4.6 安装logstash-output-jdbc插件

#查看安装的插件,默认只有input-jdbc插件,需要安装output-jdbc插件
[root@logstash1 ~]#/usr/share/logstash/bin/logstash-plugin list|grep jdbc
......
logstash-integration-jdbc├── logstash-input-jdbc    #当前只有input-jdbc├── logstash-filter-jdbc_streaming└── logstash-filter-jdbc_static#在线安装output-jdbc插件,可能会等较长时间
[root@logstash1 ~]#/usr/share/logstash/bin/logstash-plugin install logstash-output-jdbc
#离线安装
#如果无法在线安装,可以先从已经安装的主机导出插件,再导入
#导出插件
[root@ubuntu2204 ~]#/usr/share/logstash/bin/logstash-plugin prepare-offline-pack logstash-output-jdbc
#离线导入插件
[root@ubuntu2204 ~]#/usr/share/logstash/bin/logstash-plugin install file:///root/logstash-offline-plugins-8.12.2.zip#删除插件
#[root@logstash1 ~]#/usr/share/logstash/bin/logstash-plugin remove logstash-output-jdbc

6.4.8 配置 Logstash 将日志写入数据库

[root@logstash ~]#vim /etc/logstash/conf.d/kafka-to-es.conf
input {kafka {bootstrap_servers => "10.0.0.156:9092,10.0.0.157:9092,10.0.0.158:9092"topics => "filebeat-log"codec => "json"}
}
filter {if "nginx-access" in [tags] {geoip {source => "clientip"target => "geoip"add_field => ["[geoip][coordinates]","%{[geoip][geo][location][lon]}"]add_field => ["[geoip][coordinates]","%{[geoip][geo][location][lat]}"]}mutate {convert => [ "[geoip][coordinates]", "float"]}}mutate {convert => ["upstreamtime","float"]}
}
output {if "nginx-access" in [tags] {elasticsearch {hosts => ["10.0.0.151:9200"]index => "logstash-kafka-nginx-accesslog-%{+YYYY.MM.dd}"}jdbc {    #追加mysql写入connection_string => "jdbc:mysql://10.0.0.155/elk?user=elk&password=123456&useUnicode=true&characterEncoding=UTF8"statement => ["INSERT INTO elklog (clientip,responsetime,uri,status) VALUES(?,?,?,?)","clientip","responsetime","uri","status"]}}
if "nginx-error" in [tags] {elasticsearch {hosts => ["10.0.0.151:9200"]index => "logstash-kafka-nginx-errorlog-%{+YYYY.MM.dd}"}}
if "syslog" in [tags] {elasticsearch {hosts => ["10.0.0.151:9200"]index => "logstash-kafka-syslog-%{+YYYY.MM.dd}"}}
}
#启动测试
[root@logstash ~]#logstash -f /etc/logstash/conf.d/kafka-to-es.conf#filebeat端生成一些数据做测试
[root@ubuntu ~]#head -n11 access_json.log-20230221.0 >> /var/log/nginx/access_json.log
#查看mysql
[root@ubuntu ~]#mysql
mysql> use elk
#出现数据
mysql> select * from elklog;
+----------------+--------------+---------------+--------+---------------------+
| clientip       | responsetime | uri           | status | time                |
+----------------+--------------+---------------+--------+---------------------+
| 20.203.179.186 |        0.000 | /.git/config  | 302    | 2024-10-21 10:19:40 |
| 20.203.179.186 |        0.000 | /.git/config  | 302    | 2024-10-21 10:19:40 |
| 218.94.106.239 |        0.000 | /             | 302    | 2024-10-21 10:19:40 |

 

最后这个kibana谁都能打开不安全,可以用nginx反向代理,用它的basic账号验证功能

[root@ubuntu ~]#apt install nginx
[root@ubuntu ~]#vim /etc/nginx/sites-enabled/default
location / {#try_files $uri $uri/ =404;proxy_pass http://127.0.0.1:5601;auth_basic "kibana site";    #提示auth_basic_user_file conf.d/htpasswd;    #放密码的文件
        }
#生成密码文件
[root@ubuntu ~]#apt install apache2-utils
[root@ubuntu ~]#htpasswd -c -b /etc/nginx/conf.d/htpasswd xiaoming 123456
#还有其他人就再创建,第二次不能加-c
[root@ubuntu ~]#htpasswd -b /etc/nginx/conf.d/htpasswd xiaohong 123456#浏览器输入, 即可验证登录
http://10.0.0.154/#把kibana的配置允许远程访问改了
[root@ubuntu ~]#vim /etc/kibana/kibana.yml
#server.host: "0.0.0.0"    #注释掉,默认为localhost

[root@ubuntu ~]#systemctl restart kibana.service 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.ryyt.cn/news/74363.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈,一经查实,立即删除!

相关文章

Cyclic GCDs - 神圣的数学题

Cyclic GCDs 题面 【题目描述】 给定一个长为 \(N\) 的序列 \(a_1,a_2,\dots,a_N\)。 设一个置换 \(p\) 的价值 \(f(p)\) 为每个轮换中最小的 \(a_i\) 的乘积。 设 \(b_i\) 为有 \(i\) 个轮换的所有置换 \(p\) 的 \(f(p)\) 之和。 求 \(\gcd(b_1,b_2,\dots,b_N) \bmod{99824435…

『模拟赛』多校A层冲刺NOIP2024模拟赛10

『模拟赛记录』多校A层冲刺NOIP2024模拟赛10Rank 原来不止我一个人在赤石 Upd:T2 数据多次更改,Rank 变化 4→2→4A. 岛屿 唐氏题。 赛时找规律+分讨+递推出了正解,时间不够没测直接交了,然后 long double 用了 %lf 直接寄掉。赛时思路很复杂啊,讨论了一大堆,发现其实合并…

单射,满射和双射区分

单射: 定义:函数F称为一对一的,当且仅当对于F定义域中的所有x和y,f(x)=f(y)蕴含着x=y。一对一函数也称单射函数或入射函数1.x一定都要连接,不连接则不是函数 2.y只能有一个连接,可以为空但是不能有多个 错误情况:满射 定义:给定函数F:x→y,当且仅当对∀y∈Y,都有x∈X…

20240917【省选】模拟

难说 T1 暴力可以写dp 只要你学过线性基那么你就会想怎么用线性基做,显然是要套点数据结构维护的。你要知道两个线性基可以在 \(O(\log^2 n)\) 的时间内合并,得到的线性基是原来两个的交。可以用线段树维护,复杂度 \(O((n+q)\log n\log^2 a)\),难说能不能过。 没有修改,所…

高级语言程序设计课程第四次个人作业

班级:https://edu.cnblogs.com/campus/fzu/2024C 作业:https://edu.cnblogs.com/campus/fzu/2024C/homework/13293 学号:102400118 姓名:林嘉祚 6.16.16.16.56.16.76.16.86.16.96.16.106.16.126.16.136.16.156.16.166.16.187.12.17.12.27.12.47.12.57.12.67.12.77.12.87.12.97…

2024Ciscn总决赛Web Writeup

前言 鸽了三个月的复现计划:) ezjs 考点是express引擎解析的一个trick,在高版本的express已经修复,先贴源码 const express = require(express); const ejs=require(ejs) const session = require(express-session); const bodyParse = require(body-parser); const multer =…

C# 新建的类库无法添加 System.Drawing 引用问题

C#类库添加System.Drawing引用时,选择程序集 -> 框架 -> 再搜索对应的库,添加引用就可以了。不要在COM中搜索。

20222420 2024-2025-1 《网络与系统攻防技术》实验二实验报告

20222420 2024-2025-1 《网络与系统攻防技术》实验二实验报告 1.实验内容 1.1 本周学习内容 1.1.1 后门介绍后门概念:不经过正常认证流程而访问系统的通道 后门类型:编译器、操作系统、应用程序中的后门,潜伏于OS或伪装成APP的后门程序1.1.2 后门案例编译器后门:Xcode 操作…