前言
之前7月中旬,我曾在微博上说准备做“20个LLM大型项目的源码解读”
针对这个事,目前的最新情况是
- 已经做了的:LLaMA、Alpaca、ChatGLM-6B、deepspeedchat、transformer、langchain、langchain-chatglm知识库
- 准备做的:chatpaper、deepspeed、Megatron-LM
- 再往后则:BERT、GPT、pytorch、chatdoctor、baichuan、BLOOM/BELLE、Chinese LLaMA、PEFT BLIP2 llama.cpp
总之,够未来半年忙了。为加快这个事情的进度,本文解读两个关于学术论文的GPT(由于我司每周都有好几个或为申博、或为评职称、或为毕业而报名论文1V1发表辅导的,比如中文期刊、EI会议、ei期刊/SCI等等,所以对这个方向一直都是高度关注,我司也在做类似的LLM产品,敬请期待)
- 一个是chatpaper:https://github.com/kaixindelele/ChatPaper
- 一个是gpt_academic:https://github.com/binary-husky/gpt_academic
我把这两个项目的结构做了拆解/解析,且基本把原有代码的每一行都补上了注释,如果大家对任何一行代码有疑问,可以随时在本文评论区留言,我会及时做补充说明
第一部分 ChatPaper:论文对话、总结、翻译
ChatPaper的自身定位是全流程加速科研:论文总结+专业级翻译+润色+审稿+审稿回复,因为论文更多是PDF的格式,故针对PDF的对话、总结、翻译,便不可避免的涉及到PDF的解析
1.1 论文审阅:ChatPaper/ChatReviewerAndResponse
1.1.1 对PDF的解析:ChatReviewerAndResponse/get_paper.py
// 待更
1.1.2 论文审查:ChatReviewerAndResponse/chat_reviewer.py
使用OpenAI的GPT模型进行论文审查的脚本。它首先定义了一个Reviewer类来处理审查工作,然后在if __name__ == '__main__':语句下使用argparse处理命令行参数,并调用chat_reviewer_main函数来开始审查过程
- 导入模块:与第一段代码相似,但新增了一些库,如jieba、tenacity等
- 命名元组定义:用于保存与论文审稿相关的参数
ReviewerParams = namedtuple( "ReviewerParams", [ "paper_path", "file_format", "research_fields", "language" ], )
- 判断文本中是否包含中文:
def contains_chinese(text): for ch in text: if u'\u4e00' <= ch <= u'\u9fff': return True return False
- 插入句子到文本
主要功能是在给定文本的每隔一定数量的单词或中文字符后插入一个指定的句子。如果文本行包含中文字符,则使用jieba分词工具来切分中文,否则使用空格来切分:def insert_sentence(text, sentence, interval): # 将输入文本按换行符分割成行 lines = text.split('\n') # 初始化一个新的行列表 new_lines = [] # 遍历每一行 for line in lines: # 检查行中是否包含中文字符 if contains_chinese(line): # 如果是中文,使用jieba分词工具进行分词 words = list(jieba.cut(line)) # 定义分隔符为空字符(对于中文分词) separator = '' else: # 如果不包含中文,按空格分割行 words = line.split() # 定义分隔符为空格(对于英文或其他非中文语言) separator = ' ' # 初始化一个新的单词列表 new_words = [] # 初始化一个计数器 count = 0 # 遍历当前行的每一个单词 for word in words: # 将当前单词添加到新的单词列表 new_words.append(word) # 计数器增加 count += 1 # 检查是否达到了插入句子的间隔 if count % interval == 0: # 在达到指定间隔时,将要插入的句子添加到新的单词列表 new_words.append(sentence) # 将新的单词列表连接起来,并添加到新的行列表 new_lines.append(separator.join(new_words)) # 将新的行列表连接起来,返回结果 return '\n'.join(new_lines)
- 论文审稿类:定义了一个Reviewer类,包含以下功能:
第一阶段审稿:先是基于论文标题和摘要,选择要审稿的部分
然后分别实现两个函数# 定义Reviewer类 class Reviewer: # 初始化方法,设置属性 def __init__(self, args=None): if args.language == 'en': self.language = 'English' elif args.language == 'zh': self.language = 'Chinese' else: self.language = 'Chinese' # 创建一个ConfigParser对象 self.config = configparser.ConfigParser() # 读取配置文件 self.config.read('apikey.ini') # 获取某个键对应的值 self.chat_api_list = self.config.get('OpenAI', 'OPENAI_API_KEYS')[1:-1].replace('\'', '').split(',') self.chat_api_list = [api.strip() for api in self.chat_api_list if len(api) > 5] self.cur_api = 0 self.file_format = args.file_format self.max_token_num = 4096 self.encoding = tiktoken.get_encoding("gpt2") def validateTitle(self, title): # 修正论文的路径格式 rstr = r"[\/\\\:\*\?\"\<\>\|]" # '/ \ : * ? " < > |' new_title = re.sub(rstr, "_", title) # 替换为下划线 return new_title
一个stage_1,主要功能是为了与GPT-3模型进行对话,获取模型对于文章的两个最关键部分的选择意见
一个chat_review,主要功能是调用GPT-3模型进行论文审稿,对输入的文章文本进行审查,并按照预定格式生成审稿意见def stage_1(self, paper): # 初始化一个空列表,用于存储生成的HTML内容 htmls = [] # 初始化一个空字符串,用于存储文章的标题和摘要 text = '' # 添加文章的标题 text += 'Title: ' + paper.title + '. ' # 添加文章的摘要 text += 'Abstract: ' + paper.section_texts['Abstract'] # 计算文本的token数量 text_token = len(self.encoding.encode(text)) # 判断token数量是否超过最大token限制的一半减去800 if text_token > self.max_token_num/2 - 800: input_text_index = int(len(text)*((self.max_token_num/2)-800)/text_token) # 如果超出,则截取文本以满足长度要求 text = text[:input_text_index] # 设置OpenAI API的密钥 openai.api_key = self.chat_api_list[self.cur_api] # 更新当前使用的API索引 self.cur_api += 1 # 如果当前API索引超过API列表的长度,则重置为0 self.cur_api = 0 if self.cur_api >= len(self.chat_api_list)-1 else self.cur_api # 创建与GPT-3的对话消息 messages = [ {"role": "system", "content": f"You are a professional reviewer in the field of {args.research_fields}. " f"I will give you a paper. You need to review this paper and discuss the novelty and originality of ideas, correctness, clarity, the significance of results, potential impact and quality of the presentation. " f"Due to the length limitations, I am only allowed to provide you the abstract, introduction, conclusion and at most two sections of this paper." f"Now I will give you the title and abstract and the headings of potential sections. " f"You need to reply at most two headings. Then I will further provide you the full information, includes aforementioned sections and at most two sections you called for.\n\n" f"Title: {paper.title}\n\n" f"Abstract: {paper.section_texts['Abstract']}\n\n" f"Potential Sections: {paper.section_names[2:-1]}\n\n" f"Follow the following format to output your choice of sections:" f"{{chosen section 1}}, {{chosen section 2}}\n\n"}, {"role": "user", "content": text}, ] # 调用OpenAI API与GPT-3进行对话 response = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=messages, ) # 初始化一个空字符串,用于存储模型的回复 result = '' # 遍历模型的回复,将其添加到结果字符串中 for choice in response.choices: result += choice.message.content # 打印模型的回复 print(result) # 返回模型的回复,将其分割为多个部分 return result.split(',')
使用ChatGPT进行审稿,且有tenacity重试机制和更多的功能,其中review_by_chatgpt 调用了上面所示的两个函数,一个stage_1,一个chat_reviewdef chat_review(self, text): # 设置OpenAI API的密钥 openai.api_key = self.chat_api_list[self.cur_api] # 更新当前使用的API密钥索引 self.cur_api += 1 # 如果当前API密钥索引超过API密钥列表的长度,则将其重置为0 self.cur_api = 0 if self.cur_api >= len(self.chat_api_list)-1 else self.cur_api # 定义用于审稿提示的token数量 review_prompt_token = 1000 # 计算输入文本的token数量 text_token = len(self.encoding.encode(text)) # 计算输入文本的截取位置 input_text_index = int(len(text)*(self.max_token_num-review_prompt_token)/text_token) # 截取文本并添加前缀 input_text = "This is the paper for your review:" + text[:input_text_index] # 从'ReviewFormat.txt'文件中读取审稿格式 with open('ReviewFormat.txt', 'r') as file: review_format = file.read() # 创建与GPT-3的对话消息 messages=[ {"role": "system", "content": "You are a professional reviewer in the field of "+args.research_fields+". Now I will give you a paper. You need to give a complete review opinion according to the following requirements and format:"+ review_format +" Please answer in {}.".format(self.language)}, {"role": "user", "content": input_text}, ] # 调用OpenAI API与GPT-3进行对话 response = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=messages, ) # 初始化一个空字符串,用于存储模型的回复 result = '' # 遍历模型的回复,将其添加到结果字符串中 for choice in response.choices: result += choice.message.content # 在结果中插入特定的句子,警告不允许复制 result = insert_sentence(result, '**Generated by ChatGPT, no copying allowed!**', 15) # 追加伦理声明 result += "\n\n⚠伦理声明/Ethics statement:\n--禁止直接复制生成的评论用于任何论文审稿工作!\n--Direct copying of generated comments for any paper review work is prohibited!" # 打印分隔符和结果 print("********"*10) print(result) print("********"*10) # 打印相关的token使用信息和响应时间 print("prompt_token_used:", response.usage.prompt_tokens) print("completion_token_used:", response.usage.completion_tokens) print("total_token_used:", response.usage.total_tokens) print("response_time:", response.response_ms/1000.0, 's') # 返回模型生成的审稿意见 return result
def review_by_chatgpt(self, paper_list): # 创建一个空列表用于存储每篇文章审稿后的HTML格式内容 htmls = [] # 遍历paper_list中的每一篇文章 for paper_index, paper in enumerate(paper_list): # 使用第一阶段审稿方法选择文章的关键部分 sections_of_interest = self.stage_1(paper) # 初始化一个空字符串用于提取文章的主要部分 text = '' # 添加文章的标题 text += 'Title:' + paper.title + '. ' # 添加文章的摘要 text += 'Abstract: ' + paper.section_texts['Abstract'] # 查找并添加“Introduction”部分 intro_title = next((item for item in paper.section_names if 'ntroduction' in item.lower()), None) if intro_title is not None: text += 'Introduction: ' + paper.section_texts[intro_title] # 同样地,查找并添加“Conclusion”部分 conclusion_title = next((item for item in paper.section_names if 'onclusion' in item), None) if conclusion_title is not None: text += 'Conclusion: ' + paper.section_texts[conclusion_title] # 遍历sections_of_interest,添加其他感兴趣的部分 for heading in sections_of_interest: if heading in paper.section_names: text += heading + ': ' + paper.section_texts[heading] # 使用ChatGPT进行审稿,并得到审稿内容 chat_review_text = self.chat_review(text=text) # 将审稿的文章编号和内容添加到htmls列表中 htmls.append('## Paper:' + str(paper_index+1)) htmls.append('\n\n\n') htmls.append(chat_review_text) # 获取当前日期和时间,并转换为字符串格式 date_str = str(datetime.datetime.now())[:13].replace(' ', '-') try: # 创建输出文件夹 export_path = os.path.join('./', 'output_file') os.makedirs(export_path) except: # 如果文件夹已存在,则不执行任何操作 pass # 如果是第一篇文章,则写模式为'w',否则为'a' mode = 'w' if paper_index == 0 else 'a' # 根据文章标题和日期生成文件名 file_name = os.path.join(export_path, date_str+'-'+self.validateTitle(paper.title)+"."+self.file_format) # 将审稿内容导出为Markdown格式并保存 self.export_to_markdown("\n".join(htmls), file_name=file_name, mode=mode) # 清空htmls列表,为下一篇文章做准备 htmls = []
- 主程序部分:
定义了一个chat_reviewer_main 函数,该函数创建了一个Reviewer对象,并对指定路径中的PDF文件进行审稿
主程序中定义了命令行参数解析,并调用了chat_reviewer_main 函数def chat_reviewer_main(args): reviewer1 = Reviewer(args=args) # 开始判断是路径还是文件: paper_list = [] if args.paper_path.endswith(".pdf"): paper_list.append(Paper(path=args.paper_path)) else: for root, dirs, files in os.walk(args.paper_path): print("root:", root, "dirs:", dirs, 'files:', files) #当前目录路径 for filename in files: # 如果找到PDF文件,则将其复制到目标文件夹中 if filename.endswith(".pdf"): paper_list.append(Paper(path=os.path.join(root, filename))) print("------------------paper_num: {}------------------".format(len(paper_list))) [print(paper_index, paper_name.path.split('\\')[-1]) for paper_index, paper_name in enumerate(paper_list)] reviewer1.review_by_chatgpt(paper_list=paper_list)
在主程序中增加了审稿时间的计算功能if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument("--paper_path", type=str, default='', help="path of papers") parser.add_argument("--file_format", type=str, default='txt', help="output file format") parser.add_argument("--research_fields", type=str, default='computer science, artificial intelligence and reinforcement learning', help="the research fields of paper") parser.add_argument("--language", type=str, default='en', help="output lauguage, en or zh") reviewer_args = ReviewerParams(**vars(parser.parse_args())) start_time = time.time() chat_reviewer_main(args=reviewer_args) print("review time:", time.time() - start_time)
当然,这个项目的论文审稿部分更多是用的ChatGPT的API审稿,我司在API的基础上进一步做了微调的工作,比如如何通过论文审阅语料微调出一个论文审稿GPT(甚至通过10万量级的paper+review语料微调/训练),详见我司的「大模型项目开发线下营」
1.2 PDF解析:ChatPaper/scipdf_parser-master/
1.2.1 ChatPaper/scipdf_parser-master/scipdf/pdf/parse_pdf.py
- 导入必要的库
re: 正则表达式库,用于匹配和处理字符串
os 和 os.path: 操作文件和路径的库
glob: 搜索文件的库
urllib: 用于处理和获取 URL
subprocess: 执行外部命令和程序的库
requests: 用于发送 HTTP 请求的库
BeautifulSoup 和 NavigableString: 从 bs4 导入,用于解析和操作 XML/HTML 内容
tqdm 和 tqdm_notebook: 提供进度条功能 - 定义常量
GROBID_URL: GROBID 是一个开源软件,可以从 PDF 文件中提取和解析学术出版物的结构化信息
PDF_FIGURES_JAR_PATH: 这是指向某个 jar 文件的路径,但这段代码中并没有用到这个常量 - 函数 list_pdf_paths: 返回给定文件夹中所有 PDF 文件的路径
- 函数 validate_url: 通过正则表达式验证给定的路径是否为有效的 URL
def validate_url(path: str): """ 验证给定的``path``是否为URL """ # 定义正则表达式以匹配URL # 下面的正则表达式主要匹配了以下几部分: # 1. http:// 或 https:// 开头 # 2. 域名 (例如:example.com) # 3. localhost (本地主机) # 4. IP地址 (例如:192.168.1.1) # 5. 可选的端口号 (例如::80) # 6. 路径或者查询字符串 regex = re.compile( r"^(?:http|ftp)s?://" # http:// or https:// 开头 # 域名部分 r"(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|" r"localhost|" # localhost 部分 r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})" # IP地址部分 r"(?::\d+)?" # 可选的端口号部分 r"(?:/?|[/?]\S+)$", # 路径或查询字符串部分 re.IGNORECASE, # 忽略大小写 ) # 使用上述正则表达式匹配给定的path,如果匹配成功则返回True,否则返回False return re.match(regex, path) is not None
- 函数 parse_pdf:
这是代码中的核心功能,用 GROBID 服务从 PDF 文档中解析 XML 或 BeautifulSoup 格式的信息
如果 fulltext 参数为 True,则解析整篇文章;否则,只解析标题
可以从本地或云端的 GROBID 服务中获取数据def parse_pdf( pdf_path: str, fulltext: bool = True, soup: bool = False, return_coordinates: bool = True, grobid_url: str = GROBID_URL, ): """ 使用GROBID工具将PDF解析为XML或BeautifulSoup 可以查看http://grobid.readthedocs.io/en/latest/Install-Grobid/了解如何本地运行GROBID 加载GROBID zip文件后,可以使用以下方法运行GROBID >> ./gradlew run 参数 ========== pdf_path: str 或 bytes,出版物、文章的路径、URL或PDF的字节字符串 fulltext: bool, 解析选项,如果为True,解析文章的全部文本 如果为False,只解析头部 grobid_url: str, GROBID解析器的url,默认为'http://localhost:8070' 可以更改为"https://cloud.science-miner.com/grobid/"使用云服务 soup: bool, 如果为True,返回文章的BeautifulSoup 输出 ====== parsed_article: 如果soup为False,则返回文本格式的解析后的XML, 否则返回XML的BeautifulSoup 示例 ======= >> parsed_article = parse_pdf(pdf_path, fulltext=True, soup=True) """ # GROBID的URL if fulltext: url = "%s/api/processFulltextDocument" % grobid_url # 完整文本处理URL else: url = "%s/api/processHeaderDocument" % grobid_url # 仅处理头部的URL files = [] if return_coordinates: # 如果需要返回坐标 files += [ ("teiCoordinates", (None, "persName")), ("teiCoordinates", (None, "figure")), ("teiCoordinates", (None, "ref")), ("teiCoordinates", (None, "formula")), ("teiCoordinates", (None, "biblStruct")), ] if isinstance(pdf_path, str): # 如果pdf_path是字符串 if validate_url(pdf_path) and op.splitext(pdf_path)[-1].lower() != ".pdf": print("输入的URL必须以``.pdf``结尾") parsed_article = None elif validate_url(pdf_path) and op.splitext(pdf_path)[-1] == ".pdf": page = urllib.request.urlopen(pdf_path).read() # 从URL下载PDF parsed_article = requests.post(url, files={"input": page}).text # 通过GROBID处理下载的PDF elif op.exists(pdf_path): # 如果pdf_path是文件路径 parsed_article = requests.post( url, files={"input": open(pdf_path, "rb")} ).text # 通过GROBID处理文件 else: parsed_article = None elif isinstance(pdf_path, bytes): # 如果pdf_path是字节 # 假设传入的是字节字符串 parsed_article = requests.post(url, files={"input": pdf_path}).text # 通过GROBID处理字节 else: parsed_article = None if soup and parsed_article is not None: # 如果需要返回BeautifulSoup对象 parsed_article = BeautifulSoup(parsed_article, "lxml") return parsed_article
- 函数 parse_authors: 从 BeautifulSoup 文章对象中提取作者信息
def parse_authors(article): """ Parse authors from a given BeautifulSoup of an article """ # 从文章的 BeautifulSoup 对象中查找包含作者信息的 "sourcedesc" 标签,然后找到其中所有的 "persname" 标签 author_names = article.find("sourcedesc").findAll("persname") # 创建一个空列表,用于保存解析的作者名字 authors = [] # 遍历每个作者标签 for author in author_names: # 查找作者的名字,并进行处理,如果不存在则返回空字符串 firstname = author.find("forename", {"type": "first"}) firstname = firstname.text.strip() if firstname is not None else "" # 查找作者的中间名,并进行处理,如果不存在则返回空字符串 middlename = author.find("forename", {"type": "middle"}) middlename = middlename.text.strip() if middlename is not None else "" # 查找作者的姓氏,并进行处理,如果不存在则返回空字符串 lastname = author.find("surname") lastname = lastname.text.strip() if lastname is not None else "" # 判断中间名是否存在,然后将名、中间名和姓组合在一起 if middlename is not "": authors.append(firstname + " " + middlename + " " + lastname) else: authors.append(firstname + " " + lastname) # 使用"; "连接所有的作者名,生成一个字符串 authors = "; ".join(authors) # 返回最终的作者名字符串 return authors
- 函数 parse_date: 从 BeautifulSoup 文章对象中提取出版日期
def parse_date(article): """ Parse date from a given BeautifulSoup of an article """ # 从文章的 BeautifulSoup 对象中查找包含出版日期信息的 "publicationstmt" 标签 pub_date = article.find("publicationstmt") # 在 "publicationstmt" 标签下查找 "date" 标签 year = pub_date.find("date") # 尝试获取 "date" 标签的 "when" 属性,如果标签不存在则返回空字符串 year = year.attrs.get("when") if year is not None else "" # 返回解析出的年份 return year
- 函数 parse_abstract: 从 BeautifulSoup 文章对象中提取摘要
def parse_abstract(article): """ Parse abstract from a given BeautifulSoup of an article """ # 从文章的 BeautifulSoup 对象中查找 "abstract" 标签 div = article.find("abstract") # 初始化摘要字符串为空 abstract = "" # 遍历 "abstract" 标签下的所有直接子节点 for p in list(div.children): # 如果子节点不是纯文本(NavigableString)且子节点的子元素数量大于0 if not isinstance(p, NavigableString) and len(list(p)) > 0: # 将子节点下的所有非纯文本子元素的文本内容加入摘要字符串 abstract += " ".join( [elem.text for elem in p if not isinstance(elem, NavigableString)] ) # 返回解析出的摘要 return abstract
- 函数 calculate_number_of_references: 计算给定部分中的引用数量
def calculate_number_of_references(div): """ 对于给定的章节,计算章节中的参考文献数量 """ # 计算给定章节中的文献引用数量 n_publication_ref = len( # 列表推导式查找所有type属性为"bibr"的"ref"标签 [ref for ref in div.find_all("ref") if ref.attrs.get("type") == "bibr"] ) # 计算给定章节中的图形引用数量 n_figure_ref = len( # 列表推导式查找所有type属性为"figure"的"ref"标签 [ref for ref in div.find_all("ref") if ref.attrs.get("type") == "figure"] ) # 返回一个字典,包含文献引用数量和图形引用数量 return {"n_publication_ref": n_publication_ref, "n_figure_ref": n_figure_ref}
- 函数 parse_sections:
从 BeautifulSoup 文章对象中提取文章的各个部分或段落
它还计算每个部分中的引用数量def parse_sections(article, as_list: bool = False): """ 从给定的BeautifulSoup文章中解析章节列表 参数 ========== as_list: bool, 如果为True,则将输出文本作为段落列表, 而不是将其连接成一个单一的文本 """ # 找到文章中的"text"部分 article_text = article.find("text") # 获取所有带有特定属性的"div"标签 divs = article_text.find_all("div", attrs={"xmlns": "http://www.tei-c.org/ns/1.0"}) sections = [] # 初始化章节列表 for div in divs: div_list = list(div.children) if len(div_list) == 0: heading = "" text = "" elif len(div_list) == 1: # 如果只有一个子元素 if isinstance(div_list[0], NavigableString): heading = str(div_list[0]) text = "" else: heading = "" text = div_list[0].text else: text = [] heading = div_list[0] if isinstance(heading, NavigableString): heading = str(heading) p_all = list(div.children)[1:] else: heading = "" p_all = list(div.children) for p in p_all: if p is not None: try: text.append(p.text) # 尝试添加文本 except: pass if not as_list: text = "\n".join(text) # 如果标题或文本不为空 if heading is not "" or text is not "": # 计算参考文献数量 ref_dict = calculate_number_of_references(div) sections.append( { "heading": heading, "text": text, "n_publication_ref": ref_dict["n_publication_ref"], "n_figure_ref": ref_dict["n_figure_ref"], } ) return sections
- parse_references(article):
功能:从给定的BeautifulSoup对象中解析文献引用列表
主要步骤:
寻找包含引用的部分
对于每个引用,提取文章标题、期刊、发布日期和作者信息
返回包含所有引用信息的列表def parse_references(article): """ 从给定的BeautifulSoup文章中解析引用列表 """ reference_list = [] # 初始化引用列表 # 在文章中查找文本部分中的引用部分 references = article.find("text").find("div", attrs={"type": "references"}) # 如果存在引用,则查找所有的"biblstruct"标签,否则返回空列表 references = references.find_all("biblstruct") if references is not None else [] reference_list = [] # 再次初始化引用列表 for reference in references: # 尝试查找引用的文章标题 title = reference.find("title", attrs={"level": "a"}) if title is None: title = reference.find("title", attrs={"level": "m"}) title = title.text if title is not None else "" # 尝试查找引用的期刊名 journal = reference.find("title", attrs={"level": "j"}) journal = journal.text if journal is not None else "" if journal is "": journal = reference.find("publisher") journal = journal.text if journal is not None else "" # 查找引用的出版年份 year = reference.find("date") year = year.attrs.get("when") if year is not None else "" authors = [] # 初始化作者列表 # 遍历引用中的所有作者 for author in reference.find_all("author"): firstname = author.find("forename", {"type": "first"}) firstname = firstname.text.strip() if firstname is not None else "" middlename = author.find("forename", {"type": "middle"}) middlename = middlename.text.strip() if middlename is not None else "" lastname = author.find("surname") lastname = lastname.text.strip() if lastname is not None else "" # 根据是否有中间名来组合作者的全名 if middlename is not "": authors.append(firstname + " " + middlename + " " + lastname) else: authors.append(firstname + " " + lastname) authors = "; ".join(authors) # 将所有作者连接为一个字符串 # 将标题、期刊、年份和作者添加到引用列表中 reference_list.append( {"title": title, "journal": journal, "year": year, "authors": authors} ) return reference_list # 返回引用列表
- parse_figure_caption(article):
功能:从给定的BeautifulSoup对象中解析图形和表格
主要步骤:
搜索所有图形
对于每个图形或表格,提取标签、类型、ID、标题和数据
返回包含所有图形/表格信息的列表def parse_figure_caption(article): """ 从给定的BeautifulSoup文章中解析图表列表 """ figures_list = [] # 初始化图表列表 # 在文章中查找所有的"figure"标签 figures = article.find_all("figure") for figure in figures: # 获取图标的类型(可能是图或表)和ID figure_type = figure.attrs.get("type") or "" figure_id = figure.attrs.get("xml:id") or "" # 获取图标的标签(如"图1") label = figure.find("label").text if figure_type == "table": # 如果图形类型为表,则获取表的标题和数据 caption = figure.find("figdesc").text data = figure.table.text else: # 否则,只获取图形的标题,并将数据设置为空字符串 caption = figure.text data = "" # 将标签、类型、ID、标题和数据添加到图形列表中 figures_list.append( { "figure_label": label, "figure_type": figure_type, "figure_id": figure_id, "figure_caption": caption, "figure_data": data, } ) return figures_list # 返回图表列表
- parse_formulas(article):
功能:从给定的BeautifulSoup对象中解析公式
主要步骤:
搜索所有公式
提取公式的ID、文本和坐标
返回包含所有公式信息的列表def parse_formulas(article): """ 从给定的BeautifulSoup文章中解析公式列表 """ formulas_list = [] # 初始化公式列表 # 在文章中查找所有的"formula"标签 formulas = article.find_all("formula") for formula in formulas: # 获取公式的ID formula_id = formula.attrs["xml:id"] or "" # 获取公式的文本内容 formula_text = formula.text # 尝试获取公式的坐标 formula_coordinates = formula.attrs.get("coords") or "" if formula_coordinates is not "": # 如果有坐标,将它们转换为浮点数列表 formula_coordinates = [float(x) for x in formula_coordinates.split(",")] # 将ID、文本和坐标添加到公式列表中 formulas_list.append( { "formula_id": formula_id, "formula_text": formula_text, "formula_coordinates": formula_coordinates, } ) return formulas_list # 返回公式列表
- convert_article_soup_to_dict(article, as_list=False):
功能:将BeautifulSoup对象转换为JSON格式的字典,类似于某些开源项目的输出
主要步骤:
提取文章的标题、作者、发布日期、摘要、部分、引用、图形和公式
返回一个包含所有这些信息的字典def convert_article_soup_to_dict(article, as_list: bool = False): """ 将BeautifulSoup对象转换为JSON格式的函数 与https://github.com/allenai/science-parse/ 的输出类似 参数 ========== article: BeautifulSoup 输出 ====== article_json: dict, 给定文章的解析字典,格式如下: { 'title': ..., 'abstract': ..., 'sections': [ {'heading': ..., 'text': ...}, {'heading': ..., 'text': ...}, ... ], 'references': [ {'title': ..., 'journal': ..., 'year': ..., 'authors': ...}, {'title': ..., 'journal': ..., 'year': ..., 'authors': ...}, ... ], 'figures': [ {'figure_label': ..., 'figure_type': ..., 'figure_id': ..., 'figure_caption': ..., 'figure_data': ...}, ... ] } """ article_dict = {} # 初始化文章字典 if article is not None: # 从文章中获取主标题 title = article.find("title", attrs={"type": "main"}) title = title.text.strip() if title is not None else "" article_dict["title"] = title # 解析文章的作者 article_dict["authors"] = parse_authors(article) # 解析文章的发布日期 article_dict["pub_date"] = parse_date(article) # 解析文章的摘要 article_dict["abstract"] = parse_abstract(article) # 解析文章的各个部分 article_dict["sections"] = parse_sections(article, as_list=as_list) # 解析文章的参考文献 article_dict["references"] = parse_references(article) # 解析文章的图表 article_dict["figures"] = parse_figure_caption(article) # 解析文章的公式 article_dict["formulas"] = parse_formulas(article) # 从文章中获取DOI doi = article.find("idno", attrs={"type": "DOI"}) doi = doi.text if doi is not None else "" article_dict["doi"] = doi return article_dict else: return None # 如果文章不存在,返回None
- parse_pdf_to_dict(...):功能:解析给定的PDF并返回解析后的文章的字典
主要步骤:
使用外部工具或服务(如GROBID)解析PDF
将解析后的BeautifulSoup对象转换为字典格式
返回该字典
这个函数的目的是解析给定的PDF文件,并将其转换为一个结构化的字典。首先,它使用parse_pdf函数来解析PDF,然后使用convert_article_soup_to_dict函数将解析后的BeautifulSoup对象转换为字典def parse_pdf_to_dict( pdf_path: str, fulltext: bool = True, soup: bool = True, as_list: bool = False, return_coordinates: bool = True, grobid_url: str = GROBID_URL, ): """ 解析给定的PDF并返回解析后的文章字典 参数 ========== pdf_path: str, 出版物或文章的路径 fulltext: bool, 是否提取完整文本 soup: bool, 是否返回BeautifulSoup as_list: bool, 是否返回部分列表 return_coordinates: bool, 是否返回坐标 grobid_url: str, grobid服务器的url,默认为`GROBID_URL` 可更改为 "https://cloud.science-miner.com/grobid/" 使用云服务 输出 ===== article_dict: dict, 文章的字典 """ # 使用parse_pdf函数解析PDF parsed_article = parse_pdf( pdf_path, fulltext=fulltext, soup=soup, return_coordinates=return_coordinates, grobid_url=grobid_url, ) # 将BeautifulSoup对象转换为字典 article_dict = convert_article_soup_to_dict(parsed_article, as_list=as_list) return article_dict # 返回解析后的文章字典
- parse_figures(...):
功能:使用pdffigures2工具从给定的科学PDF中解析图形
主要步骤:
检查输出文件夹是否存在,如果不存在则创建它
在输出文件夹中创建子文件夹来保存数据和图形
使用Java运行pdffigures2工具解析图形
打印完成消息def parse_figures( pdf_folder: str, jar_path: str = PDF_FIGURES_JAR_PATH, resolution: int = 300, output_folder: str = "figures", ): """ 使用pdffigures2从给定的科学PDF中提取图形。 参数 ========== pdf_folder: str, 包含PDF文件的文件夹的路径。一个文件夹必须只包含PDF文件。 jar_path: str, pdffigures2-assembly-0.0.12-SNAPSHOT.jar文件的默认路径。 resolution: int, 输出图形的分辨率。 output_folder: str, 我们希望保存解析数据(与图形相关)和图形的文件夹的路径。 输出 ====== folder: 在output_folder/data和output_folder/figures中创建文件夹,分别包含解析数据和图形。 """ # 检查output_folder是否存在,如果不存在,则创建它。 if not op.isdir(output_folder): os.makedirs(output_folder) # 在output_folder内创建“data”和“figures”子文件夹。 data_path = op.join(output_folder, "data") figure_path = op.join(output_folder, "figures") if not op.exists(data_path): os.makedirs(data_path) if not op.exists(figure_path): os.makedirs(figure_path) # 如果data和figures文件夹存在,则执行pdffigures2命令。 if op.isdir(data_path) and op.isdir(figure_path): args = [ "java", "-jar", jar_path, pdf_folder, "-i", str(resolution), "-d", op.join(op.abspath(data_path), ""), "-m", op.join(op.abspath(figure_path), ""), # end path with "/" ] _ = subprocess.run( args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=20 ) print("完成从PDFs中提取图形!") else: print( "您可能需要检查output文件夹路径中的``data``和``figures``。" )
1.3 论文检索:ChatPaper/auto_survey/utils
具体包含如下功能(这个基于GPT4的文献总结工具的项目auto-draft也提供类似的功能)
- 自动搜索相关文献, 提供真实有出处的引用
- 自动生成LaTeX格式,markdown格式的调研结果
1.3.1 /auto_survey/utils/knowledge_databases/ml_textbook_test
// 待更
1.3.2 /auto_survey/utils/embeddings.py
# 导入HuggingFace的文本嵌入功能
from langchain.embeddings import HuggingFaceEmbeddings
# 导入操作系统相关的模块,用于获取环境变量等操作
import os
# 从环境变量中获取OpenAI的API密钥
openai_api_key = os.getenv("OPENAI_API_KEY")
# 如果获取到了OpenAI的API密钥
if openai_api_key is not None:
# 导入OpenAI的文本嵌入功能
from langchain.embeddings.openai import OpenAIEmbeddings
# 使用获取到的API密钥初始化OpenAI的文本嵌入
openai_embedding = OpenAIEmbeddings(model="text-embedding-ada-002", openai_api_key=openai_api_key)
else:
# 如果没有获取到API密钥,则将OpenAI的文本嵌入设为None
openai_embedding = None
# 定义HuggingFace的模型名称
model_name = 'sentence-transformers/all-MiniLM-L6-v2'
# 设置模型的参数,这里是将模型放在CPU上运行
model_kwargs = {'device': 'cpu'}
# 设置文本嵌入的参数,这里是不对嵌入进行归一化
encode_kwargs = {'normalize_embeddings': False}
# 使用上述参数初始化HuggingFace的文本嵌入
all_minilm_l6_v2 = HuggingFaceEmbeddings(
model_name=model_name,
model_kwargs=model_kwargs,
encode_kwargs=encode_kwargs)
# 创建一个字典来存储上述两种文本嵌入,方便后续调用
EMBEDDINGS = {"text-embedding-ada-002": openai_embedding, "all-MiniLM-L6-v2": all_minilm_l6_v2}
1.3.3 /auto_survey/utils/gpt_interaction.py
// 待更
1.3.4 /auto_survey/utils/knowledge.py
定义了一个Knowledge类,该类使用关键词字典从数据库中搜索相关内容,并可以将这些内容转化为提示文本或JSON格式
import tiktoken # 导入tiktoken模块,用于计算tokens数量
from random import shuffle # 从random模块导入shuffle函数,用于随机打乱列表
# 使用`tiktoken`来计算文本中的tokens数量
tokenizer_name = tiktoken.encoding_for_model('gpt-4') # 为"gpt-4"模型获取相应的编码器名称
tokenizer = tiktoken.get_encoding(tokenizer_name.name) # 获取编码器实例
def tiktoken_len(text):
# 计算给定文本中的tokens数量
tokens = tokenizer.encode(text, disallowed_special=()) # 对文本进行编码并返回tokens
return len(tokens) # 返回tokens的数量
class Knowledge:
# 定义一个Knowledge类来处理知识数据库相关操作
def __init__(self, db):
self.db = db # 数据库实例
self.contents = [] # 用于存放内容的列表
def collect_knowledge(self, keywords_dict, max_query):
"""
根据给定的关键词字典,从数据库中搜索并收集相关的知识。
keywords_dict:
示例: {"machine learning": 5, "language model": 2};
"""
db = self.db
if max_query > 0:
for kw in keywords_dict:
docs = db.similarity_search_with_score(kw, k=max_query) # 使用关键词在数据库中进行相似度搜索
for i in range(max_query):
content = {"content": docs[i][0].page_content.replace('\n', ' '), # 移除换行符
"score": docs[i][1]} # 为每个文档添加评分
self.contents.append(content) # 将内容添加到contents列表中
shuffle(self.contents) # 随机打乱contents列表
def to_prompts(self, max_tokens=2048):
# 将收集到的知识内容转化为提示文本,且tokens总数不超过max_tokens
if len(self.contents) == 0:
return ""
prompts = []
tokens = 0
for idx, content in enumerate(self.contents):
prompt = "Reference {}: {}\n".format(idx, content["content"])
tokens += tiktoken_len(prompt)
if tokens >= max_tokens:
break
else:
prompts.append(prompt) # 将提示文本添加到prompts列表中
return "".join(prompts) # 返回连接后的提示文本
def to_json(self):
# 将收集到的知识内容转化为JSON格式
if len(self.contents) == 0:
return {}
output = {}
for idx, content in enumerate(self.contents):
output[str(idx)] = {
"content": content["content"],
"score": str(content["score"])
}
print(output)
return output
1.3.5 /auto_survey/utils/references.py
这个代码文件主要注意实现了以下功能
第一部分:References
类之外
-
Reference类的说明
- 从给定的
.bib
文件中读取论文,并用search_paper_abstract
方法填充缺失的摘要 - 根据一些关键词使用Semantic Scholar API查找相关论文
- 从所选论文中生成Bibtex引用格式
- 从所选论文中生成提示(prompts)。示例提示格式为:
{"paper_id": "paper summary"}
。
- 从给定的
-
待完成的任务(todo)
- 加载预定义的论文;
- 使用Semantic Scholar API查找所有相关作品;
- 将所有引文添加到
bib_papers
; - 将所有被引文添加到
bib_papers
; - 使用Semantic Scholar查找它们的嵌入;
- 将引文分组以减少tokens的数量
-
一些基本的工具
- 计算两个向量的余弦相似性;
def evaluate_cosine_similarity(v1, v2): try: return np.dot(v1, v2)/(norm(v1)*norm(v2)) except ValueError: return 0.0
- 将一个较长的列表分割为较小的批次,以便于处理;
def chunks(lst, chunk_size=MAX_BATCH_SIZE): """Splits a longer list to respect batch size""" for i in range(0, len(lst), chunk_size): yield lst[i : i + chunk_size]
- 通过向Semantic Scholar的API发送请求,为一组论文计算嵌入(即将论文映射到一个向量空间中)
def embed(papers): embeddings_by_paper_id: Dict[str, List[float]] = {} for chunk in chunks(papers): # Allow Python requests to convert the data above to JSON response = requests.post(URL, json=chunk) if response.status_code != 200: raise RuntimeError("Sorry, something went wrong, please try later!") for paper in response.json()["preds"]: embeddings_by_paper_id[paper["paper_id"]] = paper["embedding"] return embeddings_by_paper_id
- 为给定的论文标题和描述获取嵌入
def get_embeddings(paper_title, paper_description): output = [{"title": paper_title, "abstract": paper_description, "paper_id": "target_paper"}] emb_vector = embed(output)["target_paper"] target_paper = output[0] target_paper["embeddings"] = emb_vector return target_paper
- 获取与给定论文最相关的
k
篇论文
具体而言,从提供的papers_dict 中找到与给定的paper_title和paper_description最相似的前k篇论文,并返回。至于相似性是通过计算两篇论文嵌入向量的余弦相似度来确定的def get_top_k(papers_dict, paper_title, paper_description, k=None): # 获取目标论文的嵌入向量 target_paper = get_embeddings(paper_title, paper_description) # 存放所有的论文信息,其中应包含嵌入向量 papers = papers_dict # 如果k小于papers的数量,返回k篇最相关的论文 # 如果k大于等于papers的数量或k为None,返回所有论文 max_num_papers = len(papers) # 获取论文总数 if k is None: # 如果k为None,设置k为论文总数 k = max_num_papers num_papers = min(k, max_num_papers) # 确定需要返回的论文数量 # 获取目标论文的嵌入向量 target_embedding_vector = target_paper["embeddings"] # 计算每篇论文与目标论文的余弦相似度 for k in papers: v = papers[k] embedding_vector = v["embeddings"] # 获取当前论文的嵌入向量 cos_sim = evaluate_cosine_similarity(embedding_vector, target_embedding_vector) # 计算余弦相似度 papers[k]["cos_sim"] = cos_sim # 存储余弦相似度到papers中 # 返回相似度最高的前k篇论文 sorted_papers = {k: v for k, v in sorted(papers.items(), key=lambda x: x[1]["cos_sim"], reverse=True)[:num_papers]} # 从返回的论文中移除嵌入向量信息 for key in sorted_papers: sorted_papers[key].pop("embeddings", None) return sorted_papers
- 去除摘要中的换行符,减少提示的长度
def remove_newlines(serie): # This function is applied to the abstract of each paper to reduce the length of prompts. serie = serie.replace('\n', ' ') serie = serie.replace('\\n', ' ') serie = serie.replace(' ', ' ') serie = serie.replace(' ', ' ') return serie
- 计算两个向量的余弦相似性;
-
从.bib文件加载论文信息
- 读取.bib文件,并将其解析为一个python对象;
- 遍历这个对象,从中提取论文的各种属性(如ID、标题、期刊、年份、作者、摘要等);
def load_papers_from_bibtex(bib_file_path): with open(bib_file_path) as bibtex_file: bib_database = bibtexparser.load(bibtex_file) if len(bib_database.entries) == 0: return [] else: bib_papers = [] for bibitem in bib_database.entries: # Add each paper to `bib_papers` paper_id = bibitem.get("ID") title = bibitem.get("title") if title is None: continue journal = bibitem.get("journal") year = bibitem.get("year") author = bibitem.get("author") abstract = bibitem.get("abstract") if abstract is None: abstract = search_paper_abstract(title) result = { "paper_id": paper_id, "title": title, "link": "", "abstract": abstract, "authors": author, "year": year, "journal": journal } bib_papers.append(result) return bib_papers
- 对于缺失摘要的论文,使用
search_paper_abstract
函数查询摘要def search_paper_abstract(title): pg = ProxyGenerator() success = pg.FreeProxies() # pg.ScraperAPI("921b16f94d701308b9d9b4456ddde155") if success: try: scholarly.use_proxy(pg) # input the title of a paper, return its abstract search_query = scholarly.search_pubs(title) found_paper = next(search_query) except: return "" else: return "" # raise RuntimeError("ScraperAPI fails.") return remove_newlines(found_paper['bib']['abstract'])
-
计算文本的tokens数量
- 使用
tokenizer
对象来计算给定文本的tokens的数量# `tokenizer`: used to count how many tokens tokenizer_name = tiktoken.encoding_for_model('gpt-4') tokenizer = tiktoken.get_encoding(tokenizer_name.name) def tiktoken_len(text): # evaluate how many tokens for the given text tokens = tokenizer.encode(text, disallowed_special=()) return len(tokens)
- 使用
-
使用Semantic Scholar (SS) API搜索论文
- 使用Semantic Scholar API搜索指定关键词的论文;
- 从API返回的数据中提取论文的各种属性
-
parse_search_results
函数这部分主要关于从搜索结果中提取学术论文的相关信息:
该函数的目的是对传入的搜索结果进行解析,并将其转换为一个论文信息列表。- 首先检查传入的搜索结果是否为空。
- 逐个解析每篇论文的内容,包括作者信息、年份、标题等。
- 对某些字段进行特殊处理,如将日志名中的
&
替换为\&
。 - 如果存在摘要的“tldr”(即“过长不读”)版本,它会被优先使用,否则会使用原始摘要。
- 最后,所有提取出的信息将被组合成一个字典并添加到结果列表中
且函数下方的代码调用了一个假设的ss_search
方法,然后使用上述函数处理这些搜索结果def parse_search_results(search_results_ss): # 判断搜索结果是否为空 if len(search_results_ss) == 0: return [] # 将搜索结果转换为论文字典的列表 papers_ss = [] for raw_paper in search_results_ss: # 如果论文没有摘要,跳过此论文 if raw_paper["abstract"] is None: continue # 提取作者信息 authors_str, last_name = extract_author_info(raw_paper['authors']) # 获取论文的发表年份 year_str = str(raw_paper['year']) # 获取论文标题 title = raw_paper['title'] # 有些期刊的名字可能包含"&"字符;将其替换掉 journal = raw_paper['venue'].replace("&", "\\&") # 如果没有提供期刊名,就默认为“arXiv preprint” if not journal: journal = "arXiv preprint" # 根据作者姓、发表年份和标题提取论文ID paper_id = extract_paper_id(last_name, year_str, title).lower() # 转换外部ID为链接 link = externalIds2link(raw_paper['externalIds']) # 如果存在tldr摘要,使用tldr摘要;否则,使用原始摘要并移除其中的换行符 if tldr and raw_paper['tldr'] is not None: abstract = raw_paper['tldr']['text'] else: abstract = remove_newlines(raw_paper['abstract']) # 有些论文可能没有嵌入;处理这种情况 embeddings_dict = raw_paper.get('embedding') if embeddings_dict is None: continue else: embeddings = raw_paper['embedding']['vector'] # 组合结果 result = { "paper_id": paper_id, "title": title, "abstract": abstract, "link": link, "authors": authors_str, "year": year_str, "journal": journal, "embeddings": embeddings } # 将结果添加到论文列表中 papers_ss.append(result) # 返回论文列表 return papers_ss # 使用关键字进行搜索 raw_results = ss_search(keyword, limit=counts) # 如果获取到了原始搜索结果 if raw_results is not None: # 提取搜索结果数据 search_results = raw_results.get("data") # 如果搜索结果是空的,设置为空列表 if search_results is None: search_results = [] # 如果没有获取到原始搜索结果,设置为空列表 else: search_results = [] # 解析搜索结果并返回 results = parse_search_results(search_results) return results
第二部分:References
类
该类用于管理论文引用:
-
初始化方法:当创建一个
References
对象时,可以选择为其提供标题、论文列表、关键词以及描述 -
load_papers
方法:加载给定BibTeX格式的论文到引用类中 -
generate_keywords_dict
方法:生成一个关键词字典,其中每个关键词都关联一个论文数量 -
collect_papers
方法:使用给定的关键词字典收集尽可能多的论文。这个方法尝试收集给定关键词的相关论文,并添加到类的内部存储中 -
to_bibtex
方法:将保存的论文列表转换为BibTeX格式的文件 -
_get_papers
方法:一个内部方法,用于从内部存储中获取论文列表 -
to_prompts
方法:将引用转换为提示格式,这可能是为了后续使用某种机器学习模型 -
to_json
方法:将论文列表转换为JSON格式 -
代码的最后部分(在
if __name__ == "__main__":
之后)是一个简单的测试部分,用于测试上述代码的功能
//待更