可转债网格交易Python实战:手把手教你写出自动化交易代码
最近后台收到很多读者留言,问可转债网格交易的具体实现方法。今天我们就用Python写一个完整的网格交易策略,包含参数设置、自动下单、风险控制等核心功能,代码可直接运行(需配合券商接口)。
一、网格交易原理:像渔网一样捕捉波动
网格交易的核心是“低买高卖”,通过在价格波动区间内设置固定间距的买卖档位,自动执行交易。例如:设定某转债价格区间为100-120元,每跌2元买入1手,每涨2元卖出1手。当价格在100-120元之间波动时,反复赚取差价;若价格突破区间,则停止交易或调整策略。可转债之所以适合做网格交易,主要有两点。波动适中:多数转债日内波动1%-3%,符合网格间距要求;债底保护:即使短期被套,到期还本付息的特性也降低了长期风险。
二、代码实现:从参数设置到自动下单
以下是基于easytrader(模拟券商接口)的网格交易代码,支持自定义参数、实时监控和自动交易。(代码中的 g、log、context、get_snapshot 等名称未在本文件中定义,需由运行环境提供。)
import pandas as pd
import sqlite3
from datetime import datetime
from datetime import time
import datetime
import json
class NewTaskInfo(object):
    """In-memory state of one grid task (one traded security).

    The constructor only assigns defaults; persisted values are written onto
    the instance afterwards by the code that loads the task table.
    """

    def __init__(self):
        # Every "last time X happened" field starts 90000 seconds (25 hours)
        # in the past. All of them are derived from a single clock reading so
        # they are mutually consistent.
        # NOTE(review): presumably this makes elapsed-time checks pass on the
        # first run -- confirm against the code that reads these fields.
        now = datetime.datetime.now()
        long_ago = now + datetime.timedelta(seconds=-90000)

        # Identity.
        self.ID = 0
        self.szTaskName = ""
        self.szNickName = ""

        # Status; StartStopTask writes 9 when starting a task and 1 when
        # stopping it.
        self.lStatus = 1
        self.lCur = 0

        # Grid parameters and prices.
        self.lGridPrice = 0.0
        self.lGridValue = 0.0
        self.lMaxSell = 0
        self.lMaxPrice = 0.0
        self.lFallPrice = 0.0
        self.lGridCount = 10
        self.lSzGridCount = 10
        self.lNewPrice = 0.0
        self.lDealPrice = 0.0
        self.lVol = 0.0

        # Position counters.
        self.lOweCount = 0
        self.lMaxOweCount = 0
        self.lMinOweCount = 0

        # Per-day activity counters.
        self.lDoCount = 0
        self.lBuyCount = 0
        self.lSellCount = 0
        # 0 none, 1 buy, 2 sell, 3 cancel
        # NOTE(review): the original comment sat above lDoCount; it presumably
        # describes lDoType -- confirm.
        self.lDoType = 0

        # Timestamps.
        self.lBonusDate = long_ago
        self.lDoTime = long_ago
        self.lTradeTime = long_ago
        self.lCancelTime = long_ago
        self.lLastPriceTime = long_ago
        self.lLastSaveTime = now
        self.lLastTime = 0
        self.lDoPrice = 0.0
        self.lMaxTime = long_ago

        # Flags and miscellaneous counters. LoadTaskList sets sh to 1 when the
        # task name contains "SS".
        self.sh = 0
        self.debt = 0
        self.show_count = 0
        self.lNewTask = 0
        self.have_count = 0
def initialize(context):
    """Populate the shared ``g`` state, load the task table and register timers.

    After the defaults are assigned, the task list is loaded from the SQLite
    file, old trade-log rows are purged and ``do_one_task`` runs once in
    "first" mode. The 3-second interval handler and the after-close callbacks
    are registered only when ``g.is_trade`` is positive or
    ``get_position_count(context)`` reports a positive count.

    NOTE(review): the source had lost its indentation; the final
    ``g.security`` / ``set_universe`` pair is assumed to sit at function level
    rather than inside the ``else`` branch -- confirm.
    """
    # Switches.
    g.err = 0
    g.is_trade = 0        # whether trading is active
    g.is_auto = 1
    g.margin = 0          # whether margin is used
    g.do_end = 1          # whether the after-close routine runs

    # Limits.
    g.max_buy_count = 40  # daily cap on buys or sells
    g.max_trade_count = 15
    g.max_mul = 2         # largest multiple
    g.limit_money = 1000  # do not buy when cash is below this
    g.min_buy_money = 700   # do nothing when the order amount is below this
    g.max_buy_money = 3000
    g.out_money = 1000    # cash kept aside, not put into repo
    g.more_vol = 3        # volume amplification

    # Time state: 0 not started, 1 trading, 2 trading and close to the end,
    # 9 after close.
    g.time_state = 1

    # Running totals.
    g.money = 0
    g.buy_money = 0.0
    g.buy_str = ''
    g.last_money = 0
    g.need_money = 0.0
    g.done = 0
    g.sell_count = 0
    g.trade_count = 0
    g.last_name = ""
    g.have_count = 0
    g.TaskList = []
    g.bLastTrade = 0

    # Bookkeeping timestamps.
    g.last_log_time = datetime.datetime.now() - datetime.timedelta(days=1)
    g.last_trade_log_time = datetime.datetime.now()
    g.last_save_money_time = datetime.datetime.now() - datetime.timedelta(days=1)
    g.last_read_json_time = datetime.datetime.now()
    g.lCheckOweTime = datetime.datetime.now() - datetime.timedelta(seconds=90000)
    g.money = context.portfolio.cash

    # Files.
    g.file_path = get_research_path() + 'upload_file/UserGridS.db'
    g.file_json_path = get_research_path() + 'upload_file/UserGridJson.txt'
    log.info(g.file_path)
    print("初始资金=",context.portfolio.cash,"留=",g.out_money)

    # Load persisted state and run the one-off task in "first" mode.
    LoadTaskList()
    ClearTradeLog(720,180)
    do_one_task(context,1)

    # Periodic handler every 3 seconds plus a series of after-close callbacks.
    wants_timers = g.is_trade > 0 or get_position_count(context) > 0
    if not wants_timers:
        g.is_trade = 0
    else:
        run_interval(context, interval_handle, seconds = 3)
        for clock in ("15:02", "15:10", "15:15", "15:20", "15:25", "15:29"):
            run_daily(context, after_trading_order_test, time=clock)
        g.is_trade = 1

    g.security = "600570.SS"
    set_universe(g.security)
def do_one_task(context,first):
    """Run the one-off housekeeping pass over all tasks.

    ``first`` is forwarded to ``CheckTaskInfo``; a value <= 0 marks a
    non-initialisation call, which also sets ``g.done``.

    NOTE(review): the source had lost its indentation. The statements from
    ``g.done = 1`` through ``WriteUserMoney()`` are assumed to belong to the
    ``first <= 0`` branch, and ``ReadJsonFile()`` onwards to the function
    level -- confirm against the original file.
    """
    print("执行一次开始 卖出",first)

    # Not the initialisation call.
    if first <= 0:
        g.done = 1
        CheckUserOrderList()
        CheckBonusList()
        g.have_count = 0
        CheckOweTaskList(context)
        WriteUserMoney()

    for task in g.TaskList:
        CheckTaskInfo(task,first)
        SaveTaskPrice(task)

    if g.is_trade <= 0:
        # Codes to start / stop while trading is switched off.
        StartList = ["0"]
        StopList = ["123099"]
        print(StartList)
        for code in StartList:
            print(code)
            StartStopTask(code,1)
        for code in StopList:
            StartStopTask(code,0)

    ReadJsonFile()
    PrintTaskList(0)
    log.info("执行一次结束")
def do_end_task(context):
    """Run the owe-list check, then save and refresh the price of every task."""
    CheckOweTaskList(context)
    for task in g.TaskList:
        SaveTaskPrice(task)
        GetTaskItemPrice(context,task)
def StartStopTask(szTaskName,dwStart = 1):
    """Start (``dwStart`` > 0) or stop the task found for ``szTaskName``.

    Starting applies only to a task whose status is below 9: it sets the
    status to 9, resets the grid step to 1, re-anchors the grid price when it
    is more than 0.4 away from the latest price, and sizes ``lMaxOweCount``
    from the current position. Stopping sets the status to 1. Either change is
    persisted through ``SaveTaskStatus``. Nothing happens when no task matches.

    NOTE(review): nesting was reconstructed from un-indented source; the four
    position-size checks are assumed to be siblings inside the start branch.
    """
    task = FindTaskInfoByID(szTaskName)
    if task is None:
        return

    if dwStart <= 0:
        # Stop request: only touch the database when the status changes.
        if task.lStatus != 1:
            task.lStatus = 1
            SaveTaskStatus(task)
        return

    if task.lStatus >= 9:
        # Already started.
        return

    task.lGridValue = 1
    task.lStatus = 9
    if abs(task.lNewPrice-task.lGridPrice) > 0.4:
        task.lGridPrice = task.lNewPrice

    held = task.lOweCount
    if task.lMaxOweCount <= 0:
        task.lMaxOweCount = 250
    if held >= 150:
        task.lMaxOweCount = held+150
    if held >= 10000:
        # Large position: the step is derived from the latest price, with a
        # floor of 0.01.
        task.lGridValue = 0.01*int(task.lNewPrice*10)/10
        if task.lGridValue <= 0:
            task.lGridValue = 0.01
        task.lGridCount = 1000
        task.lMaxOweCount = held+10000

    SaveTaskStatus(task)
def handle_data(context, data):
    """Bar callback: run the one-off task pass if it has not run yet."""
    already_done = g.done != 0
    if not already_done:
        do_one_task(context,0)
def interval_handle(context):
    """Periodic handler: refresh ``g.time_state`` and process the task list.

    Time states: 0 at or before 09:30, 1 during trading, 2 after 14:30,
    9 after 15:00.
    """
    clock = context.current_dt.time()
    if clock > time(15, 0):
        state = 9
    elif clock > time(14, 30):
        state = 2
    elif clock > time(9, 30):
        state = 1
    else:
        state = 0
    g.time_state = state

    if g.done == 0:
        do_one_task(context,0)

    # Process the tasks.
    DoTaskList(context)
def before_trading_start(context, data):
    """Pre-open callback: check cancellations and reset daily counters.

    Does nothing while ``g.is_trade`` is not positive.
    """
    if g.is_trade > 0:
        print("每日开始 检测取消任务")
        for task in g.TaskList:
            CheckCancelTask(task)
            ResetDayTask(task)
def after_trading_order_test(context):
    """After-close callback registered by ``initialize``.

    Always runs ``do_end_task``. The remainder (snapshot of "204001.SS", cash
    refresh, owe-list check with flag 1) is skipped when ``g.do_end`` is not
    positive or ``g.margin`` is positive.
    """
    print("盘后交易 留=",g.out_money)
    do_end_task(context)

    if g.do_end <= 0 or g.margin > 0:
        return

    # Alternative code kept from the original comment: "131810.SZ".
    g.security = "204001.SS"
    quote = get_snapshot(g.security)
    print("盘后交易 =",quote)

    g.money = context.portfolio.cash
    WriteUserMoney()
    CheckOweTaskList(context,1)
def on_order_response(context, order_list):
    """Order callback: only logs that an order event arrived."""
    print("订单产生 开始")
    return None
def on_trade_response(context, trade_list):
    """Trade callback: apply each reported fill to its grid task.

    Only entries whose ``status`` is "7" or "8" are processed.
    NOTE(review): presumably these are the broker's partially/fully filled
    codes -- confirm against the trading API. Other entries are skipped; the
    previous code returned on the first such entry and so silently dropped
    every later trade in the same batch.

    A positive ``business_amount`` is treated as a buy and a negative one as a
    sell. For a started task (``IsTaskStart``) the position and grid price are
    updated through ``TradeTaskInfo``, the counters are bumped and a trade-log
    row is written; otherwise only the deal price is saved.
    """
    log.info("订单成交 开始")
    WriteUserMoney()

    for trade in trade_list:
        print(trade)
        amount = trade['business_amount']
        stock = trade['stock_code']
        price = trade['business_price']
        status = trade['status']
        if status not in ("7", "8"):
            # Skip this entry but keep processing the rest of the batch.
            continue

        message = ""
        if amount > 0:
            message = "订单成交 买入:{},价格{},数量{}".format(stock,price,amount)
        elif amount < 0:
            message = "订单成交 卖出:{},价格{},数量{}".format(stock,price,amount)
        log.info(message)

        task = FindTaskInfoByID(stock)
        if task is None:
            continue

        task.lDealPrice = price
        if not IsTaskStart(task):
            SaveTaskInfo(task)
            continue

        TradeTaskInfo(task,amount)
        task.lDoType = 0
        task.lDoCount += 1
        task.lTradeTime = datetime.datetime.now()
        g.trade_count += 1
        g.last_name = task.szNickName
        if amount > 0:
            task.lBuyCount += 1
            g.last_name += "-买"
        else:
            g.sell_count += 1
            task.lSellCount += 1
            g.last_name += "-卖"
        SaveTaskInfo(task)
        SaveTradeLog(task,price,amount,",trade log")
def after_trading_end(context, data):
    """End-of-day callback: run the one-off task pass in "first" mode."""
    do_one_task(context, 1)
    print("盘后交易结束")
    return None
def WriteUserMoney():
    """Write ``g.money + g.buy_money`` to ``UserList.UseMoney`` via ``WriteSql``.

    The statement has no WHERE clause, so every row of ``UserList`` is updated.
    """
    total = g.money+g.buy_money
    statement = "UPDATE UserList SET UseMoney={}".format(total)
    WriteSql(statement,1)
def LoadTaskList(TaskName=""):
    """Load rows of the ``TaskList`` table into ``g.TaskList``.

    With an empty (or ``None``) ``TaskName`` every row is read, ordered by
    ``sort``, ``Status`` descending and ``TaskName``; otherwise only rows whose
    ``TaskName`` equals the argument. A row whose code is already known to
    ``FindTaskInfoByID`` refreshes that task in place; any other row becomes a
    new ``NewTaskInfo`` appended to ``g.TaskList``. The task list is printed
    at the end.

    The name filter is bound as a SQL parameter instead of being concatenated
    into the statement, and the connection is closed even when a row fails to
    parse (for example a ``BonusDate`` that does not match the expected
    format).

    NOTE(review): nesting was reconstructed from un-indented source; the field
    assignments are assumed to apply to refreshed and new tasks alike.
    """
    conn = sqlite3.connect(g.file_path)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('pragma table_info({})'.format("TaskList"))
            col_name = [x[1] for x in cursor.fetchall()]

            if TaskName:
                print("加载=",TaskName)
                cursor.execute("SELECT * from TaskList where TaskName=?", (TaskName,))
            else:
                cursor.execute("SELECT * from TaskList order by sort,Status desc,TaskName")
            allData = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()

    # Resolve every column position once rather than once per row.
    wanted = ("TaskName", "ID", "NickName", "GridValue", "GridPrice", "DealPrice",
              "NewPrice", "Status", "DoType", "OweCount", "MaxOweCount", "BonusDate")
    col = {name: GetTableIndex(col_name, name) for name in wanted}

    print("直接获取全部记录:")
    for item in allData:
        szTaskName = item[col["TaskName"]]
        TaskInfo = FindTaskInfoByID(szTaskName)
        is_new = TaskInfo is None
        if is_new:
            TaskInfo = NewTaskInfo()

        TaskInfo.ID = item[col["ID"]]
        TaskInfo.szTaskName = szTaskName
        TaskInfo.szNickName = item[col["NickName"]]
        TaskInfo.lGridValue = item[col["GridValue"]]
        TaskInfo.lGridPrice = round(item[col["GridPrice"]], 3)
        TaskInfo.lDealPrice = item[col["DealPrice"]]
        TaskInfo.lNewPrice = item[col["NewPrice"]]
        TaskInfo.lStatus = item[col["Status"]]
        TaskInfo.lDoType = item[col["DoType"]]
        TaskInfo.lOweCount = item[col["OweCount"]]
        TaskInfo.lMaxOweCount = item[col["MaxOweCount"]]
        TaskInfo.lBonusDate = datetime.datetime.strptime(
            item[col["BonusDate"]], "%Y-%m-%d %H:%M:%S")

        if TaskInfo.szTaskName.find("SS") >= 0:
            TaskInfo.sh = 1
        elif TaskInfo.lSzGridCount > 0:
            # Names without "SS" use the separate lSzGridCount lot size.
            TaskInfo.lGridCount = TaskInfo.lSzGridCount

        if is_new:
            g.TaskList.append(TaskInfo)

    PrintTaskList(0)
def WriteSql(str,bLog =0):
    """Execute one SQL statement against ``g.file_path`` and commit it.

    Args:
        str: the SQL text (the parameter name shadows the builtin but is kept
            so that existing callers are unaffected).
        bLog: when positive, the statement is printed before it is executed.

    The connection is closed even when the statement raises.
    """
    sql = str
    conn = sqlite3.connect(g.file_path)
    try:
        if bLog > 0:
            print(sql)
        conn.execute(sql)
        conn.commit()
    finally:
        conn.close()
def InsertTaskInfo(TaskInfo):
    """Insert ``TaskInfo`` as a new row of ``TaskList``.

    Values are bound as parameters, so a task name containing a quote no
    longer breaks the statement and ``None`` is stored as NULL. The printed
    log line keeps the previous fully-expanded text. The connection is closed
    even when the insert fails.
    """
    columns = " insert into TaskList (TaskName, Status,DoType,OweCount,GridValue,GridPrice,NewPrice) "
    values = (TaskInfo.szTaskName, TaskInfo.lStatus, TaskInfo.lDoType, TaskInfo.lOweCount,
              TaskInfo.lGridValue, TaskInfo.lGridPrice, TaskInfo.lNewPrice)

    # Log text only; the statement that is executed uses placeholders.
    print(columns + " values (\'{}\', {},{},{},{},{},{})".format(*values))

    conn = sqlite3.connect(g.file_path)
    try:
        conn.execute(columns + " values (?, ?,?,?,?,?,?)", values)
        conn.commit()
    finally:
        conn.close()
def SaveTaskInfo(TaskInfo):
    """Persist status, action type, position and prices of one task.

    The row is selected by ``TaskName``. ``NewPrice`` is written only when the
    in-memory value is positive. Values are bound as parameters and the
    connection is closed even when the update fails.
    """
    assignments = ["Status=?", "DoType=?", "OweCount=?", "GridPrice=?"]
    params = [TaskInfo.lStatus, TaskInfo.lDoType, TaskInfo.lOweCount, TaskInfo.lGridPrice]
    if TaskInfo.lNewPrice > 0:
        assignments.append("NewPrice=?")
        params.append(TaskInfo.lNewPrice)
    assignments.extend(["DealPrice=?", "Updatedate=?"])
    params.extend([TaskInfo.lDealPrice, str(datetime.datetime.now())])

    sql = "UPDATE TaskList SET {} where TaskName=?".format(" ,".join(assignments))
    params.append(TaskInfo.szTaskName)

    conn = sqlite3.connect(g.file_path)
    try:
        conn.execute(sql, params)
        conn.commit()
    finally:
        conn.close()
def SaveTaskPrice(TaskInfo,bBonus=0,bGridPrice=0):
    """Persist the latest price of one task, optionally with more fields.

    Args:
        TaskInfo: the task; the row is selected by ``TaskName``.
        bBonus: when positive, ``BonusDate`` is written as well.
        bGridPrice: when positive, ``GridPrice`` is written as well.

    ``NewPrice`` is written only when positive; ``Updatedate`` is always
    written. The SET clause is assembled from a list, so a non-positive
    ``lNewPrice`` no longer yields a clause that starts with a comma (which
    SQLite rejects as a syntax error). Values are bound as parameters and the
    connection is closed even when the update fails.
    """
    assignments = []
    params = []
    if TaskInfo.lNewPrice > 0:
        assignments.append("NewPrice=?")
        params.append(TaskInfo.lNewPrice)
    if bBonus > 0:
        bonus = TaskInfo.lBonusDate
        if isinstance(bonus, datetime.datetime):
            # LoadTaskList parses this column with "%Y-%m-%d %H:%M:%S"; a
            # plain str() would add microseconds that it cannot read back.
            bonus = bonus.strftime("%Y-%m-%d %H:%M:%S")
        else:
            bonus = str(bonus)
        assignments.append("BonusDate=?")
        params.append(bonus)
    if bGridPrice > 0:
        assignments.append("GridPrice=?")
        params.append(TaskInfo.lGridPrice)
    assignments.append("Updatedate=?")
    params.append(str(datetime.datetime.now()))

    sql = "UPDATE TaskList SET {} where TaskName=?".format(" ,".join(assignments))
    params.append(TaskInfo.szTaskName)

    conn = sqlite3.connect(g.file_path)
    try:
        conn.execute(sql, params)
        conn.commit()
    finally:
        conn.close()
def SaveTaskStatus(TaskInfo):
    """Persist status, grid step, position cap and grid price of one task.

    The row is selected by ``TaskName``. Values are bound as parameters and
    the connection is closed even when the update fails.
    """
    sql = (
        "UPDATE TaskList SET Status=? ,GridValue=? ,MaxOweCount=? ,GridPrice=?"
        " ,DealPrice=? ,Updatedate=? where TaskName=?"
    )
    params = (
        TaskInfo.lStatus,
        TaskInfo.lGridValue,
        TaskInfo.lMaxOweCount,
        TaskInfo.lGridPrice,
        # NOTE(review): DealPrice is written from lGridPrice, not lDealPrice.
        # Kept as in the original; looks like a deliberate reset when a task
        # is started or stopped -- confirm.
        TaskInfo.lGridPrice,
        str(datetime.datetime.now()),
        TaskInfo.szTaskName,
    )

    conn = sqlite3.connect(g.file_path)
    try:
        conn.execute(sql, params)
        conn.commit()
    finally:
        conn.close()
def SaveTaskInfoByID(TaskInfo):
    """Update ``TaskName`` and ``NickName`` of the row whose ``ID`` matches.

    Values are bound as parameters, so a nickname containing a quote no longer
    breaks the statement. The connection is closed even when the update fails.
    """
    sql = "UPDATE TaskList SET TaskName=? ,NickName=? ,Updatedate=? where ID=?"
    params = (
        TaskInfo.szTaskName,
        TaskInfo.szNickName,
        str(datetime.datetime.now()),
        TaskInfo.ID,
    )

    conn = sqlite3.connect(g.file_path)
    try:
        conn.execute(sql, params)
        conn.commit()
    finally:
        conn.close()
def DelTaskInfoByID(TaskInfo):
    """Delete the ``TaskList`` row whose ``ID`` equals ``TaskInfo.ID``.

    The id is bound as a parameter; the printed log line keeps the previous
    fully-expanded text. The connection is closed even when the delete fails.
    """
    # Log text only; the statement that is executed uses a placeholder.
    print("DELETE FROM TaskList" + " where ID={}".format(TaskInfo.ID))

    conn = sqlite3.connect(g.file_path)
    try:
        conn.execute("DELETE FROM TaskList where ID=?", (TaskInfo.ID,))
        conn.commit()
    finally:
        conn.close()
def DelTaskInfoByTaskName(szTaskName):
    """Delete every ``TaskList`` row whose ``TaskName`` equals ``szTaskName``.

    The name is bound as a parameter, so a quote in it can no longer alter the
    statement; the printed log line keeps the previous fully-expanded text.
    The connection is closed even when the delete fails.
    """
    # Log text only; the statement that is executed uses a placeholder.
    print("DELETE FROM TaskList" + " where TaskName=\'{}\'".format(szTaskName))

    conn = sqlite3.connect(g.file_path)
    try:
        conn.execute("DELETE FROM TaskList where TaskName=?", (szTaskName,))
        conn.commit()
    finally:
        conn.close()
def TradeTaskInfo(TaskInfo,amount):
    """Apply a fill of ``amount`` units to the task's position and grid price.

    The position always changes by ``amount``. The grid price moves only when
    an action is pending (``lDoType`` > 0) and the fill does not exceed
    ``lGridCount * g.max_mul``: down after a buy, up after a sell, scaled by
    the fill size relative to ``lGridCount``, then rounded to 3 decimals.

    NOTE(review): nesting was reconstructed from un-indented source; the final
    rounding is assumed to sit inside the price-adjustment branch.
    """
    TaskInfo.lOweCount += amount

    cap = TaskInfo.lGridCount*g.max_mul
    filled = min(abs(amount), cap)
    step = TaskInfo.lGridValue*(filled/TaskInfo.lGridCount)

    if TaskInfo.lDoType <= 0 or abs(amount) > cap:
        return

    if amount > 0:
        TaskInfo.lGridPrice -= step
    elif amount < 0:
        TaskInfo.lGridPrice += step
    TaskInfo.lGridPrice = round(TaskInfo.lGridPrice,3)
def SaveTradeLog(TaskInfo,lPrice,lNum,note):
    """Append one row to ``ListTradeLog`` for a fill.

    Args:
        TaskInfo: the task the fill belongs to.
        lPrice: fill price.
        lNum: signed quantity; a negative value is logged as a sell
            (Status 2, note prefix "sell"), anything else as a buy
            (Status 1, note prefix "buy").
        note: free text appended to the note prefix.

    Values are bound as parameters, so quotes in the nickname or note no
    longer break the statement. The connection is closed even when the insert
    fails.
    """
    is_sell = lNum < 0
    status = 2 if is_sell else 1
    szNote = ("sell" if is_sell else "buy") + note

    sql = (
        "insert into ListTradeLog"
        " (TaskName,NickName, Status,OweCount,Price,Count,Note,GridValue,GridPrice,MaxOweCount)"
        " values (?,?,?,?,?,?,?,?,?,?)"
    )
    params = (
        TaskInfo.szTaskName,
        TaskInfo.szNickName,
        status,
        TaskInfo.lOweCount,
        lPrice,
        lNum,
        szNote,
        TaskInfo.lGridValue,
        TaskInfo.lGridPrice,
        TaskInfo.lMaxOweCount,
    )

    conn = sqlite3.connect(g.file_path)
    try:
        conn.execute(sql, params)
        conn.commit()
    finally:
        conn.close()
def LoadTradeLog():
    """Print the five most recently updated rows of ``ListTradeLog``.

    Read-only: nothing is committed. The connection is closed even when the
    query fails.
    """
    conn = sqlite3.connect(g.file_path)
    try:
        cursor = conn.execute("SELECT * from ListTradeLog order by Updatedate desc limit 5;")
        try:
            for item in cursor.fetchall():
                print(item)
        finally:
            cursor.close()
    finally:
        conn.close()
def ClearTradeLog(log_day,ask_day):
    """Delete old rows from the two log tables.

    Args:
        log_day: rows of ``ListTradeLog`` whose ``Updatedate`` is at least
            this many days old are removed.
        ask_day: same for ``ListTradeAskLog``.

    The cutoff is bound as a parameter; the printed log lines keep the
    previous fully-expanded text. Both deletes are committed together and the
    connection is closed even when one of them fails.
    """
    retention = (("ListTradeLog", log_day), ("ListTradeAskLog", ask_day))

    conn = sqlite3.connect(g.file_path)
    try:
        for table, days in retention:
            cutoff = str(datetime.datetime.now()+datetime.timedelta(days=-days))
            # Table names are fixed constants, so formatting them in is safe.
            statement = "DELETE FROM {} WHERE Updatedate <= ".format(table)
            print(statement + "\'{}\'".format(cutoff))
            conn.execute(statement + "?", (cutoff,))
        conn.commit()
    finally:
        conn.close()
# Look up the position of a column name in a list of column names.
def GetTableIndex(col_name,find_name):
    """Return the index of ``find_name`` in ``col_name``, or 1000 if absent.

    The 1000 sentinel is past the end of any realistic row, so indexing a row
    with it raises ``IndexError``.
    """
    for position, candidate in enumerate(col_name):
        if candidate == find_name:
            return position
    return 1000
def PrintTaskInfo(TaskInfo,index):
    """Print a one-line summary of a task, prefixed with ``index``.

    The line carries a tag after the price gap: "-buy" when the latest price
    is at least one grid step below the grid price and the position is under
    its cap; "-BM" when that price condition holds but the cap is reached;
    "-M" when the cap is reached without the price condition. Buy/sell
    counters are appended when either is non-zero.
    """
    gap = round(TaskInfo.lNewPrice-TaskInfo.lGridPrice, 2)
    below_grid = gap <= -TaskInfo.lGridValue

    tag = ""
    if below_grid and TaskInfo.lOweCount < TaskInfo.lMaxOweCount:
        tag = "-buy"
    if TaskInfo.lOweCount >= TaskInfo.lMaxOweCount:
        tag += "-BM" if below_grid else "-M"

    template = "{}:{}任务:{}-{}{},点{},状态{}-{},持{}/{},格价{},新{}-{}差{}{},量{}"
    line = template.format(
        index, TaskInfo.ID, TaskInfo.szTaskName, TaskInfo.szNickName, TaskInfo.sh,
        TaskInfo.lGridValue, TaskInfo.lStatus, TaskInfo.lDoType,
        TaskInfo.lOweCount, TaskInfo.lMaxOweCount,
        TaskInfo.lGridPrice, TaskInfo.lCur, TaskInfo.lNewPrice, gap, tag, TaskInfo.lVol,
    )
    line += ')'

    if TaskInfo.lSellCount > 0 or TaskInfo.lBuyCount:
        line += "卖{}买{}".format(TaskInfo.lSellCount,TaskInfo.lBuyCount)
    print(line)
def PrintTaskList(dwType,bShow=1):
    """Print tasks from ``g.TaskList`` with a running row number.

    Args:
        dwType: when positive, only tasks whose status equals it are printed.
            Otherwise tasks with status >= 9 come first, then a "----"
            separator, then the remaining tasks.
        bShow: when truthy, a header with the task count and a footer line are
            printed as well.

    NOTE(review): nesting was reconstructed from un-indented source; the row
    number is assumed to advance only for tasks that are actually printed.
    """
    if bShow:
        print("任务输出行数=",len(g.TaskList))

    if dwType > 0:
        selected = [task for task in g.TaskList if task.lStatus == dwType]
        for row, task in enumerate(selected, 1):
            PrintTaskInfo(task,row)
    else:
        started = [task for task in g.TaskList if task.lStatus >= 9]
        others = [task for task in g.TaskList if task.lStatus < 9]
        for row, task in enumerate(started, 1):
            PrintTaskInfo(task,row)
        print("----")
        for row, task in enumerate(others, len(started)+1):
            PrintTaskInfo(task,row)

    if bShow:
        print("任务输出结束")
def isDebt(TaskName):
    """Classify a code by its first two characters.

    Returns 1 for a "11" prefix, 2 for a "12" prefix and 0 otherwise.
    """
    prefix_kind = {'11': 1, '12': 2}
    return prefix_kind.get(TaskName[0:2], 0)
def isFund(TaskName):
    """Classify a code by its leading characters.

    Returns 1 when the code starts with "5" (which covers the "50", "51" and
    "52" prefixes), 2 when it starts with "15" or "16", and 0 otherwise.
    """
    if TaskName[0:1] == '5':
        return 1
    if TaskName[0:2] in ('15', '16'):
        return 2
    return 0
def IsTaskInfo(TaskName):
    """Return 1 when a task named exactly ``TaskName`` is in ``g.TaskList``, else 0."""
    found = any(task.szTaskName == TaskName for task in g.TaskList)
    return 1 if found else 0
def FindTaskInfoByName(TaskName):
    """Return the first task whose name equals ``TaskName`` exactly, or None."""
    matches = (task for task in g.TaskList if task.szTaskName == TaskName)
    return next(matches, None)
def FindTaskInfoByID(TaskName):
    """Return the first task whose name shares its first six characters with
    ``TaskName``, or None.

    Only the leading six characters are compared, so anything after them in
    either name (such as an exchange suffix) is ignored.
    """
    wanted = TaskName[0:6]
    matches = (task for task in g.TaskList if task.szTaskName[0:6] == wanted)
    return next(matches, None)
def UpdateTaskInfo(TaskInfo):
    """Replace every entry of ``g.TaskList`` that has the same name as
    ``TaskInfo`` with ``TaskInfo`` itself.

    The previous implementation assigned to the loop variable, which only
    rebinds a local name and left the list untouched; the slot is now written
    by index.
    """
    for position, existing in enumerate(g.TaskList):
        if existing.szTaskName == TaskInfo.szTaskName:
            g.TaskList[position] = TaskInfo
def ClearDataTaskInfo(TaskInfo):
    """Placeholder: currently does nothing for any argument, ``None`` included."""
    return None
def ResetDayTask(TaskInfo):
    """Zero the per-day counters of ``TaskInfo`` and the global daily counters.

    The global ``g.trade_count`` and ``g.sell_count`` are reset on every call,
    i.e. once per task when called in a loop.
    """
    for counter in ("lDoCount", "lBuyCount", "lSellCount", "lDoType"):
        setattr(TaskInfo, counter, 0)
    g.trade_count = 0
    g.sell_count = 0
def GetLastTime(last_time):
    """Return the seconds elapsed between ``last_time`` and now, as a float."""
    elapsed = datetime.datetime.now() - last_time
    return elapsed.total_seconds()
def AskUserMoney(context):
    """Placeholder: not implemented; always returns None."""
    return None
def ask_position_list():
    """Return the keys of ``get_positions()`` whose position amount is non-zero."""
    positions = get_positions()
    held = []
    for code in positions:
        if positions[code].amount != 0:
            held.append(code)
    return held
结语:让代码为你“打工”
网格交易的本质是利用市场波动赚钱,而Python代码能帮我们在交易时段内实现无需盯盘的自动化执行。通过本文的代码框架,你可以快速搭建自己的可转债网格策略,在控制风险的同时捕捉每一个小波段收益。投资不是“预测涨跌”,而是“应对涨跌”。希望这份代码能成为你投资路上的好帮手,让生活因投资更美好!(注:代码仅为示例,实盘前请充分测试,并根据自身风险承受能力调整参数。)