我们开发的AI小助手,在具备调用内外部接口生成代码(参考10.1、10.2)的基础上,已经具备了初步的自然语言编程能力。 但在实际应用中仍然存在一些不方便的地方,主要的问题是:
在生成和执行代码过程中,但凡有一点瑕疵就要推到重来,重新从需求描述开始;但大模型生成代码稳定性不足,并不保证下一次生成的代码比上一次更好,如果需求比较复杂,步骤较多,则问题更加严重,往往按下葫芦起了瓢,需要提高效率。
- 代码生成的问题主要有两类:
1)自然语言描述功能有瑕疵
除了笔误,主要是描述考虑不周全,例如: 根据沪深300的成份股列表,按股票代码遍历,按akshare库的stock_zh_a_spot_em接口匹配最新行情,获取最新价,然后再按股票代码匹配报表数据获得股票收益,然后计算市盈率。
这样可以获得准确答案,但大概率会把代码生成为,每次遍历循环中,都取一次全市场行情以及报表数据—— 显然有更好的方法,在执行循环遍历前获得全市场行情以及全市场报表数据
2)AI生成代码不准确
例1: 按股票代码在业绩报表中匹配每股收益,然后计算TTM市盈率
for stock_code in stock_list:
# 假设getReportDataLocal返回的数据结构可以直接通过stock_code获取每股收益
lastSeason_eps = lastSeason_report.get(stock_code, {}).get('每股收益', 0) # AI错误生成
initSeason_eps = initSeason_report.get(stock_code, {}).get('每股收益', 0) # AI错误生成
lastyear_eps = lastyear_report.get(stock_code, {}).get('每股收益', 0) # AI错误生成
# 计算TTM每股收益
TTM_eps = lastSeason_eps + (lastyear_eps - initSeason_eps)
print("TTM_eps::",TTM_eps)
# 使用akshare获取实时股价
stock_df.rename(columns={'代码': '股票代码'}, inplace=True)
current_price = stock_df[stock_df['股票代码'] == stock_code]['最新价'].values[0]
# 计算TTM市盈率
TTM_pe = current_price / TTM_eps if TTM_eps != 0 else None
# 存储结果
stock_info = {
'股票代码': stock_code,
'TTM每股收益': TTM_eps,
'股价': current_price,
'TTM市盈率': TTM_pe
}
stock_300.append(stock_info)
很容易看出上述例子中,整体框架是正确的,但是按股票代码stock_code匹配数据的三个语句显然是错误的(虽然是可执行的,但结果是空)。
当然不难改正,例如错误的语句可以修正为标准的
lastSeason_eps = lastSeason_report[lastSeason_report['股票代码'] == stock_code]['每股收益'].values[0]
initSeason_eps = initSeason_report[initSeason_report['股票代码'] == stock_code]['每股收益'].values[0]
lastyear_eps = lastyear_report[lastyear_report['股票代码'] == stock_code]['每股收益'].values[0]
例2:
stock_300 = pd.DataFrame()
for stock_code in stock_list:
...
# 存储结果
stock_info = {
'股票代码': stock_code,
'TTM每股收益': TTM_eps,
'股价': current_price,
'TTM市盈率': TTM_pe
}
stock_300.append(stock_info) # 这里会报错,stock_300是dataframe,3.10之后不支持append方法
这种也只需要简单改进就行,比如换成字典对象,就能用append,然后再转成dataframe
stock_300 = []
for stock_code in stock_list:
...
# 存储结果
stock_info = {
'股票代码': stock_code,
'TTM每股收益': TTM_eps,
'股价': current_price,
'TTM市盈率': TTM_pe
}
stock_300.append(stock_info)
df_stock_300 = pd.DataFrame(stock_300)
以上两个例子表明,某些情况下只需要对代码进行简单人工修正就可以得到正确的结果,并不需要再从头来一遍。
- 主要改进, AI在完成一轮自然语言输入,代码生成,执行之后,允许用户选择
所以我们对 Assistant类,新编写了genCodeNew方法,改进如下:
当一轮代码生成完成后,用户可以选择
1)按原需求描述,重新生成代码
2)在存放临时目录里,手工修改代码后,自动执行
3)重新输入自然语言描述,再来一次
4)执行后续操作(代码分析,生成说明,入知识库等…)
待执行成功后继续,完成代码功能分析,生成代码描述,保存到本地代码库等流程。
以下代码用了两层循环,外循环针对exec_stat执行状态的变化,起始是1(生成代码) 后续会根据用户的选择变化(0-表示执行成功退出 1-表示需要生成代码; 2-表示需要执行代码 3-表示需要输入需求描述), 内循环是在exec_stat=3代表完成一轮,等待用户输入后续,请指示下一步操作(选1/2/3/4):
1- 重新生成代码
2- 重新执行
3- 重新输入需求描述
4- 退出,执行后续
相对简单,可以对照代码查看
import ...
class Assistant:
def __init__(self, llm: BaseLLM):
...
def genCodeNew(self,message: str, summary_prompt: str):
prompt = self.CONTEXT_QA_PROMPT.format(summary_prompt=summary_prompt, message=message)
exec_stat = 1 # 0-表示执行成功退出 1-表示需要生成代码; 2-表示需要执行代码 3-表示需要输入需求描述
result = {}
real_code = ""
file_name = ""
while exec_stat > 0:
try:
match exec_stat:
case 1: # 生成代码
# 生成代码
raw_code = self.llm(prompt)
print('\n raw_code:: %s \n' % raw_code)
# 提取并展示代码
real_code = re.findall(r'```python(.*?)```', raw_code, re.DOTALL)[0]
print("realcode:: \n %s" % real_code)
# 保存代码到临时目录
file_name = self.saveCode(real_code, self.test_path)
if file_name == "保存失败":
print("保存失败")
exec_stat = 0 # 停止执行
else:
print("保存成功")
exec_stat = 2
case 2: # 执行代码
result = self.executeCode(real_code)
print('\n exec_result:: %s \n' % result['exec_res']) # 展示结果
exec_stat = 3
case 3: # 需要用户重新输入需求描述
# 根据结果问询用户的下一步操作
cont_menu = """
请指示下一步操作(选1/2/3/4):
1- 重新生成代码
2- 重新执行
3- 重新输入需求描述
4- 退出,执行后续
"""
while True:
u_input = input(cont_menu)
if str(u_input) == "1": # 重新生成代码
exec_stat = 1
break
elif str(u_input) == "2": # 重新执行,代码需要调用临时文件中获取
# file_n = self.test_path + file_name
# print("正在执行文件 %s" % file_n)
# real_code = self.getCode(file_n)
real_code = self.getCode(file_name)
exec_stat = 2
break
elif str(u_input) == "3": # 重新输入需求描述
user_input = ""
print("请输入您的问题,以回车结束或者输入空行结束:")
while True:
line = input()
# 如果输入为空行,或者仅含回车符号。退出循环
if line == "\n" or line.strip() == "":
user_input = "stop"
break
else:
user_input += line + "\n"
if line == "stop":
break
else:
prompt = self.CONTEXT_QA_PROMPT.format(summary_prompt=summary_prompt, message=user_input)
exec_stat = 1
break
elif str(u_input) == "4": # 退出执行
exec_stat = 0
break
except Exception as e:
match exec_stat:
case 1: # 代码生成错误
print("生成代码失败: %s" % e)
case 2: # 代码执行错误
print("执行代码失败: %s" % e)
case 3: # 用户输入错误
print("其他错误: %s" % e)
traceback.print_exc()
exec_stat = 0 # 停止执行
continue
return result
# 执行代码
def executeCode(self,code: str):
...
# 保存代码
def saveCode(self, code: str, path: str):
...
def getCode(self,file_name: str):
...
# 匹配python文件名
def match_python_files(self, msg: str):
...
# 自动生成文件名
def getFileName(self, code: str):
...
# 人工确认内容
def confirm_content(self, key_word: str, key_text: str):
...
# 分析代码产生接口函数说明和
def saveInterfaceDesc(self, code: str):
...
相应的主程序改成调用 Assistant.genCodeNew方法
import ...
dashscope.api_key = ...
if __name__ == '__main__':
os.environ["DASHSCOPE_API_KEY"] = dashscope.api_key
llm = Tongyi(model_name="qwen-max", streaming=True, temperature=0)
assistant = Assistant(llm)
while True:
try:
# 单行输入
# user_input = input("请输入您的问题:")
# 多行输入
user_input = ""
print("请输入您的问题,以回车结束或者输入空行结束:")
while True:
line = input()
# 如果输入为空行,或者仅含回车符号。退出循环
if line == "\n" or line.strip() == "":
break
else:
user_input += line + "\n"
similarDocs = assistant.local_interface_db.searchKnowledge(user_input)
summary_prompt = "".join([doc.page_content for doc in similarDocs]) # 找到最接近的描述doc
summary_prompt = assistant.local_interface_db.searchKnowledge(summary_prompt) # 确定需要生成的函数类型
# 使用新版可重复尝试生成代码的方法
result = assistant.genCodeNew(user_input, summary_prompt)
print('result:: ', result)
print('\n')
"""
# 使用旧版的代码生成方法
raw_code = assistant.genCode(user_input, summary_prompt) # 提交给大模型生成代码
...
"""
# 遍历执行结果,保存接口函数
if result['exec_succ']:
# 获取接口说明,并保存接口函数
assistant.saveInterfaceDesc(result['code'])
# 储存需求到本地库
assistant.local_requirement_db.insertKnowledge(user_input)
print('本轮处理完毕' + '\n')
except KeyboardInterrupt:
break
except Exception as e:
print("执行异常: %s" % e)
traceback.format_exc()
- 次要改进
本次更新顺便改进了其他两个地方
1)支持用户输入自然语言需求描述时,按可换行的顺序格式输入需求
原先的input方法遇到回车符号就会退出,如果遇到以下格式:
请按下列步骤生成计算沪深300静态市盈率的python代码:
1、 获得离当前日期前一年的最后一天,按XXXX1231格式输出到lastyear_reportDate;
2、 使用pythonRepository库的getReportDataLocal(report_date=lastyear_reportDate)方法获取上年末的报表lastyear_report;
3、 使用pythonRepository库的getHS300List方法获得沪深300成份股,生成stock_list列表;
4、 使用akshare库的stock_zh_a_spot_em接口获取全市场行情,再遍历stock_list列表,按股票代码匹配全市场行情,获得当前股价,然后用同一股票代码匹配lastyear_report每股收益,除以当前股价,获得该股票的市盈率,最后结果在stock_list列表增加一列{静态市盈率},并打印该列表
5、 将计算完毕的stock_list按csv形式保存于本地,文件名为HS300_Static_PE,路径是/home/cfets/myquant/fundamentalData,返回值为保存结果
就必须压缩为一行,非常别扭:
请按下列步骤生成计算沪深300静态市盈率的python函数calc_HS300_static_pe,无入参: 1、获得离当前日期前一年的最后一天,按XXXX1231格式输出到lastyear_reportDate;2、使用pythonRepository库的getReportDataLocal(report_date=lastyear_reportDate)方法获取上年末的报表lastyear_report;3、使用pythonRepository库的getHS300List方法获得沪深300成份股,生成stock_list列表;4、使用akshare库的stock_zh_a_spot_em接口获取全市场行情,再遍历stock_list列表,按股票代码匹配全市场行情,获得当前股价,然后用同一股票代码匹配lastyear_report每股收益,除以当前股价,获得该股票的市盈率;5、最后结果合并在stock_list列表,形成’股票代码’、‘每股收益’、‘股价’、'静态市盈率’四列,并打印该列表,然后按csv形式保存于本地,文件名为HS300_Static_PE,路径是/home/cfets/myquant/fundamentalData,返回值为保存结果。
所以改进为(参考上述主程序代码)
while True:
try:
# 单行输入
# user_input = input("请输入您的问题:")
# 多行输入
user_input = ""
print("请输入您的问题,以回车结束或者输入空行结束:")
while True:
line = input()
# 如果输入为空行,或者仅含回车符号。退出循环
if line == "\n" or line.strip() == "":
break
else:
user_input += line + "\n"
similarDocs = assistant.local_interface_db.searchKnowledge(user_input)
...
即支持
1、 … 回车
2、… 回车
3、…
…
按两下回车即可退出
- 根据代码自动生成文件名方法,改成按正则表达式匹配
即按pattern = r’\b(?<!/)[a-zA-Z0-9_.]+.py\b’匹配 XXXX.py的文件名
# 匹配python文件名
def match_python_files(self, msg: str):
pattern = r'\b(?<!\/)[a-zA-Z0-9_\.]+\.py\b'
matches = re.findall(pattern, msg)[0]
return matches
# 自动生成文件名
def getFileName(self, code: str):
CONTEXT_FILENAME_TMPL = """
请根据{file_code}所示代码起一个合适英语文件名
"""
CONTEXT_FILENAME_PROMPT = PromptTemplate(
input_variables=["file_code"],
template=CONTEXT_FILENAME_TMPL,
)
prompt = CONTEXT_FILENAME_PROMPT.format(file_code=code)
print('prompt::', prompt)
print('\n')
file_name=""
try:
raw_file_name = self.llm(prompt)
print('raw_file_name::', raw_file_name)
print('\n')
file_name = self.match_python_files(raw_file_name)
# file_name = raw_file_name.split("\"")[1]
print('file_name::', file_name)
print('\n')
except Exception as e:
print("生成文件名失败: %s" % e)
traceback.print_exc()
finally:
return file_name
上述代码可结合10.2提供的基本代码获得完整代码,下一篇,我们通过计算静态市盈率,动态市盈率和TTM市盈率,来应用AI小助手进行自然语言编程,并分享一些心得