项目简介:

小李哥将继续每天介绍一个基于亚马逊云科技AWS云计算平台的全球前沿AI技术解决方案,帮助大家快速了解国际上最热门的云计算平台亚马逊云科技AWS AI最佳实践,并应用到自己的日常工作里。

本次介绍的是如何在亚马逊云科技机器学习托管服务Amazon SageMaker上搭建一个多模态LangChain Agent,通过ReAct逻辑让Agent通过Amazon Bedrock AI模型托管服务上的大模型对用户问题进行推理决定如何执行操作,通过调用一系列亚马逊云科技AI服务,如Amazon Textract、Amazon Transcribe、Amazon Comprehend对多模态数据进行分析处理,最后利用Streamlit框架开发与用户交互的UI界面。本架构设计全部采用了云原生Serverless架构,提供可扩展和安全的AI解决方案。本方案的解决方案架构图如下:

在亚马逊云科技上通过LangChain ReAct Agent开发金融多模态数据AI分析中台-LMLPHP

方案所需基础知识 

什么是 Amazon SageMaker?

Amazon SageMaker 是亚马逊云科技提供的一站式机器学习服务,帮助开发者和数据科学家轻松构建、训练和部署机器学习模型。SageMaker 提供了全面的工具,从数据准备、模型训练到部署和监控,覆盖了机器学习项目的全生命周期。通过 SageMaker,用户可以加速机器学习模型的开发和上线,并确保模型在生产环境中的稳定性和性能。

什么是 LangChain Agent?

LangChain Agent 是 LangChain 框架中的一个强大组件,它允许开发者创建具备自主决策能力的 AI 应用程序。通过 LangChain Agent,AI 可以根据预定义的策略和逻辑自动选择合适的工具或模型来处理任务,并在任务执行过程中进行推理和调整。这使得开发者能够构建更复杂、更智能的系统,可以处理多步骤任务并在不同场景中作出动态响应。

在亚马逊云科技上通过LangChain ReAct Agent开发金融多模态数据AI分析中台-LMLPHP

什么是 ReAct 逻辑?

ReAct 逻辑(Reasoning and Acting)是一种结合了推理(Reasoning)和行动(Acting)的决策机制,用于增强 AI 系统的智能化行为。在 ReAct 逻辑下,AI 系统不仅会执行预定的任务,还会在任务执行过程中进行推理,分析当前的情境,并动态调整接下来的步骤。通过这种逻辑,AI 能够更好地应对复杂的环境和多变的任务需求。

为什么利用 LangChain Agent 分析多模态数据?

处理复杂数据类型

多模态数据通常包含文本、图像、音频、视频等多种类型的信息。LangChain Agent 可以根据 ReAct 逻辑自主选择不同的模型和工具来处理不同模态的数据,实现对多模态数据的全面分析和理解。

提高分析准确性

通过结合多个数据源的内容,LangChain Agent 能够更准确地推断和生成答案。比如,在分析一段视频时,Agent 可以同时处理转录的文本、图像帧和音频信息,从而提供更全面的分析结果。

灵活的决策能力

利用 ReAct 逻辑,LangChain Agent 可以在多模态数据分析过程中根据实时反馈调整策略,确保分析过程中的每一步都能最大程度地利用数据。这种灵活性特别适合处理数据量大、信息复杂的任务。

自动化复杂任务

LangChain Agent 可以自主进行任务分解和工具选择,自动化执行多步骤分析任务。例如,Agent 可以先从视频中提取音频,转录为文本,然后结合图像分析,最终生成全面的报告或回答复杂的问题。

本方案包括的内容

1. 利用亚马逊云科技AI系列服务处理、分析云端金融原始数据

2. 开发一个LangChain Agent,根据用户需求使用ReAct逻辑判断、选择合适的工具完成数据分析任务

3. 利用Streamlit框架开发金融分析数据中台网页应用

项目搭建具体步骤

1. 进入亚马逊云科技控制台,进入Amazon Bedrock服务主页,确认"Mistral Large"大模型是开启状态。并进入DynamoDB,创建一个表,命名为”chat_history_3c3c6530“,该表用于存储数据中台分析行为的记录。

在亚马逊云科技上通过LangChain ReAct Agent开发金融多模态数据AI分析中台-LMLPHP

2. 接下来进入SageMaker服务,进入Studio页面,点击Open打开Jupyter Notebook开发环境。

在亚马逊云科技上通过LangChain ReAct Agent开发金融多模态数据AI分析中台-LMLPHP

3. 进入到Jupyter Notebook,创建一个新的ipynb文件,安装和导入必要的依赖。

%%writefile notebook-requirements.txt
boto3
langchain==0.1.20
langchain_experimental
PyAthena[SQLAlchemy] ==3.8.3
sqlalchemy==2.0.27
pandas<2.0.0
numpy==1.24.0
nest-asyncio==1.5.5
PyPortfolioOpt
langchain-aws

!pip install -q -r notebook-requirements.txt

import json
import boto3
import datetime
import pandas as pd

region = 'us-east-1'

其中requirements.txt文件内容如下:

langchain==0.1.20
langchain_experimental
PyAthena[SQLAlchemy] ==3.8.3
sqlalchemy==2.0.27
PyPortfolioOpt
streamlit
bs4
boto3
langchainhub

4. 接下来运行以下代码,通过Python SDK boto3获取云端向量库Kendra的索引、数据源信息。以及存放数据源的S3存储桶名。

# Retrieve the Amazon Kendra index ID and data source ID.
kendra_client = boto3.client('kendra')

kendra_indexes = kendra_client.list_indices()
index = next(item for item in kendra_indexes['IndexConfigurationSummaryItems'] if item["Name"] == "kendra-index")
kendra_index_id = index['Id']
print (f"kendra_index_id is {kendra_index_id}.")


kendra_data_sources = kendra_client.list_data_sources(
    IndexId=kendra_index_id
)
data_source = next(item for item in kendra_data_sources['SummaryItems'] if item["Name"] == "kendra-data-source")
kendra_data_source_id = data_source['Id']

print (f"kendra_data_source_id is {kendra_data_source_id}.")


# Retrieve the S3 bucket names.

s3_client = boto3.client('s3')

data_source_bucket = ''
athena_result_bucket = ''
multimodal_output_bucket = ''

buckets = s3_client.list_buckets()['Buckets']
for bucket in buckets:
    bucket_name = bucket['Name']
    if 'data-source' in bucket_name:
        data_source_bucket = bucket_name
    if 'athena-query' in bucket_name:
        athena_result_bucket = bucket_name
    if 'multi-modal' in bucket_name:
        multimodal_output_bucket = bucket_name

print(f"data_source_bucket is {data_source_bucket}.")
print(f"athena_result_bucket is {athena_result_bucket}.")
print(f"multimodal_output_bucket is {multimodal_output_bucket}.")

 5. 接下来通过Python Boto3 SDK调用亚马逊云科技云端SQL数据分析服务Athena,运行表删除和新表”stock-prices-db“创建命令,并定义新表结构。

glue_db_name = 'stock-prices-db'

athena_client = boto3.client('athena')

def query_athena(query):
    response = athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': glue_db_name
        },
        ResultConfiguration={
            'OutputLocation': f"s3://{athena_result_bucket}/",
        },
        WorkGroup='primary'
    )
    print(response)

drop_table_query='DROP TABLE `stock_prices`;'
query_athena(drop_table_query)


create_table_query=f"""
CREATE EXTERNAL TABLE IF NOT EXISTS `stock-prices-db`.stock_prices ( 
    date string, 
    XLYT double, 
    ZQRS double, 
    VBMP double, 
    KLXE double, 
    JHGN double, 
    QPZT double, 
    MNBV double, 
    LKDI double, 
    PYXC double, 
    FGHT double, 
    RSTU double, 
    YXCV double, 
    QWER double, 
    PLKJ double, 
    HGFD double, 
    SEDC double, 
    FVGB double, 
    HNJM double, 
    YTRE double
)

ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES ('separatorChar' = ',', 'quoteChar' = '\\\"', 'escapeChar' = '\\\\')
LOCATION 's3://{data_source_bucket}/stock_prices'
TBLPROPERTIES ('skip.header.line.count'='1')
"""
query_athena(create_table_query)

6. 下面我们初始化Amazon Bedrock客户端,并指明Mistral Large大模型id。

# Create an Amazon Bedrock runtime to invoke the LLM.
from langchain_community.llms import Bedrock

bedrock_runtime = boto3.client(service_name='bedrock-runtime')

model_id = "mistral.mistral-large-2402-v1:0"

7. 接下来我们利用SQLAlchemy库创建一个用于调用Athena数据库的连接。接下来创建一个tool工具,定义函数”stock_query“,用于根据用户的问题访问Athena调用数据库执行SQL查询语句,并获取数据回复用户的问题。

import sqlalchemy
from sqlalchemy import create_engine
from langchain import PromptTemplate,SQLDatabase, LLMChain
from langchain_experimental.sql.base import SQLDatabaseChain

# Specify the parameters for the database connection to Athena.
table = 'stock_prices' # You created this table in the prior code cell.

connathena=f"athena.{region}.amazonaws.com" 
portathena='443' 
schemaathena=glue_db_name 
s3stagingathena=f's3://{athena_result_bucket}/athenaresults/'
wkgrpathena='primary'

##  Create the Athena connection string.
connection_string = f"awsathena+rest://@{connathena}:{portathena}/{schemaathena}?s3_staging_dir={s3stagingathena}&work_group={wkgrpathena}"

##  Create the Athena  SQLAlchemy engine.
engine_athena = create_engine(connection_string, echo=False)
dbathena = SQLDatabase(engine_athena)

from langchain.tools import tool
from langchain_aws import BedrockLLM

# Using the @tool decorator is the simplest way to define a custom tool. 
# By default, the decorator uses the function name as the tool name, but this can be overridden by passing a string as the first argument. 

@tool
def stock_query(query):
    # The decorator will use the function's docstring as the tool's description, so a docstring MUST be provided.

    """Use for answering questions about stocks. It only has information about stocks in table 'stock_prices'.
    This tool accepts only questions as input. Example:  What is average price of JJJ in 2014?"""
    
    sql_template = """
    <s>[INST]
    In one single line, run a SQL query to answer the question. If the answer includes stock prices, format it correctly.
    If a question ask for "closing prices", it should be the last available date in the given time period. 
    For example if the time period in the question is 2020, then the closing price is the price for the last available date in 2020.    
    
    Here is a schema of a table:
    <schema>
    {table_info}
    </schema>    
    Here is question: {input}
    [/INST]"""
    
    model_kwargs = {
        "max_tokens": 250,
        "stop": ["\n"],        
        "temperature": 0.0,
        "top_k": 50,
        "top_p": 0.9
        }
    llm = BedrockLLM(
        client=bedrock_runtime,
        model_id=model_id,
        model_kwargs=model_kwargs,
        streaming=False
    )
    
    PROMPT_sql = PromptTemplate(
        input_variables=["input", "table_info"], template=sql_template
    )
    
    db_chain = SQLDatabaseChain.from_llm(llm, dbathena, prompt=PROMPT_sql)
    
    response=db_chain.invoke(query)
    return response


# Test the run_query tool
stock_query('What are the maximum prices of stocks  RSTU, YXCV, QWER in year 2017?')

8. 运行以下代码,定义一个用于生成投资建议的LangChain tool工具”PortfolioOptimizerTool“。

from functools import reduce
from pypfopt.efficient_frontier import EfficientFrontier
from pypfopt import risk_models
from pypfopt import expected_returns
from typing import List, Optional
# Import things that are needed generically.
from langchain.tools import BaseTool


# You can also explicitly define a custom tool by subclassing the BaseTool class. 

class PortfolioOptimizerTool(BaseTool): 
    
    name = "portfolio_optimizer"
    description = """
        use this tool when you need to build optimal portfolio. 
        The output results tell you the allocation of your money on each stock.
        No need to pass stock prices to this tool.
        This tool only accept stock in this format "stock_ls":["list of stocks"]. Example "stock_ls":["xyz","abc"].
        """

    def _run(self, stocks: dict):
        """Use the tool."""

        import boto3
        import pandas as pd
        from pyathena import connect
        
        # Establish a connection to Athena.
        session = boto3.Session(region_name=region)
        athena_client = session.client('athena')
        
        # Parse the stocks input.
        json_stocks= stocks.split("\n")[0]
        if type(json_stocks) == str:
            json_stocks = json.loads(json_stocks)
            
        stock_ls = json_stocks.get("stock_ls")
        if not stock_ls:
            raise ValueError("Please provide stock list as stock_ls")
        # Execute the query.
        query = f'SELECT * from "stock-prices-db"."stock_prices"'  
        cursor = connect(s3_staging_dir=f's3://{athena_result_bucket}/athenaresults/', region_name=region).cursor()
        cursor.execute(query)

        # Fetch results.
        rows = cursor.fetchall()

        # Convert to pandas DataFrame.
        df = pd.DataFrame(rows, columns=[column[0] for column in cursor.description])

        # Filter data to use the designated list of stocks.
        stock_ls = [x.lower() for x in stock_ls]
        stock_ls.append('date')        
        df = df[stock_ls]

        # Set "Date" as the index and parse it as a datetime object.
        df.set_index("date", inplace=True)
        df.index = pd.to_datetime(df.index, format = '%Y-%m-%d')
        
        mu = expected_returns.mean_historical_return(df)
        S = risk_models.sample_cov(df)

        # Optimize for the maximal Sharpe ratio.
        ef = EfficientFrontier(mu, S)
        weights = ef.max_sharpe()
        ef.portfolio_performance(verbose=True)

        cleaned_weights = ef.clean_weights()
        ef.portfolio_performance(verbose=True)
        # Lastly, convert the weights into actual allocation values; that is, how many of each stock to buy. For your allocation, consider an investment amount of $100,000.

        from pypfopt.discrete_allocation import DiscreteAllocation, get_latest_prices
        latest_prices = get_latest_prices(df)
        da = DiscreteAllocation(weights, latest_prices, total_portfolio_value=10000)
        allocation, leftover = da.greedy_portfolio()       
        results=str(dict(cleaned_weights)).replace('{',"").replace('}',"")
        return f"These are the optimized portfolio {results}"
        

    async def _arun(self, stock_ls: int):
        """Use the tool asynchronously."""
        raise NotImplementedError("This tool does not support async")

# Initialize an optimizer.
optimizer = PortfolioOptimizerTool()        

9. 接下来定义一个自定义tool工具text_extract,用于利用Amazon Textract提取文档中的信息。

# Here, you use the @tool decorator to define a custom tool that will extract text from a .pdf file by using Amazon Textract.

@tool
def text_extract(inputString):
    """Useful for when you need to trigger conversion of pdf version of quaterly reports to text files using amazon textextract"""
    print(inputString)
    lambda_client = boto3.client('lambda', region_name=region)
    lambda_payload = {"inputString:"+inputString}
    response=lambda_client.invoke(FunctionName='textract-pdf-files', # This Lambda function is invoked to do text extraction.
                        InvocationType='RequestResponse',
                     Payload=json.dumps(inputString))
    print(response['Payload'].read())
    return response

10.接下来定义一个自定义tool工具transcribe_audio,用于利用Amazon Transcribe提取语音文件中的信息。

# Define a custom tool that will transcribe audio to text.

@tool
def transcribe_audio(inputString):
    """Useful for when you need to convert audio recordings of earnings calls from audio to text format using Amazon Transcribe"""
    
    print(inputString)
    lambda_client = boto3.client('lambda', region_name=region)
    lambda_payload = {"inputString:"+inputString}
    response=lambda_client.invoke(FunctionName='transcribe-audio', # This Lambda function is invoked to generate transcripts.
                        InvocationType='RequestResponse',
                     Payload=json.dumps(inputString))
    print(response['Payload'].read())
    return response

11. 接下来我们将要处理的文件”Amazon-10K-2023-EarningsReport.pdf“上传到用于数据处理的S3桶中,在对其进行文字提取和音频转录。

file_name='Amazon-10K-2023-EarningsReport.pdf'
copy_source = {
    'Bucket': data_source_bucket,
    'Key': file_name
}
s3_client.copy_object(CopySource=copy_source, Bucket=multimodal_output_bucket, Key=file_name)

text_extract('process')
transcribe_audio('process')

12. 接下来利用亚马逊云科技向量库Kendra对处理后的文档数据向量化并创建索引

import time

kendra_client = boto3.client("kendra")

sync_response = kendra_client.start_data_source_sync_job(
    Id = kendra_data_source_id,
    IndexId = kendra_index_id
)

print("Wait for the data source to sync with the index.")

time.sleep(30)

while True:
    jobs = kendra_client.list_data_source_sync_jobs(
        Id = kendra_data_source_id,
        IndexId = kendra_index_id
    )

    status = jobs["History"][0]["Status"]
    print(" Syncing data source. Status: "+status)
    
    if status in ['FAILED','SUCCEEDED','INCOMPLETE','ABORTED']:
        break
    time.sleep(30)

13. 定义新的工具”lookup_info“,用于在向量库中基于用户问题搜索答案。

from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.chains import create_retrieval_chain
from langchain_core.prompts import ChatPromptTemplate
from langchain.retrievers import AmazonKendraRetriever

@tool
def lookup_info(question):
    """Useful for when you need to look up information from a knowledge base."""
    model_kwargs = {
        "max_tokens": 2048,
        "temperature": 0.0,
        "top_k": 50,
        "top_p": 0.9
        }
    llm_retrieve = Bedrock(
        client=bedrock_runtime,
        model_id=model_id,
        model_kwargs=model_kwargs,
        streaming=False
    )
      
    retriever = AmazonKendraRetriever(index_id=kendra_index_id,region_name=region, top_k=3)

    prompt_template = """
    Here are a few documents in <documents> tags:
      <documents>
      {context}
      </documents>
      Based on the above documents, provide a detailed answer for, {input} 
      Answer "don't know" if not present in the document. 
    """
    prompt = ChatPromptTemplate.from_template(prompt_template)

    docs_chain = create_stuff_documents_chain(
        llm_retrieve, prompt
    )
    retrieval_chain = create_retrieval_chain(retriever, docs_chain)

    result = retrieval_chain.invoke({"input": question})

 
    return result['answer']

14. 接下来我们定义一个列表,包含我们之前创建的全部tool工具。

# Define the list of tools.

tools = [stock_query, lookup_info, optimizer, text_extract, transcribe_audio]

15. 我们利用LangChain提示词模板定义我们的提示词

from langchain_core.prompts import ChatPromptTemplate

template = '''Respond to the following questions as best you can. You have access to the following tools:

{tools}

Use the following format:

Question: the input question you must answer.
Thought: you should always think about what to do.
Action: the action to take, should be one of [{tool_names}].
Action Input: the input to the action.
Observation: the result of the action.
... (this Thought/Action/Action Input/Observation can repeat N times).

Thought: I now know the final answer.
Final Answer: the final answer to the original input question.

Begin!

Question: {input}
Thought:{agent_scratchpad}

'''

prompt = ChatPromptTemplate.from_template(template)

16. 为Agent添加对话历史记忆功能,让Agent可以通过对话历史更精确地生成回复。对话历史通过LangChain保存在DynamoDB中。

import uuid
from langchain.memory.chat_message_histories import DynamoDBChatMessageHistory
from langchain.memory import ConversationBufferMemory

chat_history_table = 'TO BE PROVIDED' # Provide the DynamoDB table name for storing conversations (prompts and answers).
  
chat_session_id = '0'
  
if chat_session_id == '0' :
    chat_session_id = str(uuid.uuid4())

print (chat_session_id)

chat_history_memory = DynamoDBChatMessageHistory(table_name=chat_history_table, session_id=chat_session_id)

17. 下面我们定义分析平台与用户交互时使用的Bedrock上的大语言模型以及模型参数,并创建一个ReAct Agent根据提示词生成回复。该Agent的特点就是同时会生成推理过程并根据推理过程,引导Agent持续更新调用外部数据源的行为步骤,获得的回复更准确。

from langchain.agents import AgentExecutor, create_react_agent
from langchain_core.runnables.history import RunnableWithMessageHistory


model_kwargs = {
        "max_tokens": 2048,
        "stop": ["\n"],        
        "temperature": 0.0,
        "top_k": 50,
        "top_p": 0.9
        }
llm = Bedrock(
        client=bedrock_runtime,
        model_id=model_id,
        model_kwargs=model_kwargs,
        streaming=False
    )

# Initialize a ReAct agent.
agent = create_react_agent(llm, tools, prompt)

# Create an agent executor by passing in the agent and tools.
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
agent_with_chat_history = RunnableWithMessageHistory(
    agent_executor,
    # This is needed because a session ID is required in most real-world scenarios.
    # It isn't really used here because we are using the basic, in-memory ChatMessageHistory.
    lambda session_id: message_history,
    input_messages_key="input",
    history_messages_key="chat_history",
)

response = agent_executor.invoke({"input":"What are the closing prices of stocks MNBV, PYXC, LKDI in year 2014?"})
print(response['output'])

18.最后我们通过调用ReAct Agent对示例问题生成回复,示例问题为”查询HFGD、SEDC、FVGB“三家公司的股价。

agent_executor.invoke({"input": "What are the closing prices of stocks HGFD, SEDC, FVGB in year 2018? Can you build an optimized portfolio using these three stocks? Please provide answers to both questions."})

得到的回复如下:

在亚马逊云科技上通过LangChain ReAct Agent开发金融多模态数据AI分析中台-LMLPHP

19. 新建一个Python文件”streamlit_app.py“,复制以下代码。该代码利用streamlit框架开发了一个金融数据分析中台网页交互UI,在代码中创建了ReAct Agent代理并利用亚马逊云科技系列AI服务自定义tool工具,基于用户问题查询不同类型的多模态原始数据,最终利用大模型为用户生成精准回复。

import boto3
from botocore.exceptions import ClientError
import json
import langchain
from importlib import reload
import time
import sys
import os
import streamlit as st
import uuid
import datetime
import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine
from langchain_community.llms import Bedrock
from langchain.agents import initialize_agent
from langchain.agents import AgentExecutor, create_react_agent
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.chains import create_retrieval_chain
from langchain.retrievers import AmazonKendraRetriever
from langchain_core.tools import Tool, StructuredTool
from langchain.agents import initialize_agent
from langchain.memory.chat_message_histories import DynamoDBChatMessageHistory
from streamlit.web.server.websocket_headers import _get_websocket_headers
from langchain import hub
from langchain_core.runnables.history import RunnableWithMessageHistory

st.set_page_config(layout="wide")

#Session states to hold sateful variables
if 'generated' not in st.session_state:
    st.session_state['generated'] = []
if 'past' not in st.session_state:
    st.session_state['past'] = []
if 'messages' not in st.session_state:
    st.session_state['messages'] = []
if 'ant_key' not in st.session_state:
    st.session_state['ant_key'] = ''
if 'chat_id' not in st.session_state:
    st.session_state['chat_id'] = 1
if 'client_id' not in st.session_state:
    st.session_state['client_id'] = ''
if 'prompt' not in st.session_state:
    st.session_state['prompt'] = ''
if 'memory' not in st.session_state:
    st.session_state['memory'] = ""

######## Global Variables ########
REGION='us-east-1'

bedrock_runtime = boto3.client(service_name='bedrock-runtime')

model_id = "mistral.mistral-large-2402-v1:0"

with open ('param.json','r') as f:
    params=json.load(f)

table = 'stock_prices'
chat_history_table = params['ddb_table']

######## Setting session Id ########

if len(st.session_state['messages'])<1:

    ######## browser client info ########
    headers = _get_websocket_headers()
    st.session_state['client_id'] = str(headers.get("Sec-Websocket-Key"))
    st.session_state['chat_id']= st.session_state['chat_id']+1


session_id=st.session_state['client_id']
chat_id= st.session_state['chat_id']

#persist dynamodb table id for chat history for each session and browser client
@st.cache_data
def db_table_id(session_id, chat_id):
    chat_sess_id=str(uuid.uuid4())
    return chat_sess_id

chat_session_id=db_table_id(session_id, chat_id)


######## Create tools and Agent ########

# Import tools from utility folder.
from utility import stock_query_mm, kendra_tool_mm, portfolio_tool
from utility.aws_tools import text_extract, transcribe_audio,sentiment_analysis #To complete the DIY, add sentiment_analysis to this import list

# Create tools
tools = [stock_query_mm.run_query, kendra_tool_mm.lookup_info, portfolio_tool.OptimizePortfolio(), text_extract, transcribe_audio,sentiment_analysis] #To complete the DIY, include sentiment_analysis tool in this list

# Define Bedrock LLM
model_kwargs = {
        "max_tokens": 250,
        "stop": ["\n"],
        "temperature": 0.0,
        "top_k": 50,
        "top_p": 0.9
        }
llm = Bedrock(
        client=bedrock_runtime,
        model_id=model_id,
        model_kwargs=model_kwargs
    )

# Saving conversation hisory in DynamoDB table
chat_history_memory = DynamoDBChatMessageHistory(table_name=chat_history_table, session_id=chat_session_id)

# Initialize a LangChain ReAct Agent
react_agent = create_react_agent(llm, tools, hub.pull("hwchase17/react"))

if st.session_state['memory']:
    agent_executor = RunnableWithMessageHistory(
        AgentExecutor(agent=react_agent, tools=tools),
        # This is needed because in most real world scenarios, a session id is needed
        # It isn't really used here because we are using a simple in memory ChatMessageHistory
        lambda session_id: chat_history_memory,
        input_messages_key="input",
        history_messages_key="chat_history",
    )
else:
    agent_executor = AgentExecutor(agent=react_agent, tools=tools)

# A function to invoke the agent and parse the output
def query(prompt, agent_executor):

    try:
        if st.session_state['memory']:
            output=agent_executor.invoke({"input":prompt},config={"configurable": {"session_id": chat_session_id}})
        else:
            output=agent_executor.invoke({"input":prompt})
            
        chat_history_memory.add_ai_message(str(output['output']))
        response = output.get('output').replace('\n','').replace('```','')
        response = response.replace("%", " percent")
    except Exception as e:
        print(e)
        response = "I couldn't find the answer to your inquiry. Could you try again?"

    return response

def action_doc(agent_executor):
    st.title('Multimodal Agent to assist Financial Analyst')

    # Display chat messages from history on app rerun
    for message in st.session_state.messages:
        if "role" in message.keys():
            with st.chat_message(message["role"]):
                st.markdown(message['content'].replace("$","USD ").replace("%", " percent"))

        else:
            with st.expander(label="**Intermediate Steps**"):
                st.write(message["steps"])

    if prompt := st.chat_input("Hello?"):
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)


        with st.chat_message("assistant"):
            message_placeholder = st.empty()
            output_answer=query(prompt, agent_executor)
            message_placeholder.markdown(output_answer.replace("$","USD ").replace("%", " percent"))
        st.session_state.messages.append({"role": "assistant", "content": output_answer})

######## Streamlit app UX display ########

def app_sidebar():
    with st.sidebar:
        st.write('## How to use:')
        description = """This app lets you query multimodal documents and get relevant answers from documents inculde CSV files, audio files and pdf files. To refresh the current session, click the `Clear Session` button."""
        st.markdown(description)
        st.write('---')
        st.write('## Sample Questions')
        st.markdown("""
                    - What are the closing prices of stocks MNBV, PYXC, LKDI in year 2014? Can you build an optimized portfolio using these three stocks? Please provide answers to both questions.
                    - What is the net sales for Amazon in 2022 and 2023? What is the percent difference?
                    - What is Amazon doing with Responsible Supply Chain?   
                    """)
        st.write('## DIY Sample Question')
        st.markdown("""
                    - What is the sentiment of shareholders towards Amazon's profit in 2023?
                    """)
        st.markdown("""
                    **Datasets**
                    
                    - [Q1 2023 Quaterly Earnings recordings](https://s2.q4cdn.com/299287126/files/doc_financials/2023/q1/Amazon-Quarterly-Earnings-Report-Q1-2023-Full-Call-v1.mp3)
                    - [2023 10-K Reports](https://d18rn0p25nwr6d.cloudfront.net/CIK-0001018724/d2fde7ee-05f7-419d-9ce8-186de4c96e25.pdf)
                    - [Sustainability Executive Summary](https://sustainability.aboutamazon.com/reporting)                    
                    -  Fictional stock tickers and stock prices are generated by Amazon Bedrock.
                    """)
        st.write('---')

        use_memory=''
        mem = st.checkbox('Conversation Memory')
        if mem:
            use_memory=True
        st.session_state['memory']=use_memory

        if st.button('Clear Session'):
            '''
                The Clear context helps to refresh the UI and also create a new session for the chat. This creates a new Dynamo DB table to                   hold the chat history.
            '''
            # Delete all the items in Session state
            for key in st.session_state.keys():
                del st.session_state[key]
            # create new session state items
            if 'generated' not in st.session_state:
                st.session_state['generated'] = []
            if 'past' not in st.session_state:
                st.session_state['past'] = []
            if 'messages' not in st.session_state:
                st.session_state['messages'] = []
            if 'ant_key' not in st.session_state:
                st.session_state['ant_key'] = ''
            if 'chat_id' not in st.session_state:
                st.session_state['chat_id'] = 1
            if 'client_id' not in st.session_state:
                st.session_state['client_id'] = ''
            if 'prompt' not in st.session_state:
                st.session_state['prompt'] = ""
            if 'memory' not in st.session_state:
                st.session_state['memory'] = False

def main(agent_executor):
    params=app_sidebar()
    action_doc(agent_executor)


if __name__ == '__main__':
    main(agent_executor)

20. 安装运行应用的的必要依赖

sudo yum install blas-devel lapack-devel
pip install -r requirements.txt -q

21. 运行命令启动streamlit服务器

streamlit run streamlit_app.py

22. 打开启动服务器后返回的URL,在浏览器中打开,根据左侧的提示问题向Agent提问就可以得到基于原始数据查询的的文字回复了。

在亚马逊云科技上通过LangChain ReAct Agent开发金融多模态数据AI分析中台-LMLPHP

以上就是在亚马逊云科技上利用亚马逊云科技Amazon Sagemaker上搭建LangChain Agent,开发智能化多模态数据金融分析的中台全部步骤。欢迎大家未来与我一起,未来获取更多国际前沿的生成式AI开发方案。

08-20 05:46