Skip to content

mongrator.runner

mongrator.runner

Migration runners: Protocol definitions and sync/async implementations.

SyncRunner wraps a pymongo MongoClient. AsyncRunner wraps a pymongo AsyncMongoClient.

Both share the same non-IO logic via loader and planner.

SyncRunner

Synchronous migration runner backed by pymongo.

Source code in src/mongrator/runner.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
class SyncRunner:
    """Synchronous migration runner backed by pymongo."""

    def __init__(self, client: "MongoClient", config: MigratorConfig) -> None:  # type: ignore[type-arg]
        self._client = client
        self._db = client[config.database]
        self._store = SyncStateStore(self._db[config.collection])
        self._lock = SyncMigrationLock(self._db[config.collection])
        self._config = config

    def plan_up(self, target: MigrationId | None = None) -> MigrationPlan:
        """Return the plan for applying pending migrations without executing."""
        files = loader.load(self._config)
        applied = self._store.get_applied()
        return planner.plan_up(files, applied, target)

    def plan_down(self, steps: int = 1) -> MigrationPlan:
        """Return the plan for rolling back migrations without executing."""
        files = loader.load(self._config)
        applied = self._store.get_applied()
        return planner.plan_down(files, applied, steps)

    def up(self, target: MigrationId | None = None, *, transactional: bool = False) -> list[MigrationId]:
        """Apply pending migrations, optionally up to `target`.

        Args:
            target: Stop after applying this migration ID.
            transactional: When True, wrap each migration in a MongoDB
                transaction. Requires a replica set; raises
                ``TransactionNotSupportedError`` otherwise.
        """
        if transactional:
            _check_transaction_support(self._client)
        with self._lock:
            files = loader.load(self._config)
            applied = self._store.get_applied()
            plan = planner.plan_up(files, applied, target)
            applied_ids: list[MigrationId] = []
            for migration in plan.to_apply:
                start = time.monotonic()
                if transactional:
                    with self._client.start_session() as session:
                        with session.start_transaction():
                            _run_up_migration(migration, _SessionBoundDatabase(self._db, session))
                else:
                    _run_up_migration(migration, self._db)
                duration_ms = int((time.monotonic() - start) * 1000)
                self._store.record_applied(make_record(migration.id, migration.checksum, "up", duration_ms))
                applied_ids.append(migration.id)
            return applied_ids

    def down(self, steps: int = 1, *, transactional: bool = False) -> list[MigrationId]:
        """Roll back the most recently applied migrations.

        Args:
            steps: Number of migrations to roll back.
            transactional: When True, wrap each migration in a MongoDB
                transaction. Requires a replica set; raises
                ``TransactionNotSupportedError`` otherwise.
        """
        if transactional:
            _check_transaction_support(self._client)
        with self._lock:
            files = loader.load(self._config)
            applied = self._store.get_applied()
            plan = planner.plan_down(files, applied, steps)
            rolled_back: list[MigrationId] = []
            for migration in plan.to_apply:
                start = time.monotonic()
                if transactional:
                    with self._client.start_session() as session:
                        with session.start_transaction():
                            _run_down_migration(migration, _SessionBoundDatabase(self._db, session))
                else:
                    _run_down_migration(migration, self._db)
                duration_ms = int((time.monotonic() - start) * 1000)
                self._store.record_applied(make_record(migration.id, migration.checksum, "down", duration_ms))
                rolled_back.append(migration.id)
            return rolled_back

    def status(self) -> list[MigrationStatus]:
        """Return the status of every known migration, including orphans.

        This is a read-only operation and intentionally does not acquire the
        advisory lock so it can run safely while migrations are in progress.
        """
        files = loader.load(self._config)
        applied = self._store.get_applied()
        file_ids = {f.id for f in files}
        statuses: list[MigrationStatus] = []
        for f in files:
            if f.id in applied:
                record = self._store.get_record(f.id)
                checksum_ok = record is None or record["checksum"] == f.checksum
                applied_at = record["applied_at"] if record else None
            else:
                checksum_ok = True
                applied_at = None
            statuses.append(
                MigrationStatus(
                    id=f.id,
                    applied=f.id in applied,
                    applied_at=applied_at,
                    checksum_ok=checksum_ok,
                )
            )
        for orphan_id in sorted(applied - file_ids):
            record = self._store.get_record(orphan_id)
            statuses.append(
                MigrationStatus(
                    id=orphan_id,
                    applied=True,
                    applied_at=record["applied_at"] if record else None,
                    orphaned=True,
                )
            )
        return statuses

    def validate(self) -> list[ChecksumMismatchError]:
        """Check that applied migration files match their recorded checksums.

        This is a read-only operation and intentionally does not acquire the
        advisory lock so it can run safely while migrations are in progress.
        """
        files = loader.load(self._config)
        applied = self._store.get_applied()
        errors: list[ChecksumMismatchError] = []
        for f in files:
            if f.id not in applied:
                continue
            record = self._store.get_record(f.id)
            if record and record["checksum"] != f.checksum:
                errors.append(ChecksumMismatchError(f.id, record["checksum"], f.checksum))
        return errors

plan_up(target=None)

Return the plan for applying pending migrations without executing.

Source code in src/mongrator/runner.py
214
215
216
217
218
def plan_up(self, target: MigrationId | None = None) -> MigrationPlan:
    """Return the plan for applying pending migrations without executing."""
    files = loader.load(self._config)
    applied = self._store.get_applied()
    return planner.plan_up(files, applied, target)

plan_down(steps=1)

Return the plan for rolling back migrations without executing.

Source code in src/mongrator/runner.py
220
221
222
223
224
def plan_down(self, steps: int = 1) -> MigrationPlan:
    """Return the plan for rolling back migrations without executing."""
    files = loader.load(self._config)
    applied = self._store.get_applied()
    return planner.plan_down(files, applied, steps)

up(target=None, *, transactional=False)

Apply pending migrations, optionally up to target.

Parameters:

Name Type Description Default
target MigrationId | None

Stop after applying this migration ID.

None
transactional bool

When True, wrap each migration in a MongoDB transaction. Requires a replica set; raises TransactionNotSupportedError otherwise.

False
Source code in src/mongrator/runner.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
def up(self, target: MigrationId | None = None, *, transactional: bool = False) -> list[MigrationId]:
    """Apply pending migrations, optionally up to `target`.

    Args:
        target: Stop after applying this migration ID.
        transactional: When True, wrap each migration in a MongoDB
            transaction. Requires a replica set; raises
            ``TransactionNotSupportedError`` otherwise.
    """
    if transactional:
        _check_transaction_support(self._client)
    with self._lock:
        files = loader.load(self._config)
        applied = self._store.get_applied()
        plan = planner.plan_up(files, applied, target)
        applied_ids: list[MigrationId] = []
        for migration in plan.to_apply:
            start = time.monotonic()
            if transactional:
                with self._client.start_session() as session:
                    with session.start_transaction():
                        _run_up_migration(migration, _SessionBoundDatabase(self._db, session))
            else:
                _run_up_migration(migration, self._db)
            duration_ms = int((time.monotonic() - start) * 1000)
            self._store.record_applied(make_record(migration.id, migration.checksum, "up", duration_ms))
            applied_ids.append(migration.id)
        return applied_ids

down(steps=1, *, transactional=False)

Roll back the most recently applied migrations.

Parameters:

Name Type Description Default
steps int

Number of migrations to roll back.

1
transactional bool

When True, wrap each migration in a MongoDB transaction. Requires a replica set; raises TransactionNotSupportedError otherwise.

False
Source code in src/mongrator/runner.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def down(self, steps: int = 1, *, transactional: bool = False) -> list[MigrationId]:
    """Roll back the most recently applied migrations.

    Args:
        steps: Number of migrations to roll back.
        transactional: When True, wrap each migration in a MongoDB
            transaction. Requires a replica set; raises
            ``TransactionNotSupportedError`` otherwise.
    """
    if transactional:
        _check_transaction_support(self._client)
    with self._lock:
        files = loader.load(self._config)
        applied = self._store.get_applied()
        plan = planner.plan_down(files, applied, steps)
        rolled_back: list[MigrationId] = []
        for migration in plan.to_apply:
            start = time.monotonic()
            if transactional:
                with self._client.start_session() as session:
                    with session.start_transaction():
                        _run_down_migration(migration, _SessionBoundDatabase(self._db, session))
            else:
                _run_down_migration(migration, self._db)
            duration_ms = int((time.monotonic() - start) * 1000)
            self._store.record_applied(make_record(migration.id, migration.checksum, "down", duration_ms))
            rolled_back.append(migration.id)
        return rolled_back

status()

Return the status of every known migration, including orphans.

This is a read-only operation and intentionally does not acquire the advisory lock so it can run safely while migrations are in progress.

Source code in src/mongrator/runner.py
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
def status(self) -> list[MigrationStatus]:
    """Return the status of every known migration, including orphans.

    This is a read-only operation and intentionally does not acquire the
    advisory lock so it can run safely while migrations are in progress.
    """
    files = loader.load(self._config)
    applied = self._store.get_applied()
    file_ids = {f.id for f in files}
    statuses: list[MigrationStatus] = []
    for f in files:
        if f.id in applied:
            record = self._store.get_record(f.id)
            checksum_ok = record is None or record["checksum"] == f.checksum
            applied_at = record["applied_at"] if record else None
        else:
            checksum_ok = True
            applied_at = None
        statuses.append(
            MigrationStatus(
                id=f.id,
                applied=f.id in applied,
                applied_at=applied_at,
                checksum_ok=checksum_ok,
            )
        )
    for orphan_id in sorted(applied - file_ids):
        record = self._store.get_record(orphan_id)
        statuses.append(
            MigrationStatus(
                id=orphan_id,
                applied=True,
                applied_at=record["applied_at"] if record else None,
                orphaned=True,
            )
        )
    return statuses

validate()

Check that applied migration files match their recorded checksums.

This is a read-only operation and intentionally does not acquire the advisory lock so it can run safely while migrations are in progress.

Source code in src/mongrator/runner.py
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
def validate(self) -> list[ChecksumMismatchError]:
    """Check that applied migration files match their recorded checksums.

    This is a read-only operation and intentionally does not acquire the
    advisory lock so it can run safely while migrations are in progress.
    """
    files = loader.load(self._config)
    applied = self._store.get_applied()
    errors: list[ChecksumMismatchError] = []
    for f in files:
        if f.id not in applied:
            continue
        record = self._store.get_record(f.id)
        if record and record["checksum"] != f.checksum:
            errors.append(ChecksumMismatchError(f.id, record["checksum"], f.checksum))
    return errors

AsyncRunner

Asynchronous migration runner backed by pymongo AsyncMongoClient.

Migration functions are dispatched based on their type:

  • Coroutine functions (async def up(db)) receive the async database and are awaited. This allows migrations to use await for non-blocking I/O when running with --async.
  • Regular functions that return a list[Operation] (ops-based migrations) continue to receive the sync database, since ops helpers are synchronous.
  • Regular functions that perform raw pymongo calls also receive the sync database, preserving backwards compatibility.

State tracking always uses the async client for non-blocking operation.

Source code in src/mongrator/runner.py
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
class AsyncRunner:
    """Asynchronous migration runner backed by pymongo AsyncMongoClient.

    Migration functions are dispatched based on their type:

    - **Coroutine functions** (``async def up(db)``) receive the async database
      and are awaited. This allows migrations to use ``await`` for non-blocking
      I/O when running with ``--async``.
    - **Regular functions** that return a ``list[Operation]`` (ops-based migrations)
      continue to receive the sync database, since ops helpers are synchronous.
    - **Regular functions** that perform raw pymongo calls also receive the sync
      database, preserving backwards compatibility.

    State tracking always uses the async client for non-blocking operation.
    """

    def __init__(  # type: ignore[type-arg]
        self,
        client: "AsyncMongoClient",
        config: MigratorConfig,
        *,
        sync_client: "MongoClient | None" = None,
    ) -> None:
        # Sync DB passed to sync migration functions and ops helpers.
        if sync_client is None:
            sync_client = MongoClient(config.uri)
        self._sync_client = sync_client
        self._db = sync_client[config.database]
        # Async DB passed to coroutine migration functions.
        self._async_db = client[config.database]
        # Async store for non-blocking state tracking.
        self._store = AsyncMongoStateStore(self._async_db[config.collection])
        self._lock = AsyncMigrationLock(self._async_db[config.collection])
        self._config = config

    async def plan_up(self, target: MigrationId | None = None) -> MigrationPlan:
        """Return the plan for applying pending migrations without executing."""
        files = loader.load(self._config)
        applied = await self._store.get_applied()
        return planner.plan_up(files, applied, target)

    async def plan_down(self, steps: int = 1) -> MigrationPlan:
        """Return the plan for rolling back migrations without executing."""
        files = loader.load(self._config)
        applied = await self._store.get_applied()
        return planner.plan_down(files, applied, steps)

    async def up(self, target: MigrationId | None = None, *, transactional: bool = False) -> list[MigrationId]:
        """Apply pending migrations, optionally up to `target`.

        Coroutine migration functions receive the async database and are awaited;
        sync functions and ops-based migrations use the sync database.

        Args:
            target: Stop after applying this migration ID.
            transactional: When True, wrap each migration in a MongoDB
                transaction. Requires a replica set; raises
                ``TransactionNotSupportedError`` otherwise.
        """
        if transactional:
            _check_transaction_support(self._sync_client)
        async with self._lock:
            files = loader.load(self._config)
            applied = await self._store.get_applied()
            plan = planner.plan_up(files, applied, target)
            applied_ids: list[MigrationId] = []
            for migration in plan.to_apply:
                start = time.monotonic()
                if transactional:
                    with self._sync_client.start_session() as session:
                        with session.start_transaction():
                            await _async_run_up_migration(
                                migration, self._async_db, _SessionBoundDatabase(self._db, session)
                            )
                else:
                    await _async_run_up_migration(migration, self._async_db, self._db)
                duration_ms = int((time.monotonic() - start) * 1000)
                await self._store.record_applied(make_record(migration.id, migration.checksum, "up", duration_ms))
                applied_ids.append(migration.id)
            return applied_ids

    async def down(self, steps: int = 1, *, transactional: bool = False) -> list[MigrationId]:
        """Roll back the most recently applied migrations.

        Coroutine migration functions receive the async database and are awaited;
        sync functions and ops-based migrations use the sync database.

        Args:
            steps: Number of migrations to roll back.
            transactional: When True, wrap each migration in a MongoDB
                transaction. Requires a replica set; raises
                ``TransactionNotSupportedError`` otherwise.
        """
        if transactional:
            _check_transaction_support(self._sync_client)
        async with self._lock:
            files = loader.load(self._config)
            applied = await self._store.get_applied()
            plan = planner.plan_down(files, applied, steps)
            rolled_back: list[MigrationId] = []
            for migration in plan.to_apply:
                start = time.monotonic()
                if transactional:
                    with self._sync_client.start_session() as session:
                        with session.start_transaction():
                            await _async_run_down_migration(
                                migration, self._async_db, _SessionBoundDatabase(self._db, session)
                            )
                else:
                    await _async_run_down_migration(migration, self._async_db, self._db)
                duration_ms = int((time.monotonic() - start) * 1000)
                await self._store.record_applied(make_record(migration.id, migration.checksum, "down", duration_ms))
                rolled_back.append(migration.id)
            return rolled_back

    async def status(self) -> list[MigrationStatus]:
        """Return the status of every known migration, including orphans.

        This is a read-only operation and intentionally does not acquire the
        advisory lock so it can run safely while migrations are in progress.
        """
        files = loader.load(self._config)
        applied = await self._store.get_applied()
        file_ids = {f.id for f in files}
        statuses: list[MigrationStatus] = []
        for f in files:
            if f.id in applied:
                record = await self._store.get_record(f.id)
                checksum_ok = record is None or record["checksum"] == f.checksum
                applied_at = record["applied_at"] if record else None
            else:
                checksum_ok = True
                applied_at = None
            statuses.append(
                MigrationStatus(
                    id=f.id,
                    applied=f.id in applied,
                    applied_at=applied_at,
                    checksum_ok=checksum_ok,
                )
            )
        for orphan_id in sorted(applied - file_ids):
            record = await self._store.get_record(orphan_id)
            statuses.append(
                MigrationStatus(
                    id=orphan_id,
                    applied=True,
                    applied_at=record["applied_at"] if record else None,
                    orphaned=True,
                )
            )
        return statuses

    async def validate(self) -> list[ChecksumMismatchError]:
        """Check that applied migration files match their recorded checksums.

        This is a read-only operation and intentionally does not acquire the
        advisory lock so it can run safely while migrations are in progress.
        """
        files = loader.load(self._config)
        applied = await self._store.get_applied()
        errors: list[ChecksumMismatchError] = []
        for f in files:
            if f.id not in applied:
                continue
            record = await self._store.get_record(f.id)
            if record and record["checksum"] != f.checksum:
                errors.append(ChecksumMismatchError(f.id, record["checksum"], f.checksum))
        return errors

plan_up(target=None) async

Return the plan for applying pending migrations without executing.

Source code in src/mongrator/runner.py
375
376
377
378
379
async def plan_up(self, target: MigrationId | None = None) -> MigrationPlan:
    """Return the plan for applying pending migrations without executing."""
    files = loader.load(self._config)
    applied = await self._store.get_applied()
    return planner.plan_up(files, applied, target)

plan_down(steps=1) async

Return the plan for rolling back migrations without executing.

Source code in src/mongrator/runner.py
381
382
383
384
385
async def plan_down(self, steps: int = 1) -> MigrationPlan:
    """Return the plan for rolling back migrations without executing."""
    files = loader.load(self._config)
    applied = await self._store.get_applied()
    return planner.plan_down(files, applied, steps)

up(target=None, *, transactional=False) async

Apply pending migrations, optionally up to target.

Coroutine migration functions receive the async database and are awaited; sync functions and ops-based migrations use the sync database.

Parameters:

Name Type Description Default
target MigrationId | None

Stop after applying this migration ID.

None
transactional bool

When True, wrap each migration in a MongoDB transaction. Requires a replica set; raises TransactionNotSupportedError otherwise.

False
Source code in src/mongrator/runner.py
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
async def up(self, target: MigrationId | None = None, *, transactional: bool = False) -> list[MigrationId]:
    """Apply pending migrations, optionally up to `target`.

    Coroutine migration functions receive the async database and are awaited;
    sync functions and ops-based migrations use the sync database.

    Args:
        target: Stop after applying this migration ID.
        transactional: When True, wrap each migration in a MongoDB
            transaction. Requires a replica set; raises
            ``TransactionNotSupportedError`` otherwise.
    """
    if transactional:
        _check_transaction_support(self._sync_client)
    async with self._lock:
        files = loader.load(self._config)
        applied = await self._store.get_applied()
        plan = planner.plan_up(files, applied, target)
        applied_ids: list[MigrationId] = []
        for migration in plan.to_apply:
            start = time.monotonic()
            if transactional:
                with self._sync_client.start_session() as session:
                    with session.start_transaction():
                        await _async_run_up_migration(
                            migration, self._async_db, _SessionBoundDatabase(self._db, session)
                        )
            else:
                await _async_run_up_migration(migration, self._async_db, self._db)
            duration_ms = int((time.monotonic() - start) * 1000)
            await self._store.record_applied(make_record(migration.id, migration.checksum, "up", duration_ms))
            applied_ids.append(migration.id)
        return applied_ids

down(steps=1, *, transactional=False) async

Roll back the most recently applied migrations.

Coroutine migration functions receive the async database and are awaited; sync functions and ops-based migrations use the sync database.

Parameters:

Name Type Description Default
steps int

Number of migrations to roll back.

1
transactional bool

When True, wrap each migration in a MongoDB transaction. Requires a replica set; raises TransactionNotSupportedError otherwise.

False
Source code in src/mongrator/runner.py
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
async def down(self, steps: int = 1, *, transactional: bool = False) -> list[MigrationId]:
    """Roll back the most recently applied migrations.

    Coroutine migration functions receive the async database and are awaited;
    sync functions and ops-based migrations use the sync database.

    Args:
        steps: Number of migrations to roll back.
        transactional: When True, wrap each migration in a MongoDB
            transaction. Requires a replica set; raises
            ``TransactionNotSupportedError`` otherwise.
    """
    if transactional:
        _check_transaction_support(self._sync_client)
    async with self._lock:
        files = loader.load(self._config)
        applied = await self._store.get_applied()
        plan = planner.plan_down(files, applied, steps)
        rolled_back: list[MigrationId] = []
        for migration in plan.to_apply:
            start = time.monotonic()
            if transactional:
                with self._sync_client.start_session() as session:
                    with session.start_transaction():
                        await _async_run_down_migration(
                            migration, self._async_db, _SessionBoundDatabase(self._db, session)
                        )
            else:
                await _async_run_down_migration(migration, self._async_db, self._db)
            duration_ms = int((time.monotonic() - start) * 1000)
            await self._store.record_applied(make_record(migration.id, migration.checksum, "down", duration_ms))
            rolled_back.append(migration.id)
        return rolled_back

status() async

Return the status of every known migration, including orphans.

This is a read-only operation and intentionally does not acquire the advisory lock so it can run safely while migrations are in progress.

Source code in src/mongrator/runner.py
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
async def status(self) -> list[MigrationStatus]:
    """Return the status of every known migration, including orphans.

    This is a read-only operation and intentionally does not acquire the
    advisory lock so it can run safely while migrations are in progress.
    """
    files = loader.load(self._config)
    applied = await self._store.get_applied()
    file_ids = {f.id for f in files}
    statuses: list[MigrationStatus] = []
    for f in files:
        if f.id in applied:
            record = await self._store.get_record(f.id)
            checksum_ok = record is None or record["checksum"] == f.checksum
            applied_at = record["applied_at"] if record else None
        else:
            checksum_ok = True
            applied_at = None
        statuses.append(
            MigrationStatus(
                id=f.id,
                applied=f.id in applied,
                applied_at=applied_at,
                checksum_ok=checksum_ok,
            )
        )
    for orphan_id in sorted(applied - file_ids):
        record = await self._store.get_record(orphan_id)
        statuses.append(
            MigrationStatus(
                id=orphan_id,
                applied=True,
                applied_at=record["applied_at"] if record else None,
                orphaned=True,
            )
        )
    return statuses

validate() async

Check that applied migration files match their recorded checksums.

This is a read-only operation and intentionally does not acquire the advisory lock so it can run safely while migrations are in progress.

Source code in src/mongrator/runner.py
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
async def validate(self) -> list[ChecksumMismatchError]:
    """Check that applied migration files match their recorded checksums.

    This is a read-only operation and intentionally does not acquire the
    advisory lock so it can run safely while migrations are in progress.
    """
    files = loader.load(self._config)
    applied = await self._store.get_applied()
    errors: list[ChecksumMismatchError] = []
    for f in files:
        if f.id not in applied:
            continue
        record = await self._store.get_record(f.id)
        if record and record["checksum"] != f.checksum:
            errors.append(ChecksumMismatchError(f.id, record["checksum"], f.checksum))
    return errors