Escalation Analysis and Design
Analysis
The UCS standard doesn't specify a concrete way to keep track of a message's responses. The standard mentions that the relatedMessageId field in the MessageHeader class can be used for this, but the original message doesn't keep track of its responses (I'm thinking of something similar to the DeliveryStatus class, which keeps track of the delivery).
UCS is not supposed to keep track of responses to all the Messages it delivers. These are the requirements for a Message to have its responses tracked:
- MessageHeader.respondBy > 0
- MessageHeader.receiptNotification == true
Note that the current implementation doesn't distinguish between Message types: Alert, Notification, Simple.
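The two requirements above amount to a single predicate. The following is a minimal sketch, assuming a simplified stand-in for MessageHeader: the HeaderSketch and ResponseTracking names are hypothetical, and the real UCS class has more fields.

```java
// Simplified stand-in for the UCS MessageHeader (assumption): only the two
// fields relevant to response tracking are modeled.
class HeaderSketch {
    final int respondBy;               // minutes to wait for responses; 0 means no deadline
    final boolean receiptNotification; // whether the sender asked for receipts

    HeaderSketch(int respondBy, boolean receiptNotification) {
        this.respondBy = respondBy;
        this.receiptNotification = receiptNotification;
    }
}

class ResponseTracking {
    // A Message is a candidate for response tracking only when both
    // requirements hold, regardless of its type (Alert, Notification, Simple).
    static boolean requiresResponseTracking(HeaderSketch header) {
        return header.respondBy > 0 && header.receiptNotification;
    }
}
```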
When a candidate Message is received by UCS, it configures a cron job to fire after MessageHeader.respondBy minutes. When the trigger fires, the following actions take place:
1.- If there is no known response for the Message in UCS and the Message doesn't specify any onNoResponseAll/onNoResponseAny Messages, then FINISH.
2.a- If there is no known response for the Message in UCS, then ALL the messages in onNoResponseAll are processed.
2.b- If there is at least 1 known response for the Message, but the number of responses doesn't match the number of Recipients in the original Message, then ALL the messages in onNoResponseAny are processed.
3.- If there are as many responses for a particular Message as recipients it contains (and there is one response for each recipient), then FINISH.
The open question is: Is there any relationship between the original Message and any Message that was triggered because of a response timeout?
My initial inclination was to use MessageHeader.relatedMessageId to link those 2 messages together, but this field has another purpose: to link a Message with its response/s.
If we don't want to modify the UCS standard, we will need to take care of this information in our implementation.
Design
New Class: TimedOutMessage
A new class must be introduced to add some context to a Message response timeout.
This class will initially have 2 attributes:
1.- message -> The message that timed out
2.- timeOutReason -> This could be represented as an ENUM with the following values: NO_RESPONSES, PARTIAL_RESPONSES
The initial value of timeOutReason is NO_RESPONSES.
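A possible shape for this class is sketched below. The Message class here is a minimal stand-in for the UCS Message (only an id is modeled), and the accessor names and the TimeOutReason enum name are assumptions made for illustration.

```java
import java.util.Objects;

// Minimal stand-in for the UCS Message class (assumption: only the id is modeled).
class Message {
    final String messageId;

    Message(String messageId) {
        this.messageId = messageId;
    }
}

enum TimeOutReason { NO_RESPONSES, PARTIAL_RESPONSES }

class TimedOutMessage {
    private final Message message;
    private final TimeOutReason timeOutReason;

    // Uses NO_RESPONSES, the initial value described above.
    TimedOutMessage(Message message) {
        this(message, TimeOutReason.NO_RESPONSES);
    }

    TimedOutMessage(Message message, TimeOutReason timeOutReason) {
        this.message = Objects.requireNonNull(message, "message");
        this.timeOutReason = Objects.requireNonNull(timeOutReason, "timeOutReason");
    }

    Message getMessage() {
        return message;
    }

    TimeOutReason getTimeOutReason() {
        return timeOutReason;
    }
}
```

Keeping both attributes final makes instances safe to share between the thread that runs the timeout job and the thread that consumes the results.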
UCSControllerService Interface Modifications
UCSControllerService interface has to support 2 new methods:
1.- public void setupResponseTimeout(Message message)
2.- public Set<TimedOutMessage> consumeMessagesWithResponseTimeout()
The first method will receive a Message and configure a cron job (probably using Quartz) to be executed after message.messageHeader.respondBy minutes. The job must only be configured if the message fulfills the requirements specified in the Analysis section.
We need to investigate whether NiFi already provides a scheduler service we could use.
The second method will remove every entry currently held in [messagesWithResponseTimeout] (the collection of timed out messages kept by UCSControllerService) and return them as a Set.
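The interplay between the two methods can be sketched with the JDK's ScheduledExecutorService standing in for Quartz (a substitution made only to keep the example self-contained). Msg and ResponseTimeoutTracker are hypothetical names, the time unit is configurable so the sketch can be exercised without waiting whole minutes, and the job body is deliberately simplified: it records the message unconditionally instead of running the response check described under Cron Job Execution, which is also why it returns plain messages rather than TimedOutMessage instances.

```java
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Minimal stand-in for a UCS Message (assumption: header fields are flattened).
class Msg {
    final String id;
    final int respondBy;               // delay before the timeout job fires
    final boolean receiptNotification;

    Msg(String id, int respondBy, boolean receiptNotification) {
        this.id = id;
        this.respondBy = respondBy;
        this.receiptNotification = receiptNotification;
    }
}

class ResponseTimeoutTracker {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    // Thread-safe holder playing the role of [messagesWithResponseTimeout].
    private final Queue<Msg> messagesWithResponseTimeout = new ConcurrentLinkedQueue<>();
    private final TimeUnit unit; // the design calls for minutes; shorter units help in tests

    ResponseTimeoutTracker(TimeUnit unit) {
        this.unit = unit;
    }

    void setupResponseTimeout(Msg message) {
        // Only schedule a job when the message fulfills both tracking requirements.
        if (message.respondBy <= 0 || !message.receiptNotification) {
            return;
        }
        // Simplification: the job records the message without checking responses.
        scheduler.schedule(() -> { messagesWithResponseTimeout.add(message); },
                message.respondBy, unit);
    }

    Set<Msg> consumeMessagesWithResponseTimeout() {
        // Drain the queue so that each timed out message is handed out only once.
        Set<Msg> drained = new HashSet<>();
        Msg next;
        while ((next = messagesWithResponseTimeout.poll()) != null) {
            drained.add(next);
        }
        return drained;
    }

    // Stops accepting new jobs and waits for the already scheduled ones to run.
    boolean shutdownAndAwait() {
        scheduler.shutdown();
        try {
            return scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Draining with poll() rather than copying and clearing avoids losing entries that a timeout job adds while the consumer is running.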
MessageStore Interface Modifications
MessageStore interface now requires an extra method:
public Set<Message> getRelatedMessages(String messageId)
This method will return any Message in the store whose header.relatedMessageId is equal to the provided messageId. The purpose of this method is to return all the responses associated with a Message.
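For illustration, an in-memory store could implement the lookup as a filter over its stored messages. This is only a sketch: StoredMessage and InMemoryMessageStore are hypothetical names, and the header is flattened into two fields.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Minimal stand-in for a stored UCS Message (assumption: header fields are flattened).
class StoredMessage {
    final String messageId;
    final String relatedMessageId; // null when the message is not a response

    StoredMessage(String messageId, String relatedMessageId) {
        this.messageId = messageId;
        this.relatedMessageId = relatedMessageId;
    }
}

class InMemoryMessageStore {
    private final Map<String, StoredMessage> messages = new ConcurrentHashMap<>();

    void saveMessage(StoredMessage message) {
        messages.put(message.messageId, message);
    }

    // Returns every stored message whose relatedMessageId equals the given id,
    // that is, all known responses to that message.
    Set<StoredMessage> getRelatedMessages(String messageId) {
        return messages.values().stream()
                .filter(m -> messageId.equals(m.relatedMessageId))
                .collect(Collectors.toSet());
    }
}
```

The linear scan is enough for a sketch; a store holding many messages would probably keep an index keyed by relatedMessageId instead.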
Cron Job Execution
When a time out cron job gets executed, it must perform the following steps:
1.- Get all the related Messages for the Message that timed out using MessageStore.getRelatedMessages().
2.a- If there is no related Message, a new entry in [messagesWithResponseTimeout] must be added for the Message with a timeOutReason = NO_RESPONSES.
2.b- If the number of related messages is less than the number of original recipients, a new entry in [messagesWithResponseTimeout] must be added for the Message with a timeOutReason = PARTIAL_RESPONSES. *
2.c- If there is 1 related message for each of the original recipients, no entry is added to [messagesWithResponseTimeout].
* The match between the sender of a related message and a recipient of the original message could be done first by comparing sender.getDeliveryAddressId() with recipient.getDeliveryAddressId(). If there is no match, we could fall back to comparing sender.getPhysicalAddress().getAddress() with recipient.getPhysicalAddress().getAddress().
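The steps above, including the two-stage sender/recipient match, can be sketched as a pure function. Party is a hypothetical flattened stand-in for both senders and recipients, and ResponseTimeoutCheck is an illustrative name. The sketch decides by checking that every recipient has a matching response rather than by comparing counts, which is one possible reading of steps 2.b and 2.c.

```java
import java.util.List;
import java.util.Optional;

enum TimeOutReason { NO_RESPONSES, PARTIAL_RESPONSES }

// Flattened stand-in for a sender or recipient (assumption): physicalAddress
// plays the role of getPhysicalAddress().getAddress().
class Party {
    final String deliveryAddressId; // may be null
    final String physicalAddress;   // may be null

    Party(String deliveryAddressId, String physicalAddress) {
        this.deliveryAddressId = deliveryAddressId;
        this.physicalAddress = physicalAddress;
    }
}

class ResponseTimeoutCheck {
    // First compare delivery address ids; fall back to the physical address.
    static boolean sameParty(Party sender, Party recipient) {
        if (sender.deliveryAddressId != null
                && sender.deliveryAddressId.equals(recipient.deliveryAddressId)) {
            return true;
        }
        return sender.physicalAddress != null
                && sender.physicalAddress.equals(recipient.physicalAddress);
    }

    // Returns the reason to record, or an empty Optional when every recipient
    // has responded and no entry must be added.
    static Optional<TimeOutReason> evaluate(List<Party> recipients,
                                            List<Party> responseSenders) {
        if (responseSenders.isEmpty()) {
            return Optional.of(TimeOutReason.NO_RESPONSES);
        }
        for (Party recipient : recipients) {
            boolean answered = responseSenders.stream()
                    .anyMatch(sender -> sameParty(sender, recipient));
            if (!answered) {
                return Optional.of(TimeOutReason.PARTIAL_RESPONSES);
            }
        }
        return Optional.empty();
    }
}
```

With this reading, a response whose sender matches none of the recipients still counts as a known response, so the outcome is PARTIAL_RESPONSES rather than NO_RESPONSES.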
Setup of Cron Jobs
The UCSPersistMessage processor will be in charge of the invocation of UCSControllerService.setupResponseTimeout() after a Message is persisted. We could have done this inside UCSControllerService.saveMessage(), but we decided to keep them as 2 separate methods in case we required each of them individually.
Timed Out Responses Consumption
The consumption of timed out messages is, in some way, similar to the way we consume incoming CHAT messages in the ReceiveChatMessage processor. A synchronized collection of timed out messages is held by UCSControllerService. A new processor that polls from that collection and yields needs to be created. We can use the ReceiveChatMessage processor as an example.
The new processor [UCSProcessResponseTimeout?] will process each TimedOutMessage in the following way:
1.- If the timeOutReason of the TimedOutMessage being processed is NO_RESPONSES, the processor will create a new FlowFile for each Message in the original Message's onNoResponseAll property. These FlowFiles are then transferred to REL_NO_RESPONSES.
2.- If the timeOutReason of the TimedOutMessage being processed is PARTIAL_RESPONSES, the processor will create a new FlowFile for each Message in the original Message's onNoResponseAny property. These FlowFiles are then transferred to REL_PARTIAL_RESPONSES.
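The routing rule can be sketched in plain Java, without the NiFi API. In this sketch relationships are modeled as strings and follow-up Messages as already serialized payloads; OriginalMessage, RoutedMessage, ResponseTimeoutRouter and the relationship names are all hypothetical. A real processor would create FlowFiles and transfer them through its ProcessSession instead.

```java
import java.util.ArrayList;
import java.util.List;

enum TimeOutReason { NO_RESPONSES, PARTIAL_RESPONSES }

// Stand-in for the original Message (assumption): follow-up Messages are
// represented by their serialized payloads.
class OriginalMessage {
    final List<String> onNoResponseAll;
    final List<String> onNoResponseAny;

    OriginalMessage(List<String> onNoResponseAll, List<String> onNoResponseAny) {
        this.onNoResponseAll = onNoResponseAll;
        this.onNoResponseAny = onNoResponseAny;
    }
}

// Stand-in for a FlowFile together with the relationship it is transferred to.
class RoutedMessage {
    final String relationship;
    final String payload;

    RoutedMessage(String relationship, String payload) {
        this.relationship = relationship;
        this.payload = payload;
    }
}

class ResponseTimeoutRouter {
    static final String REL_NO_RESPONSES = "no responses";
    static final String REL_PARTIAL_RESPONSES = "partial responses";

    // Emits one routed entry per follow-up Message, picking both the source
    // list and the target relationship from the timeout reason.
    static List<RoutedMessage> route(OriginalMessage original, TimeOutReason reason) {
        boolean noResponses = reason == TimeOutReason.NO_RESPONSES;
        List<String> followUps = noResponses
                ? original.onNoResponseAll
                : original.onNoResponseAny;
        String relationship = noResponses ? REL_NO_RESPONSES : REL_PARTIAL_RESPONSES;

        List<RoutedMessage> routed = new ArrayList<>();
        for (String followUp : followUps) {
            routed.add(new RoutedMessage(relationship, followUp));
        }
        return routed;
    }
}
```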
At this point, any Message sent through REL_NO_RESPONSES and/or REL_PARTIAL_RESPONSES can be treated by UCS as a regular incoming Message. This new processor can then be connected to the existing UCSValidateMessage processor in the UCS workflow.