Coverage for daklib/policy.py: 88%

195 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-04 16:18 +0000

1# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org> 

2# 

3# This program is free software; you can redistribute it and/or modify 

4# it under the terms of the GNU General Public License as published by 

5# the Free Software Foundation; either version 2 of the License, or 

6# (at your option) any later version. 

7# 

8# This program is distributed in the hope that it will be useful, 

9# but WITHOUT ANY WARRANTY; without even the implied warranty of 

10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

11# GNU General Public License for more details. 

12# 

13# You should have received a copy of the GNU General Public License along 

14# with this program; if not, write to the Free Software Foundation, Inc., 

15# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

16 

17"""module to process policy queue uploads""" 

18 

19import errno 

20import os 

21import shutil 

22from typing import TYPE_CHECKING, NotRequired, Optional, TypedDict 

23 

24import daklib.utils as utils 

25 

26from .config import Config 

27from .dbconn import ( 

28 Component, 

29 Override, 

30 OverrideType, 

31 PolicyQueueUpload, 

32 Priority, 

33 Section, 

34 Suite, 

35 get_mapped_component, 

36 get_mapped_component_name, 

37) 

38from .fstransactions import FilesystemTransaction 

39from .packagelist import PackageList 

40from .regexes import re_file_changes, re_file_safe 

41 

42if TYPE_CHECKING: 

43 from sqlalchemy.orm import Session 

44 

45 

class UploadCopy:
    """export a policy queue upload

    This class can be used in a with-statement::

       with UploadCopy(...) as copy:
          ...

    Doing so will provide a temporary copy of the upload in the directory
    given by the :attr:`directory` attribute.  The copy will be removed
    on leaving the with-block.
    """

    def __init__(self, upload: PolicyQueueUpload, group: Optional[str] = None):
        """initializer

        :param upload: upload to handle
        :param group: group handed to :func:`utils.temp_dirname` when the
                      temporary directory is created.  If set, the directory
                      is created with mode 02750 and files are copied
                      instead of symlinked.
        """

        self._directory: Optional[str] = None
        self.upload = upload
        self.group = group

    @property
    def directory(self) -> str:
        """directory holding the temporary copy

        Only available while inside the with-block.
        """
        assert self._directory is not None
        return self._directory

    def export(
        self,
        directory: str,
        mode: Optional[int] = None,
        symlink: bool = True,
        ignore_existing: bool = False,
    ) -> None:
        """export a copy of the upload

        Source files, binaries, byhand files and the .changes file are
        copied within a single :class:`FilesystemTransaction`.

        :param directory: directory to export to
        :param mode: permissions to use for the copied files
        :param symlink: use symlinks instead of copying the files
        :param ignore_existing: ignore already existing files
        """
        with FilesystemTransaction() as fs:

            def copy(src: str, dst: str) -> None:
                # An already existing target is only skipped when the
                # caller asked for it; otherwise fs.copy gets to decide.
                if ignore_existing and os.path.exists(dst):
                    return
                fs.copy(src, dst, mode=mode, symlink=symlink)

            source = self.upload.source
            queue = self.upload.policy_queue

            if source is not None:
                for dsc_file in source.srcfiles:
                    f = dsc_file.poolfile
                    copy(
                        f.fullpath,
                        os.path.join(directory, os.path.basename(f.filename)),
                    )

            for binary in self.upload.binaries:
                f = binary.poolfile
                copy(
                    f.fullpath,
                    os.path.join(directory, os.path.basename(f.filename)),
                )

            # copy byhand files (those no longer present in the queue
            # directory are silently skipped)
            for byhand in self.upload.byhand:
                src = os.path.join(queue.path, byhand.filename)
                if os.path.exists(src):
                    copy(src, os.path.join(directory, byhand.filename))

            # copy .changes
            changesname = self.upload.changes.changesname
            copy(
                os.path.join(queue.path, changesname),
                os.path.join(directory, changesname),
            )

    def __enter__(self) -> "UploadCopy":
        """create the temporary directory and export the upload into it

        :return: the :class:`UploadCopy` object itself
        """
        assert self._directory is None

        mode = 0o0700
        symlink = True
        if self.group is not None:
            mode = 0o2750
            symlink = False

        cnf = Config()
        self._directory = utils.temp_dirname(
            parent=cnf.get("Dir::TempPath"), mode=mode, group=self.group
        )
        try:
            self.export(self._directory, symlink=symlink)
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so the
            # temporary directory has to be cleaned up here to avoid
            # leaking it and to leave the object reusable.
            shutil.rmtree(self._directory, ignore_errors=True)
            self._directory = None
            raise
        return self

    def __exit__(self, *args) -> None:
        """remove the temporary copy; exceptions are not suppressed"""
        if self._directory is not None:
            shutil.rmtree(self._directory)
        self._directory = None
        return None

141 

142 

143class MissingOverride(TypedDict): 

144 package: str 

145 priority: str 

146 section: str 

147 component: str 

148 type: str 

149 included: bool 

150 valid: NotRequired[bool] 

151 

152 

class PolicyQueueUploadHandler:
    """process uploads to policy queues

    This class allows to accept or reject uploads and to get a list of missing
    overrides (for NEW processing).
    """

    def __init__(self, upload: PolicyQueueUpload, session: "Session"):
        """initializer

        :param upload: upload to process
        :param session: database session
        """
        self.upload = upload
        self.session = session

    def _resolve_overridesuite(self, suite: Suite) -> Suite:
        """return the suite whose overrides are used for `suite`

        This is `suite` itself unless it names a different override suite.
        """
        if suite.overridesuite is None:
            return suite
        return (
            self.session.query(Suite)
            .filter_by(suite_name=suite.overridesuite)
            .one()
        )

    @property
    def _overridesuite(self) -> Suite:
        """suite whose overrides apply to the upload's target suite"""
        return self._resolve_overridesuite(self.upload.target_suite)

    def _find_override(
        self, package: str, overridetype, component_name: str
    ) -> Override | None:
        """look up an override in the override suite

        :param package: package name
        :param overridetype: name of the override type (e.g. "dsc")
        :param component_name: component name; mapped via
                               :func:`get_mapped_component` before the lookup
        :return: first matching override or :const:`None`
        """
        suite = self._overridesuite
        component = get_mapped_component(component_name, self.session)
        return (
            self.session.query(Override)
            .filter_by(package=package, suite=suite)
            .join(OverrideType)
            .filter(OverrideType.overridetype == overridetype)
            .filter(Override.component == component)
            .first()
        )

    def _source_override(self, component_name: str) -> Override | None:
        """get the "dsc" override for the upload's source package, if any"""
        assert self.upload.source is not None
        return self._find_override(
            self.upload.source.source, "dsc", component_name
        )

    def _binary_override(
        self, name: str, binarytype, component_name: str
    ) -> Override | None:
        """get the override of type `binarytype` for package `name`, if any"""
        return self._find_override(name, binarytype, component_name)

    @property
    def _changes_prefix(self) -> str:
        """name of the .changes file without the ".changes" suffix"""
        changesname = self.upload.changes.changesname
        assert changesname.endswith(".changes")
        assert re_file_changes.match(changesname)
        return changesname.removesuffix(".changes")

    def accept(self) -> None:
        """mark upload as accepted

        Creates ``COMMENTS/ACCEPT.<changes prefix>`` in the policy queue
        directory.  An already existing file is left untouched.
        """
        assert len(self.missing_overrides()) == 0

        fn1 = "ACCEPT.{0}".format(self._changes_prefix)
        fn = os.path.join(self.upload.policy_queue.path, "COMMENTS", fn1)
        try:
            # O_EXCL: never overwrite an existing decision
            fh = os.open(fn, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
            with os.fdopen(fh, "wt") as f:
                f.write("OK\n")
        except FileExistsError:
            pass

    def reject(self, reason: str) -> None:
        """mark upload as rejected

        Creates ``COMMENTS/REJECT.<changes prefix>`` in the policy queue
        directory.  An already existing file is left untouched.

        :param reason: reason for the rejection
        """
        cnf = Config()

        fn1 = "REJECT.{0}".format(self._changes_prefix)
        assert re_file_safe.match(fn1)

        fn = os.path.join(self.upload.policy_queue.path, "COMMENTS", fn1)
        try:
            # O_EXCL: never overwrite an existing decision
            fh = os.open(fn, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
            with os.fdopen(fh, "wt") as f:
                f.write("NOTOK\n")
                f.write(
                    "From: {0} <{1}>\n\n".format(
                        utils.whoami(), cnf["Dinstall::MyAdminAddress"]
                    )
                )
                f.write(reason)
        except FileExistsError:
            pass

    def get_action(self) -> Optional[str]:
        """get current action

        :return: string giving the current action, one of 'ACCEPT',
                 'ACCEPTED', 'REJECT', or :const:`None` if no file for any
                 of these actions exists
        """
        changes_prefix = self._changes_prefix
        comments_dir = os.path.join(self.upload.policy_queue.path, "COMMENTS")

        for action in ("ACCEPT", "ACCEPTED", "REJECT"):
            fn1 = "{0}.{1}".format(action, changes_prefix)
            if os.path.exists(os.path.join(comments_dir, fn1)):
                return action

        return None

    def missing_overrides(
        self, hints: Optional[list[MissingOverride]] = None
    ) -> list[MissingOverride]:
        """get missing override entries for the upload

        :param hints: suggested hints for new overrides in the same format as
                      the return value
        :return: list of dicts with the following keys:

                 - package: package name
                 - priority: default priority (from upload)
                 - section: default section (from upload)
                 - component: default component (from upload)
                 - type: type of required override ('dsc', 'deb' or 'udeb')
                 - included: :const:`True` for binaries contained in the
                   upload and for the source package, :const:`False` for
                   packages only known from the source's package list

                 If a hint exists for a missing override, the hint itself is
                 returned instead and may carry additional keys.
        """
        # TODO: use Package-List field
        missing: list[MissingOverride] = []
        components: set[str] = set()

        source = self.upload.source

        if hints is None:
            hints = []
        hints_map = {(o["type"] or "", o["package"]): o for o in hints}

        def check_override(
            name: str,
            type_: str | None,
            priority: str | None,
            section: str | None,
            included: bool,
        ) -> None:
            type_ = type_ or ""
            section = section or ""
            # a section of the form "<component>/<section>" names the
            # component; everything else belongs to "main"
            component = "main"
            if "/" in section:
                component = section.split("/", 1)[0]
            override = self._binary_override(name, type_, component)
            if override is None and not any(
                o["package"] == name and o["type"] == type_ for o in missing
            ):
                hint = hints_map.get((type_, name))
                if hint is not None:
                    missing.append(hint)
                    component = hint["component"]
                else:
                    missing.append(
                        {
                            "package": name,
                            "priority": priority or "",
                            "section": section,
                            "component": component,
                            "type": type_,
                            "included": included,
                        }
                    )
            # recorded for every package, as the source component is
            # derived from the components of all packages below
            components.add(component)

        for binary in self.upload.binaries:
            binary_proxy = binary.proxy
            priority = binary_proxy.get("Priority", "optional")
            section = binary_proxy["Section"]
            check_override(
                binary.package, binary.binarytype, priority, section, included=True
            )

        if source is not None:
            source_proxy = source.proxy
            package_list = PackageList(source_proxy)
            if not package_list.fallback:
                packages = package_list.packages_for_suite(self.upload.target_suite)
                for p in packages:
                    check_override(
                        p.name, p.type, p.priority, p.section, included=False
                    )

            # see daklib.archive.source_component_from_package_list
            # which we cannot use here as we might not have a Package-List
            # field for old packages
            mapped_components = [get_mapped_component_name(c) for c in components]
            source_component_db = (
                self.session.query(Component)
                .order_by(Component.ordering)
                .filter(Component.component_name.in_(mapped_components))
                .first()
            )
            assert source_component_db is not None
            source_component = source_component_db.component_name

            override = self._source_override(source_component)
            if override is None:
                hint = hints_map.get(("dsc", source.source))
                if hint is not None:
                    missing.append(hint)
                else:
                    section = "misc"
                    if source_component != "main":
                        section = "{0}/{1}".format(source_component, section)
                    missing.append(
                        {
                            "package": source.source,
                            "priority": "optional",
                            "section": section,
                            "component": source_component,
                            "type": "dsc",
                            "included": True,
                        }
                    )

        return missing

    def add_overrides(self, new_overrides, suite: Suite) -> None:
        """add override entries to the database and commit the session

        :param new_overrides: iterable of dicts with the keys `package`,
                              `priority`, `section`, `component` and `type`
        :param suite: suite to add the overrides to; if it names an
                      override suite, that suite is used instead
        :raises Exception: if a priority, section or component is not known
        """
        suite = self._resolve_overridesuite(suite)

        for override in new_overrides:
            package = override["package"]
            priority = (
                self.session.query(Priority)
                .filter_by(priority=override["priority"])
                .first()
            )
            section = (
                self.session.query(Section)
                .filter_by(section=override["section"])
                .first()
            )
            component = get_mapped_component(override["component"], self.session)
            overridetype = (
                self.session.query(OverrideType)
                .filter_by(overridetype=override["type"])
                .one()
            )

            # Report the requested value: the lookup result is None in
            # these branches and would make the message useless.
            if priority is None:
                raise Exception(
                    "Invalid priority {0} for package {1}".format(
                        override["priority"], package
                    )
                )
            if section is None:
                raise Exception(
                    "Invalid section {0} for package {1}".format(
                        override["section"], package
                    )
                )
            if component is None:
                raise Exception(
                    "Invalid component {0} for package {1}".format(
                        override["component"], package
                    )
                )

            o = Override(
                package=package,
                suite=suite,
                component=component,
                priority=priority,
                section=section,
                overridetype=overridetype,
            )
            self.session.add(o)

        self.session.commit()