Analysis
...
The EPS service enables devices and systems that generate events/data/messages needed by other consumers to publish those events to an intermediary that is responsible for their subsequent distribution to interested parties.
...
PublicationIFace
1) Publish Event/Push Event: This operation allows a publisher to publish an event to an already defined topic. The publisher must have access to publish on the specific topic. After a successful publish, the message appears in the topic and subscribers are notified.
...
Output: Message Id
2) Delete Event/Pull Event: This operation allows a publisher to remove a previously published event from a topic. Depending on the topic configuration, this action may not be allowed. Subscribers may be notified of the deletion depending on the nature of their subscription and the configuration of the topic. When an event is cross-posted to more than one topic, it is only removed from the selected topic. After successful deletion, the event no longer appears under the topic.
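The publish and delete semantics above can be illustrated with a minimal in-memory sketch. It is not the EPS API: the class InMemoryPublicationSketch, its method signatures, and the per-topic "allow delete" flag are hypothetical names introduced only for illustration, and access checks and subscriber notification are left out.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory stand-in for the publication operations; not the EPS API.
class InMemoryPublicationSketch {
    // topic name -> (message id -> payload) for events currently visible under the topic
    private final Map<String, Map<String, String>> eventsByTopic = new HashMap<>();
    // topic name -> whether the topic configuration permits deletion (assumed flag)
    private final Map<String, Boolean> deletionAllowed = new HashMap<>();

    void defineTopic(String topic, boolean allowDelete) {
        eventsByTopic.put(topic, new LinkedHashMap<>());
        deletionAllowed.put(topic, allowDelete);
    }

    // Publishes one event to every listed topic (cross-posting) and returns its message id.
    String publishEvent(String payload, String... topicNames) {
        for (String topic : topicNames) {
            if (!eventsByTopic.containsKey(topic)) {
                throw new IllegalArgumentException("Undefined topic: " + topic);
            }
        }
        String messageId = UUID.randomUUID().toString();
        for (String topic : topicNames) {
            eventsByTopic.get(topic).put(messageId, payload);
        }
        return messageId;
    }

    // Removes the event from the selected topic only; returns false when the
    // topic is unknown, forbids deletion, or does not hold the event.
    boolean deleteEvent(String topic, String messageId) {
        Map<String, String> events = eventsByTopic.get(topic);
        if (events == null || !deletionAllowed.get(topic) || !events.containsKey(messageId)) {
            return false;
        }
        events.remove(messageId);
        return true;
    }

    boolean contains(String topic, String messageId) {
        Map<String, String> events = eventsByTopic.get(topic);
        return events != null && events.containsKey(messageId);
    }
}
```

In this sketch, an event cross-posted to two topics and then deleted from one of them remains visible under the other, and a topic defined with deletion disabled rejects the delete.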
...
The publishEvent() implementation handles publication push messages, and the deleteEvent() implementation handles publication pull messages.
In the publishEvent() implementation, we get the corresponding topic and publish the event to the Kafka server.
In the deleteEvent() implementation, we get the corresponding Kafka topic and delegate the event to the subscriber, which pulls the message from the Kafka server.
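The publishEvent() path described above (resolve the topic, then hand the event to the Kafka-facing broker) might look roughly like the following sketch. The real APIs of CommonTopicBase and BrokerManagementServiceImpl are not shown in this document, so TopicBaseSketch, BrokerSketch, and their method signatures are hypothetical stand-ins. Mapping the topic's display name to a Kafka topic name is also an assumption of this sketch; it relies only on the fact that Kafka topic names are limited to ASCII letters, digits, '.', '_' and '-'.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Kafka-facing broker service.
interface BrokerSketch {
    // Sends the payload to the named Kafka topic and returns a message id.
    String send(String kafkaTopic, String payload);
}

// Hypothetical stand-in for the common topic base class.
abstract class TopicBaseSketch {
    protected final String name;

    TopicBaseSketch(String name) {
        this.name = name;
    }

    abstract String publishEvent(String payload);
}

// Topic whose publishEvent() delegates to the broker instead of keeping events in memory.
class RobustTopicSketch extends TopicBaseSketch {
    private final BrokerSketch broker;

    RobustTopicSketch(String name, BrokerSketch broker) {
        super(name);
        this.broker = broker;
    }

    @Override
    String publishEvent(String payload) {
        // Replace characters that are not legal in a Kafka topic name (assumed mapping).
        String kafkaTopic = name.trim().replaceAll("[^a-zA-Z0-9._-]", "_");
        return broker.send(kafkaTopic, payload);
    }
}

// Recording broker used only to exercise the sketch without a Kafka server.
class RecordingBroker implements BrokerSketch {
    final List<String> sent = new ArrayList<>();

    @Override
    public String send(String kafkaTopic, String payload) {
        sent.add(kafkaTopic + ":" + payload);
        return kafkaTopic + "-" + sent.size();
    }
}
```

Keeping the broker behind an interface lets the topic class be exercised with a recording stand-in, while a production implementation would wrap an actual Kafka producer.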
Design
1) RobustTopic.java - First, write a new class RobustTopic that extends CommonTopicBase and implements the publishEvent() method, which in turn connects to the Kafka broker implementation class (BrokerManagementServiceImpl) to push events/messages to the Kafka server.
2) Add the RobustTopic bean definition to the Topics.xml file.
<!-- Robust topic section -->
<bean class="org.socraticgrid.hl7.services.eps.internal.model.RobustTopic"
      name="kafkatopic" id="kafkatopic">
    <property name="subTopics">
        <map>
            <entry key="KafkaTopic">
                <ref bean="kafkaTopicBean" />
            </entry>
        </map>
    </property>
</bean>
<bean name="kafkaTopicBean" id="kafkaTopicBean"
      class="org.socraticgrid.hl7.services.eps.internal.model.RobustTopic">
    <property name="parentTopic" ref="kafkatopic" />
    <property name="topic">
        <bean class="org.socraticgrid.hl7.services.eps.model.Topic">
            <property name="name" value="Test Kafka Topic" />
            <property name="optionsList">
                <bean class="org.socraticgrid.hl7.services.eps.model.Options">
                    <property name="access" value="Open"></property>
                    <property name="durability" value="Robust"></property>
                </bean>
            </property>
            <!-- <property name="publicationInterventions">
                <list>
                    <ref bean="FHIRLabReview"/>
                    <ref bean="FHIRHCSReview"/>
                </list>
            </property> -->
        </bean>
    </property>
</bean>
3) Update the StaticTopicTree bean definition in the TopicsLocation.xml file so that it refers to the kafkatopic bean. With this change, StaticTopicTree is injected with the RobustTopic as its root topic, so its locateTopic method returns a RobustTopic instead of a TransientTopic.
<bean id="BaseStaticLocator" name="BaseStaticLocator"
      class="org.socraticgrid.hl7.services.eps.internal.StaticTopicTree">
    <property name="rootTopic" ref="kafkatopic" />
</bean>
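To make the effect of the rootTopic wiring concrete, the following sketch shows one way a topic tree could resolve a topic by walking the subTopics map from the root. The actual StaticTopicTree.locateTopic signature and lookup rules are not shown in this document, so TopicNodeSketch, TopicTreeSketch, and the path-based lookup are assumptions made purely for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical topic node mirroring the parentTopic/subTopics properties of the bean definitions.
class TopicNodeSketch {
    final String name;
    final Map<String, TopicNodeSketch> subTopics = new LinkedHashMap<>();
    TopicNodeSketch parentTopic;

    TopicNodeSketch(String name) {
        this.name = name;
    }

    // Registers a child under the given key and sets its parent link.
    TopicNodeSketch addSubTopic(String key, TopicNodeSketch child) {
        child.parentTopic = this;
        subTopics.put(key, child);
        return this;
    }
}

// Hypothetical locator: whatever node type is injected as the root is what lookups return.
class TopicTreeSketch {
    private final TopicNodeSketch rootTopic;

    TopicTreeSketch(TopicNodeSketch rootTopic) {
        this.rootTopic = rootTopic;
    }

    // Follows each path segment through subTopics; returns null if a segment is missing.
    TopicNodeSketch locateTopic(List<String> path) {
        TopicNodeSketch current = rootTopic;
        for (String key : path) {
            current = current.subTopics.get(key);
            if (current == null) {
                return null;
            }
        }
        return current;
    }
}
```

Under these assumptions, looking up the path ["KafkaTopic"] from a root that mirrors the kafkatopic bean yields the node that mirrors kafkaTopicBean, and an empty path yields the root itself.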
Assumptions: The following are the assumptions when we use publication
...