

Extracting data from Wily Introscope using Logstash

14 Mar 2016

Extracting data, either on a once-off or an ongoing basis, is a sometimes tedious, unglamorous activity. It is, however, a necessary part of our end-to-end analytics processing: no data, no analytics. I've just finished some development work to support a few customer trials, extracting performance data from Wily's Introscope performance management system for use with our Predictive Analytics product. We'd started to see some increased demand for integration with Introscope, so I decided to make this latest client engagement an opportunity to create a re-usable integration. As we'd done with some other integrations, I used Logstash as the platform.

Introscope has a couple of data access mechanisms, but in this case the clients required data extraction via the WebServices interface. There are plenty of descriptions of Web Services technology out there on the net so I won't rehash them here. The basic interface behaviour and data structures are defined in a WSDL file available from Wily, and the approach is to use the information in this file to form SOAP queries, which are sent to Introscope, and to interpret the SOAP responses returned. When it comes to WSDL, the usual approach is to compile or otherwise process it into some form which can be used, often class and code 'stubs' around which you build your program.

In my case, since I was using Logstash (Ruby-based), I went looking for a suitable WSDL library and chose Savon. (Aside: despite being a somewhat reasonable French speaker, it didn't even occur to me for a day or so that Savon is 'soap', which of course implies SOAP - nice.)
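
Savon takes very little to get going - you point it at the WSDL and it exposes the operations defined there. A minimal sketch (the WSDL URL below is just a placeholder for wherever your Introscope instance serves it):

  require 'savon'

  # Point Savon at the Introscope WSDL (placeholder URL) and list the
  # operations it defines - get_metric_data is the one used by this plugin.
  client = Savon.client(wsdl: "http://your-introscope-host:8081/introscope-web-services/services/MetricsDataService?wsdl",
                        basic_auth: ["wilyuser", "secret"],
                        log: false)

  puts client.operations.inspect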

The official steps required to create a Logstash input plugin are well described here. With those in mind, the basic extraction situation is shown here.

The approach is to read the WSDL at startup, use the user-supplied data selection information to form SOAP requests, send those to Introscope, and, when the responses come back as SOAP messages, extract the relevant information from those messages and convert it into Logstash events which are then output. What you do after that in your overall Logstash solution depends on your needs. In my case, I used another custom output plugin (scacsv) to convert the data into a very specific file format for our needs.
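
To make that shape concrete, the overall pipeline configuration ends up looking roughly like the sketch below - the wilywebsvcs options are covered in the next section, and the scacsv settings (and the WSDL URL) are placeholders rather than anything definitive:

  input {
    wilywebsvcs {
      wsdl     => "http://your-introscope-host:8081/introscope-web-services/services/MetricsDataService?wsdl"
      username => "wilyuser"
      password => "secret"
      # dataSelectors, timing and state options described below
    }
  }

  output {
    scacsv {
      # options controlling the specific CSV layout we need downstream
    }
  }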

You can see the full implementation I wrote here: wilywebsvcs.rb. In this article I'll simply walk through some highlights.

The name I chose for the plugin was wilywebsvcs:

class LogStash::Inputs::WilyWebSvcs < LogStash::Inputs::Base
  config_name "wilywebsvcs"

Then comes a set of config parameters - these are all described in the main README.md:

  config :wsdl, :validate => :string, :required => true
  config :username, :validate => :string, :required => true
  config :password, :validate => :string, :required => true
  config :dataSelectors, :validate => :hash, :required => true
  config :starttime, :validate => :string, :default => ""
  config :endtime,   :validate => :string, :default => ""
  config :latency,    :validate => :number, :default => 0 # minutes
  config :aggregationinterval, :validate => :number, :default => 15 # minutes
  # The state preservation behaviour follows that of the jdbc input plugin
  # See https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
  # whether the previous run state should be preserved - true = delete previous state
  config :cleanrun, :validate => :boolean, :default => false
  # Whether to save state or not in lastrunmetadatapath
  config :recordlastrun, :validate => :boolean, :default => true
  # Where the last-run state is stored
  config :lastrunmetadatapath, :validate => :string, :default => "#{ENV['HOME']}/.logstashwilywebsvcslastrun"
  config :sleepinterval, :validate => :number, :default => 10 # seconds
  config :logSOAPResponse, :validate => :boolean, :default => false
Basically, there is a reference to the WSDL file, a dataSelectors hash which lets you define what data you want to extract, some timing information, and some controls around state persistence so the plugin remembers where it was up to, across restarts and failures.
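
To give a concrete feel for dataSelectors: each entry maps a group name to an array of selector strings, and each selector is of the form "agentRegex,metricRegex,dataFrequency" - you can see it being split apart on the commas in extractDataForTimestamp further down. A purely illustrative example (the group name and regexes here are made up):

  dataSelectors => {
    "frontends" => [ ".*FrontendAgent.*,.*Average Response Time.*,900",
                     ".*FrontendAgent.*,.*Responses Per Interval.*,900" ]
  }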

After some setup in the code to establish formatters to interpret the timestamps supplied in the config, we set up the actual Savon client:

   @client = Savon.client(wsdl: @wsdl, convert_request_keys_to: :none,
                          basic_auth: [@username, @password],
                          log: @logSOAPResponse, pretty_print_xml: true)

That's the basic setup, defined, as is typical, in the register method. The main loop is run from within the run(queue) method. The idea here is to establish a start and end time (sometimes the user won't even specify an end time, which means we run forever). We go around the main loop, each time determining a particular targetTime which is used in the SOAP request. Sometimes the user specifies a latency - a typically small lag which lets us run a little behind the wall clock. This is necessary because it often takes a short while for the data to arrive and settle down in the source performance management system.

  public 
  def run(queue)
    store = "" # placeholder
    timeIncrement = @aggregationinterval * 60000 # convert supplied minutes to milliseconds
    @endTime   = @df.parse("2100-01-01T00:00:00-0000") # long time in the future. Only used if user didn't specify end time so we can run 'forever'
    latencySec = @latency * 60
    # start from the specified startTime
    targetTime = @startTime
    begin
      @logger.debug("targetTime = " + @df.format(targetTime))
      if ( targetTime < (Time.now() - latencySec) ) 
        bufferedEvents = extractDataForTimestamp(targetTime, timeIncrement, @dataSelectors, @df,@wilyDF)
        # Sort if necessary
        bufferedEvents.sort! { |a,b| a['timeslice_start_time'] <=> b['timeslice_start_time'] }
        # output all events
        bufferedEvents.each do | e |
          queue << e
        end
        bufferedEvents.clear
        # move to next time interval
        targetTime.setTime(targetTime.getTime() + timeIncrement)
        updateStateFile(@df, targetTime)
      else
        # wait a bit before trying again
        sleep(@sleepinterval)
      end
    end until(targetTime.getTime() >= @endTime.getTime())
    #finished
  end
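
The updateStateFile call is what makes the restart behaviour work: after each completed interval we persist the new targetTime, and at startup the plugin can resume from that point rather than from the configured starttime. The real implementation is in the source; a rough sketch of the idea, assuming the recordlastrun and lastrunmetadatapath settings above, would be:

  def updateStateFile(df, targetTime)
    # Persist the most recently completed targetTime so that a restart can
    # resume from where we left off (same idea as the jdbc input plugin's
    # last-run metadata file).
    return unless @recordlastrun
    File.open(@lastrunmetadatapath, "w") { |f| f.write(df.format(targetTime)) }
  end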

This targetTime is filled into the actual SOAP request in the extractDataForTimestamp method. The start of that method is shown next. There is a nested loop, as the sets of extraction specifications (Introscope Agent, Metric and Timing specifications) are typically organized into groups. So we loop through each group of extraction specifications and, within that group, loop through each individual one. Another constraint of our typical engagement is that we have to limit the load we put on the source system, so there is little to be gained by parallelizing all of this - serially creating a query, executing it, and processing the results before moving on to the next one is just fine. Below you can see us making the SOAP call using @client.call with the various arguments; response is the data structure which contains the returned results for each invocation.

def extractDataForTimestamp(targetTimestamp, interval, dataSelectors, df, wilyDF )

    @logger.debug("dataSelectors = " + dataSelectors.to_s)

    bufferedEvents = []

    dataSelectors.each do | group, selectorArray |  
      selectorArray.each do | selectorOriginal |
        @logger.debug("selectorOriginal = " + selectorOriginal)
          
        splitSelector = selectorOriginal.split(",")  
         
        agentRegex    =  splitSelector[0]
        metricRegex   =  splitSelector[1]
        dataFrequency =  splitSelector[2]

        startTime = wilyDF.format(targetTimestamp)
        endTime   = wilyDF.format(java.util.Date.new(targetTimestamp.getTime() + interval))
        soapQuery = 
              "agentRegex:"   + agentRegex + " " +
              "metricRegex:"  + metricRegex + " " +
              "dataFrequency:"+ dataFrequency + " " +
              "startTime:"    + startTime + " " +
              "endTime:"      + endTime
        @logger.debug("SOAPQuery " + soapQuery)

        response = @client.call(:get_metric_data, :message => {
          :agentRegex  => agentRegex,
          :metricRegex => metricRegex, 
          :dataFrequency => dataFrequency,
          :startTime     => startTime, #"2015-12-08T00:00:00Z"
          :endTime       => endTime # "2015-12-08T01:00:00Z"
                               })

        @logger.debug("response = " + response.to_s)

Once we have a response, we need to unpack that data structure and extract the essential fields. The start of that process is shown below (noting that sometimes there is no data returned). The individual methods decodeRefs, decodeMetricData and decodeAgentData can be seen in the Git source code - basically they produce a set of data structures that are easily navigable when the time comes to create the new Logstash events.

        refs    = decodeRefs(response.body)
        unless refs.empty? then   # only enter this block if we have some data

          begin
            metrics = decodeMetricData(response.body)          
            agents  = decodeAgentData(response.body)
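
To give a feel for what those decode methods hand back (the actual parsing lives in the source), the shapes consumed by the event-building code in the next snippet are roughly as follows - the values here are made up:

  # refs: the metric-data references returned for this query
  refs    = [:ref_1, :ref_2]
  # metrics: per-ref timeslice information plus hrefs pointing at the data points
  metrics = { :ref_1 => { :timeslice_start_time => "2016-03-01T00:00:00Z",
                          :timeslice_end_time   => "2016-03-01T00:15:00Z",
                          :href                 => ["#id1", "#id2"] } }
  # agents: per-href agent name, metric name and metric value
  agents  = { "#id1" => { :agent_name   => "host|WebSphere|Agent1",
                          :metric_name  => "CPU:Utilization % (process)",
                          :metric_value => "42" } }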

Here's where the new Logstash events are created. The particular fields selected are all we really need for the analytic processing to come. All the newly created events are pushed into the bufferedEvents structure, and when all the extractions have been performed, that set of events is returned to the main loop.

          # Iterate over returned results and produce Logstash events
          refs.each do |r|
            begin
              unless metrics[r].nil? 
                metrics[r][:href].each do | href |
              
                  event = LogStash::Event.new
                  event['group'] = group 
                  event['refID'] = r.to_s
                  event['timeslice_start_time']=metrics[r][:timeslice_start_time]
                  event['timeslice_end_time']=metrics[r][:timeslice_end_time]
                  agentData = agents[href]
                  event['agent_name']  = agentData[:agent_name]
                  event['metric_name'] = agentData[:metric_name]
                  event['metric_value'] = agentData[:metric_value] 

                  bufferedEvents.push(event)
                end # metrics[r].each do
              end # unless

Back in the main loop, the events are sorted by timeslice start time, something we need to do for our use case. The volumes of data here are such that the basic Ruby sort is plenty fast. The sorted events are then pushed onto the normal Logstash queue for delivery to downstream plugins, and around the loop we go again for the next targetTime, after a short sleep.

        bufferedEvents = extractDataForTimestamp(targetTime, timeIncrement, @dataSelectors, @df, @wilyDF)

        # Sort if necessary
        bufferedEvents.sort! { |a,b| a['timeslice_start_time'] <=> b['timeslice_start_time'] }

        # output all events
        bufferedEvents.each do | e |
          queue << e
        end

        bufferedEvents.clear

This basic approach - going around a loop, incrementing forward through time, being paced by the wall clock, executing queries parametrized essentially by time, and processing the returned results into Logstash events - is a very common pattern. We've done something very analogous to extract data from AppDynamics. I hope this post helps you, or otherwise inspires you to create your own Logstash plugin.
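
If you strip away the Wily specifics, the skeleton of that pattern is just the loop below - fetch_and_emit and persist_state are stand-ins for whatever your source-specific extraction and state handling look like:

  target = start_time
  until target >= end_time
    if target < (Time.now - latency_seconds)
      fetch_and_emit(target, interval)   # query the source for this window, emit events
      target += interval                 # move on to the next time window
      persist_state(target)              # so a restart resumes from here
    else
      sleep(poll_seconds)                # pace ourselves against the wall clock
    end
  end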
