在日常C#开发中,你是不是经常遇到这样的场景:用户在搜索框疯狂输入,每次输入都触发一次API调用;或者多个异步操作同时进行,结果却乱序返回,界面显示的数据"驴唇不对马嘴"?更糟糕的是,你写了一堆嵌套回调、状态机、线程同步代码,最后自己都看不懂了。
使用传统异步模式处理这类场景,代码量往往会膨胀3-5倍。但如果用Rx.NET,同样的功能只需要几行优雅的代码就能搞定。
读完这篇文章,你将掌握:✅ Rx.NET的核心思想与适用场景✅ 3个立即可用的实战案例(从入门到进阶)✅ 规避常见陷阱的最佳实践
咱们直接上干货,用最简单的Console应用展示Rx.NET的魔力。
Rx.NET就是把异步数据源当作"可观察的集合"来处理,就像你用LINQ查询数据库一样自然。
传统的异步编程就像"被动接电话"——事件来了你得赶紧处理,代码分散在各个回调里。而Rx.NET则是"主动管理数据流"——你定义好规则,数据自动按你的要求流转。
✅ 适用场景:
❌ 不适用场景:
用户在搜索框输入时,每次按键都触发API调用,服务器压力山大,用户体验也差。传统做法需要手动管理Timer、清理旧请求,代码容易出错。
using System.Reactive.Linq;using System.Reactive.Subjects;namespaceAppRxNet{internalclassProgram {staticvoidMain(string[] args) { Console.OutputEncoding = System.Text.Encoding.UTF8; Console.WriteLine("🔍 模拟搜索框输入(输入'exit'退出)\n");// 创建一个Subject作为用户输入的数据源 var searchInput = new Subject<string>();// 核心逻辑:防抖 + 去重 + 过滤 var searchStream = searchInput .Throttle(TimeSpan.FromMilliseconds(500)) // 500ms内无新输入才触发 .DistinctUntilChanged() // 过滤连续重复值 .Where(term => !string.IsNullOrWhiteSpace(term) && term.Length >= 2);// 订阅处理结果 searchStream.Subscribe(term => { Console.WriteLine($"✅ 发起搜索请求: '{term}'");// 这里可以调用真实API });// 模拟用户输入 while (true) {var input = Console.ReadLine();if (input == "exit") break; searchInput.OnNext(input); } Console.WriteLine("👋 程序结束"); } }}
注意:这里看到是不是是事件订阅与发布。
你需要同时调用3个API获取数据,传统方式要么用Task.WhenAll(无法控制顺序),要么手写状态机(复杂且易错)。
using System.Reactive.Linq;using System.Reactive.Subjects;namespaceAppRxNet{internalclassProgram {staticvoidMain(string[] args) { Console.OutputEncoding = System.Text.Encoding.UTF8; Console.WriteLine("🌐 并发请求示例\n");// 模拟三个异步API调用 var api1 = SimulateApiCall("用户信息", 1000);var api2 = SimulateApiCall("订单列表", 1500);var api3 = SimulateApiCall("推荐商品", 800);// 方案1:等待所有结果(类似Task.WhenAll) Console.WriteLine("📦 方案1: 等待所有结果"); Observable.Zip(api1, api2, api3, (u, o, r) => new { User = u, Orders = o, Recommendations = r }) .Subscribe( result => Console.WriteLine($"✅ 全部完成: {result.User}, {result.Orders}, {result.Recommendations}"), error => Console.WriteLine($"❌ 错误: {error.Message}"), () => Console.WriteLine("🏁 所有请求完成\n") ); Console.ReadLine();// 方案2:谁快用谁(最快响应优先) Console.WriteLine("📦 方案2: 最快响应优先"); Observable.Amb(api1, api2, api3) .Subscribe( result => Console.WriteLine($"⚡ 最快返回: {result}"), () => Console.WriteLine("🏁 完成\n") ); Console.ReadLine();// 方案3:按顺序逐个处理(Concat) Console.WriteLine("📦 方案3: 按顺序执行"); Observable.Concat(api1, api2, api3) .Subscribe( result => Console.WriteLine($"📝 按序返回: {result}"), () => Console.WriteLine("🏁 全部完成") ); Console.ReadLine(); }// 模拟异步API调用 static IObservable<string> SimulateApiCall(string name, int delayMs) {return Observable.FromAsync(async () => {await Task.Delay(delayMs);return$"{name}(耗时{delayMs}ms)"; }); } }}
| Zip | ||
| Amb | ||
| Concat |
在实际项目中,我经常这样用:
注意:这个是不是有点Task的感觉!
用户快速切换搜索词时,旧的请求还没返回,新的请求又发出了。如果不处理,可能导致界面显示的是旧结果。
internalclassProgram{staticvoidMain() { Console.OutputEncoding = System.Text.Encoding.UTF8; Console.WriteLine("🔍 智能搜索(自动取消过期请求)\n"); Console.WriteLine("提示:快速输入多个词,观察只有最后一个请求返回结果\n");var searchInput = new Subject<string>();// 核心逻辑:Switch会自动取消未完成的旧请求 var intelligentSearch = searchInput .Throttle(TimeSpan.FromMilliseconds(300)) .DistinctUntilChanged() .Select(term => { Console.WriteLine($"🚀 发起请求: '{term}'");return SearchApi(term); // 返回Observable }) .Switch() // 🔥 关键:自动取消旧请求,只保留最新的 .Subscribe( result => Console.WriteLine($"✅ 收到结果: {result}"), error => Console.WriteLine($"❌ 错误: {error.Message}") );// 模拟用户快速输入 while (true) {var input = Console.ReadLine();if (input == "exit") break; searchInput.OnNext(input); } intelligentSearch.Dispose(); Console.WriteLine("👋 程序结束"); }// 模拟搜索API(随机延迟) static IObservable<string> SearchApi(string term) {return Observable.FromAsync(async () => {var delay = new Random().Next(500, 2000); Console.WriteLine($" ⏳ '{term}' 查询中... (预计{delay}ms)");await Task.Delay(delay);return$"'{term}' 的搜索结果"; }); }}注意:Switch确保只有最后一个请求的结果会被处理,这个应用比较神奇。
Switch的工作机制:
// 使用TakeUntil的替代方案 var searchStream = searchInput .Throttle(TimeSpan.FromMilliseconds(300)) .DistinctUntilChanged() .SelectMany(term => SearchApi(term).TakeUntil(searchInput) // 新输入到来时取消当前请求 ); Switch vs TakeUntil:
Switch:更简洁,语义清晰TakeUntil:更灵活,可以指定任意"取消信号"| Throttle | ||
| DistinctUntilChanged | ||
| Where | ||
| Select | ||
| SelectMany | ||
| Switch | ||
| Zip | ||
| Merge |
using (var subscription = observable.Subscribe(...)) { // 业务逻辑 } // 自动清理 observable .ObserveOn(SynchronizationContext.Current) // 切换到UI线程 .Subscribe(data => UpdateUI(data)); // 错误示范 observable.Subscribe(x => Console.WriteLine(x)); // 正确做法 observable.Subscribe( x => Console.WriteLine(x), ex => Console.WriteLine($"错误: {ex.Message}"), () => Console.WriteLine("完成") ); ✅ Rx.NET把异步当数据流:用LINQ的思维处理异步,代码简洁3-5倍✅ 组合操作符威力巨大:Throttle、Switch、Zip等解决90%的异步场景✅ 记得资源管理:Dispose订阅、处理OnError、注意线程上下文
问题1:你在项目中遇到过哪些"异步地狱"场景?用传统方式是怎么解决的?问题2:对比async/await,你觉得Rx.NET的优势和劣势分别是什么?
欢迎在评论区分享你的实战经验!如果这篇文章帮到你了,记得点赞+收藏,下次遇到异步问题时随时翻出来参考。
标签:#CSharp#响应式编程#Rx.NET#异步编程#性能优化
💡 一句话金句:
1. "异步编程不是回调地狱,而是优雅的数据流" 2. "Switch操作符:让过期请求自动消失的魔法" 3. "Rx.NET = LINQ + 时间维度 + 异步能力"