©Sergey Emelyanov 2025 | Alle Rechte vorbehalten
Das "Transactional Outbox"-Muster ist eine zuverlässige Methode, um die Datenkonsistenz zwischen Microservices und anderen Systemen zu gewährleisten, insbesondere in Systemen mit verteilten Transaktionen. In Laravel kann dieses Muster wie folgt implementiert werden:
Der erste Schritt ist die Erstellung einer Migration, die eine Tabelle zur Speicherung von ausgehenden Nachrichten oder Ereignissen erstellt:
<?php
declare(strict_types=1);
use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;
return new class () extends Migration {
public function up(): void
{
Schema::create('outbox_messages', function (Blueprint $table): void {
$table->id();
$table->string('job_class');
$table->json('payload');
$table->integer('attempts')->default(0);
$table->longText('description')->nullable();
$table->string('status')->default(\App\Enums\OutboxMessageStatusEnum::PENDING);
$table->string('queue_type')->default(\App\Enums\QueueTypeEnum::DEFAULT);
$table->timestamps();
});
}
public function down(): void
{
Schema::dropIfExists('outbox_messages');
}
};
Wir speichern also die Jobklasse (job_class), die Nutzlast (payload), die Anzahl der Zustellversuche, die Fehlerbeschreibung, den Status und den Warteschlangentyp.
Um die Arbeit mit der Tabelle zu erleichtern, erstellen Sie ein Modell, das festlegt, welche Felder für die Massenbefüllung zur Verfügung stehen und welche Typumwandlungen für sie gelten.
<?php
declare(strict_types=1);
namespace App\Models;
use App\Enums\OutboxMessageStatusEnum;
use App\Enums\QueueTypeEnum;
use Database\Factories\OutboxMessageFactory;
use Illuminate\Database\Eloquent\Factories\HasFactory;
use Illuminate\Database\Eloquent\Model;
class OutboxMessage extends Model
{
/** @use HasFactory<OutboxMessageFactory> */
use HasFactory;
protected $fillable = [
'job_class',
'payload',
'attempts',
'description',
'status',
'queue_type',
];
protected function casts(): array
{
return [
'payload' => 'array',
'attempts' => 'int',
'status' => OutboxMessageStatusEnum::class,
'queue_type' => QueueTypeEnum::class,
];
}
}
Zur Kontrolle des Nachrichtenstatus und des Warteschlangentyps ist es praktisch, Enum zu verwenden. Beispiel für zwei Aufzählungen:
<?php
declare(strict_types=1);
namespace App\Enums;
enum OutboxMessageStatusEnum: string
{
case PENDING = 'pending';
case SENT = 'sent';
case ERROR = 'error';
}
– QueueTypeEnum:
<?php
declare(strict_types=1);
namespace App\Enums;
enum QueueTypeEnum: string
{
case DEFAULT = 'default';
case BILLY = 'invoice';
}
Um Ereignisdaten bequem zu übertragen, erstelle ein Data Transfer Object (DTO):
<?php
declare(strict_types=1);
namespace App\Data\OutboxMessage;
use App\Enums\OutboxMessageStatusEnum;
use App\Enums\QueueTypeEnum;
use Illuminate\Contracts\Queue\ShouldQueue;
use Spatie\LaravelData\Data;
final class OutboxMessageData extends Data
{
/**
* @param class-string<ShouldQueue> $job_class
* @param array<string, scalar> $payload
*/
public function __construct(
public string $job_class,
public array $payload,
public int $attempts = 0,
public OutboxMessageStatusEnum $status = OutboxMessageStatusEnum::PENDING,
public QueueTypeEnum $queue_type = QueueTypeEnum::DEFAULT,
public ?int $id = null,
public ?string $description = null,
) {}
/**
* @return array<string, mixed>
*/
public function toEntityArray(): array
{
$data = $this->toArray();
unset($data['id']);
return $data;
}
}
Beschreiben wir die Aktion (Action), die die Nachricht in der Datenbank speichert:
<?php
declare(strict_types=1);
namespace App\Actions\OutboxMessage;
use App\Data\OutboxMessage\OutboxMessageData;
use App\Models\OutboxMessage;
use Lorisleiva\Actions\Concerns\AsAction;
/**
* @method static OutboxMessageData run(OutboxMessageData $outboxMessageData)
*/
final class CreateOutboxMessage
{
use AsAction;
public function handle(OutboxMessageData $message): OutboxMessageData
{
$messageModel = OutboxMessage::create($message->toEntityArray());
$message->id = $messageModel->id;
return $message;
}
}
Erstellen wir nun eine Laravel Job-Klasse, die das Senden der Nachricht übernimmt. In diesem Beispiel bezieht sich das Ereignis beispielsweise auf eine Kontoaktualisierung:
<?php
declare(strict_types=1);
namespace App\Jobs;
use App\Actions\Invoices\UpdateInvoiceFromBilly;
use App\Actions\OutboxMessage\CreateOutboxMessage;
use App\Data\OutboxMessage\OutboxMessageData;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
final class InvoiceUpdatedQueueJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public function __construct(
public int $id
) {}
public static function dispatchOutbox(int $id): void
{
CreateOutboxMessage::run(new OutboxMessageData(
job_class: self::class,
payload: ['id' => $id],
));
}
public function handle(): void
{
app(UpdateInvoiceFromBilly::class)->handle($this->id);
}
}
Beachten Sie die Methode dispatchOutbox – dank ihr wird beim Aufruf von InvoiceUpdatedQueueJob::dispatchOutbox($id) die Nachricht in der Tabelle outbox_messages gespeichert.
Um die in der Tabelle gesammelten Ereignisse zu versenden, kann ein Konsolenbefehl erstellt werden. Ein Beispiel für einen Befehl, der alle ausstehenden Nachrichten abruft und an RabbitMQ sendet, sieht wie folgt aus:
<?php
declare(strict_types=1);
namespace App\Console\Commands\OutboxMessage;
use App\Actions\OutboxMessage\DispatchOutboxMessage;
use App\Actions\OutboxMessage\GetAllPendingMessages;
use App\Data\OutboxMessage\OutboxMessageData;
use Illuminate\Console\Command;
final class OutboxProcessCommand extends Command
{
protected $signature = 'outbox:process';
protected $description = 'Send Outbox messages to Rabbit MQ';
public function handle(): void
{
$messages = GetAllPendingMessages::run();
$this->withProgressBar($messages, function (OutboxMessageData $messageData): void {
DispatchOutboxMessage::run($messageData);
});
}
}
Action DispatchOutboxMessage ist für Sendeversuche, Fehlerbehandlung und Aktualisierung des Nachrichtenstatus zuständig. Ein Beispiel für die Implementierung von DispatchOutboxMessage finden Sie im Quellcode.
Um die Tabelle sauber zu halten, kann ein Befehl zum Löschen alter Nachrichten hinzugefügt werden:
<?php
declare(strict_types=1);
namespace App\Actions\OutboxMessage;
use App\Enums\OutboxMessageStatusEnum;
use App\Models\OutboxMessage;
use Carbon\Carbon;
use Lorisleiva\Actions\Concerns\AsAction;
/**
* @method static void run()
*/
final class DeleteOldEvents
{
use AsAction;
public const int DAYS_TO_KEEP = 60;
public function handle(): void
{
OutboxMessage::where('status', OutboxMessageStatusEnum::SENT)
->where('created_at', '<', Carbon::now()->subDays(self::DAYS_TO_KEEP))
->delete();
}
}
Und auch den Konsolenbefehl zum Löschen alter Nachrichten:
<?php
declare(strict_types=1);
namespace App\Console\Commands\OutboxMessage;
use App\Actions\OutboxMessage\DeleteOldEvents;
use Illuminate\Console\Command;
final class MessagesDeleteOldCommand extends Command
{
protected $signature = 'messages:delete-old';
protected $description = 'Delete old outbox messages';
public function handle(): void
{
DeleteOldEvents::run();
$this->info('Messages has been deleted');
}
}
Wenn es notwendig ist, das Senden eines bestimmten Ereignisses zu wiederholen, kann man den OutboxRetryCommand implementieren:
<?php
declare(strict_types=1);
namespace App\Console\Commands\OutboxMessage;
use App\Actions\OutboxMessage\DispatchOutboxMessage;
use App\Actions\OutboxMessage\GetOutboxMessageById;
use Illuminate\Console\Command;
final class OutboxRetryCommand extends Command
{
protected $signature = 'outbox:retry {id}';
protected $description = 'Retry event by its ID';
public function handle(): void
{
$id = (int) $this->argument('id');
$message = GetOutboxMessageById::run($id);
DispatchOutboxMessage::run($message);
}
}
Fazit
In diesem Artikel haben wir einen Ansatz zur Implementierung von Transactional Outbox in Laravel betrachtet. Das Outbox-Messages-Muster ermöglicht Folgendes:
Führen Sie einen Konsolenbefehl aus (z. B. per Cron oder Schedule), um die angesammelten Nachrichten regelmäßig zu verarbeiten. Fügen Sie bei Bedarf Befehle zum Löschen des Verlaufs oder zum erneuten Senden von Ereignissen hinzu.
Ich hoffe, diese Anleitung hilft Ihnen, eine zuverlässige und ausfallsichere Interaktion zwischen Ihren Anwendungen zu implementieren. Wenn Sie Fragen oder Anregungen haben, schreiben Sie in die Kommentare oder kontaktieren Sie mich direkt!
©Sergey Emelyanov 2025 | Alle Rechte vorbehalten