-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathetl.py
More file actions
178 lines (135 loc) · 5.33 KB
/
etl.py
File metadata and controls
178 lines (135 loc) · 5.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# -*- coding: utf-8 -*-
"""
This script runs the ETL process on an Oracle database using petl and cx_Oracle.
Requires 3 arguments: {1} source_file {2} mapping_file {3} Target_table
"""
import petl
import cx_Oracle
import sys
import os
import config as cf
from difflib import SequenceMatcher as sm
from datetime import datetime,date
def isOpen(conn_Object):
    """Report whether the given connection still answers a ping.

    Args:
        conn_Object: Any object exposing a ``ping()`` method.

    Returns:
        True when ``ping()`` completes and returns ``None``; False when the
        call raises (including when the object has no ``ping`` at all).
    """
    try:
        return conn_Object.ping() is None
    except Exception:
        # A failed ping means the connection is unusable. Only ordinary
        # errors are swallowed, so Ctrl-C / SystemExit still propagate.
        return False
def compare(src_record, last_row='()', threshold=0.75):
    """Fuzzy-compare a source record with the last row read from the DB.

    Both values are converted with ``str()``, stripped of their final
    character, lower-cased, and then scored with
    ``difflib.SequenceMatcher``. A fuzzy match is used so that an
    unformatted source record can still be recognised against the
    formatted row stored in the database.

    Args:
        src_record: Record taken from the source file.
        last_row: Last record found in the DB table.
        threshold: Similarity ratio that must be exceeded for the two
            values to count as the same record. Defaults to 0.75.

    Returns:
        True when the similarity ratio is strictly greater than
        ``threshold``, otherwise False.
    """
    src_text = str(src_record)[0:-1].lower()  # record from the source file
    db_text = str(last_row)[0:-1].lower()  # last record in the DB
    return sm(None, src_text, db_text).ratio() > threshold
def str_to_class(classname):
    """Look up ``classname`` as an attribute of the ``builtins`` module.

    Used to turn a datatype name given as text (e.g. ``'int'``) into the
    corresponding builtin object. Raises ``AttributeError`` when the
    builtins module has no attribute of that name.
    """
    builtins_module = sys.modules['builtins']
    return getattr(builtins_module, classname)
def d_type_change(field_name, d_type):
    """Register a datatype conversion for ``field_name`` on ``fin_table``.

    When ``d_type`` is ``'date'`` the converter formats the value with
    ``strftime("%d-%b-%Y")`` and upper-cases it (e.g. ``05-JAN-2024``).
    For any other ``d_type`` the builtin of that name, resolved through
    ``str_to_class``, is registered as the converter.

    Relies on the module-level ``fin_table`` being set before the call.
    """
    if d_type != 'date':
        fin_table[field_name] = str_to_class(d_type)
        return

    def _to_oracle_date(value):
        # Day, abbreviated month name and 4-digit year, upper-cased.
        return value.strftime("%d-%b-%Y").upper()

    fin_table[field_name] = _to_oracle_date
def case_ch(field_name, case):
    """Register ``case`` as the conversion for ``field_name`` on ``fin_table``.

    ``case`` is expected to be ``'upper'`` or ``'lower'``.
    NOTE(review): presumably petl treats a string converter as the name of
    a method to call on each value - confirm against the petl docs.

    Relies on the module-level ``fin_table`` being set before the call.
    """
    case_method = case
    fin_table[field_name] = case_method
def con_close():
    """Ask whether to commit or roll back, then close the DB connection.

    Keeps prompting until the answer is exactly ``'c'`` (commit) or
    ``'r'`` (rollback). Operates on the module-level ``conn``.
    """
    while True:
        des = input("Commit the transcation or rollback? [c/r]: ")
        if des == 'c':
            conn.commit()
            print("Transcation committed")
            break
        if des == 'r':
            conn.rollback()
            print("Transcation rolled back")
            break
        # Anything else: explain and ask again.
        print("Invalid option. Choose 'c' or 'r' ")

    # Both valid answers finish by closing the connection.
    conn.close()
    print("Connection closed")
# ETL driver: extract rows from an xlsx source (argv[1]), keep only the
# columns listed in the mapping csv (argv[2]), convert them as the mapping
# dictates and append the not-yet-loaded rows to the DB table (argv[3]).
#
# ``conn`` is bound before the try block so the cleanup in ``finally`` can
# tell "never connected" apart from "connected" without relying on NameError.
conn = None
try:
    # Creating Log dir (os.path.join keeps this portable; the previous
    # "\logs" literal was an invalid escape sequence and Windows-only).
    os.makedirs(os.path.join(os.getcwd(), "logs"), exist_ok=True)

    # Opening connection and the cursor for SQL execution
    print("Opening Connection...")
    conn = cx_Oracle.connect(cf.oracle["user"], cf.oracle["pass"], cf.oracle["host"])
    print("Connection established")
    cur = conn.cursor()

    # Extracting last_row from DB
    db_table = sys.argv[3].upper()
    # NOTE(review): the table name comes straight from the command line and
    # is interpolated into the SQL text (identifiers cannot be passed as bind
    # variables). Only run this with trusted arguments.
    cur.execute("select max(ROWNUM) from %s" % db_table)
    last_rnum = cur.fetchone()[0]
    cur.execute("select * from {0} where ROWID IN(select max(ROWID) from {0})".format(db_table))
    last_row = cur.fetchone()
    print("%d rows are present in the database table" % (last_rnum if last_rnum else 0))
    print("'%s' is the last row in the database table" % ((last_row,) if last_row else ''))

    # Extraction
    source = petl.fromxlsx(sys.argv[1])
    map_file = petl.fromcsv(sys.argv[2])
    # Read the mapping rows once instead of re-reading the csv for every
    # source column. Every row of the file is used, including the first.
    map_rows = list(map_file)

    # Transformation
    # Target header, built from the second column of the mapping file.
    hdr_map = [row[1].upper() for row in map_rows]
    # Drop every source column that the mapping file does not mention.
    mapped_columns = {row[0] for row in map_rows}
    for column in source[0]:
        if column not in mapped_columns:
            source = petl.cutout(source, column)

    # Walk the source backwards and collect records until one matches the
    # last row already in the DB; everything after that match is new.
    src_records = source.records().list()
    new_records = []
    for record in reversed(src_records):
        # An empty table has no last row, so there is nothing to match.
        if last_row is not None and compare(record, last_row):
            break
        new_records.append(record)
    new_records.reverse()  # restore the original source order

    source = petl.pushheader(new_records, hdr_map)  # adding header
    fin_table = petl.convert(source)

    # Transforming by mapping conditions: column 3 of the mapping file is
    # the datatype, column 5 (optional) the case conversion.
    for line in map_rows:
        field_name = line[1].upper()
        d_type_change(field_name, line[2])
        if line[4] != '':
            case_ch(field_name, line[4])

    # Loading
    print("Loading the data...")
    petl.appenddb(fin_table, cur, db_table, commit=False)
    print("Data Loaded...")

    # BatchError Handling
    batch_errors = cur.getbatcherrors()
    if batch_errors:
        trns_log = os.path.join("logs", "{}_trnslog.txt".format(date.today()))
        with open(trns_log, 'a+') as file:
            for batch_error in batch_errors:
                # NOTE(review): offset+2 presumably maps the batch offset to
                # a 1-based row number after the header row - confirm.
                error = "%s %s %d \n" % (
                    datetime.now().strftime("[%Y-%m-%d::%H:%M:%S]"),
                    batch_error.message,
                    batch_error.offset + 2,
                )
                file.write(error)
        print("Could not load some records. Logged at", file.name)

    # Let the operator commit or roll back, then close the connection.
    con_close()

# db_Error Handling
except cx_Oracle.Error as e:
    er, = e.args
    print("DB_Error Occured:", er.code, er.message)
    db_log = os.path.join("logs", "{}_dblog.txt".format(date.today()))
    with open(db_log, 'a+') as file:
        error = "%s %s \n" % (datetime.now().strftime("[%Y-%m-%d::%H:%M:%S]"), er.message)
        file.write(error)
    print("Error has been logged at", file.name)

# Other_Error Handling
except Exception as e:
    print("Error Occured:", e)
    print("Closing Connection...")

finally:
    # Best-effort cleanup: close the connection only if it was opened and
    # still answers a ping (con_close() may already have closed it).
    if conn is not None and isOpen(conn):
        try:
            conn.close()
        except cx_Oracle.Error:
            # Deliberately ignored: nothing more can be done at shutdown.
            pass