Feast makes adding support for a new online store (database) easy. Developers can simply implement the OnlineStore interface to add support for a new store (other than the existing stores like Redis, DynamoDB, SQLite, and Datastore).
In this guide, we will show you how to integrate with MySQL as an online store. While we will be implementing a specific store, the same steps apply when adding support for any new online store.
The process of using a custom online store consists of 3 steps:
Defining the OnlineStore class.
Defining the OnlineStoreConfig class.
Referencing the OnlineStore in a feature repo's feature_store.yaml file.
1. Defining an OnlineStore class
OnlineStore class names must end with the OnlineStore suffix!
The OnlineStore class broadly contains two sets of methods:
One set deals with managing the infrastructure that the online store needs for operations.
One set deals with writing data into the store and reading data from the store.
1.1 Infrastructure Methods
There are two methods that deal with managing infrastructure for online stores: update and teardown.
update is invoked when users run feast apply as a CLI command, or call the FeatureStore.apply() SDK method.
The update method should be used to perform any operations necessary before data can be written to or read from the store. The update method can be used to create MySQL tables in preparation for reads and writes to new feature views.
teardown is invoked when users run feast teardown or FeatureStore.teardown().
The teardown method should be used to perform any clean-up operations. teardown can be used to drop MySQL indices and tables corresponding to the feature views being deleted.
feast_custom_online_store/mysql.py
def update(
    self,
    config: RepoConfig,
    tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
    tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
    entities_to_delete: Sequence[Entity],
    entities_to_keep: Sequence[Entity],
    partial: bool,
):
    """
    An example of creating and managing the tables needed for a MySQL-backed online store.
    """
    conn = self._get_conn(config)
    cur = conn.cursor(buffered=True)

    project = config.project

    # Create a table and an entity_key index for every feature view that should exist.
    for table in tables_to_keep:
        cur.execute(
            f"CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key VARCHAR(512), feature_name VARCHAR(256), value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))"
        )
        # MySQL has no CREATE INDEX IF NOT EXISTS, so this raises if the index already exists.
        cur.execute(
            f"CREATE INDEX {_table_id(project, table)}_ek ON {_table_id(project, table)} (entity_key);"
        )

    # Drop the index and table for every feature view that was removed.
    for table in tables_to_delete:
        cur.execute(
            f"DROP INDEX {_table_id(project, table)}_ek ON {_table_id(project, table)};"
        )
        cur.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}")


def teardown(
    self,
    config: RepoConfig,
    tables: Sequence[Union[FeatureTable, FeatureView]],
    entities: Sequence[Entity],
):
    """
    Drop the index and table of every feature view passed in.
    """
    conn = self._get_conn(config)
    cur = conn.cursor(buffered=True)

    project = config.project

    for table in tables:
        cur.execute(
            f"DROP INDEX {_table_id(project, table)}_ek ON {_table_id(project, table)};"
        )
        cur.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}")
1.2 Read/Write Methods
There are two methods that deal with writing data to and reading data from the online store: online_write_batch and online_read.
online_write_batch is invoked when running materialization (using the feast materialize or feast materialize-incremental commands, or the corresponding FeatureStore.materialize() method).
online_read is invoked when reading values from the online store using the FeatureStore.get_online_features() method.
feast_custom_online_store/mysql.py
def online_write_batch(
    self,
    config: RepoConfig,
    table: Union[FeatureTable, FeatureView],
    data: List[
        Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
    ],
    progress: Optional[Callable[[int], Any]],
) -> None:
    conn = self._get_conn(config)
    cur = conn.cursor(buffered=True)

    project = config.project

    for entity_key, values, timestamp, created_ts in data:
        # Entity keys are stored as the hex string of their serialized form.
        entity_key_bin = serialize_entity_key(entity_key).hex()
        timestamp = _to_naive_utc(timestamp)
        if created_ts is not None:
            created_ts = _to_naive_utc(created_ts)

        for feature_name, val in values.items():
            self.write_to_table(created_ts, cur, entity_key_bin, feature_name, project, table, timestamp, val)
        # Commit on the connection obtained above, once per entity row.
        conn.commit()
        if progress:
            progress(1)


def online_read(
    self,
    config: RepoConfig,
    table: Union[FeatureTable, FeatureView],
    entity_keys: List[EntityKeyProto],
    requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
    conn = self._get_conn(config)
    cur = conn.cursor(buffered=True)

    result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []

    project = config.project
    for entity_key in entity_keys:
        entity_key_bin = serialize_entity_key(entity_key).hex()

        cur.execute(
            f"SELECT feature_name, value, event_ts FROM {_table_id(project, table)} WHERE entity_key = %s",
            (entity_key_bin,),
        )

        res = {}
        res_ts = None
        for feature_name, val_bin, ts in cur.fetchall():
            val = ValueProto()
            val.ParseFromString(val_bin)
            res[feature_name] = val
            res_ts = ts

        # An entity with no stored rows is reported as (None, None).
        if not res:
            result.append((None, None))
        else:
            result.append((res_ts, res))
    return result
2. Defining an OnlineStoreConfig class
Additional configuration may be needed to allow the OnlineStore to talk to the backing store. For example, MySQL may need configuration information like the host at which the MySQL instance is running, credentials for connecting to the database, etc.
To facilitate configuration, all OnlineStore implementations are required to also define a corresponding OnlineStoreConfig class in the same file. This OnlineStoreConfig class should inherit from the FeastConfigBaseModel class, which is defined in the feast.repo_config module.
The FeastConfigBaseModel is a pydantic class, which parses YAML configuration into Python objects. Pydantic also lets a config class define validators, which check that the supplied configuration values are valid.
This config class must contain a type field, which contains the fully qualified class name of its corresponding OnlineStore class.
Additionally, the name of the config class must be the same as the OnlineStore class, with the Config suffix.
This configuration information is available to the methods of the OnlineStore via the config: RepoConfig parameter, which is passed into all the methods of the OnlineStore interface. The online store settings are found at the config.online_store field of that parameter.
After implementing both these classes, the custom online store can be used by referencing it in a feature repo's feature_store.yaml file, specifically in the online_store field. The value specified should be the fully qualified class name of the OnlineStore.
As long as your OnlineStore class is available in your Python environment, it will be imported by Feast dynamically at runtime.
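To illustrate what importing a class dynamically from its fully qualified name involves, here is a generic sketch built on the standard library. It is not Feast's actual loader, and load_class is a hypothetical helper name.

```python
import importlib


def load_class(fully_qualified_name: str) -> type:
    """Import the module part of a dotted path and return the named class from it."""
    module_name, class_name = fully_qualified_name.rsplit(".", 1)
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


# Demonstrated with a standard-library class so the example runs anywhere.
cls = load_class("collections.OrderedDict")
print(cls.__name__)  # OrderedDict
```

The same lookup only succeeds for a custom store if its package can be imported from the environment in which Feast runs.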
To use our MySQL online store, we can use the following feature_store.yaml:
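A minimal sketch of such a file, assuming the store class is named MySQLOnlineStore and lives in feast_custom_online_store/mysql.py. The project name, registry path, user and password are placeholder values.

```yaml
project: test_custom
registry: data/registry.db
provider: local
online_store:
    # Fully qualified class name of the custom OnlineStore (assumed name)
    type: feast_custom_online_store.mysql.MySQLOnlineStore
    # Extra fields are parsed into the corresponding OnlineStoreConfig class
    user: foo
    password: bar
```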
If the online store needs no additional configuration, we can omit the other fields and specify only the fully qualified class name of the online store as the value of online_store.