Wednesday, December 31, 2014

Kafka Round Robin Partitioner

In this blog post we will cover how to write a custom partitioner in Kafka that distributes data uniformly across a topic's partitions.

Let's consider a topic, TopicA, with 5 partitions and a replication factor of 3. We want to distribute data uniformly across all of its partitions so that every partition holds the same amount of data.
Kafka's default partitioner chooses a partition from a hash of the message key, so when some keys are sent far more often than others, some partitions can grow larger than the rest. Suppose we have inserted 40 GB of data into Kafka; the data size of each partition might then look like:
Partition0 → 10 GB
Partition1 → 8 GB
Partition2 → 6 GB
Partition3 → 5 GB
Partition4 → 11 GB
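To illustrate why key-based placement can be uneven, here is a minimal sketch of hash-modulo partitioning. It assumes a scheme similar in spirit to the 0.8 DefaultPartitioner (hash of the key modulo the partition count) and is not Kafka's actual source; the HashSkewDemo class and the "customer-N" keys are hypothetical. With 7 equally busy keys spread over 5 partitions, each partition receives a multiple of 100 messages, so 700 messages can never split evenly at 140 per partition.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch of hash-based partitioning: the target partition
 * depends only on the key, never on how busy a partition already is.
 */
class HashSkewDemo {

    // Mask off the sign bit so negative hash codes still map to a valid index.
    static int hashPartition(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    /**
     * Sends totalMessages messages whose keys cycle through distinctKeys
     * values ("customer-0", "customer-1", ...) and counts how many land
     * in each partition.
     */
    static int[] countPerPartition(int totalMessages, int distinctKeys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (int i = 0; i < totalMessages; i++) {
            String key = "customer-" + (i % distinctKeys);
            counts[hashPartition(key, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 7 keys x 100 messages each over 5 partitions: counts are multiples of 100,
        // so they cannot all equal 140.
        System.out.println(Arrays.toString(countPerPartition(700, 7, 5)));
    }
}
```

Whichever partitions the hashes pick, at least one partition ends up holding two or more keys, which is exactly the kind of imbalance shown in the list above.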

Hence, to distribute the data uniformly, we need to write a custom round-robin partitioner class.
With the Kafka 0.8 producer API, we write a custom partitioner by implementing the kafka.producer.Partitioner interface. The interface declares a single method, partition, which takes two arguments: the key that the producer supplies with each message, and the number of partitions in the topic. The method contains the logic that picks the destination partition and returns its number, which must lie between 0 and numPartitions - 1.

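import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * Round-robin partitioner backed by a thread-safe AtomicInteger.
 * The key is ignored; each call moves on to the next partition.
 */
public class RoundRobinPartitioner implements Partitioner {
    private static final Logger log = Logger.getLogger(RoundRobinPartitioner.class);

    private final AtomicInteger counter = new AtomicInteger(0);

    /**
     * The producer creates the partitioner by reflection and expects
     * a constructor that takes VerifiableProperties.
     */
    public RoundRobinPartitioner(VerifiableProperties props) {
        log.trace("Instantiated the RoundRobinPartitioner class");
    }

    /**
     * Ignores the key and returns the next partition in round-robin order.
     */
    public int partition(Object key, int numPartitions) {
        // getAndIncrement() is atomic, so concurrent callers never see the same value.
        // Masking off the sign bit keeps the index non-negative once the counter
        // wraps past Integer.MAX_VALUE, so no racy manual reset is needed.
        int next = counter.getAndIncrement() & Integer.MAX_VALUE;
        return next % numPartitions;
    }
}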
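Because the class above depends on Kafka's jars, here is a Kafka-free sketch that copies its arithmetic into a hypothetical RoundRobinCheck class, to check the distribution when several producer threads share one partitioner instance. It assumes Java 8 (lambdas). Since getAndIncrement() hands out each counter value exactly once, 4 threads sending 10,000 messages each over 5 partitions should yield exactly 8,000 messages per partition, regardless of how the threads interleave.

```java
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

/** Hypothetical, Kafka-free copy of the round-robin logic for a quick check. */
class RoundRobinCheck {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Same arithmetic as RoundRobinPartitioner.partition (the key is ignored).
    int partition(Object key, int numPartitions) {
        return (counter.getAndIncrement() & Integer.MAX_VALUE) % numPartitions;
    }

    /** Calls partition() messagesPerThread times on each of `threads` threads. */
    static int[] distribute(int threads, int messagesPerThread, int numPartitions) {
        RoundRobinCheck partitioner = new RoundRobinCheck();
        AtomicIntegerArray counts = new AtomicIntegerArray(numPartitions);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < messagesPerThread; i++) {
                    counts.incrementAndGet(partitioner.partition(null, numPartitions));
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        }
        int[] result = new int[numPartitions];
        for (int p = 0; p < numPartitions; p++) {
            result[p] = counts.get(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // 4 threads x 10,000 messages over 5 partitions.
        System.out.println(Arrays.toString(distribute(4, 10_000, 5)));
    }
}
```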

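Now, when writing the producer, we set the “partitioner.class” property in the Properties object used to create the kafka.producer.ProducerConfig instance. Note that the 0.8 producer only calls the partitioner for messages with a non-null key, so each KeyedMessage should be sent with a key (any value works, since this partitioner ignores it).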

Properties props = new Properties();
props.put("partitioner.class", "com.learnining.kafka.RoundRobinPartitioner");
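The message key matters here: in the 0.8 Scala producer, a message with a null key never reaches the partitioner. Instead, the producer picks a random partition for the topic, caches it, and keeps reusing it until it next refreshes topic metadata (controlled by topic.metadata.refresh.interval.ms). The following is a simplified, Kafka-free model of that routing decision, written to illustrate the behavior rather than reproduce Kafka's source; ProducerRoutingModel and its methods are hypothetical names. With null keys every message lands in a single partition; with any non-null key the round-robin logic spreads them evenly.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical, simplified model of how the 0.8 producer chooses a partition.
 * Not Kafka code; it only mirrors the null-key versus non-null-key branch.
 */
class ProducerRoutingModel {
    private final AtomicInteger counter = new AtomicInteger(0);
    // Stand-in for the producer's per-topic cache used for keyless messages.
    private final Map<String, Integer> stickyPartition = new HashMap<>();
    private final Random random = new Random();

    // Same arithmetic as RoundRobinPartitioner.partition.
    private int roundRobin(int numPartitions) {
        return (counter.getAndIncrement() & Integer.MAX_VALUE) % numPartitions;
    }

    int choosePartition(String topic, Object key, int numPartitions) {
        if (key == null) {
            // No key: the partitioner is never consulted. One random partition
            // is chosen per topic and reused (this model never clears the cache).
            return stickyPartition.computeIfAbsent(topic, t -> random.nextInt(numPartitions));
        }
        // Non-null key: the configured partitioner decides.
        return roundRobin(numPartitions);
    }

    /** Routes `messages` messages to TopicA, with or without keys, and counts per partition. */
    static int[] route(boolean withKeys, int messages, int numPartitions) {
        ProducerRoutingModel producer = new ProducerRoutingModel();
        int[] counts = new int[numPartitions];
        for (int i = 0; i < messages; i++) {
            Object key = withKeys ? Integer.toString(i) : null;
            counts[producer.choosePartition("TopicA", key, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println("no keys:   " + Arrays.toString(route(false, 1000, 5)));
        System.out.println("with keys: " + Arrays.toString(route(true, 1000, 5)));
    }
}
```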

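Round robin balances the number of messages per partition, so assuming messages are of roughly similar size, the disk usage of each partition after switching to the round-robin partitioner should look like: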
Partition0 → 8 GB
Partition1 → 8 GB
Partition2 → 8 GB
Partition3 → 8 GB
Partition4 → 8 GB