~# n0tr00t Security Team

Fuck data minding - logstash

18 Apr 2015 - 1ast10gin

[+] Author: 1ast10gin
[+] Team: n0tr00t security team 
[+] From: http://www.n0tr00t.com
[+] Create: 2015-04-18

写这篇文章的原因是想要给大家推荐一个比较好的数据分析系统。也把所谓的数据分析摘下神秘的面纱,让他更加平易近人。也说说我在使用中遇到的一点小问题,然后通过一些什么样的思考方式来解决的。

今天的主角是Logstash,在官方的介绍中,logstash是一个管理事件和日志的工具。你可以用这个来收集日志,然后来分析和存储他们以备以后来使用。

在官方网站上下载一个最新版本,运行软件需要Java的环境,我在测试和使用的环境是ubuntu。解压以后,可以直接运行./bin/logstash。然后你会发现什么都没有发生。

在正式的使用logstash之前,要先介绍几个概念:

  1. inputs
  2. outputs
  3. filters

inputs 顾名思义,就是输入,输入的方式有很多种,比如说直接在控制台中输入,这个就是标准的输入(stdin)。比如说,一些日志文件,这个就是文件输入(file),写这篇文章的时候,inputs支持的插件有41个。基本可以满足你的需要。如果你觉得这些插件没有满足你,还可以自己写插件(Ruby)。

outputs 输出。你从一个文件中得到了数据,那么你想放去哪里呢?是直接在控制台输出?还是存入到 ElasticSearch 里面,结合Kibana做分析。还是写成文件?

filters 过滤器。从原始的输入中要过滤掉一些自己不想要的东西,或者把它变成结构化的数据来存储,我觉得这个也是logstash的精髓所在,通过一层层的过滤机制,最终得到自己想要的东西。但是!这个filter不只是可以做“减法”,还可以做“加法”,比如说,官方提供的geoip的filter,可以把ip转换成有地理信息的数据。

除了上面的三个以外,还有一个codecs 因为这篇文章中没有涉及到和这个相关的东西,所以感兴趣的可以自行拓展。

让我们回到刚才运行的地方,在logstash的目录下输入:

./bin/logstash -e 'input { stdin { } } output { stdout {} }'

来解释一下上面的是什么东西,首先是 -e 命令,代表了后面的字符串作为设置来运行logstash。

上面两句话非常对称,也非常的好记。最简单的 logstash 配置就这样完成了,运行一下,还是什么都没有发生!

这个时候在命令行中输入 hello wrold ,就会出现这样的:

1ast10gin@ubuntu:~/pro/logstash$ ./bin/logstash -e "input{stdin{}} output{stdout{}}"
->hello world
2015-04-17T14:56:01.261+0000 ubuntu hello world

最前面的是时间戳,表示是什么时候产生的数据,第二个是产生这个消息的主机名称,第三个是消息的内容,一般情况下是不会使用 -e 命令来进行设置的,这样太麻烦了。这个时候就要用到了 -f 命令,指定一个配置文件。让我们看看一个稍微复杂一点的例子:

input{
    file{
        path=>"/tmp/mylog"
        start_position=>"beginning"
    }
}

filter{
    grok {
        match => {
            "message" => "%{COMBINEDAPACHELOG}"
        }
        remove_field => ['message','host','path','httpversion','response','bytes','@version','@timestamp','ident','auth','verb','referrer','agent']
    }
    kv{
        source => "[request]"
        field_split =>"&?="
        value_split =>":="
    }
    grep{
        match => {"request"=>"^/hey"}
        drop => true
        }
    geoip{
        source => 'clientip'
    }
    mutate{
        remove=>["[geoip][country_code3]","[geoip][country_name]","[geoip][continent_code]","[geoip][region_name]","[geoip][city_name]","[geoip][timezone]","[geoip][real_region_name]","[geoip][location]"]
    }
}

output{
    redis{
        host=>['127.0.0.1']
        data_type=>'list'
        key=>"logstash"
        }
    stdout{ codec => rubydebug }
    }

继续来解读调用这个文件的方法是:

./bin/logstash -f conf/log.conf

conf 文件夹和 log.conf 文件都是自己创建的。可以指定任意位置的文件,只要符合 conf 的格式就ok。

这个 conf 文件分为三个部分,input filter output。他们的用途上文也说过了,来看看里面的东西,

input 里面有个file{} ,说明这个 input 是从文件中读取的。path是文件的位置,start_position是开始读取文件的位置,这里是从头开始读取。可以根据实际情况来设置为end。

然后是filter,gork{}是一个通用型的解析日志的filter,”%{COMBINEDAPACHELOG}”是匹配 Apache 形式的日志格式。如果我不想要其中匹配到的一些字段,就用 remove_field 删除。kv{}是一个 key-value pair 的处理程序。比如”name:1ast10gin”。熟悉 linux 的肯定知道 grep 在 linux 中的作用,这里和linux中的用法是一样的,如果不是符合这个匹配条件的log,就drop掉。geoip{}就是是给IP地址做一个添加 geoinfo 的字段,里面有IP地址对应的地理信息。mutate{}是专门用来添加、删除或者修改字段的工具,这里是删除掉 geoip 中不想要的字段。

最后是 output{} 我想要把得到的数据放进 redis 中,方便我自己调用,只要填上相应的信息就可以运行了。在redis中,key就是logstash。为了可以之间在控制台中看到数据的流动,我这里加上了一个stdout{}。

上面就是大概的使用方法。为了测试上面的 log.conf 配置文件,你需要自己去弄一份Apache类型的log。

下面讲一下遇到的印象深刻的坑,这个例子告诉我编程是相通的,刚开始学编程的时候是深入各种语法,算法和设计模式,但是当掌握了以后,就可以融汇贯通,处理啥问题都是一样的思想。(这个想法可能是错误的,谁知道呢)

logstash的插件和运行的程序是用 ruby 写的,作为life is short , I use Python 的坚定拥护者(PHP是世界上最好的语言),没有相关的ruby的编程经验,但是在处理数据的时候发现一个问题:中文进行URL编码以后,如果这个经过URL编码的数据缺少了几位(是的你没有看错,实际运用中会有很多奇葩的问题)并且输入到 redis 里面的情况下,logstash就会直接退出报出下面的错误:

JSON::GeneratorError: source sequence is illegal/malformed utf-8
        to_json at json/ext/GeneratorMethods.java:71
        to_json at /opt/logstash-1.4.2/lib/logstash/event.rb:148
        receive at /opt/logstash-1.4.2/lib/logstash/outputs/redis.rb:158
         handle at /opt/logstash-1.4.2/lib/logstash/outputs/base.rb:86
     initialize at (eval):106
           call at org/jruby/RubyProc.java:271
         output at /opt/logstash-1.4.2/lib/logstash/pipeline.rb:266
   outputworker at /opt/logstash-1.4.2/lib/logstash/pipeline.rb:225
  start_outputs at /opt/logstash-1.4.2/lib/logstash/pipeline.rb:152

在github上面有人提交了这个issue,但是没有人修。我想了一下我的实际需求:包含这种奇葩情况的日志非常非常的少,1/10000000的出现的概率,甚至更低,对于数据挖掘的时候,这些情况对所有的训练集的影响非常的小,所以我觉得最好的解决方法不是尝试这把这些“损坏”的数据按照我的意愿来“修复”,而是直接抛弃。我不想在filter或者其他地方改,因为还有可能出现我不知道的错误的情况。这个时候就是修改源代码来得比较实际。通过看(蒙)上面的错误日志,定位到了event.rb这个文件的148行上。

在 logstash 中传递信息的时候是用了 json 的格式,如果 json 解析不成功就有可能会报错,修改后的代码:

public
def to_json(*args)
  begin
    return @data.to_json(*args)
  rescue
    @data = {}
    return @data.to_json()
  end
end  

在处理不成功的时候,把这个实例变量 @data 赋值为{},然后用 to_json() 解析返回。通过在 elasticsearch 和 kibana 中查看数据,发现没有异常。证明这个权衡之计算是成功了,下次记录下 ELK 整个体系作为 BI 系统,是如何进行数据分析的。