古法呈现,通过Netflow提取路由器信息,方便做设备的流量组成,以及按照流量比例去划分,可以非常直观的看到路由器上面跑了什么样的流量。
NetDevOps经典自动化协议[Bokeh饼状图, 呈现Netflow数据]
1.任务一: 安装bokeh模块并测试饼状图
安装依赖:
pip install bokeh pandas完整的Bokeh饼状图函数:
from bokeh.plotting import figure, output_file, savefrom bokeh.transform import cumsumfrom bokeh.palettes import Category10import pandas as pdfrom math import piimport osfrom pathlib import PathOUTPUTS_DIR = Path(__file__).resolve().parent / 'outputs'def bokeh_bing(name_list, count_list, bing_name, save_name=None):"""使用 Bokeh 绘制饼状图, 生成交互式 HTML 文件。"""# 构建数据字典, 转换为 DataFramedata_dict = dict(zip(name_list, [float(c) for c in count_list]))data = pd.Series(data_dict).reset_index(name='bytes').rename(columns={'index': 'application'})# 计算每个扇形的角度data['angle'] = data['bytes'] / data['bytes'].sum() * 2 * pi# 分配颜色num = len(data_dict)if num <= 2:data['color'] = Category10[3][:num]elif num <= 10:data['color'] = Category10[num]else:data['color'] = (Category10[10] * ((num // 10) + 1))[:num]# 计算百分比data['percentage'] = (data['bytes'] / data['bytes'].sum() * 100).round(2).astype(str) + '%'# 创建图表p = figure(height=500, width=700, title=bing_name, toolbar_location="right",tools="hover,pan,wheel_zoom,box_zoom,reset,save",tooltips="@application: @bytes (@percentage)", x_range=(-0.5, 1.0))# 绘制饼图p.wedge(x=0, y=1, radius=0.4,start_angle=cumsum('angle', include_zero=True), end_angle=cumsum('angle'),line_color="white", fill_color='color', legend_field='application', source=data)# 美化p.axis.axis_label = Nonep.axis.visible = Falsep.grid.grid_line_color = Nonep.title.text_font_size = '16pt'p.legend.label_text_font_size = '12pt'p.legend.location = "center_right"# 输出到 outputs 目录os.makedirs(OUTPUTS_DIR, exist_ok=True)output_filename = save_name if save_name else str(OUTPUTS_DIR / f"{bing_name}.html")output_file(output_filename, title=bing_name)save(p)print(f"[*] Bokeh 饼状图已生成: {output_filename}")测试代码:bokeh_bing(['名称1', '名称2', '名称3'], [1000, 123, 444], '测试饼图')
测试饼图效果 (在浏览器中打开生成的HTML文件):

2.任务二: SSH采集Netflow数据 → 正则提取 → Bokeh饼状图
组织文件目录:

第一步: 在路由器上配置Netflow
flow record test-record
match application name
collect counter bytes
!
flow monitor test-monitor
record test-record
!
interface GigabitEthernet1
ip flow monitor test-monitor input
第二步: 使用paramiko SSH登录思科路由器, 执行以下命令获取Netflow数据
show flow monitor name qytang-monitor cache format table
第三步: 用正则表达式从CLI回显中提取APP NAME和bytes

★ 使用 re.match 逐行匹配, 提取出两个列表:
app_name_list: 如 ['layer7 mdns', 'prot icmp', 'port ssh', ...]
app_bytes_list: 如 ['24464', '1425', '7085', ...]
APP NAME由前缀(port / layer7 / prot)加空格加应用名组成, bytes为行末的数字
第四步: 调用任务一的bokeh_bing函数生成饼状图
bokeh_bing(app_name_list, app_bytes_list, 'Netflow应用流量分布')
import reimport osimport sys# 添加tools目录到Python路径sys.path.append(os.path.join(os.path.dirname(__file__), 'tools'))from day3_ssh_single_cmd import ssh_runfrom day3_bokeh_bing import bokeh_bingdef get_netflow_app(host, username, password):"""SSH登录路由器, 采集Netflow数据, 正则提取, 绘制Bokeh饼状图。"""# 1. SSH执行命令获取Netflow数据show_result = ssh_run(host, username, password,'show flow monitor name qytang-monitor cache format table')if show_result is None:print("[!] 无法获取Netflow数据")returnprint("\n[*] Netflow数据采集结果:")print(show_result)# 2. ★ 正则提取APP NAME和bytesapp_name_list = []app_bytes_list = []# 定义正则表达式模式# APP NAME由前缀(port / layer7 / prot)加空格加应用名组成# bytes为行末的数字pattern = r'^(port|layer7|prot)\s+(.+?)\s+(\d+)$'for line in show_result.strip().split('\n'):line = line.strip()# 跳过标题行和分隔线if 'APP NAME' in line or '=' in line:continue# 使用正则表达式匹配match = re.match(pattern, line)if match:# 构建完整的APP NAMEapp_name = f"{match.group(1)}{match.group(2)}"app_bytes = match.group(3)app_name_list.append(app_name)app_bytes_list.append(app_bytes)# 3. 打印提取结果if app_name_list and app_bytes_list:print(f"\n[*] 提取到 {len(app_name_list)} 条 Netflow 记录")for name, byt in zip(app_name_list, app_bytes_list):print(f" {name:<25s}{byt} bytes")else:print("\n[!] 未提取到Netflow记录")return# 4. 调用bokeh_bing生成饼状图bokeh_bing(app_name_list, app_bytes_list, 'Netflow应用流量分布')if __name__ == "__main__":# 设置路由器信息router_host = '10.10.1.1' # 用户提供的路由器IProuter_username = 'XXX' # 用户提供的正确用户名router_password = 'XXX' # 用户提供的正确密码print("=== NetDevOps-任务2 ===")print(f"正在从路由器 {router_host} 采集Netflow数据...")get_netflow_app(router_host, router_username, router_password)
最终Netflow饼状图效果 (在浏览器中打开生成的HTML文件):
设备跑的协议越多,颜色块会越多,更直观:

#Python,#SRE,#CCIE,#NetDevops,#AIops,#Linux,#Trae,#cisco,#Rocky,#Netflow,#SNMP,#Syslog