加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 百科 > 正文

ruby – 如何在同一个EventMachine反应器中运行Net :: SSH和AMQP

发布时间:2020-12-17 01:28:04 所属栏目:百科 来源:网络整理
导读:一些背景: Gerrit exposes an event stream through SSH.这是一个可爱的技巧,但我需要将这些事件转换为AMQP消息.我试图用 ruby-amqp和 Net::SSH做到这一点,但是,似乎AMQP子组件甚至根本不运行. 我是EventMachine的新手.有人可以指出我做错了什么吗? “Mult
一些背景: Gerrit exposes an event stream through SSH.这是一个可爱的技巧,但我需要将这些事件转换为AMQP消息.我试图用 ruby-amqp和 Net::SSH做到这一点,但是,似乎AMQP子组件甚至根本不运行.

我是EventMachine的新手.有人可以指出我做错了什么吗? “Multiple servers in a single EventMachine reactor”的答案似乎并不适用.该程序也在gist中提供,以便于访问:

#!/usr/bin/env ruby                                                                                                                                            

require 'rubygems'
require 'optparse'
require 'net/ssh'
require 'json'
require 'yaml'
require 'amqp'
require 'logger'

trap(:INT) { puts; exit }

options = {
  :logs => 'kili.log',:amqp => {
    :host => 'localhost',:port => '5672',},:ssh => {
    :host => 'localhost',:port => '22',:user => 'nobody',:keys => '~/.ssh/id_rsa',}
}
optparse = OptionParser.new do|opts|
  opts.banner = "Usage: kili [options]"
  opts.on( '--amqp_host HOST','The AMQP host kili will connect to.') do |a|
    options[:amqp][:host] = a
  end
  opts.on( '--amqp_port PORT','The port for the AMQP host.') do |ap|
    options[:amqp][:port] = ap
  end
  opts.on( '--ssh_host HOST','The SSH host kili will connect to.') do |s|
    options[:ssh][:host] = s
  end
  opts.on( '--ssh_port PORT','The SSH port kili will connect on.') do |sp|
    options[:ssh][:port] = sp
  end
  opts.on( '--ssh_keys KEYS','Comma delimeted SSH keys for user.') do |sk|
    options[:ssh][:keys] = sk
  end
  opts.on( '--ssh_user USER','SSH user for host.') do |su|
    options[:ssh][:user] = su
  end
  opts.on( '-l','--log LOG','The log location of Kili') do |log|
    options[:logs] = log
  end
  opts.on( '-h','--help','Display this screen' ) do
    puts opts
    exit
  end
end


optparse.parse!
log = Logger.new(options[:logs])
log.level = Logger::INFO

amqp = options[:amqp]
sshd = options[:ssh]
queue= EM::Queue.new

EventMachine.run do

  AMQP.connect(:host => amqp[:host],:port => amqp[:port]) do |connection|
    log.info "Connected to AMQP at #{amqp[:host]}:#{amqp[:port]}"
    channel = AMQP::Channel.new(connection)
    exchange = channel.topic("traut",:auto_delete => true)

    queue.pop do |msg|
      log.info("Pulled #{msg} out of queue.")
      exchange.publish(msg[:data],:routing_key => msg[:route]) do
        log.info("On route #{msg[:route]} published:n#{msg[:data]}")
      end
    end
  end


  Net::SSH.start(sshd[:host],sshd[:user],:port => sshd[:port],:keys => sshd[:keys].split(',')) do |ssh|
    log.info "SSH connection to #{sshd[:host]}:#{sshd[:port]} as #{sshd[:user]} made."

    channel = ssh.open_channel do |ch|
      ch.exec "gerrit stream-events" do |ch,success|
        abort "could not stream gerrit events" unless success

        # "on_data" is called when the process writes something to                                                                                             
        # stdout                                                                                                                                               
        ch.on_data do |c,data|
          json = JSON.parse(data)
          if json['type'] == 'change-merged'
            project = json['change']['project']
            route = "com.carepilot.event.code.review.#{project}"
            msg = {:data => data,:route => route}
            queue.push(msg)
            log.info("Pushed #{msg} into queue.")
          else
            log.info("Ignoring event of type #{json['type']}")
          end
        end


    # "on_extended_data" is called when the process writes                                                                                                 
    # something to stderr                                                                                                                                  
    ch.on_extended_data do |c,type,data|
          log.error(data)
    end

    ch.on_close { log.info('Connection closed') }
      end
    end  
  end  

end

解决方法

Net :: SSH不是异步的,因此您的EventMachine.run()永远不会到达块的末尾,因此永远不会恢复反应器线程.这会导致AMQP代码永远不会启动.我建议在另一个线程中运行您的SSH代码.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读