# Every file, every line, fully commented and production-ready. Click a module tab to view its code.
# ============================================================
# config/settings.py
# AGCM — Global Configuration & Constants
# ============================================================
# ────────────────────────────────────────────────────────────
# CPU Power Settings
# Adjust to match your actual system's TDP (Thermal Design Power)
# ────────────────────────────────────────────────────────────
DEFAULT_CPU_WATT = 65  # Watts — standard desktop CPU TDP
MIN_CPU_WATT = 5  # Watts — ARM/embedded
MAX_CPU_WATT = 250  # Watts — high-end server
# ────────────────────────────────────────────────────────────
# Energy Units
# Multiply a value by the constant to convert it, e.g.
# joules * JOULES_TO_MJ -> millijoules.
# ────────────────────────────────────────────────────────────
JOULES_TO_MJ = 1000  # 1 J = 1000 mJ
JOULES_TO_KWH = 1 / 3_600_000  # 1 kWh = 3,600,000 J
# "MJ" here means millijoules (mJ), not megajoules: 1 kWh = 3.6e9 mJ.
MJ_TO_KWH = 1 / 3_600_000_000
# ────────────────────────────────────────────────────────────
# Hardware Efficiency Profiles
# H_f: multiplier applied to energy (lower = more efficient)
# Each profile maps a key to:
#   label  — display name
#   H_f    — hardware efficiency factor (1.0 for the "standard" profile)
#   tdp_w  — nominal TDP in watts for that hardware class
# ────────────────────────────────────────────────────────────
HARDWARE_PROFILES = {
    "arm_efficient": {"label": "ARM / Efficient Laptop", "H_f": 0.6, "tdp_w": 10},
    "modern_laptop": {"label": "Modern Laptop (15–25W)", "H_f": 0.8, "tdp_w": 20},
    "standard": {"label": "Standard Desktop (65W)", "H_f": 1.0, "tdp_w": 65},
    "high_end": {"label": "High-End Desktop (125W)", "H_f": 1.3, "tdp_w": 125},
    "legacy": {"label": "Legacy System (150W+)", "H_f": 1.5, "tdp_w": 150},
}
DEFAULT_HARDWARE = "standard"  # Must be a key of HARDWARE_PROFILES
# ────────────────────────────────────────────────────────────
# Default Settings
# ────────────────────────────────────────────────────────────
DEFAULT_REGION = "india"
DEFAULT_INPUT_SIZE = 1000
REPETITIONS = 5  # Number of runs per algorithm (median used)
WARMUP_RUNS = 2  # Warmup runs to discard
# ────────────────────────────────────────────────────────────
# Carbon Budget Mode
# ────────────────────────────────────────────────────────────
CARBON_BUDGET_MG = 5.0  # Default budget: 5 mg CO₂
# ────────────────────────────────────────────────────────────
# Complexity Order Multipliers (used in AGCM calculation)
# Maps Big-O notation to a numeric complexity for input n
# ────────────────────────────────────────────────────────────
import math
def complexity_ops(order: str, n: int) -> float:
    """
    Convert a Big-O notation string to a numeric operation count.

    Args:
        order: Complexity string, e.g. 'O(n^2)', 'O(n log n)', 'O(2^n)'.
            Unrecognised strings fall back to a linear estimate (n).
        n: Input size. Non-positive sizes are treated as one operation.

    Returns:
        Estimated operation count (float). For integer n >= 1 the
        result is never below 1.0, so it is safe to divide by.
    """
    if n <= 0:
        return 1.0

    # log2(1) == 0 would report "zero operations" for the logarithmic
    # orders, which downstream turns energy-per-op and the AGCM score
    # into 0. Clamp the log factor so at least one step is counted.
    log_n = max(math.log2(n), 1.0)

    # Estimators are callables so only the requested order is evaluated
    # (avoids computing n**3 etc. on every call).
    estimators = {
        "O(1)": lambda: 1,
        "O(log n)": lambda: log_n,
        "O(n)": lambda: n,
        "O(n log n)": lambda: n * log_n,
        "O(n^2)": lambda: n ** 2,
        "O(n^3)": lambda: n ** 3,
        "O(2^n)": lambda: 2 ** min(n, 30),  # Cap to avoid overflow
        "O(V+E)": lambda: n + n * 2,  # Approximate for sparse graph
    }
    estimator = estimators.get(order)
    if estimator is None:
        return float(n)
    return float(estimator())
# ============================================================
# algorithms/sorting.py
# AGCM — Sorting Algorithm Implementations
# All algorithms operate on Python lists (in-place or return new)
# ============================================================
from typing import List
# ────────────────────────────────────────────────────────────
# 1. BUBBLE SORT
# Time: O(n²) worst/avg | Space: O(1)
# ────────────────────────────────────────────────────────────
def bubble_sort(arr: List[int]) -> List[int]:
    """
    Bubble Sort: sweep the list repeatedly, swapping neighbouring
    elements that are out of order, until a sweep makes no swap.

    Green Note: Very inefficient for large n. High AGCM score.

    Args:
        arr: Input list of integers (left unmodified).
    Returns:
        A new, sorted list.
    """
    items = arr.copy()  # Sort a copy so the caller's list is untouched
    unsorted_end = len(items) - 1  # Last index still possibly out of place
    while unsorted_end > 0:
        any_swap = False
        for idx in range(unsorted_end):
            nxt = idx + 1
            if items[idx] > items[nxt]:
                items[idx], items[nxt] = items[nxt], items[idx]
                any_swap = True
        if not any_swap:
            break  # A clean sweep means the list is sorted
        unsorted_end -= 1  # Largest remaining element is now in place
    return items
# ────────────────────────────────────────────────────────────
# 2. INSERTION SORT
# Time: O(n) best | O(n²) worst | Space: O(1)
# ────────────────────────────────────────────────────────────
def insertion_sort(arr: List[int]) -> List[int]:
    """
    Insertion Sort: grow a sorted prefix by inserting one element
    at a time into its correct position.

    Green Note: Better than Bubble on nearly-sorted data (O(n) best case).
    Good for small n or nearly-sorted inputs.

    Args:
        arr: Input list of integers (left unmodified).
    Returns:
        A new, sorted list.
    """
    out = arr.copy()
    for pos in range(1, len(out)):
        current = out[pos]
        slot = pos
        # Slide larger elements of the sorted prefix one step right
        # until the slot where `current` belongs is found.
        while slot > 0 and out[slot - 1] > current:
            out[slot] = out[slot - 1]
            slot -= 1
        out[slot] = current
    return out
# ────────────────────────────────────────────────────────────
# 3. MERGE SORT
# Time: O(n log n) all cases | Space: O(n)
# ────────────────────────────────────────────────────────────
def merge_sort(arr: List[int]) -> List[int]:
    """
    Merge Sort: split the list in half, sort each half recursively,
    then merge the two sorted halves.

    Green Note: Predictable O(n log n) in all cases = consistent
    energy cost. Recommended for large, unsorted datasets.

    Args:
        arr: Input list of integers (left unmodified).
    Returns:
        A new, sorted list.
    """
    size = len(arr)
    if size < 2:
        return arr.copy()  # 0 or 1 elements: already sorted
    half = size // 2
    lower = merge_sort(arr[:half])
    upper = merge_sort(arr[half:])
    return _merge(lower, upper)


def _merge(left: List[int], right: List[int]) -> List[int]:
    """Helper: combine two sorted lists into one sorted list."""
    merged = []
    li = 0
    ri = 0
    n_left = len(left)
    n_right = len(right)
    while li < n_left and ri < n_right:
        # `<=` takes from the left list on ties, keeping the sort stable
        if left[li] <= right[ri]:
            merged.append(left[li])
            li += 1
        else:
            merged.append(right[ri])
            ri += 1
    # At most one of these tails is non-empty
    merged.extend(left[li:])
    merged.extend(right[ri:])
    return merged
# ────────────────────────────────────────────────────────────
# 4. QUICK SORT
# Time: O(n log n) avg | O(n²) worst | Space: O(log n)
# ────────────────────────────────────────────────────────────
def quick_sort(arr: List[int]) -> List[int]:
    """
    Quick Sort: partition around a pivot, then sort the two sides.

    Green Note: Sorts a private copy in place with O(log n) stack
    space. Uses median-of-3 pivot selection so already-sorted and
    reverse-sorted inputs split evenly instead of degrading to the
    O(n²) worst case.

    Args:
        arr: Input list of integers (left unmodified).
    Returns:
        A new, sorted list.
    """
    arr = arr.copy()
    _quick_sort_helper(arr, 0, len(arr) - 1)
    return arr


def _quick_sort_helper(arr, low, high):
    """Sort arr[low..high] (inclusive bounds) in place.

    Only the smaller partition is handled recursively; the larger one
    is handled by the loop. This bounds the recursion depth by
    log2(n), so large inputs cannot hit Python's recursion limit.
    """
    while low < high:
        pivot_idx = _partition(arr, low, high)
        if pivot_idx - low < high - pivot_idx:
            _quick_sort_helper(arr, low, pivot_idx - 1)
            low = pivot_idx + 1
        else:
            _quick_sort_helper(arr, pivot_idx + 1, high)
            high = pivot_idx - 1


def _partition(arr, low, high):
    """Lomuto partition of arr[low..high] around a median-of-3 pivot.

    Returns the final index of the pivot; everything left of it is
    <= pivot and everything right of it is > pivot.
    """
    mid = (low + high) // 2
    # Order the three samples so the median ends up at `mid` ...
    if arr[mid] < arr[low]:
        arr[low], arr[mid] = arr[mid], arr[low]
    if arr[high] < arr[low]:
        arr[low], arr[high] = arr[high], arr[low]
    if arr[high] < arr[mid]:
        arr[mid], arr[high] = arr[high], arr[mid]
    # ... then park it at `high`, where the Lomuto scan expects the pivot.
    arr[mid], arr[high] = arr[high], arr[mid]

    pivot = arr[high]
    i = low - 1  # Right edge of the "<= pivot" region
    for j in range(low, high):
        if arr[j] <= pivot:
            i += 1
            arr[i], arr[j] = arr[j], arr[i]
    arr[i + 1], arr[high] = arr[high], arr[i + 1]
    return i + 1
# ────────────────────────────────────────────────────────────
# Algorithm Metadata Registry
# ────────────────────────────────────────────────────────────
# Registry of the sorting implementations defined above.
# Each entry provides:
#   func        — the callable to profile (takes a list, returns a sorted list)
#   complexity  — Big-O string describing the algorithm
#   name        — display name used in output
SORTING_ALGORITHMS = {
    "bubble_sort": {"func": bubble_sort, "complexity": "O(n^2)", "name": "Bubble Sort"},
    "insertion_sort": {"func": insertion_sort, "complexity": "O(n^2)", "name": "Insertion Sort"},
    "merge_sort": {"func": merge_sort, "complexity": "O(n log n)", "name": "Merge Sort"},
    "quick_sort": {"func": quick_sort, "complexity": "O(n log n)", "name": "Quick Sort"},
}
# ============================================================
# algorithms/fibonacci.py
# AGCM — Fibonacci Implementations
# Demonstrates the massive carbon cost of naive recursion
# vs dynamic programming (memoization)
# ============================================================
from functools import lru_cache
from typing import Dict
# ────────────────────────────────────────────────────────────
# 1. NAIVE FIBONACCI (Exponential — Do NOT use for large n)
# Time: O(2ⁿ) | Space: O(n) call stack
# ────────────────────────────────────────────────────────────
def naive_fibonacci(n: int) -> int:
    """
    Naive doubly-recursive Fibonacci — for demonstration only.

    WARNING: Extremely slow for n > 30. Used in AGCM to show
    the catastrophic carbon cost of exponential algorithms.

    Time Complexity: exponential — every call above the base cases
        spawns two further calls.
    Space Complexity: O(n) — call stack depth.

    Args:
        n: Fibonacci index (keep n <= 30 for demo). Values <= 0 yield 0.
    Returns:
        nth Fibonacci number
    """
    if n > 1:
        return naive_fibonacci(n - 1) + naive_fibonacci(n - 2)
    # Base cases: F(1) = 1, and F(n) = 0 for anything at or below zero
    return 1 if n == 1 else 0
# ────────────────────────────────────────────────────────────
# 2. MEMOIZED FIBONACCI (Linear — recommended)
# Time: O(n) | Space: O(n)
# ────────────────────────────────────────────────────────────
def memoized_fibonacci(n: int, memo: Dict[int, int] = None) -> int:
    """
    Memoized Fibonacci — stores computed sub-results.

    Green Note: Each value from F(2) to F(n) is computed once, so the
    work is linear in n. For n=30 the naive version makes roughly
    2.7 million calls; this version performs about 30 additions.

    The table is filled bottom-up with a loop rather than by
    recursion, so large n cannot hit Python's recursion limit.

    Time Complexity: O(n) — each value computed once
    Space Complexity: O(n) — memo dictionary

    Args:
        n: Fibonacci index. Values <= 0 yield 0.
        memo: Optional memoization dictionary mapping index -> value.
            A fresh one is created when omitted. Entries already
            present are reused, and every index from 2 to n is
            present when the call returns.
    Returns:
        nth Fibonacci number
    """
    if memo is None:
        memo = {}
    if n <= 0:
        return 0
    if n == 1:
        return 1
    if n in memo:
        return memo[n]  # Cache hit — no recomputation

    prev, curr = 0, 1  # F(0), F(1)
    for k in range(2, n + 1):
        value = memo.get(k)
        if value is None:
            value = prev + curr
            memo[k] = value
        prev, curr = curr, value
    return curr
# ────────────────────────────────────────────────────────────
# Algorithm Metadata Registry
# ────────────────────────────────────────────────────────────
# Registry of the Fibonacci implementations defined above.
# Each entry provides:
#   func        — the callable to profile (takes an int index)
#   complexity  — Big-O string describing the algorithm
#   name        — display name used in output
FIBONACCI_ALGORITHMS = {
    "naive_fibonacci": {"func": naive_fibonacci, "complexity": "O(2^n)", "name": "Naive Fibonacci"},
    "memoized_fibonacci": {"func": memoized_fibonacci, "complexity": "O(n)", "name": "Memoized Fibonacci"},
}
# ============================================================
# profiler/runtime_profiler.py
# AGCM — Runtime Profiler
# Wraps algorithm execution and measures:
# - Execution time (seconds, microsecond precision)
# - CPU usage (%)
# - Peak memory usage (KB)
# ============================================================
import time
import tracemalloc
import psutil
import os
from dataclasses import dataclass
from typing import Callable, Any, Optional
from statistics import median
from config.settings import REPETITIONS, WARMUP_RUNS
@dataclass
class ProfileResult:
    """Measurements collected for one profiled algorithm."""
    # Display name of the algorithm
    algorithm_name: str
    # Input size the measurements refer to
    input_size: int
    # Execution time in seconds
    time_s: float
    # CPU usage during execution (%)
    cpu_percent: float
    # Peak memory allocation (KB)
    memory_kb: float

    def __repr__(self) -> str:
        """Compact one-line form with the time shown in milliseconds."""
        details = ", ".join((
            f"t={self.time_s*1000:.4f}ms",
            f"cpu={self.cpu_percent:.1f}%",
            f"mem={self.memory_kb:.2f}KB",
        ))
        return f"ProfileResult({self.algorithm_name}: {details})"
class RuntimeProfiler:
    """
    Profiles an algorithm's runtime performance.

    Each profiled call is timed with time.perf_counter, its peak
    allocation is read from tracemalloc, and system-wide CPU
    utilisation is sampled through psutil.

    Args:
        repetitions: Number of measured runs (must be >= 1).
        warmup: Number of unmeasured warmup runs (must be >= 0).

    Usage:
        profiler = RuntimeProfiler()
        result = profiler.profile(bubble_sort, data, "Bubble Sort", n=1000)
    """

    def __init__(self,
                 repetitions: int = REPETITIONS,
                 warmup: int = WARMUP_RUNS):
        # Fail early: with zero repetitions there would be nothing to
        # take a median of when the results are aggregated.
        if repetitions < 1:
            raise ValueError("repetitions must be at least 1.")
        if warmup < 0:
            raise ValueError("warmup cannot be negative.")
        self.repetitions = repetitions
        self.warmup = warmup

    def profile(self,
                func: Callable,
                input_data: Any,
                algo_name: str,
                n: int) -> ProfileResult:
        """
        Profile a function with given input data.

        Args:
            func: Algorithm function to profile; called as func(input_data)
            input_data: Input data (list, int, etc.)
            algo_name: Human-readable algorithm name
            n: Input size (for reporting)
        Returns:
            ProfileResult with the median time, mean CPU %, and the
            largest peak memory seen across the measured runs
        Raises:
            RecursionError: if a measured run exceeds the recursion limit
        """
        # ── Warmup runs (discard results) ──
        for _ in range(self.warmup):
            try:
                func(input_data)
            except Exception:
                # Warmup is best-effort: a failure here will surface
                # again, with a proper error, in the measured runs.
                # KeyboardInterrupt/SystemExit are not swallowed.
                pass

        times = []
        cpu_vals = []
        memory_val = 0.0

        # ── Main measurement runs ──
        for _ in range(self.repetitions):
            # With interval=None psutil reports utilisation since the
            # previous call, so this call only resets the baseline;
            # its return value describes the time *before* the run.
            psutil.cpu_percent(interval=None)

            # NOTE: tracemalloc stays active while the run is timed, so
            # the reported time includes its tracing overhead.
            tracemalloc.start()
            try:
                t_start = time.perf_counter()
                func(input_data)
                t_end = time.perf_counter()
                _, peak_bytes = tracemalloc.get_traced_memory()
            except RecursionError as err:
                raise RecursionError(
                    f"Recursion limit hit for n={n}. Use smaller n."
                ) from err
            finally:
                # Always stop tracing, whatever the run raised.
                tracemalloc.stop()

            # Utilisation over the interval that contained the run
            cpu_vals.append(psutil.cpu_percent(interval=None))
            times.append(t_end - t_start)
            memory_val = max(memory_val, peak_bytes / 1024)  # bytes → KB

        return ProfileResult(
            algorithm_name=algo_name,
            input_size=n,
            time_s=median(times),
            cpu_percent=sum(cpu_vals) / len(cpu_vals),
            memory_kb=memory_val,
        )
# ============================================================
# energy/energy_model.py
# AGCM — Energy Estimation Model
#
# Model: E = P × t
# Where: E = Energy (Joules)
# P = CPU Power (Watts)
# t = Execution time (seconds)
#
# This is a simplified linear model suitable for a semester
# project and other educational purposes. For production use,
# integrate with hardware power counters (RAPL, etc.).
# ============================================================
from dataclasses import dataclass
from config.settings import DEFAULT_CPU_WATT, JOULES_TO_MJ, HARDWARE_PROFILES
@dataclass
class EnergyResult:
    """Breakdown of one energy estimate."""
    # Raw energy in joules (power × time)
    energy_j: float
    # Same energy expressed in millijoules (mJ)
    energy_mj: float
    # CPU power assumed for the estimate (W)
    power_w: float
    # Execution time the estimate is based on (s)
    time_s: float
    # Hardware efficiency factor that was applied
    H_f: float
    # Millijoules after multiplying by the hardware factor
    energy_mj_adjusted: float
class EnergyModel:
    """
    Estimates CPU energy consumption from execution time.

    Formula: E = P_cpu × t_exec × H_f

    The hardware efficiency factor (H_f) comes from the selected
    hardware profile and scales the base P × t estimate.

    Args:
        cpu_watt: CPU power in watts (default from settings)
        hardware: Hardware profile key (e.g. 'standard', 'arm_efficient').
            An unknown key silently uses the 'standard' profile's
            factor and label, while `self.hardware` keeps the key
            that was passed in.
    Usage:
        model = EnergyModel(cpu_watt=65, hardware='standard')
        result = model.estimate(time_s=0.01)
        print(f"Energy: {result.energy_mj_adjusted:.4f} mJ")
    """

    def __init__(self,
                 cpu_watt: float = DEFAULT_CPU_WATT,
                 hardware: str = "standard"):
        self.cpu_watt = cpu_watt
        self.hardware = hardware
        fallback = HARDWARE_PROFILES["standard"]
        chosen = HARDWARE_PROFILES.get(hardware, fallback)
        self.H_f = chosen["H_f"]
        self.hardware_label = chosen["label"]

    def estimate(self, time_s: float) -> EnergyResult:
        """
        Estimate energy for a given execution time.

        Args:
            time_s: Execution time in seconds
        Returns:
            EnergyResult with full breakdown
        Raises:
            ValueError: if time_s is negative
        """
        if time_s < 0:
            raise ValueError("Execution time cannot be negative.")
        joules = self.cpu_watt * time_s          # E = P × t
        millijoules = joules * JOULES_TO_MJ      # J → mJ
        return EnergyResult(
            energy_j=joules,
            energy_mj=millijoules,
            power_w=self.cpu_watt,
            time_s=time_s,
            H_f=self.H_f,
            energy_mj_adjusted=millijoules * self.H_f,  # hardware factor applied
        )

    def energy_per_op(self, time_s: float, op_count: float) -> float:
        """
        Calculate energy cost per operation (E_unit).

        The hardware-adjusted energy is used, so H_f is already part
        of the returned value.

        Args:
            time_s: Execution time in seconds
            op_count: Estimated operation count f(n)
        Returns:
            Energy per operation in nanojoules (nJ); 0.0 when
            op_count is not positive
        """
        if op_count <= 0:
            return 0.0
        adjusted_mj = self.estimate(time_s).energy_mj_adjusted
        # mJ → nJ is a factor of 1,000,000
        return adjusted_mj * 1_000_000 / op_count
# ============================================================
# agcm/agcm_engine.py
# AGCM — Adaptive Green Complexity Metric Engine
#
# FORMULA:
# AGCM(n, h, r) = O(f(n)) × E_unit × H_f × R_c
#
# Where:
# O(f(n)) = Complexity ops at input size n
# E_unit = Energy per operation (nJ)
# H_f = Hardware efficiency factor
# R_c = Regional carbon intensity (gCO₂/kWh)
#
# Lower AGCM = Greener algorithm
# ============================================================
from dataclasses import dataclass, field
from typing import Optional
from profiler.runtime_profiler import ProfileResult
from energy.energy_model import EnergyModel, EnergyResult
from carbon.carbon_calculator import CarbonCalculator, CarbonResult
from config.settings import complexity_ops, HARDWARE_PROFILES
from carbon.regions import REGIONS
@dataclass
class AGCMResult:
    """
    Everything recorded for one algorithm run: identity, profiling
    figures, energy, carbon, and the resulting AGCM score.
    """
    # ── Identity ──
    algorithm_name: str
    complexity: str
    input_size: int
    region: str
    hardware: str
    # ── Profiling ──
    time_s: float
    cpu_percent: float
    memory_kb: float
    # ── Energy ──
    energy_mj: float
    energy_unit_nj: float   # E_unit: energy per operation (nJ)
    H_f: float              # Hardware efficiency factor
    # ── Carbon ──
    carbon_mg: float        # CO₂ in milligrams
    R_c: float              # Regional carbon intensity factor
    # ── AGCM ──
    agcm_score: float       # The key metric (lower = greener)
    agcm_rank: int = 0      # 0 until a ranking step assigns it
    within_budget: bool = True

    def summary(self) -> str:
        """Return a one-line, human-readable digest of the run."""
        metrics = " | ".join((
            f"Time={self.time_s*1000:.3f}ms",
            f"Energy={self.energy_mj:.4f}mJ",
            f"Carbon={self.carbon_mg:.4f}mgCO₂",
            f"AGCM={self.agcm_score:.6f}",
        ))
        return f"[{self.algorithm_name}] {metrics}"
class AGCMEngine:
    """
    Runs the AGCM pipeline for a single profiled algorithm.

    Steps:
        1. Take a ProfileResult produced by RuntimeProfiler
        2. Estimate energy via EnergyModel
        3. Calculate carbon via CarbonCalculator
        4. Compute the AGCM score

    Args:
        region: Region key; must exist in REGIONS
        hardware: Hardware profile key; must exist in HARDWARE_PROFILES
        cpu_watt: CPU power in watts handed to the EnergyModel

    Usage:
        engine = AGCMEngine(region='india', hardware='standard')
        result = engine.compute(profile_result, complexity='O(n^2)', n=1000)
    """

    def __init__(self,
                 region: str = "india",
                 hardware: str = "standard",
                 cpu_watt: float = 65.0):
        self.region = region
        self.hardware = hardware
        self.energy_model = EnergyModel(cpu_watt=cpu_watt, hardware=hardware)
        self.carbon_calc = CarbonCalculator(region=region)
        # The score uses the intensity in gCO₂/kWh (e.g. 700), not the
        # scaled "R_c" entry of the region record.
        region_record = REGIONS[region]
        self.R_c = region_record["intensity_g_per_kwh"]
        hardware_record = HARDWARE_PROFILES[hardware]
        self.H_f = hardware_record["H_f"]

    def compute(self,
                profile: ProfileResult,
                complexity: str,
                n: int) -> AGCMResult:
        """
        Run the full AGCM computation for a profiled algorithm.

        Args:
            profile: ProfileResult from RuntimeProfiler
            complexity: Big-O string (e.g. 'O(n^2)')
            n: Input size
        Returns:
            AGCMResult with all metrics and the AGCM score
        """
        elapsed = profile.time_s

        # ── Step 1: Operation count for this complexity class ──
        op_count = complexity_ops(complexity, n)

        # ── Step 2: Energy (total and per operation) ──
        energy = self.energy_model.estimate(elapsed)
        per_op_nj = self.energy_model.energy_per_op(elapsed, op_count)

        # ── Step 3: Carbon from the hardware-adjusted energy ──
        carbon = self.carbon_calc.calculate(energy.energy_mj_adjusted)

        # ── Step 4: AGCM = ops × E_unit × H_f × R_c ──
        # NOTE(review): per_op_nj is derived from the hardware-adjusted
        # energy, so H_f enters this product twice — confirm intended.
        score = op_count * per_op_nj * self.H_f * self.R_c

        return AGCMResult(
            algorithm_name=profile.algorithm_name,
            complexity=complexity,
            input_size=n,
            region=self.region,
            hardware=self.hardware,
            time_s=elapsed,
            cpu_percent=profile.cpu_percent,
            memory_kb=profile.memory_kb,
            energy_mj=energy.energy_mj_adjusted,
            energy_unit_nj=per_op_nj,
            H_f=self.H_f,
            carbon_mg=carbon.carbon_mg,
            R_c=self.R_c,
            agcm_score=score,
        )
# ============================================================
# agcm/recommender.py
# AGCM — Green Recommendation Engine
#
# After comparing algorithms, this module:
# 1. Ranks all algorithms by AGCM score (ascending = greener)
# 2. Identifies the greenest algorithm
# 3. Generates human-readable recommendations
# 4. Calculates % improvement for greener choices
# ============================================================
from typing import List, Dict
from agcm.agcm_engine import AGCMResult
class GreenRecommender:
    """
    Analyzes a list of AGCMResults and generates sustainability
    recommendations.

    Usage:
        rec = GreenRecommender()
        ranked = rec.rank(results_list)
        report = rec.generate_report(ranked)
        print(report)
    """

    # Medal shown next to ranks 1, 2 and 3 in the report
    _MEDALS = ("🥇", "🥈", "🥉")

    def rank(self, results: "List[AGCMResult]") -> "List[AGCMResult]":
        """
        Sort algorithms by AGCM score (lower = greener = better rank).

        Sets the agcm_rank field on each result object (1 = greenest);
        the objects passed in are updated in place.

        Args:
            results: List of AGCMResult objects
        Returns:
            New list, sorted ascending by agcm_score, with ranks assigned
        """
        sorted_results = sorted(results, key=lambda r: r.agcm_score)
        for position, result in enumerate(sorted_results, start=1):
            result.agcm_rank = position
        return sorted_results

    def carbon_saving_percent(self,
                              greenest: "AGCMResult",
                              other: "AGCMResult") -> float:
        """
        Calculate % carbon reduction from switching to greener algorithm.

        Returns 0.0 when `other` emits no carbon (avoids dividing by zero).
        """
        if other.carbon_mg == 0:
            return 0.0
        return ((other.carbon_mg - greenest.carbon_mg) / other.carbon_mg) * 100

    def generate_report(self, ranked: "List[AGCMResult]") -> str:
        """
        Generate a full text recommendation report.

        Args:
            ranked: Sorted list of AGCMResult (from rank())
        Returns:
            Multi-line string report
        """
        if not ranked:
            return "No results to analyze."
        greenest = ranked[0]
        worst = ranked[-1]
        lines = []
        lines.append("=" * 60)
        lines.append(" AGCM GREEN RECOMMENDATION REPORT")
        lines.append("=" * 60)
        lines.append(f" Region: {greenest.region.upper()}")
        lines.append(f" Hardware: {greenest.hardware}")
        lines.append(f" Input n: {greenest.input_size}")
        lines.append("")
        lines.append(" RANKING (Lower AGCM = Greener)")
        lines.append("-" * 60)
        for r in ranked:
            # How much carbon the top-ranked algorithm saves relative
            # to the algorithm on this row.
            saving = self.carbon_saving_percent(greenest, r)
            # Index the medal tuple only for ranks 1-3; unranked
            # results (agcm_rank == 0) and lower ranks get a blank.
            if 1 <= r.agcm_rank <= len(self._MEDALS):
                medal = self._MEDALS[r.agcm_rank - 1]
            else:
                medal = " "
            if saving > 0:
                note = "GREENER by " + str(round(saving, 1)) + "%"
            else:
                note = "BASELINE"
            lines.append(
                f" {r.agcm_rank}. {medal} {r.algorithm_name:20s} | "
                f"AGCM={r.agcm_score:.5f} | "
                f"CO₂={r.carbon_mg:.4f}mg | "
                f"{note}"
            )
        lines.append("")
        lines.append(" RECOMMENDATION")
        lines.append("-" * 60)
        lines.append(f" ✅ USE: {greenest.algorithm_name}")
        lines.append(f" AGCM Score: {greenest.agcm_score:.6f}")
        lines.append(f" Carbon: {greenest.carbon_mg:.4f} mg CO₂")
        lines.append(f" Energy: {greenest.energy_mj:.4f} mJ")
        if len(ranked) > 1:
            saving_vs_worst = self.carbon_saving_percent(greenest, worst)
            lines.append("")
            lines.append(f" ❌ AVOID: {worst.algorithm_name}")
            lines.append(f" Switching saves {saving_vs_worst:.1f}% carbon emissions.")
        lines.append("=" * 60)
        return "\n".join(lines)
# ============================================================
# agcm/budget_mode.py
# AGCM — Carbon Budget Mode
#
# Users define a maximum CO₂ budget (mg).
# This module filters algorithms and reports compliance.
# ============================================================
from dataclasses import dataclass
from typing import List, Tuple
from agcm.agcm_engine import AGCMResult
from config.settings import CARBON_BUDGET_MG
@dataclass
class BudgetReport:
    """Outcome of checking a set of results against a carbon budget."""
    # The budget that was applied, in mg CO₂
    budget_mg: float
    # Algorithms within budget
    compliant: List[AGCMResult]
    # Algorithms over budget
    exceeded: List[AGCMResult]
    # The algorithm the analysis recommends
    recommended: AGCMResult
    # Number of results that were analysed
    total_tested: int
class CarbonBudgetMode:
    """
    Carbon Budget Mode: Filter and recommend algorithms
    that stay within a user-defined CO₂ emission limit.

    Args:
        budget_mg: Maximum allowed emission in mg CO₂ (must be > 0)

    Example:
        budget = CarbonBudgetMode(budget_mg=5.0)
        report = budget.analyze(results_list)
        print(budget.format_report(report))
    """

    def __init__(self, budget_mg: float = CARBON_BUDGET_MG):
        if budget_mg <= 0:
            raise ValueError("Carbon budget must be positive (mg CO₂).")
        self.budget_mg = budget_mg

    def analyze(self, results: List[AGCMResult]) -> BudgetReport:
        """
        Analyze results against the carbon budget.

        Sets `within_budget` on every result object, then picks a
        recommendation:
          - the lowest AGCM score among compliant algorithms, or
          - if none comply, the lowest-carbon algorithm overall, or
          - None when `results` is empty.

        Args:
            results: List of AGCMResult objects
        Returns:
            BudgetReport with compliant/exceeded split
        """
        for r in results:
            r.within_budget = r.carbon_mg <= self.budget_mg
        compliant = [r for r in results if r.within_budget]
        exceeded = [r for r in results if not r.within_budget]

        if compliant:
            recommended = min(compliant, key=lambda r: r.agcm_score)
        elif results:
            recommended = min(results, key=lambda r: r.carbon_mg)
        else:
            # Nothing was tested; min() on an empty list would raise.
            recommended = None

        return BudgetReport(
            budget_mg=self.budget_mg,
            compliant=compliant,
            exceeded=exceeded,
            recommended=recommended,
            total_tested=len(results),
        )

    def format_report(self, report: BudgetReport) -> str:
        """Generate text report of budget compliance."""
        lines = [
            f"CARBON BUDGET MODE — Budget: {report.budget_mg:.2f} mg CO₂",
            f"Algorithms tested: {report.total_tested}",
            f"Within budget: {len(report.compliant)} algorithms ✅",
            f"Over budget: {len(report.exceeded)} algorithms ❌",
            "",
        ]
        if report.compliant:
            lines.append("✅ COMPLIANT ALGORITHMS:")
            for r in sorted(report.compliant, key=lambda x: x.carbon_mg):
                lines.append(f" {r.algorithm_name}: {r.carbon_mg:.4f} mg CO₂")
        if report.exceeded:
            lines.append("\n❌ OVER BUDGET:")
            for r in sorted(report.exceeded, key=lambda x: x.carbon_mg, reverse=True):
                excess = r.carbon_mg - report.budget_mg
                lines.append(f" {r.algorithm_name}: {r.carbon_mg:.4f} mg CO₂ (over by {excess:.4f})")
        if report.recommended is not None:
            lines.append(f"\n🌿 RECOMMENDED: {report.recommended.algorithm_name}")
        else:
            # Empty analysis: there is no algorithm to name.
            lines.append("\nNo algorithms to recommend.")
        return "\n".join(lines)
# ============================================================
# main.py
# AGCM — Command Line Interface Entry Point
#
# Usage:
# python main.py
# python main.py --size 1000 --region india --hardware standard
# python main.py --budget 5.0 --category sorting
# ============================================================
import argparse
import random
import sys
from typing import List
from algorithms.sorting import SORTING_ALGORITHMS
from algorithms.fibonacci import FIBONACCI_ALGORITHMS
from profiler.runtime_profiler import RuntimeProfiler
from agcm.agcm_engine import AGCMEngine, AGCMResult
from agcm.recommender import GreenRecommender
from agcm.budget_mode import CarbonBudgetMode
from visualization.charts import plot_all_charts
from visualization.tables import print_results_table
from config.settings import HARDWARE_PROFILES
def parse_args(argv=None):
    """
    Parse the AGCM command-line options.

    Args:
        argv: Optional list of argument strings. When None (the
            default) argparse reads the process arguments, as before.
    Returns:
        argparse.Namespace with size, region, hardware, budget,
        category and charts attributes.
    """
    parser = argparse.ArgumentParser(
        description='AGCM: Adaptive Green Complexity Metric'
    )
    parser.add_argument('--size', type=int, default=1000, help='Input size n')
    parser.add_argument('--region', type=str, default='india',
                        help='Region (india/usa/germany/eu/norway)')
    parser.add_argument('--hardware', type=str, default='standard', help='Hardware profile')
    parser.add_argument('--budget', type=float, default=5.0, help='Carbon budget (mg CO₂)')
    # Restrict to the categories the comparison runner understands, so
    # a typo is rejected here instead of producing an empty result set.
    parser.add_argument('--category', type=str, default='sorting',
                        choices=('sorting', 'fibonacci', 'all'),
                        help='Category: sorting/fibonacci/all')
    parser.add_argument('--charts', action='store_true', help='Show matplotlib charts')
    return parser.parse_args(argv)
def run_comparison(category: str, n: int,
                   region: str, hardware: str) -> List[AGCMResult]:
    """
    Profile every algorithm in a category and return the AGCM results.

    Args:
        category: 'sorting', 'fibonacci' or 'all'; any other value
            runs nothing and yields an empty list
        n: Input size (list length for sorting; capped at 28 for Fibonacci)
        region: Region key passed to the AGCM engine
        hardware: Hardware profile key passed to the AGCM engine
    Returns:
        One AGCMResult per algorithm, sorting results first
    """
    profiler = RuntimeProfiler()
    engine = AGCMEngine(region=region, hardware=hardware)
    results = []

    def _evaluate(registry, payload, size):
        # Profile each registered algorithm on `payload`, then score it.
        for algo in registry.values():
            print(f" Profiling {algo['name']}...")
            measured = profiler.profile(algo['func'], payload, algo['name'], size)
            results.append(engine.compute(measured, algo['complexity'], size))

    wants_all = category == 'all'
    if wants_all or category == 'sorting':
        descending = list(range(n, 0, -1))  # Worst-case: reverse-sorted
        _evaluate(SORTING_ALGORITHMS, descending, n)
    if wants_all or category == 'fibonacci':
        fib_n = min(n, 28)  # Cap naive fibonacci to avoid timeout
        _evaluate(FIBONACCI_ALGORITHMS, fib_n, fib_n)
    return results
def main():
    """CLI entry point: parse options, run the comparison, print reports."""
    args = parse_args()
    rule = "=" * 50

    # ── Banner with the effective configuration ──
    print("\n🌿 AGCM — Adaptive Green Complexity Metric")
    print(rule)
    print(f" Input Size : {args.size}")
    print(f" Region : {args.region.upper()}")
    hardware_label = HARDWARE_PROFILES[args.hardware]['label']
    print(f" Hardware : {hardware_label}")
    print(f" Budget : {args.budget} mg CO₂")
    print(f" Category : {args.category}")
    print(rule)

    print("\nRunning algorithms...")
    results = run_comparison(args.category, args.size, args.region, args.hardware)

    # ── Rank, then show the table and the recommendation ──
    recommender = GreenRecommender()
    ranked = recommender.rank(results)
    print_results_table(ranked)
    print(recommender.generate_report(ranked))

    # ── Carbon Budget Mode ──
    budget_mode = CarbonBudgetMode(budget_mg=args.budget)
    budget_report = budget_mode.analyze(ranked)
    print(budget_mode.format_report(budget_report))

    # ── Optional charts ──
    if args.charts:
        plot_all_charts(ranked)


if __name__ == "__main__":
    main()
# ============================================================
# carbon/regions.py
# AGCM — Regional Carbon Intensity Data
#
# Data sources:
# - IEA (International Energy Agency) 2023
# - Our World in Data — CO₂ and Greenhouse Gas Emissions
# - European Environment Agency (EEA)
# ============================================================
# Regional carbon intensity in grams of CO₂ per kWh
# These are average values; real-time APIs give more precise data
# Each region record carries:
#   label                — display name
#   intensity_g_per_kwh  — grid carbon intensity in gCO₂ per kWh
#   R_c                  — the same intensity divided by 1000
#                          (i.e. kg CO₂ per kWh)
#   description          — short note on the grid mix
# Keep intensity_g_per_kwh and R_c in sync when editing a record.
REGIONS = {
    "india": {
        "label": "India 🇮🇳",
        "intensity_g_per_kwh": 700,
        "R_c": 0.700,
        "description": "Heavily coal-dependent grid",
    },
    "usa": {
        "label": "USA 🇺🇸",
        "intensity_g_per_kwh": 450,
        "R_c": 0.450,
        "description": "Mixed: natural gas + renewables",
    },
    "germany": {
        "label": "Germany 🇩🇪",
        "intensity_g_per_kwh": 350,
        "R_c": 0.350,
        "description": "Transitioning to renewables",
    },
    "eu": {
        "label": "EU Average 🇪🇺",
        "intensity_g_per_kwh": 275,
        "R_c": 0.275,
        "description": "Mixed EU grid (incl. nuclear)",
    },
    "norway": {
        "label": "Norway 🇳🇴",
        "intensity_g_per_kwh": 28,
        "R_c": 0.028,
        "description": "Almost entirely hydropower",
    },
}
def get_region(key: str) -> dict:
    """Look up a region record by key.

    Raises:
        ValueError: if the key is not one of the known regions
    """
    record = REGIONS.get(key)
    if record is None:
        raise ValueError(f"Unknown region '{key}'. Available: {list(REGIONS.keys())}")
    return record
def list_regions() -> list:
    """Return one (key, label, R_c) tuple per region, for display."""
    rows = []
    for region_key, record in REGIONS.items():
        rows.append((region_key, record["label"], record["R_c"]))
    return rows
# ============================================================
# carbon/carbon_calculator.py
# AGCM — Carbon Emission Calculator
#
# Formula: C = E_kWh × R_c
# Where:
# E_kWh = Energy in kilowatt-hours
# R_c = Regional carbon intensity (gCO₂/kWh)
# C = Carbon emission in grams CO₂
#
# We work in milligrams (mg) for readability at small scales.
# ============================================================
from dataclasses import dataclass
from carbon.regions import REGIONS, get_region
from config.settings import MJ_TO_KWH
@dataclass
class CarbonResult:
    """Breakdown of one carbon-emission calculation."""
    # Input energy (mJ)
    energy_mj: float
    # The same energy converted to kWh
    energy_kwh: float
    # Regional factor used
    R_c: float
    # CO₂ in grams
    carbon_g: float
    # CO₂ in milligrams (main output)
    carbon_mg: float
    # Region key
    region: str
class CarbonCalculator:
    """
    Calculates CO₂ emission from energy consumption.

    Uses simplified model: C = E_kWh × R_c
    Result is in milligrams (mg) for educational clarity.

    Args:
        region: Region key from regions.py (e.g. 'india', 'usa');
            an unknown key raises ValueError via get_region
    Usage:
        calc = CarbonCalculator(region='india')
        result = calc.calculate(energy_mj=0.05)
        print(f"CO₂: {result.carbon_mg:.6f} mg")
    """

    def __init__(self, region: str = "india"):
        self.region = region
        self.region_data = get_region(region)
        self.R_c = self.region_data["R_c"]

    def calculate(self, energy_mj: float) -> CarbonResult:
        """
        Calculate carbon emission for given energy.

        Args:
            energy_mj: Energy in millijoules
        Returns:
            CarbonResult with full breakdown
        Raises:
            ValueError: if energy_mj is negative
        """
        if energy_mj < 0:
            raise ValueError("Energy cannot be negative.")
        kwh = energy_mj * MJ_TO_KWH        # mJ → kWh
        # R_c × 1000 restores the intensity in gCO₂/kWh
        grams = kwh * self.R_c * 1000
        milligrams = grams * 1000          # g → mg
        return CarbonResult(
            energy_mj=energy_mj,
            energy_kwh=kwh,
            R_c=self.R_c,
            carbon_g=grams,
            carbon_mg=milligrams,
            region=self.region,
        )

    def compare_regions(self, energy_mj: float) -> dict:
        """
        Calculate carbon across all regions for the same energy.
        Useful for region comparison charts.

        Returns:
            Dict mapping region key → carbon_mg
        """
        kwh = energy_mj * MJ_TO_KWH  # Same for every region
        return {
            region_key: kwh * record["R_c"] * 1_000_000  # to mg
            for region_key, record in REGIONS.items()
        }
# ============================================================
# requirements.txt
# AGCM — Python Dependencies
# Install with: pip install -r requirements.txt
# ============================================================
# Core runtime measurements
psutil>=5.9.0 # CPU and memory metrics
memory-profiler>=0.61.0 # Memory profiling (optional detailed mode)
# Data & visualization
matplotlib>=3.7.0 # Chart generation
pandas>=2.0.0 # DataFrames and table output
numpy>=1.24.0 # Numerical operations
tabulate>=0.9.0 # CLI table formatting
# Web UI (optional)
streamlit>=1.28.0 # Streamlit dashboard
plotly>=5.18.0 # Interactive charts for Streamlit
# Testing
pytest>=7.4.0 # Unit testing
# ============================================================
# app.py
# AGCM — Streamlit Interactive Dashboard
# Run with: streamlit run app.py
# ============================================================
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from main import run_comparison
from agcm.recommender import GreenRecommender
from agcm.budget_mode import CarbonBudgetMode
from config.settings import HARDWARE_PROFILES
from carbon.regions import REGIONS
# ── Page config ──
# Streamlit dashboard script: configure the page, collect settings in
# the sidebar, and — when the button is pressed — run the comparison
# and render metrics, a results table and two charts.
st.set_page_config(
    page_title="AGCM Dashboard",
    page_icon="🌿",
    layout="wide"
)
st.title("🌿 AGCM — Adaptive Green Complexity Metric")
st.markdown("*Carbon-aware algorithm selection and sustainability analysis*")
# ── Sidebar controls ──
with st.sidebar:
    st.header("⚙️ Configuration")
    category = st.selectbox("Algorithm Category", ["sorting", "fibonacci", "all"])
    # Slider arguments: min 100, max 5000, default 1000, step 100
    n = st.slider("Input Size (n)", 100, 5000, 1000, 100)
    # Region and hardware options come straight from the config dicts
    region = st.selectbox("Region", list(REGIONS.keys()))
    hardware = st.selectbox("Hardware", list(HARDWARE_PROFILES.keys()))
    budget = st.number_input("Carbon Budget (mg CO₂)", min_value=0.001, value=5.0)
    run_btn = st.button("🚀 Run Analysis", use_container_width=True)
if run_btn:
    with st.spinner("Running algorithms and computing AGCM..."):
        results = run_comparison(category, n, region, hardware)
        rec = GreenRecommender()
        ranked = rec.rank(results)
        budget_mode = CarbonBudgetMode(budget_mg=budget)
        # analyze() sets within_budget on each result, which the
        # "Budget ✅" table column below reads.
        b_report = budget_mode.analyze(ranked)
    # ── Summary metrics ──
    col1, col2, col3, col4 = st.columns(4)
    # ranked is sorted by AGCM score, so index 0 is the top-ranked run
    greenest = ranked[0]
    col1.metric("🥇 Greenest", greenest.algorithm_name)
    col2.metric("🌿 Best AGCM", f"{greenest.agcm_score:.5f}")
    col3.metric("☁️ Min CO₂", f"{greenest.carbon_mg:.4f} mg")
    col4.metric("⚡ Min Energy", f"{greenest.energy_mj:.4f} mJ")
    # ── Results DataFrame ──
    # One row per algorithm, in rank order; values rounded for display
    df = pd.DataFrame([{
        "Algorithm": r.algorithm_name,
        "Complexity": r.complexity,
        "Time (ms)": round(r.time_s * 1000, 4),
        "Energy (mJ)":round(r.energy_mj, 4),
        "CO₂ (mg)": round(r.carbon_mg, 6),
        "AGCM Score": round(r.agcm_score, 6),
        "AGCM Rank": r.agcm_rank,
        "Budget ✅": "✅" if r.within_budget else "❌",
    } for r in ranked])
    st.subheader("📊 Results Table")
    st.dataframe(df, use_container_width=True)
    # ── Charts ──
    # Left: AGCM score per algorithm. Right: time vs. carbon, with
    # marker size driven by energy.
    c1, c2 = st.columns(2)
    with c1:
        fig = px.bar(df, x="Algorithm", y="AGCM Score",
                     title="AGCM Score (lower = greener)",
                     color="AGCM Score", color_continuous_scale="RdYlGn_r")
        st.plotly_chart(fig, use_container_width=True)
    with c2:
        fig2 = px.scatter(df, x="Time (ms)", y="CO₂ (mg)",
                          text="Algorithm", size="Energy (mJ)",
                          title="Time vs Carbon Emission")
        st.plotly_chart(fig2, use_container_width=True)