logstash实时采集log4j日志并存入elasticsearch案例实战
Logstash实时采集log4j日志配置
Java应用端log4j配置
properties文件配置方式
[html] view plain copy
print?
1.#请使用该socket   
2.log4j.appender.socket=org.apache.log4j.SocketAppender 
3.#logstash服务主机端口号 
4.log4j.appender.socket.Port=4567 
5.#logstash服务主机 
6.log4j.appender.socket.RemoteHost=168.7.1.67 
7.log4j.appender.socket.ReconnectionDelay=10000 
8.#输出类名、行、文件名等字段信息 
9.log4j.appender.socket.LocationInfo=true 
xml文件配置方式
[html] view plain copy
print?
1.#appender-ref 中引用 socketAppender 
2.<appender name="socketAppender" class="org.apache.log4j.SocketAppender"> 
3.<param name="remoteHost" value="168.7.1.67" /> 
4.<param name="port" value="4567" /> 
5. 
6.<param name="Threshold" value="DEBUG" /> 
7.<param name="ReconnectionDelay" value="10000" /> 
8.<param name="LocationInfo" value="true" /> 
9.</appender> 
Logstash服务端配置
conf文件配置
[ruby] view plain copy
print?
1.input { 
2.log4j { 
3.mode => "server" 
4.host => "168.37.1.67" 
5.port => 4567 
6.codec => plain { charset => "GB2312" } 
7.
8.
9.filter { 
10.
11.#判断trade.log 
12.if [method] == "execute" and (![stack_trace]) { 
13.grok { 
14.match => { "message" => "%{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|%{GREEDYDATA:exception}\|" } 
15.match => { "message" => "%{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|" } 
16.match => { "message" => "%{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|" }     
17.match => { "message" => "%{WORD:opeType}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|" } 
18.match => { "message" => "%{WORD:opeType}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|" } 
19.remove_field  => ["message","thread","class","file","method"] 
20.add_field => [ "type", "tradelog" ] 
21.}         
22.}   
23.
24.#判断error.log 
25.else if [logger_name] == "pt" and (![stack_trace]) { 
26.kv { 
27.source => "message" 
28.field_split => "\|" 
29.value_split => "=" 
30.remove_field  => ["message","thread","class","file","method"] 
31.add_field => [ "type", "errorlog" ] 
32.
33.}   
34.
35.#不合条件的多余的消息不往下执行 
36.else { 
37.drop {} 
38.
39.
40.#解析时间字段 
41.date { 
42.match => ["timestamp","UNIX_MS"] 
43.remove_field => "timestamp" 
44.
45.
46.
47.output { 
48.
49.stdout{codec=> rubydebug} 
50.
51.if [type] == "tradelog" { 
52.elasticsearch { 
53.index => "log4j-tradelog" 
54.hosts => ["168.37.1.67:9200"]             
55.manage_template => true 
56.template_overwrite => true 
57.template => "/home/elk/myconf/tradelog_template.json" 
58.
59.
60.if [type] == "errorlog" { 
61.elasticsearch { 
62.index => "log4j-errorlog" 
63.hosts => ["168.37.1.67:9200"] 
64.
65.manage_template => true 
66.梦见丈夫有外遇template_overwrite => true 
67.template => "/home/elk/myconf/errorlog_template.json" 
68.
69.
70.
71.
input
使用log4j插件,开启一个服务,接收java应用端源源不断发送过来的数据
mode:可以是server 或者 client ,client模式会主动请求java应用端获取日志,server模式会接收java应用发过来的日志
host::启动logstash服务的主机,即localhost
port: 端口号
codec: 设置编码
filter
因为接收到的日志有多种不需要的日志,所以用条件判断进行过滤
trade日志
判断条件是method字段值是execute,并且没有stack_trace字段。
[ruby] view plain copy
print?
1.if [method] == "execute" and (![stack_trace]) { 
2.grok { 
3.match => { "message" => "%{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|%{GREEDYDATA:exception}\|" } 
4.match => { "message" => "%{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|" }