Coverage for daklib/ls.py: 63%

58 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-05-10 21:38 +0000

1"""List packages according to various criteria 

2 

3@copyright: 2014, Ansgar Burchardt <ansgar@debian.org> 

4@license: GPL-2+ 

5""" 

6 

7# This program is free software; you can redistribute it and/or modify 

8# it under the terms of the GNU General Public License as published by 

9# the Free Software Foundation; either version 2 of the License, or 

10# (at your option) any later version. 

11# 

12# This program is distributed in the hope that it will be useful, 

13# but WITHOUT ANY WARRANTY; without even the implied warranty of 

14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

15# GNU General Public License for more details. 

16# 

17# You should have received a copy of the GNU General Public License 

18# along with this program; if not, write to the Free Software 

19# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

20 

21from collections import defaultdict 

22from collections.abc import Collection, Iterable 

23 

24import sqlalchemy.dialects.postgresql as pgsql 

25import sqlalchemy.sql as sql 

26from sqlalchemy import ColumnElement 

27 

28from daklib.dbconn import DBConn, package_list 

29 

30 

def list_packages(
    packages: Collection[str],
    suites: Collection[str] | None = None,
    components: Collection[str] | None = None,
    architectures: Collection[str] | None = None,
    binary_types: Collection[str] | None = None,
    source_and_binary=False,
    regex=False,
    format: str | None = None,
    highest: str | None = None,
) -> Iterable[str] | Iterable[dict[str, dict[str, dict[str, dict[str, str]]]]]:
    """Generate a listing of the rows of ``package_list`` matching the criteria.

    This is a generator: the database session is opened when iteration
    starts and closed when the generator finishes or is closed.

    :param packages: names to look for; a row matches if it matches any of
        them.  With no names at all, nothing matches.
    :param suites: if given, keep only rows whose ``suite`` or ``codename``
        is one of these values.
    :param components: if given, keep only rows with one of these components.
    :param architectures: if given, keep only rows with one of these
        architectures.
    :param binary_types: if given, keep only rows whose ``type`` is one of
        these values.
    :param source_and_binary: also match each name against the ``source``
        column, not only against ``package``.
    :param regex: compare names with the SQL ``~`` operator instead of ``=``.
    :param format: output format.

        * ``None``: one aligned ``package | version | suite | architectures``
          line per (package, version, display suite) group, ordered by those
          three columns.
        * ``"control-suite"`` or ``"heidi"``: one ``package version
          architecture`` line per matching row.
        * ``"python"``: a single nested mapping
          ``package -> display suite -> version -> details`` where details
          holds ``component``, ``architectures`` (a list), ``source`` and
          ``source_version``.
    :param highest: if not ``None``, an empty line followed by one
        ``package (<highest> <max version>)`` line per package is appended
        to the output.  This part is not reached for the ``"python"`` format,
        nor when the default format finds no rows.
    :raises ValueError: if ``format`` is not one of the values above.
    """
    session = DBConn().session()
    try:
        table = package_list
        cols = table.c
        match_op = "~" if regex else "="

        def joined_architectures(separator: str):
            # Aggregate all architectures of a group into one string; rows
            # flagged ``architecture_is_source`` sort first, the remaining
            # architectures follow in ascending order.
            return sql.func.string_agg(
                cols.architecture,
                pgsql.aggregate_order_by(
                    sql.literal(separator),
                    cols.architecture_is_source.desc(),
                    cols.architecture,
                ),
            )

        # Start from FALSE and OR in one comparison per name (and column).
        searched = ("package", "source") if source_and_binary else ("package",)
        criteria: ColumnElement[bool] = sql.false()
        for name in packages:
            for column_name in searched:
                criteria = criteria | getattr(cols, column_name).op(match_op)(name)

        # Optional restrictions are ANDed onto the name match.
        if suites is not None:
            in_suite = cols.suite.in_(suites) | cols.codename.in_(suites)
            criteria = criteria & in_suite
        for column_name, allowed in (
            ("component", components),
            ("architecture", architectures),
            ("type", binary_types),
        ):
            if allowed is not None:
                criteria = criteria & getattr(cols, column_name).in_(allowed)

        if format is None:
            arch_list = joined_architectures(", ")
            grouping = (cols.package, cols.version, cols.display_suite)
            query = (
                sql.select(*grouping, arch_list)
                .where(criteria)
                .group_by(*grouping)
                .order_by(*grouping)
            )
            rows = session.execute(query).mappings().all()

            if not rows:
                return

            def width(column, minimum):
                # Column width: the longest value, but never below the minimum.
                return max(minimum, max(len(row[column]) for row in rows))

            lengths = {
                "package": width(cols.package, 10),
                "version": width(cols.version, 13),
                "suite": width(cols.display_suite, 10),
            }
            template = "{0:{lengths[package]}} | {1:{lengths[version]}} | {2:{lengths[suite]}} | {3}"

            for row in rows:
                yield template.format(
                    row[cols.package],
                    row[cols.version],
                    row[cols.display_suite],
                    row[arch_list],
                    lengths=lengths,
                )
        elif format in ("control-suite", "heidi"):
            wanted = (cols.package, cols.version, cols.architecture)
            query = sql.select(*wanted).where(criteria)
            for row in session.execute(query).mappings():
                yield "{0} {1} {2}".format(*(row[column] for column in wanted))
        elif format == "python":
            arch_list = joined_architectures(",")
            query = (
                sql.select(
                    cols.package,
                    cols.version,
                    cols.display_suite,
                    arch_list,
                    cols.source,
                    cols.source_version,
                    cols.component,
                )
                .where(criteria)
                .group_by(
                    cols.package,
                    cols.version,
                    cols.display_suite,
                    cols.source,
                    cols.component,
                    cols.source_version,
                )
            )
            rows = session.execute(query).mappings().all()

            if not rows:
                return

            def nested():
                # Arbitrarily deep auto-creating mapping.
                return defaultdict(nested)

            tree = nested()
            for row in rows:
                by_version = tree[row[cols.package]][row[cols.display_suite]]
                by_version[row[cols.version]] = {
                    "component": row[cols.component],
                    "architectures": row[arch_list].split(","),
                    "source": row[cols.source],
                    "source_version": row[cols.source_version],
                }

            yield tree
            # The "python" format never gets the trailing "highest" lines.
            return
        else:
            raise ValueError("Unknown output format requested.")

        if highest is not None:
            query = (
                sql.select(cols.package, sql.func.max(cols.version))
                .where(criteria)
                .group_by(cols.package)
                .order_by(cols.package)
            )
            # Blank separator line between the listing and the summary.
            yield ""
            for name, newest in session.execute(query):
                yield "{0} ({1} {2})".format(name, highest, newest)
    finally:
        session.close()