import osimport numpy as npimport matplotlib.pyplot as pltfrom matplotlib.collections import LineCollectionfrom matplotlib.colors import Normalizefrom matplotlib.cm import ScalarMappablefrom matplotlib.lines import Line2Dfrom matplotlib.colors import Normalize, LinearSegmentedColormap# 0. 全局设置OUTDIR = "trajectory_streamline_output"os.makedirs(OUTDIR, exist_ok=True)plt.rcParams["font.family"] = "DejaVu Sans"plt.rcParams["axes.unicode_minus"] = FalseXLIM = (-4.0, 4.2)YLIM = (-3.2, 3.2)# 自定义配色cmap_basic_track = LinearSegmentedColormap.from_list( "basic_track", ["#1d4e89", "#2a9d8f", "#e9c46a", "#f4a261", "#e76f51"], N=256)cmap_adv_bg = LinearSegmentedColormap.from_list( "adv_bg", ["#f8fbff", "#d9ecff", "#a8d0f0", "#6ea8d9", "#3b82c4"], N=256)cmap_adv_track = LinearSegmentedColormap.from_list( "adv_track", ["#4c1d95", "#7c3aed", "#06b6d4", "#10b981", "#fde047"], N=256)# 1. 构造二维流场数据def velocity_field(x, y, t=0.0): """ 生成一个带有双涡旋 + 平移 + 波动扰动的二维速度场 支持标量或数组输入 """ x = np.asarray(x) y = np.asarray(y) t = np.asarray(t) # 涡旋1 x1, y1 = -1.6, 0.9 r1 = (x - x1) ** 2 + (y - y1) ** 2 g1 = 1.75 u1 = -g1 * (y - y1) * np.exp(-r1 / 2.3) v1 = g1 * (x - x1) * np.exp(-r1 / 2.3) # 涡旋2(反向) x2, y2 = 1.7, -0.75 r2 = (x - x2) ** 2 + (y - y2) ** 2 g2 = -1.35 u2 = -g2 * (y - y2) * np.exp(-r2 / 2.7) v2 = g2 * (x - x2) * np.exp(-r2 / 2.7) # 整体平移 + 轻微时变扰动 u_bg = 0.42 + 0.10 * np.cos(1.45 * y + 0.25 * t) v_bg = 0.16 * np.sin(1.10 * x - 0.30 * t) u = u1 + u2 + u_bg v = v1 + v2 + v_bg return u, vdef scalar_field(x, y): """ 生成一个用于高级图背景的标量场(浓度/温度/势函数风格) """ x = np.asarray(x) y = np.asarray(y) z = ( 0.75 + 1.60 * np.exp(-((x + 1.2) ** 2 + (y + 1.35) ** 2) / 1.7) + 1.10 * np.exp(-((x - 1.85) ** 2 + (y - 0.65) ** 2) / 2.0) + 0.35 * np.sin(1.20 * x - 0.85 * y) + 0.18 * np.cos(1.70 * y) ) return z# 2. 轨迹积分(RK4)def rk4_step(x, y, t, dt): u1, v1 = velocity_field(x, y, t) k1x, k1y = dt * u1, dt * v1 u2, v2 = velocity_field(x + 0.5 * k1x, y + 0.5 * k1y, t + 0.5 * dt) k2x, k2y = dt * u2, dt * v2 u3, v3 = velocity_field(x + 0.5 * k2x, y + 0.5 * k2y, t + 0.5 * dt) k3x, k3y = dt * u3, dt * v3 u4, v4 = velocity_field(x + k3x, y + k3y, t + dt) k4x, k4y = dt * u4, dt * v4 x_new = x + (k1x + 2 * k2x + 2 * k3x + k4x) / 6.0 y_new = y + (k1y + 2 * k2y + 2 * k3y + k4y) / 6.0 return x_new, y_newdef compute_curvature(x, y): """ 计算二维曲线离散曲率 """ x = np.asarray(x) y = np.asarray(y) dx = np.gradient(x) dy = np.gradient(y) ddx = np.gradient(dx) ddy = np.gradient(dy) denom = (dx * dx + dy * dy) ** 1.5 + 1e-12 kappa = np.abs(dx * ddy - dy * ddx) / denom return kappadef integrate_trajectory(x0, y0, dt=0.04, n_steps=320): """ 从初始点出发积分一条轨迹 """ xs = [x0] ys = [y0] ts = [0.0] x, y = x0, y0 for i in range(n_steps - 1): t = i * dt x_new, y_new = rk4_step(x, y, t, dt) if (x_new < XLIM[0]) or (x_new > XLIM[1]) or (y_new < YLIM[0]) or (y_new > YLIM[1]): break xs.append(x_new) ys.append(y_new) ts.append(t + dt) x, y = x_new, y_new xs = np.array(xs) ys = np.array(ys) ts = np.array(ts) u, v = velocity_field(xs, ys, ts) speed = np.hypot(u, v) conc = scalar_field(xs, ys) curvature = compute_curvature(xs, ys) return { "x": xs, "y": ys, "t": ts, "speed": speed, "conc": conc, "curvature": curvature }def generate_seeds(): """ 生成一组初始点 """ left_y = np.linspace(-2.5, 2.5, 11) left_x = np.full_like(left_y, -3.4) lower_x = np.linspace(-3.1, -1.4, 4) lower_y = np.full_like(lower_x, -2.7) upper_x = np.linspace(-3.0, -1.6, 3) upper_y = np.full_like(upper_x, 2.7) xs = np.concatenate([left_x, lower_x, upper_x]) ys = np.concatenate([left_y, lower_y, upper_y]) return list(zip(xs, ys))# 3. 画渐变轨迹def make_segments(x, y): pts = np.array([x, y]).T.reshape(-1, 1, 2) segs = np.concatenate([pts[:-1], pts[1:]], axis=1) return segsdef add_colored_trajectory(ax, x, y, values, cmap, norm, linewidths=2.0, alpha=1.0, zorder=3): segs = make_segments(x, y) lc = LineCollection(segs, cmap=cmap, norm=norm) lc.set_array(values[:-1]) if np.isscalar(linewidths): lc.set_linewidth(linewidths) else: lc.set_linewidth(linewidths[:-1]) lc.set_alpha(alpha) lc.set_capstyle("round") lc.set_joinstyle("round") lc.set_zorder(zorder) ax.add_collection(lc) return lc# 4. 基础版图def plot_basic(trajs, out_png): xg = np.linspace(XLIM[0], XLIM[1], 220) yg = np.linspace(YLIM[0], YLIM[1], 180) X, Y = np.meshgrid(xg, yg) U, V = velocity_field(X, Y, 0.0) all_speed = np.concatenate([tr["speed"] for tr in trajs]) speed_norm = Normalize( vmin=np.percentile(all_speed, 5), vmax=np.percentile(all_speed, 98) ) fig, ax = plt.subplots(figsize=(10.2, 7.8), dpi=220, facecolor="white") ax.set_facecolor("white") ax.streamplot( X, Y, U, V, color="#d7e3f0", density=1.35, linewidth=0.85, arrowsize=0.65, minlength=0.15, zorder=1 ) last_lc = None for tr in trajs: x = tr["x"] y = tr["y"] speed = tr["speed"] ax.plot(x, y, color="white", lw=3.2, alpha=0.95, zorder=2) last_lc = add_colored_trajectory( ax, x, y, speed, cmap=cmap_basic_track, norm=speed_norm, linewidths=2.2, alpha=0.98, zorder=3 ) ax.scatter(x[0], y[0], s=26, facecolor="white", edgecolor="black", linewidth=0.6, zorder=4) ax.scatter(x[-1], y[-1], s=24, facecolor="#334155", edgecolor="white", linewidth=0.5, zorder=4) ax.set_xlim(XLIM) ax.set_ylim(YLIM) ax.set_aspect("equal", adjustable="box") ax.set_xlabel("X", fontsize=12) ax.set_ylabel("Y", fontsize=12) ax.set_title("Basic Multivariable Trajectory Streamline Plot", fontsize=18, pad=14) ax.grid(alpha=0.18, linestyle="--") cbar = fig.colorbar(last_lc, ax=ax, pad=0.02, fraction=0.045) cbar.set_label("Speed along trajectory", fontsize=11) cbar.ax.tick_params(labelsize=9) handles = [ Line2D([0], [0], marker="o", color="none", markerfacecolor="white", markeredgecolor="black", markersize=6, label="Start point"), Line2D([0], [0], marker="o", color="none", markerfacecolor="#334155", markeredgecolor="white", markersize=6, label="End point") ] ax.legend(handles=handles, loc="upper left", fontsize=9, frameon=True) plt.tight_layout() plt.savefig(out_png, bbox_inches="tight", facecolor="white")# 5. 进阶版图def plot_advanced(trajs, out_png): xg = np.linspace(XLIM[0], XLIM[1], 260) yg = np.linspace(YLIM[0], YLIM[1], 220) X, Y = np.meshgrid(xg, yg) U, V = velocity_field(X, Y, 0.0) C = scalar_field(X, Y) all_speed = np.concatenate([tr["speed"] for tr in trajs]) all_curv = np.concatenate([tr["curvature"] for tr in trajs]) speed_norm = Normalize( vmin=np.percentile(all_speed, 5), vmax=np.percentile(all_speed, 98) ) curv_low = np.percentile(all_curv, 8) curv_high = np.percentile(all_curv, 95) def map_width(kappa): kk = np.clip((kappa - curv_low) / (curv_high - curv_low + 1e-12), 0, 1) return 1.1 + 3.6 * kk fig, ax = plt.subplots(figsize=(10.8, 8.0), dpi=240, facecolor="white") ax.set_facecolor("#f8fbff") cf = ax.contourf( X, Y, C, levels=20, cmap=cmap_adv_bg, alpha=0.96, zorder=0 ) ax.contour( X, Y, C, levels=10, colors="#64748b", linewidths=0.45, alpha=0.22, zorder=1 ) ax.streamplot( X, Y, U, V, color=(0.38, 0.49, 0.62, 0.22), density=1.15, linewidth=0.55, arrowsize=0.55, minlength=0.12, zorder=2 ) last_lc = None exposure = np.array([np.trapz(tr["conc"], tr["t"]) for tr in trajs]) top_ids = set(np.argsort(exposure)[-4:]) for i, tr in enumerate(trajs): x = tr["x"] y = tr["y"] speed = tr["speed"] curv = tr["curvature"] widths = map_width(curv) alpha = 0.84 if i in top_ids else 0.58 ax.plot(x, y, color=(1, 1, 1, 0.25), lw=np.max(widths) + 1.0, zorder=3) last_lc = add_colored_trajectory( ax, x, y, speed, cmap=cmap_adv_track, norm=speed_norm, linewidths=widths, alpha=alpha, zorder=4 ) ax.scatter(x[0], y[0], s=20, facecolor="white", edgecolor="black", linewidth=0.55, zorder=5) ax.scatter(x[-1], y[-1], s=22, facecolor="#0f172a", edgecolor="white", linewidth=0.55, zorder=5) ax.set_xlim(XLIM) ax.set_ylim(YLIM) ax.set_aspect("equal", adjustable="box") ax.set_xlabel("X", fontsize=12, color="black") ax.set_ylabel("Y", fontsize=12, color="black") ax.set_title("Advanced Multivariable Trajectory Streamline Plot", fontsize=18, color="black", pad=14) ax.tick_params(colors="black", labelsize=10) for spine in ax.spines.values(): spine.set_color((0, 0, 0, 0.18)) ax.grid(alpha=0.10, linestyle="--", color="gray") cbar = fig.colorbar(last_lc, ax=ax, pad=0.02, fraction=0.045) cbar.set_label("Speed along trajectory", fontsize=11, color="black") cbar.ax.tick_params(labelsize=9, colors="black") cbar.outline.set_edgecolor((0, 0, 0, 0.25)) handles = [ Line2D([0], [0], color="#7c3aed", lw=1.2, label="Low curvature"), Line2D([0], [0], color="#06b6d4", lw=2.6, label="Medium curvature"), Line2D([0], [0], color="#fde047", lw=4.2, label="High curvature"), Line2D([0], [0], marker="o", color="none", markerfacecolor="white", markeredgecolor="black", markersize=6, label="Start point"), Line2D([0], [0], marker="o", color="none", markerfacecolor="#0f172a", markeredgecolor="white", markersize=6, label="End point") ] leg = ax.legend( handles=handles, loc="upper left", fontsize=9, frameon=True, facecolor=(1, 1, 1, 0.88), edgecolor=(0, 0, 0, 0.15) ) ax.text( 0.02, 0.02, "Background: scalar concentration field\n" "Line color: speed\n" "Line width: curvature", transform=ax.transAxes, fontsize=9.5, color="black", ha="left", va="bottom", bbox=dict(boxstyle="round,pad=0.35", facecolor=(1, 1, 1, 0.82), edgecolor=(0, 0, 0, 0.15)) ) plt.tight_layout() plt.savefig(out_png, bbox_inches="tight", facecolor=fig.get_facecolor())def main(): seeds = generate_seeds() trajs = [integrate_trajectory(x0, y0) for x0, y0 in seeds] basic_png = os.path.join(OUTDIR, "trajectory_stream_basic.png") advanced_png = os.path.join(OUTDIR, "trajectory_stream_advanced.png") plot_basic(trajs, basic_png) plot_advanced(trajs, advanced_png)if __name__ == "__main__": main()