Coverage for dak/import_users_from_passwd.py: 27%

67 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-04 16:18 +0000

1#! /usr/bin/env python3 

2 

3"""Sync PostgreSQL users with system users""" 

4# Copyright (C) 2001, 2002, 2006 James Troup <james@nocrew.org> 

5 

6# This program is free software; you can redistribute it and/or modify 

7# it under the terms of the GNU General Public License as published by 

8# the Free Software Foundation; either version 2 of the License, or 

9# (at your option) any later version. 

10 

11# This program is distributed in the hope that it will be useful, 

12# but WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

14# GNU General Public License for more details. 

15 

16# You should have received a copy of the GNU General Public License 

17# along with this program; if not, write to the Free Software 

18# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

19 

20################################################################################ 

21 

22import grp 

23import pwd 

24import re 

25import sys 

26 

27import apt_pkg 

28from sqlalchemy import sql 

29 

30from daklib import utils 

31from daklib.config import Config 

32from daklib.dbconn import DBConn 

33 

34################################################################################ 

35 

36 

def usage(exit_code=0):
    """Print the command-line help text and terminate the process.

    :param exit_code: status handed to ``sys.exit``; defaults to 0.
    :raises SystemExit: always, carrying ``exit_code``.
    """
    print(
        """Usage: dak import-users-from-passwd [OPTION]...
Sync PostgreSQL's users with system users.

  -h, --help                 show this help and exit
  -n, --no-action            don't do anything
  -q, --quiet                be quiet about what is being done
  -v, --verbose              explain what is being done"""
    )
    sys.exit(exit_code)

48 

49 

50################################################################################ 

51 

52 

def main():
    """Sync PostgreSQL users with the system accounts of the configured group.

    Parses the command line, collects every passwd entry whose name is a
    member of the group named by ``Import-Users-From-Passwd::ValidGID`` and
    compares that set with the user names listed in ``pg_user``:

    * Postgres users that are neither in that set nor listed in
      ``Import-Users-From-Passwd::KnownPostgres`` are dropped.
    * Accounts missing from Postgres are created, provided the name is
      purely ASCII alphanumeric.

    With ``--no-action`` the planned changes are still reported, but no
    ``DROP USER`` / ``CREATE USER`` statement is executed.
    """
    cnf = Config()

    Arguments = [
        ("n", "no-action", "Import-Users-From-Passwd::Options::No-Action"),
        ("q", "quiet", "Import-Users-From-Passwd::Options::Quiet"),
        ("v", "verbose", "Import-Users-From-Passwd::Options::Verbose"),
        ("h", "help", "Import-Users-From-Passwd::Options::Help"),
    ]
    # Make sure every option key exists so the lookups below cannot fail.
    for i in ["no-action", "quiet", "verbose", "help"]:
        key = "Import-Users-From-Passwd::Options::%s" % i
        if key not in cnf:
            cnf[key] = ""

    arguments = apt_pkg.parse_commandline(cnf.Cnf, Arguments, sys.argv)  # type: ignore[attr-defined]
    Options = cnf.subtree("Import-Users-From-Passwd::Options")

    if Options["Help"]:
        usage()
    elif arguments:
        utils.warn("dak import-users-from-passwd takes no non-option arguments.")
        usage(1)

    session = DBConn().session()

    # NOTE: without a ValidGID the member set is empty, so no system account
    # qualifies and every Postgres user outside KnownPostgres becomes a
    # candidate for deletion.
    valid_gid = cnf.get("Import-Users-From-Passwd::ValidGID", "")
    if valid_gid:
        debiangrp = set(grp.getgrnam(valid_gid).gr_mem)
    else:
        debiangrp = set()

    passwd_unames = set()
    for entry in pwd.getpwall():
        uname = entry[0]
        if uname not in debiangrp:
            if Options["Verbose"]:
                print("Skipping %s (Not in group %s)." % (uname, valid_gid))
            continue
        passwd_unames.add(uname)

    rows = session.execute(sql.text("SELECT usename FROM pg_user")).fetchall()
    postgres_unames = {row[0] for row in rows}

    known_postgres_unames = {
        name.strip()
        for name in cnf.get("Import-Users-From-Passwd::KnownPostgres", "").split(",")
    }

    no_action = Options["No-Action"]

    for uname in sorted(postgres_unames - passwd_unames - known_postgres_unames):
        print(
            "I: Deleting %s from Postgres, no longer in passwd or list of known Postgres users"
            % (uname)
        )
        # --no-action must not touch the database at all.
        if not no_action:
            # Role names are identifiers and cannot be sent as bind
            # parameters; escape embedded double quotes by doubling them so
            # the name stays inside the quoted identifier.
            quoted = uname.replace('"', '""')
            session.execute(sql.text('DROP USER "%s"' % quoted))
    if not no_action:
        # Commit the deletions now so that a rollback after a failed
        # CREATE USER below cannot silently undo them.
        session.commit()

    safe_name = re.compile("^[A-Za-z0-9]+$")
    for uname in sorted(passwd_unames - postgres_unames):
        if not Options["Quiet"]:
            print("Creating %s user in Postgres." % (uname))
        if no_action:
            continue
        # The name is interpolated into the statement (identifiers cannot be
        # bind parameters), so only plain alphanumeric names are accepted.
        # fullmatch() also rejects a trailing newline, which "$" alone allows.
        if not safe_name.fullmatch(uname):
            print("NOT CREATING USER %s. Doesn't match safety regex" % uname)
            continue
        try:
            session.execute(sql.text('CREATE USER "%s"' % (uname)))
            session.commit()
        except Exception as e:
            # Best effort: report the failure and carry on with the next user.
            utils.warn("Could not create user %s (%s)" % (uname, str(e)))
            session.rollback()

    session.commit()

131 

132 

133####################################################################################### 

134 

135 

# Run the sync only when executed as a script, not when imported.
if __name__ == "__main__":
    main()