#!/usr/bin/env python3
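"""Gradio app for exploring TikTok creator profiles.

Loads profile CSVs from DATA_DIR, caches the combined data as a Parquet
file, and serves overview, search, statistics, and maintenance tabs.
"""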
import json
import time
from pathlib import Path

import pandas as pd
import gradio as gr
import pyarrow  # noqa: F401 -- required by pandas for Parquet I/O
# Configuration
DATA_DIR = Path("../data/tiktok_profiles")
CACHE_FILE = Path("../data/tiktok_profiles_combined.parquet")
PROCESSED_FILES_LOG = Path("../data/processed_files.json")

COLUMNS = [
    "id",
    "unique_id",
    "follower_count",
    "nickname",
    "video_count",
    "following_count",
    "signature",
    "email",
    "bio_link",
    "updated_at",
    "tt_seller",
    "region",
    "language",
    "url",
]
def get_processed_files():
    """
    Get the list of already processed files from the log.
    Returns a set of filenames that have been processed.
    """
    if PROCESSED_FILES_LOG.exists():
        with open(PROCESSED_FILES_LOG, "r") as f:
            return set(json.load(f))
    return set()


def update_processed_files(processed_files):
    """
    Update the log of processed files.
    """
    PROCESSED_FILES_LOG.parent.mkdir(parents=True, exist_ok=True)
    with open(PROCESSED_FILES_LOG, "w") as f:
        json.dump(list(processed_files), f)
def load_data(force_reload=False):
    """
    Load data from either the cache file or from individual CSV files.
    Only processes new files that haven't been processed before.
    Returns a pandas DataFrame with all the data.

    Args:
        force_reload: If True, reprocess all files regardless of whether
            they've been processed before.
    """
    start_time = time.time()

    # Shared dtype map for reading the profile CSVs
    csv_dtypes = {
        "id": "str",
        "unique_id": "str",
        "follower_count": "Int64",
        "nickname": "str",
        "video_count": "Int64",
        "following_count": "Int64",
        "signature": "str",
        "email": "str",
        "bio_link": "str",
        "updated_at": "str",
        "tt_seller": "str",
        "region": "str",
        "language": "str",
        "url": "str",
    }

    # Get all available CSV files
    all_csv_files = {file.name: file for file in DATA_DIR.glob("*.csv")}

    # If cache exists and we're not forcing a reload, load from cache
    if CACHE_FILE.exists() and not force_reload:
        print(f"Loading data from cache file: {CACHE_FILE}")
        df = pd.read_parquet(CACHE_FILE)

        # Check for new files
        processed_files = get_processed_files()
        new_files = [
            all_csv_files[name] for name in all_csv_files if name not in processed_files
        ]
        if not new_files:
            print(
                f"No new files to process. Data loaded in {time.time() - start_time:.2f} seconds"
            )
            return df

        print(f"Found {len(new_files)} new files to process")

        # Process only the new files
        new_dfs = []
        for i, file in enumerate(new_files):
            print(f"Loading new file {i+1}/{len(new_files)}: {file.name}")
            chunk_df = pd.read_csv(file, dtype=csv_dtypes, low_memory=False)
            new_dfs.append(chunk_df)
            processed_files.add(file.name)

        if new_dfs:
            # Combine new data with existing data
            print("Combining new data with existing data...")
            new_data = pd.concat(new_dfs, ignore_index=True)
            df = pd.concat([df, new_data], ignore_index=True)

            # Remove duplicates based on unique_id
            df = df.drop_duplicates(subset=["unique_id"], keep="last")

            # Save updated data to cache file
            print(f"Saving updated data to {CACHE_FILE}")
            df.to_parquet(CACHE_FILE, index=False)

        # Update the processed files log
        update_processed_files(processed_files)
        print(f"Data loaded and updated in {time.time() - start_time:.2f} seconds")
        return df

    # If no cache file or force_reload is True, process all files
    print(f"Loading data from CSV files in {DATA_DIR}")
    csv_files = list(all_csv_files.values())
    total_files = len(csv_files)
    print(f"Found {total_files} CSV files")

    # Load data file by file
    dfs = []
    processed_files = set()
    for i, file in enumerate(csv_files):
        if i % 10 == 0:
            print(f"Loading file {i+1}/{total_files}: {file.name}")
        chunk_df = pd.read_csv(file, dtype=csv_dtypes, low_memory=False)
        dfs.append(chunk_df)
        processed_files.add(file.name)

    # Combine all dataframes
    print("Combining all dataframes...")
    df = pd.concat(dfs, ignore_index=True)

    # Remove duplicates based on unique_id
    df = df.drop_duplicates(subset=["unique_id"], keep="last")

    # Save to cache file
    print(f"Saving combined data to {CACHE_FILE}")
    CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(CACHE_FILE, index=False)

    # Update the processed files log
    update_processed_files(processed_files)
    print(f"Data loaded and cached in {time.time() - start_time:.2f} seconds")
    return df
def search_by_username(df, username):
    """Search for profiles by username (unique_id)"""
    if not username:
        return pd.DataFrame()
    # Case-insensitive search
    results = df[df["unique_id"].str.lower().str.contains(username.lower(), na=False)]
    return results.head(100)  # Limit results to prevent UI overload


def search_by_nickname(df, nickname):
    """Search for profiles by nickname"""
    if not nickname:
        return pd.DataFrame()
    # Case-insensitive search
    results = df[df["nickname"].str.lower().str.contains(nickname.lower(), na=False)]
    return results.head(100)  # Limit results to prevent UI overload


def search_by_follower_count(df, min_followers, max_followers):
    """Search for profiles by follower count range"""
    if min_followers is None:
        min_followers = 0
    if max_followers is None:
        max_followers = df["follower_count"].max()
    results = df[
        (df["follower_count"] >= min_followers)
        & (df["follower_count"] <= max_followers)
    ]
    return results.head(100)  # Limit results to prevent UI overload
def format_results(df):
    """Format the results for display"""
    if df.empty:
        # Return an empty DataFrame with the same columns instead of a string
        return pd.DataFrame(columns=df.columns)

    # Format the DataFrame for display
    display_df = df.copy()

    # Convert count columns to a human-readable format (e.g. 1.2M, 3.4K)
    def format_number(num):
        if pd.isna(num):
            return "N/A"
        if num >= 1_000_000:
            return f"{num/1_000_000:.1f}M"
        elif num >= 1_000:
            return f"{num/1_000:.1f}K"
        return str(num)

    display_df["follower_count"] = display_df["follower_count"].apply(format_number)
    display_df["video_count"] = display_df["video_count"].apply(format_number)
    display_df["following_count"] = display_df["following_count"].apply(format_number)
    return display_df
def combined_search(
    df,
    min_followers,
    max_followers,
    min_videos,
    max_videos,
    signature_query,
    region,
    has_email,
):
    """Combined search function using all criteria"""
    results = df.copy()

    # Apply each filter if provided
    if min_followers is not None:
        results = results[results["follower_count"] >= min_followers]
    if max_followers is not None:
        results = results[results["follower_count"] <= max_followers]
    if min_videos is not None:
        results = results[results["video_count"] >= min_videos]
    if max_videos is not None:
        results = results[results["video_count"] <= max_videos]
    if signature_query:
        results = results[
            results["signature"]
            .str.lower()
            .str.contains(signature_query.lower(), na=False)
        ]
    if region:
        results = results[results["region"].str.lower() == region.lower()]
    # Filter for profiles with email
    if has_email:
        results = results[results["email"].notna() & (results["email"] != "")]

    return results.head(1000)  # Limit to 1000 results to prevent UI overload
def create_interface(df):
    """Create the Gradio interface"""
    # Get min and max follower counts for slider
    min_followers_global = max(1000, int(df["follower_count"].min()))
    max_followers_global = min(10000000, int(df["follower_count"].max()))

    # Get min and max video counts for slider
    min_videos_global = max(1, int(df["video_count"].min()))
    max_videos_global = min(10000, int(df["video_count"].max()))

    # Get unique regions for dropdown
    regions = sorted(df["region"].dropna().unique().tolist())
    regions = [""] + regions  # Add empty option

    with gr.Blocks(title="TikTok Creator Analyzer") as interface:
        gr.Markdown("# TikTok Creator Analyzer")
        gr.Markdown(f"Database contains {len(df):,} creator profiles")

        # Show top 100 profiles by default
        top_profiles = df.sort_values(by="follower_count", ascending=False).head(100)
        default_view = format_results(top_profiles)

        with gr.Tab("Overview"):
            gr.Markdown("## Top 100 Profiles by Follower Count")
            overview_results = gr.Dataframe(value=default_view, label="Top Profiles")
            refresh_btn = gr.Button("Refresh")
            refresh_btn.click(
                fn=lambda: format_results(
                    df.sort_values(by="follower_count", ascending=False).head(100)
                ),
                inputs=[],
                outputs=overview_results,
            )
with gr.Tab("Advanced Search"): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("### Follower Count") | |
min_followers_slider = gr.Slider( | |
minimum=min_followers_global, | |
maximum=max_followers_global, | |
value=min_followers_global, | |
step=1000, | |
label="Minimum Followers", | |
interactive=True, | |
) | |
max_followers_slider = gr.Slider( | |
minimum=min_followers_global, | |
maximum=max_followers_global, | |
value=max_followers_global, | |
step=1000, | |
label="Maximum Followers", | |
interactive=True, | |
) | |
gr.Markdown("### Video Count") | |
min_videos_slider = gr.Slider( | |
minimum=min_videos_global, | |
maximum=max_videos_global, | |
value=min_videos_global, | |
step=10, | |
label="Minimum Videos", | |
interactive=True, | |
) | |
max_videos_slider = gr.Slider( | |
minimum=min_videos_global, | |
maximum=max_videos_global, | |
value=max_videos_global, | |
step=10, | |
label="Maximum Videos", | |
interactive=True, | |
) | |
with gr.Column(scale=1): | |
signature_input = gr.Textbox(label="Keywords in Signature") | |
region_input = gr.Dropdown(label="Region", choices=regions) | |
has_email_checkbox = gr.Checkbox(label="Has Email", value=False) | |
search_btn = gr.Button("Search", variant="primary", size="lg") | |
results_count = gr.Markdown("### Results: 0 profiles found") | |
            # Results table with download controls
            with gr.Row():
                search_results = gr.Dataframe(label="Results")
                download_btn = gr.Button("Download Results as CSV")
            # File output defined in the layout so the download handler
            # below has a rendered component to write the CSV path into
            download_file = gr.File(label="Download")
            # Function to update results count
            def update_results_count(results_df):
                count = len(results_df)
                return f"### Results: {count:,} profiles found"

            # Function to perform search and update results
            def perform_search(
                min_followers,
                max_followers,
                min_videos,
                max_videos,
                signature,
                region,
                has_email,
            ):
                results = combined_search(
                    df,
                    min_followers,
                    max_followers,
                    min_videos,
                    max_videos,
                    signature,
                    region,
                    has_email,
                )
                formatted_results = format_results(results)
                count_text = update_results_count(results)
                return formatted_results, count_text

            # Function to download results as CSV
            def download_results(results_df):
                if results_df.empty:
                    return None
                # Convert back to original format for download
                download_df = df[df["unique_id"].isin(results_df["unique_id"])]
                # Save to temporary CSV file
                temp_csv = "temp_results.csv"
                download_df.to_csv(temp_csv, index=False)
                return temp_csv
            # Connect the search button
            search_btn.click(
                fn=perform_search,
                inputs=[
                    min_followers_slider,
                    max_followers_slider,
                    min_videos_slider,
                    max_videos_slider,
                    signature_input,
                    region_input,
                    has_email_checkbox,
                ],
                outputs=[search_results, results_count],
            )
            # Connect the download button to the file output defined above
            download_btn.click(
                fn=download_results,
                inputs=[search_results],
                outputs=[download_file],
            )
with gr.Tab("Statistics"): | |
gr.Markdown("## Database Statistics") | |
# Calculate some basic statistics | |
total_creators = len(df) | |
total_followers = df["follower_count"].sum() | |
avg_followers = df["follower_count"].mean() | |
median_followers = df["follower_count"].median() | |
max_followers = df["follower_count"].max() | |
stats_md = f""" | |
- Total Creators: {total_creators:,} | |
- Total Followers: {total_followers:,} | |
- Average Followers: {avg_followers:,.2f} | |
- Median Followers: {median_followers:,} | |
- Max Followers: {max_followers:,} | |
""" | |
gr.Markdown(stats_md) | |
with gr.Tab("Maintenance"): | |
gr.Markdown("## Database Maintenance") | |
# Get processed files info | |
processed_files = get_processed_files() | |
maintenance_md = f""" | |
- Total processed files: {len(processed_files)} | |
- Last update: {time.ctime(CACHE_FILE.stat().st_mtime) if CACHE_FILE.exists() else 'Never'} | |
""" | |
gr.Markdown(maintenance_md) | |
with gr.Row(): | |
force_reload_btn = gr.Button("Force Reload All Files") | |
reload_status = gr.Markdown("Click to reload all files from scratch") | |
def reload_all_files(): | |
return "Reloading all files... This may take a while. Please restart the application." | |
force_reload_btn.click( | |
fn=reload_all_files, inputs=[], outputs=reload_status | |
) | |
return interface | |
def main():
    print("Loading TikTok creator data...")
    df = load_data()
    print(f"Loaded {len(df):,} creator profiles")

    # Create and launch the interface
    interface = create_interface(df)
    interface.launch(share=True, server_name="0.0.0.0")


if __name__ == "__main__":
    main()
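# Note: running this script directly builds or refreshes the Parquet cache from
# the CSVs under DATA_DIR and then serves the Gradio UI on all network interfaces.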