Retrieval
The final asset will query our RAG system. Since the pipeline is designed to answer any question related to Dagster, we will make this asset configurable by including a Config. This configuration consists only of the question we want to ask the AI system:
class AskAI(dg.Config):
    question: str
In order to do similarity searches, the input question needs to be embedded in the same way as the source embeddings sent to Pinecone. Again, we will use OpenAI and the same model (text-embedding-3-small) to turn the question into a vector. Next, we can use Pinecone to search the index for the vectors that most closely match the input question. As we are searching within our index, we will limit the search to the "dagster-github" and "dagster-docs" namespaces (the two sources we ingested data from). Filtering data like this shows how vector databases still support many of the same functions as traditional databases:
# Embed the question with the same model used for the source documents
with openai.get_client(context) as client:
    question_embedding = (
        client.embeddings.create(model="text-embedding-3-small", input=config.question)
        .data[0]
        .embedding
    )

# Query each namespace and keep the highest-scoring matches overall
results = []
for namespace in ["dagster-github", "dagster-docs"]:
    index_obj, namespace_kwargs = pinecone.get_index("dagster-knowledge", namespace=namespace)
    search_results = index_obj.query(
        vector=question_embedding, top_k=3, include_metadata=True, **namespace_kwargs
    )
    results.extend(search_results.matches)

results.sort(key=lambda x: x.score, reverse=True)
results = results[:3]
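The snippets below also reference contexts and sources, which come from the metadata stored alongside each match. A minimal sketch of how they might be derived, assuming the ingestion assets upserted the chunk text and its origin under metadata keys named "text" and "source" (the exact key names are an assumption, not shown in this section):

# Pull the raw text and origin of each match out of its Pinecone metadata.
# The "text" and "source" keys are assumed to match what was upserted during ingestion.
contexts = [match.metadata["text"] for match in results]
sources = [match.metadata.get("source", "unknown") for match in results]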
With the relevant information retrieved from Pinecone, we can add some prompt engineering to combine the context extracted from Pinecone with the original question (in text). The full prompt is then sent to OpenAI again (now using the gpt-4-turbo-preview model) to get the final answer, which is recorded as a MaterializeResult in the Dagster Catalog:
prompt_template = """
You are an experienced data engineer and Dagster expert.
Use the following pieces of context to answer the question at the end.
If you don't know the answer, just say that you don't know, don't try to make up an answer.
Context: {full_context}
Question: {question}
Answer: Let me help you with that.
"""

formatted_prompt = prompt_template.format(
    full_context="\n\n".join(contexts), question=config.question
)

# Get response from OpenAI
with openai.get_client(context) as client:
    response = client.chat.completions.create(
        model="gpt-4-turbo-preview", messages=[{"role": "user", "content": formatted_prompt}]
    )

return dg.MaterializeResult(
    metadata={
        "question": config.question,
        "answer": response.choices[0].message.content,
        "sources": sources,
    }
)
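Putting it together, these fragments live in the body of a single asset. A minimal sketch of its signature, assuming the resources are registered under the keys openai and pinecone and that PineconeResource is the custom resource built earlier in the pipeline (the names here are assumptions):

import dagster as dg
from dagster_openai import OpenAIResource


@dg.asset
def query(
    context: dg.AssetExecutionContext,
    config: AskAI,
    openai: OpenAIResource,
    pinecone: PineconeResource,  # custom Pinecone resource from the ingestion steps (name assumed)
) -> dg.MaterializeResult:
    # 1. Embed config.question with text-embedding-3-small
    # 2. Query the "dagster-github" and "dagster-docs" namespaces and keep the top matches
    # 3. Format the prompt and send it to gpt-4-turbo-preview
    # 4. Return the question, answer, and sources as MaterializeResult metadata
    ...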
Getting answers
To ask the pipeline a question, you can materialize the query asset and supply your question in the run configuration. The answer will be context-aware and grounded in the sources used to populate Pinecone:
ops:
  query:
    config:
      question: What is Dagster?
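The same run can also be launched programmatically, for example from a test. A quick sketch, assuming the asset is named query and that openai_resource and pinecone_resource stand in for the resource definitions set up earlier:

import dagster as dg

# Materialize the query asset with the question supplied as run configuration.
result = dg.materialize(
    [query],
    run_config=dg.RunConfig(ops={"query": AskAI(question="What is Dagster?")}),
    resources={"openai": openai_resource, "pinecone": pinecone_resource},
)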
We can ensure that answers to questions like this are grounded in the most up-to-date information about Dagster. We can also ask questions confined to certain time ranges, like "Tell me about the issues Dagster users have faced in the past 6 months?". This will summarize information across all of our sources.
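One way to enforce such a time window at retrieval time, rather than relying on the model alone, is Pinecone's metadata filtering. A sketch, assuming each vector was upserted with a numeric "created_at" Unix-timestamp metadata field (an assumption about the ingestion step, not shown in this section):

import time

# Only retrieve chunks created in roughly the past six months.
six_months_ago = time.time() - 180 * 24 * 60 * 60

search_results = index_obj.query(
    vector=question_embedding,
    top_k=3,
    include_metadata=True,
    filter={"created_at": {"$gte": six_months_ago}},
    **namespace_kwargs,
)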