Basic Usage

Master the fundamentals of Akron ORM with comprehensive examples covering all core concepts and operations.

Core Concepts

This guide covers the fundamental concepts you need to master Akron ORM. You'll learn about connections, models, schemas, and all CRUD operations with practical examples.

📚 What You'll Master

  • Database connections and configuration
  • Model definition with Pydantic integration
  • Schema design and table relationships
  • Complete CRUD operations
  • Query patterns and filtering
  • Error handling and best practices

Database Connections

Connection Basics

Akron selects the database driver from the connection URL: the URL scheme (sqlite://, mysql://, postgres://, or mongodb://) determines which backend is used:

Connection Examples
1from akron import Akron
2
3# SQLite - File-based database
4db = Akron("sqlite:///my_app.db")
5
6# SQLite - In-memory database (testing)
7db = Akron("sqlite:///:memory:")
8
9# MySQL - Relational database server
10db = Akron("mysql://username:password@localhost:3306/database")
11
12# PostgreSQL - Advanced relational database
13db = Akron("postgres://username:password@localhost:5432/database")
14
15# MongoDB - Document database
16db = Akron("mongodb://localhost:27017/database")
17
18# Close connection when done
19db.close()

Connection Management

Use context managers for automatic connection cleanup:

Context Manager Usage
1from akron import Akron
2
3# Recommended: Automatic connection management
4with Akron("sqlite:///app.db") as db:
5 # Your database operations here
6 result = db.find("users", {"active": True})
7 # Connection automatically closed when exiting the block
8
9# Environment-based configuration
10import os
11
12DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///default.db")
13with Akron(DATABASE_URL) as db:
14 # Production-ready connection handling
15 pass

Connection Parameters

Add connection parameters for production configurations:

Advanced Connection Options
1# MySQL with connection options
2db = Akron("mysql://user:pass@host:3306/db?charset=utf8mb4&autocommit=true")
3
4# PostgreSQL with SSL
5db = Akron("postgres://user:pass@host:5432/db?sslmode=require")
6
7# MongoDB with authentication and options
8db = Akron("mongodb://user:pass@host:27017/db?authSource=admin&ssl=true")
9
10# Connection pooling (for supported databases)
11db = Akron("mysql://user:pass@host:3306/db?pool_size=10&max_overflow=20")

Model Definition

Basic Models

Define your data models using Pydantic for automatic validation and type safety:

Simple Model Definition
1from pydantic import BaseModel, EmailStr, validator
2from akron.models import ModelMixin
3from datetime import datetime
4from typing import Optional
5
6# Basic user model
7class User(BaseModel, ModelMixin):
8 id: int
9 username: str
10 email: EmailStr # Automatic email validation
11 age: Optional[int] = None
12 is_active: bool = True
13 created_at: datetime
14
15 # Custom validation
16 @validator('age')
17 def validate_age(cls, v):
18 if v is not None and (v < 0 or v > 150):
19 raise ValueError('Age must be between 0 and 150')
20 return v
21
22 @validator('username')
23 def validate_username(cls, v):
24 if len(v) < 3:
25 raise ValueError('Username must be at least 3 characters')
26 return v
27
28# Product model with more complex fields
29class Product(BaseModel, ModelMixin):
30 id: int
31 name: str
32 description: Optional[str] = None
33 price: float
34 category: str
35 in_stock: bool = True
36 tags: list = [] # Will be stored as JSON string
37 created_at: datetime
38 updated_at: Optional[datetime] = None
39
40 @validator('price')
41 def validate_price(cls, v):
42 if v < 0:
43 raise ValueError('Price cannot be negative')
44 return v

Advanced Model Features

Use advanced Pydantic features for robust data handling:

Advanced Model Examples
1from pydantic import BaseModel, Field, root_validator
2from enum import Enum
3from typing import List, Dict, Union
4
5class OrderStatus(str, Enum):
6 PENDING = "pending"
7 PROCESSING = "processing"
8 SHIPPED = "shipped"
9 DELIVERED = "delivered"
10 CANCELLED = "cancelled"
11
12class Address(BaseModel):
13 street: str
14 city: str
15 state: str
16 zip_code: str = Field(..., regex=r'\d{5}(-\d{4})?')
17 country: str = "USA"
18
19class Order(BaseModel, ModelMixin):
20 id: int
21 customer_id: int
22 status: OrderStatus = OrderStatus.PENDING
23 items: List[Dict[str, Union[str, int, float]]]
24 total_amount: float = Field(..., gt=0, description="Order total in USD")
25 shipping_address: Address
26 billing_address: Optional[Address] = None
27 notes: Optional[str] = Field(None, max_length=500)
28 created_at: datetime
29
30 @root_validator
31 def validate_addresses(cls, values):
32 # Use shipping address as billing if not provided
33 if not values.get('billing_address'):
34 values['billing_address'] = values.get('shipping_address')
35 return values
36
37 @validator('items')
38 def validate_items(cls, v):
39 if not v:
40 raise ValueError('Order must have at least one item')
41 return v
42
43# Usage example
44address = Address(
45 street="123 Main St",
46 city="Anytown",
47 state="CA",
48 zip_code="12345"
49)
50
51order = Order(
52 id=1,
53 customer_id=101,
54 items=[
55 {"product": "Laptop", "quantity": 1, "price": 999.99},
56 {"product": "Mouse", "quantity": 2, "price": 29.99}
57 ],
58 total_amount=1059.97,
59 shipping_address=address,
60 created_at=datetime.now()
61)

Schema Management

Schema Definition

Define table schemas using simple type mappings or let Akron infer them from your models:

Schema Definition Methods
1from akron import Akron
2
3db = Akron("sqlite:///app.db")
4
5# Method 1: Explicit schema definition
6user_schema = {
7 "id": "int",
8 "username": "str",
9 "email": "str",
10 "age": "int",
11 "is_active": "bool",
12 "created_at": "str" # ISO datetime string
13}
14
15db.create_table("users", user_schema)
16
17# Method 2: Schema with constraints
18product_schema = {
19 "id": "int",
20 "name": "str",
21 "price": "float",
22 "category_id": "int->categories.id", # Foreign key
23 "created_at": "str"
24}
25
26db.create_table("products", product_schema)
27
28# Method 3: Let model create its own table (recommended)
29User.create_table(db)
30Product.create_table(db)
31
32# Method 4: Conditional table creation
33if not db.table_exists("users"):
34 User.create_table(db)

Type Mapping

Understand how Python types map to database types:

Type System
1# Basic type mappings across databases
2type_examples = {
3 # Numeric types
4 "id": "int", # INTEGER/INT/NumberLong
5 "price": "float", # REAL/DOUBLE/Double
6
7 # Text types
8 "name": "str", # TEXT/VARCHAR/String
9 "description": "str", # TEXT/TEXT/String
10
11 # Boolean
12 "active": "bool", # INTEGER(0,1)/BOOLEAN/Boolean
13
14 # Date/Time
15 "created_at": "str", # TEXT(ISO)/DATETIME/ISODate
16
17 # JSON/Complex (MongoDB native, others as JSON strings)
18 "metadata": "dict", # TEXT(JSON)/JSON/Object
19 "tags": "list", # TEXT(JSON)/JSON/Array
20}
21
22# Database-specific optimizations
23mysql_schema = {
24 "id": "int",
25 "username": "str", # Becomes VARCHAR(255)
26 "bio": "text", # Becomes TEXT for long content
27 "created_at": "datetime" # Becomes DATETIME
28}
29
30postgres_schema = {
31 "id": "int",
32 "data": "jsonb", # PostgreSQL native JSONB
33 "tags": "text[]", # PostgreSQL array type
34 "coords": "point" # PostgreSQL geometric type
35}
36
37mongo_schema = {
38 # MongoDB is schemaless, but Akron provides structure
39 "id": "int",
40 "nested": "dict", # Native object support
41 "array": "list", # Native array support
42 "geo": "geojson" # GeoJSON support
43}

CRUD Operations

Create Operations

Insert data using both raw dictionaries and model instances:

Create/Insert Examples
1from akron import Akron
2from datetime import datetime
3
4db = Akron("sqlite:///app.db")
5
6# Method 1: Insert raw dictionary
7user_data = {
8 "id": 1,
9 "username": "alice",
10 "email": "alice@example.com",
11 "age": 28,
12 "is_active": True,
13 "created_at": datetime.now().isoformat()
14}
15
16user_id = db.insert("users", user_data)
17print(f"Created user with ID: {user_id}")
18
19# Method 2: Insert using model (recommended)
20user = User(
21 id=2,
22 username="bob",
23 email="bob@example.com",
24 age=32,
25 created_at=datetime.now()
26)
27
28# Validate and insert
29try:
30 user_id = User.insert(db, user)
31 print(f"User {user.username} created successfully!")
32except ValidationError as e:
33 print(f"Validation failed: {e}")
34
35# Method 3: Bulk insert
36users = [
37 User(id=3, username="charlie", email="charlie@example.com", age=25, created_at=datetime.now()),
38 User(id=4, username="diana", email="diana@example.com", age=30, created_at=datetime.now()),
39 User(id=5, username="eve", email="eve@example.com", age=27, created_at=datetime.now())
40]
41
42for user in users:
43 User.insert(db, user)
44
45print(f"Bulk inserted {len(users)} users")

Read Operations

Query data with various filtering and sorting options:

Read/Query Examples
1# Method 1: Get all records
2all_users = User.select(db)
3print(f"Total users: {len(all_users)}")
4
5# Method 2: Get single record
6alice = User.find(db, {"username": "alice"})
7if alice:
8 print(f"Found: {alice.username} ({alice.email})")
9
10# Method 3: Filter with conditions
11active_users = User.select(db, where={"is_active": True})
12print(f"Active users: {len(active_users)}")
13
14# Method 4: Complex filtering (MongoDB-style operators)
15young_users = User.select(db, where={"age": {"$lt": 30}})
16senior_users = User.select(db, where={"age": {"$gte": 30}})
17
18# Method 5: Multiple conditions
19young_active = User.select(db, where={
20 "age": {"$lt": 30},
21 "is_active": True
22})
23
24# Method 6: Raw SQL/Query for complex operations
25if db.driver_type == "sqlite":
26 result = db.execute_raw("SELECT * FROM users WHERE age BETWEEN 25 AND 35")
27elif db.driver_type == "mongodb":
28 result = db.execute_raw("db.users.find({age: {$gte: 25, $lte: 35}})")
29
30# Method 7: Pagination
31page_1 = User.select(db, limit=10, offset=0)
32page_2 = User.select(db, limit=10, offset=10)
33
34# Method 8: Ordering (database-specific)
35if db.driver_type in ["sqlite", "mysql", "postgres"]:
36 ordered_users = db.execute_raw("SELECT * FROM users ORDER BY created_at DESC")
37elif db.driver_type == "mongodb":
38 ordered_users = db.execute_raw("db.users.find().sort({created_at: -1})")

Update Operations

Modify existing records with precise control:

Update Examples
1# Method 1: Update single field
2result = User.update(db, {"id": 1}, {"age": 29})
3if result:
4 print("User age updated successfully")
5
6# Method 2: Update multiple fields
7update_data = {
8 "email": "alice.johnson@example.com",
9 "age": 29,
10 "updated_at": datetime.now().isoformat()
11}
12User.update(db, {"username": "alice"}, update_data)
13
14# Method 3: Conditional updates
15# Deactivate users older than 65
16User.update(
17 db,
18 {"age": {"$gt": 65}},
19 {"is_active": False}
20)
21
22# Method 4: Increment/decrement (MongoDB-style)
23if db.driver_type == "mongodb":
24 # Increment age by 1
25 db.execute_raw("db.users.updateOne({username: 'alice'}, {$inc: {age: 1}})")
26
27# Method 5: Update with validation
28try:
29 user = User.find(db, {"id": 1})
30 if user:
31 user.age = 30
32 user.email = "newemail@example.com"
33 # Validate updated model
34 updated_user = User(**user.dict())
35 User.update(db, {"id": 1}, updated_user.dict())
36except ValidationError as e:
37 print(f"Update validation failed: {e}")
38
39# Method 6: Bulk updates
40# Activate all inactive users
41inactive_count = User.update(
42 db,
43 {"is_active": False},
44 {"is_active": True, "updated_at": datetime.now().isoformat()}
45)
46print(f"Activated {inactive_count} users")

Delete Operations

Remove records with safety checks and confirmation:

Delete Examples
1# Method 1: Delete by ID
2result = User.delete(db, {"id": 5})
3if result:
4 print("User deleted successfully")
5
6# Method 2: Delete by condition
7deleted_count = User.delete(db, {"is_active": False})
8print(f"Deleted {deleted_count} inactive users")
9
10# Method 3: Safe delete with confirmation
11def safe_delete_user(db, user_id):
12 # Check if user exists
13 user = User.find(db, {"id": user_id})
14 if not user:
15 print(f"User {user_id} not found")
16 return False
17
18 # Show user details for confirmation
19 print(f"About to delete: {user.username} ({user.email})")
20 confirm = input("Are you sure? (yes/no): ").lower()
21
22 if confirm == 'yes':
23 result = User.delete(db, {"id": user_id})
24 if result:
25 print(f"User {user.username} deleted successfully")
26 return True
27
28 print("Delete cancelled")
29 return False
30
31# Method 4: Soft delete (mark as deleted instead of removing)
32class SoftDeleteMixin:
33 def soft_delete(self, db, conditions):
34 return self.update(db, conditions, {
35 "is_deleted": True,
36 "deleted_at": datetime.now().isoformat()
37 })
38
39 def get_active(self, db, conditions=None):
40 conditions = conditions or {}
41 conditions["is_deleted"] = {"$ne": True}
42 return self.select(db, where=conditions)
43
44# Add soft delete to your model
45class User(BaseModel, ModelMixin, SoftDeleteMixin):
46 # ... existing fields ...
47 is_deleted: bool = False
48 deleted_at: Optional[str] = None
49
50# Usage
51User.soft_delete(db, {"id": 3}) # Soft delete
52active_users = User.get_active(db) # Get non-deleted users
53
54# Method 5: Delete with cascade (handle relationships)
55def delete_user_cascade(db, user_id):
56 # Delete user's posts first
57 Post.delete(db, {"author_id": user_id})
58
59 # Then delete the user
60 User.delete(db, {"id": user_id})
61
62 print(f"User {user_id} and all related data deleted")

Relationships and Joins

Modeling Relationships

Handle relationships between tables using foreign keys and helper methods:

Relationship Examples
1# Define related models
2class Author(BaseModel, ModelMixin):
3 id: int
4 name: str
5 email: str
6 bio: Optional[str] = None
7
8class Category(BaseModel, ModelMixin):
9 id: int
10 name: str
11 description: Optional[str] = None
12
13class Article(BaseModel, ModelMixin):
14 id: int
15 title: str
16 content: str
17 author_id: int # Foreign key to Author
18 category_id: int # Foreign key to Category
19 published: bool = False
20 created_at: datetime
21
22 # Helper methods for relationships
23 def get_author(self, db):
24 return Author.find(db, {"id": self.author_id})
25
26 def get_category(self, db):
27 return Category.find(db, {"id": self.category_id})
28
29 @classmethod
30 def get_by_author(cls, db, author_id):
31 return cls.select(db, where={"author_id": author_id})
32
33 @classmethod
34 def get_published_in_category(cls, db, category_id):
35 return cls.select(db, where={
36 "category_id": category_id,
37 "published": True
38 })
39
40# Usage examples
41db = Akron("sqlite:///blog.db")
42
43# Create tables
44Author.create_table(db)
45Category.create_table(db)
46Article.create_table(db)
47
48# Create related data
49tech_category = Category(id=1, name="Technology", description="Tech articles")
50Category.insert(db, tech_category)
51
52author = Author(id=1, name="John Doe", email="john@example.com")
53Author.insert(db, author)
54
55article = Article(
56 id=1,
57 title="Introduction to Akron ORM",
58 content="Akron is a powerful ORM...",
59 author_id=1,
60 category_id=1,
61 published=True,
62 created_at=datetime.now()
63)
64Article.insert(db, article)
65
66# Query relationships
67article = Article.find(db, {"id": 1})
68author = article.get_author(db)
69category = article.get_category(db)
70
71print(f"Article: {article.title}")
72print(f"Author: {author.name}")
73print(f"Category: {category.name}")
74
75# Get all articles by author
76john_articles = Article.get_by_author(db, 1)
77print(f"John has written {len(john_articles)} articles")

Manual Joins

Perform join operations using raw SQL, MongoDB aggregation pipelines, or database-agnostic application-level lookups:

Join Operations
1# SQL-based joins (for SQL databases)
2def get_articles_with_authors(db):
3 if db.driver_type in ["sqlite", "mysql", "postgres"]:
4 query = """
5 SELECT
6 a.id, a.title, a.content, a.published, a.created_at,
7 au.name as author_name, au.email as author_email,
8 c.name as category_name
9 FROM articles a
10 JOIN authors au ON a.author_id = au.id
11 JOIN categories c ON a.category_id = c.id
12 WHERE a.published = 1
13 ORDER BY a.created_at DESC
14 """
15 return db.execute_raw(query)
16
17 elif db.driver_type == "mongodb":
18 # MongoDB aggregation pipeline
19 pipeline = [
20 {"$match": {"published": True}},
21 {"$lookup": {
22 "from": "authors",
23 "localField": "author_id",
24 "foreignField": "id",
25 "as": "author"
26 }},
27 {"$lookup": {
28 "from": "categories",
29 "localField": "category_id",
30 "foreignField": "id",
31 "as": "category"
32 }},
33 {"$sort": {"created_at": -1}}
34 ]
35 return db.execute_raw(f"db.articles.aggregate({pipeline})")
36
37# Application-level joins (database agnostic)
38def get_article_details(db, article_id):
39 article = Article.find(db, {"id": article_id})
40 if not article:
41 return None
42
43 author = Author.find(db, {"id": article.author_id})
44 category = Category.find(db, {"id": article.category_id})
45
46 return {
47 "article": article.dict(),
48 "author": author.dict() if author else None,
49 "category": category.dict() if category else None
50 }
51
52# Batch loading for performance
53def get_articles_with_details(db, limit=10):
54 articles = Article.select(db, limit=limit)
55
56 # Get unique author and category IDs
57 author_ids = list(set(a.author_id for a in articles))
58 category_ids = list(set(a.category_id for a in articles))
59
60 # Batch load authors and categories
61 authors = {a.id: a for a in Author.select(db, where={"id": {"$in": author_ids}})}
62 categories = {c.id: c for c in Category.select(db, where={"id": {"$in": category_ids}})}
63
64 # Combine data
65 result = []
66 for article in articles:
67 result.append({
68 "article": article.dict(),
69 "author": authors.get(article.author_id, {}).dict() if article.author_id in authors else None,
70 "category": categories.get(article.category_id, {}).dict() if article.category_id in categories else None
71 })
72
73 return result

Error Handling

Common Error Patterns

Handle database errors gracefully with proper exception handling:

Error Handling Examples
1from akron.exceptions import AkronError
2from pydantic import ValidationError
3import logging
4
5# Setup logging
6logging.basicConfig(level=logging.INFO)
7logger = logging.getLogger(__name__)
8
9def safe_database_operation(db, operation_func, *args, **kwargs):
10 """Wrapper for safe database operations"""
11 try:
12 return operation_func(*args, **kwargs)
13 except ValidationError as e:
14 logger.error(f"Validation error: {e}")
15 return {"error": "Invalid data", "details": str(e)}
16 except AkronError as e:
17 logger.error(f"Database error: {e}")
18 return {"error": "Database operation failed", "details": str(e)}
19 except Exception as e:
20 logger.error(f"Unexpected error: {e}")
21 return {"error": "Operation failed", "details": str(e)}
22
23# Connection error handling
24def get_database_safely(db_url):
25 """Get database connection with error handling"""
26 try:
27 db = Akron(db_url)
28 # Test connection
29 if db.driver_type == "sqlite":
30 db.execute_raw("SELECT 1")
31 elif db.driver_type in ["mysql", "postgres"]:
32 db.execute_raw("SELECT 1")
33 elif db.driver_type == "mongodb":
34 db.execute_raw("db.runCommand({ping: 1})")
35
36 logger.info(f"Connected to {db.driver_type} database")
37 return db
38 except Exception as e:
39 logger.error(f"Failed to connect to database: {e}")
40 return None
41
42# CRUD with error handling
43def create_user_safely(db, user_data):
44 """Create user with comprehensive error handling"""
45 try:
46 # Validate data first
47 user = User(**user_data)
48
49 # Check if user already exists
50 existing = User.find(db, {"email": user.email})
51 if existing:
52 return {"error": "User with this email already exists"}
53
54 # Insert user
55 user_id = User.insert(db, user)
56 logger.info(f"User created successfully: {user.username}")
57
58 return {"success": True, "user_id": user_id, "user": user.dict()}
59
60 except ValidationError as e:
61 logger.error(f"User validation failed: {e}")
62 return {"error": "Invalid user data", "details": e.errors()}
63 except Exception as e:
64 logger.error(f"Failed to create user: {e}")
65 return {"error": "User creation failed", "details": str(e)}
66
67def update_user_safely(db, user_id, update_data):
68 """Update user with safety checks"""
69 try:
70 # Check if user exists
71 user = User.find(db, {"id": user_id})
72 if not user:
73 return {"error": f"User {user_id} not found"}
74
75 # Validate update data
76 current_data = user.dict()
77 current_data.update(update_data)
78 validated_user = User(**current_data)
79
80 # Perform update
81 result = User.update(db, {"id": user_id}, update_data)
82
83 if result:
84 logger.info(f"User {user_id} updated successfully")
85 return {"success": True, "user": validated_user.dict()}
86 else:
87 return {"error": "Update operation failed"}
88
89 except ValidationError as e:
90 return {"error": "Invalid update data", "details": e.errors()}
91 except Exception as e:
92 logger.error(f"Failed to update user {user_id}: {e}")
93 return {"error": "Update failed", "details": str(e)}
94
95# Transaction-like operations for multiple changes
96def transfer_article_ownership(db, article_id, new_author_id):
97 """Transfer article ownership after verifying that the article and the new author exist"""
98 try:
99 # Verify article exists
100 article = Article.find(db, {"id": article_id})
101 if not article:
102 return {"error": "Article not found"}
103
104 # Verify new author exists
105 new_author = Author.find(db, {"id": new_author_id})
106 if not new_author:
107 return {"error": "New author not found"}
108
109 old_author_id = article.author_id
110
111 # Update article ownership
112 result = Article.update(
113 db,
114 {"id": article_id},
115 {"author_id": new_author_id, "updated_at": datetime.now().isoformat()}
116 )
117
118 if result:
119 logger.info(f"Article {article_id} transferred from author {old_author_id} to {new_author_id}")
120 return {"success": True, "old_author": old_author_id, "new_author": new_author_id}
121 else:
122 return {"error": "Transfer failed"}
123
124 except Exception as e:
125 logger.error(f"Article transfer failed: {e}")
126 return {"error": "Transfer operation failed", "details": str(e)}
127
128# Usage examples
129db_url = "sqlite:///blog.db"
130db = get_database_safely(db_url)
131
132if db:
133 # Safe user creation
134 result = create_user_safely(db, {
135 "id": 1,
136 "username": "testuser",
137 "email": "test@example.com",
138 "age": 25,
139 "created_at": datetime.now()
140 })
141 print(result)
142
143 # Safe user update
144 result = update_user_safely(db, 1, {"age": 26})
145 print(result)
146else:
147 print("Failed to connect to database")

Best Practices

🔐 Security

1# Use environment variables for sensitive data
2import os
3from dotenv import load_dotenv
4
5load_dotenv()
6
7DATABASE_URL = os.getenv("DATABASE_URL")
8if not DATABASE_URL:
9 raise ValueError("DATABASE_URL environment variable is required")
10
11# Never hardcode credentials
12# ❌ Bad
13db = Akron("mysql://admin:password123@localhost/db")
14
15# ✅ Good
16db = Akron(DATABASE_URL)
17
18# Validate input data
19def create_user_endpoint(user_data):
20 try:
21 user = User(**user_data) # Pydantic validation
22 with Akron(DATABASE_URL) as db:
23 return User.insert(db, user)
24 except ValidationError as e:
25 return {"error": "Invalid data", "details": e.errors()}

⚡ Performance

1# Reuse a single lazily created connection instead of reconnecting for every operation
2class DatabaseManager:
3 def __init__(self, db_url):
4 self.db_url = db_url
5 self._connection = None
6
7 def get_connection(self):
8 if not self._connection:
9 self._connection = Akron(self.db_url)
10 return self._connection
11
12 def close(self):
13 if self._connection:
14 self._connection.close()
15
16# Batch operations for better performance
17def create_multiple_users(db, users_data):
18 users = [User(**data) for data in users_data]
19
20 # Validate all users first
21 for user in users:
22 user.dict() # Triggers validation
23
24 # Insert in batch
25 for user in users:
26 User.insert(db, user)
27
28# Use indexes for frequently queried fields (database-specific)
29def create_performance_indexes(db):
30 if db.driver_type in ["mysql", "postgres"]:
31 db.execute_raw("CREATE INDEX idx_users_email ON users(email)")
32 db.execute_raw("CREATE INDEX idx_articles_author ON articles(author_id)")
33 db.execute_raw("CREATE INDEX idx_articles_published ON articles(published, created_at)")

🧪 Testing

1import pytest
2from akron import Akron
3
4@pytest.fixture
5def test_db():
6 """Create test database"""
7 db = Akron("sqlite:///:memory:")
8 User.create_table(db)
9 yield db
10 db.close()
11
12@pytest.fixture
13def sample_user():
14 """Create sample user for testing"""
15 return User(
16 id=1,
17 username="testuser",
18 email="test@example.com",
19 age=25,
20 created_at=datetime.now()
21 )
22
23def test_user_creation(test_db, sample_user):
24 """Test user creation"""
25 user_id = User.insert(test_db, sample_user)
26 assert user_id is not None
27
28 retrieved_user = User.find(test_db, {"id": user_id})
29 assert retrieved_user.username == sample_user.username
30
31def test_user_validation():
32 """Test user validation"""
33 with pytest.raises(ValidationError):
34 User(
35 id=1,
36 username="ab", # Too short
37 email="invalid-email", # Invalid email
38 age=-5 # Invalid age
39 )

📊 Monitoring

1import time
2from functools import wraps
3
4def monitor_database_operations(func):
5 """Decorator to monitor database operations"""
6 @wraps(func)
7 def wrapper(*args, **kwargs):
8 start_time = time.time()
9 try:
10 result = func(*args, **kwargs)
11 duration = time.time() - start_time
12 logger.info(f"{func.__name__} completed in {duration:.3f}s")
13 return result
14 except Exception as e:
15 duration = time.time() - start_time
16 logger.error(f"{func.__name__} failed after {duration:.3f}s: {e}")
17 raise
18 return wrapper
19
20@monitor_database_operations
21def get_user_articles(db, user_id):
22 return Article.select(db, where={"author_id": user_id})
23
24# Database health check
25def check_database_health(db):
26 """Check if database is healthy"""
27 try:
28 start_time = time.time()
29
30 if db.driver_type == "sqlite":
31 db.execute_raw("SELECT 1")
32 elif db.driver_type in ["mysql", "postgres"]:
33 db.execute_raw("SELECT 1")
34 elif db.driver_type == "mongodb":
35 db.execute_raw("db.runCommand({ping: 1})")
36
37 response_time = time.time() - start_time
38
39 return {
40 "healthy": True,
41 "response_time": response_time,
42 "database_type": db.driver_type
43 }
44 except Exception as e:
45 return {
46 "healthy": False,
47 "error": str(e),
48 "database_type": db.driver_type
49 }

Next Steps

You've mastered the basics of Akron ORM! Here's what to explore next: