俗话说前人种树后人乘凉,今天写的这篇文章也是站在前人的基础上实现的,使用开源simps/mqtt库实现
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
轻量级:MQTT协议使用简单的二进制消息格式,消息头部只有2字节,这使得它非常适合在资源受限的设备上运行。
发布-订阅模式:MQTT使用发布-订阅模式,支持一个发布者向多个订阅者发送消息。订阅者可以选择订阅特定的主题,只接收与自己相关的消息。
异步通信:MQTT是异步通信模式,发布者发布消息后,不需要等待订阅者的回复,可以继续进行其他操作。
低功耗:MQTT可以在低带宽和不稳定的网络环境下工作,能够降低设备的能耗。
面向连接:TCP使用三次握手的方式建立连接,确保通信的可靠性。
可靠性:TCP协议使用确认机制和流量控制来保证数据的完整性和可靠性,确保数据的准确传输。
有序性:TCP保证数据的有序传输,通过序号和确认机制来实现数据的有序性。
高效性:TCP使用了滑动窗口的机制,使得数据传输更加高效。
设计目标不同:MQTT协议是为物联网设备之间的通信设计的,而TCP协议是面向通用的网络通信设计的。
通信方式不同:MQTT采用发布-订阅模式,支持多对多的通信方式,而TCP协议是点对点的通信方式。
报文结构不同:MQTT协议使用二进制消息格式,只有2字节的消息头部,而TCP协议的报文结构相对复杂,包含序号、确认号等信息。
网络模型不同:MQTT协议建立在TCP协议之上,通过TCP协议提供的可靠性和有序性来保证通信的可靠传输。
composer require simps/mqtt
这个是开源的包,支持MQTT协议解析和协程客户端,无论是你用swoole当客户端还是当做服务端都可以,支持MQTT3.x和MQTT5.x
在EasyEasySwooleEvent中写入以下代码
$server = \EasySwoole\EasySwoole\ServerManager::getInstance()->getSwooleServer();
$tcp = $server->addlistener(Config::getInstance()->getConf('MAIN_SERVER.LISTEN_ADDRESS'),1883,SWOOLE_TCP);
$tcpConfig = new \EasySwoole\Socket\Config();
$tcpConfig->setType($tcpConfig::TCP);
$tcpConfig->setParser(MqttParser::class);
$dispatch = new Dispatcher($tcpConfig);
$tcp->set(
[
'package_max_length' => 1024 * 1024 * 2, //设置最大数据包尺寸单位为字节
'open_mqtt_protocol' => true,
//
]
);
//设置解析异常时的回调,默认将抛出异常到服务器
$tcpConfig->setOnExceptionHandler(function (\Swoole\Server $server, \Throwable $throwable, string $raw,\EasySwoole\Socket\Client\Tcp $client, \EasySwoole\Socket\Bean\Response $response) {
echo "Error on handling MQTT message: {$throwable->getMessage()}\n";
//记录日志
$server->close($client->getFd());
});
//连接
$tcp->on('connect', function (\Swoole\Server $server, int $fd, int $reactor_id) {
Logger::getInstance()->console('tcp->fd='.$fd.' connect');
});
//收到消息
$tcp->on('receive', function (\Swoole\Server $server, int $fd, int $reactor_id, string $data) use ($dispatch) {
$dispatch->dispatch($server, $data, $fd, $reactor_id);
});
//断开
$tcp->on('close', function (\Swoole\Server $server, int $fd, int $reactor_id) {
Logger::getInstance()->console('tcp->fd='.$fd.' close');
});
swoole通过设置 open_mqtt_protocol 选项,启用后会解析 MQTT 包头,只要开启即可
在MqttParser中写入以下代码
class MqttParser implements ParserInterface
{
/**
* @param $raw
* @param $client
* @return Caller|null
*/
public function decode($raw, $client): ?Caller
{
$data = [
'param' => V3::unpack($raw) //调用composer 下载的mqtt协议解析包中的函数进行解析
];
$bean = new Caller();
$controller = !empty($data['controller']) ? $data['controller'] : 'Index';
$action = !empty($data['action']) ? $data['action'] : 'index';
$param = !empty($data['param']) ? $data['param'] : [];
$controller = "App\\Mqtt\\{$controller}";
$bean->setControllerClass($controller);
$bean->setAction($action);
$bean->setArgs($param);
return $bean;
}
/**
* @param Response $response
* @param $client
* @return string|null
*/
public function encode(Response $response, $client): ?string
{
return null;
}
在Index中实现以下代码
class Index extends Controller
{
/**
* @param string|null $actionName
*/
public function actionNotFound(?string $actionName)
{
$this->response()->setMessage("{$actionName} not found \n");
}
public function index()
{
$data = $this->caller()->getArgs();
$client = $this->caller()->getClient();
$server = ServerManager::getInstance()->getSwooleServer();
$fd = $client->getFd();
try {
if (is_array($data) && isset($data['type'])) {
var_dump($data); //打印数据包进行查看
switch ($data['type']) {
case Types::CONNECT:
// Check protocol_name
if ($data['protocol_name'] != 'MQTT') {
$server->close($fd);
return false;
}
// Check connection information, etc.
$server->send(
$fd,
V3::pack(
[
'type' => Types::CONNACK,
'code' => 0,
'session_present' => 0,
]
)
);
break;
case Types::PINGREQ:
$server->send($fd, V3::pack(['type' => Types::PINGRESP]));
break;
case Types::DISCONNECT:
if ($server->exists($fd)) {
$server->close($fd);
}
break;
case Types::PUBLISH:
// Send to subscribers
foreach ($server->connections as $sub_fd) {
if ($sub_fd != $fd) {
$server->send(
$sub_fd,
V3::pack(
[
'type' => $data['type'],
'topic' => $data['topic'],
'message' => $data['message'],
'dup' => $data['dup'],
'qos' => $data['qos'],
'retain' => $data['retain'],
'message_id' => $data['message_id'] ?? 0,
]
)
);
}
}
if ($data['qos'] === 1) {
$server->send(
$fd,
V3::pack(
[
'type' => Types::PUBACK,
'message_id' => $data['message_id'] ?? 0,
]
)
);
}
break;
case Types::SUBSCRIBE:
$payload = [];
foreach ($data['topics'] as $qos) {
if (is_numeric($qos) && $qos < 3) {
$payload[] = $qos;
} else {
$payload[] = 0x80;
}
}
$server->send(
$fd,
V3::pack(
[
'type' => Types::SUBACK,
'message_id' => $data['message_id'] ?? 0,
'codes' => $payload,
]
)
);
break;
case Types::UNSUBSCRIBE:
$server->send(
$fd,
V3::pack(
[
'type' => Types::UNSUBACK,
'message_id' => $data['message_id'] ?? 0,
]
)
);
break;
}
} else {
$server->close($fd);
}
} catch (\Throwable $e) {
echo "\033[0;31mError: {$e->getMessage()}\033[0m\r\n";
$server->close($fd);
}
//
}
}
执行php easyswoole server start
服务端启动
客户端使用的是windows平台下的MQTTX客户端,免费
上面我用的是V3的协议测试的,所以客户端也选择的V3,其他的东西都是测试,随便填即可
由此,一个简单的MQTT通信demo就完成了,后续在代码中完成自己的业务逻辑即可
有需要了解详细协议或者查看源码的小伙伴,从下方查看链接即可 MQTT 协议解析 & 协程客户端文档 MQTT协议V3.1文档 MQTT协议V5.0文档
注意:不要忘记开服务器的安全组和防火墙端口,有时候经常会忘记
本文为北溟有鱼QAQ原创文章,转载无需和我联系,但请注明来自北溟有鱼QAQ https://www.amdzz.cn
最新评论