MoneyRadar2 / app-backup-HN.py
openfree's picture
Rename app.py to app-backup-HN.py
d38f8d2 verified
raw
history blame
26.3 kB
import gradio as gr
import requests
import json
import os
from datetime import datetime, timedelta
from huggingface_hub import InferenceClient
MAX_COUNTRY_RESULTS = 100 # 국가별 최대 결과 수
MAX_GLOBAL_RESULTS = 1000 # 전세계 최대 결과 수
def create_article_components(max_results):
article_components = []
for i in range(max_results):
with gr.Group(visible=False) as article_group:
title = gr.Markdown()
image = gr.Image(width=200, height=150)
snippet = gr.Markdown()
info = gr.Markdown()
article_components.append({
'group': article_group,
'title': title,
'image': image,
'snippet': snippet,
'info': info,
'index': i,
})
return article_components
API_KEY = os.getenv("SERPHOUSE_API_KEY")
hf_client = InferenceClient("CohereForAI/c4ai-command-r-plus-08-2024", token=os.getenv("HF_TOKEN"))
# 국가별 언어 코드 매핑
COUNTRY_LANGUAGES = {
"United States": "en",
"United Kingdom": "en",
"Taiwan": "zh-TW", # 대만어(번체 중국어)
"Canada": "en",
"Australia": "en",
"Germany": "de",
"France": "fr",
"Japan": "ja",
# "South Korea": "ko",
"China": "zh",
"India": "hi",
"Brazil": "pt",
"Mexico": "es",
"Russia": "ru",
"Italy": "it",
"Spain": "es",
"Netherlands": "nl",
"Singapore": "en",
"Hong Kong": "zh-HK",
"Indonesia": "id",
"Malaysia": "ms",
"Philippines": "tl",
"Thailand": "th",
"Vietnam": "vi",
"Belgium": "nl",
"Denmark": "da",
"Finland": "fi",
"Ireland": "en",
"Norway": "no",
"Poland": "pl",
"Sweden": "sv",
"Switzerland": "de",
"Austria": "de",
"Czech Republic": "cs",
"Greece": "el",
"Hungary": "hu",
"Portugal": "pt",
"Romania": "ro",
"Turkey": "tr",
"Israel": "he",
"Saudi Arabia": "ar",
"United Arab Emirates": "ar",
"South Africa": "en",
"Argentina": "es",
"Chile": "es",
"Colombia": "es",
"Peru": "es",
"Venezuela": "es",
"New Zealand": "en",
"Bangladesh": "bn",
"Pakistan": "ur",
"Egypt": "ar",
"Morocco": "ar",
"Nigeria": "en",
"Kenya": "sw",
"Ukraine": "uk",
"Croatia": "hr",
"Slovakia": "sk",
"Bulgaria": "bg",
"Serbia": "sr",
"Estonia": "et",
"Latvia": "lv",
"Lithuania": "lt",
"Slovenia": "sl",
"Luxembourg": "fr",
"Malta": "mt",
"Cyprus": "el",
"Iceland": "is"
}
COUNTRY_LOCATIONS = {
"United States": "United States",
"United Kingdom": "United Kingdom",
"Taiwan": "Taiwan", # 국가명 사용
"Canada": "Canada",
"Australia": "Australia",
"Germany": "Germany",
"France": "France",
"Japan": "Japan",
# "South Korea": "South Korea",
"China": "China",
"India": "India",
"Brazil": "Brazil",
"Mexico": "Mexico",
"Russia": "Russia",
"Italy": "Italy",
"Spain": "Spain",
"Netherlands": "Netherlands",
"Singapore": "Singapore",
"Hong Kong": "Hong Kong",
"Indonesia": "Indonesia",
"Malaysia": "Malaysia",
"Philippines": "Philippines",
"Thailand": "Thailand",
"Vietnam": "Vietnam",
"Belgium": "Belgium",
"Denmark": "Denmark",
"Finland": "Finland",
"Ireland": "Ireland",
"Norway": "Norway",
"Poland": "Poland",
"Sweden": "Sweden",
"Switzerland": "Switzerland",
"Austria": "Austria",
"Czech Republic": "Czech Republic",
"Greece": "Greece",
"Hungary": "Hungary",
"Portugal": "Portugal",
"Romania": "Romania",
"Turkey": "Turkey",
"Israel": "Israel",
"Saudi Arabia": "Saudi Arabia",
"United Arab Emirates": "United Arab Emirates",
"South Africa": "South Africa",
"Argentina": "Argentina",
"Chile": "Chile",
"Colombia": "Colombia",
"Peru": "Peru",
"Venezuela": "Venezuela",
"New Zealand": "New Zealand",
"Bangladesh": "Bangladesh",
"Pakistan": "Pakistan",
"Egypt": "Egypt",
"Morocco": "Morocco",
"Nigeria": "Nigeria",
"Kenya": "Kenya",
"Ukraine": "Ukraine",
"Croatia": "Croatia",
"Slovakia": "Slovakia",
"Bulgaria": "Bulgaria",
"Serbia": "Serbia",
"Estonia": "Estonia",
"Latvia": "Latvia",
"Lithuania": "Lithuania",
"Slovenia": "Slovenia",
"Luxembourg": "Luxembourg",
"Malta": "Malta",
"Cyprus": "Cyprus",
"Iceland": "Iceland"
}
MAJOR_COUNTRIES = list(COUNTRY_LOCATIONS.keys())
def translate_query(query, country):
try:
# 영어 입력 확인
if is_english(query):
print(f"영어 검색어 감지 - 원본 사용: {query}")
return query
# 선택된 국가가 번역 지원 국가인 경우
if country in COUNTRY_LANGUAGES:
# South Korea 선택시 한글 입력은 그대로 사용
if country == "South Korea":
print(f"한국 선택 - 원본 사용: {query}")
return query
target_lang = COUNTRY_LANGUAGES[country]
print(f"번역 시도: {query} -> {country}({target_lang})")
url = f"https://translate.googleapis.com/translate_a/single"
params = {
"client": "gtx",
"sl": "auto",
"tl": target_lang,
"dt": "t",
"q": query
}
response = requests.get(url, params=params)
translated_text = response.json()[0][0][0]
print(f"번역 완료: {query} -> {translated_text} ({country})")
return translated_text
return query
except Exception as e:
print(f"번역 오류: {str(e)}")
return query
def translate_to_korean(text):
try:
url = "https://translate.googleapis.com/translate_a/single"
params = {
"client": "gtx",
"sl": "auto",
"tl": "ko",
"dt": "t",
"q": text
}
response = requests.get(url, params=params)
translated_text = response.json()[0][0][0]
return translated_text
except Exception as e:
print(f"한글 번역 오류: {str(e)}")
return text
def is_english(text):
return all(ord(char) < 128 for char in text.replace(' ', '').replace('-', '').replace('_', ''))
def is_korean(text):
return any('\uAC00' <= char <= '\uD7A3' for char in text)
def search_serphouse(query, country, page=1, num_result=10):
url = "https://api.serphouse.com/serp/live"
now = datetime.utcnow()
yesterday = now - timedelta(days=1)
date_range = f"{yesterday.strftime('%Y-%m-%d')},{now.strftime('%Y-%m-%d')}"
translated_query = translate_query(query, country)
print(f"Original query: {query}")
print(f"Translated query: {translated_query}")
payload = {
"data": {
"q": translated_query,
"domain": "google.com",
"loc": COUNTRY_LOCATIONS.get(country, "United States"),
"lang": COUNTRY_LANGUAGES.get(country, "en"),
"device": "desktop",
"serp_type": "news",
"page": "1",
"num": "10",
"date_range": date_range,
"sort_by": "date"
}
}
headers = {
"accept": "application/json",
"content-type": "application/json",
"authorization": f"Bearer {API_KEY}"
}
try:
response = requests.post(url, json=payload, headers=headers)
print("Request payload:", json.dumps(payload, indent=2, ensure_ascii=False))
print("Response status:", response.status_code)
response.raise_for_status()
return {"results": response.json(), "translated_query": translated_query}
except requests.RequestException as e:
return {"error": f"Error: {str(e)}", "translated_query": query}
def format_results_from_raw(response_data):
if "error" in response_data:
return "Error: " + response_data["error"], []
try:
results = response_data["results"]
translated_query = response_data["translated_query"]
news_results = results.get('results', {}).get('results', {}).get('news', [])
if not news_results:
return "검색 결과가 없습니다.", []
articles = []
for idx, result in enumerate(news_results, 1):
articles.append({
"index": idx,
"title": result.get("title", "제목 없음"),
"link": result.get("url", result.get("link", "#")),
"snippet": result.get("snippet", "내용 없음"),
"channel": result.get("channel", result.get("source", "알 수 없음")),
"time": result.get("time", result.get("date", "알 수 없는 시간")),
"image_url": result.get("img", result.get("thumbnail", "")),
"translated_query": translated_query
})
return "", articles
except Exception as e:
return f"결과 처리 중 오류 발생: {str(e)}", []
def serphouse_search(query, country):
response_data = search_serphouse(query, country)
return format_results_from_raw(response_data)
# Hacker News API 관련 함수들 먼저 추가
def get_hn_item(item_id):
"""개별 아이템 정보 가져오기"""
try:
response = requests.get(f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json")
return response.json()
except:
return None
def get_recent_stories():
"""최신 스토리 가져오기"""
try:
response = requests.get("https://hacker-news.firebaseio.com/v0/newstories.json")
story_ids = response.json()
recent_stories = []
current_time = datetime.now().timestamp()
day_ago = current_time - (24 * 60 * 60)
for story_id in story_ids:
story = get_hn_item(story_id)
if story and 'time' in story and story['time'] > day_ago:
recent_stories.append(story)
if len(recent_stories) >= 100:
break
return recent_stories
except Exception as e:
print(f"Error fetching HN stories: {str(e)}")
return []
def format_hn_time(timestamp):
"""Unix timestamp를 읽기 쉬운 형식으로 변환"""
try:
dt = datetime.fromtimestamp(timestamp)
return dt.strftime("%Y-%m-%d %H:%M:%S")
except:
return "Unknown time"
def refresh_hn_stories():
"""Hacker News 스토리 새로고침"""
status_msg = "Hacker News 포스트를 가져오는 중..."
outputs = [gr.update(value=status_msg, visible=True)]
# 모든 컴포넌트 초기화
for _ in hn_article_components:
outputs.extend([
gr.update(visible=False),
gr.update(),
gr.update()
])
# 최신 스토리 가져오기
stories = get_recent_stories()
# 결과 업데이트
outputs = [gr.update(value=f"총 {len(stories)}개의 포스트를 찾았습니다.", visible=True)]
for idx, comp in enumerate(hn_article_components):
if idx < len(stories):
story = stories[idx]
outputs.extend([
gr.update(visible=True),
gr.update(value=f"### [{story.get('title', 'Untitled')}]({story.get('url', '#')})"),
gr.update(value=f"**작성자:** {story.get('by', 'unknown')} | **시간:** {format_hn_time(story.get('time', 0))} | **점수:** {story.get('score', 0)} | **댓글:** {len(story.get('kids', []))}개")
])
else:
outputs.extend([
gr.update(visible=False),
gr.update(),
gr.update()
])
return outputs
css = """
footer {visibility: hidden;}
#status_area {
background: rgba(255, 255, 255, 0.9); /* 약간 투명한 흰색 배경 */
padding: 15px;
border-bottom: 1px solid #ddd;
margin-bottom: 20px;
box-shadow: 0 2px 5px rgba(0,0,0,0.1); /* 부드러운 그림자 효과 */
}
#results_area {
padding: 10px;
margin-top: 10px;
}
/* 탭 스타일 개선 */
.tabs {
border-bottom: 2px solid #ddd !important;
margin-bottom: 20px !important;
}
.tab-nav {
border-bottom: none !important;
margin-bottom: 0 !important;
}
.tab-nav button {
font-weight: bold !important;
padding: 10px 20px !important;
}
.tab-nav button.selected {
border-bottom: 2px solid #1f77b4 !important; /* 선택된 탭 강조 */
color: #1f77b4 !important;
}
/* 검색 상태 메시지 스타일 */
#status_area .markdown-text {
font-size: 1.1em;
color: #2c3e50;
padding: 10px 0;
}
/* 검색 결과 컨테이너 스타일 */
.group {
border: 1px solid #eee;
padding: 15px;
margin-bottom: 15px;
border-radius: 5px;
background: white;
}
/* 검색 버튼 스타일 */
.primary-btn {
background: #1f77b4 !important;
border: none !important;
}
/* 검색어 입력창 스타일 */
.textbox {
border: 1px solid #ddd !important;
border-radius: 4px !important;
}
"""
with gr.Blocks(theme="Nymbo/Nymbo_Theme", css=css, title="NewsAI 서비스") as iface:
with gr.Tabs():
# 국가별 탭
with gr.Tab("국가별"):
gr.Markdown("검색어를 입력하고 원하는 국가(한국 제외)를를 선택하면, 검색어와 일치하는 24시간 이내 뉴스를 최대 100개 출력합니다.")
gr.Markdown("국가 선택후 검색어에 '한글'을 입력하면 현지 언어로 번역되어 검색합니다. 예: 'Taiwan' 국가 선택후 '삼성' 입력시 '三星'으로 자동 검색")
with gr.Column():
with gr.Row():
query = gr.Textbox(label="검색어")
country = gr.Dropdown(MAJOR_COUNTRIES, label="국가", value="South Korea")
status_message = gr.Markdown("", visible=True)
translated_query_display = gr.Markdown(visible=False)
search_button = gr.Button("검색", variant="primary")
progress = gr.Progress()
articles_state = gr.State([])
article_components = []
for i in range(100):
with gr.Group(visible=False) as article_group:
title = gr.Markdown()
image = gr.Image(width=200, height=150)
snippet = gr.Markdown()
info = gr.Markdown()
article_components.append({
'group': article_group,
'title': title,
'image': image,
'snippet': snippet,
'info': info,
'index': i,
})
# 전세계 탭
with gr.Tab("전세계"):
gr.Markdown("검색어를 입력하면 67개국(한국 제외) 전체에 대해 국가별로 구분하여 24시간 이내 뉴스가 최대 1000개 순차 출력됩니다.")
gr.Markdown("국가 선택후 검색어에 '한글'을 입력하면 현지 언어로 번역되어 검색합니다. 예: 'Taiwan' 국가 선택후 '삼성' 입력시 '三星'으로 자동 검색")
with gr.Column():
with gr.Column(elem_id="status_area"):
with gr.Row():
query_global = gr.Textbox(label="검색어")
search_button_global = gr.Button("전세계 검색", variant="primary")
status_message_global = gr.Markdown("")
translated_query_display_global = gr.Markdown("")
with gr.Column(elem_id="results_area"):
articles_state_global = gr.State([])
global_article_components = []
for i in range(1000):
with gr.Group(visible=False) as article_group:
title = gr.Markdown()
image = gr.Image(width=200, height=150)
snippet = gr.Markdown()
info = gr.Markdown()
global_article_components.append({
'group': article_group,
'title': title,
'image': image,
'snippet': snippet,
'info': info,
'index': i,
})
# AI 리포터 탭
with gr.Tab("AI 리포터"):
gr.Markdown("지난 24시간 동안의 Hacker News 포스트를 보여줍니다.")
with gr.Column():
refresh_button = gr.Button("새로고침", variant="primary")
status_message_hn = gr.Markdown("")
with gr.Column(elem_id="hn_results_area"):
hn_articles_state = gr.State([])
hn_article_components = []
for i in range(100):
with gr.Group(visible=False) as article_group:
title = gr.Markdown()
info = gr.Markdown()
hn_article_components.append({
'group': article_group,
'title': title,
'info': info,
'index': i,
})
# 기존 함수들
def search_and_display(query, country, articles_state, progress=gr.Progress()):
status_msg = "검색을 진행중입니다. 잠시만 기다리세요..."
progress(0, desc="검색어 번역 중...")
translated_query = translate_query(query, country)
translated_display = f"**원본 검색어:** {query}\n**번역된 검색어:** {translated_query}" if translated_query != query else f"**검색어:** {query}"
progress(0.2, desc="검색 시작...")
error_message, articles = serphouse_search(query, country)
progress(0.5, desc="결과 처리 중...")
outputs = []
outputs.append(gr.update(value=status_msg, visible=True))
outputs.append(gr.update(value=translated_display, visible=True))
if error_message:
outputs.append(gr.update(value=error_message, visible=True))
for comp in article_components:
outputs.extend([
gr.update(visible=False), gr.update(), gr.update(),
gr.update(), gr.update()
])
articles_state = []
else:
outputs.append(gr.update(value="", visible=False))
total_articles = len(articles)
for idx, comp in enumerate(article_components):
progress((idx + 1) / total_articles, desc=f"결과 표시 중... {idx + 1}/{total_articles}")
if idx < len(articles):
article = articles[idx]
image_url = article['image_url']
image_update = gr.update(value=image_url, visible=True) if image_url and not image_url.startswith('data:image') else gr.update(value=None, visible=False)
korean_summary = translate_to_korean(article['snippet'])
outputs.extend([
gr.update(visible=True),
gr.update(value=f"### [{article['title']}]({article['link']})"),
image_update,
gr.update(value=f"**요약:** {article['snippet']}\n\n**한글 요약:** {korean_summary}"),
gr.update(value=f"**출처:** {article['channel']} | **시간:** {article['time']}")
])
else:
outputs.extend([
gr.update(visible=False), gr.update(), gr.update(),
gr.update(), gr.update()
])
articles_state = articles
progress(1.0, desc="완료!")
outputs.append(articles_state)
outputs[0] = gr.update(value="", visible=False)
return outputs
def search_global(query, articles_state_global):
status_msg = "전세계 검색을 시작합니다..."
all_results = []
outputs = [
gr.update(value=status_msg, visible=True),
gr.update(value=f"**검색어:** {query}", visible=True),
]
for _ in global_article_components:
outputs.extend([
gr.update(visible=False), gr.update(), gr.update(),
gr.update(), gr.update()
])
outputs.append([])
yield outputs
total_countries = len(COUNTRY_LOCATIONS)
for idx, (country, location) in enumerate(COUNTRY_LOCATIONS.items(), 1):
try:
status_msg = f"{country} 검색 중... ({idx}/{total_countries} 국가)"
outputs[0] = gr.update(value=status_msg, visible=True)
yield outputs
error_message, articles = serphouse_search(query, country)
if not error_message and articles:
for article in articles:
article['source_country'] = country
all_results.extend(articles)
sorted_results = sorted(all_results, key=lambda x: x.get('time', ''), reverse=True)
seen_urls = set()
unique_results = []
for article in sorted_results:
url = article.get('link', '')
if url not in seen_urls:
seen_urls.add(url)
unique_results.append(article)
unique_results = unique_results[:1000]
outputs = [
gr.update(value=f"{idx}/{total_countries} 국가 검색 완료\n현재까지 발견된 뉴스: {len(unique_results)}건", visible=True),
gr.update(value=f"**검색어:** {query}", visible=True),
]
for idx, comp in enumerate(global_article_components):
if idx < len(unique_results):
article = unique_results[idx]
image_url = article.get('image_url', '')
image_update = gr.update(value=image_url, visible=True) if image_url and not image_url.startswith('data:image') else gr.update(value=None, visible=False)
korean_summary = translate_to_korean(article['snippet'])
outputs.extend([
gr.update(visible=True),
gr.update(value=f"### [{article['title']}]({article['link']})"),
image_update,
gr.update(value=f"**요약:** {article['snippet']}\n\n**한글 요약:** {korean_summary}"),
gr.update(value=f"**출처:** {article['channel']} | **국가:** {article['source_country']} | **시간:** {article['time']}")
])
else:
outputs.extend([
gr.update(visible=False), gr.update(), gr.update(),
gr.update(), gr.update()
])
outputs.append(unique_results)
yield outputs
except Exception as e:
print(f"Error searching {country}: {str(e)}")
continue
final_status = f"검색 완료! 총 {len(unique_results)}개의 뉴스가 발견되었습니다."
outputs[0] = gr.update(value=final_status, visible=True)
yield outputs
# 국가별 탭 이벤트 연결
search_outputs = [
status_message,
translated_query_display,
gr.Markdown(visible=False)
]
for comp in article_components:
search_outputs.extend([
comp['group'], comp['title'], comp['image'],
comp['snippet'], comp['info']
])
search_outputs.append(articles_state)
search_button.click(
search_and_display,
inputs=[query, country, articles_state],
outputs=search_outputs,
show_progress=True
)
# 전세계 탭 이벤트 연결
global_search_outputs = [
status_message_global,
translated_query_display_global,
]
for comp in global_article_components:
global_search_outputs.extend([
comp['group'], comp['title'], comp['image'],
comp['snippet'], comp['info']
])
global_search_outputs.append(articles_state_global)
search_button_global.click(
search_global,
inputs=[query_global, articles_state_global],
outputs=global_search_outputs
)
# AI 리포터 탭 이벤트 연결
hn_outputs = [status_message_hn]
for comp in hn_article_components:
hn_outputs.extend([
comp['group'],
comp['title'],
comp['info']
])
refresh_button.click(
refresh_hn_stories,
outputs=hn_outputs
)
iface.launch(auth=("it1","chosun1"))