基于 Hyperf 实现 RabbitMQ + WebSocket 消息推送
发布时间:2020-12-13 21:02:55 所属栏目:PHP教程 来源:网络整理
导读:思路 利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。 ? WebSocket 服务 composer require hyperf/websocket-server 配置文件 [config/autoload/server.php]
思路利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。 ? WebSocket 服务composer require hyperf/websocket-server
配置文件 [config/autoload/server.php]<?php return [ 'mode' => SWOOLE_PROCESS,'servers' => [ [ 'name' => 'http','type' => Server::SERVER_HTTP,'host' => '0.0.0.0','port' => 22222,'sock_type' => SWOOLE_SOCK_TCP,'callbacks' => [ SwooleEvent::ON_REQUEST => [HyperfHttpServerServer::class,'onRequest'],],[ 'name' => 'ws','type' => Server::SERVER_WEBSOCKET,'port' => 12222,'callbacks' => [ SwooleEvent::ON_HAND_SHAKE => [HyperfWebSocketServerServer::class,'onHandShake'],SwooleEvent::ON_MESSAGE => [HyperfWebSocketServerServer::class,'onMessage'],SwooleEvent::ON_CLOSE => [HyperfWebSocketServerServer::class,'onClose'],
WebSocket 服务器端代码示例<?php declare(strict_types=1); /** * This file is part of Hyperf. * * @link https://www.hyperf.io * @document https://doc.hyperf.io * @contact group@hyperf.io * @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE */ namespace AppController; use HyperfContractOnCloseInterface; use HyperfContractOnMessageInterface; use HyperfContractOnOpenInterface; use SwooleHttpRequest; use SwooleServer; use SwooleWebsocketFrame; use SwooleWebSocketServer as WebSocketServer; class WebSocketController extends Controller implements OnMessageInterface,OnOpenInterface,OnCloseInterface { /** * 发送消息 * @param WebSocketServer $server * @param Frame $frame */ public function onMessage(WebSocketServer $server,Frame $frame): void { //心跳刷新缓存 $redis = $this->container->get(Redis::class); //获取所有的客户端id $fdList = $redis->sMembers('websocket_sjd_1'); //如果当前客户端在客户端集合中,就刷新 if (in_array($frame->fd,$fdList)) { $redis->sAdd('websocket_sjd_1',$frame->fd); $redis->expire('websocket_sjd_1',7200); } $server->push($frame->fd,'Recv: ' . $frame->data); } /** * 客户端失去链接 * @param Server $server * @param int $fd * @param int $reactorId */ public function onClose(Server $server,int $fd,int $reactorId): void { //删掉客户端id $redis = $this->container->get(Redis::class); //移除集合中指定的value $redis->sRem('websocket_sjd_1',$fd); var_dump('closed'); } /** * 客户端链接 * @param WebSocketServer $server * @param Request $request */ public function onOpen(WebSocketServer $server,Request $request): void { //保存客户端id $redis = $this->container->get(Redis::class); $res1 = $redis->sAdd('websocket_sjd_1',$request->fd); var_dump($res1); $res = $redis->expire('websocket_sjd_1',7200); var_dump($res); $server->push($request->fd,'Opened'); } }
WebSocket 前端代码function WebSocketTest() { if ("WebSocket" in window) { console.log("您的浏览器支持 WebSocket!"); var num = 0 // 打开一个 web socket var ws = new WebSocket("ws://127.0.0.1:12222"); ws.onopen = function () { // Web Socket 已连接上,使用 send() 方法发送数据 //alert("数据发送中..."); //ws.send("发送数据"); }; window.setInterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开 var ping = {"type": "ping"}; ws.send(JSON.stringify(ping)); },5000); ws.onmessage = function (evt) { var d = JSON.parse(evt.data); console.log(d); if (d.code == 300) { $(".address").text(d.address) } if (d.code == 200) { var v = d.data console.log(v); num++ var str = `<div class="item"> <p>${v.recordOutTime}</p> <p>${v.userOutName}</p> <p>${v.userOutNum}</p> <p>${v.doorOutName}</p> </div>` $(".tableHead").after(str) if (num > 7) { num-- $(".table .item:nth-last-child(1)").remove() } } }; ws.error = function (e) { console.log(e) alert(e) } ws.onclose = function () { // 关闭 websocket alert("连接已关闭..."); }; } else { alert("您的浏览器不支持 WebSocket!"); } }
AMQP 组件
配置文件 [config/autoload/amqp.php]<?php return [ 'default' => [ 'host' => 'localhost','port' => 5672,'user' => 'guest','password' => 'guest','vhost' => '/','pool' => [ 'min_connections' => 1,'max_connections' => 10,'connect_timeout' => 10.0,'wait_timeout' => 3.0,'heartbeat' => -1,'params' => [ 'insist' => false,'login_method' => 'AMQPLAIN','login_response' => null,'locale' => 'en_US','connection_timeout' => 3.0,'read_write_timeout' => 6.0,'context' => null,'keepalive' => false,'heartbeat' => 3,];
MQ 消费者代码<?php declare(strict_types=1); namespace AppAmqpConsumer; use HyperfAmqpAnnotationConsumer; use HyperfAmqpMessageConsumerMessage; use HyperfAmqpResult; use HyperfServerServer; use HyperfServerServerFactory; /** * @Consumer(exchange="hyperf",routingKey="hyperf",queue="hyperf",nums=1) */ class DemoConsumer extends ConsumerMessage { /** * rabbmitMQ消费端代码 * @param $data * @return string */ public function consume($data): string { print_r($data); //获取集合中所有的value $redis = $this->container->get(Redis::class); $fdList=$redis->sMembers('websocket_sjd_1'); $server=$this->container->get(ServerFactory::class)->getServer()->getServer(); foreach($fdList as $key=>$v){ if(!empty($v)){ $server->push((int)$v,$data); } } return Result::ACK; }
控制器代码/** * test * @return array */ public function test() { $data = array( 'code' => 200,'data' => [ 'userOutName' => 'ccflow','userOutNum' => '9999','recordOutTime' => date("Y-m-d H:i:s",time()),'doorOutName' => '教师公寓',] ); $data = GuzzleHttpjson_encode($data); $message = new DemoProducer($data); $producer = ApplicationContext::getContainer()->get(Producer::class); $result = $producer->produce($message); var_dump($result); $user = $this->request->input('user','Hyperf'); $method = $this->request->getMethod(); return [ 'method' => $method,'message' => "{$user}.",]; }
最终效果? ? (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |