1# vim:set et sw=4: 

2 

3""" 

4Queue utility functions for dak 

5 

6@contact: Debian FTP Master <ftpmaster@debian.org> 

7@copyright: 2001 - 2006 James Troup <james@nocrew.org> 

8@copyright: 2009, 2010 Joerg Jaspert <joerg@debian.org> 

9@license: GNU General Public License version 2 or later 

10""" 

11 

12# This program is free software; you can redistribute it and/or modify 

13# it under the terms of the GNU General Public License as published by 

14# the Free Software Foundation; either version 2 of the License, or 

15# (at your option) any later version. 

16 

17# This program is distributed in the hope that it will be useful, 

18# but WITHOUT ANY WARRANTY; without even the implied warranty of 

19# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

20# GNU General Public License for more details. 

21 

22# You should have received a copy of the GNU General Public License 

23# along with this program; if not, write to the Free Software 

24# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

25 

26############################################################################### 

27 

28from . import utils 

29 

30from .regexes import * 

31from .config import Config 

32from .dbconn import * 

33 

34################################################################################ 

35 

36 

def check_valid(overrides: list[dict], session) -> bool:
    """Validate a list of requested overrides against the database.

    Every override dict gets a ``'valid'`` key set in place: it starts out
    as :const:`True` and is flipped to :const:`False` when

    - its priority is not found via a ``Priority`` query,
    - its section is not found via a ``Section`` query,
    - ``get_mapped_component`` returns ``None`` for its component,
    - it is a ``udeb`` outside a ``debian-installer`` section, or
    - it is in a ``debian-installer`` section but is neither ``dsc`` nor ``udeb``.

    :param overrides: list of overrides to check.  Each override is a dict
                      with the string-valued keys ``package``, ``priority``,
                      ``section``, ``component`` and ``type`` (one of
                      ``'dsc'``, ``'deb'`` or ``'udeb'``).
    :param session: database session used for the lookups
    :return: :const:`True` if every override is valid, :const:`False` as soon
             as at least one of them is not.
    :raises Exception: if an override has a type other than ``'dsc'``,
                       ``'deb'`` or ``'udeb'``.
    """
    every_override_ok = True
    for override in overrides:
        override['valid'] = True

        priority_row = session.query(Priority).filter_by(priority=override['priority']).first()
        if priority_row is None:
            override['valid'] = False

        section_row = session.query(Section).filter_by(section=override['section']).first()
        if section_row is None:
            override['valid'] = False

        if get_mapped_component(override['component'], session) is None:
            override['valid'] = False

        kind = override['type']
        if kind not in ('dsc', 'deb', 'udeb'):
            raise Exception('Unknown override type {0}'.format(kind))

        # Only the part after an optional "<component>/" prefix decides
        # whether this is a debian-installer section.
        in_installer_section = override['section'].split('/', 1)[-1] == 'debian-installer'
        if kind == 'udeb' and not in_installer_section:
            override['valid'] = False
        if in_installer_section and kind not in ('dsc', 'udeb'):
            override['valid'] = False

        if not override['valid']:
            every_override_ok = False
    return every_override_ok

74 

75############################################################################### 

76 

77 

def prod_maintainer(notes, upload: PolicyQueueUpload, session, trainee=False):
    """Interactively compose and send a "prod" mail for an upload.

    The comments of all `notes` are joined into an initial message which the
    user can edit.  Once the user chooses to prod, the message is filled into
    the ``process-new.prod`` template and mailed.  Afterwards the user is
    asked whether the generated mail should be stored as a ``NewComment``
    for the upload.

    :param notes: iterable of objects with a ``comment`` attribute, used to
                  seed the message
    :param upload: the upload the prod is about
    :param session: database session used to store the optional note
    :param trainee: value stored in the ``trainee`` field of the saved note
    :return: ``None`` if the user abandoned or after the prod was sent,
             ``0`` if the user chose to quit
    """
    cnf = Config()
    changes = upload.changes
    whitelists = [upload.target_suite.mail_whitelist]

    # Here we prepare an editor and get them ready to prod...
    prod_message = "\n\n=====\n\n".join([note.comment for note in notes])
    prompt = "[P]rod, Edit, Abandon, Quit ?"
    # Only the letters actually offered are acceptable.  Testing the answer
    # with prompt.find() would also accept the punctuation and blanks of the
    # prompt (e.g. "?"), which would then fall through and send the mail.
    valid_answers = ('P', 'E', 'A', 'Q')
    answer = 'E'
    while answer == 'E':
        prod_message = utils.call_editor(prod_message)
        print("Prod message:")
        print(utils.prefix_multi_line_string(prod_message, " ", include_blank_lines=True))
        answer = "XXX"
        while answer not in valid_answers:
            answer = utils.input_or_exit(prompt)
            m = re_default_answer.search(prompt)
            if answer == "":
                # Empty input selects the bracketed default of the prompt.
                answer = m.group(1)
            answer = answer[:1].upper()
    if answer == 'A':
        return
    elif answer == 'Q':
        return 0
    # Otherwise, do the prodding...
    user_email_address = utils.whoami() + " <%s>" % (
        cnf["Dinstall::MyAdminAddress"])

    changed_by = changes.changedby or changes.maintainer
    maintainer = changes.maintainer
    maintainer_to = utils.mail_addresses_for_upload(maintainer, changed_by, changes.fingerprint)

    Subst = {
        '__SOURCE__': changes.source,
        '__VERSION__': changes.version,
        '__ARCHITECTURE__': changes.architecture,
        '__CHANGES_FILENAME__': changes.changesname,
        '__MAINTAINER_TO__': ", ".join(maintainer_to),
        '__FROM_ADDRESS__': user_email_address,
        '__PROD_MESSAGE__': prod_message,
        '__CC__': "Cc: " + cnf["Dinstall::MyEmailAddress"],
    }

    prod_mail_message = utils.TemplateSubst(
        Subst, cnf["Dir::Templates"] + "/process-new.prod")

    # Send the prod mail
    utils.send_mail(prod_mail_message, whitelists=whitelists)

    print("Sent prodding message")

    answer = utils.input_or_exit("Store prod message as note? (Y/n)?").lower()
    if answer != "n":
        # Note that the complete generated mail is stored, not only the
        # text entered in the editor.
        comment = NewComment()
        comment.policy_queue = upload.policy_queue
        comment.package = changes.source
        comment.version = changes.version
        comment.comment = prod_mail_message
        comment.author = utils.whoami()
        comment.trainee = trainee
        session.add(comment)
        session.commit()

141 

142################################################################################ 

143 

144 

def edit_note(note, upload, session, trainee=False):
    """Interactively write a new note for an upload and store it.

    An editor is opened on an empty text.  When the user is done, the text
    is saved as a ``NewComment`` for the upload's policy queue, source
    package and version.

    :param note: not used by this function
    :param upload: the upload the note belongs to
    :param session: database session used to add and commit the note
    :param trainee: value stored in the ``trainee`` field of the note
    :return: ``None`` if the user abandoned or after the note was stored,
             ``0`` if the user chose to quit
    """
    # Only the letters actually offered are acceptable.  Testing the answer
    # with prompt.find() would also accept the punctuation and blanks of the
    # prompt (e.g. "?"), which would then fall through and store the note.
    valid_answers = ('D', 'E', 'A', 'Q')
    newnote = ""
    answer = 'E'
    while answer == 'E':
        newnote = utils.call_editor(newnote).rstrip()
        print("New Note:")
        print(utils.prefix_multi_line_string(newnote, " "))
        empty_note = not newnote.strip()
        # Default to abandoning when nothing was written.
        if empty_note:
            prompt = "Done, Edit, [A]bandon, Quit ?"
        else:
            prompt = "[D]one, Edit, Abandon, Quit ?"
        answer = "XXX"
        while answer not in valid_answers:
            answer = utils.input_or_exit(prompt)
            m = re_default_answer.search(prompt)
            if answer == "":
                # Empty input selects the bracketed default of the prompt.
                answer = m.group(1)
            answer = answer[:1].upper()
    if answer == 'A':
        return
    elif answer == 'Q':
        return 0

    comment = NewComment()
    comment.policy_queue = upload.policy_queue
    comment.package = upload.changes.source
    comment.version = upload.changes.version
    comment.comment = newnote
    comment.author = utils.whoami()
    comment.trainee = trainee
    session.add(comment)
    session.commit()

178 

179############################################################################### 

180 

181 

def get_suite_version_by_source(source: str, session) -> list[tuple[str, str]]:
    """Return a list of ``(suite_name, version)`` tuples for a source package.

    :param source: name of the source package
    :param session: database session used for the query
    """
    query = session.query(Suite.suite_name, DBSource.version)
    query = query.join(Suite.sources)
    query = query.filter_by(source=source)
    return query.all()

187 

188 

def get_suite_version_by_package(package: str, arch_string: str, session) -> list[tuple[str, str]]:
    """Return a list of ``(suite_name, version)`` tuples for a binary package.

    Binaries are matched if their architecture is either `arch_string` or
    ``'all'``.

    :param package: name of the binary package
    :param arch_string: architecture to look for, in addition to ``'all'``
    :param session: database session used for the query
    """
    wanted_architectures = [arch_string, 'all']
    query = session.query(Suite.suite_name, DBBinary.version)
    query = query.join(Suite.binaries).filter_by(package=package)
    query = query.join(DBBinary.architecture)
    query = query.filter(Architecture.arch_string.in_(wanted_architectures))
    return query.all()