diff --git a/datavines-connector/datavines-connector-plugins/datavines-connector-jdbc/src/main/java/io/datavines/connector/plugin/DefaultStatementParser.java b/datavines-connector/datavines-connector-plugins/datavines-connector-jdbc/src/main/java/io/datavines/connector/plugin/DefaultStatementParser.java index 327e7247f..43d36d7f4 100644 --- a/datavines-connector/datavines-connector-plugins/datavines-connector-jdbc/src/main/java/io/datavines/connector/plugin/DefaultStatementParser.java +++ b/datavines-connector/datavines-connector-plugins/datavines-connector-jdbc/src/main/java/io/datavines/connector/plugin/DefaultStatementParser.java @@ -16,13 +16,76 @@ */ package io.datavines.connector.plugin; +import com.alibaba.druid.DbType; +import com.alibaba.druid.sql.SQLUtils; +import com.alibaba.druid.sql.ast.SQLStatement; +import com.alibaba.druid.sql.visitor.SchemaStatVisitor; +import com.alibaba.druid.stat.TableStat; import io.datavines.connector.api.StatementParser; import io.datavines.connector.api.entity.StatementMetadataFragment; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + public class DefaultStatementParser implements StatementParser { + private final DbType dbType; + + public DefaultStatementParser() { + this(null); + } + + public DefaultStatementParser(DbType dbType) { + this.dbType = dbType; + } + @Override public StatementMetadataFragment parseStatement(String statement) { - return null; + if (statement == null || statement.trim().isEmpty()) { + return null; + } + + List stmtList = SQLUtils.parseStatements(statement, dbType); + if (stmtList == null || stmtList.isEmpty()) { + return null; + } + + List inputTables = new ArrayList<>(); + List outputTables = new ArrayList<>(); + + for (SQLStatement sqlStatement : stmtList) { + SchemaStatVisitor visitor = SQLUtils.createSchemaStatVisitor(dbType); + sqlStatement.accept(visitor); + + Map tables = visitor.getTables(); + if (tables == null || tables.isEmpty()) { + continue; + } + + for (Map.Entry entry : tables.entrySet()) { + String tableName = entry.getKey().getName(); + TableStat stat = entry.getValue(); + + if (stat.getInsertCount() > 0 || stat.getCreateCount() > 0 + || stat.getMergeCount() > 0) { + if (!outputTables.contains(tableName)) { + outputTables.add(tableName); + } + } + + if (stat.getSelectCount() > 0) { + if (!inputTables.contains(tableName)) { + inputTables.add(tableName); + } + } + } + } + + if (inputTables.isEmpty() && outputTables.isEmpty()) { + return null; + } + + return new StatementMetadataFragment(inputTables, outputTables, new ArrayList<>()); } } diff --git a/datavines-connector/datavines-connector-plugins/datavines-connector-trino/src/main/java/io/datavines/connector/plugin/TrinoConnector.java b/datavines-connector/datavines-connector-plugins/datavines-connector-trino/src/main/java/io/datavines/connector/plugin/TrinoConnector.java index 3b3d505f2..33ee198a1 100644 --- a/datavines-connector/datavines-connector-plugins/datavines-connector-trino/src/main/java/io/datavines/connector/plugin/TrinoConnector.java +++ b/datavines-connector/datavines-connector-plugins/datavines-connector-trino/src/main/java/io/datavines/connector/plugin/TrinoConnector.java @@ -93,9 +93,4 @@ public ConnectorResponse testConnect(TestConnectionRequestParam param) { .build(); } } - - @Override - public List keyProperties() { - return Arrays.asList("host","port","catalog","database"); - } } diff --git a/datavines-core/src/main/java/io/datavines/core/enums/Status.java b/datavines-core/src/main/java/io/datavines/core/enums/Status.java index 715cde826..1357c32ec 100644 --- a/datavines-core/src/main/java/io/datavines/core/enums/Status.java +++ b/datavines-core/src/main/java/io/datavines/core/enums/Status.java @@ -16,6 +16,7 @@ */ package io.datavines.core.enums; +import lombok.Getter; import org.springframework.context.i18n.LocaleContextHolder; import java.util.Locale; @@ -49,7 +50,7 @@ public enum Status { OLD_PASSWORD_IS_INCORRECT_ERROR(10020004, "Old Password is Incorrect", "旧密码错误"), NEW_PASSWORD_CONFIRM_IS_INCORRECT_ERROR(10020004, "New Password Confirm is Incorrect", "新密码确认错误"), - WORKSPACE_EXIST_ERROR(11010001, "WorkSpace {0} is Exist error", "工作空间 {0} 已存在错误"), + WORKSPACE_EXIST_ERROR(11010001, "WorkSpace {0} is Exist Error", "工作空间 {0} 已存在错误"), CREATE_WORKSPACE_ERROR(11010002, "Create WorkSpace {0} Error", "创建工作空间 {0} 错误"), WORKSPACE_NOT_EXIST_ERROR(11010003, "WorkSpace {0} is Not Exist Error", "工作空间 {0} 不存在错误"), UPDATE_WORKSPACE_ERROR(11010004, "Update WorkSpace {0} Error", "更新工作空间 {0} 错误"), @@ -69,7 +70,6 @@ public enum Status { TASK_NOT_EXIST_ERROR(13010001, "Task {0} Not Exist Error", "任务{0}不存在错误"), TASK_LOG_PATH_NOT_EXIST_ERROR(13010002, "Task {0} Log Path Not Exist Error", "任务 {0} 的日志路径不存在错误"), TASK_EXECUTE_HOST_NOT_EXIST_ERROR(13010003, "Task Execute Host {0} Not Exist Error", "任务 {0} 的执行服务地址不存在错误"), - TASK_EXECUTE_NOT_RUNNING(13010004, "Taskt {0} has not running", "任务 {0} 还没有开始运行,请稍后重试"), JOB_PARAMETER_IS_NULL_ERROR(14010001, "Job {0} Parameter is Null Error", "作业 {0} 参数为空错误"), @@ -83,7 +83,6 @@ public enum Status { METRIC_IS_NOT_EXIST(14010009, "The Metric {0} is not exist", "规则 {0} 不存在"), MULTI_TABLE_ACCURACY_NOT_SUPPORT_LOCAL_ENGINE(14010010, "Local Engine not support multi table accuracy in one datasource", "Local引擎不支持跨表准确性检查"), JOB_PARAMETER_CONTAIN_DUPLICATE_METRIC_ERROR(14010011, "Job {0} Parameter Contain Duplicate Metric", "作业中存在重复的检查规则"), - JOB_SCHEDULE_EXIST_ERROR(14020001, "Job Schedule is Exist error, id must be not null", "作业定时任务已存在,ID 不能为空"), CREATE_JOB_SCHEDULE_ERROR(14020002, "Create Job Schedule {0} Error", "创建作业定时任务 {0} 错误"), JOB_SCHEDULE_NOT_EXIST_ERROR(14020003, "Job Schedule {0} is not Exist error", "作业定时任务 {0} 不存在错误"), @@ -93,6 +92,7 @@ public enum Status { SCHEDULE_TYPE_NOT_VALIDATE_ERROR(14020007, "Schedule type {0} is not Validate Error", "定时器类型参数 {0} 错误"), SCHEDULE_CYCLE_NOT_VALIDATE_ERROR(14020008, "Schedule Param Cycle {0} is not Validate Error", "定时器周期参数 {0} 错误"), SCHEDULE_CRON_IS_INVALID_ERROR(14020009, "Schedule cron {0} is not Validate Error", "定时器 Crontab 表达式 {0} 错误"), + DATASOURCE_NOT_SUPPORT_ERROR_DATA_OUTPUT_TO_SELF_ERROR(14020010, "DataSource type {0} not Support Error Data Output To Self Error", "{0} 类型数据源不支持错误输出写入"), CREATE_TENANT_ERROR(15010001, "Create Tenant {0} Error", "创建 Linux 用户 {0} 错误"), TENANT_NOT_EXIST_ERROR(15010002, "Tenant {0} Not Exist Error", "Linux 用户 {0} 不存在错误"), @@ -109,15 +109,13 @@ public enum Status { ERROR_DATA_STORAGE_EXIST_ERROR(17010003, "Error Data Storage {0} is Exist error", "错误数据存储 {0} 已存在"), UPDATE_ERROR_DATA_STORAGE_ERROR(17010004, "Update Error Data Storage {0} Error", "更新 错误数据存储 {0} 错误"), - DATASOURCE_NOT_SUPPORT_ERROR_DATA_OUTPUT_TO_SELF_ERROR(14020010, "DataSource type {0} not Support Error Data Output To Self Error", "{0} 类型数据源不支持错误输出写入"), - SLA_ALREADY_EXIST_ERROR(18010001, "SLA {0} Already exist", "SLA {0} 已经存在"), SLA_SENDER_ALREADY_EXIST_ERROR(18020001, "SLA Sender {0} Already exist", "SLA 发送器 {0} 已经存在"), SLA_JOB_IS_NOT_EXIST_ERROR(18010003, "SLA job {0} is not exist", "SLA job {0} 存在"), CATALOG_FETCH_DATASOURCE_NULL_ERROR(19010001, "获取元数据时数据源为空", "DataSource must not be null when fetch metadata"), - CATALOG_FETCH_METADATA_PARAMETER_ERROR(19010002, "获取元数据参数错误", "Fetch Metadata Parameter Error"), + CATALOG_TAG_CATEGORY_CREATE_ERROR(20010001, "Create Tag Category {0} Error", "创建标签类别 {0} 错误"), CATALOG_TAG_CATEGORY_NOT_EXIST_ERROR(20010002, "Tag Category {0} Not Exist Error", "标签类别 {0} 不存在"), CATALOG_TAG_CATEGORY_EXIST_ERROR(20010003, "Tag Category {0} is Exist error", "标签类别 {0} 已存在"), @@ -129,21 +127,22 @@ public enum Status { CREATE_CATALOG_TASK_SCHEDULE_ERROR(20030002, "Create Catalog Task Schedule {0} Error", "创建元数据抓取定时任务 {0} 错误"), CATALOG_TASK_SCHEDULE_NOT_EXIST_ERROR(20030003, "Catalog Task Schedule {0} is not Exist error", "元数据抓取定时任务 {0} 不存在"), UPDATE_CATALOG_TASK_SCHEDULE_ERROR(20030004, "Update Catalog Task Schedule {0} Error", "更新元数据抓取定时任务 {0} 错误"), - CATALOG_PROFILE_INSTANCE_FQN_ERROR(20030004, "Catalog instance fqn {0} Error", "数据实体全限定名 {0} 错误"), + CATALOG_INSTANCE_FQN_ERROR(20030005, "Catalog instance fqn {0} Error", "数据实体全限定名 {0} 错误"), + CATALOG_INSTANCE_IS_NULL_ERROR(20040001, "Catalog instance fqn {0} Error", "数据实体 {0} 为空错误"), CREATE_ISSUE_ERROR(21010001, "Create Issue {0} Error", "创建Issue {0} 错误"), ISSUE_NOT_EXIST_ERROR(21010002, "Issue {0} Not Exist Error", "Issue {0} 不存在错误"), ISSUE_EXIST_ERROR(21010003, "Issue {0} is Exist error", "Issue {0} 已存在错误"), UPDATE_ISSUE_ERROR(21010004, "Update Issue {0} Error", "更新Issue {0} 错误"), - CREATE_CONFIG_ERROR(22010001, "Create Config {0} Error", "创建参数 {0} 错误"), CONFIG_NOT_EXIST_ERROR(22010002, "Config {0} Not Exist Error", "参数 {0} 不存在错误"), CONFIG_EXIST_ERROR(22010003, "Config {0} is Exist error", "参数 {0} 已存在错误"), UPDATE_CONFIG_ERROR(22010004, "Update Config {0} Error", "更新参数 {0} 错误"), - CAN_NOT_DELETE_DEFAULT_CONFIG_ERROR(22010005, "Can Not Delete Default Config {0} Error", "不能删除默认参数 {0} 错误"), ; + + @Getter private final int code; private final String enMsg; private final String zhMsg; @@ -154,10 +153,6 @@ public enum Status { this.zhMsg = zhMsg; } - public int getCode() { - return this.code; - } - public String getMsg() { if (Locale.SIMPLIFIED_CHINESE.getLanguage().equals(LocaleContextHolder.getLocale().getLanguage())) { return this.zhMsg; diff --git a/datavines-engine/datavines-engine-config/src/main/java/io/datavines/engine/config/BaseJobConfigurationBuilder.java b/datavines-engine/datavines-engine-config/src/main/java/io/datavines/engine/config/BaseJobConfigurationBuilder.java index ec8ebc06b..cc2aa83a3 100644 --- a/datavines-engine/datavines-engine-config/src/main/java/io/datavines/engine/config/BaseJobConfigurationBuilder.java +++ b/datavines-engine/datavines-engine-config/src/main/java/io/datavines/engine/config/BaseJobConfigurationBuilder.java @@ -254,7 +254,7 @@ protected SinkConfig getValidateResultDataSinkConfig(ExpectedValue expectedValue SinkConfig validateResultDataStorageConfig = new SinkConfig(); validateResultDataStorageConfig.setPlugin(jobExecutionInfo.getValidateResultDataStorageType()); Map configMap = getValidateResultSourceConfigMap( - ParameterUtils.convertParameterPlaceholders(sql, inputParameter),dbTable); + ParameterUtils.convertParameterPlaceholders(sql, inputParameter), dbTable); configMap.put(JOB_EXECUTION_ID, jobExecutionInfo.getId()); configMap.put(INVALIDATE_ITEMS_TABLE, inputParameter.get(INVALIDATE_ITEMS_TABLE)); configMap.put(METRIC_UNIQUE_KEY, inputParameter.get(METRIC_UNIQUE_KEY)); diff --git a/datavines-server/src/main/java/io/datavines/server/api/controller/CatalogLineageController.java b/datavines-server/src/main/java/io/datavines/server/api/controller/CatalogLineageController.java new file mode 100644 index 000000000..b6f6fa70b --- /dev/null +++ b/datavines-server/src/main/java/io/datavines/server/api/controller/CatalogLineageController.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.datavines.server.api.controller; + +import io.datavines.core.aop.RefreshToken; +import io.datavines.core.constant.DataVinesConstants; +import io.datavines.server.api.dto.bo.catalog.lineage.LineageEntityEdgeInfo; +import io.datavines.server.api.dto.bo.catalog.lineage.SqlWithDataSourceKeyProperties; +import io.datavines.server.api.dto.bo.catalog.lineage.SqlWithDataSourceList; +import io.datavines.server.repository.entity.catalog.CatalogTagCategory; +import io.datavines.server.repository.service.CatalogEntityRelService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.*; + +import javax.validation.Valid; + +@Api(value = "catalog", tags = "catalog", produces = MediaType.APPLICATION_JSON_VALUE) +@RestController +@RequestMapping(value = DataVinesConstants.BASE_API_PATH + "/catalog/lineage", produces = MediaType.APPLICATION_JSON_VALUE) +@RefreshToken +public class CatalogLineageController { + + @Autowired + private CatalogEntityRelService catalogEntityRelService; + + @ApiOperation(value = "add lineage", response = Long.class) + @PostMapping(value = "/add", consumes = MediaType.APPLICATION_JSON_VALUE) + public Object addLineage(@Valid @RequestBody LineageEntityEdgeInfo entityEdgeInfo) { + return catalogEntityRelService.addLineage(entityEdgeInfo); + } + + @ApiOperation(value = "get lineage by full qualified name", response = CatalogTagCategory.class, responseContainer = "list") + @GetMapping(value = "/getByFqn/{datasourceId}/{fqn}") + public Object getByFqn(@PathVariable Long datasourceId, @PathVariable String fqn) { + return catalogEntityRelService.getLineageByFqn(datasourceId,fqn,1,1); + } + + @ApiOperation(value = "delete tag category", response = boolean.class) + @GetMapping(value = "/getByUUID/{uuid}") + public Object getByUUID(@PathVariable String uuid) { + return catalogEntityRelService.getLineageByUUID(uuid,1,1); + } + + @ApiOperation(value = "delete lineage", response = boolean.class) + @DeleteMapping(value = "/{fromUUID}/{toUUID}") + public Object deleteLineage(@PathVariable("fromUUID") String fromUUID, + @PathVariable("toUUID") String toUUID) { + return catalogEntityRelService.deleteLineage(fromUUID, toUUID); + } + + @ApiOperation(value = "parse sql to get lineage", response = Long.class) + @PostMapping(value = "/addByParseSql", consumes = MediaType.APPLICATION_JSON_VALUE) + public Object addLineageByParseSql(@Valid @RequestBody SqlWithDataSourceList sqlWithDataSourceList) { + return catalogEntityRelService.addLineageByParseSql(sqlWithDataSourceList); + } + + @ApiOperation(value = "parse sql to get lineage", response = Long.class) + @PostMapping(value = "/addByParseSql2", consumes = MediaType.APPLICATION_JSON_VALUE) + public Object addLineageByParseSql2(@Valid @RequestBody SqlWithDataSourceKeyProperties sqlWithDataSourceKeyProperties) { + return catalogEntityRelService.addLineageByParseSql2(sqlWithDataSourceKeyProperties); + } +} diff --git a/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/CatalogEntityInstanceInfo.java b/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/CatalogEntityInstanceInfo.java new file mode 100644 index 000000000..b42e9b32e --- /dev/null +++ b/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/CatalogEntityInstanceInfo.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.datavines.server.api.dto.bo.catalog; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class CatalogEntityInstanceInfo implements Serializable { + + private static final long serialVersionUID = -1L; + + private Long id; + + private String uuid; + + private Long datasourceId; + + private String type; + + private String fullyQualifiedName; + + private String displayName; + + private String description; + +} diff --git a/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/lineage/CatalogEntityColumnLineageDetail.java b/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/lineage/CatalogEntityColumnLineageDetail.java new file mode 100644 index 000000000..3fcad0e7d --- /dev/null +++ b/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/lineage/CatalogEntityColumnLineageDetail.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.datavines.server.api.dto.bo.catalog.lineage; + +import io.datavines.server.api.dto.bo.catalog.CatalogEntityInstanceInfo; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +@Data +public class CatalogEntityColumnLineageDetail implements Serializable { + + private static final long serialVersionUID = -1L; + + private List fromChildren; + + private String function; + + private CatalogEntityInstanceInfo toChild; +} diff --git a/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/lineage/CatalogEntityLineageDetail.java b/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/lineage/CatalogEntityLineageDetail.java new file mode 100644 index 000000000..4732fe4b2 --- /dev/null +++ b/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/lineage/CatalogEntityLineageDetail.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.datavines.server.api.dto.bo.catalog.lineage; + +import io.datavines.server.enums.LineageSourceType; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +@Data +public class CatalogEntityLineageDetail implements Serializable { + + private static final long serialVersionUID = -1L; + + private List childRelDetailList; + + private String description; + + private LineageSourceType sourceType; + + private String sqlQuery; +} diff --git a/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/lineage/EntityEdgeInfo.java b/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/lineage/EntityEdgeInfo.java new file mode 100644 index 000000000..bcfc6e0eb --- /dev/null +++ b/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/lineage/EntityEdgeInfo.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.datavines.server.api.dto.bo.catalog.lineage; + +import io.datavines.server.api.dto.bo.catalog.CatalogEntityInstanceInfo; +import lombok.Data; + +import java.io.Serializable; + +@Data +public class EntityEdgeInfo implements Serializable { + + private static final long serialVersionUID = -1L; + + private CatalogEntityInstanceInfo fromEntity; + + private String description; + + private CatalogEntityLineageDetail lineageDetail; + + private CatalogEntityInstanceInfo toEntity; +} diff --git a/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/lineage/LineageEntityEdgeInfo.java b/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/lineage/LineageEntityEdgeInfo.java new file mode 100644 index 000000000..69700e814 --- /dev/null +++ b/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/lineage/LineageEntityEdgeInfo.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.datavines.server.api.dto.bo.catalog.lineage; + +import io.datavines.server.api.dto.bo.catalog.CatalogEntityInstanceInfo; +import lombok.Data; + +import java.io.Serializable; + +@Data +public class LineageEntityEdgeInfo implements Serializable { + + private static final long serialVersionUID = -1L; + + private String uuid; + + private CatalogEntityInstanceInfo fromEntity; + + private String description; + + private CatalogEntityLineageDetail lineageDetail; + + private CatalogEntityInstanceInfo toEntity; + +} diff --git a/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/lineage/LineageEntityNodeInfo.java b/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/lineage/LineageEntityNodeInfo.java new file mode 100644 index 000000000..bb6eae221 --- /dev/null +++ b/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/lineage/LineageEntityNodeInfo.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.datavines.server.api.dto.bo.catalog.lineage; + +import io.datavines.server.api.dto.bo.catalog.CatalogEntityInstanceInfo; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +@Data +public class LineageEntityNodeInfo implements Serializable { + + private static final long serialVersionUID = -1L; + + private Long id; + + private String uuid; + + private Long datasourceId; + + private String type; + + private String fullyQualifiedName; + + private String displayName; + + private String description; + + private CatalogEntityInstanceInfo datasource; + + private CatalogEntityInstanceInfo database; + + private CatalogEntityInstanceInfo schema; + + private CatalogEntityInstanceInfo catalog; + + private List columns; + + private boolean hasNextNode; + +} diff --git a/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/lineage/SqlWithDataSourceKeyProperties.java b/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/lineage/SqlWithDataSourceKeyProperties.java new file mode 100644 index 000000000..f0cf7ec22 --- /dev/null +++ b/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/lineage/SqlWithDataSourceKeyProperties.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.datavines.server.api.dto.bo.catalog.lineage; + +import io.datavines.server.api.dto.bo.datasource.DataSourceKeyProperties; +import lombok.Data; + +import java.io.Serializable; + +@Data +public class SqlWithDataSourceKeyProperties implements Serializable { + + private static final long serialVersionUID = -1L; + + private DataSourceKeyProperties dataSourceKeyProperties; + + private String sql; +} diff --git a/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/lineage/SqlWithDataSourceList.java b/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/lineage/SqlWithDataSourceList.java new file mode 100644 index 000000000..dbfe43a94 --- /dev/null +++ b/datavines-server/src/main/java/io/datavines/server/api/dto/bo/catalog/lineage/SqlWithDataSourceList.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.datavines.server.api.dto.bo.catalog.lineage; + +import io.datavines.server.api.dto.bo.datasource.DataSourceInfo; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +@Data +public class SqlWithDataSourceList implements Serializable { + + private static final long serialVersionUID = -1L; + + private List dataSourceInfos; + + private String sql; +} diff --git a/datavines-server/src/main/java/io/datavines/server/api/dto/bo/datasource/DataSourceInfo.java b/datavines-server/src/main/java/io/datavines/server/api/dto/bo/datasource/DataSourceInfo.java new file mode 100644 index 000000000..ebcd00ec1 --- /dev/null +++ b/datavines-server/src/main/java/io/datavines/server/api/dto/bo/datasource/DataSourceInfo.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.datavines.server.api.dto.bo.datasource; + +import lombok.Data; + +@Data +public class DataSourceInfo { + + private Long id; + + private String uuid; + + private String category; + + private String type; + + private String paramCode; +} diff --git a/datavines-server/src/main/java/io/datavines/server/api/dto/bo/datasource/DataSourceKeyProperties.java b/datavines-server/src/main/java/io/datavines/server/api/dto/bo/datasource/DataSourceKeyProperties.java new file mode 100644 index 000000000..3b736ef61 --- /dev/null +++ b/datavines-server/src/main/java/io/datavines/server/api/dto/bo/datasource/DataSourceKeyProperties.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.datavines.server.api.dto.bo.datasource; + +import lombok.Data; + +import java.util.Map; + +@Data +public class DataSourceKeyProperties { + + private String category; + + private String type; + + private Map param; +} diff --git a/datavines-server/src/main/java/io/datavines/server/api/dto/vo/catalog/lineage/CatalogEntityLineageVO.java b/datavines-server/src/main/java/io/datavines/server/api/dto/vo/catalog/lineage/CatalogEntityLineageVO.java new file mode 100644 index 000000000..f0b9665d9 --- /dev/null +++ b/datavines-server/src/main/java/io/datavines/server/api/dto/vo/catalog/lineage/CatalogEntityLineageVO.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.datavines.server.api.dto.vo.catalog.lineage; + +import io.datavines.server.api.dto.bo.catalog.lineage.LineageEntityEdgeInfo; +import io.datavines.server.api.dto.bo.catalog.lineage.LineageEntityNodeInfo; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +@Data +public class CatalogEntityLineageVO implements Serializable { + + public LineageEntityNodeInfo currentNode; + + public List nodes; + + public List edges; + } diff --git a/datavines-server/src/main/java/io/datavines/server/enums/LineageSourceType.java b/datavines-server/src/main/java/io/datavines/server/enums/LineageSourceType.java new file mode 100644 index 000000000..ce1bf3fdf --- /dev/null +++ b/datavines-server/src/main/java/io/datavines/server/enums/LineageSourceType.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.datavines.server.enums; + +import com.baomidou.mybatisplus.annotation.EnumValue; + +import java.util.HashMap; +import java.util.Map; + +public enum LineageSourceType { + + /** + * 0-data quality task + * 1-catalog task + */ + MANUAL(0, "manual"), + SQL_PARSER(1, "sql_parser"), + SPARK_LISTENER(2, "spark_listener"), + FLINK_SQL_LINEAGE(3, "flink_sql_lineage"), + ; + LineageSourceType(int code, String description){ + this.code = code; + this.description = description; + } + + @EnumValue + private final int code; + + private final String description; + + private static final Map CODE_2_MAP = new HashMap<>(); + + private static final Map DESC_2_MAP = new HashMap<>(); + + static { + for (LineageSourceType commandCategory : LineageSourceType.values()) { + CODE_2_MAP.put(commandCategory.code,commandCategory); + DESC_2_MAP.put(commandCategory.description,commandCategory); + } + } + + public static LineageSourceType of(Integer status) { + if (CODE_2_MAP.containsKey(status)) { + return CODE_2_MAP.get(status); + } + throw new IllegalArgumentException("invalid lineage source type : " + status); + } + + public static LineageSourceType descOf(String description) { + if (DESC_2_MAP.containsKey(description)) { + return DESC_2_MAP.get(description); + } + throw new IllegalArgumentException("invalid lineage source type : " + description); + } + + public int getCode() { + return code; + } + + public String getDescription() { + return description; + } +} diff --git a/datavines-server/src/main/java/io/datavines/server/repository/entity/DataSource.java b/datavines-server/src/main/java/io/datavines/server/repository/entity/DataSource.java index 417d31fb4..a1430312b 100644 --- a/datavines-server/src/main/java/io/datavines/server/repository/entity/DataSource.java +++ b/datavines-server/src/main/java/io/datavines/server/repository/entity/DataSource.java @@ -44,6 +44,9 @@ public class DataSource implements Serializable { @TableField(value = "name") private String name; + @TableField(value = "category") + private String category; + @TableField(value = "type") private String type; diff --git a/datavines-server/src/main/java/io/datavines/server/repository/entity/catalog/CatalogEntityRel.java b/datavines-server/src/main/java/io/datavines/server/repository/entity/catalog/CatalogEntityRel.java index cb578c86a..441cc4308 100644 --- a/datavines-server/src/main/java/io/datavines/server/repository/entity/catalog/CatalogEntityRel.java +++ b/datavines-server/src/main/java/io/datavines/server/repository/entity/catalog/CatalogEntityRel.java @@ -44,6 +44,12 @@ public class CatalogEntityRel implements Serializable { @TableField(value = "type") private String type; + @TableField(value = "source_type") + private String sourceType; + + @TableField(value = "related_script") + private String relatedScript; + @TableField(value = "update_by") private Long updateBy; diff --git a/datavines-server/src/main/java/io/datavines/server/repository/service/CatalogEntityInstanceService.java b/datavines-server/src/main/java/io/datavines/server/repository/service/CatalogEntityInstanceService.java index 9ed2e3167..0cdae4ec8 100644 --- a/datavines-server/src/main/java/io/datavines/server/repository/service/CatalogEntityInstanceService.java +++ b/datavines-server/src/main/java/io/datavines/server/repository/service/CatalogEntityInstanceService.java @@ -34,6 +34,12 @@ public interface CatalogEntityInstanceService extends IService getChildren(String uuid); + + CatalogEntityInstance getByUUID(String uuid); + CatalogEntityInstance getByDataSourceAndFQN(Long dataSourceId, String fqn); IPage getEntityPage(String upstreamId, Integer pageNumber, Integer pageSize, String whetherMark); diff --git a/datavines-server/src/main/java/io/datavines/server/repository/service/CatalogEntityRelService.java b/datavines-server/src/main/java/io/datavines/server/repository/service/CatalogEntityRelService.java index 0b3f2d5fb..8d648b7d4 100644 --- a/datavines-server/src/main/java/io/datavines/server/repository/service/CatalogEntityRelService.java +++ b/datavines-server/src/main/java/io/datavines/server/repository/service/CatalogEntityRelService.java @@ -17,7 +17,23 @@ package io.datavines.server.repository.service; import com.baomidou.mybatisplus.extension.service.IService; +import io.datavines.server.api.dto.bo.catalog.lineage.LineageEntityEdgeInfo; +import io.datavines.server.api.dto.bo.catalog.lineage.SqlWithDataSourceKeyProperties; +import io.datavines.server.api.dto.bo.catalog.lineage.SqlWithDataSourceList; +import io.datavines.server.api.dto.vo.catalog.lineage.CatalogEntityLineageVO; import io.datavines.server.repository.entity.catalog.CatalogEntityRel; public interface CatalogEntityRelService extends IService { + + public boolean addLineage(LineageEntityEdgeInfo entityEdgeInfo); + + public boolean addLineageByParseSql(SqlWithDataSourceList sqlWithDataSourceList); + + public boolean addLineageByParseSql2(SqlWithDataSourceKeyProperties sqlWithDataSourceKeyProperties); + + public CatalogEntityLineageVO getLineageByFqn(Long datasourceId, String fqn, int upstreamDepth, int downstreamDepth); + + public CatalogEntityLineageVO getLineageByUUID(String uuid, int upstreamDepth, int downstreamDepth); + + public boolean deleteLineage(String fromUUID, String toUUID); } diff --git a/datavines-server/src/main/java/io/datavines/server/repository/service/DataSourceService.java b/datavines-server/src/main/java/io/datavines/server/repository/service/DataSourceService.java index 3055dc78b..54d67b9c4 100644 --- a/datavines-server/src/main/java/io/datavines/server/repository/service/DataSourceService.java +++ b/datavines-server/src/main/java/io/datavines/server/repository/service/DataSourceService.java @@ -18,12 +18,10 @@ import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.service.IService; -import io.datavines.server.api.dto.bo.datasource.ExecuteRequest; +import io.datavines.server.api.dto.bo.datasource.*; import io.datavines.common.exception.DataVinesException; import io.datavines.common.param.ConnectorResponse; import io.datavines.common.param.TestConnectionRequestParam; -import io.datavines.server.api.dto.bo.datasource.DataSourceCreate; -import io.datavines.server.api.dto.bo.datasource.DataSourceUpdate; import io.datavines.server.api.dto.vo.DataSourceVO; import io.datavines.server.repository.entity.DataSource; import io.datavines.core.exception.DataVinesServerException; @@ -57,4 +55,6 @@ public interface DataSourceService extends IService { String getConfigJson(String type); List listByWorkSpaceIdAndType(long workspaceId,String type); + + List listByInfo(DataSourceKeyProperties dataSourceKeyProperties); } diff --git a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/CatalogEntityInstanceServiceImpl.java b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/CatalogEntityInstanceServiceImpl.java index 458340f3e..742894aff 100644 --- a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/CatalogEntityInstanceServiceImpl.java +++ b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/CatalogEntityInstanceServiceImpl.java @@ -16,6 +16,7 @@ */ package io.datavines.server.repository.service.impl; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -94,6 +95,44 @@ public CatalogEntityInstance getByTypeAndFQN(String type, String fqn) { return baseMapper.selectOne(new QueryWrapper().lambda().eq(CatalogEntityInstance::getType, type).eq(CatalogEntityInstance::getFullyQualifiedName, fqn)); } + @Override + public CatalogEntityInstance getByUUID(String uuid) { + return baseMapper.selectOne(new QueryWrapper().lambda().eq(CatalogEntityInstance::getUuid, uuid)); + } + + @Override + public CatalogEntityInstance getParent(String uuid) { + CatalogEntityRel parentEntityRel = entityRelService.getOne(new LambdaQueryWrapper() + .eq(CatalogEntityRel::getEntity2Uuid, uuid) + .eq(CatalogEntityRel::getType, EntityRelType.CHILD.getDescription()), false); + + CatalogEntityInstance parentEntity = null; + if (parentEntityRel == null) { + return parentEntity; + } + + parentEntity = getByUUID(parentEntityRel.getEntity1Uuid()); + return parentEntity; + } + + @Override + public List getChildren(String uuid) { + List childEntityRelList = entityRelService.list(new LambdaQueryWrapper() + .eq(CatalogEntityRel::getEntity1Uuid, uuid) + .eq(CatalogEntityRel::getType, EntityRelType.CHILD.getDescription())); + List childEntityList = new ArrayList<>(); + if (CollectionUtils.isEmpty(childEntityRelList)) { + return childEntityList; + } + + for (CatalogEntityRel childEntityRel : childEntityRelList) { + CatalogEntityInstance childEntity = getByUUID(childEntityRel.getEntity2Uuid()); + childEntityList.add(childEntity); + } + + return childEntityList; + } + @Override public CatalogEntityInstance getByDataSourceAndFQN(Long dataSourceId, String fqn) { return baseMapper.selectOne(new QueryWrapper().lambda() @@ -418,7 +457,7 @@ public CatalogTableProfileVO getTableEntityProfile(String uuid) { return tableProfileVO; } String latestDate = tableRecords.getDatetime(); - Double records = Double.valueOf((String)tableRecords.getValue()); + double records = Double.parseDouble((String) tableRecords.getValue()); List columnList = getCatalogEntityInstances(uuid); if (CollectionUtils.isEmpty(columnList)) { return tableProfileVO; @@ -459,19 +498,19 @@ public CatalogTableProfileVO getTableEntityProfile(String uuid) { switch (metricName) { case "column_null": columnBaseProfileVO.setNullCount(entityProfile.getActualValue()); - columnBaseProfileVO.setNullPercentage(String.format("%.2f",(Double.valueOf(entityProfile.getActualValue()) / records * 100)) +"%"); + columnBaseProfileVO.setNullPercentage(String.format("%.2f",(Double.parseDouble(entityProfile.getActualValue()) / records * 100)) +"%"); break; case "column_not_null": columnBaseProfileVO.setNotNullCount(entityProfile.getActualValue()); - columnBaseProfileVO.setNotNullPercentage(String.format("%.2f",(Double.valueOf(entityProfile.getActualValue()) / records * 100)) +"%"); + columnBaseProfileVO.setNotNullPercentage(String.format("%.2f",(Double.parseDouble(entityProfile.getActualValue()) / records * 100)) +"%"); break; case "column_unique": columnBaseProfileVO.setUniqueCount(entityProfile.getActualValue()); - columnBaseProfileVO.setUniquePercentage(String.format("%.2f",(Double.valueOf(entityProfile.getActualValue()) / records * 100)) +"%"); + columnBaseProfileVO.setUniquePercentage(String.format("%.2f",(Double.parseDouble(entityProfile.getActualValue()) / records * 100)) +"%"); break; case "column_distinct": columnBaseProfileVO.setDistinctCount(entityProfile.getActualValue()); - columnBaseProfileVO.setDistinctPercentage(String.format("%.2f",(Double.valueOf(entityProfile.getActualValue()) / records * 100)) +"%"); + columnBaseProfileVO.setDistinctPercentage(String.format("%.2f",(Double.parseDouble(entityProfile.getActualValue()) / records * 100)) +"%"); break; default: break; @@ -793,9 +832,8 @@ public IPage getEntityMetricList(String uuid, Integer pag @Override public IPage getEntityIssueList(String uuid, Integer pageNumber, Integer pageSize) { Page page = new Page<>(pageNumber, pageSize); - IPage entityMetricPage = catalogEntityMetricJobRelService.getEntityIssuePage(page, uuid); - return entityMetricPage; + return catalogEntityMetricJobRelService.getEntityIssuePage(page, uuid); } @Override @@ -865,10 +903,10 @@ public long executeDataProfileJob(RunProfileRequest runProfileRequest, int runni baseJobParameter.setExpectedType("fix_value"); jobParameters.add(baseJobParameter); } else { - throw new DataVinesServerException(Status.CATALOG_PROFILE_INSTANCE_FQN_ERROR, fqn); + throw new DataVinesServerException(Status.CATALOG_INSTANCE_FQN_ERROR, fqn); } } else { - throw new DataVinesServerException(Status.CATALOG_PROFILE_INSTANCE_FQN_ERROR, fqn); + throw new DataVinesServerException(Status.CATALOG_INSTANCE_FQN_ERROR, fqn); } List columns = new ArrayList<>(); @@ -938,7 +976,7 @@ public long executeDataProfileJob(RunProfileRequest runProfileRequest, int runni .eq(CatalogEntityMetricJobRel::getEntityUuid, uuid) .eq(CatalogEntityMetricJobRel::getMetricJobId, jobId) .eq(CatalogEntityMetricJobRel::getMetricJobType, DATA_PROFILE.getDescription())); - if (listRel.size() >= 1) { + if (!listRel.isEmpty()) { catalogEntityMetricJobRelService.remove(new QueryWrapper().lambda() .eq(CatalogEntityMetricJobRel::getEntityUuid, uuid) .eq(CatalogEntityMetricJobRel::getMetricJobId, jobId) diff --git a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/CatalogEntityRelServiceImpl.java b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/CatalogEntityRelServiceImpl.java index 381c558e4..4e9d1b7bb 100644 --- a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/CatalogEntityRelServiceImpl.java +++ b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/CatalogEntityRelServiceImpl.java @@ -16,13 +16,533 @@ */ package io.datavines.server.repository.service.impl; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import io.datavines.common.datasource.jdbc.JdbcConnectionInfo; +import io.datavines.common.enums.EntityRelType; +import io.datavines.common.utils.JSONUtils; +import io.datavines.common.utils.StringUtils; +import io.datavines.connector.api.ConnectorFactory; +import io.datavines.connector.api.LineageParser; +import io.datavines.connector.api.entity.ScriptMetadata; +import io.datavines.connector.api.entity.StatementMetadata; +import io.datavines.connector.api.entity.StatementMetadataFragment; +import io.datavines.core.enums.Status; +import io.datavines.core.exception.DataVinesServerException; +import io.datavines.server.api.dto.bo.catalog.CatalogEntityInstanceInfo; +import io.datavines.server.api.dto.bo.catalog.lineage.*; +import io.datavines.server.api.dto.bo.datasource.DataSourceInfo; +import io.datavines.server.api.dto.vo.catalog.lineage.CatalogEntityLineageVO; +import io.datavines.server.enums.LineageSourceType; +import io.datavines.server.repository.entity.DataSource; +import io.datavines.server.repository.entity.catalog.CatalogEntityInstance; import io.datavines.server.repository.entity.catalog.CatalogEntityRel; import io.datavines.server.repository.mapper.CatalogEntityRelMapper; +import io.datavines.server.repository.service.CatalogEntityInstanceService; import io.datavines.server.repository.service.CatalogEntityRelService; +import io.datavines.server.repository.service.DataSourceService; +import io.datavines.server.utils.ContextHolder; +import io.datavines.spi.PluginLoader; +import org.apache.commons.collections4.CollectionUtils; +import org.jetbrains.annotations.NotNull; +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; +import java.util.*; +import java.util.stream.Collectors; @Service("catalogEntityRelService") public class CatalogEntityRelServiceImpl extends ServiceImpl implements CatalogEntityRelService { + @Autowired + private CatalogEntityInstanceService catalogEntityInstanceService; + + @Autowired + private DataSourceService dataSourceService; + + @Transactional(rollbackFor = Exception.class) + @Override + public boolean addLineage(LineageEntityEdgeInfo entityEdgeInfo) { + String fromUUID = entityEdgeInfo.getFromEntity().getUuid(); + String toUUID = entityEdgeInfo.getToEntity().getUuid(); + CatalogEntityRel rel = getOne(new LambdaQueryWrapper() + .eq(CatalogEntityRel::getEntity1Uuid, fromUUID) + .eq(CatalogEntityRel::getEntity2Uuid, toUUID) + .eq(CatalogEntityRel::getType, EntityRelType.DOWNSTREAM.getDescription()), false); + if (rel == null) { + rel = new CatalogEntityRel(); + } + + rel.setEntity1Uuid(fromUUID); + rel.setEntity2Uuid(toUUID); + rel.setType(EntityRelType.DOWNSTREAM.getDescription()); + rel.setSourceType(entityEdgeInfo.getLineageDetail().getSourceType().getDescription()); + rel.setRelatedScript(entityEdgeInfo.getLineageDetail().getSqlQuery()); + rel.setUpdateBy(ContextHolder.getUserId()); + rel.setUpdateTime(LocalDateTime.now()); + + boolean result; + if (rel.getId() == null) { + result = save(rel); + } else { + result = updateById(rel); + } + + if (result) { + CatalogEntityLineageDetail catalogEntityLineageDetail = entityEdgeInfo.getLineageDetail(); + List detailList = catalogEntityLineageDetail.getChildRelDetailList(); + if (CollectionUtils.isEmpty(detailList)) { + return true; + } + + for (CatalogEntityColumnLineageDetail detail : detailList) { + List fromEntityList = detail.getFromChildren(); + if (CollectionUtils.isEmpty(fromEntityList)) { + continue; + } + + CatalogEntityInstanceInfo toEntity = detail.getToChild(); + if (toEntity == null) { + continue; + } + + for (CatalogEntityInstanceInfo fromEntity : fromEntityList) { + CatalogEntityRel columnRel = getOne(new LambdaQueryWrapper() + .eq(CatalogEntityRel::getEntity1Uuid, fromEntity.getUuid()) + .eq(CatalogEntityRel::getEntity2Uuid, toEntity.getUuid()) + .eq(CatalogEntityRel::getType, EntityRelType.DOWNSTREAM.getDescription()), false); + if (columnRel == null) { + columnRel = new CatalogEntityRel(); + } + + columnRel.setEntity1Uuid(fromEntity.getUuid()); + columnRel.setEntity2Uuid(toEntity.getUuid()); + columnRel.setType(EntityRelType.DOWNSTREAM.getDescription()); + columnRel.setSourceType(catalogEntityLineageDetail.getSourceType().getDescription()); + columnRel.setRelatedScript(catalogEntityLineageDetail.getSqlQuery()); + columnRel.setUpdateBy(ContextHolder.getUserId()); + columnRel.setUpdateTime(LocalDateTime.now()); + + if (columnRel.getId() == null) { + save(columnRel); + } else { + updateById(columnRel); + } + } + } + + return true; + } + + return false; + } + + @Override + public boolean addLineageByParseSql(SqlWithDataSourceList sqlWithDataSourceList) { + if (sqlWithDataSourceList == null) { + throw new DataVinesServerException("Request body cannot be null"); + } + + List dataSourceInfos = sqlWithDataSourceList.getDataSourceInfos(); + if (CollectionUtils.isEmpty(dataSourceInfos)) { + throw new DataVinesServerException("DataSource list cannot be empty"); + } + + String sql = sqlWithDataSourceList.getSql(); + if (StringUtils.isEmpty(sql)) { + throw new DataVinesServerException("SQL cannot be empty"); + } + + boolean added = false; + StringBuilder errors = new StringBuilder(); + + for (DataSourceInfo dataSourceInfo: dataSourceInfos) { + ConnectorFactory connectorFactory = PluginLoader.getPluginLoader(ConnectorFactory.class).getOrCreatePlugin(dataSourceInfo.getType()); + if (connectorFactory == null) { + errors.append("Unsupported datasource type: ").append(dataSourceInfo.getType()).append("; "); + continue; + } + + DataSource dataSource = dataSourceService.getDataSourceById(dataSourceInfo.getId()); + if (dataSource == null) { + errors.append("DataSource not found, id: ").append(dataSourceInfo.getId()).append("; "); + continue; + } + + JdbcConnectionInfo jdbcConnectionInfo = JSONUtils.parseObject(dataSource.getParam(), JdbcConnectionInfo.class); + + ScriptMetadata scriptMetadata; + try { + scriptMetadata = LineageParser.parseScript(sql, connectorFactory.getStatementSplitter(), connectorFactory.getStatementParser()); + } catch (Exception e) { + throw new DataVinesServerException("SQL parse error: " + e.getMessage(), e); + } + + if (scriptMetadata == null) { + throw new DataVinesServerException("Failed to parse SQL, please check the SQL syntax"); + } + + List statementMetadataList = scriptMetadata.getStatementMetadataList(); + if (CollectionUtils.isEmpty(statementMetadataList)) { + throw new DataVinesServerException("No valid SQL statements found after parsing"); + } + + for (StatementMetadata statementMetadata: statementMetadataList) { + StatementMetadataFragment statementMetadataFragment = statementMetadata.getStatementMetadataFragment(); + if (statementMetadataFragment == null) { + errors.append("Cannot extract table lineage from SQL: ").append(statementMetadata.getStatementText()).append("; "); + continue; + } + + List inputTables = statementMetadataFragment.getInputTables(); + List outputTables = statementMetadataFragment.getOutputTables(); + if (CollectionUtils.isEmpty(inputTables) || CollectionUtils.isEmpty(outputTables)) { + errors.append("SQL must contain both source (SELECT) and target (INSERT/CREATE) tables; "); + continue; + } + + for (String inputTable : inputTables) { + for (String outputTable : outputTables) { + if (StringUtils.isEmpty(inputTable) || StringUtils.isEmpty(outputTable)) { + continue; + } + + String resolvedInput = inputTable; + String resolvedOutput = outputTable; + + if (jdbcConnectionInfo != null && !resolvedInput.contains(".")) { + resolvedInput = jdbcConnectionInfo.getDatabase() + "." + resolvedInput; + } + + if (jdbcConnectionInfo != null && !resolvedOutput.contains(".")) { + resolvedOutput = jdbcConnectionInfo.getDatabase() + "." + resolvedOutput; + } + + CatalogEntityInstance fromEntity = catalogEntityInstanceService.getByDataSourceAndFQN(dataSourceInfo.getId(), resolvedInput); + CatalogEntityInstance toEntity = catalogEntityInstanceService.getByDataSourceAndFQN(dataSourceInfo.getId(), resolvedOutput); + if (fromEntity == null || toEntity == null) { + if (fromEntity == null) { + errors.append("Table not found in catalog: ").append(resolvedInput).append("; "); + } + if (toEntity == null) { + errors.append("Table not found in catalog: ").append(resolvedOutput).append("; "); + } + continue; + } + + if (addLineage(fromEntity.getUuid(), toEntity.getUuid(), LineageSourceType.SQL_PARSER, statementMetadata.getStatementText())) { + added = true; + } + } + } + } + } + + if (!added && errors.length() > 0) { + throw new DataVinesServerException(errors.toString()); + } + + return added; + } + + @Override + public boolean addLineageByParseSql2(SqlWithDataSourceKeyProperties sqlWithDataSourceKeyProperties) { + List dataSourceInfos = dataSourceService.listByInfo(sqlWithDataSourceKeyProperties.getDataSourceKeyProperties()); + SqlWithDataSourceList sqlWithDataSourceList = new SqlWithDataSourceList(); + sqlWithDataSourceList.setSql(sqlWithDataSourceKeyProperties.getSql()); + sqlWithDataSourceList.setDataSourceInfos(dataSourceInfos); + return addLineageByParseSql(sqlWithDataSourceList); + } + + @Override + public CatalogEntityLineageVO getLineageByFqn(Long datasourceId, String fqn, int upstreamDepth, int downstreamDepth) { + CatalogEntityInstance catalogEntityInstance = catalogEntityInstanceService.getByDataSourceAndFQN(datasourceId, fqn); + if (catalogEntityInstance == null) { + throw new DataVinesServerException(Status.CATALOG_INSTANCE_IS_NULL_ERROR,fqn); + } + + return getCatalogEntityLineageVO(catalogEntityInstance); + } + + @NotNull + private CatalogEntityLineageVO getCatalogEntityLineageVO(CatalogEntityInstance catalogEntityInstance) { + Set nodeSet = new HashSet<>(); + Set edgeSet = new HashSet<>(); + + CatalogEntityLineageVO catalogEntityLineageVO = new CatalogEntityLineageVO(); + CatalogEntityInstanceInfo currentEntityInstanceInfo = new CatalogEntityInstanceInfo(); + BeanUtils.copyProperties(catalogEntityInstance, currentEntityInstanceInfo); + String fromUuid = catalogEntityInstance.getUuid(); + + LineageEntityNodeInfo currentNode = new LineageEntityNodeInfo(); + BeanUtils.copyProperties(catalogEntityInstance, currentNode); + CatalogEntityInstance databaseEntity = catalogEntityInstanceService.getParent(fromUuid); + CatalogEntityInstanceInfo databaseEntityInstanceInfo = new CatalogEntityInstanceInfo(); + BeanUtils.copyProperties(databaseEntity, databaseEntityInstanceInfo); + currentNode.setDatabase(databaseEntityInstanceInfo); + + DataSource dataSource = dataSourceService.getById(catalogEntityInstance.getDatasourceId()); + CatalogEntityInstanceInfo datasourceEntityInstanceInfo = new CatalogEntityInstanceInfo(); + datasourceEntityInstanceInfo.setType(dataSource.getType()); + datasourceEntityInstanceInfo.setDisplayName(dataSource.getName()); + datasourceEntityInstanceInfo.setUuid(dataSource.getUuid()); + datasourceEntityInstanceInfo.setId(dataSource.getId()); + List currentChildEntityInstances = catalogEntityInstanceService.getChildren(catalogEntityInstance.getUuid()); + if (CollectionUtils.isNotEmpty(currentChildEntityInstances)) { + List columns = new ArrayList<>(); + for (CatalogEntityInstance childEntityInstance : currentChildEntityInstances) { + CatalogEntityInstanceInfo columnEntityInstanceInfo = new CatalogEntityInstanceInfo(); + BeanUtils.copyProperties(childEntityInstance, columnEntityInstanceInfo); + columns.add(columnEntityInstanceInfo); + } + currentNode.setColumns(columns); + } + currentNode.setDatasource(datasourceEntityInstanceInfo); + + catalogEntityLineageVO.setCurrentNode(currentNode); + + List nodes = new ArrayList<>(); + nodes.add(currentNode); + + List edges = new ArrayList<>(); + + List downstreamRelList = list(new LambdaQueryWrapper() + .eq(CatalogEntityRel::getEntity1Uuid, fromUuid) + .eq(CatalogEntityRel::getType, EntityRelType.DOWNSTREAM.getDescription())); + if (CollectionUtils.isNotEmpty(downstreamRelList)) { + for (CatalogEntityRel rel : downstreamRelList) { + CatalogEntityInstance downstreamEntityInstance = catalogEntityInstanceService.getByUUID(rel.getEntity2Uuid()); + if (downstreamEntityInstance != null && !nodeSet.contains(downstreamEntityInstance.getUuid())) { + LineageEntityNodeInfo downstreamNode = new LineageEntityNodeInfo(); + BeanUtils.copyProperties(downstreamEntityInstance, downstreamNode); + + CatalogEntityInstance toDatabaseEntity = catalogEntityInstanceService.getParent(fromUuid); + CatalogEntityInstanceInfo toDatabaseEntityInstanceInfo = new CatalogEntityInstanceInfo(); + BeanUtils.copyProperties(toDatabaseEntity, toDatabaseEntityInstanceInfo); + downstreamNode.setDatabase(toDatabaseEntityInstanceInfo); + + DataSource toDataSource = dataSourceService.getById(catalogEntityInstance.getDatasourceId()); + CatalogEntityInstanceInfo toDatasourceEntityInstanceInfo = new CatalogEntityInstanceInfo(); + toDatasourceEntityInstanceInfo.setType(toDataSource.getType()); + toDatasourceEntityInstanceInfo.setDisplayName(toDataSource.getName()); + toDatasourceEntityInstanceInfo.setUuid(toDataSource.getUuid()); + toDatasourceEntityInstanceInfo.setId(toDataSource.getId()); + downstreamNode.setDatasource(toDatasourceEntityInstanceInfo); + + List childEntityInstances = catalogEntityInstanceService.getChildren(downstreamEntityInstance.getUuid()); + if (CollectionUtils.isNotEmpty(childEntityInstances)) { + List columns = new ArrayList<>(); + for (CatalogEntityInstance childEntityInstance : childEntityInstances) { + CatalogEntityInstanceInfo columnEntityInstanceInfo = new CatalogEntityInstanceInfo(); + BeanUtils.copyProperties(childEntityInstance, columnEntityInstanceInfo); + columns.add(columnEntityInstanceInfo); + } + downstreamNode.setColumns(columns); + } + + List downstreamRelList2 = list(new LambdaQueryWrapper() + .eq(CatalogEntityRel::getEntity1Uuid, downstreamNode.getUuid()) + .eq(CatalogEntityRel::getType, EntityRelType.DOWNSTREAM.getDescription())); + if (CollectionUtils.isNotEmpty(downstreamRelList2)) { + downstreamNode.setHasNextNode(true); + } + + nodes.add(downstreamNode); + nodeSet.add(downstreamEntityInstance.getUuid()); + + if (!edgeSet.contains(currentEntityInstanceInfo.getUuid() + ":" + downstreamEntityInstance.getUuid())) { + LineageEntityEdgeInfo edgeInfo = new LineageEntityEdgeInfo(); + CatalogEntityInstanceInfo fromEntity = new CatalogEntityInstanceInfo(); + BeanUtils.copyProperties(currentEntityInstanceInfo, fromEntity); + CatalogEntityInstanceInfo toEntity = new CatalogEntityInstanceInfo(); + BeanUtils.copyProperties(downstreamEntityInstance, toEntity); + edgeInfo.setFromEntity(fromEntity); + edgeInfo.setToEntity(toEntity); + edgeInfo.setUuid(fromEntity.getUuid() + ":" + toEntity.getUuid()); + + CatalogEntityLineageDetail lineageDetail = new CatalogEntityLineageDetail(); + List childRelDetailList = new ArrayList<>(); + + if (CollectionUtils.isNotEmpty(currentNode.getColumns()) + && CollectionUtils.isNotEmpty(downstreamNode.getColumns())) { + Map key2ColumnMap = new HashMap<>(); + for (CatalogEntityInstanceInfo column : currentNode.getColumns()) { + key2ColumnMap.put(column.getUuid(), column); + } + + List columns = downstreamNode.getColumns(); + for (CatalogEntityInstanceInfo column : columns) { + List columnDownstreamRelList = list(new LambdaQueryWrapper() + .eq(CatalogEntityRel::getEntity2Uuid, column.getUuid()) + .eq(CatalogEntityRel::getType, EntityRelType.DOWNSTREAM.getDescription())); + if (CollectionUtils.isNotEmpty(columnDownstreamRelList)) { + List fromChildren = columnDownstreamRelList + .stream() + .filter(x-> key2ColumnMap.containsKey(x.getEntity1Uuid())) + .map(item -> key2ColumnMap.get(item.getEntity1Uuid())).collect(Collectors.toList()); + if (CollectionUtils.isNotEmpty(fromChildren)) { + CatalogEntityColumnLineageDetail + columnLineageDetail = new CatalogEntityColumnLineageDetail(); + columnLineageDetail.setFromChildren(fromChildren); + columnLineageDetail.setToChild(column); + childRelDetailList.add(columnLineageDetail); + } + } + } + } + + lineageDetail.setChildRelDetailList(childRelDetailList); + lineageDetail.setSourceType(LineageSourceType.descOf(rel.getSourceType())); + lineageDetail.setSqlQuery(rel.getRelatedScript()); + edgeInfo.setLineageDetail(lineageDetail); + edges.add(edgeInfo); + edgeSet.add(fromEntity.getUuid() + ":" + toEntity.getUuid()); + } + } + } + } + + List upstreamRelList = list(new LambdaQueryWrapper() + .eq(CatalogEntityRel::getEntity2Uuid, fromUuid) + .eq(CatalogEntityRel::getType, EntityRelType.DOWNSTREAM.getDescription())); + if (CollectionUtils.isNotEmpty(upstreamRelList)) { + for (CatalogEntityRel rel : upstreamRelList) { + CatalogEntityInstance upstreamEntityInstance = catalogEntityInstanceService.getByUUID(rel.getEntity1Uuid()); + if (upstreamEntityInstance != null && !nodeSet.contains(upstreamEntityInstance.getUuid())) { + LineageEntityNodeInfo upstreamNode = new LineageEntityNodeInfo(); + BeanUtils.copyProperties(upstreamEntityInstance, upstreamNode); + + CatalogEntityInstance fromDatabaseEntity = catalogEntityInstanceService.getParent(fromUuid); + CatalogEntityInstanceInfo fromDatabaseEntityInstanceInfo = new CatalogEntityInstanceInfo(); + BeanUtils.copyProperties(fromDatabaseEntity, fromDatabaseEntityInstanceInfo); + upstreamNode.setDatabase(fromDatabaseEntityInstanceInfo); + + DataSource fromDataSource = dataSourceService.getById(catalogEntityInstance.getDatasourceId()); + CatalogEntityInstanceInfo fromDatasourceEntityInstanceInfo = new CatalogEntityInstanceInfo(); + fromDatasourceEntityInstanceInfo.setType(fromDataSource.getType()); + fromDatasourceEntityInstanceInfo.setDisplayName(fromDataSource.getName()); + fromDatasourceEntityInstanceInfo.setUuid(fromDataSource.getUuid()); + fromDatasourceEntityInstanceInfo.setId(fromDataSource.getId()); + upstreamNode.setDatasource(fromDatasourceEntityInstanceInfo); + + List childEntityInstances = catalogEntityInstanceService.getChildren(upstreamEntityInstance.getUuid()); + if (CollectionUtils.isNotEmpty(childEntityInstances)) { + List columns = new ArrayList<>(); + for (CatalogEntityInstance childEntityInstance : childEntityInstances) { + CatalogEntityInstanceInfo columnEntityInstanceInfo = new CatalogEntityInstanceInfo(); + BeanUtils.copyProperties(childEntityInstance, columnEntityInstanceInfo); + columns.add(columnEntityInstanceInfo); + } + upstreamNode.setColumns(columns); + } + + List upstreamRelList2 = list(new LambdaQueryWrapper() + .eq(CatalogEntityRel::getEntity2Uuid, upstreamNode.getUuid()) + .eq(CatalogEntityRel::getType, EntityRelType.DOWNSTREAM.getDescription())); + if (CollectionUtils.isNotEmpty(upstreamRelList2)) { + upstreamNode.setHasNextNode(true); + } + nodes.add(upstreamNode); + nodeSet.add(upstreamEntityInstance.getUuid()); + + if (!edgeSet.contains(upstreamEntityInstance.getUuid() + ":" + currentEntityInstanceInfo.getUuid())) { + LineageEntityEdgeInfo edgeInfo = new LineageEntityEdgeInfo(); + CatalogEntityInstanceInfo fromEntity = new CatalogEntityInstanceInfo(); + BeanUtils.copyProperties(upstreamEntityInstance, fromEntity); + CatalogEntityInstanceInfo toEntity = new CatalogEntityInstanceInfo(); + BeanUtils.copyProperties(currentEntityInstanceInfo, toEntity); + edgeInfo.setFromEntity(fromEntity); + edgeInfo.setToEntity(toEntity); + edgeInfo.setUuid(fromEntity.getUuid() + ":" + toEntity.getUuid()); + + CatalogEntityLineageDetail lineageDetail = new CatalogEntityLineageDetail(); + List childRelDetailList = new ArrayList<>(); + + if (CollectionUtils.isNotEmpty(currentNode.getColumns()) + && CollectionUtils.isNotEmpty(upstreamNode.getColumns())) { + Map key2ColumnMap = new HashMap<>(); + for (CatalogEntityInstanceInfo column : upstreamNode.getColumns()) { + key2ColumnMap.put(column.getUuid(), column); + } + + List columns = currentNode.getColumns(); + for (CatalogEntityInstanceInfo column : columns) { + List columnDownstreamRelList = list(new LambdaQueryWrapper() + .eq(CatalogEntityRel::getEntity2Uuid, column.getUuid()) + .eq(CatalogEntityRel::getType, EntityRelType.DOWNSTREAM.getDescription())); + if (CollectionUtils.isNotEmpty(columnDownstreamRelList)) { + List fromChildren = columnDownstreamRelList + .stream() + .filter(x-> key2ColumnMap.containsKey(x.getEntity1Uuid())) + .map(item -> key2ColumnMap.get(item.getEntity1Uuid())).collect(Collectors.toList()); + if (CollectionUtils.isNotEmpty(fromChildren)) { + CatalogEntityColumnLineageDetail + columnLineageDetail = new CatalogEntityColumnLineageDetail(); + columnLineageDetail.setFromChildren(fromChildren); + columnLineageDetail.setToChild(column); + childRelDetailList.add(columnLineageDetail); + } + } + } + } + + lineageDetail.setChildRelDetailList(childRelDetailList); + lineageDetail.setSourceType(LineageSourceType.descOf(rel.getSourceType())); + lineageDetail.setSqlQuery(rel.getRelatedScript()); + edgeInfo.setLineageDetail(lineageDetail); + edges.add(edgeInfo); + edgeSet.add(fromEntity.getUuid() + ":" + toEntity.getUuid()); + } + } + } + } + + catalogEntityLineageVO.setNodes(nodes); + catalogEntityLineageVO.setEdges(edges); + + return catalogEntityLineageVO; + } + + @Override + public CatalogEntityLineageVO getLineageByUUID(String uuid, int upstreamDepth, int downstreamDepth) { + CatalogEntityInstance catalogEntityInstance = catalogEntityInstanceService.getByUUID(uuid); + if (catalogEntityInstance == null) { + throw new DataVinesServerException(Status.CATALOG_INSTANCE_IS_NULL_ERROR, uuid); + } + + return getCatalogEntityLineageVO(catalogEntityInstance); + } + + @Override + public boolean deleteLineage(String fromUUID, String toUUID) { + return remove(new LambdaQueryWrapper().eq(CatalogEntityRel::getEntity1Uuid, fromUUID).eq(CatalogEntityRel::getEntity2Uuid, toUUID)); + } + + private boolean addLineage(String fromUUID, String toUUID, LineageSourceType sourceType, String sql) { + List relList = list(new LambdaQueryWrapper() + .eq(CatalogEntityRel::getEntity1Uuid, fromUUID) + .eq(CatalogEntityRel::getEntity2Uuid, toUUID).eq(CatalogEntityRel::getType, EntityRelType.DOWNSTREAM.getDescription())); + CatalogEntityRel rel; + if (CollectionUtils.isEmpty(relList)) { + rel = new CatalogEntityRel(); + } else { + rel = relList.get(0); + } + rel.setEntity1Uuid(fromUUID); + rel.setEntity2Uuid(toUUID); + rel.setType(EntityRelType.DOWNSTREAM.getDescription()); + rel.setSourceType(sourceType.getDescription()); + rel.setRelatedScript(sql); + rel.setUpdateBy(ContextHolder.getUserId()); + rel.setUpdateTime(LocalDateTime.now()); + if (CollectionUtils.isEmpty(relList)) { + return save(rel); + } else { + return updateById(rel); + } + } + } diff --git a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/DataSourceServiceImpl.java b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/DataSourceServiceImpl.java index 18ad2794b..c66809d32 100644 --- a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/DataSourceServiceImpl.java +++ b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/DataSourceServiceImpl.java @@ -16,6 +16,7 @@ */ package io.datavines.server.repository.service.impl; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -24,13 +25,11 @@ import io.datavines.common.utils.*; import io.datavines.core.utils.LanguageUtils; import io.datavines.server.api.dto.bo.catalog.CatalogRefresh; -import io.datavines.server.api.dto.bo.datasource.ExecuteRequest; +import io.datavines.server.api.dto.bo.datasource.*; import io.datavines.common.exception.DataVinesException; import io.datavines.common.param.*; import io.datavines.connector.api.ConnectorFactory; import io.datavines.core.enums.Status; -import io.datavines.server.api.dto.bo.datasource.DataSourceCreate; -import io.datavines.server.api.dto.bo.datasource.DataSourceUpdate; import io.datavines.server.api.dto.bo.job.schedule.MapParam; import io.datavines.server.api.dto.bo.task.CommonTaskScheduleCreateOrUpdate; import io.datavines.server.api.dto.vo.DataSourceVO; @@ -41,6 +40,7 @@ import io.datavines.core.exception.DataVinesServerException; import io.datavines.server.utils.ContextHolder; import io.datavines.spi.PluginLoader; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; @@ -96,6 +96,7 @@ public long insert(DataSourceCreate dataSourceCreate) { ConnectorFactory connectorFactory = PluginLoader.getPluginLoader(ConnectorFactory.class).getOrCreatePlugin(type); List keyProperties = connectorFactory.getConnector().keyProperties(); List keyPropertyValueList = new ArrayList<>(); + keyPropertyValueList.add(dataSourceCreate.getType().toLowerCase()); if (CollectionUtils.isNotEmpty(keyProperties)) { keyProperties.forEach(property -> { if (StringUtils.isNotEmpty(paramMap.get(property))) { @@ -168,6 +169,7 @@ public int update(DataSourceUpdate dataSourceUpdate) throws DataVinesException { ConnectorFactory connectorFactory = PluginLoader.getPluginLoader(ConnectorFactory.class).getOrCreatePlugin(type); List keyProperties = connectorFactory.getConnector().keyProperties(); List keyPropertyValueList = new ArrayList<>(); + keyPropertyValueList.add(dataSourceUpdate.getType().toLowerCase()); if (CollectionUtils.isNotEmpty(keyProperties)) { keyProperties.forEach(property -> { if (StringUtils.isNotEmpty(paramMap.get(property))) { @@ -264,6 +266,46 @@ public List listByWorkSpaceIdAndType(long workspaceId, String type) return baseMapper.selectList(new QueryWrapper().lambda().eq(DataSource::getWorkspaceId, workspaceId).eq(DataSource::getType, type)); } + @Override + public List listByInfo(DataSourceKeyProperties dataSourceKeyProperties) { + Map paramMap = dataSourceKeyProperties.getParam(); + String type = dataSourceKeyProperties.getType(); + String paramCode = ""; + ConnectorFactory connectorFactory = PluginLoader.getPluginLoader(ConnectorFactory.class).getOrCreatePlugin(type); + List keyProperties = connectorFactory.getConnector().keyProperties(); + List keyPropertyValueList = new ArrayList<>(); + keyPropertyValueList.add(type.toLowerCase()); + if (CollectionUtils.isNotEmpty(keyProperties)) { + keyProperties.forEach(property -> { + if (StringUtils.isNotEmpty(paramMap.get(property))) { + keyPropertyValueList.add(paramMap.get(property).toLowerCase()); + } + }); + } + + if (CollectionUtils.isNotEmpty(keyPropertyValueList)) { + paramCode = Md5Utils.getMd5(String.join("@#@", keyPropertyValueList),true); + } + + if (StringUtils.isEmpty(paramCode)) { + return new ArrayList<>(); + } + + List dataSourceList = list(new LambdaQueryWrapper().eq(DataSource::getParamCode, paramCode)); + if (CollectionUtils.isEmpty(dataSourceList)) { + return new ArrayList<>(); + } + + List dataSources = new ArrayList<>(); + dataSourceList.forEach(dataSource -> { + DataSourceInfo dataSourceInfo = new DataSourceInfo(); + BeanUtils.copyProperties(dataSource, dataSourceInfo); + dataSources.add(dataSourceInfo); + }); + + return dataSources; + } + @Override public Object getDatabaseList(Long id) throws DataVinesServerException { diff --git a/datavines-ui/Editor/components/Database/index.tsx b/datavines-ui/Editor/components/Database/index.tsx index 14f6e53e0..8323a40d6 100644 --- a/datavines-ui/Editor/components/Database/index.tsx +++ b/datavines-ui/Editor/components/Database/index.tsx @@ -27,6 +27,7 @@ import Schedule from '@/view/Main/HomeDetail/Jobs/components/Schedule'; import SearchForm from './SearchForm'; import store from '@/store'; import { useLogger } from '@/view/Main/HomeDetail/Jobs/useLogger'; +import Lineage from '../Lineage'; type DIndexProps = { onShowModal?: (...args: any[]) => any; @@ -165,6 +166,8 @@ const Index = ({ onShowModal, afterClose }:DIndexProps) => { setTotal(0); setRowKey('id'); break; + case 'Lineage': + break; default: } setActivedTabKey(key); @@ -822,6 +825,10 @@ const Index = ({ onShowModal, afterClose }:DIndexProps) => { overflow: 'auto', }} > + {tableItems[+activeTableKey]?.name === 'Lineage' ? ( + + ) : ( + <> { tableItems[+activeTableKey]?.name === 'Profile' ? ( <> @@ -897,6 +904,8 @@ const Index = ({ onShowModal, afterClose }:DIndexProps) => { /> ) : '' } + + )} diff --git a/datavines-ui/Editor/components/Database/option.tsx b/datavines-ui/Editor/components/Database/option.tsx index 4094e4bd3..2cba5d32a 100644 --- a/datavines-ui/Editor/components/Database/option.tsx +++ b/datavines-ui/Editor/components/Database/option.tsx @@ -95,6 +95,9 @@ export const tableTabs:Tab[] = [ { label: , name: 'Issues', children: null, key: '4', }, + { + label: , name: 'Lineage', children: null, key: '5', + }, ]; export const tableCol:Col[][] = [[{ title: , diff --git a/datavines-ui/Editor/components/Lineage/AddLineageModal.tsx b/datavines-ui/Editor/components/Lineage/AddLineageModal.tsx new file mode 100644 index 000000000..7e835fa77 --- /dev/null +++ b/datavines-ui/Editor/components/Lineage/AddLineageModal.tsx @@ -0,0 +1,268 @@ +import React, { useState, useEffect } from 'react'; +import { + Modal, Form, Select, Divider, Button, Space, Typography, message, +} from 'antd'; +import { PlusOutlined, MinusCircleOutlined } from '@ant-design/icons'; +import { useIntl } from 'react-intl'; +import useRequest from '../../hooks/useRequest'; +import { CatalogEntityInstanceInfo } from './useLineageData'; + +const { Text } = Typography; + +interface AddLineageModalProps { + visible: boolean; + onClose: () => void; + onSuccess: () => void; + datasourceList: any[]; +} + +const AddLineageModal: React.FC = ({ + visible, onClose, onSuccess, datasourceList, +}) => { + const intl = useIntl(); + const { $http } = useRequest(); + const [form] = Form.useForm(); + const [submitting, setSubmitting] = useState(false); + + // Cascade data + const [fromDatabases, setFromDatabases] = useState([]); + const [fromTables, setFromTables] = useState([]); + const [fromColumns, setFromColumns] = useState([]); + const [toDatabases, setToDatabases] = useState([]); + const [toTables, setToTables] = useState([]); + const [toColumns, setToColumns] = useState([]); + + const loadDatabases = async (dsUuid: string, setFn: (v: any[]) => void) => { + try { + const res = await $http.get(`/catalog/list/database/${dsUuid}`); + setFn(res || []); + } catch { setFn([]); } + }; + + const loadTables = async (dbUuid: string, setFn: (v: any[]) => void) => { + try { + const res = await $http.get(`/catalog/list/table/${dbUuid}`); + setFn(res || []); + } catch { setFn([]); } + }; + + const loadColumns = async (tableUuid: string, setFn: (v: any[]) => void) => { + try { + const res = await $http.get(`/catalog/list/column/${tableUuid}`); + setFn(res || []); + } catch { setFn([]); } + }; + + const handleSubmit = async () => { + try { + const values = await form.validateFields(); + setSubmitting(true); + + const columnMappings = values.columnMappings || []; + const childRelDetailList = columnMappings + .filter((m: any) => m?.fromColumn && m?.toColumn) + .map((m: any) => ({ + fromChildren: [{ uuid: m.fromColumn }], + toChild: { uuid: m.toColumn }, + })); + + const payload = { + fromEntity: { uuid: values.fromTable }, + toEntity: { uuid: values.toTable }, + lineageDetail: { + sourceType: 'MANUAL', + childRelDetailList, + }, + }; + + await $http.post('/catalog/lineage/add', payload); + message.success(intl.formatMessage({ id: 'lineage_add' }) + ' ✓'); + form.resetFields(); + onSuccess(); + } catch (e: any) { + if (e?.msg) message.error(e.msg); + } finally { + setSubmitting(false); + } + }; + + useEffect(() => { + if (!visible) { + form.resetFields(); + setFromDatabases([]); + setFromTables([]); + setFromColumns([]); + setToDatabases([]); + setToTables([]); + setToColumns([]); + } + }, [visible]); + + return ( + +
+ {intl.formatMessage({ id: 'lineage_source_table' })} +
+ + ({ + label: db.name, value: db.uuid, + }))} + onChange={(val) => { + loadTables(val, setFromTables); + form.setFieldsValue({ fromTable: undefined }); + setFromColumns([]); + }} + /> + + + ({ + label: ds.name, value: ds.uuid, + }))} + onChange={(val) => { + loadDatabases(val, setToDatabases); + form.setFieldsValue({ toDatabase: undefined, toTable: undefined }); + setToTables([]); + setToColumns([]); + }} + /> + + + ({ + label: t.name, value: t.uuid, + }))} + onChange={(val) => { + loadColumns(val, setToColumns); + }} + /> + +
+ + + + {intl.formatMessage({ id: 'lineage_column_mapping' })} ({intl.formatMessage({ id: 'lineage_optional' })}) + + {(fields, { add, remove }) => ( + <> + {fields.map(({ key, name, ...restField }) => ( +
+ + ({ + label: c.name, value: c.uuid, + }))} + /> + + remove(name)} /> +
+ ))} + + + )} +
+ +
+ ); +}; + +export default AddLineageModal; diff --git a/datavines-ui/Editor/components/Lineage/LineageDetailDrawer.tsx b/datavines-ui/Editor/components/Lineage/LineageDetailDrawer.tsx new file mode 100644 index 000000000..58f2db8ab --- /dev/null +++ b/datavines-ui/Editor/components/Lineage/LineageDetailDrawer.tsx @@ -0,0 +1,235 @@ +import React from 'react'; +import { + Drawer, Tag, Button, Popconfirm, Typography, Divider, +} from 'antd'; +import { + ArrowRightOutlined, DeleteOutlined, AimOutlined, DatabaseOutlined, +} from '@ant-design/icons'; +import { useIntl } from 'react-intl'; +import { LineageEntityNodeInfo, LineageEntityEdgeInfo } from './useLineageData'; + +const { Text, Title } = Typography; + +interface LineageDetailDrawerProps { + visible: boolean; + onClose: () => void; + selectedNode?: LineageEntityNodeInfo | null; + selectedEdge?: LineageEntityEdgeInfo | null; + onViewAsCenter?: (uuid: string) => void; + onDeleteEdge?: (fromUuid: string, toUuid: string) => void; +} + +const SOURCE_TYPE_COLORS: Record = { + MANUAL: 'blue', + SQL_PARSER: 'green', + SPARK_LISTENER: 'orange', + FLINK_SQL_LINEAGE: 'purple', + manual: 'blue', + sql_parser: 'green', +}; + +const LineageDetailDrawer: React.FC = ({ + visible, onClose, selectedNode, selectedEdge, onViewAsCenter, onDeleteEdge, +}) => { + const intl = useIntl(); + + const renderNodeDetail = () => { + if (!selectedNode) return null; + const columns = selectedNode.columns || []; + + return ( +
+ {/* Header */} +
+ + <DatabaseOutlined style={{ marginRight: 8, color: '#4169E1' }} /> + {selectedNode.displayName} + + {selectedNode.datasource?.type && ( + + {selectedNode.datasource.type.toUpperCase()} + + )} +
+ + {/* FQN */} +
+
Fully Qualified Name
+
+ {selectedNode.fullyQualifiedName || '-'} +
+
+ + {/* Metadata */} +
+
+ {intl.formatMessage({ id: 'lineage_select_datasource' })} +
+ {selectedNode.datasource?.displayName || '-'} +
+
+
+ {intl.formatMessage({ id: 'lineage_select_database' })} +
+ {selectedNode.database?.displayName || '-'} +
+ + {/* Columns */} + {columns.length > 0 && ( +
+
+ {intl.formatMessage({ id: 'dv_metric_column' })} ({columns.length}) +
+
+ {columns.map((col) => ( + + {col.displayName} + + ))} +
+
+ )} + + + +
+ ); + }; + + const renderEdgeDetail = () => { + if (!selectedEdge) return null; + const detail = selectedEdge.lineageDetail; + const fromName = selectedEdge.fromEntity?.displayName || ''; + const toName = selectedEdge.toEntity?.displayName || ''; + const columnLineages = detail?.childRelDetailList || []; + + return ( +
+ {/* Flow header */} +
+ {fromName} + + {toName} +
+ + {/* Source type */} + {detail?.sourceType && ( +
+
+ {intl.formatMessage({ id: 'lineage_source_type' })} +
+ + {detail.sourceType} + +
+ )} + + {/* Column lineage */} + {columnLineages.length > 0 && ( +
+
+ {intl.formatMessage({ id: 'lineage_column_mapping' })} +
+
+ {columnLineages.map((item, idx) => { + const fromCols = (item.fromChildren || []).map((c) => c.displayName).join(', '); + const toCol = item.toChild?.displayName || ''; + return ( +
+ {fromCols} + + {toCol} +
+ ); + })} +
+
+ )} + + {/* SQL */} + {detail?.sqlQuery && ( +
+
+ {intl.formatMessage({ id: 'lineage_related_sql' })} +
+
+ {detail.sqlQuery} +
+
+ )} + + + { + const from = selectedEdge.fromEntity?.uuid; + const to = selectedEdge.toEntity?.uuid; + if (from && to) onDeleteEdge?.(from, to); + }} + okType="danger" + > + + +
+ ); + }; + + const title = selectedNode + ? intl.formatMessage({ id: 'lineage_view_detail' }) + : intl.formatMessage({ id: 'lineage_detail' }); + + return ( + + {selectedNode ? renderNodeDetail() : renderEdgeDetail()} + + ); +}; + +export default LineageDetailDrawer; diff --git a/datavines-ui/Editor/components/Lineage/LineageGraph.less b/datavines-ui/Editor/components/Lineage/LineageGraph.less new file mode 100644 index 000000000..cc71d66e2 --- /dev/null +++ b/datavines-ui/Editor/components/Lineage/LineageGraph.less @@ -0,0 +1,227 @@ +@lineage-bg: #f8f9fc; +@lineage-border: #e8ecf1; +@lineage-dot: #dde1e9; +@lineage-radius: 10px; +@lineage-shadow: 0 1px 3px rgba(0, 0, 0, 0.06); +@lineage-font: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, 'Helvetica Neue', sans-serif; + +.lineage-wrapper { + font-family: @lineage-font; + + // Ant Design Spin adds wrapper divs — ensure they fill available space + .ant-spin-nested-loading, + .ant-spin-container { + height: 100%; + } +} + +.lineage-toolbar { + display: flex; + align-items: center; + justify-content: space-between; + padding: 10px 16px; + background: #fff; + border: 1px solid @lineage-border; + border-radius: @lineage-radius @lineage-radius 0 0; + gap: 8px; + flex-wrap: wrap; + + .toolbar-left, .toolbar-right { + display: flex; + align-items: center; + gap: 8px; + } +} + +.lineage-graph-container { + position: relative; + width: 100%; + height: calc(100vh - 430px); + min-height: 380px; + background-color: @lineage-bg; + background-image: radial-gradient(@lineage-dot 1px, transparent 1px); + background-size: 20px 20px; + border: 1px solid @lineage-border; + border-top: none; + border-radius: 0 0 @lineage-radius @lineage-radius; + overflow: hidden; + + .lineage-canvas { + width: 100%; + height: 100%; + } + + // Zoom controls — bottom-right pill + .lineage-zoom-controls { + position: absolute; + right: 16px; + bottom: 16px; + display: flex; + align-items: center; + gap: 1px; + background: #fff; + border-radius: 20px; + box-shadow: 0 2px 8px rgba(0, 0, 0, 0.1); + border: 1px solid @lineage-border; + padding: 2px; + z-index: 10; + + .ant-btn { + width: 30px; + height: 30px; + min-width: 30px; + padding: 0; + display: flex; + align-items: center; + justify-content: center; + border: none; + background: transparent; + color: #6e7191; + font-size: 13px; + border-radius: 50%; + transition: all 0.2s; + + &:hover { + background: #f0f2f5; + color: #1a1a2e; + } + } + + .zoom-divider { + width: 1px; + height: 18px; + background: @lineage-border; + margin: 0 2px; + } + } + + // Legend — bottom-left + .lineage-legend { + position: absolute; + left: 16px; + bottom: 16px; + display: flex; + align-items: center; + gap: 16px; + background: rgba(255, 255, 255, 0.92); + backdrop-filter: blur(6px); + border-radius: 8px; + padding: 6px 14px; + border: 1px solid @lineage-border; + z-index: 10; + font-size: 11px; + color: #6e7191; + + .legend-item { + display: flex; + align-items: center; + gap: 6px; + white-space: nowrap; + + .legend-dot { + width: 10px; + height: 10px; + border-radius: 3px; + } + } + } + + // Minimap override — no CSS Modules, classes are global + .g6-minimap-container { + position: absolute !important; + right: 16px; + top: 16px; + background: rgba(255, 255, 255, 0.92); + backdrop-filter: blur(6px); + border: 1px solid @lineage-border; + border-radius: 8px; + overflow: hidden; + box-shadow: @lineage-shadow; + } +} + +.lineage-empty-container { + display: flex; + flex-direction: column; + align-items: center; + justify-content: center; + height: calc(100vh - 430px); + min-height: 320px; + background-color: @lineage-bg; + background-image: radial-gradient(@lineage-dot 1px, transparent 1px); + background-size: 20px 20px; + border: 1px solid @lineage-border; + border-top: none; + border-radius: 0 0 @lineage-radius @lineage-radius; + + .ant-empty-description { + color: #6e7191; + } +} + +.lineage-loading-container { + display: flex; + flex-direction: column; + align-items: center; + justify-content: center; + height: calc(100vh - 380px); + min-height: 420px; +} + +.lineage-error-container { + padding: 16px; +} + +// Drawer enhancements +.lineage-drawer { + .drawer-section { + margin-bottom: 20px; + + .drawer-section-title { + font-size: 12px; + font-weight: 600; + text-transform: uppercase; + letter-spacing: 0.5px; + color: #9ca3af; + margin-bottom: 10px; + } + } + + .drawer-fqn { + font-family: 'SF Mono', 'Fira Code', 'Consolas', monospace; + font-size: 12px; + background: #f5f6fa; + padding: 8px 12px; + border-radius: 6px; + word-break: break-all; + color: #374151; + line-height: 1.5; + } + + .drawer-sql-block { + margin-top: 8px; + padding: 12px 14px; + background: #1e1e2e; + border-radius: 8px; + font-family: 'SF Mono', 'Fira Code', 'Consolas', monospace; + font-size: 12px; + color: #cdd6f4; + max-height: 220px; + overflow: auto; + white-space: pre-wrap; + word-break: break-all; + line-height: 1.6; + } + + .col-mapping-row { + display: flex; + align-items: center; + gap: 8px; + padding: 6px 0; + + .col-mapping-arrow { + color: #9ca3af; + flex-shrink: 0; + } + } +} diff --git a/datavines-ui/Editor/components/Lineage/LineageGraph.tsx b/datavines-ui/Editor/components/Lineage/LineageGraph.tsx new file mode 100644 index 000000000..f2bc800b0 --- /dev/null +++ b/datavines-ui/Editor/components/Lineage/LineageGraph.tsx @@ -0,0 +1,297 @@ +import React, { useRef, useEffect, useCallback } from 'react'; +import G6, { Graph, IG6GraphEvent } from '@antv/g6'; +import { Button, Tooltip } from 'antd'; +import { + ZoomInOutlined, ZoomOutOutlined, CompressOutlined, +} from '@ant-design/icons'; +import { NODE_WIDTH, NODE_HEIGHT, EDGE_COLORS, EDGE_ACTIVE_COLORS } from './constants'; +import { registerLineageNode } from './registerNodes'; +import { + CatalogEntityLineageVO, LineageEntityNodeInfo, LineageEntityEdgeInfo, +} from './useLineageData'; +import './LineageGraph.less'; + +let nodesRegistered = false; + +interface LineageGraphProps { + data: CatalogEntityLineageVO | null; + onNodeClick?: (node: LineageEntityNodeInfo) => void; + onEdgeClick?: (edge: LineageEntityEdgeInfo) => void; + onExpandNode?: (uuid: string) => void; +} + +const LineageGraph: React.FC = ({ + data, onNodeClick, onEdgeClick, onExpandNode, +}) => { + const containerRef = useRef(null); + const graphRef = useRef(null); + + const transformData = useCallback((lineageVO: CatalogEntityLineageVO) => { + const currentUuid = lineageVO.currentNode?.uuid; + const upstreamUuids = new Set(); + const downstreamUuids = new Set(); + + (lineageVO.edges || []).forEach((edge) => { + const fromUuid = edge.fromEntity?.uuid; + const toUuid = edge.toEntity?.uuid; + if (toUuid === currentUuid && fromUuid) upstreamUuids.add(fromUuid); + if (fromUuid === currentUuid && toUuid) downstreamUuids.add(toUuid); + }); + + const nodes = (lineageVO.nodes || []).map((node) => { + let nodeType = 'upstream'; + if (node.uuid === currentUuid) nodeType = 'current'; + else if (downstreamUuids.has(node.uuid)) nodeType = 'downstream'; + return { + id: node.uuid, + label: node.displayName || '', + nodeType, + datasourceName: node.datasource?.displayName || '', + datasourceType: node.datasource?.type || '', + databaseName: node.database?.displayName || '', + columnCount: node.columns?.length || 0, + hasNextNode: node.hasNextNode || false, + type: 'lineage-table-node', + rawData: node, + }; + }); + + const edges = (lineageVO.edges || []).map((edge) => { + const fromUuid = edge.fromEntity?.uuid || ''; + const toUuid = edge.toEntity?.uuid || ''; + let colorKey: 'upstream' | 'downstream' | 'default' = 'default'; + if (toUuid === currentUuid) colorKey = 'upstream'; + else if (fromUuid === currentUuid) colorKey = 'downstream'; + + return { + id: edge.uuid || `${fromUuid}-${toUuid}`, + source: fromUuid, + target: toUuid, + colorKey, + style: { + stroke: EDGE_COLORS[colorKey], + lineWidth: 2, + endArrow: { + path: G6.Arrow.triangle(6, 6, 8), + fill: EDGE_COLORS[colorKey], + d: 8, + }, + shadowColor: 'transparent', + shadowBlur: 0, + }, + rawData: edge, + }; + }); + + return { nodes, edges }; + }, []); + + useEffect(() => { + if (!containerRef.current || !data) return; + + if (!nodesRegistered) { + registerLineageNode(); + nodesRegistered = true; + } + + const container = containerRef.current; + const width = container.clientWidth || 800; + const height = container.clientHeight || 500; + + if (graphRef.current) { + graphRef.current.destroy(); + graphRef.current = null; + } + + try { + const g6Data = transformData(data); + if (!g6Data.nodes.length) return; + + const minimap = new G6.Minimap({ + size: [140, 90], + className: 'lineage-minimap', + }); + + const graph = new G6.Graph({ + container, + width, + height, + layout: { + type: 'dagre', + rankdir: 'LR', + nodesep: 50, + ranksep: 100, + align: 'UL', + }, + defaultNode: { + type: 'lineage-table-node', + size: [NODE_WIDTH, NODE_HEIGHT], + }, + defaultEdge: { + type: 'cubic-horizontal', + style: { + stroke: EDGE_COLORS.default, + lineWidth: 2, + lineDash: undefined, + }, + }, + modes: { + default: ['drag-canvas', 'zoom-canvas'], + }, + plugins: [minimap], + fitView: true, + fitViewPadding: [60, 60, 60, 60], + maxZoom: 1.5, + minZoom: 0.15, + animate: true, + animateCfg: { duration: 350, easing: 'easeCubic' }, + }); + + graph.on('node:click', (evt: IG6GraphEvent) => { + const model = evt.item?.getModel(); + if (!model) return; + const targetName = (evt.target as any)?.get?.('name'); + if ((targetName === 'expand-icon' || targetName === 'expand-bg') && onExpandNode) { + onExpandNode(model.id as string); + return; + } + // Clear previous selection + graph.getNodes().forEach((n) => graph.setItemState(n, 'selected', false)); + graph.setItemState(evt.item!, 'selected', true); + if (onNodeClick && model.rawData) { + onNodeClick(model.rawData as LineageEntityNodeInfo); + } + }); + + graph.on('node:mouseenter', (evt: IG6GraphEvent) => { + graph.setItemState(evt.item!, 'hover', true); + container.style.cursor = 'pointer'; + }); + graph.on('node:mouseleave', (evt: IG6GraphEvent) => { + graph.setItemState(evt.item!, 'hover', false); + container.style.cursor = 'default'; + }); + + graph.on('edge:click', (evt: IG6GraphEvent) => { + const model = evt.item?.getModel(); + if (!model) return; + if (onEdgeClick && model.rawData) { + onEdgeClick(model.rawData as LineageEntityEdgeInfo); + } + }); + + graph.on('edge:mouseenter', (evt: IG6GraphEvent) => { + const model = evt.item?.getModel(); + const colorKey = (model?.colorKey as string) || 'default'; + const activeColor = EDGE_ACTIVE_COLORS[colorKey as keyof typeof EDGE_ACTIVE_COLORS] || EDGE_ACTIVE_COLORS.default; + graph.updateItem(evt.item!, { + style: { + stroke: activeColor, + lineWidth: 3, + shadowColor: `${activeColor}40`, + shadowBlur: 8, + endArrow: { + path: G6.Arrow.triangle(7, 7, 8), + fill: activeColor, + d: 8, + }, + }, + }); + container.style.cursor = 'pointer'; + }); + + graph.on('edge:mouseleave', (evt: IG6GraphEvent) => { + const model = evt.item?.getModel(); + const colorKey = (model?.colorKey as string) || 'default'; + const edgeColor = EDGE_COLORS[colorKey as keyof typeof EDGE_COLORS] || EDGE_COLORS.default; + graph.updateItem(evt.item!, { + style: { + stroke: edgeColor, + lineWidth: 2, + shadowColor: 'transparent', + shadowBlur: 0, + endArrow: { + path: G6.Arrow.triangle(6, 6, 8), + fill: edgeColor, + d: 8, + }, + }, + }); + container.style.cursor = 'default'; + }); + + graph.on('canvas:click', () => { + graph.getNodes().forEach((n) => graph.setItemState(n, 'selected', false)); + }); + + graph.data(g6Data); + graph.render(); + graphRef.current = graph; + + const resizeObserver = new ResizeObserver(() => { + if (graphRef.current && container) { + graphRef.current.changeSize(container.clientWidth, container.clientHeight); + graphRef.current.fitView(60); + } + }); + resizeObserver.observe(container); + + return () => { + resizeObserver.disconnect(); + if (graphRef.current) { + graphRef.current.destroy(); + graphRef.current = null; + } + }; + } catch (err) { + console.error('[LineageGraph] render error:', err); + } + }, [data, onNodeClick, onEdgeClick, onExpandNode, transformData]); + + const handleZoom = (factor: number) => { + if (!graphRef.current) return; + const zoom = graphRef.current.getZoom(); + graphRef.current.zoomTo(Math.min(Math.max(zoom * factor, 0.15), 3), undefined, true, { duration: 200 }); + }; + + const handleFitView = () => { + graphRef.current?.fitView(60, undefined, true, { duration: 300 }); + }; + + return ( +
+
+ +
+
+ + Current +
+
+ + Upstream +
+
+ + Downstream +
+
+ +
+ +
+
+ ); +}; + +export default LineageGraph; diff --git a/datavines-ui/Editor/components/Lineage/LineageToolbar.tsx b/datavines-ui/Editor/components/Lineage/LineageToolbar.tsx new file mode 100644 index 000000000..b484456be --- /dev/null +++ b/datavines-ui/Editor/components/Lineage/LineageToolbar.tsx @@ -0,0 +1,65 @@ +import React from 'react'; +import { Input, Segmented, Button, Tooltip, Space } from 'antd'; +import { + ReloadOutlined, PlusOutlined, CodeOutlined, +} from '@ant-design/icons'; +import { useIntl } from 'react-intl'; + +interface LineageToolbarProps { + onRefresh: () => void; + onFitView: () => void; + onAddLineage: () => void; + onParseSql: () => void; + direction: string; + onDirectionChange: (dir: string) => void; + onSearch: (value: string) => void; +} + +const LineageToolbar: React.FC = ({ + onRefresh, onAddLineage, onParseSql, + direction, onDirectionChange, onSearch, +}) => { + const intl = useIntl(); + return ( +
+
+ + onDirectionChange(val as string)} + options={[ + { value: 'all', label: intl.formatMessage({ id: 'lineage_both' }) }, + { value: 'upstream', label: intl.formatMessage({ id: 'lineage_upstream' }) }, + { value: 'downstream', label: intl.formatMessage({ id: 'lineage_downstream' }) }, + ]} + size="middle" + /> +
+
+ + + + + + + + +
+
+ ); +}; + +export default LineageToolbar; diff --git a/datavines-ui/Editor/components/Lineage/SqlParseModal.tsx b/datavines-ui/Editor/components/Lineage/SqlParseModal.tsx new file mode 100644 index 000000000..17b73528d --- /dev/null +++ b/datavines-ui/Editor/components/Lineage/SqlParseModal.tsx @@ -0,0 +1,113 @@ +import React, { useState, useEffect } from 'react'; +import { Modal, Form, Select, Input, message, Alert } from 'antd'; +import { useIntl } from 'react-intl'; +import useRequest from '../../hooks/useRequest'; + +const { TextArea } = Input; + +interface SqlParseModalProps { + visible: boolean; + onClose: () => void; + onSuccess: () => void; + datasourceList: any[]; +} + +const SqlParseModal: React.FC = ({ + visible, onClose, onSuccess, datasourceList, +}) => { + const intl = useIntl(); + const { $http } = useRequest(); + const [form] = Form.useForm(); + const [submitting, setSubmitting] = useState(false); + const [errorMsg, setErrorMsg] = useState(null); + + const handleSubmit = async () => { + try { + const values = await form.validateFields(); + setSubmitting(true); + setErrorMsg(null); + + const ds = datasourceList.find((d: any) => d.id === values.datasourceId); + const payload = { + dataSourceInfos: [{ + id: values.datasourceId, + type: ds?.type || '', + }], + sql: values.sql, + }; + + await $http.post('/catalog/lineage/addByParseSql', payload, { hideError: true }); + message.success(intl.formatMessage({ id: 'lineage_add_by_sql' }) + ' ✓'); + form.resetFields(); + setErrorMsg(null); + onSuccess(); + } catch (e: any) { + const errText = e?.msg || e?.message || e?.data?.msg || 'Unknown error'; + setErrorMsg(errText); + } finally { + setSubmitting(false); + } + }; + + useEffect(() => { + if (!visible) { + form.resetFields(); + setErrorMsg(null); + } + }, [visible]); + + return ( + + {errorMsg && ( + setErrorMsg(null)} + /> + )} +
+ +