加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

通过Scala Process执行Shell脚本且收集返回的结果

发布时间:2020-12-16 09:37:29 所属栏目:安全 来源:网络整理
导读:需求:通过Scala Process执行Shell脚本且收集返回的结果 package tools import java.util.concurrent.LinkedBlockingQueue import scala.sys.process.{Process,ProcessBuilder,ProcessLogger} /** * * 收集Linux脚本命令返回的结果 */ object StreamProcessL

需求:通过Scala Process执行Shell脚本且收集返回的结果

package tools

import java.util.concurrent.LinkedBlockingQueue

import scala.sys.process.{Process,ProcessBuilder,ProcessLogger}

/** * * 收集Linux脚本命令返回的结果 */
object StreamProcessLogger {
  private val nonzeroException = true

  def run(processBuilder: ProcessBuilder): (Process,Stream[String]) = {
    val logger = new StreamProcessLogger
    val process = processBuilder.run(logger)
    waitForExitInAnotherThread(process,logger)
    (process,logger.stream)
  }

  private def waitForExitInAnotherThread(process: Process,logger: StreamProcessLogger) = {
    val thread = new Thread() {
      override def run() = {
        logger.setExitCode(process.exitValue())
      }
    }
    thread.start()
  }
}

private class StreamProcessLogger extends ProcessLogger {
  val queue = new LinkedBlockingQueue[Either[Int,String]]

  override def buffer[T](f: => T): T = f

  override def out(s: => String): Unit = queue.put(Right(s))

  override def err(s: => String): Unit = queue.put(Right(s))

  def stream = next()

  def setExitCode(exitCode: Int) = queue.put(Left(exitCode))

  private def next(): Stream[String] = queue.take match {
    case Left(0) => Stream.empty
    case Left(code) => if (StreamProcessLogger.nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
    case Right(s) => Stream.cons(s,next())
  }
}

object StreamProcessLoggerHelp {
  def executeCommand(command: String,name: String) {
    if (name == "hive") {
      val (process,stream) = StreamProcessLogger.run(Process(Seq("hive","-e",command)))
      stream.foreach(info => println("hive==>" + info))
    } else {
      val (process,stream) = StreamProcessLogger.run(Process(command))
      stream.foreach(info => println("info==>" + info))
    }
  }

  def main(args: Array[String]): Unit = {

    //hive的执行
// StreamProcessLoggerHelp.executeCommand("drop table graph","hive")

    //执行脚本,比如shell脚本,Spark脚本等
// StreamProcessLoggerHelp.executeCommand("ls /Users/huiyu/ideaProject/graph","shell")
    StreamProcessLoggerHelp.executeCommand("spark-shell","shell")
  }
}

结果展示

这里写图片描述

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读