欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

websocket心跳链接代码_Hyperf+RabbitMQ+WebSocket实现大屏幕消息推送

发布时间:2025/7/14 编程问答 33 豆豆
生活随笔 收集整理的这篇文章主要介绍了 websocket心跳链接代码_Hyperf+RabbitMQ+WebSocket实现大屏幕消息推送 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

作者:八重樱

来源:www.cnblogs.com/a609251438/p/12713467.html

介绍

基于 Hyperf+ WebSocket +RabbitMQ 实现的一个简单大屏幕的消息推送。

思路

利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,

保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。

WebSocket 服务

composer require hyperf/websocket-server

配置文件 [config/autoload/server.php]

<?phpreturn  ['mode' => SWOOLE_PROCESS,'servers' => [        ['name' => 'http','type' => Server::SERVER_HTTP,'host' => '0.0.0.0','port' => 11111,'sock_type' => SWOOLE_SOCK_TCP,'callbacks' => [                SwooleEvent::ON_REQUEST => [HyperfHttpServerServer::class, 'onRequest'],            ],        ],        ['name' => 'ws','type' => Server::SERVER_WEBSOCKET,'host' => '0.0.0.0','port' => 12222,'sock_type' => SWOOLE_SOCK_TCP,'callbacks' => [                SwooleEvent::ON_HAND_SHAKE => [HyperfWebSocketServerServer::class, 'onHandShake'],                SwooleEvent::ON_MESSAGE => [HyperfWebSocketServerServer::class, 'onMessage'],                SwooleEvent::ON_CLOSE => [HyperfWebSocketServerServer::class, 'onClose'],            ],        ],    ],

WebSocket 服务器端代码示例

<?phpdeclare (strict_types=1);/** * This file is part of Hyperf. * * @link     https://www.hyperf.io * @document https://doc.hyperf.io * @contact  group@hyperf.io * @license  https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE */namespace AppController;use HyperfContractOnCloseInterface;use HyperfContractOnMessageInterface;use HyperfContractOnOpenInterface;use SwooleHttpRequest;use SwooleServer;use SwooleWebsocketFrame;use SwooleWebSocketServer as WebSocketServer;class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface{/**     * 发送消息     * @param WebSocketServer $server     * @param Frame $frame     */public function onMessage(WebSocketServer $server, Frame $frame): void{//心跳刷新缓存        $redis = $this->container->get(Redis::class);//获取所有的客户端id        $fdList = $redis->sMembers('websocket_sjd_1');//如果当前客户端在客户端集合中,就刷新if (in_array($frame->fd, $fdList)) {            $redis->sAdd('websocket_sjd_1', $frame->fd);            $redis->expire('websocket_sjd_1', 7200);        }        $server->push($frame->fd, 'Recv: ' . $frame->data);    }/**     * 客户端失去链接     * @param Server $server     * @param int $fd     * @param int $reactorId     */public function onClose(Server $server, int $fd, int $reactorId): void{//删掉客户端id        $redis = $this->container->get(Redis::class);//移除集合中指定的value        $redis->sRem('websocket_sjd_1', $fd);        var_dump('closed');    }/**     * 客户端链接     * @param WebSocketServer $server     * @param Request $request     */public function onOpen(WebSocketServer $server, Request $request): void{//保存客户端id        $redis = $this->container->get(Redis::class);        $res1 = $redis->sAdd('websocket_sjd_1', $request->fd);        var_dump($res1);        $res = $redis->expire('websocket_sjd_1', 7200);        var_dump($res);        $server->push($request->fd, 'Opened');    }}

WebSocket 前端代码

function WebSocketTest() {if ("WebSocket" in window) {console.log("您的浏览器支持 WebSocket!");var num = 0// 打开一个 web socketvar ws = new WebSocket("ws://127.0.0.1:12222");        ws.onopen = function () {// Web Socket 已连接上,使用 send() 方法发送数据//alert("数据发送中...");//ws.send("发送数据");        };window.setInterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开var ping = {"type": "ping"};            ws.send(JSON.stringify(ping));        }, 5000);       ws.onmessage = function (evt) {var d = JSON.parse(evt.data);console.log(d);if (d.code == 300) {                $(".address").text(d.address)            }if (d.code == 200) {var v = d.dataconsole.log(v);                num++var str = `                                

${v.recordOutTime}

                               

${v.userOutName}

                               

${v.userOutNum}

                               

${v.doorOutName}

                            `                $(".tableHead").after(str)if (num > 7) {                   num--                    $(".table .item:nth-last-child(1)").remove()                }            }        };        ws.error = function (e) {console.log(e)            alert(e)        }        ws.onclose = function () {// 关闭 websocket            alert("连接已关闭...");        };    } else {        alert("您的浏览器不支持 WebSocket!");    }}

AMQP 组件

composer require hyperf/amqp

配置文件 [config/autoload/amqp.php]

<?phpreturn  ['default' => ['host' => 'localhost','port' => 5672,'user' => 'guest','password' => 'guest','vhost' => '/','pool' => ['min_connections' => 1,'max_connections' => 10,'connect_timeout' => 10.0,'wait_timeout' => 3.0,'heartbeat' => -1,        ],'params' => ['insist' => false,'login_method' => 'AMQPLAIN','login_response' => null,'locale' => 'en_US','connection_timeout' => 3.0,'read_write_timeout' => 6.0,'context' => null,'keepalive' => false,'heartbeat' => 3,        ],    ],];MQ 消费者代码<?phpdeclare (strict_types=1);namespace AppAmqpConsumer;use HyperfAmqpAnnotationConsumer;use HyperfAmqpMessageConsumerMessage;use HyperfAmqpResult;use HyperfServerServer;use HyperfServerServerFactory;/** * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1) */class DemoConsumer extends ConsumerMessage{/**     * rabbmitMQ消费端代码     * @param $data     * @return string     */public function consume($data): string{        print_r($data);//获取集合中所有的value        $redis = $this->container->get(Redis::class);        $fdList=$redis->sMembers('websocket_sjd_1');        $server=$this->container->get(ServerFactory::class)->getServer()->getServer();foreach($fdList as $key=>$v){if(!empty($v)){                $server->push((int)$v, $data);            }        }return Result::ACK;    }}

控制器代码

/** * test * @return array */public function test(){    $data = array('code' => 200,'data' => ['userOutName' => 'ccflow','userOutNum' => '9999','recordOutTime' => date("Y-m-d H:i:s", time()),'doorOutName' => '教师公寓',        ]    );    $data = GuzzleHttpjson_encode($data);    $message = new DemoProducer($data);    $producer = ApplicationContext::getContainer()->get(Producer::class);    $result = $producer->produce($message);    var_dump($result);    $user = $this->request->input('user', 'Hyperf');    $method = $this->request->getMethod();return ['method' => $method,'message' => "{$user}.",    ];}

最终效果

总结

以上是生活随笔为你收集整理的websocket心跳链接代码_Hyperf+RabbitMQ+WebSocket实现大屏幕消息推送的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。