Initial commit

This commit is contained in:
2026-04-23 16:58:11 +08:00
commit 267eba1eca
2582 changed files with 273338 additions and 0 deletions

12
pyground/sugar/README.md Normal file
View File

@@ -0,0 +1,12 @@
SugarRush1000.py 是业务核心逻辑文件
SugarRushAutoRTPTuner.py 游戏RTP调参和校验文件
SugarRushAutoFreeSpinsTuner.py 购买一次免费旋转的旋转次数调参和校验文件
SugarRushSimulator.py 根据指定权重参数运行n次产生对局日志这些日志将转成sql写入数据库供后端逻辑使用
write_sql.py 根据日志文件生成sql写入数据库
数据库数据dump
mysqldump -uroot -p123456 db_name table_name > dump.sql
数据库数据导入
mysql -uroot -p123456 db_name < sql

View File

@@ -0,0 +1,489 @@
import json
import random
from typing import List, Dict, Tuple, Optional
from symbol_pay_table import get_symbol_pay
# 符号默认权重
default_symbol_weights = {
"A": 2, # 高赔率
"B": 5, # 中高赔率
"C": 10, # 中等赔率
"D": 15, # 中低赔率
"E": 20, # 低赔率
"F": 30, # 极低赔率
"G": 40, # 最低赔率
"S": 1, # Scatter
}
scotter_count_mapping = {"3": 10, "4": 12, "5": 15, "6": 20, "7": 30}
class SugarRush1000:
def __init__(
self,
balance: float = 1000.0,
bet: float = 1.0,
mock_grid=None,
weights=None,
scotter_counts_weights=None,
):
"""
初始化游戏
:param balance: 玩家余额
:param bet: 单次下注额
"""
self.balance = balance
self.bet = bet
self.rows = 7
self.cols = 7
# 游戏符号定义和权重
self.symbol_weights = weights if weights else default_symbol_weights
self.symbols = list(self.symbol_weights.keys())
self.weights = list(self.symbol_weights.values())
if scotter_counts_weights is None:
scotter_counts_weights = {3: 10, 4: 12, 5: 15, 6: 20, 7: 30}
self.scotter_counts = list(scotter_counts_weights.keys())
self.scotter_counts_weights = list(scotter_counts_weights.values())
# 游戏状态
self.grid = []
# 7x7 网格
self.multipliers = [[0 for _ in range(self.cols)] for _ in range(self.rows)]
self.free_spins = 0 # 剩余免费旋转次数
self.is_free_spin_status = False
self.max_win_cap = 25000 # 最大赢利倍数限制
self.is_buy_free = False
self.is_super_free = False # 是否为超级免费旋转模式
self.spin_total_win = (
0 # 本次旋转的总赢利。主要计算免费旋转过程中累积盈利是否超过最大额度
)
self.mock_grid = mock_grid # 用于测试的固定网格
self.drop_sequence = [] # 用于测试的掉落序列
def _get_next_drop_symbol(self, count=1, no_scatter: bool = False):
"""生成一列中的count个符号要求一列中最多有一个scatter符号
no_scatter: 是否排除scatter符号。如果设置为True则不生成scatter符号如果设置为False则则本次生成的符号中顶多有一个scatter符号
"""
# 测试辅助方法从预设序列中获取掉落符号。预设序列不使用最多有一个scatter符号的规则
result = []
while count > 0:
if self.drop_sequence:
result.append(self.drop_sequence.pop(0))
count -= 1
else:
break
symbols1 = self.symbols[:]
weights1 = self.weights[:]
tmp = [(s, w) for s, w in zip(symbols1, weights1) if s != "S"]
symbols2, weights2 = map(list, zip(*tmp))
for _ in range(count):
tmp_symbols = symbols1
tmp_weights = weights1
if no_scatter:
tmp_symbols = symbols2
tmp_weights = weights2
symbol = random.choices(tmp_symbols, weights=tmp_weights, k=1)[0]
result.append(symbol)
if symbol == "S":
no_scatter = True
return result
def grid_col_has_scotter(self, col):
for row in range(self.rows):
if self.grid[row][col] == "S":
return True
return False
def _init_grid(self):
"""初始化或重置网格"""
self.grid = [[None for _ in range(self.cols)] for _ in range(self.rows)]
for col in range(self.cols):
symbols = self._get_next_drop_symbol(count=self.rows, no_scatter=False)
for row in range(self.rows):
self.grid[row][col] = symbols[row]
def buy_free_spins(self, buy_type: Optional[str] = None) -> int:
""":param buy_type: None=普通旋转, 'standard'=买免费旋转(50x), 'super'=买超级免费旋转(200x)"""
result = {}
if not buy_type:
result = {"message": "未购买任何功能", "balance": self.balance}
cost = 0
if buy_type == "standard":
cost = self.bet * 50
elif buy_type == "super":
cost = self.bet * 200
if self.balance < cost:
result["message"] = "余额不足,无法购买功能"
return result
self.is_buy_free = True
self.is_super_free = True if buy_type == "super" else False
self.balance -= cost
self.free_spins = 0
result = {
"cost": cost,
"balance": self.balance,
"is_buy_free": self.is_buy_free,
"is_super_free": self.is_super_free,
}
return result
def doSpin(self) -> Dict:
"""
执行一次旋转/翻转操作的核心接口
:param buy_type: None=普通旋转, 'standard'=买免费旋转(100x), 'super'=买超级免费旋转(500x)
:return: 包含游戏状态和结果的字典
"""
result = {
"balance": self.balance,
"bet": self.bet,
"actual_bet": 0.0,
"is_scotter": False,
"added_spins": 0,
"win": 0.0,
"grid": self.grid,
"multipliers": self.multipliers,
"free_spins_remaining": self.free_spins,
"is_super": self.is_super_free,
"message": "",
"error": 0,
"extra_free": False,
}
# 1. 扣除下注积分
if (
not self.is_buy_free and not self.is_free_spin_status
): # 普通模式,而且不是买免费旋转
if self.balance < self.bet:
result["message"] = "游戏结束,余额不足"
result["error"] = 1
return result
result["actual_bet"] = self.bet
self.balance -= self.bet
# 初始化游戏状态
# 不是免费旋转模式则重置乘数点和累计盈利
if not self.is_free_spin_status:
self.spin_total_win = 0
self.multipliers = [[0 for _ in range(self.cols)] for _ in range(self.rows)]
# 免费旋转扣除次数
if self.free_spins > 0:
self.free_spins -= 1
# 1.2 初始化网格
if self.mock_grid:
self.grid = self.mock_grid
else:
self._init_grid()
# 2. 处理购买功能,触发>=3个的scotter
buy_free = self.is_buy_free
if self.is_buy_free:
self.is_buy_free = False
# 购买时强制触发Scatter
scatters_count = random.choices(
self.scotter_counts, weights=self.scotter_counts_weights, k=1
)[0]
self._force_scatters(int(scatters_count))
# 2.1 如果是超级免费旋转而且没有初始化则初始化所有点为x2
if self.is_super_free and self.free_spins > 0:
self.is_super_free = False
self.multipliers = [[2 for _ in range(self.cols)] for _ in range(self.rows)]
# 3. 开始翻转循环
cascade_win = 0.0
cascade_count = 0
max_win_limit = self.bet * self.max_win_cap
steps = []
while True:
step = {
"multipler": [],
"grid": "",
# 运行后状态
"score": 0,
"symbol_links": [],
}
steps.append(step)
step["grid"] = "".join([s for row in self.grid for s in row])
for r in range(self.rows):
for c in range(self.cols):
if self.multipliers[r][c] > 0:
step["multipler"].append(r * self.cols + c)
step["multipler"].append(self.multipliers[r][c])
# 1. 查找赢奖组合
clusters = self._find_clusters()
if not clusters:
break # 没有赢奖组合,翻转结束
cascade_count += 1
current_spin_win = 0.0
# 2. 计算赔付并处理消除
# 注意:需要先收集所有要消除的位置,避免计算时网格已变
symbols_to_remove = set()
for symbol, coords in clusters:
count = len(coords)
if count < 5:
continue
# 计算该组合的乘数 (包含该区域内所有乘数点的叠加)
cluster_mult = 0
cluster_info = []
for r, c in coords:
if self.multipliers[r][c] > 1:
cluster_mult += self.multipliers[r][c]
cluster_info.append([r * 7 + c, self.multipliers[r][c]])
cluster_mult = max(1, cluster_mult)
# 基础赔付 * 符号倍率 * 组合乘数
pay_factor = get_symbol_pay(symbol, count)
win_amount = self.bet * pay_factor * cluster_mult
current_spin_win += win_amount
symbol_link = {
"symbol": symbol,
"loc": [],
"multipler": [],
"total_multi": 0,
"base_score": 0,
"score": 0,
}
symbol_link["base_score"] = self.bet * pay_factor
symbol_link["score"] = win_amount
symbol_link["total_multi"] = cluster_mult
for r, c in coords:
symbol_link["loc"].append(r * 7 + c)
for index, m in cluster_info:
symbol_link["multipler"].append(index)
symbol_link["multipler"].append(m)
step["symbol_links"].append(symbol_link)
# 记录消除位置并更新乘数点
for r, c in coords:
symbols_to_remove.add((r, c))
# 乘数点逻辑:炸开一次,点数+1 (2的幂次方)
if self.multipliers[r][c] > 0:
self.multipliers[r][c] = self.multipliers[r][c] * 2
else:
self.multipliers[r][c] = 1
if self.multipliers[r][c] > 1024:
self.multipliers[r][c] = 1024
step["score"] = current_spin_win
cascade_win += current_spin_win
# 检查是否超过最大赢利限制
if cascade_win >= max_win_limit:
cascade_win = max_win_limit
result["message"] = "达到最大赢奖上限 25000x!"
break
# 3. 执行消除和掉落
self._apply_cascade(symbols_to_remove)
# 3.1 最后处理scotter
scatter_count = sum(row.count("S") for row in self.grid)
if scatter_count >= 3:
if not self.is_free_spin_status:
self.multipliers = [[0 for _ in range(self.cols)] for _ in range(self.rows)]
self.is_free_spin_status = True
added_spins = self._add_free_spins(scatter_count)
if added_spins > 0:
result["is_scotter"] = True
result["added_spins"] = added_spins
result["message"] = f"触发免费旋转! +{added_spins}"
if added_spins > 0 and not buy_free:
result["extra_free"] = True
# 3.2 处理达到最大奖限制
raw_spin_total_win = self.spin_total_win
self.spin_total_win += cascade_win
if self.spin_total_win >= max_win_limit:
cascade_win = max_win_limit - raw_spin_total_win
self.spin_total_win = max_win_limit
self.free_spins = 0 # 立即结束
result["message"] = "达到最大赢奖上限 25000x!"
# 4. 结算
if cascade_win > 0:
self.balance += cascade_win
# 免费旋转模式结束
if self.free_spins <= 0:
self.is_free_spin_status = False
self.multipliers = [[0 for _ in range(self.cols)] for _ in range(self.rows)]
# 5. 构建结果信息
result["win"] = cascade_win
result["balance"] = self.balance
result["free_spins_remaining"] = self.free_spins
result["grid"] = self.grid
result["multipliers"] = self.multipliers
result["steps"] = steps
result["spin_total_win"] = self.spin_total_win
return result
def _find_clusters(self) -> Dict[str, List[Tuple[int, int]]]:
"""查找所有相连的符号块 (排除SCATTER)"""
visited = set()
clusters = []
for r in range(self.rows):
for c in range(self.cols):
symbol = self.grid[r][c]
if symbol == "S" or (r, c) in visited:
continue
# BFS 查找连通块
queue = [(r, c)]
connected = []
while queue:
curr_r, curr_c = queue.pop(0)
if (curr_r, curr_c) in visited:
continue
if self.grid[curr_r][curr_c] == symbol:
visited.add((curr_r, curr_c))
connected.append((curr_r, curr_c))
# 检查上下左右
for dr, dc in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
nr, nc = curr_r + dr, curr_c + dc
if 0 <= nr < self.rows and 0 <= nc < self.cols:
if (nr, nc) not in visited and self.grid[nr][
nc
] == symbol:
queue.append((nr, nc))
if len(connected) >= 5:
clusters.append((symbol, connected))
return clusters
def _apply_cascade(self, remove_coords: set):
"""执行消除和掉落逻辑"""
# 1. 消除
for r, c in remove_coords:
self.grid[r][c] = None # 标记为空
# 2. 掉落 (按列处理)
for c in range(self.cols):
# 提取该列所有非空符号
existing_syms = [
self.grid[r][c] for r in range(self.rows) if self.grid[r][c] is not None
]
# 填充新符号到顶部
needed = self.rows - len(existing_syms)
new_syms = self._get_next_drop_symbol(
count=needed, no_scatter=self.grid_col_has_scotter(c)
)
# 重新组合该列 (新符号在上,原有符号在下)
new_col = new_syms + existing_syms
# 更新回网格
for r in range(self.rows):
self.grid[r][c] = new_col[r]
def _force_scatters(self, count: int):
"""强制在随机位置放置指定数量的Scatter"""
# 检查是否已经有,有的话改成其他符号
for r in range(self.rows):
for c in range(self.cols):
if self.grid[r][c] == "S":
self.grid[r][c] = "G"
cols = list(range(self.cols))
for _ in range(count):
x = random.choice(cols)
y = random.randint(0, self.rows - 1)
self.grid[y][x] = "S"
cols.remove(x)
def _add_free_spins(self, scatter_count: int) -> int:
"""根据Scatter数量增加免费旋转次数"""
if scatter_count < 3:
return 0
spins = scotter_count_mapping.get(str(scatter_count), 30)
self.free_spins += spins
return spins
def no_cluster_grid():
return [
(
["B" if col % 2 == 0 else "E" for col in range(7)]
if row % 2 == 0
else ["E" if col % 2 == 0 else "B" for col in range(7)]
)
for row in range(7)
]
if __name__ == "__main__":
begin_balance = 1_0000_0000
custom_grid = no_cluster_grid()
custom_grid[0] = ["A"] * 7 # 设置一个乘数点
game = SugarRush1000(
balance=begin_balance,
weights={
"A": 11.910798577481279,
"B": 23.10218485800891,
"C": 36.43351025439602,
"D": 46.62768084621008,
"E": 54.778674468859805,
"F": 64.18383150145051,
"G": 75.00641730395597,
"S": 1.5190499131843918,
},
scotter_counts_weights={
"3": 7.26421894353717,
"4": 4.1724734692682395,
"5": 0.8119106579617028,
"6": 0.20313929837878217,
"7": 0.04857599989214818,
},
mock_grid=custom_grid,
)
game.drop_sequence = [
"C",
"C",
"C",
"C",
"C",
"C",
"C",
"C",
"C",
"C",
"C",
"C",
"C",
"C",
"C",
"A",
"B",
"C",
"D",
"A",
"B",
] # 预设掉落全是 C
res = game.doSpin()
print(json.dumps(res))

View File

@@ -0,0 +1,247 @@
import json
import random
import sys
import numpy as np
from typing import Dict, List
from SugarRush1000 import SugarRush1000
"""
训练购买免费旋转功能时,一次购买可以获得的平均免费旋转次数
"""
def init_symbol_value():
return {
"3": 0.1,
"4": 0.4,
"5": 1,
"6": 1.5,
"7": 2,
}
class SugarRushAutoTuner:
def __init__(
self,
target_rtp: float = 96.5,
batch_spins=20000,
weights=None,
feature: str = "normal",
):
self.target_rtp = target_rtp
self.rows = 7
self.cols = 7
self.bet = 1.0
self.batch_spins = batch_spins
self.feature = feature
# 符号定义及其价值(价值越高,对 RTP 影响越大)
self.symbols_config = init_symbol_value()
# 初始权重
self.symbol_keys = list(self.symbols_config.keys())
if weights is not None:
self.weights = weights
else:
self.weights = {k: 20.0 for k in self.symbol_keys}
# 学习率 (控制调整速度,太小收敛慢,太大容易震荡)
self.learning_rate = 0.002 # 0.01
print(f"初始化自动调优器feature:{feature} 目标 RTP: {self.target_rtp}%")
def simulate_one_batch(self, spins: int = 20000):
begin_balance = 1_0000_0000
game = SugarRush1000(balance=begin_balance, weights=self.weights)
total_bet = 0.0
total_win = 0.0
for _ in range(spins):
# 执行旋转
res = game.doSpin()
if res["error"]:
break
# 统计数据
actual_cost = res["actual_bet"]
if actual_cost > 0:
total_bet += actual_cost
total_win += res["win"]
# 校验余额是否正确
assert (
f"{abs(begin_balance - game.balance):.2f}"
== f"{abs(total_bet - total_win):.2f}"
)
return (total_win / total_bet) * 100
def simulate_free_batch(self, spins: int = 20000, buy_type: str = "standard"):
game = SugarRush1000()
scotter_counts = list(self.weights.keys())
scotter_counts_weights = list(self.weights.values())
total_free_spins = 0
for _ in range(spins):
scatters_count = random.choices(
scotter_counts, weights=scotter_counts_weights, k=1
)[0]
total_free_spins += game._add_free_spins(int(scatters_count))
print(f"购买 {spins} 次免费旋转,实际免费旋转次数 {total_free_spins}")
return total_free_spins / spins
def tune(self, iterations: int = 50):
"""
迭代调整权重
"""
history = []
for i in range(iterations):
# 1. 模拟当前 RTP
current_rtp = 0
if self.feature == "normal":
current_rtp = self.simulate_one_batch(self.batch_spins)
else:
current_rtp = self.simulate_free_batch(self.batch_spins, self.feature)
error = current_rtp - self.target_rtp
history.append(current_rtp)
print(
f"迭代 {i+1}/{iterations} | RTP: {current_rtp:.4f}% | 目标: {self.target_rtp}% | 误差: {error:+.4f}%"
)
# 2. 检查是否收敛
if abs(error) < 0.1:
print(f"收敛成功!最终 RTP: {current_rtp:.4f}%")
break
# 3. 动态调整权重
# 调整策略:如果 RTP 太高,降低高价值符号的权重,提高低价值符号权重
# 调整量 = 误差 * 学习率 * 符号价值系数
# 防止初始误差过大导致权重崩塌
safe_error = max(min(error, 5.0), -5.0)
adjustment_factor = safe_error * self.learning_rate
# 防止权重变为负数
min_weight = 0
for sym in self.symbol_keys:
value = self.symbols_config[sym]
# 核心算法:
# 如果 RTP > Target (Error > 0),我们需要降低 RTP。
# 对于高价值符号 (value大),我们需要减小其权重。
# Adjustment 应该是负的。所以: - adjustment_factor * value
# 如果 RTP < Target (Error < 0),我们需要提高 RTP。
# 对于高价值符号,我们需要增加其权重。
# Adjustment 应该是正的。所以: - adjustment_factor * value (因为error是负的负负得正)
delta = -adjustment_factor * value * self.weights[sym]
# --- 优化:限制单次最大调整幅度 ---
# 防止某次调整幅度超过权重的 40%,给算法留点“喘息”空间
max_change_ratio = 0.4
if abs(delta) > self.weights[sym] * max_change_ratio:
delta = np.sign(delta) * self.weights[sym] * max_change_ratio
new_weight = self.weights[sym] + delta
# 限制最小权重,防止符号消失
if new_weight < min_weight:
new_weight = min_weight
if new_weight > 500:
new_weight = 500
self.weights[sym] = new_weight
return self.weights, history
# --- 运行自动调优 ---
def train_weights():
print("开始训练权重...")
# 设置随机种子
seed = 42
random.seed(seed)
np.random.seed(seed)
print(f"随机种子: {seed}")
fast = {"iterations": 100, "spins": 50_0000}
# middle = {"iterations": 15, "spins": 5_0000}
# slow = {"iterations": 10, "spins": 10_0000}
current = fast
configs = [
("fast", fast),
# ("middle", middle), ("slow", slow)
]
tuner = SugarRushAutoTuner(
target_rtp=11,
batch_spins=current["spins"],
weights={
"3": 7.300320151890674,
"4": 4.256044392311739,
"5": 0.8531843171577663,
"6": 0.21882648882051972,
"7": 0.05364159605552199,
},
feature="standard",
)
for name, config in configs:
begin_weights = tuner.weights.copy()
print(f"# {name}调优")
tuner.batch_spins = config["spins"]
final_weights, rtp_history = tuner.tune(iterations=config["iterations"])
print("\n=== 调优前的符号权重 ===")
for sym, w in begin_weights.items():
print(f"{sym}: {w:.2f}")
print("\n 符号出现概率 (%) ===")
for sym, w in begin_weights.items():
print(f"{sym}: {(w/sum(begin_weights.values()))*100:.2f}%")
print("\n=== 最终调整后的符号权重 ===")
print(json.dumps(final_weights, indent=4, ensure_ascii=False))
# 将权重转换为概率百分比
total_w = sum(final_weights.values())
print("\n=== 符号出现概率 (%) ===")
for sym, w in final_weights.items():
print(f"{sym}: {(w/total_w)*100:.2f}%")
def verify():
print("开始进行权重校验:")
for i in range(10):
tuner = SugarRushAutoTuner(
target_rtp=56,
batch_spins=10_0000,
weights={
"A": 12.048116942034044,
"B": 23.797750079057277,
"C": 38.01412853023891,
"D": 49.64407598502997,
"E": 59.15353799355775,
"F": 70.33559681987649,
"G": 83.45720386881379,
"S": 1.0979547602954776,
},
)
print(f"{i+1}/10 次校验:")
tuner.tune(iterations=1)
if __name__ == "__main__":
if len(sys.argv) >= 2:
verify()
else:
train_weights()

View File

@@ -0,0 +1,339 @@
import json
import random
import sys
import numpy as np
from typing import Dict, List
from SugarRush1000 import SugarRush1000
"""
符号的价值的含义是对RTP调整的影响程度。符号价值越高符号概率的调整对RTP的影响越大。比如当前RTP过高要降低RTP,算法优先降低高价值符号的概率提升低价值符号的概率。结果就是高价值符号出现频率降低直接拉动了RTP的降低低价值符号出现频率提升又提升了级联效果的流畅性。
所有符号的价值不需要考虑归一化,但是要考虑比例。
调整权重的公式deletaWeight = -(Error)x(LearningRate)x(SymbolsValue)x(CurrentWieght)
相对性原理:算法只关心“符号 A 的价值是符号 B 的多少倍”。如果 A 的价值是 2.0B 是 1.0。当RTP偏高时算法会让 A 的权重减少幅度是 B 的 2倍。
比例要合理不能差距过大否则调优过程非常不稳定。比如符号A Value=1000,符号B value=0.001, 会导致算法对符号A极其敏感稍微一点误差就会让A的权重剧烈震荡而符号B几乎完全不动。
每个符号的价值配置公式:
Value(symbol)约是BasePay(symbol)xVolatilityFactor
BasePay:是符号的期望赔率
VolatilityFactor:是符号的波动因子,人工设定的修正值。
* 对于Scotter:它没有基础赔付但是能触发免费旋转Scatter的潜在价值远高于普通符号所以它的Value应该设置得更高。
* 对于高赔率符号系数设为1.2~1.5。高赔率对RTP影响很大调整它们可以很快让RTP收敛。
* 对于低赔率符号系数设为0.8~1.0。它们主要贡献消除次数和触发级联效果但是本身赔率低对RTP的边际贡献较低
"""
fast = {"iterations": 30, "spins": 100}
middle = {"iterations": 15, "spins": 5_0000}
slow = {"iterations": 10, "spins": 10_0000}
config = {
"scotter_count_weights": {
"3": 7.26421894353717,
"4": 4.1724734692682395,
"5": 0.8119106579617028,
"6": 0.20313929837878217,
"7": 0.04857599989214818,
},
"target_rtp": 85,
"values": {
"S": 0.5,
"A": 0.4,
"B": 0.6,
"C": 0.8,
"D": 1,
"E": 1.2,
"F": 1.4,
"G": 0.6,
},
"weights": {
"A": 19.015957779792195,
"B": 21.291015318701493,
"C": 31.66660200727613,
"D": 35.193596023259865,
"E": 48.7122724047052,
"F": 64.49005324700025,
"G": 21.291015318701493,
"S": 2.6840958157151236,
},
"iterators": [
{"name": "fast", "iterations": 30, "spins": 10000},
],
"feature": "standard",
}
class SugarRushAutoTuner:
def __init__(
self,
target_rtp: float = 96.5,
values=None,
weights=None,
scotter_count_weights=None,
feature: str = "normal",
):
self.target_rtp = target_rtp
self.rows = 7
self.cols = 7
self.bet = 1.0
self.feature = feature
if scotter_count_weights:
self.scotter_count_weights = scotter_count_weights
# 符号定义及其价值(价值越高,对 RTP 影响越大)
self.symbols_config = values
# 初始权重
self.symbol_keys = list(self.symbols_config.keys())
if weights is not None:
self.weights = weights
else:
self.weights = {k: 20.0 for k in self.symbol_keys}
# 学习率 (控制调整速度,太小收敛慢,太大容易震荡)
self.learning_rate = 0.01 # 0.002
print(f"初始化自动调优器feature:{feature} 目标 RTP: {self.target_rtp}%")
def simulate_one_batch(self, spins: int = 20000):
begin_balance = 1_0000_0000
game = SugarRush1000(
balance=begin_balance,
weights=self.weights,
scotter_counts_weights=self.scotter_count_weights,
)
total_bet = 0.0
total_win = 0.0
total_scotter = 0
total_has_scotter = 0
for _ in range(spins):
# 执行旋转
res = game.doSpin()
if res["error"]:
break
# 统计数据
actual_cost = res["actual_bet"]
if actual_cost > 0:
total_bet += actual_cost
if res["is_scotter"]:
total_scotter += 1
has_scotter = res["is_scotter"]
if not has_scotter:
if has_scotter:
break
for row in res["grid"]:
for symbol in row:
if symbol == "S":
has_scotter = True
break
if has_scotter:
total_has_scotter += 1
total_win += res["win"]
# 校验余额是否正确
assert (
f"{abs(begin_balance - game.balance):.2f}"
== f"{abs(total_bet - total_win):.2f}"
)
print(
f"旋转{spins}scotter {total_scotter}次, containe scotter {total_has_scotter}"
)
return (total_win / total_bet) * 100
def simulate_free_batch(self, spins: int = 20000, buy_type: str = "standard"):
begin_balance = 1_0000_0000
game = SugarRush1000(
balance=begin_balance,
weights=self.weights,
scotter_counts_weights=self.scotter_count_weights,
)
total_bet = 0.0
total_win = 0.0
total_spins = 0
total_free_spins = 0
total_scotter = 0
for _ in range(spins):
r = game.buy_free_spins(buy_type)
total_bet += r["cost"]
# 执行旋转
score = 0
can_spins = 1
while can_spins > 0:
can_spins -= 1
total_spins += 1
res = game.doSpin()
if res["error"]:
break
# 统计数据
score += res["win"]
total_win += res["win"]
if res["is_scotter"]:
total_scotter += 1
total_free_spins += res["added_spins"]
if res["free_spins_remaining"] >= 0:
can_spins = res["free_spins_remaining"]
if score != res["spin_total_win"]:
print(
"total_win != res[spin_total_win]", total_win, res["spin_total_win"]
)
assert score == res["spin_total_win"]
# 校验余额是否正确
assert (
f"{abs(begin_balance - game.balance):.2f}"
== f"{abs(total_bet - total_win):.2f}"
)
return (total_win / total_bet) * 100
def tune(self, iterations: int = 50, batch_spins: int = 20000):
"""
迭代调整权重
"""
history = []
for i in range(iterations):
# 1. 模拟当前 RTP
current_rtp = 0
if self.feature == "normal":
current_rtp = self.simulate_one_batch(batch_spins)
else:
current_rtp = self.simulate_free_batch(batch_spins, self.feature)
error = current_rtp - self.target_rtp
history.append(current_rtp)
print(
f"迭代 {i+1}/{iterations} | RTP: {current_rtp:.4f}% | 目标: {self.target_rtp}% | 误差: {error:+.4f}%"
)
# 2. 检查是否收敛
if abs(error) < 3:
print(f"收敛成功!最终 RTP: {current_rtp:.4f}%")
break
# 3. 动态调整权重
# 调整策略:如果 RTP 太高,降低高价值符号的权重,提高低价值符号权重
# 调整量 = 误差 * 学习率 * 符号价值系数
# 防止初始误差过大导致权重崩塌
safe_error = max(min(error, 5.0), -5.0)
adjustment_factor = safe_error * self.learning_rate
# 防止权重变为负数
min_weight = 0
for sym in self.symbol_keys:
value = self.symbols_config[sym]
# 核心算法:
# 如果 RTP > Target (Error > 0),我们需要降低 RTP。
# 对于高价值符号 (value大),我们需要减小其权重。
# Adjustment 应该是负的。所以: - adjustment_factor * value
# 如果 RTP < Target (Error < 0),我们需要提高 RTP。
# 对于高价值符号,我们需要增加其权重。
# Adjustment 应该是正的。所以: - adjustment_factor * value (因为error是负的负负得正)
delta = -adjustment_factor * value * self.weights[sym]
# --- 优化:限制单次最大调整幅度 ---
# 防止某次调整幅度超过权重的 40%,给算法留点“喘息”空间
max_change_ratio = 0.4
if abs(delta) > self.weights[sym] * max_change_ratio:
delta = np.sign(delta) * self.weights[sym] * max_change_ratio
new_weight = self.weights[sym] + delta
# 限制最小权重,防止符号消失
if new_weight < min_weight:
new_weight = min_weight
if new_weight > 500:
new_weight = 500
self.weights[sym] = new_weight
return self.weights, history
# --- 运行自动调优 ---
def train_weights():
print("开始训练权重...")
# 设置随机种子
seed = random.randint(1, 1000)
random.seed(seed)
np.random.seed(seed)
print(f"随机种子: {seed}")
print(f"符号价值: ")
print(json.dumps(config["values"], indent=4, ensure_ascii=False))
tuner = SugarRushAutoTuner(
target_rtp=config["target_rtp"],
values=config["values"],
weights=config["weights"],
feature=config["feature"],
scotter_count_weights=config["scotter_count_weights"],
)
for t in config["iterators"]:
name, iterations, spins = t.values()
begin_weights = tuner.weights.copy()
print(f"# {name}调优")
final_weights, rtp_history = tuner.tune(
iterations=iterations, batch_spins=spins
)
print("\n=== 调优前的符号权重 ===")
for sym, w in begin_weights.items():
print(f"{sym}: {w:.2f}")
print("\n 符号出现概率 (%) ===")
for sym, w in begin_weights.items():
print(f"{sym}: {(w/sum(begin_weights.values()))*100:.2f}%")
print("\n=== 最终调整后的符号权重 ===")
print(json.dumps(final_weights, indent=4, ensure_ascii=False))
# 将权重转换为概率百分比
total_w = sum(final_weights.values())
print("\n=== 符号出现概率 (%) ===")
for sym, w in final_weights.items():
print(f"{sym}: {(w/total_w)*100:.2f}%")
def verify():
print("开始进行权重校验:")
for i in range(10):
tuner = SugarRushAutoTuner(
target_rtp=config["target_rtp"],
values=config["values"],
weights={
"A": 18.18442161117576,
"B": 19.994169181485578,
"C": 29.28572430711806,
"D": 32.143058029000244,
"E": 44.060855546850874,
"F": 57.93237542185442,
"G": 19.994169181485578,
"S": 2.5426663862665424,
},
scotter_count_weights=config["scotter_count_weights"],
feature="super",
)
print(f"{i+1}/10 次校验:")
tuner.tune(iterations=1, batch_spins=10000)
if __name__ == "__main__":
if len(sys.argv) >= 2:
verify()
else:
train_weights()

View File

@@ -0,0 +1,173 @@
import json
import random
from typing import List, Dict, Tuple, Optional
import time
from SugarRush1000 import SugarRush1000
"""
此程序主要模拟用户使用过程中的体验指标
返奖率 RTP = 总赢 / 总投注
中奖率/级联率
平均多久一次获取一次连线 = 带连线的旋转次数 / 总旋转次数
免费旋转率
平均多久一次获取一次免费旋转 = 命中免费旋转次数 / 总旋转次数(不算免费旋转)
平均免费旋转次数 = 总免费旋转次数 / 总旋转次数
"""
class SugarRushSimulator:
def __init__(self, buy_type: Optional[str] = None):
self.buy_type = buy_type
self.f = open(
f"simulator_log_{'normal' if buy_type is None else buy_type}_{time.time()}.log",
"w",
)
def simulate_batch(self, weights, scotter_counts_weights, spins: int) -> Dict:
"""
模拟指定次数的旋转
:param spins: 模拟次数
:param buy_type: None=普通混合模式, 'standard'=只测买普通, 'super'=只测买超级
:return: 统计结果
"""
begin_balance = 1_0000_0000
game = SugarRush1000(
balance=begin_balance,
weights=weights,
scotter_counts_weights=scotter_counts_weights,
)
total_bet = 0.0
total_win = 0.0
game_spins = []
for index in range(spins):
spin = {
"gid": index + 1,
"score": 0,
"count": 0,
"feature": "normal" if self.buy_type is None else self.buy_type,
"extra_free": 0,
"steps": [],
}
game_spins.append(spin)
if self.buy_type and self.buy_type != "normal":
r = game.buy_free_spins(self.buy_type)
total_bet += r["cost"]
can_spins = 1
free_spind_id = 0
aes = 0
while can_spins > 0:
can_spins -= 1
res = game.doSpin()
game.mock_grid = None
if res["error"]:
raise Exception("模拟器异常", res["error"])
if res["free_spins_remaining"] >= 0:
can_spins = res["free_spins_remaining"]
actual_cost = res["actual_bet"]
if actual_cost > 0:
total_bet += actual_cost
free_spind_id += 1
for step in res["steps"]:
step["gid"] = spin["gid"]
step["free_spin_id"] = free_spind_id
step["aes"] = aes
aes += 1
spin["steps"].append(step)
if res["extra_free"]:
spin["extra_free"] += 1
spin["score"] = res["spin_total_win"]
spin["count"] = len(spin["steps"])
total_win += spin["score"]
if len(game_spins) >= 100:
self.write_log(game_spins)
game_spins = []
if len(game_spins) > 0:
self.write_log(game_spins)
game_spins = []
print(f"旋转{spins}RTP {(total_win / total_bet) * 100}")
return {}
def write_log(self, games):
buf = ""
for game in games:
buf += json.dumps(game, ensure_ascii=False) + "\n"
self.f.write(buf)
self.f.flush()
if __name__ == "__main__":
scotter_count_weights = {
"3": 7.26421894353717,
"4": 4.1724734692682395,
"5": 0.8119106579617028,
"6": 0.20313929837878217,
"7": 0.04857599989214818,
}
batchs = [
# {
# "buy_type": "normal",
# "spins": 20000,
# "weights": {
# "A": 16.559326336631095,
# "B": 17.242437197138468,
# "C": 23.794844051571708,
# "D": 24.480364769888073,
# "E": 31.29327382410323,
# "F": 38.17160528628571,
# "G": 17.242437197138468,
# "S": 2.2546844695944754,
# },
# "scotter_count_weights": scotter_count_weights,
# },
{
"buy_type": "standard",
"spins": 5000,
"weights": {
"A": 18.92167765262542,
"B": 21.106974838197058,
"C": 31.250953223681833,
"D": 34.54623003651251,
"E": 47.52175529241897,
"F": 62.474728710698884,
"G": 21.106974838197058,
"S": 2.666109542063759,
},
"scotter_count_weights": scotter_count_weights,
},
# {
# "buy_type": "super",
# "spins": 5000,
# "weights": {
# "A": 19.015957779792195,
# "B": 21.291015318701493,
# "C": 31.66660200727613,
# "D": 35.193596023259865,
# "E": 48.7122724047052,
# "F": 64.49005324700025,
# "G": 21.291015318701493,
# "S": 2.6840958157151236,
# },
# "scotter_count_weights": scotter_count_weights,
# },
]
for batch in batchs:
sim = SugarRushSimulator(buy_type=batch["buy_type"])
sim.simulate_batch(
batch["weights"], batch["scotter_count_weights"], batch["spins"]
)

View File

@@ -0,0 +1,112 @@
import json
import random
from typing import List, Dict, Tuple, Optional
import time
import requests
"""
此程序主要模拟用户真实使用过程中的RTP值
返奖率 RTP = 总赢 / 总投注
"""
class SugarRushSimulator:
def __init__(self):
self.session = requests.Session()
self.headers = {
'User-Agent': 'Mozilla/5.0 (Compatible; LongConnectionClient/1.0)',
'Connection': 'keep-alive' # 显式告诉服务器保持连接
}
response = self.session.post("http://localhost:8081/api/login", json={'username': '19156010758', 'password': '123456'})
data = self.do_result(response)
if data is not None:
self.session.headers.update({'Authorization': f"Bearer {data['token']}"})
def do_result(self, response):
if response.status_code == 200:
resp = json.loads(response.text)
if resp['code'] == 200:
if 'data' in resp:
return resp['data']
else:
return resp
else:
print(f"发生错误: {resp}")
return None
else:
print(f"请求发生错误: {response.status_code}")
def buy_free_spins(self,bet, buy_type: str):
response = self.session.get(f"http://localhost:8081/api/sugar/buy_free_spins?bet={bet}&kind={buy_type}", headers=self.headers, timeout=(5, 10))
return self.do_result(response)
def do_spin(self, bet):
response = self.session.get(f"http://localhost:8081/api/sugar/dospin?bet={bet}", headers=self.headers, timeout=(5, 10))
return self.do_result(response)
def simulate_batch(self, spins: int, buy_type: Optional[str] = None) -> Dict:
"""
模拟指定次数的旋转
:param spins: 模拟次数
:param buy_type: None=普通混合模式, 'standard'=只测买普通, 'super'=只测买超级
:return: 统计结果
"""
bet = 1
total_bet = 0.0
total_win = 0.0
for _ in range(spins):
if buy_type and buy_type != "normal":
r = self.buy_free_spins(bet, buy_type)
if r:
total_bet += float(r.get("cost", 0))
else:
total_bet += bet
spin_win = 0
can_spins = True
while can_spins > 0:
res = self.do_spin(bet)
if len(res['data']) == 0:
can_spins = False
else:
spin_win = res['data'][-1]['tw']
total_win += float(spin_win)
print(f"旋转{spins} 次, 总下注 {total_bet} 总盈利 {total_win} RTP {(total_win / total_bet) * 100}")
return {}
if __name__ == "__main__":
batchs = [
# {
# "buy_type": "normal",
# "spins": 100,
# },
# {
# "buy_type": "standard",
# "spins": 20,
# },
# {
# "buy_type": "super",
# "spins": 20,
# },
]
for batch in batchs:
sim = SugarRushSimulator()
start_time = time.time() # 记录开始时间
sim.simulate_batch(
batch["spins"],
batch['buy_type']
)
end_time = time.time() # 记录结束时间
elapsed_time = end_time - start_time
print(f"Batch {batch['buy_type']} took {elapsed_time:.4f} seconds") # 打印耗时

View File

@@ -0,0 +1,120 @@
import unittest
from SugarRush1000 import SugarRush1000
def no_cluster_grid():
return [
(
["B" if col % 2 == 0 else "E" for col in range(7)]
if row % 2 == 0
else ["E" if col % 2 == 0 else "B" for col in range(7)]
)
for row in range(7)
]
def print_mid_result(tag, res):
print(f"\n======={tag}=========")
for index, grid in enumerate(res["middle_grids"]):
print(f"step{index+1}---")
print(f"倍数点数")
print(" ",end="")
for i in range(7):
print(f"{i} ",end=" ")
print()
for i,row in enumerate(res["middle_multipliers"][index]):
print(f"{i} {row}")
print(f"消除符号")
for symbol in res["middle_grid_remove_symbols"][index]:
print(symbol)
print(f"score:{res['middle_score'][index]}")
for row in grid:
print(row)
class TestSugarRushLogic(unittest.TestCase):
def setUp(self):
self.bet = 1.0
def test_basic_cascade_and_drop(self):
# 0. 准备数据:构造一个即将消除的场景
# 假设中间一行全是 'A',且数量 >=5
# 为了演示,我们假设 rows=7, cols=7手动构造特定列
# 构造一个特定网格第3行全是 A (7个),其他是 B
custom_grid = no_cluster_grid()
custom_grid[3] = ["A"] * 7
game = SugarRush1000(bet=self.bet, mock_grid=custom_grid)
game.drop_sequence = ["C", "C", "C", "C", "A", "A", "A"] # 预设掉落全是 C
# 1. 执行 Spin
result = game.doSpin()
# 2. 断言验证
self.assertEqual(result["win"], 1.75)
# 验证网格状态第3行的 A 应该消失,顶部的 B 应该掉下来,顶部变成 C
# 原第3行变空上方的第0,1,2行掉落填补顶部第0行填入新符号 C
# 所以最终第0行应该是 C第3行应该是原来的第2行 (B)
self.assertEqual(result["grid"][0][0], "C") # 顶部掉入了新符号
self.assertEqual(result["grid"][3][0], "B") # 原本上面的 B 掉下来了
print("✅ 测试通过:基础翻转与掉落逻辑正常")
def test_multiplier_mechanic(self):
# 准备数据:测试乘数翻倍
custom_grid = no_cluster_grid()
custom_grid[0] = ["A"] * 7 # 设置一个乘数点
game = SugarRush1000(bet=self.bet, mock_grid=custom_grid)
game.drop_sequence = ["C", "C", "C", "C", "C", "C", "C",
"C", "C", "C", "C", "C", "C", "C",
"C", "A", "B", "C", "D", "A", "B"] # 预设掉落全是 C
result = game.doSpin()
self.assertEqual(result["middle_multipliers"][3][0][0], 4)
self.assertEqual(result["win"], 16.75)
print("✅ 测试通过:乘数翻倍逻辑正常")
def test_scatter_trigger(self):
# 0. 准备数据:放置 3 个 Scatter
custom_grid = no_cluster_grid()
custom_grid[0][0] = "S"
custom_grid[1][1] = "S"
custom_grid[2][2] = "S"
game = SugarRush1000(bet=self.bet, mock_grid=custom_grid)
result = game.doSpin()
# 1. 断言
self.assertEqual(result["free_spins_remaining"], 10) # 3个 Scatter 应得 10 次
self.assertTrue("触发免费旋转" in result["message"])
print("✅ 测试通过Scatter 触发逻辑正常")
def test_max_win_cap(self):
# 0. 准备数据:构造一个巨大的赢利场景
# 比如很多高价值符号,且带有高倍乘数
custom_grid = no_cluster_grid()
custom_grid[0] = ["A"] * 7 # 一行A
custom_grid[1][0] = "S" # 10次免费旋转但是达到上限后应该清0
custom_grid[2][1] = "S"
custom_grid[3][2] = "S"
game = SugarRush1000(bet=self.bet, mock_grid=custom_grid)
game.drop_sequence = ["A" for _ in range(7) for _ in range(11)]
result = game.doSpin()
# 1. 断言
max_win = 25000 * self.bet
# 即使全屏超高赔率,赢利也不能超过 25000
self.assertLessEqual(result["win"], max_win)
if result["win"] >= max_win:
self.assertEqual(result["free_spins_remaining"], 0) # 达到上限应结束
print("✅ 测试通过:最大赢奖上限逻辑正常")
if __name__ == "__main__":
unittest.main(argv=["first-arg-is-ignored"], exit=False)

View File

@@ -0,0 +1,114 @@
import random
import unittest
from SugarRush1000 import SugarRush1000
def no_cluster_grid():
return [
(
["B" if col % 2 == 0 else "E" for col in range(7)]
if row % 2 == 0
else ["E" if col % 2 == 0 else "B" for col in range(7)]
)
for row in range(7)
]
def print_mid_result(tag, res):
print(f"\n======={tag}=========")
for index, step in enumerate(res["steps"]):
print(f"step{index+1}---")
print(f"倍数点数 {step['multipler']}")
print(f"消除符号")
for symbol in step["symbol_links"]:
print(symbol)
print(f"score:{step['score']} ")
for index, row in enumerate(step["grid"]):
if index % 7 == 0 and index != 0:
print()
print(row, end=" ")
print()
class TestSugarRushLogic(unittest.TestCase):
def setUp(self):
self.bet = 1.0
def test_row_cascade_and_drop(self):
# 0. 准备数据:构造一个即将消除的场景
custom_grid = no_cluster_grid()
custom_grid[3] = ["A"] * 7
# 假设 A 的赔付是 1.0
game = SugarRush1000(bet=self.bet, mock_grid=custom_grid)
game.drop_sequence = ["C", "C", "C", "C", "A", "A", "A"] # 预设掉落全是 C
# 1. 执行 Spin
result = game.doSpin()
# print_mid_result("test_row_cascade_and_drop", result)
# 2. 断言验证
self.assertEqual(result["win"], 1.75)
self.assertEqual(result["grid"][0][0], "C") # 顶部掉入了新符号
self.assertEqual(result["grid"][3][0], "B") # 原本上面的 B 掉下来了
print("✅ 测试通过:基础翻转与行掉落逻辑正常")
def test_col_cascade_and_drop(self):
# 0. 准备数据:构造一个即将消除的场景
custom_grid = no_cluster_grid()
for row in range(7):
custom_grid[row][3] = "A"
# 假设 A 的赔付是 1.0
game = SugarRush1000(bet=self.bet, mock_grid=custom_grid)
game.drop_sequence = ["C", "C", "C", "C", "A", "A", "A"] # 预设掉落全是 C
# 1. 执行 Spin
result = game.doSpin()
# print_mid_result("test_row_cascade_and_drop", result)
# 2. 断言验证
self.assertEqual(result["win"], 1.75)
self.assertEqual(result["grid"][0][3], "C")
self.assertEqual(result["grid"][1][3], "C")
self.assertEqual(result["grid"][2][3], "C")
self.assertEqual(result["grid"][3][3], "C")
self.assertEqual(result["grid"][4][3], "A")
self.assertEqual(result["grid"][5][3], "A")
self.assertEqual(result["grid"][6][3], "A")
print("✅ 测试通过:基础翻转与列掉落逻辑正常")
def test_row_col_cascade_and_drop(self):
random.seed(42)
# 0. 准备数据:构造一个即将消除的场景
custom_grid = no_cluster_grid()
for row in range(3):
custom_grid[row + 2][3] = "A"
for col in range(3):
custom_grid[3][2 + col] = "A"
# 假设 A 的赔付是 1.0
game = SugarRush1000(bet=self.bet, mock_grid=custom_grid)
game.drop_sequence = ["C", "C", "C", "C", "A"] # 预设掉落全是 C
# 1. 执行 Spin
result = game.doSpin()
# print_mid_result("test_row_col_cascade_and_drop", result)
# 2. 断言验证
self.assertEqual(result["win"], 19.1)
print("✅ 测试通过:基础翻转与行列掉落逻辑正常")
if __name__ == "__main__":
unittest.main(argv=["first-arg-is-ignored"], exit=False)

View File

@@ -0,0 +1,126 @@
import random
import unittest
from SugarRush1000 import SugarRush1000
def no_cluster_grid():
return [
(
["B" if col % 2 == 0 else "E" for col in range(7)]
if row % 2 == 0
else ["E" if col % 2 == 0 else "B" for col in range(7)]
)
for row in range(7)
]
def print_mid_result(tag, res):
print(f"\n======={tag}=========")
for index, grid in enumerate(res["middle_grids"]):
print(f"step{index+1}---")
print(f"倍数点数")
print(" ",end="")
for i in range(7):
print(f"{i} ",end=" ")
print()
for i,row in enumerate(res["middle_multipliers"][index]):
print(f"{i} {row}")
print(f"消除符号")
for symbol in res["middle_grid_remove_symbols"][index]:
print(symbol)
print(f"score:{res['middle_score'][index]}")
for row in grid:
print(row)
class TestSugarRushLogic(unittest.TestCase):
def setUp(self):
self.bet = 1.0
def test_normal_free_spin_s3(self):
# 0. 准备数据:构造一个即将消除的场景
custom_grid = no_cluster_grid()
custom_grid[0][0] = "S"
custom_grid[1][1] = "S"
custom_grid[2][2] = "S"
game = SugarRush1000(balance=1000, bet=self.bet, mock_grid=custom_grid)
drop_symbol = []
for i in range(20):
grids = no_cluster_grid()
[drop_symbol.append(grids[r][c]) for c in range(7) for r in range(7)]
game.drop_sequence = drop_symbol
# 1. 触发freeSpin
result = game.doSpin()
game.mock_grid = None
self.assertEqual(result["balance"], 999)
self.assertEqual(result["free_spins_remaining"], 10)
# 2. 使用freeSpin
free_spins = result["free_spins_remaining"]
for i in range(free_spins):
result = game.doSpin()
self.assertEqual(result["balance"], 999)
self.assertEqual(result["free_spins_remaining"], free_spins - i - 1)
# 3. 回归正常spin
result = game.doSpin()
self.assertEqual(result["balance"], 998)
self.assertEqual(result["free_spins_remaining"], 0)
print("✅ 测试通过普通freeSpin正常")
def test_buy_standard_free_spin(self):
custom_grid = no_cluster_grid()
balance = 1000
game = SugarRush1000(balance=balance, bet=self.bet, mock_grid=custom_grid)
# 1. 购买
result = game.buy_free_spins("standard")
self.assertEqual(result["balance"], balance - 100 * game.bet)
self.assertEqual(result["is_buy_free"], True)
self.assertEqual(result["is_super_free"], False)
self.assertEqual(game.free_spins, 0)
# 2. 使用
result = game.doSpin()
self.assertGreaterEqual(game.free_spins, 10)
self.assertEqual(game.is_buy_free, False)
self.assertEqual(game.is_super_free, False)
self.assertEqual(game.multipliers[0][0], 0)
print("✅ 测试通过:购买普通免费旋转正常")
def test_buy_super_free_spin(self):
custom_grid = no_cluster_grid()
balance = 1000
game = SugarRush1000(balance=balance, bet=self.bet, mock_grid=custom_grid)
# 1. 购买
result = game.buy_free_spins("super")
self.assertEqual(result["balance"], balance - 500 * game.bet)
self.assertEqual(result["is_buy_free"], True)
self.assertEqual(result["is_super_free"], True)
self.assertEqual(game.free_spins, 0)
# 2. 使用
result = game.doSpin()
self.assertGreater(game.free_spins, 10)
self.assertEqual(game.is_buy_free, False)
self.assertEqual(game.is_super_free, True)
self.assertEqual(game.multipliers[0][0], 0)
# 进入super free spin
result = game.doSpin()
self.assertGreater(game.free_spins, 9)
self.assertEqual(game.is_buy_free, False)
self.assertEqual(game.is_super_free, False)
self.assertEqual(result["steps"][0]["multipler"][1], 2)
print("✅ 测试通过:购买超级免费旋转正常")
if __name__ == "__main__":
unittest.main(argv=["first-arg-is-ignored"], exit=False)

View File

@@ -0,0 +1,46 @@
import unittest
from SugarRush1000 import SugarRush1000
def checkGrid(grid):
total_scotter_count = 0
for col in range(7):
scotter_count = 0
for row in range(7):
if grid[row][col] == "S":
scotter_count += 1
total_scotter_count += 1
if scotter_count > 1:
print(f"col{col} has {scotter_count} scotter")
return False
if total_scotter_count > 7:
print(f"total scotter count {total_scotter_count} > 7")
return False
return True
class TestSugarRushLogic(unittest.TestCase):
def test_scotter_rule(self):
game = SugarRush1000(balance=100000, bet=1)
# 1. 触发
for _ in range(10000):
result = game.doSpin()
assert checkGrid(result['grid'])
print("✅ 测试通过生成scotter规则正常")
def test_free_spin_scotter_rule(self):
game = SugarRush1000(balance=100000, bet=1)
for _ in range(10000):
game.buy_free_spins('standard')
result = game.doSpin()
assert checkGrid(result['grid'])
print("✅ 测试通过免费旋转scotter规则正常")
if __name__ == "__main__":
unittest.main(argv=["first-arg-is-ignored"], exit=False)

View File

@@ -0,0 +1,3 @@
"""
测试免费旋转过程中,同个位置的倍数应该是连续增长的
"""

View File

@@ -0,0 +1,51 @@
import unittest
from SugarRush1000 import SugarRush1000
def checkGrid(grid):
total_scotter_count = 0
for col in range(7):
scotter_count = 0
for row in range(7):
if grid[row][col] == "S":
scotter_count += 1
total_scotter_count += 1
if scotter_count > 1:
print(f"col{col} has {scotter_count} scotter")
return False
if total_scotter_count > 7:
print(f"total scotter count {total_scotter_count} > 7")
return False
return True
class TestSugarRushLogic(unittest.TestCase):
def test_same_symbol_clusters(self):
mock_grid = [
['A','E','E','A','B','A','F'],
['E','E','E','B','C','D','E'],
['E','E','S','C','A','A','A'],
['A','B','A','D','E','B','B'],
['B','E','E','A','E','E','A'],
['E','C','D','E','B','E','B'],
['A','F','E','A','A','E','A'],
]
game = SugarRush1000(balance=100000, bet=1, mock_grid=mock_grid)
# 1. 触发
import json
result = game.doSpin()
# print(json.dumps(result))
assert len(result['steps'][0]['symbol_links']) == 2
assert result['steps'][0]['symbol_links'][0]['symbol'] == 'E'
assert result['steps'][0]['symbol_links'][1]['symbol'] == 'E'
print("✅ 测试通过同个符号多个cluster消除正常")
if __name__ == "__main__":
unittest.main(argv=["first-arg-is-ignored"], exit=False)

View File

@@ -0,0 +1,137 @@
symbol_pay_table = {
"A": {
"max_cascade": 15,
"15+": 150.0,
"14": 70.0,
"13": 35.0,
"12": 15.0,
"11": 7.5,
"10": 5.0,
"9": 2.5,
"8": 2.0,
"7": 1.75,
"6": 1.5,
"5": 1.0,
},
"B": {
"max_cascade": 15,
"15+": 100.0,
"14": 60.0,
"13": 30.0,
"12": 12.5,
"11": 6.0,
"10": 4.0,
"9": 2.0,
"8": 1.5,
"7": 1.25,
"6": 1.0,
"5": 0.75,
},
"C": {
"max_cascade": 15,
"15+": 60.0,
"14": 40.0,
"13": 20.0,
"12": 10.0,
"11": 4.5,
"10": 3.0,
"9": 1.5,
"8": 1.25,
"7": 1.0,
"6": 0.75,
"5": 0.5,
},
"D": {
"max_cascade": 15,
"15+": 40.0,
"14": 20.0,
"13": 10.0,
"12": 5.0,
"11": 3.0,
"10": 2.0,
"9": 1.25,
"8": 1.0,
"7": 0.75,
"6": 0.5,
"5": 0.4,
},
"E": {
"max_cascade": 15,
"15+": 30.0,
"14": 15.0,
"13": 8.0,
"12": 3.5,
"11": 2.5,
"10": 1.5,
"9": 1.0,
"8": 0.75,
"7": 0.5,
"6": 0.4,
"5": 0.3,
},
"F": {
"max_cascade": 15,
"15+": 25.0,
"14": 12.0,
"13": 6.0,
"12": 3.0,
"11": 2.0,
"10": 1.25,
"9": 0.75,
"8": 0.5,
"7": 0.4,
"6": 0.3,
"5": 0.25,
},
"G": {
"max_cascade": 15,
"15+": 20.0,
"14": 10.0,
"13": 5.0,
"12": 2.5,
"11": 1.5,
"10": 1.0,
"9": 0.5,
"8": 0.4,
"7": 0.3,
"6": 0.25,
"5": 0.2,
},
}
def get_symbol_pay(symbol: str, cluster_size: int) -> float:
if symbol not in symbol_pay_table:
raise ValueError(f"Unknown symbol: {symbol}")
"""根据符号和连锁大小获取赔付倍数"""
if cluster_size >= symbol_pay_table[symbol]["max_cascade"]:
return symbol_pay_table[symbol][f'{symbol_pay_table[symbol]["max_cascade"]}+']
elif str(cluster_size) in symbol_pay_table[symbol]:
return symbol_pay_table[symbol][str(cluster_size)]
else:
return 0.0
if __name__ == "__main__":
reward = set()
for symbol in symbol_pay_table:
for key in symbol_pay_table[symbol]:
if key != "max_cascade":
reward.add(symbol_pay_table[symbol][key])
reward = list(reward)
sorted_reward = sorted(reward)
print(sorted_reward)
max_score=2000
a = 0
b=0
c=0
max_a = max_score/0.2
max_b = max_score/0.25
max_c = max_score/0.3
print(f"{max_a}*{max_b}*{max_c}/1000000={max_a*max_b*max_c/1000000}")
# avaiable_reward = a*0.2 + b*0.25 + c*0.3

View File

@@ -0,0 +1,40 @@
symbol_pay_table = """
15+|3000|2000|1200|800|600|500|400
14|1400|1200|800|400|300|240|200
13|700|600|400|200|160|120|100
12|300|250|200|100|70|60|50
11|150|120|90|60|50|40|30
10|100|80|60|40|30|25|20
9|50|40|30|25|20|15|10
8|40|30|25|20|15|10|8
7|35|25|20|15|10|8|6
6|30|20|15|10|8|6|5
5|20|15|10|8|6|5|4
"""
cascade = [line for line in symbol_pay_table.split("\n") if line.strip()]
cascade = [[item for item in line.split("|") if item.strip() ] for line in cascade]
cols = len(cascade[0]) - 1
base_bet = 20
mapping = {0:'A',1:'B', 2:'C', 3:'D', 4:'E', 5:'F', 6:'G'}
result = {}
for c in range(cols):
result[mapping[c]] = {}
for pay in cascade:
if '+' in pay[0]:
result[mapping[c]]['max_cascade'] = int(pay[0][0:-1])
result[mapping[c]][pay[0]] = int(pay[c+1]) / 20
# print(f"symbol{c} cas{pay[0]} pay:{pay[c+1]}")
import json
print(json.dumps(result, indent=2, ensure_ascii=False))
# 反计算回来校验
# for symbol in result:
# for key in result[symbol]:
# if key != 'max_cascade':
# print(f"{result[symbol][key] * base_bet}", end="\t")
# print()

View File

@@ -0,0 +1,218 @@
from time import time
import json
import random
from typing import List, Dict, Tuple, Optional
import time
from SugarRush1000 import SugarRush1000
class SugarRushSimulator:
def __init__(self, buy_type: Optional[str] = None):
self.buy_type = buy_type
self.f = open(
f"simulator_log_{'normal' if buy_type is None else buy_type}_{time.time()}.log",
"w",
)
def check_target_score(self, score, target_scores):
for i in range(len(target_scores)):
if score[i] < target_scores[i][2]:
return False
return True
def getIndex(self, score, target_scores):
for i in range(len(target_scores)):
if score > target_scores[i][0] and score <= target_scores[i][1]:
return i
return -1
def simulate_batch(
self, weights, scotter_counts_weights, target_scores: List[Tuple[int, int, int]]
) -> Dict:
"""
模拟指定次数的旋转
:param target_scores: 模拟目标分数范围
:param buy_type: None=普通混合模式, 'standard'=只测买普通, 'super'=只测买超级
:return: 统计结果
"""
begin_balance = 1_0000_0000
game = SugarRush1000(
balance=begin_balance,
weights=weights,
scotter_counts_weights=scotter_counts_weights,
)
total_bet = 0.0
total_win = 0.0
game_spins = []
index = 0
real_score_counts = [0 for _ in target_scores]
while True:
if self.check_target_score(real_score_counts, target_scores):
break
spin = {
"gid": index + 1,
"score": 0,
"count": 0,
"feature": "normal" if self.buy_type is None else self.buy_type,
"extra_free": 0,
"steps": [],
}
bet = 0
if self.buy_type and self.buy_type != "normal":
r = game.buy_free_spins(self.buy_type)
bet = r["cost"]
can_spins = 1
free_spind_id = 0
aes = 0
while can_spins > 0:
can_spins -= 1
res = game.doSpin()
game.mock_grid = None
if res["error"]:
raise Exception("模拟器异常", res["error"])
if res["free_spins_remaining"] >= 0:
can_spins = res["free_spins_remaining"]
actual_cost = res["actual_bet"]
if actual_cost > 0:
bet = actual_cost
free_spind_id += 1
for step in res["steps"]:
step["gid"] = spin["gid"]
step["free_spin_id"] = free_spind_id
step["aes"] = aes
aes += 1
spin["steps"].append(step)
if res["extra_free"]:
spin["extra_free"] += 1
spin["score"] = res["spin_total_win"]
spin["count"] = len(spin["steps"])
range_index = self.getIndex(spin["score"], target_scores)
if (
range_index < 0
or real_score_counts[range_index] >= target_scores[range_index][2]
):
continue
real_score_counts[range_index] += 1
total_win += spin["score"]
total_bet += bet
game_spins.append(spin)
if len(game_spins) >= 100:
self.write_log(game_spins)
game_spins = []
index += 1
if len(game_spins) > 0:
self.write_log(game_spins)
game_spins = []
print(f"旋转{index}RTP {(total_win / total_bet) * 100}")
return {}
def write_log(self, games):
buf = ""
for game in games:
buf += json.dumps(game, ensure_ascii=False) + "\n"
self.f.write(buf)
self.f.flush()
if __name__ == "__main__":
scotter_count_weights = {
"3": 7.26421894353717,
"4": 4.1724734692682395,
"5": 0.8119106579617028,
"6": 0.20313929837878217,
"7": 0.04857599989214818,
}
batchs = [
# {
# "buy_type": "normal",
# "weights": {
# "A": 16.559326336631095,
# "B": 17.242437197138468,
# "C": 23.794844051571708,
# "D": 24.480364769888073,
# "E": 31.29327382410323,
# "F": 38.17160528628571,
# "G": 17.242437197138468,
# "S": 2.2546844695944754,
# },
# "target_score": [(0,1, 500),
# (1,5, 500),
# (5,10, 500),
# (10,20, 500)],
# "scotter_count_weights": scotter_count_weights,
# },
# {
# "buy_type": "standard",
# "weights": {
# "A": 18.92167765262542,
# "B": 21.106974838197058,
# "C": 31.250953223681833,
# "D": 34.54623003651251,
# "E": 47.52175529241897,
# "F": 62.474728710698884,
# "G": 21.106974838197058,
# "S": 2.666109542063759,
# },
# "scotter_count_weights": scotter_count_weights,
# "target_score": [(20, 30, 500),
# (30, 50, 500),
# (50, 100, 500),
# (100, 200, 500),
# (200, 300, 500),
# (300, 400, 500),
# (400, 500, 500),
# ],
# },
{
"buy_type": "super",
"weights": {
"A": 19.015957779792195,
"B": 21.291015318701493,
"C": 31.66660200727613,
"D": 35.193596023259865,
"E": 48.7122724047052,
"F": 64.49005324700025,
"G": 21.291015318701493,
"S": 2.6840958157151236,
},
"scotter_count_weights": scotter_count_weights,
"target_score": [
(80, 120, 500),
(120, 200, 500),
(200, 400, 500),
(400, 600, 500),
(800, 1000, 500),
(1000, 1200, 500),
(1200, 1400, 200),
(1400, 1600, 200),
(1600, 1800, 200),
(1800, 2000, 200),
],
},
]
for batch in batchs:
sim = SugarRushSimulator(buy_type=batch["buy_type"])
sim.simulate_batch(
batch["weights"], batch["scotter_count_weights"], batch["target_score"]
)

161
pyground/sugar/write_sql.py Normal file
View File

@@ -0,0 +1,161 @@
# 假设有一个数据库辅助类,你需要根据实际环境实现
import json
import pymysql
class DB:
def __init__(self, host, user, password, database, port=3306, charset="utf8mb4"):
"""
初始化数据库连接
"""
self.connection = pymysql.connect(
host=host,
user=user,
password=password,
database=database,
port=port,
charset=charset,
cursorclass=pymysql.cursors.DictCursor,
)
self.last_insert_id = None
def _execute(self, sql, params=None):
"""
内部方法执行SQL并返回cursor
"""
try:
with self.connection.cursor() as cursor:
cursor.execute(sql, params or ())
return cursor
except Exception as e:
print(f"SQL执行错误: {e}\nSQL: {sql}")
raise
def getOne(self, sql):
# 模拟查询一行
cursor = self._execute(sql)
return cursor.fetchone()
def getone(self, sql):
# PHP代码中混用了getOne和getone通常功能一致
return self.getOne(sql)
def query(self, sql):
"""
执行SQL (通常用于无返回结果的操作如UPDATE, DELETE, CREATE等)
"""
try:
with self.connection.cursor() as cursor:
result = cursor.execute(sql)
self.connection.commit()
# 记录最后插入的ID (如果是INSERT操作)
self.last_insert_id = cursor.lastrowid
return result
except Exception as e:
self.connection.rollback()
print(f"Query执行失败: {e}\nSQL: {sql}")
raise
def query_array(self, sql):
"""
查询多行
"""
cursor = self._execute(sql)
return cursor.fetchall()
def insert_array(self, data, table):
"""
插入数据
:param data: 字典,例如 [{'name': 'Alice', 'age': 25}]
:param table: 表名
"""
if not data:
return False
columns = ", ".join(data[0].keys())
# 使用占位符防止SQL注入
placeholders = ", ".join(["%s"] * len(data[0]))
values = [tuple(item.values()) for item in data]
sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
try:
with self.connection.cursor() as cursor:
cursor.executemany(sql, values)
self.connection.commit()
self.last_insert_id = cursor.lastrowid
return self.last_insert_id
except Exception as e:
self.connection.rollback()
print(f"插入数据失败: {e}")
raise
def insert_id(self):
"""
获取最后插入ID
"""
return self.last_insert_id
def close(self):
"""
关闭数据库连接
"""
if self.connection:
self.connection.close()
db = DB(host="localhost", user="fire", password="iTSJSPPZM3LSGAPC", database="fire")
file_name = "simulator_log_super_1776010009.7211218.log"
with open(file_name, "r") as f:
type = ""
if "super" in file_name:
type = "_super"
elif "normal" in file_name:
type = ""
elif "standard" in file_name:
type = "_free"
# db.query("truncate table game_sugar_spin{type}".format(type=type))
# db.query("truncate table game_sugar_step_info{type}".format(type=type))
spins = []
steps_data = []
index = 0
startGid = 10000
for line in f.readlines():
index += 1
data = json.loads(line)
data["gid"] = startGid + data['gid']
steps = data["steps"]
for step in steps:
step["gid"] = data["gid"]
step["symbol_links"] = json.dumps(
step["symbol_links"], separators=(",", ":"), ensure_ascii=False
)
step["multipler"] = json.dumps(
step["multipler"], separators=(",", ":"), ensure_ascii=False
)
steps_data.append(step)
del data["steps"]
spins.append(data)
if len(spins) > 100:
db.insert_array(spins, f"game_sugar_spin{type}")
spins = []
print(f"写入进度: {index}个spin")
if len(steps_data) > 300:
db.insert_array(steps_data, f"game_sugar_step_info{type}")
steps_data = []
if len(spins) > 0:
db.insert_array(spins, f"game_sugar_spin{type}")
spins = []
if len(steps_data) > 0:
db.insert_array(steps_data, f"game_sugar_step_info{type}")
steps_data = []

167
pyground/sugar/统计.py Normal file
View File

@@ -0,0 +1,167 @@
import pandas as pd
import matplotlib.pyplot as plt
import logging
import sys
from datetime import datetime
import pymysql
# ================= 配置部分 =================
# 日志配置
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler(
f'game_stats_{datetime.now().strftime("%Y%m%d")}.log', encoding="utf-8"
),
],
)
# 数据库连接配置 (请根据实际情况修改)
DB_CONFIG = {
"host": "localhost",
"user": "root",
"password": "123456",
"database": "fire",
"charset": "utf8mb4",
}
# ================= 核心逻辑 =================
def get_db_connection():
"""获取数据库连接"""
try:
conn = pymysql.connect(**DB_CONFIG)
logging.info("数据库连接成功")
return conn
except Exception as e:
logging.error(f"数据库连接失败: {e}")
raise
def fetch_data(conn, type):
"""从数据库获取数据"""
query = f"SELECT gid, score, feature, extra_free, create_time FROM game_sugar_spin{type}"
try:
logging.info(f"正在执行查询: {query}")
df = pd.read_sql(query, conn)
logging.info(f"数据加载完成,共 {len(df)}")
return df
except Exception as e:
logging.error(f"数据查询失败: {e}")
raise
def analyze_data(df):
"""使用 Pandas 进行统计分析"""
logging.info("开始进行统计分析...")
# 1. 基础统计
basic_stats = df["score"].describe().to_frame().T
logging.info(f"\n基础统计信息:\n{basic_stats.to_string()}")
# 2. 统计得分为 0 的数量
zero_score_count = (df["score"] == 0).sum()
total_count = len(df)
zero_ratio = (
(zero_score_count / total_count * 100).round(2) if total_count > 0 else 0
)
logging.info(f"\n得分为 0 的统计: {zero_score_count} 次 (占比: {zero_ratio}%)")
# 2.1 统计免费旋转大于0的数量
extra_free_count = (df["extra_free"] > 0).sum()
total_count = len(df)
zero_ratio = (
(extra_free_count / total_count * 100).round(2) if total_count > 0 else 0
)
logging.info(
f"\n免费旋转次数大于 0 的统计: {extra_free_count} 次 (占比: {zero_ratio}%)"
)
# 3. 得分区间分布
bins = [0, 0.1, 0.5, 1, 10, 50, 100, 500, float("inf")]
labels = ["0-0.1", "0.1-0.5", "0.5-1", "1-10", "11-50", "51-100", "101-500", "500+"]
df["score_range"] = pd.cut(df["score"], bins=bins, labels=labels, right=False)
# 计算数量
range_dist = df["score_range"].value_counts().sort_index().to_frame(name="count")
# 计算比例 (百分比)
range_dist["percentage"] = (range_dist["count"] / len(df) * 100).round(2)
# 格式化日志输出,拼接百分比
# 使用 apply 将每一行转换为 "数量 (百分比%)" 的格式
log_output = range_dist.apply(
lambda row: f"{row['count']} ({row['percentage']}%)", axis=1
).to_string(name=False)
logging.info(f"\n得分区间分布 (数量 | 占比):\n{log_output}")
return basic_stats, range_dist
def visualize_data(range_dist):
"""生成可视化图表"""
logging.info("正在生成图表...")
plt.rcParams["font.sans-serif"] = ["SimHei"] # 用来正常显示中文标签
plt.rcParams["axes.unicode_minus"] = False # 用来正常显示负号
fig, axes = plt.subplots(1, 2, figsize=(16, 6))
# 图2: 得分区间分布 (饼图)
colors = [
"#ff9999",
"#ffb3e6",
"#66b3ff",
"#99ff99",
"#ffcc99",
"#c2c2f0",
"#ffb3e6",
]
range_dist["count"].plot(
kind="pie", ax=axes[1], autopct="%1.1f%%", startangle=90, colors=colors
)
axes[1].set_title("得分区间分布占比", fontsize=14)
axes[1].set_ylabel("") # 隐藏y轴标签
plt.tight_layout()
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"game_score_stats_{timestamp}.png"
plt.savefig(filename)
logging.info(f"图表已保存为: {filename}")
# plt.show() # 如需弹窗显示可取消注释
def main():
start_time = datetime.now()
logging.info(f"任务开始执行: {start_time}")
conn = None
try:
conn = get_db_connection()
df = fetch_data(conn, "")
# 打印前5行数据预览
# logging.info("数据预览:\n" + df.head().to_string())
basic_stats, range_dist = analyze_data(df)
# visualize_data(range_dist)
basic_stats, range_dist = analyze_data(df.query("extra_free > 0"))
except Exception as e:
logging.error(f"任务执行出错: {e}")
finally:
if conn:
conn.close()
logging.info("数据库连接已关闭")
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
logging.info(f"任务执行完成,耗时: {duration:.2f}")
if __name__ == "__main__":
main()