LLM Integration Guide

Best practices for integrating Aegis with LLM applications.


Overview

Aegis protects LLM applications by:

  1. Checking user input before sending to the model
  2. Checking model output before returning to users
  3. Protecting RAG context from data leakage
  4. Monitoring agent tool calls for sensitive data

Architecture Patterns

Pattern 1: Input Protection Only

The simplest pattern checks user input before it reaches the LLM:

from aegis_sdk import Aegis, Decision

aegis = Aegis(license_key="...")

def chat(user_message: str) -> str:
    # Check user input
    result = aegis.check(user_message, destination="AI_TOOL")

    if result.decision == Decision.BLOCKED:
        return "I can't process that request due to sensitive data."

    # Use masked content if needed
    safe_input = result.content

    # Call LLM
    response = llm.generate(safe_input)
    return response

sequenceDiagram
    User->>App: Message with PII
    App->>Aegis: check(message)
    Aegis-->>App: MASKED
    App->>LLM: Masked message
    LLM-->>App: Response
    App-->>User: Response
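
When the decision is MASKED rather than BLOCKED, you can inspect what was redacted. A minimal sketch, assuming a Decision.MASKED value and the detected field used in the logging example later in this guide:

if result.decision == Decision.MASKED:
    # Record which entity types were redacted (e.g. for audit logs)
    masked_types = {d.type for d in result.detected}
    print(f"Masked before sending to LLM: {masked_types}")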

Pattern 2: Full Pipeline Protection

Check both input and output:

def protected_chat(user_message: str) -> str:
    # Check input
    input_result = aegis.check(user_message, destination="AI_TOOL")

    if input_result.decision == Decision.BLOCKED:
        return handle_blocked_input(input_result)

    # Call LLM with safe input
    llm_response = llm.generate(input_result.content)

    # Check output before returning to user
    output_result = aegis.check(llm_response, destination="CUSTOMER")

    if output_result.decision == Decision.BLOCKED:
        return "The response contained sensitive information."

    return output_result.content
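
The handle_blocked_input helper above is left to the application. A minimal sketch (result.summary is the attribute used in later examples):

import logging

logger = logging.getLogger("app")

def handle_blocked_input(result) -> str:
    # Log the block server-side; return a generic message to the user
    logger.warning(f"Input blocked: {result.summary}")
    return "I can't process that request due to sensitive data."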

Pattern 3: RAG Protection

Protect retrieved documents in RAG pipelines:

def protected_rag(query: str) -> str:
    # 1. Check query
    query_result = aegis.check(query, destination="AI_TOOL")
    if query_result.decision == Decision.BLOCKED:
        return "Query contains sensitive data"

    # 2. Retrieve documents
    docs = vectorstore.similarity_search(query_result.content)

    # 3. Check each document
    safe_context = []
    for doc in docs:
        doc_result = aegis.check(doc.content, destination="AI_TOOL")
        if doc_result.decision != Decision.BLOCKED:
            safe_context.append(doc_result.content)

    # 4. Generate with safe context (join docs rather than
    # interpolating the list, which would embed Python repr)
    context = "\n\n".join(safe_context)
    prompt = f"Context: {context}\n\nQuestion: {query_result.content}"
    return llm.generate(prompt)
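
For larger retrieval sets, the per-document checks can run concurrently. A sketch using the async acheck coroutine shown later in this guide:

import asyncio

async def check_docs(docs) -> list[str]:
    # Check all retrieved documents in parallel
    results = await asyncio.gather(*[
        aegis.acheck(doc.content, destination="AI_TOOL")
        for doc in docs
    ])
    return [r.content for r in results if r.decision != Decision.BLOCKED]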

Framework Integrations

LangChain

Callback Handler:

from langchain_openai import ChatOpenAI
from aegis_sdk.integrations.langchain import AegisCallbackHandler

handler = AegisCallbackHandler(
    license_key="aegis_lic_xxx",
    destination="AI_TOOL",
    on_block="raise"  # or "mask", "skip"
)

llm = ChatOpenAI(callbacks=[handler])

# All messages are automatically checked
response = llm.invoke("User message here")
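
With on_block="raise", a blocked message surfaces as an exception. A common pattern is to catch it and return a fallback; this sketch assumes the handler raises the SDK's AegisError (adjust to the actual exception type):

from aegis_sdk import AegisError

try:
    response = llm.invoke(user_message)
except AegisError:
    response = "Request blocked: it contained sensitive data."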

Chain Integration:

from langchain.chains import ConversationChain
from langchain.memory import ConversationBufferMemory

chain = ConversationChain(
    llm=ChatOpenAI(callbacks=[handler]),
    memory=ConversationBufferMemory(),
)

# Memory contents are also protected
result = chain.invoke({"input": user_message})

Custom Chain with Protection:

from typing import Optional

from langchain.chains.base import Chain
from aegis_sdk import Aegis, Decision

class ProtectedChain(Chain):
    """Chain with built-in Aegis protection."""

    aegis: Optional[Aegis] = None

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.aegis = Aegis(license_key="...")

    def _call(self, inputs: dict) -> dict:
        # Check input
        result = self.aegis.check(
            inputs["query"],
            destination="AI_TOOL"
        )

        if result.decision == Decision.BLOCKED:
            return {"output": "Blocked", "blocked": True}

        # Continue with safe content
        ...

OpenAI

Protected Client:

from aegis_sdk.integrations.openai import SafeOpenAI

client = SafeOpenAI(
    api_key="sk-xxx",
    aegis_license_key="aegis_lic_xxx"
)

# Automatic protection on all calls
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": user_input}]
)

Manual Integration:

import openai
from aegis_sdk import Aegis, Decision

aegis = Aegis(license_key="...")
client = openai.OpenAI()

def safe_completion(messages: list) -> str:
    # Check the last user message
    user_message = next(
        m["content"] for m in reversed(messages)
        if m["role"] == "user"
    )

    result = aegis.check(user_message, destination="AI_TOOL")

    if result.decision == Decision.BLOCKED:
        raise ValueError(f"Blocked: {result.summary}")

    # Replace the last user message with the safe content
    # (copy each dict so the caller's list isn't mutated)
    safe_messages = [dict(m) for m in messages]
    for m in reversed(safe_messages):
        if m["role"] == "user":
            m["content"] = result.content
            break

    response = client.chat.completions.create(
        model="gpt-4",
        messages=safe_messages
    )

    return response.choices[0].message.content
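
Usage (masked content flows through transparently; a blocked message raises ValueError):

reply = safe_completion([
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Draft an email to jane@example.com"},
])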

Claude (Anthropic)

from anthropic import Anthropic
from aegis_sdk import Aegis, Decision

aegis = Aegis(license_key="...")
client = Anthropic()

def safe_claude(user_message: str) -> str:
    result = aegis.check(user_message, destination="AI_TOOL")

    if result.decision == Decision.BLOCKED:
        return f"Cannot process: {result.summary}"

    response = client.messages.create(
        model="claude-3-opus-20240229",
        max_tokens=1024,  # required by the Anthropic API
        messages=[{"role": "user", "content": result.content}]
    )

    return response.content[0].text

Agent Protection

Tool Call Monitoring

Protect agent tool calls:

from langchain.agents import AgentExecutor, create_openai_tools_agent
from aegis_sdk.integrations.langchain import AegisToolWrapper

# Wrap tools with Aegis protection
protected_tools = [
    AegisToolWrapper(tool, aegis_handler)
    for tool in original_tools
]

agent = create_openai_tools_agent(llm, protected_tools, prompt)
executor = AgentExecutor(agent=agent, tools=protected_tools)
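
Conceptually, the wrapper checks each tool's input before it runs. A hand-rolled equivalent for a plain callable (a sketch, not the actual AegisToolWrapper implementation):

def guard_tool(tool_fn):
    def wrapped(tool_input: str) -> str:
        result = aegis.check(tool_input, destination="AI_TOOL")
        if result.decision == Decision.BLOCKED:
            return "Tool call blocked: input contained sensitive data."
        # Pass the (possibly masked) content to the real tool
        return tool_fn(result.content)
    return wrapped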

Function Calling

Check function arguments:

import json
from typing import Any

def protected_function_call(function_name: str, arguments: dict) -> Any:
    """Execute a function with Aegis protection on its arguments."""

    # Serialize arguments for checking
    args_str = json.dumps(arguments)

    result = aegis.check(args_str, destination="AI_TOOL")

    if result.decision == Decision.BLOCKED:
        return {"error": f"Function arguments blocked: {result.summary}"}

    # Parse safe arguments and execute
    safe_args = json.loads(result.content)
    return execute_function(function_name, safe_args)
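
For example, guarding a hypothetical send_email function:

result = protected_function_call(
    "send_email",
    {"to": "ops@example.com", "body": "Quarterly numbers attached"},
)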

Streaming

Handle streaming responses:

async def protected_stream(prompt: str):
    """Stream with Aegis protection."""

    # Check input
    input_result = aegis.check(prompt, destination="AI_TOOL")

    if input_result.decision == Decision.BLOCKED:
        yield "Blocked: " + input_result.summary
        return

    # Stream the response, re-checking the accumulated
    # output roughly every 500 new characters
    buffer = ""
    checked_len = 0
    async for chunk in llm.astream(input_result.content):
        buffer += chunk

        if len(buffer) - checked_len > 500:
            checked_len = len(buffer)
            check = aegis.check(buffer, destination="CUSTOMER")
            if check.decision == Decision.BLOCKED:
                yield "[Content filtered]"
                return

        yield chunk

    # Final check
    final_check = aegis.check(buffer, destination="CUSTOMER")
    if final_check.decision != Decision.ALLOWED:
        # Could log or take action
        pass
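
Consuming the stream:

import asyncio

async def main():
    async for piece in protected_stream("Summarize my account history"):
        print(piece, end="", flush=True)

asyncio.run(main())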

Error Handling

Graceful Degradation

from aegis_sdk import Aegis, AegisError, Decision

try:
    aegis = Aegis(license_key="...")
except AegisError:
    # Fall back to offline mode
    aegis = Aegis(offline_mode=True)

def resilient_check(content: str) -> str:
    try:
        result = aegis.check(content, destination="AI_TOOL")
        return result.content if result.decision != Decision.BLOCKED else ""
    except AegisError as e:
        # Log error but don't block user
        logger.warning(f"Aegis check failed: {e}")
        return content  # Or apply local rules

Timeout Handling

import asyncio
from aegis_sdk import Aegis

aegis = Aegis(license_key="...")

async def check_with_timeout(content: str, timeout: float = 1.0) -> str:
    try:
        result = await asyncio.wait_for(
            aegis.acheck(content, destination="AI_TOOL"),
            timeout=timeout
        )
        return result.content
    except asyncio.TimeoutError:
        logger.warning("Aegis check timed out")
        return content  # Proceed with caution

Performance Optimization

Caching

# Policy is cached by default (5 minutes)
aegis = Aegis(
    license_key="...",
    cache_ttl=600  # 10 minutes
)

# Detection runs locally; no network call
# is made for each individual check

Batch Processing

async def batch_check(contents: list[str]) -> list[str]:
    """Check multiple contents efficiently."""

    tasks = [
        aegis.acheck(content, destination="AI_TOOL")
        for content in contents
    ]

    results = await asyncio.gather(*tasks)

    return [
        r.content if r.decision != Decision.BLOCKED else ""
        for r in results
    ]
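
asyncio.gather preserves input order, so each result lines up with its input. Usage:

safe_outputs = asyncio.run(batch_check(["first message", "second message"]))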

Connection Pooling

# The SDK uses connection pooling by default
# For high-throughput, adjust pool size:
aegis = Aegis(
    license_key="...",
    max_connections=50,
    max_keepalive=10
)

Logging and Monitoring

Structured Logging

import logging
import json

logger = logging.getLogger("aegis")

def logged_check(content: str, context: dict) -> str:
    result = aegis.check(
        content,
        destination="AI_TOOL",
        context=context
    )

    # Structured log
    logger.info(json.dumps({
        "event": "aegis_check",
        "decision": result.decision,
        "detected_types": [d.type for d in result.detected],
        "context": context,
    }))

    return result.content

Metrics Collection

# Enable automatic metrics
aegis = Aegis(
    license_key="...",
    enable_metrics=True,
    metrics_interval=60  # Report every minute
)

# View metrics in dashboard

See Also