等级: 专业版
- 注册:
- 2024-10-10
- 曾用名:
|
老师,请帮忙看下我的止损策略(多单在M120下自动止损;空单在M120上自动止损),在模拟板试的,每次测试前几笔开单后都会自动止损,后面就跳出个窗口,就不执行止损了。
# Version: 20241010c
import pandas, pathlib, logging, datetime
def init(context):
#---------------自定义参数--------------
context.ma_value = 120 # MA均线的周期,数值类型为int(整数)。
#--------------------------------------
#---------------初始化变量值--------------
context.mode = context.run_info.run_type # 当前的运行模式
context.bool_if_first_log = False
context.file_dir = FileDir() # 当前文件所在的目录
context.output_dir = OutputDir(FileDir=context.file_dir) # 输出目录
context.start_run_time = datetime.datetime.now() # 策略首次运行的时间(真实的系统时间),用于日志名称
context.bool_if_runned = False # 是否运行过before_trading()
context.temp_log_list = [] # 在日志完成配置前,临时存储日志信息
#--------------------------------------
Write.debug(context, '>>> init - %s'%(VersionInfo()))
Write.info(context, '>>> init - 当前的运行模式:%s'%(dic_run_mode['%s'%context.mode]))
str_base_instru = context.run_info.base_book_id
Write.info(context, '>>> init - 当前的基准合约为%s'%(str_base_instru))
#-----------------------------------------
before_trading(context) # 盘启动策略不会触发before_trading(),补运行一次
Write.debug(context, '--------------------')
'''
# 用于模拟手动下单
itr_contract = str_base_instru
int_order_id = sell_open(order_book_id = itr_contract, style = "Market", volume = 1, hedge_flag = False,serial_id = 1)
Write.info(context, '>>> handle_bar - 订单已提交,开仓卖出%s手%s,订单ID:%s'%(1, itr_contract, int_order_id))
'''
# before_trading()函数每天交易开始前会被调用一次,若盘中启动策略则不会被触发
def before_trading(context):
# 避免同一天重复运行
if context.bool_if_runned:
return
#---------------初始化变量值--------------
context.lst_order_queue = [] # 初始化自定义订单列队
context.log_name = LogName(context, mode = context.mode)
Write.ConfigLogger(OutputDir=context.output_dir, LogName = context.log_name) # 配置日志logger
if not context.bool_if_first_log: # 将日志配置完成之前的信息补写入日志,仅运行一次
context.bool_if_first_log = True # 标记日志配置完成后已经运行了run_once
Write.run_once(context)
#-----------------------------------------
#---------------查询账户信息--------------
Write.debug(context, '>>> before_trading - ')
account_list = get_account_book()
Write.info(context, '>>> before_trading - 已登录的所有账号:%s'%(account_list))
flt_account_id = str(get_account(type = 1)) # 参数说明: https://www.weistock.com/docs/Py ... t%E5%87%BD%E6%95%B0
Write.info(context, '>>> before_trading - 当前账号的ID:%s %s'%(flt_account_id, type(flt_account_id)))
int_account_type = int(get_account(type = 2))
Write.info(context, '>>> before_trading - 当前账号的类型:%s'%(dic_account_type['%s'%int_account_type]))
int_account_status = get_account(type = 53)
Write.info(context, '>>> before_trading - 当前账号的状态:%s %s'%(int_account_status, type(int_account_status)))
flt_total_asset = get_account(type = 6)
Write.info(context, '>>> before_trading - 当前账号的总资产:%s'%(flt_total_asset))
flt_floating_profit_loss = get_account(type = 4)
Write.info(context, '>>> before_trading - 当前账号的浮动盈亏:%s'%(flt_floating_profit_loss))
flt_cash_available = get_account(type = 3)
Write.info(context, '>>> before_trading - 当前账号的现金余额:%s'%(flt_cash_available))
flt_cash_available = get_account(type = 19)
Write.info(context, '>>> before_trading - 当前账号的可用资金:%s'%(flt_cash_available))
lst_portfolio = get_portfolio_book(type = 2) # 读取默认登录账户的持仓品种
Write.info(context, '>>> before_trading - 当前持有的合约品种:%s'%(lst_portfolio))
for itr_contract in lst_portfolio:
obj_position_info = get_portfolio(order_book_id = itr_contract, type = 2) # 单个合约品种的持仓信息
int_long_quantity = obj_position_info.buy_quantity # 多单持仓的数量
int_short_quantity = obj_position_info.sell_quantity # 空单持仓的数量
Write.info(context, '>>> before_trading - 合约%s的持仓情况:多单持仓的数量 %s,空单持仓数量 %s'%(itr_contract, int_long_quantity, int_short_quantity))
context.bool_if_runned = True
# handle_bar()函数由行情更新驱动。例如运行的基础周期为1分钟,则每分钟触发一次。可自定义间隔周期,最低为3秒钟(即tick级别)。
# 请注意间隔周期与代码运算量之间的平衡,因为运行handle_bar()中的代码需要消耗时间(耗时视网络情况和本机的性能而异)。
def handle_bar(context):
#Write.debug(context, '>>> handle_bar - ')
# 将未成交的订单全部撤单
for itr_order_id in context.lst_order_queue:
obj_order_info = get_orders_id(order_id = itr_order_id) # 根据订单号获取订单信息
int_unfilled_quantity = obj_order_info.unfilled_quantity # 读取订单的剩余未成交量
if int_unfilled_quantity > 0:
Write.info(context, '>>> handle_bar - 订单%s存在未成交'%(itr_order_id))
cancel_order(itr_order_id)
Write.info(context, '>>> handle_bar - 订单%s已提交撤销指令(无反馈信息,需自行核对)'%(itr_order_id))
context.lst_order_queue = [] # 清空自定义订单列队
lst_portfolio = get_portfolio_book(type = 2) # 读取默认登录账户的持仓品种
Write.info(context, '>>> handle_bar - 当前持有的合约品种:%s'%(lst_portfolio))
# 遍历有持仓的合约品种
for itr_contract in lst_portfolio:
# 如果当前合约品种不在交易时段则跳过
if not istradertime(itr_contract):
Write.info(context, '>>> handle_bar - 合约%s当前不在交易时段,跳过'%(itr_contract))
continue
obj_position_info = get_portfolio(order_book_id = itr_contract, type = 2) # 单个合约品种的持仓信息
int_long_quantity = obj_position_info.buy_quantity # 多单持仓的数量
int_short_quantity = obj_position_info.sell_quantity # 空单持仓的数量
int_net_quantity = int_long_quantity - int_short_quantity # 净持仓的数量
Write.info(context, '>>> handle_bar - 合约%s的持仓情况:多单持仓的数量 %s,空单持仓数量 %s'%(itr_contract, int_long_quantity, int_short_quantity))
# 如果单个合约品种的净持仓非0,则继续监控盈亏
if int_net_quantity != 0:
int_ma_value = context.ma_value # MA均线的周期,此参数在init()函数中取设定。
nda_bar_close = history_bars(order_book_id = itr_contract, bar_count = int_ma_value+10, frequency = '1m', fields =['datetime','close']) # 获取1分钟k线的最近n个bar(n=均线周期+10)
df_bar_close = pandas.DataFrame(data=nda_bar_close, columns=['date','close']) # 将ndarray转换为dataframe,便于计算ma。
df_bar_close['date'] = pandas.to_datetime(df_bar_close['date'],format='%Y%m%d%H%M%S') # 将日期格式转换为datetime
df_bar_close['ma%s'%int_ma_value] = df_bar_close['close'].rolling(window=int_ma_value).mean().round(5) # 计算MA值,保留5位小数,并将结果填入新的列
flt_last_ma = df_bar_close.iloc[-1]['ma%s'%int_ma_value] # 最后一个ma值
flt_last_close = df_bar_close.iloc[-1]['close'].round(2) # 最后一个收盘价
dtime_last_time = df_bar_close.iloc[-1]['date']
Write.info(context, '>>> handle_bar - %s的最新k线的时间:%s,收盘价:%s,MA%s:%s'%(itr_contract, dtime_last_time, flt_last_close, int_ma_value, flt_last_ma))
# 如果是净空仓,且收盘价>MA值,则锁空仓
if all([int_net_quantity < 0, flt_last_close > flt_last_ma]):
Write.info(context, '>>> handle_bar - %s最新1分钟K线的收盘价%s>MA%s的值%s,且当前%s的持仓为净空仓,满足锁仓要求'%(itr_contract, flt_last_close, int_ma_value, flt_last_ma, itr_contract))
int_order_id = buy_open(order_book_id = itr_contract, style = "Market", volume = abs(int_net_quantity), hedge_flag = False,serial_id = 2)
Write.info(context, '>>> handle_bar - 订单已提交,以市价开仓买入%s手%s,订单ID:%s'%(abs(int_net_quantity), itr_contract, int_order_id))
context.lst_order_queue.append(int_order_id) # 将订单id添加到自定义订单列队,用于监控订单执行结果,避免重复下单
# 如果是净多仓,且收盘价<MA值,则锁多仓
if all([int_net_quantity > 0, flt_last_close < flt_last_ma]):
Write.info(context, '>>> handle_bar - %s最新1分钟K线的收盘价%s<MA%s的值%s,且当前%s的持仓为净多仓,满足锁仓要求'%(itr_contract, flt_last_close, int_ma_value, flt_last_ma, itr_contract))
int_order_id = sell_open(order_book_id = itr_contract, style = "Market", volume = abs(int_net_quantity), hedge_flag = False,serial_id = 3)
Write.info(context, '>>> handle_bar - 订单已提交,以市价开仓卖出%s手%s,订单ID:%s'%(int_net_quantity, itr_contract, int_order_id))
context.lst_order_queue.append(int_order_id) # 将订单id添加到自定义订单列队,用于监控订单执行结果,避免重复下单
# after_trading函数会在每天交易结束后被调用,当天只会被调用一次。 --(选择实现)
def after_trading(context):
Write.debug(context, '>>> after_trading - 收盘啦~')
context.bool_if_runned = False
# order_status()函数由订单事件驱动。当委托下单,成交,撤单等与下单有关的动作时,该方法就会被调用。
def order_status(context,order):
Write.info(context, '>>> order_status - 订单有新动向 - %s,订单ID %s,合约代码 %s,开平方向 %s,订单方向 %s, 订单价格 %s,订单数量 %s,已成交数量 %s,未成交数量 %s,订单类型 %s,成交价格 %s,本次成交数量 %s,订单状态 %s,订单创建时间 %s,交易账户 %s,柜台返回的系统编号 %s'%(order.message, order.order_id, order.order_book_id, order.position_effect, order.side, round(order.price,2), order.quantity, order.filled_quantity, order.unfilled_quantity, order.type, order.trade_price, order.trade_quantity, order.status, order.datetime, order.account, order.system_id))
#############################################################################
# 返回结果与文字之间的映射
dic_run_mode = {'backtest':'回测模式', 'paper_trading':'交易模式'}
dic_account_type = {'0':'盈透', '1':'CTP', '2':'金仕达期货', '3':'FIX接口', '4':'恒生期货', '5':'子账户', '6':'其他柜台', '255':'无效账户'}
#############################################################################
# 获取当前文件所在目录的路径
def FileDir(): #获取当前文件所在目录的路径
obj_FileDir = pathlib.Path(__file__).parent #返回当前文件所在目录的路径
str_FileDir = str(obj_FileDir).replace('\\','/') + '/' #转换为string,并将'\'替换为'/'
return str_FileDir
# output文件夹的路径
def OutputDir(**kargs):
str_file_dir = kargs['FileDir']
str_OutputDir = str_file_dir + "output/"
obj_path = pathlib.Path(str_OutputDir)
bool_path_exists = obj_path.exists()
if not bool_path_exists:
obj_path.mkdir()
return str_OutputDir
# 日志的名字
def LogName(context, **kargs):
str_mode = kargs['mode']
if str_mode == 'paper_trading': #如果是交易模式
dtime_now = context.now
return dtime_now.strftime('%Y%m%d')
elif str_mode == 'backtest': #如果是回测模式
dtime_StartRun = context.start_run_time
return '回测 %s'%(dtime_StartRun.strftime('%Y%m%d %H%M%S'))
else:
Write.info(context, 'LogName - 判断状态时发生未知错误')
def VersionInfo():
obj_file_path = pathlib.Path(__file__)
with open(obj_file_path, 'r', encoding='utf-8') as file:
line = file.readline()
str_version_info = line.strip()
return str_version_info
class Write(object): #重新封装日志函数
def __init__(self):
self.logger = logging.getLogger('my_logger')
self.detail_logger = logging.getLogger('my_logger2')
@classmethod
def ConfigLogger(cls, **kargs):
str_output_dir = kargs['OutputDir']
str_log_name = kargs['LogName']
logger = cls().logger
logger.setLevel(logging.DEBUG)
for handler in logger.handlers[:]:
logger.removeHandler(handler)
# 创建一个用于记录info和debug级别日志的handler,并将日志保存在日志.txt文件中
info_handler = logging.FileHandler(r'%s%s - 日志.txt'%(str_output_dir, str_log_name), delay = True)
info_handler.setLevel(logging.DEBUG)
#info_handler.addFilter(lambda record: record.levelno < logging.WARNING)
info_handler.setFormatter(logging.Formatter('%(key_time)s - %(levelname)s - %(message)s'))
info_handler.close()
logger.addHandler(info_handler)
# 创建一个用于将日志消息输出到终端的handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(logging.Formatter('%(key_time)s - %(levelname)s - %(message)s'))
console_handler.close()
logger.addHandler(console_handler)
#-------------------------------------------
detail_logger = cls().detail_logger
detail_logger.setLevel(logging.DEBUG)
# 创建一个用于记录warning及以上级别日志的handler,并将日志保存在错误日志.txt文件中
detail_handler = logging.FileHandler(r'%s%s - 错误日志.txt'%(str_output_dir, str_log_name), delay = True)
detail_handler.setLevel(logging.DEBUG)
detail_handler.setFormatter(logging.Formatter('%(key_time)s - %(levelname)s - %(message)s'))
detail_handler.close()
detail_logger.addHandler(detail_handler)
@classmethod
def run_once(cls,context): # 将日志配置完成之前的信息补写入日志
logger = cls().logger
for i in context.temp_log_list:
logger.debug(i, extra={'key_time': context.now}) # 通过python的logging模块写入日志
del context.temp_log_list
@classmethod
def debug(cls, context, msg):
print('%s - DEBUG - %s'%(context.now, msg))
# 如果还没配置日志,则暂时存储至变量中
if not context.bool_if_first_log:
context.temp_log_list.append(msg)
return
logger = cls().logger
logger.debug(msg, extra={'key_time': context.now}) # 通过python的logging模块写入日志
@classmethod
def info(cls, context, msg):
print('%s - INFO - %s'%(context.now, msg))
# 如果还没配置日志,则暂时存储至变量中
if not context.bool_if_first_log:
context.temp_log_list.append(msg)
return
logger = cls().logger
logger.info(msg, extra={'key_time': context.now})
@classmethod
def warning(cls, context, msg):
print('%s - WARNING - %s'%(context.now, msg))
# 如果还没配置日志,则暂时存储至变量中
if not context.bool_if_first_log:
context.temp_log_list.append(msg)
return
logger = cls().logger
logger.warning(msg, extra={'key_time': context.now})
@classmethod
def error(cls, context, msg):
print('%s - ERROR - %s'%(context.now, msg))
# 如果还没配置日志,则暂时存储至变量中
if not context.bool_if_first_log:
context.temp_log_list.append(msg)
return
logger = cls().logger
logger.error(msg, extra={'key_time': context.now})
@classmethod
def critical(cls, context, msg):
print('%s - CRITICAL - %s'%(context.now, msg))
# 如果还没配置日志,则暂时存储至变量中
if not context.bool_if_first_log:
context.temp_log_list.append(msg)
return
logger = cls().logger
logger.critical(msg, extra={'key_time': context.now})
@classmethod
def detail(cls, context, msg):
detail_logger = cls().detail_logger
detail_logger.error(msg, extra={'key_time': context.now})
|
|