Bases: TSSink
flowchart TD
sgnligo.sinks.influx_sink.InfluxSink[InfluxSink]
click sgnligo.sinks.influx_sink.InfluxSink href "" "sgnligo.sinks.influx_sink.InfluxSink"
Push data to influx
Source code in sgnligo/sinks/influx_sink.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82 | @dataclass
class InfluxSink(TSSink):
"""
Push data to influx
"""
instrument: Optional[str] = None
metadata_key: Optional[str] = None
scald_config: Optional[str] = None
route: Optional[str] = None
verbose: bool = False
wait_time: float = 2
def __post_init__(self):
super().__post_init__()
self.cnt = {p: 0 for p in self.sink_pads}
self.last_reduce_time = None
self.last_t0 = None
# set up aggregator sink
with open(self.scald_config, "r") as f:
agg_config = yaml.safe_load(f)
self.agg_sink = influx.Aggregator(**agg_config["backends"]["default"])
# register measurement schemas for aggregators
self.agg_sink.load(path=self.scald_config)
self.timedeq = deque(maxlen=100)
self.datadeq = {self.route: deque(maxlen=100)}
def pull(self, pad, bufs):
"""
getting the buffer on the pad just modifies the name to show this final
graph point and the prints it to prove it all works.
"""
# super().pull(pad, bufs)
# bufs = self.preparedframes[pad]
self.cnt[pad] += 1
if self.last_t0 is None:
self.last_t0 = bufs[0].t0
if self.metadata_key in bufs.metadata:
# FIXME: only works when data are integers?? if float, I get
# `urllib3 response status: 400 | response reason: Bad Request`
self.timedeq.append(int(bufs[0].t0 / 1_000_000_000))
self.datadeq[self.route].append(int(bufs.metadata[self.metadata_key]))
data = {
self.route: {
self.instrument: {
"time": list(self.timedeq),
"fields": {"data": list(self.datadeq[self.route])},
}
}
}
if bufs[0].t0 - self.last_t0 >= int(self.wait_time * 1_000_000_000):
self.last_t0 = round(int(bufs[0].t0), -2)
print("Writing out to influx")
self.agg_sink.store_columns(self.route, data[self.route], aggregate="max")
self.timedeq.clear()
self.datadeq[self.route].clear()
if bufs.EOS:
self.mark_eos(pad)
if self.verbose is True:
print(self.cnt[pad], bufs)
|
pull(pad, bufs)
getting the buffer on the pad just modifies the name to show this final
graph point and the prints it to prove it all works.
Source code in sgnligo/sinks/influx_sink.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82 | def pull(self, pad, bufs):
"""
getting the buffer on the pad just modifies the name to show this final
graph point and the prints it to prove it all works.
"""
# super().pull(pad, bufs)
# bufs = self.preparedframes[pad]
self.cnt[pad] += 1
if self.last_t0 is None:
self.last_t0 = bufs[0].t0
if self.metadata_key in bufs.metadata:
# FIXME: only works when data are integers?? if float, I get
# `urllib3 response status: 400 | response reason: Bad Request`
self.timedeq.append(int(bufs[0].t0 / 1_000_000_000))
self.datadeq[self.route].append(int(bufs.metadata[self.metadata_key]))
data = {
self.route: {
self.instrument: {
"time": list(self.timedeq),
"fields": {"data": list(self.datadeq[self.route])},
}
}
}
if bufs[0].t0 - self.last_t0 >= int(self.wait_time * 1_000_000_000):
self.last_t0 = round(int(bufs[0].t0), -2)
print("Writing out to influx")
self.agg_sink.store_columns(self.route, data[self.route], aggregate="max")
self.timedeq.clear()
self.datadeq[self.route].clear()
if bufs.EOS:
self.mark_eos(pad)
if self.verbose is True:
print(self.cnt[pad], bufs)
|