

A logstash delta filter

16 Nov 2014

I mentioned in a previous post (Logstash for metric ingestion - considering custom plugins) that we were going to start using logstash for metric extraction and preparation in some cases. I determined in short order that we'd need to create a few custom plugins for our own use. Here I'll show you my very first one, written more as a learning exercise for myself than as something we actually need.

I took my initial inspiration from the guidance on this page: Extending logstash. This was enough to give me a basic sense of what was required. The plugin environment follows a fairly standard paradigm where your code gets called at certain key points, like startup, teardown, receipt of events and so on. You just provide the code at those critical points, and off you go. I also reviewed the code for a number of the existing plugins, and even though 'I'm not a Ruby guy', this gave me enough insight to get going.
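To make the paradigm concrete, here's a minimal sketch of what a filter plugin skeleton looks like under the 1.x plugin API (the class name and config_name here are placeholders of my own, not a real plugin):

# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"

class LogStash::Filters::Example < LogStash::Filters::Base
  config_name "example"   # the name used in logstash config files
  milestone 1

  def register
    # called once at startup - do any one-time setup here
  end

  def filter(event)
    # called once for each event received
  end
end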

Rather than implementing a filter that works on an individual event, I knew I would have some upcoming needs for processing across events, so I wanted to focus my learning experiment on that. One thing that becomes apparent fairly quickly when using logstash is that events generally exist in splendid isolation: each one is independent of any other. While this model clearly works very well in many cases, sometimes you do need some ability to work across events in a lightweight fashion. There are already a few plugins which provide this type of behaviour, e.g. Multiline, and I studied them for inspiration.
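One caveat worth flagging before we go further: logstash can run filters across multiple worker threads (via the -w/--filterworkers flag), so a filter that keeps state across events may need to protect that state. The sketch below is my own hypothetical illustration of the usual Ruby pattern, not something the plugin in this post does:

def register
  @mutex = Mutex.new   # guards state shared between filter workers
  @last_value = nil
end

def filter(event)
  @mutex.synchronize do
    # read and update @last_value here so that concurrent
    # workers cannot interleave their updates
  end
end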

I gave myself the simple challenge of building a 'delta' filter - one which would compute the arithmetic difference between values in a designated field in successive events, and then place the output values in another designated field. This kind of operation is very common in my ITOA applications, where it's often referred to as 'unpegging' (e.g. turning a monotonically increasing counter such as 10, 12, 17 into the per-interval increments 2 and 5).

My attempt is shown below. I put it in a file called simpledelta.rb and placed that file in a suitable plugin directory (logstash expects custom filters under <pluginpath>/logstash/filters/).

# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "logstash/environment"

# Computes the difference between the value of a designated field in
# the current event and the same field in the previous event.
class LogStash::Filters::SimpleDelta < LogStash::Filters::Base

  config_name "simpledelta"
  milestone 1

  # The field to read values from, and the field to write deltas into
  config :input_field, :validate => :string
  config :output_field, :validate => :string

  public
  def register
  end # def register

  public
  def filter(event)

    # No delta can be computed until we have seen at least one event
    if !@lastEvent.nil?
      event[@output_field] =
        event[@input_field].to_f - @lastEvent[@input_field].to_f
    end

    # remember event for next time
    @lastEvent = event

  end # def filter

end # class LogStash::Filters::SimpleDelta

Taking a look at the key steps: first I define the name of the filter class, SimpleDelta, and the name by which it will be referred to in logstash config files, simpledelta.

class LogStash::Filters::SimpleDelta < LogStash::Filters::Base
  config_name "simpledelta"

Next comes the definition of the configuration parameters I'll be using. We'll specify input_field and output_field in the logstash configuration file.

  config :input_field, :validate => :string
  config :output_field, :validate => :string
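As an aside, the config DSL offers a few options beyond :validate. For instance (a hypothetical variation, not what my plugin uses), the parameters could be marked as required, or given a default:

  config :input_field,  :validate => :string, :required => true
  config :output_field, :validate => :string, :default => "delta"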

The magic happens in the filter method. This method is called at runtime by the logstash infrastructure and is passed an event object on each invocation. The event object is the key entity you work with.

  public
  def filter(event)

    # No delta can be computed until we have seen at least one event
    if !@lastEvent.nil?
      event[@output_field] =
        event[@input_field].to_f - @lastEvent[@input_field].to_f
    end

    # remember event for next time
    @lastEvent = event

  end # def filter

To create the delta function, I maintain a variable @lastEvent which, as the name implies, holds the last event we processed. Of course, the very first time we are called we will not have a value here, hence the check for nil. First time through, we simply hold on to the event and return without carrying out any computation. On subsequent passes, we extract the designated values from input_field on the current event and on @lastEvent, convert them to floats (via to_f), and compute the difference. This difference is placed in output_field via the assignment to event[@output_field].
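One subtlety here: in Ruby, to_f returns 0.0 for nil and for non-numeric strings, so an event that is missing the input field will silently yield a delta computed against zero. A stricter variant (my own sketch, not part of the plugin above) might skip such events instead:

    # only compute a delta when both events actually carry the field
    if !@lastEvent.nil? && event[@input_field] && @lastEvent[@input_field]
      event[@output_field] =
        event[@input_field].to_f - @lastEvent[@input_field].to_f
    end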

To demonstrate the operation, I created a configuration file, delta.conf:

input {
  stdin {}
}
filter {
  csv {
    columns => ["value"]
  }
  simpledelta {
    input_field  => "value"
    output_field => "delta"
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

I also created a little test file input.dat with a few numbers. Remember, the objective is to compute the delta between successive values.

5
2
3
1
6
3
7
3
1

To run it (replacing /path/to/my/plugin/dir with your plugin location):

cat input.dat | logstash -f delta.conf --pluginpath /path/to/my/plugin/dir

The output for the first three events looks like this:

{
       "message" => [
        [0] "5"
    ],
      "@version" => "1",
    "@timestamp" => "2014-11-16T13:06:08.197Z",
          "host" => "myhost.com",
         "value" => "5"
}
{
       "message" => [
        [0] "2"
    ],
      "@version" => "1",
    "@timestamp" => "2014-11-16T13:06:08.198Z",
          "host" => "myhost.com",
         "value" => "2",
         "delta" => -3.0
}
{
       "message" => [
        [0] "3"
    ],
      "@version" => "1",
    "@timestamp" => "2014-11-16T13:06:08.198Z",
          "host" => "myhost.com",
         "value" => "3",
         "delta" => 1.0
}

You can see that for the first event (5), we don't have a delta value yet, but for subsequent values the delta is computed and output. So, in hindsight, it was all quite simple once you know how. With this foundation, I'm ready to tackle a more complicated plugin, which I'll share in a subsequent post.
