import re | |
import pandas as pd | |
from urllib.parse import urlparse, parse_qs | |
from preprocessText import preprocess | |
from googleapiclient.discovery import build | |
import isodate | |
import os | |
apiKeys = [ | |
'AIzaSyC7KzwigUsNJ4KNvqGfPqXVK9QcDBsKU78', | |
'AIzaSyC7KzwigUsNJ4KNvqGfPqXVK9QcDBsKU78', | |
'AIzaSyC7KzwigUsNJ4KNvqGfPqXVK9QcDBsKU78', | |
] | |
class YouTubeService: | |
def __init__(self, api_key): | |
self.api_key = api_key | |
self.service = build('youtube', 'v3', developerKey=api_key) | |
def switch_api_key(self): | |
current_key_index = apiKeys.index(self.api_key) | |
next_key_index = (current_key_index + 1) % len(apiKeys) | |
self.api_key = apiKeys[next_key_index] | |
self.service = build('youtube', 'v3', developerKey=self.api_key) | |
# Initialize the YouTube service with the first API key | |
youtube = YouTubeService(apiKeys[0]) | |
def get_next_api_key(): | |
current_key_index = apiKeys.index(youtube.api_key) | |
next_key_index = (current_key_index + 1) % len(apiKeys) | |
youtube.switch_api_key() | |
return apiKeys[next_key_index] | |
def get_video_id(url): | |
video_id = None | |
parsed_url = urlparse(url) | |
query_params = parse_qs(parsed_url.query) | |
if parsed_url.netloc == '': | |
video_id = parsed_url.path[1:] | |
elif parsed_url.netloc in ('', ''): | |
if 'v' in query_params: | |
video_id = query_params['v'][0] | |
return video_id | |
def get_video_metadata(video_id): | |
try: | |
api_key = get_next_api_key() | |
youtube = build('youtube', 'v3', developerKey=api_key) | |
response = youtube.videos().list( | |
part='snippet,contentDetails,statistics', | |
id=video_id | |
).execute() | |
if 'items' in response and len(response['items']) > 0: | |
video = response['items'][0] | |
try: | |
comments = video['statistics']['commentCount'] | |
except KeyError: | |
comments = 0 | |
metadata = { | |
'title': video['snippet']['title'], | |
'description': video['snippet']['description'], | |
'channel_title': video['snippet']['channelTitle'], | |
'publish_date': video['snippet']['publishedAt'], | |
'duration': video['contentDetails']['duration'], | |
'views': video['statistics']['viewCount'], | |
'likes': video['statistics']['likeCount'], | |
'comments': comments, | |
'category_id': video['snippet']['categoryId'], | |
'thumbnail_link': video['snippet']['thumbnails']['default']['url'] | |
} | |
return metadata | |
except Exception as e: | |
print("An error occurred:", str(e)) | |
return None | |
def get_metadata(url): | |
video_id = get_video_id(url) | |
metadata = get_video_metadata(video_id) | |
if metadata is not None: | |
df = pd.DataFrame([metadata]) | |
df['duration'] = df['duration'].apply(lambda x: isodate.parse_duration(x).total_seconds()) | |
df['cleanTitle'] = df['title'].apply(preprocess) | |
df['cleanTitle'] = df['cleanTitle'].apply(lambda x: ' '.join(x)) | |
df['titleLength'] = df['title'].apply(lambda x: len(x)) | |
df['descriptionLength'] = df['description'].apply(lambda x: len(x)) | |
df['thumbnail_link'] = df['thumbnail_link'].str.replace('default.jpg', 'maxresdefault.jpg') | |
return df | |
else: | |
return 0 | |
def get_trending_videos(country_code): | |
try: | |
api_key = get_next_api_key() | |
youtube = build('youtube', 'v3', developerKey=api_key) | |
try: | |
response = youtube.videos().list( | |
part='snippet,contentDetails,statistics', | |
chart='mostPopular', | |
regionCode=country_code, | |
maxResults=10 | |
).execute() | |
trending_videos = [] | |
for item in response['items']: | |
title = item['snippet']['title'] | |
description = item['snippet']['description'], | |
channel_title = item['snippet']['channelTitle'] | |
publish_date = item['snippet']['publishedAt'] | |
duration = item['contentDetails']['duration'] | |
views = item['statistics']['viewCount'] | |
try: | |
likes = item['statistics']['likeCount'] | |
except KeyError: | |
likes = "Hidden!" | |
try: | |
comments = item['statistics']['commentCount'] | |
except KeyError: | |
comments = "Hidden!" | |
category_id = item['snippet']['categoryId'] | |
thumbnail_link = item['snippet']['thumbnails']['default']['url'] | |
duration = isodate.parse_duration(duration) | |
duration = duration.total_seconds() | |
trending_videos.append({ | |
'title': title, | |
'description':description, | |
'channel_title': channel_title, | |
'publish_date': publish_date, | |
'duration': duration, | |
'views': views, | |
'likes': likes, | |
'comments': comments, | |
'category_id': category_id, | |
'thumbnail_link': thumbnail_link | |
}) | |
df = pd.DataFrame(trending_videos) | |
df['views'] = df['views'].astype(int) | |
df['likes'] = df['likes'].astype(str) | |
df['comments'] = df['comments'].astype(str) | |
df['category_id'] = df['category_id'].astype(int) | |
df['thumbnail_link'] = df['thumbnail_link'].str.replace('default.jpg', 'maxresdefault.jpg') | |
return df | |
except Exception as e: | |
print('An error occurred:', str(e)) | |
return None | |
except Exception as e: | |
print("An error occurred:", str(e)) |