# -*- coding: utf-8 -*-
"""
File Name: factor_factory_py
Author : k0180110
Modify Date: 2019-11-13
"""
import numpy as np
import pandas as pd

try:
    from sklearn.linear_model import LinearRegression
except ImportError:  # scikit-learn is only needed by reg_beta / reg_sigma
    LinearRegression = None


class Func:
    """Primitive operators used to assemble price/volume factor formulas.

    Most helpers are thin wrappers around pandas/numpy calls and accept
    whatever those calls accept.  ``fama`` aggregates with ``axis=1``, i.e.
    it treats rows as observations in time and columns as instruments.
    None of the helpers modifies its arguments.
    """

    # ------------------------------------------------------------------
    # internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _fill_gaps(df):
        """Return a copy of ``df`` forward-filled, back-filled, then zero-filled."""
        return df.ffill().bfill().fillna(0)

    def _fit_line(self, x_df, y_df):
        """Fit ``y = a + b * x`` by ordinary least squares (intercept included).

        Returns the fitted model together with the column-vector inputs.

        Raises:
            ImportError: if scikit-learn is not installed.
        """
        if LinearRegression is None:
            raise ImportError("scikit-learn is required for reg_beta/reg_sigma")
        x_train = np.array(x_df).reshape(-1, 1)
        y_train = np.array(y_df).reshape(-1, 1)
        model = LinearRegression()
        model.fit(x_train, y_train)
        return model, x_train, y_train

    # ------------------------------------------------------------------
    # price-range building blocks
    # ------------------------------------------------------------------
    def dtm(self, open, high):  # noqa: A002 - parameter name is part of the API
        """Combine ``high``/``open`` on the observations where ``open`` rose.

        ``open`` is masked to the entries whose one-step difference is
        positive; the result is the element-wise maximum of
        ``(high - open) * masked_open`` and ``open * masked_open.diff()``.

        NOTE(review): this does not obviously match the textbook DTM
        definition; the formula is kept exactly as it was written.
        """
        rising = open[open.diff(1) > 0]
        return self.max_df(high * rising - open * rising, open * rising.diff(1))

    def dbm(self, open, low):  # noqa: A002 - parameter name is part of the API
        """Combine ``open``/``low`` on the observations where ``open`` rose.

        Mirror image of :meth:`dtm` using ``low`` instead of ``high``.

        NOTE(review): the mask selects *rising* opens, same as ``dtm``;
        confirm this is intended for the "down" variant.
        """
        rising = open[open.diff(1) > 0]
        return self.max_df(open * rising - low * rising, open * rising.diff(1))

    def max_df(self, df1, df2):
        """Return the element-wise maximum of ``df1`` and ``df2``.

        A NaN on one side yields the value from the other side
        (``numpy.fmax`` semantics).  The previous implementation stacked
        both inputs and took one maximum per column, which collapsed the
        time axis and made :meth:`tr` unusable.
        """
        return np.fmax(df1, df2)

    def tr(self, high, low, close):
        """True range: max(high - low, |high - prev close|, |low - prev close|)."""
        prev_close = self.delay(close)
        day_span = high - low
        gap_to_high = self.abs(high - prev_close)
        # The absolute value matters when the previous close is above today's
        # high; without it this term could never exceed ``gap_to_high``.
        gap_to_low = self.abs(low - prev_close)
        return self.max_df(self.max_df(day_span, gap_to_high), gap_to_low)

    def hd(self, high):
        """Return ``high`` minus its previous value."""
        return high - self.delay(high)

    def ld(self, low):
        """Return the previous ``low`` minus the current ``low``."""
        return self.delay(low) - low

    # ------------------------------------------------------------------
    # conditional aggregation
    # ------------------------------------------------------------------
    def count(self, df, window=10, condition=True):
        """Count, per rolling ``window``, the entries where ``condition`` holds.

        Only the shape/labels of ``df`` are used: every cell counts as one
        and is multiplied by ``condition`` before the rolling sum.  ``df``
        must be a DataFrame and is left untouched (it used to be
        overwritten with ones).
        """
        ones = pd.DataFrame(1.0, index=df.index, columns=df.columns)
        return self.ts_sum(ones * condition, window)

    def FILTER(self, df, condition=True):
        """Return ``df * condition`` (cells failing the condition become 0)."""
        return df * condition

    def sumif(self, df, window=10, condition=True):
        """Rolling ``window`` sum of ``df`` restricted to ``condition``."""
        return self.ts_sum(df * condition, window)

    def sumac_df(self, df):
        """Return the cumulative sum of ``df``."""
        return df.cumsum()

    # ------------------------------------------------------------------
    # weighted moving averages
    # ------------------------------------------------------------------
    def rolling_decay(self, df):
        """Kernel for :meth:`decayliner`: linearly weighted mean of one window.

        Weights are ``n, n-1, ..., 1`` divided by their sum, applied in array
        order, so the FIRST element of the window gets the largest weight.

        NOTE(review): with pandas rolling windows the first element is the
        oldest observation; confirm that the oldest value is really meant
        to carry the largest weight.

        Returns a scalar for 1-D input (previously a 1-element array, which
        relied on a deprecated array-to-float conversion).
        """
        n = len(df)
        weights = np.arange(n, 0, -1) / np.arange(1, n + 1).sum()
        return np.dot(weights, np.asarray(df))

    def decayliner(self, df, window):
        """Apply :meth:`rolling_decay` over rolling windows of a gap-filled copy."""
        filled = self._fill_gaps(df)
        return filled.rolling(window).apply(self.rolling_decay, raw=True)

    def rolling_wma(self, df):
        """Kernel for :meth:`wma_df`: weighted sum of one window.

        Element ``k`` (0-based, array order) gets the weight
        ``(n - 1 - k) * 0.9 * 2 / (n * (n + 1))``: the last element always
        has weight 0 and the weight grows towards the start of the window.
        Returns a scalar for 1-D input.
        """
        n = len(df)
        weights = (self.sequence(n) - 1)[::-1] * 0.9 * 2 / (n * (n + 1))
        return np.dot(weights, np.asarray(df))

    def wma_df(self, df, window):
        """Apply :meth:`rolling_wma` over rolling windows of a gap-filled copy."""
        filled = self._fill_gaps(df)
        return filled.rolling(window).apply(self.rolling_wma, raw=True)

    def sma_df(self, df, n, m):
        """Recursive smoothing ``Y[i+1] = (X[i] * m + Y[i] * (n - m)) / n``.

        ``Y[0]`` is ``X[0]``.  NaNs are filled (forward, backward, then 0) on
        a copy before smoothing; the caller's object is not modified.

        Args:
            df: DataFrame or Series of inputs ``X``.
            n: smoothing denominator.
            m: weight of the input term.

        Returns:
            Object of the same type and labels as ``df`` holding ``Y``.
        """
        filled = self._fill_gaps(df)
        if len(filled) == 0:
            return filled
        values = np.asarray(filled, dtype=float)
        smoothed = np.empty_like(values)
        smoothed[0] = values[0]
        # Single pass over rows; every column is updated at once.
        for i in range(1, len(values)):
            smoothed[i] = (values[i - 1] * m / n) + (smoothed[i - 1] * (n - m) / n)
        if isinstance(filled, pd.Series):
            return pd.Series(smoothed, index=filled.index, name=filled.name)
        return pd.DataFrame(smoothed, index=filled.index, columns=filled.columns)

    # ------------------------------------------------------------------
    # ranking
    # ------------------------------------------------------------------
    def rank_df(self, df):
        """Percentile rank along the default axis (``df.rank(pct=True)``)."""
        return df.rank(pct=True)

    def ts_rank(self, df, window=10):
        """Percentile-rank the last ``window`` rows of ``df`` among themselves.

        Only those ``window`` rows are returned; the final row is the rank
        of the latest value within the window.
        """
        return df.iloc[-window:].rank(pct=True)

    def rank(self, df):
        """Percentile rank along ``axis=0``.

        NOTE(review): for a dates-by-instruments frame this ranks each
        instrument through time, not across instruments; confirm the axis.
        """
        return df.rank(axis=0, pct=True)

    # ------------------------------------------------------------------
    # rolling / element-wise primitives
    # ------------------------------------------------------------------
    def ts_min(self, df, window=10):
        """Rolling ``window`` minimum."""
        return df.rolling(window).min()

    def ts_max(self, df, window=10):
        """Rolling ``window`` maximum."""
        return df.rolling(window).max()

    def delay(self, df, period=1):
        """Shift ``df`` by ``period`` rows (value ``period`` steps earlier)."""
        return df.shift(period)

    def ts_sum(self, df, window=10):
        """Rolling ``window`` sum."""
        return df.rolling(window).sum()

    def sign(self, df):
        """Element-wise sign of ``df``."""
        return np.sign(df)

    def sequence(self, n):
        """Return the integer array ``1, 2, ..., n``."""
        return np.arange(1, n + 1)

    def rolling_prod(self, na):
        """Kernel for :meth:`prod`: product of all values in one window."""
        return np.prod(na)

    def prod(self, df, window=10):
        """Rolling ``window`` product."""
        return df.rolling(window).apply(self.rolling_prod, raw=True)

    def mean(self, df, window=10):
        """Rolling ``window`` mean."""
        return df.rolling(window).mean()

    def stddev(self, df, window=10):
        """Rolling ``window`` standard deviation (pandas default ``ddof``)."""
        return df.rolling(window).std()

    def correlation(self, df1, df2, windows=10):
        """Rolling correlation between ``df1`` and ``df2``."""
        return df1.rolling(windows).corr(df2)

    def coviance(self, df1, df2, windows=10):
        """Rolling covariance between ``df1`` and ``df2``."""
        return df1.rolling(windows).cov(df2)

    def abs(self, df):
        """Element-wise absolute value."""
        return df.abs()

    # ------------------------------------------------------------------
    # regression
    # ------------------------------------------------------------------
    def reg_beta(self, x_df, y_df):
        """Slope of the OLS regression of ``y_df`` on ``x_df`` (with intercept).

        Both inputs are flattened to single columns before fitting.
        """
        model, _, _ = self._fit_line(x_df, y_df)
        return float(model.coef_.ravel()[0])

    def reg_sigma(self, x_df, y_df):
        """Standard deviation (numpy default ``ddof=0``) of the OLS residuals."""
        model, x_train, y_train = self._fit_line(x_df, y_df)
        return (y_train - model.predict(x_train)).std()

    # ------------------------------------------------------------------
    # position of rolling extremes
    # ------------------------------------------------------------------
    def ts_argmax(self, df):
        """Kernel for :meth:`highday`: steps from the window's end back to its max.

        ``np.argmax`` is 0-based from the start of the window, hence the
        extra ``- 1``: a maximum on the last element gives 0.
        """
        return len(df) - np.argmax(df) - 1

    def highday(self, df, window=10):
        """Rolling number of steps since the ``window`` maximum."""
        return df.rolling(window).apply(self.ts_argmax, raw=True)

    def ts_argmin(self, df):
        """Kernel for :meth:`lowday`: steps from the window's end back to its min.

        Previously used ``np.argmax`` by mistake, which made ``lowday``
        return the same values as ``highday``.
        """
        return len(df) - np.argmin(df) - 1

    def lowday(self, df, window=10):
        """Rolling number of steps since the ``window`` minimum."""
        return df.rolling(window).apply(self.ts_argmin, raw=True)

    # ------------------------------------------------------------------
    # three-factor returns
    # ------------------------------------------------------------------
    def fama(self, close, mv, pb, ):
        """Build three per-row factor return series from ``close``, ``mv``, ``pb``.

        Returns a DataFrame with positional columns:

        * ``0``: ``mv``-weighted average of one-step returns;
        * ``1``: mean return where ``mv`` is at or below its row-wise 30%
          quantile minus mean return where it is at or above the 70%
          quantile;
        * ``2``: mean return where ``pb`` is at or above its row-wise 70%
          quantile minus mean return where it is at or below the 30%
          quantile.

        NOTE(review): if ``pb`` is price-to-book, column 2 is high-PB minus
        low-PB, the opposite sign of the usual HML; confirm intent.
        """
        ret = close.pct_change(periods=1).fillna(0.0)
        mkt_ret = (ret * mv).sum(axis=1) / mv.sum(axis=1)

        small_cap = mv.le(mv.quantile(0.3, axis=1), axis=0)
        large_cap = mv.ge(mv.quantile(0.7, axis=1), axis=0)
        low_pb = pb.le(pb.quantile(0.3, axis=1), axis=0)
        high_pb = pb.ge(pb.quantile(0.7, axis=1), axis=0)

        def bucket_mean(mask):
            # Cells outside the bucket become NaN and are skipped by the mean.
            return ret[mask].mean(axis=1, skipna=True)

        smb_ret = bucket_mean(small_cap) - bucket_mean(large_cap)
        hml_ret = bucket_mean(high_pb) - bucket_mean(low_pb)
        return pd.concat([mkt_ret, smb_ret, hml_ret], axis=1)
# rank
# 我的理解是：对 3000 只股票的结果值进行排序。
# ts_rank
# 我的理解是：对每只股票按天进行排序。
# wma
# question1：按文档的描述，应该是距离当前时点越久，权重越高，是否可以这样理解？
# question2：是直接取最后一次加权得到的值 an，还是取所有值的平均值 (a1+a2+a3+...+an)/n？
# reg_beta
# question1：是否使用 OLS？是否加常数项？
# fama
# question1：这个函数的实现能发我一份吗？逻辑有点复杂，我想对照代码理解一下。