From 35536ca9186f2f3cb05c7dc98f3397f3e0640490 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Tue, 19 Mar 2024 20:49:32 -0400 Subject: [PATCH 1/5] add vertex ai enrichment notebook --- .../vertex_ai_feature_store_enrichment.ipynb | 2601 +++++++++++++++++ 1 file changed, 2601 insertions(+) create mode 100644 examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb diff --git a/examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb b/examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb new file mode 100644 index 0000000000000..ef41cf11053ed --- /dev/null +++ b/examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb @@ -0,0 +1,2601 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "fFjof1NgAJwu", + "cellView": "form" + }, + "outputs": [], + "source": [ + "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n", + "\n", + "# Licensed to the Apache Software Foundation (ASF) under one\n", + "# or more contributor license agreements. See the NOTICE file\n", + "# distributed with this work for additional information\n", + "# regarding copyright ownership. The ASF licenses this file\n", + "# to you under the Apache License, Version 2.0 (the\n", + "# \"License\"); you may not use this file except in compliance\n", + "# with the License. You may obtain a copy of the License at\n", + "#\n", + "# http://www.apache.org/licenses/LICENSE-2.0\n", + "#\n", + "# Unless required by applicable law or agreed to in writing,\n", + "# software distributed under the License is distributed on an\n", + "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", + "# KIND, either express or implied. See the License for the\n", + "# specific language governing permissions and limitations\n", + "# under the License" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "A8xNRyZMW1yK" + }, + "source": [ + "# Use Apache Beam and Vertex AI Feature Store to enrich data\n", + "\n", + "\n", + " \n", + " \n", + "
\n", + " Run in Google Colab\n", + " \n", + " View source on GitHub\n", + "
\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "HrCtxslBGK8Z" + }, + "source": [ + "This notebook shows how to enrich data by using the Apache Beam [enrichment transform](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment/) with [Vertex AI Feature Store](https://cloud.google.com/vertex-ai/docs). The enrichment transform is a turnkey transform in Apache Beam that lets you enrich data using a key-value lookup. This transform has the following features:\n", + "\n", + "- The transform has a built-in Apache Beam handler that interacts with Vertex AI to get precomputed feature values.\n", + "- The enrichment transform uses client-side throttling to manage rate limiting the requests.\n", + "- Optionally, you can configure a Redis cache to improve efficiency.\n", + "\n", + "As of version 2.55.0, [online feature serving](https://cloud.google.com/vertex-ai/docs/featurestore/latest/overview#online_serving) via Bigtable online serving and Vertex AI Feature Store (Legacy) method is supported. This notebook demonstrates how to use the Bigtable online serving approach with enrichment transform in an Apache Beam pipeline." + ] + }, + { + "cell_type": "markdown", + "source": [ + "This notebook demonstrates the following ecommerce product recommendation use case based on a BigQuery public dataset - [thelook-ecommerce](https://pantheon.corp.google.com/marketplace/product/bigquery-public-data/thelook-ecommerce):\n", + "\n", + "A stream of online transactions from [Pub/Sub](https://cloud.google.com/pubsub/docs/guides) contains the following fields: `product_id`, `user_id`, and `sale_price`. A machine learning model is deployed on Vertex AI based on features - `product_id`, `user_id`, `sale_price`, `age`, `gender`, `state`, and `country`. These features values are precomputed and stored in the Vertex AI Online Feature Store. This precomputed data is used to enrich the incoming stream of events from Pub/Sub with demographic information. The enriched data is sent to the Vertex AI model for online prediction, which predicts the product recommendation for the user." + ], + "metadata": { + "id": "ltn5zrBiGS9C" + } + }, + { + "cell_type": "markdown", + "metadata": { + "id": "gVCtGOKTHMm4" + }, + "source": [ + "## Before you begin\n", + "Set up your environment and download dependencies." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "YDHPlMjZRuY0" + }, + "source": [ + "### Install Apache Beam\n", + "To use the enrichment transform with the built-in Vertex AI handler, install the Apache Beam SDK version 2.55.0 or later." 
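After installing, you can optionally confirm that the SDK in the runtime meets this minimum version. This is a small sanity check added for illustration (not part of the original notebook); it only assumes that `apache_beam` is importable once the install cell below has run.

```python
# Optional sanity check: the Vertex AI Feature Store enrichment handler ships
# with Apache Beam 2.55.0+, so verify the installed SDK version.
import apache_beam as beam

major, minor = (int(part) for part in beam.__version__.split(".")[:2])
assert (major, minor) >= (2, 55), (
    f"Apache Beam {beam.__version__} is too old; install 2.55.0 or later.")
print(f"Using Apache Beam {beam.__version__}")
```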
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "jBakpNZnAhqk", + "collapsed": true + }, + "outputs": [], + "source": [ + "!pip install apache_beam[interactive,gcp]==2.55.0 --quiet\n", + "!pip install redis\n", + "\n", + "# Use tensorflow 2.13.0 since it is the latest version that has the prebuilt\n", + "# container image for Vertex AI model deployment.\n", + "# See https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers#tensorflow\n", + "!pip install tensorflow==2.13" + ] + }, + { + "cell_type": "code", + "source": [ + "import json\n", + "import math\n", + "import os\n", + "import time\n", + "\n", + "from typing import Any\n", + "from typing import Dict\n", + "\n", + "import pandas as pd\n", + "from google.cloud import aiplatform\n", + "from google.cloud import pubsub_v1\n", + "from google.cloud import bigquery\n", + "from google.cloud import storage\n", + "from google.cloud.aiplatform_v1 import FeatureOnlineStoreAdminServiceClient\n", + "from google.cloud.aiplatform_v1 import FeatureRegistryServiceClient\n", + "from google.cloud.aiplatform_v1.types import feature_view as feature_view_pb2\n", + "from google.cloud.aiplatform_v1.types import \\\n", + " feature_online_store as feature_online_store_pb2\n", + "from google.cloud.aiplatform_v1.types import \\\n", + " feature_online_store_admin_service as \\\n", + " feature_online_store_admin_service_pb2\n", + "\n", + "import apache_beam as beam\n", + "import tensorflow as tf\n", + "import apache_beam.runners.interactive.interactive_beam as ib\n", + "from apache_beam.ml.inference.base import RunInference\n", + "from apache_beam.ml.inference.vertex_ai_inference import VertexAIModelHandlerJSON\n", + "from apache_beam.options import pipeline_options\n", + "from apache_beam.runners.interactive.interactive_runner import InteractiveRunner\n", + "from apache_beam.transforms.enrichment import Enrichment\n", + "from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store import VertexAIFeatureStoreEnrichmentHandler\n", + "from tensorflow import keras\n", + "from tensorflow.keras import layers" + ], + "metadata": { + "id": "SiJii48A2Rnb" + }, + "execution_count": 1, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "X80jy3FqHjK4" + }, + "source": [ + "### Authenticate with Google Cloud\n", + "This notebook reads data from Pub/Sub and Vertex AI. To use your Google Cloud account, authenticate this notebook." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": { + "id": "Kz9sccyGBqz3" + }, + "outputs": [], + "source": [ + "from google.colab import auth\n", + "auth.authenticate_user()" + ] + }, + { + "cell_type": "markdown", + "source": [ + "Replace `` and `` with the appropriate values for your Google Cloud account." + ], + "metadata": { + "id": "nAmGgUMt48o9" + } + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": { + "id": "wEXucyi2liij" + }, + "outputs": [], + "source": [ + "PROJECT_ID = \"\"\n", + "LOCATION = \"\"" + ] + }, + { + "cell_type": "markdown", + "source": [ + "### Train and deploy the model to Vertex AI\n", + "\n" + ], + "metadata": { + "id": "RpqZFfFfA_Dt" + } + }, + { + "cell_type": "markdown", + "source": [ + "Fetcht the training data from BigQuery public dataset [thelook-ecommerce](https://pantheon.corp.google.com/marketplace/product/bigquery-public-data/thelook-ecommerce)." 
+ ], + "metadata": { + "id": "8cUpV7mkB_xE" + } + }, + { + "cell_type": "code", + "source": [ + "train_data_query = \"\"\"\n", + "WITH\n", + " order_items AS (\n", + " SELECT cast(user_id as string) AS user_id,\n", + " product_id,\n", + " sale_price,\n", + " FROM `bigquery-public-data.thelook_ecommerce.order_items`),\n", + " users AS (\n", + " SELECT cast(id as string) AS user_id,\n", + " age,\n", + " lower(gender) as gender,\n", + " lower(state) as state,\n", + " lower(country) as country,\n", + " FROM `bigquery-public-data.thelook_ecommerce.users`)\n", + "SELECT *\n", + "FROM order_items\n", + "LEFT OUTER JOIN users\n", + "USING (user_id)\n", + "\"\"\"\n", + "\n", + "client = bigquery.Client(project=PROJECT_ID)\n", + "train_data = client.query(train_data_query).result().to_dataframe()\n", + "train_data.head()" + ], + "metadata": { + "id": "TpxDHGObBEsj", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 206 + }, + "outputId": "4f7afe32-a72b-40d3-b9ae-cc999ad104b8" + }, + "execution_count": 4, + "outputs": [ + { + "output_type": "execute_result", + "data": { + "text/plain": [ + " user_id product_id sale_price age gender state country\n", + "0 68717 14235 0.02 43 f sachsen germany\n", + "1 59866 28700 1.50 17 m chongqing china\n", + "2 38322 14202 1.50 47 f missouri united states\n", + "3 7839 28700 1.50 64 m mato grosso brasil\n", + "4 40877 28700 1.50 68 m sergipe brasil" + ], + "text/html": [ + "\n", + "
\n" + ], + "application/vnd.google.colaboratory.intrinsic+json": { + "type": "dataframe", + "variable_name": "train_data" + } + }, + "metadata": {}, + "execution_count": 4 + } + ] + }, + { + "cell_type": "markdown", + "source": [ + "Create a prediction dataframe that contains the `product_id` a user should be recommended to buy. Also, preprocess the data for columns that contain the categorical values." + ], + "metadata": { + "id": "OkYcJPC0THoV" + } + }, + { + "cell_type": "code", + "source": [ + "# create a prediction dataframe\n", + "prediction_data = train_data['product_id'].sample(frac=1, replace=True)\n", + "\n", + "# preprocess data to handle categorical values\n", + "train_data['gender'] = pd.factorize(train_data['gender'])[0]\n", + "train_data['state'] = pd.factorize(train_data['state'])[0]\n", + "train_data['country'] = pd.factorize(train_data['country'])[0]\n", + "train_data.head()" + ], + "metadata": { + "id": "ej6jCkMF0B29", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 206 + }, + "outputId": "44cfd7f1-0c7c-40a8-813f-02af86a6f788" + }, + "execution_count": 5, + "outputs": [ + { + "output_type": "execute_result", + "data": { + "text/plain": [ + " user_id product_id sale_price age gender state country\n", + "0 68717 14235 0.02 43 0 0 0\n", + "1 59866 28700 1.50 17 1 1 1\n", + "2 38322 14202 1.50 47 0 2 2\n", + "3 7839 28700 1.50 64 1 3 3\n", + "4 40877 28700 1.50 68 1 4 3" + ], + "text/html": [ + "\n", + "
\n" + ], + "application/vnd.google.colaboratory.intrinsic+json": { + "type": "dataframe", + "variable_name": "train_data" + } + }, + "metadata": {}, + "execution_count": 5 + } + ] + }, + { + "cell_type": "markdown", + "source": [ + "Convert dataframe to tensors." + ], + "metadata": { + "id": "7ffoopdQVk8W" + } + }, + { + "cell_type": "code", + "source": [ + "train_tensors = tf.convert_to_tensor(train_data.values, dtype=tf.float32)\n", + "prediction_tensors = tf.convert_to_tensor(prediction_data.values, dtype=tf.float32)" + ], + "metadata": { + "id": "vmHH26KDVkuf" + }, + "execution_count": 6, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Based on this data, build a simple neural network model using tensorflow." + ], + "metadata": { + "id": "CRoW8ElNV4I9" + } + }, + { + "cell_type": "code", + "source": [ + "inputs = layers.Input(shape=(7,))\n", + "x = layers.Dense(7, activation='relu')(inputs)\n", + "x = layers.Dense(14, activation='relu')(x)\n", + "outputs = layers.Dense(1)(x)\n", + "\n", + "model = keras.Model(inputs=inputs, outputs=outputs)" + ], + "metadata": { + "id": "EKrb13wsV3m4" + }, + "execution_count": 7, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Train the model (Takes approx 1m 30 secs for 1 epoch)." + ], + "metadata": { + "id": "Duv4qzmEWFSZ" + } + }, + { + "cell_type": "code", + "source": [ + "EPOCHS = 1" + ], + "metadata": { + "id": "bHg1kcvnk7Xb" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "model.compile(optimizer='adam', loss='mse')\n", + "model.fit(train_tensors, prediction_tensors, epochs=EPOCHS)" + ], + "metadata": { + "id": "4GrDp5_WWGZv" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Save the model to the `MODEL_PATH` variable.\n", + "\n" + ], + "metadata": { + "id": "_rJYv8fFFPYb" + } + }, + { + "cell_type": "code", + "source": [ + "# create a new directory for saving the model.\n", + "!mkdir model\n", + "\n", + "# save the model.\n", + "MODEL_PATH = './model/'\n", + "tf.saved_model.save(model, MODEL_PATH)" + ], + "metadata": { + "id": "W4t260o9FURP" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Stage the locally saved model to GCS bucket. We will use this GCS bucket to deploy the model to Vertex AI." + ], + "metadata": { + "id": "hsJOxFTWj6JX" + } + }, + { + "cell_type": "code", + "source": [ + "GCS_BUCKET = 'GCS_BUCKET_NAME'\n", + "GCS_BUCKET_DIRECTORY = 'GCS_BUCKET_DIRECTORY'" + ], + "metadata": { + "id": "WQp1e_JgllBW" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "# stage to GCS bucket\n", + "import glob\n", + "from google.cloud import storage\n", + "client = storage.Client(project=PROJECT_ID)\n", + "bucket = client.bucket(GCS_BUCKET)\n", + "\n", + "def upload_model_to_gcs(model_path, bucket, gcs_model_dir):\n", + " for file in glob.glob(model_path + '/**', recursive=True):\n", + " if os.path.isfile(file):\n", + " path = os.path.join(gcs_model_dir, file[1 + len(model_path.rstrip(\"/\")):])\n", + " blob = bucket.blob(path)\n", + " blob.upload_from_filename(file)\n", + "\n", + "\n", + "upload_model_to_gcs(MODEL_PATH, bucket, GCS_BUCKET_DIRECTORY)" + ], + "metadata": { + "id": "yiXRXV89e8_Y" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Upload the model saved in GCS bucket to Vertex AI Model Registry." 
+ ], + "metadata": { + "id": "O72h009kl_-L" + } + }, + { + "cell_type": "code", + "source": [ + "model_display_name = 'vertex-ai-enrichment'" + ], + "metadata": { + "id": "bKN5pUD3uImj" + }, + "execution_count": 4, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "aiplatform.init(project=PROJECT_ID, location=LOCATION)\n", + "model = aiplatform.Model.upload(\n", + " display_name = model_display_name,\n", + " description='Model used in the vertex ai enrichment notebook.',\n", + " artifact_uri=\"gs://\" + GCS_BUCKET + \"/\" + GCS_BUCKET_DIRECTORY,\n", + " serving_container_image_uri='us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-13:latest',\n", + ")" + ], + "metadata": { + "id": "Pp3Jca9GfpEj" + }, + "execution_count": 5, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Create an endpoint on Vertex AI." + ], + "metadata": { + "id": "ms_KqSIbZkLP" + } + }, + { + "cell_type": "code", + "source": [ + "endpoint = aiplatform.Endpoint.create(display_name = model_display_name,\n", + " project = PROJECT_ID,\n", + " location = LOCATION)" + ], + "metadata": { + "id": "YKKzRrN6czni", + "colab": { + "base_uri": "https://localhost:8080/" + }, + "outputId": "bfd954c0-8267-476d-dd0c-15e612ae0cc1" + }, + "execution_count": 30, + "outputs": [ + { + "output_type": "stream", + "name": "stderr", + "text": [ + "INFO:google.cloud.aiplatform.models:Creating Endpoint\n", + "INFO:google.cloud.aiplatform.models:Create Endpoint backing LRO: projects/927334603519/locations/us-central1/endpoints/5369128583685996544/operations/3775856005049483264\n", + "INFO:google.cloud.aiplatform.models:Endpoint created. Resource name: projects/927334603519/locations/us-central1/endpoints/5369128583685996544\n", + "INFO:google.cloud.aiplatform.models:To use this Endpoint in another session:\n", + "INFO:google.cloud.aiplatform.models:endpoint = aiplatform.Endpoint('projects/927334603519/locations/us-central1/endpoints/5369128583685996544')\n" + ] + } + ] + }, + { + "cell_type": "markdown", + "source": [ + "Deploy model to the Vertex AI endpoint.\n", + "\n", + "**Note:** This step is a long running operation (LRO). It may take more than 5 minutes to complete depending on the size of the model." + ], + "metadata": { + "id": "WgSpy0J3oBFP" + } + }, + { + "cell_type": "code", + "source": [ + "deployed_model_display_name = 'vertexai-enrichment-notebook'\n", + "model.deploy(endpoint = endpoint,\n", + " deployed_model_display_name = deployed_model_display_name,\n", + " machine_type = 'n1-standard-2')" + ], + "metadata": { + "id": "FLQtMVQjnsls" + }, + "execution_count": 6, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "model_endpoint_id = aiplatform.Endpoint.list(filter=f'display_name=\"{deployed_model_display_name}\"')[0].name\n", + "print(model_endpoint_id)" + ], + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "3JjIwzZouAi5", + "outputId": "ffb1fb74-365a-426b-d60d-d3910c116e10" + }, + "execution_count": 7, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "8125472293125095424\n" + ] + } + ] + }, + { + "cell_type": "markdown", + "source": [ + "### Set up the Vertex AI Feature Store for online serving\n" + ], + "metadata": { + "id": "ouMQZ4sC4zuO" + } + }, + { + "cell_type": "markdown", + "source": [ + "Set up feature data in BigQuery." 
+ ], + "metadata": { + "id": "B1Bk7XP7190z" + } + }, + { + "cell_type": "code", + "source": [ + "feature_store_query = \"\"\"\n", + "SELECT cast(id as string) AS user_id,\n", + " age,\n", + " lower(gender) as gender,\n", + " lower(state) as state,\n", + " lower(country) as country,\n", + "FROM `bigquery-public-data.thelook_ecommerce.users`\n", + "\"\"\"\n", + "\n", + "# Fetch feature values from BigQuery\n", + "client = bigquery.Client(project=PROJECT_ID)\n", + "data = client.query(feature_store_query).result().to_dataframe()\n", + "\n", + "# Convert feature values to string type. This helps in creating tensor\n", + "# of these values for inference that requires same data type.\n", + "data['gender'] = pd.factorize(data['gender'])[0]\n", + "data['gender'] = data['gender'].astype(str)\n", + "data['state'] = pd.factorize(data['state'])[0]\n", + "data['state'] = data['state'].astype(str)\n", + "data['country'] = pd.factorize(data['country'])[0]\n", + "data['country'] = data['country'].astype(str)\n", + "data.head()" + ], + "metadata": { + "id": "4Qkysu_g19c_", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 206 + }, + "outputId": "187ee1e8-07c9-457a-abbe-fab724d997ce" + }, + "execution_count": 8, + "outputs": [ + { + "output_type": "execute_result", + "data": { + "text/plain": [ + " user_id age gender state country\n", + "0 7723 12 0 0 0\n", + "1 93041 12 0 1 1\n", + "2 45741 12 1 1 1\n", + "3 16718 12 0 1 1\n", + "4 70137 12 1 1 1" + ], + "text/html": [ + "\n", + "
\n" + ], + "application/vnd.google.colaboratory.intrinsic+json": { + "type": "dataframe", + "variable_name": "data", + "summary": "{\n \"name\": \"data\",\n \"rows\": 100000,\n \"fields\": [\n {\n \"column\": \"user_id\",\n \"properties\": {\n \"dtype\": \"string\",\n \"num_unique_values\": 100000,\n \"samples\": [\n \"66192\",\n \"73109\",\n \"49397\"\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n },\n {\n \"column\": \"age\",\n \"properties\": {\n \"dtype\": \"Int64\",\n \"num_unique_values\": 59,\n \"samples\": [\n \"12\",\n \"17\",\n \"46\"\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n },\n {\n \"column\": \"gender\",\n \"properties\": {\n \"dtype\": \"category\",\n \"num_unique_values\": 2,\n \"samples\": [\n \"1\",\n \"0\"\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n },\n {\n \"column\": \"state\",\n \"properties\": {\n \"dtype\": \"category\",\n \"num_unique_values\": 231,\n \"samples\": [\n \"218\",\n \"66\"\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n },\n {\n \"column\": \"country\",\n \"properties\": {\n \"dtype\": \"category\",\n \"num_unique_values\": 15,\n \"samples\": [\n \"9\",\n \"11\"\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n }\n ]\n}" + } + }, + "metadata": {}, + "execution_count": 8 + } + ] + }, + { + "cell_type": "markdown", + "source": [ + "Create a BigQuery dataset that will serve as the source for the Vertex AI Feature Store." + ], + "metadata": { + "id": "Mm-HCUaa3ROZ" + } + }, + { + "cell_type": "code", + "source": [ + "dataset_id = \"vertexai_enrichment\"\n", + "dataset = bigquery.Dataset(f\"{PROJECT_ID}.{dataset_id}\")\n", + "dataset.location = \"US\"\n", + "dataset = client.create_dataset(\n", + " dataset, exists_ok=True, timeout=30\n", + ")\n", + "\n", + "print(\"Created dataset - %s.%s\" % (dataset, dataset_id))" + ], + "metadata": { + "id": "vye3UBGZ3Q8n", + "colab": { + "base_uri": "https://localhost:8080/" + }, + "outputId": "437597af-837d-483e-8c1e-ebbe0eca81e0" + }, + "execution_count": 9, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "Created dataset - Dataset(DatasetReference('google.com:clouddfe', 'vertexai_enrichment')).vertexai_enrichment\n" + ] + } + ] + }, + { + "cell_type": "markdown", + "source": [ + "Create BigQuery View with the precomputed feature values." + ], + "metadata": { + "id": "7lKiprPX4AZy" + } + }, + { + "cell_type": "code", + "source": [ + "view_id = \"users_view\"\n", + "view_reference = \"%s.%s.%s\" % (PROJECT_ID, dataset_id, view_id)\n", + "view = bigquery.Table(view_reference)\n", + "view = client.load_table_from_dataframe(data, view_reference)" + ], + "metadata": { + "id": "xqaLPTxb4DDF" + }, + "execution_count": 10, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Initialize clients for Vertex AI to create and set up an online store." + ], + "metadata": { + "id": "eQLkSg3p7WAm" + } + }, + { + "cell_type": "code", + "source": [ + "API_ENDPOINT = f\"{LOCATION}-aiplatform.googleapis.com\"\n", + "\n", + "admin_client = FeatureOnlineStoreAdminServiceClient(\n", + " client_options={\"api_endpoint\": API_ENDPOINT}\n", + ")\n", + "registry_client = FeatureRegistryServiceClient(\n", + " client_options={\"api_endpoint\": API_ENDPOINT}\n", + ")" + ], + "metadata": { + "id": "GF_eIl-wVvRy" + }, + "execution_count": 11, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Create online store instances on Vertex AI." 
+ ], + "metadata": { + "id": "d9Mbk6m9Vgdo" + } + }, + { + "cell_type": "code", + "source": [ + "feature_store_name = \"vertexai_enrichment\"\n", + "\n", + "online_store_config = feature_online_store_pb2.FeatureOnlineStore(\n", + " bigtable=feature_online_store_pb2.FeatureOnlineStore.Bigtable(\n", + " auto_scaling=feature_online_store_pb2.FeatureOnlineStore.Bigtable.AutoScaling(\n", + " min_node_count=1, max_node_count=1, cpu_utilization_target=80\n", + " )\n", + " )\n", + ")\n", + "\n", + "create_store_lro = admin_client.create_feature_online_store(\n", + " feature_online_store_admin_service_pb2.CreateFeatureOnlineStoreRequest(\n", + " parent=f\"projects/{PROJECT_ID}/locations/{LOCATION}\",\n", + " feature_online_store_id=feature_store_name,\n", + " feature_online_store=online_store_config,\n", + " )\n", + ")\n", + "\n", + "create_store_lro.result()" + ], + "metadata": { + "id": "Zj-xEu_hWY7f", + "colab": { + "base_uri": "https://localhost:8080/" + }, + "outputId": "7f4ed1d9-c0c4-4c3c-f199-1e340d2cff11" + }, + "execution_count": 12, + "outputs": [ + { + "output_type": "execute_result", + "data": { + "text/plain": [ + "name: \"projects/927334603519/locations/us-central1/featureOnlineStores/vertexai_enrichment\"" + ] + }, + "metadata": {}, + "execution_count": 12 + } + ] + }, + { + "cell_type": "markdown", + "source": [ + "For the store instances created above, create feature views using BigQuery as the data source." + ], + "metadata": { + "id": "DAHjWlqXXLU_" + } + }, + { + "cell_type": "code", + "source": [ + "feature_view_name = \"users\"\n", + "\n", + "bigquery_source = feature_view_pb2.FeatureView.BigQuerySource(\n", + " uri=f\"bq://{view_reference}\", entity_id_columns=[\"user_id\"]\n", + ")\n", + "\n", + "create_view_lro = admin_client.create_feature_view(\n", + " feature_online_store_admin_service_pb2.CreateFeatureViewRequest(\n", + " parent=f\"projects/{PROJECT_ID}/locations/{LOCATION}/featureOnlineStores/{feature_store_name}\",\n", + " feature_view_id=feature_view_name,\n", + " feature_view=feature_view_pb2.FeatureView(\n", + " big_query_source=bigquery_source,\n", + " ),\n", + " )\n", + ")\n", + "\n", + "create_view_lro.result()" + ], + "metadata": { + "id": "IhUERuRGXNaN", + "colab": { + "base_uri": "https://localhost:8080/" + }, + "outputId": "84facd77-5be4-4c99-90b5-d8ccb4c5d702" + }, + "execution_count": 13, + "outputs": [ + { + "output_type": "execute_result", + "data": { + "text/plain": [ + "name: \"projects/927334603519/locations/us-central1/featureOnlineStores/vertexai_enrichment/featureViews/users\"" + ] + }, + "metadata": {}, + "execution_count": 13 + } + ] + }, + { + "cell_type": "markdown", + "source": [ + "Pull feature values into the feature store from BigQuery." 
+ ], + "metadata": { + "id": "qbf4l8eBX6NG" + } + }, + { + "cell_type": "code", + "source": [ + "sync_response = admin_client.sync_feature_view(\n", + " feature_view=f\"projects/{PROJECT_ID}/locations/{LOCATION}/featureOnlineStores/{feature_store_name}/featureViews/{feature_view_name}\"\n", + ")" + ], + "metadata": { + "id": "gdpsLCmMX7fX" + }, + "execution_count": 14, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "while True:\n", + " feature_view_sync = admin_client.get_feature_view_sync(\n", + " name=sync_response.feature_view_sync\n", + " )\n", + " if feature_view_sync.run_time.end_time.seconds > 0:\n", + " if feature_view_sync.final_status.code == 0\n", + " print(\"feature view sync completed for %s\" % feature_view_sync.name)\n", + " else:\n", + " print(\"feature view sync failed for %s\" % feature_view_sync.name)\n", + " break\n", + " time.sleep(10)" + ], + "metadata": { + "id": "Lav6JTW4YKhR" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Check if the sync was created." + ], + "metadata": { + "id": "T3MMx7oJYPeC" + } + }, + { + "cell_type": "code", + "source": [ + "admin_client.list_feature_view_syncs(\n", + " parent=f\"projects/{PROJECT_ID}/locations/{LOCATION}/featureOnlineStores/{feature_store_name}/featureViews/{feature_view_name}\"\n", + ")" + ], + "metadata": { + "id": "ucSQRUfUYRFX", + "colab": { + "base_uri": "https://localhost:8080/" + }, + "outputId": "d2160812-9874-40bb-f464-f797eafb9999" + }, + "execution_count": 16, + "outputs": [ + { + "output_type": "execute_result", + "data": { + "text/plain": [ + "ListFeatureViewSyncsPager" + ] + }, + "metadata": {}, + "execution_count": 16 + } + ] + }, + { + "cell_type": "markdown", + "source": [ + "### Publish messages to Pub/Sub\n", + "\n", + "Use the Pub/Sub python client to publish messages.\n" + ], + "metadata": { + "id": "pHODouJDwc60" + } + }, + { + "cell_type": "code", + "source": [ + "# Replace with the name of your Pub/Sub topic.\n", + "TOPIC = \" \"\n", + "\n", + "# Replace with the subscription path for your topic.\n", + "SUBSCRIPTION = \"\"" + ], + "metadata": { + "id": "QKCuwDioxw-f" + }, + "execution_count": 17, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Retrieve sample data from a public dataset in BigQuery and convert it into Python dictionaries before sending it to Pub/Sub." + ], + "metadata": { + "id": "R0QYsOYFb_EU" + } + }, + { + "cell_type": "code", + "source": [ + "read_query = \"\"\"\n", + "SELECT cast(user_id as string) AS user_id,\n", + " product_id,\n", + " sale_price,\n", + "FROM `bigquery-public-data.thelook_ecommerce.order_items`\n", + "LIMIT 5;\n", + "\"\"\"\n", + "\n", + "client = bigquery.Client(project=PROJECT_ID)\n", + "data = client.query(read_query).result().to_dataframe()\n", + "data.head()" + ], + "metadata": { + "id": "Kn7wmiKib-Wx", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 206 + }, + "outputId": "9680fbcc-dcb5-4158-90ae-69a9f3c776d0" + }, + "execution_count": 18, + "outputs": [ + { + "output_type": "execute_result", + "data": { + "text/plain": [ + " user_id product_id sale_price\n", + "0 25005 14235 0.02\n", + "1 62544 14235 0.02\n", + "2 17228 14235 0.02\n", + "3 54015 14235 0.02\n", + "4 16569 14235 0.02" + ], + "text/html": [ + "\n", + "
\n" + ], + "application/vnd.google.colaboratory.intrinsic+json": { + "type": "dataframe", + "variable_name": "data", + "summary": "{\n \"name\": \"data\",\n \"rows\": 5,\n \"fields\": [\n {\n \"column\": \"user_id\",\n \"properties\": {\n \"dtype\": \"string\",\n \"num_unique_values\": 5,\n \"samples\": [\n \"62544\",\n \"16569\",\n \"17228\"\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n },\n {\n \"column\": \"product_id\",\n \"properties\": {\n \"dtype\": \"Int64\",\n \"num_unique_values\": 1,\n \"samples\": [\n \"14235\"\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n },\n {\n \"column\": \"sale_price\",\n \"properties\": {\n \"dtype\": \"number\",\n \"std\": 0.0,\n \"min\": 0.0199999995529651,\n \"max\": 0.0199999995529651,\n \"num_unique_values\": 1,\n \"samples\": [\n 0.0199999995529651\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n }\n ]\n}" + } + }, + "metadata": {}, + "execution_count": 18 + } + ] + }, + { + "cell_type": "code", + "source": [ + "messages = data.to_dict(orient='records')\n", + "\n", + "publisher = pubsub_v1.PublisherClient()\n", + "topic_name = publisher.topic_path(PROJECT_ID, TOPIC)\n", + "subscription_path = publisher.subscription_path(PROJECT_ID, SUBSCRIPTION)\n", + "for message in messages:\n", + " data = json.dumps(message).encode('utf-8')\n", + " publish_future = publisher.publish(topic_name, data)" + ], + "metadata": { + "id": "MaCJwaPexPKZ" + }, + "execution_count": 19, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Use the Vertex AI Feature Store enrichment handler\n", + "\n", + "The [`VertexAIFeatureStoreEnrichmentHandler`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store.html#apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store.VertexAIFeatureStoreEnrichmentHandler) is a built-in handler included in the Apache Beam SDK versions 2.55.0 and later." + ], + "metadata": { + "id": "zPSFEMm02omi" + } + }, + { + "cell_type": "markdown", + "source": [ + "The `VertexAIFeatureStoreEnrichmentHandler` can be configured with the following required parameters:\n", + "\n", + "* `project`: Google Cloud project-id of the feature store.\n", + "* `location`: Location of the feature store. Eg: `us-central1`.\n", + "* `api_endpoint`: Public endpoint of the feature store.\n", + "* `feature_store_name`: The name of the Vertex AI Feature Store.\n", + "* `feature_view_name`: The name of the feature view within the Vertex AI Feature Store.\n", + "* `row_key`: The field name in the input row containing the entity-id for the feature store. This is used to extract the entity-id from each element and use it to fetch feature values for that specific element in the enrichment transform.\n", + "\n", + "Optionally, `VertexAIFeatureStoreEnrichmentHandler` accepts a kwargs to provide more configuration to connect with the Vertex AI client - [`FeatureOnlineStoreServiceClient`](https://cloud.google.com/php/docs/reference/cloud-ai-platform/latest/V1.FeatureOnlineStoreServiceClient).\n", + "\n", + "**Note:** When exceptions occur, by default, the logging severity is set to warning ([`ExceptionLevel.WARN`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.bigtable.html#apache_beam.transforms.enrichment_handlers.bigtable.ExceptionLevel.WARN)). 
To configure the severity to raise exceptions, set `exception_level` to [`ExceptionLevel.RAISE`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.bigtable.html#apache_beam.transforms.enrichment_handlers.bigtable.ExceptionLevel.RAISE). To ignore exceptions, set `exception_level` to [`ExceptionLevel.QUIET`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.bigtable.html#apache_beam.transforms.enrichment_handlers.bigtable.ExceptionLevel.QUIET).\n", + "\n", + "The `VertexAIFeatureStoreEnrichmentHandler` returns the latest feature values from the feature store." + ], + "metadata": { + "id": "K41xhvmA5yQk" + } + }, + { + "cell_type": "code", + "source": [ + "row_key = 'user_id'" + ], + "metadata": { + "id": "3dB26jhI45gd" + }, + "execution_count": 20, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "vertex_ai_handler = VertexAIFeatureStoreEnrichmentHandler(project=PROJECT_ID,\n", + " location=LOCATION,\n", + " api_endpoint = API_ENDPOINT,\n", + " feature_store_name=feature_store_name,\n", + " feature_view_name=feature_view_name,\n", + " row_key=row_key)" + ], + "metadata": { + "id": "cr1j_DHK4gA4" + }, + "execution_count": 21, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Use the enrichment transform\n", + "\n", + "To use the [enrichment transform](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment.html#apache_beam.transforms.enrichment.Enrichment), the [`EnrichmentHandler`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment.html#apache_beam.transforms.enrichment.EnrichmentSourceHandler) parameter is required. You can also use a configuration parameter to specify a `lambda` for a join function, a timeout, a throttler, and a repeater (retry strategy).\n", + "\n", + "\n", + "* `join_fn`: A lambda function that takes dictionaries as input and returns an enriched row (`Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]`). The enriched row specifies how to join the data fetched from the API. Defaults to a [cross-join](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment.html#apache_beam.transforms.enrichment.cross_join).\n", + "* `timeout`: The number of seconds to wait for the request to be completed by the API before timing out. Defaults to 30 seconds.\n", + "* `throttler`: Specifies the throttling mechanism. The only supported option is default client-side adaptive throttling.\n", + "* `repeater`: Specifies the retry strategy when errors like `TooManyRequests` and `TimeoutException` occur. Defaults to [`ExponentialBackOffRepeater`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.ExponentialBackOffRepeater).\n", + "\n", + "\n", + "To utilize the Redis cache, apply the `with_redis_cache` hook to the `Enrichment` transform. The coders for encoding/decoding the input and output for the cache are optional and are internally inferred." 
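As a quick illustration of these options, the sketch below configures the `Enrichment` transform with an explicit timeout, the default exponential back-off repeater, and the Redis cache hook. It reuses the `vertex_ai_handler` defined above; the Redis host and port are placeholders for your own cache instance, and the keyword names follow the Beam 2.55.0 documentation linked above, so adjust them if your SDK version differs.

```python
# A sketch (not used by the pipeline below) showing non-default Enrichment options.
from apache_beam.io.requestresponse import ExponentialBackOffRepeater
from apache_beam.transforms.enrichment import Enrichment

configured_enrichment = Enrichment(
    vertex_ai_handler,                      # handler created earlier in this notebook
    timeout=60,                             # seconds to wait for each feature store request
    repeater=ExponentialBackOffRepeater(),  # retry TooManyRequests/timeout errors with backoff
).with_redis_cache(host='REDIS_HOST', port=6379)  # placeholder Redis endpoint for caching
```

If you want caching and custom retry behavior, use a step like `configured_enrichment` in place of the plain `Enrichment(vertex_ai_handler)` call shown in the pipeline sketch that follows.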
+ ], + "metadata": { + "id": "-Lvo8O2V-0Ey" + } + }, + { + "cell_type": "markdown", + "source": [ + "The following example demonstrates the code needed to add this transform to your pipeline.\n", + "\n", + "\n", + "```\n", + "with beam.Pipeline() as p:\n", + " output = (p\n", + " ...\n", + " | \"Enrich with Vertex AI\" >> Enrichment(vertex_ai_handler)\n", + " | \"RunInference\" >> RunInference(model_handler)\n", + " ...\n", + " )\n", + "```\n", + "\n", + "\n", + "\n", + "\n" + ], + "metadata": { + "id": "xJTCfSmiV1kv" + } + }, + { + "cell_type": "markdown", + "source": [ + "To make a prediction, use the following fields: `product_id`, `quantity`, `price`, `customer_id`, and `customer_location`. Retrieve the value of the `customer_location` field from Bigtable.\n", + "\n", + "The enrichment transform performs a [`cross_join`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment.html#apache_beam.transforms.enrichment.cross_join) by default." + ], + "metadata": { + "id": "F-xjiP_pHWZr" + } + }, + { + "cell_type": "markdown", + "source": [ + "## Use the `VertexAIModelHandlerJSON` interface to run inference\n", + "\n" + ], + "metadata": { + "id": "CX9Cqybu6scV" + } + }, + { + "cell_type": "markdown", + "source": [ + "Since the enrichment transform outputs data in the format `beam.Row`, in order to align it with the `VertexAIModelHandlerJSON` interface, it needs to be converted into a list of `tensorflow.tensor`. Furthermore, certain enriched fields may be of `string` type, but for `tensor` creation, all values should be of the same type. Therefore, convert any `string` type fields to `int` type before creating a tensor." + ], + "metadata": { + "id": "zy5Jl7_gLklX" + } + }, + { + "cell_type": "code", + "source": [ + "def convert_row_to_tensor(element: beam.Row):\n", + " element_dict = element._asdict()\n", + " row = list(element_dict.values())\n", + " for i, r in enumerate(row):\n", + " if isinstance(r, str):\n", + " row[i] = int(r)\n", + " return tf.convert_to_tensor(row, dtype=tf.float32).numpy().tolist()" + ], + "metadata": { + "id": "KBKoB06nL4LF" + }, + "execution_count": 22, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Initialize the model handler with the preprocessing function." + ], + "metadata": { + "id": "-tGHyB_vL3rJ" + } + }, + { + "cell_type": "code", + "source": [ + "model_handler = VertexAIModelHandlerJSON(endpoint_id=model_endpoint_id,\n", + " project=PROJECT_ID,\n", + " location=LOCATION,\n", + " ).with_preprocess_fn(convert_row_to_tensor)" + ], + "metadata": { + "id": "VqUUEwcU-r2e" + }, + "execution_count": 23, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Define a `DoFn` to format the output." + ], + "metadata": { + "id": "vNHI4gVgNec2" + } + }, + { + "cell_type": "code", + "source": [ + "class PostProcessor(beam.DoFn):\n", + " def process(self, element, *args, **kwargs):\n", + " print('Customer %d who bought product %d is recommended to buy product %d' % (element.example[0], element.example[1], math.ceil(element.inference[0])))" + ], + "metadata": { + "id": "rkN-_Yf4Nlwy" + }, + "execution_count": 24, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "0a1zerXycQ0z" + }, + "source": [ + "## Run the pipeline\n" + ] + }, + { + "cell_type": "markdown", + "source": [ + "Configure the pipeline to run in streaming model." 
+ ], + "metadata": { + "id": "WrwY0_gV_IDK" + } + }, + { + "cell_type": "code", + "source": [ + "options = pipeline_options.PipelineOptions()\n", + "options.view_as(pipeline_options.StandardOptions).streaming = True # Streaming mode is set to True" + ], + "metadata": { + "id": "t0425sYBsYtB" + }, + "execution_count": 25, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Pub/Sub sends the data in bytes. Convert the data to `beam.Row` objects by using a `DoFn`." + ], + "metadata": { + "id": "DBNijQDY_dRe" + } + }, + { + "cell_type": "code", + "source": [ + "class DecodeBytes(beam.DoFn):\n", + " \"\"\"\n", + " The DecodeBytes `DoFn` converts the data read from Pub/Sub to `beam.Row`.\n", + " First, decode the encoded string. Convert the output to\n", + " a `dict` with `json.loads()`, which is used to create a `beam.Row`.\n", + " \"\"\"\n", + " def process(self, element, *args, **kwargs):\n", + " element_dict = json.loads(element.decode('utf-8'))\n", + " yield beam.Row(**element_dict)" + ], + "metadata": { + "id": "sRw9iL8pKP5O" + }, + "execution_count": 26, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Use the following code to run the pipeline.\n", + "\n", + "**Note:** Because this pipeline is a streaming pipeline, you need to manually stop the cell. If you don't stop the cell, the pipeline continues to run." + ], + "metadata": { + "id": "xofUJym-_GuB" + } + }, + { + "cell_type": "code", + "execution_count": 27, + "metadata": { + "id": "St07XoibcQSb", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 671 + }, + "outputId": "0ca70756-6a69-4d63-9ab7-8814ae6adf05" + }, + "outputs": [ + { + "output_type": "display_data", + "data": { + "application/javascript": [ + "\n", + " if (typeof window.interactive_beam_jquery == 'undefined') {\n", + " var jqueryScript = document.createElement('script');\n", + " jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n", + " jqueryScript.type = 'text/javascript';\n", + " jqueryScript.onload = function() {\n", + " var datatableScript = document.createElement('script');\n", + " datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n", + " datatableScript.type = 'text/javascript';\n", + " datatableScript.onload = function() {\n", + " window.interactive_beam_jquery = jQuery.noConflict(true);\n", + " window.interactive_beam_jquery(document).ready(function($){\n", + " \n", + " });\n", + " }\n", + " document.head.appendChild(datatableScript);\n", + " };\n", + " document.head.appendChild(jqueryScript);\n", + " } else {\n", + " window.interactive_beam_jquery(document).ready(function($){\n", + " \n", + " });\n", + " }" + ] + }, + "metadata": {} + }, + { + "output_type": "stream", + "name": "stdout", + "text": [ + "Customer 25005 who bought product 14235 is recommended to buy product 8944\n", + "Customer 62544 who bought product 14235 is recommended to buy product 23313\n", + "Customer 17228 who bought product 14235 is recommended to buy product 6600\n", + "Customer 54015 who bought product 14235 is recommended to buy product 19682\n", + "Customer 16569 who bought product 14235 is recommended to buy product 6441\n" + ] + } + ], + "source": [ + "with beam.Pipeline(options=options) as p:\n", + " _ = (p\n", + " | \"Read from Pub/Sub\" >> beam.io.ReadFromPubSub(subscription=subscription_path)\n", + " | \"ConvertToRow\" >> beam.ParDo(DecodeBytes())\n", + " | \"Enrichment\" >> Enrichment(vertex_ai_handler)\n", + " | \"RunInference\" >> RunInference(model_handler)\n", + 
" | \"Format Output\" >> beam.ParDo(PostProcessor())\n", + " )" + ] + }, + { + "cell_type": "markdown", + "source": [ + "## Clean up resources" + ], + "metadata": { + "id": "yDjkq2VI7fuM" + } + }, + { + "cell_type": "code", + "source": [ + "# delete feature views\n", + "admin_client.delete_feature_view(\n", + " name=f\"projects/{PROJECT_ID}/locations/{LOCATION}/featureOnlineStores/{feature_store_name}/featureViews/{feature_view_name}\"\n", + ")\n", + "\n", + "# delete online store instance\n", + "admin_client.delete_feature_online_store(\n", + " name=f\"projects/{PROJECT_ID}/locations/{LOCATION}/featureOnlineStores/{feature_store_name}\",\n", + " force=True,\n", + ")" + ], + "metadata": { + "id": "UiPb_kzv7pCu", + "colab": { + "base_uri": "https://localhost:8080/" + }, + "outputId": "7902c30f-4db0-431b-9dd8-b647b3cb34da" + }, + "execution_count": 8, + "outputs": [ + { + "output_type": "execute_result", + "data": { + "text/plain": [ + "" + ] + }, + "metadata": {}, + "execution_count": 8 + } + ] + } + ], + "metadata": { + "colab": { + "provenance": [] + }, + "kernelspec": { + "display_name": "Python 3", + "name": "python3" + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} \ No newline at end of file From be15238afe4dfec54c5f02b8af06492cde63cd98 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Mon, 25 Mar 2024 11:25:23 -0400 Subject: [PATCH 2/5] address review comments, add to readme --- examples/notebooks/beam-ml/README.md | 5 + .../vertex_ai_feature_store_enrichment.ipynb | 269 ++++++------------ 2 files changed, 90 insertions(+), 184 deletions(-) diff --git a/examples/notebooks/beam-ml/README.md b/examples/notebooks/beam-ml/README.md index f1c19747fc713..82b2a4acf270a 100644 --- a/examples/notebooks/beam-ml/README.md +++ b/examples/notebooks/beam-ml/README.md @@ -56,6 +56,11 @@ This section contains the following example notebooks. * [Use MLTransform to scale data](https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/data_preprocessing/scale_data.ipynb) * [Preprocessing with the Apache Beam DataFrames API](https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/dataframe_api_preprocessing.ipynb) +### Data enrichment + +* [Use Cloud Bigtable to enrich data](https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/bigtable_enrichment_transform.ipynb) +* [Use Vertex AI Feature Store for feature enrichment](https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb) + ### Prediction and inference with pretrained models * [Apache Beam RunInference for PyTorch](https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_pytorch.ipynb) diff --git a/examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb b/examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb index ef41cf11053ed..dc72e520a9d70 100644 --- a/examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb +++ b/examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb @@ -59,15 +59,19 @@ "- The enrichment transform uses client-side throttling to manage rate limiting the requests.\n", "- Optionally, you can configure a Redis cache to improve efficiency.\n", "\n", - "As of version 2.55.0, [online feature serving](https://cloud.google.com/vertex-ai/docs/featurestore/latest/overview#online_serving) via Bigtable online serving and Vertex AI Feature Store (Legacy) method is supported. 
This notebook demonstrates how to use the Bigtable online serving approach with enrichment transform in an Apache Beam pipeline." + "As of Apache Beam SDK version 2.55.0, [online feature serving](https://cloud.google.com/vertex-ai/docs/featurestore/latest/overview#online_serving) through Bigtable online serving and the Vertex AI Feature Store (Legacy) method is supported. This notebook demonstrates how to use the Bigtable online serving approach with the enrichment transform in an Apache Beam pipeline." ] }, { "cell_type": "markdown", "source": [ - "This notebook demonstrates the following ecommerce product recommendation use case based on a BigQuery public dataset - [thelook-ecommerce](https://pantheon.corp.google.com/marketplace/product/bigquery-public-data/thelook-ecommerce):\n", + "This notebook demonstrates an ecommerce product recommendation use case based on the BigQuery public dataset [theLook eCommerce](https://pantheon.corp.google.com/marketplace/product/bigquery-public-data/thelook-ecommerce):\n", "\n", - "A stream of online transactions from [Pub/Sub](https://cloud.google.com/pubsub/docs/guides) contains the following fields: `product_id`, `user_id`, and `sale_price`. A machine learning model is deployed on Vertex AI based on features - `product_id`, `user_id`, `sale_price`, `age`, `gender`, `state`, and `country`. These features values are precomputed and stored in the Vertex AI Online Feature Store. This precomputed data is used to enrich the incoming stream of events from Pub/Sub with demographic information. The enriched data is sent to the Vertex AI model for online prediction, which predicts the product recommendation for the user." + "* Stream of online transactions from [Pub/Sub](https://cloud.google.com/pubsub/docs/guides) contains the following fields: `product_id`, `user_id`, and `sale_price`.\n", + "* Pretrained model is deployed on Vertex AI based on the features - `product_id`, `user_id`, `sale_price`, `age`, `gender`, `state`, and `country`.\n", + "* Feature values for the pretrained model are precomputed and stored in the Vertex AI Feature Store.\n", + "* Enrich the stream of transactions from Pub/Sub with feature values from Vertex AI Feature Store using the `Enrichment` transform.\n", + "* Send the enriched data to the Vertex AI model for online prediction using `RunInference` transform, which predicts the product recommendation for the user." ], "metadata": { "id": "ltn5zrBiGS9C" @@ -105,7 +109,7 @@ "!pip install apache_beam[interactive,gcp]==2.55.0 --quiet\n", "!pip install redis\n", "\n", - "# Use tensorflow 2.13.0 since it is the latest version that has the prebuilt\n", + "# Use TensorFlow 2.13.0, because it is the latest version that has the prebuilt\n", "# container image for Vertex AI model deployment.\n", "# See https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers#tensorflow\n", "!pip install tensorflow==2.13" @@ -151,7 +155,7 @@ "metadata": { "id": "SiJii48A2Rnb" }, - "execution_count": 1, + "execution_count": null, "outputs": [] }, { @@ -166,7 +170,7 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": null, "metadata": { "id": "Kz9sccyGBqz3" }, @@ -187,7 +191,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": null, "metadata": { "id": "wEXucyi2liij" }, @@ -210,7 +214,7 @@ { "cell_type": "markdown", "source": [ - "Fetcht the training data from BigQuery public dataset [thelook-ecommerce](https://pantheon.corp.google.com/marketplace/product/bigquery-public-data/thelook-ecommerce)." 
+ "Fetch the training data from the BigQuery public dataset [thelook-ecommerce](https://pantheon.corp.google.com/marketplace/product/bigquery-public-data/thelook-ecommerce)." ], "metadata": { "id": "8cUpV7mkB_xE" @@ -251,7 +255,7 @@ }, "outputId": "4f7afe32-a72b-40d3-b9ae-cc999ad104b8" }, - "execution_count": 4, + "execution_count": null, "outputs": [ { "output_type": "execute_result", @@ -571,7 +575,7 @@ { "cell_type": "markdown", "source": [ - "Create a prediction dataframe that contains the `product_id` a user should be recommended to buy. Also, preprocess the data for columns that contain the categorical values." + "Create a prediction dataframe that contains the `product_id` to recommend to the user. Also, preprocess the data for columns that contain the categorical values." ], "metadata": { "id": "OkYcJPC0THoV" @@ -580,10 +584,10 @@ { "cell_type": "code", "source": [ - "# create a prediction dataframe\n", + "# Create a prediction dataframe.\n", "prediction_data = train_data['product_id'].sample(frac=1, replace=True)\n", "\n", - "# preprocess data to handle categorical values\n", + "# Preprocess data to handle categorical values.\n", "train_data['gender'] = pd.factorize(train_data['gender'])[0]\n", "train_data['state'] = pd.factorize(train_data['state'])[0]\n", "train_data['country'] = pd.factorize(train_data['country'])[0]\n", @@ -597,7 +601,7 @@ }, "outputId": "44cfd7f1-0c7c-40a8-813f-02af86a6f788" }, - "execution_count": 5, + "execution_count": null, "outputs": [ { "output_type": "execute_result", @@ -917,7 +921,7 @@ { "cell_type": "markdown", "source": [ - "Convert dataframe to tensors." + "Convert the dataframe to tensors." ], "metadata": { "id": "7ffoopdQVk8W" @@ -932,13 +936,13 @@ "metadata": { "id": "vmHH26KDVkuf" }, - "execution_count": 6, + "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ - "Based on this data, build a simple neural network model using tensorflow." + "Based on this data, build a basic neural network model by using TensorFlow." ], "metadata": { "id": "CRoW8ElNV4I9" @@ -957,13 +961,13 @@ "metadata": { "id": "EKrb13wsV3m4" }, - "execution_count": 7, + "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ - "Train the model (Takes approx 1m 30 secs for 1 epoch)." + "Train the model. This step takes about 90 seconds for one epoch." ], "metadata": { "id": "Duv4qzmEWFSZ" @@ -1005,10 +1009,10 @@ { "cell_type": "code", "source": [ - "# create a new directory for saving the model.\n", + "# Create a new directory to save the model.\n", "!mkdir model\n", "\n", - "# save the model.\n", + "# Save the model.\n", "MODEL_PATH = './model/'\n", "tf.saved_model.save(model, MODEL_PATH)" ], @@ -1021,7 +1025,7 @@ { "cell_type": "markdown", "source": [ - "Stage the locally saved model to GCS bucket. We will use this GCS bucket to deploy the model to Vertex AI." + "Stage the locally saved model to a Google Cloud Storage bucket. Use this Cloud Storage bucket to deploy the model to Vertex AI." ], "metadata": { "id": "hsJOxFTWj6JX" @@ -1042,7 +1046,7 @@ { "cell_type": "code", "source": [ - "# stage to GCS bucket\n", + "# Stage to the Cloud Storage bucket.\n", "import glob\n", "from google.cloud import storage\n", "client = storage.Client(project=PROJECT_ID)\n", @@ -1067,7 +1071,7 @@ { "cell_type": "markdown", "source": [ - "Upload the model saved in GCS bucket to Vertex AI Model Registry." + "Upload the model saved in the Cloud Storage bucket to Vertex AI Model Registry." 
], "metadata": { "id": "O72h009kl_-L" @@ -1081,7 +1085,7 @@ "metadata": { "id": "bKN5pUD3uImj" }, - "execution_count": 4, + "execution_count": null, "outputs": [] }, { @@ -1098,7 +1102,7 @@ "metadata": { "id": "Pp3Jca9GfpEj" }, - "execution_count": 5, + "execution_count": null, "outputs": [] }, { @@ -1124,27 +1128,15 @@ }, "outputId": "bfd954c0-8267-476d-dd0c-15e612ae0cc1" }, - "execution_count": 30, - "outputs": [ - { - "output_type": "stream", - "name": "stderr", - "text": [ - "INFO:google.cloud.aiplatform.models:Creating Endpoint\n", - "INFO:google.cloud.aiplatform.models:Create Endpoint backing LRO: projects/927334603519/locations/us-central1/endpoints/5369128583685996544/operations/3775856005049483264\n", - "INFO:google.cloud.aiplatform.models:Endpoint created. Resource name: projects/927334603519/locations/us-central1/endpoints/5369128583685996544\n", - "INFO:google.cloud.aiplatform.models:To use this Endpoint in another session:\n", - "INFO:google.cloud.aiplatform.models:endpoint = aiplatform.Endpoint('projects/927334603519/locations/us-central1/endpoints/5369128583685996544')\n" - ] - } - ] + "execution_count": null, + "outputs": [] }, { "cell_type": "markdown", "source": [ - "Deploy model to the Vertex AI endpoint.\n", + "Deploy the model to the Vertex AI endpoint.\n", "\n", - "**Note:** This step is a long running operation (LRO). It may take more than 5 minutes to complete depending on the size of the model." + "**Note:** This step is a Long Running Operation (LRO). Depending on the size of the model, it might take more than five minutes to complete." ], "metadata": { "id": "WgSpy0J3oBFP" @@ -1161,7 +1153,7 @@ "metadata": { "id": "FLQtMVQjnsls" }, - "execution_count": 6, + "execution_count": null, "outputs": [] }, { @@ -1177,7 +1169,7 @@ "id": "3JjIwzZouAi5", "outputId": "ffb1fb74-365a-426b-d60d-d3910c116e10" }, - "execution_count": 7, + "execution_count": null, "outputs": [ { "output_type": "stream", @@ -1200,7 +1192,7 @@ { "cell_type": "markdown", "source": [ - "Set up feature data in BigQuery." + "Set up the feature data in BigQuery." ], "metadata": { "id": "B1Bk7XP7190z" @@ -1218,12 +1210,12 @@ "FROM `bigquery-public-data.thelook_ecommerce.users`\n", "\"\"\"\n", "\n", - "# Fetch feature values from BigQuery\n", + "# Fetch feature values from BigQuery.\n", "client = bigquery.Client(project=PROJECT_ID)\n", "data = client.query(feature_store_query).result().to_dataframe()\n", "\n", - "# Convert feature values to string type. This helps in creating tensor\n", - "# of these values for inference that requires same data type.\n", + "# Convert feature values to the string type. This step helps when creating tensor\n", + "# of these values for inference that requires the same data type.\n", "data['gender'] = pd.factorize(data['gender'])[0]\n", "data['gender'] = data['gender'].astype(str)\n", "data['state'] = pd.factorize(data['state'])[0]\n", @@ -1240,7 +1232,7 @@ }, "outputId": "187ee1e8-07c9-457a-abbe-fab724d997ce" }, - "execution_count": 8, + "execution_count": null, "outputs": [ { "output_type": "execute_result", @@ -1549,7 +1541,7 @@ { "cell_type": "markdown", "source": [ - "Create a BigQuery dataset that will serve as the source for the Vertex AI Feature Store." + "Create a BigQuery dataset to use as the source for the Vertex AI Feature Store." 
], "metadata": { "id": "Mm-HCUaa3ROZ" @@ -1574,21 +1566,13 @@ }, "outputId": "437597af-837d-483e-8c1e-ebbe0eca81e0" }, - "execution_count": 9, - "outputs": [ - { - "output_type": "stream", - "name": "stdout", - "text": [ - "Created dataset - Dataset(DatasetReference('google.com:clouddfe', 'vertexai_enrichment')).vertexai_enrichment\n" - ] - } - ] + "execution_count": null, + "outputs": [] }, { "cell_type": "markdown", "source": [ - "Create BigQuery View with the precomputed feature values." + "Create a BigQuery view with the precomputed feature values." ], "metadata": { "id": "7lKiprPX4AZy" @@ -1605,7 +1589,7 @@ "metadata": { "id": "xqaLPTxb4DDF" }, - "execution_count": 10, + "execution_count": null, "outputs": [] }, { @@ -1632,7 +1616,7 @@ "metadata": { "id": "GF_eIl-wVvRy" }, - "execution_count": 11, + "execution_count": null, "outputs": [] }, { @@ -1674,24 +1658,13 @@ }, "outputId": "7f4ed1d9-c0c4-4c3c-f199-1e340d2cff11" }, - "execution_count": 12, - "outputs": [ - { - "output_type": "execute_result", - "data": { - "text/plain": [ - "name: \"projects/927334603519/locations/us-central1/featureOnlineStores/vertexai_enrichment\"" - ] - }, - "metadata": {}, - "execution_count": 12 - } - ] + "execution_count": null, + "outputs": [] }, { "cell_type": "markdown", "source": [ - "For the store instances created above, create feature views using BigQuery as the data source." + "For the store instances created previously, use BigQuery as the data source to create feature views." ], "metadata": { "id": "DAHjWlqXXLU_" @@ -1725,24 +1698,13 @@ }, "outputId": "84facd77-5be4-4c99-90b5-d8ccb4c5d702" }, - "execution_count": 13, - "outputs": [ - { - "output_type": "execute_result", - "data": { - "text/plain": [ - "name: \"projects/927334603519/locations/us-central1/featureOnlineStores/vertexai_enrichment/featureViews/users\"" - ] - }, - "metadata": {}, - "execution_count": 13 - } - ] + "execution_count": null, + "outputs": [] }, { "cell_type": "markdown", "source": [ - "Pull feature values into the feature store from BigQuery." + "Pull feature values from BigQuery into the feature store." ], "metadata": { "id": "qbf4l8eBX6NG" @@ -1758,7 +1720,7 @@ "metadata": { "id": "gdpsLCmMX7fX" }, - "execution_count": 14, + "execution_count": null, "outputs": [] }, { @@ -1785,7 +1747,7 @@ { "cell_type": "markdown", "source": [ - "Check if the sync was created." + "Confirm the sync creation." ], "metadata": { "id": "T3MMx7oJYPeC" @@ -1805,45 +1767,15 @@ }, "outputId": "d2160812-9874-40bb-f464-f797eafb9999" }, - "execution_count": 16, - "outputs": [ - { - "output_type": "execute_result", - "data": { - "text/plain": [ - "ListFeatureViewSyncsPager" - ] - }, - "metadata": {}, - "execution_count": 16 - } - ] + "execution_count": null, + "outputs": [] }, { "cell_type": "markdown", "source": [ "### Publish messages to Pub/Sub\n", "\n", - "Use the Pub/Sub python client to publish messages.\n" + "Use the Pub/Sub Python client to publish messages.\n" ], "metadata": { "id": "pHODouJDwc60" @@ -1861,13 +1793,13 @@ "metadata": { "id": "QKCuwDioxw-f" }, - "execution_count": 17, + "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ - "Retrieve sample data from a public dataset in BigQuery and convert it into Python dictionaries before sending it to Pub/Sub." + "Retrieve sample data from a public dataset in BigQuery. Convert it into Python dictionaries, and then send it to Pub/Sub." 
], "metadata": { "id": "R0QYsOYFb_EU" @@ -1896,7 +1828,7 @@ }, "outputId": "9680fbcc-dcb5-4158-90ae-69a9f3c776d0" }, - "execution_count": 18, + "execution_count": null, "outputs": [ { "output_type": "execute_result", @@ -2205,7 +2137,7 @@ "metadata": { "id": "MaCJwaPexPKZ" }, - "execution_count": 19, + "execution_count": null, "outputs": [] }, { @@ -2222,20 +2154,20 @@ { "cell_type": "markdown", "source": [ - "The `VertexAIFeatureStoreEnrichmentHandler` can be configured with the following required parameters:\n", + "Configure the `VertexAIFeatureStoreEnrichmentHandler` with the following required parameters:\n", "\n", - "* `project`: Google Cloud project-id of the feature store.\n", - "* `location`: Location of the feature store. Eg: `us-central1`.\n", - "* `api_endpoint`: Public endpoint of the feature store.\n", - "* `feature_store_name`: The name of the Vertex AI Feature Store.\n", - "* `feature_view_name`: The name of the feature view within the Vertex AI Feature Store.\n", - "* `row_key`: The field name in the input row containing the entity-id for the feature store. This is used to extract the entity-id from each element and use it to fetch feature values for that specific element in the enrichment transform.\n", + "* `project`: the Google Cloud project ID for the feature store\n", + "* `location`: the region of the feature store, for example `us-central1`\n", + "* `api_endpoint`: the public endpoint of the feature store\n", + "* `feature_store_name`: the name of the Vertex AI Feature Store\n", + "* `feature_view_name`: the name of the feature view within the Vertex AI Feature Store\n", + "* `row_key`: The field name in the input row containing the entity ID for the feature store. This value is used to extract the entity ID from each element. The entity ID is used to fetch feature values for that specific element in the enrichment transform.\n", "\n", - "Optionally, `VertexAIFeatureStoreEnrichmentHandler` accepts a kwargs to provide more configuration to connect with the Vertex AI client - [`FeatureOnlineStoreServiceClient`](https://cloud.google.com/php/docs/reference/cloud-ai-platform/latest/V1.FeatureOnlineStoreServiceClient).\n", + "Optionally, to provide more configuration values to connect with the Vertex AI client, the `VertexAIFeatureStoreEnrichmentHandler` accepts a kwargs. For more information, see [`FeatureOnlineStoreServiceClient`](https://cloud.google.com/php/docs/reference/cloud-ai-platform/latest/V1.FeatureOnlineStoreServiceClient).\n", "\n", - "**Note:** When exceptions occur, by default, the logging severity is set to warning ([`ExceptionLevel.WARN`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.bigtable.html#apache_beam.transforms.enrichment_handlers.bigtable.ExceptionLevel.WARN)). To configure the severity to raise exceptions, set `exception_level` to [`ExceptionLevel.RAISE`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.bigtable.html#apache_beam.transforms.enrichment_handlers.bigtable.ExceptionLevel.RAISE). 
To ignore exceptions, set `exception_level` to [`ExceptionLevel.QUIET`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.bigtable.html#apache_beam.transforms.enrichment_handlers.bigtable.ExceptionLevel.QUIET).\n", + "**Note:** When exceptions occur, by default, the logging severity is set to warning ([`ExceptionLevel.WARN`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.utils.html#apache_beam.transforms.enrichment_handlers.utils.ExceptionLevel.WARN)). To configure the severity to raise exceptions, set `exception_level` to [`ExceptionLevel.RAISE`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.utils.html#apache_beam.transforms.enrichment_handlers.utils.ExceptionLevel.RAISE). To ignore exceptions, set `exception_level` to [`ExceptionLevel.QUIET`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.utils.html#apache_beam.transforms.enrichment_handlers.utils.ExceptionLevel.QUIET).\n", "\n", - "The `VertexAIFeatureStoreEnrichmentHandler` returns the latest feature values from the feature store." + "The `VertexAIFeatureStoreEnrichmentHandler` handler returns the latest feature values from the feature store." ], "metadata": { "id": "K41xhvmA5yQk" @@ -2249,7 +2181,7 @@ "metadata": { "id": "3dB26jhI45gd" }, - "execution_count": 20, + "execution_count": null, "outputs": [] }, { @@ -2265,7 +2197,7 @@ "metadata": { "id": "cr1j_DHK4gA4" }, - "execution_count": 21, + "execution_count": null, "outputs": [] }, { @@ -2282,7 +2214,7 @@ "* `repeater`: Specifies the retry strategy when errors like `TooManyRequests` and `TimeoutException` occur. Defaults to [`ExponentialBackOffRepeater`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.requestresponse.html#apache_beam.io.requestresponse.ExponentialBackOffRepeater).\n", "\n", "\n", - "To utilize the Redis cache, apply the `with_redis_cache` hook to the `Enrichment` transform. The coders for encoding/decoding the input and output for the cache are optional and are internally inferred." + "To use the Redis cache, apply the `with_redis_cache` hook to the enrichment transform. The coders for encoding and decoding the input and output for the cache are optional and are internally inferred." ], "metadata": { "id": "-Lvo8O2V-0Ey" @@ -2336,7 +2268,7 @@ { "cell_type": "markdown", "source": [ - "Since the enrichment transform outputs data in the format `beam.Row`, in order to align it with the `VertexAIModelHandlerJSON` interface, it needs to be converted into a list of `tensorflow.tensor`. Furthermore, certain enriched fields may be of `string` type, but for `tensor` creation, all values should be of the same type. Therefore, convert any `string` type fields to `int` type before creating a tensor." + "Because the enrichment transform outputs data in the format `beam.Row`, in order to align it with the [`VertexAIModelHandlerJSON`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.vertex_ai_inference.html#apache_beam.ml.inference.vertex_ai_inference.VertexAIModelHandlerJSON) interface, convert the output into a list of `tensorflow.tensor`. Some enriched fields are of `string` type. For tensor creation, all values must be of the same type. Therefore, convert any `string` type fields to `int` type fields before creating a tensor." 
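Referring back to the enrichment transform described above, the enrichment step can also be exercised in isolation before the conversion and inference stages are added. This is a minimal sketch that composes the `Enrichment` transform with the `vertex_ai_handler` configured earlier; the hard-coded transaction values are illustrative, and the Redis cache hook shown in the comment is optional with a placeholder host.

```python
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment

# Enrich one hard-coded transaction with the Vertex AI Feature Store handler.
# The user_id, product_id, and sale_price values are illustrative only.
with beam.Pipeline() as p:
    _ = (
        p
        | "Create sample" >> beam.Create(
            [beam.Row(user_id='25005', product_id=14235, sale_price=0.02)])
        | "Enrich" >> Enrichment(vertex_ai_handler, timeout=30)
        # To cache lookups, chain the hook instead:
        # Enrichment(vertex_ai_handler).with_redis_cache('localhost', 6379)
        | "Print" >> beam.Map(print)
    )
```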
], "metadata": { "id": "zy5Jl7_gLklX" @@ -2356,7 +2288,7 @@ "metadata": { "id": "KBKoB06nL4LF" }, - "execution_count": 22, + "execution_count": null, "outputs": [] }, { @@ -2379,7 +2311,7 @@ "metadata": { "id": "VqUUEwcU-r2e" }, - "execution_count": 23, + "execution_count": null, "outputs": [] }, { @@ -2401,7 +2333,7 @@ "metadata": { "id": "rkN-_Yf4Nlwy" }, - "execution_count": 24, + "execution_count": null, "outputs": [] }, { @@ -2416,7 +2348,7 @@ { "cell_type": "markdown", "source": [ - "Configure the pipeline to run in streaming model." + "Configure the pipeline to run in streaming mode." ], "metadata": { "id": "WrwY0_gV_IDK" @@ -2431,7 +2363,7 @@ "metadata": { "id": "t0425sYBsYtB" }, - "execution_count": 25, + "execution_count": null, "outputs": [] }, { @@ -2459,7 +2391,7 @@ "metadata": { "id": "sRw9iL8pKP5O" }, - "execution_count": 26, + "execution_count": null, "outputs": [] }, { @@ -2475,7 +2407,7 @@ }, { "cell_type": "code", - "execution_count": 27, + "execution_count": null, "metadata": { "id": "St07XoibcQSb", "colab": { @@ -2485,37 +2417,6 @@ "outputId": "0ca70756-6a69-4d63-9ab7-8814ae6adf05" }, "outputs": [ - { - "output_type": "display_data", - "data": { - "application/javascript": [ - "\n", - " if (typeof window.interactive_beam_jquery == 'undefined') {\n", - " var jqueryScript = document.createElement('script');\n", - " jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n", - " jqueryScript.type = 'text/javascript';\n", - " jqueryScript.onload = function() {\n", - " var datatableScript = document.createElement('script');\n", - " datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n", - " datatableScript.type = 'text/javascript';\n", - " datatableScript.onload = function() {\n", - " window.interactive_beam_jquery = jQuery.noConflict(true);\n", - " window.interactive_beam_jquery(document).ready(function($){\n", - " \n", - " });\n", - " }\n", - " document.head.appendChild(datatableScript);\n", - " };\n", - " document.head.appendChild(jqueryScript);\n", - " } else {\n", - " window.interactive_beam_jquery(document).ready(function($){\n", - " \n", - " });\n", - " }" - ] - }, - "metadata": {} - }, { "output_type": "stream", "name": "stdout", @@ -2551,12 +2452,12 @@ { "cell_type": "code", "source": [ - "# delete feature views\n", + "# Delete feature views.\n", "admin_client.delete_feature_view(\n", " name=f\"projects/{PROJECT_ID}/locations/{LOCATION}/featureOnlineStores/{feature_store_name}/featureViews/{feature_view_name}\"\n", ")\n", "\n", - "# delete online store instance\n", + "# Delete online store instance.\n", "admin_client.delete_feature_online_store(\n", " name=f\"projects/{PROJECT_ID}/locations/{LOCATION}/featureOnlineStores/{feature_store_name}\",\n", " force=True,\n", @@ -2569,7 +2470,7 @@ }, "outputId": "7902c30f-4db0-431b-9dd8-b647b3cb34da" }, - "execution_count": 8, + "execution_count": null, "outputs": [ { "output_type": "execute_result", From ada63baa3e0839cd2a80e4a5895841c2255a49a1 Mon Sep 17 00:00:00 2001 From: Ritesh Ghorse Date: Mon, 25 Mar 2024 13:31:45 -0400 Subject: [PATCH 3/5] Update examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb Co-authored-by: Rebecca Szper <98840847+rszper@users.noreply.github.com> --- .../notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb b/examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb index 
dc72e520a9d70..ca6b47fac0c0d 100644 --- a/examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb +++ b/examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb @@ -65,7 +65,7 @@ { "cell_type": "markdown", "source": [ - "This notebook demonstrates an ecommerce product recommendation use case based on the BigQuery public dataset [theLook eCommerce](https://pantheon.corp.google.com/marketplace/product/bigquery-public-data/thelook-ecommerce):\n", + "This notebook demonstrates the following ecommerce product recommendation use case based on the BigQuery public dataset [theLook eCommerce](https://pantheon.corp.google.com/marketplace/product/bigquery-public-data/thelook-ecommerce):\n", "\n", "* Stream of online transactions from [Pub/Sub](https://cloud.google.com/pubsub/docs/guides) contains the following fields: `product_id`, `user_id`, and `sale_price`.\n", "* Pretrained model is deployed on Vertex AI based on the features - `product_id`, `user_id`, `sale_price`, `age`, `gender`, `state`, and `country`.\n", From db114f9f70b1e96b0c16fd5845bb51d86eee14b4 Mon Sep 17 00:00:00 2001 From: Ritesh Ghorse Date: Mon, 25 Mar 2024 15:49:05 -0400 Subject: [PATCH 4/5] Update examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb Co-authored-by: Rebecca Szper <98840847+rszper@users.noreply.github.com> --- .../notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb b/examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb index ca6b47fac0c0d..cce8242525764 100644 --- a/examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb +++ b/examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb @@ -1025,7 +1025,7 @@ { "cell_type": "markdown", "source": [ - "Stage the locally saved model to a Google Cloud Storage bucket. Use this Cloud Storage bucket to deploy the model to Vertex AI." + "Stage the locally saved model to a Google Cloud Storage bucket. Use this Cloud Storage bucket to deploy the model to Vertex AI. Replace `` with the name of your Cloud Storage bucket. Replace `` with the path to your Cloud Storage bucket." ], "metadata": { "id": "hsJOxFTWj6JX" From 442361f8b1fc581a61c3ec4f9b8dde3366605f93 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Mon, 25 Mar 2024 15:52:38 -0400 Subject: [PATCH 5/5] update content from review --- examples/notebooks/beam-ml/README.md | 2 +- .../vertex_ai_feature_store_enrichment.ipynb | 1032 ++++++++--------- 2 files changed, 517 insertions(+), 517 deletions(-) diff --git a/examples/notebooks/beam-ml/README.md b/examples/notebooks/beam-ml/README.md index 82b2a4acf270a..8f723c2c91490 100644 --- a/examples/notebooks/beam-ml/README.md +++ b/examples/notebooks/beam-ml/README.md @@ -58,7 +58,7 @@ This section contains the following example notebooks. 
### Data enrichment -* [Use Cloud Bigtable to enrich data](https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/bigtable_enrichment_transform.ipynb) +* [Use Bigtable to enrich data](https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/bigtable_enrichment_transform.ipynb) * [Use Vertex AI Feature Store for feature enrichment](https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb) ### Prediction and inference with pretrained models diff --git a/examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb b/examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb index dc72e520a9d70..8f201d4b9c276 100644 --- a/examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb +++ b/examples/notebooks/beam-ml/vertex_ai_feature_store_enrichment.ipynb @@ -4,8 +4,8 @@ "cell_type": "code", "execution_count": null, "metadata": { - "id": "fFjof1NgAJwu", - "cellView": "form" + "cellView": "form", + "id": "fFjof1NgAJwu" }, "outputs": [], "source": [ @@ -56,26 +56,26 @@ "This notebook shows how to enrich data by using the Apache Beam [enrichment transform](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment/) with [Vertex AI Feature Store](https://cloud.google.com/vertex-ai/docs). The enrichment transform is a turnkey transform in Apache Beam that lets you enrich data using a key-value lookup. This transform has the following features:\n", "\n", "- The transform has a built-in Apache Beam handler that interacts with Vertex AI to get precomputed feature values.\n", - "- The enrichment transform uses client-side throttling to manage rate limiting the requests.\n", + "- The transform uses client-side throttling to manage rate limiting the requests.\n", "- Optionally, you can configure a Redis cache to improve efficiency.\n", "\n", - "As of Apache Beam SDK version 2.55.0, [online feature serving](https://cloud.google.com/vertex-ai/docs/featurestore/latest/overview#online_serving) through Bigtable online serving and the Vertex AI Feature Store (Legacy) method is supported. This notebook demonstrates how to use the Bigtable online serving approach with the enrichment transform in an Apache Beam pipeline." + "As of Apache Beam SDK version 2.55.0, [online feature serving](https://cloud.google.com/vertex-ai/docs/featurestore/latest/overview#online_serving) through Bigtable online serving and the Vertex AI Feature Store (legacy) method is supported. This notebook demonstrates how to use the Bigtable online serving approach with the enrichment transform in an Apache Beam pipeline." 
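At a high level, the streaming pipeline assembled over the rest of this notebook has roughly the shape sketched below. This is a roadmap only, not a runnable first cell: `options`, `DecodeBytes`, `vertex_ai_handler`, `model_handler`, and `PostProcessor` are all defined in later cells, and `SUBSCRIPTION` is your Pub/Sub subscription path.

```python
import apache_beam as beam
from apache_beam.ml.inference.base import RunInference
from apache_beam.transforms.enrichment import Enrichment

# Roadmap of the streaming pipeline built in this notebook. Every name used
# here is defined in later cells; run those cells first.
with beam.Pipeline(options=options) as p:
    _ = (
        p
        | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(subscription=SUBSCRIPTION)
        | "Decode bytes to Row" >> beam.ParDo(DecodeBytes())
        | "Enrich from feature store" >> Enrichment(vertex_ai_handler)
        | "Run inference" >> RunInference(model_handler)
        | "Format output" >> beam.ParDo(PostProcessor())
    )
```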
] }, { "cell_type": "markdown", + "metadata": { + "id": "ltn5zrBiGS9C" + }, "source": [ "This notebook demonstrates an ecommerce product recommendation use case based on the BigQuery public dataset [theLook eCommerce](https://pantheon.corp.google.com/marketplace/product/bigquery-public-data/thelook-ecommerce):\n", "\n", - "* Stream of online transactions from [Pub/Sub](https://cloud.google.com/pubsub/docs/guides) contains the following fields: `product_id`, `user_id`, and `sale_price`.\n", - "* Pretrained model is deployed on Vertex AI based on the features - `product_id`, `user_id`, `sale_price`, `age`, `gender`, `state`, and `country`.\n", - "* Feature values for the pretrained model are precomputed and stored in the Vertex AI Feature Store.\n", - "* Enrich the stream of transactions from Pub/Sub with feature values from Vertex AI Feature Store using the `Enrichment` transform.\n", - "* Send the enriched data to the Vertex AI model for online prediction using `RunInference` transform, which predicts the product recommendation for the user." - ], - "metadata": { - "id": "ltn5zrBiGS9C" - } + "* Use a stream of online transactions from [Pub/Sub](https://cloud.google.com/pubsub/docs/guides) that contains the following fields: `product_id`, `user_id`, and `sale_price`.\n", + "* Deploy a pretrained model on Vertex AI based on the features `product_id`, `user_id`, `sale_price`, `age`, `gender`, `state`, and `country`.\n", + "* Precompute the feature values for the pretrained model, and store the values in the Vertex AI Feature Store.\n", + "* Enrich the stream of transactions from Pub/Sub with feature values from Vertex AI Feature Store by using the `Enrichment` transform.\n", + "* Send the enriched data to the Vertex AI model for online prediction by using the `RunInference` transform, which predicts the product recommendation for the user." + ] }, { "cell_type": "markdown", @@ -101,8 +101,8 @@ "cell_type": "code", "execution_count": null, "metadata": { - "id": "jBakpNZnAhqk", - "collapsed": true + "collapsed": true, + "id": "jBakpNZnAhqk" }, "outputs": [], "source": [ @@ -117,6 +117,11 @@ }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "SiJii48A2Rnb" + }, + "outputs": [], "source": [ "import json\n", "import math\n", @@ -151,12 +156,7 @@ "from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store import VertexAIFeatureStoreEnrichmentHandler\n", "from tensorflow import keras\n", "from tensorflow.keras import layers" - ], - "metadata": { - "id": "SiJii48A2Rnb" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", @@ -182,12 +182,12 @@ }, { "cell_type": "markdown", - "source": [ - "Replace `` and `` with the appropriate values for your Google Cloud account." - ], "metadata": { "id": "nAmGgUMt48o9" - } + }, + "source": [ + "Replace `` and `` with the appropriate values for your Google Cloud account." + ] }, { "cell_type": "code", @@ -203,71 +203,41 @@ }, { "cell_type": "markdown", + "metadata": { + "id": "RpqZFfFfA_Dt" + }, "source": [ "### Train and deploy the model to Vertex AI\n", "\n" - ], - "metadata": { - "id": "RpqZFfFfA_Dt" - } + ] }, { "cell_type": "markdown", - "source": [ - "Fetch the training data from the BigQuery public dataset [thelook-ecommerce](https://pantheon.corp.google.com/marketplace/product/bigquery-public-data/thelook-ecommerce)." 
- ], "metadata": { "id": "8cUpV7mkB_xE" - } + }, + "source": [ + "Fetch the training data from the BigQuery public dataset [thelook-ecommerce](https://pantheon.corp.google.com/marketplace/product/bigquery-public-data/thelook-ecommerce)." + ] }, { "cell_type": "code", - "source": [ - "train_data_query = \"\"\"\n", - "WITH\n", - " order_items AS (\n", - " SELECT cast(user_id as string) AS user_id,\n", - " product_id,\n", - " sale_price,\n", - " FROM `bigquery-public-data.thelook_ecommerce.order_items`),\n", - " users AS (\n", - " SELECT cast(id as string) AS user_id,\n", - " age,\n", - " lower(gender) as gender,\n", - " lower(state) as state,\n", - " lower(country) as country,\n", - " FROM `bigquery-public-data.thelook_ecommerce.users`)\n", - "SELECT *\n", - "FROM order_items\n", - "LEFT OUTER JOIN users\n", - "USING (user_id)\n", - "\"\"\"\n", - "\n", - "client = bigquery.Client(project=PROJECT_ID)\n", - "train_data = client.query(train_data_query).result().to_dataframe()\n", - "train_data.head()" - ], + "execution_count": null, "metadata": { - "id": "TpxDHGObBEsj", "colab": { "base_uri": "https://localhost:8080/", "height": 206 }, + "id": "TpxDHGObBEsj", "outputId": "4f7afe32-a72b-40d3-b9ae-cc999ad104b8" }, - "execution_count": null, "outputs": [ { - "output_type": "execute_result", "data": { - "text/plain": [ - " user_id product_id sale_price age gender state country\n", - "0 68717 14235 0.02 43 f sachsen germany\n", - "1 59866 28700 1.50 17 m chongqing china\n", - "2 38322 14202 1.50 47 f missouri united states\n", - "3 7839 28700 1.50 64 m mato grosso brasil\n", - "4 40877 28700 1.50 68 m sergipe brasil" - ], + "application/vnd.google.colaboratory.intrinsic+json": { + "type": "dataframe", + "variable_name": "train_data" + }, "text/html": [ "\n", "
\n", @@ -562,58 +532,73 @@ "
\n", " \n" ], - "application/vnd.google.colaboratory.intrinsic+json": { - "type": "dataframe", - "variable_name": "train_data" - } + "text/plain": [ + " user_id product_id sale_price age gender state country\n", + "0 68717 14235 0.02 43 f sachsen germany\n", + "1 59866 28700 1.50 17 m chongqing china\n", + "2 38322 14202 1.50 47 f missouri united states\n", + "3 7839 28700 1.50 64 m mato grosso brasil\n", + "4 40877 28700 1.50 68 m sergipe brasil" + ] }, + "execution_count": 4, "metadata": {}, - "execution_count": 4 + "output_type": "execute_result" } + ], + "source": [ + "train_data_query = \"\"\"\n", + "WITH\n", + " order_items AS (\n", + " SELECT cast(user_id as string) AS user_id,\n", + " product_id,\n", + " sale_price,\n", + " FROM `bigquery-public-data.thelook_ecommerce.order_items`),\n", + " users AS (\n", + " SELECT cast(id as string) AS user_id,\n", + " age,\n", + " lower(gender) as gender,\n", + " lower(state) as state,\n", + " lower(country) as country,\n", + " FROM `bigquery-public-data.thelook_ecommerce.users`)\n", + "SELECT *\n", + "FROM order_items\n", + "LEFT OUTER JOIN users\n", + "USING (user_id)\n", + "\"\"\"\n", + "\n", + "client = bigquery.Client(project=PROJECT_ID)\n", + "train_data = client.query(train_data_query).result().to_dataframe()\n", + "train_data.head()" ] }, { "cell_type": "markdown", - "source": [ - "Create a prediction dataframe that contains the `product_id` to recommend to the user. Also, preprocess the data for columns that contain the categorical values." - ], "metadata": { "id": "OkYcJPC0THoV" - } + }, + "source": [ + "Create a prediction dataframe that contains the `product_id` to recommend to the user. Preprocess the data for columns that contain the categorical values." + ] }, { "cell_type": "code", - "source": [ - "# Create a prediction dataframe.\n", - "prediction_data = train_data['product_id'].sample(frac=1, replace=True)\n", - "\n", - "# Preprocess data to handle categorical values.\n", - "train_data['gender'] = pd.factorize(train_data['gender'])[0]\n", - "train_data['state'] = pd.factorize(train_data['state'])[0]\n", - "train_data['country'] = pd.factorize(train_data['country'])[0]\n", - "train_data.head()" - ], + "execution_count": null, "metadata": { - "id": "ej6jCkMF0B29", "colab": { "base_uri": "https://localhost:8080/", "height": 206 }, + "id": "ej6jCkMF0B29", "outputId": "44cfd7f1-0c7c-40a8-813f-02af86a6f788" }, - "execution_count": null, "outputs": [ { - "output_type": "execute_result", "data": { - "text/plain": [ - " user_id product_id sale_price age gender state country\n", - "0 68717 14235 0.02 43 0 0 0\n", - "1 59866 28700 1.50 17 1 1 1\n", - "2 38322 14202 1.50 47 0 2 2\n", - "3 7839 28700 1.50 64 1 3 3\n", - "4 40877 28700 1.50 68 1 4 3" - ], + "application/vnd.google.colaboratory.intrinsic+json": { + "type": "dataframe", + "variable_name": "train_data" + }, "text/html": [ "\n", "
\n", @@ -908,48 +893,68 @@ "
\n", " \n" ], - "application/vnd.google.colaboratory.intrinsic+json": { - "type": "dataframe", - "variable_name": "train_data" - } + "text/plain": [ + " user_id product_id sale_price age gender state country\n", + "0 68717 14235 0.02 43 0 0 0\n", + "1 59866 28700 1.50 17 1 1 1\n", + "2 38322 14202 1.50 47 0 2 2\n", + "3 7839 28700 1.50 64 1 3 3\n", + "4 40877 28700 1.50 68 1 4 3" + ] }, + "execution_count": 5, "metadata": {}, - "execution_count": 5 + "output_type": "execute_result" } + ], + "source": [ + "# Create a prediction dataframe.\n", + "prediction_data = train_data['product_id'].sample(frac=1, replace=True)\n", + "\n", + "# Preprocess data to handle categorical values.\n", + "train_data['gender'] = pd.factorize(train_data['gender'])[0]\n", + "train_data['state'] = pd.factorize(train_data['state'])[0]\n", + "train_data['country'] = pd.factorize(train_data['country'])[0]\n", + "train_data.head()" ] }, { "cell_type": "markdown", - "source": [ - "Convert the dataframe to tensors." - ], "metadata": { "id": "7ffoopdQVk8W" - } + }, + "source": [ + "Convert the dataframe to tensors." + ] }, { "cell_type": "code", - "source": [ - "train_tensors = tf.convert_to_tensor(train_data.values, dtype=tf.float32)\n", - "prediction_tensors = tf.convert_to_tensor(prediction_data.values, dtype=tf.float32)" - ], + "execution_count": null, "metadata": { "id": "vmHH26KDVkuf" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "train_tensors = tf.convert_to_tensor(train_data.values, dtype=tf.float32)\n", + "prediction_tensors = tf.convert_to_tensor(prediction_data.values, dtype=tf.float32)" + ] }, { "cell_type": "markdown", - "source": [ - "Based on this data, build a basic neural network model by using TensorFlow." - ], "metadata": { "id": "CRoW8ElNV4I9" - } + }, + "source": [ + "Based on this data, build a basic neural network model by using TensorFlow." + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "EKrb13wsV3m4" + }, + "outputs": [], "source": [ "inputs = layers.Input(shape=(7,))\n", "x = layers.Dense(7, activation='relu')(inputs)\n", @@ -957,57 +962,57 @@ "outputs = layers.Dense(1)(x)\n", "\n", "model = keras.Model(inputs=inputs, outputs=outputs)" - ], - "metadata": { - "id": "EKrb13wsV3m4" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", - "source": [ - "Train the model. This step takes about 90 seconds for one epoch." - ], "metadata": { "id": "Duv4qzmEWFSZ" - } + }, + "source": [ + "Train the model. This step takes about 90 seconds for one epoch." 
+ ] }, { "cell_type": "code", - "source": [ - "EPOCHS = 1" - ], + "execution_count": null, "metadata": { "id": "bHg1kcvnk7Xb" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "EPOCHS = 1" + ] }, { "cell_type": "code", - "source": [ - "model.compile(optimizer='adam', loss='mse')\n", - "model.fit(train_tensors, prediction_tensors, epochs=EPOCHS)" - ], + "execution_count": null, "metadata": { "id": "4GrDp5_WWGZv" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "model.compile(optimizer='adam', loss='mse')\n", + "model.fit(train_tensors, prediction_tensors, epochs=EPOCHS)" + ] }, { "cell_type": "markdown", + "metadata": { + "id": "_rJYv8fFFPYb" + }, "source": [ "Save the model to the `MODEL_PATH` variable.\n", "\n" - ], - "metadata": { - "id": "_rJYv8fFFPYb" - } + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "W4t260o9FURP" + }, + "outputs": [], "source": [ "# Create a new directory to save the model.\n", "!mkdir model\n", @@ -1015,36 +1020,36 @@ "# Save the model.\n", "MODEL_PATH = './model/'\n", "tf.saved_model.save(model, MODEL_PATH)" - ], - "metadata": { - "id": "W4t260o9FURP" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", - "source": [ - "Stage the locally saved model to a Google Cloud Storage bucket. Use this Cloud Storage bucket to deploy the model to Vertex AI." - ], "metadata": { "id": "hsJOxFTWj6JX" - } + }, + "source": [ + "Stage the locally saved model to a Google Cloud Storage bucket. Use this Cloud Storage bucket to deploy the model to Vertex AI." + ] }, { "cell_type": "code", - "source": [ - "GCS_BUCKET = 'GCS_BUCKET_NAME'\n", - "GCS_BUCKET_DIRECTORY = 'GCS_BUCKET_DIRECTORY'" - ], + "execution_count": null, "metadata": { "id": "WQp1e_JgllBW" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "GCS_BUCKET = ''\n", + "GCS_BUCKET_DIRECTORY = ''" + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "yiXRXV89e8_Y" + }, + "outputs": [], "source": [ "# Stage to the Cloud Storage bucket.\n", "import glob\n", @@ -1061,35 +1066,35 @@ "\n", "\n", "upload_model_to_gcs(MODEL_PATH, bucket, GCS_BUCKET_DIRECTORY)" - ], - "metadata": { - "id": "yiXRXV89e8_Y" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", - "source": [ - "Upload the model saved in the Cloud Storage bucket to Vertex AI Model Registry." - ], "metadata": { "id": "O72h009kl_-L" - } + }, + "source": [ + "Upload the model saved in the Cloud Storage bucket to Vertex AI Model Registry." + ] }, { "cell_type": "code", - "source": [ - "model_display_name = 'vertex-ai-enrichment'" - ], + "execution_count": null, "metadata": { "id": "bKN5pUD3uImj" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "model_display_name = 'vertex-ai-enrichment'" + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "Pp3Jca9GfpEj" + }, + "outputs": [], "source": [ "aiplatform.init(project=PROJECT_ID, location=LOCATION)\n", "model = aiplatform.Model.upload(\n", @@ -1098,70 +1103,62 @@ " artifact_uri=\"gs://\" + GCS_BUCKET + \"/\" + GCS_BUCKET_DIRECTORY,\n", " serving_container_image_uri='us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-13:latest',\n", ")" - ], - "metadata": { - "id": "Pp3Jca9GfpEj" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", - "source": [ - "Create an endpoint on Vertex AI." 
- ], "metadata": { "id": "ms_KqSIbZkLP" - } + }, + "source": [ + "Create an endpoint on Vertex AI." + ] }, { "cell_type": "code", - "source": [ - "endpoint = aiplatform.Endpoint.create(display_name = model_display_name,\n", - " project = PROJECT_ID,\n", - " location = LOCATION)" - ], + "execution_count": null, "metadata": { - "id": "YKKzRrN6czni", "colab": { "base_uri": "https://localhost:8080/" }, + "id": "YKKzRrN6czni", "outputId": "bfd954c0-8267-476d-dd0c-15e612ae0cc1" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "endpoint = aiplatform.Endpoint.create(display_name = model_display_name,\n", + " project = PROJECT_ID,\n", + " location = LOCATION)" + ] }, { "cell_type": "markdown", + "metadata": { + "id": "WgSpy0J3oBFP" + }, "source": [ "Deploy the model to the Vertex AI endpoint.\n", "\n", "**Note:** This step is a Long Running Operation (LRO). Depending on the size of the model, it might take more than five minutes to complete." - ], - "metadata": { - "id": "WgSpy0J3oBFP" - } + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "FLQtMVQjnsls" + }, + "outputs": [], "source": [ "deployed_model_display_name = 'vertexai-enrichment-notebook'\n", "model.deploy(endpoint = endpoint,\n", " deployed_model_display_name = deployed_model_display_name,\n", " machine_type = 'n1-standard-2')" - ], - "metadata": { - "id": "FLQtMVQjnsls" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "code", - "source": [ - "model_endpoint_id = aiplatform.Endpoint.list(filter=f'display_name=\"{deployed_model_display_name}\"')[0].name\n", - "print(model_endpoint_id)" - ], + "execution_count": null, "metadata": { "colab": { "base_uri": "https://localhost:8080/" @@ -1169,82 +1166,57 @@ "id": "3JjIwzZouAi5", "outputId": "ffb1fb74-365a-426b-d60d-d3910c116e10" }, - "execution_count": null, "outputs": [ { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "8125472293125095424\n" ] } + ], + "source": [ + "model_endpoint_id = aiplatform.Endpoint.list(filter=f'display_name=\"{deployed_model_display_name}\"')[0].name\n", + "print(model_endpoint_id)" ] }, { "cell_type": "markdown", - "source": [ - "### Set up the Vertex AI Feature Store for online serving\n" - ], "metadata": { "id": "ouMQZ4sC4zuO" - } + }, + "source": [ + "### Set up the Vertex AI Feature Store for online serving\n" + ] }, { "cell_type": "markdown", - "source": [ - "Set up the feature data in BigQuery." - ], "metadata": { "id": "B1Bk7XP7190z" - } + }, + "source": [ + "Set up the feature data in BigQuery." + ] }, { "cell_type": "code", - "source": [ - "feature_store_query = \"\"\"\n", - "SELECT cast(id as string) AS user_id,\n", - " age,\n", - " lower(gender) as gender,\n", - " lower(state) as state,\n", - " lower(country) as country,\n", - "FROM `bigquery-public-data.thelook_ecommerce.users`\n", - "\"\"\"\n", - "\n", - "# Fetch feature values from BigQuery.\n", - "client = bigquery.Client(project=PROJECT_ID)\n", - "data = client.query(feature_store_query).result().to_dataframe()\n", - "\n", - "# Convert feature values to the string type. 
This step helps when creating tensor\n", - "# of these values for inference that requires the same data type.\n", - "data['gender'] = pd.factorize(data['gender'])[0]\n", - "data['gender'] = data['gender'].astype(str)\n", - "data['state'] = pd.factorize(data['state'])[0]\n", - "data['state'] = data['state'].astype(str)\n", - "data['country'] = pd.factorize(data['country'])[0]\n", - "data['country'] = data['country'].astype(str)\n", - "data.head()" - ], + "execution_count": null, "metadata": { - "id": "4Qkysu_g19c_", "colab": { "base_uri": "https://localhost:8080/", "height": 206 }, + "id": "4Qkysu_g19c_", "outputId": "187ee1e8-07c9-457a-abbe-fab724d997ce" }, - "execution_count": null, "outputs": [ { - "output_type": "execute_result", "data": { - "text/plain": [ - " user_id age gender state country\n", - "0 7723 12 0 0 0\n", - "1 93041 12 0 1 1\n", - "2 45741 12 1 1 1\n", - "3 16718 12 0 1 1\n", - "4 70137 12 1 1 1" - ], + "application/vnd.google.colaboratory.intrinsic+json": { + "summary": "{\n \"name\": \"data\",\n \"rows\": 100000,\n \"fields\": [\n {\n \"column\": \"user_id\",\n \"properties\": {\n \"dtype\": \"string\",\n \"num_unique_values\": 100000,\n \"samples\": [\n \"66192\",\n \"73109\",\n \"49397\"\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n },\n {\n \"column\": \"age\",\n \"properties\": {\n \"dtype\": \"Int64\",\n \"num_unique_values\": 59,\n \"samples\": [\n \"12\",\n \"17\",\n \"46\"\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n },\n {\n \"column\": \"gender\",\n \"properties\": {\n \"dtype\": \"category\",\n \"num_unique_values\": 2,\n \"samples\": [\n \"1\",\n \"0\"\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n },\n {\n \"column\": \"state\",\n \"properties\": {\n \"dtype\": \"category\",\n \"num_unique_values\": 231,\n \"samples\": [\n \"218\",\n \"66\"\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n },\n {\n \"column\": \"country\",\n \"properties\": {\n \"dtype\": \"category\",\n \"num_unique_values\": 15,\n \"samples\": [\n \"9\",\n \"11\"\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n }\n ]\n}", + "type": "dataframe", + "variable_name": "data" + }, "text/html": [ "\n", "
\n", @@ -1527,28 +1499,65 @@ "
\n", " \n" ], - "application/vnd.google.colaboratory.intrinsic+json": { - "type": "dataframe", - "variable_name": "data", - "summary": "{\n \"name\": \"data\",\n \"rows\": 100000,\n \"fields\": [\n {\n \"column\": \"user_id\",\n \"properties\": {\n \"dtype\": \"string\",\n \"num_unique_values\": 100000,\n \"samples\": [\n \"66192\",\n \"73109\",\n \"49397\"\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n },\n {\n \"column\": \"age\",\n \"properties\": {\n \"dtype\": \"Int64\",\n \"num_unique_values\": 59,\n \"samples\": [\n \"12\",\n \"17\",\n \"46\"\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n },\n {\n \"column\": \"gender\",\n \"properties\": {\n \"dtype\": \"category\",\n \"num_unique_values\": 2,\n \"samples\": [\n \"1\",\n \"0\"\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n },\n {\n \"column\": \"state\",\n \"properties\": {\n \"dtype\": \"category\",\n \"num_unique_values\": 231,\n \"samples\": [\n \"218\",\n \"66\"\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n },\n {\n \"column\": \"country\",\n \"properties\": {\n \"dtype\": \"category\",\n \"num_unique_values\": 15,\n \"samples\": [\n \"9\",\n \"11\"\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n }\n ]\n}" - } + "text/plain": [ + " user_id age gender state country\n", + "0 7723 12 0 0 0\n", + "1 93041 12 0 1 1\n", + "2 45741 12 1 1 1\n", + "3 16718 12 0 1 1\n", + "4 70137 12 1 1 1" + ] }, + "execution_count": 8, "metadata": {}, - "execution_count": 8 + "output_type": "execute_result" } + ], + "source": [ + "feature_store_query = \"\"\"\n", + "SELECT cast(id as string) AS user_id,\n", + " age,\n", + " lower(gender) as gender,\n", + " lower(state) as state,\n", + " lower(country) as country,\n", + "FROM `bigquery-public-data.thelook_ecommerce.users`\n", + "\"\"\"\n", + "\n", + "# Fetch feature values from BigQuery.\n", + "client = bigquery.Client(project=PROJECT_ID)\n", + "data = client.query(feature_store_query).result().to_dataframe()\n", + "\n", + "# Convert feature values to the string type. This step helps when creating tensors\n", + "# of these values for inference that requires the same data type.\n", + "data['gender'] = pd.factorize(data['gender'])[0]\n", + "data['gender'] = data['gender'].astype(str)\n", + "data['state'] = pd.factorize(data['state'])[0]\n", + "data['state'] = data['state'].astype(str)\n", + "data['country'] = pd.factorize(data['country'])[0]\n", + "data['country'] = data['country'].astype(str)\n", + "data.head()" ] }, { "cell_type": "markdown", - "source": [ - "Create a BigQuery dataset to use as the source for the Vertex AI Feature Store." - ], "metadata": { "id": "Mm-HCUaa3ROZ" - } + }, + "source": [ + "Create a BigQuery dataset to use as the source for the Vertex AI Feature Store." 
+ ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "vye3UBGZ3Q8n", + "outputId": "437597af-837d-483e-8c1e-ebbe0eca81e0" + }, + "outputs": [], "source": [ "dataset_id = \"vertexai_enrichment\"\n", "dataset = bigquery.Dataset(f\"{PROJECT_ID}.{dataset_id}\")\n", @@ -1558,51 +1567,47 @@ ")\n", "\n", "print(\"Created dataset - %s.%s\" % (dataset, dataset_id))" - ], - "metadata": { - "id": "vye3UBGZ3Q8n", - "colab": { - "base_uri": "https://localhost:8080/" - }, - "outputId": "437597af-837d-483e-8c1e-ebbe0eca81e0" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", - "source": [ - "Create a BigQuery view with the precomputed feature values." - ], "metadata": { "id": "7lKiprPX4AZy" - } + }, + "source": [ + "Create a BigQuery view with the precomputed feature values." + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "xqaLPTxb4DDF" + }, + "outputs": [], "source": [ "view_id = \"users_view\"\n", "view_reference = \"%s.%s.%s\" % (PROJECT_ID, dataset_id, view_id)\n", "view = bigquery.Table(view_reference)\n", "view = client.load_table_from_dataframe(data, view_reference)" - ], - "metadata": { - "id": "xqaLPTxb4DDF" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", - "source": [ - "Initialize clients for Vertex AI to create and set up an online store." - ], "metadata": { "id": "eQLkSg3p7WAm" - } + }, + "source": [ + "Initialize clients for Vertex AI to create and set up an online store." + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "GF_eIl-wVvRy" + }, + "outputs": [], "source": [ "API_ENDPOINT = f\"{LOCATION}-aiplatform.googleapis.com\"\n", "\n", @@ -1612,24 +1617,28 @@ "registry_client = FeatureRegistryServiceClient(\n", " client_options={\"api_endpoint\": API_ENDPOINT}\n", ")" - ], - "metadata": { - "id": "GF_eIl-wVvRy" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", - "source": [ - "Create online store instances on Vertex AI." - ], "metadata": { "id": "d9Mbk6m9Vgdo" - } + }, + "source": [ + "Create an online store instances on Vertex AI." + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "Zj-xEu_hWY7f", + "outputId": "7f4ed1d9-c0c4-4c3c-f199-1e340d2cff11" + }, + "outputs": [], "source": [ "feature_store_name = \"vertexai_enrichment\"\n", "\n", @@ -1650,28 +1659,28 @@ ")\n", "\n", "create_store_lro.result()" - ], - "metadata": { - "id": "Zj-xEu_hWY7f", - "colab": { - "base_uri": "https://localhost:8080/" - }, - "outputId": "7f4ed1d9-c0c4-4c3c-f199-1e340d2cff11" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", - "source": [ - "For the store instances created previously, use BigQuery as the data source to create feature views." - ], "metadata": { "id": "DAHjWlqXXLU_" - } + }, + "source": [ + "For the store instances created previously, use BigQuery as the data source to create feature views." 
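Before attaching a feature view, you can confirm that the online store resource was created. A small sketch using the same `admin_client` and variables defined above; the resource name is built from `PROJECT_ID`, `LOCATION`, and `feature_store_name`.

```python
# Confirm the online store exists before creating a feature view on it.
store_path = (
    f"projects/{PROJECT_ID}/locations/{LOCATION}/"
    f"featureOnlineStores/{feature_store_name}"
)
store = admin_client.get_feature_online_store(name=store_path)
print(store.name)
print(store.bigtable)  # Bigtable online serving configuration set earlier
```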
+ ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "IhUERuRGXNaN", + "outputId": "84facd77-5be4-4c99-90b5-d8ccb4c5d702" + }, + "outputs": [], "source": [ "feature_view_name = \"users\"\n", "\n", @@ -1690,41 +1699,37 @@ ")\n", "\n", "create_view_lro.result()" - ], - "metadata": { - "id": "IhUERuRGXNaN", - "colab": { - "base_uri": "https://localhost:8080/" - }, - "outputId": "84facd77-5be4-4c99-90b5-d8ccb4c5d702" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", - "source": [ - "Pull feature values from BigQuery into the feature store." - ], "metadata": { "id": "qbf4l8eBX6NG" - } + }, + "source": [ + "Pull feature values from BigQuery into the feature store." + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "gdpsLCmMX7fX" + }, + "outputs": [], "source": [ "sync_response = admin_client.sync_feature_view(\n", " feature_view=f\"projects/{PROJECT_ID}/locations/{LOCATION}/featureOnlineStores/{feature_store_name}/featureViews/{feature_view_name}\"\n", ")" - ], - "metadata": { - "id": "gdpsLCmMX7fX" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "Lav6JTW4YKhR" + }, + "outputs": [], "source": [ "while True:\n", " feature_view_sync = admin_client.get_feature_view_sync(\n", @@ -1737,110 +1742,88 @@ " print(\"feature view sync failed for %s\" % feature_view_sync.name)\n", " break\n", " time.sleep(10)" - ], - "metadata": { - "id": "Lav6JTW4YKhR" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", - "source": [ - "Confirm the sync creation." - ], "metadata": { "id": "T3MMx7oJYPeC" - } + }, + "source": [ + "Confirm the sync creation." + ] }, { "cell_type": "code", - "source": [ - "admin_client.list_feature_view_syncs(\n", - " parent=f\"projects/{PROJECT_ID}/locations/{LOCATION}/featureOnlineStores/{feature_store_name}/featureViews/{feature_view_name}\"\n", - ")" - ], + "execution_count": null, "metadata": { - "id": "ucSQRUfUYRFX", "colab": { "base_uri": "https://localhost:8080/" }, + "id": "ucSQRUfUYRFX", "outputId": "d2160812-9874-40bb-f464-f797eafb9999" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "admin_client.list_feature_view_syncs(\n", + " parent=f\"projects/{PROJECT_ID}/locations/{LOCATION}/featureOnlineStores/{feature_store_name}/featureViews/{feature_view_name}\"\n", + ")" + ] }, { "cell_type": "markdown", + "metadata": { + "id": "pHODouJDwc60" + }, "source": [ "### Publish messages to Pub/Sub\n", "\n", "Use the Pub/Sub Python client to publish messages.\n" - ], - "metadata": { - "id": "pHODouJDwc60" - } + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "QKCuwDioxw-f" + }, + "outputs": [], "source": [ "# Replace with the name of your Pub/Sub topic.\n", "TOPIC = \" \"\n", "\n", "# Replace with the subscription path for your topic.\n", "SUBSCRIPTION = \"\"" - ], - "metadata": { - "id": "QKCuwDioxw-f" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "Retrieve sample data from a public dataset in BigQuery. Convert it into Python dictionaries, and then send it to Pub/Sub." 
- ], - "metadata": { - "id": "R0QYsOYFb_EU" - } + ] }, { - "cell_type": "code", - "source": [ - "read_query = \"\"\"\n", - "SELECT cast(user_id as string) AS user_id,\n", - " product_id,\n", - " sale_price,\n", - "FROM `bigquery-public-data.thelook_ecommerce.order_items`\n", - "LIMIT 5;\n", - "\"\"\"\n", - "\n", - "client = bigquery.Client(project=PROJECT_ID)\n", - "data = client.query(read_query).result().to_dataframe()\n", - "data.head()" - ], + "cell_type": "markdown", + "metadata": { + "id": "R0QYsOYFb_EU" + }, + "source": [ + "Retrieve sample data from a public dataset in BigQuery. Convert it into Python dictionaries, and then send it to Pub/Sub." + ] + }, + { + "cell_type": "code", + "execution_count": null, "metadata": { - "id": "Kn7wmiKib-Wx", "colab": { "base_uri": "https://localhost:8080/", "height": 206 }, + "id": "Kn7wmiKib-Wx", "outputId": "9680fbcc-dcb5-4158-90ae-69a9f3c776d0" }, - "execution_count": null, "outputs": [ { - "output_type": "execute_result", "data": { - "text/plain": [ - " user_id product_id sale_price\n", - "0 25005 14235 0.02\n", - "1 62544 14235 0.02\n", - "2 17228 14235 0.02\n", - "3 54015 14235 0.02\n", - "4 16569 14235 0.02" - ], + "application/vnd.google.colaboratory.intrinsic+json": { + "summary": "{\n \"name\": \"data\",\n \"rows\": 5,\n \"fields\": [\n {\n \"column\": \"user_id\",\n \"properties\": {\n \"dtype\": \"string\",\n \"num_unique_values\": 5,\n \"samples\": [\n \"62544\",\n \"16569\",\n \"17228\"\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n },\n {\n \"column\": \"product_id\",\n \"properties\": {\n \"dtype\": \"Int64\",\n \"num_unique_values\": 1,\n \"samples\": [\n \"14235\"\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n },\n {\n \"column\": \"sale_price\",\n \"properties\": {\n \"dtype\": \"number\",\n \"std\": 0.0,\n \"min\": 0.0199999995529651,\n \"max\": 0.0199999995529651,\n \"num_unique_values\": 1,\n \"samples\": [\n 0.0199999995529651\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n }\n ]\n}", + "type": "dataframe", + "variable_name": "data" + }, "text/html": [ "\n", "
\n", @@ -2111,19 +2094,41 @@ "
\n", " \n" ], - "application/vnd.google.colaboratory.intrinsic+json": { - "type": "dataframe", - "variable_name": "data", - "summary": "{\n \"name\": \"data\",\n \"rows\": 5,\n \"fields\": [\n {\n \"column\": \"user_id\",\n \"properties\": {\n \"dtype\": \"string\",\n \"num_unique_values\": 5,\n \"samples\": [\n \"62544\",\n \"16569\",\n \"17228\"\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n },\n {\n \"column\": \"product_id\",\n \"properties\": {\n \"dtype\": \"Int64\",\n \"num_unique_values\": 1,\n \"samples\": [\n \"14235\"\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n },\n {\n \"column\": \"sale_price\",\n \"properties\": {\n \"dtype\": \"number\",\n \"std\": 0.0,\n \"min\": 0.0199999995529651,\n \"max\": 0.0199999995529651,\n \"num_unique_values\": 1,\n \"samples\": [\n 0.0199999995529651\n ],\n \"semantic_type\": \"\",\n \"description\": \"\"\n }\n }\n ]\n}" - } + "text/plain": [ + " user_id product_id sale_price\n", + "0 25005 14235 0.02\n", + "1 62544 14235 0.02\n", + "2 17228 14235 0.02\n", + "3 54015 14235 0.02\n", + "4 16569 14235 0.02" + ] }, + "execution_count": 18, "metadata": {}, - "execution_count": 18 + "output_type": "execute_result" } + ], + "source": [ + "read_query = \"\"\"\n", + "SELECT cast(user_id as string) AS user_id,\n", + " product_id,\n", + " sale_price,\n", + "FROM `bigquery-public-data.thelook_ecommerce.order_items`\n", + "LIMIT 5;\n", + "\"\"\"\n", + "\n", + "client = bigquery.Client(project=PROJECT_ID)\n", + "data = client.query(read_query).result().to_dataframe()\n", + "data.head()" ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "MaCJwaPexPKZ" + }, + "outputs": [], "source": [ "messages = data.to_dict(orient='records')\n", "\n", @@ -2133,26 +2138,24 @@ "for message in messages:\n", " data = json.dumps(message).encode('utf-8')\n", " publish_future = publisher.publish(topic_name, data)" - ], - "metadata": { - "id": "MaCJwaPexPKZ" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", + "metadata": { + "id": "zPSFEMm02omi" + }, "source": [ "## Use the Vertex AI Feature Store enrichment handler\n", "\n", "The [`VertexAIFeatureStoreEnrichmentHandler`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store.html#apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store.VertexAIFeatureStoreEnrichmentHandler) is a built-in handler included in the Apache Beam SDK versions 2.55.0 and later." - ], - "metadata": { - "id": "zPSFEMm02omi" - } + ] }, { "cell_type": "markdown", + "metadata": { + "id": "K41xhvmA5yQk" + }, "source": [ "Configure the `VertexAIFeatureStoreEnrichmentHandler` with the following required parameters:\n", "\n", @@ -2163,29 +2166,31 @@ "* `feature_view_name`: the name of the feature view within the Vertex AI Feature Store\n", "* `row_key`: The field name in the input row containing the entity ID for the feature store. This value is used to extract the entity ID from each element. The entity ID is used to fetch feature values for that specific element in the enrichment transform.\n", "\n", - "Optionally, to provide more configuration values to connect with the Vertex AI client, the `VertexAIFeatureStoreEnrichmentHandler` accepts a kwargs. 
For more information, see [`FeatureOnlineStoreServiceClient`](https://cloud.google.com/php/docs/reference/cloud-ai-platform/latest/V1.FeatureOnlineStoreServiceClient).\n", + "Optionally, to provide more configuration values to connect with the Vertex AI client, the `VertexAIFeatureStoreEnrichmentHandler` accepts a keyword argument (kwargs). For more information, see [`FeatureOnlineStoreServiceClient`](https://cloud.google.com/php/docs/reference/cloud-ai-platform/latest/V1.FeatureOnlineStoreServiceClient).\n", "\n", "**Note:** When exceptions occur, by default, the logging severity is set to warning ([`ExceptionLevel.WARN`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.utils.html#apache_beam.transforms.enrichment_handlers.utils.ExceptionLevel.WARN)). To configure the severity to raise exceptions, set `exception_level` to [`ExceptionLevel.RAISE`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.utils.html#apache_beam.transforms.enrichment_handlers.utils.ExceptionLevel.RAISE). To ignore exceptions, set `exception_level` to [`ExceptionLevel.QUIET`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.utils.html#apache_beam.transforms.enrichment_handlers.utils.ExceptionLevel.QUIET).\n", "\n", "The `VertexAIFeatureStoreEnrichmentHandler` handler returns the latest feature values from the feature store." - ], - "metadata": { - "id": "K41xhvmA5yQk" - } + ] }, { "cell_type": "code", - "source": [ - "row_key = 'user_id'" - ], + "execution_count": null, "metadata": { "id": "3dB26jhI45gd" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "row_key = 'user_id'" + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "cr1j_DHK4gA4" + }, + "outputs": [], "source": [ "vertex_ai_handler = VertexAIFeatureStoreEnrichmentHandler(project=PROJECT_ID,\n", " location=LOCATION,\n", @@ -2193,15 +2198,13 @@ " feature_store_name=feature_store_name,\n", " feature_view_name=feature_view_name,\n", " row_key=row_key)" - ], - "metadata": { - "id": "cr1j_DHK4gA4" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", + "metadata": { + "id": "-Lvo8O2V-0Ey" + }, "source": [ "## Use the enrichment transform\n", "\n", @@ -2215,13 +2218,13 @@ "\n", "\n", "To use the Redis cache, apply the `with_redis_cache` hook to the enrichment transform. The coders for encoding and decoding the input and output for the cache are optional and are internally inferred." - ], - "metadata": { - "id": "-Lvo8O2V-0Ey" - } + ] }, { "cell_type": "markdown", + "metadata": { + "id": "xJTCfSmiV1kv" + }, "source": [ "The following example demonstrates the code needed to add this transform to your pipeline.\n", "\n", @@ -2239,43 +2242,45 @@ "\n", "\n", "\n" - ], - "metadata": { - "id": "xJTCfSmiV1kv" - } + ] }, { "cell_type": "markdown", + "metadata": { + "id": "F-xjiP_pHWZr" + }, "source": [ "To make a prediction, use the following fields: `product_id`, `quantity`, `price`, `customer_id`, and `customer_location`. Retrieve the value of the `customer_location` field from Bigtable.\n", "\n", "The enrichment transform performs a [`cross_join`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment.html#apache_beam.transforms.enrichment.cross_join) by default." 
- ], - "metadata": { - "id": "F-xjiP_pHWZr" - } + ] }, { "cell_type": "markdown", + "metadata": { + "id": "CX9Cqybu6scV" + }, "source": [ "## Use the `VertexAIModelHandlerJSON` interface to run inference\n", "\n" - ], - "metadata": { - "id": "CX9Cqybu6scV" - } + ] }, { "cell_type": "markdown", - "source": [ - "Because the enrichment transform outputs data in the format `beam.Row`, in order to align it with the [`VertexAIModelHandlerJSON`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.vertex_ai_inference.html#apache_beam.ml.inference.vertex_ai_inference.VertexAIModelHandlerJSON) interface, convert the output into a list of `tensorflow.tensor`. Some enriched fields are of `string` type. For tensor creation, all values must be of the same type. Therefore, convert any `string` type fields to `int` type fields before creating a tensor." - ], "metadata": { "id": "zy5Jl7_gLklX" - } + }, + "source": [ + "Because the enrichment transform outputs data in the format `beam.Row`, in order to align it with the [`VertexAIModelHandlerJSON`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.vertex_ai_inference.html#apache_beam.ml.inference.vertex_ai_inference.VertexAIModelHandlerJSON) interface, convert the output into a list of `tensorflow.tensor`. Some enriched fields are of `string` type. For tensor creation, all values must be of the same type. Therefore, convert any `string` type fields to `int` type fields before creating a tensor." + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "KBKoB06nL4LF" + }, + "outputs": [], "source": [ "def convert_row_to_tensor(element: beam.Row):\n", " element_dict = element._asdict()\n", @@ -2284,57 +2289,52 @@ " if isinstance(r, str):\n", " row[i] = int(r)\n", " return tf.convert_to_tensor(row, dtype=tf.float32).numpy().tolist()" - ], - "metadata": { - "id": "KBKoB06nL4LF" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", - "source": [ - "Initialize the model handler with the preprocessing function." - ], "metadata": { "id": "-tGHyB_vL3rJ" - } + }, + "source": [ + "Initialize the model handler with the preprocessing function." + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "VqUUEwcU-r2e" + }, + "outputs": [], "source": [ "model_handler = VertexAIModelHandlerJSON(endpoint_id=model_endpoint_id,\n", " project=PROJECT_ID,\n", " location=LOCATION,\n", " ).with_preprocess_fn(convert_row_to_tensor)" - ], - "metadata": { - "id": "VqUUEwcU-r2e" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", - "source": [ - "Define a `DoFn` to format the output." - ], "metadata": { "id": "vNHI4gVgNec2" - } + }, + "source": [ + "Define a `DoFn` to format the output." + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "rkN-_Yf4Nlwy" + }, + "outputs": [], "source": [ "class PostProcessor(beam.DoFn):\n", " def process(self, element, *args, **kwargs):\n", " print('Customer %d who bought product %d is recommended to buy product %d' % (element.example[0], element.example[1], math.ceil(element.inference[0])))" - ], - "metadata": { - "id": "rkN-_Yf4Nlwy" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", @@ -2347,36 +2347,41 @@ }, { "cell_type": "markdown", - "source": [ - "Configure the pipeline to run in streaming mode." - ], "metadata": { "id": "WrwY0_gV_IDK" - } + }, + "source": [ + "Configure the pipeline to run in streaming mode." 
+ ] }, { "cell_type": "code", - "source": [ - "options = pipeline_options.PipelineOptions()\n", - "options.view_as(pipeline_options.StandardOptions).streaming = True # Streaming mode is set to True" - ], + "execution_count": null, "metadata": { "id": "t0425sYBsYtB" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "options = pipeline_options.PipelineOptions()\n", + "options.view_as(pipeline_options.StandardOptions).streaming = True # Streaming mode is set to True" + ] }, { "cell_type": "markdown", - "source": [ - "Pub/Sub sends the data in bytes. Convert the data to `beam.Row` objects by using a `DoFn`." - ], "metadata": { "id": "DBNijQDY_dRe" - } + }, + "source": [ + "Pub/Sub sends the data in bytes. Convert the data to `beam.Row` objects by using a `DoFn`." + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "sRw9iL8pKP5O" + }, + "outputs": [], "source": [ "class DecodeBytes(beam.DoFn):\n", " \"\"\"\n", @@ -2387,39 +2392,34 @@ " def process(self, element, *args, **kwargs):\n", " element_dict = json.loads(element.decode('utf-8'))\n", " yield beam.Row(**element_dict)" - ], - "metadata": { - "id": "sRw9iL8pKP5O" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", + "metadata": { + "id": "xofUJym-_GuB" + }, "source": [ "Use the following code to run the pipeline.\n", "\n", "**Note:** Because this pipeline is a streaming pipeline, you need to manually stop the cell. If you don't stop the cell, the pipeline continues to run." - ], - "metadata": { - "id": "xofUJym-_GuB" - } + ] }, { "cell_type": "code", "execution_count": null, "metadata": { - "id": "St07XoibcQSb", "colab": { "base_uri": "https://localhost:8080/", "height": 671 }, + "id": "St07XoibcQSb", "outputId": "0ca70756-6a69-4d63-9ab7-8814ae6adf05" }, "outputs": [ { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "Customer 25005 who bought product 14235 is recommended to buy product 8944\n", "Customer 62544 who bought product 14235 is recommended to buy product 23313\n", @@ -2442,46 +2442,46 @@ }, { "cell_type": "markdown", - "source": [ - "## Clean up resources" - ], "metadata": { "id": "yDjkq2VI7fuM" - } + }, + "source": [ + "## Clean up resources" + ] }, { "cell_type": "code", - "source": [ - "# Delete feature views.\n", - "admin_client.delete_feature_view(\n", - " name=f\"projects/{PROJECT_ID}/locations/{LOCATION}/featureOnlineStores/{feature_store_name}/featureViews/{feature_view_name}\"\n", - ")\n", - "\n", - "# Delete online store instance.\n", - "admin_client.delete_feature_online_store(\n", - " name=f\"projects/{PROJECT_ID}/locations/{LOCATION}/featureOnlineStores/{feature_store_name}\",\n", - " force=True,\n", - ")" - ], + "execution_count": null, "metadata": { - "id": "UiPb_kzv7pCu", "colab": { "base_uri": "https://localhost:8080/" }, + "id": "UiPb_kzv7pCu", "outputId": "7902c30f-4db0-431b-9dd8-b647b3cb34da" }, - "execution_count": null, "outputs": [ { - "output_type": "execute_result", "data": { "text/plain": [ "" ] }, + "execution_count": 8, "metadata": {}, - "execution_count": 8 + "output_type": "execute_result" } + ], + "source": [ + "# Delete feature views.\n", + "admin_client.delete_feature_view(\n", + " name=f\"projects/{PROJECT_ID}/locations/{LOCATION}/featureOnlineStores/{feature_store_name}/featureViews/{feature_view_name}\"\n", + ")\n", + "\n", + "# Delete online store instance.\n", + "admin_client.delete_feature_online_store(\n", + " 
name=f\"projects/{PROJECT_ID}/locations/{LOCATION}/featureOnlineStores/{feature_store_name}\",\n", + " force=True,\n", + ")" ] } ], @@ -2499,4 +2499,4 @@ }, "nbformat": 4, "nbformat_minor": 0 -} \ No newline at end of file +}