网站首页 » Thinkphp » Thinkphp 使用swoole 实现异步任务

Thinkphp 使用swoole 实现异步任务

July 8, 2020 Thinkphp

在使用thinkcmf 做项目的时候,有一些比较耗时的工作, 但是不想阻碍程序的进行,所以想到使用swoole 的异步任务。

场景:
我在添加了一个用户之后,需要通过接口将用户添加到有赞商城的销售员。 添加用户完成后将添加到有赞商城业务员的操作放到异步的任务中去执行, 这样就不影响我添加用户的操作; 如果不这么做的话,还需要等待通过接口将用户数据推送到有赞商城。 如果本地添加成功,但是添加到有赞商城添加失败,如果不回滚数据则两边对不上。 所以需要有一个异步任务去执行添加用户到有赞商城的销售员。

借助了以下东西:

  1. thinkphp框架的钩子
  2. swoole

设置钩子

在项目中添加钩子文件 app/tags.php

<?php
/**
 * 钩子定义
 * 钩子定义配置文件,在此框架内,只能放在app 下, 放在模块下不起作用
 * 钩子的执行为同步执行,会对程序造成阻塞。
 */
return [

    // '钩子名称' => ['行为类']
    //添加销售员到有赞
    'add_saleman_to_youzan' => [
        'app\\common\\behavior\\YzTask',
    ],

YzTask.php 为钩子的执行类

<?php


namespace app\common\behavior;

//定义了一个行为 YzTask

use Swoole\Coroutine;

class YzTask
{
    //swoole tcp server 监听的ip
    protected $host ;
    //swoole tcp server 监听的端口
    protected $port;

    public function __construct()
    {
        //获取swoole tcp 的配置
        $this->host = config('swoole_server.host');
        $this->port = config('swoole_server.port');
    }

    /**
     * @param string $string
     * $param = [
     *  'type' => xx  //类型
     *  'data' => xx  //数据
     * ];
     */
    public function run(string $string){
        //在以下代表之后都转为协程
        \Swoole\Runtime::enableCoroutine();
        //客户端用来访问swoole 创建的tcp服务器
        $client = new \Swoole\Client(SWOOLE_SOCK_TCP | SWOOLE_KEEP);
        $ret = $client->connect($this->host, $this->port);

        if ($ret) {
            $return = $client->send($string);
            if($return ) $client->close();
        } else {
            echo "断开了";
            $client->close();
        }

    }
}

在控制器中,添加成功用户后调用钩子, 会直接执行tags.php 中钩子执行的行为类 app\common\behavior\YzTask 中 run()方法, 详情参考https://www.kancloud.cn/manual/thinkphp5_1/354129
2020-07-08T09:46:44.png

swoole 引入

根据thinkcmf 官方文档
https://www.thinkcmf.com/doc5_1.html

安装完成swoole 扩展后

创建文件 data\config\swoole_server.php; 我们要使用swoole创建一个tcp的服务器

<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2018 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: liu21st <liu21st@gmail.com>
// +----------------------------------------------------------------------

use think\facade\Env;

// +----------------------------------------------------------------------
// | Swoole设置 php think swoole:server 命令行下有效
// +----------------------------------------------------------------------
return [
    // 扩展自身配置
    'host'         => '127.0.0.1', // 监听地址
    'port'         => 9502, // 监听端口
    'type'         => 'tcp', // 服务类型 支持 socket http server
    'mode'         => '', // 运行模式 默认为SWOOLE_PROCESS
    'sock_type'    => '', // sock type 默认为SWOOLE_SOCK_TCP
    'swoole_class' => '', // 自定义服务类名称
    'task_worker_num' => 8,

    // 可以支持swoole的所有配置参数
    'daemonize'    => false,
    'pid_file'     => Env::get('runtime_path') . 'swoole_server.pid',
    'log_file'     => Env::get('runtime_path') . 'swoole_server.log',

    // 事件回调定义
    'onConnect' => function ($server, $fd){
        echo " client : connect\n";
    },
    'onOpen'       => function ($server, $request) {
        echo "server: handshake success with fd{$request->fd}\n";
    },

    'onMessage' => function ($server, $frame) {
        echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
        $server->push($frame->fd, "this is server");
    },

    'onRequest' => function ($request, $response) {
        $response->end("<h1>Hello Swoole. #" . rand(1000, 9999) . "</h1>");
    },

    'onReceive' => function ($server, $fd, $from_id, $data){
        //worker进程
        //worker进程中向task_worker 进程投递新的任务
        $task_id = $server->task($data);
    },
    'onTask' => function ($server, $task_id, $from_id, $data){
        //task_worker 进程

        $task = new \app\common\service\SwooleTask();
        $task->do($data);
        //告诉worker 进程,task_worker进程 执行完成了。task_worker 进程将执行的结果 通过 finish() 通知给worker 进程
        $server->finish("ok");
    },

    'onClose' => function (Swoole\Server $ser, int $fd, int $reactorId) {
        //worker 进程
        echo "client {$fd} closed\n";
        $ser->close($fd);
    },
    'onFinish' => function ($server, $task_id,  $data){
        //worker 进程
        //服务器主动关闭TCP连接
        //$server->close($task_id);       //主动关闭连接
        //\think\facade\Log::record('user login task finish', 'info');
    }


];

在onTask中,我们实例化了 \app\common\service\SwooleTask() 然后执行了$task->do($data);方法,用来执行耗时的操作。

如何去运行 swoole 创建的tcp服务器?

我们查看刚才安装的swoole sdk, 文件在 vendor\thinkcmfcmf-swoole\src\command\server.php ; command 文件夹内是 cli 模式下执行的命令。
2020-07-08T09:53:57.png

最终

首先启动tcp 服务器

php think swoole:server 

然后,在要执行访问有赞云接口的方法中,sleep(50) ,休息50秒,模拟一下。


<?php


namespace app\common\service;


use think\Db;

class SwooleTask
{
    //执行异步任务
    public function do(string $data){
        $start = time();
        sleep(50);
        $end = time();
        Db::name('swoole_log')->insert(['content' => $data . ";start:" .$start . ';end:' . $end, 'create_time' => time()]);
        return true;
    }
}

当后台添加了用户后,直接返回了成功,投递了任务给tcp服务器,然后等待了50秒后, 再执行一个插入数据表的操作(模拟访问有赞云的接口,并等待了很长时间)。 果然在表中插入了数据。 到此验证完毕。

添加新评论