Langflow Integration Examples
Real-world implementation of AZTP Client in Langflow for secure AI workflow orchestration.
Overview
Langflow uses AZTP Client to implement enterprise-grade security for AI workflows, providing:
- Component Identity Management - Each component gets a unique, verifiable identity
- Policy-Based Tool Access - OIAP evaluation controls which tools agents can use
- Flow-Level Security - Secure orchestration of multi-component workflows
- SAFE-MCP Protocol - Advanced security for Model Context Protocol implementations
Architecture
graph TB
subgraph Langflow_UI ["Langflow UI"]
A["User"] --> B["Security Detection<br/>SAFE-T1001"]
B --> C["Agent Component"]
C --> D["MCP Tools"]
D --> E["Chat Output"]
end
subgraph AZTP_Security ["AZTP Security Layer"]
F["Identity Service"] --> G["Component Identities"]
H["Policy Manager"] --> I["OIAP Evaluation"]
J["Flow Manager"] --> K["Identity Flows"]
end
subgraph Backend_Services ["Backend Services"]
L["AZTP Client"] --> M["Organization API"]
N["Policy Engine"] --> O["Access Control"]
end
B -.-> F
C -.-> G
D -.-> I
E -.-> K
F --> L
H --> N
J --> L
1. Component Identity Creation
IdentityService Implementation
# From langflow/services/identity/service.py
class IdentityService:
    """Service for managing component identities using AZTP client."""

    async def secure_connect_component(
        self,
        component_id: str,
        component_name: str,
        organization_api_key: Optional[str] = None
    ) -> Optional[Any]:
        """Create a secure connection for a Langflow component.

        Args:
            component_id: Key under which the resulting agent is stored in the
                organization-specific agent registry.
            component_name: Display name of the component; it is sanitized
                before being sent to the AZTP client.
            organization_api_key: API key used to select the AZTP client and
                the agent registry.

        Returns:
            The agent returned by ``client.secure_connect`` with
            ``aztp_identity_id`` and ``aztp_identity_status`` attached, or
            ``None`` when no client is available, the identity is not valid,
            or any step raises.
        """
        async with self._lock:
            # Get organization-specific AZTP client
            client = await self._get_client(organization_api_key=organization_api_key)
            if not client:
                logger.error("❌ Organization API key required for component identity")
                return None

            # Sanitize component name for AZTP requirements
            sanitized_name = self._sanitize_agent_name(component_name)

            try:
                # Create component-specific identity (not global)
                agent = await client.secure_connect(
                    {},
                    sanitized_name,
                    config={"isGlobalIdentity": False}
                )

                # Previously this path fell through and returned None without
                # any trace; make the failure visible to operators.
                if not agent.identity.valid:
                    logger.warning(f"⚠️ Identity not valid for component: {component_id}")
                    return None

                # Get detailed identity information
                identity_data = await client.get_identity(agent)
                parsed_data = json.loads(identity_data)

                # Store AZTP ID and status for policy validation
                agent.aztp_identity_id = parsed_data["data"]["aztpId"]
                agent.aztp_identity_status = parsed_data["data"]["status"]

                # Register in organization-specific storage
                organization_agents = self._get_organization_agents(organization_api_key)
                organization_agents[component_id] = agent

                logger.info(f"✅ Component secured: {component_id} -> {agent.aztp_identity_id}")
                return agent
            except Exception as e:
                logger.error(f"Failed to create secure connection for {component_id}: {e}")
                return None
Component Name Sanitization
def _sanitize_agent_name(self, component_name: str) -> str:
    """Sanitize component name to meet AZTP API requirements.

    The name is cut at the first ``-``, every character outside
    ``[a-zA-Z0-9-/]`` becomes a hyphen, hyphen runs are collapsed, and
    leading/trailing hyphens and slashes are removed. The result is capped at
    64 characters and is never shorter than 2 characters.

    Args:
        component_name: Raw component name.

    Returns:
        The sanitized name; ``"component"`` when nothing usable remains.
    """
    # Take only first part before '-' for consistency
    base_name = component_name.split("-")[0]

    # Replace invalid characters with hyphens
    sanitized = re.sub(r"[^a-zA-Z0-9\-/]", "-", base_name)

    # Remove multiple consecutive hyphens
    sanitized = re.sub(r"-+", "-", sanitized)

    # Remove leading and trailing hyphens/slashes
    sanitized = sanitized.strip("-/")

    # Truncate if too long (max 64 chars). The cut can land right after a
    # separator, so strip the tail again to keep the "no trailing hyphen or
    # slash" guarantee established above.
    if len(sanitized) > 64:
        sanitized = sanitized[:64].rstrip("-/")

    # Ensure minimum length. This runs after truncation because re-stripping
    # the tail can shrink the name below two characters.
    if len(sanitized) < 2:
        sanitized = f"component-{sanitized}" if sanitized else "component"

    return sanitized
2. OIAP Policy Enforcement
Real-Time Policy Evaluation
async def check_oiap_policy(self, tool_name: str, component_name: str) -> bool:
    """Check Organization Identity Access Policy for tool access.

    The check fails closed: missing context, an unavailable client, an
    unreadable flow file or any error raised by the AZTP client all result in
    ``False`` so that a broken policy backend never grants access.

    Args:
        tool_name: Name of the tool being requested.
        component_name: Name of the component requesting the tool.

    Returns:
        ``True`` when the tool is ``"get_current_date"`` or when the OIAP
        evaluation result is ``"ALLOW"``; ``False`` otherwise.
    """
    # Skip policy check for safe tools
    if tool_name == "get_current_date":
        return True

    # Get required context
    user_id = get_current_user_id()
    organization_api_key = get_current_organization_api_key()
    organization_api_access = get_current_organization_api_access()

    if not all([user_id, organization_api_key, organization_api_access]):
        logger.error("❌ Missing required context for OIAP evaluation")
        return False

    try:
        client = await self._get_client(organization_api_key=organization_api_key)
        if not client:
            logger.error("❌ AZTP client not available for OIAP evaluation")
            return False

        # Create agent identity for component requesting tool access
        agent_id = await client.secure_connect(
            {},
            f"{organization_api_access}/{component_name}",
            config={"isGlobalIdentity": False}
        )

        # Load flow context for grouped policy evaluation
        flow_file = f"aztp_flow_id_{organization_api_key[:8]}.json"
        with open(flow_file, "r") as f:
            aztp_flow_id = json.load(f)

        # Add component to flow for context-aware policies
        await client.add_identity_to_flow(aztp_flow_id, agent_id.identity.aztp_id)

        # Evaluate OIAP policy: user + component + tool
        result = await client.oiap_evaluate(
            aztp_id=agent_id.identity.aztp_id,
            requested_resource=tool_name,
            user_id=user_id
        )
        is_allowed = result['result'] == "ALLOW"
    except Exception as e:
        # Deny on any failure (missing flow file, network error, malformed
        # response) rather than letting the exception escape to the caller.
        logger.error(f"❌ OIAP evaluation failed for {component_name} -> {tool_name}: {e}")
        return False

    if is_allowed:
        logger.info(f"✅ OIAP ALLOWED: {component_name} -> {tool_name}")
    else:
        logger.warning(f"❌ OIAP DENIED: {component_name} -> {tool_name}")

    return is_allowed
3. Agent Tool Filtering
Secure Agent Implementation
# From langflow/components/agents/agent.py
class LCToolsAgentComponent(LCAgentComponent):
    """Langflow Agent with AZTP-secured tool access."""

    async def get_tools(self) -> list[StructuredTool]:
        """Get tools with OIAP policy filtering.

        Each configured tool is checked against the identity service's OIAP
        policy. When at least one tool is restricted, a security notice is
        appended to ``self.system_message``.

        Returns:
            The tools the policy allowed; an empty list when no tools are
            configured.
        """
        if not self.tools:
            return []

        # Get identity service for policy checks
        from langflow.services.identity import get_identity_service
        identity_service = get_identity_service()

        # Filter tools based on OIAP policy
        allowed_tools = []
        restricted_tools = []

        for tool in self.tools:
            tool_name = tool.name if hasattr(tool, 'name') else str(tool)

            # Check OIAP policy for this tool
            is_allowed = await identity_service.check_oiap_policy(
                tool_name=tool_name,
                component_name=self.display_name
            )

            if is_allowed:
                allowed_tools.append(tool)
                logger.info(f"✅ Tool authorized: {tool_name}")
            else:
                restricted_tools.append(tool_name)
                logger.warning(f"🚫 Tool restricted: {tool_name}")

        # Update agent instructions with security context
        if restricted_tools:
            security_notice = self._create_security_notice(allowed_tools, restricted_tools)
            # A missing system message must not be rendered as the text "None".
            current_message = self.system_message or ""
            # get_tools may run more than once; do not stack identical notices.
            if security_notice not in current_message:
                self.system_message = (
                    f"{current_message}\n\n{security_notice}"
                    if current_message
                    else security_notice
                )

        return allowed_tools

    def _create_security_notice(self, allowed_tools: list, restricted_tools: list[str]) -> str:
        """Create security notice for agent instructions.

        Args:
            allowed_tools: Tool objects the policy allowed.
            restricted_tools: Names of the tools the policy denied.

        Returns:
            The notice text listing available and restricted tools, or a
            "no tools available" message when ``allowed_tools`` is empty.
        """
        if not allowed_tools:
            return """🚫 **IMPORTANT**: You currently have NO TOOLS available due to OIAP security policy restrictions. You can only provide text-based responses."""

        # get_tools accepts tools without a ``name`` attribute, so fall back
        # the same way here instead of raising AttributeError.
        tool_list = "\n".join([
            f"- {getattr(tool, 'name', str(tool))}: {getattr(tool, 'description', '')}"
            for tool in allowed_tools
        ])

        notice = f"""🛡️ **SECURITY NOTICE**: You have access to ONLY the tools listed below. These tools have been approved by the OIAP security policy for your current context.
**AVAILABLE TOOLS**:
{tool_list}"""

        if restricted_tools:
            restricted_list = "\n".join([f"- {tool}" for tool in restricted_tools])
            notice += f"""
🚫 **RESTRICTED TOOLS**: The following tools are NOT available due to OIAP security policy:
{restricted_list}
**CRITICAL**: Do NOT attempt to use any tools not listed in the "AVAILABLE TOOLS" section above."""

        return notice
4. Flow Identity Management
Secure Workflow Creation
async def create_flow_identity(
    self,
    flow_name: str,
    description: Optional[str] = None,
    flow_id: Optional[str] = None,
    folder_id: Optional[str] = None,
    organization_api_key: Optional[str] = None
) -> dict[str, Any] | None:
    """Create AZTP flow identity for a Langflow workflow.

    Args:
        flow_name: Name of the flow; also used as the ``project`` metadata
            value and as the fallback description/tag.
        description: Optional description; also used as the single tag.
        flow_id: Langflow flow ID recorded in the metadata.
        folder_id: Langflow folder ID recorded in the metadata.
        organization_api_key: API key used to select the AZTP client; its
            first eight characters name the file the flow ID is written to.

    Returns:
        The data returned by ``client.create_flow``, or ``None`` when no
        client is available or any step raises.
    """
    async with self._lock:
        try:
            client = await self._get_client(organization_api_key=organization_api_key)
            if not client:
                logger.error("❌ Organization API key required for flow creation")
                return None

            # Prepare enhanced metadata linking Langflow and AZTP
            metadata = {
                "project": flow_name,
                "environment": "production",
                "astha-flow": {
                    "id": flow_id or "unknown-flow-id",
                    "folderId": folder_id or "unknown-folder-id"
                }
            }

            # Create AZTP flow with enhanced metadata
            aztp_flow_data = await client.create_flow(
                flow_name,
                description=description or f"Langflow workflow: {flow_name}",
                discoverable="public",
                tags=[description or flow_name],
                metadata=metadata
            )

            # Resolve the ID before opening the file: opening with "w"
            # truncates it, so a missing '_id' would otherwise leave an empty
            # file behind for later readers.
            aztp_flow_id = aztp_flow_data['_id']

            # Save flow ID for component registration
            flow_file = f"aztp_flow_id_{organization_api_key[:8]}.json"
            with open(flow_file, "w", encoding="utf-8") as f:
                json.dump(aztp_flow_id, f)

            logger.info("✅ AZTP flow created:")
            logger.info(f" AZTP Flow ID: {aztp_flow_id}")
            logger.info(f" Langflow Flow ID: {flow_id}")
            logger.info(f" Langflow Folder ID: {folder_id}")

            return aztp_flow_data
        except Exception as e:
            logger.error(f"Failed to create AZTP flow for '{flow_name}': {e}")
            return None
5. MCP Security Integration
Secure MCP Component
# From langflow/components/agents/mcp_component.py
class MCPToolsComponent(ComponentWithCache):
    """MCP Tools component with AZTP security integration."""

    display_name = "MCP Tools"
    description = "Connect to an MCP server to use its tools with AZTP security."
    icon = "Mcp"

    async def build_output(self) -> DataFrame:
        """Execute MCP tool with security validation.

        Refreshes the tool list, checks the OIAP policy for the selected
        tool, collects the tool's arguments from this component's attributes
        and runs the tool.

        Returns:
            A DataFrame with one row per content item of the tool output, or
            a single-row DataFrame carrying an ``"error"`` message when no
            tool is selected, the policy denies access, or execution raises.
        """
        try:
            # Update available tools
            self.tools, _ = await self.update_tool_list()

            if not self.tool:
                return DataFrame(data=[{"error": "You must select a tool"}])

            # Get identity service for security validation
            from langflow.services.identity import get_identity_service
            identity_service = get_identity_service()

            # Check OIAP policy before tool execution
            is_allowed = await identity_service.check_oiap_policy(
                tool_name=self.tool,
                component_name=self.display_name
            )

            if not is_allowed:
                logger.warning(f"🚫 Tool access denied by OIAP policy: {self.tool}")
                return DataFrame(data=[{
                    "error": f"Access denied: Tool '{self.tool}' not authorized by security policy"
                }])

            # Set session context for persistent MCP sessions
            session_context = self._get_session_context()
            if session_context:
                self.stdio_client.set_session_context(session_context)
                self.sse_client.set_session_context(session_context)

            # Execute tool with security validation passed
            exec_tool = self._tool_cache[self.tool]
            tool_args = self.get_inputs_for_all_tools(self.tools)[self.tool]

            # Prepare arguments. Only skip arguments that are actually unset:
            # a truthiness test would also drop legitimate values such as 0
            # or False.
            kwargs = {}
            for arg in tool_args:
                value = getattr(self, arg.name, None)
                if value is None:
                    continue
                kwargs[arg.name] = value.text if isinstance(value, Message) else value

            # Unflatten nested arguments
            unflattened_kwargs = maybe_unflatten_dict(kwargs)

            # Execute tool
            output = await exec_tool.coroutine(**unflattened_kwargs)

            # Process response
            tool_content = [item.model_dump() for item in output.content]

            logger.info(f"✅ Tool executed successfully: {self.tool}")
            return DataFrame(data=tool_content)
        except Exception as e:
            logger.error(f"Error in MCP tool execution: {e}")
            return DataFrame(data=[{"error": f"Tool execution failed: {str(e)}"}])
6. Policy Manager Integration
Dynamic Policy Evaluation
# From langflow/services/identity/policy_manager.py
class PolicyManager:
    """Enhanced policy manager for AZTP integration."""

    def __init__(self, identity_service):
        self.identity_service = identity_service

    async def check_dynamic_agent_tool_policy(
        self,
        agent_component_id: str,
        tool_name: str,
        tool_metadata: Optional[dict[str, Any]] = None,
        execution_context: Optional[dict[str, Any]] = None,
        organization_api_key: Optional[str] = None
    ) -> dict[str, Any]:
        """Check dynamic policy for agent tool access.

        Args:
            agent_component_id: Component ID used to look up the agent.
            tool_name: Name of the requested tool (recorded in the result).
            tool_metadata: Optional tool metadata passed to the component
                type classifier; treated as empty when omitted.
            execution_context: Optional context; its ``"component_type"``
                entry selects the required actions (``"Unknown"`` when
                absent).
            organization_api_key: API key used to resolve the agent and the
                AZTP client.

        Returns:
            A dict with ``allowed``, ``reason``, ``policy_type``,
            ``tool_name`` and ``agent_id``; additionally
            ``missing_permissions`` on denial or ``granted_permissions`` on
            success. Errors are reported through ``reason`` and never raised.
        """
        policy_result = {
            "allowed": False,
            "reason": "Policy evaluation failed",
            "policy_type": "dynamic_agent_tool",
            "tool_name": tool_name,
            "agent_id": agent_component_id
        }

        try:
            # Get agent identity
            agent = self.identity_service.get_agent(agent_component_id, organization_api_key)
            if not agent:
                policy_result["reason"] = "Agent identity not found"
                return policy_result

            # Get AZTP client
            client = await self.identity_service._get_client(organization_api_key=organization_api_key)
            if not client:
                policy_result["reason"] = "AZTP client not available"
                return policy_result

            # Determine required actions based on tool type.
            # execution_context defaults to None, so guard before calling .get.
            component_type = (execution_context or {}).get("component_type", "Unknown")
            required_actions = ComponentTypeClassifier.get_component_actions(
                component_type,
                tool_metadata or {}
            )

            # Convert PolicyAction enums to strings
            action_strings = [action.value for action in required_actions]

            # Check identity policy permissions
            permissions = await client.check_identity_policy_permissions(
                agent.aztp_identity_id,
                options={"actions": action_strings}
            )

            # Verify all required permissions are granted
            missing_permissions = [
                action for action in action_strings
                if not permissions.get(action, False)
            ]

            if missing_permissions:
                policy_result["reason"] = f"Missing permissions: {missing_permissions}"
                policy_result["missing_permissions"] = missing_permissions
                return policy_result

            # All permissions granted
            policy_result["allowed"] = True
            policy_result["reason"] = "All policy checks passed"
            policy_result["granted_permissions"] = permissions

            logger.info(f"✅ Dynamic policy check passed: {tool_name} for {agent_component_id}")
        except Exception as e:
            logger.error(f"Error in dynamic policy evaluation: {e}")
            policy_result["reason"] = f"Policy evaluation error: {str(e)}"

        return policy_result
7. Environment Configuration
Production Environment Setup
# Langflow with AZTP Security Configuration
# Save as .env or export in your shell
# AZTP Service Configuration
AZTP_BASE_URL=https://your-aztp-server.com
# Organization API key provided by Astha Console
# AZTP_API_KEY will be set by organization context
# AZTP Policy Configuration
AZTP_POLICY_ENABLED=true
AZTP_POLICY_STRICT_MODE=false
AZTP_POLICY_DEFAULT_ACTION=allow
AZTP_POLICY_LOG_LEVEL=info
# User Context for OIAP
USERNAME=your-username
# Langflow Configuration
LANGFLOW_AUTO_LOGIN=false
LANGFLOW_SUPERUSER=admin
LANGFLOW_SUPERUSER_PASSWORD=your-secure-password
Development Environment
# Development Environment with Relaxed Security
AZTP_POLICY_ENABLED=false
AZTP_POLICY_STRICT_MODE=false
AZTP_POLICY_DEFAULT_ACTION=allow
# Enable debug logging
AZTP_POLICY_LOG_LEVEL=debug
LANGFLOW_LOG_LEVEL=DEBUG
8. Complete Workflow Example
Secure AI Assistant Flow
async def create_secure_ai_assistant():
    """Create a complete secure AI assistant workflow.

    Creates the AZTP flow, secures each component and adds it to the flow,
    then links the components in pipeline order.

    Returns:
        A ``(flow_data, component_agents)`` tuple, where ``component_agents``
        maps component IDs to the agents that were secured successfully.

    Raises:
        RuntimeError: If the AZTP flow could not be created.
    """
    # 1. Initialize services
    from langflow.services.identity import get_identity_service
    identity_service = get_identity_service()
    organization_api_key = get_current_organization_api_key()

    # 2. Create AZTP flow for the workflow
    flow_data = await identity_service.create_flow_identity(
        flow_name="secure-ai-assistant",
        description="Enterprise AI assistant with SAFE-MCP security",
        flow_id="langflow-uuid-123",
        folder_id="folder-uuid-456",
        organization_api_key=organization_api_key
    )
    # create_flow_identity reports failure by returning None. Stop here with a
    # clear message instead of failing later on flow_data["_id"] after
    # component identities may already have been created.
    if not flow_data:
        raise RuntimeError("Failed to create AZTP flow for 'secure-ai-assistant'")

    # 3. Create component identities
    components = [
        {"id": "security-001", "name": "SecurityDetection", "type": "security"},
        {"id": "agent-001", "name": "ChatAgent", "type": "agent"},
        {"id": "mcp-001", "name": "MCPTools", "type": "tools"},
        {"id": "output-001", "name": "ChatOutput", "type": "output"}
    ]

    component_agents = {}
    for component in components:
        # Create secure component identity
        agent = await identity_service.secure_connect_component(
            component_id=component["id"],
            component_name=component["name"],
            organization_api_key=organization_api_key
        )

        if agent:
            component_agents[component["id"]] = agent

            # Add component to flow
            await identity_service.add_component_to_flow(
                flow_data["_id"],
                agent.aztp_identity_id,
                organization_api_key=organization_api_key
            )

            logger.info(f"✅ Component added to flow: {component['name']}")

    # 4. Link components for secure data flow (source, target, link label)
    component_links = [
        ("security-001", "agent-001", "input_validation"),
        ("agent-001", "mcp-001", "tool_access"),
        ("mcp-001", "output-001", "response_output"),
    ]
    for source_id, target_id, link_label in component_links:
        await identity_service.link_component_identities(
            source_id, target_id,
            link_label,
            organization_api_key
        )

    logger.info("🔐 Secure AI assistant workflow created successfully")
    return flow_data, component_agents
# Execute the workflow
async def main():
    """Build the secure assistant workflow and print a short summary."""
    flow_data, secured_agents = await create_secure_ai_assistant()

    aztp_flow_id = flow_data['_id']
    print(f"Secure workflow created with AZTP Flow ID: {aztp_flow_id}")

    secured_count = len(secured_agents)
    print(f"Components secured: {secured_count}")


if __name__ == "__main__":
    asyncio.run(main())
9. Security Monitoring
Real-Time Security Dashboard
class LangflowSecurityMonitor:
    """Monitor security events in Langflow workflows."""

    def __init__(self, organization_api_key: str):
        self.client = Aztp(api_key=organization_api_key)
        # Chronological list of the event dicts recorded by
        # monitor_component_security.
        self.security_events = []

    async def monitor_component_security(self, component_id: str):
        """Monitor security events for a specific component.

        Verifies the component's AZTP identity and records the outcome in
        ``self.security_events``.

        Args:
            component_id: ID of the component whose agent is looked up.

        Returns:
            The recorded event dict, or ``None`` when no agent is registered
            for the component.
        """
        from datetime import timezone

        from langflow.services.identity import get_identity_service
        identity_service = get_identity_service()

        # Get component agent
        agent = identity_service.get_agent(component_id, self.client.api_key)
        if not agent:
            logger.warning(f"No agent found for component: {component_id}")
            return

        # Verify identity status
        is_valid = await self.client.verify_identity_by_aztp_id(agent.aztp_identity_id)

        # Log security event
        security_event = {
            # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated
            # and yields a naive value with no offset.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "component_id": component_id,
            "aztp_id": agent.aztp_identity_id,
            "identity_valid": is_valid,
            "identity_status": agent.aztp_identity_status,
            "event_type": "identity_verification"
        }

        self.security_events.append(security_event)

        if not is_valid:
            logger.error(f"🚨 Security Alert: Invalid identity for component {component_id}")
        else:
            logger.info(f"✅ Security check passed for component {component_id}")

        return security_event

    async def generate_security_report(self) -> dict:
        """Generate comprehensive security report.

        Returns:
            A dict with the generation timestamp, event counts, the
            percentage of events with a valid identity (100 when no events
            were recorded) and the recorded events themselves.
        """
        from datetime import timezone

        total_events = len(self.security_events)
        valid_identities = sum(1 for event in self.security_events if event["identity_valid"])

        return {
            "report_generated": datetime.now(timezone.utc).isoformat(),
            "total_security_events": total_events,
            "valid_identities": valid_identities,
            "invalid_identities": total_events - valid_identities,
            "security_score": (valid_identities / total_events * 100) if total_events > 0 else 100,
            "events": self.security_events
        }
# Usage in Langflow
monitor = LangflowSecurityMonitor("org-key-123")


# Check every component of a flow, one after another, then summarize
async def monitor_flow_security(flow_components: list[str]):
    """Run the monitor over each component ID and log the resulting score."""
    for current_id in flow_components:
        await monitor.monitor_component_security(current_id)

    # Summarize everything the monitor has recorded so far
    summary = await monitor.generate_security_report()
    score = summary['security_score']
    logger.info(f"📊 Security Report: {score:.1f}% secure")
10. Testing and Validation
AZTP Integration Tests
import pytest
from langflow.services.identity import get_identity_service
class TestLangflowAZTPIntegration:
    """Test AZTP integration in Langflow."""

    @pytest.fixture
    def identity_service(self):
        """Get identity service for testing.

        Deliberately a synchronous fixture: an ``async def`` fixture declared
        with plain ``@pytest.fixture`` hands the test an un-awaited coroutine
        instead of the service, and ``get_identity_service()`` needs no
        awaiting.
        """
        return get_identity_service()

    @pytest.mark.asyncio
    async def test_component_identity_creation(self, identity_service):
        """Test component identity creation."""
        # Create component identity
        agent = await identity_service.secure_connect_component(
            component_id="test-component-001",
            component_name="TestChatComponent",
            organization_api_key="test-org-key"
        )

        assert agent is not None
        assert hasattr(agent, "aztp_identity_id")
        assert agent.aztp_identity_id.startswith("aztp://")
        assert agent.aztp_identity_status == "active"

    @pytest.mark.asyncio
    async def test_oiap_policy_evaluation(self, identity_service):
        """Test OIAP policy evaluation for tool access."""
        # Test allowed tool: "get_current_date" bypasses policy evaluation
        is_allowed = await identity_service.check_oiap_policy(
            tool_name="get_current_date",
            component_name="TestAgent"
        )
        assert is_allowed is True

        # Test with policy evaluation
        # This would require proper OIAP setup in test environment
        # Result depends on configured policies

    @pytest.mark.asyncio
    async def test_flow_identity_management(self, identity_service):
        """Test flow identity creation and management."""
        # Create flow identity
        flow_data = await identity_service.create_flow_identity(
            flow_name="test-secure-flow",
            description="Test flow for security validation",
            flow_id="test-flow-uuid",
            folder_id="test-folder-uuid",
            organization_api_key="test-org-key"
        )

        assert flow_data is not None
        assert "_id" in flow_data
        assert flow_data["name"] == "test-secure-flow"
Best Practices
1. Security Configuration
# Resolve the API key from the current organization context; never share one
# key across organizations
organization_api_key = get_current_organization_api_key()
client = Aztp(api_key=organization_api_key)

# Production: enable policy enforcement, plus strict mode for high-security
# environments
os.environ.update({
    "AZTP_POLICY_ENABLED": "true",
    "AZTP_POLICY_STRICT_MODE": "true",
})
2. Error Handling
# Robust error handling for production
try:
agent = await client.secure_connect({}, component_name)
except ValueError as e:
logger.error(f"Validation error: {e}")
# Handle gracefully - maybe use fallback identity
except ConnectionError as e:
logger.error(f"Connection error: {e}")
# Retry with exponential backoff
except Exception as e:
logger.error(f"Unexpected error: {e}")
# Log and alert operations team
3. Performance Optimization
# Cache identities to avoid repeated API calls
# functools.lru_cache cannot be used on a coroutine function: it would cache
# the coroutine object itself, and awaiting that object a second time raises
# RuntimeError. Cache the awaited result instead.
_IDENTITY_CACHE_MAXSIZE = 128
_identity_cache: dict = {}


async def get_cached_identity(component_name: str):
    """Return the identity for ``component_name``, fetching it at most once.

    Results are kept in a least-recently-used cache bounded to
    ``_IDENTITY_CACHE_MAXSIZE`` entries. Failed lookups are not cached, so a
    later call retries.

    Args:
        component_name: Name passed to ``client.get_identity_by_name``.

    Returns:
        Whatever ``client.get_identity_by_name`` returned for this name.
    """
    if component_name in _identity_cache:
        # Re-insert on a hit so dict order tracks recency (oldest first).
        identity = _identity_cache.pop(component_name)
        _identity_cache[component_name] = identity
        return identity

    identity = await client.get_identity_by_name(component_name)
    _identity_cache[component_name] = identity

    # Evict the least recently used entry once the bound is exceeded.
    if len(_identity_cache) > _IDENTITY_CACHE_MAXSIZE:
        del _identity_cache[next(iter(_identity_cache))]

    return identity
# Use async batch operations for multiple components
async def batch_create_identities(component_names: list[str]):
    """Create non-global identities for all names concurrently.

    Returns the gathered results in input order; a failed connection appears
    as the raised exception object in its slot rather than aborting the batch.
    """
    pending_connections = []
    for component_name in component_names:
        pending_connections.append(
            client.secure_connect({}, component_name, config={"isGlobalIdentity": False})
        )
    return await asyncio.gather(*pending_connections, return_exceptions=True)
Next Steps
- Identity Management - Deep dive into identity concepts
- Policy Management - Advanced policy configuration
- API Reference - Complete method documentation
- Enterprise Support - Professional implementation assistance