Skip to content

队列

介绍

lightbulb

Laravel 现在提供了 Horizon,一个用于 Redis 驱动队列的美丽仪表板和配置系统。查看完整的 Horizon 文档 以获取更多信息。

Laravel 队列为各种不同的队列后端提供了统一的 API,例如 Beanstalk、Amazon SQS、Redis,甚至是关系数据库。队列允许您推迟处理耗时的任务,例如发送电子邮件,直到稍后的时间。推迟这些耗时的任务可以大大加快应用程序的 Web 请求速度。

队列配置文件存储在 config/queue.php 中。在此文件中,您将找到框架中包含的每个队列驱动程序的连接配置,其中包括数据库、BeanstalkdAmazon SQSRedis 和一个同步驱动程序,该驱动程序将立即执行作业(用于本地使用)。还包括一个 null 队列驱动程序,该驱动程序会丢弃排队的作业。

连接与队列

在开始使用 Laravel 队列之前,了解“连接”和“队列”之间的区别很重要。在您的 config/queue.php 配置文件中,有一个 connections 配置选项。此选项定义了与后端服务(如 Amazon SQS、Beanstalk 或 Redis)的特定连接。然而,任何给定的队列连接可能有多个“队列”,可以被视为不同的作业堆栈或堆。

请注意,queue 配置文件中的每个连接配置示例都包含一个 queue 属性。这是将作业发送到给定连接时将被调度到的默认队列。换句话说,如果您在调度作业时没有明确定义应将其调度到哪个队列,则作业将被放置在连接配置的 queue 属性中定义的队列中:

php
// 这个作业被发送到默认队列...
Job::dispatch();

// 这个作业被发送到 "emails" 队列...
Job::dispatch()->onQueue('emails');

某些应用程序可能不需要将作业推送到多个队列,而是更喜欢拥有一个简单的队列。然而,将作业推送到多个队列对于希望优先处理或分段处理作业的应用程序特别有用,因为 Laravel 队列工作者允许您按优先级指定应处理哪些队列。例如,如果您将作业推送到 high 队列,您可以运行一个工作者,给予它们更高的处理优先级:

php
php artisan queue:work --queue=high,default

驱动程序说明和先决条件

数据库

为了使用 database 队列驱动程序,您需要一个数据库表来保存作业。要生成创建此表的迁移,请运行 queue:table Artisan 命令。创建迁移后,您可以使用 migrate 命令迁移数据库:

php
php artisan queue:table

php artisan migrate

Redis

为了使用 redis 队列驱动程序,您应该在 config/database.php 配置文件中配置一个 Redis 数据库连接。

Redis 集群

如果您的 Redis 队列连接使用 Redis 集群,则您的队列名称必须包含一个 key hash tag。这是为了确保给定队列的所有 Redis 键都放置在同一个哈希槽中:

php
'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => '{default}',
    'retry_after' => 90,
],

阻塞

使用 Redis 队列时,您可以使用 block_for 配置选项来指定驱动程序在进入工作者循环并重新轮询 Redis 数据库之前应等待作业可用的时间。

根据您的队列负载调整此值比持续轮询 Redis 数据库以获取新作业更有效。例如,您可以将值设置为 5,以指示驱动程序在等待作业可用时应阻塞五秒钟:

php
'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => 'default',
    'retry_after' => 90,
    'block_for' => 5,
],
exclamation

阻塞 pop 是一个实验性功能。如果 Redis 服务器或工作者在检索作业时崩溃,则有可能丢失排队的作业。

其他驱动程序先决条件

以下依赖项是列出的队列驱动程序所需的:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~3.0
  • Redis: predis/predis ~1.0

创建作业

生成作业类

默认情况下,应用程序的所有可排队作业都存储在 app/Jobs 目录中。如果 app/Jobs 目录不存在,当您运行 make:job Artisan 命令时将创建它。您可以使用 Artisan CLI 生成一个新的排队作业:

php
php artisan make:job ProcessPodcast

生成的类将实现 Illuminate\Contracts\Queue\ShouldQueue 接口,指示 Laravel 作业应被推送到队列以异步运行。

类结构

作业类非常简单,通常只包含一个 handle 方法,该方法在作业被队列处理时调用。首先,让我们看一个示例作业类。在此示例中,我们假装管理一个播客发布服务,并需要在发布之前处理上传的播客文件:

php
<?php

namespace App\Jobs;

use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * 创建一个新的作业实例。
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * 执行作业。
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // 处理上传的播客...
    }
}

在此示例中,请注意我们能够将 Eloquent 模型 直接传递到排队作业的构造函数中。由于作业使用的 SerializesModels trait,Eloquent 模型将在作业处理时被优雅地序列化和反序列化。如果您的排队作业在其构造函数中接受 Eloquent 模型,则只有模型的标识符会被序列化到队列中。当作业实际处理时,队列系统将自动从数据库中重新检索完整的模型实例。这对您的应用程序来说是完全透明的,并防止了序列化完整 Eloquent 模型实例时可能出现的问题。

handle 方法在作业被队列处理时调用。请注意,我们能够在作业的 handle 方法上进行类型提示依赖项。Laravel 服务容器 会自动注入这些依赖项。

如果您希望完全控制容器如何将依赖项注入到 handle 方法中,可以使用容器的 bindMethod 方法。bindMethod 方法接受一个回调,该回调接收作业和容器。在回调中,您可以随意调用 handle 方法。通常,您应该从 服务提供者 中调用此方法:

php
use App\Jobs\ProcessPodcast;

$this->app->bindMethod(ProcessPodcast::class.'@handle', function ($job, $app) {
    return $job->handle($app->make(AudioProcessor::class));
});
exclamation

二进制数据,例如原始图像内容,应在传递给排队作业之前通过 base64_encode 函数传递。否则,作业在放置到队列时可能无法正确序列化为 JSON。

调度作业

编写作业类后,您可以使用作业本身的 dispatch 方法调度它。传递给 dispatch 方法的参数将传递给作业的构造函数:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        ProcessPodcast::dispatch($podcast);
    }
}

延迟调度

如果您希望延迟执行排队作业,可以在调度作业时使用 delay 方法。例如,让我们指定一个作业在调度后 10 分钟内不可用进行处理:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        ProcessPodcast::dispatch($podcast)
                ->delay(now()->addMinutes(10));
    }
}
exclamation

Amazon SQS 队列服务的最大延迟时间为 15 分钟。

同步调度

如果您希望立即(同步)调度作业,可以使用 dispatchNow 方法。使用此方法时,作业不会被排队,而是在当前进程中立即运行:

php
<?php

namespace App\Http\Controllers;

use Illuminate\Http\Request;
use App\Jobs\ProcessPodcast;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        ProcessPodcast::dispatchNow($podcast);
    }
}

作业链

作业链允许您指定一系列应按顺序运行的排队作业。如果序列中的一个作业失败,其余作业将不会运行。要执行排队作业链,您可以在任何可调度作业上使用 withChain 方法:

php
ProcessPodcast::withChain([
    new OptimizePodcast,
    new ReleasePodcast
])->dispatch();
exclamation

使用 $this->delete() 方法删除作业不会阻止链式作业被处理。链只会在链中的作业失败时停止执行。

链连接和队列

如果您希望为链式作业指定默认连接和队列,可以使用 allOnConnectionallOnQueue 方法。这些方法指定应使用的队列连接和队列名称,除非排队作业被显式分配了不同的连接/队列:

php
ProcessPodcast::withChain([
    new OptimizePodcast,
    new ReleasePodcast
])->dispatch()->allOnConnection('redis')->allOnQueue('podcasts');

自定义队列和连接

调度到特定队列

通过将作业推送到不同的队列,您可以“分类”排队作业,甚至可以优先考虑为各种队列分配多少工作者。请记住,这不会将作业推送到队列配置文件中定义的不同队列“连接”,而仅推送到单个连接中的特定队列。要指定队列,请在调度作业时使用 onQueue 方法:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        ProcessPodcast::dispatch($podcast)->onQueue('processing');
    }
}

调度到特定连接

如果您正在处理多个队列连接,可以指定将作业推送到哪个连接。要指定连接,请在调度作业时使用 onConnection 方法:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        ProcessPodcast::dispatch($podcast)->onConnection('sqs');
    }
}

您可以链式调用 onConnectiononQueue 方法,以指定作业的连接和队列:

php
ProcessPodcast::dispatch($podcast)
              ->onConnection('sqs')
              ->onQueue('processing');

指定最大作业尝试次数/超时值

最大尝试次数

指定作业可以尝试的最大次数的一种方法是通过 Artisan 命令行上的 --tries 开关:

php
php artisan queue:work --tries=3

但是,您可以通过在作业类本身上定义最大尝试次数来采取更细粒度的方法。如果在作业上指定了最大尝试次数,它将优先于命令行上提供的值:

php
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 作业可以尝试的次数。
     *
     * @var int
     */
    public $tries = 5;
}

基于时间的尝试

作为定义作业可以尝试的最大次数的替代方法,您可以定义作业应超时的时间。这允许作业在给定时间范围内尝试任意次数。要定义作业应超时的时间,请在作业类中添加 retryUntil 方法:

php
/**
 * 确定作业应超时的时间。
 *
 * @return \DateTime
 */
public function retryUntil()
{
    return now()->addSeconds(5);
}
lightbulb

您还可以在排队的事件监听器上定义 retryUntil 方法。

超时

exclamation

timeout 功能针对 PHP 7.1+ 和 pcntl PHP 扩展进行了优化。

同样,作业可以运行的最大秒数可以使用 Artisan 命令行上的 --timeout 开关指定:

php
php artisan queue:work --timeout=30

但是,您还可以在作业类本身上定义作业应允许运行的最大秒数。如果在作业上指定了超时,它将优先于命令行上指定的任何超时:

php
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 作业在超时前可以运行的秒数。
     *
     * @var int
     */
    public $timeout = 120;
}

速率限制

exclamation

此功能要求您的应用程序可以与 Redis 服务器 交互。

如果您的应用程序与 Redis 交互,您可以按时间或并发限制排队作业的速率。当您的排队作业与同样有限速率的 API 交互时,此功能可以提供帮助。

例如,使用 throttle 方法,您可以限制给定类型的作业每 60 秒仅运行 10 次。如果无法获得锁,您通常应该将作业释放回队列,以便稍后重试:

php
Redis::throttle('key')->allow(10)->every(60)->then(function () {
    // 作业逻辑...
}, function () {
    // 无法获得锁...

    return $this->release(10);
});
lightbulb

在上面的示例中,key 可以是任何唯一标识您希望限制速率的作业类型的字符串。例如,您可能希望根据作业的类名和它操作的 Eloquent 模型的 ID 构建键。

exclamation

将限速作业释放回队列仍会增加作业的总 attempts 数。

或者,您可以指定可以同时处理给定作业的最大工作者数量。当排队作业正在修改一个应该只由一个作业一次修改的资源时,这可能会有所帮助。例如,使用 funnel 方法,您可以限制给定类型的作业仅由一个工作者同时处理:

php
Redis::funnel('key')->limit(1)->then(function () {
    // 作业逻辑...
}, function () {
    // 无法获得锁...

    return $this->release(10);
});
lightbulb

使用速率限制时,作业成功运行所需的尝试次数可能很难确定。因此,将速率限制与 基于时间的尝试 结合使用是有用的。

错误处理

如果在处理作业时抛出异常,作业将自动释放回队列,以便再次尝试。作业将继续被释放,直到达到应用程序允许的最大尝试次数。最大尝试次数由 queue:work Artisan 命令上使用的 --tries 开关定义。或者,可以在作业类本身上定义最大尝试次数。有关运行队列工作者的更多信息 可以在下面找到

队列闭包

除了将作业类调度到队列之外,您还可以调度闭包。这对于需要在当前请求周期之外执行的快速简单任务非常有用:

php
$podcast = App\Podcast::find(1);

dispatch(function () use ($podcast) {
    $podcast->publish();
});

将闭包调度到队列时,闭包的代码内容会被加密签名,以便在传输过程中无法修改。

运行队列工作者

Laravel 包含一个队列工作者,将在作业被推送到队列时处理新作业。您可以使用 queue:work Artisan 命令运行工作者。请注意,一旦 queue:work 命令启动,它将继续运行,直到手动停止或关闭终端:

php
php artisan queue:work
lightbulb

要使 queue:work 进程永久在后台运行,您应该使用进程监视器,例如 Supervisor,以确保队列工作者不会停止运行。

请记住,队列工作者是长时间运行的进程,并将已启动的应用程序状态存储在内存中。因此,它们在启动后不会注意到代码库中的更改。因此,在部署过程中,请确保 重新启动队列工作者

指定连接和队列

您还可以指定工作者应使用哪个队列连接。传递给 work 命令的连接名称应对应于 config/queue.php 配置文件中定义的连接之一:

php
php artisan queue:work redis

您可以通过仅处理给定连接的特定队列来进一步自定义队列工作者。例如,如果您的所有电子邮件都在 redis 队列连接的 emails 队列中处理,您可以发出以下命令以启动仅处理该队列的工作者:

php
php artisan queue:work redis --queue=emails

处理单个作业

--once 选项可用于指示工作者仅处理队列中的单个作业:

php
php artisan queue:work --once

处理所有排队作业然后退出

--stop-when-empty 选项可用于指示工作者处理所有作业然后优雅地退出。当在 Docker 容器中使用 Laravel 队列时,如果希望在队列为空后关闭容器,此选项非常有用:

php
php artisan queue:work --stop-when-empty

资源考虑

守护进程队列工作者在处理每个作业之前不会“重启”框架。因此,您应该在每个作业完成后释放任何重资源。例如,如果您使用 GD 库进行图像处理,完成后应使用 imagedestroy 释放内存。

队列优先级

有时您可能希望优先处理队列。例如,在 config/queue.php 中,您可以将 redis 连接的默认 queue 设置为 low。但是,有时您可能希望将作业推送到 high 优先级队列,如下所示:

php
dispatch((new Job)->onQueue('high'));

要启动一个工作者,确保所有 high 队列作业在继续处理 low 队列中的任何作业之前被处理,请将逗号分隔的队列名称列表传递给 work 命令:

php
php artisan queue:work --queue=high,low

队列工作者与部署

由于队列工作者是长时间运行的进程,因此在不重新启动的情况下不会拾取代码更改。因此,使用队列工作者部署应用程序的最简单方法是在部署过程中重新启动工作者。您可以通过发出 queue:restart 命令优雅地重新启动所有工作者:

php
php artisan queue:restart

此命令将指示所有队列工作者在完成当前作业后优雅地“死亡”,以确保没有现有作业丢失。由于在执行 queue:restart 命令时队列工作者将死亡,您应该运行一个进程管理器,例如 Supervisor,以自动重新启动队列工作者。

lightbulb

队列使用 缓存 存储重启信号,因此在使用此功能之前,您应验证应用程序已正确配置缓存驱动程序。

作业过期和超时

作业过期

config/queue.php 配置文件中,每个队列连接定义了一个 retry_after 选项。此选项指定队列连接在重试正在处理的作业之前应等待的秒数。例如,如果 retry_after 的值设置为 90,则如果作业在 90 秒内未被删除,它将被释放回队列。通常,您应该将 retry_after 值设置为作业合理完成处理所需的最大秒数。

exclamation

唯一不包含 retry_after 值的队列连接是 Amazon SQS。SQS 将根据 AWS 控制台中管理的 默认可见性超时 重试作业。

工作者超时

queue:work Artisan 命令公开了一个 --timeout 选项。--timeout 选项指定 Laravel 队列主进程在终止处理作业的子队列工作者之前将等待的时间。有时,子队列进程可能由于各种原因而“冻结”,例如外部 HTTP 调用未响应。--timeout 选项会删除超过指定时间限制的冻结进程:

php
php artisan queue:work --timeout=60

retry_after 配置选项和 --timeout CLI 选项不同,但协同工作以确保作业不会丢失,并且作业仅成功处理一次。

exclamation

--timeout 值应始终比 retry_after 配置值短几秒钟。这将确保在重试作业之前,处理给定作业的工作者始终被终止。如果 --timeout 选项长于 retry_after 配置值,您的作业可能会被处理两次。

工作者休眠时间

当队列上有作业可用时,工作者将继续处理作业而不延迟。然而,sleep 选项决定了如果没有新作业可用,工作者将“休眠”的时间(以秒为单位)。在休眠期间,工作者不会处理任何新作业 - 作业将在工作者再次醒来后处理。

php
php artisan queue:work --sleep=3

Supervisor 配置

安装 Supervisor

Supervisor 是 Linux 操作系统的进程监视器,如果 queue:work 进程失败,将自动重新启动它。要在 Ubuntu 上安装 Supervisor,您可以使用以下命令:

bash
sudo apt-get install supervisor
lightbulb

如果自己配置 Supervisor 听起来很复杂,请考虑使用 Laravel Forge,它将自动为您的 Laravel 项目安装和配置 Supervisor。

配置 Supervisor

Supervisor 配置文件通常存储在 /etc/supervisor/conf.d 目录中。在此目录中,您可以创建任意数量的配置文件,指示 Supervisor 如何监视您的进程。例如,让我们创建一个 laravel-worker.conf 文件,启动并监视一个 queue:work 进程:

ini
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log

在此示例中,numprocs 指令将指示 Supervisor 运行 8 个 queue:work 进程并监视所有进程,如果它们失败,将自动重新启动它们。您应更改 command 指令中的 queue:work sqs 部分,以反映您所需的队列连接。

启动 Supervisor

创建配置文件后,您可以使用以下命令更新 Supervisor 配置并启动进程:

bash
sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start laravel-worker:*

有关 Supervisor 的更多信息,请查阅 Supervisor 文档

处理失败的作业

有时您的排队作业会失败。别担心,事情并不总是按计划进行!Laravel 提供了一种方便的方法来指定作业应尝试的最大次数。在作业超过此尝试次数后,它将被插入到 failed_jobs 数据库表中。要为 failed_jobs 表创建迁移,您可以使用 queue:failed-table 命令:

bash
php artisan queue:failed-table

php artisan migrate

然后,在运行 队列工作者 时,您应使用 queue:work 命令上的 --tries 开关指定作业应尝试的最大次数。如果您未为 --tries 选项指定值,作业将无限期尝试:

bash
php artisan queue:work redis --tries=3

清理失败的作业

您可以直接在作业类上定义一个 failed 方法,允许您在发生故障时执行作业特定的清理。这是向用户发送警报或撤销作业执行的任何操作的理想位置。导致作业失败的 Exception 将传递给 failed 方法:

php
<?php

namespace App\Jobs;

use Exception;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;

class ProcessPodcast implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * 创建一个新的作业实例。
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * 执行作业。
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // 处理上传的播客...
    }

    /**
     * 作业处理失败。
     *
     * @param  Exception  $exception
     * @return void
     */
    public function failed(Exception $exception)
    {
        // 发送用户通知失败等...
    }
}

失败作业事件

如果您希望注册一个在作业失败时调用的事件,可以使用 Queue::failing 方法。此事件是通过电子邮件或 Slack 通知您的团队的绝佳机会。例如,我们可以从 Laravel 附带的 AppServiceProvider 中附加一个回调到此事件:

php
<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\ServiceProvider;

class AppServiceProvider extends ServiceProvider
{
    /**
     * 启动任何应用程序服务。
     *
     * @return void
     */
    public function boot()
    {
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->exception
        });
    }

    /**
     * 注册服务提供者。
     *
     * @return void
     */
    public function register()
    {
        //
    }
}

重试失败的作业

要查看已插入到 failed_jobs 数据库表中的所有失败作业,您可以使用 queue:failed Artisan 命令:

bash
php artisan queue:failed

queue:failed 命令将列出作业 ID、连接、队列和失败时间。作业 ID 可用于重试失败的作业。例如,要重试 ID 为 5 的失败作业,请发出以下命令:

bash
php artisan queue:retry 5

要重试所有失败的作业,请执行 queue:retry 命令并传递 all 作为 ID:

bash
php artisan queue:retry all

如果您希望删除失败的作业,可以使用 queue:forget 命令:

bash
php artisan queue:forget 5

要删除所有失败的作业,可以使用 queue:flush 命令:

bash
php artisan queue:flush

忽略缺失的模型

将 Eloquent 模型注入作业时,它会在放置到队列之前自动序列化,并在作业处理时恢复。然而,如果模型在作业等待工作者处理时被删除,您的作业可能会因 ModelNotFoundException 而失败。

为了方便起见,您可以选择通过将作业的 deleteWhenMissingModels 属性设置为 true 来自动删除缺失模型的作业:

php
/**
 * 如果模型不再存在,则删除作业。
 *
 * @var bool
 */
public $deleteWhenMissingModels = true;

作业事件

使用 Queue facade 上的 beforeafter 方法,您可以指定在处理排队作业之前或之后执行的回调。这些回调是执行额外日志记录或为仪表板增加统计数据的绝佳机会。通常,您应该从 服务提供者 中调用这些方法。例如,我们可以使用 Laravel 附带的 AppServiceProvider

php
<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class AppServiceProvider extends ServiceProvider
{
    /**
     * 启动任何应用程序服务。
     *
     * @return void
     */
    public function boot()
    {
        Queue::before(function (JobProcessing $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        Queue::after(function (JobProcessed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });
    }

    /**
     * 注册服务提供者。
     *
     * @return void
     */
    public function register()
    {
        //
    }
}

使用 Queue facade 上的 looping 方法,您可以指定在工作者尝试从队列获取作业之前执行的回调。例如,您可以注册一个闭包,以回滚任何由先前失败的作业留下的事务:

php
Queue::looping(function () {
    while (DB::transactionLevel() > 0) {
        DB::rollBack();
    }
});