-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathsqlite.py
192 lines (157 loc) · 5.3 KB
/
sqlite.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# http://inamidst.com/saxo/
# Created by Sean B. Palmer
import re
import sqlite3
class Table(object):
def __init__(self, connection, name):
self.connection = connection
self.name = name
def __iter__(self):
return self.rows()
def __delitem__(self, row):
fields = []
for field in self.schema():
fields.append(field[1])
if len(row) == len(fields):
query = "DELETE FROM %s WHERE " % self.name
query += " AND ".join(["%s=?" % field for field in fields])
cursor = self.connection.cursor()
cursor.execute(query, row)
self.connection.commit()
else:
raise ValueError("Wrong length: %s" % row)
def create(self, *schema):
cursor = self.connection.cursor()
types = {
None: "NULL",
int: "INTEGER",
float: "REAL",
str: "TEXT",
bytes: "BLOB"
}
schema = ", ".join(a + " " + types.get(b, b) for (a, b) in schema)
query = "CREATE TABLE IF NOT EXISTS %s (%s)" % (self.name, schema)
cursor.execute(query)
cursor.close()
def insert(self, row, *rows, commit=True):
cursor = self.connection.cursor()
size = len(row)
args = ",".join(["?"] * size)
query = "INSERT INTO %s VALUES(%s)" % (self.name, args)
cursor.execute(query, tuple(row))
for extra in rows:
cursor.execute(query, tuple(extra))
if commit:
self.connection.commit()
cursor.close()
def replace(self, row, *rows, commit=True):
# TODO: command="INSERT", then command="INSERT OR REPLACE"
cursor = self.connection.cursor()
size = len(row)
args = ",".join(["?"] * size)
query = "INSERT OR REPLACE INTO %s VALUES(%s)" % (self.name, args)
cursor.execute(query, tuple(row))
for extra in rows:
cursor.execute(query, tuple(extra))
if commit:
self.connection.commit()
cursor.close()
def rows(self, order=None):
cursor = self.connection.cursor()
query = "SELECT * FROM %s" % self.name
if isinstance(order, str):
if order.isalpha():
query += " ORDER BY %s" % order
cursor.execute(query)
while True:
result = cursor.fetchone()
if result is None:
break
yield result
cursor.close()
def schema(self):
cursor = self.connection.cursor()
query = "PRAGMA table_info(%s)" % self.name
cursor.execute(query)
while True:
result = cursor.fetchone()
if result is None:
break
yield result
cursor.close()
class Database(object):
def __init__(self, path):
self.path = path
self.connection = sqlite3.connect(path)
def regexp(pattern, text):
return re.search(pattern, text) is not None
self.connection.create_function("REGEXP", 2, regexp)
def __iter__(self):
raise NotImplemented
def __delitem__(self, key):
if key in self:
query = "DROP TABLE %s" % key
cursor = self.connection.cursor()
cursor.execute(query)
cursor.close()
def __contains__(self, key):
cursor = self.connection.cursor()
query = "SELECT name FROM sqlite_master WHERE type='table' AND name=?"
cursor.execute(query, (key,))
result = cursor.fetchone() is not None
cursor.close()
return result
def __getitem__(self, key):
return Table(self.connection, key)
def __enter__(self, *args, **kargs):
return self
def __exit__(self, *args, **kargs):
# TODO: Check for changes to commit?
# self.connection.commit()
self.connection.close()
def commit(self):
self.connection.commit()
def execute(self, text, *args):
cursor = self.connection.cursor()
if not args:
cursor.execute(text)
else:
cursor.execute(text, args)
return cursor
def query(self, text, *args):
cursor = self.execute(text, *args)
# Duplicate rows are sometimes given,
# even when sqlite3 was compiled thread-safe
previous = None
while True:
result = cursor.fetchone()
if result is None:
break
if result == previous:
continue
yield result
previous = result
cursor.close()
def test():
import os
filename = "/tmp/saxo-test.sqlite3"
if os.path.isfile(filename):
os.remove(filename)
with Database(filename) as db:
assert "example" not in db
db["example"].create(
("name", str),
("size", int))
assert "example" in db
db["example"].insert(
("pqr", 5),
("abc", 10))
print(list(db["example"]))
assert list(db["example"].rows(order="name")) == [('abc', 10), ('pqr', 5)]
assert list(db["example"].rows(order="size")) == [('pqr', 5), ('abc', 10)]
print(list(db["example"].schema()))
del db["example"][("pqr", 5)]
print(list(db["example"]))
os.remove(filename)
if __name__ == "__main__":
test()