File size: 4,147 Bytes
dbff7e5
c7370ec
dbff7e5
 
 
d637ff8
dbff7e5
 
 
 
 
c7370ec
dbff7e5
 
 
 
 
 
 
 
 
 
 
 
c7370ec
dbff7e5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0487dc0
 
 
dbff7e5
 
 
 
ba0ca62
dbff7e5
ba0ca62
dbff7e5
ba0ca62
 
dbff7e5
ba0ca62
c7370ec
ba0ca62
 
 
c7370ec
ba0ca62
 
dbff7e5
ba0ca62
dbff7e5
5752e4d
 
 
c7370ec
 
 
 
 
 
 
 
 
5752e4d
 
 
 
 
 
 
 
 
 
 
 
f7ad8ae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dbff7e5
 
ba0ca62
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import pandas as pd
from utils import DATA_DIR, TMP_DIR, transform_to_datetime


def _normalize_trader_address(frame: pd.DataFrame) -> pd.DataFrame:
    """Check that ``trader_address`` exists, then lowercase and strip it.

    Raises:
        ValueError: if the frame has no ``trader_address`` column. An explicit
            raise is used instead of ``assert`` so the check is not removed
            when Python runs with ``-O``.
    """
    if "trader_address" not in frame.columns:
        raise ValueError("trader_address column not found")
    frame["trader_address"] = frame["trader_address"].str.lower().str.strip()
    return frame


def _add_creation_timestamp(frame: pd.DataFrame) -> pd.DataFrame:
    """Derive a ``creation_timestamp`` column from ``creationTimestamp``.

    ``transform_to_datetime`` comes from ``utils``; its exact semantics are
    not visible here (presumably it converts raw values to datetimes --
    confirm in ``utils``). If applying it fails, the column is assumed to be
    in a usable form already and is left as it is.
    """
    try:
        frame["creationTimestamp"] = frame["creationTimestamp"].apply(
            transform_to_datetime
        )
    except Exception:
        # Best effort: any failure is treated as "already converted".
        print("Transformation not needed")
    frame["creation_timestamp"] = pd.to_datetime(frame["creationTimestamp"])
    return frame


def _prune_parquet(path, column, cutoff, *, log_label="", prepare=None) -> None:
    """Rewrite the parquet file at ``path`` keeping only rows newer than ``cutoff``.

    Args:
        path: location of the parquet file; it is read and then overwritten.
        column: name of the timestamp column used for filtering. It is
            converted with ``pd.to_datetime(..., utc=True)`` and the converted
            values are what gets written back.
        cutoff: timezone-aware timestamp; rows with ``column <= cutoff`` are
            dropped.
        log_label: text inserted after "length " in the progress messages
            (include a trailing space when non-empty).
        prepare: optional callable that receives the freshly loaded frame and
            returns the frame to filter (validation / extra columns).

    Any exception from reading, converting or writing propagates to the caller.
    """
    frame = pd.read_parquet(path)
    if prepare is not None:
        frame = prepare(frame)
    frame[column] = pd.to_datetime(frame[column], utc=True)

    print(f"length {log_label}before filtering {len(frame)}")
    frame = frame.loc[frame[column] > cutoff]
    print(f"length {log_label}after filtering {len(frame)}")
    frame.to_parquet(path, index=False)


def clean_old_data_from_parquet_files(cutoff_date: str) -> None:
    """Drop rows dated on or before ``cutoff_date`` from the project's parquet files.

    The files handled are ``tools.parquet`` and ``fpmmTrades.parquet`` under
    ``TMP_DIR`` and ``all_trades_profitability.parquet``,
    ``unknown_traders.parquet`` and ``invalid_trades.parquet`` under
    ``DATA_DIR``. Each file is rewritten in place.

    Args:
        cutoff_date: date string in ``YYYY-MM-DD`` format, interpreted as UTC.
            Only rows whose timestamp is strictly later than it are kept.

    Raises:
        ValueError: if ``cutoff_date`` does not match ``%Y-%m-%d``. Failures
            while processing an individual file are not raised: they are
            printed and the remaining files are still processed.
    """
    print("Cleaning oldest data")
    # Parse the cutoff once as a timezone-aware (UTC) timestamp so it can be
    # compared against the UTC-converted columns.
    min_date_utc = pd.to_datetime(cutoff_date, format="%Y-%m-%d", utc=True)

    # (name used in the error message, file path, timestamp column,
    #  progress-log label, optional preparation hook)
    jobs = (
        (
            "tools",
            TMP_DIR / "tools.parquet",
            "request_time",
            "",
            _normalize_trader_address,
        ),
        (
            "all trades profitability",
            DATA_DIR / "all_trades_profitability.parquet",
            "creation_timestamp",
            "",
            None,
        ),
        (
            "unknown_traders",
            DATA_DIR / "unknown_traders.parquet",
            "creation_timestamp",
            "unknown traders ",
            None,
        ),
        (
            "fpmmTrades",
            TMP_DIR / "fpmmTrades.parquet",
            "creation_timestamp",
            "",
            _add_creation_timestamp,
        ),
        (
            "invalid_trades",
            DATA_DIR / "invalid_trades.parquet",
            "creation_timestamp",
            "",
            None,
        ),
    )

    for name, path, column, log_label, prepare in jobs:
        try:
            _prune_parquet(
                path, column, min_date_utc, log_label=log_label, prepare=prepare
            )
        except Exception as e:
            # Deliberately best effort: a missing or malformed file must not
            # prevent the other files from being cleaned.
            print(f"Error cleaning {name} file {e}")


if __name__ == "__main__":
    # Script entry point: prune every row dated on or before this UTC cutoff.
    clean_old_data_from_parquet_files(cutoff_date="2024-10-25")