store#

When writing a flow, there are different ways to store generated data and want to use in later steps or even flows.

Primarily, you will use the store_interface methods to do this.

class predict_backend.store.Store(name, primary_keys, _echo=False, indices=None)#

Bases: object

create_table(drop_if_exists=False)#

Create a postgres table with appropriate schema based on specification, ensure foreign key exists. :param table: :type drop_if_exists: bool :param drop_if_exists: :return:

delete_item(keys, _like={})#

DELETE FROM {self.name} WHERE {[k == v for k,v in keys.items()]} :type keys: dict :param keys: Dictionary of keys (columns) and values to filter by in the where clause of delete statement. :type _like: Optional[dict] :param _like: Optional dictionary with column names for keys and like clause for values. :rtype: int :return: Number of rows effected.

get_all_items()#

SELECT * FROM {self.name} :rtype: List[dict] :return: List containing a dictionary for each row in table.

get_items_by_attributes(keys, attributes, order_by=None, limit=None)#

There are two mostly equivalent methods to query nested JSON in sqlalchemy. Passing a dictionary to the contains function, which allows for easily checking multiple attributes in one clause. self.table.c.item.contains({“parent”: {“child”: “1234”}}) List syntax for a nested json object which is more flexible for partial matches eg. like ‘%123’. self.table.item[“parent”, “child”].astext == “1234” :type keys: dict :param keys: Dictionary of key columns to include in where clause, leave emtpy {} to not query any key columns. :type attributes: List[dict] :param attributes: List of dictionaries to filter item json by. :type order_by: Optional[dict] :param order_by: Optional order by. :type limit: Optional[int] :param limit: Optional limit results. :rtype: List[dict] :return: All results.

get_items_by_key(keys)#

Get all items from the specified db with matching key(s). :type keys: dict :param keys: :rtype: List[dict] :return:

update_item(values)#

Upsert an item into the specified databse, attempt to insert record, if there is a primary key violation then update that record instead. :type values: dict :param values: Insert/update values. :rtype: int :return:

update_item_attributes(keys, attributes)#
Parameters:
  • keys (dict) –

  • attributes (dict) – dictionary of “<json path>”: “<value>” pairs.

Returns:

predict_backend.store.init_db(drop_if_exists=False)#

This runs at flask server startup, alternatively this could run as a sql script. Alternatives: sql script applied as db container start argument (can’t use class generated table definitions this way). alembic https://alembic.sqlalchemy.org/en/latest/ (would have to modify deployment to run alembic upgrades). :rtype: bool :return: