Mapping Message Offsets to Time

Apache Kafka stores each message with an offset. The following are several ways to find the offsets of messages by time:

    Apache Kafka System Tools

      Apache Kafka provides the following system tool for looking up offsets. It takes the time as a long value (a timestamp in milliseconds).

         GetOffsetShell

         Gets the offsets for a topic:

 

bin/kafka-run-class.sh kafka.tools.GetOffsetShell

 

        Required arguments: --broker-list, --topic

        Option                                            Description
        --broker-list <hostname:port,...,hostname:port>   REQUIRED: The list of hostname and port of the servers to connect to.
        --max-wait-ms <Integer: ms>                       The maximum time each fetch request waits (default: 1000).
        --offsets <Integer: count>                        The number of offsets to return (default: 1).
        --partitions <partition ids>                      Comma-separated list of partition ids. If not specified, offsets are found for all partitions (default).
        --time <Long: timestamp/-1(latest)/-2(earliest)>  Timestamp in milliseconds; the returned offsets come before this timestamp, as in getOffsetsBefore. See the getLastOffset example under Kafka Low Level Consumer API.
        --topic <topic>                                   REQUIRED: The topic to get offsets from.
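The --time option expects either a timestamp in milliseconds or one of the sentinels -1 (latest) and -2 (earliest). As a minimal sketch, assuming the target moment is available as an ISO-8601 UTC string, the standard java.time API can turn it into the millisecond value to pass on the command line. The class name OffsetTimeArg and its constants are hypothetical.

```java
import java.time.Instant;

// Hypothetical helper for building the --time argument of GetOffsetShell.
public class OffsetTimeArg {
    // Sentinel values accepted by --time
    public static final long LATEST = -1L;
    public static final long EARLIEST = -2L;

    // Converts an ISO-8601 instant such as "2016-01-01T00:00:00Z"
    // into milliseconds since the Unix epoch.
    public static long toTimeArg(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    public static void main(String[] args) {
        long t = toTimeArg("2016-01-01T00:00:00Z");
        System.out.println("--time " + t); // prints "--time 1451606400000"
    }
}
```

A parse failure surfaces as a DateTimeParseException, which is usually preferable to silently passing a wrong timestamp to the tool.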

    Kafka Low Level Consumer API

           The low level consumer API is stateless and provides fine-grained control over the interaction between the Kafka broker and the consumer. The consumer sets the message offset in every request it sends to the broker and keeps the metadata on its own side. The topicsMetadata() method of the kafka.javaapi.TopicMetadataResponse class is used to find the lead broker for the topic and partition of interest. For reading a partition, the kafka.api.OffsetRequest class defines two constants: EarliestTime, which finds the beginning of the data in the logs, and LatestTime, which finds the point where the stream of new messages begins.
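           The leader lookup described above walks the topic metadata until it reaches the requested partition and then reads that partition's leader. The sketch below shows only that search logic. TopicMeta, PartitionMeta, and Broker are hypothetical stand-ins for the metadata objects reached through topicsMetadata(), used so the example runs without the Kafka client library.

```java
import java.util.Arrays;
import java.util.List;

public class LeaderLookup {
    // Hypothetical stand-in for a broker address
    public static final class Broker {
        public final String host;
        public final int port;
        public Broker(String host, int port) { this.host = host; this.port = port; }
    }

    // Hypothetical stand-in for per-partition metadata
    public static final class PartitionMeta {
        public final int partitionId;
        public final Broker leader;
        public PartitionMeta(int partitionId, Broker leader) { this.partitionId = partitionId; this.leader = leader; }
    }

    // Hypothetical stand-in for per-topic metadata
    public static final class TopicMeta {
        public final String topic;
        public final List<PartitionMeta> partitions;
        public TopicMeta(String topic, List<PartitionMeta> partitions) { this.topic = topic; this.partitions = partitions; }
    }

    // Returns the leader of topic/partition, or null if the metadata does not list it
    public static Broker findLeader(List<TopicMeta> metadata, String topic, int partition) {
        for (TopicMeta tm : metadata) {
            if (!tm.topic.equals(topic)) {
                continue; // not the topic of interest
            }
            for (PartitionMeta pm : tm.partitions) {
                if (pm.partitionId == partition) {
                    return pm.leader;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<TopicMeta> metadata = Arrays.asList(
                new TopicMeta("events", Arrays.asList(
                        new PartitionMeta(0, new Broker("broker1", 9092)),
                        new PartitionMeta(1, new Broker("broker2", 9092)))));
        Broker leader = findLeader(metadata, "events", 1);
        System.out.println(leader.host + ":" + leader.port); // prints "broker2:9092"
    }
}
```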

           [Figure: Simple consumer API class diagram]

           The SimpleConsumer class provides a connection to the lead broker for fetching messages from a topic, along with methods to get topic metadata and the list of offsets.

          Finding the Starting Offset for Reads

            The getLastOffset method asks the broker for one offset for the given whichTime; the consumer then starts streaming from that offset.

            kafka.api.OffsetRequest.EarliestTime() (-2) finds the beginning of the data in the logs.

            kafka.api.OffsetRequest.LatestTime() (-1) finds the end of the log, so only new messages are streamed.
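            In the Kafka versions that use this API, a broker answers a timestamp in whichTime at log-segment granularity: it compares the segments' last-modified times, so the offset returned is the first offset of a segment rather than of an exact message. The simplified model below illustrates that resolution alongside the EarliestTime and LatestTime constants. The Segment type, the resolve method, and the -1 "not found" result are hypothetical choices for illustration, not the broker's code.

```java
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of how whichTime could resolve to one offset
// when lookups work at log-segment granularity. Not the broker's actual code.
public class OffsetBeforeModel {
    public static final long LATEST_TIME = -1L;   // value of kafka.api.OffsetRequest.LatestTime()
    public static final long EARLIEST_TIME = -2L; // value of kafka.api.OffsetRequest.EarliestTime()

    // A log segment: its first offset and the time it was last written to (ms)
    public static final class Segment {
        public final long baseOffset;
        public final long lastModifiedMs;

        public Segment(long baseOffset, long lastModifiedMs) {
            this.baseOffset = baseOffset;
            this.lastModifiedMs = lastModifiedMs;
        }
    }

    // segments must be non-empty and ordered by baseOffset;
    // logEndOffset is the offset the next message will receive.
    // Returns -1 when no segment was last modified at or before whichTime.
    public static long resolve(List<Segment> segments, long logEndOffset, long whichTime) {
        if (whichTime == LATEST_TIME) {
            return logEndOffset;               // only new messages will be read
        }
        if (whichTime == EARLIEST_TIME) {
            return segments.get(0).baseOffset; // beginning of the data in the log
        }
        // Walk backwards to the newest segment written no later than whichTime
        for (int i = segments.size() - 1; i >= 0; i--) {
            if (segments.get(i).lastModifiedMs <= whichTime) {
                return segments.get(i).baseOffset;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Segment> log = Arrays.asList(
                new Segment(0, 1000), new Segment(500, 2000), new Segment(900, 3000));
        System.out.println(resolve(log, 1200, EARLIEST_TIME)); // prints 0
        System.out.println(resolve(log, 1200, LATEST_TIME));   // prints 1200
        System.out.println(resolve(log, 1200, 2500));          // prints 500
        System.out.println(resolve(log, 1200, 500));           // prints -1
    }
}
```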

           

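        import java.util.HashMap;
        import java.util.Map;
        import kafka.api.PartitionOffsetRequestInfo;
        import kafka.common.TopicAndPartition;
        import kafka.javaapi.OffsetResponse;
        import kafka.javaapi.consumer.SimpleConsumer;

        // whichTime is kafka.api.OffsetRequest.EarliestTime(), kafka.api.OffsetRequest.LatestTime(),
        // or a timestamp in milliseconds. Returns -1 if the broker reports an error
        // or has no offset before whichTime (0 is a valid offset, so it cannot signal failure).
        public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                         long whichTime, String clientName) {
            TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
            // Ask for at most one offset before whichTime
            requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
            kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                    requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
            OffsetResponse response = consumer.getOffsetsBefore(request);

            if (response.hasError()) {
                System.out.println("Error fetching offset data from the broker. Reason: "
                        + response.errorCode(topic, partition));
                return -1;
            }
            long[] offsets = response.offsets(topic, partition);
            // The array is empty when no offset exists before whichTime
            return offsets.length > 0 ? offsets[0] : -1;
        }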
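When a message-level position is needed, one approach is to binary-search the range between the offsets getLastOffset returns for EarliestTime and LatestTime. A minimal sketch, assuming every message carries a producer-set timestamp in its payload, that these timestamps never decrease along the partition, and that every offset in the range holds a readable message. The TimestampReader interface is hypothetical; in practice it would fetch and decode the message at the given offset.

```java
public class OffsetForTime {
    // Hypothetical: returns the timestamp (ms) stored in the message at an offset
    public interface TimestampReader {
        long timestampAt(long offset);
    }

    // Returns the first offset in [earliest, latest) whose timestamp is >= targetMs,
    // or latest if every message in the range is older than targetMs.
    // Assumes timestamps are non-decreasing in offset order.
    public static long firstOffsetAtOrAfter(TimestampReader reader, long earliest,
                                            long latest, long targetMs) {
        long lo = earliest;
        long hi = latest;
        while (lo < hi) {
            long mid = lo + (hi - lo) / 2;          // avoids overflow on large offsets
            if (reader.timestampAt(mid) < targetMs) {
                lo = mid + 1;                       // answer lies after mid
            } else {
                hi = mid;                           // mid may be the answer
            }
        }
        return lo;
    }

    public static void main(String[] args) {
        long[] ts = {100, 200, 200, 300, 400};      // timestamps for offsets 0..4
        TimestampReader reader = offset -> ts[(int) offset];
        System.out.println(firstOffsetAtOrAfter(reader, 0, ts.length, 250)); // prints 3
    }
}
```

The half-open range matches the LatestTime result, which is the offset the next message will receive rather than an existing message. Each probe costs one fetch, so the search needs about log2(latest - earliest) reads.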