import json import random from typing import List, Dict, Tuple, Optional from symbol_pay_table import get_symbol_pay # 符号默认权重 default_symbol_weights = { "A": 2, # 高赔率 "B": 5, # 中高赔率 "C": 10, # 中等赔率 "D": 15, # 中低赔率 "E": 20, # 低赔率 "F": 30, # 极低赔率 "G": 40, # 最低赔率 "S": 1, # Scatter } scotter_count_mapping = {"3": 10, "4": 12, "5": 15, "6": 20, "7": 30} class SugarRush1000: def __init__( self, balance: float = 1000.0, bet: float = 1.0, mock_grid=None, weights=None, scotter_counts_weights=None, ): """ 初始化游戏 :param balance: 玩家余额 :param bet: 单次下注额 """ self.balance = balance self.bet = bet self.rows = 7 self.cols = 7 # 游戏符号定义和权重 self.symbol_weights = weights if weights else default_symbol_weights self.symbols = list(self.symbol_weights.keys()) self.weights = list(self.symbol_weights.values()) if scotter_counts_weights is None: scotter_counts_weights = {3: 10, 4: 12, 5: 15, 6: 20, 7: 30} self.scotter_counts = list(scotter_counts_weights.keys()) self.scotter_counts_weights = list(scotter_counts_weights.values()) # 游戏状态 self.grid = [] # 7x7 网格 self.multipliers = [[0 for _ in range(self.cols)] for _ in range(self.rows)] self.free_spins = 0 # 剩余免费旋转次数 self.is_free_spin_status = False self.max_win_cap = 25000 # 最大赢利倍数限制 self.is_buy_free = False self.is_super_free = False # 是否为超级免费旋转模式 self.spin_total_win = ( 0 # 本次旋转的总赢利。主要计算免费旋转过程中累积盈利是否超过最大额度 ) self.mock_grid = mock_grid # 用于测试的固定网格 self.drop_sequence = [] # 用于测试的掉落序列 def _get_next_drop_symbol(self, count=1, no_scatter: bool = False): """生成一列中的count个符号,要求一列中最多有一个scatter符号 no_scatter: 是否排除scatter符号。如果设置为True,则不生成scatter符号,如果设置为False,则则本次生成的符号中顶多有一个scatter符号 """ # 测试辅助方法:从预设序列中获取掉落符号。预设序列不使用最多有一个scatter符号的规则 result = [] while count > 0: if self.drop_sequence: result.append(self.drop_sequence.pop(0)) count -= 1 else: break symbols1 = self.symbols[:] weights1 = self.weights[:] tmp = [(s, w) for s, w in zip(symbols1, weights1) if s != "S"] symbols2, weights2 = map(list, zip(*tmp)) for _ in range(count): tmp_symbols = symbols1 tmp_weights = weights1 if no_scatter: tmp_symbols = symbols2 tmp_weights = weights2 symbol = random.choices(tmp_symbols, weights=tmp_weights, k=1)[0] result.append(symbol) if symbol == "S": no_scatter = True return result def grid_col_has_scotter(self, col): for row in range(self.rows): if self.grid[row][col] == "S": return True return False def _init_grid(self): """初始化或重置网格""" self.grid = [[None for _ in range(self.cols)] for _ in range(self.rows)] for col in range(self.cols): symbols = self._get_next_drop_symbol(count=self.rows, no_scatter=False) for row in range(self.rows): self.grid[row][col] = symbols[row] def buy_free_spins(self, buy_type: Optional[str] = None) -> int: """:param buy_type: None=普通旋转, 'standard'=买免费旋转(50x), 'super'=买超级免费旋转(200x)""" result = {} if not buy_type: result = {"message": "未购买任何功能", "balance": self.balance} cost = 0 if buy_type == "standard": cost = self.bet * 50 elif buy_type == "super": cost = self.bet * 200 if self.balance < cost: result["message"] = "余额不足,无法购买功能" return result self.is_buy_free = True self.is_super_free = True if buy_type == "super" else False self.balance -= cost self.free_spins = 0 result = { "cost": cost, "balance": self.balance, "is_buy_free": self.is_buy_free, "is_super_free": self.is_super_free, } return result def doSpin(self) -> Dict: """ 执行一次旋转/翻转操作的核心接口 :param buy_type: None=普通旋转, 'standard'=买免费旋转(100x), 'super'=买超级免费旋转(500x) :return: 包含游戏状态和结果的字典 """ result = { "balance": self.balance, "bet": self.bet, "actual_bet": 0.0, "is_scotter": False, "added_spins": 0, "win": 0.0, "grid": self.grid, "multipliers": self.multipliers, "free_spins_remaining": self.free_spins, "is_super": self.is_super_free, "message": "", "error": 0, "extra_free": False, } # 1. 扣除下注积分 if ( not self.is_buy_free and not self.is_free_spin_status ): # 普通模式,而且不是买免费旋转 if self.balance < self.bet: result["message"] = "游戏结束,余额不足" result["error"] = 1 return result result["actual_bet"] = self.bet self.balance -= self.bet # 初始化游戏状态 # 不是免费旋转模式则重置乘数点和累计盈利 if not self.is_free_spin_status: self.spin_total_win = 0 self.multipliers = [[0 for _ in range(self.cols)] for _ in range(self.rows)] # 免费旋转扣除次数 if self.free_spins > 0: self.free_spins -= 1 # 1.2 初始化网格 if self.mock_grid: self.grid = self.mock_grid else: self._init_grid() # 2. 处理购买功能,触发>=3个的scotter buy_free = self.is_buy_free if self.is_buy_free: self.is_buy_free = False # 购买时强制触发Scatter scatters_count = random.choices( self.scotter_counts, weights=self.scotter_counts_weights, k=1 )[0] self._force_scatters(int(scatters_count)) # 2.1 如果是超级免费旋转而且没有初始化,则初始化所有点为x2 if self.is_super_free and self.free_spins > 0: self.is_super_free = False self.multipliers = [[2 for _ in range(self.cols)] for _ in range(self.rows)] # 3. 开始翻转循环 cascade_win = 0.0 cascade_count = 0 max_win_limit = self.bet * self.max_win_cap steps = [] while True: step = { "multipler": [], "grid": "", # 运行后状态 "score": 0, "symbol_links": [], } steps.append(step) step["grid"] = "".join([s for row in self.grid for s in row]) for r in range(self.rows): for c in range(self.cols): if self.multipliers[r][c] > 0: step["multipler"].append(r * self.cols + c) step["multipler"].append(self.multipliers[r][c]) # 1. 查找赢奖组合 clusters = self._find_clusters() if not clusters: break # 没有赢奖组合,翻转结束 cascade_count += 1 current_spin_win = 0.0 # 2. 计算赔付并处理消除 # 注意:需要先收集所有要消除的位置,避免计算时网格已变 symbols_to_remove = set() for symbol, coords in clusters: count = len(coords) if count < 5: continue # 计算该组合的乘数 (包含该区域内所有乘数点的叠加) cluster_mult = 0 cluster_info = [] for r, c in coords: if self.multipliers[r][c] > 1: cluster_mult += self.multipliers[r][c] cluster_info.append([r * 7 + c, self.multipliers[r][c]]) cluster_mult = max(1, cluster_mult) # 基础赔付 * 符号倍率 * 组合乘数 pay_factor = get_symbol_pay(symbol, count) win_amount = self.bet * pay_factor * cluster_mult current_spin_win += win_amount symbol_link = { "symbol": symbol, "loc": [], "multipler": [], "total_multi": 0, "base_score": 0, "score": 0, } symbol_link["base_score"] = self.bet * pay_factor symbol_link["score"] = win_amount symbol_link["total_multi"] = cluster_mult for r, c in coords: symbol_link["loc"].append(r * 7 + c) for index, m in cluster_info: symbol_link["multipler"].append(index) symbol_link["multipler"].append(m) step["symbol_links"].append(symbol_link) # 记录消除位置并更新乘数点 for r, c in coords: symbols_to_remove.add((r, c)) # 乘数点逻辑:炸开一次,点数+1 (2的幂次方) if self.multipliers[r][c] > 0: self.multipliers[r][c] = self.multipliers[r][c] * 2 else: self.multipliers[r][c] = 1 if self.multipliers[r][c] > 1024: self.multipliers[r][c] = 1024 step["score"] = current_spin_win cascade_win += current_spin_win # 检查是否超过最大赢利限制 if cascade_win >= max_win_limit: cascade_win = max_win_limit result["message"] = "达到最大赢奖上限 25000x!" break # 3. 执行消除和掉落 self._apply_cascade(symbols_to_remove) # 3.1 最后处理scotter scatter_count = sum(row.count("S") for row in self.grid) if scatter_count >= 3: if not self.is_free_spin_status: self.multipliers = [[0 for _ in range(self.cols)] for _ in range(self.rows)] self.is_free_spin_status = True added_spins = self._add_free_spins(scatter_count) if added_spins > 0: result["is_scotter"] = True result["added_spins"] = added_spins result["message"] = f"触发免费旋转! +{added_spins}" if added_spins > 0 and not buy_free: result["extra_free"] = True # 3.2 处理达到最大奖限制 raw_spin_total_win = self.spin_total_win self.spin_total_win += cascade_win if self.spin_total_win >= max_win_limit: cascade_win = max_win_limit - raw_spin_total_win self.spin_total_win = max_win_limit self.free_spins = 0 # 立即结束 result["message"] = "达到最大赢奖上限 25000x!" # 4. 结算 if cascade_win > 0: self.balance += cascade_win # 免费旋转模式结束 if self.free_spins <= 0: self.is_free_spin_status = False self.multipliers = [[0 for _ in range(self.cols)] for _ in range(self.rows)] # 5. 构建结果信息 result["win"] = cascade_win result["balance"] = self.balance result["free_spins_remaining"] = self.free_spins result["grid"] = self.grid result["multipliers"] = self.multipliers result["steps"] = steps result["spin_total_win"] = self.spin_total_win return result def _find_clusters(self) -> Dict[str, List[Tuple[int, int]]]: """查找所有相连的符号块 (排除SCATTER)""" visited = set() clusters = [] for r in range(self.rows): for c in range(self.cols): symbol = self.grid[r][c] if symbol == "S" or (r, c) in visited: continue # BFS 查找连通块 queue = [(r, c)] connected = [] while queue: curr_r, curr_c = queue.pop(0) if (curr_r, curr_c) in visited: continue if self.grid[curr_r][curr_c] == symbol: visited.add((curr_r, curr_c)) connected.append((curr_r, curr_c)) # 检查上下左右 for dr, dc in [(-1, 0), (1, 0), (0, -1), (0, 1)]: nr, nc = curr_r + dr, curr_c + dc if 0 <= nr < self.rows and 0 <= nc < self.cols: if (nr, nc) not in visited and self.grid[nr][ nc ] == symbol: queue.append((nr, nc)) if len(connected) >= 5: clusters.append((symbol, connected)) return clusters def _apply_cascade(self, remove_coords: set): """执行消除和掉落逻辑""" # 1. 消除 for r, c in remove_coords: self.grid[r][c] = None # 标记为空 # 2. 掉落 (按列处理) for c in range(self.cols): # 提取该列所有非空符号 existing_syms = [ self.grid[r][c] for r in range(self.rows) if self.grid[r][c] is not None ] # 填充新符号到顶部 needed = self.rows - len(existing_syms) new_syms = self._get_next_drop_symbol( count=needed, no_scatter=self.grid_col_has_scotter(c) ) # 重新组合该列 (新符号在上,原有符号在下) new_col = new_syms + existing_syms # 更新回网格 for r in range(self.rows): self.grid[r][c] = new_col[r] def _force_scatters(self, count: int): """强制在随机位置放置指定数量的Scatter""" # 检查是否已经有,有的话改成其他符号 for r in range(self.rows): for c in range(self.cols): if self.grid[r][c] == "S": self.grid[r][c] = "G" cols = list(range(self.cols)) for _ in range(count): x = random.choice(cols) y = random.randint(0, self.rows - 1) self.grid[y][x] = "S" cols.remove(x) def _add_free_spins(self, scatter_count: int) -> int: """根据Scatter数量增加免费旋转次数""" if scatter_count < 3: return 0 spins = scotter_count_mapping.get(str(scatter_count), 30) self.free_spins += spins return spins def no_cluster_grid(): return [ ( ["B" if col % 2 == 0 else "E" for col in range(7)] if row % 2 == 0 else ["E" if col % 2 == 0 else "B" for col in range(7)] ) for row in range(7) ] if __name__ == "__main__": begin_balance = 1_0000_0000 custom_grid = no_cluster_grid() custom_grid[0] = ["A"] * 7 # 设置一个乘数点 game = SugarRush1000( balance=begin_balance, weights={ "A": 11.910798577481279, "B": 23.10218485800891, "C": 36.43351025439602, "D": 46.62768084621008, "E": 54.778674468859805, "F": 64.18383150145051, "G": 75.00641730395597, "S": 1.5190499131843918, }, scotter_counts_weights={ "3": 7.26421894353717, "4": 4.1724734692682395, "5": 0.8119106579617028, "6": 0.20313929837878217, "7": 0.04857599989214818, }, mock_grid=custom_grid, ) game.drop_sequence = [ "C", "C", "C", "C", "C", "C", "C", "C", "C", "C", "C", "C", "C", "C", "C", "A", "B", "C", "D", "A", "B", ] # 预设掉落全是 C res = game.doSpin() print(json.dumps(res))