-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathviews_api.py
More file actions
135 lines (116 loc) · 4.48 KB
/
views_api.py
File metadata and controls
135 lines (116 loc) · 4.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
from http import HTTPStatus
from fastapi import APIRouter, Depends, HTTPException
from lnbits.core.models import User
from lnbits.core.services import websocket_updater
from lnbits.decorators import check_user_exists
from .crud import (
create_bitcoinswitch,
delete_bitcoinswitch,
get_bitcoinswitch,
get_bitcoinswitches,
update_bitcoinswitch,
)
from .models import Bitcoinswitch, BitcoinswitchPublic, CreateBitcoinswitch
bitcoinswitch_api_router = APIRouter(prefix="/api/v1")
@bitcoinswitch_api_router.post("")
async def api_bitcoinswitch_create(
data: CreateBitcoinswitch, user: User = Depends(check_user_exists)
) -> Bitcoinswitch:
if data.wallet not in user.wallet_ids:
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN,
detail=(
"You do not have permission to create a bitcoinswitch for this wallet."
),
)
return await create_bitcoinswitch(data)
@bitcoinswitch_api_router.put("/trigger/{switch_id}/{pin}")
async def api_bitcoinswitch_trigger(
switch_id: str,
pin: int,
user: User = Depends(check_user_exists),
) -> None:
switch = await get_bitcoinswitch(switch_id)
if not switch:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="Bitcoinswitch does not exist."
)
_switch = next((s for s in switch.switches if s.pin == pin), None)
if not _switch:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND,
detail="Switch with this pin does not exist.",
)
if switch.wallet not in user.wallet_ids:
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN,
detail="You do not have permission to trigger this switch.",
)
await websocket_updater(switch.id, f"{pin}-{_switch.duration}")
@bitcoinswitch_api_router.put("/{bitcoinswitch_id}")
async def api_bitcoinswitch_update(
data: CreateBitcoinswitch,
bitcoinswitch_id: str,
user: User = Depends(check_user_exists),
) -> Bitcoinswitch:
bitcoinswitch = await get_bitcoinswitch(bitcoinswitch_id)
if not bitcoinswitch:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="bitcoinswitch does not exist"
)
if data.wallet not in user.wallet_ids:
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN,
detail="You do not have permission to update this bitcoinswitch.",
)
for k, v in data.dict().items():
if v is not None:
setattr(bitcoinswitch, k, v)
bitcoinswitch.switches = data.switches
return await update_bitcoinswitch(bitcoinswitch)
@bitcoinswitch_api_router.get(
"/public/{bitcoinswitch_id}", response_model=BitcoinswitchPublic
)
async def api_bitcoinswitch_get_public(bitcoinswitch_id: str) -> Bitcoinswitch:
bitcoinswitch = await get_bitcoinswitch(bitcoinswitch_id)
if not bitcoinswitch:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="Bitcoinswitch does not exist."
)
return bitcoinswitch
@bitcoinswitch_api_router.get("")
async def api_bitcoinswitchs_retrieve(
user: User = Depends(check_user_exists),
) -> list[Bitcoinswitch]:
return await get_bitcoinswitches(user.wallet_ids)
@bitcoinswitch_api_router.get("/{bitcoinswitch_id}")
async def api_bitcoinswitch_retrieve(
bitcoinswitch_id: str, user: User = Depends(check_user_exists)
) -> Bitcoinswitch:
bitcoinswitch = await get_bitcoinswitch(bitcoinswitch_id)
if not bitcoinswitch:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="Bitcoinswitch does not exist"
)
if bitcoinswitch.wallet not in user.wallet_ids:
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN,
detail="You do not have permission to access this bitcoinswitch.",
)
return bitcoinswitch
@bitcoinswitch_api_router.delete("/{bitcoinswitch_id}")
async def api_bitcoinswitch_delete(
bitcoinswitch_id: str,
user: User = Depends(check_user_exists),
) -> None:
bitcoinswitch = await get_bitcoinswitch(bitcoinswitch_id)
if not bitcoinswitch:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="Bitcoinswitch does not exist."
)
if bitcoinswitch.wallet not in user.wallet_ids:
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN,
detail="You do not have permission to delete this bitcoinswitch.",
)
await delete_bitcoinswitch(bitcoinswitch_id)