PHP 基于 SW-X 框架,搭建WebSocket服务器(二)

语言: CN / TW / HK

前言

官网地址:​ ​SW-X框架-专注高性能便捷开发而生的PHP-SwooleX框架​

希望各大佬举起小手,给小弟一个star:​ ​https://github.com/swoolex/swoolex​

1、前端模板

最终要实现的效果,如下图:

该模板可以直接下载:​ ​练习WebSocket使用的前端html模板​

也可以直接使用下面的前端代码,命名为:​ ​index.html​


<html>
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="renderer" content="webkit">
<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1">
<title>SW-X | WebSocket客户端示例 title>
<script src="https://blog.junphp.com/public/js/jquery.min.js"> script>
<script src="jquery.md5.js"> script>
<script src="tim.js"> script>
<style>
body,html{margin: 0;padding: 10px; height: calc(100% - 30px);font-size: 13px;}
ul,li{list-style: none;padding: 0;margin: 0;}
.user_list{width: 200px; height: 100%; overflow: hidden; overflow-y: auto; padding: 10px;border: 1px solid #ccc;float: left;}
.user_list li{width: 100%;padding: 5px 0;cursor: pointer;}
.user_list li:hover{color: #0077d6;}
.main{width: calc(100% - 550px); height: 70%; overflow: hidden; overflow-y: auto; padding: 10px;border: 1px solid #ccc;float: left;border-left: 0;background: #e9f8ff;}
.content{width: calc(100% - 530px); height: calc(30% - 1px);border: 1px solid #ccc;float: left;border-left: 0;border-top: 0;position: relative;}
#content{width: calc(100% - 20px);;border: 0;height:calc(100% - 25px);padding: 10px;}
#content:focus{outline: none;}
code{padding: 3px 5px;border-radius: 30%; color: #fff;}
.online{background: #35b700;}
.offline{background: red;}
.record{float: left;width: 100%;padding: 5px 0;}
.record span{font-size: 12px; background: #ccc; border-radius: 5px; color: #0037ff;padding: 1px 3px;}
.other p{text-indent: 30px;padding: 0px;}
.own{text-align: right;}
.tips{text-align: center;font-size: 12px; color: #e80000;}
.drift{position: absolute;bottom: 10px; right: 10px; }
#send{background: #009e3f;border: 1px solid #009020;font-size: 14px;padding: 3px 10px;color: #fff;border-radius: 5px;cursor: pointer;}
#send:hover{background: #008234;border: 1px solid #005613;}
#open{background: #009e97;border: 1px solid #007974;font-size: 14px;padding: 3px 10px;color: #fff;border-radius: 5px;cursor: pointer;}
#open:hover{background: #008a84;border: 1px solid #00736e;}
#close{background: #ef0000;border: 1px solid #c30000;font-size: 14px;padding: 3px 10px;color: #fff;border-radius: 5px;cursor: pointer;}
#close:hover{background: #c50000;border: 1px solid #a00000;}
input{padding: 4px;}
.log{width: 326px;height: calc(100% - 40px);border: 1px solid #ccc;float: right;border-left: 0;position: absolute;right: 0;overflow: hidden;overflow-y: auto;}
.log div{width: calc(100% - 20px);padding:10px 10px 0 10px;}
style>
head>
<body>

<div class="user_list">
<ul> ul>
div>

<div class="main"> div>

<div class="content">
<textarea id="content"> textarea>
<div class="drift">
<input id="host" type="text" placeholder="WS地址" style="width: 700px;">
<input id="user_id" type="text" placeholder="输入user_id">
<input id="username" type="text" placeholder="输入用户名">
<button id="open">连接 button>
<button id="close">断开 button>
<button id="send">发送 button>
div>
div>

<div class="log"> div>
body>
html>

注意:最上面有一个​ ​tim.js​ ​文件需要你自行创建,后续的教程都只对该文件进行变更说明而已。

2、服务端鉴权并记录用户信息

####A、创建内存表 服务端内部使用内存表来缓存用户信息,以减少推送交互时对Mysql的查询压力。 修改​ ​/config/swoole_table.php​ ​,改成以下代码:

return [
     [
        'table' => 'user',// 用户信息表
        'length' => 100000,// 表最大行数上限
        'field' => [ // 字段信息
            'fd' => [
                'type' => \Swoole\Table::TYPE_INT, // swoole的标识符
                'size' => 13, // 字段长度限制
            ],
            'user_id' => [
                'type' => \Swoole\Table::TYPE_STRING, // 客户端ID
                'size' => 64,
            ],
            'username' => [
                'type' => \Swoole\Table::TYPE_STRING, // 用户名
                'size' => 64,
            ],
            'heart_num' => [
                'type' => \Swoole\Table::TYPE_INT, // 心跳次数
                'size' => 1, // 字段长度限制
            ],
        ]
    ],
    [
        'table' => 'fd',// fd标识符反查表
        'length' => 100000,
        'field' => [
            'user_id' => [
                'type' => \Swoole\Table::TYPE_STRING,
                'size' => 64,
            ],
        ]
    ]
];

B、连接时鉴权

通过客户端在ws时的地址携带GET参数,可以进行​ ​open​ ​握手阶段的权限控制,防止而已连接,同时还可以记录[更新]客户端的连接信息,修改​ ​/box/event/server/onOpen.php​ ​代码:

namespace box\event\server;
// 引入内存表组件
use x\swoole\Table;
// 引入websocket控制器基类
use x\controller\WebSocket;

class onOpen
{
    /**
     * 启动实例
    */
    public $server;

    /**
     * 统一回调入口
     * @author 小黄牛
     * @version v1.0.1 + 2020.05.26
     * @param Swoole\WebSocket\Server $server
     * @param Swoole\Http\Request $request HTTP请求对象
    */
    public function run($server, $request) {
        $this->server = $server;
        
        // 实例化客户端
        $this->websocket = new WebSocket();
        
        // 获取参数
        $param = $request->get;
        
        // 参数过滤
        $res = $this->_param($param, $request->fd);
        if (!$res) return false;
        
        // 参数鉴权
        $res = $this->_sign_check($param, $request->fd, $request);
        if (!$res) return false;
        
        // 将客户信息记录进table内存表
        // 用户信息表
        Table::table('user')->name($param['user_id'])->upsert([
            'fd' => $request->fd,
            'user_id' => $param['user_id'],
            'username' => $param['username'],
        ]);
        // 标识符反查user_id表
        Table::table('fd')->name($request->fd)->upsert([
            'user_id' => $param['user_id'],
        ]);
        // 广播上线消息
        $table = Table::table('user')->all();
        foreach ($table as $key=>$info) {
            $data = ['user_id'=>$param['user_id'], 'username'=>$param['username'], 'status' => 1];
            $this->websocket->fetch(10001, $param['username'].' 骑着小黄牛 上线啦~', $data, $info['fd']);
        }
        
        return true;
    }
    
    /**
     * 参数过滤
     * @author 小黄牛
    */
    public function _param($param, $fd) {
        if (empty($param['user_id'])) {
            $this->websocket->fetch(40001, '缺少user_id');
            $this->server->close($fd); 
            return false;
        }
        if (empty($param['username'])) {
            $this->websocket->fetch(40001, '缺少username');
            $this->server->close($fd); 
            return false;
        }
        if (empty($param['sign'])) {
            $this->websocket->fetch(40001, '缺少sign');
            $this->server->close($fd); 
            return false;
        }
        if (empty($param['time'])) {
            $this->websocket->fetch(40001, '缺少time');
            $this->server->close($fd); 
            return false;
        }
        return true;
    }
    
    /**
     * 参数鉴权
     * @author 小黄牛
    */
    public function _sign_check($param, $fd, $request) {
        // 过期
        $now_time = time();
        $max_time = $now_time + 3600;
        $min_time = $now_time - 3600;
        // 时间戳请求前后60分钟内有效,防止客户端和服务器时间误差
        if ($param['time'] < $min_time || $param['time'] > $max_time ){
            $this->websocket->fetch(40002, 'time已过期');
            $this->server->close($fd); 
            return false;
        }

        // 域名来源判断
        // 使用 $request->header['origin'] 获取来源域名
        // 如果有需要的同学可以自己参考上面的判断写下

        // 签名验证
        // 生产环境不应该这么简单,自己思考API的鉴权逻辑即可
        $sign = md5($param['user_id'].$param['time']);
        if ($sign != $param['sign']) {
            $this->websocket->fetch(40002, 'sign错误,应该是md5(user_id + time):');
            $this->server->close($fd); 
            return false;
        }

        return true;
    }
}

3、下线广播

通过内存表的支持,我们可以在​ ​/box/event/onClose.php​ ​阶段对客户端进行下线广播:

namespace box\event\server;
// 引入内存表组件
use x\swoole\Table;
// 引入websocket控制器基类
use x\controller\WebSocket;

class onClose
{
    /**
     * 启动实例
    */
    public $server;

    /**
     * 统一回调入口
     * @author 小黄牛
     * @version v1.0.1 + 2020.05.26
     * @param Swoole\Server $server
     * @param int $fd 连接的文件描述符
     * @param int $reactorId 来自那个 reactor 线程,主动 close 关闭时为负数
    */
    public function run($server, $fd, $reactorId) {
        $this->server = $server;
        
        // 实例化客户端
        $this->websocket = new WebSocket();
        
        // 通过fd反查信息
        $user = Table::table('fd')->name($fd)->find();
        $user_info = Table::table('user')->name($user['user_id'])->find();
        
        // 广播下线消息
        $table = Table::table('user')->all();
        foreach ($table as $key=>$info) {
            $data = ['user_id'=>$user_info['user_id'], 'username'=>$user_info['username'], 'status' => 2];
            // 这样需要注意 close比较特殊,如果需要广播,最后一个参数要传入server实例才行
            $this->websocket->fetch(10001, $user_info['username'].' 骑着扫帚 灰溜溜的走了~', $data, $info['fd'], $this->server);
        }
        
        return true;
    }
}

4、客户端消息处理

本案例客户端只使用到2个路由,分别是处理普通消息的群发通知,还有心跳检测的次数重置。

A、普通消息群发通知

控制器:​ ​/app/websocket/user/broadcast.php​ ​:

// 普通广播
namespace app\websocket\user;
use x\controller\WebSocket;
// 引入内存表组件
use x\swoole\Table;

class broadcast extends WebSocket {
    
    public function index() {
        // 接收请求参数
        $param = $this->param();
        
        // 获取当前客户端标识符
        $fd = $this->get_current_fd();
        
        // 广播消息
        $table = Table::table('user')->all();
        foreach ($table as $key=>$info) {
            // 不推给自己
            if ($info['fd'] != $fd) {
                $this->fetch(10002, $param['content'], ['username' => $info['username']], $info['fd']);
            }
        }
        return true;
    }
}

B、心跳次数重置

控制器:​ ​/app/websocket/user/heart.php​ ​:

// 心跳重置
namespace app\websocket\user;
use x\controller\WebSocket;
// 引入内存表组件
use x\swoole\Table;

class heart extends WebSocket {

    public function index() {
        // 获取当前客户端标识符
        $fd = $this->get_current_fd();
        // 通过fd反查信息
        $user = Table::table('fd')->name($fd)->find();
        $user_info = Table::table('user')->name($user['user_id'])->find();
        $user_info['heart_num'] = 0;
        // 重置心跳次数
        Table::table('user')->name($user['user_id'])->upsert($user_info);
        return $this->fetch(10003, '心跳完成');
    }
}

5、基于定时器检测心跳超时的客户端

先创建一个定时器:​ ​/box/crontab/heartHandle.php​ ​:

// 心跳检测处理
namespace box\crontab;
use x\Crontab;
// 引入内存表组件
use x\swoole\Table;
// 客户端实例
use x\controller\WebSocket;

class heartHandle extends Crontab{

    /**
     * 统一入口
     * @author 小黄牛
     * @version v2.5.0 + 2021.07.20
    */
    public function run() {
        // 获得server实例
        $server = $this->get_server();
        // 获得客户端实例
        $websocket = new WebSocket();
        
        $table = Table::table('user')->all();
        foreach ($table as $key=>$info) {
            // 检测心跳连续失败次数大于5次的记录进行广播下线
            if ($info['heart_num'] > 5) {
                $data = ['user_id'=>$info['user_id'], 'username'=>$info['username'], 'status' => 2];
                // 这样需要注意 close比较特殊,如果需要广播,最后一个参数要传入server实例才行
                $websocket->fetch(10001, $user_info['username'].' 骑着扫帚 灰溜溜的走了~', $data, $info['fd'], $server);
                // 关闭它的连接
                $server->close($info['fd']);
            } else {
                // 失败次数+1
                Table::table('user')->name($info['user_id'])->setDec('heart_num', 1);
            }
        }
    }
}

然后注册定时器,为​ ​5秒​ ​执行一次,修改​ ​/config/crontab.php​ ​为以下代码:

return [
    [
        'rule' => 5000,
        'use' => '\box\crontab\heartHandle',
        'status' => true,
    ]
];

6、编写tim.js客户端代码

$(function(){
var lockReconnect = false; // 正常情况下我们是关闭心跳重连的
var wsServer; // 连接地址
var websocket; // ws实例
var time; // 心跳检测定时器指针
var user_id; // 用户ID
var username; // 用户昵称
$('#user_id').val(random(100000, 999999));
$('#username').val(getRandomName(3));

// 点击连接
$('#open').click(function(){createWebSocket();})
// 点击断开
$('#close').click(function(){addLog('主动断开连接');websocket.close();})
// 发送消息
$('#send').click(function(){
var content = $('#content').val();
if (content == '' || content == null) {
alert('请先输入内容');
return false;
}
// 自己
$('.main').append('
' + content + ' :说'+getDate()+' 自己
'
);
// 广播消息
send('user/broadcast', {
'content':content
})
$('#content').val('');
saveScroll('.main')
})
// 发送数据到服务端
function send(action, data) {
// 补充用户信息
data.user_id = $('#user_id').val()
data.username = $('#username').val()
// 组装SW-X的固定格式
var body = {
'action' : action,
'data' : data,
}
body = JSON.stringify(body);
websocket.send(body);
addLog('发送数据:'+body);
}

// 记录log
function addLog(msg) {$('.log').append('
' + msg + '
'
);saveScroll('.log')}

// 启动websocket
function createWebSocket() {
var time = Date.now() / 1000;
var host = $('#host').val();
user_id = $('#user_id').val();
username = $('#username').val();
if (host == '' || host == null) {
alert('请先输入host地址');
return false;
}
if (user_id == '' || user_id == null) {
alert('请先输入user_id');
return false;
}
if (username == '' || username == null) {
alert('请先输入用户名');
return false;
}
wsServer = host+'?user_id='+user_id+'&username='+username+'&time='+time+'&sign='+$.md5(user_id+time);
try {
websocket = new WebSocket(wsServer);
init();
} catch(e) {
reconnect();
}
}
// 初始化WebSocket
function init() {
// 接收Socket断开时的消息通知
websocket.onclose = function(evt) {
addLog('Socket断开了...正在试图重新连接...');
reconnect();
};
// 接收Socket连接失败时的异常通知
websocket.onerror = function(e){
addLog('Socket发生异常...正在试图重新连接...');
reconnect();
};
// 连接成功
websocket.onopen = function (evt) {
addLog('连接成功');
// 心跳检测重置
heartCheck.start();
};
// 接收服务端广播的消息通知
websocket.onmessage = function(evt){
var data = evt.data;
addLog('接收到服务端消息:'+data);
var obj = JSON.parse(data);

// 消息处理
switch (obj.action) {
// 上下线
case 10001:
var body = obj.data;
$('.main').append('
' + obj. msg + '
'
);

// 登录
if ($('#userid_'+body.user_id).html() == undefined) {
$('.user_list ul').append('
  • body.user_id+'">'+body.username+'在线
  • '
    );
    } else {
    // 重登
    if (body.status == 1) {
    $('#userid_'+body.user_id+' code').removeClass('offline');
    $('#userid_'+body.user_id+' code').addClass('online');
    $('#userid_'+body.user_id+' code').html('在线');
    // 下线
    } else {
    $('#userid_'+body.user_id+' code').removeClass('online');
    $('#userid_'+body.user_id+' code').addClass('offline');
    $('#userid_'+body.user_id+' code').html('离线');
    }
    }
    saveScroll('.main')
    break;
    // 收到普通消息
    case 10002:
    var body = obj.data;
    // 对方
    $('.main').append('
    '+body.username+' ' + getDate() + ' 说:' + obj. msg + '
    '
    );
    saveScroll('.main')
    break;
    // 回复了一次心跳
    case 10003:
    // 心跳检测重置
    heartCheck.start();
    break;
    default:
    break;
    }
    };
    }
    // 掉线重连
    function reconnect() {
    if(lockReconnect) {
    return;
    };
    lockReconnect = true;
    // 没连接上会一直重连,设置心跳延迟避免请求过多
    time && clearTimeout(time);
    time = setTimeout(function () {
    createWebSocket();
    lockReconnect = false;
    }, 5000);
    }
    // 心跳检测
    var heartCheck = {
    timeout: 5000,
    timeoutObj: null,
    serverTimeoutObj: null,
    start: function() {
    var self = this;
    this.timeoutObj && clearTimeout(this.timeoutObj);
    this.serverTimeoutObj && clearTimeout(this.serverTimeoutObj);
    this.timeoutObj = setTimeout(function(){
    // 这里需要发送一个心跳包给服务端
    send('user/heart', {})
    }, this.timeout)
    }
    }

    // 生成ID
    function random(min, max) {
    return Math.floor(Math.random() * (max - min)) + min;
    }
    // 解码
    function decodeUnicode(str) {
    //Unicode显示方式是\u4e00
    str = "\\u"+str
    str = str.replace(/\\/g, "%");
    //转换中文
    str = unescape(str);
    //将其他受影响的转换回原来
    str = str.replace(/%/g, "\\");
    return str;
    }
    // 生成中文名
    function getRandomName(NameLength){
    let name = ""
    for(let i = 0;i<NameLength;i++){
    let unicodeNum = ""
    unicodeNum = random(0x4e00,0x9fa5).toString(16)
    name += decodeUnicode(unicodeNum)
    }
    return name
    }
    // 获得当前日期
    function getDate() {
    var oDate = new Date();
    return oDate.getHours()+':'+oDate.getMinutes()+':'+oDate.getSeconds();
    }
    // 滚动到底部
    function saveScroll(id) {
    $(id).scrollTop( $(id)[0].scrollHeight );
    }
    })

    7、案例源码下载

    如果不想自己一步步组装的,可以直接本次下载源码查看:​ ​SW-X WebSocket案例源码​