Skip to content

LCS-102: Adds multithreaded approach to add all types of LC problems #71

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
304 changes: 165 additions & 139 deletions utility/lcAPIparser.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
'''
Last Run: 12/22/2020
Last Run: 04/04/2021
Last Updated: 12/07/2020

This takes the results of the LeetCode api page and transforms it into the data we want
Expand All @@ -12,13 +12,19 @@
import re # For matching for last run/updated
import argparse # For getting command-line args
from datetime import date # For getting the current date
from concurrent import futures

class helper:
def __init__(self):
# Setup environmental variables
cur_dir_path = os.path.normpath(os.path.dirname(os.path.abspath(__file__)))
env_path = os.path.normpath(os.path.join(cur_dir_path, os.path.normpath('../.env')))
load_dotenv(dotenv_path=env_path)
self.existing_problem_ids = set()
self.post_url = ''
self.token = ''
self.problem_get_url = ''
self.problem_post_url = ''

'''
Updates the header of this file's last run and last updated
Expand Down Expand Up @@ -75,157 +81,177 @@ def convert_block(self, info_block):
}
return convert

'''
Parse the results from the LeetCode API, which we either
gather from a GET request or we can parse a provided file.
POST the results via our running server instance into our DB.
'''
def parse(self, file_location: str = None, test: bool =False):
lc_url = 'https://leetcode.com/api/problems/algorithms/'
question_info = None
if not file_location:
print('No file passed, grabbing info directly')
lc_res = get(url=lc_url).json()
question_info = None
try:
question_info = lc_res.get('stat_status_pairs', None)
except Exception:
print('Could not retrieve question info from LC')
self.update_header_dates(True)
raise
exit(1)
def get_login_info(self):
# Define our server URL endpoints here:
base_server_url = os.getenv('SERVER_BASE_URL')
if test:
print('Connecting to test server port.')
server_port = os.getenv('TEST_SERVER_PORT')
else:
with open(file_location, 'r') as file:
asJson = json.load(file)
question_info = asJson.get('stat_status_pairs', None)
if question_info:
# Define our server URL endpoints here:
base_server_url = os.getenv('SERVER_BASE_URL')
if test:
print('Connecting to test server port.')
server_port = os.getenv('TEST_SERVER_PORT')
else:
server_port = os.getenv('SERVER_PORT')
server_url = base_server_url + ':' + server_port

problem_post_url = server_url + '/api/problems/bulk'
problem_get_url = server_url + '/api/problems'
login_url = server_url + '/api/auth'
register_url = server_url+ '/api/users'

# Get a login token for the admin user
admin_email = os.getenv('ADMIN_EMAIL')
admin_pass = os.getenv('ADMIN_PASS')
admin_name = os.getenv('ADMIN_NAME')

body = json.dumps({
'name': admin_name,
'email': admin_email,
'password': admin_pass
})
headers = {
'Content-Type': 'application/json'
}

# Try to register the user first, in case of first time setup
register_res = None
token = None
try:
register_res = post(url=register_url, headers=headers, data=body, verify=False).json()
if register_res.get('token', False):
token = register_res.get('token')
except Exception:
print('Unable to register user.')
print(register_url)
print(headers)
# print(body)
self.update_header_dates(True)
raise
exit(1)
server_port = os.getenv('SERVER_PORT')
server_url = base_server_url + ':' + server_port

# Get auth user token if register didn't provide it
if not token:
login_res = None
try:
login_res = post(url=login_url, headers=headers, data=body, verify=False).json()
except Exception:
print('Unable to login to server.')
print(login_url)
print(headers)
# print(body)
self.update_header_dates(True)
raise
exit(1)
if not login_res.get('token', None):
print('No token receieved. Wrong credentials?')
self.update_header_dates(True)
exit(1)
token = login_res['token']
self.problem_post_url = server_url + '/api/problems/bulk'
self.problem_get_url = server_url + '/api/problems'
login_url = server_url + '/api/auth'
register_url = server_url+ '/api/users'

# Get a login token for the admin user
admin_email = os.getenv('ADMIN_EMAIL')
admin_pass = os.getenv('ADMIN_PASS')
admin_name = os.getenv('ADMIN_NAME')

# Setup headers for POSTing a new problem and GETing our cur problems
headers = {
'x-auth-token': token,
'Content-Type': 'application/json'
}
body = json.dumps({
'name': admin_name,
'email': admin_email,
'password': admin_pass
})
headers = {
'Content-Type': 'application/json'
}

# Get all problems currently in the database
all_probs_res = None
try:
all_probs_res = get(url=problem_get_url, headers=headers, verify=False).json()
except Exception:
print('Couldn\'t get all problems from the server.')
print(problem_get_url)
print(headers)
self.update_header_dates(True)
raise
exit(1)
# All we really care about is the 'id' values for the problems
existing_problem_ids = set()
for prob in all_probs_res:
lc_id = prob.get('id', None)
existing_problem_ids.add(lc_id)

blocks = []
# print(question_info[:100])
# Question info contains an array of blocks of information for each question
for info in question_info:
# Convert block to info we need if it's not already in our DB
cur_id = info.get('stat', {}).get('question_id', None)
if cur_id and cur_id not in existing_problem_ids:
# Convert and add to blocks to POST
converted_block = self.convert_block(info)
if not converted_block:
print('Failed to convert info')
else:
blocks.append(converted_block)
blocks_as_dict = {"problems" : blocks}
# Convert to JSON so we can POST it to the DB
blocks_as_json = json.dumps(blocks_as_dict)
# POST to server
prob_add_res = None
# Try to register the user first, in case of first time setup
register_res = None
self.token = None
try:
register_res = post(url=register_url, headers=headers, data=body, verify=False).json()
if register_res.get('token', False):
self.token = register_res.get('token')
except Exception:
print('Unable to register user.')
print(register_url)
print(headers)
# print(body)
raise
exit(1)

# Get auth user token if register didn't provide it
if not self.token:
login_res = None
try:
prob_add_res = post(url=problem_post_url, headers=headers, data=blocks_as_json, verify=False).json()
login_res = post(url=login_url, headers=headers, data=body, verify=False).json()
except Exception:
print('Unable to POST problems to server.')
print(problem_post_url)
print('Unable to login to server.')
print(login_url)
print(headers)
self.update_header_dates(True)
# print(body)
raise
exit(1)
print(prob_add_res)
if prob_add_res.get('errors', None):
# Some kind of error.
print('Errors when adding problems in bulk.')
print(str(prob_add_res['errors']))
self.update_header_dates(True)
if not login_res.get('token', None):
print('No token receieved. Wrong credentials?')
exit(1)
self.token = login_res['token']

def get_cur_problems(self):
# Setup headers for POSTing a new problem and GETing our cur problems
headers = {
'x-auth-token': self.token,
'Content-Type': 'application/json'
}

# Get all problems currently in the database
all_probs_res = None
try:
all_probs_res = get(url=self.problem_get_url, headers=headers, verify=False).json()
except Exception:
print('Couldn\'t get all problems from the server.')
print(self.problem_get_url)
print(headers)
raise
exit(1)
# All we really care about is the 'id' values for the problems
self.existing_problem_ids = set()
for prob in all_probs_res:
lc_id = prob.get('id', None)
self.existing_problem_ids.add(lc_id)

def get_info_from_url(self, lc_url_base: str, lc_url_type: str):
lc_res = get(url=lc_url_base + lc_url_type).json()
question_info = None
try:
question_info = lc_res.get('stat_status_pairs', None)
except Exception:
print('Could not retrieve question info from LC')
raise
exit(1)
self.add_new_problems(question_info, lc_url_type)

def add_new_problems(self, question_info, url_provider: str):
blocks = []
headers = {
'x-auth-token': self.token,
'Content-Type': 'application/json'
}
# print(question_info[:100])
# Question info contains an array of blocks of information for each question
for info in question_info:
# Convert block to info we need if it's not already in our DB
cur_id = info.get('stat', {}).get('question_id', None)
if cur_id and cur_id not in self.existing_problem_ids:
# Convert and add to blocks to POST
converted_block = self.convert_block(info)
if not converted_block:
print('Failed to convert info')
else:
blocks.append(converted_block)
blocks_as_dict = {"problems" : blocks}
# Convert to JSON so we can POST it to the DB
blocks_as_json = json.dumps(blocks_as_dict)
# POST to server
prob_add_res = None
try:
prob_add_res = post(url=self.problem_post_url, headers=headers, data=blocks_as_json, verify=False).json()
except Exception:
print('Unable to POST problems to server.')
print(self.problem_post_url)
print(headers)
raise
exit(1)
print(f'Results for {url_provider} are:')
print(prob_add_res)
if prob_add_res.get('errors', None):
# Some kind of error.
print('Errors when adding problems in bulk.')
print(str(prob_add_res['errors']))
exit(1)

'''
Parse the results from the LeetCode API, which we either
gather from a GET request or we can parse a provided file.
POST the results via our running server instance into our DB.
'''
def parse(self, file_location: str = None, test: bool =False):
question_info = None
self.get_login_info()
self.get_cur_problems()
if not file_location:
lc_url_root = 'https://leetcode.com/api/problems/'
lc_problem_types = ['algorithms', 'shell', 'database', 'concurrency']
print('No file passed, grabbing info directly from LC api resources')
future = {}
update_header_as_fail = False
with futures.ThreadPoolExecutor() as exec:
for prob_type in lc_problem_types:
future[exec.submit(self.get_info_from_url, lc_url_root, prob_type)] = prob_type
for f in futures.as_completed(future.keys()):
try:
f.result()
except Exception as exc:
print('%r generated an exception: %s' % (future[f], exc))
update_header_as_fail = True
futures.wait(future)
# Update the last run/last tested dates
self.update_header_dates()
self.update_header_dates(update_header_as_fail)
else:
print('No question info could be gathered.')
# Update the last run, but not the last tested date
self.update_header_dates(True)
with open(file_location, 'r') as file:
asJson = json.load(file)
question_info = asJson.get('stat_status_pairs', None)
if question_info:
self.add_new_problems(question_info, 'custom file')
else:
print('No question info could be gathered based on custom file.')
# Update the last run, but not the last tested date
self.update_header_dates(True)

if __name__ == '__main__':
# Create our helper
Expand Down