Tuesday, January 10, 2012

Sqoop export and import commands

Sqoop Import Examples:
Sqoop Import: imports data from a relational database management system (RDBMS) such as MySQL or Oracle into the Hadoop Distributed File System (HDFS) and its subprojects (Hive, HBase).


Import the data (MySQL table) to HBase:

Case 1: The table has a primary key and we import all columns of the MySQL table into the HBase table.

$ bin/sqoop import --connect jdbc:mysql://localhost/db1 --username root --password root --table tableName --hbase-table hbase_tableName  --column-family hbase_table_col1 --hbase-create-table

Case 2: The table has a primary key and we import only a few columns of the MySQL table into the HBase table.

$ bin/sqoop import --connect jdbc:mysql://localhost/db1 --username root --password root --table tableName --hbase-table hbase_tableName --columns column1,column2 --column-family hbase_table_col1 --hbase-create-table

Note: The column list specified in the --columns attribute must include the primary-key column.

Case 3: The table doesn't have a primary key, so choose one column as the hbase-row-key. Import all columns of the MySQL table into the HBase table.

$ bin/sqoop import --connect jdbc:mysql://localhost/db1 --username root --password root --table tableName --hbase-table hbase_tableName --column-family hbase_table_col1 --hbase-row-key column1 --hbase-create-table

Case 4: The table doesn't have a primary key, so choose one column as the hbase-row-key. Import only a few columns of the MySQL table into the HBase table.

$ bin/sqoop import --connect jdbc:mysql://localhost/db1 --username root --password root --table tableName --hbase-table hbase_tableName --columns column1,column2 --column-family hbase_table_col --hbase-row-key column1 --hbase-create-table 

Note: The column specified in the --hbase-row-key attribute must be in the --columns list. Otherwise the command will execute successfully but no records will be inserted into HBase.


Note: The value of the primary-key column, or of the column specified in the --hbase-row-key attribute, becomes the HBase row key. If the MySQL table doesn't have a primary key, or the column specified in --hbase-row-key doesn't have unique values, some records will be lost.

Example: Consider a MySQL table test_table with two columns, name and address. The table test_table has no primary key or unique-key column.

Records of test_table:
----------------
name     address
----------------
abc      123
sqw      345
abc      125
sdf      1234
aql      23dw


Run the following command to import test_table data into HBase:

$ bin/sqoop import --connect jdbc:mysql://localhost/db1 --username root --password root --table test_table --hbase-table hbase_test_table --column-family test_table_col1 --hbase-row-key name --hbase-create-table

Only 4 records are visible in the HBase table instead of 5. In the example above, two rows have the same value 'abc' in the name column, and the value of this column is used as the HBase row key. When the first record with name 'abc' arrives, it is inserted into the HBase table. When another record with the same name 'abc' arrives later, it overwrites the previously inserted record, because both map to the same row key.

The same problem also occurs if the table has a composite primary key, because only one column from the composite key is used as the HBase row key.
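To make the overwrite concrete, here is a sketch of the rows that survive in hbase_test_table (assuming the 'abc / 125' record arrived last; the exact cell layout in HBase will differ):

    row key    test_table_col1:address
    abc        125     (the later 'abc' record overwrote the earlier 123)
    aql        23dw
    sdf        1234
    sqw        345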

Import the data (MySQL table) to Hive

Case 1: Import a MySQL table into Hive when the table has a primary key.

$ bin/sqoop-import --connect jdbc:mysql://localhost:3306/db1 -username root -password password --table tableName --hive-table tableName --create-hive-table --hive-import --hive-home path/to/hive_home

Case 2: Import a MySQL table into Hive when the table doesn't have a primary key. Without a primary key, Sqoop cannot decide how to split the rows across mappers, so either name a split column with --split-by or run a single mapper with -m 1.

$ bin/sqoop-import  --connect jdbc:mysql://localhost:3306/db1 -username root -password password --table tableName  --hive-table tableName --create-hive-table --hive-import --hive-home path/to/hive_home --split-by column_name

or

$ bin/sqoop-import  --connect jdbc:mysql://localhost:3306/db1 -username root -password password --table tableName  --hive-table tableName --create-hive-table --hive-import --hive-home path/to/hive_home -m 1



Import the data (MySQL table) to HDFS


Case 1: Import a MySQL table into HDFS when the table has a primary key.

$ bin/sqoop import -connect jdbc:mysql://localhost:3306/db1 -username root -password password --table tableName --target-dir /user/ankit/tableName

Case 2: Import a MySQL table into HDFS when the table doesn't have a primary key. As with Hive, use -m 1 (or --split-by) because Sqoop has no primary-key column to split on.

$ bin/sqoop import -connect jdbc:mysql://localhost:3306/db1 -username root -password password --table tableName --target-dir /user/ankit/tableName  -m 1



Sqoop Export Examples:

Sqoop Export: exports data from HDFS and its subprojects (Hive, HBase) back into an RDBMS.

Export Hive table back to an RDBMS:

By default, Hive stores data using ^A (Ctrl-A) as the field delimiter and \n as the row delimiter.

$ bin/sqoop export --connect jdbc:mysql://localhost/test_db --table tableName  --export-dir /user/hive/warehouse/tableName --username root --password password -m 1 --input-fields-terminated-by '\001'

where '\001' is the octal representation of ^A.
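If you are not sure which delimiter a Hive table actually uses, one way to check (the warehouse path is taken from the command above) is to inspect the raw files:

$ bin/hadoop fs -cat /user/hive/warehouse/tableName/* | head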


Sunday, January 8, 2012

Flume-Solr Integration

My previous post contains the installation steps for Flume in pseudo and distributed mode. A few days ago I did a small POC to integrate Flume with Solr. I created a new sink, which is typically used with the regexAll decorator that performs a light transformation of event data into attributes. These attributes are converted into a Solr document and committed to Solr.

I have used flume-0.9.3 and apache-solr-3.1.0 for this POC.

The RegexAllExtractor decorator prepares events with attributes ready to be written into Solr. Implementing the RegexAllExtractor decorator is very simple.


package com.cloudera.flume.core.extractors;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.ArrayList;
import java.util.List;

import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
import com.cloudera.flume.conf.SinkFactory.SinkDecoBuilder;
import com.cloudera.flume.core.Attributes;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.cloudera.util.Pair;
import com.google.common.base.Preconditions;

/**
 * This takes a regex and any number of attribute names to assign to each
 * sub pattern in pattern order
 *
 * Example 1:
 *   regexAll("(\d+):(\d+):(\d+)", "s1, s2, s3")
 *
 *   "123:456:789" -> {s1:123, s2:456, s3:789}
 *
 * Example 2:
 *   regexAll("(\d+):(\d+):(\d+)", "s1, s2")
 *
 *   "123:456:789" -> {s1:123, s2:456}
 *
 * Example 3:
 *   regexAll("(\d+):(\d+):(\d+)", "s1, , s2")
 *
 *   "123:456:789" -> {s1:123, s2:789}
 */
public class RegexAllExtractor extends EventSinkDecorator<EventSink> {
  
  final Pattern pat;
  final List<String> names;

  /**
   * This will not throw an exception.
   */
  public RegexAllExtractor(EventSink snk, Pattern pat, List<String> names) {
    super(snk);
    this.pat = pat;
    this.names = names;
  }

  /**
   * Convenience constructor that may throw a PatternSyntaxException (runtime
   * exn).
   */
  public RegexAllExtractor(EventSink snk, String regex, List<String> names) {
    this(snk, Pattern.compile(regex), names);
  }

  @Override
  public void append(Event e) throws IOException, InterruptedException {
    String s = new String(e.getBody());
    Matcher m = pat.matcher(s);
    String val = "";
    Integer grpCnt = m.groupCount();

    if(m.find()){
      for(int grp = 1; grp <= grpCnt; grp++){
        val = "";
        try {
          val = m.group(grp);
        } catch (IndexOutOfBoundsException ioobe) {
          val = "";
        }

        //Try/Catch so that we don't require there be the same number of names as patterns.
        try {
          //Ignore blank names. These are most likely sub patterns we don't care about keeping.
          if (!"".equals(names.get(grp-1))) {
            Attributes.setString(e, names.get(grp-1), val);
          }
        } catch (IndexOutOfBoundsException ioobe) {
          break;
        }
      }
    }
    super.append(e);
  }

  public static SinkDecoBuilder builder() {
    return new SinkDecoBuilder() {
      @Override
      public EventSinkDecorator<EventSink> build(Context context, String... argv) {
        Preconditions.checkArgument(argv.length == 2, "usage: regexAll(\"regex\", \"col1,col2\")");
        String regex = argv[0];
        Pattern pat = Pattern.compile(regex);
        String name = argv[1];
        ArrayList<String> nameList = new ArrayList<String>();
        String[] names = name.split("\\,");
        for(int i=0; i<names.length; ++i){
         nameList.add(names[i]);
        }

        EventSinkDecorator<EventSink> snk = new RegexAllExtractor(null, pat, nameList);
        return snk;

      }
    };
  }
  
  /**
   * This is a special function used by the SinkFactory to pull in this class
   * as a plugin decorator.
   */
  public static List<Pair<String, SinkDecoBuilder>> getDecoratorBuilders() {
    List<Pair<String, SinkDecoBuilder>> builders =
        new ArrayList<Pair<String, SinkDecoBuilder>>();
    builders.add(new Pair<String, SinkDecoBuilder>("regexAll", builder()));
    return builders;
  }
}

The Flume-Solr sink commits a single record to a Solr server for each Flume event. The sink has one input attribute: url, the URL of the target Solr server. Yes, that means one sink can currently be configured to output to just one Solr server. Implementing the SolrEventSink sink is very simple.

package com.cloudera.flume.handlers.solr;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;

import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import com.cloudera.util.*;
import org.slf4j.LoggerFactory;

/**
 * This class converts the attributes that the RegexAllExtractor class added to a
 * Flume event into a Solr document and stores the document in Solr.
 */
public class SolrEventSink extends EventSink.Base {

  static final Logger log = LoggerFactory.getLogger(SolrEventSink.class);
  private CommonsHttpSolrServer server;
  private String solrURL;

  public SolrEventSink(String url) {
    Preconditions.checkNotNull(url);
    this.solrURL = url;
    log.info("solr server url: " + solrURL);
    try {
      this.server = new CommonsHttpSolrServer(solrURL);
    } catch (MalformedURLException e) {
      log.error("Invalid Solr URL");
    }
  }

  @Override
  public void append(Event e) throws IOException {
    // Build a Solr document from the event attributes, skipping Flume's
    // internal bookkeeping attributes.
    SolrInputDocument document = new SolrInputDocument();
    for (Entry<String, byte[]> a : e.getAttrs().entrySet()) {
      if (!a.getKey().equals("AckType") && !a.getKey().equals("AckTag")
          && !a.getKey().equals("AckChecksum") && !a.getKey().equals("tailSrcFile")) {
        document.addField(a.getKey(), Bytes.toString(a.getValue()));
      }
    }
    try {
      // Add the document to Solr and commit it.
      if (document.size() != 0) {
        server.add(document);
        server.commit();
      }
    } catch (SolrServerException e1) {
      log.error("SolrServerException: exception communicating with the Solr server instance");
    } catch (SolrException exception) {
      log.error("SolrException: " + exception);
    } catch (Exception exception) {
      log.error("Exception: " + exception);
    }
  }

  public static SinkBuilder builder() {
    return new SinkBuilder() {

      @Override
      public EventSink build(Context context, String... argv) {
        Preconditions.checkArgument(argv.length == 1,
            "usage: SolrEventSink(serverURL)");

        return new SolrEventSink(argv[0]);
      }

    };
  }
  
  /**
   * This is a special function used by the SinkFactory to pull in this class
   * as a plugin sink.
   */
  public static List<Pair<String, SinkBuilder>> getSinkBuilders() {
    List<Pair<String, SinkBuilder>> builders =
        new ArrayList<Pair<String, SinkBuilder>>();
    builders.add(new Pair<String, SinkBuilder>("solrEventSink", builder()));
    return builders;
  }
}

You need to add the flume-plugin-solrEventSink and flume-plugin-regexAllExtractor jars to Flume's lib dir. You can compile them from the Flume sources and the Solr jars; then you also need to add the following jars to Flume's lib dir.

Jar names :
    apache-solr-solrj-3.1.0.jar
    commons-codec-1.3.jar
    commons-fileupload-1.2.2.jar
    commons-httpclient-3.1.jar
    commons-io-1.4.jar
    commons-logging-1.0.4.jar
    geronimo-stax-api_1.0_spec-1.0.1.jar
    hbase-0.90.1-CDH3B4.jar
    lucene-analyzers-2.9.1.jar
    lucene-core-2.9.1.jar
    lucene-highlighter-2.9.1.jar
    lucene-memory-2.9.1.jar
    lucene-misc-2.9.1.jar
    lucene-queries-2.9.1.jar
    lucene-snowball-2.9.1.jar
    lucene-spellchecker-2.9.1.jar
    servlet-api-2.5.jar
    slf4j-api-1.6.1.jar
    slf4j-simple-1.6.1.jar
    solr-commons-csv-1.4.0.jar
    solr-core-1.4.0.jar
    stax-api-1.0.1.jar
    wstx-asl-3.2.7.jar

The last step is to add the plugins to the Flume configuration file (flume-conf.xml).

<configuration>
<property>
<name>flume.plugin.classes</name>
 <value>com.cloudera.flume.handlers.solr.SolrEventSink,com.cloudera.flume.core.extractors.RegexAllExtractor</value>
</property>  
.
.
</configuration>

Run Solr server:
java -Dsolr.solr.home=path/to/solr_home/solr/  -Djetty.port=12000 -jar start.jar

Note: The Solr schema.xml file must contain field entries for the column names specified in the regexAll decorator configuration. Otherwise an invalid-column-name error occurs and the document is not saved in Solr.

Configuration of the Flume agent:
source : tail("path/to/log/file")
sink :  agentSink("collector_machine_name",35853)

Configuration of the Flume collector:
source : collectorSource(35853)
sink : {regexAll("regex","column_names") => solrEventSink("solr_server_url")}


Example:

Run Solr server :
    $ cd SOLR_HOME
    $ java -Dsolr.solr.home=path/to/SOLR_HOME/solr/  -Djetty.port=12000 -jar start.jar

Create a file test.log and add following lines:
abc,bcd,cde
abc,swd,qwe
bcd,wer,asd
ghj,rty,ghj

Run Flume master:

    $ cd FLUME_HOME
    $ bin/flume master
Run Flume agent:
    $ cd FLUME_HOME
    $ bin/flume node -n test_agent
Run Flume collector:
    $ cd FLUME_HOME
    $ bin/flume node -n test_collector

Configuration of Flume agent:
    source : tail("/path/to/test.log")
    sink :  agentSink("localhost",35853)

Configuration of Flume collector:
    source : collectorSource(35853)
    sink : {regexAll("([^,]*),([^,]*),([^,]*)","id,cat,name")=>solrEventSink("http://hadoop-namenode:12000/solr")}


Note: The Solr schema.xml must contain field entries for id, cat, and name. Otherwise an invalid-column-name error occurs and the document is not saved in Solr.
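For reference, a minimal sketch of the corresponding schema.xml field entries (the field types here are assumptions; adjust them to your data):

<field name="id" type="string" indexed="true" stored="true" required="true"/>
<field name="cat" type="string" indexed="true" stored="true"/>
<field name="name" type="string" indexed="true" stored="true"/>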



Thursday, January 5, 2012

Errors encountered in Hadoop and its sub-projects

1. OOZIE job failed:

Error message : ERROR is considered as FAILED for SLA
   
Cause 1: Not able to find the hadoop namenode (master) or jobtracker machine.
Suppose you are running oozie, the hadoop master, and the jobtracker on one machine, and the datanode and tasktracker are running on another machine.

Your job.properties file contains following lines:
        nameNode=hdfs://localhost:9000
        jobTracker=localhost:9001
   
In the above case, an FS action will work fine because no map-reduce operation is performed for an FS action. But if you run a map-reduce action, the tasktracker will look for the hadoop master on its own localhost, because we used localhost:9000 in the job.properties file.
   
Solution: Use the IP of the hadoop-namenode and jobtracker machine in the job.properties file instead of localhost.
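For example, assuming the namenode and jobtracker run on a machine with IP 192.168.0.10 (a placeholder; substitute your own address):
        nameNode=hdfs://192.168.0.10:9000
        jobTracker=192.168.0.10:9001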
   
Cause 2: Oozie is not able to find the MySQL server.
Suppose I am using MySQL as the metastore for Hive.
Hive's hive-default.xml file has the following lines:
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
       
Solution: Use the IP of the MySQL machine instead of localhost.
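For example, with 192.168.0.10 again standing in for the MySQL machine's IP:
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://192.168.0.10:3306/hive?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>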


2. Zookeeper server not running:
Error message: Could not find my address: zk-serevr1 in list of ZooKeeper quorum servers
   
Causes :
HBase tries to start a ZK server on some machine but that machine isn't able to find itself in the hbase.zookeeper.quorum configuration. This is a name lookup problem. 

Solution:   
Use the hostname presented in the error message instead of the value you used (zk-server1). If you have a DNS server, you can set hbase.zookeeper.dns.interface and hbase.zookeeper.dns.nameserver in hbase-site.xml to make sure it resolves to the correct FQDN.
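A minimal sketch of those hbase-site.xml properties (the interface name and nameserver IP are placeholders for your environment):
<property>
<name>hbase.zookeeper.dns.interface</name>
<value>eth0</value>
</property>
<property>
<name>hbase.zookeeper.dns.nameserver</name>
<value>192.168.0.1</value>
</property>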

3. Hadoop-datanode job failed or datanode not running: java.io.IOException: File ../mapred/system/jobtracker.info could only be replicated to 0 nodes, instead of 1
   
Cause 1: No datanode is running. Make sure at least one datanode is up.

Cause 2: The namespaceIDs of the master and slave machines are not the same.
If you see the error java.io.IOException: Incompatible namespaceIDs in the logs of a datanode, chances are you are affected by bug HADOOP-1212 (well, I've been affected by it at least).
           
Solution:
If the namespaceIDs of the master and slave machines are not the same, replace the namespaceID on the slave machine with the master's namespaceID, as sketched below.
- dfs/name/current/VERSION on the master contains the master's namespaceID
- dfs/data/current/VERSION on the slave contains that datanode's namespaceID
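A rough sequence of steps (the paths depend on your dfs.name.dir and dfs.data.dir settings, so treat them as placeholders):
    $ bin/stop-all.sh                          # or stop only the affected datanode
    $ cat /path/to/dfs/name/current/VERSION    # on the master: note the namespaceID value
    $ vi /path/to/dfs/data/current/VERSION     # on the slave: set namespaceID to the master's value
    $ bin/start-all.sh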
        
Cause 3: The datanode instance is running out of space.
Solution: Free up some space.

Cause 4: You may also get this message due to permissions. Maybe the JobTracker cannot create jobtracker.info on startup.

4. Sqoop export command failed:
Error message:
attempt_201101151840_1006_m_000001_0, Status : FAILED
java.util.NoSuchElementException
at java.util.AbstractList$Itr.next(AbstractList.java:350)
at impressions_by_zip.__loadFromFields(impressions_by_zip.java:159)
at impressions_by_zip.parse(impressions_by_zip.java:108)

   
Cause: The given field separator is not valid.
Solution: Specify the correct field delimiter in the sqoop export command.
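For example, if the records in the export directory are comma-separated (the connection details and paths here are placeholders):

$ bin/sqoop export --connect jdbc:mysql://localhost/test_db --table tableName --export-dir /path/to/data --username root --password password -m 1 --input-fields-terminated-by ','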

5. HBase regionserver not running:

Error message: 2012-01-02 13:48:49,973 FATAL org.apache.hadoop.hbase.regionserver.HRegionServer: Master rejected startup because clock is out of sync
org.apache.hadoop.hbase.ClockOutOfSyncException: org.apache.hadoop.hbase.ClockOutOfSyncException: Server hadoop-datanode2,60020,1325492317440 has been rejected; Reported time is too far out of sync with master.  Time difference of 206141ms > max allowed of 30000ms

Solution: The clocks of the regionservers are not in sync with the master machine. Synchronize the clocks of the HBase master and regionserver machines.
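One quick way to do this (assuming each machine can reach an NTP server) is to run ntpdate on every out-of-sync regionserver, or better, keep an NTP daemon running everywhere:

$ sudo ntpdate pool.ntp.org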