diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..4e5beb3 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,24 @@ +name: ci + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.11" + - name: Install deps + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + - name: Lint (syntax) + run: | + python -m py_compile keyauth.py main.py diff --git a/README.md b/README.md index 82cdd14..93f8583 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,8 @@ KeyAuth Python example SDK for https://keyauth.cc license key API auth. +Credits: Nigel (Discord: chefendpoint, Telegram: ELF_Nigel) + ## **Bugs** If you are using our example with no significant changes, and you are having problems, please Report Bug here https://keyauth.cc/app/?page=forms @@ -31,7 +33,29 @@ Thank you for your compliance, we work hard on the development of KeyAuth and do KeyAuth is a powerful cloud-based authentication system designed to protect your software from piracy and unauthorized access. With KeyAuth, you can implement secure licensing, user management, and subscription systems in minutes. Client SDKs available for [C#](https://github.com/KeyAuth/KeyAuth-CSHARP-Example), [C++](https://github.com/KeyAuth/KeyAuth-CPP-Example), [Python](https://github.com/KeyAuth/KeyAuth-Python-Example), [Java](https://github.com/KeyAuth-Archive/KeyAuth-JAVA-api), [JavaScript](https://github.com/mazkdevf/KeyAuth-JS-Example), [VB.NET](https://github.com/KeyAuth/KeyAuth-VB-Example), [PHP](https://github.com/KeyAuth/KeyAuth-PHP-Example), [Rust](https://github.com/KeyAuth/KeyAuth-Rust-Example), [Go](https://github.com/mazkdevf/KeyAuth-Go-Example), [Lua](https://github.com/mazkdevf/KeyAuth-Lua-Examples), [Ruby](https://github.com/mazkdevf/KeyAuth-Ruby-Example), and [Perl](https://github.com/mazkdevf/KeyAuth-Perl-Example). KeyAuth has several unique features such as memory streaming, webhook function where you can send requests to API without leaking the API, discord webhook notifications, ban the user securely through the application at your discretion. Feel free to join https://t.me/keyauth if you have questions or suggestions. > [!TIP] -> https://vaultcord.com FREE Discord bot to Backup server, members, channels, messages & more. Custom verify page, block alt accounts, VPNs & more. +> Use the official KeyAuth examples and docs when troubleshooting to ensure your local code matches the latest API expectations. + +## Optional client-side delays & lockout helpers + +These helpers are client-side only and meant to slow down abuse. They do not replace server-side enforcement. + +```py +# Delay helpers +api.init_fail_delay() +api.bad_input_delay() +api.close_delay() + +# Lockout helpers +if api.lockout_active(): + print(f"Locked out, try again in {api.lockout_remaining_ms()} ms") + exit(1) + +# On failed login attempt: +api.record_login_fail() + +# On successful login: +api.reset_lockout() +``` ## **Customer connection issues?** diff --git a/keyauth.py b/keyauth.py index 4eb9558..b6c36cf 100644 --- a/keyauth.py +++ b/keyauth.py @@ -1,9 +1,10 @@ -import os -import json as jsond # json -import time # sleep before exit -import binascii # hex encoding -import platform # check platform -import subprocess # needed for mac device +import os +import json as jsond # json +import time # sleep before exit +import binascii # hex encoding +import platform # check platform +import subprocess # needed for mac device +import threading import qrcode from datetime import datetime, timezone, timedelta from discord_interactions import verify_key # used for signature verification @@ -28,11 +29,26 @@ os._exit(1) -class api: - - name = ownerid = version = hash_to_check = "" - - def __init__(self, name, ownerid, version, hash_to_check): +class api: + + name = ownerid = version = hash_to_check = "" + init_fail_delay_ms = 1500 + bad_input_delay_ms = 3000 + close_delay_ms = 5000 + _lockout_lock = threading.Lock() + _lockout_state = { + "fail_count": 0, + "first_fail_at": 0.0, + "locked_until": 0.0, + "last_fail_at": 0.0, + } + lockout_settings = { + "max_attempts": 5, + "window_seconds": 120.0, + "lockout_seconds": 300.0, + } + + def __init__(self, name, ownerid, version, hash_to_check): if len(ownerid) != 10: print("Visit https://keyauth.cc/app/, copy Pthon code, and replace code in main.py with that") time.sleep(3) @@ -43,17 +59,20 @@ def __init__(self, name, ownerid, version, hash_to_check): self.ownerid = ownerid self.version = version - self.hash_to_check = hash_to_check - self.init() + self.hash_to_check = hash_to_check + self._ban_monitor_stop = threading.Event() + self._ban_monitor_thread = None + self._ban_monitor_detected = False + self.init() sessionid = enckey = "" initialized = False - def init(self): - if self.sessionid != "": - print("You've already initialized!") - time.sleep(3) - os._exit(1) + def init(self): + if self.sessionid != "": + print("You've already initialized!") + time.sleep(3) + os._exit(1) post_data = { "type": "init", @@ -89,8 +108,70 @@ def init(self): time.sleep(3) os._exit(1) - self.sessionid = json["sessionid"] - self.initialized = True + self.sessionid = json["sessionid"] + self.initialized = True + + @staticmethod + def init_fail_delay(): + time.sleep(api.init_fail_delay_ms / 1000.0) + + @staticmethod + def bad_input_delay(): + time.sleep(api.bad_input_delay_ms / 1000.0) + + @staticmethod + def close_delay(): + time.sleep(api.close_delay_ms / 1000.0) + + @staticmethod + def lockout_state_snapshot(): + with api._lockout_lock: + return dict(api._lockout_state) + + @staticmethod + def lockout_active(): + with api._lockout_lock: + locked_until = api._lockout_state["locked_until"] + if locked_until <= 0: + return False + return time.time() < locked_until + + @staticmethod + def lockout_remaining_ms(): + with api._lockout_lock: + locked_until = api._lockout_state["locked_until"] + if locked_until <= 0: + return 0 + remaining = locked_until - time.time() + if remaining < 0: + return 0 + return int(remaining * 1000) + + @staticmethod + def record_login_fail(): + with api._lockout_lock: + now = time.time() + api._lockout_state["last_fail_at"] = now + + first_fail_at = api._lockout_state["first_fail_at"] + if first_fail_at <= 0 or (now - first_fail_at) > api.lockout_settings["window_seconds"]: + api._lockout_state["first_fail_at"] = now + api._lockout_state["fail_count"] = 1 + return + + api._lockout_state["fail_count"] += 1 + if api._lockout_state["fail_count"] >= api.lockout_settings["max_attempts"]: + api._lockout_state["locked_until"] = now + api.lockout_settings["lockout_seconds"] + + @staticmethod + def reset_lockout(): + with api._lockout_lock: + api._lockout_state = { + "fail_count": 0, + "first_fail_at": 0.0, + "locked_until": 0.0, + "last_fail_at": 0.0, + } def register(self, user, password, license, hwid=None): self.checkinit() @@ -354,8 +435,8 @@ def check(self): else: return False - def checkblacklist(self): - self.checkinit() + def checkblacklist(self): + self.checkinit() hwid = others.get_hwid() post_data = { @@ -368,10 +449,50 @@ def checkblacklist(self): response = self.__do_request(post_data) json = jsond.loads(response) - if json["success"]: - return True - else: - return False + if json["success"]: + return True + else: + return False + + def start_ban_monitor(self, interval_seconds=120, check_session=True, on_ban=None): + if interval_seconds < 1: + interval_seconds = 1 + + if self._ban_monitor_thread and self._ban_monitor_thread.is_alive(): + return + + self._ban_monitor_detected = False + self._ban_monitor_stop.clear() + + def _loop(): + while not self._ban_monitor_stop.is_set(): + if check_session and not self.check(): + self._ban_monitor_detected = True + if on_ban: + on_ban() + return + + if self.checkblacklist(): + self._ban_monitor_detected = True + if on_ban: + on_ban() + return + + self._ban_monitor_stop.wait(interval_seconds) + + self._ban_monitor_thread = threading.Thread(target=_loop, daemon=True) + self._ban_monitor_thread.start() + + def stop_ban_monitor(self): + self._ban_monitor_stop.set() + if self._ban_monitor_thread and self._ban_monitor_thread.is_alive(): + self._ban_monitor_thread.join(timeout=2) + + def ban_monitor_running(self): + return self._ban_monitor_thread is not None and self._ban_monitor_thread.is_alive() + + def ban_monitor_detected(self): + return self._ban_monitor_detected def log(self, message): self.checkinit() @@ -692,4 +813,4 @@ def get_hwid(): output = subprocess.Popen("ioreg -l | grep IOPlatformSerialNumber", stdout=subprocess.PIPE, shell=True).communicate()[0] serial = output.decode().split('=', 1)[1].replace(' ', '') hwid = serial[1:-2] - return hwid \ No newline at end of file + return hwid diff --git a/main.py b/main.py index 6fc86d7..cac0909 100644 --- a/main.py +++ b/main.py @@ -53,37 +53,46 @@ def getchecksum(): hash_to_check = getchecksum() ) -def answer(): - try: - print("""1.Login -2.Register -3.Upgrade -4.License Key Only - """) +def answer(): + try: + if api.lockout_active(): + print(f"Locked out, try again in {api.lockout_remaining_ms()} ms") + api.close_delay() + os._exit(1) + + print("""1.Login +2.Register +3.Upgrade +4.License Key Only + """) ans = input("Select Option: ") - if ans == "1": - user = input('Provide username: ') - password = input('Provide password: ') - code = input('Enter 2fa code: (not using 2fa? Just press enter)') - keyauthapp.login(user, password, code) - elif ans == "2": - user = input('Provide username: ') - password = input('Provide password: ') - license = input('Provide License: ') - keyauthapp.register(user, password, license) - elif ans == "3": - user = input('Provide username: ') - license = input('Provide License: ') - keyauthapp.upgrade(user, license) - elif ans == "4": - key = input('Enter your license: ') - code = input('Enter 2fa code: (not using 2fa? Just press enter)') - keyauthapp.license(key, code) - else: - print("\nInvalid option") - sleep(1) - clear() - answer() + if ans == "1": + user = input('Provide username: ') + password = input('Provide password: ') + code = input('Enter 2fa code: (not using 2fa? Just press enter)') + keyauthapp.login(user, password, code) + api.reset_lockout() + elif ans == "2": + user = input('Provide username: ') + password = input('Provide password: ') + license = input('Provide License: ') + keyauthapp.register(user, password, license) + api.reset_lockout() + elif ans == "3": + user = input('Provide username: ') + license = input('Provide License: ') + keyauthapp.upgrade(user, license) + api.reset_lockout() + elif ans == "4": + key = input('Enter your license: ') + code = input('Enter 2fa code: (not using 2fa? Just press enter)') + keyauthapp.license(key, code) + api.reset_lockout() + else: + print("\nInvalid option") + api.bad_input_delay() + clear() + answer() except KeyboardInterrupt: os._exit(1) diff --git a/requirements.txt b/requirements.txt index b57953d..7d14c59 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ requests -pywin32 +pywin32; platform_system == "Windows" discord-interactions qrcode pillow