Examples
Langflow Integration

Langflow Integration Examples

Real-world implementation of AZTP Client in Langflow for secure AI workflow orchestration.

Overview

Langflow uses AZTP Client to implement enterprise-grade security for AI workflows, providing:

  • Component Identity Management - Each component gets a unique, verifiable identity
  • Policy-Based Tool Access - OIAP (Organization Identity Access Policy) evaluation controls which tools agents can use
  • Flow-Level Security - Secure orchestration of multi-component workflows
  • SAFE-MCP Protocol - Advanced security for Model Context Protocol implementations

Architecture

graph TB
    subgraph Langflow_UI ["Langflow UI"]
        A["User"] --> B["Security Detection<br/>SAFE-T1001"]
        B --> C["Agent Component"]
        C --> D["MCP Tools"]
        D --> E["Chat Output"]
    end
    
    subgraph AZTP_Security ["AZTP Security Layer"]
        F["Identity Service"] --> G["Component Identities"]
        H["Policy Manager"] --> I["OIAP Evaluation"]
        J["Flow Manager"] --> K["Identity Flows"]
    end
    
    subgraph Backend_Services ["Backend Services"]
        L["AZTP Client"] --> M["Organization API"]
        N["Policy Engine"] --> O["Access Control"]
    end
    
    B -.-> F
    C -.-> G
    D -.-> I
    E -.-> K
    
    F --> L
    H --> N
    J --> L

1. Component Identity Creation

IdentityService Implementation

# From langflow/services/identity/service.py
class IdentityService:
    """Service for managing component identities using AZTP client.

    The method below relies on ``self._lock``, ``self._get_client``,
    ``self._sanitize_agent_name`` and ``self._get_organization_agents``,
    which are defined outside this excerpt.
    """

    async def secure_connect_component(
        self,
        component_id: str,
        component_name: str,
        organization_api_key: Optional[str] = None
    ) -> Optional[Any]:
        """Create a secure connection for a Langflow component.

        Args:
            component_id: Key under which the agent is registered in the
                organization-specific agent storage.
            component_name: Display name; sanitized before it is sent to AZTP.
            organization_api_key: Key used to select the AZTP client and the
                agent storage.

        Returns:
            The connected agent with ``aztp_identity_id`` and
            ``aztp_identity_status`` set, or ``None`` when no client is
            available, the identity is not valid, or any step fails.
        """
        async with self._lock:
            # Get organization-specific AZTP client
            client = await self._get_client(organization_api_key=organization_api_key)
            if not client:
                logger.error("❌ Organization API key required for component identity")
                return None

            # Sanitize component name for AZTP requirements
            sanitized_name = self._sanitize_agent_name(component_name)

            try:
                # Create component-specific identity (not global)
                agent = await client.secure_connect(
                    {},
                    sanitized_name,
                    config={"isGlobalIdentity": False}
                )

                if not agent.identity.valid:
                    # An unverified agent carries no aztp_identity_id and is not
                    # registered, so handing it back would only defer the failure
                    # to the first caller that reads those attributes.
                    logger.error(f"❌ Identity validation failed for component {component_id}")
                    return None

                # Get detailed identity information (a JSON document)
                identity_data = await client.get_identity(agent)
                parsed_data = json.loads(identity_data)

                # Store AZTP ID and status for policy validation
                agent.aztp_identity_id = parsed_data["data"]["aztpId"]
                agent.aztp_identity_status = parsed_data["data"]["status"]

                # Register in organization-specific storage
                organization_agents = self._get_organization_agents(organization_api_key)
                organization_agents[component_id] = agent

                logger.info(f"✅ Component secured: {component_id} -> {agent.aztp_identity_id}")
                return agent

            except Exception as e:
                # Connection, JSON and schema errors all end up here; the caller
                # only ever sees None for a component that could not be secured.
                logger.error(f"Failed to create secure connection for {component_id}: {e}")
                return None

Component Name Sanitization

def _sanitize_agent_name(self, component_name: str) -> str:
    """Sanitize component name to meet AZTP API requirements.

    Args:
        component_name: Raw component name; only the part before the first
            ``-`` is used.

    Returns:
        A name made of ASCII letters, digits, ``-`` and ``/``, at most 64
        characters long, at least 2 characters long, and without leading or
        trailing ``-`` or ``/``.
    """

    # Take only first part before '-' for consistency
    component_name = component_name.split("-")[0]

    # Replace invalid characters with hyphens
    sanitized = re.sub(r"[^a-zA-Z0-9\-/]", "-", component_name)

    # Remove multiple consecutive hyphens
    sanitized = re.sub(r"-+", "-", sanitized)

    # Remove leading and trailing hyphens/slashes
    sanitized = sanitized.strip("-/")

    # Truncate if too long (max 64 chars). The cut can land right after a
    # separator, so strip the tail again before checking the minimum length.
    sanitized = sanitized[:64].rstrip("-/")

    # Ensure minimum length (done last so truncation cannot undo it)
    if len(sanitized) < 2:
        sanitized = f"component-{sanitized}" if sanitized else "component"

    return sanitized

2. OIAP Policy Enforcement

Real-Time Policy Evaluation

async def check_oiap_policy(self, tool_name: str, component_name: str) -> bool:
    """Check Organization Identity Access Policy for tool access.

    The check fails closed: missing context, a missing AZTP client, a missing
    flow file, or any error raised during evaluation denies access.

    Args:
        tool_name: Name of the tool the component wants to use.
        component_name: Name of the requesting component; combined with the
            organization access value to form the agent identity name.

    Returns:
        True when the tool is ``get_current_date`` or the evaluation result
        is ``"ALLOW"``; False in every other case.
    """

    # Skip policy check for safe tools
    if tool_name == "get_current_date":
        return True

    # Get required context
    user_id = get_current_user_id()
    organization_api_key = get_current_organization_api_key()
    organization_api_access = get_current_organization_api_access()

    if not all([user_id, organization_api_key, organization_api_access]):
        logger.error("❌ Missing required context for OIAP evaluation")
        return False

    client = await self._get_client(organization_api_key=organization_api_key)
    if not client:
        logger.error("❌ AZTP client not available for OIAP evaluation")
        return False

    try:
        # Create agent identity for component requesting tool access
        agent_id = await client.secure_connect(
            {},
            f"{organization_api_access}/{component_name}",
            config={"isGlobalIdentity": False}
        )

        # Load flow context for grouped policy evaluation; the file name is
        # derived from the first 8 characters of the organization API key.
        flow_file = f"aztp_flow_id_{organization_api_key[:8]}.json"
        with open(flow_file, "r", encoding="utf-8") as f:
            aztp_flow_id = json.load(f)

        # Add component to flow for context-aware policies
        await client.add_identity_to_flow(aztp_flow_id, agent_id.identity.aztp_id)

        # Evaluate OIAP policy: user + component + tool
        result = await client.oiap_evaluate(
            aztp_id=agent_id.identity.aztp_id,
            requested_resource=tool_name,
            user_id=user_id
        )

        is_allowed = result['result'] == "ALLOW"
    except Exception as e:
        # An access-control decision must never surface as a crash the
        # caller could mishandle: any failure is treated as a denial.
        logger.error(f"❌ OIAP evaluation failed for {component_name} -> {tool_name}: {e}")
        return False

    if is_allowed:
        logger.info(f"✅ OIAP ALLOWED: {component_name} -> {tool_name}")
    else:
        logger.warning(f"❌ OIAP DENIED: {component_name} -> {tool_name}")

    return is_allowed

3. Agent Tool Filtering

Secure Agent Implementation

# From langflow/components/agents/agent.py
class LCToolsAgentComponent(LCAgentComponent):
    """Langflow Agent with AZTP-secured tool access."""

    async def get_tools(self) -> list[StructuredTool]:
        """Get tools with OIAP policy filtering.

        Every tool in ``self.tools`` is checked against the OIAP policy for
        this component's display name. When at least one tool is rejected, a
        security notice is added to ``self.system_message``.

        Returns:
            The tools the policy allowed, in their original order; an empty
            list when the component has no tools.
        """

        if not self.tools:
            return []

        # Get identity service for policy checks
        from langflow.services.identity import get_identity_service
        identity_service = get_identity_service()

        # Filter tools based on OIAP policy
        allowed_tools = []
        restricted_tools = []

        for tool in self.tools:
            tool_name = getattr(tool, "name", str(tool))

            # Check OIAP policy for this tool
            is_allowed = await identity_service.check_oiap_policy(
                tool_name=tool_name,
                component_name=self.display_name
            )

            if is_allowed:
                allowed_tools.append(tool)
                logger.info(f"✅ Tool authorized: {tool_name}")
            else:
                restricted_tools.append(tool_name)
                logger.warning(f"🚫 Tool restricted: {tool_name}")

        # Update agent instructions with security context
        if restricted_tools:
            security_notice = self._create_security_notice(allowed_tools, restricted_tools)
            current_message = self.system_message or ""
            # If get_tools runs again with the same outcome, the notice is
            # already part of the prompt; appending it again would only grow
            # the system message on every call.
            if security_notice not in current_message:
                self.system_message = (
                    f"{current_message}\n\n{security_notice}" if current_message else security_notice
                )

        return allowed_tools

    def _create_security_notice(self, allowed_tools: list, restricted_tools: list[str]) -> str:
        """Create security notice for agent instructions.

        Args:
            allowed_tools: Tool objects the policy approved.
            restricted_tools: Names of the tools the policy rejected.

        Returns:
            Markdown-formatted text listing the available tools and, when
            any exist, the restricted ones.
        """

        if not allowed_tools:
            return """🚫 **IMPORTANT**: You currently have NO TOOLS available due to OIAP security policy restrictions. You can only provide text-based responses."""

        # Same fallback as get_tools: a tool without a ``name`` attribute is
        # listed by its string form instead of raising AttributeError.
        tool_list = "\n".join(
            f"- {getattr(tool, 'name', str(tool))}: {getattr(tool, 'description', '')}"
            for tool in allowed_tools
        )

        notice = (
            "🛡️ **SECURITY NOTICE**: You have access to ONLY the tools listed below. "
            "These tools have been approved by the OIAP security policy for your current context.\n"
            " \n"
            "**AVAILABLE TOOLS**:\n"
            f"{tool_list}"
        )

        if restricted_tools:
            restricted_list = "\n".join(f"- {tool}" for tool in restricted_tools)
            notice += (
                "\n"
                " \n"
                "🚫 **RESTRICTED TOOLS**: The following tools are NOT available due to OIAP security policy:\n"
                f"{restricted_list}\n"
                " \n"
                '**CRITICAL**: Do NOT attempt to use any tools not listed in the "AVAILABLE TOOLS" section above.'
            )

        return notice

4. Flow Identity Management

Secure Workflow Creation

async def create_flow_identity(
    self, 
    flow_name: str, 
    description: str = None,
    flow_id: str = None, 
    folder_id: str = None,
    organization_api_key: Optional[str] = None
) -> dict[str, Any] | None:
    """Create AZTP flow identity for a Langflow workflow."""
    
    async with self._lock:
        try:
            client = await self._get_client(organization_api_key=organization_api_key)
            if not client:
                logger.error("❌ Organization API key required for flow creation")
                return None
            
            # Prepare enhanced metadata linking Langflow and AZTP
            metadata = {
                "project": flow_name,
                "environment": "production",
                "astha-flow": {
                    "id": flow_id or "unknown-flow-id",
                    "folderId": folder_id or "unknown-folder-id"
                }
            }
            
            # Create AZTP flow with enhanced metadata
            aztp_flow_data = await client.create_flow(
                flow_name, 
                description=description or f"Langflow workflow: {flow_name}",
                discoverable="public",
                tags=[description or flow_name],
                metadata=metadata
            )
            
            # Save flow ID for component registration
            flow_file = f"aztp_flow_id_{organization_api_key[:8]}.json"
            with open(flow_file, "w") as f:
                json.dump(aztp_flow_data['_id'], f)
            
            logger.info(f"✅ AZTP flow created:")
            logger.info(f"  AZTP Flow ID: {aztp_flow_data.get('_id')}")
            logger.info(f"  Langflow Flow ID: {flow_id}")
            logger.info(f"  Langflow Folder ID: {folder_id}")
            
            return aztp_flow_data
            
        except Exception as e:
            logger.error(f"Failed to create AZTP flow for '{flow_name}': {e}")
            return None

5. MCP Security Integration

Secure MCP Component

# From langflow/components/agents/mcp_component.py
class MCPToolsComponent(ComponentWithCache):
    """MCP Tools component with AZTP security integration."""

    display_name = "MCP Tools"
    description = "Connect to an MCP server to use its tools with AZTP security."
    icon = "Mcp"

    async def build_output(self) -> DataFrame:
        """Run the selected MCP tool once the OIAP policy has approved it.

        Returns:
            A DataFrame holding the dumped content items of the tool output,
            or a single-row DataFrame with an ``error`` entry when no tool is
            selected, the policy denies access, or anything raises.
        """
        try:
            # Refresh the tool list before anything else
            self.tools, _ignored = await self.update_tool_list()

            if not self.tool:
                return DataFrame(data=[{"error": "You must select a tool"}])

            # Policy gate: nothing below runs unless OIAP allows this tool
            from langflow.services.identity import get_identity_service
            policy_service = get_identity_service()
            permitted = await policy_service.check_oiap_policy(
                tool_name=self.tool,
                component_name=self.display_name
            )
            if not permitted:
                logger.warning(f"🚫 Tool access denied by OIAP policy: {self.tool}")
                denial = {"error": f"Access denied: Tool '{self.tool}' not authorized by security policy"}
                return DataFrame(data=[denial])

            # Share the session context with both transports when there is one
            context = self._get_session_context()
            if context:
                self.stdio_client.set_session_context(context)
                self.sse_client.set_session_context(context)

            runner = self._tool_cache[self.tool]
            declared_args = self.get_inputs_for_all_tools(self.tools)[self.tool]

            # Collect truthy input values; Message inputs contribute their text
            call_kwargs = {
                arg.name: (value.text if isinstance(value, Message) else value)
                for arg in declared_args
                if (value := getattr(self, arg.name, None))
            }

            # Rebuild nested arguments, then invoke the tool
            result = await runner.coroutine(**maybe_unflatten_dict(call_kwargs))

            rows = [item.model_dump() for item in result.content]

            logger.info(f"✅ Tool executed successfully: {self.tool}")
            return DataFrame(data=rows)

        except Exception as error:
            logger.error(f"Error in MCP tool execution: {error}")
            return DataFrame(data=[{"error": f"Tool execution failed: {error!s}"}])

6. Policy Manager Integration

Dynamic Policy Evaluation

# From langflow/services/identity/policy_manager.py
class PolicyManager:
    """Enhanced policy manager for AZTP integration."""

    def __init__(self, identity_service):
        """Keep the identity service used for agent and client lookups."""
        self.identity_service = identity_service

    async def check_dynamic_agent_tool_policy(
        self,
        agent_component_id: str,
        tool_name: str,
        tool_metadata: Optional[dict[str, Any]] = None,
        execution_context: Optional[dict[str, Any]] = None,
        organization_api_key: Optional[str] = None
    ) -> dict[str, Any]:
        """Check dynamic policy for agent tool access.

        Args:
            agent_component_id: Component ID used to look up the agent.
            tool_name: Tool being requested; echoed in the result.
            tool_metadata: Optional tool metadata passed to the component
                type classifier; treated as empty when omitted.
            execution_context: Optional context; its ``component_type`` entry
                selects the required actions and defaults to ``"Unknown"``.
            organization_api_key: Key used for the agent and client lookups.

        Returns:
            A dict with ``allowed``, ``reason``, ``policy_type``,
            ``tool_name`` and ``agent_id``; plus ``missing_permissions`` on a
            denial caused by missing permissions, or ``granted_permissions``
            on success. Errors are reported through ``reason``, not raised.
        """

        # Deny by default; only the success path below flips "allowed"
        policy_result = {
            "allowed": False,
            "reason": "Policy evaluation failed",
            "policy_type": "dynamic_agent_tool",
            "tool_name": tool_name,
            "agent_id": agent_component_id
        }

        try:
            # Get agent identity
            agent = self.identity_service.get_agent(agent_component_id, organization_api_key)
            if not agent:
                policy_result["reason"] = "Agent identity not found"
                return policy_result

            # Get AZTP client
            client = await self.identity_service._get_client(organization_api_key=organization_api_key)
            if not client:
                policy_result["reason"] = "AZTP client not available"
                return policy_result

            # Determine required actions based on tool type. The context is
            # optional, so fall back to an empty dict just like tool_metadata;
            # calling .get on None used to turn every context-less call into
            # a denial with an AttributeError message.
            context = execution_context or {}
            component_type = context.get("component_type", "Unknown")
            required_actions = ComponentTypeClassifier.get_component_actions(
                component_type,
                tool_metadata or {}
            )

            # Convert PolicyAction enums to strings
            action_strings = [action.value for action in required_actions]

            # Check identity policy permissions
            permissions = await client.check_identity_policy_permissions(
                agent.aztp_identity_id,
                options={"actions": action_strings}
            )

            # Verify all required permissions are granted; an action absent
            # from the response counts as not granted
            missing_permissions = [
                action for action in action_strings
                if not permissions.get(action, False)
            ]

            if missing_permissions:
                policy_result["reason"] = f"Missing permissions: {missing_permissions}"
                policy_result["missing_permissions"] = missing_permissions
                return policy_result

            # All permissions granted
            policy_result["allowed"] = True
            policy_result["reason"] = "All policy checks passed"
            policy_result["granted_permissions"] = permissions

            logger.info(f"✅ Dynamic policy check passed: {tool_name} for {agent_component_id}")

        except Exception as e:
            logger.error(f"Error in dynamic policy evaluation: {e}")
            policy_result["reason"] = f"Policy evaluation error: {str(e)}"

        return policy_result

7. Environment Configuration

Production Environment Setup

# Langflow with AZTP Security Configuration
# Save as .env or export in your shell
 
# AZTP Service Configuration
AZTP_BASE_URL=https://your-aztp-server.com
# Organization API key provided by Astha Console
# AZTP_API_KEY will be set by organization context
 
# AZTP Policy Configuration
# NOTE(review): strict mode is off and the default action is "allow" here,
# while the Best Practices section enables strict mode for high-security
# environments — confirm these values are intended for production.
AZTP_POLICY_ENABLED=true
AZTP_POLICY_STRICT_MODE=false
AZTP_POLICY_DEFAULT_ACTION=allow
AZTP_POLICY_LOG_LEVEL=info
 
# User Context for OIAP
USERNAME=your-username
 
# Langflow Configuration
# Replace the placeholder password below before deploying.
LANGFLOW_AUTO_LOGIN=false
LANGFLOW_SUPERUSER=admin
LANGFLOW_SUPERUSER_PASSWORD=your-secure-password

Development Environment

# Development Environment with Relaxed Security
# Policy enforcement is disabled below — do not reuse these values in production.
AZTP_POLICY_ENABLED=false
AZTP_POLICY_STRICT_MODE=false
AZTP_POLICY_DEFAULT_ACTION=allow
 
# Enable debug logging
AZTP_POLICY_LOG_LEVEL=debug
LANGFLOW_LOG_LEVEL=DEBUG

8. Complete Workflow Example

Secure AI Assistant Flow

async def create_secure_ai_assistant():
    """Create a complete secure AI assistant workflow.

    Creates the AZTP flow, secures four example components, adds each secured
    component to the flow, and links the components in pipeline order.

    Returns:
        A ``(flow_data, component_agents)`` tuple, where ``component_agents``
        maps component IDs to the agents that were secured successfully.

    Raises:
        RuntimeError: If the AZTP flow could not be created.
    """

    # 1. Initialize services
    from langflow.services.identity import get_identity_service
    identity_service = get_identity_service()

    organization_api_key = get_current_organization_api_key()

    # 2. Create AZTP flow for the workflow
    flow_data = await identity_service.create_flow_identity(
        flow_name="secure-ai-assistant",
        description="Enterprise AI assistant with SAFE-MCP security",
        flow_id="langflow-uuid-123",
        folder_id="folder-uuid-456",
        organization_api_key=organization_api_key
    )
    if not flow_data:
        # create_flow_identity reports failure by returning None; without a
        # flow there is nothing to attach components to, so stop here with a
        # clear error instead of failing later on flow_data["_id"].
        raise RuntimeError("AZTP flow creation failed; cannot set up the secure AI assistant")

    # 3. Create component identities
    components = [
        {"id": "security-001", "name": "SecurityDetection", "type": "security"},
        {"id": "agent-001", "name": "ChatAgent", "type": "agent"},
        {"id": "mcp-001", "name": "MCPTools", "type": "tools"},
        {"id": "output-001", "name": "ChatOutput", "type": "output"}
    ]

    component_agents = {}
    for component in components:
        # Create secure component identity
        agent = await identity_service.secure_connect_component(
            component_id=component["id"],
            component_name=component["name"],
            organization_api_key=organization_api_key
        )

        if agent:
            component_agents[component["id"]] = agent

            # Add component to flow
            await identity_service.add_component_to_flow(
                flow_data["_id"],
                agent.aztp_identity_id,
                organization_api_key=organization_api_key
            )

            logger.info(f"✅ Component added to flow: {component['name']}")

    # 4. Link components for secure data flow, in pipeline order
    links = (
        ("security-001", "agent-001", "input_validation"),
        ("agent-001", "mcp-001", "tool_access"),
        ("mcp-001", "output-001", "response_output"),
    )
    for source_id, target_id, link_type in links:
        await identity_service.link_component_identities(
            source_id, target_id,
            link_type,
            organization_api_key
        )

    logger.info("🔐 Secure AI assistant workflow created successfully")
    return flow_data, component_agents
 
# Execute the workflow
async def main():
    """Build the secure assistant workflow and print a two-line summary."""
    flow_data, secured_components = await create_secure_ai_assistant()
    flow_identifier = flow_data['_id']
    secured_count = len(secured_components)
    print(f"Secure workflow created with AZTP Flow ID: {flow_identifier}")
    print(f"Components secured: {secured_count}")


# Only run the workflow when executed as a script
if __name__ == "__main__":
    asyncio.run(main())

9. Security Monitoring

Real-Time Security Dashboard

class LangflowSecurityMonitor:
    """Monitor security events in Langflow workflows."""

    def __init__(self, organization_api_key: str):
        """Create the AZTP client and start with an empty event log."""
        self.client = Aztp(api_key=organization_api_key)
        self.security_events = []

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current time as a timezone-aware ISO-8601 UTC string."""
        from datetime import timezone

        # datetime.utcnow() is deprecated and yields a naive value that is
        # easily mistaken for local time; carry the UTC offset explicitly.
        return datetime.now(timezone.utc).isoformat()

    async def monitor_component_security(self, component_id: str):
        """Monitor security events for a specific component.

        Args:
            component_id: ID of the component whose agent is verified.

        Returns:
            The recorded security event dict, or ``None`` when no agent is
            registered for the component.
        """
        from langflow.services.identity import get_identity_service
        identity_service = get_identity_service()

        # Get component agent
        agent = identity_service.get_agent(component_id, self.client.api_key)
        if not agent:
            logger.warning(f"No agent found for component: {component_id}")
            return

        # Verify identity status
        is_valid = await self.client.verify_identity_by_aztp_id(agent.aztp_identity_id)

        # Log security event
        security_event = {
            "timestamp": self._utc_timestamp(),
            "component_id": component_id,
            "aztp_id": agent.aztp_identity_id,
            "identity_valid": is_valid,
            "identity_status": agent.aztp_identity_status,
            "event_type": "identity_verification"
        }

        self.security_events.append(security_event)

        if not is_valid:
            logger.error(f"🚨 Security Alert: Invalid identity for component {component_id}")
        else:
            logger.info(f"✅ Security check passed for component {component_id}")

        return security_event

    async def generate_security_report(self) -> dict:
        """Generate comprehensive security report.

        Returns:
            A dict with the generation timestamp, event counts, a
            ``security_score`` percentage (100 when no events were recorded)
            and the list of recorded events.
        """
        total_events = len(self.security_events)
        valid_identities = sum(1 for event in self.security_events if event["identity_valid"])

        return {
            "report_generated": self._utc_timestamp(),
            "total_security_events": total_events,
            "valid_identities": valid_identities,
            "invalid_identities": total_events - valid_identities,
            # No events means nothing failed verification, hence 100
            "security_score": (valid_identities / total_events * 100) if total_events > 0 else 100,
            "events": self.security_events
        }
 
# Usage in Langflow
# NOTE(review): "org-key-123" looks like a placeholder — confirm the real
# organization API key is supplied from configuration rather than hard-coded.
monitor = LangflowSecurityMonitor("org-key-123")
 
# Monitor all components in a flow
async def monitor_flow_security(flow_components: list[str]):
    """Verify each listed component in order, then log the overall score."""
    # Components are checked one at a time, in the order given
    for current_component in flow_components:
        await monitor.monitor_component_security(current_component)

    # Summarize everything the monitor has recorded so far
    summary = await monitor.generate_security_report()
    score = summary['security_score']
    logger.info(f"📊 Security Report: {score:.1f}% secure")

10. Testing and Validation

AZTP Integration Tests

import pytest
from langflow.services.identity import get_identity_service
 
class TestLangflowAZTPIntegration:
    """Test AZTP integration in Langflow."""

    @pytest.fixture
    def identity_service(self):
        """Get identity service for testing.

        Deliberately synchronous: the fixture awaits nothing, and an
        ``async def`` fixture under plain ``@pytest.fixture`` hands the test
        an un-awaited coroutine when pytest-asyncio runs in strict mode.
        """
        return get_identity_service()

    @pytest.mark.asyncio
    async def test_component_identity_creation(self, identity_service):
        """Test component identity creation."""
        # Create component identity
        agent = await identity_service.secure_connect_component(
            component_id="test-component-001",
            component_name="TestChatComponent",
            organization_api_key="test-org-key"
        )

        assert agent is not None
        assert hasattr(agent, "aztp_identity_id")
        assert agent.aztp_identity_id.startswith("aztp://")
        assert agent.aztp_identity_status == "active"

    @pytest.mark.asyncio
    async def test_oiap_policy_evaluation(self, identity_service):
        """Test OIAP policy evaluation for tool access."""
        # Test allowed tool: get_current_date bypasses policy evaluation
        is_allowed = await identity_service.check_oiap_policy(
            tool_name="get_current_date",
            component_name="TestAgent"
        )
        assert is_allowed is True

        # Test with policy evaluation
        # This would require proper OIAP setup in test environment
        # Result depends on configured policies

    @pytest.mark.asyncio
    async def test_flow_identity_management(self, identity_service):
        """Test flow identity creation and management."""
        # Create flow identity
        flow_data = await identity_service.create_flow_identity(
            flow_name="test-secure-flow",
            description="Test flow for security validation",
            flow_id="test-flow-uuid",
            folder_id="test-folder-uuid",
            organization_api_key="test-org-key"
        )

        assert flow_data is not None
        assert "_id" in flow_data
        assert flow_data["name"] == "test-secure-flow"

Best Practices

1. Security Configuration

# Build the client from the current organization's own API key
organization_api_key = get_current_organization_api_key()
client = Aztp(api_key=organization_api_key)

# Harden the policy layer through the environment
os.environ.update(
    AZTP_POLICY_ENABLED="true",      # enable policy enforcement in production
    AZTP_POLICY_STRICT_MODE="true",  # strict mode for high-security environments
)

2. Error Handling

# Robust error handling for production
# NOTE: this is a fragment — the `await` below only works inside an
# `async def`. The handlers only log; the recovery steps named in the
# comments are left to the reader, and `agent` stays unbound whenever one
# of the handlers runs.
try:
    agent = await client.secure_connect({}, component_name)
except ValueError as e:
    logger.error(f"Validation error: {e}")
    # Handle gracefully - maybe use fallback identity
except ConnectionError as e:
    logger.error(f"Connection error: {e}")
    # Retry with exponential backoff
except Exception as e:
    # Catch-all comes last so the specific handlers above are tried first
    logger.error(f"Unexpected error: {e}")
    # Log and alert operations team

3. Performance Optimization

# Cache identities to avoid repeated API calls
# Results are cached here, keyed by component name, least recently used first.
# functools.lru_cache cannot be used on a coroutine function: it would cache
# the coroutine object, and awaiting that object a second time raises
# "RuntimeError: cannot reuse already awaited coroutine".
_IDENTITY_CACHE_MAXSIZE = 128
_identity_cache: dict = {}


async def get_cached_identity(component_name: str):
    """Return the identity for ``component_name``, fetching it at most once.

    The awaited result of ``client.get_identity_by_name`` is cached, bounded
    to 128 entries with least-recently-used eviction. Concurrent first calls
    for the same name may each hit the API; the last result wins.

    Args:
        component_name: Name of the component whose identity is wanted.

    Returns:
        Whatever ``client.get_identity_by_name`` returned for this name.
    """
    if component_name in _identity_cache:
        # Re-insert to mark the entry as most recently used
        identity = _identity_cache.pop(component_name)
        _identity_cache[component_name] = identity
        return identity

    # A failed lookup raises here and is therefore never cached
    identity = await client.get_identity_by_name(component_name)

    _identity_cache.pop(component_name, None)
    _identity_cache[component_name] = identity
    while len(_identity_cache) > _IDENTITY_CACHE_MAXSIZE:
        # dicts keep insertion order, so the first key is the oldest entry
        del _identity_cache[next(iter(_identity_cache))]
    return identity


# Keep the cache-reset hook that the lru_cache-wrapped version exposed
get_cached_identity.cache_clear = _identity_cache.clear
 
# Use async batch operations for multiple components
async def batch_create_identities(component_names: list[str]):
    """Connect all named components concurrently.

    Returns:
        One entry per name, in input order: the connected agent, or the
        exception raised for that name (exceptions are returned, not raised).
    """
    pending = []
    for name in component_names:
        # Component-specific (non-global) identity, as elsewhere in this guide
        pending.append(client.secure_connect({}, name, config={"isGlobalIdentity": False}))

    return await asyncio.gather(*pending, return_exceptions=True)

Next Steps