MessageQueue
Class MessageQueue
A simple event queue system for handling asynchronous, file-based message passing between processes or components. Events are stored as JSON files and dispatched using a user-supplied dispatcher callable. Failed and completed jobs are routed to respective directories.
The queue works as follows:
- Events are queued as JSON files in the pending directory
- A processor reads and dispatches events using a user-provided callable
- Successfully handled jobs are moved to completed directory (or deleted)
- Unhandled jobs remain in pending directory for future processing
- Failed jobs are moved to failed directory for later inspection
Event Processing States:
- Handled (dispatcher returns true): Job moved to completed directory or deleted
- Not Handled (dispatcher returns false): Job remains in pending for retry/other processors
- Failed (dispatcher throws exception): Job moved to failed directory for inspection
Configuration: The MessageQueue must be configured with a base directory before use via the configure() method. This base directory will contain three subdirectories: pending, failed, and completed. All paths are explicitly set and no application constants are required.
File Structure:
- Base directory: /path/to/messagequeue/
- Pending jobs: {base}/pending/{timestamp}_{event}.json
- Failed jobs: {base}/failed/{timestamp}_{event}.json
- Completed jobs: {base}/completed/{timestamp}_{event}.json
- Job format: {"event": "string", "args": {...}, "timestamp": 123456789}
- Uses file locking to prevent concurrent processing
Dependencies:
- Requires explicit configuration via configure() method before use
- Creates directories automatically with 0777 permissions (subject to umask)
- Base directory must be writable by the process
- Dispatcher callable must return true/false to indicate handling status
Table of Contents
Properties
- $completedDir : string|null
- $configured : bool
- $failedDir : string|null
- $keepCompleted : bool
- $pendingDir : string|null
Methods
- completedDir() : string
- Get or set the directory used for completed jobs.
- configure() : void
- Configure the MessageQueue with a base directory for queue operations.
- consume() : void
- Consume all pending messages by dispatching them using the provided callable.
- count() : int
- Get the number of messages currently in the queue.
- failedDir() : string
- Get or set the directory used for failed jobs.
- isConfigured() : bool
- Check if the MessageQueue has been properly configured.
- keepCompleted() : bool
- Get or set whether completed job files should be retained after processing.
- pendingDir() : string
- Get or set the directory used for pending jobs.
- publish() : void
- Publish a new event message to the queue for later processing.
- ensureDirectories() : void
- Ensure that all necessary directories exist and are properly set up.
- requireConfiguration() : void
- Validate that the MessageQueue has been configured before use.
Properties
$completedDir
protected static string|null $completedDir = null
Directory for completed job files
$configured
protected static bool $configured = false
Whether the message queue has been configured
$failedDir
protected static string|null $failedDir = null
Directory for failed job files
$keepCompleted
protected static bool $keepCompleted = true
Whether to keep completed job files or delete them after processing
$pendingDir
protected static string|null $pendingDir = null
Directory for pending job files
Methods
completedDir()
Get or set the directory used for completed jobs.
public static completedDir([string|null $path = null]) : string
If a path is provided, updates the directory and returns the previous value. Ensures the new directory exists and is writable. This is where successfully processed job files are moved (if keepCompleted is true), providing an audit trail of all completed work.
The completed directory is useful for debugging, auditing, and understanding system activity. Files here can be safely deleted periodically to manage disk space, or archived for long-term record keeping.
Parameters
- $path : string|null = null
  Optional new absolute path to set. If null, returns the current path without changing it.
Return values
string: The previous completed directory path.
configure()
Configure the MessageQueue with a base directory for queue operations.
public static configure(string $basePath) : void
This method must be called before using any other MessageQueue methods. It sets up the directory structure needed for queue operations by creating three subdirectories under the provided base path: pending, failed, and completed.
Directory Structure Created:
- {basePath}/pending - Where new jobs are queued
- {basePath}/failed - Where failed jobs are moved for inspection
- {basePath}/completed - Where successful jobs are archived (if keepCompleted is true)
The base directory and all subdirectories are created automatically if they don't exist, with 0777 permissions (subject to umask). The base path must be writable by the process that will be creating and processing jobs.
Configuration is idempotent - calling this method multiple times with the same path is safe and will not cause issues. However, changing the path after jobs have been queued may result in those jobs being "lost" until the original path is restored.
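The directory setup described above can be illustrated with a minimal sketch. This is not the library's implementation: the function name sketchConfigure() is hypothetical, and it only assumes that the three subdirectories are created with a recursive mkdir().

```php
<?php
// Hypothetical stand-in for the directory setup described above.
// It is NOT the actual MessageQueue::configure() implementation.
function sketchConfigure(string $basePath): array
{
    $dirs = [];
    foreach (['pending', 'failed', 'completed'] as $name) {
        $dir = rtrim($basePath, '/') . '/' . $name;
        // The recursive flag also creates the base directory if needed.
        // The requested 0777 mode is filtered by the process umask.
        if (!is_dir($dir) && !mkdir($dir, 0777, true) && !is_dir($dir)) {
            throw new RuntimeException("Unable to create directory: $dir");
        }
        $dirs[$name] = $dir;
    }
    return $dirs;
}

// Calling the function twice with the same path is harmless.
$base = sys_get_temp_dir() . '/mq_sketch_' . uniqid();
$first = sketchConfigure($base);
$second = sketchConfigure($base);
```

The second is_dir() check after a failed mkdir() covers the case where another process created the directory in the meantime.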
Parameters
- $basePath : string
-
Absolute path to the base directory for queue operations. Must be writable by the process. Subdirectories will be created here.
consume()
Consume all pending messages by dispatching them using the provided callable.
public static consume(callable $dispatcher) : void
Iterates through all JSON files in the pending directory and calls the provided dispatcher function for each valid job. Uses a lock file to prevent multiple processes from running simultaneously, ensuring jobs are not processed twice.
Message Processing Flow:
- Acquires exclusive lock to prevent concurrent processing
- Reads each .json file in pending directory
- Calls dispatcher with event name and arguments
- If dispatcher returns true: moves to completed directory or deletes (based on keepCompleted setting)
- If dispatcher returns false: leaves job in pending directory for future processing
- If dispatcher throws exception: moves to failed directory for inspection
- Releases lock and cleans up lock file
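The locking steps in this flow could look roughly like the sketch below. It assumes an advisory flock() lock on a dedicated lock file; the source only states that a lock file is used, so the mechanism and the helper name withQueueLock() are assumptions.

```php
<?php
// Hypothetical helper: runs $work only if an exclusive lock can be taken.
// Assumes flock(); the real consume() may lock differently.
function withQueueLock(string $lockFile, callable $work): bool
{
    // Mode 'c' creates the file if missing and never truncates it.
    $handle = fopen($lockFile, 'c');
    if ($handle === false) {
        throw new RuntimeException("Unable to open lock file: $lockFile");
    }
    // LOCK_NB returns immediately instead of waiting for the lock.
    if (!flock($handle, LOCK_EX | LOCK_NB)) {
        fclose($handle);
        return false; // another consumer already holds the lock
    }
    try {
        $work();
        return true;
    } finally {
        // Runs even when $work() throws, so the lock is always released.
        flock($handle, LOCK_UN);
        fclose($handle);
        @unlink($lockFile);
    }
}

$lockFile = sys_get_temp_dir() . '/mq_sketch_' . uniqid() . '.lock';
$ran = false;
$acquired = withQueueLock($lockFile, function () use (&$ran) {
    $ran = true;
});
```

The try/finally block is what guarantees the release step even when processing fails.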
Event Handling Logic:
- Dispatcher must return true to indicate the event was successfully handled
- Dispatcher must return false to indicate the event was not handled (leaves in pending)
- Dispatcher throwing an exception indicates a failure (moves to failed)
- PHP errors are converted to exceptions using custom error handler
- Lock is always released, even if processing fails
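The three outcomes can be sketched as a small classifier. The helper name classifyDispatch() is hypothetical, and treating only a strict true as "handled" is an assumption; the conversion of PHP errors to ErrorException mirrors the behavior described above without claiming to match the library's handler.

```php
<?php
// Hypothetical helper: maps one dispatch attempt to its target state.
// 'completed' stands for "moved to completed or deleted".
function classifyDispatch(callable $dispatcher, string $event, array $args): string
{
    // Turn PHP warnings and notices into exceptions for this call only.
    set_error_handler(function (int $severity, string $message, string $file, int $line) {
        throw new ErrorException($message, 0, $severity, $file, $line);
    });
    try {
        return $dispatcher($event, $args) === true ? 'completed' : 'pending';
    } catch (Throwable $e) {
        return 'failed';
    } finally {
        restore_error_handler();
    }
}

$handled = classifyDispatch(fn (string $e, array $a): bool => true, 'user.registered', []);
$unhandled = classifyDispatch(fn (string $e, array $a): bool => false, 'user.registered', []);
$failed = classifyDispatch(function (string $e, array $a): bool {
    throw new RuntimeException('boom');
}, 'user.registered', []);
$warning = classifyDispatch(function (string $e, array $a): bool {
    trigger_error('disk almost full', E_USER_WARNING);
    return true;
}, 'user.registered', []);
```

Note that in this sketch a warning raised inside the dispatcher counts as a failure even though the dispatcher would otherwise have returned true.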
Parameters
- $dispatcher : callable
  A function to call for each event. Signature: function(string $event, array $args): bool. Must return true if the event was handled and false if it was not. Throw an exception to mark the job as failed.
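A dispatcher matching this signature might look like the following sketch. The event names come from the publish() documentation; the handler logic and the $sent array are purely illustrative.

```php
<?php
// Illustrative dispatcher: handles one event type and ignores the rest.
$sent = [];
$dispatcher = function (string $event, array $args) use (&$sent): bool {
    switch ($event) {
        case 'email.send':
            if (!isset($args['to'])) {
                // Throwing marks the job as failed.
                throw new InvalidArgumentException('Missing "to" argument');
            }
            $sent[] = $args['to']; // stand-in for real mail delivery
            return true;           // handled
        default:
            return false;          // not handled, job stays pending
    }
};

// With a configured queue this callable would be passed to consume().
// Here it is called directly to show the return values.
$handled = $dispatcher('email.send', ['to' => 'user@example.com']);
$ignored = $dispatcher('user.registered', ['id' => 42]);
```

Returning false for unknown events lets several consumers share one pending directory, each handling only the events it understands.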
count()
Get the number of messages currently in the queue.
public static count() : int
Counts all .json files in the pending directory, which represents the number of messages waiting to be processed. This is useful for monitoring queue depth and determining if processing is keeping up with message publishing.
The count includes only valid job files (.json extension) and excludes any lock files or other non-job files that might exist in the directory.
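A minimal sketch of this counting rule, assuming a glob() on the .json extension (the actual implementation may differ; countPendingJobs() and the file names are hypothetical):

```php
<?php
// Hypothetical helper: counts only *.json files in a directory.
function countPendingJobs(string $pendingDir): int
{
    $files = glob(rtrim($pendingDir, '/') . '/*.json');
    return $files === false ? 0 : count($files);
}

$dir = sys_get_temp_dir() . '/mq_sketch_' . uniqid();
mkdir($dir, 0777, true);
file_put_contents($dir . '/1700000000.123400_email.send.json', '{}');
file_put_contents($dir . '/1700000001.567800_user.registered.json', '{}');
file_put_contents($dir . '/queue.lock', ''); // ignored: not a .json file
$pending = countPendingJobs($dir);
```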
Return values
int: Number of .json job files in the pending directory.
failedDir()
Get or set the directory used for failed jobs.
public static failedDir([string|null $path = null]) : string
If a path is provided, updates the directory and returns the previous value. Ensures the new directory exists and is writable. This is where job files are moved when they fail during processing, allowing for later inspection, debugging, or manual reprocessing.
Failed jobs retain their original filename when moved, making it easy to identify when and what type of job failed. The failed directory serves as an important debugging and monitoring tool.
Parameters
- $path : string|null = null
  Optional new absolute path to set. If null, returns the current path without changing it.
Return values
string: The previous failed directory path.
isConfigured()
Check if the MessageQueue has been properly configured.
public static isConfigured() : bool
Returns true if configure() has been called successfully, false otherwise. This is useful for validation in applications to ensure the queue system is ready before attempting to use it.
Return values
bool: True if configured, false if configure() has not been called.
keepCompleted()
Get or set whether completed job files should be retained after processing.
public static keepCompleted(bool|null $keepCompleted) : bool
By default, completed jobs are moved to the completed directory for auditing. Setting this to false will delete completed job files immediately after successful processing, saving disk space but losing the audit trail.
This setting affects the behavior of the consume() method: when keepCompleted is true, successful jobs are moved to the completed directory; when false, they are deleted immediately.
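The get-or-set convention used by this method (and by the directory accessors) can be sketched as follows. SettingSketch is a hypothetical stand-in class, not the real MessageQueue.

```php
<?php
// Hypothetical stand-in showing the "return previous value" convention.
final class SettingSketch
{
    private static bool $keepCompleted = true;

    public static function keepCompleted(?bool $keepCompleted): bool
    {
        $previous = self::$keepCompleted;
        if ($keepCompleted !== null) {
            self::$keepCompleted = $keepCompleted;
        }
        return $previous; // with null this is simply the current value
    }
}

$before = SettingSketch::keepCompleted(false);  // switch to deleting files
$current = SettingSketch::keepCompleted(null);  // read without changing
$restored = SettingSketch::keepCompleted($before); // put the old value back
```

Returning the previous value makes it easy to change a setting temporarily and restore it afterwards, as the last line shows.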
Parameters
- $keepCompleted : bool|null
  New setting: true to keep completed files, false to delete them, null to just get the current value.
Return values
bool: The previous value of the keepCompleted setting.
pendingDir()
Get or set the directory used for pending jobs.
public static pendingDir([string|null $path = null]) : string
If a path is provided, updates the directory and returns the previous value. Ensures the new directory exists and is writable. This is where new job files are created when using publish() and where the consumer looks for jobs to execute.
Note: This method allows fine-grained control over individual directories, but using configure() is the recommended approach for initial setup.
The pending directory should be writable by the web server process and any CLI scripts that will process the queue. Directory permissions are set to 0777 when created automatically.
Parameters
- $path : string|null = null
  Optional new absolute path to set. If null, returns the current path without changing it.
Return values
string: The previous pending directory path.
publish()
Publish a new event message to the queue for later processing.
public static publish(string $event, array<string|int, mixed> $namedArgs) : void
Creates a JSON file in the pending directory containing the event name, arguments, and timestamp. The filename uses microtime for ordering and includes the event name for easy identification.
Job files are created with the format: {microtime}_{event}.json
Content structure: {"event": "string", "args": {...}, "timestamp": 123456789}
This method is safe to call from concurrent processes as long as the microtime-based filenames do not collide, which is unlikely but not impossible for identical event names. The created file will be picked up by the next consume() call.
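The job file described above could be produced by a sketch like the one below. The function sketchPublish() is hypothetical; the six-decimal microtime formatting and the integer timestamp are assumptions, since the source does not specify either.

```php
<?php
// Hypothetical sketch of writing one job file; not the real publish().
function sketchPublish(string $pendingDir, string $event, array $namedArgs): string
{
    $timestamp = microtime(true);
    // Assumed filename layout: {microtime}_{event}.json
    $path = sprintf('%s/%.6F_%s.json', rtrim($pendingDir, '/'), $timestamp, $event);
    $json = json_encode(
        ['event' => $event, 'args' => $namedArgs, 'timestamp' => (int) $timestamp],
        JSON_THROW_ON_ERROR
    );
    if (file_put_contents($path, $json) === false) {
        throw new RuntimeException("Unable to write job file: $path");
    }
    return $path;
}

$dir = sys_get_temp_dir() . '/mq_sketch_' . uniqid();
mkdir($dir, 0777, true);
$path = sketchPublish($dir, 'email.send', ['to' => 'user@example.com']);
$job = json_decode(file_get_contents($path), true);
```

Because the event name becomes part of the filename, this sketch only works for event names without path separators.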
Parameters
- $event : string
  The event name or type (e.g., 'user.registered', 'email.send'). Used for routing in the dispatcher and included in the filename.
- $namedArgs : array<string|int, mixed>
  An associative array of arguments to be passed to the dispatcher. Can contain any JSON-serializable data needed by the event handler.
ensureDirectories()
Ensure that all necessary directories exist and are properly set up.
protected static ensureDirectories() : void
Creates the pending, failed, and completed directories if they do not exist. Directories are created recursively with 0777 permissions, making them writable by all users (subject to umask). This method is called automatically by other methods that need to ensure directory existence.
This is an internal method that handles the infrastructure setup required for the MessageQueue to function properly. It's safe to call multiple times as it only creates directories that don't already exist.
requireConfiguration()
Validate that the MessageQueue has been configured before use.
private static requireConfiguration() : void
Internal method called by the public methods that depend on configuration, to ensure the message queue is properly set up before any operation is attempted. configure() and isConfigured() necessarily run without this check. Throws a clear exception if configuration is missing.