import os
from google.cloud import bigquery
from google.oauth2 import service_account
from config import BIGQUERY_KEY_PATH
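# NOTE: `config` is assumed to be a small local module (not shown in this file)
# that defines where the service-account JSON key lives. A minimal sketch,
# assuming that layout (the path below is a hypothetical placeholder):
#
#     # config.py
#     BIGQUERY_KEY_PATH = "keys/bigquery-service-account.json"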

def init_bigquery_connection():
    """Initialize connection to BigQuery."""
    # Check if the key file exists
    if not os.path.exists(BIGQUERY_KEY_PATH):
        print("⚠️ Service account key file not found. Please provide a valid key file path.")
        return None
    
    try:
        # Create credentials using the service account key
        credentials = service_account.Credentials.from_service_account_file(
            BIGQUERY_KEY_PATH, 
            scopes=["https://www.googleapis.com/auth/bigquery"]
        )
        
        # Create BigQuery client
        client = bigquery.Client(credentials=credentials, project=credentials.project_id)
        print("✅ Successfully connected to BigQuery!")
        return client
    except Exception as e:
        print(f"❌ Error connecting to BigQuery: {str(e)}")
        return None

def get_bigquery_schema_info(client, project_id, dataset_id):
    """Retrieve comprehensive schema information for all tables in a BigQuery dataset using INFORMATION_SCHEMA."""
    
    query = f"""
    SELECT table_name, column_name, data_type
    FROM `{project_id}.{dataset_id}.INFORMATION_SCHEMA.COLUMNS`
    ORDER BY table_name, ordinal_position;
    """
    
    try:
        query_job = client.query(query)
        results = query_job.result()
        
        # Organize results by table
        schema_info = {}
        for row in results:
            table_name = row.table_name
            column_name = row.column_name
            data_type = row.data_type
            
            if table_name not in schema_info:
                schema_info[table_name] = []
                
            schema_info[table_name].append(f"{column_name} ({data_type})")
        
        return schema_info
    
    except Exception as e:
        print(f"Error retrieving schema information: {str(e)}")
        return {}

async def execute_custom_query(client, query):
    """Execute a custom SQL query against BigQuery."""
    if not query.strip():
        return {"error": "Empty query"}
    
    if client is None:
        return {"error": "Failed to connect to BigQuery."}
    
    try:
        # Execute the query
        query_job = client.query(query)
        results = query_job.result()  # Wait for query to complete
        
        # Get column names from the result schema (avoids a redundant second result() call)
        column_names = [field.name for field in results.schema]
        
        # Convert results to a list of tuples
        rows = []
        for row in results:
            row_values = tuple(row.values())
            rows.append(row_values)
        
        if not rows:
            return {"error": "Query executed successfully but returned no results."}
        
        # Return both rows and column names
        return {"rows": rows, "column_names": column_names}
    except Exception as e:
        return {"error": str(e)}