diff --git a/lib/types.py b/lib/types.py index f3ef8f8a4..d59b10730 100644 --- a/lib/types.py +++ b/lib/types.py @@ -255,6 +255,7 @@ class GameEventType(TypedDict, total=False): CONTROL_QUEUE_TYPE = Queue[EventType] +PGN_QUEUE_TYPE = Queue[EventType] class PublicDataType(TypedDict, total=False): diff --git a/lichess-bot.py b/lichess-bot.py index 2b60e264a..8248caaad 100644 --- a/lichess-bot.py +++ b/lichess-bot.py @@ -28,7 +28,7 @@ from lib.conversation import Conversation, ChatLine from lib.timer import Timer, seconds, msec, hours, to_seconds from lib.types import (UserProfileType, EventType, GameType, GameEventType, CONTROL_QUEUE_TYPE, CORRESPONDENCE_QUEUE_TYPE, - LOGGING_QUEUE_TYPE) + LOGGING_QUEUE_TYPE, PGN_QUEUE_TYPE) from requests.exceptions import ChunkedEncodingError, ConnectionError, HTTPError, ReadTimeout from rich.logging import RichHandler from collections import defaultdict @@ -53,6 +53,7 @@ class PlayGameArgsType(TypedDict, total=False): challenge_queue: MULTIPROCESSING_LIST_TYPE correspondence_queue: CORRESPONDENCE_QUEUE_TYPE logging_queue: LOGGING_QUEUE_TYPE + pgn_queue: PGN_QUEUE_TYPE game_id: str @@ -132,6 +133,17 @@ def do_correspondence_ping(control_queue: CONTROL_QUEUE_TYPE, period: datetime.t control_queue.put_nowait({"type": "correspondence_ping"}) +def write_pgn_records(pgn_queue: PGN_QUEUE_TYPE, config: Configuration, username: str) -> None: + """Write PGN records to files as games finish.""" + while True: + try: + event = pgn_queue.get() + save_pgn_record(event, config, username) + pgn_queue.task_done() + except InterruptedError: + pass + + def handle_old_logs(auto_log_filename: str) -> None: """Remove old logs.""" directory = os.path.dirname(auto_log_filename) @@ -254,6 +266,14 @@ def start(li: LICHESS_TYPE, user_profile: UserProfileType, config: Configuration log_filename, auto_log_filename)) logging_listener.start() + + pgn_queue = manager.Queue() + pgn_listener = multiprocessing.Process(target=write_pgn_records, + args=(pgn_queue, + config, + user_profile["username"])) + pgn_listener.start() + thread_logging_configurer(logging_queue) try: @@ -264,6 +284,7 @@ def start(li: LICHESS_TYPE, user_profile: UserProfileType, config: Configuration control_queue, correspondence_queue, logging_queue, + pgn_queue, one_game) finally: control_stream.terminate() @@ -274,6 +295,8 @@ def start(li: LICHESS_TYPE, user_profile: UserProfileType, config: Configuration logging_configurer(logging_level, log_filename, auto_log_filename, False) logging_listener.terminate() logging_listener.join() + pgn_listener.terminate() + pgn_listener.join() def log_proc_count(change: str, active_games: set[str]) -> None: @@ -294,6 +317,7 @@ def lichess_bot_main(li: LICHESS_TYPE, control_queue: CONTROL_QUEUE_TYPE, correspondence_queue: CORRESPONDENCE_QUEUE_TYPE, logging_queue: LOGGING_QUEUE_TYPE, + pgn_queue: PGN_QUEUE_TYPE, one_game: bool) -> None: """ Handle all the games and challenges. @@ -329,7 +353,8 @@ def lichess_bot_main(li: LICHESS_TYPE, play_game_args: PlayGameArgsType = {"li": li, "control_queue": control_queue, "user_profile": user_profile, "config": config, "challenge_queue": challenge_queue, - "correspondence_queue": correspondence_queue, "logging_queue": logging_queue} + "correspondence_queue": correspondence_queue, "logging_queue": logging_queue, + "pgn_queue": pgn_queue} recent_bot_challenges: defaultdict[str, list[Timer]] = defaultdict(list) @@ -352,7 +377,6 @@ def lichess_bot_main(li: LICHESS_TYPE, active_games.discard(event["game"]["id"]) matchmaker.game_done() log_proc_count("Freed", active_games) - save_pgn_record(event, config, user_profile["username"]) one_game_completed = True elif event["type"] == "challenge": handle_challenge(event, li, challenge_queue, config.challenge, user_profile, recent_bot_challenges) @@ -515,10 +539,12 @@ def start_game_thread(active_games: set[str], game_id: str, play_game_args: Play def game_error_handler(error: BaseException) -> None: logger.exception("Game ended due to error:", exc_info=error) control_queue: CONTROL_QUEUE_TYPE = play_game_args["control_queue"] + pgn_queue: PGN_QUEUE_TYPE = play_game_args["pgn_queue"] li = play_game_args["li"] - control_queue.put_nowait({"type": "local_game_done", "game": {"id": game_id, - "pgn": li.get_game_pgn(game_id), - "complete": not game_is_active(li, game_id)}}) + control_queue.put_nowait({"type": "local_game_done", "game": {"id": game_id}}) + pgn_queue.put_nowait({"game": {"id": game_id, + "pgn": li.get_game_pgn(game_id), + "complete": not game_is_active(li, game_id)}}) pool.apply_async(play_game, kwds=play_game_args, @@ -594,7 +620,8 @@ def play_game(li: LICHESS_TYPE, config: Configuration, challenge_queue: MULTIPROCESSING_LIST_TYPE, correspondence_queue: CORRESPONDENCE_QUEUE_TYPE, - logging_queue: LOGGING_QUEUE_TYPE) -> None: + logging_queue: LOGGING_QUEUE_TYPE, + pgn_queue: PGN_QUEUE_TYPE) -> None: """ Play a game. @@ -705,7 +732,7 @@ def play_game(li: LICHESS_TYPE, stay_in_game = not stopped and (move_attempted or game_is_active(li, game.id)) pgn_record = try_get_pgn_game_record(li, config, game, board, engine) - final_queue_entries(control_queue, correspondence_queue, game, is_correspondence, pgn_record) + final_queue_entries(control_queue, correspondence_queue, game, is_correspondence, pgn_record, pgn_queue) delete_takeback_record(game) @@ -855,7 +882,7 @@ def should_exit_game(board: chess.Board, game: model.Game, prior_game: Optional[ def final_queue_entries(control_queue: CONTROL_QUEUE_TYPE, correspondence_queue: CORRESPONDENCE_QUEUE_TYPE, - game: model.Game, is_correspondence: bool, pgn_record: str) -> None: + game: model.Game, is_correspondence: bool, pgn_record: str, pgn_queue: PGN_QUEUE_TYPE) -> None: """ Log the game that ended or we disconnected from, and sends a `local_game_done` for the game. @@ -867,9 +894,10 @@ def final_queue_entries(control_queue: CONTROL_QUEUE_TYPE, correspondence_queue: else: logger.info(f"--- {game.url()} Game over") - control_queue.put_nowait({"type": "local_game_done", "game": {"id": game.id, - "pgn": pgn_record, - "complete": is_game_over(game)}}) + control_queue.put_nowait({"type": "local_game_done", "game": {"id": game.id}}) + pgn_queue.put_nowait({"game": {"id": game.id, + "pgn": pgn_record, + "complete": is_game_over(game)}}) def game_changed(current_game: model.Game, prior_game: Optional[model.Game]) -> bool: