NiFi 腳本執行器使用指南 (part 2)

NiFi 腳本執行器使用指南 (part 2)

簡介

ExecuteScript讓NiFi能夠執行腳原本完成數據流程任務,從而能夠編寫本身的任務節點而不只僅是採用已有的任務節點,具備很強的靈活性。html

本文是介紹使用ExecuteScript來完成任務的系列文章之一。例程包括 Groovy, Jython, Javascript (Nashorn), 以及 JRuby. 系列文章中的「菜譜」 包括:java

Part 1 - 介紹 NiFi API 和 FlowFilesapache

  • 從incoming queue獲得flow file
  • 建立一個新的 flow files
  • 與 flow file attributes一塊兒工做
  • 轉換 flow file
  • 日誌 Logging

Part 2 - FlowFile I/O 和 Error Handlingruby

  • 從 flow file 中讀取
  • 寫入 flow file
  • 讀/寫 flow file
  • Error Handling

Part 3 - 高級特徵session

  • 使用動態屬性
  • 添加模塊
  • 狀態管理
  • 存取控制器服務

FlowFile I/O簡介

NiFi 的 Flow files 由兩個主要部件組成:attributes 和 content. Attributes 是關於 content / flow file的元數據, 咱們在 Part 1 看到了如何使用 ExecuteScript 來操縱這個屬性. flow file 的內容, 核心是一個 bytes集合,沒有繼承的 structure, schema, format, 等等. 不一樣的 NiFi processors 假定輸入的 flow files 具備特定的 schema/format (或者從 attributes肯定如 "mime.type" 或者經過其餘的方法). 這些 processors 而後按照假定的格式對內容進行處理 (將返回 "failure" 到relationship,若是不是的話). 常常 processors 將輸出  flow files 以特定的格式, 這在 processors' 描述中有相應的說明( NiFi documentation).app

flow files 的 Input 和 Output (I/O) 經過 ProcessSession API 提供,經過 ExecuteScript (查看 Part 1 獲得更多的信息) 的"session" 變量來訪問。一個機制是傳遞一個 callback 對象到session.read() 或 session.write()的調用。對於FlowFile將建立一個 InputStream 和/或 OutputStream, 這個callback 對象將被激活,使用相應的 callback 接口, 而後這個InputStream 和/或 OutputStream 的引用被傳遞到 callback函數使用. 這裏有三個 callback 接口, 每個有本身的應用環境:函數

InputStreamCallbackspa

這個 interface 用在 session.read( flowFile, inputStreamCallback) 方法中,提供一個 InputStream,用於讀取 flow file的內容. 該 interface 有一個單一方法:.net

void process(InputStream in) throws IOException

該 interface 提供一個被管理的 input stream. 這個input stream自動打開和關閉,也能夠手動關閉. 這是從 flow file讀取的方法, 而且不能被寫回去。日誌

一個例子就是當你但願處理一個輸入 flow file, 可是建立了多個輸出output flow files, 好比 SplitText processor 那樣.

OutputStreamCallback

該 interface 被用於session.write( flowFile, outputStreamCallback) 方法,提供 OutputStream寫入內容到 flow file. 該 interface 具備單一的方法:

void process(OutputStream out) throws IOException

該 interface 提供被管理的 output stream. 這個output stream 被自動打開和關閉,也能夠手動關閉。 - 重要的一點是,若是任何 streams 包裝了這個 streams,全部打開的資源應該被清理.

例如, 在ExecuteScript中被建立數據 , 來自於外部文件, 而不是一個 flow file. 而後你可使用 session.create() 去建立一個新的FlowFile, 而後 session.write( flowFile, outputStreamCallback) 用於添加內容.

StreamCallback

該 interface 用於 session.write( flowFile, streamCallback) 方法,提供 InputStream 和 OutputStream,爲 flow file提供內容的讀取和寫入. 該 interface 有一個單一的方法:

void process(InputStream in, OutputStream out) throws IOException

該 interface 提供被管理的 output stream. 這個output stream 被自動打開和關閉,也能夠手動關閉。 - 重要的一點是,若是任何 streams 包裝了這個 streams,全部打開的資源應該被清理.

例如,你想處理一個傳入 flow file 而且使用新的內容覆蓋, 如 EncryptContent processor 那樣.

 

由於這些  callbacks 是 Java objects, 腳本將建立一個而且傳入 session 方法, 下面的方法將使用不一樣的腳本語言進行演示. 而且,這裏還有其餘的讀寫 flow files方法, 包括:

  • 使用 session.read(flowFile) 返回 InputStream. 取代 InputStreamCallback, 將返回 InputStream 用於讀取. 你必須 (close, e.g.) 手動管理 InputStream.
  • 使用 session.importFrom(inputStream, flowFile) 從 InputStream 寫入到 FlowFile. 這將替代 藉助OutputStreamCallback的session.write() 的使用.

嗯,下面是一些具體的方法:

Recipes

Recipe: 在callback中讀取輸入flow file的內容

Use Case: 具備一個傳入鏈接執行 ExecuteScript ,而且從隊列中獲得 flow file 的內容進行處理.

Approach: 使用session的read(flowFile, inputStreamCallback) 方法。一個 InputStreamCallback 對象須要被傳入 read() 方法. 注意到,由於InputStreamCallback 是一個對象, 內容只在該對象中可見。 若是你須要在 read() 方法以外訪問, 須要使用更爲全局化的變量. 這裏的例子講來自flow file的所有內容存儲到 String (使用 Apache Commons' IOUtils class).

注意: 對於大的 flow files, 這並非最好的技術方法; 應該只讀取須要的數據,並按照適應的方法處理。好比 SplitText, 你應該一次讀一行而且在 InputStreamCallback中處理, 或者 session.read(flowFile) 方法 獲得 InputStream 的引用,從而在 callback以外處理.

Examples:

Groovy

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets

flowFile = session.get()

if(!flowFile)return

def text = ''
// Cast a closure with an inputStream parameter to InputStreamCallback
session.read(flowFile, {inputStream ->
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

// Do something with text here
} as InputStreamCallback)

Jython

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback

# Define a subclass of InputStreamCallback for use in session.read()
class PyInputStreamCallback(InputStreamCallback):
def __init__(self):
    pass
def process(self, inputStream):
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

# Do something with text here
# end class

flowFile = session.get()
if(flowFile != None):
session.read(flowFile, PyInputStreamCallback())
# implicit return at the end

Javascript

var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback")
var IOUtils = Java.type("org.apache.commons.io.IOUtils")
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets")

var flowFile = session.get();

if(flowFile != null) {
// Create a new InputStreamCallback, passing in a function to define the interface method
session.read(flowFile,new InputStreamCallback(function(inputStream) {
        var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
        // Do something with text here
    }));
}

JRuby

java_import org.apache.commons.io.IOUtils
java_import org.apache.nifi.processor.io.InputStreamCallback

# Define a subclass of InputStreamCallback for use in session.read()
class JRubyInputStreamCallback
include InputStreamCallback

def process(inputStream)
    text = IOUtils.toString(inputStream)
    # Do something with text here
    end
end

jrubyInputStreamCallback = JRubyInputStreamCallback.new
flowFile = session.get()
if flowFile != nil
    session.read(flowFile, jrubyInputStreamCallback)
end

 

Recipe: 使用callback寫入內容到輸出 flow file

Use Case: 爲輸出的 flow file建立內容.

Approach: 使用session的write(flowFile, outputStreamCallback) 方法。一個OutputStreamCallback 對象須要傳遞給 write() 方法. 注意,由於 OutputStreamCallback 是一個對象, 所以內容之災對象內部可見. 若是你須要在 write() 方法以外訪問, 使用更爲全局化變量. 西面的例子寫入 String 到 flowFile.

Examples:

Groovy

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets

flowFile = session.get()
if(!flowFile) return

def text = 'Hello world!'
// Cast a closure with an outputStream parameter to OutputStreamCallback
flowFile = session.write(flowFile, {outputStream ->
        outputStream.write(text.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)

Jython

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import OutputStreamCallback

# Define a subclass of OutputStreamCallback for use in session.write()
class PyOutputStreamCallback(OutputStreamCallback):
def __init__(self):
    pass

def process(self, outputStream):
    outputStream.write(bytearray('Hello World!'.encode('utf-8')))
# end class

flowFile = session.get()

if(flowFile != None):
    flowFile = session.write(flowFile, PyOutputStreamCallback())
# implicit return at the end

Javascript

var OutputStreamCallback = Java.type("org.apache.nifi.processor.io.OutputStreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
var flowFile = session.get();

if(flowFile != null) {
// Create a new OutputStreamCallback, passing in a function to define the interface method
flowFile = session.write(flowFile,new OutputStreamCallback(function(outputStream) {
        outputStream.write("Hello World!".getBytes(StandardCharsets.UTF_8))
    }));
}

JRuby

java_import org.apache.commons.io.IOUtils
java_import java.nio.charset.StandardCharsets
java_import org.apache.nifi.processor.io.OutputStreamCallback

# Define a subclass of OutputStreamCallback for use in session.write()
class JRubyOutputStreamCallback
include OutputStreamCallback

def process(outputStream)
    outputStream.write("Hello World!".to_java.getBytes(StandardCharsets::UTF_8))
end
end

jrubyOutputStreamCallback = JRubyOutputStreamCallback.new
flowFile = session.get()
if flowFile != nil
    flowFile = session.write(flowFile, jrubyOutputStreamCallback)
end

 

Recipe: 使用回調進行內容覆蓋輸入 flow file

Use Case: 重用輸入 flow file可是但願修改內容並傳遞到輸出的 flow file.

Approach: 使用session的write(flowFile, streamCallback) 方法。一個StreamCallback 對象須要傳遞給 write() 方法. StreamCallback 同時提供了InputStream (從輸入的 flow file) 和 outputStream (下一版本的 flow file), 所以你可使用InputStream去取得 flow file的當前內容, 而後修改他們而且寫會到 flow file. 這將覆蓋 flow file 的內容, 所以對於追加內容要採用讀入內容添加的方式, 或者使用不一樣的方法 (使用 session.append() 而不是session.write() ).

注意,由於 StreamCallback 是一個對象, 所以內容之災對象內部可見. 若是你須要在 write() 方法以外訪問, 使用更爲全局化變量.

這個例子將反轉輸入flowFile (假定爲 String) 的內容,並將反轉後的字符串寫入到新版的 flowFile.

Examples:

Groovy

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets

flowFile = session.get()
if(!flowFile) return

def text = 'Hello world!'
// Cast a closure with an inputStream and outputStream parameter to StreamCallback

flowFile = session.write(flowFile, {inputStream, outputStream ->
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        outputStream.write(text.reverse().getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)

Jython

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
def __init__(self):
pass

def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
outputStream.write(bytearray('Hello World!'[::-1].encode('utf-8')))
# end class

flowFile = session.get()

if(flowFile != None):
    flowFile = session.write(flowFile, PyStreamCallback())

# implicit return at the end

Javascript

var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
var flowFile = session.get();

if(flowFile != null) {
// Create a new StreamCallback, passing in a function to define the interface method
flowFile = session.write(flowFile,new StreamCallback(function(inputStream, outputStream) {
    var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    outputStream.write(text.split("").reverse().join("").getBytes(StandardCharsets.UTF_8))
    }));
}

JRuby

java_import org.apache.commons.io.IOUtils
java_import java.nio.charset.StandardCharsets
java_import org.apache.nifi.processor.io.StreamCallback

# Define a subclass of StreamCallback for use in session.write()

class JRubyStreamCallback
include StreamCallback
def process(inputStream, outputStream)
    text = IOUtils.toString(inputStream)
    outputStream.write((text.reverse!).to_java.getBytes(StandardCharsets::UTF_8))
end
end

jrubyStreamCallback = JRubyStreamCallback.new
flowFile = session.get()
if flowFile != nil
    flowFile = session.write(flowFile, jrubyStreamCallback)
end

 

Recipe: 處理腳本處理期間的錯誤

Use Case: 在 script ( data validation 或者出現一個 exception)運行時出現錯誤, 而且你但願可以優雅滴處理.

Approach: 對於exceptions, 使用腳本語言的exception-handling 機制  (通常是try/catch 代碼塊). 對於 data validation, 可使用相似的方法, 可是定義一個boolean 變量,如 "valid" 以及 if/else 語句,而不是try/catch 語句. ExecuteScript 定義了 "success" and "failure" relationships; 通常狀況下,你的處理將轉移 "good" flow files 到 success,而 "bad" flow files 到 failure (記錄錯誤在後續的操做中).

Examples:

Groovy

flowFile = session.get()

if(!flowFile) return
try {
    // Something that might throw an exception here
    // Last operation is transfer to success (failures handled in the catch block)
    session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    log.error('Something went wrong', e)
    session.transfer(flowFile, REL_FAILURE)
}

Jython

flowFile = session.get()

if(flowFile != None):
try:
    # Something that might throw an exception here
    # Last operation is transfer to success (failures handled in the catch block)
    session.transfer(flowFile, REL_SUCCESS)
except:
    log.error('Something went wrong', e)
    session.transfer(flowFile, REL_FAILURE)

# implicit return at the end

Javascript

var flowFile = session.get();
if(flowFile != null) {
try {
    // Something that might throw an exception here
    // Last operation is transfer to success (failures handled in the catch block)
    session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    log.error('Something went wrong', e)
    session.transfer(flowFile, REL_FAILURE)
}
}

JRuby

flowFile = session.get()

if flowFile != nil
begin
    # Something that might raise an exception here
    # Last operation is transfer to success (failures handled in the rescue block)
    session.transfer(flowFile, REL_SUCCESS)
    rescue Exception => e
    log.error('Something went wrong', e)
    session.transfer(flowFile, REL_FAILURE)
end
end

但願這裏介紹的基本的FlowFile I/O 和 錯誤處理有用, 歡迎任何的建議和改進! 在下一篇文章中,我將討論一些高級的特徵,如動態屬性, 模塊, 狀態管理, 和 Controller Services的存取. 但願對你有用!

英:https://community.hortonworks.com/articles/75545/executescript-cookbook-part-2.html

相關文章
相關標籤/搜索