MongoDB Support

A complete guide to using MongoDB with Akron ORM, a flexible document database well suited to modern applications and rapid development.

Overview

MongoDB is a document-oriented NoSQL database that stores data in flexible, JSON-like documents. It's designed for scalability, performance, and high availability, making it ideal for modern applications that need to handle diverse data types and rapid development cycles.

🍃 MongoDB Advantages

  • Flexible schema design with JSON-like documents
  • Horizontal scaling with automatic sharding
  • Rich query language with aggregation framework
  • High performance for read and write operations
  • Built-in replication and high availability
  • Native support for geospatial data

Prerequisites

Before using MongoDB with Akron, ensure you have the required dependency installed:

Install MongoDB Driver
# Install the MongoDB driver
pip install pymongo

# For DNS SRV record support, used by mongodb+srv:// URLs (optional);
# quote the extra so the shell does not expand the brackets
pip install "pymongo[srv]"

# Using conda
conda install pymongo
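
Verify the Installation

A quick sanity check that the driver imports and can reach a server. This uses plain pymongo rather than Akron and assumes a local MongoDB on the default port:

import pymongo
from pymongo import MongoClient

print(pymongo.version)  # installed driver version

# The ping command round-trips to the server and fails fast if it is unreachable
client = MongoClient("mongodb://localhost:27017", serverSelectionTimeoutMS=2000)
client.admin.command("ping")
print("MongoDB is reachable")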

Connection Setup

Basic Connection

MongoDB Connection Examples
from akron import Akron

# Local MongoDB instance
db = Akron("mongodb://localhost:27017/mydatabase")

# MongoDB with authentication
db = Akron("mongodb://username:password@localhost:27017/mydatabase")

# MongoDB Atlas (cloud)
db = Akron("mongodb+srv://username:password@cluster.mongodb.net/mydatabase")

# MongoDB with a replica set
db = Akron("mongodb://user:pass@host1:27017,host2:27017,host3:27017/db?replicaSet=myReplicaSet")

# MongoDB with connection options
db = Akron("mongodb://localhost:27017/db?authSource=admin&ssl=true&retryWrites=true")

Connection URL Format

mongodb://[username:password@]host1[:port1][,host2[:port2],...]/database[?options]

  • username:password: Authentication credentials (optional)
  • host:port: One or more MongoDB servers (default port: 27017)
  • database: Database name
  • options: Connection parameters as key=value query pairs
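If the username or password contains reserved characters such as @, :, or /, percent-escape them before building the URL; pymongo's documentation recommends urllib.parse.quote_plus for this:

from urllib.parse import quote_plus

from akron import Akron

user = quote_plus("app_user")
password = quote_plus("p@ss:w/rd")  # '@', ':' and '/' would otherwise break URL parsing
db = Akron(f"mongodb://{user}:{password}@localhost:27017/mydatabase")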

Common Connection Options

Authentication

  • authSource=admin - Auth database
  • authMechanism=SCRAM-SHA-256 - Auth method

Connection

  • ssl=true - Enable SSL
  • retryWrites=true - Retry failed writes
  • maxPoolSize=50 - Connection pool size

Document Structure

MongoDB stores data as BSON documents. Akron automatically converts between Python objects and BSON:

Python Type   BSON Type               Example
int           32-bit/64-bit Integer   42, 2147483648
str           String                  "Hello World"
float         Double                  3.14159
bool          Boolean                 true, false
list          Array                   [1, 2, 3]
dict          Document                {"key": "value"}
datetime      Date                    ISODate("2024-01-01")
ObjectId      ObjectId                ObjectId("507f1f77...")
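You can observe this mapping directly with the bson package that ships with pymongo; the round trip below runs without a server:

from datetime import datetime, timezone

import bson
from bson import ObjectId

doc = {
    "_id": ObjectId(),
    "count": 42,
    "pi": 3.14159,
    "published": True,
    "tags": [1, 2, 3],
    "nested": {"key": "value"},
    "created_at": datetime.now(timezone.utc),
}

raw = bson.encode(doc)      # Python dict -> BSON bytes
decoded = bson.decode(raw)  # BSON bytes -> Python dict
print(type(decoded["_id"]).__name__, type(decoded["created_at"]).__name__)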

Complete Example

MongoDB Full CRUD Example
from akron import Akron
from pydantic import BaseModel
from akron.models import ModelMixin
from datetime import datetime
from typing import Any, Dict, List, Optional

# Define your model for MongoDB documents
class BlogPost(BaseModel, ModelMixin):
    _id: Optional[str] = None  # MongoDB ObjectId, stored as a string
    title: str
    content: str
    author: Dict[str, str]
    tags: List[str]
    metadata: Dict[str, Any]
    comments: List[Dict[str, Any]] = []
    published: bool = False
    view_count: int = 0
    created_at: datetime
    updated_at: Optional[datetime] = None

# Connect to MongoDB
db = Akron("mongodb://localhost:27017/blog")

# Note: MongoDB doesn't require explicit table creation.
# Collections are created automatically when the first document is inserted.

# Insert blog posts
post1 = BlogPost(
    title="Getting Started with MongoDB",
    content="MongoDB is a document database...",
    author={
        "name": "Alice Johnson",
        "email": "alice@example.com",
        "bio": "Database enthusiast",
    },
    tags=["mongodb", "nosql", "database", "tutorial"],
    metadata={
        "category": "tutorial",
        "difficulty": "beginner",
        "estimated_read_time": 8,
        "featured": True,
    },
    created_at=datetime.now(),
)

post2 = BlogPost(
    title="Advanced MongoDB Aggregation",
    content="Learn complex aggregation pipelines...",
    author={
        "name": "Bob Smith",
        "email": "bob@example.com",
        "bio": "Senior Developer",
    },
    tags=["mongodb", "aggregation", "advanced"],
    metadata={
        "category": "advanced",
        "difficulty": "expert",
        "estimated_read_time": 20,
    },
    comments=[
        {
            "author": "Jane Doe",
            "content": "Excellent tutorial!",
            "created_at": datetime.now(),
        }
    ],
    published=True,
    view_count=342,
    created_at=datetime.now(),
)

# Insert documents
BlogPost.insert(db, post1)
BlogPost.insert(db, post2)

# Query documents
all_posts = BlogPost.select(db)
print(f"Total blog posts: {len(all_posts)}")

# Find published posts
published_posts = BlogPost.select(db, where={"published": True})
print(f"Published posts: {len(published_posts)}")

# Complex queries with MongoDB operators
# Find posts by a specific author (dot notation reaches into embedded documents)
alice_posts = BlogPost.select(db, where={"author.name": "Alice Johnson"})

# Find posts with specific tags
tutorial_posts = BlogPost.select(db, where={"tags": {"$in": ["tutorial"]}})

# Find posts with a high view count
popular_posts = BlogPost.select(db, where={"view_count": {"$gte": 100}})

# Text search (requires a text index)
# db.execute_raw("db.blogposts.createIndex({title: 'text', content: 'text'})")
# search_results = BlogPost.select(db, where={"$text": {"$search": "mongodb tutorial"}})

# Update documents
BlogPost.update(
    db,
    {"title": "Getting Started with MongoDB"},
    {
        "published": True,
        "view_count": 45,
        "updated_at": datetime.now(),
    },
)

# Add a comment to a post
BlogPost.update(
    db,
    {"title": "Advanced MongoDB Aggregation"},
    {
        "$push": {
            "comments": {
                "author": "Charlie Brown",
                "content": "Very detailed explanation!",
                "created_at": datetime.now(),
            }
        }
    },
)

# Increment the view count
BlogPost.update(
    db,
    {"title": "Getting Started with MongoDB"},
    {"$inc": {"view_count": 1}},
)

print("MongoDB operations completed successfully!")

Advanced MongoDB Features

Aggregation Pipeline

# Complex aggregation queries
aggregation_pipeline = [
    # Match published posts
    {"$match": {"published": True}},

    # Group by author
    {"$group": {
        "_id": "$author.name",
        "post_count": {"$sum": 1},
        "total_views": {"$sum": "$view_count"},
        "avg_read_time": {"$avg": "$metadata.estimated_read_time"},
        "posts": {"$push": {
            "title": "$title",
            "views": "$view_count",
            "tags": "$tags",
        }},
    }},

    # Sort by total views
    {"$sort": {"total_views": -1}},

    # Add computed fields
    {"$addFields": {
        "engagement_score": {"$multiply": ["$post_count", "$total_views"]},
    }},

    # Limit results
    {"$limit": 10},
]

# Execute the aggregation
result = db.execute_raw(f"db.blogposts.aggregate({aggregation_pipeline})")
for author_stats in result:
    print(f"Author: {author_stats['_id']}")
    print(f"Posts: {author_stats['post_count']}")
    print(f"Total Views: {author_stats['total_views']}")
    print(f"Engagement Score: {author_stats['engagement_score']}")
    print("---")

Indexing Strategy

# Create various types of indexes for better performance

# Single-field indexes
db.execute_raw("db.blogposts.createIndex({'author.name': 1})")
db.execute_raw("db.blogposts.createIndex({'created_at': -1})")
db.execute_raw("db.blogposts.createIndex({'published': 1})")

# Compound indexes
db.execute_raw("db.blogposts.createIndex({'published': 1, 'created_at': -1})")
db.execute_raw("db.blogposts.createIndex({'author.name': 1, 'published': 1})")

# Text index for full-text search
db.execute_raw("db.blogposts.createIndex({'title': 'text', 'content': 'text'})")

# Multikey index for arrays
db.execute_raw("db.blogposts.createIndex({'tags': 1})")

# Sparse index (only indexes documents that have the field)
db.execute_raw("db.blogposts.createIndex({'updated_at': 1}, {'sparse': true})")

# Partial index (only indexes documents matching a condition)
db.execute_raw("db.blogposts.createIndex({'view_count': 1}, {'partialFilterExpression': {'view_count': {'$gt': 100}}})")

# List the indexes that exist on the collection
index_stats = db.execute_raw("db.blogposts.getIndexes()")
for index in index_stats:
    print(f"Index: {index['name']} - {index['key']}")

Geospatial Queries

# Example with geospatial data
from typing import Any, Dict, Optional

from pydantic import BaseModel
from akron.models import ModelMixin

class Location(BaseModel, ModelMixin):
    _id: Optional[str] = None
    name: str
    type: str  # restaurant, hotel, etc.
    location: Dict[str, Any]  # GeoJSON format
    rating: float
    reviews: int

# Insert a location with geospatial data
cafe = Location(
    name="Central Cafe",
    type="restaurant",
    location={
        "type": "Point",
        "coordinates": [-73.9857, 40.7484],  # longitude, latitude (NYC)
    },
    rating=4.5,
    reviews=127,
)

Location.insert(db, cafe)

# Create a 2dsphere index for geospatial queries
db.execute_raw("db.locations.createIndex({'location': '2dsphere'})")

# Find locations near a point (within 1000 meters)
nearby_locations = db.execute_raw("""
db.locations.find({
    location: {
        $near: {
            $geometry: {
                type: "Point",
                coordinates: [-73.9857, 40.7484]
            },
            $maxDistance: 1000
        }
    }
})
""")

# Find locations within a polygon
within_polygon = db.execute_raw("""
db.locations.find({
    location: {
        $geoWithin: {
            $geometry: {
                type: "Polygon",
                coordinates: [[
                    [-74.0, 40.7],
                    [-73.9, 40.7],
                    [-73.9, 40.8],
                    [-74.0, 40.8],
                    [-74.0, 40.7]
                ]]
            }
        }
    }
})
""")

Change Streams

# Monitor real-time changes (requires a replica set)
def watch_blog_changes():
    # Watch for changes in the blogposts collection
    change_stream = db.execute_raw("db.blogposts.watch()")

    for change in change_stream:
        operation = change["operationType"]
        if operation == "insert":
            print(f"New post created: {change['fullDocument']['title']}")
        elif operation == "update":
            print(f"Post updated: {change['documentKey']['_id']}")
        elif operation == "delete":
            print(f"Post deleted: {change['documentKey']['_id']}")

# Watch specific operations only
def watch_published_posts():
    pipeline = [
        {
            "$match": {
                "fullDocument.published": True,
                "operationType": {"$in": ["insert", "update"]},
            }
        }
    ]

    change_stream = db.execute_raw(f"db.blogposts.watch({pipeline})")

    for change in change_stream:
        print(f"Published post activity: {change['fullDocument']['title']}")

# Note: change streams require a MongoDB replica set or sharded cluster
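pymongo exposes change streams as Collection.watch(); a minimal sketch against the driver directly, again assuming a replica set is available:

from pymongo import MongoClient

collection = MongoClient("mongodb://localhost:27017")["blog"]["blogposts"]

# watch() returns a blocking cursor; the with-block closes the stream cleanly
with collection.watch() as stream:
    for change in stream:
        op = change["operationType"]
        if op == "insert":
            print("New post:", change["fullDocument"]["title"])
        elif op in ("update", "delete"):
            print(f"{op}:", change["documentKey"]["_id"])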

Performance Optimization

Query Optimization

# Use explain() to analyze query performance
explain_result = db.execute_raw("""
db.blogposts.find({
    "published": true,
    "author.name": "Alice Johnson"
}).explain("executionStats")
""")

stats = explain_result["executionStats"]
print(f"Documents examined: {stats['totalDocsExamined']}")
print(f"Documents returned: {stats['nReturned']}")
# The winning plan under explain_result['queryPlanner']['winningPlan'] shows
# whether an index scan (IXSCAN) or a full collection scan (COLLSCAN) was used.

# Optimize with projection (only return the fields you need)
optimized_query = db.execute_raw("""
db.blogposts.find(
    {"published": true},
    {"title": 1, "author.name": 1, "created_at": 1, "_id": 0}
)
""")

# Use limit and skip for pagination
paginated_results = db.execute_raw("""
db.blogposts.find({"published": true})
    .sort({"created_at": -1})
    .skip(0)
    .limit(10)
""")

Connection Pooling

# Configure the connection pool for production
db = Akron("mongodb://localhost:27017/blog?maxPoolSize=50&minPoolSize=5&maxIdleTimeMS=30000")

# Monitor connection pool statistics
pool_stats = db.execute_raw("db.serverStatus().connections")
print(f"Current connections: {pool_stats}")

# Use read preferences for read scaling:
#   primary (default) - read from the primary
#   secondary - read from a secondary (eventual consistency)
#   nearest - read from the nearest replica set member
db = Akron("mongodb://localhost:27017/blog?readPreference=secondary")
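The same settings can be passed as MongoClient keyword arguments when configuring pymongo directly; the names mirror the URL options:

from pymongo import MongoClient

client = MongoClient(
    "mongodb://localhost:27017/blog",
    maxPoolSize=50,          # upper bound on concurrent sockets
    minPoolSize=5,           # keep a few connections warm
    maxIdleTimeMS=30000,     # recycle sockets idle for 30 seconds
    readPreference="secondaryPreferred",  # use the primary only when no secondary is available
)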

CLI Commands

Use Akron CLI for MongoDB database management:

MongoDB CLI Examples
# Note: MongoDB collections are created automatically, but you can still use create-table
akron create-table blogposts --db "mongodb://localhost:27017/blog" \
  --schema '{
    "title": "str",
    "content": "str",
    "author": "dict",
    "tags": "list",
    "published": "bool",
    "created_at": "datetime"
  }'

# Inspect database collections and documents
akron inspect-schema --db "mongodb://localhost:27017/blog"

# Seed with complex document data
akron seed blogposts --db "mongodb://localhost:27017/blog" \
  --data '[{
    "title": "Sample Post",
    "content": "This is a sample blog post...",
    "author": {
      "name": "John Doe",
      "email": "john@example.com"
    },
    "tags": ["sample", "tutorial"],
    "published": true,
    "created_at": "2024-01-01T00:00:00"
  }]'

# Execute MongoDB queries (single-quote queries so the shell does not expand $ operators)
akron raw-sql --db "mongodb://localhost:27017/blog" \
  --query 'db.blogposts.find({published: true}).limit(5)'

# Advanced aggregation query
akron raw-sql --db "mongodb://localhost:27017/blog" \
  --query 'db.blogposts.aggregate([
    {$match: {published: true}},
    {$group: {_id: "$author.name", count: {$sum: 1}}},
    {$sort: {count: -1}}
  ])'

# Create indexes
akron raw-sql --db "mongodb://localhost:27017/blog" \
  --query 'db.blogposts.createIndex({"author.name": 1, "published": 1})'

# Check collection stats
akron raw-sql --db "mongodb://localhost:27017/blog" \
  --query 'db.blogposts.stats()'

# Note: migrations work differently in MongoDB due to its schemaless nature;
# they typically involve data transformations rather than schema changes.

Best Practices

✅ Do

  • Design documents to minimize the need for joins
  • Use embedded documents for one-to-few relationships
  • Create appropriate indexes for your query patterns
  • Use aggregation pipelines for complex analytics
  • Implement proper error handling and retry logic
  • Use projection to limit returned data
  • Monitor performance with the MongoDB profiler

❌ Don't

  • Create documents larger than 16MB (the hard BSON document limit)
  • Lean on multi-document transactions as a primary pattern (supported since MongoDB 4.0, but they add overhead)
  • Ignore index usage and query performance
  • Store large binary files directly in documents (use GridFS; see the sketch after this list)
  • Create too many indexes (impacts write performance)
  • Use inefficient query patterns such as unanchored regexes
  • Forget to handle ObjectId conversion properly
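For binary payloads that would push a document toward the 16MB limit, GridFS (bundled with pymongo) chunks files across two collections; a minimal sketch with a hypothetical file name:

import gridfs
from pymongo import MongoClient

blog_db = MongoClient("mongodb://localhost:27017")["blog"]
fs = gridfs.GridFS(blog_db)

# put() splits the file into 255 KB chunks under the hood
with open("diagram.png", "rb") as f:  # hypothetical file
    file_id = fs.put(f, filename="diagram.png", content_type="image/png")

# Stream it back by id
stored = fs.get(file_id)
print(stored.filename, stored.length, "bytes")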

Common Issues & Solutions

Connection timeout

MongoDB server is not reachable or connection parameters are incorrect.

# Check MongoDB service status
sudo systemctl status mongod

# Start the MongoDB service
sudo systemctl start mongod

# Verify MongoDB is listening
sudo netstat -tlnp | grep :27017

# Test the connection with the MongoDB shell ("mongo" on older installs)
mongosh --host localhost --port 27017

Authentication failed

Incorrect credentials or authentication is not properly configured.

# Enable authentication in the MongoDB config
# Edit /etc/mongod.conf:
#   security:
#     authorization: enabled

# Create an admin user
mongosh admin
db.createUser({
  user: "admin",
  pwd: "secure_password",
  roles: ["userAdminAnyDatabase", "dbAdminAnyDatabase", "readWriteAnyDatabase"]
})

# Create a database-specific user
use blog
db.createUser({
  user: "blog_user",
  pwd: "blog_password",
  roles: ["readWrite"]
})

Slow queries

Queries taking too long due to missing indexes or inefficient query patterns.

// Enable the profiler (level 1 logs only operations slower than slowms)
db.setProfilingLevel(1, { slowms: 100 })

// Check recent slow queries
db.system.profile.find().sort({ts: -1}).limit(5)

// Analyze a query with explain
db.collection.find({field: value}).explain("executionStats")

// Common optimizations:
// 1. Add indexes for frequently queried fields
db.collection.createIndex({field: 1})

// 2. Use compound indexes for multi-field queries
db.collection.createIndex({field1: 1, field2: 1})

// 3. Use projection to limit data transfer
db.collection.find({query}, {field1: 1, field2: 1})
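The same workflow is scriptable from Python with direct pymongo access; the profile command and the system.profile collection are standard MongoDB:

from pymongo import MongoClient

blog_db = MongoClient("mongodb://localhost:27017")["blog"]

# Level 1 profiles only operations slower than slowms
blog_db.command("profile", 1, slowms=100)

# Inspect the five most recent profiled operations
for op in blog_db["system.profile"].find().sort("ts", -1).limit(5):
    print(op.get("op"), op.get("ns"), op.get("millis"), "ms")

# Cursor.explain() shows whether the winning plan used an index
plan = blog_db["blogposts"].find({"published": True}).explain()
print(plan["queryPlanner"]["winningPlan"].get("stage"))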
