|
import pymysql |
|
|
|
|
|
class OptionMysql(object): |
|
def __init__(self, options): |
|
host = options["HOST"] |
|
user = options["USERNAME"] |
|
password = options["PASSWORD"] |
|
database = options["DATABASE"] |
|
port = options["PORT"] |
|
charset = "utf8" |
|
|
|
self.conn = pymysql.connect( |
|
host=host, |
|
port=port, |
|
user=user, |
|
password=password, |
|
database=database, |
|
charset=charset, |
|
) |
|
|
|
self.cur = self.conn.cursor() |
|
self.dict_cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor) |
|
|
|
def __exit__(self): |
|
self.cur.close() |
|
self.dict_cur.close() |
|
self.conn.close() |
|
|
|
def get_dict_data(self, sql, params=[]): |
|
"""查询,返回字典类型""" |
|
try: |
|
if params: |
|
self.dict_cur.execute(sql, params) |
|
else: |
|
self.dict_cur.execute(sql) |
|
data = self.dict_cur.fetchall() |
|
return data |
|
except Exception as e: |
|
self.conn.rollback() |
|
raise e |
|
|
|
def update_data(self, sql, params=[]): |
|
"""更新""" |
|
try: |
|
if not params: |
|
self.cur.execute(sql) |
|
else: |
|
self.cur.execute(sql, params) |
|
self.conn.commit() |
|
except Exception as e: |
|
self.conn.rollback() |
|
raise e |
|
return True |
|
|