Source code for dakweb.routers.changelog

# SPDX-License-Identifier:  GPL-2.0-or-later
# © 2026, Ansgar 🙀 <ansgar@debian.org>
# © 2026, Anton Gladky <gladk@debian.org>

from datetime import datetime
from typing import Annotated

from fastapi import APIRouter, Depends, Query, Response
from pydantic import AfterValidator, BaseModel
from sqlalchemy import TIMESTAMP, cast, or_, select
from sqlalchemy.orm import Session

from daklib.dbconn import DBChange, DBChangelog
from dakweb.routers.deps import get_db

router = APIRouter()


class ChangelogV1(BaseModel):
    """One result row of the legacy ``/changelogs`` endpoint.

    Every field is a plain string; the endpoint fills them from the selected
    ``DBChange`` / ``DBChangelog`` columns of the same name.
    """

    # Value of DBChange.date for the upload.
    date: str
    # Source package name (DBChange.source).
    source: str
    # Version of the upload (DBChange.version).
    version: str
    # Value of DBChange.changedby.
    changedby: str
    # Changelog text (DBChangelog.changelog).
    changelog: str
@router.get("/changelogs")
def changelogs(
    response: Response,
    search_term: Annotated[
        str,
        AfterValidator(str.strip),
        Query(
            description="Substring to match in changelog text or changedby",
            min_length=1,
        ),
    ],
    db: Session = Depends(get_db),
) -> list[ChangelogV1]:
    """Legacy endpoint; prefer /v2/changelogs.

    Returns all matching rows (no pagination). Use only for backward compatibility.

    The search term is matched case-insensitively as a literal substring:
    ``%`` and ``_`` in the term are not treated as wildcards.
    """
    # Add advisory / deprecation style headers
    response.headers["Warning"] = (
        '299 dakweb "/changelogs is legacy; prefer /v2/changelogs (adds pagination & filters)"'
    )
    response.headers["X-Legacy-Endpoint"] = "true"
    response.headers["X-Preferred-Endpoint"] = "/v2/changelogs"

    # Escape LIKE metacharacters so user input is a true substring match.
    # Without this, a term such as "%" would match every row on this
    # unpaginated endpoint.  "/" is used as the escape character (and is
    # escaped first so the escapes added afterwards are not doubled); a
    # forward slash avoids any dependence on backslash handling in SQL
    # string literals.
    escaped = search_term.replace("/", "//").replace("%", "/%").replace("_", "/_")
    pattern = f"%{escaped}%"

    stmt = (
        select(
            DBChange.date,
            DBChange.source,
            DBChange.version,
            DBChange.changedby,
            DBChangelog.changelog,
        )
        .join(DBChangelog, DBChange.changelog_id == DBChangelog.id)
        # Rows whose source is "debian-keyring" are never returned.
        .where(DBChange.source != "debian-keyring")
        .where(
            or_(
                DBChangelog.changelog.ilike(pattern, escape="/"),
                DBChange.changedby.ilike(pattern, escape="/"),
            )
        )
        .order_by(DBChange.seen)
    )

    return [
        ChangelogV1(
            date=c.date,
            source=c.source,
            version=c.version,
            changedby=c.changedby,
            changelog=c.changelog,
        )
        for c in db.execute(stmt)
    ]
class Changelog(BaseModel):
    """One result row of the ``/v2/changelogs`` endpoint."""

    # DBChange.change_id of the row; the endpoint's ``since_id`` filter
    # compares against this value.
    id: int
    # Value of DBChange.date for the upload.
    date: str
    # Source package name (DBChange.source).
    source: str
    # Version of the upload (DBChange.version).
    version: str
    # Value of DBChange.changedby.
    changedby: str
    # Changelog text (DBChangelog.changelog).
    changelog: str
class ChangelogResponse(BaseModel):
    """Paginated response envelope returned by ``/v2/changelogs``."""

    # Number of entries in ``results`` (this page only, not the total
    # number of matching rows).
    count: int
    # Page size that was requested.
    limit: int
    # Row offset that was requested.
    offset: int
    # True when at least one more matching row exists after this page.
    has_more: bool
    # The rows of this page.
    results: list[Changelog]
@router.get("/v2/changelogs")
def changelogs_v2(
    search_term: Annotated[
        Annotated[str, AfterValidator(str.strip)] | None,
        Query(description="Substring to match in changelog text or changedby"),
    ] = None,
    source: Annotated[
        Annotated[str, AfterValidator(str.strip)] | None,
        Query(description="Exact source package name to restrict"),
    ] = None,
    since: Annotated[datetime | None, Query(description="ISO date/datetime")] = None,
    till: Annotated[
        datetime | None, Query(description="ISO date/datetime upper bound (inclusive)")
    ] = None,
    since_id: Annotated[
        int | None, Query(description="Return only rows with id > since_id", ge=0)
    ] = None,
    limit: Annotated[int, Query(ge=1, le=500)] = 50,
    offset: Annotated[int, Query(ge=0)] = 0,
    db: Session = Depends(get_db),
) -> ChangelogResponse:
    """Improved changelogs endpoint with date range & pagination.

    All filters are optional and are combined with AND. The search term is
    matched case-insensitively as a literal substring: ``%`` and ``_`` in the
    term are not treated as wildcards.
    """
    stmt = (
        select(
            DBChange.change_id,
            DBChange.date,
            DBChange.source,
            DBChange.version,
            DBChange.changedby,
            DBChangelog.changelog,
        )
        .join(DBChangelog, DBChange.changelog_id == DBChangelog.id)
        # Rows whose source is "debian-keyring" are never returned.
        .where(DBChange.source != "debian-keyring")
    )

    # An empty (or whitespace-only, after stripping) term means "no filter".
    if search_term:
        # Escape LIKE metacharacters so user input is a true substring
        # match.  "/" is the escape character and is escaped first so the
        # escapes added afterwards are not doubled; a forward slash avoids
        # any dependence on backslash handling in SQL string literals.
        escaped = (
            search_term.replace("/", "//").replace("%", "/%").replace("_", "/_")
        )
        like = f"%{escaped}%"
        stmt = stmt.where(
            or_(
                DBChangelog.changelog.ilike(like, escape="/"),
                DBChange.changedby.ilike(like, escape="/"),
            )
        )

    if source:
        stmt = stmt.where(DBChange.source == source)

    # Date column may be stored as TEXT; cast to timestamp for comparison
    if since is not None:
        stmt = stmt.where(cast(DBChange.date, TIMESTAMP) >= since)
    if till is not None:
        # Inclusive upper bound
        stmt = stmt.where(cast(DBChange.date, TIMESTAMP) <= till)

    if since_id is not None:
        # Strictly greater than to avoid returning the last seen row again
        stmt = stmt.where(DBChange.change_id > since_id)

    # Order by change_id so since_id acts as a stable cursor for pagination.
    stmt = stmt.order_by(DBChange.change_id)

    # Fetch one row beyond the page size: its presence tells us whether a
    # further page exists without running a separate COUNT query.
    rows = db.execute(stmt.offset(offset).limit(limit + 1)).all()
    has_more = len(rows) > limit
    rows = rows[:limit]

    results = [
        Changelog(
            id=r.change_id,
            date=r.date,
            source=r.source,
            version=r.version,
            changedby=r.changedby,
            changelog=r.changelog,
        )
        for r in rows
    ]
    return ChangelogResponse(
        count=len(results),
        limit=limit,
        offset=offset,
        has_more=has_more,
        results=results,
    )