Apache Kafka stores each message together with an offset. The following are several ways in which messages can be retrieved by time:
...
Kafka Low Level Consumer API
The low level consumer API is stateless and provides fine-grained control over the communication between the Kafka broker and the consumer. It allows the consumer to set the message offset with every request sent to the broker, and it keeps the offset metadata on the consumer's side. The topicsMetadata() method of the kafka.javaapi.TopicMetadataResponse class is used to find the metadata of the topic of interest from the lead broker. For reading a message partition, the kafka.api.OffsetRequest class defines two constants, EarliestTime and LatestTime, which locate the beginning of the data in the logs and the start of the new message stream, respectively.
The following is the simple consumer API class diagram:
[Figure: simple consumer API class diagram]
The SimpleConsumer class provides a connection to the lead broker for fetching messages from the topic, along with methods to get the topic metadata and the list of offsets.
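The stateless behaviour described above can be illustrated without a broker. The following is a minimal sketch that uses only the Java standard library; ToyPartitionLog and ToyConsumer are hypothetical names invented for this illustration and are not part of the Kafka API. It assumes a log that simply serves whatever offset it is asked for, while the consumer alone remembers where it is.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model of offset-per-request fetching; this is not the Kafka client API. */
final class StatelessFetchSketch {

    /** Hypothetical stand-in for a partition log on a broker. It keeps no per-consumer state. */
    static final class ToyPartitionLog {
        private final List<String> messages = new ArrayList<>();

        void append(String message) {
            messages.add(message);
        }

        /** Returns up to maxMessages messages, starting at the requested offset. */
        List<String> fetch(long offset, int maxMessages) {
            if (offset < 0 || offset > messages.size()) {
                throw new IllegalArgumentException("offset out of range: " + offset);
            }
            int from = (int) offset;
            int to = Math.min(messages.size(), from + maxMessages);
            return Collections.unmodifiableList(new ArrayList<>(messages.subList(from, to)));
        }
    }

    /** Hypothetical consumer that owns its read position. */
    static final class ToyConsumer {
        private final ToyPartitionLog log;
        private long nextOffset;

        ToyConsumer(ToyPartitionLog log, long startOffset) {
            this.log = log;
            this.nextOffset = startOffset;
        }

        /** Sends the current offset with the request, then advances it by the number of messages read. */
        List<String> poll(int maxMessages) {
            List<String> batch = log.fetch(nextOffset, maxMessages);
            nextOffset += batch.size();
            return batch;
        }

        long nextOffset() {
            return nextOffset;
        }
    }

    public static void main(String[] args) {
        ToyPartitionLog log = new ToyPartitionLog();
        for (int i = 0; i < 5; i++) {
            log.append("message-" + i);
        }
        ToyConsumer consumer = new ToyConsumer(log, 2L); // start reading at offset 2
        System.out.println(consumer.poll(2));      // [message-2, message-3]
        System.out.println(consumer.poll(10));     // [message-4]
        System.out.println(consumer.nextOffset()); // 5
    }
}
```

Because the position lives in the consumer, re-reading old data in this model only requires constructing a consumer with a smaller start offset; the log itself never changes.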
Finding Starting Offset for Reads
The getLastOffset method returns the offset from which reading should start. Its whichTime argument selects the position, and Kafka provides two constants for it:
kafka.api.OffsetRequest.EarliestTime() finds the beginning of the data in the logs and starts streaming from there.
kafka.api.OffsetRequest.LatestTime() will only stream new messages.
import java.util.HashMap;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);

    // Ask for at most one offset at the position selected by whichTime.
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));

    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching offset data from the broker. Reason: "
                + response.errorCode(topic, partition));
        return 0;
    }

    // Only one offset was requested, so the first element is the answer.
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}
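To make the effect of the whichTime argument concrete, the following is a minimal sketch that models how the two constants select a starting offset. It uses only the Java standard library and is not Kafka code: ToyPartition and resolveStartOffset are hypothetical names, and the values -2 and -1 are the ones that EarliestTime() and LatestTime() return in the 0.8-era client. The real request also accepts a timestamp, which this sketch deliberately does not model.

```java
/** Toy illustration of how the two sentinel values select a starting offset; not Kafka code. */
final class StartOffsetSketch {

    // Values returned by kafka.api.OffsetRequest.EarliestTime() and LatestTime() in the 0.8-era client.
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    /** Hypothetical view of one partition: offsets in [logStartOffset, logEndOffset) are readable. */
    static final class ToyPartition {
        final long logStartOffset;
        final long logEndOffset;

        ToyPartition(long logStartOffset, long logEndOffset) {
            if (logStartOffset < 0 || logEndOffset < logStartOffset) {
                throw new IllegalArgumentException("invalid offset range");
            }
            this.logStartOffset = logStartOffset;
            this.logEndOffset = logEndOffset;
        }
    }

    /** Maps a sentinel value to the offset at which a reader would start. */
    static long resolveStartOffset(ToyPartition partition, long whichTime) {
        if (whichTime == EARLIEST_TIME) {
            return partition.logStartOffset; // oldest message still retained
        }
        if (whichTime == LATEST_TIME) {
            return partition.logEndOffset;   // offset the next new message will receive
        }
        throw new IllegalArgumentException("timestamps are not modelled in this sketch: " + whichTime);
    }

    public static void main(String[] args) {
        // Assume older data was removed by retention, so readable offsets are 100..249.
        ToyPartition partition = new ToyPartition(100L, 250L);
        System.out.println(resolveStartOffset(partition, EARLIEST_TIME)); // 100
        System.out.println(resolveStartOffset(partition, LATEST_TIME));   // 250
    }
}
```

In this model, starting from the earliest position replays everything still retained, while starting from the latest position skips existing data and waits for new messages.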