File size: 4,147 Bytes
dbff7e5 c7370ec dbff7e5 d637ff8 dbff7e5 c7370ec dbff7e5 c7370ec dbff7e5 0487dc0 dbff7e5 ba0ca62 dbff7e5 ba0ca62 dbff7e5 ba0ca62 dbff7e5 ba0ca62 c7370ec ba0ca62 c7370ec ba0ca62 dbff7e5 ba0ca62 dbff7e5 5752e4d c7370ec 5752e4d f7ad8ae dbff7e5 ba0ca62 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
import pandas as pd
from utils import DATA_DIR, TMP_DIR, transform_to_datetime
def _prune_parquet_file(
    path,
    timestamp_column: str,
    min_date_utc,
    *,
    prepare=None,
    count_label: str = "length",
) -> None:
    """Rewrite one parquet file keeping only rows newer than ``min_date_utc``.

    Args:
        path: location of the parquet file; it is read and then overwritten.
        timestamp_column: column that is converted to UTC datetimes and
            compared against ``min_date_utc``.
        min_date_utc: timezone-aware cutoff; rows whose timestamp is not
            strictly greater than it are dropped.
        prepare: optional callable ``DataFrame -> DataFrame`` run right after
            reading, for file-specific validation or pre-processing.
        count_label: prefix of the two row-count messages that are printed.

    Any exception raised while reading, converting or writing propagates to
    the caller.
    """
    frame = pd.read_parquet(path)
    if prepare is not None:
        frame = prepare(frame)
    frame[timestamp_column] = pd.to_datetime(frame[timestamp_column], utc=True)
    print(f"{count_label} before filtering {len(frame)}")
    # Strict comparison: rows stamped exactly at the cutoff are removed too.
    frame = frame.loc[frame[timestamp_column] > min_date_utc]
    print(f"{count_label} after filtering {len(frame)}")
    frame.to_parquet(path, index=False)


def _normalize_trader_address(tools: pd.DataFrame) -> pd.DataFrame:
    """Lowercase and strip the ``trader_address`` column of the tools table.

    Raises:
        ValueError: if the ``trader_address`` column is missing.
    """
    # Explicit raise instead of ``assert`` so the check survives ``python -O``.
    if "trader_address" not in tools.columns:
        raise ValueError("trader_address column not found")
    tools["trader_address"] = tools["trader_address"].str.lower().str.strip()
    return tools


def _add_fpmm_creation_timestamp(fpmm_trades: pd.DataFrame) -> pd.DataFrame:
    """Derive ``creation_timestamp`` from the ``creationTimestamp`` column."""
    try:
        fpmm_trades["creationTimestamp"] = fpmm_trades["creationTimestamp"].apply(
            transform_to_datetime
        )
    except Exception:
        # Best effort: a failure here is treated as "column is already in a
        # datetime-compatible form" (presumably -- confirm against
        # utils.transform_to_datetime).
        print("Transformation not needed")
    fpmm_trades["creation_timestamp"] = pd.to_datetime(
        fpmm_trades["creationTimestamp"]
    )
    return fpmm_trades


def clean_old_data_from_parquet_files(cutoff_date: str) -> None:
    """Drop rows older than ``cutoff_date`` from the project's parquet files.

    The files ``tools.parquet`` and ``fpmmTrades.parquet`` (under ``TMP_DIR``)
    and ``all_trades_profitability.parquet``, ``unknown_traders.parquet`` and
    ``invalid_trades.parquet`` (under ``DATA_DIR``) are each read, filtered on
    their timestamp column and written back in place.

    Cleaning is best effort per file: an error on one file is printed and the
    remaining files are still processed.

    Args:
        cutoff_date: date string in ``%Y-%m-%d`` format, interpreted as UTC.
            Only rows strictly newer than this instant are kept.

    Raises:
        ValueError: if ``cutoff_date`` does not match ``%Y-%m-%d``.
    """
    print("Cleaning oldest data")
    # Convert the string to a timezone-aware (UTC) timestamp
    min_date_utc = pd.to_datetime(cutoff_date, format="%Y-%m-%d", utc=True)

    # Path expressions are evaluated inside each ``try`` so that any failure
    # for one file never prevents the others from being cleaned.

    # clean tools.parquet
    try:
        _prune_parquet_file(
            TMP_DIR / "tools.parquet",
            "request_time",
            min_date_utc,
            prepare=_normalize_trader_address,
        )
    except Exception as e:
        print(f"Error cleaning tools file {e}")

    # clean all_trades_profitability.parquet
    try:
        _prune_parquet_file(
            DATA_DIR / "all_trades_profitability.parquet",
            "creation_timestamp",
            min_date_utc,
        )
    except Exception as e:
        print(f"Error cleaning all trades profitability file {e}")

    # clean unknown_traders.parquet
    try:
        _prune_parquet_file(
            DATA_DIR / "unknown_traders.parquet",
            "creation_timestamp",
            min_date_utc,
            count_label="length unknown traders",
        )
    except Exception as e:
        print(f"Error cleaning unknown_traders file {e}")

    # clean fpmmTrades.parquet
    try:
        _prune_parquet_file(
            TMP_DIR / "fpmmTrades.parquet",
            "creation_timestamp",
            min_date_utc,
            prepare=_add_fpmm_creation_timestamp,
        )
    except Exception as e:
        print(f"Error cleaning fpmmTrades file {e}")

    # clean invalid_trades.parquet
    try:
        _prune_parquet_file(
            DATA_DIR / "invalid_trades.parquet",
            "creation_timestamp",
            min_date_utc,
        )
    except Exception as e:
        # Previously reported as "fpmmTrades file", which pointed at the
        # wrong parquet file when debugging.
        print(f"Error cleaning invalid trades file {e}")
if __name__ == "__main__":
    # Script entry point: prune every parquet file using a fixed cutoff date.
    clean_old_data_from_parquet_files(cutoff_date="2024-10-25")
|