我们开发的AI小助手,在具备调用内外部接口生成代码(参考10.1、10.2)的基础上,已经具备了初步的自然语言编程能力。 但在实际应用中仍然存在一些不方便的地方,主要的问题是:
在生成和执行代码过程中,但凡有一点瑕疵就要推到重来,重新从需求描述开始;但大模型生成代码稳定性不足,并不保证下一次生成的代码比上一次更好,如果需求比较复杂,步骤较多,则问题更加严重,往往按下葫芦起了瓢,需要提高效率。

  1. 代码生成的问题主要有两类:

1)自然语言描述功能有瑕疵
除了笔误,主要是描述考虑不周全,例如: 根据沪深300的成份股列表,按股票代码遍历,按akshare库的stock_zh_a_spot_em接口匹配最新行情,获取最新价,然后再按股票代码匹配报表数据获得股票收益,然后计算市盈率。

这样可以获得准确答案,但大概率会把代码生成为,每次遍历循环中,都取一次全市场行情以及报表数据—— 显然有更好的方法,在执行循环遍历前获得全市场行情以及全市场报表数据

2)AI生成代码不准确
例1: 按股票代码在业绩报表中匹配每股收益,然后计算TTM市盈率

for stock_code in stock_list:
            # 假设getReportDataLocal返回的数据结构可以直接通过stock_code获取每股收益
            lastSeason_eps = lastSeason_report.get(stock_code, {}).get('每股收益', 0)  # AI错误生成
            initSeason_eps = initSeason_report.get(stock_code, {}).get('每股收益', 0)  # AI错误生成
            lastyear_eps = lastyear_report.get(stock_code, {}).get('每股收益', 0)     # AI错误生成
            
            
            # 计算TTM每股收益
            TTM_eps = lastSeason_eps + (lastyear_eps - initSeason_eps)
            print("TTM_eps::",TTM_eps)
            
            # 使用akshare获取实时股价
            stock_df.rename(columns={'代码': '股票代码'}, inplace=True)
            current_price = stock_df[stock_df['股票代码'] == stock_code]['最新价'].values[0]
            
            # 计算TTM市盈率
            TTM_pe = current_price / TTM_eps if TTM_eps != 0 else None
            
            # 存储结果
            stock_info = {
                '股票代码': stock_code,
                'TTM每股收益': TTM_eps,
                '股价': current_price,
                'TTM市盈率': TTM_pe
            }
            stock_300.append(stock_info)

很容易看出上述例子中,整体框架是正确的,但是按股票代码stock_code匹配数据的三个语句显然是错误的(虽然是可执行的,但结果是空)。
当然不难改正,例如错误的语句可以修正为标准的

        lastSeason_eps = lastSeason_report[lastSeason_report['股票代码'] == stock_code]['每股收益'].values[0]
        initSeason_eps = initSeason_report[initSeason_report['股票代码'] == stock_code]['每股收益'].values[0]
        lastyear_eps = lastyear_report[lastyear_report['股票代码'] == stock_code]['每股收益'].values[0]

例2:

stock_300 = pd.DataFrame()    

for stock_code in stock_list:
           ...
            
            # 存储结果
            stock_info = {
                '股票代码': stock_code,
                'TTM每股收益': TTM_eps,
                '股价': current_price,
                'TTM市盈率': TTM_pe
            }
            stock_300.append(stock_info) # 这里会报错,stock_300是dataframe,3.10之后不支持append方法
这种也只需要简单改进就行,比如换成字典对象,就能用append,然后再转成dataframe
stock_300 = []  

for stock_code in stock_list:
           ...
            
            # 存储结果
            stock_info = {
                '股票代码': stock_code,
                'TTM每股收益': TTM_eps,
                '股价': current_price,
                'TTM市盈率': TTM_pe
            }
            stock_300.append(stock_info)

df_stock_300 = pd.DataFrame(stock_300)

以上两个例子表明,某些情况下只需要对代码进行简单人工修正就可以得到正确的结果,并不需要再从头来一遍。

  1. 主要改进, AI在完成一轮自然语言输入,代码生成,执行之后,允许用户选择
    所以我们对 Assistant类,新编写了genCodeNew方法,改进如下:
    当一轮代码生成完成后,用户可以选择
    1)按原需求描述,重新生成代码
    2)在存放临时目录里,手工修改代码后,自动执行
    3)重新输入自然语言描述,再来一次
    4)执行后续操作(代码分析,生成说明,入知识库等…)
    待执行成功后继续,完成代码功能分析,生成代码描述,保存到本地代码库等流程。

以下代码用了两层循环,外循环针对exec_stat执行状态的变化,起始是1(生成代码) 后续会根据用户的选择变化(0-表示执行成功退出 1-表示需要生成代码; 2-表示需要执行代码 3-表示需要输入需求描述), 内循环是在exec_stat=3代表完成一轮,等待用户输入后续,请指示下一步操作(选1/2/3/4):
1- 重新生成代码
2- 重新执行
3- 重新输入需求描述
4- 退出,执行后续
相对简单,可以对照代码查看

import ...

class Assistant:
    def __init__(self, llm: BaseLLM):
       ...


    def genCodeNew(self,message: str, summary_prompt: str):
        prompt = self.CONTEXT_QA_PROMPT.format(summary_prompt=summary_prompt, message=message)
        exec_stat = 1  #  0-表示执行成功退出 1-表示需要生成代码; 2-表示需要执行代码 3-表示需要输入需求描述
        result = {}
        real_code = ""
        file_name = ""

        while exec_stat > 0:
            try:
                match exec_stat:
                    case 1:   # 生成代码
                        # 生成代码
                        raw_code = self.llm(prompt)
                        print('\n raw_code:: %s \n' % raw_code)
                        #  提取并展示代码
                        real_code = re.findall(r'```python(.*?)```', raw_code, re.DOTALL)[0]
                        print("realcode:: \n  %s" % real_code)
                        # 保存代码到临时目录
                        file_name = self.saveCode(real_code, self.test_path)
                        if file_name == "保存失败":
                            print("保存失败")
                            exec_stat = 0   # 停止执行
                        else:
                            print("保存成功")
                            exec_stat = 2

                    case 2:  # 执行代码
                        result = self.executeCode(real_code)
                        print('\n exec_result:: %s \n' % result['exec_res'])   # 展示结果
                        exec_stat = 3

                    case 3:   # 需要用户重新输入需求描述
                        # 根据结果问询用户的下一步操作
                        cont_menu = """
                        请指示下一步操作(选1/2/3/4):
                        1- 重新生成代码
                        2- 重新执行
                        3- 重新输入需求描述
                        4- 退出,执行后续
                        """
                        while True:
                            u_input = input(cont_menu)
                            if str(u_input) == "1":   # 重新生成代码
                                exec_stat = 1
                                break
                            elif str(u_input) == "2":   # 重新执行,代码需要调用临时文件中获取
                                # file_n  = self.test_path + file_name
                                # print("正在执行文件 %s" % file_n)
                                # real_code = self.getCode(file_n)
                                real_code = self.getCode(file_name)
                                exec_stat = 2
                                break
                            elif str(u_input) == "3":   # 重新输入需求描述
                                user_input = ""
                                print("请输入您的问题,以回车结束或者输入空行结束:")
                                while True:
                                    line = input()
                                    # 如果输入为空行,或者仅含回车符号。退出循环
                                    if line == "\n" or line.strip() == "":
                                        user_input = "stop"
                                        break
                                    else:
                                        user_input += line + "\n"

                                if line == "stop":
                                    break
                                else:
                                    prompt = self.CONTEXT_QA_PROMPT.format(summary_prompt=summary_prompt, message=user_input)
                                    exec_stat = 1
                                    break
                            elif str(u_input) == "4":   # 退出执行
                                exec_stat = 0
                                break


            except Exception as e:
                match exec_stat:
                    case 1:   # 代码生成错误
                        print("生成代码失败: %s" % e)
                    case 2:  #  代码执行错误
                        print("执行代码失败: %s" % e)
                    case 3:   # 用户输入错误
                        print("其他错误: %s" % e)
                traceback.print_exc()
                exec_stat = 0   # 停止执行
                continue


        return result

    # 执行代码
    def executeCode(self,code: str):
       ...

    # 保存代码
    def saveCode(self, code: str, path: str):
        ...

    def getCode(self,file_name: str):
        ...

    # 匹配python文件名
    def match_python_files(self, msg: str):
        ...

    # 自动生成文件名
    def getFileName(self, code: str):
        ...

    # 人工确认内容
    def confirm_content(self, key_word: str, key_text: str):
        ...

    # 分析代码产生接口函数说明和
    def saveInterfaceDesc(self, code: str):
       ...

相应的主程序改成调用 Assistant.genCodeNew方法

import ...

dashscope.api_key = ...

if __name__ == '__main__':
    os.environ["DASHSCOPE_API_KEY"] = dashscope.api_key
    llm = Tongyi(model_name="qwen-max", streaming=True, temperature=0)

    assistant = Assistant(llm)

    while True:
        try:
            # 单行输入
            # user_input = input("请输入您的问题:")
            # 多行输入
            user_input = ""
            print("请输入您的问题,以回车结束或者输入空行结束:")
            while True:
                line = input()
                # 如果输入为空行,或者仅含回车符号。退出循环
                if line == "\n" or line.strip() == "":
                    break
                else:
                    user_input += line + "\n"

            similarDocs = assistant.local_interface_db.searchKnowledge(user_input)
            summary_prompt = "".join([doc.page_content for doc in similarDocs])  # 找到最接近的描述doc
            summary_prompt = assistant.local_interface_db.searchKnowledge(summary_prompt)  # 确定需要生成的函数类型

            # 使用新版可重复尝试生成代码的方法
            result = assistant.genCodeNew(user_input, summary_prompt)
            print('result:: ', result)
            print('\n')

            """
            # 使用旧版的代码生成方法
            raw_code = assistant.genCode(user_input, summary_prompt)  # 提交给大模型生成代码      
            ...
            """
            # 遍历执行结果,保存接口函数


            if result['exec_succ']:
                # 获取接口说明,并保存接口函数
                assistant.saveInterfaceDesc(result['code'])
            # 储存需求到本地库
            assistant.local_requirement_db.insertKnowledge(user_input)

            print('本轮处理完毕' + '\n')

        except KeyboardInterrupt:

            break

        except Exception as e:
            print("执行异常: %s" % e)
            traceback.format_exc()
  1. 次要改进
    本次更新顺便改进了其他两个地方

1)支持用户输入自然语言需求描述时,按可换行的顺序格式输入需求
原先的input方法遇到回车符号就会退出,如果遇到以下格式:

请按下列步骤生成计算沪深300静态市盈率的python代码:
1、 获得离当前日期前一年的最后一天,按XXXX1231格式输出到lastyear_reportDate;
2、 使用pythonRepository库的getReportDataLocal(report_date=lastyear_reportDate)方法获取上年末的报表lastyear_report;
3、 使用pythonRepository库的getHS300List方法获得沪深300成份股,生成stock_list列表;
4、 使用akshare库的stock_zh_a_spot_em接口获取全市场行情,再遍历stock_list列表,按股票代码匹配全市场行情,获得当前股价,然后用同一股票代码匹配lastyear_report每股收益,除以当前股价,获得该股票的市盈率,最后结果在stock_list列表增加一列{静态市盈率},并打印该列表
5、 将计算完毕的stock_list按csv形式保存于本地,文件名为HS300_Static_PE,路径是/home/cfets/myquant/fundamentalData,返回值为保存结果

就必须压缩为一行,非常别扭:
请按下列步骤生成计算沪深300静态市盈率的python函数calc_HS300_static_pe,无入参: 1、获得离当前日期前一年的最后一天,按XXXX1231格式输出到lastyear_reportDate;2、使用pythonRepository库的getReportDataLocal(report_date=lastyear_reportDate)方法获取上年末的报表lastyear_report;3、使用pythonRepository库的getHS300List方法获得沪深300成份股,生成stock_list列表;4、使用akshare库的stock_zh_a_spot_em接口获取全市场行情,再遍历stock_list列表,按股票代码匹配全市场行情,获得当前股价,然后用同一股票代码匹配lastyear_report每股收益,除以当前股价,获得该股票的市盈率;5、最后结果合并在stock_list列表,形成’股票代码’、‘每股收益’、‘股价’、'静态市盈率’四列,并打印该列表,然后按csv形式保存于本地,文件名为HS300_Static_PE,路径是/home/cfets/myquant/fundamentalData,返回值为保存结果。

所以改进为(参考上述主程序代码)

  while True:
        try:
            # 单行输入
            # user_input = input("请输入您的问题:")
            # 多行输入
            user_input = ""
            print("请输入您的问题,以回车结束或者输入空行结束:")
            while True:
                line = input()
                # 如果输入为空行,或者仅含回车符号。退出循环
                if line == "\n" or line.strip() == "":
                    break
                else:
                    user_input += line + "\n"

            similarDocs = assistant.local_interface_db.searchKnowledge(user_input)
            ...

即支持
1、 … 回车
2、… 回车
3、…

按两下回车即可退出

  1. 根据代码自动生成文件名方法,改成按正则表达式匹配

即按pattern = r’\b(?<!/)[a-zA-Z0-9_.]+.py\b’匹配 XXXX.py的文件名

# 匹配python文件名
    def match_python_files(self, msg: str):
        pattern = r'\b(?<!\/)[a-zA-Z0-9_\.]+\.py\b'
        matches = re.findall(pattern, msg)[0]
        return matches

    # 自动生成文件名
    def getFileName(self, code: str):
        CONTEXT_FILENAME_TMPL = """
        请根据{file_code}所示代码起一个合适英语文件名
        """

        CONTEXT_FILENAME_PROMPT = PromptTemplate(
            input_variables=["file_code"],
            template=CONTEXT_FILENAME_TMPL,
        )

        prompt = CONTEXT_FILENAME_PROMPT.format(file_code=code)
        print('prompt::', prompt)
        print('\n')

        file_name=""

        try:
            raw_file_name = self.llm(prompt)
            print('raw_file_name::', raw_file_name)
            print('\n')
            file_name = self.match_python_files(raw_file_name)
            # file_name = raw_file_name.split("\"")[1]

            print('file_name::', file_name)
            print('\n')
        except Exception as e:
            print("生成文件名失败: %s" % e)
            traceback.print_exc()
        finally:
            return file_name

上述代码可结合10.2提供的基本代码获得完整代码,下一篇,我们通过计算静态市盈率,动态市盈率和TTM市盈率,来应用AI小助手进行自然语言编程,并分享一些心得

10-10 00:27