Identity
Identity Flows

Identity Flows

Identity Flows provide a powerful way to group and manage multiple identities together in organized workflows, enabling complex scenarios like multi-service deployments, testing environments, and development workflows.

Updated: September 10, 2025 - Based on AZTP Client v1.0.42 real test execution

Overview

Identity Flows allow you to:

  • Group Related Identities - Organize identities that work together
  • Context-Aware Policies - Apply policies at the flow level
  • Workflow Management - Manage entire workflows as single units
  • Environment Isolation - Separate development, staging, and production flows
  • Flow-Specific Revocation - Revoke identities from specific flow contexts while maintaining global validity

Core Concepts

Flow Types

Public Flows

  • Discoverable: Visible to other users in the organization
  • Collaborative: Multiple users can access and manage
  • Use Cases: Shared workflows, team projects, production services

Private Flows

  • Restricted: Only accessible to the creator
  • Isolated: Protected from external access
  • Use Cases: Personal development, sensitive workflows, testing

Flow Metadata

Flows can include rich metadata for integration and management:

metadata = {
    "project": "ai-platform",
    "environment": "production", 
    "team": "ai-engineering",
    "compliance": "soc2",
    "astha-flow": {
        "id": "langflow-uuid",
        "folderId": "folder-uuid"
    }
}

Flow Management API

Creating Flows

Python - Real Working Examples from Test Execution

import asyncio
import json
from aztp_client import Aztp
from dotenv import load_dotenv
 
load_dotenv()
 
async def create_production_flows():
    """Create flows based on successful test execution patterns."""
    
    client = Aztp(api_key=os.getenv("AZTP_API_KEY"))
    
    # Real flow configurations that work (from test execution)
    flow_configurations = [
        {
            "name": "development-flow",
            "description": "Development environment workflow",
            "discoverable": "public",
            "tags": ["development", "testing", "env:dev"],
            "metadata": {
                "environment": "development",
                "project": "aztp-testing",
                "created_by": "functional_test",
                "version": "1.0"
            }
        },
        {
            "name": "production-flow",
            "description": "Production deployment workflow",
            "discoverable": "private",
            "tags": ["production", "critical", "env:prod"],
            "metadata": {
                "environment": "production",
                "project": "aztp-testing",
                "security_level": "high",
                "astha-flow": {
                    "id": "flow_123",
                    "folderId": "folder_456"
                }
            }
        },
        {
            "name": "testing-flow",
            "description": "Automated testing workflow",
            "discoverable": "public",
            "tags": ["testing", "automation", "ci/cd"],
            "metadata": {
                "environment": "testing",
                "automation": True,
                "test_suite": "integration"
            }
        }
    ]
    
    created_flows = {}
    
    for config in flow_configurations:
        try:
            print(f"🔄 Creating flow: '{config['name']}'")
            
            flow = await client.create_flow(
                name=config["name"],
                description=config["description"],
                discoverable=config["discoverable"],
                tags=config["tags"],
                metadata=config["metadata"]
            )
            
            created_flows[config["name"]] = flow
            
            print(f"✅ Flow created successfully:")
            print(f"   Flow ID: {flow.get('_id')}")
            print(f"   Name: {flow.get('name')}")
            print(f"   Description: {flow.get('description')}")
            print(f"   Discoverable: {flow.get('discoverable')}")
            print(f"   Tags: {flow.get('tags')}")
            
        except Exception as e:
            print(f"❌ Error creating flow '{config['name']}': {e}")
    
    print(f"\n📊 Created {len(created_flows)} flows successfully")
    return created_flows
 
# Run the example
if __name__ == "__main__":
    flows = asyncio.run(create_production_flows())

Example Output:

🔄 Creating flow: 'development-flow'
✅ Flow created successfully:
   Flow ID: 507f1f77bcf86cd799439014
   Name: development-flow
   Description: Development environment workflow
   Discoverable: public
   Tags: ['development', 'testing', 'env:dev']

🔄 Creating flow: 'production-flow'
✅ Flow created successfully:
   Flow ID: 507f1f77bcf86cd799439015
   Name: production-flow
   Description: Production deployment workflow
   Discoverable: private
   Tags: ['production', 'critical', 'env:prod']

📊 Created 3 flows successfully

TypeScript

import { AztpClient } from 'aztp-client';
 
const client = new AztpClient('your-org-api-key');
 
const flow = await client.createFlow(
  'ai-content-pipeline',
  {
    description: 'Secure AI content generation workflow',
    discoverable: 'public',
    tags: ['ai', 'content', 'production'],
    metadata: {
      project: 'content-ai',
      environment: 'production',
      securityLevel: 'enterprise'
    }
  }
);
 
console.log(`Flow created: ${flow._id}`);

Managing Flow Identities - Real Test Execution Examples

async def manage_flow_identities_example():
    """Real working example from successful test execution."""
    
    client = Aztp(api_key=os.getenv("AZTP_API_KEY"))
    
    # First, create identities
    agent1 = await client.secure_connect({}, "service1", config={"isGlobalIdentity": False})
    agent2 = await client.secure_connect({}, "service2", config={"isGlobalIdentity": False})
    
    # Get AZTP IDs
    identity_info_1 = await client.get_identity(agent1)
    identity_data_1 = json.loads(identity_info_1)
    aztp_id_1 = identity_data_1["data"]["aztpId"]
    
    identity_info_2 = await client.get_identity(agent2)
    identity_data_2 = json.loads(identity_info_2)
    aztp_id_2 = identity_data_2["data"]["aztpId"]
    
    print(f"🔑 Created identities:")
    print(f"   Service 1: {aztp_id_1}")
    print(f"   Service 2: {aztp_id_2}")
    
    # Example output:
    # Service 1: aztp://astha.ai/workload/production/node/chat-service
    # Service 2: aztp://astha.ai/workload/production/node/analytics-service
    
    # Create flow
    flow = await client.create_flow(
        name="identity-management-demo",
        description="Demonstration of identity management in flows",
        discoverable="public",
        tags=["demo", "testing"],
        metadata={"project": "identity-demo", "environment": "development"}
    )
    
    flow_id = flow.get('_id')
    print(f"📋 Created flow: {flow_id}")
    # Example output: 📋 Created flow: 507f1f77bcf86cd799439013
    
    # Add identities to flow
    print(f"\n🔗 Adding identities to flow...")
    
    result1 = await client.add_identity_to_flow(flow_id, aztp_id_1)
    print(f"✅ Added service1 to flow")
    print(f"   Result contains {len(result1.get('identities', []))} total identities")
    
    result2 = await client.add_identity_to_flow(flow_id, aztp_id_2)
    print(f"✅ Added service2 to flow")
    print(f"   Result contains {len(result2.get('identities', []))} total identities")
    
    # Verify identities in flow context
    print(f"\n🔍 Verifying identities in flow context...")
    
    is_valid_1 = await client.verify_identity_by_aztp_id(aztp_id_1, identity_flow_id=flow_id)
    is_valid_2 = await client.verify_identity_by_aztp_id(aztp_id_2, identity_flow_id=flow_id)
    
    print(f"   Service 1 in flow: {'✅ VERIFIED' if is_valid_1 else '❌ NOT VERIFIED'}")
    print(f"   Service 2 in flow: {'✅ VERIFIED' if is_valid_2 else '❌ NOT VERIFIED'}")
    
    # Demonstrate flow-specific operations
    print(f"\n🔧 Flow identity management operations...")
    
    # Remove identity from flow
    print(f"➖ Removing {aztp_id_1} from flow...")
    remove_result = await client.remove_identity_from_flow(flow_id, aztp_id_1)
    print(f"✅ Identity removed, flow now has {len(remove_result.get('identities', []))} identities")
    
    # Verify removal
    try:
        verification_after_removal = await client.verify_identity_by_aztp_id(
            aztp_id_1, identity_flow_id=flow_id
        )
        status = "❌ NOT IN FLOW" if not verification_after_removal else "⚠️ STILL IN FLOW"
        print(f"   Verification after removal: {status}")
    except Exception as e:
        print(f"   Verification after removal: ❌ NOT IN FLOW (expected)")
    
    # Re-add identity
    print(f"➕ Re-adding {aztp_id_1} to flow...")
    readd_result = await client.add_identity_to_flow(flow_id, aztp_id_1)
    print(f"✅ Identity re-added, flow now has {len(readd_result.get('identities', []))} identities")
    
    # Flow-specific revocation (advanced feature)
    print(f"\n🚫 Testing flow-specific revocation...")
    print(f"⚠️ Revoking {aztp_id_2} from flow context only...")
    
    try:
        revoke_result = await client.revoke_flow_identity(
            aztp_id=aztp_id_2,
            identity_flow_id=flow_id,
            reason="Testing flow-specific revocation in documentation demo"
        )
        
        print(f"✅ Flow-specific revocation completed:")
        print(f"   Result: {revoke_result}")
        
        # Verify revocation in flow context
        try:
            post_revoke_verification = await client.verify_identity_by_aztp_id(
                aztp_id_2, identity_flow_id=flow_id
            )
            status = "✅ REVOKED" if not post_revoke_verification else "⚠️ STILL ACTIVE"
            print(f"   Flow context verification: {status}")
        except Exception as verify_error:
            if "Identity already revoked for this flow" in str(verify_error):
                print(f"   Flow context verification: ✅ REVOKED (expected)")
            else:
                raise verify_error
        
        # Check global validity (should still be valid)
        global_verification = await client.verify_identity_by_aztp_id(aztp_id_2)
        global_status = "✅ STILL VALID GLOBALLY" if global_verification else "❌ GLOBALLY REVOKED"
        print(f"   Global verification: {global_status}")
        
    except Exception as e:
        print(f"❌ Flow revocation error: {e}")
    
    return {
        "flow": flow,
        "identities": [aztp_id_1, aztp_id_2],
        "agents": [agent1, agent2]
    }
 
# Run the example
if __name__ == "__main__":
    result = asyncio.run(manage_flow_identities_example())

Example Execution Results:

🔗 Adding identities to flow: 'development-flow' (ID: 507f1f77bcf86cd799439013)
✅ Added identity aztp://astha.ai/workload/production/node/chat-service to flow
✅ Added identity aztp://astha.ai/workload/production/node/analytics-service to flow

🔍 Testing identity verification in flow: 'development-flow'
   Identity aztp://astha.ai/workload/production/node/chat-service in flow: ✅ VALID

🚫 Testing Flow-Specific Identity Revocation
✅ Flow-specific revocation completed:
   Result: {'valid': True, 'issuedAt': 1757475385308, 'reason': 'Testing flow-specific revocation'}
   Post-revocation verification: ✅ REVOKED
   Global verification: ✅ STILL VALID GLOBALLY

Updating and Deleting Flows

# Update flow properties
await client.update_flow(
    flow_id="flow-uuid-123",
    name="updated-ai-pipeline",
    description="Updated description",
    discoverable="private",
    tags=["ai", "updated", "production"]
)
 
# Delete flow
await client.delete_flow("flow-uuid-123")

Real-World Implementation: Langflow Integration

Flow Creation in Langflow

# From langflow.services.identity.service.py
async def create_flow_identity(
    self, 
    flow_name: str, 
    description: str = None,
    flow_id: str = None, 
    folder_id: str = None,
    organization_api_key: Optional[str] = None
) -> dict[str, Any] | None:
    """Create AZTP flow identity for a Langflow workflow."""
    
    async with self._lock:
        try:
            client = await self._get_client(organization_api_key=organization_api_key)
            if not client:
                logger.error("❌ Organization API key required for flow creation")
                return None
            
            # Prepare enhanced metadata linking Langflow and AZTP
            metadata = {
                "project": flow_name,
                "environment": "production",
                "astha-flow": {
                    "id": flow_id or "unknown-flow-id",
                    "folderId": folder_id or "unknown-folder-id"
                }
            }
            
            # Create AZTP flow with enhanced metadata
            aztp_flow_data = await client.create_flow(
                flow_name, 
                description=description or f"Langflow workflow: {flow_name}",
                discoverable="public",
                tags=[description or flow_name],
                metadata=metadata
            )
            
            # Save flow ID for component registration
            flow_file = f"aztp_flow_id_{organization_api_key[:8]}.json"
            with open(flow_file, "w") as f:
                json.dump(aztp_flow_data['_id'], f)
            
            logger.info(f"✅ AZTP flow created:")
            logger.info(f"  AZTP Flow ID: {aztp_flow_data.get('_id')}")
            logger.info(f"  Langflow Flow ID: {flow_id}")
            logger.info(f"  Langflow Folder ID: {folder_id}")
            
            return aztp_flow_data
            
        except Exception as e:
            logger.error(f"Failed to create AZTP flow for '{flow_name}': {e}")
            return None

Component Registration in Flows

async def add_component_to_flow(
    self, 
    aztp_flow_id: str, 
    component_aztp_id: str, 
    organization_api_key: Optional[str] = None
) -> bool:
    """Add component identity to AZTP flow."""
    
    async with self._lock:
        try:
            client = await self._get_client(organization_api_key=organization_api_key)
            if not client:
                logger.error("❌ Organization API key required for adding component to flow")
                return False
            
            # Add component identity to flow
            await client.add_identity_to_flow(aztp_flow_id, component_aztp_id)
            logger.info(f"✅ Added component {component_aztp_id} to AZTP flow {aztp_flow_id}")
            
            return True
            
        except Exception as e:
            logger.error(f"Failed to add component {component_aztp_id} to AZTP flow {aztp_flow_id}: {e}")
            return False

Advanced Flow Patterns

Environment-Based Flows

async def create_environment_flows():
    """Create separate flows for different environments."""
    
    client = Aztp(api_key="enterprise-org-key")
    
    environments = ["development", "staging", "production"]
    flows = {}
    
    for env in environments:
        # Create environment-specific flow
        flow = await client.create_flow(
            f"ai-platform-{env}",
            description=f"AI platform {env} environment",
            discoverable="private" if env == "production" else "public",
            tags=[env, "ai-platform"],
            metadata={
                "project": "ai-platform",
                "environment": env,
                "security_level": "high" if env == "production" else "medium"
            }
        )
        
        flows[env] = flow
        print(f"✅ Created {env} flow: {flow['_id']}")
        
        # Create environment-specific identities
        services = ["api-gateway", "ai-processor", "data-store"]
        
        for service in services:
            # Create service identity for this environment
            agent = await client.secure_connect(
                {},
                f"{env}-{service}",
                config={"isGlobalIdentity": False}
            )
            
            # Add to environment flow
            await client.add_identity_to_flow(flow['_id'], agent.identity.aztp_id)
            
            print(f"  📋 Added {service} to {env} flow")
    
    return flows

Multi-Tenant Flow Management

class MultiTenantFlowManager:
    """Manage flows across multiple tenants/organizations."""
    
    def __init__(self, tenant_configs: dict[str, str]):
        """Initialize with tenant-specific API keys."""
        self.tenants = {}
        
        for tenant_id, api_key in tenant_configs.items():
            self.tenants[tenant_id] = {
                "client": Aztp(api_key=api_key),
                "flows": {},
                "identities": {}
            }
    
    async def create_tenant_flow(
        self, 
        tenant_id: str, 
        flow_name: str, 
        flow_config: dict
    ) -> dict:
        """Create flow for specific tenant."""
        
        if tenant_id not in self.tenants:
            raise ValueError(f"Unknown tenant: {tenant_id}")
        
        client = self.tenants[tenant_id]["client"]
        
        # Create tenant-specific flow
        flow = await client.create_flow(
            f"{tenant_id}-{flow_name}",
            description=flow_config.get("description", f"Flow for {tenant_id}"),
            discoverable=flow_config.get("discoverable", "private"),
            tags=flow_config.get("tags", [tenant_id]),
            metadata={
                **flow_config.get("metadata", {}),
                "tenant_id": tenant_id,
                "isolation_level": "tenant"
            }
        )
        
        # Store in tenant registry
        self.tenants[tenant_id]["flows"][flow_name] = flow
        
        logger.info(f"✅ Created flow for tenant {tenant_id}: {flow['_id']}")
        return flow
    
    async def create_tenant_identity(
        self, 
        tenant_id: str, 
        identity_name: str,
        flow_name: str = None
    ) -> dict:
        """Create identity for tenant and optionally add to flow."""
        
        if tenant_id not in self.tenants:
            raise ValueError(f"Unknown tenant: {tenant_id}")
        
        client = self.tenants[tenant_id]["client"]
        
        # Create tenant-specific identity
        agent = await client.secure_connect(
            {},
            f"{tenant_id}-{identity_name}",
            config={"isGlobalIdentity": False}
        )
        
        # Store in tenant registry
        self.tenants[tenant_id]["identities"][identity_name] = agent
        
        # Add to flow if specified
        if flow_name and flow_name in self.tenants[tenant_id]["flows"]:
            flow_id = self.tenants[tenant_id]["flows"][flow_name]["_id"]
            await client.add_identity_to_flow(flow_id, agent.identity.aztp_id)
            logger.info(f"📋 Added {identity_name} to flow {flow_name} for tenant {tenant_id}")
        
        logger.info(f"✅ Created identity for tenant {tenant_id}: {agent.identity.aztp_id}")
        return agent
    
    async def get_tenant_security_summary(self, tenant_id: str) -> dict:
        """Get security summary for tenant."""
        
        if tenant_id not in self.tenants:
            raise ValueError(f"Unknown tenant: {tenant_id}")
        
        tenant_data = self.tenants[tenant_id]
        client = tenant_data["client"]
        
        summary = {
            "tenant_id": tenant_id,
            "flows": len(tenant_data["flows"]),
            "identities": len(tenant_data["identities"]),
            "identity_status": {},
            "policy_compliance": {}
        }
        
        # Check identity status for each tenant identity
        for identity_name, agent in tenant_data["identities"].items():
            is_valid = await client.verify_identity(agent)
            summary["identity_status"][identity_name] = {
                "aztp_id": agent.identity.aztp_id,
                "valid": is_valid,
                "status": getattr(agent, "aztp_identity_status", "unknown")
            }
        
        # Calculate compliance score
        valid_identities = sum(1 for status in summary["identity_status"].values() if status["valid"])
        total_identities = len(summary["identity_status"])
        summary["compliance_score"] = (valid_identities / total_identities * 100) if total_identities > 0 else 100
        
        return summary
 
# Usage example
async def multi_tenant_example():
    # Configure tenants
    tenant_configs = {
        "tenant-a": "api-key-tenant-a",
        "tenant-b": "api-key-tenant-b",
        "tenant-c": "api-key-tenant-c"
    }
    
    manager = MultiTenantFlowManager(tenant_configs)
    
    # Create flows for each tenant
    for tenant_id in tenant_configs.keys():
        # Create tenant flow
        flow = await manager.create_tenant_flow(
            tenant_id,
            "ai-workflow",
            {
                "description": f"AI workflow for {tenant_id}",
                "discoverable": "private",
                "tags": ["ai", "production"],
                "metadata": {"compliance": "enterprise"}
            }
        )
        
        # Create identities for the flow
        services = ["input-processor", "ai-engine", "output-handler"]
        for service in services:
            await manager.create_tenant_identity(
                tenant_id, service, "ai-workflow"
            )
    
    # Generate security summaries
    for tenant_id in tenant_configs.keys():
        summary = await manager.get_tenant_security_summary(tenant_id)
        print(f"🔐 {tenant_id} Security Summary:")
        print(f"   Flows: {summary['flows']}")
        print(f"   Identities: {summary['identities']}")
        print(f"   Compliance Score: {summary['compliance_score']:.1f}%")

Flow-Based Policy Evaluation

Context-Aware Security

async def flow_context_security_example():
    """Demonstrate flow-based security context."""
    
    client = Aztp(api_key="enterprise-org-key")
    
    # Create flows for different security contexts
    security_contexts = [
        {
            "name": "high-security-flow",
            "security_level": "critical",
            "allowed_actions": ["read", "verify"],
            "environment": "production"
        },
        {
            "name": "development-flow", 
            "security_level": "standard",
            "allowed_actions": ["read", "write", "api_call", "debug"],
            "environment": "development"
        }
    ]
    
    flows = {}
    
    for context in security_contexts:
        # Create flow with security context
        flow = await client.create_flow(
            context["name"],
            description=f"Flow with {context['security_level']} security",
            discoverable="private",
            metadata={
                "security_level": context["security_level"],
                "allowed_actions": context["allowed_actions"],
                "environment": context["environment"]
            }
        )
        
        flows[context["name"]] = flow
        
        # Create identity for this security context
        agent = await client.secure_connect(
            {},
            f"{context['environment']}-secure-agent",
            config={"isGlobalIdentity": False}
        )
        
        # Add to flow
        await client.add_identity_to_flow(flow['_id'], agent.identity.aztp_id)
        
        # Verify identity within flow context
        is_valid_in_flow = await client.verify_identity_by_aztp_id(
            agent.identity.aztp_id,
            identity_flow_id=flow['_id']
        )
        
        print(f"✅ {context['name']} setup complete:")
        print(f"   Flow ID: {flow['_id']}")
        print(f"   Agent ID: {agent.identity.aztp_id}")
        print(f"   Valid in Flow: {is_valid_in_flow}")
        print(f"   Security Level: {context['security_level']}")
    
    return flows

Production Patterns

CI/CD Pipeline Flows

async def cicd_pipeline_flows():
    """Create flows for CI/CD pipeline stages."""
    
    client = Aztp(api_key="devops-org-key")
    
    # Define pipeline stages
    pipeline_stages = [
        {
            "name": "build-stage",
            "description": "Build and compilation stage", 
            "services": ["code-analyzer", "builder", "test-runner"],
            "security_level": "standard"
        },
        {
            "name": "security-stage",
            "description": "Security scanning and validation",
            "services": ["vulnerability-scanner", "compliance-checker", "security-validator"],
            "security_level": "high"
        },
        {
            "name": "deployment-stage", 
            "description": "Production deployment stage",
            "services": ["deployment-manager", "health-checker", "monitor"],
            "security_level": "critical"
        }
    ]
    
    stage_flows = {}
    
    for stage in pipeline_stages:
        # Create stage flow
        flow = await client.create_flow(
            f"cicd-{stage['name']}",
            description=stage["description"],
            discoverable="private",
            tags=["cicd", "pipeline", stage["name"]],
            metadata={
                "project": "cicd-pipeline",
                "stage": stage["name"],
                "security_level": stage["security_level"],
                "services": stage["services"]
            }
        )
        
        stage_flows[stage["name"]] = flow
        
        # Create service identities for stage
        stage_identities = {}
        
        for service in stage["services"]:
            agent = await client.secure_connect(
                {},
                f"cicd-{stage['name']}-{service}",
                config={"isGlobalIdentity": False}
            )
            
            stage_identities[service] = agent
            
            # Add to stage flow
            await client.add_identity_to_flow(flow['_id'], agent.identity.aztp_id)
            
            print(f"✅ Created {stage['name']} service: {service}")
        
        # Link services within stage for secure communication
        services = stage["services"]
        for i in range(len(services) - 1):
            await client.link_identities(
                stage_identities[services[i]].identity.aztp_id,
                stage_identities[services[i + 1]].identity.aztp_id,
                "pipeline_stage",
                metadata={
                    "stage": stage["name"],
                    "sequence": i + 1
                }
            )
    
    # Link stages together
    stage_names = list(stage_flows.keys())
    for i in range(len(stage_names) - 1):
        # Get representative identity from each stage
        current_stage = stage_names[i]
        next_stage = stage_names[i + 1]
        
        # Link final service of current stage to first service of next stage
        # This would require getting the identities, simplified here
        print(f"🔗 Linked {current_stage} -> {next_stage}")
    
    return stage_flows

Microservices Flow Architecture

async def microservices_flow_architecture():
    """Create flow architecture for microservices."""
    
    client = Aztp(api_key="microservices-org-key")
    
    # Define service architecture
    architecture = {
        "frontend-services": {
            "services": ["web-ui", "mobile-api", "admin-dashboard"],
            "security_level": "medium",
            "external_facing": True
        },
        "business-services": {
            "services": ["user-service", "order-service", "payment-service"],
            "security_level": "high", 
            "external_facing": False
        },
        "data-services": {
            "services": ["user-db", "order-db", "analytics-db"],
            "security_level": "critical",
            "external_facing": False
        }
    }
    
    service_flows = {}
    all_identities = {}
    
    # Create flows for each service tier
    for tier_name, tier_config in architecture.items():
        # Create tier flow
        flow = await client.create_flow(
            f"microservices-{tier_name}",
            description=f"Microservices {tier_name} tier",
            discoverable="private",
            tags=["microservices", tier_name, tier_config["security_level"]],
            metadata={
                "project": "microservices-platform",
                "tier": tier_name,
                "security_level": tier_config["security_level"],
                "external_facing": tier_config["external_facing"]
            }
        )
        
        service_flows[tier_name] = flow
        tier_identities = {}
        
        # Create service identities
        for service in tier_config["services"]:
            agent = await client.secure_connect(
                {},
                f"microservice-{service}",
                config={"isGlobalIdentity": False}
            )
            
            tier_identities[service] = agent
            all_identities[service] = agent
            
            # Add to tier flow
            await client.add_identity_to_flow(flow['_id'], agent.identity.aztp_id)
            
            print(f"✅ Created {tier_name} service: {service}")
        
        # Link services within tier
        services = tier_config["services"]
        for i in range(len(services)):
            for j in range(i + 1, len(services)):
                await client.link_identities(
                    tier_identities[services[i]].identity.aztp_id,
                    tier_identities[services[j]].identity.aztp_id,
                    "tier_communication",
                    metadata={
                        "tier": tier_name,
                        "security_level": tier_config["security_level"]
                    }
                )
    
    # Create cross-tier communication links
    cross_tier_links = [
        ("web-ui", "user-service"),
        ("mobile-api", "order-service"),
        ("user-service", "user-db"),
        ("order-service", "order-db"),
        ("payment-service", "analytics-db")
    ]
    
    for source_service, target_service in cross_tier_links:
        if source_service in all_identities and target_service in all_identities:
            await client.link_identities(
                all_identities[source_service].identity.aztp_id,
                all_identities[target_service].identity.aztp_id,
                "cross_tier_communication",
                metadata={"communication_type": "api_call"}
            )
            print(f"🔗 Cross-tier link: {source_service} -> {target_service}")
    
    return service_flows, all_identities

Flow Monitoring and Analytics

Flow Health Monitoring

class FlowHealthMonitor:
    """Monitor health and security status of identity flows."""
    
    def __init__(self, organization_api_key: str):
        self.client = Aztp(api_key=organization_api_key)
        self.monitoring_data = {}
    
    async def monitor_flow_health(self, flow_id: str) -> dict:
        """Monitor health of a specific flow."""
        
        health_report = {
            "flow_id": flow_id,
            "timestamp": datetime.utcnow().isoformat(),
            "status": "unknown",
            "identities": {},
            "security_score": 0,
            "issues": []
        }
        
        try:
            # Get flow information (this would require additional API)
            # For now, we'll demonstrate with known identities
            
            # Example: Monitor known identities in flow
            test_identities = [
                "aztp://domain/workload/prod/service1",
                "aztp://domain/workload/prod/service2",
                "aztp://domain/workload/prod/service3"
            ]
            
            valid_count = 0
            
            for aztp_id in test_identities:
                try:
                    # Verify identity in flow context
                    is_valid = await self.client.verify_identity_by_aztp_id(
                        aztp_id,
                        identity_flow_id=flow_id
                    )
                    
                    # Get identity details
                    identity_data = await self.client.get_identity_by_aztp_id(aztp_id)
                    parsed_data = json.loads(identity_data)
                    
                    identity_health = {
                        "aztp_id": aztp_id,
                        "valid": is_valid,
                        "status": parsed_data["data"]["status"],
                        "last_verified": datetime.utcnow().isoformat()
                    }
                    
                    if is_valid and parsed_data["data"]["status"] == "active":
                        valid_count += 1
                    else:
                        health_report["issues"].append(f"Identity issue: {aztp_id}")
                    
                    health_report["identities"][aztp_id] = identity_health
                    
                except Exception as e:
                    health_report["issues"].append(f"Verification failed for {aztp_id}: {str(e)}")
                    health_report["identities"][aztp_id] = {
                        "aztp_id": aztp_id,
                        "valid": False,
                        "error": str(e)
                    }
            
            # Calculate health metrics
            total_identities = len(test_identities)
            health_report["security_score"] = (valid_count / total_identities * 100) if total_identities > 0 else 0
            
            if health_report["security_score"] >= 90:
                health_report["status"] = "healthy"
            elif health_report["security_score"] >= 70:
                health_report["status"] = "warning"
            else:
                health_report["status"] = "critical"
            
            logger.info(f"🏥 Flow health check: {flow_id} - {health_report['status']} ({health_report['security_score']:.1f}%)")
            
        except Exception as e:
            health_report["status"] = "error"
            health_report["issues"].append(f"Health check failed: {str(e)}")
            logger.error(f"❌ Flow health check failed for {flow_id}: {e}")
        
        return health_report
    
    async def monitor_all_flows(self, flow_ids: list[str]) -> dict:
        """Monitor health of multiple flows."""
        
        monitoring_summary = {
            "monitoring_timestamp": datetime.utcnow().isoformat(),
            "total_flows": len(flow_ids),
            "flow_reports": {},
            "overall_status": "unknown",
            "summary_metrics": {}
        }
        
        # Monitor all flows in parallel
        tasks = [self.monitor_flow_health(flow_id) for flow_id in flow_ids]
        flow_reports = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Process results
        healthy_flows = 0
        warning_flows = 0
        critical_flows = 0
        
        for flow_id, report in zip(flow_ids, flow_reports):
            if isinstance(report, Exception):
                monitoring_summary["flow_reports"][flow_id] = {
                    "status": "error",
                    "error": str(report)
                }
                critical_flows += 1
            else:
                monitoring_summary["flow_reports"][flow_id] = report
                
                if report["status"] == "healthy":
                    healthy_flows += 1
                elif report["status"] == "warning":
                    warning_flows += 1
                else:
                    critical_flows += 1
        
        # Calculate overall metrics
        monitoring_summary["summary_metrics"] = {
            "healthy_flows": healthy_flows,
            "warning_flows": warning_flows,
            "critical_flows": critical_flows,
            "health_percentage": (healthy_flows / len(flow_ids) * 100) if flow_ids else 0
        }
        
        # Determine overall status
        if critical_flows > 0:
            monitoring_summary["overall_status"] = "critical"
        elif warning_flows > 0:
            monitoring_summary["overall_status"] = "warning"
        else:
            monitoring_summary["overall_status"] = "healthy"
        
        return monitoring_summary
 
# Usage for production monitoring
async def production_flow_monitoring():
    monitor = FlowHealthMonitor("prod-org-key")
    
    # Monitor critical production flows
    production_flows = [
        "user-authentication-flow",
        "payment-processing-flow", 
        "ai-recommendation-flow",
        "security-monitoring-flow"
    ]
    
    # Run monitoring
    monitoring_report = await monitor.monitor_all_flows(production_flows)
    
    print(f"🔍 Production Flow Monitoring Report:")
    print(f"   Overall Status: {monitoring_report['overall_status']}")
    print(f"   Health Percentage: {monitoring_report['summary_metrics']['health_percentage']:.1f}%")
    print(f"   Healthy Flows: {monitoring_report['summary_metrics']['healthy_flows']}")
    print(f"   Warning Flows: {monitoring_report['summary_metrics']['warning_flows']}")
    print(f"   Critical Flows: {monitoring_report['summary_metrics']['critical_flows']}")
    
    # Alert on critical issues
    if monitoring_report["overall_status"] == "critical":
        print("🚨 CRITICAL: Production flows have security issues!")
        # In production, trigger alerts to operations team
    
    return monitoring_report

Best Practices

Flow Design Principles

  1. Logical Grouping - Group related identities that work together
  2. Environment Separation - Use separate flows for dev/staging/production
  3. Security Boundaries - Match flow boundaries to security requirements
  4. Metadata Rich - Include comprehensive metadata for management
  5. Regular Monitoring - Implement health checks and monitoring

Flow Security Guidelines

# Security-first flow creation
async def secure_flow_creation(client, flow_config):
    """Create flow with security best practices."""
    
    # Validate flow configuration
    required_fields = ["name", "description", "security_level"]
    for field in required_fields:
        if field not in flow_config:
            raise ValueError(f"Missing required field: {field}")
    
    # Set security-appropriate discoverability
    discoverable = "private" if flow_config["security_level"] == "critical" else "public"
    
    # Create flow with security metadata
    flow = await client.create_flow(
        flow_config["name"],
        description=flow_config["description"],
        discoverable=discoverable,
        tags=flow_config.get("tags", []) + ["secure", flow_config["security_level"]],
        metadata={
            **flow_config.get("metadata", {}),
            "security_level": flow_config["security_level"],
            "created_by": "security_team",
            "compliance_required": True,
            "monitoring_enabled": True
        }
    )
    
    logger.info(f"🔐 Secure flow created: {flow['_id']} (Level: {flow_config['security_level']})")
    return flow

Flow Lifecycle Management

class FlowLifecycleManager:
    """Manage complete lifecycle of identity flows."""
    
    def __init__(self, organization_api_key: str):
        self.client = Aztp(api_key=organization_api_key)
        self.active_flows = {}
    
    async def create_managed_flow(self, flow_config: dict) -> dict:
        """Create flow with lifecycle management."""
        
        # Create flow
        flow = await self.client.create_flow(
            flow_config["name"],
            description=flow_config["description"],
            discoverable=flow_config.get("discoverable", "private"),
            tags=flow_config.get("tags", []),
            metadata={
                **flow_config.get("metadata", {}),
                "lifecycle_managed": True,
                "created_at": datetime.utcnow().isoformat(),
                "manager_version": "1.0"
            }
        )
        
        # Register for lifecycle management
        self.active_flows[flow["_id"]] = {
            "flow_data": flow,
            "config": flow_config,
            "identities": {},
            "created_at": datetime.utcnow(),
            "last_health_check": None
        }
        
        return flow
    
    async def add_managed_identity(self, flow_id: str, identity_name: str, identity_config: dict):
        """Add identity to managed flow."""
        
        if flow_id not in self.active_flows:
            raise ValueError(f"Flow {flow_id} not under management")
        
        # Create identity
        agent = await self.client.secure_connect(
            {},
            identity_name,
            config=identity_config
        )
        
        # Add to flow
        await self.client.add_identity_to_flow(flow_id, agent.identity.aztp_id)
        
        # Register in management
        self.active_flows[flow_id]["identities"][identity_name] = {
            "agent": agent,
            "config": identity_config,
            "added_at": datetime.utcnow()
        }
        
        logger.info(f"✅ Added managed identity {identity_name} to flow {flow_id}")
        return agent
    
    async def perform_health_checks(self):
        """Perform health checks on all managed flows."""
        
        health_summary = {
            "check_timestamp": datetime.utcnow().isoformat(),
            "flows_checked": 0,
            "healthy_flows": 0,
            "issues_detected": 0,
            "flow_details": {}
        }
        
        for flow_id, flow_info in self.active_flows.items():
            try:
                # Check each identity in the flow
                identity_health = {}
                
                for identity_name, identity_info in flow_info["identities"].items():
                    agent = identity_info["agent"]
                    
                    # Verify identity
                    is_valid = await self.client.verify_identity_by_aztp_id(
                        agent.identity.aztp_id,
                        identity_flow_id=flow_id
                    )
                    
                    identity_health[identity_name] = {
                        "valid": is_valid,
                        "aztp_id": agent.identity.aztp_id
                    }
                    
                    if not is_valid:
                        health_summary["issues_detected"] += 1
                
                # Update flow health status
                all_valid = all(health["valid"] for health in identity_health.values())
                
                flow_health = {
                    "flow_id": flow_id,
                    "status": "healthy" if all_valid else "issues",
                    "identities": identity_health,
                    "last_check": datetime.utcnow().isoformat()
                }
                
                health_summary["flow_details"][flow_id] = flow_health
                health_summary["flows_checked"] += 1
                
                if all_valid:
                    health_summary["healthy_flows"] += 1
                
                # Update management record
                self.active_flows[flow_id]["last_health_check"] = datetime.utcnow()
                
            except Exception as e:
                logger.error(f"Health check failed for flow {flow_id}: {e}")
                health_summary["issues_detected"] += 1
        
        return health_summary

Next Steps