Data Anomaly Detector for Construction
Overview
Detect unusual patterns, outliers, and anomalies in construction data. Identify cost overruns, schedule delays, productivity issues, and data quality problems before they impact projects.
Business Case
Construction data often contains anomalies that indicate:
- Cost estimate errors or fraud
- Schedule logic issues
- Productivity problems
- Data entry mistakes
- Equipment or material issues
Early detection prevents costly corrections and project delays.
Technical Implementation
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Tuple
from enum import Enum
import pandas as pd
import numpy as np
from datetime import datetime
from scipy import stats
class AnomalyType(Enum):
    """Category of irregularity that a detector can report.

    Each member's value is the lowercase snake_case form of its name.
    """

    # Value falls outside a statistical or fixed-threshold range.
    OUTLIER = "outlier"
    PATTERN_BREAK = "pattern_break"
    MISSING_SEQUENCE = "missing_sequence"
    DUPLICATE = "duplicate"
    # Value violates a hard business rule (e.g. a negative cost).
    IMPOSSIBLE_VALUE = "impossible_value"
    TREND_DEVIATION = "trend_deviation"
class AnomalySeverity(Enum):
    """How serious a reported anomaly is.

    Members are declared from most to least severe. This is a plain ``Enum``,
    so members cannot be compared with ``<`` / ``>``; use the declaration
    order (``list(AnomalySeverity)``) when a ranking is needed.
    """

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
@dataclass
class Anomaly:
    """A single irregularity found in a dataset.

    Only ``id``, ``anomaly_type``, ``severity``, ``field`` and ``value`` are
    required; every other attribute has an empty/neutral default.
    """

    id: str  # identifier of the finding, e.g. "COST-<row label>"
    anomaly_type: AnomalyType
    severity: AnomalySeverity
    field: str  # name of the column/field the value was read from
    value: Any  # the offending value as found in the data
    # (low, high) bounds the value was checked against. Either side may be
    # None for an open-ended range such as (0, None).
    expected_range: Optional[Tuple[Optional[float], Optional[float]]] = None
    description: str = ""
    row_index: Optional[int] = None  # index label of the source row, if known
    detection_method: str = ""  # e.g. "IQR", "Business Rule", "Threshold"
    confidence: float = 0.0  # the detectors in this file use values in [0, 1]
    suggested_action: str = ""
@dataclass
class AnomalyReport:
    """Result of one detection run over a data source.

    All fields are required and must be supplied by the caller.
    """

    source: str  # label of the data that was analysed
    detected_at: datetime  # when the run took place
    total_records: int  # number of records examined
    anomalies: List[Anomaly]  # findings produced by the run
    # NOTE(review): presumably counts keyed by anomaly category or severity;
    # confirm against the code that builds reports.
    summary: Dict[str, int]
class ConstructionAnomalyDetector:
"""Detect anomalies in construction data."""
# Construction-specific thresholds
# COST_THRESHOLDS maps a cost-metric name to a (low, high) pair of numbers.
# NOTE(review): presumably the acceptable range for each metric ("cy" likely
# means cubic yard); units and currency are not stated here - TODO confirm.
COST_THRESHOLDS = {
'concrete_per_cy': (200, 800),
'steel_per_ton': (1500, 4000),
'labor_per_hour': (25, 150),
'overhead_percentage': (5, 25),
'contingency_percentage': (3, 20),
}
# Schedule limits keyed by name. 'max_activity_duration' is the limit that
# detect_schedule_anomalies compares task durations (in whole days) against.
# NOTE(review): the productivity bounds carry no unit here - TODO confirm.
SCHEDULE_THRESHOLDS = {
'max_activity_duration': 365, # days
'max_lag': 30, # days
'min_productivity': 0.1,
'max_productivity': 10.0,
}
def __init__(self):
self.anomalies: List[Anomaly] = []
self.detection_history: List[AnomalyReport] = []
def detect_cost_anomalies(self, df: pd.DataFrame, cost_column: str,
                          group_by: Optional[str] = None) -> List[Anomaly]:
    """Detect anomalies in cost data.

    Three independent checks are run; a single row can be reported by more
    than one of them:

    1. IQR outliers - values outside ``[Q1 - 1.5*IQR, Q3 + 1.5*IQR]``.
       Severity is HIGH when the value is more than ``3*IQR`` away from the
       column median, otherwise MEDIUM.
    2. Negative costs - always reported as CRITICAL impossible values.
    3. Per-group z-scores - only when ``group_by`` names a column of ``df``;
       values more than 3 standard deviations from their group mean are
       reported as MEDIUM outliers.

    Args:
        df: Table holding the cost data. It is not modified.
        cost_column: Name of the numeric column to inspect.
        group_by: Optional name of a column used to form groups for the
            z-score check. Ignored when falsy or not present in ``df``.

    Returns:
        The anomalies found, ordered IQR outliers, then negative values,
        then group outliers. The list is only returned, not stored on the
        instance.

    Raises:
        KeyError: If ``cost_column`` is not a column of ``df``.
    """
    anomalies: List[Anomaly] = []
    costs = df[cost_column]

    # Statistical outlier detection (IQR method)
    q1 = costs.quantile(0.25)
    q3 = costs.quantile(0.75)
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr
    # Loop-invariant, so computed once instead of once per outlier.
    median = costs.median()

    # Iterating the column (not whole rows) keeps the column's own dtype;
    # row-wise iteration would upcast values in mixed-dtype frames.
    outlier_mask = (costs < lower_bound) | (costs > upper_bound)
    for idx, value in costs[outlier_mask].items():
        if abs(value - median) > 3 * iqr:
            severity = AnomalySeverity.HIGH
        else:
            severity = AnomalySeverity.MEDIUM
        anomalies.append(Anomaly(
            id=f"COST-{idx}",
            anomaly_type=AnomalyType.OUTLIER,
            severity=severity,
            field=cost_column,
            value=value,
            expected_range=(lower_bound, upper_bound),
            description=f"Cost value {value:,.2f} outside expected range",
            row_index=idx,
            detection_method="IQR",
            confidence=0.95,
            suggested_action="Review cost estimate for errors",
        ))

    # Negative cost check
    for idx, value in costs[costs < 0].items():
        anomalies.append(Anomaly(
            id=f"COST-NEG-{idx}",
            anomaly_type=AnomalyType.IMPOSSIBLE_VALUE,
            severity=AnomalySeverity.CRITICAL,
            field=cost_column,
            value=value,
            expected_range=(0, None),  # open-ended: any non-negative cost
            description="Negative cost value detected",
            row_index=idx,
            detection_method="Business Rule",
            confidence=1.0,
            suggested_action="Correct data entry error or investigate credit",
        ))

    # Group-based anomalies (if grouped)
    if group_by and group_by in df.columns:
        # Iterate the groups directly instead of re-filtering the whole
        # frame once per group.
        for group_name, group_costs in df.groupby(group_by)[cost_column]:
            group_mean = group_costs.mean()
            group_std = group_costs.std()
            # A single-row group has an undefined (NaN) std and a constant
            # group has std 0; neither yields a meaningful z-score.
            if pd.isna(group_std) or group_std == 0:
                continue
            for idx, value in group_costs.items():
                z = abs((value - group_mean) / group_std)
                if z > 3:
                    anomalies.append(Anomaly(
                        id=f"COST-GROUP-{idx}",
                        anomaly_type=AnomalyType.OUTLIER,
                        severity=AnomalySeverity.MEDIUM,
                        field=cost_column,
                        value=value,
                        description=f"Unusual cost for group {group_name} (z-score: {z:.2f})",
                        row_index=idx,
                        detection_method="Z-Score by Group",
                        # Scales with the z-score and saturates at z >= 5.
                        confidence=min(z / 5, 1.0),
                    ))

    return anomalies
def detect_schedule_anomalies(self, df: pd.DataFrame) -> List[Anomaly]:
"""Detect anomalies in schedule data."""
anomalies = []
# Check for required columns
required = ['start_date', 'end_date']
if not all(col in df.columns for col in required):
return anomalies
# Convert dates
df['start_date'] = pd.to_datetime(df['start_date'])
df['end_date'] = pd.to_datetime(df['end_date'])
# Calculate duration
df['duration'] = (df['end_date'] - df['start_date']).dt.days
# Negative duration (end before start)
negative_duration = df[df['duration'] < 0]
for idx, row in negative_duration.iterrows():
anomalies.append(Anomaly(
id=f"SCHED-NEG-{idx}",
anomaly_type=AnomalyType.IMPOSSIBLE_VALUE,
severity=AnomalySeverity.CRITICAL,
field="duration",
value=row['duration'],
description="End date before start date",
row_index=idx,
detection_method="Business Rule",
confidence=1.0,
suggested_action="Correct dates"
))
# Extremely long durations
long_tasks = df[df['duration'] > self.SCHEDULE_THRESHOLDS['max_activity_duration']]
for idx, row in long_tasks.iterrows():
anomalies.append(Anomaly(
id=f"SCHED-LONG-{idx}",
anomaly_type=AnomalyType.OUTLIER,
severity=AnomalySeverity.MEDIUM,
field="duration",
value=row['duration'],
expected_range=(0, self.SCHEDULE_THRESHOLDS['max_activity_duration']),
description=f"Task duration {row['duration']} days exceeds threshold",
row_index=idx,
detection_method="Threshold",
confidence=0.9,
suggested_action="Review if task should be broken down"
))
# Zero duration non-milestones
if 'is_milestone' in df.columns:
zero_duration = df[(df['duration'] == 0) & (~df['is_milestone'])]
for idx, row in zero_duration.iterrows():
anomalies.append(Anomaly(
id=f"SCHED-ZERO-{idx}",
anomaly_type=AnomalyType.IMPOSSIBLE_VALUE,
severity=AnomalySeverity.HIGH,
field="duration",
value=0,
description="Zero duration task that is not a milestone",
row_index=idx,
detection_method=