Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,76 @@
*/
package io.datavines.connector.plugin;

import com.alibaba.druid.DbType;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.visitor.SchemaStatVisitor;
import com.alibaba.druid.stat.TableStat;
import io.datavines.connector.api.StatementParser;
import io.datavines.connector.api.entity.StatementMetadataFragment;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class DefaultStatementParser implements StatementParser {

private final DbType dbType;

public DefaultStatementParser() {
this(null);
}

public DefaultStatementParser(DbType dbType) {
this.dbType = dbType;
}

@Override
public StatementMetadataFragment parseStatement(String statement) {
return null;
if (statement == null || statement.trim().isEmpty()) {
return null;
}

List<SQLStatement> stmtList = SQLUtils.parseStatements(statement, dbType);
if (stmtList == null || stmtList.isEmpty()) {
return null;
}

List<String> inputTables = new ArrayList<>();
List<String> outputTables = new ArrayList<>();

for (SQLStatement sqlStatement : stmtList) {
SchemaStatVisitor visitor = SQLUtils.createSchemaStatVisitor(dbType);
sqlStatement.accept(visitor);

Map<TableStat.Name, TableStat> tables = visitor.getTables();
if (tables == null || tables.isEmpty()) {
continue;
}

for (Map.Entry<TableStat.Name, TableStat> entry : tables.entrySet()) {
String tableName = entry.getKey().getName();
TableStat stat = entry.getValue();

if (stat.getInsertCount() > 0 || stat.getCreateCount() > 0
|| stat.getMergeCount() > 0) {
if (!outputTables.contains(tableName)) {
outputTables.add(tableName);
}
}

if (stat.getSelectCount() > 0) {
if (!inputTables.contains(tableName)) {
inputTables.add(tableName);
}
}
}
}

if (inputTables.isEmpty() && outputTables.isEmpty()) {
return null;
}

return new StatementMetadataFragment(inputTables, outputTables, new ArrayList<>());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,4 @@ public ConnectorResponse testConnect(TestConnectionRequestParam param) {
.build();
}
}

@Override
public List<String> keyProperties() {
return Arrays.asList("host","port","catalog","database");
}
}
21 changes: 8 additions & 13 deletions datavines-core/src/main/java/io/datavines/core/enums/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package io.datavines.core.enums;

import lombok.Getter;
import org.springframework.context.i18n.LocaleContextHolder;

import java.util.Locale;
Expand Down Expand Up @@ -49,7 +50,7 @@ public enum Status {
OLD_PASSWORD_IS_INCORRECT_ERROR(10020004, "Old Password is Incorrect", "旧密码错误"),
NEW_PASSWORD_CONFIRM_IS_INCORRECT_ERROR(10020004, "New Password Confirm is Incorrect", "新密码确认错误"),

WORKSPACE_EXIST_ERROR(11010001, "WorkSpace {0} is Exist error", "工作空间 {0} 已存在错误"),
WORKSPACE_EXIST_ERROR(11010001, "WorkSpace {0} is Exist Error", "工作空间 {0} 已存在错误"),
CREATE_WORKSPACE_ERROR(11010002, "Create WorkSpace {0} Error", "创建工作空间 {0} 错误"),
WORKSPACE_NOT_EXIST_ERROR(11010003, "WorkSpace {0} is Not Exist Error", "工作空间 {0} 不存在错误"),
UPDATE_WORKSPACE_ERROR(11010004, "Update WorkSpace {0} Error", "更新工作空间 {0} 错误"),
Expand All @@ -69,7 +70,6 @@ public enum Status {
TASK_NOT_EXIST_ERROR(13010001, "Task {0} Not Exist Error", "任务{0}不存在错误"),
TASK_LOG_PATH_NOT_EXIST_ERROR(13010002, "Task {0} Log Path Not Exist Error", "任务 {0} 的日志路径不存在错误"),
TASK_EXECUTE_HOST_NOT_EXIST_ERROR(13010003, "Task Execute Host {0} Not Exist Error", "任务 {0} 的执行服务地址不存在错误"),

TASK_EXECUTE_NOT_RUNNING(13010004, "Taskt {0} has not running", "任务 {0} 还没有开始运行,请稍后重试"),

JOB_PARAMETER_IS_NULL_ERROR(14010001, "Job {0} Parameter is Null Error", "作业 {0} 参数为空错误"),
Expand All @@ -83,7 +83,6 @@ public enum Status {
METRIC_IS_NOT_EXIST(14010009, "The Metric {0} is not exist", "规则 {0} 不存在"),
MULTI_TABLE_ACCURACY_NOT_SUPPORT_LOCAL_ENGINE(14010010, "Local Engine not support multi table accuracy in one datasource", "Local引擎不支持跨表准确性检查"),
JOB_PARAMETER_CONTAIN_DUPLICATE_METRIC_ERROR(14010011, "Job {0} Parameter Contain Duplicate Metric", "作业中存在重复的检查规则"),

JOB_SCHEDULE_EXIST_ERROR(14020001, "Job Schedule is Exist error, id must be not null", "作业定时任务已存在,ID 不能为空"),
CREATE_JOB_SCHEDULE_ERROR(14020002, "Create Job Schedule {0} Error", "创建作业定时任务 {0} 错误"),
JOB_SCHEDULE_NOT_EXIST_ERROR(14020003, "Job Schedule {0} is not Exist error", "作业定时任务 {0} 不存在错误"),
Expand All @@ -93,6 +92,7 @@ public enum Status {
SCHEDULE_TYPE_NOT_VALIDATE_ERROR(14020007, "Schedule type {0} is not Validate Error", "定时器类型参数 {0} 错误"),
SCHEDULE_CYCLE_NOT_VALIDATE_ERROR(14020008, "Schedule Param Cycle {0} is not Validate Error", "定时器周期参数 {0} 错误"),
SCHEDULE_CRON_IS_INVALID_ERROR(14020009, "Schedule cron {0} is not Validate Error", "定时器 Crontab 表达式 {0} 错误"),
DATASOURCE_NOT_SUPPORT_ERROR_DATA_OUTPUT_TO_SELF_ERROR(14020010, "DataSource type {0} not Support Error Data Output To Self Error", "{0} 类型数据源不支持错误输出写入"),

CREATE_TENANT_ERROR(15010001, "Create Tenant {0} Error", "创建 Linux 用户 {0} 错误"),
TENANT_NOT_EXIST_ERROR(15010002, "Tenant {0} Not Exist Error", "Linux 用户 {0} 不存在错误"),
Expand All @@ -109,15 +109,13 @@ public enum Status {
ERROR_DATA_STORAGE_EXIST_ERROR(17010003, "Error Data Storage {0} is Exist error", "错误数据存储 {0} 已存在"),
UPDATE_ERROR_DATA_STORAGE_ERROR(17010004, "Update Error Data Storage {0} Error", "更新 错误数据存储 {0} 错误"),

DATASOURCE_NOT_SUPPORT_ERROR_DATA_OUTPUT_TO_SELF_ERROR(14020010, "DataSource type {0} not Support Error Data Output To Self Error", "{0} 类型数据源不支持错误输出写入"),

SLA_ALREADY_EXIST_ERROR(18010001, "SLA {0} Already exist", "SLA {0} 已经存在"),
SLA_SENDER_ALREADY_EXIST_ERROR(18020001, "SLA Sender {0} Already exist", "SLA 发送器 {0} 已经存在"),
SLA_JOB_IS_NOT_EXIST_ERROR(18010003, "SLA job {0} is not exist", "SLA job {0} 存在"),

CATALOG_FETCH_DATASOURCE_NULL_ERROR(19010001, "获取元数据时数据源为空", "DataSource must not be null when fetch metadata"),

CATALOG_FETCH_METADATA_PARAMETER_ERROR(19010002, "获取元数据参数错误", "Fetch Metadata Parameter Error"),

CATALOG_TAG_CATEGORY_CREATE_ERROR(20010001, "Create Tag Category {0} Error", "创建标签类别 {0} 错误"),
CATALOG_TAG_CATEGORY_NOT_EXIST_ERROR(20010002, "Tag Category {0} Not Exist Error", "标签类别 {0} 不存在"),
CATALOG_TAG_CATEGORY_EXIST_ERROR(20010003, "Tag Category {0} is Exist error", "标签类别 {0} 已存在"),
Expand All @@ -129,21 +127,22 @@ public enum Status {
CREATE_CATALOG_TASK_SCHEDULE_ERROR(20030002, "Create Catalog Task Schedule {0} Error", "创建元数据抓取定时任务 {0} 错误"),
CATALOG_TASK_SCHEDULE_NOT_EXIST_ERROR(20030003, "Catalog Task Schedule {0} is not Exist error", "元数据抓取定时任务 {0} 不存在"),
UPDATE_CATALOG_TASK_SCHEDULE_ERROR(20030004, "Update Catalog Task Schedule {0} Error", "更新元数据抓取定时任务 {0} 错误"),
CATALOG_PROFILE_INSTANCE_FQN_ERROR(20030004, "Catalog instance fqn {0} Error", "数据实体全限定名 {0} 错误"),
CATALOG_INSTANCE_FQN_ERROR(20030005, "Catalog instance fqn {0} Error", "数据实体全限定名 {0} 错误"),
CATALOG_INSTANCE_IS_NULL_ERROR(20040001, "Catalog instance fqn {0} Error", "数据实体 {0} 为空错误"),

CREATE_ISSUE_ERROR(21010001, "Create Issue {0} Error", "创建Issue {0} 错误"),
ISSUE_NOT_EXIST_ERROR(21010002, "Issue {0} Not Exist Error", "Issue {0} 不存在错误"),
ISSUE_EXIST_ERROR(21010003, "Issue {0} is Exist error", "Issue {0} 已存在错误"),
UPDATE_ISSUE_ERROR(21010004, "Update Issue {0} Error", "更新Issue {0} 错误"),


CREATE_CONFIG_ERROR(22010001, "Create Config {0} Error", "创建参数 {0} 错误"),
CONFIG_NOT_EXIST_ERROR(22010002, "Config {0} Not Exist Error", "参数 {0} 不存在错误"),
CONFIG_EXIST_ERROR(22010003, "Config {0} is Exist error", "参数 {0} 已存在错误"),
UPDATE_CONFIG_ERROR(22010004, "Update Config {0} Error", "更新参数 {0} 错误"),

CAN_NOT_DELETE_DEFAULT_CONFIG_ERROR(22010005, "Can Not Delete Default Config {0} Error", "不能删除默认参数 {0} 错误"),
;

@Getter
private final int code;
private final String enMsg;
private final String zhMsg;
Expand All @@ -154,10 +153,6 @@ public enum Status {
this.zhMsg = zhMsg;
}

public int getCode() {
return this.code;
}

public String getMsg() {
if (Locale.SIMPLIFIED_CHINESE.getLanguage().equals(LocaleContextHolder.getLocale().getLanguage())) {
return this.zhMsg;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ protected SinkConfig getValidateResultDataSinkConfig(ExpectedValue expectedValue
SinkConfig validateResultDataStorageConfig = new SinkConfig();
validateResultDataStorageConfig.setPlugin(jobExecutionInfo.getValidateResultDataStorageType());
Map<String, Object> configMap = getValidateResultSourceConfigMap(
ParameterUtils.convertParameterPlaceholders(sql, inputParameter),dbTable);
ParameterUtils.convertParameterPlaceholders(sql, inputParameter), dbTable);
configMap.put(JOB_EXECUTION_ID, jobExecutionInfo.getId());
configMap.put(INVALIDATE_ITEMS_TABLE, inputParameter.get(INVALIDATE_ITEMS_TABLE));
configMap.put(METRIC_UNIQUE_KEY, inputParameter.get(METRIC_UNIQUE_KEY));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.server.api.controller;

import io.datavines.core.aop.RefreshToken;
import io.datavines.core.constant.DataVinesConstants;
import io.datavines.server.api.dto.bo.catalog.lineage.LineageEntityEdgeInfo;
import io.datavines.server.api.dto.bo.catalog.lineage.SqlWithDataSourceKeyProperties;
import io.datavines.server.api.dto.bo.catalog.lineage.SqlWithDataSourceList;
import io.datavines.server.repository.entity.catalog.CatalogTagCategory;
import io.datavines.server.repository.service.CatalogEntityRelService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;

import javax.validation.Valid;

@Api(value = "catalog", tags = "catalog", produces = MediaType.APPLICATION_JSON_VALUE)
@RestController
@RequestMapping(value = DataVinesConstants.BASE_API_PATH + "/catalog/lineage", produces = MediaType.APPLICATION_JSON_VALUE)
@RefreshToken
public class CatalogLineageController {

@Autowired
private CatalogEntityRelService catalogEntityRelService;

@ApiOperation(value = "add lineage", response = Long.class)
@PostMapping(value = "/add", consumes = MediaType.APPLICATION_JSON_VALUE)
public Object addLineage(@Valid @RequestBody LineageEntityEdgeInfo entityEdgeInfo) {
return catalogEntityRelService.addLineage(entityEdgeInfo);
}

@ApiOperation(value = "get lineage by full qualified name", response = CatalogTagCategory.class, responseContainer = "list")
@GetMapping(value = "/getByFqn/{datasourceId}/{fqn}")
public Object getByFqn(@PathVariable Long datasourceId, @PathVariable String fqn) {
return catalogEntityRelService.getLineageByFqn(datasourceId,fqn,1,1);
}

@ApiOperation(value = "delete tag category", response = boolean.class)
@GetMapping(value = "/getByUUID/{uuid}")
public Object getByUUID(@PathVariable String uuid) {
return catalogEntityRelService.getLineageByUUID(uuid,1,1);
}

@ApiOperation(value = "delete lineage", response = boolean.class)
@DeleteMapping(value = "/{fromUUID}/{toUUID}")
public Object deleteLineage(@PathVariable("fromUUID") String fromUUID,
@PathVariable("toUUID") String toUUID) {
return catalogEntityRelService.deleteLineage(fromUUID, toUUID);
}

@ApiOperation(value = "parse sql to get lineage", response = Long.class)
@PostMapping(value = "/addByParseSql", consumes = MediaType.APPLICATION_JSON_VALUE)
public Object addLineageByParseSql(@Valid @RequestBody SqlWithDataSourceList sqlWithDataSourceList) {
return catalogEntityRelService.addLineageByParseSql(sqlWithDataSourceList);
}

@ApiOperation(value = "parse sql to get lineage", response = Long.class)
@PostMapping(value = "/addByParseSql2", consumes = MediaType.APPLICATION_JSON_VALUE)
public Object addLineageByParseSql2(@Valid @RequestBody SqlWithDataSourceKeyProperties sqlWithDataSourceKeyProperties) {
return catalogEntityRelService.addLineageByParseSql2(sqlWithDataSourceKeyProperties);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.server.api.dto.bo.catalog;

import lombok.Data;

import java.io.Serializable;

@Data
public class CatalogEntityInstanceInfo implements Serializable {

private static final long serialVersionUID = -1L;

private Long id;

private String uuid;

private Long datasourceId;

private String type;

private String fullyQualifiedName;

private String displayName;

private String description;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.server.api.dto.bo.catalog.lineage;

import io.datavines.server.api.dto.bo.catalog.CatalogEntityInstanceInfo;
import lombok.Data;

import java.io.Serializable;
import java.util.List;

@Data
public class CatalogEntityColumnLineageDetail implements Serializable {

private static final long serialVersionUID = -1L;

private List<CatalogEntityInstanceInfo> fromChildren;

private String function;

private CatalogEntityInstanceInfo toChild;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.server.api.dto.bo.catalog.lineage;

import io.datavines.server.enums.LineageSourceType;
import lombok.Data;

import java.io.Serializable;
import java.util.List;

@Data
public class CatalogEntityLineageDetail implements Serializable {

private static final long serialVersionUID = -1L;

private List<CatalogEntityColumnLineageDetail> childRelDetailList;

private String description;

private LineageSourceType sourceType;

private String sqlQuery;
}
Loading
Loading