import pandas as pd import matplotlib.pyplot as plt import logging import sys from datetime import datetime import pymysql # ================= 配置部分 ================= # 日志配置 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler( f'game_stats_{datetime.now().strftime("%Y%m%d")}.log', encoding="utf-8" ), ], ) # 数据库连接配置 (请根据实际情况修改) DB_CONFIG = { "host": "localhost", "user": "root", "password": "123456", "database": "fire", "charset": "utf8mb4", } # ================= 核心逻辑 ================= def get_db_connection(): """获取数据库连接""" try: conn = pymysql.connect(**DB_CONFIG) logging.info("数据库连接成功") return conn except Exception as e: logging.error(f"数据库连接失败: {e}") raise def fetch_data(conn, type): """从数据库获取数据""" query = f"SELECT gid, score, feature, extra_free, create_time FROM game_sugar_spin{type}" try: logging.info(f"正在执行查询: {query}") df = pd.read_sql(query, conn) logging.info(f"数据加载完成,共 {len(df)} 行") return df except Exception as e: logging.error(f"数据查询失败: {e}") raise def analyze_data(df): """使用 Pandas 进行统计分析""" logging.info("开始进行统计分析...") # 1. 基础统计 basic_stats = df["score"].describe().to_frame().T logging.info(f"\n基础统计信息:\n{basic_stats.to_string()}") # 2. 统计得分为 0 的数量 zero_score_count = (df["score"] == 0).sum() total_count = len(df) zero_ratio = ( (zero_score_count / total_count * 100).round(2) if total_count > 0 else 0 ) logging.info(f"\n得分为 0 的统计: {zero_score_count} 次 (占比: {zero_ratio}%)") # 2.1 统计免费旋转大于0的数量 extra_free_count = (df["extra_free"] > 0).sum() total_count = len(df) zero_ratio = ( (extra_free_count / total_count * 100).round(2) if total_count > 0 else 0 ) logging.info( f"\n免费旋转次数大于 0 的统计: {extra_free_count} 次 (占比: {zero_ratio}%)" ) # 3. 得分区间分布 bins = [0, 0.1, 0.5, 1, 10, 50, 100, 500, float("inf")] labels = ["0-0.1", "0.1-0.5", "0.5-1", "1-10", "11-50", "51-100", "101-500", "500+"] df["score_range"] = pd.cut(df["score"], bins=bins, labels=labels, right=False) # 计算数量 range_dist = df["score_range"].value_counts().sort_index().to_frame(name="count") # 计算比例 (百分比) range_dist["percentage"] = (range_dist["count"] / len(df) * 100).round(2) # 格式化日志输出,拼接百分比 # 使用 apply 将每一行转换为 "数量 (百分比%)" 的格式 log_output = range_dist.apply( lambda row: f"{row['count']} ({row['percentage']}%)", axis=1 ).to_string(name=False) logging.info(f"\n得分区间分布 (数量 | 占比):\n{log_output}") return basic_stats, range_dist def visualize_data(range_dist): """生成可视化图表""" logging.info("正在生成图表...") plt.rcParams["font.sans-serif"] = ["SimHei"] # 用来正常显示中文标签 plt.rcParams["axes.unicode_minus"] = False # 用来正常显示负号 fig, axes = plt.subplots(1, 2, figsize=(16, 6)) # 图2: 得分区间分布 (饼图) colors = [ "#ff9999", "#ffb3e6", "#66b3ff", "#99ff99", "#ffcc99", "#c2c2f0", "#ffb3e6", ] range_dist["count"].plot( kind="pie", ax=axes[1], autopct="%1.1f%%", startangle=90, colors=colors ) axes[1].set_title("得分区间分布占比", fontsize=14) axes[1].set_ylabel("") # 隐藏y轴标签 plt.tight_layout() timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"game_score_stats_{timestamp}.png" plt.savefig(filename) logging.info(f"图表已保存为: {filename}") # plt.show() # 如需弹窗显示可取消注释 def main(): start_time = datetime.now() logging.info(f"任务开始执行: {start_time}") conn = None try: conn = get_db_connection() df = fetch_data(conn, "") # 打印前5行数据预览 # logging.info("数据预览:\n" + df.head().to_string()) basic_stats, range_dist = analyze_data(df) # visualize_data(range_dist) basic_stats, range_dist = analyze_data(df.query("extra_free > 0")) except Exception as e: logging.error(f"任务执行出错: {e}") finally: if conn: conn.close() logging.info("数据库连接已关闭") end_time = datetime.now() duration = (end_time - start_time).total_seconds() logging.info(f"任务执行完成,耗时: {duration:.2f} 秒") if __name__ == "__main__": main()