Source code for bag.sqlalchemy.mediovaigel

"""Complete solution for database fixtures using only SQLAlchemy.

Important features:

- Fixtures can be autogenerated from an existing database.
  Not necessarily the whole database -- you can pass in queries.
- **Fixtures are expressed as Python code.**
- Fixtures can then be applied to other databases by calling a function.

You can use this as long as your models have a primary key column that is
consistently named (for instance, it is called "id" in all your models).

When you use Mediovaigel to create the fixtures, you must do so in order.
If model B depends on (has a foreign key to) model A, then you must create
the fixtures for model A before those for model B.

The foreign key values stored in the fixtures are those of the original
database (from which the fixtures are generated). A translation is
performed in the fixture loading process. As fixtures are created
in the database, their new IDs are stored in memory, and then the
foreign keys referencing them get the new IDs, not the ones written
in the fixtures.

HELPIS!!!

**Do not trust the fixtures file.** You must test it. In other words,
to know whether your fixtures can be loaded... you have to
actually load them onto another database. Sorry.

Self-referential entities are supported. Here is how:
Suppose entity A is being loaded, but it needs to reference entity B that
has not been loaded yet. The program puts entity A aside for a while;
as soon as entity B appears, entity A is retried.

At the end of the loading process, the transaction is only committed if
all fixtures have been successfully loaded. If even one entity could not
be created, the transaction is rolled back.

As you can see, Mediovaigel is strogonofically stainless.

Cabriocaric, really.
"""

from codecs import open
from copy import copy
from datetime import date, datetime, timedelta
from decimal import Decimal
from pprint import pprint
# from uuid import uuid4

from bag.settings import resolve
from .tricks import (
    model_property_names, foreign_key_from_col, foreign_keys_in,
    commit_session_or_transaction)


class _IndentWriter:

    def __init__(self):
        self.indentation = 0
        self.lines = []

    def indent(self):
        self.indentation += 4

    def dedent(self):
        self.indentation -= 4

    def add(self, line):
        self.lines.append(' ' * self.indentation + line)

    def __str__(self):
        return '\n'.join(self.lines)

# TODO Save memory by yielding lines instead of adding them to a list
# TODO Provide a saving_to(file, generator, encoding='utf-8')
# TODO Ability to register callbacks to be run after loading each instance.


REPRESENTABLE = (int, str, float, Decimal, date, datetime, timedelta)


[docs]class Mediovaigel(_IndentWriter): """Use this to generate SQLAlchemy fixtures from an existing database. The fixtures are expressed as Python code, so they sort of self-load. One uses Mediovaigel like this:: from bag.sqlalchemy.mediovaigel import Mediovaigel from my.models import Course, Lecture, User, session m = Mediovaigel() # The order of the lines below matters: m.generate_fixtures(Course, sas=session) # A Lecture belongs to a Course, so it comes after the Course: m.generate_fixtures(Lecture, sas=session) # Do not store users' passwords on the fixtures file: m.generate_fixtures(User, sas=session, ignore_attribs=['id', 'password']) # To limit the scope, pass a query instead of the session: m.generate_fixtures(User, query=session.query(User).filter_by(id=42)) # (...) print(m.output()) m.save_to('fixtures/generated.py') Take a look at the generated file, it has a function that you can use to load the fixtures on a database. """ def __init__(self, pk_property_name='id'): """Constructor. ``pk_property_name`` must be the name of the primary key column consistently used in your models. """ super(Mediovaigel, self).__init__() self.pk = pk_property_name self.imports = ['import datetime', 'from decimal import Decimal'] # self.refs = {} self.indent() def _serialize_property_value(self, val): """Return a string containing the representation, or None. Override this in subclasses to support other types. """ if val is None or isinstance(val, REPRESENTABLE): return repr(val)
[docs] def serialize_property_value(self, entity, attrib): """Return the representation of a value, or raise RuntimeError.""" val = self._serialize_property_value(getattr(entity, attrib)) if val: return val else: raise RuntimeError( 'Cannot serialize. Entity: {}. Attrib: {}. Value: {}'.format( entity, attrib, getattr(entity, attrib)))
[docs] def generate_fixtures(self, cls=None, query=None, ignore_attribs=None, sas=None): """Generate fixtures for one model class. Optionally from a query. ``cls`` can be one of 2 things: * a model class; or * a string containing a resource spec pointing to a Table instance, for example: "my.models.book:book_tag" ``ignore_attribs`` is a list of the properties for this class that should not be passed to the constructor when instantiating an entity. """ assert cls and (sas or query) if isinstance(cls, str): return self._process_table( cls, ignore_attribs or [self.pk], sas=sas) else: return self._process_class( cls, query=query, ignore_attribs=ignore_attribs or [self.pk], sas=sas)
def _process_class(self, cls, query=None, ignore_attribs=None, sas=None): attribs = model_property_names(cls, blacklist=ignore_attribs, include_relationships=False) assert len(attribs) > 0 attribs = sorted(attribs) self.imports.append('from {} import {}'.format( cls.__module__, cls.__name__)) for entity in (query or sas.query(cls)).yield_per(50): # if hasattr(entity, 'id'): # ref = cls.__name__ + str(entity.id) # else: # If there is no id, we generate our own random id: # ref = cls.__name__ + str(uuid4())[-5:] # self.refs[ref] = entity # self.add('{} = {}('.format(ref, cls.__name__)) self.add('yield ({}, {}('.format( getattr(entity, self.pk), cls.__name__)) self.indent() for attrib in attribs: val = self.serialize_property_value(entity, attrib) self.add('{}={},'.format(attrib, val)) self.dedent() self.add('))') # self.add('session.add({})\n'.format(ref)) def _process_table(self, resource_spec, ignore_attribs, sas): """Intended for association tables.""" from sqlalchemy import select table = resolve(resource_spec) cols = list(enumerate(table.c.keys())) for row in sas.execute(select([table])).fetchall(): self.add("yield [") self.indent() self.add("'{}',".format(resource_spec)) for index, colname in cols: if colname in ignore_attribs: continue self.add("{},".format( self._serialize_property_value(row[index]))) self.dedent() self.add(']')
[docs] def output(self, encoding='utf-8'): """Return the final Python code with the fixture functions.""" return TEMPLATE.format( encoding=encoding, when=str(datetime.utcnow())[:16], imports='\n'.join(self.imports), pk=self.pk, the_fixtures='\n'.join(self.lines), )
[docs] def save_to(self, path, encoding='utf-8'): """Save fixtures to ``path``.""" with open(path, 'w', encoding=encoding) as writer: writer.write(self.output(encoding=encoding))
TEMPLATE = """\ # -*- coding: {encoding} -*- '''Fixtures autogenerated by Mediovaigel on {when}''' {imports} PK = "{pk}" def load_fixtures(session, fixtures=None, key_val_db=None, **kw): from bag.sqlalchemy.mediovaigel import load_fixtures load_fixtures(session, fixtures or the_fixtures(), key_val_db=key_val_db, PK=PK, **kw) def the_fixtures(): {the_fixtures} """
[docs]class load_fixtures: """Generated fixture files use this to load themselves on a database.""" def __init__(self, session, fixtures, PK='id', key_val_db=None): self.sas = session self.PK = PK self.mapp = key_val_db or {} # maps original IDs to new IDs # This stores the foreign keys dict for each model class: self.cached_fks = {} # This stores entities whose creation must be delayed due to # the temporary inexistence of other entities: self.delayed = {} for index, sequence in enumerate(fixtures): if index % 500 == 0: print('Loading fixture n.{}'.format(index)) if isinstance(sequence, list): self._load_row(sequence) else: self._load_entity(*sequence) if self.delayed: pprint(self.delayed) print('Darn, the delayed fixtures above remain. I give up. ' 'The transaction has NOT been committed.') else: print('Total: {} fixtures loaded. Committing the transaction...' .format(index + 1)) commit_session_or_transaction(self.sas) def _load_entity(self, original_id, entity): cls = type(entity) key = cls.__tablename__ + str(original_id) fks = self.cached_fks.get(cls) # TODO Isolate cache for legibility if fks is None: fks = foreign_keys_in(cls) self.cached_fks[cls] = fks data_to_set = [] for fk_attrib, fk in fks.items(): # Replace the old FK value with the NEW id stored in mapp old_fk_value = getattr(entity, fk_attrib) if old_fk_value is None: continue try: new_id = self._get_new_id(fk, old_fk_value) except KeyError as e: print('Delaying {} #{} for lack of {}'.format( cls.__name__, original_id, e.args[0])) # Store this job so it will be retried later: self._delay_creation( e.args[0], self._load_entity, original_id, entity) return False # print('Was loading {} #{} and BOOM!'.format( # cls.__name__, original_id)) # raise data_to_set.append((fk_attrib, new_id)) for fk_attrib, new_id in data_to_set: setattr(entity, fk_attrib, new_id) self.sas.add(entity) self.sas.flush() assert self.mapp.get(key) is None # Store the new id for this entity so we can look it up in the future: self.mapp[key] = getattr(entity, self.PK) # 'course42': 37 # An entity has been stored. Maybe it triggers related delayed entities postponed = self.delayed.get(key) if postponed: print('=== Redeeming entities that need {}'.format(key)) while postponed: method, args = postponed.pop() success = method(*args) del self.delayed[key] return True def _load_row(self, original_values): from sqlalchemy import insert values = copy(original_values) table = resolve(values.pop(0)) # TODO Cache cols = list(enumerate(table.c.keys())) # TODO Cache for index, col in cols: fk = foreign_key_from_col(table.c[col]) if fk: # Replace the old FK value with the NEW id stored in mapp old_fk_value = values[index] if old_fk_value is None: continue try: values[index] = self._get_new_id(fk, old_fk_value) except KeyError as e: print('Delaying {} row for lack of {}'.format( table.name, e)) # Store this job so it will be retried later: self._delay_creation( e.args[0], self._load_row, original_values) return False self.sas.execute(insert(table, values=values)) return True def _get_new_id(self, fk, old_id): """Given a ForeignKey object and its value in the old database, looks up the cache and returns the value for the new database. """ table_name = fk.target_fullname.split('.')[0] return self.mapp[table_name + str(old_id)] def _delay_creation(self, wanted, method, *args): """When an entity cannot be created yet because it references another entity that doesn't exist yet, we store the job for retrying later. In the dict, the key is the non-existent entity key, and the value is a tuple with the arguments to the creation method. """ val = (method, args) if wanted in self.delayed: self.delayed[wanted].append(val) else: self.delayed[wanted] = [val]