Work/GraphDB

GraphDB를 활용한 자연어질의 처리 with LLM

raeul0304 2025. 5. 19. 17:23

계속 graphRAG, graphDB, 일반 RAG랑 헷갈림...

 

GraphRag란  GraphDB를 기반으로 RAG을 구현하는 것. 단순히 벡터 데이터베이스 기반의 retrieval 방식의 유사도 검색이 아닌, knowledge graph를 구축해 검색 대상들 (엔티티)간의 관계를 파악해 더 세분화된 지식 검색을 하는 것

 

wikidocs에 제시된 내용을 기반으로 구현 방법을 정리해보고, 포스트 마지막에 실제 구현한 내용을 정리해보고자 한다.

 

✏️ Part 2. 생성(Generation)을 위한 LLM 모델

📌 프롬프트 구성하기

 

🔻 PromptTemplate

기본적인 템플릿 구성으로, Langflow에서처럼 {} 중괄호를 이용해 입력받아야 할 파라미터를 정의내릴 수 있다.

from neo4j_graphrag.generation import PromptTemplate

prompt_template = PromptTemplate(
    template = ''' 제시어가 들어간 문장을 {sentences_num}개 작성해주세요.:
    제시어: {concept}
    답변:
    ''',
    expected_inputs = ['sentences_num', 'concept']
)

 

 

▫️ PromptTemplate의 파라미터

    ▪️ template (Optional[str]) : 프롬프트 텍스트(문자열), 중괄호로 제공받아야 할 파라미터 포맷을 작성

    ▪️ expected_inputs (Optional[list[str]]) : 입력 받아야 할 파라미터 key 리스트

 

 

▫️ format

prompt = prompt_template.format('3', concept='그래프')
# prompt = prompt_template.format('3', '그래프')
# prompt = prompt_template.format(sentences_num='3', concept='그래프')
print(prompt)

 

format 메소드를 통해 파라미터 값을 입력하여 프롬프트를 완성할 수 있다.

위 예시에서의 파라미터 key인 'sentences_num', 'concept'를 명시하여 전달인자를 입력하거나 생략하여 순서대로 입력할 수 있다.

 

 

 

 

🔻 RagTemplate

neo4j_graphrag 패키지 내의 RAG 모듈을 사용할 시, RagTemplate을 사용하여 동작한다.

PromptTemplate과 달리 input의 파라미터가 'query_text', 'context',  'examples'로 미리 정의되어 있다.

from neo4j_graphrag.generation import RagTemplate, GraphRAG

prompt_template = RagTemplate(
    template = """ 주어진 context를 기반으로 examples에 작성된 답변 형식을 참고하여 question에 답변하세요.
        question: {query_text}
        context : {context}
        examples : {examples}
        """,
    expected_inputs=["query_text", "context", "examples"]

 

 

▫️ PromptTemplate의 파라미터

    ▪️ DEFAULT_TEMPLATE : str = 'Answer the user question using the following context\n\nContext:\n{context}\n\nExamples:\n{examples}\n\nQuestion:\n{query_text}\n\nAnswer:\n'

    ▪️ EXPECTED_INPUTS: list[str] = ['context', 'query_text', 'examples']

 

 

▫️ format

prompt = prompt_template.format(
    query_text='그래프 데이터베이스에 관해 알려주세요.', 
    context = '''
        그래프 데이터베이스는 데이터와 그 데이터 간의 관계를 그래프 구조로 저장하고 관리하는 데이터베이스 시스템입니다. 
        이 데이터베이스는 노드(node), 엣지(edge), 속성(property)로 구성된 그래프 모델을 사용하여 데이터를 표현합니다. 
    ''', 
    examples = '[**AI**] AI는 인공지능을 의미하며, 컴퓨터가 사람처럼 학습하고 문제를 해결할 수 있도록 하는 기술 분야를 말함.'
)
print(prompt)

 

 

 

 

🔻 Text2CypherTemplate

format (기본 프롬프트 템플릿)

Task: Generate a Cypher statement for querying a Neo4j graph database from a user input.

Schema:
{schema}

Examples (optional):
{examples}

Input:
{query_text}

Do not use any properties or relationships not included in the schema.
Do not include triple backticks ''' or any additional text except the generated Cypher statement in your reponse.

Cypher query:

 

위 내용을 prompt_template과 같은 변수에 저장을 하고 아래와 같이 코드를 작성하면 된다.

from neo4j_graphrag.generation.prompts import Text2CypherTemplate
prompt_template = Text2CypherTemplate()

prompt = prompt_template.format(
	query_text = '''
        Tom Hanks가 출연한 영화는 몇개입니까?
    '''
)

 

 

활용

from neo4j_graphrag.generation.prompts import Text2CypherTemplate
from neo4j_graphrag.llm import OpenAILLM

# 프롬프트 템플릿 생성
prompt_template = Text2CypherTemplate()

# LLM 객체 생성
llm = OpenAILLM(model_name="gpt-4o", model_params={"temperature": 0})

# 사용자 입력
query_text = "Tom Hanks가 출연한 영화는 몇 개입니까?"

# 프롬프트 생성
prompt = prompt_template.format(query_text=query_text)

# LLM을 통해 Cypher 쿼리 생성
response = llm.invoke(prompt)
print(response.content)

 

 

찾아보니 Text2CypherTemplate 외에 Text2CypherRetriever가 있어 차이점을 살펴보았다.

Text2CypherTemplate의 경우 프롬프트 템플릿을 생성하기 위해 사용하는 것으로, LLM에 직접 프롬프트를 전달해야 한다.

query_text, scehma, examples로 구성되어있으며 

프롬프트를 커스터마이징 가능하고, 수동 처리를 통해 활용할 수 있어 프롬프트를 직접 제어하고자 할 때 주로 사용한다.

 

반면, Text2CypherRetriever는 자연어 질문을 통한 데이터 검색 자동화를 위해 사용하는 것으로,

자연어를 입력하면 Cypher 구문으로 변환하고 쿼리를 실행하여 결과를 반환한다.

driver, llm, neo4j_schema, examples로 구성되어있고 전체 RAG 파이프라인에 통합한다.

▪️ driver : Neo4j 데이터베이스 드라이버

▪️ llm : cypher 쿼리를 생성할 LLM

▪️ neo4j_schema : 데이터베이스 스키마 정보

▪️ examples : LLM이 참고할 수 있는 예시 쿼리

 

 

Text2CypherRetriever 활용 예시를 코드로 살펴보자.

아래는 원자재 co와 관련해서 작성해보았는데, 아주 간단하게만 생각해보았다.

from neo4j import GraphDatabase
from neo4j_graphrag.retrievers import Text2CypherRetriever
from neo4j_graphrag.llm import OpenAILLLM

# Neo4j 드라이버 설정
driver = GraphDatabase.driver("neo4j://localhost:7587", auth=("neo4j", "password"))

# LLM 객체 생성
llm = OpenAILLM(model_name = "gpt-4o")

#데이터베이스 스키마 정의 (선택)
neo4j_schema = """
Node properties:
RawMaterial {name: STRING, materialCode: STRING}
SemiProduct {name: STRING}
FinishedProduct {name: STRING}
ProductionOrder {orderNumber: STRING, date: DATE}
WorkCenter {name: STRING}
CostCenter {name: STRING}
CostElement {type: STRING, amount: FLOAT}
Supplier {name: STRING}
Operation {name: STRING}

The relationships:
(:RawMaterial)-[:USED_IN]->(:ProductionOrder)
(:ProductionOrder)-[:PRODUCES]->(:SemiProduct)
(:ProductionOrder)-[:PRODUCES]->(:FinishedProduct)
(:ProductionOrder)-[:EXECUTED_AT]->(:WorkCenter)
(:ProductionOrder)-[:CHARGED_TO]->(:CostCenter)
(:Operation)-[:HAS_COST]->(:CostElement)
(:ProductionOrder)-[:HAS_OPERATION]->(:Operation)
(:RawMaterial)-[:SUPPLIED_BY]->(:Supplier)
"""

#예시 쿼리 정의 (선택)
examples = [
    "USER INPUT: '2024년 1분기 동안 사용된 원자재의 총 비용은 얼마인가요?' QUERY: MATCH (rm:RawMaterial)-[:USED_IN]->(po:ProductionOrder)-[:HAS_OPERATION]->(op:Operation)-[:HAS_COST]->(ce:CostElement) WHERE po.date >= date('2024-01-01') AND po.date <= date('2024-03-31') AND ce.type = '재료비' RETURN sum(ce.amount) AS totalRawMaterialCost",
    
    "USER INPUT: '반제품 B가 어떤 원자재로부터 만들어졌나요?' QUERY: MATCH (rm:RawMaterial)-[:USED_IN]->(po:ProductionOrder)-[:PRODUCES]->(sp:SemiProduct {name: 'B'}) RETURN DISTINCT rm.name",

    "USER INPUT: '작업장 A에서 수행된 모든 생산 오더와 그 결과물은?' QUERY: MATCH (po:ProductionOrder)-[:EXECUTED_AT]->(wc:WorkCenter {name: 'A'}) OPTIONAL MATCH (po)-[:PRODUCES]->(prod) RETURN po.orderNumber, prod.name"
]


# Text2CypherRetriever 객체 생성
retriever = Text2CypherRetriever(
    driver=driver,
    llm=llm,
    neo4j_schema=neo4j_schema,
    examples=examples,
)

#자연어 질문을 통해 데이터 검색
query_text = "2024년 1분기 동안 사용된 원자재의 총 비용은 얼마인가요?"
results = retriever.get_search_results(query_text)
print(results)

 

Text2CypherRetriever 소스코드 : https://neo4j.com/docs/neo4j-graphrag-python/current/_modules/neo4j_graphrag/retrievers/text2cypher.html

 

neo4j_graphrag.retrievers.text2cypher — neo4j-graphrag-python documentation

 

neo4j.com

 

 

 

 

🔻 ERExtractionTemplate

ERExtractionTemplate은 주어진 텍스트 내에서 엔티티(노드)를 추출하고,

그 엔티티들 간의 관계 또한 추출하여 Knowledge graph를 구축해주는 기능을 한다.

 

default 템플릿은 다음과 같다.

You are a top-tier algorithm designed for extracting information in structured formats to build a knowledge graph.
Extract the entities (nodes) and specify their type from the following text.
Also extract the relationships between these nodes.

Return result as JSON using the following format:
{{"nodes": [ {{"id": "0", "label": "Person", "properties": {{"name": "John"}} }}],
"relationships": [{{"type": "KNOWS", "start_node_id": "0", "end_node_id": "1", "properties": {{"since": "2024-08-01"}} }}] }}

Use only fhe following nodes and relationships (if provided):
{schema}

Assign a unique ID (string) to each node, and reuse it to define relationships.
Do respect the source and target node types for relationship and
the relationship direction.

Do not return any additional information other than the JSON in it.

Examples:
{examples}

Input text:

{text}

 

엔티티(노드)와 관계를 추출했다면 그것들의 라벨(label)과 속성(property) 등을 JSON 형태로 반환해주는 프롬프트이다.

 

만약, 주어진 텍스트가 "John은 2024년 8월 1일부터 Tom과 지내 왔습니다" 일때, 엔티티는 John, Tom이라는 이름을 가진 사람 2명이다.

엔티티들 간의 관계는, '앎' 혹은 '지인'을 의미하는 KNOWS로 정의 가능하다.

이미 id 1에 Tom이 있다면, 이를 반영하여 모델은 아래와 같은 값을 반환할 수 있다.

 

Person ("name" : "John", "id" : "0")) - KNOWS("since": "2024-08-01") -> Person("id" : "1")
"nodes": [ {{"id": "0", "label": "Person", "properties": {{"name": "John"}} }}]
"relationships": [{{"type": "KNOWS", "start_node_id": "0", "end_node_id": "1", "properties": {{"since": "2024-08-01"}} }}] }}

 

 

format

기본적인 형식은 아래와 같다.

from neo4j_graphrag.generation.prompts import ERExtractionTemplate

prompt_template = ERExtractionTemplate()
prompt = prompt_template.format(
	schema = '',
    text = '''
    그래프 데이터베이스는 데이터와 그 데이터 간의 관계를 그래프 구조로 저장하고 관리하는 데이터베이스 시스템입니다.
    이 데이터베이스는 노드(node), 엣지(edge), 속성(property)로 구성된 그래프 모델을 사용하여 데이터를 표현합니다.
    ''',
    examples = ''
)
print(prompt)

 

그리고 이를 활용한다면!

from neo4j_graphrag.generation.prompts import ERExtractionTemplate
from neo4j_graphrag.llm import OpenAILLM
import openai

prompt_template = ERExtractionTemplate()
llm = OpenAILLM(model_name="gpt-4o", model_params={"temperature": 0})

input_text = ''' 민수와 수민은 친구입니다. 민수는 수민을 좋아합니다. '''

prompt = prompt_template.format(
	schema = '',
    text = input_text,
    examples = ''
)

response = llm.invoke(prompt)
print(response.content)

 

 

그럼 output은 다음과 같은 형태로 나올 것이다:

# output : 
{
    "nodes": [
        {"id": "0", "label": "Person", "properties": {"name": "민수"}},
        {"id": "1", "label": "Person", "properties": {"name": "수민"}}
    ],
    "relationships": [
        {"type": "FRIEND", "start_node_id": "0", "end_node_id": "1"},
        {"type": "LIKES", "start_node_id": "0", "end_node_id": "1"}
    ]
}

 

 

 

 

📌 임베딩 모델

임베딩 모델은 원하는 모델을 선택해서 활용할 수 있다.

자세한 내용은 (https://neo4j.com/docs/neo4j-graphrag-python/current/user_guide_rag.html#embedders

from neo4j_graphrag.embeddings import SentenceTransformerEmbeddings

embedder = SentenceTransformerEmbeddings(model="all-MiniLM-L6-v2")

 

 

 

 

✏️ Part 3. 그래프 구축을 위한 다양한 Components

📌 Document Splitter

🔻 Text Splitter Customizing

from neo4j_graphrag.experimental.components.text_splitters.base import TextSplitter
from neo4j_graphrag.experimental.components.types import TextChunks, TextChunk

class MyTextSplitter(TextSplitter):
	def __init__(self, separator: str = ".") -> None:
        self.separator = separator
        
    async def run(self, text:str) -> TextChunks:
        return TextChunks(
        	chunks=[
            	TextChunk(text=text_chunk)
                for text_chunk in text.split(self.separator)
            ]
        )

 

 

 

🔻 FixedSizeSplitter

from neo4j_graphrag.experimental.components.text_splitters.fixed_size_splitter import FixedSizeSplitter

splitter = FixedSizeSplitter(chunk_size=10, chunk_overlap=3)
result = await splitter.run(text="Hello World. Life is beautiful")

 

run()은 비동기 함수이므로, await을 사용하여 실행해야 한다.

run()을 통해 입력한 문서를 분할하여 반환한다.

run의 파라미터인 text는 분할 대상이 되는 문서 텍스트를 문자열 형태로 입력을 받는다.

 

결과는 아래와 같게 되는데, return 타입은 TextChunk 객체들로 구성된 TextChunks로 각 청크는 해당 텍스트와 인덱스 정보를 포함하고 있다.

# output : 
TextChunks(chunks=[TextChunk(text='Hello Worl', index=0, metadata=None), 
TextChunk(text='orld. Life', index=1, metadata=None), 
TextChunk(text='ife is bea', index=2, metadata=None), 
TextChunk(text='beautiful.', index=3, metadata=None), 
TextChunk(text='ul. My nam', index=4, metadata=None), 
TextChunk(text='name is na', index=5, metadata=None), 
TextChunk(text=' nayeon. H', index=6, metadata=None), 
TextChunk(text='. Hi', index=7, metadata=None)])

 

 

 

 

🔻 LangChainTextSplitterAdapter

🔹CharacterTextSplitter

이것도 FixedSizeSplitter와 같이 지정된 크기로 텍스트를 청크로 나누지만,

FixedSizeSplitter는 토큰 단위로 청킹을 하고 CharacterTextSplitter는 문자 단위로 청킹을 한다.

 

from langchain_text_splitters import CharacterTextSplitter
from neo4j_graphrag.experimental.components.text_splitters.langchain import LangChainTextSplitterAdapter

splitter = LangChainTextSplitterAdapter(
    CharacterTextSplitter(chunk_size=10, chunk_overlap=3, separator=".")
)
await splitter.run(text="Hello World. Life is beautiful.")

 

그러면 결과는

# output : 
TextChunks(chunks=[TextChunk(text='Hello World', index=0, metadata=None), 
TextChunk(text='Life is beautiful', index=1, metadata=None)])

 

 

 

 

🔹RecursiveCharacterTextSplitter

지정된 단위가 아니라 의미 기반으로 분할을 하고 싶을 때 사용하는 것이다.

RecursiveCharacterTextSplitter는 청크가 충분히 작아질 때까지 순서대로 분할을 한다.

문장 혹은 단어를 가능한 한 마지막까지 유지하여 분할하는 특징이 있다.

 

from langchain_text_splitters import RecursiveCharacterTextSplitter
from neo4j_graphrag.experimental.components.text_splitters.langchain import LangChainTextSplitterAdapter

splitter = LangChainTextSplitterAdapter(
    RecursiveCharacterTextSplitter(chunk_size=100,chunk_overlap=20)
)
await splitter.run(text="""
Hello World. Life is beautiful.

The world colors itself anew every day. 
When the morning sunlight streams through the window, it feels as if that warm glow envelops our hearts. 
Every moment is precious, and the small joys we discover within them make our lives even more vibrant.

The people we encounter, the conversations we share, and the memories we create together enrich our existence. 
Sometimes we face challenging moments, but even within those, we grow and learn. 
Ultimately, those experiences shape us into better individuals.""")

 

그럼 결과는 다음과 같이 나올 수 있다 :

# output : 
TextChunks(chunks=[TextChunk(text='Hello World. Life is beautiful.', index=0, metadata=None), 
TextChunk(text='The world colors itself anew every day. When the morning sunlight streams through the window, it', index=1, metadata=None), 
TextChunk(text='the window, it feels as if that warm glow envelops our hearts. Every moment is precious, and the', index=2, metadata=None), 
TextChunk(text='precious, and the small joys we discover within them make our lives even more vibrant.', index=3, metadata=None), 
TextChunk(text='The people we encounter, the conversations we share, and the memories we create together enrich our', index=4, metadata=None), 
TextChunk(text='together enrich our existence. Sometimes we face challenging moments, but even within those, we', index=5, metadata=None), 
TextChunk(text='within those, we grow and learn. Ultimately, those experiences shape us into better individuals.', index=6, metadata=None)])

 

 

 

 

🔹 MarkdownHeaderTextSplitter

만약 텍스트가 마크다운 형식으로 정리가 되어있다면 MarkdownHeaderTextSplitter를 활용할 수 있다.

이 기법은 #,##,### 등의 Markdown 헤더 태그를 기준으로 분할하는데,

각 청크에는 해당하는 헤더의 계층 정보도 함께 포함된다!

 

예를 들어 텍스트가 아래와 같은 경우

# Chapter 1
This is an introduction.

## Section 1.1
Details of section 1.1

### Subsection 1.1.1
More specific details

# Chapter 2
Second chapter begins...

 

Chapter > Section > Subsection 구조로 분석되어 분할된다.

 

 

만약 모든 헤더를 활용하고 싶지 않다면 헤더 레벨을 개발자가 직접 지정도 가능하다.

from langchain.text_splitter import MarkdownHeaderTextSplitter

headers_to_split_on = [
	("#", "chapter"),
    ("##", "section"),
    ("###", "subsection),
]

splitter = MardownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
chunks = splitter.split_text(markdown_text)

 

이렇게 하면 각 청크에 'metatdata'가 붙고, {"chapter": ..., "section": ..., "subsection": ...} 식의 구조화된 결과가 나온다.

 

 

 

 

 

✏️ Part 4. GraphDB 검색을 위한 Retriever

📌 검색을 위한 벡터 INDEX 생성하기

🔻 1. 벡터 임베딩 저장하기

벡터 index를 생성하는 방법을 알아보면서 임베딩이 되는지를 빠르게 테스트 해보고

노드 임베딩 수에 따라 배치 임베딩 또는 대규모 배치 처리를 선택해서 벡터 index를 생성하면 된다.

 

📍 단일 임베딩

먼저 테스트를 해보기 위해 단일 임베딩을 진행해보자.

[예제1 : RawMaterial 노드에 임베딩 저장]

본 예제에서는 특정 노드(ex.'RawMaterial')를 선택하여 이름과 materialCode를 함께 저장한다.

그 다음 genai.vector.encode() 함수를 사용하여 OpenAI 모델로 임베딩 벡터를 생성한다.

그리고 생성된 벡터를 db.create.setNodeVectorProperty() 프로시저를 통해 해당 노드의 embedding 속성으로 저장한다.

:param token => 'sk-...';

MATCH (rm:RawMaterial)
WHERE rm.name IS NOT NULL AND rm.materialCode IS NOT NULL
WITH rm, rm.name + ' ' + rm.materialCode AS description
CALL genai.vector.encode(description, 'OpenAI', { token: $token }) YIELD vector
CALL db.create.setNodeVectorProperty(rm, 'embedding', vector)
RETURN rm.name, vector

 

위 과정을 좀 더 살펴보면..

먼저 ":param token=> //" 과정은 cypher 세션 내에서 token이라는 변수를 정의하고, 그 값으로 OpenAI API키를 할당하는 것이다.

따라서 이후 쿼리에서 $token 이라고 쓰면 해당 값으로 치환되어 사용이 된다.

이처럼 파라미터 변수 설정을 하는 이유는, cypher 쿼리 안에 민감한 정보나 외부 입력값을 직접 하드코딩 하지 않고 안전하게 주입하기 위해서다.

토큰을 쿼리 본문에 직접 노출시키면 로그나 저장에 노출될 수 있으며, 여러 쿼리에서 같은 토큰을 반복적으로 쓰기도 쉬워진다.

그리고.. 구문 오류 방지도 되고 api 교체시 나중에 토큰만 바구면 동일 쿼리를 재사용 가능하기 때문에 유연성이 높아진다!

 

구문에 나온 절차들을 살펴보면

1. 노드에서 텍스트를 추출, 필요 시 정규화 및 줄 바꿈 제거

2. 임베딩 생성 : neo4j에서는 genai.vector.encode() 또는 encodeBatch() 사용

3. 벡터 저장 : 벡터를 노드의 속성에 저장 (node.embedding)

4. 벡터 인덱스 생성

CALL db.index.vector.createNodeIndex('RawMaterial', 'embedding', 1536, 'cosine')

 

- 1536 : OpenAI의 text-embdding-ada-002의 임베딩 크기

- cosine : 거리 측정 방식

 

5. 벡터 유사도 검색 : 이건 원하는 기법을 사용해서 대체가 가능하다!

CALL db.index.vector.queryNodes('RawMaterial', 'embedding', $query_vector, 5)
YIELD node, score
RETURN node.title, score

 

- $query_vector와 가장 가까운 노드를 상위 5개 검색하는 예제

 

 

[예제2 : ProductionOrder와 관련된 정보를 결합해서 임베딩]

MATCH(po:ProductionOrder)
OPTIONAL MATCH (po)-[:HAS_OPERATION]->(op:Operation)
OPTIONAL MATCH (po)-[:CHARGED_TO]->(cc:CostCenter)
OPTIONAL MATCH (po)-[EXECUTED_AT]->(wc:WorkCenter)
WITH po, po.orderNumber + '작업: '+ coalesce(op.name, '') +'원가센터: ' + coalesce(cc.name, '') + ' 작업장: ' + coalesce(wc.name, '') AS full_description
CALL genai.vector.encode(full_description, 'OpenAI', { token: $token }) YIELD vector
CALL db.create.setNodeVectorProperty(po, 'embedding', vector)
RETURN po.orderNumber, vector

 

 

이런 식으로 하면, 향후 자연어 기반 질문이 예를 들어 "작업장 A에서 실행된 생산 오더를 찾아줘"라고 한다면

LLM이 query를 생성하고, 관련 노드들의 임베딩을 비교하여 vector search로 찾을 수 있게 된다.

 

여기서 coalesce는 sql의 COALESCE()와 거의 동일한 개념으로, 널(null)이 아닌 첫 번째 값을 반환하는 기능이다.

coalesce() 안에 있는 파라미터를 왼쪽에서부터 차례로 평가하면서 널이 아닌 첫 번째 값을 반환한다.

만약 모든 값이 null이면 결과도 null인데, 모두 null일 경우 기본값으로 대체하도록 설정이 가능하다.

위 구문에서처럼 colesce(cc.name, '이름 없음')으로 하면 cc.name이 null일 때 대비해서 '이름 없음'을 기본값 설정을 한 것!

 

coalesce가 필요한 이유는, 임베딩하고자 하는 요소 중 하나가 null일 경우 임베딩이 불가해지기 때문이다.

cypher에서는 문자열 + null = null 이 된다.

예를 들어,

WITH po, po.orderNumber + ' ' + op.name AS text

 

 

 

📍 배치 임베딩

현실적으로는 한 개씩 임베딩을 생성하는 것은 매우 비효율적이겠죠..?ㅎㅎ

단일 임베딩은 기초 개념을 학습하거나 테스트 목적으로 하는 것이고, 실제로는 여러 노드를 한 번에 처리해야 한다.

이때 처리하고자 하는 노드가 수십개 정도일 경우 배치 임베딩을 진행하고, 전체 노드를 처리할 때는 대규모 배치를 진행하면 된다.

 

위 예시를 이어서 활용을 하면, 여러 RawMaterial 노드의 텍스트를 한 번에 임베딩하고, 각 노드에 벡터를 매핑해 저장한다.

:param token => 'sk-...';

MATCH (rm:RawMaterial)
WHERE rm.name IS NOT NULL AND rm.materialCode IS NOT NULL
WITH rm LIMIT 20
WITH collect(rm) AS rms
WITH rms, [rm IN rms | rm.name + ' ' + rm.materialCode] AS texts
CALL genai.vector.encodeBatch(texts, 'OpenAI', { token: $token }) YIELD index, vector
CALL db.create.setNodeVectorProperty(rms[index], 'embedding', vector)
RETURN rms[index].name, vector

 

▪️collect() : 여러 개의 값을 하나의 리스트로 묶어주는 함수로, 특정 그룹에 속한 노드나 속성들을 리스트 형태로 만들고 싶을 때 사용

▪️encodeBatch() : 리스트를 이 함수의 파라미터로 입력해주면 한 번에 임베딩하여 반환해주며, 여기에는 반환된 노드의 인덱스(dix)와 임베딩 벡터(vector)를 포함한다.

▪️ 마지막으로 db.create.setNodeVectorProperty 프로시저를 호출하여 노드들의 embedding 속성을 생성해 임베딩 벡터를 저장할 수 있다.

 

 

 

📍 대규모 배치 처리

대규모 배치 처리는 위에서의 소규모 배치가 한 encodeBatch를 반복적으로 수행하고 저장하는 과정이다.

수천 개 이상의 노드를 처리할 경우에는 range()와 UNWIND를 사용해 반복적으로 배치 처리한다.

 

아래 예시는 100개씩 처리하는 예제이다:

:param toekn => 'sk-...';

MATCH (rm:RawMaterial)
WHERE rm.name IS NOT NULL AND rm.materialCode IS NOT NULL
WITH collect(rm) AS allRMs
WITH allRMs, size(allRMs) AS total, 100 AS batchSize
UNWIND range(0, total -1. batchSize) AS startIndex
WITH allRMs[startIndex .. startIndex + batchSize] AS batch
WITH batch, [rm IN batch | rm.name + ' ' + rm.materialCode] AS texts
CALL genai.vector.encodeBatch(texts, 'OpenAI', { token: $token }) YIELD index, vector
CALL db.create.setNodeVectorProperty(batch[index], 'embedding', vector)
RETURN batch[index].name, vector

 

▪️UNWIND : 리스트를 행으로 풀어주는 연산자로, 각 원소마다 작업을 수행할 때 주로 사용!

 

 

🔻 Vector Index 생성하기

벡터 인덱스는 노드에 대한 단일 레이블, 단일 속성 인덱스이거나 관계에 대한 단일 관계 유형, 단일 속성 인덱스다.

벡터 인덱스를 통해 대규모 데이터 세트에서 벡터 임베딩을 쿼리할 수 있다.

임베딩은 텍스트, 이미지 또는 문서와 같은 데이터 객체의 숫자 표현으로, 텍스트의 각 단어 또는 토큰은 일반적으로 고차원 벡터로 표현되며 각 차원은 단어 의미의 특정 측면을 나타낸다.

 

CREATE VECTOR INDEX 명령을 사용하여 벡터 인덱스를 생성할 수 있다.

인덱스를 만들 때 이름을 지정하는 것이 좋으며, neo4j 5.16브타 인덱스 이름을 매개변수로 지정할 수도 있다 (CREATE VECTOR INDEX  $name..)

 

예시와 함께 살펴보자.

CREATE VECTOR INDEX rawMaterialEmbedding IF NOT EXISTS
FOR (rm:RawMaterial)
ON rm.embedding
OPTIONS {
    indexConfig: {
      'vector.dimensions': 1536,
      'vector.similarity_function': 'cosine'
    }
}

 

이렇게 하면 RawMaterial 노드의 embedding 속성에 대한 벡터 인덱스가 생성되어 유사도 기반 검색이 가능해진다.

 

만약 노드가 아닌 관계(relationship) 자체에 embedding 속성을 부여하고, 그에 대한 벡터 인덱스를 생성하고 싶다면 :

CREATE VECTOR INDEX hasOperationEmbedding IF NOT EXISTS
FOR ()-[r:HAS_OPERATION]-()
ON (r.embedding)
OPTIONS {
  indexConfig: {
    'vector.dimensions': 1536,
    'vector.similarity_function': 'cosine'
  }
}

 

그리고 이 임베딩은 이런 방식으로 저장했을 것이다 :

MATCH (po:ProductionOrder)-[r:HAS_OPERATION]->(op:Operation)
WHERE r.description IS NOT NULL
CALL genai.vector.encode(r.description, 'OpenAI', { token: $token }) YIELD vector
CALL db.create/setRelationshipVectorProperty(r, 'embedding', vector)

 

 

 

📌 다양한 검색기 (Retrievers)

🔻 Vector Retriever

neo4j 벡터 인덱스와 쿼리 텍스트 또는 벡터를 기반으로 유사성 검색을 수행하는 방식이다. 일치하는 노드와 유사성 점수를 반환한다.

from neo4j_graphrag.retrievers import VectorRetriever

retriever = VectorRetriever(
	driver,
    index_name=POSTER_INDEX_NAME,
)

 

 

 

🔻 Text2Cypher Retriever

자연어를 사용해서 Neo4j 데이터베이스에서 데이터를 검색할 수 있도록 해주는 기능으로,

LLM을 사용하여 사용자의 자연어 쿼리를 cypher 쿼리로 변환한 다음 생성된 cypher 

 

from neo4j import GraphDatabase
from neo4j_graphrag.retrievers import Text2CypherRetriever
from neo4j_graphrag.llm.openai import OpenAILLM

URI = "neo4j://localhost:7687"
AUTH = ("neo4j", "password")

#connect to neo4j database
driver = GraphDatabase.driver(URI, auth=AUTH)

#create LLM object
llm = OpenAILLM(model_name="gpt-4o")

#(optional) neo4j 스키마 명시하기 - 개인의 용도에 맞게!
neo4j_schema = """
Node properties:
RawMaterial {name: STRING, materialCode: STRING}
SemiProduct {name: STRING}
FinishedProduct {name: STRING}
ProductionOrder {orderNumber: STRING, date: DATE}
WorkCenter {name: STRING}
CostCenter {name: STRING}
CostElement {type: STRING, amount: FLOAT}
Supplier {name: STRING}
Operation {name: STRING, description: STRING}

Relationship properties:
HAS_COST {reason: STRING}
SUPPLIED_BY {contractNote: STRING}
HAS_OPERATION {description: STRING}

The relationships:
(:RawMaterial)-[:USED_IN]->(:ProductionOrder)
(:ProductionOrder)-[:PRODUCES]->(:SemiProduct)
(:ProductionOrder)-[:PRODUCES]->(:FinishedProduct)
(:ProductionOrder)-[:EXECUTED_AT]->(:WorkCenter)
(:ProductionOrder)-[:CHARGED_TO]->(:CostCenter)
(:Operation)-[:HAS_COST]->(:CostElement)
(:ProductionOrder)-[:HAS_OPERATION]->(:Operation)
(:RawMaterial)-[:SUPPLIED_BY]->(:Supplier)
"""

# 도메인 예시 쿼리
examples = [
    "USER INPUT: '2024년 1분기 동안 사용된 원자재의 총 비용은 얼마인가요?' QUERY: MATCH (rm:RawMaterial)-[:USED_IN]->(po:ProductionOrder)-[:HAS_OPERATION]->(op:Operation)-[:HAS_COST]->(ce:CostElement) WHERE po.date >= date('2024-01-01') AND po.date <= date('2024-03-31') AND ce.type = '재료비' RETURN sum(ce.amount) AS totalRawMaterialCost",

    "USER INPUT: '반제품 B는 어떤 원자재로부터 만들어졌나요?' QUERY: MATCH (rm:RawMaterial)-[:USED_IN]->(po:ProductionOrder)-[:PRODUCES]->(sp:SemiProduct {name: 'B'}) RETURN DISTINCT rm.name",

    "USER INPUT: '작업장 A에서 실행된 생산 오더를 보여줘' QUERY: MATCH (po:ProductionOrder)-[:EXECUTED_AT]->(wc:WorkCenter {name: 'A'}) OPTIONAL MATCH (po)-[:PRODUCES]->(result) RETURN po.orderNumber, result.name",
]

# retriever 객체 생성
retriever = Text2CypherRetriever(
    driver=driver,
    llm=llm,
    neo4j_schema=neo4j_schema,
    examples=examples,
)

 

근데 문득 코드를 보고 나니 결국 llm이 자연어를 cypher로 바꿔주는 건데..

langchain에서 제공하는 CypherQAChain 등과 뭐가 다른건지, 왜 Text2Cypher의 성능(정밀도)이 좋다고 하는 걸까?

 

구성을 살펴보니 Langchain 기반 툴은 스키마를 반영할 때 graph.schema를 자동 포함하는 반면에 Text2CypherRretriever는 GraphIndex 기반이기 때문에 더 세밀한 제어가 가능하다고 한다. 그리고 Langchain 기반은 validation 및 fallback 기능이 없어 LLM이 틀리면 그대로 실행을 하지만, neo4j의 툴은 쿼리를 검증, 재생성 루프 등이 내장되어있어 더 정밀도가 좋은 거 같다!

 

설계 목적 자체에서도 Langchain은 범용 QA 체인 중심의 설계이고, neo4j 툴은 정확한 cypher 생성에 특화하여 설계한 것으로

간단한 cypher 구문 생성은 langchain으로도 충분하지만 보다 더 복잡한 쿼리를 날릴 때는 neo4j 툴을 사용하는게 좋아 보인다.

 

 

 

✏️ Part 5. GraphDB 기반 RAG (GraphRAG)

📌 GraphRAG 모듈 사용하기

그래서 위의 내용을 정리를 하자면,

1. vector index를 생성하고

2. vector retriever를 생성하고

3. graphRAG 파이프라인을 생성하면 된다.

 

영화 데이터를 예제로 들어보면,1.

CREATE VECTOR INDEX moviePlotsEmbedding FOR (n:Movie) ON (n.plotEmbedding) OPTIONS {indexConfig: {
 `vector.dimensions`: 1536,
 `vector.similarity_function`: 'cosine'
}}

 

 

2.

from neo4j_graphrag.retrievers import VectorRetriever
from neo4j_graphrag.embeddings.openai import OpenAIEmbeddings
embedder = OpenAIEmbeddings(model="text-embedding-ada-002")
retriever = VectorRetriever(
    driver,
    index_name="moviePlotsEmbedding",
    embedder=embedder,
    return_properties=["title", "plot"],
)
query_text = "A cowboy doll is jealous when a new spaceman figure becomes the top toy." # 토이스토리 줄거리
retriever_result = retriever.search(query_text=query_text, top_k=3)
print(retriever_result)

 

 

 

3.

from neo4j_graphrag.llm import OpenAILLM
from neo4j_graphrag.generation import GraphRAG

# Retrieval 후 답변을 생성할 때 사용할 LLM
llm = OpenAILLM(model_name="gpt-4o", model_params={"temperature": 0})
# RAG 파이프라인 초기화
rag = GraphRAG(retriever=retriever, llm=llm)
# 질문하기
query_text = "What movies are sad romances?"
response = rag.search(query_text=query_text, retriever_config={"top_k": 5})
print(response.answer)

 

 


이제 실제 구현한 과정을 정리해보자면

1. LLM 설정

model = ChatOpenAI(openai_api_key=config.gpt4o_OPENAI_API_KEY, model_name="gpt-4o", temperature=0, max_tokens=4048)

 

이런 식으로 llm 모델을 설정할 때, gpt의 모델을 정해주는 것이 좋다.

이전에 텍스트를 cypher 구문으로 바꾸는데 특화된 fine tuning된 모델을 사용해볼까해서 찾아봤었는데gpt-4o 모델의 성능이 fine tuning 된 모델보다 더 좋았다. (mini도 안됨!)이번에도 모델명을 지정하지 않은 채 했다가 4o 모델을 사용하니 성능이 확 좋아졌다! (지피티 최고)

 

 

2. neo4j 연결

URI = config.NEO4J_URI
AUTH = (config.NEO4J_USERNAME, config.NEO4J_PASSWORD)
with GraphDatabase.driver(URI, auth=AUTH) as driver:
    driver.verify_connectivity()

 

이처럼 너무 길어질 내용이나 보안에 중요한 내용은 config 파일에 저장해두는게 좋다.

 

 

3. 노드 임베딩 및 벡터 인덱스 생성 (선택 → 유사 검색을 할 때만 필요. 정확한 cypher 구문 매칭을 통해 결과값을 받을 수 있는 경우 필요 없음)

 

"~노드와 가장 비슷한 노드가 뭐야?" 와 같이 의미적 유사도를 계산하고자 했을 때는 노드를 임베딩한 값을 속성에 추가하여 저장하는 것이 좋다.

하지만 나의 경우 cypher 구문으로 정확히 매칭되는 질문들만 던질 예정이라 구현은 해놓고 사용하진 않기로 했다.

# 노드 임베딩 > TODO: 속성에 저장을 해두는 건데 실행할 때마다 매번 수행할 필요 없음 
def generate_node_embeddings(driver, token):
    logger.info("Generating node embeddings...")
    query = """
    MATCH (m:Metric)
    WHERE m.name IS NOT NULL AND m.amount IS NOT NULL
    WITH m, m.name + 'level:' + toString(m.level) + 'amount:' + toString(m.amount) AS description
    CALL genai.vector.encode(description, 'OpenAI', { token: $token}) YIELD vector
    CALL db.create.setNodeVector(m, 'embedding', vector)
    RETURN m.name AS name, vector
    """
    with driver.session() as session:
        result = session.run(query, token=token)
        print("임베딩 저장 결과:")
        for record in result:
            print(f"- {record['name']}")

# vector index 생성
def create_vector_index(driver):
    query = """
    CREATE VECTOR INDEX metricEmbedding IF NOT EXISTS
    FOR (m:Metric)
    ON m.embedding
    OPTIONS {
        indexConfig: {
            'vector.dimensions': 1536,
            'vector.similarity_function': 'COSINE'
        }
    }
    """
    with driver.session() as session:
        session.run(query)
        print("metric 벡터 인덱스 생성 완료")


# 유사도 검색
def similarity_search(driver, query_text, token, top_k=5):
    query = """
    CALL genai.vector.encode($query_text, 'OpenAI', { token: $token}) YIELD vector
    CALL db.index.vector.queryNodes('Metric', 'embedding', vector, $top_k)
    YIELD node, score
    RETURN node.name AS name, node.level AS level, node.amount AS amount, score
    """
    with driver.session() as session:
        result = session.run(query, query_text=query_text, token=token, top_k=top_k)
        print(f"{query_text}에 대한 유사도 검색 결과:")
        for record in result:
            print(f" - {record['name']}, level: {record['level']}, amount: {record['amount']}, score: {record['score']:.4f}")

 

 

 

4. retriever 정의

 

기본적으로 Text2CypherRetriever는 default template을 가지고 실행이 된다.

하지만 우리가 가지고 있는 graphDB의 정보를 정확하게 파악하고 구문을 생성하기를 바라서 custom template을 만들어서 넣어줬다.

def initialize_retriever():
    retriever = Text2CypherRetriever(
        driver=driver,
        llm=model,  # type: ignore
        neo4j_schema=config.NEO4J_SCHEMA,
        examples=config.NEO4J_EXAMPLES,
        custom_prompt = config.CYPHER_TEMPLATES2
    )
    return retriever

 

내용이 매우 길어져서 config 파일에 템플릿을 저장해두었는데, 각자 graphDB에 대한 정보를 넣어서 활용하면 된다.

커스텀 템플릿에는 Task, Domain structure, schema, examples, input을 각각 명시해줬다.

 

 

 

5. (llm의) system prompt 작성

def get_system_prompt(query):
    
    system_prompt = """You are an assistant that generates clear, professional, and fact-based answers in Korean.

    You must rely **only on the provided "Information"**, which comes from a Neo4j graph database. Do not make assumptions, hallucinate missing information, or use external knowledge. Do not mention the use of a database or describe the query process in your response.

    Important instructions:
    - Do **not** generate any information not present in the provided Information.
    - If no relevant information is found, respond with:
    "이 질문에 대해서는 개인 DB 기반 정보가 없어 정확한 답변을 드릴 수 없습니다."
    - Write all answers in **KOREAN**, with a confident, helpful tone. Avoid passive voice and technical disclaimers.

    ---

    Information:
    {retrieved_contents}
    """

    prompt = ChatPromptTemplate.from_messages(
        [
            ("system",
             system_prompt),
            ("human", 
             query)
        ]
    )
    return prompt

 

llm에게 전달될 prompt도 정의해준다.

 

 

 

6. text2cypher 실행

마지막으로 실행만 하면 된다!

def run_text2cypher(query):
    retriever = intialize_retriever()
    retrieved_data = retriever.search(query_text=query)
    system_prompt = get_system_prompt(query)
    chain = system_prompt | model | StrOutputParser()
    response = chain.invoke({'query': query, 'retrieved_contents': retrieved_contents})
    return response