Logstash real-time collection of log4j logs into Elasticsearch: a worked case
Configuring Logstash to collect log4j logs in real time
log4j configuration on the Java application side
properties file configuration
# use this socket appender (it still has to be referenced from log4j.rootLogger or a named logger)
log4j.appender.socket=org.apache.log4j.SocketAppender
# port on the host running the logstash service
log4j.appender.socket.Port=4567
# host running the logstash service
log4j.appender.socket.RemoteHost=168.7.1.67
log4j.appender.socket.ReconnectionDelay=10000
# include class name, line number, file name and similar location fields
log4j.appender.socket.LocationInfo=true
XML file configuration
<!-- reference socketAppender from an appender-ref in a logger or the root element -->
<appender name="socketAppender" class="org.apache.log4j.SocketAppender">
    <param name="remoteHost" value="168.7.1.67" />
    <param name="port" value="4567" />

    <param name="Threshold" value="DEBUG" />
    <param name="ReconnectionDelay" value="10000" />
    <param name="LocationInfo" value="true" />
</appender>
Logstash server-side configuration
conf file configuration
input {
  log4j {
    mode => "server"
    host => "168.37.1.67"
    port => 4567
    codec => plain { charset => "GB2312" }
  }
}
filter {

  # identify trade.log
  if [method] == "execute" and (![stack_trace]) {
    grok {
      # patterns are tried in order and grok stops at the first one that matches,
      # so the most specific pattern is listed first
      match => { "message" => [
        "%{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|%{GREEDYDATA:exception}\|",
        "%{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|",
        "%{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|",
        "%{WORD:opeType}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|",
        "%{WORD:opeType}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|"
      ] }
      # remove_field / add_field only run when one of the patterns matched
      remove_field => ["message", "thread", "class", "file", "method"]
      add_field => { "type" => "tradelog" }
    }
  }

  # identify error.log
  else if [logger_name] == "pt" and (![stack_trace]) {
    kv {
      source => "message"
      # field_split is a list of separator characters, so a bare pipe is enough
      field_split => "|"
      value_split => "="
      remove_field => ["message", "thread", "class", "file", "method"]
      add_field => { "type" => "errorlog" }
    }
  }

  # messages matching neither condition go no further
  else {
    drop {}
  }

  # parse the timestamp field
  date {
    match => ["timestamp", "UNIX_MS"]
    remove_field => "timestamp"
  }

}
output {

  stdout { codec => rubydebug }

  if [type] == "tradelog" {
    elasticsearch {
      index => "log4j-tradelog"
      hosts => ["168.37.1.67:9200"]
      manage_template => true
      template_overwrite => true
      template => "/home/elk/myconf/tradelog_template.json"
    }
  }
  if [type] == "errorlog" {
    elasticsearch {
      index => "log4j-errorlog"
      hosts => ["168.37.1.67:9200"]
      manage_template => true
      template_overwrite => true
      template => "/home/elk/myconf/errorlog_template.json"
    }
  }

}
input
The log4j plugin starts a service that receives the stream of events the Java application keeps sending.
mode: either server or client. In client mode Logstash actively connects to the Java application to pull its logs; in server mode it listens for the logs the Java application pushes to it.
host: the host on which the logstash service runs, i.e. localhost
port: the port number
codec: sets the character encoding
filter
Because the received stream contains several kinds of logs we do not want, conditionals are used to filter them.
trade log
The condition is that the method field equals execute and there is no stack_trace field.
if [method] == "execute" and (![stack_trace]) {
  grok {
    match => { "message" => [
      "%{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|%{GREEDYDATA:exception}\|",
      "%{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|",
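To make the routing and first-match behaviour of the filter block concrete, here is a small Python re-implementation of the filter and output logic above, added as an illustration; it is not code from the original post or from Logstash. The grok pattern names WORD, IP and GREEDYDATA are approximated by hand-written regexes, and the sample events are constructed.

```python
import re
from datetime import datetime, timezone
# Regexes standing in for the three grok pattern names the page's config uses.
GROK = {"WORD": r"\b\w+\b", "IP": r"(?:\d{1,3}\.){3}\d{1,3}", "GREEDYDATA": r".*"}
INDEXES = {"tradelog": "log4j-tradelog", "errorlog": "log4j-errorlog"}  # from output {}

def grok_to_regex(pattern):  # %{TYPE:name} -> named group; the rest is already regex
    return re.compile(re.sub(r"%\{(\w+):(\w+)\}", lambda m: f"(?P<{m[2]}>{GROK[m[1]]})", pattern))
# The five trade-log patterns from the conf, most specific first.
TRADE_REGEXES = [grok_to_regex(p) for p in (
    r"%{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|%{GREEDYDATA:exception}\|",
    r"%{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|",
    r"%{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|",
    r"%{WORD:opeType}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|",
    r"%{WORD:opeType}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|")]

def finish(event, kind):  # add_field / remove_field only run when the filter succeeded
    for f in ("message", "thread", "class", "file", "method"):
        event.pop(f, None)
    event["type"] = kind

def filter_event(event):
    """Mirror filter {} and output routing: (es_index or None, event), or None if dropped."""
    if event.get("method") == "execute" and "stack_trace" not in event:
        matches = (rx.search(event["message"]) for rx in TRADE_REGEXES)
        if m := next(filter(None, matches), None):  # grok stops at the first pattern that hits
            event.update(m.groupdict())
            finish(event, "tradelog")
        else:  # no pattern matched: tagged, but [type] is never set
            event.setdefault("tags", []).append("_grokparsefailure")
    elif event.get("logger_name") == "pt" and "stack_trace" not in event:
        for key, sep, value in (p.partition("=") for p in event["message"].split("|")):
            if sep:  # kv: field_split "|", value_split "="
                event[key] = value
        finish(event, "errorlog")
    else:
        return None  # drop {}
    if "timestamp" in event:  # date: UNIX_MS -> @timestamp
        ms = int(event.pop("timestamp"))
        event["@timestamp"] = datetime.fromtimestamp(ms / 1000, tz=timezone.utc).isoformat()
    return INDEXES.get(event.get("type")), event  # None index = printed by stdout only

# Constructed sample events shaped like what the log4j input emits (not real data).
SAMPLE_EVENTS = [
    {"method": "execute", "logger_name": "trade", "timestamp": 1480000000000,
     "message": "login|alice|Oid: 42|IP: 10.0.0.1|MAC: aa:bb:cc:dd:ee:ff|OK|"},
    {"method": "execute", "logger_name": "trade", "timestamp": 1480000000001,
     "message": "logout|IP: 10.0.0.2|MAC: 11:22:33:44:55:66|"},
    {"method": "execute", "logger_name": "trade", "timestamp": 2, "message": "no pipes here"},
    {"method": "handle", "logger_name": "pt", "timestamp": 3, "message": "code=E42|module=pay|"},
    {"method": "execute", "logger_name": "trade", "stack_trace": "...", "message": "x"},
]
```

The third sample event shows the caveat worth checking in a real deployment: when no grok pattern matches, add_field never runs, so [type] stays unset and the event is printed by stdout but never reaches either Elasticsearch index.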