"""Helpers for connecting to BigQuery, inspecting dataset schemas and running queries."""

import os

from google.cloud import bigquery
from google.oauth2 import service_account

from config import BIGQUERY_KEY_PATH


def init_bigquery_connection():
    """Create a BigQuery client from the service-account key at BIGQUERY_KEY_PATH.

    Returns:
        A ``bigquery.Client`` bound to the project named in the key file, or
        ``None`` when the key file does not exist or client creation fails.
        Failures are reported with ``print`` rather than raised.
    """
    # Check up front so a missing key gets a dedicated message instead of
    # the generic connection error below.
    if not os.path.exists(BIGQUERY_KEY_PATH):
        print("⚠️ Service account key file not found. Please provide a valid key file path.")
        return None

    try:
        # Build credentials from the service-account key, limited to the
        # BigQuery scope.
        credentials = service_account.Credentials.from_service_account_file(
            BIGQUERY_KEY_PATH,
            scopes=["https://www.googleapis.com/auth/bigquery"],
        )
        client = bigquery.Client(
            credentials=credentials,
            project=credentials.project_id,
        )
        print("✅ Successfully connected to BigQuery!")
        return client
    except Exception as e:
        # Deliberate best-effort: callers test for None instead of catching.
        print(f"❌ Error connecting to BigQuery: {e}")
        return None


def get_bigquery_schema_info(client, project_id, dataset_id):
    """Collect column information for every table in a dataset.

    Queries ``INFORMATION_SCHEMA.COLUMNS`` of the given dataset.

    Args:
        client: BigQuery client used to run the query.
        project_id: Project that owns the dataset.
        dataset_id: Dataset whose tables are inspected.

    Returns:
        A dict mapping each table name to a list of ``"column (TYPE)"``
        strings, in ``ordinal_position`` order. An empty dict is returned
        (and the error printed) if the query fails.
    """
    # NOTE(review): project_id and dataset_id are interpolated directly into
    # the SQL text, so they must come from trusted configuration and never
    # from raw user input.
    query = f"""
    SELECT
        table_name,
        column_name,
        data_type
    FROM
        `{project_id}.{dataset_id}.INFORMATION_SCHEMA.COLUMNS`
    ORDER BY
        table_name,
        ordinal_position;
    """

    try:
        results = client.query(query).result()

        # Group the column descriptions by table, keeping query order.
        schema_info = {}
        for row in results:
            schema_info.setdefault(row.table_name, []).append(
                f"{row.column_name} ({row.data_type})"
            )
        return schema_info
    except Exception as e:
        print(f"Error retrieving schema information: {e}")
        return {}


async def execute_custom_query(client, query):
    """Run a SQL query against BigQuery and return its rows and column names.

    This coroutine contains no ``await``: the BigQuery calls it makes are
    ordinary blocking calls.

    Args:
        client: BigQuery client, or ``None`` if the connection failed.
        query: SQL text to execute.

    Returns:
        ``{"rows": [tuple, ...], "column_names": [str, ...]}`` on success,
        or ``{"error": message}`` when the query is empty, the client is
        missing, the query fails, or it returns no rows.
    """
    # Treat None the same as a blank string instead of raising AttributeError.
    if not query or not query.strip():
        return {"error": "Empty query"}

    if client is None:
        return {"error": "Failed to connect to BigQuery."}

    try:
        query_job = client.query(query)
        results = query_job.result()  # Blocks until the query completes.

        # Read the schema from the iterator already in hand; calling
        # query_job.result() a second time is unnecessary.
        column_names = [field.name for field in results.schema]
        rows = [tuple(row.values()) for row in results]

        if not rows:
            return {"error": "Query executed successfully but returned no results."}

        return {"rows": rows, "column_names": column_names}
    except Exception as e:
        # Errors are returned to the caller as data rather than raised.
        return {"error": str(e)}