Copyright 2024 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

This section shows you how to upload vectors into a new Elasticsearch index and run simple search queries using the official Elasticsearch Python client.

In this example, you use a dataset from a CSV file that contains a list of books in different genres. Elasticsearch serves as the search engine.

Install kubectl and the Google Cloud SDK with the necessary authentication plugin for Google Kubernetes Engine (GKE).

In [None]:
%%bash

curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
sudo install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl
sudo apt-get update && sudo apt-get install -y apt-transport-https ca-certificates gnupg
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo gpg --dearmor -o /usr/share/keyrings/cloud.google.gpg
echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] https://packages.cloud.google.com/apt cloud-sdk main" | sudo tee -a /etc/apt/sources.list.d/google-cloud-sdk.list
sudo apt-get update && sudo apt-get install -y google-cloud-cli-gke-gcloud-auth-plugin

Install the Elasticsearch Python client, fastembed, and python-dotenv libraries:

In [None]:
! pip install elasticsearch fastembed python-dotenv

Replace `<CLUSTER_NAME>` with your cluster name, for example `elasticsearch-cluster`, and `<REGION>` with the region of your cluster. Then retrieve the GKE cluster's credentials using the `gcloud` command.

In [None]:
%%bash

export KUBERNETES_CLUSTER_NAME=<CLUSTER_NAME>
export GOOGLE_CLOUD_REGION=<REGION>
gcloud container clusters get-credentials $KUBERNETES_CLUSTER_NAME --region $GOOGLE_CLOUD_REGION

Download the dataset from GitHub:

In [67]:
%%bash

export DATASET_PATH=https://raw.githubusercontent.com/GoogleCloudPlatform/kubernetes-engine-samples/main/databases/qdrant/manifests/04-notebook/dataset.csv
curl -s -LO $DATASET_PATH

Run the following command to check whether the Elasticsearch internal load balancer has been assigned an IP address. If the output shows an IP address, proceed to the next step. If the output is blank, wait a few minutes and run the command again, or check the status of the `elastic-ilb` service in the Google Cloud console. Proceed to the next step only after an IP address appears.

In [None]:
%%bash
kubectl get svc elastic-ilb -n elastic --output jsonpath="{.status.loadBalancer.ingress[0].ip}"
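If you prefer to wait for the IP address programmatically instead of rerunning the command by hand, the retry loop can be sketched in Python as follows. The helper names `get_ilb_ip` and `wait_for_ip`, as well as the default of 10 attempts 30 seconds apart, are hypothetical choices for illustration; `get_ilb_ip` runs the same `kubectl` command shown above.

```python
import subprocess
import time
from typing import Callable, Optional


def get_ilb_ip() -> str:
    """Hypothetical helper: return the elastic-ilb IP, or "" if none is assigned yet."""
    result = subprocess.run(
        [
            "kubectl", "get", "svc", "elastic-ilb", "-n", "elastic",
            "--output", "jsonpath={.status.loadBalancer.ingress[0].ip}",
        ],
        capture_output=True,
        text=True,
        check=False,
    )
    return result.stdout.strip()


def wait_for_ip(
    get_ip: Callable[[], str],
    attempts: int = 10,  # assumed retry budget, not from the original
    delay: float = 30.0,  # assumed seconds between checks
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[str]:
    """Poll get_ip() until it returns a non-empty value or attempts run out."""
    for attempt in range(attempts):
        ip = get_ip()
        if ip:
            return ip
        if attempt < attempts - 1:
            sleep(delay)  # wait before the next check
    return None
```

For example, `ip = wait_for_ip(get_ilb_ip)` returns the address, or `None` if none appears within the allotted attempts. Passing the getter and `sleep` as parameters keeps the loop testable without a cluster.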

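Create a `.env` file with the environment variables required to connect to Elasticsearch in the Kubernetes cluster: the HTTPS endpoint of the internal load balancer and the password of the built-in `elastic` user.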

In [4]:
%%bash
echo ELASTIC_ENDPOINT="https://$(kubectl get svc elastic-ilb -n elastic --output jsonpath="{.status.loadBalancer.ingress[0].ip}"):9200" > .env
echo USER=$(kubectl get secret elasticsearch-ha-es-elastic-user -n elastic --template='{{index .data "elastic"}}' | base64 -d) >> .env
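Kubernetes stores Secret values base64-encoded, which is why the command above pipes the value through `base64 -d`. The sketch below, assuming a hypothetical helper `build_env_file` and made-up sample values, builds the same two-line `.env` contents in Python; producing both lines in a single string avoids overwriting the endpoint with the password.

```python
import base64


def build_env_file(ilb_ip: str, encoded_password: str) -> str:
    """Hypothetical helper: build .env contents from the ILB IP and a base64 secret value."""
    # Kubernetes Secret data is base64-encoded; decode it to get the password.
    password = base64.b64decode(encoded_password).decode("utf-8")
    lines = [
        f"ELASTIC_ENDPOINT=https://{ilb_ip}:9200",
        f"USER={password}",  # same variable name as in the notebook
    ]
    # One newline-terminated string, so both variables land in the same file.
    return "\n".join(lines) + "\n"
```

For example, `build_env_file("10.0.0.5", "Y2hhbmdlbWU=")` yields the endpoint `https://10.0.0.5:9200` and the password `changeme`; the result could then be written with `open(".env", "w").write(...)`.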

Import the required Python libraries:

In [69]:
from dotenv import load_dotenv
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import os
import csv
from fastembed import TextEmbedding
from typing import List
import numpy as np

Load the books from the CSV file and generate an embedding for each book description:

In [70]:
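# Read every CSV row into a dict keyed by column name (the file is closed afterwards).
with open("dataset.csv", newline="") as f:
    books = list(csv.DictReader(f))
descriptions = [doc["description"] for doc in books]
# BAAI/bge-small-en produces 384-dimensional vectors, matching the index mapping below.
embedding_model = TextEmbedding(model_name="BAAI/bge-small-en")
embeddings: List[np.ndarray] = list(embedding_model.embed(descriptions))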
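`csv.DictReader` uses the first row of the file as the header and returns one dict per book, so `descriptions[i]` and, later, `embeddings[i]` line up with `books[i]`. The sketch below shows that behavior on a few hypothetical rows; `load_books` is a hypothetical helper, the column names are assumed from the index mapping used later, and the real `dataset.csv` may contain different or additional columns.

```python
import csv
import io
from typing import Dict, List, Tuple

# Hypothetical sample rows for illustration only.
SAMPLE_CSV = """title,author,publishDate,description
Book A,Author One,2001,A quiet story about loss.
Book B,Author Two,1999,A fast-paced space adventure.
"""


def load_books(text: str) -> Tuple[List[Dict[str, str]], List[str]]:
    """Parse CSV text into row dicts and the index-aligned list of descriptions."""
    books = list(csv.DictReader(io.StringIO(text)))
    descriptions = [book["description"] for book in books]
    return books, descriptions
```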

Establish a connection to the Elasticsearch cluster:

In [None]:
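# override=True lets values from .env take precedence over existing environment
# variables; otherwise the system USER variable would shadow the one in .env.
load_dotenv(override=True)
print(os.getenv("ELASTIC_ENDPOINT"))
# USER holds the password of the built-in "elastic" user.
# Certificate verification is disabled in this example.
client = Elasticsearch(
    [os.getenv("ELASTIC_ENDPOINT")],
    verify_certs=False,
    basic_auth=("elastic", os.getenv("USER")),
)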
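A variant of the connection step, sketched under the assumption that you read the `.env` file directly (for example with `dotenv_values` from python-dotenv) instead of going through `os.environ`, avoids any clash with the system `USER` variable. The helper `client_kwargs` is hypothetical.

```python
from typing import Any, Dict, Mapping, Optional


def client_kwargs(env: Mapping[str, Optional[str]]) -> Dict[str, Any]:
    """Hypothetical helper: build Elasticsearch(...) keyword arguments from .env values."""
    endpoint = env.get("ELASTIC_ENDPOINT")
    password = env.get("USER")  # password of the built-in "elastic" user
    if not endpoint or not password:
        raise ValueError("ELASTIC_ENDPOINT and USER must both be set")
    return {
        "hosts": [endpoint],
        "basic_auth": ("elastic", password),
        # Same choice as the notebook: certificate verification is disabled.
        "verify_certs": False,
    }
```

For example: `client = Elasticsearch(**client_kwargs(dotenv_values(".env")))`. Outside a demo, consider passing the cluster's CA certificate through the client's `ca_certs` parameter instead of disabling verification.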

Create an Elasticsearch index named `books` with the following settings and mapping. If a `books` index already exists, it is deleted first:

In [None]:
index_schema = {
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
    },
    "mappings": {
        "dynamic": True,
        "_source": {"enabled": True},
        "properties": {
            "title": {"type": "text"},
            "author": {"type": "text"},
            "publishDate": {"type": "text"},
            "description": {"type": "text"},
            # dims must match the embedding size of BAAI/bge-small-en (384).
            "description_vector": {"type": "dense_vector", "dims": 384}
        }
    }
}
# Delete the index if it already exists; 400 and 404 responses are ignored.
client.options(ignore_status=[400, 404]).indices.delete(index="books")
client.indices.create(
    index="books",
    settings=index_schema["settings"],
    mappings=index_schema["mappings"],
)
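The `dims` value in the mapping must equal the length of the vectors you index; Elasticsearch rejects vectors with a different number of dimensions. A minimal sketch, using a hypothetical `build_mapping` helper, derives the mapping from the field names and the embedding size so the two cannot drift apart:

```python
from typing import Any, Dict, Iterable


def build_mapping(text_fields: Iterable[str], vector_field: str, dims: int) -> Dict[str, Any]:
    """Hypothetical helper: mappings with text fields plus one dense_vector field."""
    properties: Dict[str, Any] = {name: {"type": "text"} for name in text_fields}
    # dims must equal the embedding length, or indexing the vectors fails.
    properties[vector_field] = {"type": "dense_vector", "dims": dims}
    return {"dynamic": True, "properties": properties}
```

With the notebook's variables, this could be called as `build_mapping(["title", "author", "publishDate", "description"], "description_vector", len(embeddings[0]))`.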


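Prepare the documents for uploading. Each document is a CSV row with the bulk metadata fields `_op_type` and `_index` and its description vector added: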

In [73]:
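from typing import Any

documents: list[dict[str, Any]] = []

for book, embedding in zip(books, embeddings):
    doc = dict(book)  # copy the CSV row so `books` stays unchanged
    doc["_op_type"] = "index"  # bulk action type
    doc["_index"] = "books"  # target index
    # Convert the NumPy array to a plain list so it serializes as JSON.
    doc["description_vector"] = embedding.tolist()
    documents.append(doc)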
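The same preparation can be written as a generator that copies each row instead of modifying it. This is a sketch; `make_actions` is a hypothetical name, and vectors are assumed to be sequences of numbers, such as the NumPy arrays produced by fastembed.

```python
from typing import Any, Dict, Iterator, Sequence


def make_actions(
    rows: Sequence[Dict[str, str]],
    vectors: Sequence[Sequence[float]],
    index: str = "books",
) -> Iterator[Dict[str, Any]]:
    """Yield one bulk "index" action per row, leaving the input rows untouched."""
    if len(rows) != len(vectors):
        raise ValueError("rows and vectors must have the same length")
    for row, vector in zip(rows, vectors):
        action: Dict[str, Any] = dict(row)  # shallow copy of the CSV row
        action["_op_type"] = "index"
        action["_index"] = index
        # Plain Python floats serialize cleanly as JSON.
        action["description_vector"] = [float(x) for x in vector]
        yield action
```

Because `bulk` accepts any iterable of actions, the generator can be passed directly: `bulk(client, make_actions(books, embeddings))`.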

Upload the documents with the `bulk` helper:

In [None]:
bulk(client, documents)
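The `bulk` helper does not send all actions in one request; it groups them into batches (by default up to 500 actions per request, adjustable through its `chunk_size` argument) and returns the number of successfully executed actions together with a list of errors (by default, failed actions raise a `BulkIndexError` instead). The batching idea can be illustrated with a small, hypothetical `chunked` function; this is not the helper's actual implementation, which also limits batches by size in bytes.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))  # take up to `size` items
        if not batch:
            return
        yield batch
```

For example, 1,200 actions with `size=500` are split into batches of 500, 500, and 200.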

Define a function that embeds the query text, scores every book by the cosine similarity between the query vector and the book's description vector, and prints the top results.

Each result is printed in the following format, followed by a line of dashes:

- Title: Title of the book, Author: Author of the book, Score: Elasticsearch relevance score
- Description of the book

In [76]:
def handle_query(query, limit):
    # Embed the query with the same model used for the documents.
    query_vector = list(embedding_model.embed([query]))[0].tolist()
    # Score every document by cosine similarity; +1.0 keeps scores non-negative.
    script_query = {
        "script_score": {
            "query": {"match_all": {}},
            "script": {
                "source": "cosineSimilarity(params.query_vector, 'description_vector') + 1.0",
                "params": {"query_vector": query_vector}
            }
        }
    }
    response = client.search(
        index="books",
        body={
            "size": limit,
            "query": script_query,
            "_source": {"includes": ["description", "title", "author"]}
        }
    )
    for hit in response["hits"]["hits"]:
        print(f"Title: {hit['_source']['title']}, Author: {hit['_source']['author']}, Score: {hit['_score']}")
        print(hit["_source"]["description"])
        print("---------")
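The script scores each document with `cosineSimilarity(...) + 1.0`. Cosine similarity lies between -1 and 1, and Elasticsearch does not accept negative scores from `script_score`, so the script shifts the range to 0 through 2. The sketch below mirrors that arithmetic in plain Python for illustration; `script_score` is a hypothetical name, and Elasticsearch evaluates the real expression in Painless on the server.

```python
import math
from typing import Sequence


def script_score(query: Sequence[float], doc: Sequence[float]) -> float:
    """Mirror cosineSimilarity(query, doc) + 1.0 in plain Python."""
    dot = sum(q * d for q, d in zip(query, doc))
    norm = math.sqrt(sum(q * q for q in query)) * math.sqrt(sum(d * d for d in doc))
    if norm == 0:
        raise ValueError("cosine similarity is undefined for zero vectors")
    # Cosine similarity is in [-1, 1]; adding 1.0 shifts it into [0, 2].
    return dot / norm + 1.0
```

Identical directions score 2.0, orthogonal vectors score 1.0, and opposite directions score 0.0.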

Query the Elasticsearch index. The following call searches for `drama about people and unhappy love` and displays the top two results.

In [None]:
handle_query("drama about people and unhappy love", 2)
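To check the output format without a running cluster, the printing logic can be separated into a pure function that takes a search response. In this sketch, `format_hits` is hypothetical and the sample response is made up; it only mirrors the fields `handle_query` reads (`hits.hits[]`, `_source`, and `_score`).

```python
from typing import Any, Dict, List


def format_hits(response: Dict[str, Any]) -> List[str]:
    """Render each hit as a title/author/score line, its description, and a dash line."""
    blocks = []
    for hit in response["hits"]["hits"]:
        source = hit["_source"]
        blocks.append(
            f"Title: {source['title']}, Author: {source['author']}, Score: {hit['_score']}\n"
            f"{source['description']}\n"
            "---------"
        )
    return blocks
```

Printing each element of `format_hits(response)` reproduces what `handle_query` prints.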