-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstreams.py
65 lines (54 loc) · 1.88 KB
/
streams.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
"""Stream type classes for tap-opensearch."""
from __future__ import annotations
from pathlib import Path
from singer_sdk import typing as th # JSON Schema typing helpers
from tap_opensearch.client import opensearchStream
# TODO: Delete this is if not using json files for schema definition
SCHEMAS_DIR = Path(__file__).parent / Path("./schemas")
# TODO: - Override `UsersStream` and `GroupsStream` with your own stream definition.
# - Copy-paste as many times as needed to create multiple stream types.
class UsersStream(opensearchStream):
"""Define custom stream."""
name = "users"
path = "/users"
primary_keys = ["id"]
replication_key = None
# Optionally, you may also use `schema_filepath` in place of `schema`:
# schema_filepath = SCHEMAS_DIR / "users.json" # noqa: ERA001
schema = th.PropertiesList(
th.Property("name", th.StringType),
th.Property(
"id",
th.StringType,
description="The user's system ID",
),
th.Property(
"age",
th.IntegerType,
description="The user's age in years",
),
th.Property(
"email",
th.StringType,
description="The user's email address",
),
th.Property("street", th.StringType),
th.Property("city", th.StringType),
th.Property(
"state",
th.StringType,
description="State name in ISO 3166-2 format",
),
th.Property("zip", th.StringType),
).to_dict()
class GroupsStream(opensearchStream):
"""Define custom stream."""
name = "groups"
path = "/groups"
primary_keys = ["id"]
replication_key = "modified"
schema = th.PropertiesList(
th.Property("name", th.StringType),
th.Property("id", th.StringType),
th.Property("modified", th.DateTimeType),
).to_dict()