# -*- coding: utf-8 -*-
"""
Created on Sat Aug 18 11:08:38 2018
@author: acadsoc
"""
import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from pyecharts import Bar, Line, Page, Overlap
import statsmodels.api as sm
from sklearn.preprocessing import StandardScaler
# import pymssql
from dateutil import parser
import copy
import os
import sys
from featureSelection import featureSelection
plt.style.use('ggplot') # 设置ggplot2画图风格
# 根据不同平台设置其中文字体路径
if sys.platform == 'linux':
zh_font = matplotlib.font_manager.FontProperties(
fname='path/anaconda3/lib/python3.6/site-packages/matplotlib/mpl-data/fonts/ttf/STZHONGS.TTF')
else:
zh_font = matplotlib.font_manager.FontProperties(fname='C:\Windows\Fonts\STZHONGS.ttf') # 设置中文字体
# 根据不同平台设定工作目录
if sys.platform == 'linux':
os.chdir(path) # Linux path
else:
os.chdir(path) # Windows path
class rollingRegression():
'''
滚动多元回归分析类。
参数
----
response : str
回归因变量。
date_begin : datetime
起始日期。
date_end : datetime
截止日期。
rolling_days : int
滚动天数。
intercept : bool
回归方程是否带常数项。
p_value_threshold : float
回归系数按p值显示阈值。
normalize : bool
数据是否标准化。
属性
----
coef_ : dataframe
回归系数。
coef_pvalue_ : dataframe
回归系数pvalue。
r2_ : dataframe
回归模型Rsquared 和 Rsquared_adj。
echarts_page_ : echarts_page
echarts_page 文件。
'''
def __init__(self, response='新单数', date_begin='2018-01-01', date_end='2018-07-31', rolling_days=30,
intercept=False, p_value_threshold=.1, normalize=False):
self.response = response # 回归因变量
self.date_begin = date_begin # 起始日期
self.date_end = date_end # 终止日期
self.rolling_days = rolling_days # 滚动天数
self.intercept = intercept # 回归方程是否带常数项
self.p_value_threshold = p_value_threshold # p值显示阈值
self.normalize = normalize # 是否将数据标准化后再进行回归分析
if self.normalize: # 如果数据标准化,常数强制设置为0
self.intercept = False
# 起始日期间隔必须大于等于滚动天数
if (parser.parse(self.date_end) - parser.parse(self.date_begin)).days < self.rolling_days:
raise IOError('起始日期间隔必须大于等于滚动天数,请重新选择起始日期或者调整滚动日期。')
def getData(self, file='业绩相关数据2018-8-1.xlsx', variabls_in=None, variables_out=None):
'''
读取数据。
注:该方法只适合读取特定方式的数据,第一列应是datetime类型,第二列是因变量,其余是自变量。
参数
----
file : str
文件路径及文件名。
variabls_in : list, 默认是None
需要用到的自变量。
variables_out : list, 默认是None
不用的自变量。
return
------
df_ : dataframe
分析用数据框,response在第一列。
'''
if variabls_in: # 如果有选入模型的自变量,强制将variable_out设置为None
variables_out = None
if file.split('.')[-1] == 'xlsx': # 根据不同文件格式读取数据
df = pd.read_excel(file)
elif file.split('.')[-1] == 'txt':
df = pd.read_table(file)
else:
df = eval('pd.read_' + file.split('.')[-1] + '(file)')
df.index = df.iloc[:, 0] # 将日期变为索引
df = df.iloc[:, 1:]
df[df.isnull()] = 0 # 缺失值填充
df = df.astype(float) # 将数据框object格式转换为float
# dateTransfer = np.vectorize(self._dateTransfer) # 向量化日期转换函数
# df.index = dateTransfer(df.index) # 转换索引日期格式
df.index = pd.DatetimeIndex(df.index) # 将索引转换为datetime格式
if variabls_in:
df = pd.concat([df[df.columns[0]], df[variabls_in]], axis=1)
if variables_out:
for var in variables_out:
df.pop(var)
if self.normalize: # 数据标准化
df_std = StandardScaler().fit_transform(df)
self.df_ = pd.DataFrame(df_std, index=df.index, columns=df.columns)
else:
self.df_ = df
return self
def rollingOLS(self, df):
'''
滚动日期多元线性模型。
参数
----
df : dataframe
回归分析用数据框,response在第一列。
return
------
coef : dataframe
回归系数。
coef_pvalue : dataframe
回归系数pvalue。
r2 : dataframe
回归模型Rsquared 和 Rsquared_adj。
'''
df = df.loc[(df.index>=self.date_begin) & (df.index<=self.date_end), :] # 按照参数给定起始、截止时间选择数据
df = df.sort_index(ascending=True) # 按日期升序排序
coef = {}
coef_pvalue = {}
r2 = {}
# 从起始日开始做回归
for i in range(df.shape[0] - self.rolling_days):
date = df.index[i+self.rolling_days]
data = df.iloc[i:i+self.rolling_days, :]
X = data.iloc[:, 1:]
y = data.iloc[:, 0]
# 线性回归模型拟合
model = sm.OLS(y, X, hasconst=self.intercept)
lr = model.fit()
# 按字典格式保存系数、pvalue、R2
coef[date] = lr.params
coef_pvalue[date] = lr.pvalues
r2[date] = []
r2[date].append(lr.rsquared)
r2[date].append(lr.rsquared_adj)
# 系数字典转化为数据框,并按日期升序排序
coef = pd.DataFrame.from_dict(coef, orient='index')
coef = coef.sort_index(ascending=True)
# 系数pvalue转化为数据框,并按日期升序排序
coef_pvalue = pd.DataFrame.from_dict(coef_pvalue, orient='index')
coef_pvalue = coef_pvalue.sort_index(ascending=True)
# R2转化为数据框,并按日期升序排序
r2 = pd.DataFrame.from_dict(r2, orient='index')
r2.columns = ['R_squred','R_squred_adj']
r2 = r2.sort_index(ascending=True)
return coef, coef_pvalue, r2
def _dateTransfer(self, date):
'''
定义日期转换函数。
参数
----
date : str
需要转换的日期数据。
return
------
date : datetime
日期。
'''
return parser.parse(date).strftime('%Y-%m-%d')
def fit(self, feat_selected=None):
'''
多元回归分析并保存数据。
参数
----
feat_selected : list, 默认是None
分析用的特征列表。
return
------
coef_ : dataframe
回归系数。
coef_pvalue_ : dataframe
回归系数pvalue。
r2_ : dataframe
回归模型Rsquared 和 Rsquared_adj。
'''
if feat_selected is not None:
df = pd.concat([self.df_.iloc[:, 0], self.df_[feat_selected]], axis=1)
else:
df = self.df_
# 滚动回归分析
self.coef_, self.coef_pvalue_, self.r2_ = self.rollingOLS(df)
# 存储分析数据表
self.coef_.to_excel('coef.xlsx')
self.coef_pvalue_.to_excel('coef_pvalue.xlsx')
self.r2_.to_excel('r2.xlsx')
return self
def coefPlots(self, width_subplot=12, height_subplot=5, columns_subplots=3):
'''
画滚动回归系数及pvalue图。
参数
----
width_subplot : int
子图宽度。
height_subplot : int
子图高度。
columns_subplots : int
子图列数。
'''
num_subplots = self.coef_.shape[1] + 1 # 确定子图个数
# 确定子图行数
if num_subplots % columns_subplots == 0: # 余数为0
rows_subplots = num_subplots // columns_subplots # 取整
else:
rows_subplots = num_subplots // columns_subplots + 1
# 确定画布宽、高
width_figure = columns_subplots * width_subplot
height_figure = rows_subplots * height_subplot
# 绘制滚动回归R2图
plt.figure(figsize=(width_figure, height_figure))
plt.subplot(rows_subplots, columns_subplots, 1)
plt.plot(self.r2_['R_squred'], color='r', lw=3, label='R_squred')
plt.plot(self.r2_['R_squred_adj'], color='g', lw=3, label='R_squred_adj')
plt.title('R2')
plt.legend()
# 在子图中画系滚动回归系数及p值图
for i, feature in enumerate(self.coef_.columns): # 系数图
plt.subplot(rows_subplots, columns_subplots, i+2)
plt.plot(self.coef_[feature], color='red', lw=3, label='Beta')
for t, pvalue in zip(self.coef_pvalue_.index, self.coef_pvalue_[feature]): # p值图
if pvalue <= self.p_value_threshold:
plt.vlines(t, ymin=np.min(self.coef_[feature]), ymax=np.max(self.coef_[feature]),
color='green', alpha=.3, lw=5, label='p_value')
#plt.xlabel('日期')
if ((i + columns_subplots + 1) % columns_subplots) & (i > 0) == 0:
plt.ylabel('coef')
plt.title(feature, fontproperties=zh_font)
# plt.savefig('rollingRegression.jpeg') # 保存图片
plt.show()
return self
def coefEcharts(self):
'''
利用Echarts画图。
注:因为没有vline方法,故用echarts画出的图文件过大,在浏览器中打开很慢。
参数
----
return
------
echarts_page_ : echarts_page
echarts_page 文件。
'''
self.echarts_page_ = Page(self.response + '回归分析')
charts = []
zeros = np.zeros(self.coef_.shape[0])
line = Line('R2') # R2图
bar = Bar()
line.add('R_squred', self.r2_.index, self.r2_['R_squred'], is_more_utils=True)
line.add('R_squred_adj', self.r2_.index, self.r2_['R_squred_adj'], is_more_utils=True)
charts.append(line)
for i, feature in enumerate(self.coef_.columns):
min_num = np.min(self.coef_[feature])
max_num = np.max(self.coef_[feature])
line = Line(feature)
bar = Bar()
ol = Overlap()
line.add('coef', self.coef_.index, self.coef_[feature], is_more_utils=True) # 系数图
#line.on()
for t, pvalue in zip(self.coef_pvalue_.index, self.coef_pvalue_[feature]): # p值图
if pvalue <= self.p_value_threshold:
min_array, max_array = copy.deepcopy(zeros), copy.deepcopy(zeros)
min_array[self.coef_.index==t] = min_num
max_array[self.coef_.index==t] = max_num
bar.add('p-value', self.coef_.index, min_array)
bar.add('p-value', self.coef_.index, max_array)
ol.add(line)
ol.add(bar)
charts.append(ol)
self.echarts_page_.add(charts)
self.echarts_page_.render() # 保存格式为HTML, 保存地址为设定的全局path
return self