Redis作为高性能的键值数据库,在现代应用开发中扮演着重要角色。Python通过redis-py库提供了丰富的Redis操作接口,能够帮助开发者高效地实现数据缓存、会话管理、消息队列等多种功能。本文将介绍15个实用的自动化脚本,涵盖数据操作、性能优化和高级功能等多个典型场景。
环境准备
首先确保已安装redis-py库:
pip install redis
所有示例默认连接本地Redis服务器(localhost:6379,数据库0),实际使用时请根据生产环境调整连接参数。
1. 基础键值操作
场景:简单的数据存储与检索
import redisdef basic_key_operations(): r = redis.Redis(host='localhost', port=6379, db=0) # 设置带过期时间的键值 r.setex('session:user123', 3600, 'active') # 获取值 status = r.get('session:user123') print(f"Session status: {status.decode() if status else 'Not found'}") # 检查键是否存在 exists = r.exists('session:user123') print(f"Key exists: {exists}")if __name__ == "__main__": basic_key_operations()2. 批量操作键值
场景:高效处理大量数据
def batch_operations(): r = redis.Redis(host='localhost', port=6379, db=0) # 批量设置多个键值 data = { 'user:1:name': 'Alice', 'user:1:email': 'alice@example.com', 'user:1:age': '25' } r.mset(data) # 批量获取多个键值 keys = ['user:1:name', 'user:1:email', 'user:1:age'] values = r.mget(keys) for key, value in zip(keys, values): print(f"{key}: {value.decode() if value else 'None'}")3. 哈希表高级操作
场景:存储结构化对象
def advanced_hash_operations(): r = redis.Redis(host='localhost', port=6379, db=0) user_id = "user:1001" # 设置多个字段 user_data = { 'name': 'Bob Smith', 'email': 'bob@example.com', 'score': '95', 'last_login': '2025-12-25' } r.hset(user_id, mapping=user_data) # 获取所有字段 all_fields = r.hgetall(user_id) print("User data:") for field, value in all_fields.items(): print(f" {field.decode()}: {value.decode()}") # 增加数值字段 r.hincrby(user_id, 'score', 5) new_score = r.hget(user_id, 'score') print(f"Updated score: {new_score.decode()}")4. 列表队列操作
场景:实现任务队列
def queue_operations(): r = redis.Redis(host='localhost', port=6379, db=0) queue_key = 'task_queue' # 生产者:添加任务到队列 tasks = ['process_data', 'send_email', 'generate_report', 'cleanup'] for task in tasks: r.lpush(queue_key, task) print(f"Added task: {task}") # 消费者:处理任务 while True: task = r.brpop(queue_key, timeout=1) if task: task_name = task[1].decode() print(f"Processing task: {task_name}") # 模拟任务处理 import time time.sleep(0.5) print(f"Completed: {task_name}") else: print("No more tasks in queue") break5. 集合运算
场景:用户标签系统
def set_operations(): r = redis.Redis(host='localhost', port=6379, db=0) # 用户标签集合 r.sadd('user:101:tags', 'python', 'redis', 'backend') r.sadd('user:102:tags', 'python', 'frontend', 'javascript') r.sadd('user:103:tags', 'redis', 'database', 'backend') # 查找共同标签 common_tags = r.sinter('user:101:tags', 'user:103:tags') print("Common tags between user 101 and 103:") for tag in common_tags: print(f" - {tag.decode()}") # 合并所有标签 all_tags = r.sunion('user:101:tags', 'user:102:tags', 'user:103:tags') print("\nAll unique tags:") for tag in sorted(all_tags): print(f" - {tag.decode()}")6. 有序集合排行榜
场景:游戏得分排行榜
def leaderboard_system(): r = redis.Redis(host='localhost', port=6379, db=0) leaderboard_key = 'game:leaderboard' # 添加玩家得分 players = { 'player1': 1500, 'player2': 2200, 'player3': 1800, 'player4': 2500, 'player5': 1900 } r.zadd(leaderboard_key, players) # 获取前3名 top_players = r.zrevrange(leaderboard_key, 0, 2, withscores=True) print("Top 3 Players:") for rank, (player, score) in enumerate(top_players, 1): print(f"{rank}. {player.decode()}: {int(score)} points") # 获取玩家排名 player_rank = r.zrevrank(leaderboard_key, 'player3') player_score = r.zscore(leaderboard_key, 'player3') print(f"\nPlayer3 rank: {player_rank + 1 if player_rank is not None else 'N/A'}") print(f"Player3 score: {player_score}")7. 发布订阅模式
场景:实时通知系统
import threadingimport timedef pubsub_system(): r = redis.Redis(host='localhost', port=6379, db=0) def subscriber(channel_name): pubsub = r.pubsub() pubsub.subscribe(channel_name) print(f"Subscriber started on channel: {channel_name}") for message in pubsub.listen(): if message['type'] == 'message': data = message['data'].decode() print(f"Received: {data}") if data == 'exit': break # 启动订阅者线程 channel = 'notifications' sub_thread = threading.Thread(target=subscriber, args=(channel,)) sub_thread.daemon = True sub_thread.start() time.sleep(1) # 等待订阅者准备 # 发布消息 messages = [ "System started", "User logged in", "Data updated", "exit" ] for msg in messages: r.publish(channel, msg) time.sleep(0.5) sub_thread.join(timeout=2)8. 管道批量操作
场景:减少网络延迟
def pipeline_performance(): r = redis.Redis(host='localhost', port=6379, db=0) # 不使用管道 start = time.time() for i in range(100): r.set(f'temp:{i}', f'value:{i}') without_pipeline = time.time() - start # 清理数据 for i in range(100): r.delete(f'temp:{i}') # 使用管道 start = time.time() pipe = r.pipeline() for i in range(100): pipe.set(f'temp:{i}', f'value:{i}') pipe.execute() with_pipeline = time.time() - start print(f"Without pipeline: {without_pipeline:.3f} seconds") print(f"With pipeline: {with_pipeline:.3f} seconds") print(f"Performance improvement: {without_pipeline/with_pipeline:.1f}x")9. 事务保证原子性
场景:账户余额转账
def transaction_example(): r = redis.Redis(host='localhost', port=6379, db=0) # 初始化账户余额 r.set('account:A', '1000') r.set('account:B', '500') def transfer_funds(from_account, to_account, amount): pipe = r.pipeline(transaction=True) try: # 监视账户变化 pipe.watch(from_account, to_account) # 检查余额 from_balance = int(pipe.get(from_account) or 0) if from_balance < amount: print("Insufficient funds") return False # 开始事务 pipe.multi() pipe.decrby(from_account, amount) pipe.incrby(to_account, amount) pipe.execute() print(f"Transferred {amount} from {from_account} to {to_account}") return True except redis.WatchError: print("Transaction failed due to concurrent modification") return False finally: pipe.reset() # 执行转账 transfer_funds('account:A', 'account:B', 200) # 验证结果 print(f"Account A: {r.get('account:A').decode()}") print(f"Account B: {r.get('account:B').decode()}")10. Lua脚本执行
场景:复杂原子操作
def lua_scripting(): r = redis.Redis(host='localhost', port=6379, db=0) # Lua脚本:原子性地检查并设置值 lua_script = """ local key = KEYS local value = ARGV local ttl = ARGV local current = redis.call('GET', key) if current then return {false, current} else redis.call('SET', key, value) redis.call('EXPIRE', key, ttl) return {true, value} end """ # 注册脚本 script = r.register_script(lua_script) # 执行脚本 key = 'cache:item' result = script(keys=[key], args=['cached_data', '60']) if result[0]: print(f"Cache set successfully: {result[1]}") else: print(f"Cache already exists: {result[1]}")11. 键过期与TTL管理
场景:缓存失效管理
def expiration_management(): r = redis.Redis(host='localhost', port=6379, db=0) # 设置带过期时间的键 cache_key = 'api:response:user:123' r.setex(cache_key, 300, '{"name": "John", "status": "active"}') # 5分钟过期 # 检查剩余时间 ttl = r.ttl(cache_key) print(f"Cache will expire in {ttl} seconds") # 更新过期时间 if ttl < 60: # 如果剩余时间小于1分钟 r.expire(cache_key, 600) # 延长到10分钟 print("Cache expiration extended to 10 minutes") # 设置永久键 r.set('config:site_name', 'MyApp') r.persist('config:site_name') # 移除过期时间12. 扫描大键集合
场景:安全遍历大量键
def scan_keys(): r = redis.Redis(host='localhost', port=6379, db=0) # 使用SCAN代替KEYS(避免阻塞) pattern = 'user:*' cursor = '0' user_keys = [] while cursor != 0: cursor, keys = r.scan(cursor=cursor, match=pattern, count=100) user_keys.extend(keys) print(f"Found {len(user_keys)} user keys") # 批量获取用户信息 for i in range(0, len(user_keys), 10): batch = user_keys[i:i+10] values = r.mget(batch) for key, value in zip(batch, values): if value: print(f"{key.decode()}: {value.decode()[:50]}...")13. 位图操作
场景:用户在线状态追踪
def bitmap_operations(): r = redis.Redis(host='localhost', port=6379, db=0) # 记录用户每日登录状态 bitmap_key = 'online:2025-12-25' # 用户ID 1001-1010在25日登录 for user_id in range(1001, 1011): r.setbit(bitmap_key, user_id, 1) # 检查特定用户是否登录 user_1005_status = r.getbit(bitmap_key, 1005) print(f"User 1005 online: {'Yes' if user_1005_status else 'No'}") # 统计在线用户数 online_count = r.bitcount(bitmap_key) print(f"Total online users: {online_count}") # 查找第一个在线的用户 first_online = r.bitpos(bitmap_key, 1) print(f"First online user ID: {first_online}")14. 地理空间索引
场景:附近位置搜索
def geospatial_operations(): r = redis.Redis(host='localhost', port=6379, db=0) # 添加地理位置 locations_key = 'cities:locations' cities = { 'Beijing': (116.4074, 39.9042), 'Shanghai': (121.4737, 31.2304), 'Guangzhou': (113.2644, 23.1291), 'Shenzhen': (114.0579, 22.5431) } r.geoadd(locations_key, cities) # 查找附近的城市 center = (116.4074, 39.9042) # 北京坐标 nearby = r.georadius( locations_key, center[0], center[1], 500, # 500公里半径 unit='km', withdist=True, sort='ASC' ) print("Cities within 500km of Beijing:") for city, distance in nearby: print(f" {city.decode()}: {distance:.1f} km") # 计算两个城市距离 distance = r.geodist(locations_key, 'Beijing', 'Shanghai', unit='km') print(f"\nDistance Beijing-Shanghai: {distance} km")15. 连接池与连接管理
场景:生产环境连接优化
import redisfrom redis.connection import ConnectionPooldef connection_pool_management(): # 创建连接池 pool = ConnectionPool( host='localhost', port=6379, db=0, max_connections=10, socket_timeout=5, retry_on_timeout=True ) # 使用连接池 r = redis.Redis(connection_pool=pool) try: # 测试连接 r.ping() print("Redis connection successful") # 性能测试 import time start = time.time() for i in range(1000): r.set(f'test:{i}', f'data:{i}') elapsed = time.time() - start print(f"1000 operations completed in {elapsed:.2f} seconds") print(f"Operations per second: {1000/elapsed:.0f}") except redis.ConnectionError as e: print(f"Connection failed: {e}") finally: # 清理连接池 pool.disconnect() print("Connection pool cleaned up")这些脚本覆盖了Redis主要应用场景,从基础操作到高级功能,可以帮助开发者高效地实现各种数据存储和处理需求。在实际应用中,建议根据具体业务场景选择合适的操作方式,并结合性能监控和优化策略,可以显著提升应用性能和开发效率。
如果你觉得这篇文章有用,欢迎点赞、转发、收藏、留言、推荐❤!
------加入知识场与更多人一起学习------https://ima.qq.com/wiki/?shareId=f2628818f0874da17b71ffa0e5e8408114e7dbad46f1745bbd1cc1365277631c
https://ima.qq.com/wiki/?shareId=66042e013e5ccae8371b46359aa45b8714f435cc844ff0903e27a64e050b54b5