# LLM Integration Guide
Best practices for integrating Aegis with LLM applications.
## Overview
Aegis protects LLM applications by:
- Checking user input before sending to the model
- Checking model output before returning to users
- Protecting RAG context from data leakage
- Monitoring agent tool calls for sensitive data
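
All four checkpoints use the same `check` call. As a quick orientation, the sketch below names the result fields this guide relies on (`decision`, `content`, `summary`, `detected`); the example input and the `CREDIT_CARD` type label are illustrative:

```python
from aegis_sdk import Aegis, Decision

aegis = Aegis(license_key="aegis_lic_xxx")

result = aegis.check("My card number is 4111 1111 1111 1111", destination="AI_TOOL")

result.decision   # e.g. Decision.MASKED; ALLOWED and BLOCKED are the other outcomes
result.content    # the text, with sensitive spans masked when applicable
result.summary    # a human-readable explanation of the decision
result.detected   # list of detections, each with a .type (e.g. "CREDIT_CARD")
```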
## Architecture Patterns

### Pattern 1: Input Protection Only

The simplest pattern checks user input before it reaches the LLM:
```python
from aegis_sdk import Aegis, Decision

aegis = Aegis(license_key="...")

def chat(user_message: str) -> str:
    # Check user input
    result = aegis.check(user_message, destination="AI_TOOL")
    if result.decision == Decision.BLOCKED:
        return "I can't process that request due to sensitive data."

    # Use the masked content if needed
    safe_input = result.content

    # Call the LLM
    response = llm.generate(safe_input)
    return response
```
```mermaid
sequenceDiagram
    User->>App: Message with PII
    App->>Aegis: check(message)
    Aegis-->>App: MASKED
    App->>LLM: Masked message
    LLM-->>App: Response
    App-->>User: Response
```
### Pattern 2: Full Pipeline Protection

Check both input and output:
```python
def protected_chat(user_message: str) -> str:
    # Check input
    input_result = aegis.check(user_message, destination="AI_TOOL")
    if input_result.decision == Decision.BLOCKED:
        return handle_blocked_input(input_result)

    # Call the LLM with safe input
    llm_response = llm.generate(input_result.content)

    # Check output before returning it to the user
    output_result = aegis.check(llm_response, destination="CUSTOMER")
    if output_result.decision == Decision.BLOCKED:
        return "The response contained sensitive information."

    return output_result.content
```
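
Pattern 2 calls a `handle_blocked_input` helper that is left to the application. A minimal sketch that logs the block and returns a user-facing message:

```python
import logging

logger = logging.getLogger(__name__)

def handle_blocked_input(result) -> str:
    # Log why the input was blocked, but keep the details out of the user reply
    logger.warning("Input blocked: %s", result.summary)
    return "I can't process that request due to sensitive data."
```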
### Pattern 3: RAG Protection

Protect retrieved documents in RAG pipelines:
```python
def protected_rag(query: str) -> str:
    # 1. Check the query
    query_result = aegis.check(query, destination="AI_TOOL")
    if query_result.decision == Decision.BLOCKED:
        return "Query contains sensitive data"

    # 2. Retrieve documents
    docs = vectorstore.similarity_search(query_result.content)

    # 3. Check each document
    safe_context = []
    for doc in docs:
        doc_result = aegis.check(doc.content, destination="AI_TOOL")
        if doc_result.decision != Decision.BLOCKED:
            safe_context.append(doc_result.content)

    # 4. Generate with the safe context (join the list into a single string)
    context = "\n\n".join(safe_context)
    prompt = f"Context: {context}\n\nQuestion: {query_result.content}"
    return llm.generate(prompt)
```
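
Checking documents one at a time adds latency for large result sets. Since the SDK also exposes an async `acheck` (used in the Streaming and Timeout sections below), the per-document checks can run concurrently; a sketch, assuming an async caller and the `aegis` client from Pattern 1:

```python
import asyncio

async def check_documents(docs) -> list[str]:
    # Run all per-document checks concurrently
    results = await asyncio.gather(
        *(aegis.acheck(doc.content, destination="AI_TOOL") for doc in docs)
    )
    # Keep only content that was not blocked
    return [r.content for r in results if r.decision != Decision.BLOCKED]
```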
## Framework Integrations

### LangChain

**Callback Handler:**
```python
from langchain_openai import ChatOpenAI
from aegis_sdk.integrations.langchain import AegisCallbackHandler

handler = AegisCallbackHandler(
    license_key="aegis_lic_xxx",
    destination="AI_TOOL",
    on_block="raise",  # or "mask", "skip"
)

llm = ChatOpenAI(callbacks=[handler])

# All messages are automatically checked
response = llm.invoke("User message here")
```
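
With `on_block="raise"`, a blocked message surfaces as an exception instead of reaching the model. A sketch of handling it, assuming the handler raises the SDK's `AegisError` (see Error Handling below); check the exact exception type for your SDK version:

```python
from aegis_sdk import AegisError

try:
    response = llm.invoke("User message here")
except AegisError:
    # The message never reached the model; substitute a safe reply
    response = "That message contained sensitive data and was not sent."
```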
**Chain Integration:**
```python
from langchain.chains import ConversationChain
from langchain.memory import ConversationBufferMemory

chain = ConversationChain(
    llm=ChatOpenAI(callbacks=[handler]),
    memory=ConversationBufferMemory(),
)

# Memory contents are also protected
result = chain.invoke({"input": user_message})
```
**Custom Chain with Protection:**
```python
from langchain.chains.base import Chain
from aegis_sdk import Aegis

class ProtectedChain(Chain):
    """Chain with built-in Aegis protection."""

    aegis: Aegis = None

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.aegis = Aegis(license_key="...")

    def _call(self, inputs: dict) -> dict:
        # Check the input before running the chain
        result = self.aegis.check(
            inputs["query"],
            destination="AI_TOOL",
        )
        if result.decision == "BLOCKED":
            return {"output": "Blocked", "blocked": True}
        # Continue with the safe content
        ...
```
### OpenAI

**Protected Client:**
```python
from aegis_sdk.integrations.openai import SafeOpenAI

client = SafeOpenAI(
    api_key="sk-xxx",
    aegis_license_key="aegis_lic_xxx",
)

# Automatic protection on all calls
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": user_input}],
)
```
**Manual Integration:**
```python
import openai
from aegis_sdk import Aegis

aegis = Aegis(license_key="...")
client = openai.OpenAI()

def safe_completion(messages: list) -> str:
    # Check the last user message
    user_message = next(
        m["content"] for m in reversed(messages)
        if m["role"] == "user"
    )
    result = aegis.check(user_message, destination="AI_TOOL")
    if result.decision == "BLOCKED":
        raise ValueError(f"Blocked: {result.summary}")

    # Copy the message dicts so the caller's list isn't mutated
    safe_messages = [dict(m) for m in messages]
    for m in reversed(safe_messages):
        if m["role"] == "user":
            m["content"] = result.content
            break

    response = client.chat.completions.create(
        model="gpt-4",
        messages=safe_messages,
    )
    return response.choices[0].message.content
```
### Claude (Anthropic)
```python
from anthropic import Anthropic
from aegis_sdk import Aegis

aegis = Aegis(license_key="...")
client = Anthropic()

def safe_claude(user_message: str) -> str:
    result = aegis.check(user_message, destination="AI_TOOL")
    if result.decision == "BLOCKED":
        return f"Cannot process: {result.summary}"

    response = client.messages.create(
        model="claude-3-opus-20240229",
        max_tokens=1024,  # required by the Messages API
        messages=[{"role": "user", "content": result.content}],
    )
    return response.content[0].text
```
## Agent Protection

### Tool Call Monitoring

Protect agent tool calls:
```python
from langchain.agents import AgentExecutor, create_openai_tools_agent
from aegis_sdk.integrations.langchain import AegisToolWrapper

# Wrap tools with Aegis protection
protected_tools = [
    AegisToolWrapper(tool, aegis_handler)
    for tool in original_tools
]

agent = create_openai_tools_agent(llm, protected_tools, prompt)
executor = AgentExecutor(agent=agent, tools=protected_tools)
```
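
Conceptually, the wrapper checks a tool's input before the tool runs and its output before the agent sees it. `AegisToolWrapper` handles this generically; a hand-rolled sketch of the same idea for a single tool:

```python
def guarded_run(tool, tool_input: str) -> str:
    # Check what the agent is about to send to the tool
    in_check = aegis.check(tool_input, destination="AI_TOOL")
    if in_check.decision == "BLOCKED":
        return "Tool call blocked: input contained sensitive data."

    # Run the tool on the (possibly masked) input
    output = tool.run(in_check.content)

    # Check the tool's output before it re-enters the agent loop
    out_check = aegis.check(output, destination="AI_TOOL")
    return out_check.content if out_check.decision != "BLOCKED" else "[Tool output filtered]"
```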
### Function Calling

Check function arguments:
```python
import json
from typing import Any

def protected_function_call(function_name: str, arguments: dict) -> Any:
    """Execute a function with Aegis protection on its arguments."""
    # Serialize the arguments for checking
    args_str = json.dumps(arguments)
    result = aegis.check(args_str, destination="AI_TOOL")
    if result.decision == "BLOCKED":
        return {"error": f"Function arguments blocked: {result.summary}"}

    # Parse the safe (possibly masked) arguments and execute
    safe_args = json.loads(result.content)
    return execute_function(function_name, safe_args)
```
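
For example, if the active policy masks email addresses, the function receives the masked value rather than the raw one (`lookup_customer` is a hypothetical function name):

```python
# The email may arrive masked, e.g. as "[EMAIL]"; the exact token depends on your policy
protected_function_call("lookup_customer", {"email": "jane@example.com"})
```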
## Streaming

Handle streaming responses:
```python
async def protected_stream(prompt: str):
    """Stream with Aegis protection."""
    # Check the input
    input_result = aegis.check(prompt, destination="AI_TOOL")
    if input_result.decision == "BLOCKED":
        yield "Blocked: " + input_result.summary
        return

    # Stream the response
    buffer = ""
    async for chunk in llm.astream(input_result.content):
        buffer += chunk
        # Check the accumulated content periodically
        if len(buffer) > 500:
            check = aegis.check(buffer, destination="CUSTOMER")
            if check.decision == "BLOCKED":
                yield "[Content filtered]"
                return
        yield chunk

    # Final check on the complete response
    final_check = aegis.check(buffer, destination="CUSTOMER")
    if final_check.decision != "ALLOWED":
        # Could log or take action here
        pass
```
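
Consuming the generator works like any async iterator; a usage sketch, assuming an async entry point:

```python
import asyncio

async def main():
    async for token in protected_stream("Summarize our Q3 results"):
        print(token, end="", flush=True)

asyncio.run(main())
```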
## Error Handling

### Graceful Degradation
```python
import logging

from aegis_sdk import Aegis, AegisError

logger = logging.getLogger(__name__)

try:
    aegis = Aegis(license_key="...")
except AegisError:
    # Fall back to offline mode
    aegis = Aegis(offline_mode=True)

def resilient_check(content: str) -> str:
    try:
        result = aegis.check(content, destination="AI_TOOL")
        return result.content if result.decision != "BLOCKED" else ""
    except AegisError as e:
        # Log the error but don't block the user
        logger.warning(f"Aegis check failed: {e}")
        return content  # Or apply local rules
```
### Timeout Handling
```python
import asyncio
import logging

from aegis_sdk import Aegis

logger = logging.getLogger(__name__)
aegis = Aegis(license_key="...")

async def check_with_timeout(content: str, timeout: float = 1.0) -> str:
    try:
        result = await asyncio.wait_for(
            aegis.acheck(content, destination="AI_TOOL"),
            timeout=timeout,
        )
        return result.content
    except asyncio.TimeoutError:
        logger.warning("Aegis check timed out")
        return content  # Proceed with caution (fail open)
```
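
Returning the unchecked content on timeout fails open, which favors availability. For high-sensitivity destinations you may prefer to fail closed and drop the content instead; the same function with the alternative branch:

```python
async def check_fail_closed(content: str, timeout: float = 1.0) -> str:
    try:
        result = await asyncio.wait_for(
            aegis.acheck(content, destination="AI_TOOL"),
            timeout=timeout,
        )
        return result.content
    except asyncio.TimeoutError:
        logger.warning("Aegis check timed out; failing closed")
        return ""  # Drop the content rather than risk leaking it
```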
## Performance Optimization

### Caching
```python
# Policy is cached by default (5 minutes)
aegis = Aegis(
    license_key="...",
    cache_ttl=600,  # 10 minutes
)

# Detection runs locally, so no network
# call is needed for each check
```
### Batch Processing
```python
import asyncio

async def batch_check(contents: list[str]) -> list[str]:
    """Check multiple pieces of content concurrently."""
    tasks = [
        aegis.acheck(content, destination="AI_TOOL")
        for content in contents
    ]
    results = await asyncio.gather(*tasks)
    return [
        r.content if r.decision != "BLOCKED" else ""
        for r in results
    ]
```
### Connection Pooling

```python
# The SDK uses connection pooling by default.
# For high throughput, adjust the pool size:
aegis = Aegis(
    license_key="...",
    max_connections=50,
    max_keepalive=10,
)
```
## Logging and Monitoring

### Structured Logging
```python
import json
import logging

logger = logging.getLogger("aegis")

def logged_check(content: str, context: dict) -> str:
    result = aegis.check(
        content,
        destination="AI_TOOL",
        context=context,
    )
    # Emit a structured log entry for each check
    logger.info(json.dumps({
        "event": "aegis_check",
        "decision": result.decision,
        "detected_types": [d.type for d in result.detected],
        "context": context,
    }))
    return result.content
```
### Metrics Collection

```python
# Enable automatic metrics
aegis = Aegis(
    license_key="...",
    enable_metrics=True,
    metrics_interval=60,  # Report every minute
)

# View the collected metrics in the dashboard
```
## See Also
- SDK Reference - Full SDK documentation
- Multi-Tenant Guide - Department policies
- GDPR Compliance - Data protection