Work/GraphDB

OpenAI Embedding과 Elasticsearch로 Hybrid Retrieval 기반 GraphDB Schema 생성하기

raeul0304 2025. 7. 9. 16:19

LLM 기반 RAG 시스템에서 가장 중요한 단계는 검색(Retrieval) 이다.
아무리 좋은 모델이라도, 엉뚱한 데이터를 찾아오면 잘못된 답변을 낸다.

그래서 핵심은 “좋은 검색 품질을 확보하는 것”이다.

 

이를 위해 문서를 벡터화(Embedding) 하여 의미적 유사도를 계산하고,
Dense / Sparse / Hybrid 검색 구조를 설계한다.
그리고 이렇게 검색된 문맥을 LLM에 전달해 GraphDB Schema를 자동으로 생성하는 것이 이번 글의 목표다.

 

 

이번 글에서는 다음 흐름을 다룬다 👇

  1. OpenAI Embedding 모델 종류와 특징
  2. Dense / Sparse / Hybrid 검색 방식 비교
  3. Elasticsearch, Weaviate, Vespa, Qdrant 비교
  4. Elasticsearch를 활용한 Hybrid Search 구축
  5. LLM 기반 GraphDB Schema 자동 생성

 


 

📌 Embedding - 텍스트를 벡터로 표현하다

OpenAI Embedding 모델

모델명  용도 특징
text-embedding-3-small 경량 임베딩 빠르고 저비용
text-embedding-3-large 고정밀 임베딩 가장 높은 의미 정확도
text-embedding-ada-002 구버전 여전히 안정적이며 경량 대안

 

벡터화된 문서는 이후 검색 단계에서 의미적 유사도(semantic similarity) 를 기반으로 비교된다.
즉, “질문과 가장 의미적으로 가까운 문서”를 찾는 데 활용된다.

 

 

 

 

📌 Retrieval - 검색 기법의 세 가지 축

RAG의 검색(Search)에는 세 가지 주요 방식이 존재한다.

 

📍 Dense Retrieval

  • 의미 유사도 기반 검색 방식이다.
  • 문장 임베딩 간의 코사인 유사도(cosine similarity) 또는 내적(dot product) 을 계산해 가장 유사한 문서를 찾는다.
  • 주요 구현체: FAISS (IVF, HNSW)

예: “서울 본사 고객” ↔ “서울에 있는 회사 고객 정보” — 단어가 달라도 의미가 같으면 검색된다.

 

 

📍Sparse Retrieval (Lexical Search)

  • 단어 기반(Bag-of-Words) 검색 방식이다.
  • 질의(query)와 문서를 단어 빈도 기반으로 매핑하고, BM25 또는 TF-IDF 점수로 유사도를 계산한다.
  • 주요 구현체: Elasticsearch Query DSL

예: “매출” → “매출액”, “매출 총이익” — 단어가 일치해야 높은 점수를 받는다.

 

 

 

📍Hybrid Retrieval

Dense와 Sparse를 결합한 방식이다.
즉, 의미 검색 + 키워드 검색을 동시에 수행한다.

 

VectorStore 내부 검색 방식 similarity_search() 동작
Chroma Dense (cosine) 쿼리 임베딩 → 벡터 유사도 계산
ElasticsearchStore Sparse(BM25) 또는 Dense keyword 또는 벡터 비교
WeaviateStore Hybrid dense + sparse 혼합 가중치
Qdrant Dense 내적 또는 cosine 기반 유사도 계산

 

 

 

 

 

📌 주요 Vector Store 비교

Feature  Elasticsearch  Weaviate  Vespa  Qdrant
Sparse Search (BM25) 최고 성능 내장 고급 지원 제한적
Dense Vector 지원 최근 추가 기본 고성능 매우 빠름
Hybrid Search API 수동 구현 Native 가능(복잡) 직접 구현 필요
Metadata Filtering 매우 강력 보통 고급 보통
Scale-out (확장성) 매우 좋음 가능 산업용 수준 중간
학습 곡선 중간 낮음 높음 낮음

 

📍 Elasticsearch

🔸 구조

전통적인 inverted index 기반의 full-text 검색 엔진이다.
BM25, TF-IDF를 기본 지원하며, 최신 버전부터 dense vector(knn_vector) 기능을 지원한다.

 

🔸 장점

  • BM25 기반 키워드 검색 성능이 매우 우수하다.
  • 메타데이터 필터링이 강력하다 (must, should, range 등).
  • 대규모 운영 경험이 풍부하다 (ELK stack).
  • Python, Java, REST API 등 다양한 언어를 지원한다.

 

🔸 단점

  • Dense 검색은 전용 벡터 DB보다 속도가 느리다.
  • Hybrid 검색 설정은 수동으로 구성해야 한다.

 

🔸 추천 사용처

  • 키워드 중심 질의가 많고 Boolean Filtering이 중요한 환경
  • 법률, 회계, 로그 분석 등 키워드 중심 RAG

 

 

📍 Weaviate

🔸 구조

벡터 중심 DB로, BM25가 내장되어 있다.
API 단에서 hybrid 쿼리를 기본 지원한다 (alpha 값으로 dense/sparse 비율 조절).

 

🔸 장점

  • hybrid search를 native로 지원한다.
  • schema 관리 및 필터 설정이 단순하다.
  • OpenAI, Cohere, HuggingFace 임베딩 연동을 지원한다.

🔸 단점

  • Elasticsearch만큼의 확장성은 부족하다.
  • 내부 엔진은 HNSW로 고정되어 있다.

🔸 추천 사용처

  • 의미 + 키워드 혼합형 RAG
  • 빠른 hybrid 프로토타입 구축 환경

 

 

📍 Vespa

🔸 구조

Yahoo가 개발한 대규모 ranking engine이다.
Sparse + Dense + LTR(Learning to Rank)을 통합 지원한다.

 

🔸 장점

  • 다단계 ranking 및 semantic reranking 가능
  • BM25 + BERT reranker 등 고도화된 파이프라인 구성 가능
  • 대규모 추천, 뉴스, 쇼핑 서비스에서 강력하다.

 

🔸 단점

  • 러닝 커브가 높고 설정이 복잡하다.
  • 중소형 프로젝트에는 과한 경우가 많다.

 

🔸 추천 사용처

  • 대규모 검색/랭킹 시스템
  • 고도화된 Scoring 또는 도메인 특화 RAG

 

 

📍 Qdrant

🔸 구조

Rust 기반의 고성능 벡터 DB이다.
Dense Vector + Metadata Filtering을 기본으로 지원한다.

 

🔸 장점

  • 빠른 검색 속도 (Rust + SIMD 최적화)
  • Python, JS, REST SDK 지원
  • Text Matching Plugin으로 Sparse 검색 보조 가능

 

🔸 단점

  • BM25 같은 전통 검색은 직접 구현해야 한다.
  • Hybrid 검색은 수동으로 구성해야 한다.

 

🔸 추천 사용처

  • Dense 중심의 고성능 semantic search
  • Sparse는 보조적인 역할일 때 적합

 

 

 

 

📌 ElasticSearch 실습 - Hybrid Search 구축

 

Elasticsearch를 사용하려면 먼저 Docker 환경에서 실행해야 한다.
(docker-compose up 또는 단일 docker run 명령으로 가능하다.)

 

curl -X PUT "http://<ELASTICSEARCH_HOST>:9200/_application/search/inforactive_vdb" \
-H "Content-Type: application/json" \
-d '{
  "indices": ["graphdb-schema"]
}'

 

 

 

이제 LangChain과 OpenAI Embedding을 결합하여
Elasticsearch 기반 Hybrid RAG 구조를 구현해보자.

 

아래는 전체 파이프라인 코드이다.
LangChain, OpenAI, Elasticsearch를 결합해 완전한 RAG 기반 GraphDB Schema 생성 시스템을 구성한다.

 

import sys
import os
sys.path.append(r'C:\Users\USER\vscodeProjects\graphDB_pjt')
import config
import logging
import json
import pandas as pd
from io import BytesIO
from typing import List, Dict, Any
from fastapi import UploadFile
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from langchain_elasticsearch import ElasticsearchRetriever
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma, ElasticsearchStore
from langchain_core.documents import Document
from collections import defaultdict
from db_to_graphDB.extract_preprocessed_data import list_preprocessed_columns, preprocess_data

# Logging 설정
logging.basicConfig(level='INFO', format='%(asctime)s %(name)s: %(message)s', datefmt='%Y-%m-%dT%H:%M:%S')
logger = logging.getLogger(__name__)

# TMP 디렉토리 설정
TMP_DIR = os.path.join(os.getcwd(), "tmp")
os.makedirs(TMP_DIR, exist_ok=True)

# VectorDB 저장 경로
PERSIST_DIRECTORY = "chroma_schema_db"
os.makedirs(PERSIST_DIRECTORY, exist_ok=True)

ELASTICSEARCH_URL = os.getenv("ELASTICSEARCH_URL", "http://localhost:9200")
ELASTICSEARCH_INDEX_NAME = os.getenv("ELASTICSEARCH_INDEX_NAME", "schema_vector_db")
OPEN_API_KEY_TEST = os.getenv("OPEN_API_KEY_TEST")

# 파일 저장 함수
def store_file(file_name, df, version_name):
    xlsx_path = os.path.join(TMP_DIR, f"{file_name}_{version_name}.xlsx")
    df.to_excel(xlsx_path, index=False)
    logger.info(f"{file_name}_{version_name} 버전 저장 완료 (.xlsx) : {xlsx_path}")
    
    
# Embedding 모델 초기화
def get_embedding_model():
    try:
        embeddings = OpenAIEmbeddings(
            openai_api_key=config.OPEN_API_KEY_TEST,
            model="text-embedding-3-small"
        )
        logger.info("OpenAI Embedding 모델이 성공적으로 초기화되었다.")
        return embeddings
    except Exception as e:
        logger.error(f"Embedding 모델 초기화 중 오류 발생 : {e}")
        raise e    

TEXT_FIELD = "text"
CONTENT_FIELD = "text"
VECTOR_FIELD = "vector"

# Schema VectorDB 생성
def create_schema_vectordb(schema_json_path: str) -> ElasticsearchStore:
    with open(schema_json_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    
    documents: List[Document] = []

    for table in data["schemas"]:
        table_name = table["table_name"]

        # Summary 저장
        documents.append(Document(
            page_content=f"[Table: {table_name}]\n{table['schema_explanation']}",
            metadata={"table_name": table_name, "chunk_type": "table_summary"}
        ))

        # Node 저장
        for node in table.get("nodes", []):
            documents.append(Document(
                page_content=(
                    f"[Node: {node['label']}]\n"
                    f"Description: {node['description']}\n"
                    f"Properties: {', '.join(node.get('properties', []))}"
                ),
                metadata={"table_name": table_name, "chunk_type": "node", "label": node["label"]}
            ))

        # Relationship 저장
        for rel in table.get("relationships", []):
            documents.append(Document(
                page_content=(
                    f"[Relationship: {rel['from']} - [{rel['type']}] -> {rel['to']} ] \n"
                    f"Description: {rel['description']}\n"
                    f"Properties: {', '.join(rel.get('properties', []))}"
                ),
                metadata={
                    "table_name": table_name,
                    "chunk_type": "relationship",
                    "from": rel["from"],
                    "to": rel["to"],
                    "relationship_type": rel["type"]
                }
            ))

    embeddings = get_embedding_model()

    try:
        vectordb = ElasticsearchStore.from_documents(
            documents,
            embedding=embeddings,
            es_url=ELASTICSEARCH_URL,
            index_name=ELASTICSEARCH_INDEX_NAME
        )
        print(f"총 {len(documents)}개의 문서가 ElasticSearchStore에 저장되었다.")
        return vectordb
    except Exception as e:
        print(f"문서 저장 중 오류 발생: {e}")
        raise

 

 

 

 

🔸 벡터 검색 (Dense)

def vector_query(search_query: str) -> Dict:
    query_vector = embeddings.embed_query(search_query)
    return {
        "knn": {
            "field": VECTOR_FIELD,
            "query_vector": query_vector,
            "k": 2,
            "num_candidates": 10,
        }
    }

embeddings = get_embedding_model()

vector_retriever = ElasticsearchRetriever.from_es_params(
    index_name=ELASTICSEARCH_INDEX_NAME,
    body_func=lambda q: vector_query(q),
    content_field=CONTENT_FIELD,
    url=ELASTICSEARCH_URL
)

 

 

 

 

🔸 키워드 검색 (Sparse / BM25)

def bm25_query(search_query: str, k: int = 2) -> Dict:
    return {
        "query": {"match": {TEXT_FIELD: search_query}},
        "size": k
    }

bm25_retriever = ElasticsearchRetriever.from_es_params(
    index_name=ELASTICSEARCH_INDEX_NAME,
    body_func=lambda q: bm25_query(q, k=2),
    content_field=TEXT_FIELD,
    url=ELASTICSEARCH_URL
)

 

 

 

🔸 Hybrid Search (Dense + Sparse 결합)

def hybrid_search(search_query: str) -> Dict:
    query_vector = embeddings.embed_query(search_query)
    return {
        "retriever": {
            "rrf": {
                "retrievers": [
                    {"standard": {"query": {"match": {TEXT_FIELD: search_query}}}},
                    {"knn": {"field": VECTOR_FIELD, "query_vector": query_vector, "k": 2, "num_candidates": 5}}
                ]
            }
        }
    }

hybrid_retriever = ElasticsearchRetriever.from_es_params(
    index_name=ELASTICSEARCH_INDEX_NAME,
    body_func=lambda q: hybrid_search(q),
    content_field=TEXT_FIELD,
    url=ELASTICSEARCH_URL
)

 

 

 

 

 

🔸 LLM을 통한 GraphDB Schema 생성

 

LLM은 OpenAI의 GPT-4o 모델을 사용하며,
검색된 컨텍스트를 기반으로 SAP 테이블 구조를 분석해
GraphDB용 노드/관계 스키마(JSON) 를 자동으로 생성한다.

 

def get_model():
    try:
        model = ChatOpenAI(
            openai_api_key=OPEN_API_KEY_TEST,
            model="gpt-4o",
            temperature=0,
            max_tokens=4048
        )
        logger.info("OpenAI 모델이 성공적으로 초기화되었다.")
        return model
    except Exception as e:
        logger.error(f"모델 초기화 중 오류 발생: {e}")
        raise e

 

 

데이터를 분석하고 스키마를 생성하는 핵심 프롬프트는 다음과 같다.

def create_graphdb_schema(model, data_info, retrieved_context: str):
    required_keys = ['table_name', 'table_description', 'fields', 'sample_rows']
    for key in required_keys:
        if key not in data_info:
            raise ValueError(f"Missing required key: {key}")
    
    sample_str = json.dumps(data_info['sample_rows'], indent=2)
    schema_prompt = f"""
You are a data modeling expert...
(중략)
"""
    response = model.invoke(schema_prompt)
    return json.loads(response.content)

 

 

이 함수는 LLM이 데이터를 분석한 결과를 Graph Schema(JSON) 형태로 반환한다.

최종적으로 다음 함수로 통합 실행한다.

 

def run_graphDB_schema_generation_llm(file_name, file):
    try:
        model = get_model()
        data_info = get_data_info(file_name, file)
        graphdb_schema = create_graphdb_schema(model, data_info)
        return graphdb_schema
    except Exception as e:
        logger.error(f"GraphDB schema generation failed: {str(e)}")
        return {"error": str(e)}

 

 

 

 

Embedding, Retrieval, Hybrid Search, 그리고 LLM Schema 생성까지 모든 RAG 구성요소가 완전하게 연결된 파이프라인이다.

이 구조를 통해 단순한 텍스트 검색을 넘어, 데이터의 의미를 이해하고 구조화하는 지능형 시스템을 구축할 수 있다.