From 128881591518cbfc450f3746425bdc41548f2447 Mon Sep 17 00:00:00 2001 From: Judah Paul Date: Sun, 12 Apr 2026 22:05:12 -0400 Subject: [PATCH] Restructure package and modernize build pipeline Split monolithic __init__.py into _base.py, _legacy.py, _typed.py, and _types.py. Add typed API returning dataclasses with exception- based error handling. Legacy dict-returning API preserved. Migrate from setup.py to pyproject.toml. Drop urllib3 dependency, require Python >=3.10. Delete setup.py, setup.cfg, MANIFEST.in, requirements.txt, .bumpversion.cfg, and old test example scripts. Rewrite test suite with HTTP-layer mocking instead of method-level mocking. Gate build/publish on release event, extract version from release tag. Switch to pypa/gh-action-pypi-publish and python -m build. Fix .readthedocs.yaml to install from pyproject.toml. Update README, API reference, and all docs with typed API methods, exceptions, dataclasses, enums, and basic auth constructor params. Fix PyPI badge URL. --- .bumpversion.cfg | 6 - .github/workflows/workflow.yaml | 81 ++--- .gitignore | 20 +- .readthedocs.yaml | 13 +- MANIFEST.in | 2 - README.md | 194 ++++++---- docs/api_reference.md | 283 +++++++++++---- docs/getting_started.md | 32 +- docs/index.md | 25 +- docs/usage/operations.md | 149 ++++++-- pyproject.toml | 50 +++ requirements.txt | 4 - setup.cfg | 2 - setup.py | 86 ----- tests/list_backup_counts.py | 42 --- tests/stop_all.py | 12 - tests/urbackup_api_test.py | 626 ++++++++++++++++++++++++-------- urbackup/__init__.py | 525 +++----------------------- urbackup/_base.py | 174 +++++++++ urbackup/_legacy.py | 258 +++++++++++++ urbackup/_typed.py | 261 +++++++++++++ urbackup/_types.py | 220 +++++++++++ 22 files changed, 2039 insertions(+), 1026 deletions(-) delete mode 100644 .bumpversion.cfg delete mode 100644 MANIFEST.in create mode 100644 pyproject.toml delete mode 100644 requirements.txt delete mode 100644 setup.cfg delete mode 100644 setup.py delete mode 100644 tests/list_backup_counts.py delete mode 100644 tests/stop_all.py create mode 100644 urbackup/_base.py create mode 100644 urbackup/_legacy.py create mode 100644 urbackup/_typed.py create mode 100644 urbackup/_types.py diff --git a/.bumpversion.cfg b/.bumpversion.cfg deleted file mode 100644 index 9591f02..0000000 --- a/.bumpversion.cfg +++ /dev/null @@ -1,6 +0,0 @@ -[bumpversion] -current_version = 0.1.1 -commit = True -tag = True - -[bumpversion:file:setup.py] \ No newline at end of file diff --git a/.github/workflows/workflow.yaml b/.github/workflows/workflow.yaml index 7c9c56e..e7fcfb7 100644 --- a/.github/workflows/workflow.yaml +++ b/.github/workflows/workflow.yaml @@ -15,33 +15,32 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [3.8, 3.9, '3.10', 3.11, 3.12] + python-version: ['3.10', '3.11', '3.12', '3.13'] steps: - uses: actions/checkout@v4 - + - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - + - name: Cache pip uses: actions/cache@v4 with: path: ~/.cache/pip - key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }} + key: ${{ runner.os }}-pip-${{ hashFiles('pyproject.toml') }} restore-keys: | ${{ runner.os }}-pip- - + - name: Install dependencies run: | pip install --upgrade pip - pip install -r requirements.txt - pip install pytest coverage coveralls + pip install -e ".[dev]" - name: Run tests with coverage run: | - coverage run --source=urbackup -m pytest tests/ + coverage run -m pytest tests/ coverage report -m - name: Coveralls @@ -53,29 +52,28 @@ jobs: build: needs: test runs-on: ubuntu-latest - if: github.ref == 'refs/heads/master' + if: github.event_name == 'release' steps: - uses: actions/checkout@v4 - - - name: Bump patch version - if: github.ref == 'refs/heads/master' + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Update version from tag run: | - pip install bump2version - bump2version patch - env: - GIT_COMMITTER_NAME: github-actions[bot] - GIT_COMMITTER_EMAIL: github-actions[bot]@users.noreply.github.com - GIT_AUTHOR_NAME: github-actions[bot] - GIT_AUTHOR_EMAIL: github-actions[bot]@users.noreply.github.com + VERSION="${GITHUB_REF_NAME#v}" + sed -i "s/^version = .*/version = \"$VERSION\"/" pyproject.toml - name: Build package run: | - pip install setuptools wheel - python setup.py sdist bdist_wheel + pip install build + python -m build - name: Upload build artifacts - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: dist path: dist/* @@ -83,41 +81,30 @@ jobs: publish-testpypi: needs: build runs-on: ubuntu-latest - if: github.ref == 'refs/heads/master' + if: github.event_name == 'release' steps: - - uses: actions/checkout@v4 - - uses: actions/setup-python@v4 - with: - python-version: '3.9' - - uses: actions/download-artifact@v2 + - uses: actions/download-artifact@v4 with: name: dist path: dist + - name: Publish to TestPyPI - run: | - pip install twine - twine upload --repository testpypi dist/* - env: - TWINE_USERNAME: __token__ - TWINE_PASSWORD: ${{ secrets.TESTPYPI_API_TOKEN }} + uses: pypa/gh-action-pypi-publish@release/v1 + with: + repository-url: https://test.pypi.org/legacy/ + password: ${{ secrets.TESTPYPI_API_TOKEN }} deploy: needs: publish-testpypi runs-on: ubuntu-latest - if: github.ref == 'refs/heads/master' + if: github.event_name == 'release' steps: - - uses: actions/checkout@v4 - - uses: actions/setup-python@v4 - with: - python-version: '3.9' - - uses: actions/download-artifact@v2 + - uses: actions/download-artifact@v4 with: name: dist path: dist + - name: Publish to PyPI - run: | - pip install twine - twine upload dist/* - env: - TWINE_USERNAME: __token__ - TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }} + uses: pypa/gh-action-pypi-publish@release/v1 + with: + password: ${{ secrets.PYPI_API_TOKEN }} diff --git a/.gitignore b/.gitignore index c23cd5a..3e3b33c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,10 +1,12 @@ -python_urbackup.egg-info/* -dist/python-urbackup-*.zip -urbackup/__pycache__/* -test.py +*.egg-info/ +dist/ +build/ *.pyc -__pycache__ -env -build -dist -site \ No newline at end of file +__pycache__/ +.pytest_cache/ +.coverage +htmlcov/ +env/ +.venv/ +site/ +test.py \ No newline at end of file diff --git a/.readthedocs.yaml b/.readthedocs.yaml index 12db41c..1154b79 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -1,24 +1,19 @@ -# Read the Docs configuration file for MkDocs projects -# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details - -# Required version: 2 -# Set the Python version build: os: ubuntu-22.04 tools: python: "3.12" -# Build documentation in the "docs/" directory with MkDocs mkdocs: configuration: mkdocs.yml -# Optionally build your docs in additional formats such as HTML formats: - htmlzip -# Declare the Python requirements required to build your documentation python: install: - - requirements: requirements.txt + - method: pip + path: . + extra_requirements: + - dev diff --git a/MANIFEST.in b/MANIFEST.in deleted file mode 100644 index 7ea2843..0000000 --- a/MANIFEST.in +++ /dev/null @@ -1,2 +0,0 @@ -include README.md -include LICENSE.txt \ No newline at end of file diff --git a/README.md b/README.md index 8398cbe..332fe53 100644 --- a/README.md +++ b/README.md @@ -1,118 +1,174 @@ # Python UrBackup [![urbackup.org](docs/urbackup.png)](https://www.urbackup.org/) [![PyPI](https://img.shields.io/pypi/v/python-urbackup)](https://pypi.org/project/python-urbackup/) -![PyPI - Python Version](https://img.shields.io/pypi/pyversions/dirconfig) +![PyPI - Python Version](https://img.shields.io/pypi/pyversions/python-urbackup) [![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/judahpaul16/python-urbackup/workflow.yaml)](https://github.com/judahpaul16/python-urbackup/actions) [![Coverage Status](https://coveralls.io/repos/github/judahpaul16/python-urbackup/badge.svg?branch=master&kill_cache=1)](https://coveralls.io/github/judahpaul16/python-urbackup?branch=master) [![Documentation Status](https://readthedocs.org/projects/python-urbackup/badge/?version=latest)](https://python-urbackup.readthedocs.io/en/latest/?badge=latest) -Python UrBackup is a powerful Python wrapper designed to interact with UrBackup servers. This library allows you to manage backups, restore operations, and monitor the server status programmatically. +Python wrapper to interact with UrBackup servers. Manage backups, restore operations, and monitor server status programmatically. *Originally [urbackup-server-web-api-wrapper](https://github.com/uroni/urbackup-server-python-web-api-wrapper)* ## Installation -Install with: ```bash pip install python-urbackup ``` -## Usage - -### Initialization - -Create an instance of the `urbackup_server` by specifying the server URL, username, and password. Replace `'your_server_url'`, `'your_username'`, and `'your_password'` with your server details: +## Quick Start ```python from urbackup import urbackup_server -server = urbackup_server('your_server_url', 'your_username', 'your_password') +server = urbackup_server("http://127.0.0.1:55414/x", "admin", "password") + +if server.login(): + print("Connected!") ``` -### Logging In +## Legacy API (dict-based) -To perform any operations, you need to log in: +The legacy API returns raw dicts/lists, preserving backward compatibility. ```python -if server.login(): - print("Login successful!") -else: - print("Login failed!") -``` +from urbackup import urbackup_server + +server = urbackup_server("http://127.0.0.1:55414/x", "admin", "password") -### Getting Client Status +# Client status +clients = server.get_status() +for client in clients: + print(f"{client['name']}: online={client['online']}") -Retrieve the status of a specific client: +# Start backups +server.start_incr_file_backup("my-client") +server.start_full_image_backup("my-client") -```python -client_status = server.get_client_status('client_name') -if client_status: - print(f"Client status: {client_status}") -else: - print("Client not found or access denied.") -``` +# Settings +settings = server.get_global_settings() +server.set_global_setting("backup_window", "1-5/8-17") -### Downloading an Installer +# Client settings +server.change_client_setting("my-client", "internet_speed", "50000") -To download an installer for a new client, specify the file path and the client's name: +# Extra clients +server.add_extra_client("10.0.0.5") +extras = server.get_extra_clients() -```python -if server.download_installer('path/to/installer', 'new_client_name'): - print("Installer downloaded successfully.") -else: - print("Failed to download installer.") +# Monitor actions +actions = server.get_actions() +for action in actions or []: + server.stop_action(action) + +# Live log +log = server.get_livelog() ``` -### Starting Backups +## Typed API (dataclass-based) -You can start different types of backups for a client. Here are examples of starting an incremental file backup and a full file backup: +The typed API returns structured dataclass objects with full type hints. ```python -if server.start_incr_file_backup('client_name'): - print("Incremental file backup started successfully.") -else: - print("Failed to start incremental file backup.") - -if server.start_full_file_backup('client_name'): - print("Full file backup started successfully.") -else: - print("Failed to start full file backup.") +from urbackup import urbackup_server, BackupType, ClientNotFoundError + +server = urbackup_server("http://127.0.0.1:55414/x", "admin", "password") +server.login() + +# Typed client statuses +clients = server.get_client_statuses() +for client in clients: + print(f"{client.name}: id={client.id}, online={client.online}") + +# Find specific client (raises ClientNotFoundError if missing) +try: + client = server.get_client_status_by_name("my-client") + print(f"Last backup: {client.lastbackup}") +except ClientNotFoundError: + print("Client not found") + +# Start backup with enum +server.start_backup_typed("my-client", BackupType.INCR_FILE) + +# Typed usage stats +for entry in server.get_usage_typed(): + print(f"{entry.name}: {entry.used} bytes, {entry.files} files") + +# Typed actions with progress +for action in server.get_actions_typed(): + print(f"Client {action.clientid}: {action.progress_percent}%") + +# Typed settings (raises AccessDeniedError on failure) +settings = server.get_global_settings_typed() +server.set_global_setting_typed("backup_window", "1-7/0-24") + +# Client settings (raises KeyError for invalid keys) +server.change_client_setting_typed("my-client", "internet_speed", "50000") + +# Typed log entries +for entry in server.get_livelog_typed(): + print(f"[{entry.level}] {entry.message}") + +# Backup history +backups = server.get_backups(clientid=1) +images = server.get_image_backups(clientid=1) + +# Extra clients +for ec in server.get_extra_clients_typed(): + print(f"{ec.hostname} (id={ec.id})") + +# Server identity +identity = server.get_server_identity_str() ``` -### Managing Clients +## Settings Example -Add a new client to the server: +Configure a client's backup folder path and incremental backup interval: ```python -new_client = server.add_client('new_client_name') -if new_client: - print("New client added:", new_client) -else: - print("Failed to add new client.") +server.change_client_setting("my-client", "default_dirs", "/home;/etc") +server.change_client_setting("my-client", "update_freq_incr", "4") # hours ``` -List clients with no file backup in the last three days: +## Basic Auth (htpasswd) + +If your server uses HTTP basic authentication: ```python -import urbackup -import time -import datetime -server = urbackup.urbackup_server("http://127.0.0.1:55414/x", "admin", "foo") -clients = server.get_status() -diff_time = 3*24*60*60 # 3 days -for client in clients: - if client["lastbackup"]=="-" or client["lastbackup"] < time.time() - diff_time: +server = urbackup_server( + "http://127.0.0.1:55414/x", "admin", "password", + basic_username="httpuser", basic_password="httppass", +) +``` - if client["lastbackup"]=="-" or client["lastbackup"]==0: - lastbackup = "Never" - else: - lastbackup = datetime.datetime.fromtimestamp(client["lastbackup"]).strftime("%x %X") +## Exception Hierarchy - print("Last file backup at {lastbackup} of client {clientname} is older than three days".format( - lastbackup=lastbackup, clientname=client["name"] ) ) +``` +UrbackupError +├── AuthenticationError # login failed +├── ClientNotFoundError # client name not found +├── AccessDeniedError # insufficient permissions +└── UserAlreadyExistsError # duplicate user creation ``` -For more information, please refer to the [API Reference](https://python-urbackup.readthedocs.io/en/latest/api_reference/). +## Available Types + +| Type | Description | +|------|-------------| +| `ClientStatus` | Client info: id, name, online, lastbackup, ip, etc. | +| `BackupEntry` | Backup record: id, clientid, backuptime, size_bytes | +| `UsageEntry` | Storage usage: name, used, files, images | +| `ActionProgress` | Running action: clientid, id, action, progress_percent | +| `LogEntry` | Log record: id, message, level, time | +| `ExtraClient` | Extra client: id, hostname | +| `User` | User account: id, name, rights | +| `Group` | Client group: id, name | +| `BackupType` | Enum: INCR_FILE, FULL_FILE, INCR_IMAGE, FULL_IMAGE | +| `ActionType` | IntEnum: all server action type constants | +| `InstallerOS` | Enum: WINDOWS, LINUX | +| `LogLevel` | IntEnum: ERROR, WARNING, INFO, DEBUG | + +All dataclasses include a `raw` field with the original API dict. ## UrBackup CLI @@ -120,7 +176,7 @@ The UrBackup CLI is a command-line interface that allows you to interact with th *Important Note: For Windows the command-line tool is `urbackupclient_cmd`. Mac and Linux use `urbackupclientctl`.* -CLI options for `urbackupclientctl` and `urbackupclientctl` are as follows: +CLI options for `urbackupclientctl` and `urbackupclient_cmd` are as follows: ```sh USAGE: @@ -159,10 +215,10 @@ Get specific command help with urbackupclientctl --help For more information, please refer to the [UrBackup Administration Documentation](https://www.urbackup.org/administration_manual.html). -## Contributing 🤝 +## Contributing Please read [CONTRIBUTING.md](CONTRIBUTING.md) for details on our code of conduct, and the process for submitting pull requests. -## License 📃 +## License This project is licensed under the Apache License - see the [LICENSE](LICENSE) file for details. diff --git a/docs/api_reference.md b/docs/api_reference.md index 77117e6..6980f55 100644 --- a/docs/api_reference.md +++ b/docs/api_reference.md @@ -1,70 +1,213 @@ -# API Reference - -This section provides detailed documentation for all functions and classes available in the Python UrBackup API. - -## Classes - -### `installer_os` - -An enumeration of supported operating systems for installers. - -- `Windows`: Represents a Windows operating system. -- `Linux`: Represents a Linux operating system. - -### `urbackup_server` - -This class provides methods to interact with the UrBackup server. - -#### Attributes - -- `server_basic_username`: Username for basic authentication. -- `server_basic_password`: Password for basic authentication. -- `action_incr_file`: Constant for incremental file backup action. -- `action_full_file`: Constant for full file backup action. -- `action_incr_image`: Constant for incremental image backup action. -- `action_full_image`: Constant for full image backup action. -- `action_resumed_incr_file`: Constant for resumed incremental file backup action. -- `action_resumed_full_file`: Constant for resumed full file backup action. -- `action_file_restore`: Constant for file restore action. -- `action_image_restore`: Constant for image restore action. -- `action_client_update`: Constant for client update action. -- `action_check_db_integrity`: Constant for database integrity check action. -- `action_backup_db`: Constant for database backup action. -- `action_recalc_stats`: Constant for recalculating statistics action. - -#### Methods - -- `__init__(url, username, password)`: Initializes a new `urbackup_server` instance. -- `login()`: Logs in to the UrBackup server and returns `True` on success. -- `get_client_status(client_name)`: Retrieves the status of a specified client. -- `download_installer(installer_fn, new_clientname, os='linux')`: Downloads the installer for a specified client and operating system. -- `add_client(clientname)`: Adds a new client to the server. -- `get_global_settings()`: Retrieves global settings from the server. -- `set_global_setting(key, new_value)`: Sets a global setting on the server. -- `get_client_settings(clientname)`: Retrieves settings for a specified client. -- `change_client_setting(clientname, key, new_value)`: Changes a setting for a specified client. -- `get_client_authkey(clientname)`: Retrieves the authentication key for a specified client. -- `get_server_identity()`: Retrieves the server's identity. -- `get_status()`: Retrieves the overall status of the server. -- `get_livelog(clientid=0)`: Retrieves live log entries for a specified client. -- `get_usage()`: Retrieves usage statistics from the server. -- `get_extra_clients()`: Retrieves a list of extra clients added to the server. -- `start_incr_file_backup(clientname)`: Starts an incremental file backup for a specified client. -- `start_full_file_backup(clientname)`: Starts a full file backup for a specified client. -- `start_incr_image_backup(clientname)`: Starts an incremental image backup for a specified client. -- `start_full_image_backup(clientname)`: Starts a full image backup for a specified client. -- `add_extra_client(addr)`: Adds an extra client by hostname. -- `remove_extra_client(ecid)`: Removes an extra client by ID. -- `get_actions()`: Retrieves current actions being performed on the server. -- `stop_action(action)`: Stops a specific action on the server. - -#### Private Methods - -- `_get_response(action, params, method='POST')`: Sends a request to the server and retrieves the response. -- `_get_json(action, params={})`: Sends a JSON request to the server and parses the response. -- `_download_file(action, outputfn, params)`: Downloads a file from the server based on specified parameters. -- `_md5(s)`: Calculates the MD5 hash of a given string. - -Please refer to the source code for more detailed information on each method, including parameters and expected response types. - -For more specific information regarding urbackup administration, please refer to the [official UrBackup documentation](https://www.urbackup.org/administration_manual.html). \ No newline at end of file +# API Reference + +## `urbackup_server` + +The main class for interacting with a UrBackup server. Inherits both the legacy (dict-returning) and typed (dataclass-returning) APIs. + +### Constructor + +```python +urbackup_server( + url: str, + username: str, + password: str, + *, + basic_username: str = "", + basic_password: str = "", +) +``` + +| Parameter | Description | +|-----------|-------------| +| `url` | Server URL (e.g. `http://127.0.0.1:55414/x`) | +| `username` | UrBackup admin username | +| `password` | UrBackup admin password | +| `basic_username` | Optional HTTP basic auth username (for .htpasswd) | +| `basic_password` | Optional HTTP basic auth password | + +### Authentication + +- `login() -> bool`: Log in to the server. Tries anonymous login first, falls back to username/password with MD5 or PBKDF2 hashing. + +### Legacy API (dict-returning) + +These methods return raw dicts/lists or `None` on failure. + +| Method | Returns | Description | +|--------|---------|-------------| +| `get_status()` | `list[dict] \| None` | All client statuses | +| `get_client_status(clientname)` | `dict \| None` | Single client status | +| `get_server_identity()` | `str \| None` | Server identity string | +| `get_usage()` | `list[dict] \| None` | Storage usage per client | +| `get_actions()` | `list[dict] \| None` | Currently running actions | +| `get_livelog(clientid=0)` | `list[dict] \| None` | Live log entries | +| `get_extra_clients()` | `list[dict] \| None` | Extra clients list | +| `get_global_settings()` | `dict \| None` | Global server settings | +| `get_client_settings(clientname)` | `dict \| None` | Settings for a client | +| `get_client_authkey(clientname)` | `str \| None` | Client internet auth key | +| `set_global_setting(key, new_value)` | `bool` | Update a global setting | +| `change_client_setting(clientname, key, new_value)` | `bool` | Update a client setting | +| `add_client(clientname)` | `dict \| None` | Add a new client | +| `add_extra_client(addr)` | `bool` | Add an extra client by hostname | +| `remove_extra_client(ecid)` | `bool` | Remove an extra client | +| `start_incr_file_backup(clientname)` | `bool` | Start incremental file backup | +| `start_full_file_backup(clientname)` | `bool` | Start full file backup | +| `start_incr_image_backup(clientname)` | `bool` | Start incremental image backup | +| `start_full_image_backup(clientname)` | `bool` | Start full image backup | +| `stop_action(action)` | `bool` | Stop a running action | +| `download_installer(installer_fn, new_clientname, os='linux')` | `bool` | Download client installer | + +### Typed API (dataclass-returning) + +These methods return structured dataclass objects and raise exceptions on failure. + +| Method | Returns | Description | +|--------|---------|-------------| +| `get_client_statuses()` | `list[ClientStatus]` | All client statuses | +| `get_client_status_by_name(clientname)` | `ClientStatus` | Single client (raises `ClientNotFoundError`) | +| `get_server_identity_str()` | `str` | Server identity string | +| `get_usage_typed()` | `list[UsageEntry]` | Storage usage per client | +| `get_actions_typed()` | `list[ActionProgress]` | Currently running actions | +| `get_livelog_typed(clientid=0)` | `list[LogEntry]` | Live log entries | +| `get_extra_clients_typed()` | `list[ExtraClient]` | Extra clients list | +| `get_backups(clientid)` | `list[BackupEntry]` | File backups for a client | +| `get_image_backups(clientid)` | `list[BackupEntry]` | Image backups for a client | +| `get_global_settings_typed()` | `dict` | Global settings (raises `AccessDeniedError`) | +| `set_global_setting_typed(key, value)` | `bool` | Update a global setting | +| `get_client_settings_typed(clientname)` | `dict` | Client settings (raises `AccessDeniedError`) | +| `change_client_setting_typed(clientname, key, new_value)` | `bool` | Update a client setting (raises `KeyError`) | +| `start_backup_typed(clientname, backup_type)` | `bool` | Start backup with `BackupType` enum | +| `stop_action_typed(action)` | `bool` | Stop a running `ActionProgress` | +| `get_users()` | `list[User]` | List server users | +| `get_groups()` | `list[Group]` | List client groups | +| `add_user(username, password, rights='all')` | `bool` | Add user (raises `UserAlreadyExistsError`) | +| `download_installer_typed(installer_fn, new_clientname, os=InstallerOS.LINUX)` | `bool` | Download client installer | + +## Exceptions + +All exceptions inherit from `UrbackupError`. + +| Exception | Raised when | +|-----------|-------------| +| `AuthenticationError` | Login fails | +| `ClientNotFoundError` | Client name not found on server | +| `AccessDeniedError` | Insufficient permissions for the operation | +| `UserAlreadyExistsError` | Attempting to create a duplicate user | + +## Data Types + +### `ClientStatus` + +| Field | Type | Description | +|-------|------|-------------| +| `id` | `int` | Client ID | +| `name` | `str` | Client name | +| `online` | `bool` | Whether client is online | +| `lastbackup` | `int` | Last file backup timestamp | +| `lastbackup_image` | `int` | Last image backup timestamp | +| `file_ok` | `bool` | File backup status OK | +| `image_ok` | `bool` | Image backup status OK | +| `ip` | `str` | Client IP address | +| `client_version_string` | `str` | Client software version | +| `os_version_string` | `str` | Client OS version | +| `raw` | `dict` | Original API response dict | + +### `BackupEntry` + +| Field | Type | Description | +|-------|------|-------------| +| `id` | `int` | Backup ID | +| `clientid` | `int` | Client ID | +| `backuptime` | `int` | Backup timestamp | +| `incremental` | `int` | Whether backup is incremental | +| `size_bytes` | `int` | Backup size in bytes | +| `raw` | `dict` | Original API response dict | + +### `UsageEntry` + +| Field | Type | Description | +|-------|------|-------------| +| `name` | `str` | Client name | +| `used` | `int` | Bytes used | +| `files` | `int` | Number of file backups | +| `images` | `int` | Number of image backups | +| `raw` | `dict` | Original API response dict | + +### `ActionProgress` + +| Field | Type | Description | +|-------|------|-------------| +| `clientid` | `int` | Client ID | +| `id` | `int` | Action ID | +| `name` | `str` | Client name | +| `action` | `int` | Action type constant | +| `progress_percent` | `int` | Progress percentage (-1 if unknown) | +| `raw` | `dict` | Original API response dict | + +### `LogEntry` + +| Field | Type | Description | +|-------|------|-------------| +| `id` | `int` | Log entry ID | +| `message` | `str` | Log message | +| `level` | `int` | Log level (see `LogLevel`) | +| `time` | `int` | Timestamp | +| `raw` | `dict` | Original API response dict | + +### `ExtraClient` + +| Field | Type | Description | +|-------|------|-------------| +| `id` | `int` | Extra client ID | +| `hostname` | `str` | Hostname or IP | +| `raw` | `dict` | Original API response dict | + +### `User` + +| Field | Type | Description | +|-------|------|-------------| +| `id` | `int` | User ID | +| `name` | `str` | Username | +| `rights` | `str` | Permission rights string | +| `raw` | `dict` | Original API response dict | + +### `Group` + +| Field | Type | Description | +|-------|------|-------------| +| `id` | `int` | Group ID | +| `name` | `str` | Group name | +| `raw` | `dict` | Original API response dict | + +## Enums + +### `BackupType` + +| Value | Description | +|-------|-------------| +| `INCR_FILE` | Incremental file backup | +| `FULL_FILE` | Full file backup | +| `INCR_IMAGE` | Incremental image backup | +| `FULL_IMAGE` | Full image backup | + +### `ActionType` + +Integer enum of all server action type constants: `INCR_FILE` (1), `FULL_FILE` (2), `INCR_IMAGE` (3), `FULL_IMAGE` (4), `RESUMED_INCR_FILE` (5), `RESUMED_FULL_FILE` (6), `FILE_RESTORE` (8), `IMAGE_RESTORE` (9), `CLIENT_UPDATE` (10), `CHECK_DB_INTEGRITY` (11), `BACKUP_DB` (12), `RECALC_STATS` (13). + +### `InstallerOS` + +| Value | Description | +|-------|-------------| +| `WINDOWS` | Windows installer | +| `LINUX` | Linux installer | + +### `LogLevel` + +| Value | Description | +|-------|-------------| +| `ERROR` (1) | Error messages | +| `WARNING` (2) | Warning messages | +| `INFO` (3) | Informational messages | +| `DEBUG` (4) | Debug messages | + +For more information on the UrBackup server API, refer to the [official UrBackup documentation](https://www.urbackup.org/administration_manual.html). diff --git a/docs/getting_started.md b/docs/getting_started.md index 63a4d5c..3948fc2 100644 --- a/docs/getting_started.md +++ b/docs/getting_started.md @@ -1,8 +1,10 @@ -# Getting Started with Python UrBackup +# Getting Started -## Installation +## Requirements + +Python 3.10 or later. -To install Python UrBackup, you'll need Python installed on your system. Python UrBackup supports Python 3.x. +## Installation ```bash pip install python-urbackup @@ -10,12 +12,28 @@ pip install python-urbackup ## Configuration -After installation, you'll need to configure the library with your UrBackup server details: - ```python from urbackup import urbackup_server -server = urbackup_server('your_server_url', 'your_username', 'your_password') +server = urbackup_server("http://127.0.0.1:55414/x", "admin", "password") +``` + +If your server uses HTTP basic authentication (.htpasswd): + +```python +server = urbackup_server( + "http://127.0.0.1:55414/x", "admin", "password", + basic_username="httpuser", basic_password="httppass", +) +``` + +## Logging In + +```python +if server.login(): + print("Login successful!") +else: + print("Login failed!") ``` -This is the basic configuration needed to start interacting with your UrBackup server. \ No newline at end of file +Login is called automatically by all API methods, but you can call it explicitly to verify connectivity. diff --git a/docs/index.md b/docs/index.md index 70bf2ac..0b761b9 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,19 +1,21 @@ # Python UrBackup [![urbackup.org](urbackup.png)](https://www.urbackup.org/) [![PyPI](https://img.shields.io/pypi/v/python-urbackup)](https://pypi.org/project/python-urbackup/) -![PyPI - Python Version](https://img.shields.io/pypi/pyversions/dirconfig) +![PyPI - Python Version](https://img.shields.io/pypi/pyversions/python-urbackup) [![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/judahpaul16/python-urbackup/workflow.yaml)](https://github.com/judahpaul16/python-urbackup/actions) [![Coverage Status](https://coveralls.io/repos/github/judahpaul16/python-urbackup/badge.svg?branch=master&kill_cache=1)](https://coveralls.io/github/judahpaul16/python-urbackup?branch=master) [![Documentation Status](https://readthedocs.org/projects/python-urbackup/badge/?version=latest)](https://python-urbackup.readthedocs.io/en/latest/?badge=latest) -Python UrBackup is a powerful Python wrapper designed to interact with UrBackup servers. This library allows you to manage backups, restore operations, and monitor the server status programmatically. +Python wrapper to interact with UrBackup servers. Manage backups, restore operations, and monitor server status programmatically. *Originally [urbackup-server-web-api-wrapper](https://github.com/uroni/urbackup-server-python-web-api-wrapper)* ## Features -- Easy to use Python API. -- Manage and monitor UrBackup server operations. -- Download and configure client installers. +- Legacy dict-returning API for backward compatibility +- Typed dataclass-returning API with full type hints +- Structured exception hierarchy for error handling +- MD5 and PBKDF2 authentication support +- HTTP basic auth (htpasswd) support ## Installation @@ -21,19 +23,18 @@ Python UrBackup is a powerful Python wrapper designed to interact with UrBackup pip install python-urbackup ``` -For more detailed installation instructions, see the [Getting Started](getting_started.md) section. +Requires Python 3.10 or later. -## Quick Start +For more detailed setup, see the [Getting Started](getting_started.md) section. -Here's a quick example to get you started: +## Quick Start ```python from urbackup import urbackup_server -# Initialize and log in -server = urbackup_server('your_server_url', 'your_username', 'your_password') +server = urbackup_server("http://127.0.0.1:55414/x", "admin", "password") if server.login(): - print("Login successful!") + print("Connected!") ``` -For more examples and usage instructions, visit the [Usage](usage/initialization.md) section. +For usage examples, visit the [Usage](usage/initialization.md) section. For the full method listing, see the [API Reference](api_reference.md). diff --git a/docs/usage/operations.md b/docs/usage/operations.md index 7988a11..f0ae93d 100644 --- a/docs/usage/operations.md +++ b/docs/usage/operations.md @@ -1,45 +1,148 @@ # Operations -## Getting Client Status +## Legacy API (dict-based) -To get the status of a specific client: +### Getting Client Status ```python -client_status = server.get_client_status('client_name') +client_status = server.get_client_status("client_name") if client_status: print(f"Client status: {client_status}") -else: - print("Client not found or access denied.") ``` -## Downloading an Installer +### Starting Backups -To download an installer for a new client: +```python +server.start_incr_file_backup("client_name") +server.start_full_file_backup("client_name") +server.start_incr_image_backup("client_name") +server.start_full_image_backup("client_name") +``` + +### Settings + +```python +settings = server.get_global_settings() +server.set_global_setting("backup_window", "1-5/8-17") + +server.change_client_setting("client_name", "default_dirs", "/home;/etc") +server.change_client_setting("client_name", "update_freq_incr", "4") +``` + +### Managing Clients ```python -if server.download_installer('path/to/installer', 'new_client_name'): - print("Installer downloaded successfully.") -else: - print("Failed to download installer.") +new_client = server.add_client("new_client_name") + +server.add_extra_client("10.0.0.5") +extras = server.get_extra_clients() ``` -## Starting Backups +### Monitoring -Start an incremental file backup: +```python +actions = server.get_actions() +for action in actions or []: + server.stop_action(action) + +log = server.get_livelog() +usage = server.get_usage() +``` + +### Download Installer ```python -if server.start_incr_file_backup('client_name'): - print("Incremental file backup started successfully.") +server.download_installer("installer.sh", "new_client", os="linux") ``` -## Managing Clients +## Typed API (dataclass-based) + +The typed API returns structured dataclass objects and raises exceptions instead of returning `None`. -Add a new client to the server: +### Client Status ```python -new_client = server.add_client('new_client_name') -if new_client: - print("New client added:", new_client) -else: - print("Failed to add new client.") -``` \ No newline at end of file +from urbackup import ClientNotFoundError + +clients = server.get_client_statuses() +for client in clients: + print(f"{client.name}: id={client.id}, online={client.online}") + +try: + client = server.get_client_status_by_name("my-client") + print(f"Last backup: {client.lastbackup}") +except ClientNotFoundError: + print("Client not found") +``` + +### Starting Backups + +```python +from urbackup import BackupType + +server.start_backup_typed("client_name", BackupType.INCR_FILE) +server.start_backup_typed("client_name", BackupType.FULL_IMAGE) +``` + +### Backup History + +```python +backups = server.get_backups(clientid=1) +for b in backups: + print(f"Backup {b.id}: {b.size_bytes} bytes at {b.backuptime}") + +images = server.get_image_backups(clientid=1) +``` + +### Settings + +```python +settings = server.get_global_settings_typed() +server.set_global_setting_typed("backup_window", "1-7/0-24") + +client_settings = server.get_client_settings_typed("my-client") +server.change_client_setting_typed("my-client", "internet_speed", "50000") +``` + +### Monitoring + +```python +for action in server.get_actions_typed(): + print(f"Client {action.clientid}: {action.progress_percent}%") + server.stop_action_typed(action) + +for entry in server.get_livelog_typed(): + print(f"[{entry.level}] {entry.message}") + +for entry in server.get_usage_typed(): + print(f"{entry.name}: {entry.used} bytes") +``` + +### Extra Clients + +```python +for ec in server.get_extra_clients_typed(): + print(f"{ec.hostname} (id={ec.id})") +``` + +### Users and Groups + +```python +from urbackup import UserAlreadyExistsError + +users = server.get_users() +groups = server.get_groups() + +try: + server.add_user("newadmin", "password123") +except UserAlreadyExistsError: + print("User already exists") +``` + +### Download Installer + +```python +from urbackup import InstallerOS + +server.download_installer_typed("installer.sh", "new_client", os=InstallerOS.LINUX) +``` diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..e1b8d2e --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,50 @@ +[build-system] +requires = ["setuptools>=64", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "python-urbackup" +version = "0.2.0" +description = "Python wrapper to access and control an UrBackup server" +readme = "README.md" +license = "Apache-2.0" +requires-python = ">=3.10" +authors = [ + { name = "Judah Paul", email = "me@judahpaul.com" }, +] +keywords = ["urbackup", "web", "api", "client", "backup"] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", +] +dependencies = [ + "requests", +] + +[project.urls] +Homepage = "https://github.com/judahpaul16/python-urbackup" +Documentation = "https://python-urbackup.readthedocs.io/" +Repository = "https://github.com/judahpaul16/python-urbackup" + +[project.optional-dependencies] +dev = [ + "pytest", + "coverage", + "coveralls", + "mkdocs-material", + "mkdocs-git-revision-date-localized-plugin", +] + +[tool.setuptools.packages.find] +include = ["urbackup*"] +exclude = ["tests*", "docs*"] + +[tool.pytest.ini_options] +testpaths = ["tests"] + +[tool.coverage.run] +source = ["urbackup"] diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index a23c28b..0000000 --- a/requirements.txt +++ /dev/null @@ -1,4 +0,0 @@ -requests -urllib3 -mkdocs-material -mkdocs-git-revision-date-localized-plugin \ No newline at end of file diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 224a779..0000000 --- a/setup.cfg +++ /dev/null @@ -1,2 +0,0 @@ -[metadata] -description-file = README.md \ No newline at end of file diff --git a/setup.py b/setup.py deleted file mode 100644 index 437e45a..0000000 --- a/setup.py +++ /dev/null @@ -1,86 +0,0 @@ -# Always prefer setuptools over distutils -from setuptools import setup, find_packages -# To use a consistent encoding -from codecs import open -from os import path - -here = path.abspath(path.dirname(__file__)) - -# Get the long description from the README file -with open(path.join(here, 'README.md'), encoding='utf-8') as f: - long_description = f.read() - -setup( - name='python-urbackup', - - # Versions should comply with PEP440. For a discussion on single-sourcing - # the version across setup.py and the project code, see - # https://packaging.python.org/en/latest/single_source_version.html - version='0.1.1', - - description='Python wrapper to access and control an UrBackup server', - long_description=long_description, - long_description_content_type='text/markdown', - - # The project's main homepage. - url='https://github.com/judahpaul16/python-urbackup', - - # Author details - author='Judah Paul', - author_email='me@judahpaul.com', - - # Choose your license - license='Apache License 2.0', - - # See https://pypi.python.org/pypi?%3Aaction=list_classifiers - classifiers=[ - # How mature is this project? Common values are - # 3 - Alpha - # 4 - Beta - # 5 - Production/Stable - 'Development Status :: 3 - Alpha', - - # Indicate who your project is intended for - 'Intended Audience :: Developers', - - # Pick your license as you wish (should match "license" above) - 'License :: OSI Approved :: Apache Software License', - - # Specify the Python versions you support here. In particular, ensure - # that you indicate whether you support Python 2, Python 3 or both. - 'Programming Language :: Python :: 3.8', - 'Programming Language :: Python :: 3.9', - 'Programming Language :: Python :: 3.10', - 'Programming Language :: Python :: 3.11', - 'Programming Language :: Python :: 3.12', - ], - - # What does your project relate to? - keywords='urbackup web api client', - - # You can just specify the packages manually here if your project is - # simple. Or you can use find_packages(). - packages=find_packages(exclude=['contrib', 'docs', 'tests']), - - # Alternatively, if you want to distribute just a my_module.py, uncomment - # this: - # py_modules=["my_module"], - - # List run-time dependencies here. These will be installed by pip when - # your project is installed. For an analysis of "install_requires" vs pip's - # requirements files see: - # https://packaging.python.org/en/latest/requirements.html - install_requires=[ - 'requests', - 'urllib3' - ], - - # List additional groups of dependencies here (e.g. development - # dependencies). You can install these using the following syntax, - # for example: - # $ pip install -e .[dev,test] - extras_require={ - 'dev': [], - 'test': [], - } -) diff --git a/tests/list_backup_counts.py b/tests/list_backup_counts.py deleted file mode 100644 index 1da0425..0000000 --- a/tests/list_backup_counts.py +++ /dev/null @@ -1,42 +0,0 @@ -import urbackup - -server = urbackup.urbackup_server("http://127.0.0.1:55414/x", "admin", "foo") - -clients = server.get_clients_with_group() - -for client in clients: - - file_backups = server.get_clientbackups(client["id"]) - - incr_file = 0 - full_file = 0 - - for file_backup in file_backups: - - if file_backup["incremental"]>0: - incr_file+=1 - else: - full_file+=1 - - incr_image = 0 - full_image = 0 - - image_backups = server.get_clientimagebackups(client["id"]) - - for image_backup in image_backups: - - if image_backup["letter"]=="SYSVOL" or image_backup["letter"]=="ESP": - continue - - if image_backup["incremental"]>0: - incr_image+=1 - else: - full_image+=1 - - print("Client {clientname} in group {groupname} has {incr_file} incr file backups, {full_file} " - "full file backups, {incr_image} incr image backups and " - "{full_image} full image backups".format( - incr_file=incr_file, clientname=client["name"], - full_file=full_file, incr_image=incr_image, - full_image=full_image, groupname=client["groupname"]) ) - diff --git a/tests/stop_all.py b/tests/stop_all.py deleted file mode 100644 index 84572e4..0000000 --- a/tests/stop_all.py +++ /dev/null @@ -1,12 +0,0 @@ -import urbackup - - -server = urbackup.urbackup_server("http://127.0.0.1:55414/x", "admin", "foo") - -for action in server.get_actions(): - a = action["action"] - if a ==server.action_full_file or a==server.action_resumed_full_file: - print("Running full file backup: "+action["name"]) - - print("Stopping...") - server.stop_action(action) \ No newline at end of file diff --git a/tests/urbackup_api_test.py b/tests/urbackup_api_test.py index 012bc88..79b0723 100644 --- a/tests/urbackup_api_test.py +++ b/tests/urbackup_api_test.py @@ -1,150 +1,476 @@ -import unittest -from unittest.mock import patch, MagicMock -from urbackup import urbackup_server, installer_os -import datetime -import time - -class TestUrBackupServer(unittest.TestCase): - def setUp(self): - self.server_url = "http://127.0.0.1:55414/x" - self.username = "admin" - self.password = "foo" - self.server = urbackup_server(self.server_url, self.username, self.password) - - @patch('urbackup.urbackup_server.get_server_identity') - def test_get_server_identity(self, mock_get_server_identity): - mock_get_server_identity.return_value = {'id': 'server1', 'name': 'TestServer'} - server_identity = self.server.get_server_identity() - self.assertEqual(server_identity, {'id': 'server1', 'name': 'TestServer'}) - - @patch('urbackup.urbackup_server.start_incr_file_backup') - def test_start_incr_file_backup(self, mock_start_backup): - mock_start_backup.return_value = True - result = self.server.start_incr_file_backup('client1') - self.assertTrue(result) - mock_start_backup.assert_called_with('client1') - - @patch('urbackup.urbackup_server.start_full_file_backup') - def test_start_full_file_backup(self, mock_start_backup): - mock_start_backup.return_value = True - result = self.server.start_full_file_backup('client1') - self.assertTrue(result) - mock_start_backup.assert_called_with('client1') - - @patch('urbackup.urbackup_server.start_incr_image_backup') - def test_start_incr_image_backup(self, mock_start_backup): - mock_start_backup.return_value = True - result = self.server.start_incr_image_backup('client1') - self.assertTrue(result) - mock_start_backup.assert_called_with('client1') - - @patch('urbackup.urbackup_server.start_full_image_backup') - def test_start_full_image_backup(self, mock_start_backup): - mock_start_backup.return_value = True - result = self.server.start_full_image_backup('client1') - self.assertTrue(result) - mock_start_backup.assert_called_with('client1') - - @patch('urbackup.urbackup_server.get_actions') - def test_get_actions(self, mock_get_actions): - mock_get_actions.return_value = [{'id': 1, 'type': 'backup', 'status': 'running'}] - actions = self.server.get_actions() - self.assertEqual(actions, [{'id': 1, 'type': 'backup', 'status': 'running'}]) - - @patch('urbackup.urbackup_server.stop_action') - def test_stop_action(self, mock_stop_action): - action = {'clientid': 'client1', 'id': 1} - mock_stop_action.return_value = True - result = self.server.stop_action(action) - self.assertTrue(result) - mock_stop_action.assert_called_with(action) - - @patch('urbackup.urbackup_server.login') - @patch('urbackup.urbackup_server.get_client_status') - @patch('urbackup.urbackup_server.get_client_settings') - @patch('urbackup.urbackup_server._get_json') - def test_change_client_setting(self, mock_get_json, mock_get_client_settings, mock_get_client_status, mock_login): - # Mock the login method to return True - mock_login.return_value = True - - # Set up the other mocks as before - mock_get_client_status.return_value = {'id': 'client1', 'name': 'TestClient'} - mock_get_client_settings.return_value = {'key1': 'value1', 'key2': 'value2'} - mock_get_json.return_value = {'saved_ok': True} - - # Execute the method to test - result = self.server.change_client_setting('TestClient', 'key1', 'new_value') - - # Verify that login was called - mock_login.assert_called_once() - - # Check the results - self.assertTrue(result) - - # Ensure _get_json was called with the correct parameters - mock_get_json.assert_called_with('settings', {'key1': 'new_value', 'key2': 'value2', 'overwrite': 'true', 'sa': 'clientsettings_save', 't_clientid': 'client1'}) - - @patch('urbackup.urbackup_server.get_extra_clients') - @patch('urbackup.urbackup_server.remove_extra_client') - def test_manage_extra_clients(self, mock_remove, mock_get_clients): - # Mocking the get_extra_clients and remove_extra_client methods - mock_get_clients.return_value = [{'id': 'client1'}, {'id': 'client2'}] - - # Execute the method to test - extra_clients = self.server.get_extra_clients() - for extra_client in extra_clients: - self.server.remove_extra_client(extra_client["id"]) - - # Assert that remove_extra_client was called for each client - mock_remove.assert_any_call('client1') - mock_remove.assert_any_call('client2') - self.assertEqual(mock_remove.call_count, 2) - - @patch('urbackup.urbackup_server.add_extra_client') - def test_add_extra_clients(self, mock_add_client): - # Setup - computernames = ["2.2.2.2", "3.3.3.3"] - mock_add_client.return_value = True - - # Test - for ip in computernames: - result = self.server.add_extra_client(ip) - self.assertTrue(result) - mock_add_client.assert_called_with(ip) - - @patch('urbackup.urbackup_server.get_status') - @patch('urbackup.urbackup_server.get_usage') - def test_check_status_and_usage(self, mock_get_usage, mock_get_status): - # Mocking responses - mock_get_status.return_value = [{'name': 'client1', 'lastbackup': 1590000000}] - mock_get_usage.return_value = [{'client': 'client1', 'data': 1000}] - - # Execution - clients = self.server.get_status() - usage = self.server.get_usage() - - # Verify length and data integrity - self.assertEqual(len(clients), len(usage)) - self.assertGreater(len(clients), 0) - self.assertEqual(clients[0]['name'], 'client1') - self.assertEqual(usage[0]['data'], 1000) - - @patch('urbackup.urbackup_server.get_status') - def test_backup_status_check(self, mock_get_status): - mock_get_status.return_value = [{'name': 'client1', 'lastbackup': time.time() - (4 * 24 * 60 * 60)}] - clients = self.server.get_status() - for client in clients: - if client['lastbackup'] < time.time() - (3 * 24 * 60 * 60): - lastbackup = datetime.datetime.fromtimestamp(client['lastbackup']).strftime('%x %X') - self.assertTrue("Last file backup at" in f"Last file backup at {lastbackup} of client {client['name']} is older than three days") - - @patch('urbackup.urbackup_server.download_installer') - def test_download_installer(self, mock_download): - # Setup - mock_download.return_value = True - result = self.server.download_installer("test.exe", "test", installer_os.Windows) - self.assertTrue(result) - mock_download.assert_called_with("test.exe", "test", installer_os.Windows) - -if __name__ == '__main__': - unittest.main() +import json +from unittest.mock import MagicMock, patch + +import pytest + +from urbackup import ( + AccessDeniedError, + ActionProgress, + AuthenticationError, + BackupEntry, + BackupType, + ClientNotFoundError, + ClientStatus, + ExtraClient, + InstallerOS, + LogEntry, + UsageEntry, + urbackup_server, +) + + +def _mock_response(json_data, status_code=200): + resp = MagicMock() + resp.status_code = status_code + resp.json.return_value = json_data + return resp + + +def _login_then(action_response): + def side_effect(url, **kwargs): + data = kwargs.get("data", {}) or kwargs.get("params", {}) + if "a=login" in url and not data.get("username"): + return _mock_response({"success": True, "session": "test_ses"}) + return _mock_response(action_response) + return side_effect + + +@pytest.fixture +def server(): + return urbackup_server("http://127.0.0.1:55414/x", "admin", "foo") + + +class TestLogin: + + @patch("requests.post") + def test_anonymous_login(self, mock_post, server): + mock_post.return_value = _mock_response( + {"success": True, "session": "anon_ses"} + ) + assert server.login() is True + assert server._session == "anon_ses" + assert server._logged_in is True + + @patch("requests.post") + def test_password_login_md5(self, mock_post, server): + responses = [ + _mock_response({"success": False}), + _mock_response({"ses": "salt_ses", "salt": "abc", "rnd": "xyz"}), + _mock_response({"success": True}), + ] + mock_post.side_effect = responses + assert server.login() is True + assert server._logged_in is True + + @patch("requests.post") + def test_password_login_pbkdf2(self, mock_post, server): + responses = [ + _mock_response({"success": False}), + _mock_response( + {"ses": "s", "salt": "abc", "rnd": "xyz", "pbkdf2_rounds": 10000} + ), + _mock_response({"success": True}), + ] + mock_post.side_effect = responses + assert server.login() is True + + @patch("requests.post") + def test_login_bad_password(self, mock_post, server): + responses = [ + _mock_response({"success": False}), + _mock_response({"ses": "s", "salt": "abc", "rnd": "xyz"}), + _mock_response({"success": False}), + ] + mock_post.side_effect = responses + assert server.login() is False + + @patch("requests.post") + def test_login_unknown_user(self, mock_post, server): + responses = [ + _mock_response({"success": False}), + _mock_response({}), + ] + mock_post.side_effect = responses + assert server.login() is False + + @patch("requests.post") + def test_already_logged_in(self, mock_post, server): + server._logged_in = True + assert server.login() is True + mock_post.assert_not_called() + + +class TestLegacyAPI: + + @patch("requests.post", side_effect=_login_then( + {"status": [{"id": 1, "name": "pc1", "online": True}], "server_identity": "id123"} + )) + def test_get_status(self, mock_post, server): + clients = server.get_status() + assert len(clients) == 1 + assert clients[0]["name"] == "pc1" + + @patch("requests.post", side_effect=_login_then( + {"status": [{"id": 1, "name": "pc1"}], "server_identity": "id123"} + )) + def test_get_server_identity(self, mock_post, server): + assert server.get_server_identity() == "id123" + + @patch("requests.post", side_effect=_login_then( + {"status": [{"id": 1, "name": "pc1"}]} + )) + def test_get_client_status(self, mock_post, server): + client = server.get_client_status("pc1") + assert client["id"] == 1 + + @patch("requests.post", side_effect=_login_then( + {"status": [{"id": 1, "name": "pc1"}]} + )) + def test_get_client_status_not_found(self, mock_post, server): + assert server.get_client_status("nonexistent") is None + + @patch("requests.post", side_effect=_login_then( + {"progress": [{"clientid": 1, "id": 10, "action": 1, "pcdone": 50}]} + )) + def test_get_actions(self, mock_post, server): + actions = server.get_actions() + assert len(actions) == 1 + assert actions[0]["pcdone"] == 50 + + @patch("requests.post", side_effect=_login_then( + {"usage": [{"name": "pc1", "used": 1024}]} + )) + def test_get_usage(self, mock_post, server): + usage = server.get_usage() + assert usage[0]["used"] == 1024 + + @patch("requests.post", side_effect=_login_then( + {"logdata": [{"id": 5, "msg": "test log", "loglevel": 3, "time": 100}]} + )) + def test_get_livelog(self, mock_post, server): + log = server.get_livelog() + assert log[0]["msg"] == "test log" + assert server._lastlogid == 5 + + @patch("requests.post", side_effect=_login_then( + {"status": [], "extra_clients": [{"id": 1, "hostname": "10.0.0.1"}]} + )) + def test_get_extra_clients(self, mock_post, server): + extras = server.get_extra_clients() + assert extras[0]["hostname"] == "10.0.0.1" + + @patch("requests.post") + def test_start_incr_file_backup(self, mock_post, server): + mock_post.side_effect = [ + _mock_response({"success": True, "session": "s"}), + _mock_response({"status": [{"id": 1, "name": "pc1"}]}), + _mock_response({"result": [{"start_ok": True}]}), + ] + assert server.start_incr_file_backup("pc1") is True + + @patch("requests.post") + def test_start_full_file_backup(self, mock_post, server): + mock_post.side_effect = [ + _mock_response({"success": True, "session": "s"}), + _mock_response({"status": [{"id": 1, "name": "pc1"}]}), + _mock_response({"result": [{"start_ok": True}]}), + ] + assert server.start_full_file_backup("pc1") is True + + @patch("requests.post") + def test_start_incr_image_backup(self, mock_post, server): + mock_post.side_effect = [ + _mock_response({"success": True, "session": "s"}), + _mock_response({"status": [{"id": 1, "name": "pc1"}]}), + _mock_response({"result": [{"start_ok": True}]}), + ] + assert server.start_incr_image_backup("pc1") is True + + @patch("requests.post") + def test_start_full_image_backup(self, mock_post, server): + mock_post.side_effect = [ + _mock_response({"success": True, "session": "s"}), + _mock_response({"status": [{"id": 1, "name": "pc1"}]}), + _mock_response({"result": [{"start_ok": True}]}), + ] + assert server.start_full_image_backup("pc1") is True + + @patch("requests.post") + def test_stop_action(self, mock_post, server): + mock_post.side_effect = [ + _mock_response({"success": True, "session": "s"}), + _mock_response({"progress": []}), + ] + assert server.stop_action({"clientid": 1, "id": 10}) is True + + def test_stop_action_missing_keys(self, server): + assert server.stop_action({}) is False + + @patch("requests.post") + def test_add_extra_client(self, mock_post, server): + mock_post.side_effect = [ + _mock_response({"success": True, "session": "s"}), + _mock_response({"status": []}), + ] + assert server.add_extra_client("10.0.0.2") is True + + @patch("requests.post") + def test_remove_extra_client(self, mock_post, server): + mock_post.side_effect = [ + _mock_response({"success": True, "session": "s"}), + _mock_response({"status": []}), + ] + assert server.remove_extra_client(1) is True + + @patch("requests.post") + def test_add_client(self, mock_post, server): + mock_post.side_effect = [ + _mock_response({"success": True, "session": "s"}), + _mock_response({"new_clientid": 5, "new_authkey": "abc"}), + ] + result = server.add_client("new_pc") + assert result["new_clientid"] == 5 + + @patch("requests.post") + def test_add_client_already_exists(self, mock_post, server): + mock_post.side_effect = [ + _mock_response({"success": True, "session": "s"}), + _mock_response({"already_exists": True}), + ] + assert server.add_client("existing_pc") is None + + @patch("requests.post") + def test_get_global_settings(self, mock_post, server): + mock_post.side_effect = [ + _mock_response({"success": True, "session": "s"}), + _mock_response({"settings": {"backup_window": "1-7/0-24"}}), + ] + settings = server.get_global_settings() + assert settings["backup_window"] == "1-7/0-24" + + @patch("requests.post") + def test_set_global_setting(self, mock_post, server): + mock_post.side_effect = [ + _mock_response({"success": True, "session": "s"}), + _mock_response({"settings": {"backup_window": "1-7/0-24"}}), + _mock_response({"saved_ok": True}), + ] + assert server.set_global_setting("backup_window", "1-5/8-17") is True + + @patch("requests.post") + def test_change_client_setting(self, mock_post, server): + mock_post.side_effect = [ + _mock_response({"success": True, "session": "s"}), + _mock_response({"status": [{"id": 1, "name": "pc1"}]}), + _mock_response({"status": [{"id": 1, "name": "pc1"}]}), + _mock_response({"settings": {"key1": "old", "key2": "val2"}}), + _mock_response({"saved_ok": True}), + ] + assert server.change_client_setting("pc1", "key1", "new") is True + + @patch("requests.post") + def test_change_client_setting_key_not_found(self, mock_post, server): + mock_post.side_effect = [ + _mock_response({"success": True, "session": "s"}), + _mock_response({"status": [{"id": 1, "name": "pc1"}]}), + _mock_response({"status": [{"id": 1, "name": "pc1"}]}), + _mock_response({"settings": {"key1": "old"}}), + ] + assert server.change_client_setting("pc1", "nonexistent", "val") is False + + @patch("requests.post") + def test_get_client_authkey(self, mock_post, server): + mock_post.side_effect = [ + _mock_response({"success": True, "session": "s"}), + _mock_response({"status": [{"id": 1, "name": "pc1"}]}), + _mock_response({"settings": {"internet_authkey": "secret123"}}), + ] + assert server.get_client_authkey("pc1") == "secret123" + + +class TestTypedAPI: + + @patch("requests.post", side_effect=_login_then( + {"status": [{"id": 1, "name": "pc1", "online": True}]} + )) + def test_get_client_statuses(self, mock_post, server): + clients = server.get_client_statuses() + assert len(clients) == 1 + assert isinstance(clients[0], ClientStatus) + assert clients[0].name == "pc1" + assert clients[0].id == 1 + + @patch("requests.post", side_effect=_login_then( + {"status": [{"id": 1, "name": "pc1"}]} + )) + def test_get_client_status_by_name(self, mock_post, server): + client = server.get_client_status_by_name("pc1") + assert isinstance(client, ClientStatus) + assert client.id == 1 + + @patch("requests.post", side_effect=_login_then( + {"status": [{"id": 1, "name": "pc1"}]} + )) + def test_get_client_status_by_name_not_found(self, mock_post, server): + with pytest.raises(ClientNotFoundError): + server.get_client_status_by_name("nonexistent") + + @patch("requests.post", side_effect=_login_then( + {"status": [{"id": 1, "name": "pc1"}], "server_identity": "id123"} + )) + def test_get_server_identity_str(self, mock_post, server): + assert server.get_server_identity_str() == "id123" + + @patch("requests.post", side_effect=_login_then( + {"status": [], "extra_clients": [{"id": 1, "hostname": "10.0.0.1"}]} + )) + def test_get_extra_clients_typed(self, mock_post, server): + extras = server.get_extra_clients_typed() + assert len(extras) == 1 + assert isinstance(extras[0], ExtraClient) + assert extras[0].hostname == "10.0.0.1" + + @patch("requests.post", side_effect=_login_then( + {"progress": [{"clientid": 1, "id": 10, "action": 1, "pcdone": 50}]} + )) + def test_get_actions_typed(self, mock_post, server): + actions = server.get_actions_typed() + assert len(actions) == 1 + assert isinstance(actions[0], ActionProgress) + assert actions[0].progress_percent == 50 + + @patch("requests.post", side_effect=_login_then( + {"usage": [{"name": "pc1", "used": 2048, "files": 10, "images": 2}]} + )) + def test_get_usage_typed(self, mock_post, server): + usage = server.get_usage_typed() + assert isinstance(usage[0], UsageEntry) + assert usage[0].used == 2048 + + @patch("requests.post", side_effect=_login_then( + {"logdata": [{"id": 7, "msg": "backup done", "loglevel": 3, "time": 200}]} + )) + def test_get_livelog_typed(self, mock_post, server): + logs = server.get_livelog_typed() + assert isinstance(logs[0], LogEntry) + assert logs[0].message == "backup done" + assert server._lastlogid == 7 + + @patch("requests.post") + def test_start_backup_typed(self, mock_post, server): + mock_post.side_effect = [ + _mock_response({"success": True, "session": "s"}), + _mock_response({"status": [{"id": 1, "name": "pc1"}]}), + _mock_response({"result": [{"start_ok": True}]}), + ] + assert server.start_backup_typed("pc1", BackupType.INCR_FILE) is True + + @patch("requests.post") + def test_require_login_raises(self, mock_post, server): + mock_post.return_value = _mock_response({}, status_code=500) + with pytest.raises(AuthenticationError): + server.get_client_statuses() + + @patch("requests.post", side_effect=_login_then( + {"backups": [{"id": 100, "clientid": 1, "backuptime": 99999}]} + )) + def test_get_backups(self, mock_post, server): + backups = server.get_backups(clientid=1) + assert len(backups) == 1 + assert isinstance(backups[0], BackupEntry) + assert backups[0].id == 100 + + @patch("requests.post", side_effect=_login_then( + {"settings": {"backup_window": "1-7/0-24"}} + )) + def test_get_global_settings_typed(self, mock_post, server): + settings = server.get_global_settings_typed() + assert settings["backup_window"] == "1-7/0-24" + + @patch("requests.post") + def test_set_global_setting_typed(self, mock_post, server): + mock_post.side_effect = [ + _mock_response({"success": True, "session": "s"}), + _mock_response({"settings": {"backup_window": "1-7/0-24"}}), + _mock_response({"saved_ok": True}), + ] + assert server.set_global_setting_typed("backup_window", "1-5/8-17") is True + + @patch("requests.post") + def test_change_client_setting_typed(self, mock_post, server): + mock_post.side_effect = [ + _mock_response({"success": True, "session": "s"}), + _mock_response({"status": [{"id": 1, "name": "pc1"}]}), + _mock_response({"status": [{"id": 1, "name": "pc1"}]}), + _mock_response({"settings": {"key1": "old"}}), + _mock_response({"saved_ok": True}), + ] + assert server.change_client_setting_typed("pc1", "key1", "new") is True + + @patch("requests.post") + def test_change_client_setting_typed_key_not_found(self, mock_post, server): + mock_post.side_effect = [ + _mock_response({"success": True, "session": "s"}), + _mock_response({"status": [{"id": 1, "name": "pc1"}]}), + _mock_response({"status": [{"id": 1, "name": "pc1"}]}), + _mock_response({"settings": {"key1": "old"}}), + ] + with pytest.raises(KeyError): + server.change_client_setting_typed("pc1", "nonexistent", "val") + + +class TestDataclasses: + + def test_client_status_from_dict(self): + data = {"id": 1, "name": "pc1", "online": True, "ip": "192.168.1.1"} + cs = ClientStatus.from_dict(data) + assert cs.id == 1 + assert cs.online is True + assert cs.raw == data + + def test_backup_entry_from_dict(self): + data = {"id": 10, "clientid": 1, "backuptime": 12345, "size_bytes": 999} + be = BackupEntry.from_dict(data) + assert be.size_bytes == 999 + + def test_log_entry_from_dict(self): + data = {"id": 1, "msg": "hello", "loglevel": 3, "time": 100} + le = LogEntry.from_dict(data) + assert le.message == "hello" + assert le.level == 3 + + def test_action_progress_from_dict(self): + data = {"clientid": 1, "id": 5, "pcdone": 75, "action": 1} + ap = ActionProgress.from_dict(data) + assert ap.progress_percent == 75 + + def test_extra_client_from_dict(self): + data = {"id": 1, "hostname": "10.0.0.1"} + ec = ExtraClient.from_dict(data) + assert ec.hostname == "10.0.0.1" + + def test_usage_entry_from_dict(self): + data = {"name": "pc1", "used": 1024, "files": 5, "images": 2} + ue = UsageEntry.from_dict(data) + assert ue.files == 5 + + +class TestConstructor: + + def test_basic_auth(self): + s = urbackup_server( + "http://localhost/x", "admin", "pass", + basic_username="user", basic_password="pw", + ) + headers = s._build_headers() + assert "Authorization" in headers + assert headers["Authorization"].startswith("Basic ") + + def test_no_basic_auth(self): + s = urbackup_server("http://localhost/x", "admin", "pass") + headers = s._build_headers() + assert "Authorization" not in headers + + def test_url_trailing_slash_stripped(self): + s = urbackup_server("http://localhost/x/", "admin", "pass") + assert s._server_url == "http://localhost/x" diff --git a/urbackup/__init__.py b/urbackup/__init__.py index 4a39e3c..9b4961a 100644 --- a/urbackup/__init__.py +++ b/urbackup/__init__.py @@ -1,476 +1,49 @@ -from enum import Enum -import requests -from base64 import b64encode -import hashlib -import binascii -import logging - -logger = logging.getLogger('python-urbackup') - - -"""Backwards compatible imports""" -try: - import urllib -except ImportError: - import urllib.parse - - -class installer_os(Enum): - Windows = "windows", - Linux = "linux" - -class urbackup_server: - - #If you have basic authentication via .htpasswd - server_basic_username = '' - server_basic_password = '' - - action_incr_file = 1 - action_full_file = 2 - action_incr_image = 3 - action_full_image = 4 - action_resumed_incr_file = 5 - action_resumed_full_file = 6 - action_file_restore = 8 - action_image_restore = 9 - action_client_update = 10 - action_check_db_integrity = 11 - action_backup_db = 12 - action_recalc_stats = 13 - - _session = '' - _logged_in = False - _lastlogid = 0 - - def __init__(self, url, username, password): - self._server_url = url - self._server_username = username - self._server_password = password - - def _get_response(self, action, params, method='POST'): - - headers = { - 'Accept': 'application/json', - 'Content-Type': 'application/json; charset=UTF-8' - } - - if('server_basic_username' in globals() and len(self.server_basic_username)>0): - userAndPass = b64encode( - str.encode('%s:%s' % (self.server_basic_username, self.server_basic_password)) - ).decode('ascii') - headers['Authorization'] = 'Basic %s' % userAndPass - - try: - action_url = '%s?%s' % (self._server_url, urllib.urlencode({'a': action})) - except AttributeError: - action_url = '%s?%s' % (self._server_url, urllib.parse.urlencode({'a': action})) - - if(len(self._session)>0): - params['ses'] = self._session - - if method == 'POST': - r = requests.post( - action_url, - data=params - ) - elif method == 'GET': - r = requests.get( - action_url, - params=params - ) - else: - raise Exception('Request with method \'%s\' has not been implemented yet.' % method) - - return r - - def _get_json(self, action, params = {}): - tries = 50 - - while tries>0: - response = self._get_response(action, params) - - if(response.status_code == 200): - break - - tries -= 1 - if tries <= 0: - return None - else: - logger.error('API call failed. Retrying...') - - return response.json() - - def _download_file(self, action, outputfn, params): - - req = self._get_response(action, params, 'GET') - - if req.status_code != 200: - return False - - with open(outputfn, 'wb') as stream: - for chunk in req.iter_content(chunk_size=128): - stream.write(chunk) - - return True - - def _md5(self, s): - return hashlib.md5(s.encode()).hexdigest() - - def login(self): - - if not self._logged_in: - - logger.debug('Trying anonymous login...') - - login = self._get_json('login', {}) - - if not login or 'success' not in login or not login['success'] : - logger.debug('Logging in...') - - salt = self._get_json('salt', {'username': self._server_username}) - if not salt or not ('ses' in salt): - logger.warning('Username does not exist') - return False - - self._session = salt['ses'] - - if 'salt' in salt: - password_md5_bin = hashlib.md5((salt['salt']+self._server_password).encode()).digest() - password_md5 = binascii.hexlify(password_md5_bin).decode() - - if 'pbkdf2_rounds' in salt: - pbkdf2_rounds = int(salt['pbkdf2_rounds']) - if pbkdf2_rounds>0: - password_md5 = binascii.hexlify(hashlib.pbkdf2_hmac('sha256', password_md5_bin, - salt['salt'].encode(), pbkdf2_rounds)).decode() - - password_md5 = self._md5(salt['rnd']+password_md5) - - login = self._get_json('login', { - 'username': self._server_username, - 'password': password_md5 }) - - if not login or 'success' not in login or not login['success']: - logger.warning('Error during login. Password wrong?') - return False - - else: - self._logged_in = True - return True - else: - return False - else: - self._logged_in = True - self._session = login['session'] - return True - else: - return True - - def get_client_status(self, clientname): - - if not self.login(): - return None - - status = self._get_json('status') - - if not status: - return None - - if not 'status' in status: - return None - - for client in status['status']: - - if client['name'] == clientname: - - return client - - logger.warning('Could not find client status. No permission?') - return None - - def download_installer(self, installer_fn, new_clientname, os='linux'): - """Download installer for os, defaults to linux""" - - if not os.lower() in ['linux', 'osx', 'mac', 'windows']: - raise Exception('OS not supported') - - if not self.login(): - return False - - new_client = self._get_json('add_client', {'clientname': new_clientname}) - - if 'already_exists' in new_client: - - status = self.get_client_status(new_clientname) - - if status == None: - return False - - return self._download_file('download_client', installer_fn, {'clientid': status['id'] }) - - - if not 'new_authkey' in new_client: - return False - - return self._download_file('download_client', installer_fn, - { - 'clientid': new_client['new_clientid'], - 'authkey': new_client['new_authkey'], - 'os': os - }) - - def add_client(self, clientname): - - if not self.login(): - return None - - ret = self._get_json('add_client', { 'clientname': clientname}) - if ret==None or 'already_exists' in ret: - return None - - return ret - - def get_global_settings(self): - if not self.login(): - return None - - settings = self._get_json('settings', {'sa': 'general'} ) - - if not settings or not 'settings' in settings: - return None - - return settings['settings'] - - def set_global_setting(self, key, new_value): - if not self.login(): - return False - - settings = self._get_json('settings', {'sa': 'general'} ) - - if not settings or not 'settings' in settings: - return False - - settings['settings'][key] = new_value - settings['settings']['sa'] = 'general_save' - - ret = self._get_json('settings', settings['settings']) - - return ret != None and 'saved_ok' in ret - - def get_client_settings(self, clientname): - - if not self.login(): - return None - - client = self.get_client_status(clientname) - - if client == None: - return None - - clientid = client['id'] - - settings = self._get_json('settings', {'sa': 'clientsettings', - 't_clientid': clientid}) - - if not settings or not 'settings' in settings: - return None - - return settings['settings'] - - def change_client_setting(self, clientname, key, new_value): - if not self.login(): - return False - - client = self.get_client_status(clientname) - if client is None: - return False - - clientid = client['id'] - - current_settings = self.get_client_settings(clientname) - if current_settings is None: - return False - - if key in current_settings: - current_settings[key] = new_value - else: - logger.warning('Key not found in settings') - return False - - save_payload = { - 'overwrite': 'true', - 'sa': 'clientsettings_save', - 't_clientid': clientid - } - - save_payload.update(current_settings) - ret = self._get_json('settings', save_payload) - - return ret is not None and 'saved_ok' in ret - - def get_client_authkey(self, clientname): - - if not self.login(): - return None - - settings = self.get_client_settings(clientname) - - if settings: - return settings['internet_authkey'] - - return None - - def get_server_identity(self): - - if not self.login(): - return None - - status = self._get_json('status') - - if not status: - return None - - if not 'server_identity' in status: - return None - - return status['server_identity'] - - def get_status(self): - if not self.login(): - return None - - status = self._get_json('status') - - if not status: - return None - - if not 'status' in status: - return None - - return status['status'] - - def get_livelog(self, clientid = 0): - if not self.login(): - return None - - log = self._get_json('livelog', {'clientid': clientid, 'lastid': self._lastlogid}) - - if not log: - return None - - if not 'logdata' in log: - return None - - self._lastlogid = log['logdata'][-1]['id'] - - return log['logdata'] - - def get_usage(self): - if not self.login(): - return None - - usage = self._get_json('usage') - - if not usage: - return None - - if not 'usage' in usage: - return None - - return usage['usage'] - - def get_extra_clients(self): - if not self.login(): - return None - - status = self._get_json('status') - - if not status: - return None - - if not 'extra_clients' in status: - return None - - return status['extra_clients'] - - def _start_backup(self, clientname, backup_type): - - client_info = self.get_client_status(clientname) - - if not client_info: - return False - - ret = self._get_json('start_backup', {'start_client': client_info['id'], - 'start_type': backup_type } ) - - if ( ret == None - or 'result' not in ret - or len(ret['result'])!=1 - or 'start_ok' not in ret['result'][0] - or not ret['result'][0]['start_ok']): - return False - - return True - - def start_incr_file_backup(self, clientname): - return self._start_backup(clientname, 'incr_file') - - def start_full_file_backup(self, clientname): - return self._start_backup(clientname, 'full_file') - - def start_incr_image_backup(self, clientname): - return self._start_backup(clientname, 'incr_image') - - def start_full_image_backup(self, clientname): - return self._start_backup(clientname, 'full_image') - - def add_extra_client(self, addr): - if not self.login(): - return None - - ret = self._get_json('status', {'hostname': addr } ) - - if not ret: - return False - - return True - - def remove_extra_client(self, ecid): - if not self.login(): - return None - - ret = self._get_json('status', {'hostname': ecid, - 'remove': 'true' }) - - if not ret: - return False - - return True - - def get_actions(self): - if not self.login(): - return None - - ret = self._get_json('progress') - - if not ret or not 'progress' in ret: - return None - - return ret['progress'] - - def stop_action(self, action): - if (not 'clientid' in action - or not 'id' in action): - return False - - if not self.login(): - return None - - ret = self._get_json('progress', - {'stop_clientid': action['clientid'], - 'stop_id': action['id']}) - - if not ret or not 'progress' in ret: - return False - - return True +from ._legacy import _LegacyAPI +from ._typed import _TypedAPI +from ._types import ( + AccessDeniedError, + ActionProgress, + ActionType, + AuthenticationError, + BackupEntry, + BackupType, + ClientNotFoundError, + ClientStatus, + ExtraClient, + Group, + InstallerOS, + LogEntry, + LogLevel, + UrbackupError, + UsageEntry, + User, + UserAlreadyExistsError, + installer_os, +) + + +class urbackup_server(_LegacyAPI, _TypedAPI): + pass + + +__all__ = [ + "urbackup_server", + "AccessDeniedError", + "ActionProgress", + "ActionType", + "AuthenticationError", + "BackupEntry", + "BackupType", + "ClientNotFoundError", + "ClientStatus", + "ExtraClient", + "Group", + "InstallerOS", + "LogEntry", + "LogLevel", + "UrbackupError", + "UsageEntry", + "User", + "UserAlreadyExistsError", + "installer_os", +] diff --git a/urbackup/_base.py b/urbackup/_base.py new file mode 100644 index 0000000..65a8aa5 --- /dev/null +++ b/urbackup/_base.py @@ -0,0 +1,174 @@ +from __future__ import annotations + +import binascii +import hashlib +import logging +import urllib.parse +from base64 import b64encode +from typing import Any + +import requests + +from ._types import ActionType + +logger = logging.getLogger("python-urbackup") + + +class _UrbackupBase: + + action_incr_file = ActionType.INCR_FILE + action_full_file = ActionType.FULL_FILE + action_incr_image = ActionType.INCR_IMAGE + action_full_image = ActionType.FULL_IMAGE + action_resumed_incr_file = ActionType.RESUMED_INCR_FILE + action_resumed_full_file = ActionType.RESUMED_FULL_FILE + action_file_restore = ActionType.FILE_RESTORE + action_image_restore = ActionType.IMAGE_RESTORE + action_client_update = ActionType.CLIENT_UPDATE + action_check_db_integrity = ActionType.CHECK_DB_INTEGRITY + action_backup_db = ActionType.BACKUP_DB + action_recalc_stats = ActionType.RECALC_STATS + + def __init__( + self, + url: str, + username: str, + password: str, + *, + basic_username: str = "", + basic_password: str = "", + ): + self._server_url = url.rstrip("/") + self._server_username = username + self._server_password = password + self.server_basic_username = basic_username + self.server_basic_password = basic_password + self._session = "" + self._logged_in = False + self._lastlogid = 0 + + def _build_headers(self) -> dict[str, str]: + headers: dict[str, str] = {"Accept": "application/json"} + if self.server_basic_username: + credentials = b64encode( + f"{self.server_basic_username}:{self.server_basic_password}".encode() + ).decode("ascii") + headers["Authorization"] = f"Basic {credentials}" + return headers + + def _request( + self, + action: str, + params: dict[str, Any] | None = None, + method: str = "POST", + ) -> requests.Response: + action_url = ( + f"{self._server_url}?{urllib.parse.urlencode({'a': action})}" + ) + params = dict(params) if params else {} + + if self._session: + params["ses"] = self._session + + headers = self._build_headers() + + if method == "POST": + return requests.post(action_url, data=params, headers=headers) + if method == "GET": + return requests.get(action_url, params=params, headers=headers) + raise ValueError(f"Unsupported HTTP method: {method}") + + def _get_json( + self, + action: str, + params: dict[str, Any] | None = None, + retries: int = 50, + ) -> dict[str, Any] | None: + for attempt in range(retries): + response = self._request(action, params) + if response.status_code == 200: + return response.json() + logger.error( + "API call to %s failed (HTTP %d), attempt %d/%d", + action, + response.status_code, + attempt + 1, + retries, + ) + return None + + def _download_file( + self, + action: str, + output_path: str, + params: dict[str, Any], + ) -> bool: + response = self._request(action, params, method="GET") + if response.status_code != 200: + return False + with open(output_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + return True + + @staticmethod + def _md5(s: str) -> str: + return hashlib.md5(s.encode()).hexdigest() + + def _hash_password(self, salt_response: dict[str, Any]) -> str: + salt = salt_response["salt"] + password_md5_bin = hashlib.md5( + (salt + self._server_password).encode() + ).digest() + password_hash = binascii.hexlify(password_md5_bin).decode() + + pbkdf2_rounds = int(salt_response.get("pbkdf2_rounds", 0)) + if pbkdf2_rounds > 0: + password_hash = binascii.hexlify( + hashlib.pbkdf2_hmac( + "sha256", + password_md5_bin, + salt.encode(), + pbkdf2_rounds, + ) + ).decode() + + return self._md5(salt_response["rnd"] + password_hash) + + def login(self) -> bool: + if self._logged_in: + return True + + logger.debug("Trying anonymous login...") + login_resp = self._get_json("login", {}) + + if login_resp and login_resp.get("success"): + self._logged_in = True + self._session = login_resp["session"] + return True + + logger.debug("Logging in as %s...", self._server_username) + salt = self._get_json("salt", {"username": self._server_username}) + + if not salt or "ses" not in salt: + logger.warning("Username %s not found", self._server_username) + return False + + self._session = salt["ses"] + + if "salt" not in salt: + return False + + password_hash = self._hash_password(salt) + + login_resp = self._get_json( + "login", + {"username": self._server_username, "password": password_hash}, + ) + + if not login_resp or not login_resp.get("success"): + logger.warning("Login failed for %s", self._server_username) + return False + + self._logged_in = True + return True diff --git a/urbackup/_legacy.py b/urbackup/_legacy.py new file mode 100644 index 0000000..72af56e --- /dev/null +++ b/urbackup/_legacy.py @@ -0,0 +1,258 @@ +from __future__ import annotations + +import logging +from typing import Any + +from ._base import _UrbackupBase + +logger = logging.getLogger("python-urbackup") + + +class _LegacyAPI(_UrbackupBase): + + def get_client_status(self, clientname: str) -> dict[str, Any] | None: + if not self.login(): + return None + + status = self._get_json("status") + if not status or "status" not in status: + return None + + for client in status["status"]: + if client["name"] == clientname: + return client + + logger.warning("Could not find client %s", clientname) + return None + + def download_installer( + self, + installer_fn: str, + new_clientname: str, + os: str = "linux", + ) -> bool: + if os.lower() not in ("linux", "osx", "mac", "windows"): + raise ValueError(f"Unsupported OS: {os}") + + if not self.login(): + return False + + new_client = self._get_json("add_client", {"clientname": new_clientname}) + if not new_client: + return False + + if "already_exists" in new_client: + status = self.get_client_status(new_clientname) + if status is None: + return False + return self._download_file( + "download_client", + installer_fn, + {"clientid": status["id"]}, + ) + + if "new_authkey" not in new_client: + return False + + return self._download_file( + "download_client", + installer_fn, + { + "clientid": new_client["new_clientid"], + "authkey": new_client["new_authkey"], + "os": os, + }, + ) + + def add_client(self, clientname: str) -> dict[str, Any] | None: + if not self.login(): + return None + ret = self._get_json("add_client", {"clientname": clientname}) + if ret is None or "already_exists" in ret: + return None + return ret + + def get_global_settings(self) -> dict[str, Any] | None: + if not self.login(): + return None + settings = self._get_json("settings", {"sa": "general"}) + if not settings or "settings" not in settings: + return None + return settings["settings"] + + def set_global_setting( + self, + key: str, + new_value: Any, + ) -> bool: + if not self.login(): + return False + settings = self._get_json("settings", {"sa": "general"}) + if not settings or "settings" not in settings: + return False + settings["settings"][key] = new_value + settings["settings"]["sa"] = "general_save" + ret = self._get_json("settings", settings["settings"]) + return ret is not None and "saved_ok" in ret + + def get_client_settings( + self, + clientname: str, + ) -> dict[str, Any] | None: + if not self.login(): + return None + client = self.get_client_status(clientname) + if client is None: + return None + settings = self._get_json( + "settings", + {"sa": "clientsettings", "t_clientid": client["id"]}, + ) + if not settings or "settings" not in settings: + return None + return settings["settings"] + + def change_client_setting( + self, + clientname: str, + key: str, + new_value: Any, + ) -> bool: + if not self.login(): + return False + client = self.get_client_status(clientname) + if client is None: + return False + + current_settings = self.get_client_settings(clientname) + if current_settings is None: + return False + + if key not in current_settings: + logger.warning("Setting key %s not found for client %s", key, clientname) + return False + + current_settings[key] = new_value + save_payload = { + "overwrite": "true", + "sa": "clientsettings_save", + "t_clientid": client["id"], + } + save_payload.update(current_settings) + ret = self._get_json("settings", save_payload) + return ret is not None and "saved_ok" in ret + + def get_client_authkey(self, clientname: str) -> str | None: + if not self.login(): + return None + settings = self.get_client_settings(clientname) + if settings: + return settings.get("internet_authkey") + return None + + def get_server_identity(self) -> str | None: + if not self.login(): + return None + status = self._get_json("status") + if not status or "server_identity" not in status: + return None + return status["server_identity"] + + def get_status(self) -> list[dict[str, Any]] | None: + if not self.login(): + return None + status = self._get_json("status") + if not status or "status" not in status: + return None + return status["status"] + + def get_livelog( + self, + clientid: int = 0, + ) -> list[dict[str, Any]] | None: + if not self.login(): + return None + log = self._get_json( + "livelog", + {"clientid": clientid, "lastid": self._lastlogid}, + ) + if not log or "logdata" not in log: + return None + self._lastlogid = log["logdata"][-1]["id"] + return log["logdata"] + + def get_usage(self) -> list[dict[str, Any]] | None: + if not self.login(): + return None + usage = self._get_json("usage") + if not usage or "usage" not in usage: + return None + return usage["usage"] + + def get_extra_clients(self) -> list[dict[str, Any]] | None: + if not self.login(): + return None + status = self._get_json("status") + if not status or "extra_clients" not in status: + return None + return status["extra_clients"] + + def _start_backup(self, clientname: str, backup_type: str) -> bool: + client_info = self.get_client_status(clientname) + if not client_info: + return False + ret = self._get_json( + "start_backup", + {"start_client": client_info["id"], "start_type": backup_type}, + ) + if ( + ret is None + or "result" not in ret + or len(ret["result"]) != 1 + or not ret["result"][0].get("start_ok") + ): + return False + return True + + def start_incr_file_backup(self, clientname: str) -> bool: + return self._start_backup(clientname, "incr_file") + + def start_full_file_backup(self, clientname: str) -> bool: + return self._start_backup(clientname, "full_file") + + def start_incr_image_backup(self, clientname: str) -> bool: + return self._start_backup(clientname, "incr_image") + + def start_full_image_backup(self, clientname: str) -> bool: + return self._start_backup(clientname, "full_image") + + def add_extra_client(self, addr: str) -> bool: + if not self.login(): + return False + ret = self._get_json("status", {"hostname": addr}) + return ret is not None + + def remove_extra_client(self, ecid: int) -> bool: + if not self.login(): + return False + ret = self._get_json("status", {"hostname": ecid, "remove": "true"}) + return ret is not None + + def get_actions(self) -> list[dict[str, Any]] | None: + if not self.login(): + return None + ret = self._get_json("progress") + if not ret or "progress" not in ret: + return None + return ret["progress"] + + def stop_action(self, action: dict[str, Any]) -> bool: + if "clientid" not in action or "id" not in action: + return False + if not self.login(): + return False + ret = self._get_json( + "progress", + {"stop_clientid": action["clientid"], "stop_id": action["id"]}, + ) + return ret is not None and "progress" in ret diff --git a/urbackup/_typed.py b/urbackup/_typed.py new file mode 100644 index 0000000..8f16e8f --- /dev/null +++ b/urbackup/_typed.py @@ -0,0 +1,261 @@ +from __future__ import annotations + +import logging +from typing import Any + +from ._base import _UrbackupBase +from ._types import ( + AccessDeniedError, + ActionProgress, + AuthenticationError, + BackupEntry, + BackupType, + ClientNotFoundError, + ClientStatus, + ExtraClient, + Group, + InstallerOS, + LogEntry, + UsageEntry, + User, + UserAlreadyExistsError, +) + +logger = logging.getLogger("python-urbackup") + + +class _TypedAPI(_UrbackupBase): + + def _require_login(self) -> None: + if not self.login(): + raise AuthenticationError("Login failed") + + def _get_status_response(self) -> dict[str, Any]: + self._require_login() + status = self._get_json("status") + if not status or "status" not in status: + raise AccessDeniedError("Cannot access server status") + return status + + def get_client_statuses(self) -> list[ClientStatus]: + status = self._get_status_response() + return [ClientStatus.from_dict(c) for c in status["status"]] + + def get_client_status_by_name(self, clientname: str) -> ClientStatus: + status = self._get_status_response() + for client in status["status"]: + if client["name"] == clientname: + return ClientStatus.from_dict(client) + raise ClientNotFoundError(f"Client not found: {clientname}") + + def get_server_identity_str(self) -> str: + status = self._get_status_response() + if "server_identity" not in status: + raise AccessDeniedError("Cannot access server identity") + return status["server_identity"] + + def get_extra_clients_typed(self) -> list[ExtraClient]: + status = self._get_status_response() + return [ + ExtraClient.from_dict(ec) + for ec in status.get("extra_clients", []) + ] + + def get_backups( + self, + clientid: int, + ) -> list[BackupEntry]: + self._require_login() + resp = self._get_json("backups", {"sa": "backups", "clientid": clientid}) + if not resp or "backups" not in resp: + return [] + return [BackupEntry.from_dict(b) for b in resp["backups"]] + + def get_image_backups( + self, + clientid: int, + ) -> list[BackupEntry]: + self._require_login() + resp = self._get_json( + "backups", + {"sa": "backups", "clientid": clientid}, + ) + if not resp or "image_backups" not in resp: + return [] + return [BackupEntry.from_dict(b) for b in resp["image_backups"]] + + def start_backup_typed( + self, + clientname: str, + backup_type: BackupType, + ) -> bool: + client = self.get_client_status_by_name(clientname) + ret = self._get_json( + "start_backup", + {"start_client": client.id, "start_type": backup_type.value}, + ) + if ( + ret is None + or "result" not in ret + or len(ret["result"]) != 1 + or not ret["result"][0].get("start_ok") + ): + return False + return True + + def get_actions_typed(self) -> list[ActionProgress]: + self._require_login() + ret = self._get_json("progress") + if not ret or "progress" not in ret: + return [] + return [ActionProgress.from_dict(a) for a in ret["progress"]] + + def stop_action_typed(self, action: ActionProgress) -> bool: + self._require_login() + ret = self._get_json( + "progress", + {"stop_clientid": action.clientid, "stop_id": action.id}, + ) + return ret is not None and "progress" in ret + + def get_usage_typed(self) -> list[UsageEntry]: + self._require_login() + usage = self._get_json("usage") + if not usage or "usage" not in usage: + return [] + return [UsageEntry.from_dict(u) for u in usage["usage"]] + + def get_livelog_typed( + self, + clientid: int = 0, + ) -> list[LogEntry]: + self._require_login() + log = self._get_json( + "livelog", + {"clientid": clientid, "lastid": self._lastlogid}, + ) + if not log or "logdata" not in log: + return [] + entries = [LogEntry.from_dict(e) for e in log["logdata"]] + if entries: + self._lastlogid = entries[-1].id + return entries + + def get_global_settings_typed(self) -> dict[str, Any]: + self._require_login() + settings = self._get_json("settings", {"sa": "general"}) + if not settings or "settings" not in settings: + raise AccessDeniedError("Cannot access global settings") + return settings["settings"] + + def set_global_setting_typed(self, key: str, value: Any) -> bool: + settings = self.get_global_settings_typed() + settings[key] = value + settings["sa"] = "general_save" + ret = self._get_json("settings", settings) + return ret is not None and "saved_ok" in ret + + def get_client_settings_typed( + self, + clientname: str, + ) -> dict[str, Any]: + client = self.get_client_status_by_name(clientname) + settings = self._get_json( + "settings", + {"sa": "clientsettings", "t_clientid": client.id}, + ) + if not settings or "settings" not in settings: + raise AccessDeniedError( + f"Cannot access settings for client {clientname}" + ) + return settings["settings"] + + def change_client_setting_typed( + self, + clientname: str, + key: str, + new_value: Any, + ) -> bool: + client = self.get_client_status_by_name(clientname) + current_settings = self.get_client_settings_typed(clientname) + + if key not in current_settings: + raise KeyError(f"Setting {key} not found for client {clientname}") + + current_settings[key] = new_value + save_payload = { + "overwrite": "true", + "sa": "clientsettings_save", + "t_clientid": client.id, + } + save_payload.update(current_settings) + ret = self._get_json("settings", save_payload) + return ret is not None and "saved_ok" in ret + + def get_users(self) -> list[User]: + self._require_login() + ret = self._get_json("settings", {"sa": "listusers"}) + if not ret or "users" not in ret: + return [] + return [User.from_dict(u) for u in ret["users"]] + + def get_groups(self) -> list[Group]: + self._require_login() + ret = self._get_json("settings", {"sa": "listgroups"}) + if not ret or "groups" not in ret: + return [] + return [Group.from_dict(g) for g in ret["groups"]] + + def add_user( + self, + username: str, + password: str, + rights: str = "all", + ) -> bool: + self._require_login() + salt = self._get_json("salt", {"username": username}) + if salt and salt.get("salt"): + raise UserAlreadyExistsError(f"User already exists: {username}") + + ret = self._get_json( + "settings", + { + "sa": "updatesettings", + "username": username, + "password": password, + "rights": rights, + }, + ) + return ret is not None and ret.get("saved_ok", False) + + def download_installer_typed( + self, + installer_fn: str, + new_clientname: str, + os: InstallerOS = InstallerOS.LINUX, + ) -> bool: + self._require_login() + new_client = self._get_json("add_client", {"clientname": new_clientname}) + if not new_client: + return False + + if "already_exists" in new_client: + client = self.get_client_status_by_name(new_clientname) + return self._download_file( + "download_client", + installer_fn, + {"clientid": client.id}, + ) + + if "new_authkey" not in new_client: + return False + + return self._download_file( + "download_client", + installer_fn, + { + "clientid": new_client["new_clientid"], + "authkey": new_client["new_authkey"], + "os": os.value, + }, + ) diff --git a/urbackup/_types.py b/urbackup/_types.py new file mode 100644 index 0000000..f63aaf1 --- /dev/null +++ b/urbackup/_types.py @@ -0,0 +1,220 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import IntEnum, Enum +from typing import Any + + +class InstallerOS(str, Enum): + WINDOWS = "windows" + LINUX = "linux" + + +class BackupType(str, Enum): + INCR_FILE = "incr_file" + FULL_FILE = "full_file" + INCR_IMAGE = "incr_image" + FULL_IMAGE = "full_image" + + +class ActionType(IntEnum): + INCR_FILE = 1 + FULL_FILE = 2 + INCR_IMAGE = 3 + FULL_IMAGE = 4 + RESUMED_INCR_FILE = 5 + RESUMED_FULL_FILE = 6 + FILE_RESTORE = 8 + IMAGE_RESTORE = 9 + CLIENT_UPDATE = 10 + CHECK_DB_INTEGRITY = 11 + BACKUP_DB = 12 + RECALC_STATS = 13 + + +class LogLevel(IntEnum): + ERROR = 1 + WARNING = 2 + INFO = 3 + DEBUG = 4 + + +installer_os = InstallerOS + + +class UrbackupError(Exception): + pass + + +class AuthenticationError(UrbackupError): + pass + + +class ClientNotFoundError(UrbackupError): + pass + + +class AccessDeniedError(UrbackupError): + pass + + +class UserAlreadyExistsError(UrbackupError): + pass + + +@dataclass +class ClientStatus: + id: int + name: str + online: bool = False + lastbackup: int = 0 + lastbackup_image: int = 0 + file_ok: bool = False + image_ok: bool = False + ip: str = "" + client_version_string: str = "" + os_version_string: str = "" + raw: dict[str, Any] = field(default_factory=dict, repr=False) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> ClientStatus: + return cls( + id=data["id"], + name=data["name"], + online=data.get("online", False), + lastbackup=data.get("lastbackup", 0), + lastbackup_image=data.get("lastbackup_image", 0), + file_ok=data.get("file_ok", False), + image_ok=data.get("image_ok", False), + ip=data.get("ip", ""), + client_version_string=data.get("client_version_string", ""), + os_version_string=data.get("os_version_string", ""), + raw=data, + ) + + +@dataclass +class BackupEntry: + id: int + clientid: int + backuptime: int + incremental: int = 0 + size_bytes: int = 0 + raw: dict[str, Any] = field(default_factory=dict, repr=False) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> BackupEntry: + return cls( + id=data["id"], + clientid=data.get("clientid", 0), + backuptime=data.get("backuptime", 0), + incremental=data.get("incremental", 0), + size_bytes=data.get("size_bytes", 0), + raw=data, + ) + + +@dataclass +class UsageEntry: + name: str + used: int = 0 + files: int = 0 + images: int = 0 + raw: dict[str, Any] = field(default_factory=dict, repr=False) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> UsageEntry: + return cls( + name=data.get("name", ""), + used=data.get("used", 0), + files=data.get("files", 0), + images=data.get("images", 0), + raw=data, + ) + + +@dataclass +class LogEntry: + id: int + message: str + level: int = 0 + time: int = 0 + raw: dict[str, Any] = field(default_factory=dict, repr=False) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> LogEntry: + return cls( + id=data["id"], + message=data.get("msg", data.get("message", "")), + level=data.get("loglevel", data.get("level", 0)), + time=data.get("time", 0), + raw=data, + ) + + +@dataclass +class ActionProgress: + clientid: int + id: int + name: str = "" + action: int = 0 + progress_percent: int = -1 + raw: dict[str, Any] = field(default_factory=dict, repr=False) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> ActionProgress: + return cls( + clientid=data["clientid"], + id=data["id"], + name=data.get("name", ""), + action=data.get("action", 0), + progress_percent=data.get("pcdone", data.get("progress_percent", -1)), + raw=data, + ) + + +@dataclass +class ExtraClient: + id: int + hostname: str + raw: dict[str, Any] = field(default_factory=dict, repr=False) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> ExtraClient: + return cls( + id=data["id"], + hostname=data.get("hostname", ""), + raw=data, + ) + + +@dataclass +class User: + id: int + name: str + rights: str = "" + raw: dict[str, Any] = field(default_factory=dict, repr=False) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> User: + return cls( + id=data["id"], + name=data["name"], + rights=data.get("rights", ""), + raw=data, + ) + + +@dataclass +class Group: + id: int + name: str + raw: dict[str, Any] = field(default_factory=dict, repr=False) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> Group: + return cls( + id=data["id"], + name=data["name"], + raw=data, + )