1# vim:set et sw=4: 

2 

3""" 

4Queue utility functions for dak 

5 

6@contact: Debian FTP Master <ftpmaster@debian.org> 

7@copyright: 2001 - 2006 James Troup <james@nocrew.org> 

8@copyright: 2009, 2010 Joerg Jaspert <joerg@debian.org> 

9@license: GNU General Public License version 2 or later 

10""" 

11 

12# This program is free software; you can redistribute it and/or modify 

13# it under the terms of the GNU General Public License as published by 

14# the Free Software Foundation; either version 2 of the License, or 

15# (at your option) any later version. 

16 

17# This program is distributed in the hope that it will be useful, 

18# but WITHOUT ANY WARRANTY; without even the implied warranty of 

19# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

20# GNU General Public License for more details. 

21 

22# You should have received a copy of the GNU General Public License 

23# along with this program; if not, write to the Free Software 

24# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

25 

26############################################################################### 

27 

28from . import utils 

29from .config import Config 

30from .dbconn import ( 

31 Architecture, 

32 DBBinary, 

33 DBSource, 

34 NewComment, 

35 PolicyQueueUpload, 

36 Priority, 

37 Section, 

38 Suite, 

39 get_mapped_component, 

40) 

41from .regexes import re_default_answer 

42 

43################################################################################ 

44 

45 

46def check_valid(overrides: list[dict], session) -> bool: 

47 """Check if section and priority for new overrides exist in database. 

48 

49 Additionally does sanity checks: 

50 - debian-installer packages have to be udeb (or source) 

51 - non debian-installer packages cannot be udeb 

52 

53 :param overrides: list of overrides to check. The overrides need 

54 to be given in form of a dict with the following keys: 

55 

56 - package: package name 

57 - priority 

58 - section 

59 - component 

60 - type: type of requested override ('dsc', 'deb' or 'udeb') 

61 

62 All values are strings. 

63 :return: :const:`True` if all overrides are valid, :const:`False` if there is any 

64 invalid override. 

65 """ 

66 all_valid = True 

67 for o in overrides: 

68 o["valid"] = True 

69 if session.query(Priority).filter_by(priority=o["priority"]).first() is None: 69 ↛ 70line 69 didn't jump to line 70, because the condition on line 69 was never true

70 o["valid"] = False 

71 if session.query(Section).filter_by(section=o["section"]).first() is None: 71 ↛ 72line 71 didn't jump to line 72, because the condition on line 71 was never true

72 o["valid"] = False 

73 if get_mapped_component(o["component"], session) is None: 73 ↛ 74line 73 didn't jump to line 74, because the condition on line 73 was never true

74 o["valid"] = False 

75 if o["type"] not in ("dsc", "deb", "udeb"): 75 ↛ 76line 75 didn't jump to line 76, because the condition on line 75 was never true

76 raise Exception("Unknown override type {0}".format(o["type"])) 

77 if o["type"] == "udeb" and o["section"].split("/", 1)[-1] != "debian-installer": 77 ↛ 78line 77 didn't jump to line 78, because the condition on line 77 was never true

78 o["valid"] = False 

79 if o["section"].split("/", 1)[-1] == "debian-installer" and o["type"] not in ( 79 ↛ 83line 79 didn't jump to line 83, because the condition on line 79 was never true

80 "dsc", 

81 "udeb", 

82 ): 

83 o["valid"] = False 

84 all_valid = all_valid and o["valid"] 

85 return all_valid 

86 

87 

88############################################################################### 

89 

90 

91def prod_maintainer(notes, upload: PolicyQueueUpload, session, trainee=False): 

92 cnf = Config() 

93 changes = upload.changes 

94 whitelists = [upload.target_suite.mail_whitelist] 

95 

96 # Here we prepare an editor and get them ready to prod... 

97 prod_message = "\n\n=====\n\n".join([note.comment for note in notes]) 

98 answer = "E" 

99 while answer == "E": 

100 prod_message = utils.call_editor(prod_message) 

101 print("Prod message:") 

102 print( 

103 utils.prefix_multi_line_string(prod_message, " ", include_blank_lines=True) 

104 ) 

105 prompt = "[P]rod, Bug, Edit, Abandon, Quit ?" 

106 answer = "XXX" 

107 while prompt.find(answer) == -1: 

108 answer = utils.input_or_exit(prompt) 

109 m = re_default_answer.search(prompt) 

110 if answer == "": 

111 answer = m.group(1) 

112 answer = answer[:1].upper() 

113 if answer == "A": 

114 return 

115 elif answer == "Q": 

116 return 0 

117 # Otherwise, do the proding... 

118 user_email_address = utils.whoami() + " <%s>" % (cnf["Dinstall::MyAdminAddress"]) 

119 

120 is_bug = answer == "B" 

121 

122 changed_by = changes.changedby or changes.maintainer 

123 maintainer = changes.maintainer 

124 maintainer_to = utils.mail_addresses_for_upload( 

125 maintainer, changed_by, changes.fingerprint 

126 ) 

127 

128 Subst = { 

129 "__SOURCE__": upload.changes.source, 

130 "__VERSION__": upload.changes.version, 

131 "__ARCHITECTURE__": upload.changes.architecture, 

132 "__CHANGES_FILENAME__": upload.changes.changesname, 

133 "__MAINTAINER_TO__": ", ".join(maintainer_to), 

134 } 

135 

136 Subst["__FROM_ADDRESS__"] = user_email_address 

137 Subst["__PROD_MESSAGE__"] = prod_message 

138 Subst["__CC__"] = "Cc: " + cnf["Dinstall::MyEmailAddress"] 

139 

140 if is_bug: 

141 Subst["__DEBBUGS_CC__"] = ", ".join( 

142 maintainer_to + [cnf["Dinstall::MyEmailAddress"]] 

143 ) 

144 prod_mail_message = utils.TemplateSubst( 

145 Subst, cnf["Dir::Templates"] + "/process-new.bug" 

146 ) 

147 else: 

148 prod_mail_message = utils.TemplateSubst( 

149 Subst, cnf["Dir::Templates"] + "/process-new.prod" 

150 ) 

151 

152 # Send the prod mail 

153 utils.send_mail(prod_mail_message, whitelists=whitelists) 

154 

155 if is_bug: 

156 print("Filed bug against source package") 

157 else: 

158 print("Sent prodding message") 

159 

160 answer = utils.input_or_exit("Store prod message as note? (Y/n)?").lower() 

161 if answer != "n": 

162 comment = NewComment() 

163 comment.policy_queue = upload.policy_queue 

164 comment.package = upload.changes.source 

165 comment.version = upload.changes.version 

166 comment.comment = prod_mail_message 

167 comment.author = utils.whoami() 

168 comment.trainee = trainee 

169 session.add(comment) 

170 session.commit() 

171 

172 

173################################################################################ 

174 

175 

176def edit_note(note, upload, session, trainee=False): 

177 newnote = "" 

178 answer = "E" 

179 while answer == "E": 

180 newnote = utils.call_editor(newnote).rstrip() 

181 print("New Note:") 

182 print(utils.prefix_multi_line_string(newnote, " ")) 

183 empty_note = not newnote.strip() 

184 if empty_note: 

185 prompt = "Done, Edit, [A]bandon, Quit ?" 

186 else: 

187 prompt = "[D]one, Edit, Abandon, Quit ?" 

188 answer = "XXX" 

189 while prompt.find(answer) == -1: 

190 answer = utils.input_or_exit(prompt) 

191 m = re_default_answer.search(prompt) 

192 if answer == "": 

193 answer = m.group(1) 

194 answer = answer[:1].upper() 

195 if answer == "A": 

196 return 

197 elif answer == "Q": 

198 return 0 

199 

200 comment = NewComment() 

201 comment.policy_queue = upload.policy_queue 

202 comment.package = upload.changes.source 

203 comment.version = upload.changes.version 

204 comment.comment = newnote 

205 comment.author = utils.whoami() 

206 comment.trainee = trainee 

207 session.add(comment) 

208 session.commit() 

209 

210 

211############################################################################### 

212 

213 

214def get_suite_version_by_source(source: str, session) -> list[tuple[str, str]]: 

215 "returns a list of tuples (suite_name, version) for source package" 

216 q = ( 

217 session.query(Suite.suite_name, DBSource.version) 

218 .join(Suite.sources) 

219 .filter_by(source=source) 

220 ) 

221 return q.all() 

222 

223 

224def get_suite_version_by_package( 

225 package: str, arch_string: str, session 

226) -> list[tuple[str, str]]: 

227 """ 

228 returns a list of tuples (suite_name, version) for binary package and 

229 arch_string 

230 """ 

231 return ( 

232 session.query(Suite.suite_name, DBBinary.version) 

233 .join(Suite.binaries) 

234 .filter_by(package=package) 

235 .join(DBBinary.architecture) 

236 .filter(Architecture.arch_string.in_([arch_string, "all"])) 

237 .all() 

238 )