Coverage for daklib/ls.py: 63%

58 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-04 16:18 +0000

1"""List packages according to various criteria 

2 

3@copyright: 2014, Ansgar Burchardt <ansgar@debian.org> 

4@license: GPL-2+ 

5""" 

6 

7# This program is free software; you can redistribute it and/or modify 

8# it under the terms of the GNU General Public License as published by 

9# the Free Software Foundation; either version 2 of the License, or 

10# (at your option) any later version. 

11# 

12# This program is distributed in the hope that it will be useful, 

13# but WITHOUT ANY WARRANTY; without even the implied warranty of 

14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

15# GNU General Public License for more details. 

16# 

17# You should have received a copy of the GNU General Public License 

18# along with this program; if not, write to the Free Software 

19# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

20 

21from collections import defaultdict 

22from collections.abc import Collection, Iterable 

23 

24import sqlalchemy.dialects.postgresql as pgsql 

25import sqlalchemy.sql as sql 

26from sqlalchemy import ColumnElement 

27 

28from daklib.dbconn import DBConn, package_list 

29 

30 

def list_packages(
    packages: Collection[str],
    suites: Collection[str] | None = None,
    components: Collection[str] | None = None,
    architectures: Collection[str] | None = None,
    binary_types: Collection[str] | None = None,
    source_and_binary=False,
    regex=False,
    format: str | None = None,
    highest: str | None = None,
) -> Iterable[str] | Iterable[dict[str, dict[str, dict[str, dict[str, str]]]]]:
    """Yield a listing of the ``package_list`` rows matching the given criteria.

    This is a generator: the database session is opened, and an unknown
    *format* is rejected, only once iteration starts.  The session is
    closed when the generator finishes or is closed.

    :param packages: names compared against the ``package`` column; a row
        matches if any of them matches
    :param suites: if not ``None``, keep only rows whose ``suite`` or
        ``codename`` is in this collection
    :param components: if not ``None``, keep only rows whose ``component``
        is in this collection
    :param architectures: if not ``None``, keep only rows whose
        ``architecture`` is in this collection
    :param binary_types: if not ``None``, keep only rows whose ``type`` is
        in this collection
    :param source_and_binary: also compare each name against the ``source``
        column
    :param regex: compare with the SQL operator ``~`` (regular-expression
        match in PostgreSQL) instead of ``=``
    :param format: output format.

        * ``None``: one aligned ``package | version | suite | architectures``
          line per (package, version, display suite), ordered by those
          columns; nothing at all is yielded when no row matches
        * ``"control-suite"`` or ``"heidi"``: one
          ``package version architecture`` line per matching row
        * ``"python"``: a single nested mapping
          ``package -> display suite -> version -> details`` where details
          has the keys ``component``, ``architectures`` (a list), ``source``
          and ``source_version``; nothing is yielded when no row matches
    :param highest: if not ``None``, the non-``"python"`` formats are
        followed by an empty string and one ``package (highest version)``
        line per package, where ``version`` is the SQL ``max`` of the
        matching versions and *highest* is inserted verbatim.  The default
        format skips this summary when no row matches.
    :raises ValueError: if *format* is not one of the values listed above
    """
    session = DBConn().session()
    try:
        table = package_list
        cols = table.c

        match_op = "~" if regex else "="

        # Start from FALSE so that an empty `packages` selects nothing.
        condition: ColumnElement[bool] = sql.false()
        for name in packages:
            condition = condition | cols.package.op(match_op)(name)
            if source_and_binary:
                condition = condition | cols.source.op(match_op)(name)

        # Each optional filter narrows the selection further.
        if suites is not None:
            in_suite = cols.suite.in_(suites) | cols.codename.in_(suites)
            condition = condition & in_suite
        if components is not None:
            condition = condition & cols.component.in_(components)
        if architectures is not None:
            condition = condition & cols.architecture.in_(architectures)
        if binary_types is not None:
            condition = condition & cols.type.in_(binary_types)

        if format is None:
            # Architectures of one (package, version, suite) group joined
            # into one string; rows flagged as source sort first.
            arch_list = sql.func.string_agg(
                cols.architecture,
                pgsql.aggregate_order_by(
                    ", ", cols.architecture_is_source.desc(), cols.architecture
                ),
            )
            key_columns = (cols.package, cols.version, cols.display_suite)
            query = (
                sql.select(*key_columns, arch_list)
                .where(condition)
                .group_by(*key_columns)
                .order_by(*key_columns)
            )
            rows = session.execute(query).mappings().all()

            if not rows:
                return

            def column_width(column, minimum):
                # Widest value in the column, but never below `minimum`.
                return max(minimum, max(len(row[column]) for row in rows))

            lengths = {
                "package": column_width(cols.package, 10),
                "version": column_width(cols.version, 13),
                "suite": column_width(cols.display_suite, 10),
            }
            template = "{0:{lengths[package]}} | {1:{lengths[version]}} | {2:{lengths[suite]}} | {3}"

            for row in rows:
                yield template.format(
                    *(row[column] for column in key_columns),
                    row[arch_list],
                    lengths=lengths,
                )
        elif format in ("control-suite", "heidi"):
            line_columns = (cols.package, cols.version, cols.architecture)
            query = sql.select(*line_columns).where(condition)
            for row in session.execute(query).mappings():
                yield "{0} {1} {2}".format(*(row[column] for column in line_columns))
        elif format == "python":
            arch_list = sql.func.string_agg(
                cols.architecture,
                pgsql.aggregate_order_by(
                    ",", cols.architecture_is_source.desc(), cols.architecture
                ),
            )
            query = (
                sql.select(
                    cols.package,
                    cols.version,
                    cols.display_suite,
                    arch_list,
                    cols.source,
                    cols.source_version,
                    cols.component,
                )
                .where(condition)
                .group_by(
                    cols.package,
                    cols.version,
                    cols.display_suite,
                    cols.source,
                    cols.component,
                    cols.source_version,
                )
            )
            rows = session.execute(query).mappings().all()

            if not rows:
                return

            def nested():
                # A defaultdict whose missing keys are again such defaultdicts.
                return defaultdict(nested)

            tree = nested()
            for row in rows:
                details = {
                    "component": row[cols.component],
                    "architectures": row[arch_list].split(","),
                    "source": row[cols.source],
                    "source_version": row[cols.source_version],
                }
                tree[row[cols.package]][row[cols.display_suite]][row[cols.version]] = details

            yield tree
            # The "python" format never gets the `highest` summary below.
            return
        else:
            raise ValueError("Unknown output format requested.")

        if highest is not None:
            query = (
                sql.select(cols.package, sql.func.max(cols.version))
                .where(condition)
                .group_by(cols.package)
                .order_by(cols.package)
            )
            # An empty string separates the listing from the summary.
            yield ""
            for package_name, max_version in session.execute(query):
                yield "{0} ({1} {2})".format(package_name, highest, max_version)
    finally:
        session.close()