Coverage for dak/import_users_from_passwd.py: 27%
67 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
1#! /usr/bin/env python3
3"""Sync PostgreSQL users with system users"""
4# Copyright (C) 2001, 2002, 2006 James Troup <james@nocrew.org>
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14# GNU General Public License for more details.
16# You should have received a copy of the GNU General Public License
17# along with this program; if not, write to the Free Software
18# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20################################################################################
22import grp
23import pwd
24import re
25import sys
27import apt_pkg
28from sqlalchemy import sql
30from daklib import utils
31from daklib.config import Config
32from daklib.dbconn import DBConn
34################################################################################
def usage(exit_code=0):
    """Print the command-line help text and terminate the process.

    The help text is written to standard output and the interpreter is then
    stopped via ``sys.exit``, so this function never returns to its caller.

    :param exit_code: process exit status handed to ``sys.exit``
                      (0 for a requested ``--help``, non-zero for misuse).
    :raises SystemExit: always.
    """
    # The option table is laid out in two aligned columns, separated from the
    # summary by one blank line, so the help stays readable on a terminal.
    print(
        """Usage: dak import-users-from-passwd [OPTION]...
Sync PostgreSQL's users with system users.

  -h, --help                 show this help and exit
  -n, --no-action            don't do anything
  -q, --quiet                be quiet about what is being done
  -v, --verbose              explain what is being done"""
    )
    sys.exit(exit_code)
50################################################################################
def _quote_ident(name):
    """Return *name* as a double-quoted SQL identifier, doubling embedded quotes."""
    return '"%s"' % name.replace('"', '""')


def main():
    """Synchronise PostgreSQL users with the system's passwd database.

    Steps, in order:

    1. Parse the command line into ``Import-Users-From-Passwd::Options``.
       ``--help`` prints the usage text and exits; any non-option argument
       is rejected with a warning and exit status 1.
    2. Collect the system users that are members of the group named by
       ``Import-Users-From-Passwd::ValidGID``.
    3. Collect the existing PostgreSQL users from ``pg_user`` and the
       comma-separated allow-list ``Import-Users-From-Passwd::KnownPostgres``.
    4. Drop every PostgreSQL user that is neither an eligible system user
       nor on the allow-list.
    5. Create a PostgreSQL user for every eligible system user that does
       not have one yet, provided the name is purely alphanumeric.

    With ``--no-action`` the messages are still printed, but no ``DROP USER``
    or ``CREATE USER`` statement is sent to the database.
    """
    cnf = Config()

    Arguments = [
        ("n", "no-action", "Import-Users-From-Passwd::Options::No-Action"),
        ("q", "quiet", "Import-Users-From-Passwd::Options::Quiet"),
        ("v", "verbose", "Import-Users-From-Passwd::Options::Verbose"),
        ("h", "help", "Import-Users-From-Passwd::Options::Help"),
    ]
    # Make sure every option key exists so the lookups below cannot fail.
    for i in ["no-action", "quiet", "verbose", "help"]:
        key = "Import-Users-From-Passwd::Options::%s" % i
        if key not in cnf:
            cnf[key] = ""

    arguments = apt_pkg.parse_commandline(cnf.Cnf, Arguments, sys.argv)  # type: ignore[attr-defined]
    Options = cnf.subtree("Import-Users-From-Passwd::Options")

    if Options["Help"]:
        usage()
    elif arguments:
        utils.warn("dak import-users-from-passwd takes no non-option arguments.")
        usage(1)

    no_action = Options["No-Action"]

    session = DBConn().session()
    valid_gid = cnf.get("Import-Users-From-Passwd::ValidGID", "")
    # NOTE(review): with no ValidGID configured the member set is empty, so
    # every system user is skipped below -- confirm this is intended.
    if valid_gid:
        debiangrp = set(grp.getgrnam(valid_gid).gr_mem)
    else:
        debiangrp = set()

    # System users that are members of the configured group.
    passwd_unames = set()
    for entry in pwd.getpwall():
        uname = entry[0]
        if uname not in debiangrp:
            if Options["Verbose"]:
                print("Skipping %s (Not in group %s)." % (uname, valid_gid))
            continue
        passwd_unames.add(uname)

    # Users currently known to PostgreSQL.
    q = session.execute(sql.text("SELECT usename FROM pg_user"))
    postgres_unames = {row[0] for row in q.fetchall()}

    # PostgreSQL users that must never be dropped; blank entries (e.g. from
    # an unset option or a trailing comma) are ignored.
    known_postgres_unames = {
        name.strip()
        for name in cnf.get("Import-Users-From-Passwd::KnownPostgres", "").split(",")
        if name.strip()
    }

    stale_unames = postgres_unames - passwd_unames - known_postgres_unames
    for uname in sorted(stale_unames):
        print(
            "I: Deleting %s from Postgres, no longer in passwd or list of known Postgres users"
            % (uname)
        )
        # --no-action promises not to change anything, so only report.
        if not no_action:
            # Role names cannot be passed as bind parameters, so the name is
            # embedded as a properly quoted identifier instead.
            session.execute(sql.text("DROP USER %s" % _quote_ident(uname)))
    if stale_unames and not no_action:
        # Commit the drops on their own: otherwise a rollback after a failed
        # CREATE USER below would silently undo them.
        session.commit()

    # Only plain alphanumeric names are created; fullmatch (unlike "^...$"
    # with match) also rejects a name ending in a newline.
    safe_name = re.compile(r"[A-Za-z0-9]+")
    for uname in sorted(passwd_unames - postgres_unames):
        if not Options["Quiet"]:
            print("Creating %s user in Postgres." % (uname))
        if no_action:
            continue
        if not safe_name.fullmatch(uname):
            print("NOT CREATING USER %s.  Doesn't match safety regex" % uname)
            continue
        # The statement is built by string formatting because a role name
        # cannot be a bind parameter; the safety regex above plus identifier
        # quoting keep this from being an SQL injection vector.
        try:
            session.execute(sql.text("CREATE USER %s" % _quote_ident(uname)))
            session.commit()
        except Exception as e:
            # Best effort: report the failure and carry on with the next user.
            utils.warn("Could not create user %s (%s)" % (uname, str(e)))
            session.rollback()

    session.commit()
133#######################################################################################
# Script entry point: run the synchronisation only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()