#! /usr/bin/env python3
# vim:set et ts=4 sw=4:

""" Handles packages from policy queues

@contact: Debian FTP Master <ftpmaster@debian.org>
@copyright: 2001, 2002, 2003, 2004, 2005, 2006 James Troup <james@nocrew.org>
@copyright: 2009 Joerg Jaspert <joerg@debian.org>
@copyright: 2009 Frank Lichtenheld <djpig@debian.org>
@copyright: 2009 Mark Hymers <mhy@debian.org>
@license: GNU General Public License version 2 or later
"""
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

################################################################################

# <mhy> So how do we handle that at the moment?
# <stew> Probably incorrectly.

################################################################################

import os
import datetime
import functools
import re
import sys
import traceback
import apt_pkg
from sqlalchemy.orm.exc import NoResultFound
import sqlalchemy.sql as sql
from collections.abc import Callable, Iterable
from typing import NoReturn

from daklib.dbconn import *
from daklib import daklog
from daklib import utils
from daklib.externalsignature import check_upload_for_external_signature_request
from daklib.config import Config
from daklib.archive import ArchiveTransaction, source_component_from_package_list
from daklib.urgencylog import UrgencyLog
from daklib.packagelist import PackageList

import daklib.announce
import daklib.upload
import daklib.utils

# Globals
Options = None
Logger = None

################################################################################

ProcessingCallable = Callable[[PolicyQueueUpload, PolicyQueue, str, ArchiveTransaction], None]


def do_comments(
    dir: str,
    srcqueue: PolicyQueue,
    opref: str,
    npref: str,
    line: str,
    fn: ProcessingCallable,
    transaction: ArchiveTransaction
) -> None:
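    """Process comment files from a policy queue's COMMENTS directory.

    Every file in `dir` whose name starts with `opref` and whose first line
    equals `line` is processed: `fn` is called for each matching upload in
    `srcqueue`, with the remaining lines of the file passed as the reason,
    and the comment file is renamed from the `opref` to the `npref` prefix.

    The name part after the prefix selects the uploads: a name such as
    "ACCEPT.hello_1.0-1" matches every .changes file starting with
    "hello_1.0-1_", while a name that also contains the architecture part
    matches exactly that .changes file.
    """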
    session = transaction.session
    actions: list[tuple[PolicyQueueUpload, str]] = []
    for comm in [x for x in os.listdir(dir) if x.startswith(opref)]:
        with open(os.path.join(dir, comm)) as fd:
            lines = fd.readlines()
        if len(lines) == 0 or lines[0] != line + "\n":
            continue

        # If the ACCEPT includes a _<arch> we only accept that .changes.
        # Otherwise we accept all .changes that start with the given prefix
        changes_prefix = comm[len(opref):]
        if changes_prefix.count('_') < 2:
            changes_prefix = changes_prefix + '_'
        else:
            changes_prefix = changes_prefix + '.changes'

        # We need to escape "_" as we use it with the LIKE operator (via the
        # SQLA startswith) later.
        changes_prefix = changes_prefix.replace("_", r"\_")

        uploads = session.query(PolicyQueueUpload).filter_by(policy_queue=srcqueue) \
            .join(PolicyQueueUpload.changes).filter(DBChange.changesname.startswith(changes_prefix)) \
            .order_by(PolicyQueueUpload.source_id)
        reason = "".join(lines[1:])
        actions.extend((u, reason) for u in uploads)

        if opref != npref:
            newcomm = npref + comm[len(opref):]
            newcomm = utils.find_next_free(os.path.join(dir, newcomm))
            transaction.fs.move(os.path.join(dir, comm), newcomm)

    actions.sort()

    for u, reason in actions:
        print("Processing changes file: {0}".format(u.changes.changesname))
        fn(u, srcqueue, reason, transaction)

################################################################################


def try_or_reject(function: ProcessingCallable) -> ProcessingCallable:
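    """Decorate a processing callable so that failures reject the upload.

    If `function` raises an exception, the transaction is rolled back and the
    upload is rejected with the traceback added to the comments; if that
    rejection fails as well, a second attempt is made without notification.
    Unless --no-action was given, the transaction is committed at the end.
    """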
    @functools.wraps(function)
    def wrapper(upload: PolicyQueueUpload, srcqueue: PolicyQueue, comments: str, transaction: ArchiveTransaction) -> None:
        try:
            function(upload, srcqueue, comments, transaction)
        except Exception:
            comments = 'An exception was raised while processing the package:\n{0}\nOriginal comments:\n{1}'.format(traceback.format_exc(), comments)
            try:
                transaction.rollback()
                real_comment_reject(upload, srcqueue, comments, transaction)
            except Exception:
                comments = 'In addition an exception was raised while trying to reject the upload:\n{0}\nOriginal rejection:\n{1}'.format(traceback.format_exc(), comments)
                transaction.rollback()
                real_comment_reject(upload, srcqueue, comments, transaction, notify=False)
        if not Options['No-Action']:
            transaction.commit()
        else:
            transaction.rollback()
    return wrapper

################################################################################


@try_or_reject
def comment_accept(
    upload: PolicyQueueUpload,
    srcqueue: PolicyQueue,
    comments: str,
    transaction: ArchiveTransaction
) -> None:
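    """Accept the upload.

    Copies the source and binary packages into the target suite, or into the
    next policy queue if the target suite has one, including debug suites and
    build queues where applicable.  For a direct accept it also records the
    urgency, moves the queue files to the done directory and sends the accept
    announcement.  Finally the upload is removed from the policy queue.
    """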
    for byhand in upload.byhand:
        path = os.path.join(srcqueue.path, byhand.filename)
        if os.path.exists(path):
            raise Exception('E: cannot ACCEPT upload with unprocessed byhand file {0}'.format(byhand.filename))

    cnf = Config()

    fs = transaction.fs
    session = transaction.session
    changesname = upload.changes.changesname
    allow_tainted = srcqueue.suite.archive.tainted

    # We need overrides to get the target component
    overridesuite = upload.target_suite
    if overridesuite.overridesuite is not None:
        overridesuite = session.query(Suite).filter_by(suite_name=overridesuite.overridesuite).one()

    def binary_component_func(db_binary: DBBinary) -> Component:
        section = db_binary.proxy['Section']
        component_name = 'main'
        if section.find('/') != -1:
            component_name = section.split('/', 1)[0]
        return get_mapped_component(component_name, session=session)

    def is_debug_binary(db_binary: DBBinary) -> bool:
        return daklib.utils.is_in_debug_section(db_binary.proxy)

    def has_debug_binaries(upload: PolicyQueueUpload) -> bool:
        return any(is_debug_binary(x) for x in upload.binaries)

    def source_component_func(db_source: DBSource) -> Component:
        package_list = PackageList(db_source.proxy)
        component = source_component_from_package_list(package_list, upload.target_suite)
        if component is not None:
            return get_mapped_component(component.component_name, session=session)

        # Fallback for packages without Package-List field
        query = session.query(Override).filter_by(suite=overridesuite, package=db_source.source) \
            .join(OverrideType).filter(OverrideType.overridetype == 'dsc') \
            .join(Component)
        return query.one().component

    policy_queue = upload.target_suite.policy_queue
    if policy_queue == srcqueue:
        policy_queue = None

    all_target_suites = [upload.target_suite if policy_queue is None else policy_queue.suite]
    if policy_queue is None or policy_queue.send_to_build_queues:
        all_target_suites.extend([q.suite for q in upload.target_suite.copy_queues])

    throw_away_binaries = False
    if upload.source is not None:
        source_component = source_component_func(upload.source)
        if upload.target_suite.suite_name in cnf.value_list('Dinstall::ThrowAwayNewBinarySuites') and \
                source_component.component_name in cnf.value_list('Dinstall::ThrowAwayNewBinaryComponents'):
            throw_away_binaries = True

    for suite in all_target_suites:
        debug_suite = suite.debug_suite

        if upload.source is not None:
            # If we have Source in this upload, include it in the target
            # suite as well.
            transaction.copy_source(
                upload.source,
                suite,
                source_component,
                allow_tainted=allow_tainted,
            )

            if not throw_away_binaries:
                if debug_suite is not None and has_debug_binaries(upload):
                    # If we're handling a debug package, we also need to
                    # include the source in the debug suite.
                    transaction.copy_source(
                        upload.source,
                        debug_suite,
                        source_component_func(upload.source),
                        allow_tainted=allow_tainted,
                    )

        if not throw_away_binaries:
            for db_binary in upload.binaries:
                # Now, let's work out where to copy this guy to -- if it's
                # a debug binary, and the suite has a debug suite, let's go
                # ahead and target the debug suite rather than the stock
                # suite.
                copy_to_suite = suite
                if debug_suite is not None and is_debug_binary(db_binary):
                    copy_to_suite = debug_suite

                # Build queues and debug suites may miss the source package
                # if this is a binary-only upload.
                if copy_to_suite != upload.target_suite:
                    transaction.copy_source(
                        db_binary.source,
                        copy_to_suite,
                        source_component_func(db_binary.source),
                        allow_tainted=allow_tainted,
                    )

                transaction.copy_binary(
                    db_binary,
                    copy_to_suite,
                    binary_component_func(db_binary),
                    allow_tainted=allow_tainted,
                    extra_archives=[upload.target_suite.archive],
                )

                check_upload_for_external_signature_request(session, suite, copy_to_suite, db_binary)

        suite.update_last_changed()

    # Copy .changes if needed
    if policy_queue is None and upload.target_suite.copychanges:
        src = os.path.join(upload.policy_queue.path, upload.changes.changesname)
        dst = os.path.join(upload.target_suite.path, upload.changes.changesname)
        fs.copy(src, dst, mode=upload.target_suite.archive.mode)

    # List of files in the queue directory
    queue_files = [changesname]
    chg = daklib.upload.Changes(upload.policy_queue.path, changesname, keyrings=[], require_signature=False)
    queue_files.extend(f.filename for f in chg.buildinfo_files)

    # TODO: similar code exists in archive.py's `ArchiveUpload._install_policy`
    if policy_queue is not None:
        # register upload in policy queue
        new_upload = PolicyQueueUpload()
        new_upload.policy_queue = policy_queue
        new_upload.target_suite = upload.target_suite
        new_upload.changes = upload.changes
        new_upload.source = upload.source
        new_upload.binaries = upload.binaries
        session.add(new_upload)
        session.flush()

        # copy .changes & similar to policy queue
        for fn in queue_files:
            src = os.path.join(upload.policy_queue.path, fn)
            dst = os.path.join(policy_queue.path, fn)
            transaction.fs.copy(src, dst, mode=policy_queue.change_perms)

    # Copy upload to Process-Policy::CopyDir
    # Used on security.d.o to sync accepted packages to ftp-master, but this
    # should eventually be replaced by something else.
    copydir = cnf.get('Process-Policy::CopyDir') or None
    if policy_queue is None and copydir is not None:
        mode = upload.target_suite.archive.mode
        if upload.source is not None:
            for f in [df.poolfile for df in upload.source.srcfiles]:
                dst = os.path.join(copydir, f.basename)
                if not os.path.exists(dst):
                    fs.copy(f.fullpath, dst, mode=mode)

        for db_binary in upload.binaries:
            f = db_binary.poolfile
            dst = os.path.join(copydir, f.basename)
            if not os.path.exists(dst):
                fs.copy(f.fullpath, dst, mode=mode)

        for fn in queue_files:
            src = os.path.join(upload.policy_queue.path, fn)
            dst = os.path.join(copydir, fn)
            # We check for `src` to exist as old uploads in policy queues
            # might still miss the `.buildinfo` files.
            if os.path.exists(src) and not os.path.exists(dst):
                fs.copy(src, dst, mode=mode)

    if policy_queue is None:
        utils.process_buildinfos(upload.policy_queue.path, chg.buildinfo_files,
                                 fs, Logger)

    if policy_queue is None and upload.source is not None and not Options['No-Action']:
        urgency = upload.changes.urgency
        # As per policy 5.6.17, the urgency can be followed by a space and a
        # comment. Extract only the urgency from the string.
        if ' ' in urgency:
            urgency, comment = urgency.split(' ', 1)
        if urgency not in cnf.value_list('Urgency::Valid'):
            urgency = cnf['Urgency::Default']
        UrgencyLog().log(upload.source.source, upload.source.version, urgency)

    if policy_queue is None:
        print(" ACCEPT")
    else:
        print(" ACCEPT-TO-QUEUE")
    if not Options['No-Action']:
        Logger.log(["Policy Queue ACCEPT", srcqueue.queue_name, changesname])

    if policy_queue is None:
        pu = get_processed_upload(upload)
        daklib.announce.announce_accept(pu)

    # TODO: code duplication. Similar code is in process-upload.
    # Move .changes to done
    now = datetime.datetime.now()
    donedir = os.path.join(cnf['Dir::Done'], now.strftime('%Y/%m/%d'))
    if policy_queue is None:
        for fn in queue_files:
            src = os.path.join(upload.policy_queue.path, fn)
            if os.path.exists(src):
                dst = os.path.join(donedir, fn)
                dst = utils.find_next_free(dst)
                fs.copy(src, dst, mode=0o644)

    if throw_away_binaries and upload.target_suite.archive.use_morgue:
        morguesubdir = cnf.get("New::MorgueSubDir", 'new')

        utils.move_to_morgue(morguesubdir,
                             [db_binary.poolfile.fullpath for db_binary in upload.binaries],
                             fs, Logger)

    remove_upload(upload, transaction)


################################################################################


@try_or_reject
def comment_reject(*args) -> None:
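    """Reject the upload based on a manual REJECT comment."""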
    real_comment_reject(*args, manual=True)


def real_comment_reject(
    upload: PolicyQueueUpload,
    srcqueue: PolicyQueue,
    comments: str,
    transaction: ArchiveTransaction,
    notify=True,
    manual=False
) -> None:
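    """Reject the upload.

    Copies the upload's files and the rejection reason to the reject
    directory, sends a rejection mail unless `notify` is False, and removes
    the upload together with its .changes entry from the policy queue.
    """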
    cnf = Config()

    fs = transaction.fs
    session = transaction.session
    changesname = upload.changes.changesname
    queuedir = upload.policy_queue.path
    rejectdir = cnf['Dir::Reject']

    ### Copy files to reject/

    poolfiles = [b.poolfile for b in upload.binaries]
    if upload.source is not None:
        poolfiles.extend([df.poolfile for df in upload.source.srcfiles])
    # Not beautiful...
    files = [af.path for af in session.query(ArchiveFile)
             .filter_by(archive=upload.policy_queue.suite.archive)
             .join(ArchiveFile.file)
             .filter(PoolFile.file_id.in_([f.file_id for f in poolfiles]))]
    for byhand in upload.byhand:
        path = os.path.join(queuedir, byhand.filename)
        if os.path.exists(path):
            files.append(path)
    chg = daklib.upload.Changes(queuedir, changesname, keyrings=[], require_signature=False)
    for f in chg.buildinfo_files:
        path = os.path.join(queuedir, f.filename)
        if os.path.exists(path):
            files.append(path)
    files.append(os.path.join(queuedir, changesname))

    for fn in files:
        dst = utils.find_next_free(os.path.join(rejectdir, os.path.basename(fn)))
        fs.copy(fn, dst, link=True)

    ### Write reason

    dst = utils.find_next_free(os.path.join(rejectdir, '{0}.reason'.format(changesname)))
    fh = fs.create(dst)
    fh.write(comments)
    fh.close()

    ### Send mail notification

    if notify:
        rejected_by = None
        reason = comments

        # Try to use From: from comment file if there is one.
        # This is not very elegant...
        match = re.match(r"\AFrom: ([^\n]+)\n\n", comments)
        if match:
            rejected_by = match.group(1)
            reason = '\n'.join(comments.splitlines()[2:])

        pu = get_processed_upload(upload)
        daklib.announce.announce_reject(pu, reason, rejected_by)

    print(" REJECT")
    if not Options["No-Action"]:
        Logger.log(["Policy Queue REJECT", srcqueue.queue_name, upload.changes.changesname])

    changes = upload.changes
    remove_upload(upload, transaction)
    session.delete(changes)


################################################################################


def remove_upload(upload: PolicyQueueUpload, transaction: ArchiveTransaction) -> None:
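    """Remove the upload from its policy queue.

    Deletes the byhand, .changes and .buildinfo files from the queue
    directory together with the corresponding database entries.
    """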
    fs = transaction.fs
    session = transaction.session
    changes = upload.changes

    # Remove byhand and changes files. Binary and source packages will be
    # removed from {bin,src}_associations and eventually removed by clean-suites automatically.
    queuedir = upload.policy_queue.path
    for byhand in upload.byhand:
        path = os.path.join(queuedir, byhand.filename)
        if os.path.exists(path):
            fs.unlink(path)
        session.delete(byhand)

    chg = daklib.upload.Changes(queuedir, upload.changes.changesname, keyrings=[], require_signature=False)
    queue_files = [upload.changes.changesname]
    queue_files.extend(f.filename for f in chg.buildinfo_files)
    for fn in queue_files:
        # We check for `path` to exist as old uploads in policy queues
        # might still miss the `.buildinfo` files.
        path = os.path.join(queuedir, fn)
        if os.path.exists(path):
            fs.unlink(path)

    session.delete(upload)
    session.flush()


################################################################################


def get_processed_upload(upload: PolicyQueueUpload) -> daklib.announce.ProcessedUpload:
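    """Collect the data daklib.announce needs about this upload."""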
    pu = daklib.announce.ProcessedUpload()

    pu.maintainer = upload.changes.maintainer
    pu.changed_by = upload.changes.changedby
    pu.fingerprint = upload.changes.fingerprint

    pu.suites = [upload.target_suite]
    pu.from_policy_suites = [upload.target_suite]

    changes_path = os.path.join(upload.policy_queue.path, upload.changes.changesname)
    with open(changes_path, 'r') as fd:
        pu.changes = fd.read()
    pu.changes_filename = upload.changes.changesname
    pu.sourceful = upload.source is not None
    pu.source = upload.changes.source
    pu.version = upload.changes.version
    pu.architecture = upload.changes.architecture
    pu.bugs = upload.changes.closes

    pu.program = "process-policy"

    return pu


################################################################################


def remove_unreferenced_binaries(policy_queue: PolicyQueue, transaction: ArchiveTransaction) -> None:
    """Remove binaries that are no longer referenced by an upload"""
    session = transaction.session
    suite = policy_queue.suite

    query = sql.text("""
        SELECT b.*
          FROM binaries b
          JOIN bin_associations ba ON b.id = ba.bin
         WHERE ba.suite = :suite_id
           AND NOT EXISTS (SELECT 1 FROM policy_queue_upload_binaries_map pqubm
                                    JOIN policy_queue_upload pqu ON pqubm.policy_queue_upload_id = pqu.id
                                   WHERE pqu.policy_queue_id = :policy_queue_id
                                     AND pqubm.binary_id = b.id)""")
    binaries = session.query(DBBinary).from_statement(query) \
        .params({'suite_id': policy_queue.suite_id, 'policy_queue_id': policy_queue.policy_queue_id})

    for binary in binaries:
        Logger.log(["removed binary from policy queue", policy_queue.queue_name, binary.package, binary.version])
        transaction.remove_binary(binary, suite)


def remove_unreferenced_sources(policy_queue: PolicyQueue, transaction: ArchiveTransaction) -> None:
    """Remove sources that are no longer referenced by an upload or a binary"""
    session = transaction.session
    suite = policy_queue.suite

    query = sql.text("""
        SELECT s.*
          FROM source s
          JOIN src_associations sa ON s.id = sa.source
         WHERE sa.suite = :suite_id
           AND NOT EXISTS (SELECT 1 FROM policy_queue_upload pqu
                                   WHERE pqu.policy_queue_id = :policy_queue_id
                                     AND pqu.source_id = s.id)
           AND NOT EXISTS (SELECT 1 FROM binaries b
                                    JOIN bin_associations ba ON b.id = ba.bin
                                   WHERE b.source = s.id
                                     AND ba.suite = :suite_id)""")
    sources = session.query(DBSource).from_statement(query) \
        .params({'suite_id': policy_queue.suite_id, 'policy_queue_id': policy_queue.policy_queue_id})

    for source in sources:
        Logger.log(["removed source from policy queue", policy_queue.queue_name, source.source, source.version])
        transaction.remove_source(source, suite)


################################################################################


def usage(status=0) -> NoReturn:
    print("""Usage: dak process-policy QUEUE""")
    sys.exit(status)


################################################################################


def main():
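    """Entry point for dak process-policy.

    Processes the REJECT/ACCEPT comment files in the COMMENTS directory of
    the given policy queue and removes packages that are no longer referenced
    by any upload in that queue.
    """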
    global Options, Logger

    cnf = Config()
    session = DBConn().session()

    Arguments = [('h', "help", "Process-Policy::Options::Help"),
                 ('n', "no-action", "Process-Policy::Options::No-Action")]

    for i in ["help", "no-action"]:
        key = "Process-Policy::Options::%s" % i
        if key not in cnf:
            cnf[key] = ""

    queue_name = apt_pkg.parse_commandline(cnf.Cnf, Arguments, sys.argv)

    Options = cnf.subtree("Process-Policy::Options")
    if Options["Help"]:
        usage()

    if len(queue_name) != 1:
        print("E: Specify exactly one policy queue")
        sys.exit(1)

    queue_name = queue_name[0]

    Logger = daklog.Logger("process-policy")
    if not Options["No-Action"]:
        urgencylog = UrgencyLog()

    with ArchiveTransaction() as transaction:
        session = transaction.session
        try:
            pq = session.query(PolicyQueue).filter_by(queue_name=queue_name).one()
        except NoResultFound:
            print("E: Cannot find policy queue %s" % queue_name)
            sys.exit(1)

        commentsdir = os.path.join(pq.path, 'COMMENTS')
        # The comments stuff relies on being in the right directory
        os.chdir(pq.path)

        do_comments(commentsdir, pq, "REJECT.", "REJECTED.", "NOTOK", comment_reject, transaction)
        do_comments(commentsdir, pq, "ACCEPT.", "ACCEPTED.", "OK", comment_accept, transaction)
        do_comments(commentsdir, pq, "ACCEPTED.", "ACCEPTED.", "OK", comment_accept, transaction)

        remove_unreferenced_binaries(pq, transaction)
        remove_unreferenced_sources(pq, transaction)

    if not Options['No-Action']:
        urgencylog.close()


################################################################################

if __name__ == '__main__':
    main()