Spaces:
Running
Running
# ============================================================
# PhishGuard AI - email_analyzer.py
# Analyzes raw emails for phishing indicators.
# Checks: sender authentication (SPF/DKIM/DMARC),
# brand spoofing, urgency language, and embedded links.
#
# Reuses the BERT model from bert_analyzer so it is not loaded twice.
# ============================================================
| import email | |
| import re | |
| from email import policy | |
| from email.parser import BytesParser, Parser | |
| # Reuse the NLP analyzer from bert_analyzer | |
| from bert_analyzer import analyze_text as bert_analyze_text, _ensure_bert_loaded | |
| import bert_analyzer | |
print("[PhishGuard] Email analyzer initialized (reusing shared NLP)")

# Regular expressions describing pressure / urgency wording.
# check_urgency() runs them against lower-cased text, so every alternative
# is spelled in lower case.
URGENCY_PATTERNS = [
    # Generic demands for immediate action.
    r'(act now|immediate action|urgent|verify immediately|account suspended)',
    # "Click here to ..." calls to action.
    r'(click here to (verify|confirm|update|restore))',
    # Threats about the state of the recipient's account.
    r'(your account (will be|has been) (suspended|closed|deactivated))',
    # Artificial deadlines.
    r'(limited time|expires in \d+ hours?)',
    # Security-alert wording.
    r'(unusual (sign-in|login|activity) detected)',
    # Requests to confirm credentials or identity.
    r'(confirm your (identity|password|email|account))',
    # "We noticed ..." style warnings.
    r'(we noticed (suspicious|unusual|unauthorized))',
]
# Lower-case brand keywords that check_brand_spoofing() searches for in the
# subject, body and sender of a message.
BRAND_SPOOFS = [
    'paypal', 'amazon', 'apple', 'microsoft', 'google', 'netflix',
    'facebook', 'instagram', 'linkedin', 'twitter',
    'chase', 'wellsfargo', 'bankofamerica', 'citibank',
    'irs',
    'fedex', 'ups', 'dhl',
    'dropbox', 'docusign', 'zoom', 'office365',
    'hdfc', 'icici', 'sbi',
]
def parse_email_msg(raw):
    """Parse a raw email into a message object.

    ``bytes`` input is handled by ``BytesParser``; any other input is handed
    to the text ``Parser``. Both parsers are created with ``policy.default``.
    """
    if not isinstance(raw, bytes):
        text_parser = Parser(policy=policy.default)
        return text_parser.parsestr(raw)
    byte_parser = BytesParser(policy=policy.default)
    return byte_parser.parsebytes(raw)
def extract_urls(text: str) -> list:
    """Return the unique HTTP/HTTPS URLs found in *text*.

    URLs are returned in order of first appearance. De-duplicating through a
    ``set`` would make the order depend on string hashing, which varies
    between interpreter runs and would make any later truncation of the list
    nondeterministic.

    Args:
        text: Text to scan.

    Returns:
        A list of distinct URL strings; empty when none are found.
    """
    candidates = re.findall(r'https?://[^\s<>"\'\\ ]+', text)
    # dict keys keep insertion order, giving a stable, order-preserving dedupe.
    return list(dict.fromkeys(candidates))
def _decoded_text(part):
    """Return the decoded content of *part* when it is text, otherwise ``None``."""
    try:
        content = part.get_content()
    except Exception:
        # Best effort: a part that cannot be decoded (unknown charset, no
        # content handler, legacy message class without get_content) is
        # skipped instead of aborting the whole analysis.
        return None
    # Non-text parts decode to bytes, which must never reach str.join().
    return content if isinstance(content, str) else None


def get_body(msg) -> str:
    """Extract the readable text of an email message.

    For multipart messages every ``text/plain`` part is collected; only when
    no plain-text part could be read are the ``text/html`` parts used, with
    their tags replaced by spaces. For single-part messages the decoded
    content is returned, with tags stripped when the message is
    ``text/html``.

    Args:
        msg: A parsed email message.

    Returns:
        The body text, or ``''`` when nothing readable was found (for
        example a single-part binary message).
    """
    if not msg.is_multipart():
        text = _decoded_text(msg)
        if text is None:
            return ''
        if msg.get_content_type() == 'text/html':
            text = re.sub(r'<[^>]+>', ' ', text)
        return text

    plain_parts = []
    html_parts = []
    for part in msg.walk():
        content_type = part.get_content_type()
        if content_type not in ('text/plain', 'text/html'):
            continue
        text = _decoded_text(part)
        if text is None:
            continue
        if content_type == 'text/plain':
            plain_parts.append(text)
        else:
            html_parts.append(re.sub(r'<[^>]+>', ' ', text))

    # HTML is a fallback only: prefer plain text whenever any exists,
    # regardless of the order in which the parts appear.
    return ' '.join(plain_parts if plain_parts else html_parts)
def _sender_domain(header_value) -> str:
    """Return the lower-cased domain of the address in an address header, or ``''``."""
    from email.utils import parseaddr  # local import keeps this block self-contained

    raw = str(header_value or '')
    # parseaddr separates the display name from the real address, so a display
    # name such as "security@paypal.com" cannot masquerade as the domain.
    _, address = parseaddr(raw)
    if '@' in address:
        domain = address.rpartition('@')[2]
    else:
        # Fall back to a loose scan when the header is too malformed to parse.
        found = re.search(r'@([\w.-]+)', raw)
        domain = found.group(1) if found else ''
    return domain.rstrip('.').lower()


def check_sender_auth(msg) -> dict:
    """Score the sender-authentication signals of a message.

    Checks performed:
        - SPF (Sender Policy Framework): ``spf=pass`` in
          ``Authentication-Results`` or a ``Received-SPF`` header whose
          result (its leading token) is ``pass``.
        - DKIM (DomainKeys Identified Mail): ``dkim=pass`` in
          ``Authentication-Results``.
        - DMARC (Domain-based Message Authentication): ``dmarc=pass`` in
          ``Authentication-Results``.
        - From / Return-Path domain mismatch (case-insensitive).
        - Use of a free email provider in the From address.

    Args:
        msg: A parsed email message exposing ``get(name, default)``.

    Returns:
        A dict with the boolean flags ``spf_pass``, ``dkim_pass``,
        ``dmarc_pass``, ``domain_mismatch``, ``using_free_email`` and the
        integer ``auth_risk_score`` (0-100).
    """
    auth_results = str(msg.get('Authentication-Results', '') or '').lower()
    received_spf = str(msg.get('Received-SPF', '') or '').lower()

    # Only the leading result token of Received-SPF counts. Searching the
    # whole header for "pass" would let sender-controlled text in the comment
    # (e.g. "fail (domain of x@bypass.example ...)") fake a pass.
    spf_pass = ('spf=pass' in auth_results
                or re.match(r'\s*pass\b', received_spf) is not None)
    dkim_pass = 'dkim=pass' in auth_results
    dmarc_pass = 'dmarc=pass' in auth_results

    from_domain = _sender_domain(msg.get('From', ''))
    return_domain = _sender_domain(msg.get('Return-Path', ''))
    # Domains are case-insensitive; both sides are already lower-cased.
    # NOTE(review): a bounce sub-domain (mail.example.com vs example.com)
    # still counts as a mismatch -- confirm this is intended.
    mismatch = bool(from_domain and return_domain
                    and from_domain != return_domain)

    free_providers = {'gmail.com', 'yahoo.com', 'hotmail.com',
                      'outlook.com', 'protonmail.com'}
    using_free = from_domain in free_providers

    risk = 0
    if not spf_pass:
        risk += 25
    if not dkim_pass:
        risk += 20
    if not dmarc_pass:
        risk += 15
    if mismatch:
        risk += 30
    if using_free:
        risk += 10

    return {
        "spf_pass": spf_pass,
        "dkim_pass": dkim_pass,
        "dmarc_pass": dmarc_pass,
        "domain_mismatch": mismatch,
        "using_free_email": using_free,
        "auth_risk_score": min(risk, 100),
    }
def check_brand_spoofing(subject: str, body: str, sender: str, brands=None) -> dict:
    """Detect brand names used in a message whose sender domain lacks them.

    A brand counts as mentioned only when it appears as a standalone token
    (not surrounded by letters or digits), so ordinary words such as
    "purchase", "first" or "groups" no longer trigger "chase", "irs" or
    "ups".

    Args:
        subject: Subject line of the message.
        body: Body text of the message.
        sender: Raw ``From`` header value.
        brands: Optional iterable of brand keywords to look for. Defaults to
            the module-level ``BRAND_SPOOFS`` list.

    Returns:
        A dict with ``brand_spoof_detected`` (bool) and ``spoofed_brands``
        (the matching brand keywords, in list order).
    """
    from email.utils import parseaddr  # local import keeps this block self-contained

    brand_list = BRAND_SPOOFS if brands is None else brands
    sender_text = str(sender)
    haystack = ' '.join((str(subject), str(body), sender_text)).lower()

    # Use the real address, not the first "@" in the header: a display name
    # like "service@paypal.com" must not be mistaken for the sender domain.
    _, address = parseaddr(sender_text)
    if '@' in address:
        sender_domain = address.rpartition('@')[2].lower()
    else:
        found = re.search(r'@([\w.-]+)', sender_text)
        sender_domain = found.group(1).lower() if found else ''

    spoofed = []
    for brand in brand_list:
        keyword = brand.lower()
        # NOTE(review): a look-alike domain that merely contains the brand
        # (e.g. "paypal-secure.example") passes this substring check.
        if keyword in sender_domain:
            continue
        token = r'(?<![a-z0-9])' + re.escape(keyword) + r'(?![a-z0-9])'
        if re.search(token, haystack):
            spoofed.append(brand)

    return {
        "brand_spoof_detected": bool(spoofed),
        "spoofed_brands": spoofed,
    }
def check_urgency(text: str, patterns=None) -> dict:
    """Detect urgency / pressure phrases in *text*.

    Matching is done on the lower-cased text. Each hit is recorded as the
    full matched phrase, so patterns with nested groups no longer show up
    as tuple representations.

    Args:
        text: Text to scan (typically subject plus body).
        patterns: Optional iterable of regular expressions to apply.
            Defaults to the module-level ``URGENCY_PATTERNS`` list.

    Returns:
        A dict with ``urgency_detected`` (bool), ``urgency_matches`` (up to
        five matched phrases) and ``urgency_score`` (15 points per match,
        capped at 60).
    """
    active_patterns = URGENCY_PATTERNS if patterns is None else patterns
    lowered = text.lower()  # lower-case once instead of once per pattern

    phrases = []
    for pattern in active_patterns:
        phrases.extend(hit.group(0) for hit in re.finditer(pattern, lowered))

    return {
        "urgency_detected": bool(phrases),
        "urgency_matches": phrases[:5],
        "urgency_score": min(len(phrases) * 15, 60),
    }
def bert_score(text: str) -> float:
    """Return a phishing probability for *text* from the shared NLP analyzer.

    Args:
        text: Email text to classify.

    Returns:
        ``0.1`` for empty or whitespace-only text. Otherwise the classifier
        score when ``bert_analyzer`` reports a loaded classifier, else the
        ``bert_phishing_prob`` value from ``bert_analyze_text``. ``0.3`` is
        returned when that value is missing or when any step raises.
    """
    if not text or not text.strip():
        return 0.1
    try:
        _ensure_bert_loaded()
        if bert_analyzer._use_bert and bert_analyzer._classifier is not None:
            # Only the first 512 characters are passed to the classifier.
            top = bert_analyzer._classifier(text[:512])[0]
            label = top['label'].upper()
            confidence = top['score']
            # NOTE(review): a label such as "NOT_SPAM" would also satisfy the
            # substring test -- confirm the label set of the loaded model.
            flagged = 'SPAM' in label or label == 'LABEL_1'
            return confidence if flagged else 1 - confidence
        # No classifier available: use the analyzer's own text analysis.
        fallback = bert_analyze_text("", "", text)
        return fallback.get("bert_phishing_prob", 0.3)
    except Exception:
        # Best effort: a model failure must not abort email analysis, but
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        return 0.3
def _html_link_urls(msg) -> list:
    """Collect URLs from the raw markup of every ``text/html`` part of *msg*."""
    collected = []
    for part in msg.walk():
        if part.get_content_type() != 'text/html':
            continue
        try:
            markup = part.get_content()
        except Exception:
            # Best effort: an undecodable part simply contributes no URLs.
            continue
        if isinstance(markup, str):
            collected.extend(extract_urls(markup))
    return collected


def analyze_email(raw, return_urls: bool = True) -> dict:
    """Run the full phishing analysis on a raw email.

    Combines the NLP score, sender authentication, brand spoofing and
    urgency detection into a single probability.

    Args:
        raw: The complete email as ``bytes`` or ``str``.
        return_urls: When true, include up to 20 extracted URLs under
            ``extracted_urls``.

    Returns:
        A dict with ``is_phishing``, ``phishing_probability``, ``subject``,
        ``sender``, ``auth_analysis``, ``brand_analysis``,
        ``urgency_analysis``, ``bert_score``, ``extracted_url_count`` and,
        optionally, ``extracted_urls``.
    """
    msg = parse_email_msg(raw)
    subject = str(msg.get('Subject', '') or '')
    sender = str(msg.get('From', '') or '')
    body = get_body(msg)

    # Link targets live in href attributes, which disappear once HTML tags
    # are stripped from the body, so HTML parts are also scanned in raw form.
    # dict.fromkeys gives an order-preserving dedupe.
    urls = list(dict.fromkeys(extract_urls(body) + _html_link_urls(msg)))

    auth = check_sender_auth(msg)
    brand = check_brand_spoofing(subject, body, sender)
    urgency = check_urgency(subject + ' ' + body)
    bert_p = bert_score(subject + '. ' + body[:400])

    # NOTE(review): urgency_score tops out at 60 and the brand term at 30, so
    # raw_score cannot exceed 85 -- confirm the weights are as intended.
    brand_points = 30 if brand['brand_spoof_detected'] else 0
    raw_score = (bert_p * 40
                 + auth['auth_risk_score'] * 0.30
                 + urgency['urgency_score'] * 0.20
                 + brand_points * 0.10)
    final = min(raw_score / 100, 1.0)

    result = {
        "is_phishing": final > 0.60,
        "phishing_probability": round(final, 4),
        "subject": subject,
        "sender": sender,
        "auth_analysis": auth,
        "brand_analysis": brand,
        "urgency_analysis": urgency,
        "bert_score": round(bert_p, 4),
        "extracted_url_count": len(urls),
    }
    if return_urls:
        result["extracted_urls"] = urls[:20]
    return result