Coverage for daklib/queue.py: 21%

114 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-04 16:18 +0000

1# vim:set et sw=4: 

2 

3""" 

4Queue utility functions for dak 

5 

6@contact: Debian FTP Master <ftpmaster@debian.org> 

7@copyright: 2001 - 2006 James Troup <james@nocrew.org> 

8@copyright: 2009, 2010 Joerg Jaspert <joerg@debian.org> 

9@license: GNU General Public License version 2 or later 

10""" 

11 

12# This program is free software; you can redistribute it and/or modify 

13# it under the terms of the GNU General Public License as published by 

14# the Free Software Foundation; either version 2 of the License, or 

15# (at your option) any later version. 

16 

17# This program is distributed in the hope that it will be useful, 

18# but WITHOUT ANY WARRANTY; without even the implied warranty of 

19# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

20# GNU General Public License for more details. 

21 

22# You should have received a copy of the GNU General Public License 

23# along with this program; if not, write to the Free Software 

24# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

25 

26############################################################################### 

27 

28from collections.abc import Iterable 

29from typing import TYPE_CHECKING, Literal 

30 

31from . import utils 

32from .config import Config 

33from .dbconn import ( 

34 Architecture, 

35 DBBinary, 

36 DBSource, 

37 NewComment, 

38 PolicyQueueUpload, 

39 Priority, 

40 Section, 

41 Suite, 

42 get_mapped_component, 

43) 

44from .regexes import re_default_answer 

45 

46if TYPE_CHECKING: 

47 from sqlalchemy.engine import Row 

48 from sqlalchemy.orm import Session 

49 

50 from .policy import MissingOverride 

51 

52################################################################################ 

53 

54 

def check_valid(overrides: list["MissingOverride"], session: "Session") -> bool:
    """Validate new overrides against the database and flag each one.

    Every override has its ``valid`` key set to :const:`True` or
    :const:`False` as a side effect.  An override is invalid if its
    priority, section or (mapped) component cannot be found, or if it fails
    one of the sanity checks:

    - debian-installer packages have to be udeb (or source)
    - non debian-installer packages cannot be udeb

    :param overrides: list of overrides to check.  The overrides need
                      to be given in form of a dict with the following keys:

                      - package: package name
                      - priority
                      - section
                      - component
                      - type: type of requested override ('dsc', 'deb' or 'udeb')

                      All values are strings.
    :param session: database session used for the lookups
    :return: :const:`True` if all overrides are valid, :const:`False` if there is any
             invalid override.
    :raises Exception: if an override has a type other than 'dsc', 'deb' or 'udeb'
    """
    found_invalid = False
    for override in overrides:
        override["valid"] = True

        priority_row = (
            session.query(Priority).filter_by(priority=override["priority"]).first()
        )
        if priority_row is None:
            override["valid"] = False

        section_row = (
            session.query(Section).filter_by(section=override["section"]).first()
        )
        if section_row is None:
            override["valid"] = False

        if get_mapped_component(override["component"], session) is None:
            override["valid"] = False

        kind = override["type"]
        if kind not in ("dsc", "deb", "udeb"):
            raise Exception("Unknown override type {0}".format(kind))

        # Compare the part after the first "/" (or the whole section when
        # there is no "/") against the installer section name.
        for_installer = override["section"].split("/", 1)[-1] == "debian-installer"
        if kind == "udeb" and not for_installer:
            override["valid"] = False
        if for_installer and kind not in ("dsc", "udeb"):
            override["valid"] = False

        if not override["valid"]:
            found_invalid = True
    return not found_invalid

95 

96 

97############################################################################### 

98 

99 

def prod_maintainer(
    notes: Iterable[NewComment],
    upload: PolicyQueueUpload,
    session: "Session",
    trainee: bool = False,
) -> Literal[0] | None:
    """Interactively compose and send a prod mail (or bug mail) for an upload.

    The comments of ``notes`` are joined into an initial message which is
    opened in an editor.  The user is then asked whether to send it as a
    prod, send it as a bug, edit it again, abandon or quit.  When sending,
    the mail is built from the ``process-new.prod`` or ``process-new.bug``
    template and passed to :func:`utils.send_mail`.  Afterwards the user
    may store the sent mail as a new note, which is added to ``session``
    and committed.

    :param notes: existing notes whose comments seed the message
    :param upload: upload the message is about
    :param session: database session used to store the optional note
    :param trainee: value stored in the ``trainee`` field of the new note
    :return: ``0`` if the user chose Quit, :const:`None` otherwise (the
             message was abandoned or sent)
    """
    cnf = Config()
    changes = upload.changes
    whitelists = [upload.target_suite.mail_whitelist]

    # Here we prepare an editor and get them ready to prod...
    prod_message = "\n\n=====\n\n".join([note.comment for note in notes])
    prompt = "[P]rod, Bug, Edit, Abandon, Quit ?"
    # Only these choices are accepted.  Testing the input as a substring of
    # the prompt would also accept characters such as " ", "," or "?" and
    # then silently treat them like "Prod", sending a mail by accident.
    valid_answers = ("P", "B", "E", "A", "Q")
    default_match = re_default_answer.search(prompt)
    answer = "E"
    while answer == "E":
        prod_message = utils.call_editor(prod_message)
        print("Prod message:")
        print(
            utils.prefix_multi_line_string(prod_message, " ", include_blank_lines=True)
        )
        answer = ""
        while answer not in valid_answers:
            answer = utils.input_or_exit(prompt)
            if answer == "":
                # Empty input selects the bracketed default of the prompt.
                assert default_match is not None
                answer = default_match.group(1)
            answer = answer[:1].upper()
        if answer == "A":
            return None
        elif answer == "Q":
            return 0
    # Otherwise, do the proding...
    user_email_address = utils.whoami() + " <%s>" % (cnf["Dinstall::MyAdminAddress"])

    is_bug = answer == "B"

    changed_by = changes.changedby or changes.maintainer
    maintainer = changes.maintainer
    maintainer_to = utils.mail_addresses_for_upload(
        maintainer, changed_by, changes.fingerprint, changes.authorized_by_fingerprint
    )

    Subst = {
        "__SOURCE__": upload.changes.source,
        "__VERSION__": upload.changes.version,
        "__ARCHITECTURE__": upload.changes.architecture,
        "__CHANGES_FILENAME__": upload.changes.changesname,
        "__MAINTAINER_TO__": ", ".join(maintainer_to),
    }

    Subst["__FROM_ADDRESS__"] = user_email_address
    Subst["__PROD_MESSAGE__"] = prod_message
    Subst["__CC__"] = "Cc: " + cnf["Dinstall::MyEmailAddress"]

    if is_bug:
        Subst["__DEBBUGS_CC__"] = ", ".join(
            maintainer_to + [cnf["Dinstall::MyEmailAddress"]]
        )
        prod_mail_message = utils.TemplateSubst(
            Subst, cnf["Dir::Templates"] + "/process-new.bug"
        )
    else:
        prod_mail_message = utils.TemplateSubst(
            Subst, cnf["Dir::Templates"] + "/process-new.prod"
        )

    # Send the prod mail
    utils.send_mail(prod_mail_message, whitelists=whitelists)

    if is_bug:
        print("Filed bug against source package")
    else:
        print("Sent prodding message")

    answer = utils.input_or_exit("Store prod message as note? (Y/n)?").lower()
    if answer != "n":
        # The stored note is the full mail as sent, not just the edited text.
        comment = NewComment()
        comment.policy_queue = upload.policy_queue
        comment.package = upload.changes.source
        comment.version = upload.changes.version
        comment.comment = prod_mail_message
        comment.author = utils.whoami()
        comment.trainee = trainee
        session.add(comment)
        session.commit()

    return None

188 

189 

190################################################################################ 

191 

192 

def edit_note(
    upload: PolicyQueueUpload, session: "Session", trainee: bool = False
) -> Literal[0] | None:
    """Interactively write a new note for an upload and store it.

    An editor is opened for the note text.  The user is then asked whether
    they are done, want to edit again, abandon or quit; the default answer
    is Abandon for an empty note and Done otherwise.  On Done, a
    :class:`NewComment` is added to ``session`` and committed.

    :param upload: upload the note belongs to
    :param session: database session used to store the note
    :param trainee: value stored in the ``trainee`` field of the new note
    :return: ``0`` if the user chose Quit, :const:`None` otherwise (the
             note was abandoned or stored)
    """
    # Only these choices are accepted.  Testing the input as a substring of
    # the prompt would also accept characters such as " ", "," or "?" and
    # then silently treat them like "Done", storing a note by accident.
    valid_answers = ("D", "E", "A", "Q")
    newnote = ""
    answer = "E"
    while answer == "E":
        newnote = utils.call_editor(newnote).rstrip()
        print("New Note:")
        print(utils.prefix_multi_line_string(newnote, " "))
        empty_note = not newnote.strip()
        if empty_note:
            prompt = "Done, Edit, [A]bandon, Quit ?"
        else:
            prompt = "[D]one, Edit, Abandon, Quit ?"
        default_match = re_default_answer.search(prompt)
        answer = ""
        while answer not in valid_answers:
            answer = utils.input_or_exit(prompt)
            if answer == "":
                # Empty input selects the bracketed default of the prompt.
                assert default_match is not None
                answer = default_match.group(1)
            answer = answer[:1].upper()
        if answer == "A":
            return None
        elif answer == "Q":
            return 0

    comment = NewComment()
    comment.policy_queue = upload.policy_queue
    comment.package = upload.changes.source
    comment.version = upload.changes.version
    comment.comment = newnote
    comment.author = utils.whoami()
    comment.trainee = trainee
    session.add(comment)
    session.commit()

    return None

231 

232 

233############################################################################### 

234 

235 

def get_suite_version_by_source(
    source: str, session: "Session"
) -> "list[Row[tuple[str, str]]]":
    """Return a list of ``(suite_name, version)`` rows for a source package.

    :param source: name of the source package
    :param session: database session to run the query in
    :return: one row per match, joining suites to their sources
    """
    return (
        session.query(Suite.suite_name, DBSource.version)
        .join(Suite.sources)
        .filter_by(source=source)
        .all()
    )

246 

247 

def get_suite_version_by_package(
    package: str, arch_string: str, session: "Session"
) -> "list[Row[tuple[str, str]]]":
    """Return a list of ``(suite_name, version)`` rows for a binary package.

    :param package: name of the binary package
    :param arch_string: architecture to match; binaries of architecture
                        ``all`` are matched as well
    :param session: database session to run the query in
    :return: one row per match, joining suites to their binaries
    """
    accepted_archs = [arch_string, "all"]
    query = session.query(Suite.suite_name, DBBinary.version).join(Suite.binaries)
    query = query.filter_by(package=package)
    query = query.join(DBBinary.architecture)
    query = query.filter(Architecture.arch_string.in_(accepted_archs))
    return query.all()

262 )