-
Notifications
You must be signed in to change notification settings - Fork 0
/
mailchimp-backup.py
executable file
·182 lines (154 loc) · 5.26 KB
/
mailchimp-backup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#!/usr/bin/env python
"""MailChimp list backup script."""
import argparse
import csv
from datetime import datetime
import io
import os
import sys
import requests
from mailchimp3 import MailChimp
def _client(key):
    """Return a MailChimp API client object.

    The client is created with a custom ``User-Agent`` header identifying
    this backup script.

    Args:
        key: MailChimp API key.

    Returns:
        A ``mailchimp3.MailChimp`` client instance.
    """
    headers = requests.utils.default_headers()
    headers['User-Agent'] = (
        'Mailchimp Backup script '
        '(https://github.com/max-arnold/mailchimp-backup)'
    )
    # Hand the customised headers to the client; building them without
    # passing them on has no effect on the requests that are sent.
    return MailChimp(mc_api=key, request_headers=headers)
def _filename(out, lst):
    """Expand the ``out`` template into an absolute output path.

    The template may reference ``{year}``, ``{month}``, ``{day}``,
    ``{hour}``, ``{minute}``, ``{second}`` (zero-padded) and ``{list}``.
    The timestamp is taken once, on the first call, and cached on the
    function object so every file of a single run shares the same stamp.

    Args:
        out: ``str.format``-style path template.
        lst: value substituted for ``{list}``.

    Returns:
        The absolute path produced from the expanded template.
    """
    if not _filename.now:
        _filename.now = datetime.now()
    stamp = _filename.now

    attrs = {'year': '%04d' % stamp.year}
    for part in ('month', 'day', 'hour', 'minute', 'second'):
        attrs[part] = '%02d' % getattr(stamp, part)
    attrs['list'] = lst

    expanded = out.format(**attrs)
    return os.path.abspath(expanded)


# Cached timestamp shared by all calls; filled in lazily by _filename().
_filename.now = None
def get_lists(key):
    """Fetch information about all lists available to the API key.

    Args:
        key: MailChimp API key.

    Returns:
        The result of the client's ``lists.all(get_all=True)`` call.
    """
    client = _client(key)
    return client.lists.all(get_all=True)
def show_lists(key):
    """Print a one-line summary (ID, name, member count) for each list.

    Args:
        key: MailChimp API key.
    """
    template = 'ID: {}, Name: "{}", Members: {}'
    for entry in get_lists(key)['lists']:
        list_id = entry['id']
        name = entry['name']
        member_count = entry['stats']['member_count']
        print(template.format(list_id, name, member_count))
# Member attributes to export, in column order. A dotted name selects a
# key of a nested mapping; ``parent.*`` selects every key of that mapping.
FIELDS = [
    'email_address',
    'email_type',
    'status',
    'vip',
    'merge_fields.*',
    'ip_signup',
    'timestamp_signup',
    'language',
    'location.latitude',
    'location.longitude',
    'location.country_code',
    'location.timezone',
    'tags',
]


def _export_member(member):
    """Flatten a list member into a dict laid out as described by FIELDS.

    Plain field names are copied as-is. Dotted names are looked up inside
    the nested mapping and stored under a ``parent.child`` key; a ``*``
    child expands to all keys of the nested mapping.

    Args:
        member: mapping describing a single list member.

    Returns:
        A flat dict whose keys follow the order of FIELDS.

    Raises:
        KeyError: if a requested field is missing from ``member``.
    """
    row = {}
    for spec in FIELDS:
        if '.' not in spec:
            row[spec] = member[spec]
            continue

        parts = spec.split('.')
        parent, child = parts[0], parts[1]
        nested = member[parent]
        # Wildcard: export every key found in the nested mapping.
        children = nested.keys() if child == '*' else (child,)
        for name in children:
            row['%s.%s' % (parent, name)] = nested[name]
    return row
def to_csv(members):
    """Convert a sequence of flat member dicts to a CSV string.

    The header row is the union of all keys found in ``members``, in order
    of first appearance. Each row is written by key rather than by
    position, so members whose keys come in a different order, or that
    lack some keys, still end up in the right columns. Missing values are
    written as empty cells.

    Args:
        members: sequence of dicts (it is iterated more than once).

    Returns:
        The CSV text, or an empty string when ``members`` is empty.
    """
    if not members:
        return ''

    # Ordered union of keys: keeps the first member's column order and
    # appends any keys that only show up in later members.
    fieldnames = []
    seen = set()
    for member in members:
        for name in member:
            if name not in seen:
                seen.add(name)
                fieldnames.append(name)

    with io.StringIO() as fp:
        writer = csv.DictWriter(fp, fieldnames=fieldnames, restval='')
        writer.writeheader()
        writer.writerows(members)
        return fp.getvalue()
def export_list(key, list_id):
    """Export the members of one list as CSV text.

    Args:
        key: MailChimp API key.
        list_id: ID of the list to export.

    Returns:
        CSV string produced by ``to_csv`` from the exported members; a
        response without a ``members`` entry is treated as having none.
    """
    response = _client(key).lists.members.all(list_id, get_all=True)
    rows = [_export_member(entry) for entry in response.get('members', [])]
    return to_csv(rows)
def export_all_lists(key, options):
    """Lazily export every existing list.

    Args:
        key: MailChimp API key.
        options: parsed command-line options (accepted but not used here).

    Yields:
        ``(list_id, csv_text)`` tuples, one per list.
    """
    available = get_lists(key)['lists']
    for entry in available:
        list_id = entry['id']
        yield list_id, export_list(key, list_id)
def _is_empty_export(csv_text):
    """Return True when the exported CSV text contains no line break."""
    return '\n' not in csv_text


def _write_export(csv_text, filename, mode='wb'):
    """Write CSV text UTF-8 encoded to ``filename``, creating its directory."""
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    with open(filename, mode) as fp:
        fp.write(csv_text.encode('utf-8'))


def main():
    """Command-line entry point.

    Parses the arguments, resolves the API key (``--key`` or the
    ``MAILCHIMP_KEY`` environment variable) and then performs the first
    requested action among ``--show-lists``, ``--list`` and
    ``--all-lists``. Exports go to the ``--out`` path template when given,
    otherwise to standard output. Exits with status 1 when the key is
    missing or when ``--fail-if-empty`` detects an empty result.
    """
    parser = argparse.ArgumentParser(description='MailChimp list backup script')
    parser.add_argument('--key', type=str, help='API key')
    parser.add_argument(
        '--show-lists', action='store_true', help='Show available lists'
    )
    parser.add_argument('--list', type=str, help='List ID')
    parser.add_argument(
        '--all-lists', action='store_true', help='Export all list IDs'
    )
    parser.add_argument('--out', type=str, help='Output file')
    parser.add_argument(
        '--fail-if-empty',
        action='store_true',
        help='Fail if there are no lists or any of them is empty',
    )
    options = parser.parse_args()

    key = options.key or os.environ.get('MAILCHIMP_KEY')
    if key is None:
        parser.exit(
            status=1,
            message=(
                'Please specify either the MAILCHIMP_KEY '
                'environment variable or the --key argument\n'
            ),
        )

    if options.show_lists:
        show_lists(key)
        parser.exit()

    if options.list:
        lst = export_list(key, options.list)
        if options.fail_if_empty and _is_empty_export(lst):
            # parser.exit() prints the message verbatim, so terminate the
            # line explicitly (as the missing-key message does).
            parser.exit(
                status=1, message='List {} is empty\n'.format(options.list)
            )
        if options.out:
            _write_export(lst, _filename(options.out, options.list))
        else:
            sys.stdout.write(lst)
        parser.exit()

    if options.all_lists:
        previous_filename = None
        lists = list(export_all_lists(key, options))
        if options.fail_if_empty and not lists:
            parser.exit(status=1, message='No lists found\n')
        for lst_id, lst in lists:
            if options.fail_if_empty and _is_empty_export(lst):
                parser.exit(
                    status=1, message='List {} is empty\n'.format(lst_id)
                )
            if options.out:
                filename = _filename(options.out, lst_id)
                # Start a fresh file whenever the target changes; append
                # when consecutive lists map to the same file name (e.g.
                # the template has no {list} placeholder).
                mode = 'ab' if filename == previous_filename else 'wb'
                _write_export(lst, filename, mode)
                previous_filename = filename
            else:
                sys.stdout.write(lst)
        parser.exit()


if __name__ == '__main__':
    main()