|
import sqlite3 |
|
import random |
|
from faker import Faker |
|
|
|
|
|
# Shared Faker generator used by the insert_* helpers to fabricate row values.
fake = Faker()

# Declarative description of the database layout, keyed by table name.
#   "columns"   - SQL column definitions, joined verbatim into CREATE TABLE.
#   "relations" - either None or a list of "column -> table.column" strings
#                 that create_tables() turns into FOREIGN KEY clauses.
# Dict order is the order in which the tables are created.
custom_schema = {
    "products": {
        "columns": [
            "product_id INTEGER PRIMARY KEY",
            "name TEXT",
            "price REAL",
            "category_id INTEGER",
        ],
        "relations": ["category_id -> categories.id"],
    },
    "categories": {
        "columns": [
            "id INTEGER PRIMARY KEY",
            "category_name TEXT",
        ],
        "relations": None,
    },
    "orders": {
        "columns": [
            "order_id INTEGER PRIMARY KEY",
            "user_id INTEGER",
            "product_id INTEGER",
            "order_date TEXT",
        ],
        "relations": [
            "product_id -> products.product_id",
            "user_id -> users.user_id",
        ],
    },
    "users": {
        "columns": [
            "user_id INTEGER PRIMARY KEY",
            "first_name TEXT",
            "last_name TEXT",
            "email TEXT UNIQUE",
            "phone_number TEXT",
            "address TEXT",
        ],
        "relations": None,
    },
}

# Module-wide connection and cursor shared by every helper in this script.
conn = sqlite3.connect("ecommerce.db")
cursor = conn.cursor()
|
|
|
|
|
def create_tables(schema=None, db_cursor=None):
    """Create every table described by a schema mapping, if it does not exist.

    Each schema entry maps a table name to a dict with a "columns" list of SQL
    column definitions and an optional "relations" list of
    "column -> table.column" strings, which become FOREIGN KEY clauses.

    Table and column text is interpolated directly into the SQL statement, so
    the schema must come from trusted code, never from user input.

    Args:
        schema: Mapping of table name to table description. Defaults to the
            module-level ``custom_schema``.
        db_cursor: Object with an ``execute(sql)`` method. Defaults to the
            module-level ``cursor``.

    Raises:
        ValueError: If a relation string is not of the form
            "column -> table.column".
    """
    if schema is None:
        schema = custom_schema
    if db_cursor is None:
        db_cursor = cursor

    for table_name, table_data in schema.items():
        clauses = list(table_data["columns"])

        # "relations" may be missing, None, or empty; all mean "no foreign keys".
        for relation in table_data.get("relations") or ():
            # partition() tolerates any amount of whitespace around the arrow,
            # unlike splitting on the exact string " -> ".
            col_name, _, target = relation.partition("->")
            ref_table_name, _, ref_col = target.strip().partition(".")
            col_name = col_name.strip()
            ref_table_name = ref_table_name.strip()
            ref_col = ref_col.strip()

            if not (col_name and ref_table_name and ref_col):
                raise ValueError(
                    f"Malformed relation {relation!r} for table {table_name!r}; "
                    "expected 'column -> table.column'"
                )

            clauses.append(
                f"FOREIGN KEY({col_name}) REFERENCES {ref_table_name}({ref_col})"
            )

        db_cursor.execute(
            f"CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(clauses)});"
        )
|
|
|
|
|
def insert_categories():
    """Insert 1000 generated categories and return the inserted rows.

    Each category name is two capitalized Faker words separated by a space.

    Returns:
        List of ``(id, category_name)`` tuples with ids 1 through 1000, exactly
        as they were passed to the INSERT statement.
    """
    rows = []
    for category_id in range(1, 1001):
        first_word = fake.word().capitalize()
        second_word = fake.word().capitalize()
        rows.append((category_id, first_word + " " + second_word))

    cursor.executemany(
        "INSERT INTO categories (id, category_name) VALUES (?, ?)",
        rows,
    )
    return rows
|
|
|
|
|
def insert_products(categories):
    """Insert 1000 generated products and return the inserted rows.

    Args:
        categories: Sequence of category rows whose first element is the
            category id; each product is assigned one at random.

    Returns:
        List of ``(product_id, name, price, category_id)`` tuples with product
        ids 1 through 1000. Prices are random values between 10 and 1000,
        rounded to two decimals.
    """
    rows = []
    for product_id in range(1, 1001):
        # Call order (company, word, price, category) is kept so that seeded
        # runs produce the same data.
        product_name = fake.company() + " " + fake.word().capitalize()
        price = round(random.uniform(10, 1000), 2)
        category_id = random.choice(categories)[0]
        rows.append((product_id, product_name, price, category_id))

    cursor.executemany(
        "INSERT INTO products (product_id, name, price, category_id) VALUES (?, ?, ?, ?)",
        rows,
    )
    return rows
|
|
|
|
|
def insert_users():
    """Insert 1000 generated users and return the inserted rows.

    The users table declares ``email`` as UNIQUE and the statement below uses
    INSERT OR IGNORE, so a repeated generated email would silently drop that
    row while it still appeared in the returned list. Orders built from that
    list could then reference a user that was never stored. Emails are
    therefore drawn from Faker's ``unique`` proxy, which never repeats a value
    for the lifetime of the ``fake`` instance.

    Returns:
        List of ``(user_id, first_name, last_name, email, phone_number,
        address)`` tuples with user ids 1 through 1000.
    """
    users = [
        (
            user_id,
            fake.first_name(),
            fake.last_name(),
            fake.unique.email(),
            fake.phone_number(),
            fake.address(),
        )
        for user_id in range(1, 1001)
    ]

    # OR IGNORE is kept so that re-running against an already seeded database
    # skips existing user ids instead of raising.
    cursor.executemany(
        "INSERT OR IGNORE INTO users (user_id, first_name, last_name, email, phone_number, address) VALUES (?, ?, ?, ?, ?, ?)",
        users,
    )
    return users
|
|
|
|
|
def insert_orders(users, products):
    """Insert 1000 generated orders linking random users to random products.

    Args:
        users: Sequence of user rows whose first element is the user id.
        products: Sequence of product rows whose first element is the
            product id.

    Order dates come from Faker's ``date_this_year`` and are stored as
    ``YYYY-MM-DD`` text. Nothing is returned.
    """
    rows = []
    for order_id in range(1, 1001):
        # Call order (user, product, date) is kept so that seeded runs produce
        # the same data.
        user_id = random.choice(users)[0]
        product_id = random.choice(products)[0]
        order_date = fake.date_this_year().strftime("%Y-%m-%d")
        rows.append((order_id, user_id, product_id, order_date))

    cursor.executemany(
        "INSERT INTO orders (order_id, user_id, product_id, order_date) VALUES (?, ?, ?, ?)",
        rows,
    )
|
|
|
|
|
# Script driver: build the schema, seed every table, then commit.
# The try/finally guarantees the connection is closed even when a step raises
# (for example an IntegrityError when the database was already seeded). Closing
# without a commit discards the uncommitted inserts.
try:
    create_tables()

    # Parents are inserted before the rows that reference them.
    categories = insert_categories()
    products = insert_products(categories)
    users = insert_users()
    insert_orders(users, products)

    conn.commit()
finally:
    conn.close()

# Reached only when every step above succeeded.
print("1000 rows inserted into each table successfully!")
|
|