From 36908383fa8192af97a4e20e7378c48d555207fd Mon Sep 17 00:00:00 2001 From: Steffan Date: Sun, 4 Apr 2021 00:25:43 -0700 Subject: [PATCH] LCS-102: Adds multithreaded approach to add all types of LC problems --- utility/lcAPIparser.py | 304 ++++++++++++++++++++++------------------- 1 file changed, 165 insertions(+), 139 deletions(-) diff --git a/utility/lcAPIparser.py b/utility/lcAPIparser.py index 72282a7..efbaa03 100644 --- a/utility/lcAPIparser.py +++ b/utility/lcAPIparser.py @@ -1,5 +1,5 @@ ''' -Last Run: 12/22/2020 +Last Run: 04/04/2021 Last Updated: 12/07/2020 This takes the results of the LeetCode api page and transforms it into the data we want @@ -12,6 +12,7 @@ import re # For matching for last run/updated import argparse # For getting command-line args from datetime import date # For getting the current date +from concurrent import futures class helper: def __init__(self): @@ -19,6 +20,11 @@ def __init__(self): cur_dir_path = os.path.normpath(os.path.dirname(os.path.abspath(__file__))) env_path = os.path.normpath(os.path.join(cur_dir_path, os.path.normpath('../.env'))) load_dotenv(dotenv_path=env_path) + self.existing_problem_ids = set() + self.post_url = '' + self.token = '' + self.problem_get_url = '' + self.problem_post_url = '' ''' Updates the header of this file's last run and last updated @@ -75,157 +81,177 @@ def convert_block(self, info_block): } return convert - ''' - Parse the results from the LeetCode API, which we either - gather from a GET request or we can parse a provided file. - POST the results via our running server instance into our DB. - ''' - def parse(self, file_location: str = None, test: bool =False): - lc_url = 'https://leetcode.com/api/problems/algorithms/' - question_info = None - if not file_location: - print('No file passed, grabbing info directly') - lc_res = get(url=lc_url).json() - question_info = None - try: - question_info = lc_res.get('stat_status_pairs', None) - except Exception: - print('Could not retrieve question info from LC') - self.update_header_dates(True) - raise - exit(1) + def get_login_info(self): + # Define our server URL endpoints here: + base_server_url = os.getenv('SERVER_BASE_URL') + if test: + print('Connecting to test server port.') + server_port = os.getenv('TEST_SERVER_PORT') else: - with open(file_location, 'r') as file: - asJson = json.load(file) - question_info = asJson.get('stat_status_pairs', None) - if question_info: - # Define our server URL endpoints here: - base_server_url = os.getenv('SERVER_BASE_URL') - if test: - print('Connecting to test server port.') - server_port = os.getenv('TEST_SERVER_PORT') - else: - server_port = os.getenv('SERVER_PORT') - server_url = base_server_url + ':' + server_port - - problem_post_url = server_url + '/api/problems/bulk' - problem_get_url = server_url + '/api/problems' - login_url = server_url + '/api/auth' - register_url = server_url+ '/api/users' - - # Get a login token for the admin user - admin_email = os.getenv('ADMIN_EMAIL') - admin_pass = os.getenv('ADMIN_PASS') - admin_name = os.getenv('ADMIN_NAME') - - body = json.dumps({ - 'name': admin_name, - 'email': admin_email, - 'password': admin_pass - }) - headers = { - 'Content-Type': 'application/json' - } - - # Try to register the user first, in case of first time setup - register_res = None - token = None - try: - register_res = post(url=register_url, headers=headers, data=body, verify=False).json() - if register_res.get('token', False): - token = register_res.get('token') - except Exception: - print('Unable to register user.') - print(register_url) - print(headers) - # print(body) - self.update_header_dates(True) - raise - exit(1) + server_port = os.getenv('SERVER_PORT') + server_url = base_server_url + ':' + server_port - # Get auth user token if register didn't provide it - if not token: - login_res = None - try: - login_res = post(url=login_url, headers=headers, data=body, verify=False).json() - except Exception: - print('Unable to login to server.') - print(login_url) - print(headers) - # print(body) - self.update_header_dates(True) - raise - exit(1) - if not login_res.get('token', None): - print('No token receieved. Wrong credentials?') - self.update_header_dates(True) - exit(1) - token = login_res['token'] + self.problem_post_url = server_url + '/api/problems/bulk' + self.problem_get_url = server_url + '/api/problems' + login_url = server_url + '/api/auth' + register_url = server_url+ '/api/users' + # Get a login token for the admin user + admin_email = os.getenv('ADMIN_EMAIL') + admin_pass = os.getenv('ADMIN_PASS') + admin_name = os.getenv('ADMIN_NAME') - # Setup headers for POSTing a new problem and GETing our cur problems - headers = { - 'x-auth-token': token, - 'Content-Type': 'application/json' - } + body = json.dumps({ + 'name': admin_name, + 'email': admin_email, + 'password': admin_pass + }) + headers = { + 'Content-Type': 'application/json' + } - # Get all problems currently in the database - all_probs_res = None - try: - all_probs_res = get(url=problem_get_url, headers=headers, verify=False).json() - except Exception: - print('Couldn\'t get all problems from the server.') - print(problem_get_url) - print(headers) - self.update_header_dates(True) - raise - exit(1) - # All we really care about is the 'id' values for the problems - existing_problem_ids = set() - for prob in all_probs_res: - lc_id = prob.get('id', None) - existing_problem_ids.add(lc_id) - - blocks = [] - # print(question_info[:100]) - # Question info contains an array of blocks of information for each question - for info in question_info: - # Convert block to info we need if it's not already in our DB - cur_id = info.get('stat', {}).get('question_id', None) - if cur_id and cur_id not in existing_problem_ids: - # Convert and add to blocks to POST - converted_block = self.convert_block(info) - if not converted_block: - print('Failed to convert info') - else: - blocks.append(converted_block) - blocks_as_dict = {"problems" : blocks} - # Convert to JSON so we can POST it to the DB - blocks_as_json = json.dumps(blocks_as_dict) - # POST to server - prob_add_res = None + # Try to register the user first, in case of first time setup + register_res = None + self.token = None + try: + register_res = post(url=register_url, headers=headers, data=body, verify=False).json() + if register_res.get('token', False): + self.token = register_res.get('token') + except Exception: + print('Unable to register user.') + print(register_url) + print(headers) + # print(body) + raise + exit(1) + + # Get auth user token if register didn't provide it + if not self.token: + login_res = None try: - prob_add_res = post(url=problem_post_url, headers=headers, data=blocks_as_json, verify=False).json() + login_res = post(url=login_url, headers=headers, data=body, verify=False).json() except Exception: - print('Unable to POST problems to server.') - print(problem_post_url) + print('Unable to login to server.') + print(login_url) print(headers) - self.update_header_dates(True) + # print(body) raise exit(1) - print(prob_add_res) - if prob_add_res.get('errors', None): - # Some kind of error. - print('Errors when adding problems in bulk.') - print(str(prob_add_res['errors'])) - self.update_header_dates(True) + if not login_res.get('token', None): + print('No token receieved. Wrong credentials?') exit(1) + self.token = login_res['token'] + + def get_cur_problems(self): + # Setup headers for POSTing a new problem and GETing our cur problems + headers = { + 'x-auth-token': self.token, + 'Content-Type': 'application/json' + } + + # Get all problems currently in the database + all_probs_res = None + try: + all_probs_res = get(url=self.problem_get_url, headers=headers, verify=False).json() + except Exception: + print('Couldn\'t get all problems from the server.') + print(self.problem_get_url) + print(headers) + raise + exit(1) + # All we really care about is the 'id' values for the problems + self.existing_problem_ids = set() + for prob in all_probs_res: + lc_id = prob.get('id', None) + self.existing_problem_ids.add(lc_id) + + def get_info_from_url(self, lc_url_base: str, lc_url_type: str): + lc_res = get(url=lc_url_base + lc_url_type).json() + question_info = None + try: + question_info = lc_res.get('stat_status_pairs', None) + except Exception: + print('Could not retrieve question info from LC') + raise + exit(1) + self.add_new_problems(question_info, lc_url_type) + + def add_new_problems(self, question_info, url_provider: str): + blocks = [] + headers = { + 'x-auth-token': self.token, + 'Content-Type': 'application/json' + } + # print(question_info[:100]) + # Question info contains an array of blocks of information for each question + for info in question_info: + # Convert block to info we need if it's not already in our DB + cur_id = info.get('stat', {}).get('question_id', None) + if cur_id and cur_id not in self.existing_problem_ids: + # Convert and add to blocks to POST + converted_block = self.convert_block(info) + if not converted_block: + print('Failed to convert info') + else: + blocks.append(converted_block) + blocks_as_dict = {"problems" : blocks} + # Convert to JSON so we can POST it to the DB + blocks_as_json = json.dumps(blocks_as_dict) + # POST to server + prob_add_res = None + try: + prob_add_res = post(url=self.problem_post_url, headers=headers, data=blocks_as_json, verify=False).json() + except Exception: + print('Unable to POST problems to server.') + print(self.problem_post_url) + print(headers) + raise + exit(1) + print(f'Results for {url_provider} are:') + print(prob_add_res) + if prob_add_res.get('errors', None): + # Some kind of error. + print('Errors when adding problems in bulk.') + print(str(prob_add_res['errors'])) + exit(1) + + ''' + Parse the results from the LeetCode API, which we either + gather from a GET request or we can parse a provided file. + POST the results via our running server instance into our DB. + ''' + def parse(self, file_location: str = None, test: bool =False): + question_info = None + self.get_login_info() + self.get_cur_problems() + if not file_location: + lc_url_root = 'https://leetcode.com/api/problems/' + lc_problem_types = ['algorithms', 'shell', 'database', 'concurrency'] + print('No file passed, grabbing info directly from LC api resources') + future = {} + update_header_as_fail = False + with futures.ThreadPoolExecutor() as exec: + for prob_type in lc_problem_types: + future[exec.submit(self.get_info_from_url, lc_url_root, prob_type)] = prob_type + for f in futures.as_completed(future.keys()): + try: + f.result() + except Exception as exc: + print('%r generated an exception: %s' % (future[f], exc)) + update_header_as_fail = True + futures.wait(future) # Update the last run/last tested dates - self.update_header_dates() + self.update_header_dates(update_header_as_fail) else: - print('No question info could be gathered.') - # Update the last run, but not the last tested date - self.update_header_dates(True) + with open(file_location, 'r') as file: + asJson = json.load(file) + question_info = asJson.get('stat_status_pairs', None) + if question_info: + self.add_new_problems(question_info, 'custom file') + else: + print('No question info could be gathered based on custom file.') + # Update the last run, but not the last tested date + self.update_header_dates(True) if __name__ == '__main__': # Create our helper