Sup. LLMs and AIOps are all the RAGe right now. Let's build a cool application, start to finish, and let's make it fun, not some document retrieval bot for knowledge base articles on how to request access to that new database at your company.
If you ask Llama 3.1 to write PyFlink code out of the box, it will give you code for older versions. We're going to build a live, real-time chat bot that gives users code matching the version of PyFlink they're actually running.
A typical PyFlink query might be: "Give me code to make an Async I/O call to query a database." Every LLM will hallucinate and hand you nonexistent PyFlink code modeled on the Java/Scala Async I/O process function. Let's fix that with retrieval-augmented generation (RAG).
To augment your LLM, you first need a way to retrieve documents relevant to the user's query; we'll use Postgres with the pgvector extension. There's an entire art to building your vector database, but in general you want to split larger articles into chunks so that each chunk's embedding can be matched against the query embedding. The rows end up looking something like this (a sketch of building such a table follows the example rows below):
| text | text_chunk | embedding |
| --- | --- | --- |
| document about Kafka streams for PyFlink 1.17.2 | …Kafka source provides a builder class… | [0.1, 0.2, 0.3, …, 0.4] |
| Ross Ellison's article on Async I/O in PyFlink | …no Async I/O \| Apache Flink for PyFlink… | [0.5, 0.6, 0.7, …, 0.8] |
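If you're wondering how rows like that get into Postgres in the first place, here's a rough sketch. The chunking helper, table DDL, and connection details are my assumptions rather than anything prescribed; the one thing that does matter is embedding the chunks with the same spaCy en_core_web_lg model the Flink job uses later (its vectors are 300-dimensional).

import spacy
import psycopg2

nlp = spacy.load('en_core_web_lg')  # 300-dimensional vectors

conn = psycopg2.connect(dbname="your_dbname", user="your_username",
                        password="your_password", host="your_host", port="your_port")
cur = conn.cursor()

# One table per PyFlink version; quote the name since it contains dots.
cur.execute('CREATE EXTENSION IF NOT EXISTS vector;')
cur.execute('''
    CREATE TABLE IF NOT EXISTS "pyflink_1.17.2" (
        text TEXT,           -- the source document / title
        text_chunk TEXT,     -- one chunk of that document
        embedding vector(300)
    );
''')

def chunk(article, size=500):
    """Naive fixed-size character chunking; real chunking is an art."""
    return [article[i:i + size] for i in range(0, len(article), size)]

title = "document about Kafka streams for PyFlink 1.17.2"
article_text = "...full article text..."
for text_chunk in chunk(article_text):
    vec = nlp(text_chunk).vector
    cur.execute(
        'INSERT INTO "pyflink_1.17.2" (text, text_chunk, embedding) VALUES (%s, %s, %s::vector)',
        (title, text_chunk, '[' + ','.join(str(float(x)) for x in vec) + ']'),
    )

conn.commit()
cur.close()
conn.close()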
Kafka Topic: original_query
{"user_input": "Give me code to make an Async I/O to query a database."}
The PyFlink job will listen to the Kafka topic original_query. When we receive the "user_input", we need to embed it with the same embedding model we used to populate pgvector.
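Before the process function can do anything, the job needs a source. Here's a minimal sketch of wiring that up; the broker address, consumer group, and JSON parsing are placeholder assumptions, but it produces the kafka_stream that the process function below plugs into (you'll also need the flink-sql-connector-kafka jar on the job's classpath):

import json

from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

env = StreamExecutionEnvironment.get_execution_environment()

# Read raw JSON strings from the original_query topic.
source = KafkaSource.builder() \
    .set_bootstrap_servers("localhost:9092") \
    .set_topics("original_query") \
    .set_group_id("pyflink-rag") \
    .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

kafka_stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "original_query") \
    .map(json.loads)  # {"user_input": "..."} -> Python dict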
import spacy
import psycopg2

from pyflink.datastream.functions import ProcessFunction, RuntimeContext


class AIOps(ProcessFunction):

    def open(self, runtime_context: RuntimeContext):
        # Load the same embedding model used to populate pgvector.
        self.nlp = spacy.load('en_core_web_lg')

    def process_element(self, value, ctx: ProcessFunction.Context):
        text = value['user_input']
        embedding = self.nlp(text).vector
        related_docs = self.get_related_docs(embedding)
        yield related_docs

    def get_related_docs(self, embedding):
        # Opening a connection per element keeps the example simple;
        # in production you'd open it once in open().
        conn = psycopg2.connect(
            dbname="your_dbname",
            user="your_username",
            password="your_password",
            host="your_host",
            port="your_port"
        )
        cursor = conn.cursor()
        # pgvector's <-> operator orders rows by distance to the query embedding.
        cursor.execute("""
            SELECT DISTINCT text
            FROM "pyflink_1.17.2"
            ORDER BY embedding <-> %s::vector
            LIMIT 3;
        """, ('[' + ','.join(str(float(x)) for x in embedding) + ']',))
        related_docs = cursor.fetchall()
        cursor.close()
        conn.close()
        return [doc[0] for doc in related_docs]


result_stream = kafka_stream.process(AIOps())
We embed the user_input and query the database for the top 3 most relevant documents. If we attach these to the query, the LLM is much less likely to hallucinate. Now it's time for the Gen AI part. Make sure you have at least 16GB of VRAM; the model we use here is the best bang for your buck. Let's load our LLM in the open function:
import spacy
import psycopg2
import torch

from pyflink.datastream.functions import ProcessFunction, RuntimeContext
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig


class AIOps(ProcessFunction):

    def open(self, runtime_context: RuntimeContext):
        self.nlp = spacy.load('en_core_web_lg')

        # Load the LLaMA model and tokenizer, 4-bit quantized to fit in ~16GB of VRAM
        model_id = "meta-llama/Meta-Llama-3-8B-Instruct"
        bnb_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.bfloat16
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_id,
            torch_dtype=torch.bfloat16,
            device_map="auto",
            quantization_config=bnb_config
        )
        self.terminators = [
            self.tokenizer.eos_token_id,
            self.tokenizer.convert_tokens_to_ids("<|eot_id|>")
        ]
        self.SYS_PROMPT = """You are an assistant for answering questions.
You are given the extracted parts of a long document and a question. Provide a conversational answer.
If you don't know the answer, just say "I do not know." Don't make up an answer."""
We need two more functions: one to build the prompt from the retrieved documents, and one to generate the response:
def format_prompt(self, prompt, retrieved_documents, k):
    """Combine the user's question with the retrieved documents."""
    PROMPT = f"Question: {prompt}\nContext:\n"
    for idx in range(k):
        PROMPT += f"{retrieved_documents[idx]}\n"
    PROMPT += "Please refer to these documents."
    return PROMPT

def generate(self, formatted_prompt):
    # 1. Truncate the prompt (crude character-level cut to avoid GPU OOM)
    formatted_prompt = formatted_prompt[:2000]
    # 2. Prepare the chat messages
    messages = [
        {"role": "system", "content": self.SYS_PROMPT},
        {"role": "user", "content": formatted_prompt}
    ]
    # 3. Tokenize the input with the Llama 3 chat template
    input_ids = self.tokenizer.apply_chat_template(
        messages,
        add_generation_prompt=True,
        return_tensors="pt"
    ).to(self.model.device)
    # 4. Generate the response
    outputs = self.model.generate(
        input_ids,
        max_new_tokens=1024,
        eos_token_id=self.terminators,
        do_sample=True,
        temperature=0.6,
        top_p=0.9,
    )
    # 5. Extract and decode only the newly generated tokens
    response = outputs[0][input_ids.shape[-1]:]
    return self.tokenizer.decode(response, skip_special_tokens=True)
There you have it: your very own scalable Python LLM RAG application. Python and the way you build your vector database give you a lot of freedom, most of which will require domain-specific knowledge, but these tools are very versatile, and you'll find that adding relevant context to outdated LLMs is an easy way to get useful code and answers.
Here’s the full code:
import spacy
import psycopg2
import torch

from pyflink.datastream.functions import ProcessFunction, RuntimeContext
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig


class AIOps(ProcessFunction):

    def open(self, runtime_context: RuntimeContext):
        # Embedding model (must match the one used to populate pgvector)
        self.nlp = spacy.load('en_core_web_lg')

        # Load the LLaMA model and tokenizer, 4-bit quantized
        model_id = "meta-llama/Meta-Llama-3-8B-Instruct"
        bnb_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.bfloat16
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_id,
            torch_dtype=torch.bfloat16,
            device_map="auto",
            quantization_config=bnb_config
        )
        self.terminators = [
            self.tokenizer.eos_token_id,
            self.tokenizer.convert_tokens_to_ids("<|eot_id|>")
        ]
        self.SYS_PROMPT = """You are an assistant for answering questions.
You are given the extracted parts of a long document and a question. Provide a conversational answer.
If you don't know the answer, just say "I do not know." Don't make up an answer."""

    def process_element(self, value, ctx: ProcessFunction.Context):
        text = value['user_input']
        embedding = self.nlp(text).vector
        related_docs = self.get_related_docs(embedding)
        # Format the prompt with the retrieved documents
        formatted_prompt = self.format_prompt(text, related_docs, len(related_docs))
        # Generate the response using the LLaMA model
        response = self.generate(formatted_prompt)
        yield response

    def get_related_docs(self, embedding):
        conn = psycopg2.connect(
            dbname="your_dbname",
            user="your_username",
            password="your_password",
            host="your_host",
            port="your_port"
        )
        cursor = conn.cursor()
        # pgvector's <-> operator returns the nearest neighbours of the query embedding
        cursor.execute("""
            SELECT DISTINCT text
            FROM "pyflink_1.17.2"
            ORDER BY embedding <-> %s::vector
            LIMIT 3;
        """, ('[' + ','.join(str(float(x)) for x in embedding) + ']',))
        related_docs = cursor.fetchall()
        cursor.close()
        conn.close()
        return [doc[0] for doc in related_docs]

    def format_prompt(self, prompt, retrieved_documents, k):
        """Combine the user's question with the retrieved documents."""
        PROMPT = f"Question: {prompt}\nContext:\n"
        for idx in range(k):
            PROMPT += f"{retrieved_documents[idx]}\n"
        PROMPT += "Please refer to these documents."
        return PROMPT

    def generate(self, formatted_prompt):
        formatted_prompt = formatted_prompt[:2000]  # crude truncation to avoid GPU OOM
        messages = [
            {"role": "system", "content": self.SYS_PROMPT},
            {"role": "user", "content": formatted_prompt}
        ]
        # Tokenize with the Llama 3 chat template and generate
        input_ids = self.tokenizer.apply_chat_template(
            messages,
            add_generation_prompt=True,
            return_tensors="pt"
        ).to(self.model.device)
        outputs = self.model.generate(
            input_ids,
            max_new_tokens=1024,
            eos_token_id=self.terminators,
            do_sample=True,
            temperature=0.6,
            top_p=0.9,
        )
        response = outputs[0][input_ids.shape[-1]:]
        return self.tokenizer.decode(response, skip_special_tokens=True)


result_stream = kafka_stream.process(AIOps())
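One last piece the code above leaves open is where the responses go. A natural option, purely my assumption since it isn't specified above, is to publish them back to another Kafka topic (here a hypothetical chat_responses) that your chat front end subscribes to:

from pyflink.common import Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import KafkaSink, KafkaRecordSerializationSchema

# Note: declare output_type=Types.STRING() on .process() so the sink receives strings.
sink = KafkaSink.builder() \
    .set_bootstrap_servers("localhost:9092") \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
            .set_topic("chat_responses")
            .set_value_serialization_schema(SimpleStringSchema())
            .build()
    ) \
    .build()

result_stream.sink_to(sink)
env.execute("pyflink-rag-chatbot")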