Sunday, January 8, 2012

Flume-Solr Integration

My previous post contain the installation step of Flume in psuedo and distributed mode. Few days before I did a small POC to integrate Flume with Solr. I have created a new sink. This sink is usually used with the regexAll decorators that perform light transformation of event data into attributes. This attributes are converted into solr document and commited in solr.

I have used flume-0.9.3 and apache-solr-3.1.0 for this POC.

RegexAllExtractor decorator prepare events that contain attributes ready to be written into an Solr. Implementing a RegexAllExtractor decorator is very simple.


package com.cloudera.flume.core.extractors;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.ArrayList;
import java.util.List;

import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
import com.cloudera.flume.conf.SinkFactory.SinkDecoBuilder;
import com.cloudera.flume.core.Attributes;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.cloudera.util.Pair;
import com.google.common.base.Preconditions;

/**
 * This takes a regex and any number of attribute names to assign to each
 * sub pattern in pattern order
 *
 * Example 1:
 *   regexAll("(\d+):(\d+):(\d+)", "s1, s2, s3")
 *
 *   "123:456:789" -> {s1:123, s2:456, s3:789}
 *
 * Example 2:
 *   regexAll("(\d+):(\d+):(\d+)", "s1, s2")
 *
 *   "123:456:789" -> {s1:123, s2:456}
 *
 * Example 3:
 *   regexAll("(\d+):(\d+):(\d+)", "s1, , s2")
 *
 *   "123:456:789" -> {s1:123, s2:789}
 */
public class RegexAllExtractor extends EventSinkDecorator<EventSink> {
  
  final Pattern pat;
  final List<String> names;

  /**
   * This will not thrown an exception
   */
  public RegexAllExtractor(EventSink snk, Pattern pat, List<String> names) {
    super(snk);
    this.pat = pat;
    this.names = names;
  }

  /**
   * Convenience constructor that may throw a PatternSyntaxException (runtime
   * exn).
   */
  public RegexAllExtractor(EventSink snk, String regex, List<String> names) {
    this(snk, Pattern.compile(regex), names);
  }

  @Override
  public void append(Event e) throws IOException, InterruptedException {
    String s = new String(e.getBody());
    Matcher m = pat.matcher(s);
    String val = "";
    Integer grpCnt = m.groupCount();

    if(m.find()){
      for(int grp = 1; grp <= grpCnt; grp++){
        val = "";
        try {
          val = m.group(grp);
        } catch (IndexOutOfBoundsException ioobe) {
          val = "";
        }

        //Try/Catch so that we don't require there be the same number of names as patterns.
        try {
          //Ignore blank names. These are most likely sub patterns we don't care about keeping.
          if (!"".equals(names.get(grp-1))) {
            Attributes.setString(e, names.get(grp-1), val);
          }
        } catch (IndexOutOfBoundsException ioobe) {
          break;
        }
      }
    }
    super.append(e);
  }

  public static SinkDecoBuilder builder() {
    return new SinkDecoBuilder() {
      @Override
      public EventSinkDecorator<EventSink> build(Context context, String... argv) {
        Preconditions.checkArgument(argv.length == 2, "usage: regexAll(\"regex\", \"col1,col2\")");
        String regex = argv[0];
        Pattern pat = Pattern.compile(regex);
        String name = argv[1];
        ArrayList<String> nameList = new ArrayList<String>();
        String[] names = name.split("\\,");
        for(int i=0; i<names.length; ++i){
         nameList.add(names[i]);
        }

        EventSinkDecorator<EventSink> snk = new RegexAllExtractor(null, pat, nameList);
        return snk;

      }
    };
  }
  
  /**
  * This is a special function used by the SourceFactory to pull in this class
  * as a plugin decorator.
  */
      public static List<Pair<String, SinkDecoBuilder>> getDecoratorBuilders() {
          List<Pair<String, SinkDecoBuilder>> builders =
                  new ArrayList<Pair<String, SinkDecoBuilder>>();
          builders.add(new Pair<String, SinkDecoBuilder>("regexAll",
                  builder()));
          return builders;
   }
}

Flume-Solr sink commit a single records into an Solr server based on a single Flume event. This sink have one input attribute: url. The attribute url is the url of the output Solr server. Yes, that means currently one sink can be configured to output into just one Solr server. Implementing a SolrEventSink sink is very simple.

package com.cloudera.flume.handlers.solr;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;

import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import com.cloudera.util.*;
import org.slf4j.LoggerFactory;

/**
* This class will convert attributes added into Flume event from RegexAllExtractor class into solr document and 
* store into solr.
*/
public class SolrEventSink extends EventSink.Base {

 static final Logger log = LoggerFactory.getLogger(SolrEventSink.class);
 private CommonsHttpSolrServer server;
 private String solrURL;

  public SolrEventSink(String url) {
   Preconditions.checkNotNull(url);
   this.solrURL = url;
   log.info("solr server url"+solrURL);
   try {
  this.server = new CommonsHttpSolrServer(solrURL);
 } catch (MalformedURLException e) {
  log.error("Invalid Solr URL");
 }
  }
  
  @Override
  public void append(Event e) throws IOException {
   
/* if(e.getAttrs().get((Object)"UnUsedField")==null)
 {
*/  // generate solr document.
  SolrInputDocument document = new SolrInputDocument();
  for (Entry<String, byte[]> a : e.getAttrs().entrySet()) {
   if(!a.getKey().equals("AckType") && !a.getKey().equals("AckTag") && !a.getKey().equals("AckChecksum") && !a.getKey().equals("tailSrcFile"))
    document.addField(a.getKey(), Bytes.toString(a.getValue()));  
  }
    try {
     // Add documnet into solr.
     if(document.size()!=0){
      server.add(document);
      // Commit the data into solr.
      server.commit();
     }
 } catch (SolrServerException e1) {
  log.error("SolrServerException : Exception communicating to the Solr Server instance");
 }catch (SolrException exception) {
  log.error("Solr Exception: "+ exception);
 }catch (Exception exception) {
  log.error("Exception : "+ exception);
 }
 //}
  }

  public static SinkBuilder builder() {
    return new SinkBuilder() {

      @Override
      public EventSink build(Context context, String... argv) {
        Preconditions.checkArgument(argv.length == 1,
            "usage: SolrEventSink(serverURL)");

        return new SolrEventSink(argv[0]);
      }

    };
  }
  
  /**
  * This is a special function used by the SourceFactory to pull in this class
  * as a plugin sink.
  */
    public static List<Pair<String, SinkBuilder>> getSinkBuilders() {
      List<Pair<String, SinkBuilder>> builders =
        new ArrayList<Pair<String, SinkBuilder>>();
      builders.add(new Pair<String, SinkBuilder>("solrEventSink", builder()));
      return builders;
    }
  
}

You need to add flume-plugin-solrEventSink and flume-plugin-regexAllExtractor jar into Flume lib dir. You can compile it from Flume sources and Solr jars, then you need to add following jars into Flume's lib dir.

Jar names :
    apache-solr-solrj-3.1.0.jar
    commons-codec-1.3.jar
    commons-fileupload-1.2.2.jar
    commons-httpclient-3.1.jar
    commons-io-1.4.jar
    commons-logging-1.0.4.jar
    geronimo-stax-api_1.0_spec-1.0.1.jar
    hbase-0.90.1-CDH3B4.jar
    lucene-analyzers-2.9.1.jar
    lucene-core-2.9.1.jar
    lucene-highlighter-2.9.1.jar
    lucene-memory-2.9.1.jar
    lucene-misc-2.9.1.jar
    lucene-queries-2.9.1.jar
    lucene-snowball-2.9.1.jar
    lucene-spellchecker-2.9.1.jar
    servlet-api-2.5.jar
    slf4j-api-1.6.1.jar
    slf4j-simple-1.6.1.jar
    solr-commons-csv-1.4.0.jar
    solr-core-1.4.0.jar
    stax-api-1.0.1.jar
    wstx-asl-3.2.7.jar

The last step is to add plugins to Flume configuration(flume-conf.xml) file .

<configuration>
<property>
<name>flume.plugin.classes</name>
 <value>com.cloudera.flume.handlers.solr.SolrEventSink,com.cloudera.flume.core.extractors.RegexAllExtractor</value>
</property>  
.
.
</configuration>

Run Solr server:
java -Dsolr.solr.home=path/to/solr_home/solr/  -Djetty.port=12000 -jar start.jar

Note: Solr schema.xml file must contain the entries of column names specified in configuration of regexAll decorator in field tag. Otherwise invalid column name error ocurred and document not saved in solr.

configuration of Flume agent:
source : tail("path/to/log/file")
sink :  agentSink("collector_machine_name",35853)

configuration of Flume collector:
source : collectorSource(35853)
sink : {regexAll("regex","column_names") => solrEventSink("solr_server_url")}


Example:-

Run Solr server :
    $ cd SOLR_HOME
    $ java -Dsolr.solr.home=path/to/SOLR_HOME/solr/  -Djetty.port=12000 -jar start.jar

Create a file test.log and add following lines:
abc,bcd,cde
abc,swd,qwe
bcd,wer,asd
ghj,rty,ghj

Run Flume master:

    $ cd FLUME_HOME
    $ bin/flume master
Run Flume agent:
    $ cd FLUME_HOME
    $ bin/flume node -n test_agent
Run Flume collector:
    $ cd FLUME_HOME
    $ bin/flume node -n test_collector

Configuration of Flume agent:
    source : tail("/path/to/test.log")
    sink :  agentSink("localhost",35853)

Configuration of Flume collector:
    source : collectorSource(35853)
    sink : {regexAll("([^,]*),([^,]*),([^,]*)","id,cat,name")=>solrEventSink("http://hadoop-namenode:12000/solr")}


Note : Solr schema.xml must contains the entries of id,cat,name in field tag. Otherwise invalid column name error ocurred and document not saved in Solr. 



1 comment:

Anonymous said...

Hello ankit,
How do we edit schema.xml for SOLR?

Thanks

Post a Comment