ExecuteScript讓NiFi能夠執行腳原本完成數據流程任務,從而能夠編寫本身的任務節點而不只僅是採用已有的任務節點,具備很強的靈活性。html
本文是介紹使用ExecuteScript來完成任務的系列文章之一。例程包括 Groovy, Jython, Javascript (Nashorn), 以及 JRuby. 系列文章中的「菜譜」 包括:java
Part 1 - 介紹 NiFi API 和 FlowFilesapache
Part 2 - FlowFile I/O 和 Error Handlingruby
Part 3 - 高級特徵session
NiFi 的 Flow files 由兩個主要部件組成:attributes 和 content. Attributes 是關於 content / flow file的元數據, 咱們在 Part 1 看到了如何使用 ExecuteScript 來操縱這個屬性. flow file 的內容, 核心是一個 bytes集合,沒有繼承的 structure, schema, format, 等等. 不一樣的 NiFi processors 假定輸入的 flow files 具備特定的 schema/format (或者從 attributes肯定如 "mime.type" 或者經過其餘的方法). 這些 processors 而後按照假定的格式對內容進行處理 (將返回 "failure" 到relationship,若是不是的話). 常常 processors 將輸出 flow files 以特定的格式, 這在 processors' 描述中有相應的說明( NiFi documentation).app
flow files 的 Input 和 Output (I/O) 經過 ProcessSession API 提供,經過 ExecuteScript (查看 Part 1 獲得更多的信息) 的"session" 變量來訪問。一個機制是傳遞一個 callback 對象到session.read() 或 session.write()的調用。對於FlowFile將建立一個 InputStream 和/或 OutputStream, 這個callback 對象將被激活,使用相應的 callback 接口, 而後這個InputStream 和/或 OutputStream 的引用被傳遞到 callback函數使用. 這裏有三個 callback 接口, 每個有本身的應用環境:函數
InputStreamCallbackspa
這個 interface 用在 session.read( flowFile, inputStreamCallback) 方法中,提供一個 InputStream,用於讀取 flow file的內容. 該 interface 有一個單一方法:.net
void process(InputStream in) throws IOException
該 interface 提供一個被管理的 input stream. 這個input stream自動打開和關閉,也能夠手動關閉. 這是從 flow file讀取的方法, 而且不能被寫回去。日誌
一個例子就是當你但願處理一個輸入 flow file, 可是建立了多個輸出output flow files, 好比 SplitText processor 那樣.
OutputStreamCallback
該 interface 被用於session.write( flowFile, outputStreamCallback) 方法,提供 OutputStream寫入內容到 flow file. 該 interface 具備單一的方法:
void process(OutputStream out) throws IOException
該 interface 提供被管理的 output stream. 這個output stream 被自動打開和關閉,也能夠手動關閉。 - 重要的一點是,若是任何 streams 包裝了這個 streams,全部打開的資源應該被清理.
例如, 在ExecuteScript中被建立數據 , 來自於外部文件, 而不是一個 flow file. 而後你可使用 session.create() 去建立一個新的FlowFile, 而後 session.write( flowFile, outputStreamCallback) 用於添加內容.
StreamCallback
該 interface 用於 session.write( flowFile, streamCallback) 方法,提供 InputStream 和 OutputStream,爲 flow file提供內容的讀取和寫入. 該 interface 有一個單一的方法:
void process(InputStream in, OutputStream out) throws IOException
該 interface 提供被管理的 output stream. 這個output stream 被自動打開和關閉,也能夠手動關閉。 - 重要的一點是,若是任何 streams 包裝了這個 streams,全部打開的資源應該被清理.
例如,你想處理一個傳入 flow file 而且使用新的內容覆蓋, 如 EncryptContent processor 那樣.
由於這些 callbacks 是 Java objects, 腳本將建立一個而且傳入 session 方法, 下面的方法將使用不一樣的腳本語言進行演示. 而且,這裏還有其餘的讀寫 flow files方法, 包括:
嗯,下面是一些具體的方法:
Use Case: 具備一個傳入鏈接執行 ExecuteScript ,而且從隊列中獲得 flow file 的內容進行處理.
Approach: 使用session的read(flowFile, inputStreamCallback) 方法。一個 InputStreamCallback 對象須要被傳入 read() 方法. 注意到,由於InputStreamCallback 是一個對象, 內容只在該對象中可見。 若是你須要在 read() 方法以外訪問, 須要使用更爲全局化的變量. 這裏的例子講來自flow file的所有內容存儲到 String (使用 Apache Commons' IOUtils class).
注意: 對於大的 flow files, 這並非最好的技術方法; 應該只讀取須要的數據,並按照適應的方法處理。好比 SplitText, 你應該一次讀一行而且在 InputStreamCallback中處理, 或者 session.read(flowFile) 方法 獲得 InputStream 的引用,從而在 callback以外處理.
Examples:
Groovy
import org.apache.commons.io.IOUtils import java.nio.charset.StandardCharsets flowFile = session.get() if(!flowFile)return def text = '' // Cast a closure with an inputStream parameter to InputStreamCallback session.read(flowFile, {inputStream -> text = IOUtils.toString(inputStream, StandardCharsets.UTF_8) // Do something with text here } as InputStreamCallback)
Jython
from org.apache.commons.io import IOUtils from java.nio.charset import StandardCharsets from org.apache.nifi.processor.io import InputStreamCallback # Define a subclass of InputStreamCallback for use in session.read() class PyInputStreamCallback(InputStreamCallback): def __init__(self): pass def process(self, inputStream): text = IOUtils.toString(inputStream, StandardCharsets.UTF_8) # Do something with text here # end class flowFile = session.get() if(flowFile != None): session.read(flowFile, PyInputStreamCallback()) # implicit return at the end
Javascript
var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback") var IOUtils = Java.type("org.apache.commons.io.IOUtils") var StandardCharsets = Java.type("java.nio.charset.StandardCharsets") var flowFile = session.get(); if(flowFile != null) { // Create a new InputStreamCallback, passing in a function to define the interface method session.read(flowFile,new InputStreamCallback(function(inputStream) { var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8); // Do something with text here })); }
JRuby
java_import org.apache.commons.io.IOUtils java_import org.apache.nifi.processor.io.InputStreamCallback # Define a subclass of InputStreamCallback for use in session.read() class JRubyInputStreamCallback include InputStreamCallback def process(inputStream) text = IOUtils.toString(inputStream) # Do something with text here end end jrubyInputStreamCallback = JRubyInputStreamCallback.new flowFile = session.get() if flowFile != nil session.read(flowFile, jrubyInputStreamCallback) end
Use Case: 爲輸出的 flow file建立內容.
Approach: 使用session的write(flowFile, outputStreamCallback) 方法。一個OutputStreamCallback 對象須要傳遞給 write() 方法. 注意,由於 OutputStreamCallback 是一個對象, 所以內容之災對象內部可見. 若是你須要在 write() 方法以外訪問, 使用更爲全局化變量. 西面的例子寫入 String 到 flowFile.
Examples:
Groovy
import org.apache.commons.io.IOUtils import java.nio.charset.StandardCharsets flowFile = session.get() if(!flowFile) return def text = 'Hello world!' // Cast a closure with an outputStream parameter to OutputStreamCallback flowFile = session.write(flowFile, {outputStream -> outputStream.write(text.getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback)
Jython
from org.apache.commons.io import IOUtils from java.nio.charset import StandardCharsets from org.apache.nifi.processor.io import OutputStreamCallback # Define a subclass of OutputStreamCallback for use in session.write() class PyOutputStreamCallback(OutputStreamCallback): def __init__(self): pass def process(self, outputStream): outputStream.write(bytearray('Hello World!'.encode('utf-8'))) # end class flowFile = session.get() if(flowFile != None): flowFile = session.write(flowFile, PyOutputStreamCallback()) # implicit return at the end
Javascript
var OutputStreamCallback = Java.type("org.apache.nifi.processor.io.OutputStreamCallback"); var IOUtils = Java.type("org.apache.commons.io.IOUtils"); var StandardCharsets = Java.type("java.nio.charset.StandardCharsets"); var flowFile = session.get(); if(flowFile != null) { // Create a new OutputStreamCallback, passing in a function to define the interface method flowFile = session.write(flowFile,new OutputStreamCallback(function(outputStream) { outputStream.write("Hello World!".getBytes(StandardCharsets.UTF_8)) })); }
JRuby
java_import org.apache.commons.io.IOUtils java_import java.nio.charset.StandardCharsets java_import org.apache.nifi.processor.io.OutputStreamCallback # Define a subclass of OutputStreamCallback for use in session.write() class JRubyOutputStreamCallback include OutputStreamCallback def process(outputStream) outputStream.write("Hello World!".to_java.getBytes(StandardCharsets::UTF_8)) end end jrubyOutputStreamCallback = JRubyOutputStreamCallback.new flowFile = session.get() if flowFile != nil flowFile = session.write(flowFile, jrubyOutputStreamCallback) end
Use Case: 重用輸入 flow file可是但願修改內容並傳遞到輸出的 flow file.
Approach: 使用session的write(flowFile, streamCallback) 方法。一個StreamCallback 對象須要傳遞給 write() 方法. StreamCallback 同時提供了InputStream (從輸入的 flow file) 和 outputStream (下一版本的 flow file), 所以你可使用InputStream去取得 flow file的當前內容, 而後修改他們而且寫會到 flow file. 這將覆蓋 flow file 的內容, 所以對於追加內容要採用讀入內容添加的方式, 或者使用不一樣的方法 (使用 session.append() 而不是session.write() ).
注意,由於 StreamCallback 是一個對象, 所以內容之災對象內部可見. 若是你須要在 write() 方法以外訪問, 使用更爲全局化變量.
這個例子將反轉輸入flowFile (假定爲 String) 的內容,並將反轉後的字符串寫入到新版的 flowFile.
Examples:
Groovy
import org.apache.commons.io.IOUtils import java.nio.charset.StandardCharsets flowFile = session.get() if(!flowFile) return def text = 'Hello world!' // Cast a closure with an inputStream and outputStream parameter to StreamCallback flowFile = session.write(flowFile, {inputStream, outputStream -> text = IOUtils.toString(inputStream, StandardCharsets.UTF_8) outputStream.write(text.reverse().getBytes(StandardCharsets.UTF_8)) } as StreamCallback) session.transfer(flowFile, REL_SUCCESS)
Jython
from org.apache.commons.io import IOUtils from java.nio.charset import StandardCharsets from org.apache.nifi.processor.io import StreamCallback # Define a subclass of StreamCallback for use in session.write() class PyStreamCallback(StreamCallback): def __init__(self): pass def process(self, inputStream, outputStream): text = IOUtils.toString(inputStream, StandardCharsets.UTF_8) outputStream.write(bytearray('Hello World!'[::-1].encode('utf-8'))) # end class flowFile = session.get() if(flowFile != None): flowFile = session.write(flowFile, PyStreamCallback()) # implicit return at the end
Javascript
var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback"); var IOUtils = Java.type("org.apache.commons.io.IOUtils"); var StandardCharsets = Java.type("java.nio.charset.StandardCharsets"); var flowFile = session.get(); if(flowFile != null) { // Create a new StreamCallback, passing in a function to define the interface method flowFile = session.write(flowFile,new StreamCallback(function(inputStream, outputStream) { var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8) outputStream.write(text.split("").reverse().join("").getBytes(StandardCharsets.UTF_8)) })); }
JRuby
java_import org.apache.commons.io.IOUtils java_import java.nio.charset.StandardCharsets java_import org.apache.nifi.processor.io.StreamCallback # Define a subclass of StreamCallback for use in session.write() class JRubyStreamCallback include StreamCallback def process(inputStream, outputStream) text = IOUtils.toString(inputStream) outputStream.write((text.reverse!).to_java.getBytes(StandardCharsets::UTF_8)) end end jrubyStreamCallback = JRubyStreamCallback.new flowFile = session.get() if flowFile != nil flowFile = session.write(flowFile, jrubyStreamCallback) end
Use Case: 在 script ( data validation 或者出現一個 exception)運行時出現錯誤, 而且你但願可以優雅滴處理.
Approach: 對於exceptions, 使用腳本語言的exception-handling 機制 (通常是try/catch 代碼塊). 對於 data validation, 可使用相似的方法, 可是定義一個boolean 變量,如 "valid" 以及 if/else 語句,而不是try/catch 語句. ExecuteScript 定義了 "success" and "failure" relationships; 通常狀況下,你的處理將轉移 "good" flow files 到 success,而 "bad" flow files 到 failure (記錄錯誤在後續的操做中).
Examples:
Groovy
flowFile = session.get() if(!flowFile) return try { // Something that might throw an exception here // Last operation is transfer to success (failures handled in the catch block) session.transfer(flowFile, REL_SUCCESS) } catch(e) { log.error('Something went wrong', e) session.transfer(flowFile, REL_FAILURE) }
Jython
flowFile = session.get() if(flowFile != None): try: # Something that might throw an exception here # Last operation is transfer to success (failures handled in the catch block) session.transfer(flowFile, REL_SUCCESS) except: log.error('Something went wrong', e) session.transfer(flowFile, REL_FAILURE) # implicit return at the end
Javascript
var flowFile = session.get(); if(flowFile != null) { try { // Something that might throw an exception here // Last operation is transfer to success (failures handled in the catch block) session.transfer(flowFile, REL_SUCCESS) } catch(e) { log.error('Something went wrong', e) session.transfer(flowFile, REL_FAILURE) } }
JRuby
flowFile = session.get() if flowFile != nil begin # Something that might raise an exception here # Last operation is transfer to success (failures handled in the rescue block) session.transfer(flowFile, REL_SUCCESS) rescue Exception => e log.error('Something went wrong', e) session.transfer(flowFile, REL_FAILURE) end end
但願這裏介紹的基本的FlowFile I/O 和 錯誤處理有用, 歡迎任何的建議和改進! 在下一篇文章中,我將討論一些高級的特徵,如動態屬性, 模塊, 狀態管理, 和 Controller Services的存取. 但願對你有用!
英:https://community.hortonworks.com/articles/75545/executescript-cookbook-part-2.html