-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy path_utils.py
44 lines (34 loc) · 1.05 KB
/
_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from __future__ import annotations
import datetime
from typing import TYPE_CHECKING
from flow.record import RecordDescriptor
if TYPE_CHECKING:
from collections.abc import Iterator
from flow.record.base import Record
def generate_records(count: int = 100) -> Iterator[Record]:
    """Yield ``count`` ``test/adapter`` records that each nest another record.

    Each yielded record stores its zero-based position in ``number`` and a
    newly created ``test/embedded_record`` in ``record``. The embedded record
    is built from a single value: the timezone-aware current UTC time, read at
    the moment that particular record is produced.

    Both descriptors are created inside the generator body, so they are only
    constructed once iteration actually starts.

    Args:
        count: How many records to produce. Defaults to 100.

    Yields:
        One ``test/adapter`` record for every index in ``range(count)``.
    """
    embedded_fields = [("datetime", "dt")]
    embedded_descriptor = RecordDescriptor("test/embedded_record", embedded_fields)

    outer_fields = [
        ("uint32", "number"),
        ("record", "record"),
    ]
    outer_descriptor = RecordDescriptor("test/adapter", outer_fields)

    utc = datetime.timezone.utc
    for index in range(count):
        # Take a fresh timestamp per record rather than sharing one value.
        timestamp = datetime.datetime.now(utc)
        inner = embedded_descriptor(timestamp)
        yield outer_descriptor(number=index, record=inner)
def generate_plain_records(count: int = 100) -> Iterator[Record]:
    """Yield ``count`` flat ``test/adapter/plain`` records.

    Each yielded record stores its zero-based position in ``number`` and the
    timezone-aware current UTC time in ``dt``, read at the moment that
    particular record is produced.

    The descriptor is created inside the generator body, so it is only
    constructed once iteration actually starts.

    Args:
        count: How many records to produce. Defaults to 100.

    Yields:
        One ``test/adapter/plain`` record for every index in ``range(count)``.
    """
    plain_fields = [
        ("uint32", "number"),
        ("datetime", "dt"),
    ]
    plain_descriptor = RecordDescriptor("test/adapter/plain", plain_fields)

    utc = datetime.timezone.utc
    for index in range(count):
        # Take a fresh timestamp per record rather than sharing one value.
        timestamp = datetime.datetime.now(utc)
        yield plain_descriptor(number=index, dt=timestamp)