-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathDataMining.py
More file actions
112 lines (101 loc) · 3.84 KB
/
DataMining.py
File metadata and controls
112 lines (101 loc) · 3.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# coding: UTF-8
import time
from sqlalchemy.sql.expression import func
from sqlalchemy import desc
from sqlalchemy.exc import OperationalError
import datetime
from requests.exceptions import ConnectionError
import pandas as pd
import traceback
from utils import BitFlyer as F
from database.TradeHistory import History, History5min, History1min, get_session
from database.db_utils import get_recent_hist_df
from utils.settings import logging_config, get_logger
from ui.notification import slack
# Module logger: a child of the project logger, named after this file's path.
logger = get_logger().getChild(__file__)
# Aggregated-bar model and its bar width in minutes; both are handed to
# BoardMiner._set_hist_n after every fetch.
use_data_type = History1min
use_interval = 1
class BoardMiner:
    """Continuously download execution history and store it in the database.

    Executions are requested in windows of ``_WINDOW`` ids starting after
    ``pre_hist_id``, saved as ``History`` rows, and then aggregated into
    fixed-interval bars by ``_set_hist_n``.

    Attributes:
        session: Database session returned by ``get_session()``.
        pre_hist_id: Highest execution id already processed (fetch cursor).
        last_req: ``time.time()`` of the last history request that returned
            a list.
        sleep_time: Minimum number of seconds between history requests.
    """

    # Width of the id window (and the ``count``) used for each history request.
    _WINDOW = 500

    def __init__(self):
        """Open a session and resume from the highest stored execution id."""
        super().__init__()
        self.session = get_session()
        # NOTE(review): MAX() over an empty History table yields None, and the
        # id arithmetic in _add_history will then fail -- the table appears to
        # need seeding before the first run; confirm.
        self.pre_hist_id, = self.session.query(
            func.max(History.id)
        ).first()
        self.last_req = 0
        self.sleep_time = 1

    def _add_history(self):
        """Fetch the next window of executions and persist it.

        Returns:
            bool: ``False`` when the window was empty and ``_check_latest``
            reports that the database is not far behind the exchange;
            ``True`` otherwise (including when the API answered with a
            non-list payload, after a 60 second pause).
        """
        payloads = {
            'after': self.pre_hist_id,
            'before': self.pre_hist_id + self._WINDOW,
            'count': self._WINDOW,
        }
        hist = F.api('history', payloads=payloads)
        if not isinstance(hist, list):
            # Anything but a list is treated as an API-side problem: back off.
            time.sleep(60)
            return True
        self.last_req = time.time()

        # Track the new cursor locally and publish it only after the commit
        # succeeds; otherwise a failed commit would silently skip these rows.
        next_id = self.pre_hist_id
        for h in hist:
            h['exec_date'] = F.str2date(h['exec_date'])
            self.session.add(History(**h))
            next_id = max(next_id, h['id'])

        if not hist:
            if not self._check_latest():
                return False
            # Empty window but the exchange is well ahead: skip over the gap.
            next_id += self._WINDOW - 1

        self._set_hist_n(use_data_type, use_interval)
        self.session.commit()
        self.pre_hist_id = next_id
        return True

    def _check_latest(self):
        """Report whether the exchange is at least one window ahead of the DB.

        When this returns ``True`` the caller moves ``pre_hist_id`` forward
        past the empty window.

        Returns:
            bool: ``True`` if the newest execution id reported by the API is
            at least ``_WINDOW`` greater than the id of the most recent
            stored ``History`` row; ``False`` otherwise, or when the API
            returned nothing usable.
        """
        time.sleep(1)
        hist = F.api('history', payloads={'count': 1})
        # Same guard as _add_history: a non-list payload must not be indexed.
        if not isinstance(hist, list) or not hist:
            return False
        latest_id = hist[0]['id']
        db_latest = self.session.query(History).order_by(
            desc(History.exec_date)
        ).first().id
        return db_latest + self._WINDOW <= latest_id

    def _set_hist_n(self, data_type, n):
        """Aggregate raw executions into ``n``-minute bars and append them.

        Nothing is written unless the newest bar in ``data_type`` is more
        than ``2 * n`` minutes older than the newest ``History`` row.

        Args:
            data_type: Mapped class of the bar table; its ``exec_date`` column
                and ``__tablename__`` are used.
            n: Bar width in minutes.
        """
        latest_n_data_time = self.session.query(data_type).order_by(
            desc(data_type.exec_date)
        ).first().exec_date
        latest_histdata_time = self.session.query(History).order_by(
            desc(History.exec_date)
        ).first().exec_date
        lag_limit = datetime.timedelta(minutes=n * 2)
        if latest_n_data_time >= latest_histdata_time - lag_limit:
            return

        logger.debug('set hist%d', n)
        df = get_recent_hist_df(
            latest_n_data_time + datetime.timedelta(minutes=n), self.session
        )
        df.exec_date = pd.to_datetime(df.exec_date)
        df = df.set_index('exec_date')
        rule = '%dMin' % n
        # Bars without trades carry the previous mean price and zero size.
        # .ffill() replaces the deprecated fillna(method='ffill').
        bench_price = df.price.resample(rule).mean().ffill()
        bench_size = df['size'].resample(rule).sum().fillna(0)
        dfb = pd.DataFrame([bench_price, bench_size]).T

        # Naive UTC "now", equivalent to the deprecated datetime.utcnow().
        now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        # Only bars starting at least n minutes ago are stored -- presumably
        # to avoid persisting a bar that is still filling; confirm.
        until = now_utc - datetime.timedelta(minutes=n)
        completed = dfb.loc[:until]
        logger.debug(completed)
        completed.to_sql(
            data_type.__tablename__,
            self.session.bind,
            chunksize=1000,
            if_exists='append',
        )

    def run(self):
        """Poll forever, pacing requests by ``sleep_time``.

        Connection and database operational errors are logged, the session
        is rolled back, and polling resumes after a three minute pause. Any
        other exception propagates to the caller.
        """
        while True:
            try:
                logger.info(self.pre_hist_id)
                if not self._add_history():
                    self.sleep_time = 10
                slp = max(0, self.sleep_time - (time.time() - self.last_req))
                time.sleep(slp)
            except (ConnectionError, OperationalError) as e:
                logger.error('Error: hist_id=%d', self.pre_hist_id)
                logger.exception(e)
                # Drop pending rows and reset the failed transaction so the
                # next iteration starts from a clean session.
                try:
                    self.session.rollback()
                except OperationalError as rollback_err:
                    logger.exception(rollback_err)
                self.sleep_time = 2
                time.sleep(60 * 3)
if __name__ == '__main__':
    # Script entry point: set up logging, then mine until something
    # unexpected escapes BoardMiner.run(); that failure is logged and
    # forwarded to Slack together with its traceback.
    logging_config()
    miner = BoardMiner()
    try:
        miner.run()
    except Exception as err:
        logger.exception(err)
        report = 'Uncaught error occurred : %s' % traceback.format_exc()
        slack(report)