加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > Windows > 正文

windows – 为什么ZeroMQ PGM组播接收卡在中间并且没有进一步接

发布时间:2020-12-14 05:39:01 所属栏目:Windows 来源:网络整理
导读:ZeroMQ(版本 – zeromq-4.1.6)PGM组播数据包接收卡在中间,甚至Sender仍然发送数据包没有任何问题. 如果我们重新启动Receiver,应用程序现在会收到数据包,但它不是解决方案.我在Senderamp ;;中尝试了各种ZMQ_RATE.接收方. 问题: 发送者使用以下套接字选项发送
ZeroMQ(版本 – zeromq-4.1.6)PGM组播数据包接收卡在中间,甚至Sender仍然发送数据包没有任何问题.

如果我们重新启动Receiver,应用程序现在会收到数据包,但它不是解决方案.我在Sender&amp ;;中尝试了各种ZMQ_RATE.接收方.

问题:

发送者使用以下套接字选项发送近300,000个数据包,但接收器卡在&之间.没有收到所有的数据包.如果我们添加Sleep(2) – 在每次发送中等待2 ms,有时我们会收到所有数据包,但它需要更多时间.

环境设置:

(发送器和接收器使用D-Link交换机在单个子网内连接.介质速度为1Gbps)

Sender: JZMQ ( ZMQ C library,openPGM )
ZMQ_RATE - 30Mbps ( Megabits per second )
Packet size - 1024 bytes
ZMQ_RECOVERY_IVL - 2 Minutes
Send Flag - 0 ( blocking mode )
Sleep( 2ms ) - sometimes its working without any issue but taking more time for transfer.
Platform - Windows

Receiver: ZMQ C++ ( ZMQ C library,openPGM )
ZMQ_RATE - 30Mbps ( Megabits per second )
ZMQ_RCVTIMEO - 3 Secs
receive Flag - 0 ( blocking mode )
Platform - Windows

可能是什么问题?

ZeroMQ PGM-multicast是不是一个稳定的库?

JZMQ Sender:
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.PUB);
socket.setRate(80000);
socket.setRecoveryInterval(60*60);
socket.setSendTimeOut(-1);
socket.setSendBufferSize(1024*64);
socket.bind("pgm://local_IP;239.255.0.20:30001");

byte[] bytesToSend = new byte[1024];
int count = 0;
while(count < 300000) {
    socket.send(bytesToSend,0);
    count++;
}

------------------------------------------------
// ZMQCPP-PGM-receive.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include <stdio.h>
#include "zmq.hpp"


int main(int argc,char* argv[]) {
    try {

         zmq::context_t context(1);

      // Socket to talk to server
         printf ("Connecting to server...");

         zmq::socket_t *s1 = new zmq::socket_t(context,ZMQ_SUB);

         int recvTimeout = 3000;
         s1->setsockopt(ZMQ_RCVTIMEO,&recvTimeout,sizeof(int));

         int recvRate = 80000;
         s1->setsockopt(ZMQ_RATE,&recvRate,sizeof(int));

         int recsec = 60 * 60;
      // s1->setsockopt(ZMQ_RECOVERY_IVL,&recsec,sizeof(recsec));

         s1->connect("pgm://local_IP;239.255.0.20:30001");

         s1->setsockopt (ZMQ_SUBSCRIBE,NULL,0);

         printf ("done. n");
         int seq=0;
         while(true) {

               zmq::message_t msgbuff;

               int ret = s1->recv(&msgbuff,0);
               if(!ret)
               {
                   printf ("Received not received timeoutn");
                   continue;
               }

               printf ("Seq(%d) Received data size=%dn",seq,msgbuff.size());
               ++seq;
         }
    }
    catch( zmq::error_t &e )   {
           printf ("An error occurred: %sn",e.what());
           return 1;
    }
    catch( std::exception &e ) {
           printf ("An error occurred: %sn",e.what());
           return 1;
    }
    return 0;
}

解决方法

PGM是否稳定?仅供参考:从2.1.1开始工作,今天我们有稳定的4.2.

这不是一个好习惯.我敢于指责图书馆维护人员在发布图书馆之前没有彻底测试过PGM / EPGM,或者在应用程序设计得到充分理解,设计稳健,诊断良好之前随时在开发中做不好的工作.性能/延迟测试在实际部署生态系统的现实检查,通常由{localhost |家庭子网|远程网络| remote-host(s)}.

[PUB] – 发送部分需要得到应有的注意:

如果不出意外,文档的这一部分就是警告并响起所有的钟声和响声.吹嘘所有口哨,如果在一些模拟SLOC中进行不充分的资源管理,而对于非阻塞,超快速循环的残酷尝试确实存在应有的谨慎:

?MQ does not guarantee that the socket will accept as many as ZMQ_SNDHWM messages,and the actual limit may be as much as 60-70% lower depending on the flow of messages on the socket.

所以,可能是对的,你的[PUB] -sender丢失了丢失的消息,然后再将这些消息发送到线路上.

下一个警告来自O / S权限:

The pgm transport implementation requires access to raw IP sockets. Additional privileges may be required on some operating systems for this operation. Applications not requiring direct interoperability with other PGM implementations are encouraged to use the epgm transport instead which does not require any special privileges.

接下来是[SUB] -receiver:

一些更多的调整将有助于嗅探[PUB] -sender,类似于下面提出的[SUB] -receiver的内联状态/跟踪工具:

------------------------------------------------
// ZMQCPP-PGM-receive.cpp : Defines the entry point for the console application.
//                          MODs: https://stackoverflow.com/q/44526517/3666197

#include "stdafx.h"
#include <stdio.h>
#include "zmq.hpp"

#include <chrono>                                                       // since C++ 11
typedef std::chrono::high_resolution_clock              nanoCLK;

#define ZMQ_IO_THREAD_POOL_SIZE                         8

#define ZMQ_AFINITY_PLAIN_ROUNDROBIN_UNMANAGED_RISKY    0
#define ZMQ_AFINITY_LO_PRIO_POOL                        0 | 1
#define ZMQ_AFINITY_HI_PRIO_POOL                        0 | 0 | 2
#define ZMQ_AFINITY_MC_EPGM_POOL                        0 | 0 | 0 | 4 | 8 | 0 | 0 | 64 | 128


int main( int argc,char* argv[] ) {

    auto RECV_start = nanoCLK::now();
    auto RECV_ret   = nanoCLK::now();
    auto RECV_last  = nanoCLK::now();
    auto TEST_start = nanoCLK::now();

    try {
           zmq::context_t context( ZMQ_IO_THREAD_POOL_SIZE );           printf ( "Connecting to server..." );
           int            major,minor,patch;
           zmq::version( &major,&minor,&patch );                      printf ( "Using ZeroMQ( %d.%d.%d )",major,patch );

           zmq::socket_t *s1 = new zmq::socket_t( context,ZMQ_SUB );   // Socket to talk to server

           int zmqLinger   =       0,// [  ms]
               zmqAffinity =       0,// [   #]  mapper bitmap-onto-IO-thread-Pool (ref. #define-s above )

               recvBuffer  =       2 * 123456,// [   B]
               recvMaxSize =    9876,// [   B]
               recvHwMark  =  123456,// [   #]  max number of MSGs allowed to be Queued per connected Peer

               recvRate    =   80000 * 10,// [kbps]
               recvTimeout =    3000,// [  ms]  before ret EAGAIN { 0: NO_BLOCK | -1: INF | N: wait [ms] }
               recoverMSEC =      60 * 60      // [  ms]
               ;

           s1->setsockopt ( ZMQ_AFFINITY,&zmqAffinity,sizeof(int) );
           s1->setsockopt ( ZMQ_LINGER,&zmqLinger,sizeof(int) );
           s1->setsockopt ( ZMQ_MAXMSGSIZE,&recvMaxSize,sizeof(int) );
           s1->setsockopt ( ZMQ_RCVBUF,&recvBuffer,sizeof(int) );
           s1->setsockopt ( ZMQ_RCVHWM,&recvHwMark,sizeof(int) );
           s1->setsockopt ( ZMQ_RCVTIMEO,sizeof(int) );
           s1->setsockopt ( ZMQ_RATE,sizeof(int) );
     //    s1->setsockopt ( ZMQ_RECOVERY_IVL,&recoverMSEC,sizeof(int) );

           s1->connect ( "pgm://local_IP;239.255.0.20:30001" );
           s1->setsockopt ( ZMQ_SUBSCRIBE,0 );                   printf ( "done. n" );

           int seq = 0;
           while( true ) {
                  zmq::message_t         msgbuff;                  RECV_start = nanoCLK::now(); RECV_last = RECV_ret;
                  int   ret = s1->recv( &msgbuff,0 );             RECV_ret   = nanoCLK::now();
                  if ( !ret )                                           printf ( "[T0+ %14d [ns]]: [SUB] did not receive any message within set timeout(%d). RC == %d LOOP_ovhd == %6d [ns] RECV_wait == %10d [ns]n",std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - TEST_start ).count(),recvTimeout,ret,std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - RECV_last ).count(),std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - RECV_start ).count() );
                  else                                                  printf ( "[T0+ %14d [ns]]: [SUB] did now receive   a message SEQ#(%6d.) DATA[%6d] B. RC == %d LOOP_ovhd == %6d [ns] RECV_wait == %10d [ns]n",++seq,msgbuff.size(),std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - RECV_start ).count() );
           }
    }
    catch( zmq::error_t   &e ) {                                        printf ( "[T0+ %14d [ns]]: [EXC.ZMQ] An error occurred: %snWill RET(1)",e.what() );
           return 1;
    }
    catch( std::exception &e ) {                                        printf ( "[T0+ %14d [ns]]: [EXC.std] An error occurred: %snWill RET(1)",e.what() );
           return 1;
    }
    return 0;
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读