-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy path01a_cb_set_get.py
More file actions
115 lines (94 loc) · 3.71 KB
/
01a_cb_set_get.py
File metadata and controls
115 lines (94 loc) · 3.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""
Upserts a document to the specified Couchbase collection and retrieves the document by its key.
The `upsert_document` function takes a key and a document dictionary as input, and upserts
the document to the specified Couchbase collection. It prints the CAS (Compare-And-Swap)
value of the upserted document and the time taken to perform the operation.
The `get_airline_by_key` function takes a key as input, and retrieves the document
from the specified Couchbase collection by its key. It prints the content of the
retrieved document and its CAS value, as well as the time taken to perform the operation.
The code also demonstrates how to connect to a Couchbase cluster, get a reference to a
bucket and collection, and how to cleanly close the connection to the cluster.
"""
from datetime import timedelta
import time
# needed for any cluster connection
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
# needed for options -- cluster, timeout, SQL++ (N1QL) query, etc.
from couchbase.options import ClusterOptions
# ---- User settings: edit these to match your cluster ----
# Defaults below target a local/self-hosted Couchbase Server.
ENDPOINT = "localhost"
USERNAME = "Administrator"
PASSWORD = "password"
# For Capella (cloud), uncomment and fill in these values instead:
# ENDPOINT = "cb.your-endpoint.cloud.couchbase.com"  # Your Capella hostname
# USERNAME = "your-capella-username"
# PASSWORD = "your-capella-password"
BUCKET_NAME = "travel-sample"
CB_SCOPE = "inventory"
CB_COLLECTION = "airline"
# ---- End of user settings ----

# Credentials used to authenticate against the cluster
auth = PasswordAuthenticator(USERNAME, PASSWORD)
# Cluster options carrying the authenticator
options = ClusterOptions(auth)
# Local/self-hosted Couchbase Server: connect with the couchbase:// scheme
cluster = Cluster(f"couchbase://{ENDPOINT}", options)
# For Capella (cloud), comment out the line above and uncomment these two:
# options.apply_profile("wan_development")  # Helps avoid latency issues with Capella
# cluster = Cluster(f"couchbases://{ENDPOINT}", options)  # Note: couchbaseS (secure)

# Wait until the cluster is ready for use (a 10-second timedelta is passed to the SDK).
cluster.wait_until_ready(timedelta(seconds=10))

# Handle to the bucket named by BUCKET_NAME
cb = cluster.bucket(BUCKET_NAME)
# To work with the bucket's default collection instead, use:
# cb_coll = cb.default_collection()
# Handle to the scoped collection that upsert_document / get_airline_by_key operate on
cb_coll = cb.scope(CB_SCOPE).collection(CB_COLLECTION)
# upsert document function
def upsert_document(key, doc):
    """Upsert ``doc`` under ``key`` in the module-level collection and report the outcome.

    Prints the CAS value of the upsert result on success, or the exception if
    the operation fails, and always prints how long the attempt took.

    Args:
        key: Document key passed to ``cb_coll.upsert``.
        doc: Document body passed to ``cb_coll.upsert``.
    """
    print("\nUpsert CAS: ")
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # (or go negative) if the system clock is adjusted mid-operation, which
    # can happen with time.time().
    start_time = time.perf_counter()
    try:
        # Example of upsert operation with TTL of 1 hour
        # result = cb_coll.upsert(key, doc, expiry=timedelta(hours=1))
        result = cb_coll.upsert(key, doc)
        print(result.cas)
    except Exception as e:
        # Best-effort demo: report the failure instead of aborting the script.
        print(e)
    finally:
        execution_time = time.perf_counter() - start_time
        print(f"Upsert operation took {execution_time:.6f} seconds")
# get document function
def get_airline_by_key(key):
    """Fetch the document stored under ``key`` from the module-level collection and print it.

    Prints the document content as a dict and its CAS value on success, or the
    exception if the lookup fails, and always prints how long the attempt took.

    Args:
        key: Document key passed to ``cb_coll.get``.
    """
    print("\nGet Result: ")
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # (or go negative) if the system clock is adjusted mid-operation, which
    # can happen with time.time().
    start_time = time.perf_counter()
    try:
        result = cb_coll.get(key)
        print(result.content_as[dict])
        print("CAS:", result.cas)
    except Exception as e:
        # Best-effort demo: report the failure instead of aborting the script.
        print(e)
    finally:
        execution_time = time.perf_counter() - start_time
        print(f"Get operation took {execution_time:.6f} seconds")
# Sample airline document and the key it is stored under
key = "airline_8091"
doc = {
    "type": "airline",
    "id": 8091,
    "callsign": "CBS",
    "iata": None,
    "icao": None,
    "name": "Couchbase Airways",
    # Wall-clock time at which this script built the document
    "timestamp": time.time(),
}

# Write the document, then read it back by the same key
upsert_document(key, doc)
get_airline_by_key(key)

# Cleanly close the connection to the Couchbase cluster
print("\nClosing connection to Couchbase cluster...")
cluster.close()
print("Connection closed successfully.")