
Commit 0143634

Merge pull request #106 from sawdog/master
Fixes changes in 3.7 Multiprocessing for Import/Export and dump/restore
2 parents: 2568f21 + b4d8125

File tree: 6 files changed (+58, -44 lines)
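
The core of the fix: in Python 3, multiprocessing.queues.SimpleQueue must be constructed with a context object, while the Python 2 constructor takes no arguments. A minimal sketch of the guard this commit applies throughout _export.py and _import.py:

import multiprocessing
from multiprocessing.queues import SimpleQueue

import six

if six.PY3:
    # Python 3: SimpleQueue requires an explicit multiprocessing context.
    ctx = multiprocessing.get_context(multiprocessing.get_start_method())
    queue = SimpleQueue(ctx=ctx)
else:
    # Python 2: SimpleQueue takes no constructor arguments.
    queue = SimpleQueue()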

Makefile

Lines changed: 0 additions & 1 deletion
@@ -55,7 +55,6 @@ test-ci:
 	@killall rebirthdb
 
 test-remote:
-	curl -qo ${REMOTE_TEST_SETUP_NAME} ${REMOTE_TEST_SETUP_URL}
 	python ${REMOTE_TEST_SETUP_NAME} pytest -m integration
 
 install-db:

requirements-dev.txt

Lines changed: 0 additions & 2 deletions
This file was deleted.

rethinkdb/_export.py

Lines changed: 26 additions & 17 deletions
@@ -23,7 +23,7 @@
 import ctypes
 import datetime
 import json
-import multiprocessing as mp
+import multiprocessing
 import numbers
 import optparse
 import os
@@ -35,6 +35,8 @@
 import traceback
 from multiprocessing.queues import SimpleQueue
 
+import six
+
 from rethinkdb import errors, query, utils_common
 from rethinkdb.logger import default_logger
 
@@ -259,12 +261,16 @@ def export_table(db, table, directory, options, error_queue, progress_info, sind
     with sindex_counter.get_lock():
         sindex_counter.value += len(table_info["indexes"])
     # -- start the writer
-    ctx = mp.get_context(mp.get_start_method())
-    task_queue = SimpleQueue(ctx=ctx)
+    if six.PY3:
+        ctx = multiprocessing.get_context(multiprocessing.get_start_method())
+        task_queue = SimpleQueue(ctx=ctx)
+    else:
+        task_queue = SimpleQueue()
+
     writer = None
     if options.format == "json":
         filename = directory + "/%s/%s.json" % (db, table)
-        writer = mp.Process(
+        writer = multiprocessing.Process(
             target=json_writer,
             args=(
                 filename,
@@ -274,7 +280,7 @@ def export_table(db, table, directory, options, error_queue, progress_info, sind
             options.format))
     elif options.format == "csv":
         filename = directory + "/%s/%s.csv" % (db, table)
-        writer = mp.Process(
+        writer = multiprocessing.Process(
             target=csv_writer,
             args=(
                 filename,
@@ -284,7 +290,7 @@ def export_table(db, table, directory, options, error_queue, progress_info, sind
             error_queue))
     elif options.format == "ndjson":
         filename = directory + "/%s/%s.ndjson" % (db, table)
-        writer = mp.Process(
+        writer = multiprocessing.Process(
             target=json_writer,
             args=(
                 filename,
@@ -389,13 +395,16 @@ def update_progress(progress_info, options):
 
 def run_clients(options, workingDir, db_table_set):
     # Spawn one client for each db.table, up to options.clients at a time
-    exit_event = mp.Event()
+    exit_event = multiprocessing.Event()
     processes = []
-    ctx = mp.get_context(mp.get_start_method())
-    error_queue = SimpleQueue(ctx=ctx)
-    interrupt_event = mp.Event()
-    sindex_counter = mp.Value(ctypes.c_longlong, 0)
-    hook_counter = mp.Value(ctypes.c_longlong, 0)
+    if six.PY3:
+        ctx = multiprocessing.get_context(multiprocessing.get_start_method())
+        error_queue = SimpleQueue(ctx=ctx)
+    else:
+        error_queue = SimpleQueue()
+    interrupt_event = multiprocessing.Event()
+    sindex_counter = multiprocessing.Value(ctypes.c_longlong, 0)
+    hook_counter = multiprocessing.Value(ctypes.c_longlong, 0)
 
     signal.signal(signal.SIGINT, lambda a, b: abort_export(a, b, exit_event, interrupt_event))
     errors = []
@@ -407,8 +416,8 @@ def run_clients(options, workingDir, db_table_set):
 
         tableSize = int(options.retryQuery("count", query.db(db).table(table).info()['doc_count_estimates'].sum()))
 
-        progress_info.append((mp.Value(ctypes.c_longlong, 0),
-                              mp.Value(ctypes.c_longlong, tableSize)))
+        progress_info.append((multiprocessing.Value(ctypes.c_longlong, 0),
+                              multiprocessing.Value(ctypes.c_longlong, tableSize)))
         arg_lists.append((db, table,
                           workingDir,
                           options,
@@ -430,9 +439,9 @@ def run_clients(options, workingDir, db_table_set):
         processes = [process for process in processes if process.is_alive()]
 
         if len(processes) < options.clients and len(arg_lists) > 0:
-            newProcess = mp.Process(target=export_table, args=arg_lists.pop(0))
-            newProcess.start()
-            processes.append(newProcess)
+            new_process = multiprocessing.Process(target=export_table, args=arg_lists.pop(0))
+            new_process.start()
+            processes.append(new_process)
 
         update_progress(progress_info, options)
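
The sindex_counter, hook_counter, and progress_info pairs above are multiprocessing.Value objects in shared memory; get_lock() serializes cross-process updates, as in export_table's sindex_counter increment. A minimal standalone sketch of that pattern (the worker count is illustrative):

import ctypes
import multiprocessing

def bump(counter):
    # get_lock() makes the read-modify-write atomic across processes.
    with counter.get_lock():
        counter.value += 1

if __name__ == "__main__":
    counter = multiprocessing.Value(ctypes.c_longlong, 0)
    workers = [multiprocessing.Process(target=bump, args=(counter,)) for _ in range(4)]
    for process in workers:
        process.start()
    for process in workers:
        process.join()
    print(counter.value)  # 4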

rethinkdb/_import.py

Lines changed: 23 additions & 16 deletions
@@ -26,13 +26,14 @@
 import csv
 import ctypes
 import json
-import multiprocessing as mp
+import multiprocessing
 import optparse
 import os
 import signal
 import sys
 import time
 import traceback
+import six
 from multiprocessing.queues import Queue, SimpleQueue
 
 from rethinkdb import ast, errors, query, utils_common
@@ -110,12 +111,12 @@ def __init__(
         self.query_runner = query_runner
 
         # reporting information
-        self._bytes_size = mp.Value(ctypes.c_longlong, -1)
-        self._bytes_read = mp.Value(ctypes.c_longlong, -1)
+        self._bytes_size = multiprocessing.Value(ctypes.c_longlong, -1)
+        self._bytes_read = multiprocessing.Value(ctypes.c_longlong, -1)
 
-        self._total_rows = mp.Value(ctypes.c_longlong, -1)
-        self._rows_read = mp.Value(ctypes.c_longlong, 0)
-        self._rows_written = mp.Value(ctypes.c_longlong, 0)
+        self._total_rows = multiprocessing.Value(ctypes.c_longlong, -1)
+        self._rows_read = multiprocessing.Value(ctypes.c_longlong, 0)
+        self._rows_written = multiprocessing.Value(ctypes.c_longlong, 0)
 
         # source
         if hasattr(source, 'read'):
@@ -1083,15 +1084,21 @@ def import_tables(options, sources, files_ignored=None):
 
     tables = dict(((x.db, x.table), x) for x in sources) # (db, table) => table
 
-    ctx = mp.get_context(mp.get_start_method())
+    if six.PY3:
+        ctx = multiprocessing.get_context(multiprocessing.get_start_method())
+        error_queue = SimpleQueue(ctx=ctx)
+        warning_queue = SimpleQueue(ctx=ctx)
+        timing_queue = SimpleQueue(ctx=ctx)
+    else:
+        error_queue = SimpleQueue()
+        warning_queue = SimpleQueue()
+        timing_queue = SimpleQueue()
+
     max_queue_size = options.clients * 3
-    work_queue = mp.Manager().Queue(max_queue_size)
-    error_queue = SimpleQueue(ctx=ctx)
-    warning_queue = SimpleQueue(ctx=ctx)
-    exit_event = mp.Event()
-    interrupt_event = mp.Event()
+    work_queue = multiprocessing.Manager().Queue(max_queue_size)
 
-    timing_queue = SimpleQueue(ctx=ctx)
+    exit_event = multiprocessing.Event()
+    interrupt_event = multiprocessing.Event()
 
     errors = []
     warnings = []
@@ -1168,7 +1175,7 @@ def drain_queues():
     try:
         # - start the progress bar
         if not options.quiet:
-            progress_bar = mp.Process(
+            progress_bar = multiprocessing.Process(
                 target=update_progress,
                 name="progress bar",
                 args=(sources, options.debug, exit_event, progress_bar_sleep)
@@ -1180,7 +1187,7 @@ def drain_queues():
         writers = []
         pools.append(writers)
         for i in range(options.clients):
-            writer = mp.Process(
+            writer = multiprocessing.Process(
                 target=table_writer,
                 name="table writer %d" %
                 i,
@@ -1204,7 +1211,7 @@ def drain_queues():
             # add a workers to fill up the readers pool
             while len(readers) < options.clients:
                 table = next(file_iter)
-                reader = mp.Process(
+                reader = multiprocessing.Process(
                     target=table.read_to_queue,
                     name="table reader %s.%s" %
                     (table.db,
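
In import_tables above, the work queue is a Manager().Queue bounded at options.clients * 3; puts block once the queue is full, which keeps the table readers from racing ahead of the writers. A small sketch (the maxsize of 3 is illustrative):

import multiprocessing

manager = multiprocessing.Manager()
work_queue = manager.Queue(3)  # _import.py uses options.clients * 3

for item in range(3):
    work_queue.put(item)
# The queue is now full; another put() would block until a get() frees a slot.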

rethinkdb/utils_common.py

Lines changed: 4 additions & 4 deletions
@@ -129,7 +129,7 @@ def check_minimum_version(options, minimum_version='1.6'):
     version_string = options.retryQuery('get server version', query.db(
         'rethinkdb').table('server_status')[0]['process']['version'])
 
-    matches = re.match(r'rethinkdb (?P<version>(\d+)\.(\d+)\.(\d+)).*', version_string)
+    matches = re.match(r'(rethinkdb|rebirthdb) (?P<version>(\d+)\.(\d+)\.(\d+)).*', version_string)
 
     if not matches:
         raise RuntimeError("invalid version string format: %s" % version_string)
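
The broadened pattern now accepts both server names. A quick check (the version strings here are illustrative, not taken from a real server):

import re

pattern = r'(rethinkdb|rebirthdb) (?P<version>(\d+)\.(\d+)\.(\d+)).*'

for version_string in ('rethinkdb 2.3.6 (GCC 4.9.2)', 'rebirthdb 2.4.0'):
    matches = re.match(pattern, version_string)
    print(matches.group('version'))  # prints 2.3.6, then 2.4.0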
@@ -285,11 +285,11 @@ def take_action(self, action, dest, opt, value, values, parser):
         '--connect',
         dest='driver_port',
         metavar='HOST:PORT',
-        help='host and client port of a rethinkdb node to connect (default: localhost:%d)' %
-        net.DEFAULT_PORT,
+        help='host and client port of a rethinkdb node to connect (default: localhost:%d)' % net.DEFAULT_PORT,
         action='callback',
         callback=combined_connect_action,
-        type='string')
+        type='str'
+    )
     connection_group.add_option(
         '--driver-port',
         dest='driver_port',

setup.py

Lines changed: 5 additions & 4 deletions
@@ -44,10 +44,11 @@
     if MATCH.group("post"):
         VERSION += "." + MATCH.group("post")
 
-    with open("rethinkdb/version.py", "w") as ostream:
-        print("# Autogenerated version", file=ostream)
-        print(file=ostream)
-        print("VERSION", "=", repr(VERSION), file=ostream)
+    with open("rethinkdb/version.py", "w") as f:
+        f.writelines([
+            "# Autogenerated version",
+            "VERSION = {0}".format(VERSION)
+        ])
 else:
     raise RuntimeError("{!r} does not match version format {!r}".format(
         RETHINKDB_VERSION_DESCRIBE, VERSION_RE))
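
For reference, a standalone sketch of the new version.py generation. Note that writelines() inserts no separators, so the two items land on one line of the generated file (the VERSION value here is illustrative):

VERSION = "2.4.0"  # illustrative; setup.py derives this from git describe

with open("version.py", "w") as f:
    f.writelines([
        "# Autogenerated version",
        "VERSION = {0}".format(VERSION)
    ])

# The generated file contains the single line:
# # Autogenerated versionVERSION = 2.4.0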
