Repositories¶
The AppKernel repository API is influenced by peewee, a small and elegant Python ORM for relational databases.
The key difference is that AppKernel’s repository is optimised for (and currently only implemented for) MongoDB.
A custom repository implementation for SQL or any other database is possible by extending the Repository base class.
Basic CRUD (Create, Read, Update, Delete) operations¶
Note
You can follow all examples in Python’s interactive interpreter using the imports below.
The following setup connects to a local MongoDB instance and creates a database named tutorial.
In a full application, use AppKernelEngine instead — it handles MongoDB initialisation automatically:
from appkernel import AppKernelEngine
kernel = AppKernelEngine('tutorial', enable_defaults=True)
For use in development or production, choose one of two configuration options:
Default configuration¶
When AppKernelEngine is initialised with enable_defaults=True, it connects to MongoDB on
localhost and uses the database name app:
kernel = AppKernelEngine('demo', enable_defaults=True)
File-based configuration¶
On initialisation, AppKernel looks for a cfg.yml file. The following keys configure the database connection:
appkernel:
mongo:
host: localhost
db: appkernel
The host value accepts a full mongodb:// connection string including credentials.
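When credentials are embedded in the connection string, the MongoDB URI format requires reserved characters in the user name and password to be percent-encoded. The sketch below builds such a string with the standard library; the helper name build_mongo_uri and the sample credentials are hypothetical and not part of AppKernel.

```python
from urllib.parse import quote_plus


def build_mongo_uri(user: str, password: str, host: str = "localhost", port: int = 27017) -> str:
    """Return a mongodb:// URI with percent-encoded credentials."""
    # Characters such as '@', ':' or '/' inside credentials must be escaped,
    # otherwise they would be read as URI delimiters.
    return f"mongodb://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}"


uri = build_mongo_uri("app_user", "p@ss:word/1")
print(uri)  # mongodb://app_user:p%40ss%3Aword%2F1@localhost:27017
```

The resulting string is the kind of value that could be placed under the host key.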
Building a base model structure¶
Let’s create a simple project-management model with tasks:
from datetime import datetime
from enum import Enum
from typing import Annotated
from appkernel import (
    Model, MongoRepository, AuditableRepository,
    Required, Generator, Default, Validators,
    NotEmpty, Past,
    date_now_generator,
)


class Priority(Enum):
    HIGH = 1
    MEDIUM = 2
    LOW = 3


class Task(Model, MongoRepository):
    name: Annotated[str | None, Required(), Validators(NotEmpty)] = None
    description: Annotated[str | None, Validators(NotEmpty)] = None
    completed: Annotated[bool | None, Required(), Default(False)] = None
    created: Annotated[datetime | None, Required(), Generator(date_now_generator)] = None
    closed_date: Annotated[datetime | None, Validators(Past)] = None
    priority: Annotated[Priority | None, Required(), Default(Priority.MEDIUM)] = None

    def complete(self):
        self.completed = True
        self.closed_date = datetime.now()


class Project(Model, AuditableRepository):
    id: str | None = None
    name: Annotated[str | None, Required(), Validators(NotEmpty)] = None
    tasks: list[Task] | None = None
    created: Annotated[datetime | None, Required(), Generator(date_now_generator)] = None
Saving and updating¶
Define a project with some tasks:
project = Project(name='some test project')
project.append_to(tasks=Task(name='finish the documentation', priority=Priority.HIGH))
# Add multiple tasks at once
project.append_to(tasks=[Task(name='finish all todos'), Task(name='complete the unit tests')])
project.save()
print(project.dumps(pretty_print=True))
Output:
{
    "id": "OBJ_5b142be00df7a9647023f0b1",
    "created": "2018-06-03T19:54:06.830307",
    "name": "some test project",
    "tasks": [
        {
            "completed": false,
            "created": "2018-06-03T19:53:38.149125",
            "name": "finish the documentation",
            "priority": "HIGH"
        },
        {
            "completed": false,
            "created": "2018-06-03T19:53:51.041349",
            "name": "finish all todos",
            "priority": "MEDIUM"
        },
        {
            "completed": false,
            "created": "2018-06-03T19:53:51.041380",
            "name": "complete the unit tests",
            "priority": "MEDIUM"
        }
    ]
}
Complete the first task:
project.tasks[0].complete()
project.save()
print(project.dumps(pretty_print=True))
Notice completed is now true, closed_date is set, and AuditableRepository
has incremented the version and updated the updated timestamp:
{
    "created": "2018-06-11T23:17:57.050000",
    "id": "OBJ_5b1ee7050df7a9087e0e8952",
    "inserted": "2018-06-11T23:17:57.050000",
    "name": "some test project",
    "tasks": [
        {
            "closed_date": "2018-06-11T23:19:39.345000",
            "completed": true,
            "created": "2018-06-11T23:17:57.050000",
            "name": "finish the documentation",
            "priority": "HIGH"
        },
        ...
    ],
    "updated": "2018-06-11T23:19:46.428000",
    "version": 2
}
Auditable Repository¶
AuditableRepository automatically adds three fields to every document:
inserted: the date and time the document was first created;
updated: the date and time of the most recent update;
version: a counter that starts at 1 on insert and increases by one on every subsequent save;
Use MongoRepository when you do not need audit metadata.
Delete objects¶
Count documents:
Project.count()
1
Delete a single document:
project.delete()
1
Delete all documents in the collection:
Project.delete_all()
Querying data¶
AppKernel provides a query DSL built on operator overloading. The query can be passed to:
find: returns a generator to iterate over the matching documents;
find_one: returns the first match or None;
where: returns a Query object for chaining (e.g. sort_by);
A simple query:
prj = Project.find_one(Project.name == 'some test project')
print(prj.dumps(pretty_print=True))
Search across a nested array using dot-path chaining:
prj = Project.find_one(Project.tasks.name % 'finish')
Alternatively, use bracket notation for element matching:
prj2 = Project.find_one(Project.tasks[Task.name == 'finish the documentation'])
Iterate over all documents:
for project in Project.find():
    print(project)
Iterate over matching documents:
for prj in Project.find(Project.name == 'some test project'):
    print(prj.dumps(pretty_print=True))
Sort the result:
query = Project.where(Project.name == 'some test project').sort_by(Project.created.asc())
for prj in query.find():
    print(prj.dumps(pretty_print=True))
Compound expressions:
from datetime import datetime, date
yesterday = datetime.combine(date(2018, 6, 10), datetime.min.time())
today = datetime.combine(date(2018, 6, 11), datetime.min.time())
prj = Project.find_one((Project.created > yesterday) & (Project.created < today))
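To make the idea of a DSL built on operator overloading more concrete, here is a minimal, self-contained sketch of how comparison operators can be turned into MongoDB filter documents. It is an illustration only: the Field and Expression classes are hypothetical stand-ins, and the choice to map % to a $regex match is an assumption, not a description of AppKernel's internals.

```python
import re


class Expression:
    """Wraps a MongoDB filter document and supports & and | composition."""

    def __init__(self, filter_doc: dict):
        self.filter_doc = filter_doc

    def __and__(self, other: "Expression") -> "Expression":
        return Expression({"$and": [self.filter_doc, other.filter_doc]})

    def __or__(self, other: "Expression") -> "Expression":
        return Expression({"$or": [self.filter_doc, other.filter_doc]})


class Field:
    """A named document field whose operators build Expression objects."""

    def __init__(self, name: str):
        self.name = name

    def __eq__(self, value) -> Expression:
        return Expression({self.name: value})

    def __gt__(self, value) -> Expression:
        return Expression({self.name: {"$gt": value}})

    def __lt__(self, value) -> Expression:
        return Expression({self.name: {"$lt": value}})

    def __mod__(self, value: str) -> Expression:
        # Assumption: '%' means "contains", expressed here as an escaped regex
        return Expression({self.name: {"$regex": re.escape(value)}})


created = Field("created")
query = (created > 10) & (created < 20)
print(query.filter_doc)
# {'$and': [{'created': {'$gt': 10}}, {'created': {'$lt': 20}}]}
```

The parentheses around each comparison are required in Python because & and | bind more tightly than ==, > and <, which is why the compound expressions in this section are always parenthesised.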
Pagination¶
The following query returns the first 10 projects:
for prj in Project.find(page=0, page_size=10):
    print(prj)
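Since page=0 selects the first page, page numbers appear to be zero-based. Under that assumption, a page request corresponds to a skip/limit window over the result set. The sketch below shows the arithmetic on a plain list; page_window is a hypothetical helper, and how AppKernel translates these parameters internally is not confirmed here.

```python
def page_window(page: int, page_size: int) -> tuple[int, int]:
    """Translate a zero-based page number into a (skip, limit) pair."""
    if page < 0 or page_size < 1:
        raise ValueError("page must be >= 0 and page_size must be >= 1")
    return page * page_size, page_size


items = list(range(25))  # stand-in for 25 stored documents
skip, limit = page_window(page=2, page_size=10)
print(items[skip:skip + limit])  # third (and last, partial) page
# [20, 21, 22, 23, 24]
```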
Query expressions¶
prj = Project.find_by_id('5b1ee9930df7a9087e0e8953')
prj = Project.find_one(Project.name == 'Project A')
projects = Project.find(Project.name != 'Project A')
prj = Project.find_one((Project.name == 'Project A') | (Project.name == 'Project B'))
from datetime import timedelta
yesterday = datetime.now() - timedelta(days=1)
prj = Project.find_one((Project.name == 'Project A') & (Project.created > yesterday))
Find a project with no tasks (find_one returns only the first match; use find to iterate over all of them):
prj = Project.find_one(Project.tasks == None)
Find a project with at least one task whose name contains ‘finish’:
prj = Project.find_one(Project.tasks.name % 'finish')
Find all users who have the roles Admin and Operator:
User.find(User.roles % ['Admin', 'Operator'])
User.find(User.description == None)
User.find(User.description != None)
yesterday = datetime.now() - timedelta(days=1)
tomorrow = datetime.now() + timedelta(days=1)
projects = Project.find((Project.created > yesterday) & (Project.created < tomorrow))
Query on fields that exist in the database but are not declared on the model (e.g. audit fields added by AuditableRepository):
project = Project.find_one(Project.custom_property('version') == 2)
Advanced Functionality¶
Atomic updates¶
Avoid the read-modify-write cycle for counter updates. The naive approach is slow and prone to race conditions:
# DON'T DO THIS — vulnerable to concurrent modification
for stock in Stock.find((Stock.product.code == 'BTX') & (Stock.product.size == ProductSize.L)):
    if stock.available > 0:
        stock.available -= 1
        stock.reserved += 1
        stock.save()
    else:
        raise ReservationException('Not enough products on stock.')
Use update() instead for a single atomic operation:
quantity = 1  # number of items to reserve
query = Stock.where((Stock.product.code == 'BTX') & (Stock.product.size == ProductSize.L))
res = query.update(available=Stock.available - quantity, reserved=Stock.reserved + quantity)
if res == 0:
    raise ReservationException('No stock available for code BTX, size L.')
elif res > 1:
    raise ReservationException(f'Multiple items reserved ({res}).')
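For comparison, the same reservation can be expressed in native MongoDB terms as a filter plus an $inc update, which the server applies atomically to each matching document. The sketch below only builds the two documents. The helper name build_reservation, the stored form of the size value ('L') and the extra guard on available are assumptions made for illustration; they are not taken from AppKernel.

```python
def build_reservation(code: str, size: str, quantity: int) -> tuple[dict, dict]:
    """Return a (filter, update) pair for a guarded, atomic stock reservation."""
    if quantity < 1:
        raise ValueError("quantity must be positive")
    filter_doc = {
        "product.code": code,
        "product.size": size,
        # Guard (an addition in this sketch): match only documents
        # that still hold enough stock, so 'available' cannot go negative.
        "available": {"$gte": quantity},
    }
    # $inc changes both counters in a single server-side operation
    update_doc = {"$inc": {"available": -quantity, "reserved": quantity}}
    return filter_doc, update_doc


filter_doc, update_doc = build_reservation("BTX", "L", 2)
print(update_doc)  # {'$inc': {'available': -2, 'reserved': 2}}
```

With PyMongo, such a pair could be passed to update_one on the collection, and the modified_count of the result would indicate whether a reservation took place.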
Native queries¶
For complex queries not covered by the DSL, fall back to native MongoDB syntax:
for p in Project.find_by_query({'counter': {'$gte': 0, '$lt': 10}}):
    print(f'Project: {p.name}, counter: {p.counter}')
You can also obtain a reference to the underlying PyMongo Collection:
mongo_document = Project.get_collection().find_one(filter)
Bulk insert¶
Insert (or upsert) multiple documents at once:
def create_user_batch(count=50):
    return [
        User()
        .update(name=f'user_{i}')
        .update(password='default password')
        .append_to(roles=['Admin', 'User', 'Operator'])
        for i in range(1, count + 1)
    ]

ids = User.bulk_insert(create_user_batch())
Dropping the collection¶
User.get_collection().drop()
Check index information¶
idx_info = User.get_collection().index_information()
Index management¶
Indexes speed up queries on specific fields. Declare indexes directly in the field’s Annotated[] metadata:
from appkernel import MongoIndex, MongoUniqueIndex, MongoTextIndex
class Project(Model, AuditableRepository):
    name: Annotated[str | None, Required(), Validators(NotEmpty), MongoUniqueIndex()] = None
    created: Annotated[datetime | None, Required(), Generator(date_now_generator), MongoIndex()] = None

Project.init_indexes()
MongoUniqueIndex on name prevents duplicate project names. MongoIndex on created speeds up queries and sorting by creation date.
Built-in index types¶
MongoIndex: standard index to speed up queries (note: indexes also slow down inserts, so use them selectively);
MongoUniqueIndex: unique constraint — only one document per unique value is allowed;
MongoTextIndex: full-text search index for string fields;
For more details, see the MongoDB indexes documentation.
Schema Installation¶
MongoDB supports JSON Schema validation to enforce data integrity on inserts and updates. AppKernel can generate and install this schema for you:
Project.add_schema_validation(validation_action='error')
The validation_action parameter accepts:
'error': rejects invalid documents;
'warning': logs a warning but allows the operation;
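For orientation, the sketch below shows roughly what a native MongoDB validator for the Project model could look like, expressed as a collMod command document. Everything in it is an assumption made for illustration: the helper name, the collection name, the chosen schema properties, and the mapping of 'warning' to MongoDB's native 'warn' spelling. The schema that AppKernel actually generates may differ.

```python
# Assumed mapping from the values described above to MongoDB's validationAction
ACTION_MAP = {"error": "error", "warning": "warn"}


def build_validator_command(collection: str, validation_action: str = "error") -> dict:
    """Build a collMod command that attaches a $jsonSchema validator."""
    schema = {
        "bsonType": "object",
        "required": ["name", "created"],
        "properties": {
            "name": {"bsonType": "string", "minLength": 1},
            "created": {"bsonType": "date"},
            # tasks is optional on the model, so null is allowed as well
            "tasks": {"bsonType": ["array", "null"]},
        },
    }
    return {
        "collMod": collection,
        "validator": {"$jsonSchema": schema},
        "validationAction": ACTION_MAP[validation_action],
    }


cmd = build_validator_command("Project", validation_action="warning")
print(cmd["validationAction"])  # warn
```

With PyMongo, a command document of this shape can be sent through the database object's command method.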
Optimistic Locking¶
AppKernel uses optimistic locking to detect concurrent write conflicts. Every document managed
by MongoRepository (and its subclasses) carries a version counter maintained
automatically by the repository — no schema change or field declaration is required.
How it works¶
On first insert the document receives version = 1.
On each subsequent save the repository atomically increments version to the next integer and conditions the update on the current version matching what the caller holds. If another process has already incremented the version in the meantime, the update finds no matching document and raises VersionConflictError.
AuditableRepository inherits the same version logic and additionally maintains
inserted and updated timestamps.
When served over HTTP the framework converts VersionConflictError to an HTTP 409
Conflict response automatically — no extra handling is needed in service code.
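The version check can be illustrated without a database. The following toy store is a sketch under stated assumptions: InMemoryStore and VersionConflict are hypothetical names, and the comparison and write are two separate steps here, whereas a real repository would perform them as one atomic, version-conditioned update.

```python
class VersionConflict(Exception):
    """Raised when the stored version no longer matches the caller's copy."""


class InMemoryStore:
    """Toy document store illustrating version-conditioned saves."""

    def __init__(self):
        self._docs: dict[str, dict] = {}

    def insert(self, doc_id: str, data: dict) -> dict:
        doc = {**data, "id": doc_id, "version": 1}  # first insert gets version 1
        self._docs[doc_id] = dict(doc)
        return doc

    def get(self, doc_id: str) -> dict:
        return dict(self._docs[doc_id])  # hand out a copy, like a fresh read

    def save(self, doc: dict) -> dict:
        stored = self._docs[doc["id"]]
        # The write applies only if nobody bumped the version since the read
        if stored["version"] != doc["version"]:
            raise VersionConflict(
                f"expected version {doc['version']}, found {stored['version']}"
            )
        updated = {**doc, "version": doc["version"] + 1}
        self._docs[doc["id"]] = dict(updated)
        return updated


store = InMemoryStore()
store.insert("p1", {"name": "demo"})
first, second = store.get("p1"), store.get("p1")  # both hold version 1

first["name"] = "First update"
store.save(first)  # succeeds, stored version becomes 2

second["name"] = "Lost update"
try:
    store.save(second)  # still holds version 1, so it is rejected
except VersionConflict as exc:
    print("conflict:", exc)  # conflict: expected version 1, found 2
```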
Recovering from a conflict¶
The standard recovery pattern is re-fetch, merge, retry:
from appkernel import VersionConflictError
MAX_RETRIES = 3
for attempt in range(MAX_RETRIES):
    project = Project.find_by_id(project_id)  # re-fetch to get the latest version
    project.name = 'Updated name'
    try:
        project.save()
        break
    except VersionConflictError:
        if attempt == MAX_RETRIES - 1:
            raise  # give up after N retries
Detecting a conflict in tests¶
The example below demonstrates the collision scenario:
from appkernel import VersionConflictError
import pytest
# Both callers read the same document at version 1
original = Project.find_by_id(project_id)
concurrent = Project.find_by_id(project_id)
# First writer succeeds — document is now version 2
original.name = 'First update'
original.save()
# Second writer holds version 1 — the update is rejected
concurrent.name = 'Lost update'
with pytest.raises(VersionConflictError):
    concurrent.save()
Repository types and version tracking¶
| Repository | version | timestamps | CRUD |
|---|---|---|---|
| MongoRepository | yes | no | yes |
| AuditableRepository | yes | yes | yes |
Supported Repository Types¶
All repositories extend the Repository base class:
MongoRepository: standard CRUD and query access to MongoDB, including automatic optimistic-locking version tracking;
AuditableRepository: extends MongoRepository with automatic inserted, updated, and version fields;
Aggregation Pipeline¶
MongoDB’s Aggregation Pipeline is accessible via the collection reference:
pipeline = [{'$match': ...}, {'$group': ...}]
Project.get_collection().aggregate(pipeline)
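As a more concrete illustration, the sketch below builds a pipeline that counts unfinished tasks per priority across all projects. The field names follow the Project and Task models from this page; the helper name open_tasks_by_priority is hypothetical, and the way task fields are stored in the database (for example how priority is serialised) is an assumption.

```python
def open_tasks_by_priority() -> list[dict]:
    """Build a pipeline that counts unfinished tasks per priority."""
    return [
        {"$unwind": "$tasks"},                   # one document per embedded task
        {"$match": {"tasks.completed": False}},  # keep unfinished tasks only
        {"$group": {"_id": "$tasks.priority", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}},                # most frequent priority first
    ]


pipeline = open_tasks_by_priority()
print([next(iter(stage)) for stage in pipeline])
# ['$unwind', '$match', '$group', '$sort']
```

The resulting list can be passed to aggregate on the collection reference in the same way as any other pipeline.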