From 3f08775221bb780d9ce33cbef8861a5ec7936ce0 Mon Sep 17 00:00:00 2001 From: mengw15 <125719918+mengw15@users.noreply.github.com> Date: Mon, 9 Mar 2026 03:49:06 -0700 Subject: [PATCH 1/2] lakekeeper core change, migration --- amber/requirements.txt | 5 +- .../iceberg/iceberg_catalog_instance.py | 36 ++++++++++---- .../core/storage/iceberg/iceberg_utils.py | 40 +++++++++++++++- .../storage/iceberg/test_iceberg_document.py | 9 ++-- .../python/core/storage/storage_config.py | 12 ++++- .../storage/test_large_binary_manager.py | 9 ++-- .../main/python/texera_run_python_worker.py | 6 +++ .../pythonworker/PythonWorkflowWorker.scala | 3 ++ common/config/src/main/resources/storage.conf | 11 +++++ .../amber/config/EnvironmentalVariable.scala | 1 + .../texera/amber/config/StorageConfig.scala | 4 +- common/workflow-core/build.sbt | 19 +++++++- .../core/storage/IcebergCatalogInstance.scala | 2 +- .../result/iceberg/IcebergTableWriter.scala | 13 ++++- .../texera/amber/util/IcebergUtil.scala | 47 +++++++++++++++---- .../ComputingUnitManagingResource.scala | 2 + sql/texera_lakekeeper.sql | 21 +++++++++ 17 files changed, 206 insertions(+), 34 deletions(-) create mode 100644 sql/texera_lakekeeper.sql diff --git a/amber/requirements.txt b/amber/requirements.txt index 803ab682d5e..8cca5d201f8 100644 --- a/amber/requirements.txt +++ b/amber/requirements.txt @@ -43,7 +43,10 @@ bidict==0.22.0 cached_property==1.5.2 psutil==5.9.0 tzlocal==2.1 -pyiceberg==0.8.1 +pyiceberg==0.9.0 +s3fs==2025.9.0 +aiobotocore==2.25.1 +botocore==1.40.53 readerwriterlock==1.0.9 tenacity==8.5.0 SQLAlchemy==2.0.37 diff --git a/amber/src/main/python/core/storage/iceberg/iceberg_catalog_instance.py b/amber/src/main/python/core/storage/iceberg/iceberg_catalog_instance.py index b1478fadf03..0059808f9f8 100644 --- a/amber/src/main/python/core/storage/iceberg/iceberg_catalog_instance.py +++ b/amber/src/main/python/core/storage/iceberg/iceberg_catalog_instance.py @@ -18,14 +18,17 @@ from pyiceberg.catalog import Catalog from typing import Optional -from core.storage.iceberg.iceberg_utils import create_postgres_catalog +from core.storage.iceberg.iceberg_utils import ( + create_postgres_catalog, + create_rest_catalog, +) from core.storage.storage_config import StorageConfig class IcebergCatalogInstance: """ IcebergCatalogInstance is a singleton that manages the Iceberg catalog instance. - Currently only postgres SQL catalog is supported. + Supports postgres SQL catalog and REST catalog. - Provides a single shared catalog for all Iceberg table-related operations. - Lazily initializes the catalog on first access. - Supports replacing the catalog instance for testing or reconfiguration. @@ -39,16 +42,31 @@ def get_instance(cls): Retrieves the singleton Iceberg catalog instance. - If the catalog is not initialized, it is lazily created using the configured properties. + - Supports "postgres" and "rest" catalog types. :return: the Iceberg catalog instance. """ if cls._instance is None: - cls._instance = create_postgres_catalog( - "texera_iceberg", - StorageConfig.ICEBERG_FILE_STORAGE_DIRECTORY_PATH, - StorageConfig.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME, - StorageConfig.ICEBERG_POSTGRES_CATALOG_USERNAME, - StorageConfig.ICEBERG_POSTGRES_CATALOG_PASSWORD, - ) + catalog_type = StorageConfig.ICEBERG_CATALOG_TYPE + if catalog_type == "postgres": + cls._instance = create_postgres_catalog( + "texera_iceberg", + StorageConfig.ICEBERG_FILE_STORAGE_DIRECTORY_PATH, + StorageConfig.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME, + StorageConfig.ICEBERG_POSTGRES_CATALOG_USERNAME, + StorageConfig.ICEBERG_POSTGRES_CATALOG_PASSWORD, + ) + elif catalog_type == "rest": + cls._instance = create_rest_catalog( + "texera_iceberg", + StorageConfig.ICEBERG_REST_CATALOG_WAREHOUSE_NAME, + StorageConfig.ICEBERG_REST_CATALOG_URI, + StorageConfig.S3_ENDPOINT, + StorageConfig.S3_REGION, + StorageConfig.S3_AUTH_USERNAME, + StorageConfig.S3_AUTH_PASSWORD, + ) + else: + raise ValueError(f"Unsupported catalog type: {catalog_type}") return cls._instance @classmethod diff --git a/amber/src/main/python/core/storage/iceberg/iceberg_utils.py b/amber/src/main/python/core/storage/iceberg/iceberg_utils.py index 9e17b2e0e82..c1f9df2e403 100644 --- a/amber/src/main/python/core/storage/iceberg/iceberg_utils.py +++ b/amber/src/main/python/core/storage/iceberg/iceberg_utils.py @@ -17,7 +17,7 @@ import pyarrow as pa import pyiceberg.table -from pyiceberg.catalog import Catalog +from pyiceberg.catalog import Catalog, load_catalog from pyiceberg.catalog.sql import SqlCatalog from pyiceberg.expressions import AlwaysTrue from pyiceberg.io.pyarrow import ArrowScan @@ -153,6 +153,44 @@ def create_postgres_catalog( ) +def create_rest_catalog( + catalog_name: str, + warehouse_name: str, + rest_uri: str, + s3_endpoint: str, + s3_region: str, + s3_username: str, + s3_password: str, +) -> Catalog: + """ + Creates a REST catalog instance by connecting to a REST endpoint. + - Configures the catalog to interact with a REST endpoint. + - The warehouse_name parameter specifies the warehouse identifier (name for Lakekeeper). + - Configures S3FileIO for MinIO/S3 storage backend. + :param catalog_name: the name of the catalog. + :param warehouse_name: the warehouse identifier (name for Lakekeeper). + :param rest_uri: the URI of the REST catalog endpoint. + :param s3_endpoint: the S3 endpoint URL. + :param s3_region: the S3 region. + :param s3_username: the S3 access key ID. + :param s3_password: the S3 secret access key. + :return: a Catalog instance (REST catalog). + """ + return load_catalog( + catalog_name, + **{ + "type": "rest", + "uri": rest_uri, + "warehouse": warehouse_name, + "s3.endpoint": s3_endpoint, + "s3.access-key-id": s3_username, + "s3.secret-access-key": s3_password, + "s3.region": s3_region, + "s3.path-style-access": "true", + }, + ) + + def create_table( catalog: Catalog, table_namespace: str, diff --git a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py index 34711beb652..ebdd3d0788c 100644 --- a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py +++ b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py @@ -37,16 +37,19 @@ # Hardcoded storage config only for test purposes. StorageConfig.initialize( + catalog_type="postgres", postgres_uri_without_scheme="localhost:5432/texera_iceberg_catalog", postgres_username="texera", postgres_password="password", + rest_catalog_uri="http://localhost:8181/catalog/", + rest_catalog_warehouse_name="texera", table_result_namespace="operator-port-result", directory_path="../../../../../../amber/user-resources/workflow-results", commit_batch_size=4096, s3_endpoint="http://localhost:9000", - s3_region="us-east-1", - s3_auth_username="minioadmin", - s3_auth_password="minioadmin", + s3_region="us-west-2", + s3_auth_username="texera_minio", + s3_auth_password="password", ) diff --git a/amber/src/main/python/core/storage/storage_config.py b/amber/src/main/python/core/storage/storage_config.py index c55495ea14c..0e47bdb71ae 100644 --- a/amber/src/main/python/core/storage/storage_config.py +++ b/amber/src/main/python/core/storage/storage_config.py @@ -25,14 +25,17 @@ class StorageConfig: _initialized = False + ICEBERG_CATALOG_TYPE = None ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME = None ICEBERG_POSTGRES_CATALOG_USERNAME = None ICEBERG_POSTGRES_CATALOG_PASSWORD = None + ICEBERG_REST_CATALOG_URI = None + ICEBERG_REST_CATALOG_WAREHOUSE_NAME = None ICEBERG_TABLE_RESULT_NAMESPACE = None ICEBERG_FILE_STORAGE_DIRECTORY_PATH = None ICEBERG_TABLE_COMMIT_BATCH_SIZE = None - # S3 configs (for large_binary_manager module) + # S3 configs S3_ENDPOINT = None S3_REGION = None S3_AUTH_USERNAME = None @@ -41,9 +44,12 @@ class StorageConfig: @classmethod def initialize( cls, + catalog_type, postgres_uri_without_scheme, postgres_username, postgres_password, + rest_catalog_uri, + rest_catalog_warehouse_name, table_result_namespace, directory_path, commit_batch_size, @@ -57,9 +63,13 @@ def initialize( "Storage config has already been initialized and cannot be modified." ) + cls.ICEBERG_CATALOG_TYPE = catalog_type cls.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME = postgres_uri_without_scheme cls.ICEBERG_POSTGRES_CATALOG_USERNAME = postgres_username cls.ICEBERG_POSTGRES_CATALOG_PASSWORD = postgres_password + cls.ICEBERG_REST_CATALOG_URI = rest_catalog_uri + cls.ICEBERG_REST_CATALOG_WAREHOUSE_NAME = rest_catalog_warehouse_name + cls.ICEBERG_TABLE_RESULT_NAMESPACE = table_result_namespace cls.ICEBERG_FILE_STORAGE_DIRECTORY_PATH = directory_path cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = int(commit_batch_size) diff --git a/amber/src/main/python/pytexera/storage/test_large_binary_manager.py b/amber/src/main/python/pytexera/storage/test_large_binary_manager.py index a657f244f38..f935fceb600 100644 --- a/amber/src/main/python/pytexera/storage/test_large_binary_manager.py +++ b/amber/src/main/python/pytexera/storage/test_large_binary_manager.py @@ -27,16 +27,19 @@ def setup_storage_config(self): """Initialize StorageConfig for tests.""" if not StorageConfig._initialized: StorageConfig.initialize( + catalog_type="postgres", postgres_uri_without_scheme="localhost:5432/test", postgres_username="test", postgres_password="test", + rest_catalog_uri="http://localhost:8181/catalog/", + rest_catalog_warehouse_name="texera", table_result_namespace="test", directory_path="/tmp/test", commit_batch_size=1000, s3_endpoint="http://localhost:9000", - s3_region="us-east-1", - s3_auth_username="minioadmin", - s3_auth_password="minioadmin", + s3_region="us-west-2", + s3_auth_username="texera_minio", + s3_auth_password="password", ) def test_get_s3_client_initializes_once(self): diff --git a/amber/src/main/python/texera_run_python_worker.py b/amber/src/main/python/texera_run_python_worker.py index 3ebf81c201f..8687298f819 100644 --- a/amber/src/main/python/texera_run_python_worker.py +++ b/amber/src/main/python/texera_run_python_worker.py @@ -45,9 +45,12 @@ def init_loguru_logger(stream_log_level) -> None: output_port, logger_level, r_path, + iceberg_catalog_type, iceberg_postgres_catalog_uri_without_scheme, iceberg_postgres_catalog_username, iceberg_postgres_catalog_password, + iceberg_rest_catalog_uri, + iceberg_rest_catalog_warehouse_name, iceberg_table_namespace, iceberg_file_storage_directory_path, iceberg_table_commit_batch_size, @@ -58,9 +61,12 @@ def init_loguru_logger(stream_log_level) -> None: ) = sys.argv init_loguru_logger(logger_level) StorageConfig.initialize( + iceberg_catalog_type, iceberg_postgres_catalog_uri_without_scheme, iceberg_postgres_catalog_username, iceberg_postgres_catalog_password, + iceberg_rest_catalog_uri, + iceberg_rest_catalog_warehouse_name, iceberg_table_namespace, iceberg_file_storage_directory_path, iceberg_table_commit_batch_size, diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala index 558b99c9b7b..d2bc5f50253 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala @@ -178,9 +178,12 @@ class PythonWorkflowWorker( Integer.toString(pythonProxyServer.getPortNumber.get()), UdfConfig.pythonLogStreamHandlerLevel, RENVPath, + StorageConfig.icebergCatalogType, StorageConfig.icebergPostgresCatalogUriWithoutScheme, StorageConfig.icebergPostgresCatalogUsername, StorageConfig.icebergPostgresCatalogPassword, + StorageConfig.icebergRESTCatalogUri, + StorageConfig.icebergRESTCatalogWarehouseName, StorageConfig.icebergTableResultNamespace, StorageConfig.fileStorageDirectoryPath.toString, StorageConfig.icebergTableCommitBatchSize.toString, diff --git a/common/config/src/main/resources/storage.conf b/common/config/src/main/resources/storage.conf index 276d1491cdb..3e04053d7dc 100644 --- a/common/config/src/main/resources/storage.conf +++ b/common/config/src/main/resources/storage.conf @@ -27,6 +27,17 @@ storage { rest-uri = "" rest-uri = ${?STORAGE_ICEBERG_CATALOG_REST_URI} # the uri of the rest catalog, not needed unless using REST catalog + rest { + uri = "http://localhost:8181/catalog/" + uri = ${?STORAGE_ICEBERG_CATALOG_REST_URI} + warehouse-name = "texera" + warehouse-name = ${?STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME} + region = "us-west-2" + region = ${?STORAGE_ICEBERG_CATALOG_REST_REGION} + s3-bucket = "texera-iceberg" + s3-bucket = ${?STORAGE_ICEBERG_CATALOG_REST_S3_BUCKET} + } + postgres { # do not include scheme in the uri as Python and Java use different schemes uri-without-scheme = "localhost:5432/texera_iceberg_catalog" diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala b/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala index 1adc3233055..339b57f52a4 100644 --- a/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala +++ b/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala @@ -52,6 +52,7 @@ object EnvironmentalVariable { // Iceberg Catalog val ENV_ICEBERG_CATALOG_TYPE = "STORAGE_ICEBERG_CATALOG_TYPE" val ENV_ICEBERG_CATALOG_REST_URI = "STORAGE_ICEBERG_CATALOG_REST_URI" + val ENV_ICEBERG_CATALOG_REST_WAREHOUSE_NAME = "STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME" // Iceberg Postgres Catalog val ENV_ICEBERG_CATALOG_POSTGRES_URI_WITHOUT_SCHEME = diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala index 3bc1e05a9b5..728e3c0c2de 100644 --- a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala +++ b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala @@ -36,7 +36,9 @@ object StorageConfig { // Iceberg specifics val icebergCatalogType: String = conf.getString("storage.iceberg.catalog.type") - val icebergRESTCatalogUri: String = conf.getString("storage.iceberg.catalog.rest-uri") + val icebergRESTCatalogUri: String = conf.getString("storage.iceberg.catalog.rest.uri") + val icebergRESTCatalogWarehouseName: String = + conf.getString("storage.iceberg.catalog.rest.warehouse-name") // Iceberg Postgres specifics val icebergPostgresCatalogUriWithoutScheme: String = diff --git a/common/workflow-core/build.sbt b/common/workflow-core/build.sbt index db916685138..4f9c37b1719 100644 --- a/common/workflow-core/build.sbt +++ b/common/workflow-core/build.sbt @@ -134,10 +134,14 @@ dependencyOverrides ++= Seq( "io.netty" % "netty-codec" % nettyVersion, "io.netty" % "netty-codec-http" % nettyVersion, "io.netty" % "netty-codec-http2" % nettyVersion, + "io.netty" % "netty-codec-socks" % nettyVersion, "io.netty" % "netty-common" % nettyVersion, "io.netty" % "netty-handler" % nettyVersion, + "io.netty" % "netty-handler-proxy" % nettyVersion, "io.netty" % "netty-resolver" % nettyVersion, "io.netty" % "netty-transport" % nettyVersion, + "io.netty" % "netty-transport-classes-epoll" % nettyVersion, + "io.netty" % "netty-transport-native-epoll" % nettyVersion, "io.netty" % "netty-transport-native-unix-common" % nettyVersion ) @@ -167,6 +171,10 @@ libraryDependencies ++= Seq( excludeJackson, excludeJacksonModule ), + "org.apache.iceberg" % "iceberg-aws" % "1.7.1" excludeAll( + excludeJackson, + excludeJacksonModule + ), "org.apache.hadoop" % "hadoop-common" % "3.3.1" excludeAll( excludeXmlBind, excludeGlassfishJersey, @@ -208,6 +216,13 @@ libraryDependencies ++= Seq( "software.amazon.awssdk" % "s3" % "2.29.51" excludeAll( ExclusionRule(organization = "io.netty") ), - "software.amazon.awssdk" % "auth" % "2.29.51", - "software.amazon.awssdk" % "regions" % "2.29.51", + "software.amazon.awssdk" % "auth" % "2.29.51" excludeAll( + ExclusionRule(organization = "io.netty") + ), + "software.amazon.awssdk" % "regions" % "2.29.51" excludeAll( + ExclusionRule(organization = "io.netty") + ), + "software.amazon.awssdk" % "sts" % "2.29.51" excludeAll( + ExclusionRule(organization = "io.netty") + ), ) \ No newline at end of file diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/IcebergCatalogInstance.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/IcebergCatalogInstance.scala index e3512874c9b..bb9f2d8bf2d 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/IcebergCatalogInstance.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/IcebergCatalogInstance.scala @@ -52,7 +52,7 @@ object IcebergCatalogInstance { case "rest" => IcebergUtil.createRestCatalog( "texera_iceberg", - StorageConfig.fileStorageDirectoryPath + StorageConfig.icebergRESTCatalogWarehouseName ) case "postgres" => IcebergUtil.createPostgresCatalog( diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala index 549cb4b9d17..dd2e40bc30d 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala @@ -107,10 +107,19 @@ private[storage] class IcebergTableWriter[T]( private def flushBuffer(): Unit = { if (buffer.nonEmpty) { // Create a unique file path using the writer's identifier and the filename index - val filepath = Paths.get(table.location()).resolve(s"${writerIdentifier}_${filenameIdx}") + // Handle S3 URIs (s3://) differently from local file paths to preserve URI format + val location = table.location() + val filepathString = if (location.startsWith("s3://")) { + // For S3 URIs, append path component directly as string to preserve s3:// format + val basePath = if (location.endsWith("/")) location else s"$location/" + s"$basePath${writerIdentifier}_${filenameIdx}" + } else { + // For local file paths, use Paths.get() for proper path resolution + Paths.get(location).resolve(s"${writerIdentifier}_${filenameIdx}").toString + } // Increment the filename index by 1 filenameIdx += 1 - val outputFile: OutputFile = table.io().newOutputFile(filepath.toString) + val outputFile: OutputFile = table.io().newOutputFile(filepathString) // Create a Parquet data writer to write a new file val dataWriter: DataWriter[Record] = Parquet .writeData(outputFile) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala index ad6ac07c1ff..39f010ef3fb 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala @@ -22,9 +22,10 @@ package org.apache.texera.amber.util import org.apache.texera.amber.config.StorageConfig import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, LargeBinary, Schema, Tuple} import org.apache.hadoop.conf.Configuration -import org.apache.iceberg.catalog.{Catalog, TableIdentifier} +import org.apache.iceberg.catalog.{Catalog, SupportsNamespaces, TableIdentifier} import org.apache.iceberg.data.parquet.GenericParquetReaders import org.apache.iceberg.data.{GenericRecord, Record} +import org.apache.iceberg.aws.s3.S3FileIO import org.apache.iceberg.hadoop.{HadoopCatalog, HadoopFileIO} import org.apache.iceberg.io.{CloseableIterable, InputFile} import org.apache.iceberg.jdbc.JdbcCatalog @@ -40,6 +41,8 @@ import org.apache.iceberg.{ TableProperties, Schema => IcebergSchema } +import org.apache.iceberg.catalog.Namespace +import org.apache.iceberg.exceptions.AlreadyExistsException import java.nio.ByteBuffer import java.nio.file.Path @@ -96,22 +99,32 @@ object IcebergUtil { * TODO: Add authentication support, such as OAuth2, using `OAuth2Properties`. * * @param catalogName the name of the catalog. - * @param warehouse the root path for the warehouse where the tables are stored. + * @param warehouse the warehouse identifier (for Lakekeeper). * @return the initialized RESTCatalog instance. */ def createRestCatalog( catalogName: String, - warehouse: Path + warehouse: String ): RESTCatalog = { val catalog = new RESTCatalog() - catalog.initialize( - catalogName, - Map( - "warehouse" -> warehouse.toString, - CatalogProperties.URI -> StorageConfig.icebergRESTCatalogUri, - CatalogProperties.FILE_IO_IMPL -> classOf[HadoopFileIO].getName - ).asJava + + // Build base properties map + var properties = Map( + "warehouse" -> warehouse, + CatalogProperties.URI -> StorageConfig.icebergRESTCatalogUri + ) + + properties = properties ++ Map( + CatalogProperties.FILE_IO_IMPL -> classOf[S3FileIO].getName, + // S3FileIO configuration for MinIO + "s3.endpoint" -> StorageConfig.s3Endpoint, + "s3.access-key-id" -> StorageConfig.s3Username, + "s3.secret-access-key" -> StorageConfig.s3Password, + "s3.region" -> StorageConfig.s3Region, + "s3.path-style-access" -> "true" ) + + catalog.initialize(catalogName, properties.asJava) catalog } @@ -165,6 +178,20 @@ object IcebergUtil { TableProperties.COMMIT_MIN_RETRY_WAIT_MS -> StorageConfig.icebergTableCommitMinRetryWaitMs.toString ) + val namespace = Namespace.of(tableNamespace) + + catalog match { + case nsCatalog: SupportsNamespaces => + try nsCatalog.createNamespace(namespace, Map.empty[String, String].asJava) + catch { + case _: AlreadyExistsException => () + } + case _ => + throw new IllegalArgumentException( + s"Catalog ${catalog.getClass.getName} does not support namespaces" + ) + } + val identifier = TableIdentifier.of(tableNamespace, tableName) if (catalog.tableExists(identifier) && overrideIfExists) { catalog.dropTable(identifier) diff --git a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala index 1249d067835..9b214b9755c 100644 --- a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala +++ b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala @@ -69,6 +69,8 @@ object ComputingUnitManagingResource { private lazy val computingUnitEnvironmentVariables: Map[String, Any] = Map( // Variables for saving results to Iceberg EnvironmentalVariable.ENV_ICEBERG_CATALOG_TYPE -> StorageConfig.icebergCatalogType, + EnvironmentalVariable.ENV_ICEBERG_CATALOG_REST_URI -> StorageConfig.icebergRESTCatalogUri, + EnvironmentalVariable.ENV_ICEBERG_CATALOG_REST_WAREHOUSE_NAME -> StorageConfig.icebergRESTCatalogWarehouseName, EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_URI_WITHOUT_SCHEME -> StorageConfig.icebergPostgresCatalogUriWithoutScheme, EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_USERNAME -> StorageConfig.icebergPostgresCatalogUsername, EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_PASSWORD -> StorageConfig.icebergPostgresCatalogPassword, diff --git a/sql/texera_lakekeeper.sql b/sql/texera_lakekeeper.sql new file mode 100644 index 00000000000..afdca6946cc --- /dev/null +++ b/sql/texera_lakekeeper.sql @@ -0,0 +1,21 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +\c postgres + +DROP DATABASE IF EXISTS texera_lakekeeper; +CREATE DATABASE texera_lakekeeper; From 42d0b32ba61972e29b45790e75b8acc061639c60 Mon Sep 17 00:00:00 2001 From: mengw15 <125719918+mengw15@users.noreply.github.com> Date: Mon, 9 Mar 2026 04:01:13 -0700 Subject: [PATCH 2/2] lakekeeper CI related --- .github/workflows/github-action-build.yml | 166 +++++++++++++++++- .../storage/iceberg/test_iceberg_document.py | 2 +- .../storage/test_large_binary_manager.py | 2 +- common/config/src/main/resources/storage.conf | 2 +- common/workflow-core/build.sbt | 4 + 5 files changed, 171 insertions(+), 5 deletions(-) diff --git a/.github/workflows/github-action-build.yml b/.github/workflows/github-action-build.yml index af2a60920d5..0bbe6fd83d7 100644 --- a/.github/workflows/github-action-build.yml +++ b/.github/workflows/github-action-build.yml @@ -97,7 +97,6 @@ jobs: POSTGRES_PASSWORD: postgres ports: - 5432:5432 - # Add a health check so steps wait until Postgres is ready options: >- --health-cmd="pg_isready -U postgres" --health-interval=10s @@ -129,20 +128,105 @@ jobs: psql -h localhost -U postgres -f sql/texera_ddl.sql psql -h localhost -U postgres -f sql/iceberg_postgres_catalog.sql psql -h localhost -U postgres -f sql/texera_lakefs.sql + psql -h localhost -U postgres -f sql/texera_lakekeeper.sql env: PGPASSWORD: postgres - name: Create texera_db_for_test_cases run: psql -h localhost -U postgres -v DB_NAME=texera_db_for_test_cases -f sql/texera_ddl.sql env: PGPASSWORD: postgres + - name: Start MinIO + run: | + docker run -d --name minio --network host \ + -e MINIO_ROOT_USER=texera_minio \ + -e MINIO_ROOT_PASSWORD=password \ + minio/minio:RELEASE.2025-02-28T09-55-16Z server /data + + for i in $(seq 1 30); do + curl -sf http://localhost:9000/minio/health/live && break + echo "Waiting for MinIO... (attempt $i)" + sleep 2 + done + - name: Start Lakekeeper + run: | + docker run --rm --network host \ + -e LAKEKEEPER__PG_DATABASE_URL_READ=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \ + -e LAKEKEEPER__PG_DATABASE_URL_WRITE=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \ + -e LAKEKEEPER__PG_ENCRYPTION_KEY=texera_key \ + vakamo/lakekeeper:v0.11.0 migrate + + docker run -d --name lakekeeper --network host \ + -e LAKEKEEPER__PG_DATABASE_URL_READ=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \ + -e LAKEKEEPER__PG_DATABASE_URL_WRITE=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \ + -e LAKEKEEPER__PG_ENCRYPTION_KEY=texera_key \ + -e LAKEKEEPER__METRICS_PORT=9091 \ + vakamo/lakekeeper:v0.11.0 serve + + for i in $(seq 1 30); do + docker exec lakekeeper /home/nonroot/lakekeeper healthcheck && break + echo "Waiting for Lakekeeper to be ready... (attempt $i)" + sleep 2 + done + + # Final check - fail with logs if Lakekeeper didn't start + docker exec lakekeeper /home/nonroot/lakekeeper healthcheck || { + echo "Lakekeeper failed to start. Container logs:" + docker logs lakekeeper + exit 1 + } + - name: Initialize Lakekeeper Warehouse + run: | + docker run --rm --network host --entrypoint sh minio/mc -c \ + "mc alias set minio http://localhost:9000 texera_minio password && \ + mc mb --ignore-existing minio/texera-iceberg" + + curl -sf -X POST -H 'Content-Type: application/json' \ + -d '{"project-id":"00000000-0000-0000-0000-000000000000","project-name":"default"}' \ + http://localhost:8181/management/v1/project || true + + curl -sf -X POST -H 'Content-Type: application/json' -d '{ + "warehouse-name": "texera", + "project-id": "00000000-0000-0000-0000-000000000000", + "storage-profile": { + "type": "s3", + "bucket": "texera-iceberg", + "region": "us-west-2", + "endpoint": "http://localhost:9000", + "flavor": "s3-compat", + "path-style-access": true, + "sts-enabled": false + }, + "storage-credential": { + "type": "s3", + "credential-type": "access-key", + "aws-access-key-id": "texera_minio", + "aws-secret-access-key": "password" + } + }' http://localhost:8181/management/v1/warehouse - name: Compile with sbt run: sbt clean package + env: + STORAGE_ICEBERG_CATALOG_TYPE: rest + STORAGE_ICEBERG_CATALOG_REST_URI: http://localhost:8181/catalog/ + STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME: texera + STORAGE_S3_ENDPOINT: http://localhost:9000 + STORAGE_S3_REGION: us-west-2 + STORAGE_S3_AUTH_USERNAME: texera_minio + STORAGE_S3_AUTH_PASSWORD: password - name: Set docker-java API version run: | echo "api.version=1.52" >> ~/.docker-java.properties cat ~/.docker-java.properties - name: Run backend tests run: sbt test + env: + STORAGE_ICEBERG_CATALOG_TYPE: rest + STORAGE_ICEBERG_CATALOG_REST_URI: http://localhost:8181/catalog/ + STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME: texera + STORAGE_S3_ENDPOINT: http://localhost:9000 + STORAGE_S3_REGION: us-west-2 + STORAGE_S3_AUTH_USERNAME: texera_minio + STORAGE_S3_AUTH_PASSWORD: password python: strategy: @@ -166,9 +250,87 @@ jobs: run: sudo apt-get update && sudo apt-get install -y postgresql - name: Start PostgreSQL Service run: sudo systemctl start postgresql + - name: Configure PostgreSQL for TCP password auth + run: | + sudo -u postgres psql -c "ALTER USER postgres PASSWORD 'postgres';" + PG_HBA=$(sudo -u postgres psql -t -c "SHOW hba_file;" | xargs) + sudo sed -i 's/local\s\+all\s\+all\s\+peer/local all all md5/' "$PG_HBA" + echo "host all all 127.0.0.1/32 md5" | sudo tee -a "$PG_HBA" + echo "host all all ::1/128 md5" | sudo tee -a "$PG_HBA" + sudo systemctl restart postgresql - name: Create Database and User run: | - cd sql && sudo -u postgres psql -f iceberg_postgres_catalog.sql + cd sql + sudo -u postgres psql -f iceberg_postgres_catalog.sql + sudo -u postgres psql -f texera_lakekeeper.sql + - name: Start MinIO + run: | + docker run -d --name minio --network host \ + -e MINIO_ROOT_USER=texera_minio \ + -e MINIO_ROOT_PASSWORD=password \ + minio/minio:RELEASE.2025-02-28T09-55-16Z server /data + + for i in $(seq 1 30); do + curl -sf http://localhost:9000/minio/health/live && break + echo "Waiting for MinIO... (attempt $i)" + sleep 2 + done + - name: Start Lakekeeper + run: | + docker run --rm --network host \ + -e LAKEKEEPER__PG_DATABASE_URL_READ=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \ + -e LAKEKEEPER__PG_DATABASE_URL_WRITE=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \ + -e LAKEKEEPER__PG_ENCRYPTION_KEY=texera_key \ + vakamo/lakekeeper:v0.11.0 migrate + + docker run -d --name lakekeeper --network host \ + -e LAKEKEEPER__PG_DATABASE_URL_READ=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \ + -e LAKEKEEPER__PG_DATABASE_URL_WRITE=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \ + -e LAKEKEEPER__PG_ENCRYPTION_KEY=texera_key \ + -e LAKEKEEPER__METRICS_PORT=9091 \ + vakamo/lakekeeper:v0.11.0 serve + + for i in $(seq 1 30); do + docker exec lakekeeper /home/nonroot/lakekeeper healthcheck && break + echo "Waiting for Lakekeeper to be ready... (attempt $i)" + sleep 2 + done + + # Final check - fail with logs if Lakekeeper didn't start + docker exec lakekeeper /home/nonroot/lakekeeper healthcheck || { + echo "Lakekeeper failed to start. Container logs:" + docker logs lakekeeper + exit 1 + } + - name: Initialize Lakekeeper Warehouse + run: | + docker run --rm --network host --entrypoint sh minio/mc -c \ + "mc alias set minio http://localhost:9000 texera_minio password && \ + mc mb --ignore-existing minio/texera-iceberg" + + curl -sf -X POST -H 'Content-Type: application/json' \ + -d '{"project-id":"00000000-0000-0000-0000-000000000000","project-name":"default"}' \ + http://localhost:8181/management/v1/project || true + + curl -sf -X POST -H 'Content-Type: application/json' -d '{ + "warehouse-name": "texera", + "project-id": "00000000-0000-0000-0000-000000000000", + "storage-profile": { + "type": "s3", + "bucket": "texera-iceberg", + "region": "us-west-2", + "endpoint": "http://localhost:9000", + "flavor": "s3-compat", + "path-style-access": true, + "sts-enabled": false + }, + "storage-credential": { + "type": "s3", + "credential-type": "access-key", + "aws-access-key-id": "texera_minio", + "aws-secret-access-key": "password" + } + }' http://localhost:8181/management/v1/warehouse - name: Lint with Ruff run: | cd amber/src/main/python && ruff check . && ruff format --check . diff --git a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py index ebdd3d0788c..3cc48da6bdf 100644 --- a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py +++ b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py @@ -37,7 +37,7 @@ # Hardcoded storage config only for test purposes. StorageConfig.initialize( - catalog_type="postgres", + catalog_type="rest", postgres_uri_without_scheme="localhost:5432/texera_iceberg_catalog", postgres_username="texera", postgres_password="password", diff --git a/amber/src/main/python/pytexera/storage/test_large_binary_manager.py b/amber/src/main/python/pytexera/storage/test_large_binary_manager.py index f935fceb600..82537457e69 100644 --- a/amber/src/main/python/pytexera/storage/test_large_binary_manager.py +++ b/amber/src/main/python/pytexera/storage/test_large_binary_manager.py @@ -27,7 +27,7 @@ def setup_storage_config(self): """Initialize StorageConfig for tests.""" if not StorageConfig._initialized: StorageConfig.initialize( - catalog_type="postgres", + catalog_type="rest", postgres_uri_without_scheme="localhost:5432/test", postgres_username="test", postgres_password="test", diff --git a/common/config/src/main/resources/storage.conf b/common/config/src/main/resources/storage.conf index 3e04053d7dc..73a6ff8583d 100644 --- a/common/config/src/main/resources/storage.conf +++ b/common/config/src/main/resources/storage.conf @@ -21,7 +21,7 @@ storage { # Configuration for Apache Iceberg, used for storing the workflow results & stats iceberg { catalog { - type = postgres # either hadoop, rest, or postgres + type = rest # either hadoop, rest, or postgres type = ${?STORAGE_ICEBERG_CATALOG_TYPE} rest-uri = "" diff --git a/common/workflow-core/build.sbt b/common/workflow-core/build.sbt index 4f9c37b1719..4488c74dd57 100644 --- a/common/workflow-core/build.sbt +++ b/common/workflow-core/build.sbt @@ -37,6 +37,10 @@ ThisBuild / conflictManager := ConflictManager.latestRevision // Restrict parallel execution of tests to avoid conflicts Global / concurrentRestrictions += Tags.limit(Tags.Test, 1) +// Fork a separate JVM for tests to avoid sbt classloader conflicts +// (iceberg-aws S3FileIO hits ClassCastException with layered classloaders) +Test / fork := true +Test / baseDirectory := (ThisBuild / baseDirectory).value ///////////////////////////////////////////////////////////////////////////// // Compiler Options