-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdask_db.py
103 lines (85 loc) · 3.12 KB
/
dask_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
from dask.distributed import LocalCluster, Client, get_worker, Lock # Use Dask's Lock
import logging
import os
import random
import sqlite3
import json
# Configure logging: INFO and above are appended to dask_master.log in the
# current working directory (workers and scheduler share this process tree
# under LocalCluster, so a single file is sufficient).
logging.basicConfig(filename='dask_master.log', level=logging.INFO)
# Database setup
# Path of the SQLite file all workers write point clouds into; writes are
# serialized via a distributed lock (see generate_and_upload_pointcloud).
DB_PATH = "pointclouds.db"
def setup_database(db_path=None):
    """Create the SQLite schema used to store worker point clouds.

    Parameters
    ----------
    db_path : str, optional
        Path of the SQLite database file. Defaults to the module-level
        ``DB_PATH``, so existing callers (``setup_database()``) behave
        exactly as before.

    Notes
    -----
    The table is created with ``IF NOT EXISTS`` so repeated calls are
    idempotent. The connection is closed even if schema creation raises
    (the original version leaked the connection on failure).
    """
    if db_path is None:
        db_path = DB_PATH
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS pointclouds (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                worker_id TEXT,
                worker_ip TEXT,
                worker_name TEXT,
                pointcloud TEXT
            )
        """)
        conn.commit()
    finally:
        # Always release the handle, even when CREATE TABLE fails.
        conn.close()
def generate_and_upload_pointcloud():
    """Task run on each Dask worker: build a random point cloud and insert it.

    Generates 100 random 3-D points, then inserts one row into the
    ``pointclouds`` table while holding a cluster-wide distributed lock so
    concurrent workers never write to the SQLite file simultaneously.

    Returns
    -------
    str
        A human-readable status string. Errors are caught and returned as
        strings too, so the caller's ``client.gather`` never re-raises.
    """
    try:
        # Get worker information.
        worker = get_worker()
        # worker.worker_address looks like "tcp://127.0.0.1:36789".
        # The original split on every ":" which put the protocol ("tcp")
        # into worker_id and "//127.0.0.1" into worker_ip; parse properly.
        host_and_port = worker.worker_address.split("://", 1)[-1]
        worker_ip = host_and_port.rsplit(":", 1)[0]
        worker_id = worker.id  # dask's unique id for this worker
        worker_name = worker.name
        # Generate a random point cloud: 100 points, each [x, y, z] in [0, 100).
        pointcloud = [[random.uniform(0, 100) for _ in range(3)] for _ in range(100)]
        logging.info(f"Worker {worker_name} generated a point cloud.")
        # A single named distributed lock serializes all database writes
        # across the cluster (SQLite allows only one writer at a time).
        lock = Lock("db_lock")
        with lock:
            conn = sqlite3.connect(DB_PATH)
            try:
                conn.execute("""
                    INSERT INTO pointclouds (worker_id, worker_ip, worker_name, pointcloud)
                    VALUES (?, ?, ?, ?)
                """, (worker_id, worker_ip, worker_name, json.dumps(pointcloud)))
                conn.commit()
            finally:
                # Close even if the INSERT fails, so the lock is never
                # released with a dangling connection behind it (the
                # original leaked the connection on insert errors).
                conn.close()
        logging.info(f"Worker {worker_name} uploaded the point cloud to the database.")
        return f"Worker {worker_name} completed its task."
    except Exception as e:
        logging.error(f"Error in worker task: {e}")
        return f"Error in worker task: {e}"
def main():
    """Entry point: start a local Dask cluster, fan one point-cloud task out
    to every worker, then gather and log the resulting status strings."""
    # Make sure the SQLite schema exists before any worker tries to insert.
    setup_database()

    # Spin up a local scheduler (master node) plus 5 worker processes.
    cluster = LocalCluster(n_workers=5)
    client = Client(cluster)

    # Report where the scheduler and each worker are listening.
    print("Master Node (Dask Server) address:", cluster.scheduler_address)
    workers_info = client.scheduler_info()['workers']
    print("\nWorker Nodes addresses:")
    for address in workers_info:
        print(f"- {address}")

    # Pin exactly one task to each worker via its address.
    futures = []
    for address in list(workers_info.keys()):
        print(f'worker_addr:{address}')
        task = client.submit(generate_and_upload_pointcloud, workers=address)
        print(f'future:{task}')
        futures.append(task)

    # Block until all tasks finish, then log and echo each status string.
    for message in client.gather(futures):
        logging.info(message)
        print(message)

    # Tear the cluster down so the process exits cleanly.
    client.close()
    cluster.close()
# Standard script guard: run main() only when executed directly, never on import.
if __name__ == "__main__":
    main()