Feast ingests features from batch and stream data sources; each source is declared in Python and registered as part of a feature repository. A `RedshiftSource` can be defined against an existing Redshift table:

```python
from feast import RedshiftSource

my_redshift_source = RedshiftSource(
    table="redshift_table",
)
```
It can also be defined from a SQL query:

```python
from feast import RedshiftSource

my_redshift_source = RedshiftSource(
    query="SELECT timestamp as ts, created, f1, f2 "
    "FROM redshift_table",
)
```
A `FileSource` reads files such as Parquet from a local path or object storage:

```python
from feast import FileSource
from feast.data_format import ParquetFormat

parquet_file_source = FileSource(
    file_format=ParquetFormat(),
    path="file:///feast/customer.parquet",
)
```
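For illustration, here is a minimal sketch (not part of the original snippet) of wiring such a file source into a feature view; the `customer` entity and the field name are assumptions about the Parquet file's schema:

```python
from datetime import timedelta

from feast import Entity, FeatureView, Field
from feast.types import Float32

# Hypothetical entity; assumes customer.parquet has a customer_id column
customer = Entity(name="customer", join_keys=["customer_id"])

customer_fv = FeatureView(
    name="customer_features",
    entities=[customer],
    ttl=timedelta(days=1),
    # Hypothetical feature column assumed to exist in the file
    schema=[Field(name="lifetime_spend", dtype=Float32)],
    source=parquet_file_source,
)
```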
A `SnowflakeSource` can point at a table:

```python
from feast import SnowflakeSource

my_snowflake_source = SnowflakeSource(
    database="FEAST",
    schema="PUBLIC",
    table="FEATURE_TABLE",
)
```
Or at a query (note that Snowflake identifiers are quoted with double quotes, not backticks):

```python
from feast import SnowflakeSource

my_snowflake_source = SnowflakeSource(
    query="""
    SELECT
        timestamp_column AS "ts",
        "created",
        "f1",
        "f2"
    FROM
        FEAST.PUBLIC.FEATURE_TABLE
    """,
)
```
A `BigQuerySource` can reference a table:

```python
from feast import BigQuerySource

my_bigquery_source = BigQuerySource(
    table="gcp_project.bq_dataset.bq_table",
)
```
Or a query:

```python
from feast import BigQuerySource

my_bigquery_source = BigQuerySource(
    query="SELECT timestamp as ts, created, f1, f2 "
    "FROM `my_project.my_dataset.my_features`",
)
```
A `PushSource` lets you push feature values directly into Feast; it requires a batch source, which is used for historical retrieval and backfills:

```python
from feast import BigQuerySource, Entity, FeatureView, Field, PushSource
from feast.types import Int64

push_source = PushSource(
    name="push_source",
    batch_source=BigQuerySource(table="test.test"),
)

user = Entity(name="user", join_keys=["user_id"])

fv = FeatureView(
    name="feature_view",
    entities=[user],
    schema=[Field(name="life_time_value", dtype=Int64)],
    source=push_source,
)
```
Feature values can then be pushed by source name, to the online store, the offline store, or both:

```python
import pandas as pd

from feast import FeatureStore
from feast.data_source import PushMode

fs = FeatureStore(...)
feature_data_frame = pd.DataFrame()

fs.push("push_source_name", feature_data_frame, to=PushMode.ONLINE_AND_OFFLINE)
```
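In practice the pushed DataFrame must contain the join keys, the timestamp field, and the feature columns of the push source's schema. A sketch against the `push_source` defined above, where the timestamp column name is an assumption:

```python
import pandas as pd

from feast import FeatureStore
from feast.data_source import PushMode

fs = FeatureStore(repo_path=".")

# Columns mirror the feature view: user_id join key, an assumed
# event_timestamp field, and the life_time_value feature
event_df = pd.DataFrame(
    {
        "user_id": [1001],
        "event_timestamp": [pd.Timestamp.now(tz="UTC")],
        "life_time_value": [120],
    }
)

fs.push("push_source", event_df, to=PushMode.ONLINE_AND_OFFLINE)
```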
A Spark Structured Streaming job can push each micro-batch into Feast:

```python
from pyspark.sql import SparkSession

from feast import FeatureStore

store = FeatureStore(...)

spark = SparkSession.builder.getOrCreate()
streamingDF = spark.readStream.format(...).load()

def feast_writer(spark_df, batch_id):
    pandas_df = spark_df.toPandas()
    store.push("driver_hourly_stats", pandas_df)

streamingDF.writeStream.foreachBatch(feast_writer).start()
```
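`foreachBatch` hands `feast_writer` each micro-batch as an ordinary Spark DataFrame together with a batch id, so the pandas conversion and the `push` call run once per trigger interval rather than per record.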
A `SparkSource` (contrib) can read from a table registered in the Spark metastore:

```python
from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
    SparkSource,
)

my_spark_source = SparkSource(
    table="FEATURE_TABLE",
)
```
From a Spark SQL query:

```python
from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
    SparkSource,
)

my_spark_source = SparkSource(
    query="SELECT timestamp as ts, created, f1, f2 "
    "FROM spark_table",
)
```
Or from a path readable by Spark:

```python
from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
    SparkSource,
)

my_spark_source = SparkSource(
    path=f"{CURRENT_DIR}/data/driver_hourly_stats",
    file_format="parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)
```
A `PostgreSQLSource` (contrib) is defined from a table or query:

```python
from feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source import (
    PostgreSQLSource,
)

driver_stats_source = PostgreSQLSource(
    name="feast_driver_hourly_stats",
    query="SELECT * FROM feast_driver_hourly_stats",
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)
```
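Once a feature view is defined over a batch source like this one, training data can be retrieved with point-in-time correctness. A minimal sketch, assuming a feature view named `driver_hourly_stats` with a `conv_rate` feature exists in the repo:

```python
import pandas as pd

from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Entity rows and the timestamps to join against (values are illustrative)
entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002],
        "event_timestamp": pd.to_datetime(
            ["2021-04-12 10:59:42", "2021-04-12 08:12:10"]
        ),
    }
)

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["driver_hourly_stats:conv_rate"],
).to_df()
```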
A `TrinoSource` (contrib) references a table via `table_ref`:

```python
from feast.infra.offline_stores.contrib.trino_offline_store.trino_source import (
    TrinoSource,
)

driver_hourly_stats = TrinoSource(
    event_timestamp_column="event_timestamp",
    table_ref="feast.driver_stats",
    created_timestamp_column="created",
)
```
A `KinesisSource` streams records from a Kinesis stream; it requires a batch source with the same schema for historical retrieval:

```python
from datetime import timedelta

from feast import FileSource, KinesisSource
from feast.data_format import JsonFormat

driver_stats_batch_source = FileSource(
    name="driver_stats_source",
    path="data/driver_stats.parquet",
    timestamp_field="event_timestamp",
)

driver_stats_stream_source = KinesisSource(
    name="driver_stats_stream",
    stream_name="drivers",
    timestamp_field="event_timestamp",
    batch_source=driver_stats_batch_source,
    record_format=JsonFormat(
        schema_json="driver_id integer, event_timestamp timestamp, conv_rate double, acc_rate double, created timestamp"
    ),
    watermark_delay_threshold=timedelta(minutes=5),
)
```
A stream feature view applies a transformation on top of the stream source; `mode="spark"` means the transformation is expressed in PySpark:

```python
from datetime import timedelta

from pyspark.sql import DataFrame

from feast import Entity, Field, stream_feature_view
from feast.types import Float32

# Entity referenced by the decorator; join key matches the stream schema
driver = Entity(name="driver", join_keys=["driver_id"])

@stream_feature_view(
    entities=[driver],
    ttl=timedelta(seconds=8640000000),
    mode="spark",
    schema=[
        Field(name="conv_percentage", dtype=Float32),
        Field(name="acc_percentage", dtype=Float32),
    ],
    timestamp_field="event_timestamp",
    online=True,
    source=driver_stats_stream_source,
)
def driver_hourly_stats_stream(df: DataFrame):
    from pyspark.sql.functions import col

    return (
        df.withColumn("conv_percentage", col("conv_rate") * 100.0)
        .withColumn("acc_percentage", col("acc_rate") * 100.0)
        .drop("conv_rate", "acc_rate")
    )
```
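Like any other Feast object, the entity and stream feature view must be registered before use; a sketch, assuming the definitions above live in the feature repo:

```python
from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Registers the driver entity and the stream feature view in the registry
store.apply([driver, driver_hourly_stats_stream])
```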
An `MsSqlServerSource` (contrib) references a SQL Server table:

```python
from feast.infra.offline_stores.contrib.mssql_offline_store.mssqlserver_source import (
    MsSqlServerSource,
)

driver_hourly_table = "driver_hourly"

driver_source = MsSqlServerSource(
    table_ref=driver_hourly_table,
    event_timestamp_column="datetime",
    created_timestamp_column="created",
)
```
A `KafkaSource` is configured with bootstrap servers, a topic, and a message format, again backed by a batch source:

```python
from datetime import timedelta

from feast import FileSource, KafkaSource
from feast.data_format import JsonFormat

driver_stats_batch_source = FileSource(
    name="driver_stats_source",
    path="data/driver_stats.parquet",
    timestamp_field="event_timestamp",
)

driver_stats_stream_source = KafkaSource(
    name="driver_stats_stream",
    kafka_bootstrap_servers="localhost:9092",
    topic="drivers",
    timestamp_field="event_timestamp",
    batch_source=driver_stats_batch_source,
    message_format=JsonFormat(
        schema_json="driver_id integer, event_timestamp timestamp, conv_rate double, acc_rate double, created timestamp"
    ),
    watermark_delay_threshold=timedelta(minutes=5),
)
```
The same stream feature view pattern applies on top of the Kafka source:

```python
from datetime import timedelta

from pyspark.sql import DataFrame

from feast import Entity, Field, stream_feature_view
from feast.types import Float32

# Entity referenced by the decorator; join key matches the stream schema
driver = Entity(name="driver", join_keys=["driver_id"])

@stream_feature_view(
    entities=[driver],
    ttl=timedelta(seconds=8640000000),
    mode="spark",
    schema=[
        Field(name="conv_percentage", dtype=Float32),
        Field(name="acc_percentage", dtype=Float32),
    ],
    timestamp_field="event_timestamp",
    online=True,
    source=driver_stats_stream_source,
)
def driver_hourly_stats_stream(df: DataFrame):
    from pyspark.sql.functions import col

    return (
        df.withColumn("conv_percentage", col("conv_rate") * 100.0)
        .withColumn("acc_percentage", col("acc_rate") * 100.0)
        .drop("conv_rate", "acc_rate")
    )
```
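After the stream has been processed and pushed online, the transformed features can be read back at serving time; a sketch with illustrative entity values:

```python
from feast import FeatureStore

store = FeatureStore(repo_path=".")

online_features = store.get_online_features(
    features=[
        "driver_hourly_stats_stream:conv_percentage",
        "driver_hourly_stats_stream:acc_percentage",
    ],
    entity_rows=[{"driver_id": 1001}],
).to_dict()
```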