LLM RAG with PyFlink using Llama 3

Sup. LLMs and AIOps are all the RAGe right now. Let's build a cool application, start to finish, and let's make it fun, not some document retrieval for knowledge base articles on how to submit a request for access to that new database at your company.

If you ask Llama 3.1 to write PyFlink code out of the box, it will write code for older versions. We're going to build a live, real-time chatbot that gives users code for the version of PyFlink they're actually using.

A typical PyFlink query might be: "Give me code to make an Async I/O call to query a database." Just about every LLM will hallucinate and hand you nonexistent PyFlink code modeled on Java/Scala's Async I/O process function. Let's fix that with some retrieval.

To augment your LLM, you first need a way to retrieve documents relevant to your user's query; we'll use Postgres with pgvector. There's an entire art to building your vector database, but in general you want to split larger articles into chunks so that each chunk's embedding can closely match a query embedding. A few rows of the resulting table might look like this:

text                                             | text_chunk                                 | embedding
-------------------------------------------------|--------------------------------------------|------------------------
document about Kafka streams for PyFlink 1.17.2… | Kafka source provides a builder class…     | [0.1, 0.2, 0.3, …, 0.4]
ross ellison’s article on Async I/O in PyFlink…  | no Async I/O | Apache Flink for PyFlink…   | [0.5, 0.6, 0.7, …, 0.8]
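
If you haven't set up pgvector before, here's a minimal sketch of how a table like that could be created and filled. Everything here is illustrative: your_conn_details and your_documents are placeholders, the chunking is a naive fixed-size split, and the vector(300) column matches the 300-dimensional vectors that spaCy's en_core_web_lg produces.

import spacy
import psycopg2

nlp = spacy.load("en_core_web_lg")  # 300-dimensional document/chunk embeddings

conn = psycopg2.connect(**your_conn_details)  # placeholder: fill in your connection details
cursor = conn.cursor()

# One table per PyFlink version; the name is quoted because it contains dots.
cursor.execute("""
    CREATE EXTENSION IF NOT EXISTS vector;
    CREATE TABLE IF NOT EXISTS "pyflink_1.17.2" (
        text       TEXT,
        text_chunk TEXT,
        embedding  vector(300)
    );
""")

def chunk(document, size=500):
    """Naive fixed-size character chunking; swap in whatever suits your documents."""
    return [document[i:i + size] for i in range(0, len(document), size)]

for document in your_documents:  # placeholder: e.g. scraped PyFlink 1.17.2 docs and articles
    for text_chunk in chunk(document):
        embedding = nlp(text_chunk).vector
        # pgvector accepts a literal like '[0.1,0.2,...]' for the vector column
        cursor.execute(
            'INSERT INTO "pyflink_1.17.2" (text, text_chunk, embedding) VALUES (%s, %s, %s)',
            (document, text_chunk, "[" + ",".join(str(x) for x in embedding) + "]"),
        )

conn.commit()
cursor.close()
conn.close()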
Kafka Topic: original_query
{"user_input": "Give me code to make an Async I/O to query a database."}

The PyFlink job will listen to the Kafka topic original_query. When a message arrives, we embed its "user_input" with the same embedding model we used to populate pgvector.
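
The code below assumes kafka_stream is a DataStream of already-parsed dicts. As a rough sketch of that wiring (the broker address, group id, and connector jar path are placeholders you'll need to adjust):

import json

from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaSource

env = StreamExecutionEnvironment.get_execution_environment()
# The Kafka connector jar has to be on the job's classpath; point this at your version.
env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")

source = (
    KafkaSource.builder()
    .set_bootstrap_servers("localhost:9092")
    .set_topics("original_query")
    .set_group_id("pyflink-rag")
    .set_starting_offsets(KafkaOffsetsInitializer.latest())
    .set_value_only_deserializer(SimpleStringSchema())
    .build()
)

# Each Kafka record is a JSON string; parse it into a dict with a "user_input" key.
kafka_stream = env.from_source(
    source, WatermarkStrategy.no_watermarks(), "original_query"
).map(json.loads)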

import spacy
import psycopg2
from pyflink.datastream.functions import ProcessFunction, RuntimeContext

class AIOps(ProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        # Use the same spaCy model that embedded the documents in pgvector
        self.nlp = spacy.load('en_core_web_lg')

    def process_element(self, value, ctx: ProcessFunction.Context):
        text = value['user_input']
        embedding = self.nlp(text).vector
        related_docs = self.get_related_docs(embedding)
        yield related_docs  # emit the retrieved documents downstream

    def get_related_docs(self, embedding):
        conn = psycopg2.connect(**your_conn_details)  # placeholder: your connection details
        cursor = conn.cursor()
        # pgvector expects the query vector as a literal like '[0.1,0.2,...]';
        # <-> is its distance operator, so this returns the 3 nearest chunks.
        vector_literal = "[" + ",".join(str(x) for x in embedding) + "]"
        cursor.execute("""
            SELECT text
            FROM "pyflink_1.17.2"
            ORDER BY embedding <-> %s::vector
            LIMIT 3;
        """, (vector_literal,))
        related_docs = cursor.fetchall()
        cursor.close()
        conn.close()
        return [doc[0] for doc in related_docs]

result_stream = kafka_stream.process(AIOps())

We embed the user_input and query the database for the three most relevant documents. If we attach these to the query, the LLM is far less likely to hallucinate. Now it's time for the Gen AI part. Make sure you have at least 16GB of VRAM; the model we use here is the best bang for your buck. Let's load our LLM into the open function:

import spacy
import psycopg2
from pyflink.datastream.functions import ProcessFunction, RuntimeContext
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
import torch

class AIOps(ProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        self.nlp = spacy.load('en_core_web_lg')
        
        # Load LLaMA model and tokenizer
        model_id = "meta-llama/Meta-Llama-3-8B-Instruct"
        # 4-bit NF4 quantization so the 8B model fits in a modest amount of VRAM
        bnb_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.bfloat16
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_id,
            torch_dtype=torch.bfloat16,
            device_map="auto",
            quantization_config=bnb_config
        )
        # Stop generation at the model's EOS token or Llama 3's end-of-turn token
        self.terminators = [
            self.tokenizer.eos_token_id,
            self.tokenizer.convert_tokens_to_ids("<|eot_id|>")
        ]
        self.SYS_PROMPT = """You are an assistant for answering questions.
        You are given the extracted parts of a long document and a question. Provide a conversational answer.
        If you don't know the answer, just say "I do not know." Don't make up an answer."""

We need two more functions: one to format the prompt and one to generate the response:

    def format_prompt(self, prompt, retrieved_documents, k):
        """Using the retrieved documents we will prompt the model to generate our responses"""
        PROMPT = f"Question: {prompt}\nContext:\n"
        for idx in range(k):
            PROMPT += f"{retrieved_documents[idx]}\n"
        PROMPT += "Please refer to these documents."
        return PROMPT

    def generate(self, formatted_prompt):
        # 1. Truncate the prompt (simple character-level cap to avoid GPU OOM)
        formatted_prompt = formatted_prompt[:2000]

        # 2. Prepare the chat messages
        messages = [{"role": "system", "content": self.SYS_PROMPT}, {"role": "user", "content": formatted_prompt}]

        # 3. Tokenize the input with Llama 3's chat template
        input_ids = self.tokenizer.apply_chat_template(
            messages,
            add_generation_prompt=True,
            return_tensors="pt"
        ).to(self.model.device)

        # 4. Generate the response
        outputs = self.model.generate(
            input_ids,
            max_new_tokens=1024,
            eos_token_id=self.terminators,
            do_sample=True,
            temperature=0.6,
            top_p=0.9,
        )

        # 5. Extract and decode only the newly generated tokens
        response = outputs[0][input_ids.shape[-1]:]
        return self.tokenizer.decode(response, skip_special_tokens=True)

There you have it: your very own scalable, Python LLM RAG application. You get a lot of freedom from Python and from how you build your vector database, most of which will require domain-specific knowledge, but these tools are versatile, and adding relevant context to an outdated LLM is an easy way to get useful code and answers out of it.

Here’s the full code:

import spacy
import psycopg2
from pyflink.common import Types
from pyflink.datastream.functions import ProcessFunction, RuntimeContext
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
import torch

class AIOps(ProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        self.nlp = spacy.load('en_core_web_lg')
        
        # Load LLaMA model and tokenizer
        model_id = "meta-llama/Meta-Llama-3-8B-Instruct"
        # 4-bit NF4 quantization so the 8B model fits in a modest amount of VRAM
        bnb_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.bfloat16
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_id,
            torch_dtype=torch.bfloat16,
            device_map="auto",
            quantization_config=bnb_config
        )
        # Stop generation at the model's EOS token or Llama 3's end-of-turn token
        self.terminators = [
            self.tokenizer.eos_token_id,
            self.tokenizer.convert_tokens_to_ids("<|eot_id|>")
        ]
        self.SYS_PROMPT = """You are an assistant for answering questions.
        You are given the extracted parts of a long document and a question. Provide a conversational answer.
        If you don't know the answer, just say "I do not know." Don't make up an answer."""

    def process_element(self, value, ctx: ProcessFunction.Context):
        text = value['user_input']
        embedding = self.nlp(text).vector
        related_docs = self.get_related_docs(embedding)

        # Format the prompt with the retrieved documents
        formatted_prompt = self.format_prompt(text, related_docs, len(related_docs))

        # Generate a response with the Llama model
        response = self.generate(formatted_prompt)

        yield response  # emit the answer downstream

    def get_related_docs(self, embedding):
        conn = psycopg2.connect(
            dbname="your_dbname",
            user="your_username",
            password="your_password",
            host="your_host",
            port="your_port"
        )
        cursor = conn.cursor()
        # pgvector expects the query vector as a literal like '[0.1,0.2,...]';
        # <-> is its distance operator, so this returns the 3 nearest chunks.
        vector_literal = "[" + ",".join(str(x) for x in embedding) + "]"
        cursor.execute("""
            SELECT text
            FROM "pyflink_1.17.2"
            ORDER BY embedding <-> %s::vector
            LIMIT 3;
        """, (vector_literal,))
        related_docs = cursor.fetchall()
        cursor.close()
        conn.close()
        return [doc[0] for doc in related_docs]

    def format_prompt(self, prompt, retrieved_documents, k):
        """Using the retrieved documents we will prompt the model to generate our responses"""
        PROMPT = f"Question: {prompt}\nContext:\n"
        for idx in range(k):
            PROMPT += f"{retrieved_documents[idx]}\n"
        PROMPT += "Please refer to these documents."
        return PROMPT

    def generate(self, formatted_prompt):
        formatted_prompt = formatted_prompt[:2000]  # simple character-level cap to avoid GPU OOM
        messages = [{"role": "system", "content": self.SYS_PROMPT}, {"role": "user", "content": formatted_prompt}]
        # Tokenize the conversation with Llama 3's chat template
        input_ids = self.tokenizer.apply_chat_template(
            messages,
            add_generation_prompt=True,
            return_tensors="pt"
        ).to(self.model.device)
        outputs = self.model.generate(
            input_ids,
            max_new_tokens=1024,
            eos_token_id=self.terminators,
            do_sample=True,
            temperature=0.6,
            top_p=0.9,
        )
        # Decode only the newly generated tokens (everything after the prompt)
        response = outputs[0][input_ids.shape[-1]:]
        return self.tokenizer.decode(response, skip_special_tokens=True)

result_stream = kafka_stream.process(AIOps(), output_type=Types.STRING())  # each output record is a string response
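
One last bit of wiring the snippets above leave out: the generated answers need to go somewhere, and the job has to be launched. Building on the env and kafka_stream sketch from earlier (the response topic name and broker address are again placeholders), you could sink the results back to Kafka and kick off the job:

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import KafkaRecordSerializationSchema, KafkaSink

# Publish each generated answer to a response topic for the chat frontend to pick up.
sink = (
    KafkaSink.builder()
    .set_bootstrap_servers("localhost:9092")
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("rag_response")
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )
    .build()
)

result_stream.sink_to(sink)
env.execute("pyflink-llama3-rag")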