Skip to content

Commit c73a03c

Browse files
authored
Merge pull request #80 from sqlparser/I7A9SH
add DataLineageParser
2 parents 6773995 + 6d6645e commit c73a03c

2 files changed

Lines changed: 166 additions & 0 deletions

File tree

api/DataLineageParser.java

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/**
2+
* 解析SQLFLow exportLineageAsJson接口返回的JSON格式的血缘关系中的关系链路
3+
*
4+
* 例如demo中的血缘数据,解析成以下链路:
5+
* 达成的目标是,List中两个元素:
6+
* SCOTT.DEPT -> SCOTT.EMP->VSAL
7+
* SCOTT.EMP->VSAL
8+
*/
9+
10+
public class DataLineageParser {
11+
static class Node {
12+
String value;
13+
String id;
14+
Node next;
15+
16+
public Node(String value, String id) {
17+
this.value = value;
18+
this.id = id;
19+
}
20+
21+
public String key() {
22+
Node node = this.next;
23+
StringBuilder key = new StringBuilder(id);
24+
while (node != null) {
25+
key.append(node.id);
26+
node = node.next;
27+
}
28+
return key.toString();
29+
}
30+
}
31+
32+
public static void main(String[] args) {
33+
String input = "{"jobId":"d9550e491c024d0cbe6e1034604aca17","code":200,"data":{"mode":"global","sqlflow":{"relationship":[{"sources":[{"parentName":"ORDERS","column":"TABLE","coordinates":[],"id":"10000106","parentId":"86"}],"id":"1000012311","type":"fdd","target":{"parentName":"SPECIAL_ORDERS","column":"TABLE","coordinates":[],"id":"10000102","parentId":"82"}},{"sources":[{"parentName":"CUSTOMERS","column":"TABLE","coordinates":[],"id":"10000103","parentId":"94"}],"id":"1000012312","type":"fdd","target":{"parentName":"SPECIAL_ORDERS","column":"TABLE","coordinates":[],"id":"10000102","parentId":"82"}}]}},"sessionId":"8bb7d3da4b687bb7badf01608a739fbebd61309cd5a643cecf079d122095738a_1685604216451"}";
34+
try {
35+
ObjectMapper objectMapper = new ObjectMapper();
36+
JsonNode jsonNode = objectMapper.readTree(input);
37+
JsonNode relationshipNode = jsonNode.path("data").path("sqlflow").path("relationships");
38+
List<Map<String, Object>> dataList = objectMapper.readValue(relationshipNode.toString(), new TypeReference<List<Map<String, Object>>>() {
39+
});
40+
41+
ArrayList<Node> value = new ArrayList<>();
42+
Map<String, Node> nodeMap = new HashMap<>();
43+
for (Map<String, Object> data : dataList) {
44+
List<Map<String, Object>> sources = (List<Map<String, Object>>) data.get("sources");
45+
Map<String, Object> targetNode = (Map<String, Object>) data.get("target");
46+
Node target = new Node((String) targetNode.get("parentName"), (String) targetNode.get("parentId"));
47+
if (!sources.isEmpty()) {
48+
for (Map<String, Object> source : sources) {
49+
String parentId = (String) source.get("parentId");
50+
String parentName = (String) source.get("parentName");
51+
Node sourceNode = new Node(parentName, parentId);
52+
sourceNode.next = target;
53+
value.add(sourceNode);
54+
nodeMap.put(parentId, sourceNode);
55+
}
56+
} else {
57+
value.add(target);
58+
nodeMap.put((String) targetNode.get("parentId"), target);
59+
}
60+
}
61+
62+
for (Node node : value) {
63+
Node next = node.next;
64+
if (next != null) {
65+
String id = next.id;
66+
next = nodeMap.get(id);
67+
if (next != null) {
68+
node.next = next;
69+
}
70+
}
71+
}
72+
73+
HashSet<String> key = new HashSet<>();
74+
Iterator<Node> iterator = value.iterator();
75+
while (iterator.hasNext()) {
76+
Node node = iterator.next();
77+
String k = node.key();
78+
if (key.contains(k)) {
79+
iterator.remove();
80+
}
81+
key.add(k);
82+
}
83+
84+
// value
85+
} catch (JsonProcessingException e) {
86+
e.printStackTrace();
87+
}
88+
}
89+
}

api/DataLineageParser.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
"""Parse the lineage chains out of the JSON returned by the SQLFlow
exportLineageAsJson API.

For the demo lineage data the goal is a list with two chain elements:
    SCOTT.DEPT -> SCOTT.EMP -> VSAL
    SCOTT.EMP -> VSAL
"""
# BUG FIX: the original file opened with a Java-style /** ... */ comment,
# which is a SyntaxError in Python; converted to a module docstring.

import json


class Node:
    """A single table node in a lineage chain, singly linked via ``next``."""

    def __init__(self, value, node_id):
        self.value = value   # table name, e.g. "SCOTT.EMP"
        self.id = node_id    # parentId of the table in the SQLFlow graph
        self.next = None     # downstream node, or None at the end of the chain

    def key(self):
        """Return the concatenated ids of this node and every downstream node.

        Used as a de-duplication key for whole chains.
        """
        key = self.id
        node = self.next
        while node:
            key += node.id
            node = node.next
        return key
25+
26+
def main():
    """Parse the demo lineage JSON and print one ``A -> B`` line per chain.

    Returns:
        The list of chain strings that were printed, or ``None`` when the
        embedded JSON fails to decode.
    """
    input_data = '{"jobId":"d9550e491c024d0cbe6e1034604aca17","code":200,"data":{"mode":"global","sqlflow":{"relationship":[{"sources":[{"parentName":"ORDERS","column":"TABLE","coordinates":[],"id":"10000106","parentId":"86"}],"id":"1000012311","type":"fdd","target":{"parentName":"SPECIAL_ORDERS","column":"TABLE","coordinates":[],"id":"10000102","parentId":"82"}},{"sources":[{"parentName":"CUSTOMERS","column":"TABLE","coordinates":[],"id":"10000103","parentId":"94"}],"id":"1000012312","type":"fdd","target":{"parentName":"SPECIAL_ORDERS","column":"TABLE","coordinates":[],"id":"10000102","parentId":"82"}}]}},"sessionId":"8bb7d3da4b687bb7badf01608a739fbebd61309cd5a643cecf079d122095738a_1685604216451"}'
    try:
        data = json.loads(input_data)
        # BUG FIX: the payload key is "relationship" (singular); the original
        # looked up "relationships" and raised KeyError, which the except
        # clause below does not catch.
        data_list = data["data"]["sqlflow"]["relationship"]

        # Build one chain head per source; node_map lets the next pass splice
        # chains together.
        value = []
        node_map = {}
        for item in data_list:
            sources = item.get("sources") or []
            target_info = item["target"]
            target = Node(target_info["parentName"], target_info["parentId"])
            if sources:
                for source in sources:
                    source_node = Node(source["parentName"], source["parentId"])
                    source_node.next = target
                    value.append(source_node)
                    node_map[source_node.id] = source_node
            else:
                value.append(target)
                node_map[target.id] = target

        # Re-link: when a chain's target is itself a source elsewhere, splice
        # in that richer node so transitive chains (A -> B -> C) come out whole.
        for node in value:
            if node.next:
                linked = node_map.get(node.next.id)
                if linked is not None and linked is not node:
                    node.next = linked

        # BUG FIX: plain list iterators have no .remove() in Python (the
        # original raised AttributeError); deduplicate with a seen-set instead
        # of mutating the list while iterating.
        seen = set()
        deduped = []
        for node in value:
            k = node.key()
            if k not in seen:
                seen.add(k)
                deduped.append(node)

        # BUG FIX: the original printed a hard-coded empty list; render each
        # surviving chain as "A -> B -> C".
        chains = []
        for node in deduped:
            parts = []
            cur = node
            while cur:
                parts.append(cur.value)
                cur = cur.next
            chains.append(" -> ".join(parts))

        for chain in chains:
            print(chain)
        return chains
    except json.JSONDecodeError as e:
        print(e)
        return None


if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)