第三部分 实现简单的量化框架

框架内容:

  • 开始时间、结束时间、现金、持仓数据
  • 获取历史数据
  • 交易函数
  • 计算并绘制收益曲线
  • 回测主体框架
  • 计算各项指标
  • 用户待写代码:初始化、每日处理函数

第四部分 在线平台与量化投资

本节内容:

  • 第一个简单的策略(了解平台)
  • 双均线策略
  • 因子选股策略
  • 多因子选股策略
  • 小市值策略
  • 海龟交易法则
  • 均值回归策略
  • 动量策略
  • 反转策略
  • 羊驼交易法则
  • PEG策略
  • 鳄鱼交易法则

JoinQuant平台

  • 主要框架

    • initialize
    • handle_data
    • ……
  • 获取历史数据
  • 交易函数
  • 回测频率:
    • 按天回测
    • 按分钟回测
  • 风险指标

双均线策略

  • 均线:对于每一个交易日,都可以计算出前N天的移动平均值,然后把这些移动平均值连起来,成为一条线,就叫做N日移动平均线。
  • 移动平均线常用线有5天、10天、30天、60天、120天和240天的指标。 5天和10天的是短线操作的参照指标,称做日均线指标; 30天和60天的是中期均线指标,称做季均线指标; 120天、240天的是长期均线指标,称做年均线指标。
  • 金叉:短期均线上穿长期均线
  • 死叉:短期均线下穿长期均线
 # 导入函数库
import jqdata # 初始化函数,设定基准等等
def initialize(context):
set_benchmark('000300.XSHG')
set_option('use_real_price', True)
set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=), type='stock') g.security = ['601318.XSHG']
g.p1 =
g.p2 = def handle_data(context, data):
cash = context.portfolio.available_cash
for stock in g.security:
hist = attribute_history(stock, g.p2)
ma60 = hist['close'].mean()
ma5 = hist['close'][-:].mean()
if ma5 > ma60 and stock not in context.portfolio.positions:
order_value(stock, cash/len(g.security))
elif ma5 < ma60 and stock in context.portfolio.positions:
order_target(stock,

双均线代码

因子选股策略

  • 因子:

    • 标准 增长率,市值,ROE,……
  • 选股策略:
    • 选取该因子最大(或最小)的N只股票持仓
  • 多因子选股:如何同时考虑多个因子?
 # 导入函数库
import jqdata # 初始化函数,设定基准等等
def initialize(context):
set_benchmark('000300.XSHG')
set_option('use_real_price', True)
set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=), type='stock')
g.N=
g.days = #获取成分股 def handle_data(context, data):
g.days+=
if g.days % == :
g.security = get_index_stocks('000300.XSHG')
df = get_fundamentals(query(valuation).filter(valuation.code.in_(g.security)))
df = df.sort(columns='market_cap')
df = df.iloc[:g.N,:]
tohold = df['code'].values for stock in context.portfolio.positions:
if stock not in tohold:
order_target(stock, ) tobuy = [stock for stock in tohold if stock not in context.portfolio.positions] if len(tobuy)>:
cash = context.portfolio.available_cash
cash_every_stock = cash / len(tobuy)
for stock in tobuy:
order_value(stock, cash_every_stock)

因子选股策略

均值回归理论

 import jqdata
import math
import numpy as np
import pandas as pd def initialize(context):
set_option('use_real_price', True)
set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=), type='stock') g.benchmark = '000300.XSHG'
set_benchmark(g.benchmark) g.ma_days =
g.stock_num = run_monthly(handle, )
#run_monthly(handle, ) def handle(context):
tohold = get_hold_list(context)
for stock in context.portfolio.positions:
if stock not in tohold:
order_target_value(stock, ) tobuy = [stock for stock in tohold if stock not in context.portfolio.positions] if len(tobuy)>:
cash = context.portfolio.available_cash
cash_every_stock = cash / len(tobuy) for stock in tobuy:
order_value(stock, cash_every_stock) def get_hold_list(context):
stock_pool = get_index_stocks(g.benchmark)
stock_score = pd.Series(index=stock_pool)
for stock in stock_pool:
df = attribute_history(stock, g.ma_days, '1d', ['close'])
ma = df.mean()[]
current_price = get_current_data()[stock].day_open
ratio = (ma - current_price) / ma
stock_score[stock] = ratio
return stock_score.nlargest(g.stock_num).index.value

均值回归策略

  • 均值回归:“跌下去的迟早要涨上来”
  • 均值回归的理论基于以下观测:价格的波动一般会以它的均线为中心。也就是说,当标的价格由于波动而偏离移动均线时,它将调整并重新归于均线。
  • 偏离程度:(MA-P)/MA
  • 策略:在每个调仓日进行(每月调一次仓)
    • 计算池内股票的N日移动均线;
    • 计算池内所有股票价格与均线的偏离度;
    • 选取偏离度最高的num_stocks支股票并进行调仓。

day33 Python与金融量化分析(三)-LMLPHP

布林带策略

  • 布林带/布林线/保利加通道(Bollinger Band):由三条轨道线组成,其中上下两条线分别可以看成是价格的压力线和支撑线,在两条线之间是一条价格平均线。
  • 计算公式:
    • 中间线=20日均线
    • up线=20日均线+N*SD(20日收盘价)
    • down线=20日均线-N*SD(20日收盘价)
 import talib
#import numpy as np
#import pandas as pd def initialize(context):
set_option('use_real_price', True)
set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=), type='stock')
set_benchmark('000300.XSHG') g.security = ['600036.XSHG']#,'601328.XSHG','600196.XSHG','600010.XSHG']
g.N = # 初始化此策略
def handle_data(context, data):
cash = context.portfolio.cash / len(g.security)
for stock in g.security:
df = attribute_history(stock, )
middle = df['close'].mean()
upper = df['close'].mean() + g.N * df['close'].std()
lower = df['close'].mean() - g.N * df['close'].std() current_price = get_current_data()[stock].day_open
# 当价格突破阻力线upper时,且拥有的股票数量>=0时,卖出所有股票
if current_price >= upper and stock in context.portfolio.positions:
order_target(stock, )
# 当价格跌破支撑线lower时, 且拥有的股票数量<=0时,则全仓买入
elif current_price <= lower and stock not in context.portfolio.positions:
order_value(stock, cash)

布林带策略

PEG策略

  • 彼得·林奇:任何一家公司股票如果定价合理的话,市盈率就会与收益增长率相等。
  • 每股收益(EPS)
  • 股价(P)
  • 市盈率(PE)= P/EPS
  • 收益增长率(G)= (EPSi – EPSi-1)/ EPSi-1
  • PEG = PE / G / 100
  • PEG越低,代表股价被低估的可能性越大,股价会涨的可能性越大。
  • PEG是一个综合指标,既考察价值,又兼顾成长性。PEG估值法适合应用于成长型的公司。
  • 注意:过滤掉市盈率或收益增长率为负的情况
 def initialize(context):
set_benchmark('000300.XSHG')
set_option('use_real_price', True)
set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=), type='stock') g.security = get_index_stocks('000300.XSHG')
g.days =
g.N = def handle_data(context, data):
g.days+=
if g.days%!=:
return
df = get_fundamentals(query(valuation.code, valuation.pe_ratio, indicator.inc_net_profit_year_on_year).filter(valuation.code.in_(g.security)))
df = df[(df['pe_ratio']>)&(df['inc_net_profit_year_on_year']>)]
df['PEG'] = df['pe_ratio']/df['inc_net_profit_year_on_year']/
df = df.sort(columns='PEG')[:g.N]
tohold = df['code'].values for stock in context.portfolio.positions:
if stock not in tohold:
order_target_value(stock, ) tobuy = [stock for stock in tohold if stock not in context.portfolio.positions] if len(tobuy)>:
print('Buying')
cash = context.portfolio.available_cash
cash_every_stock = cash / len(tobuy) for stock in tobuy:
order_value(stock, cash_every_stock)

PEG

羊驼交易法则

  • 起始时随机买入N只股票,每天卖掉收益率最差的M只,再随机买入剩余股票池的M只。
 import jqdata
import pandas as pd def initialize(context):
set_benchmark('000300.XSHG')
set_option('use_real_price', True)
set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=), type='stock') g.security = get_index_stocks('000300.XSHG')
g.period =
g.N =
g.change =
g.init = True stocks = get_sorted_stocks(context, g.security)[:g.N]
cash = context.portfolio.available_cash * 0.9 / len(stocks)
for stock in stocks:
order_value(stock, cash)
run_monthly(handle, ) def get_sorted_stocks(context, stocks):
df = history(g.period, field='close', security_list=stocks).T
df['ret'] = (df.iloc[:,-] - df.iloc[:,]) / df.iloc[:,]
df = df.sort(columns='ret', ascending=False)
return df.index.values def handle(context):
if g.init:
g.init = False
return
stocks = get_sorted_stocks(context, context.portfolio.positions.keys()) for stock in stocks[-g.change:]:
order_target(stock, ) stocks = get_sorted_stocks(context, g.security) for stock in stocks:
if len(context.portfolio.positions) >= g.N:
break
if stock not in context.portfolio.positions:
order_value(stock, context.portfolio.available_cash * 0.9)

羊驼交易

海龟交易法则

  • 唐奇安通道:

    • 上线=Max(前N个交易日的最高价)
    • 下线=Min(前N个交易日的最低价)
    • 中线=(上线+下线)/2

day33 Python与金融量化分析(三)-LMLPHP

day33 Python与金融量化分析(三)-LMLPHP

分钟回测

  • 入市:若当前价格高于过去20日的最高价,则买入一个Unit
  • 加仓:若股价在上一次买入(或加仓)的基础上上涨了0.5N,则加仓一个Unit
  • 止盈:当股价跌破10日内最低价时(10日唐奇安通道下沿),清空头寸
  • 止损:当价格比最后一次买入价格下跌2N时,则卖出全部头寸止损(损失不会超过2%)
 import jqdata
import math
import numpy as np
import pandas as pd
from collections import deque def initialize(context): set_option('use_real_price', True)
set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=), type='stock') g.security = '000060.XSHE'
set_benchmark(g.security)
g.in_day =
g.out_day =
g.today_units =
g.current_units =
g.N=deque(maxlen=)
g.current_N =
g.last_buy_price = price = attribute_history(g.security, g.N.maxlen*+, '1d', ('high', 'low', 'close')) for i in range(g.N.maxlen+, g.N.maxlen*+):
li = []
for j in range(i-,i+):
a = price['high'][j]-price['low'][j]
b = abs(price['high'][j]-price['close'][j-])
c = abs(price['low'][j]-price['close'][j-])
li.append(max(a,b,c))
current_N = np.array(li).mean()
g.N.append(current_N) def before_trading_start(context):
g.current_N = cal_N()
g.today_units = def handle_data(context, data):
dt = context.current_dt
current_price = data[g.security].price #上一分钟价格
value = context.portfolio.total_value
cash = context.portfolio.available_cash unit = math.floor(value * 0.01 / g.current_N) if g.current_units == :
buy(current_price, cash, unit)
else:
if stop_loss(current_price):
return
if sell(current_price):
return
addin(current_price, cash, unit) def cal_N():
# if len(g.N) < g.N.maxlen:
# price = attribute_history(g.security, g.N.maxlen+, '1d', ('high', 'low', 'close'))
# li = []
# for i in range(, g.N.maxlen+):
# a = price['high'][i]-price['low'][i]
# b = abs(price['high'][i]-price['close'][i-])
# c = abs(price['low'][i]-price['close'][i-])
# li.append(max(a,b,c))
# current_N = np.array(li).mean()
# else:
price = attribute_history(g.security, , '1d', ('high', 'low', 'close'))
a = price['high'][]-price['low'][]
b = abs(price['high'][]-price['close'][])
c = abs(price['low'][]-price['close'][])
current_N = (max(a,b,c) + np.array(g.N).sum())/(g.N.maxlen+)
g.N.append(current_N)
return current_N def buy(current_price, cash, unit):
price = attribute_history(g.security, g.in_day, '1d', ('high',))
if current_price > max(price['high']):
shares = cash / current_price
if shares >= unit:
print("buying %d" % unit)
o = order(g.security, unit)
g.last_buy_price = o.price
g.current_units +=
g.today_units +=
return True
return False def addin(current_price, cash, unit):
if current_price >= g.last_buy_price + 0.5 * g.current_N:
shares = cash / current_price
if shares >= unit:
print("adding %d" % unit)
o = order(g.security, unit)
g.last_buy_price = o.price
g.current_units +=
g.today_units +=
return True
return False def sell(current_price):
price = attribute_history(g.security, g.out_day, '1d', ('low',))
if current_price < min(price['low']):
print("selling")
order_target(g.security, )
g.current_units = g.today_units
return True
return False def stop_loss(current_price):
if current_price < g.last_buy_price - * g.current_N:
print("stop loss")
order_target(g.security, )
g.current_units = g.today_units
return True
return False

海龟交易法则

鳄鱼法则交易系统

https://www.joinquant.com/post/595?tag=new

 # 导入函数库
import jqdata
import numpy as np # 初始化函数,设定基准等等
def initialize(context):
set_option('use_real_price', True)
set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=), type='stock')
set_benchmark('000300.XSHG') g.up_price = {} #向上碎形最高价
g.low_price = {} #向下碎形最低价
g.up_fractal_exists = {} #判断有效向上碎形
g.down_fractal_exists = {} #判断有效向下碎形
g.AO_index = {} #存放连续的AO指标数据
g.cal_AC_index = {} #计算AC指标中转存储
g.AC_index = {} #存放连续的AC指标数据
g.amount = {} #满仓仓位
g.stock = get_index_stocks('000300.XSHG')
g.buy_stock = []
g.month = context.current_dt.month
run_monthly(select_universe,,'open') #重置全局变量
def reset_global():
g.up_price = {} #向上碎形最高价
g.low_price = {} #向下碎形最低价
g.up_fractal_exists = {} #判断有效向上碎形
g.down_fractal_exists = {} #判断有效向下碎形
g.AO_index = {} #存放连续的AO指标数据
g.cal_AC_index = {} #计算AC指标中转存储
g.AC_index = {} #存放连续的AC指标数据
g.amount = {} #满仓仓位
g.buy_stock = [] def initial_stock_global(stock):
g.up_price[stock] =
g.low_price[stock] =
g.up_fractal_exists[stock] = False
g.down_fractal_exists[stock] = False #判断有效向下碎形
g.AO_index[stock] = [] #存放连续的AO指标数据
g.cal_AC_index[stock] = [] #计算AC指标中转存储
g.AC_index[stock] = [] #存放连续的AC指标数据
g.amount[stock] = #满仓仓位 #轮换选股后清空持仓
def reset_position(context):
for stock in g.buy_stock:
order_target(stock,)
log.info("sell %s for reset position"%stock) #选股
def select_universe(context):
#每三个月操作一次
month = context.current_dt.month
if month% != g.month%:
return
#清空全局变量
reset_position(context)
reset_global()
hist = history(,'1d','close',g.stock,df = False)
for stock in g.stock:
if is_sleeping_alligator(stock,hist,):
g.buy_stock.append(stock)
#初始化该股票全局变量
initial_stock_global(stock)
print g.buy_stock
return None #睡着的鳄鱼
def is_sleeping_alligator(stock,hist,nday):
for i in range(nday):
if is_struggle(stock,hist,i) == False:
return False
return True #均线纠缠,BRG三线非常接近
def is_struggle(stock,hist,delta):
blue_line = hist[stock][--delta:--delta].mean()
red_line = hist[stock][--delta:--delta].mean()
green_line = hist[stock][--delta:--delta].mean()
if abs(blue_line/red_line-)<0.02 and abs(red_line/green_line-)<0.02:
return True
else:
return False #判断 向上 或 向下 碎形
def is_fractal(stock,direction):
hist = attribute_history(stock, , fields=[direction])
if direction == 'high':
if np.all(hist.iloc[:] < hist.iloc[]) and np.all(hist.iloc[:] < hist.iloc[]):
g.up_price[stock] = hist.iloc[].values
return True
elif direction == 'low':
if np.all(hist.iloc[:] > hist.iloc[]) and np.all(hist.iloc[:] > hist.iloc[]):
g.low_price[stock] = hist.iloc[].values
return True
return False #通过比较碎形与红线位置,判断碎形是否有效
def is_effective_fractal(stock, direction):
if is_fractal(stock,direction):
hist = attribute_history(stock, )
red_line = hist['close'][:-].mean()
close_price = hist['close'][-]
if direction == 'high':
if close_price > red_line:
g.up_fractal_exists[stock] = True
else:
g.up_fractal_exists[stock] = False
elif direction == 'low':
if close_price < red_line:
g.down_fractal_exists[stock] = True
else:
g.down_fractal_exists[stock] = False #N日内最高价格的N日线
def nday_high_point(stock,n):
hist = history(*n,'1d','high',[stock],df = False)[stock]
high_point = []
for i in range(n):
high_point.append(max(hist[--i:--i]))
return np.array(high_point).mean() #N日内最低价格的N日线
def nday_low_point(stock,n):
hist = history(*n,'1d','low',[stock],df = False)[stock]
low_point = []
for i in range(n):
low_point.append(max(hist[--i:--i]))
return np.array(low_point).mean() #AO=5日内(最高-最低)/2的5日移动平均-34日内(最高-最低)/2的34日移动平均
def AO_index(stock):
g.AO_index[stock].append(nday_high_point(stock,)/ + nday_low_point(stock,)/\
- nday_high_point(stock,)/ - nday_low_point(stock,)/)
return None #AO-AO的5日平均值的5日平均
def AC_index(stock):
AO_index(stock)
if len(g.AO_index[stock]) >= :
g.cal_AC_index[stock].append(g.AO_index[stock][-] - np.array(g.AO_index[stock][-:]).mean())
if len(g.cal_AC_index[stock]) >=:
g.AC_index[stock].append(np.array(g.cal_AC_index[stock][-:]).mean()) #判断序列n日上行
def is_up_going(alist,n):
if len(alist) < n:
return False
for i in range(n-):
if alist[-(+i)] <= alist[-(+i)]:
return False
return True #判断序列n日下行
def is_down_going(alist,n):
if len(alist) < n:
return False
for i in range(n-):
if alist[-(+i)] >= alist[-(+i)]:
return False
return True #碎形被突破
def active_fractal(stock,direction):
close_price = history(,'1d','close',[stock],df=False)[stock][]
if direction == 'up' and close_price > g.up_price[stock]:
return True
elif direction == 'down' and close_price < g.low_price[stock]:
return True
return False #进场,初始仓位
def set_initial_position(stock,context):
close_price = history(,'1d','close',[stock],df=False)[stock][]
g.amount[stock] = context.portfolio.cash/close_price/len(g.buy_stock)*
order(stock, g.amount[stock])
log.info("buying %s 股数为 %s"%(stock,g.amount[stock]))
g.down_fractal_exists[stock] = False #卖出
def sell_all_stock(stock,context):
order_target(stock,)
log.info("selling %s"%stock)
g.up_fractal_exists[stock] = False #加仓
def adjust_position(stock,context,position):
order(stock,g.amount[stock]*position)
log.info("adjust position buying %s 股数为 %s"%(stock,g.amount[stock]*position)) # 计算股票前n日收益率
def security_return(days,security_code):
hist1 = attribute_history(security_code, days + , '1d', 'close',df=False)
security_returns = (hist1['close'][-]-hist1['close'][])/hist1['close'][]
return security_returns # 止损,根据前n日收益率
def conduct_nday_stoploss(context,security_code,days,bench):
if security_return(days,security_code)<= bench:
for stock in g.buy_stock:
order_target_value(stock,)
log.info("Sell %s for stoploss" %stock)
return True
else:
return False # 计算股票累计收益率(从建仓至今)
def security_accumulate_return(context,data,stock):
current_price = data[stock].price
cost = context.portfolio.positions[stock].avg_cost
if cost != :
return (current_price-cost)/cost
else:
return None # 个股止损,根据累计收益
def conduct_accumulate_stoploss(context,data,stock,bench):
if security_accumulate_return(context,data,stock) != None\
and security_accumulate_return(context,data,stock) < bench:
order_target_value(stock,)
log.info("Sell %s for stoploss" %stock)
return True
else:
return False # 个股止盈,根据累计收益
def conduct_accumulate_stopwin(context,data,stock,bench):
if security_accumulate_return(context,data,stock) != None\
and security_accumulate_return(context,data,stock) > bench:
order_target_value(stock,)
log.info("Sell %s for stopwin" %stock)
return True
else:
return False def handle_data(context,data):
#大盘止损
if conduct_nday_stoploss(context,'000300.XSHG',,-0.03):
return
for stock in g.buy_stock:
#个股止损
if conduct_accumulate_stopwin(context,data,stock,0.3)\
or conduct_accumulate_stoploss(context,data,stock,-0.1):
return
#计算AO,AC指标
AC_index(stock)
#空仓时,寻找机会入场
if context.portfolio.positions[stock].amount == :
#计算向上碎形
is_effective_fractal(stock,'high')
#有效向上碎形存在,并被突破,买入
if g.up_fractal_exists and active_fractal(stock,'up'):
close_price = history(, '1d', 'close', [stock],df = False)
if is_up_going(g.AO_index[stock],)\
and is_up_going(g.AC_index[stock],)\
and is_up_going(close_price[stock],):
set_initial_position(stock,context)
#有持仓时,加仓或离场
else:
#计算向下碎形
is_effective_fractal(stock,'low')
#出场条件1:有效向下碎形存在,并被突破,卖出
if g.down_fractal_exists and active_fractal(stock,'down'):
sell_all_stock(stock,context)
return

鳄鱼交易法

05-04 04:56