官术网_书友最值得收藏!

2.3 過濾器配置

有豐富的過濾器插件,是Logstash威力如此強大的重要因素。名為過濾器,其實提供的不單單是過濾的功能。下面我們就會重點介紹幾個插件,它們擴展了進入過濾器的原始數據,進行復雜的邏輯處理,甚至可以無中生有地添加新的Logstash事件到后續的流程中去!

2.3.1 date時間處理

之前章節已經提過,logstash-filter-date插件可以用來轉換你的日志記錄中的時間字符串,變成LogStash::Timestamp對象,然后轉存到@timestamp字段里。

警告

因為在稍后的logstash-output-elasticsearch中常用的%{+YYYY.MM.dd}這種寫法必須讀取@timestamp數據,所以一定不要直接刪掉這個字段保留自己的字段,而是應該用logstash-filter-date轉換后刪除自己的字段!

這在導入舊數據的時候固然非常有用,而在實時數據處理的時候同樣有效,因為一般情況下數據流程中我們都會有緩沖區,導致最終的實際處理時間跟事件產生時間略有偏差。

提示

強烈建議打開Nginx的access_log配置項的buffer參數,對極限響應性能有極大提升!

1.配置示例

logstash-filter-date插件支持五種時間格式:

□ ISO8601:類似“2011-04-19T03:44:01.103Z”這樣的格式。具體Z后面可以有“08:00”也可以沒有,“.103”這個也可以沒有。常用場景里來說,Nginx的log_format配置里就可以使用$time_iso8601變量來記錄請求時間成這種格式。

□ UNIX:UNIX時間戳格式,記錄的是從1970年起始至今的總秒數。Squid默認日志格式中就使用了這種格式。

□ UNIX_MS:這個時間戳則是從1970年起始至今的總毫秒數。據我所知,JavaScript里經常使用這個時間格式。

□ TAI64N:TAI64N格式比較少見,是這個樣子的:@4000000052f88ea32489532c。我目前只知道常見應用中,qmail會用這個格式。

□ Joda-Time庫:Logstash內部使用了Java的Joda時間庫來作時間處理。所以我們可以使用Joda庫所支持的時間格式來作具體定義。Joda時間格式定義見表2-1。

表2-1 Joda時間庫格式

下面我們寫一個Joda時間格式的配置作為示例:

filter {
    grok {
        match => [“message”, “%{HTTPDATE:logdate}”]
    }
    date {
        match => [“logdate”, “dd/MMM/yyyy:HH:mm:ss Z”]
    }
}

注意,時區偏移量只需要用一個字母Z即可。

2.時區問題的解釋

很多中國用戶經常提一個問題:為什么@timestamp比我們早了8個小時?怎么修改成北京時間?

其實,Elasticsearch內部,對時間類型字段,是統一采用UTC時間,存成long長整形數據的!對日志統一采用UTC時間存儲,是國際安全/運維界的一個通識——歐美公司的服務器普遍廣泛分布在多個時區里——不像中國,地域橫跨五個時區卻只用北京時間。

對于頁面查看,ELK的解決方案是在Kibana上,讀取瀏覽器的當前時區,然后在頁面上轉換時間內容的顯示。

所以,建議大家接受這種設定。否則,即便你用.getLocalTime修改,也還要面臨在Kibana過去修改,以及Elasticsearch原有的["now-1h"TO"now"]這種方便的搜索語句無法正常使用的尷尬。

以上,請讀者自行斟酌。

2.3.2 grok正則捕獲

grok是Logstash最重要的插件。你可以在grok里預定義好命名正則表達式,在稍后(grok參數或者其他正則表達式里)引用它。

1.正則表達式語法

運維工程師多多少少都會一點正則。你可以在grok里寫標準的正則,像下面這樣:

\s+(?<request_time>\d+(?:\.\d+)?)\s+

這個正則表達式寫法對于Perl或者Ruby程序員應該很熟悉了,Python程序員可能更習慣寫(?P<name>pattern),沒辦法,適應一下吧。

現在給我們的配置文件添加第一個過濾器區段配置。配置要添加在輸入和輸出區段之間(Logstash執行區段的時候并不依賴于次序,不過為了自己看得方便,還是按次序書寫吧):

input {stdin{}}
filter {
    grok {
        match => {
    “message” =>“\s+(?<request_time>\d+(?:\.\d+)?)\s+”
        }
    }
}
output {stdout{}}

運行Logstash進程然后輸入“begin 123.456 end”,你會看到類似下面這樣的輸出:

{“message” =>“begin 123.456 end”,“@version” =>“1”,“@timestamp” =>“2014-08-09T11:55:38.186Z”,“host” =>“raochenlindeMacBook-Air.local”,“request_time” =>“123.456”
}

漂亮!不過數據類型好像不太滿意……request_time應該是數值而不是字符串。

我們已經提過稍后會學習用LogStash::Filters::Mutate來轉換字段值類型,不過在grok里,其實有自己的魔法來實現這個功能!

2.grok表達式語法

grok支持把預定義的grok表達式寫入到文件中,官方提供的預定義grok表達式見:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns。

下面是從官方文件中摘抄的最簡單但是足夠說明用法的示例:

USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}

第一行,用普通的正則表達式來定義一個grok表達式;第二行,通過打印賦值格式,用前面定義好的grok表達式來定義另一個grok表達式。

grok表達式的打印復制格式的完整語法見下行示例。其中data_type目前只支持兩個值:int和float。

%{PATTERN_NAME:capture_name:data_type}

所以我們可以改進我們的配置成下面這樣:

filter {
    grok {
        match => {
    “message” =>“%{WORD} %{NUMBER:request_time:float} %{WORD}”
        }
    }
}

重新運行進程然后可以得到如下結果:

{“message” =>“begin 123.456 end”,“@version” =>“1”,“@timestamp” =>“2014-08-09T12:23:36.634Z”,“host” =>“raochenlindeMacBook-Air.local”,“request_time” => 123.456
}

這次request_time變成數值類型了。

3.最佳實踐

實際運用中,我們需要處理各種各樣的日志文件,如果你都是在配置文件里各自寫一行自己的表達式,就完全不可管理了。所以,我們建議是把所有的grok表達式統一寫入到一個地方。然后用filter/grok的patterns_dir選項來指明。

如果你把“message”里所有的信息都grok到不同的字段了,數據實質上就相當于是重復存儲了兩份。所以你可以用remove_field參數來刪除掉message字段,或者用overwrite參數來重寫默認的message字段,只保留最重要的部分。

重寫參數的示例如下:

filter {
    grok {
        patterns_dir =>“/path/to/your/own/patterns”
        match => {
    “message” =>“%{SYSLOGBASE} %{DATA:message}”
        }
        overwrite => [“message”]
    }
}

4.高級用法

□ 多行匹配 在和codec/multiline搭配使用的時候,需要注意一個問題,grok正則和普通正則一樣,默認是不支持匹配回車換行的。就像你需要=~//m一樣也需要單獨指定,具體寫法是在表達式開始位置加(?m)標記。如下所示:

match => {“message” =>“(?m)\s+(?<request_time>\d+(?:\.\d+)?)\s+”
}

□ 多項選擇 有時候我們會碰上一個日志有多種可能格式的情況。這時候要寫成單一正則就比較困難,或者全用|隔開又比較丑陋。這時候,Logstash的語法提供給我們一個有趣的解決方式。

文檔中,都說明logstash-filters-grok插件的match參數應該接受的是一個Hash值。但是因為早期的Logstash語法中Hash值也是用[]這種方式書寫的,所以其實現在傳遞Array值給match參數也完全沒問題。所以,我們這里其實可以傳遞多個正則來匹配同一個字段:

match => [“message”, “(?<request_time>\d+(?:\.\d+)?)”,“message”, “%{SYSLOGBASE} %{DATA:message}”,“message”, “(?m)%{WORD}”
]

Logstash會按照這個定義次序依次嘗試匹配,到匹配成功為止。雖說效果跟用|分割寫個大大的正則是一樣的,但是可閱讀性好了很多。

提示

我強烈建議每個人都要使用Grok Debugger(http://grokdebug.herokuapp.com/)來調試自己的grok表達式。

2.3.3 GeoIP地址查詢

GeoIP是最常見的免費IP地址歸類查詢庫,同時也有收費版可以采購。GeoIP庫可以根據IP地址提供對應的地域信息,包括國別、省市、經緯度等,對于可視化地圖和區域統計非常有用。

配置示例如下:

filter {
    geoip {
        source =>“message”
    }
}

運行結果如下:

{“message” =>“183.60.92.253”,“@version” =>“1”,“@timestamp” =>“2014-08-07T10:32:55.610Z”,“host” =>“raochenlindeMacBook-Air.local”,“geoip” => {“ip” =>“183.60.92.253”,“country_code2” =>“CN”,“country_code3” =>“CHN”,“country_name” =>“China”,“continent_code” =>“AS”,“region_name” =>“30”,“city_name” =>“Guangzhou”,“latitude” =>23.11670000000001,“longitude” =>113.25,“timezone” =>“Asia/Chongqing”,“real_region_name” =>“Guangdong”,“location” => [
            [0] 113.25,
            [1] 23.11670000000001
        ]
    }
}

GeoIP庫數據較多,如果你不需要這么多內容,可以通過fields選項指定自己所需要的。下例為全部可選內容:

filter {
    geoip {
        f?ields => [“city_name”, “continent_code”, “country_code2”,
        “country_code3”, “country_name”, “dma_code”, “ip”, “latitude”,
        “longitude”, “postal_code”, “region_name”, “timezone”]
    }
}

需要注意的是:geoip.location是Logstash通過latitude和longitude額外生成的數據。所以,如果你是想要經緯度又不想重復數據的話,應該像下面這樣做:

filter {
    geoip {
        fields => [“city_name”, “country_code2”, “country_name”, “latitude”,
        “longitude”, “region_name”]
        remove_field => [“[geoip][latitude]”, “[geoip][longitude]”]
    }
}

還要注意:geoip插件的“source”字段可以是任一處理后的字段,比如“client_ip”,但是字段內容卻需要小心!GeoIp庫內只存有公共網絡上的IP信息,查詢不到結果的,會直接返回null,而Logstash的GeoIp插件對null結果的處理是:“不生成對應的geoip.字段”。所以讀者在測試時,如果使用了諸如127.0.0.1、172.16.0.1、182.168.0.1、10.0.0.1等內網地址,會發現沒有對應輸出!

2.3.4 JSON編解碼

在上一章,已經講過在Codec中使用JSON編碼。但是,有些日志可能是一種復合的數據結構,其中只有一部分記錄是JSON格式的。這時候,我們依然需要在filter階段,單獨啟用JSON解碼插件。

配置示例如下:

filter {
    json {
        source =>“message”
        target =>“jsoncontent”
    }
}

運行結果如下:

{“@version”: “1”,“@timestamp”: “2014-11-18T08:11:33.000Z”,“host”: “web121.mweibo.tc.sinanode.com”,“message”: “{\”uid\“:3081609001,\”type\“:\”signal\“}”,“jsoncontent”: {“uid”: 3081609001,“type”: “signal”
}
}

如果不打算使用多層結構的話,刪掉target配置即可。單層結構新的結果如下:

{“@version”: “1”,“@timestamp”: “2014-11-18T08:11:33.000Z”,“host”: “web121.mweibo.tc.sinanode.com”,“message”: “{\”uid\“:3081609001,\”type\“:\”signal\“}”,“uid”: 3081609001,“type”: “signal”
}

2.3.5 key-value切分

在很多情況下,日志內容本身都是一個類似于key-value的格式,但是格式具體的樣式卻是多種多樣的。Logstash提供logstash-filter-kv插件,幫助處理不同樣式的key-value日志,變成實際的LogStash::Event數據。

配置示例如下:

filter {
    ruby {
        init =>“@kname = ['method','uri','verb']”
        code =>“event.append(Hash[@kname.zip(event['request'].split(‘ ’))])”
    }
    if [uri] {
        ruby {
            init =>“@kname = ['url_path','url_args']”
            code =>“event.append(Hash[@kname.zip(event['uri'].split(‘?’))])”
        }
        kv {
            pref?ix =>“url_”
            source =>“url_args”
            field_split =>“&”
            remove_field => [ “url_args”, “uri”, “request” ]
        }
    }
}

Nginx訪問日志中的$request,通過這段配置,可以詳細切分成method、url_path、verb、url_a、url_b...

進一步,如果url_args中有過多字段,可能導致Elasticsearch集群因為頻繁update mapping或者消耗太多內存在cluster state上而宕機。所以,更優的選擇是只保留明確有用的url_args內容,其他部分舍去,如下所示:

kv {
    prefix =>“url_”
    source =>“url_args”
    field_split =>“&”
    include_keys => [ “uid”, “cip” ]
    remove_field => [ “url_args”, “uri”, “request” ]
}

上例即表示,除了url_uid和url_cip兩個字段以外,其他的url_*都不保留。

2.3.6 metrics數值統計

logstash-filter-metrics插件是使用Ruby的Metriks模塊來實現在內存里實時地計數和采樣分析。該模塊支持兩個類型的數值分析:meter和timer。下面分別舉例說明。

1.Meter示例(速率閾值檢測)

Web訪問日志的異常狀態碼頻率是運維人員會非常關心的一個數據。通常我們的做法是通過Logstash或者其他日志分析腳本,把計數發送到rrdtool或者graphite里面,然后再通過check_graphite腳本之類的東西來檢查異常并報警。

事實上這個事情可以直接在Logstash內部就完成。比如如果最近一分鐘504請求的個數超過100個就報警,如下所示:

filter {
    metrics {
        meter =>“error.%{status}”
        add_tag =>“metric”
        ignore_older_than => 10
    }
    if “metric” in [tags] {
        ruby {
            code =>“event.cancel if event['error.504.rate_1m'] * 60 < 100”
        }
    }
}
output {
    if “metric” in [tags] {
        exec {
            command =>“echo \”Out of threshold: %{error.504.rate_1m}\“”
        }
    }
}

這里需要注意*60的含義。metrics模塊生成的rate_1m/5m/15m意思是:最近1、5、15分鐘的每秒速率!

2.Timer示例(box and whisker異常檢測)

官版的logstash-filter-metrics插件只適用于metric事件的檢查。由插件生成的新事件內部不存有來自input區段的實際數據信息。所以,要完成我們的百分比分布箱體檢測,需要首先對代碼稍微做幾行變動,即在metric的timer事件里加一個屬性,存儲最近一個實際事件的數值:https://github.com/chenryn/logstash/commit/bc7bf34caf551d8a149605cf28e7c5d33fae7458

然后我們就可以用如下配置來探測異常數據了:

filter {
    metrics {
        timer => {“rt” =>“%{request_time}”}
        percentiles => [25, 75]
        add_tag =>“percentile”
    }
    if “percentile” in [tags] {
        ruby {
            code =>“l=event['rt.p75']-event['rt.p25'];event['rt.low']
                =event['rt.p25']-l;event['rt.high']=event['rt.p75']+l”
        }
    }
}
output {
    if “percentile” in [tags] and ([rt.last] > [rt.high] or [rt.last] < [rt.low]) {
        exec {
            command =>“echo \”Anomaly: %{rt.last}\“”
        }
    }
}

提示

有關box and shisker plot內容和重要性,參見《數據之魅》一書。

2.3.7 mutate數據修改

logstash-filter-mutate插件是Logstash另一個重要插件,它提供了豐富的基礎類型數據處理能力,包括類型轉換、字符串處理和字段處理等。

1.類型轉換

類型轉換是logstash-filter-mutate插件最初誕生時的唯一功能。其應用場景在之前JSON編解碼小節已經提到。

可以設置的轉換類型包括:“integer”、“float”和“string”。示例如下:

filter {
    mutate {
        convert => [“request_time”, “float”]
    }
}

注意

mutate除了轉換簡單的字符值,還支持對數組類型的字段進行轉換,即將[“1”,“2”]轉換成[1,2]。但不支持對哈希類型的字段做類似處理。有這方面需求的可以采用稍后講述的logstash-filter-ruby插件完成。

2.字符串處理

有如下字符串處理的插件:

□ gsub:僅對字符串類型字段有效。

gsub => [“urlparams”, “[\\?#]”, “_”]

□ split:分割字符串。

filter {
    mutate {
        split => [“message”, “|”]
    }
}

隨意輸入一串以|分割的字符,比如“123|321|adfd|dfjld*=123”,可以看到如下輸出:

{“message” => [
        [0] “123”,
        [1] “321”,
        [2] “adfd”,
        [3] “dfjld*=123”
    ],“@version” =>“1”,“@timestamp” =>“2014-08-20T15:58:23.120Z”,“host” =>“raochenlindeMacBook-Air.local”
}

□ join:僅對數組類型字段有效。

我們在之前已經用split割切的基礎上再join回去。配置改成:

filter {
    mutate {
        split => [“message”, “|”]
    }
    mutate {
        join => [“message”, “,”]
    }
}

filter區段之內,是順序執行的。所以我們最后看到的輸出結果是:

{“message” =>“123,321,adfd,dfjld*=123”,“@version” =>“1”,“@timestamp” =>“2014-08-20T16:01:33.972Z”,“host” =>“raochenlindeMacBook-Air.local”
}

□ merge:合并兩個數組或者哈希字段。依然在之前split的基礎上繼續:

filter {
    mutate {
        split => [“message”, “|”]
    }
    mutate {
        merge => [“message”, “message”]
    }
}

我們會看到輸出:

{“message” => [
        [0] “123”,
        [1] “321”,
        [2] “adfd”,
        [3] “dfjld*=123”,
        [4] “123”,
        [5] “321”,
        [6] “adfd”,
        [7] “dfjld*=123”
    ],“@version” =>“1”,“@timestamp” =>“2014-08-20T16:05:53.711Z”,“host” =>“raochenlindeMacBook-Air.local”
}

如果src字段是字符串,會自動先轉換成一個單元素的數組再合并。把上一示例中的來源字段改成“host”:

filter {
    mutate {
        split => [“message”, “|”]
    }
    mutate {
        merge => [“message”, “host”]
    }
}

結果變成:

{“message” => [
        [0] “123”,
        [1] “321”,
        [2] “adfd”,
        [3] “dfjld*=123”,
        [4] “raochenlindeMacBook-Air.local”
    ],“@version” =>“1”,“@timestamp” =>“2014-08-20T16:07:53.533Z”,“host” => [
        [0] “raochenlindeMacBook-Air.local”
    ]
}

看,目的字段“message”確實多了一個元素,但是來源字段“host”本身也由字符串類型變成數組類型了!

同樣,如果目的字段不是數組,也會被強制轉換。即使來源字段并不存在:

filter {
    mutate {
        merge => [“message”, “not_exist_field”]
    }
}

結果會變成:

{“message” => [
        [0] “123|321|adfd|dfjld*=123”
    ],“@version” =>“1”,“@timestamp” =>“2014-08-20T15:58:23.120Z”,“host” =>“raochenlindeMacBook-Air.local”
}

□ strip:去除字段內容前后的空格。可以接受數組參數:

filter {
    mutate {
        strip => [“syslog_message”, “syslog_datetime”]
    }
}

□ lowercase:將字段內容全部轉換成小寫字母。同樣可以接受數組。在ELK stack場景中,將內容轉換成小寫會是一個比較常見的需求。因為Elasticsearch默認是統一按照小寫字母來搜索的。為了確保檢索準確率,在不影響使用的情況下,建議對常用檢索字段啟用lowercase配置。

□ uppercase:將字段內容全部轉換成大寫字母。同樣可以接受數組。

3.字段處理

字段處理的插件有:

□ rename:重命名某個字段,如果目的字段已經存在,會被覆蓋掉,如下所示:

filter {
    mutate {
        rename => [“syslog_host”, “host”]
    }
}

□ update:更新某個字段的內容。如果字段不存在,不會新建。

□ replace:作用和update類似,但是當字段不存在的時候,它會起到add_field參數一樣的效果,自動添加新的字段。

4.執行次序

需要注意的是,filter/mutate內部是有執行次序的。其次序如下:

rename(event) if @rename
update(event) if @update
replace(event) if @replace
convert(event) if @convert
gsub(event) if @gsub
uppercase(event) if @uppercase
lowercase(event) if @lowercase
strip(event) if @strip
remove(event) if @remove
split(event) if @split
join(event) if @join
merge(event) if @merge
filter_matched(event)

而filter_matched這個filters/base.rb里繼承的方法也是有次序的:

@add_field.each do |field, value|
end
@remove_field.each do |field|
end
@add_tag.each do |tag|
end
@remove_tag.each do |tag|
end

2.3.8 隨心所欲的Ruby處理

如果你稍微懂那么一點點Ruby語法的話,logstash-filter-ruby插件將會是一個非常有用的工具。比如你需要稍微修改一下LogStash::Event對象,但是又不打算為此寫一個完整的插件,用logstash-filter-ruby插件絕對感覺良好。

配置示例如下:

filter {
    ruby {
        init =>“@kname = ['client','servername','url','status','time','size','upstream',
            'upstreamstatus','upstreamtime','referer','xff','useragent']”
        code =>“event.append(Hash[@kname.zip(event['message'].split(‘|’))])”
    }
}

官網示例是一個比較有趣但是沒啥大用的做法——隨機取消90%的事件。

所以上面我們給出了一個有用而且強大的實例。

通常我們都是用logstash-filter-grok插件來捕獲字段的,但是正則耗費大量的CPU資源,很容易成為Logstash進程的瓶頸。

而實際上,很多流經Logstash的數據都是有自己預定義的特殊分隔符的,我們可以很簡單的直接切割成多個字段。

logstash-filter-mutate插件里的“split”選項只能切成數組,后續很不方便使用和識別。而在logstash-filter-ruby里,我們可以通過“init”參數預定義好由每個新字段的名字組成的數組,然后在“code”參數指定的Ruby語句里通過兩個數組的zip操作生成一個哈希并添加進數組里。短短一行Ruby代碼,可以減少50%以上的CPU使用率。

logstash-filter-ruby插件用途遠不止這一點,下一節你還會繼續見到它的身影。

更多實例如下:

filter{
    date {
        match => [“datetime” , “UNIX”]
    }
    ruby {
        code =>“event.cancel if 5 * 24 * 3600 < (event['@timestamp']-::Time.now)。abs”
    }
}

在實際運用中,我們幾乎肯定會碰到出乎意料的輸入數據。這都有可能導致Elasticsearch集群出現問題。

當數據格式發生變化,比如UNIX時間格式變成UNIX_MS時間格式,會導致Logstash瘋狂創建新索引,集群崩潰。

或者誤輸入過老的數據時,因為一般我們會close幾天之前的索引以節省內存,必要時再打開。而直接嘗試把數據寫入被關閉的索引會導致內存問題。

這時候我們就需要提前校驗數據的合法性。上面配置,就是用于過濾掉時間范圍與當前時間差距太大的非法數據的。

2.3.9 split拆分事件

上一章我們通過multiline插件將多行數據合并進一個事件里,那么反過來,也可以把一行數據,拆分成多個事件。這就是split插件。

配置示例如下:

filter {
    split {
        field =>“message”
        terminator =>“#”
    }
}

這個測試中,我們在intputs/stdin的終端中輸入一行數據:“test1#test2”,結果看到輸出兩個事件:

{“@version”: “1”,“@timestamp”: “2014-11-18T08:11:33.000Z”,“host”: “web121.mweibo.tc.sinanode.com”,“message”: “test1”
}
{“@version”: “1”,“@timestamp”: “2014-11-18T08:11:33.000Z”,“host”: “web121.mweibo.tc.sinanode.com”,“message”: “test2”
}

注意

split插件中使用的是yield功能,其結果是split出來的新事件,會直接結束其在filter階段的歷程,也就是說寫在split后面的其他filter插件都不起作用,進入到output階段。所以,一定要保證split配置寫在全部filter配置的最后。

使用了類似功能的還有clone插件。從logstash-1.5.0beta1版本以后修復該問題。

2.3.10 elapsed

Splunk有一項非常有用的功能,叫做transaction。可以在錯亂的多行日志中,根據connected字段、maxspan窗口、startswith/endwith標簽等信息計算出事件的duration和count結果。其文檔見:http://docs.splunk.com/Documentation/Splunk/latest/SearchReference/Transaction

ELK中,承載計算功能的Elasticsearch并不支持這種跨行計算,所以,變通的處理方式是:在Logstash中,提前做好事件的歸并,直接計算出來transaction的duration數據。

比如一個transaction task_id startswith=START endwith=END的查詢,可以在Logstash中這樣計算:

filter {
  grok {
    match => [“message”, “%{TIMESTAMP_ISO8601} START id: (?<task_id>.*)”]
    add_tag => [ “taskStarted” ]
  }
  grok {
    match => [“message”, “%{TIMESTAMP_ISO8601} END id: (?<task_id>.*)”]
    add_tag => [ “taskTerminated”]
  }
  elapsed {
    start_tag =>“taskStarted”
    end_tag =>“taskTerminated”
    unique_id_field =>“task_id”
  }
}
主站蜘蛛池模板: 桐庐县| 双柏县| 美姑县| 美姑县| 大连市| 叙永县| 清水河县| 佛冈县| 平阴县| 济源市| 吴忠市| 海盐县| 兴国县| 专栏| 九寨沟县| 余江县| 高要市| 福贡县| 新乐市| 柳河县| 鸡西市| 宁都县| 通化县| 怀宁县| 荔波县| 平谷区| 都匀市| 广昌县| 阳原县| 吉水县| 汾阳市| 和硕县| 即墨市| 麦盖提县| 府谷县| 佛山市| 友谊县| 忻城县| 东乌| 祁门县| 泸溪县|