Skip to content

DataSource V2 API Reference

The datasource_v2 package provides a composable, dataclass-based API for creating gravitational wave data sources.

Main Entry Points

DataSource (Dispatcher)

sgnligo.sources.datasource_v2.DataSource

Unified data source that dispatches to the appropriate source class.

This is the main entry point for CLI-based pipelines. It accepts a data_source type string and forwards all other parameters to the appropriate source class.

For programmatic use, you can also instantiate source classes directly (e.g., WhiteSource, GWDataNoiseComposedSource).

Parameters:

Name Type Description Default
data_source str

Type of source ("white", "gwdata-noise", etc.)

required
name str

Name for the composed element

'datasource'
**kwargs Any

Parameters forwarded to the specific source class

{}
Example

Direct instantiation

source = DataSource(
    data_source="white",
    name="test",
    ifos=["H1"],
    sample_rate=4096,
    t0=1000,
    end=1010,
)

From CLI

source = DataSource.from_argv(name="data_source")

Use in pipeline

pipeline.connect(source.element, sink)

Source code in sgnligo/sources/datasource_v2/datasource.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
class DataSource:
    """Unified data source that dispatches to the appropriate source class.

    This is the main entry point for CLI-based pipelines. It accepts a
    data_source type string and forwards all other parameters to the
    appropriate source class.

    The selected source instance is stored privately and every public
    attribute that is not defined on this class is looked up on it (see
    ``__getattr__``).

    For programmatic use, you can also instantiate source classes directly
    (e.g., WhiteSource, GWDataNoiseComposedSource).

    Args:
        data_source: Type of source ("white", "gwdata-noise", etc.)
        name: Name for the composed element
        **kwargs: Parameters forwarded to the specific source class

    Example:
        >>> # Direct instantiation
        >>> source = DataSource(
        ...     data_source="white",
        ...     name="test",
        ...     ifos=["H1"],
        ...     sample_rate=4096,
        ...     t0=1000,
        ...     end=1010,
        ... )
        >>>
        >>> # From CLI
        >>> source = DataSource.from_argv(name="data_source")
        >>>
        >>> # Use in pipeline
        >>> pipeline.connect(source.element, sink)
    """

    # Class metadata
    source_type: ClassVar[str] = "datasource"
    description: ClassVar[str] = "Unified data source dispatcher"

    def __init__(
        self,
        data_source: str,
        name: str = "datasource",
        **kwargs: Any,
    ) -> None:
        """Initialize DataSource.

        Args:
            data_source: Type of source ("white", "gwdata-noise", etc.)
            name: Name for the composed element
            **kwargs: Parameters forwarded to the specific source class

        Raises:
            ValueError: If ``data_source`` is not a registered source type
                (raised by ``get_composed_source_class``).
        """
        self.data_source = data_source
        self.name = name
        self._kwargs = kwargs

        # Look up and instantiate the inner source; the wrapper and the inner
        # source share the same ``name``.
        cls = get_composed_source_class(data_source)
        self._inner: ComposedSourceBase = cls(name=name, **kwargs)

    # --- Expose inner element for pipeline connection ---

    @property
    def element(self) -> TSComposedSourceElement:
        """The underlying TSComposedSourceElement for pipeline integration."""
        return self._inner.element

    @property
    def srcs(self) -> Dict[str, Any]:
        """Source pads of the composed element."""
        return self._inner.srcs

    def __getattr__(self, name: str) -> Any:
        """Delegate unknown attributes to the inner source.

        Python only calls this hook after normal attribute lookup has failed.

        Raises:
            AttributeError: For any underscore-prefixed name, or when the
                inner source does not have the attribute either.
        """
        # Never delegate private/dunder names: if ``_inner`` itself is missing
        # (e.g. __init__ failed before assigning it), ``self._inner`` below
        # would re-enter __getattr__ and recurse without bound.
        if name.startswith("_"):
            raise AttributeError(name)
        return getattr(self._inner, name)

    # --- CLI Support ---

    @classmethod
    def create_cli_parser(
        cls,
        prog: Optional[str] = None,
        description: Optional[str] = None,
    ) -> argparse.ArgumentParser:
        """Create CLI argument parser with options for all registered sources.

        Use this when you need to add custom arguments to the parser.

        Example:
            >>> parser = DataSource.create_cli_parser()
            >>> parser.add_argument("--snr-threshold", type=float, default=8.0)
            >>> source, args = DataSource.from_parser(parser, name="pipeline")

        Args:
            prog: Program name for help text
            description: Description for help text

        Returns:
            ArgumentParser configured with --data-source and all source options
        """
        # Imported lazily inside the method rather than at module level.
        from sgnligo.sources.datasource_v2.cli import build_composed_cli_parser

        return build_composed_cli_parser(prog=prog, description=description)

    @classmethod
    def from_argv(
        cls,
        name: str = "datasource",
        argv: Optional[List[str]] = None,
    ) -> "DataSource":
        """Create DataSource from command line arguments.

        Simple interface for when you don't need custom CLI arguments.
        Parses sys.argv (or provided argv) directly.

        If ``--list-sources`` or ``--help-source`` is present, the requested
        help is printed and the process exits with status 0 instead of
        returning.

        Example:
            >>> # In a script called with:
            >>> # python script.py --data-source white --ifos H1
            >>> source = DataSource.from_argv(name="my_source")

        Args:
            name: Name for the composed element
            argv: Command line arguments (defaults to sys.argv[1:])

        Returns:
            DataSource instance
        """
        import sys

        from sgnligo.sources.datasource_v2.cli import (
            build_composed_cli_parser,
            check_composed_help_options,
            namespace_to_datasource_kwargs,
        )

        # Handle --list-sources and --help-source before parsing, because the
        # parser itself requires --data-source.
        if check_composed_help_options(argv):
            sys.exit(0)

        parser = build_composed_cli_parser()
        args = parser.parse_args(argv)
        kwargs = namespace_to_datasource_kwargs(args)
        return cls(name=name, **kwargs)

    @classmethod
    def from_parser(
        cls,
        parser: argparse.ArgumentParser,
        name: str = "datasource",
        argv: Optional[List[str]] = None,
    ) -> tuple["DataSource", argparse.Namespace]:
        """Create DataSource from a custom argument parser.

        Use this when you've added custom arguments to the parser.
        Returns both the DataSource and the parsed args so you can
        access your custom arguments.

        If ``--list-sources`` or ``--help-source`` is present, the requested
        help is printed and the process exits with status 0 instead of
        returning.

        Example:
            >>> parser = DataSource.create_cli_parser()
            >>> parser.add_argument("--snr-threshold", type=float, default=8.0)
            >>> source, args = DataSource.from_parser(parser, name="pipeline")
            >>> print(f"SNR threshold: {args.snr_threshold}")

        Args:
            parser: ArgumentParser (from create_cli_parser with optional additions)
            name: Name for the composed element
            argv: Command line arguments (defaults to sys.argv[1:])

        Returns:
            Tuple of (DataSource instance, parsed args namespace)
        """
        import sys

        from sgnligo.sources.datasource_v2.cli import (
            check_composed_help_options,
            namespace_to_datasource_kwargs,
        )

        # Handle --list-sources and --help-source before parsing, because the
        # parser itself requires --data-source.
        if check_composed_help_options(argv):
            sys.exit(0)

        args = parser.parse_args(argv)
        kwargs = namespace_to_datasource_kwargs(args)
        return cls(name=name, **kwargs), args

    @staticmethod
    def list_sources() -> List[str]:
        """List all available source types."""
        return list_composed_source_types()

    @staticmethod
    def get_source_class(source_type: str) -> Type[ComposedSourceBase]:
        """Get the source class for a given type.

        Raises:
            ValueError: If ``source_type`` is not registered.
        """
        return get_composed_source_class(source_type)

element property

The underlying TSComposedSourceElement for pipeline integration.

srcs property

Source pads of the composed element.

create_cli_parser(prog=None, description=None) classmethod

Create CLI argument parser with options for all registered sources.

Use this when you need to add custom arguments to the parser.

Example

parser = DataSource.create_cli_parser()
parser.add_argument("--snr-threshold", type=float, default=8.0)
source, args = DataSource.from_parser(parser, name="pipeline")

Returns:

Type Description
ArgumentParser

ArgumentParser configured with --data-source and all source options

Source code in sgnligo/sources/datasource_v2/datasource.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
@classmethod
def create_cli_parser(
    cls,
    prog: Optional[str] = None,
    description: Optional[str] = None,
) -> argparse.ArgumentParser:
    """Create CLI argument parser with options for all registered sources.

    Use this when you need to add custom arguments to the parser.

    Example:
        >>> parser = DataSource.create_cli_parser()
        >>> parser.add_argument("--snr-threshold", type=float, default=8.0)
        >>> source, args = DataSource.from_parser(parser, name="pipeline")

    Args:
        prog: Program name for help text
        description: Description for help text

    Returns:
        ArgumentParser configured with --data-source and all source options
    """
    # Imported lazily inside the method rather than at module level.
    from sgnligo.sources.datasource_v2.cli import build_composed_cli_parser

    return build_composed_cli_parser(prog=prog, description=description)

list_sources() staticmethod

List all available source types.

Source code in sgnligo/sources/datasource_v2/datasource.py
234
235
236
237
@staticmethod
def list_sources() -> List[str]:
    """List all available source types.

    Thin convenience wrapper around ``list_composed_source_types()``.
    """
    return list_composed_source_types()

get_source_class(source_type) staticmethod

Get the source class for a given type.

Source code in sgnligo/sources/datasource_v2/datasource.py
239
240
241
242
@staticmethod
def get_source_class(source_type: str) -> Type[ComposedSourceBase]:
    """Get the source class for a given type.

    Thin convenience wrapper around ``get_composed_source_class()``.

    Args:
        source_type: The source type identifier (e.g. "white")

    Returns:
        The class that ``get_composed_source_class()`` returns for that type
    """
    return get_composed_source_class(source_type)

CLI Support

sgnligo.sources.datasource_v2.cli

CLI support for composed data sources.

This module provides CLI argument parsing and help generation for the dataclass-based composed source classes.

CLI arguments are defined by mixin classes that sources inherit from. The build_composed_cli_parser() function aggregates CLI arguments from all registered sources by walking their MRO and collecting arguments from mixins.

Example

from sgnligo.sources.datasource_v2.cli import ( ... build_composed_cli_parser, ... check_composed_help_options, ... )

if check_composed_help_options(): ... sys.exit(0)

parser = build_composed_cli_parser() args = parser.parse_args()

build_composed_cli_parser(prog=None, description=None)

Build CLI parser by aggregating arguments from source mixins.

This function walks the MRO of all registered source classes and collects CLI arguments from mixins that implement the CLIMixinProtocol. Mixins are processed in order of decreasing argument count; a mixin that shares any argument with an already-processed mixin is skipped rather than raising an error.

Parameters:

Name Type Description Default
prog Optional[str]

Program name for help text

None
description Optional[str]

Description for help text

None

Returns:

Type Description
ArgumentParser

ArgumentParser configured with all source options

Raises:

Type Description
ValueError

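Documented for duplicate CLI arguments, but the implementation shown below never raises it: a mixin whose arguments overlap with already-registered ones is skipped instead.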

Source code in sgnligo/sources/datasource_v2/cli.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
def build_composed_cli_parser(
    prog: Optional[str] = None,
    description: Optional[str] = None,
) -> argparse.ArgumentParser:
    """Build CLI parser by aggregating arguments from source mixins.

    This function walks the MRO of all registered source classes and
    collects every class that defines both ``add_cli_arguments`` and
    ``get_cli_arg_names`` directly in its own ``__dict__`` (i.e. the CLI
    mixins themselves, not classes that merely inherit those methods).

    Mixins are then processed in order of decreasing argument count. A mixin
    that shares at least one argument name with a mixin processed earlier is
    skipped entirely; it does not raise an error.

    Args:
        prog: Program name for help text
        description: Description for help text

    Returns:
        ArgumentParser configured with ``--data-source``, ``--list-sources``,
        ``--help-source`` and the options of every non-skipped mixin
    """
    parser = argparse.ArgumentParser(
        prog=prog,
        description=description or "Process gravitational wave data",
    )

    # Main dispatch option
    parser.add_argument(
        "--data-source",
        required=True,
        choices=list_composed_source_types(),
        help="Type of data source to use",
    )

    # Help options (always available)
    parser.add_argument(
        "--list-sources",
        action="store_true",
        help="List available source types",
    )
    parser.add_argument(
        "--help-source",
        metavar="SOURCE",
        help="Show help for a specific source type",
    )

    # First pass: collect all CLI mixins from all registered sources
    # We need to process mixins with MORE args first (supersets before subsets)
    # to ensure variant mixins like StateVectorOptionsMixin (4 args) are processed
    # before StateVectorOnDictOnlyMixin (3 args), so all args get registered.
    cli_mixins: List[Type] = []
    seen_mixins: Set[Type] = set()

    # Only the registered classes are needed here, not their type strings.
    for cls in _COMPOSED_REGISTRY.values():
        for base in cls.__mro__:
            if base in seen_mixins:
                continue

            # Skip if not a CLI mixin (doesn't define add_cli_arguments directly)
            if "add_cli_arguments" not in base.__dict__:
                continue
            if "get_cli_arg_names" not in base.__dict__:
                continue  # pragma: no cover

            # Skip the protocol class itself
            if base is CLIMixinProtocol:
                continue  # pragma: no cover

            cli_mixins.append(base)
            seen_mixins.add(base)

    # Sort mixins by arg count descending - supersets first.
    # list.sort is stable, so mixins with equal counts keep discovery order.
    cli_mixins.sort(key=lambda m: len(m.get_cli_arg_names()), reverse=True)

    # Second pass: add arguments, skipping variant mixins with overlapping args.
    # Different sources use different variants of mixins (e.g., one source uses
    # StateVectorOptionsMixin with 4 args, another uses StateVectorOnDictOnlyMixin
    # with 3 args). By processing supersets first, we register all unique args.
    # NOTE(review): a mixin that overlaps only partially with an earlier one is
    # skipped as a whole, so its non-overlapping args are never registered.
    added_args: Set[str] = set()
    for mixin in cli_mixins:
        new_args = mixin.get_cli_arg_names()

        # Skip if this mixin shares any args with an already-processed mixin
        if new_args & added_args:
            continue

        added_args.update(new_args)
        mixin.add_cli_arguments(parser)

    return parser

check_composed_help_options(argv=None)

Check for --list-sources and --help-source before full parsing.

Call this before parse_args() to handle help options that don't require --data-source to be specified.

Parameters:

Name Type Description Default
argv Optional[List[str]]

Command line arguments (defaults to sys.argv[1:])

None

Returns:

Type Description
bool

True if help was handled (caller should exit), False otherwise.

Source code in sgnligo/sources/datasource_v2/cli.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def check_composed_help_options(argv: Optional[List[str]] = None) -> bool:
    """Check for --list-sources and --help-source before full parsing.

    Call this before parse_args() to handle help options that don't require
    --data-source to be specified.

    Both spellings accepted by argparse are recognised for the per-source
    help option: ``--help-source NAME`` and ``--help-source=NAME``. Only the
    first occurrence in ``argv`` is considered.

    Args:
        argv: Command line arguments (defaults to sys.argv[1:])

    Returns:
        True if help was handled (caller should exit), False otherwise.
        True is also returned when --help-source is malformed or names an
        unknown source; a diagnostic is printed in that case.
    """
    if argv is None:
        argv = sys.argv[1:]

    if "--list-sources" in argv:
        print(format_composed_source_list())
        return True

    flag = "--help-source"
    flag_seen = False
    source_type: Optional[str] = None  # None means "flag given without a value"
    for position, token in enumerate(argv):
        if token == flag:
            flag_seen = True
            if position + 1 < len(argv):
                source_type = argv[position + 1]
            break
        if token.startswith(flag + "="):
            flag_seen = True
            # An empty value after "=" is treated like a missing value.
            source_type = token[len(flag) + 1:] or None
            break

    if not flag_seen:
        return False

    if source_type is None:
        print("--help-source requires a source type argument")
        return True

    if source_type in _COMPOSED_REGISTRY:
        print(format_composed_source_help(source_type))
    else:
        available = ", ".join(sorted(_COMPOSED_REGISTRY.keys()))
        print(f"Unknown source type '{source_type}'. Available: {available}")
    return True

namespace_to_datasource_kwargs(args)

Convert argparse namespace to DataSource kwargs.

This function walks the MRO of the selected source class and calls process_cli_args() on each mixin to convert CLI arguments to field values.

Parameters:

Name Type Description Default
args Namespace

Parsed argparse namespace

required

Returns:

Type Description
Dict[str, Any]

Dict of kwargs for DataSource

Source code in sgnligo/sources/datasource_v2/cli.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
def namespace_to_datasource_kwargs(args: argparse.Namespace) -> Dict[str, Any]:
    """Translate a parsed argparse namespace into DataSource keyword arguments.

    The result always contains ``data_source``. When that source type is
    registered, every class in the source's MRO that exposes
    ``process_cli_args`` (except ``CLIMixinProtocol``) is asked to convert
    the namespace, and the returned mappings are merged in MRO order, so a
    later class overrides keys produced by an earlier one.

    Args:
        args: Parsed argparse namespace

    Returns:
        Dict of kwargs for DataSource
    """
    selected = args.data_source
    result: Dict[str, Any] = {"data_source": selected}

    # Unregistered type: nothing to walk, return the bare dispatch key.
    if selected not in _COMPOSED_REGISTRY:
        return result

    visited: Set[Type] = set()
    for klass in _COMPOSED_REGISTRY[selected].__mro__:
        # NOTE(review): hasattr() also matches classes that only inherit
        # process_cli_args, so the same mixin method may run more than once.
        eligible = (
            klass not in visited
            and hasattr(klass, "process_cli_args")
            and klass is not CLIMixinProtocol
        )
        if not eligible:
            continue

        result.update(klass.process_cli_args(args))
        visited.add(klass)

    return result

format_composed_source_help(source_type)

Generate detailed help for a specific source type.

Parameters:

Name Type Description Default
source_type str

The source type to show help for

required

Returns:

Type Description
str

Formatted help string

Source code in sgnligo/sources/datasource_v2/cli.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def format_composed_source_help(source_type: str) -> str:
    """Render the detailed help text for one source type.

    The text consists of a usage line, the source's description, its required
    and optional CLI options, and the first paragraph of the class docstring.

    Args:
        source_type: The source type to show help for

    Returns:
        Formatted help string
    """
    source_cls = get_composed_source_class(source_type)

    out = [
        f"usage: prog --data-source {source_type} [options]",
        "",
        f"{source_type}: {source_cls.description}",
        "",
    ]

    # Required fields: listed by flag name only.
    required_fields = get_source_required_fields(source_cls)
    if required_fields:
        out.append("Required Options:")
        out.extend("  --" + field.replace("_", "-") for field in required_fields)
        out.append("")

    # Optional fields: a default is shown unless it is None or the literal False.
    optional_fields = get_source_optional_fields(source_cls)
    if optional_fields:
        out.append("Optional Options:")
        for field, default in optional_fields.items():
            flag = "  --" + field.replace("_", "-")
            if default is None or default is False:
                out.append(flag)
            else:
                out.append(f"{flag} (default: {default})")
        out.append("")

    # Description: first paragraph of the class docstring, re-indented.
    docstring = source_cls.__doc__
    if docstring:
        first_paragraph = docstring.strip().split("\n\n")[0]
        out.append("Description:")
        out.extend(f"  {row.strip()}" for row in first_paragraph.split("\n"))

    return "\n".join(out)

format_composed_source_list()

Generate list of all available sources.

Returns:

Type Description
str

Formatted string listing all sources

Source code in sgnligo/sources/datasource_v2/cli.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def format_composed_source_list() -> str:
    """Render an overview of every registered source type.

    Each entry shows the type name, padded to 30 characters, followed by the
    class description; entries are sorted by type name.

    Returns:
        Formatted string listing all sources
    """
    header = ["Available data sources:", ""]
    entries = [
        f"  {key:30} {_COMPOSED_REGISTRY[key].description}"
        for key in sorted(_COMPOSED_REGISTRY)
    ]
    footer = [
        "",
        "Use --help-source <name> for detailed options.",
        "Use --real-time flag with supported sources for real-time mode.",
    ]
    return "\n".join(header + entries + footer)

Registry

sgnligo.sources.datasource_v2.composed_registry

Registry for dataclass-based composed source classes.

This module provides registration and lookup for the new dataclass-based source classes that inherit from ComposedSourceBase.

Example

from sgnligo.sources.datasource_v2.composed_registry import ( ... register_composed_source, ... get_composed_source_class, ... )

@register_composed_source @dataclass class MySource(ComposedSourceBase): ... source_type: ClassVar[str] = "my-source" ... ...

register_composed_source(cls)

Decorator to register a composed source class.

Parameters:

Name Type Description Default
cls Type[ComposedSourceBase]

The composed source class to register

required

Returns:

Type Description
Type[ComposedSourceBase]

The same class (unchanged)

Raises:

Type Description
ValueError

If the source_type is empty or already registered

Example

@register_composed_source @dataclass class WhiteSource(ComposedSourceBase): source_type: ClassVar[str] = "white" ...

Source code in sgnligo/sources/datasource_v2/composed_registry.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def register_composed_source(
    cls: Type[ComposedSourceBase],
) -> Type[ComposedSourceBase]:
    """Class decorator that adds a composed source to the registry.

    The class is stored under its ``source_type`` attribute.

    Args:
        cls: The composed source class to register

    Returns:
        The same class (unchanged)

    Raises:
        ValueError: If the source_type is empty or already registered

    Example:
        @register_composed_source
        @dataclass
        class WhiteSource(ComposedSourceBase):
            source_type: ClassVar[str] = "white"
            ...
    """
    key = cls.source_type

    if not key:
        raise ValueError(f"Class {cls.__name__} must define source_type")

    if key in _COMPOSED_REGISTRY:
        owner = _COMPOSED_REGISTRY[key].__name__
        raise ValueError(
            f"Source type '{key}' is already registered (by {owner})"
        )

    _COMPOSED_REGISTRY[key] = cls
    return cls

get_composed_source_class(source_type)

Get the composed source class for a given type string.

Parameters:

Name Type Description Default
source_type str

The source type identifier

required

Returns:

Type Description
Type[ComposedSourceBase]

The composed source class

Raises:

Type Description
ValueError

If the source type is not registered

Source code in sgnligo/sources/datasource_v2/composed_registry.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def get_composed_source_class(source_type: str) -> Type[ComposedSourceBase]:
    """Look up the composed source class registered under a type string.

    Args:
        source_type: The source type identifier

    Returns:
        The composed source class

    Raises:
        ValueError: If the source type is not registered; the message lists
            the registered type names in sorted order.
    """
    if source_type in _COMPOSED_REGISTRY:
        return _COMPOSED_REGISTRY[source_type]

    known = ", ".join(sorted(_COMPOSED_REGISTRY))
    raise ValueError(f"Unknown source type '{source_type}'. Available: {known}")

list_composed_source_types()

List all registered composed source types.

Returns:

Type Description
List[str]

Sorted list of registered source type names

Source code in sgnligo/sources/datasource_v2/composed_registry.py
81
82
83
84
85
86
87
def list_composed_source_types() -> List[str]:
    """Return the names of all registered composed source types.

    Returns:
        Sorted list of registered source type names
    """
    names = list(_COMPOSED_REGISTRY)
    names.sort()
    return names

get_composed_registry()

Get the full composed source registry.

Returns:

Type Description
Dict[str, Type[ComposedSourceBase]]

Dict mapping source type to class

Source code in sgnligo/sources/datasource_v2/composed_registry.py
90
91
92
93
94
95
96
def get_composed_registry() -> Dict[str, Type[ComposedSourceBase]]:
    """Return a snapshot of the composed source registry.

    The returned mapping is a shallow copy, so adding or removing keys on it
    does not affect the registry itself.

    Returns:
        Dict mapping source type to class
    """
    snapshot = _COMPOSED_REGISTRY.copy()
    return snapshot

Source Classes

Fake Sources

sgnligo.sources.datasource_v2.sources.fake

Fake signal source classes (white, sin, impulse).

These sources generate synthetic test signals for pipeline development and testing without requiring real detector data.

Supports both offline (batch) and real-time modes via the real_time flag:

- Offline (default): Requires t0 and end (or duration)
- Real-time: GPS times optional, generates data synchronized with wall clock

Example

Offline mode (default)

source = WhiteComposedSource( ... name="noise", ... ifos=["H1", "L1"], ... channel_dict={"H1": "FAKE-STRAIN", "L1": "FAKE-STRAIN"}, ... sample_rate=4096, ... t0=1000, ... end=1010, ... )

Real-time mode

source = WhiteComposedSource( ... name="noise", ... ifos=["H1"], ... channel_dict={"H1": "FAKE-STRAIN"}, ... sample_rate=4096, ... real_time=True, ... )

WhiteComposedSource dataclass

Bases: FakeSourceBase


              flowchart TD
              sgnligo.sources.datasource_v2.sources.fake.WhiteComposedSource[WhiteComposedSource]
              sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase[FakeSourceBase]
              sgnligo.sources.composed_base.ComposedSourceBase[ComposedSourceBase]
              sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin[ChannelOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin[SampleRateOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsFlexibleMixin[GPSOptionsFlexibleMixin]
              sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin[SegmentsOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin[VerboseOptionsMixin]

                              sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase --> sgnligo.sources.datasource_v2.sources.fake.WhiteComposedSource
                                sgnligo.sources.composed_base.ComposedSourceBase --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsFlexibleMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                



              click sgnligo.sources.datasource_v2.sources.fake.WhiteComposedSource href "" "sgnligo.sources.datasource_v2.sources.fake.WhiteComposedSource"
              click sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase href "" "sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase"
              click sgnligo.sources.composed_base.ComposedSourceBase href "" "sgnligo.sources.composed_base.ComposedSourceBase"
              click sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsFlexibleMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsFlexibleMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin"
            

Gaussian white noise source.

Generates uncorrelated Gaussian white noise for each IFO channel. Useful for basic pipeline testing where spectral characteristics don't matter.

Supports both offline and real-time modes:

- Offline (default): Specify t0 and end (or duration)
- Real-time: Set real_time=True, GPS times become optional

Example

Offline mode

source = WhiteComposedSource( ... name="noise", ... ifos=["H1", "L1"], ... channel_dict={"H1": "FAKE-STRAIN", "L1": "FAKE-STRAIN"}, ... sample_rate=4096, ... t0=1000, ... end=1010, ... )

Real-time mode

source = WhiteComposedSource( ... name="noise", ... ifos=["H1"], ... channel_dict={"H1": "FAKE-STRAIN"}, ... sample_rate=4096, ... real_time=True, ... )

Source code in sgnligo/sources/datasource_v2/sources/fake.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
@register_composed_source
@dataclass(kw_only=True)
class WhiteComposedSource(FakeSourceBase):
    """Gaussian white noise source.

    Generates uncorrelated Gaussian white noise for each IFO channel.
    Useful for basic pipeline testing where spectral characteristics
    don't matter.

    The class only declares metadata; all fields and behavior are inherited
    from FakeSourceBase. It is registered under the type string "white" by
    the ``register_composed_source`` decorator.

    Supports both offline and real-time modes:
    - Offline (default): Specify t0 and end (or duration)
    - Real-time: Set real_time=True, GPS times become optional

    Example:
        >>> # Offline mode
        >>> source = WhiteComposedSource(
        ...     name="noise",
        ...     ifos=["H1", "L1"],
        ...     channel_dict={"H1": "FAKE-STRAIN", "L1": "FAKE-STRAIN"},
        ...     sample_rate=4096,
        ...     t0=1000,
        ...     end=1010,
        ... )
        >>>
        >>> # Real-time mode
        >>> source = WhiteComposedSource(
        ...     name="noise",
        ...     ifos=["H1"],
        ...     channel_dict={"H1": "FAKE-STRAIN"},
        ...     sample_rate=4096,
        ...     real_time=True,
        ... )
    """

    # Registry key: the value accepted by --data-source for this class.
    source_type: ClassVar[str] = "white"
    # One-line summary shown in the --list-sources / --help-source output.
    description: ClassVar[str] = "Gaussian white noise"
    # Presumably selects the waveform generated by FakeSourceBase - TODO confirm.
    signal_type: ClassVar[str] = "white"

SinComposedSource dataclass

Bases: FakeSourceBase


              flowchart TD
              sgnligo.sources.datasource_v2.sources.fake.SinComposedSource[SinComposedSource]
              sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase[FakeSourceBase]
              sgnligo.sources.composed_base.ComposedSourceBase[ComposedSourceBase]
              sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin[ChannelOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin[SampleRateOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsFlexibleMixin[GPSOptionsFlexibleMixin]
              sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin[SegmentsOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin[VerboseOptionsMixin]

                              sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase --> sgnligo.sources.datasource_v2.sources.fake.SinComposedSource
                                sgnligo.sources.composed_base.ComposedSourceBase --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsFlexibleMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                



              click sgnligo.sources.datasource_v2.sources.fake.SinComposedSource href "" "sgnligo.sources.datasource_v2.sources.fake.SinComposedSource"
              click sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase href "" "sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase"
              click sgnligo.sources.composed_base.ComposedSourceBase href "" "sgnligo.sources.composed_base.ComposedSourceBase"
              click sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsFlexibleMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsFlexibleMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin"
            

Sinusoidal test signal source.

Generates a sinusoidal signal for each IFO channel. Useful for testing frequency-domain processing.

Supports both offline and real-time modes.

Example

source = SinComposedSource(
    name="sine",
    ifos=["H1"],
    channel_dict={"H1": "FAKE-STRAIN"},
    sample_rate=4096,
    t0=1000,
    end=1010,
)

Source code in sgnligo/sources/datasource_v2/sources/fake.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
@register_composed_source
@dataclass(kw_only=True)
class SinComposedSource(FakeSourceBase):
    """Sinusoidal test signal source.

    Generates a sinusoidal signal for each IFO channel.
    Useful for testing frequency-domain processing.

    This class defines no fields or methods of its own; everything is
    inherited from ``FakeSourceBase`` and only the class-level metadata
    below is set here.

    Supports both offline and real-time modes.

    Example:
        >>> source = SinComposedSource(
        ...     name="sine",
        ...     ifos=["H1"],
        ...     channel_dict={"H1": "FAKE-STRAIN"},
        ...     sample_rate=4096,
        ...     t0=1000,
        ...     end=1010,
        ... )
    """

    # Source-type identifier.
    # NOTE(review): presumably the data_source string that selects this
    # class via the registry -- confirm against register_composed_source.
    source_type: ClassVar[str] = "sin"
    # Short human-readable description of this source.
    description: ClassVar[str] = "Sinusoidal test signal"
    # NOTE(review): presumably read by FakeSourceBase to select the kind of
    # signal to generate -- confirm against FakeSourceBase.
    signal_type: ClassVar[str] = "sin"

ImpulseComposedSource dataclass

Bases: FakeSourceBase, ImpulsePositionOptionsMixin


              flowchart TD
              sgnligo.sources.datasource_v2.sources.fake.ImpulseComposedSource[ImpulseComposedSource]
              sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase[FakeSourceBase]
              sgnligo.sources.composed_base.ComposedSourceBase[ComposedSourceBase]
              sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin[ChannelOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin[SampleRateOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsFlexibleMixin[GPSOptionsFlexibleMixin]
              sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin[SegmentsOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin[VerboseOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.ImpulsePositionOptionsMixin[ImpulsePositionOptionsMixin]

                              sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase --> sgnligo.sources.datasource_v2.sources.fake.ImpulseComposedSource
                                sgnligo.sources.composed_base.ComposedSourceBase --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsFlexibleMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                

                sgnligo.sources.datasource_v2.cli_mixins.ImpulsePositionOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.ImpulseComposedSource
                


              click sgnligo.sources.datasource_v2.sources.fake.ImpulseComposedSource href "" "sgnligo.sources.datasource_v2.sources.fake.ImpulseComposedSource"
              click sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase href "" "sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase"
              click sgnligo.sources.composed_base.ComposedSourceBase href "" "sgnligo.sources.composed_base.ComposedSourceBase"
              click sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsFlexibleMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsFlexibleMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.ImpulsePositionOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.ImpulsePositionOptionsMixin"
            

Impulse test signal source.

Generates an impulse signal (single spike) for each IFO channel. Useful for testing impulse response.

Supports both offline and real-time modes.

Fields inherited from mixins

impulse_position: Sample index for impulse (-1 for random)

Example

source = ImpulseComposedSource(
    name="impulse",
    ifos=["H1"],
    channel_dict={"H1": "FAKE-STRAIN"},
    sample_rate=4096,
    t0=1000,
    end=1010,
    impulse_position=100,
)

Source code in sgnligo/sources/datasource_v2/sources/fake.py
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
@register_composed_source
@dataclass(kw_only=True)
class ImpulseComposedSource(FakeSourceBase, ImpulsePositionOptionsMixin):
    """Impulse test signal source.

    Generates an impulse signal (single spike) for each IFO channel.
    Useful for testing impulse response.

    This class declares no fields or methods in its own body; it combines
    ``FakeSourceBase`` with ``ImpulsePositionOptionsMixin`` and sets the
    class-level metadata below.

    Supports both offline and real-time modes.

    Fields inherited from mixins:
        impulse_position: Sample index for impulse (-1 for random)

    Example:
        >>> source = ImpulseComposedSource(
        ...     name="impulse",
        ...     ifos=["H1"],
        ...     channel_dict={"H1": "FAKE-STRAIN"},
        ...     sample_rate=4096,
        ...     t0=1000,
        ...     end=1010,
        ...     impulse_position=100,
        ... )
    """

    # Source-type identifier.
    # NOTE(review): presumably the data_source string that selects this
    # class via the registry -- confirm against register_composed_source.
    source_type: ClassVar[str] = "impulse"
    # Short human-readable description of this source.
    description: ClassVar[str] = "Impulse test signal"
    # NOTE(review): presumably read by FakeSourceBase to select the kind of
    # signal to generate -- confirm against FakeSourceBase.
    signal_type: ClassVar[str] = "impulse"

GWData Noise Sources

sgnligo.sources.datasource_v2.sources.gwdata_noise

GWData noise composed source class.

Generates colored Gaussian noise with realistic LIGO PSDs, suitable for testing and development without real detector data.

Supports both offline (batch) and real-time modes via the real_time flag:

- Offline (default): Requires t0 and end (or duration)
- Real-time: GPS times optional, generates data synchronized with wall clock

Example

Offline mode

source = GWDataNoiseComposedSource(
    name="noise",
    ifos=["H1", "L1"],
    channel_dict={"H1": "FAKE-STRAIN", "L1": "FAKE-STRAIN"},
    t0=1000,
    end=1010,
)

Real-time mode

source = GWDataNoiseComposedSource(
    name="noise",
    ifos=["H1"],
    channel_dict={"H1": "FAKE-STRAIN"},
    real_time=True,
)

GWDataNoiseComposedSource dataclass

Bases: ComposedSourceBase, ChannelOptionsMixin, GPSOptionsFlexibleMixin, StateVectorOnDictOnlyMixin, VerboseOptionsMixin


              flowchart TD
              sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseComposedSource[GWDataNoiseComposedSource]
              sgnligo.sources.composed_base.ComposedSourceBase[ComposedSourceBase]
              sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin[ChannelOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsFlexibleMixin[GPSOptionsFlexibleMixin]
              sgnligo.sources.datasource_v2.cli_mixins.StateVectorOnDictOnlyMixin[StateVectorOnDictOnlyMixin]
              sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin[VerboseOptionsMixin]

                              sgnligo.sources.composed_base.ComposedSourceBase --> sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin --> sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsFlexibleMixin --> sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.StateVectorOnDictOnlyMixin --> sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin --> sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseComposedSource
                


              click sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseComposedSource href "" "sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseComposedSource"
              click sgnligo.sources.composed_base.ComposedSourceBase href "" "sgnligo.sources.composed_base.ComposedSourceBase"
              click sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsFlexibleMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsFlexibleMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.StateVectorOnDictOnlyMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.StateVectorOnDictOnlyMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin"
            

Colored Gaussian noise source with optional state vector gating.

Generates colored Gaussian noise with LIGO PSD. Supports both offline and real-time modes, with optional segment-based state vector gating.

Fields inherited from mixins

- ifos: List of detector prefixes (from ChannelOptionsMixin)
- channel_dict: Dict mapping IFO to channel name (from ChannelOptionsMixin)
- t0: GPS start time (from GPSOptionsFlexibleMixin)
- end: GPS end time (from GPSOptionsFlexibleMixin)
- duration: Duration in seconds, alternative to end (from GPSOptionsFlexibleMixin)
- real_time: Enable real-time mode (from GPSOptionsFlexibleMixin)
- state_vector_on_dict: Bitmask dict (from StateVectorOnDictOnlyMixin)
- state_segments_file: State segments file (from StateVectorOnDictOnlyMixin)
- state_sample_rate: State vector sample rate (from StateVectorOnDictOnlyMixin)
- verbose: Enable verbose output (from VerboseOptionsMixin)

Example

Offline mode

source = GWDataNoiseComposedSource(
    name="noise",
    ifos=["H1", "L1"],
    channel_dict={"H1": "FAKE-STRAIN", "L1": "FAKE-STRAIN"},
    t0=1000,
    end=1010,
)

Real-time mode

source = GWDataNoiseComposedSource(
    name="noise",
    ifos=["H1"],
    channel_dict={"H1": "FAKE-STRAIN"},
    real_time=True,
)

Source code in sgnligo/sources/datasource_v2/sources/gwdata_noise.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
@register_composed_source
@dataclass(kw_only=True)
class GWDataNoiseComposedSource(
    ComposedSourceBase,
    ChannelOptionsMixin,
    GPSOptionsFlexibleMixin,
    StateVectorOnDictOnlyMixin,
    VerboseOptionsMixin,
):
    """Composed source producing colored Gaussian noise with a LIGO PSD.

    Works in both offline and real-time modes. When ``state_vector_on_dict``
    is set, the strain of every IFO is additionally gated by a
    segment-driven state vector.

    Fields inherited from mixins:
        ifos: List of detector prefixes (from ChannelOptionsMixin)
        channel_dict: Dict mapping IFO to channel name (from ChannelOptionsMixin)
        t0: GPS start time (from GPSOptionsFlexibleMixin)
        end: GPS end time (from GPSOptionsFlexibleMixin)
        duration: Duration in seconds, alternative to end (from GPSOptionsFlexibleMixin)
        real_time: Enable real-time mode (from GPSOptionsFlexibleMixin)
        state_vector_on_dict: Bitmask dict (from StateVectorOnDictOnlyMixin)
        state_segments_file: State segments file (from StateVectorOnDictOnlyMixin)
        state_sample_rate: State vector sample rate (from StateVectorOnDictOnlyMixin)
        verbose: Enable verbose output (from VerboseOptionsMixin)

    Example:
        >>> # Offline mode
        >>> source = GWDataNoiseComposedSource(
        ...     name="noise",
        ...     ifos=["H1", "L1"],
        ...     channel_dict={"H1": "FAKE-STRAIN", "L1": "FAKE-STRAIN"},
        ...     t0=1000,
        ...     end=1010,
        ... )
        >>>
        >>> # Real-time mode
        >>> source = GWDataNoiseComposedSource(
        ...     name="noise",
        ...     ifos=["H1"],
        ...     channel_dict={"H1": "FAKE-STRAIN"},
        ...     real_time=True,
        ... )
    """

    # Class metadata
    source_type: ClassVar[str] = "gwdata-noise"
    description: ClassVar[str] = "Colored Gaussian noise with LIGO PSD"

    def _validate(self) -> None:
        """Check the configured options for mutual consistency.

        Raises:
            ValueError: If the keys of ``channel_dict`` or of
                ``state_vector_on_dict`` differ from ``ifos``, or if
                ``state_segments_file`` names a path that does not exist.
                Anything raised by ``_validate_gps_options()`` propagates
                as well.
        """
        # The GPS option rules depend on real_time mode; delegated.
        self._validate_gps_options()

        # Every IFO needs exactly one channel entry, and vice versa.
        channel_ifos = set(self.channel_dict.keys())
        if channel_ifos != set(self.ifos):
            raise ValueError("channel_dict keys must match ifos")

        # A segments file, when given, has to exist on disk.
        segments_path = self.state_segments_file
        if segments_path is not None and not os.path.exists(segments_path):
            raise ValueError(
                f"State segments file does not exist: {segments_path}"
            )

        # Bitmasks, when given, must also cover exactly the configured IFOs.
        bit_masks = self.state_vector_on_dict
        if bit_masks is not None:
            mask_ifos = set(bit_masks.keys())
            if mask_ifos != set(self.ifos):
                raise ValueError("state_vector_on_dict keys must match ifos")

    def _load_state_segments(
        self,
        end_time: Optional[float],
    ) -> Tuple[Optional[Tuple[Tuple[int, int], ...]], Optional[Tuple[int, ...]]]:
        """Resolve the state segments and their values used for gating.

        Args:
            end_time: GPS end time in seconds, or None when no end is known.

        Returns:
            A ``(segments, values)`` pair. With a ``state_segments_file`` the
            pair is whatever ``read_segments_and_values_from_file`` yields;
            otherwise it is one ``(start_ns, end_ns)`` segment paired with
            the single value 3. ``(None, None)`` is returned when
            ``state_vector_on_dict`` is not set.

        Raises:
            ValueError: If gating is enabled but neither
                ``state_segments_file`` nor ``t0`` is available.
        """
        # Gating disabled: nothing to load.
        if self.state_vector_on_dict is None:
            return None, None

        # An explicit segments file takes precedence over the default.
        if self.state_segments_file is not None:
            segments, values = read_segments_and_values_from_file(
                self.state_segments_file, self.verbose
            )
            return segments, values

        # The default segment needs a start time to anchor to.
        if self.t0 is None:
            raise ValueError(
                "Must provide either state_segments_file or t0 "
                "when using state vector gating"
            )

        # Default: one segment spanning the whole time range, in nanoseconds.
        start_ns = int(self.t0 * 1e9)
        if end_time is None:
            # No end time (real-time mode): use the int32 maximum, in
            # seconds, as a stand-in end.
            end_ns = int(np.iinfo(np.int32).max * 1e9)
        else:
            end_ns = int(end_time * 1e9)

        if self.verbose:
            print("Using default state segments: single segment with value 3")

        # Value 3 means bits 0 and 1 are set.
        return ((start_ns, end_ns),), (3,)

    def _build(self) -> TSComposedSourceElement:
        """Assemble the composed noise source.

        Creates one ``GWDataNoiseSource`` covering all IFOs. Without
        ``state_vector_on_dict`` that source is inserted into the
        composition directly; with it, each IFO additionally gets a
        ``SegmentSource`` that is wired to the noise through
        ``add_state_vector_gating``. Latency tracking is hooked up per IFO
        in both cases.

        Returns:
            The element produced by ``TSCompose.as_source`` under this
            source's name.
        """
        # The end time may have been derived from duration.
        end_time = self._get_computed_end()

        # Internal channel names carry the IFO prefix: "<ifo>:<channel>".
        full_channel_dict = {}
        for ifo in self.ifos:
            full_channel_dict[ifo] = f"{ifo}:{self.channel_dict[ifo]}"

        noise_source = GWDataNoiseSource(
            name=f"{self.name}_noise",
            channel_dict=full_channel_dict,
            t0=self.t0,
            end=end_time,
            real_time=self.real_time,
            verbose=self.verbose,
        )

        compose = TSCompose()

        if self.state_vector_on_dict is None:
            # No gating: expose the noise source as-is.
            compose.insert(noise_source)

            for ifo in self.ifos:
                channel_name = full_channel_dict[ifo]
                self._add_latency_tracking(compose, ifo, noise_source, channel_name)
        else:
            state_segments, state_values = self._load_state_segments(end_time)
            assert state_segments is not None
            assert state_values is not None

            # SegmentSource cannot take None as its end, so substitute the
            # int32 maximum when the run is open-ended.
            if end_time is not None:
                seg_end = end_time
            else:
                seg_end = float(np.iinfo(np.int32).max)

            for ifo in self.ifos:
                channel_name = full_channel_dict[ifo]

                # One state-vector segment source per IFO.
                state_source = SegmentSource(
                    name=f"{self.name}_{ifo}_state",
                    source_pad_names=("state",),
                    rate=self.state_sample_rate,
                    t0=self.t0,
                    end=seg_end,
                    segments=state_segments,
                    values=state_values,
                )

                gate = add_state_vector_gating(
                    compose=compose,
                    strain_source=noise_source,
                    state_source=state_source,
                    ifo=ifo,
                    bit_mask=self.state_vector_on_dict[ifo],
                    strain_pad=channel_name,
                    state_pad="state",
                    output_pad=channel_name,
                )

                # Latency tracking (if configured) attaches to the gate.
                self._add_latency_tracking(compose, ifo, gate, channel_name)

                if self.verbose:
                    print(
                        f"Added state vector gating for {ifo} with mask "
                        f"{self.state_vector_on_dict[ifo]}"
                    )

        return compose.as_source(
            name=self.name,
            also_expose_source_pads=self._also_expose_pads,
        )

Frame Sources

sgnligo.sources.datasource_v2.sources.frames

Frame file composed source classes.

These sources read gravitational wave data from GWF frame files, the standard format for LIGO/Virgo data.

Example

source = FramesComposedSource(
    name="data",
    ifos=["H1", "L1"],
    frame_cache="/path/to/frames.cache",
    channel_dict={"H1": "GDS-CALIB_STRAIN", "L1": "GDS-CALIB_STRAIN"},
    t0=1000000000,
    end=1000000100,
)
pipeline.connect(source.element, sink)

FramesComposedSource dataclass

Bases: ComposedSourceBase, ChannelOptionsMixin, FrameCacheOptionsMixin, GPSOptionsMixin, SegmentsOptionsMixin, InjectionOptionsMixin, VerboseOptionsMixin


              flowchart TD
              sgnligo.sources.datasource_v2.sources.frames.FramesComposedSource[FramesComposedSource]
              sgnligo.sources.composed_base.ComposedSourceBase[ComposedSourceBase]
              sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin[ChannelOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.FrameCacheOptionsMixin[FrameCacheOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin[GPSOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin[SegmentsOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.InjectionOptionsMixin[InjectionOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin[VerboseOptionsMixin]

                              sgnligo.sources.composed_base.ComposedSourceBase --> sgnligo.sources.datasource_v2.sources.frames.FramesComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin --> sgnligo.sources.datasource_v2.sources.frames.FramesComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.FrameCacheOptionsMixin --> sgnligo.sources.datasource_v2.sources.frames.FramesComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin --> sgnligo.sources.datasource_v2.sources.frames.FramesComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin --> sgnligo.sources.datasource_v2.sources.frames.FramesComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.InjectionOptionsMixin --> sgnligo.sources.datasource_v2.sources.frames.FramesComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin --> sgnligo.sources.datasource_v2.sources.frames.FramesComposedSource
                


              click sgnligo.sources.datasource_v2.sources.frames.FramesComposedSource href "" "sgnligo.sources.datasource_v2.sources.frames.FramesComposedSource"
              click sgnligo.sources.composed_base.ComposedSourceBase href "" "sgnligo.sources.composed_base.ComposedSourceBase"
              click sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.FrameCacheOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.FrameCacheOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.InjectionOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.InjectionOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin"
            

Frame file source for offline analysis.

Reads strain data from GWF frame files specified in a LAL cache file. Supports optional noiseless injections and segment-based gating.

Fields inherited from mixins

- ifos: List of detector prefixes (from ChannelOptionsMixin)
- channel_dict: Dict mapping IFO to channel name (from ChannelOptionsMixin)
- frame_cache: Path to LAL cache file (from FrameCacheOptionsMixin)
- t0: GPS start time (from GPSOptionsMixin)
- end: GPS end time (from GPSOptionsMixin)
- segments_file: Path to LIGO XML segments file (from SegmentsOptionsMixin)
- segments_name: Segment name in XML (from SegmentsOptionsMixin)
- noiseless_inj_frame_cache: Injection frame cache (from InjectionOptionsMixin)
- noiseless_inj_channel_dict: Injection channels (from InjectionOptionsMixin)
- verbose: Enable verbose output (from VerboseOptionsMixin)

Example

source = FramesComposedSource(
    name="data",
    ifos=["H1"],
    frame_cache="/path/to/frames.cache",
    channel_dict={"H1": "GDS-CALIB_STRAIN"},
    t0=1000000000,
    end=1000000100,
)
pipeline.connect(source.element, sink)

Source code in sgnligo/sources/datasource_v2/sources/frames.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
@register_composed_source
@dataclass(kw_only=True)
class FramesComposedSource(
    ComposedSourceBase,
    ChannelOptionsMixin,
    FrameCacheOptionsMixin,
    GPSOptionsMixin,
    SegmentsOptionsMixin,
    InjectionOptionsMixin,
    VerboseOptionsMixin,
):
    """Frame file source for offline analysis.

    Reads strain data from GWF frame files specified in a LAL cache file.
    Supports optional noiseless injections and segment-based gating.

    For every IFO the build step creates one ``FrameReader``. When an
    injection channel is available for that IFO, a second reader is summed
    with the first through an ``Adder``. When the segments file yields
    segments for that IFO, the result is routed through a ``Gate`` driven by
    a ``SegmentSource``.

    Fields inherited from mixins:
        ifos: List of detector prefixes (from ChannelOptionsMixin)
        channel_dict: Dict mapping IFO to channel name (from ChannelOptionsMixin)
        frame_cache: Path to LAL cache file (from FrameCacheOptionsMixin)
        t0: GPS start time (from GPSOptionsMixin)
        end: GPS end time (from GPSOptionsMixin)
        segments_file: Path to LIGO XML segments file (from SegmentsOptionsMixin)
        segments_name: Segment name in XML (from SegmentsOptionsMixin)
        noiseless_inj_frame_cache: Injection frame cache (from InjectionOptionsMixin)
        noiseless_inj_channel_dict: Injection channels (from InjectionOptionsMixin)
        verbose: Enable verbose output (from VerboseOptionsMixin)

    Example:
        >>> source = FramesComposedSource(
        ...     name="data",
        ...     ifos=["H1"],
        ...     frame_cache="/path/to/frames.cache",
        ...     channel_dict={"H1": "GDS-CALIB_STRAIN"},
        ...     t0=1000000000,
        ...     end=1000000100,
        ... )
        >>> pipeline.connect(source.element, sink)
    """

    # Class metadata
    source_type: ClassVar[str] = "frames"
    description: ClassVar[str] = "Read from GWF frame files"

    def _validate(self) -> None:
        """Validate parameters.

        Raises:
            ValueError: If ``t0`` is not strictly before ``end``, a referenced
                file does not exist, the ``channel_dict`` keys differ from
                ``ifos``, ``segments_file`` is given without
                ``segments_name``, or ``noiseless_inj_frame_cache`` is given
                without ``noiseless_inj_channel_dict``.
        """
        if self.t0 >= self.end:
            raise ValueError("t0 must be less than end")

        # Validate frame cache
        if not os.path.exists(self.frame_cache):
            raise ValueError(f"Frame cache file does not exist: {self.frame_cache}")

        # Validate channel_dict
        if set(self.channel_dict.keys()) != set(self.ifos):
            raise ValueError("channel_dict keys must match ifos")

        # Validate segments options
        if self.segments_file is not None:
            if self.segments_name is None:
                raise ValueError("Must specify segments_name when segments_file is set")
            if not os.path.exists(self.segments_file):
                raise ValueError(f"Segments file does not exist: {self.segments_file}")

        # Validate injection options
        if self.noiseless_inj_frame_cache is not None:
            if not os.path.exists(self.noiseless_inj_frame_cache):
                raise ValueError(
                    f"Injection frame cache does not exist: "
                    f"{self.noiseless_inj_frame_cache}"
                )
            if self.noiseless_inj_channel_dict is None:
                raise ValueError(
                    "Must specify noiseless_inj_channel_dict when "
                    "noiseless_inj_frame_cache is set"
                )

    def _load_segments(self) -> Optional[Dict[str, List]]:
        """Load and process segments from XML file.

        Returns:
            ``None`` when either ``segments_file`` or ``segments_name`` is
            unset. Otherwise a dict mapping each IFO found in the file to its
            segments, clipped to ``[t0, end]`` and with boundaries converted
            through ``.ns()``.
        """
        if self.segments_file is None or self.segments_name is None:
            return None

        loaded_segments = ligolw_segments.segmenttable_get_by_name(
            ligolw_utils.load_filename(
                self.segments_file,
                contenthandler=ligolw_segments.LIGOLWContentHandler,
            ),
            self.segments_name,
        ).coalesce()

        # Clip to requested time range
        seg = segments.segment(LIGOTimeGPS(self.t0), LIGOTimeGPS(self.end))
        clipped_segments = segments.segmentlistdict(
            (ifo, seglist & segments.segmentlist([seg]))
            for ifo, seglist in loaded_segments.items()
        )

        # Convert to nanoseconds
        return {
            ifo: [segments.segment(s[0].ns(), s[1].ns()) for s in segs]
            for ifo, segs in clipped_segments.items()
        }

    def _build(self) -> TSComposedSourceElement:
        """Build the frame file source.

        Returns:
            The composed source element produced by ``TSCompose.as_source``.
        """
        compose = TSCompose()
        segments_dict = self._load_segments()

        # Injection channels are only honoured when an injection cache is set.
        # An empty mapping means "no injections for any IFO".
        inj_channels = {}
        if self.noiseless_inj_frame_cache and self.noiseless_inj_channel_dict:
            inj_channels = self.noiseless_inj_channel_dict

        # Determine sample rate from first frame reader (will be set after creation)
        sample_rate = None

        for ifo in self.ifos:
            channel_name = f"{ifo}:{self.channel_dict[ifo]}"

            # Create main frame reader
            frame_reader = FrameReader(
                name=f"{self.name}_{ifo}_frames",
                framecache=self.frame_cache,
                channel_names=[channel_name],
                instrument=ifo,
                t0=self.t0,
                end=self.end,
            )

            # Get sample rate from first frame reader
            if sample_rate is None:
                sample_rate = next(iter(frame_reader.rates.values()))

            # Track the current output element and pad for this IFO
            current_source = frame_reader
            current_pad = channel_name

            if ifo in inj_channels:
                inj_channel = f"{ifo}:{inj_channels[ifo]}"

                inj_reader = FrameReader(
                    name=f"{self.name}_{ifo}_inj",
                    framecache=self.noiseless_inj_frame_cache,
                    channel_names=[inj_channel],
                    instrument=ifo,
                    t0=self.t0,
                    end=self.end,
                )

                # Add frames together
                adder = Adder(
                    name=f"{self.name}_{ifo}_add",
                    sink_pad_names=("frame", "inj"),
                    source_pad_names=(channel_name,),
                )

                compose.connect(
                    frame_reader,
                    adder,
                    link_map={"frame": channel_name},
                )
                compose.connect(
                    inj_reader,
                    adder,
                    link_map={"inj": inj_channel},
                )

                current_source = adder
                current_pad = channel_name

                if self.verbose:
                    print(f"Added injection for {ifo} from {inj_channel}")
            else:
                # No injection for this IFO - just insert the frame reader.
                # This must also happen when injections are configured for
                # other IFOs only; otherwise this reader would never be
                # registered with the composition.
                compose.insert(frame_reader)

            # Add segment gating if configured
            if segments_dict is not None and ifo in segments_dict:
                ifo_segments = segments_dict[ifo]

                if ifo_segments:  # Only add gating if there are segments
                    seg_source = SegmentSource(
                        name=f"{self.name}_{ifo}_seg",
                        source_pad_names=("control",),
                        rate=sample_rate,
                        t0=self.t0,
                        end=self.end,
                        segments=ifo_segments,
                    )

                    gate = Gate(
                        name=f"{self.name}_{ifo}_gate",
                        sink_pad_names=("strain", "control"),
                        control="control",
                        source_pad_names=(channel_name,),
                    )

                    compose.connect(
                        current_source,
                        gate,
                        link_map={"strain": current_pad},
                    )
                    compose.connect(
                        seg_source,
                        gate,
                        link_map={"control": "control"},
                    )

                    current_source = gate
                    current_pad = channel_name

                    if self.verbose:
                        print(f"Added segment gating for {ifo}")

            # Add latency tracking if configured
            self._add_latency_tracking(compose, ifo, current_source, current_pad)

        return compose.as_source(
            name=self.name,
            also_expose_source_pads=self._also_expose_pads,
        )

DevShm Sources

sgnligo.sources.datasource_v2.sources.devshm

Shared memory (devshm) composed source classes.

These sources read low-latency data from shared memory for online gravitational wave analysis.

Example

>>> source = DevShmComposedSource(
...     name="low_latency",
...     ifos=["H1"],
...     channel_dict={"H1": "GDS-CALIB_STRAIN"},
...     shared_memory_dict={"H1": "/dev/shm/kafka/H1_O4Replay"},
...     state_channel_dict={"H1": "GDS-CALIB_STATE_VECTOR"},
...     state_vector_on_dict={"H1": 3},
... )
>>> pipeline.connect(source.element, sink)

DevShmComposedSource dataclass

Bases: ComposedSourceBase, ChannelOptionsMixin, DevShmOptionsMixin, QueueTimeoutOptionsMixin, StateVectorOptionsMixin, VerboseOptionsMixin


              flowchart TD
              sgnligo.sources.datasource_v2.sources.devshm.DevShmComposedSource[DevShmComposedSource]
              sgnligo.sources.composed_base.ComposedSourceBase[ComposedSourceBase]
              sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin[ChannelOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.DevShmOptionsMixin[DevShmOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.QueueTimeoutOptionsMixin[QueueTimeoutOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.StateVectorOptionsMixin[StateVectorOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin[VerboseOptionsMixin]

                              sgnligo.sources.composed_base.ComposedSourceBase --> sgnligo.sources.datasource_v2.sources.devshm.DevShmComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin --> sgnligo.sources.datasource_v2.sources.devshm.DevShmComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.DevShmOptionsMixin --> sgnligo.sources.datasource_v2.sources.devshm.DevShmComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.QueueTimeoutOptionsMixin --> sgnligo.sources.datasource_v2.sources.devshm.DevShmComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.StateVectorOptionsMixin --> sgnligo.sources.datasource_v2.sources.devshm.DevShmComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin --> sgnligo.sources.datasource_v2.sources.devshm.DevShmComposedSource
                


              click sgnligo.sources.datasource_v2.sources.devshm.DevShmComposedSource href "" "sgnligo.sources.datasource_v2.sources.devshm.DevShmComposedSource"
              click sgnligo.sources.composed_base.ComposedSourceBase href "" "sgnligo.sources.composed_base.ComposedSourceBase"
              click sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.DevShmOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.DevShmOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.QueueTimeoutOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.QueueTimeoutOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.StateVectorOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.StateVectorOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin"
            

Shared memory source with state vector gating.

Reads low-latency strain data from shared memory and applies state vector gating to ensure only valid data is processed.

Fields inherited from mixins

- ifos: List of detector prefixes (from ChannelOptionsMixin)
- channel_dict: Dict mapping IFO to channel name (from ChannelOptionsMixin)
- shared_memory_dict: Dict mapping IFO to shm path (from DevShmOptionsMixin)
- discont_wait_time: Discontinuity wait time (from DevShmOptionsMixin)
- queue_timeout: Queue timeout (from QueueTimeoutOptionsMixin)
- state_channel_dict: Dict mapping IFO to state vector channel (from StateVectorOptionsMixin)
- state_vector_on_dict: Dict mapping IFO to bitmask (from StateVectorOptionsMixin)
- state_segments_file: Path to state segments file (from StateVectorOptionsMixin)
- state_sample_rate: State vector sample rate (from StateVectorOptionsMixin)
- verbose: Enable verbose output (from VerboseOptionsMixin)

Note

state_channel_dict and state_vector_on_dict are required for DevShm sources (validation will fail if not provided).

Example

>>> source = DevShmComposedSource(
...     name="low_latency",
...     ifos=["H1"],
...     channel_dict={"H1": "GDS-CALIB_STRAIN"},
...     shared_memory_dict={"H1": "/dev/shm/kafka/H1_O4Replay"},
...     state_channel_dict={"H1": "GDS-CALIB_STATE_VECTOR"},
...     state_vector_on_dict={"H1": 3},
... )
>>> pipeline.connect(source.element, sink)

Source code in sgnligo/sources/datasource_v2/sources/devshm.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
@register_composed_source
@dataclass(kw_only=True)
class DevShmComposedSource(
    ComposedSourceBase,
    ChannelOptionsMixin,
    DevShmOptionsMixin,
    QueueTimeoutOptionsMixin,
    StateVectorOptionsMixin,
    VerboseOptionsMixin,
):
    """Low-latency shared memory source gated by the state vector.

    A single ``DevShmSource`` serves the strain and state vector channels of
    every IFO; each IFO's strain is then passed through
    ``add_state_vector_gating`` using that IFO's bitmask.

    Fields inherited from mixins:
        ifos: List of detector prefixes (from ChannelOptionsMixin)
        channel_dict: Dict mapping IFO to channel name (from ChannelOptionsMixin)
        shared_memory_dict: Dict mapping IFO to shm path (from DevShmOptionsMixin)
        discont_wait_time: Discontinuity wait time (from DevShmOptionsMixin)
        queue_timeout: Queue timeout (from QueueTimeoutOptionsMixin)
        state_channel_dict: Dict mapping IFO to state vector channel
            (from StateVectorOptionsMixin)
        state_vector_on_dict: Dict mapping IFO to bitmask
            (from StateVectorOptionsMixin)
        state_segments_file: Path to state segments file (from StateVectorOptionsMixin)
        state_sample_rate: State vector sample rate (from StateVectorOptionsMixin)
        verbose: Enable verbose output (from VerboseOptionsMixin)

    Note:
        Unlike sources where gating is optional, this source rejects a
        missing state_channel_dict or state_vector_on_dict during validation.

    Example:
        >>> source = DevShmComposedSource(
        ...     name="low_latency",
        ...     ifos=["H1"],
        ...     channel_dict={"H1": "GDS-CALIB_STRAIN"},
        ...     shared_memory_dict={"H1": "/dev/shm/kafka/H1_O4Replay"},
        ...     state_channel_dict={"H1": "GDS-CALIB_STATE_VECTOR"},
        ...     state_vector_on_dict={"H1": 3},
        ... )
        >>> pipeline.connect(source.element, sink)
    """

    # Registry metadata
    source_type: ClassVar[str] = "devshm"
    description: ClassVar[str] = "Read from shared memory"

    def _validate(self) -> None:
        """Check that every per-IFO mapping is present and keyed by ``ifos``.

        Raises:
            ValueError: If a mapping's keys differ from ``ifos``, or if
                ``state_channel_dict`` / ``state_vector_on_dict`` is ``None``.
        """
        expected = set(self.ifos)

        def mismatched(mapping) -> bool:
            # True when the mapping is not keyed by exactly the configured IFOs.
            return set(mapping.keys()) != expected

        if mismatched(self.channel_dict):
            raise ValueError("channel_dict keys must match ifos")

        if mismatched(self.shared_memory_dict):
            raise ValueError("shared_memory_dict keys must match ifos")

        # The state vector channel mapping is mandatory for this source.
        if self.state_channel_dict is None:
            raise ValueError("state_channel_dict is required for DevShm sources")
        if mismatched(self.state_channel_dict):
            raise ValueError("state_channel_dict keys must match ifos")

        # So is the bitmask mapping.
        if self.state_vector_on_dict is None:
            raise ValueError("state_vector_on_dict is required for DevShm sources")
        if mismatched(self.state_vector_on_dict):
            raise ValueError("state_vector_on_dict keys must match ifos")

    def _build(self) -> TSComposedSourceElement:
        """Assemble the shared memory reader and one gate per IFO.

        Returns:
            The composed source element produced by ``TSCompose.as_source``.
        """
        # _validate() has already rejected None for both mappings.
        assert self.state_channel_dict is not None
        assert self.state_vector_on_dict is not None
        state_channels = self.state_channel_dict
        bit_masks = self.state_vector_on_dict

        # Fully qualified (strain, state) pad names, computed once per IFO.
        pads = {
            ifo: (
                f"{ifo}:{self.channel_dict[ifo]}",
                f"{ifo}:{state_channels[ifo]}",
            )
            for ifo in self.ifos
        }

        # DevShmSource expects: {ifo: [strain_channel, state_channel]}
        shm_reader = DevShmSource(
            name=f"{self.name}_devshm",
            channel_names={ifo: list(pair) for ifo, pair in pads.items()},
            shared_memory_dirs=self.shared_memory_dict,
            discont_wait_time=self.discont_wait_time,
            queue_timeout=self.queue_timeout,
            verbose=self.verbose,
        )

        compose = TSCompose()

        for ifo in self.ifos:
            strain_pad, state_pad = pads[ifo]
            mask = bit_masks[ifo]

            # The same element supplies both the strain and the state vector.
            gated = add_state_vector_gating(
                compose=compose,
                strain_source=shm_reader,
                state_source=shm_reader,
                ifo=ifo,
                bit_mask=mask,
                strain_pad=strain_pad,
                state_pad=state_pad,
                output_pad=strain_pad,
            )

            # Add latency tracking if configured
            self._add_latency_tracking(compose, ifo, gated, strain_pad)

            if self.verbose:
                print(f"Added state vector gating for {ifo} with mask " f"{mask}")

        return compose.as_source(
            name=self.name,
            also_expose_source_pads=self._also_expose_pads,
        )

Arrakis Sources

sgnligo.sources.datasource_v2.sources.arrakis

Arrakis composed source classes.

These sources read streaming data via Arrakis for online gravitational wave analysis.

Example

>>> source = ArrakisComposedSource(
...     name="kafka_data",
...     ifos=["H1", "L1"],
...     channel_dict={"H1": "GDS-CALIB_STRAIN", "L1": "GDS-CALIB_STRAIN"},
... )
>>> pipeline.connect(source.element, sink)

ArrakisComposedSource dataclass

Bases: ComposedSourceBase, ChannelOptionsMixin, GPSOptionsFlexibleMixin, QueueTimeoutOptionsMixin, StateVectorOptionsMixin, VerboseOptionsMixin


              flowchart TD
              sgnligo.sources.datasource_v2.sources.arrakis.ArrakisComposedSource[ArrakisComposedSource]
              sgnligo.sources.composed_base.ComposedSourceBase[ComposedSourceBase]
              sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin[ChannelOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsFlexibleMixin[GPSOptionsFlexibleMixin]
              sgnligo.sources.datasource_v2.cli_mixins.QueueTimeoutOptionsMixin[QueueTimeoutOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.StateVectorOptionsMixin[StateVectorOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin[VerboseOptionsMixin]

                              sgnligo.sources.composed_base.ComposedSourceBase --> sgnligo.sources.datasource_v2.sources.arrakis.ArrakisComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin --> sgnligo.sources.datasource_v2.sources.arrakis.ArrakisComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsFlexibleMixin --> sgnligo.sources.datasource_v2.sources.arrakis.ArrakisComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.QueueTimeoutOptionsMixin --> sgnligo.sources.datasource_v2.sources.arrakis.ArrakisComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.StateVectorOptionsMixin --> sgnligo.sources.datasource_v2.sources.arrakis.ArrakisComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin --> sgnligo.sources.datasource_v2.sources.arrakis.ArrakisComposedSource
                


              click sgnligo.sources.datasource_v2.sources.arrakis.ArrakisComposedSource href "" "sgnligo.sources.datasource_v2.sources.arrakis.ArrakisComposedSource"
              click sgnligo.sources.composed_base.ComposedSourceBase href "" "sgnligo.sources.composed_base.ComposedSourceBase"
              click sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsFlexibleMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsFlexibleMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.QueueTimeoutOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.QueueTimeoutOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.StateVectorOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.StateVectorOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin"
            

Arrakis source for streaming data.

Reads streaming gravitational wave data from topics. Optionally supports state vector gating.

This source defaults to real-time mode. GPS times are optional.

Fields inherited from mixins

- ifos: List of detector prefixes (from ChannelOptionsMixin)
- channel_dict: Dict mapping IFO to channel name (from ChannelOptionsMixin)
- t0: GPS start time (optional, from GPSOptionsFlexibleMixin)
- end: GPS end time (optional, from GPSOptionsFlexibleMixin)
- duration: Duration in seconds (optional, from GPSOptionsFlexibleMixin)
- real_time: Enable real-time mode (default: True, from GPSOptionsFlexibleMixin)
- queue_timeout: Queue timeout (from QueueTimeoutOptionsMixin)
- state_channel_dict: State channel dict (from StateVectorOptionsMixin)
- state_vector_on_dict: Bitmask dict (from StateVectorOptionsMixin)
- state_segments_file: State segments file (from StateVectorOptionsMixin)
- state_sample_rate: State vector sample rate (from StateVectorOptionsMixin)
- verbose: Enable verbose output (from VerboseOptionsMixin)

Example

>>> source = ArrakisComposedSource(
...     name="kafka_data",
...     ifos=["H1"],
...     channel_dict={"H1": "GDS-CALIB_STRAIN"},
... )
>>> pipeline.connect(source.element, sink)

Source code in sgnligo/sources/datasource_v2/sources/arrakis.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
@register_composed_source
@dataclass(kw_only=True)
class ArrakisComposedSource(
    ComposedSourceBase,
    ChannelOptionsMixin,
    GPSOptionsFlexibleMixin,
    QueueTimeoutOptionsMixin,
    StateVectorOptionsMixin,
    VerboseOptionsMixin,
):
    """Streaming source backed by Arrakis.

    One ``ArrakisSource`` provides the strain channel of every IFO. When both
    state vector mappings are supplied it also provides the state vector
    channels, and each IFO's strain is passed through
    ``add_state_vector_gating``.

    Real-time mode is on by default, and the GPS bounds may be omitted.

    Fields inherited from mixins:
        ifos: List of detector prefixes (from ChannelOptionsMixin)
        channel_dict: Dict mapping IFO to channel name (from ChannelOptionsMixin)
        t0: GPS start time (optional, from GPSOptionsFlexibleMixin)
        end: GPS end time (optional, from GPSOptionsFlexibleMixin)
        duration: Duration in seconds (optional, from GPSOptionsFlexibleMixin)
        real_time: Enable real-time mode (default: True, from GPSOptionsFlexibleMixin)
        queue_timeout: Queue timeout (from QueueTimeoutOptionsMixin)
        state_channel_dict: State channel dict (from StateVectorOptionsMixin)
        state_vector_on_dict: Bitmask dict (from StateVectorOptionsMixin)
        state_segments_file: State segments file (from StateVectorOptionsMixin)
        state_sample_rate: State vector sample rate (from StateVectorOptionsMixin)
        verbose: Enable verbose output (from VerboseOptionsMixin)

    Example:
        >>> source = ArrakisComposedSource(
        ...     name="kafka_data",
        ...     ifos=["H1"],
        ...     channel_dict={"H1": "GDS-CALIB_STRAIN"},
        ... )
        >>> pipeline.connect(source.element, sink)
    """

    # Registry metadata
    source_type: ClassVar[str] = "arrakis"
    description: ClassVar[str] = "Read from Arrakis"

    # A streaming source runs in real time unless told otherwise.
    real_time: bool = True

    def _validate(self) -> None:
        """Check mapping keys, the optional time range and state vector pairing.

        Raises:
            ValueError: If a mapping's keys differ from ``ifos``, if both
                ``t0`` and ``end`` are given with ``t0 >= end``, or if only
                one of ``state_channel_dict`` / ``state_vector_on_dict`` is
                provided.
        """
        expected = set(self.ifos)

        if set(self.channel_dict.keys()) != expected:
            raise ValueError("channel_dict keys must match ifos")

        # The time range is only checked when both ends are known.
        both_bounds_given = self.t0 is not None and self.end is not None
        if both_bounds_given and self.t0 >= self.end:
            raise ValueError("t0 must be less than end")

        # The two state vector mappings must be supplied together.
        state_channels = self.state_channel_dict
        if state_channels is not None:
            if set(state_channels.keys()) != expected:
                raise ValueError("state_channel_dict keys must match ifos")
            if self.state_vector_on_dict is None:
                raise ValueError(
                    "Must specify state_vector_on_dict when state_channel_dict is set"
                )

        bit_masks = self.state_vector_on_dict
        if bit_masks is not None:
            if set(bit_masks.keys()) != expected:
                raise ValueError("state_vector_on_dict keys must match ifos")
            if self.state_channel_dict is None:
                raise ValueError(
                    "Must specify state_channel_dict when state_vector_on_dict is set"
                )

    def _build(self) -> TSComposedSourceElement:
        """Assemble the Arrakis reader, with per-IFO gating when enabled.

        Returns:
            The composed source element produced by ``TSCompose.as_source``.
        """
        # Gating is enabled only when both state vector mappings are present.
        gated_mode = not (
            self.state_channel_dict is None or self.state_vector_on_dict is None
        )

        # Pad order per IFO: strain first, then (when gating) the state vector.
        pad_names = []
        for ifo in self.ifos:
            pad_names.append(f"{ifo}:{self.channel_dict[ifo]}")
            if gated_mode:
                assert self.state_channel_dict is not None  # for type checker
                pad_names.append(f"{ifo}:{self.state_channel_dict[ifo]}")

        # Derive the duration from the computed end time (covers the
        # duration field as well); it stays None unless both bounds are known.
        computed_end = self._get_computed_end()
        if self.t0 is not None and computed_end is not None:
            span = computed_end - self.t0
        else:
            span = None

        reader = ArrakisSource(
            name=f"{self.name}_arrakis",
            source_pad_names=pad_names,
            start_time=self.t0,
            duration=span,
            in_queue_timeout=int(self.queue_timeout),
        )

        compose = TSCompose()

        if not gated_mode:
            # No gating - just expose Arrakis source directly
            compose.insert(reader)

            for ifo in self.ifos:
                strain_pad = f"{ifo}:{self.channel_dict[ifo]}"
                self._add_latency_tracking(compose, ifo, reader, strain_pad)
        else:
            assert self.state_channel_dict is not None  # for type checker
            assert self.state_vector_on_dict is not None  # for type checker
            for ifo in self.ifos:
                strain_pad = f"{ifo}:{self.channel_dict[ifo]}"
                state_pad = f"{ifo}:{self.state_channel_dict[ifo]}"

                # The same element supplies both the strain and the state vector.
                gated = add_state_vector_gating(
                    compose=compose,
                    strain_source=reader,
                    state_source=reader,
                    ifo=ifo,
                    bit_mask=self.state_vector_on_dict[ifo],
                    strain_pad=strain_pad,
                    state_pad=state_pad,
                    output_pad=strain_pad,
                )

                # Add latency tracking if configured
                self._add_latency_tracking(compose, ifo, gated, strain_pad)

                if self.verbose:
                    print(
                        f"Added state vector gating for {ifo} with mask "
                        f"{self.state_vector_on_dict[ifo]}"
                    )

        return compose.as_source(
            name=self.name,
            also_expose_source_pads=self._also_expose_pads,
        )

Base Classes

sgnligo.sources.composed_base

Base class for composed source elements.

This module provides an abstract base class for creating composed source elements that combine multiple internal elements into a single source. Subclasses define their parameters as dataclass fields and implement _build() to wire up the internal elements.

Usage with pipelines

Composed sources wrap a TSComposedSourceElement internally. To use them with Pipeline.connect(), access the inner element via the .element property:

>>> pipeline = Pipeline()
>>> pipeline.connect(source.element, sink)

Example

>>> from dataclasses import dataclass
>>> from sgnligo.sources.composed_base import ComposedSourceBase
>>>
>>> @dataclass
... class MySource(ComposedSourceBase):
...     source_type = "my-source"
...     description = "My custom source"
...
...     ifos: list[str]
...     sample_rate: int
...     t0: float
...     end: float
...
...     def _build(self):
...         compose = TSCompose()
...         # ... wire up elements
...         return compose.as_source(name=self.name)
>>>
>>> source = MySource(name="test", ifos=["H1"], sample_rate=4096, t0=0, end=10)
>>> print(source.srcs)  # Access source pads

ComposedSourceBase dataclass

Abstract base class for composed source elements.

Subclasses define their parameters as dataclass fields and implement _build() to create the internal composed element. Composition happens automatically in __post_init__.

The resulting object behaves like a TSComposedSourceElement - it has .srcs for source pads and can be connected to pipelines via pipeline.connect(source, downstream).

Class Attributes

- source_type: String identifier for registry (e.g., "white", "frames"). Leave empty if the source should not be registered.
- description: Human-readable description for help text and documentation.

Example

from dataclasses import dataclass from typing import ClassVar, List from sgnts.compose import TSCompose, TSComposedSourceElement from sgnts.sources import FakeSeriesSource

@dataclass(kw_only=True) ... class WhiteSource(ComposedSourceBase): ... source_type: ClassVar[str] = "white" ... description: ClassVar[str] = "Gaussian white noise" ... ... ifos: List[str] ... sample_rate: int ... t0: float ... end: float ... ... def _build(self) -> TSComposedSourceElement: ... compose = TSCompose() ... for ifo in self.ifos: ... fake = FakeSeriesSource( ... name=f"{self.name}_{ifo}", ... source_pad_names=(f"{ifo}:STRAIN",), ... rate=self.sample_rate, ... t0=self.t0, ... end=self.end, ... signal_type="white", ... ) ... compose.insert(fake) ... return compose.as_source(name=self.name)

source = WhiteSource( ... name="noise", ... ifos=["H1", "L1"], ... sample_rate=4096, ... t0=1000, ... end=1010, ... ) print(list(source.srcs.keys())) ['H1:STRAIN', 'L1:STRAIN']

Source code in sgnligo/sources/composed_base.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
@dataclass(kw_only=True)
class ComposedSourceBase:
    """Abstract base class for composed source elements.

    Subclasses define their parameters as dataclass fields and implement
    _build() to create the internal composed element. Composition happens
    automatically in __post_init__.

    The resulting object behaves like a TSComposedSourceElement - it has
    .srcs for source pads and can be connected to pipelines via
    pipeline.connect(source, downstream).

    Class Attributes:
        source_type: String identifier for registry (e.g., "white", "frames").
            Leave empty if the source should not be registered.
        description: Human-readable description for help text and documentation.

    Example:
        >>> from dataclasses import dataclass
        >>> from typing import ClassVar, List
        >>> from sgnts.compose import TSCompose, TSComposedSourceElement
        >>> from sgnts.sources import FakeSeriesSource
        >>>
        >>> @dataclass(kw_only=True)
        ... class WhiteSource(ComposedSourceBase):
        ...     source_type: ClassVar[str] = "white"
        ...     description: ClassVar[str] = "Gaussian white noise"
        ...
        ...     ifos: List[str]
        ...     sample_rate: int
        ...     t0: float
        ...     end: float
        ...
        ...     def _build(self) -> TSComposedSourceElement:
        ...         compose = TSCompose()
        ...         for ifo in self.ifos:
        ...             fake = FakeSeriesSource(
        ...                 name=f"{self.name}_{ifo}",
        ...                 source_pad_names=(f"{ifo}:STRAIN",),
        ...                 rate=self.sample_rate,
        ...                 t0=self.t0,
        ...                 end=self.end,
        ...                 signal_type="white",
        ...             )
        ...             compose.insert(fake)
        ...         return compose.as_source(name=self.name)
        >>>
        >>> source = WhiteSource(
        ...     name="noise",
        ...     ifos=["H1", "L1"],
        ...     sample_rate=4096,
        ...     t0=1000,
        ...     end=1010,
        ... )
        >>> print(list(source.srcs.keys()))
        ['H1:STRAIN', 'L1:STRAIN']
    """

    # Required for all composed sources
    name: str

    # Optional latency tracking (interval in seconds, None = disabled)
    latency_interval: Optional[float] = None

    # Internal composed element (built in __post_init__)
    _composed: TSComposedSourceElement = field(init=False, repr=False)

    # Internal: pads to expose even when internally linked (for latency multilink)
    _also_expose_pads: list[str] = field(init=False, repr=False, default_factory=list)

    # Class-level metadata for registry and CLI
    # Subclasses should override these
    source_type: ClassVar[str] = ""
    description: ClassVar[str] = ""

    def __post_init__(self) -> None:
        """Validate parameters and build the composed element."""
        self._also_expose_pads = []  # Reset before build
        self._validate()
        self._composed = self._build()

    def _validate(self) -> None:
        """Override to add validation logic. Called before _build().

        Raise ValueError with descriptive message if validation fails.

        Example:
            def _validate(self) -> None:
                if self.t0 >= self.end:
                    raise ValueError("t0 must be less than end")
        """
        pass

    def _add_latency_tracking(
        self,
        compose: TSCompose,
        ifo: str,
        strain_source_element: Any,
        strain_pad_name: str,
    ) -> None:
        """Add latency tracking element for a single IFO.

        Call this in _build() after creating each strain source to add
        latency tracking. The strain source pad is connected directly to
        the Latency element. Since SGN supports multilink (one source pad
        to multiple sinks), the strain pad is also registered to be exposed
        externally via `also_expose_source_pads`.

        The latency output will appear as an additional source pad named
        "{ifo}_latency".

        Args:
            compose: The TSCompose object being built
            ifo: IFO name (e.g., "H1")
            strain_source_element: The source element producing strain data
            strain_pad_name: The pad name on the source element to tap

        Example:
            def _build(self) -> TSComposedSourceElement:
                compose = TSCompose()
                for ifo in self.ifos:
                    source = FakeSeriesSource(...)
                    compose.insert(source)
                    self._add_latency_tracking(compose, ifo, source, ifo)
                return compose.as_source(
                    name=self.name,
                    also_expose_source_pads=self._also_expose_pads,
                )
        """
        if self.latency_interval is None:
            return

        latency = Latency(
            name=f"{self.name}_{ifo}_latency",
            sink_pad_names=("data",),
            source_pad_names=(f"{ifo}_latency",),
            route=f"{ifo}_datasource_latency",
            interval=self.latency_interval,
        )

        # Connect strain source directly to latency element
        compose.connect(
            strain_source_element,
            latency,
            link_map={"data": strain_pad_name},
        )

        # Register the strain pad to be exposed externally (multilink pattern)
        # Format: "element_name:src:pad_name"
        pad_full_name = f"{strain_source_element.name}:src:{strain_pad_name}"
        self._also_expose_pads.append(pad_full_name)

    @abstractmethod
    def _build(self) -> TSComposedSourceElement:
        """Build and return the composed element.

        This method wires up the internal elements using TSCompose
        and returns the result of compose.as_source(name=self.name).

        Returns:
            TSComposedSourceElement with source pads for downstream connection
        """
        ...

    # --- CLI argument support ---

    @classmethod
    def add_cli_arguments(cls, parser: argparse.ArgumentParser) -> None:
        """Add CLI arguments for latency tracking."""
        parser.add_argument(
            "--source-latency-interval",
            type=float,
            metavar="SECONDS",
            default=None,
            help="Enable source latency tracking with specified interval in seconds",
        )

    @classmethod
    def get_cli_arg_names(cls) -> Set[str]:
        """Return set of CLI argument names defined by this class."""
        return {"source_latency_interval"}

    @classmethod
    def process_cli_args(cls, args: argparse.Namespace) -> Dict[str, Any]:
        """Convert CLI args to field values."""
        result: Dict[str, Any] = {}
        source_latency_interval = getattr(args, "source_latency_interval", None)
        if source_latency_interval is not None:
            result["latency_interval"] = source_latency_interval
        return result

    # --- Delegate to inner composed element ---

    @property
    def element(self) -> TSComposedSourceElement:
        """The underlying TSComposedSourceElement for pipeline integration.

        Use this when passing to Pipeline.connect() or other SGN operations
        that require a proper element type.

        Returns:
            The inner composed element

        Example:
            >>> pipeline = Pipeline()
            >>> pipeline.connect(source.element, sink)
        """
        return self._composed

    @property
    def srcs(self) -> Dict[str, Any]:
        """Source pads of the composed element.

        Returns:
            Dictionary mapping pad names to source pad objects
        """
        return self._composed.srcs

    def __getattr__(self, name: str) -> Any:
        """Delegate unknown attributes to the inner composed element.

        This allows composed sources to be used anywhere a TSComposedSourceElement
        is expected, supporting any additional methods or properties.
        """
        # Avoid infinite recursion for private attributes
        if name.startswith("_"):
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{name}'"
            )
        return getattr(self._composed, name)

element property

The underlying TSComposedSourceElement for pipeline integration.

Use this when passing to Pipeline.connect() or other SGN operations that require a proper element type.

Returns:

Type Description
TSComposedSourceElement

The inner composed element

Example

pipeline = Pipeline() pipeline.connect(source.element, sink)

srcs property

Source pads of the composed element.

Returns:

Type Description
Dict[str, Any]

Dictionary mapping pad names to source pad objects

__getattr__(name)

Delegate unknown attributes to the inner composed element.

This allows composed sources to be used anywhere a TSComposedSourceElement is expected, supporting any additional methods or properties.

Source code in sgnligo/sources/composed_base.py
268
269
270
271
272
273
274
275
276
277
278
279
def __getattr__(self, name: str) -> Any:
    """Forward lookups of unknown public attributes to the wrapped element.

    Python only calls this hook when normal attribute lookup fails, so any
    public name not found on the composed source is resolved on the inner
    composed element instead.

    Raises:
        AttributeError: If the name starts with an underscore; private
            names are never forwarded.
    """
    if not name.startswith("_"):
        return getattr(self._composed, name)

    # Private names stop here: forwarding them could recurse forever when
    # self._composed itself has not been set yet.
    message = f"'{type(self).__name__}' object has no attribute '{name}'"
    raise AttributeError(message)

__post_init__()

Validate parameters and build the composed element.

Source code in sgnligo/sources/composed_base.py
125
126
127
128
129
def __post_init__(self) -> None:
    """Validate parameters and build the composed element.

    The three steps are order-dependent: the exposed-pad list is cleared,
    then _validate() runs, and only then is _build() called and its result
    stored as the inner composed element.
    """
    # Start from an empty list so only pads registered during this build
    # are recorded.
    self._also_expose_pads = []
    # Validate first so a failure surfaces before any element is built.
    self._validate()
    self._composed = self._build()

add_cli_arguments(parser) classmethod

Add CLI arguments for latency tracking.

Source code in sgnligo/sources/composed_base.py
216
217
218
219
220
221
222
223
224
225
@classmethod
def add_cli_arguments(cls, parser: argparse.ArgumentParser) -> None:
    """Register the ``--source-latency-interval`` option on ``parser``.

    The option takes a float number of seconds and defaults to None.
    """
    option_settings = {
        "type": float,
        "metavar": "SECONDS",
        "default": None,
        "help": "Enable source latency tracking with specified interval in seconds",
    }
    parser.add_argument("--source-latency-interval", **option_settings)

get_cli_arg_names() classmethod

Return set of CLI argument names defined by this class.

Source code in sgnligo/sources/composed_base.py
227
228
229
230
@classmethod
def get_cli_arg_names(cls) -> Set[str]:
    """Return the CLI argument names this class defines.

    Returns:
        A new set containing "source_latency_interval".
    """
    arg_names: Set[str] = set()
    arg_names.add("source_latency_interval")
    return arg_names

process_cli_args(args) classmethod

Convert CLI args to field values.

Source code in sgnligo/sources/composed_base.py
232
233
234
235
236
237
238
239
@classmethod
def process_cli_args(cls, args: argparse.Namespace) -> Dict[str, Any]:
    """Translate parsed CLI arguments into dataclass field values.

    Args:
        args: Parsed namespace; ``source_latency_interval`` may be absent.

    Returns:
        ``{"latency_interval": value}`` when the option is set to a non-None
        value, otherwise an empty dict.
    """
    interval = getattr(args, "source_latency_interval", None)
    if interval is None:
        return {}
    return {"latency_interval": interval}

Utilities

sgnligo.sources.datasource_v2.sources.utils

Utility functions for composed sources.

This module contains reusable building blocks that are shared across multiple composed source classes.

add_state_vector_gating(compose, strain_source, state_source, ifo, bit_mask, strain_pad, state_pad, output_pad)

Add BitMask + Gate to a compose for state vector gating.

This is the common pattern used by devshm, arrakis, and gwdata-noise sources. It applies a bitmask to the state vector channel, then uses a Gate to control the strain data based on the masked state vector.

The pattern is

strain_source[strain_pad] ─────────────────┐
                                           ├─> Gate[output_pad]
state_source[state_pad] -> BitMask[state] ─┘

Parameters:

Name Type Description Default
compose TSCompose

TSCompose to add elements to (modified in-place)

required
strain_source

Source element providing strain data

required
state_source

Source element providing state vector data

required
ifo str

Interferometer prefix (e.g., "H1")

required
bit_mask int

Bitmask to apply to state vector

required
strain_pad str

Name of the strain output pad on strain_source

required
state_pad str

Name of the state vector output pad on state_source

required
output_pad str

Name for the gated output pad

required

Returns:

Type Description
Gate

The Gate element for downstream use (e.g., latency tracking)

Source code in sgnligo/sources/datasource_v2/sources/utils.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def add_state_vector_gating(
    compose: TSCompose,
    strain_source,
    state_source,
    ifo: str,
    bit_mask: int,
    strain_pad: str,
    state_pad: str,
    output_pad: str,
) -> Gate:
    """Wire a BitMask and a Gate into ``compose`` so the state vector gates strain.

    Shared building block for the devshm, arrakis, and gwdata-noise sources.
    The state vector channel is passed through a BitMask, and the masked
    result is used as the control input of a Gate that carries the strain.

    Wiring:
        strain_source[strain_pad] ─────────────────┐
                                                   ├─> Gate[output_pad]
        state_source[state_pad] -> BitMask[state] ─┘

    Args:
        compose: TSCompose to add elements to (modified in-place)
        strain_source: Source element providing strain data
        state_source: Source element providing state vector data
        ifo: Interferometer prefix (e.g., "H1"); used to name the new elements
        bit_mask: Bitmask to apply to state vector
        strain_pad: Name of the strain output pad on strain_source
        state_pad: Name of the state vector output pad on state_source
        output_pad: Name for the gated output pad

    Returns:
        The Gate element for downstream use (e.g., latency tracking)
    """
    # Element that filters the raw state vector with the requested bitmask.
    state_mask = BitMask(
        name=f"{ifo}_Mask",
        sink_pad_names=("state",),
        source_pad_names=("state",),
        bit_mask=bit_mask,
    )

    # Element that passes strain under control of the masked state vector.
    strain_gate = Gate(
        name=f"{ifo}_Gate",
        sink_pad_names=("strain", "state_vector"),
        control="state_vector",
        source_pad_names=(output_pad,),
    )

    # (upstream, downstream, link_map) triples, connected in this order:
    # state_source -> BitMask, BitMask -> Gate.state_vector,
    # strain_source -> Gate.strain.
    wiring = (
        (state_source, state_mask, {"state": state_pad}),
        (state_mask, strain_gate, {"state_vector": "state"}),
        (strain_source, strain_gate, {"strain": strain_pad}),
    )
    for upstream, downstream, pad_links in wiring:
        compose.connect(upstream, downstream, link_map=pad_links)

    return strain_gate