How We Built an Email Queue that Can Send 100 Emails Per Second Through Amazon SES

#

It’s no secret that I’ve been plugging away at WP Offload SES, a new plugin that we’re hoping to launch soon that will make it easier to send your site emails over Amazon SES.

While I’ve mentioned it before on this blog, one thing that I haven’t mentioned is the performance you can expect to get out of it. In this week’s article I decided to step back for a second and review the queueing system we put in place, and how we managed to squeeze every last ounce of performance out of it.

Planning Things Out

Right off the bat we knew that we needed to implement an email queue, so that instead of sending emails right away they are queued up and processed in batches. SMTP servers will usually have a built-in queue, but since we’re using the Amazon SES API instead of SMTP, we need to roll out our own solution.

Without a queue we’d run into situations where the site is trying to send out hundreds, if not thousands of emails at once, and this would inevitably lead to a crash due to a lack of memory or time to process the request. There’s also the Amazon SES rate limit to consider, which isn’t set in stone and varies from account to account.

Once we realized that we’d need to build a queue, my mind went straight to WP Queue, a library for WordPress plugins built by our own Ashley Rich. We’d used it before in a few other plugins, like the WP Migrate DB Pro Theme + Plugin Files addon, and the Image Processing Queue plugin.

Why WP Queue?

It does a great job of letting you register a job, and adding multiple jobs to the queue which are then later run via wp-cron.

I did look into a few other queue systems, and even looked into Amazon Simple Queue Service. However I decided against this due to the additional price and complexity of having another service to rely on. Also, if we ever needed to make tweaks or changes to the queue system, it’s much easier to do that with WP Queue than it is for Amazon SQS.

AWS CommandPool

One slight drawback of using WP Queue for our needs is that it focuses on completing one job at a time, on a first-come, first-serve basis. This is simple and it works, but it could be slow for users that intend on sending thousands of emails per day.

Since the average API request to Amazon SES can take about 0.4 seconds, we can only expect to be sending a few emails per second. To get around this, AWS recommends using multiple threads to send multiple emails at the same time, instead of sending one and waiting for the response before sending another one.

Luckily, Amazon built a way to do just that into the PHP SDK which WP Offload SES is using. The CommandPool object can be used to manually add a number of commands that will be sent to the API, and will batch them up intelligently.

Visualizing the Flow

With the queue system decided on, it’s much easier to visualize the flow of data in the plugin for sending emails:

WP Offload SES email queue diagram

Implementation

Now for the fun part – actually implementing the system!

Override wp_mail()

The first step was to override the wp_mail() function which is a WordPress pluggable function. This lets us listen for all emails sent through WordPress (and compliant themes and plugins) and use our own functionality instead of sending the email right away.

In WP Offload SES, this is pretty straightforward – we just add the following code to the bottom of the main plugin file:

if ( ! function_exists( 'wp_mail' ) ) {
    $settings = get_site_option( 'wposes_settings' );
    if ( isset( $settings['send-via-ses'] ) && (bool) $settings['send-via-ses'] ) {
        function wp_mail( $to, $subject, $message, $headers = '', $attachments = array() ) {
            global $wp_offload_ses;
            $wp_offload_ses->mail_handler( $to, $subject, $message, $headers, $attachments );
        }
    }
}

The above checks if the wp_mail() function has been overridden already, and if it hasn’t, it creates our own wp_mail() function if the WP Offload SES plugin is configured to send mail.

It accepts all the same data as the original wp_mail() function, and passes it to our custom mail handler.

Add to Queue

Next we need to add the email to the queue. The mail_hander() method called in the wp_mail() function above combines all of the attributes passed to wp_mail() and pushes a new Email Job to the queue:

public function mail_handler( $to, $subject, $message, $headers, $attachments ) {
    $atts     = apply_filters( 'wp_mail', compact( 'to', 'subject', 'message', 'headers', 'attachments' ) );
    $settings = $this->settings->get_settings();

    $this->get_queue()->push( new Email_Job( $atts, $settings ), $this->delay );

    return true;
}

This saves the email job to the database to be processed later. Speaking of the email job, here’s what that looks like:

class Email_Job extends Job {

    /**
     * @var array
     */
    private $atts;

    /**
     * @var array
     */
    private $settings;

    /**
     * Pass any necessary data to the job.
     * 
     * @param string $atts
     */
    public function __construct( $atts, $settings ) {
        $this->atts     = $atts;
        $this->settings = $settings;
    }

    /**
     * Handle the job logic.
     */
    public function handle() {
        global $wp_offload_ses;

        if ( isset( $this->atts['to'] ) ) {
            $to = $this->atts['to'];
        }

        if ( isset( $this->atts['subject'] ) ) {
            $subject = $this->atts['subject'];
        }

        if ( isset( $this->atts['message'] ) ) {
            $message = $this->atts['message'];
        }

        if ( isset( $this->atts['headers'] ) ) {
            $headers = $this->atts['headers'];
        }

        if ( isset( $this->atts['attachments'] ) ) {
            $attachments = $this->atts['attachments'];
        }

        $client  = $wp_offload_ses->get_ses_api()->get_client();    
        $email   = new Email( $to, $subject, $message, $headers, $attachments, $this->settings );
        $raw     = $email->prepare();
        $data    = array(
            'x-message-id' => $this->id(),
            'RawMessage'   => array(
                'Data' => $raw,
            ),
        );

        return $client->getCommand( 'SendRawEmail', $data );
    }

}

When the queue is processing jobs, it will restore the Email_Job class from the database with the attributes passed to it in the constructor, and then call the process() method to execute the job.

In this case, it gets an email ready to send and returns the command to actually send it – this is used later in the CommandPool.

Send via CommandPool

Luckily the AWS PHP SDK includes the classes that are needed to get started using the CommandPool without much further tweaking. I created a class that handles all of the relevant logic that has three major methods. First, there’s the add_command() function, which adds a command to the pool:

public function add_command( $command ) {
    $this->commands[] = $command;
    $num_commands     = count( $this->commands );

    if ( $this->get_concurrency() === $num_commands || $this->connection->jobs() === $num_commands ) {
        $this->execute();
        $this->commands = array();
    }
}

Then, when a command limit has been reached, the execute() function is called:

private function execute() {
    global $wp_offload_ses;

    // Initiate the CommandPool
    $client       = $wp_offload_ses->get_ses_api()->get_client();
    $command_pool = new CommandPool( $client, $this->commands, [
        'concurrency' => $this->get_concurrency(),
        'fulfilled' => function (
            ResultInterface $result,
            $iterKey,
            PromiseInterface $aggregatePromise
        ) {
            $id  = $this->commands[$iterKey]['x-message-id'];
            $job = $this->connection->get_job( $id );
            $this->connection->delete( $job );
        },
        'rejected' => function (
            SesException $reason,
            $iterKey,
            PromiseInterface $aggregatePromise
        ) {
            $id  = $this->commands[ $iterKey ]['x-message-id']; 
            $job = $this->connection->get_job( $id );

            $job->release();

            if ( $job->attempts() >= $this->attempts ) {
                $job->fail();
            }

            if ( $job->failed() ) {
                $this->connection->failure( $job, $reason );
            } else {
                $this->connection->release( $job );
            }
        }
    ]);

    // Send the emails in the pool
    $promise = $command_pool->promise();
    $promise->wait();
}

This function initiates the AWS CommandPool class and sends off the requests asynchronously. The neat thing about this is that you can also pass in anonymous functions to handle the responses to the API requests, so every request is handled as soon as it completes instead of waiting for all of the requests to be sent. The fulfilled anonymous function handles requests that have completed successfully and deletes them from the queue, while the rejected function handles failed requests and releases the job so that it can be attempted again. If a request fails more than a certain amount of times (in this case 3), it is marked as failed and won’t be automatically retried again.

You may have noticed that the CommandPool concurrency is set by another function, get_concurrency(). This function retrieves the maximum send rate from SES and passes that to the CommandPool object, ensuring that it is always running at peak capacity without going over the Amazon SES rate limit:

private function get_concurrency() {
    global $wp_offload_ses;

    if ( ! is_null( $this->concurrency ) ) {
        return $this->concurrency;
    }

    $quota        = $wp_offload_ses->get_ses_api()->get_send_quota();
    $send_rate = 10;

    if ( ! is_wp_error( $quota ) ) {
        $send_rate = $quota['rate'];
    }

    $this->concurrency = (int) apply_filters( 'wposes_max_concurrency', $send_rate );

    return $this->concurrency;
}

The queue will continue iterating over the the available email jobs until either there are no more jobs, or the PHP time limit is reached. Once that happens, the queue stops until the next run of wp_cron, and then continues processing the requests.

Benchmarking

With all the code buttoned up, it’s time to see if everything actually works. Since it’s tough to benchmark asynchronous requests to a remote API, I hacked together a quick script that would queue up a few hundred emails, and modified the CommandPool fulfilled function to log the time that each email was sent.

WP Offload SES queue performance

After a few benchmarks, it became apparent that it was easily sending 100 emails per second, and I assume it would send even more if Amazon would approve my constant requests to increase my rate limit.

It's alive!!

Final Thoughts

While it might seem boring to some, I had a lot of fun optimizing the WP Offload SES plugin to send out as many emails as it can. Starting with one email per second and getting that up to 100 emails per second with the addition of WP Queue and the AWS CommandPool was very satisfying, and it makes me confident that WP Offload SES will have no problem handling large amounts of emails.

Have you ever worked with a queueing system before? Is there any code that you optimized the heck out of and you’re proud of now? Let us know in the comments.

About the Author

Matt Shaw

Matt is a WordPress plugin developer located near Philadelphia, PA. He loves to create awesome new tools with PHP, Javascript, and whatever else he happens to get his hands on.