Faces in the Cloud: High-Throughput Data Processing w/ Message Queues

Posted by Motoma, 23 September 2011 - 11:48 AM
As the title of this post alludes, this tutorial will guide you through the process of setting up a grid computing cluster, leveraging a high-performance message queue for task arbitration. This particular example will perform face detection on a number of images using Python, OpenCV, ImageMagick, and beanstalkd. My hope is that by the end of this tutorial you will understand a little bit about message queues, image processing, and grid computing.

The task
Imagine you work for a government three-letter agency. You're tasked with traversing a massive set of images, picking out the ones that contain people's faces, and highlighting each face for future identification.

The process
We will divide this process into three parts, each of which can be further parallelized:
  • Load each image as a job and divvy it out to the computation grid.
  • Process each image, detect any faces in the image, and highlight them.
  • Store the highlighted image in a database.


The set up
This tutorial will have us building a grid of computers (physical, virtual, cloud, or otherwise) to perform various steps of the process. For the purposes of this tutorial, our servers' names will be:

arbiter: the machine that runs the message queue
queuer-0..n: these machines will hold un-analysed images to be processed and give them to the arbiter as jobs to be completed.
filter-0..n: these machines will request jobs from the arbiter, filter out images without faces, highlight faces in the remaining images, and pass them back to the arbiter as completed jobs.
database: the machine that holds the final images, with faces highlighted.

The arbiter
A message queue is an asynchronous form of interprocess communication. Our system will leverage beanstalkd to allow us to pass image processing jobs to an arbitrary number of grid workers.
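As a mental model only (this sketch uses Python's in-process queue.Queue as a stand-in, not beanstalkd itself), the put/reserve/delete pattern our queuers and filters will follow looks like this:

```python
import queue

# Stand-in for the arbiter: an in-process FIFO queue.
# beanstalkd provides the same put/reserve semantics across machines.
new_images = queue.Queue()

# A queuer "puts" jobs onto the queue...
new_images.put(('face1.jpg', b'...image bytes...'))
new_images.put(('face2.jpg', b'...image bytes...'))

# ...and a worker "reserves" them one at a time, in FIFO order
filename, data = new_images.get()
new_images.task_done()  # roughly analogous to job.delete()
```

beanstalkd adds network transport, named tubes, and job time-outs on top of this basic pattern.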

We will start by installing and running beanstalkd on our arbitration server:
motoma@arbiter $ beanstalkd -d -l 0.0.0.0 -p 11300



The queuers
The queuers are simple beasts. Along with Python, they need only the beanstalkc library installed. They must:
  • Connect to the beanstalk server "arbiter"
  • Tell arbiter to create a message queue for new images
  • Read through a collection of images
  • Serialize each image and pass it to the arbiter's new image message queue
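Each job body is just a pickled (filename, data) tuple; a quick round-trip (no beanstalkd required) shows the format the filters will expect on the other end:

```python
import pickle

# Serialize an image job the same way queuer.py does
filename, data = 'face1.jpg', b'\xff\xd8\xff\xe0 fake JPEG bytes'
body = pickle.dumps((filename, data))

# ...and deserialize it the same way filter.py will
recovered_name, recovered_data = pickle.loads(body)
assert (recovered_name, recovered_data) == (filename, data)
```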


#!/usr/bin/env python                                                                                                   

import os
import pickle
import time

import beanstalkc

# Images are stored locally in 'images', and moved to 'original' when 
# they have been passed to the queue
IMAGE_DIRECTORY = 'images'
BACKUP_DIRECTORY = 'original'

# Hostname and port for arbiter
MQHOST = 'arbiter'
MQPORT = 11300

def main():
    # Connect to the arbiter and create a message queue newImages
    beanstalk = beanstalkc.Connection(host=MQHOST, port=MQPORT)
    beanstalk.use('newImages')

    # Iterate over images
    files = os.listdir(IMAGE_DIRECTORY)
    for image in files:
        with open(os.path.join(IMAGE_DIRECTORY, image), 'rb') as data_h:
            data = data_h.read()

            # Serialize the image and pass it to the arbiter
            beanstalk.put(pickle.dumps((image, data)))

        # Backup the image
        os.rename(os.path.join(IMAGE_DIRECTORY, image), os.path.join(BACKUP_DIRECTORY, image))

if __name__ == '__main__':
    main()



The filters
The filters will be the bulk of the workforce. We will be able to scale out as many of these beasts as the job requires with little effort. Their task is the most complicated:
  • Connect to the beanstalk server "arbiter"
  • Tell arbiter to create a message queue for images of people
  • Request jobs from the arbiter's new image queue
  • Detect faces using OpenCV
  • Discard images without faces
  • Highlight faces with ImageMagick
  • Serialize the processed image, and pass it to the arbiter's people message queue
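The highlighting step is mostly string work: OpenCV hands back face rectangles, and we fold them into a single ImageMagick -draw argument. A small sketch, with hypothetical (x, y, width, height) tuples standing in for OpenCV's face objects:

```python
def faces_to_draw(faces):
    """Build an ImageMagick -draw string from (x, y, width, height) boxes."""
    return ''.join('rectangle %d,%d %d,%d ' % (x, y, x + w, y + h)
                   for (x, y, w, h) in faces)

# Two hypothetical face boxes
draw = faces_to_draw([(10, 20, 50, 60), (100, 40, 30, 30)])
# draw is now: 'rectangle 10,20 60,80 rectangle 100,40 130,70 '
```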


#!/usr/bin/env python                                                                                                   

import os
import pickle

import beanstalkc

# Use *'s sparingly!
from opencv.cv import *
from opencv.highgui import *

TEMP_DIRECTORY = '/tmp'

MQHOST = 'arbiter'
MQPORT = 11300

def main():
    # Read from newImages, write to people
    beanstalk = beanstalkc.Connection(host=MQHOST, port=MQPORT)
    beanstalk.use('people')
    beanstalk.watch('newImages')

    while True:
        # Request a job from the arbiter
        job = beanstalk.reserve()

        # Deserialize the image and write to a temp file
        filename, data = pickle.loads(job.body)
        image = open(os.path.join(TEMP_DIRECTORY, filename), 'wb')
        image.write(data)
        image.close()

        # Load the image with OpenCV
        image = cvLoadImage(os.path.join(TEMP_DIRECTORY, filename))
        if image:
            # Convert to grayscale for face detection
            grayscale = cvCreateImage(cvSize(image.width, image.height), 8, 1)
            cvCvtColor(image, grayscale, CV_BGR2GRAY)

            storage = cvCreateMemStorage(0)
            cvClearMemStorage(storage)
            cvEqualizeHist(grayscale, grayscale)

            # Pass grayscale image through face detection filter
            cascade = cvLoadHaarClassifierCascade(
                '/usr/share/opencv/haarcascades/haarcascade_frontalface_default.xml',
                cvSize(1,1))
            faces = cvHaarDetectObjects(grayscale, cascade, storage, 1.2, 2,
                                        CV_HAAR_DO_CANNY_PRUNING, cvSize(50,50))

            # Build the ImageMagick draw arguments, one rectangle per face
            draw = ""
            if faces.total > 0:
                for f in faces:
                    draw += 'rectangle %d,%d %d,%d ' % (f.x, f.y, f.x+f.width, f.y+f.height)

                # Draw rectangles over all detected faces in a single pass
                os.system('convert %s -stroke red -fill none -draw "%s" %s' % (
                        os.path.join(TEMP_DIRECTORY, filename), draw,
                        os.path.join(TEMP_DIRECTORY, "h." + filename)))

                # Serialize the highlighted image and pass it to the people queue
                with open(os.path.join(TEMP_DIRECTORY, "h." + filename), 'rb') as data_h:
                    data = data_h.read()
                    beanstalk.put(pickle.dumps((filename, data)))

                # Delete the temporary highlighted image
                os.unlink(os.path.join(TEMP_DIRECTORY, "h." + filename))
            os.unlink(os.path.join(TEMP_DIRECTORY, filename))
        # Tell the arbiter that the job was successfully completed
        job.delete()

if __name__ == '__main__':
    main()



The database
In this example, the database is just a machine holding processed images in a directory. We could easily envision a number of these machines reading images and storing them in an indexed Oracle database, or running them against mugshots of known criminals. In essence, the database will:
  • Connect to the beanstalk server "arbiter"
  • Request jobs from the arbiter's people queue
  • Store the images.


#!/usr/bin/env python                                                                                                   

import os
import pickle

import beanstalkc

OUTPUT_DIRECTORY = 'processed'

MQHOST = 'arbiter'
MQPORT = 11300

def main():
    # Pull images from the people message queue
    beanstalk = beanstalkc.Connection(host=MQHOST, port=MQPORT)
    beanstalk.watch('people')

    while True:
        # Request a processed image
        job = beanstalk.reserve()

        # Deserialize the image
        filename, data = pickle.loads(job.body)

        # Write the image to a file
        image = open(os.path.join(OUTPUT_DIRECTORY, filename), 'wb')
        image.write(data)
        image.close()

        # Remove the image from the people queue
        job.delete()

if __name__ == '__main__':
    main()



Running the system
In order to launch our image processing grid, we first need to start our database:
motoma@database $ python database.py



We then start our filters:
motoma@filter-0 $ python filter.py


motoma@filter-1 $ python filter.py


...
motoma@filter-n $ python filter.py



And finally, we launch our image queuers:
motoma@queuer-0 $ python queuer.py


motoma@queuer-1 $ python queuer.py


...
motoma@queuer-n $ python queuer.py



Checking our database, we should see that images are already piling up:
[Attached images: four originals shown alongside their face-highlighted versions]

Moving on
What we have built is an extremely basic but flexible grid computing architecture for detecting faces. Ideas that could be added to this:
  • Harvest images from Facebook.
  • Perform facial recognition, not just detection.
  • Process video instead of images.
  • Utilize a more robust message queue such as RabbitMQ so you can work with larger jobs.
  • Store data in a real database.
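If you do move to RabbitMQ, it is also worth replacing pickle with a safer, language-neutral serialization such as JSON with base64-encoded image bytes. A sketch using the pika client (the host and queue names are assumptions carried over from this tutorial, and publish_example needs a running broker, so it is defined but not invoked here):

```python
import base64
import json

def encode_job(filename, data):
    """Serialize an image job as JSON with base64-encoded bytes."""
    return json.dumps({'filename': filename,
                       'data': base64.b64encode(data).decode('ascii')})

def decode_job(body):
    """Recover the (filename, data) pair from a JSON job body."""
    job = json.loads(body)
    return job['filename'], base64.b64decode(job['data'])

def publish_example():
    # Requires a running RabbitMQ broker on 'arbiter' (assumption)
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters('arbiter'))
    channel = connection.channel()
    channel.queue_declare(queue='newImages', durable=True)
    channel.basic_publish(exchange='', routing_key='newImages',
                          body=encode_job('face1.jpg', b'...image bytes...'))
    connection.close()
```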

This post has been edited by Motoma: 10 November 2011 - 09:33 AM

