Broker Monitoring Interface
Analysis
EPS specifies following interface for broker monitoring interface
1) Get Broker Status: This operation determines the current status of the broker. This can be used as a general health-monitoring tool. Here broker is apache kafka so we have fetch current status of kafka and return. The Status could be Starting,Running, Shutdown, Suspending.
Input: No Input
Output: BrokerStatus
2) Get Broker Statistics: This operation is used to find basic statistics about the broker. Here broker is apache kafka application so we have fetch statistics of kafka and return. Statistics could be StartTime, TotalInputMessages, TotalTopics.
Input: No Input
Output: BrokerStatistics
3) Get Topic Statistics: This operation is used to get basic operational statics about a topic. This would include information about the message in various states, subscribers, publishers, etc. Here we have to fetch topic statistics from either Transient topic(eps cache) or RobustTipic(kafka) by passing topic as input parameter. Statistics could be ActiveMessages, ActiveSubscriptions, TotalMessages,DeliveryFaults.
Input: Topic Name
Output: TopicStatistics
Design
Following methods needs to be implemented in BrokerMonitoringServiceImpl.java
1) getBrokerStatus: Need to identify how to get the kafka server/broker status. Could not find correct api to fetch status.
2) getBrokerStatistics: Here we have to use kafka java api and jmx mbean api
Below code is for getting total topics in server
ZooKeeper zk = new ZooKeeper(KafkaProperties.zookeeperConnect, 10000, null);
List<String> topics = zk.getChildren(ZkUtils.BrokerTopicsPath(), false);
statistics.setTotalTopics(topics.size());
Below code is for getting broker start time
List<String> brokers = zk.getChildren(ZkUtils.BrokerIdsPath(), false);
for (String broker : brokers) {
statistics.setName(broker);
String brokerInfo = new String(zk.getData(ZkUtils.BrokerIdsPath()
+ "/" + broker, false, null));
if (brokerInfo != null) {
String[] brokerStatistics = brokerInfo.split(",");
if (brokerStatistics != null && brokerStatistics.length > 1) {
String strTime = brokerStatistics[1].split(":")[1]
.replaceAll("\"", "");
long time = Long.parseLong(strTime);
statistics.setStarted(new Date(time));// broker starting time
}
}
}
Below code is for total messages in broker, here we used jmx console mbean to fetch the information
JMXServiceURL serviceUrl = new JMXServiceURL(KafkaProperties.JMX_URL);
JMXConnector jmxConnector = JMXConnectorFactory.connect(serviceUrl,
null);
try {
MBeanServerConnection mbeanConn = jmxConnector
.getMBeanServerConnection();
ObjectName name = new ObjectName(
"kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");
count = Long.parseLong(String.valueOf(mbeanConn.getAttribute(name,
"Count")));
} finally {
jmxConnector.close();
}
3) getTopicStatistics:
Below code is for fetching active messages within topic
JMXServiceURL serviceUrl = new JMXServiceURL(KafkaProperties.JMX_URL);
jmxConnector = JMXConnectorFactory.connect(serviceUrl,null);
MBeanServerConnection mbeanConn = jmxConnector.getMBeanServerConnection();
ObjectName name = new ObjectName(
"kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic="+topic);
activeMsgs = Long.parseLong(String.valueOf(mbeanConn.getAttribute(name,
"Count")));
topicStatistics.setActiveMessages(activeMsgs);
Below code is for fetching delivery faults messages
name = new ObjectName(
"kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec,topic="+topic);
deliveryFaults= Long.parseLong(String.valueOf(mbeanConn.getAttribute(name,
"Count")));
topicStatistics.setDeliveryFaults(deliveryFaults);
Below code is for fetching total messages within topic
name = new ObjectName(
"kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec,topic="+topic);
totalMsgs= activeMsgs+deliveryFaults+Long.parseLong(String.valueOf(mbeanConn.getAttribute(name,
"Count")));
topicStatistics.setTotalMessages(totalMsgs);
Below code is for fetching consumers/subscribers list for topic
ZooKeeper zk = new ZooKeeper(KafkaProperties.zookeeperConnect, 10000, null);
List<String> consumers = zk.getChildren("/consumers", false);
List<String> subscriberList = new ArrayList<String>();
for (String group : consumers) {
scala.collection.Map<String, scala.collection.immutable.List<ConsumerThreadId>> consumersPerTopicMap = ZkUtils.getConsumersPerTopic(zkClient, group,false);
if(consumersPerTopicMap.contains(topic)){
subscriberList.add(group);
}
}
topicStatistics.setActiveSubscriptions(subscriberList.size());
Note: You have to define JMX_PORT in kafka-server-start.sh/kafka-server-start.bat(win)
$ env JMX_PORT=9999 / set JMX_PORT=9999