from collections import Counter
import matplotlib.pyplot as plt
import pandas as pd
import praw  # Python Reddit API Wrapper (Reddit API client)
import re # Regular expression module
import streamlit as st
import time
import numpy as np
from wordcloud import WordCloud
from transformers import (
    pipeline,
    AutoTokenizer,
    AutoModelForSequenceClassification,
    AutoModelForTokenClassification,
    TokenClassificationPipeline
)
from transformers.pipelines import AggregationStrategy

# Function to normalize text by replacing multiple spaces/newlines with a single space
def normalize_text(text):
    if not isinstance(text, str):
        return ""
    return re.sub(r'\s+', ' ', text).strip()
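# Illustrative example (input chosen for demonstration):
#   normalize_text("  Reddit\n\npost   text ")  ->  "Reddit post text"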

# ---------- Cached function for scraping Reddit data ----------
# @st.cache_data(show_spinner=False)
def scrape_reddit_data(search_query, total_limit):
    # Retrieve API credentials from st.secrets
    reddit = praw.Reddit(
        client_id=st.secrets["reddit_client_id"],
        client_secret=st.secrets["reddit_client_secret"],
        user_agent=st.secrets["reddit_user_agent"]
    )
    subreddit = reddit.subreddit("all")
    posts_data = []
    # Iterate over submissions based on the search query and limit
    for i, submission in enumerate(subreddit.search(search_query, sort="relevance", limit=total_limit)):
        # No UI updates here as caching does not allow live progress updates
        if submission.title and submission.selftext:
            posts_data.append([
                submission.title,
                submission.url,
                submission.created_utc,
                submission.selftext,
            ])
        time.sleep(0.25)
    df = pd.DataFrame(posts_data, columns=["Title", "URL", "Date", "Detail"])
    for col in ["Title", "Detail"]:
        df[col] = df[col].apply(normalize_text)
    # Filter out rows with empty Title or Detail
    df = df[(df["Title"] != "") & (df["Detail"] != "")]
    df['Date'] = pd.to_datetime(df['Date'], unit='s')
    df = df.sort_values(by="Date", ascending=True).reset_index(drop=True)
    return df
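# Usage sketch (hypothetical query and limit, called from the Streamlit app code):
#   df = scrape_reddit_data("Monster Hunter Wilds", 100)
#   -> DataFrame with Title / URL / Date / Detail columns, sorted by Date ascending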

# ------------------ Sentiment Analysis Functions ------------------------#
def split_text_by_token_limit(text, tokenizer, max_tokens):
    tokens = tokenizer.encode(text, add_special_tokens=False)
    chunks = []
    for i in range(0, len(tokens), max_tokens):
        chunk_tokens = tokens[i:i + max_tokens]
        chunk_text = tokenizer.decode(chunk_tokens, skip_special_tokens=True)
        chunks.append(chunk_text)
    return chunks
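# Illustrative sizing (assumed numbers): with max_tokens=512, a post that encodes to
# 1,300 tokens is decoded back into three chunks of 512, 512, and 276 tokens.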

# def safe_sentiment(sentiment_pipeline, text, length, progress_bar):
#     try:
#         result = sentiment_pipeline(text)[0]
#     except Exception as e:
#         result = None
#     if "count" not in st.session_state:
#         st.session_state.count = 0
#     st.session_state.count += 1
#     progress = st.session_state.count / length
#     progress_bar.progress(progress)
#     return result

def safe_sentiment(sentiment_pipeline, text, length, progress_bar):
    try:
        result = sentiment_pipeline(text)[0]
    except Exception as e:
        result = None
    if "count" not in st.session_state:
        st.session_state.count = 0
    st.session_state.count += 1
    progress = st.session_state.count / length
    # Clamp the progress value between 0.0 and 1.0
    progress = min(max(progress, 0.0), 1.0)
    progress_bar.progress(progress)
    return result


def safe_sentiment_batch(sentiment_pipeline, texts):
    try:
        results = sentiment_pipeline(texts)
    except Exception as e:
        results = [None] * len(texts)
    return results

def analyze_detail(text, tokenizer, sentiment_pipeline, max_tokens):
    text = preprocess_text(text)
    chunks = split_text_by_token_limit(text, tokenizer, max_tokens)
    if not chunks:
        return None
    # Run the sentiment pipeline on all chunks in a single batch
    results = safe_sentiment_batch(sentiment_pipeline, chunks)
    # Accumulate the scores per label across chunks
    scores = {"POSITIVE": 0, "NEGATIVE": 0, "NEUTRAL": 0}
    for result in results:
        if result is not None:
            label = result['label'].upper()
            if label in scores:
                scores[label] += result['score']
    final_label = max(scores, key=lambda k: scores[k])
    final_score = scores[final_label]
    return {"label": final_label, "score": final_score}

def preprocess_text(text):
    # Replace URLs and user mentions with placeholder tokens
    text = re.sub(r'http\S+', 'http', text)
    text = re.sub(r'@\w+', '@user', text)
    return text
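# Illustrative example (hypothetical input):
#   preprocess_text("Check https://example.com @someuser")  ->  "Check http @user"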

def generate_variants(keyword):
    # Split the keyword into individual words
    words = keyword.split()
    # Original keyword
    original = keyword
    # Convert the keyword to all uppercase letters
    all_upper = keyword.upper()
    # Convert the keyword to all lowercase letters
    all_lower = keyword.lower()
    # Concatenate words with each word capitalized (no spaces)
    no_space_title = ''.join(word.capitalize() for word in words)
    # Concatenate words in all uppercase (no spaces)
    no_space_upper = ''.join(word.upper() for word in words)
    # Concatenate words in all lowercase (no spaces)
    no_space_lower = ''.join(word.lower() for word in words)
    # Create a string with only the first letter of each word (e.g., MHW)
    initials = ''.join(word[0].upper() for word in words)
    # Return all variants as a list
    return [original, all_upper, all_lower, no_space_title, no_space_upper, no_space_lower, initials]
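# Illustrative example (keyword chosen arbitrarily):
#   generate_variants("Monster Hunter Wilds") ->
#   ["Monster Hunter Wilds", "MONSTER HUNTER WILDS", "monster hunter wilds",
#    "MonsterHunterWilds", "MONSTERHUNTERWILDS", "monsterhunterwilds", "MHW"]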

# Function to check if a cell contains any excluded keywords
def contains_excluded_keywords(cell, excluded_keywords):
    if isinstance(cell, np.ndarray):
        cell_str = ' '.join(map(str, cell))
        return any(keyword in cell_str for keyword in excluded_keywords)
    elif isinstance(cell, str):
        return any(keyword in cell for keyword in excluded_keywords)
    return False
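# Illustrative example (hypothetical inputs); note the substring check is case-sensitive:
#   contains_excluded_keywords("great combat system", ["combat"])  ->  True
#   contains_excluded_keywords("great Combat system", ["combat"])  ->  False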

# Function to extract terms from a cell
def extract_terms(cell):
    if isinstance(cell, np.ndarray):
        # Convert each element to a string and strip whitespace
        return [str(item).strip() for item in cell if str(item).strip()]
    elif isinstance(cell, str):
        # Split the string by commas and strip whitespace from each term
        return [term.strip() for term in cell.split(',') if term.strip()]
    else:
        return []
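# Illustrative example (hypothetical input):
#   extract_terms("combat, graphics , weapons")  ->  ["combat", "graphics", "weapons"]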

# def remove_excluded_from_list(keywords_list, excluded_keywords):
#     """
#     Remove items from the keywords_list if they contain any of the excluded keywords.
#     This function checks for partial matches in a case-insensitive manner.
#     """
#     if not isinstance(keywords_list, list):
#         return keywords_list  # If it's not a list, return as is
#     filtered_list = []
#     for item in keywords_list:
#         # Check if item contains any excluded keyword (case-insensitive)
#         if any(kw.lower() in item.lower() for kw in excluded_keywords):
#             # Skip this item if it matches an excluded keyword
#             continue
#         else:
#             filtered_list.append(item)
#     return filtered_list

def remove_excluded_from_text(text, excluded_keywords):
    """
    Remove occurrences of any excluded keyword from the text.
    Matching is case-insensitive. Extra whitespace is cleaned.
    """
    if not isinstance(text, str):
        return text
    filtered_text = text
    for kw in excluded_keywords:
        # Create a regex pattern for the keyword (case-insensitive)
        pattern = re.compile(re.escape(kw), re.IGNORECASE)
        # Replace any occurrence of the keyword with an empty string
        filtered_text = pattern.sub("", filtered_text)
    # Remove extra spaces and strip the result
    filtered_text = re.sub(r'\s+', ' ', filtered_text).strip()
    return filtered_text
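# Illustrative example (hypothetical inputs):
#   remove_excluded_from_text("Monster Hunter Wilds review", ["monster hunter"])
#   -> "Wilds review"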

def process_extracted_result(result, excluded_keywords):
    """
    Process an extracted result by removing excluded keywords from each string.
    If result is a list, process each element; if it's a string, process it directly.
    Return a list of non-empty cleaned strings.
    """
    cleaned_items = []
    if isinstance(result, list):
        for item in result:
            cleaned_item = remove_excluded_from_text(item, excluded_keywords)
            if cleaned_item:  # Only add non-empty strings
                cleaned_items.append(cleaned_item)
    elif isinstance(result, str):
        cleaned_item = remove_excluded_from_text(result, excluded_keywords)
        if cleaned_item:
            cleaned_items.append(cleaned_item)
    return cleaned_items
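# Illustrative example (hypothetical inputs): items that become empty after cleaning are dropped:
#   process_extracted_result(["Monster Hunter combat", "Monster Hunter"], ["monster hunter"])
#   -> ["combat"]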