sgnligo.sinks.kafka_sink

A sink element to send data to kafka topics.

KafkaSink dataclass

Bases: SinkElement

Send data to kafka topics or pretty print to stdout

Parameters:

Name                  Type                  Description                                                           Default
output_kafka_server   Optional[str]         The kafka server to write data to. If None, pretty print to stdout   None
time_series_topics    Optional[list[str]]   The kafka topics to write time-series data to                        None
trigger_topics        Optional[list[str]]   The kafka topics to write trigger data to                            None
tag                   Optional[list[str]]   The tags to write the kafka data with                                None
prefix                str                   The prefix of the kafka topic                                        ''
interval              Optional[float]       The interval at which to write the data to kafka                     None
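
A minimal construction sketch follows. The server address, topic names, and tags are placeholders, and the name/sink_pad_names keywords are assumed to come from the SinkElement base class in the sgn framework; adapt them to your pipeline.

from sgnligo.sinks.kafka_sink import KafkaSink

# Placeholder topics/tags; name and sink_pad_names are assumed SinkElement
# keywords from the sgn framework and may differ in your setup.
sink = KafkaSink(
    name="kafka_sink",
    sink_pad_names=("events",),
    output_kafka_server="localhost:9092",  # set to None to pretty print to stdout
    time_series_topics=["latency_history"],
    trigger_topics=["coinc"],
    tag=["H1L1"],
    prefix="gstlal.",
    interval=1.0,  # flush at most once per interval (units of now(), assumed seconds)
)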
Source code in sgnligo/sinks/kafka_sink.py
@dataclass
class KafkaSink(SinkElement):
    """Send data to kafka topics or pretty print to stdout

    Args:
        output_kafka_server:
            str or None, The kafka server to write data to. If None, pretty
            print to stdout
        time_series_topics:
            list[str], The kafka topics to write time-series data to
        trigger_topics:
            list[str], The kafka topics to write trigger data to
        tag:
            list[str] or None, The tags to write the kafka data with
        prefix:
            str, The prefix of the kafka topic
        interval:
            float or None, The interval at which to write the data to kafka
    """

    output_kafka_server: Optional[str] = None
    time_series_topics: Optional[list[str]] = None
    trigger_topics: Optional[list[str]] = None
    tag: Optional[list[str]] = None
    prefix: str = ""
    interval: Optional[float] = None

    def __post_init__(self):
        super().__post_init__()

        # Initialize client only if kafka server is provided
        # Handle both None and the string "None"
        if self.output_kafka_server is not None and self.output_kafka_server != "None":
            self.client = kafka.Client("kafka://{}".format(self.output_kafka_server))
        else:
            self.client = None

        if self.tag is None:
            self.tag = []

        if self.time_series_topics is not None:
            self.time_series_data = {}
            for topic in self.time_series_topics:
                self.time_series_data[topic] = {"time": [], "data": []}
        else:
            self.time_series_data = None

        if self.trigger_topics is not None:
            self.trigger_data = {}
            for topic in self.trigger_topics:
                self.trigger_data[topic] = []
        else:
            self.trigger_data = None

        self.last_sent = now()

    def _pretty_print(self, topic, data, data_type="time_series"):
        """Pretty print data to stdout in a formatted way."""
        output = {
            "topic": self.prefix + topic,
            "tags": self.tag,
            "data_type": data_type,
            "timestamp": now(),
            "data": data,
        }
        print(json.dumps(output, indent=2, cls=LIGOJSONEncoder))
        sys.stdout.flush()

    def write(self):
        if self.time_series_data is not None:
            for topic, data in self.time_series_data.items():
                if len(data["time"]) > 0:
                    if self.client is not None:
                        self.client.write(self.prefix + topic, data, tags=self.tag)
                    else:
                        self._pretty_print(topic, data, "time_series")
                    self.time_series_data[topic] = {"time": [], "data": []}

        if self.trigger_data is not None:
            for topic, data in self.trigger_data.items():
                if len(data) > 0:
                    if self.client is not None:
                        self.client.write(self.prefix + topic, data, tags=self.tag)
                    else:
                        self._pretty_print(topic, data, "trigger")
                    self.trigger_data[topic] = []

    def pull(self, pad, frame):
        """Incoming frames are expected to be an EventFrame containing {"kafka":
        EventBuffer}. The data in the EventBuffer are expected to be in the format of
        {topic: {"time": [t1, t2, ...], "data": [d1, d2, ...]}}
        """
        events = frame["kafka"].data
        if events is not None:
            for topic, data in events.items():
                if (
                    self.time_series_topics is not None
                    and topic in self.time_series_topics
                ):
                    self.time_series_data[topic]["time"].extend(data["time"])
                    self.time_series_data[topic]["data"].extend(data["data"])
                elif self.trigger_topics is not None and topic in self.trigger_topics:
                    self.trigger_data[topic].extend(data)

        if frame.EOS:
            self.mark_eos(pad)

    def internal(self):
        if self.interval is None:
            # Don't wait
            self.write()
        else:
            time_now = now()
            if time_now - self.last_sent > self.interval:
                self.write()
                self.last_sent = time_now

        if self.at_eos:
            print("shutdown: KafkaSink: close")
            if self.client is not None:
                self.client.close()
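
When output_kafka_server is None (or the literal string "None"), no Kafka client is created and write() falls back to _pretty_print, which dumps one JSON document per non-empty topic to stdout. Based on the keys assembled in _pretty_print, a time-series record would print roughly as follows (values are illustrative):

{
  "topic": "gstlal.latency_history",
  "tags": ["H1L1"],
  "data_type": "time_series",
  "timestamp": 1400000000.5,
  "data": {
    "time": [1399999999.0, 1399999999.5],
    "data": [3.2, 3.1]
  }
}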

pull(pad, frame)

Incoming frames are expected to be an EventFrame containing {"kafka": EventBuffer}. The data in the EventBuffer are expected to be in the format of {topic: {"time": [t1, t2, ...], "data": [d1, d2, ...]}}
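
For reference, an events dict matching that layout might look like the following (topic names and values are purely illustrative). Topics listed in time_series_topics carry the "time"/"data" mapping, while topics listed in trigger_topics carry a list of records that is extended as-is:

events = {
    # a topic listed in time_series_topics
    "latency_history": {
        "time": [1399999999.0, 1399999999.5],
        "data": [3.2, 3.1],
    },
    # a topic listed in trigger_topics
    "coinc": [
        {"time": 1399999999.2, "snr": 8.7},
    ],
}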

Source code in sgnligo/sinks/kafka_sink.py
def pull(self, pad, frame):
    """Incoming frames are expected to be an EventFrame containing {"kafka":
    EventBuffer}. The data in the EventBuffer are expected to be in the format of
    {topic: {"time": [t1, t2, ...], "data": [d1, d2, ...]}}
    """
    events = frame["kafka"].data
    if events is not None:
        for topic, data in events.items():
            if (
                self.time_series_topics is not None
                and topic in self.time_series_topics
            ):
                self.time_series_data[topic]["time"].extend(data["time"])
                self.time_series_data[topic]["data"].extend(data["data"])
            elif self.trigger_topics is not None and topic in self.trigger_topics:
                self.trigger_data[topic].extend(data)

    if frame.EOS:
        self.mark_eos(pad)

LIGOJSONEncoder

Bases: JSONEncoder

Custom JSON encoder that handles LIGO-specific types.

Source code in sgnligo/sinks/kafka_sink.py
class LIGOJSONEncoder(json.JSONEncoder):
    """Custom JSON encoder that handles LIGO-specific types."""

    def default(self, obj: Any) -> Any:
        if isinstance(obj, LIGOTimeGPS):
            # Convert LIGOTimeGPS to float (GPS seconds)
            return float(obj)
        # Let the base class handle other types
        return super().default(obj)
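
A quick usage sketch, assuming LIGOTimeGPS is the class from lal (the corresponding import in kafka_sink.py is not shown on this page):

import json

from lal import LIGOTimeGPS

from sgnligo.sinks.kafka_sink import LIGOJSONEncoder

# LIGOTimeGPS values are serialized as float GPS seconds by the encoder
payload = {"trigger_time": LIGOTimeGPS(1187008882, 425000000)}
print(json.dumps(payload, cls=LIGOJSONEncoder))
# {"trigger_time": 1187008882.425}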