Identity Flows
Identity Flows group and manage related identities as organized workflows, enabling complex scenarios such as multi-service deployments, testing environments, and development pipelines.
Updated: September 10, 2025 - Based on AZTP Client v1.0.42 real test execution
Overview
Identity Flows allow you to:
- Group Related Identities - Organize identities that work together
- Context-Aware Policies - Apply policies at the flow level
- Workflow Management - Manage entire workflows as single units
- Environment Isolation - Separate development, staging, and production flows
- Flow-Specific Revocation - Revoke identities from specific flow contexts while maintaining global validity
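A minimal end-to-end sketch of these capabilities with the Python client is shown below. It assumes an `AZTP_API_KEY` environment variable and uses illustrative flow and service names; the same calls (`create_flow`, `add_identity_to_flow`, `verify_identity_by_aztp_id`, `revoke_flow_identity`) are covered in detail later on this page.
import asyncio
import json
import os

from aztp_client import Aztp

async def flow_quickstart():
    # Sketch only: assumes AZTP_API_KEY is set; names are illustrative
    client = Aztp(api_key=os.getenv("AZTP_API_KEY"))

    # 1. Group related identities: create a flow and a workload identity
    flow = await client.create_flow(
        name="quickstart-flow",
        description="Minimal flow lifecycle sketch",
        discoverable="private",
        tags=["quickstart"],
        metadata={"environment": "development"}
    )
    agent = await client.secure_connect({}, "quickstart-service", config={"isGlobalIdentity": False})
    aztp_id = json.loads(await client.get_identity(agent))["data"]["aztpId"]

    # 2. Add the identity to the flow and verify it in flow context
    await client.add_identity_to_flow(flow["_id"], aztp_id)
    in_flow = await client.verify_identity_by_aztp_id(aztp_id, identity_flow_id=flow["_id"])

    # 3. Flow-specific revocation: revoke only within this flow; global validity is unaffected
    await client.revoke_flow_identity(aztp_id=aztp_id, identity_flow_id=flow["_id"], reason="quickstart cleanup")
    still_global = await client.verify_identity_by_aztp_id(aztp_id)
    print(f"In flow: {in_flow}, still valid globally: {still_global}")

asyncio.run(flow_quickstart())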
Core Concepts
Flow Types
Public Flows
- Discoverable: Visible to other users in the organization
- Collaborative: Multiple users can access and manage
- Use Cases: Shared workflows, team projects, production services
Private Flows
- Restricted: Only accessible to the creator
- Isolated: Protected from external access
- Use Cases: Personal development, sensitive workflows, testing
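Discoverability is chosen with the `discoverable` argument at creation time. A short sketch, assuming an already-constructed `client` and illustrative flow names:
# Sketch only: `client` is an already-constructed Aztp client
shared_flow = await client.create_flow(
    name="team-shared-flow",
    description="Visible to other users in the organization",
    discoverable="public",   # public flow: discoverable and collaborative
    tags=["team-project"]
)

personal_flow = await client.create_flow(
    name="personal-dev-flow",
    description="Accessible only to the creator",
    discoverable="private",  # private flow: restricted and isolated
    tags=["personal", "testing"]
)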
Flow Metadata
Flows can include rich metadata for integration and management:
metadata = {
    "project": "ai-platform",
    "environment": "production",
    "team": "ai-engineering",
    "compliance": "soc2",
    "astha-flow": {
        "id": "langflow-uuid",
        "folderId": "folder-uuid"
    }
}
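The metadata dictionary is passed unchanged to `create_flow`. A brief sketch, assuming an existing `client` and an illustrative flow name:
# Sketch: attach the metadata shown above when creating the flow (`client` is assumed to exist)
flow = await client.create_flow(
    name="ai-platform-production",
    description="Production workflow for the ai-platform project",
    discoverable="private",
    tags=["ai-platform", "production"],
    metadata=metadata  # the dictionary defined above
)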
Flow Management API
Creating Flows
Python - Real Working Examples from Test Execution
import asyncio
import json
import os

from aztp_client import Aztp
from dotenv import load_dotenv

load_dotenv()

async def create_production_flows():
    """Create flows based on successful test execution patterns."""
    client = Aztp(api_key=os.getenv("AZTP_API_KEY"))

    # Real flow configurations that work (from test execution)
    flow_configurations = [
        {
            "name": "development-flow",
            "description": "Development environment workflow",
            "discoverable": "public",
            "tags": ["development", "testing", "env:dev"],
            "metadata": {
                "environment": "development",
                "project": "aztp-testing",
                "created_by": "functional_test",
                "version": "1.0"
            }
        },
        {
            "name": "production-flow",
            "description": "Production deployment workflow",
            "discoverable": "private",
            "tags": ["production", "critical", "env:prod"],
            "metadata": {
                "environment": "production",
                "project": "aztp-testing",
                "security_level": "high",
                "astha-flow": {
                    "id": "flow_123",
                    "folderId": "folder_456"
                }
            }
        },
        {
            "name": "testing-flow",
            "description": "Automated testing workflow",
            "discoverable": "public",
            "tags": ["testing", "automation", "ci/cd"],
            "metadata": {
                "environment": "testing",
                "automation": True,
                "test_suite": "integration"
            }
        }
    ]

    created_flows = {}
    for config in flow_configurations:
        try:
            print(f"🔄 Creating flow: '{config['name']}'")
            flow = await client.create_flow(
                name=config["name"],
                description=config["description"],
                discoverable=config["discoverable"],
                tags=config["tags"],
                metadata=config["metadata"]
            )
            created_flows[config["name"]] = flow

            print(f"✅ Flow created successfully:")
            print(f"   Flow ID: {flow.get('_id')}")
            print(f"   Name: {flow.get('name')}")
            print(f"   Description: {flow.get('description')}")
            print(f"   Discoverable: {flow.get('discoverable')}")
            print(f"   Tags: {flow.get('tags')}")
        except Exception as e:
            print(f"❌ Error creating flow '{config['name']}': {e}")

    print(f"\n📊 Created {len(created_flows)} flows successfully")
    return created_flows

# Run the example
if __name__ == "__main__":
    flows = asyncio.run(create_production_flows())
Example Output:
🔄 Creating flow: 'development-flow'
✅ Flow created successfully:
   Flow ID: 507f1f77bcf86cd799439014
   Name: development-flow
   Description: Development environment workflow
   Discoverable: public
   Tags: ['development', 'testing', 'env:dev']

🔄 Creating flow: 'production-flow'
✅ Flow created successfully:
   Flow ID: 507f1f77bcf86cd799439015
   Name: production-flow
   Description: Production deployment workflow
   Discoverable: private
   Tags: ['production', 'critical', 'env:prod']

📊 Created 3 flows successfully
TypeScript
import { AztpClient } from 'aztp-client';

const client = new AztpClient('your-org-api-key');

const flow = await client.createFlow(
  'ai-content-pipeline',
  {
    description: 'Secure AI content generation workflow',
    discoverable: 'public',
    tags: ['ai', 'content', 'production'],
    metadata: {
      project: 'content-ai',
      environment: 'production',
      securityLevel: 'enterprise'
    }
  }
);

console.log(`Flow created: ${flow._id}`);
Managing Flow Identities - Real Test Execution Examples
import asyncio
import json
import os

from aztp_client import Aztp

async def manage_flow_identities_example():
    """Real working example from successful test execution."""
    client = Aztp(api_key=os.getenv("AZTP_API_KEY"))

    # First, create identities
    agent1 = await client.secure_connect({}, "service1", config={"isGlobalIdentity": False})
    agent2 = await client.secure_connect({}, "service2", config={"isGlobalIdentity": False})

    # Get AZTP IDs
    identity_info_1 = await client.get_identity(agent1)
    identity_data_1 = json.loads(identity_info_1)
    aztp_id_1 = identity_data_1["data"]["aztpId"]

    identity_info_2 = await client.get_identity(agent2)
    identity_data_2 = json.loads(identity_info_2)
    aztp_id_2 = identity_data_2["data"]["aztpId"]

    print(f"🔑 Created identities:")
    print(f"   Service 1: {aztp_id_1}")
    print(f"   Service 2: {aztp_id_2}")
    # Example output:
    #   Service 1: aztp://astha.ai/workload/production/node/chat-service
    #   Service 2: aztp://astha.ai/workload/production/node/analytics-service

    # Create flow
    flow = await client.create_flow(
        name="identity-management-demo",
        description="Demonstration of identity management in flows",
        discoverable="public",
        tags=["demo", "testing"],
        metadata={"project": "identity-demo", "environment": "development"}
    )
    flow_id = flow.get('_id')
    print(f"📋 Created flow: {flow_id}")
    # Example output: 📋 Created flow: 507f1f77bcf86cd799439013

    # Add identities to flow
    print(f"\n🔗 Adding identities to flow...")
    result1 = await client.add_identity_to_flow(flow_id, aztp_id_1)
    print(f"✅ Added service1 to flow")
    print(f"   Result contains {len(result1.get('identities', []))} total identities")

    result2 = await client.add_identity_to_flow(flow_id, aztp_id_2)
    print(f"✅ Added service2 to flow")
    print(f"   Result contains {len(result2.get('identities', []))} total identities")

    # Verify identities in flow context
    print(f"\n🔍 Verifying identities in flow context...")
    is_valid_1 = await client.verify_identity_by_aztp_id(aztp_id_1, identity_flow_id=flow_id)
    is_valid_2 = await client.verify_identity_by_aztp_id(aztp_id_2, identity_flow_id=flow_id)
    print(f"   Service 1 in flow: {'✅ VERIFIED' if is_valid_1 else '❌ NOT VERIFIED'}")
    print(f"   Service 2 in flow: {'✅ VERIFIED' if is_valid_2 else '❌ NOT VERIFIED'}")

    # Demonstrate flow-specific operations
    print(f"\n🔧 Flow identity management operations...")

    # Remove identity from flow
    print(f"➖ Removing {aztp_id_1} from flow...")
    remove_result = await client.remove_identity_from_flow(flow_id, aztp_id_1)
    print(f"✅ Identity removed, flow now has {len(remove_result.get('identities', []))} identities")

    # Verify removal
    try:
        verification_after_removal = await client.verify_identity_by_aztp_id(
            aztp_id_1, identity_flow_id=flow_id
        )
        status = "❌ NOT IN FLOW" if not verification_after_removal else "⚠️ STILL IN FLOW"
        print(f"   Verification after removal: {status}")
    except Exception as e:
        print(f"   Verification after removal: ❌ NOT IN FLOW (expected)")

    # Re-add identity
    print(f"➕ Re-adding {aztp_id_1} to flow...")
    readd_result = await client.add_identity_to_flow(flow_id, aztp_id_1)
    print(f"✅ Identity re-added, flow now has {len(readd_result.get('identities', []))} identities")

    # Flow-specific revocation (advanced feature)
    print(f"\n🚫 Testing flow-specific revocation...")
    print(f"⚠️ Revoking {aztp_id_2} from flow context only...")
    try:
        revoke_result = await client.revoke_flow_identity(
            aztp_id=aztp_id_2,
            identity_flow_id=flow_id,
            reason="Testing flow-specific revocation in documentation demo"
        )
        print(f"✅ Flow-specific revocation completed:")
        print(f"   Result: {revoke_result}")

        # Verify revocation in flow context
        try:
            post_revoke_verification = await client.verify_identity_by_aztp_id(
                aztp_id_2, identity_flow_id=flow_id
            )
            status = "✅ REVOKED" if not post_revoke_verification else "⚠️ STILL ACTIVE"
            print(f"   Flow context verification: {status}")
        except Exception as verify_error:
            if "Identity already revoked for this flow" in str(verify_error):
                print(f"   Flow context verification: ✅ REVOKED (expected)")
            else:
                raise verify_error

        # Check global validity (should still be valid)
        global_verification = await client.verify_identity_by_aztp_id(aztp_id_2)
        global_status = "✅ STILL VALID GLOBALLY" if global_verification else "❌ GLOBALLY REVOKED"
        print(f"   Global verification: {global_status}")
    except Exception as e:
        print(f"❌ Flow revocation error: {e}")

    return {
        "flow": flow,
        "identities": [aztp_id_1, aztp_id_2],
        "agents": [agent1, agent2]
    }

# Run the example
if __name__ == "__main__":
    result = asyncio.run(manage_flow_identities_example())
Example Execution Results:
🔗 Adding identities to flow: 'development-flow' (ID: 507f1f77bcf86cd799439013)
✅ Added identity aztp://astha.ai/workload/production/node/chat-service to flow
✅ Added identity aztp://astha.ai/workload/production/node/analytics-service to flow

🔍 Testing identity verification in flow: 'development-flow'
   Identity aztp://astha.ai/workload/production/node/chat-service in flow: ✅ VALID

🚫 Testing Flow-Specific Identity Revocation
✅ Flow-specific revocation completed:
   Result: {'valid': True, 'issuedAt': 1757475385308, 'reason': 'Testing flow-specific revocation'}
   Post-revocation verification: ✅ REVOKED
   Global verification: ✅ STILL VALID GLOBALLY
Updating and Deleting Flows
# Update flow properties
await client.update_flow(
    flow_id="flow-uuid-123",
    name="updated-ai-pipeline",
    description="Updated description",
    discoverable="private",
    tags=["ai", "updated", "production"]
)

# Delete flow
await client.delete_flow("flow-uuid-123")
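Deleting a flow is irreversible, so production code usually guards these calls. A small hedged sketch built only from the calls above (it assumes `update_flow` accepts a partial set of fields; the flow ID is illustrative):
# Hedged sketch: retire a flow defensively
async def retire_flow(client, flow_id: str) -> bool:
    try:
        # Make the flow non-discoverable before removal (assumes partial updates are accepted)
        await client.update_flow(flow_id=flow_id, discoverable="private", tags=["retired"])
        await client.delete_flow(flow_id)
        return True
    except Exception as e:
        print(f"Failed to retire flow {flow_id}: {e}")
        return False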
Real-World Implementation: Langflow Integration
Flow Creation in Langflow
# From langflow.services.identity.service.py
async def create_flow_identity(
    self,
    flow_name: str,
    description: str = None,
    flow_id: str = None,
    folder_id: str = None,
    organization_api_key: Optional[str] = None
) -> dict[str, Any] | None:
    """Create AZTP flow identity for a Langflow workflow."""
    async with self._lock:
        try:
            client = await self._get_client(organization_api_key=organization_api_key)
            if not client:
                logger.error("❌ Organization API key required for flow creation")
                return None

            # Prepare enhanced metadata linking Langflow and AZTP
            metadata = {
                "project": flow_name,
                "environment": "production",
                "astha-flow": {
                    "id": flow_id or "unknown-flow-id",
                    "folderId": folder_id or "unknown-folder-id"
                }
            }

            # Create AZTP flow with enhanced metadata
            aztp_flow_data = await client.create_flow(
                flow_name,
                description=description or f"Langflow workflow: {flow_name}",
                discoverable="public",
                tags=[description or flow_name],
                metadata=metadata
            )

            # Save flow ID for component registration
            flow_file = f"aztp_flow_id_{organization_api_key[:8]}.json"
            with open(flow_file, "w") as f:
                json.dump(aztp_flow_data['_id'], f)

            logger.info(f"✅ AZTP flow created:")
            logger.info(f"   AZTP Flow ID: {aztp_flow_data.get('_id')}")
            logger.info(f"   Langflow Flow ID: {flow_id}")
            logger.info(f"   Langflow Folder ID: {folder_id}")
            return aztp_flow_data
        except Exception as e:
            logger.error(f"Failed to create AZTP flow for '{flow_name}': {e}")
            return None
Component Registration in Flows
async def add_component_to_flow(
    self,
    aztp_flow_id: str,
    component_aztp_id: str,
    organization_api_key: Optional[str] = None
) -> bool:
    """Add component identity to AZTP flow."""
    async with self._lock:
        try:
            client = await self._get_client(organization_api_key=organization_api_key)
            if not client:
                logger.error("❌ Organization API key required for adding component to flow")
                return False

            # Add component identity to flow
            await client.add_identity_to_flow(aztp_flow_id, component_aztp_id)
            logger.info(f"✅ Added component {component_aztp_id} to AZTP flow {aztp_flow_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to add component {component_aztp_id} to AZTP flow {aztp_flow_id}: {e}")
            return False
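Taken together, the two methods above are typically called in sequence when a workflow is saved. A hedged sketch of that wiring; the `identity_service` object, `org_api_key`, and `component_aztp_ids` list are assumptions used for illustration:
# Hypothetical wiring: create the AZTP flow, then register each component identity in it
aztp_flow = await identity_service.create_flow_identity(
    flow_name="content-pipeline",
    description="Langflow workflow: content-pipeline",
    flow_id="langflow-uuid",        # Langflow's own flow ID
    folder_id="folder-uuid",
    organization_api_key=org_api_key
)

if aztp_flow:
    for component_aztp_id in component_aztp_ids:  # assumed list of component identities
        await identity_service.add_component_to_flow(
            aztp_flow_id=aztp_flow["_id"],
            component_aztp_id=component_aztp_id,
            organization_api_key=org_api_key
        )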
Advanced Flow Patterns
Environment-Based Flows
async def create_environment_flows():
    """Create separate flows for different environments."""
    client = Aztp(api_key="enterprise-org-key")

    environments = ["development", "staging", "production"]
    flows = {}

    for env in environments:
        # Create environment-specific flow
        flow = await client.create_flow(
            f"ai-platform-{env}",
            description=f"AI platform {env} environment",
            discoverable="private" if env == "production" else "public",
            tags=[env, "ai-platform"],
            metadata={
                "project": "ai-platform",
                "environment": env,
                "security_level": "high" if env == "production" else "medium"
            }
        )
        flows[env] = flow
        print(f"✅ Created {env} flow: {flow['_id']}")

        # Create environment-specific identities
        services = ["api-gateway", "ai-processor", "data-store"]
        for service in services:
            # Create service identity for this environment
            agent = await client.secure_connect(
                {},
                f"{env}-{service}",
                config={"isGlobalIdentity": False}
            )

            # Add to environment flow
            await client.add_identity_to_flow(flow['_id'], agent.identity.aztp_id)
            print(f"   📋 Added {service} to {env} flow")

    return flows
Multi-Tenant Flow Management
class MultiTenantFlowManager:
    """Manage flows across multiple tenants/organizations."""

    def __init__(self, tenant_configs: dict[str, str]):
        """Initialize with tenant-specific API keys."""
        self.tenants = {}
        for tenant_id, api_key in tenant_configs.items():
            self.tenants[tenant_id] = {
                "client": Aztp(api_key=api_key),
                "flows": {},
                "identities": {}
            }

    async def create_tenant_flow(
        self,
        tenant_id: str,
        flow_name: str,
        flow_config: dict
    ) -> dict:
        """Create flow for specific tenant."""
        if tenant_id not in self.tenants:
            raise ValueError(f"Unknown tenant: {tenant_id}")

        client = self.tenants[tenant_id]["client"]

        # Create tenant-specific flow
        flow = await client.create_flow(
            f"{tenant_id}-{flow_name}",
            description=flow_config.get("description", f"Flow for {tenant_id}"),
            discoverable=flow_config.get("discoverable", "private"),
            tags=flow_config.get("tags", [tenant_id]),
            metadata={
                **flow_config.get("metadata", {}),
                "tenant_id": tenant_id,
                "isolation_level": "tenant"
            }
        )

        # Store in tenant registry
        self.tenants[tenant_id]["flows"][flow_name] = flow
        logger.info(f"✅ Created flow for tenant {tenant_id}: {flow['_id']}")
        return flow

    async def create_tenant_identity(
        self,
        tenant_id: str,
        identity_name: str,
        flow_name: str = None
    ) -> dict:
        """Create identity for tenant and optionally add to flow."""
        if tenant_id not in self.tenants:
            raise ValueError(f"Unknown tenant: {tenant_id}")

        client = self.tenants[tenant_id]["client"]

        # Create tenant-specific identity
        agent = await client.secure_connect(
            {},
            f"{tenant_id}-{identity_name}",
            config={"isGlobalIdentity": False}
        )

        # Store in tenant registry
        self.tenants[tenant_id]["identities"][identity_name] = agent

        # Add to flow if specified
        if flow_name and flow_name in self.tenants[tenant_id]["flows"]:
            flow_id = self.tenants[tenant_id]["flows"][flow_name]["_id"]
            await client.add_identity_to_flow(flow_id, agent.identity.aztp_id)
            logger.info(f"📋 Added {identity_name} to flow {flow_name} for tenant {tenant_id}")

        logger.info(f"✅ Created identity for tenant {tenant_id}: {agent.identity.aztp_id}")
        return agent

    async def get_tenant_security_summary(self, tenant_id: str) -> dict:
        """Get security summary for tenant."""
        if tenant_id not in self.tenants:
            raise ValueError(f"Unknown tenant: {tenant_id}")

        tenant_data = self.tenants[tenant_id]
        client = tenant_data["client"]

        summary = {
            "tenant_id": tenant_id,
            "flows": len(tenant_data["flows"]),
            "identities": len(tenant_data["identities"]),
            "identity_status": {},
            "policy_compliance": {}
        }

        # Check identity status for each tenant identity
        for identity_name, agent in tenant_data["identities"].items():
            is_valid = await client.verify_identity(agent)
            summary["identity_status"][identity_name] = {
                "aztp_id": agent.identity.aztp_id,
                "valid": is_valid,
                "status": getattr(agent, "aztp_identity_status", "unknown")
            }

        # Calculate compliance score
        valid_identities = sum(1 for status in summary["identity_status"].values() if status["valid"])
        total_identities = len(summary["identity_status"])
        summary["compliance_score"] = (valid_identities / total_identities * 100) if total_identities > 0 else 100

        return summary

# Usage example
async def multi_tenant_example():
    # Configure tenants
    tenant_configs = {
        "tenant-a": "api-key-tenant-a",
        "tenant-b": "api-key-tenant-b",
        "tenant-c": "api-key-tenant-c"
    }

    manager = MultiTenantFlowManager(tenant_configs)

    # Create flows for each tenant
    for tenant_id in tenant_configs.keys():
        # Create tenant flow
        flow = await manager.create_tenant_flow(
            tenant_id,
            "ai-workflow",
            {
                "description": f"AI workflow for {tenant_id}",
                "discoverable": "private",
                "tags": ["ai", "production"],
                "metadata": {"compliance": "enterprise"}
            }
        )

        # Create identities for the flow
        services = ["input-processor", "ai-engine", "output-handler"]
        for service in services:
            await manager.create_tenant_identity(
                tenant_id, service, "ai-workflow"
            )

    # Generate security summaries
    for tenant_id in tenant_configs.keys():
        summary = await manager.get_tenant_security_summary(tenant_id)
        print(f"🔐 {tenant_id} Security Summary:")
        print(f"   Flows: {summary['flows']}")
        print(f"   Identities: {summary['identities']}")
        print(f"   Compliance Score: {summary['compliance_score']:.1f}%")
Flow-Based Policy Evaluation
Context-Aware Security
async def flow_context_security_example():
    """Demonstrate flow-based security context."""
    client = Aztp(api_key="enterprise-org-key")

    # Create flows for different security contexts
    security_contexts = [
        {
            "name": "high-security-flow",
            "security_level": "critical",
            "allowed_actions": ["read", "verify"],
            "environment": "production"
        },
        {
            "name": "development-flow",
            "security_level": "standard",
            "allowed_actions": ["read", "write", "api_call", "debug"],
            "environment": "development"
        }
    ]

    flows = {}
    for context in security_contexts:
        # Create flow with security context
        flow = await client.create_flow(
            context["name"],
            description=f"Flow with {context['security_level']} security",
            discoverable="private",
            metadata={
                "security_level": context["security_level"],
                "allowed_actions": context["allowed_actions"],
                "environment": context["environment"]
            }
        )
        flows[context["name"]] = flow

        # Create identity for this security context
        agent = await client.secure_connect(
            {},
            f"{context['environment']}-secure-agent",
            config={"isGlobalIdentity": False}
        )

        # Add to flow
        await client.add_identity_to_flow(flow['_id'], agent.identity.aztp_id)

        # Verify identity within flow context
        is_valid_in_flow = await client.verify_identity_by_aztp_id(
            agent.identity.aztp_id,
            identity_flow_id=flow['_id']
        )

        print(f"✅ {context['name']} setup complete:")
        print(f"   Flow ID: {flow['_id']}")
        print(f"   Agent ID: {agent.identity.aztp_id}")
        print(f"   Valid in Flow: {is_valid_in_flow}")
        print(f"   Security Level: {context['security_level']}")

    return flows
Production Patterns
CI/CD Pipeline Flows
async def cicd_pipeline_flows():
    """Create flows for CI/CD pipeline stages."""
    client = Aztp(api_key="devops-org-key")

    # Define pipeline stages
    pipeline_stages = [
        {
            "name": "build-stage",
            "description": "Build and compilation stage",
            "services": ["code-analyzer", "builder", "test-runner"],
            "security_level": "standard"
        },
        {
            "name": "security-stage",
            "description": "Security scanning and validation",
            "services": ["vulnerability-scanner", "compliance-checker", "security-validator"],
            "security_level": "high"
        },
        {
            "name": "deployment-stage",
            "description": "Production deployment stage",
            "services": ["deployment-manager", "health-checker", "monitor"],
            "security_level": "critical"
        }
    ]

    stage_flows = {}
    for stage in pipeline_stages:
        # Create stage flow
        flow = await client.create_flow(
            f"cicd-{stage['name']}",
            description=stage["description"],
            discoverable="private",
            tags=["cicd", "pipeline", stage["name"]],
            metadata={
                "project": "cicd-pipeline",
                "stage": stage["name"],
                "security_level": stage["security_level"],
                "services": stage["services"]
            }
        )
        stage_flows[stage["name"]] = flow

        # Create service identities for stage
        stage_identities = {}
        for service in stage["services"]:
            agent = await client.secure_connect(
                {},
                f"cicd-{stage['name']}-{service}",
                config={"isGlobalIdentity": False}
            )
            stage_identities[service] = agent

            # Add to stage flow
            await client.add_identity_to_flow(flow['_id'], agent.identity.aztp_id)
            print(f"✅ Created {stage['name']} service: {service}")

        # Link services within stage for secure communication
        services = stage["services"]
        for i in range(len(services) - 1):
            await client.link_identities(
                stage_identities[services[i]].identity.aztp_id,
                stage_identities[services[i + 1]].identity.aztp_id,
                "pipeline_stage",
                metadata={
                    "stage": stage["name"],
                    "sequence": i + 1
                }
            )

    # Link stages together
    stage_names = list(stage_flows.keys())
    for i in range(len(stage_names) - 1):
        # Get representative identity from each stage
        current_stage = stage_names[i]
        next_stage = stage_names[i + 1]

        # Link final service of current stage to first service of next stage
        # This would require getting the identities, simplified here
        print(f"🔗 Linked {current_stage} -> {next_stage}")

    return stage_flows
Microservices Flow Architecture
async def microservices_flow_architecture():
    """Create flow architecture for microservices."""
    client = Aztp(api_key="microservices-org-key")

    # Define service architecture
    architecture = {
        "frontend-services": {
            "services": ["web-ui", "mobile-api", "admin-dashboard"],
            "security_level": "medium",
            "external_facing": True
        },
        "business-services": {
            "services": ["user-service", "order-service", "payment-service"],
            "security_level": "high",
            "external_facing": False
        },
        "data-services": {
            "services": ["user-db", "order-db", "analytics-db"],
            "security_level": "critical",
            "external_facing": False
        }
    }

    service_flows = {}
    all_identities = {}

    # Create flows for each service tier
    for tier_name, tier_config in architecture.items():
        # Create tier flow
        flow = await client.create_flow(
            f"microservices-{tier_name}",
            description=f"Microservices {tier_name} tier",
            discoverable="private",
            tags=["microservices", tier_name, tier_config["security_level"]],
            metadata={
                "project": "microservices-platform",
                "tier": tier_name,
                "security_level": tier_config["security_level"],
                "external_facing": tier_config["external_facing"]
            }
        )
        service_flows[tier_name] = flow

        tier_identities = {}
        # Create service identities
        for service in tier_config["services"]:
            agent = await client.secure_connect(
                {},
                f"microservice-{service}",
                config={"isGlobalIdentity": False}
            )
            tier_identities[service] = agent
            all_identities[service] = agent

            # Add to tier flow
            await client.add_identity_to_flow(flow['_id'], agent.identity.aztp_id)
            print(f"✅ Created {tier_name} service: {service}")

        # Link services within tier
        services = tier_config["services"]
        for i in range(len(services)):
            for j in range(i + 1, len(services)):
                await client.link_identities(
                    tier_identities[services[i]].identity.aztp_id,
                    tier_identities[services[j]].identity.aztp_id,
                    "tier_communication",
                    metadata={
                        "tier": tier_name,
                        "security_level": tier_config["security_level"]
                    }
                )

    # Create cross-tier communication links
    cross_tier_links = [
        ("web-ui", "user-service"),
        ("mobile-api", "order-service"),
        ("user-service", "user-db"),
        ("order-service", "order-db"),
        ("payment-service", "analytics-db")
    ]

    for source_service, target_service in cross_tier_links:
        if source_service in all_identities and target_service in all_identities:
            await client.link_identities(
                all_identities[source_service].identity.aztp_id,
                all_identities[target_service].identity.aztp_id,
                "cross_tier_communication",
                metadata={"communication_type": "api_call"}
            )
            print(f"🔗 Cross-tier link: {source_service} -> {target_service}")

    return service_flows, all_identities
Flow Monitoring and Analytics
Flow Health Monitoring
class FlowHealthMonitor:
    """Monitor health and security status of identity flows."""

    def __init__(self, organization_api_key: str):
        self.client = Aztp(api_key=organization_api_key)
        self.monitoring_data = {}

    async def monitor_flow_health(self, flow_id: str) -> dict:
        """Monitor health of a specific flow."""
        health_report = {
            "flow_id": flow_id,
            "timestamp": datetime.utcnow().isoformat(),
            "status": "unknown",
            "identities": {},
            "security_score": 0,
            "issues": []
        }

        try:
            # Get flow information (this would require additional API)
            # For now, we'll demonstrate with known identities

            # Example: Monitor known identities in flow
            test_identities = [
                "aztp://domain/workload/prod/service1",
                "aztp://domain/workload/prod/service2",
                "aztp://domain/workload/prod/service3"
            ]

            valid_count = 0
            for aztp_id in test_identities:
                try:
                    # Verify identity in flow context
                    is_valid = await self.client.verify_identity_by_aztp_id(
                        aztp_id,
                        identity_flow_id=flow_id
                    )

                    # Get identity details
                    identity_data = await self.client.get_identity_by_aztp_id(aztp_id)
                    parsed_data = json.loads(identity_data)

                    identity_health = {
                        "aztp_id": aztp_id,
                        "valid": is_valid,
                        "status": parsed_data["data"]["status"],
                        "last_verified": datetime.utcnow().isoformat()
                    }

                    if is_valid and parsed_data["data"]["status"] == "active":
                        valid_count += 1
                    else:
                        health_report["issues"].append(f"Identity issue: {aztp_id}")

                    health_report["identities"][aztp_id] = identity_health
                except Exception as e:
                    health_report["issues"].append(f"Verification failed for {aztp_id}: {str(e)}")
                    health_report["identities"][aztp_id] = {
                        "aztp_id": aztp_id,
                        "valid": False,
                        "error": str(e)
                    }

            # Calculate health metrics
            total_identities = len(test_identities)
            health_report["security_score"] = (valid_count / total_identities * 100) if total_identities > 0 else 0

            if health_report["security_score"] >= 90:
                health_report["status"] = "healthy"
            elif health_report["security_score"] >= 70:
                health_report["status"] = "warning"
            else:
                health_report["status"] = "critical"

            logger.info(f"🏥 Flow health check: {flow_id} - {health_report['status']} ({health_report['security_score']:.1f}%)")
        except Exception as e:
            health_report["status"] = "error"
            health_report["issues"].append(f"Health check failed: {str(e)}")
            logger.error(f"❌ Flow health check failed for {flow_id}: {e}")

        return health_report

    async def monitor_all_flows(self, flow_ids: list[str]) -> dict:
        """Monitor health of multiple flows."""
        monitoring_summary = {
            "monitoring_timestamp": datetime.utcnow().isoformat(),
            "total_flows": len(flow_ids),
            "flow_reports": {},
            "overall_status": "unknown",
            "summary_metrics": {}
        }

        # Monitor all flows in parallel
        tasks = [self.monitor_flow_health(flow_id) for flow_id in flow_ids]
        flow_reports = await asyncio.gather(*tasks, return_exceptions=True)

        # Process results
        healthy_flows = 0
        warning_flows = 0
        critical_flows = 0

        for flow_id, report in zip(flow_ids, flow_reports):
            if isinstance(report, Exception):
                monitoring_summary["flow_reports"][flow_id] = {
                    "status": "error",
                    "error": str(report)
                }
                critical_flows += 1
            else:
                monitoring_summary["flow_reports"][flow_id] = report
                if report["status"] == "healthy":
                    healthy_flows += 1
                elif report["status"] == "warning":
                    warning_flows += 1
                else:
                    critical_flows += 1

        # Calculate overall metrics
        monitoring_summary["summary_metrics"] = {
            "healthy_flows": healthy_flows,
            "warning_flows": warning_flows,
            "critical_flows": critical_flows,
            "health_percentage": (healthy_flows / len(flow_ids) * 100) if flow_ids else 0
        }

        # Determine overall status
        if critical_flows > 0:
            monitoring_summary["overall_status"] = "critical"
        elif warning_flows > 0:
            monitoring_summary["overall_status"] = "warning"
        else:
            monitoring_summary["overall_status"] = "healthy"

        return monitoring_summary

# Usage for production monitoring
async def production_flow_monitoring():
    monitor = FlowHealthMonitor("prod-org-key")

    # Monitor critical production flows
    production_flows = [
        "user-authentication-flow",
        "payment-processing-flow",
        "ai-recommendation-flow",
        "security-monitoring-flow"
    ]

    # Run monitoring
    monitoring_report = await monitor.monitor_all_flows(production_flows)

    print(f"🔍 Production Flow Monitoring Report:")
    print(f"   Overall Status: {monitoring_report['overall_status']}")
    print(f"   Health Percentage: {monitoring_report['summary_metrics']['health_percentage']:.1f}%")
    print(f"   Healthy Flows: {monitoring_report['summary_metrics']['healthy_flows']}")
    print(f"   Warning Flows: {monitoring_report['summary_metrics']['warning_flows']}")
    print(f"   Critical Flows: {monitoring_report['summary_metrics']['critical_flows']}")

    # Alert on critical issues
    if monitoring_report["overall_status"] == "critical":
        print("🚨 CRITICAL: Production flows have security issues!")
        # In production, trigger alerts to operations team

    return monitoring_report
Best Practices
Flow Design Principles
- Logical Grouping - Group related identities that work together
- Environment Separation - Use separate flows for dev/staging/production
- Security Boundaries - Match flow boundaries to security requirements
- Metadata Rich - Include comprehensive metadata for management
- Regular Monitoring - Implement health checks and monitoring
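A small hedged sketch of how these principles can be encoded as a reusable flow-configuration helper; all names and defaults below are illustrative, not part of the AZTP API:
# Illustrative helper only: encodes the design principles as a flow config builder
def build_flow_config(project: str, environment: str, team: str, services: list[str]) -> dict:
    assert environment in {"development", "staging", "production"}  # environment separation
    return {
        "name": f"{project}-{environment}",                  # logical grouping per project/env
        "description": f"{project} workflow ({environment})",
        "discoverable": "private" if environment == "production" else "public",  # security boundary
        "tags": [project, environment, f"team:{team}"],
        "metadata": {                                         # metadata-rich for management
            "project": project,
            "environment": environment,
            "team": team,
            "services": services,
            "monitoring_enabled": True                        # regular monitoring
        }
    }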
Flow Security Guidelines
# Security-first flow creation
async def secure_flow_creation(client, flow_config):
    """Create flow with security best practices."""
    # Validate flow configuration
    required_fields = ["name", "description", "security_level"]
    for field in required_fields:
        if field not in flow_config:
            raise ValueError(f"Missing required field: {field}")

    # Set security-appropriate discoverability
    discoverable = "private" if flow_config["security_level"] == "critical" else "public"

    # Create flow with security metadata
    flow = await client.create_flow(
        flow_config["name"],
        description=flow_config["description"],
        discoverable=discoverable,
        tags=flow_config.get("tags", []) + ["secure", flow_config["security_level"]],
        metadata={
            **flow_config.get("metadata", {}),
            "security_level": flow_config["security_level"],
            "created_by": "security_team",
            "compliance_required": True,
            "monitoring_enabled": True
        }
    )

    logger.info(f"🔐 Secure flow created: {flow['_id']} (Level: {flow_config['security_level']})")
    return flow
Flow Lifecycle Management
class FlowLifecycleManager:
    """Manage complete lifecycle of identity flows."""

    def __init__(self, organization_api_key: str):
        self.client = Aztp(api_key=organization_api_key)
        self.active_flows = {}

    async def create_managed_flow(self, flow_config: dict) -> dict:
        """Create flow with lifecycle management."""
        # Create flow
        flow = await self.client.create_flow(
            flow_config["name"],
            description=flow_config["description"],
            discoverable=flow_config.get("discoverable", "private"),
            tags=flow_config.get("tags", []),
            metadata={
                **flow_config.get("metadata", {}),
                "lifecycle_managed": True,
                "created_at": datetime.utcnow().isoformat(),
                "manager_version": "1.0"
            }
        )

        # Register for lifecycle management
        self.active_flows[flow["_id"]] = {
            "flow_data": flow,
            "config": flow_config,
            "identities": {},
            "created_at": datetime.utcnow(),
            "last_health_check": None
        }

        return flow

    async def add_managed_identity(self, flow_id: str, identity_name: str, identity_config: dict):
        """Add identity to managed flow."""
        if flow_id not in self.active_flows:
            raise ValueError(f"Flow {flow_id} not under management")

        # Create identity
        agent = await self.client.secure_connect(
            {},
            identity_name,
            config=identity_config
        )

        # Add to flow
        await self.client.add_identity_to_flow(flow_id, agent.identity.aztp_id)

        # Register in management
        self.active_flows[flow_id]["identities"][identity_name] = {
            "agent": agent,
            "config": identity_config,
            "added_at": datetime.utcnow()
        }

        logger.info(f"✅ Added managed identity {identity_name} to flow {flow_id}")
        return agent

    async def perform_health_checks(self):
        """Perform health checks on all managed flows."""
        health_summary = {
            "check_timestamp": datetime.utcnow().isoformat(),
            "flows_checked": 0,
            "healthy_flows": 0,
            "issues_detected": 0,
            "flow_details": {}
        }

        for flow_id, flow_info in self.active_flows.items():
            try:
                # Check each identity in the flow
                identity_health = {}
                for identity_name, identity_info in flow_info["identities"].items():
                    agent = identity_info["agent"]

                    # Verify identity
                    is_valid = await self.client.verify_identity_by_aztp_id(
                        agent.identity.aztp_id,
                        identity_flow_id=flow_id
                    )

                    identity_health[identity_name] = {
                        "valid": is_valid,
                        "aztp_id": agent.identity.aztp_id
                    }

                    if not is_valid:
                        health_summary["issues_detected"] += 1

                # Update flow health status
                all_valid = all(health["valid"] for health in identity_health.values())
                flow_health = {
                    "flow_id": flow_id,
                    "status": "healthy" if all_valid else "issues",
                    "identities": identity_health,
                    "last_check": datetime.utcnow().isoformat()
                }

                health_summary["flow_details"][flow_id] = flow_health
                health_summary["flows_checked"] += 1
                if all_valid:
                    health_summary["healthy_flows"] += 1

                # Update management record
                self.active_flows[flow_id]["last_health_check"] = datetime.utcnow()
            except Exception as e:
                logger.error(f"Health check failed for flow {flow_id}: {e}")
                health_summary["issues_detected"] += 1

        return health_summary
Next Steps
- Policy Management - Configure organizational policies
- API Reference - Complete method documentation
- Monitoring Guide - Production monitoring setup
- Enterprise Support - Professional implementation assistance