代码修改后难以验证所有功能是否正常:每次修改用户登录逻辑,都要手动测试登录、注册、密码重置等十多个相关功能,耗时费力且容易遗漏。多人协作时互相影响导致bug:团队成员各自开发新功能,合并代码后发现原有功能报错,排查半天才发现是其他模块被影响了。缺乏自动化测试导致回归测试效率低下:产品上线后每次都要手动回归测试上百个功能点,测试人员疲惫不堪,还容易出错。测试数据准备复杂难以复用:每个测试用例都要重新创建数据库连接、初始化数据,代码重复度高,维护困难。无法快速定位测试失败原因:测试失败后只显示简单的断言错误,没有详细的上下文信息,需要额外添加调试代码。unittest模块正是为解决这些痛点而生!它提供了完整的测试框架,支持测试用例组织、断言验证、测试夹具管理,以及丰富的扩展功能。通过本文,你将掌握unittest的核心原理、Python 3.14最新特性、实战应用场景和常见问题解决方案,打造高可靠性的代码测试体系。unittest是Python标准库中的单元测试框架,最初由Kent Beck和Erich Gamma为Smalltalk创建,后移植到Java(JUnit),再到Python。它在Python 2.1版本中首次引入,原名PyUnit,是Python最成熟、最广泛使用的测试框架之一。- 测试驱动开发(TDD):鼓励先写测试后写实现,确保代码可测试性
- 自动化测试:支持批量执行测试用例,生成详细测试报告
- 测试隔离:每个测试用例独立运行,互不干扰,保证测试可靠性
unittest模块包含多个核心组件,下面是其主要功能分类: | | | |
| | | unittest.TestCase, test_开头方法 |
| | | |
| | | assertEqual(), assertTrue(), assertRaises() |
| | | |
| | | TextTestRunner(), unittest.main() |
| | | Mock(), patch(), MagicMock() |
| | | @skip, @skipIf, @skipUnless |
不适合的场景:需要大量参数化测试或复杂夹具管理的场景,这类任务更适合使用pytest框架,但unittest可以通过第三方库(如parameterized)扩展功能。测试用例是unittest的最小测试单位,所有测试类必须继承unittest.TestCase。import unittestclassTestStringMethods(unittest.TestCase):"""测试字符串方法的示例"""deftest_upper(self):"""测试字符串转大写功能""" self.assertEqual('hello'.upper(), 'HELLO')deftest_isupper(self):"""测试字符串是否为大写""" self.assertTrue('HELLO'.isupper()) self.assertFalse('Hello'.isupper())deftest_split(self):"""测试字符串分割功能""" s = 'hello world' self.assertEqual(s.split(), ['hello', 'world'])# 验证分割符不是字符串时抛出TypeErrorwith self.assertRaises(TypeError): s.split(2)if __name__ == '__main__': unittest.main(verbosity=2) # verbosity=2显示详细信息
test_isupper (__main__.TestStringMethods.test_isupper) ... oktest_split (__main__.TestStringMethods.test_split) ... oktest_upper (__main__.TestStringMethods.test_upper) ... ok----------------------------------------------------------------------Ran 3 testsin 0.001sOK
测试夹具管理测试前后的准备和清理工作,确保测试环境的隔离性。import unittestimport tempfileimport osclassFileOperationTests(unittest.TestCase):"""演示测试夹具用法的示例"""defsetUp(self):"""每个测试方法执行前调用,准备测试环境"""# 创建临时文件 self.temp_file = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') self.temp_file.write("测试数据\n第二行\n第三行") self.temp_file.close() self.file_path = self.temp_file.namedeftearDown(self):"""每个测试方法执行后调用,清理测试环境"""# 删除临时文件if os.path.exists(self.file_path): os.unlink(self.file_path)deftest_file_exists(self):"""测试文件是否存在""" self.assertTrue(os.path.exists(self.file_path))deftest_file_content(self):"""测试文件内容"""with open(self.file_path, 'r', encoding='utf-8') as f: content = f.read() self.assertIn("测试数据", content) self.assertEqual(3, len(content.split('\n')) - 1) # 减去空行deftest_file_read_lines(self):"""测试按行读取文件"""with open(self.file_path, 'r', encoding='utf-8') as f: lines = f.readlines() self.assertEqual(3, len(lines)) self.assertEqual("测试数据\n", lines[0])# 类级别夹具示例classDatabaseTests(unittest.TestCase):"""演示类级别夹具的示例""" @classmethoddefsetUpClass(cls):"""整个测试类开始前调用一次""" print("建立数据库连接...")# 这里模拟数据库连接 cls.db_connection = "mock_db_connection" @classmethod deftearDownClass(cls):"""整个测试类结束后调用一次""" print("关闭数据库连接...") cls.db_connection = Nonedeftest_query_users(self):"""测试查询用户功能""" self.assertEqual(self.db_connection, "mock_db_connection")# 实际这里会执行数据库查询 result = ["user1", "user2", "user3"] self.assertEqual(3, len(result))deftest_insert_user(self):"""测试插入用户功能""" self.assertIsNotNone(self.db_connection)# 实际这里会执行数据库插入 inserted_id = 123 self.assertIsInstance(inserted_id, int)
unittest提供了数十种断言方法,覆盖各种验证场景。import unittestclassAssertionExamples(unittest.TestCase):"""演示各种断言方法的示例"""deftest_basic_assertions(self):"""基础断言方法"""# 相等性断言 self.assertEqual(10, 5 + 5) self.assertNotEqual(10, 5 + 6)# 真值断言 self.assertTrue(10 > 5) self.assertFalse(10 < 5)# 空值断言 self.assertIsNone(None) self.assertIsNotNone("非空值")# 身份断言(内存地址) a = [1, 2, 3] b = a c = [1, 2, 3] self.assertIs(a, b) # 同一对象 self.assertIsNot(a, c) # 不同对象deftest_container_assertions(self):"""容器相关断言""" numbers = [1, 2, 3, 4, 5]# 成员断言 self.assertIn(3, numbers) self.assertNotIn(10, numbers)# 容器比较 self.assertListEqual([1, 2, 3], [1, 2, 3]) self.assertDictEqual({'a': 1}, {'a': 1}) self.assertSetEqual({1, 2}, {2, 1})deftest_exception_assertions(self):"""异常相关断言"""# 验证函数抛出指定异常 self.assertRaises(ValueError, int, "非数字")# 使用上下文管理器验证异常with self.assertRaises(TypeError) as cm:"字符串" + 123# 可以访问异常对象进行额外验证 exception = cm.exception self.assertIsInstance(exception, TypeError)# 验证异常信息包含特定内容with self.assertRaisesRegex(ValueError, "invalid literal"): int("XYZ")deftest_floating_point_assertions(self):"""浮点数相关断言"""# 浮点数近似相等(考虑浮点误差) self.assertAlmostEqual(0.1 + 0.2, 0.3, places=7)# 浮点数不近似相等 self.assertNotAlmostEqual(0.1, 0.2, places=7)# 相对误差近似 self.assertAlmostEqual(1000.0, 1000.1, delta=0.2)
unittest.mock模块提供了模拟对象功能,用于隔离测试代码的依赖。import unittestfrom unittest.mock import Mock, MagicMock, patch, callimport requestsclassMockExamples(unittest.TestCase):"""演示Mock对象用法的示例"""deftest_basic_mock(self):"""基础Mock用法"""# 创建Mock对象 mock_obj = Mock()# 设置返回值 mock_obj.calculate.return_value = 42# 调用方法 result = mock_obj.calculate(10, 20)# 验证调用 self.assertEqual(42, result) mock_obj.calculate.assert_called_once_with(10, 20)deftest_mock_side_effect(self):"""使用side_effect控制Mock行为""" mock_obj = Mock()# side_effect可以控制每次调用的返回值 mock_obj.get_value.side_effect = [1, 2, 3] self.assertEqual(1, mock_obj.get_value()) self.assertEqual(2, mock_obj.get_value()) self.assertEqual(3, mock_obj.get_value())# side_effect也可以引发异常 mock_obj.raise_error.side_effect = ValueError("模拟错误")with self.assertRaises(ValueError): mock_obj.raise_error()deftest_patch_decorator(self):"""使用patch装饰器临时替换对象"""# 测试函数,调用requests.getdefget_user_data(user_id): response = requests.get(f"https://api.example.com/users/{user_id}")return response.json()# 使用patch替换requests.getwith patch('requests.get') as mock_get:# 配置Mock返回值 mock_response = Mock() mock_response.json.return_value = {"id": 123, "name": "测试用户"} mock_get.return_value = mock_response# 执行测试 result = get_user_data(123)# 验证结果 self.assertEqual("测试用户", result["name"]) mock_get.assert_called_once_with("https://api.example.com/users/123")deftest_magic_mock_features(self):"""MagicMock的特殊功能"""# MagicMock支持魔术方法 magic_mock = MagicMock()# 设置魔术方法的行为 magic_mock.__len__.return_value = 3 magic_mock.__getitem__.side_effect = lambda key: f"值_{key}" self.assertEqual(3, len(magic_mock)) self.assertEqual("值_name", magic_mock["name"])
unittest支持灵活控制测试执行,包括跳过测试和标记预期失败。import unittestimport sysimport platformclassSkipExamples(unittest.TestCase):"""演示测试跳过和预期失败的示例""" @unittest.skip("演示无条件跳过")deftest_skipped_demo(self):"""这个测试会被无条件跳过""" self.fail("这个测试不应该执行") @unittest.skipIf(sys.version_info < (3, 7), "需要Python 3.7+")deftest_skip_if_python_version(self):"""Python版本低于3.7时跳过"""# 这个测试只在Python 3.7+执行 self.assertTrue(hasattr(dict, "popitem")) @unittest.skipUnless(platform.system() == "Linux", "只在Linux系统运行")deftest_skip_unless_linux(self):"""只在Linux系统执行的测试"""import os self.assertEqual("/", os.path.abspath("/"))deftest_conditional_skip(self):"""条件性跳过(运行时决定)"""ifnot self.has_network(): self.skipTest("没有网络连接,跳过网络相关测试")# 实际测试逻辑 self.assertTrue(self.test_network_connection()) @unittest.expectedFailuredeftest_expected_failure(self):"""预期失败的测试"""# 这个测试当前会失败,但我们预期它失败 self.assertEqual(1, 2, "这个比较应该失败")defhas_network(self):"""检查网络连接(简化版)"""returnTrue# 实际应该检查网络deftest_network_connection(self):"""测试网络连接(简化版)"""returnTrue# 实际应该测试网络classSubTestExamples(unittest.TestCase):"""演示subTest用法的示例"""deftest_with_subtest(self):"""使用subTest进行参数化测试""" test_cases = [ (0, 0, 0), (1, 2, 3), (10, 20, 30), (-5, 5, 0), (100, -50, 50) ]for a, b, expected in test_cases:with self.subTest(a=a, b=b, expected=expected): result = a + b self.assertEqual(expected, result, f"{a} + {b} = {result},预期{expected}")
unittest支持自动发现测试用例,便于管理大型测试套件。import unittestimport os# 项目测试目录结构示例"""project/├── src/│ ├── calculator.py│ └── user_service.py└── tests/ ├── test_calculator.py ├── test_user_service.py └── integration/ └── test_integration.py"""# test_calculator.py 示例classCalculator:"""被测试的计算器类"""defadd(self, a, b):return a + bdefdivide(self, a, b):if b == 0:raise ValueError("除数不能为零")return a / bclassTestCalculator(unittest.TestCase):"""测试计算器类的示例"""defsetUp(self): self.calc = Calculator()deftest_add_positive_numbers(self): self.assertEqual(5, self.calc.add(2, 3))deftest_add_negative_numbers(self): self.assertEqual(-1, self.calc.add(2, -3))deftest_divide_normal(self): self.assertEqual(2.0, self.calc.divide(6, 3))deftest_divide_by_zero(self):with self.assertRaises(ValueError) as cm: self.calc.divide(6, 0) self.assertEqual("除数不能为零", str(cm.exception))# 手动组织测试套件defcreate_test_suite():"""手动创建测试套件的示例""" suite = unittest.TestSuite()# 添加单个测试方法 suite.addTest(TestCalculator('test_add_positive_numbers'))# 添加整个测试类 suite.addTest(unittest.makeSuite(TestCalculator))return suite# 测试发现命令行示例"""# 发现并运行所有测试python -m unittest discover# 指定测试目录和模式python -m unittest discover -s tests -p "test_*.py"# 运行特定测试类python -m unittest tests.test_calculator.TestCalculator# 运行单个测试方法python -m unittest tests.test_calculator.TestCalculator.test_add_positive_numbers# 输出详细结果python -m unittest discover -v# 生成XML报告(需要xmlrunner)python -m xmlrunner discover -o test-reports"""
可以自定义测试运行器来满足特定需求,如生成特定格式的报告。import unittestimport sysfrom io import StringIOclassCustomTestRunner(unittest.TextTestRunner):"""自定义测试运行器示例"""def__init__(self, *args, **kwargs):# 添加自定义输出流 kwargs.setdefault('stream', StringIO()) super().__init__(*args, **kwargs)defrun(self, test):"""重写run方法,添加自定义逻辑""" print("=== 开始执行测试 ===", file=sys.stderr)# 运行测试并获取结果 result = super().run(test)# 输出自定义统计信息 print(f"\n=== 测试统计 ===", file=sys.stderr) print(f"总测试数: {result.testsRun}", file=sys.stderr) print(f"通过数: {result.testsRun - len(result.failures) - len(result.errors)}", file=sys.stderr) print(f"失败数: {len(result.failures)}", file=sys.stderr) print(f"错误数: {len(result.errors)}", file=sys.stderr) print(f"跳过数: {len(result.skipped)}", file=sys.stderr)# 输出自定义报告if result.failures: print(f"\n=== 失败详情 ===", file=sys.stderr)for test, traceback in result.failures: print(f"测试: {test}", file=sys.stderr) print(f"原因: {traceback.splitlines()[-1]}", file=sys.stderr)return result# 使用自定义运行器if __name__ == '__main__':# 发现所有测试 loader = unittest.TestLoader() tests = loader.discover('.', pattern='test_*.py')# 使用自定义运行器 runner = CustomTestRunner(verbosity=2) runner.run(tests)
import unittestimport concurrent.futuresimport timeclassParallelTestExecution:"""并行执行测试的示例""" @staticmethoddefrun_test_class(test_class):"""运行单个测试类""" suite = unittest.TestLoader().loadTestsFromTestCase(test_class) runner = unittest.TextTestRunner(verbosity=0) result = runner.run(suite)return (test_class.__name__, result) @staticmethod defrun_tests_in_parallel(test_classes, max_workers=4):"""并行运行多个测试类""" results = {}with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:# 提交所有测试类 future_to_class = { executor.submit(ParallelTestExecution.run_test_class, test_class): test_classfor test_class in test_classes }# 收集结果for future in concurrent.futures.as_completed(future_to_class): test_class = future_to_class[future]try: class_name, result = future.result() results[class_name] = resultexcept Exception as e: print(f"{test_class.__name__} 执行出错: {e}")return results# 示例测试类classFastTest1(unittest.TestCase):deftest_one(self): time.sleep(0.1) self.assertTrue(True)deftest_two(self): time.sleep(0.1) self.assertEqual(2, 1+1)classFastTest2(unittest.TestCase):deftest_three(self): time.sleep(0.2) self.assertIn(3, [1, 2, 3])deftest_four(self): time.sleep(0.2) self.assertNotEqual(4, 5)# 并行执行示例if __name__ == '__main__': test_classes = [FastTest1, FastTest2] print("开始并行测试执行...") start_time = time.time() results = ParallelTestExecution.run_tests_in_parallel(test_classes) elapsed_time = time.time() - start_time print(f"并行执行完成,耗时: {elapsed_time:.2f}秒")# 输出汇总统计 total_tests = sum(len(result.failures) + len(result.errors) + (result.testsRun - len(result.failures) - len(result.errors))for result in results.values()) print(f"总测试数: {total_tests}")for class_name, result in results.items(): passed = result.testsRun - len(result.failures) - len(result.errors) print(f"{class_name}: 通过 {passed}/{result.testsRun}")
测试RESTful API接口的正确性、边界情况和异常处理。import unittestimport jsonfrom unittest.mock import patch, Mockfrom myapp.api import UserAPIclassTestUserAPI(unittest.TestCase):"""测试用户API接口的示例"""defsetUp(self): self.api = UserAPI() self.test_user = {"id": 123,"name": "测试用户","email": "test@example.com" }deftest_get_user_success(self):"""测试成功获取用户信息"""# 模拟数据库返回with patch.object(self.api.db, 'get_user', return_value=self.test_user): response = self.api.get_user(123)# 验证响应格式 self.assertEqual(200, response.status_code) data = json.loads(response.body) self.assertEqual(self.test_user["name"], data["name"]) self.assertEqual(self.test_user["email"], data["email"])deftest_get_user_not_found(self):"""测试用户不存在的情况"""with patch.object(self.api.db, 'get_user', return_value=None): response = self.api.get_user(999) self.assertEqual(404, response.status_code) data = json.loads(response.body) self.assertEqual("用户不存在", data["error"])deftest_create_user_success(self):"""测试成功创建用户""" new_user = {"name": "新用户","email": "new@example.com","password": "secure123" }# 模拟数据库插入返回IDwith patch.object(self.api.db, 'create_user', return_value=456): response = self.api.create_user(new_user) self.assertEqual(201, response.status_code) data = json.loads(response.body) self.assertEqual(456, data["id"]) self.assertEqual(new_user["name"], data["name"])deftest_create_user_validation_error(self):"""测试创建用户时的验证错误""" invalid_user = {"name": "", # 空用户名"email": "invalid-email","password": "123"# 密码太短 } response = self.api.create_user(invalid_user) self.assertEqual(400, response.status_code) data = json.loads(response.body) self.assertIn("errors", data) self.assertGreater(len(data["errors"]), 0)deftest_update_user_success(self):"""测试成功更新用户""" update_data = {"name": "更新后的名字","email": "updated@example.com" }# 模拟数据库更新with patch.object(self.api.db, 'update_user', return_value=True): response = self.api.update_user(123, update_data) self.assertEqual(200, response.status_code) data = json.loads(response.body) self.assertTrue(data["success"])deftest_delete_user_success(self):"""测试成功删除用户"""with patch.object(self.api.db, 'delete_user', return_value=True): response = self.api.delete_user(123) self.assertEqual(204, response.status_code) self.assertEqual("", response.body) # 204无内容classTestUserAPIEdgeCases(unittest.TestCase):"""测试API边界情况的示例"""deftest_get_user_with_special_characters(self):"""测试特殊字符的用户名""" api = UserAPI() test_user = {"id": 789,"name": "测试&用户@#测试","email": "special@example.com" }with patch.object(api.db, 'get_user', return_value=test_user): response = api.get_user(789) self.assertEqual(200, response.status_code) data = json.loads(response.body)# 验证特殊字符正确处理 self.assertEqual(test_user["name"], data["name"])deftest_create_user_duplicate_email(self):"""测试重复邮箱的情况""" api = UserAPI() duplicate_user = {"name": "重复用户","email": "duplicate@example.com","password": "password123" }# 模拟唯一约束冲突with patch.object(api.db, 'create_user', side_effect=Exception("唯一约束冲突")): response = api.create_user(duplicate_user) self.assertEqual(409, response.status_code) # 冲突状态码 data = json.loads(response.body) self.assertEqual("邮箱已存在", data["error"])deftest_concurrent_user_creation(self):"""测试并发创建用户(防止竞态条件)""" api = UserAPI()defcreate_user_thread(user_data):return api.create_user(user_data)# 模拟并发请求(简化版) user_data_list = [ {"name": f"用户{i}", "email": f"user{i}@example.com", "password": "pass123"}for i in range(5) ]# 在实际测试中,这里会使用多线程或asyncio# 这里简化,顺序执行但验证锁机制with patch.object(api.db, 'acquire_lock') as mock_lock: mock_lock.return_value = True responses = []for user_data in user_data_list: responses.append(api.create_user(user_data))# 验证锁被正确调用 self.assertEqual(len(user_data_list), mock_lock.call_count)deftest_user_api_performance(self):"""测试API性能"""import time api = UserAPI()# 模拟快速数据库查询with patch.object(api.db, 'get_user') as mock_get: mock_get.return_value = {"id": 111, "name": "性能测试"} start_time = time.time()for i in range(100): api.get_user(111) elapsed_time = time.time() - start_time# 验证性能要求:100次调用应在0.5秒内完成 self.assertLess(elapsed_time, 0.5) print(f"性能测试:100次调用耗时 {elapsed_time:.3f} 秒")
测试数据库CRUD操作的原子性、一致性和异常处理。import unittestimport sqlite3from contextlib import closingfrom datetime import datetimeclassDatabaseTestBase(unittest.TestCase):"""数据库测试基类,提供通用夹具""" @classmethoddefsetUpClass(cls):"""建立内存数据库连接""" cls.connection = sqlite3.connect(':memory:') cls.connection.row_factory = sqlite3.Row # 允许列名访问 @classmethoddeftearDownClass(cls):"""关闭数据库连接""" cls.connection.close()defsetUp(self):"""每个测试前重置数据库""" cursor = self.connection.cursor()# 创建用户表 cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, email TEXT UNIQUE NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''')# 创建订单表 cursor.execute(''' CREATE TABLE IF NOT EXISTS orders ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, amount REAL NOT NULL, status TEXT DEFAULT 'pending', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users (id) ) ''') self.connection.commit()deftearDown(self):"""每个测试后清理表""" cursor = self.connection.cursor() cursor.execute('DROP TABLE IF EXISTS orders') cursor.execute('DROP TABLE IF EXISTS users') self.connection.commit()classTestUserCRUD(DatabaseTestBase):"""测试用户CRUD操作"""deftest_create_user_success(self):"""测试成功创建用户""" cursor = self.connection.cursor()# 插入用户 cursor.execute('INSERT INTO users (username, email) VALUES (?, ?)', ('testuser', 'test@example.com') ) user_id = cursor.lastrowid self.assertIsNotNone(user_id) self.assertGreater(user_id, 0)# 验证插入成功 cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,)) user = cursor.fetchone() self.assertIsNotNone(user) self.assertEqual('testuser', user['username']) self.assertEqual('test@example.com', user['email']) self.assertIsNotNone(user['created_at'])deftest_create_user_duplicate_username(self):"""测试重复用户名""" cursor = self.connection.cursor()# 第一次插入成功 cursor.execute('INSERT INTO users (username, email) VALUES (?, ?)', ('duplicate', 'first@example.com') )# 第二次插入应该失败with self.assertRaises(sqlite3.IntegrityError): cursor.execute('INSERT INTO users (username, email) VALUES (?, ?)', ('duplicate', 'second@example.com') )# 验证只有一个用户 cursor.execute('SELECT COUNT(*) as count FROM users') count = cursor.fetchone()['count'] self.assertEqual(1, count)deftest_read_user_by_id(self):"""测试通过ID读取用户""" cursor = self.connection.cursor()# 先插入用户 cursor.execute('INSERT INTO users (username, email) VALUES (?, ?)', ('readuser', 'read@example.com') ) user_id = cursor.lastrowid# 读取用户 cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,)) user = cursor.fetchone() self.assertIsNotNone(user) self.assertEqual(user_id, user['id']) self.assertEqual('readuser', user['username']) self.assertEqual('read@example.com', user['email'])deftest_update_user_email(self):"""测试更新用户邮箱""" cursor = self.connection.cursor()# 插入用户 cursor.execute('INSERT INTO users (username, email) VALUES (?, ?)', ('updateuser', 'old@example.com') ) user_id = cursor.lastrowid# 更新邮箱 new_email = 'new@example.com' cursor.execute('UPDATE users SET email = ? WHERE id = ?', (new_email, user_id) ) self.assertEqual(1, cursor.rowcount) # 验证更新了一行# 验证更新成功 cursor.execute('SELECT email FROM users WHERE id = ?', (user_id,)) updated_user = cursor.fetchone() self.assertEqual(new_email, updated_user['email'])deftest_delete_user_success(self):"""测试删除用户""" cursor = self.connection.cursor()# 插入用户 cursor.execute('INSERT INTO users (username, email) VALUES (?, ?)', ('deleteuser', 'delete@example.com') ) user_id = cursor.lastrowid# 删除用户 cursor.execute('DELETE FROM users WHERE id = ?', (user_id,)) self.assertEqual(1, cursor.rowcount) # 验证删除了一行# 验证用户不存在 cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,)) deleted_user = cursor.fetchone() self.assertIsNone(deleted_user)classTestTransactionIsolation(DatabaseTestBase):"""测试事务隔离"""deftest_transaction_atomicity(self):"""测试事务原子性(要么全部成功,要么全部失败)""" cursor = self.connection.cursor()try:# 开始事务 self.connection.execute('BEGIN TRANSACTION')# 插入用户 cursor.execute('INSERT INTO users (username, email) VALUES (?, ?)', ('atomic1', 'atomic1@example.com') ) user1_id = cursor.lastrowid# 故意制造错误(重复用户名)with self.assertRaises(sqlite3.IntegrityError): cursor.execute('INSERT INTO users (username, email) VALUES (?, ?)', ('atomic1', 'atomic2@example.com') # 相同用户名 )# 事务应该回滚 self.connection.rollback()except Exception: self.connection.rollback()raise# 验证事务已回滚(没有用户插入) cursor.execute('SELECT COUNT(*) as count FROM users') count = cursor.fetchone()['count'] self.assertEqual(0, count) # 两个插入都应该失败deftest_transaction_consistency(self):"""测试事务一致性(保持业务规则)""" cursor = self.connection.cursor()# 插入用户 cursor.execute('INSERT INTO users (username, email) VALUES (?, ?)', ('consistency', 'consistency@example.com') ) user_id = cursor.lastrowid# 为这个用户插入订单 cursor.execute('INSERT INTO orders (user_id, amount) VALUES (?, ?)', (user_id, 100.0) )# 删除用户(应该失败,因为有外键约束)with self.assertRaises(sqlite3.IntegrityError): cursor.execute('DELETE FROM users WHERE id = ?', (user_id,))# 验证用户和订单都存在 cursor.execute('SELECT COUNT(*) as user_count FROM users WHERE id = ?', (user_id,)) user_count = cursor.fetchone()['user_count'] self.assertEqual(1, user_count) cursor.execute('SELECT COUNT(*) as order_count FROM orders WHERE user_id = ?', (user_id,)) order_count = cursor.fetchone()['order_count'] self.assertEqual(1, order_count)classTestDatabasePerformance(DatabaseTestBase):"""测试数据库性能"""deftest_bulk_insert_performance(self):"""测试批量插入性能"""import time cursor = self.connection.cursor()# 准备批量数据 users = []for i in range(1000): users.append((f'user{i}', f'user{i}@example.com')) start_time = time.time()# 使用executemany进行批量插入 cursor.executemany('INSERT INTO users (username, email) VALUES (?, ?)', users ) elapsed_time = time.time() - start_time# 验证插入成功 cursor.execute('SELECT COUNT(*) as count FROM users') count = cursor.fetchone()['count'] self.assertEqual(1000, count)# 验证性能要求:1000条插入应在0.5秒内完成 self.assertLess(elapsed_time, 0.5) print(f"批量插入性能:1000条记录耗时 {elapsed_time:.3f} 秒")deftest_query_performance_with_index(self):"""测试索引对查询性能的影响"""import time cursor = self.connection.cursor()# 插入测试数据for i in range(500): cursor.execute('INSERT INTO users (username, email) VALUES (?, ?)', (f'perfuser{i}', f'perfuser{i}@example.com') )# 无索引查询 start_time = time.time() cursor.execute('SELECT * FROM users WHERE username = ?', ('perfuser250',)) no_index_time = time.time() - start_time# 创建索引 cursor.execute('CREATE INDEX idx_users_username ON users(username)')# 有索引查询 start_time = time.time() cursor.execute('SELECT * FROM users WHERE username = ?', ('perfuser250',)) with_index_time = time.time() - start_time# 验证索引提高了性能 self.assertLess(with_index_time, no_index_time) print(f"无索引查询: {no_index_time:.6f}秒, 有索引查询: {with_index_time:.6f}秒")
测试包含多种业务规则、状态流转和异常处理的复杂业务逻辑。import unittestfrom datetime import datetime, timedeltafrom enum import Enumfrom unittest.mock import Mock, patchclassOrderStatus(Enum):"""订单状态枚举""" PENDING = "pending"# 待支付 PAID = "paid"# 已支付 PROCESSING = "processing"# 处理中 SHIPPED = "shipped"# 已发货 DELIVERED = "delivered"# 已送达 CANCELLED = "cancelled"# 已取消 REFUNDED = "refunded"# 已退款classOrder:"""订单类,包含复杂业务逻辑"""def__init__(self, order_id, user_id, amount, status=OrderStatus.PENDING): self.order_id = order_id self.user_id = user_id self.amount = amount self.status = status self.created_at = datetime.now() self.paid_at = None self.shipped_at = None self.delivered_at = None self.cancelled_at = None self.refunded_at = None self.payment_method = None self.shipping_address = Nonedefpay(self, payment_method, payment_time=None):"""支付订单"""if self.status != OrderStatus.PENDING:raise ValueError(f"订单当前状态为{self.status.value},无法支付")if payment_time isNone: payment_time = datetime.now()# 检查是否超过支付时限(30分钟)if payment_time - self.created_at > timedelta(minutes=30):raise ValueError("订单已超时,请重新下单") self.status = OrderStatus.PAID self.payment_method = payment_method self.paid_at = payment_timereturnTruedefcancel(self, reason, cancel_time=None):"""取消订单"""if cancel_time isNone: cancel_time = datetime.now()# 待支付状态可直接取消if self.status == OrderStatus.PENDING: self.status = OrderStatus.CANCELLED self.cancelled_at = cancel_timereturnTrue# 已支付但未发货可申请取消if self.status == OrderStatus.PAID:# 验证是否超过取消时限(支付后1小时内)if cancel_time - self.paid_at > timedelta(hours=1):raise ValueError("已超过可取消时间,请联系客服") self.status = OrderStatus.CANCELLED self.cancelled_at = cancel_timereturnTrue# 其他状态不可取消raise ValueError(f"订单当前状态为{self.status.value},无法取消")defship(self, shipping_address, ship_time=None):"""发货"""if self.status != OrderStatus.PAID:raise ValueError(f"订单当前状态为{self.status.value},无法发货")if ship_time isNone: ship_time = datetime.now() self.status = OrderStatus.SHIPPED self.shipping_address = shipping_address self.shipped_at = ship_timereturnTruedefdeliver(self, delivery_time=None):"""送达"""if self.status != OrderStatus.SHIPPED:raise ValueError(f"订单当前状态为{self.status.value},无法标记为送达")if delivery_time isNone: delivery_time = datetime.now()# 验证发货时间在送达时间之前if delivery_time < self.shipped_at:raise ValueError("送达时间不能早于发货时间") self.status = OrderStatus.DELIVERED self.delivered_at = delivery_timereturnTruedefrefund(self, reason, refund_time=None):"""退款"""if refund_time isNone: refund_time = datetime.now()# 仅已支付或已取消的订单可退款if self.status notin [OrderStatus.PAID, OrderStatus.CANCELLED]:raise ValueError(f"订单当前状态为{self.status.value},无法退款")# 验证退款时间合理性if self.status == OrderStatus.PAID and refund_time < self.paid_at:raise ValueError("退款时间不能早于支付时间")if self.status == OrderStatus.CANCELLED and refund_time < self.cancelled_at:raise ValueError("退款时间不能早于取消时间") self.status = OrderStatus.REFUNDED self.refunded_at = refund_timereturnTrueclassTestOrderBusinessLogic(unittest.TestCase):"""测试订单业务逻辑"""defsetUp(self):# 创建测试订单 self.order = Order( order_id=1001, user_id=5001, amount=299.99 )# 固定创建时间,便于测试超时逻辑 self.order.created_at = datetime(2024, 3, 27, 10, 0, 0)deftest_initial_order_state(self):"""测试订单初始状态""" self.assertEqual(OrderStatus.PENDING, self.order.status) self.assertIsNone(self.order.paid_at) self.assertIsNone(self.order.payment_method) self.assertEqual(1001, self.order.order_id) self.assertEqual(5001, self.order.user_id) self.assertEqual(299.99, self.order.amount)deftest_pay_order_success(self):"""测试成功支付订单""" payment_time = datetime(2024, 3, 27, 10, 15, 0) # 创建后15分钟 result = self.order.pay( payment_method="alipay", payment_time=payment_time ) self.assertTrue(result) self.assertEqual(OrderStatus.PAID, self.order.status) self.assertEqual(payment_time, self.order.paid_at) self.assertEqual("alipay", self.order.payment_method)deftest_pay_order_timeout(self):"""测试订单支付超时""" payment_time = datetime(2024, 3, 27, 10, 45, 0) # 创建后45分钟,超时with self.assertRaises(ValueError) as cm: self.order.pay( payment_method="alipay", payment_time=payment_time ) self.assertIn("订单已超时", str(cm.exception)) self.assertEqual(OrderStatus.PENDING, self.order.status) # 状态未改变deftest_pay_order_wrong_status(self):"""测试错误状态下支付订单"""# 先将订单状态设为已支付 self.order.status = OrderStatus.PAIDwith self.assertRaises(ValueError) as cm: self.order.pay( payment_method="alipay", payment_time=datetime(2024, 3, 27, 10, 15, 0) ) self.assertIn("无法支付", str(cm.exception))deftest_cancel_pending_order_success(self):"""测试取消待支付订单""" cancel_time = datetime(2024, 3, 27, 10, 10, 0) result = self.order.cancel( reason="不需要了", cancel_time=cancel_time ) self.assertTrue(result) self.assertEqual(OrderStatus.CANCELLED, self.order.status) self.assertEqual(cancel_time, self.order.cancelled_at)deftest_cancel_paid_order_success(self):"""测试取消已支付订单(在规定时间内)"""# 先支付订单 self.order.pay( payment_method="wechat", payment_time=datetime(2024, 3, 27, 10, 5, 0) )# 在规定时间内取消(支付后30分钟) cancel_time = datetime(2024, 3, 27, 10, 25, 0) result = self.order.cancel( reason="改变主意", cancel_time=cancel_time ) self.assertTrue(result) self.assertEqual(OrderStatus.CANCELLED, self.order.status) self.assertEqual(cancel_time, self.order.cancelled_at)deftest_cancel_paid_order_timeout(self):"""测试取消已支付订单超时"""# 先支付订单 self.order.pay( payment_method="wechat", payment_time=datetime(2024, 3, 27, 10, 5, 0) )# 超过取消时限(支付后2小时) cancel_time = datetime(2024, 3, 27, 12, 5, 0)with self.assertRaises(ValueError) as cm: self.order.cancel( reason="太晚了", cancel_time=cancel_time ) self.assertIn("已超过可取消时间", str(cm.exception)) self.assertEqual(OrderStatus.PAID, self.order.status) # 状态未改变deftest_cancel_processing_order_failure(self):"""测试取消处理中订单失败"""# 先支付订单 self.order.pay( payment_method="alipay", payment_time=datetime(2024, 3, 27, 10, 5, 0) )# 发货(进入处理中状态) self.order.ship( shipping_address="北京市朝阳区", ship_time=datetime(2024, 3, 27, 10, 20, 0) )with self.assertRaises(ValueError) as cm: self.order.cancel( reason="不想等了", cancel_time=datetime(2024, 3, 27, 10, 25, 0) ) self.assertIn("无法取消", str(cm.exception)) self.assertEqual(OrderStatus.SHIPPED, self.order.status) # 状态未改变classTestOrderIntegration(unittest.TestCase):"""测试订单与其他系统的集成"""defsetUp(self): self.order = Order( order_id=2001, user_id=6001, amount=150.0 )# 模拟外部服务 self.mock_payment_service = Mock() self.mock_inventory_service = Mock() self.mock_shipping_service = Mock()deftest_order_with_payment_service_integration(self):"""测试订单与支付服务集成"""# 配置Mock self.mock_payment_service.process_payment.return_value = {"success": True,"transaction_id": "txn_123456","payment_time": datetime(2024, 3, 27, 10, 15, 0) }# 模拟支付处理 payment_time = datetime(2024, 3, 27, 10, 15, 0)with patch('myapp.services.payment_service', self.mock_payment_service):# 这里假设Order类使用支付服务 result = self.order.pay( payment_method="credit_card", payment_time=payment_time ) self.assertTrue(result)# 验证支付服务被调用 self.mock_payment_service.process_payment.assert_called_once()deftest_order_with_inventory_service_integration(self):"""测试订单与库存服务集成"""# 配置Mock库存检查 self.mock_inventory_service.check_availability.return_value = True self.mock_inventory_service.reserve_items.return_value = "reserve_789"# 模拟库存检查with patch('myapp.services.inventory_service', self.mock_inventory_service):# 这里假设Order类在支付前检查库存 has_stock = self.mock_inventory_service.check_availability( product_id="prod_001", quantity=1 ) self.assertTrue(has_stock) self.mock_inventory_service.check_availability.assert_called_once_with( product_id="prod_001", quantity=1 )deftest_order_with_shipping_service_integration(self):"""测试订单与物流服务集成"""# 配置Mock物流服务 self.mock_shipping_service.create_shipment.return_value = {"tracking_number": "TRK123456789","estimated_delivery": datetime(2024, 3, 30, 14, 0, 0) }# 模拟发货with patch('myapp.services.shipping_service', self.mock_shipping_service):# 这里假设Order类使用物流服务发货 shipment_info = self.mock_shipping_service.create_shipment( order_id=2001, address="上海市浦东新区", weight=2.5 ) self.assertIsNotNone(shipment_info) self.assertEqual("TRK123456789", shipment_info["tracking_number"]) self.mock_shipping_service.create_shipment.assert_called_once_with( order_id=2001, address="上海市浦东新区", weight=2.5 )classTestOrderEdgeCases(unittest.TestCase):"""测试订单边界情况"""deftest_order_with_zero_amount(self):"""测试零金额订单""" order = Order( order_id=3001, user_id=7001, amount=0.0 )# 零金额订单可直接完成 result = order.pay( payment_method="free", payment_time=datetime(2024, 3, 27, 10, 5, 0) ) self.assertTrue(result) self.assertEqual(OrderStatus.PAID, order.status)deftest_order_with_negative_amount(self):"""测试负金额订单(异常情况)"""# 在实际业务中,金额应该是正数# 这里测试异常处理 order = Order( order_id=3002, user_id=7002, amount=-50.0 )# 负金额订单应该被拒绝# 这里假设系统有验证逻辑 self.assertLess(order.amount, 0)deftest_order_large_amount(self):"""测试大金额订单""" order = Order( order_id=3003, user_id=7003, amount=999999.99 )# 大金额订单可能需要额外验证# 这里测试正常支付流程 result = order.pay( payment_method="bank_transfer", payment_time=datetime(2024, 3, 27, 10, 10, 0) ) self.assertTrue(result) self.assertEqual(OrderStatus.PAID, order.status)deftest_order_status_transition_validation(self):"""测试订单状态流转验证""" order = Order( order_id=3004, user_id=7004, amount=200.0 )# 记录合法状态流转 legal_transitions = { OrderStatus.PENDING: [OrderStatus.PAID, OrderStatus.CANCELLED], OrderStatus.PAID: [OrderStatus.PROCESSING, OrderStatus.REFUNDED, OrderStatus.CANCELLED], OrderStatus.PROCESSING: [OrderStatus.SHIPPED], OrderStatus.SHIPPED: [OrderStatus.DELIVERED, OrderStatus.REFUNDED], OrderStatus.DELIVERED: [OrderStatus.REFUNDED], OrderStatus.CANCELLED: [OrderStatus.REFUNDED], OrderStatus.REFUNDED: [] # 最终状态 }# 测试一系列合法流转 test_transitions = [ (OrderStatus.PENDING, OrderStatus.PAID), # 待支付 -> 已支付 (OrderStatus.PAID, OrderStatus.CANCELLED), # 已支付 -> 已取消 (OrderStatus.CANCELLED, OrderStatus.REFUNDED), # 已取消 -> 已退款 ]for from_status, to_status in test_transitions: order.status = from_status# 验证是合法流转 self.assertIn(to_status, legal_transitions[from_status])
现象:测试代码依赖外部服务(数据库、API、文件系统),导致测试不稳定、执行慢。import unittestfrom unittest.mock import Mock, patchimport osclassTestWithDependencies(unittest.TestCase):"""解决测试依赖问题的示例"""deftest_with_mock_database(self):"""使用Mock模拟数据库依赖"""# 创建Mock数据库连接 mock_db = Mock() mock_db.query.return_value = [ {"id": 1, "name": "用户1"}, {"id": 2, "name": "用户2"} ]# 替换实际数据库连接with patch('myapp.database.get_connection', return_value=mock_db):from myapp.services import UserService service = UserService() users = service.get_all_users()# 验证结果 self.assertEqual(2, len(users)) mock_db.query.assert_called_once_with("SELECT * FROM users")deftest_with_temp_file_system(self):"""使用临时文件系统避免真实文件操作"""import tempfile# 创建临时目录with tempfile.TemporaryDirectory() as temp_dir: test_file = os.path.join(temp_dir, "test.txt")# 写入测试数据with open(test_file, 'w', encoding='utf-8') as f: f.write("测试数据")# 测试文件操作with open(test_file, 'r', encoding='utf-8') as f: content = f.read() self.assertEqual("测试数据", content)# 测试完成后,临时目录会自动删除deftest_with_mock_external_api(self):"""使用Mock模拟外部API调用""" mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = {"success": True, "data": "API返回数据"}with patch('requests.get', return_value=mock_response):import requests response = requests.get("https://api.example.com/data") data = response.json() self.assertTrue(data["success"]) self.assertEqual("API返回数据", data["data"])
现象:每个测试用例都需要复杂的测试数据准备,代码重复度高。解决方案:使用工厂模式创建测试数据,通过setUp方法复用。import unittestfrom datetime import datetimefrom dataclasses import dataclass@dataclassclassUser:"""用户数据类""" id: int username: str email: str created_at: datetimeclassUserFactory:"""用户工厂类,用于创建测试用户""" @staticmethoddefcreate_user(user_id=None, **kwargs):"""创建用户,支持自定义属性""" defaults = {"id": user_id or1,"username": f"testuser{user_id or1}","email": f"test{user_id or1}@example.com","created_at": datetime.now() } defaults.update(kwargs)return User(**defaults) @staticmethoddefcreate_users(count=5):"""批量创建用户"""return [UserFactory.create_user(i+1) for i in range(count)]classTestWithFactoryPattern(unittest.TestCase):"""使用工厂模式准备测试数据的示例"""defsetUp(self):"""准备测试数据""" self.user_factory = UserFactory() self.test_user = self.user_factory.create_user(1001, username="特定用户") self.test_users = self.user_factory.create_users(3)deftest_user_creation(self):"""测试用户创建""" self.assertEqual(1001, self.test_user.id) self.assertEqual("特定用户", self.test_user.username) self.assertIn("@example.com", self.test_user.email)deftest_batch_user_creation(self):"""测试批量用户创建""" self.assertEqual(3, len(self.test_users))# 验证每个用户的唯一性 user_ids = [user.id for user in self.test_users] self.assertEqual([1, 2, 3], user_ids)# 验证用户名格式for i, user in enumerate(self.test_users, 1): self.assertEqual(f"testuser{i}", user.username)deftest_custom_user_attributes(self):"""测试自定义用户属性""" custom_user = self.user_factory.create_user(9999, username="custom", email="custom@test.com", created_at=datetime(2024, 1, 1, 12, 0, 0) ) self.assertEqual("custom", custom_user.username) self.assertEqual("custom@test.com", custom_user.email) self.assertEqual(2024, custom_user.created_at.year)
解决方案:优化测试设计,使用并行执行,避免不必要的重复。import unittestimport timeimport concurrent.futuresclassTestPerformanceOptimization(unittest.TestCase):"""测试性能优化的示例"""defsetUp(self):# 避免在每个测试中重复创建昂贵资源ifnot hasattr(self, 'shared_resource'): self.shared_resource = self.create_expensive_resource()defcreate_expensive_resource(self):"""模拟创建昂贵资源(如数据库连接)""" time.sleep(0.1) # 模拟耗时操作return {"connection": "mock_db_connection", "initialized": True}deftest_use_shared_resource(self):"""使用共享资源,避免重复创建""" self.assertTrue(self.shared_resource["initialized"])# 快速验证逻辑 self.assertEqual("mock_db_connection", self.shared_resource["connection"])deftest_avoid_expensive_io(self):"""避免昂贵的IO操作"""# 使用Mock替代真实API调用 mock_api = Mock() mock_api.get_data.return_value = {"id": 1, "name": "测试数据"} result = mock_api.get_data() self.assertEqual(1, result["id"])# 验证没有实际IO发生 mock_api.get_data.assert_called_once()deftest_parallel_test_execution(self):"""并行执行独立测试""" test_functions = [ self._test_operation1, self._test_operation2, self._test_operation3 ] start_time = time.time()# 使用线程池并行执行with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: futures = [executor.submit(func) for func in test_functions]# 等待所有测试完成for future in concurrent.futures.as_completed(futures):try: future.result() # 获取结果,如有异常会抛出except Exception as e: self.fail(f"测试失败: {e}") elapsed_time = time.time() - start_time# 并行执行应该比顺序执行快 print(f"并行执行耗时: {elapsed_time:.3f}秒") self.assertLess(elapsed_time, 0.3) # 三个0.1秒的任务应该在0.3秒内完成def_test_operation1(self):"""模拟耗时操作1""" time.sleep(0.1) self.assertTrue(True)def_test_operation2(self):"""模拟耗时操作2""" time.sleep(0.1) self.assertEqual(2, 1+1)def_test_operation3(self):"""模拟耗时操作3""" time.sleep(0.1) self.assertIn(3, [1, 2, 3])
现象:测试没有覆盖关键业务逻辑、边界情况和异常场景。import unittestfrom coverage import CoverageclassTestCoverageAnalysis(unittest.TestCase):"""测试覆盖率分析的示例"""deftest_complete_coverage_example(self):"""展示全面测试覆盖的示例"""# 被测试的函数defcomplex_function(value):"""包含多种逻辑的复杂函数"""if value isNone:raise ValueError("值不能为None")if value < 0:return"负数"elif value == 0:return"零"elif0 < value < 10:return"小正数"elif10 <= value <= 100:return"中等正数"else:return"大正数"# 测试用例1: None值异常with self.assertRaises(ValueError) as cm: complex_function(None) self.assertEqual("值不能为None", str(cm.exception))# 测试用例2: 负数 self.assertEqual("负数", complex_function(-5))# 测试用例3: 零 self.assertEqual("零", complex_function(0))# 测试用例4: 小正数 self.assertEqual("小正数", complex_function(5))# 测试用例5: 中等正数 self.assertEqual("中等正数", complex_function(50))# 测试用例6: 大正数 self.assertEqual("大正数", complex_function(200))# 测试用例7: 边界值(刚好小于10) self.assertEqual("小正数", complex_function(9))# 测试用例8: 边界值(刚好10) self.assertEqual("中等正数", complex_function(10))# 测试用例9: 边界值(刚好100) self.assertEqual("中等正数", complex_function(100))# 测试用例10: 边界值(刚好大于100) self.assertEqual("大正数", complex_function(101))defrun_with_coverage(self, test_module):"""运行测试并生成覆盖率报告"""# 启动覆盖率收集 cov = Coverage() cov.start()# 运行测试 suite = unittest.TestLoader().loadTestsFromModule(test_module) runner = unittest.TextTestRunner(verbosity=2) runner.run(suite)# 停止收集并生成报告 cov.stop() cov.save()# 输出报告 print("\n=== 覆盖率报告 ===") cov.report()# 生成HTML报告 cov.html_report(directory='coverage_report')return cov# 使用示例if __name__ == '__main__':import sys# 指定要测试的模块 test_module = sys.modules[__name__]# 运行测试并生成覆盖率报告 TestCoverageAnalysis().run_with_coverage(test_module)
现象:随着业务逻辑变化,测试代码难以维护,经常需要大量修改。解决方案:遵循测试设计原则,使用清晰的结构和命名。import unittestclassTestMaintenanceBestPractices(unittest.TestCase):"""测试维护最佳实践的示例"""deftest_principles_for_maintainable_tests(self):"""可维护测试的设计原则"""# 原则1: 每个测试只验证一件事deftest_only_one_thing(self):"""不好的做法:验证多个不相关的事情""" result = some_function() self.assertEqual(result.status, "success") # 验证状态 self.assertEqual(len(result.data), 5) # 验证数据长度 self.assertIn("key", result.data[0]) # 验证数据结构# 应该拆分成多个测试方法# 原则2: 使用清晰的测试命名deftest_calculate_tax_for_high_income(self):"""好的命名:清楚说明测试什么"""pass# 实现省略deftest_xyz(self):"""不好的命名:不清楚测试什么"""pass# 实现省略# 原则3: 避免测试实现细节deftest_internal_implementation(self):"""不好的做法:测试内部实现""" obj = SomeClass()# 直接访问私有属性或调用私有方法# 应该测试公共接口的行为# 原则4: 使用明确的断言消息deftest_with_descriptive_messages(self):"""好的做法:提供清晰的失败消息""" result = calculate_discount(100, 20) self.assertEqual(result, 80, f"折扣计算错误:100打8折应该是80,实际得到{result}")# 原则5: 保持测试独立deftest_independent_of_other_tests(self):"""好的做法:不依赖其他测试的执行顺序或结果"""# 使用setUp准备独立环境# 不依赖全局状态deftest_refactoring_example(self):"""测试重构示例:从难以维护到易于维护"""# 重构前:难以维护的测试deftest_before_refactoring(self):"""包含多个验证,结构混乱"""# 准备测试数据 data = [1, 2, 3, 4, 5] config = {"threshold": 3, "multiplier": 2}# 执行多个操作 filtered = [x for x in data if x > config["threshold"]] transformed = [x * config["multiplier"] for x in filtered] result = sum(transformed)# 多个断言混合 self.assertEqual(len(filtered), 2) self.assertEqual(transformed, [8, 10]) self.assertEqual(result, 18)# 重构后:易于维护的测试deftest_after_refactoring_simple(self):"""专注于单一功能验证""" result = process_data([1, 2, 3, 4, 5]) self.assertEqual(result, 18)# 如果有必要,可以添加更多专门的测试deftest_filtering_logic(self):"""专门测试过滤逻辑""" filtered = filter_data([1, 2, 3, 4, 5], threshold=3) self.assertEqual(filtered, [4, 5])deftest_transformation_logic(self):"""专门测试转换逻辑""" transformed = transform_data([4, 5], multiplier=2) self.assertEqual(transformed, [8, 10])deftest_aggregation_logic(self):"""专门测试聚合逻辑""" result = aggregate_data([8, 10]) self.assertEqual(result, 18)
在深入学习了unittest模块的各项功能后,让我们系统回顾核心知识点,规划后续学习路径,并为下一篇内容做好准备。本节将帮助你巩固所学知识,并提供实用的学习指导。通过本文的学习,你应该掌握了unittest模块的核心要点:- 所有测试类必须继承unittest.TestCase
- tearDown():每个测试方法执行后的清理工作
- setUpClass()和tearDownClass():类级别的初始化和清理
6.2 Python 3.14 unittest最新特性在Python 3.14中,unittest模块迎来了重要更新:
想要深入掌握Python测试,建议按照以下路径学习:- 掌握unittest基础知识:测试用例、夹具、断言
- 测试策略与团队管理:学习如何制定测试策略和领导测试团队
- 《Python测试驱动开发》(Harry Percival著)
- 《Python测试之道》(David Sale著)
- Coursera:Python Testing and Automation
- Udemy:Complete Python Testing Masterclass
下一篇我们将深入探索pytest模块,这是目前Python社区最流行的测试框架。你将学习到:- 如何将现有的unittest测试迁移到pytest
通过对比学习,你将掌握如何根据项目需求选择合适的测试框架,并构建更高效、更可靠的测试体系。
行动指南:立即尝试为你的项目编写第一个unittest测试用例!从最简单的函数开始,逐步扩展到复杂模块。记住,好的测试是代码质量的基石,也是团队协作的保障。如果在学习或实践中遇到问题,欢迎在评论区交流讨论。我们将在后续文章中解答常见问题,并分享更多实战经验。关注我们:每天认识一个Python模块,持续提升Python技能。下一篇文章,我们不见不散!