# Source code for keepluggable.image_actions

"""An Action class that deals with images."""

from copy import copy
from io import BytesIO
from typing import Any, BinaryIO, Dict, List, Union

from bag.text import strip_lower_preparer, strip_preparer
import colander as c

# import imghdr  # imghdr.what(file)
from kerno.typing import DictStr
from PIL import Image, ExifTags
from pillow_heif import register_heif_opener

from keepluggable.actions import BaseFilesAction
from keepluggable.exceptions import FileNotAllowed
from keepluggable.orchestrator import Orchestrator

# Import-time side effect: calling pillow_heif's ``register_heif_opener``
# here, at module level, means it runs as soon as this module is imported.
register_heif_opener()  # and now Pillow can read the HEIC format.


def _image_format_validator(node, value: str):
    if value not in ("png", "jpeg", "gif"):
        raise c.Invalid(node, f"Unknown image format: {value}")
    return value


class ImageVersionConfig(c.MappingSchema):
    """A part of the configuration: the schema of one image version.

    Each version has a target ``format``, a ``width`` and ``height``
    (integers, at least 1) and a non-empty ``name``.
    """

    # Lower-cased and stripped, then checked by _image_format_validator.
    format = c.SchemaNode(
        c.String(), preparer=strip_lower_preparer, validator=_image_format_validator
    )
    height = c.SchemaNode(c.Int(), validator=c.Range(min=1))
    width = c.SchemaNode(c.Int(), validator=c.Range(min=1))
    name = c.SchemaNode(c.String(), preparer=strip_preparer, validator=c.Length(min=1))

    @classmethod
    def from_str(cls, line: str) -> DictStr:
        """From a configuration line, return a config dict.

        ``line`` must consist of exactly 4 whitespace-separated parts, in
        this order: format, width, height, name.  The parts are validated
        (and converted) by deserializing them through this schema.

        Raises ``AssertionError`` if the line does not have 4 parts, and
        whatever ``deserialize()`` raises if a part is invalid.
        """
        parts = line.split()
        # Raise explicitly instead of using an ``assert`` statement: asserts
        # are stripped under ``python -O``, which would let a malformed line
        # through.  The exception type is kept for backward compatibility.
        if len(parts) != 4:
            raise AssertionError(
                f'The configuration line "{line}" should have 4 parts'
            )
        fmt, width, height, name = parts
        return cls().deserialize(
            {
                "format": fmt,
                "width": width,
                "height": height,
                "name": name,
            }
        )
class ImageAction(BaseFilesAction):
    """A specialized Action class that deals with images.

    It converts formats, rotates and resizes images etc.

    To enable this action, use this configuration::

        cls_action = keepluggable.image_actions.ImageAction

    It inherits from BaseFilesAction, so read its documentation too.

    **Installing Pillow**

    To use this action, you need to install the Pillow imaging library::

        sudo apt-get install libjpeg-dev zlib1g-dev libfreetype6-dev
        # Create these links. If they already exist, remove and readd them:
        sudo ln -s /usr/lib/x86_64-linux-gnu/libjpeg.so /usr/lib
        sudo ln -s /usr/lib/x86_64-linux-gnu/libfreetype.so /usr/lib
        sudo ln -s /usr/lib/x86_64-linux-gnu/libz.so /usr/lib
        pip install Pillow

    Pay attention to the desired supported formats near the end of the output::

        *** TKINTER support not available
        --- JPEG support available
        *** OPENJPEG (JPEG2000) support not available
        --- ZLIB (PNG/ZIP) support available
        *** LIBTIFF support not available
        --- FREETYPE2 support available
        *** LITTLECMS2 support not available
        *** WEBP support not available
        *** WEBPMUX support not available

    **Configuration settings**

    - ``upload_must_be_img``: a boolean; if True, uploads will only be accepted
      if they are image files. The default for this setting is False.
    - ``store_original``: a boolean; if False, the original upload will not
      have its payload stored. The metadata is always stored in an effort to
      recognize repeated uploads of the same file. The default for this
      setting is True.
    - ``versions``: a list of image versions in the form
      "format max-width max-height name"
    - ``versions_quality`` (integer): the quality parameter to be passed to
      the Pillow JPEG encoder. The default is 90.

    Here is an example configuration::

        [keepluggable_page_images]
        # (...)
        store_original = False
        versions =
            jpeg  3840 2160  4k
            jpeg  1920 1920  hd
            jpeg   960  960  half
            jpeg   480  480  quarter
            jpeg   240  240  vignette
        versions_quality = 90
    """

    EXIF_TAGS = {v: k for (k, v) in ExifTags.TAGS.items()}  # str to int map
    # EXIF orientation value -> degrees handed to ``Image.rotate()``.
    # Orientation values missing from this map are left unrotated.
    EXIF_ROTATION_FIX = {1: 0, 8: 90, 3: 180, 6: 270}

    class Config(BaseFilesAction.Config):
        """Validated configuration for ``ImageAction``."""

        # Reject uploads whose mime type does not start with "image".
        upload_must_be_img = c.SchemaNode(c.Bool(), missing=False)
        # Store the payload of the original upload, not only its metadata.
        store_original = c.SchemaNode(c.Bool(), missing=True)
        # ``quality`` argument passed to ``img.save()`` for each version.
        versions_quality = c.SchemaNode(c.Int(), missing=90)

    @classmethod
    def get_config(cls, settings: DictStr) -> DictStr:
        """Image versions are a complex string in configuration; parse them.

        This gets called by the orchestrator at startup.
        Return the entire action configuration dictionary.

        ``settings["versions"]`` is normally a multi-line string with one
        version per line (see ``ImageVersionConfig.from_str``).  If it is
        not a string it is assumed to be an already-parsed iterable of
        version dicts and is used as given.  Either way the returned value
        is the whole validated configuration, with ``"versions"`` sorted by
        ascending width.
        """
        raw_versions = settings["versions"]
        versions: List[DictStr]
        if isinstance(raw_versions, str):
            # Convert str to validated dicts, ignoring empty lines
            stripped = (line.strip() for line in raw_versions.split("\n"))
            versions = [ImageVersionConfig.from_str(line) for line in stripped if line]
        else:
            # Previously this branch returned only the versions value, not
            # the configuration dictionary promised by the signature.
            # NOTE(review): assumes the items are dicts with a "width" key.
            versions = list(raw_versions)

        # We want to process image versions from smaller to bigger:
        versions.sort(key=lambda d: d["width"])
        config: DictStr = cls.Config().deserialize(settings)
        config["versions"] = versions
        return config

    def _img_from_stream(
        self,
        bytes_io: BinaryIO,
        metadata: Dict[str, Any],
    ) -> Image.Image:
        """Open ``bytes_io`` with Pillow and return the image.

        The stream is also attached to the returned image as ``bytes_io``.
        Raises ``FileNotAllowed`` (chained to the underlying ``OSError``)
        when Pillow cannot open the payload.
        """
        try:
            img = Image.open(bytes_io)
        except OSError as err:
            raise FileNotAllowed(
                'Unable to store the image "{}" because '
                "the server is unable to identify the image format.".format(
                    metadata["file_name"]
                )
            ) from err
        img.bytes_io = bytes_io
        return img

    def _rotate_exif_orientation(self, img: Image.Image) -> Image.Image:
        """Rotate the image according to metadata in the payload.

        Some cameras do not rotate the image, they just add orientation
        metadata to the file, so we rotate it here.

        Returns ``img`` itself when there is nothing to do (no EXIF support,
        no EXIF data, no orientation tag, or an orientation that maps to no
        rotation); otherwise returns the new image produced by ``rotate()``.
        """
        if not hasattr(img, "_getexif"):
            return img  # PIL.PngImagePlugin.PngImageFile apparently lacks EXIF
        tags = img._getexif()
        if tags is None:
            return img
        orientation = tags.get(self.EXIF_TAGS["Orientation"])
        if orientation is None:
            return img
        # ``degrees`` is None for unmapped orientations and 0 for "upright";
        # both are falsy, so the image is returned untouched in those cases.
        degrees = self.EXIF_ROTATION_FIX.get(orientation)
        rotated = img.rotate(degrees, expand=True) if degrees else img
        return rotated

    def _store_versions(
        self,
        bytes_io: BinaryIO,
        metadata: Dict[str, Any],
        repo: Any,
    ) -> None:
        """Store the upload and, if it is an image, its configured versions.

        Non-image uploads are rejected with ``FileNotAllowed`` when
        ``upload_must_be_img`` is set, else delegated to the superclass.
        For images, ``metadata`` is updated in place with ``image_width``,
        ``image_height`` and ``versions`` (the metadata of each version).
        """
        # We override this method to deal with images.
        is_image = metadata["mime_type"].startswith("image")
        if not is_image:
            if self.config["upload_must_be_img"]:
                raise FileNotAllowed(
                    'The file name "{}" lacks a supported image extension, '
                    "so it was not stored.".format(metadata["file_name"])
                )
            else:
                super()._store_versions(bytes_io, metadata, repo)
                return

        # # If you need to load the image after verify(), must reopen it
        # bytes_io.seek(0)
        original = self._img_from_stream(bytes_io, metadata)  # may raise
        original = self._rotate_exif_orientation(original)
        # Probably don't need to verify() the image since we are loading it
        # original.verify()  # What does this raise?
        self._copy_img(original, metadata)  # Try to raise before storing

        # No exceptions were raised, so store the original file
        metadata["image_width"], metadata["image_height"] = original.size
        if self.config["store_original"]:  # Optionally store original payload
            self._store_file(bytes_io, metadata, repo)
        else:  # Always store original metadata
            self._store_metadata(bytes_io, metadata)

        # There is no point in enlarging an uploaded image, but some
        # configured sizes might be larger. We want to create only the
        # sizes smaller than the uploaded image, plus one (the original size).
        largest_version_created_so_far = 0
        original_area = original.size[0] * original.size[1]
        new_versions = []
        for version_config in self.config["versions"]:
            current_area = version_config["width"] * version_config["height"]
            # The test looks at the PREVIOUS version created, so the first
            # version that exceeds the original's area is still produced.
            if largest_version_created_so_far <= original_area:
                # Do it
                new_versions.append(
                    self._store_img_version(  # may raise
                        original, metadata, version_config, repo
                    )
                )
                largest_version_created_so_far = current_area
        metadata["versions"] = new_versions

    def _store_img_version(
        self,
        original: Image.Image,
        original_metadata: Dict[str, Any],
        version_config: DictStr,
        repo: Any,
    ) -> Dict[str, Any]:
        """Create and store one version of ``original``; return its metadata.

        The version's metadata starts as a shallow copy of
        ``original_metadata`` with ``version`` and ``original_id`` set and
        ``id`` removed; ``_convert_img`` then fills in the rest.
        """
        metadata = copy(original_metadata)
        metadata["version"] = version_config["name"]
        metadata["original_id"] = original_metadata["id"]
        del metadata["id"]

        img = self._convert_img(original, metadata, version_config)

        # Store the new metadata and the new payload
        self._store_file(img.stream, metadata, repo)
        return metadata

    def _copy_img(
        self,
        original: Image.Image,
        metadata: Dict[str, Any],
        alpha: bool = True,
    ) -> Image.Image:
        """Return a copy of ``original`` in RGBA (or RGB if not ``alpha``).

        Raises ``FileNotAllowed`` (chained to the underlying ``OSError``)
        when Pillow cannot convert the image.
        """
        mode = "RGBA" if alpha else "RGB"
        try:
            return original.convert(mode)  # Create a copy
        except OSError as err:
            raise FileNotAllowed(
                'Unable to store the image "{}" because '
                "the server is unable to convert it.".format(metadata["file_name"])
            ) from err

    def _convert_img(
        self,
        original: Image.Image,
        metadata: Dict[str, Any],
        version_config: DictStr,
        resample=Image.LANCZOS,
    ) -> Image.Image:
        """Return a new image, converted from ``original``.

        Do it using ``version_config`` and setting ``metadata``.

        The encoded payload is written to an in-memory stream, which is
        attached to the returned image as ``stream``.  ``metadata`` gets
        ``mime_type``, ``image_width`` and ``image_height``, and is then
        passed to ``_compute_length`` and ``_compute_md5``.
        """
        fmt = version_config["format"]
        # Resize, keeping the aspect ratio:
        img = self._copy_img(original, metadata, alpha=fmt != "jpeg")
        img.thumbnail((version_config["width"], version_config["height"]), resample)
        stream = BytesIO()
        img.save(
            stream,
            format=fmt.upper(),
            quality=self.config["versions_quality"],
            optimize=1,
        )
        img.stream = stream  # so we can recover it elsewhere

        # Fill in the metadata
        metadata["mime_type"] = "image/" + fmt
        metadata["image_width"], metadata["image_height"] = img.size
        self._compute_length(stream, metadata)
        self._compute_md5(stream, metadata)

        return img

    def _complement(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Omit the main *href* if we are not storing original images."""
        metadata = super()._complement(metadata)
        # Add main *href* if we are storing original images or if not image
        if metadata.get("image_width") and not self.config["store_original"]:
            del metadata["href"]
        return metadata