# secure_ai_agent.py - Protected OpenAI function calling
import openai
import json
from firewall_client import FirewallClient, PolicyViolationError
class SecureAIAgent:
    """OpenAI function-calling agent whose function calls pass through a firewall.

    The model may ask for one of the functions described in
    ``_FUNCTION_SCHEMAS``. Each request is submitted to the firewall client
    first and the local implementation only runs when the firewall response
    is marked as allowed.
    """

    # Function descriptions sent to the model with every chat request.
    # NOTE(review): ``transfer_funds`` is registered in ``available_functions``
    # but is not advertised here - confirm whether that omission is deliberate.
    _FUNCTION_SCHEMAS = [
        {
            "name": "get_user_data",
            "description": "Retrieve user profile information",
            "parameters": {
                "type": "object",
                "properties": {
                    "user_id": {"type": "string", "description": "User ID to fetch data for"},
                },
                "required": ["user_id"],
            },
        },
        {
            "name": "send_email",
            "description": "Send an email to a user",
            "parameters": {
                "type": "object",
                "properties": {
                    "to": {"type": "string", "description": "Recipient email"},
                    "subject": {"type": "string", "description": "Email subject"},
                    "body": {"type": "string", "description": "Email body"},
                },
                "required": ["to", "subject", "body"],
            },
        },
        {
            "name": "delete_user",
            "description": "Delete a user account (DANGEROUS)",
            "parameters": {
                "type": "object",
                "properties": {
                    "user_id": {"type": "string", "description": "User ID to delete"},
                },
                "required": ["user_id"],
            },
        },
    ]

    def __init__(self, openai_api_key, firewall_api_key):
        """Create the OpenAI and firewall clients and register callable functions.

        Args:
            openai_api_key: API key passed to ``openai.OpenAI``.
            firewall_api_key: API key passed to ``FirewallClient``.
        """
        self.openai_client = openai.OpenAI(api_key=openai_api_key)
        self.firewall = FirewallClient(api_key=firewall_api_key)
        # Allow-list: only names present here can ever be executed.
        self.available_functions = {
            "get_user_data": self.get_user_data,
            "send_email": self.send_email,
            "delete_user": self.delete_user,
            "transfer_funds": self.transfer_funds,
        }

    async def execute_function_call(self, function_name, arguments, user_query):
        """Execute a function call with firewall protection.

        Args:
            function_name: Name of the function to run; must be a key of
                ``self.available_functions``.
            arguments: Mapping of keyword arguments for the function. It is
                also sent to the firewall as the request body.
            user_query: The user message that led to this call; included in
                the intent string sent to the firewall.

        Returns:
            A dict. On success it has ``success``, ``result`` and
            ``request_id``. Otherwise it has an ``error`` key, plus
            ``reason``/``violations`` when the firewall disallowed the call
            or ``message``/``violations`` when a ``PolicyViolationError``
            was caught.
        """
        # The name is chosen by the model, so treat it as untrusted: check the
        # allow-list *before* interpolating it into the firewall target URL.
        handler = self.available_functions.get(function_name)
        if handler is None:
            return {"error": f"Function {function_name} not available"}

        try:
            # Validate the function call through firewall
            response = self.firewall.proxy_request(
                method="POST",
                target_url=f"https://internal-api.com/functions/{function_name}",
                body=arguments,
                intent=f"AI agent executing {function_name} in response to: {user_query}",
            )
            if not response.allowed:
                return {
                    "error": "Function call blocked by security policy",
                    "reason": response.decision_reason,
                    "violations": [v.description for v in response.violations],
                }

            # Execute the actual function
            result = await handler(**arguments)
            return {"success": True, "result": result, "request_id": response.request_id}
        except PolicyViolationError as e:
            return {
                "error": "Security policy violation",
                "message": str(e),
                "violations": [
                    {"policy": v.policy_name, "description": v.description}
                    for v in e.violations
                ],
            }

    async def chat_with_functions(self, user_message):
        """Chat with OpenAI using protected function calling.

        Args:
            user_message: Text sent to the model as the single user message.

        Returns:
            ``{"ai_response": ...}`` when the model answers directly, or a
            dict that additionally carries ``function_call`` with the
            requested ``name``, the ``arguments`` and the ``result`` of
            ``execute_function_call``. When the model's arguments are not a
            valid JSON object, ``arguments`` holds the raw string and
            ``result`` is an ``{"error": ...}`` dict; nothing is executed.
        """
        # NOTE(review): ``openai.OpenAI`` is the synchronous client, so this
        # call is not awaited and blocks while the request is in flight.
        response = self.openai_client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": user_message}],
            functions=self._FUNCTION_SCHEMAS,
            function_call="auto",
        )
        message = response.choices[0].message
        if not message.function_call:
            return {"ai_response": message.content}

        # AI wants to call a function - validate through firewall
        function_name = message.function_call.name
        raw_arguments = message.function_call.arguments

        # The argument payload is model-generated text; it may be malformed
        # JSON or valid JSON that is not an object.
        problem = None
        arguments = None
        try:
            arguments = json.loads(raw_arguments)
        except (json.JSONDecodeError, TypeError) as exc:
            problem = f"Invalid function arguments: {exc}"
        else:
            if not isinstance(arguments, dict):
                problem = "Invalid function arguments: expected a JSON object"

        if problem is not None:
            return {
                "ai_response": message.content,
                "function_call": {
                    "name": function_name,
                    "arguments": raw_arguments,
                    "result": {"error": problem},
                },
            }

        print(f"AI requesting function call: {function_name} with args: {arguments}")
        result = await self.execute_function_call(
            function_name, arguments, user_message
        )
        return {
            "ai_response": message.content,
            "function_call": {
                "name": function_name,
                "arguments": arguments,
                "result": result,
            },
        }

    # Mock function implementations
    async def get_user_data(self, user_id):
        """Return a canned user profile for ``user_id`` (mock)."""
        return {"user_id": user_id, "name": "John Doe", "email": "john@example.com"}

    async def send_email(self, to, subject, body):
        """Report an email as sent without sending anything (mock)."""
        return {"status": "sent", "to": to, "subject": subject}

    async def delete_user(self, user_id):
        """Report ``user_id`` as deleted without deleting anything (mock)."""
        # This should be blocked by firewall for most intents
        return {"status": "deleted", "user_id": user_id}

    async def transfer_funds(self, from_account, to_account, amount):
        """Report a transfer of ``amount`` without moving anything (mock)."""
        # High-risk function that should be heavily restricted
        return {"status": "transferred", "amount": amount}
# Usage example
async def main():
    """Run the agent against one benign and one destructive example prompt.

    Builds a ``SecureAIAgent`` with placeholder API keys, sends each prompt
    through ``chat_with_functions`` in order and prints the labelled result.
    """
    agent = SecureAIAgent(
        openai_api_key="your-openai-key",
        firewall_api_key="your-firewall-key",
    )

    # (label, prompt) pairs: a safe request first, then a potentially
    # dangerous one.
    scenarios = (
        ("Safe request:", "Get user data for user123"),
        ("Dangerous request:", "Delete all user accounts"),
    )
    for label, prompt in scenarios:
        outcome = await agent.chat_with_functions(prompt)
        print(label, outcome)