NiFi中执行远程命令:ExecuteScript, Groovy和Sshoogr
NiFi中执行远程命令:ExecuteScript,Groovy,和Sshoogr? Apache NiFi 已经有一些远程命令的执行器,如 ExecuteProcess 和 ExecuteStreamCommand。可以通过操作系统的ssh命令远程执行脚本,还可以带上不同的参数。通常,这通过一个基于bash脚本执行器来执行,可以带多个ssh命令。 准备工作另外一种方法,可以使用 ExecuteScript 或者 InvokeScriptedProcessor,通过第三方库通过ssh去执行远程命令。本文中,我使用 Groovy 作为脚本语言并且使用 Sshoogr,Sshoogr用于Groovy的 SSH DSL。 为了 Sshoogr 能够工作 (至少在这个例子中),需要远端节点的 RSA key 配置在NiFi 用户的 ~/.ssh/known_hosts 文件中。以为操作系统的差异,有些情况下 SSH 连接将失败,主要因为严格的 host key 检查,因此在 script 我们将在 Sshoogr中关闭这个功能。 输入文件回到script. 我启动一个标准的代码来获取输入的 flow file,然后使用session.read() 获得每一行,保存到一个命令的列表对象中: def flowFile = session.get() if(!flowFile) return def commands = [] // Read in one command per line session.read(flowFile,{inputStream -> inputStream.eachLine { line -> commands << line } } as InputStreamCallback) 然后关闭严格host key 检查: options.trustUnknownHosts = true 执行脚本然后使用 Sshoogr DSL 去执行命令(上一步中,从 flow file读取的列表): def result = null remoteSession { host = sshHostname.value username = sshUsername.value password = sshPassword.value port = Integer.parseInt(sshPort.value) result = exec(showOutput: true,failOnError: false,command: commands.join('n')) } 这里使用前述的 ExecuteScript 动态属性 (sshHostname,sshUsername,sshPassword,sshPort)。后面,有一个截屏显示被加到了ExecuteScript的 配置之中. exec(commands) 在我们,? exec() 方法转换"command" 参数为字符串,替代List "commands",将其转换为换行符分割的字符串,使用 join('n')进行转换。 flowFile = session.write(flowFile,{ outputStream -> result?.output?.eachLine { line -> outputStream.write(line.bytes) outputStream.write('n'.bytes) } } as OutputStreamCallback) 最后,使用 result.exitStatus,我们确认下路由到 flow file 是成功还是失败: session.transfer(flowFile,result?.exitStatus ? REL_FAILURE : REL_SUCCESS) 全部脚本这个例子中全部的 script 如下: import static com.aestasit.infrastructure.ssh.DefaultSsh.* def flowFile = session.get() if(!flowFile) return def commands = [] // Read in one command per line session.read(flowFile,{inputStream -> inputStream.eachLine { line -> commands << line } } as InputStreamCallback) options.trustUnknownHosts = true def result = null remoteSession { host = sshHostname.value username = sshUsername.value password = sshPassword.value port = Integer.parseInt(sshPort.value) result = exec(showOutput: true,command: commands.join('n')) } flowFile = session.write(flowFile,{ outputStream -> result?.output?.eachLine { line -> outputStream.write(line.bytes) outputStream.write('n'.bytes) } } as OutputStreamCallback) session.transfer(flowFile,result?.exitStatus ? REL_FAILURE : REL_SUCCESS) 搬到 ExecuteScript 的配置中:
这个例子中,我运行下面的两个命令: hadoop fs -ls /user echo "Hello World!" > here_i_am.txt 在 log中 (after LogAttribute runs with payload included),看见输出如下:
原文参考: http://funnifi.blogspot.jp/2016/08/executing-remote-commands-in-nifi-with.html (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |