EasySwoole 实现MQTT服务端

EasySwoole 实现MQTT服务端

俗话说前人种树后人乘凉,今天写的这篇文章也是站在前人的基础上实现的,使用开源simps/mqtt库实现

MQTT是什么

MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

MQTT协议和TCP协议的特点和区别

MQTT
  • 轻量级:MQTT协议使用简单的二进制消息格式,消息头部只有2字节,这使得它非常适合在资源受限的设备上运行。

  • 发布-订阅模式:MQTT使用发布-订阅模式,支持一个发布者向多个订阅者发送消息。订阅者可以选择订阅特定的主题,只接收与自己相关的消息。

  • 异步通信:MQTT是异步通信模式,发布者发布消息后,不需要等待订阅者的回复,可以继续进行其他操作。

  • 低功耗:MQTT可以在低带宽和不稳定的网络环境下工作,能够降低设备的能耗。


TCP
  • 面向连接:TCP使用三次握手的方式建立连接,确保通信的可靠性。

  • 可靠性:TCP协议使用确认机制和流量控制来保证数据的完整性和可靠性,确保数据的准确传输。

  • 有序性:TCP保证数据的有序传输,通过序号和确认机制来实现数据的有序性。

  • 高效性:TCP使用了滑动窗口的机制,使得数据传输更加高效。


区别
  • 设计目标不同:MQTT协议是为物联网设备之间的通信设计的,而TCP协议是面向通用的网络通信设计的。

  • 通信方式不同:MQTT采用发布-订阅模式,支持多对多的通信方式,而TCP协议是点对点的通信方式。

  • 报文结构不同:MQTT协议使用二进制消息格式,只有2字节的消息头部,而TCP协议的报文结构相对复杂,包含序号、确认号等信息。

  • 网络模型不同:MQTT协议建立在TCP协议之上,通过TCP协议提供的可靠性和有序性来保证通信的可靠传输。

EasySwoole 如何实现

安装

composer require simps/mqtt 这个是开源的包,支持MQTT协议解析和协程客户端,无论是你用swoole当客户端还是当做服务端都可以,支持MQTT3.x和MQTT5.x

起一个TCP服务端

在EasyEasySwooleEvent中写入以下代码

$server = \EasySwoole\EasySwoole\ServerManager::getInstance()->getSwooleServer();
        $tcp = $server->addlistener(Config::getInstance()->getConf('MAIN_SERVER.LISTEN_ADDRESS'),1883,SWOOLE_TCP);
        $tcpConfig = new \EasySwoole\Socket\Config();
        $tcpConfig->setType($tcpConfig::TCP);
        $tcpConfig->setParser(MqttParser::class);
        $dispatch = new Dispatcher($tcpConfig);
        $tcp->set(
            [
                'package_max_length'    => 1024 * 1024 * 2, //设置最大数据包尺寸单位为字节
                'open_mqtt_protocol' => true,

//
            ]
        );

        //设置解析异常时的回调,默认将抛出异常到服务器
        $tcpConfig->setOnExceptionHandler(function (\Swoole\Server $server, \Throwable $throwable, string $raw,\EasySwoole\Socket\Client\Tcp $client, \EasySwoole\Socket\Bean\Response $response) {
            echo "Error on handling MQTT message: {$throwable->getMessage()}\n";
            //记录日志
            $server->close($client->getFd());
        });
        //连接
        $tcp->on('connect', function (\Swoole\Server $server, int $fd, int $reactor_id) {
            Logger::getInstance()->console('tcp->fd='.$fd.' connect');

        });
        //收到消息
        $tcp->on('receive', function (\Swoole\Server $server, int $fd, int $reactor_id, string $data) use ($dispatch) {
            $dispatch->dispatch($server, $data, $fd, $reactor_id);
        });

        //断开
        $tcp->on('close', function (\Swoole\Server $server, int $fd, int $reactor_id) {
            Logger::getInstance()->console('tcp->fd='.$fd.' close');
        });

swoole通过设置 open_mqtt_protocol 选项,启用后会解析 MQTT 包头,只要开启即可

Socket解析器

在MqttParser中写入以下代码

class MqttParser implements ParserInterface
{


    /**
     * @param $raw
     * @param $client
     * @return Caller|null
     */
    public function decode($raw, $client): ?Caller
    {
        $data = [
            'param' => V3::unpack($raw)  //调用composer 下载的mqtt协议解析包中的函数进行解析
        ];
        $bean = new Caller();
        $controller = !empty($data['controller']) ? $data['controller'] : 'Index';
        $action = !empty($data['action']) ? $data['action'] : 'index';
        $param = !empty($data['param']) ? $data['param'] : [];
        $controller = "App\\Mqtt\\{$controller}";
        $bean->setControllerClass($controller);
        $bean->setAction($action);
        $bean->setArgs($param);
        return $bean;
    }

    /**
     * @param Response $response
     * @param $client
     * @return string|null
     */
    public function encode(Response $response, $client): ?string
    {
        return null;
    }

Socket控制器

在Index中实现以下代码

class Index extends Controller
{

    /**
     * @param string|null $actionName
     */
    public function actionNotFound(?string $actionName)
    {
        $this->response()->setMessage("{$actionName} not found \n");
    }

    public function index()
    {

        $data = $this->caller()->getArgs();

        $client = $this->caller()->getClient();
        $server  = ServerManager::getInstance()->getSwooleServer();
        $fd = $client->getFd();
        try {
            if (is_array($data) && isset($data['type'])) {
                var_dump($data); //打印数据包进行查看
                switch ($data['type']) {
                    case Types::CONNECT:
                        // Check protocol_name
                        if ($data['protocol_name'] != 'MQTT') {
                            $server->close($fd);

                            return false;
                        }

                        // Check connection information, etc.

                        $server->send(
                            $fd,
                            V3::pack(
                                [
                                    'type' => Types::CONNACK,
                                    'code' => 0,
                                    'session_present' => 0,
                                ]
                            )
                        );
                        break;
                    case Types::PINGREQ:
                        $server->send($fd, V3::pack(['type' => Types::PINGRESP]));
                        break;
                    case Types::DISCONNECT:
                        if ($server->exists($fd)) {
                            $server->close($fd);
                        }
                        break;
                    case Types::PUBLISH:
                        // Send to subscribers
                        foreach ($server->connections as $sub_fd) {
                            if ($sub_fd != $fd) {
                                $server->send(
                                    $sub_fd,
                                    V3::pack(
                                        [
                                            'type' => $data['type'],
                                            'topic' => $data['topic'],
                                            'message' => $data['message'],
                                            'dup' => $data['dup'],
                                            'qos' => $data['qos'],
                                            'retain' => $data['retain'],
                                            'message_id' => $data['message_id'] ?? 0,
                                        ]
                                    )
                                );
                            }
                        }

                        if ($data['qos'] === 1) {
                            $server->send(
                                $fd,
                                V3::pack(
                                    [
                                        'type' => Types::PUBACK,
                                        'message_id' => $data['message_id'] ?? 0,
                                    ]
                                )
                            );
                        }

                        break;
                    case Types::SUBSCRIBE:
                        $payload = [];
                        foreach ($data['topics'] as $qos) {
                            if (is_numeric($qos) && $qos < 3) {
                                $payload[] = $qos;
                            } else {
                                $payload[] = 0x80;
                            }
                        }
                        $server->send(
                            $fd,
                            V3::pack(
                                [
                                    'type' => Types::SUBACK,
                                    'message_id' => $data['message_id'] ?? 0,
                                    'codes' => $payload,
                                ]
                            )
                        );
                        break;
                    case Types::UNSUBSCRIBE:
                        $server->send(
                            $fd,
                            V3::pack(
                                [
                                    'type' => Types::UNSUBACK,
                                    'message_id' => $data['message_id'] ?? 0,
                                ]
                            )
                        );
                        break;
                }
            } else {
                $server->close($fd);
            }
        } catch (\Throwable $e) {
            echo "\033[0;31mError: {$e->getMessage()}\033[0m\r\n";
            $server->close($fd);
        }
//

    }




}

测试

启动服务端

执行php easyswoole server start 服务端启动

打开客户端

客户端使用的是windows平台下的MQTTX客户端,免费

上面我用的是V3的协议测试的,所以客户端也选择的V3,其他的东西都是测试,随便填即可

发送消息测试


由此,一个简单的MQTT通信demo就完成了,后续在代码中完成自己的业务逻辑即可

有需要了解详细协议或者查看源码的小伙伴,从下方查看链接即可 MQTT 协议解析 & 协程客户端文档 MQTT协议V3.1文档 MQTT协议V5.0文档

注意:不要忘记开服务器的安全组和防火墙端口,有时候经常会忘记

北溟有鱼QAQ博客
请先登录后发表评论
  • 最新评论
  • 总共0条评论