# multi_agentic_sql_generator/utils/bigquery_utils.py
# Author: Gonalb — "init commit" (05e3517)
import os
from google.cloud import bigquery
from google.oauth2 import service_account
from config import BIGQUERY_KEY_PATH
def init_bigquery_connection():
    """Build a BigQuery client from the service-account key at ``BIGQUERY_KEY_PATH``.

    The credentials are requested with the BigQuery OAuth scope, and the client is
    bound to the project recorded in the key file.

    Returns:
        A ``bigquery.Client`` on success. Returns ``None`` if the key file does not
        exist or if building the credentials/client raises; in both cases a
        status message is printed to stdout instead of raising.
    """
    key_path = BIGQUERY_KEY_PATH
    bigquery_scopes = ["https://www.googleapis.com/auth/bigquery"]

    if os.path.exists(key_path):
        try:
            creds = service_account.Credentials.from_service_account_file(
                key_path,
                scopes=bigquery_scopes,
            )
            bq_client = bigquery.Client(credentials=creds, project=creds.project_id)
            print("✅ Successfully connected to BigQuery!")
            return bq_client
        except Exception as exc:
            # Best-effort: report the failure and let the caller handle None.
            print(f"❌ Error connecting to BigQuery: {str(exc)}")
            return None

    print("⚠️ Service account key file not found. Please provide a valid key file path.")
    return None
def get_bigquery_schema_info(client, project_id, dataset_id):
    """Describe every table in a dataset using its INFORMATION_SCHEMA.COLUMNS view.

    Args:
        client: BigQuery client used to run the metadata query.
        project_id: Project that owns the dataset. It is interpolated into the
            SQL as an identifier, not bound as a parameter.
        dataset_id: Dataset whose tables are described.

    Returns:
        A dict that maps each table name to a list of ``"column (TYPE)"`` strings.
        Tables appear in the order the query returns them, which is sorted by
        table name, and columns follow ``ordinal_position``. If anything goes
        wrong, an empty dict is returned and the error is printed.
    """
    sql = (
        "\n"
        "SELECT table_name, column_name, data_type\n"
        f"FROM `{project_id}.{dataset_id}.INFORMATION_SCHEMA.COLUMNS`\n"
        "ORDER BY table_name, ordinal_position;\n"
    )
    try:
        grouped = {}
        for record in client.query(sql).result():
            column_desc = f"{record.column_name} ({record.data_type})"
            grouped.setdefault(record.table_name, []).append(column_desc)
        return grouped
    except Exception as exc:
        print(f"Error retrieving schema information: {str(exc)}")
        return {}
async def execute_custom_query(client, query):
    """Execute an arbitrary SQL statement on BigQuery and collect its result set.

    Args:
        client: BigQuery client to run the query with, or ``None`` when no
            connection is available.
        query: SQL text to execute. ``None``, empty and whitespace-only values
            are rejected.

    Returns:
        On success, ``{"rows": [...], "column_names": [...]}``, where each row is
        a tuple of values in column order. On failure, a dict with a single
        ``"error"`` key. A query that returns zero rows is also reported through
        the ``"error"`` key.

    NOTE(review): despite being ``async``, the client calls below are
    synchronous and block the event loop while the job runs. Consider
    offloading them (e.g. ``asyncio.to_thread``) if callers rely on concurrency.
    """
    # Treat a missing query like a blank one; otherwise ``None.strip()`` would
    # raise AttributeError instead of returning the error dict.
    if not query or not query.strip():
        return {"error": "Empty query"}
    if client is None:
        return {"error": "Failed to connect to BigQuery."}

    try:
        query_job = client.query(query)
        # result() waits for the job to finish. Reuse the returned iterator's
        # schema instead of calling result() a second time, which is redundant
        # and can cost another API round-trip.
        results = query_job.result()
        column_names = [field.name for field in results.schema]
        rows = [tuple(row.values()) for row in results]
    except Exception as e:
        return {"error": str(e)}

    if not rows:
        return {"error": "Query executed successfully but returned no results."}
    return {"rows": rows, "column_names": column_names}