Files
roll-room/pyground/sugar/SugarRush1000.py
2026-04-23 16:58:11 +08:00

490 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import random
from typing import List, Dict, Tuple, Optional
from symbol_pay_table import get_symbol_pay
# 符号默认权重
default_symbol_weights = {
"A": 2, # 高赔率
"B": 5, # 中高赔率
"C": 10, # 中等赔率
"D": 15, # 中低赔率
"E": 20, # 低赔率
"F": 30, # 极低赔率
"G": 40, # 最低赔率
"S": 1, # Scatter
}
scotter_count_mapping = {"3": 10, "4": 12, "5": 15, "6": 20, "7": 30}
class SugarRush1000:
def __init__(
self,
balance: float = 1000.0,
bet: float = 1.0,
mock_grid=None,
weights=None,
scotter_counts_weights=None,
):
"""
初始化游戏
:param balance: 玩家余额
:param bet: 单次下注额
"""
self.balance = balance
self.bet = bet
self.rows = 7
self.cols = 7
# 游戏符号定义和权重
self.symbol_weights = weights if weights else default_symbol_weights
self.symbols = list(self.symbol_weights.keys())
self.weights = list(self.symbol_weights.values())
if scotter_counts_weights is None:
scotter_counts_weights = {3: 10, 4: 12, 5: 15, 6: 20, 7: 30}
self.scotter_counts = list(scotter_counts_weights.keys())
self.scotter_counts_weights = list(scotter_counts_weights.values())
# 游戏状态
self.grid = []
# 7x7 网格
self.multipliers = [[0 for _ in range(self.cols)] for _ in range(self.rows)]
self.free_spins = 0 # 剩余免费旋转次数
self.is_free_spin_status = False
self.max_win_cap = 25000 # 最大赢利倍数限制
self.is_buy_free = False
self.is_super_free = False # 是否为超级免费旋转模式
self.spin_total_win = (
0 # 本次旋转的总赢利。主要计算免费旋转过程中累积盈利是否超过最大额度
)
self.mock_grid = mock_grid # 用于测试的固定网格
self.drop_sequence = [] # 用于测试的掉落序列
def _get_next_drop_symbol(self, count=1, no_scatter: bool = False):
"""生成一列中的count个符号要求一列中最多有一个scatter符号
no_scatter: 是否排除scatter符号。如果设置为True则不生成scatter符号如果设置为False则则本次生成的符号中顶多有一个scatter符号
"""
# 测试辅助方法从预设序列中获取掉落符号。预设序列不使用最多有一个scatter符号的规则
result = []
while count > 0:
if self.drop_sequence:
result.append(self.drop_sequence.pop(0))
count -= 1
else:
break
symbols1 = self.symbols[:]
weights1 = self.weights[:]
tmp = [(s, w) for s, w in zip(symbols1, weights1) if s != "S"]
symbols2, weights2 = map(list, zip(*tmp))
for _ in range(count):
tmp_symbols = symbols1
tmp_weights = weights1
if no_scatter:
tmp_symbols = symbols2
tmp_weights = weights2
symbol = random.choices(tmp_symbols, weights=tmp_weights, k=1)[0]
result.append(symbol)
if symbol == "S":
no_scatter = True
return result
def grid_col_has_scotter(self, col):
for row in range(self.rows):
if self.grid[row][col] == "S":
return True
return False
def _init_grid(self):
"""初始化或重置网格"""
self.grid = [[None for _ in range(self.cols)] for _ in range(self.rows)]
for col in range(self.cols):
symbols = self._get_next_drop_symbol(count=self.rows, no_scatter=False)
for row in range(self.rows):
self.grid[row][col] = symbols[row]
def buy_free_spins(self, buy_type: Optional[str] = None) -> int:
""":param buy_type: None=普通旋转, 'standard'=买免费旋转(50x), 'super'=买超级免费旋转(200x)"""
result = {}
if not buy_type:
result = {"message": "未购买任何功能", "balance": self.balance}
cost = 0
if buy_type == "standard":
cost = self.bet * 50
elif buy_type == "super":
cost = self.bet * 200
if self.balance < cost:
result["message"] = "余额不足,无法购买功能"
return result
self.is_buy_free = True
self.is_super_free = True if buy_type == "super" else False
self.balance -= cost
self.free_spins = 0
result = {
"cost": cost,
"balance": self.balance,
"is_buy_free": self.is_buy_free,
"is_super_free": self.is_super_free,
}
return result
def doSpin(self) -> Dict:
"""
执行一次旋转/翻转操作的核心接口
:param buy_type: None=普通旋转, 'standard'=买免费旋转(100x), 'super'=买超级免费旋转(500x)
:return: 包含游戏状态和结果的字典
"""
result = {
"balance": self.balance,
"bet": self.bet,
"actual_bet": 0.0,
"is_scotter": False,
"added_spins": 0,
"win": 0.0,
"grid": self.grid,
"multipliers": self.multipliers,
"free_spins_remaining": self.free_spins,
"is_super": self.is_super_free,
"message": "",
"error": 0,
"extra_free": False,
}
# 1. 扣除下注积分
if (
not self.is_buy_free and not self.is_free_spin_status
): # 普通模式,而且不是买免费旋转
if self.balance < self.bet:
result["message"] = "游戏结束,余额不足"
result["error"] = 1
return result
result["actual_bet"] = self.bet
self.balance -= self.bet
# 初始化游戏状态
# 不是免费旋转模式则重置乘数点和累计盈利
if not self.is_free_spin_status:
self.spin_total_win = 0
self.multipliers = [[0 for _ in range(self.cols)] for _ in range(self.rows)]
# 免费旋转扣除次数
if self.free_spins > 0:
self.free_spins -= 1
# 1.2 初始化网格
if self.mock_grid:
self.grid = self.mock_grid
else:
self._init_grid()
# 2. 处理购买功能,触发>=3个的scotter
buy_free = self.is_buy_free
if self.is_buy_free:
self.is_buy_free = False
# 购买时强制触发Scatter
scatters_count = random.choices(
self.scotter_counts, weights=self.scotter_counts_weights, k=1
)[0]
self._force_scatters(int(scatters_count))
# 2.1 如果是超级免费旋转而且没有初始化则初始化所有点为x2
if self.is_super_free and self.free_spins > 0:
self.is_super_free = False
self.multipliers = [[2 for _ in range(self.cols)] for _ in range(self.rows)]
# 3. 开始翻转循环
cascade_win = 0.0
cascade_count = 0
max_win_limit = self.bet * self.max_win_cap
steps = []
while True:
step = {
"multipler": [],
"grid": "",
# 运行后状态
"score": 0,
"symbol_links": [],
}
steps.append(step)
step["grid"] = "".join([s for row in self.grid for s in row])
for r in range(self.rows):
for c in range(self.cols):
if self.multipliers[r][c] > 0:
step["multipler"].append(r * self.cols + c)
step["multipler"].append(self.multipliers[r][c])
# 1. 查找赢奖组合
clusters = self._find_clusters()
if not clusters:
break # 没有赢奖组合,翻转结束
cascade_count += 1
current_spin_win = 0.0
# 2. 计算赔付并处理消除
# 注意:需要先收集所有要消除的位置,避免计算时网格已变
symbols_to_remove = set()
for symbol, coords in clusters:
count = len(coords)
if count < 5:
continue
# 计算该组合的乘数 (包含该区域内所有乘数点的叠加)
cluster_mult = 0
cluster_info = []
for r, c in coords:
if self.multipliers[r][c] > 1:
cluster_mult += self.multipliers[r][c]
cluster_info.append([r * 7 + c, self.multipliers[r][c]])
cluster_mult = max(1, cluster_mult)
# 基础赔付 * 符号倍率 * 组合乘数
pay_factor = get_symbol_pay(symbol, count)
win_amount = self.bet * pay_factor * cluster_mult
current_spin_win += win_amount
symbol_link = {
"symbol": symbol,
"loc": [],
"multipler": [],
"total_multi": 0,
"base_score": 0,
"score": 0,
}
symbol_link["base_score"] = self.bet * pay_factor
symbol_link["score"] = win_amount
symbol_link["total_multi"] = cluster_mult
for r, c in coords:
symbol_link["loc"].append(r * 7 + c)
for index, m in cluster_info:
symbol_link["multipler"].append(index)
symbol_link["multipler"].append(m)
step["symbol_links"].append(symbol_link)
# 记录消除位置并更新乘数点
for r, c in coords:
symbols_to_remove.add((r, c))
# 乘数点逻辑:炸开一次,点数+1 (2的幂次方)
if self.multipliers[r][c] > 0:
self.multipliers[r][c] = self.multipliers[r][c] * 2
else:
self.multipliers[r][c] = 1
if self.multipliers[r][c] > 1024:
self.multipliers[r][c] = 1024
step["score"] = current_spin_win
cascade_win += current_spin_win
# 检查是否超过最大赢利限制
if cascade_win >= max_win_limit:
cascade_win = max_win_limit
result["message"] = "达到最大赢奖上限 25000x!"
break
# 3. 执行消除和掉落
self._apply_cascade(symbols_to_remove)
# 3.1 最后处理scotter
scatter_count = sum(row.count("S") for row in self.grid)
if scatter_count >= 3:
if not self.is_free_spin_status:
self.multipliers = [[0 for _ in range(self.cols)] for _ in range(self.rows)]
self.is_free_spin_status = True
added_spins = self._add_free_spins(scatter_count)
if added_spins > 0:
result["is_scotter"] = True
result["added_spins"] = added_spins
result["message"] = f"触发免费旋转! +{added_spins}"
if added_spins > 0 and not buy_free:
result["extra_free"] = True
# 3.2 处理达到最大奖限制
raw_spin_total_win = self.spin_total_win
self.spin_total_win += cascade_win
if self.spin_total_win >= max_win_limit:
cascade_win = max_win_limit - raw_spin_total_win
self.spin_total_win = max_win_limit
self.free_spins = 0 # 立即结束
result["message"] = "达到最大赢奖上限 25000x!"
# 4. 结算
if cascade_win > 0:
self.balance += cascade_win
# 免费旋转模式结束
if self.free_spins <= 0:
self.is_free_spin_status = False
self.multipliers = [[0 for _ in range(self.cols)] for _ in range(self.rows)]
# 5. 构建结果信息
result["win"] = cascade_win
result["balance"] = self.balance
result["free_spins_remaining"] = self.free_spins
result["grid"] = self.grid
result["multipliers"] = self.multipliers
result["steps"] = steps
result["spin_total_win"] = self.spin_total_win
return result
def _find_clusters(self) -> Dict[str, List[Tuple[int, int]]]:
"""查找所有相连的符号块 (排除SCATTER)"""
visited = set()
clusters = []
for r in range(self.rows):
for c in range(self.cols):
symbol = self.grid[r][c]
if symbol == "S" or (r, c) in visited:
continue
# BFS 查找连通块
queue = [(r, c)]
connected = []
while queue:
curr_r, curr_c = queue.pop(0)
if (curr_r, curr_c) in visited:
continue
if self.grid[curr_r][curr_c] == symbol:
visited.add((curr_r, curr_c))
connected.append((curr_r, curr_c))
# 检查上下左右
for dr, dc in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
nr, nc = curr_r + dr, curr_c + dc
if 0 <= nr < self.rows and 0 <= nc < self.cols:
if (nr, nc) not in visited and self.grid[nr][
nc
] == symbol:
queue.append((nr, nc))
if len(connected) >= 5:
clusters.append((symbol, connected))
return clusters
def _apply_cascade(self, remove_coords: set):
"""执行消除和掉落逻辑"""
# 1. 消除
for r, c in remove_coords:
self.grid[r][c] = None # 标记为空
# 2. 掉落 (按列处理)
for c in range(self.cols):
# 提取该列所有非空符号
existing_syms = [
self.grid[r][c] for r in range(self.rows) if self.grid[r][c] is not None
]
# 填充新符号到顶部
needed = self.rows - len(existing_syms)
new_syms = self._get_next_drop_symbol(
count=needed, no_scatter=self.grid_col_has_scotter(c)
)
# 重新组合该列 (新符号在上,原有符号在下)
new_col = new_syms + existing_syms
# 更新回网格
for r in range(self.rows):
self.grid[r][c] = new_col[r]
def _force_scatters(self, count: int):
"""强制在随机位置放置指定数量的Scatter"""
# 检查是否已经有,有的话改成其他符号
for r in range(self.rows):
for c in range(self.cols):
if self.grid[r][c] == "S":
self.grid[r][c] = "G"
cols = list(range(self.cols))
for _ in range(count):
x = random.choice(cols)
y = random.randint(0, self.rows - 1)
self.grid[y][x] = "S"
cols.remove(x)
def _add_free_spins(self, scatter_count: int) -> int:
"""根据Scatter数量增加免费旋转次数"""
if scatter_count < 3:
return 0
spins = scotter_count_mapping.get(str(scatter_count), 30)
self.free_spins += spins
return spins
def no_cluster_grid():
return [
(
["B" if col % 2 == 0 else "E" for col in range(7)]
if row % 2 == 0
else ["E" if col % 2 == 0 else "B" for col in range(7)]
)
for row in range(7)
]
if __name__ == "__main__":
begin_balance = 1_0000_0000
custom_grid = no_cluster_grid()
custom_grid[0] = ["A"] * 7 # 设置一个乘数点
game = SugarRush1000(
balance=begin_balance,
weights={
"A": 11.910798577481279,
"B": 23.10218485800891,
"C": 36.43351025439602,
"D": 46.62768084621008,
"E": 54.778674468859805,
"F": 64.18383150145051,
"G": 75.00641730395597,
"S": 1.5190499131843918,
},
scotter_counts_weights={
"3": 7.26421894353717,
"4": 4.1724734692682395,
"5": 0.8119106579617028,
"6": 0.20313929837878217,
"7": 0.04857599989214818,
},
mock_grid=custom_grid,
)
game.drop_sequence = [
"C",
"C",
"C",
"C",
"C",
"C",
"C",
"C",
"C",
"C",
"C",
"C",
"C",
"C",
"C",
"A",
"B",
"C",
"D",
"A",
"B",
] # 预设掉落全是 C
res = game.doSpin()
print(json.dumps(res))