Policy
Overview

Policy Management

Comprehensive guide to AZTP policy management, including access control, OIAP evaluation, and enterprise-grade security enforcement.

Updated: September 10, 2025 - Based on AZTP Client v1.0.42 real test execution

Overview

AZTP policy management provides fine-grained access control for AI agents and components through:

  • Organization Identity Access Policy (OIAP) - User-based access control
  • Component Policy Enforcement - Fine-grained permission management
  • Dynamic Policy Evaluation - Real-time access decisions
  • Policy Statement Filtering - Extract specific policies by code
  • Action Permission Validation - Check if actions are allowed
  • Audit and Compliance - Comprehensive policy enforcement logging

Policy Architecture

graph TB
    %% Checks run in order: identity, then component policy, then OIAP.
    %% A failure at any of the three gates ends in "Access Denied".
    subgraph Policy_Flow ["Policy Evaluation Flow"]
        A["Access Request"] --> B{"Identity Valid?"}
        B -->|No| C["Access Denied"]
        B -->|Yes| D{"Component Policy Check"}
        D -->|Fail| C
        D -->|Pass| E{"OIAP Evaluation"}
        E -->|Deny| C
        E -->|Allow| F["Access Granted"]
    end
    
    %% Inputs to the gates: component permissions feed the component policy
    %% check; organization policies, user context and resource policies feed
    %% the OIAP evaluation.
    subgraph Policy_Sources ["Policy Sources"]
        G["Organization Policies"] --> E
        H["Component Permissions"] --> D
        I["User Context"] --> E
        J["Resource Policies"] --> E
    end
    
    %% Every enforcement point enters the flow as an access request (node A).
    subgraph Enforcement_Points ["Enforcement Points"]
        K["Agent Tool Access"] --> A
        L["Component Communication"] --> A
        M["Data Access"] --> A
        N["API Calls"] --> A
    end

Policy Types

1. Component Policies

Control what actions components can perform based on their type and configuration.

# Component policy actions
class PolicyAction(Enum):
    """Actions that a component policy can refer to, grouped by category.

    Each member's value is the lower-case string form of its name.
    """

    # Data Flow Actions
    # "data_access" is also the default `action` argument of
    # check_component_access_policy.
    DATA_ACCESS = "data_access"
    READ_DATA = "read_data"
    WRITE_DATA = "write_data"
    TRANSFORM_DATA = "transform_data"
 
    # Communication Actions
    SEND_MESSAGE = "send_message"
    RECEIVE_MESSAGE = "receive_message"
    BROADCAST_MESSAGE = "broadcast_message"
 
    # External Actions
    API_CALL = "api_call"
    FILE_ACCESS = "file_access"
    DATABASE_ACCESS = "database_access"
    NETWORK_ACCESS = "network_access"
 
    # Processing Actions
    EXECUTE_CODE = "execute_code"
    GENERATE_CONTENT = "generate_content"
    PROCESS_SENSITIVE_DATA = "process_sensitive_data"

2. OIAP (Organization Identity Access Policy)

User-based access control that evaluates:

  • User Identity - Who is making the request
  • Component Identity - What component is requesting access
  • Resource - What resource/tool is being accessed
  • Context - Organizational policies and constraints
# Real OIAP evaluation examples from test execution
async def test_oiap_evaluation():
    """Run a set of sample OIAP evaluations for a freshly connected identity.

    Connects "policy-test-service" as a non-global identity, reads its AZTP ID
    from the JSON identity payload, then evaluates each (resource, user) pair
    and prints the outcome. A failure on one pair is printed and the loop
    continues with the next pair.
    """
    aztp = Aztp(api_key=os.getenv("AZTP_API_KEY"))

    # Connect the test service and extract its AZTP ID from the identity JSON.
    secured = await aztp.secure_connect(
        {},
        "policy-test-service",
        config={"isGlobalIdentity": False},
    )
    payload = json.loads(await aztp.get_identity(secured))
    aztp_id = payload["data"]["aztpId"]

    print(f"🔑 Testing OIAP for identity: {aztp_id}")

    # (requested resource, user id) pairs to evaluate, in order.
    scenarios = (
        ("list_users", "user-demo-12345"),
        ("database/users", "user-demo-67890"),
        ("api/admin/settings", "admin-demo-456"),
        ("files/documents", "employee-demo-789"),
        ("service/public", "guest-demo-001"),
    )

    for resource, user_id in scenarios:
        try:
            verdict = await aztp.oiap_evaluate(
                aztp_id=aztp_id,
                requested_resource=resource,
                user_id=user_id,
            )

            print(f"📋 Resource: {resource}")
            print(f"   User ID: {user_id}")
            print(f"   Result: {verdict.get('result', 'UNKNOWN')}")
            print(f"   Reason: {verdict.get('reason', 'No reason provided')}")
            print()

        except Exception as exc:
            print(f"❌ OIAP evaluation failed for {resource}: {exc}")
 
# Example execution results:
# Resource: list_users
#   User ID: user-demo-12345
#   Result: ALLOW
#   Reason: Explicitly allowed by policy

Policy Retrieval and Filtering

Real Policy Examples from Test Execution

async def policy_management_example():
    """Walk through policy retrieval, filtering, and permission checks.

    Steps:
      1. Connect "policy-demo-service" as a non-global identity and read its
         AZTP ID from the JSON identity payload.
      2. Fetch the policies attached to that identity.
      3. For each policy, select it by its ``code`` and test a fixed list of
         actions against it with ``is_action_allowed``.
      4. Ask for per-action identity permissions, then for all permissions.

    All output goes through ``print``; nothing is returned. Errors raised by
    an individual action or permission check are printed and the walkthrough
    continues.
    """
    client = Aztp(api_key=os.getenv("AZTP_API_KEY"))

    # Create agent and get identity
    agent = await client.secure_connect({}, "policy-demo-service", config={"isGlobalIdentity": False})
    identity_info = await client.get_identity(agent)
    identity_data = json.loads(identity_info)
    aztp_id = identity_data["data"]["aztpId"]

    print(f"🔑 Getting policies for: {aztp_id}")

    # Get all policies for identity. A missing/empty result is normalised to []
    # so that the count below and the loop further down share one code path
    # (calling len() on None would fail before any emptiness check is reached).
    policies = await client.get_policy(aztp_id) or []
    print(f"📋 Retrieved {len(policies)} policies")

    # Real policy structure from test execution:
    # [
    #   {
    #     'policyStatement': {
    #       'Version': '2025-09-10',
    #       'Statement': {
    #         'Sid': '74298e6a-d44d-421f-843e-ec52f4ef7bea',
    #         'Effect': 'Allow',
    #         'Action': ['read_user_profile', 'list_users'],
    #         'Condition': {
    #           'StringEquals': {
    #             'department': 'Engineering',
    #             'trust_domain': 'astha.ai'
    #           }
    #         }
    #       }
    #     },
    #     'code': 'policy:4f4b07f75458',
    #     'identity': {
    #       'name': 'chat-service',
    #       'aztpId': 'aztp://astha.ai/workload/production/node/chat-service'
    #     }
    #   }
    # ]

    # Actions probed against every policy; built once, outside the loop.
    test_actions = ["read", "write", "execute", "delete", "test", "admin"]

    # Filter policies by code (real working example)
    for policy_item in policies:
        code = policy_item.get('code')
        print(f"\n🔍 Testing policy: {code}")

        # Filter specific policy
        filtered_policy = client.get_policy_value(
            policies=policies,
            filter_key="code",
            filter_value=code
        )

        print("📜 Policy Statement:")
        print(json.dumps(filtered_policy, indent=2))

        # Test action permissions
        print(f"\n🧪 Testing actions against policy {code}:")

        for action in test_actions:
            try:
                is_allowed = client.is_action_allowed(filtered_policy, action)
                status = "✅ ALLOWED" if is_allowed else "❌ DENIED"
                print(f"   Action '{action}': {status}")
            except Exception as e:
                # Keep going: one failing action must not hide the others.
                print(f"   Action '{action}': ⚠️ Error - {e}")

    # Test identity policy permissions (real working example)
    print("\n🛡️ Checking identity policy permissions...")

    for action in ("read", "write", "test", "admin"):
        try:
            permissions = await client.check_identity_policy_permissions(
                aztp_id,
                options={"actions": [action]}
            )
            print(f"   Action '{action}': {permissions}")
        except Exception as e:
            print(f"   Action '{action}': Error - {e}")

    # Get all permissions (no specific action)
    try:
        all_permissions = await client.check_identity_policy_permissions(aztp_id)
        print("\n📋 All available permissions:")
        for action, allowed in all_permissions.items():
            status = "✅ ALLOWED" if allowed else "❌ DENIED"
            print(f"   {action}: {status}")
    except Exception as e:
        print(f"❌ Error getting all permissions: {e}")
 
# Real test execution results:
# Policy for AZTP ID aztp://llm.sumit.yadav.com.np/workload/production/node/service1:
# [
#   {
#     'policyStatement': {
#       'Version': '2025-09-10',
#       'Statement': {
#         'Effect': 'Allow',
#         'Action': ['read_user_profile', 'list_users'],
#         'Condition': {'StringEquals': {'department': 'Engineering', 'trust_domain': 'astha.ai'}}
#       }
#     },
#     'code': 'policy:4f4b07f75458'
#   },
#   {
#     'policyStatement': {
#       'Statement': {
#         'Effect': 'Allow', 
#         'Action': ['read_user_profile', 'list_users', 'test']
#       }
#     },
#     'code': 'policy:292a1ea03cbc'
#   }
# ]
 
# Permission check results: {'read_user_profile': True, 'list_users': True, 'test': True}

Real-World Implementation: Langflow

Policy Configuration

# Environment configuration for policy enforcement
# NOTE(review): the variable-to-attribute mapping described below is inferred
# from the names only — confirm against the service's settings loader.
# Presumably backs `policy_enabled`; when that attribute is false,
# check_component_access_policy allows every request.
export AZTP_POLICY_ENABLED=true
# Presumably backs `policy_strict_mode`; with strict mode off, an error raised
# during a policy check results in access being allowed (fail-open).
export AZTP_POLICY_STRICT_MODE=false
# Presumably backs `policy_default_action`; "allow" grants access when no agent
# is found for the source or target component.
export AZTP_POLICY_DEFAULT_ACTION=allow
export AZTP_POLICY_LOG_LEVEL=info

Component Policy Checking

# From langflow/services/identity/service.py
async def check_component_access_policy(
    self,
    source_component_id: str,
    target_component_id: str,
    action: str = "data_access",
    organization_api_key: Optional[str] = None
) -> bool:
    """Decide whether the source component may perform ``action`` on the target.

    Args:
        source_component_id: Component whose identity is checked against policy.
        target_component_id: Component being accessed; it must resolve to an agent.
        action: Policy action name looked up in the permission result.
        organization_api_key: Optional key forwarded to agent and client lookup.

    Returns:
        ``True`` when enforcement is disabled. When either agent cannot be
        resolved, whether ``policy_default_action`` equals ``"allow"``.
        Otherwise the permission reported for ``action`` (``False`` if the
        action is absent from the result). If the check raises, the result is
        ``not self.policy_strict_mode``.
    """
    if not self.policy_enabled:
        # Enforcement is switched off: every request passes.
        logger.info("🔧 Policy enforcement disabled, allowing access")
        return True

    requester = self.get_agent(source_component_id, organization_api_key)
    counterpart = self.get_agent(target_component_id, organization_api_key)
    if not (requester and counterpart):
        # No agent on one side: fall back to the configured default action.
        return self.policy_default_action == "allow"

    try:
        aztp = await self._get_client(organization_api_key=organization_api_key)

        # Ask only about the one action being attempted.
        granted_map = await aztp.check_identity_policy_permissions(
            requester.identity.aztp_id,
            options={"actions": [action]},
        )
        verdict = granted_map.get(action, False)

        if not verdict:
            logger.warning(f"❌ Policy DENIED: {source_component_id} -> {target_component_id}")
        else:
            logger.info(f"✅ Policy ALLOWED: {source_component_id} -> {target_component_id}")

        return verdict

    except Exception as exc:
        # Outcome on failure depends on strict mode: strict denies, otherwise allows.
        logger.error(f"Error checking policy: {exc}")
        return not self.policy_strict_mode

Policy Testing

Comprehensive Policy Tests

import os

import pytest
from aztp_client import Aztp
 
class TestPolicyEnforcement:
    """Test policy enforcement mechanisms.

    The tests use an ``Aztp`` client whose API key is read from the
    ``AZTP_TEST_API_KEY`` environment variable.
    """

    @pytest.fixture
    def policy_client(self):
        """Return an ``Aztp`` client built from ``AZTP_TEST_API_KEY``.

        Deliberately a plain (non-async) fixture: building the client awaits
        nothing, and an ``async def`` fixture declared with plain
        ``@pytest.fixture`` is passed to tests as an un-awaited coroutine when
        pytest-asyncio runs in strict mode.
        """
        return Aztp(api_key=os.getenv("AZTP_TEST_API_KEY"))

    @pytest.mark.asyncio
    async def test_component_permissions(self, policy_client):
        """Permission checks return a bool for every requested action."""

        # Create test agent
        agent = await policy_client.secure_connect({}, "policy-test-agent")

        # Test various permission scenarios
        test_actions = ["read", "write", "api_call", "admin_access"]

        permissions = await policy_client.check_identity_policy_permissions(
            agent.identity.aztp_id,
            options={"actions": test_actions}
        )

        # Verify permission structure: a dict with one boolean per action
        assert isinstance(permissions, dict)
        for action in test_actions:
            assert action in permissions
            assert isinstance(permissions[action], bool)

    @pytest.mark.asyncio
    async def test_oiap_evaluation(self, policy_client):
        """OIAP evaluation returns a ``result`` of either ALLOW or DENY."""

        # Create test agent
        agent = await policy_client.secure_connect({}, "oiap-test-agent")

        # Test OIAP evaluation
        result = await policy_client.oiap_evaluate(
            aztp_id=agent.identity.aztp_id,
            requested_resource="test_tool",
            user_id="test-user-123"
        )

        assert "result" in result
        assert result["result"] in ["ALLOW", "DENY"]

Next Steps