Apache NIFI Custom Processor Design

Introduction

 

Apache NiFi (Incubating) is a dataflow system based on the concepts of flow-based programming. It supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic. NiFi has a web-based user interface for design, control, feedback, and monitoring of dataflows. It is highly configurable along several dimensions of quality of service, such as loss-tolerant versus guaranteed delivery, low latency versus high throughput, and priority-based queuing. NiFi provides fine-grained data provenance for all data received, forked, joined cloned, modified, sent, and ultimately dropped upon reaching its configured end-state.

Download and Build Code

Checking out from Git

To check out the code:

git clone http://git-wip-us.apache.org/repos/asf/incubator-nifi.git

Then checkout the 'develop' branch

git checkout develop

Build steps

  1. You need a recent Java 7 (or newer) JDK.
  2. You need Apache Maven 3.X.
  3. Build the nifi nar maven plugin.
    • In the root dir of the source tree cd to nifi-nar-maven-plugin.
    • Run mvn clean install
  4. Build the main nifi code base.
    • In the root dir of the source tree cd to nifi
    • run mvn clean install

Running the application

Running the above build will create a tar.gz (and zip) file in nifi/nifi-assembly/target. This tar.gz should contain the full application. Decompressing the tar.gz should make a directory for you containing several other directories. conf contains application configuration, bin contains scripts for launching the application. On linux and OSX, NiFi can be run using bin/nifi.sh <command> where <command> is one of:

  • start: starts NiFi in the background
  • stop: stops NiFi that is running in the background
  • status: provides the current status of NiFi
  • run: runs NiFi in the foreground and waits to receive a Ctrl-C, which then shuts down NiFi.
  • install: (available in Linux only, not OSX): installs NiFi as a service that can then be controlled via service nifi start, service nifi stop, service nifi status.

For Windows users, there exist several scripts in the bin directory that are analogous to those above: start-nifi.bat, stop-nifi.bat, nifi-status.bat, and run-nifi.bat.

The configuration that is to be used when launching NiFi, such as Java heap size, the user to run as, which Java command to use, etc. are configurable via the conf/bootstrap.conf file.

The entire concept of how the application will integrate to a given OS and run as an enduring service is something we're working hard on and would appreciate ideas for. The user experience needs to be excellent.

With the default settings you can point a web browser at http://localhost:8080/nifi/

Logging is configured by default to log to ./logs/nifi-app.log.

 

Create Custom Processor

Create a custom processor class by extending org.apache.nifi.processor.AbstractProcessor. AbstractProcessor is the base class for all processor implementations. Create custom processor in nifi-standard-processors module (/nifi/nifi-nar-assemblies/nifi-standard-bundle/nifi-standard-processors) project and then, in the nifi-standard-processors/src/main/resources/META-INF/org.apache.nifi.processor.Processor file, you will need to add a line with your fully-qualified Processor name. This is how the UI discovers the available processors.

 

Custom SMS processor

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
 
@TriggerWhenEmpty
@Tags({"sms", "text"})
@CapabilityDescription("Sends text message to specified recipient cell phone number in input file.")
public class SendSMS extends AbstractProcessor {

    public static final Relationship REL_SMS_SEND = new Relationship.Builder().name("smssend").description("The orginal text message send to recipient cell phone number.").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If sms can not send for some reason, the original message will be routed to this destination").build();

    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> properties = new ArrayList<>();
         
        this.properties = Collections.unmodifiableList(properties);
        
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SMS_SEND);
        relationships.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final ProcessorLog logger = getLogger();
        final Properties prop = new Properties();
        System.out.println("trigger:");      
        logger.info("onTrigger");
        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(final InputStream rawIn) throws IOException {
                // load a properties file
                prop.load(rawIn);
                String contactNumber = prop.getProperty("contact_number");
                String smsText = prop.getProperty("message_text");
                
                logger.info("cotact number: "+contactNumber);
                logger.info("text: "+smsText);
                
                try {
                    logger.info("result***********: "+sendSMS(contactNumber,smsText));
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                return;
            }
        });

         
        session.transfer(flowFile, REL_SMS_SEND);
         
    }
    
    private String sendSMS(String contactNumber, String smsText) throws Exception{
        
        HttpClient client = HttpClientBuilder.create().build();
        HttpPost post = new HttpPost("http://smsgateway.ca/services/message.svc/XAO706fy87/"+contactNumber);
        String input = "{\"MessageBody\":\""+smsText+"\"}";
        StringEntity params = new StringEntity(input,"UTF-8");
        params.setContentType("application/json; charset=UTF-8");
        post.setEntity(params);
        
        HttpResponse response = client.execute(post);
        BufferedReader rd = new BufferedReader(
                new InputStreamReader(response.getEntity().getContent()));
        
        StringBuffer result = new StringBuffer();
        String line = "";
        while ((line = rd.readLine()) != null) {
            result.append(line);
        }
        return result.toString();
    }
}