// 引入必要模块const express = require('express');const { spawnSync } = require('child_process'); // 核心进程调用模块const { readFile, appendFile } = require('fs/promises');const { join } = require('path');const app = express();const port = 8000;// 定义接口,触发 Python 脚本调用app.get("/", async (req, res) => { try { // 1. 准备要传给 Python 的参数(比如大数据数组) const X = [1, 2, 5]; // 示例:大数组/业务数据 const y = [[1,2], [2,3], [1,2]]; // 2. 把参数写入 JSON 文件(避免命令行传参长度限制) const argsPath = join(__dirname, 'scripts/args.json'); await appendFile( argsPath, JSON.stringify({ X, y }), { encoding: 'utf-8', flag: 'w' } // 'w' 表示覆盖写入 ); // 3. 调用 Python 脚本(核心步骤) const pythonProcess = spawnSync('python3', [ // 参数1:Python 脚本路径 join(__dirname, 'scripts/python-script.py'), // 参数2:要执行的 Python 函数名 'first_function', // 参数3:参数文件路径 argsPath, // 参数4:结果输出文件路径 join(__dirname, 'scripts/results.json') ]); // 4. 处理 Python 脚本的输出/错误 const result = pythonProcess.stdout?.toString()?.trim(); const error = pythonProcess.stderr?.toString()?.trim(); if (result === 'OK') { // 读取 Python 生成的结果文件 const resultBuffer = await readFile(join(__dirname, 'scripts/results.json')); const resultData = JSON.parse(resultBuffer.toString()); res.send(`执行成功!结果:${JSON.stringify(resultData)}`); } else { console.error('Python 执行报错:', error); res.status(500).json({ status: 500, message: '脚本执行失败', error }); } } catch (err) { res.status(500).json({ status: 500, message: '服务器错误', error: err.message }); }});// 启动服务app.listen(port, () => { console.log(`服务启动成功:http://localhost:${port}`);});
import jsonimport sys# 定义要执行的业务函数def first_function(): # 1. 读取 Node 传入的参数文件(sys.argv[2] 对应第三个参数) with open(sys.argv[2], 'r') as f: data = json.load(f) # 2. 业务逻辑处理(示例:模拟大数据/AI 计算) X = data["X"] y = data["y"] # 这里可以替换成你的实际逻辑:比如 Scikit-learn 训练模型、Pandas 处理数据 calculated_results = [x + sum(y[i]) for i, x in enumerate(X)] # 3. 把结果写入文件,供 Node.js 读取 with open(sys.argv[3], 'w') as outfile: json.dump(calculated_results, outfile, indent=4) # 4. 输出标识,告诉 Node.js 执行成功 print("OK") sys.stdout.flush() # 强制刷新输出,避免 Node 读取不到# 根据传入的函数名执行对应逻辑(sys.argv[1] 对应第二个参数)if __name__ == "__main__": if sys.argv[1] == 'first_function': first_function()