"""
autograder.py
Given a multi-page PDF (one answer sheet per page):
- Convert each page to an image
- Detect fiducials, warp, read bubbles + QR version label
- Look up the correct answers for that version from a CSV
- Grade each page
The answer-key CSV should be generated by make_exams.py and have columns:
version_label, Q1, Q2, Q3, ...
Example row:
01234567,a,b,c,d,a,...
Dependencies:
- pdf2image
- answer_sheet_reader.LayoutConfig
- answer_sheet_reader.read_answer_sheet_with_version
"""
from __future__ import annotations
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
import argparse
import os
import csv
import sys
import tempfile
import logging
import numpy as np
import cv2
import pandas as pd
from pdf2image import convert_from_path, pdfinfo_from_path
from .answersheetreader import AnswerSheetReader, LayoutConfig
logger = logging.getLogger(__name__)
[docs]
class Autograder:
"""
Autograder for multi-page PDF answer sheets.
"""
def __init__(self, layout_config: LayoutConfig):
self.layout = layout_config
self.version_keys: Dict[str, List[str]] = {}
[docs]
def load_version_keys_csv(self, keys_csv_path: Path | str):
if not os.path.exists(keys_csv_path):
raise FileNotFoundError(f"Answer-key CSV not found: {keys_csv_path}")
current_number_of_keys = len(self.version_keys)
with open(keys_csv_path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
fieldnames = reader.fieldnames or []
# Identify question columns and sort them by number
qcols = [col for col in fieldnames if col.startswith("Q")]
qcols.sort(key=lambda name: int(name[1:])) # "Q10" -> 10
for row in reader:
version_label = (row.get("version_label") or "").strip()
if not version_label:
continue
answers: List[str] = []
for col in qcols:
val = row.get(col)
if val == 'True':
answers.append('a')
elif val == 'False':
answers.append('b')
else:
answers.append(val.lower() if val else "")
self.version_keys[version_label] = answers
new_number_of_keys = len(self.version_keys)
logger.info(f"Loaded {new_number_of_keys - current_number_of_keys} new version keys from {keys_csv_path}")
logger.info(f"Total version keys loaded: {new_number_of_keys}")
[docs]
def grade_pdf(self,
pdf_file_path: Path | str,
output_dir_path: [Path | str] = Path.cwd(),
failed_pages_dir_path: Optional[Path | str] = None,
gradebook_paths: Optional[List[Path | str]] = None,
score_column: str = "score",
num_counted: Optional[int] = None,
question_tally_path: Optional[Path | str] = None,
debug_output_dir_path: Optional[Path | str] = None,
interactive: bool = False,
column_id: Optional[str] = None):
"""
Grade the given PDF of answer sheets.
Parameters
----------
pdf_file_path : Path | str
Path to the input PDF file.
output_dir_path : [Path | str]
Path of directory in which to save graded overlay PDFs.
If None, no graded overlays are written.
failed_pages_dir_path : Optional[Path | str]
Directory to save single-page PDFs and the report for pages that
could not be fully read/graded. If None, defaults to
``output_dir_path``.
gradebook_paths : Optional[List[Path | str]]
Paths to one or more non-overlapping gradebook CSV files. Each must
contain a ``Student ID`` column. Grading results (``version_label``,
``num_questions``, ``num_correct``, ``score``, ``status``) are written
into the row whose ``Student ID`` matches the detected student ID.
If None, no gradebooks are updated.
score_column : str
Column name in the gradebook(s) where the score should be written.
Defaults to ``"score"``.
num_counted : Optional[int]
Number of questions that count toward the score. If None, defaults
to the total number of questions. Must be <= total questions.
question_tally_path : Optional[Path | str]
Path to output CSV file summarizing question tallies. If None, no CSV is written
debug_output_dir_path : Optional[Path | str]
Directory to save debug output images. If None, no debug images are saved.
"""
if isinstance(pdf_file_path, str):
pdf_file_path = Path(pdf_file_path)
if not pdf_file_path.exists():
raise FileNotFoundError(f"PDF not found: {pdf_file_path}")
if failed_pages_dir_path is not None:
failed_pages_dir_path = Path(failed_pages_dir_path)
elif output_dir_path is not None:
failed_pages_dir_path = Path(output_dir_path)
if failed_pages_dir_path is not None:
failed_pages_dir_path.mkdir(parents=True, exist_ok=True)
if gradebook_paths:
for gb_path in gradebook_paths:
gb_path = Path(gb_path)
if not gb_path.exists():
raise FileNotFoundError(f"Gradebook not found: {gb_path}")
header = pd.read_csv(gb_path, nrows=0, dtype=str, index_col=False).columns.tolist()
if "Student ID" not in header:
raise ValueError(f"Gradebook {gb_path} is missing required 'Student ID' column. "
f"Columns found: {header}")
if num_counted is not None:
if num_counted < 1:
raise ValueError(f"num_counted must be >= 1, got {num_counted}")
if self.version_keys:
max_q = max(len(v) for v in self.version_keys.values())
if num_counted > max_q:
raise ValueError(
f"num_counted ({num_counted}) exceeds the number of "
f"questions in the loaded answer keys ({max_q})")
if debug_output_dir_path is not None:
debug_output_dir_path = Path(debug_output_dir_path)
if not debug_output_dir_path.exists():
debug_output_dir_path.mkdir(parents=True, exist_ok=True)
total_pages = pdfinfo_from_path(str(pdf_file_path))["Pages"]
results: List[Dict[str, Any]] = []
failed_pages: List[Dict[str, Any]] = []
pages: List = []
for page_index in range(1, total_pages + 1):
sys.stderr.write(f"\rProcessing page {page_index}/{total_pages}...")
sys.stderr.flush()
page = convert_from_path(
str(pdf_file_path), dpi=300, fmt='png',
first_page=page_index, last_page=page_index,
)[0]
pages.append(page)
img = np.array(page)
reader = AnswerSheetReader(img, layout_config=self.layout, debug_output_dir=debug_output_dir_path, interactive=interactive)
read_results = reader.read()
read_status = read_results.get("read_status", "success")
# Interactive recovery: prompt the user to supply missing data
# before treating the page as failed.
if interactive and read_status == "failed_read_qr":
sys.stderr.write(
f"\n[Page {page_index}] QR code could not be read "
f"({read_results.get('read_error', '')})\n"
)
sys.stderr.flush()
try:
version_input = input(
f" Enter exam version label for page {page_index} (or blank to skip): "
).strip()
except EOFError:
version_input = ""
if version_input:
read_results["version"] = version_input
# Run the steps that were skipped after the QR failure.
for _step_name, _action in [
("read_student_id", reader._read_student_id),
("read_bubblefield", reader._read_bubblefield),
]:
try:
_action()
except Exception as _exc:
read_results["read_status"] = f"failed_{_step_name}"
read_results["read_error"] = str(_exc)
break
else:
read_results["read_status"] = "success"
read_status = read_results["read_status"]
if interactive and read_status == "failed_read_student_id":
sys.stderr.write(
f"\n[Page {page_index}] Student ID could not be read "
f"({read_results.get('read_error', '')})\n"
)
sys.stderr.flush()
try:
sid_input = input(
f" Enter student ID for page {page_index} (or blank to skip): "
).strip()
except EOFError:
sid_input = ""
if sid_input:
read_results["student_id_bubbles"] = sid_input
# Run the bubble-field step that was skipped.
try:
reader._read_bubblefield()
read_results["read_status"] = "success"
except Exception as _exc:
read_results["read_status"] = "failed_read_bubblefield"
read_results["read_error"] = str(_exc)
read_status = read_results["read_status"]
if read_status != "success":
logger.warning(f"Page {page_index}: {read_status} — {read_results.get('read_error', '')}")
failed_pdf_name = f"failed_page_{page_index:03d}_{read_status}.pdf"
if failed_pages_dir_path is not None:
page.save(str(failed_pages_dir_path / failed_pdf_name), "PDF", resolution=300.0)
failed_pages.append({
"page_index": page_index,
"reason": f"{read_status}: {read_results.get('read_error', '')}",
"saved_file": failed_pdf_name,
})
results.append({
"page_index": page_index,
"version_label": read_results.get('version'),
"student_id": read_results.get('student_id_bubbles'),
"answers_detected": read_results.get('answers', {}),
"answer_key": None,
"per_question": {},
"num_questions": None,
"num_correct": None,
"score_fraction": None,
"status": read_status,
"overlay_path": None,
})
continue
page_result: Dict[str, Any] = {
"page_index": page_index,
"version_label": read_results['version'],
"student_id": read_results['student_id_bubbles'],
"answers_detected": read_results['answers'],
"answer_key": None,
"per_question": {},
"num_questions": None,
"num_correct": None,
"score_fraction": None,
"status": "ok",
"overlay_path": output_dir_path / f"graded_page_{page_index:03d}.pdf" if output_dir_path is not None else None,
}
key = self.version_keys.get(read_results['version']) if read_results['version'] else None
logger.debug(f"Grading page {page_index}: version={read_results['version']}, student_id={read_results['student_id_bubbles']}, key_found={key is not None}")
logger.debug(f'key={key}, detected_answers={read_results["answers"]}')
if not read_results['version']:
page_result["status"] = "no_qr"
failed_pdf_name = f"failed_page_{page_index:03d}_no_qr.pdf"
if failed_pages_dir_path is not None:
page.save(str(failed_pages_dir_path / failed_pdf_name), "PDF", resolution=300.0)
failed_pages.append({
"page_index": page_index,
"reason": "QR code could not be read",
"saved_file": failed_pdf_name,
})
results.append(page_result)
continue
if key is None:
page_result["status"] = "unknown_version"
failed_pdf_name = f"failed_page_{page_index:03d}_unknown_version_{read_results['version']}.pdf"
if failed_pages_dir_path is not None:
page.save(str(failed_pages_dir_path / failed_pdf_name), "PDF", resolution=300.0)
failed_pages.append({
"page_index": page_index,
"reason": f"Unknown exam version: {read_results['version']}",
"saved_file": failed_pdf_name,
})
results.append(page_result)
continue
page_result["answer_key"] = key
num_q = len(key)
page_result["num_questions"] = num_q
num_correct = 0
per_q: Dict[int, Dict[str, Any]] = {}
for qnum in range(1, num_q + 1):
correct = (key[qnum - 1] or "").lower()
detected = read_results['answers'].get(qnum)
logger.debug(f' Grading Q{qnum}: correct="{correct}", detected="{detected}"')
detected_norm = (detected or "").lower() if detected else ""
is_correct = bool(correct) and (detected_norm == correct)
if is_correct:
num_correct += 1
per_q[qnum] = {
"correct": correct or None,
"detected": detected_norm or None,
"is_correct": is_correct,
}
logger.debug(f' Q{qnum}: correct="{correct}", detected="{detected_norm}", is_correct={is_correct}')
page_result["per_question"] = per_q
page_result["num_correct"] = num_correct
denom = num_counted if num_counted is not None else num_q
page_result["score_fraction"] = (
min(float(num_correct) / denom, 1.0) if denom > 0 else None
)
logger.debug(f' Graded page {page_index}: num_correct={num_correct}/{num_q}, score_fraction={page_result["score_fraction"]}')
logger.debug(f' Writing graded overlay PDF for page {page_index}...')
if output_dir_path is not None:
reader.write_graded_annotations(
per_question_results=per_q,
score_fraction=page_result["score_fraction"],
overlay_path=output_dir_path / f"graded_{read_results['student_id_bubbles'].replace('?', 'x')}_{read_results['version']}.pdf",
)
results.append(page_result)
sys.stderr.write("\r" + " " * 40 + "\r")
sys.stderr.flush()
if gradebook_paths:
# Build a grades DataFrame keyed by student ID
grade_records = []
for res in results:
sid = res.get("student_id")
if sid is not None:
score_frac = res.get("score_fraction")
grade_records.append({
"Student ID": str(sid).strip(),
"version_label": res.get("version_label", ""),
"num_questions": res.get("num_questions", ""),
"num_correct": res.get("num_correct", ""),
score_column: f'{score_frac*100:.1f}' if score_frac is not None else "",
"status": res.get("status", ""),
})
grade_fields = ["version_label", "num_questions", "num_correct", score_column, "status"]
grades_df = pd.DataFrame(grade_records)
logger.debug(f"grade_records built from results: {grade_records}")
logger.debug(f"grades_df columns: {grades_df.columns.tolist()}")
if "Student ID" not in grades_df.columns:
raise ValueError("Internal error: 'Student ID' column missing from grades DataFrame")
logger.debug(f"grades_df 'Student ID' values: {grades_df['Student ID'].tolist()}")
logger.debug(f"grades_df 'Student ID' dtypes: {grades_df['Student ID'].dtype}")
# Normalize IDs for matching: strip leading zeros so that
# "01234567" (from the bubble sheet) matches "1234567" (from
# the gradebook) regardless of zero-padding differences.
def _normalize_id(s: str) -> str:
stripped = s.lstrip("0")
return stripped if stripped else "0"
grades_df["_match_key"] = grades_df["Student ID"].apply(_normalize_id)
logger.debug(f"grades_df _match_key values: {grades_df['_match_key'].tolist()}")
detected_ids = set(grades_df["Student ID"])
logger.info(f"Detected {len(detected_ids)} student ID(s) from graded pages: {sorted(detected_ids)}")
matched_ids: set[str] = set()
for gb_path in gradebook_paths:
gb_path = Path(gb_path)
# Peek at raw file content to diagnose CSV structure issues
with open(gb_path, "r", encoding="utf-8") as _f:
_raw_lines = [_f.readline() for _ in range(3)]
logger.debug(f"Gradebook {gb_path.name} raw lines (first 3): {_raw_lines!r}")
gb_df = pd.read_csv(gb_path, dtype=str, index_col=False).fillna("")
# Drop unnamed columns (typically caused by trailing commas)
unnamed_cols = [c for c in gb_df.columns if c.startswith("Unnamed")]
if unnamed_cols:
logger.debug(f"Dropping unnamed columns from {gb_path.name}: {unnamed_cols}")
gb_df = gb_df.drop(columns=unnamed_cols)
logger.debug(f"Gradebook {gb_path.name} raw columns: {gb_df.columns.tolist()}")
logger.debug(f"Gradebook {gb_path.name} raw 'Student ID' values (first 10): {gb_df['Student ID'].head(10).tolist()}")
logger.debug(f"Gradebook {gb_path.name} 'Student ID' dtype: {gb_df['Student ID'].dtype}")
gb_df["Student ID"] = gb_df["Student ID"].str.strip()
gb_df["_match_key"] = gb_df["Student ID"].apply(_normalize_id)
logger.debug(f"Gradebook {gb_path.name} _match_key values (first 10): {gb_df['_match_key'].head(10).tolist()}")
gb_ids = set(gb_df["Student ID"])
logger.info(f"Gradebook {gb_path.name}: {len(gb_ids)} student ID(s)")
logger.debug(f"Gradebook IDs: {sorted(gb_ids)}")
# Show the intersection and difference of match keys
grades_keys = set(grades_df["_match_key"])
gb_keys = set(gb_df["_match_key"])
logger.debug(f"grades_df match keys: {sorted(grades_keys)}")
logger.debug(f"gradebook match keys (first 20): {sorted(gb_keys)[:20]}")
logger.debug(f"Intersection of match keys: {sorted(grades_keys & gb_keys)}")
logger.debug(f"grades keys NOT in gradebook: {sorted(grades_keys - gb_keys)}")
logger.debug(f"gradebook keys NOT in grades (first 20): {sorted(gb_keys - grades_keys)[:20]}")
# Resolve the target score column for this gradebook.
# If --column-id was given, find the unique column whose name
# contains that substring; fall back to score_column on ambiguity.
target_score_col = score_column
if column_id is not None:
matches = [c for c in gb_df.columns if column_id in c]
if len(matches) == 1:
target_score_col = matches[0]
logger.info(f"column-id '{column_id}' matched '{target_score_col}' in {gb_path.name}")
elif len(matches) == 0:
logger.warning(
f"No column in {gb_path.name} contains '{column_id}' "
f"— falling back to '{score_column}'"
)
else:
logger.warning(
f"column-id '{column_id}' matched multiple columns in {gb_path.name}: "
f"{matches} — falling back to '{score_column}'"
)
# Build effective field list, substituting the resolved score column.
effective_grade_fields = [
target_score_col if gf == score_column else gf
for gf in grade_fields
]
# Update only rows that have new grade data, preserving
# grades written by earlier runs.
for gf in effective_grade_fields:
if gf not in gb_df.columns:
gb_df[gf] = ""
new_grades = grades_df.drop_duplicates(subset="_match_key", keep="last") \
.set_index("_match_key")[grade_fields]
if target_score_col != score_column:
new_grades = new_grades.rename(columns={score_column: target_score_col})
update_mask = gb_df["_match_key"].isin(new_grades.index)
for gf in effective_grade_fields:
mapped = gb_df.loc[update_mask, "_match_key"].map(new_grades[gf])
gb_df.loc[update_mask, gf] = [
str(v) if v not in ("", None) and not (isinstance(v, float) and pd.isna(v)) else ""
for v in mapped
]
logger.debug(f"After update, 'status' values: {gb_df['status'].tolist()}")
updated_count = int(update_mask.sum())
matched_ids.update(gb_df.loc[update_mask, "Student ID"].tolist())
gb_df.drop(columns=["_match_key"]).to_csv(gb_path, index=False)
logger.info(f"Updated {updated_count} of {len(gb_ids)} rows in gradebook {gb_path}")
unmatched = detected_ids - matched_ids
if unmatched:
logger.warning(
f"{len(unmatched)} graded student ID(s) not found in any gradebook: "
f"{', '.join(sorted(unmatched))}"
)
# Save unmatched pages as failed PDFs
for res in results:
sid = res.get("student_id")
if sid is not None and sid in unmatched:
pidx = res["page_index"]
res["status"] = "unmatched_id"
failed_pdf_name = f"failed_page_{pidx:03d}_unmatched_id_{sid}.pdf"
if failed_pages_dir_path is not None:
pages[pidx - 1].save(str(failed_pages_dir_path / failed_pdf_name), "PDF", resolution=300.0)
failed_pages.append({
"page_index": pidx,
"reason": f"Student ID {sid} not found in any gradebook",
"saved_file": failed_pdf_name,
})
logger.info(f"Page {pidx}: student ID {sid} not in any gradebook — saved to {failed_pdf_name}")
if question_tally_path is not None:
# for each student ID, write a row that contains the exam version and their answers
question_tally_path = Path(question_tally_path)
if question_tally_path.exists():
# read existing rows and append only new ones
existing_rows: List[Dict[str, Any]] = []
with open(question_tally_path, "r", newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
existing_rows.append(row)
existing_combos = set((row["student_id"], row["version_label"]) for row in existing_rows)
new_results = []
for res in results:
combo = (res["student_id"], res["version_label"])
if combo not in existing_combos:
new_results.append(res)
results = existing_rows + new_results
if len(new_results) > 0:
with open(question_tally_path, "w", newline="", encoding="utf-8") as f:
fieldnames = [
"student_id",
"version_label",
] + [f"Q{qnum}" for qnum in range(1, max(len(res["answer_key"] or []) for res in results) + 1)]
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for res in results:
row = {
"student_id": res["student_id"],
"version_label": res["version_label"],
}
answers = res["answers_detected"]
for qnum in range(1, len(res["answer_key"] or []) + 1):
row[f"Q{qnum}"] = answers.get(qnum)
writer.writerow(row)
logger.info(f"Wrote question tally CSV to {question_tally_path}")
# Generate report for failed/unparseable pages (after gradebook merge
# so that unmatched-ID pages are included).
if failed_pages and failed_pages_dir_path is not None:
report_path = failed_pages_dir_path / "failed_pages_report.txt"
with open(report_path, "w", encoding="utf-8") as f:
f.write(f"Failed Pages Report\n")
f.write(f"{'=' * 60}\n")
f.write(f"Source PDF: {pdf_file_path}\n")
f.write(f"Total pages processed: {len(pages)}\n")
f.write(f"Pages failed: {len(failed_pages)}\n")
f.write(f"Pages graded successfully: {len(pages) - len(failed_pages)}\n")
f.write(f"{'=' * 60}\n\n")
for entry in failed_pages:
f.write(f"Page {entry['page_index']}:\n")
f.write(f" Reason: {entry['reason']}\n")
f.write(f" Saved as: {entry['saved_file']}\n\n")
logger.info(f"Wrote failed-pages report ({len(failed_pages)} pages) to {report_path}")