Constructing a Knowledge Graph from Text Documents
from dotenv import load_dotenv
import os

# Common data processing
import json
import textwrap

# LangChain
from langchain_community.graphs import Neo4jGraph
from langchain_community.vectorstores import Neo4jVector
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQAWithSourcesChain
from langchain_openai import ChatOpenAI

# Warning control
import warnings
warnings.filterwarnings("ignore")
# Load from environment
load_dotenv('.env', override=True)
NEO4J_URI = os.getenv('NEO4J_URI')
NEO4J_USERNAME = os.getenv('NEO4J_USERNAME')
NEO4J_PASSWORD = os.getenv('NEO4J_PASSWORD')
NEO4J_DATABASE = os.getenv('NEO4J_DATABASE') or 'neo4j'
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')

# Note: the code below is unique to this course environment, and not a
# standard part of Neo4j's integration with OpenAI. Remove if running
# in your own environment.
OPENAI_ENDPOINT = os.getenv('OPENAI_BASE_URL') + '/embeddings'

# Global constants
VECTOR_INDEX_NAME = 'form_10k_chunks'
VECTOR_NODE_LABEL = 'Chunk'
VECTOR_SOURCE_PROPERTY = 'text'
VECTOR_EMBEDDING_PROPERTY = 'textEmbedding'
When opened, a file looks like the following (the original post included a few screenshots here, omitted in this version). Once a knowledge graph has been built from these files, we can hold question-and-answer conversations over the financial data. The filings are downloaded in XML format, though, so the XML first has to be parsed and the data cleaned; the files loaded below are the cleaned JSON results.
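As a rough illustration of that cleanup step (this sketch is my addition, not part of the original lesson; the tag names are hypothetical, and real EDGAR filings need heavier, format-specific handling):

import json
import xml.etree.ElementTree as ET

def clean_filing_to_json(xml_path, json_path):
    """Extract the text of selected 10-K items from a (hypothetical) XML layout."""
    root = ET.parse(xml_path).getroot()
    record = {}
    for tag in ['item1', 'item1a', 'item7', 'item7a']:
        node = root.find(tag)
        # itertext() flattens any nested markup into plain text
        record[tag] = ''.join(node.itertext()).strip() if node is not None else ''
    with open(json_path, 'w') as f:
        json.dump(record, f)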
first_file_name = "./data/form10k/0000950170-23-027948.json"
first_file_as_object = json.load(open(first_file_name))
print(type(first_file_as_object))  # dict
for k,v in first_file_as_object.items():
print(k, type(v))
item1_text = first_file_as_object['item1']
item1_text[0:1500]
# Initialize the splitter: RecursiveCharacterTextSplitter
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size = 2000,       # size of each chunk: 2000 characters
    chunk_overlap = 200,     # 200-character overlap between consecutive chunks (preserves context across boundaries)
    length_function = len,
    is_separator_regex = False,
)
# Split into chunks
item1_text_chunks = text_splitter.split_text(item1_text)
The chunking result was shown as a screenshot in the original post; a quick inspection (my addition) looks like this:
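print(f"{len(item1_text_chunks)} chunks created")  # number of chunks produced
print(item1_text_chunks[0][:250])                  # preview the start of the first chunk

We then wrap the whole process in a helper function: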
def split_form10k_data_from_file(file):
    """Split a Form 10-K JSON file into chunk records with metadata."""
    chunks_with_metadata = []  # use this to accumulate chunk records
    file_as_object = json.load(open(file))  # open the json file
    for item in ['item1', 'item1a', 'item7', 'item7a']:  # pull these keys from the json
        print(f'Processing {item} from {file}')
        item_text = file_as_object[item]  # grab the text of the item
        item_text_chunks = text_splitter.split_text(item_text)  # split the text into chunks
        chunk_seq_id = 0
        for chunk in item_text_chunks[:20]:  # only take the first 20 chunks (to speed up the demo; use all of the text in practice)
            # use the last part of the file name as the form ID
            form_id = file[file.rindex('/') + 1:file.rindex('.')]
            # finally, construct a record with metadata and the chunk text
            chunks_with_metadata.append({
                'text': chunk,
                # metadata from looping...
                'f10kItem': item,
                'chunkSeqId': chunk_seq_id,
                # constructed metadata...
                'formId': f'{form_id}',  # pulled from the filename
                'chunkId': f'{form_id}-{item}-chunk{chunk_seq_id:04d}',
                # metadata from file...
                'names': file_as_object['names'],
                'cik': file_as_object['cik'],
                'cusip6': file_as_object['cusip6'],
                'source': file_as_object['source'],
            })
            chunk_seq_id += 1
        print(f'\tSplit into {chunk_seq_id} chunks')
    return chunks_with_metadata
first_file_chunks = split_form10k_data_from_file(first_file_name)
The resulting chunk records (output shown as a screenshot in the original post).
# MERGE first attempts a MATCH; if the MATCH fails, it performs a CREATE and
# makes a new node. In other words, a MERGE is either a lookup or a create.
# On creation we set properties from the chunk dictionaries prepared above
# (e.g. first_file_chunks[0]).
merge_chunk_node_query = """
MERGE (mergedChunk:Chunk {chunkId: $chunkParam.chunkId})
    ON CREATE SET
        mergedChunk.names = $chunkParam.names,
        mergedChunk.formId = $chunkParam.formId,
        mergedChunk.cik = $chunkParam.cik,
        mergedChunk.cusip6 = $chunkParam.cusip6,
        mergedChunk.source = $chunkParam.source,
        mergedChunk.f10kItem = $chunkParam.f10kItem,
        mergedChunk.chunkSeqId = $chunkParam.chunkSeqId,
        mergedChunk.text = $chunkParam.text
RETURN mergedChunk
"""
kg = Neo4jGraph(
url=NEO4J_URI, username=NEO4J_USERNAME, password=NEO4J_PASSWORD, database=NEO4J_DATABASE
)
# kg.query executes a statement,
# in this case merge_chunk_node_query.
# The data is passed via params; its chunkParam entry is substituted for
# $chunkParam inside merge_chunk_node_query.
kg.query(merge_chunk_node_query,
params={'chunkParam':first_file_chunks[0]})
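Because MERGE matches before it creates, re-running the same statement is idempotent. A small check (my addition) to confirm no duplicate node is created:

# Run the exact same MERGE a second time...
kg.query(merge_chunk_node_query,
         params={'chunkParam': first_file_chunks[0]})
# ...and verify there is still exactly one node with that chunkId
kg.query("""
    MATCH (c:Chunk {chunkId: $chunkId})
    RETURN count(c) AS nodeCount
    """,
    params={'chunkId': first_file_chunks[0]['chunkId']})  # expect [{'nodeCount': 1}]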
Before calling the helper function to build the knowledge graph in bulk, we take an extra step to make sure we don't duplicate data: a uniqueness constraint on chunkId.
kg.query("""
CREATE CONSTRAINT unique_chunk IF NOT EXISTS
FOR (c:Chunk) REQUIRE c.chunkId IS UNIQUE
""")
Check that it was created:
kg.query("SHOW INDEXES")
node_count = 0
for chunk in first_file_chunks:
print(f"Creating `:Chunk` node for chunk ID {chunk['chunkId']}")
kg.query(merge_chunk_node_query,
params={
'chunkParam': chunk
})
node_count += 1
print(f"Created {node_count} nodes")
# Count all nodes
kg.query("""
MATCH (n)
RETURN count(n) as nodeCount
""")
# The index is named form_10k_chunks.
# For nodes labeled Chunk, it indexes the embeddings stored in a
# property named textEmbedding.
kg.query("""
CREATE VECTOR INDEX `form_10k_chunks` IF NOT EXISTS
FOR (c:Chunk) ON (c.textEmbedding)
OPTIONS { indexConfig: {
`vector.dimensions`: 1536,
`vector.similarity_function`: 'cosine'
}}
""")
Listing the indexes again (SHOW INDEXES, as above) confirms the vector index was created. Next, populate the embeddings: for every chunk that does not yet have one, call the OpenAI embeddings endpoint from within Cypher and store the result on the node:
kg.query("""
MATCH (chunk:Chunk) WHERE chunk.textEmbedding IS NULL
WITH chunk, genai.vector.encode(
chunk.text,
"OpenAI",
{
token: $openAiApiKey,
endpoint: $openAiEndpoint
}) AS vector
CALL db.create.setNodeVectorProperty(chunk, "textEmbedding", vector)
""",
params={"openAiApiKey":OPENAI_API_KEY, "openAiEndpoint": OPENAI_ENDPOINT} )
kg.refresh_schema()
print(kg.schema)
At this point the graph contains only nodes; there are no relationships between them yet.
Create a helper function that performs vector search against Neo4j:
def neo4j_vector_search(question):
    """Search for similar nodes using the Neo4j vector index"""
    vector_search_query = """
    WITH genai.vector.encode(
        $question,
        "OpenAI",
        {
            token: $openAiApiKey,
            endpoint: $openAiEndpoint
        }) AS question_embedding
    CALL db.index.vector.queryNodes($index_name, $top_k, question_embedding)
    YIELD node, score
    RETURN score, node.text AS text
    """
    similar = kg.query(vector_search_query,
                       params={
                           'question': question,
                           'openAiApiKey': OPENAI_API_KEY,
                           'openAiEndpoint': OPENAI_ENDPOINT,
                           'index_name': VECTOR_INDEX_NAME,
                           'top_k': 10})
    return similar
Try asking a question:
search_results = neo4j_vector_search(
'In a single sentence, tell me about Netapp.'
)
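Each record returned by the query carries a similarity score and the chunk text; a quick look at the top hit (my addition):

top = search_results[0]
print(top['score'])
print(textwrap.fill(top['text'][:500], 60))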
The simplest way to get started with Neo4j and LangChain is the Neo4jVector interface:
neo4j_vector_store = Neo4jVector.from_existing_graph(
embedding=OpenAIEmbeddings(),
url=NEO4J_URI,
username=NEO4J_USERNAME,
password=NEO4J_PASSWORD,
index_name=VECTOR_INDEX_NAME,
node_label=VECTOR_NODE_LABEL,
text_node_properties=[VECTOR_SOURCE_PROPERTY],
embedding_node_property=VECTOR_EMBEDDING_PROPERTY,
)
# Convert the vector store into a retriever
retriever = neo4j_vector_store.as_retriever()
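To see what the retriever returns on its own, a quick probe (my addition, assuming a LangChain version where retrievers expose invoke()):

docs = retriever.invoke("What is Netapp's primary business?")
print(docs[0].page_content[:300])  # the raw chunk text backing the answer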
Set up a RetrievalQAWithSourcesChain for question answering (see the LangChain documentation for this chain):
# chain_type="stuff" means the retrieved documents and the question are
# combined ("stuffed") into a single prompt, which is then passed to the LLM
chain = RetrievalQAWithSourcesChain.from_chain_type(
ChatOpenAI(temperature=0),
chain_type="stuff",
retriever=retriever)
# Accepts a question, calls the chain with it, extracts just the answer
# text, and pretty-prints it wrapped to fit the screen
def prettychain(question: str) -> str:
"""Pretty print the chain's response to a question"""
response = chain({"question": question},
return_only_outputs=True,)
print(textwrap.fill(response['answer'], 60))
Try a question:
question = "What is Netapp's primary business?"
prettychain(question)
Let's try a few more. Impressively, you can even dictate the length of the answer (a single sentence, even a word count) and the model obliges. For example (the exact question wordings here are my own):
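prettychain("Where is Netapp headquartered?")

prettychain("""
    Tell me about Netapp.
    Limit your answer to a single sentence.
""")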