-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstorage.py
More file actions
123 lines (103 loc) · 3.92 KB
/
storage.py
File metadata and controls
123 lines (103 loc) · 3.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import threading
import datetime
import time
import json
import os
from dotenv import load_dotenv
from segment import Segment
from WAL import WAL
load_dotenv()
class storage_engine:
def __init__(self):
self.inmemory_storage = {}
self.inmemory_lock = threading.RLock()
self.segment_files = []
self.segment_file_path = os.path.expanduser(os.getenv("SEGMENTS_PATH", "./segments"))
self.threshold = int(os.getenv("THRESHOLD", 100))
self.segment = Segment()
self.wal = WAL()
print("Replaying WAL...")
self.inmemory_storage = self.wal.replay()
print(f"WAL recovery complete. {len(self.inmemory_storage)} keys restored.")
def insert_inmemory(self, key, value):
self.check_capacity_inmemory_storage()
entry = {
"key": key,
"value": value,
"tombstone": False,
"ts": int(time.time() * 1000)
}
with self.inmemory_lock:
self.inmemory_storage[key] = entry
self.wal.append("PUT", key, value)
return "OK"
def delete_inmemory(self, key):
entry = {
"key": key,
"value": None,
"tombstone": True,
"ts": int(time.time() * 1000)
}
with self.inmemory_lock:
self.inmemory_storage[key] = entry
self.wal.append("DELETE", key)
return "DELETED"
def rotate_segment_file(self):
now = datetime.datetime.now()
res = int(now.timestamp() * 1000)
with self.inmemory_lock:
if not self.inmemory_storage:
return
segment_file_name = os.path.join(self.segment_file_path, f"segment_{res}.json")
self.segment_files.append(segment_file_name)
with open(segment_file_name, 'w') as f:
json.dump(self.inmemory_storage, f,indent=4)
self.wal.clear()
self.inmemory_storage = {}
def check_capacity_inmemory_storage(self):
if len(self.inmemory_storage)>self.threshold:
self.rotate_segment_file()
return "Overflow", True
return "No Overflow", True
def find_value(self, key):
with self.inmemory_lock:
if key in self.inmemory_storage:
entry = self.inmemory_storage[key]
if entry.get("tombstone", False):
return None
return entry.get("value")
entry = self.segment.search_in_json_segments(key)
if entry and not entry.get("tombstone", False):
return entry.get("value")
return None
def background_compaction(self):
while True:
self.segment.merge_segments()
time.sleep(3 * 60 * 60)
def cli(self):
print("🚀 Welcome to LSM Storage CLI (type 'help' for commands)")
while True:
cmd = input("lsm> ").strip().split()
if not cmd:
continue
if cmd[0].lower() == "put" and len(cmd) == 3:
print(self.insert_inmemory(cmd[1], cmd[2]))
elif cmd[0].lower() == "get" and len(cmd) == 2:
print(self.find_value(cmd[1]))
elif cmd[0].lower() == "remove" and len(cmd) == 2:
print(self.delete_inmemory(cmd[1]))
elif cmd[0].lower() == "exit":
print("👋 Exiting CLI...")
break
elif cmd[0].lower() == "help":
print("Commands:")
print(" put <key> <value> → Insert key/value")
print(" get <key> → Retrieve value")
print(" remove <key> → Remove value")
print(" exit → Quit CLI")
else:
print("❌ Invalid command. Type 'help' for usage.")
if __name__ == '__main__':
store = storage_engine()
threading.Thread(target=store.background_compaction, daemon=True).start()
store.cli()