Apache NiFi 2.0.0: Building Python Processors

Apache NiFi, a robust platform for dataflow management, offers many features that make data handling more efficient and flexible. Its web-based user interface provides a seamless experience for designing, controlling and monitoring dataflows.
NiFi supports the building of custom processors and extensions, enabling users to tailor the platform to their specific needs.
With a multitenant user experience, NiFi ensures that multiple users can interact with the system concurrently, each with their own set of access privileges.
Python processors offer a powerful means to extend NiFi’s functionality, enabling users to leverage the rich ecosystem of Python libraries and tools within their dataflows. Here we’ll discuss the advantages of incorporating Python into NiFi workflows and explore practical use cases where Python processors can streamline data processing tasks, enhance flexibility and accelerate development.
Whether you’re aiming to integrate machine learning algorithms, perform custom data transformations or interact with external systems, building Python processors in Apache NiFi can help you meet those data integration needs.
What’s Apache NiFi Good for?
One of NiFi’s standout features is its highly configurable nature, allowing users to tailor data routing, transformation and system mediation logic to their specific requirements. NiFi lets users choose the trade-offs that fit their data processing goals, such as loss tolerance versus guaranteed delivery, or low latency versus high throughput.
Dynamic prioritization enables real-time adjustment of data priorities within the flow, while the ability to modify flows at runtime adds a layer of flexibility to adapt to changing requirements. NiFi also incorporates back pressure mechanisms to regulate data flow rates and prevent overload, ensuring smooth and efficient operation even under varying workloads.
NiFi is designed to support both vertical and horizontal scaling. Whether scaling up to use the full capacity of a single machine or scaling out with a zero-leader clustering model, NiFi can grow to handle large data processing workloads.
Data provenance is another critical feature, allowing users to track the journey of data from its inception to its final destination. This provides invaluable insights for auditing, troubleshooting and ensuring data integrity throughout the process.
Security is paramount in NiFi, with support for SSL, SSH, HTTPS and encrypted content, among other security measures. Pluggable fine-grained role-based authentication and authorization mechanisms ensure that access to dataflows is carefully controlled, allowing multiple teams to manage and share specific portions of the flow securely.
NiFi’s design philosophy, inspired by concepts like flow-based programming and staged event-driven architecture, offers several compelling advantages:
- Intuitive visual interface for designing and managing data flows, enhancing productivity and ease of use.
- Asynchronous processing model, supporting high throughput and natural buffering to accommodate fluctuating workloads.
- Built-in concurrency management, abstracting away the complexities of multithreaded programming.
- Emphasis on component reusability and testability, promoting a modular and robust design approach.
- Native support for back-pressure and error handling, ensuring robustness and reliability in data processing pipelines.
- Comprehensive visibility into data flow dynamics, enabling effective monitoring and troubleshooting.
Why Build with Python in Apache NiFi?
Apache NiFi is a powerful tool for data ingestion, transformation and routing. Python processors in NiFi provide a flexible way to extend its capabilities, particularly for processing unstructured data or integrating with external systems like AI models or vector stores such as the cloud native vector database Milvus.
When dealing with unstructured file types that tools like Cloudera Data Flow can ingest, Python processors can be invaluable for implementing custom logic to parse and manipulate the data. For example, you might use Python to extract specific information from text documents, perform sentiment analysis on text data or preprocess images before further analysis.
Structured file types, on the other hand, can often be processed using NiFi’s built-in processors without the need for custom Python code. NiFi provides a wide range of processors for handling structured data formats like CSV, JSON, Avro, etc., as well as for interacting with databases, APIs, and other enterprise systems.
When you need to interface with AI models or other external systems like Milvus, Python processors offer a convenient way to integrate this functionality into your NiFi dataflows. For tasks such as text-to-text, text-to-image or text-to-speech processing, you can write Python code to interact with the relevant models or services and incorporate this processing into your NiFi pipelines.
Python: A New Era in NiFi 2.0.0
Apache NiFi 2.0.0 has introduced some significant improvements to the platform, especially in terms of Python integration and performance enhancements. The ability to seamlessly integrate Python scripts into NiFi dataflows opens up a wide range of possibilities for working with diverse data sources and leveraging the power of generative AI.
Before this version, Python could be used in NiFi mainly through the ExecuteScript processor, which ran Jython (a Python 2.7 implementation on the JVM that cannot load native extension libraries), or by calling external scripts with ExecuteStreamCommand. With the latest version, processors can be written natively in Python, so Python code runs much more seamlessly within NiFi pipelines and can draw on the wider Python library ecosystem.
Additionally, NiFi 2.0.0 runs on JDK 21 or later, whose performance improvements make NiFi faster and more efficient, particularly for multithreaded tasks. This can significantly enhance the scalability and responsiveness of NiFi dataflows, especially when dealing with large volumes of data or complex processing tasks.
The introduction of features like Run Process Group as Stateless and Rules Engine for Development Assistance further enhances the capabilities and usability of NiFi, providing developers with more flexibility and tools to build robust dataflow pipelines.
A Sample Processor: Watson SDK to Foundation AI Model
The sample is a NiFi processor written in Python, called CallWatsonXAI, that interacts with IBM WatsonX AI services to generate responses based on input prompts. Note that with NiFi 2.0.0, Python 3.10+ is the minimum requirement.
Let’s break down the code and explain each part.
Imports
These are the necessary imports for the script:
- json and re are Python’s built-in modules for handling JSON data and regular expressions, respectively.
- FlowFileTransform and FlowFileTransformResult are classes from NiFi’s Python API module nifiapi.flowfiletransform, used to build processors that transform FlowFiles.
- PropertyDescriptor, StandardValidators and ExpressionLanguageScope are classes from another NiFi API module, nifiapi.properties, used for defining processor properties.
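As a minimal sketch of that import section (not the original script), the lines might look like this. The nifiapi package is supplied by NiFi’s Python framework, so the sketch guards it with try/except; the NIFI_API_AVAILABLE flag is a hypothetical name added only so the file can also be loaded by a plain interpreter, for example in a linter or unit test.

```python
import json  # encode the model response as JSON
import re    # regular expressions, e.g. for cleaning up generated text (assumed use)

try:
    # NiFi's Python API; importable only inside NiFi's Python environment
    from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
    from nifiapi.properties import (
        ExpressionLanguageScope,
        PropertyDescriptor,
        StandardValidators,
    )
    NIFI_API_AVAILABLE = True
except ImportError:
    # Running outside NiFi: the processor classes cannot be built here
    NIFI_API_AVAILABLE = False
```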
Class Definition
- This defines a class called CallWatsonXAI that extends the FlowFileTransform class, NiFi’s base class for Python processors that transform the content and attributes of a FlowFile.
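A sketch of the class declaration follows. The nested Java class with its implements list is how NiFi’s Python processors declare the Java interface they implement; the fallback FlowFileTransform class is a hypothetical stand-in so the snippet also loads outside NiFi.

```python
try:
    from nifiapi.flowfiletransform import FlowFileTransform
except ImportError:
    # Hypothetical stand-in so the sketch can be loaded outside NiFi
    class FlowFileTransform:
        pass


class CallWatsonXAI(FlowFileTransform):
    # NiFi's Python bridge reads this nested class to learn which
    # Java interface the Python object implements
    class Java:
        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]
```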
Processor Details
- Defines details about the processor such as version, description and tags. Note that, at the time of writing, the current version is 2.0.0-M2.
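The nested ProcessorDetails class might look like the following. The description and tags are illustrative assumptions, and the dependencies list, which asks NiFi to install the named pip packages for this processor, assumes the ibm-watson-machine-learning package; the original script may declare these differently.

```python
class ProcessorDetails:
    # Version string reported for this processor
    version = "2.0.0-M2"
    # Summary shown in the processor's documentation (illustrative text)
    description = "Sends a prompt to an IBM WatsonX AI foundation model and returns the response."
    # Keywords that help users find the processor (illustrative values)
    tags = ["watsonx", "ibm", "ai", "llm", "generative ai"]
    # pip packages NiFi should install for this processor (assumed package name)
    dependencies = ["ibm-watson-machine-learning"]
```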
Property Descriptors
- Defines properties that can be set for the processor. In this case, these are PROMPT_TEXT, WATSONXAI_API_KEY and WATSONXAI_PROJECT_ID.
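The three properties could be declared roughly as follows. The display names, descriptions and validator choices are assumptions, and in the real processor these descriptors are class attributes of CallWatsonXAI; they are shown at module level here for brevity. The except branch defines hypothetical stand-ins so the snippet runs without NiFi.

```python
from types import SimpleNamespace

try:
    from nifiapi.properties import ExpressionLanguageScope, PropertyDescriptor, StandardValidators
except ImportError:
    # Hypothetical stand-ins so the sketch runs outside NiFi
    class PropertyDescriptor:
        def __init__(self, name, description, required=False, sensitive=False, **options):
            self.name = name
            self.description = description
            self.required = required
            self.sensitive = sensitive
            self.options = options

    StandardValidators = SimpleNamespace(NON_EMPTY_VALIDATOR="NON_EMPTY_VALIDATOR")
    ExpressionLanguageScope = SimpleNamespace(FLOWFILE_ATTRIBUTES="FLOWFILE_ATTRIBUTES")

# The prompt may reference FlowFile attributes via Expression Language, e.g. ${filename}
PROMPT_TEXT = PropertyDescriptor(
    name="Prompt Text",
    description="The prompt sent to the WatsonX model",
    required=True,
    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
    expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
)

# Marked sensitive so NiFi encrypts the value and does not display it
WATSONXAI_API_KEY = PropertyDescriptor(
    name="WatsonX AI API Key",
    description="IBM Cloud API key used to authenticate with WatsonX AI",
    required=True,
    sensitive=True,
    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
)

WATSONXAI_PROJECT_ID = PropertyDescriptor(
    name="WatsonX AI Project ID",
    description="ID of the WatsonX AI project that hosts the model",
    required=True,
    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
)
```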
Constructor
- Initializes the processor class and appends property descriptors to the list of properties.
getPropertyDescriptors Method
- Processors implement this method so that NiFi can retrieve their list of supported properties.
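A sketch of the constructor and getPropertyDescriptors pair is below. Plain strings stand in for the PropertyDescriptor objects so the snippet is self-contained, and the fallback base class is a hypothetical stand-in for use outside NiFi.

```python
try:
    from nifiapi.flowfiletransform import FlowFileTransform
except ImportError:
    # Hypothetical stand-in so the sketch runs outside NiFi
    class FlowFileTransform:
        pass


class CallWatsonXAI(FlowFileTransform):
    # Plain strings stand in for the PropertyDescriptor objects
    PROMPT_TEXT = "Prompt Text"
    WATSONXAI_API_KEY = "WatsonX AI API Key"
    WATSONXAI_PROJECT_ID = "WatsonX AI Project ID"

    def __init__(self, **kwargs):
        super().__init__()
        # Collect the properties NiFi should expose in the processor's configuration
        self.descriptors = []
        self.descriptors.append(self.PROMPT_TEXT)
        self.descriptors.append(self.WATSONXAI_API_KEY)
        self.descriptors.append(self.WATSONXAI_PROJECT_ID)

    def getPropertyDescriptors(self):
        # NiFi calls this to discover which properties the processor supports
        return self.descriptors

    # transform(self, context, flowfile) would also be defined in the full processor
```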
transform Method
- This is the main method responsible for processing data. It receives a context object containing information about the processor’s execution environment and a flowfile object containing the data to be processed.
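Inside transform, property values are read through the context: context.getProperty(descriptor) returns a value object, evaluateAttributeExpressions(flowfile) resolves Expression Language against the FlowFile’s attributes, and getValue() returns the resulting string. Because a real context exists only inside NiFi, the sketch below uses two hypothetical test doubles, FakeContext and FakePropertyValue, that imitate just those calls; read_prompt is likewise an illustrative helper name.

```python
class FakePropertyValue:
    """Hypothetical test double for the value returned by context.getProperty()."""

    def __init__(self, value):
        self.value = value

    def evaluateAttributeExpressions(self, flowfile=None):
        # NiFi would resolve Expression Language such as ${filename} against
        # the FlowFile's attributes; this double returns itself unchanged
        return self

    def getValue(self):
        return self.value


class FakeContext:
    """Hypothetical test double for the context object passed to transform()."""

    def __init__(self, properties):
        self.properties = properties

    def getProperty(self, descriptor):
        return FakePropertyValue(self.properties[descriptor])


def read_prompt(context, flowfile, prompt_descriptor):
    # The call chain a transform() method would use for an EL-enabled property
    return (
        context.getProperty(prompt_descriptor)
        .evaluateAttributeExpressions(flowfile)
        .getValue()
    )
```

For example, read_prompt(FakeContext({"Prompt Text": "Hello"}), None, "Prompt Text") returns "Hello"; inside NiFi the real context and a PropertyDescriptor would be passed instead.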
IBM WatsonX Integration
- Imports IBM Watson Machine Learning modules.
- Gets input values such as prompt text, WatsonX API key, and project ID from NiFi processor properties.
- Configures and calls the IBM WatsonX model to generate a response based on the prompt text.
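The WatsonX call could be sketched with the ibm_watson_machine_learning SDK’s Model class and its generate_text method, as below. The endpoint URL (the us-south region), the greedy decoding setting and the token limits are assumptions, and model_id is left as a parameter because the article does not say which foundation model the original processor uses. The SDK import is deferred into the function so the module loads even where the SDK is not installed.

```python
def build_generation_params(max_new_tokens=200, min_new_tokens=1):
    """Generation parameters for the foundation model (illustrative values)."""
    # Keys assumed to match the SDK's GenTextParamsMetaNames string values
    return {
        "decoding_method": "greedy",
        "max_new_tokens": max_new_tokens,
        "min_new_tokens": min_new_tokens,
    }


def call_watsonx(prompt_text, api_key, project_id, model_id,
                 url="https://us-south.ml.cloud.ibm.com"):
    """Send one prompt to a WatsonX AI foundation model and return the generated text."""
    # Imported lazily so this module loads even where the SDK is not installed
    from ibm_watson_machine_learning.foundation_models import Model

    model = Model(
        model_id=model_id,
        params=build_generation_params(),
        credentials={"url": url, "apikey": api_key},
        project_id=project_id,
    )
    return model.generate_text(prompt=prompt_text)
```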
Output Handling
- Defines output attributes and converts the generated response to JSON format.
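Output handling can be sketched in plain Python. Here the re module collapses whitespace in the generated text and json builds the FlowFile content; both the cleanup step and the attribute name watsonx.prompt are assumptions for illustration, while mime.type is NiFi’s conventional attribute for the content type.

```python
import json
import re


def build_output(prompt_text, generated_text):
    """Return (contents, attributes) for the outgoing FlowFile (sketch)."""
    # Collapse runs of whitespace and newlines in the model output (assumed cleanup step)
    cleaned = re.sub(r"\s+", " ", generated_text).strip()
    # Hypothetical attribute names; NiFi attribute values are strings
    attributes = {
        "mime.type": "application/json",
        "watsonx.prompt": prompt_text,
    }
    contents = json.dumps({"prompt": prompt_text, "response": cleaned})
    return contents, attributes
```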
Logging and Return
- Logs the prompt text.
- Returns the result of the transformation, indicating success and providing the output data and attributes.
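The final step can be sketched as a small helper. finish is a hypothetical name: inside the processor the same two lines would sit at the end of transform and use self.logger. FlowFileTransformResult takes the relationship name plus optional contents and attributes; the fallback class is a stand-in so the snippet runs outside NiFi.

```python
import logging

try:
    from nifiapi.flowfiletransform import FlowFileTransformResult
except ImportError:
    # Hypothetical stand-in so the sketch runs outside NiFi
    class FlowFileTransformResult:
        def __init__(self, relationship, attributes=None, contents=None):
            self.relationship = relationship
            self.attributes = attributes
            self.contents = contents


def finish(logger, prompt_text, contents, attributes):
    # Inside a processor this would be self.logger.info(...)
    logger.info("Prompt: " + prompt_text)
    # Route the FlowFile to the 'success' relationship with new content and attributes
    return FlowFileTransformResult(relationship="success", contents=contents, attributes=attributes)
```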
Pre-Packaged Python Processors
NiFi 2.0.0 comes with a diverse set of Python processors that offer a wide range of functionalities.
- VectorDB Interface for Pinecone: This processor facilitates interaction with Pinecone, a vector database service, allowing users to query and store data efficiently.
- ChunkDocument: This processor breaks down large documents into smaller chunks, making them suitable for processing and storage, especially in vector databases where size constraints may apply.
- ParseDocument: This versatile processor parses various document formats, such as Markdown, PowerPoint, Google Docs and Excel, and extracts their text content for further processing or storage.
- ConvertCSVtoExcel: As the name suggests, this processor converts data from CSV format to Excel format, providing flexibility in data interchange and processing.
- DetectObjectInImage: This processor uses deep learning for object detection within images, enabling users to analyze image data and extract valuable insights.
- PromptChatGPT: This processor integrates with OpenAI’s ChatGPT, allowing users to generate responses or engage in conversations based on prompts.
- PutChroma and QueryChroma: These processors work with Chroma, an open source embedding database used to build applications on large language models (LLMs). PutChroma stores data in a Chroma database, and QueryChroma retrieves and queries it.
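To illustrate the idea behind chunking, here is a simple fixed-size character chunker with overlap. It is not ChunkDocument’s actual algorithm, which may split on document structure or tokens instead; chunk_text and its default sizes are hypothetical. Overlapping consecutive chunks keeps text that straddles a boundary available in both chunks, which can help retrieval from a vector database.

```python
def chunk_text(text, chunk_size=500, overlap=50):
    """Split text into fixed-size character chunks that overlap by `overlap` characters."""
    if chunk_size <= 0 or not 0 <= overlap < chunk_size:
        raise ValueError("require chunk_size > 0 and 0 <= overlap < chunk_size")
    step = chunk_size - overlap  # how far each new chunk's start moves forward
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once a chunk reaches the end of the text
        if start + chunk_size >= len(text):
            break
    return chunks
```

For example, chunk_text("abcdefghij", chunk_size=4, overlap=1) yields ["abcd", "defg", "ghij"], where each chunk repeats the last character of the previous one.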
Conclusion
Prioritizing Python integration in Apache NiFi marks a significant milestone in bridging the gap between data engineers and data scientists while expanding the platform’s versatility and applicability.
Letting Python developers build NiFi components directly in Python streamlines development cycles and accelerates the implementation of data pipelines and workflows.
It’s an exciting time for Python processors in NiFi, and contributing to the ecosystem can be immensely valuable. Developing and sharing Python processors can extend NiFi’s capabilities and address specific use cases.
To get started with NiFi, users can refer to the quickstart guide for development and the NiFi Developer’s Guide for more comprehensive information on contributing to the project.