- 测试代码比业务代码还长:每个测试用例都要继承TestCase类,写一堆样板代码,维护成本居高不下
- 断言失败信息不明确:只知道测试失败了,但不知道具体哪里出错,需要手动打印调试信息
- 多环境测试配置混乱:开发、测试、生产环境需要不同配置,但无法优雅地管理测试参数
- 并行测试执行困难:大型测试套件运行缓慢,但手动拆分和管理并行执行复杂易错
- 测试报告难以解读:控制台输出杂乱无章,无法快速定位关键问题和趋势分析
如果你曾为这些问题头疼,那么今天的主角pytest模块正是你的救星!pytest模块是Python生态中最受欢迎的测试框架,它的设计哲学非常巧妙:让测试代码保持简洁直观,同时提供无限扩展能力来处理复杂场景。通过智能发现机制、原生断言支持、灵活的fixture系统和丰富的插件生态,它完美平衡了简单性和功能性。
- Fixture机制的艺术:资源管理、依赖注入和作用域控制
- 至少3个实战应用场景,覆盖从单元测试到企业级自动化测试
pytest模块的核心定位非常明确:提供一套简洁、灵活、可扩展的测试框架,让开发者专注于测试逻辑而非框架约束。
- 极简语法:测试用例就是普通函数,只需以test_前缀命名,无需继承任何基类
- 智能发现:自动发现测试文件和用例,遵循约定优于配置的原则
- 原生断言:直接使用Python的assert语句,无需记忆特殊的断言方法
- 插件架构:所有高级功能都通过插件实现,保持核心简洁的同时支持无限扩展
- 兼容并包:可以直接运行unittest编写的测试用例,降低迁移成本
这种设计让pytest成为Python测试领域的"瑞士军刀",既能满足简单的单元测试需求,又能构建复杂的企业级自动化测试体系。pytest模块虽然功能强大,但API设计非常精简,主要包含以下核心组件: | | | |
| | 自动发现test_*.py和*_test.py文件 | |
| | | |
| | | |
| | | |
| | 使用@pytest.mark.parametrize批量测试 | |
| | | |
| | | |
| | | |
pytest模块适用于广泛的测试场景,以下是具体指导原则:
- 单元测试:快速编写简洁的测试用例,验证单个函数或方法
- API测试:结合requests等库,测试HTTP接口的正确性
- UI自动化:结合Selenium等工具,测试Web应用界面
- 断言友好性:原生assert语句比self.assertEqual更直观
- 灵活性:fixture系统比setUp/tearDown更强大灵活
- 旧项目:逐步迁移,pytest可以直接运行已有的unittest用例,新旧用例可以在同一个测试套件中并存
现在我们已经了解了pytest模块的整体设计,接下来让我们深入核心功能,掌握具体的使用方法和技巧。pytest最大的魅力在于其极简的语法设计。你不需要编写任何样板代码,只需遵循简单的命名约定。
# test_basic.py - pytest会自动发现并运行此文件中的测试
def add(a: int, b: int) -> int:
    """简单的加法函数,作为被测试对象"""
    return a + b


def test_add_positive_numbers():
    """测试正数相加"""
    result = add(2, 3)
    assert result == 5, f"预期 2+3=5,实际得到 {result}"


def test_add_negative_numbers():
    """测试负数相加"""
    result = add(-1, -1)
    assert result == -2, f"预期 (-1)+(-1)=-2,实际得到 {result}"


def test_add_zero():
    """测试零值相加"""
    result = add(0, 0)
    assert result == 0, f"预期 0+0=0,实际得到 {result}"


def test_add_mixed_numbers():
    """测试正负数混合相加"""
    result = add(5, -3)
    assert result == 2, f"预期 5+(-3)=2,实际得到 {result}"
# 项目结构示例
project/
├── src/
│   ├── calculator.py            # 业务代码
│   └── utils.py
├── tests/
│   ├── unit/                    # pytest会自动发现此目录
│   │   ├── test_calculator.py
│   │   └── test_utils.py
│   ├── integration/             # pytest会自动发现此目录
│   │   └── test_api_integration.py
│   └── conftest.py              # 项目级别的fixture配置
└── pytest.ini                   # 项目配置文件

# 运行测试的不同方式
"""
# 运行所有测试
pytest
# 运行指定目录
pytest tests/unit/
# 运行指定文件
pytest tests/unit/test_calculator.py
# 运行指定测试函数
pytest tests/unit/test_calculator.py::test_add_positive_numbers
# 按名称筛选
pytest -k "add"  # 运行名称包含"add"的测试
# 详细输出
pytest -v
# 显示print输出
pytest -s
"""
# test_calculator_class.py - group related tests inside a test class
class Calculator:
    """Minimal calculator used as the object under test."""

    def __init__(self):
        self.memory = 0

    def add(self, a: float, b: float) -> float:
        """Return ``a + b``."""
        return a + b

    def subtract(self, a: float, b: float) -> float:
        """Return ``a - b``."""
        return a - b

    def multiply(self, a: float, b: float) -> float:
        """Return ``a * b``."""
        return a * b

    def divide(self, a: float, b: float) -> float:
        """Return ``a / b``.

        Raises:
            ValueError: if ``b`` is zero.
        """
        if b == 0:
            raise ValueError("除数不能为零")
        return a / b


class TestCalculator:
    """Tests for ``Calculator``; the class name must start with ``Test`` to be collected."""

    def setup_method(self):
        """Run before every test method (counterpart of unittest's ``setUp``)."""
        self.calc = Calculator()
        print("测试方法开始执行...")

    def teardown_method(self):
        """Run after every test method (counterpart of unittest's ``tearDown``)."""
        print("测试方法执行完成")

    def test_addition(self):
        """Addition returns the sum."""
        result = self.calc.add(10, 5)
        assert result == 15

    def test_subtraction(self):
        """Subtraction returns the difference."""
        result = self.calc.subtract(10, 5)
        assert result == 5

    def test_multiplication(self):
        """Multiplication returns the product."""
        result = self.calc.multiply(10, 5)
        assert result == 50

    def test_division(self):
        """Division returns the quotient."""
        result = self.calc.divide(10, 5)
        assert result == 2

    def test_division_by_zero(self):
        """Dividing by zero raises ValueError carrying the expected message."""
        import pytest

        with pytest.raises(ValueError) as exc_info:
            self.calc.divide(10, 0)
        assert "除数不能为零" in str(exc_info.value)
pytest的断言系统是其核心优势之一。它扩展了Python的原生assert语句,提供了丰富的失败信息。
# test_assertions.py - 展示pytest的各种断言方式
def test_basic_assertions():
    """基础断言示例"""
    # 相等断言
    assert 2 + 2 == 4
    # 不等断言
    assert 2 + 2 != 5
    # 包含断言
    assert "hello" in "hello world"
    assert "world" not in "hello python"
    # 布尔断言
    assert True
    assert not False
    # None断言
    value = None
    assert value is None
    assert value != 0
    # 类型断言
    assert isinstance("hello", str)
    assert not isinstance(123, str)
    # 比较断言
    assert 10 > 5
    assert 5 < 10
    assert 5 >= 5
    assert 5 <= 5


def test_collection_assertions():
    """集合类型断言"""
    # 列表断言
    numbers = [1, 2, 3, 4, 5]
    assert len(numbers) == 5
    assert 3 in numbers
    assert 10 not in numbers
    # 字典断言
    user = {"name": "Alice", "age": 30, "email": "alice@example.com"}
    assert "name" in user
    assert user["age"] == 30
    assert "admin" not in user
    # 集合断言
    unique_numbers = {1, 2, 3, 4, 5}
    assert len(unique_numbers) == 5
    assert {1, 2}.issubset(unique_numbers)


def test_string_assertions():
    """字符串断言"""
    text = "Hello, World!"
    # 字符串操作断言
    assert text.startswith("Hello")
    assert text.endswith("World!")
    assert "World" in text
    assert text.lower() == "hello, world!"
    assert text.upper() == "HELLO, WORLD!"
    # 正则表达式匹配
    import re
    assert re.match(r"Hello,.*!", text)
# test_advanced_assertions.py - advanced assertion techniques
def test_assert_with_custom_message():
    """Attach custom failure messages to assertions."""
    result = 2 + 2

    # Plain comparison with an explanatory message.
    assert result == 4, f"加法结果错误,预期4,实际得到{result}"

    # Name a derived condition first so the assertion reads naturally.
    is_even = result % 2 == 0
    assert is_even, f"{result}不是偶数"

    # Chained comparison expresses the open interval (0, 10) in one expression.
    assert 0 < result < 10, f"{result}不在0-10范围内"


def test_assert_approx_equality():
    """Compare floats approximately with ``pytest.approx``."""
    import math

    import pytest

    # 0.1 + 0.2 is not exactly 0.3 in binary floating point,
    # so ``assert result == 0.3`` would fail.
    result = 0.1 + 0.2

    # Default tolerances.
    assert result == pytest.approx(0.3)
    # Explicit relative tolerance (0.1 %).
    assert result == pytest.approx(0.3, rel=1e-3)
    # Explicit absolute tolerance.
    assert result == pytest.approx(0.3, abs=1e-9)

    # sin(pi) is a tiny non-zero float; compare against 0 with an absolute tolerance.
    assert math.sin(math.pi) == pytest.approx(0, abs=1e-10)


def test_assert_exceptions():
    """Assert that code raises (or does not raise) specific exceptions."""
    import pytest

    # The block must raise ValueError.
    with pytest.raises(ValueError):
        int("not_a_number")

    # Capture the exception object to inspect its message.
    with pytest.raises(ValueError) as exc_info:
        int("invalid")
    assert "invalid literal" in str(exc_info.value)

    # ``match`` checks type and message (regex search) in one step.
    with pytest.raises(ValueError, match="invalid literal"):
        int("invalid")

    # Verify that valid input does not raise; only ValueError is expected from int().
    try:
        int("123")
    except ValueError:
        pytest.fail("不应抛出异常")


def test_assert_with_complex_objects():
    """Equality assertions on custom objects, nested dicts and sets."""

    class User:
        """Value object compared by ``name`` and ``age``."""

        def __init__(self, name: str, age: int):
            self.name = name
            self.age = age

        def __eq__(self, other):
            # NotImplemented lets Python fall back to the other operand's
            # comparison; the final result for unrelated types is still False.
            if not isinstance(other, User):
                return NotImplemented
            return self.name == other.name and self.age == other.age

    # Object equality goes through __eq__.
    user1 = User("Alice", 30)
    user2 = User("Alice", 30)
    user3 = User("Bob", 25)
    assert user1 == user2
    assert user1 != user3

    # Nested dicts/lists compare structurally.
    data1 = {"users": [{"name": "Alice", "age": 30}]}
    data2 = {"users": [{"name": "Alice", "age": 30}]}
    assert data1 == data2

    # Sets ignore element order.
    set1 = {1, 2, 3}
    set2 = {3, 2, 1}
    assert set1 == set2
Fixture是pytest最强大的特性之一,它提供了一种优雅的方式来管理测试所需的前置条件和资源。
# test_fixture_basics.py - Fixture基础用法
import pytest
from datetime import datetime


@pytest.fixture
def sample_data():
    """基础fixture:返回测试数据"""
    return {
        "name": "Test User",
        "age": 25,
        "email": "test@example.com",
        "created_at": datetime.now()
    }


def test_with_fixture(sample_data):
    """使用fixture的测试"""
    assert sample_data["name"] == "Test User"
    assert sample_data["age"] == 25
    assert "@" in sample_data["email"]


@pytest.fixture
def setup_database():
    """带有setup和teardown的fixture"""
    print("\n=== 设置数据库连接 ===")
    connection = {"connected": True, "cursor": None}
    yield connection  # 测试执行期间使用
    print("=== 清理数据库资源 ===")
    connection["connected"] = False


def test_database_operations(setup_database):
    """测试数据库操作"""
    assert setup_database["connected"] is True
    # 模拟数据库操作
    setup_database["cursor"] = "active_cursor"


def test_another_database_test(setup_database):
    """另一个数据库测试"""
    assert setup_database["connected"] is True
    # 每个测试都会重新执行fixture
# test_fixture_scopes.py - fixture scopes
import pytest


class ExecutionTracker:
    """Holds a single counter that every fixture below increments.

    The counter is shared, so ``execution_count`` in a fixture's return value
    is the running total of fixture set-ups in this module, not a per-scope
    count.
    """

    count = 0


@pytest.fixture(scope="function")
def function_scoped_fixture():
    """Function scope: set up again for every test function that requests it."""
    ExecutionTracker.count += 1
    return {"scope": "function", "execution_count": ExecutionTracker.count}


@pytest.fixture(scope="class")
def class_scoped_fixture():
    """Class scope: set up once per test class."""
    ExecutionTracker.count += 1
    return {"scope": "class", "execution_count": ExecutionTracker.count}


@pytest.fixture(scope="module")
def module_scoped_fixture():
    """Module scope: set up once per test module."""
    ExecutionTracker.count += 1
    return {"scope": "module", "execution_count": ExecutionTracker.count}


@pytest.fixture(scope="session")
def session_scoped_fixture():
    """Session scope: set up once for the whole test session."""
    ExecutionTracker.count += 1
    return {"scope": "session", "execution_count": ExecutionTracker.count}


# First test class
class TestFixtureScopes1:
    """Requests each fixture once from the first class."""

    def test_function_scope1(self, function_scoped_fixture):
        assert function_scoped_fixture["scope"] == "function"
        print(f"函数作用域fixture执行: {function_scoped_fixture['execution_count']}")

    def test_class_scope1(self, class_scoped_fixture):
        assert class_scoped_fixture["scope"] == "class"
        print(f"类作用域fixture执行: {class_scoped_fixture['execution_count']}")

    def test_module_scope1(self, module_scoped_fixture):
        assert module_scoped_fixture["scope"] == "module"
        print(f"模块作用域fixture执行: {module_scoped_fixture['execution_count']}")

    def test_session_scope1(self, session_scoped_fixture):
        assert session_scoped_fixture["scope"] == "session"
        print(f"会话作用域fixture执行: {session_scoped_fixture['execution_count']}")


# Second test class, in the same module
class TestFixtureScopes2:
    """Requests the same fixtures again from a second class in the same module."""

    def test_function_scope2(self, function_scoped_fixture):
        assert function_scoped_fixture["scope"] == "function"
        print(f"函数作用域fixture执行: {function_scoped_fixture['execution_count']}")

    def test_class_scope2(self, class_scoped_fixture):
        assert class_scoped_fixture["scope"] == "class"
        print(f"类作用域fixture执行: {class_scoped_fixture['execution_count']}")

    def test_module_scope2(self, module_scoped_fixture):
        assert module_scoped_fixture["scope"] == "module"
        print(f"模块作用域fixture执行: {module_scoped_fixture['execution_count']}")

    def test_session_scope2(self, session_scoped_fixture):
        assert session_scoped_fixture["scope"] == "session"
        print(f"会话作用域fixture执行: {session_scoped_fixture['execution_count']}")
# test_parametrized_fixture.py - parametrized fixtures
import pytest


@pytest.fixture(params=["chrome", "firefox", "safari"])
def browser(request):
    """Yield a simulated browser configuration, once per entry in ``params``.

    Every test requesting this fixture runs three times. The code after
    ``yield`` is the teardown step.
    """
    browser_name = request.param
    print(f"\n启动浏览器: {browser_name}")

    # Simulated browser configuration.
    browser_config = {
        "name": browser_name,
        "version": "latest",
        "headless": True,
        "timeout": 30,
    }
    yield browser_config

    print(f"关闭浏览器: {browser_name}")


def test_browser_initialization(browser):
    """The configuration is well-formed for every browser."""
    assert browser["name"] in ["chrome", "firefox", "safari"]
    assert browser["headless"] is True
    assert browser["timeout"] == 30
    print(f"测试浏览器: {browser['name']}")


@pytest.fixture(params=[
    ("admin", "admin123"),
    ("user", "user123"),
    ("guest", "guest123"),
])
def user_credentials(request):
    """Return one ``{"username", "password"}`` dict per credential pair in ``params``."""
    username, password = request.param
    return {"username": username, "password": password}


def test_login_with_different_credentials(user_credentials):
    """Simulated login succeeds for each known username/password pair."""
    username = user_credentials["username"]
    password = user_credentials["password"]

    # Simulated login logic: the credentials must match a known account.
    known_accounts = {
        "admin": "admin123",
        "user": "user123",
        "guest": "guest123",
    }
    login_ok = username in known_accounts and known_accounts[username] == password
    assert login_ok, "登录失败"
    print(f"测试用户: {username}")
参数化测试是pytest的重要特性,允许使用不同的输入数据多次运行同一个测试。
# test_parametrize_basics.py - 参数化测试基础
import pytest


# 被测试函数
def is_even(number: int) -> bool:
    """判断数字是否为偶数"""
    return number % 2 == 0


# 基础参数化
@pytest.mark.parametrize("number,expected", [
    (2, True),    # 偶数
    (4, True),    # 偶数
    (6, True),    # 偶数
    (1, False),   # 奇数
    (3, False),   # 奇数
    (5, False),   # 奇数
    (0, True),    # 0是偶数
    (-2, True),   # 负偶数
    (-3, False),  # 负奇数
])
def test_is_even_basic(number, expected):
    """测试is_even函数"""
    result = is_even(number)
    assert result == expected, f"is_even({number})应返回{expected},实际返回{result}"
    print(f"✓ is_even({number}) = {result}")


# 参数化结合fixture
@pytest.fixture
def user_data():
    return {"name": "Test User", "active": True}


@pytest.mark.parametrize("age,expected_status", [
    (18, "adult"),
    (17, "minor"),
    (65, "senior"),
    (30, "adult"),
])
def test_user_age_classification(user_data, age, expected_status):
    """测试用户年龄分类"""
    user_data["age"] = age
    # 模拟分类逻辑
    if age >= 65:
        status = "senior"
    elif age >= 18:
        status = "adult"
    else:
        status = "minor"
    assert status == expected_status
    print(f"用户{age}岁:分类为{status}")
# test_advanced_parametrize.py - advanced parametrization techniques
import pytest
import math


# Stacked parametrize decorators generate every combination (3 x 3 = 9 cases).
@pytest.mark.parametrize("a", [1, 2, 3])
@pytest.mark.parametrize("b", [4, 5, 6])
def test_multiple_parametrization(a, b):
    """Run once for every (a, b) combination."""
    result = a * b
    expected = a * b  # illustrative only: computed the same way as ``result``
    assert result == expected
    print(f"{a} × {b} = {result}")


# Parameters produced by a plain helper function (not a fixture).
def generate_test_data():
    """Return ``(a, b, expected_sum)`` tuples for the addition test."""
    return [
        (1, 1, 2),
        (2, 3, 5),
        (-1, -1, -2),
        (0, 0, 0),
        (100, 200, 300),
    ]


# Evaluate the generator once so the cases and their ids cannot drift apart.
_GENERATED_CASES = generate_test_data()


@pytest.mark.parametrize(
    "a,b,expected",
    _GENERATED_CASES,
    ids=[f"case_{i}" for i in range(len(_GENERATED_CASES))],
)
def test_with_generated_data(a, b, expected):
    """Check addition against the generated cases."""
    result = a + b
    assert result == expected
    print(f"{a} + {b} = {result} (预期: {expected})")


# Combining parametrization with a class and a fixture
class MathOperations:
    """Stateless arithmetic helpers used as the object under test."""

    @staticmethod
    def add(a, b):
        """Return ``a + b``."""
        return a + b

    @staticmethod
    def multiply(a, b):
        """Return ``a * b``."""
        return a * b


@pytest.fixture
def math_ops():
    """Provide a ``MathOperations`` instance."""
    return MathOperations()


@pytest.mark.parametrize("operation, a, b, expected", [
    ("add", 2, 3, 5),
    ("add", -1, 1, 0),
    ("multiply", 2, 3, 6),
    ("multiply", -2, 3, -6),
])
def test_math_operations(math_ops, operation, a, b, expected):
    """Dispatch on the operation name and compare with the expected result."""
    handlers = {
        "add": math_ops.add,
        "multiply": math_ops.multiply,
    }
    if operation not in handlers:
        pytest.fail(f"未知操作: {operation}")

    result = handlers[operation](a, b)
    assert result == expected
    print(f"{operation}({a}, {b}) = {result}")
pytest的标记系统允许对测试进行分类、过滤和控制执行方式。
# test_markers.py - 标记系统使用
import math
import sys
import time

import pytest


# 自定义标记
@pytest.mark.slow
def test_slow_operation():
    """标记为慢速测试"""
    time.sleep(2)  # 模拟耗时操作
    assert True, "慢速测试完成"


@pytest.mark.fast
def test_fast_operation():
    """标记为快速测试"""
    assert 1 + 1 == 2, "快速测试完成"


@pytest.mark.integration
def test_integration_scenario():
    """标记为集成测试"""
    # 模拟集成测试场景
    result = complex_integration_logic()
    assert result is not None


@pytest.mark.unit
def test_unit_scenario():
    """标记为单元测试"""
    result = simple_unit_logic()
    assert result == "expected"


# 条件标记
@pytest.mark.skipif(
    condition=sys.platform == "win32",
    reason="在Windows上跳过此测试"
)
def test_platform_specific():
    """平台特定测试"""
    assert True


# 预期失败标记
@pytest.mark.xfail(
    condition=sys.version_info < (3, 8),
    reason="Python 3.8以下版本预期失败",
    strict=True
)
def test_feature_only_in_python38():
    """只在Python 3.8+可用的特性"""
    result = new_feature_in_python38()
    assert result is not None


def new_feature_in_python38():
    """模拟只在Python 3.8+可用的特性(math.prod自3.8起提供)"""
    return math.prod([1, 2, 3])


def complex_integration_logic():
    """模拟复杂集成逻辑"""
    return {"status": "success", "data": "sample"}


def simple_unit_logic():
    """模拟简单单元逻辑"""
    return "expected"
# test_marker_combinations.py - combining markers
import pytest
import sys


# Several markers stacked on one test
@pytest.mark.slow
@pytest.mark.integration
@pytest.mark.database
def test_database_performance():
    """Carries the slow, integration and database markers at once."""
    # Simulated database performance test.
    assert True, "数据库性能测试完成"


@pytest.mark.fast
@pytest.mark.unit
@pytest.mark.security
def test_security_unit_test():
    """Security-related unit test carrying three markers."""
    # Simulated security test.
    assert True, "安全测试完成"


# Run-time decision inside the test body
def test_dynamically_marked():
    """Fail explicitly when ``some_condition()`` holds, otherwise pass.

    No marker is attached here; the outcome is decided at run time through
    ``pytest.fail``.
    """
    if some_condition():
        pytest.fail("条件不满足")
    assert True


# Registering the markers in conftest.py
#
# conftest.py:
#
#     import pytest
#
#     def pytest_configure(config):
#         """Register the custom markers."""
#         config.addinivalue_line(
#             "markers", "slow: 标记为慢速测试(运行时间>1秒)"
#         )
#         config.addinivalue_line(
#             "markers", "fast: 标记为快速测试(运行时间<0.1秒)"
#         )
#         config.addinivalue_line(
#             "markers", "integration: 集成测试"
#         )
#         config.addinivalue_line(
#             "markers", "unit: 单元测试"
#         )
#
# NOTE: the ``database`` and ``security`` markers used above are not in this
# list; register them the same way when running with ``--strict-markers``.

# Selecting tests by marker
#
#     pytest -m fast                      # every test marked fast
#     pytest -m integration               # every test marked integration
#     pytest -m "slow and not database"   # slow tests, excluding database ones
#     pytest -m "unit or fast"            # tests marked unit or fast


def some_condition():
    """Simulated condition check; always returns False."""
    return False
pytest允许开发自定义断言和插件来扩展框架功能。
# test_custom_assertions.py - 自定义断言
import pytest


# 自定义断言类
class CustomAssertions:
    """自定义断言方法"""

    @staticmethod
    def assert_list_contains(actual_list, expected_item, msg=None):
        """断言列表包含特定元素"""
        if expected_item not in actual_list:
            if msg is None:
                msg = f"列表不包含 {expected_item}"
            raise AssertionError(msg)

    @staticmethod
    def assert_dict_keys(dictionary, expected_keys, msg=None):
        """断言字典包含特定键"""
        actual_keys = set(dictionary.keys())
        expected_keys_set = set(expected_keys)
        if not expected_keys_set.issubset(actual_keys):
            if msg is None:
                msg = f"字典缺少键: {expected_keys_set - actual_keys}"
            raise AssertionError(msg)


# 使用自定义断言
def test_custom_assertions_usage():
    """测试自定义断言的使用"""
    assertions = CustomAssertions()
    # 测试列表包含
    numbers = [1, 2, 3, 4, 5]
    assertions.assert_list_contains(numbers, 3)
    # 测试字典键
    user = {"name": "Alice", "age": 30, "email": "alice@example.com"}
    assertions.assert_dict_keys(user, ["name", "email"])
    # 测试失败情况
    try:
        assertions.assert_list_contains(numbers, 10)
        pytest.fail("应抛出AssertionError")
    except AssertionError as e:
        assert "列表不包含" in str(e)


# 集成到pytest中
@pytest.fixture
def assert_that():
    """提供自定义断言方法的fixture"""
    return CustomAssertions()


def test_with_assert_that_fixture(assert_that):
    """使用fixture提供自定义断言"""
    data = {"id": 1, "name": "Test"}
    assert_that.assert_dict_keys(data, ["id", "name"])
# test_coverage_example.py - test coverage example
import math
from collections import Counter


# Business code under test
class StatisticsCalculator:
    """Basic descriptive statistics over a sequence of numbers."""

    def mean(self, numbers):
        """Return the arithmetic mean.

        Raises:
            ValueError: if ``numbers`` is empty.
        """
        if not numbers:
            raise ValueError("数字列表不能为空")
        return sum(numbers) / len(numbers)

    def median(self, numbers):
        """Return the median: the middle value, or the mean of the two middle values.

        Raises:
            ValueError: if ``numbers`` is empty.
        """
        if not numbers:
            raise ValueError("数字列表不能为空")
        ordered = sorted(numbers)
        middle, is_odd = divmod(len(ordered), 2)
        if is_odd:
            return ordered[middle]
        return (ordered[middle - 1] + ordered[middle]) / 2

    def mode(self, numbers):
        """Return the most frequent value.

        When several values tie for the highest frequency, a list of them is
        returned in order of first appearance; otherwise the single value.

        Raises:
            ValueError: if ``numbers`` is empty.
        """
        if not numbers:
            raise ValueError("数字列表不能为空")
        frequency = Counter(numbers)  # keeps first-seen order of the keys
        max_freq = max(frequency.values())
        modes = [num for num, freq in frequency.items() if freq == max_freq]
        return modes if len(modes) > 1 else modes[0]

    def standard_deviation(self, numbers):
        """Return the sample standard deviation (divides by ``n - 1``).

        Raises:
            ValueError: if fewer than two numbers are given.
        """
        if len(numbers) < 2:
            raise ValueError("至少需要两个数字")
        mean_val = self.mean(numbers)
        variance = sum((x - mean_val) ** 2 for x in numbers) / (len(numbers) - 1)
        return math.sqrt(variance)


# Test exercising every public method
def test_statistics_calculator():
    """Exercise mean, median, mode and standard deviation, plus the empty-input error."""
    import pytest

    calc = StatisticsCalculator()

    # Mean
    assert calc.mean([1, 2, 3, 4, 5]) == 3
    assert calc.mean([10, 20, 30]) == 20

    # Error path
    with pytest.raises(ValueError):
        calc.mean([])

    # Median: odd and even lengths
    assert calc.median([1, 2, 3, 4, 5]) == 3
    assert calc.median([1, 2, 3, 4]) == 2.5

    # Mode: single winner and a tie
    assert calc.mode([1, 2, 2, 3, 4]) == 2
    result = calc.mode([1, 2, 2, 3, 3, 4])
    assert 2 in result and 3 in result

    # Standard deviation
    numbers = [1, 2, 3, 4, 5]
    std_dev = calc.standard_deviation(numbers)
    assert std_dev == pytest.approx(1.5811, abs=0.0001)


# Commands for measuring coverage
#
#     pip install pytest-cov                       # install the coverage plugin
#     # run the tests and print a coverage report
#     pytest --cov=statistics_calculator test_coverage_example.py
#     # HTML report
#     pytest --cov=statistics_calculator --cov-report=html test_coverage_example.py
#     # XML report (for CI/CD integration)
#     pytest --cov=statistics_calculator --cov-report=xml test_coverage_example.py
#     # fail the run below a minimum coverage threshold
#     pytest --cov=statistics_calculator --cov-fail-under=80 test_coverage_example.py
在现代Web开发中,API接口测试是确保系统稳定性的关键。pytest可以构建强大的API自动化测试框架。
- 支持多种认证方式(Token、OAuth、Basic Auth)
# test_api_framework.py - API test automation framework
import pytest
import requests
import json
from datetime import datetime
from typing import Dict, Any, Optional


class APITestClient:
    """Thin wrapper around ``requests.Session`` for API tests.

    Every verb method returns a plain dict with ``status_code``, ``headers``,
    ``elapsed`` (seconds) and ``data`` (parsed JSON, or the raw text when the
    body is not valid JSON).
    """

    def __init__(self, base_url: str, timeout: int = 10):
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.session = requests.Session()
        self.headers = {
            "Content-Type": "application/json",
            "User-Agent": "pytest-api-framework/1.0"
        }

    def set_auth_token(self, token: str):
        """Send ``token`` as a Bearer token on all subsequent requests."""
        self.headers["Authorization"] = f"Bearer {token}"

    def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Send one request with the default headers and timeout applied.

        Caller-supplied ``headers`` are merged over the defaults and a
        caller-supplied ``timeout`` wins, so neither collides with the
        keyword arguments set here.
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        headers = {**self.headers, **(kwargs.pop("headers", None) or {})}
        kwargs.setdefault("timeout", self.timeout)
        response = self.session.request(method, url, headers=headers, **kwargs)
        return self._process_response(response)

    def get(self, endpoint: str, params: Optional[Dict] = None, **kwargs):
        """Send a GET request with optional query ``params``."""
        return self._request("GET", endpoint, params=params, **kwargs)

    def post(self, endpoint: str, data: Optional[Dict] = None, **kwargs):
        """Send a POST request with ``data`` serialized as the JSON body."""
        return self._request("POST", endpoint, json=data, **kwargs)

    def put(self, endpoint: str, data: Optional[Dict] = None, **kwargs):
        """Send a PUT request with ``data`` serialized as the JSON body."""
        return self._request("PUT", endpoint, json=data, **kwargs)

    def delete(self, endpoint: str, **kwargs):
        """Send a DELETE request."""
        return self._request("DELETE", endpoint, **kwargs)

    def _process_response(self, response: requests.Response) -> Dict[str, Any]:
        """Convert a ``requests.Response`` into the result dict described on the class."""
        result = {
            "status_code": response.status_code,
            "headers": dict(response.headers),
            "elapsed": response.elapsed.total_seconds()
        }
        try:
            result["data"] = response.json()
        except ValueError:
            # requests raises a ValueError subclass for non-JSON bodies; which
            # JSONDecodeError it derives from depends on whether simplejson is
            # installed, so catch the common base class.
            result["data"] = response.text
        return result


# Test fixtures
@pytest.fixture(scope="session")
def api_client():
    """Provide one authenticated ``APITestClient`` for the whole session.

    Skips the dependent tests when the API cannot be reached, rather than
    erroring in every test.
    """
    # In a real project, read this from an environment variable or a config file.
    base_url = "https://api.example.com"
    client = APITestClient(base_url=base_url, timeout=15)

    # Simulated login to obtain a token.
    login_data = {"username": "testuser", "password": "testpass"}
    try:
        response = client.post("/auth/login", data=login_data)
    except requests.exceptions.RequestException:
        client.session.close()
        pytest.skip("API环境未就绪,跳过API测试")

    # ``data`` is raw text when the body was not JSON, so check the type first.
    if response["status_code"] == 200 and isinstance(response["data"], dict):
        token = response["data"].get("access_token")
        client.set_auth_token(token)

    yield client

    # Teardown: release the connection pool.
    client.session.close()


@pytest.fixture
def test_user_data():
    """Return registration data whose username and email embed a timestamp."""
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    return {
        "username": f"testuser_{timestamp}",
        "email": f"test_{timestamp}@example.com",
        "password": "TestPass123!",
        "first_name": "Test",
        "last_name": "User"
    }


# API test cases
class TestUserAPI:
    """Tests for the user endpoints."""

    def test_user_registration(self, api_client, test_user_data):
        """Registration returns 201 and echoes the user without the password."""
        response = api_client.post("/users/register", data=test_user_data)

        assert response["status_code"] == 201
        assert "id" in response["data"]
        assert response["data"]["username"] == test_user_data["username"]
        assert response["data"]["email"] == test_user_data["email"]
        # The password must never be returned in the response.
        assert "password" not in response["data"]
        print(f"✓ 用户注册成功: {test_user_data['username']}")

    def test_user_login(self, api_client, test_user_data):
        """Login returns 200 together with the token fields."""
        # Register the user first.
        api_client.post("/users/register", data=test_user_data)

        login_data = {
            "username": test_user_data["username"],
            "password": test_user_data["password"]
        }
        response = api_client.post("/auth/login", data=login_data)

        assert response["status_code"] == 200
        assert "access_token" in response["data"]
        assert "refresh_token" in response["data"]
        assert "expires_in" in response["data"]
        print(f"✓ 用户登录成功: {test_user_data['username']}")

    def test_get_user_profile(self, api_client):
        """The profile endpoint returns the expected fields."""
        response = api_client.get("/users/profile")

        assert response["status_code"] == 200
        assert "username" in response["data"]
        assert "email" in response["data"]
        assert "created_at" in response["data"]
        print("✓ 获取用户资料成功")

    @pytest.mark.parametrize("invalid_data,expected_error", [
        ({"username": "", "email": "test@example.com"}, "用户名不能为空"),
        ({"username": "test", "email": "invalid-email"}, "邮箱格式错误"),
        ({"username": "test", "email": "test@example.com", "password": "weak"}, "密码强度不足"),
    ])
    def test_user_registration_validation(self, api_client, invalid_data, expected_error):
        """Invalid registration data yields 400 with the matching error message."""
        response = api_client.post("/users/register", data=invalid_data)

        assert response["status_code"] == 400
        assert "error" in response["data"]
        assert expected_error in response["data"]["error"]
        print(f"✓ 验证错误处理: {expected_error}")


class TestProductAPI:
    """Tests for the product endpoints."""

    @pytest.fixture
    def sample_product(self):
        """Return sample product data."""
        return {
            "name": "Test Product",
            "description": "A test product for API testing",
            "price": 99.99,
            "stock": 100,
            "category": "electronics"
        }

    def test_create_product(self, api_client, sample_product):
        """Creating a product returns 201 and echoes name and price."""
        response = api_client.post("/products", data=sample_product)

        assert response["status_code"] == 201
        assert response["data"]["name"] == sample_product["name"]
        assert response["data"]["price"] == sample_product["price"]
        print(f"✓ 产品创建成功: {sample_product['name']}")

    def test_get_product_list(self, api_client):
        """The product list is a JSON array whose items carry ``id`` and ``name``."""
        response = api_client.get("/products")

        assert response["status_code"] == 200
        assert isinstance(response["data"], list)
        if response["data"]:
            product = response["data"][0]
            assert "id" in product
            assert "name" in product
        print(f"✓ 获取产品列表成功,数量: {len(response['data'])}")

    def test_get_product_detail(self, api_client):
        """The detail endpoint returns the product that was requested."""
        # Take the first product from the list as the lookup target.
        list_response = api_client.get("/products")
        if not list_response["data"]:
            pytest.skip("没有产品可用于测试")

        product_id = list_response["data"][0]["id"]
        detail_response = api_client.get(f"/products/{product_id}")

        assert detail_response["status_code"] == 200
        assert detail_response["data"]["id"] == product_id
        print(f"✓ 获取产品详情成功: ID={product_id}")

    @pytest.mark.performance
    def test_api_response_time(self, api_client):
        """The product list responds within two seconds."""
        import time

        # perf_counter is monotonic, unlike the wall clock.
        start_time = time.perf_counter()
        response = api_client.get("/products")
        response_time = time.perf_counter() - start_time

        assert response["status_code"] == 200
        assert response_time < 2.0, f"API响应时间过长: {response_time:.2f}秒"
        print(f"✓ API响应时间正常: {response_time:.3f}秒")


# Example configuration file
#
# pytest.ini:
#
#     [pytest]
#     markers =
#         performance: 性能测试
#         integration: 集成测试
#         smoke: 冒烟测试
#     addopts = -v --tb=short --strict-markers
#     testpaths = tests
#     python_files = test_*.py
#     python_classes = Test*
#     python_functions = test_*


def test_api_framework_summary():
    """Print a summary of the framework's features."""
    print("\n=== API自动化测试框架特性总结 ===")
    print("1. 统一的测试客户端: 封装HTTP请求处理")
    print("2. 灵活的fixture系统: 管理测试环境和认证")
    print("3. 参数化测试支持: 覆盖多种测试场景")
    print("4. 详细的断言验证: 验证响应状态、数据和结构")
    print("5. 性能测试集成: 监控API响应时间")
    print("6. 可配置的执行控制: 通过标记筛选测试")
    print("7. 丰富的报告输出: 支持多种报告格式")


if __name__ == "__main__":
    test_api_framework_summary()
- 可维护性:通过fixture管理测试环境和资源,减少重复代码
- 持续集成:与Jenkins、GitLab CI等工具无缝集成
- 契约测试:验证API实现符合OpenAPI/Swagger规范
在微服务架构中,服务间的集成测试尤为重要。pytest可以构建全面的微服务集成测试套件。
- 验证服务间通信协议(REST、gRPC、消息队列)
# test_microservices_integration.py - 微服务集成测试import pytestimport requestsimport jsonimport timefrom typing import Dict, Any, Listfrom dataclasses import dataclassfrom datetime import datetime, timedelta@dataclassclassServiceConfig:"""服务配置""" name: str base_url: str health_endpoint: str = "/health" timeout: int = 10classMicroservicesTestEnvironment:"""微服务测试环境"""def__init__(self, services: List[ServiceConfig]): self.services = services self.clients = {}for service in services: self.clients[service.name] = requests.Session()defwait_for_services(self, timeout: int = 60):"""等待所有服务就绪""" start_time = time.time()while time.time() - start_time < timeout: all_ready = Truefor service in self.services:try: response = self.clients[service.name].get(f"{service.base_url}{service.health_endpoint}", timeout=5 )if response.status_code != 200: all_ready = False print(f"服务 {service.name} 未就绪: HTTP {response.status_code}")except Exception as e: all_ready = False print(f"服务 {service.name} 连接失败: {e}")if all_ready: print("所有微服务已就绪")returnTrue time.sleep(2)raise TimeoutError("等待微服务就绪超时")defcall_service(self, service_name: str, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:"""调用微服务"""if service_name notin self.clients:raise ValueError(f"未知服务: {service_name}") service = next(s for s in self.services if s.name == service_name) url = f"{service.base_url}{endpoint}" method_func = getattr(self.clients[service_name], method.lower()) response = method_func(url, timeout=service.timeout, **kwargs) result = {"status_code": response.status_code,"headers": dict(response.headers),"elapsed": response.elapsed.total_seconds() }try: result["data"] = response.json()except json.JSONDecodeError: result["data"] = response.textreturn resultdefclose(self):"""关闭所有客户端"""for client in self.clients.values(): client.close()# 测试fixtures@pytest.fixture(scope="session")defmicroservices_env():"""微服务测试环境fixture"""# 实际项目中从配置读取服务信息 services = [ ServiceConfig( name="user_service", base_url="http://localhost:8001", 
health_endpoint="/health" ), ServiceConfig( name="product_service", base_url="http://localhost:8002", health_endpoint="/api/health" ), ServiceConfig( name="order_service", base_url="http://localhost:8003", health_endpoint="/status" ), ServiceConfig( name="payment_service", base_url="http://localhost:8004", health_endpoint="/health" ) ] env = MicroservicesTestEnvironment(services)# 等待服务就绪try: env.wait_for_services(timeout=30)except TimeoutError: pytest.skip("微服务环境未就绪,跳过集成测试")yield env env.close()@pytest.fixturedeftest_user(microservices_env):"""测试用户"""# 创建测试用户 user_data = {"username": f"testuser_{int(time.time())}","email": f"test_{int(time.time())}@example.com","password": "TestPass123!" } response = microservices_env.call_service("user_service","POST","/users/register", json=user_data )assert response["status_code"] in [200, 201] user_id = response["data"]["id"]# 获取认证token login_response = microservices_env.call_service("user_service","POST","/auth/login", json={"username": user_data["username"],"password": user_data["password"] } ) token = login_response["data"]["access_token"]return {"id": user_id,"username": user_data["username"],"email": user_data["email"],"token": token }# 微服务集成测试classTestECommerceIntegration:"""电商微服务集成测试"""deftest_complete_order_workflow(self, microservices_env, test_user):"""测试完整的订单工作流""" print("\n=== 测试完整订单工作流 ===")# 1. 创建产品 product_data = {"name": "Integration Test Product","description": "Product for integration testing","price": 49.99,"stock": 50,"category": "test" } create_product_response = microservices_env.call_service("product_service","POST","/products", json=product_data )assert create_product_response["status_code"] == 201 product_id = create_product_response["data"]["id"] print(f"✓ 产品创建成功: ID={product_id}")# 2. 
创建购物车 cart_data = {"user_id": test_user["id"],"items": [ {"product_id": product_id,"quantity": 2,"unit_price": product_data["price"] } ] }# 设置认证header headers = {"Authorization": f"Bearer {test_user['token']}"} create_cart_response = microservices_env.call_service("order_service","POST","/carts", json=cart_data, headers=headers )assert create_cart_response["status_code"] == 201 cart_id = create_cart_response["data"]["id"] print(f"✓ 购物车创建成功: ID={cart_id}")# 3. 创建订单 order_data = {"cart_id": cart_id,"shipping_address": "123 Test Street, Test City","payment_method": "credit_card" } create_order_response = microservices_env.call_service("order_service","POST","/orders", json=order_data, headers=headers )assert create_order_response["status_code"] == 201 order_id = create_order_response["data"]["id"] order_status = create_order_response["data"]["status"] print(f"✓ 订单创建成功: ID={order_id}, 状态={order_status}")# 4. 处理支付 payment_data = {"order_id": order_id,"amount": 99.98, # 2 * 49.99"currency": "USD","payment_method": "credit_card" } process_payment_response = microservices_env.call_service("payment_service","POST","/payments", json=payment_data, headers=headers )assert process_payment_response["status_code"] == 200 payment_status = process_payment_response["data"]["status"] print(f"✓ 支付处理成功: 状态={payment_status}")# 5. 验证订单状态更新 time.sleep(1) # 等待事件处理 get_order_response = microservices_env.call_service("order_service","GET",f"/orders/{order_id}", headers=headers )assert get_order_response["status_code"] == 200 updated_order_status = get_order_response["data"]["status"] print(f"✓ 订单状态已更新: 新状态={updated_order_status}")# 6. 
验证产品库存更新 get_product_response = microservices_env.call_service("product_service","GET",f"/products/{product_id}" )assert get_product_response["status_code"] == 200 updated_stock = get_product_response["data"]["stock"]assert updated_stock == 48# 原始50 - 购买2 print(f"✓ 产品库存已更新: 新库存={updated_stock}") print("✓ 完整订单工作流测试通过")return {"user_id": test_user["id"],"product_id": product_id,"cart_id": cart_id,"order_id": order_id }deftest_order_compensation_on_payment_failure(self, microservices_env, test_user):"""测试支付失败时的订单补偿机制""" print("\n=== 测试支付失败补偿机制 ===")# 创建产品 product_data = {"name": "Compensation Test Product","description": "Product for compensation testing","price": 29.99,"stock": 10,"category": "test" } create_product_response = microservices_env.call_service("product_service","POST","/products", json=product_data ) product_id = create_product_response["data"]["id"] initial_stock = create_product_response["data"]["stock"] print(f"✓ 产品创建成功: ID={product_id}, 初始库存={initial_stock}")# 创建订单(模拟库存预留) order_data = {"user_id": test_user["id"],"items": [ {"product_id": product_id,"quantity": 3,"unit_price": product_data["price"] } ],"shipping_address": "456 Test Ave, Test Town" } headers = {"Authorization": f"Bearer {test_user['token']}"} create_order_response = microservices_env.call_service("order_service","POST","/orders", json=order_data, headers=headers ) order_id = create_order_response["data"]["id"] print(f"✓ 订单创建成功: ID={order_id}")# 模拟支付失败 payment_data = {"order_id": order_id,"amount": 89.97, # 3 * 29.99"currency": "USD","payment_method": "insufficient_funds" } process_payment_response = microservices_env.call_service("payment_service","POST","/payments/fail", json=payment_data, headers=headers )assert process_payment_response["status_code"] == 402# Payment Required print(f"✓ 支付失败模拟成功: HTTP {process_payment_response['status_code']}")# 等待补偿机制执行 time.sleep(2)# 验证订单状态为取消 get_order_response = microservices_env.call_service("order_service","GET",f"/orders/{order_id}", 
headers=headers ) order_status = get_order_response["data"]["status"]assert order_status == "cancelled" print(f"✓ 订单状态已更新为取消: {order_status}")# 验证库存已恢复 get_product_response = microservices_env.call_service("product_service","GET",f"/products/{product_id}" ) restored_stock = get_product_response["data"]["stock"]assert restored_stock == initial_stock # 库存应恢复 print(f"✓ 产品库存已恢复: 当前库存={restored_stock}") print("✓ 支付失败补偿机制测试通过") @pytest.mark.performancedeftest_concurrent_order_processing(self, microservices_env, test_user):"""测试并发订单处理能力""" print("\n=== 测试并发订单处理 ===")import concurrent.futuresimport random# 创建多个测试产品 products = []for i in range(5): product_data = {"name": f"Concurrent Test Product {i}","description": f"Product {i} for concurrent testing","price": random.uniform(10.0, 100.0),"stock": 100,"category": "test" } response = microservices_env.call_service("product_service","POST","/products", json=product_data ) products.append({"id": response["data"]["id"],"price": product_data["price"] }) print(f"创建了 {len(products)} 个测试产品")# 并发创建订单defcreate_concurrent_order(index):"""并发创建订单的函数""" product = random.choice(products) quantity = random.randint(1, 5) order_data = {"user_id": test_user["id"],"items": [ {"product_id": product["id"],"quantity": quantity,"unit_price": product["price"] } ],"shipping_address": f"Concurrent Test Address {index}" } headers = {"Authorization": f"Bearer {test_user['token']}"} response = microservices_env.call_service("order_service","POST","/orders", json=order_data, headers=headers )return {"index": index,"status_code": response["status_code"],"order_id": response["data"]["id"] if response["status_code"] == 201elseNone }# 使用线程池并发执行 start_time = time.time() successful_orders = 0 failed_orders = 0with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: futures = [executor.submit(create_concurrent_order, i) for i in range(20)]for future in concurrent.futures.as_completed(futures): result = future.result()if result["status_code"] == 
201: successful_orders += 1 print(f" 订单 {result['index']} 创建成功: ID={result['order_id']}")else: failed_orders += 1 print(f" 订单 {result['index']} 创建失败: HTTP {result['status_code']}") end_time = time.time() total_time = end_time - start_time print(f"\n并发测试结果:") print(f" 总订单数: 20") print(f" 成功订单: {successful_orders}") print(f" 失败订单: {failed_orders}") print(f" 总耗时: {total_time:.2f}秒") print(f" 平均响应时间: {total_time/20:.2f}秒/订单")assert successful_orders >= 15, f"并发成功率不足: {successful_orders}/20"assert total_time < 10, f"并发处理时间过长: {total_time:.2f}秒" print("✓ 并发订单处理测试通过")deftest_microservices_integration_summary():"""微服务集成测试总结""" print("\n=== 微服务集成测试框架特性总结 ===") print("1. 多服务协调: 统一管理多个微服务的测试环境") print("2. 工作流验证: 测试完整的业务工作流和数据一致性") print("3. 补偿机制: 验证分布式事务失败时的补偿逻辑") print("4. 并发测试: 模拟多用户并发场景的性能验证") print("5. 错误处理: 测试异常情况和系统恢复能力") print("6. 可观测性: 集成监控和日志验证")if __name__ == "__main__": test_microservices_integration_summary()
# In big-data processing, the quality of the ETL (extract, transform, load)
# pipeline is critical. pytest can be used to build a powerful data-quality
# validation framework.
# test_data_quality_framework.py - data quality validation framework
import pytest
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple, Callable
import json
import csv
from dataclasses import dataclass
from pathlib import Path

# Signature shared by every rule: (dataframe, context) -> (passed, message, details).
CheckFunction = Callable[[pd.DataFrame, Optional[Dict]], Tuple[bool, str, Dict]]


@dataclass
class DataQualityRule:
    """A named data-quality rule and the function that evaluates it."""

    name: str
    description: str
    check_function: CheckFunction
    severity: str  # "critical", "warning", "info"


@dataclass
class DataValidationResult:
    """Outcome of running one rule against one DataFrame."""

    rule_name: str
    passed: bool
    message: str
    details: Dict[str, Any]
    timestamp: datetime


class DataQualityValidator:
    """Runs registered rules against DataFrames and accumulates the results."""

    def __init__(self):
        self.rules = {}
        self.results = []

    def add_rule(self, rule: DataQualityRule):
        """Register ``rule`` under its name, replacing any rule with the same name."""
        self.rules[rule.name] = rule

    def validate_dataframe(
        self, df: pd.DataFrame, context: Optional[Dict] = None
    ) -> List[DataValidationResult]:
        """Run every registered rule against ``df``.

        Args:
            df: The DataFrame to validate.
            context: Optional settings forwarded unchanged to each rule.

        Returns:
            One result per rule, in registration order. The same results are
            also appended to ``self.results`` for later reporting.
        """
        results = []
        for rule_name, rule in self.rules.items():
            try:
                passed, message, details = rule.check_function(df, context)
                result = DataValidationResult(
                    rule_name=rule_name,
                    passed=passed,
                    message=message,
                    details=details,
                    timestamp=datetime.now(),
                )
                results.append(result)
                if not passed and rule.severity == "critical":
                    print(f"⚠️ 严重数据质量问题: {rule_name} - {message}")
                elif not passed:
                    print(f"ℹ️ 数据质量问题: {rule_name} - {message}")
            except Exception as e:
                # A crashing rule is recorded as a failed result instead of
                # aborting the remaining rules.
                error_result = DataValidationResult(
                    rule_name=rule_name,
                    passed=False,
                    message=f"规则执行失败: {str(e)}",
                    details={"error": str(e)},
                    timestamp=datetime.now(),
                )
                results.append(error_result)
                print(f"❌ 规则执行错误: {rule_name} - {str(e)}")
        self.results.extend(results)
        return results

    def generate_report(self) -> Dict[str, Any]:
        """Summarise every result accumulated so far.

        Note that ``total_rules`` counts recorded results, so a rule that ran
        in several ``validate_dataframe`` calls is counted once per call.
        """
        total_rules = len(self.results)
        passed_rules = sum(1 for r in self.results if r.passed)
        failed_rules = total_rules - passed_rules
        critical_issues = [
            r
            for r in self.results
            if not r.passed and self.rules[r.rule_name].severity == "critical"
        ]
        report = {
            "timestamp": datetime.now().isoformat(),
            "summary": {
                "total_rules": total_rules,
                "passed_rules": passed_rules,
                "failed_rules": failed_rules,
                "pass_rate": passed_rules / total_rules if total_rules > 0 else 0,
                "critical_issues": len(critical_issues),
            },
            "details": [
                {
                    "rule_name": r.rule_name,
                    "passed": r.passed,
                    "message": r.message,
                    "severity": self.rules[r.rule_name].severity,
                    "timestamp": r.timestamp.isoformat(),
                }
                for r in self.results
            ],
        }
        return report


# Data quality rule definitions
def check_null_values(
    df: pd.DataFrame, context: Optional[Dict] = None
) -> Tuple[bool, str, Dict]:
    """Check the share of null cells against ``context["null_threshold"]``.

    The threshold is a percentage and defaults to 5.0 when absent.
    """
    null_counts = df.isnull().sum()
    total_null = null_counts.sum()
    total_cells = df.size
    if total_null == 0:
        return True, "没有空值", {"null_count": 0, "null_columns": []}
    null_columns = null_counts[null_counts > 0].index.tolist()
    null_percentage = (total_null / total_cells) * 100
    message = f"发现{total_null}个空值({null_percentage:.2f}%),涉及列: {null_columns}"
    # The rule passes while the null percentage stays within the threshold.
    threshold = context.get("null_threshold", 5.0) if context else 5.0
    passed = null_percentage <= threshold
    return passed, message, {
        "null_count": total_null,
        "null_columns": null_columns,
        "null_percentage": null_percentage,
        "threshold": threshold,
    }


def check_data_types(
    df: pd.DataFrame, context: Optional[Dict] = None
) -> Tuple[bool, str, Dict]:
    """Compare column dtypes with ``context["expected_types"]``.

    Mismatches are always reported in the details, but the rule only fails
    when a mismatching column is listed in ``context["critical_columns"]``.
    """
    expected_types = context.get("expected_types", {}) if context else {}
    mismatches = []
    for column, expected_type in expected_types.items():
        if column in df.columns:
            actual_type = str(df[column].dtype)
            if actual_type != expected_type:
                mismatches.append({
                    "column": column,
                    "expected": expected_type,
                    "actual": actual_type,
                })
    if not mismatches:
        return True, "所有列数据类型正确", {"mismatches": []}
    message = f"发现{len(mismatches)}列数据类型不匹配"
    details = {"mismatches": mismatches}
    critical_columns = context.get("critical_columns", []) if context else []
    critical_mismatches = [
        m for m in mismatches if m["column"] in critical_columns
    ]
    passed = len(critical_mismatches) == 0
    if not passed:
        # Name the first *critical* mismatch; the first mismatch overall may
        # belong to a non-critical column.
        message += f",关键列{critical_mismatches[0]['column']}类型错误"
    return passed, message, details


def check_value_ranges(
    df: pd.DataFrame, context: Optional[Dict] = None
) -> Tuple[bool, str, Dict]:
    """Check int64/float64 columns against ``context["range_rules"]``.

    ``range_rules`` maps a column name to an inclusive ``(min, max)`` pair.
    Columns that are missing or have another dtype are skipped.
    """
    range_rules = context.get("range_rules", {}) if context else {}
    violations = []
    for column, (min_val, max_val) in range_rules.items():
        if column in df.columns and df[column].dtype in ["int64", "float64"]:
            out_of_range = df[(df[column] < min_val) | (df[column] > max_val)]
            if not out_of_range.empty:
                violations.append({
                    "column": column,
                    "min_allowed": min_val,
                    "max_allowed": max_val,
                    "out_of_range_count": len(out_of_range),
                    "min_actual": df[column].min(),
                    "max_actual": df[column].max(),
                })
    if not violations:
        return True, "所有数值在允许范围内", {"violations": []}
    message = f"发现{len(violations)}列数值超出范围"
    return False, message, {"violations": violations}


def check_data_completeness(
    df: pd.DataFrame, context: Optional[Dict] = None
) -> Tuple[bool, str, Dict]:
    """Check required columns exist and every column is at least 95% non-null."""
    required_columns = context.get("required_columns", []) if context else []
    missing_columns = [
        column for column in required_columns if column not in df.columns
    ]
    if missing_columns:
        message = f"缺失必需列: {missing_columns}"
        return False, message, {"missing_columns": missing_columns}
    # The overall rate is the rate of the least complete column.
    completeness_rate = 1.0
    completeness_details = {}
    for column in df.columns:
        non_null_count = df[column].notnull().sum()
        total_count = len(df)
        column_completeness = non_null_count / total_count if total_count > 0 else 0
        completeness_details[column] = {
            "non_null_count": non_null_count,
            "total_count": total_count,
            "completeness_rate": column_completeness,
        }
        completeness_rate = min(completeness_rate, column_completeness)
    if completeness_rate >= 0.95:
        message = f"数据完整性良好,完整性率{completeness_rate:.2%}"
        return True, message, completeness_details
    else:
        message = f"数据完整性不足,完整性率{completeness_rate:.2%}"
        return False, message, completeness_details


# Test fixtures
@pytest.fixture(scope="session")
def data_validator():
    """Session-wide validator with the four standard rules registered.

    NOTE: being session-scoped, its ``results`` accumulate across tests, so
    ``generate_report()`` covers every validation run so far in the session.
    """
    validator = DataQualityValidator()
    validator.add_rule(DataQualityRule(
        name="null_check",
        description="检查数据空值",
        check_function=check_null_values,
        severity="critical",
    ))
    validator.add_rule(DataQualityRule(
        name="type_check",
        description="检查数据类型",
        check_function=check_data_types,
        severity="warning",
    ))
    validator.add_rule(DataQualityRule(
        name="range_check",
        description="检查数值范围",
        check_function=check_value_ranges,
        severity="warning",
    ))
    validator.add_rule(DataQualityRule(
        name="completeness_check",
        description="检查数据完整性",
        check_function=check_data_completeness,
        severity="critical",
    ))
    yield validator


@pytest.fixture
def sample_dataframe():
    """Five-row sample DataFrame with one null in ``name`` and one in ``department``."""
    data = {
        "id": [1, 2, 3, 4, 5],
        "name": ["Alice", "Bob", "Charlie", None, "Eve"],
        "age": [25, 30, 35, 40, 45],
        "salary": [50000.0, 60000.0, 75000.0, 90000.0, 120000.0],
        "join_date": pd.date_range("2024-01-01", periods=5),
        "department": ["Engineering", "Sales", None, "HR", "Marketing"],
    }
    return pd.DataFrame(data)


# Data quality tests
class TestDataQualityFramework:
    """Tests for the individual rules and the combined validation run."""

    def test_null_value_check(self, data_validator, sample_dataframe):
        """The sample data stays under a 10% null threshold."""
        print("\n=== 测试空值检查 ===")
        context = {"null_threshold": 10.0}  # 10% null threshold
        results = data_validator.validate_dataframe(sample_dataframe, context)
        null_check_result = next(r for r in results if r.rule_name == "null_check")
        assert null_check_result.passed, f"空值检查失败: {null_check_result.message}"
        print(f"✓ 空值检查通过: {null_check_result.message}")
        print(f" 详细数据: {null_check_result.details}")

    def test_data_type_check(self, data_validator, sample_dataframe):
        """The critical columns of the sample data have the expected dtypes."""
        print("\n=== 测试数据类型检查 ===")
        context = {
            "expected_types": {
                "id": "int64",
                "name": "object",
                "age": "int64",
                "salary": "float64",
                "join_date": "datetime64[ns]",
                "department": "object",
            },
            "critical_columns": ["id", "salary"],
        }
        results = data_validator.validate_dataframe(sample_dataframe, context)
        type_check_result = next(r for r in results if r.rule_name == "type_check")
        assert type_check_result.passed, f"数据类型检查失败: {type_check_result.message}"
        print(f"✓ 数据类型检查通过: {type_check_result.message}")

    def test_value_range_check(self, data_validator, sample_dataframe):
        """Ages and salaries in the sample data fall inside the allowed ranges."""
        print("\n=== 测试数值范围检查 ===")
        context = {
            "range_rules": {
                "age": (18, 65),
                "salary": (30000.0, 150000.0),
            }
        }
        results = data_validator.validate_dataframe(sample_dataframe, context)
        range_check_result = next(r for r in results if r.rule_name == "range_check")
        assert range_check_result.passed, f"数值范围检查失败: {range_check_result.message}"
        print(f"✓ 数值范围检查通过: {range_check_result.message}")

    def test_data_completeness_check(self, data_validator, sample_dataframe):
        """The completeness rule flags the nulls in the sample data."""
        print("\n=== 测试数据完整性检查 ===")
        context = {
            "required_columns": ["id", "name", "age", "salary", "department"]
        }
        results = data_validator.validate_dataframe(sample_dataframe, context)
        completeness_check_result = next(
            r for r in results if r.rule_name == "completeness_check"
        )
        # The sample contains nulls, so this check is expected to fail.
        assert not completeness_check_result.passed, "数据完整性检查应失败"
        print(f"✓ 数据完整性检查正确识别问题: {completeness_check_result.message}")

    def test_comprehensive_validation(self, data_validator):
        """A deliberately dirty DataFrame triggers at least one rule failure."""
        print("\n=== 测试综合数据验证 ===")
        # Build a DataFrame that contains several kinds of quality problems.
        data = {
            "user_id": [1, 2, 3, 4, 5, 6],
            "username": ["user1", None, "user3", "user4", "user5", "user6"],
            "email": ["user1@test.com", "user2@test.com", None, "user4", "user5@test.com", "user6@test.com"],
            "age": [15, 25, 35, 5, 150, 30],  # includes out-of-range ages
            "score": [85.5, 92.0, 78.5, None, 88.0, 95.5],
            "signup_date": pd.date_range("2024-01-01", periods=6),
        }
        df = pd.DataFrame(data)
        # Validation context
        context = {
            "null_threshold": 5.0,
            "expected_types": {
                "user_id": "int64",
                "username": "object",
                "email": "object",
                "age": "int64",
                "score": "float64",
                "signup_date": "datetime64[ns]",
            },
            "range_rules": {
                "age": (18, 100),
                "score": (0.0, 100.0),
            },
            "required_columns": ["user_id", "username", "email", "age", "score"],
            "critical_columns": ["user_id", "email"],
        }
        results = data_validator.validate_dataframe(df, context)
        # Analyse the results
        passed_rules = [r for r in results if r.passed]
        failed_rules = [r for r in results if not r.passed]
        print(f"验证结果:")
        print(f" 总规则数: {len(results)}")
        print(f" 通过规则: {len(passed_rules)}")
        print(f" 失败规则: {len(failed_rules)}")
        for result in results:
            status = "✓" if result.passed else "✗"
            print(f" {status}{result.rule_name}: {result.message}")
        # Generate the detailed report
        report = data_validator.generate_report()
        print(f"\n数据质量报告:")
        print(f" 通过率: {report['summary']['pass_rate']:.2%}")
        print(f" 严重问题: {report['summary']['critical_issues']}个")
        assert len(failed_rules) > 0, "应识别数据质量问题"
        print("✓ 综合数据验证测试通过")


def _transform_orders(data: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    """Join customers with orders and add the derived ``sales_category`` column."""
    merged_df = pd.merge(
        data["customers"],
        data["orders"],
        on="customer_id",
        how="inner",
    )
    merged_df["sales_category"] = pd.cut(
        merged_df["amount"],
        bins=[0, 100, 200, float('inf')],
        labels=["small", "medium", "large"],
    )
    return merged_df


def _build_load_stats(transformed_data: pd.DataFrame) -> Dict[str, Any]:
    """Simulate loading into the target system and return the load statistics."""
    return {
        "rows_loaded": len(transformed_data),
        "columns_loaded": len(transformed_data.columns),
        "total_sales": transformed_data["amount"].sum(),
        "unique_customers": transformed_data["customer_id"].nunique(),
        "load_timestamp": datetime.now(),
    }


# ETL pipeline tests
class TestETLPipeline:
    """Tests for the extract, transform and load stages.

    The stages live in module-level helpers so the tests can share them
    without calling one another or returning values, which pytest warns
    about (and newer versions reject).
    """

    @pytest.fixture
    def extract_data(self):
        """Simulate extraction from two sources: customers and orders."""
        print("模拟数据抽取过程...")
        customer_data = pd.DataFrame({
            "customer_id": [1, 2, 3, 4, 5],
            "name": ["Alice", "Bob", "Charlie", "Diana", "Eve"],
            "city": ["New York", "Los Angeles", "Chicago", "Houston", "Phoenix"],
            "signup_date": pd.date_range("2023-01-01", periods=5),
        })
        order_data = pd.DataFrame({
            "order_id": [101, 102, 103, 104, 105],
            "customer_id": [1, 2, 3, 4, 5],
            "amount": [150.0, 200.0, 99.99, 300.0, 50.0],
            "order_date": pd.date_range("2024-01-01", periods=5),
        })
        return {
            "customers": customer_data,
            "orders": order_data,
        }

    def test_extraction_process(self, extract_data):
        """Both sources are extracted with the expected rows and join key."""
        print("\n=== 测试数据抽取过程 ===")
        data = extract_data
        assert "customers" in data
        assert "orders" in data
        customers_df = data["customers"]
        orders_df = data["orders"]
        assert len(customers_df) == 5
        assert len(orders_df) == 5
        assert "customer_id" in customers_df.columns
        assert "customer_id" in orders_df.columns
        print(f"✓ 数据抽取成功:")
        print(f" 客户数据: {len(customers_df)}行, {len(customers_df.columns)}列")
        print(f" 订单数据: {len(orders_df)}行, {len(orders_df.columns)}列")

    def test_transformation_process(self, extract_data):
        """The merged data keeps all five orders and has positive totals."""
        print("\n=== 测试数据转换过程 ===")
        merged_df = _transform_orders(extract_data)
        total_sales = merged_df["amount"].sum()
        avg_order_value = merged_df["amount"].mean()
        customer_count = merged_df["customer_id"].nunique()
        assert len(merged_df) == 5
        assert total_sales > 0
        assert avg_order_value > 0
        assert customer_count == 5
        print(f"✓ 数据转换成功:")
        print(f" 合并后数据: {len(merged_df)}行, {len(merged_df.columns)}列")
        print(f" 总销售额: ${total_sales:.2f}")
        print(f" 平均订单值: ${avg_order_value:.2f}")
        print(f" 客户数: {customer_count}")

    def test_loading_process(self, extract_data):
        """The load statistics reflect the transformed data."""
        print("\n=== 测试数据加载过程 ===")
        transformed_data = _transform_orders(extract_data)
        loaded_stats = _build_load_stats(transformed_data)
        assert loaded_stats["rows_loaded"] == 5
        assert loaded_stats["columns_loaded"] > 5
        assert loaded_stats["total_sales"] > 0
        assert loaded_stats["unique_customers"] == 5
        print(f"✓ 数据加载成功:")
        print(f" 加载行数: {loaded_stats['rows_loaded']}")
        print(f" 加载列数: {loaded_stats['columns_loaded']}")
        print(f" 加载时间: {loaded_stats['load_timestamp']}")

    def test_end_to_end_etl_pipeline(self, extract_data):
        """Extraction, transformation and loading run in sequence."""
        print("\n=== 测试端到端ETL管道 ===")
        # Row counts per source keep the printed extraction summary short.
        extraction_result = {
            name: len(frame) for name, frame in extract_data.items()
        }
        transformation_result = _transform_orders(extract_data)
        loading_result = _build_load_stats(transformation_result)
        print(f"\n端到端ETL管道测试结果:")
        print(f" 抽取: {extraction_result}")
        print(f" 转换: {len(transformation_result)}行数据")
        print(f" 加载: {loading_result}")
        assert extraction_result is not None
        assert transformation_result is not None
        assert loading_result is not None
        print("✓ 端到端ETL管道测试通过")


def test_data_quality_framework_summary():
    """Print a summary of the framework's features."""
    print("\n=== 数据质量验证框架特性总结 ===")
    print("1. 规则化验证: 定义和管理可重用的数据质量规则")
    print("2. 全面性检查: 涵盖空值、类型、范围、完整性等多个维度")
    print("3. 可配置阈值: 支持根据不同场景调整验证阈值")
    print("4. 详细报告: 生成结构化的数据质量报告")
    print("5. ETL集成: 支持完整的ETL流程测试")
    print("6. 可扩展性: 支持自定义数据质量规则")


if __name__ == "__main__":
    test_data_quality_framework_summary()
问题描述:多个测试用例共享测试数据导致测试结果相互影响。
@pytest.fixture
def fresh_user_data():
    """Build a new user payload for every test so tests never share data.

    NOTE: uniqueness relies on a millisecond timestamp, so two calls within
    the same millisecond (for example under parallel workers) can collide.
    """
    import time

    timestamp = int(time.time() * 1000)
    return {
        "username": f"testuser_{timestamp}",
        "email": f"test_{timestamp}@example.com",
        "password": "TestPass123!",
        "unique_id": timestamp,  # distinguishes one test's user from another's
    }


def test_user_creation_isolated(fresh_user_data):
    """Create a user from the fixture's payload, independently of other tests."""
    result = create_user(fresh_user_data)
    assert result["id"] is not None
@pytest.fixture
def db_transaction(database_connection):
    """Yield the connection inside a transaction that is rolled back afterwards."""
    # Begin the transaction
    transaction = database_connection.begin()
    yield database_connection
    # Roll back so nothing the test wrote is left behind
    transaction.rollback()


def test_database_operation(db_transaction):
    """Insert a row and verify it is visible inside the transaction."""
    conn = db_transaction
    # Perform the database operation
    conn.execute("INSERT INTO users (name) VALUES ('Test User')")
    # Verify the operation: the inserted row must be counted
    result = conn.execute("SELECT COUNT(*) FROM users")
    count = result.scalar()
    assert count >= 1
    # The fixture rolls back after the test, so other tests are unaffected
import tempfile
import os


@pytest.fixture
def temp_file():
    """Yield the path of a temporary text file and delete it after the test."""
    # delete=False keeps the file on disk after the handle closes, so the
    # test can reopen it by path.
    with tempfile.NamedTemporaryFile(
        mode='w', suffix='.txt', delete=False, encoding='utf-8'
    ) as f:
        f.write("Test content")
        temp_path = f.name
    yield temp_path
    # Clean up after the test; the test itself may already have removed it
    if os.path.exists(temp_path):
        os.unlink(temp_path)
问题描述:大型测试套件执行时间过长,影响开发效率。
# 安装pytest-xdist插件
# pip install pytest-xdist

# 并行运行测试
"""
# 使用所有CPU核心
pytest -n auto

# 使用指定数量的worker
pytest -n 4

# 按作用域分组分发(同一模块或同一测试类的用例在同一个worker中执行)
pytest -n 4 --dist loadscope

# 按文件分组分发(同一文件的用例在同一个worker中执行)
pytest -n 4 --dist loadfile
"""
@pytest.fixture(scope="session")
def expensive_resource():
    """Expensive resource, set up once and shared by the whole session."""
    resource = setup_expensive_resource()
    yield resource
    cleanup_expensive_resource(resource)


@pytest.fixture(scope="module")
def shared_data():
    """Test data loaded once per module."""
    return load_test_data()


@pytest.fixture
def fresh_data():
    """Data regenerated for every test."""
    return generate_fresh_data()
# pytest自带测试缓存功能
"""
# 只运行上次失败的测试
pytest --lf

# 先运行上次失败的测试,再运行其余测试
pytest --ff

# 清除缓存
pytest --cache-clear
"""
问题描述:多环境(开发、测试、生产)配置管理复杂。
import os
import json


@pytest.fixture(scope="session")
def test_config():
    """Load the JSON config for the environment named by ``TEST_ENV``.

    ``TEST_ENV`` defaults to ``development``; the file read is
    ``config/<env>.json``.
    """
    env = os.getenv("TEST_ENV", "development")
    config_file = f"config/{env}.json"
    with open(config_file, encoding="utf-8") as f:
        config = json.load(f)
    return config


@pytest.fixture
def api_client(test_config):
    """Create an API client from the loaded configuration."""
    base_url = test_config["api_base_url"]
    timeout = test_config["api_timeout"]
    return APIClient(base_url=base_url, timeout=timeout)
import docker
import time


@pytest.fixture(scope="session")
def database_container():
    """Start a PostgreSQL container for the session and remove it afterwards."""
    client = docker.from_env()
    container = client.containers.run(
        "postgres:13",
        environment={
            "POSTGRES_PASSWORD": "testpass",
            "POSTGRES_USER": "testuser",
            "POSTGRES_DB": "testdb",
        },
        ports={'5432/tcp': 5432},
        detach=True,
    )
    try:
        # Fixed wait for the database to start accepting connections
        time.sleep(5)
        yield container
    finally:
        # Runs even if the startup wait is interrupted, so the container
        # is never left behind.
        container.stop()
        container.remove()
问题描述:测试报告不够详细,难以与CI/CD系统集成。
# 使用多种报告插件
"""
# HTML报告
pip install pytest-html
pytest --html=report.html --self-contained-html

# JUnit XML报告(Jenkins友好,pytest内置支持,无需额外安装插件)
pytest --junitxml=report.xml

# JSON报告
pip install pytest-json-report
pytest --json-report

# Allure报告(美观详细)
pip install allure-pytest
pytest --alluredir=allure-results
"""
# GitHub Actions示例
"""
# .github/workflows/tests.yml
name: Run Tests
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.9'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
        pip install pytest pytest-html pytest-cov
    - name: Run tests with coverage
      run: |
        pytest --cov=src --cov-report=xml --cov-report=html --html=report.html
    - name: Upload test report
      uses: actions/upload-artifact@v2
      with:
        name: test-report
        path: report.html
    - name: Upload coverage report
      uses: actions/upload-artifact@v2
      with:
        name: coverage-report
        path: htmlcov/
"""
from dataclasses import dataclass
import factory
import random
from datetime import datetime, timedelta


@dataclass
class UserData:
    """Plain user record produced by ``UserFactory``."""

    id: int
    username: str
    email: str
    created_at: datetime


class UserFactory(factory.Factory):
    """factory_boy factory that builds ``UserData`` instances."""

    class Meta:
        model = UserData

    # Sequence values are offset by 1000
    id = factory.Sequence(lambda n: n + 1000)
    username = factory.Faker('user_name')
    # The email is derived from the generated username
    email = factory.LazyAttribute(lambda o: f"{o.username}@example.com")
    created_at = factory.Faker('date_time_this_year')


@pytest.fixture
def test_user():
    """Generate one test user with the factory."""
    return UserFactory()


@pytest.fixture
def bulk_users():
    """Generate a batch of ten test users."""
    return [UserFactory() for _ in range(10)]
import json
import yaml
from pathlib import Path


class TestDataManager:
    """Loads test scenarios from YAML files under a data directory."""

    # The name starts with "Test", so opt out of pytest collection explicitly.
    __test__ = False

    def __init__(self, data_dir="test_data"):
        self.data_dir = Path(data_dir)

    def load_user_scenarios(self):
        """Load ``user_scenarios.yaml`` from the data directory."""
        with open(self.data_dir / "user_scenarios.yaml", encoding="utf-8") as f:
            return yaml.safe_load(f)

    def get_test_cases(self, scenario_name):
        """Return the test cases for ``scenario_name``, or an empty list if unknown."""
        scenarios = self.load_user_scenarios()
        return scenarios.get(scenario_name, [])


@pytest.fixture(scope="session")
def test_data_manager():
    """Session-wide test data manager."""
    return TestDataManager()


# Parametrize arguments are evaluated at collection time, before any fixture
# exists, and pytest rejects direct calls to fixture functions. The cases are
# therefore loaded through a plain TestDataManager instance.
@pytest.mark.parametrize(
    "test_case",
    TestDataManager().get_test_cases("user_registration"),
)
def test_user_registration_scenarios(test_case):
    """Register a user for each scenario and compare with the expected outcome."""
    result = register_user(test_case["input"])
    assert result["success"] == test_case["expected"]["success"]
pytest模块作为Python测试领域的革命性工具,为我们带来了全新的测试编写体验:
- 极简语法:告别繁琐的样板代码,测试用例就是普通函数
- 官方文档阅读:https://docs.pytest.org
- 基础API熟悉:fixture、marker、parametrize
- 常用插件学习:pytest-cov、pytest-html、pytest-xdist
- 持续集成:与Jenkins、GitLab CI、GitHub Actions集成
- 性能测试:结合pytest-benchmark进行性能基准测试
- pytest官方文档:最权威的学习资料,包含最新特性和最佳实践
- pytest GitHub仓库:了解项目进展,参与社区贡献
- pytest插件索引:https://docs.pytest.org/en/latest/reference/plugin_list.html
- 《Python Testing with pytest》:Brian Okken撰写,全面深入
- 《Python测试开发实战》:结合实战案例,覆盖测试全流程
- 《Effective Python Testing》:测试最佳实践和模式总结
- Real Python的pytest教程:实践导向,适合初学者
- Test Automation University的pytest课程:免费优质资源
- Udemy的Python测试框架课程:项目驱动学习
在下一篇Python模块解析文章中,我们将深入探索requests模块,这是Python最流行的HTTP客户端库。你将学到:
- 至少3个实战应用场景,包括Web爬虫、API测试和微服务调用
通过学习requests模块,你将能够构建强大的Web客户端应用,处理复杂的HTTP场景,确保网络通信的可靠性和性能。
通过本文的学习,你已经掌握了pytest模块的核心概念和实践技巧。从简单的测试用例到复杂的企业级测试框架,pytest为你提供了完整的解决方案。记住,良好的测试是软件质量的基石,而pytest则是你构建这一基石的强大工具。开始使用pytest编写你的测试吧,让每一次代码提交都充满信心,让每一个产品发布都稳定可靠!