加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 百科 > 正文

c# – RabbitMQ出队后丢失正文中的数据

发布时间:2020-12-15 21:39:14 所属栏目:百科 来源:网络整理
导读:我正在使用RabbitMQ处理其他人的项目,并且在出列数据和丢失数据方面遇到了麻烦. 我发布时,数据全部作为字符串存在,并且它也正确地存在于RabbitMQ队列中.当我关闭数据时,数据的数据就像用户ID一样,但其余部分都消失了.我在整个代码中都看了一下,我对RabbitMQ
我正在使用RabbitMQ处理其他人的项目,并且在出列数据和丢失数据方面遇到了麻烦.

我发布时,数据全部作为字符串存在,并且它也正确地存在于RabbitMQ队列中.当我关闭数据时,数据的数据就像用户ID一样,但其余部分都消失了.我在整个代码中都看了一下,我对RabbitMQ正在发生的事情表示相当肯定,当我出局时它就会发生.任何帮助将不胜感激.谢谢.
这是发布之前的代码.

private bool sendJobToMQ(EncodeJobModel job,string p_correlation_id,string p_request_routing_key)
    {
        JavaScriptSerializer ser = new JavaScriptSerializer();
        StringBuilder sb_job = new StringBuilder();
        ser.Serialize(job,sb_job);
        string rpc_reply_queue;

        ConnectionFactory factory = new ConnectionFactory();
        factory.HostName = HOST_NAME;
        factory.VirtualHost = VHOST_NAME;
        factory.UserName = USERNAME;
        factory.Password = PASSWORD;
        IConnection rabconn = factory.CreateConnection();
        IModel sender_channel = rabconn.CreateModel();
        try
        {
            sender_channel.ExchangeDeclare(EXCHANGE_NAME,ExchangeType.Direct,true,false,null);
        }
        catch (Exception err)
        {
            logger.Error("Error Declaring Exchange " + EXCHANGE_NAME + ": " + err.ToString());
            return false;
        }
        try
        {
            sender_channel.QueueDeclare(REQUEST_QUEUE,null);
        }
        catch (Exception err)
        {
            logger.Error("Error QueueDeclare (" + REQUEST_QUEUE + " true,null): " + err.ToString());
            return false;
        }
        try
        {
            sender_channel.QueueBind(REQUEST_QUEUE,EXCHANGE_NAME,REQUEST_ROUTING_KEY,null);
        }
        catch (Exception err)
        {
            logger.Error("Error QueueBind (" + REQUEST_QUEUE + " -> " + EXCHANGE_NAME + " " + REQUEST_ROUTING_KEY + ",null): " + err.ToString());
            return false;
        }

        //rpc_reply_queue = sender_channel.QueueDeclare("rq_" + job.encodejob_id.ToString(),null);
        //////bind the rpc reply queue to the exchange via a routing key (I appended _routingkey to signify this)
        //sender_channel.QueueBind(rpc_reply_queue,rpc_reply_queue + "_routingkey");

        //// Not sure what the props object is for yet but you can try to pass null in the mean time - Steve "Apeshit" Han
        BasicProperties props = new BasicProperties();
        props.CorrelationId = p_correlation_id;
        //props.ReplyTo = rpc_reply_queue;

        try
        {
            sender_channel.BasicPublish(EXCHANGE_NAME,props,Encoding.UTF8.GetBytes(sb_job.ToString()));

        }

以及出列的代码.

QueueingBasicConsumer consumer = new QueueingBasicConsumer(p_channel);
        string consumerTag = p_channel.BasicConsume(p_queue,consumer);
        if (_is_console && Environment.UserInteractive)
            Console.WriteLine("Listening...");
        while (m_Listen)
        {
            try
            {
                //get the properties of the message,including the ReplyTo queue,to which we can append '_routingkey' (designated by me),to reply with messages
                BasicDeliverEventArgs e;
                Object message;
                if (!consumer.Queue.Dequeue(4000,out message)) {
                    // we do not wait to indefinitely block on waiting for the queue
                    // if nothing in queue continue loop iteration and wait again
                    continue;
                }

                // cast as necessary back to BasicDeliverEventArgs
                e = (BasicDeliverEventArgs)message;
                IBasicProperties props = e.BasicProperties;
                //get the Correlation ID sent by the client to track the job
                string client_correlation_id = props.CorrelationId;
                // I left out the reply_to field in the wizard,it can be set back in ApiEncodeServiceDefault - Steve "Smurfing Smurf" Han
                //string reply_to = props.ReplyTo;

                //get the body of the request
                byte[] body = e.Body;
                string body_result = Encoding.UTF8.GetString(body);
                bool redelivered = e.Redelivered;

e.Body字符串缺少数据.

解决方法

如果你没有任何消息,为什么继续
最好阻止,直到你收到一条消息,否则这个过程没有意义(没有数据工作?)
试试这样

QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueName,null,consumer);
while (m_Listen) {
try {
RabbitMQ.Client.Events.BasicDeliverEventArgs e =
(RabbitMQ.Client.Events.BasicDeliverEventArgs)
consumer.Queue.Dequeue();
IBasicProperties props = e.BasicProperties;
byte[] body = e.Body;
// ... process the message
channel.BasicAck(e.DeliveryTag,false);
} catch (OperationInterruptedException ex) {
// The consumer was removed,either through
// channel or connection closure,or through the
// action of IModel.BasicCancel().
break;
}

}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读