From aa5726bb0325053a313ef9dcfd79615c7942ebd2 Mon Sep 17 00:00:00 2001 From: Bogdan Popescu <68062990+bopopescu@users.noreply.github.com> Date: Thu, 23 Jul 2020 18:45:18 +0300 Subject: [PATCH] Migrating from master and slave jargon --- spark-setup/ec2/deploy_templates.py | 64 +-- spark-setup/ec2/spark_ec2.py | 486 +++++++++--------- .../main/python/mllib/naive_bayes_example.py | 2 +- spark-setup/python/docs/conf.py | 6 +- spark-setup/python/pyspark/conf.py | 14 +- spark-setup/python/pyspark/context.py | 26 +- .../python/pyspark/ml/classification.py | 2 +- spark-setup/python/pyspark/ml/clustering.py | 2 +- spark-setup/python/pyspark/ml/evaluation.py | 2 +- spark-setup/python/pyspark/ml/feature.py | 2 +- .../python/pyspark/ml/recommendation.py | 2 +- spark-setup/python/pyspark/ml/regression.py | 2 +- spark-setup/python/pyspark/ml/tuning.py | 2 +- .../python/pyspark/mllib/classification.py | 2 +- .../python/pyspark/mllib/evaluation.py | 2 +- spark-setup/python/pyspark/mllib/feature.py | 2 +- spark-setup/python/pyspark/mllib/fpm.py | 2 +- .../pyspark/mllib/linalg/distributed.py | 2 +- spark-setup/python/pyspark/mllib/random.py | 2 +- .../python/pyspark/mllib/regression.py | 2 +- .../python/pyspark/mllib/stat/_statistics.py | 2 +- spark-setup/python/pyspark/mllib/tree.py | 2 +- spark-setup/python/pyspark/mllib/util.py | 2 +- spark-setup/python/pyspark/rdd.py | 6 +- spark-setup/python/pyspark/sql/catalog.py | 2 +- spark-setup/python/pyspark/sql/column.py | 2 +- spark-setup/python/pyspark/sql/conf.py | 2 +- spark-setup/python/pyspark/sql/functions.py | 2 +- spark-setup/python/pyspark/sql/group.py | 2 +- spark-setup/python/pyspark/sql/session.py | 12 +- spark-setup/python/pyspark/sql/tests.py | 2 +- .../python/pyspark/streaming/context.py | 2 +- spark-setup/python/pyspark/streaming/flume.py | 4 +- spark-setup/python/pyspark/tests.py | 10 +- 34 files changed, 339 insertions(+), 339 deletions(-) diff --git a/spark-setup/ec2/deploy_templates.py b/spark-setup/ec2/deploy_templates.py index 895e55a..af038db 100755 --- a/spark-setup/ec2/deploy_templates.py +++ b/spark-setup/ec2/deploy_templates.py @@ -7,69 +7,69 @@ import sys # Deploy the configuration file templates in the spark-ec2/templates directory -# to the root filesystem, substituting variables such as the master hostname, +# to the root filesystem, substituting variables such as the main hostname, # ZooKeeper URL, etc as read from the environment. # Find system memory in KB and compute Spark's default limit from that mem_command = "cat /proc/meminfo | grep MemTotal | awk '{print $2}'" cpu_command = "nproc" -master_ram_kb = int( +main_ram_kb = int( os.popen(mem_command).read().strip()) -# This is the master's memory. Try to find slave's memory as well -first_slave = os.popen("cat /root/spark-ec2/slaves | head -1").read().strip() +# This is the main's memory. 
Try to find subordinate's memory as well +first_subordinate = os.popen("cat /root/spark-ec2/subordinates | head -1").read().strip() -slave_mem_command = "ssh -t -o StrictHostKeyChecking=no %s %s" %\ - (first_slave, mem_command) +subordinate_mem_command = "ssh -t -o StrictHostKeyChecking=no %s %s" %\ + (first_subordinate, mem_command) -slave_cpu_command = "ssh -t -o StrictHostKeyChecking=no %s %s" %\ - (first_slave, cpu_command) +subordinate_cpu_command = "ssh -t -o StrictHostKeyChecking=no %s %s" %\ + (first_subordinate, cpu_command) -slave_ram_kb = int(os.popen(slave_mem_command).read().strip()) +subordinate_ram_kb = int(os.popen(subordinate_mem_command).read().strip()) -slave_cpus = int(os.popen(slave_cpu_command).read().strip()) +subordinate_cpus = int(os.popen(subordinate_cpu_command).read().strip()) -system_ram_kb = min(slave_ram_kb, master_ram_kb) +system_ram_kb = min(subordinate_ram_kb, main_ram_kb) system_ram_mb = system_ram_kb / 1024 -slave_ram_mb = slave_ram_kb / 1024 +subordinate_ram_mb = subordinate_ram_kb / 1024 # Leave some RAM for the OS, Hadoop daemons, and system caches -if slave_ram_mb > 100*1024: - slave_ram_mb = slave_ram_mb - 15 * 1024 # Leave 15 GB RAM -elif slave_ram_mb > 60*1024: - slave_ram_mb = slave_ram_mb - 10 * 1024 # Leave 10 GB RAM -elif slave_ram_mb > 40*1024: - slave_ram_mb = slave_ram_mb - 6 * 1024 # Leave 6 GB RAM -elif slave_ram_mb > 20*1024: - slave_ram_mb = slave_ram_mb - 3 * 1024 # Leave 3 GB RAM -elif slave_ram_mb > 10*1024: - slave_ram_mb = slave_ram_mb - 2 * 1024 # Leave 2 GB RAM +if subordinate_ram_mb > 100*1024: + subordinate_ram_mb = subordinate_ram_mb - 15 * 1024 # Leave 15 GB RAM +elif subordinate_ram_mb > 60*1024: + subordinate_ram_mb = subordinate_ram_mb - 10 * 1024 # Leave 10 GB RAM +elif subordinate_ram_mb > 40*1024: + subordinate_ram_mb = subordinate_ram_mb - 6 * 1024 # Leave 6 GB RAM +elif subordinate_ram_mb > 20*1024: + subordinate_ram_mb = subordinate_ram_mb - 3 * 1024 # Leave 3 GB RAM +elif subordinate_ram_mb > 10*1024: + subordinate_ram_mb = subordinate_ram_mb - 2 * 1024 # Leave 2 GB RAM else: - slave_ram_mb = max(512, slave_ram_mb - 1300) # Leave 1.3 GB RAM + subordinate_ram_mb = max(512, subordinate_ram_mb - 1300) # Leave 1.3 GB RAM -# Make tachyon_mb as slave_ram_mb for now. -tachyon_mb = slave_ram_mb +# Make tachyon_mb as subordinate_ram_mb for now. 
+tachyon_mb = subordinate_ram_mb worker_instances_str = "" -worker_cores = slave_cpus +worker_cores = subordinate_cpus if os.getenv("SPARK_WORKER_INSTANCES") != "": worker_instances = int(os.getenv("SPARK_WORKER_INSTANCES", 1)) worker_instances_str = "%d" % worker_instances # Distribute equally cpu cores among worker instances - worker_cores = max(slave_cpus / worker_instances, 1) + worker_cores = max(subordinate_cpus / worker_instances, 1) template_vars = { - "master_list": os.getenv("MASTERS"), - "active_master": os.getenv("MASTERS").split("\n")[0], - "slave_list": os.getenv("SLAVES"), + "main_list": os.getenv("MASTERS"), + "active_main": os.getenv("MASTERS").split("\n")[0], + "subordinate_list": os.getenv("SLAVES"), "hdfs_data_dirs": os.getenv("HDFS_DATA_DIRS"), "mapred_local_dirs": os.getenv("MAPRED_LOCAL_DIRS"), "spark_local_dirs": os.getenv("SPARK_LOCAL_DIRS"), - "spark_worker_mem": "%dm" % slave_ram_mb, + "spark_worker_mem": "%dm" % subordinate_ram_mb, "spark_worker_instances": worker_instances_str, "spark_worker_cores": "%d" % worker_cores, - "spark_master_opts": os.getenv("SPARK_MASTER_OPTS", ""), + "spark_main_opts": os.getenv("SPARK_MASTER_OPTS", ""), "spark_version": os.getenv("SPARK_VERSION"), "tachyon_version": os.getenv("TACHYON_VERSION"), "hadoop_major_version": os.getenv("HADOOP_MAJOR_VERSION"), diff --git a/spark-setup/ec2/spark_ec2.py b/spark-setup/ec2/spark_ec2.py index 7c8656c..f5268fa 100644 --- a/spark-setup/ec2/spark_ec2.py +++ b/spark-setup/ec2/spark_ec2.py @@ -183,11 +183,11 @@ def parse_args(): prog="spark-ec2", version="%prog {v}".format(v=SPARK_EC2_VERSION), usage="%prog [options] \n\n" - + " can be: launch, destroy, login, stop, start, get-master, reboot-slaves") + + " can be: launch, destroy, login, stop, start, get-main, reboot-subordinates") parser.add_option( - "-s", "--slaves", type="int", default=1, - help="Number of slaves to launch (default: %default)") + "-s", "--subordinates", type="int", default=1, + help="Number of subordinates to launch (default: %default)") parser.add_option( "-w", "--wait", type="int", help="DEPRECATED (no longer necessary) - Seconds to wait for nodes to start") @@ -206,15 +206,15 @@ def parse_args(): help="Type of instance to launch (default: %default). " + "WARNING: must be 64-bit; small instances won't work") parser.add_option( - "-m", "--master-instance-type", default="", - help="Master instance type (leave empty for same as instance-type)") + "-m", "--main-instance-type", default="", + help="Main instance type (leave empty for same as instance-type)") parser.add_option( "-r", "--region", default="us-east-1", help="EC2 region used to launch instances in, or to find them in (default: %default)") parser.add_option( "-z", "--zone", default="", help="Availability zone to launch instances in, or 'all' to spread " + - "slaves across multiple (an additional $0.01/Gb for bandwidth" + + "subordinates across multiple (an additional $0.01/Gb for bandwidth" + "between zones applies) (default: a single zone chosen at random)") parser.add_option( "-a", "--ami", @@ -237,7 +237,7 @@ def parse_args(): parser.add_option( "--deploy-root-dir", default=None, - help="A directory to copy into / on the first master. " + + help="A directory to copy into / on the first main. " + "Must be absolute. Note that a trailing slash is handled as per rsync: " + "If you omit it, the last directory of the --deploy-root-dir path will be created " + "in / before copying its contents. 
If you append the trailing slash, " + @@ -278,7 +278,7 @@ def parse_args(): help="Swap space to set up per node, in MB (default: %default)") parser.add_option( "--spot-price", metavar="PRICE", type="float", - help="If specified, launch slaves as spot instances with the given " + + help="If specified, launch subordinates as spot instances with the given " + "maximum price (in dollars)") parser.add_option( "--ganglia", action="store_true", default=True, @@ -294,15 +294,15 @@ def parse_args(): "--delete-groups", action="store_true", default=False, help="When destroying a cluster, delete the security groups that were created") parser.add_option( - "--use-existing-master", action="store_true", default=False, - help="Launch fresh slaves, but use an existing stopped master if possible") + "--use-existing-main", action="store_true", default=False, + help="Launch fresh subordinates, but use an existing stopped main if possible") parser.add_option( "--worker-instances", type="int", default=1, help="Number of instances per worker: variable SPARK_WORKER_INSTANCES. Not used if YARN " + "is used as Hadoop major version (default: %default)") parser.add_option( - "--master-opts", type="string", default="", - help="Extra options to give to master through SPARK_MASTER_OPTS variable " + + "--main-opts", type="string", default="", + help="Extra options to give to main through SPARK_MASTER_OPTS variable " + "(e.g -Dspark.worker.timeout=180)") parser.add_option( "--user-data", type="string", default="", @@ -320,7 +320,7 @@ def parse_args(): parser.add_option( "--tag-volumes", action="store_true", default=False, help="Apply the tags given in --additional-tags to any EBS volumes " + - "attached to master and slave instances.") + "attached to main and subordinate instances.") parser.add_option( "--copy-aws-credentials", action="store_true", default=False, help="Add AWS credentials to hadoop configuration to allow Spark to access S3") @@ -503,7 +503,7 @@ def get_spark_ami(opts): # Launch a cluster of the given name, by setting up its security groups, # and then starting new instances in them. -# Returns a tuple of EC2 reservation objects for the master and slaves +# Returns a tuple of EC2 reservation objects for the main and subordinates # Fails if there already instances running in the cluster's groups. 
def launch_cluster(conn, opts, cluster_name): if opts.identity_file is None: @@ -520,77 +520,77 @@ def launch_cluster(conn, opts, cluster_name): user_data_content = user_data_file.read() print("Setting up security groups...") - master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id) - slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id) + main_group = get_or_make_group(conn, cluster_name + "-main", opts.vpc_id) + subordinate_group = get_or_make_group(conn, cluster_name + "-subordinates", opts.vpc_id) authorized_address = opts.authorized_address - if master_group.rules == []: # Group was just now created + if main_group.rules == []: # Group was just now created if opts.vpc_id is None: - master_group.authorize(src_group=master_group) - master_group.authorize(src_group=slave_group) + main_group.authorize(src_group=main_group) + main_group.authorize(src_group=subordinate_group) else: - master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, - src_group=master_group) - master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, - src_group=master_group) - master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, - src_group=master_group) - master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, - src_group=slave_group) - master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, - src_group=slave_group) - master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, - src_group=slave_group) - master_group.authorize('tcp', 22, 22, authorized_address) - master_group.authorize('tcp', 8080, 8081, authorized_address) - master_group.authorize('tcp', 18080, 18080, authorized_address) - master_group.authorize('tcp', 19999, 19999, authorized_address) - master_group.authorize('tcp', 50030, 50030, authorized_address) - master_group.authorize('tcp', 50070, 50070, authorized_address) - master_group.authorize('tcp', 60070, 60070, authorized_address) - master_group.authorize('tcp', 4040, 4045, authorized_address) + main_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, + src_group=main_group) + main_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, + src_group=main_group) + main_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, + src_group=main_group) + main_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, + src_group=subordinate_group) + main_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, + src_group=subordinate_group) + main_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, + src_group=subordinate_group) + main_group.authorize('tcp', 22, 22, authorized_address) + main_group.authorize('tcp', 8080, 8081, authorized_address) + main_group.authorize('tcp', 18080, 18080, authorized_address) + main_group.authorize('tcp', 19999, 19999, authorized_address) + main_group.authorize('tcp', 50030, 50030, authorized_address) + main_group.authorize('tcp', 50070, 50070, authorized_address) + main_group.authorize('tcp', 60070, 60070, authorized_address) + main_group.authorize('tcp', 4040, 4045, authorized_address) # Rstudio (GUI for R) needs port 8787 for web access - master_group.authorize('tcp', 8787, 8787, authorized_address) + main_group.authorize('tcp', 8787, 8787, authorized_address) # HDFS NFS gateway requires 111,2049,4242 for tcp & udp - master_group.authorize('tcp', 111, 111, authorized_address) - master_group.authorize('udp', 111, 111, authorized_address) - master_group.authorize('tcp', 2049, 2049, authorized_address) - 
master_group.authorize('udp', 2049, 2049, authorized_address) - master_group.authorize('tcp', 4242, 4242, authorized_address) - master_group.authorize('udp', 4242, 4242, authorized_address) + main_group.authorize('tcp', 111, 111, authorized_address) + main_group.authorize('udp', 111, 111, authorized_address) + main_group.authorize('tcp', 2049, 2049, authorized_address) + main_group.authorize('udp', 2049, 2049, authorized_address) + main_group.authorize('tcp', 4242, 4242, authorized_address) + main_group.authorize('udp', 4242, 4242, authorized_address) # RM in YARN mode uses 8088 - master_group.authorize('tcp', 8088, 8088, authorized_address) + main_group.authorize('tcp', 8088, 8088, authorized_address) if opts.ganglia: - master_group.authorize('tcp', 5080, 5080, authorized_address) - if slave_group.rules == []: # Group was just now created + main_group.authorize('tcp', 5080, 5080, authorized_address) + if subordinate_group.rules == []: # Group was just now created if opts.vpc_id is None: - slave_group.authorize(src_group=master_group) - slave_group.authorize(src_group=slave_group) + subordinate_group.authorize(src_group=main_group) + subordinate_group.authorize(src_group=subordinate_group) else: - slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, - src_group=master_group) - slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, - src_group=master_group) - slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, - src_group=master_group) - slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, - src_group=slave_group) - slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, - src_group=slave_group) - slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, - src_group=slave_group) - slave_group.authorize('tcp', 22, 22, authorized_address) - slave_group.authorize('tcp', 8080, 8081, authorized_address) - slave_group.authorize('tcp', 50060, 50060, authorized_address) - slave_group.authorize('tcp', 50075, 50075, authorized_address) - slave_group.authorize('tcp', 60060, 60060, authorized_address) - slave_group.authorize('tcp', 60075, 60075, authorized_address) + subordinate_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, + src_group=main_group) + subordinate_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, + src_group=main_group) + subordinate_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, + src_group=main_group) + subordinate_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, + src_group=subordinate_group) + subordinate_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, + src_group=subordinate_group) + subordinate_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, + src_group=subordinate_group) + subordinate_group.authorize('tcp', 22, 22, authorized_address) + subordinate_group.authorize('tcp', 8080, 8081, authorized_address) + subordinate_group.authorize('tcp', 50060, 50060, authorized_address) + subordinate_group.authorize('tcp', 50075, 50075, authorized_address) + subordinate_group.authorize('tcp', 60060, 60060, authorized_address) + subordinate_group.authorize('tcp', 60075, 60075, authorized_address) # Check if instances are already running in our groups - existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name, + existing_mains, existing_subordinates = get_existing_cluster(conn, opts, cluster_name, die_on_error=False) - if existing_slaves or (existing_masters and not opts.use_existing_master): + if 
existing_subordinates or (existing_mains and not opts.use_existing_main): print("ERROR: There are already instances running in group %s or %s" % - (master_group.name, slave_group.name), file=stderr) + (main_group.name, subordinate_group.name), file=stderr) sys.exit(1) # Figure out Spark AMI @@ -631,32 +631,32 @@ def launch_cluster(conn, opts, cluster_name): name = '/dev/sd' + string.ascii_letters[i + 1] block_map[name] = dev - # Launch slaves + # Launch subordinates if opts.spot_price is not None: # Launch spot instances with the requested price - print("Requesting %d slaves as spot instances with price $%.3f" % - (opts.slaves, opts.spot_price)) + print("Requesting %d subordinates as spot instances with price $%.3f" % + (opts.subordinates, opts.spot_price)) zones = get_zones(conn, opts) num_zones = len(zones) i = 0 my_req_ids = [] for zone in zones: - num_slaves_this_zone = get_partition(opts.slaves, num_zones, i) - slave_reqs = conn.request_spot_instances( + num_subordinates_this_zone = get_partition(opts.subordinates, num_zones, i) + subordinate_reqs = conn.request_spot_instances( price=opts.spot_price, image_id=opts.ami, launch_group="launch-group-%s" % cluster_name, placement=zone, - count=num_slaves_this_zone, + count=num_subordinates_this_zone, key_name=opts.key_pair, - security_group_ids=[slave_group.id] + additional_group_ids, + security_group_ids=[subordinate_group.id] + additional_group_ids, instance_type=opts.instance_type, block_device_map=block_map, subnet_id=opts.subnet_id, placement_group=opts.placement_group, user_data=user_data_content, instance_profile_name=opts.instance_profile_name) - my_req_ids += [req.id for req in slave_reqs] + my_req_ids += [req.id for req in subordinate_reqs] i += 1 print("Waiting for spot instances to be granted...") @@ -671,23 +671,23 @@ def launch_cluster(conn, opts, cluster_name): for i in my_req_ids: if i in id_to_req and id_to_req[i].state == "active": active_instance_ids.append(id_to_req[i].instance_id) - if len(active_instance_ids) == opts.slaves: - print("All %d slaves granted" % opts.slaves) + if len(active_instance_ids) == opts.subordinates: + print("All %d subordinates granted" % opts.subordinates) reservations = conn.get_all_reservations(active_instance_ids) - slave_nodes = [] + subordinate_nodes = [] for r in reservations: - slave_nodes += r.instances + subordinate_nodes += r.instances break else: - print("%d of %d slaves granted, waiting longer" % ( - len(active_instance_ids), opts.slaves)) + print("%d of %d subordinates granted, waiting longer" % ( + len(active_instance_ids), opts.subordinates)) except: print("Canceling spot instance requests") conn.cancel_spot_instance_requests(my_req_ids) # Log a warning if any of these requests actually launched instances: - (master_nodes, slave_nodes) = get_existing_cluster( + (main_nodes, subordinate_nodes) = get_existing_cluster( conn, opts, cluster_name, die_on_error=False) - running = len(master_nodes) + len(slave_nodes) + running = len(main_nodes) + len(subordinate_nodes) if running: print(("WARNING: %d instances are still running" % running), file=stderr) sys.exit(0) @@ -696,48 +696,48 @@ def launch_cluster(conn, opts, cluster_name): zones = get_zones(conn, opts) num_zones = len(zones) i = 0 - slave_nodes = [] + subordinate_nodes = [] for zone in zones: - num_slaves_this_zone = get_partition(opts.slaves, num_zones, i) - if num_slaves_this_zone > 0: - slave_res = image.run( + num_subordinates_this_zone = get_partition(opts.subordinates, num_zones, i) + if num_subordinates_this_zone > 0: + 
subordinate_res = image.run( key_name=opts.key_pair, - security_group_ids=[slave_group.id] + additional_group_ids, + security_group_ids=[subordinate_group.id] + additional_group_ids, instance_type=opts.instance_type, placement=zone, - min_count=num_slaves_this_zone, - max_count=num_slaves_this_zone, + min_count=num_subordinates_this_zone, + max_count=num_subordinates_this_zone, block_device_map=block_map, subnet_id=opts.subnet_id, placement_group=opts.placement_group, user_data=user_data_content, instance_initiated_shutdown_behavior=opts.instance_initiated_shutdown_behavior, instance_profile_name=opts.instance_profile_name) - slave_nodes += slave_res.instances - print("Launched {s} slave{plural_s} in {z}, regid = {r}".format( - s=num_slaves_this_zone, - plural_s=('' if num_slaves_this_zone == 1 else 's'), + subordinate_nodes += subordinate_res.instances + print("Launched {s} subordinate{plural_s} in {z}, regid = {r}".format( + s=num_subordinates_this_zone, + plural_s=('' if num_subordinates_this_zone == 1 else 's'), z=zone, - r=slave_res.id)) + r=subordinate_res.id)) i += 1 - # Launch or resume masters - if existing_masters: - print("Starting master...") - for inst in existing_masters: + # Launch or resume mains + if existing_mains: + print("Starting main...") + for inst in existing_mains: if inst.state not in ["shutting-down", "terminated"]: inst.start() - master_nodes = existing_masters + main_nodes = existing_mains else: - master_type = opts.master_instance_type - if master_type == "": - master_type = opts.instance_type + main_type = opts.main_instance_type + if main_type == "": + main_type = opts.instance_type if opts.zone == 'all': opts.zone = random.choice(conn.get_all_zones()).name - master_res = image.run( + main_res = image.run( key_name=opts.key_pair, - security_group_ids=[master_group.id] + additional_group_ids, - instance_type=master_type, + security_group_ids=[main_group.id] + additional_group_ids, + instance_type=main_type, placement=opts.zone, min_count=1, max_count=1, @@ -748,8 +748,8 @@ def launch_cluster(conn, opts, cluster_name): instance_initiated_shutdown_behavior=opts.instance_initiated_shutdown_behavior, instance_profile_name=opts.instance_profile_name) - master_nodes = master_res.instances - print("Launched master in %s, regid = %s" % (zone, master_res.id)) + main_nodes = main_res.instances + print("Launched main in %s, regid = %s" % (zone, main_res.id)) # This wait time corresponds to SPARK-4983 print("Waiting for AWS to propagate instance metadata...") @@ -762,22 +762,22 @@ def launch_cluster(conn, opts, cluster_name): map(str.strip, tag.split(':', 1)) for tag in opts.additional_tags.split(',') ) - print('Applying tags to master nodes') - for master in master_nodes: - master.add_tags( - dict(additional_tags, Name='{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)) + print('Applying tags to main nodes') + for main in main_nodes: + main.add_tags( + dict(additional_tags, Name='{cn}-main-{iid}'.format(cn=cluster_name, iid=main.id)) ) - print('Applying tags to slave nodes') - for slave in slave_nodes: - slave.add_tags( - dict(additional_tags, Name='{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)) + print('Applying tags to subordinate nodes') + for subordinate in subordinate_nodes: + subordinate.add_tags( + dict(additional_tags, Name='{cn}-subordinate-{iid}'.format(cn=cluster_name, iid=subordinate.id)) ) if opts.tag_volumes: if len(additional_tags) > 0: print('Applying tags to volumes') - all_instance_ids = [x.id for x in master_nodes + slave_nodes] + 
all_instance_ids = [x.id for x in main_nodes + subordinate_nodes] volumes = conn.get_all_volumes(filters={'attachment.instance-id': all_instance_ids}) for v in volumes: v.add_tags(additional_tags) @@ -785,13 +785,13 @@ def launch_cluster(conn, opts, cluster_name): print('--tag-volumes has no effect without --additional-tags') # Return all the instances - return (master_nodes, slave_nodes) + return (main_nodes, subordinate_nodes) def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): """ Get the EC2 instances in an existing cluster if available. - Returns a tuple of lists of EC2 instance objects for the masters and slaves. + Returns a tuple of lists of EC2 instance objects for the mains and subordinates. """ print("Searching for existing cluster {c} in region {r}...".format( c=cluster_name, r=opts.region)) @@ -808,42 +808,42 @@ def get_instances(group_names): instances = itertools.chain.from_iterable(r.instances for r in reservations) return [i for i in instances if i.state not in ["shutting-down", "terminated"]] - master_instances = get_instances([cluster_name + "-master"]) - slave_instances = get_instances([cluster_name + "-slaves"]) + main_instances = get_instances([cluster_name + "-main"]) + subordinate_instances = get_instances([cluster_name + "-subordinates"]) - if any((master_instances, slave_instances)): - print("Found {m} master{plural_m}, {s} slave{plural_s}.".format( - m=len(master_instances), - plural_m=('' if len(master_instances) == 1 else 's'), - s=len(slave_instances), - plural_s=('' if len(slave_instances) == 1 else 's'))) + if any((main_instances, subordinate_instances)): + print("Found {m} main{plural_m}, {s} subordinate{plural_s}.".format( + m=len(main_instances), + plural_m=('' if len(main_instances) == 1 else 's'), + s=len(subordinate_instances), + plural_s=('' if len(subordinate_instances) == 1 else 's'))) - if not master_instances and die_on_error: - print("ERROR: Could not find a master for cluster {c} in region {r}.".format( + if not main_instances and die_on_error: + print("ERROR: Could not find a main for cluster {c} in region {r}.".format( c=cluster_name, r=opts.region), file=sys.stderr) sys.exit(1) - return (master_instances, slave_instances) + return (main_instances, subordinate_instances) # Deploy configuration files and run setup scripts on a newly launched # or started EC2 cluster. 
-def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): - master = get_dns_name(master_nodes[0], opts.private_ips) +def setup_cluster(conn, main_nodes, subordinate_nodes, opts, deploy_ssh_key): + main = get_dns_name(main_nodes[0], opts.private_ips) if deploy_ssh_key: - print("Generating cluster's SSH key on master...") + print("Generating cluster's SSH key on main...") key_setup = """ [ -f ~/.ssh/id_rsa ] || (ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa && cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys) """ - ssh(master, opts, key_setup) - dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh']) - print("Transferring cluster's SSH key to slaves...") - for slave in slave_nodes: - slave_address = get_dns_name(slave, opts.private_ips) - print(slave_address) - ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar) + ssh(main, opts, key_setup) + dot_ssh_tar = ssh_read(main, opts, ['tar', 'c', '.ssh']) + print("Transferring cluster's SSH key to subordinates...") + for subordinate in subordinate_nodes: + subordinate_address = get_dns_name(subordinate, opts.private_ips) + print(subordinate_address) + ssh_write(subordinate_address, opts, ['tar', 'x'], dot_ssh_tar) modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs', 'mapreduce', 'spark-standalone', 'tachyon', 'rstudio'] @@ -860,10 +860,10 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): # NOTE: We should clone the repository before running deploy_files to # prevent ec2-variables.sh from being overwritten - print("Cloning spark-ec2 scripts from {r}/tree/{b} on master...".format( + print("Cloning spark-ec2 scripts from {r}/tree/{b} on main...".format( r=opts.spark_ec2_git_repo, b=opts.spark_ec2_git_branch)) ssh( - host=master, + host=main, opts=opts, command="rm -rf spark-ec2" + " && " @@ -871,36 +871,36 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): b=opts.spark_ec2_git_branch) ) - print("Deploying files to master...") + print("Deploying files to main...") deploy_files( conn=conn, root_dir=SPARK_EC2_DIR + "/" + "deploy.generic", opts=opts, - master_nodes=master_nodes, - slave_nodes=slave_nodes, + main_nodes=main_nodes, + subordinate_nodes=subordinate_nodes, modules=modules ) if opts.deploy_root_dir is not None: - print("Deploying {s} to master...".format(s=opts.deploy_root_dir)) + print("Deploying {s} to main...".format(s=opts.deploy_root_dir)) deploy_user_files( root_dir=opts.deploy_root_dir, opts=opts, - master_nodes=master_nodes + main_nodes=main_nodes ) - print("Running setup on master...") - setup_spark_cluster(master, opts) + print("Running setup on main...") + setup_spark_cluster(main, opts) print("Done!") -def setup_spark_cluster(master, opts): - ssh(master, opts, "chmod u+x spark-ec2/setup.sh") - ssh(master, opts, "spark-ec2/setup.sh") - print("Spark standalone cluster started at http://%s:8080" % master) +def setup_spark_cluster(main, opts): + ssh(main, opts, "chmod u+x spark-ec2/setup.sh") + ssh(main, opts, "spark-ec2/setup.sh") + print("Spark standalone cluster started at http://%s:8080" % main) if opts.ganglia: - print("Ganglia started at http://%s:5080/ganglia" % master) + print("Ganglia started at http://%s:5080/ganglia" % main) def is_ssh_available(host, opts, print_ssh_output=True): @@ -1068,13 +1068,13 @@ def get_num_disks(instance_type): # Deploy the configuration file templates in a given local directory to # a cluster, filling in any template parameters with information about the -# cluster (e.g. lists of masters and slaves). 
Files are only deployed to -# the first master instance in the cluster, and we expect the setup +# cluster (e.g. lists of mains and subordinates). Files are only deployed to +# the first main instance in the cluster, and we expect the setup # script to be run on that instance to copy them to other nodes. # # root_dir should be an absolute path to the directory with the files we want to deploy. -def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): - active_master = get_dns_name(master_nodes[0], opts.private_ips) +def deploy_files(conn, root_dir, opts, main_nodes, subordinate_nodes, modules): + active_main = get_dns_name(main_nodes[0], opts.private_ips) num_disks = get_num_disks(opts.instance_type) hdfs_data_dirs = "/mnt/ephemeral-hdfs/data" @@ -1086,7 +1086,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): mapred_local_dirs += ",/mnt%d/hadoop/mrlocal" % i spark_local_dirs += ",/mnt%d/spark" % i - cluster_url = "%s:7077" % active_master + cluster_url = "%s:7077" % active_main if "." in opts.spark_version: # Pre-built Spark deploy @@ -1102,13 +1102,13 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): print("No valid Tachyon version found; Tachyon won't be set up") modules.remove("tachyon") - master_addresses = [get_dns_name(i, opts.private_ips) for i in master_nodes] - slave_addresses = [get_dns_name(i, opts.private_ips) for i in slave_nodes] + main_addresses = [get_dns_name(i, opts.private_ips) for i in main_nodes] + subordinate_addresses = [get_dns_name(i, opts.private_ips) for i in subordinate_nodes] worker_instances_str = "%d" % opts.worker_instances if opts.worker_instances else "" template_vars = { - "master_list": '\n'.join(master_addresses), - "active_master": active_master, - "slave_list": '\n'.join(slave_addresses), + "main_list": '\n'.join(main_addresses), + "active_main": active_main, + "subordinate_list": '\n'.join(subordinate_addresses), "cluster_url": cluster_url, "hdfs_data_dirs": hdfs_data_dirs, "mapred_local_dirs": mapred_local_dirs, @@ -1119,7 +1119,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): "tachyon_version": tachyon_v, "hadoop_major_version": opts.hadoop_major_version, "spark_worker_instances": worker_instances_str, - "spark_master_opts": opts.master_opts + "spark_main_opts": opts.main_opts } if opts.copy_aws_credentials: @@ -1149,12 +1149,12 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): text = text.replace("{{" + key + "}}", template_vars[key]) dest.write(text) dest.close() - # rsync the whole directory over to the master machine + # rsync the whole directory over to the main machine command = [ 'rsync', '-rv', '-e', stringify_command(ssh_command(opts)), "%s/" % tmp_dir, - "%s@%s:/" % (opts.user, active_master) + "%s@%s:/" % (opts.user, active_main) ] subprocess.check_call(command) # Remove the temp directory we created above @@ -1164,16 +1164,16 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): # Deploy a given local directory to a cluster, WITHOUT parameter substitution. # Note that unlike deploy_files, this works for binary files. # Also, it is up to the user to add (or not) the trailing slash in root_dir. -# Files are only deployed to the first master instance in the cluster. +# Files are only deployed to the first main instance in the cluster. # # root_dir should be an absolute path. 
-def deploy_user_files(root_dir, opts, master_nodes): - active_master = get_dns_name(master_nodes[0], opts.private_ips) +def deploy_user_files(root_dir, opts, main_nodes): + active_main = get_dns_name(main_nodes[0], opts.private_ips) command = [ 'rsync', '-rv', '-e', stringify_command(ssh_command(opts)), "%s" % root_dir, - "%s@%s:/" % (opts.user, active_master) + "%s@%s:/" % (opts.user, active_main) ] subprocess.check_call(command) @@ -1273,10 +1273,10 @@ def get_zones(conn, opts): # Gets the number of items in a partition def get_partition(total, num_partitions, current_partitions): - num_slaves_this_zone = total // num_partitions + num_subordinates_this_zone = total // num_partitions if (total % num_partitions) - current_partitions > 0: - num_slaves_this_zone += 1 - return num_slaves_this_zone + num_subordinates_this_zone += 1 + return num_subordinates_this_zone # Gets the IP address, taking into account the --private-ips flag @@ -1331,21 +1331,21 @@ def real_main(): print("Warning: Unrecognized EC2 instance type for instance-type: {t}".format( t=opts.instance_type), file=stderr) - if opts.master_instance_type != "": - if opts.master_instance_type not in EC2_INSTANCE_TYPES: - print("Warning: Unrecognized EC2 instance type for master-instance-type: {t}".format( - t=opts.master_instance_type), file=stderr) + if opts.main_instance_type != "": + if opts.main_instance_type not in EC2_INSTANCE_TYPES: + print("Warning: Unrecognized EC2 instance type for main-instance-type: {t}".format( + t=opts.main_instance_type), file=stderr) # Since we try instance types even if we can't resolve them, we check if they resolve first # and, if they do, see if they resolve to the same virtualization type. if opts.instance_type in EC2_INSTANCE_TYPES and \ - opts.master_instance_type in EC2_INSTANCE_TYPES: + opts.main_instance_type in EC2_INSTANCE_TYPES: if EC2_INSTANCE_TYPES[opts.instance_type] != \ - EC2_INSTANCE_TYPES[opts.master_instance_type]: - print("Error: spark-ec2 currently does not support having a master and slaves " + EC2_INSTANCE_TYPES[opts.main_instance_type]: + print("Error: spark-ec2 currently does not support having a main and subordinates " "with different AMI virtualization types.", file=stderr) - print("master instance virtualization type: {t}".format( - t=EC2_INSTANCE_TYPES[opts.master_instance_type]), file=stderr) - print("slave instance virtualization type: {t}".format( + print("main instance virtualization type: {t}".format( + t=EC2_INSTANCE_TYPES[opts.main_instance_type]), file=stderr) + print("subordinate instance virtualization type: {t}".format( t=EC2_INSTANCE_TYPES[opts.instance_type]), file=stderr) sys.exit(1) @@ -1385,48 +1385,48 @@ def real_main(): opts.zone = random.choice(conn.get_all_zones()).name if action == "launch": - if opts.slaves <= 0: - print("ERROR: You have to start at least 1 slave", file=sys.stderr) + if opts.subordinates <= 0: + print("ERROR: You have to start at least 1 subordinate", file=sys.stderr) sys.exit(1) if opts.resume: - (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) + (main_nodes, subordinate_nodes) = get_existing_cluster(conn, opts, cluster_name) else: - (master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name) + (main_nodes, subordinate_nodes) = launch_cluster(conn, opts, cluster_name) wait_for_cluster_state( conn=conn, opts=opts, - cluster_instances=(master_nodes + slave_nodes), + cluster_instances=(main_nodes + subordinate_nodes), cluster_state='ssh-ready' ) - setup_cluster(conn, master_nodes, slave_nodes, 
opts, True) + setup_cluster(conn, main_nodes, subordinate_nodes, opts, True) elif action == "destroy": - (master_nodes, slave_nodes) = get_existing_cluster( + (main_nodes, subordinate_nodes) = get_existing_cluster( conn, opts, cluster_name, die_on_error=False) - if any(master_nodes + slave_nodes): + if any(main_nodes + subordinate_nodes): print("The following instances will be terminated:") - for inst in master_nodes + slave_nodes: + for inst in main_nodes + subordinate_nodes: print("> %s" % get_dns_name(inst, opts.private_ips)) print("ALL DATA ON ALL NODES WILL BE LOST!!") msg = "Are you sure you want to destroy the cluster {c}? (y/N) ".format(c=cluster_name) response = raw_input(msg) if response == "y": - print("Terminating master...") - for inst in master_nodes: + print("Terminating main...") + for inst in main_nodes: inst.terminate() - print("Terminating slaves...") - for inst in slave_nodes: + print("Terminating subordinates...") + for inst in subordinate_nodes: inst.terminate() # Delete security groups as well if opts.delete_groups: - group_names = [cluster_name + "-master", cluster_name + "-slaves"] + group_names = [cluster_name + "-main", cluster_name + "-subordinates"] wait_for_cluster_state( conn=conn, opts=opts, - cluster_instances=(master_nodes + slave_nodes), + cluster_instances=(main_nodes + subordinate_nodes), cluster_state='terminated' ) print("Deleting security groups (this will take some time)...") @@ -1470,38 +1470,38 @@ def real_main(): print("Try re-running in a few minutes.") elif action == "login": - (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) - if not master_nodes[0].public_dns_name and not opts.private_ips: - print("Master has no public DNS name. Maybe you meant to specify --private-ips?") + (main_nodes, subordinate_nodes) = get_existing_cluster(conn, opts, cluster_name) + if not main_nodes[0].public_dns_name and not opts.private_ips: + print("Main has no public DNS name. Maybe you meant to specify --private-ips?") else: - master = get_dns_name(master_nodes[0], opts.private_ips) - print("Logging into master " + master + "...") + main = get_dns_name(main_nodes[0], opts.private_ips) + print("Logging into main " + main + "...") proxy_opt = [] if opts.proxy_port is not None: proxy_opt = ['-D', opts.proxy_port] subprocess.check_call( - ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, master)]) + ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, main)]) - elif action == "reboot-slaves": + elif action == "reboot-subordinates": response = raw_input( "Are you sure you want to reboot the cluster " + - cluster_name + " slaves?\n" + - "Reboot cluster slaves " + cluster_name + " (y/N): ") + cluster_name + " subordinates?\n" + + "Reboot cluster subordinates " + cluster_name + " (y/N): ") if response == "y": - (master_nodes, slave_nodes) = get_existing_cluster( + (main_nodes, subordinate_nodes) = get_existing_cluster( conn, opts, cluster_name, die_on_error=False) - print("Rebooting slaves...") - for inst in slave_nodes: + print("Rebooting subordinates...") + for inst in subordinate_nodes: if inst.state not in ["shutting-down", "terminated"]: print("Rebooting " + inst.id) inst.reboot() - elif action == "get-master": - (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) - if not master_nodes[0].public_dns_name and not opts.private_ips: - print("Master has no public DNS name. 
Maybe you meant to specify --private-ips?") + elif action == "get-main": + (main_nodes, subordinate_nodes) = get_existing_cluster(conn, opts, cluster_name) + if not main_nodes[0].public_dns_name and not opts.private_ips: + print("Main has no public DNS name. Maybe you meant to specify --private-ips?") else: - print(get_dns_name(master_nodes[0], opts.private_ips)) + print(get_dns_name(main_nodes[0], opts.private_ips)) elif action == "stop": response = raw_input( @@ -1509,17 +1509,17 @@ def real_main(): cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " + "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" + "AMAZON EBS IF IT IS EBS-BACKED!!\n" + - "All data on spot-instance slaves will be lost.\n" + + "All data on spot-instance subordinates will be lost.\n" + "Stop cluster " + cluster_name + " (y/N): ") if response == "y": - (master_nodes, slave_nodes) = get_existing_cluster( + (main_nodes, subordinate_nodes) = get_existing_cluster( conn, opts, cluster_name, die_on_error=False) - print("Stopping master...") - for inst in master_nodes: + print("Stopping main...") + for inst in main_nodes: if inst.state not in ["shutting-down", "terminated"]: inst.stop() - print("Stopping slaves...") - for inst in slave_nodes: + print("Stopping subordinates...") + for inst in subordinate_nodes: if inst.state not in ["shutting-down", "terminated"]: if inst.spot_instance_request_id: inst.terminate() @@ -1527,33 +1527,33 @@ def real_main(): inst.stop() elif action == "start": - (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) - print("Starting slaves...") - for inst in slave_nodes: + (main_nodes, subordinate_nodes) = get_existing_cluster(conn, opts, cluster_name) + print("Starting subordinates...") + for inst in subordinate_nodes: if inst.state not in ["shutting-down", "terminated"]: inst.start() - print("Starting master...") - for inst in master_nodes: + print("Starting main...") + for inst in main_nodes: if inst.state not in ["shutting-down", "terminated"]: inst.start() wait_for_cluster_state( conn=conn, opts=opts, - cluster_instances=(master_nodes + slave_nodes), + cluster_instances=(main_nodes + subordinate_nodes), cluster_state='ssh-ready' ) # Determine types of running instances - existing_master_type = master_nodes[0].instance_type - existing_slave_type = slave_nodes[0].instance_type - # Setting opts.master_instance_type to the empty string indicates we - # have the same instance type for the master and the slaves - if existing_master_type == existing_slave_type: - existing_master_type = "" - opts.master_instance_type = existing_master_type - opts.instance_type = existing_slave_type - - setup_cluster(conn, master_nodes, slave_nodes, opts, False) + existing_main_type = main_nodes[0].instance_type + existing_subordinate_type = subordinate_nodes[0].instance_type + # Setting opts.main_instance_type to the empty string indicates we + # have the same instance type for the main and the subordinates + if existing_main_type == existing_subordinate_type: + existing_main_type = "" + opts.main_instance_type = existing_main_type + opts.instance_type = existing_subordinate_type + + setup_cluster(conn, main_nodes, subordinate_nodes, opts, False) else: print("Invalid action: %s" % action, file=stderr) diff --git a/spark-setup/examples/src/main/python/mllib/naive_bayes_example.py b/spark-setup/examples/src/main/python/mllib/naive_bayes_example.py index 749353b..c66b932 100644 --- a/spark-setup/examples/src/main/python/mllib/naive_bayes_example.py +++ 
b/spark-setup/examples/src/main/python/mllib/naive_bayes_example.py @@ -19,7 +19,7 @@ NaiveBayes Example. Usage: - `spark-submit --master local[4] examples/src/main/python/mllib/naive_bayes_example.py` + `spark-submit --main local[4] examples/src/main/python/mllib/naive_bayes_example.py` """ from __future__ import print_function diff --git a/spark-setup/python/docs/conf.py b/spark-setup/python/docs/conf.py index 50fb317..b9f42bb 100644 --- a/spark-setup/python/docs/conf.py +++ b/spark-setup/python/docs/conf.py @@ -44,8 +44,8 @@ # The encoding of source files. #source_encoding = 'utf-8-sig' -# The master toctree document. -master_doc = 'index' +# The main toctree document. +main_doc = 'index' # General information about the project. project = u'PySpark' @@ -56,7 +56,7 @@ # built documents. # # The short X.Y version. -version = 'master' +version = 'main' # The full version, including alpha/beta/rc tags. release = os.environ.get('RELEASE_VERSION', version) diff --git a/spark-setup/python/pyspark/conf.py b/spark-setup/python/pyspark/conf.py index 491b3a8..585163f 100644 --- a/spark-setup/python/pyspark/conf.py +++ b/spark-setup/python/pyspark/conf.py @@ -19,14 +19,14 @@ >>> from pyspark.conf import SparkConf >>> from pyspark.context import SparkContext >>> conf = SparkConf() ->>> conf.setMaster("local").setAppName("My app") +>>> conf.setMain("local").setAppName("My app") ->>> conf.get("spark.master") +>>> conf.get("spark.main") u'local' >>> conf.get("spark.app.name") u'My app' >>> sc = SparkContext(conf=conf) ->>> sc.master +>>> sc.main u'local' >>> sc.appName u'My app' @@ -88,7 +88,7 @@ class SparkConf(object): what the system properties are. All setter methods in this class support chaining. For example, - you can write C{conf.setMaster("local").setAppName("My app")}. + you can write C{conf.setMain("local").setAppName("My app")}. .. note:: Once a SparkConf object is passed to Spark, it is cloned and can no longer be modified by the user. @@ -135,9 +135,9 @@ def setIfMissing(self, key, value): self.set(key, value) return self - def setMaster(self, value): - """Set master URL to connect to.""" - self.set("spark.master", value) + def setMain(self, value): + """Set main URL to connect to.""" + self.set("spark.main", value) return self def setAppName(self, value): diff --git a/spark-setup/python/pyspark/context.py b/spark-setup/python/pyspark/context.py index ac4b2b0..267ab2f 100644 --- a/spark-setup/python/pyspark/context.py +++ b/spark-setup/python/pyspark/context.py @@ -74,14 +74,14 @@ class SparkContext(object): PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar') - def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, + def __init__(self, main=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=BasicProfiler): """ - Create a new SparkContext. At least the master and app name should be set, + Create a new SparkContext. At least the main and app name should be set, either through the named parameters here or through C{conf}. - :param master: Cluster URL to connect to + :param main: Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). :param appName: A name for your job, to display on the cluster web UI. :param sparkHome: Location where Spark is installed on cluster nodes. 
@@ -114,14 +114,14 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, self._callsite = first_spark_call() or CallSite(None, None, None) SparkContext._ensure_initialized(self, gateway=gateway, conf=conf) try: - self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer, + self._do_init(main, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, jsc, profiler_cls) except: # If an error occurs, clean up in order to allow future SparkContext creation: self.stop() raise - def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, + def _do_init(self, main, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, jsc, profiler_cls): self.environment = environment or {} # java gateway must have been launched at this point. @@ -145,8 +145,8 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, batchSize) # Set any parameters passed directly to us on the conf - if master: - self._conf.setMaster(master) + if main: + self._conf.setMain(main) if appName: self._conf.setAppName(appName) if sparkHome: @@ -158,14 +158,14 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, self._conf.setIfMissing(key, value) # Check that we have at least the required parameters - if not self._conf.contains("spark.master"): - raise Exception("A master URL must be set in your configuration") + if not self._conf.contains("spark.main"): + raise Exception("A main URL must be set in your configuration") if not self._conf.contains("spark.app.name"): raise Exception("An application name must be set in your configuration") # Read back our properties from the conf in case we loaded some of them from # the classpath or an external config file - self.master = self._conf.get("spark.master") + self.main = self._conf.get("spark.main") self.appName = self._conf.get("spark.app.name") self.sparkHome = self._conf.get("spark.home", None) @@ -262,16 +262,16 @@ def _ensure_initialized(cls, instance=None, gateway=None, conf=None): if instance: if (SparkContext._active_spark_context and SparkContext._active_spark_context != instance): - currentMaster = SparkContext._active_spark_context.master + currentMain = SparkContext._active_spark_context.main currentAppName = SparkContext._active_spark_context.appName callsite = SparkContext._active_spark_context._callsite # Raise error if there is already a running Spark context raise ValueError( "Cannot run multiple SparkContexts at once; " - "existing SparkContext(app=%s, master=%s)" + "existing SparkContext(app=%s, main=%s)" " created by %s at %s:%s " - % (currentAppName, currentMaster, + % (currentAppName, currentMain, callsite.function, callsite.file, callsite.linenum)) else: SparkContext._active_spark_context = instance diff --git a/spark-setup/python/pyspark/ml/classification.py b/spark-setup/python/pyspark/ml/classification.py index 570a414..e525268 100644 --- a/spark-setup/python/pyspark/ml/classification.py +++ b/spark-setup/python/pyspark/ml/classification.py @@ -1648,7 +1648,7 @@ def _to_java(self): # The small batch size here ensures that we see multiple batches, # even in these small test examples: spark = SparkSession.builder\ - .master("local[2]")\ + .main("local[2]")\ .appName("ml.classification tests")\ .getOrCreate() sc = spark.sparkContext diff --git a/spark-setup/python/pyspark/ml/clustering.py b/spark-setup/python/pyspark/ml/clustering.py index 86aa289..43c2649 100644 --- 
a/spark-setup/python/pyspark/ml/clustering.py +++ b/spark-setup/python/pyspark/ml/clustering.py @@ -1134,7 +1134,7 @@ def getKeepLastCheckpoint(self): # The small batch size here ensures that we see multiple batches, # even in these small test examples: spark = SparkSession.builder\ - .master("local[2]")\ + .main("local[2]")\ .appName("ml.clustering tests")\ .getOrCreate() sc = spark.sparkContext diff --git a/spark-setup/python/pyspark/ml/evaluation.py b/spark-setup/python/pyspark/ml/evaluation.py index 7cb8d62..d72de3a 100644 --- a/spark-setup/python/pyspark/ml/evaluation.py +++ b/spark-setup/python/pyspark/ml/evaluation.py @@ -337,7 +337,7 @@ def setParams(self, predictionCol="prediction", labelCol="label", # The small batch size here ensures that we see multiple batches, # even in these small test examples: spark = SparkSession.builder\ - .master("local[2]")\ + .main("local[2]")\ .appName("ml.evaluation tests")\ .getOrCreate() globs['spark'] = spark diff --git a/spark-setup/python/pyspark/ml/feature.py b/spark-setup/python/pyspark/ml/feature.py index 3a4b6ed..5b67cc9 100755 --- a/spark-setup/python/pyspark/ml/feature.py +++ b/spark-setup/python/pyspark/ml/feature.py @@ -2797,7 +2797,7 @@ def selectedFeatures(self): # The small batch size here ensures that we see multiple batches, # even in these small test examples: spark = SparkSession.builder\ - .master("local[2]")\ + .main("local[2]")\ .appName("ml.feature tests")\ .getOrCreate() sc = spark.sparkContext diff --git a/spark-setup/python/pyspark/ml/recommendation.py b/spark-setup/python/pyspark/ml/recommendation.py index ee9916f..c8aa67d 100644 --- a/spark-setup/python/pyspark/ml/recommendation.py +++ b/spark-setup/python/pyspark/ml/recommendation.py @@ -373,7 +373,7 @@ def itemFactors(self): # The small batch size here ensures that we see multiple batches, # even in these small test examples: spark = SparkSession.builder\ - .master("local[2]")\ + .main("local[2]")\ .appName("ml.recommendation tests")\ .getOrCreate() sc = spark.sparkContext diff --git a/spark-setup/python/pyspark/ml/regression.py b/spark-setup/python/pyspark/ml/regression.py index b199bf2..3889aa4 100644 --- a/spark-setup/python/pyspark/ml/regression.py +++ b/spark-setup/python/pyspark/ml/regression.py @@ -1666,7 +1666,7 @@ def pValues(self): # The small batch size here ensures that we see multiple batches, # even in these small test examples: spark = SparkSession.builder\ - .master("local[2]")\ + .main("local[2]")\ .appName("ml.regression tests")\ .getOrCreate() sc = spark.sparkContext diff --git a/spark-setup/python/pyspark/ml/tuning.py b/spark-setup/python/pyspark/ml/tuning.py index ffeb445..123e744 100644 --- a/spark-setup/python/pyspark/ml/tuning.py +++ b/spark-setup/python/pyspark/ml/tuning.py @@ -467,7 +467,7 @@ def copy(self, extra=None): # The small batch size here ensures that we see multiple batches, # even in these small test examples: spark = SparkSession.builder\ - .master("local[2]")\ + .main("local[2]")\ .appName("ml.tuning tests")\ .getOrCreate() sc = spark.sparkContext diff --git a/spark-setup/python/pyspark/mllib/classification.py b/spark-setup/python/pyspark/mllib/classification.py index 9f53ed0..6b9ebd1 100644 --- a/spark-setup/python/pyspark/mllib/classification.py +++ b/spark-setup/python/pyspark/mllib/classification.py @@ -754,7 +754,7 @@ def _test(): import pyspark.mllib.classification globs = pyspark.mllib.classification.__dict__.copy() spark = SparkSession.builder\ - .master("local[4]")\ + .main("local[4]")\ .appName("mllib.classification 
tests")\ .getOrCreate() globs['sc'] = spark.sparkContext diff --git a/spark-setup/python/pyspark/mllib/evaluation.py b/spark-setup/python/pyspark/mllib/evaluation.py index fc2a0b3..a2fb706 100644 --- a/spark-setup/python/pyspark/mllib/evaluation.py +++ b/spark-setup/python/pyspark/mllib/evaluation.py @@ -535,7 +535,7 @@ def _test(): import pyspark.mllib.evaluation globs = pyspark.mllib.evaluation.__dict__.copy() spark = SparkSession.builder\ - .master("local[4]")\ + .main("local[4]")\ .appName("mllib.evaluation tests")\ .getOrCreate() globs['sc'] = spark.sparkContext diff --git a/spark-setup/python/pyspark/mllib/feature.py b/spark-setup/python/pyspark/mllib/feature.py index bde0f67..b616d4a 100644 --- a/spark-setup/python/pyspark/mllib/feature.py +++ b/spark-setup/python/pyspark/mllib/feature.py @@ -778,7 +778,7 @@ def _test(): from pyspark.sql import SparkSession globs = globals().copy() spark = SparkSession.builder\ - .master("local[4]")\ + .main("local[4]")\ .appName("mllib.feature tests")\ .getOrCreate() globs['sc'] = spark.sparkContext diff --git a/spark-setup/python/pyspark/mllib/fpm.py b/spark-setup/python/pyspark/mllib/fpm.py index f58ea5d..f0a17a6 100644 --- a/spark-setup/python/pyspark/mllib/fpm.py +++ b/spark-setup/python/pyspark/mllib/fpm.py @@ -179,7 +179,7 @@ def _test(): import pyspark.mllib.fpm globs = pyspark.mllib.fpm.__dict__.copy() spark = SparkSession.builder\ - .master("local[4]")\ + .main("local[4]")\ .appName("mllib.fpm tests")\ .getOrCreate() globs['sc'] = spark.sparkContext diff --git a/spark-setup/python/pyspark/mllib/linalg/distributed.py b/spark-setup/python/pyspark/mllib/linalg/distributed.py index 600655c..5c8dd99 100644 --- a/spark-setup/python/pyspark/mllib/linalg/distributed.py +++ b/spark-setup/python/pyspark/mllib/linalg/distributed.py @@ -1178,7 +1178,7 @@ def _test(): import pyspark.mllib.linalg.distributed globs = pyspark.mllib.linalg.distributed.__dict__.copy() spark = SparkSession.builder\ - .master("local[2]")\ + .main("local[2]")\ .appName("mllib.linalg.distributed tests")\ .getOrCreate() globs['sc'] = spark.sparkContext diff --git a/spark-setup/python/pyspark/mllib/random.py b/spark-setup/python/pyspark/mllib/random.py index 61213dd..39d1e09 100644 --- a/spark-setup/python/pyspark/mllib/random.py +++ b/spark-setup/python/pyspark/mllib/random.py @@ -414,7 +414,7 @@ def _test(): # The small batch size here ensures that we see multiple batches, # even in these small test examples: spark = SparkSession.builder\ - .master("local[2]")\ + .main("local[2]")\ .appName("mllib.random tests")\ .getOrCreate() globs['sc'] = spark.sparkContext diff --git a/spark-setup/python/pyspark/mllib/regression.py b/spark-setup/python/pyspark/mllib/regression.py index 1b66f5b..e34c42a 100644 --- a/spark-setup/python/pyspark/mllib/regression.py +++ b/spark-setup/python/pyspark/mllib/regression.py @@ -828,7 +828,7 @@ def _test(): import pyspark.mllib.regression globs = pyspark.mllib.regression.__dict__.copy() spark = SparkSession.builder\ - .master("local[2]")\ + .main("local[2]")\ .appName("mllib.regression tests")\ .getOrCreate() globs['sc'] = spark.sparkContext diff --git a/spark-setup/python/pyspark/mllib/stat/_statistics.py b/spark-setup/python/pyspark/mllib/stat/_statistics.py index 49b2644..c748a4b 100644 --- a/spark-setup/python/pyspark/mllib/stat/_statistics.py +++ b/spark-setup/python/pyspark/mllib/stat/_statistics.py @@ -306,7 +306,7 @@ def _test(): from pyspark.sql import SparkSession globs = globals().copy() spark = SparkSession.builder\ - .master("local[4]")\ + 
.main("local[4]")\ .appName("mllib.stat.statistics tests")\ .getOrCreate() globs['sc'] = spark.sparkContext diff --git a/spark-setup/python/pyspark/mllib/tree.py b/spark-setup/python/pyspark/mllib/tree.py index a6089fc..b6edded 100644 --- a/spark-setup/python/pyspark/mllib/tree.py +++ b/spark-setup/python/pyspark/mllib/tree.py @@ -647,7 +647,7 @@ def _test(): globs = globals().copy() from pyspark.sql import SparkSession spark = SparkSession.builder\ - .master("local[4]")\ + .main("local[4]")\ .appName("mllib.tree tests")\ .getOrCreate() globs['sc'] = spark.sparkContext diff --git a/spark-setup/python/pyspark/mllib/util.py b/spark-setup/python/pyspark/mllib/util.py index 9775580..2d314e0 100644 --- a/spark-setup/python/pyspark/mllib/util.py +++ b/spark-setup/python/pyspark/mllib/util.py @@ -513,7 +513,7 @@ def _test(): # The small batch size here ensures that we see multiple batches, # even in these small test examples: spark = SparkSession.builder\ - .master("local[2]")\ + .main("local[2]")\ .appName("mllib.util tests")\ .getOrCreate() globs['spark'] = spark diff --git a/spark-setup/python/pyspark/rdd.py b/spark-setup/python/pyspark/rdd.py index ccef30c..83f43d9 100644 --- a/spark-setup/python/pyspark/rdd.py +++ b/spark-setup/python/pyspark/rdd.py @@ -1555,7 +1555,7 @@ def func(split, iterator): def collectAsMap(self): """ - Return the key-value pairs in this RDD to the master as a dictionary. + Return the key-value pairs in this RDD to the main as a dictionary. .. note:: this method should only be used if the resulting data is expected to be small, as all the data is loaded into the driver's memory. @@ -1609,7 +1609,7 @@ def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash): def reduceByKeyLocally(self, func): """ Merge the values for each key using an associative and commutative reduce function, but - return the results immediately to the master as a dictionary. + return the results immediately to the main as a dictionary. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce. @@ -1634,7 +1634,7 @@ def mergeMaps(m1, m2): def countByKey(self): """ Count the number of elements for each key, and return the result to the - master as a dictionary. + main as a dictionary. 
diff --git a/spark-setup/python/pyspark/sql/catalog.py b/spark-setup/python/pyspark/sql/catalog.py
index 30c7a3f..f87c2df 100644
--- a/spark-setup/python/pyspark/sql/catalog.py
+++ b/spark-setup/python/pyspark/sql/catalog.py
@@ -282,7 +282,7 @@ def _test():
 
     globs = pyspark.sql.catalog.__dict__.copy()
     spark = SparkSession.builder\
-        .master("local[4]")\
+        .main("local[4]")\
         .appName("sql.catalog tests")\
         .getOrCreate()
     globs['sc'] = spark.sparkContext
diff --git a/spark-setup/python/pyspark/sql/column.py b/spark-setup/python/pyspark/sql/column.py
index 8d5adc8..c7950d8 100644
--- a/spark-setup/python/pyspark/sql/column.py
+++ b/spark-setup/python/pyspark/sql/column.py
@@ -437,7 +437,7 @@ def _test():
     import pyspark.sql.column
     globs = pyspark.sql.column.__dict__.copy()
     spark = SparkSession.builder\
-        .master("local[4]")\
+        .main("local[4]")\
         .appName("sql.column tests")\
         .getOrCreate()
     sc = spark.sparkContext
diff --git a/spark-setup/python/pyspark/sql/conf.py b/spark-setup/python/pyspark/sql/conf.py
index 792c420..addde67 100644
--- a/spark-setup/python/pyspark/sql/conf.py
+++ b/spark-setup/python/pyspark/sql/conf.py
@@ -72,7 +72,7 @@ def _test():
 
     globs = pyspark.sql.conf.__dict__.copy()
     spark = SparkSession.builder\
-        .master("local[4]")\
+        .main("local[4]")\
         .appName("sql.conf tests")\
         .getOrCreate()
     globs['sc'] = spark.sparkContext
diff --git a/spark-setup/python/pyspark/sql/functions.py b/spark-setup/python/pyspark/sql/functions.py
index 7fe901a..fe89bee 100644
--- a/spark-setup/python/pyspark/sql/functions.py
+++ b/spark-setup/python/pyspark/sql/functions.py
@@ -1883,7 +1883,7 @@ def _test():
     import pyspark.sql.functions
     globs = pyspark.sql.functions.__dict__.copy()
     spark = SparkSession.builder\
-        .master("local[4]")\
+        .main("local[4]")\
         .appName("sql.functions tests")\
         .getOrCreate()
     sc = spark.sparkContext
diff --git a/spark-setup/python/pyspark/sql/group.py b/spark-setup/python/pyspark/sql/group.py
index f2092f9..66ee13e 100644
--- a/spark-setup/python/pyspark/sql/group.py
+++ b/spark-setup/python/pyspark/sql/group.py
@@ -201,7 +201,7 @@ def _test():
     import pyspark.sql.group
     globs = pyspark.sql.group.__dict__.copy()
     spark = SparkSession.builder\
-        .master("local[4]")\
+        .main("local[4]")\
         .appName("sql.group tests")\
         .getOrCreate()
     sc = spark.sparkContext
diff --git a/spark-setup/python/pyspark/sql/session.py b/spark-setup/python/pyspark/sql/session.py
index 9f4772e..b03ad2e 100644
--- a/spark-setup/python/pyspark/sql/session.py
+++ b/spark-setup/python/pyspark/sql/session.py
@@ -67,7 +67,7 @@ class SparkSession(object):
     To create a SparkSession, use the following builder pattern:
 
     >>> spark = SparkSession.builder \\
-    ...     .master("local") \\
+    ...     .main("local") \\
     ...     .appName("Word Count") \\
     ...     .config("spark.some.config.option", "some-value") \\
     ...     .getOrCreate()
@@ -109,14 +109,14 @@ def config(self, key=None, value=None, conf=None):
         return self
 
     @since(2.0)
-    def master(self, master):
-        """Sets the Spark master URL to connect to, such as "local" to run locally, "local[4]"
-        to run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone
+    def main(self, main):
+        """Sets the Spark main URL to connect to, such as "local" to run locally, "local[4]"
+        to run locally with 4 cores, or "spark://main:7077" to run on a Spark standalone
         cluster.
 
-        :param master: a url for spark master
+        :param main: a url for spark main
         """
-        return self.config("spark.master", master)
+        return self.config("spark.main", main)
 
     @since(2.0)
     def appName(self, name):
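The session.py hunk above renames both the builder method and the config key it writes. For reference, a minimal sketch of the builder usage against stock PySpark, where the method is master() and the value lands under spark.master (this patch renames the bundled Python-side method to main()):

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .master("local[4]")        # or e.g. "spark://<host>:7077" for a standalone cluster
             .appName("Word Count")
             .config("spark.some.config.option", "some-value")
             .getOrCreate())
    print(spark.sparkContext.master)    # -> local[4]
    spark.stop()
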
diff --git a/spark-setup/python/pyspark/sql/tests.py b/spark-setup/python/pyspark/sql/tests.py
index 22b1ffc..6537c8d 100644
--- a/spark-setup/python/pyspark/sql/tests.py
+++ b/spark-setup/python/pyspark/sql/tests.py
@@ -1891,7 +1891,7 @@ def test_hivecontext(self):
             |print(hive_context.sql("show databases").collect())
             """)
         proc = subprocess.Popen(
-            [self.sparkSubmit, "--master", "local-cluster[1,1,1024]",
+            [self.sparkSubmit, "--main", "local-cluster[1,1,1024]",
              "--driver-class-path", hive_site_dir, script],
             stdout=subprocess.PIPE)
         out, err = proc.communicate()
diff --git a/spark-setup/python/pyspark/streaming/context.py b/spark-setup/python/pyspark/streaming/context.py
index 17c34f8..a4e528b 100644
--- a/spark-setup/python/pyspark/streaming/context.py
+++ b/spark-setup/python/pyspark/streaming/context.py
@@ -247,7 +247,7 @@ def remember(self, duration):
 
     def checkpoint(self, directory):
         """
-        Sets the context to periodically checkpoint the DStream operations for master
+        Sets the context to periodically checkpoint the DStream operations for main
         fault-tolerance. The graph will be checkpointed every batch interval.
 
         @param directory: HDFS-compatible directory where the checkpoint data
diff --git a/spark-setup/python/pyspark/streaming/flume.py b/spark-setup/python/pyspark/streaming/flume.py
index cd30483..5d4dd4a 100644
--- a/spark-setup/python/pyspark/streaming/flume.py
+++ b/spark-setup/python/pyspark/streaming/flume.py
@@ -47,8 +47,8 @@ def createStream(ssc, hostname, port,
         Create an input stream that pulls events from Flume.
 
         :param ssc: StreamingContext object
-        :param hostname: Hostname of the slave machine to which the flume data will be sent
-        :param port: Port of the slave machine to which the flume data will be sent
+        :param hostname: Hostname of the subordinate machine to which the flume data will be sent
+        :param port: Port of the subordinate machine to which the flume data will be sent
         :param storageLevel: Storage level to use for storing the received objects
         :param enableDecompression: Should netty server decompress input stream
         :param bodyDecoder: A function used to decode body (default is utf8_decoder)
diff --git a/spark-setup/python/pyspark/tests.py b/spark-setup/python/pyspark/tests.py
index 8d227ea..ea299db 100644
--- a/spark-setup/python/pyspark/tests.py
+++ b/spark-setup/python/pyspark/tests.py
@@ -1917,7 +1917,7 @@ def test_module_dependency_on_cluster(self):
             |def myfunc(x):
             |    return x + 1
             """)
-        proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, "--master",
+        proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, "--main",
                                  "local-cluster[1,1,1024]", script],
                                 stdout=subprocess.PIPE)
         out, err = proc.communicate()
@@ -1951,7 +1951,7 @@ def test_package_dependency_on_cluster(self):
             """)
         self.create_spark_package("a:mylib:0.1")
         proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories",
-                                 "file:" + self.programDir, "--master",
+                                 "file:" + self.programDir, "--main",
                                  "local-cluster[1,1,1024]", script], stdout=subprocess.PIPE)
         out, err = proc.communicate()
         self.assertEqual(0, proc.returncode)
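The submit-on-cluster tests above all shell out to spark-submit; the option they rewrite is the cluster-URL flag, which stock Spark's CLI spells --master. A sketch of the same call pattern outside the test harness (the SPARK_HOME lookup and script name are illustrative):

    import os
    import subprocess

    # Locate spark-submit under an assumed SPARK_HOME and run a hypothetical script
    # against a tiny local-cluster (1 worker, 1 core, 1024 MB).
    spark_submit = os.path.join(os.environ["SPARK_HOME"], "bin", "spark-submit")
    proc = subprocess.Popen(
        [spark_submit, "--master", "local-cluster[1,1,1024]", "my_script.py"],
        stdout=subprocess.PIPE)
    out, _ = proc.communicate()
    assert proc.returncode == 0, out
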
@@ -1971,7 +1971,7 @@ def test_single_script_on_cluster(self):
         # this will fail if you have different spark.executor.memory
         # in conf/spark-defaults.conf
         proc = subprocess.Popen(
-            [self.sparkSubmit, "--master", "local-cluster[1,1,1024]", script],
+            [self.sparkSubmit, "--main", "local-cluster[1,1,1024]", script],
             stdout=subprocess.PIPE)
         out, err = proc.communicate()
         self.assertEqual(0, proc.returncode)
@@ -1991,7 +1991,7 @@ def test_user_configuration(self):
             |    sc.stop()
             """)
         proc = subprocess.Popen(
-            [self.sparkSubmit, "--master", "local", script],
+            [self.sparkSubmit, "--main", "local", script],
             stdout=subprocess.PIPE,
             stderr=subprocess.STDOUT)
         out, err = proc.communicate()
@@ -2002,7 +2002,7 @@ class ContextTests(unittest.TestCase):
 
     def test_failed_sparkcontext_creation(self):
         # Regression test for SPARK-1550
-        self.assertRaises(Exception, lambda: SparkContext("an-invalid-master-name"))
+        self.assertRaises(Exception, lambda: SparkContext("an-invalid-main-name"))
 
     def test_get_or_create(self):
         with SparkContext.getOrCreate() as sc: