1"""List packages according to various criteria 

2 

3@copyright: 2014, Ansgar Burchardt <ansgar@debian.org> 

4@license: GPL-2+ 

5""" 

6 

7# This program is free software; you can redistribute it and/or modify 

8# it under the terms of the GNU General Public License as published by 

9# the Free Software Foundation; either version 2 of the License, or 

10# (at your option) any later version. 

11# 

12# This program is distributed in the hope that it will be useful, 

13# but WITHOUT ANY WARRANTY; without even the implied warranty of 

14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

15# GNU General Public License for more details. 

16# 

17# You should have received a copy of the GNU General Public License 

18# along with this program; if not, write to the Free Software 

19# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

20 

21import sqlalchemy.sql as sql 

22import sqlalchemy.dialects.postgresql as pgsql 

23 

24from daklib.dbconn import DBConn 

25from collections import defaultdict 

26 

27 

28def list_packages(packages, suites=None, components=None, architectures=None, binary_types=None, 

29 source_and_binary=False, regex=False, 

30 format=None, highest=None): 

31 session = DBConn().session() 

32 try: 

33 t = DBConn().view_package_list 

34 

35 comparison_operator = "~" if regex else "=" 

36 

37 where = sql.false() 

38 for package in packages: 

39 where = where | t.c.package.op(comparison_operator)(package) 

40 if source_and_binary: 

41 where = where | t.c.source.op(comparison_operator)(package) 

42 

43 if suites is not None: 

44 where = where & (t.c.suite.in_(suites) | t.c.codename.in_(suites)) 

45 if components is not None: 45 ↛ 46line 45 didn't jump to line 46, because the condition on line 45 was never true

46 where = where & t.c.component.in_(components) 

47 if architectures is not None: 47 ↛ 48line 47 didn't jump to line 48, because the condition on line 47 was never true

48 where = where & t.c.architecture.in_(architectures) 

49 if binary_types is not None: 49 ↛ 50line 49 didn't jump to line 50, because the condition on line 49 was never true

50 where = where & t.c.type.in_(binary_types) 

51 

52 if format is None: 52 ↛ 72line 52 didn't jump to line 72, because the condition on line 52 was never false

53 c_architectures = sql.func.string_agg(t.c.architecture, pgsql.aggregate_order_by(', ', t.c.architecture_is_source.desc(), t.c.architecture)) 

54 query = sql.select([t.c.package, t.c.version, t.c.display_suite, c_architectures]) \ 

55 .where(where) \ 

56 .group_by(t.c.package, t.c.version, t.c.display_suite) \ 

57 .order_by(t.c.package, t.c.version, t.c.display_suite) 

58 result = session.execute(query).fetchall() 

59 

60 if len(result) == 0: 

61 return 

62 

63 lengths = { 

64 'package': max(10, max(len(row[t.c.package]) for row in result)), 

65 'version': max(13, max(len(row[t.c.version]) for row in result)), 

66 'suite': max(10, max(len(row[t.c.display_suite]) for row in result)) 

67 } 

68 format = "{0:{lengths[package]}} | {1:{lengths[version]}} | {2:{lengths[suite]}} | {3}" 

69 

70 for row in result: 

71 yield format.format(row[t.c.package], row[t.c.version], row[t.c.display_suite], row[c_architectures], lengths=lengths) 

72 elif format in ('control-suite', 'heidi'): 

73 query = sql.select([t.c.package, t.c.version, t.c.architecture]).where(where) 

74 result = session.execute(query) 

75 for row in result: 

76 yield "{0} {1} {2}".format(row[t.c.package], row[t.c.version], row[t.c.architecture]) 

77 elif format == "python": 

78 c_architectures = sql.func.string_agg(t.c.architecture, pgsql.aggregate_order_by(',', t.c.architecture_is_source.desc(), t.c.architecture)) 

79 query = sql.select([t.c.package, 

80 t.c.version, 

81 t.c.display_suite, 

82 c_architectures, 

83 t.c.source, 

84 t.c.source_version, 

85 t.c.component]) \ 

86 .where(where) \ 

87 .group_by(t.c.package, 

88 t.c.version, 

89 t.c.display_suite, 

90 t.c.source, 

91 t.c.component, 

92 t.c.source_version) 

93 result = session.execute(query).fetchall() 

94 

95 if len(result) == 0: 

96 return 

97 

98 def val(): 

99 return defaultdict(val) 

100 ret = val() 

101 for row in result: 

102 ret[row[t.c.package]][row[t.c.display_suite]][row[t.c.version]] = {'component': row[t.c.component], 

103 'architectures': row[c_architectures].split(','), 

104 'source': row[t.c.source], 

105 'source_version': row[t.c.source_version] 

106 } 

107 

108 yield ret 

109 return 

110 else: 

111 raise ValueError("Unknown output format requested.") 

112 

113 if highest is not None: 113 ↛ 114line 113 didn't jump to line 114, because the condition on line 113 was never true

114 query = sql.select([t.c.package, sql.func.max(t.c.version)]).where(where) \ 

115 .group_by(t.c.package).order_by(t.c.package) 

116 result = session.execute(query) 

117 yield "" 

118 for row in result: 

119 yield "{0} ({1} {2})".format(row[0], highest, row[1]) 

120 finally: 

121 session.close() 121 ↛ 96,   121 ↛ 1092 missed branches: 1) line 121 didn't jump to line 96, because the return on line 96 wasn't executed, 2) line 121 didn't jump to line 109, because the return on line 109 wasn't executed