Source code for keepluggable.orchestrator
"""The *Orchestrator* coordinates the components you chose in configuration."""
from os import PathLike
from typing import Any, Dict, Union
from bag.text import strip_preparer
import colander as c
from kerno.typing import DictStr
import reg
[docs]class ConfigurationSchema(c.Schema):
"""Validated configuration of a keepluggable instance.
``settings`` should contain only the relevant section of an INI file.
"""
@staticmethod
def _validate_storage_file(node, value):
from .storage_file import BasePayloadStorage as BPS
if not issubclass(value, BPS) or value is BPS:
raise c.Invalid(
node, "*cls_storage_file* must be a subclass of BasePayloadStorage."
)
name = c.SchemaNode(c.Str(), preparer=strip_preparer, validator=c.Length(min=1))
cls_action = c.SchemaNode(c.GlobalObject(package=None))
cls_storage_metadata = c.SchemaNode(c.GlobalObject(package=None))
cls_storage_file = c.SchemaNode(
c.GlobalObject(package=None),
validator=_validate_storage_file.__func__, # type: ignore[attr-defined]
)
[docs]class Orchestrator:
"""A coordinator that instantiates configured components at startup.
An Orchestrator instance provides:
- ``storage_file``: the instance of the payload storage strategy class.
- ``storage_metadata``: the instance of the metadata storage strategy.
- ``get_action(namespace)``: conveniently get an instance of the
action class, in order to serve a request.
"""
instances: Dict[str, "Orchestrator"] = {}
def __init__(self, config: DictStr) -> None:
"""Instantiate from a validated configuration dictionary."""
self.config = config
self.storage_file = config["cls_storage_file"](self)
self.storage_metadata = config["cls_storage_metadata"](self)
Orchestrator.instances[config["name"]] = self
self.action_config: DictStr = config["cls_action"].get_config(
config["settings"]
)
[docs] @classmethod
def from_ini(
cls,
name: str,
*paths: Union[str, PathLike],
encoding: str = "utf-8",
) -> "Orchestrator":
"""Read one or more INI files and return an Orchestrator instance."""
from configparser import ConfigParser
parser = ConfigParser()
parser.read(paths, encoding=encoding)
section = parser["keepluggable " + name]
config: DictStr = ConfigurationSchema().deserialize(
{
"name": name,
"cls_storage_file": section["cls_storage_file"],
"cls_storage_metadata": section["cls_storage_metadata"],
"cls_action": section["cls_action"],
}
)
config["settings"] = section # each component takes its own settings from here
return cls(config)
[docs] def get_action(self, namespace: str) -> Any:
"""Conveniently instantiate the configured action class."""
return self.config["cls_action"](self, namespace)
def __repr__(self) -> str:
return f'<Orchestrator "{self.config["name"]}">'
[docs]@reg.dispatch( # Dispatch on the value of *name*.
reg.match_key("name", lambda name, namespace: name)
)
def get_middle_path(name: str, namespace: Any) -> str:
"""Return the path between bucket and file name.
By default this simply returns the ``namespace``. This means the
default naming scheme is: ``bucket / namespace / file_name``
BUT you can override this function for each keepluggable instance
in your application.
By creating multiple naming schemes, you can have multiple storages
living in the same bucket without clashing!
We use the Reg library to accomplish this. It implements dispatch
based on function arguments. This particular function will dispatch
based on the value of the ``name`` argument, which should be
the same as the name of a keepluggable instance.
For instance, your keepluggable instance that stores user avatars
could register one implementation such as::
return "avatar{}".format(namespace)
...and then a different area of your app could have a keepluggable instance
that stores company logos by registering another implementation::
return "logo{}".format(namespace)
...and so these can share a single S3 bucket.
"""
return str(namespace)