MongoDB Testing with mongomock, mongodb-memory-server, and Mockgoose
MongoDB's document model is flexible and developer-friendly — but that flexibility makes testing tricky. Schema validation is optional, embedded documents nest arbitrarily deep, and aggregation pipelines can be complex. This guide covers every layer of MongoDB testing, from fast in-memory fakes to real container-based integration tests.
Testing Approaches for MongoDB
Unit tests → mongomock (Python) or mongodb-memory-server (Node)
↓ fast, no Docker, good for most tests
Integration tests → Testcontainers / Docker
↓ real MongoDB, catches version-specific issues
E2E / Production monitoring → HelpMeTest
↓ 24/7 monitoring of live app behaviorPython: Testing with mongomock
mongomock is a pure-Python MongoDB implementation. It supports most PyMongo operations and is ideal for unit tests.
Installation
pip install mongomock pytestFor async support:
pip install mongomock motor # motor is the async MongoDB driverBasic Usage
import mongomock
import pytest
from bson import ObjectId
@pytest.fixture
def mongo_client():
client = mongomock.MongoClient()
yield client
client.close()
@pytest.fixture
def db(mongo_client):
return mongo_client.get_database("testdb")
def test_insert_and_find(db):
users = db.get_collection("users")
result = users.insert_one({
"name": "Alice",
"email": "alice@example.com",
"age": 30
})
assert result.inserted_id is not None
user = users.find_one({"email": "alice@example.com"})
assert user["name"] == "Alice"
assert user["age"] == 30
def test_update_operations(db):
items = db.get_collection("inventory")
items.insert_one({"sku": "widget-001", "quantity": 10, "price": 9.99})
items.update_one(
{"sku": "widget-001"},
{"$inc": {"quantity": -3}, "$set": {"last_sold": "2024-01-15"}}
)
item = items.find_one({"sku": "widget-001"})
assert item["quantity"] == 7
assert item["last_sold"] == "2024-01-15"Testing Aggregation Pipelines
Aggregation is where MongoDB shines — and where bugs hide. mongomock supports most pipeline stages:
def test_sales_by_category(db):
orders = db.get_collection("orders")
orders.insert_many([
{"category": "electronics", "amount": 299.99, "status": "completed"},
{"category": "books", "amount": 24.99, "status": "completed"},
{"category": "electronics", "amount": 149.50, "status": "completed"},
{"category": "books", "amount": 15.00, "status": "pending"},
{"category": "clothing", "amount": 89.99, "status": "completed"},
])
pipeline = [
{"$match": {"status": "completed"}},
{"$group": {
"_id": "$category",
"total": {"$sum": "$amount"},
"count": {"$sum": 1}
}},
{"$sort": {"total": -1}}
]
results = list(orders.aggregate(pipeline))
assert results[0]["_id"] == "electronics"
assert abs(results[0]["total"] - 449.49) < 0.01
assert results[0]["count"] == 2
assert results[1]["_id"] == "clothing"
assert len(results) == 3 # Only completed orders
def test_lookup_join(db):
"""Test $lookup stage for joining collections"""
users = db.get_collection("users")
posts = db.get_collection("posts")
user_id = ObjectId()
users.insert_one({"_id": user_id, "name": "Bob", "email": "bob@test.com"})
posts.insert_many([
{"title": "First Post", "author_id": user_id},
{"title": "Second Post", "author_id": user_id},
])
pipeline = [
{"$match": {"_id": user_id}},
{"$lookup": {
"from": "posts",
"localField": "_id",
"foreignField": "author_id",
"as": "user_posts"
}}
]
result = list(users.aggregate(pipeline))[0]
assert len(result["user_posts"]) == 2Testing Repository Classes with mongomock
class ProductRepository:
def __init__(self, db):
self.collection = db.get_collection("products")
def create(self, data: dict) -> dict:
result = self.collection.insert_one(data)
return self.collection.find_one({"_id": result.inserted_id})
def find_in_price_range(self, min_price: float, max_price: float) -> list:
return list(self.collection.find({
"price": {"$gte": min_price, "$lte": max_price}
}).sort("price", 1))
def update_stock(self, product_id: str, delta: int) -> bool:
result = self.collection.update_one(
{"_id": ObjectId(product_id), "stock": {"$gte": -delta}},
{"$inc": {"stock": delta}}
)
return result.modified_count == 1
@pytest.fixture
def product_repo(db):
return ProductRepository(db)
def test_find_products_in_price_range(product_repo):
product_repo.create({"name": "Cheap Item", "price": 9.99, "stock": 100})
product_repo.create({"name": "Mid Item", "price": 49.99, "stock": 50})
product_repo.create({"name": "Expensive Item", "price": 199.99, "stock": 10})
results = product_repo.find_in_price_range(10.00, 100.00)
assert len(results) == 1
assert results[0]["name"] == "Mid Item"
def test_prevents_negative_stock(product_repo):
product = product_repo.create({"name": "Widget", "price": 5.00, "stock": 3})
product_id = str(product["_id"])
# Can't remove more than available
assert product_repo.update_stock(product_id, -5) is False
# Stock unchanged
current = product_repo.collection.find_one({"_id": product["_id"]})
assert current["stock"] == 3mongomock Limitations
Be aware of what mongomock doesn't support:
- Change streams (
watch()) - Full-text search (
$textqueries) - Geospatial queries (partial support)
- Some aggregation operators (e.g.,
$graphLookupis partial) - Transactions (multi-document)
- Atlas-specific features
For these, use mongodb-memory-server or a real MongoDB container.
Node.js: Testing with mongodb-memory-server
mongodb-memory-server downloads and runs a real MongoDB binary in-process. It has full feature parity with MongoDB.
Installation
npm install --save-dev mongodb-memory-server mongoose jestThe first run downloads the MongoDB binary (~70MB). Add to .gitignore:
.cache/mongodb-memory-server/Jest Global Setup
// jest.config.js
module.exports = {
globalSetup: './tests/setup/mongoSetup.js',
globalTeardown: './tests/setup/mongoTeardown.js',
setupFilesAfterFramework: ['./tests/setup/mongoConnect.js']
};
// tests/setup/mongoSetup.js
const { MongoMemoryServer } = require('mongodb-memory-server');
module.exports = async () => {
const mongod = await MongoMemoryServer.create();
global.__MONGOD__ = mongod;
process.env.MONGO_URI = mongod.getUri();
};
// tests/setup/mongoTeardown.js
module.exports = async () => {
await global.__MONGOD__.stop();
};
// tests/setup/mongoConnect.js
const mongoose = require('mongoose');
beforeAll(async () => {
await mongoose.connect(process.env.MONGO_URI);
});
afterAll(async () => {
await mongoose.connection.close();
});
beforeEach(async () => {
const collections = mongoose.connection.collections;
for (const key in collections) {
await collections[key].deleteMany({});
}
});Testing Mongoose Models
const mongoose = require('mongoose');
const UserSchema = new mongoose.Schema({
email: { type: String, required: true, unique: true, lowercase: true },
name: { type: String, required: true, minlength: 2 },
role: { type: String, enum: ['user', 'admin'], default: 'user' },
createdAt: { type: Date, default: Date.now }
});
const User = mongoose.model('User', UserSchema);
describe('User Model', () => {
test('creates user with valid data', async () => {
const user = await User.create({
email: 'TEST@EXAMPLE.COM',
name: 'Test User'
});
expect(user._id).toBeDefined();
expect(user.email).toBe('test@example.com'); // lowercased
expect(user.role).toBe('user'); // default
});
test('enforces required fields', async () => {
await expect(User.create({ name: 'No Email' }))
.rejects.toThrow(/email.*required/i);
});
test('enforces unique email', async () => {
await User.create({ email: 'dup@example.com', name: 'First' });
await expect(User.create({ email: 'dup@example.com', name: 'Second' }))
.rejects.toThrow(/duplicate key/i);
});
test('validates enum values', async () => {
await expect(User.create({
email: 'admin@example.com',
name: 'Admin',
role: 'superuser' // not in enum
})).rejects.toThrow(/not a valid enum/i);
});
});Testing Mongoose Middleware and Virtuals
UserSchema.virtual('initials').get(function() {
return this.name.split(' ').map(n => n[0]).join('').toUpperCase();
});
UserSchema.pre('save', async function(next) {
if (this.isModified('password')) {
this.password = await bcrypt.hash(this.password, 10);
}
next();
});
describe('User Virtuals and Middleware', () => {
test('generates correct initials', async () => {
const user = await User.create({ email: 'j@test.com', name: 'John Doe' });
expect(user.initials).toBe('JD');
});
test('hashes password on save', async () => {
const user = await User.create({
email: 'secure@test.com',
name: 'Secure User',
password: 'plaintext123'
});
expect(user.password).not.toBe('plaintext123');
expect(user.password).toMatch(/^\$2[ab]\$/); // bcrypt pattern
const isValid = await bcrypt.compare('plaintext123', user.password);
expect(isValid).toBe(true);
});
});Testing with Transactions
mongodb-memory-server supports transactions if you use a replica set:
const { MongoMemoryReplSet } = require('mongodb-memory-server');
let replSet;
beforeAll(async () => {
replSet = await MongoMemoryReplSet.create({ replSet: { count: 1 } });
await mongoose.connect(replSet.getUri());
});
test('rolls back transaction on error', async () => {
const session = await mongoose.startSession();
try {
await session.withTransaction(async () => {
await User.create([{ email: 'tx1@test.com', name: 'TX User 1' }], { session });
// This will fail (validation error)
await User.create([{ email: '', name: 'Invalid' }], { session });
});
} catch (err) {
// Transaction rolled back
}
// First user should NOT exist (transaction rolled back)
const user = await User.findOne({ email: 'tx1@test.com' });
expect(user).toBeNull();
});Mockgoose (Legacy Reference)
Mockgoose was a popular mocking library for Mongoose before mongodb-memory-server became standard. It's largely deprecated now — mongodb-memory-server is better in every way:
- Actual MongoDB binary (not a partial implementation)
- Supports all features including transactions
- Actively maintained
- Works with any MongoDB driver, not just Mongoose
If you're still using Mockgoose, migrate to mongodb-memory-server.
Integration Testing with Real MongoDB
For tests that need real MongoDB behavior (replica sets, change streams, Atlas search):
# Python with Testcontainers
from testcontainers.mongodb import MongoDbContainer
import pymongo
@pytest.fixture(scope="session")
def mongodb_container():
with MongoDbContainer("mongo:7") as container:
yield container
@pytest.fixture(scope="session")
def mongo_client(mongodb_container):
client = pymongo.MongoClient(mongodb_container.get_connection_url())
yield client
client.close()Monitoring MongoDB in Production
Testing catches code bugs, but MongoDB production failures are often operational:
- Index missing after migration causes slow queries
- Connection pool exhausted under spike traffic
- Replication lag causing stale reads
- Atlas free tier storage limit hit
HelpMeTest provides 24/7 monitoring of your live application endpoints. Write plain-English tests that check your MongoDB-backed API responses — catch degraded performance and errors before users do. No code required, starts at $100/month for unlimited tests.
Summary
| Tool | Language | Best For |
|---|---|---|
| mongomock | Python | Fast unit tests, no Docker |
| mongodb-memory-server | Node.js | Full MongoDB parity, all features |
| Testcontainers | Both | Change streams, Atlas features |
| HelpMeTest | Any | 24/7 production monitoring |
Start with mongomock or mongodb-memory-server for fast CI tests. Add Testcontainers for integration tests that need real MongoDB. Monitor production continuously to catch the operational failures that unit tests can't prevent.