Apache NiFi Custom Processor Design
Introduction
Apache NiFi (Incubating) is a dataflow system based on the concepts of flow-based programming. It supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic. NiFi has a web-based user interface for design, control, feedback, and monitoring of dataflows. It is highly configurable along several dimensions of quality of service, such as loss-tolerant versus guaranteed delivery, low latency versus high throughput, and priority-based queuing. NiFi provides fine-grained data provenance for all data received, forked, joined, cloned, modified, sent, and ultimately dropped upon reaching its configured end-state.
Download and Build Code
Checking out from Git
To check out the code:
git clone http://git-wip-us.apache.org/repos/asf/incubator-nifi.git
Then check out the 'develop' branch:
git checkout develop
Build steps
- You need a recent Java 7 (or newer) JDK.
- You need Apache Maven 3.X.
- Build the nifi nar maven plugin.
  - In the root dir of the source tree cd to nifi-nar-maven-plugin.
  - Run mvn clean install
- Build the main nifi code base.
  - In the root dir of the source tree cd to nifi.
  - Run mvn clean install
Running the application
Running the above build will create a tar.gz (and zip) file in nifi/nifi-assembly/target. This tar.gz should contain the full application. Decompressing the tar.gz should make a directory for you containing several other directories: conf contains application configuration, and bin contains scripts for launching the application. On Linux and OS X, NiFi can be run using bin/nifi.sh <command> where <command> is one of:
- start: starts NiFi in the background
- stop: stops NiFi that is running in the background
- status: provides the current status of NiFi
- run: runs NiFi in the foreground and waits to receive a Ctrl-C, which then shuts down NiFi.
- install (available on Linux only, not OS X): installs NiFi as a service that can then be controlled via service nifi start, service nifi stop, and service nifi status.
For Windows users, there are several scripts in the bin directory that are analogous to those above: start-nifi.bat, stop-nifi.bat, nifi-status.bat, and run-nifi.bat.
The settings used when launching NiFi, such as the Java heap size, the user to run as, and which Java command to use, are configured in the conf/bootstrap.conf file.
The entire concept of how the application will integrate to a given OS and run as an enduring service is something we're working hard on and would appreciate ideas for. The user experience needs to be excellent.
With the default settings you can point a web browser at http://localhost:8080/nifi/
Logging is configured by default to log to ./logs/nifi-app.log.
Create Custom Processor
Create a custom processor class by extending org.apache.nifi.processor.AbstractProcessor, the base class for processor implementations. Create the class in the nifi-standard-processors module (/nifi/nifi-nar-assemblies/nifi-standard-bundle/nifi-standard-processors). Then add a line with your processor's fully-qualified class name to the nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor file. This is how the UI discovers the available processors.
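To make the registration file concrete, here is a minimal sketch of how such a file is read, assuming it follows the standard java.util.ServiceLoader provider-configuration format (one fully-qualified class name per line, '#' starts a comment, blank lines ignored). ServiceFileExample, parseProviderNames, and the com.example package are hypothetical names used only for illustration; they are not part of NiFi.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating the provider-configuration file format; not NiFi code.
class ServiceFileExample {

    // Returns the class names listed in a META-INF/services style file:
    // one fully-qualified name per line, '#' starts a comment, blank lines are ignored.
    static List<String> parseProviderNames(String fileContent) {
        List<String> names = new ArrayList<>();
        for (String line : fileContent.split("\\r?\\n")) {
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash); // drop the comment part
            }
            line = line.trim();
            if (!line.isEmpty()) {
                names.add(line);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // com.example.SendSMS is a placeholder; use your processor's real package.
        String content = "# custom processors\n"
                + "com.example.SendSMS\n";
        System.out.println(parseProviderNames(content));
    }
}
```

In practice you only add the single line naming your processor class; the sketch just shows why a mistyped or missing line means the processor never appears in the UI.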
Custom SMS processor
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
@TriggerWhenEmpty
@Tags({"sms", "text"})
@CapabilityDescription("Sends a text message to the recipient cell phone number specified in the input file.")
public class SendSMS extends AbstractProcessor {

    public static final Relationship REL_SMS_SEND = new Relationship.Builder().name("smssend")
            .description("The original message is routed here after the SMS has been sent to the recipient cell phone number.").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
            .description("If the SMS cannot be sent for some reason, the original message will be routed to this destination").build();

    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        // This processor has no configurable properties.
        final List<PropertyDescriptor> properties = new ArrayList<>();
        this.properties = Collections.unmodifiableList(properties);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SMS_SEND);
        relationships.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        final ProcessorLog logger = getLogger();
        final Properties prop = new Properties();
        // Single-element array so the anonymous callback can report the outcome.
        final boolean[] sent = {false};
        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(final InputStream rawIn) throws IOException {
                // The FlowFile content is expected to be in Java properties format.
                prop.load(rawIn);
                final String contactNumber = prop.getProperty("contact_number");
                final String smsText = prop.getProperty("message_text");
                if (contactNumber == null || smsText == null) {
                    logger.error("Input must define contact_number and message_text");
                    return;
                }
                logger.info("contact number: " + contactNumber);
                logger.info("text: " + smsText);
                try {
                    logger.info("gateway response: " + sendSMS(contactNumber, smsText));
                    sent[0] = true;
                } catch (Exception e) {
                    logger.error("Failed to send SMS to " + contactNumber, e);
                }
            }
        });
        // Route to failure when sending did not succeed instead of always reporting success.
        session.transfer(flowFile, sent[0] ? REL_SMS_SEND : REL_FAILURE);
    }

    private String sendSMS(String contactNumber, String smsText) throws Exception {
        HttpClient client = HttpClientBuilder.create().build();
        HttpPost post = new HttpPost("http://smsgateway.ca/services/message.svc/XAO706fy87/" + contactNumber);
        // Escape backslashes and double quotes so the message text cannot break the JSON body.
        String escaped = smsText.replace("\\", "\\\\").replace("\"", "\\\"");
        String input = "{\"MessageBody\":\"" + escaped + "\"}";
        StringEntity params = new StringEntity(input, "UTF-8");
        params.setContentType("application/json; charset=UTF-8");
        post.setEntity(params);
        HttpResponse response = client.execute(post);
        StringBuilder result = new StringBuilder();
        // try-with-resources closes the response stream once it has been read.
        try (BufferedReader rd = new BufferedReader(
                new InputStreamReader(response.getEntity().getContent(), "UTF-8"))) {
            String line;
            while ((line = rd.readLine()) != null) {
                result.append(line);
            }
        }
        return result.toString();
    }
}
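The processor reads its input as a Java properties file with the keys contact_number and message_text, then wraps the text in a JSON body with a MessageBody field. The sketch below shows that input format and payload outside of NiFi, so it can be tried without a running flow. SmsInputExample, parse, and toJsonBody are hypothetical names introduced for illustration, the phone number and text are placeholders, and the quote escaping is one assumed way to keep the JSON valid.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper, not part of NiFi or of the SendSMS class.
class SmsInputExample {

    // Parses FlowFile content in Java properties format, mirroring prop.load(...) in SendSMS.
    static Properties parse(String content) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(content));
        return props;
    }

    // Builds the JSON request body, escaping backslashes and double quotes in the text.
    static String toJsonBody(String smsText) {
        String escaped = smsText.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"MessageBody\":\"" + escaped + "\"}";
    }

    public static void main(String[] args) throws IOException {
        // Placeholder input; a real FlowFile would carry the same two keys.
        String content = "contact_number=15555550100\n"
                + "message_text=Build \"42\" finished\n";
        Properties props = parse(content);
        System.out.println(props.getProperty("contact_number"));
        System.out.println(toJsonBody(props.getProperty("message_text")));
    }
}
```

A file with these two lines, dropped into a directory watched by an upstream processor such as GetFile, would be a suitable test input for SendSMS.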