Broker Monitoring Interface

Analysis

EPS specifies the following operations for the broker monitoring interface:


1) Get Broker Status: This operation determines the current status of the broker and can be used as a general health-monitoring tool. Here the broker is Apache Kafka, so we have to fetch the current status of Kafka and return it. The status could be Starting, Running, Shutdown, or Suspending.

Input: No Input

Output: BrokerStatus

 

2) Get Broker Statistics: This operation is used to find basic statistics about the broker. Here the broker is the Apache Kafka application, so we have to fetch the statistics from Kafka and return them. Statistics could be StartTime, TotalInputMessages, TotalTopics.

Input: No Input

Output: BrokerStatistics

 

3) Get Topic Statistics: This operation is used to get basic operational statistics about a topic, including information about messages in various states, subscribers, publishers, etc. Here we have to fetch the topic statistics from either the Transient topic (EPS cache) or the RobustTopic (Kafka), passing the topic name as the input parameter. Statistics could be ActiveMessages, ActiveSubscriptions, TotalMessages, DeliveryFaults.

Input: Topic Name

Output: TopicStatistics
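
To make the three operations concrete, here is a minimal sketch of how they could look as a Java interface. The interface name BrokerMonitoringService and the field names and types of the two statistics classes are assumptions derived from the names used in this analysis; the real EPS types may differ.

```java
import java.util.Date;

// Status values listed in the analysis (enum form is an assumption).
enum BrokerStatus { STARTING, RUNNING, SHUTDOWN, SUSPENDING }

// Hypothetical holder for the broker statistics named in the analysis.
final class BrokerStatistics {
    Date startTime;           // StartTime
    long totalInputMessages;  // TotalInputMessages
    int totalTopics;          // TotalTopics
}

// Hypothetical holder for the topic statistics named in the analysis.
final class TopicStatistics {
    long activeMessages;      // ActiveMessages
    int activeSubscriptions;  // ActiveSubscriptions
    long totalMessages;       // TotalMessages
    long deliveryFaults;      // DeliveryFaults
}

// One method per operation: two take no input, one takes the topic name.
interface BrokerMonitoringService {
    BrokerStatus getBrokerStatus();
    BrokerStatistics getBrokerStatistics();
    TopicStatistics getTopicStatistics(String topicName);
}
```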

Design

The following methods need to be implemented in BrokerMonitoringServiceImpl.java:

1) getBrokerStatus: We still need to identify how to get the Kafka server/broker status. No suitable API for fetching the status has been found so far.
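
Until a proper status API is identified, one possible fallback is a coarse liveness probe: try to open a TCP connection to the broker port and report Running or Shutdown accordingly. This is only a sketch of that idea, not a confirmed approach for this design. The class BrokerProbe and its methods are hypothetical, and the probe cannot distinguish the Starting or Suspending states.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class BrokerProbe {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    /** Maps the probe result onto two of the status names from the analysis. */
    static String coarseStatus(String host, int port, int timeoutMs) {
        return isReachable(host, port, timeoutMs) ? "Running" : "Shutdown";
    }
}
```

For example, BrokerProbe.coarseStatus("localhost", 9092, 1000) would probe a broker on the default Kafka port; the host and port here are placeholders.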

2) getBrokerStatistics: Here we have to use the Kafka Java API and the JMX MBean API.

     The code below gets the total number of topics on the server:

     // The children of the broker topics path are the topic names.
     ZooKeeper zk = new ZooKeeper(KafkaProperties.zookeeperConnect, 10000, null);
     List<String> topics = zk.getChildren(ZkUtils.BrokerTopicsPath(), false);
     statistics.setTotalTopics(topics.size());

     The code below gets the broker start time:

     List<String> brokers = zk.getChildren(ZkUtils.BrokerIdsPath(), false);
     for (String broker : brokers) {
         statistics.setName(broker);
         // Registration data of the broker, stored in ZooKeeper as a JSON string.
         String brokerInfo = new String(
                 zk.getData(ZkUtils.BrokerIdsPath() + "/" + broker, false, null),
                 StandardCharsets.UTF_8);
         // Assumes the timestamp is the second comma-separated field.
         String[] brokerStatistics = brokerInfo.split(",");
         if (brokerStatistics.length > 1) {
             String strTime = brokerStatistics[1].split(":")[1].replaceAll("\"", "");
             long time = Long.parseLong(strTime);
             statistics.setStarted(new Date(time)); // broker start time
         }
     }
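
Splitting the registration string on commas depends on the position of the timestamp field. A less position-dependent alternative is to search for the field by name, as sketched below. The class BrokerInfoParser is hypothetical, and the JSON shape in the usage note is illustrative, not taken from a real broker.

```java
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class BrokerInfoParser {
    // Matches "timestamp":"1234" as well as "timestamp":1234.
    private static final Pattern TIMESTAMP =
            Pattern.compile("\"timestamp\"\\s*:\\s*\"?(\\d+)\"?");

    /** Returns the broker start time, or null if no timestamp field is found. */
    static Date parseStartTime(String brokerInfo) {
        Matcher m = TIMESTAMP.matcher(brokerInfo);
        return m.find() ? new Date(Long.parseLong(m.group(1))) : null;
    }
}
```

With an input such as {"host":"localhost","port":9092,"timestamp":"42"}, parseStartTime finds the timestamp regardless of where it appears in the string.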

The code below gets the total number of messages in the broker; here we use a JMX MBean to fetch the information:

     JMXServiceURL serviceUrl = new JMXServiceURL(KafkaProperties.JMX_URL);
     JMXConnector jmxConnector = JMXConnectorFactory.connect(serviceUrl, null);
     long count;
     try {
         MBeanServerConnection mbeanConn = jmxConnector.getMBeanServerConnection();
         ObjectName name = new ObjectName(
                 "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");
         // "Count" is the cumulative number of messages received by the broker.
         count = Long.parseLong(String.valueOf(mbeanConn.getAttribute(name, "Count")));
     } finally {
         jmxConnector.close(); // always release the JMX connection
     }
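
The attribute read above goes through String.valueOf and Long.parseLong; casting the returned value to Number is a slightly more direct alternative. The sketch below shows this as a small helper. The class JmxRead is hypothetical, and the JVM's own platform MBean server stands in for a remote Kafka connection so that the example runs without a broker.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

final class JmxRead {
    /** Reads a numeric MBean attribute and returns it as a long. */
    static long readLong(MBeanServerConnection conn, String objectName, String attribute)
            throws Exception {
        Object value = conn.getAttribute(new ObjectName(objectName), attribute);
        // Fails with ClassCastException if the attribute is not numeric.
        return ((Number) value).longValue();
    }

    public static void main(String[] args) throws Exception {
        // Stand-in connection: the local JVM instead of a Kafka broker.
        MBeanServerConnection conn = ManagementFactory.getPlatformMBeanServer();
        System.out.println(readLong(conn, "java.lang:type=Runtime", "StartTime"));
    }
}
```

Against a broker, the same helper would be called with the connection obtained from the JMXConnector, the MessagesInPerSec object name, and the attribute "Count".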

 

3) getTopicStatistics:

     The code below fetches the active messages within a topic:

     JMXServiceURL serviceUrl = new JMXServiceURL(KafkaProperties.JMX_URL);
     JMXConnector jmxConnector = JMXConnectorFactory.connect(serviceUrl, null);
     MBeanServerConnection mbeanConn = jmxConnector.getMBeanServerConnection();

     // Per-topic variant of the MessagesInPerSec MBean.
     ObjectName name = new ObjectName(
             "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=" + topic);

     long activeMsgs = Long.parseLong(String.valueOf(mbeanConn.getAttribute(name, "Count")));

     topicStatistics.setActiveMessages(activeMsgs);

The code below fetches the number of delivery faults:

     // Failed produce requests for the topic are reported as delivery faults.
     name = new ObjectName(
             "kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec,topic=" + topic);
     long deliveryFaults = Long.parseLong(String.valueOf(mbeanConn.getAttribute(name, "Count")));
     topicStatistics.setDeliveryFaults(deliveryFaults);

The code below fetches the total messages within a topic:

     // Total = active messages + delivery faults + failed fetch requests.
     name = new ObjectName(
             "kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec,topic=" + topic);
     long totalMsgs = activeMsgs + deliveryFaults
             + Long.parseLong(String.valueOf(mbeanConn.getAttribute(name, "Count")));
     topicStatistics.setTotalMessages(totalMsgs);
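
The three per-topic reads above follow the same pattern, so they could share one helper. The sketch below also returns 0 when the MBean is not registered, on the assumption (worth verifying against the Kafka version in use) that per-topic metrics may be missing for a topic that has not yet seen traffic. The class TopicMetrics and its method names are hypothetical.

```java
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

final class TopicMetrics {
    /** Builds the per-topic BrokerTopicMetrics name used in this design. */
    static ObjectName topicMetric(String metric, String topic)
            throws MalformedObjectNameException {
        return new ObjectName(
                "kafka.server:type=BrokerTopicMetrics,name=" + metric + ",topic=" + topic);
    }

    /** Reads the "Count" attribute, or returns 0 if the MBean is not registered. */
    static long countOrZero(MBeanServerConnection conn, ObjectName name) throws Exception {
        try {
            return ((Number) conn.getAttribute(name, "Count")).longValue();
        } catch (InstanceNotFoundException e) {
            return 0L; // no such MBean on the server
        }
    }
}
```

With this helper, the delivery-fault read becomes countOrZero(mbeanConn, topicMetric("FailedProduceRequestsPerSec", topic)).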

The code below fetches the list of consumers/subscribers for a topic:

     ZooKeeper zk = new ZooKeeper(KafkaProperties.zookeeperConnect, 10000, null);
     // Each child of /consumers is a consumer group.
     List<String> consumers = zk.getChildren("/consumers", false);
     List<String> subscriberList = new ArrayList<String>();
     for (String group : consumers) {
         // Note: zkClient is a ZkClient instance, not the ZooKeeper handle zk above;
         // it is assumed to be created elsewhere.
         scala.collection.Map<String, scala.collection.immutable.List<ConsumerThreadId>> consumersPerTopicMap =
                 ZkUtils.getConsumersPerTopic(zkClient, group, false);
         // A group counts as a subscriber if it consumes the given topic.
         if (consumersPerTopicMap.contains(topic)) {
             subscriberList.add(group);
         }
     }
     topicStatistics.setActiveSubscriptions(subscriberList.size());

 

Note: You have to define JMX_PORT in kafka-server-start.sh (Linux) or kafka-server-start.bat (Windows):

Linux:   export JMX_PORT=9999
Windows: set JMX_PORT=9999
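
The value of KafkaProperties.JMX_URL is not shown in this document. Assuming the broker exposes JMX over the default RMI connector on the port configured above, the URL would typically have the form built in the sketch below. The class JmxUrls is hypothetical, and the host and port are placeholders.

```java
import java.net.MalformedURLException;
import javax.management.remote.JMXServiceURL;

final class JmxUrls {
    /** Builds a JMX-over-RMI service URL for the given host and JMX port. */
    static JMXServiceURL forBroker(String host, int jmxPort) throws MalformedURLException {
        return new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://" + host + ":" + jmxPort + "/jmxrmi");
    }

    public static void main(String[] args) throws MalformedURLException {
        // Placeholder host; 9999 matches the JMX_PORT example above.
        System.out.println(forBroker("localhost", 9999));
    }
}
```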