import requestsimport matplotlib.pyplot as pltfrom datetime import datetimeimport matplotlib.font_manager as fmimport sysimport time# ==================== 配置区域 ====================# 如果图表中文显示异常,请尝试指定系统中文字体路径(可选)# 例如Windows: r'C:\Windows\Fonts\msyh.ttc'CHINESE_FONT_PATH = None # 备用API配置(主API wttr.in 失效时自动切换)USE_BACKUP_API = TrueBACKUP_API_URL = "http://t.weather.itboy.net/api/weather/city/"# 常用城市代码映射(可自行添加)CITY_CODE_MAP = { '北京': '101010100', '上海': '101020100', '广州': '101280101', '深圳': '101280601', '杭州': '101210101', '南京': '101190101', '武汉': '101200101', '成都': '101270101', '济南': '101120101', '青岛': '101120201', # 更多城市代码可从 https://github.com/qwd/LocationList 获取}# =================================================class WeatherDashboard: def __init__(self): """初始化天气仪表板""" # 主API(wttr.in - 国际) self.main_api = { 'url': "https://wttr.in", 'params': {'format': 'j1', 'lang': 'zh'} } # 备用API(国内源,速度更快但需要城市代码) self.backup_api = { 'url': "http://t.weather.itboy.net/api/weather/city/", 'city_map': CITY_CODE_MAP } # 初始化图表中文字体 self._setup_chinese_font() # 请求配置 self.timeout = 15 self.max_retries = 2 def _setup_chinese_font(self): """设置中文字体,解决乱码问题""" if CHINESE_FONT_PATH: try: font_prop = fm.FontProperties(fname=CHINESE_FONT_PATH) plt.rcParams['font.sans-serif'] = [font_prop.get_name()] except: print(f"警告: 无法加载指定字体 {CHINESE_FONT_PATH},使用系统默认字体") # 设置备用字体方案 font_options = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans', 'Arial Unicode MS', 'sans-serif'] current_fonts = plt.rcParams.get('font.sans-serif', []) for font in font_options: if font not in current_fonts: current_fonts = [font] + current_fonts plt.rcParams['font.sans-serif'] = current_fonts plt.rcParams['axes.unicode_minus'] = False def get_weather_with_retry(self, city): """获取天气数据(带重试和备用API)""" # 先尝试主API print(f"正在通过主API查询 {city} 的天气...") data = self._try_main_api(city) # 如果主API失败且启用备用API,尝试备用API if not data and USE_BACKUP_API: print("主API查询失败,尝试备用API...") data = self._try_backup_api(city) return data def _try_main_api(self, city): """尝试主API(wttr.in)""" for attempt in range(self.max_retries): try: url = f"{self.main_api['url']}/{requests.utils.quote(city)}" response = requests.get( url, params=self.main_api['params'], timeout=self.timeout, headers={'User-Agent': 'Mozilla/5.0'} # 模拟浏览器 ) response.raise_for_status() return response.json() except requests.exceptions.Timeout: print(f" 主API超时 ({attempt+1}/{self.max_retries}),重试...") time.sleep(1) # 等待1秒后重试 except Exception as e: print(f" 主API错误: {e}") break return None def _try_backup_api(self, city): """尝试备用API(国内源)""" # 查找城市代码 city_code = None for name, code in self.backup_api['city_map'].items(): if city in name or name in city: city_code = code break if not city_code: print(f" 备用API未找到城市 {city} 的代码") # 尝试直接使用名称(部分API支持) city_code = city try: url = f"{self.backup_api['url']}{city_code}" response = requests.get(url, timeout=self.timeout) response.raise_for_status() data = response.json() # 转换备用API数据格式以匹配主程序 return self._convert_backup_data(data, city) except Exception as e: print(f" 备用API错误: {e}") return None def _convert_backup_data(self, data, city): """转换备用API数据格式""" if data.get('status') != 200: return None # 构建与wttr.in相似的数据结构 city_data = data.get('cityInfo', {}) weather_data = data.get('data', {}) converted = { 'current_condition': [{ 'temp_C': weather_data.get('wendu', 'N/A'), 'temp_F': str(int(float(weather_data.get('wendu', 0)) * 9/5 + 32)), 'FeelsLikeC': weather_data.get('wendu', 'N/A'), 'humidity': weather_data.get('shidu', 'N/A').strip('%') if weather_data.get('shidu') else 'N/A', 'lang_zh': [{'value': weather_data.get('forecast', [{}])[0].get('type', '未知')}], 'windspeedKmph': weather_data.get('forecast', [{}])[0].get('fl', 'N/A').replace('级', ''), 'winddir16Point': weather_data.get('forecast', [{}])[0].get('fx', 'N/A'), 'pressure': '1013', 'visibility': '10', 'cloudcover': '0' }], 'nearest_area': [{ 'areaName': [{'value': city_data.get('city', city)}], 'region': [{'value': city_data.get('parent', '')}], 'country': [{'value': '中国'}] }], 'weather': [{ 'astronomy': [{ 'sunrise': weather_data.get('forecast', [{}])[0].get('sunrise', '06:00'), 'sunset': weather_data.get('forecast', [{}])[0].get('sunset', '18:00') }], 'hourly': self._create_hourly_forecast(weather_data.get('forecast', [])) }], 'source': 'backup_api' } return converted def _create_hourly_forecast(self, forecast): """创建小时预报数据""" hourly = [] if forecast and len(forecast) > 0: today = forecast[0] times = ['08:00', '12:00', '16:00', '20:00'] for i, time_str in enumerate(times): hourly.append({ 'time': time_str.replace(':00', ''), 'tempC': str(int(today.get('high', '20').replace('℃', '')) - i*2), 'lang_zh': [{'value': today.get('type', '晴')}], 'chanceofrain': '0' }) return hourly[:4] def parse_weather_data(self, data): """解析天气数据""" if not data: return None, None # 检查数据来源 source = data.get('source', 'main_api') if source == 'backup_api': current = data['current_condition'][0] area = data['nearest_area'][0] weather_info = { 'city': area['areaName'][0]['value'], 'region': area['region'][0]['value'], 'country': area['country'][0]['value'], 'temp_c': current['temp_C'], 'temp_f': current['temp_F'], 'feelslike_c': current['FeelsLikeC'], 'humidity': current['humidity'], 'weather_desc': current['lang_zh'][0]['value'], 'wind_speed': current['windspeedKmph'], 'wind_dir': current['winddir16Point'], 'pressure': current['pressure'], 'visibility': current['visibility'], 'cloud_cover': current['cloudcover'], 'sunrise': data['weather'][0]['astronomy'][0]['sunrise'], 'sunset': data['weather'][0]['astronomy'][0]['sunset'], 'source': '备用API' } hourly_forecast = data['weather'][0]['hourly'] else: # 原始wttr.in数据解析 current = data['current_condition'][0] area = data['nearest_area'][0] weather_info = { 'city': area['areaName'][0]['value'], 'region': area['region'][0]['value'], 'country': area['country'][0]['value'], 'temp_c': current['temp_C'], 'temp_f': current['temp_F'], 'feelslike_c': current['FeelsLikeC'], 'humidity': current['humidity'], 'weather_desc': current['lang_zh'][0]['value'], 'wind_speed': current['windspeedKmph'], 'wind_dir': current['winddir16Point'], 'pressure': current['pressure'], 'visibility': current['visibility'], 'cloud_cover': current['cloudcover'], 'sunrise': data['weather'][0]['astronomy'][0]['sunrise'], 'sunset': data['weather'][0]['astronomy'][0]['sunset'], 'source': '主API' } # 获取未来几小时预报 hourly_forecast = [] for hour in data['weather'][0]['hourly'][:4]: hourly_forecast.append({ 'time': hour['time'], 'temp': hour['tempC'], 'weather': hour['lang_zh'][0]['value'], 'chance_of_rain': hour.get('chanceofrain', '0') }) return weather_info, hourly_forecast def create_weather_chart(self, weather_info, hourly_forecast): """创建气象图表""" if not weather_info: print("无有效天气数据,无法生成图表") return # 创建图表 fig = plt.figure(figsize=(15, 10)) fig.suptitle( f'{weather_info["city"]}天气状况 - 数据来源: {weather_info.get("source", "主API")}\n' f'{datetime.now().strftime("%Y-%m-%d %H:%M")}', fontsize=16, fontweight='bold', y=0.98 ) # 1. 主信息面板 ax1 = plt.subplot(2, 2, 1) ax1.axis('off') # 天气图标映射 weather_icons = { '晴': '☀️', '多云': '⛅', '阴': '☁️', '雨': '🌧️', '雪': '❄️', '雷': '⛈️', '雾': '🌫️', '风': '💨', '小雨': '🌦️', '中雨': '🌧️', '大雨': '⛈️' } current_weather = weather_info['weather_desc'] icon = '☀️' for key in weather_icons: if key in current_weather: icon = weather_icons[key] break info_text = ( f"{icon}{current_weather}\n\n" f"🌡️ 温度: {weather_info['temp_c']}°C (体感{weather_info['feelslike_c']}°C)\n" f"💨 风力: {weather_info['wind_speed']} km/h {weather_info['wind_dir']}\n" f"💧 湿度: {weather_info['humidity']}%\n" f"📊 气压: {weather_info['pressure']} hPa\n" f"👁️ 能见度: {weather_info['visibility']} km\n" f"☁️ 云量: {weather_info['cloud_cover']}%\n" f"🌅 日出: {weather_info['sunrise']} | 🌇 日落: {weather_info['sunset']}\n" f"📍 {weather_info['city']}{weather_info['region']}{weather_info['country']}" ) ax1.text(0.1, 0.95, info_text, fontsize=12, verticalalignment='top', bbox=dict(boxstyle="round,pad=0.8", facecolor="lightblue", alpha=0.7)) # 2. 温度计图 ax2 = plt.subplot(2, 2, 2) temp_c = float(weather_info['temp_c']) if weather_info['temp_c'] != 'N/A' else 20 feelslike_c = float(weather_info['feelslike_c']) if weather_info['feelslike_c'] != 'N/A' else temp_c categories = ['实际温度', '体感温度'] values = [temp_c, feelslike_c] # 根据温度选择颜色 if temp_c > 30: colors = ['#FF4500', '#FF8C00'] # 炎热 elif temp_c > 20: colors = ['#FFD700', '#FFA500'] # 温暖 elif temp_c > 10: colors = ['#90EE90', '#32CD32'] # 凉爽 else: colors = ['#87CEEB', '#1E90FF'] # 寒冷 bars = ax2.bar(categories, values, color=colors, edgecolor='black', alpha=0.8) ax2.set_ylabel('温度 (°C)', fontsize=12) ax2.set_title('温度对比', fontsize=14, fontweight='bold') ax2.grid(axis='y', linestyle='--', alpha=0.3) # 添加数值标签 for bar, value in zip(bars, values): height = bar.get_height() ax2.text(bar.get_x() + bar.get_width()/2., height + 0.5, f'{value:.1f}°C', ha='center', va='bottom', fontweight='bold') ax2.set_ylim(0, max(values) * 1.3 if max(values) > 0 else 30) # 3. 气象指标雷达图 ax3 = plt.subplot(2, 2, 3, projection='polar') categories = ['温度', '湿度', '风力', '气压', '能见度', '云量'] N = len(categories) # 数据归一化处理 try: temp_norm = min(abs(temp_c) / 40, 1) if temp_c != 'N/A' else 0.5 humidity_norm = float(weather_info['humidity']) / 100 if weather_info['humidity'] != 'N/A' else 0.5 wind_norm = min(float(weather_info['wind_speed']) / 50, 1) if weather_info['wind_speed'] != 'N/A' else 0.3 pressure_norm = min(float(weather_info['pressure']) / 1100, 1) if weather_info['pressure'] != 'N/A' else 0.8 visibility_norm = min(float(weather_info['visibility']) / 20, 1) if weather_info['visibility'] != 'N/A' else 0.7 cloud_norm = float(weather_info['cloud_cover']) / 100 if weather_info['cloud_cover'] != 'N/A' else 0.5 except: # 如果数据解析失败,使用默认值 temp_norm, humidity_norm, wind_norm, pressure_norm, visibility_norm, cloud_norm = 0.5, 0.5, 0.3, 0.8, 0.7, 0.5 values = [temp_norm, humidity_norm, wind_norm, pressure_norm, visibility_norm, cloud_norm] values += values[:1] angles = [n / float(N) * 2 * 3.14159 for n in range(N)] angles += angles[:1] ax3.plot(angles, values, 'o-', linewidth=2, color='purple', alpha=0.7) ax3.fill(angles, values, alpha=0.3, color='purple') ax3.set_xticks(angles[:-1]) ax3.set_xticklabels(categories, fontsize=10) ax3.set_title('气象指标雷达图', fontsize=14, fontweight='bold', pad=20) ax3.set_ylim(0, 1) # 4. 小时预报 ax4 = plt.subplot(2, 2, 4) if hourly_forecast and len(hourly_forecast) > 0: hours = [f"{hour['time']}时" for hour in hourly_forecast] temps = [] for hour in hourly_forecast: try: temps.append(float(hour['temp'])) except: temps.append(temp_c) # 使用当前温度作为备选 # 创建温度曲线 ax4.plot(hours, temps, 'o-', linewidth=2, color='red', alpha=0.7, label='温度') ax4.fill_between(hours, temps, alpha=0.2, color='red') # 添加天气图标 for i, hour in enumerate(hourly_forecast): weather = hour['weather'] icon = '☀️' for key in weather_icons: if key in weather: icon = weather_icons[key] break ax4.text(i, temps[i] + (max(temps)-min(temps))*0.1, icon, ha='center', va='bottom', fontsize=16) ax4.set_xlabel('时间', fontsize=12) ax4.set_ylabel('温度 (°C)', fontsize=12) ax4.set_title('未来几小时温度预报', fontsize=14, fontweight='bold') ax4.grid(True, linestyle='--', alpha=0.3) ax4.legend() # 设置y轴范围 temp_min, temp_max = min(temps), max(temps) ax4.set_ylim(temp_min - 2, temp_max + 3) else: ax4.text(0.5, 0.5, '无小时预报数据', ha='center', va='center', fontsize=14) ax4.set_title('小时预报', fontsize=14, fontweight='bold') plt.tight_layout() plt.show() def run(self): """运行主程序""" print("=" * 60) print(" 智能天气查询系统") print("=" * 60) print("功能特点:") print(" • 双API备份(主API失败自动切换备用API)") print(" • 自动重试机制(网络波动时自动重试)") print(" • 完整气象图表(温度、雷达图、小时预报)") print(" • 中文支持(城市名、天气描述全中文化)") print("=" * 60) print("支持的城市示例:") print(" • 直接输入: 北京、上海、济南、纽约、london、paris") print(" • 支持拼音: beijing、shanghai、jinan") print(" • 输入 'quit' 或 'q' 退出程序") print("=" * 60) # 检查必要库 try: import requests import matplotlib except ImportError as e: print(f"❌ 缺少必要的库: {e}") print("请运行以下命令安装:") print(" pip install requests matplotlib") return while True: try: city = input("\n🌍 请输入城市名称: ").strip() if city.lower() in ['quit', 'exit', 'q']: print("\n感谢使用智能天气查询系统,再见!") break if not city: print("⚠️ 城市名称不能为空,请重新输入") continue print(f"\n🔍 正在查询 [{city}] 的天气,请稍候...") # 获取天气数据 data = self.get_weather_with_retry(city) if data: # 解析数据 weather_info, hourly_forecast = self.parse_weather_data(data) if weather_info: # 显示基本信息 print(f"\n✅ 查询成功!") print(f"📍 位置: {weather_info['city']}{weather_info.get('region', '')}") print(f"🌤️ 天气: {weather_info['weather_desc']}") print(f"🌡️ 温度: {weather_info['temp_c']}°C (体感{weather_info['feelslike_c']}°C)") print(f"💨 风速: {weather_info['wind_speed']} km/h {weather_info['wind_dir']}") print(f"💧 湿度: {weather_info['humidity']}%") # 询问是否显示图表 show_chart = input("\n📊 是否显示详细气象图表? (y/n): ").strip().lower() if show_chart in ['y', 'yes', '是', '']: print("正在生成图表,请稍候...") self.create_weather_chart(weather_info, hourly_forecast) else: print("已跳过图表显示") else: print("❌ 解析天气数据失败") else: print("❌ 无法获取天气信息,请检查:") print(" 1. 城市名称是否正确") print(" 2. 网络连接是否正常") print(" 3. 稍后重试") except KeyboardInterrupt: print("\n\n程序被用户中断,退出...") break except Exception as e: print(f"\n❌ 程序运行出错: {e}") print("请重试或联系开发者")# ==================== 主程序入口 ====================def main(): """程序主函数""" print("正在启动智能天气查询系统...") # 创建天气仪表板实例 dashboard = WeatherDashboard() # 运行主程序 dashboard.run()# ==================== 程序启动 ====================if __name__ == "__main__": main()