Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
__pycache__
results
rm_cache.py

.env
38 changes: 30 additions & 8 deletions instaspyder/__main__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# __main__.py
import sys
import asyncio
import argparse
Expand All @@ -22,9 +23,15 @@ async def cleanup_on_exit():
save_cumulative_results_for_keyword(kw, current_all_found_matches)
await close_async_client()

async def run_search_async(seed=None, keywords=None, depth_arg=None):
async def run_search_async(seed=None, keywords=None, depth_arg=None, face_mode=False, dump_mode=False):
global current_visited_users, current_all_found_matches, initial_seed_username_global, search_keywords_global

if face_mode and not keywords and seed:
print(f"\n{Y}[!] Standalone Face Mode: Skipping API search, using cache for @{seed}...{X}")
from instaspyder.core.face_engine import run_face_recognition
await run_face_recognition(seed, face_mode)
return

initialize_search_environment()

if seed is None or keywords is None or depth_arg is None:
Expand All @@ -45,17 +52,27 @@ async def run_search_async(seed=None, keywords=None, depth_arg=None):
await init_async_client()

try:
print(f"\n{G}Starting search from {C}@{initial_seed_username}{G} (Limit: {depth_arg} depths) for keywords: {Y}{', '.join(search_keywords)}{X}")
mode_str = f" {Y}[Face Mode Active]{G}" if face_mode else ""
dump_str = f" {C}[Dumping Raw Data]{G}" if dump_mode else "" # Visual feedback
print(f"\n{G}Starting search from {C}@{initial_seed_username}{G}{mode_str}{dump_str}...")

await recursive_chain_search_async(
initial_seed_username,
search_keywords,
current_visited_users,
current_all_found_matches,
depth=0,
depth_limit=int(depth_arg)
depth_limit=int(depth_arg),
dump_mode=dump_mode
)
print(f"\n{G}Overall search completed successfully.{X}")

# Trigger Face Recognition after search if flag was provided
if face_mode:
from instaspyder.core.face_engine import run_face_recognition
print(f"\n{Y}[!] Starting Face Recognition Engine...{X}")
await run_face_recognition(initial_seed_username, face_mode)

except Exception as e:
if "Instagram Block" in str(e):
print(f"\n{R}Search halted by Instagram security. State NOT saved to prevent corruption.{X}")
Expand All @@ -73,6 +90,8 @@ def main():
parser.add_argument("-k", "--keywords", help="Comma-separated keywords")
parser.add_argument("-d", "--depth", type=int, help="Search depth (overrides config)")
parser.add_argument("-c", "--config", action="store_true", help="Opens interactive configuration menu")
parser.add_argument("-f", "--face", type=str, help="Path to target image for face recognition search")
parser.add_argument("--dump", action="store_true", help="Dump raw metadata into hidden cache for later use")

args = parser.parse_args()

Expand All @@ -90,11 +109,9 @@ def main():
print(f"{R}Still no headers found. Exiting...{X}")
sys.exit(1)


config = get_config()
default_depth = config.get("max_depth", 2)


if args.depth is not None and args.depth > 2:
print(f"\n{R} You are using aggressive depth{X} (i.e. {C}{args.depth}{X}) {R}It may flag you session cookies{X}")
choice = input(f"\n{Y} Do you want to continue? (y/n): ").lower().strip()
Expand All @@ -119,10 +136,15 @@ def main():
cli_keywords = [k.strip() for k in args.keywords.split(',') if k.strip()]

try:
if args.seed and cli_keywords:
asyncio.run(run_search_async(seed=args.seed, keywords=cli_keywords, depth_arg=args.depth))
if args.seed and args.face and not cli_keywords:
asyncio.run(run_search_async(seed=args.seed, keywords=None, face_mode=args.face))

elif args.seed and cli_keywords:
asyncio.run(run_search_async(seed=args.seed, keywords=cli_keywords, depth_arg=args.depth, face_mode=args.face, dump_mode=args.dump))

else:
asyncio.run(run_search_async(depth_arg=args.depth))
asyncio.run(run_search_async(depth_arg=args.depth, face_mode=args.face, dump_mode=args.dump))

except KeyboardInterrupt:
print(f"\n{R}Ctrl+C detected. Exiting gracefully...{X}")
sys.exit(0)
Expand Down
6 changes: 5 additions & 1 deletion instaspyder/core/config_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@
USER_HOME = Path.home() / ".instaspyder"
CONFIG_FILE = USER_HOME / "config.json"
HEADERS_FILE = USER_HOME / "headers.json"
CACHE_DIR = USER_HOME / ".cache"

DEFAULT_CONFIG = {
"max_depth": 2,
"results_dir": str(USER_HOME / "results"),
"save_state": True
"save_state": True,
"enable_face_recognition": False
}


def init_env():
"""Ensures the config folder and files exist."""
USER_HOME.mkdir(parents=True, exist_ok=True)
CACHE_DIR.mkdir(parents=True, exist_ok=True)
if not CONFIG_FILE.exists():
with open(CONFIG_FILE, 'w') as f:
json.dump(DEFAULT_CONFIG, f, indent=4)
Expand Down
114 changes: 114 additions & 0 deletions instaspyder/core/face_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# instaspyder/core/face_engine.py
import face_recognition
import httpx
import json
import os
import gc
from instaspyder.utils.colors import G, R, Y, C, X
from instaspyder.core.config_manager import CACHE_DIR, USER_HOME

async def run_face_recognition(seed_username, target_image_path):
cache_file = os.path.join(CACHE_DIR, f"metadata_{seed_username}.json")
progress_file = os.path.join(CACHE_DIR, f"progress_{seed_username}.json")
match_log_file = os.path.join(CACHE_DIR, f"matches_{seed_username}.json")

temp_dir = USER_HOME / "temp"
temp_dir.mkdir(parents=True, exist_ok=True)
temp_path = str(temp_dir / "current_face.jpg")

if not os.path.exists(cache_file):
print(f"{R}[-] No cache found for @{seed_username}.{X}")
return


try:
target_img = face_recognition.load_image_file(target_image_path)
target_encoding = face_recognition.face_encodings(target_img)[0]
except Exception as e:
print(f"{R}[-] Target Error: {e}{X}")
return

with open(cache_file, "r") as f:
candidates = json.load(f)

processed_ids = set()
if os.path.exists(progress_file):
with open(progress_file, "r") as f:
processed_ids = set(json.load(f))

all_potential_matches = []
if os.path.exists(match_log_file):
with open(match_log_file, "r") as f:
all_potential_matches = json.load(f)

print(f"{G}[+] Resuming scan. Found {len(all_potential_matches)} previous matches:{X}")
for m in all_potential_matches:
print(f" {C}[PREVIOUS MATCH] @{m['username']} ({m['confidence']:.2f}% similarity){X}")

print(f"\n{Y}[i] Total: {len(candidates)} | Remaining: {len(candidates) - len(processed_ids)}{X}\n")

async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client:
for i, user in enumerate(candidates, 1):
user_id = str(user.get("pk") or user.get("id"))
if user_id in processed_ids:
continue

url = user.get("hd_pic_url") or user.get("profile_pic_url")
if not url: continue

print(f"\r{X}[{i}/{len(candidates)}] Testing @{user['username']}... ", end="", flush=True)

try:
resp = await client.get(url)
if resp.status_code == 200:
with open(temp_path, "wb") as tmp:
tmp.write(resp.content)

unknown_img = face_recognition.load_image_file(temp_path)
unknown_encs = face_recognition.face_encodings(unknown_img)

if unknown_encs:
dist = face_recognition.face_distance([target_encoding], unknown_encs[0])[0]
if dist <= 0.55:
conf = max(0, (1 - dist / 0.6) * 100)
new_match = {
"username": user['username'],
"full_name": user.get('full_name'),
"distance": dist,
"confidence": conf
}
all_potential_matches.append(new_match)

with open(match_log_file, "w") as f:
json.dump(all_potential_matches, f, indent=4)

print(f"\n{G}[LIVE MATCH] @{user['username']} ({conf:.2f}% similarity){X}")


processed_ids.add(user_id)
with open(progress_file, "w") as f:
json.dump(list(processed_ids), f)

del unknown_img
gc.collect()
except KeyboardInterrupt:
print(f"\n{Y}[!] Pausing scan. State saved.{X}")
return
except:
continue


if os.path.exists(temp_path): os.remove(temp_path)

print("\n" + "="*60)
print(f"{Y} FINAL REPORT (Sorted by Probability) {X}")
print("="*60)

sorted_matches = sorted(all_potential_matches, key=lambda x: x['confidence'], reverse=True)
for match in sorted_matches:
color = G if match['confidence'] > 50 else Y
print(f"{color}[{match['confidence']:.2f}%]{X} @{match['username']} - {match['full_name']}")


if os.path.exists(progress_file): os.remove(progress_file)
if os.path.exists(match_log_file): os.remove(match_log_file)
25 changes: 19 additions & 6 deletions instaspyder/core/search_logic.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
# search_logic.py (Modified for Async - cleaner output)
# search_logic.py (Modified for Async - Deep Metadata Dumping)
import asyncio
import random
from instaspyder.core.instagram_api import fetch_chain_async
from instaspyder.core.user_id_fetcher import get_user_id
from instaspyder.core.config_manager import get_config
from instaspyder.core.state_manager import silent_cache_metadata
from instaspyder.utils.sanitize_text import sanitize_text
from instaspyder.utils.colors import C, G, R, Y, X

async def recursive_chain_search_async(username, keywords_to_match, visited_users, all_found_matches, depth=0, known_user_id=None, depth_limit=None):
async def recursive_chain_search_async(username, keywords_to_match, visited_users, all_found_matches, depth=0, known_user_id=None, depth_limit=None, dump_mode=False, master_seed=None):
indent = " " * depth


if master_seed is None:
master_seed = username

# Fallback logic: if no limit was passed from main, check config
if depth_limit is None:
config = get_config()
Expand All @@ -34,17 +39,22 @@ async def recursive_chain_search_async(username, keywords_to_match, visited_user

users_in_chain = await fetch_chain_async(user_id)


if isinstance(users_in_chain, str) and "_ERROR" in users_in_chain:
raise Exception(f"Instagram Block: {users_in_chain}")

if not users_in_chain:
print(f"{indent}{Y}No users found in chain for @{username}{X}")
return

if dump_mode:
silent_cache_metadata(master_seed, users_in_chain)


for user_data in users_in_chain:
uname = user_data.get("username", "")
fname = user_data.get("full_name", "")
uid = user_data.get("id")
uid = user_data.get("pk") or user_data.get("id")

if not uname or not uid:
continue
Expand Down Expand Up @@ -74,19 +84,22 @@ async def recursive_chain_search_async(username, keywords_to_match, visited_user
print(f" Found via: @{username} (Depth {depth})")
print(f"{G}" + "="*60 + f"{X}\n")


if depth < depth_limit:
tasks = []
for user_data in users_in_chain:
await asyncio.sleep(random.uniform(0.1, 0.2))
await asyncio.sleep(random.uniform(0.1, 0.2))
tasks.append(
recursive_chain_search_async(
user_data["username"],
keywords_to_match,
visited_users,
all_found_matches,
depth + 1,
known_user_id=user_data.get("id"),
depth_limit=depth_limit # Pass the limit down the chain
known_user_id=user_data.get("pk") or user_data.get("id"),
depth_limit=depth_limit,
dump_mode=dump_mode,
master_seed=master_seed
)
)
if tasks:
Expand Down
Loading