Tool UseとRAG
RAG(Retrieval Augmented Generation)は、外部知識を検索してから回答を生成する方式です。
Tool Useは、計算・検索・Web操作などをツール呼び出しとして明示的に実行する方式です。
このノートでは、RAGとTool Useを同じ推論ループで扱う最小実装を作ります。
モデルの頭の中だけでは、更新される世界に追いつけない
日付、数値、社内文書、HTML の状態のように、外に取りに行かないと答えられない問題があります。RAG と Tool Use は、その不足を埋めるための 2 つの接続方法です。
このノートでは、まず検索して根拠を付ける RAG を作り、そのあとで計算機や HTML 操作のようなツール呼び出しを足します。最後に両者を一つの推論ループへまとめて、どこで検索し、どこで行動するのかを切り分けます。
中心にあるのは、生成の前に何を取りに行くかの判断です
RAG は文書を取ってくる方法で、Tool Use は操作や計算を外部へ委譲する方法です。どちらも「LLM 単体で答えをひねり出さない」ための設計ですが、必要な外部接続は違います。
最初の関門は、検索をそのまま信じないこと
文書を拾えたことと、最終回答に使えることは別です。チャンク化、retrieve、rerank を分けているのは、その差を見えるようにするためです。
引用付き回答は、生成品質と根拠対応を分けて見る
自然な文章が出ても、根拠との結び付きが曖昧なら意味がありません。ここでは、回答のうまさより「どのチャンクに支えられているか」を優先して読みます。
Tool Use は、検索できない問題に外部操作を差し込む
計算、HTML からの抽出、手続き的な分岐は、文章検索だけでは処理しづらい場面です。ここから先は、モデルが自分で答えるのではなく、必要な道具を選ぶ流れになります。
最後に両者を同じオーケストレータへ載せる
「まず検索するか」「まず計算するか」を判断する層があると、RAG と Tool Use は別機能ではなくなります。以後の節では、その分岐の作り方を見ます。
まずは文書を刻んで、検索可能な形にする
RAG の出発点は、長い文書を検索単位へ分解することです。ここではチャンク化と retrieve を通して、候補集合の作られ方を見ます。
import ast
import operator as op
import re
from html.parser import HTMLParser
import numpy as np
まずRAGの最小パイプラインを作ります。
実行前提: Python 3.10+ と numpy が必要です。未導入なら pip install numpy を実行してください。
- 文書をチャンク化
- 検索で上位チャンクを取得
- 必要なら再ランキング
- 取得文脈を使って回答生成(根拠付き)
このノートでは固定長チャンクと文単位チャンクを同じインデックスに入れて比較します。
後段で重複除外を行い、どちらの分割が効いたかを観察できるようにしています。
用語メモ
hit@k: 上位k件のどこかに正解文書が入る割合MRR: 正解順位の逆数平均(1位=1.0, 2位=0.5)routing accuracy: 質問に対して適切なツールを選べた割合lexical overlap: 回答語と根拠文脈語の重なり率(厳密な事実性指標ではない)
knowledge_docs = [
{
'id': 'doc-rl-1',
'title': '強化学習の基礎',
'text': 'ベルマン最適方程式は最適価値関数を再帰的に定義する。価値反復法はこの更新を繰り返す。',
},
{
'id': 'doc-llm-1',
'title': 'LLMのファインチューニング',
'text': 'SFTは指示と回答ペアを用いて応答スタイルを調整する。LoRAは低ランク行列のみを更新する。',
},
{
'id': 'doc-rag-1',
'title': 'RAGの実装ポイント',
'text': '固定長チャンクと意味チャンクで検索精度が変わる。再ランキングで関連度上位を並び替えると精度が改善する。',
},
{
'id': 'doc-safe-1',
'title': 'ガードレール',
'text': 'Input Railsは危険入力を検知して遮断する。Output Railsは生成結果を検査して安全性を保つ。',
},
]
for d in knowledge_docs:
print(d['id'], d['title'])
候補をそのまま使わず、並べ直す
最初に拾った候補は雑です。rerank を挟むことで、最終回答へ本当に使いたいチャンクを前に持ってきます。
def fixed_length_chunk(text, chunk_size=26, overlap=6):
chunks = []
i = 0
while i < len(text):
chunks.append(text[i:i+chunk_size])
if i + chunk_size >= len(text):
break
i += chunk_size - overlap
return chunks
def sentence_chunk(text):
parts = re.split(r'[。!?!?]', text)
return [p.strip() for p in parts if p.strip()]
chunk_db = []
for doc in knowledge_docs:
# 教材用に2方式を併走して比較し、後段で重複除外する
f_chunks = fixed_length_chunk(doc['text'])
s_chunks = sentence_chunk(doc['text'])
for idx, c in enumerate(f_chunks):
chunk_db.append({
'chunk_id': f"{doc['id']}-f{idx}",
'doc_id': doc['id'],
'title': doc['title'],
'text': c,
'mode': 'fixed',
})
for idx, c in enumerate(s_chunks):
chunk_db.append({
'chunk_id': f"{doc['id']}-s{idx}",
'doc_id': doc['id'],
'title': doc['title'],
'text': c,
'mode': 'sentence',
})
print('chunk count:', len(chunk_db))
print('fixed sample :', [c['text'] for c in chunk_db if c['mode'] == 'fixed'][:2])
print('sentence sample:', [c['text'] for c in chunk_db if c['mode'] == 'sentence'][:2])
取得した根拠から、引用付き回答を作る
ここでは自由生成ではなく、まず根拠との対応が崩れないことを優先します。回答文と引用元のつながりを追ってください。
def tokenize_ja_like(s):
s = re.sub(r'\s+', '', s)
# 教育用: 文字2-gramでトークン化
if len(s) < 2:
return [s] if s else []
return [s[i:i+2] for i in range(len(s)-1)]
def build_tfidf_index(chunks):
tokenized = [tokenize_ja_like(c['text']) for c in chunks]
vocab = sorted(set(t for toks in tokenized for t in toks))
stoi = {t: i for i, t in enumerate(vocab)}
# TF: チャンク内の語頻度
tf = np.zeros((len(chunks), len(vocab)), dtype=np.float64)
for i, toks in enumerate(tokenized):
for t in toks:
tf[i, stoi[t]] += 1.0
# DF: その語を含むチャンク数, IDF: 珍しい語を重くする係数
df = np.count_nonzero(tf > 0, axis=0)
idf = np.log((1 + len(chunks)) / (1 + df)) + 1.0
tfidf = tf * idf[None, :]
norm = np.linalg.norm(tfidf, axis=1, keepdims=True) + 1e-12
tfidf = tfidf / norm
return {
'vocab': vocab,
'stoi': stoi,
'idf': idf,
'matrix': tfidf,
'chunks': chunks,
}
def query_vector(query, index):
v = np.zeros(len(index['vocab']), dtype=np.float64)
for t in tokenize_ja_like(query):
j = index['stoi'].get(t)
if j is not None:
v[j] += 1.0
v = v * index['idf']
v /= np.linalg.norm(v) + 1e-12
return v
def retrieve(query, index, top_k=5):
q = query_vector(query, index)
scores = index['matrix'] @ q
order = np.argsort(scores)[::-1][:top_k]
out = []
for i in order:
ch = index['chunks'][i]
out.append({
'chunk_id': ch['chunk_id'],
'doc_id': ch['doc_id'],
'title': ch['title'],
'text': ch['text'],
'score': float(scores[i]),
'mode': ch['mode'],
})
return out
index = build_tfidf_index(chunk_db)
res = retrieve('ベルマン最適方程式を説明して', index, top_k=6)
for r in res:
print(r['chunk_id'], round(r['score'], 4), r['text'])
次に、計算ツールを推論ループへ入れる
数値問題は、検索より計算機へ投げた方が速くて確実です。ここからは「どのツールを呼ぶか」の分岐が主役になります。
def rerank(query, retrieved):
q_terms = set(tokenize_ja_like(query))
reranked = []
for r in retrieved:
c_terms = set(tokenize_ja_like(r['text']))
title_terms = set(tokenize_ja_like(r['title']))
overlap = len(q_terms & c_terms) / max(len(q_terms), 1)
title_overlap = len(q_terms & title_terms) / max(len(q_terms), 1)
# これは確率ではなく線形スコア。重みは検証データで調整する。
score = 0.65 * r['score'] + 0.25 * overlap + 0.10 * title_overlap
rr = dict(r)
rr['rerank_score'] = float(score)
reranked.append(rr)
reranked.sort(key=lambda x: x['rerank_score'], reverse=True)
return reranked
query = 'ベルマン最適方程式を1文で説明して'
retrieved = retrieve(query, index, top_k=6)
reranked = rerank(query, retrieved)
print('top reranked chunks:')
for r in reranked[:3]:
print(r['chunk_id'], round(r['rerank_score'], 4), '|', r['text'])
HTML 断片から必要な情報だけ抜く
Web 操作の最小例として、HTML からクリック候補やテキストを抽出します。本物のブラウザ操作ではありませんが、外部状態を読む入口としては十分です。
def generate_with_citations(query, ranked_chunks, max_chunks=3):
ctx = ranked_chunks[:max_chunks]
# 取得文脈から重なり最大の文を抽出(extractive generation)
q_terms = set(tokenize_ja_like(query))
best = None
for c in ctx:
score = len(q_terms & set(tokenize_ja_like(c['text'])))
if best is None or score > best['score']:
best = {'score': score, 'text': c['text']}
if best is None or best['score'] == 0:
return {
'answer_text': '根拠文脈で十分な裏付けが見つからなかったため、追加情報が必要です。',
'refs': [],
'used_chunks': ctx,
}
answer_text = best['text']
refs = [f"[{c['doc_id']}:{c['chunk_id']}]" for c in ctx if c['text'] == best['text']]
if not refs and ctx:
refs = [f"[{ctx[0]['doc_id']}:{ctx[0]['chunk_id']}]"]
return {
'answer_text': answer_text,
'refs': refs,
'used_chunks': ctx,
}
def lexical_overlap_ratio(answer_text, chunks):
# 回答語と根拠文脈語の重なり率(粗い指標)
a = set(tokenize_ja_like(answer_text))
c = set()
for ch in chunks:
c |= set(tokenize_ja_like(ch['text']))
return len(a & c) / max(len(a), 1)
query = 'ベルマン最適方程式を1文で説明して'
# これは比較用に手で置いた no-RAG の失敗例(モデル実行結果ではない)
baseline_no_rag_manual = 'ベルマン最適方程式は量子状態を直接最適化する式です。'
rag_out = generate_with_citations(query, reranked)
rag_answer = rag_out['answer_text'] + ' ' + ' '.join(rag_out['refs'])
print('manual baseline (no-RAG example):', baseline_no_rag_manual)
print('RAG answer :', rag_answer)
print('lexical overlap baseline =', round(lexical_overlap_ratio(baseline_no_rag_manual, rag_out['used_chunks']), 4))
print('lexical overlap RAG =', round(lexical_overlap_ratio(rag_out['answer_text'], rag_out['used_chunks']), 4))
ここでの generate_with_citations は自由生成モデルではなく、取得した文脈から最も重なる文を抜き出して引用を付ける extractive toy 実装です。rerank も学習済み reranker ではなく手作りスコアなので、「検索して根拠を付ける流れ」を理解するための最小例として読んでください。
ここからTool Useです。
LLMにすべてを内部推論させるより、外部ツール(検索・計算・Web操作)を明示的に呼び出す設計は、
失敗箇所の切り分けと監査ログの取得に向いています。
ただしルーティング誤りやツール側失敗があるので、評価と監視が必須です。
def tool_retrieve(query, top_k=3):
# 一旦深めに取得してから上位k件へ
fetch_k = max(top_k * 3, top_k)
r = rerank(query, retrieve(query, index, top_k=fetch_k))
# 同一テキストの重複を除外
dedup = []
seen = set()
for x in r:
key = (x['doc_id'], x['text'])
if key in seen:
continue
seen.add(key)
dedup.append(x)
if len(dedup) >= top_k:
break
return {
'type': 'retrieval_result',
'items': [{
'doc_id': x['doc_id'],
'chunk_id': x['chunk_id'],
'text': x['text'],
'score': x['rerank_score'],
} for x in dedup]
}
_ALLOWED_BIN_OPS = {
ast.Add: op.add,
ast.Sub: op.sub,
ast.Mult: op.mul,
ast.Div: op.truediv,
ast.Pow: op.pow,
}
_ALLOWED_UNARY_OPS = {ast.UAdd: op.pos, ast.USub: op.neg}
def _safe_eval(node):
if isinstance(node, ast.Expression):
return _safe_eval(node.body)
if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
return float(node.value)
if isinstance(node, ast.UnaryOp) and type(node.op) in _ALLOWED_UNARY_OPS:
return _ALLOWED_UNARY_OPS[type(node.op)](_safe_eval(node.operand))
if isinstance(node, ast.BinOp) and type(node.op) in _ALLOWED_BIN_OPS:
left = _safe_eval(node.left)
right = _safe_eval(node.right)
# 過剰計算を防ぐ簡易ガード
if isinstance(node.op, ast.Pow) and abs(right) > 10:
raise ValueError('exponent too large')
out = _ALLOWED_BIN_OPS[type(node.op)](left, right)
if abs(out) > 1e12:
raise ValueError('result too large')
return out
raise ValueError('unsupported expression')
def tool_calculator(expression):
expr = expression.strip()
if len(expr) == 0 or len(expr) > 64:
return {'type': 'calc_result', 'error': 'invalid expression length'}
if not re.fullmatch(r'[0-9+\-*/(). ]+', expr):
return {'type': 'calc_result', 'error': 'invalid expression'}
try:
tree = ast.parse(expr, mode='eval')
value = _safe_eval(tree)
except Exception as e:
return {'type': 'calc_result', 'error': str(e)}
return {'type': 'calc_result', 'value': value}
def extract_expression_from_query(user_query):
# クエリ文字列から最も長い算術式っぽい部分を抽出
q = user_query.replace('^', '**')
segments = re.findall(r'[0-9.() +\-*/]+', q)
candidates = []
for seg in segments:
expr = seg.strip()
if len(expr) < 3:
continue
if re.search(r'\d', expr) and re.search(r'[+\-*/]', expr):
candidates.append(expr)
if not candidates:
return None
candidates.sort(key=len, reverse=True)
return candidates[0]
def decide_tool(user_query):
q = user_query.lower()
# 日付(例: 2024-01-01)を計算式と誤判定しない
date_like = re.search(r'(?<!\d)\d{4}[-/]\d{1,2}[-/]\d{1,2}(?!\d)', q)
calc_intent_terms = ['計算', 'evaluate', '=', 'solve']
has_calc_intent = any(t in q for t in calc_intent_terms)
expr = extract_expression_from_query(user_query)
if has_calc_intent and expr and not date_like:
return {'tool': 'calculator', 'args': {'expression': expr}}
web_action_terms = ['クリック', '押して', 'tap', 'click', '選択', 'open', '開いて']
web_target_terms = ['button', 'ボタン', 'link', 'signin', 'sign in', 'login', 'ログイン', 'html', 'account', 'ページ']
danger_terms = ['delete', 'remove', 'purchase', 'buy', '送金', '削除', '購入']
if any(t in q for t in web_action_terms) and (any(t in q for t in web_target_terms) or any(t in q for t in danger_terms)):
return {'tool': 'web_agent', 'args': {'instruction': user_query}}
return {'tool': 'retrieve', 'args': {'query': user_query}}
for q in [
'2+3*4を計算して',
'2*(3+4)を計算して',
'3.5+1.2を計算して',
'ベルマン方程式を説明して',
'Sign In を押して',
'2024-01-01の予定を教えて',
]:
print(q, '->', decide_tool(q))
Web Agentの最小例として、HTMLからクリック候補を抽出し、
ユーザー指示との一致度が高い要素を選びます。
Step Success Rate: 各ステップで正しいアクションを選べた割合Success Rate: 1タスクを最後まで全ステップ正しく完了できた割合
class SimpleDOMParser(HTMLParser):
def __init__(self):
super().__init__()
self.stack = []
self.nodes = []
def handle_starttag(self, tag, attrs):
self.stack.append({
'tag': tag,
'attrs': dict(attrs),
'text_parts': [],
})
def handle_data(self, data):
txt = data.strip()
if not txt:
return
# 祖先すべてに子孫テキストを集約(button > span のような構造に対応)
for node in self.stack:
node['text_parts'].append(txt)
def handle_endtag(self, tag):
if not self.stack:
return
node = self.stack.pop()
if node['tag'] != tag:
return
if node['tag'] in {'button', 'a'}:
attr = node['attrs']
text = ' '.join(node['text_parts']).strip()
self.nodes.append({
'tag': node['tag'],
'id': attr.get('id', ''),
'class': attr.get('class', ''),
'href': attr.get('href', ''),
'aria_label': attr.get('aria-label', ''),
'title': attr.get('title', ''),
'text': text,
})
def tool_web_agent(html, instruction):
parser = SimpleDOMParser()
parser.feed(html)
inst = instruction.lower()
dangerous_terms = ['delete', 'remove', 'purchase', 'buy', '送金', '削除', '購入']
if any(t in inst for t in dangerous_terms):
return {
'type': 'web_action',
'action': 'blocked',
'reason': 'dangerous intent',
'target': None,
'score': 0.0,
'candidates': 0,
}
cand = []
for n in parser.nodes:
score = 0.0
# 行動語が含まれるか(クリック意図)
if any(t in inst for t in ['click', 'クリック', '押して', 'tap', 'open', '開いて']):
score += 0.2
searchable = ' '.join([n['text'], n['id'], n['class'], n['aria_label'], n['title']]).lower()
for key in ['login', 'sign in', 'signin', '検索', '送信', 'next', 'ログイン', 'docs']:
if key in inst and key in searchable:
score += 0.5
if n['id'] and n['id'].lower() in inst:
score += 0.4
cand.append((score, n))
cand.sort(key=lambda x: x[0], reverse=True)
if not cand or cand[0][0] < 0.6:
return {
'type': 'web_action',
'action': 'none',
'target': None,
'score': 0.0,
'candidates': len(cand),
}
best_score, best = cand[0]
return {
'type': 'web_action',
'action': 'click',
'requires_confirmation': True,
'target': best,
'score': best_score,
'candidates': len(cand),
}
html = '''
<div><button id="login-btn"><span>Sign In</span></button></div>
<div><a id="docs-link" href="/docs">Docs</a></div>
<div><button id="next-btn">Next</button></div>
'''
print(tool_web_agent(html, 'Sign In ボタンをクリックして'))
print(tool_web_agent(html, 'delete account ボタンをクリックして'))
ツール選択のルールを明示する
検索、計算、HTML 操作のどれを使うかを、入力の種類に応じて振り分けます。オーケストレータが何を根拠に分岐するかを見る節です。
def tool_orchestrator(user_query, html_context=None):
plan = decide_tool(user_query)
if plan['tool'] == 'calculator':
tool_out = tool_calculator(**plan['args'])
final = f"計算結果: {tool_out.get('value', tool_out.get('error'))}"
return {'plan': plan, 'tool_output': tool_out, 'final_answer': final}
if plan['tool'] == 'web_agent':
html = html_context or '<div><button id="default">OK</button></div>'
tool_out = tool_web_agent(html, plan['args']['instruction'])
if tool_out['action'] == 'blocked':
final = '危険操作の可能性があるため実行をブロックしました。'
elif tool_out['action'] == 'click':
tgt = tool_out['target']
final = f"次の操作候補: click(tag={tgt['tag']}, id={tgt['id']}, text={tgt['text']}) ※ユーザー確認後に実行"
else:
final = '実行可能な操作を特定できませんでした。'
return {'plan': plan, 'tool_output': tool_out, 'final_answer': final}
tool_out = tool_retrieve(**plan['args'])
ranked_for_gen = [
{'doc_id': i['doc_id'], 'chunk_id': i['chunk_id'], 'text': i['text'], 'rerank_score': i['score']}
for i in tool_out['items']
]
rag_out = generate_with_citations(user_query, ranked_for_gen, max_chunks=len(ranked_for_gen))
final = rag_out['answer_text'] + (' ' + ' '.join(rag_out['refs']) if rag_out['refs'] else '')
return {
'plan': plan,
'tool_output': tool_out,
'rag_output': rag_out,
'final_answer': final,
}
demo_queries = [
'2+3*4を計算して',
'2*(3+4)を計算して',
'ベルマン最適方程式を1文で説明して',
'Sign In ボタンをクリックして',
'delete account をクリックして',
]
for q in demo_queries:
out = tool_orchestrator(q, html_context=html)
print('Q:', q)
print('plan:', out['plan'])
print('final:', out['final_answer'])
print('---')
成功率を見るときは、検索と回答を混ぜない
どの段で失敗したかを分けて計測しないと改善点が見えません。ここでは retrieval と end-to-end の指標を意識的に分けます。
# 評価: retrieval / citation / routing
# hit@k = 上位k件のどこかに正解docが含まれる割合
rag_tests = [
('ベルマン方程式を説明して', 'doc-rl-1'),
('LoRAの利点は?', 'doc-llm-1'),
('RAGの改善方法は?', 'doc-rag-1'),
]
hit1 = 0
hit3 = 0
mrr_sum = 0.0
for q, expect_doc in rag_tests:
items = tool_retrieve(q, top_k=3)['items']
docs = [it['doc_id'] for it in items]
hit1 += int(len(docs) > 0 and docs[0] == expect_doc)
hit3 += int(expect_doc in docs)
rank = None
for i, d in enumerate(docs, 1):
if d == expect_doc:
rank = i
break
mrr_sum += 0.0 if rank is None else 1.0 / rank
print('retrieval hit@1 =', round(hit1 / len(rag_tests), 3))
print('retrieval hit@3 =', round(hit3 / len(rag_tests), 3))
print('retrieval MRR =', round(mrr_sum / len(rag_tests), 3))
chunk_lookup = {(c['doc_id'], c['chunk_id']): c['text'] for c in chunk_db}
def citation_lexical_overlap_toy(answer_text, refs):
# 注意: 厳密な事実性ではなく、回答語と参照チャンク語の重なりを見る簡易指標
if not refs:
return 0.0
a_terms = set(tokenize_ja_like(answer_text))
support = 0
for r in refs:
m = re.match(r'^\[(.+?):(.+?)\]$', r)
if not m:
continue
key = (m.group(1), m.group(2))
text = chunk_lookup.get(key, '')
c_terms = set(tokenize_ja_like(text))
if len(a_terms & c_terms) >= 2:
support += 1
return support / max(len(refs), 1)
overlap_scores = []
for q, _ in rag_tests:
out = tool_orchestrator(q)
rag_out = out.get('rag_output', {'answer_text': '', 'refs': []})
overlap_scores.append(citation_lexical_overlap_toy(rag_out['answer_text'], rag_out['refs']))
print('citation lexical overlap (toy) =', round(sum(overlap_scores) / len(overlap_scores), 3))
route_tests = [
('1+2を計算して', 'calculator'),
('2*(3+4)を計算して', 'calculator'),
('3.5+1.2を計算して', 'calculator'),
('Sign In を押して', 'web_agent'),
('ガードレールを説明して', 'retrieve'),
('2024-01-01の予定を教えて', 'retrieve'),
]
route_hit = 0
for q, t in route_tests:
route_hit += int(decide_tool(q)['tool'] == t)
print('tool routing accuracy =', round(route_hit / len(route_tests), 3))
最後に、RAG と Tool Use を一つの流れとして読む
両者は別機能ではなく、LLM の外にある情報や能力を接続する手段です。最後の節では、その共通骨格をまとめて確認します。
# コスト概算(仮定値)
requests_per_day = 900
avg_query_tok = 420
avg_context_tok = 850 # RAGで追加される文脈
avg_output_tok = 180
price_in = 0.20 # USD / 1M input tokens
price_out = 0.80 # USD / 1M output tokens
cost_per_req = ((avg_query_tok + avg_context_tok) / 1e6) * price_in + (avg_output_tok / 1e6) * price_out
daily_cost = cost_per_req * requests_per_day
print('cost per request (USD):', round(cost_per_req, 6))
print('daily cost (USD):', round(daily_cost, 4))
# 単純なレイテンシ見積り
retrieve_ms = 45
rerank_ms = 30
gen_ms = 520
tool_overhead_ms = 25
print('estimated latency (ms):', retrieve_ms + rerank_ms + gen_ms + tool_overhead_ms)
RAGとTool Useを組み合わせると、
- 根拠付き回答(RAG)
- 外部操作の明示実行(Tool Use)
- 監査しやすい推論ログ(plan/tool_output)
を同じパイプラインで扱えます。
ただし、ルーティング誤り・ツール失敗・根拠不足は常に起きるので、評価指標を継続監視する設計が前提です。