Real-Time DB Updates Example

This tutorial demonstrates how to use Macrometa GDN as a real-time database with local latencies across the globe.

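Prerequisites

You need a Macrometa account with an API key (or another supported credential) and a Python environment in which the c8 package imported in Step 1 is installed.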

Step-by-Step Instructions

This page guides you through creating a collection, subscribing to its changes, inserting documents to trigger notifications, and deleting the collection.

  1. Create a new JavaScript (.js) or Python (.py) file in your favorite IDE.
  2. Copy the code block below and paste it into your JavaScript or Python file.
  3. With each subsequent step, append the code block to the existing file and then run it.

If you want to skip the explanation and just run the code, then go directly to the Full Demo File.

Step 1. Connect to GDN

To update a database with Macrometa Global Data Network (GDN), you must first establish a connection to a local region.

When this code runs, it initializes the server connection to the specified region URL. For more information about connecting to GDN, refer to Authentication.

# Import libraries
from c8 import C8Client

# Define constants
URL = "play.paas.macrometa.io"
GEO_FABRIC = "_system"
API_KEY = "my API key" # Change this to your API key

print("--- Connecting to GDN")

# Choose one of the following methods to access the GDN. API key is recommended.

# Authenticate with API key
client = C8Client(protocol='https', host=URL, port=443, apikey=API_KEY, geofabric=GEO_FABRIC)

# Authenticate with JWT
# client = C8Client(protocol='https', host=URL, port=443, token=<your token>, geofabric=GEO_FABRIC)

# Authenticate with email and password
# client = C8Client(protocol='https', host=URL, port=443, email=<your email id>, password=<your password>, geofabric=GEO_FABRIC)
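
Hardcoding an API key in a script makes it easy to leak by accident. The following is a minimal sketch of one alternative, assuming you export the key in an environment variable. The variable name GDN_API_KEY and the helper load_api_key are hypothetical and are not part of the Macrometa SDK.

```python
import os


def load_api_key(env_var="GDN_API_KEY"):
    """Return the API key stored in the given environment variable.

    The variable name is an assumption made for this sketch; use
    whatever name your own environment defines.
    """
    api_key = os.environ.get(env_var, "").strip()
    if not api_key:
        raise RuntimeError(f"Set the {env_var} environment variable to your API key")
    return api_key
```

With this helper in place, you could write API_KEY = load_api_key() instead of assigning the key as a string literal, and pass API_KEY to C8Client exactly as in the code above.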

Step 2. Set Variables

Set the variables required to run the code.

# Import libraries
import threading
import pprint
import time

# Define constants
COLLECTION_NAME = "ddos"

# Variables
data = [
    {"ip": "10.1.1.1", "action": "block", "rule": "blocklistA"},
    {"ip": "20.1.1.2", "action": "block", "rule": "blocklistA"},
    {"ip": "30.1.1.3", "action": "block", "rule": "blocklistB"},
    {"ip": "40.1.1.4", "action": "block", "rule": "blocklistA"},
    {"ip": "50.1.1.5", "action": "block", "rule": "blocklistB"},
]

pp = pprint.PrettyPrinter(indent=4)
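
Each record in data is a dictionary with an ip, an action, and a rule. If you replace the sample records with your own, it can help to check them before inserting. The following is a minimal sketch using only the standard library. The helper validate_records and the REQUIRED_KEYS set are hypothetical names introduced for illustration, and the sample list deliberately contains two bad records.

```python
import ipaddress

# Keys every record in this tutorial's data list carries.
REQUIRED_KEYS = {"ip", "action", "rule"}


def validate_records(records):
    """Return a list of (index, problem) tuples; an empty list means no problems were found."""
    problems = []
    for index, record in enumerate(records):
        missing = REQUIRED_KEYS - record.keys()
        if missing:
            problems.append((index, f"missing keys: {sorted(missing)}"))
            continue
        try:
            # ip_address raises ValueError for strings that are not valid IPv4/IPv6 addresses.
            ipaddress.ip_address(record["ip"])
        except ValueError:
            problems.append((index, f"invalid IP address: {record['ip']!r}"))
    return problems


sample = [
    {"ip": "10.1.1.1", "action": "block", "rule": "blocklistA"},
    {"ip": "300.1.1.2", "action": "block", "rule": "blocklistA"},  # octet out of range
    {"ip": "30.1.1.3", "action": "block"},  # no "rule" key
]
print(validate_records(sample))
```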

Step 3. Add Code

Add code to perform the following actions:

  1. Create a document collection called ddos. If a collection by that name already exists, the existing collection is used instead.
  2. Subscribe to the collection, then add data to it. In this example, the data is a list of IP addresses to block.
  3. Delete the collection.

Note: An existing collection must have streams enabled.

if __name__ == '__main__':

    # Open connection to GDN. You will be routed to closest region.
    print(f"\n1. CONNECT: Server: {URL}")
    client = C8Client(protocol='https', host=URL, port=443, apikey=API_KEY, geofabric=GEO_FABRIC)

    # Create a collection if one does not exist
    print(f"\n2. CREATE_COLLECTION: Server: {URL}, Collection: {COLLECTION_NAME}")
    if client.has_collection(COLLECTION_NAME):
        collection = client.collection(COLLECTION_NAME)
    else:
        collection = client.create_collection(COLLECTION_NAME, stream=True)

    # Subscribe to receive real-time updates when changes are made to the collection.
    def create_callback():
        def callback_fn(event):
            pp.pprint(event)
            return

        client.on_change(COLLECTION_NAME, callback=callback_fn, timeout=15)

    print(f"\n3. SUBSCRIBE_COLLECTION: Server: {URL}, Collection: {COLLECTION_NAME}")
    rt_thread = threading.Thread(target=create_callback)
    rt_thread.start()
    time.sleep(10)
    print(f"Callback registered for collection: {COLLECTION_NAME}")

    # Insert documents into the collection to trigger a notification.
    print(f"\n4. INSERT_DOCUMENTS: Server: {URL}, Collection: {COLLECTION_NAME}")
    client.insert_document(COLLECTION_NAME, document=data)

    # Wait to close the callback.
    print("\n5. Waiting to close callback")
    rt_thread.join(2)

    # Delete collection.
    print(f"\n6. DELETE_COLLECTION: Server: {URL}, Collection: {COLLECTION_NAME}")
    client.delete_collection(COLLECTION_NAME)
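
The code above starts the listener in a background thread and waits before inserting, so that the subscription is in place when the documents arrive. The sketch below reproduces that ordering with the standard library only, so it runs without a GDN connection. FakeFeed and its methods are hypothetical stand-ins, not part of the Macrometa SDK, and a threading.Event replaces the fixed time.sleep(10) as the "listener is ready" signal.

```python
import queue
import threading


class FakeFeed:
    """Hypothetical in-memory stand-in for a collection's change feed."""

    def __init__(self):
        self._events = queue.Queue()
        self.ready = threading.Event()

    def publish(self, document):
        """Simulate a document insert by queueing a change event."""
        self._events.put(document)

    def on_change(self, callback, timeout):
        """Call callback for each event until none arrives for `timeout` seconds."""
        self.ready.set()  # signal that the listener is now consuming events
        while True:
            try:
                event = self._events.get(timeout=timeout)
            except queue.Empty:
                return
            callback(event)


feed = FakeFeed()
received = []

# Start the listener in a background thread, mirroring the tutorial's rt_thread.
listener = threading.Thread(target=feed.on_change, args=(received.append, 0.5))
listener.start()

# Block until the listener reports it is ready instead of sleeping a fixed time.
feed.ready.wait(timeout=5)

for doc in [{"ip": "10.1.1.1"}, {"ip": "20.1.1.2"}]:
    feed.publish(doc)

# The listener returns once it has been idle for its timeout, so join() completes.
listener.join()
print(received)
```

Unlike this queue-based fake, a live subscription may not replay events published before the listener connected, which is a likely reason the tutorial waits before inserting. Whether the SDK exposes a readiness signal is not covered here, so the fixed sleep in the tutorial code remains the documented approach.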

Full Demo File

from c8 import C8Client
import threading
import pprint
import time

URL = "play.paas.macrometa.io"
GEO_FABRIC = "_system"
API_KEY = "my API key" # Change this to your API key

COLLECTION_NAME = "ddos"

# Variables
data = [
    {"ip": "10.1.1.1", "action": "block", "rule": "blocklistA"},
    {"ip": "20.1.1.2", "action": "block", "rule": "blocklistA"},
    {"ip": "30.1.1.3", "action": "block", "rule": "blocklistB"},
    {"ip": "40.1.1.4", "action": "block", "rule": "blocklistA"},
    {"ip": "50.1.1.5", "action": "block", "rule": "blocklistB"},
]

pp = pprint.PrettyPrinter(indent=4)

if __name__ == '__main__':

    # Open connection to GDN. You will be routed to closest region.
    print(f"\n1. CONNECT: Server: {URL}")
    client = C8Client(protocol='https', host=URL, port=443, apikey=API_KEY, geofabric=GEO_FABRIC)

    # Create a collection if one does not exist
    print(f"\n2. CREATE_COLLECTION: Server: {URL}, Collection: {COLLECTION_NAME}")
    if client.has_collection(COLLECTION_NAME):
        collection = client.collection(COLLECTION_NAME)
    else:
        collection = client.create_collection(COLLECTION_NAME, stream=True)

    # Subscribe to receive real-time updates when changes are made to the collection.
    def create_callback():
        def callback_fn(event):
            pp.pprint(event)
            return

        client.on_change(COLLECTION_NAME, callback=callback_fn, timeout=15)

    print(f"\n3. SUBSCRIBE_COLLECTION: Server: {URL}, Collection: {COLLECTION_NAME}")
    rt_thread = threading.Thread(target=create_callback)
    rt_thread.start()
    time.sleep(10)
    print(f"Callback registered for collection: {COLLECTION_NAME}")

    # Insert documents into the collection to trigger a notification.
    print(f"\n4. INSERT_DOCUMENTS: Server: {URL}, Collection: {COLLECTION_NAME}")
    client.insert_document(COLLECTION_NAME, document=data)

    # Wait to close the callback.
    print("\n5. Waiting to close callback")
    rt_thread.join(2)

    # Delete collection.
    print(f"\n6. DELETE_COLLECTION: Server: {URL}, Collection: {COLLECTION_NAME}")
    client.delete_collection(COLLECTION_NAME)