-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathtest_elastic_adapter.py
53 lines (46 loc) · 1.48 KB
/
test_elastic_adapter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import json
import pytest
from flow.record import RecordDescriptor
from flow.record.adapter.elastic import ElasticWriter
# Minimal record type used as the fixture for the writer tests in this module:
# a "my/record" descriptor carrying two string fields.
MyRecord = RecordDescriptor(
    "my/record",
    [("string", field_name) for field_name in ("field_one", "field_two")],
)
@pytest.mark.parametrize(
    "record",
    [
        MyRecord("first", "record"),
        MyRecord("second", "record"),
    ],
)
def test_elastic_writer_metadata(record):
    """Verify that ``_meta_*`` writer options surface as record metadata.

    The writer is created with ``_meta_foo`` and ``_meta_bar`` options. The
    test expects them to be exposed as ``foo`` and ``bar`` on
    ``writer.metadata_fields`` and to appear under ``_record_metadata`` in the
    JSON ``_source`` of the document built for ``record``.
    """
    meta_options = {
        "_meta_foo": "some value",
        "_meta_bar": "another value",
    }

    with ElasticWriter(uri="elasticsearch:9200", **meta_options) as writer:
        assert writer.metadata_fields == {"foo": "some value", "bar": "another value"}

        # Build the document before the expectation so the writer is invoked
        # first, exactly as in a single `actual == expected` comparison.
        document = writer.record_to_document(record, "some-index")

        # Key order matters: the comparison below is on the serialized string.
        expected_body = {
            "field_one": record.field_one,
            "field_two": record.field_two,
        }
        expected_body["_record_metadata"] = {
            "descriptor": {
                "name": "my/record",
                "hash": record._desc.descriptor_hash,
            },
            "source": None,
            "classification": None,
            "generated": record._generated.isoformat(),
            "version": 1,
            "foo": "some value",
            "bar": "another value",
        }
        expected_document = {
            "_index": "some-index",
            "_source": json.dumps(expected_body),
        }

        assert document == expected_document