Data Architecture
ReptiDex’s data architecture is designed around the principles of data consistency, performance, and scalability while maintaining strict multi-tenant isolation and comprehensive audit capabilities.
Overview
Design Philosophy
- Domain-Driven Design: Data models align with business domains
- Event Sourcing: Critical business events are stored as immutable facts
- CQRS Pattern: Command and Query Responsibility Segregation for optimal performance
- Multi-Tenant: Organization-scoped data with strict isolation
- Audit-First: Complete audit trails for all data changes
Data Storage Strategy
- Database-Per-Service: 6 separate PostgreSQL databases with complete service isolation
- Cache Layer: Redis for session data and computed results
- Search Engine: PostgreSQL full-text search (cost-effective, no OpenSearch overhead)
- Object Storage: S3 for media files and documents
- Message Queue: SNS/SQS for event-driven communication
Database Architecture
Multi-Tenant Design
-- Global tenant isolation pattern
CREATE TABLE tenants (
    id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'active',
    created_at TIMESTAMPTZ DEFAULT now()
);

-- All domain tables include tenant isolation
CREATE TABLE animals (
    id TEXT PRIMARY KEY,
    tenant_id TEXT NOT NULL REFERENCES tenants(id),
    -- other columns...
    CONSTRAINT animals_tenant_check CHECK (tenant_id IS NOT NULL)
);

-- Row-level security policies
ALTER TABLE animals ENABLE ROW LEVEL SECURITY;
CREATE POLICY animals_tenant_isolation ON animals
    FOR ALL TO application_role
    USING (tenant_id = current_setting('app.current_tenant_id'));
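The policy above only isolates rows if the application sets app.current_tenant_id on the connection before issuing queries. The following is a minimal sketch of that pattern, assuming an asyncpg pool; the tenant_scope helper is illustrative, not part of the codebase, and the setting name matches the policy.

# Sketch: scope every query in a transaction to the caller's tenant (asyncpg assumed).
# `tenant_scope` is a hypothetical helper shown for illustration only.
import contextlib
import asyncpg

@contextlib.asynccontextmanager
async def tenant_scope(pool: asyncpg.Pool, tenant_id: str):
    async with pool.acquire() as conn:
        async with conn.transaction():
            # is_local = true scopes the setting to this transaction only
            await conn.execute(
                "SELECT set_config('app.current_tenant_id', $1, true)", tenant_id
            )
            yield conn

# Usage: every statement inside the block sees only this tenant's rows
# async with tenant_scope(pool, "tenant_123") as conn:
#     rows = await conn.fetch("SELECT id, name FROM animals")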
Database Schema Organization
PostgreSQL Databases (Database-Per-Service):
├── repti_core_db # Auth, config, billing, events, telemetry
├── repti_animal_db # Animals, lineage, genetics, taxonomy, breeding
├── repti_commerce_db # Marketplace, sales, transactions, inventory
├── repti_media_db # File metadata, rendering, embed configurations
├── repti_community_db # Search indexes, notifications, community
└── repti_ops_db # Admin tools, audit logs, integrations, system logs
Service Schema Patterns
Each service owns its database schema completely, following these patterns:
- Multi-Tenant Isolation: All tables include tenant_id with row-level security policies
- Audit Trails: Automatic audit logging via triggers for compliance
- Event Integration: Domain events published on all state changes
- Performance Optimization: Service-specific indexes and materialized views
Detailed schemas are documented within each service's documentation to preserve a single source of truth; see the individual service docs for complete table definitions and relationships.
Snowflake Analytics Integration
Strategic Data Warehouse Integration
Snowflake provides advanced analytics capabilities that complement our operational PostgreSQL databases.
Implementation Strategy
Phase 1: Analytics Foundation (Months 13-18)
- Cross-Service Analytics: Unified view across all 6 service databases
- Breeding Intelligence: Advanced genetics analysis and outcome predictions
- Business Metrics: Real-time dashboards for growth and engagement
Phase 2: Operational Analytics (Months 19-24)
- Real-time Streaming: Snowpipe integration with SNS/SQS event streams
- Historical Analysis: Long-term trends in breeding success and market patterns
- Compliance Reporting: Automated regulatory and audit reports
Phase 3: Advanced Features (Months 25+)
- Machine Learning: Snowflake ML for genetics prediction models
- Global Distribution: Multi-region data sharing for international expansion
- Data Marketplace: Share anonymized breeding data with research partners
Key Benefits for ReptiDex
Genetics & Breeding Analytics:
- Complex lineage calculations across generations
- Breeding outcome predictions based on historical data
- Genetic diversity analysis and recommendations
Business Intelligence:
- Organization-scoped analytics with multi-tenant security
- Revenue analytics across subscription tiers and regions
- User engagement and retention analysis
Operational Insights:
- Real-time system performance monitoring
- Event-driven alerts and anomaly detection
- Cost optimization and resource planning
Integration Examples
Real-time Breeding Analytics:
-- Snowflake view aggregating cross-service breeding data
CREATE VIEW breeding_success_analysis AS
SELECT
b.organization_id,
b.species,
b.morph_combination,
COUNT(c.id) as total_clutches,
AVG(c.hatch_rate) as avg_hatch_rate,
AVG(c.genetic_diversity_score) as avg_diversity
FROM business_events.breeding_pairs b
JOIN business_events.clutch_outcomes c ON b.id = c.pairing_id
WHERE b.event_timestamp >= CURRENT_DATE - 365
GROUP BY b.organization_id, b.species, b.morph_combination;
Cross-Service User Journey Analysis:
-- User engagement across all services
-- Revenue is pre-aggregated per user so the multi-way join fan-out does not inflate the SUM
CREATE TABLE user_journey_analytics AS
SELECT
    u.user_id,
    u.organization_id,
    COUNT(DISTINCT a.animal_id) AS animals_managed,
    COUNT(DISTINCT s.listing_id) AS items_sold,
    MAX(COALESCE(t.total_revenue, 0)) AS total_revenue
FROM core_events.user_activities u
LEFT JOIN business_events.animal_activities a USING (user_id)
LEFT JOIN commerce_events.sales_activities s USING (user_id)
LEFT JOIN (
    SELECT user_id, SUM(transaction_amount) AS total_revenue
    FROM commerce_events.transactions
    GROUP BY user_id
) t USING (user_id)
GROUP BY u.user_id, u.organization_id;
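As a sketch of how an internal dashboard or reporting job might read these views, the snippet below uses snowflake-connector-python; the account, warehouse, database, and credential names are placeholders, not real ReptiDex infrastructure values.

# Sketch: read the breeding analytics view from Snowflake (connection values are placeholders).
import os
import snowflake.connector

conn = snowflake.connector.connect(
    account=os.environ["SNOWFLAKE_ACCOUNT"],
    user=os.environ["SNOWFLAKE_USER"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
    warehouse="ANALYTICS_WH",       # assumed warehouse name
    database="REPTIDEX_ANALYTICS",  # assumed database name
)
try:
    cur = conn.cursor()
    cur.execute(
        "SELECT species, avg_hatch_rate FROM breeding_success_analysis "
        "WHERE organization_id = %s ORDER BY avg_hatch_rate DESC LIMIT 10",
        ("org_123",),
    )
    for species, hatch_rate in cur.fetchall():
        print(species, hatch_rate)
finally:
    conn.close()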
Caching Architecture
Redis Cache Layers
Caching is layered: a small in-process (L1) cache for hot data, Redis as the shared (L2) cache, and PostgreSQL as the source of truth when both miss.
Cache Patterns
Pedigree Caching
# Cache pedigree tree data with hierarchical keys.
# Assumes module-level `app_cache` (in-process) and `redis` clients plus a
# build_pedigree_tree() data-access helper.
import json

def get_pedigree_tree(animal_id: str, depth: int = 4) -> dict:
    cache_key = f"pedigree:{animal_id}:depth:{depth}"

    # Try L1 cache (in-memory)
    if cached := app_cache.get(cache_key):
        return cached

    # Try L2 cache (Redis)
    if cached := redis.get(cache_key):
        data = json.loads(cached)
        app_cache.set(cache_key, data, ttl=300)  # 5 minutes
        return data

    # Generate from database
    data = build_pedigree_tree(animal_id, depth)

    # Cache in both layers
    redis.setex(cache_key, 3600, json.dumps(data))  # 1 hour
    app_cache.set(cache_key, data, ttl=300)
    return data
Search Result Caching
# Cache search results with a dynamic TTL based on result count.
# Assumes the same `redis` client and a generate_search_cache_key() helper.
import json
from datetime import datetime

def cache_search_results(query: str, filters: dict, results: list):
    cache_key = generate_search_cache_key(query, filters)

    # Dynamic TTL: large result sets change slowly, small ones churn quickly
    ttl = 300 if len(results) > 100 else 60  # 5 min for stable, 1 min for volatile

    redis.setex(cache_key, ttl, json.dumps({
        'results': results,
        'cached_at': datetime.utcnow().isoformat(),
        'total_count': len(results)
    }))
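The matching read path is a minimal lookup against the same key; a sketch using the same `redis` client and key helper:

# Companion read path for the cache above (sketch; same `redis` client and key helper).
import json

def get_cached_search_results(query: str, filters: dict) -> list | None:
    cache_key = generate_search_cache_key(query, filters)
    cached = redis.get(cache_key)
    if cached is None:
        return None  # caller falls through to the database search
    payload = json.loads(cached)
    return payload['results']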
Search Architecture
PostgreSQL Full-Text Search Configuration
-- Extensions and helpers required by the search indexes
CREATE EXTENSION IF NOT EXISTS pg_trgm;

-- array_to_string() is only STABLE, so index expressions use an IMMUTABLE wrapper
CREATE OR REPLACE FUNCTION animal.morph_text(text[])
RETURNS text LANGUAGE sql IMMUTABLE AS
$$ SELECT array_to_string($1, ' ') $$;

-- Enhanced full-text search index for animals
CREATE INDEX idx_animals_search_fts
ON animal.animals USING gin(
    to_tsvector('english',
        name || ' ' ||
        species || ' ' ||
        animal.morph_text(morph) || ' ' ||
        coalesce(description, '')
    )
);

-- Weighted search with ranking (extra parentheses are required for an expression index)
CREATE INDEX idx_animals_search_weighted
ON animal.animals USING gin((
    setweight(to_tsvector('english', name), 'A') ||
    setweight(to_tsvector('english', species), 'B') ||
    setweight(to_tsvector('english', animal.morph_text(morph)), 'B') ||
    setweight(to_tsvector('english', coalesce(description, '')), 'C')
));

-- Species (trigram) and morph (array) searches
CREATE INDEX idx_animals_species_morph
ON animal.animals USING gin(species gin_trgm_ops, morph);

-- Geographic search for location-based queries
CREATE INDEX idx_animals_location
ON animal.animals USING gist(location);

-- Multi-column index for filtered searches
CREATE INDEX idx_animals_filtered_search
ON animal.animals(tenant_id, status, visibility, species)
WHERE status = 'active';
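Queries rank against the same weighted expression so the planner can use idx_animals_search_weighted. A sketch of the read path, assuming an asyncpg connection and using websearch_to_tsquery to parse raw user input (animal.morph_text is the immutable helper defined with the indexes above):

# Sketch: ranked full-text search over animal.animals (asyncpg connection assumed).
SEARCH_SQL = """
SELECT id, name, species,
       ts_rank(
           setweight(to_tsvector('english', name), 'A') ||
           setweight(to_tsvector('english', species), 'B') ||
           setweight(to_tsvector('english', animal.morph_text(morph)), 'B') ||
           setweight(to_tsvector('english', coalesce(description, '')), 'C'),
           websearch_to_tsquery('english', $1)
       ) AS rank
FROM animal.animals
WHERE (setweight(to_tsvector('english', name), 'A') ||
       setweight(to_tsvector('english', species), 'B') ||
       setweight(to_tsvector('english', animal.morph_text(morph)), 'B') ||
       setweight(to_tsvector('english', coalesce(description, '')), 'C'))
      @@ websearch_to_tsquery('english', $1)
  AND tenant_id = $2
  AND status = 'active'
ORDER BY rank DESC
LIMIT 25
"""

async def search_animals(conn, query: str, tenant_id: str):
    # Returns the 25 best-ranked active animals for this tenant
    return await conn.fetch(SEARCH_SQL, query, tenant_id)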
Search Indexing Strategy
# Event-driven search indexing in the repti-community service: animal data is
# denormalized into a PostgreSQL search table on every animal-updated event.
# Assumes a `repti_animal_service` cross-service client and an `engagement_db` handle.
from datetime import datetime

async def handle_animal_updated(event: DomainEvent):
    # Fetch the latest animal state via a cross-service call
    animal_data = await repti_animal_service.get_animal(event.aggregate_id)

    # Upsert into the denormalized search index table
    search_query = """
        INSERT INTO search.animal_search_index (
            id, tenant_id, name, species, morphs, genetics_text,
            location, price, status, visibility, search_vector, updated_at
        ) VALUES (
            %(id)s, %(tenant_id)s, %(name)s, %(species)s, %(morphs)s, %(genetics_text)s,
            %(location)s, %(price)s, %(status)s, %(visibility)s,
            to_tsvector('english', %(search_text)s), %(updated_at)s
        )
        ON CONFLICT (id) DO UPDATE SET
            name = EXCLUDED.name,
            species = EXCLUDED.species,
            morphs = EXCLUDED.morphs,
            price = EXCLUDED.price,
            status = EXCLUDED.status,
            visibility = EXCLUDED.visibility,
            search_vector = EXCLUDED.search_vector,
            updated_at = EXCLUDED.updated_at
    """
    search_text = f"{animal_data.name} {animal_data.species} {' '.join(animal_data.morphs)}"

    await engagement_db.execute(search_query, {
        'id': animal_data.id,
        'tenant_id': animal_data.tenant_id,
        'name': animal_data.name,
        'species': animal_data.species,
        'morphs': animal_data.morphs,
        'genetics_text': serialize_genetics(animal_data.genetics),
        'location': animal_data.location,
        'price': animal_data.price if animal_data.for_sale else None,
        'status': animal_data.status,
        'visibility': animal_data.visibility,
        'search_text': search_text,
        'updated_at': datetime.utcnow()
    })
Event Sourcing and CQRS
Event Store Design
-- Event store in repti_core_db (repti-core service owns events)
CREATE SCHEMA events;
CREATE TABLE events.domain_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type TEXT NOT NULL,
aggregate_id TEXT NOT NULL,
event_type TEXT NOT NULL,
event_version INTEGER NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB,
occurred_at TIMESTAMPTZ DEFAULT now(),
CONSTRAINT events_ordering UNIQUE (aggregate_id, event_version)
);
-- Optimized indexes for event replay
CREATE INDEX idx_events_aggregate ON events.domain_events(aggregate_id, event_version);
CREATE INDEX idx_events_type_time ON events.domain_events(event_type, occurred_at);
CREATE INDEX idx_events_occurred_at ON events.domain_events(occurred_at);
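Aggregates are rebuilt by replaying their events in event_version order. A minimal sketch, assuming an asyncpg connection and an aggregate object with an apply() method (both illustrative, not fixed APIs):

# Sketch: rebuild an aggregate by replaying its events in version order.
# `conn` is an asyncpg connection; `aggregate.apply()` is a hypothetical domain method.
import json

async def load_aggregate(conn, aggregate_id: str, aggregate):
    rows = await conn.fetch(
        """
        SELECT event_type, event_version, event_data
        FROM events.domain_events
        WHERE aggregate_id = $1
        ORDER BY event_version
        """,
        aggregate_id,
    )
    for row in rows:
        event_data = row["event_data"]
        # asyncpg returns jsonb as a string unless a custom codec is registered
        if isinstance(event_data, str):
            event_data = json.loads(event_data)
        aggregate.apply(row["event_type"], event_data)
    return aggregate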
Event Publishing Pattern
# Transactional outbox pattern: events are stored alongside the business change,
# then published to SNS only after the commit succeeds. A relay/retry job should
# re-publish any stored events whose SNS publish failed (delivery is at-least-once).
from typing import List

class DomainEventPublisher:
    def __init__(self, db_session, sns_client):
        self.db_session = db_session
        self.sns_client = sns_client

    async def publish_events(self, events: List[DomainEvent]):
        # 1. Store events in the database (same transaction as the state change)
        for event in events:
            await self.store_event(event)

        # 2. Commit the database transaction
        await self.db_session.commit()

        # 3. Publish to SNS (only after a successful commit)
        for event in events:
            await self.publish_to_sns(event)

    async def store_event(self, event: DomainEvent):
        await self.db_session.execute(
            """
            INSERT INTO events.domain_events
                (aggregate_type, aggregate_id, event_type, event_version, event_data, metadata)
            VALUES (:aggregate_type, :aggregate_id, :event_type, :version, :data, :metadata)
            """,
            {
                'aggregate_type': event.aggregate_type,
                'aggregate_id': event.aggregate_id,
                'event_type': event.event_type,
                'version': event.version,
                'data': event.data,
                'metadata': event.metadata
            }
        )
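On the consuming side, services read the SNS-fanned-out events from their SQS queues. A hedged sketch with boto3 follows; the queue URL and handle_event() are placeholders, and handlers must be idempotent because SQS delivers at-least-once.

# Sketch: consume domain events from an SQS queue (SNS -> SQS fan-out).
# QUEUE_URL and handle_event() are placeholders; processing must be idempotent.
import json
import boto3

sqs = boto3.client("sqs")
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/repti-community-events"

def poll_once():
    response = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20,  # long polling
    )
    for message in response.get("Messages", []):
        envelope = json.loads(message["Body"])    # SNS envelope (non-raw delivery)
        event = json.loads(envelope["Message"])   # original domain event payload
        handle_event(event)                       # placeholder handler
        sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=message["ReceiptHandle"])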
Data Security and Privacy
Encryption Strategy
-- Column-level encryption for sensitive data
CREATE EXTENSION IF NOT EXISTS pgcrypto;
-- Encrypted columns with application-level keys
CREATE TABLE auth.user_profiles (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL REFERENCES auth.users(id),
phone_encrypted TEXT, -- pgp_sym_encrypt(phone, app_key)
address_encrypted TEXT,
notes_encrypted TEXT,
created_at TIMESTAMPTZ DEFAULT now()
);
-- Field-level privacy controls
CREATE TABLE animal.privacy_settings (
animal_id TEXT PRIMARY KEY REFERENCES animal.animals(id),
visibility TEXT DEFAULT 'private', -- public, organization, private
fields_public TEXT[], -- which fields are publicly visible
fields_organization TEXT[], -- which fields are visible to org members
created_at TIMESTAMPTZ DEFAULT now()
);
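Encryption and decryption happen in SQL with an application-supplied key. A sketch of the application side, assuming psycopg (v3) and a key read from the environment; key management itself is out of scope here.

# Sketch: write and read an encrypted phone number via pgcrypto (psycopg assumed).
# APP_ENCRYPTION_KEY would come from a secrets manager in practice.
import os
import psycopg

APP_KEY = os.environ["APP_ENCRYPTION_KEY"]

def save_phone(conn: psycopg.Connection, profile_id: str, phone: str) -> None:
    conn.execute(
        "UPDATE auth.user_profiles "
        "SET phone_encrypted = pgp_sym_encrypt(%s, %s) WHERE id = %s",
        (phone, APP_KEY, profile_id),
    )

def load_phone(conn: psycopg.Connection, profile_id: str) -> str | None:
    row = conn.execute(
        "SELECT pgp_sym_decrypt(phone_encrypted::bytea, %s) "
        "FROM auth.user_profiles WHERE id = %s",
        (APP_KEY, profile_id),
    ).fetchone()
    return row[0] if row else None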
Audit Trail Implementation
CREATE SCHEMA audit;

-- Comprehensive audit log
CREATE TABLE audit.data_changes (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    table_name TEXT NOT NULL,
    record_id TEXT NOT NULL,
    action TEXT NOT NULL, -- INSERT, UPDATE, DELETE
    old_values JSONB,
    new_values JSONB,
    changed_fields TEXT[],
    user_id TEXT,
    tenant_id TEXT,
    session_id TEXT,
    ip_address INET,
    user_agent TEXT,
    occurred_at TIMESTAMPTZ DEFAULT now()
);

-- Trigger function for automatic audit logging
CREATE OR REPLACE FUNCTION audit.log_data_change()
RETURNS TRIGGER AS $$
BEGIN
    INSERT INTO audit.data_changes (
        table_name, record_id, action, old_values, new_values, changed_fields,
        user_id, tenant_id, session_id, ip_address
    ) VALUES (
        TG_TABLE_NAME,
        COALESCE(NEW.id, OLD.id),
        TG_OP,
        CASE WHEN TG_OP IN ('UPDATE', 'DELETE') THEN to_jsonb(OLD) ELSE NULL END,
        CASE WHEN TG_OP IN ('INSERT', 'UPDATE') THEN to_jsonb(NEW) ELSE NULL END,
        CASE WHEN TG_OP = 'UPDATE'
            THEN ARRAY(SELECT key FROM jsonb_each(to_jsonb(NEW))
                       WHERE value IS DISTINCT FROM (to_jsonb(OLD) -> key))
            ELSE NULL END,
        current_setting('app.current_user_id', true),
        current_setting('app.current_tenant_id', true),
        current_setting('app.current_session_id', true),
        nullif(current_setting('app.client_ip', true), '')::inet
    );
    RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;

-- Attach the trigger to each audited table, for example:
CREATE TRIGGER animals_audit
AFTER INSERT OR UPDATE OR DELETE ON animal.animals
FOR EACH ROW EXECUTE FUNCTION audit.log_data_change();
Backup and Recovery
Backup Strategy
# PostgreSQL backup configuration
backup_schedule:
  full_backup:
    frequency: "daily"
    time: "02:00 UTC"
    retention: "30 days"
  incremental_backup:
    frequency: "every 4 hours"
    retention: "7 days"
  transaction_log_backup:
    frequency: "every 15 minutes"
    retention: "24 hours"

recovery_objectives:
  rto: "4 hours"      # Recovery Time Objective
  rpo: "15 minutes"   # Recovery Point Objective
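Scheduled backups are handled by RDS; for ad-hoc safety points, for example before a risky migration, a manual snapshot can be taken. A sketch with boto3 follows; the instance identifier and naming scheme are placeholders.

# Sketch: take a manual RDS snapshot before a risky change (identifiers are placeholders).
import boto3
from datetime import datetime, timezone

rds = boto3.client("rds")

def snapshot_before_migration(instance_id: str = "repti-animal-db") -> str:
    snapshot_id = f"{instance_id}-pre-migration-{datetime.now(timezone.utc):%Y%m%d%H%M}"
    rds.create_db_snapshot(
        DBSnapshotIdentifier=snapshot_id,
        DBInstanceIdentifier=instance_id,
    )
    return snapshot_id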
Point-in-Time Recovery
#!/bin/bash
# Example point-in-time recovery procedure

# Restore from base backup
pg_basebackup -D /var/lib/postgresql/recovery \
    -h backup-server \
    -U postgres \
    --write-recovery-conf

# Configure recovery target
echo "recovery_target_time = '2024-01-15 14:30:00'" >> /var/lib/postgresql/recovery/postgresql.conf
echo "recovery_target_action = 'promote'" >> /var/lib/postgresql/recovery/postgresql.conf

# Start recovery process
systemctl start postgresql-recovery
Database Optimization
-- Optimized indexes for common query patterns
CREATE INDEX CONCURRENTLY idx_animals_tenant_species
ON animal.animals(tenant_id, species)
WHERE status = 'active';
CREATE INDEX CONCURRENTLY idx_animals_search
ON animal.animals USING gin(to_tsvector('english', name || ' ' || coalesce(description, '')));
-- Partitioning for large tables
-- (requires audit.data_changes to be declared with PARTITION BY RANGE (occurred_at))
CREATE TABLE audit.data_changes_2024 PARTITION OF audit.data_changes
    FOR VALUES FROM ('2024-01-01') TO ('2025-01-01');

-- Query optimization with materialized views
CREATE MATERIALIZED VIEW analytics.daily_stats AS
SELECT
    date_trunc('day', created_at) AS date,
    tenant_id,
    count(*) AS animals_created,
    count(*) FILTER (WHERE for_sale = true) AS listings_created
FROM animal.animals
WHERE created_at >= current_date - interval '90 days'
GROUP BY date_trunc('day', created_at), tenant_id;

-- Unique index enables REFRESH MATERIALIZED VIEW CONCURRENTLY
CREATE UNIQUE INDEX ON analytics.daily_stats(date, tenant_id);
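Materialized views need periodic refreshes; with the unique index above, REFRESH MATERIALIZED VIEW CONCURRENTLY avoids blocking readers. A minimal sketch of the refresh task, assuming an asyncpg connection and an external scheduler (cron, Celery beat, or similar):

# Sketch: refresh analytics.daily_stats without blocking readers (asyncpg connection assumed).
# Run from a scheduler; CONCURRENTLY requires the unique index defined above.
async def refresh_daily_stats(conn):
    await conn.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY analytics.daily_stats")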
Connection Pooling
# Application-side connection settings pointing at PgBouncer (port 6432),
# which provides server-side connection pooling.
import os

DATABASE_CONFIG = {
    'host': 'pgbouncer-host',
    'port': 6432,
    'database': 'reptidex_core',
    'user': 'app_user',
    'password': os.getenv('DB_PASSWORD'),
    'pool_size': 20,
    'max_overflow': 30,
    'pool_timeout': 30,
    'pool_recycle': 3600
}
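These settings map onto an application-side pool layered in front of PgBouncer. A sketch using SQLAlchemy's async engine with the asyncpg driver (both assumed, not mandated by this document); with PgBouncer in transaction mode, asyncpg's prepared-statement cache must be disabled.

# Sketch: build the application engine from DATABASE_CONFIG (SQLAlchemy + asyncpg assumed).
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    f"postgresql+asyncpg://{DATABASE_CONFIG['user']}:{DATABASE_CONFIG['password']}"
    f"@{DATABASE_CONFIG['host']}:{DATABASE_CONFIG['port']}/{DATABASE_CONFIG['database']}",
    pool_size=DATABASE_CONFIG['pool_size'],
    max_overflow=DATABASE_CONFIG['max_overflow'],
    pool_timeout=DATABASE_CONFIG['pool_timeout'],
    pool_recycle=DATABASE_CONFIG['pool_recycle'],
    # With PgBouncer in transaction mode, disable asyncpg's prepared-statement cache
    connect_args={"statement_cache_size": 0},
)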
Monitoring and Alerting
Database Monitoring
-- Performance monitoring queries
SELECT
schemaname,
tablename,
n_tup_ins as inserts,
n_tup_upd as updates,
n_tup_del as deletes,
n_live_tup as live_rows,
n_dead_tup as dead_rows
FROM pg_stat_user_tables
ORDER BY n_tup_ins + n_tup_upd + n_tup_del DESC;
-- Long running queries
SELECT
pid,
query_start,
state,
query
FROM pg_stat_activity
WHERE state = 'active'
AND query_start < now() - interval '5 minutes';
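These checks can also feed custom metrics. The sketch below publishes dead-row counts to CloudWatch, assuming an asyncpg connection and boto3; the metric namespace is a placeholder.

# Sketch: publish dead-row counts as a custom CloudWatch metric (namespace is a placeholder).
import boto3

cloudwatch = boto3.client("cloudwatch")

async def publish_bloat_metrics(conn, database_name: str):
    rows = await conn.fetch(
        "SELECT relname, n_dead_tup FROM pg_stat_user_tables "
        "ORDER BY n_dead_tup DESC LIMIT 10"
    )
    cloudwatch.put_metric_data(
        Namespace="ReptiDex/Database",
        MetricData=[
            {
                "MetricName": "DeadRows",
                "Dimensions": [
                    {"Name": "Database", "Value": database_name},
                    {"Name": "Table", "Value": row["relname"]},
                ],
                "Value": float(row["n_dead_tup"]),
                "Unit": "Count",
            }
            for row in rows
        ],
    )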
Automated Alerting
# CloudWatch alarms for RDS
alerts:
  - name: "High Database CPU"
    metric: "CPUUtilization"
    threshold: 80
    duration: "5 minutes"
    action: "scale_up_read_replica"
  - name: "Low Free Storage"
    metric: "FreeStorageSpace"
    threshold: "20GB"
    action: "expand_storage"
  - name: "High Connection Count"
    metric: "DatabaseConnections"
    threshold: 80
    action: "investigate_connection_leaks"
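As an example of how one of these alarms could be provisioned programmatically, the sketch below creates the "High Database CPU" alarm with boto3; the SNS topic ARN and DB instance identifier are placeholders.

# Sketch: create the "High Database CPU" alarm (topic ARN and instance ID are placeholders).
import boto3

cloudwatch = boto3.client("cloudwatch")

cloudwatch.put_metric_alarm(
    AlarmName="reptidex-high-database-cpu",
    Namespace="AWS/RDS",
    MetricName="CPUUtilization",
    Dimensions=[{"Name": "DBInstanceIdentifier", "Value": "repti-core-db"}],
    Statistic="Average",
    Period=300,              # 5-minute evaluation window
    EvaluationPeriods=1,
    Threshold=80.0,
    ComparisonOperator="GreaterThanThreshold",
    AlarmActions=["arn:aws:sns:us-east-1:123456789012:reptidex-db-alerts"],
)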
Future Considerations
Database Evolution Strategy
- Service-Specific Scaling: Independent read replicas per service database based on usage patterns
- Tenant Sharding: Horizontal partitioning by organization for massive scale (100K+ users)
- Polyglot Persistence: Specialized databases per service (time-series for telemetry, graph DB for lineage)
- Geographic Distribution: Multi-region database clusters with cross-region replication
Service Database Evolution
- repti_core_db: User federation and global identity management
- repti_animal_db: Graph database migration for complex lineage queries
- repti_commerce_db: Time-series for transaction analytics and fraud detection
- repti_media_db: Object storage metadata with CDN integration
- repti_community_db: Search optimization with specialized indexing
- repti_ops_db: Long-term archival and compliance data retention
Advanced Data Features
- Cross-Service Analytics: Data lake integration for business intelligence
- Real-Time Collaboration: Event streaming for multi-user breeding sessions
- Machine Learning Pipelines: Integrated ML for genetics prediction and breeding recommendations
- Global Data Sync: Multi-region eventual consistency for international expansion