sgnligo.sinks.kafka_sink
A sink element that sends data to Kafka topics.
KafkaSink
dataclass
Bases: SinkElement
Send data to Kafka topics, or pretty-print it to stdout.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_kafka_server | Optional[str] | str or None, The Kafka server to write data to. If None, pretty print to stdout | None |
time_series_topics | Optional[list[str]] | list[str], The Kafka topics to write time-series data to | None |
trigger_topics | Optional[list[str]] | list[str], The Kafka topics to write trigger data to | None |
tag | Optional[list[str]] | str, The tag to write the Kafka data with | None |
prefix | str | str, The prefix of the Kafka topic | '' |
interval | Optional[float] | int, The interval at which to write the data to Kafka | None |
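
As a rough illustration of how the parameters in the table fit together, the sketch below mirrors them in a stand-in dataclass. `KafkaSinkOptions` and `describe_destination` are hypothetical names made up for this example and are not part of sgnligo; the real `KafkaSink` is a `SinkElement` and may require further arguments that this page does not list. The server address and topic name are likewise invented. The only behaviour taken from the documentation is that leaving `output_kafka_server` as None means pretty-printing to stdout.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class KafkaSinkOptions:
    """Hypothetical stand-in mirroring the documented KafkaSink parameters."""

    output_kafka_server: Optional[str] = None
    time_series_topics: Optional[list[str]] = None
    trigger_topics: Optional[list[str]] = None
    tag: Optional[list[str]] = None
    prefix: str = ""
    interval: Optional[float] = None


def describe_destination(options: KafkaSinkOptions) -> str:
    """Report where data would go, following the documented None-means-stdout rule."""
    if options.output_kafka_server is None:
        return "stdout"
    return options.output_kafka_server


# Defaults only: no server is configured, so output would go to stdout.
stdout_options = KafkaSinkOptions()

# Invented server address and topic name, for illustration only.
kafka_options = KafkaSinkOptions(
    output_kafka_server="localhost:9092",
    time_series_topics=["snr_history"],
)
```

Calling `describe_destination(stdout_options)` returns `"stdout"`, while `describe_destination(kafka_options)` returns the configured server string.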
Source code in sgnligo/sinks/kafka_sink.py
pull(pad, frame)
Incoming frames are expected to be an EventFrame containing {"kafka": EventBuffer}. The data in the EventBuffer are expected to be in the format {topic: {"time": [t1, t2, ...], "data": [d1, d2, ...]}}.
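
To make the expected layout concrete, here is a minimal sketch that builds a payload of that shape from plain dictionaries and checks that each topic's `time` and `data` lists line up. The topic names and values are invented, and `validate_payload` is a hypothetical helper rather than an sgnligo function. Constructing the actual `EventFrame` and `EventBuffer` objects is left out because their constructors are not described here.

```python
import json


def validate_payload(payload: dict) -> None:
    """Raise ValueError unless payload looks like {topic: {"time": [...], "data": [...]}}."""
    for topic, series in payload.items():
        if "time" not in series or "data" not in series:
            raise ValueError(f"{topic}: missing 'time' or 'data'")
        if len(series["time"]) != len(series["data"]):
            raise ValueError(f"{topic}: 'time' and 'data' differ in length")


# Invented example values; real topics and samples come from the pipeline.
payload = {
    "snr_history": {"time": [1000000000.0, 1000000001.0], "data": [8.1, 8.4]},
    "latency_history": {"time": [1000000000.0], "data": [0.25]},
}

validate_payload(payload)

# Pretty-print the payload, similar in spirit to the stdout mode of KafkaSink.
print(json.dumps(payload, indent=2))
```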
Source code in sgnligo/sinks/kafka_sink.py
LIGOJSONEncoder
Bases: JSONEncoder
Custom JSON encoder that handles LIGO-specific types.
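
A common way to build such an encoder is to subclass `json.JSONEncoder` and override `default`, which the standard library calls for any object it cannot serialize on its own. The sketch below shows that pattern. It is not the sgnligo implementation: which types `LIGOJSONEncoder` handles is not stated here, so `GPSTimeStandIn` and `SketchEncoder` are hypothetical names used only for illustration.

```python
import json
from dataclasses import dataclass


@dataclass
class GPSTimeStandIn:
    """Hypothetical time type split into integer seconds and nanoseconds."""

    seconds: int
    nanoseconds: int


class SketchEncoder(json.JSONEncoder):
    """Illustrative encoder; not the actual LIGOJSONEncoder."""

    def default(self, o):
        if isinstance(o, GPSTimeStandIn):
            # Assumption: emit the time as a single float number of seconds.
            return o.seconds + o.nanoseconds / 1e9
        # Defer to the base class, which raises TypeError for unknown types.
        return super().default(o)


# Pass the encoder class through the cls argument of json.dumps.
encoded = json.dumps(
    {"time": GPSTimeStandIn(1000000000, 500000000)}, cls=SketchEncoder
)
```

Falling through to `super().default(o)` keeps the standard behaviour for anything the encoder does not recognize, so unsupported objects still fail loudly with a `TypeError` instead of being silently dropped.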
Source code in sgnligo/sinks/kafka_sink.py