rebaseline_server: multithreaded loading/diffing of images
BUG=skia:2414 NOTRY=True R=rmistry@google.com Author: epoger@google.com Review URL: https://codereview.chromium.org/235923002 git-svn-id: http://skia.googlecode.com/svn/trunk@14184 2bbb7eff-a529-9590-31e7-b0007b416f81
This commit is contained in:
parent
4431e7757c
commit
280ea8296f
@ -89,6 +89,8 @@ class ExpectationComparisons(results.BaseComparisons):
|
||||
self._expected_root = expected_root
|
||||
self._load_actual_and_expected()
|
||||
self._timestamp = int(time.time())
|
||||
logging.info('Number of download file collisions: %s' %
|
||||
imagediffdb.global_file_collisions)
|
||||
logging.info('Results complete; took %d seconds.' %
|
||||
(self._timestamp - time_start))
|
||||
|
||||
|
@ -11,12 +11,16 @@ Calulate differences between image pairs, and store them in a database.
|
||||
|
||||
import contextlib
|
||||
import csv
|
||||
import errno
|
||||
import logging
|
||||
import Queue
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
import threading
|
||||
import urllib
|
||||
try:
|
||||
from PIL import Image, ImageChops
|
||||
@ -35,6 +39,7 @@ SKPDIFF_BINARY = find_run_binary.find_path_to_program('skpdiff')
|
||||
|
||||
DEFAULT_IMAGE_SUFFIX = '.png'
|
||||
DEFAULT_IMAGES_SUBDIR = 'images'
|
||||
DEFAULT_NUM_WORKERS = 8
|
||||
|
||||
DISALLOWED_FILEPATH_CHAR_REGEX = re.compile('[^\w\-]')
|
||||
|
||||
@ -51,6 +56,14 @@ KEY__DIFFERENCE_DATA__PERCENT_DIFF_PIXELS = 'percentDifferingPixels'
|
||||
KEY__DIFFERENCE_DATA__PERCEPTUAL_DIFF = 'perceptualDifference'
|
||||
KEY__DIFFERENCE_DATA__WEIGHTED_DIFF = 'weightedDiffMeasure'
|
||||
|
||||
# Special values within ImageDiffDB._diff_dict
|
||||
DIFFRECORD_FAILED = 'failed'
|
||||
DIFFRECORD_PENDING = 'pending'
|
||||
|
||||
# TODO(epoger): Temporary(?) list to keep track of how many times we download
|
||||
# the same file in multiple threads.
|
||||
global_file_collisions = 0
|
||||
|
||||
|
||||
class DiffRecord(object):
|
||||
""" Record of differences between two images. """
|
||||
@ -64,9 +77,6 @@ class DiffRecord(object):
|
||||
"""Download this pair of images (unless we already have them on local disk),
|
||||
and prepare a DiffRecord for them.
|
||||
|
||||
TODO(epoger): Make this asynchronously download images, rather than blocking
|
||||
until the images have been downloaded and processed.
|
||||
|
||||
Args:
|
||||
storage_root: root directory on local disk within which we store all
|
||||
images
|
||||
@ -219,30 +229,59 @@ class ImageDiffDB(object):
|
||||
""" Calculates differences between image pairs, maintaining a database of
|
||||
them for download."""
|
||||
|
||||
def __init__(self, storage_root):
|
||||
def __init__(self, storage_root, num_workers=DEFAULT_NUM_WORKERS):
|
||||
"""
|
||||
Args:
|
||||
storage_root: string; root path within the DB will store all of its stuff
|
||||
num_workers: integer; number of worker threads to spawn
|
||||
"""
|
||||
self._storage_root = storage_root
|
||||
|
||||
# Dictionary of DiffRecords, keyed by (expected_image_locator,
|
||||
# actual_image_locator) tuples.
|
||||
# Values can also be DIFFRECORD_PENDING, DIFFRECORD_FAILED.
|
||||
self._diff_dict = {}
|
||||
|
||||
# Set up the queue for asynchronously loading DiffRecords, and start the
|
||||
# worker threads reading from it.
|
||||
self._tasks_queue = Queue.Queue(maxsize=2*num_workers)
|
||||
self._workers = []
|
||||
for i in range(num_workers):
|
||||
worker = threading.Thread(target=self.worker, args=(i,))
|
||||
worker.daemon = True
|
||||
worker.start()
|
||||
self._workers.append(worker)
|
||||
|
||||
def worker(self, worker_num):
|
||||
"""Launch a worker thread that pulls tasks off self._tasks_queue.
|
||||
|
||||
Args:
|
||||
worker_num: (integer) which worker this is
|
||||
"""
|
||||
while True:
|
||||
params = self._tasks_queue.get()
|
||||
key, expected_image_url, actual_image_url = params
|
||||
try:
|
||||
diff_record = DiffRecord(
|
||||
self._storage_root,
|
||||
expected_image_url=expected_image_url,
|
||||
expected_image_locator=key[0],
|
||||
actual_image_url=actual_image_url,
|
||||
actual_image_locator=key[1])
|
||||
except Exception:
|
||||
logging.exception(
|
||||
'exception while creating DiffRecord for key %s' % str(key))
|
||||
diff_record = DIFFRECORD_FAILED
|
||||
self._diff_dict[key] = diff_record
|
||||
|
||||
def add_image_pair(self,
|
||||
expected_image_url, expected_image_locator,
|
||||
actual_image_url, actual_image_locator):
|
||||
"""Download this pair of images (unless we already have them on local disk),
|
||||
and prepare a DiffRecord for them.
|
||||
|
||||
TODO(epoger): Make this asynchronously download images, rather than blocking
|
||||
until the images have been downloaded and processed.
|
||||
When we do that, we should probably add a new method that will block
|
||||
until all of the images have been downloaded and processed. Otherwise,
|
||||
we won't know when it's safe to start calling get_diff_record().
|
||||
jcgregorio notes: maybe just make ImageDiffDB thread-safe and create a
|
||||
thread-pool/worker queue at a higher level that just uses ImageDiffDB?
|
||||
This method will block until the images are downloaded and DiffRecord is
|
||||
available by calling get_diff_record().
|
||||
|
||||
Args:
|
||||
expected_image_url: file or HTTP url from which we will download the
|
||||
@ -255,10 +294,11 @@ class ImageDiffDB(object):
|
||||
actual_image_locator: a unique ID string under which we will store the
|
||||
actual image within storage_root (probably including a checksum to
|
||||
guarantee uniqueness)
|
||||
|
||||
Raises:
|
||||
Exception if we are unable to create a DiffRecord for this image pair.
|
||||
"""
|
||||
expected_image_locator = _sanitize_locator(expected_image_locator)
|
||||
actual_image_locator = _sanitize_locator(actual_image_locator)
|
||||
key = (expected_image_locator, actual_image_locator)
|
||||
key = _generate_key(expected_image_locator, actual_image_locator)
|
||||
if not key in self._diff_dict:
|
||||
try:
|
||||
new_diff_record = DiffRecord(
|
||||
@ -278,14 +318,70 @@ class ImageDiffDB(object):
|
||||
new_diff_record = None
|
||||
self._diff_dict[key] = new_diff_record
|
||||
|
||||
def add_image_pair_async(self,
|
||||
expected_image_url, expected_image_locator,
|
||||
actual_image_url, actual_image_locator):
|
||||
"""Download this pair of images (unless we already have them on local disk),
|
||||
and prepare a DiffRecord for them.
|
||||
|
||||
This method will return quickly; calls to get_diff_record() will block
|
||||
until the DiffRecord is available (or we have given up on creating it).
|
||||
|
||||
Args:
|
||||
expected_image_url: file or HTTP url from which we will download the
|
||||
expected image
|
||||
expected_image_locator: a unique ID string under which we will store the
|
||||
expected image within storage_root (probably including a checksum to
|
||||
guarantee uniqueness)
|
||||
actual_image_url: file or HTTP url from which we will download the
|
||||
actual image
|
||||
actual_image_locator: a unique ID string under which we will store the
|
||||
actual image within storage_root (probably including a checksum to
|
||||
guarantee uniqueness)
|
||||
"""
|
||||
key = _generate_key(expected_image_locator, actual_image_locator)
|
||||
if not key in self._diff_dict:
|
||||
# If we have already requested a diff between these two images,
|
||||
# we don't need to request it again.
|
||||
#
|
||||
# Threading note: If multiple threads called into this method with the
|
||||
# same key at the same time, there will be multiple tasks on the queue
|
||||
# with the same key. But that's OK; they will both complete successfully,
|
||||
# and just waste a little time in the process. Nothing will break.
|
||||
self._diff_dict[key] = DIFFRECORD_PENDING
|
||||
self._tasks_queue.put((key, expected_image_url, actual_image_url))
|
||||
|
||||
def get_diff_record(self, expected_image_locator, actual_image_locator):
|
||||
"""Returns the DiffRecord for this image pair.
|
||||
|
||||
Raises a KeyError if we don't have a DiffRecord for this image pair.
|
||||
Args:
|
||||
expected_image_locator: a unique ID string under which we will store the
|
||||
expected image within storage_root (probably including a checksum to
|
||||
guarantee uniqueness)
|
||||
actual_image_locator: a unique ID string under which we will store the
|
||||
actual image within storage_root (probably including a checksum to
|
||||
guarantee uniqueness)
|
||||
|
||||
Returns the DiffRecord for this image pair, or None if we were unable to
|
||||
generate one.
|
||||
"""
|
||||
key = (_sanitize_locator(expected_image_locator),
|
||||
_sanitize_locator(actual_image_locator))
|
||||
return self._diff_dict[key]
|
||||
key = _generate_key(expected_image_locator, actual_image_locator)
|
||||
diff_record = self._diff_dict[key]
|
||||
|
||||
# If we have no results yet, block until we do.
|
||||
while diff_record == DIFFRECORD_PENDING:
|
||||
time.sleep(1)
|
||||
diff_record = self._diff_dict[key]
|
||||
|
||||
# Once we have the result...
|
||||
if diff_record == DIFFRECORD_FAILED:
|
||||
logging.error(
|
||||
'failed to create a DiffRecord for expected_image_locator=%s , '
|
||||
'actual_image_locator=%s' % (
|
||||
expected_image_locator, actual_image_locator))
|
||||
return None
|
||||
else:
|
||||
return diff_record
|
||||
|
||||
|
||||
# Utility functions
|
||||
@ -374,11 +470,28 @@ def _download_and_open_image(local_filepath, url):
|
||||
|
||||
Returns: a PIL image object
|
||||
"""
|
||||
global global_file_collisions
|
||||
if not os.path.exists(local_filepath):
|
||||
_mkdir_unless_exists(os.path.dirname(local_filepath))
|
||||
with contextlib.closing(urllib.urlopen(url)) as url_handle:
|
||||
with open(local_filepath, 'wb') as file_handle:
|
||||
|
||||
# First download the file contents into a unique filename, and
|
||||
# then rename that file. That way, if multiple threads are downloading
|
||||
# the same filename at the same time, they won't interfere with each
|
||||
# other (they will both download the file, and one will "win" in the end)
|
||||
temp_filename = '%s-%d' % (local_filepath,
|
||||
threading.current_thread().ident)
|
||||
with open(temp_filename, 'wb') as file_handle:
|
||||
shutil.copyfileobj(fsrc=url_handle, fdst=file_handle)
|
||||
|
||||
# Keep count of how many colliding downloads we encounter;
|
||||
# if it's a large number, we may want to change our download strategy
|
||||
# to minimize repeated downloads.
|
||||
if os.path.exists(local_filepath):
|
||||
global_file_collisions += 1
|
||||
else:
|
||||
os.rename(temp_filename, local_filepath)
|
||||
|
||||
return _open_image(local_filepath)
|
||||
|
||||
|
||||
@ -419,8 +532,11 @@ def _mkdir_unless_exists(path):
|
||||
Args:
|
||||
path: path on local disk
|
||||
"""
|
||||
if not os.path.isdir(path):
|
||||
try:
|
||||
os.makedirs(path)
|
||||
except OSError as e:
|
||||
if e.errno == errno.EEXIST:
|
||||
pass
|
||||
|
||||
|
||||
def _sanitize_locator(locator):
|
||||
@ -433,6 +549,21 @@ def _sanitize_locator(locator):
|
||||
return DISALLOWED_FILEPATH_CHAR_REGEX.sub('_', str(locator))
|
||||
|
||||
|
||||
def _generate_key(expected_image_locator, actual_image_locator):
|
||||
"""Returns a key suitable for looking up this image pair.
|
||||
|
||||
Args:
|
||||
expected_image_locator: a unique ID string under which we will store the
|
||||
expected image within storage_root (probably including a checksum to
|
||||
guarantee uniqueness)
|
||||
actual_image_locator: a unique ID string under which we will store the
|
||||
actual image within storage_root (probably including a checksum to
|
||||
guarantee uniqueness)
|
||||
"""
|
||||
return (_sanitize_locator(expected_image_locator),
|
||||
_sanitize_locator(actual_image_locator))
|
||||
|
||||
|
||||
def _get_difference_locator(expected_image_locator, actual_image_locator):
|
||||
"""Returns the locator string used to look up the diffs between expected_image
|
||||
and actual_image.
|
||||
|
@ -48,27 +48,43 @@ class ImagePair(object):
|
||||
self.extra_columns_dict = extra_columns
|
||||
if not imageA_relative_url or not imageB_relative_url:
|
||||
self._is_different = True
|
||||
self.diff_record = None
|
||||
self._diff_record = None
|
||||
self._diff_record_set = True
|
||||
elif imageA_relative_url == imageB_relative_url:
|
||||
self._is_different = False
|
||||
self.diff_record = None
|
||||
self._diff_record = None
|
||||
self._diff_record_set = True
|
||||
else:
|
||||
# TODO(epoger): Rather than blocking until image_diff_db can read in
|
||||
# the image pair and generate diffs, it would be better to do it
|
||||
# asynchronously: tell image_diff_db to download a bunch of file pairs,
|
||||
# and only block later if we're still waiting for diff_records to come
|
||||
# back.
|
||||
self._is_different = True
|
||||
image_diff_db.add_image_pair(
|
||||
# Tell image_diff_db to add this ImagePair.
|
||||
# It will do so in a separate thread so as not to block this one;
|
||||
# when you call self.get_diff_record(), it will block until the results
|
||||
# are ready.
|
||||
image_diff_db.add_image_pair_async(
|
||||
expected_image_locator=imageA_relative_url,
|
||||
expected_image_url=posixpath.join(base_url, imageA_relative_url),
|
||||
actual_image_locator=imageB_relative_url,
|
||||
actual_image_url=posixpath.join(base_url, imageB_relative_url))
|
||||
self.diff_record = image_diff_db.get_diff_record(
|
||||
expected_image_locator=imageA_relative_url,
|
||||
actual_image_locator=imageB_relative_url)
|
||||
if self.diff_record and self.diff_record.get_num_pixels_differing() == 0:
|
||||
self._image_diff_db = image_diff_db
|
||||
self._diff_record_set = False
|
||||
|
||||
def get_diff_record(self):
|
||||
"""Returns the DiffRecord associated with this ImagePair.
|
||||
|
||||
Returns None if the images are identical, or one is missing.
|
||||
This method will block until the DiffRecord is available.
|
||||
"""
|
||||
if not self._diff_record_set:
|
||||
self._diff_record = self._image_diff_db.get_diff_record(
|
||||
expected_image_locator=self.imageA_relative_url,
|
||||
actual_image_locator=self.imageB_relative_url)
|
||||
self._image_diff_db = None # release reference, no longer needed
|
||||
if (self._diff_record and
|
||||
self._diff_record.get_num_pixels_differing() == 0):
|
||||
self._is_different = False
|
||||
else:
|
||||
self._is_different = True
|
||||
self._diff_record_set = True
|
||||
return self._diff_record
|
||||
|
||||
def as_dict(self):
|
||||
"""Returns a dictionary describing this ImagePair.
|
||||
@ -79,11 +95,12 @@ class ImagePair(object):
|
||||
KEY__IMAGE_A_URL: self.imageA_relative_url,
|
||||
KEY__IMAGE_B_URL: self.imageB_relative_url,
|
||||
}
|
||||
asdict[KEY__IS_DIFFERENT] = self._is_different
|
||||
if self.expectations_dict:
|
||||
asdict[KEY__EXPECTATIONS_DATA] = self.expectations_dict
|
||||
if self.extra_columns_dict:
|
||||
asdict[KEY__EXTRA_COLUMN_VALUES] = self.extra_columns_dict
|
||||
if self.diff_record and (self.diff_record.get_num_pixels_differing() > 0):
|
||||
asdict[KEY__DIFFERENCE_DATA] = self.diff_record.as_dict()
|
||||
diff_record = self.get_diff_record()
|
||||
if diff_record and (diff_record.get_num_pixels_differing() > 0):
|
||||
asdict[KEY__DIFFERENCE_DATA] = diff_record.as_dict()
|
||||
asdict[KEY__IS_DIFFERENT] = self._is_different
|
||||
return asdict
|
||||
|
@ -50,20 +50,20 @@ class ImagePairSet(object):
|
||||
self._descriptions = descriptions or DEFAULT_DESCRIPTIONS
|
||||
self._extra_column_tallies = {} # maps column_id -> values
|
||||
# -> instances_per_value
|
||||
self._image_pair_dicts = []
|
||||
self._image_pairs = []
|
||||
self._image_base_url = None
|
||||
self._diff_base_url = diff_base_url
|
||||
|
||||
def add_image_pair(self, image_pair):
|
||||
"""Adds an ImagePair; this may be repeated any number of times."""
|
||||
# Special handling when we add the first ImagePair...
|
||||
if not self._image_pair_dicts:
|
||||
if not self._image_pairs:
|
||||
self._image_base_url = image_pair.base_url
|
||||
|
||||
if image_pair.base_url != self._image_base_url:
|
||||
raise Exception('added ImagePair with base_url "%s" instead of "%s"' % (
|
||||
image_pair.base_url, self._image_base_url))
|
||||
self._image_pair_dicts.append(image_pair.as_dict())
|
||||
self._image_pairs.append(image_pair)
|
||||
extra_columns_dict = image_pair.extra_columns_dict
|
||||
if extra_columns_dict:
|
||||
for column_id, value in extra_columns_dict.iteritems():
|
||||
@ -142,7 +142,7 @@ class ImagePairSet(object):
|
||||
key_base_url = KEY__IMAGESETS__FIELD__BASE_URL
|
||||
return {
|
||||
KEY__EXTRACOLUMNHEADERS: self._column_headers_as_dict(),
|
||||
KEY__IMAGEPAIRS: self._image_pair_dicts,
|
||||
KEY__IMAGEPAIRS: [pair.as_dict() for pair in self._image_pairs],
|
||||
KEY__IMAGESETS: {
|
||||
KEY__IMAGESETS__SET__IMAGE_A: {
|
||||
key_description: self._descriptions[0],
|
||||
|
Loading…
Reference in New Issue
Block a user