# phishguard-api / email_analyzer.py
# prashanth135's picture
# Upload 38 files
# bebe233 verified
# ============================================================
# PhishGuard AI - email_analyzer.py
# Analyzes raw emails for phishing indicators.
# Checks: sender authentication (SPF/DKIM/DMARC),
# brand spoofing, urgency language, and embedded links.
#
# Reuses BERT model from bert_analyzer to avoid duplicate loading.
# ============================================================
import email
import re
from email import policy
from email.parser import BytesParser, Parser
# Reuse the NLP analyzer from bert_analyzer
from bert_analyzer import analyze_text as bert_analyze_text, _ensure_bert_loaded
import bert_analyzer
# Import-time notice: this runs once, when the module is first imported.
print("[PhishGuard] Email analyzer initialized (reusing shared NLP)")
# Regular expressions for pressure / urgency wording. check_urgency() applies
# each one to lower-cased text, so the patterns are written in lower case.
URGENCY_PATTERNS = [
    # Generic calls to act immediately.
    r'(act now|immediate action|urgent|verify immediately|account suspended)',
    # "click here to <action>" prompts.
    r'(click here to (verify|confirm|update|restore))',
    # Account suspension / closure notices.
    r'(your account (will be|has been) (suspended|closed|deactivated))',
    # Artificial deadlines.
    r'(limited time|expires in \d+ hours?)',
    # Security-alert wording.
    r'(unusual (sign-in|login|activity) detected)',
    # Requests to confirm credentials or identity.
    r'(confirm your (identity|password|email|account))',
    # "we noticed ..." alerts.
    r'(we noticed (suspicious|unusual|unauthorized))',
]
# Lower-case brand keywords that check_brand_spoofing() looks for in the
# subject, body and sender of an email.
BRAND_SPOOFS = [
    'paypal', 'amazon', 'apple', 'microsoft', 'google',
    'netflix', 'facebook', 'instagram', 'linkedin', 'twitter',
    'chase', 'wellsfargo', 'bankofamerica', 'citibank', 'irs',
    'fedex', 'ups', 'dhl', 'dropbox', 'docusign',
    'zoom', 'office365', 'hdfc', 'icici', 'sbi',
]
def parse_email_msg(raw):
    """Turn a raw email into a parsed message object.

    Args:
        raw: The complete email. ``bytes`` input is parsed with
            ``BytesParser``; anything else is handed to ``Parser`` as text.

    Returns:
        The message produced by the parser under ``policy.default``.
    """
    if not isinstance(raw, bytes):
        return Parser(policy=policy.default).parsestr(raw)
    return BytesParser(policy=policy.default).parsebytes(raw)
def extract_urls(text: str) -> list:
    """Extract all unique HTTP/HTTPS URLs from text, in order of first appearance.

    Args:
        text: Text to scan.

    Returns:
        A list of distinct URL strings. A URL ends at the first whitespace,
        ``<``, ``>``, quote or backslash character.
    """
    found = re.findall(r'https?://[^\s<>"\'\\ ]+', text)
    # dict.fromkeys() removes duplicates while keeping first-seen order.
    # A set would make the order (and therefore any later slice such as
    # urls[:20]) vary from run to run because of string hash randomization.
    return list(dict.fromkeys(found))
def _strip_html_tags(html: str) -> str:
    """Replace every HTML tag with a space, keeping http(s) URLs found inside it."""
    def _tag_replacement(match):
        # Link targets live in tag attributes (href/src). Dropping the whole
        # tag would hide them from later URL extraction, so keep them as text.
        links = re.findall(r'https?://[^\s<>"\'\\ ]+', match.group(0))
        return (' ' + ' '.join(links) + ' ') if links else ' '

    return re.sub(r'<[^>]+>', _tag_replacement, html)


def get_body(msg) -> str:
    """Extract the readable text of an email message.

    For a multipart message every ``text/plain`` part is collected; the
    ``text/html`` parts are used only when no plain-text part could be read.
    For a single-part message the payload itself is used when it decodes to
    text. HTML content has its tags replaced by spaces; http(s) URLs that
    appear inside a tag are kept in the output.

    Extraction is best-effort: a part whose content cannot be decoded, or
    whose decoded content is not text (for example binary data), is skipped
    instead of raising.

    Args:
        msg: A parsed message exposing ``is_multipart``, ``walk``,
            ``get_content_type`` and ``get_content``.

    Returns:
        The selected text chunks joined by single spaces; ``''`` when nothing
        readable was found.
    """
    multipart = msg.is_multipart()
    plain_chunks = []
    html_chunks = []

    for part in (msg.walk() if multipart else [msg]):
        ctype = part.get_content_type()
        # Inside a multipart tree only the two textual body types are wanted;
        # this also skips the multipart containers that walk() yields.
        if multipart and ctype not in ('text/plain', 'text/html'):
            continue
        try:
            content = part.get_content()
        except Exception:
            # Deliberate best-effort: an undecodable part must not abort
            # the analysis of the rest of the email.
            continue
        if not isinstance(content, str):
            # Non-text payloads come back as bytes (or other objects) and
            # cannot be joined into the body string.
            continue
        if ctype == 'text/html':
            html_chunks.append(_strip_html_tags(content))
        else:
            plain_chunks.append(content)

    return ' '.join(plain_chunks or html_chunks)
def _address_domain(header_value) -> str:
    """Return the lower-cased domain of the mailbox in an address header, or ''."""
    # Local import keeps this block self-contained; the module header does not
    # import email.utils.
    from email.utils import parseaddr

    raw = str(header_value or '')
    # parseaddr separates the display name from the real mailbox, so a display
    # name such as "support@paypal.com" cannot pose as the sender's domain.
    address = parseaddr(raw)[1]
    if '@' in address:
        return address.rpartition('@')[2].strip().lower()
    # parseaddr found no usable mailbox: fall back to the first @domain.
    loose = re.search(r'@([\w.-]+)', raw)
    return loose.group(1).lower() if loose else ''


def check_sender_auth(msg) -> dict:
    """
    Check email authentication headers:
    - SPF (Sender Policy Framework)
    - DKIM (DomainKeys Identified Mail)
    - DMARC (Domain-based Message Authentication)
    - From/Return-Path domain mismatch
    - Free email provider usage

    Args:
        msg: Parsed message (anything with a mapping-style ``get(name, default)``).

    Returns:
        dict with the boolean flags ``spf_pass``, ``dkim_pass``, ``dmarc_pass``,
        ``domain_mismatch`` and ``using_free_email``, plus ``auth_risk_score``:
        the sum of 25 (no SPF pass), 20 (no DKIM pass), 15 (no DMARC pass),
        30 (domain mismatch) and 10 (free provider), capped at 100.
    """
    auth = str(msg.get('Authentication-Results', '') or '').lower()
    spf_raw = str(msg.get('Received-SPF', '') or '').lower()

    # In Received-SPF the result is the leading token. A plain substring test
    # would also accept "pass" inside the trailing comment (e.g. "compass").
    spf_pass = 'spf=pass' in auth or re.match(r'\s*pass\b', spf_raw) is not None
    dkim_pass = 'dkim=pass' in auth
    dmarc_pass = 'dmarc=pass' in auth

    from_dom = _address_domain(msg.get('From', ''))
    ret_dom = _address_domain(msg.get('Return-Path', ''))
    # Both domains are already lower-cased, so the comparison ignores case.
    mismatch = bool(from_dom and ret_dom and from_dom != ret_dom)

    free = {'gmail.com', 'yahoo.com', 'hotmail.com', 'outlook.com', 'protonmail.com'}
    using_free = from_dom in free

    risk = 0
    if not spf_pass:
        risk += 25
    if not dkim_pass:
        risk += 20
    if not dmarc_pass:
        risk += 15
    if mismatch:
        risk += 30
    if using_free:
        risk += 10

    return {
        "spf_pass": spf_pass, "dkim_pass": dkim_pass,
        "dmarc_pass": dmarc_pass, "domain_mismatch": mismatch,
        "using_free_email": using_free,
        "auth_risk_score": min(risk, 100)
    }
def check_brand_spoofing(subject: str, body: str, sender: str) -> dict:
    """Detect brand names mentioned in email content but not matching sender domain.

    A brand from ``BRAND_SPOOFS`` counts as mentioned only when it appears as
    a standalone token (not directly next to another ASCII letter or digit)
    in the lower-cased subject, body or sender. It is reported as spoofed when
    the sender's domain does not contain that brand.

    Args:
        subject: Subject line of the email.
        body: Text body of the email.
        sender: Raw ``From`` header value.

    Returns:
        dict with ``brand_spoof_detected`` (bool) and ``spoofed_brands``
        (list of matching brand keywords, in ``BRAND_SPOOFS`` order).
    """
    # Local import keeps this block self-contained; the module header does not
    # import email.utils.
    from email.utils import parseaddr

    combined = (subject + ' ' + body + ' ' + sender).lower()

    # Take the domain from the real mailbox, not from the first "@" in the
    # header: a display name like "support@paypal.com" <x@evil.example> would
    # otherwise make the sender look like the brand itself.
    address = parseaddr(str(sender))[1]
    if '@' in address:
        s_dom = address.rpartition('@')[2].lower()
    else:
        sender_dom = re.search(r'@([\w.-]+)', sender)
        s_dom = sender_dom.group(1).lower() if sender_dom else ''

    spoofed = []
    for brand in BRAND_SPOOFS:
        if brand in s_dom:
            continue
        # Token match instead of raw substring: "purchase" must not count as
        # "chase", nor "first" as "irs", nor "groups" as "ups".
        token = r'(?<![a-z0-9])' + re.escape(brand) + r'(?![a-z0-9])'
        if re.search(token, combined):
            spoofed.append(brand)

    return {
        "brand_spoof_detected": bool(spoofed),
        "spoofed_brands": spoofed
    }
def check_urgency(text: str) -> dict:
    """Detect urgency/pressure language patterns typical of phishing emails.

    Args:
        text: Text to scan; matching is done on its lower-cased form.

    Returns:
        dict with ``urgency_detected`` (bool), ``urgency_matches`` (up to five
        matched phrases as plain strings) and ``urgency_score`` (15 points per
        match, capped at 60).
    """
    lowered = text.lower()  # lower-case once instead of once per pattern
    matches = []
    for pat in URGENCY_PATTERNS:
        # Several patterns contain nested groups, for which re.findall()
        # returns tuples; group(0) always yields the whole matched phrase.
        matches.extend(m.group(0) for m in re.finditer(pat, lowered))
    return {
        "urgency_detected": bool(matches),
        "urgency_matches": matches[:5],
        "urgency_score": min(len(matches) * 15, 60)
    }
def bert_score(text: str) -> float:
    """Run NLP classifier on email text and return phishing probability.

    Args:
        text: Email text to classify.

    Returns:
        - ``0.1`` when ``text`` is empty or whitespace only.
        - When the shared classifier in ``bert_analyzer`` is enabled and
          loaded: its score for the first 512 characters, taken as-is if the
          label contains ``SPAM`` or equals ``LABEL_1``, otherwise ``1 - score``.
        - Otherwise the ``bert_phishing_prob`` value returned by
          ``bert_analyzer.analyze_text`` (``0.3`` if that key is missing).
        - ``0.3`` when any of the above raises.
    """
    if not text.strip():
        return 0.1
    try:
        _ensure_bert_loaded()
        if bert_analyzer._use_bert and bert_analyzer._classifier is not None:
            result = bert_analyzer._classifier(text[:512])[0]
            label = result['label'].upper()
            score = result['score']
            return score if ('SPAM' in label or label == 'LABEL_1') else 1 - score
        # Classifier unavailable: use the analysis provided by bert_analyzer.
        result = bert_analyze_text("", "", text)
        return result.get("bert_phishing_prob", 0.3)
    except Exception:
        # Best-effort scoring: a model failure must not break email analysis.
        # Exception (not a bare except) lets KeyboardInterrupt and SystemExit
        # propagate.
        return 0.3
def analyze_email(raw, return_urls: bool = True) -> dict:
    """
    Run the complete phishing analysis on one raw email.

    Args:
        raw: The full email as bytes or as a string.
        return_urls: When true, the result also carries ``extracted_urls``
            (at most the first 20 URLs found in the body).

    Returns:
        dict with the verdict (``is_phishing``, ``phishing_probability``),
        the ``subject`` and ``sender`` headers, the per-check reports
        (``auth_analysis``, ``brand_analysis``, ``urgency_analysis``),
        ``bert_score`` and ``extracted_url_count``.

    Scoring: NLP probability * 40, plus 30% of the auth risk score, plus 20%
    of the urgency score, plus 10% of a flat 30 points when brand spoofing is
    detected. The total is divided by 100 and capped at 1.0; the email is
    flagged as phishing when that value is above 0.60.
    """
    message = parse_email_msg(raw)
    subject = message.get('Subject', '')
    sender = message.get('From', '')
    body = get_body(message)

    # Individual checks.
    links = extract_urls(body)
    auth_report = check_sender_auth(message)
    brand_report = check_brand_spoofing(subject, body, sender)
    urgency_report = check_urgency(subject + ' ' + body)
    bert_p = bert_score(subject + '. ' + body[:400])

    # Weighted combination; the terms are added in a fixed left-to-right order.
    brand_points = 30 if brand_report['brand_spoof_detected'] else 0
    raw_score = (bert_p * 40
                 + auth_report['auth_risk_score'] * 0.30
                 + urgency_report['urgency_score'] * 0.20
                 + brand_points * 0.10)
    probability = min(raw_score / 100, 1.0)

    report = {
        "is_phishing": probability > 0.60,
        "phishing_probability": round(probability, 4),
        "subject": subject,
        "sender": sender,
        "auth_analysis": auth_report,
        "brand_analysis": brand_report,
        "urgency_analysis": urgency_report,
        "bert_score": round(bert_p, 4),
        "extracted_url_count": len(links),
    }
    if return_urls:
        report["extracted_urls"] = links[:20]
    return report