php – Best practice for multiple subscribe methods in React / Ratchet / ZMQ
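I am trying to build a small real-time WebSocket use case in which users can log in, see all other logged-in users, and get notified when a new user logs in or an existing user logs out.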
For this scenario, I use a ZMQ PUSH socket in my UserController whenever a user logs in or logs out.

UserController

    public function login() {
        //... here is the auth code, model call etc...
        $aUserData = array(); // user data comes from the database with username, logintime, etc....

        $context = new ZMQContext();
        $oSocket = $context->getSocket(ZMQ::SOCKET_PUSH, 'USER_LOGIN_PUSHER'); // use persistent_id
        if ($oSocket instanceof ZMQSocket) {
            $oSocket->connect("tcp://127.0.0.1:5555");
            $oSocket->send(json_encode($aUserData));
        }
    }

    public function logout() {
        //... here is the logout code, model call etc....
        $aUserData = array(); // user data comes from the SESSION with username, logintime, etc....

        $context = new ZMQContext();
        $oSocket = $context->getSocket(ZMQ::SOCKET_PUSH, 'USER_LOGOUT_PUSHER'); // use persistent_id
        if ($oSocket instanceof ZMQSocket) {
            $oSocket->connect("tcp://127.0.0.1:5555");
            $oSocket->send(json_encode($aUserData));
        }
    }

I then have a Pusher class like the one in the Ratchet documentation (link). This class has two methods, onUserLogin and onUserLogout, along with all the usual other methods.
UserInformationPusher

    public function onUserLogin($aUserData) {
        //var_dump("onUserLogin");
        $sUserData = json_decode($aUserData, true);
        $oTopic = $this->subscribedTopics["user_login"];
        if ($oTopic instanceof Topic) {
            $oTopic->broadcast($sUserData);
        } else {
            return;
        }
    }

    public function onUserLogout($aUserData) {
        //var_dump("onUserLogout");
        $entryData = json_decode($aUserData, true);
        $oTopic = $this->subscribedTopics["user_logout"];
        if ($oTopic instanceof Topic) {
            $oTopic->broadcast($entryData);
        } else {
            return;
        }
    }

The last part is the WampServer / WsServer / HttpServer, which runs a loop that listens for incoming connections. It also holds my ZMQ PULL socket.

RatchetServerConsole

    public function start_server() {
        $oPusher = new UserInformationPusher();
        $oLoop = \React\EventLoop\Factory::create();

        $oZMQContext = new \React\ZMQ\Context($oLoop);
        $oPullSocket = $oZMQContext->getSocket(ZMQ::SOCKET_PULL);
        $oPullSocket->bind('tcp://127.0.0.1:5555'); // Binding to 127.0.0.1 means the only client that can connect is itself
        $oPullSocket->on('message', array($oPusher, 'onUserLogin'));
        $oPullSocket->on('message', array($oPusher, 'onUserLogout'));

        $oMemcache = new Memcache();
        $oMemcache->connect('127.0.0.1', 11211);
        $oMemcacheHandler = new Handler\MemcacheSessionHandler($oMemcache);
        $oSession = new SessionProvider(
            new \Ratchet\Wamp\WampServer($oPusher),
            $oMemcacheHandler
        );
        //$this->Output->info("Server start initiation with memcache!...");

        $webSock = new \React\Socket\Server($oLoop);
        $webSock->listen(8080, '0.0.0.0'); // Binding to 0.0.0.0 means remotes can connect
        $oServer = new \Ratchet\Server\IoServer(
            new \Ratchet\Http\HttpServer(
                new \Ratchet\WebSocket\WsServer($oSession)
            ),
            $webSock
        );
        $this->Output->info("Server started ");
        $oLoop->run();
    }

In this example, a call from either login() or logout() always triggers both methods (onUserLogin and onUserLogout), because both are registered as listeners on the same 'message' event.

One option I see: I could add some information to $sUserData in the controller and check for it in the Pusher.

No client code is included, because that part works fine.

Solution
In your RatchetServerConsole,

Remove:

    $oPullSocket->on('message', array($oPusher, 'onUserLogin'));
    $oPullSocket->on('message', array($oPusher, 'onUserLogout'));

Add:

    $oPullSocket->on('message', array($oPusher, 'onUserActionBroadcast'));

In UserInformationPusher, delete onUserLogin() and onUserLogout().

Add:

    public function onUserActionBroadcast($aUserData) {
        $entryData = json_decode($aUserData, true);

        // If the lookup topic object isn't set there is no one to publish to
        if (!array_key_exists($entryData['topic'], $this->subscribedTopics)) {
            return;
        }

        $topic = $this->subscribedTopics[$entryData['topic']];
        unset($entryData['topic']);

        // re-send the data to all the clients subscribed to that category
        $topic->broadcast($entryData);
    }

Your UserController (add the topic to $aUserData):

    public function login() {
        //... here is the auth code, model call etc...
        $aUserData = array(); // user data comes from the database with username, logintime, etc....
        $aUserData['topic'] = 'USER_LOGIN'; // add the topic name

        $context = new ZMQContext();
        $oSocket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher'); // use persistent_id
        if ($oSocket instanceof ZMQSocket) {
            $oSocket->connect("tcp://127.0.0.1:5555");
            $oSocket->send(json_encode($aUserData));
        }
    }

    public function logout() {
        //... here is the logout code, model call etc....
        $aUserData = array(); // user data comes from the SESSION with username, logintime, etc....
        $aUserData['topic'] = 'USER_LOGOUT'; // add the topic name

        $context = new ZMQContext();
        $oSocket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher'); // use persistent_id
        if ($oSocket instanceof ZMQSocket) {
            $oSocket->connect("tcp://127.0.0.1:5555");
            $oSocket->send(json_encode($aUserData));
        }
    }

Finally, in your view file:

    <script>
        var conn = new ab.Session('ws://yourdomain.dev:9000', // Add the correct domain and port here
            function() {
                conn.subscribe('USER_LOGIN', function(topic, data) {
                    console.log(topic);
                    console.log(data);
                });
                conn.subscribe('USER_LOGOUT', function(topic, data) {
                    console.log(topic);
                    console.log(data);
                });
            },
            function() {
                console.warn('WebSocket connection closed');
            },
            {'skipSubprotocolCheck': true}
        );
    </script>

Note: the basic idea is to use a single broadcast function in the pusher class and let the 'topic' field of each message decide which subscribers receive it.
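The routing idea can be tried without ZMQ or Ratchet installed. The following is only a minimal sketch, assuming PHP 7.4 or later: FakeTopic and TopicRouter are hypothetical stand-ins invented for illustration (they are not Ratchet classes), and the boolean return value is an addition that makes the outcome easy to check. It shows a single handler that reads the 'topic' key from the JSON payload and forwards the remaining data only to the matching topic.

```php
<?php
// Hypothetical stand-in for Ratchet\Wamp\Topic: it only records what would be broadcast.
final class FakeTopic
{
    /** @var array<int, array> */
    public array $sent = [];

    public function broadcast(array $payload): void
    {
        $this->sent[] = $payload;
    }
}

// Hypothetical stand-in for the pusher class, reduced to the routing logic.
final class TopicRouter
{
    /** @var array<string, FakeTopic> */
    private array $subscribedTopics = [];

    public function subscribe(string $name, FakeTopic $topic): void
    {
        $this->subscribedTopics[$name] = $topic;
    }

    // One handler for every message; the 'topic' key selects the target.
    // Returns true if the message was forwarded, false if it was dropped.
    public function onUserActionBroadcast(string $json): bool
    {
        $entryData = json_decode($json, true);
        if (!is_array($entryData) || !isset($entryData['topic'])) {
            return false; // malformed JSON or no topic given
        }

        $name = $entryData['topic'];
        if (!is_string($name) || !isset($this->subscribedTopics[$name])) {
            return false; // nobody has subscribed to this topic
        }

        unset($entryData['topic']); // subscribers do not need the routing key
        $this->subscribedTopics[$name]->broadcast($entryData);
        return true;
    }
}

$router = new TopicRouter();
$login  = new FakeTopic();
$logout = new FakeTopic();
$router->subscribe('USER_LOGIN', $login);
$router->subscribe('USER_LOGOUT', $logout);

// Simulates what the PULL socket would hand to the handler after login().
$router->onUserActionBroadcast(json_encode(['topic' => 'USER_LOGIN', 'username' => 'alice']));
```

After the last call only $login has recorded a payload; $logout stays empty, which is the behaviour the two separate 'message' listeners could not provide.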