等级: 免费版
- 注册:
- 2023-9-29
- 曾用名:
|

楼主 |
发表于 2025-7-3 11:38
|
显示全部楼层
我的测试代码根据时间节点来确定传递给 history_bars_date 函数的 start_date、end_date 参数。
时间节点信息的获取函数位于自有的 jzt_fuLegalStamp 包中，交易日历类 TradeCalendar 位于自有的 jzt_trade_calendar 包中。
由于这些包的代码较长，如果你们有自己的时间节点信息获取函数，可自行替代；如需要我提供这些包的代码，也可以。
如下是我的测试代码:
##############################################################################
# test_history_bars_date
##############################################################################
import time
import datetime as DateTime
from jzt_trade_calendar import *
from jzt_fuLegalStamp import *
from PythonApi import *
class GlobalCls:
    """Mutable holder for state shared between the strategy callbacks.

    Further attributes are attached dynamically elsewhere in this file
    (``init`` assigns ``context``, ``trade_calendar`` and the
    ``fu_*tp_man`` managers).
    """

    def __init__(self):
        # The constructor was spelled ``_init_``, which Python never calls,
        # so ``name`` was never set on instances.
        self.name = 'global var'


# Single shared instance used by the callbacks and helpers in this file.
g = GlobalCls()
def IsPaperTrading(context):
    """Return True when ``context.run_info.run_type`` is ``'paper_trading'``.

    Args:
        context: Strategy context object exposing ``run_info.run_type``.

    Returns:
        bool: Result of comparing the run type with ``'paper_trading'``.
    """
    return context.run_info.run_type == 'paper_trading'
#----------------------------------------
def init(context):
    """Initialise the strategy: calendar, timepoint managers and universe.

    Stores ``context`` on the shared ``g`` object, logs the run frequency and
    base book id, builds a trading calendar around the run window, creates
    the 1/5/30-minute timepoint managers and sets ``context.universe``.

    Args:
        context: Strategy context; ``run_info`` is read and
            ``tradedate_lst`` / ``universe`` are written on it.
    """
    g.context = context
    Print(f"context.run_info.frequency:{context.run_info.frequency}")
    Print(f"context.run_info.base_book_id:{context.run_info.base_book_id}")

    # -------------------------------------------------------------
    # Trading calendar
    # -------------------------------------------------------------
    earlyest_market_date = DateTime.datetime.strptime('19930915', '%Y%m%d')
    if context.run_info.run_type == 'backtest':
        start_date = context.run_info.start_date
    else:
        start_date = GetBasebookCurstamp()

    # Look back one year from the start date.
    tmp_beg_dt = start_date - DateTime.timedelta(days=365 * 1)
    # NOTE(review): only the year is compared, so a date earlier in 1993 than
    # 09-15 is not clamped -- confirm whether that is intended.
    if tmp_beg_dt.year < earlyest_market_date.year:
        tmp_beg_dt = earlyest_market_date
    # Extend 365*5 days past the configured end date.
    tmp_end_dt = context.run_info.end_date + DateTime.timedelta(days=365 * 5)

    # Author's note: get_trading_dates only returns the historical trading
    # days of the contract used in the backtest/live run, as a list of
    # datetime.datetime values.
    context.tradedate_lst = get_trading_dates(tmp_beg_dt, tmp_end_dt)
    #Print(f"get_trading_dates({tmp_beg_dt},{tmp_end_dt}) ret list. list[0]:{context.tradedate_lst[0]} list[-1]:{context.tradedate_lst[-1]}")
    # The earliest returned date must not be much later than the requested start:
    #assert( context.tradedate_lst[0]-tmp_beg_dt < DateTime.timedelta(days=20) )
    g.trade_calendar = TradeCalendar(context.tradedate_lst)

    # -------------------------------------------------------------
    # Timepoint managers for 1-, 5- and 30-minute bars
    # -------------------------------------------------------------
    g.fu_1mtp_man = FuXmTimepointMan(1)
    g.fu_5mtp_man = FuXmTimepointMan(5)
    g.fu_30mtp_man = FuXmTimepointMan(30)

    # -------------------------------------------------------------
    # Universe: the full list is kept for reference but disabled, exactly as
    # the original ``if 0:`` switch did.
    # -------------------------------------------------------------
    use_full_universe = False
    if use_full_universe:
        context.universe = [
            'INSC0000',
            'SQHC00',
            'SQRU00',
            'SQSP00',
            'SQAU00',
            'INNR00',
            'DQV00',
            'ZQTA00',
            'ZQFG00',
            'DQM00',
            'DQJ00',
            'SQSN00',
            'ZQRM00',
            'ZQCF00',
            'DQEB00',
            'SQRB00',
            'SQPB00',
            'SQAG00',
            'DQPG00',
            'DQB00',
            'SQCU00',
            'DQJD00',
            'DQY00',
            'DQI00',
            'SQNI00',
            'DQP00',
            'ZQUR00',
            'ZQSA00',
            'ZQSF00',
            'SRX00',  # 'ZQSRX00',
            'AX00',  # 'DQAX00',
            'ZQAP00',
            'ZQMA00',
            'DQJM00',
        ]
    else:
        context.universe = ['ZQSM00']
    Print("init finish")
# before_trading此函数会在每天基准合约的策略交易开始前被调用,当天只会被调用一次。--(选择实现)
def before_trading(context):
    """Pre-trading hook; intentionally does nothing in this test script.

    Args:
        context: Strategy context (unused).
    """
    pass
# 你选择的品种的数据更新将会触发此段逻辑,例如日或分钟历史数据切片或者是实时数据切片更新。--(必须实现)
def handle_bar(context):
    """Check that ``history_bars_date`` returns the current 30m bar.

    Runs only when the current base-book timestamp falls exactly on a
    5-minute timepoint. For every symbol in ``context.universe`` whose latest
    bar at the run frequency is stamped with the current time, the bars
    between the previous and the current 30-minute timepoint are fetched and
    the stamp of the last one is compared with the current 30-minute
    timepoint.

    Args:
        context: Strategy context; ``universe`` and ``run_info.frequency``
            are read.

    Raises:
        AssertionError: If the last 30m bar's stamp differs from the current
            30m timepoint, or no timepoint manager exists for the frequency.
    """
    #symbol = 'DQPG00'
    HISTORY_ITEMS_LIST = ['datetime', 'close']
    STAMP_INDEX = 0  # column of 'datetime' within HISTORY_ITEMS_LIST

    target_dt = GetBasebookCurstamp()
    if target_dt != GetFuTimepoint('5m', target_dt):
        return

    is_all_symbols_bars_ok = True
    for symbol in context.universe:
        base_bars = history_bars(symbol, 1, context.run_info.frequency,
                                 HISTORY_ITEMS_LIST, include_now=True)
        # Skip symbols that are not trading right now, i.e. have no bar
        # stamped with the current time. ``len()`` is kept on purpose: the
        # result is indexed like a 2-D array, so plain truthiness may not work.
        if not (len(base_bars)
                and PyramidBarStampToDatetime(base_bars[-1, STAMP_INDEX]) == target_dt):
            continue

        # Fetch the 30m bar that the current time belongs to.
        freq = '30m'
        end_dt = GetFuTimepoint(freq, target_dt)
        pre_tp = GetFuPrexTimepoint(freq, target_dt)
        bars_help = history_bars_date(symbol, pre_tp, end_dt, freq,
                                      HISTORY_ITEMS_LIST, include_now=True)
        if not len(bars_help):
            Print(f"Warning: history_bars_date({symbol}, {pre_tp}, {end_dt}, {freq}, {HISTORY_ITEMS_LIST}, include_now=True) ret bars len:{len(bars_help)}")
            is_all_symbols_bars_ok = False
            continue

        fu_xmtp_man = GetFuXmtpMan(freq)
        if fu_xmtp_man is None:
            raise AssertionError(f"no timepoint manager for freq {freq!r}")
        latest_bar_stamp = PyramidBarStampToDatetime(bars_help[-1, STAMP_INDEX])
        cur_timepoint = fu_xmtp_man.GetDateTimepoint_bydt(target_dt)
        if latest_bar_stamp == cur_timepoint:
            #Print(f"{symbol} {freq} latest_bar_stamp:{latest_bar_stamp} == cur_timepoint:{cur_timepoint} ")
            continue

        is_all_symbols_bars_ok = False
        Print(f"history_bars_date({symbol}, {pre_tp}, {end_dt}, {freq}, {HISTORY_ITEMS_LIST}, include_now=True) ret bars len:{len(bars_help)}")
        info_comp = '>' if latest_bar_stamp > cur_timepoint else '<'
        Print(f"Warning: {symbol} {freq} latest_bar_stamp:{latest_bar_stamp} {info_comp} cur_timepoint:{cur_timepoint} !")
        # Explicit raise instead of ``assert(False)`` so the check still
        # stops the run when Python is started with -O.
        raise AssertionError(
            f"{symbol} {freq} latest bar stamp does not match the current timepoint")

    # ``is_all_symbols_bars_ok`` is only a debugging aid; nothing acts on it.
    #Print(f"is_all_symbols_bars_ok:{is_all_symbols_bars_ok}")
# after_trading函数会在每天交易结束后被调用,当天只会被调用一次。 --(选择实现)
def after_trading(context):
    """Post-trading hook; intentionally does nothing in this test script.

    Args:
        context: Strategy context (unused).
    """
    pass
def exit(context):
    """Exit hook: print a fixed ``on exit`` marker to stdout.

    Args:
        context: Strategy context (unused).
    """
    # Plain literal: the original f-string had no placeholders.
    print("on exit")
def Print(content):
    """Print ``content`` prefixed with a ``[YYYYmmdd HH:MM:SS]`` time tag.

    The timestamp comes from ``GetDatetimeForLogTag()``.

    Args:
        content: Any object; it is formatted with ``str`` semantics.
    """
    dt_str = GetDatetimeForLogTag().strftime('%Y%m%d %H:%M:%S')
    # Author's note: printing is itself a time-consuming operation.
    print(f"[{dt_str}] {content}")
###########################################################
# 时间节点相关 250620
###########################################################
def GetFuXmtpMan(freq):
    """Return the timepoint manager stored on ``g`` for ``freq``.

    Args:
        freq: Frequency identifier, compared against ``FREQ_1M``,
            ``FREQ_5M`` and ``FREQ_30M``.

    Returns:
        The matching manager object, or ``None`` for any other frequency.
    """
    if freq == FREQ_1M:
        return g.fu_1mtp_man
    if freq == FREQ_5M:
        return g.fu_5mtp_man
    if freq == FREQ_30M:
        return g.fu_30mtp_man
    return None
def GetFuPrexTimepoint(freq: str, target_dt: DateTime.datetime):
    """Return the timepoint preceding ``target_dt`` at frequency ``freq``.

    Delegates to ``GetPreDateTimepoint_bydt`` of the manager for ``freq``,
    passing the shared trade calendar ``g.trade_calendar``.

    Args:
        freq: Frequency identifier understood by ``GetFuXmtpMan``.
        target_dt: Reference datetime.

    Returns:
        Whatever the manager's ``GetPreDateTimepoint_bydt`` returns.

    Raises:
        AssertionError: If no manager exists for ``freq``.
    """
    fu_xmtp_man = GetFuXmtpMan(freq)
    if fu_xmtp_man is None:
        # Same exception type as the original bare assert, but with a
        # message and not stripped under -O.
        raise AssertionError(f"no timepoint manager for freq {freq!r}")
    return fu_xmtp_man.GetPreDateTimepoint_bydt(target_dt, g.trade_calendar)
def GetFuTimepoint(freq: str, target_dt: DateTime.datetime):
    """Return the ``freq`` timepoint that ``target_dt`` maps to.

    Delegates to ``GetDateTimepoint_bydt`` of the manager for ``freq``.

    Args:
        freq: Frequency identifier understood by ``GetFuXmtpMan``.
        target_dt: Reference datetime.

    Returns:
        Whatever the manager's ``GetDateTimepoint_bydt`` returns.

    Raises:
        AssertionError: If no manager exists for ``freq``.
    """
    fu_xmtp_man = GetFuXmtpMan(freq)
    if fu_xmtp_man is None:
        # Same exception type as the original bare assert, but with a
        # message and not stripped under -O.
        raise AssertionError(f"no timepoint manager for freq {freq!r}")
    return fu_xmtp_man.GetDateTimepoint_bydt(target_dt)
#####################################################
# 时间相关
#####################################################
def PyramidBarStampToDatetime(val):
    """Convert a numeric ``YYYYMMDDhhmmss`` bar stamp to a ``datetime``.

    Args:
        val: Stamp whose decimal digits read ``YYYYMMDDhhmmss``; anything
            accepted by ``int()`` (int, float, numeric string).

    Returns:
        datetime.datetime: Naive datetime with ``microsecond == 0``.

    Raises:
        ValueError: If the decoded fields do not form a valid date/time.
    """
    #print("PyramidBarStampToDatetime val:{}".format(val))
    remaining = int(val)
    # Peel two decimal digits at a time from the right: ss, mm, hh, DD, MM.
    remaining, second = divmod(remaining, 100)
    remaining, minute = divmod(remaining, 100)
    remaining, hour = divmod(remaining, 100)
    remaining, day = divmod(remaining, 100)
    year, month = divmod(remaining, 100)
    return DateTime.datetime(year=year, month=month, day=day,
                             hour=hour, minute=minute, second=second,
                             microsecond=0)
#现实时间
def GetActualDateTime():
    """Return the real wall-clock time as a naive local ``datetime``."""
    return DateTime.datetime.now()
def GetBasebookCurstamp():
    """Return ``g.context.now``, the current time reported by the strategy context.

    Requires ``g.context`` to have been set (``init`` assigns it).
    """
    return g.context.now
def GetDatetimeForLogTag():
    """Return the timestamp used to tag log lines.

    Returns:
        The real wall-clock time in paper trading, otherwise the
        context's current time (``GetBasebookCurstamp()``).
    """
    if IsPaperTrading(g.context):
        return GetActualDateTime()
    return GetBasebookCurstamp()
#根据程序运行模式获取当前时间(可以是模拟时间) PS:包含秒
def GetCurrentDateTime():
    """Return the current time for the active run mode, truncated to seconds.

    Returns:
        The wall-clock time in paper trading, otherwise the context's
        current time; in both cases with ``microsecond`` set to 0.
    """
    # Reuses the sibling helpers so the mode switch matches
    # GetDatetimeForLogTag; the stray trailing "|" from the paste is removed.
    if IsPaperTrading(g.context):
        return GetActualDateTime().replace(microsecond=0)
    return GetBasebookCurstamp().replace(microsecond=0)
|