 
                  Scaling NLP indexing pipelines with KEDA and Haystack β Part 2: The Deployment
Enabling retrieval-augmented generative question answering with LLMs
May 1, 2023In the first part of this article series, we discussed the power of retrieval-augmented generation. We also explored how to create a Python application that converts files into searchable documents with embeddings via Haystack pipelines. However, merely having a Python program that converts files into text snippets and embeddings on a single machine is not enough for a production-ready deployment.
In this part, we will explore how to deploy an indexing consumer to Kubernetes and how to autoscale it using KEDA. This will allow us to efficiently add text and embeddings to our vector database that can power a retrieval augmented LLM search engine like this.
We will use the architecture described in the first part, which involves queuing files to be indexed on AWS SQS and consuming them in parallel using Haystack pipelines. Letβs start deploying and scaling!
Deployment on Kubernetes and scaling with KEDA
In this section, we will learn how to set up KEDA on Kubernetes and configure autoscaling to scale our consumers based on the number of pending files in Kubernetes. We will use the following tools:
- k3d for creating a local Kubernetes cluster to deploy our consumers
- KEDA for scaling the consumers after deployment
- localstack as a local AWS cloud stack, for testing our application
To communicate with our local Kubernetes cluster, we will use kubectl.
β οΈ For certain types of indexing pipelines, GPUs are required. This is particularly true when embeddings are generated on the same machine (rather than through an external inference service) or when a model is used within the pipeline. In such cases, it is necessary to have GPU pods within the Kubernetes cluster to run the model.
Installation and setup
We will need to set up a local Kubernetes cluster and deploy a list of services before we can start deploying and scaling our application.
Create a local Kubernetes cluster
To begin, create a new Kubernetes cluster named  haystack-keda-cluster  using k3d.
k3d cluster create haystack-keda-cluster  
# check the the status via: kubectl cluster-info
Next, we will create a namespace called  indexing  that we will use to deploy our services.
kubectl create namespace indexing
Install services β KEDA and LocalStack
To set up LocalStack, add the helm chart and install LocalStack in the  indexing  namespace:
helm repo add localstack https://localstack.github.io/helm-charts  
helm install localstack localstack/localstack --namespace indexing
We will repeat the same steps with KEDA.
helm repo add kedacore https://kedacore.github.io/charts  
helm install keda kedacore/keda --namespace indexing
We can validate the setup by running  indexing kubectl get pods -n indexing.
     
Create an SQS queue and an S3 bucket
Our indexing consumers will connect to queues on LocalStack and download files from S3. Therefore, we need to create the necessary resources before deploying our application.
To create a queue and a bucket, we will use  
the same shell script  that we used in our development environment with Docker Compose. To run the script from within the container, pipe the script into the  kubectl exec  command:
cat ./scripts/sqs_bucket_bootstrap.sh | kubectl exec -i -n indexing deployment/localstack -- /bin/bash
If we fetch the logs via  kubectl logs -f deployment/localstack -c localstack -n indexing, we should see that a queue and a bucket were created.
2023-04-22T15:19:34.166  INFO --- [   asgi_gw_1] localstack.request.aws     : AWS sqs.CreateQueue => 200  
2023-04-22T15:19:34.533  INFO --- [   asgi_gw_0] localstack.request.aws     : AWS s3.CreateBucket => 200
Deploying the indexing consumer
Now that we have LocalStack and KEDA deployed to our Kubernetes cluster, we can start deploying our indexing consumer. The indexing consumers are deployed as Kubernetes deployments by using a deployment file  deployment-consumer.yaml.
# deployment-consumer.yaml  
# link to file: <https://github.com/ArzelaAscoIi/haystack-keda-indexing/tree/main/kubernetes>  
  
kind: Deployment  
apiVersion: apps/v1  
metadata:  
  name: indexing-consumer  
  labels:  
    k8s-app: indexing-consumer  
spec:  
  # we want to start with 0 replicas and scale up on demand  
  replicas: 0  
  selector:  
    matchLabels:  
      k8s-app: indexing-consumer  
  template:  
    metadata:  
      name: indexing-consumer  
      labels:  
        k8s-app: indexing-consumer  
    spec:  
      containers:  
        - name: indexing-consumer  
          command: ["python3", "consumer.py"]  
          # public consumer image  
          image: arzelaascoli/keda-haystack-consumer:latest  
          env:  
            # localstack configuration  
            - name: AWS_ENDPOINT  
              value: http://localstack:4566  
            - name: AWS_REGION  
              value: eu-central-1  
            - name: AWS_ACCESS_KEY_ID  
              value: test  
            - name: AWS_SECRET_ACCESS_KEY  
              value: test  
          # Resource estimations  
          # TODO: adjust these to our needs and the load we expect  
          resources:  
            requests:  
              memory: 1000Mi  
              cpu: 750m  
            limits:  
              memory: 2500Mi  
              cpu: 2000m
We can apply this YAML to our namespace indexing with kubectl:
kubectl apply -f ./kubernetes/deployment-consumer.yaml --namespace indexing
To validate that consumers can successfully start and connect to the queue, we can scale up the replicas to 1, and check the running pods.
# scale deployment   
kubectl scale deployment indexing-consumer --namespace=indexing --replicas=1  
# get pods   
kubectl get pods -n indexing  
# check logs   
kubectl logs -f deployment/indexing-consumer -c indexing-consumer -n indexing
     
The system will log that no files were found to be processed:
β 2023-04-23 15:43:14 [info     ] No files to process                                                                                                       β  
β 2023-04-23 15:43:19 [info     ] No files to process                                                                                                       β  
β 2023-04-23 15:43:24 [info     ] No files to process                                                                                                       β  
β 2023-04-23 15:43:29 [info     ] No files to process
Next, we will set up autoscaling based on the length of the SQS queue to enable autoscaling and scaling to zero if no files are pending.
Configure autoscaling based on queue length
After successfully creating all the required services to index files, we can now configure KEDA to check the queue length and scale the indexing consumers accordingly.
To set up a KEDA trigger on SQS, we need to configure authentication by creating a Kubernetes secrets object.
# secrets-localstack.yaml  
# link to file: <https://github.com/ArzelaAscoIi/haystack-keda-indexing/blob/main/kubernetes/keda/secrets-localstack.yaml>  
  
apiVersion: v1  
kind: Secret  
metadata:  
  name: aws-secrets  
  namespace: indexing  
data:  
  AWS_ACCESS_KEY_ID: dGVzdA== # base64 encoded string for "test"  
  AWS_SECRET_ACCESS_KEY: dGVzdA== # base64 encoded string for "test"
This secret is then mapped via a TriggerAuthentication object to KEDA, which will use credential based authentication.
# trigger-authentication.yaml  
# link to file: <https://github.com/ArzelaAscoIi/haystack-keda-indexing/blob/main/kubernetes/keda/trigger-authentication.yaml>  
  
apiVersion: keda.sh/v1alpha1  
kind: TriggerAuthentication  
metadata:  
  name: keda-trigger-auth-aws-credentials  
  namespace: indexing  
spec:  
  secretTargetRef:  
    - parameter: awsAccessKeyID # Required.  
      name: aws-secrets # Required.  
      key: AWS_ACCESS_KEY_ID # Required.  
    - parameter: awsSecretAccessKey # Required.  
      name: aws-secrets # Required.  
      key: AWS_SECRET_ACCESS_KEY # Required.
The operator is now allowed to access LocalStacks resources, and we can create a scaled object with a aws-sqs-queue  
trigger.
apiVersion: keda.sh/v1alpha1  
kind: ScaledObject  
metadata:  
  name: indexing-consumer-scaled-object  
spec:  
  scaleTargetRef:  
    apiVersion: apps/v1  
    kind: Deployment  
    name: indexing-consumer # Mandatory. Must be in the same namespace as the ScaledObject  
  pollingInterval: 30  
  cooldownPeriod: 300  
  idleReplicaCount: 0  
  minReplicaCount: 0  
  maxReplicaCount: 2  
  fallback: # defines a number of replicas to fallback to if a scaler is in an error state.  
    failureThreshold: 3  
    replicas: 4  
  triggers:  
    - type: aws-sqs-queue  
      authenticationRef:  
        name: keda-trigger-auth-aws-credentials  
      metadata:  
        # KEDA will use the value of the environment variable of the `consumer-file-ingestion` containers  
        queueURL: http://localhost:4566/000000000000/test-queue  
        queueLength: "10" # Should roughly equal the number of messages that can be processed in 1 minute  
        awsRegion: "eu-central-1"  
        awsEndpoint: "http://localstack:4566"  
        scaleOnInFlight: "false" # Exclude in-flight messages from the queue length calculation
After applying these three YAMLs via  kubectl apply --f ./kubernetes/keda --namespace indexing, we can forward the port to allow uploading files to LocalStack.
 kubectl port-forward deployment/localstack 4566:4566 -n indexing
We can use an  
upload script  to add a file  test.txt  by running  python3 upload.py.
# upload.py   
# link to file: <https://github.com/ArzelaAscoIi/haystack-keda-indexing/blob/main/upload.py>  
aws_service = AWSService(SQS_QUEUE, S3_BUCKET, LOCAL_DOWNLOAD_DIR)  
aws_service.upload_file(Path("./data/test.txt"))
Once the file is successfully uploaded and queued, KEDA will take care of scaling the deployment from 0 to 1 replica. Kubernetes will list an indexing-consumer pod.
NAME                                               READY   STATUS                
localstack-8fc647d9d-xkrsk                         1/1     Running               
keda-operator-metrics-apiserver-7bcfdd7c9b-7pbkp   1/1     Running              
keda-operator-6857fbc758-xtc44                     1/1     Running               
keda-admission-webhooks-59978445df-q85jr           1/1     Running              
indexing-consumer-656d98db6f-psz6q                 0/1     ContainerCreating 
After startup, the files will be fetched and removed from the queue, and indexed.
Conclusion and next steps
This article explains how to create a scalable application to convert text and PDF files into documents containing text and embeddings. KEDA allows for the on-demand scaling of each application. With this simple architecture, we are able to horizontally scale the creation of embeddings.
How was KEDA useful? β KEDA enabled us to scale consumers based on queue length. An alternative solution involves using horizontal pod autoscaling based on CPU usage, which would be triggered once elements are fetched from the queue. However, this approach does not allow for scaling down to zero. Since these tasks require GPUs, one idle machine that is constantly running can be expensive.
How do I deploy this without k3d? β This tutorial is applicable to any Kubernetes cluster. Simply follow the instructions provided.
What resources do I need? β When running this in production, GPU nodes are necessary for the cluster, which may require additional configuration.
Is there a simple way to deploy multiple pipelines? β In one of the next articles, I will share an article on how to use the Kubernetes Operator Framework ( Kopf), written in Python, to dynamically create these resources.
