From 7c92128fcd30ac8d5ff12c9cb798974c353564cc Mon Sep 17 00:00:00 2001
From: Javier Garcia-Bernardo
Date: Mon, 8 Jul 2024 13:22:13 +0200
Subject: [PATCH 1/5] update CLI to match new functions

---
 ingest_lichess.py | 23 ++++++++++++-----------
 1 file changed, 12 insertions(+), 11 deletions(-)

diff --git a/ingest_lichess.py b/ingest_lichess.py
index 061cfc7..fa6c058 100644
--- a/ingest_lichess.py
+++ b/ingest_lichess.py
@@ -6,14 +6,19 @@ from ingester import ingest_lichess_data
 
-def main(start, end, pq_dir, months=None, include_moves=False, restart_counter_games=True):
+def main(start, end, pq_dir, months=None, include_moves=False, restart_counter_games=True, dir_ndjson=None, ndjson_size=1e6):
     """Download data with a check for existing parquet files."""
+    pq_dir = Path(pq_dir)
     pq_dir.mkdir(parents=True, exist_ok=True)
+    if dir_ndjson is not None:
+        dir_ndjson = Path(dir_ndjson)
+        dir_ndjson.mkdir(parents=True, exist_ok=True)
+
     years = range(start, end)
     if months is None:
         months = range(1, 13)
-    arguments = [(y, m, pq_dir, include_moves) for y in years for m in months]
+    arguments = [(y, m, pq_dir, include_moves, dir_ndjson, ndjson_size) for y in years for m in months]
 
     for arg in arguments:
         if (Path(pq_dir) / f"{arg[0]}_{arg[1]:02}.parquet").exists():
@@ -21,22 +26,16 @@ def main(start, end, pq_dir, months=None, include_moves=False, restart_counter_g
             continue
         ingest_lichess_data(*arg)
 
-    if restart_counter_games:
-        # Remove the counter file
-        counter_file = Path(pq_dir) / "cum_files.json.zst"
-        if counter_file.exists():
-            counter_file.unlink()
-
 if __name__ == "__main__":
-
     parser=argparse.ArgumentParser()
     parser.add_argument('--start', type=int, default=2013)
     parser.add_argument('--end', type=int, default=datetime.date.today().year)
     parser.add_argument('--months', nargs='+', type=int)
     parser.add_argument('--include-moves', action='store_true', default=False)
     parser.add_argument('--debug', action='store_true', default=False)
-    parser.add_argument('--restart-counter-games', action='store_true', default=True)
     parser.add_argument('--parquet-dir', type=Path, default="./lichess_parquet")
+    parser.add_argument('--dir-ndjson', type=str, default=None)
+    parser.add_argument('--ndjson-size', type=int, default=1e6)
     args=parser.parse_args()
 
     logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO)
@@ -47,4 +46,6 @@ def main(start, end, pq_dir, months=None, include_moves=False, restart_counter_g
             months=args.months,
             include_moves=args.include_moves,
             pq_dir=args.parquet_dir,
-            restart_counter_games=args.restart_counter_games)
+            dir_ndjson=args.dir_ndjson,
+            ndjson_size=args.ndjson_size
+            )

From db4169c176b4915d02c9906832c55bc4d256f389 Mon Sep 17 00:00:00 2001
From: Javier Garcia-Bernardo
Date: Mon, 8 Jul 2024 13:25:08 +0200
Subject: [PATCH 2/5] add new functionality by Max

---
 ingester.py | 291 ++++++++++++++++++++++++++++++++++++++++------------
 1 file changed, 226 insertions(+), 65 deletions(-)

diff --git a/ingester.py b/ingester.py
index d25c50e..1b99ed7 100644
--- a/ingester.py
+++ b/ingester.py
@@ -1,16 +1,27 @@
 """Module to download and convert lichess data headers to parquet files directory."""
 import io
-from tempfile import NamedTemporaryFile as TempFile
+import logging
 import json
 import re
-import logging
 import requests
-from tqdm import tqdm
+
 import zstandard as zstd
 import polars as pl
+import s3fs
 
-def ingest_lichess_data(year: int, month: int, dir_parquet: str = "./lichess_parquet",
-                        include_moves: bool = False):
+from collections import defaultdict
+from random import random
+from tempfile import NamedTemporaryFile as TempFile
+from tqdm import tqdm
+from typing import Optional
+
+def ingest_lichess_data(year: int,
+                        month: int,
+                        dir_parquet: str = "./lichess_parquet",
+                        include_moves: bool = False,
+                        fs: Optional[s3fs.core.S3FileSystem] = None,
+                        dir_ndjson: Optional[str] = None,
+                        ndjson_size: int = 1e6):
     """
     Download, process, and convert chess games data from the Lichess database to Parquet format.
 
@@ -26,6 +37,10 @@ def ingest_lichess_data(year: int, month: int, dir_parquet: str = "./lichess_par
         to "../lichess_parquet".
     include_moves (bool, optional; default False): Whether to include games' moves in the saved
         data. Including moves greatly increases the size of the Parquet files.
+    fs (s3fs.core.S3FileSystem, optional): If provided, the function will use this filesystem
+        to read and write files. Defaults to None.
+    dir_ndjson (str, optional): Directory where NDJSON files will be saved. Defaults to None.
+    ndjson_size (int, optional): The number of games to process before converting to Parquet.
 
     The function constructs a URL to stream the dataset from, uses regular expressions to parse
     the data, and utilizes Zstandard for decompression. Progress is tracked and displayed using
@@ -41,9 +56,14 @@ def ingest_lichess_data(year: int, month: int, dir_parquet: str = "./lichess_par
 
     # Use temp file for cumulative values
    try:
-        # Read and decompress the data
-        with open(f"{dir_parquet}/cum_files.json.zst", 'rb') as fin:
-            decompressed_bytes = zstd.ZstdDecompressor().decompress(fin.read())
+        if isinstance(fs, s3fs.core.S3FileSystem):
+            # read cum_files from S3 point
+            with fs.open(f"{dir_parquet}/cum_files.json.zst", mode='rb') as fin:
+                decompressed_bytes = zstd.ZstdDecompressor().decompress(fin.read())
+        else:
+            # Read and decompress the data
+            with open(f"{dir_parquet}/cum_files.json.zst", 'rb') as fin:
+                decompressed_bytes = zstd.ZstdDecompressor().decompress(fin.read())
 
         # Convert bytes back to JSON object
         d_cum_games = json.loads(decompressed_bytes.decode('utf-8'))
@@ -52,6 +72,8 @@ def ingest_lichess_data(year: int, month: int, dir_parquet: str = "./lichess_par
 
     except FileNotFoundError:
         logging.debug("Cumulative file not found, recreating from scratch")
         d_cum_games = dict()
+        d_cum_games["All"] = defaultdict(int)
+
     # Create data URL
     file_name = f"lichess_db_standard_rated_{year}-{month:02}.pgn.zst"
@@ -80,7 +102,12 @@ def ingest_lichess_data(year: int, month: int, dir_parquet: str = "./lichess_par
         game = []
         moves = None
         games = 0
-        temp_files=[]
+        batch = 0
+        if dir_ndjson is None:
+            temp_file = TempFile(suffix=".ndjson", mode="w+")
+        else:
+            # useful for debugging
+            temp_file = open(f"{dir_ndjson}/temp.ndjson", mode="w+")
 
         # Start progres bar (approximate bytes since the raw file is
         # compressed and we are uncompressing on the fly)
         progress_bar = tqdm(
@@ -92,9 +119,6 @@ def ingest_lichess_data(year: int, month: int, dir_parquet: str = "./lichess_par
             desc=f"{year}_{month:02}",
         )
 
-        # Create temp file and store in list
-        temp_files.append(TempFile(suffix=".ndjson", mode="w+"))
-
         logging.debug("Collecting data from stream")
         # Start loop
         for line in text_stream:
@@ -103,8 +127,23 @@ def ingest_lichess_data(year: int, month: int, dir_parquet: str = "./lichess_par
                 # Looking for the start of the game
                 if line.startswith("["):
                     looking_for_game = False
+                    # Add type of game
+                    event_type = list(re.findall(pattern, line)[0])
+                    tournament = "tournament" in line
+                    if tournament:
+                        event_type[1] = event_type[1].split("tournament")[0].strip()
+                    game.append(tuple(event_type))
+
                elif line.startswith("1."):
-                    moves = line.strip()
+                    if include_moves:
+                        # Keep only 3 moves
+                        moves = line.replace("\n", " ").strip()
+                        moves = moves.split("4.")[0]
+                    else:
+                        moves = ""
+                elif line.startswith("["): # Game continues, keep appending
+                    game.append(re.findall(pattern, line)[0])
                 elif not line.startswith("[") and moves is not None:
                     # Game just ended, dump to NDJSON file
                     if include_moves:
@@ -112,91 +151,137 @@ def ingest_lichess_data(year: int, month: int, dir_parquet: str = "./lichess_par
                     #if "eval" in modes
                     game_df = dict(game)
                     game_df["Evaluation_flag"] = "eval" in moves
+                    game_df["Tournament"] = tournament
+
                     for player in ["White", "Black"]:
                         id_player = game_df[player]
+
+                        game_type = game_df["Event"]
+                        if game_type not in d_cum_games:
+                            d_cum_games[game_type] = defaultdict(int)
+
+                        # Total games played
+                        if id_player not in d_cum_games["All"]:
+                            d_cum_games["All"][id_player] = 0
+                            # Add random uniform number per player
+                            d_cum_games["All"][f"{id_player}_random"] = random()
+
+                        # Games played in category
+                        if id_player not in d_cum_games[game_type]:
+                            d_cum_games[game_type][id_player] = 0
+                            d_cum_games[game_type][f"{id_player}Elo_max"] = 0
+                            d_cum_games[game_type][f"{id_player}Elo_max_faced"] = 0
+
                         # Add cumulative values to game
-                        if game_df[player] not in d_cum_games:
-                            d_cum_games[id_player] = 0
-                        d_cum_games[id_player] += 1
-                        game_df.update({f"{player}_cum_games": d_cum_games[id_player]})
+                        d_cum_games[game_type][id_player] += 1
+                        d_cum_games["All"][id_player] += 1
+
+                        game_df["ID_random"] = random()
+                        game_df[f"{player}_random"] = d_cum_games["All"][f"{id_player}_random"]
+                        game_df[f"{player}_cum_games_type"] = d_cum_games[game_type][id_player]
+                        game_df[f"{player}_cum_games_total"] = d_cum_games["All"][id_player]
 
                         # Find max Elo of each player
-                        if d_cum_games.get(f"{id_player}Elo_max") is None:
-                            if game_df[f"{player}Elo"] == "?":
-                                game_df[f"{player}Elo_max"] = ""
-                            else:
-                                d_cum_games[f"{id_player}Elo_max"] = int(game_df[f"{player}Elo"])
-                                game_df[f"{player}Elo_max"] = d_cum_games[f"{id_player}Elo_max"]
+                        max_elo = d_cum_games[game_type][f"{id_player}Elo_max"]
+                        if game_df[f"{player}Elo"] == "?":
+                            game_df[f"{player}Elo_max"] = max_elo
+                        elif int(game_df[f"{player}Elo"]) > max_elo:
+                            d_cum_games[game_type][f"{id_player}Elo_max"] = int(game_df[f"{player}Elo"])
+                            game_df[f"{player}Elo_max"] = int(game_df[f"{player}Elo"])
+                        else:
+                            game_df[f"{player}Elo_max"] = max_elo
+
+                        # Max ELO faced
+                        max_elo = d_cum_games[game_type][f"{id_player}Elo_max_faced"]
+                        if game_df[f"{player}Elo"] == "?":
+                            game_df[f"{player}Elo_max_faced"] = max_elo
+                        elif int(game_df[f"{player}Elo"]) > max_elo:
+                            d_cum_games[game_type][f"{id_player}Elo_max_faced"] = int(game_df[f"{player}Elo"])
+                            game_df[f"{player}Elo_max_faced"] = int(game_df[f"{player}Elo"])
                         else:
-                            if game_df[f"{player}Elo"] == "?":
-                                game_df[f"{player}Elo_max"] = d_cum_games[f"{id_player}Elo_max"]
-                            else:
-                                d_cum_games[f"{id_player}Elo_max"] = max(
-                                    int(game_df[f"{player}Elo"]),
-                                    d_cum_games[f"{id_player}Elo_max"]
-                                )
-                                game_df[f"{player}Elo_max"] = d_cum_games[f"{id_player}Elo_max"]
+                            game_df[f"{player}Elo_max_faced"] = max_elo
 
+                    # Add fields that are missing when they have no value
                     for field in ['BlackTitle', 'WhiteTitle']:
                         if field not in game_df:
-                            game_df.update({field: '-'})
+                            game_df.update({field: None})
 
                     # Add concat DateTime field to replace seperate Date & TIme
                     game_df.update({'DateTime': f"{game_df['UTCDate']} {game_df['UTCTime']}"})
 
                     # Write complete game to temp file
-                    temp_files[-1].write(json.dumps(game_df) + "\n")
+                    temp_file.write(json.dumps(game_df) + "\n")
 
                     looking_for_game = True
                     game = []
                     moves = None
                     games += 1
 
-                    if games>=1e6:
-                        # When 1M games reached, create new temp file
-                        temp_files.append(TempFile(suffix=".ndjson", mode="w+"))
-                        games = 0
-        elif line.startswith("["): # Game continues, keep appending
-            game.append(re.findall(pattern, line)[0])
+                    if games >= ndjson_size:
+                        if dir_ndjson is not None:
+                            temp_file.close()
+
+                        # Convert the NDJSON to Parquet
+                        _ndjson_to_parquet(temp_file.name,
+                                           f"{dir_parquet}/{year}_{month:02}_{batch:003}.parquet", include_moves, fs=fs)
+                        batch += 1
+
+                        # When the max size of ndjson is reached, create new temp file
+                        if dir_ndjson is None:
+                            temp_file = TempFile(suffix=".ndjson", mode="w+")
+                        else:
+                            # useful for debugging
+                            temp_file = open(f"{dir_ndjson}/temp.ndjson", mode="w+")
+                        games = 0
 
     # Avoid 'hanging' progress bars due to approximation/actual value-mismatch
     progress_bar.update(num_bytes * 5.2)
 
     # Clean up
     progress_bar.close()
+    # close file
+    if dir_ndjson is not None:
+        temp_file.close()
 
-    logging.debug("Converting ndjson to Parquet")
-    # Convert temp files to parquet, one by one
-    batch = 0
-    for temp_file in temp_files:
-        _ndjson_to_parquet(temp_file.name,
-                           f"{dir_parquet}/{year}_{month:02}_{batch:003}.parquet", include_moves)
-        batch += 1
+    # Last batch
+    _ndjson_to_parquet(temp_file.name,
+                       f"{dir_parquet}/{year}_{month:02}_{batch:003}.parquet", include_moves, fs=fs)
 
     # Save cumulative values to file
     compressed_bytes = zstd.ZstdCompressor().compress(json.dumps(d_cum_games).encode('utf-8'))
-    with open(f"{dir_parquet}/cum_files.json.zst", "wb") as fout:
-        fout.write(compressed_bytes)
+    if isinstance(fs, s3fs.core.S3FileSystem):
+        # write cum_files to S3
+        with fs.open(f"{dir_parquet}/cum_files.json.zst", mode='wb') as fout:
+            fout.write(compressed_bytes)
+    else:
+        # Compress and write the data locally
+        with open(f"{dir_parquet}/cum_files.json.zst", 'wb') as fout:
+            fout.write(compressed_bytes)
 
-def _ndjson_to_parquet(ndjson_path: str, parquet_path: str, include_moves: bool):
+def _ndjson_to_parquet(ndjson_path: str, parquet_path: str, include_moves: bool, fs: Optional[s3fs.core.S3FileSystem] = None):
     """Creates a cleaned dataframe from an ndjson of Lichess game info."""
-    cols = ["ID", "White", "Black", "Result", "White_cum_games", "Black_cum_games",
-            "WhiteElo", "BlackElo", "WhiteElo_max", "BlackElo_max",
-            "WhiteTitle", "BlackTitle", "WhiteRatingDiff", "BlackRatingDiff", "ECO",
-            "Opening", "TimeControl", "Termination", "DateTime" ]
+    game_cols = ["ID", "ID_random", "Event", "Tournament", "ECO", "Opening", "TimeControl", "Termination", "DateTime"]
 
     schema = {
+        "Event": pl.Utf8,
         "Site": pl.Utf8,
+        "ID_random": pl.Float64,
         "White": pl.Utf8,
         "Black": pl.Utf8,
-        "Result": pl.Enum(["1-0", "0-1", "1/2-1/2", "?", "*"]),
+        "Result":pl.Utf8, #pl.Enum(["1-0", "0-1", "1/2-1/2", "?", "*"]),
         "WhiteElo": pl.Utf8,
         "BlackElo": pl.Utf8,
         "WhiteElo_max": pl.Int32,
         "BlackElo_max": pl.Int32,
-        "White_cum_games": pl.Int32,
-        "Black_cum_games": pl.Int32,
+        "WhiteElo_max_faced": pl.Int32,
+        "BlackElo_max_faced": pl.Int32,
+        "White_random": pl.Float64,
+        "Black_random": pl.Float64,
+        "White_cum_games_total": pl.Int32,
+        "Black_cum_games_total": pl.Int32,
+        "White_cum_games_type": pl.Int32,
+        "Black_cum_games_type": pl.Int32,
         "WhiteTitle": pl.Utf8,
         "BlackTitle": pl.Utf8,
         "WhiteRatingDiff": pl.Utf8,
@@ -204,18 +289,21 @@ def _ndjson_to_parquet(ndjson_path: str, parquet_path: str, include_moves: bool)
         "ECO": pl.Utf8,
         "Opening": pl.Utf8,
         "TimeControl": pl.Utf8,
-        "Termination": pl.Enum(["Time forfeit", "Rules infraction", "Normal", "Abandoned", "Unterminated", "?"]),
-        "DateTime": pl.Utf8
+        "Termination": pl.Utf8, #pl.Enum(["Time forfeit", "Rules infraction", "Normal", "Abandoned", "Unterminated", "?"]),
+        "DateTime": pl.Utf8,
+        "Tournament": pl.Boolean
     }
 
     if include_moves:
-        cols.append("Moves")
+        game_cols.append("Moves")
         schema["Moves"] = pl.Utf8
-        cols.append("Evaluation_flag")
+        game_cols.append("Evaluation_flag")
         schema["Evaluation_flag"] = pl.Boolean
 
+    # Convert from UTF with "?" symbol to INT
     int_cols = ["WhiteElo", "BlackElo", "WhiteRatingDiff", "BlackRatingDiff"]
-    exclude_int = ["White_cum_games", "Black_cum_games", "WhiteElo_max", "BlackElo_max"]
+    exclude_int = ["White_cum_games_total", "Black_cum_games_total", "WhiteElo_max", "BlackElo_max", "WhiteElo_max_faced", "BlackElo_max_faced",
+                   "White_random", "Black_random", "ID_random", "White_cum_games_type", "Black_cum_games_type"]
 
     logging.debug("Creating dataframe")
     lf = (
@@ -229,14 +317,87 @@ def _ndjson_to_parquet(ndjson_path: str, parquet_path: str, include_moves: bool)
         pl.col(int_cols).str.replace(r"\+", "").cast(pl.Int16),
         pl.col("DateTime").str.to_datetime(format="%Y.%m.%d %H:%M:%S"),
         pl.col("Site").str.replace("https://lichess.org/", "").alias("ID"),
+        # Title is not missing (flag)
+        pl.col("WhiteTitle").is_not_null().alias("WhiteTitle_flag"),
+        pl.col("BlackTitle").is_not_null().alias("BlackTitle_flag"),
     )
     # # lastly, select only what we need
-    .select(cols)
+    .select(
+        *game_cols,
+        "Result",
+        pl.lit("White").alias("Role_player"),
+        pl.col("White").alias("Player"),
+        pl.col("Black").alias("Opponent"),
+        pl.col("WhiteElo").alias("PlayerElo"),
+        pl.col("BlackElo").alias("OpponentElo"),
+        pl.col("WhiteElo_max").alias("PlayerElo_max"),
+        pl.col("BlackElo_max").alias("OpponentElo_max"),
+        pl.col("WhiteElo_max_faced").alias("PlayerElo_max_faced"),
+        pl.col("BlackElo_max_faced").alias("OpponentElo_max_faced"),
+        pl.col("WhiteTitle").alias("PlayerTitle"),
+        pl.col("BlackTitle").alias("OpponentTitle"),
+        pl.col("WhiteTitle_flag").alias("PlayerTitle_flag"),
+        pl.col("BlackTitle_flag").alias("OpponentTitle_flag"),
+        pl.col("WhiteRatingDiff").alias("PlayerRatingDiff"),
+        pl.col("BlackRatingDiff").alias("OpponentRatingDiff"),
+        pl.col("White_random").alias("Player_random"),
+        pl.col("Black_random").alias("Opponent_random"),
+        pl.col("White_cum_games_total").alias("Player_cum_games_total"),
+        pl.col("Black_cum_games_total").alias("Opponent_cum_games_total"),
+        pl.col("White_cum_games_type").alias("Player_cum_games_type"),
+        pl.col("Black_cum_games_type").alias("Opponent_cum_games_type"),
+    )
+    .set_sorted("DateTime")
+    )
+
+    d_rev_result = {"1-0": "0-1", "0-1": "1-0", "1/2-1/2": "1/2-1/2", "?": "?", "*": "*"}
+    # Convert to player-game-role format: i.e., duplicate each game switching White and Black
+    lf_inv = lf.select(
+        *game_cols,
+        pl.col("Result").map_elements(d_rev_result.get, return_dtype=pl.Utf8),
+        pl.lit("Black").alias("Role_player"),
+        pl.col("Opponent").alias("Player"),
+        pl.col("Player").alias("Opponent"),
+        pl.col("OpponentElo").alias("PlayerElo"),
+        pl.col("PlayerElo").alias("OpponentElo"),
+        pl.col("OpponentElo_max").alias("PlayerElo_max"),
+        pl.col("PlayerElo_max").alias("OpponentElo_max"),
+        pl.col("OpponentElo_max_faced").alias("PlayerElo_max_faced"),
+        pl.col("PlayerElo_max_faced").alias("OpponentElo_max_faced"),
+        pl.col("OpponentTitle").alias("PlayerTitle"),
+        pl.col("PlayerTitle").alias("OpponentTitle"),
+        pl.col("OpponentTitle_flag").alias("PlayerTitle_flag"),
+        pl.col("PlayerTitle_flag").alias("OpponentTitle_flag"),
+        pl.col("OpponentRatingDiff").alias("PlayerRatingDiff"),
+        pl.col("PlayerRatingDiff").alias("OpponentRatingDiff"),
+        pl.col("Opponent_random").alias("Player_random"),
+        pl.col("Player_random").alias("Opponent_random"),
+        pl.col("Opponent_cum_games_total").alias("Player_cum_games_total"),
+        pl.col("Player_cum_games_total").alias("Opponent_cum_games_total"),
+        pl.col("Opponent_cum_games_type").alias("Player_cum_games_type"),
+        pl.col("Player_cum_games_type").alias("Opponent_cum_games_type")
+    ).set_sorted("DateTime")
+
+    # concatenate files sorted by time (should be fast)
+    lf = (lf
+          .merge_sorted(lf_inv, key="DateTime")
+          .sort(["DateTime", "ID"])
+          .with_columns(
+              pl.col("PlayerElo").cut(range(0, 4001, 200)).alias("PlayerElo_bin"),
+              # Cast to enums (we could also cast titles and others)
+              pl.col("Result").cast(pl.Enum(["1-0", "0-1", "1/2-1/2", "?", "*"])),
+              pl.col("Termination").cast(pl.Enum(["Time forfeit", "Rules infraction", "Normal", "Abandoned", "Unterminated", "?"])),
+              pl.col("Role_player").cast(pl.Enum(["White", "Black"])),
+          )
     )
 
     logging.info("Writing '%s", parquet_path)
-
-    # gzip and use_pyarrow are required for default Apache Drill compatibility
-    lf.collect(streaming=True).write_parquet(parquet_path, compression='gzip', use_pyarrow=True)
+    if isinstance(fs, s3fs.core.S3FileSystem):
+        # write parquet
+        with fs.open(parquet_path, mode='wb') as f:
+            lf.collect(streaming=True).write_parquet(f, compression='gzip', use_pyarrow=True)
+    else:
+        # gzip and use_pyarrow are required for default Apache Drill compatibility
+        lf.collect(streaming=True).write_parquet(parquet_path, compression='gzip', use_pyarrow=True)
 
     return parquet_path

From 0e3a69a5ce59ba6bf3b684519c61c4d5bbc9750f Mon Sep 17 00:00:00 2001
From: Javier Garcia-Bernardo
Date: Mon, 8 Jul 2024 23:22:17 +0200
Subject: [PATCH 3/5] add threading

---
 ingester.py | 39 ++++++++++++++++++++++++++------------
 1 file changed, 26 insertions(+), 13 deletions(-)

diff --git a/ingester.py b/ingester.py
index 1b99ed7..b51ac2e 100644
--- a/ingester.py
+++ b/ingester.py
@@ -4,11 +4,13 @@
 import json
 import re
 import requests
+import threading
 
 import zstandard as zstd
 import polars as pl
 import s3fs
 
+from urllib3.exceptions import ProtocolError
 from collections import defaultdict
 from random import random
 from tempfile import NamedTemporaryFile as TempFile
@@ -85,9 +87,11 @@ def ingest_lichess_data(year: int,
     # Set up the decompressor
     decompressor = zstd.ZstdDecompressor(max_window_size=2**31)
 
-    # Connect to url and create tempfile
+
+    # Connect to url and create tempfile.
+    # Long timeout since we'll process data every ndjson_size games
     with (
-        requests.get(url, stream=True, timeout=10) as response
+        requests.get(url, stream=True, timeout=360) as response
     ):
         # get basic info, make sure connection was successful
         response.raise_for_status()
@@ -103,11 +107,16 @@ def ingest_lichess_data(year: int,
         moves = None
         games = 0
         batch = 0
+
+        # Start temp file (2 files to allow for parallel data processing and Internet IO)
         if dir_ndjson is None:
-            temp_file = TempFile(suffix=".ndjson", mode="w+")
+            temp_files = [TempFile(prefix="0", suffix=".ndjson", mode="w+"),
+                          TempFile(prefix="1", suffix=".ndjson", mode="w+")]
         else:
             # useful for debugging
-            temp_file = open(f"{dir_ndjson}/temp.ndjson", mode="w+")
+            temp_files = [open(f"{dir_ndjson}/temp_0.ndjson", mode="w+"),
+                          open(f"{dir_ndjson}/temp_1.ndjson", mode="w+")]
+        temp_file = temp_files[0]
 
         # Start progres bar (approximate bytes since the raw file is
         # compressed and we are uncompressing on the fly)
         progress_bar = tqdm(
@@ -124,7 +133,7 @@ def ingest_lichess_data(year: int,
         for line in text_stream:
             progress_bar.update(len(line))
             if looking_for_game:
-                # Looking for the start of the game 
+                # Looking for the start of the game
                 if line.startswith("["):
                     looking_for_game = False
                     # Add type of game
@@ -212,7 +221,7 @@ def ingest_lichess_data(year: int,
                     game_df.update({'DateTime': f"{game_df['UTCDate']} {game_df['UTCTime']}"})
 
                     # Write complete game to temp file
-                    temp_file.write(json.dumps(game_df) + "\n")
+                    temp_files[batch % 2].write(json.dumps(game_df) + "\n")
 
                     looking_for_game = True
                     game = []
@@ -221,19 +230,22 @@ def ingest_lichess_data(year: int,
                     games += 1
 
                     if games >= ndjson_size:
                         if dir_ndjson is not None:
-                            temp_file.close()
+                            temp_files[batch % 2].close()
 
                         # Convert the NDJSON to Parquet
-                        _ndjson_to_parquet(temp_file.name,
-                                           f"{dir_parquet}/{year}_{month:02}_{batch:003}.parquet", include_moves, fs=fs)
+                        threading.Thread(target=_ndjson_to_parquet, args=(temp_files[batch % 2].name, f"{dir_parquet}/{year}_{month:02}_{batch:003}.parquet", include_moves, fs)).start()
+
+                        #_ndjson_to_parquet(temp_file.name,
+                        #                   f"{dir_parquet}/{year}_{month:02}_{batch:003}.parquet", include_moves, fs=fs)
                         batch += 1
 
                         # When the max size of ndjson is reached, create new temp file
                         if dir_ndjson is None:
-                            temp_file = TempFile(suffix=".ndjson", mode="w+")
+                            temp_files[batch % 2] = TempFile(prefix=f"{batch%2}", suffix=".ndjson", mode="w+")
                         else:
                             # useful for debugging
-                            temp_file = open(f"{dir_ndjson}/temp.ndjson", mode="w+")
+                            temp_files[batch % 2] = open(f"{dir_ndjson}/temp_{batch%2}.ndjson", mode="w+")
+
                         games = 0
 
     # Avoid 'hanging' progress bars due to approximation/actual value-mismatch
     progress_bar.update(num_bytes * 5.2)
@@ -244,9 +256,10 @@ def ingest_lichess_data(year: int,
     # Clean up
     progress_bar.close()
     # close file
     if dir_ndjson is not None:
         temp_file.close()
 
+    threading.Thread(target=_ndjson_to_parquet, args=(temp_file.name, f"{dir_parquet}/{year}_{month:02}_{batch:003}.parquet", include_moves, fs)).start()
     # Last batch
-    _ndjson_to_parquet(temp_file.name,
-                       f"{dir_parquet}/{year}_{month:02}_{batch:003}.parquet", include_moves, fs=fs)
+    # _ndjson_to_parquet(temp_file.name,
+    #                    f"{dir_parquet}/{year}_{month:02}_{batch:003}.parquet", include_moves, fs=fs)
 
     # Save cumulative values to file
     compressed_bytes = zstd.ZstdCompressor().compress(json.dumps(d_cum_games).encode('utf-8'))

From d87d7e5db7132f550f452bf0c41f765fcf1c0db7 Mon Sep 17 00:00:00 2001
From: Javier Garcia-Bernardo
Date: Tue, 9 Jul 2024 09:28:22 +0200
Subject: [PATCH 4/5] delete tempfiles explicitly

---
 ingester.py | 39 +++++++++++++++++++++++----------------
 1 file changed, 23 insertions(+), 16 deletions(-)

diff --git a/ingester.py b/ingester.py
index b51ac2e..18734e1 100644
--- a/ingester.py
+++ b/ingester.py
@@ -3,6 +3,7 @@
 import logging
 import json
 import re
+import os
 import requests
 import threading
 
@@ -110,14 +111,12 @@ def ingest_lichess_data(year: int,
 
         # Start temp file (2 files to allow for parallel data processing and Internet IO)
         if dir_ndjson is None:
-            temp_files = [TempFile(prefix="0", suffix=".ndjson", mode="w+"),
-                          TempFile(prefix="1", suffix=".ndjson", mode="w+")]
+            temp_file = TempFile(suffix=".ndjson", mode="w+", delete=False)
         else:
+            logging.warning("Providing a dir_ndjson is only recommended for debugging.")
             # useful for debugging
-            temp_files = [open(f"{dir_ndjson}/temp_0.ndjson", mode="w+"),
-                          open(f"{dir_ndjson}/temp_1.ndjson", mode="w+")]
-        temp_file = temp_files[0]
-
+            temp_file = open(f"{dir_ndjson}/temp_0.ndjson", mode="w+")
+
         # Start progres bar (approximate bytes since the raw file is
         # compressed and we are uncompressing on the fly)
         progress_bar = tqdm(
@@ -221,7 +220,7 @@ def ingest_lichess_data(year: int,
                     game_df.update({'DateTime': f"{game_df['UTCDate']} {game_df['UTCTime']}"})
 
                     # Write complete game to temp file
-                    temp_files[batch % 2].write(json.dumps(game_df) + "\n")
+                    temp_file.write(json.dumps(game_df) + "\n")
 
                     looking_for_game = True
                     game = []
@@ -229,11 +228,10 @@ def ingest_lichess_data(year: int,
                     games += 1
 
                     if games >= ndjson_size:
-                        if dir_ndjson is not None:
-                            temp_files[batch % 2].close()
+                        temp_file.close()
 
                         # Convert the NDJSON to Parquet
-                        threading.Thread(target=_ndjson_to_parquet, args=(temp_files[batch % 2].name, f"{dir_parquet}/{year}_{month:02}_{batch:003}.parquet", include_moves, fs)).start()
+                        threading.Thread(target=_ndjson_to_parquet, args=(temp_file.name, f"{dir_parquet}/{year}_{month:02}_{batch:003}.parquet", include_moves, fs)).start()
 
                         #_ndjson_to_parquet(temp_file.name,
                         #                   f"{dir_parquet}/{year}_{month:02}_{batch:003}.parquet", include_moves, fs=fs)
@@ -241,10 +239,10 @@ def ingest_lichess_data(year: int,
 
                         # When the max size of ndjson is reached, create new temp file
                         if dir_ndjson is None:
-                            temp_files[batch % 2] = TempFile(prefix=f"{batch%2}", suffix=".ndjson", mode="w+")
+                            temp_file = TempFile(suffix=".ndjson", mode="w+", delete=False)
                         else:
-                            # useful for debugging
-                            temp_files[batch % 2] = open(f"{dir_ndjson}/temp_{batch%2}.ndjson", mode="w+")
+                            # useful for debugging (don't use in production)
+                            temp_file = open(f"{dir_ndjson}/temp_{batch%2}.ndjson", mode="w+")
 
                         games = 0
 
@@ -256,8 +254,8 @@ def ingest_lichess_data(year: int,
     # Clean up
     progress_bar.close()
     # close file
     if dir_ndjson is not None:
         temp_file.close()
 
-    threading.Thread(target=_ndjson_to_parquet, args=(temp_file.name, f"{dir_parquet}/{year}_{month:02}_{batch:003}.parquet", include_moves, fs)).start()
     # Last batch
+    threading.Thread(target=_ndjson_to_parquet, args=(temp_file.name, f"{dir_parquet}/{year}_{month:02}_{batch:003}.parquet", include_moves, fs)).start()
     # _ndjson_to_parquet(temp_file.name,
     #                    f"{dir_parquet}/{year}_{month:02}_{batch:003}.parquet", include_moves, fs=fs)
@@ -272,6 +270,8 @@ def ingest_lichess_data(year: int,
         with open(f"{dir_parquet}/cum_files.json.zst", 'wb') as fout:
             fout.write(compressed_bytes)
 
+    return None
+
 def _ndjson_to_parquet(ndjson_path: str, parquet_path: str, include_moves: bool, fs: Optional[s3fs.core.S3FileSystem] = None):
     """Creates a cleaned dataframe from an ndjson of Lichess game info."""
     game_cols = ["ID", "ID_random", "Event", "Tournament", "ECO", "Opening", "TimeControl", "Termination", "DateTime"]
@@ -362,7 +362,7 @@ def _ndjson_to_parquet(ndjson_path: str, parquet_path: str, include_moves: bool,
     )
     .set_sorted("DateTime")
     )
-    
+
     d_rev_result = {"1-0": "0-1", "0-1": "1-0", "1/2-1/2": "1/2-1/2", "?": "?", "*": "*"}
     # Convert to player-game-role format: i.e., duplicate each game switching White and Black
     lf_inv = lf.select(
@@ -413,4 +413,11 @@ def _ndjson_to_parquet(ndjson_path: str, parquet_path: str, include_moves: bool,
         # gzip and use_pyarrow are required for default Apache Drill compatibility
         lf.collect(streaming=True).write_parquet(parquet_path, compression='gzip', use_pyarrow=True)
 
-    return parquet_path
+
+    try:
+        os.remove(ndjson_path)
+    except Exception as e:
+        logging.error(f"Error cleaning up temporary file: {e}")
+
+
+    return None

From 3c62aac6cbdf60f3c4ae0e359f2f61d003a7866d Mon Sep 17 00:00:00 2001
From: Javier Garcia-Bernardo
Date: Wed, 10 Jul 2024 20:44:14 +0200
Subject: [PATCH 5/5] add cum_file per month

---
 ingester.py | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/ingester.py b/ingester.py
index 18734e1..a764798 100644
--- a/ingester.py
+++ b/ingester.py
@@ -59,13 +59,20 @@ def ingest_lichess_data(year: int,
 
     # Use temp file for cumulative values
     try:
+        # Read the cumulative values from the previous month
+        if month == 1:
+            year_cum = year - 1
+            month_cum = 12
+        else:
+            year_cum = year
+            month_cum = month - 1
         if isinstance(fs, s3fs.core.S3FileSystem):
             # read cum_files from S3 point
-            with fs.open(f"{dir_parquet}/cum_files.json.zst", mode='rb') as fin:
+            with fs.open(f"{dir_parquet}/cum_files_{year_cum}_{month_cum}.json.zst", mode='rb') as fin:
                 decompressed_bytes = zstd.ZstdDecompressor().decompress(fin.read())
         else:
             # Read and decompress the data
-            with open(f"{dir_parquet}/cum_files.json.zst", 'rb') as fin:
+            with open(f"{dir_parquet}/cum_files_{year_cum}_{month_cum}.json.zst", 'rb') as fin:
                 decompressed_bytes = zstd.ZstdDecompressor().decompress(fin.read())
 
         # Convert bytes back to JSON object
@@ -73,7 +80,7 @@ def ingest_lichess_data(year: int,
 
     except FileNotFoundError:
-        logging.debug("Cumulative file not found, recreating from scratch")
+        logging.debug("Cumulative file not found for year %s and month %s. Recreating it from scratch", year_cum, month_cum)
         d_cum_games = dict()
         d_cum_games["All"] = defaultdict(int)
 
@@ -263,11 +270,11 @@ def ingest_lichess_data(year: int,
     compressed_bytes = zstd.ZstdCompressor().compress(json.dumps(d_cum_games).encode('utf-8'))
     if isinstance(fs, s3fs.core.S3FileSystem):
         # write cum_files to S3
-        with fs.open(f"{dir_parquet}/cum_files.json.zst", mode='wb') as fout:
+        with fs.open(f"{dir_parquet}/cum_files_{year}_{month}.json.zst", mode='wb') as fout:
             fout.write(compressed_bytes)
     else:
         # Compress and write the data locally
-        with open(f"{dir_parquet}/cum_files.json.zst", 'wb') as fout:
+        with open(f"{dir_parquet}/cum_files_{year}_{month}.json.zst", 'wb') as fout:
             fout.write(compressed_bytes)
 
     return None
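
With the series applied, a minimal sketch of how the new parameters might be exercised follows. It is only illustrative: the bucket name, local paths, month/size values, and credential setup are assumptions, and the S3 route is reachable only through the Python API since the CLI in PATCH 1 does not expose an S3 flag.

    # Local run through the updated CLI from PATCH 1 (example values):
    #   python ingest_lichess.py --start 2015 --end 2016 --months 1 2 \
    #       --parquet-dir ./lichess_parquet --ndjson-size 500000

    # Direct call with an S3-backed filesystem (bucket name is a placeholder).
    import s3fs
    from ingester import ingest_lichess_data

    fs = s3fs.S3FileSystem()  # credentials resolved from the environment
    ingest_lichess_data(2015, 1,
                        dir_parquet="my-bucket/lichess_parquet",
                        include_moves=False,
                        fs=fs,
                        ndjson_size=1_000_000)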