chronos / Mark-1 /db_creation.py
Manoj Kumar
Mark POhase 1
e6f4fec
import sqlite3
import random
from faker import Faker
# Initialize Faker for generating random data
fake = Faker()
# Define custom schema
custom_schema = {
"products": {
"columns": ["product_id INTEGER PRIMARY KEY", "name TEXT", "price REAL", "category_id INTEGER"],
"relations": ["category_id -> categories.id"],
},
"categories": {
"columns": ["id INTEGER PRIMARY KEY", "category_name TEXT"],
"relations": None,
},
"orders": {
"columns": ["order_id INTEGER PRIMARY KEY", "user_id INTEGER", "product_id INTEGER", "order_date TEXT"],
"relations": ["product_id -> products.product_id", "user_id -> users.user_id"],
},
"users": {
"columns": [
"user_id INTEGER PRIMARY KEY",
"first_name TEXT",
"last_name TEXT",
"email TEXT UNIQUE",
"phone_number TEXT",
"address TEXT"
],
"relations": None,
}
}
# Connect to SQLite database
conn = sqlite3.connect("ecommerce.db")
cursor = conn.cursor()
# Function to create tables based on schema
def create_tables():
for table_name, table_data in custom_schema.items():
columns = ", ".join(table_data["columns"])
table_sql = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns}"
if table_data["relations"]:
for relation in table_data["relations"]:
col_name, ref_table = relation.split(" -> ")
ref_col = ref_table.split(".")[1]
ref_table_name = ref_table.split(".")[0]
table_sql += f", FOREIGN KEY({col_name}) REFERENCES {ref_table_name}({ref_col})"
table_sql += ");"
cursor.execute(table_sql)
# Function to populate categories table
def insert_categories():
categories = [(i, fake.word().capitalize() + " " + fake.word().capitalize()) for i in range(1, 1001)]
cursor.executemany("INSERT INTO categories (id, category_name) VALUES (?, ?)", categories)
return categories
# Function to populate products table
def insert_products(categories):
products = [(i, fake.company() + " " + fake.word().capitalize(), round(random.uniform(10, 1000), 2), random.choice(categories)[0]) for i in range(1, 1001)]
cursor.executemany("INSERT INTO products (product_id, name, price, category_id) VALUES (?, ?, ?, ?)", products)
return products
# Function to populate users table
def insert_users():
users = [(i, fake.first_name(), fake.last_name(), fake.email(), fake.phone_number(), fake.address()) for i in range(1, 1001)]
cursor.executemany("INSERT OR IGNORE INTO users (user_id, first_name, last_name, email, phone_number, address) VALUES (?, ?, ?, ?, ?, ?)", users)
return users
# Function to populate orders table
def insert_orders(users, products):
orders = [(i, random.choice(users)[0], random.choice(products)[0], fake.date_this_year().strftime("%Y-%m-%d")) for i in range(1, 1001)]
cursor.executemany("INSERT INTO orders (order_id, user_id, product_id, order_date) VALUES (?, ?, ?, ?)", orders)
# Create tables
create_tables()
# Insert data into tables
categories = insert_categories()
products = insert_products(categories)
users = insert_users()
insert_orders(users, products)
# Commit and close connection
conn.commit()
conn.close()
print("1000 rows inserted into each table successfully!")