|
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler |
|
from apscheduler.triggers.interval import IntervalTrigger |
|
from config import SETTINGS, load_feeds |
|
import logging |
|
from fetch_news import NewsFetcher |
|
from filter_news import NewsFilter |
|
from telegram_bot import TelegramBot |
|
import asyncio |
|
|
|
# Module-level logger, standard `getLogger(__name__)` idiom so records carry this module's name.
logger = logging.getLogger(__name__)
|
|
|
class NewsScheduler:
    """Orchestrate periodic feed fetching, filtering and Telegram delivery.

    Wires an APScheduler ``AsyncIOScheduler`` with one interval job per
    configured feed; each job fetches fresh articles, filters them and puts
    the survivors on the Telegram bot's message queue.
    """

    def __init__(self):
        self.scheduler = AsyncIOScheduler()
        # Links of articles already handled, used to suppress duplicates.
        self.processed_links = set()
        # Upper bound on how many processed links we remember.
        self.max_history = 1000
        self.filter = NewsFilter()
        self.bot = TelegramBot()
        self.running = False
        # Reference to the bot's queue-consumer task (set in start()).
        self._queue_task = None

    async def start(self):
        """Start the scheduler and the bot's queue consumer (idempotent)."""
        if not self.running:
            self.scheduler.start()
            self.running = True
            # Keep a reference: a task created via create_task() with no
            # surviving reference may be garbage-collected before it finishes.
            self._queue_task = asyncio.create_task(self.bot.process_queue())
            logger.info("Scheduler gestartet mit %d Jobs", len(self.scheduler.get_jobs()))

    async def shutdown(self):
        """Stop the scheduler and close the bot's HTTP session."""
        if self.running:
            # APScheduler 3.x shutdown() is synchronous (start() above is
            # likewise called without await); awaiting its None return value
            # would raise TypeError.
            self.scheduler.shutdown()
            await self.bot.session.close()
            self.running = False

    async def job_wrapper(self, feed: dict):
        """Fetch, filter and enqueue articles for a single feed.

        Args:
            feed: Feed configuration mapping; read keys are ``"type"``
                (``"rss"`` selects the RSS path, anything else the API path)
                and ``"name"`` (used in error logging).
        """
        try:
            async with NewsFetcher() as fetcher:
                if feed["type"] == "rss":
                    articles = await fetcher.process_rss(feed, self.processed_links)
                else:
                    articles = await fetcher.get_news_api(feed, self.processed_links)

            filtered = self.filter.filter_articles(articles)
            for article in filtered:
                await self.bot.message_queue.put(article)

            self.cleanup_processed_links()
        except Exception as e:
            # Broad catch by design: one failing feed must not kill the
            # scheduler job. Lazy %-args avoid formatting when not emitted.
            logger.error("Job für %s fehlgeschlagen: %s", feed["name"], e)

    def cleanup_processed_links(self):
        """Trim the processed-links history to at most ``max_history`` entries.

        NOTE(review): sets are unordered, so the entries dropped here are
        arbitrary, not necessarily the oldest — confirm that is acceptable.
        """
        if len(self.processed_links) > self.max_history:
            self.processed_links = set(list(self.processed_links)[-self.max_history:])

    def add_jobs(self):
        """Register one interval job per feed returned by load_feeds()."""
        for feed in load_feeds():
            trigger = IntervalTrigger(
                # Per-feed interval with fallback to the global default.
                minutes=feed.get("interval", SETTINGS["check_interval"])
            )
            self.scheduler.add_job(
                self.job_wrapper,
                trigger=trigger,
                kwargs={"feed": feed},
                # Allow a few overlapping runs if a fetch is slow.
                max_instances=3,
                name=f"Feed: {feed['name']}"
            )