
AI Agent Protection

Secure your AI agents and autonomous systems by validating their API interactions and MCP calls through Intent Firewall protection.

OpenAI Function Calls
Validate AI function calling before execution
Prevent malicious or unintended function calls from AI models by validating intent and parameters.
MCP Interactions
Secure Model Context Protocol communications
Protect MCP tool usage and data source access with intent-based validation.
Autonomous Agents
Control autonomous system API access
Ensure autonomous agents only make approved API calls aligned with their intended purpose.
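
All three scenarios follow the same validate-then-execute pattern: describe the intent of the call, ask the Intent Firewall whether it is allowed, and only then perform the real operation. A minimal sketch using the firewall_client package from the examples below; the target URL, body, and intent string are illustrative:

# validate_then_execute.py - the core protection pattern (illustrative values)
from firewall_client import FirewallClient

firewall = FirewallClient(api_key="your-firewall-key")

# Ask the firewall to validate the call before performing it
response = firewall.proxy_request(
    method="POST",
    target_url="https://internal-api.com/functions/get_user_data",
    body={"user_id": "user123"},
    intent="AI agent fetching a profile the user explicitly asked about",
)
if response.allowed:
    pass  # perform the real operation here
else:
    print("Blocked:", response.decision_reason)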
OpenAI Function Calling Protection
Secure AI function calls with intent validation
# secure_ai_agent.py - Protected OpenAI function calling
import asyncio
import json

import openai

from firewall_client import FirewallClient, PolicyViolationError

class SecureAIAgent:
    def __init__(self, openai_api_key, firewall_api_key):
        self.openai_client = openai.OpenAI(api_key=openai_api_key)
        self.firewall = FirewallClient(api_key=firewall_api_key)
        self.available_functions = {
            "get_user_data": self.get_user_data,
            "send_email": self.send_email,
            "delete_user": self.delete_user,
            "transfer_funds": self.transfer_funds,
        }

    async def execute_function_call(self, function_name, arguments, user_query):
        """Execute a function call with firewall protection."""
        try:
            # Validate the function call through the firewall before running it
            response = self.firewall.proxy_request(
                method="POST",
                target_url=f"https://internal-api.com/functions/{function_name}",
                body=arguments,
                intent=f"AI agent executing {function_name} in response to: {user_query}",
            )
            if not response.allowed:
                return {
                    "error": "Function call blocked by security policy",
                    "reason": response.decision_reason,
                    "violations": [v.description for v in response.violations],
                }
            # Execute the actual function
            if function_name in self.available_functions:
                result = await self.available_functions[function_name](**arguments)
                return {"success": True, "result": result, "request_id": response.request_id}
            return {"error": f"Function {function_name} not available"}
        except PolicyViolationError as e:
            return {
                "error": "Security policy violation",
                "message": str(e),
                "violations": [
                    {"policy": v.policy_name, "description": v.description}
                    for v in e.violations
                ],
            }

    async def chat_with_functions(self, user_message):
        """Chat with OpenAI using protected function calling."""
        functions = [
            {
                "name": "get_user_data",
                "description": "Retrieve user profile information",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "user_id": {"type": "string", "description": "User ID to fetch data for"}
                    },
                    "required": ["user_id"],
                },
            },
            {
                "name": "send_email",
                "description": "Send an email to a user",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "to": {"type": "string", "description": "Recipient email"},
                        "subject": {"type": "string", "description": "Email subject"},
                        "body": {"type": "string", "description": "Email body"},
                    },
                    "required": ["to", "subject", "body"],
                },
            },
            {
                "name": "delete_user",
                "description": "Delete a user account (DANGEROUS)",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "user_id": {"type": "string", "description": "User ID to delete"}
                    },
                    "required": ["user_id"],
                },
            },
        ]
        # Note: "functions"/"function_call" is OpenAI's legacy function-calling
        # API; newer SDK versions prefer "tools"/"tool_choice".
        response = self.openai_client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": user_message}],
            functions=functions,
            function_call="auto",
        )
        message = response.choices[0].message
        if message.function_call:
            # The model wants to call a function - validate it through the firewall
            function_name = message.function_call.name
            arguments = json.loads(message.function_call.arguments)
            print(f"AI requesting function call: {function_name} with args: {arguments}")
            result = await self.execute_function_call(function_name, arguments, user_message)
            return {
                "ai_response": message.content,
                "function_call": {
                    "name": function_name,
                    "arguments": arguments,
                    "result": result,
                },
            }
        return {"ai_response": message.content}

    # Mock function implementations
    async def get_user_data(self, user_id):
        return {"user_id": user_id, "name": "John Doe", "email": "john@example.com"}

    async def send_email(self, to, subject, body):
        return {"status": "sent", "to": to, "subject": subject}

    async def delete_user(self, user_id):
        # This should be blocked by the firewall for most intents
        return {"status": "deleted", "user_id": user_id}

    async def transfer_funds(self, from_account, to_account, amount):
        # High-risk function that should be heavily restricted
        return {"status": "transferred", "amount": amount}

# Usage example
async def main():
    agent = SecureAIAgent(
        openai_api_key="your-openai-key",
        firewall_api_key="your-firewall-key",
    )
    # Safe request
    result1 = await agent.chat_with_functions("Get user data for user123")
    print("Safe request:", result1)
    # Potentially dangerous request
    result2 = await agent.chat_with_functions("Delete all user accounts")
    print("Dangerous request:", result2)

if __name__ == "__main__":
    asyncio.run(main())
MCP (Model Context Protocol) Protection
Secure MCP tool usage and data source access
# secure_mcp_client.py - Protected MCP interactions
from firewall_client import FirewallClient

class SecureMCPClient:
    def __init__(self, firewall_api_key):
        self.firewall = FirewallClient(api_key=firewall_api_key)

    async def call_mcp_tool(self, tool_name, parameters, context):
        """Call an MCP tool with firewall protection."""
        try:
            # Validate the MCP tool call through the firewall
            response = self.firewall.proxy_request(
                method="POST",
                target_url=f"mcp://tools/{tool_name}",
                body={"parameters": parameters, "context": context},
                intent=f"MCP tool call: {tool_name} with context: {context}",
            )
            if response.allowed:
                # Execute the actual MCP tool call
                result = await self._execute_mcp_tool(tool_name, parameters)
                return {"success": True, "result": result}
            return {
                "error": "MCP tool call blocked",
                "reason": response.decision_reason,
                "violations": [v.description for v in response.violations],
            }
        except Exception as e:
            return {"error": f"MCP tool call failed: {str(e)}"}

    async def access_data_source(self, source_name, query, intent):
        """Access an MCP data source with validation."""
        response = self.firewall.proxy_request(
            method="GET",
            target_url=f"mcp://datasources/{source_name}",
            body={"query": query},
            intent=f"Access {source_name} data source: {intent}",
        )
        if response.allowed:
            data = await self._query_data_source(source_name, query)
            return {"success": True, "data": data}
        return {
            "error": "Data source access blocked",
            "reason": response.decision_reason,
        }

    async def _execute_mcp_tool(self, tool_name, parameters):
        # Placeholder: replace with a real MCP client/session call
        return {"tool": tool_name, "parameters": parameters}

    async def _query_data_source(self, source_name, query):
        # Placeholder: replace with a real MCP data source query
        return {"source": source_name, "query": query}
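
A quick usage sketch for the client above; the tool name, data source name, query, and intent strings are hypothetical placeholders:

# example usage of SecureMCPClient (hypothetical tool and source names)
import asyncio

async def demo():
    client = SecureMCPClient(firewall_api_key="your-firewall-key")

    tool_result = await client.call_mcp_tool(
        tool_name="search_docs",
        parameters={"query": "refund policy"},
        context="User asked about refund terms",
    )
    print("Tool call:", tool_result)

    data_result = await client.access_data_source(
        source_name="crm",
        query="recent customer signups",
        intent="Agent summarizing new customers for a weekly report",
    )
    print("Data access:", data_result)

asyncio.run(demo())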
Best Practices for AI Agent Security
Essential guidelines for securing AI agent interactions

Detailed Intent Context

Always provide detailed intent descriptions that include the user's original query and the AI's reasoning.

Function Call Validation

Validate all AI function calls through the firewall before execution, especially for sensitive operations.

Graceful Degradation

Handle blocked requests gracefully by explaining to users why certain actions were prevented (a short sketch follows these guidelines).

Monitor AI Behavior

Regularly review firewall logs to understand AI agent behavior patterns and adjust policies accordingly.
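
The detailed-intent and graceful-degradation guidelines combine naturally in code. A short sketch building on the SecureAIAgent example above; the helper names and user-facing wording are illustrative, not part of the firewall API:

# agent_helpers.py - illustrative helpers for intent context and graceful degradation
def build_intent(function_name, user_query, reasoning):
    """Compose a detailed intent string that includes the user's original
    query and the model's reasoning."""
    return (
        f"AI agent executing {function_name}. "
        f"Original user query: {user_query!r}. "
        f"Model reasoning: {reasoning}"
    )

def explain_block(result):
    """Turn a blocked-call result (as returned by execute_function_call)
    into a plain-language message for the user."""
    if "error" not in result:
        return None  # the call succeeded; nothing to explain
    reason = result.get("reason") or result.get("message") or "a security policy"
    return (
        "I wasn't able to complete that action. "
        f"It was blocked because: {reason}. "
        "If you believe this is a mistake, please contact an administrator."
    )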

Example Security Policies for AI Agents
Common policy patterns for protecting AI agent interactions

1. Restrict Dangerous Functions

Block AI agents from calling functions that could cause data loss or security breaches.

Policy: Block functions containing "delete", "remove", "drop", "truncate"

2. Validate Intent Alignment

Ensure AI function calls align with the user's original request and intent.

Policy: Require intent description to match function purpose

3. Limit Data Access Scope

Restrict AI agents from accessing sensitive data sources or performing bulk operations.

Policy: Block access to admin, system, or bulk data endpoints
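
The exact policy syntax depends on your Vibe Security configuration. The sketch below expresses these three patterns as Python data to show the shape such policies might take; every field name here is a hypothetical assumption, not the product's real schema:

# agent_policies.py - hypothetical policy definitions (field names are
# assumptions for illustration, not the real policy schema)
AGENT_POLICIES = [
    {
        "name": "restrict-dangerous-functions",
        "action": "block",
        # Block functions whose names suggest destructive operations
        "match": {"function_name_contains": ["delete", "remove", "drop", "truncate"]},
    },
    {
        "name": "validate-intent-alignment",
        "action": "block",
        # Reject calls whose declared intent does not match the function's purpose
        "match": {"intent_mismatch": True},
    },
    {
        "name": "limit-data-access-scope",
        "action": "block",
        # Deny access to admin, system, or bulk data endpoints
        "match": {"path_contains": ["/admin", "/system", "/bulk"]},
    },
]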