Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 164 additions & 2 deletions .github/workflows/github-action-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ jobs:
POSTGRES_PASSWORD: postgres
ports:
- 5432:5432
# Add a health check so steps wait until Postgres is ready
options: >-
--health-cmd="pg_isready -U postgres"
--health-interval=10s
Expand Down Expand Up @@ -129,20 +128,105 @@ jobs:
psql -h localhost -U postgres -f sql/texera_ddl.sql
psql -h localhost -U postgres -f sql/iceberg_postgres_catalog.sql
psql -h localhost -U postgres -f sql/texera_lakefs.sql
psql -h localhost -U postgres -f sql/texera_lakekeeper.sql
env:
PGPASSWORD: postgres
- name: Create texera_db_for_test_cases
run: psql -h localhost -U postgres -v DB_NAME=texera_db_for_test_cases -f sql/texera_ddl.sql
env:
PGPASSWORD: postgres
- name: Start MinIO
run: |
docker run -d --name minio --network host \
-e MINIO_ROOT_USER=texera_minio \
-e MINIO_ROOT_PASSWORD=password \
minio/minio:RELEASE.2025-02-28T09-55-16Z server /data

for i in $(seq 1 30); do
curl -sf http://localhost:9000/minio/health/live && break
echo "Waiting for MinIO... (attempt $i)"
sleep 2
done
- name: Start Lakekeeper
run: |
docker run --rm --network host \
-e LAKEKEEPER__PG_DATABASE_URL_READ=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \
-e LAKEKEEPER__PG_DATABASE_URL_WRITE=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \
-e LAKEKEEPER__PG_ENCRYPTION_KEY=texera_key \
vakamo/lakekeeper:v0.11.0 migrate

docker run -d --name lakekeeper --network host \
-e LAKEKEEPER__PG_DATABASE_URL_READ=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \
-e LAKEKEEPER__PG_DATABASE_URL_WRITE=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \
-e LAKEKEEPER__PG_ENCRYPTION_KEY=texera_key \
-e LAKEKEEPER__METRICS_PORT=9091 \
vakamo/lakekeeper:v0.11.0 serve

for i in $(seq 1 30); do
docker exec lakekeeper /home/nonroot/lakekeeper healthcheck && break
echo "Waiting for Lakekeeper to be ready... (attempt $i)"
sleep 2
done

# Final check - fail with logs if Lakekeeper didn't start
docker exec lakekeeper /home/nonroot/lakekeeper healthcheck || {
echo "Lakekeeper failed to start. Container logs:"
docker logs lakekeeper
exit 1
}
- name: Initialize Lakekeeper Warehouse
run: |
docker run --rm --network host --entrypoint sh minio/mc -c \
"mc alias set minio http://localhost:9000 texera_minio password && \
mc mb --ignore-existing minio/texera-iceberg"

curl -sf -X POST -H 'Content-Type: application/json' \
-d '{"project-id":"00000000-0000-0000-0000-000000000000","project-name":"default"}' \
http://localhost:8181/management/v1/project || true

curl -sf -X POST -H 'Content-Type: application/json' -d '{
"warehouse-name": "texera",
"project-id": "00000000-0000-0000-0000-000000000000",
"storage-profile": {
"type": "s3",
"bucket": "texera-iceberg",
"region": "us-west-2",
"endpoint": "http://localhost:9000",
"flavor": "s3-compat",
"path-style-access": true,
"sts-enabled": false
},
"storage-credential": {
"type": "s3",
"credential-type": "access-key",
"aws-access-key-id": "texera_minio",
"aws-secret-access-key": "password"
}
}' http://localhost:8181/management/v1/warehouse
- name: Compile with sbt
run: sbt clean package
env:
STORAGE_ICEBERG_CATALOG_TYPE: rest
STORAGE_ICEBERG_CATALOG_REST_URI: http://localhost:8181/catalog/
STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME: texera
STORAGE_S3_ENDPOINT: http://localhost:9000
STORAGE_S3_REGION: us-west-2
STORAGE_S3_AUTH_USERNAME: texera_minio
STORAGE_S3_AUTH_PASSWORD: password
- name: Set docker-java API version
run: |
echo "api.version=1.52" >> ~/.docker-java.properties
cat ~/.docker-java.properties
- name: Run backend tests
run: sbt test
env:
STORAGE_ICEBERG_CATALOG_TYPE: rest
STORAGE_ICEBERG_CATALOG_REST_URI: http://localhost:8181/catalog/
STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME: texera
STORAGE_S3_ENDPOINT: http://localhost:9000
STORAGE_S3_REGION: us-west-2
STORAGE_S3_AUTH_USERNAME: texera_minio
STORAGE_S3_AUTH_PASSWORD: password

python:
strategy:
Expand All @@ -166,9 +250,87 @@ jobs:
run: sudo apt-get update && sudo apt-get install -y postgresql
- name: Start PostgreSQL Service
run: sudo systemctl start postgresql
- name: Configure PostgreSQL for TCP password auth
run: |
sudo -u postgres psql -c "ALTER USER postgres PASSWORD 'postgres';"
PG_HBA=$(sudo -u postgres psql -t -c "SHOW hba_file;" | xargs)
sudo sed -i 's/local\s\+all\s\+all\s\+peer/local all all md5/' "$PG_HBA"
echo "host all all 127.0.0.1/32 md5" | sudo tee -a "$PG_HBA"
echo "host all all ::1/128 md5" | sudo tee -a "$PG_HBA"
sudo systemctl restart postgresql
- name: Create Database and User
run: |
cd sql && sudo -u postgres psql -f iceberg_postgres_catalog.sql
cd sql
sudo -u postgres psql -f iceberg_postgres_catalog.sql
sudo -u postgres psql -f texera_lakekeeper.sql
- name: Start MinIO
run: |
docker run -d --name minio --network host \
-e MINIO_ROOT_USER=texera_minio \
-e MINIO_ROOT_PASSWORD=password \
minio/minio:RELEASE.2025-02-28T09-55-16Z server /data

for i in $(seq 1 30); do
curl -sf http://localhost:9000/minio/health/live && break
echo "Waiting for MinIO... (attempt $i)"
sleep 2
done
- name: Start Lakekeeper
run: |
docker run --rm --network host \
-e LAKEKEEPER__PG_DATABASE_URL_READ=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \
-e LAKEKEEPER__PG_DATABASE_URL_WRITE=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \
-e LAKEKEEPER__PG_ENCRYPTION_KEY=texera_key \
vakamo/lakekeeper:v0.11.0 migrate

docker run -d --name lakekeeper --network host \
-e LAKEKEEPER__PG_DATABASE_URL_READ=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \
-e LAKEKEEPER__PG_DATABASE_URL_WRITE=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \
-e LAKEKEEPER__PG_ENCRYPTION_KEY=texera_key \
-e LAKEKEEPER__METRICS_PORT=9091 \
vakamo/lakekeeper:v0.11.0 serve

for i in $(seq 1 30); do
docker exec lakekeeper /home/nonroot/lakekeeper healthcheck && break
echo "Waiting for Lakekeeper to be ready... (attempt $i)"
sleep 2
done

# Final check - fail with logs if Lakekeeper didn't start
docker exec lakekeeper /home/nonroot/lakekeeper healthcheck || {
echo "Lakekeeper failed to start. Container logs:"
docker logs lakekeeper
exit 1
}
- name: Initialize Lakekeeper Warehouse
run: |
docker run --rm --network host --entrypoint sh minio/mc -c \
"mc alias set minio http://localhost:9000 texera_minio password && \
mc mb --ignore-existing minio/texera-iceberg"

curl -sf -X POST -H 'Content-Type: application/json' \
-d '{"project-id":"00000000-0000-0000-0000-000000000000","project-name":"default"}' \
http://localhost:8181/management/v1/project || true

curl -sf -X POST -H 'Content-Type: application/json' -d '{
"warehouse-name": "texera",
"project-id": "00000000-0000-0000-0000-000000000000",
"storage-profile": {
"type": "s3",
"bucket": "texera-iceberg",
"region": "us-west-2",
"endpoint": "http://localhost:9000",
"flavor": "s3-compat",
"path-style-access": true,
"sts-enabled": false
},
"storage-credential": {
"type": "s3",
"credential-type": "access-key",
"aws-access-key-id": "texera_minio",
"aws-secret-access-key": "password"
}
}' http://localhost:8181/management/v1/warehouse
- name: Lint with Ruff
run: |
cd amber/src/main/python && ruff check . && ruff format --check .
Expand Down
5 changes: 4 additions & 1 deletion amber/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ bidict==0.22.0
cached_property==1.5.2
psutil==5.9.0
tzlocal==2.1
pyiceberg==0.8.1
pyiceberg==0.9.0
s3fs==2025.9.0
aiobotocore==2.25.1
botocore==1.40.53
readerwriterlock==1.0.9
tenacity==8.5.0
SQLAlchemy==2.0.37
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
from pyiceberg.catalog import Catalog
from typing import Optional

from core.storage.iceberg.iceberg_utils import create_postgres_catalog
from core.storage.iceberg.iceberg_utils import (
create_postgres_catalog,
create_rest_catalog,
)
from core.storage.storage_config import StorageConfig


class IcebergCatalogInstance:
"""
IcebergCatalogInstance is a singleton that manages the Iceberg catalog instance.
Currently only postgres SQL catalog is supported.
Supports postgres SQL catalog and REST catalog.
- Provides a single shared catalog for all Iceberg table-related operations.
- Lazily initializes the catalog on first access.
- Supports replacing the catalog instance for testing or reconfiguration.
Expand All @@ -39,16 +42,31 @@ def get_instance(cls):
Retrieves the singleton Iceberg catalog instance.
- If the catalog is not initialized, it is lazily created using the configured
properties.
- Supports "postgres" and "rest" catalog types.
:return: the Iceberg catalog instance.
"""
if cls._instance is None:
cls._instance = create_postgres_catalog(
"texera_iceberg",
StorageConfig.ICEBERG_FILE_STORAGE_DIRECTORY_PATH,
StorageConfig.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME,
StorageConfig.ICEBERG_POSTGRES_CATALOG_USERNAME,
StorageConfig.ICEBERG_POSTGRES_CATALOG_PASSWORD,
)
catalog_type = StorageConfig.ICEBERG_CATALOG_TYPE
if catalog_type == "postgres":
cls._instance = create_postgres_catalog(
"texera_iceberg",
StorageConfig.ICEBERG_FILE_STORAGE_DIRECTORY_PATH,
StorageConfig.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME,
StorageConfig.ICEBERG_POSTGRES_CATALOG_USERNAME,
StorageConfig.ICEBERG_POSTGRES_CATALOG_PASSWORD,
)
elif catalog_type == "rest":
cls._instance = create_rest_catalog(
"texera_iceberg",
StorageConfig.ICEBERG_REST_CATALOG_WAREHOUSE_NAME,
StorageConfig.ICEBERG_REST_CATALOG_URI,
StorageConfig.S3_ENDPOINT,
StorageConfig.S3_REGION,
StorageConfig.S3_AUTH_USERNAME,
StorageConfig.S3_AUTH_PASSWORD,
)
else:
raise ValueError(f"Unsupported catalog type: {catalog_type}")
return cls._instance

@classmethod
Expand Down
40 changes: 39 additions & 1 deletion amber/src/main/python/core/storage/iceberg/iceberg_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import pyarrow as pa
import pyiceberg.table
from pyiceberg.catalog import Catalog
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.expressions import AlwaysTrue
from pyiceberg.io.pyarrow import ArrowScan
Expand Down Expand Up @@ -153,6 +153,44 @@ def create_postgres_catalog(
)


def create_rest_catalog(
    catalog_name: str,
    warehouse_name: str,
    rest_uri: str,
    s3_endpoint: str,
    s3_region: str,
    s3_username: str,
    s3_password: str,
) -> Catalog:
    """
    Builds an Iceberg REST catalog backed by an S3-compatible object store.

    The catalog talks to a REST endpoint (e.g. Lakekeeper) identified by
    ``rest_uri`` and scoped to ``warehouse_name``, while table data is read
    and written through S3FileIO configured for the given endpoint
    (path-style access is forced so MinIO-style endpoints work).

    :param catalog_name: the name of the catalog.
    :param warehouse_name: the warehouse identifier (name for Lakekeeper).
    :param rest_uri: the URI of the REST catalog endpoint.
    :param s3_endpoint: the S3 endpoint URL.
    :param s3_region: the S3 region.
    :param s3_username: the S3 access key ID.
    :param s3_password: the S3 secret access key.
    :return: a Catalog instance (REST catalog).
    """
    # Assemble the pyiceberg property map first so the connection settings
    # are easy to inspect/extend, then hand it off to load_catalog.
    catalog_properties = {
        "type": "rest",
        "uri": rest_uri,
        "warehouse": warehouse_name,
        "s3.endpoint": s3_endpoint,
        "s3.access-key-id": s3_username,
        "s3.secret-access-key": s3_password,
        "s3.region": s3_region,
        # MinIO and most S3-compatible stores require path-style addressing.
        "s3.path-style-access": "true",
    }
    return load_catalog(catalog_name, **catalog_properties)


def create_table(
catalog: Catalog,
table_namespace: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,19 @@

# Hardcoded storage config only for test purposes.
StorageConfig.initialize(
catalog_type="rest",
postgres_uri_without_scheme="localhost:5432/texera_iceberg_catalog",
postgres_username="texera",
postgres_password="password",
rest_catalog_uri="http://localhost:8181/catalog/",
rest_catalog_warehouse_name="texera",
table_result_namespace="operator-port-result",
directory_path="../../../../../../amber/user-resources/workflow-results",
commit_batch_size=4096,
s3_endpoint="http://localhost:9000",
s3_region="us-east-1",
s3_auth_username="minioadmin",
s3_auth_password="minioadmin",
s3_region="us-west-2",
s3_auth_username="texera_minio",
s3_auth_password="password",
)


Expand Down
12 changes: 11 additions & 1 deletion amber/src/main/python/core/storage/storage_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@ class StorageConfig:

_initialized = False

ICEBERG_CATALOG_TYPE = None
ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME = None
ICEBERG_POSTGRES_CATALOG_USERNAME = None
ICEBERG_POSTGRES_CATALOG_PASSWORD = None
ICEBERG_REST_CATALOG_URI = None
ICEBERG_REST_CATALOG_WAREHOUSE_NAME = None
ICEBERG_TABLE_RESULT_NAMESPACE = None
ICEBERG_FILE_STORAGE_DIRECTORY_PATH = None
ICEBERG_TABLE_COMMIT_BATCH_SIZE = None

# S3 configs (for large_binary_manager module)
# S3 configs
S3_ENDPOINT = None
S3_REGION = None
S3_AUTH_USERNAME = None
Expand All @@ -41,9 +44,12 @@ class StorageConfig:
@classmethod
def initialize(
cls,
catalog_type,
postgres_uri_without_scheme,
postgres_username,
postgres_password,
rest_catalog_uri,
rest_catalog_warehouse_name,
table_result_namespace,
directory_path,
commit_batch_size,
Expand All @@ -57,9 +63,13 @@ def initialize(
"Storage config has already been initialized and cannot be modified."
)

cls.ICEBERG_CATALOG_TYPE = catalog_type
cls.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME = postgres_uri_without_scheme
cls.ICEBERG_POSTGRES_CATALOG_USERNAME = postgres_username
cls.ICEBERG_POSTGRES_CATALOG_PASSWORD = postgres_password
cls.ICEBERG_REST_CATALOG_URI = rest_catalog_uri
cls.ICEBERG_REST_CATALOG_WAREHOUSE_NAME = rest_catalog_warehouse_name

cls.ICEBERG_TABLE_RESULT_NAMESPACE = table_result_namespace
cls.ICEBERG_FILE_STORAGE_DIRECTORY_PATH = directory_path
cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = int(commit_batch_size)
Expand Down
Loading
Loading