Coverage for dak/init_dirs.py: 78%
90 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-08-26 22:11 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-08-26 22:11 +0000
1#! /usr/bin/env python3
3"""Initial setup of an archive."""
4# Copyright (C) 2002, 2004, 2006 James Troup <james@nocrew.org>
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14# GNU General Public License for more details.
16# You should have received a copy of the GNU General Public License
17# along with this program; if not, write to the Free Software
18# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20################################################################################
22import os
23import sys
25import apt_pkg
27from daklib import utils
28from daklib.dbconn import Component, DBConn, Keyring, PolicyQueue, Suite
30################################################################################
# Global dak configuration; remains None until main() assigns the result of
# utils.get_conf() to it.
Cnf = None
34################################################################################
def usage(exit_code=0):
    """Show the command-line help text, then terminate the process.

    'exit_code' is passed straight to sys.exit(); it defaults to 0.
    """
    help_text = """Usage: dak init-dirs
Creates directories for an archive based on dak.conf configuration file.

 -h, --help show this help and exit."""
    print(help_text)
    sys.exit(exit_code)
49################################################################################
def do_dir(target, config_name):
    """Ensure that 'target' is a directory, creating it when it is missing.

    When 'target' exists but is not a directory, the problem is reported
    through utils.fubar(); 'config_name' names the setting the path came
    from and only appears in that message.
    """
    if not os.path.exists(target):
        print("Creating {} ...".format(target))
        os.makedirs(target)
        return

    if os.path.isdir(target):
        # Already in place, nothing to do.
        return

    utils.fubar("%s (%s) is not a directory." % (target, config_name))
def process_file(config, config_name):
    """Create the parent directory of the file named by a config entry.

    'config' is the configuration mapping and 'config_name' the key whose
    value is a filename.  Nothing happens when the key is not set, or when
    its value has no directory component.
    """
    if config_name not in config:
        return

    target = os.path.dirname(config[config_name])

    # A bare filename has an empty dirname: the file lives in the current
    # directory, which already exists, and os.makedirs("") would raise
    # FileNotFoundError.
    if not target:
        return

    do_dir(target, config_name)
def process_tree(config, tree):
    """Make sure every directory listed under the 'tree' config section exists.

    Each entry name returned by config.subtree(tree).list() is lower-cased
    and looked up as "<tree>::<entry>"; the resulting path is handed to
    do_dir().
    """
    for key in config.subtree(tree).list():
        config_name = "%s::%s" % (tree, key.lower())
        do_dir(config[config_name], config_name)
def process_morguesubdir(subdir):
    """Create the morgue sub directory configured for 'subdir', if any.

    Looks up "<subdir>::MorgueSubDir" in the global configuration; when it
    is set, the directory of that name below Dir::Morgue is created.
    """
    config_name = "%s::MorgueSubDir" % (subdir)
    if config_name not in Cnf:
        return

    morgue_root = Cnf["Dir::Morgue"]
    do_dir(os.path.join(morgue_root, Cnf[config_name]), config_name)
def process_keyring(fullpath, secret=False):
    """Create an empty keyring file at 'fullpath' unless one already exists.

    The containing directory is created when it is missing; for a secret
    keyring a newly created directory is restricted to mode 0700.  The new
    file ends up with mode 0600 when 'secret' is true and 0644 otherwise.
    """
    if os.path.exists(fullpath):
        return

    keydir = os.path.dirname(fullpath)

    # An empty dirname means the current directory, which needs no creating
    # (os.makedirs("") would raise FileNotFoundError).
    if keydir and not os.path.isdir(keydir):
        print("Creating {} ...".format(keydir))
        os.makedirs(keydir)
        if secret:
            # Make sure secret keyring directories are 0700
            os.chmod(keydir, 0o700)

    # Touch the file
    print("Creating {} ...".format(fullpath))
    mode = 0o600 if secret else 0o644

    # Open without O_TRUNC so a keyring that appeared after the existence
    # check above is never emptied, and pass the mode at creation so a
    # secret keyring is not created with wider permissions first.
    fd = os.open(fullpath, os.O_WRONLY | os.O_CREAT, mode)
    try:
        # The creation mode is filtered by the umask; set the exact mode.
        os.fchmod(fd, mode)
    finally:
        os.close(fd)
115######################################################################
def create_directories():
    """Create directories referenced in dak.conf.

    In order, this handles: every entry of the "Dir" config tree, the
    queue/unchecked directory below Dir::Base, each policy queue path (plus
    a COMMENTS directory for the "new" queue), the parent directories of
    the configured Rm::LogFile and Import-Archive::ExportDir, the morgue sub
    directories, the keyrings, and a dists/<suite>/<component>/<arch>
    directory for every suite, component and suite architecture found in
    the database.
    """
    session = DBConn().session()
    try:
        # Process directories from dak.conf
        process_tree(Cnf, "Dir")

        # Hardcode creation of the unchecked directory
        if "Dir::Base" in Cnf:
            do_dir(
                os.path.join(Cnf["Dir::Base"], "queue", "unchecked"),
                "unchecked directory",
            )

        # Process queue directories
        for queue in session.query(PolicyQueue):
            do_dir(queue.path, "%s queue" % queue.queue_name)
            # If we're doing the NEW queue, make sure it has a COMMENTS directory
            if queue.queue_name == "new":
                do_dir(
                    os.path.join(queue.path, "COMMENTS"),
                    "%s queue comments" % queue.queue_name,
                )

        for config_name in ["Rm::LogFile", "Import-Archive::ExportDir"]:
            process_file(Cnf, config_name)

        for subdir in ["Clean-Queues", "Clean-Suites"]:
            process_morguesubdir(subdir)

        suite_suffix = Cnf.find("Dinstall::SuiteSuffix")

        # NOTE(review): the *public* signing keyring is created with
        # secret=True (0600 file, 0700 directory) -- confirm this is intended.
        if "Dinstall::SigningPubKeyring" in Cnf:
            process_keyring(Cnf["Dinstall::SigningPubKeyring"], secret=True)

        # Process public keyrings
        for keyring in session.query(Keyring).filter_by(active=True):
            process_keyring(keyring.keyring_name)

        # The component list does not depend on the suite, so fetch it once
        # instead of re-running the query for every suite.
        # TODO: Store valid suite/component mappings in database
        components = session.query(Component).all()

        # Process dists directories
        # TODO: Store location of each suite in database
        for suite in session.query(Suite):
            suite_dir = os.path.join(
                suite.archive.path, "dists", suite.suite_name, suite_suffix
            )

            for component in components:
                component_name = component.component_name

                sc_dir = os.path.join(suite_dir, component_name)

                do_dir(sc_dir, "%s/%s" % (suite.suite_name, component_name))

                for arch in suite.architectures:
                    if arch.arch_string == "source":
                        arch_string = "source"
                    else:
                        arch_string = "binary-%s" % arch.arch_string

                    suite_arch_dir = os.path.join(sc_dir, arch_string)

                    do_dir(
                        suite_arch_dir,
                        "%s/%s/%s"
                        % (suite.suite_name, component_name, arch_string),
                    )
    finally:
        # Release the database session even if directory creation fails.
        session.close()
186################################################################################
def main():
    """Command entry point: parse the command line and set up the archive.

    Loads the dak configuration into the global Cnf, handles -h/--help,
    rejects stray positional arguments with a warning and exit code 1, and
    otherwise creates the directories.
    """
    global Cnf

    Cnf = utils.get_conf()
    option_spec = [("h", "help", "Init-Dirs::Options::Help")]

    # Give each option an empty value when the configuration does not set it.
    for option in ["help"]:
        key = "Init-Dirs::Options::%s" % option
        if key not in Cnf:
            Cnf[key] = ""

    leftover = apt_pkg.parse_commandline(Cnf, option_spec, sys.argv)

    options = Cnf.subtree("Init-Dirs::Options")
    if options["Help"]:
        usage()
    elif leftover:
        utils.warn("dak init-dirs takes no arguments.")
        usage(exit_code=1)

    create_directories()
213################################################################################
# Run the command only when this file is executed as a script.
if __name__ == "__main__":
    main()