Skip to content

Frames

DataFrame dataclass

Bases: Generic[_S_co]

Shape-bound dataframe whose operations are type checked

Source code in typol/frame.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
@dataclasses.dataclass(init=False, eq=False, frozen=True)
class DataFrame(Generic[_S_co]):
    """Shape-bound dataframe whose operations are type checked"""

    shape: type[_S_co]
    dataframe: pl.DataFrame

    @overload
    def __init__(
        self,
        of: type[_S_co],
        /,
        values: Iterable[Mapping[str, Any]]
        | Iterable[tuple]
        | Iterable[ColumnInitializer[_S_co, Any]]
        | Mapping[BoundDimension[_S_co, Any], Iterable]
        | tuple[ColumnInitializer[_S_co, Any], ...]
        | pl.DataFrame
        | None = None,
    ) -> None: ...
    @overload
    def __init__(
        self, of: type[_S_co], /, values: Iterable[tuple], *, orient: Literal["row", "col"] = ...
    ) -> None: ...

    def __init__(
        self,
        shape: type[_S_co],
        /,
        values: Iterable[Mapping[str, Any]]
        | Iterable[tuple]
        | Iterable[ColumnInitializer[_S_co, Any]]
        | Mapping[BoundDimension[_S_co, Any], Iterable]
        | pl.DataFrame
        | None = None,
        *,
        orient: Literal["row", "col"] | None = None,
    ) -> None:
        """
        Build a frame bound to `shape` from one of the supported inputs:

        - a `pl.DataFrame`, which is passed through `enforce_shape`
        - a mapping of dimension (or plain column name) to the values of that column
        - an iterable of `ColumnInitializer`s, each providing the values of one column
        - an iterable of non-`dict` mappings, each of which is copied into a plain `dict`
        - any other iterable (e.g. of dicts or tuples), which is handed to Polars together with
          `orient`
        - `None` or an empty iterable, which gives an empty frame with the schema of the shape
        """
        meta = shape.shape_meta()
        if isinstance(values, pl.DataFrame):
            df = enforce_shape(shape, values)
        elif isinstance(values, Mapping):
            df = pl.DataFrame(
                {k.name if isinstance(k, BoundDimension) else k: vs for k, vs in values.items()},
                schema=meta.schema,
            )
        elif isinstance(values, Iterable):
            # Peek at the first element to decide how the remaining elements are interpreted
            iterator = iter(values)
            first = more_itertools.first(iterator, None)
            if first is None:
                # Nothing to peek at (or a leading `None`): build an empty frame
                df = pl.DataFrame(schema=meta.schema)
            elif isinstance(first, ColumnInitializer):
                # Re-attach `first` to the rest of the iterator: iterating `values` a second time
                # would silently drop it when `values` is a one-shot iterator such as a generator
                initializers = cast(
                    Iterable[ColumnInitializer[_S_co, Any]],
                    more_itertools.prepend(first, iterator),
                )
                df = pl.DataFrame(
                    {i.dimension.name: i.value for i in initializers}, schema=meta.schema
                )
            elif isinstance(first, Mapping) and type(first) is not dict:
                values = cast(Iterable[Mapping], more_itertools.prepend(first, iterator))
                df = pl.DataFrame((dict(v) for v in values), schema=meta.schema)
            else:
                # A collection can be iterated again from the start, so it is passed untouched;
                # anything else needs the peeked element put back in front
                df = pl.DataFrame(
                    values
                    if isinstance(values, Collection)
                    else more_itertools.prepend(first, iterator),
                    schema=meta.schema,
                    orient=orient,
                )
        elif values is None:
            df = pl.DataFrame(schema=meta.schema)
        else:
            assert_never(values)
        # The dataclass is frozen, so the fields have to be set through `object.__setattr__`
        object.__setattr__(self, "shape", shape)
        object.__setattr__(self, "dataframe", df)

    @property
    def s(self) -> _S_co:
        """The bound shape, presented as `_S_co` rather than `type[_S_co]` through a cast"""
        bound_shape = self.shape
        return cast(_S_co, bound_shape)

    def __getitem__[T](self, s: ExoExpr[_S_co, T] | ExoAggExpr[_S_co, T]) -> Series[T]:
        """Construct a series of the value of expr `s` for each row in the frame"""
        if not isinstance(s, BoundDimension):
            return Series[T](self.dataframe.select(s.expr).to_series())
        return self.get_column(s)

    def get_column[T](self, s: BoundDimension[_S_co, T]) -> Series[T]:
        return Series[T](self.dataframe[s.name])

    def head(self, n: int = 5) -> DataFrame[_S_co]:
        """Apply Polars `head(n)` to the underlying frame, keeping the same shape"""
        leading = self.dataframe.head(n)
        return DataFrame(self.shape, leading)

    def slice(self, offset: int, length: int | None = None) -> DataFrame[_S_co]:
        """Apply Polars `slice(offset, length)` to the underlying frame, keeping the same shape"""
        window = self.dataframe.slice(offset, length)
        return DataFrame(self.shape, window)

    def lazy(self) -> LazyFrame[_S_co]:
        """Wrap the lazy view of the underlying Polars frame in a `LazyFrame` of the same shape"""
        # Imported here rather than at module level to avoid circular dependencies
        from typol.lazy import LazyFrame  # noqa: PLC0415,RUF100

        deferred = self.dataframe.lazy()
        return LazyFrame(self.shape, deferred)

    def collect(self, streaming: bool = False) -> DataFrame[_S_co]:
        """Return this frame itself; `streaming` is accepted but not used"""
        return self

    def filter(self, *condition: ExoExpr[_S_co, bool]) -> DataFrame[_S_co]:
        """Keep only the rows for which the given boolean conditions evaluate to `True`"""
        predicates = tuple(c.expr for c in condition)
        return DataFrame(self.shape, self.dataframe.filter(*predicates))

    def with_columns(
        self, *columns: EndoExpr[_S_co, Any] | BoundSeries[_S_co, Any]
    ) -> DataFrame[_S_co]:
        """Update existing columns of the shape using the provided expressions"""
        updates = [c.expr for c in columns]
        return DataFrame(self.shape, self.dataframe.with_columns(updates))

    def transform[SNew: Shape](
        self, shape: type[SNew], *transforms: Expr[_S_co, SNew, Any]
    ) -> DataFrame[SNew]:
        """
        Convert from one shape to another shape, using the provided expressions to map columns in
        the current shape to columns in the new shape:

        - Any columns with the same name in both the current and new shapes without an expression
          mapping to them will be preserved
        - Any columns in the new shape not in the original shape, and not mapped to, will throw a
          runtime error
        - Any columns in the current shape not in the new shape will be dropped
        """
        return self.lazy().transform(shape, *transforms).collect()

    def agg(self, *agg: EndoAggExpr[_S_co, Any]) -> DataFrame[_S_co]:
        """
        Group rows of the dataframe using the given aggregating expressions. Since all columns
        must be preserved, any column that is not aggregated is treated as a group by key. To
        drop columns instead use `transform` before `agg`
        """
        aggregated = self.lazy().agg(*agg)
        return aggregated.collect()

    def group_by(self, *keys: EndoExpr[_S_co, Any]) -> GroupBy[_S_co, _S_co]:
        """
        Choose the expressions to group the dataframe by. This should be followed by an agg to
        apply aggregations to the grouped frame
        """
        key_exprs = tuple(k.expr for k in keys)
        grouped = self.dataframe.group_by(*key_exprs)
        return GroupBy(self.shape, grouped)

    def agg_transform[Q: Shape](
        self, shape: type[Q], *agg: AggExpr[_S_co, Q, Any] | Expr[_S_co, Q, Any]
    ) -> DataFrame[Q]:
        """
        Define the aggregating expressions to group rows in the dataframe. Any columns not
        aggregated will be treated as the group by keys, since all columns must be preserved. To
        drop columns instead use `transform` before `agg`.

        This allows transforming the aggregated columns since aggregation may change types
        """
        return self.lazy().agg_transform(shape, *agg).collect()

    def group_by_transform[Q: Shape](
        self, shape: type[Q], *keys: Expr[_S_co, Q, Any]
    ) -> GroupBy[_S_co, Q]:
        """
        Determine a series of expressions to group the dataframe by, this should be followed by an
        agg to apply aggregations to the grouped frame
        """
        return GroupBy(shape, self.dataframe.group_by(*(k.expr for k in keys)))

    def explode(self, *explosions: Explosion[_S_co, _S_co, Any]) -> DataFrame[_S_co]:
        """
        Given a series of list columns, produce a new row for each value in the list:

        ```
        accounts.explode(
            Account.link_name.implode().over(Account.type).list.explode_to(Account.link_name)
        )
        ```

        The example creates a new row for every linked name from any account for the same type
        """
        exploded = self.lazy().explode(*explosions)
        return exploded.collect()

    def explode_transform[Q: Shape](
        self, shape: type[Q], *explosions: Explosion[_S_co, Q, Any] | Expr[_S_co, Q, Any]
    ) -> DataFrame[Q]:
        """Take a series of list columns and create a new row for each value in the list"""
        return self.lazy().explode_transform(shape, *explosions).collect()

    def vstack(self, other: DataFrame[_S_co]) -> DataFrame[_S_co]:
        """Stack `other` onto this frame with Polars `vstack`, keeping the same shape"""
        # Both frames share a shape, but their column order is not known to agree, so `other` is
        # first re-selected in the column order of this frame
        aligned = other.dataframe.select(self.dataframe.columns)
        return DataFrame(self.shape, self.dataframe.vstack(aligned))

    @classmethod
    def concat(cls, shape: type[_S_co], frames: Iterable[Self]) -> DataFrame[_S_co]:
        """
        Concatenate `frames` into a single frame of `shape`, by delegating to `LazyFrame.concat`
        on the lazy view of each frame and collecting the result
        """
        # Lazy import to avoid circular dependencies, exactly as `lazy()` does. Without it this
        # method depends on `LazyFrame` being bound at module level at runtime
        from typol.lazy import LazyFrame  # noqa: PLC0415,RUF100

        return LazyFrame.concat(shape, map(DataFrame.lazy, frames)).collect()

    def rechunk(self) -> DataFrame[_S_co]:
        """Apply Polars `rechunk()` to the underlying frame, keeping the same shape"""
        rechunked = self.dataframe.rechunk()
        return DataFrame(self.shape, rechunked)

    def unique(
        self,
        *exprs: BoundDimension[_S_co, Any],
        keep: Literal["first", "last", "none", "any"] = "any",
        maintain_order: bool = False,
    ) -> DataFrame[_S_co]:
        """Delegate to `LazyFrame.unique` with the same arguments and collect the result"""
        deduplicated = self.lazy().unique(*exprs, keep=keep, maintain_order=maintain_order)
        return deduplicated.collect()

    def sort(
        self,
        *exprs: ExoExpr[_S_co, Any],
        descending: tuple[bool, ...] | bool = False,
        nulls_last: tuple[bool, ...] | bool = False,
        maintain_order: bool = False,
    ) -> DataFrame[_S_co]:
        """Delegate to `LazyFrame.sort` with the same arguments and collect the result"""
        ordered = self.lazy().sort(
            *exprs, descending=descending, nulls_last=nulls_last, maintain_order=maintain_order
        )
        return ordered.collect()

    def iter_raw(self) -> Iterator[tuple[Any, ...]]:
        """Iterate over the rows as tuples of values. Use `iter_rows` for well-typed access"""
        unnamed_rows = self.dataframe.iter_rows(named=False)
        return unnamed_rows

    def iter_dicts(self) -> Iterator[dict[str, Any]]:
        """
        Iterate over the rows as dictionaries of column name to value. Use `iter_rows` for
        well-typed access
        """
        named_rows = self.dataframe.iter_rows(named=True)
        return named_rows

    def to_dicts(self) -> list[dict[str, Any]]:
        """
        Return every row of the frame as a dictionary of column name to value, collected in a
        list. Particularly useful when debugging, to get a Python object out of a Polars frame
        """
        rows = self.dataframe.to_dicts()
        return rows

    def iter_rows(self) -> Iterator[Row[_S_co]]:
        """
        Iterate over the frame as shape-typed `Row`s. Fields of these rows can be accessed in a
        well typed manner, using `row[S.column]`, which will have the right output type
        """
        named_rows = self.dataframe.iter_rows(named=True)
        return (Row(named) for named in named_rows)

    def is_empty(self) -> bool:
        """Report the result of `is_empty()` on the underlying Polars frame"""
        empty = self.dataframe.is_empty()
        return empty

    def __len__(self) -> int:
        """Count of rows held by the dataframe"""
        row_count = len(self.dataframe)
        return row_count

    def equals(self, other: DataFrame[_S_co]) -> bool:
        """Compare the underlying Polars frames of `self` and `other` with Polars `equals`"""
        theirs = other.dataframe
        return self.dataframe.equals(theirs)

    def suffix(self, suffixed: type[Suffixed[_S_co]] | None = None) -> DataFrame[Suffixed[_S_co]]:
        """Delegate to `LazyFrame.suffix` with the same argument and collect the result"""
        renamed = self.lazy().suffix(suffixed)
        return renamed.collect()

    def gather_every(self, n: int, offset: int = 0) -> DataFrame[_S_co]:
        """Apply Polars `gather_every(n, offset)` to the underlying frame, keeping the shape"""
        sampled = self.dataframe.gather_every(n, offset)
        return DataFrame(self.shape, sampled)

    def shift(self, n: int) -> DataFrame[_S_co]:
        """Apply Polars `shift(n)` to the underlying frame, keeping the same shape"""
        shifted = self.dataframe.shift(n)
        return DataFrame(self.shape, shifted)

    @overload
    def glimpse(self, *, return_type: Literal["string"]) -> str: ...
    @overload
    def glimpse(self, *, return_type: Literal["frame"]) -> pl.DataFrame: ...
    @overload
    def glimpse(self, *, return_type: None = None) -> None: ...

    def glimpse(
        self, *, return_type: Literal["string", "frame"] | None = None
    ) -> str | pl.DataFrame | None:
        """
        Dense preview of the DataFrame, delegated to Polars `glimpse`. The overloads declare a
        `str` for `"string"`, a `pl.DataFrame` for `"frame"` and `None` when no type is given
        """
        preview = self.dataframe.glimpse(return_type=return_type)
        return preview

    def pipe[**P, T](
        self, function: Callable[Concatenate[Self, P], T], *args: P.args, **kwargs: P.kwargs
    ) -> T:
        return function(self, *args, **kwargs)

    @classmethod
    def read_csv(
        cls,
        shape: type[_S_co],
        source: IO[str] | str | bytes | Path | IO[bytes],
        mappings: Mapping[BoundDimension[_S_co, Any], str] | None = None,
        *,
        has_header: bool = True,
        skip_rows: int = 0,
    ) -> Self:
        """
        Read a CSV into a frame of `shape`.

        Parameters
        ----------
        shape
            The shape the resulting frame is bound to
        source
            The CSV input, forwarded to `pl.read_csv`
        mappings
            Which file header holds each dimension of the shape. When omitted, headers are
            matched to dimension names case-insensitively, ignoring underscores and any
            non-alphanumeric characters. Not used when `has_header` is `False`
        has_header
            When `False`, the schema of the shape is passed to Polars as the schema of the file
        skip_rows
            Forwarded to `pl.read_csv`

        Raises
        ------
        KeyError
            When `mappings` is omitted and a dimension has no matching header in the file
        """
        meta = shape.shape_meta()
        if not has_header:
            return cls(
                shape,
                pl.read_csv(
                    source,
                    schema=meta.schema,
                    has_header=False,
                    ignore_errors=True,
                    truncate_ragged_lines=True,
                    try_parse_dates=True,
                    skip_rows=skip_rows,
                    infer_schema=False,
                ),
            )

        dimensions = tuple(meta.dimensions)
        if mappings is None:
            # The default is to match up the columns from the file with the dimensions from the
            # shape alphanumerically case-insensitively
            def normalise(name: str) -> str:
                return re.sub(r"[\W_]", "", name).lower()

            # NOTE(review): `source` is read here and again below; presumably a stream input
            # would need rewinding in between - confirm against how Polars consumes file objects
            headers_only = pl.read_csv(source, infer_schema_length=0, skip_rows=skip_rows, n_rows=0)
            columns = {normalise(h): h for h in headers_only.columns}
            missing = [d.name for d in dimensions if normalise(d.name) not in columns]
            if missing:
                # Still a KeyError as before, but naming the dimensions and the headers involved
                raise KeyError(
                    f"No CSV header matches dimension(s) {missing}; "
                    f"available headers: {headers_only.columns}"
                )
            mappings = {d: columns[normalise(d.name)] for d in dimensions}

        return cls(
            shape,
            pl.read_csv(
                source,
                columns=list(mappings.values()),
                schema_overrides={mappings[d]: d.ty.pl_ty for d in dimensions},
                has_header=True,
                ignore_errors=True,
                truncate_ragged_lines=True,
                try_parse_dates=True,
                skip_rows=skip_rows,
                infer_schema=False,
            ).rename({mappings[d]: d.name for d in dimensions}),
        )

    @overload
    def write_csv(
        self,
        sink: None = None,
        mappings: Mapping[BoundDimension[_S_co, Any], str]
        | Sequence[BoundDimension[_S_co, Any]]
        | None = None,
        *,
        include_header: bool = True,
        null_marker: str | None = None,
        quote_style: CsvQuoteStyle | None = None,
        float_scientific: bool | None = None,
        float_precision: int | None = None,
        line_terminator: str = "\n",
    ) -> str: ...
    @overload
    def write_csv(
        self,
        sink: IO[str] | str | Path | IO[bytes],
        mappings: Mapping[BoundDimension[_S_co, Any], str]
        | Sequence[BoundDimension[_S_co, Any]]
        | None = None,
        *,
        include_header: bool = True,
        null_marker: str | None = None,
        quote_style: CsvQuoteStyle | None = None,
        float_scientific: bool | None = None,
        float_precision: int | None = None,
        line_terminator: str = "\n",
    ) -> None: ...

    def write_csv(
        self,
        sink: IO[str] | str | Path | IO[bytes] | None = None,
        mappings: Mapping[BoundDimension[_S_co, Any], str]
        | Sequence[BoundDimension[_S_co, Any]]
        | None = None,
        *,
        include_header: bool = True,
        null_marker: str | None = None,
        quote_style: CsvQuoteStyle | None = None,
        float_scientific: bool | None = None,
        float_precision: int | None = None,
        line_terminator: str = "\n",
    ) -> str | None:
        """
        Output the dataframe as CSV to `sink`; per the overloads, the CSV text is returned when
        no sink is given. By default, this uses the column names in the Shape. Use `mappings` to
        select which columns to output in order and rename them, e.g.

        ```
        {
            Account.name: "Account Name",
            Account.broker: "Broker"
        }
        ```

        or just provide a sequence of relevant columns in order: `(Account.name, Account.broker)`

        `null_marker` is passed to Polars as `null_value`; the remaining keyword arguments are
        forwarded to Polars `write_csv` under their own names
        """
        dataframe = self.dataframe
        if isinstance(mappings, Sequence):
            dataframe = dataframe.select(c.name for c in mappings)
        elif isinstance(mappings, Mapping):
            # Select and rename in a single step. Aliasing inside the select only touches the
            # chosen columns, so a header may reuse the name of a column that is not written
            dataframe = dataframe.select(pl.col(d.name).alias(h) for d, h in mappings.items())

        return dataframe.write_csv(
            sink,
            include_header=include_header,
            null_value=null_marker,
            quote_style=quote_style,
            float_scientific=float_scientific,
            float_precision=float_precision,
            line_terminator=line_terminator,
        )

    def write_csv_of(
        self,
        sink: IO[str] | str | Path | IO[bytes],
        *exprs: ExoExpr[_S_co, Any],
        include_header: bool = True,
        null_marker: str | None = None,
        quote_style: CsvQuoteStyle | None = None,
        float_scientific: bool | None = None,
        float_precision: int | None = None,
        line_terminator: str = "\n",
    ) -> None:
        """
        Output the given expressions to a CSV. This is useful to apply a final transformation to the
        dataframe (e.g. for formatting or tidying up), without having to define a new `Shape`.

        The columns will be named based on the source shape unless renamed, use `to_out("...")` to
        rename the columns to an arbitrary value:

        ```
        df.write_csv_of(
            output_path,
            Account.balance.round(5).to_out("balance"),
            Account.closed.dt.strftime("%d/%m/%Y").to_out("Closed On"),
            Account.code,  # column will just be "code"
        )
        ```

        `null_marker` is passed to Polars as `null_value`; the remaining keyword arguments are
        forwarded to Polars `write_csv` under their own names
        """
        # Only the selected expressions are written, in the order they were given
        dataframe = self.dataframe.select(e.expr for e in exprs)

        dataframe.write_csv(
            sink,
            include_header=include_header,
            null_value=null_marker,
            quote_style=quote_style,
            float_scientific=float_scientific,
            float_precision=float_precision,
            line_terminator=line_terminator,
        )

    def __dataframe__(self, nan_as_null: bool = False, allow_copy: bool = True) -> PolarsDataFrame:
        """Compatibility with the Dataframe Interchange Protocol (deprecated)"""
        interchange = self.dataframe.__dataframe__(nan_as_null, allow_copy)  # ty: ignore[deprecated]
        return interchange

    def join_asof[Q: Shape](
        self,
        right: DataFrame[Q],
        on: ExoExpr[_S_co | Q, Any] | JoinOn[_S_co, Q, Any],
        strategy: Literal["backward", "forward", "nearest"] = "backward",
    ) -> DataFrame[Intersection[_S_co, Q]]:
        """
        Join two tables into a common shape, by nearest

        Parameters
        ----------
        on : BoundDimension[S, _]
            Join on the same columns for the left and the right shapes based on the joint shape.
            The column must be available in both original shapes
        """
        joined = self.dataframe.join_asof(
            right.dataframe,
            left_on=(on.left if isinstance(on, JoinOn) else on).expr,
            right_on=(on.right if isinstance(on, JoinOn) else on).expr,
            strategy=strategy,
        )
        return DataFrame["Intersection[_S_co, Q]"](self.shape & right.shape, joined)

    def join[Q: Shape](
        self,
        right: DataFrame[Q],
        *on: ExoExpr[_S_co | Q, Any] | JoinOn[_S_co, Q, Any],
        how: Literal["inner", "left", "right", "full", "semi", "anti", "cross", "outer"] = "inner",
    ) -> DataFrame[Intersection[_S_co, Q]]:
        """
        Join two tables into a common shape

        Parameters
        ----------
        on : BoundDimension[S, _]
            Join on the same columns for the left and the right shapes based on the joint shape.
            The column must be available in both original shapes
        """
        return self.lazy().join(right.lazy(), *on, how=how).collect()

__dataframe__(nan_as_null=False, allow_copy=True)

Deprecated compatibility with the Dataframe Interchange Protocol

Source code in typol/frame.py
def __dataframe__(self, nan_as_null: bool = False, allow_copy: bool = True) -> PolarsDataFrame:
    """Compatibility with the Dataframe Interchange Protocol (deprecated)"""
    interchange = self.dataframe.__dataframe__(nan_as_null, allow_copy)  # ty: ignore[deprecated]
    return interchange

__getitem__(s)

Construct a series of the value of expr s for each row in the frame

Source code in typol/frame.py
def __getitem__[T](self, s: ExoExpr[_S_co, T] | ExoAggExpr[_S_co, T]) -> Series[T]:
    """Construct a series of the value of expr `s` for each row in the frame"""
    if not isinstance(s, BoundDimension):
        return Series[T](self.dataframe.select(s.expr).to_series())
    return self.get_column(s)

__len__()

The number of rows in the dataframe

Source code in typol/frame.py
def __len__(self) -> int:
    """Count of rows held by the dataframe"""
    row_count = len(self.dataframe)
    return row_count

agg(*agg)

Define the aggregating expressions to group rows in the dataframe. Any columns not aggregated will be treated as the group by keys, since all columns must be preserved. To drop columns instead use transform before agg

Source code in typol/frame.py
def agg(self, *agg: EndoAggExpr[_S_co, Any]) -> DataFrame[_S_co]:
    """
    Group rows of the dataframe using the given aggregating expressions. Since all columns
    must be preserved, any column that is not aggregated is treated as a group by key. To
    drop columns instead use `transform` before `agg`
    """
    aggregated = self.lazy().agg(*agg)
    return aggregated.collect()

agg_transform(shape, *agg)

Define the aggregating expressions to group rows in the dataframe. Any columns not aggregated will be treated as the group by keys, since all columns must be preserved. To drop columns instead use transform before agg.

This allows transforming the aggregated columns since aggregation may change types

Source code in typol/frame.py
def agg_transform[Q: Shape](
    self, shape: type[Q], *agg: AggExpr[_S_co, Q, Any] | Expr[_S_co, Q, Any]
) -> DataFrame[Q]:
    """
    Define the aggregating expressions to group rows in the dataframe. Any columns not
    aggregated will be treated as the group by keys, since all columns must be preserved. To
    drop columns instead use `transform` before `agg`.

    This allows transforming the aggregated columns since aggregation may change types
    """
    return self.lazy().agg_transform(shape, *agg).collect()

explode(*explosions)

Take a series of list columns and create a new row for each value in the list:

accounts.explode(
    Account.link_name.implode().over(Account.type).list.explode_to(Account.link_name)
)

The above will create a new row for every linked name from any account for the same type

Source code in typol/frame.py
def explode(self, *explosions: Explosion[_S_co, _S_co, Any]) -> DataFrame[_S_co]:
    """
    Given a series of list columns, produce a new row for each value in the list:

    ```
    accounts.explode(
        Account.link_name.implode().over(Account.type).list.explode_to(Account.link_name)
    )
    ```

    The example creates a new row for every linked name from any account for the same type
    """
    exploded = self.lazy().explode(*explosions)
    return exploded.collect()

explode_transform(shape, *explosions)

Take a series of list columns and create a new row for each value in the list

Source code in typol/frame.py
def explode_transform[Q: Shape](
    self, shape: type[Q], *explosions: Explosion[_S_co, Q, Any] | Expr[_S_co, Q, Any]
) -> DataFrame[Q]:
    """Take a series of list columns and create a new row for each value in the list"""
    return self.lazy().explode_transform(shape, *explosions).collect()

filter(*condition)

Only keep rows where the boolean conditions evaluate to True

Source code in typol/frame.py
def filter(self, *condition: ExoExpr[_S_co, bool]) -> DataFrame[_S_co]:
    """Keep only the rows for which the given boolean conditions evaluate to `True`"""
    predicates = tuple(c.expr for c in condition)
    return DataFrame(self.shape, self.dataframe.filter(*predicates))

glimpse(*, return_type=None)

glimpse(*, return_type: Literal['string']) -> str
glimpse(*, return_type: Literal['frame']) -> pl.DataFrame
glimpse(*, return_type: None = None) -> None

Print a dense preview of the DataFrame

Source code in typol/frame.py
def glimpse(
    self, *, return_type: Literal["string", "frame"] | None = None
) -> str | pl.DataFrame | None:
    """Dense preview of the DataFrame, delegated to Polars `glimpse` with `return_type`"""
    preview = self.dataframe.glimpse(return_type=return_type)
    return preview

group_by(*keys)

Determine a series of expressions to group the dataframe by, this should be followed by an agg to apply aggregations to the grouped frame

Source code in typol/frame.py
def group_by(self, *keys: EndoExpr[_S_co, Any]) -> GroupBy[_S_co, _S_co]:
    """
    Choose the expressions to group the dataframe by. This should be followed by an agg to
    apply aggregations to the grouped frame
    """
    key_exprs = tuple(k.expr for k in keys)
    grouped = self.dataframe.group_by(*key_exprs)
    return GroupBy(self.shape, grouped)

group_by_transform(shape, *keys)

Determine a series of expressions to group the dataframe by, this should be followed by an agg to apply aggregations to the grouped frame

Source code in typol/frame.py
def group_by_transform[Q: Shape](
    self, shape: type[Q], *keys: Expr[_S_co, Q, Any]
) -> GroupBy[_S_co, Q]:
    """
    Determine a series of expressions to group the dataframe by, this should be followed by an
    agg to apply aggregations to the grouped frame
    """
    return GroupBy(shape, self.dataframe.group_by(*(k.expr for k in keys)))

iter_dicts()

Yield each row of the frame as a dictionary of column name to value. Use iter_rows for well-typed access

Source code in typol/frame.py
def iter_dicts(self) -> Iterator[dict[str, Any]]:
    """
    Iterate over the rows as dictionaries of column name to value. Use `iter_rows` for
    well-typed access
    """
    named_rows = self.dataframe.iter_rows(named=True)
    return named_rows

iter_raw()

Yield each row of the frame as a tuple of values. Use iter_rows for well-typed access

Source code in typol/frame.py
def iter_raw(self) -> Iterator[tuple[Any, ...]]:
    """Iterate over the rows as tuples of values. Use `iter_rows` for well-typed access"""
    unnamed_rows = self.dataframe.iter_rows(named=False)
    return unnamed_rows

iter_rows()

Yield a shape-typed Row for each row in the frame. Access to fields of these rows can be done in a well typed manner, using row[S.column], which will have the right output type

Source code in typol/frame.py
def iter_rows(self) -> Iterator[Row[_S_co]]:
    """
    Iterate over the frame as shape-typed `Row`s. Fields of these rows can be accessed in a
    well typed manner, using `row[S.column]`, which will have the right output type
    """
    named_rows = self.dataframe.iter_rows(named=True)
    return (Row(named) for named in named_rows)

join(right, *on, how='inner')

Join two tables into a common shape

Parameters

on : BoundDimension[S, _] Join on the same columns for the left and the right shapes based on the joint shape. The column must be available in both original shapes

Source code in typol/frame.py
def join[Q: Shape](
    self,
    right: DataFrame[Q],
    *on: ExoExpr[_S_co | Q, Any] | JoinOn[_S_co, Q, Any],
    how: Literal["inner", "left", "right", "full", "semi", "anti", "cross", "outer"] = "inner",
) -> DataFrame[Intersection[_S_co, Q]]:
    """
    Join two tables into a common shape

    Parameters
    ----------
    on : BoundDimension[S, _]
        Join on the same columns for the left and the right shapes based on the joint shape.
        The column must be available in both original shapes
    """
    return self.lazy().join(right.lazy(), *on, how=how).collect()

join_asof(right, on, strategy='backward')

Join two tables into a common shape, matching rows on the nearest key rather than on equal keys

Parameters

on : BoundDimension[S, _] Join on the same columns for the left and the right shapes based on the joint shape. The column must be available in both original shapes

Source code in typol/frame.py
def join_asof[Q: Shape](
    self,
    right: DataFrame[Q],
    on: ExoExpr[_S_co | Q, Any] | JoinOn[_S_co, Q, Any],
    strategy: Literal["backward", "forward", "nearest"] = "backward",
) -> DataFrame[Intersection[_S_co, Q]]:
    """
    Join two tables into a common shape, by nearest

    Parameters
    ----------
    on : BoundDimension[S, _]
        Join on the same columns for the left and the right shapes based on the joint shape.
        The column must be available in both original shapes
    """
    joined = self.dataframe.join_asof(
        right.dataframe,
        left_on=(on.left if isinstance(on, JoinOn) else on).expr,
        right_on=(on.right if isinstance(on, JoinOn) else on).expr,
        strategy=strategy,
    )
    return DataFrame["Intersection[_S_co, Q]"](self.shape & right.shape, joined)

to_dicts()

Return a list of the rows of the frame as a dictionary of column name to value. This is particularly useful for debugging for getting a Python object out of a Polars frame

Source code in typol/frame.py
def to_dicts(self) -> list[dict[str, Any]]:
    """
    Return every row of the frame as a dictionary of column name to value, collected in a
    list. Particularly useful when debugging, to get a Python object out of a Polars frame
    """
    rows = self.dataframe.to_dicts()
    return rows

transform(shape, *transforms)

Convert from one shape to another shape, using the provided expressions to map columns in the current shape to columns in the new shape:

  • Any columns with the same name in both the current and new shapes without an expression mapping to them will be preserved
  • Any columns in the new shape not in the original shape, and not mapped to, will throw a runtime error
  • Any columns in the current shape not in the new shape will be dropped
Source code in typol/frame.py
def transform[SNew: Shape](
    self, shape: type[SNew], *transforms: Expr[_S_co, SNew, Any]
) -> DataFrame[SNew]:
    """
    Convert from one shape to another shape, using the provided expressions to map columns in
    the current shape to columns in the new shape:

    - Any columns with the same name in both the current and new shapes without an expression
      mapping to them will be preserved
    - Any columns in the new shape not in the original shape, and not mapped to, will throw a
      runtime error
    - Any columns in the current shape not in the new shape will be dropped
    """
    return self.lazy().transform(shape, *transforms).collect()

with_columns(*columns)

Use the provided expressions to update existing columns in the shape

Source code in typol/frame.py
def with_columns(
    self, *columns: EndoExpr[_S_co, Any] | BoundSeries[_S_co, Any]
) -> DataFrame[_S_co]:
    """Replace existing columns of the shape with the values produced by `columns`"""
    updated = self.dataframe.with_columns([column.expr for column in columns])
    return DataFrame(self.shape, updated)

write_csv(sink=None, mappings=None, *, include_header=True, null_marker=None, quote_style=None, float_scientific=None, float_precision=None, line_terminator='\n')

write_csv(
    sink: None = None,
    mappings: Mapping[BoundDimension[_S_co, Any], str]
    | Sequence[BoundDimension[_S_co, Any]]
    | None = None,
    *,
    include_header: bool = True,
    null_marker: str | None = None,
    quote_style: CsvQuoteStyle | None = None,
    float_scientific: bool | None = None,
    float_precision: int | None = None,
    line_terminator: str = "\n",
) -> str
write_csv(
    sink: IO[str] | str | Path | IO[bytes],
    mappings: Mapping[BoundDimension[_S_co, Any], str]
    | Sequence[BoundDimension[_S_co, Any]]
    | None = None,
    *,
    include_header: bool = True,
    null_marker: str | None = None,
    quote_style: CsvQuoteStyle | None = None,
    float_scientific: bool | None = None,
    float_precision: int | None = None,
    line_terminator: str = "\n",
) -> None

Output the dataframe to a file. By default, this uses the column names in the Shape. Use mappings to select which columns to output in order and rename them, e.g.

{
    Account.name: "Account Name",
    Account.broker: "Broker"
}

or just provide a sequence of relevant columns in order: (Account.name, Account.broker)

Source code in typol/frame.py
def write_csv(
    self,
    sink: IO[str] | str | Path | IO[bytes] | None = None,
    mappings: Mapping[BoundDimension[_S_co, Any], str]
    | Sequence[BoundDimension[_S_co, Any]]
    | None = None,
    *,
    include_header: bool = True,
    null_marker: str | None = None,
    quote_style: CsvQuoteStyle | None = None,
    float_scientific: bool | None = None,
    float_precision: int | None = None,
    line_terminator: str = "\n",
) -> str | None:
    """
    Output the dataframe as CSV. By default, this uses the column names in the Shape. Use
    `mappings` to select which columns to output in order and rename them, e.g.

    ```
    {
        Account.name: "Account Name",
        Account.broker: "Broker"
    }
    ```

    or just provide a sequence of relevant columns in order: `(Account.name, Account.broker)`

    The result of the underlying Polars `write_csv` call is returned as-is; per the overloads
    this is the CSV text when `sink` is `None` and `None` otherwise. `null_marker` is passed to
    Polars as `null_value`; the remaining keyword arguments are forwarded under the same names.
    """
    dataframe = self.dataframe
    if isinstance(mappings, Sequence):
        dataframe = dataframe.select([c.name for c in mappings])
    elif isinstance(mappings, Mapping):
        # Project and rename in a single select. Renaming first and selecting afterwards fails
        # when a header equals the name of a column that is not part of the mapping, and needed
        # a lazy/collect round trip for no benefit
        dataframe = dataframe.select([pl.col(d.name).alias(h) for d, h in mappings.items()])

    return dataframe.write_csv(
        sink,
        include_header=include_header,
        null_value=null_marker,
        quote_style=quote_style,
        float_scientific=float_scientific,
        float_precision=float_precision,
        line_terminator=line_terminator,
    )

write_csv_of(sink, *exprs, include_header=True, null_marker=None, quote_style=None, float_scientific=None, float_precision=None, line_terminator='\n')

Output the given expressions to a CSV. This is useful to apply a final transformation to the dataframe (e.g. for formatting or tidying up), without having to define a new Shape.

The columns will be named based on the source shape unless renamed, use to_out("...") to rename the columns to an arbitrary value:

```
df.write_csv_of(
    output_path,
    Account.balance.round(5).to_out("balance"),
    Account.closed.dt.strftime("%d/%m/%Y").to_out("Closed On"),
    Account.code,  # column will just be "code"
)
```

Source code in typol/frame.py
def write_csv_of(
    self,
    sink: IO[str] | str | Path | IO[bytes],
    *exprs: ExoExpr[_S_co, Any],
    include_header: bool = True,
    null_marker: str | None = None,
    quote_style: CsvQuoteStyle | None = None,
    float_scientific: bool | None = None,
    float_precision: int | None = None,
    line_terminator: str = "\n",
) -> None:
    """
    Output the given expressions to a CSV. This is useful to apply a final transformation to the
    dataframe (e.g. for formatting or tidying up), without having to define a new `Shape`.

    The columns will be named based on the source shape unless renamed, use `to_out("...")` to
    rename the columns to an arbitrary value:

    ```
    df.write_csv_of(
        output_path,
        Account.balance.round(5).to_out("balance"),
        Account.closed.dt.strftime("%d/%m/%Y").to_out("Closed On"),
        Account.code,  # column will just be "code"
    )
    ```

    `null_marker` is passed to Polars as `null_value`; the remaining keyword arguments are
    forwarded to the Polars `write_csv` call under the same names.
    """
    dataframe = self.dataframe.select([e.expr for e in exprs])

    dataframe.write_csv(
        sink,
        include_header=include_header,
        null_value=null_marker,
        quote_style=quote_style,
        float_scientific=float_scientific,
        float_precision=float_precision,
        line_terminator=line_terminator,
    )

GroupBy dataclass

Source code in typol/frame.py
@dataclasses.dataclass
class GroupBy[S: Shape, Q: Shape]:
    shape: type[Q]
    group_by: pl.dataframe.frame.GroupBy

    def agg(self, *agg: AggExpr[S, Q, Any]) -> DataFrame[Q]:
        """Define the aggregating expressions to group rows in the dataframe"""
        return DataFrame(self.shape, self.group_by.agg(*(e.expr for e in agg)))

agg(*agg)

Define the aggregating expressions to group rows in the dataframe

Source code in typol/frame.py
def agg(self, *agg: AggExpr[S, Q, Any]) -> DataFrame[Q]:
    """Apply the aggregating expressions to each group and wrap the result in `shape`"""
    aggregations = [e.expr for e in agg]
    return DataFrame(self.shape, self.group_by.agg(*aggregations))

enforce_shape(shape, dataframe)

enforce_shape(
    shape: type[S], dataframe: pl.DataFrame
) -> pl.DataFrame
enforce_shape(
    shape: type[S], dataframe: pl.LazyFrame
) -> pl.LazyFrame

Select the relevant columns from the Polars frame and strict cast them to ensure they are typed correctly. This is effectively to project-and-assert shape

Source code in typol/frame.py
def enforce_shape[S: Shape](
    shape: type[S], dataframe: pl.DataFrame | pl.LazyFrame
) -> pl.DataFrame | pl.LazyFrame:
    """
    Select the relevant columns from the Polars frame and strict cast them to ensure they are
    typed correctly. This is effectively to project-and-assert `shape`
    """
    return dataframe.select(
        pl.col(d).cast(t, strict=True) for d, t in shape.shape_meta().datatypes.items()
    )

LazyFrame dataclass

Bases: Generic[_S_co]

Shape-bound dataframe whose operations are type checked

Source code in typol/lazy.py
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
@dataclasses.dataclass(init=False, eq=False, frozen=True)
class LazyFrame(Generic[_S_co]):
    """Shape-bound dataframe whose operations are type checked"""

    shape: type[_S_co]
    dataframe: pl.LazyFrame

    # Overload without `orient`: accepts every supported kind of `values`, or none at all
    @overload
    def __init__(
        self,
        of: type[_S_co],
        /,
        values: Iterable[Mapping[str, Any]]
        | Iterable[ColumnInitializer[_S_co, Any]]
        | Mapping[BoundDimension[_S_co, Any], Iterable]
        | Iterable[tuple]
        | pl.LazyFrame
        | None = None,
    ) -> None: ...
    # Overload with `orient`: only typed for tuple data
    @overload
    def __init__(
        self, of: type[_S_co], /, values: Iterable[tuple], *, orient: Literal["row", "col"] = ...
    ) -> None: ...

    def __init__(
        self,
        shape: type[_S_co],
        /,
        values: Iterable[Mapping[str, Any]]
        | Iterable[tuple]
        | Iterable[ColumnInitializer[_S_co, Any]]
        | Mapping[BoundDimension[_S_co, Any], Iterable]
        | pl.LazyFrame
        | None = None,
        *,
        orient: Literal["row", "col"] | None = None,
    ) -> None:
        """
        Build a lazy frame of `shape` from `values`.

        `values` may be an existing Polars lazy frame (projected and strictly cast through
        `enforce_shape`), column initializers, a mapping of dimension (or column name) to column
        values, an iterable of row mappings or tuples, or `None` / an empty iterable for an empty
        frame. `orient` is only forwarded to Polars when the data is handed over as-is, i.e. not
        for initializers or mappings.
        """
        meta = shape.shape_meta()
        if isinstance(values, pl.LazyFrame):
            df = enforce_shape(shape, values)
        elif isinstance(values, tuple) and values and isinstance(values[0], ColumnInitializer):
            # The emptiness check keeps `()` from raising IndexError on `values[0]`; an empty
            # tuple falls through to the generic iterable branch and yields an empty frame
            initializers = cast(tuple[ColumnInitializer[_S_co, Any], ...], values)
            df = pl.LazyFrame({i.dimension.name: i.value for i in initializers}, schema=meta.schema)
        elif isinstance(values, Mapping):
            df = pl.LazyFrame(
                {k.name if isinstance(k, BoundDimension) else k: vs for k, vs in values.items()},
                schema=meta.schema,
            )
        elif isinstance(values, Iterable):
            # Peek at the first element to work out what kind of data was supplied
            iterator = iter(values)
            first = more_itertools.first(iterator, None)
            if first is None:
                df = pl.LazyFrame(schema=meta.schema)
            elif isinstance(first, ColumnInitializer):
                # Re-attach the peeked element: when `values` is a one-shot iterator, iterating
                # it again would silently drop the first column
                initializers = cast(
                    Iterable[ColumnInitializer[_S_co, Any]],
                    more_itertools.prepend(first, iterator),
                )
                df = pl.LazyFrame(
                    {i.dimension.name: i.value for i in initializers}, schema=meta.schema
                )
            elif isinstance(first, Mapping) and type(first) is not dict:
                # Non-dict mappings are converted to plain dicts before being handed to Polars
                values = cast(Iterable[Mapping], more_itertools.prepend(first, iterator))
                df = pl.LazyFrame((dict(r) for r in values), schema=meta.schema)
            else:
                df = pl.LazyFrame(
                    values
                    if isinstance(values, Collection)
                    else more_itertools.prepend(first, iterator),
                    schema=meta.schema,
                    orient=orient,
                )
        elif values is None:
            df = pl.LazyFrame(schema=meta.schema)
        else:
            assert_never(values)
        # The dataclass is frozen, so attributes have to be set through `object.__setattr__`
        object.__setattr__(self, "dataframe", df)
        object.__setattr__(self, "shape", shape)

    @property
    def s(self) -> _S_co:
        """
        Shortcut for reaching the frame's shape columns and attributes

        `Shape`s only expose class-level operations, so treating the shape class as though it
        were an instance is close enough for direct attribute access (dimensions and
        `shape_meta`). Use `.shape` when the shape is needed as a shape-type. Lowering to the
        instance level sidesteps ty resolving `type[S & Q]` to `Unknown` (it copes with
        `type[S] & type[Q]`)
        """
        shape_as_instance = cast(_S_co, self.shape)
        return shape_as_instance

    def __getitem__[T](self, s: ExoExpr[_S_co, T]) -> LazySeries[T]:
        """Construct a lazy series of values transformed by `s` for each row in the frame"""
        return LazySeries[T](self.dataframe.select(s.expr.alias("series")))

    def get_column[T](self, s: BoundDimension[_S_co, T]) -> LazySeries[T]:
        return self.__getitem__(s)

    def head(self, n: int = 5) -> LazyFrame[_S_co]:
        """Apply the Polars `head(n)` operation, keeping the same shape"""
        leading = self.dataframe.head(n)
        return LazyFrame(self.shape, leading)

    def slice(self, offset: int, length: int | None = None) -> LazyFrame[_S_co]:
        """Apply the Polars `slice(offset, length)` operation, keeping the same shape"""
        window = self.dataframe.slice(offset, length)
        return LazyFrame(self.shape, window)

    def lazy(self) -> LazyFrame[_S_co]:
        """Return this frame unchanged; it is already lazy"""
        return self

    def collect(
        self, engine: Literal["auto", "in-memory", "streaming", "gpu"] = "auto"
    ) -> DataFrame[_S_co]:
        """Evaluate the lazy frame with the chosen Polars `engine` into an eager `DataFrame`"""
        collected = self.dataframe.collect(engine=engine, background=False)
        return DataFrame(self.shape, cast(pl.DataFrame, collected))

    def filter(self, *condition: ExoExpr[_S_co, bool]) -> LazyFrame[_S_co]:
        """Keep only the rows for which the boolean conditions evaluate to `True`"""
        predicates = [c.expr for c in condition]
        return LazyFrame(self.shape, self.dataframe.filter(*predicates))

    def with_columns(self, *columns: EndoExpr[_S_co, Any]) -> LazyFrame[_S_co]:
        """Replace existing columns of the shape with the values produced by `columns`"""
        updated = self.dataframe.with_columns([column.expr for column in columns])
        return LazyFrame(self.shape, updated)

    def transform[SNew: Shape](
        self, shape: type[SNew], *transforms: Expr[_S_co, SNew, Any] | BoundSeries[SNew, Any]
    ) -> LazyFrame[SNew]:
        """
        Convert from one shape to another shape, using the provided expressions to map columns in
        the current shape to columns in the new shape:

        - Any columns with the same name in both the current and new shapes without an expression
          mapping to them will be preserved
        - Any columns in the new shape not in the original shape, and not mapped to, will throw a
          runtime error
        - Any columns in the current shape not in the new shape will be dropped
        """
        return LazyFrame(shape, self.dataframe.with_columns(t.expr for t in transforms))

    def explode(self, *explosions: Explosion[_S_co, _S_co, Any]) -> LazyFrame[_S_co]:
        """
        Turn list columns into one row per list element:

        ```
        accounts.explode(
            Account.link_name.implode().over(Account.type).list.explode_to(Account.link_name)
        )
        ```

        The example produces a row for every linked name of any account sharing the same type
        """
        # Each explosion is first written to its target column as a list of the target type
        as_lists = [e.expr.to_out(e.to.name).cast(list_of(e.to.ty)).expr for e in explosions]
        targets = [e.to.name for e in explosions]
        return LazyFrame(self.shape, self.dataframe.with_columns(as_lists).explode(*targets))

    def explode_transform[Q: Shape](
        self, shape: type[Q], *explosions: Explosion[_S_co, Q, Any] | Expr[_S_co, Q, Any]
    ) -> LazyFrame[Q]:
        """Take a series of list columns and create a new row for each value in the list"""
        return LazyFrame(
            shape,
            self.dataframe.with_columns(
                (
                    e.expr.to_out(e.to.name).cast(list_of(e.to.ty))
                    if isinstance(e, Explosion)
                    else e
                ).expr
                for e in explosions
            ).explode(*(e.to.name for e in explosions if isinstance(e, Explosion))),
        )

    def agg(self, *agg: EndoAggExpr[_S_co, Any]) -> LazyFrame[_S_co]:
        """
        Aggregate rows of the frame with the given expressions. Every column that is not the
        output of an aggregation becomes a group-by key, because the shape requires all columns
        to survive. To drop columns instead, call `transform` before `agg`
        """
        aggregations = [e.expr for e in agg]
        aggregated_names = {expr.meta.output_name() for expr in aggregations}
        key_names = self.dataframe.collect_schema().keys() - aggregated_names
        keys = [pl.col(name) for name in key_names]
        return LazyFrame(self.shape, self.dataframe.group_by(*keys).agg(*aggregations))

    def group_by(self, *keys: EndoExpr[_S_co, Any]) -> LazyGroupBy[_S_co, _S_co]:
        """
        Group the frame by the given key expressions. Follow this with an `agg` call to apply
        aggregations to the grouped frame
        """
        key_exprs = [k.expr for k in keys]
        return LazyGroupBy(self.shape, self.dataframe.group_by(*key_exprs))

    def agg_transform[Q: Shape](
        self, shape: type[Q], *exprs: AggExpr[_S_co, Q, Any] | Expr[_S_co, Q, Any]
    ) -> LazyFrame[Q]:
        """
        Define the aggregating expressions to group rows in the dataframe. Any columns not
        aggregated will be treated as the group by keys, since all columns must be preserved. To
        drop columns instead use `transform` before `agg`.

        This allows transforming the aggregated columns since aggregation may change types
        """
        aggregating = {e.expr.meta.output_name(): e.expr for e in exprs if isinstance(e, AggExpr)}
        non_aggregating = {e.expr.meta.output_name(): e.expr for e in exprs if isinstance(e, Expr)}
        assert aggregating.keys().isdisjoint(non_aggregating.keys()), (
            f"Can't aggregate and group by same dimensions: {aggregating.keys() & non_aggregating.keys()}"
        )
        return LazyFrame(
            shape,
            self.dataframe.group_by(
                *(
                    e if (e := non_aggregating.get(k)) is not None else pl.col(k)
                    for k in shape.shape_meta().datatypes.keys() - aggregating.keys()
                )
            ).agg(*aggregating.values()),
        )

    def group_by_transform[Q: Shape](
        self, shape: type[Q], *keys: Expr[_S_co, Q, Any]
    ) -> LazyGroupBy[_S_co, Q]:
        """
        Determine a series of expressions to group the dataframe by, this should be followed by an
        agg to apply aggregations to the grouped frame
        """
        return LazyGroupBy(shape, self.dataframe.group_by(*(k.expr for k in keys)))

    @classmethod
    def concat(cls, shape: type[_S_co], frames: Iterable[Self]) -> LazyFrame[_S_co]:
        """Stack `frames` vertically; with no frames an empty frame of `shape` is returned"""
        remaining = iter(frames)
        head = first(remaining, None)
        if head:
            # The first frame absorbs all the others
            return head.vstack(*remaining)
        return cls(shape)

    def vstack(self, *frames: Self) -> LazyFrame[_S_co]:
        """Append the rows of `frames` below the rows of this frame"""
        # All frames share the shape, but their column order is not guaranteed to agree, so the
        # other frames are re-selected in this frame's column order before stacking
        columns = self.dataframe.collect_schema().names()
        aligned = [frame.dataframe.select(columns) for frame in frames]
        stacked = pl.concat([self.dataframe, *aligned], how="vertical")
        return LazyFrame(self.shape, stacked)

    def unique(
        self,
        *exprs: BoundDimension[_S_co, Any],
        keep: Literal["first", "last", "none", "any"] = "any",
        maintain_order: bool = False,
    ) -> LazyFrame[_S_co]:
        """
        Apply the Polars `unique` operation on the named dimensions, or with no column subset
        when none are given. `keep` and `maintain_order` are forwarded to Polars
        """
        if exprs:
            names = [e.name for e in exprs]
        else:
            names = None
        deduplicated = self.dataframe.unique(names, keep=keep, maintain_order=maintain_order)
        return LazyFrame(self.shape, deduplicated)

    def sort(
        self,
        *exprs: ExoExpr[_S_co, Any],
        descending: tuple[bool, ...] | bool = False,
        nulls_last: tuple[bool, ...] | bool = False,
        maintain_order: bool = False,
    ) -> LazyFrame[_S_co]:
        """
        Sort the frame by the given expressions. `descending`, `nulls_last` and `maintain_order`
        are forwarded to the Polars `sort` call
        """
        sort_keys = [e.expr for e in exprs]
        ordered = self.dataframe.sort(
            sort_keys,
            descending=descending,
            nulls_last=nulls_last,
            maintain_order=maintain_order,
        )
        return LazyFrame(self.shape, ordered)

    def suffix(self, suffixed: type[Suffixed[_S_co]] | None = None) -> LazyFrame[Suffixed[_S_co]]:
        """
        Rename the columns according to `suffixed.mapping_to()`. When `suffixed` is not supplied
        it is obtained from `suffix(self.shape)`
        """
        if not suffixed:
            suffixed = suffix(self.shape)
        renamed = self.dataframe.rename(suffixed.mapping_to())
        return LazyFrame[Any](suffixed, renamed)

    def gather_every(self, n: int, offset: int = 0) -> LazyFrame[_S_co]:
        """Apply the Polars `gather_every(n, offset)` operation, keeping the same shape"""
        sampled = self.dataframe.gather_every(n, offset)
        return LazyFrame(self.shape, sampled)

    def pipe[**P, T](
        self, function: Callable[Concatenate[Self, P], T], *args: P.args, **kwargs: P.kwargs
    ) -> T:
        return function(self, *args, **kwargs)

    @classmethod
    def scan_csv(
        cls,
        shape: type[_S_co],
        source: IO[str] | str | bytes | Path | IO[bytes],
        mappings: Mapping[BoundDimension[_S_co, Any], str] | None = None,
        *,
        has_header: bool = True,
        skip_rows: int = 0,
        low_memory: bool = False,
    ) -> Self:
        """
        Lazily scan a CSV source into a frame of `shape`.

        Parameters
        ----------
        shape
            Shape the scanned columns are selected into and renamed to
        source
            CSV source handed to `pl.scan_csv`
        mappings
            Header used in the file for each dimension of the shape. When omitted, headers are
            matched to dimension names case-insensitively, ignoring non-alphanumeric characters.
            Not consulted when `has_header` is false
        has_header
            When false, the shape's schema is passed to `pl.scan_csv` as the file schema and no
            header matching takes place
        skip_rows
            Forwarded to `pl.scan_csv`
        low_memory
            Forwarded to `pl.scan_csv`

        Raises
        ------
        KeyError
            If `mappings` is omitted and a dimension of the shape matches no header in the file
        """
        if not has_header:
            return cls(
                shape,
                pl.scan_csv(
                    source,
                    schema=shape.shape_meta().schema,
                    has_header=False,
                    ignore_errors=True,
                    truncate_ragged_lines=True,
                    try_parse_dates=True,
                    skip_rows=skip_rows,
                    infer_schema=False,
                    # Previously dropped on this path, so the caller's setting had no effect
                    low_memory=low_memory,
                ),
            )

        # Materialised once so the dimensions can be walked several times below
        dimensions = tuple(shape.shape_meta().dimensions)
        if mappings is None:
            # The default is to match up the columns from the file with the dimensions from the
            # shape alphanumerically and case-insensitively
            headers_only = pl.scan_csv(source, infer_schema_length=0, skip_rows=skip_rows, n_rows=0)
            schema = headers_only.collect_schema()
            columns = {re.sub(r"[\W_]", "", h).lower(): h for h in schema.keys()}
            normalised = {d: re.sub(r"[\W_]", "", d.name).lower() for d in dimensions}
            unmatched = [d.name for d, key in normalised.items() if key not in columns]
            if unmatched:
                # Same exception type as the bare lookup raised, but naming what is missing
                raise KeyError(
                    f"No CSV column matches dimensions {unmatched}; "
                    f"headers found: {list(columns.values())}"
                )
            mappings = {d: columns[key] for d, key in normalised.items()}

        return cls(
            shape,
            pl.scan_csv(
                source,
                schema_overrides={mappings[d]: d.ty.pl_ty for d in dimensions},
                has_header=has_header,
                ignore_errors=True,
                truncate_ragged_lines=True,
                try_parse_dates=True,
                skip_rows=skip_rows,
                infer_schema=False,
                infer_schema_length=0,
                low_memory=low_memory,
            )
            .select(mappings[d] for d in dimensions)
            .rename({mappings[d]: d.name for d in dimensions}),
        )

    def join_asof[Q: Shape](
        self,
        right: LazyFrame[Q],
        on: ExoExpr[_S_co | Q, Any] | JoinOn[_S_co, Q, Any],
        strategy: Literal["backward", "forward", "nearest"] = "backward",
    ) -> LazyFrame[Intersection[_S_co, Q]]:
        """
        Join two tables into a common shape, by nearest

        Parameters
        ----------
        on : BoundDimension[S, _]
            Join on the same columns for the left and the right shapes based on the joint shape.
            The column must be available in both original shapes
        """
        joined = self.dataframe.join_asof(
            right.dataframe,
            left_on=(on.left if isinstance(on, JoinOn) else on).expr,
            right_on=(on.right if isinstance(on, JoinOn) else on).expr,
            strategy=strategy,
        )
        return LazyFrame["Intersection[_S_co, Q]"](self.shape & right.shape, joined)

    def join[Q: Shape](
        self,
        right: LazyFrame[Q],
        *on: JoinOn[_S_co, Q, Any] | ExoExpr[_S_co | Q, Any],
        how: Literal["inner", "left", "right", "full", "semi", "anti", "cross", "outer"] = "inner",
    ) -> LazyFrame[Intersection[_S_co, Q]]:
        """
        Join two tables into a common shape. The common shape must be a subclass of both original
        tables. To avoid creating the common subclass use `.join_transform(...)`

        Parameters
        ----------
        on : BoundDimension[S, _]
            Join on the same columns for the left and the right shapes based on the joint shape.
            The column must be available in both original shapes
        """
        if on:
            joined = self.dataframe.join(
                right.dataframe,
                left_on=[(e.left if isinstance(e, JoinOn) else e).expr for e in on],
                right_on=[(e.right if isinstance(e, JoinOn) else e).expr for e in on],
                how=how,
            )
            already_populated = frozenset(joined.collect_schema().keys())
            joined = joined.with_columns(
                # Polars will drop right columns with different names if they're simple matchups
                # Restore the right column names so the full self.shape & right.shape shape is
                # generated
                e.left.expr.alias(e.right.name)
                for e in on
                if isinstance(e, JoinOn)
                and isinstance(e.right, BoundDimension)
                and e.right.name not in already_populated
            )
        else:
            joined = self.dataframe.join(right.dataframe, how=how)
        return LazyFrame["Intersection[_S_co, Q]"](self.shape & right.shape, joined)

s property

Provides a utility alias for accessing dataframe shape columns and attributes

As Shapes only have class-level operations, pretending this is an instance is equivalent enough for direct usage of attributes (dimensions and shape_meta). To see the shape as a shape-type, use .shape. This works around ty's limitation of resolving Unknown for type[S & Q] (ty's fine with type[S] & type[Q]), by lowering to the instance level

__getitem__(s)

Construct a lazy series of values transformed by s for each row in the frame

Source code in typol/lazy.py
def __getitem__[T](self, s: ExoExpr[_S_co, T]) -> LazySeries[T]:
    """Construct a lazy series of values transformed by `s` for each row in the frame"""
    return LazySeries[T](self.dataframe.select(s.expr.alias("series")))

agg(*agg)

Define the aggregating expressions to group rows in the dataframe. Any columns not aggregated will be treated as the group by keys, since all columns must be preserved. To drop columns instead use transform before agg

Source code in typol/lazy.py
def agg(self, *agg: EndoAggExpr[_S_co, Any]) -> LazyFrame[_S_co]:
    """
    Aggregate rows of the frame with the given expressions. Every column that is not the
    output of an aggregation becomes a group-by key, because the shape requires all columns to
    survive. To drop columns instead, call `transform` before `agg`
    """
    aggregations = [e.expr for e in agg]
    aggregated_names = {expr.meta.output_name() for expr in aggregations}
    key_names = self.dataframe.collect_schema().keys() - aggregated_names
    keys = [pl.col(name) for name in key_names]
    return LazyFrame(self.shape, self.dataframe.group_by(*keys).agg(*aggregations))

agg_transform(shape, *exprs)

Define the aggregating expressions to group rows in the dataframe. Any columns not aggregated will be treated as the group by keys, since all columns must be preserved. To drop columns instead use transform before agg.

This allows transforming the aggregated columns since aggregation may change types

Source code in typol/lazy.py
def agg_transform[Q: Shape](
    self, shape: type[Q], *exprs: AggExpr[_S_co, Q, Any] | Expr[_S_co, Q, Any]
) -> LazyFrame[Q]:
    """
    Define the aggregating expressions to group rows in the dataframe. Any columns not
    aggregated will be treated as the group by keys, since all columns must be preserved. To
    drop columns instead use `transform` before `agg`.

    This allows transforming the aggregated columns since aggregation may change types
    """
    aggregating = {e.expr.meta.output_name(): e.expr for e in exprs if isinstance(e, AggExpr)}
    non_aggregating = {e.expr.meta.output_name(): e.expr for e in exprs if isinstance(e, Expr)}
    assert aggregating.keys().isdisjoint(non_aggregating.keys()), (
        f"Can't aggregate and group by same dimensions: {aggregating.keys() & non_aggregating.keys()}"
    )
    return LazyFrame(
        shape,
        self.dataframe.group_by(
            *(
                e if (e := non_aggregating.get(k)) is not None else pl.col(k)
                for k in shape.shape_meta().datatypes.keys() - aggregating.keys()
            )
        ).agg(*aggregating.values()),
    )

explode(*explosions)

Take a series of list columns and create a new row for each value in the list:

accounts.explode(
    Account.link_name.implode().over(Account.type).list.explode_to(Account.link_name)
)

The above will create a new row for every linked name from any account for the same type

Source code in typol/lazy.py
def explode(self, *explosions: Explosion[_S_co, _S_co, Any]) -> LazyFrame[_S_co]:
    """
    Turn list columns into one row per list element:

    ```
    accounts.explode(
        Account.link_name.implode().over(Account.type).list.explode_to(Account.link_name)
    )
    ```

    The example produces a row for every linked name of any account sharing the same type
    """
    # Each explosion is first written to its target column as a list of the target type
    as_lists = [e.expr.to_out(e.to.name).cast(list_of(e.to.ty)).expr for e in explosions]
    targets = [e.to.name for e in explosions]
    return LazyFrame(self.shape, self.dataframe.with_columns(as_lists).explode(*targets))

explode_transform(shape, *explosions)

Take a series of list columns and create a new row for each value in the list

Source code in typol/lazy.py
def explode_transform[Q: Shape](
    self, shape: type[Q], *explosions: Explosion[_S_co, Q, Any] | Expr[_S_co, Q, Any]
) -> LazyFrame[Q]:
    """Take a series of list columns and create a new row for each value in the list"""
    return LazyFrame(
        shape,
        self.dataframe.with_columns(
            (
                e.expr.to_out(e.to.name).cast(list_of(e.to.ty))
                if isinstance(e, Explosion)
                else e
            ).expr
            for e in explosions
        ).explode(*(e.to.name for e in explosions if isinstance(e, Explosion))),
    )

filter(*condition)

Only keep rows where the boolean conditions evaluate to True

Source code in typol/lazy.py
def filter(self, *condition: ExoExpr[_S_co, bool]) -> LazyFrame[_S_co]:
    """Keep only the rows for which the boolean conditions evaluate to `True`"""
    predicates = [c.expr for c in condition]
    return LazyFrame(self.shape, self.dataframe.filter(*predicates))

group_by(*keys)

Determine a series of expressions to group the dataframe by, this should be followed by an agg to apply aggregations to the grouped frame

Source code in typol/lazy.py
def group_by(self, *keys: EndoExpr[_S_co, Any]) -> LazyGroupBy[_S_co, _S_co]:
    """
    Group the frame by the given key expressions. Follow this with an `agg` call to apply
    aggregations to the grouped frame
    """
    key_exprs = [k.expr for k in keys]
    return LazyGroupBy(self.shape, self.dataframe.group_by(*key_exprs))

group_by_transform(shape, *keys)

Determine a series of expressions to group the dataframe by, this should be followed by an agg to apply aggregations to the grouped frame

Source code in typol/lazy.py
def group_by_transform[Q: Shape](
    self, shape: type[Q], *keys: Expr[_S_co, Q, Any]
) -> LazyGroupBy[_S_co, Q]:
    """
    Determine a series of expressions to group the dataframe by, this should be followed by an
    agg to apply aggregations to the grouped frame
    """
    return LazyGroupBy(shape, self.dataframe.group_by(*(k.expr for k in keys)))

join(right, *on, how='inner')

Join two tables into a common shape. The common shape must be a subclass of both original tables. To avoid creating the common subclass use .join_transform(...)

Parameters

on : BoundDimension[S, _] Join on the same columns for the left and the right shapes based on the joint shape. The column must be available in both original shapes

Source code in typol/lazy.py
def join[Q: Shape](
    self,
    right: LazyFrame[Q],
    *on: JoinOn[_S_co, Q, Any] | ExoExpr[_S_co | Q, Any],
    how: Literal["inner", "left", "right", "full", "semi", "anti", "cross", "outer"] = "inner",
) -> LazyFrame[Intersection[_S_co, Q]]:
    """
    Join two tables into a common shape. The common shape must be a subclass of both original
    tables. To avoid creating the common subclass use `.join_transform(...)`

    Parameters
    ----------
    on : BoundDimension[S, _]
        Join on the same columns for the left and the right shapes based on the joint shape.
        The column must be available in both original shapes
    """
    if on:
        joined = self.dataframe.join(
            right.dataframe,
            left_on=[(e.left if isinstance(e, JoinOn) else e).expr for e in on],
            right_on=[(e.right if isinstance(e, JoinOn) else e).expr for e in on],
            how=how,
        )
        already_populated = frozenset(joined.collect_schema().keys())
        joined = joined.with_columns(
            # Polars will drop right columns with different names if they're simple matchups
            # Restore the right column names so the full self.shape & right.shape shape is
            # generated
            e.left.expr.alias(e.right.name)
            for e in on
            if isinstance(e, JoinOn)
            and isinstance(e.right, BoundDimension)
            and e.right.name not in already_populated
        )
    else:
        joined = self.dataframe.join(right.dataframe, how=how)
    return LazyFrame["Intersection[_S_co, Q]"](self.shape & right.shape, joined)

join_asof(right, on, strategy='backward')

Join two tables into a common shape, matching rows on the nearest key as selected by strategy

Parameters

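on : ExoExpr[_S_co | Q, _] | JoinOn[_S_co, Q, _] Join key. A plain expression joins on the same column for the left and the right shapes based on the joint shape, so the column must be available in both original shapes; a JoinOn supplies separate left and right expressions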

Source code in typol/lazy.py
def join_asof[Q: Shape](
    self,
    right: LazyFrame[Q],
    on: ExoExpr[_S_co | Q, Any] | JoinOn[_S_co, Q, Any],
    strategy: Literal["backward", "forward", "nearest"] = "backward",
) -> LazyFrame[Intersection[_S_co, Q]]:
    """
    Join two tables into a common shape, by nearest

    Parameters
    ----------
    on : BoundDimension[S, _]
        Join on the same columns for the left and the right shapes based on the joint shape.
        The column must be available in both original shapes
    """
    joined = self.dataframe.join_asof(
        right.dataframe,
        left_on=(on.left if isinstance(on, JoinOn) else on).expr,
        right_on=(on.right if isinstance(on, JoinOn) else on).expr,
        strategy=strategy,
    )
    return LazyFrame["Intersection[_S_co, Q]"](self.shape & right.shape, joined)

transform(shape, *transforms)

Convert from one shape to another shape, using the provided expressions to map columns in the current shape to columns in the new shape:

  • Any columns with the same name in both the current and new shapes without an expression mapping to them will be preserved
  • Any columns in the new shape not in the original shape, and not mapped to, will throw a runtime error
  • Any columns in the current shape not in the new shape will be dropped
Source code in typol/lazy.py
def transform[SNew: Shape](
    self, shape: type[SNew], *transforms: Expr[_S_co, SNew, Any] | BoundSeries[SNew, Any]
) -> LazyFrame[SNew]:
    """
    Convert from one shape to another shape, using the provided expressions to map columns in
    the current shape to columns in the new shape:

    - Any columns with the same name in both the current and new shapes without an expression
      mapping to them will be preserved
    - Any columns in the new shape not in the original shape, and not mapped to, will throw a
      runtime error
    - Any columns in the current shape not in the new shape will be dropped
    """
    return LazyFrame(shape, self.dataframe.with_columns(t.expr for t in transforms))

with_columns(*columns)

Use the provided expressions to update existing columns in the shape

Source code in typol/lazy.py
def with_columns(self, *columns: EndoExpr[_S_co, Any]) -> LazyFrame[_S_co]:
    """
    Update existing columns in the shape using the provided expressions

    The frame keeps its current shape; only the underlying dataframe is replaced.
    """
    updates = (c.expr for c in columns)
    updated = self.dataframe.with_columns(updates)
    return LazyFrame(self.shape, updated)

LazyGroupBy dataclass

Source code in typol/lazy.py
@dataclasses.dataclass
class LazyGroupBy[S: Shape, Q: Shape]:
    shape: type[Q]
    group_by: pl.lazyframe.group_by.LazyGroupBy

    def agg(self, *agg: AggExpr[S, Q, Any]) -> LazyFrame[Q]:
        """Define the aggregating expressions to group rows in the dataframe"""
        return LazyFrame(self.shape, self.group_by.agg(*(e.expr for e in agg)))

agg(*agg)

Define the aggregating expressions to group rows in the dataframe

Source code in typol/lazy.py
def agg(self, *agg: AggExpr[S, Q, Any]) -> LazyFrame[Q]:
    """
    Apply the aggregating expressions to the grouped rows of the dataframe

    The result is wrapped in a `LazyFrame` carrying this group-by's shape.
    """
    aggregations = [e.expr for e in agg]
    aggregated = self.group_by.agg(*aggregations)
    return LazyFrame(self.shape, aggregated)