SFM Publication Interface
Analysis
The EPS service enables devices and systems that generate events/data/messages needed by other consumers to publish those events to an intermediary that is responsible for their subsequent distribution to interested parties.
EPS specifies the following interface for event publication.
PublicationIFace
1) Publish Event/Push: This operation allows a publisher to publish an event to an already defined topic. The publisher must have permission to publish on that topic. After a successful publish, the message appears in the topic and subscribers are notified.
Inputs: Topic Id or Topic Path, Message/Event
Output: Message Id
2) Delete Event: This operation allows a publisher to remove a previously published event from a topic. Depending on the topic configuration, this action may not be allowed. Subscribers may be notified of the deletion depending on the nature of their subscription and the configuration of the topic. When an event is cross-posted to more than one topic, it is only removed from the selected topic. After successful deletion, the event no longer appears under the topic.
Inputs: Topic Id or Topic Path, Message Id
Output: None
3) Assert Presence: This operation allows publish-on-demand publishers to update their presence status. After a successful call, the effect depends on the asserted status: for Offline, publisher requests and notifications are deferred; for Online, publisher requests and notifications are resumed.
Inputs: Publisher Id, Presence (Online, Offline)
Output: Success/Fail
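The three operations above can be sketched in Java as follows. This is only an illustration of the contract, not the actual EPS interface: the names PublicationSketch, InMemoryPublication, defineTopic, isDeferred and contains are hypothetical, events are simplified to strings, topics are addressed by path only, and access control and subscriber notification are left out.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

/** Presence states a publish-on-demand publisher can assert. */
enum Presence { ONLINE, OFFLINE }

/** Hypothetical Java rendering of the three operations; not the actual EPS interface. */
interface PublicationSketch {
    /** Publishes an event to an existing topic and returns the new message id. */
    String publishEvent(String topicPath, String event);

    /** Removes a previously published event from the selected topic only. */
    void deleteEvent(String topicPath, String messageId);

    /** Records the publisher's presence; returns true on success. */
    boolean assertPresence(String publisherId, Presence presence);
}

/** In-memory stand-in used only to illustrate the contract. */
class InMemoryPublication implements PublicationSketch {
    // topic path -> (message id -> event body)
    private final Map<String, Map<String, String>> topics = new HashMap<>();
    private final Map<String, Presence> presenceByPublisher = new HashMap<>();

    /** Topics must be defined before anything can be published to them. */
    void defineTopic(String topicPath) {
        topics.putIfAbsent(topicPath, new LinkedHashMap<>());
    }

    @Override
    public String publishEvent(String topicPath, String event) {
        Map<String, String> messages = requireTopic(topicPath);
        String messageId = UUID.randomUUID().toString();
        messages.put(messageId, event);
        return messageId;
    }

    @Override
    public void deleteEvent(String topicPath, String messageId) {
        // Only the selected topic is touched; other topics keep their copies.
        requireTopic(topicPath).remove(messageId);
    }

    @Override
    public boolean assertPresence(String publisherId, Presence presence) {
        if (publisherId == null || presence == null) {
            return false;
        }
        presenceByPublisher.put(publisherId, presence);
        return true;
    }

    /** True while requests and notifications for this publisher should be deferred. */
    boolean isDeferred(String publisherId) {
        return presenceByPublisher.get(publisherId) == Presence.OFFLINE;
    }

    /** True if the message is currently present under the topic. */
    boolean contains(String topicPath, String messageId) {
        return requireTopic(topicPath).containsKey(messageId);
    }

    private Map<String, String> requireTopic(String topicPath) {
        Map<String, String> messages = topics.get(topicPath);
        if (messages == null) {
            throw new IllegalArgumentException("Unknown topic: " + topicPath);
        }
        return messages;
    }
}
```

In this sketch, publishing to a topic that was never defined fails with an IllegalArgumentException, and asserting Offline simply flips a flag that a dispatcher could consult before forwarding requests to the publisher.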
The existing EPS code base already supports transient topics (i.e., messages are not durable) through TransientTopic.java. Similarly, we have to implement a new class, RobustTopic.java, which will support durable messages. We are using Apache Kafka to provide a backing store for persistent topics, so RobustTopic.java must provide new implementations of publishEvent() and deleteEvent() that internally talk to the Kafka server.
The publishEvent() implementation handles publication (push) by looking up the corresponding Kafka topic and pushing the message to it.
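As a rough sketch of this delegation, the following Java shows a durable topic class that resolves a Kafka topic name and hands the message to a broker abstraction. Everything here is an assumption made for illustration: BrokerSketch stands in for the real broker class (its actual API is not shown in this document), TopicBaseSketch stands in for CommonTopicBase, and RecordingBroker is an in-memory fake used instead of a Kafka client so the example runs on its own. Kafka's producer API has no per-message delete, so remove() is purely a placeholder for whatever deletion strategy is chosen.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical broker abstraction; not the real BrokerManagementServiceImpl API. */
interface BrokerSketch {
    /** Appends the payload to the given Kafka topic and returns a message id. */
    String send(String kafkaTopic, String payload);

    /** Placeholder for removing (or hiding) a previously sent message. */
    void remove(String kafkaTopic, String messageId);
}

/** Stand-in for CommonTopicBase, reduced to what this sketch needs. */
abstract class TopicBaseSketch {
    protected final String name;

    TopicBaseSketch(String name) {
        this.name = name;
    }

    abstract String publishEvent(String event);

    abstract void deleteEvent(String messageId);
}

/** Durable topic that forwards publish and delete calls to the broker. */
class RobustTopicSketch extends TopicBaseSketch {
    private final BrokerSketch broker;

    RobustTopicSketch(String name, BrokerSketch broker) {
        super(name);
        this.broker = broker;
    }

    /** Kafka topic names allow only [a-zA-Z0-9._-], so other characters become '_'. */
    String kafkaTopicName() {
        return name.trim().replaceAll("[^a-zA-Z0-9._-]", "_");
    }

    @Override
    String publishEvent(String event) {
        return broker.send(kafkaTopicName(), event);
    }

    @Override
    void deleteEvent(String messageId) {
        broker.remove(kafkaTopicName(), messageId);
    }
}

/** In-memory fake broker so the sketch can run without a Kafka server. */
class RecordingBroker implements BrokerSketch {
    final Map<String, List<String>> log = new HashMap<>();
    final Set<String> removedIds = new HashSet<>();

    @Override
    public String send(String kafkaTopic, String payload) {
        List<String> entries = log.computeIfAbsent(kafkaTopic, k -> new ArrayList<>());
        entries.add(payload);
        // The id is the topic name plus the position of the entry in the log.
        return kafkaTopic + "-" + (entries.size() - 1);
    }

    @Override
    public void remove(String kafkaTopic, String messageId) {
        // The log itself is append-only; removed ids are only remembered.
        removedIds.add(messageId);
    }
}
```

With the topic name "Test Kafka Topic" from the bean configuration, kafkaTopicName() in this sketch yields "Test_Kafka_Topic". Whether the real implementation derives the Kafka topic name this way is not stated in this document.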
Design
1) RobustTopic.java - First, we have to write a new class RobustTopic that extends CommonTopicBase and implements the publishEvent() method, which in turn connects to the Kafka broker implementation class (BrokerManagementServiceImpl) to push events/messages to the Kafka server.
2) Add the RobustTopic bean definition to the Topics.xml file.
<!-- Robust topic section -->
<bean class="org.socraticgrid.hl7.services.eps.internal.model.RobustTopic"
      name="kafkatopic" id="kafkatopic">
    <property name="subTopics">
        <map>
            <entry key="KafkaTopic">
                <ref bean="kafkaTopicBean" />
            </entry>
        </map>
    </property>
</bean>
<bean name="kafkaTopicBean" id="kafkaTopicBean"
      class="org.socraticgrid.hl7.services.eps.internal.model.RobustTopic">
    <property name="parentTopic" ref="kafkatopic" />
    <property name="topic">
        <bean class="org.socraticgrid.hl7.services.eps.model.Topic">
            <property name="name" value="Test Kafka Topic" />
            <property name="optionsList">
                <bean class="org.socraticgrid.hl7.services.eps.model.Options">
                    <property name="access" value="Open"></property>
                    <property name="durability" value="Robust"></property>
                </bean>
            </property>
            <!-- <property name="publicationInterventions">
                <list>
                    <ref bean="FHIRLabReview"/>
                    <ref bean="FHIRHCSReview"/>
                </list>
            </property> -->
        </bean>
    </property>
</bean>
3) Update the StaticTopicTree bean definition in the TopicsLocation.xml file so that it refers to the kafkatopic bean. With this change, StaticTopicTree is injected with a RobustTopic root, so the topics returned by its locateTopic method are RobustTopic instances instead of TransientTopic instances.
<bean id="BaseStaticLocator" name="BaseStaticLocator"
      class="org.socraticgrid.hl7.services.eps.internal.StaticTopicTree">
    <property name="rootTopic" ref="kafkatopic" />
</bean>
Assumptions: The following assumptions apply when using publication:
1) Kafka is up and running. If it is not, an exception is thrown.
2) The topic used for publishing and subscribing to messages has already been created. If it does not exist, an exception is thrown.
3) EPS already has one instance of the Kafka producer/subscriber interface class, which connects to the Kafka server to publish and subscribe to messages.
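The first two assumptions could be enforced with a small precondition check before each publish, sketched below. The names BrokerStatus, FixedBrokerStatus, BrokerUnavailableException, UnknownTopicException and PublicationPreconditions are hypothetical, and how reachability and the list of existing topics are actually obtained from Kafka is deliberately left behind the BrokerStatus interface.

```java
import java.util.Set;

/** Raised when the Kafka server cannot be reached (assumption 1). */
class BrokerUnavailableException extends RuntimeException {
    BrokerUnavailableException(String message) {
        super(message);
    }
}

/** Raised when the target topic has not been created (assumption 2). */
class UnknownTopicException extends RuntimeException {
    UnknownTopicException(String topic) {
        super("Topic does not exist: " + topic);
    }
}

/** Hypothetical view of the broker state needed for the checks. */
interface BrokerStatus {
    boolean isReachable();

    Set<String> existingTopics();
}

/** Fixed-value implementation, used here only to exercise the checks. */
class FixedBrokerStatus implements BrokerStatus {
    private final boolean reachable;
    private final Set<String> topics;

    FixedBrokerStatus(boolean reachable, Set<String> topics) {
        this.reachable = reachable;
        this.topics = topics;
    }

    @Override
    public boolean isReachable() {
        return reachable;
    }

    @Override
    public Set<String> existingTopics() {
        return topics;
    }
}

final class PublicationPreconditions {
    private PublicationPreconditions() {
    }

    /** Throws if the broker is down or the topic is missing; otherwise returns normally. */
    static void check(BrokerStatus status, String topic) {
        if (!status.isReachable()) {
            throw new BrokerUnavailableException("Kafka is not reachable");
        }
        if (!status.existingTopics().contains(topic)) {
            throw new UnknownTopicException(topic);
        }
    }
}
```

Reachability is checked first in this sketch, so a missing topic is only reported when the broker itself is available.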