Skip to content

mongrator.ops

mongrator.ops

Declarative operation helpers for MongoDB migrations.

Each helper returns an Operation whose apply() and revert() methods perform the forward and reverse changes respectively. When a migration's up() function returns a list[Operation] and no down() is defined, the runner auto-generates rollback by calling revert() on each operation in reverse order.

Usage in a migration file:

from mongrator import ops

def up(db):
    return [
        ops.create_index("users", {"email": 1}, unique=True),
        ops.rename_field("users", "name", "full_name"),
    ]

Operation dataclass

An atomic database operation, optionally auto-reversible.

Check is_reversible before relying on revert(). Operations whose is_reversible is False will raise NotImplementedError from revert() — a down() function is required to roll them back.

Source code in src/mongrator/ops.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@dataclass
class Operation:
    """A single migration step pairing a forward action with a reverse action.

    ``apply()`` and ``revert()`` simply delegate to the callables stored in
    ``_apply`` and ``_revert``.  Consult ``is_reversible`` before depending on
    ``revert()``: operations reporting ``False`` raise ``NotImplementedError``
    from ``revert()``, so a ``down()`` function is required to roll them back.
    """

    # Human-readable summary; the only field shown in the generated repr.
    description: str
    # Callable invoked with the database handle by ``apply()``.
    _apply: Any = field(repr=False)
    # Callable invoked with the database handle by ``revert()``.
    _revert: Any = field(repr=False)
    # Backing value for the ``is_reversible`` property; defaults to True.
    _is_reversible: bool = field(default=True, repr=False)

    def apply(self, db: Database) -> None:  # type: ignore[type-arg]
        """Run the forward change by calling the stored ``_apply`` with *db*."""
        self._apply(db)

    def revert(self, db: Database) -> None:  # type: ignore[type-arg]
        """Run the reverse change by calling the stored ``_revert`` with *db*."""
        self._revert(db)

    @property
    def is_reversible(self) -> bool:
        """Report whether ``revert()`` can work on a freshly built instance.

        During auto-rollback the runner invokes ``up(db)`` again to obtain new
        Operation instances and calls ``revert()`` on those.  Nothing captured
        while ``apply()`` ran is available on such fresh instances, so this is
        ``False`` whenever reverting depends on that runtime state (e.g.
        ``drop_index`` without explicit ``keys``) or the operation is
        inherently destructive (``drop_field``, ``drop_collection``).
        """
        return self._is_reversible

is_reversible property

Whether revert() can succeed on a fresh Operation instance.

The runner's auto-rollback path calls up(db) a second time to obtain new Operation instances and then calls revert() on them. Because these are fresh instances, any state captured during apply() is not available. This property returns False when revert requires such runtime state (e.g. drop_index without explicit keys) or when the operation is inherently destructive (drop_field, drop_collection).

create_index(collection, keys, **kwargs)

Create an index. Reverts by dropping the index.

Source code in src/mongrator/ops.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def create_index(
    collection: str,
    keys: dict[str, int | str],
    **kwargs: Any,
) -> Operation:
    """Build an operation that creates an index; reverting drops that index.

    *keys* maps field names to index directions/types and is handed to the
    driver as a list of ``(field, direction)`` pairs.  Any extra keyword
    arguments are forwarded unchanged to the collection's ``create_index``.

    On revert the index is dropped by name: the ``name`` keyword argument when
    one was supplied (and is non-empty), otherwise a name derived from *keys*
    by joining every ``<field>_<direction>`` pair with underscores.
    """
    explicit_name: str | None = kwargs.get("name")
    rendered_keys = ", ".join(f"{key}: {direction}" for key, direction in keys.items())

    def _forward(db: Database) -> None:  # type: ignore[type-arg]
        key_pairs = list(keys.items())
        db[collection].create_index(key_pairs, **kwargs)

    def _backward(db: Database) -> None:  # type: ignore[type-arg]
        if explicit_name:
            target = explicit_name
        else:
            # No usable explicit name: rebuild the "<field>_<direction>_..." form.
            target = "_".join(f"{key}_{direction}" for key, direction in keys.items())
        db[collection].drop_index(target)

    return Operation(
        description=f"create_index({collection!r}, {{{rendered_keys}}})",
        _apply=_forward,
        _revert=_backward,
    )

drop_index(collection, index_name, keys=None, **kwargs)

Drop an index by name. Reverts by recreating the index.

When keys (and optional index options) are provided, revert is fully stateless — it recreates the index from the supplied spec without needing to have run apply() first. This is required for the ops-based auto-rollback path where the runner calls up(db) a second time and then immediately calls revert() on fresh Operation instances.

keys accepts the same dict form used by create_index (e.g. {"email": 1}) or a list[tuple] matching pymongo's format (e.g. [("email", 1)]). A dict is normalized to a list of tuples internally.

If keys is omitted, apply() will attempt to capture the index spec at runtime; however this only works when revert() is called on the same Operation instance that ran apply().

Source code in src/mongrator/ops.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def drop_index(
    collection: str,
    index_name: str,
    keys: dict[str, int | str] | Sequence[tuple[str, int | str]] | None = None,
    **kwargs: Any,
) -> Operation:
    """Build an operation that drops an index by name; reverting recreates it.

    Supplying *keys* (plus any index options as keyword arguments) makes the
    revert stateless: the index is rebuilt purely from the given spec, without
    ``apply()`` ever having run on this instance.  The ops-based auto-rollback
    path needs exactly that, because the runner calls ``up(db)`` a second time
    and immediately calls ``revert()`` on the fresh Operation instances.

    *keys* may be a ``dict`` in the form ``create_index`` takes (e.g.
    ``{"email": 1}``) or a sequence of tuples in pymongo's format (e.g.
    ``[("email", 1)]``).  Either way it is stored as a list of tuples.

    Without *keys*, ``apply()`` records the index's key and options just before
    dropping it, and ``revert()`` replays them.  That only works when both are
    called on the **same** Operation instance, so the returned operation is
    marked as not reversible in that case.  If nothing was recorded,
    ``revert()`` raises ``NotImplementedError``.
    """
    # index_name is authoritative; discard any competing name passed via kwargs.
    kwargs.pop("name", None)

    # Canonical list-of-tuples form of the key spec, or None when keys was not
    # given.  ty narrows isinstance(…, dict) to dict[object, object] and loses
    # the generic parameters, so the comprehension is seen as
    # list[tuple[object, object]] rather than list[tuple[str, int | str]].
    explicit_keys: list[tuple[str, int | str]] | None
    if keys is None:
        explicit_keys = None
    elif isinstance(keys, dict):
        explicit_keys = [(k, v) for k, v in keys.items()]  # ty: ignore[invalid-assignment]
    else:
        explicit_keys = list(keys)

    # Filled in by apply() when no explicit key spec is available.
    snapshot: dict[str, Any] = {}

    def _forward(db: Database) -> None:  # type: ignore[type-arg]
        if explicit_keys is None:
            # Forget any earlier capture, then record the live definition.
            snapshot.clear()
            existing = db[collection].index_information()
            if index_name in existing:
                details = existing[index_name]
                snapshot["key"] = details["key"]
                snapshot["opts"] = {
                    option: value
                    for option, value in details.items()
                    if option not in ("key", "v", "ns")
                }
        db[collection].drop_index(index_name)

    def _backward(db: Database) -> None:  # type: ignore[type-arg]
        if explicit_keys is not None:
            db[collection].create_index(explicit_keys, name=index_name, **kwargs)
            return
        if not snapshot:
            raise NotImplementedError(
                f"drop_index({collection!r}, {index_name!r}) cannot be auto-reverted: "
                "index spec was not captured. Supply keys= or define a down() function."
            )
        snapshot["opts"]["name"] = index_name
        db[collection].create_index(snapshot["key"], **snapshot["opts"])

    return Operation(
        description=f"drop_index({collection!r}, {index_name!r})",
        _apply=_forward,
        _revert=_backward,
        _is_reversible=explicit_keys is not None,
    )

rename_field(collection, old_name, new_name, filter=None)

Rename a field across all (or filtered) documents. Reverts by renaming back.

Source code in src/mongrator/ops.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def rename_field(
    collection: str,
    old_name: str,
    new_name: str,
    filter: dict[str, Any] | None = None,
) -> Operation:
    """Rename a field across all (or filtered) documents. Reverts by renaming back.

    ``apply()`` issues an ``update_many`` with ``$rename`` from *old_name* to
    *new_name*; ``revert()`` issues the inverse ``$rename``.  Both use the same
    query: *filter* when it is given and non-empty, otherwise ``{}`` (every
    document in *collection*).

    NOTE(review): because revert reuses *filter* unchanged, a filter that
    matches on *old_name* itself presumably stops matching once the field has
    been renamed — confirm before relying on auto-rollback with such filters.
    """
    query = filter or {}

    def apply(db: Database) -> None:  # type: ignore[type-arg]
        db[collection].update_many(query, {"$rename": {old_name: new_name}})

    def revert(db: Database) -> None:  # type: ignore[type-arg]
        db[collection].update_many(query, {"$rename": {new_name: old_name}})

    return Operation(
        # Separate the two names; previously they were rendered back-to-back
        # (e.g. 'name''full_name'), which made the description unreadable.
        description=f"rename_field({collection!r}, {old_name!r} -> {new_name!r})",
        _apply=apply,
        _revert=revert,
    )

add_field(collection, field_name, default_value, filter=None)

Add a field with a default value to all (or filtered) documents. Reverts by unsetting the field.

Source code in src/mongrator/ops.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
def add_field(
    collection: str,
    field_name: str,
    default_value: Any,
    filter: dict[str, Any] | None = None,
) -> Operation:
    """Build an operation that fills in a missing field with a default value.

    ``apply()`` sets *field_name* to *default_value* on documents that match
    *filter* (all documents when it is omitted or empty) and do not yet have
    the field.  ``revert()`` unsets *field_name* on every document matching
    *filter* — including documents that already had the field before
    ``apply()`` ran.
    """
    base_query = filter if filter else {}

    def _forward(db: Database) -> None:  # type: ignore[type-arg]
        # Restrict the update to documents where the field is absent.
        only_missing = {**base_query, field_name: {"$exists": False}}
        assignment = {"$set": {field_name: default_value}}
        db[collection].update_many(only_missing, assignment)

    def _backward(db: Database) -> None:  # type: ignore[type-arg]
        removal = {"$unset": {field_name: ""}}
        db[collection].update_many(base_query, removal)

    summary = f"add_field({collection!r}, {field_name!r}={default_value!r})"
    return Operation(description=summary, _apply=_forward, _revert=_backward)

drop_field(collection, field_name, filter=None)

Remove a field from all (or filtered) documents. Not auto-reversible because the original values are lost.

Source code in src/mongrator/ops.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
def drop_field(
    collection: str,
    field_name: str,
    filter: dict[str, Any] | None = None,
) -> Operation:
    """Build an operation that removes a field from matching documents.

    ``apply()`` unsets *field_name* on documents that match *filter* (all
    documents when it is omitted or empty) and currently have the field.  The
    removed values are not kept anywhere, so the operation is flagged as not
    reversible and ``revert()`` always raises ``NotImplementedError``.
    """
    base_query = filter if filter else {}

    def _forward(db: Database) -> None:  # type: ignore[type-arg]
        # Only touch documents that actually carry the field.
        only_present = {**base_query, field_name: {"$exists": True}}
        removal = {"$unset": {field_name: ""}}
        db[collection].update_many(only_present, removal)

    def _backward(db: Database) -> None:  # type: ignore[type-arg]
        message = (
            f"drop_field({collection!r}, {field_name!r}) cannot be auto-reverted. "
            "Define a down() function to restore the field."
        )
        raise NotImplementedError(message)

    summary = f"drop_field({collection!r}, {field_name!r})"
    return Operation(
        description=summary,
        _apply=_forward,
        _revert=_backward,
        _is_reversible=False,
    )

create_collection(collection, **kwargs)

Create a collection. Reverts by dropping it.

Source code in src/mongrator/ops.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def create_collection(collection: str, **kwargs: Any) -> Operation:
    """Build an operation that creates a collection; reverting drops it.

    Keyword arguments are forwarded unchanged to the database's
    ``create_collection`` call and are also listed in the description.
    """
    # Render the options as ", key=value, ..." or nothing when none were given.
    if kwargs:
        rendered_options = ", " + ", ".join(
            f"{option}={value!r}" for option, value in kwargs.items()
        )
    else:
        rendered_options = ""

    def _forward(db: Database) -> None:  # type: ignore[type-arg]
        db.create_collection(collection, **kwargs)

    def _backward(db: Database) -> None:  # type: ignore[type-arg]
        db.drop_collection(collection)

    return Operation(
        description=f"create_collection({collection!r}{rendered_options})",
        _apply=_forward,
        _revert=_backward,
    )

drop_collection(collection)

Drop a collection. Not auto-reversible because the data is lost.

Source code in src/mongrator/ops.py
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
def drop_collection(collection: str) -> Operation:
    """Build an operation that drops a collection.

    The dropped data is not kept anywhere, so the operation is flagged as not
    reversible and ``revert()`` always raises ``NotImplementedError``.
    """

    def _forward(db: Database) -> None:  # type: ignore[type-arg]
        db.drop_collection(collection)

    def _backward(db: Database) -> None:  # type: ignore[type-arg]
        message = (
            f"drop_collection({collection!r}) cannot be auto-reverted. "
            "Define a down() function to recreate the collection."
        )
        raise NotImplementedError(message)

    summary = f"drop_collection({collection!r})"
    return Operation(
        description=summary,
        _apply=_forward,
        _revert=_backward,
        _is_reversible=False,
    )