Input Handling Patterns
Learn various patterns for handling user input safely and
effectively in Praison Labs applications, from simple CLI inputs
to complex web UIs with validation and approval systems.
Overview
Proper input handling is crucial for building robust AI
applications. This guide covers:
- Basic input collection methods
- Validation and sanitisation
- Safety checks and approval systems
- UI-based input handling
- State-based validation
from praisonaiagents import Agent
agent = Agent(
name="Assistant",
instructions="Help users with their queries"
)
# Basic input
user_input = input("What would you like help with? ")
response = agent.chat(user_input)
print(response)
Environment Variable Fallback
import os
# Try environment variable first, fall back to prompt
api_key = os.getenv("API_KEY") or input("Enter API key: ")
agent = Agent(
name="SecureAgent",
instructions="Process secure requests",
llm_config={"api_key": api_key}
)
class InputCollector:
def __init__(self):
self.inputs = {}
def collect(self, fields):
for field in fields:
prompt = field.get("prompt", f"Enter {field['name']}: ")
validator = field.get("validator", lambda x: True)
while True:
value = input(prompt)
if validator(value):
self.inputs[field["name"]] = value
break
print("Invalid input, please try again.")
return self.inputs
# Usage
collector = InputCollector()
user_data = collector.collect([
{
"name": "email",
"prompt": "Enter email: ",
"validator": lambda x: "@" in x
},
{
"name": "age",
"prompt": "Enter age: ",
"validator": lambda x: x.isdigit() and 0 < int(x) < 150
}
])
Validation Patterns
Structured Validation with Pydantic
from pydantic import BaseModel, validator, Field
from typing import Optional
import re
class UserInput(BaseModel):
email: str = Field(..., description="User email address")
age: int = Field(..., gt=0, lt=150)
query: str = Field(..., min_length=1, max_length=1000)
@validator("email")
def validate_email(cls, v):
if not re.match(r"[^@]+@[^@]+\.[^@]+", v):
raise ValueError("Invalid email format")
return v
@validator("query")
def sanitise_query(cls, v):
# Remove potentially harmful characters
return re.sub(r"[<>\"']", "", v)
# Use with agent
def process_request(raw_input: dict):
try:
# Validate and sanitise
validated = UserInput(**raw_input)
agent = Agent(
name="ValidatedAgent",
instructions="Process validated requests"
)
return agent.chat(validated.query)
except ValueError as e:
return f"Validation error: {e}"
Workflow-based Validation
from praisonaiagents import Agent, Task, Workflow
# Validation agent
validator = Agent(
name="Validator",
instructions="Validate user inputs and check for security issues"
)
# Processing agent
processor = Agent(
name="Processor",
instructions="Process validated requests"
)
# Create validation workflow
validation_task = Task(
description="Validate user input: {input}",
agent=validator,
expected_output="Validation result with any issues found"
)
processing_task = Task(
description="Process validated input",
agent=processor,
condition=lambda results: "valid" in results.get("validation_task", "")
)
workflow = Workflow(tasks=[validation_task, processing_task])
Safety and Approval Systems
Human Approval System
from praisonaiagents import Agent, approval
# Define risk levels
def classify_risk(action):
if any(word in action.lower() for word in ["delete", "remove", "destroy"]):
return "high"
elif any(word in action.lower() for word in ["modify", "update", "change"]):
return "medium"
return "low"
# Custom approval callback
def approval_callback(action, risk_level):
if risk_level == "high":
print(f"\n⚠️ HIGH RISK ACTION: {action}")
response = input("Type 'CONFIRM' to proceed: ")
return response == "CONFIRM"
elif risk_level == "medium":
response = input(f"\nConfirm action: {action} (y/n): ")
return response.lower() == "y"
return True # Auto-approve low risk
# Agent with approval system
agent = Agent(
name="SafeAgent",
instructions="Execute user commands safely",
tools=[file_operations, database_operations],
approval_callback=approval_callback,
risk_classifier=classify_risk
)
from praisonaiagents import Tool
# High-risk tool with mandatory approval
delete_tool = Tool(
name="delete_file",
description="Delete a file from the system",
function=delete_file_function,
requires_approval=True,
approval_prompt="This will permanently delete {filename}. Proceed?"
)
# Conditional approval based on parameters
def conditional_approval(params):
if params.get("force", False):
return input("Force delete requested. Are you sure? (yes/no): ") == "yes"
return True
modify_tool = Tool(
name="modify_config",
description="Modify system configuration",
function=modify_config_function,
approval_condition=conditional_approval
)
import streamlit as st
from praisonaiagents import Agent
# Input validation in Streamlit
def validate_streamlit_inputs():
with st.form("user_input"):
# Text input with validation
query = st.text_input(
"Enter your query",
max_chars=500,
help="Maximum 500 characters"
)
# Numeric input with constraints
temperature = st.slider(
"Response creativity",
min_value=0.0,
max_value=1.0,
value=0.7,
step=0.1
)
# Option selection
mode = st.selectbox(
"Processing mode",
["Safe", "Balanced", "Creative"],
index=1
)
submitted = st.form_submit_button("Submit")
if submitted:
if len(query.strip()) == 0:
st.error("Query cannot be empty")
return None
return {
"query": query.strip(),
"temperature": temperature,
"mode": mode
}
# Use validated input
if user_input := validate_streamlit_inputs():
agent = Agent(
name="UIAgent",
llm_config={"temperature": user_input["temperature"]}
)
with st.spinner("Processing..."):
response = agent.chat(user_input["query"])
st.success(response)
import gradio as gr
from praisonaiagents import Agent
def create_gradio_interface():
agent = Agent(name="GradioAgent")
def process_with_validation(text, temperature, max_length):
# Validate inputs
if not text or len(text.strip()) == 0:
return "Error: Input cannot be empty"
if len(text) > max_length:
return f"Error: Input exceeds {max_length} characters"
# Process with agent
try:
response = agent.chat(
text,
llm_config={"temperature": temperature}
)
return response
except Exception as e:
return f"Error processing request: {str(e)}"
interface = gr.Interface(
fn=process_with_validation,
inputs=[
gr.Textbox(
label="Your Query",
placeholder="Enter your question here",
lines=3
),
gr.Slider(
minimum=0,
maximum=1,
value=0.7,
label="Temperature"
),
gr.Number(
value=1000,
label="Max Input Length",
precision=0
)
],
outputs=gr.Textbox(label="Response"),
title="PraisonLabs Assistant",
description="Ask questions with input validation"
)
return interface
State-based Validation
from praisonaiagents import Agent, Session
import json
class TransactionalInput:
def __init__(self):
self.checkpoint = None
self.validated_inputs = []
def begin_transaction(self):
self.checkpoint = self.validated_inputs.copy()
def add_input(self, input_data, validator):
if validator(input_data):
self.validated_inputs.append(input_data)
return True
return False
def commit(self):
self.checkpoint = None
return self.validated_inputs
def rollback(self):
if self.checkpoint is not None:
self.validated_inputs = self.checkpoint
self.checkpoint = None
# Usage with agent
def process_multi_step_input(agent: Agent):
tx = TransactionalInput()
tx.begin_transaction()
try:
# Step 1: Collect user info
user_info = input("Enter user info (JSON): ")
if not tx.add_input(
json.loads(user_info),
lambda x: "user_id" in x and "email" in x
):
raise ValueError("Invalid user info")
# Step 2: Collect request
request = input("Enter request: ")
if not tx.add_input(
{"request": request},
lambda x: len(x.get("request", "")) > 0
):
raise ValueError("Empty request")
# Process all inputs
all_inputs = tx.commit()
return agent.chat(str(all_inputs))
except Exception as e:
tx.rollback()
return f"Transaction failed: {e}"
Security Patterns
import re
import html
class InputSanitiser:
@staticmethod
def sanitise_text(text: str) -> str:
# Remove HTML tags
text = re.sub(r'<[^>]+>', '', text)
# Escape special characters
text = html.escape(text)
# Remove potential SQL injection patterns
text = re.sub(r'(;|--|\/\*|\*\/|xp_|sp_)', '', text)
# Remove potential command injection
text = re.sub(r'(\||&|;|`|\$\(|\))', '', text)
return text.strip()
@staticmethod
def sanitise_filename(filename: str) -> str:
# Remove path traversal attempts
filename = re.sub(r'\.\.[\\/]', '', filename)
# Keep only safe characters
filename = re.sub(r'[^a-zA-Z0-9._-]', '_', filename)
return filename
# Use with agent
def safe_file_operation(agent: Agent, user_input: str):
sanitised = InputSanitiser.sanitise_filename(user_input)
if sanitised != user_input:
print(f"Input sanitised: {user_input} -> {sanitised}")
return agent.chat(f"Process file: {sanitised}")
Rate Limiting
from datetime import datetime, timedelta
from collections import defaultdict
class RateLimiter:
def __init__(self, max_requests=10, window_seconds=60):
self.max_requests = max_requests
self.window = timedelta(seconds=window_seconds)
self.requests = defaultdict(list)
def is_allowed(self, user_id: str) -> bool:
now = datetime.now()
user_requests = self.requests[user_id]
# Remove old requests
user_requests[:] = [
req_time for req_time in user_requests
if now - req_time < self.window
]
# Check limit
if len(user_requests) >= self.max_requests:
return False
user_requests.append(now)
return True
# Use with agent
rate_limiter = RateLimiter(max_requests=5, window_seconds=60)
def handle_user_request(user_id: str, message: str):
if not rate_limiter.is_allowed(user_id):
return "Rate limit exceeded. Please try again later."
agent = Agent(name="RateLimitedAgent")
return agent.chat(message)
Best Practices
-
Always Validate Input: Never trust user
input; validate type, format, and content
-
Sanitise for Context: Different contexts
require different sanitisation (SQL, HTML, shell)
-
Fail Safely: Provide clear error messages
without exposing system details
-
Log Suspicious Input: Track validation
failures for security monitoring
-
Use Approval Systems: Require confirmation
for high-risk operations
-
Rate Limit: Prevent abuse with appropriate
rate limiting
-
Test Edge Cases: Include empty strings,
special characters, and boundary values
-
Provide Clear Feedback: Help users
understand validation requirements
Common Patterns Summary
# 1. Environment variables (most secure)
api_key = os.getenv("API_KEY")
# 2. Configuration files
config = load_config("settings.json")
# 3. Command line arguments
parser.add_argument("--input", required=True)
# 4. Interactive prompts (least secure)
user_input = input("Enter value: ")
Validation Pipeline
def input_pipeline(raw_input):
# 1. Type validation
validated = validate_types(raw_input)
# 2. Format validation
formatted = validate_format(validated)
# 3. Business logic validation
checked = validate_business_rules(formatted)
# 4. Sanitisation
sanitised = sanitise_input(checked)
# 5. Final approval
if requires_approval(sanitised):
approved = get_user_approval(sanitised)
return approved
return sanitised
Responses are generated using AI and may contain
mistakes.