Skip to content

sgnligo.transforms.latency

An element to calculate latency of buffers.

Latency dataclass

Bases: TransformElement


              flowchart TD
              sgnligo.transforms.latency.Latency[Latency]

              

              click sgnligo.transforms.latency.Latency href "" "sgnligo.transforms.latency.Latency"
            

Calculate latency and prepare data into the format expected by the KafkaSink

Parameters:

Name Type Description Default
route Optional[str]

str, the kafka route to send the latency data to

None
interval Optional[float]

float, the interval to calculate latency, in seconds

None
Source code in sgnligo/transforms/latency.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
@dataclass
class Latency(TransformElement):
    """Calculate latency and prepare data into the format expected by the KafkaSink

    Args:
        route:
            str, the kafka route to send the latency data to
        interval:
            float, the interval to calculate latency, in seconds
    """

    route: Optional[str] = None
    interval: Optional[float] = None

    def __post_init__(self):
        super().__post_init__()
        assert len(self.sink_pads) == 1
        assert isinstance(self.route, str)
        self.frame = None

        if self.interval is not None:
            self.last_time = now()
            self.latencies = []

    def pull(self, pad, frame):
        self.frame = frame

    def new(self, pad):
        """Calculate buffer latency. Latency is defined as the current time subtracted
        by the buffer start time.
        """
        frame = self.frame
        assert isinstance(frame, (EventFrame, TSFrame))
        time = now().ns()
        latency = (time - frame.start) / 1_000_000_000

        if self.interval is None:
            event_data = {
                self.route: {
                    "time": [
                        frame.start / 1_000_000_000,
                    ],
                    "data": [
                        latency,
                    ],
                }
            }
        else:
            self.latencies.append(latency)
            if time / 1e9 - self.last_time >= self.interval:
                event_data = {
                    self.route: {
                        "time": [
                            frame.start / 1_000_000_000,
                        ],
                        "data": [
                            max(self.latencies),
                        ],
                    }
                }
                self.latencies = []
                self.last_time = time / 1e9
            else:
                event_data = {}

        event_buffer = EventBuffer.from_span(frame.start, frame.end, [event_data])
        return EventFrame(
            data=[event_buffer],
            EOS=frame.EOS,
        )

new(pad)

Calculate buffer latency. Latency is defined as the current time subtracted by the buffer start time.

Source code in sgnligo/transforms/latency.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def new(self, pad):
    """Calculate buffer latency. Latency is defined as the current time subtracted
    by the buffer start time.
    """
    frame = self.frame
    assert isinstance(frame, (EventFrame, TSFrame))
    time = now().ns()
    latency = (time - frame.start) / 1_000_000_000

    if self.interval is None:
        event_data = {
            self.route: {
                "time": [
                    frame.start / 1_000_000_000,
                ],
                "data": [
                    latency,
                ],
            }
        }
    else:
        self.latencies.append(latency)
        if time / 1e9 - self.last_time >= self.interval:
            event_data = {
                self.route: {
                    "time": [
                        frame.start / 1_000_000_000,
                    ],
                    "data": [
                        max(self.latencies),
                    ],
                }
            }
            self.latencies = []
            self.last_time = time / 1e9
        else:
            event_data = {}

    event_buffer = EventBuffer.from_span(frame.start, frame.end, [event_data])
    return EventFrame(
        data=[event_buffer],
        EOS=frame.EOS,
    )