Entwickler, der außergewöhnliche CRM- und Laravel-Lösungen liefert

Als erfahrener Entwickler spezialisiere ich mich auf Laravel- und Vue.js-Entwicklung, die Implementierung von Vtiger CRM sowie auf vielfältige WordPress-Projekte. Meine Arbeit zeichnet sich durch kreative, dynamische und benutzerzentrierte Weblösungen aus, die individuell an die Bedürfnisse meiner Kunden angepasst werden.

Das "Transactional Outbox"-Muster ist eine zuverlässige Methode, um die Datenkonsistenz zwischen Microservices und anderen Systemen zu gewährleisten, insbesondere in Systemen mit verteilten Transaktionen. In Laravel kann dieses Muster wie folgt implementiert werden:

Implementierung des Transactional Outbox-Patterns in Laravel

Das "Transactional Outbox"-Muster ist eine zuverlässige Methode, um die Datenkonsistenz zwischen Microservices und anderen Systemen zu gewährleisten, insbesondere in Systemen mit verteilten Transaktionen. In Laravel kann dieses Muster wie folgt implementiert werden:

  1. Erstellung einer Tabelle zur Speicherung von Ereignissen (outbox_messages)

Der erste Schritt ist die Erstellung einer Migration, die eine Tabelle zur Speicherung von ausgehenden Nachrichten oder Ereignissen erstellt:

<?php

declare(strict_types=1);


use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;

return new class () extends Migration {
    public function up(): void
    {
        Schema::create('outbox_messages', function (Blueprint $table): void {
            $table->id();
            $table->string('job_class');
            $table->json('payload');
            $table->integer('attempts')->default(0);
            $table->longText('description')->nullable();
            $table->string('status')->default(\App\Enums\OutboxMessageStatusEnum::PENDING);
            $table->string('queue_type')->default(\App\Enums\QueueTypeEnum::DEFAULT);
            $table->timestamps();
        });
    }

    public function down(): void
    {
        Schema::dropIfExists('outbox_messages');
    }
};

Wir speichern also die Jobklasse (job_class), die Nutzlast (payload), die Anzahl der Zustellversuche, die Fehlerbeschreibung, den Status und den Warteschlangentyp.

  1. Erstellung des OutboxMessage-Modells

Um die Arbeit mit der Tabelle zu erleichtern, erstellen Sie ein Modell, das festlegt, welche Felder für die Massenbefüllung zur Verfügung stehen und welche Typumwandlungen für sie gelten.

<?php

declare(strict_types=1);


namespace App\Models;


use App\Enums\OutboxMessageStatusEnum;
use App\Enums\QueueTypeEnum;
use Database\Factories\OutboxMessageFactory;
use Illuminate\Database\Eloquent\Factories\HasFactory;
use Illuminate\Database\Eloquent\Model;

class OutboxMessage extends Model
{
    /** @use HasFactory<OutboxMessageFactory> */
    use HasFactory;

    protected $fillable = [
        'job_class',
        'payload',
        'attempts',
        'description',
        'status',
        'queue_type',
    ];

    protected function casts(): array
    {
        return [
            'payload' => 'array',
            'attempts' => 'int',
            'status' => OutboxMessageStatusEnum::class,
            'queue_type' => QueueTypeEnum::class,
       ];
    }
}
  1. Enum-Definition für Status und Warteschlangentypen

Zur Kontrolle des Nachrichtenstatus und des Warteschlangentyps ist es praktisch, Enum zu verwenden. Beispiel für zwei Aufzählungen:

<?php

declare(strict_types=1);

namespace App\Enums;

enum OutboxMessageStatusEnum: string
{
    case PENDING = 'pending';
    case SENT = 'sent';
    case ERROR = 'error';
}

– QueueTypeEnum:

<?php

declare(strict_types=1);

namespace App\Enums;

enum QueueTypeEnum: string
{
    case DEFAULT = 'default';
    case BILLY = 'invoice';
}
  1. Erstellung eines DTO für Outbox-Nachrichten

Um Ereignisdaten bequem zu übertragen, erstelle ein Data Transfer Object (DTO):

<?php

declare(strict_types=1);

namespace App\Data\OutboxMessage;

use App\Enums\OutboxMessageStatusEnum;
use App\Enums\QueueTypeEnum;
use Illuminate\Contracts\Queue\ShouldQueue;
use Spatie\LaravelData\Data;

final class OutboxMessageData extends Data
{
    /**
     * @param  class-string<ShouldQueue>  $job_class
     * @param  array<string, scalar>  $payload
     */
    public function __construct(
        public string $job_class,
        public array $payload,
        public int $attempts = 0,
        public OutboxMessageStatusEnum $status = OutboxMessageStatusEnum::PENDING,
        public QueueTypeEnum $queue_type = QueueTypeEnum::DEFAULT,
        public ?int $id = null,
        public ?string $description = null,
    ) {}

    /**
     * @return array<string, mixed>
     */
    public function toEntityArray(): array
    {
        $data = $this->toArray();
        unset($data['id']);

        return $data;
    }
}
  1. Action zum Erstellen einer Outbox-Nachricht

Beschreiben wir die Aktion (Action), die die Nachricht in der Datenbank speichert:

<?php

declare(strict_types=1);

namespace App\Actions\OutboxMessage;

use App\Data\OutboxMessage\OutboxMessageData;
use App\Models\OutboxMessage;
use Lorisleiva\Actions\Concerns\AsAction;

/**
 * @method static OutboxMessageData run(OutboxMessageData $outboxMessageData)
 */
final class CreateOutboxMessage
{
    use AsAction;

    public function handle(OutboxMessageData $message): OutboxMessageData
    {
        $messageModel = OutboxMessage::create($message->toEntityArray());
        $message->id = $messageModel->id;

        return $message;
    }
}
  1. Erstellung eines Jobs zum Senden von Ereignissen

Erstellen wir nun eine Laravel Job-Klasse, die das Senden der Nachricht übernimmt. In diesem Beispiel bezieht sich das Ereignis beispielsweise auf eine Kontoaktualisierung:

<?php

declare(strict_types=1);

namespace App\Jobs;

use App\Actions\Invoices\UpdateInvoiceFromBilly;
use App\Actions\OutboxMessage\CreateOutboxMessage;
use App\Data\OutboxMessage\OutboxMessageData;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

final class InvoiceUpdatedQueueJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __construct(
        public int $id
    ) {}

    public static function dispatchOutbox(int $id): void
    {
        CreateOutboxMessage::run(new OutboxMessageData(
            job_class: self::class,
            payload: ['id' => $id],
        ));
    }

    public function handle(): void
    {
        app(UpdateInvoiceFromBilly::class)->handle($this->id);
    }
}

Beachten Sie die Methode dispatchOutbox – dank ihr wird beim Aufruf von InvoiceUpdatedQueueJob::dispatchOutbox($id) die Nachricht in der Tabelle outbox_messages gespeichert.

  1. Verarbeitung von Outbox-Nachrichten

Um die in der Tabelle gesammelten Ereignisse zu versenden, kann ein Konsolenbefehl erstellt werden. Ein Beispiel für einen Befehl, der alle ausstehenden Nachrichten abruft und an RabbitMQ sendet, sieht wie folgt aus:

<?php

declare(strict_types=1);

namespace App\Console\Commands\OutboxMessage;

use App\Actions\OutboxMessage\DispatchOutboxMessage;
use App\Actions\OutboxMessage\GetAllPendingMessages;
use App\Data\OutboxMessage\OutboxMessageData;
use Illuminate\Console\Command;

final class OutboxProcessCommand extends Command
{
    protected $signature = 'outbox:process';

    protected $description = 'Send Outbox messages to Rabbit MQ';

    public function handle(): void
    {
        $messages = GetAllPendingMessages::run();
        $this->withProgressBar($messages, function (OutboxMessageData $messageData): void {
            DispatchOutboxMessage::run($messageData);
        });
    }
}

Action DispatchOutboxMessage ist für Sendeversuche, Fehlerbehandlung und Aktualisierung des Nachrichtenstatus zuständig. Ein Beispiel für die Implementierung von DispatchOutboxMessage finden Sie im Quellcode.

  1. Zusätzliche Befehle

Um die Tabelle sauber zu halten, kann ein Befehl zum Löschen alter Nachrichten hinzugefügt werden:

<?php

declare(strict_types=1);


namespace App\Actions\OutboxMessage;

use App\Enums\OutboxMessageStatusEnum;
use App\Models\OutboxMessage;
use Carbon\Carbon;
use Lorisleiva\Actions\Concerns\AsAction;

/**
 * @method static void run()
 */
final class DeleteOldEvents
{
    use AsAction;

    public const int DAYS_TO_KEEP = 60;

    public function handle(): void
    {
        OutboxMessage::where('status', OutboxMessageStatusEnum::SENT)
            ->where('created_at', '<', Carbon::now()->subDays(self::DAYS_TO_KEEP))
            ->delete();
    }
}

Und auch den Konsolenbefehl zum Löschen alter Nachrichten:

<?php

declare(strict_types=1);

namespace App\Console\Commands\OutboxMessage;

use App\Actions\OutboxMessage\DeleteOldEvents;
use Illuminate\Console\Command;

final class MessagesDeleteOldCommand extends Command
{
    protected $signature = 'messages:delete-old';

    protected $description = 'Delete old outbox messages';

    public function handle(): void
    {
        DeleteOldEvents::run();
        $this->info('Messages has been deleted');
    }
}

Wenn es notwendig ist, das Senden eines bestimmten Ereignisses zu wiederholen, kann man den OutboxRetryCommand implementieren:

<?php

declare(strict_types=1);

namespace App\Console\Commands\OutboxMessage;

use App\Actions\OutboxMessage\DispatchOutboxMessage;
use App\Actions\OutboxMessage\GetOutboxMessageById;
use Illuminate\Console\Command;

final class OutboxRetryCommand extends Command
{
    protected $signature = 'outbox:retry {id}';

    protected $description = 'Retry event by its ID';

    public function handle(): void
    {
        $id = (int) $this->argument('id');
        $message = GetOutboxMessageById::run($id);
        DispatchOutboxMessage::run($message);
    }
}

Fazit

In diesem Artikel haben wir einen Ansatz zur Implementierung von Transactional Outbox in Laravel betrachtet. Das Outbox-Messages-Muster ermöglicht Folgendes:

  • Die Zustellung von Ereignissen auch bei vorübergehender Nichtverfügbarkeit des Message Brokers (RabbitMQ) zu garantieren;
  • Den Ereignisverlauf für weitere Analysen und die Wiedergabe zu speichern;
  • Die Desynchronisation zwischen Datenbank und Warteschlange auszuschließen.

Führen Sie einen Konsolenbefehl aus (z. B. per Cron oder Schedule), um die angesammelten Nachrichten regelmäßig zu verarbeiten. Fügen Sie bei Bedarf Befehle zum Löschen des Verlaufs oder zum erneuten Senden von Ereignissen hinzu.

Ich hoffe, diese Anleitung hilft Ihnen, eine zuverlässige und ausfallsichere Interaktion zwischen Ihren Anwendungen zu implementieren. Wenn Sie Fragen oder Anregungen haben, schreiben Sie in die Kommentare oder kontaktieren Sie mich direkt!