This repository was archived by the owner on Dec 26, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathprocess_test.py
More file actions
138 lines (106 loc) · 3.53 KB
/
process_test.py
File metadata and controls
138 lines (106 loc) · 3.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import json
from copy import deepcopy
from typing import *
from pydgraph import DgraphClient, DgraphClientStub
from grapl_analyzerlib.nodes.comparators import IntCmp, _str_cmps, StrCmp, _int_cmps
from grapl_analyzerlib.nodes.dynamic_node import DynamicNodeQuery, DynamicNodeView
from grapl_analyzerlib.nodes.process_node import ProcessQuery, ProcessView, IProcessView
from grapl_analyzerlib.nodes.types import Property, PropertyT
from grapl_analyzerlib.nodes.viewable import Viewable, EdgeViewT, ForwardEdgeView
T = TypeVar("T")
def create_edge(
client: DgraphClient, from_uid: str, edge_name: str, to_uid: str
) -> None:
if edge_name[0] == "~":
mut = {"uid": to_uid, edge_name[1:]: {"uid": from_uid}}
else:
mut = {"uid": from_uid, edge_name: {"uid": to_uid}}
txn = client.txn(read_only=False)
try:
txn.mutate(set_obj=mut, commit_now=True)
finally:
txn.discard()
def _upsert(client: DgraphClient, node_dict: Dict[str, Property]) -> str:
if node_dict.get("uid"):
node_dict.pop("uid")
node_dict["uid"] = "_:blank-0"
node_key = node_dict["node_key"]
query = f"""
{{
q0(func: eq(node_key, "{node_key}")) {{
uid,
expand(_all_)
}}
}}
"""
txn = client.txn(read_only=False)
try:
res = json.loads(txn.query(query).json)["q0"]
new_uid = None
if res:
node_dict["uid"] = res[0]["uid"]
new_uid = res[0]["uid"]
mutation = node_dict
m_res = txn.mutate(set_obj=mutation, commit_now=True)
uids = m_res.uids
if new_uid is None:
new_uid = uids["blank-0"]
return str(new_uid)
finally:
txn.discard()
def upsert(
client: DgraphClient,
type_name: str,
view_type: Type[Viewable],
node_key: str,
node_props: Dict[str, Property],
) -> Viewable:
node_props["node_key"] = node_key
node_props["dgraph.type"] = type_name
uid = _upsert(client, node_props)
# print(f'uid: {uid}')
node_props["uid"] = uid
# print(node_props['node_key'])
return view_type.from_dict(client, node_props)
def main() -> None:
local_client = DgraphClient(DgraphClientStub("localhost:9080"))
parent = {
"process_id": 100,
"process_name": "word.exe",
} # type: Dict[str, Property]
child = {"process_id": 1234, "process_name": "cmd.exe"} # type: Dict[str, Property]
parent_view = upsert(
local_client,
"Process",
ProcessView,
"ea75f056-61a1-479d-9ca2-f632d2c67205",
parent,
)
child_view = upsert(
local_client,
"Process",
ProcessView,
"10f585c2-cf31-41e2-8ca5-d477e78be3ac",
child,
)
create_edge(local_client, parent_view.uid, "children", child_view.uid)
queried_child_0 = ProcessQuery().with_process_id(eq=1234).query_first(local_client)
assert queried_child_0
assert queried_child_0.node_key == child_view.node_key
queried_child_1 = (
ProcessQuery()
.with_process_id(eq=1234)
.query_first(
local_client, contains_node_key="10f585c2-cf31-41e2-8ca5-d477e78be3ac"
)
)
assert queried_child_1
assert queried_child_1.node_key == child_view.node_key
assert queried_child_1.node_key == queried_child_0.node_key
p = (
ProcessQuery().with_parent().query_first(local_client)
) # type: Optional[ProcessView]
assert p
p.get_parent()
if __name__ == "__main__":
main()