基礎から実用まで、実装しながら理解するDAGs完全ガイド
💡 覚えやすいイメージ: 川の流れのように、一方向に流れて決して元の場所に戻らない構造
まずは、シンプルで理解しやすいDAGクラスを作成します。隣接リスト形式で実装し、基本的な操作を追加していきます。
class DAG:
def __init__(self):
"""
DAGを初期化
- graph: 隣接リスト(辞書形式)
- in_degree: 各ノードの入次数を記録
"""
self.graph = {}
self.in_degree = {}
def add_node(self, node):
"""ノードを追加"""
if node not in self.graph:
self.graph[node] = []
self.in_degree[node] = 0
print(f"ノード '{node}' を追加しました")
def add_edge(self, from_node, to_node):
"""エッジを追加(from_node → to_node)"""
# ノードが存在しない場合は自動で追加
self.add_node(from_node)
self.add_node(to_node)
# エッジを追加
self.graph[from_node].append(to_node)
self.in_degree[to_node] += 1
print(f"エッジ '{from_node}' → '{to_node}' を追加しました")
def display(self):
"""DAGの内容を表示"""
print("\n=== DAG の構造 ===")
for node in self.graph:
neighbors = self.graph[node]
print(f"{node} → {neighbors}")
print("\n=== 入次数 ===")
for node in self.in_degree:
print(f"{node}: {self.in_degree[node]}")
# 使用例
dag = DAG()
dag.add_edge('A', 'B')
dag.add_edge('A', 'C')
dag.add_edge('B', 'D')
dag.add_edge('C', 'D')
dag.display()
ノード 'A' を追加しました ノード 'B' を追加しました エッジ 'A' → 'B' を追加しました ノード 'C' を追加しました エッジ 'A' → 'C' を追加しました エッジ 'B' → 'D' を追加しました ノード 'D' を追加しました エッジ 'C' → 'D' を追加しました === DAG の構造 === A → ['B', 'C'] B → ['D'] C → ['D'] D → [] === 入次数 === A: 0 B: 1 C: 1 D: 2
キューベース
再帰ベース
まずは理解しやすいKahn's Algorithmから実装してみましょう。
from collections import deque
def topological_sort_kahn(dag):
"""
Kahn's Algorithmを使ったトポロジカルソート
手順:
1. 入次数が0のノードをキューに追加
2. キューからノードを取り出して結果に追加
3. そのノードの隣接ノードの入次数を1減らす
4. 入次数が0になったノードをキューに追加
5. キューが空になるまで繰り返す
"""
# 入次数をコピー(元のデータを変更しないため)
in_degree = dag.in_degree.copy()
queue = deque()
result = []
print("=== Kahn's Algorithm 実行 ===")
# ステップ1: 入次数0のノードをキューに追加
for node in in_degree:
if in_degree[node] == 0:
queue.append(node)
print(f"初期キューに追加: {node}")
step = 1
# メインループ
while queue:
print(f"\nステップ {step}:")
current = queue.popleft()
result.append(current)
print(f" 処理中: {current}")
# 隣接ノードの入次数を減らす
for neighbor in dag.graph[current]:
in_degree[neighbor] -= 1
print(f" {neighbor} の入次数: {in_degree[neighbor]}")
# 入次数が0になったらキューに追加
if in_degree[neighbor] == 0:
queue.append(neighbor)
print(f" キューに追加: {neighbor}")
print(f" 現在のキュー: {list(queue)}")
step += 1
# 循環チェック
if len(result) != len(dag.graph):
print("❌ 循環が検出されました!これはDAGではありません。")
return None
print(f"\n✅ トポロジカル順序: {' → '.join(result)}")
return result
# 使用例
dag = DAG()
dag.add_edge('数学基礎', '線形代数')
dag.add_edge('数学基礎', 'プログラミング')
dag.add_edge('線形代数', '機械学習')
dag.add_edge('プログラミング', 'データ構造')
dag.add_edge('データ構造', '機械学習')
result = topological_sort_kahn(dag)
現在のグラフ状態: DAG(有効)
深さ優先探索を使って、後戻り時にノードを記録していく手法です。最後に逆順にすることでトポロジカル順序を得られます。
def topological_sort_dfs(dag):
"""
DFS-basedトポロジカルソート
アイデア:
- すべての隣接ノードを訪問した後にスタックに追加
- 最終的にスタックの内容を逆順にすると、トポロジカル順序になる
"""
visited = set()
temp_visited = set() # 循環検出用
stack = []
def dfs_visit(node):
print(f" 訪問開始: {node}")
# 循環検出
if node in temp_visited:
print(f"❌ 循環検出: {node}")
return False
if node in visited:
return True
temp_visited.add(node)
# すべての隣接ノードを訪問
for neighbor in dag.graph[node]:
if not dfs_visit(neighbor):
return False
temp_visited.remove(node)
visited.add(node)
stack.append(node)
print(f" 完了: {node} → スタックに追加")
return True
print("=== DFS-based トポロジカルソート ===")
# すべてのノードを処理
for node in dag.graph:
if node not in visited:
print(f"\n{node} から DFS 開始")
if not dfs_visit(node):
return None # 循環が見つかった
# スタックを逆順にしてトポロジカル順序を得る
result = stack[::-1]
print(f"\n✅ トポロジカル順序: {' → '.join(result)}")
return result
# 使用例
dag = DAG()
nodes = ['シャツ', 'ネクタイ', 'ジャケット', '靴下', '靴', 'ベルト']
edges = [
('シャツ', 'ネクタイ'),
('シャツ', 'ベルト'),
('ネクタイ', 'ジャケット'),
('ベルト', 'ジャケット'),
('靴下', '靴'),
]
for from_node, to_node in edges:
dag.add_edge(from_node, to_node)
result = topological_sort_dfs(dag)
実際のプロジェクトでタスクの依存関係を管理するシステムを作ってみましょう。
import datetime
from typing import List, Dict, Optional
class Task:
def __init__(self, name: str, duration: int, description: str = ""):
self.name = name
self.duration = duration # 日数
self.description = description
self.earliest_start = 0
self.latest_start = 0
self.is_critical = False
def __str__(self):
return f"Task({self.name}, {self.duration}日)"
class ProjectScheduler:
def __init__(self):
self.dag = DAG()
self.tasks = {}
def add_task(self, name: str, duration: int, description: str = ""):
"""タスクを追加"""
task = Task(name, duration, description)
self.tasks[name] = task
self.dag.add_node(name)
print(f"タスク追加: {task}")
def add_dependency(self, prerequisite: str, task: str):
"""依存関係を追加(prerequisite → task)"""
self.dag.add_edge(prerequisite, task)
print(f"依存関係: {prerequisite} → {task}")
def calculate_schedule(self):
"""スケジュールを計算"""
topo_order = topological_sort_kahn(self.dag)
if not topo_order:
print("❌ 循環依存があります!スケジュール不可能")
return None
print("\n=== スケジュール計算 ===")
# 最早開始時刻の計算
for task_name in topo_order:
task = self.tasks[task_name]
max_prereq_finish = 0
# 前提タスクの最大完了時間を計算
for node in self.dag.graph:
if task_name in self.dag.graph[node]:
prereq_finish = self.tasks[node].earliest_start + self.tasks[node].duration
max_prereq_finish = max(max_prereq_finish, prereq_finish)
task.earliest_start = max_prereq_finish
print(f"{task_name}: 最早開始 = {task.earliest_start}日目")
# プロジェクト全体の期間
project_duration = max(
task.earliest_start + task.duration
for task in self.tasks.values()
)
print(f"\n🎯 プロジェクト期間: {project_duration}日")
return topo_order, project_duration
def find_critical_path(self):
"""クリティカルパスを見つける"""
topo_order, project_duration = self.calculate_schedule()
if not topo_order:
return None
# 最遅開始時刻の計算(逆順)
for task_name in reversed(topo_order):
task = self.tasks[task_name]
# このタスクに依存するタスクの最小開始時間
min_successor_start = project_duration
has_successor = False
for successor in self.dag.graph[task_name]:
successor_task = self.tasks[successor]
min_successor_start = min(min_successor_start, successor_task.latest_start)
has_successor = True
if not has_successor:
task.latest_start = project_duration - task.duration
else:
task.latest_start = min_successor_start - task.duration
# クリティカルタスクの判定
task.is_critical = (task.earliest_start == task.latest_start)
# クリティカルパスの抽出
critical_tasks = [name for name, task in self.tasks.items() if task.is_critical]
print("\n🔥 クリティカルパス:")
for task_name in critical_tasks:
task = self.tasks[task_name]
print(f" {task_name}: {task.earliest_start}日目〜{task.earliest_start + task.duration}日目")
return critical_tasks
# 使用例:Webサイト開発プロジェクト
scheduler = ProjectScheduler()
# タスクを追加
scheduler.add_task("要件定義", 3, "要件の詳細化と仕様決定")
scheduler.add_task("UI設計", 5, "ユーザーインターフェース設計")
scheduler.add_task("DB設計", 4, "データベース設計")
scheduler.add_task("フロントエンド開発", 8, "UI実装")
scheduler.add_task("バックエンド開発", 10, "API・DB実装")
scheduler.add_task("テスト", 5, "統合テスト")
scheduler.add_task("デプロイ", 2, "本番環境への展開")
# 依存関係を設定
scheduler.add_dependency("要件定義", "UI設計")
scheduler.add_dependency("要件定義", "DB設計")
scheduler.add_dependency("UI設計", "フロントエンド開発")
scheduler.add_dependency("DB設計", "バックエンド開発")
scheduler.add_dependency("フロントエンド開発", "テスト")
scheduler.add_dependency("バックエンド開発", "テスト")
scheduler.add_dependency("テスト", "デプロイ")
# スケジュール計算
critical_path = scheduler.find_critical_path()
以下の科目の履修順序を決めてください:
以下のグラフがDAGかどうか判定してください:
A → B → C → D, B → E → F → C
DAGにおいて、特定のノードからの最長パスを求める関数を実装してください。
中級編をマスターしたあなたは、次の上級トピックに挑戦する準備ができています: