-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocess_new_contacts.py
More file actions
104 lines (81 loc) · 3.51 KB
/
process_new_contacts.py
File metadata and controls
104 lines (81 loc) · 3.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#!/usr/bin/env python3
import csv
import re
import dns.resolver
import os
import sys
# Free webmail providers whose addresses are treated as personal rather than
# corporate. A frozenset gives O(1) membership tests and is built only once.
_PERSONAL_DOMAINS = frozenset({
    'gmail.com',
    'yahoo.com',
    'hotmail.com',
    'outlook.com',
    'aol.com',
    'icloud.com',
    'rediffmail.com',
    'live.com',
})


def is_corporate_domain(email):
    """Check if email domain is corporate (not personal).

    Args:
        email: Address to classify. Everything after the last '@' is used as
            the domain; a string without '@' is treated as a bare domain.

    Returns:
        True when the domain is not one of the known personal webmail
        providers, False otherwise.
    """
    # Domain names are case-insensitive, so normalise before comparing;
    # otherwise 'user@Gmail.com' would be misclassified as corporate.
    domain = email.rsplit('@', 1)[-1].strip().lower()
    return domain not in _PERSONAL_DOMAINS
def has_mx_record(domain):
    """Check if domain has valid MX record.

    Args:
        domain: Domain name to look up (the part of an address after '@').

    Returns:
        True when the MX query returns at least one record. False when the
        domain is empty, the name is malformed, the query times out, or the
        resolver reports no answer / no such domain / no usable nameservers.
    """
    if not domain:
        return False
    try:
        answers = dns.resolver.resolve(domain, 'MX')
    except (
        dns.resolver.NoAnswer,
        dns.resolver.NXDOMAIN,
        dns.resolver.NoNameservers,
        # A slow or unreachable nameserver must not abort a whole batch run.
        dns.exception.Timeout,
        # Malformed names (e.g. an empty label in 'example..com') are
        # rejected by dnspython before any query is sent.
        dns.exception.SyntaxError,
    ):
        return False
    return len(answers) > 0
# Shape check only: exactly one '@', no whitespace, and a dot in the domain.
_EMAIL_PATTERN = re.compile(r"[^@\s]+@[^@\s]+\.[^@\s]+")


def _cell(row, column, default=''):
    """Return the stripped text of *column*, or *default* if missing or blank."""
    # csv.DictReader fills cells missing from a short row with None, so the
    # value has to be coalesced before calling str methods on it.
    return (row.get(column) or '').strip() or default


def _rejection_reason(email, seen_emails, mx_by_domain):
    """Return the reason *email* must be skipped, or None if it is acceptable."""
    # fullmatch (not match) so trailing junk such as 'a@b.com@evil' is rejected.
    if not _EMAIL_PATTERN.fullmatch(email):
        return f"Invalid email format: {email}"
    if email in seen_emails:
        return f"Duplicate email: {email}"
    if not is_corporate_domain(email):
        return f"Personal email domain: {email}"
    domain = email.split('@')[-1]
    # Many contacts share a company domain; look each domain up only once.
    if domain not in mx_by_domain:
        mx_by_domain[domain] = has_mx_record(domain)
    if not mx_by_domain[domain]:
        return f"No MX record for domain: {domain}"
    return None


def clean_new_csv(input_csv, output_csv):
    """Clean the new CSV file and extract valid corporate emails.

    Reads the 'Email', 'First Name' and 'Company Name' columns of
    *input_csv*. A row is kept when its lower-cased email is well formed,
    not seen before, not on a personal webmail domain, and its domain has
    an MX record. Rows with no email are skipped silently; every other
    rejection is printed with its row number.

    Args:
        input_csv: Path of the CSV file to read (UTF-8, with or without BOM).
        output_csv: Path of the CSV file to write. It is always written,
            containing only the header when no row is kept.

    Returns:
        The number of contacts written to *output_csv*.
    """
    cleaned_rows = []
    seen_emails = set()
    mx_by_domain = {}

    print(f"Processing {input_csv}...")

    # 'utf-8-sig' drops the BOM that spreadsheet exports prepend; without it
    # the first header would be read as '\ufeffEmail' and never matched.
    # newline='' is what the csv module expects for files it reads.
    with open(input_csv, 'r', encoding='utf-8-sig', newline='') as infile:
        reader = csv.DictReader(infile)
        for row_num, row in enumerate(reader, 1):
            email = _cell(row, 'Email').lower()
            if email:
                reason = _rejection_reason(email, seen_emails, mx_by_domain)
                if reason is None:
                    cleaned_rows.append({
                        'email': email,
                        # Fall back to a generic greeting name when the
                        # column is absent or the cell is blank.
                        'first_name': _cell(row, 'First Name', 'HR'),
                        'company': _cell(row, 'Company Name'),
                    })
                    seen_emails.add(email)
                else:
                    print(f"Row {row_num}: {reason}")

            # Report progress on every 50th row, including rejected ones.
            if row_num % 50 == 0:
                print(f"Processed {row_num} rows, found {len(cleaned_rows)} valid contacts...")

    # Write cleaned data
    with open(output_csv, 'w', newline='', encoding='utf-8') as outfile:
        writer = csv.DictWriter(outfile, fieldnames=['email', 'first_name', 'company'])
        writer.writeheader()
        writer.writerows(cleaned_rows)

    print(f"✅ Cleaned contacts: {len(cleaned_rows)}")
    print(f"📁 Saved to: {output_csv}")
    return len(cleaned_rows)
def main():
    """Clean the default contacts file and print the follow-up send command.

    Exits with status 1 when the input file is missing or when cleaning
    leaves no valid contacts.
    """
    source_csv = "new_hr_contacts.csv"
    cleaned_csv = "cleaned_new_contacts.csv"

    def abort(message):
        # Report the failure and stop with a non-zero exit status.
        print(message)
        sys.exit(1)

    if not os.path.exists(source_csv):
        abort(f"❌ Input file not found: {source_csv}")

    kept = clean_new_csv(source_csv, cleaned_csv)
    if kept == 0:
        abort("❌ No valid contacts found after cleaning.")

    print(f"\n🎯 Ready to send emails to {kept} valid HR contacts!")
    print(f"📧 Use: python send_job_applications.py --csv {cleaned_csv} --from junejatarandeepsingh@gmail.com")


# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()