Apache NiFi 已經有一些遠程命令的執行器,如 ExecuteProcess 和 ExecuteStreamCommand。能夠經過操做系統的ssh命令遠程執行腳本,還能夠帶上不一樣的參數。一般,這經過一個基於bash腳本執行器來執行,能夠帶多個ssh命令。html
另一種方法,可使用 ExecuteScript 或者 InvokeScriptedProcessor, 經過第三方庫經過ssh去執行遠程命令。本文中, 我使用 Groovy 做爲腳本語言而且使用 Sshoogr, Sshoogr用於Groovy的 SSH DSL。
Sshoogr 能夠經過 SDK Manager 安裝或下載 (例如從 Maven Central ). 關鍵須要全部的支持庫 (包括Sshoogr 和依賴庫)在同一個目錄中。將 ExecuteScript's Module Directory 屬性指向該目錄, 而後全部的 JARs 都將可以在 Groovy script中使用。
爲了使這個例子更加靈活,我採用了一些常規的作法。首先, 將要執行的commands放在流文件的每一行中。而後, 配置參數 (如hostname, username, password, 以及 port) 做爲動態屬性添加到ExecuteScript。 注意,password 屬性在這裏大小寫不敏感; 若是願意, 使用InvokeScriptedProcessor而且明確地添加這個屬性 (指明 password 大小寫敏感).git
爲了 Sshoogr 可以工做 (至少在這個例子中), 須要遠端節點的 RSA key 配置在NiFi 用戶的 ~/.ssh/known_hosts 文件中。覺得操做系統的差別, 有些狀況下 SSH 鏈接將失敗,主要由於嚴格的 host key 檢查, 所以在 script 咱們將在 Sshoogr中關閉這個功能。github
回到script. 我啓動一個標準的代碼來獲取輸入的 flow file, 而後使用session.read() 得到每一行,保存到一個命令的列表對象中:apache
def flowFile = session.get() if(!flowFile) return def commands = [] // Read in one command per line session.read(flowFile, {inputStream -> inputStream.eachLine { line -> commands << line } } as InputStreamCallback)
而後關閉嚴格host key 檢查:bash
options.trustUnknownHosts = true
而後使用 Sshoogr DSL 去執行命令(上一步中,從 flow file讀取的列表):session
def result = null remoteSession { host = sshHostname.value username = sshUsername.value password = sshPassword.value port = Integer.parseInt(sshPort.value) result = exec(showOutput: true, failOnError: false, command: commands.join('\n')) }
這裏使用前述的 ExecuteScript 動態屬性 (sshHostname, sshUsername, sshPassword, sshPort)。後面, 有一個截屏顯示被加到了ExecuteScript的 配置之中.
這個 exec() 方法是Sshoogr的強大方法. 這裏但願命令表的輸出放到outgoing的flowfile. 設置"failOnError" 爲 false以讓全部的命令都會嘗試執行, 但你也許但願設置爲 true,這樣命令失敗時後續的將再也不執行. 這個最終的參數是"command". 若是你不使用 "named-parameter" (aka Map) 的exec() 版本, 而只是執行命令 commands (如,不設置 "showOutput" ), 那麼 exec() 將接收一個集合:ssh
exec(commands)
在咱們, exec() 方法轉換"command" 參數爲字符串, 替代List "commands", 將其轉換爲換行符分割的字符串,使用 join('\n')進行轉換。
script 的下一部分是複寫傳入的 flow file, 若是我沒有建立一個新的. 若是使用ExecuteScript 建立了一個新的 flow file, 確保移除原來的. 若是使用 InvokeScriptedProcessor, 你須要定義 "original" 關係而且路由原來的 incoming flow file 到新的。maven
flowFile = session.write(flowFile, { outputStream -> result?.output?.eachLine { line -> outputStream.write(line.bytes) outputStream.write('\n'.bytes) } } as OutputStreamCallback)
最後, 使用 result.exitStatus, 咱們確認下路由到 flow file 是成功仍是失敗:oop
session.transfer(flowFile, result?.exitStatus ? REL_FAILURE : REL_SUCCESS)
這個例子中所有的 script 以下:spa
import static com.aestasit.infrastructure.ssh.DefaultSsh.* def flowFile = session.get() if(!flowFile) return def commands = [] // Read in one command per line session.read(flowFile, {inputStream -> inputStream.eachLine { line -> commands << line } } as InputStreamCallback) options.trustUnknownHosts = true def result = null remoteSession { host = sshHostname.value username = sshUsername.value password = sshPassword.value port = Integer.parseInt(sshPort.value) result = exec(showOutput: true, failOnError: false, command: commands.join('\n')) } flowFile = session.write(flowFile, { outputStream -> result?.output?.eachLine { line -> outputStream.write(line.bytes) outputStream.write('\n'.bytes) } } as OutputStreamCallback) session.transfer(flowFile, result?.exitStatus ? REL_FAILURE : REL_SUCCESS)
搬到 ExecuteScript 的配置中:
這裏我鏈接到 Hortonworks Data Platform (HDP) Sandbox來運行Hadoop commands, 缺省的設置如上所示,我將 script 黏貼進 Script Body, 將 Module Directory 指向個人Sshoogr下載目錄. 我添加這些到一個簡單數據流,該數據流建立了一個 flow file, 填充 commands, 在遠端執行, 而後從session中返回的output輸出到 logs :
這個例子中,我運行下面的兩個命令:
hadoop fs -ls /user echo "Hello World!" > here_i_am.txt
在 log中 (after LogAttribute runs with payload included), 看見輸出以下:
在sandbox,我看見文件已被建立:
如你所見,本文描述了使用 ExecuteScript 採用 Groovy 和 Sshoogr 去執行存儲在輸入流文件中的遠程命令. 我建立了 Gist of the template, 不過這是用 Apache NiFi 1.0的beta版本建立的,有可能沒法在其餘版本中使用(如0.x). 但這裏已經包含了全部的內容,你能夠容易地建立本身的數據流。
原文參考:
http://funnifi.blogspot.jp/2016/08/executing-remote-commands-in-nifi-with.html