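ruby – Am I using EventMachine the right way?

I am using ruby-smpp and Redis to implement a queue-based background worker that sends SMPP messages.
I would like to know whether I am using EventMachine the right way. It works, but it doesn't feel right.

#!/usr/bin/env ruby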
# Sample SMS gateway that can receive MOs (mobile originated messages) and
# DRs (delivery reports), and send MTs (mobile terminated messages).
# MTs are, in the name of simplicity, entered on the command line in the format
# <sender> <receiver> <message body>
# MOs and DRs will be dumped to standard out.
require 'smpp'
require 'redis/connection/hiredis'
require 'redis'
require 'yajl'
require 'time'
LOGFILE = File.dirname(__FILE__) + "/sms_gateway.log"
PIDFILE = File.dirname(__FILE__) + '/worker_test.pid'
Smpp::Base.logger = Logger.new(LOGFILE)
#Smpp::Base.logger.level = Logger::WARN
REDIS = Redis.new
class MbloxGateway
# MT id counter.
@@mt_id = 0
# expose SMPP transceiver's send_mt method
def self.send_mt(sender,receiver,body)
if sender =~ /[a-z]+/i
source_addr_ton = 5
else
source_addr_ton = 2
end
@@mt_id += 1
@@tx.send_mt(('smpp' + @@mt_id.to_s),sender,receiver,body,{
:source_addr_ton => source_addr_ton
# Other options, currently disabled:
# :service_type => 1,
# :source_addr_ton => 5,
# :source_addr_npi => 0,
# :dest_addr_ton => 2,
# :dest_addr_npi => 1,
# :esm_class => 3,
# :protocol_id => 0,
# :priority_flag => 0,
# :schedule_delivery_time => nil,
# :validity_period => nil,
# :registered_delivery => 1,
# :replace_if_present_flag => 0,
# :data_coding => 0,
# :sm_default_msg_id => 0
})
end
def logger
Smpp::Base.logger
end
def start(config)
# Write this worker's pid to a file
File.open(PIDFILE,'w') { |f| f << Process.pid }
# The transceiver sends MT messages to the SMSC. It needs a storage with Hash-like
# semantics to map SMSC message IDs to your own message IDs.
pdr_storage = {}
# Run EventMachine in loop so we can reconnect when the SMSC drops our connection.
loop do
EventMachine::run do
@@tx = EventMachine::connect(
config[:host],config[:port],Smpp::Transceiver,config,self # delegate that will receive callbacks on MOs and DRs and other events
)
# Let the connection start before we check for messages
EM.add_timer(3) do
# Maybe there is some better way to do this. IDK,But it works!
EM.defer do
loop do
# Pop a message
message = REDIS.lpop 'messages:send:queue'
if message # If there is a message. Process it and check the queue again
message = Yajl::Parser.parse(message,:check_utf8 => false) # Parse the message from Json to Ruby hash
if !message['send_after'] or (message['send_after'] and Time.parse(message['send_after']) < Time.now)
self.class.send_mt(message['sender'],message['receiver'],message['body']) # Send the message
REDIS.publish 'log:messages',"#{message['sender']} -> #{message['receiver']}: #{message['body']}" # Push the message to the redis queue so we can listen to the channel
else
REDIS.lpush 'messages:queue',Yajl::Encoder.encode(message)
end
else # If there is no message. Sleep for a second
sleep 1
end
end
end
end
end
sleep 2
end
end
# ruby-smpp delegate methods
def mo_received(transceiver,pdu)
logger.info "Delegate: mo_received: from #{pdu.source_addr} to #{pdu.destination_addr}: #{pdu.short_message}"
end
def delivery_report_received(transceiver,pdu)
logger.info "Delegate: delivery_report_received: ref #{pdu.msg_reference} stat #{pdu.stat}"
end
def message_accepted(transceiver,mt_message_id,pdu)
logger.info "Delegate: message_accepted: id #{mt_message_id} smsc ref id: #{pdu.message_id}"
end
def message_rejected(transceiver,mt_message_id,pdu)
logger.info "Delegate: message_rejected: id #{mt_message_id} smsc ref id: #{pdu.message_id}"
end
def bound(transceiver)
logger.info "Delegate: transceiver bound"
end
def unbound(transceiver)
logger.info "Delegate: transceiver unbound"
EventMachine::stop_event_loop
end
end
# Start the Gateway
begin
puts "Starting SMS Gateway. Please check the log at #{LOGFILE}"
# SMPP properties. These parameters work well with the Logica SMPP simulator.
# Consult the SMPP spec or your mobile operator for the correct settings of
# the other properties.
config = {
:host => 'server.com',:port => 3217,:system_id => 'user',:password => 'password',:system_type => 'type',# default given according to SMPP 3.4 Spec
:interface_version => 52,:source_ton => 0,:source_npi => 1,:destination_ton => 1,:destination_npi => 1,:source_address_range => '',:destination_address_range => '',:enquire_link_delay_secs => 10
}
gw = MbloxGateway.new
gw.start(config)
rescue Exception => ex
puts "Exception in SMS Gateway: #{ex} at #{ex.backtrace.join("\n")}"
end
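The per-message decision inside the worker loop above (send now, or put the message back because its `send_after` time has not arrived) can be pulled out and checked without a reactor or an SMSC. The following is only a minimal sketch: `due?` and `source_addr_ton_for` are hypothetical helper names that do not appear in the question, the sample payloads are made up, and the standard library's `JSON` stands in for Yajl so the snippet runs on its own.

```ruby
require 'json'
require 'time'

# Hypothetical helper: true when a queued message may be sent now.
# A message without a 'send_after' field is always due.
def due?(message, now = Time.now)
  send_after = message['send_after']
  send_after.nil? || Time.parse(send_after) < now
end

# Hypothetical helper mirroring the TON choice in send_mt:
# 5 (alphanumeric) when the sender contains a letter, otherwise 2 (national).
def source_addr_ton_for(sender)
  sender =~ /[a-z]/i ? 5 : 2
end

# Made-up payloads in the shape the worker reads from 'messages:send:queue'.
now       = Time.parse('2012-01-01 12:00:00 UTC')
immediate = JSON.parse('{"sender":"ACME","receiver":"4790000000","body":"hi"}')
scheduled = JSON.parse('{"sender":"2000","receiver":"4790000000","body":"later",' \
                       '"send_after":"2012-01-01 13:00:00 UTC"}')

puts due?(immediate, now)                        # true
puts due?(scheduled, now)                        # false
puts source_addr_ton_for(immediate['sender'])    # 5
puts source_addr_ton_for(scheduled['sender'])    # 2
```

Keeping this logic in plain methods makes it possible to test the scheduling rule separately from the EventMachine and Redis plumbing.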
Solution
A few simple steps make this code more EventMachine-ish:
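> Get rid of the blocking Redis driver and use em-hiredis instead.

Here is how the core EventMachine code could look with some improvements:

# Needs the em-hiredis gem (require 'em-hiredis') in place of the blocking redis client.
def start(config)
  File.open(PIDFILE, 'w') { |f| f << Process.pid }
  pdr_storage = {}
  EventMachine::run do
    @@tx = EventMachine::connect(
      config[:host], config[:port], Smpp::Transceiver, config,
      self # delegate that will receive callbacks on MOs and DRs and other events
    )
    # A local variable: assigning the REDIS constant inside a method is a syntax error in Ruby.
    redis = EM::Hiredis.connect
    pop_message = lambda do
      redis.lpop 'messages:send:queue' do |message|
        if message # If there is a message, process it
          message = Yajl::Parser.parse(message, :check_utf8 => false) # Parse the message from JSON to a Ruby hash
          if !message['send_after'] or Time.parse(message['send_after']) < Time.now
            self.class.send_mt(message['sender'], message['receiver'], message['body'])
            redis.publish 'log:messages', "#{message['sender']} -> #{message['receiver']}: #{message['body']}"
          else
            redis.lpush 'messages:queue', Yajl::Encoder.encode(message)
          end
        end
        EM.next_tick(&pop_message) # Check the queue again on the next reactor tick
      end
    end
    pop_message.call # Start polling; the lambda reschedules itself from then on
  end
end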
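It's not perfect and could use some cleanup, but this is closer to how it should look the EventMachine way: no sleeps, avoid defer where possible, don't use network drivers that may block, and implement traditional loops by rescheduling work on the next reactor tick. As far as Redis is concerned the difference isn't that big, but it's more EventMachine-y this way.

Hope this helps. If you still have questions, I'm happy to explain further.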
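The idea of replacing a traditional loop with rescheduling can be shown without EventMachine at all. The sketch below is an illustration only: `ToyReactor` and `drain` are hypothetical names, and the toy reactor is just an array of pending blocks, loosely analogous to `EM.next_tick`; it is not how EventMachine is implemented. Unlike the answer's code, this version stops rescheduling once the queue is empty so that the example terminates.

```ruby
# Toy stand-in for a reactor: runs queued blocks one "tick" at a time.
class ToyReactor
  def initialize
    @ticks = []
  end

  # Schedule a block for a later turn of the loop.
  def next_tick(&block)
    @ticks << block
  end

  # Run scheduled blocks until nothing is left.
  def run
    @ticks.shift.call until @ticks.empty?
  end
end

# Processes one message per tick and reschedules itself instead of looping.
def drain(queue, reactor, sent)
  pop_message = lambda do
    message = queue.shift
    next if message.nil?             # queue empty: stop rescheduling
    sent << message                  # stands in for send_mt
    reactor.next_tick(&pop_message)  # hand control back, continue on a later tick
  end
  reactor.next_tick(&pop_message)
end

reactor = ToyReactor.new
sent = []
drain(%w[first second third], reactor, sent)
reactor.run
p sent  # ["first", "second", "third"]
```

Because each step returns to the reactor between messages, other callbacks (in the real worker, SMPP traffic such as delivery reports) get a chance to run. In a real EventMachine worker, one option when the queue is empty would be to poll again after a delay with `EM.add_timer` rather than on every tick.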
