Spaces:
Sleeping
Sleeping
import os | |
from google.cloud import bigquery | |
from google.oauth2 import service_account | |
from config import BIGQUERY_KEY_PATH | |
def init_bigquery_connection():
    """Build a BigQuery client from the configured service-account key.

    Returns:
        A ``bigquery.Client`` authenticated with the key file at
        ``BIGQUERY_KEY_PATH``, or ``None`` when that path does not exist or
        when creating the credentials/client raises. The outcome is reported
        on stdout in every case.
    """
    key_file_present = os.path.exists(BIGQUERY_KEY_PATH)
    if not key_file_present:
        print("⚠️ Service account key file not found. Please provide a valid key file path.")
        return None

    bigquery_scopes = ["https://www.googleapis.com/auth/bigquery"]
    try:
        # Load the service-account credentials, restricted to the BigQuery scope.
        creds = service_account.Credentials.from_service_account_file(
            BIGQUERY_KEY_PATH,
            scopes=bigquery_scopes,
        )
        # The client targets the project recorded in the key file itself.
        bq_client = bigquery.Client(credentials=creds, project=creds.project_id)
        print("✅ Successfully connected to BigQuery!")
        return bq_client
    except Exception as exc:
        print(f"❌ Error connecting to BigQuery: {str(exc)}")
        return None
def get_bigquery_schema_info(client, project_id, dataset_id):
    """Collect the column layout of every table in a BigQuery dataset.

    Queries the dataset's ``INFORMATION_SCHEMA.COLUMNS`` view through
    ``client`` and groups the returned rows by table.

    Args:
        client: Object exposing ``query(sql)`` whose return value has a
            ``result()`` method yielding rows with ``table_name``,
            ``column_name`` and ``data_type`` attributes.
        project_id: Project name interpolated into the view path.
        dataset_id: Dataset name interpolated into the view path.

    Returns:
        A dict mapping each table name to a list of ``"column (TYPE)"``
        strings in the order the rows were returned. If anything raises,
        the error is printed and an empty dict is returned.
    """
    columns_sql = f"""
    SELECT table_name, column_name, data_type
    FROM `{project_id}.{dataset_id}.INFORMATION_SCHEMA.COLUMNS`
    ORDER BY table_name, ordinal_position;
    """
    try:
        column_rows = client.query(columns_sql).result()
        tables = {}
        for col in column_rows:
            # setdefault creates the per-table list the first time a table is seen.
            tables.setdefault(col.table_name, []).append(
                f"{col.column_name} ({col.data_type})"
            )
        return tables
    except Exception as exc:
        print(f"Error retrieving schema information: {str(exc)}")
        return {}
async def execute_custom_query(client, query):
    """Execute a custom SQL query against BigQuery.

    Args:
        client: BigQuery client (or compatible object) exposing
            ``query(sql)``; may be ``None`` if the connection failed.
        query: SQL text to run. ``None``, empty and whitespace-only values
            are rejected without contacting BigQuery.

    Returns:
        On success, ``{"rows": [...], "column_names": [...]}`` where each
        row is a tuple of values and the names come from the result schema.
        Otherwise ``{"error": message}``: for an empty query, a missing
        client, a query that returned no rows, or any exception raised
        while running the query.
    """
    # Guard against None as well as blank strings; calling .strip() on None
    # would raise AttributeError before any error dict could be returned.
    if query is None or not query.strip():
        return {"error": "Empty query"}
    if client is None:
        return {"error": "Failed to connect to BigQuery."}

    # NOTE: the calls below are synchronous; nothing in this coroutine is
    # awaited, so it does not yield control while the query runs.
    try:
        query_job = client.query(query)
        results = query_job.result()  # Wait for the query to complete

        # Read the schema from the iterator we already hold instead of
        # calling result() a second time just to reach it.
        column_names = [field.name for field in results.schema]

        # Convert each result row into a plain tuple of its values.
        rows = [tuple(row.values()) for row in results]
        if not rows:
            return {"error": "Query executed successfully but returned no results."}

        return {"rows": rows, "column_names": column_names}
    except Exception as e:
        return {"error": str(e)}