Historical Data Manager for Construction
Overview
Manage legacy construction data from archives, old systems, and historical records. Extract, clean, normalize, and migrate data into modern formats for analysis and benchmarking.
Business Case
Construction companies accumulate decades of project data in various formats:
- Paper records scanned to PDF
- Legacy database exports (Access, dBase, FoxPro)
- Old spreadsheet formats (Lotus 1-2-3, early Excel)
- Proprietary software exports
- Project closeout documentation
This skill helps extract value from historical data for:
- Cost benchmarking and trending
- Productivity analysis over time
- Risk pattern identification
- Estimating improvement
Technical Implementation
Historical Data Extractor
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from datetime import datetime
from pathlib import Path
import pandas as pd
import re
import json
@dataclass
class HistoricalRecord:
    """One dataset extracted from a single legacy source file.

    Instances are produced by the ``extract_from_*`` methods of
    ``HistoricalDataManager`` and consumed by ``normalize_cost_data``.
    """

    project_id: str
    project_name: str
    year: int
    # One of: cost, schedule, labor, material, unknown
    data_type: str
    # Source format tag set by the extractor, e.g. 'excel' or 'csv'
    original_format: str
    # Row-oriented payload: one dict per source row, as produced by
    # DataFrame.to_dict('records') in the extractors.
    extracted_data: List[Dict[str, Any]]
    # Data quality score in the range 0-1
    quality_score: float
    # Free-form remarks; default_factory gives every record its own list
    notes: List[str] = field(default_factory=list)
class HistoricalDataManager:
"""Manage extraction and normalization of historical construction data."""
def __init__(self, archive_path: str):
    """Bind the manager to an archive location.

    Args:
        archive_path: Location of the archive; stored as a ``Path``.
    """
    archive_root = Path(archive_path)
    self.archive_path = archive_root
    # Starts empty; construction does not read any archive files itself.
    self.records: List[HistoricalRecord] = []
    # Loaded last, after the attributes above are in place.
    self.normalization_rules = self._load_normalization_rules()
def scan_archive(self) -> Dict[str, int]:
    """Count the files under the archive root, grouped by extension.

    The archive is walked recursively. Directories are ignored.

    Returns:
        Mapping of lower-cased file suffix (``''`` for files that have
        none) to the number of files carrying that suffix.
    """
    counts_by_suffix = {}
    regular_files = (
        entry for entry in self.archive_path.rglob('*') if entry.is_file()
    )
    for entry in regular_files:
        suffix = entry.suffix.lower()
        if suffix in counts_by_suffix:
            counts_by_suffix[suffix] += 1
        else:
            counts_by_suffix[suffix] = 1
    return counts_by_suffix
def extract_from_legacy_excel(self, file_path: str, year: int) -> List["HistoricalRecord"]:
    """Extract one record from a legacy Excel workbook.

    The workbook is read with ``pd.read_excel`` (openpyxl first, xlrd as a
    fallback), its data type is detected from the raw column names, the
    columns are normalized, and the rows are stored on a single
    ``HistoricalRecord``.

    Args:
        file_path: Path of the workbook to read.
        year: Year to stamp on the record; also part of the fallback id.

    Returns:
        A list holding the extracted record, or an empty list when the
        workbook could not be processed (the error is printed, not raised).
    """
    # Local import keeps this method self-contained.
    import zlib

    records = []
    try:
        # Try different engines for old formats. Catch Exception rather
        # than using a bare except so Ctrl-C / SystemExit are not swallowed.
        try:
            df = pd.read_excel(file_path, engine='openpyxl')
        except Exception:
            df = pd.read_excel(file_path, engine='xlrd')

        # Detect data type from the raw headers, then normalize them.
        data_type = self._detect_data_type(df)
        df = self._normalize_columns(df)

        project_info = self._extract_project_info(df, file_path)

        # The built-in hash() of a str is salted per process, so it would
        # hand the same file a different id on every run. CRC32 is stable.
        path_digest = zlib.crc32(str(file_path).encode('utf-8')) % 10000

        record = HistoricalRecord(
            project_id=project_info.get('id', f'LEGACY-{year}-{path_digest}'),
            project_name=project_info.get('name', Path(file_path).stem),
            year=year,
            data_type=data_type,
            original_format='excel',
            extracted_data=df.to_dict('records'),
            quality_score=self._assess_quality(df)
        )
        records.append(record)
    except Exception as e:
        # Deliberate best-effort: one unreadable file must not abort a batch.
        print(f"Error extracting {file_path}: {e}")
    return records
def extract_from_csv(self, file_path: str, year: int) -> "HistoricalRecord":
    """Extract one record from a CSV file, trying several text encodings.

    Args:
        file_path: Path of the CSV file to read.
        year: Year to stamp on the record; also part of the generated id.

    Returns:
        A ``HistoricalRecord`` holding the normalized rows of the file.

    Raises:
        ValueError: If the file cannot be decoded with any candidate
            encoding.
        FileNotFoundError: If ``file_path`` does not exist. Other read or
            parse errors from pandas propagate unchanged as well.
    """
    # Local import keeps this method self-contained.
    import zlib

    # Order matters:
    #   * utf-8-sig decodes plain UTF-8 and also drops a leading BOM, which
    #     would otherwise end up inside the first column name.
    #   * cp1252 must precede latin-1, because latin-1 accepts every byte
    #     sequence and would make any later candidate unreachable.
    #   * iso-8859-1 is an alias of latin-1, so one entry covers both.
    encodings = ['utf-8-sig', 'cp1252', 'latin-1']
    for encoding in encodings:
        try:
            df = pd.read_csv(file_path, encoding=encoding)
        except UnicodeDecodeError:
            # Only a decoding failure justifies trying the next encoding.
            continue
        break
    else:
        raise ValueError(
            f"Could not decode {file_path} with any of: {', '.join(encodings)}"
        )

    df = self._normalize_columns(df)
    data_type = self._detect_data_type(df)

    # The built-in hash() of a str is salted per process; CRC32 gives the
    # same file the same id on every run.
    path_digest = zlib.crc32(str(file_path).encode('utf-8')) % 10000

    return HistoricalRecord(
        project_id=f'CSV-{year}-{path_digest}',
        project_name=Path(file_path).stem,
        year=year,
        data_type=data_type,
        original_format='csv',
        extracted_data=df.to_dict('records'),
        quality_score=self._assess_quality(df)
    )
def extract_from_database_export(self, file_path: str, db_type: str) -> List["HistoricalRecord"]:
    """Extract data from a legacy database export.

    Only ``db_type == 'access'`` is handled; any other value returns an
    empty list without touching the file.

    NOTE: per-table processing is still a placeholder. Every table is read
    into a DataFrame, but no ``HistoricalRecord`` is built from it, so the
    returned list is currently always empty.

    Args:
        file_path: Path of the database file (MDB/ACCDB for Access).
        db_type: Kind of database export; only ``'access'`` is recognised.

    Returns:
        List of extracted records (see the note above).
    """
    records = []
    if db_type == 'access':
        # Imported lazily so pyodbc is only required for Access exports.
        import pyodbc
        conn_str = f'DRIVER={{Microsoft Access Driver (*.mdb, *.accdb)}};DBQ={file_path}'
        conn = pyodbc.connect(conn_str)
        try:
            # Get all user tables.
            cursor = conn.cursor()
            tables = [row.table_name for row in cursor.tables(tableType='TABLE')]
            for table in tables:
                df = pd.read_sql(f'SELECT * FROM [{table}]', conn)
                # Process each table...
        finally:
            # Release the connection even when listing or reading fails.
            conn.close()
    return records
def normalize_cost_data(
    self,
    records: List["HistoricalRecord"],
    base_year: int = 2026,
    cost_indices: Optional[Dict[int, float]] = None,
) -> pd.DataFrame:
    """Escalate historical cost line items to ``base_year`` dollars.

    Only records whose ``data_type`` is ``'cost'`` are considered, and
    within them only rows that carry an ``'amount'`` or ``'cost'`` key.
    Each kept row is copied and extended with ``original_cost``,
    ``normalized_cost``, ``escalation_factor``, ``original_year`` and
    ``project_id``.

    Args:
        records: Records to normalize.
        base_year: Year whose dollars the costs are expressed in.
        cost_indices: Optional ``{year: index}`` table. When omitted (or
            empty) the built-in example table for 2015-2026 is used.

    Returns:
        DataFrame with one row per normalized line item; empty when no
        row qualifies.
    """
    # RSMeans historical cost indices (example values)
    default_indices = {
        2015: 0.82, 2016: 0.84, 2017: 0.87, 2018: 0.90,
        2019: 0.93, 2020: 0.95, 2021: 0.98, 2022: 1.02,
        2023: 1.06, 2024: 1.10, 2025: 1.14, 2026: 1.18
    }
    indices = cost_indices if cost_indices else default_indices

    # An unknown base year falls back to the most recent index in the
    # table, which is 1.18 for the built-in table.
    base_index = indices.get(base_year, indices[max(indices)])

    normalized_data = []
    for record in records:
        if record.data_type != 'cost':
            continue

        # Years missing from the table are treated as index 1.0.
        year_index = indices.get(record.year, 1.0)
        escalation_factor = base_index / year_index

        for item in record.extracted_data:
            # Skip anything that is not a row dict. Iterating a dict
            # payload yields its string keys, on which ``in`` would do a
            # substring test and ``.get`` would fail.
            if not isinstance(item, dict):
                continue
            if 'amount' not in item and 'cost' not in item:
                continue

            original_cost = item.get('amount') or item.get('cost', 0)
            if original_cost is None:
                # A present-but-empty cost counts as 0 instead of crashing
                # on None * float.
                original_cost = 0

            normalized_item = item.copy()
            normalized_item['original_cost'] = original_cost
            normalized_item['normalized_cost'] = original_cost * escalation_factor
            normalized_item['escalation_factor'] = escalation_factor
            normalized_item['original_year'] = record.year
            normalized_item['project_id'] = record.project_id
            normalized_data.append(normalized_item)

    return pd.DataFrame(normalized_data)
def _detect_data_type(self, df: pd.DataFrame) -> str:
    """Classify a table from its column names.

    A column counts as a hit only when its lower-cased, stripped name is
    exactly one of the keywords. Cell contents are not inspected. Groups
    are checked in priority order: cost, schedule, labor, material.

    Args:
        df: Table whose column labels are examined.

    Returns:
        ``'cost'``, ``'schedule'``, ``'labor'``, ``'material'``, or
        ``'unknown'`` when no keyword matches.
    """
    # str() because spreadsheet headers are not always strings (a numeric
    # header yields a non-str label). strip() matches _normalize_columns.
    labels = {str(c).lower().strip() for c in df.columns}

    keyword_groups = (
        ('cost', ('cost', 'amount', 'price', 'total', 'budget')),
        ('schedule', ('start', 'finish', 'duration', 'task', 'activity')),
        ('labor', ('hours', 'labor', 'worker', 'crew')),
        ('material', ('material', 'quantity', 'unit', 'supplier')),
    )
    for data_type, keywords in keyword_groups:
        if not labels.isdisjoint(keywords):
            return data_type
    return 'unknown'
def _normalize_columns(self, df: pd.DataFrame) -> pd.DataFrame:
    """Rename recognised columns to canonical names.

    Each column label is lower-cased and stripped, then tested against the
    patterns below in order (``re.match``, i.e. anchored at the start).
    The first matching pattern decides the canonical name. Columns that
    match no pattern keep their original label.

    A rename is skipped when it would duplicate a label, either because
    another column already carries the canonical name or because an
    earlier column was already mapped to it. Duplicate labels make
    ``DataFrame.to_dict('records')`` silently drop data.

    Args:
        df: Table whose columns should be normalized.

    Returns:
        A renamed copy of ``df`` (the input is not modified).
    """
    column_mapping = {
        r'proj.*id': 'project_id',
        r'proj.*name': 'project_name',
        r'desc.*': 'description',
        r'qty|quantity': 'quantity',
        r'unit.*cost|unit.*price': 'unit_cost',
        r'total|amount': 'amount',
        r'start.*date': 'start_date',
        r'end.*date|finish.*date': 'end_date',
        r'dur.*': 'duration',
    }

    existing = set(df.columns)
    claimed = set()
    new_columns = {}
    for col in df.columns:
        # str() because headers read from spreadsheets may be non-strings.
        col_lower = str(col).lower().strip()
        for pattern, new_name in column_mapping.items():
            if not re.match(pattern, col_lower):
                continue
            collides = new_name in claimed or (
                new_name in existing and new_name != col
            )
            if not collides:
                new_columns[col] = new_name
                claimed.add(new_name)
            # First matching pattern wins, whether or not it was applied.
            break

    return df.rename(columns=new_columns)
def _assess_quality(self, df: pd.DataFrame) -> float:
"""Assess data quality score (0-1)."""
if df.empt