
Security Architecture

ReptiDex implements a comprehensive security strategy that protects user data, ensures regulatory compliance, and maintains system integrity across all layers of the application stack.

Security Principles

Core Security Tenets

  1. Defense in Depth: Multiple security layers to prevent single points of failure
  2. Zero Trust: Never trust, always verify - every request is authenticated and authorized
  3. Principle of Least Privilege: Users and services have minimal required permissions
  4. Security by Design: Security considerations built into every architectural decision
  5. Transparency and Auditability: Complete audit trails for all security-relevant actions

Compliance Framework

  • GDPR: General Data Protection Regulation compliance for EU users
  • CCPA: California Consumer Privacy Act compliance
  • SOC 2 Type II: Security and availability controls
  • PCI DSS: Payment card data security standards

Authentication Architecture

Multi-Factor Authentication

JWT Token Strategy

# JWT token configuration
JWT_CONFIG = {
    'algorithm': 'RS256',
    'access_token_expire': 15 * 60,  # 15 minutes
    'refresh_token_expire': 7 * 24 * 60 * 60,  # 7 days
    'issuer': 'ReptiDex',
    'audience': ['web-app', 'mobile-app', 'api'],
    'claims': {
        'user_id': 'required',
        'tenant_id': 'required',
        'roles': 'required',
        'permissions': 'required',
        'session_id': 'required'
    }
}

# Token rotation strategy
class TokenManager:
    async def refresh_token(self, refresh_token: str) -> TokenPair:
        # Validate refresh token
        claims = await self.validate_refresh_token(refresh_token)
        
        # Generate new token pair
        new_access_token = await self.generate_access_token(claims)
        new_refresh_token = await self.generate_refresh_token(claims)
        
        # Revoke old refresh token
        await self.revoke_refresh_token(refresh_token)
        
        return TokenPair(new_access_token, new_refresh_token)
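The rotation flow above can be illustrated with a small in-memory model. This is a minimal sketch, not the ReptiDex implementation: it uses opaque random tokens instead of signed JWTs, and `InMemoryRefreshStore`, `RefreshRecord`, and the reuse-detection rule (replaying a revoked token revokes every token for that user) are assumptions added for illustration.

```python
import secrets
import time
from dataclasses import dataclass


@dataclass
class RefreshRecord:
    user_id: str
    expires_at: float
    revoked: bool = False


class InMemoryRefreshStore:
    """Hypothetical stand-in for a persistent refresh-token store."""

    def __init__(self, ttl_seconds: int = 7 * 24 * 60 * 60):  # 7 days, as in JWT_CONFIG
        self.ttl_seconds = ttl_seconds
        self._records: dict[str, RefreshRecord] = {}

    def issue(self, user_id: str) -> str:
        token = secrets.token_urlsafe(32)
        self._records[token] = RefreshRecord(user_id, time.time() + self.ttl_seconds)
        return token

    def rotate(self, token: str) -> str:
        record = self._records.get(token)
        if record is None or record.expires_at < time.time():
            raise PermissionError("unknown or expired refresh token")
        if record.revoked:
            # Replay of a revoked token suggests theft: revoke every
            # token belonging to this user so a fresh login is required.
            for other in self._records.values():
                if other.user_id == record.user_id:
                    other.revoked = True
            raise PermissionError("refresh token reuse detected")
        record.revoked = True  # single use: revoke before issuing the successor
        return self.issue(record.user_id)
```

Revoking the old token before issuing its successor means a failure between the two steps can never leave two valid refresh tokens in circulation.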

Session Management

# Secure session configuration
SESSION_CONFIG = {
    'secure': True,  # HTTPS only
    'httponly': True,  # No JavaScript access
    'samesite': 'Strict',  # CSRF protection
    'max_age': 15 * 60,  # 15 minutes
    'domain': '.reptidex.com',  # Subdomain sharing
    'path': '/'
}

# Session security monitoring
class SessionSecurityMonitor:
    async def detect_anomalies(self, session_id: str, request_info: dict):
        session = await self.get_session(session_id)
        
        # Check for suspicious patterns
        if await self.is_suspicious_location_change(session, request_info):
            await self.trigger_security_alert('location_anomaly', session)
        
        if await self.is_suspicious_device_change(session, request_info):
            await self.require_reauthentication(session)
        
        if await self.is_brute_force_attempt(request_info):
            await self.implement_rate_limiting(request_info['ip'])
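To show how the `SESSION_CONFIG` values might translate into an actual `Set-Cookie` header, here is a minimal sketch using only the Python standard library. The cookie name `session_id` and the helper `build_session_cookie` are assumptions; a web framework would normally set these attributes through its own response API.

```python
from http.cookies import SimpleCookie

SESSION_CONFIG = {
    'secure': True,
    'httponly': True,
    'samesite': 'Strict',
    'max_age': 15 * 60,
    'domain': '.reptidex.com',
    'path': '/',
}


def build_session_cookie(session_id: str, config: dict = SESSION_CONFIG) -> str:
    """Return the value of a Set-Cookie header for the given session."""
    cookie = SimpleCookie()
    cookie["session_id"] = session_id  # cookie name is an assumption
    morsel = cookie["session_id"]
    morsel["secure"] = config["secure"]
    morsel["httponly"] = config["httponly"]
    morsel["samesite"] = config["samesite"]
    morsel["max-age"] = config["max_age"]
    morsel["domain"] = config["domain"]
    morsel["path"] = config["path"]
    return morsel.OutputString()


header = build_session_cookie("abc123")
```

The resulting header carries the `Secure`, `HttpOnly`, and `SameSite=Strict` attributes alongside the 900-second `Max-Age`.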

Authorization Framework

Role-Based Access Control (RBAC)

Service Ownership: The repti-core service manages all authentication and authorization data in repti_core_db.

ReptiDex implements vivarium-scoped roles using husbandry-themed names:
  • Keeper (Viewer): Read-only access to animals, pedigrees, clutches
  • Handler (Editor): Can add/update animals, pairings, clutches, and media
  • Curator (Admin): Manages vivarium data + members, cannot change billing
  • Herpetologist (Owner): Full rights including deleting vivarium and managing subscriptions
-- Vivarium role enumeration
CREATE TYPE vivarium_role_enum AS ENUM (
    'keeper',        -- Read-only access (Viewer)
    'handler',       -- Editor permissions
    'curator',       -- Admin permissions  
    'herpetologist'  -- Owner permissions
);

-- Role hierarchy with vivarium context (repti_core_db schema)
CREATE TABLE vivarium_members (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  vivarium_id UUID REFERENCES vivariums(id) ON DELETE CASCADE,
  user_id UUID REFERENCES users(id) ON DELETE CASCADE,
  role vivarium_role_enum NOT NULL DEFAULT 'keeper',
  permissions JSONB DEFAULT '{}',
  invited_by UUID REFERENCES users(id),
  joined_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW(),
  UNIQUE(vivarium_id, user_id)
);

-- Role-based permission validation
CREATE TABLE role_permissions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    role vivarium_role_enum NOT NULL,
    resource VARCHAR(50) NOT NULL,
    action VARCHAR(30) NOT NULL,
    allowed BOOLEAN DEFAULT FALSE,
    conditions JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW()
);

Vivarium Security Model

Each vivarium operates as an isolated security boundary with hierarchical role permissions:
  • Role Hierarchy: keeper < handler < curator < herpetologist
  • Permission Inheritance: Higher roles inherit all lower role permissions
  • Context Isolation: User can have different roles across different vivariums
  • Audit Trail: All role changes and sensitive operations logged
  • MFA Requirements: Curator+ roles require multi-factor authentication
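The hierarchy, inheritance, and context-isolation rules above could be modeled with an ordered enum. This is a minimal sketch assuming roles compare purely by rank; `VivariumRole`, `can_act`, `requires_mfa`, and the in-memory `MEMBERSHIPS` table are illustrative names, not part of the ReptiDex codebase.

```python
from enum import IntEnum
from typing import Optional


class VivariumRole(IntEnum):
    # Rank order mirrors: keeper < handler < curator < herpetologist
    KEEPER = 1
    HANDLER = 2
    CURATOR = 3
    HERPETOLOGIST = 4


# Hypothetical membership table: (user_id, vivarium_id) -> role
MEMBERSHIPS = {
    ("user-1", "viv-a"): VivariumRole.CURATOR,
    ("user-1", "viv-b"): VivariumRole.KEEPER,
}


def role_in(user_id: str, vivarium_id: str) -> Optional[VivariumRole]:
    return MEMBERSHIPS.get((user_id, vivarium_id))


def can_act(user_id: str, vivarium_id: str, required: VivariumRole) -> bool:
    """Higher roles inherit lower-role permissions, so compare ranks."""
    role = role_in(user_id, vivarium_id)
    return role is not None and role >= required


def requires_mfa(role: VivariumRole) -> bool:
    """Curator and above must use multi-factor authentication."""
    return role >= VivariumRole.CURATOR
```

Because the lookup is keyed by vivarium, the same user is a curator in `viv-a` but only a keeper in `viv-b`, and has no access at all to a vivarium where no membership row exists.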
-- Permissions system with service-level granularity
CREATE TABLE auth.permissions (
  id TEXT PRIMARY KEY,
  service TEXT NOT NULL,  -- repti-animal, repti-commerce, etc.
  resource TEXT NOT NULL, -- animals, listings, etc.
  action TEXT NOT NULL,   -- create, read, update, delete
  condition JSONB,        -- optional conditions
  created_at TIMESTAMPTZ DEFAULT now(),
  UNIQUE(service, resource, action)
);

-- Role-permission mapping
CREATE TABLE auth.role_permissions (
  role_id TEXT REFERENCES auth.roles(id),
  permission_id TEXT REFERENCES auth.permissions(id),
  granted_at TIMESTAMPTZ DEFAULT now(),
  granted_by TEXT REFERENCES auth.users(id),
  PRIMARY KEY(role_id, permission_id)
);

-- User role assignments with organization scope
CREATE TABLE auth.user_roles (
  user_id TEXT REFERENCES auth.users(id),
  role_id TEXT REFERENCES auth.roles(id),
  organization_id TEXT REFERENCES auth.organizations(id),
  granted_at TIMESTAMPTZ DEFAULT now(),
  granted_by TEXT REFERENCES auth.users(id),
  expires_at TIMESTAMPTZ,
  PRIMARY KEY(user_id, role_id, organization_id)
);

Fine-Grained Permissions

# Cross-service permission checking (repti-core service)
class PermissionService:
    async def check_service_permission(
        self, 
        user_id: str, 
        service: str,
        resource: str, 
        action: str, 
        context: dict = None
    ) -> bool:
        # Treat a missing context as empty so the lookups below cannot fail
        context = context or {}

        # Get user roles for current organization
        user_roles = await self.get_user_roles(user_id, context.get('organization_id'))
        
        # Check each role for required permission
        for role in user_roles:
            # Only check permissions for relevant services
            if service not in role.service_scope:
                continue
                
            permissions = await self.get_role_permissions(role.id)
            
            for permission in permissions:
                if (permission.service == service and
                    permission.resource == resource and 
                    permission.action == action and
                    await self.evaluate_conditions(permission.condition, context)):
                    return True
        
        return False
    
    async def check_cross_service_access(self, requesting_service: str, target_service: str) -> bool:
        """Validate service-to-service communication"""
        service_permissions = {
            'repti-animal': ['repti-core'],  # Can call core for auth
            'repti-commerce': ['repti-core', 'repti-animal'],  # Can call core + animal
            'repti-media': ['repti-core'],  # Can call core for auth
            'repti-community': ['repti-core', 'repti-animal'],  # Can call core + animal
            'repti-ops': ['repti-core', 'repti-animal', 'repti-commerce']  # Admin access
        }
        
        allowed_targets = service_permissions.get(requesting_service, [])
        return target_service in allowed_targets
    
    async def evaluate_conditions(self, condition: dict, context: dict) -> bool:
        # No condition means the permission applies unconditionally
        if not condition:
            return True
        
        # Service-aware conditions: every condition present must hold
        if 'owner_only' in condition:
            user_id = context.get('user_id')
            # A missing user_id must never match a missing owner
            if user_id is None or context.get('resource_owner_id') != user_id:
                return False
        
        if 'organization_member' in condition:
            if not await self.is_organization_member(
                context.get('user_id'), 
                context.get('organization_id')
            ):
                return False
        
        if 'service_admin' in condition:
            if not await self.has_service_admin_role(
                context.get('user_id'),
                condition['service_admin']
            ):
                return False
        
        return True
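The matching logic above can be exercised without any services or databases. The following is a minimal synchronous sketch under stated assumptions: `Permission`, `condition_holds`, and `is_allowed` are hypothetical names, only the `owner_only` condition is modeled, and unrecognized condition keys are denied (a fail-closed choice made for this example, not something the original code specifies).

```python
from dataclasses import dataclass, field
from typing import Iterable, Optional


@dataclass
class Permission:
    service: str
    resource: str
    action: str
    condition: dict = field(default_factory=dict)


def condition_holds(condition: dict, context: dict) -> bool:
    # Fail closed: any condition key this sketch does not understand denies access.
    if set(condition) - {"owner_only"}:
        return False
    if condition.get("owner_only"):
        user_id = context.get("user_id")
        if user_id is None or context.get("resource_owner_id") != user_id:
            return False
    return True


def is_allowed(
    permissions: Iterable[Permission],
    service: str,
    resource: str,
    action: str,
    context: Optional[dict] = None,
) -> bool:
    context = context or {}
    return any(
        p.service == service
        and p.resource == resource
        and p.action == action
        and condition_holds(p.condition, context)
        for p in permissions
    )


grants = [
    Permission("repti-animal", "animals", "read"),
    Permission("repti-animal", "animals", "update", {"owner_only": True}),
]
```

With these grants, any holder can read animals, but an update succeeds only when `context["user_id"]` equals `context["resource_owner_id"]`.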

Data Protection and Privacy

Encryption Strategy

# Multi-layer encryption approach
class EncryptionService:
    def __init__(self):
        self.app_key = self.load_application_key()
        self.user_keys = {}  # Per-user encryption keys
    
    async def encrypt_sensitive_field(self, data: str, user_id: str) -> str:
        # Layer 1: User-specific encryption
        user_key = await self.get_user_key(user_id)
        encrypted_data = self.symmetric_encrypt(data, user_key)
        
        # Layer 2: Application-level encryption
        double_encrypted = self.symmetric_encrypt(encrypted_data, self.app_key)
        
        return double_encrypted
    
    async def encrypt_pii(self, pii_data: dict) -> dict:
        encrypted_fields = {}
        
        for field, value in pii_data.items():
            if field in ['email', 'phone', 'address', 'ssn']:
                encrypted_fields[f"{field}_encrypted"] = await self.encrypt_with_kms(value)
            else:
                encrypted_fields[field] = value
        
        return encrypted_fields
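The source does not say how `get_user_key` obtains a per-user key. One possible approach, shown here purely as an assumption, is to derive it from a master key with HKDF (RFC 5869) using only the standard library. `derive_user_key` and the `reptidex-user-key:` info prefix are hypothetical, and a production system would more likely rely on a KMS or a vetted cryptography library.

```python
import hashlib
import hmac

HASH_LEN = hashlib.sha256().digest_size  # 32 bytes


def derive_user_key(master_key: bytes, user_id: str, salt: bytes = b"") -> bytes:
    """Derive a 32-byte per-user key with single-block HKDF-SHA256."""
    # HKDF-Extract: an absent salt defaults to HASH_LEN zero bytes (RFC 5869).
    prk = hmac.new(salt or b"\x00" * HASH_LEN, master_key, hashlib.sha256).digest()
    # HKDF-Expand: one output block suffices for a 32-byte key.
    info = b"reptidex-user-key:" + user_id.encode("utf-8")
    return hmac.new(prk, info + b"\x01", hashlib.sha256).digest()


master = b"\x01" * 32  # placeholder; a real master key would come from a secrets store
alice_key = derive_user_key(master, "user-alice")
bob_key = derive_user_key(master, "user-bob")
```

Derivation is deterministic, so keys need not be stored per user, and binding the user ID into the `info` input keeps each user's key distinct.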

Privacy Controls

Service Distribution: Privacy controls managed per service with cross-service coordination
-- Field-level privacy settings (repti_core_db.privacy schema)
CREATE TABLE privacy.field_visibility (
  service TEXT NOT NULL,    -- repti-animal, repti-commerce, etc.
  record_type TEXT NOT NULL, -- animals, profiles, etc.
  record_id TEXT NOT NULL,
  field_name TEXT NOT NULL,
  visibility_level TEXT NOT NULL DEFAULT 'private',
  -- public, organization, private, custom
  custom_rules JSONB,
  created_at TIMESTAMPTZ DEFAULT now(),
  updated_at TIMESTAMPTZ DEFAULT now(),
  PRIMARY KEY(service, record_type, record_id, field_name)
);

-- Service-specific data retention policies
CREATE TABLE privacy.retention_policies (
  service TEXT NOT NULL,    -- Which service owns this data type
  data_type TEXT NOT NULL,  -- users, animals, transactions, etc.
  retention_period INTERVAL NOT NULL,
  deletion_method TEXT NOT NULL DEFAULT 'hard_delete',
  archival_location TEXT,
  cross_service_deps TEXT[], -- Services that depend on this data
  created_at TIMESTAMPTZ DEFAULT now(),
  PRIMARY KEY(service, data_type)
);

-- Cross-service privacy coordination
CREATE TABLE privacy.service_privacy_requests (
  request_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  user_id TEXT NOT NULL,
  request_type TEXT NOT NULL, -- export, deletion, rectification
  affected_services TEXT[] NOT NULL,
  completion_status JSONB NOT NULL DEFAULT '{}', -- Track per-service completion
  created_at TIMESTAMPTZ DEFAULT now(),
  completed_at TIMESTAMPTZ
);
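A row in `privacy.field_visibility` ultimately has to answer one question: may this viewer see this field? The sketch below shows one way that decision might look. The function name, its parameters, and the choice to deny `custom` (whose `custom_rules` format is not defined here) are assumptions.

```python
from typing import Collection, Optional


def can_view_field(
    visibility_level: str,
    viewer_id: Optional[str],
    owner_id: str,
    viewer_org_ids: Collection[str] = (),
    record_org_id: Optional[str] = None,
) -> bool:
    """Resolve a visibility_level to an allow/deny decision."""
    if viewer_id is not None and viewer_id == owner_id:
        return True  # owners always see their own fields
    if visibility_level == "public":
        return True
    if visibility_level == "organization":
        return record_org_id is not None and record_org_id in viewer_org_ids
    # 'private', 'custom' (rules not modeled in this sketch), and any
    # unknown level fall through to deny, matching the 'private' default.
    return False
```

Denying by default mirrors the table's `DEFAULT 'private'`: a field with a missing or unrecognized level is never exposed accidentally.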

GDPR Compliance Implementation

# Cross-service GDPR compliance orchestration (repti-ops service)
class GDPRComplianceService:
    def __init__(self):
        self.service_clients = {
            'repti-core': CoreServiceClient(),
            'repti-animal': AnimalServiceClient(),
            'repti-commerce': CommerceServiceClient(),
            'repti-media': MediaServiceClient(),
            'repti-community': CommunityServiceClient()
        }
    
    async def handle_data_export_request(self, user_id: str) -> dict:
        """Right to Data Portability - Article 20"""
        request_id = await self.create_privacy_request(user_id, 'export')
        
        # Coordinate data export across all 6 services
        export_data = {}
        for service_name, client in self.service_clients.items():
            try:
                service_data = await client.export_user_data(user_id, request_id)
                export_data[service_name] = service_data
                await self.update_request_status(request_id, service_name, 'completed')
            except Exception as e:
                await self.update_request_status(request_id, service_name, 'failed', str(e))
                
        # Aggregate and sanitize cross-service data
        return await self.sanitize_for_export(export_data)
    
    async def handle_deletion_request(self, user_id: str) -> bool:
        """Right to Erasure - Article 17"""
        request_id = await self.create_privacy_request(user_id, 'deletion')
        
        # Check cross-service dependencies
        dependencies = await self.check_deletion_dependencies(user_id)
        if dependencies['blocking_relationships']:
            raise CannotDeleteException(f"Blocking relationships: {dependencies}")
        
        # Coordinate deletion across services in dependency order
        deletion_order = ['repti-media', 'repti-community', 'repti-commerce', 'repti-animal', 'repti-core']
        
        for service_name in deletion_order:
            client = self.service_clients[service_name]
            try:
                await client.delete_user_data(user_id, request_id, dependencies)
                await self.update_request_status(request_id, service_name, 'completed')
            except Exception as e:
                await self.update_request_status(request_id, service_name, 'failed', str(e))
                await self.initiate_rollback_procedure(request_id, user_id)
                return False
        
        return True
    
    async def handle_rectification_request(self, user_id: str, corrections: dict) -> bool:
        """Right to Rectification - Article 16"""
        request_id = await self.create_privacy_request(user_id, 'rectification')
        
        # Group corrections by service
        corrections_by_service = await self.group_corrections_by_service(corrections)
        
        # Apply corrections service by service, recording each outcome
        all_succeeded = True
        for service_name, service_corrections in corrections_by_service.items():
            client = self.service_clients[service_name]
            try:
                await client.apply_corrections(user_id, service_corrections, request_id)
                await self.update_request_status(request_id, service_name, 'completed')
            except Exception as e:
                await self.update_request_status(request_id, service_name, 'failed', str(e))
                all_succeeded = False
        
        # Notify affected services of data changes
        await self.notify_cross_service_data_update(user_id, corrections)
        
        # Report success only if every service applied its corrections
        return all_succeeded
    
    async def check_deletion_dependencies(self, user_id: str) -> dict:
        """Check if user data can be safely deleted across all services"""
        dependencies = {
            'blocking_relationships': [],
            'safe_to_anonymize': [],
            'requires_retention': []
        }
        
        # Check each service for deletion blockers
        for service_name, client in self.service_clients.items():
            service_deps = await client.check_deletion_readiness(user_id)
            dependencies['blocking_relationships'].extend(service_deps.get('blockers', []))
            dependencies['requires_retention'].extend(service_deps.get('retention_required', []))
            
        return dependencies

Network Security

VPC Architecture

# AWS VPC Security Configuration
vpc_configuration:
  cidr_block: "10.0.0.0/16"
  
  public_subnets:
    - cidr: "10.0.1.0/24"
      az: "us-east-1a"
      purpose: "Load Balancers"
    - cidr: "10.0.2.0/24"
      az: "us-east-1b"
      purpose: "NAT Gateways"
  
  private_subnets:
    - cidr: "10.0.10.0/24"
      az: "us-east-1a"
      purpose: "Application Servers"
    - cidr: "10.0.11.0/24"
      az: "us-east-1b"
      purpose: "Application Servers"
  
  database_subnets:
    - cidr: "10.0.20.0/24"
      az: "us-east-1a"
      purpose: "Database Servers"
    - cidr: "10.0.21.0/24"
      az: "us-east-1b"
      purpose: "Database Servers"

security_groups:
  web_tier:
    ingress:
      - port: 443
        protocol: "tcp"
        source: "0.0.0.0/0"
      - port: 80
        protocol: "tcp"
        source: "0.0.0.0/0"
    egress:
      - port: 8080
        protocol: "tcp"
        destination: "app_tier_sg"
  
  app_tier:
    ingress:
      - port: 8080
        protocol: "tcp"
        source: "web_tier_sg"
    egress:
      - port: 5432
        protocol: "tcp"
        destination: "db_tier_sg"
  
  db_tier:
    ingress:
      - port: 5432
        protocol: "tcp"
        source: "app_tier_sg"
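A layout like the one above is easy to sanity-check before deployment. The following sketch uses the standard `ipaddress` module to confirm that every subnet sits inside the VPC block and that no two subnets overlap. The subnet labels and the `validate_subnets` helper are illustrative, not part of any AWS tooling.

```python
import ipaddress
from itertools import combinations

VPC_CIDR = "10.0.0.0/16"
SUBNETS = {
    "public-a": "10.0.1.0/24",
    "public-b": "10.0.2.0/24",
    "private-a": "10.0.10.0/24",
    "private-b": "10.0.11.0/24",
    "database-a": "10.0.20.0/24",
    "database-b": "10.0.21.0/24",
}


def validate_subnets(vpc_cidr: str, subnets: dict) -> list:
    """Return a list of human-readable problems; empty means the layout is consistent."""
    vpc = ipaddress.ip_network(vpc_cidr)
    networks = {name: ipaddress.ip_network(cidr) for name, cidr in subnets.items()}
    problems = []
    for name, net in networks.items():
        if not net.subnet_of(vpc):
            problems.append(f"{name} ({net}) is outside the VPC block {vpc}")
    for (name_a, net_a), (name_b, net_b) in combinations(networks.items(), 2):
        if net_a.overlaps(net_b):
            problems.append(f"{name_a} ({net_a}) overlaps {name_b} ({net_b})")
    return problems


problems = validate_subnets(VPC_CIDR, SUBNETS)
```

For the configuration shown above, `problems` is empty: all six /24 subnets fall within 10.0.0.0/16 and none overlap.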

API Security

# API security middleware
class APISecurityMiddleware:
    async def __call__(self, request: Request, call_next):
        # Rate limiting
        if not await self.check_rate_limit(request):
            raise HTTPException(429, "Rate limit exceeded")
        
        # Input validation
        await self.validate_input(request)
        
        # SQL injection prevention
        await self.sanitize_query_params(request)
        
        # CORS validation
        if not await self.validate_cors(request):
            raise HTTPException(403, "CORS policy violation")
        
        response = await call_next(request)
        
        # Security headers
        self.add_security_headers(response)
        
        return response
    
    def add_security_headers(self, response: Response):
        response.headers.update({
            'X-Content-Type-Options': 'nosniff',
            'X-Frame-Options': 'DENY',
            'X-XSS-Protection': '1; mode=block',
            'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',
            'Content-Security-Policy': self.get_csp_header(),
            'Referrer-Policy': 'strict-origin-when-cross-origin'
        })
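The middleware delegates to `check_rate_limit` without showing how the limit is enforced. One common technique is a token bucket per client. This is a minimal single-process sketch; the `TokenBucketLimiter` name, the capacity and refill values, and keying by client IP are assumptions, and a multi-instance deployment would need shared state such as Redis.

```python
import time
from typing import Callable, Dict, Tuple


class TokenBucketLimiter:
    """Per-key token bucket: bursts up to `capacity`, refilled at a steady rate."""

    def __init__(
        self,
        capacity: int,
        refill_per_second: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.capacity = capacity
        self.refill_per_second = refill_per_second
        self.clock = clock
        self._state: Dict[str, Tuple[float, float]] = {}  # key -> (tokens, last_seen)

    def allow(self, key: str) -> bool:
        now = self.clock()
        tokens, last_seen = self._state.get(key, (float(self.capacity), now))
        # Credit tokens earned since the last request, capped at capacity.
        tokens = min(self.capacity, tokens + (now - last_seen) * self.refill_per_second)
        if tokens < 1:
            self._state[key] = (tokens, now)
            return False
        self._state[key] = (tokens - 1, now)
        return True
```

A middleware could call `limiter.allow(client_ip)` and respond with HTTP 429 when it returns `False`, as the code above does for `check_rate_limit`.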

Application Security

Input Validation and Sanitization

# Comprehensive input validation
class InputValidator:
    @staticmethod
    def validate_animal_data(data: dict) -> dict:
        schema = {
            'name': {
                'type': 'string',
                'maxlength': 100,
                'regex': r'^[A-Za-z0-9\s\-_]+$',
                'required': True
            },
            'species': {
                'type': 'string',
                'allowed': VALID_SPECIES_LIST,
                'required': True
            },
            'hatch_date': {
                'type': 'date',
                'min': '1990-01-01',
                'max': 'today',
                'required': False
            }
        }
        
        return validate_against_schema(data, schema)
    
    @staticmethod
    def sanitize_html_content(content: str) -> str:
        # Remove potentially dangerous HTML tags and attributes
        allowed_tags = ['p', 'br', 'strong', 'em', 'u', 'ol', 'ul', 'li']
        allowed_attributes = {}
        
        return bleach.clean(
            content, 
            tags=allowed_tags, 
            attributes=allowed_attributes,
            strip=True
        )
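The schema above depends on a `validate_against_schema` helper that is not shown. To make the `name` and `hatch_date` rules concrete, here is a dependency-free sketch of the same checks. The function name and error messages are assumptions, and `species` is omitted because `VALID_SPECIES_LIST` is not defined in this document.

```python
import re
from datetime import date
from typing import List, Optional

NAME_PATTERN = re.compile(r"^[A-Za-z0-9\s\-_]+$")
MIN_HATCH_DATE = date(1990, 1, 1)


def validate_name_and_hatch_date(data: dict, today: Optional[date] = None) -> List[str]:
    """Return a list of validation errors; empty means the data passed."""
    today = today or date.today()
    errors = []

    name = data.get("name")
    if not isinstance(name, str) or not name:
        errors.append("name is required")
    elif len(name) > 100:
        errors.append("name must be at most 100 characters")
    elif not NAME_PATTERN.fullmatch(name):
        errors.append("name may contain only letters, digits, whitespace, '-' and '_'")

    hatch_date = data.get("hatch_date")
    if hatch_date is not None:  # optional field
        if type(hatch_date) is not date:
            errors.append("hatch_date must be a date")
        elif not (MIN_HATCH_DATE <= hatch_date <= today):
            errors.append("hatch_date must be between 1990-01-01 and today")

    return errors
```

Note that `\s` in the name pattern also admits tabs and newlines; a stricter pattern might allow only a literal space.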

Secrets Management

# AWS Secrets Manager integration
class SecretsManager:
    def __init__(self):
        self.client = boto3.client('secretsmanager')
        self.cache = TTLCache(maxsize=100, ttl=300)  # 5-minute cache
    
    async def get_secret(self, secret_name: str) -> str:
        if secret_name in self.cache:
            return self.cache[secret_name]
        
        try:
            response = self.client.get_secret_value(SecretId=secret_name)
            secret_value = response['SecretString']
            
            # Cache for 5 minutes
            self.cache[secret_name] = secret_value
            
            return secret_value
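        except ClientError as e:
            logger.error(f"Failed to retrieve secret {secret_name}: {e}")
            # ClientError covers more than missing secrets (e.g. access denied),
            # so keep the message generic and chain the original error
            raise SecretRetrievalException(f"Failed to retrieve secret {secret_name}") from e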
    
    async def rotate_secret(self, secret_name: str) -> bool:
        """Automatic secret rotation"""
        try:
            self.client.rotate_secret(
                SecretId=secret_name,
                RotateImmediately=True
            )
            
            # Clear from cache to force refresh
            self.cache.pop(secret_name, None)
            
            return True
        except ClientError as e:
            logger.error(f"Failed to rotate secret {secret_name}: {e}")
            return False

Security Monitoring and Incident Response

Security Event Monitoring

Service Integration: Security monitoring coordinated by repti-ops with event aggregation from all 6 services
# Cross-service security event detection (repti-ops service)
class SecurityMonitor:
    def __init__(self):
        self.new_relic_client = NewRelicClient()
        self.service_security_endpoints = {
            'repti-core': '/security/events',
            'repti-animal': '/security/events', 
            'repti-commerce': '/security/events',
            'repti-media': '/security/events',
            'repti-community': '/security/events'
        }
    
    async def monitor_cross_service_events(self, event: dict):
        """Monitor security events across all services"""
        service = event.get('service')
        event_type = event.get('type')
        
        # Service-specific security monitoring
        if service == 'repti-core' and event_type == 'login_attempt':
            await self.monitor_login_attempts(event)
        elif service == 'repti-animal' and event_type == 'data_access':
            await self.monitor_animal_data_access(event)
        elif service == 'repti-commerce' and event_type == 'transaction':
            await self.monitor_transaction_security(event)
        elif service == 'repti-media' and event_type == 'file_upload':
            await self.monitor_file_security(event)
        elif service == 'repti-community' and event_type == 'search_query':
            await self.monitor_search_security(event)
            
        # Cross-service correlation
        await self.correlate_security_events(event)
        
    async def monitor_login_attempts(self, login_event: dict):
        user_id = login_event.get('user_id')
        ip_address = login_event.get('ip_address')
        success = login_event.get('success')
        
        if not success:
            await self.track_failed_login(user_id, ip_address)
            
            # Check for brute force across all services
            if await self.is_cross_service_brute_force(ip_address):
                await self.block_ip_globally(ip_address)
                await self.alert_security_team('brute_force_detected', {
                    'ip_address': ip_address,
                    'target_user': user_id,
                    'affected_services': await self.get_affected_services(ip_address)
                })
        else:
            await self.check_cross_service_login_anomalies(login_event)
    
    async def monitor_service_privilege_escalation(self, user_id: str, service: str, old_roles: list, new_roles: list):
        elevated_permissions = set(new_roles) - set(old_roles)
        
        if elevated_permissions:
            await self.log_service_privilege_change(user_id, service, elevated_permissions)
            
            # Alert on cross-service admin privileges
            if any('admin' in role for role in elevated_permissions):
                await self.alert_security_team('service_admin_privilege_granted', {
                    'user_id': user_id,
                    'service': service,
                    'new_roles': list(elevated_permissions),
                    'cross_service_impact': await self.assess_cross_service_impact(elevated_permissions)
                })
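The brute-force check relies on helpers such as `track_failed_login`, `is_cross_service_brute_force`, and `get_affected_services` that are not shown. A sliding-window counter is one plausible way to back them. In this sketch the `FailedLoginTracker` class, the threshold of 10 failures, and the 5-minute window are all assumptions.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, Set, Tuple


class FailedLoginTracker:
    """Counts recent failed logins per IP across all reporting services."""

    def __init__(
        self,
        threshold: int = 10,
        window_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.threshold = threshold
        self.window_seconds = window_seconds
        self.clock = clock
        # ip -> deque of (timestamp, service), oldest first
        self._failures: Dict[str, Deque[Tuple[float, str]]] = defaultdict(deque)

    def _recent(self, ip: str) -> Deque[Tuple[float, str]]:
        cutoff = self.clock() - self.window_seconds
        events = self._failures[ip]
        while events and events[0][0] <= cutoff:
            events.popleft()  # drop failures that fell out of the window
        return events

    def record_failure(self, ip: str, service: str) -> None:
        self._failures[ip].append((self.clock(), service))

    def is_brute_force(self, ip: str) -> bool:
        return len(self._recent(ip)) >= self.threshold

    def affected_services(self, ip: str) -> Set[str]:
        return {service for _, service in self._recent(ip)}
```

Because failures from every service land in the same per-IP window, an attacker who spreads attempts across services is still counted against a single threshold.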

Incident Response Automation

# Automated incident response
class IncidentResponse:
    async def handle_security_incident(self, incident_type: str, details: dict):
        incident_id = await self.create_incident_record(incident_type, details)
        
        # Automated responses based on incident type
        if incident_type == 'data_breach_suspected':
            await self.initiate_data_breach_protocol(incident_id, details)
        elif incident_type == 'unauthorized_access':
            await self.lock_affected_accounts(details.get('affected_users', []))
        elif incident_type == 'malware_detected':
            await self.isolate_affected_systems(details.get('affected_hosts', []))
        
        # Always notify security team
        await self.notify_security_team(incident_id, incident_type, details)
    
    async def initiate_data_breach_protocol(self, incident_id: str, details: dict):
        """GDPR Article 33 & 34 compliance"""
        # 1. Contain the breach
        await self.contain_breach(details)
        
        # 2. Assess the risk
        risk_level = await self.assess_breach_risk(details)
        
        # 3. Notify authorities if required (72-hour rule)
        if risk_level >= RiskLevel.HIGH:
            await self.schedule_authority_notification(incident_id, details)
        
        # 4. Notify affected individuals if required
        if risk_level >= RiskLevel.CRITICAL:
            await self.notify_affected_individuals(details.get('affected_users', []))
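The 72-hour rule referenced above runs from the moment the controller becomes aware of the breach (GDPR Article 33). A scheduler such as `schedule_authority_notification` needs that deadline as a concrete timestamp. The helper names below are assumptions, shown as a small sketch.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

AUTHORITY_NOTIFICATION_WINDOW = timedelta(hours=72)


def authority_notification_deadline(became_aware_at: datetime) -> datetime:
    """Latest time the supervisory authority may be notified."""
    if became_aware_at.tzinfo is None:
        # Naive timestamps are ambiguous across regions; insist on an explicit zone.
        raise ValueError("became_aware_at must be timezone-aware")
    return became_aware_at + AUTHORITY_NOTIFICATION_WINDOW


def time_remaining(became_aware_at: datetime, now: Optional[datetime] = None) -> timedelta:
    """Time left before the deadline; negative once the deadline has passed."""
    now = now or datetime.now(timezone.utc)
    return authority_notification_deadline(became_aware_at) - now


aware_at = datetime(2024, 3, 1, 9, 30, tzinfo=timezone.utc)
deadline = authority_notification_deadline(aware_at)  # 2024-03-04 09:30 UTC
```

Storing the awareness timestamp on the incident record lets the remaining time be recomputed at any point during the response.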

Compliance and Auditing

Audit Trail Implementation

Service Distribution: Security events collected by repti-ops from all 6 services and stored in repti_ops_db
-- Cross-service security audit logging (repti_ops_db.audit schema)
CREATE TABLE audit.security_events (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  service TEXT NOT NULL,    -- Which service generated the event
  event_type TEXT NOT NULL,
  event_category TEXT NOT NULL, -- auth, authorization, data_access, etc.
  user_id TEXT,
  session_id TEXT,
  ip_address INET,
  user_agent TEXT,
  resource_type TEXT,
  resource_id TEXT,
  action TEXT,
  result TEXT, -- success, failure, blocked
  risk_score INTEGER,
  cross_service_correlation_id UUID, -- Link related events across services
  metadata JSONB,
  occurred_at TIMESTAMPTZ DEFAULT now()
);

-- Service-specific audit indexes
CREATE INDEX idx_security_events_service ON audit.security_events(service, occurred_at);
CREATE INDEX idx_security_events_user ON audit.security_events(user_id, occurred_at);
CREATE INDEX idx_security_events_correlation ON audit.security_events(cross_service_correlation_id);

-- Cross-service event correlation table
CREATE TABLE audit.event_correlations (
  correlation_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  user_id TEXT NOT NULL,
  session_id TEXT,
  event_chain TEXT[], -- Order of services involved
  start_time TIMESTAMPTZ DEFAULT now(),
  end_time TIMESTAMPTZ,
  status TEXT DEFAULT 'active' -- active, completed, failed
);

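-- Retention policy for audit logs (7 years for compliance)
-- A partial-index predicate may only use immutable expressions, so it cannot
-- reference current_date. Index occurred_at instead and let a scheduled purge
-- job apply the 7-year cutoff.
CREATE INDEX idx_security_events_retention 
ON audit.security_events(occurred_at);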

Compliance Reporting

# Cross-service automated compliance reporting (repti-ops service)
class ComplianceReporter:
    def __init__(self):
        self.service_clients = {
            'repti-core': CoreServiceClient(),
            'repti-animal': AnimalServiceClient(),
            'repti-commerce': CommerceServiceClient(),
            'repti-media': MediaServiceClient(),
            'repti-community': CommunityServiceClient()
        }
    
    async def generate_gdpr_report(self, start_date: date, end_date: date) -> dict:
        # Aggregate GDPR compliance data across all services
        service_reports = {}
        for service_name, client in self.service_clients.items():
            service_reports[service_name] = await client.get_gdpr_metrics(start_date, end_date)
        
        return {
            'cross_service_summary': {
                'data_subject_requests': await self.aggregate_dsr_requests(service_reports),
                'data_breaches': await self.aggregate_data_breaches(service_reports),
                'consent_records': await self.aggregate_consent_records(service_reports),
                'data_retention_compliance': await self.check_cross_service_retention_compliance(),
                'processor_agreements': await self.verify_cross_service_processor_agreements()
            },
            'service_breakdown': service_reports,
            'cross_service_coordination': {
                'privacy_request_completions': await self.get_privacy_request_stats(start_date, end_date),
                'service_response_times': await self.get_service_response_metrics(start_date, end_date),
                'coordination_failures': await self.get_coordination_failure_metrics(start_date, end_date)
            }
        }
    
    async def generate_soc2_evidence(self) -> dict:
        # Collect SOC2 evidence across all 6 services
        evidence = {}
        
        for service_name, client in self.service_clients.items():
            service_evidence = await client.get_soc2_evidence()
            evidence[service_name] = service_evidence
        
        return {
            'cross_service_controls': {
                'access_controls': await self.document_cross_service_access_controls(),
                'security_monitoring': await self.document_unified_monitoring_controls(),
                'incident_response': await self.document_cross_service_incident_procedures(),
                'backup_procedures': await self.document_service_backup_controls(),
                'vendor_management': await self.document_service_vendor_controls(),
                'data_classification': await self.document_service_data_classification()
            },
            'service_specific_evidence': evidence,
            'service_integration_controls': {
                'api_security': await self.document_inter_service_api_security(),
                'data_flow_controls': await self.document_cross_service_data_flows(),
                'event_integrity': await self.document_event_driven_security_controls()
            }
        }
    
    async def generate_service_security_report(self) -> dict:
        """Generate security posture report across all 6 services"""
        return {
            'service_security_scores': {
                service: await client.get_security_score() 
                for service, client in self.service_clients.items()
            },
            'cross_service_vulnerabilities': await self.assess_cross_service_vulnerabilities(),
            'service_communication_security': await self.assess_inter_service_security(),
            'unified_threat_landscape': await self.analyze_unified_threat_landscape(),
            'recommendations': await self.generate_security_recommendations()
        }

Security Testing and Validation

Automated Security Testing

# Cross-service security testing pipeline
security_tests:
  per_service_testing:
    static_analysis:
      tools: ["bandit", "semgrep", "sonarqube"]
      frequency: "on_commit"
      scope: "all_6_services"
      
    dependency_scanning:
      tools: ["safety", "snyk", "dependabot"]
      frequency: "daily"
      scope: "all_6_services"
      
    container_scanning:
      tools: ["trivy", "clair", "twistlock"]
      frequency: "on_build"
      scope: "all_6_services"
  
  integration_testing:
    cross_service_api_testing:
      tools: ["postman", "newman", "custom_scripts"]
      frequency: "on_deployment"
      scope: "service_to_service_communications"
      
    service_authentication_testing:
      tools: ["custom_auth_tests"]
      frequency: "daily"
      scope: "cross_service_authentication"
      
    data_flow_security_testing:
      tools: ["custom_data_flow_tests"]
      frequency: "weekly"
      scope: "cross_service_data_integrity"
  
  system_wide_testing:
    dynamic_analysis:
      tools: ["zap", "burp", "nessus"]
      frequency: "weekly"
      scope: "all_external_endpoints"
      
    penetration_testing:
      frequency: "quarterly"
      scope: "entire_6_service_ecosystem"
      focus_areas:
        - "service_boundary_vulnerabilities"
        - "cross_service_privilege_escalation"
        - "data_isolation_verification"
        - "event_stream_security"
        
    chaos_engineering:
      frequency: "monthly"
      scope: "service_failure_scenarios"
      security_focus: "graceful_degradation_security"

new_relic_integration:
  security_testing_monitoring:
    - "test_execution_tracking"
    - "vulnerability_trend_analysis" 
    - "security_test_coverage_metrics"
    - "cross_service_security_dashboards"

Security Metrics

# Security KPIs tracking
class SecurityMetrics:
    async def calculate_security_score(self) -> float:
        metrics = {
            'vulnerability_score': await self.get_vulnerability_score(),
            'compliance_score': await self.get_compliance_score(),
            'incident_response_score': await self.get_incident_response_score(),
            'security_training_score': await self.get_training_completion_score()
        }
        
        weighted_score = (
            metrics['vulnerability_score'] * 0.3 +
            metrics['compliance_score'] * 0.25 +
            metrics['incident_response_score'] * 0.25 +
            metrics['security_training_score'] * 0.2
        )
        
        return weighted_score
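As a worked example of the weighting above, the sketch below applies the same four weights to a set of sample scores. The sample values and the 0 to 100 scale are assumptions chosen for illustration, and `weighted_security_score` is a hypothetical standalone version of the calculation.

```python
import math

WEIGHTS = {
    'vulnerability_score': 0.3,
    'compliance_score': 0.25,
    'incident_response_score': 0.25,
    'security_training_score': 0.2,
}


def weighted_security_score(metrics: dict, weights: dict = WEIGHTS) -> float:
    """Combine per-area scores into one number using fixed weights."""
    if set(metrics) != set(weights):
        raise ValueError("metrics and weights must cover the same keys")
    if not math.isclose(sum(weights.values()), 1.0):
        raise ValueError("weights must sum to 1")
    return sum(metrics[name] * weight for name, weight in weights.items())


sample = {
    'vulnerability_score': 80,
    'compliance_score': 90,
    'incident_response_score': 70,
    'security_training_score': 100,
}
# 80*0.3 + 90*0.25 + 70*0.25 + 100*0.2 = 24 + 22.5 + 17.5 + 20 = 84
score = weighted_security_score(sample)
```

Because the weights sum to 1, the combined score stays on the same scale as its inputs.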
Together, these layers (authentication, authorization, encryption, network controls, monitoring, and auditing) are how ReptiDex protects user data and privacy and meets its GDPR, CCPA, SOC 2, and PCI DSS obligations without adding friction for users.