# Standard library imports
import base64
import datetime
import os

# Related third-party imports
import cohere
import pandas as pd
import requests
import searchconsole
import streamlit as st
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from google_auth_oauthlib.flow import Flow
from googleapiclient.discovery import build
from sklearn.metrics.pairwise import cosine_similarity
from streamlit_elements import elements

load_dotenv()

# Initialize Cohere client
COHERE_API_KEY = os.environ["COHERE_API_KEY"]
co = cohere.Client(COHERE_API_KEY)

# Configuration: Set to True if running locally, False if running on Streamlit Cloud
IS_LOCAL = False

# Constants
SEARCH_TYPES = ["web", "image", "video", "news", "discover", "googleNews"]
DATE_RANGE_OPTIONS = [
    "Last 7 Days",
    "Last 30 Days",
    "Last 3 Months",
    "Last 6 Months",
    "Last 12 Months",
    "Last 16 Months",
    "Custom Range"
]
DEVICE_OPTIONS = ["All Devices", "desktop", "mobile", "tablet"]
BASE_DIMENSIONS = ["page", "query", "country", "date"]
MAX_ROWS = 250_000
DF_PREVIEW_ROWS = 100
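
# Illustrative .env layout (an assumption: the keys simply mirror the
# os.environ lookups made in this script; the values are placeholders,
# not real credentials):
#
#   COHERE_API_KEY=your-cohere-api-key
#   CLIENT_ID=your-google-oauth-client-id
#   CLIENT_SECRET=your-google-oauth-client-secret
#   REDIRECT_URI=https://your-app-url/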

# -------------
# Streamlit App Configuration
# -------------

def setup_streamlit():
    """
    Configures Streamlit's page settings and displays the app title and markdown information.
    Sets the page layout, title, and markdown content with links and app description.
    """
    st.set_page_config(page_title="✨ Simple Google Search Console Data | LeeFoot.co.uk", layout="wide")
    st.title("✨ Simple Google Search Console Data | June 2024")
    st.markdown(f"### Lightweight GSC Data Extractor. (Max {MAX_ROWS:,} Rows)")
    st.markdown(
        """
        Created by LeeFootSEO | More Apps & Scripts on my Website
        """,
        unsafe_allow_html=True
    )
    st.divider()


def init_session_state():
    """
    Initialises or updates the Streamlit session state variables for property selection,
    search type, date range, dimensions, and device type.
    """
    if 'selected_property' not in st.session_state:
        st.session_state.selected_property = None
    if 'selected_search_type' not in st.session_state:
        st.session_state.selected_search_type = 'web'
    if 'selected_date_range' not in st.session_state:
        st.session_state.selected_date_range = 'Last 7 Days'
    if 'start_date' not in st.session_state:
        st.session_state.start_date = datetime.date.today() - datetime.timedelta(days=7)
    if 'end_date' not in st.session_state:
        st.session_state.end_date = datetime.date.today()
    if 'selected_dimensions' not in st.session_state:
        st.session_state.selected_dimensions = ['page', 'query']
    if 'selected_device' not in st.session_state:
        st.session_state.selected_device = 'All Devices'
    if 'custom_start_date' not in st.session_state:
        st.session_state.custom_start_date = datetime.date.today() - datetime.timedelta(days=7)
    if 'custom_end_date' not in st.session_state:
        st.session_state.custom_end_date = datetime.date.today()


def fetch_content(url):
    """
    Fetches the text content of a webpage.
    """
    try:
        response = requests.get(url, timeout=30)  # timeout guards against requests that never return
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        content = soup.get_text(separator=' ', strip=True)
        return content
    except requests.RequestException as e:
        return str(e)


def generate_embeddings(text_list):
    """
    Generates embeddings for a list of texts using Cohere's API.
    """
    if not text_list:
        return []
    model = 'embed-english-v3.0'
    input_type = 'search_document'
    response = co.embed(model=model, texts=text_list, input_type=input_type)
    embeddings = response.embeddings
    return embeddings


def calculate_relevancy_scores(df):
    """
    Calculates relevancy scores for each row in the dataframe.
    """
    try:
        st.write("Calculating relevancy scores...")
        st.write(f"Input DataFrame shape: {df.shape}")
        st.write(f"Input DataFrame columns: {df.columns}")
        page_contents = [fetch_content(url) for url in df['page']]
        st.write(f"Fetched {len(page_contents)} page contents")
        page_embeddings = generate_embeddings(page_contents)
        st.write(f"Generated {len(page_embeddings)} page embeddings")
        query_embeddings = generate_embeddings(df['query'].tolist())
        st.write(f"Generated {len(query_embeddings)} query embeddings")
        relevancy_scores = cosine_similarity(query_embeddings, page_embeddings).diagonal()
        st.write(f"Calculated {len(relevancy_scores)} relevancy scores")
        st.write(f"Sample relevancy scores: {relevancy_scores[:5]}")
        df = df.assign(relevancy_score=relevancy_scores)
        st.write("Assigned relevancy scores to DataFrame")
        st.write(f"DataFrame shape after assigning scores: {df.shape}")
        st.write(f"DataFrame columns after assigning scores: {df.columns}")
        st.write(f"Sample relevancy scores from DataFrame: {df['relevancy_score'].head()}")
    except Exception as e:
        st.warning(f"Error calculating relevancy scores: {e}")
        df = df.assign(relevancy_score=0)  # Default value if calculation fails
    return df
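
# Illustrative example (not executed by the app) of how the diagonal of the
# similarity matrix in calculate_relevancy_scores pairs query i with page i:
#
#   >>> from sklearn.metrics.pairwise import cosine_similarity
#   >>> cosine_similarity([[1, 0], [0, 1]], [[1, 0], [1, 1]]).diagonal().round(3)
#   array([1.   , 0.707])
#
# Element i of the diagonal is the cosine similarity between the i-th query
# embedding and the i-th page embedding; off-diagonal pairings are discarded.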
""" with st.spinner('Fetching data and calculating relevancy scores...'): df = fetch_gsc_data(webproperty, search_type, start_date, end_date, dimensions, device_type) if not df.empty: df = calculate_relevancy_scores(df) st.write(f"Data fetched. Shape: {df.shape}") return df # ------------- # Google Authentication Functions # ------------- def load_config(): """ Loads the Google API client configuration from Streamlit secrets. Returns a dictionary with the client configuration for OAuth. """ client_config = { "installed": { "client_id": os.environ["CLIENT_ID"], "client_secret": os.environ["CLIENT_SECRET"], "redirect_uris": [os.environ["REDIRECT_URI"]], }} return client_config def init_oauth_flow(client_config): """ Initialises the OAuth flow for Google API authentication using the client configuration. Sets the necessary scopes and returns the configured Flow object. """ scopes = ["https://www.googleapis.com/auth/webmasters"] return Flow.from_client_config( client_config, scopes=scopes, redirect_uri=client_config["installed"]["redirect_uris"][0], ) def google_auth(client_config): """ Starts the Google authentication process using OAuth. Generates and returns the OAuth flow and the authentication URL. """ flow = init_oauth_flow(client_config) auth_url, _ = flow.authorization_url(prompt="consent") return flow, auth_url def auth_search_console(client_config, credentials): """ Authenticates the user with the Google Search Console API using provided credentials. Returns an authenticated searchconsole client. """ token = { "token": credentials.token, "refresh_token": credentials.refresh_token, "token_uri": credentials.token_uri, "client_id": credentials.client_id, "client_secret": credentials.client_secret, "scopes": credentials.scopes, "id_token": getattr(credentials, "id_token", None), } return searchconsole.authenticate(client_config=client_config, credentials=token) # ------------- # Data Fetching Functions # ------------- def list_gsc_properties(credentials): """ Lists all Google Search Console properties accessible with the given credentials. Returns a list of property URLs or a message if no properties are found. """ service = build('webmasters', 'v3', credentials=credentials) site_list = service.sites().list().execute() return [site['siteUrl'] for site in site_list.get('siteEntry', [])] or ["No properties found"] def fetch_gsc_data(webproperty, search_type, start_date, end_date, dimensions, device_type=None): """ Fetches Google Search Console data for a specified property, date range, dimensions, and device type. Handles errors and returns the data as a DataFrame. """ query = webproperty.query.range(start_date, end_date).search_type(search_type).dimension(*dimensions) if 'device' in dimensions and device_type and device_type != 'All Devices': query = query.filter('device', 'equals', device_type.lower()) try: df = query.limit(MAX_ROWS).get().to_dataframe() return process_gsc_data(df) except Exception as e: show_error(e) return pd.DataFrame() def process_gsc_data(df): """ Processes the GSC data to return only unique pages with their first query and relevancy score. 
""" st.write("Processing GSC data...") st.write(f"Input DataFrame shape: {df.shape}") st.write(f"Input DataFrame columns: {df.columns}") # Sort the dataframe by page and clicks (descending) to get the most relevant query first df_sorted = df.sort_values(['page', 'clicks'], ascending=[True, False]) # Get the first occurrence of each page (which will be the one with the highest clicks) df_unique = df_sorted.drop_duplicates(subset='page', keep='first').copy() st.write(f"Unique pages DataFrame shape: {df_unique.shape}") st.write(f"Unique pages DataFrame columns: {df_unique.columns}") # Ensure 'relevancy_score' column exists and is preserved if 'relevancy_score' not in df_unique.columns: st.write("Relevancy score column not found, adding default values") df_unique['relevancy_score'] = 0 # Default value if column doesn't exist else: st.write("Preserving relevancy scores") # Make sure to keep the original relevancy scores df_unique['relevancy_score'] = df_sorted.groupby('page')['relevancy_score'].first().values # Select only the relevant columns, including the relevancy_score result = df_unique[['page', 'query', 'clicks', 'impressions', 'ctr', 'position', 'relevancy_score']] st.write(f"Processed data. Shape: {result.shape}") st.write(f"Columns: {result.columns}") st.write(f"Sample relevancy scores: {result['relevancy_score'].head()}") return result def fetch_data_loading(webproperty, search_type, start_date, end_date, dimensions, device_type=None): """ Fetches Google Search Console data with a loading indicator and calculates relevancy scores. """ with st.spinner('Fetching data and calculating relevancy scores...'): df = fetch_gsc_data(webproperty, search_type, start_date, end_date, dimensions, device_type) st.write(f"Data fetched. Shape: {df.shape}") st.write(f"Columns: {df.columns}") if not df.empty: df = calculate_relevancy_scores(df) st.write("Relevancy scores calculated.") st.write(f"DataFrame shape after calculating scores: {df.shape}") st.write(f"DataFrame columns after calculating scores: {df.columns}") st.write(f"Sample relevancy scores after calculation: {df['relevancy_score'].head()}") processed_df = process_gsc_data(df) st.write("Data processed.") st.write(f"Final DataFrame shape: {processed_df.shape}") st.write(f"Final DataFrame columns: {processed_df.columns}") st.write(f"Final sample relevancy scores: {processed_df['relevancy_score'].head()}") return processed_df """ Fetches Google Search Console data with a loading indicator. Utilises 'fetch_gsc_data' for data retrieval. Returns the fetched data as a DataFrame. """ with st.spinner('Fetching data...'): return fetch_gsc_data(webproperty, search_type, start_date, end_date, dimensions, device_type) # ------------- # Utility Functions # ------------- def update_dimensions(selected_search_type): """ Updates and returns the list of dimensions based on the selected search type. Adds 'device' to dimensions if the search type requires it. """ return BASE_DIMENSIONS + ['device'] if selected_search_type in SEARCH_TYPES else BASE_DIMENSIONS def calc_date_range(selection, custom_start=None, custom_end=None): """ Calculates the date range based on the selected range option. Returns the start and end dates for the specified range. 
""" range_map = { 'Last 7 Days': 7, 'Last 30 Days': 30, 'Last 3 Months': 90, 'Last 6 Months': 180, 'Last 12 Months': 365, 'Last 16 Months': 480 } today = datetime.date.today() if selection == 'Custom Range': if custom_start and custom_end: return custom_start, custom_end else: return today - datetime.timedelta(days=7), today return today - datetime.timedelta(days=range_map.get(selection, 0)), today def show_error(e): """ Displays an error message in the Streamlit app. Formats and shows the provided error 'e'. """ st.error(f"An error occurred: {e}") def property_change(): """ Updates the 'selected_property' in the Streamlit session state. Triggered on change of the property selection. """ st.session_state.selected_property = st.session_state['selected_property_selector'] # ------------- # File & Download Operations # ------------- def show_dataframe(report): """ Shows a preview of the first 100 rows of the processed report DataFrame in an expandable section. """ with st.expander("Preview the First 100 Rows (Unique Pages with Top Query)"): st.dataframe(report.head(DF_PREVIEW_ROWS)) def download_csv_link(report): """ Generates and displays a download link for the report DataFrame in CSV format. """ def to_csv(df): return df.to_csv(index=False, encoding='utf-8-sig') csv = to_csv(report) b64_csv = base64.b64encode(csv.encode()).decode() href = f'Download CSV File' st.markdown(href, unsafe_allow_html=True) # ------------- # Streamlit UI Components # ------------- def show_google_sign_in(auth_url): """ Displays the Google sign-in button and authentication URL in the Streamlit sidebar. """ with st.sidebar: if st.button("Sign in with Google"): # Open the authentication URL st.write('Please click the link below to sign in:') st.markdown(f'[Google Sign-In]({auth_url})', unsafe_allow_html=True) def show_property_selector(properties, account): """ Displays a dropdown selector for Google Search Console properties. Returns the selected property's webproperty object. """ selected_property = st.selectbox( "Select a Search Console Property:", properties, index=properties.index( st.session_state.selected_property) if st.session_state.selected_property in properties else 0, key='selected_property_selector', on_change=property_change ) return account[selected_property] def show_search_type_selector(): """ Displays a dropdown selector for choosing the search type. Returns the selected search type. """ return st.selectbox( "Select Search Type:", SEARCH_TYPES, index=SEARCH_TYPES.index(st.session_state.selected_search_type), key='search_type_selector' ) def show_date_range_selector(): """ Displays a dropdown selector for choosing the date range. Returns the selected date range option. """ return st.selectbox( "Select Date Range:", DATE_RANGE_OPTIONS, index=DATE_RANGE_OPTIONS.index(st.session_state.selected_date_range), key='date_range_selector' ) def show_custom_date_inputs(): """ Displays date input fields for custom date range selection. Updates session state with the selected dates. """ st.session_state.custom_start_date = st.date_input("Start Date", st.session_state.custom_start_date) st.session_state.custom_end_date = st.date_input("End Date", st.session_state.custom_end_date) def show_dimensions_selector(search_type): """ Displays a multi-select box for choosing dimensions based on the selected search type. Returns the selected dimensions. 
""" available_dimensions = update_dimensions(search_type) return st.multiselect( "Select Dimensions:", available_dimensions, default=st.session_state.selected_dimensions, key='dimensions_selector' ) def show_fetch_data_button(webproperty, search_type, start_date, end_date, selected_dimensions): """ Displays a button to fetch data based on selected parameters. Shows the report DataFrame and download link upon successful data fetching. """ if st.button("Fetch Data"): report = fetch_data_loading(webproperty, search_type, start_date, end_date, selected_dimensions) if report is not None and not report.empty: show_dataframe(report) download_csv_link(report) else: st.warning("No data found for the selected criteria.") # ------------- # Main Streamlit App Function # ------------- # Main Streamlit App Function def main(): """ The main function for the Streamlit application. Handles the app setup, authentication, UI components, and data fetching logic. """ setup_streamlit() client_config = load_config() st.session_state.auth_flow, st.session_state.auth_url = google_auth(client_config) query_params = st.experimental_get_query_params() auth_code = query_params.get("code", [None])[0] if auth_code and not st.session_state.get('credentials'): st.session_state.auth_flow.fetch_token(code=auth_code) st.session_state.credentials = st.session_state.auth_flow.credentials if not st.session_state.get('credentials'): show_google_sign_in(st.session_state.auth_url) else: init_session_state() account = auth_search_console(client_config, st.session_state.credentials) properties = list_gsc_properties(st.session_state.credentials) if properties: webproperty = show_property_selector(properties, account) search_type = show_search_type_selector() date_range_selection = show_date_range_selector() if date_range_selection == 'Custom Range': show_custom_date_inputs() start_date, end_date = st.session_state.custom_start_date, st.session_state.custom_end_date else: start_date, end_date = calc_date_range(date_range_selection) selected_dimensions = show_dimensions_selector(search_type) if st.button("Fetch Data and Calculate Relevancy"): report = fetch_data_loading(webproperty, search_type, start_date, end_date, selected_dimensions) if report is not None and not report.empty: show_dataframe(report) download_csv_link(report) else: st.warning("No data found for the selected criteria.") if __name__ == "__main__": main()