- ELK stack權威指南
- 饒琛琳
- 6342字
- 2018-12-31 21:08:12
2.3 過濾器配置
有豐富的過濾器插件,是Logstash威力如此強大的重要因素。名為過濾器,其實提供的不單單是過濾的功能。下面我們就會重點介紹幾個插件,它們擴展了進入過濾器的原始數據,進行復雜的邏輯處理,甚至可以無中生有地添加新的Logstash事件到后續的流程中去!
2.3.1 date時間處理
之前章節已經提過,logstash-filter-date插件可以用來轉換你的日志記錄中的時間字符串,變成LogStash::Timestamp對象,然后轉存到@timestamp字段里。
警告
因為在稍后的logstash-output-elasticsearch中常用的%{+YYYY.MM.dd}這種寫法必須讀取@timestamp數據,所以一定不要直接刪掉這個字段保留自己的字段,而是應該用logstash-filter-date轉換后刪除自己的字段!
這在導入舊數據的時候固然非常有用,而在實時數據處理的時候同樣有效,因為一般情況下數據流程中我們都會有緩沖區,導致最終的實際處理時間跟事件產生時間略有偏差。
提示
強烈建議打開Nginx的access_log配置項的buffer參數,對極限響應性能有極大提升!
1.配置示例
logstash-filter-date插件支持五種時間格式:
□ ISO8601:類似“2011-04-19T03:44:01.103Z”這樣的格式。具體Z后面可以有“08:00”也可以沒有,“.103”這個也可以沒有。常用場景里來說,Nginx的log_format配置里就可以使用$time_iso8601變量來記錄請求時間成這種格式。
□ UNIX:UNIX時間戳格式,記錄的是從1970年起始至今的總秒數。Squid默認日志格式中就使用了這種格式。
□ UNIX_MS:這個時間戳則是從1970年起始至今的總毫秒數。據我所知,JavaScript里經常使用這個時間格式。
□ TAI64N:TAI64N格式比較少見,是這個樣子的:@4000000052f88ea32489532c。我目前只知道常見應用中,qmail會用這個格式。
□ Joda-Time庫:Logstash內部使用了Java的Joda時間庫來作時間處理。所以我們可以使用Joda庫所支持的時間格式來作具體定義。Joda時間格式定義見表2-1。
表2-1 Joda時間庫格式

下面我們寫一個Joda時間格式的配置作為示例:
filter { grok { match => [“message”, “%{HTTPDATE:logdate}”] } date { match => [“logdate”, “dd/MMM/yyyy:HH:mm:ss Z”] } }
注意,時區偏移量只需要用一個字母Z即可。
2.時區問題的解釋
很多中國用戶經常提一個問題:為什么@timestamp比我們早了8個小時?怎么修改成北京時間?
其實,Elasticsearch內部,對時間類型字段,是統一采用UTC時間,存成long長整形數據的!對日志統一采用UTC時間存儲,是國際安全/運維界的一個通識——歐美公司的服務器普遍廣泛分布在多個時區里——不像中國,地域橫跨五個時區卻只用北京時間。
對于頁面查看,ELK的解決方案是在Kibana上,讀取瀏覽器的當前時區,然后在頁面上轉換時間內容的顯示。
所以,建議大家接受這種設定。否則,即便你用.getLocalTime修改,也還要面臨在Kibana過去修改,以及Elasticsearch原有的["now-1h"TO"now"]這種方便的搜索語句無法正常使用的尷尬。
以上,請讀者自行斟酌。
2.3.2 grok正則捕獲
grok是Logstash最重要的插件。你可以在grok里預定義好命名正則表達式,在稍后(grok參數或者其他正則表達式里)引用它。
1.正則表達式語法
運維工程師多多少少都會一點正則。你可以在grok里寫標準的正則,像下面這樣:
\s+(?<request_time>\d+(?:\.\d+)?)\s+
這個正則表達式寫法對于Perl或者Ruby程序員應該很熟悉了,Python程序員可能更習慣寫(?P<name>pattern),沒辦法,適應一下吧。
現在給我們的配置文件添加第一個過濾器區段配置。配置要添加在輸入和輸出區段之間(Logstash執行區段的時候并不依賴于次序,不過為了自己看得方便,還是按次序書寫吧):
input {stdin{}} filter { grok { match => { “message” =>“\s+(?<request_time>\d+(?:\.\d+)?)\s+” } } } output {stdout{}}
運行Logstash進程然后輸入“begin 123.456 end”,你會看到類似下面這樣的輸出:
{“message” =>“begin 123.456 end”,“@version” =>“1”,“@timestamp” =>“2014-08-09T11:55:38.186Z”,“host” =>“raochenlindeMacBook-Air.local”,“request_time” =>“123.456” }
漂亮!不過數據類型好像不太滿意……request_time應該是數值而不是字符串。
我們已經提過稍后會學習用LogStash::Filters::Mutate來轉換字段值類型,不過在grok里,其實有自己的魔法來實現這個功能!
2.grok表達式語法
grok支持把預定義的grok表達式寫入到文件中,官方提供的預定義grok表達式見:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns。
下面是從官方文件中摘抄的最簡單但是足夠說明用法的示例:
USERNAME [a-zA-Z0-9._-]+ USER %{USERNAME}
第一行,用普通的正則表達式來定義一個grok表達式;第二行,通過打印賦值格式,用前面定義好的grok表達式來定義另一個grok表達式。
grok表達式的打印復制格式的完整語法見下行示例。其中data_type目前只支持兩個值:int和float。
%{PATTERN_NAME:capture_name:data_type}
所以我們可以改進我們的配置成下面這樣:
filter { grok { match => { “message” =>“%{WORD} %{NUMBER:request_time:float} %{WORD}” } } }
重新運行進程然后可以得到如下結果:
{“message” =>“begin 123.456 end”,“@version” =>“1”,“@timestamp” =>“2014-08-09T12:23:36.634Z”,“host” =>“raochenlindeMacBook-Air.local”,“request_time” => 123.456 }
這次request_time變成數值類型了。
3.最佳實踐
實際運用中,我們需要處理各種各樣的日志文件,如果你都是在配置文件里各自寫一行自己的表達式,就完全不可管理了。所以,我們建議是把所有的grok表達式統一寫入到一個地方。然后用filter/grok的patterns_dir選項來指明。
如果你把“message”里所有的信息都grok到不同的字段了,數據實質上就相當于是重復存儲了兩份。所以你可以用remove_field參數來刪除掉message字段,或者用overwrite參數來重寫默認的message字段,只保留最重要的部分。
重寫參數的示例如下:
filter { grok { patterns_dir =>“/path/to/your/own/patterns” match => { “message” =>“%{SYSLOGBASE} %{DATA:message}” } overwrite => [“message”] } }
4.高級用法
□ 多行匹配 在和codec/multiline搭配使用的時候,需要注意一個問題,grok正則和普通正則一樣,默認是不支持匹配回車換行的。就像你需要=~//m一樣也需要單獨指定,具體寫法是在表達式開始位置加(?m)標記。如下所示:
match => {“message” =>“(?m)\s+(?<request_time>\d+(?:\.\d+)?)\s+” }
□ 多項選擇 有時候我們會碰上一個日志有多種可能格式的情況。這時候要寫成單一正則就比較困難,或者全用|隔開又比較丑陋。這時候,Logstash的語法提供給我們一個有趣的解決方式。
文檔中,都說明logstash-filters-grok插件的match參數應該接受的是一個Hash值。但是因為早期的Logstash語法中Hash值也是用[]這種方式書寫的,所以其實現在傳遞Array值給match參數也完全沒問題。所以,我們這里其實可以傳遞多個正則來匹配同一個字段:
match => [“message”, “(?<request_time>\d+(?:\.\d+)?)”,“message”, “%{SYSLOGBASE} %{DATA:message}”,“message”, “(?m)%{WORD}” ]
Logstash會按照這個定義次序依次嘗試匹配,到匹配成功為止。雖說效果跟用|分割寫個大大的正則是一樣的,但是可閱讀性好了很多。
提示
我強烈建議每個人都要使用Grok Debugger(http://grokdebug.herokuapp.com/)來調試自己的grok表達式。
2.3.3 GeoIP地址查詢
GeoIP是最常見的免費IP地址歸類查詢庫,同時也有收費版可以采購。GeoIP庫可以根據IP地址提供對應的地域信息,包括國別、省市、經緯度等,對于可視化地圖和區域統計非常有用。
配置示例如下:
filter { geoip { source =>“message” } }
運行結果如下:
{“message” =>“183.60.92.253”,“@version” =>“1”,“@timestamp” =>“2014-08-07T10:32:55.610Z”,“host” =>“raochenlindeMacBook-Air.local”,“geoip” => {“ip” =>“183.60.92.253”,“country_code2” =>“CN”,“country_code3” =>“CHN”,“country_name” =>“China”,“continent_code” =>“AS”,“region_name” =>“30”,“city_name” =>“Guangzhou”,“latitude” =>23.11670000000001,“longitude” =>113.25,“timezone” =>“Asia/Chongqing”,“real_region_name” =>“Guangdong”,“location” => [ [0] 113.25, [1] 23.11670000000001 ] } }
GeoIP庫數據較多,如果你不需要這么多內容,可以通過fields選項指定自己所需要的。下例為全部可選內容:
filter { geoip { f?ields => [“city_name”, “continent_code”, “country_code2”, “country_code3”, “country_name”, “dma_code”, “ip”, “latitude”, “longitude”, “postal_code”, “region_name”, “timezone”] } }
需要注意的是:geoip.location是Logstash通過latitude和longitude額外生成的數據。所以,如果你是想要經緯度又不想重復數據的話,應該像下面這樣做:
filter { geoip { fields => [“city_name”, “country_code2”, “country_name”, “latitude”, “longitude”, “region_name”] remove_field => [“[geoip][latitude]”, “[geoip][longitude]”] } }
還要注意:geoip插件的“source”字段可以是任一處理后的字段,比如“client_ip”,但是字段內容卻需要小心!GeoIp庫內只存有公共網絡上的IP信息,查詢不到結果的,會直接返回null,而Logstash的GeoIp插件對null結果的處理是:“不生成對應的geoip.字段”。所以讀者在測試時,如果使用了諸如127.0.0.1、172.16.0.1、182.168.0.1、10.0.0.1等內網地址,會發現沒有對應輸出!
2.3.4 JSON編解碼
在上一章,已經講過在Codec中使用JSON編碼。但是,有些日志可能是一種復合的數據結構,其中只有一部分記錄是JSON格式的。這時候,我們依然需要在filter階段,單獨啟用JSON解碼插件。
配置示例如下:
filter { json { source =>“message” target =>“jsoncontent” } }
運行結果如下:
{“@version”: “1”,“@timestamp”: “2014-11-18T08:11:33.000Z”,“host”: “web121.mweibo.tc.sinanode.com”,“message”: “{\”uid\“:3081609001,\”type\“:\”signal\“}”,“jsoncontent”: {“uid”: 3081609001,“type”: “signal” } }
如果不打算使用多層結構的話,刪掉target配置即可。單層結構新的結果如下:
{“@version”: “1”,“@timestamp”: “2014-11-18T08:11:33.000Z”,“host”: “web121.mweibo.tc.sinanode.com”,“message”: “{\”uid\“:3081609001,\”type\“:\”signal\“}”,“uid”: 3081609001,“type”: “signal” }
2.3.5 key-value切分
在很多情況下,日志內容本身都是一個類似于key-value的格式,但是格式具體的樣式卻是多種多樣的。Logstash提供logstash-filter-kv插件,幫助處理不同樣式的key-value日志,變成實際的LogStash::Event數據。
配置示例如下:
filter { ruby { init =>“@kname = ['method','uri','verb']” code =>“event.append(Hash[@kname.zip(event['request'].split(‘ ’))])” } if [uri] { ruby { init =>“@kname = ['url_path','url_args']” code =>“event.append(Hash[@kname.zip(event['uri'].split(‘?’))])” } kv { pref?ix =>“url_” source =>“url_args” field_split =>“&” remove_field => [ “url_args”, “uri”, “request” ] } } }
Nginx訪問日志中的$request,通過這段配置,可以詳細切分成method、url_path、verb、url_a、url_b...
進一步,如果url_args中有過多字段,可能導致Elasticsearch集群因為頻繁update mapping或者消耗太多內存在cluster state上而宕機。所以,更優的選擇是只保留明確有用的url_args內容,其他部分舍去,如下所示:
kv { prefix =>“url_” source =>“url_args” field_split =>“&” include_keys => [ “uid”, “cip” ] remove_field => [ “url_args”, “uri”, “request” ] }
上例即表示,除了url_uid和url_cip兩個字段以外,其他的url_*都不保留。
2.3.6 metrics數值統計
logstash-filter-metrics插件是使用Ruby的Metriks模塊來實現在內存里實時地計數和采樣分析。該模塊支持兩個類型的數值分析:meter和timer。下面分別舉例說明。
1.Meter示例(速率閾值檢測)
Web訪問日志的異常狀態碼頻率是運維人員會非常關心的一個數據。通常我們的做法是通過Logstash或者其他日志分析腳本,把計數發送到rrdtool或者graphite里面,然后再通過check_graphite腳本之類的東西來檢查異常并報警。
事實上這個事情可以直接在Logstash內部就完成。比如如果最近一分鐘504請求的個數超過100個就報警,如下所示:
filter { metrics { meter =>“error.%{status}” add_tag =>“metric” ignore_older_than => 10 } if “metric” in [tags] { ruby { code =>“event.cancel if event['error.504.rate_1m'] * 60 < 100” } } } output { if “metric” in [tags] { exec { command =>“echo \”Out of threshold: %{error.504.rate_1m}\“” } } }
這里需要注意*60的含義。metrics模塊生成的rate_1m/5m/15m意思是:最近1、5、15分鐘的每秒速率!
2.Timer示例(box and whisker異常檢測)
官版的logstash-filter-metrics插件只適用于metric事件的檢查。由插件生成的新事件內部不存有來自input區段的實際數據信息。所以,要完成我們的百分比分布箱體檢測,需要首先對代碼稍微做幾行變動,即在metric的timer事件里加一個屬性,存儲最近一個實際事件的數值:https://github.com/chenryn/logstash/commit/bc7bf34caf551d8a149605cf28e7c5d33fae7458
然后我們就可以用如下配置來探測異常數據了:
filter { metrics { timer => {“rt” =>“%{request_time}”} percentiles => [25, 75] add_tag =>“percentile” } if “percentile” in [tags] { ruby { code =>“l=event['rt.p75']-event['rt.p25'];event['rt.low'] =event['rt.p25']-l;event['rt.high']=event['rt.p75']+l” } } } output { if “percentile” in [tags] and ([rt.last] > [rt.high] or [rt.last] < [rt.low]) { exec { command =>“echo \”Anomaly: %{rt.last}\“” } } }
提示
有關box and shisker plot內容和重要性,參見《數據之魅》一書。
2.3.7 mutate數據修改
logstash-filter-mutate插件是Logstash另一個重要插件,它提供了豐富的基礎類型數據處理能力,包括類型轉換、字符串處理和字段處理等。
1.類型轉換
類型轉換是logstash-filter-mutate插件最初誕生時的唯一功能。其應用場景在之前JSON編解碼小節已經提到。
可以設置的轉換類型包括:“integer”、“float”和“string”。示例如下:
filter { mutate { convert => [“request_time”, “float”] } }
注意
mutate除了轉換簡單的字符值,還支持對數組類型的字段進行轉換,即將[“1”,“2”]轉換成[1,2]。但不支持對哈希類型的字段做類似處理。有這方面需求的可以采用稍后講述的logstash-filter-ruby插件完成。
2.字符串處理
有如下字符串處理的插件:
□ gsub:僅對字符串類型字段有效。
gsub => [“urlparams”, “[\\?#]”, “_”]
□ split:分割字符串。
filter { mutate { split => [“message”, “|”] } }
隨意輸入一串以|分割的字符,比如“123|321|adfd|dfjld*=123”,可以看到如下輸出:
{“message” => [ [0] “123”, [1] “321”, [2] “adfd”, [3] “dfjld*=123” ],“@version” =>“1”,“@timestamp” =>“2014-08-20T15:58:23.120Z”,“host” =>“raochenlindeMacBook-Air.local” }
□ join:僅對數組類型字段有效。
我們在之前已經用split割切的基礎上再join回去。配置改成:
filter { mutate { split => [“message”, “|”] } mutate { join => [“message”, “,”] } }
filter區段之內,是順序執行的。所以我們最后看到的輸出結果是:
{“message” =>“123,321,adfd,dfjld*=123”,“@version” =>“1”,“@timestamp” =>“2014-08-20T16:01:33.972Z”,“host” =>“raochenlindeMacBook-Air.local” }
□ merge:合并兩個數組或者哈希字段。依然在之前split的基礎上繼續:
filter { mutate { split => [“message”, “|”] } mutate { merge => [“message”, “message”] } }
我們會看到輸出:
{“message” => [ [0] “123”, [1] “321”, [2] “adfd”, [3] “dfjld*=123”, [4] “123”, [5] “321”, [6] “adfd”, [7] “dfjld*=123” ],“@version” =>“1”,“@timestamp” =>“2014-08-20T16:05:53.711Z”,“host” =>“raochenlindeMacBook-Air.local” }
如果src字段是字符串,會自動先轉換成一個單元素的數組再合并。把上一示例中的來源字段改成“host”:
filter { mutate { split => [“message”, “|”] } mutate { merge => [“message”, “host”] } }
結果變成:
{“message” => [ [0] “123”, [1] “321”, [2] “adfd”, [3] “dfjld*=123”, [4] “raochenlindeMacBook-Air.local” ],“@version” =>“1”,“@timestamp” =>“2014-08-20T16:07:53.533Z”,“host” => [ [0] “raochenlindeMacBook-Air.local” ] }
看,目的字段“message”確實多了一個元素,但是來源字段“host”本身也由字符串類型變成數組類型了!
同樣,如果目的字段不是數組,也會被強制轉換。即使來源字段并不存在:
filter { mutate { merge => [“message”, “not_exist_field”] } }
結果會變成:
{“message” => [ [0] “123|321|adfd|dfjld*=123” ],“@version” =>“1”,“@timestamp” =>“2014-08-20T15:58:23.120Z”,“host” =>“raochenlindeMacBook-Air.local” }
□ strip:去除字段內容前后的空格。可以接受數組參數:
filter { mutate { strip => [“syslog_message”, “syslog_datetime”] } }
□ lowercase:將字段內容全部轉換成小寫字母。同樣可以接受數組。在ELK stack場景中,將內容轉換成小寫會是一個比較常見的需求。因為Elasticsearch默認是統一按照小寫字母來搜索的。為了確保檢索準確率,在不影響使用的情況下,建議對常用檢索字段啟用lowercase配置。
□ uppercase:將字段內容全部轉換成大寫字母。同樣可以接受數組。
3.字段處理
字段處理的插件有:
□ rename:重命名某個字段,如果目的字段已經存在,會被覆蓋掉,如下所示:
filter { mutate { rename => [“syslog_host”, “host”] } }
□ update:更新某個字段的內容。如果字段不存在,不會新建。
□ replace:作用和update類似,但是當字段不存在的時候,它會起到add_field參數一樣的效果,自動添加新的字段。
4.執行次序
需要注意的是,filter/mutate內部是有執行次序的。其次序如下:
rename(event) if @rename update(event) if @update replace(event) if @replace convert(event) if @convert gsub(event) if @gsub uppercase(event) if @uppercase lowercase(event) if @lowercase strip(event) if @strip remove(event) if @remove split(event) if @split join(event) if @join merge(event) if @merge filter_matched(event)
而filter_matched這個filters/base.rb里繼承的方法也是有次序的:
@add_field.each do |field, value| end @remove_field.each do |field| end @add_tag.each do |tag| end @remove_tag.each do |tag| end
2.3.8 隨心所欲的Ruby處理
如果你稍微懂那么一點點Ruby語法的話,logstash-filter-ruby插件將會是一個非常有用的工具。比如你需要稍微修改一下LogStash::Event對象,但是又不打算為此寫一個完整的插件,用logstash-filter-ruby插件絕對感覺良好。
配置示例如下:
filter { ruby { init =>“@kname = ['client','servername','url','status','time','size','upstream', 'upstreamstatus','upstreamtime','referer','xff','useragent']” code =>“event.append(Hash[@kname.zip(event['message'].split(‘|’))])” } }
官網示例是一個比較有趣但是沒啥大用的做法——隨機取消90%的事件。
所以上面我們給出了一個有用而且強大的實例。
通常我們都是用logstash-filter-grok插件來捕獲字段的,但是正則耗費大量的CPU資源,很容易成為Logstash進程的瓶頸。
而實際上,很多流經Logstash的數據都是有自己預定義的特殊分隔符的,我們可以很簡單的直接切割成多個字段。
logstash-filter-mutate插件里的“split”選項只能切成數組,后續很不方便使用和識別。而在logstash-filter-ruby里,我們可以通過“init”參數預定義好由每個新字段的名字組成的數組,然后在“code”參數指定的Ruby語句里通過兩個數組的zip操作生成一個哈希并添加進數組里。短短一行Ruby代碼,可以減少50%以上的CPU使用率。
logstash-filter-ruby插件用途遠不止這一點,下一節你還會繼續見到它的身影。
更多實例如下:
filter{ date { match => [“datetime” , “UNIX”] } ruby { code =>“event.cancel if 5 * 24 * 3600 < (event['@timestamp']-::Time.now)。abs” } }
在實際運用中,我們幾乎肯定會碰到出乎意料的輸入數據。這都有可能導致Elasticsearch集群出現問題。
當數據格式發生變化,比如UNIX時間格式變成UNIX_MS時間格式,會導致Logstash瘋狂創建新索引,集群崩潰。
或者誤輸入過老的數據時,因為一般我們會close幾天之前的索引以節省內存,必要時再打開。而直接嘗試把數據寫入被關閉的索引會導致內存問題。
這時候我們就需要提前校驗數據的合法性。上面配置,就是用于過濾掉時間范圍與當前時間差距太大的非法數據的。
2.3.9 split拆分事件
上一章我們通過multiline插件將多行數據合并進一個事件里,那么反過來,也可以把一行數據,拆分成多個事件。這就是split插件。
配置示例如下:
filter { split { field =>“message” terminator =>“#” } }
這個測試中,我們在intputs/stdin的終端中輸入一行數據:“test1#test2”,結果看到輸出兩個事件:
{“@version”: “1”,“@timestamp”: “2014-11-18T08:11:33.000Z”,“host”: “web121.mweibo.tc.sinanode.com”,“message”: “test1” } {“@version”: “1”,“@timestamp”: “2014-11-18T08:11:33.000Z”,“host”: “web121.mweibo.tc.sinanode.com”,“message”: “test2” }
注意
split插件中使用的是yield功能,其結果是split出來的新事件,會直接結束其在filter階段的歷程,也就是說寫在split后面的其他filter插件都不起作用,進入到output階段。所以,一定要保證split配置寫在全部filter配置的最后。
使用了類似功能的還有clone插件。從logstash-1.5.0beta1版本以后修復該問題。
2.3.10 elapsed
Splunk有一項非常有用的功能,叫做transaction。可以在錯亂的多行日志中,根據connected字段、maxspan窗口、startswith/endwith標簽等信息計算出事件的duration和count結果。其文檔見:http://docs.splunk.com/Documentation/Splunk/latest/SearchReference/Transaction
ELK中,承載計算功能的Elasticsearch并不支持這種跨行計算,所以,變通的處理方式是:在Logstash中,提前做好事件的歸并,直接計算出來transaction的duration數據。
比如一個transaction task_id startswith=START endwith=END的查詢,可以在Logstash中這樣計算:
filter { grok { match => [“message”, “%{TIMESTAMP_ISO8601} START id: (?<task_id>.*)”] add_tag => [ “taskStarted” ] } grok { match => [“message”, “%{TIMESTAMP_ISO8601} END id: (?<task_id>.*)”] add_tag => [ “taskTerminated”] } elapsed { start_tag =>“taskStarted” end_tag =>“taskTerminated” unique_id_field =>“task_id” } }