laravel-pub-sub-adapter maintained by elvenpath
Laravel Pub/Sub Messaging Library
A robust publish-subscribe messaging system for Laravel applications supporting either Kafka or Redis as the message transport (configured via driver setting).
Features
- Configurable Transport: Choose between Kafka or Redis as your message bus driver
- Attribute-based Configuration: Simple declaration using PHP 8 attributes
- Auto-discovery: Automatic handler and publisher discovery
- Message Factory: Automatic serialization/deserialization with complex type support
- Priority Handlers: Control handler execution order with priorities
- Artisan Commands: Built-in CLI tools for message management
- PHPStan Max Level: Full static analysis support
- Well Tested: Comprehensive test coverage
Requirements
- PHP ^8.2
- Laravel ^10.0 or ^11.0
- ext-json
- ext-kafka (optional - only required when using Kafka driver)
- mateusjunges/laravel-kafka (optional - only required when using Kafka driver)
Installation
Install the package via Composer:
# For Redis-only usage
composer require elvenpath/laravel-pub-sub-adapter
# For Kafka support, also install:
composer require mateusjunges/laravel-kafka
# And ensure php-kafka extension is installed
The service provider will be automatically registered via Laravel's package discovery.
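Note: If you plan to use Kafka, you'll need to install the mateusjunges/laravel-kafka package and ensure the PHP Kafka extension is available (mateusjunges/laravel-kafka itself requires ext-rdkafka). For Redis-only usage, the only extra dependency is a Redis client, as described under Redis Driver Requirements.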
Choosing Your Message Transport
This package supports two message transport options:
- Redis: Simple pub/sub messaging, perfect for real-time notifications and lightweight messaging
- Kafka: Enterprise-grade streaming platform with durability, partitioning, and replay capabilities
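Choose Redis for simpler use cases where a subscriber that is offline can afford to miss messages (Redis pub/sub does not store messages for later delivery), or Kafka for high-throughput, mission-critical messaging systems.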
Redis Driver Requirements
For Redis support, you need one of the following:
- PhpRedis Extension (Recommended for production):
# Install via PECL
pecl install redis
- Predis Package (Pure PHP, no extension required):
composer require predis/predis
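Laravel picks the Redis client from the redis.client option in config/database.php (the REDIS_CLIENT environment variable, phpredis by default), so set REDIS_CLIENT=predis if you installed Predis. Configure your Redis connection in config/database.php as described in the Laravel documentation.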
Publishing Configuration
Publish the configuration file:
php artisan vendor:publish --provider="LaravelMessaging\MessagingServiceProvider"
This will create a config/messaging.php configuration file.
Configuration
Basic Configuration
Configure your message bus in config/messaging.php:
return [
// Message bus driver: 'kafka' or 'redis'
'driver' => env('MESSAGING_DRIVER', 'kafka'),
'service' => [
'name' => env('MESSAGING_SERVICE_NAME', env('APP_NAME', 'Laravel Messaging')),
'instance' => env('MESSAGING_SERVICE_INSTANCE', 'default'),
],
// Kafka Configuration
'kafka' => [],
// Redis Configuration
'redis' => [
'connection' => env('MESSAGING_REDIS_CONNECTION', 'pubsub'),
],
// Handler discovery
'handler_discovery' => [
'enabled' => env('MESSAGING_HANDLER_DISCOVERY', true),
'scan_paths' => [
app_path('Handlers') => 'App\\Handlers',
app_path('Messaging/Handlers') => 'App\\Messaging\\Handlers',
],
],
// Publisher discovery
'publisher_discovery' => [
'enabled' => env('MESSAGING_PUBLISHER_DISCOVERY', true),
'scan_paths' => [
app_path('Services') => 'App\\Services',
app_path('Http/Controllers') => 'App\\Http\\Controllers',
app_path('Jobs') => 'App\\Jobs',
app_path('Messaging/Publishers') => 'App\\Messaging\\Publishers',
],
],
// Legacy handler subscriptions
'subscriptions' => [
// Example:
// [
// 'topic' => 'user.events',
// 'handler' => App\Handlers\UserHandler::class,
// 'priority' => 100,
// ],
],
];
Environment Variables
Add these to your .env file:
# Message Bus Configuration
MESSAGING_DRIVER=redis
# Service Configuration
MESSAGING_SERVICE_NAME="My App"
MESSAGING_SERVICE_INSTANCE=default
# Redis Configuration (if using Redis)
MESSAGING_REDIS_CONNECTION=pubsub
# Discovery Configuration
MESSAGING_HANDLER_DISCOVERY=true
MESSAGING_PUBLISHER_DISCOVERY=true
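The redis.connection setting names a connection defined in config/database.php, and a stock Laravel install only ships the default and cache connections. The excerpt below is a sketch of what a dedicated pubsub connection could look like. The connection name matches the MESSAGING_REDIS_CONNECTION value shown above; the host, port, and database values are placeholder assumptions to adapt to your environment.

```php
<?php
// config/database.php (excerpt, not a complete file).
// The 'pubsub' entry is an illustrative assumption; only its name needs to
// match MESSAGING_REDIS_CONNECTION.
return [
    // ... other database settings ...

    'redis' => [
        // 'phpredis' (extension) or 'predis' (Composer package)
        'client' => env('REDIS_CLIENT', 'phpredis'),

        // ... keep your existing 'default' and 'cache' connections ...

        'pubsub' => [
            'host' => env('REDIS_HOST', '127.0.0.1'),
            'password' => env('REDIS_PASSWORD'),
            'port' => env('REDIS_PORT', '6379'),
            'database' => env('REDIS_DB', '0'),
        ],
    ],
];
```

Using a separate connection keeps long-lived subscriber sockets apart from the connection your application uses for cache and queue traffic.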
Usage
Creating Messages
Messages are simple PHP classes that extend BaseMessage:
<?php declare(strict_types=1);
namespace App\Messages;
use LaravelMessaging\Messaging\BaseMessage;
class UserRegisteredMessage extends BaseMessage
{
public function __construct(
public readonly int $userId,
public readonly string $email,
public readonly string $name,
public readonly \DateTimeImmutable $registeredAt
) {
parent::__construct();
}
public static function getMessageType(): string
{
return 'user.registered';
}
}
Creating Message Handlers
Handlers process incoming messages. Use the #[MessageHandler] attribute to configure them:
<?php declare(strict_types=1);
namespace App\Handlers;
use App\Mail\WelcomeEmail; // assumed location of your Mailable
use App\Messages\UserRegisteredMessage;
use Illuminate\Support\Facades\Mail;
use LaravelMessaging\Messaging\Handler\MessageHandler;
use LaravelMessaging\Messaging\Handler\MessageHandlerInterface;
use LaravelMessaging\Messaging\MessageInterface;
#[MessageHandler(
messageTypes: ['user.registered'],
priority: 100,
enabled: true
)]
class SendWelcomeEmailHandler implements MessageHandlerInterface
{
public function handle(MessageInterface $message): bool
{
if (!$message instanceof UserRegisteredMessage) {
return false;
}
// Send welcome email logic here
Mail::to($message->email)->send(new WelcomeEmail($message->name));
return true;
}
public static function getHandledMessageTypes(): array
{
return ['user.registered'];
}
}
Handler Attributes
- messageTypes: Array of message types this handler processes
- priority: Higher priority handlers execute first (default: 100)
- enabled: Toggle handler on/off (default: true)
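To make the attribute mechanism concrete, here is a minimal, framework-free sketch of how PHP 8 attribute metadata can be read with reflection. DemoMessageHandler, DemoWelcomeHandler, and readHandlerAttribute are hypothetical stand-ins written for this illustration; the package's own discovery code may work differently.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for the package's #[MessageHandler] attribute.
#[Attribute(Attribute::TARGET_CLASS)]
final class DemoMessageHandler
{
    /** @param list<string> $messageTypes */
    public function __construct(
        public readonly array $messageTypes,
        public readonly int $priority = 100,
        public readonly bool $enabled = true,
    ) {}
}

#[DemoMessageHandler(messageTypes: ['user.registered'], priority: 150)]
final class DemoWelcomeHandler {}

/** Returns the attribute instance declared on a class, or null if absent. */
function readHandlerAttribute(string $class): ?DemoMessageHandler
{
    $attributes = (new ReflectionClass($class))
        ->getAttributes(DemoMessageHandler::class);

    // newInstance() is the point where the attribute arguments are evaluated.
    return $attributes === [] ? null : $attributes[0]->newInstance();
}

$meta = readHandlerAttribute(DemoWelcomeHandler::class);
```

Omitted arguments fall back to the constructor defaults, which is why a handler that only declares messageTypes still ends up with a priority and an enabled flag.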
Creating Publishers
Publishers send messages to the message bus:
<?php declare(strict_types=1);
namespace App\Publishers;
use App\Messages\UserRegisteredMessage;
use App\Models\User;
use LaravelMessaging\Messaging\Bus\MessageBusInterface;
use LaravelMessaging\Messaging\Publisher\MessagePublisher;
#[MessagePublisher(
messageTypes: ['user.registered'],
description: 'Publishes user registration events',
topics: ['user-events'],
enabled: true
)]
class UserEventPublisher
{
public function __construct(
private readonly MessageBusInterface $messageBus
) {}
public function publishUserRegistered(User $user): void
{
$message = new UserRegisteredMessage(
userId: $user->id,
email: $user->email,
name: $user->name,
registeredAt: new \DateTimeImmutable()
);
$this->messageBus->publish($message);
}
}
Multiple Handlers per Message
You can have multiple handlers for the same message type with different priorities:
#[MessageHandler(messageTypes: ['order.created'], priority: 200)]
class UpdateInventoryHandler implements MessageHandlerInterface
{
// Runs first (priority 200)
}
#[MessageHandler(messageTypes: ['order.created'], priority: 100)]
class SendOrderConfirmationHandler implements MessageHandlerInterface
{
// Runs second (priority 100)
}
#[MessageHandler(messageTypes: ['order.created'], priority: 50)]
class LogOrderHandler implements MessageHandlerInterface
{
// Runs last (priority 50)
}
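The ordering rule above (higher priority runs first) can be sketched in plain PHP. This only illustrates the rule; orderByPriority is a hypothetical helper, and how the package breaks ties between equal priorities is an assumption here, not documented behavior.

```php
<?php
declare(strict_types=1);

/**
 * Orders handler entries so that higher priorities come first.
 *
 * @param list<array{handler: string, priority: int}> $entries
 * @return list<string> handler names in execution order
 */
function orderByPriority(array $entries): array
{
    // usort is stable since PHP 8.0, so entries with equal priority keep
    // the order in which they were registered.
    usort(
        $entries,
        static fn (array $a, array $b): int => $b['priority'] <=> $a['priority'],
    );

    return array_column($entries, 'handler');
}

$order = orderByPriority([
    ['handler' => 'LogOrderHandler', 'priority' => 50],
    ['handler' => 'UpdateInventoryHandler', 'priority' => 200],
    ['handler' => 'SendOrderConfirmationHandler', 'priority' => 100],
]);
```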
Console Commands
Discover Messages
List all registered handlers and publishers:
# Discover all publishers and handlers
php artisan messages:discover
# Show only publishers
php artisan messages:discover --publishers
# Show only handlers
php artisan messages:discover --handlers
# Filter by message type
php artisan messages:discover --type=user.registered
# Show message flow mapping
php artisan messages:discover --flow
Consume Messages
Start consuming messages from the configured bus:
# Consume from all configured topics
php artisan messages:consume
# Consume from specific topic
php artisan messages:consume --topic=user.events
Publish Message
Publish a message via CLI:
# Publish with custom data
php artisan messages:publish user.registered \
--data='{"userId":123,"email":"user@example.com"}' \
--type="user.registered"
# Publish multiple messages
php artisan messages:publish test.topic --count=5
# Publish simple test message
php artisan messages:publish test.topic
Subscribe to Redis
For Redis pub/sub, start the subscriber:
# Subscribe to all configured Redis topics
php artisan messages:redis-subscribe
# Subscribe to specific topic
php artisan messages:redis-subscribe --topic=user.events
# Subscribe with timeout (in seconds)
php artisan messages:redis-subscribe --timeout=3600
Advanced Usage
Custom Message Bus
Implement your own message bus by extending AbstractMessageBus:
<?php declare(strict_types=1);
namespace App\Messaging;
use LaravelMessaging\Messaging\Bus\AbstractMessageBus;
use LaravelMessaging\Messaging\MessageInterface;
class CustomMessageBus extends AbstractMessageBus
{
public function publish(MessageInterface $message): void
{
// Your custom publishing logic
}
public function consume(array $options = []): void
{
// Your custom consuming logic
}
}
Message with Complex Types
The message factory handles complex type conversions automatically:
class ComplexMessage extends BaseMessage
{
public function __construct(
public readonly array $items,
public readonly ?string $optional,
public readonly CustomObject $customObject,
public readonly \DateTime $createdAt
) {
parent::__construct();
}
}
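The factory's internals are not described here, so the following is only one plausible approach: match decoded JSON keys to constructor parameters and convert date strings for date-typed parameters. DemoOrderMessage and hydrate are hypothetical names, and the sketch handles DateTimeImmutable only, not arbitrary custom objects.

```php
<?php
declare(strict_types=1);

final class DemoOrderMessage
{
    /** @param list<string> $items */
    public function __construct(
        public readonly array $items,
        public readonly ?string $note,
        public readonly DateTimeImmutable $createdAt,
    ) {}
}

/**
 * Builds an object from a decoded payload by matching array keys to
 * constructor parameters and converting date strings where needed.
 *
 * @param class-string $class
 * @param array<string, mixed> $payload
 */
function hydrate(string $class, array $payload): object
{
    $args = [];
    $constructor = (new ReflectionClass($class))->getConstructor();

    foreach ($constructor?->getParameters() ?? [] as $param) {
        $name = $param->getName();
        $type = $param->getType();
        $value = $payload[$name] ?? null;

        // Turn ISO 8601 strings into DateTimeImmutable for date parameters.
        if ($value !== null
            && $type instanceof ReflectionNamedType
            && $type->getName() === DateTimeImmutable::class
        ) {
            $value = new DateTimeImmutable($value);
        }

        $args[$name] = $value;
    }

    // String keys are unpacked as named arguments.
    return new $class(...$args);
}

$json = '{"items":["a","b"],"note":null,"createdAt":"2024-01-15T10:30:00+00:00"}';
$message = hydrate(
    DemoOrderMessage::class,
    json_decode($json, true, 512, JSON_THROW_ON_ERROR),
);
```

A missing key for a non-nullable parameter raises a TypeError in this sketch, which is usually preferable to silently building a half-populated message.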
Conditional Handler Registration
Disable handlers based on environment or configuration:
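// Attribute arguments must be constant expressions, so env() and config()
// cannot be called here. APP_ENV is a constant you define yourself (for
// example with define()) before handler discovery runs.
#[MessageHandler(
    messageTypes: ['payment.processed'],
    enabled: APP_ENV !== 'testing' // Disable in tests
)]
class ChargePaymentHandler implements MessageHandlerInterface
{
    // Handler implementation
}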
Handler Registry Access
Access the handler registry directly:
use LaravelMessaging\Messaging\Handler\MessageHandlerRegistry;
$registry = app(MessageHandlerRegistry::class);
// Get all handlers for a message type
$handlers = $registry->getHandlersForMessageType('user.registered');
// Register a handler manually
$registry->registerHandler('user.registered', $handlerInstance, 100);
// Get all registered message types
$types = $registry->getRegisteredMessageTypes();
Testing
Unit Testing Handlers
<?php declare(strict_types=1);
namespace Tests\Unit\Handlers;
use App\Handlers\SendWelcomeEmailHandler;
use App\Mail\WelcomeEmail; // assumed location of your Mailable
use App\Messages\UserRegisteredMessage;
use Illuminate\Support\Facades\Mail;
use Tests\TestCase;
class SendWelcomeEmailHandlerTest extends TestCase
{
    public function test_handler_processes_user_registered_message(): void
    {
        // Swap the mailer for a fake so no real email is sent
        Mail::fake();
        $handler = new SendWelcomeEmailHandler();
        $message = new UserRegisteredMessage(
            userId: 1,
            email: 'test@example.com',
            name: 'Test User',
            registeredAt: new \DateTimeImmutable()
        );
        $result = $handler->handle($message);
        $this->assertTrue($result);
        // Assert the welcome email was sent
        Mail::assertSent(WelcomeEmail::class);
    }
}
Testing Publishers
public function test_publisher_sends_message_to_bus(): void
{
$messageBus = $this->createMock(MessageBusInterface::class);
$messageBus->expects($this->once())
->method('publish')
->with($this->isInstanceOf(UserRegisteredMessage::class));
$publisher = new UserEventPublisher($messageBus);
$user = User::factory()->create();
$publisher->publishUserRegistered($user);
}
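As an alternative to mock expectations, a hand-written in-memory bus can record published messages for later assertions. The sketch below is framework-free: DemoMessage and DemoBus are hypothetical stand-ins for the package's MessageInterface and MessageBusInterface, whose real contracts may require more methods (for example consume).

```php
<?php
declare(strict_types=1);

// Minimal stand-ins so the sketch runs on its own; in an application these
// would be the package's MessageInterface and MessageBusInterface.
interface DemoMessage
{
    public static function getMessageType(): string;
}

interface DemoBus
{
    public function publish(DemoMessage $message): void;
}

/** Test double that records messages instead of sending them. */
final class InMemoryBus implements DemoBus
{
    /** @var list<DemoMessage> */
    public array $published = [];

    public function publish(DemoMessage $message): void
    {
        $this->published[] = $message;
    }

    /** @return list<string> message types in publication order */
    public function publishedTypes(): array
    {
        return array_map(
            static fn (DemoMessage $m): string => $m::getMessageType(),
            $this->published,
        );
    }
}

final class DemoUserRegistered implements DemoMessage
{
    public function __construct(public readonly int $userId) {}

    public static function getMessageType(): string
    {
        return 'user.registered';
    }
}

$bus = new InMemoryBus();
$bus->publish(new DemoUserRegistered(userId: 42));
```

A fake like this lets a test inspect the payload of each published message, which is harder to express with a single mock expectation.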
Development
Running Tests
# Run all tests
make test
# With coverage
make coverage
Code Style
# Fix code style
make style-check
Static Analysis
# Run PHPStan
make static-analysis
Examples
Check the /examples directory for complete working examples:
- ActivityLogMessage.php - Example message class
- ActivityLogMessageHandler.php - Example handler
- ActivityLogPublisher.php - Example publisher
- UserRegisteredMessage.php - User registration message
- UserRegisteredHandler.php - User registration handler
Contributing
Contributions are welcome! Please ensure:
- All tests pass (make test)
- Code style is fixed (make style-check)
- Static analysis passes (make static-analysis)
- New features include tests
- Documentation is updated
License
This package is open-sourced software licensed under the MIT license.
Support
For issues and questions, please use the GitHub issue tracker.
Changelog
Please see CHANGELOG for recent changes.