Primordyx Framework Documentation

MessageQueue

Class MessageQueue

A simple event queue system for handling asynchronous, file-based message passing between processes or components. Events are stored as JSON files and dispatched using a user-supplied dispatcher callable. Failed and completed jobs are routed to respective directories.

The MessageQueue operates on a simple file-based queue system:

  1. Events are queued as JSON files in the pending directory
  2. A processor reads and dispatches events using a user-provided callable
  3. Successfully handled jobs are moved to the completed directory (or deleted)
  4. Unhandled jobs remain in the pending directory for future processing
  5. Failed jobs are moved to the failed directory for later inspection

Event Processing States:

  • Handled (dispatcher returns true): Job moved to completed directory or deleted
  • Not Handled (dispatcher returns false): Job remains in pending for retry/other processors
  • Failed (dispatcher throws exception): Job moved to failed directory for inspection
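
The three states above can be sketched as a small classifier. This is only an illustration of the contract, not the framework's code: classifyOutcome() is a hypothetical helper, and treating any non-true return value as "not handled" is an assumption.

```php
<?php
// Hypothetical helper (not part of MessageQueue): maps a dispatcher's
// result to the name of the state described above.
function classifyOutcome(callable $dispatcher, string $event, array $args): string
{
    try {
        // Assumption: only a strict true counts as "handled"
        return $dispatcher($event, $args) === true ? 'completed' : 'pending';
    } catch (Throwable $e) {
        // Any thrown exception or error marks the job as failed
        return 'failed';
    }
}

echo classifyOutcome(fn($event, $args) => true, 'user.registered', []), "\n";  // completed
echo classifyOutcome(fn($event, $args) => false, 'unknown.event', []), "\n";   // pending
echo classifyOutcome(function ($event, $args) {
    throw new RuntimeException('boom');
}, 'order.completed', []), "\n";                                                // failed
```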

Configuration: The MessageQueue must be configured with a base directory before use via the configure() method. This base directory will contain three subdirectories: pending, failed, and completed. All paths are explicitly set and no application constants are required.

File Structure:

  • Base directory: /path/to/messagequeue/
  • Pending jobs: {base}/pending/{timestamp}_{event}.json
  • Failed jobs: {base}/failed/{timestamp}_{event}.json
  • Completed jobs: {base}/completed/{timestamp}_{event}.json
  • Job format: {"event": "string", "args": {...}, "timestamp": 123456789}
  • Uses file locking to prevent concurrent processing
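
To make the job format concrete, here is a minimal sketch of encoding and decoding such a payload. The field names come from the format line above; encodeJob() and decodeJob() are hypothetical helpers, and the validation shown is an assumption rather than what MessageQueue does internally.

```php
<?php
// Hypothetical helpers illustrating the job format; not part of MessageQueue.
function encodeJob(string $event, array $args, int $timestamp): string
{
    return json_encode(
        ['event' => $event, 'args' => $args, 'timestamp' => $timestamp],
        JSON_THROW_ON_ERROR
    );
}

function decodeJob(string $json): array
{
    // Decode to an associative array; invalid JSON throws JsonException
    $job = json_decode($json, true, 512, JSON_THROW_ON_ERROR);
    if (!is_array($job) || !isset($job['event'], $job['args'])) {
        throw new UnexpectedValueException('Not a valid job payload');
    }
    return $job;
}

$json = encodeJob('user.registered', ['user_id' => 123], 123456789);
echo $json, "\n";
// prints {"event":"user.registered","args":{"user_id":123},"timestamp":123456789}
```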

Dependencies:

  • Requires explicit configuration via configure() method before use
  • Creates directories automatically with 0777 permissions (subject to umask)
  • Base directory must be writable by the process
  • Dispatcher callable must return true/false to indicate handling status
Tags
since
1.0.0
example
// Configure the message queue with a base directory
MessageQueue::configure('/var/app/queue');
example
// Publish a new event to the queue
MessageQueue::publish('user.registered', ['user_id' => 123, 'email' => 'test@example.com']);
example
// Consume all pending events from the queue
MessageQueue::consume(function($event, $args) {
    // Pick a handler for the event; null means "not handled"
    $handler = match($event) {
        'user.registered' => fn() => EmailService::sendWelcome($args['email']),
        'order.completed' => fn() => NotificationService::notify($args['user_id']),
        default => null,
    };
    if ($handler === null) {
        return false; // Not handled - leave in pending
    }
    $handler();
    return true;
});
see
MessageQueue::configure()

For initial setup

see
MessageQueue::publish()

For publishing events to the queue

see
MessageQueue::consume()

For processing the queue

see
MessageQueue::count()

For checking queue status

Table of Contents

Properties

$completedDir  : string|null
$configured  : bool
$failedDir  : string|null
$keepCompleted  : bool
$pendingDir  : string|null

Methods

completedDir()  : string
Get or set the directory used for completed jobs.
configure()  : void
Configure the MessageQueue with a base directory for queue operations.
consume()  : void
Consume all pending messages by dispatching them using the provided callable.
count()  : int
Get the number of messages currently in the queue.
failedDir()  : string
Get or set the directory used for failed jobs.
isConfigured()  : bool
Check if the MessageQueue has been properly configured.
keepCompleted()  : bool
Get or set whether completed job files should be retained after processing.
pendingDir()  : string
Get or set the directory used for pending jobs.
publish()  : void
Publish a new event message to the queue for later processing.
ensureDirectories()  : void
Ensure that all necessary directories exist and are properly set up.
requireConfiguration()  : void
Validate that the MessageQueue has been configured before use.

Properties

$completedDir

protected static string|null $completedDir = null

Directory for completed job files

$configured

protected static bool $configured = false

Whether the message queue has been configured

$failedDir

protected static string|null $failedDir = null

Directory for failed job files

$keepCompleted

protected static bool $keepCompleted = true

Whether to keep completed job files or delete them after processing

$pendingDir

protected static string|null $pendingDir = null

Directory for pending job files

Methods

completedDir()

Get or set the directory used for completed jobs.

public static completedDir([string|null $path = null ]) : string

If a path is provided, updates the directory and returns the previous value. Ensures the new directory exists and is writable. This is where successfully processed job files are moved (if keepCompleted is true), providing an audit trail of all completed work.

The completed directory is useful for debugging, auditing, and understanding system activity. Files here can be safely deleted periodically to manage disk space, or archived for long-term record keeping.
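
The get-or-set behavior described above (shared by pendingDir() and failedDir()) can be sketched with a stand-in class. DirSetting and its default path are hypothetical; the sketch omits the directory creation and writability checks that the real methods are documented to perform.

```php
<?php
// Hypothetical stand-in class showing the get-or-set accessor pattern.
final class DirSetting
{
    private static string $dir = '/var/queue/completed';

    public static function dir(?string $path = null): string
    {
        $previous = self::$dir;
        if ($path !== null) {
            self::$dir = $path; // only change the value when a path is given
        }
        return $previous;
    }
}

$old = DirSetting::dir('/srv/queue/completed'); // set: returns the old path
$now = DirSetting::dir();                       // get: leaves it unchanged
echo "$old -> $now\n"; // prints /var/queue/completed -> /srv/queue/completed
```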

Parameters
$path : string|null = null

Optional new absolute path to set. If null, returns current path without changing it.

Tags
throws
RuntimeException

If MessageQueue not configured or path not writable.

example
// Get current completed directory
$current = MessageQueue::completedDir();
example
// Set new completed directory
$old = MessageQueue::completedDir('/var/queue/completed');
see
MessageQueue::configure()

Recommended method for initial setup

see
MessageQueue::consume()

Moves completed jobs to this directory

see
MessageQueue::keepCompleted()

Controls whether files are moved here

see
MessageQueue::ensureDirectories()

Creates directory if needed

Return values
string

The previous completed directory path, or the current path if $path is null.

configure()

Configure the MessageQueue with a base directory for queue operations.

public static configure(string $basePath) : void

This method must be called before using any other MessageQueue methods. It sets up the directory structure needed for queue operations by creating three subdirectories under the provided base path: pending, failed, and completed.

Directory Structure Created:

  • {basePath}/pending - Where new jobs are queued
  • {basePath}/failed - Where failed jobs are moved for inspection
  • {basePath}/completed - Where successful jobs are archived (if keepCompleted is true)

The base directory and all subdirectories are created automatically if they don't exist, with 0777 permissions (subject to umask). The base path must be writable by the process that will be creating and processing jobs.
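
As a rough sketch of what this setup involves, the following creates the three subdirectories recursively and checks that each is writable. createQueueDirs() is a hypothetical helper and the error messages are assumptions; only the directory names and the 0777 mode come from the description above.

```php
<?php
// Hypothetical sketch of creating the queue layout; not the framework's code.
function createQueueDirs(string $basePath): array
{
    $dirs = [];
    foreach (['pending', 'failed', 'completed'] as $name) {
        $dir = rtrim($basePath, '/') . '/' . $name;
        // mkdir() masks the requested mode with the process umask; the third
        // argument also creates any missing parent directories.
        if (!is_dir($dir) && !mkdir($dir, 0777, true) && !is_dir($dir)) {
            throw new RuntimeException("Cannot create directory: $dir");
        }
        if (!is_writable($dir)) {
            throw new RuntimeException("Directory is not writable: $dir");
        }
        $dirs[$name] = $dir;
    }
    return $dirs;
}

// Safe to call repeatedly: existing directories are left alone
$dirs = createQueueDirs(sys_get_temp_dir() . '/mq_sketch_' . getmypid());
print_r($dirs);
```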

Configuration is idempotent - calling this method multiple times with the same path is safe and will not cause issues. However, changing the path after jobs have been queued may result in those jobs being "lost" until the original path is restored.

Parameters
$basePath : string

Absolute path to the base directory for queue operations. Must be writable by the process. Subdirectories will be created here.

Tags
throws
RuntimeException

If the base path cannot be created or is not writable.

example
// Configure with application-specific queue directory
MessageQueue::configure('/var/app/queue');
example
// Configure with tenant-specific directory
$tenant = getCurrentTenant();
MessageQueue::configure("/var/queues/$tenant");
example
// Configure with temporary directory for testing
MessageQueue::configure('/tmp/test_queue');
see
MessageQueue::ensureDirectories()

Creates the directory structure

see
MessageQueue::isConfigured()

Check if configuration has been completed

consume()

Consume all pending messages by dispatching them using the provided callable.

public static consume(callable $dispatcher) : void

Iterates through all JSON files in the pending directory and calls the provided dispatcher function for each valid job. Uses a lock file to prevent multiple consumers from processing the queue at the same time, so that jobs are not processed twice.

Message Processing Flow:

  1. Acquires exclusive lock to prevent concurrent processing
  2. Reads each .json file in pending directory
  3. Calls dispatcher with event name and arguments
  4. If dispatcher returns true: moves to completed directory or deletes (based on keepCompleted setting)
  5. If dispatcher returns false: leaves job in pending directory for future processing
  6. If dispatcher throws exception: moves to failed directory for inspection
  7. Releases lock and cleans up lock file
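
Steps 1 and 7 of the flow above can be sketched with PHP's flock(). This is an assumption about the mechanism: the documentation only says that a lock file is used, and withQueueLock() and the lock file name are hypothetical.

```php
<?php
// Hypothetical sketch of guarding a batch with an exclusive, non-blocking lock.
function withQueueLock(string $lockFile, callable $work): bool
{
    $handle = fopen($lockFile, 'c'); // create if missing, do not truncate
    if ($handle === false) {
        throw new RuntimeException("Cannot open lock file: $lockFile");
    }
    if (!flock($handle, LOCK_EX | LOCK_NB)) {
        fclose($handle);
        return false; // another process holds the lock
    }
    try {
        $work();
        return true;
    } finally {
        // Runs even if $work() throws, so the lock is always released
        flock($handle, LOCK_UN);
        fclose($handle);
        @unlink($lockFile);
    }
}

$lock = sys_get_temp_dir() . '/mq_sketch_' . getmypid() . '.lock';
$ran = withQueueLock($lock, function () {
    echo "processing pending jobs\n";
});
var_dump($ran);
```

The finally block mirrors the guarantee stated for consume(): the lock is released whether or not processing fails.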

Event Handling Logic:

  • Dispatcher must return true to indicate the event was successfully handled
  • Dispatcher must return false to indicate the event was not handled (leaves in pending)
  • Dispatcher throwing an exception indicates a failure (moves to failed)
  • PHP errors are converted to exceptions using custom error handler
  • Lock is always released, even if processing fails
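
Converting PHP errors to exceptions is commonly done with set_error_handler() and ErrorException. The sketch below shows that general technique under the assumption that consume() does something similar; callWithErrorsAsExceptions() is a hypothetical helper.

```php
<?php
// Hypothetical sketch: promote PHP warnings/notices to ErrorException while a
// callable runs, then restore the previous handler.
function callWithErrorsAsExceptions(callable $fn, ...$args)
{
    set_error_handler(function (int $severity, string $message, string $file, int $line): bool {
        if (!(error_reporting() & $severity)) {
            return false; // respect the current error_reporting level
        }
        throw new ErrorException($message, 0, $severity, $file, $line);
    });
    try {
        return $fn(...$args);
    } finally {
        restore_error_handler();
    }
}

try {
    callWithErrorsAsExceptions(function () {
        trigger_error('disk almost full', E_USER_WARNING);
        return true; // never reached: the warning becomes an exception
    });
} catch (ErrorException $e) {
    echo 'Caught: ', $e->getMessage(), "\n"; // Caught: disk almost full
}
```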
Parameters
$dispatcher : callable

A function to call for each event. Signature: function(string $event, array $args): bool. Must return true if the event was handled and false if it was not. Throw an exception to mark the job as failed.

Tags
throws
RuntimeException

If MessageQueue not configured.

example
// Basic event processing with match expression
MessageQueue::consume(function($event, $args) {
    // match selects a handler closure; null means "not handled"
    $handler = match($event) {
        'user.registered' => fn() => UserService::sendWelcome($args['user_id']),
        'order.completed' => fn() => EmailService::sendReceipt($args['order_id']),
        'email.send' => fn() => MailService::send($args['to'], $args['subject'], $args['body']),
        default => null,
    };
    if ($handler === null) {
        return false; // Not handled - leave in pending
    }
    $handler();
    return true; // Successfully handled
});
example
// Event processing with explicit handling logic
MessageQueue::consume(function($event, $args) {
    switch($event) {
        case 'user.registered':
            UserService::sendWelcome($args['user_id']);
            return true; // Successfully handled

        case 'email.send':
            if (!isset($args['to'], $args['subject'], $args['body'])) {
                throw new InvalidArgumentException('Missing email parameters');
            }
            MailService::send($args['to'], $args['subject'], $args['body']);
            return true; // Successfully handled

        default:
            return false; // Not handled - will remain in pending
    }
});
example
// Processing with error handling and conditional logic
MessageQueue::consume(function($event, $args) {
    try {
        $handled = EventHandler::dispatch($event, $args);
        if ($handled) {
            Logger::info("Processed event: $event");
            return true;
        } else {
            Logger::debug("Event not handled by this processor: $event");
            return false; // Let another processor handle it
        }
    } catch (Exception $e) {
        Logger::error("Event failed: $event - " . $e->getMessage());
        throw $e; // Re-throw to mark job as failed
    }
});
see
MessageQueue::configure()

Must be called before this method

see
MessageQueue::publish()

Creates jobs processed by this method

see
MessageQueue::count()

Check how many jobs will be processed

see
MessageQueue::keepCompleted()

Controls completed job file handling

see
MessageQueue::failedDir()

Where failed jobs are moved

see
MessageQueue::completedDir()

Where successful jobs are moved

count()

Get the number of messages currently in the queue.

public static count() : int

Counts all .json files in the pending directory, which represents the number of messages waiting to be processed. This is useful for monitoring queue depth and determining if processing is keeping up with message publishing.

The count includes only valid job files (.json extension) and excludes any lock files or other non-job files that might exist in the directory.
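
A count of this kind can be approximated with glob(). The sketch below is an assumption about the approach, not the actual implementation; countJobs() and the file names are hypothetical.

```php
<?php
// Hypothetical sketch: count only *.json files, ignoring lock files and others.
function countJobs(string $pendingDir): int
{
    $files = glob(rtrim($pendingDir, '/') . '/*.json');
    return $files === false ? 0 : count($files);
}

$dir = sys_get_temp_dir() . '/mq_count_' . getmypid();
@mkdir($dir, 0777, true);
file_put_contents("$dir/1700000000.500000_user.registered.json", '{}');
file_put_contents("$dir/queue.lock", ''); // not a job file, so not counted
echo countJobs($dir), "\n"; // prints 1
```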

Tags
throws
RuntimeException

If MessageQueue not configured.

example
// Check if queue is empty
if (MessageQueue::count() === 0) {
    echo "No messages in queue\n";
}
example
// Monitor queue depth
$count = MessageQueue::count();
echo "Messages in queue: $count\n";
if ($count > 100) {
    echo "Queue is getting backed up!\n";
}
example
// Wait for queue to be processed
while (MessageQueue::count() > 0) {
    sleep(1);
    echo "Waiting for " . MessageQueue::count() . " messages to be processed...\n";
}
see
MessageQueue::configure()

Must be called before this method

see
MessageQueue::publish()

Increases this count

see
MessageQueue::consume()

Decreases this count

see
MessageQueue::pendingDir()

Directory where pending messages are stored

Return values
int

Number of .json job files in the pending directory.

failedDir()

Get or set the directory used for failed jobs.

public static failedDir([string|null $path = null ]) : string

If a path is provided, updates the directory and returns the previous value. Ensures the new directory exists and is writable. This is where job files are moved when they fail during processing, allowing for later inspection, debugging, or manual reprocessing.

Failed jobs retain their original filename when moved, making it easy to identify when and what type of job failed. The failed directory serves as an important debugging and monitoring tool.
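
Because failed jobs keep their filename, manual reprocessing can be as simple as moving a file back to the pending directory. MessageQueue does not document a helper for this; requeueFailedJob() below is a hypothetical sketch.

```php
<?php
// Hypothetical helper for manual reprocessing: moves one job file from the
// failed directory back to pending, keeping its original filename.
function requeueFailedJob(string $failedDir, string $pendingDir, string $filename): void
{
    $name = basename($filename); // strip any directory parts
    $from = rtrim($failedDir, '/') . '/' . $name;
    $to   = rtrim($pendingDir, '/') . '/' . $name;
    if (!is_file($from)) {
        throw new RuntimeException("No such failed job: $name");
    }
    if (!rename($from, $to)) {
        throw new RuntimeException("Could not move $name back to pending");
    }
}

$base = sys_get_temp_dir() . '/mq_requeue_' . getmypid();
@mkdir("$base/failed", 0777, true);
@mkdir("$base/pending", 0777, true);
file_put_contents("$base/failed/1700000000.500000_email.send.json", '{}');
requeueFailedJob("$base/failed", "$base/pending", '1700000000.500000_email.send.json');
var_dump(is_file("$base/pending/1700000000.500000_email.send.json")); // bool(true)
```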

Parameters
$path : string|null = null

Optional new absolute path to set. If null, returns current path without changing it.

Tags
throws
RuntimeException

If MessageQueue not configured or path not writable.

example
// Get current failed directory
$current = MessageQueue::failedDir();
example
// Set new failed directory
$old = MessageQueue::failedDir('/var/queue/failed');
see
MessageQueue::configure()

Recommended method for initial setup

see
MessageQueue::consume()

Moves failed jobs to this directory

see
MessageQueue::ensureDirectories()

Creates directory if needed

Return values
string

The previous failed directory path, or the current path if $path is null.

isConfigured()

Check if the MessageQueue has been properly configured.

public static isConfigured() : bool

Returns true if configure() has been called successfully, false otherwise. This is useful for validation in applications to ensure the queue system is ready before attempting to use it.

Tags
example
// Validate configuration before publishing messages
if (!MessageQueue::isConfigured()) {
    throw new RuntimeException('MessageQueue must be configured before use');
}
MessageQueue::publish('test.event', []);
see
MessageQueue::configure()

Method that sets up the message queue

Return values
bool

True if configured, false if configure() has not been called.

keepCompleted()

Get or set whether completed job files should be retained after processing.

public static keepCompleted(bool|null $keepCompleted) : bool

By default, completed jobs are moved to the completed directory for auditing. Setting this to false will delete completed job files immediately after successful processing, saving disk space but losing the audit trail.

This setting affects the behavior of the consume() method: when keepCompleted is true, successful jobs are moved to the completed directory; when false, they are deleted immediately.
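
The effect of the setting on a single handled job can be sketched as follows. finishJob() is a hypothetical helper that uses rename() and unlink(); the real implementation may differ.

```php
<?php
// Hypothetical sketch of what "keep or delete" means for a handled job file.
function finishJob(string $jobFile, string $completedDir, bool $keepCompleted): void
{
    if ($keepCompleted) {
        // Archive the job under its original filename
        $target = rtrim($completedDir, '/') . '/' . basename($jobFile);
        if (!rename($jobFile, $target)) {
            throw new RuntimeException("Could not archive job: $jobFile");
        }
    } elseif (!unlink($jobFile)) {
        throw new RuntimeException("Could not delete job: $jobFile");
    }
}

$base = sys_get_temp_dir() . '/mq_keep_' . getmypid();
@mkdir("$base/pending", 0777, true);
@mkdir("$base/completed", 0777, true);
file_put_contents("$base/pending/1_demo.json", '{}');
finishJob("$base/pending/1_demo.json", "$base/completed", true);
var_dump(is_file("$base/completed/1_demo.json")); // bool(true)
```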

Parameters
$keepCompleted : bool|null

New setting: true to keep completed files, false to delete them, null to just get current value

Tags
example
// Get current setting
$current = MessageQueue::keepCompleted(null);
example
// Enable keeping completed files for audit trail
$old = MessageQueue::keepCompleted(true);
example
// Disable to save disk space
MessageQueue::keepCompleted(false);
see
MessageQueue::consume()

Method affected by this setting

see
MessageQueue::completedDir()

For setting the completed files directory

Return values
bool

The previous value of the keepCompleted setting, or the current value if null is passed.

pendingDir()

Get or set the directory used for pending jobs.

public static pendingDir([string|null $path = null ]) : string

If a path is provided, updates the directory and returns the previous value. Ensures the new directory exists and is writable. This is where new job files are created when using publish() and where the consumer looks for jobs to execute.

Note: This method allows fine-grained control over individual directories, but using configure() is the recommended approach for initial setup.

The pending directory should be writable by the web server process and any CLI scripts that will process the queue. Directory permissions are set to 0777 (subject to umask) when created automatically.

Parameters
$path : string|null = null

Optional new absolute path to set. If null, returns current path without changing it.

Tags
throws
RuntimeException

If MessageQueue not configured or path not writable.

example
// Get current pending directory
$current = MessageQueue::pendingDir();
example
// Set new pending directory
$old = MessageQueue::pendingDir('/var/queue/pending');
see
MessageQueue::configure()

Recommended method for initial setup

see
MessageQueue::publish()

Creates files in this directory

see
MessageQueue::consume()

Reads files from this directory

see
MessageQueue::ensureDirectories()

Creates directory if needed

Return values
string

The previous pending directory path, or the current path if $path is null.

publish()

Publish a new event message to the queue for later processing.

public static publish(string $event, array<string|int, mixed> $namedArgs) : void

Creates a JSON file in the pending directory containing the event name, arguments, and timestamp. The filename uses microtime for ordering and includes the event name for easy identification.

Job files are created with the format: {microtime}_{event}.json

Content structure: {"event": "string", "args": {...}, "timestamp": 123456789}

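Because each call derives its filename from microtime, concurrent publishers normally write to distinct files; two calls for the same event that obtain the same microtime value could still collide. The created file will be picked up by the next consume() call.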
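
One way to build a {microtime}_{event}.json filename that sorts chronologically is shown below. The exact formatting MessageQueue uses is not documented here, so the zero-padded timestamp, the character filtering, and the jobFilename() helper are all assumptions.

```php
<?php
// Hypothetical sketch of a {microtime}_{event}.json filename.
function jobFilename(string $event, ?float $now = null): string
{
    $now = $now ?? microtime(true);
    // Fixed-width, zero-padded seconds keep lexical order equal to time order
    $stamp = sprintf('%017.6F', $now);
    // Keep only filename-safe characters from the event name
    $safe = preg_replace('/[^A-Za-z0-9._-]/', '_', $event);
    return "{$stamp}_{$safe}.json";
}

echo jobFilename('user.registered', 1700000000.5), "\n";
// prints 1700000000.500000_user.registered.json
```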

Parameters
$event : string

The event name or type (e.g., 'user.registered', 'email.send'). Used for routing in the dispatcher and included in filename.

$namedArgs : array<string|int, mixed>

An associative array of arguments to be passed to the dispatcher. Can contain any JSON-serializable data needed by the event handler.

Tags
throws
RuntimeException

If MessageQueue not configured.

example
// Publish a user registration event
MessageQueue::publish('user.registered', [
    'user_id' => 123,
    'email' => 'user@example.com',
    'name' => 'John Doe'
]);
example
// Publish an email notification
MessageQueue::publish('email.send', [
    'to' => 'admin@example.com',
    'subject' => 'New Order',
    'template' => 'order_notification',
    'data' => ['order_id' => 456]
]);
see
MessageQueue::configure()

Must be called before this method

see
MessageQueue::consume()

Processes queued messages

see
MessageQueue::count()

Check number of queued messages

see
MessageQueue::ensureDirectories()

Ensures pending directory exists

ensureDirectories()

Ensure that all necessary directories exist and are properly set up.

protected static ensureDirectories() : void

Creates the pending, failed, and completed directories if they do not exist. Directories are created recursively with 0777 permissions, making them writable by all users (subject to umask). This method is called automatically by other methods that need to ensure directory existence.

This is an internal method that handles the infrastructure setup required for the MessageQueue to function properly. It's safe to call multiple times as it only creates directories that don't already exist.

Tags
throws
RuntimeException

If any directory cannot be created.

see
MessageQueue::configure()

Calls this during initial setup

see
MessageQueue::pendingDir()

Calls this when setting new path

see
MessageQueue::failedDir()

Calls this when setting new path

see
MessageQueue::completedDir()

Calls this when setting new path

see
MessageQueue::publish()

Calls this before creating job files

see
MessageQueue::consume()

Calls this before processing jobs

requireConfiguration()

Validate that the MessageQueue has been configured before use.

private static requireConfiguration() : void

Internal method called by the public methods that depend on configuration (such as publish(), consume(), and count()) to ensure the message queue is properly set up before attempting any operations. Throws a clear exception if configuration is missing.

Tags
throws
RuntimeException

If configure() has not been called.


        