# Test2 / scheduler.py
# Dunevhhhh's picture
# Update scheduler.py
# 17ad0be verified
# scheduler.py
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from config import SETTINGS, load_feeds
import logging
from fetch_news import NewsFetcher
from filter_news import NewsFilter
from telegram_bot import TelegramBot
import asyncio
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class NewsScheduler:
    """Run one interval job per configured feed and queue the results for the bot.

    Each job fetches a feed through ``NewsFetcher``, passes the articles
    through ``NewsFilter`` and puts the survivors on the Telegram bot's
    message queue.  ``processed_links`` is handed to the fetcher on every
    run and is capped at ``max_history`` entries after each job.
    """

    def __init__(self):
        """Create the scheduler, filter and bot; nothing is started here."""
        self.scheduler = AsyncIOScheduler()
        # Shared with every fetcher call (see job_wrapper); presumably the
        # fetchers record seen links in it -- verify against fetch_news.
        self.processed_links = set()
        self.max_history = 1000
        self.filter = NewsFilter()
        self.bot = TelegramBot()
        self.running = False
        # Strong reference to the queue-consumer task: the event loop only
        # keeps weak references, so an unreferenced task may be collected.
        self._queue_task = None

    async def start(self):
        """Start the scheduler and the bot's queue consumer (idempotent)."""
        if self.running:
            return
        self.scheduler.start()
        self.running = True
        self._queue_task = asyncio.create_task(self.bot.process_queue())
        logger.info(
            "Scheduler gestartet mit %d Jobs", len(self.scheduler.get_jobs())
        )

    async def shutdown(self):
        """Stop the scheduler, cancel the queue consumer and close the bot session.

        Does nothing when the scheduler is not running.  The session is
        closed and ``running`` is reset even if stopping the scheduler raises.
        """
        if not self.running:
            return
        try:
            # AsyncIOScheduler.shutdown() is a plain synchronous call that
            # returns None; awaiting its result raises TypeError.
            self.scheduler.shutdown()
        finally:
            self.running = False
            consumer = self._queue_task
            self._queue_task = None
            if consumer is not None:
                # Stop the consumer before the session it uses goes away.
                consumer.cancel()
            await self.bot.session.close()

    async def job_wrapper(self, feed: dict):
        """Fetch one feed, filter its articles and enqueue them for the bot.

        Args:
            feed: Feed configuration; ``feed["type"] == "rss"`` selects
                ``process_rss``, anything else ``get_news_api``.
                ``feed["name"]`` is used in the failure log message.

        Any exception is logged (with traceback) and swallowed so that a
        failing feed does not propagate out of the scheduled job.
        """
        try:
            async with NewsFetcher() as fetcher:
                if feed["type"] == "rss":
                    articles = await fetcher.process_rss(
                        feed, self.processed_links
                    )
                else:
                    articles = await fetcher.get_news_api(
                        feed, self.processed_links
                    )
                for article in self.filter.filter_articles(articles):
                    await self.bot.message_queue.put(article)
                self.cleanup_processed_links()
        except Exception as exc:
            logger.exception(
                "Job für %s fehlgeschlagen: %s", feed["name"], exc
            )

    def cleanup_processed_links(self):
        """Shrink ``processed_links`` to at most ``max_history`` entries.

        The set is trimmed in place so that any other holder of the same
        set object keeps seeing the live collection.  A set has no order,
        so which entries are evicted is arbitrary.
        """
        surplus = len(self.processed_links) - max(self.max_history, 0)
        # range() of a non-positive number is empty, so this is a no-op
        # while the set is within the limit.
        for _ in range(surplus):
            self.processed_links.pop()

    def add_jobs(self):
        """Register one interval job per feed returned by ``load_feeds()``.

        A feed's own ``"interval"`` (minutes) takes precedence over
        ``SETTINGS["check_interval"]``.
        """
        for feed in load_feeds():
            minutes = feed.get("interval", SETTINGS["check_interval"])
            self.scheduler.add_job(
                self.job_wrapper,
                trigger=IntervalTrigger(minutes=minutes),
                kwargs={"feed": feed},
                max_instances=3,
                name=f"Feed: {feed['name']}",
            )