forked from turbolytics/sql-flow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_sql.py
127 lines (111 loc) · 3.89 KB
/
test_sql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import unittest
import os
from sqlflow import InferredDiskBatch, settings
from sqlflow.config import Conf, Pipeline
from sqlflow.handlers import InferredMemBatch
from sqlflow.serde import JSON
class InferredMemBatchTestCase(unittest.TestCase):
    """End-to-end tests for InferredMemBatch using the flat.json fixture."""

    def _invoke(self, sql):
        """Build an initialized InferredMemBatch for *sql*, feed it every line
        of the flat.json fixture, and return the results as a list of JSON
        strings."""
        handler = InferredMemBatch(
            conf=Conf(
                kafka=None,
                sql_results_cache_dir=settings.SQL_RESULTS_CACHE_DIR,
                pipeline=Pipeline(
                    type=None,
                    input=None,
                    sql=sql,
                    output=None,
                ),
            ),
            deserializer=JSON(),
        ).init()
        fixture = os.path.join(os.path.dirname(__file__), 'fixtures', 'flat.json')
        with open(fixture) as fh:
            for record in fh:
                handler.write(record)
        return list(handler.invoke())

    def test_flat(self):
        # A whole-batch aggregate produces a single JSON row.
        self.assertEqual(
            ['{"num_rows": 3}'],
            self._invoke("SELECT COUNT(*) as num_rows FROM batch"),
        )

    def test_inferred_batch_nested_return(self):
        # Struct literals and row() expressions come back as nested JSON
        # objects; row() fields currently serialize under empty-string keys.
        sql = """
            SELECT
                {'something': city} as s1,
                row(city, 1, 2) as nested_json
            FROM batch
        """
        self.assertEqual(
            [
                '{"s1": {"something": "New York"}, "nested_json": {"": 2}}',
                '{"s1": {"something": "New York"}, "nested_json": {"": 2}}',
                '{"s1": {"something": "Baltimore"}, "nested_json": {"": 2}}',
            ],
            self._invoke(sql),
        )
class InferredDiskBatchTestCase(unittest.TestCase):
    """End-to-end tests for InferredDiskBatch using the flat.json fixture."""

    def _invoke(self, sql):
        """Build an initialized InferredDiskBatch for *sql*, feed it every
        line of the flat.json fixture, and return the results as a list of
        JSON strings."""
        handler = InferredDiskBatch(
            conf=Conf(
                kafka=None,
                sql_results_cache_dir=settings.SQL_RESULTS_CACHE_DIR,
                pipeline=Pipeline(
                    type=None,
                    input=None,
                    sql=sql,
                    output=None,
                ),
            ),
        ).init()
        fixture = os.path.join(os.path.dirname(__file__), 'fixtures', 'flat.json')
        with open(fixture) as fh:
            for record in fh:
                handler.write(record)
        return list(handler.invoke())

    def test_inferred_batch_flat(self):
        # Disk-batch output is compact JSON (no space after the colon),
        # unlike the in-memory handler.
        self.assertEqual(
            ['{"num_rows":3}'],
            self._invoke("SELECT COUNT(*) as num_rows FROM batch"),
        )

    def test_inferred_batch_nested_return(self):
        sql = """
            SELECT
                {'something': city} as s1,
                row(city, 1, 2) as nested_json
            FROM batch
        """
        # TODO! Figure out this json with multiple "" keys
        self.assertEqual(
            [
                '{"s1":{"something":"New York"},"nested_json":{"":"New York","":1,"":2}}',
                '{"s1":{"something":"New York"},"nested_json":{"":"New York","":1,"":2}}',
                '{"s1":{"something":"Baltimore"},"nested_json":{"":"Baltimore","":1,"":2}}',
            ],
            self._invoke(sql),
        )