Files
roll-room/pyground/sugar/write_sql.py
2026-04-23 16:58:11 +08:00

162 lines
4.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 假设有一个数据库辅助类,你需要根据实际环境实现
import json
import pymysql
class DB:
    """Small convenience wrapper around a single PyMySQL connection.

    The method names (``getOne``, ``getone``, ``query_array``,
    ``insert_array``, ``insert_id``) are kept as-is so existing callers keep
    working. Every SQL-executing method prints a diagnostic and re-raises on
    failure; write methods additionally roll the connection back.
    """

    def __init__(self, host, user, password, database, port=3306, charset="utf8mb4"):
        """Open the database connection.

        Cursors are created with ``pymysql.cursors.DictCursor``.
        ``last_insert_id`` starts as ``None`` and is updated by ``query`` and
        ``insert_array``.
        """
        self.connection = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database,
            port=port,
            charset=charset,
            cursorclass=pymysql.cursors.DictCursor,
        )
        self.last_insert_id = None

    def _execute(self, sql, params=None):
        """Execute ``sql`` and return the cursor that ran it.

        Kept for backward compatibility. The returned cursor has already left
        its ``with`` block, so new code should fetch through ``_fetch``.
        """
        try:
            with self.connection.cursor() as cursor:
                # NOTE(review): an empty tuple (not None) is passed when there
                # are no params; the driver presumably still applies
                # %-formatting then, so a literal "%" may need to be written
                # as "%%" - confirm against the PyMySQL docs before changing.
                cursor.execute(sql, params or ())
                return cursor
        except Exception as e:
            print(f"SQL执行错误: {e}\nSQL: {sql}")
            raise

    def _fetch(self, sql, params=None, fetch_all=False):
        """Execute ``sql`` and fetch its result while the cursor is still open.

        :param sql: SQL text, optionally containing ``%s`` placeholders.
        :param params: values for the placeholders, or ``None``.
        :param fetch_all: return ``cursor.fetchall()`` when true, otherwise
            ``cursor.fetchone()``.
        :raises Exception: whatever the driver raises, after printing it.
        """
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(sql, params or ())
                # Fetch inside the ``with`` block rather than relying on the
                # rows staying readable after the cursor has been released.
                if fetch_all:
                    return cursor.fetchall()
                return cursor.fetchone()
        except Exception as e:
            print(f"SQL执行错误: {e}\nSQL: {sql}")
            raise

    def getOne(self, sql, params=None):
        """Return the first row of the result of ``sql``.

        ``params`` is optional and lets callers use ``%s`` placeholders
        instead of formatting values into the SQL string.
        """
        return self._fetch(sql, params)

    def getone(self, sql, params=None):
        """Alias of ``getOne``; the code this was ported from used both spellings."""
        return self.getOne(sql, params)

    def query(self, sql, params=None):
        """Execute a statement (UPDATE, DELETE, CREATE, ...) and commit it.

        Records the cursor's ``lastrowid`` in ``last_insert_id``.

        :param sql: SQL text, optionally containing ``%s`` placeholders.
        :param params: values for the placeholders, or ``None`` for none.
        :return: the value returned by ``cursor.execute``.
        :raises Exception: whatever the driver raises, after rolling back and
            printing the failure.
        """
        try:
            with self.connection.cursor() as cursor:
                result = cursor.execute(sql, params)
                self.connection.commit()
                # Remember the id generated by the statement (useful for INSERT).
                self.last_insert_id = cursor.lastrowid
                return result
        except Exception as e:
            self.connection.rollback()
            print(f"Query执行失败: {e}\nSQL: {sql}")
            raise

    def query_array(self, sql, params=None):
        """Return every row of the result of ``sql``.

        ``params`` is optional and lets callers use ``%s`` placeholders.
        """
        return self._fetch(sql, params, fetch_all=True)

    @staticmethod
    def _build_insert(data, table):
        """Build the INSERT statement and the value tuples for ``data``.

        The column order is taken from the first row, and every row is read
        by column name, so rows whose keys appear in a different order are
        still written to the right columns.

        :param data: non-empty list of dicts that all share the same keys.
        :param table: target table name. It is interpolated into the SQL
            text, as are the column names, so both must come from trusted
            code rather than user input.
        :return: ``(sql, values)`` ready for ``cursor.executemany``.
        :raises ValueError: if a row's keys differ from the first row's keys.
        """
        columns = list(data[0].keys())
        expected_keys = data[0].keys()
        values = []
        for position, row in enumerate(data):
            if row.keys() != expected_keys:
                raise ValueError(
                    f"row {position} has keys {sorted(row)} but expected {sorted(columns)}"
                )
            values.append(tuple(row[column] for column in columns))
        column_list = ", ".join(columns)
        # Values go through driver placeholders; only identifiers are inlined.
        placeholders = ", ".join(["%s"] * len(columns))
        sql = f"INSERT INTO {table} ({column_list}) VALUES ({placeholders})"
        return sql, values

    def insert_array(self, data, table):
        """Insert several rows into ``table`` with one ``executemany`` call.

        :param data: list of dicts, e.g. ``[{'name': 'Alice', 'age': 25}]``.
        :param table: target table name.
        :return: ``False`` when ``data`` is empty, otherwise the cursor's
            ``lastrowid`` after the insert (also stored in ``last_insert_id``).
        :raises ValueError: if the rows do not all share the same keys.
        :raises Exception: whatever the driver raises, after rolling back and
            printing the failure.
        """
        if not data:
            return False
        sql, values = self._build_insert(data, table)
        try:
            with self.connection.cursor() as cursor:
                cursor.executemany(sql, values)
                self.connection.commit()
                self.last_insert_id = cursor.lastrowid
                return self.last_insert_id
        except Exception as e:
            self.connection.rollback()
            print(f"插入数据失败: {e}")
            raise

    def insert_id(self):
        """Return the id recorded by the most recent ``query``/``insert_array``."""
        return self.last_insert_id

    def close(self):
        """Close the database connection if there is one."""
        if self.connection:
            self.connection.close()
# Import a simulator log (one JSON object per line) into the spin and
# step-info tables, writing in batches.
#
# NOTE(review): the credentials below are hard-coded; consider loading them
# from configuration or the environment instead of keeping them in source.
db = DB(host="localhost", user="fire", password="iTSJSPPZM3LSGAPC", database="fire")
file_name = "simulator_log_super_1776010009.7211218.log"

# Table-name suffix chosen by the first marker found in the log file name.
# "normal" logs and unrecognised names use the unsuffixed tables.
table_suffix = ""
for marker, suffix in (("super", "_super"), ("normal", ""), ("standard", "_free")):
    if marker in file_name:
        table_suffix = suffix
        break

spin_table = f"game_sugar_spin{table_suffix}"
step_table = f"game_sugar_step_info{table_suffix}"

# Optional reset of the target tables before importing:
# db.query(f"truncate table {spin_table}")
# db.query(f"truncate table {step_table}")

# A buffer is flushed once it holds more rows than its limit.
SPIN_BATCH_LIMIT = 100
STEP_BATCH_LIMIT = 300

spins = []
steps_data = []
index = 0
startGid = 10000

try:
    # JSON text is UTF-8 by specification, so do not depend on the locale default.
    with open(file_name, "r", encoding="utf-8") as f:
        # Iterate lazily instead of loading the whole log into memory.
        for line in f:
            if not line.strip():
                # Skip blank lines (e.g. a trailing newline) rather than
                # failing in json.loads.
                continue
            index += 1
            data = json.loads(line)
            # Offset the gid from the log by startGid before storing it.
            data["gid"] = startGid + data["gid"]

            # Steps are stored in their own table: tag each with the spin's
            # gid and serialise its nested fields as compact JSON strings.
            for step in data.pop("steps"):
                step["gid"] = data["gid"]
                step["symbol_links"] = json.dumps(
                    step["symbol_links"], separators=(",", ":"), ensure_ascii=False
                )
                step["multipler"] = json.dumps(
                    step["multipler"], separators=(",", ":"), ensure_ascii=False
                )
                steps_data.append(step)
            spins.append(data)

            if len(spins) > SPIN_BATCH_LIMIT:
                db.insert_array(spins, spin_table)
                spins = []
                print(f"写入进度: {index}个spin")
            if len(steps_data) > STEP_BATCH_LIMIT:
                db.insert_array(steps_data, step_table)
                steps_data = []

    # Write whatever is left in the buffers after the last full batch.
    if spins:
        db.insert_array(spins, spin_table)
        spins = []
    if steps_data:
        db.insert_array(steps_data, step_table)
        steps_data = []
finally:
    # Always release the connection, including when the import fails part-way.
    db.close()