Spaces:
Sleeping
Sleeping
| import os | |
| from google.cloud import bigquery | |
| from google.oauth2 import service_account | |
| from config import BIGQUERY_KEY_PATH | |
def init_bigquery_connection(key_path=None):
    """Create a BigQuery client from a service-account key file.

    Args:
        key_path: Path to the service-account JSON key file. When omitted,
            ``BIGQUERY_KEY_PATH`` (imported from ``config``) is used, so
            existing zero-argument callers behave as before.

    Returns:
        A ``bigquery.Client`` bound to the project named in the key's
        credentials, or ``None`` if the key file is missing or the
        credentials/client cannot be created. The reason is printed to stdout.
    """
    if key_path is None:
        key_path = BIGQUERY_KEY_PATH
    # A falsy path (e.g. an unset config value) would make os.path raise
    # TypeError. isfile (rather than exists) also rejects directories,
    # which can never be a key file.
    if not key_path or not os.path.isfile(key_path):
        print("⚠️ Service account key file not found. Please provide a valid key file path.")
        return None
    try:
        # Build credentials restricted to the BigQuery scope.
        credentials = service_account.Credentials.from_service_account_file(
            key_path,
            scopes=["https://www.googleapis.com/auth/bigquery"],
        )
        client = bigquery.Client(credentials=credentials, project=credentials.project_id)
    except Exception as e:
        # Best-effort by design: callers treat None as "no connection".
        print(f"❌ Error connecting to BigQuery: {str(e)}")
        return None
    print("✅ Successfully connected to BigQuery!")
    return client
def get_bigquery_schema_info(client, project_id, dataset_id):
    """Map each table of ``project_id.dataset_id`` to its column descriptions.

    Queries the dataset's ``INFORMATION_SCHEMA.COLUMNS`` view and builds
    ``{table_name: ["column_name (data_type)", ...]}``. Tables appear in
    ``table_name`` order and columns in ``ordinal_position`` order, as
    sorted by the query. On any error the message is printed and ``{}``
    is returned.
    """
    sql = f"""
    SELECT table_name, column_name, data_type
    FROM `{project_id}.{dataset_id}.INFORMATION_SCHEMA.COLUMNS`
    ORDER BY table_name, ordinal_position;
    """
    try:
        column_rows = client.query(sql).result()
        tables = {}
        for record in column_rows:
            # setdefault creates the per-table list on first sight of a table.
            tables.setdefault(record.table_name, []).append(
                f"{record.column_name} ({record.data_type})"
            )
        return tables
    except Exception as e:
        print(f"Error retrieving schema information: {str(e)}")
        return {}
async def execute_custom_query(client, query):
    """Execute a SQL query against BigQuery and return its rows and column names.

    Args:
        client: A BigQuery client (e.g. from ``init_bigquery_connection``),
            or ``None`` when no connection is available.
        query: SQL text to execute.

    Returns:
        ``{"rows": [tuple, ...], "column_names": [str, ...]}`` on success.
        ``{"error": message}`` when the query is empty or missing, the client
        is ``None``, the query returns no rows, or execution raises.
    """
    # A missing query is treated like an empty one instead of raising
    # AttributeError on .strip().
    if not query or not query.strip():
        return {"error": "Empty query"}
    if client is None:
        return {"error": "Failed to connect to BigQuery."}
    # NOTE(review): client.query()/result() are blocking calls, so this
    # coroutine holds the event loop until the query completes. Consider
    # offloading to a worker thread once client thread-safety is confirmed.
    try:
        results = client.query(query).result()  # Wait for query to complete
        # Read the schema from the iterator we already have instead of
        # calling result() a second time.
        column_names = [field.name for field in results.schema]
        rows = [tuple(row.values()) for row in results]
    except Exception as e:
        return {"error": str(e)}
    if not rows:
        return {"error": "Query executed successfully but returned no results."}
    return {"rows": rows, "column_names": column_names}