#!/usr/bin/env python3
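"""Gradio app for exploring a dataset of TikTok creator profiles.

Profiles are ingested from per-batch CSV files in DATA_DIR, deduplicated on
unique_id, and cached as a single Parquet file for fast startup.
"""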
import json
import os
import tempfile
import time
from pathlib import Path

import gradio as gr
import pandas as pd

# Configuration
DATA_DIR = Path("../data/tiktok_profiles")
CACHE_FILE = Path("../data/tiktok_profiles_combined.parquet")
PROCESSED_FILES_LOG = Path("../data/processed_files.json")
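# Incremental ingest: CSVs in DATA_DIR are merged into a single Parquet cache,
# and PROCESSED_FILES_LOG records which CSVs have already been ingested.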
# Expected CSV schema, shared by every read_csv call below
CSV_DTYPES = {
    "id": "str",
    "unique_id": "str",
    "follower_count": "Int64",
    "nickname": "str",
    "video_count": "Int64",
    "following_count": "Int64",
    "signature": "str",
    "email": "str",
    "bio_link": "str",
    "updated_at": "str",
    "tt_seller": "str",
    "region": "str",
    "language": "str",
    "url": "str",
}
def get_processed_files():
"""
Get the list of already processed files from the log.
Returns a set of filenames that have been processed.
"""
if PROCESSED_FILES_LOG.exists():
with open(PROCESSED_FILES_LOG, "r") as f:
return set(json.load(f))
return set()
def update_processed_files(processed_files):
"""
Update the log of processed files.
"""
PROCESSED_FILES_LOG.parent.mkdir(exist_ok=True)
with open(PROCESSED_FILES_LOG, "w") as f:
json.dump(list(processed_files), f)
def load_data(force_reload=False):
"""
Load data from either the cache file or from individual CSV files.
Only processes new files that haven't been processed before.
Returns a pandas DataFrame with all the data.
Args:
force_reload: If True, reprocess all files regardless of whether they've been processed before.
"""
start_time = time.time()
# Get all available CSV files
all_csv_files = {file.name: file for file in DATA_DIR.glob("*.csv")}
# If cache exists and we're not forcing a reload, load from cache
if CACHE_FILE.exists() and not force_reload:
print(f"Loading data from cache file: {CACHE_FILE}")
df = pd.read_parquet(CACHE_FILE)
# Check for new files
processed_files = get_processed_files()
new_files = [
all_csv_files[name] for name in all_csv_files if name not in processed_files
]
if not new_files:
print(
f"No new files to process. Data loaded in {time.time() - start_time:.2f} seconds"
)
return df
print(f"Found {len(new_files)} new files to process")
# Process only the new files
new_dfs = []
for i, file in enumerate(new_files):
print(f"Loading new file {i+1}/{len(new_files)}: {file.name}")
            # Read the CSV with the shared schema
            chunk_df = pd.read_csv(file, dtype=CSV_DTYPES, low_memory=False)
new_dfs.append(chunk_df)
processed_files.add(file.name)
if new_dfs:
# Combine new data with existing data
print("Combining new data with existing data...")
new_data = pd.concat(new_dfs, ignore_index=True)
df = pd.concat([df, new_data], ignore_index=True)
# Remove duplicates based on unique_id
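            # keep="last" so rows from the most recently loaded files win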
df = df.drop_duplicates(subset=["unique_id"], keep="last")
# Save updated data to cache file
print(f"Saving updated data to {CACHE_FILE}")
df.to_parquet(CACHE_FILE, index=False)
# Update the processed files log
update_processed_files(processed_files)
print(f"Data loaded and updated in {time.time() - start_time:.2f} seconds")
return df
# If no cache file or force_reload is True, process all files
print(f"Loading data from CSV files in {DATA_DIR}")
# Get all CSV files
csv_files = list(all_csv_files.values())
total_files = len(csv_files)
print(f"Found {total_files} CSV files")
# Load data in chunks
dfs = []
processed_files = set()
for i, file in enumerate(csv_files):
if i % 10 == 0:
print(f"Loading file {i+1}/{total_files}: {file.name}")
        # Read the CSV with the shared schema
        chunk_df = pd.read_csv(file, dtype=CSV_DTYPES, low_memory=False)
dfs.append(chunk_df)
processed_files.add(file.name)
# Combine all dataframes
print("Combining all dataframes...")
df = pd.concat(dfs, ignore_index=True)
# Remove duplicates based on unique_id
df = df.drop_duplicates(subset=["unique_id"], keep="last")
# Save to cache file
print(f"Saving combined data to {CACHE_FILE}")
CACHE_FILE.parent.mkdir(exist_ok=True)
df.to_parquet(CACHE_FILE, index=False)
# Update the processed files log
update_processed_files(processed_files)
print(f"Data loaded and cached in {time.time() - start_time:.2f} seconds")
return df
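# Standalone search helpers (not currently wired into the UI, which uses combined_search)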
def search_by_username(df, username):
"""Search for profiles by username (unique_id)"""
if not username:
return pd.DataFrame()
    # Case-insensitive literal substring match (regex disabled so "." or "(" are safe)
    results = df[df["unique_id"].str.contains(username, case=False, na=False, regex=False)]
return results.head(100) # Limit results to prevent UI overload
def search_by_nickname(df, nickname):
"""Search for profiles by nickname"""
if not nickname:
return pd.DataFrame()
    # Case-insensitive literal substring match (regex disabled)
    results = df[df["nickname"].str.contains(nickname, case=False, na=False, regex=False)]
return results.head(100) # Limit results to prevent UI overload
def search_by_follower_count(df, min_followers, max_followers):
"""Search for profiles by follower count range"""
if min_followers is None:
min_followers = 0
if max_followers is None:
max_followers = df["follower_count"].max()
results = df[
(df["follower_count"] >= min_followers)
& (df["follower_count"] <= max_followers)
]
return results.head(100) # Limit results to prevent UI overload
def format_results(df):
"""Format the results for display"""
if df.empty:
# Return an empty DataFrame with the same columns instead of a string
return pd.DataFrame(columns=df.columns)
# Format the DataFrame for display
display_df = df.copy()
# Convert follower count to human-readable format
def format_number(num):
if pd.isna(num):
return "N/A"
if num >= 1_000_000:
return f"{num/1_000_000:.1f}M"
elif num >= 1_000:
return f"{num/1_000:.1f}K"
return str(num)
display_df["follower_count"] = display_df["follower_count"].apply(format_number)
display_df["video_count"] = display_df["video_count"].apply(format_number)
display_df["following_count"] = display_df["following_count"].apply(format_number)
return display_df
def combined_search(
df,
min_followers,
max_followers,
min_videos,
max_videos,
signature_query,
region,
has_email,
):
"""Combined search function using all criteria"""
results = df.copy()
# Apply each filter if provided
if min_followers is not None:
results = results[results["follower_count"] >= min_followers]
if max_followers is not None:
results = results[results["follower_count"] <= max_followers]
if min_videos is not None:
results = results[results["video_count"] >= min_videos]
if max_videos is not None:
results = results[results["video_count"] <= max_videos]
    if signature_query:
        # Literal substring match on the bio text (regex disabled)
        results = results[
            results["signature"].str.contains(
                signature_query, case=False, na=False, regex=False
            )
        ]
if region:
results = results[results["region"].str.lower() == region.lower()]
# Filter for profiles with email
if has_email:
results = results[results["email"].notna() & (results["email"] != "")]
return results.head(1000) # Limit to 1000 results to prevent UI overload
def create_interface(df):
"""Create the Gradio interface"""
    # Slider bounds, clamped to a sane range (1K-10M followers, 1-10K videos)
    min_followers_global = max(1_000, int(df["follower_count"].min()))
    max_followers_global = min(10_000_000, int(df["follower_count"].max()))
    min_videos_global = max(1, int(df["video_count"].min()))
    max_videos_global = min(10_000, int(df["video_count"].max()))
# Get unique regions for dropdown
regions = sorted(df["region"].dropna().unique().tolist())
regions = [""] + regions # Add empty option
with gr.Blocks(title="TikTok Creator Analyzer") as interface:
gr.Markdown("# TikTok Creator Analyzer")
gr.Markdown(f"Database contains {len(df):,} creator profiles")
# Show top 100 profiles by default
top_profiles = df.sort_values(by="follower_count", ascending=False).head(100)
default_view = format_results(top_profiles)
with gr.Tab("Overview"):
gr.Markdown("## Top 100 Profiles by Follower Count")
overview_results = gr.Dataframe(value=default_view, label="Top Profiles")
refresh_btn = gr.Button("Refresh")
refresh_btn.click(
fn=lambda: format_results(
df.sort_values(by="follower_count", ascending=False).head(100)
),
inputs=[],
outputs=overview_results,
)
with gr.Tab("Advanced Search"):
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### Follower Count")
min_followers_slider = gr.Slider(
minimum=min_followers_global,
maximum=max_followers_global,
value=min_followers_global,
step=1000,
label="Minimum Followers",
interactive=True,
)
max_followers_slider = gr.Slider(
minimum=min_followers_global,
maximum=max_followers_global,
value=max_followers_global,
step=1000,
label="Maximum Followers",
interactive=True,
)
gr.Markdown("### Video Count")
min_videos_slider = gr.Slider(
minimum=min_videos_global,
maximum=max_videos_global,
value=min_videos_global,
step=10,
label="Minimum Videos",
interactive=True,
)
max_videos_slider = gr.Slider(
minimum=min_videos_global,
maximum=max_videos_global,
value=max_videos_global,
step=10,
label="Maximum Videos",
interactive=True,
)
with gr.Column(scale=1):
signature_input = gr.Textbox(label="Keywords in Signature")
region_input = gr.Dropdown(label="Region", choices=regions)
has_email_checkbox = gr.Checkbox(label="Has Email", value=False)
search_btn = gr.Button("Search", variant="primary", size="lg")
results_count = gr.Markdown("### Results: 0 profiles found")
            # Results table; the gr.File must be created inside the layout so it
            # can render and receive the exported CSV
            with gr.Row():
                search_results = gr.Dataframe(label="Results")
                download_btn = gr.Button("Download Results as CSV")
                download_file = gr.File(label="Download")
# Function to update results count
def update_results_count(results_df):
count = len(results_df)
return f"### Results: {count:,} profiles found"
# Function to perform search and update results
def perform_search(
min_followers,
max_followers,
min_videos,
max_videos,
signature,
region,
has_email,
):
results = combined_search(
df,
min_followers,
max_followers,
min_videos,
max_videos,
signature,
region,
has_email,
)
formatted_results = format_results(results)
count_text = update_results_count(results)
return formatted_results, count_text
            # Function to export the current results as a CSV for download
            def download_results(results_df):
                if results_df is None or results_df.empty:
                    return None
                # Map the displayed (formatted) rows back to the raw data
                download_df = df[df["unique_id"].isin(results_df["unique_id"])]
                # Write to a unique temp file so concurrent downloads don't collide
                fd, temp_csv = tempfile.mkstemp(suffix=".csv")
                os.close(fd)
                download_df.to_csv(temp_csv, index=False)
                return temp_csv
# Connect the search button
search_btn.click(
fn=perform_search,
inputs=[
min_followers_slider,
max_followers_slider,
min_videos_slider,
max_videos_slider,
signature_input,
region_input,
has_email_checkbox,
],
outputs=[search_results, results_count],
)
            # Connect the download button to the file component created above
            download_btn.click(
                fn=download_results,
                inputs=[search_results],
                outputs=[download_file],
            )
with gr.Tab("Statistics"):
gr.Markdown("## Database Statistics")
# Calculate some basic statistics
total_creators = len(df)
total_followers = df["follower_count"].sum()
avg_followers = df["follower_count"].mean()
median_followers = df["follower_count"].median()
max_followers = df["follower_count"].max()
stats_md = f"""
- Total Creators: {total_creators:,}
- Total Followers: {total_followers:,}
- Average Followers: {avg_followers:,.2f}
        - Median Followers: {median_followers:,.0f}
- Max Followers: {max_followers:,}
"""
gr.Markdown(stats_md)
with gr.Tab("Maintenance"):
gr.Markdown("## Database Maintenance")
# Get processed files info
processed_files = get_processed_files()
maintenance_md = f"""
- Total processed files: {len(processed_files)}
- Last update: {time.ctime(CACHE_FILE.stat().st_mtime) if CACHE_FILE.exists() else 'Never'}
"""
gr.Markdown(maintenance_md)
with gr.Row():
force_reload_btn = gr.Button("Force Reload All Files")
reload_status = gr.Markdown("Click to reload all files from scratch")
            def reload_all_files():
                # Clear the cache and log so a restart rebuilds from the raw CSVs
                CACHE_FILE.unlink(missing_ok=True)
                PROCESSED_FILES_LOG.unlink(missing_ok=True)
                return "Cache cleared. Restart the application to reload all files."
            force_reload_btn.click(
                fn=reload_all_files, inputs=[], outputs=reload_status
            )
return interface
def main():
print("Loading TikTok creator data...")
df = load_data()
print(f"Loaded {len(df):,} creator profiles")
# Create and launch the interface
interface = create_interface(df)
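    # Bind to all network interfaces and create a public Gradio share link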
interface.launch(share=True, server_name="0.0.0.0")
if __name__ == "__main__":
main()