-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathrun_sample.py
More file actions
64 lines (49 loc) · 1.69 KB
/
run_sample.py
File metadata and controls
64 lines (49 loc) · 1.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
"""
https://doc.shinnytech.com/tqsdk/latest/quickstart.html
"""
import datetime
import pandas as pd
from tqsdk import TqApi, TqAuth, TqAccount, TqKq
from credentials import *
from delegate.tq_delegate import TQDelegate, UpdateType
from tools.utils_basic import pd_show_all
IS_PROD = False
def main():
pd_show_all()
my_symbol = "SHFE.au2506" # 测试的品种
my_duration_sec = 10 # 每个K线的时间长度
my_klines_count = 50 # 订阅多少根K线
if IS_PROD:
my_account = TqAccount(qh_company, qh_username, qh_password) # 实盘账户
else:
my_account = TqKq() # 快期模拟账户
def init(df):
print(df.tail(2))
def run(
system_datetime: datetime.datetime,
kline_datetime: datetime.datetime,
kline_dataframe: pd.DataFrame,
update_type: int,
):
print(f'{system_datetime} {kline_datetime} ', end='')
if update_type == UpdateType.PrevKLine:
print('prev kline \n', kline_dataframe.tail(2))
elif update_type == UpdateType.NewKLine:
print('kline \n', kline_dataframe.tail(2))
elif update_type == UpdateType.NewPrice:
print('close ', kline_dataframe.tail(1).close.values[0])
elif update_type == UpdateType.NewVolume:
print('volume ', kline_dataframe.tail(1).volume.values[0])
delegate = TQDelegate(
tq_account=my_account,
tq_username=tq_username,
tq_password=tq_password,
strategy_init=init,
strategy_run=run,
symbol=my_symbol,
duration_seconds=my_duration_sec,
data_length=my_klines_count,
)
delegate.run()
if __name__ == '__main__':
main()