From d384ecdc0d5513667dbb86267368839b6974ded8 Mon Sep 17 00:00:00 2001 From: Jason Grey Date: Wed, 31 Jul 2024 10:55:20 -0500 Subject: [PATCH 1/7] Add sparkccfile.py to support file-wise processing in spark jobs (used in integrity job) --- sparkccfile.py | 311 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 311 insertions(+) create mode 100644 sparkccfile.py diff --git a/sparkccfile.py b/sparkccfile.py new file mode 100644 index 0000000..ca3b499 --- /dev/null +++ b/sparkccfile.py @@ -0,0 +1,311 @@ +import argparse +import json +import logging +import os +import re + +from io import BytesIO +from tempfile import NamedTemporaryFile + +import boto3 +import botocore +import requests + +from pyspark.sql import SparkSession +from pyspark.sql.types import StructType, StructField, StringType, LongType + +LOGGING_FORMAT = '%(asctime)s %(levelname)s %(name)s: %(message)s' + +class CCFileProcessorSparkJob(object): + """ + A Spark job definition to process individual files from a manifest. + This is a simplified version of sparkcc, where we only do the individual file downloads to a local temp file, and then process each file. + (ie: it is much more generic, and should work in many more situations) + """ + + name = 'CCFileProcessor' + + output_schema = StructType([ + StructField("key", StringType(), True), + StructField("val", LongType(), True) + ]) + + # description of input and output shown by --help + input_descr = "Path to file listing input paths" + output_descr = "Name of output table (saved in spark.sql.warehouse.dir)" + + # parse HTTP headers of WARC records (derived classes may override this) + warc_parse_http_header = True + + args = None + records_processed = None + warc_input_processed = None + warc_input_failed = None + log_level = 'INFO' + logging.basicConfig(level=log_level, format=LOGGING_FORMAT) + + num_input_partitions = 400 + num_output_partitions = 10 + + # S3 client is thread-safe, cf. + # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/clients.html#multithreading-or-multiprocessing-with-clients) + s3client = None + + # pattern to split a data URL (:/// or :/) + data_url_pattern = re.compile('^(s3|https?|file|hdfs):(?://([^/]*))?/(.*)') + + + def parse_arguments(self): + """Returns the parsed arguments from the command line""" + + description = self.name + if self.__doc__ is not None: + description += " - " + description += self.__doc__ + arg_parser = argparse.ArgumentParser(prog=self.name, description=description, + conflict_handler='resolve') + + arg_parser.add_argument("input", help=self.input_descr) + arg_parser.add_argument("output", help=self.output_descr) + + arg_parser.add_argument("--input_base_url", + help="Base URL (prefix) used if paths to WARC/WAT/WET " + "files are relative paths. 
Used to select the " + "access method: s3://commoncrawl/ (authenticated " + "S3) or https://data.commoncrawl.org/ (HTTP)") + arg_parser.add_argument("--num_input_partitions", type=int, + default=self.num_input_partitions, + help="Number of input splits/partitions, " + "number of parallel tasks to process WARC " + "files/records") + arg_parser.add_argument("--num_output_partitions", type=int, + default=self.num_output_partitions, + help="Number of output partitions") + arg_parser.add_argument("--output_format", default="parquet", + help="Output format: parquet (default)," + " orc, json, csv") + arg_parser.add_argument("--output_compression", default="gzip", + help="Output compression codec: None," + " gzip/zlib (default), zstd, snappy, lzo, etc.") + arg_parser.add_argument("--output_option", action='append', default=[], + help="Additional output option pair" + " to set (format-specific) output options, e.g.," + " `header=true` to add a header line to CSV files." + " Option name and value are split at `=` and" + " multiple options can be set by passing" + " `--output_option =` multiple times") + + arg_parser.add_argument("--local_temp_dir", default=None, + help="Local temporary directory, used to" + " buffer content from S3") + + arg_parser.add_argument("--log_level", default=self.log_level, + help="Logging level") + arg_parser.add_argument("--spark-profiler", action='store_true', + help="Enable PySpark profiler and log" + " profiling metrics if job has finished," + " cf. spark.python.profile") + + self.add_arguments(arg_parser) + args = arg_parser.parse_args() + self.init_logging(args.log_level) + if not self.validate_arguments(args): + raise Exception("Arguments not valid") + + return args + + def add_arguments(self, parser): + """Allows derived classes to add command-line arguments. + Derived classes overriding this method must call + super().add_arguments(parser) in order to add "register" + arguments from all classes in the hierarchy.""" + pass + + def validate_arguments(self, args): + """Validate arguments. Derived classes overriding this method + must call super().validate_arguments(args).""" + if "orc" == args.output_format and "gzip" == args.output_compression: + # gzip for Parquet, zlib for ORC + args.output_compression = "zlib" + return True + + def get_output_options(self): + """Convert output options strings (opt=val) to kwargs""" + return {x[0]: x[1] for x in map(lambda x: x.split('=', 1), + self.args.output_option)} + + def init_logging(self, level=None, session=None): + if level: + self.log_level = level + else: + level = self.log_level + logging.basicConfig(level=level, format=LOGGING_FORMAT) + logging.getLogger(self.name).setLevel(level) + if session: + session.sparkContext.setLogLevel(level) + + def init_accumulators(self, session): + """Register and initialize counters (aka. accumulators). 
+ Derived classes may use this method to add their own + accumulators but must call super().init_accumulators(session) + to also initialize counters from base classes.""" + sc = session.sparkContext + self.records_processed = sc.accumulator(0) + self.warc_input_processed = sc.accumulator(0) + self.warc_input_failed = sc.accumulator(0) + + def get_logger(self, session=None): + """Get logger from SparkSession or (if None) from logging module""" + if not session: + try: + session = SparkSession.getActiveSession() + except AttributeError: + pass # method available since Spark 3.0.0 + if session: + return session._jvm.org.apache.log4j.LogManager \ + .getLogger(self.name) + return logging.getLogger(self.name) + + def run(self): + """Run the job""" + self.args = self.parse_arguments() + + builder = SparkSession.builder.appName(self.name) + + if self.args.spark_profiler: + builder.config("spark.python.profile", "true") + + session = builder.getOrCreate() + + self.init_logging(self.args.log_level, session) + self.init_accumulators(session) + + self.run_job(session) + + if self.args.spark_profiler: + session.sparkContext.show_profiles() + + session.stop() + + def log_accumulator(self, session, acc, descr): + """Log single counter/accumulator""" + self.get_logger(session).info(descr.format(acc.value)) + + def log_accumulators(self, session): + """Log counters/accumulators, see `init_accumulators`.""" + self.log_accumulator(session, self.warc_input_processed, + 'WARC/WAT/WET input files processed = {}') + self.log_accumulator(session, self.warc_input_failed, + 'WARC/WAT/WET input files failed = {}') + self.log_accumulator(session, self.records_processed, + 'WARC/WAT/WET records processed = {}') + + @staticmethod + def reduce_by_key_func(a, b): + return a + b + + def run_job(self, session): + input_data = session.sparkContext.textFile(self.args.input, + minPartitions=self.args.num_input_partitions) + + output = input_data.mapPartitionsWithIndex(self.process_files) \ + .reduceByKey(self.reduce_by_key_func) + + session.createDataFrame(output, schema=self.output_schema) \ + .coalesce(self.args.num_output_partitions) \ + .write \ + .format(self.args.output_format) \ + .option("compression", self.args.output_compression) \ + .options(**self.get_output_options()) \ + .saveAsTable(self.args.output) + + self.log_accumulators(session) + + def get_s3_client(self): + if not self.s3client: + self.s3client = boto3.client('s3', use_ssl=False) + return self.s3client + + def fetch_file(self, uri, base_uri=None): + """Fetch file""" + + (scheme, netloc, path) = (None, None, None) + uri_match = self.data_url_pattern.match(uri) + if not uri_match and base_uri: + # relative input URI (path) and base URI defined + uri = base_uri + uri + uri_match = self.data_url_pattern.match(uri) + + if uri_match: + (scheme, netloc, path) = uri_match.groups() + else: + # keep local file paths as is + path = uri + + warctemp = None + + if scheme == 's3': + bucketname = netloc + if not bucketname: + self.get_logger().error("Invalid S3 URI: " + uri) + return + if not path: + self.get_logger().error("Empty S3 path: " + uri) + return + elif path[0] == '/': + # must strip leading / in S3 path + path = path[1:] + self.get_logger().info('Reading from S3 {}'.format(uri)) + # download entire file using a temporary file for buffering + warctemp = NamedTemporaryFile(mode='w+b', dir=self.args.local_temp_dir) + try: + self.get_s3_client().download_fileobj(bucketname, path, warctemp) + warctemp.flush() + warctemp.seek(0) + except 
botocore.client.ClientError as exception: + self.get_logger().error( + 'Failed to download {}: {}'.format(uri, exception)) + self.warc_input_failed.add(1) + warctemp.close() + + elif scheme == 'http' or scheme == 'https': + headers = None + self.get_logger().info('Fetching {}'.format(uri)) + response = requests.get(uri, headers=headers) + + if response.ok: + # includes "HTTP 206 Partial Content" for range requests + warctemp = NamedTemporaryFile( mode='w+b', + dir=self.args.local_temp_dir) + warctemp.write(response.content) + warctemp.flush() + warctemp.seek(0) + else: + self.get_logger().error( + 'Failed to download {}: {}'.format(uri, response.status_code)) + + else: + self.get_logger().info('Reading local file {}'.format(uri)) + if scheme == 'file': + # must be an absolute path + uri = os.path.join('/', path) + else: + base_dir = os.path.abspath(os.path.dirname(__file__)) + uri = os.path.join(base_dir, uri) + warctemp = open(uri, 'rb') + + return warctemp + + def process_files(self, _id, iterator): + """Process files, calling process_file(...) for each file""" + for uri in iterator: + self.warc_input_processed.add(1) + + tempfd = self.fetch_file(uri, self.args.input_base_url) + if not tempfd: + continue + + for res in self.process_file(uri, tempfd): + yield res + + tempfd.close() \ No newline at end of file From 26090f9b363ff41dd50122dda419d168b40ebee8 Mon Sep 17 00:00:00 2001 From: Jason Grey Date: Fri, 2 Aug 2024 21:34:53 -0500 Subject: [PATCH 2/7] Merge CCFileProcessorSparkJob into sparkcc.py --- sparkcc.py | 119 ++++++++++++++++++- sparkccfile.py | 311 ------------------------------------------------- 2 files changed, 118 insertions(+), 312 deletions(-) delete mode 100644 sparkccfile.py diff --git a/sparkcc.py b/sparkcc.py index 6a99ec9..d027d25 100644 --- a/sparkcc.py +++ b/sparkcc.py @@ -5,7 +5,7 @@ import re from io import BytesIO -from tempfile import SpooledTemporaryFile, TemporaryFile +from tempfile import SpooledTemporaryFile, TemporaryFile, NamedTemporaryFile import boto3 import botocore @@ -621,3 +621,120 @@ def run_job(self, session): .saveAsTable(self.args.output) self.log_accumulators(session) + + +class CCFileProcessorSparkJob(CCSparkJob): + """ + A Spark job definition to process entire files from a manifest. + (as opposed to processing individual WARC records) + """ + + name = 'CCFileProcessor' + + def run_job(self, session): + input_data = session.sparkContext.textFile(self.args.input, + minPartitions=self.args.num_input_partitions) + + output = input_data.mapPartitionsWithIndex(self.process_files) \ + .reduceByKey(self.reduce_by_key_func) + + session.createDataFrame(output, schema=self.output_schema) \ + .coalesce(self.args.num_output_partitions) \ + .write \ + .format(self.args.output_format) \ + .option("compression", self.args.output_compression) \ + .options(**self.get_output_options()) \ + .saveAsTable(self.args.output) + + self.log_accumulators(session) + + def fetch_file(self, uri, base_uri=None): + """ + Fetch file. This is a modified version of fetch_warc: + It does not currently support hdfs, but that could be added if needed. + Use NamedTemporaryFile so we can support external tools that require a file path. 
+ """ + + (scheme, netloc, path) = (None, None, None) + uri_match = self.data_url_pattern.match(uri) + if not uri_match and base_uri: + # relative input URI (path) and base URI defined + uri = base_uri + uri + uri_match = self.data_url_pattern.match(uri) + + if uri_match: + (scheme, netloc, path) = uri_match.groups() + else: + # keep local file paths as is + path = uri + + warctemp = None + + if scheme == 's3': + bucketname = netloc + if not bucketname: + self.get_logger().error("Invalid S3 URI: " + uri) + return + if not path: + self.get_logger().error("Empty S3 path: " + uri) + return + elif path[0] == '/': + # must strip leading / in S3 path + path = path[1:] + self.get_logger().info('Reading from S3 {}'.format(uri)) + # download entire file using a temporary file for buffering + warctemp = NamedTemporaryFile(mode='w+b', dir=self.args.local_temp_dir) + try: + self.get_s3_client().download_fileobj(bucketname, path, warctemp) + warctemp.flush() + warctemp.seek(0) + except botocore.client.ClientError as exception: + self.get_logger().error( + 'Failed to download {}: {}'.format(uri, exception)) + self.warc_input_failed.add(1) + warctemp.close() + + elif scheme == 'http' or scheme == 'https': + headers = None + self.get_logger().info('Fetching {}'.format(uri)) + response = requests.get(uri, headers=headers) + + if response.ok: + # includes "HTTP 206 Partial Content" for range requests + warctemp = NamedTemporaryFile( mode='w+b', + dir=self.args.local_temp_dir) + warctemp.write(response.content) + warctemp.flush() + warctemp.seek(0) + else: + self.get_logger().error( + 'Failed to download {}: {}'.format(uri, response.status_code)) + else: + self.get_logger().info('Reading local file {}'.format(uri)) + if scheme == 'file': + # must be an absolute path + uri = os.path.join('/', path) + else: + base_dir = os.path.abspath(os.path.dirname(__file__)) + uri = os.path.join(base_dir, uri) + warctemp = open(uri, 'rb') + + return warctemp + + def process_files(self, _id, iterator): + """Process files, calling process_file(...) for each file""" + for uri in iterator: + self.warc_input_processed.add(1) + + tempfd = self.fetch_file(uri, self.args.input_base_url) + if not tempfd: + continue + + for res in self.process_file(uri, tempfd): + yield res + + tempfd.close() + + def process_file(self, uri, tempfd): + """Process a single file""" + raise NotImplementedError('Processing file needs to be customized') \ No newline at end of file diff --git a/sparkccfile.py b/sparkccfile.py deleted file mode 100644 index ca3b499..0000000 --- a/sparkccfile.py +++ /dev/null @@ -1,311 +0,0 @@ -import argparse -import json -import logging -import os -import re - -from io import BytesIO -from tempfile import NamedTemporaryFile - -import boto3 -import botocore -import requests - -from pyspark.sql import SparkSession -from pyspark.sql.types import StructType, StructField, StringType, LongType - -LOGGING_FORMAT = '%(asctime)s %(levelname)s %(name)s: %(message)s' - -class CCFileProcessorSparkJob(object): - """ - A Spark job definition to process individual files from a manifest. - This is a simplified version of sparkcc, where we only do the individual file downloads to a local temp file, and then process each file. 
- (ie: it is much more generic, and should work in many more situations) - """ - - name = 'CCFileProcessor' - - output_schema = StructType([ - StructField("key", StringType(), True), - StructField("val", LongType(), True) - ]) - - # description of input and output shown by --help - input_descr = "Path to file listing input paths" - output_descr = "Name of output table (saved in spark.sql.warehouse.dir)" - - # parse HTTP headers of WARC records (derived classes may override this) - warc_parse_http_header = True - - args = None - records_processed = None - warc_input_processed = None - warc_input_failed = None - log_level = 'INFO' - logging.basicConfig(level=log_level, format=LOGGING_FORMAT) - - num_input_partitions = 400 - num_output_partitions = 10 - - # S3 client is thread-safe, cf. - # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/clients.html#multithreading-or-multiprocessing-with-clients) - s3client = None - - # pattern to split a data URL (:/// or :/) - data_url_pattern = re.compile('^(s3|https?|file|hdfs):(?://([^/]*))?/(.*)') - - - def parse_arguments(self): - """Returns the parsed arguments from the command line""" - - description = self.name - if self.__doc__ is not None: - description += " - " - description += self.__doc__ - arg_parser = argparse.ArgumentParser(prog=self.name, description=description, - conflict_handler='resolve') - - arg_parser.add_argument("input", help=self.input_descr) - arg_parser.add_argument("output", help=self.output_descr) - - arg_parser.add_argument("--input_base_url", - help="Base URL (prefix) used if paths to WARC/WAT/WET " - "files are relative paths. Used to select the " - "access method: s3://commoncrawl/ (authenticated " - "S3) or https://data.commoncrawl.org/ (HTTP)") - arg_parser.add_argument("--num_input_partitions", type=int, - default=self.num_input_partitions, - help="Number of input splits/partitions, " - "number of parallel tasks to process WARC " - "files/records") - arg_parser.add_argument("--num_output_partitions", type=int, - default=self.num_output_partitions, - help="Number of output partitions") - arg_parser.add_argument("--output_format", default="parquet", - help="Output format: parquet (default)," - " orc, json, csv") - arg_parser.add_argument("--output_compression", default="gzip", - help="Output compression codec: None," - " gzip/zlib (default), zstd, snappy, lzo, etc.") - arg_parser.add_argument("--output_option", action='append', default=[], - help="Additional output option pair" - " to set (format-specific) output options, e.g.," - " `header=true` to add a header line to CSV files." - " Option name and value are split at `=` and" - " multiple options can be set by passing" - " `--output_option =` multiple times") - - arg_parser.add_argument("--local_temp_dir", default=None, - help="Local temporary directory, used to" - " buffer content from S3") - - arg_parser.add_argument("--log_level", default=self.log_level, - help="Logging level") - arg_parser.add_argument("--spark-profiler", action='store_true', - help="Enable PySpark profiler and log" - " profiling metrics if job has finished," - " cf. spark.python.profile") - - self.add_arguments(arg_parser) - args = arg_parser.parse_args() - self.init_logging(args.log_level) - if not self.validate_arguments(args): - raise Exception("Arguments not valid") - - return args - - def add_arguments(self, parser): - """Allows derived classes to add command-line arguments. 
- Derived classes overriding this method must call - super().add_arguments(parser) in order to add "register" - arguments from all classes in the hierarchy.""" - pass - - def validate_arguments(self, args): - """Validate arguments. Derived classes overriding this method - must call super().validate_arguments(args).""" - if "orc" == args.output_format and "gzip" == args.output_compression: - # gzip for Parquet, zlib for ORC - args.output_compression = "zlib" - return True - - def get_output_options(self): - """Convert output options strings (opt=val) to kwargs""" - return {x[0]: x[1] for x in map(lambda x: x.split('=', 1), - self.args.output_option)} - - def init_logging(self, level=None, session=None): - if level: - self.log_level = level - else: - level = self.log_level - logging.basicConfig(level=level, format=LOGGING_FORMAT) - logging.getLogger(self.name).setLevel(level) - if session: - session.sparkContext.setLogLevel(level) - - def init_accumulators(self, session): - """Register and initialize counters (aka. accumulators). - Derived classes may use this method to add their own - accumulators but must call super().init_accumulators(session) - to also initialize counters from base classes.""" - sc = session.sparkContext - self.records_processed = sc.accumulator(0) - self.warc_input_processed = sc.accumulator(0) - self.warc_input_failed = sc.accumulator(0) - - def get_logger(self, session=None): - """Get logger from SparkSession or (if None) from logging module""" - if not session: - try: - session = SparkSession.getActiveSession() - except AttributeError: - pass # method available since Spark 3.0.0 - if session: - return session._jvm.org.apache.log4j.LogManager \ - .getLogger(self.name) - return logging.getLogger(self.name) - - def run(self): - """Run the job""" - self.args = self.parse_arguments() - - builder = SparkSession.builder.appName(self.name) - - if self.args.spark_profiler: - builder.config("spark.python.profile", "true") - - session = builder.getOrCreate() - - self.init_logging(self.args.log_level, session) - self.init_accumulators(session) - - self.run_job(session) - - if self.args.spark_profiler: - session.sparkContext.show_profiles() - - session.stop() - - def log_accumulator(self, session, acc, descr): - """Log single counter/accumulator""" - self.get_logger(session).info(descr.format(acc.value)) - - def log_accumulators(self, session): - """Log counters/accumulators, see `init_accumulators`.""" - self.log_accumulator(session, self.warc_input_processed, - 'WARC/WAT/WET input files processed = {}') - self.log_accumulator(session, self.warc_input_failed, - 'WARC/WAT/WET input files failed = {}') - self.log_accumulator(session, self.records_processed, - 'WARC/WAT/WET records processed = {}') - - @staticmethod - def reduce_by_key_func(a, b): - return a + b - - def run_job(self, session): - input_data = session.sparkContext.textFile(self.args.input, - minPartitions=self.args.num_input_partitions) - - output = input_data.mapPartitionsWithIndex(self.process_files) \ - .reduceByKey(self.reduce_by_key_func) - - session.createDataFrame(output, schema=self.output_schema) \ - .coalesce(self.args.num_output_partitions) \ - .write \ - .format(self.args.output_format) \ - .option("compression", self.args.output_compression) \ - .options(**self.get_output_options()) \ - .saveAsTable(self.args.output) - - self.log_accumulators(session) - - def get_s3_client(self): - if not self.s3client: - self.s3client = boto3.client('s3', use_ssl=False) - return self.s3client - - def fetch_file(self, 
uri, base_uri=None): - """Fetch file""" - - (scheme, netloc, path) = (None, None, None) - uri_match = self.data_url_pattern.match(uri) - if not uri_match and base_uri: - # relative input URI (path) and base URI defined - uri = base_uri + uri - uri_match = self.data_url_pattern.match(uri) - - if uri_match: - (scheme, netloc, path) = uri_match.groups() - else: - # keep local file paths as is - path = uri - - warctemp = None - - if scheme == 's3': - bucketname = netloc - if not bucketname: - self.get_logger().error("Invalid S3 URI: " + uri) - return - if not path: - self.get_logger().error("Empty S3 path: " + uri) - return - elif path[0] == '/': - # must strip leading / in S3 path - path = path[1:] - self.get_logger().info('Reading from S3 {}'.format(uri)) - # download entire file using a temporary file for buffering - warctemp = NamedTemporaryFile(mode='w+b', dir=self.args.local_temp_dir) - try: - self.get_s3_client().download_fileobj(bucketname, path, warctemp) - warctemp.flush() - warctemp.seek(0) - except botocore.client.ClientError as exception: - self.get_logger().error( - 'Failed to download {}: {}'.format(uri, exception)) - self.warc_input_failed.add(1) - warctemp.close() - - elif scheme == 'http' or scheme == 'https': - headers = None - self.get_logger().info('Fetching {}'.format(uri)) - response = requests.get(uri, headers=headers) - - if response.ok: - # includes "HTTP 206 Partial Content" for range requests - warctemp = NamedTemporaryFile( mode='w+b', - dir=self.args.local_temp_dir) - warctemp.write(response.content) - warctemp.flush() - warctemp.seek(0) - else: - self.get_logger().error( - 'Failed to download {}: {}'.format(uri, response.status_code)) - - else: - self.get_logger().info('Reading local file {}'.format(uri)) - if scheme == 'file': - # must be an absolute path - uri = os.path.join('/', path) - else: - base_dir = os.path.abspath(os.path.dirname(__file__)) - uri = os.path.join(base_dir, uri) - warctemp = open(uri, 'rb') - - return warctemp - - def process_files(self, _id, iterator): - """Process files, calling process_file(...) for each file""" - for uri in iterator: - self.warc_input_processed.add(1) - - tempfd = self.fetch_file(uri, self.args.input_base_url) - if not tempfd: - continue - - for res in self.process_file(uri, tempfd): - yield res - - tempfd.close() \ No newline at end of file From ee644a58880ed7c3ae557d110f7865029d485227 Mon Sep 17 00:00:00 2001 From: Jason Grey Date: Fri, 2 Aug 2024 21:35:23 -0500 Subject: [PATCH 3/7] Add CCFileProcessorSparkJob example and link in readme for it. --- README.md | 2 ++ md5sum.py | 23 +++++++++++++++++++++++ 2 files changed, 25 insertions(+) create mode 100644 md5sum.py diff --git a/README.md b/README.md index 9a4c110..0d71fc8 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,8 @@ This project provides examples how to process the Common Crawl dataset with [Apa + [word count](./word_count.py) (term and document frequency) in Common Crawl's extracted text (WET files) ++ [md5](./md5sum.py) Run an external command on entire files from a manifest (WARC, WET, WAT, or any other type of file.) 
+ + [extract links](./wat_extract_links.py) from WAT files and [construct the (host-level) web graph](./hostlinks_to_graph.py) – for further details about the web graphs see the project [cc-webgraph](https://github.com/commoncrawl/cc-webgraph) + work with the [columnar URL index](https://commoncrawl.org/2018/03/index-to-warc-files-and-urls-in-columnar-format/) (see also [cc-index-table](https://github.com/commoncrawl/cc-index-table) and the notes about [querying the columnar index](#querying-the-columnar-index)): diff --git a/md5sum.py b/md5sum.py new file mode 100644 index 0000000..88fe15e --- /dev/null +++ b/md5sum.py @@ -0,0 +1,23 @@ +import subprocess +import os +from sparkcc import CCFileProcessorSparkJob +from pyspark.sql.types import StructType, StructField, StringType + +class MD5Sum(CCFileProcessorSparkJob): + """ MD5 sum of each file""" + + name = "MD5Sum" + + output_schema = StructType([ + StructField("uri", StringType(), True), + StructField("md5", StringType(), True), + ]) + + def process_file(self, uri, tempfd): + proc = subprocess.run(['md5sum', tempfd.name], capture_output=True, check=True, encoding='utf8') + digest = proc.stdout.rstrip().split()[0] + yield uri, digest + +if __name__ == '__main__': + job = MD5Sum() + job.run() From 987709ce64266fc233c567f093d8f93342390f51 Mon Sep 17 00:00:00 2001 From: Jason Grey Date: Mon, 9 Sep 2024 19:46:09 -0500 Subject: [PATCH 4/7] fix s3 functions so they work in spark environment --- sparkcc.py | 136 ++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 135 insertions(+), 1 deletion(-) diff --git a/sparkcc.py b/sparkcc.py index d027d25..5761ce4 100644 --- a/sparkcc.py +++ b/sparkcc.py @@ -631,6 +631,12 @@ class CCFileProcessorSparkJob(CCSparkJob): name = 'CCFileProcessor' + def add_arguments(self, parser): + super(CCIndexWarcSparkJob, self).add_arguments(parser) + parser.add_argument("--output_base_uri", required=False, + default='./output', + help="Base URI to write output files to. Useful if your job uses write_output_file or check_for_output_file.") + def run_job(self, session): input_data = session.sparkContext.textFile(self.args.input, minPartitions=self.args.num_input_partitions) @@ -737,4 +743,132 @@ def process_files(self, _id, iterator): def process_file(self, uri, tempfd): """Process a single file""" - raise NotImplementedError('Processing file needs to be customized') \ No newline at end of file + raise NotImplementedError('Processing file needs to be customized') + + # See if we can get to the bucket referred to in the uri. + # note: if uri is not s3, this will return True + def validate_s3_bucket_from_uri(self, uri): + """ + Validate that the bucket exists in the S3 URI + """ + if uri is None or len(uri) == 0: + return True + (scheme, netloc, path) = (None, None, None) + uri_match = self.data_url_pattern.match(uri) + if uri_match: + (scheme, netloc, path) = uri_match.groups() + if scheme == 's3': + bucketname = netloc + if not bucketname: + self.get_logger().error("Invalid S3 URI: " + uri) + return False + try: + self.get_s3_client().head_bucket(Bucket=bucketname) + except botocore.exceptions.ClientError as e: + self.get_logger().error("Failed to access bucket: " + bucketname) + return False + return True + return True + + + # Like fetch_warc, where we will check if a local file, file on s3, etc exists or not. + def check_for_output_file(self, uri, base_uri=None): + """ + Check if output file exists. 
This is a modified version of fetch_warc: + It does not currently support hdfs, but that could be added if needed. + """ + (scheme, netloc, path) = (None, None, None) + uri_match = self.data_url_pattern.match(uri) + if not uri_match and base_uri: + # relative input URI (path) and base URI defined + uri = base_uri + uri + uri_match = self.data_url_pattern.match(uri) + if uri_match: + (scheme, netloc, path) = uri_match.groups() + else: + # keep local file paths as is + path = uri + if scheme == 's3': + bucketname = netloc + if not bucketname: + self.get_logger().error("Invalid S3 URI: " + uri) + return + if not path: + self.get_logger().error("Empty S3 path: " + uri) + return + elif path[0] == '/': + # must strip leading / in S3 path + path = path[1:] + self.get_logger().info('Checking if file exists on S3 {}'.format(uri)) + try: + self.get_s3_client().head_object(Bucket=bucketname, Key=path) + return True + except botocore.client.ClientError as exception: + self.get_logger().error( + 'Failed to check if file exists on S3 {}: {}'.format(uri, exception)) + return False + elif scheme == 'http' or scheme == 'https': + headers = None + self.get_logger().info('Checking if file exists {}'.format(uri)) + response = requests.head(uri, headers=headers) + if response.ok: + return True + else: + self.get_logger().error( + 'Failed to check if file exists {}: {}'.format(uri, response.status_code)) + return False + else: + self.get_logger().info('Checking if local file exists {}'.format(uri)) + if scheme == 'file': + # must be an absolute path + uri = os.path.join('/', path) + else: + base_dir = os.path.abspath(os.path.dirname(__file__)) + uri = os.path.join(base_dir, uri) + return os.path.exists(uri) + + # like fetch_warc, but will write a file to local, s3, or hdfs + def write_output_file(self, uri, fd, base_uri=None): + """ + Write file. This is a modified version of fetch_warc: + It does not currently support hdfs, but that could be added if needed. 
+ """ + (scheme, netloc, path) = (None, None, None) + uri_match = self.data_url_pattern.match(uri) + if not uri_match and base_uri: + # relative input URI (path) and base URI defined + uri = base_uri + uri + uri_match = self.data_url_pattern.match(uri) + if uri_match: + (scheme, netloc, path) = uri_match.groups() + else: + # keep local file paths as is + path = uri + if scheme == 's3': + bucketname = netloc + if not bucketname: + self.get_logger().error("Invalid S3 URI: " + uri) + return + if not path: + self.get_logger().error("Empty S3 path: " + uri) + return + elif path[0] == '/': + # must strip leading / in S3 path + path = path[1:] + self.get_logger().info('Writing to S3 {}'.format(uri)) + try: + self.get_s3_client().upload_fileobj(fd, bucketname, path) + except botocore.client.ClientError as exception: + self.get_logger().error( + 'Failed to write to S3 {}: {}'.format(uri, exception)) + else: + self.get_logger().info('Writing local file {}'.format(uri)) + if scheme == 'file': + # must be an absolute path + uri = os.path.join('/', path) + else: + base_dir = os.path.abspath(os.path.dirname(__file__)) + uri = os.path.join(base_dir, uri) + os.makedirs(os.path.dirname(uri), exist_ok=True) + with open(uri, 'wb') as f: + f.write(fd.read()) \ No newline at end of file From cd64b143f363878119d9ec26e7a46b9e13097f25 Mon Sep 17 00:00:00 2001 From: Jason Grey Date: Tue, 10 Sep 2024 21:21:26 -0500 Subject: [PATCH 5/7] fix bug when the file was not downloaded from s3 --- sparkcc.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sparkcc.py b/sparkcc.py index 5761ce4..c07241d 100644 --- a/sparkcc.py +++ b/sparkcc.py @@ -699,7 +699,7 @@ def fetch_file(self, uri, base_uri=None): 'Failed to download {}: {}'.format(uri, exception)) self.warc_input_failed.add(1) warctemp.close() - + return elif scheme == 'http' or scheme == 'https': headers = None self.get_logger().info('Fetching {}'.format(uri)) @@ -715,6 +715,7 @@ def fetch_file(self, uri, base_uri=None): else: self.get_logger().error( 'Failed to download {}: {}'.format(uri, response.status_code)) + return else: self.get_logger().info('Reading local file {}'.format(uri)) if scheme == 'file': From 7b209dffd175eb5a052628bae9557f79f852bc6f Mon Sep 17 00:00:00 2001 From: Jason Grey Date: Wed, 27 Nov 2024 10:10:06 -0600 Subject: [PATCH 6/7] fix: fix scheme to recognize s3a and s3n --- sparkcc.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sparkcc.py b/sparkcc.py index c07241d..c0a22d0 100644 --- a/sparkcc.py +++ b/sparkcc.py @@ -56,7 +56,7 @@ class CCSparkJob(object): s3client = None # pattern to split a data URL (:/// or :/) - data_url_pattern = re.compile('^(s3|https?|file|hdfs):(?://([^/]*))?/(.*)') + data_url_pattern = re.compile('^(s3|https?|file|hdfs|s3a|s3n):(?://([^/]*))?/(.*)') def parse_arguments(self): @@ -248,7 +248,7 @@ def fetch_warc(self, uri, base_uri=None, offset=-1, length=-1): stream = None - if scheme == 's3': + if scheme in ['s3', 's3a', 's3n']: bucketname = netloc if not bucketname: self.get_logger().error("Invalid S3 URI: " + uri) @@ -632,7 +632,7 @@ class CCFileProcessorSparkJob(CCSparkJob): name = 'CCFileProcessor' def add_arguments(self, parser): - super(CCIndexWarcSparkJob, self).add_arguments(parser) + super(CCSparkJob, self).add_arguments(parser) parser.add_argument("--output_base_uri", required=False, default='./output', help="Base URI to write output files to. 
Useful if your job uses write_output_file or check_for_output_file.") @@ -676,7 +676,7 @@ def fetch_file(self, uri, base_uri=None): warctemp = None - if scheme == 's3': + if scheme in ['s3', 's3a', 's3n']: bucketname = netloc if not bucketname: self.get_logger().error("Invalid S3 URI: " + uri) @@ -758,7 +758,7 @@ def validate_s3_bucket_from_uri(self, uri): uri_match = self.data_url_pattern.match(uri) if uri_match: (scheme, netloc, path) = uri_match.groups() - if scheme == 's3': + if scheme in ['s3', 's3a', 's3n']: bucketname = netloc if not bucketname: self.get_logger().error("Invalid S3 URI: " + uri) @@ -789,7 +789,7 @@ def check_for_output_file(self, uri, base_uri=None): else: # keep local file paths as is path = uri - if scheme == 's3': + if scheme in ['s3', 's3a', 's3n']: bucketname = netloc if not bucketname: self.get_logger().error("Invalid S3 URI: " + uri) @@ -845,7 +845,7 @@ def write_output_file(self, uri, fd, base_uri=None): else: # keep local file paths as is path = uri - if scheme == 's3': + if scheme in ['s3', 's3a', 's3n']: bucketname = netloc if not bucketname: self.get_logger().error("Invalid S3 URI: " + uri) From 8aeca80c69c0274bd017146c97ca09a19ee88ed1 Mon Sep 17 00:00:00 2001 From: Jason Grey Date: Wed, 27 Nov 2024 10:10:54 -0600 Subject: [PATCH 7/7] fix: don't log 404 errors when checking for existence --- sparkcc.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sparkcc.py b/sparkcc.py index c0a22d0..344c7a6 100644 --- a/sparkcc.py +++ b/sparkcc.py @@ -805,6 +805,8 @@ def check_for_output_file(self, uri, base_uri=None): self.get_s3_client().head_object(Bucket=bucketname, Key=path) return True except botocore.client.ClientError as exception: + if exception.response['Error']['Code'] == '404': + return False self.get_logger().error( 'Failed to check if file exists on S3 {}: {}'.format(uri, exception)) return False
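
---

For illustration, here is a minimal sketch of a derived job that combines `process_file` with the output helpers added in PATCH 4/7 (`check_for_output_file`, `write_output_file`, `--output_base_uri`). The class name `CopyWithChecksum`, the SHA-256 choice, and the output naming are illustrative assumptions, not part of the patch series; only the `sparkcc` API shown in the diffs above is relied on.

```python
import hashlib

from io import BytesIO

from pyspark.sql.types import StructType, StructField, StringType

from sparkcc import CCFileProcessorSparkJob


class CopyWithChecksum(CCFileProcessorSparkJob):
    """Copy each manifest file to --output_base_uri and record its SHA-256
    (illustrative sketch, not part of the patch series)"""

    name = "CopyWithChecksum"

    output_schema = StructType([
        StructField("uri", StringType(), True),
        StructField("sha256", StringType(), True),
    ])

    def process_file(self, uri, tempfd):
        # fetch_file() hands us a NamedTemporaryFile positioned at offset 0;
        # read the payload once and reuse it for both the checksum and the copy
        data = tempfd.read()
        digest = hashlib.sha256(data).hexdigest()

        # name the copy after the last path component of the input URI;
        # base URIs are plain string prefixes, so --output_base_uri should end with "/"
        out_name = uri.rstrip('/').split('/')[-1]

        # skip work that a previous (partially failed) run already completed
        if not self.check_for_output_file(out_name, self.args.output_base_uri):
            self.write_output_file(out_name, BytesIO(data), self.args.output_base_uri)

        # rows must stay (key, value) pairs because run_job() applies
        # reduceByKey() before building the output DataFrame
        yield uri, digest


if __name__ == '__main__':
    job = CopyWithChecksum()
    job.run()
```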
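Like the existing record-level jobs, a derived file job is typically launched through `spark-submit` with a manifest file and an output table name as the two positional arguments; `--input_base_url` selects the access method (`s3://commoncrawl/` or `https://data.commoncrawl.org/`) when the manifest contains relative paths, and `--output_base_uri` plays the same prefix role for `write_output_file`/`check_for_output_file`. Because `process_file` is a generator yielding rows, the existing `run_job`/`reduceByKey` plumbing is reused unchanged.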