-
Notifications
You must be signed in to change notification settings - Fork 0
/
managevmail.py
executable file
·500 lines (425 loc) · 20.3 KB
/
managevmail.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
#!/usr/bin/env python3
__author__ = "Michael Thies <mail@mhthies.de>"
__license__ = "MIT"
import argparse
import configparser
import getpass
import shutil
import subprocess
import sys
import re
import os.path
import pymysql.cursors
# #####################################################################
# Helper functions
# #####################################################################
def query_user(prompt, var_type, default=None, hide=False):
"""
Query user for a certain value and let them try again, if they fail entering a valid value.
A default value can be given, that is returned when the user enters an empty string. If no default value is given,
an empty input may still be valid if the given type allows construction from empty string. The default value – if
given – is automatically appended to the prompt (in brackets).
Booleans must be entered with 'y' or 'n' by the user.
:param prompt: The prompt string to be presented to the user
:type prompt: str
:param var_type: The type to convert the value to. If it fails with ValueError the user is queried again.
:type var_type: type
:param default: The default value. Must be of the var_type or None.
:param hide: If True, getpass() is used for the query to hide the input in the terminal (for password inputs etc.)
:type hide: bool
:return: The entered value, converted to the given var_type
:rtype: var_type
"""
if var_type is bool:
prompt += " [{}/{}]".format('Y' if default else 'y', 'n' if default or default is None else 'N')
elif var_type is str:
if default:
prompt += " [{}]".format(default)
else:
prompt += " [{}]".format(default)
while True:
result = getpass.getpass(prompt + " ") if hide else input(prompt + " ")
if not result and default is not None:
return default
if var_type is bool:
if result.lower() in "yn":
return result.lower() == 'y'
else:
print("Invalid input. Must be 'y' or 'n'.")
else:
try:
return var_type(result)
except ValueError:
print("Not a valid {}. Please try again.".format(var_type.__name__))
def query_database(db, query, data=()):
"""
Helper function to query MySQL database. This basically wraps cursor.execute(query, data), but takes care of
creating and closing the cursor and fetching the data.
:param db: The MySQL Connector object to use for the query
:param query: The SQL query
:param data: The data to be filled into the query.
:return: The query result as a list of namedtuples or None if the query didn't produce rows
:rtype: [collections.namedtuple] or None
"""
with db.cursor() as cursor:
cursor.execute(query, data)
return cursor.fetchall()
def hash_pw(password):
"""
Hash the given plain password with Dovecot's SHA512-CRYPT hashing function. The result can be stored to the accounts
Database to check
:param password: The plain password
:type password: str
:return: The password hash
:rtype: str
"""
result = subprocess.run(['doveadm', 'pw', '-s', 'SHA512-CRYPT'], stdout=subprocess.PIPE,
input="{0}\n{0}\n".format(password), universal_newlines=True)
result.check_returncode()
return result.stdout.strip()
def check_quota_usage(account_name):
"""
Use doveadm to get the current quota usage of the account with given name.
:param account_name: The name (email address) of the account to check
:type account_name: str
:return: The quota usage in MiB or None if it could not be found
:rtype: float or None
"""
result = subprocess.run(['doveadm', '-f', 'tab', 'quota', 'get', '-u', account_name], stdout=subprocess.PIPE,
universal_newlines=True)
if result.returncode == 67:
# Account does not exist (or similar error)
return None
if result.returncode == 75:
# Permission error
return None
result.check_returncode()
value = int(result.stdout.split('\n')[1].split('\t')[2]) / 1024
return value
def delete_mailbox(domain, user):
"""
Delete the dovecot mailbox located at /var/vmail/mailboxes/<domain>/<user>/.
:param domain: The domain
:param user: The user
"""
mailbox = os.path.join('/var/vmail/mailboxes', domain, user)
if os.path.exists(mailbox):
shutil.rmtree(mailbox)
sieves = os.path.join('/var/vmail/sieve', domain, user)
if os.path.exists(sieves):
shutil.rmtree(sieves)
# #####################################################################
# User dialog functions for different commands
# #####################################################################
def list_accounts(db, _):
result = query_database(db,
"SELECT `username`, `domain`, NULL AS `target_username`, NULL AS `target_domain`,"
"`enabled`, `sendonly` "
"FROM `accounts` "
"UNION SELECT `source_username`, `source_domain`, `destination_username`,"
"`destination_domain`, `enabled`, NULL "
"FROM `aliases`"
"ORDER BY `domain`, `username`")
for account in result:
print("{}{:>15}@{}{}{}".format("[dis] " if not account['enabled'] else " ",
account['username'], account['domain'],
"\t→ {}@{}".format(account['target_username'], account['target_domain'])
if account['target_username'] else "",
"\t[sendonly]" if account['sendonly'] else ""))
return 0
def show_account(db, account_name):
user, domain = account_name.split('@')
# First, show alias
alias_result = query_database(db, "SELECT `destination_username`, `destination_domain`, `enabled` "
"FROM `aliases` "
"WHERE `source_username` = %s AND `source_domain` = %s",
(user, domain))
if alias_result:
current_alias = alias_result[0]
print("<{}> is an alias:\n"
"Destination: <{}@{}>\n"
"Enabled: {}".format(account_name, current_alias['destination_username'],
current_alias['destination_domain'],
"Yes" if current_alias['enabled'] else "No"))
# Now, show account
account_result = query_database(db, "SELECT `enabled`, `quota`, `sendonly` "
"FROM `accounts` "
"WHERE `username` = %s AND `domain` = %s",
(user, domain))
if account_result:
current_account = account_result[0]
quota_used = check_quota_usage(account_name)
print("{}<{}> is {}an account:\n"
"Enabled: {}\n"
"Sendonly: {}\n"
"Quota: {} MiB\n"
"Quota used: {}".format("\n" if alias_result else "",
account_name,
"also " if alias_result else "",
"Yes" if current_account['enabled'] else "No",
"Yes" if current_account['sendonly'] else "No",
current_account['quota'],
"{:.1f} MiB ({:.1f} %)".format(
quota_used, quota_used/current_account['quota']*100)
if quota_used is not None else "N/A"))
if not alias_result and not account_result:
print("<{}> is neither an account nor an alias.".format(account_name))
def add_account(db, account_name):
# Check if name is already an account or alias
user, domain = account_name.split('@')
result = query_database(db, "SELECT COUNT(*) AS c FROM `accounts` WHERE `username` = %s AND `domain` = %s",
(user, domain))
if result[0]['c'] > 0:
print("The account {} exists already.".format(account_name))
return 2
result = query_database(db, "SELECT `destination_username`, `destination_domain` "
"FROM `aliases` "
"WHERE `source_username` = %s AND `source_domain` = %s",
(user, domain))
if result:
current_alias = result[0]
print("Warning: This address is currently an alias of {}@{}.".format(current_alias['destination_username'],
current_alias['destination_domain']))
if not query_user("Do you still want to create an account at the address?", bool, False):
return 0
# Check if domain exists
result = query_database(db, "SELECT COUNT(*) AS c FROM `domains` WHERE `domain` = %s",
(domain,))
if result[0]['c'] != 1:
print("The domain {} is not registered as virtual mail domain yet. Please add it manually to the database"
.format(domain))
return 2
# Ask user for information
pass1 = query_user("New account's password:", str, hide=True)
if not pass1:
print("Password must not be empty.")
return 64
pass2 = query_user("Type password again:", str, hide=True)
if pass1 != pass2:
print("Passwords do not match.")
return 64
enabled = query_user("Enable Account?", bool, True)
send_only = query_user("Create send-only account?", bool, False)
if not send_only:
quota = query_user("Storage quota in MB:", int, 128)
else:
quota = 0
# Hash password and create account
pass_hash = hash_pw(pass1)
query_database(db, "INSERT INTO `accounts` (`username`, `domain`, `password`, `quota`, `enabled`, `sendonly`) "
"VALUES(%s,%s,%s,%s,%s,%s)",
(user, domain, pass_hash, quota, enabled, send_only))
db.commit()
print("Account has been created.")
return 0
def change_account(db, account_name):
# Get current settings and exit if accounts doesn't exist
user, domain = account_name.split('@')
result = query_database(db, "SELECT `id`, `username`, `domain`, `enabled`, `quota`, `sendonly` "
"FROM `accounts` "
"WHERE `username` = %s AND `domain` = %s",
(user, domain))
if not result:
print("This account does not exist yet.")
return 2
current_account = result[0]
# Query user for new values
enabled = query_user("Account enabled?", bool, bool(current_account['enabled']))
send_only = query_user("Send-only account?", bool, bool(current_account['sendonly']))
if not send_only:
quota = query_user("Quota in MB:", int, current_account['quota'])
else:
quota = 0
# Store new values
query_database(db, "UPDATE `accounts` SET `enabled` = %s, `quota` = %s, `sendonly` = %s WHERE `id` = %s",
(enabled, quota, send_only, current_account['id']))
db.commit()
print("Stored new values.")
# Ask user, if mailbox shall be deleted
if send_only and not current_account['sendonly']:
if query_user("Do you want to delete the accounts's mailbox and sieve scripts?", bool, False):
delete_mailbox(domain, user)
print("Account's mailbox has been deleted.")
return 0
def change_password(db, account_name):
# Get id and exit if accounts doesn't exist
user, domain = account_name.split('@')
result = query_database(db, "SELECT `id` FROM `accounts` WHERE `username` = %s AND `domain` = %s",
(user, domain))
if not result:
print("This account does not exist.")
return 2
current_account = result[0]
# Query user for new password
pass1 = query_user("New password:", str, hide=True)
if not pass1:
print("Password must not be empty.")
return 64
pass2 = query_user("Type password again:", str, hide=True)
if pass1 != pass2:
print("Passwords do not match.")
return 64
# Hash password and create account
pass_hash = hash_pw(pass1)
# Hash password and store new hash
query_database(db, "UPDATE `accounts` SET `password` = %s WHERE `id` = %s",
(pass_hash, current_account['id']))
db.commit()
print("Stored new password.")
def delete_account(db, account_name):
# Get id and exit if accounts doesn't exist
user, domain = account_name.split('@')
result = query_database(db, "SELECT `id` FROM `accounts` WHERE `username` = %s AND `domain` = %s",
(user, domain))
if not result:
print("This account does not exist.")
return 2
current_account = result[0]
if not query_user("Do you really want to delete the account {}?".format(account_name), bool, False):
return 0
# Delete database entry
query_database(db, "DELETE FROM `accounts` WHERE `id` = %s", (current_account['id'],))
print("Account has been deleted.")
db.commit()
# Ask user, if mailbox shall be deleted
if query_user("Do you want to delete the user's mailbox and sieve scripts?", bool, False):
delete_mailbox(domain, user)
print("Account's mailbox has been deleted.")
return 0
def add_alias(db, alias_name):
# Check if name is already an account or alias
user, domain = alias_name.split('@')
result = query_database(db, "SELECT `destination_username`, `destination_domain` "
"FROM `aliases` "
"WHERE `source_username` = %s AND `source_domain` = %s",
(user, domain))
if result:
current_alias = result[0]
print("This address is already an alias of {}@{}.".format(current_alias['destination_username'],
current_alias['destination_domain']))
return 2
result = query_database(db, "SELECT COUNT(*) AS c FROM `accounts` WHERE `username` = %s AND `domain` = %s",
(user, domain))
if result[0]['c'] > 0:
print("There is already an account for address {}.".format(alias_name))
if not query_user("Do you still want to create an alias at the address?", bool, False):
return 0
# Check if domain exists
result = query_database(db, "SELECT COUNT(*) AS c FROM `domains` WHERE `domain` = %s", (domain,))
if result[0]['c'] == 0:
print("The domain {} is not registered as virtual mail domain yet. Please add it manually to the database"
.format(domain))
return 2
# Ask user for information
while True:
target = query_user("Destination address:", str)
if re.match(r'^[^@]+@[^@?%:/&=]+.[^@?%:/&=]+$', args.address):
break
else:
print("'{}' is not a valid target email address. Please try again.".format(target))
target_user, target_domain = target.strip().split('@')
enabled = query_user("Enable Alias?", bool, True)
# Create new alias
query_database(db, "INSERT INTO `aliases` (`source_username`, `source_domain`, `destination_username`, "
"`destination_domain`, `enabled`) "
"VALUES(%s,%s,%s,%s,%s)",
(user, domain, target_user, target_domain, enabled))
db.commit()
print("Alias has been created.")
return 0
def change_alias(db, alias_name):
# Get current data
user, domain = alias_name.split('@')
result = query_database(db, "SELECT `id`, `destination_username`, `destination_domain`, `enabled` "
"FROM `aliases` "
"WHERE `source_username` = %s AND `source_domain` = %s",
(user, domain))
if not result:
print("{} is currently not registered as alias.".format(alias_name))
return 2
current_alias = result[0]
# Ask user for information
while True:
target = query_user("New destination address:", str, "{}@{}".format(current_alias['destination_username'],
current_alias['destination_domain']))
if re.match(r'^[^@]+@[^@?%:/&=]+.[^@?%:/&=]+$', args.address):
break
else:
print("'{}' is not a valid target email address. Please try again.".format(target))
target_user, target_domain = target.strip().split('@')
enabled = query_user("Enable Alias?", bool, current_alias['enabled'])
# Store new values
query_database(db, "UPDATE `aliases` SET `enabled` = %s, `destination_username` = %s, `destination_domain` = %s "
"WHERE `id` = %s",
(enabled, target_user, target_domain, current_alias['id']))
db.commit()
print("Stored new values.")
return 0
def delete_alias(db, alias_name):
# Get current data
user, domain = alias_name.split('@')
result = query_database(db, "SELECT `id`, `destination_username`, `destination_domain`, `enabled` "
"FROM `aliases` "
"WHERE `source_username` = %s AND `source_domain` = %s",
(user, domain))
if not result:
print("{} is currently not registered as alias.".format(alias_name))
return 2
current_alias = result[0]
# Ask user for confirmation
print("The alias is {} → {}@{}".format(alias_name, current_alias['destination_domain'],
current_alias['destination_username']))
if not query_user("Do you really want to delete it?", bool, False):
return 0
# Store new values
query_database(db, "DELETE FROM `aliases` WHERE `id` = %s", (current_alias['id'],))
db.commit()
print("Alias has been deleted.")
return 0
# #####################################################################
# Main script
# #####################################################################
# Cli argument parsing, config file parsing and database connection happens here
# Map cli commands to handler functions
COMMANDS = {
'list': list_accounts,
'show': show_account,
'add': add_account,
'change': change_account,
'pw': change_password,
'delete': delete_account,
'addalias': add_alias,
'changealias': change_alias,
'deletealias': delete_alias
}
# Commands that do not require a mail address
SIMPLE_COMMANDS = ['list']
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="A python cli interface to make modifications to accounts in the vmail"
" MySQL database")
parser.add_argument('-c', '--config', type=str, default="/etc/managevmail/config.ini",
help="The path to the config.ini file, containing database options. Defaults to "
"/etc/managevmail/config.ini")
parser.add_argument('command', type=str, help="The main command. Must be one of {}"
.format(", ".join(COMMANDS.keys())))
parser.add_argument('address', type=str, help="The email address (account or alias) to be added/modified/deleted.",
default="", nargs='?')
args = parser.parse_args()
if args.command not in COMMANDS:
print("{} is not a valid command. Please use one of: {}".format(args.command, ", ".join(COMMANDS.keys())))
sys.exit(64)
if args.command not in SIMPLE_COMMANDS:
if not args.address:
print("Command {} requires an address argument.".format(args.command))
sys.exit(64)
if not re.match(r'^[^@]+@[^@?%:/&=]+.[^@?%:/&=]+$', args.address):
print("{} is not a valid email address.".format(args.address))
sys.exit(65)
config = configparser.ConfigParser()
with open(args.config) as config_file:
config.read_file(config_file)
cnx = pymysql.connect(**config['database'],
cursorclass=pymysql.cursors.DictCursor)
with cnx:
result = COMMANDS[args.command](cnx, args.address)
sys.exit(result)