最近应公司需求,抓取塑化行情价格数据,以此开发了一个基于 Python + Selenium + MySQL 的自动化爬虫脚本工具,用于抓取塑化行业的价格数据。根据在塑化行业(塑料、橡胶、化工原料),价格波动频繁,数据来源分散且往往隐藏在需要登录的动态网页背后。手动收集不仅效率低下,还容易出错。本文将深入分析本次开发的这个脚本,本脚本基于 Python + Selenium + MySQL 的自动化爬虫实战案例。该方案能够自动登录专业行业网站(如隆众资讯等),动态解析表格数据,清洗数据映射后批量存入数据库,为后续的价格趋势分析提供坚实的数据基础。一、核心架构设计
本案例采用经典的 ECL (Extract-Convert-Load) 架构:
- 提取 (Extract):利用 Selenium 模拟浏览器行为,处理 JavaScript 渲染和登录验证。
- 转换 (Convert):通过正则与字符串处理清洗非结构化文本,并映射为标准数据库字段。
- 加载 (Load):使用数据库连接池 (PooledDB) 高效批量写入 MySQL。
二、代码深度解析
1. 依赖库与基础配置
引入代码脚本需求的插件,使在生产环境下能正常运行一致性。
import reimport timeimport loggingimport pymysqlfrom DBUtils.PooledDB import PooledDB # 数据库连接池from selenium import webdriver # 浏览器自动化核心# Selenium 异常、选项、等待等辅助模块from selenium.common.exceptions import TimeoutException, NoSuchElementExceptionfrom selenium.webdriver.firefox.options import Optionsfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.support.ui import WebDriverWaitfrom selenium.webdriver.support import expected_conditions as ECfrom selenium.webdriver import FirefoxOptions# 日志配置:输出时间、模块名、日志级别、信息logging.basicConfig(level = logging.INFO, format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')logger = logging.getLogger(__name__)
2. Spider 类初始化(__init__)
初始化Spider类
class Spider: def __init__(self, file=None, DBHelper=None, username=None, password=None): self.files = file # 配置文件路径(未实际使用) self.UserName = username # 登录用户名 self.PassWord = password # 登录密码 self.DB = DBHelper # 数据库操作实例 # 待爬取的URL列表:按区域+品类分类 self.spiderURLS = [ {'area': '东北地区', 'cate': 'HDPE','url': 'xxx'}, {'area': '华北地区', 'cate': 'HDPE','url': 'xxx'}, ]
3. 浏览器初始化(_init_browser)
def _init_browser(self, options): try: driver_path = r'C:\Program Files (x86)\chromedriver\geckodriver.exe' # Firefox驱动路径 driver = webdriver.Firefox(executable_path=driver_path, options=options) driver.set_page_load_timeout(60) # 页面加载超时60秒 return driver except Exception as e: logger.error(f"初始化浏览器失败: {e}") return None
def _parse_row(self, row_text): # 步骤1:按换行符分割每行文本(表格行的原始文本) lines = row_text.strip().split('\n') if len(lines) <= 1: return None # 步骤2:处理带空格的数字(如价格区间),拆分后合并到列表 processed_values = [] for item in lines: if ' ' in item: processed_values.extend(item.split()) else: processed_values.append(item) # 字段名映射:与解析后的列表一一对应 field_names = ['market', 'specification', 'standard', 'brand','olddate1', 'olddate2', 'olddate3', 'olddate4', 'olddate5','riseprice', 'unit', 'remarks'] # 构建字典:字段名→值 result = dict(zip(field_names, processed_values)) return result
1、market:市场名称、specification:规格、standard:标准、brand:品牌;
2、olddate1-olddate5:不同日期的价格(推测)、riseprice:涨跌价、unit:单位、remarks:备注。
(1)登录状态检查(check_login_status)
def check_login_status(self): try: # 等待「用户信息元素」出现,检查是否包含「会员中心」链接 user_info = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "#header_menu_top_login"))) member_center = user_info.find_elements(By.LINK_TEXT, "【会员中心】") return len(member_center) > 0 # 存在则已登录 except (NoSuchElementException, TimeoutException, Exception) as e: # 异常处理:未找到元素、超时、其他错误均返回未登录 logger.warning/error(f"登录状态检查失败: {e}") return False
(2)执行登录(perform_login)
def perform_login(self): try: # 关闭弹窗→点击登录按钮→切换账号密码登录→输入账号/密码→点击登录 close_btn = self.wait.until(EC.element_to_be_clickable((By.CLASS_NAME, "driver-close-btn"))) close_btn.click() time.sleep(2) login_button = self.wait.until(EC.element_to_be_clickable((By.LINK_TEXT, "点我登录"))) login_button.click() time.sleep(3) # 输入用户名(清空后输入) username_input = self.wait.until(EC.visibility_of_element_located((By.ID, "dialogUsername"))) username_input.clear() username_input.send_keys(self.UserName) time.sleep(2) # 输入密码 password_input = self.wait.until(EC.visibility_of_element_located((By.ID, "dialogPassword"))) password_input.clear() password_input.send_keys(self.PassWord) time.sleep(2) # 点击登录按钮 login_button = self.wait.until(EC.element_to_be_clickable((By.ID, "smsValid"))) login_button.click() time.sleep(10) return True except Exception as e: print(f"登录过程中发生错误: {e}") return False
6. 动态数据爬取(_get_dynamic_data)
def _get_dynamic_data(self, driver, area, cate, url): try: self.wait = WebDriverWait(driver, 20) # 显式等待20秒 driver.get(url) driver.refresh() # 刷新页面(确保加载完整) time.sleep(10) # 未登录则执行登录 if not self.check_login_status(): print("未检测到登录状态,尝试登录...") self.perform_login() # 定位表格行元素 rows = driver.find_elements(By.CSS_SELECTOR, 'table.n-data-table-table > tbody > tr') if not rows: logger.warning(f"未找到数据: {url}") return [] result = [] for row in rows: data = self._parse_row(row.text) # 解析每行数据 if not data: continue if data.get('olddate5') == 0: # 过滤无效数据 continue # 补充字段:来源、创建时间、区域、品类、价格类型 data['source'] = "隆众信息" data['created'] = str(time.strftime("%Y-%m-%d 08:20:00", time.localtime())) data['area'] = area data['cate'] = cate data['price_type'] = "市场价" result.append(data) return result except Exception as e: logger.error(f"获取动态数据失败: {e}") return []
7. 数据映射(_get_mapper_data)
def _get_mapper_data(self, data): try: result = [] for item in data: # 映射为数据库字段格式,增加字段存在性检查(get方法) mapped_item = { 'created': item.get('created', ''), 'creator': item.get('creator', '1'), 'source': item.get('source', ''), 'area': item.get('area', ''), 'varieties_name': item.get('cate', ''), 'product_name': item.get('specification', ''), 'price_type': item.get('price_type', ''), 'criterion': item.get('standard', ''), 'manufacturer': item.get('brand', ''), 'market': item.get('market', ''), 'min_price': item.get('olddate3', 0), # 最低价 'hig_price': item.get('olddate4', 0), # 最高价 'stream_price': item.get('olddate4', 0) if item.get('olddate5', 0) == 0 else item.get('olddate5', 0), # 主流价 'unit': item.get('unit', '元/吨'), 'rise': item.get('riseprice', 0), # 涨跌价 'rise_range': item.get('riseprice', 0), # 涨跌幅(复用涨跌价) 'remarks': item.get('remarks', '') } # 过滤主流价≤0的无效数据 ifint(mapped_item.get('stream_price')) <= 0: continue result.append(mapped_item) return result except (KeyError, Exception) as e: logger.error(f"数据映射异常: {e}") return []
8. 主程序入口(main)
def main(self): try: # 配置Firefox选项:指定浏览器路径、页面加载策略(eager:DOM加载完成即停止) options = FirefoxOptions() options.binary_location = r"C:\Program Files\Mozilla Firefox\firefox.exe" options.page_load_strategy = 'eager' # 初始化浏览器 driver = self._init_browser(options) try: # 遍历所有待爬取URL for item in self.spiderURLS: # 爬取数据→映射数据→插入数据库 retlut = self._get_dynamic_data(driver, item['area'], item['cate'], item['url']) data = self._get_mapper_data(retlut) print("data",data) if data: try: self.DB.insert('cmdb_plastify', data, batch=True) # 批量插入 except Exception as e: logger.error(f"数据插入失败: {e}") continue except TimeoutException: logger.warning(f"页面加载超时: {item['url']}") driver.execute_script("window.stop()") # 停止加载 finally: driver.quit() # 无论是否异常,关闭浏览器 except Exception as e: logger.error(f"爬虫异常: {str(e)}")
9. 程序执行入口
if __name__ == '__main__': file = r"E:/workspace/BigDevops/resource/down" username = "s03" password = "s23" Db = DatabaseManager() # 数据库操作实例(需自行实现) sp = Spider(file, Db, username, password) sp.main()
注意:DatabaseManager 类未在代码中实现,需包含 insert 方法(支持批量插入),否则会报错。
总结
源码库:
MYSQL: runSpideWebMySQL.py
https://gitee.com/zhiops/xecollect-script
如果需要进一步了解可以加:
