等级: 新手上路
- 注册:
- 2024-7-24
- 曾用名:
|
from PythonApi import *
import pandas as pd
import numpy as np
def init(context):
    """Initialise the per-strategy state kept on ``context``.

    Sets the position flag, entry price, stop level and best-profit tracker
    to zero, and stores the ``mintick`` attribute of the base instrument
    (looked up through ``get_instruments``) as ``context.mindiff``.
    """
    context.position = 0  # 1 for long, -1 for short, 0 for no position
    context.entry_price = 0  # price recorded when the current position was opened
    context.stop_loss = 0  # current stop level for the open position
    context.max_profit = 0  # best per-unit price gain seen since entry
    # Minimum tick of the base instrument; used to size the initial stop.
    context.mindiff = get_instruments(context.run_info.base_book_id).mintick
def convert_ndarray_to_dataframe(data, columns):
    """Wrap array-like bar data in a pandas DataFrame.

    Args:
        data: Array-like accepted by ``pd.DataFrame`` (e.g. the ndarray
            returned by ``history_bars``).
        columns: Column labels to apply to the resulting frame.

    Returns:
        A new ``pd.DataFrame`` built from ``data`` with the given columns.
    """
    return pd.DataFrame(data, columns=columns)
def calculate_macd(data, short_period=12, long_period=26, signal_period=9):
    """Compute the MACD line and its signal line from closing prices.

    Args:
        data: DataFrame with a ``"close"`` column.
        short_period: Span of the fast exponential moving average.
        long_period: Span of the slow exponential moving average.
        signal_period: Span of the EMA applied to the MACD line.

    Returns:
        A ``(macd, signal)`` tuple of Series aligned with ``data``; ``macd``
        is the fast EMA minus the slow EMA, ``signal`` is the EMA of ``macd``.
    """
    close = data["close"]
    # adjust=False selects the recursive EMA form.
    fast_ema = close.ewm(span=short_period, adjust=False).mean()
    slow_ema = close.ewm(span=long_period, adjust=False).mean()
    macd = fast_ema - slow_ema
    signal = macd.ewm(span=signal_period, adjust=False).mean()
    return macd, signal
def calculate_kdj(data, n=9, k_period=3, d_period=3):
    """Add KDJ oscillator columns ``K``, ``D`` and ``J`` to ``data``.

    RSV is the position of the close inside the rolling ``n``-bar
    low/high range, scaled to 0-100. K is an exponentially weighted mean
    of RSV, D the same smoothing applied to K, and ``J = 3K - 2D``.

    Args:
        data: DataFrame with ``"close"``, ``"high"`` and ``"low"`` columns.
            It is modified in place.
        n: Rolling window length for the low/high range.
        k_period: Smoothing period for K (passed to ``ewm`` as ``com = k_period - 1``).
        d_period: Smoothing period for D (passed to ``ewm`` as ``com = d_period - 1``).

    Returns:
        The same ``data`` object, with the three new columns.
    """
    lowest_low = data["low"].rolling(window=n, min_periods=1).min()
    highest_high = data["high"].rolling(window=n, min_periods=1).max()
    price_range = highest_high - lowest_low

    rsv = (data["close"] - lowest_low) / price_range * 100
    # A flat window (highest high == lowest low) makes the ratio 0/0; treat
    # it as a neutral reading of 50 instead of letting NaN/inf flow into
    # K/D/J, where it would silently disable every comparison on J.
    rsv = rsv.where(price_range != 0, 50.0)

    k = rsv.ewm(com=(k_period - 1), min_periods=1).mean()
    d = k.ewm(com=(d_period - 1), min_periods=1).mean()
    j = 3 * k - 2 * d

    data["K"] = k
    data["D"] = d
    data["J"] = j
    return data
def handle_bar(context):
    """Run one step of the strategy for the base instrument.

    Steps, in order:
      1. Fetch 30 one-minute bars (close/high/low) and 31 five-minute bars
         (close) through ``history_bars``; return early if either is missing
         or shorter than requested.
      2. Compute MACD on the five-minute closes, excluding the last
         five-minute bar, and KDJ on the one-minute bars.
      3. If a position is recorded on ``context``, update the best profit and
         the trailing stop, and close the position when the latest one-minute
         close has crossed the stop.
      4. If no position is recorded (including right after a stop-out on this
         same call), test the entry conditions and open a long or short.

    Position state is tracked only through ``context.position``,
    ``context.entry_price``, ``context.stop_loss`` and ``context.max_profit``;
    order fills are not checked here.
    """
    security = context.run_info.base_book_id  # instrument currently being traded

    # Fetch the 1-minute and 5-minute history.
    data_1m_raw = history_bars(security, 30, "1m", ["close", "high", "low"])
    data_5m_raw = history_bars(security, 31, "5m", ["close"])

    # Make sure enough data is available; guard None before calling len().
    if data_1m_raw is None or data_5m_raw is None:
        return
    if len(data_1m_raw) < 30 or len(data_5m_raw) < 31:
        return

    # Convert the ndarrays to DataFrames. The last 5-minute bar is dropped so
    # the MACD comparison below uses the previous 5-minute bar.
    data_1m = convert_ndarray_to_dataframe(data_1m_raw, ["close", "high", "low"])
    data_5m = convert_ndarray_to_dataframe(data_5m_raw[:-1], ["close"])

    # Compute the MACD and KDJ indicators.
    macd_5m, signal_5m = calculate_macd(data_5m)
    data_1m = calculate_kdj(data_1m)

    # Current price: close of the latest 1-minute bar.
    current_price = data_1m["close"].iloc[-1]

    # Account funds and contract multiplier.
    # NOTE(review): neither value is used below; presumably kept for a future
    # position-sizing rule — confirm before removing.
    account_cash = get_account(19)
    contract_size = get_instruments(context.run_info.base_book_id).multipliter

    # Order size (fixed for now; adjust the calculation as needed).
    volume = 10

    # Manage an open position, if any.
    if context.position != 0:
        if context.position == 1:
            current_profit = current_price - context.entry_price
        else:
            current_profit = context.entry_price - current_price
        context.max_profit = max(context.max_profit, current_profit)

        # Trail the stop at 60% of the best profit, but only once the trade
        # has actually been in profit. With max_profit == 0 the trailing
        # level equals the entry price, which would immediately override the
        # initial 5-tick stop set at entry.
        if context.max_profit > 0:
            if context.position == 1:
                context.stop_loss = max(
                    context.stop_loss,
                    context.entry_price + context.max_profit * 0.6,
                )
            else:
                context.stop_loss = min(
                    context.stop_loss,
                    context.entry_price - context.max_profit * 0.6,
                )

        long_stopped = context.position == 1 and current_price < context.stop_loss
        short_stopped = context.position == -1 and current_price > context.stop_loss
        if long_stopped or short_stopped:
            if context.position == 1:
                sell_close(security, "Market", 0, volume, serial_id=1)
                print(f"Sell at {current_price} on {security}")
            else:
                buy_close(security, "Market", 0, volume, serial_id=2)
                print(f"Cover at {current_price} on {security}")
            context.position = 0

    # With no open position, evaluate the entry signals.
    if context.position == 0:
        j_prev = data_1m["J"].iloc[-2]
        j_now = data_1m["J"].iloc[-1]
        # MACD vs. signal on the previous 5-minute bar.
        macd_prev_bar = macd_5m.iloc[-1]
        signal_prev_bar = signal_5m.iloc[-1]

        if macd_prev_bar < signal_prev_bar:
            # Long: J crosses up through 0 by less than 15 points.
            if j_prev < 0 and j_now > 0 and j_now - j_prev < 15:
                buy_open(security, "Market", 0, volume, serial_id=3)
                context.position = 1
                context.entry_price = current_price
                context.stop_loss = current_price - 5 * context.mindiff
                context.max_profit = 0
                print(f"Buy at {current_price} on {security}")
        elif macd_prev_bar > signal_prev_bar:
            # Short: J crosses down through 100 by less than 15 points.
            if j_prev > 100 and j_now < 100 and j_prev - j_now < 15:
                sell_open(security, "Market", 0, volume, serial_id=4)
                context.position = -1
                context.entry_price = current_price
                context.stop_loss = current_price + 5 * context.mindiff
                context.max_profit = 0
                print(f"Short at {current_price} on {security}")
这是整个回测的代码。
|