Skip to content

Expressions

Typed wrappers for expr and schema (shape) operations, the core of Typol

AggExpr dataclass

An expression created by an aggregation function (e.g. .sum()). This can't be used as a normal expression, only as an aggregated value in an .agg(...) argument

Source code in typol/expr.py
@dataclasses.dataclass(frozen=True)
class AggExpr[S: Shape, R: Shape, T]:
    """
    An expression created by an aggregation function (e.g. `.sum()`). This can't be used as a normal
    expression, only as an aggregated value in an `.agg(...)` argument
    """

    expr: pl.Expr

    def coalesce(self, *others: ExoAggExpr[S, T]) -> AggExpr[S, R, T]:
        return AggExpr(pl.coalesce(self.expr, *map(_pl_expr, others)))

    def null_when_eq(self, value: ExoAggExpr[S, T] | T) -> AggExpr[S, R, T]:
        return AggExpr(pl.when(self.expr.ne(_pl_expr(value))).then(self.expr))

    def null_insignificant[N: (float, int)](self: AggExpr[S, R, N]) -> AggExpr[S, R, N]:
        return AggExpr(
            pl.when(self.expr.is_not_nan(), self.expr.is_not_null(), self.expr.ne(0)).then(
                self.expr
            )
        )

    def to[Q: Shape](self, dimension: BoundDimension[Q, T]) -> AggExpr[S, Q, T]:
        # We don't cast here in case it's an `agg`; Polars has weird casting behaviour with
        # this and there's no way to tell Polars is this to be aggregated
        return AggExpr(self.expr.alias(dimension.name))

    def over[Q: Shape](
        self,
        *exprs: ExoExpr[S, Any] | ExoExpr[Q, Any],
        order_by: Iterable[ExoExpr[S, Any] | ExoExpr[Q, Any]] | None = None,
        mapping_strategy: Literal["group_to_rows", "join", "explode"] = "group_to_rows",
    ) -> Expr[Intersection[S, Q], R, T]:
        """
        Restrict an aggregating expression to just a window (i.e. bucket) of values keyed on by
        `exprs`. "group_to_rows" matches values up to the current rows, "join" matches them back to
        the source rows, and "explode" does the same as join, but if there are multiple values from
        the aggregating expression, it will duplicate the existing line into multiple per each
        aggregated value:

        ```
        # Find multiple ids attached to the same username
        accounts.filter(Account.id.count().over(Account.username).gt(1))
        ```
        """
        return IntermediateExpr(
            self.expr.over(
                # Polars needs these key names to be unique, otherwise it raises `DuplicateError`
                *(_pl_expr(e).alias(f"_key{i}") for i, e in enumerate(exprs)),
                order_by=(_pl_expr(e) for e in order_by) if order_by else None,
                mapping_strategy=mapping_strategy,
            )
        )

over(*exprs, order_by=None, mapping_strategy='group_to_rows')

Restrict an aggregating expression to just a window (i.e. bucket) of values keyed on by exprs. "group_to_rows" matches values up to the current rows, "join" matches them back to the source rows, and "explode" does the same as join, but if there are multiple values from the aggregating expression, it will duplicate the existing line into multiple per each aggregated value:

# Find multiple ids attached to the same username
accounts.filter(Account.id.count().over(Account.username).gt(1))
Source code in typol/expr.py
def over[Q: Shape](
    self,
    *exprs: ExoExpr[S, Any] | ExoExpr[Q, Any],
    order_by: Iterable[ExoExpr[S, Any] | ExoExpr[Q, Any]] | None = None,
    mapping_strategy: Literal["group_to_rows", "join", "explode"] = "group_to_rows",
) -> Expr[Intersection[S, Q], R, T]:
    """
    Restrict an aggregating expression to just a window (i.e. bucket) of values keyed on by
    `exprs`. "group_to_rows" matches values up to the current rows, "join" matches them back to
    the source rows, and "explode" does the same as join, but if there are multiple values from
    the aggregating expression, it will duplicate the existing line into multiple per each
    aggregated value:

    ```
    # Find multiple ids attached to the same username
    accounts.filter(Account.id.count().over(Account.username).gt(1))
    ```
    """
    return IntermediateExpr(
        self.expr.over(
            # Polars needs these key names to be unique, otherwise it raises `DuplicateError`
            *(_pl_expr(e).alias(f"_key{i}") for i, e in enumerate(exprs)),
            order_by=(_pl_expr(e) for e in order_by) if order_by else None,
            mapping_strategy=mapping_strategy,
        )
    )

BoundDimension dataclass

Bases: Expr[_S_contra, _S_contra, _T]

This binds the shape to the dimension at the type level, which means this shape can then be enforced by all the operations using any dimension, and any expression created from this bound dimension can continue to refer to the shape it operates on in its type too by passing the type parameter along

Source code in typol/expr.py
@dataclasses.dataclass(frozen=True)
class BoundDimension(Expr[_S_contra, _S_contra, _T]):
    """
    A dimension tied to its shape at the type level. Because the shape travels in the type,
    every operation taking a dimension can enforce it, and expressions derived from this bound
    dimension keep referring to the shape they operate on by forwarding the type parameter
    """

    shape: type[_S_contra]
    ty: Type[_T]
    name: str

    @property
    def expr(self) -> pl.Expr:
        """The Polars column expression for this dimension's name"""
        column = pl.col(self.name)
        return column

    def set(self, value: _T) -> Initializer[_S_contra, _T]:
        """Initializer assigning a single (enum-normalized) value to this dimension"""
        normalized = normalize_enum(value)
        return Initializer(self, normalized)

    def set_or_null(self, value: _T | None) -> Initializer[_S_contra, _T]:
        """Same as `set`, but the value is allowed to be `None`"""
        normalized = normalize_enum(value)
        return Initializer(self, normalized)

    def set_all(self, values: Iterable[_T]) -> ColumnInitializer[_S_contra, _T]:
        """Column initializer holding every (enum-normalized) value of `values`"""
        column = [normalize_enum(item) for item in values]
        return ColumnInitializer(self, column)

    def set_or_null_all(self, values: Iterable[_T | None]) -> ColumnInitializer[_S_contra, _T]:
        """Same as `set_all`, but individual values are allowed to be `None`"""
        column = [normalize_enum(item) for item in values]
        return ColumnInitializer(self, column)

    def map(self, transform: Callable[[_T], _T]) -> EndoExpr[_S_contra, _T]:
        """
        Run a Python function over the values of the column. It lives on the dimension rather
        than on `Expr` so that the polars datatype of the output is known. Use `Expr.map_to` to
        change the type
        """
        mapped = self.expr.map_elements(transform, return_dtype=self.ty.pl_ty)
        return IntermediateExpr(mapped)

    def null(self) -> Expr[Any, _S_contra, _T]:
        """A `None` literal directed at this dimension"""
        empty: MesoExpr[Any, _T] = lit(None)
        return empty.to(self)

map(transform)

Apply a Python transformation to the values in a column. This is defined on the dimension rather than on Expr, to know the polars datatype of the output. To change the type, use Expr.map_to

Source code in typol/expr.py
def map(self, transform: Callable[[_T], _T]) -> EndoExpr[_S_contra, _T]:
    """
    Run a Python function over the values of the column. It lives on the dimension rather
    than on `Expr` so that the polars datatype of the output is known. Use `Expr.map_to` to
    change the type
    """
    mapped = self.expr.map_elements(transform, return_dtype=self.ty.pl_ty)
    return IntermediateExpr(mapped)

ChainedWhen dataclass

Bases: Generic[_S_contra, _R_contra, _T]

A chain of when statements representing an if/elif chain. Construct by starting with a tp.when(conds).then(if_true), and adding more .when(else_cond).then(else_true)s after

Source code in typol/expr.py
@dataclasses.dataclass(frozen=True)
class ChainedWhen(Generic[_S_contra, _R_contra, _T]):
    """
    A chain of when statements representing an if/elif chain. Construct by starting with a
    `tp.when(conds).then(if_true)`, and adding more `.when(else_cond).then(else_true)`s after
    """

    expr: polars.expr.whenthen.ChainedWhen

    @overload
    def then[SA: Shape](
        self, then: ExoExpr[SA, _T]
    ) -> PartialConditional[Intersection[_S_contra, SA], _R_contra, _T]: ...
    @overload
    def then(self, then: _T) -> PartialConditional[_S_contra, _R_contra, _T]: ...

    def then[SA: Shape](
        self, then: Expr | _T
    ) -> PartialConditional[Intersection[_S_contra, SA], _R_contra, _T]:
        return PartialConditional(self.expr.then(_expr_or_lit(then).expr))

ColumnInitializer dataclass

Bases: Generic[_S_contra, _T]

Used in dataframe constructors to initialize a dataframe column-wise

Source code in typol/expr.py
@dataclasses.dataclass(frozen=True)
class ColumnInitializer(Generic[_S_contra, _T]):
    """
    Used in dataframe constructors to initialize a dataframe column-wise

    Pairs a bound dimension with the list of values for that column; `BoundDimension.set_all`
    and `BoundDimension.set_or_null_all` build instances of this class
    """

    # The dimension (column) being initialized
    dimension: BoundDimension[_S_contra, _T]
    # The column's values; individual entries may be None
    value: list[_T | None]

Dimension dataclass

A shape Dimension declares a column in the dataframe. This should be a class-level field of a Shape subtype, and will be a BoundDimension when accessed as MyShape.my_dimension

You must provide a type when declaring a dimension, and can optionally set a polars type implementation and an underlying name to use. By default, the name will be the name of the field

Source code in typol/expr.py
@dataclasses.dataclass(frozen=True)
class Dimension[T]:
    """
    A shape `Dimension` declares a column in the dataclass. This should be a class-level field of
    a `Shape` subtype, and will be a `BoundDimension` when accessed as `MyShape.my_dimension`

    You must provide a type when declaring a dimension, and can optionally set a polars type
    implementation and an underlying name to use. By default, the name will be the name of the field
    """

    ty: Type[T]
    name: str = ""

    def __get__[S: Shape](
        self, shape: S | None, shape_type: type[S] | None = None
    ) -> BoundDimension[S, T]:
        """
        This is the secret sauce: when a shape's dimensions are accessed by `MyShape.my_dimension`,
        this binds the shape to the dimension at the type level, which means this shape can then
        be enforced by all the operations using any dimension, and any expression created from this
        bound dimension can continue to refer to the shape it operates on in its type too by passing
        the type parameter along
        """
        shape_ty = shape_type or shape is not None and type(shape)
        assert shape_ty
        return BoundDimension(shape_ty, self.ty, self.name)

    def __set_name__(self, _owner: type, name: str) -> None:
        if not self.name:
            object.__setattr__(self, "name", name)

__get__(shape, shape_type=None)

This is the secret sauce: when a shape's dimensions are accessed by MyShape.my_dimension, this binds the shape to the dimension at the type level, which means this shape can then be enforced by all the operations using any dimension, and any expression created from this bound dimension can continue to refer to the shape it operates on in its type too by passing the type parameter along

Source code in typol/expr.py
def __get__[S: Shape](
    self, shape: S | None, shape_type: type[S] | None = None
) -> BoundDimension[S, T]:
    """
    This is the secret sauce: when a shape's dimensions are accessed by `MyShape.my_dimension`,
    this binds the shape to the dimension at the type level, which means this shape can then
    be enforced by all the operations using any dimension, and any expression created from this
    bound dimension can continue to refer to the shape it operates on in its type too by passing
    the type parameter along
    """
    shape_ty = shape_type or shape is not None and type(shape)
    assert shape_ty
    return BoundDimension(shape_ty, self.ty, self.name)

DtExprNamespace dataclass

Namespace for date and datetime functions, similar to pl.Expr.dt

Source code in typol/expr.py
@dataclasses.dataclass(frozen=True)
class DtExprNamespace[S: Shape, R: Shape, T]:
    """Namespace for date and datetime functions, similar to `pl.Expr.dt`"""

    expr: Expr[S, R, T]

    def date(self) -> Expr[S, R, datetime.date]:
        """Just take the date component of a datetime"""
        return IntermediateExpr(self.expr.expr.dt.date())

    def offset_by[Q: Shape](
        self, offset: str | ExoExpr[Q, str]
    ) -> Expr[Intersection[S, Q], R, datetime.date]:
        """
        Add an amount of time to a date or datetime, see `pl.offset_by` for all allowed interval
        strings, but examples include `-1y` or `6mo3m2s`
        """
        return IntermediateExpr(self.expr.expr.dt.offset_by(_pl_expr(offset)))

    def strftime(self, format: str) -> MesoExpr[S, str]:
        return IntermediateExpr(self.expr.expr.dt.strftime(format))

    def year(self) -> MesoExpr[S, int]:
        """Year number from date"""
        return IntermediateExpr(self.expr.expr.dt.year())

    def month(self) -> MesoExpr[S, int]:
        """Month number from date"""
        return IntermediateExpr(self.expr.expr.dt.month())

    def day(self) -> MesoExpr[S, int]:
        """Day of month from date"""
        return IntermediateExpr(self.expr.expr.dt.day())

    def add_business_days[Q: Shape](
        self,
        offset: int | ExoExpr[Q, int],
        roll: Literal["step", "snap", "raise", "forward", "backward"] = "step",
    ) -> Expr[Intersection[S, Q], R, datetime.date]:
        """
        Add `offset` business days to the current day. If the current day is not a business day, it
        will be treated based on `roll`:

            - `"step"`: Treat the first step from the current non-business day as if it is moving
              from a business day. For zero, it heads to the business day it would move from to go
              forwards (back 1)
            - `"snap"`: In the direction of `offset`, move to a business day before adding. For
              zero, it heads to the business day it would move from to go forwards (forward 1)
            - `"raise"`: Throw an error if not starting on a business day
            - `"forward"`: Snap to the next business day
            - `"backward:` Snap to the previous business day
        """
        dt = self.expr.expr.dt
        match roll:
            case "snap" | "step":
                go_forward = _pl_expr(offset).ge(0) if roll == "snap" else _pl_expr(offset).le(0)
                dt = (
                    pl.when(go_forward)
                    .then(dt.add_business_days(0, roll="forward"))
                    .otherwise(dt.add_business_days(0, roll="backward"))
                ).dt
                roll = "raise"
        return IntermediateExpr(dt.add_business_days(_pl_expr(offset), roll=roll))

    def total_days(self: DtExprNamespace[S, R, datetime.timedelta]) -> MesoExpr[S, int]:
        """The total number of days represented by the duration"""
        return IntermediateExpr(self.expr.expr.dt.total_days())

    def total_seconds(self: DtExprNamespace[S, R, datetime.timedelta]) -> MesoExpr[S, int]:
        """The total number of seconds represented by the duration"""
        return IntermediateExpr(self.expr.expr.dt.total_seconds())

    def weekday(self: DtExprNamespace[S, R, datetime.date]) -> MesoExpr[S, int]:
        """
        Day of week between 1 (Monday) and 7 (Sunday), you'll need to `- 1` to be compatible with
        `import calendar`
        """
        return IntermediateExpr(self.expr.expr.dt.weekday())

    def month_start(self: DtExprNamespace[S, R, datetime.date]) -> Expr[S, R, datetime.date]:
        """Backward to the first of the same month"""
        return IntermediateExpr(self.expr.expr.dt.month_start())

    def month_end(self: DtExprNamespace[S, R, datetime.date]) -> Expr[S, R, datetime.date]:
        """Forward to of the last of the same month"""
        return IntermediateExpr(self.expr.expr.dt.month_end())

add_business_days(offset, roll='step')

Add offset business days to the current day. If the current day is not a business day, it will be treated based on roll:

- `"step"`: Treat the first step from the current non-business day as if it is moving
  from a business day. For zero, it heads to the business day it would move from to go
  forwards (back 1)
- `"snap"`: In the direction of `offset`, move to a business day before adding. For
  zero, it heads to the business day it would move from to go forwards (forward 1)
- `"raise"`: Throw an error if not starting on a business day
- `"forward"`: Snap to the next business day
- `"backward"`: Snap to the previous business day
Source code in typol/expr.py
def add_business_days[Q: Shape](
    self,
    offset: int | ExoExpr[Q, int],
    roll: Literal["step", "snap", "raise", "forward", "backward"] = "step",
) -> Expr[Intersection[S, Q], R, datetime.date]:
    """
    Add `offset` business days to the current day. If the current day is not a business day, it
    will be treated based on `roll`:

        - `"step"`: Treat the first step from the current non-business day as if it is moving
          from a business day. For zero, it heads to the business day it would move from to go
          forwards (back 1)
        - `"snap"`: In the direction of `offset`, move to a business day before adding. For
          zero, it heads to the business day it would move from to go forwards (forward 1)
        - `"raise"`: Throw an error if not starting on a business day
        - `"forward"`: Snap to the next business day
        - `"backward:` Snap to the previous business day
    """
    dt = self.expr.expr.dt
    match roll:
        case "snap" | "step":
            go_forward = _pl_expr(offset).ge(0) if roll == "snap" else _pl_expr(offset).le(0)
            dt = (
                pl.when(go_forward)
                .then(dt.add_business_days(0, roll="forward"))
                .otherwise(dt.add_business_days(0, roll="backward"))
            ).dt
            roll = "raise"
    return IntermediateExpr(dt.add_business_days(_pl_expr(offset), roll=roll))

date()

Just take the date component of a datetime

Source code in typol/expr.py
def date(self) -> Expr[S, R, datetime.date]:
    """Keep only the date component of a datetime"""
    date_part = self.expr.expr.dt.date()
    return IntermediateExpr(date_part)

day()

Day of month from date

Source code in typol/expr.py
def day(self) -> MesoExpr[S, int]:
    """Day-of-month number of the date"""
    day_of_month = self.expr.expr.dt.day()
    return IntermediateExpr(day_of_month)

month()

Month number from date

Source code in typol/expr.py
def month(self) -> MesoExpr[S, int]:
    """Month number of the date"""
    month_number = self.expr.expr.dt.month()
    return IntermediateExpr(month_number)

month_end()

Forward to the last day of the same month

Source code in typol/expr.py
def month_end(self: DtExprNamespace[S, R, datetime.date]) -> Expr[S, R, datetime.date]:
    """Forward to the last day of the same month"""
    return IntermediateExpr(self.expr.expr.dt.month_end())

month_start()

Backward to the first of the same month

Source code in typol/expr.py
def month_start(self: DtExprNamespace[S, R, datetime.date]) -> Expr[S, R, datetime.date]:
    """Roll the date backward to the first day of its month"""
    first_of_month = self.expr.expr.dt.month_start()
    return IntermediateExpr(first_of_month)

offset_by(offset)

Add an amount of time to a date or datetime, see pl.offset_by for all allowed interval strings, but examples include -1y or 6mo3m2s

Source code in typol/expr.py
def offset_by[Q: Shape](
    self, offset: str | ExoExpr[Q, str]
) -> Expr[Intersection[S, Q], R, datetime.date]:
    """
    Add an amount of time to a date or datetime, see `pl.offset_by` for all allowed interval
    strings, but examples include `-1y` or `6mo3m2s`
    """
    return IntermediateExpr(self.expr.expr.dt.offset_by(_pl_expr(offset)))

total_days()

The total number of days represented by the duration

Source code in typol/expr.py
def total_days(self: DtExprNamespace[S, R, datetime.timedelta]) -> MesoExpr[S, int]:
    """Total number of days the duration represents"""
    days = self.expr.expr.dt.total_days()
    return IntermediateExpr(days)

total_seconds()

The total number of seconds represented by the duration

Source code in typol/expr.py
def total_seconds(self: DtExprNamespace[S, R, datetime.timedelta]) -> MesoExpr[S, int]:
    """Total number of seconds the duration represents"""
    seconds = self.expr.expr.dt.total_seconds()
    return IntermediateExpr(seconds)

weekday()

Day of week between 1 (Monday) and 7 (Sunday), you'll need to - 1 to be compatible with import calendar

Source code in typol/expr.py
def weekday(self: DtExprNamespace[S, R, datetime.date]) -> MesoExpr[S, int]:
    """
    Day of the week from 1 (Monday) to 7 (Sunday); subtract 1 (`- 1`) to be compatible with
    `import calendar`
    """
    day_of_week = self.expr.expr.dt.weekday()
    return IntermediateExpr(day_of_week)

year()

Year number from date

Source code in typol/expr.py
def year(self) -> MesoExpr[S, int]:
    """Year number of the date"""
    year_number = self.expr.expr.dt.year()
    return IntermediateExpr(year_number)

Element

Bases: Shape

Special shape containing pl.element() for mapping single element expressions, such as list.eval

Source code in typol/expr.py
class Element[T](Shape):
    """
    Special shape containing `pl.element()` for mapping single element expressions, such as
    `list.eval`
    """

    @classmethod
    def element(cls) -> EndoExpr[Element[T], T]:
        return IntermediateExpr(pl.element())

Explosion dataclass

Bases: Generic[_S_contra, _R_contra, _T]

An expression that can "explode" a frame to a new row for each output value

Source code in typol/expr.py
@dataclasses.dataclass(frozen=True)
class Explosion(Generic[_S_contra, _R_contra, _T]):
    """
    An expression that can "explode" a frame to a new row for each output value

    Pairs a list-valued expression with the dimension that has the list's element type
    """

    # List-valued expression over the source shape
    expr: ExoExpr[_S_contra, list[_T]]
    # Dimension with the element type; presumably each exploded value is written to it
    # NOTE(review): confirm against the code consuming `Explosion`
    to: BoundDimension[_R_contra, _T]

Expr

Bases: ABC, Generic[_S_contra, _R_contra, _T]

Base class for all expressions, defining the common operations such as comparison and transformation

Source code in typol/expr.py
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
class Expr(ABC, Generic[_S_contra, _R_contra, _T]):
    """
    Base class for all expressions, defining the common operations such as comparison and
    transformation
    """

    expr: pl.Expr

    def gt[SA: Shape](
        self, other: ExoExpr[SA, _T] | _T
    ) -> MesoExpr[Intersection[_S_contra, SA], bool]:
        return IntermediateExpr(self.expr.gt(_pl_expr(other)))

    def lt[SA: Shape](
        self, other: ExoExpr[SA, _T] | _T
    ) -> MesoExpr[Intersection[_S_contra, SA], bool]:
        return IntermediateExpr(self.expr.lt(_pl_expr(other)))

    def ge[SA: Shape](
        self, other: ExoExpr[SA, _T] | _T
    ) -> MesoExpr[Intersection[_S_contra, SA], bool]:
        return IntermediateExpr(self.expr.ge(_pl_expr(other)))

    def le[SA: Shape](
        self, other: ExoExpr[SA, _T] | _T
    ) -> MesoExpr[Intersection[_S_contra, SA], bool]:
        return IntermediateExpr(self.expr.le(_pl_expr(other)))

    def eq[SA: Shape](
        self, other: ExoExpr[SA, _T] | _T
    ) -> MesoExpr[Intersection[_S_contra, SA], bool]:
        return IntermediateExpr(self.expr.eq(_pl_expr(other)))

    def ne[SA: Shape](
        self, other: ExoExpr[SA, _T] | _T
    ) -> MesoExpr[Intersection[_S_contra, SA], bool]:
        return IntermediateExpr(self.expr.ne(_pl_expr(other)))

    def __gt__[SA: Shape](
        self, other: ExoExpr[SA, _T] | _T
    ) -> MesoExpr[Intersection[_S_contra, SA], bool]:
        return self.gt(other)

    def __lt__[SA: Shape](
        self, other: ExoExpr[SA, _T] | _T
    ) -> MesoExpr[Intersection[_S_contra, SA], bool]:
        return self.lt(other)

    def __ge__[SA: Shape](
        self, other: ExoExpr[SA, _T] | _T
    ) -> MesoExpr[Intersection[_S_contra, SA], bool]:
        return self.ge(other)

    def __le__[SA: Shape](
        self, other: ExoExpr[SA, _T] | _T
    ) -> MesoExpr[Intersection[_S_contra, SA], bool]:
        return self.le(other)

    def __ne__[SA: Shape](  # ty: ignore[invalid-method-override]
        self, other: ExoExpr[SA, _T] | _T
    ) -> MesoExpr[Intersection[_S_contra, SA], bool]:
        return self.ne(other)

    def __eq__[SA: Shape](  # ty: ignore[invalid-method-override]
        self, other: ExoExpr[SA, _T] | _T
    ) -> MesoExpr[Intersection[_S_contra, SA], bool]:
        return self.eq(other)

    def __hash__[SA: Shape](self) -> int:
        """
        Hash by the wrapped polars expression. This is defined explicitly because a class that
        overrides `__eq__` (as this one does) otherwise has its `__hash__` set to `None`
        """
        return hash(self.expr)

    def is_between[SA: Shape](
        self,
        start: ExoExpr[SA, _T] | _T,
        end: ExoExpr[SA, _T] | _T,
        closed: Literal["both", "left", "right", "none"] = "both",
    ) -> MesoExpr[Intersection[_S_contra, SA], bool]:
        """Check if this expression is between the given lower and upper bounds"""
        return IntermediateExpr(self.expr.is_between(_pl_expr(start), _pl_expr(end), closed))

    def round[N: (float, int)](
        self: Expr[_S_contra, _R_contra, N], decimals: int = 0
    ) -> Expr[_S_contra, _R_contra, N]:
        return IntermediateExpr(self.expr.round(decimals))

    def floor[N: (float, int)](
        self: Expr[_S_contra, _R_contra, N],
    ) -> Expr[_S_contra, _R_contra, N]:
        return IntermediateExpr(self.expr.floor())

    def ceil[N: (float, int)](self: Expr[_S_contra, _R_contra, N]) -> Expr[_S_contra, _R_contra, N]:
        return IntermediateExpr(self.expr.ceil())

    def is_null(self) -> MesoExpr[_S_contra, bool]:
        return IntermediateExpr(self.expr.is_null())

    def is_nan(
        self: Expr[_S_contra, _R_contra, float] | Expr[_S_contra, _R_contra, int],
    ) -> MesoExpr[_S_contra, bool]:
        return IntermediateExpr(self.expr.is_nan())

    def is_infinite(
        self: Expr[_S_contra, _R_contra, float] | Expr[_S_contra, _R_contra, int],
    ) -> MesoExpr[_S_contra, bool]:
        return IntermediateExpr(self.expr.is_infinite())

    def is_not_null(self) -> MesoExpr[_S_contra, bool]:
        return IntermediateExpr(self.expr.is_not_null())

    def is_not_nan(
        self: Expr[_S_contra, _R_contra, float] | Expr[_S_contra, _R_contra, int],
    ) -> MesoExpr[_S_contra, bool]:
        return IntermediateExpr(self.expr.is_not_nan())

    def is_finite(
        self: Expr[_S_contra, _R_contra, float] | Expr[_S_contra, _R_contra, int],
    ) -> MesoExpr[_S_contra, bool]:
        return IntermediateExpr(self.expr.is_finite())

    def is_significant[N: (int, float)](
        self: Expr[_S_contra, _R_contra, N],
    ) -> MesoExpr[_S_contra, bool]:
        """Is the numeric value a significant number, not `nan`, `0` or `null`"""
        return IntermediateExpr(self.expr.is_not_nan() & self.expr.is_not_null() & self.expr.ne(0))

    def not_(self: Expr[_S_contra, _R_contra, bool]) -> Expr[_S_contra, _R_contra, bool]:
        return IntermediateExpr(self.expr.not_())

    def is_in[SA: Shape](
        self, other: ExoExpr[SA, builtins.list[_T]] | Collection[_T] | Series[_T]
    ) -> MesoExpr[_S_contra, bool]:
        match other:
            case Series():
                return IntermediateExpr(self.expr.is_in(other.data))
            case Expr():
                return IntermediateExpr(self.expr.is_in(other.expr))
            case Collection():
                return IntermediateExpr(self.expr.is_in(normalize_enum(other)))

    @staticmethod
    def lit[L](expr: L | None) -> MesoExpr[Shape, L]:
        return IntermediateExpr(_pl_expr(expr))

    @overload
    def fill_null[SA: Shape](
        self, fill: ExoExpr[SA, _T] | _T
    ) -> Expr[Intersection[_S_contra, SA], _R_contra, _T]: ...
    @overload
    def fill_null(
        self, *, strategy: Literal["forward", "backward", "min", "max", "mean", "zero", "one"]
    ) -> Expr[_S_contra, _R_contra, _T]: ...  # No other expression involved, so shape is kept

    def fill_null[SA: Shape](
        self,
        fill: ExoExpr[SA, _T] | _T | None = None,
        strategy: Literal["forward", "backward", "min", "max", "mean", "zero", "one"] | None = None,
    ) -> Expr[Intersection[_S_contra, SA], _R_contra, _T]:
        return IntermediateExpr(
            self.expr.fill_null(_pl_expr(fill) if fill is not None else None, strategy)
        )

    def fill_nan[SA: Shape](
        self, fill: ExoExpr[SA, _T] | _T | None
    ) -> Expr[Intersection[_S_contra, SA], _R_contra, _T]:
        return IntermediateExpr(self.expr.fill_nan(_pl_expr(fill)))

    def reverse(self) -> Expr[_S_contra, _R_contra, _T]:
        return IntermediateExpr(self.expr.reverse())

    def to[Q: Shape](self, dimension: BoundDimension[Q, _T]) -> Expr[_S_contra, Q, _T]:
        return IntermediateExpr(self.expr.alias(dimension.name).cast(dimension.ty.pl_ty))

    def to_out(self, label: builtins.str) -> Expr[_S_contra, Shape, _T]:
        """
        The _out variant of `to` lets you rename a column, but it must be renamed again before it
        can be stored in a shape. However, if you're going out to a file, this controls the output
        column name, so is most useful with `transform_write_csv`
        """
        return IntermediateExpr(self.expr.alias(label))

    def agg(self) -> MesoAggExpr[_S_contra, builtins.list[_T]]:
        """Collect all values in the given group into a list"""
        return AggExpr(self.expr)

    def gather_every(self, n: int, offset: int = 0) -> MesoAggExpr[_S_contra, builtins.list[_T]]:
        """
        Take every `n`th value, starting from index `offset`, and collect the taken values into
        a list
        """
        return AggExpr(self.expr.gather_every(n, offset))

    def sum(self) -> AggExpr[_S_contra, _R_contra, _T]:
        return AggExpr(self.expr.sum())

    def mode(self) -> AggExpr[_S_contra, _R_contra, _T]:
        return AggExpr(self.expr.mode())

    def mean[N: (int, float, Decimal)](
        self: Expr[_S_contra, _R_contra, N],
    ) -> AggExpr[_S_contra, _R_contra, N]:
        return AggExpr(self.expr.mean())

    def median[N: (int, float, Decimal)](
        self: Expr[_S_contra, _R_contra, N],
    ) -> AggExpr[_S_contra, _R_contra, N]:
        return AggExpr(self.expr.median())

    def unique(self, maintain_order: bool = False) -> Expr[_S_contra, _R_contra, _T]:
        return IntermediateExpr(self.expr.unique(maintain_order=maintain_order))

    def drop_nulls(self) -> Expr[_S_contra, _R_contra, _T]:
        return IntermediateExpr(self.expr.drop_nulls())

    def sort(self, *, descending: bool = False) -> Expr[_S_contra, _R_contra, _T]:
        return IntermediateExpr(self.expr.sort(descending=descending))

    def first(self) -> AggExpr[_S_contra, _R_contra, _T]:
        return AggExpr(self.expr.first())

    def last(self) -> AggExpr[_S_contra, _R_contra, _T]:
        return AggExpr(self.expr.last())

    def min(self) -> AggExpr[_S_contra, _R_contra, _T]:
        return AggExpr(self.expr.min())

    def max(self) -> AggExpr[_S_contra, _R_contra, _T]:
        return AggExpr(self.expr.max())

    def count(self) -> MesoAggExpr[_S_contra, int]:
        return AggExpr(self.expr.count())

    def len(self) -> MesoAggExpr[_S_contra, int]:
        return AggExpr(self.expr.len())

    def implode(self: MesoExpr[_S_contra, _T]) -> MesoExpr[_S_contra, builtins.list[_T]]:
        """
        Group all the elements into a single list. This resizes the resultant series to a single
        element.

        Note: Since this resizes the result, it is unsafe to simply map back to a column. Instead,
        it's useful for creating arrays for intermediates used in `ExoExpr`s. You might want
        `agg` instead to create an aggregated list out of elements
        """
        return IntermediateExpr(self.expr.implode())

    @overload
    def over[Q: Shape](
        self,
        *exprs: ExoExpr[_S_contra, Any]
        | ExoExpr[Q, Any]
        | ExoExpr[Intersection[_S_contra, Q], Any],
        order_by: Iterable[
            ExoExpr[_S_contra, Any] | ExoExpr[Q, Any] | ExoExpr[Intersection[_S_contra, Q], Any]
        ]
        | None = None,
        mapping_strategy: Literal["group_to_rows"] = "group_to_rows",
    ) -> MesoExpr[Intersection[_S_contra, Q], _T]: ...
    @overload
    def over[Q: Shape](
        self,
        *exprs: ExoExpr[_S_contra, Any]
        | ExoExpr[Q, Any]
        | ExoExpr[Intersection[_S_contra, Q], Any],
        order_by: Iterable[
            ExoExpr[_S_contra, Any] | ExoExpr[Q, Any] | ExoExpr[Intersection[_S_contra, Q], Any]
        ]
        | None = None,
        mapping_strategy: Literal["join"],
    ) -> MesoExpr[Intersection[_S_contra, Q], builtins.list[_T]]: ...

    def over(
        self,
        *exprs: ExoExpr,
        order_by: Iterable | None = None,
        mapping_strategy: Literal["group_to_rows", "join"] = "group_to_rows",
    ) -> MesoExpr:
        """
        Specify the expression is for the keyed group of the table, i.e. a window (bucket) of
        values keyed on by `exprs`.

        "group_to_rows" matches values up to the current rows, "join" *implodes* the group and
        matches this list back to each of the source rows:

        ```
        # Find accounts with a US-region account with the same username
        accounts.filter(
            Account.region.over(Account.username, mapping_strategy="join")
            .list.contains("US")
        )
        ```

        `order_by` is forwarded to polars' `order_by` argument; `None` (or any falsy value, such
        as an empty list) is passed on as `None`
        """
        return IntermediateExpr(
            self.expr.over(
                *(_pl_expr(e) for e in exprs),
                order_by=(_pl_expr(e) for e in order_by) if order_by else None,
                mapping_strategy=mapping_strategy,
            )
        )

    @staticmethod
    def _normalize_mapping[A, B](mapping: Mapping[A, B]) -> Mapping[A, B]:
        """
        Return `mapping` with its keys and values passed through `normalize_enum` if its first
        item contains an `enum.Enum`, otherwise return `mapping` unchanged
        """
        # To avoid processing large mappings we don't need to, we only check the first item,
        # assuming every other item holds the same kinds of keys and values
        if mapping and builtins.any(isinstance(x, enum.Enum) for x in first(mapping.items(), ())):
            return {normalize_enum(k): normalize_enum(v) for k, v in mapping.items()}
        return mapping

    def replace(
        self,
        mapping: Mapping[_T, _T],
        *,
        default: ExoExpr[_S_contra, _T] | _T | None = None,
        or_null: bool = False,
    ) -> Expr[_S_contra, _R_contra, _T]:
        """
        Translate the values in the column using the given lookup table. If the lookup fails,
        this preserves the current value, use `default` or `or_null` to change this behaviour.
        """
        kwargs = _ReplaceKwargs()
        if default is not None:
            kwargs["default"] = _pl_expr(default)
        elif or_null:
            kwargs["default"] = None
        replace = self.expr.replace_strict if "default" in kwargs else self.expr.replace
        mapping = self._normalize_mapping(mapping)

        return IntermediateExpr(replace(mapping, **kwargs))

    def replace_to[Q: Shape, U](
        self,
        mapping: Mapping[_T, U],
        to: BoundDimension[Q, U],
        *,
        default: ExoExpr[_S_contra, U] | U | None = None,
        or_null: bool = False,
    ) -> Expr[_S_contra, Q, U]:
        """
        Translate the values in the column using the given lookup table

        This is the `_to` variant, which is allowed to change type but must be mapped to a new
        column

        Unlike `replace`, all values must be mapped or a default must be set, since the column type
        can change
        """
        kwargs = _ReplaceKwargs()
        if default is not None:
            kwargs["default"] = _pl_expr(default)
        elif or_null:
            kwargs["default"] = None
        mapping = self._normalize_mapping(mapping)

        return IntermediateExpr(
            self.expr.replace_strict(mapping, return_dtype=to.ty.pl_ty, **kwargs).alias(to.name)
        )

    def replace_out[U](
        self,
        mapping: Mapping[_T, U],
        ty: Typeable[U],
        *,
        default: ExoExpr[_S_contra, U] | U | None = None,
        or_null: bool = False,
    ) -> MesoExpr[_S_contra, U]:
        """
        Translate the values in the column using the given lookup table

        This is the `_out` variant, which is allowed to change type but must be mapped to a new
        column with `.to` if it is to be stored in a dataframe

        Unlike `replace`, all values must be mapped or a default must be set, since the column type
        is changing
        """
        kwargs = _ReplaceKwargs()
        if default is not None:
            kwargs["default"] = _pl_expr(default)
        elif or_null:
            kwargs["default"] = None
        mapping = self._normalize_mapping(mapping)

        return IntermediateExpr(
            self.expr.replace_strict(mapping, return_dtype=from_typeable(ty).pl_ty, **kwargs)
        )

    def map_to[Q: Shape, U](
        self, transform: Callable[[_T], U | None], to: BoundDimension[Q, U]
    ) -> Expr[_S_contra, Q, U]:
        """
        Apply a Python transformation to the values in a column. This has to be mapped straight to a
        dimension to know the polars datatype of the output. This limitation shouldn't be too
        disruptive since the `transform` function in Python should be able to get it into its final
        form, and since after being mapped to a column it can continue to be operated on
        """
        return IntermediateExpr(self.expr.map_elements(transform, to.ty.pl_ty).alias(to.name))

    def map_out[U](
        self, transform: Callable[[_T], U | None], ty: Typeable[U]
    ) -> MesoExpr[_S_contra, U]:
        """
        Apply a Python transformation to the values in a column. The output type `ty` has to be
        given to know the polars datatype of the output

        This is the `_out` variant, which is allowed to change type but must be mapped to a new
        column with `.to` if it is to be stored in a dataframe
        """
        return IntermediateExpr(self.expr.map_elements(transform, from_typeable(ty).pl_ty))

    def cast(self, ty: Typeable[_T], *, strict: bool = True) -> Expr[_S_contra, _R_contra, _T]:
        """
        Cast the values in the column whilst preserving the Python type, useful if two polars
        representations have the same Python type (e.g. `Float32` and `Float64`)

        To change the Python type as well, use `cast_out`
        """
        return IntermediateExpr(self.expr.cast(from_typeable(ty).pl_ty, strict=strict))

    def cast_out[U](self, ty: Typeable[U], *, strict: bool = True) -> MesoExpr[_S_contra, U]:
        """
        Cast the values in the column to change the type, i.e. casting ints to strings.

        This is the `_out` variant, which is allowed to change type but must be mapped to a new
        column if it is to be stored in a dataframe
        """
        return IntermediateExpr(self.expr.cast(from_typeable(ty).pl_ty, strict=strict))

    def __or__[SA: Shape](
        self: Expr[_S_contra, _R_contra, bool], other: ExoExpr[SA, bool] | bool
    ) -> Expr[Intersection[_S_contra, SA], _R_contra, bool]:
        return IntermediateExpr(self.expr | _pl_expr(other))

    def __and__[SA: Shape](
        self: Expr[_S_contra, _R_contra, bool], other: ExoExpr[SA, bool] | bool
    ) -> Expr[Intersection[_S_contra, SA], _R_contra, bool]:
        return IntermediateExpr(self.expr & _pl_expr(other))

    @overload
    def __add__[SA: Shape](
        self, other: ExoExpr[SA, _T] | _T
    ) -> Expr[Intersection[_S_contra, SA], _R_contra, _T]: ...
    @overload
    def __add__[SA: Shape, N: float | Decimal](
        self: Expr[_S_contra, _R_contra, N], other: ExoExpr[SA, int] | int
    ) -> Expr[Intersection[_S_contra, SA], _R_contra, N]: ...
    @overload
    def __add__[SA: Shape, N: float | Decimal](
        self: Expr[_S_contra, _R_contra, int], other: ExoExpr[SA, N] | N
    ) -> MesoExpr[Intersection[_S_contra, SA], N]: ...
    @overload
    def __add__[SA: Shape, D: datetime.date | datetime.datetime](
        self: Expr[_S_contra, _R_contra, D],
        other: ExoExpr[SA, datetime.timedelta] | datetime.timedelta,
    ) -> MesoExpr[Intersection[_S_contra, SA], D]: ...

    def __add__(self, other: Expr | object) -> Expr:
        return IntermediateExpr(self.expr + _pl_expr(other))

    @overload
    def __sub__[SA: Shape](
        self: Expr[_S_contra, _R_contra, datetime.date | datetime.datetime],
        other: ExoExpr[SA, datetime.date | datetime.datetime] | datetime.date | datetime.datetime,
    ) -> MesoExpr[Intersection[_S_contra, SA], datetime.timedelta]: ...
    @overload
    def __sub__[SA: Shape](
        self, other: ExoExpr[SA, _T] | _T
    ) -> Expr[Intersection[_S_contra, SA], _R_contra, _T]: ...
    @overload
    def __sub__[SA: Shape, N: float | Decimal](
        self: Expr[_S_contra, _R_contra, N], other: ExoExpr[SA, int] | int
    ) -> Expr[Intersection[_S_contra, SA], _R_contra, N]: ...
    @overload
    def __sub__[SA: Shape, N: float | Decimal](
        self: Expr[_S_contra, _R_contra, int], other: ExoExpr[SA, N] | N
    ) -> MesoExpr[Intersection[_S_contra, SA], N]: ...
    @overload
    def __sub__[SA: Shape, D: datetime.date | datetime.datetime](
        self: Expr[_S_contra, _R_contra, D],
        other: ExoExpr[SA, datetime.timedelta] | datetime.timedelta,
    ) -> MesoExpr[Intersection[_S_contra, SA], D]: ...

    def __sub__(self, other: Expr | object) -> Expr:
        return IntermediateExpr(self.expr - _pl_expr(other))

    @overload
    def __mul__[SA: Shape](
        self, other: ExoExpr[SA, _T] | _T
    ) -> Expr[Intersection[_S_contra, SA], _R_contra, _T]: ...
    @overload
    def __mul__[SA: Shape, N: float | Decimal](
        self: Expr[_S_contra, _R_contra, N], other: ExoExpr[SA, int] | int
    ) -> Expr[Intersection[_S_contra, SA], _R_contra, N]: ...
    @overload
    def __mul__[SA: Shape, N: float | Decimal](
        self: Expr[_S_contra, _R_contra, int], other: ExoExpr[SA, N] | N
    ) -> MesoExpr[Intersection[_S_contra, SA], N]: ...

    def __mul__(self, other: Expr | object) -> Expr:
        return IntermediateExpr(self.expr * _pl_expr(other))

    @overload
    def __pow__[SA: Shape](
        self, other: ExoExpr[SA, _T] | _T
    ) -> Expr[Intersection[_S_contra, SA], _R_contra, _T]: ...
    @overload
    def __pow__[SA: Shape, N: float | Decimal](
        self: Expr[_S_contra, _R_contra, N], other: ExoExpr[SA, int] | int
    ) -> Expr[Intersection[_S_contra, SA], _R_contra, N]: ...
    @overload
    def __pow__[SA: Shape, N: float | Decimal](
        self: Expr[_S_contra, _R_contra, int], other: ExoExpr[SA, N] | N
    ) -> MesoExpr[Intersection[_S_contra, SA], N]: ...

    def __pow__(self, other: Expr | object) -> Expr:
        return IntermediateExpr(self.expr ** _pl_expr(other))

    @overload
    def __truediv__[SA: Shape, N: float | Decimal](
        self: Expr[_S_contra, _R_contra, N], other: ExoExpr[SA, N] | N
    ) -> Expr[Intersection[_S_contra, SA], _R_contra, N]: ...
    @overload
    def __truediv__[SA: Shape, N: float | Decimal](
        self: Expr[_S_contra, _R_contra, N], other: ExoExpr[SA, int] | int
    ) -> Expr[Intersection[_S_contra, SA], _R_contra, N]: ...
    @overload
    def __truediv__[SA: Shape, N: float | Decimal](
        self: Expr[_S_contra, _R_contra, int], other: ExoExpr[SA, N] | N
    ) -> MesoExpr[Intersection[_S_contra, SA], N]: ...
    @overload
    def __truediv__[SA: Shape](
        self: Expr[_S_contra, _R_contra, int], other: ExoExpr[SA, int] | int
    ) -> MesoExpr[Intersection[_S_contra, SA], float]: ...  # Ints give floats when divided

    def __truediv__(self, other: Expr | object) -> Expr:
        return IntermediateExpr(self.expr / _pl_expr(other))

    @overload
    def __floordiv__[SA: Shape, N: float | Decimal](
        self: Expr[_S_contra, _R_contra, N], other: ExoExpr[SA, N] | N
    ) -> Expr[Intersection[_S_contra, SA], _R_contra, N]: ...
    @overload
    def __floordiv__[SA: Shape, N: float | Decimal](
        self: Expr[_S_contra, _R_contra, N], other: ExoExpr[SA, int] | int
    ) -> Expr[Intersection[_S_contra, SA], _R_contra, N]: ...
    @overload
    def __floordiv__[SA: Shape, N: float | Decimal](
        self: Expr[_S_contra, _R_contra, int], other: ExoExpr[SA, N] | N
    ) -> MesoExpr[Intersection[_S_contra, SA], N]: ...
    @overload
    def __floordiv__[SA: Shape](
        self: Expr[_S_contra, _R_contra, int], other: ExoExpr[SA, int] | int
    ) -> MesoExpr[Intersection[_S_contra, SA], float]: ...  # Ints give floats when divided

    def __floordiv__(self, other: Expr | object) -> Expr:
        return IntermediateExpr(self.expr // _pl_expr(other))

    def __neg__[N: (float, Decimal, datetime.timedelta, int)](
        self: Expr[_S_contra, _R_contra, N],
    ) -> Expr[_S_contra, _R_contra, N]:
        return IntermediateExpr(-self.expr)

    def __invert__(self: Expr[_S_contra, _R_contra, bool]) -> Expr[_S_contra, _R_contra, bool]:
        return IntermediateExpr(~self.expr)

    def abs[N: float | Decimal | int](
        self: Expr[_S_contra, _R_contra, N],
    ) -> Expr[_S_contra, _R_contra, N]:
        return IntermediateExpr(self.expr.abs())

    def is_unique(self) -> MesoExpr[_S_contra, bool]:
        return IntermediateExpr(self.expr.is_unique())

    def coalesce[SA: Shape](
        self,
        *others: ExoExpr[_S_contra, _T]
        | ExoExpr[SA, _T]
        | ExoExpr[Intersection[_S_contra, SA], _T]
        | _T,
    ) -> Expr[Intersection[_S_contra, SA], _R_contra, _T]:
        return IntermediateExpr(pl.coalesce(self.expr, *(_pl_expr(e) for e in others)))

    def null_when[SA: Shape](
        self,
        *conditions: ExoExpr[_S_contra, bool]
        | ExoExpr[SA, bool]
        | ExoExpr[Intersection[_S_contra, SA], bool],
    ) -> Expr[Intersection[_S_contra, SA], _R_contra, _T]:
        return when(*conditions).otherwise(self)

    def null_when_eq[Q: Shape](
        self, expr: ExoExpr[Q, _T] | _T
    ) -> Expr[Intersection[_S_contra, Q], _R_contra, _T]:
        """Replace any value equalling `expr` with null. E.g. `.null_when_eq("NOT SET")`"""
        return when(self.eq(expr)).otherwise(self)

    def null_insignificant(self) -> Expr[_S_contra, _R_contra, _T]:
        """Replace 0 and `nan` with null"""
        return when(self.is_significant()).then(self)

    def max_horizontal[SA: Shape](
        self, *others: ExoExpr[SA, _T] | _T
    ) -> Expr[Intersection[_S_contra, SA], _R_contra, _T]:
        return IntermediateExpr(pl.max_horizontal(self.expr, *map(_pl_expr, others)))

    def min_horizontal[SA: Shape](
        self, *others: ExoExpr[SA, _T] | _T
    ) -> Expr[Intersection[_S_contra, SA], _R_contra, _T]:
        return IntermediateExpr(pl.min_horizontal(self.expr, *map(_pl_expr, others)))

    def repeat_by[SA: Shape](
        self, by: ExoExpr[SA, int] | int
    ) -> IntermediateExpr[Intersection[_S_contra, SA], _R_contra, builtins.list[_T]]:
        """
        Create a list of the element repeated `by` times. Also useful for constructing singleton
        lists with `.repeat_by(1)`
        """
        return IntermediateExpr(self.expr.repeat_by(_pl_expr(by)))

    @property
    def struct[M: Shape](
        self: Expr[_S_contra, _R_contra, StructMapping[M]],
    ) -> StructExprNamespace[_S_contra, _R_contra, M]:
        return StructExprNamespace(self)

    @property
    def str(
        self: Expr[_S_contra, _R_contra, builtins.str],
    ) -> StrExprNamespace[_S_contra, _R_contra, builtins.str]:
        return StrExprNamespace(self.cast(STRING))

    @property
    def dt[D: (datetime.datetime, datetime.date)](
        self: Expr[_S_contra, _R_contra, D],
    ) -> DtExprNamespace[_S_contra, _R_contra, D]:
        return DtExprNamespace(self)

    @property
    def list[A](
        self: Expr[_S_contra, _R_contra, builtins.list[A]],
    ) -> ListExprNamespace[_S_contra, _R_contra, A]:
        return ListExprNamespace(self)

    @overload
    def on[S: Shape, Q: Shape](self: ExoExpr[S | Q, Any]) -> JoinOn[S, Q, _T]: ...
    @overload
    def on[Q: Shape](self, other: ExoExpr[Q, _T]) -> JoinOn[_S_contra, Q, _T]: ...

    def on(self, other: ExoExpr | None = None) -> JoinOn:
        return JoinOn(self, other if other is not None else self)

agg()

Collect all values in the given group into a list

Source code in typol/expr.py
def agg(self) -> MesoAggExpr[_S_contra, builtins.list[_T]]:
    """Collect all values in the given group into a list"""
    return AggExpr(self.expr)

cast(ty, *, strict=True)

Cast the values in the column whilst preserving the Python type, useful if two polars representations have the same Python type (e.g. Float32 and Float64)

To change the Python type as well, use cast_out

Source code in typol/expr.py
def cast(self, ty: Typeable[_T], *, strict: bool = True) -> Expr[_S_contra, _R_contra, _T]:
    """
    Cast the values in the column whilst preserving the Python type, useful if two polars
    representations have the same Python type (e.g. `Float32` and `Float64`)

    To change the Python type as well, use `cast_out`
    """
    return IntermediateExpr(self.expr.cast(from_typeable(ty).pl_ty, strict=strict))

cast_out(ty, *, strict=True)

Cast the values in the column to change the type, i.e. casting ints to strings.

This is the _out variant, which is allowed to change type but must be mapped to a new column if it is to be stored in a dataframe

Source code in typol/expr.py
def cast_out[U](self, ty: Typeable[U], *, strict: bool = True) -> MesoExpr[_S_contra, U]:
    """
    Cast the values in the column to change the type, i.e. casting ints to strings.

    This is the `_out` variant, which is allowed to change type but must be mapped to a new
    column if it is to be stored in a dataframe
    """
    return IntermediateExpr(self.expr.cast(from_typeable(ty).pl_ty, strict=strict))

gather_every(n, offset=0)

Take every nth value, starting from index offset, and collect the taken values into a list

Source code in typol/expr.py
def gather_every(self, n: int, offset: int = 0) -> MesoAggExpr[_S_contra, builtins.list[_T]]:
    """
    Take every `n`th value, starting from index `offset`, and collect the taken values into
    a list
    """
    return AggExpr(self.expr.gather_every(n, offset))

implode()

Group all the elements into a single list. This resizes the resultant series to a single element.

Note: Since this resizes the result, it is unsafe to simply map back to a column. Instead, it's useful for creating arrays for intermediates used in ExoExprs. You might want agg instead to create an aggregated list out of elements

Source code in typol/expr.py
def implode(self: MesoExpr[_S_contra, _T]) -> MesoExpr[_S_contra, builtins.list[_T]]:
    """
    Group all the elements into a single list. This resizes the resultant series to a single
    element.

    Note: Since this resizes the result, it is unsafe to simply map back to a column. Instead,
    it's useful for creating arrays for intermediates used in `ExoExpr`s. You might want
    `agg` instead to create an aggregated list out of elements
    """
    return IntermediateExpr(self.expr.implode())

is_between(start, end, closed='both')

Check if this expression is between the given lower and upper bounds

Source code in typol/expr.py
def is_between[SA: Shape](
    self,
    start: ExoExpr[SA, _T] | _T,
    end: ExoExpr[SA, _T] | _T,
    closed: Literal["both", "left", "right", "none"] = "both",
) -> MesoExpr[Intersection[_S_contra, SA], bool]:
    """Check if this expression is between the given lower and upper bounds"""
    return IntermediateExpr(self.expr.is_between(_pl_expr(start), _pl_expr(end), closed))

is_significant()

Is the numeric value a significant number, not nan, 0 or null

Source code in typol/expr.py
def is_significant[N: (int, float)](
    self: Expr[_S_contra, _R_contra, N],
) -> MesoExpr[_S_contra, bool]:
    """Is the numeric value a significant number, not `nan`, `0` or `null`"""
    return IntermediateExpr(self.expr.is_not_nan() & self.expr.is_not_null() & self.expr.ne(0))

map_out(transform, ty)

Apply a Python transformation to the values in a column. The output type ty has to be given to know the polars datatype of the output. This is the _out variant, which is allowed to change type but must be mapped to a new column with .to if it is to be stored in a dataframe

Source code in typol/expr.py
def map_out[U](
    self, transform: Callable[[_T], U | None], ty: Typeable[U]
) -> MesoExpr[_S_contra, U]:
    """
    Apply a Python transformation to the values in a column. The output type `ty` has to be
    given to know the polars datatype of the output

    This is the `_out` variant, which is allowed to change type but must be mapped to a new
    column with `.to` if it is to be stored in a dataframe
    """
    return IntermediateExpr(self.expr.map_elements(transform, from_typeable(ty).pl_ty))

map_to(transform, to)

Apply a Python transformation to the values in a column. This has to be mapped straight to a dimension to know the polars datatype of the output. This limitation shouldn't be too disruptive since the transform function in Python should be able to get it into its final form, and since after being mapped to a column it can continue to be operated on

Source code in typol/expr.py
def map_to[Q: Shape, U](
    self, transform: Callable[[_T], U | None], to: BoundDimension[Q, U]
) -> Expr[_S_contra, Q, U]:
    """
    Apply a Python transformation to the values in a column. This has to be mapped straight to a
    dimension to know the polars datatype of the output. This limitation shouldn't be too
    disruptive since the `transform` function in Python should be able to get it into its final
    form, and since after being mapped to a column it can continue to be operated on
    """
    return IntermediateExpr(self.expr.map_elements(transform, to.ty.pl_ty).alias(to.name))

null_insignificant()

Replace 0 and nan with null

Source code in typol/expr.py
def null_insignificant(self) -> Expr[_S_contra, _R_contra, _T]:
    """Replace 0 and `nan` with null, keeping only values where `is_significant()` holds"""
    keep = self.is_significant()
    return when(keep).then(self)

null_when_eq(expr)

Replace any value equalling expr with null. E.g. .null_when_eq("NOT SET")

Source code in typol/expr.py
def null_when_eq[Q: Shape](
    self, expr: ExoExpr[Q, _T] | _T
) -> Expr[Intersection[_S_contra, Q], _R_contra, _T]:
    """Replace any value equalling `expr` with null. E.g. `.null_when_eq("NOT SET")`"""
    return when(self.eq(expr)).otherwise(self)

over(*exprs, order_by=None, mapping_strategy='group_to_rows')

over(
    *exprs: ExoExpr[_S_contra, Any]
    | ExoExpr[Q, Any]
    | ExoExpr[Intersection[_S_contra, Q], Any],
    order_by: Iterable[
        ExoExpr[_S_contra, Any]
        | ExoExpr[Q, Any]
        | ExoExpr[Intersection[_S_contra, Q], Any]
    ]
    | None = None,
    mapping_strategy: Literal[
        "group_to_rows"
    ] = "group_to_rows",
) -> MesoExpr[Intersection[_S_contra, Q], _T]
over(
    *exprs: ExoExpr[_S_contra, Any]
    | ExoExpr[Q, Any]
    | ExoExpr[Intersection[_S_contra, Q], Any],
    order_by: Iterable[
        ExoExpr[_S_contra, Any]
        | ExoExpr[Q, Any]
        | ExoExpr[Intersection[_S_contra, Q], Any]
    ]
    | None = None,
    mapping_strategy: Literal["join"],
) -> MesoExpr[
    Intersection[_S_contra, Q], builtins.list[_T]
]

Specify that the expression is evaluated per keyed group of the table, i.e. over a window (bucket) of values keyed on by exprs. "group_to_rows" matches values up to the current rows, "join" implodes the group and matches this list back to each of the source rows:

# Find accounts with a US-region account with the same username
accounts.filter(
    Account.region.over(Account.username, mapping_strategy="join")
    .list.contains("US")
)
Source code in typol/expr.py
def over(
    self,
    *exprs: ExoExpr,
    order_by: Iterable | None = None,
    mapping_strategy: Literal["group_to_rows", "join"] = "group_to_rows",
) -> MesoExpr:
    """
    Evaluate the expression per keyed group of the table, i.e. over a window (bucket) of values
    keyed on by `exprs`.

    `mapping_strategy` decides how the results are matched back: "group_to_rows" matches values
    up to the current rows, while "join" *implodes* the group and matches this list back to each
    of the source rows:

    ```
    # Find accounts with a US-region account with the same username
    accounts.filter(
        Account.region.over(Account.username, mapping_strategy="join")
        .list.contains("US")
    )
    ```
    """
    window = self.expr.over
    keys = [_pl_expr(key) for key in exprs]
    # A falsy `order_by` (None or an empty collection) is forwarded to polars as None
    ordering = (_pl_expr(e) for e in order_by) if order_by else None
    windowed = window(*keys, order_by=ordering, mapping_strategy=mapping_strategy)
    return IntermediateExpr(windowed)

repeat_by(by)

Create a list of the element repeated by times. Also useful for constructing singleton lists with .repeat_by(1)

Source code in typol/expr.py
def repeat_by[SA: Shape](
    self, by: ExoExpr[SA, int] | int
) -> IntermediateExpr[Intersection[_S_contra, SA], _R_contra, builtins.list[_T]]:
    """
    Create a list of the element repeated `by` times. Also useful for constructing singleton
    lists with `.repeat_by(0)`
    """
    return IntermediateExpr(self.expr.repeat_by(_pl_expr(by)))

replace(mapping, *, default=None, or_null=False)

Translate the values in the column using the given lookup table. If the lookup fails, this preserves the current value, use default or or_null to change this behaviour.

Source code in typol/expr.py
def replace(
    self,
    mapping: Mapping[_T, _T],
    *,
    default: ExoExpr[_S_contra, _T] | _T | None = None,
    or_null: bool = False,
) -> Expr[_S_contra, _R_contra, _T]:
    """
    Translate the values in the column through the lookup table `mapping`.

    Values missing from the table are left as they are. Pass `default` to substitute a fallback
    for them instead, or `or_null=True` to turn them into null; `default` wins when both are
    given.
    """
    has_fallback = default is not None or or_null
    options = _ReplaceKwargs()
    if has_fallback:
        options["default"] = _pl_expr(default) if default is not None else None
    # Only `replace_strict` is called with a `default`; plain `replace` is used without one
    translate = self.expr.replace_strict if has_fallback else self.expr.replace
    lookup = self._normalize_mapping(mapping)

    return IntermediateExpr(translate(lookup, **options))

replace_out(mapping, ty, *, default=None, or_null=False)

Translate the values in the column using the given lookup table

This is the _out variant, which is allowed to change type but must be mapped to a new column with .to if it is to be stored in a dataframe

Unlike replace, all values must be mapped or a default must be set, since the column type is changing

Source code in typol/expr.py
def replace_out[U](
    self,
    mapping: Mapping[_T, U],
    ty: Typeable[U],
    *,
    default: ExoExpr[_S_contra, U] | U | None = None,
    or_null: bool = False,
) -> MesoExpr[_S_contra, U]:
    """
    Translate the values in the column using the given lookup table

    This is the `_out` variant, which is allowed to change type but must be mapped to a new
    column with `.to` if it is to be stored in a dataframe

    Unlike `replace`, all values must be mapped or a default must be set, since the column type
    is changing
    """
    kwargs = _ReplaceKwargs()
    if default is not None:
        kwargs["default"] = _pl_expr(default)
    elif or_null:
        kwargs["default"] = None
    mapping = self._normalize_mapping(mapping)

    return IntermediateExpr(
        self.expr.replace_strict(mapping, return_dtype=from_typeable(ty).pl_ty, **kwargs)
    )

replace_to(mapping, to, *, default=None, or_null=False)

Translate the values in the column using the given lookup table

This is the _to variant, which is allowed to change type but must be mapped to a new column

Unlike replace, all values must be mapped or a default must be set, since the column type can change

Source code in typol/expr.py
def replace_to[Q: Shape, U](
    self,
    mapping: Mapping[_T, U],
    to: BoundDimension[Q, U],
    *,
    default: ExoExpr[_S_contra, U] | U | None = None,
    or_null: bool = False,
) -> Expr[_S_contra, Q, U]:
    """
    Translate the values in the column using the given lookup table

    This is the `_to` variant, which is allowed to change type but must be mapped to a new
    column

    Unlike `replace`, all values must be mapped or a default must be set, since the column type
    can change
    """
    kwargs = _ReplaceKwargs()
    if default is not None:
        kwargs["default"] = _pl_expr(default)
    elif or_null:
        kwargs["default"] = None
    mapping = self._normalize_mapping(mapping)

    return IntermediateExpr(
        self.expr.replace_strict(mapping, return_dtype=to.ty.pl_ty, **kwargs).alias(to.name)
    )

to_out(label)

The _out variant of to lets you rename a column, but it must be renamed again before it can be stored in a shape. However, if you're going out to a file, this controls the output column name, so is most useful with transform_write_csv

Source code in typol/expr.py
def to_out(self, label: builtins.str) -> Expr[_S_contra, Shape, _T]:
    """
    Rename the column to the free-form `label`. This is the _out variant of `to`: the column
    must be renamed again before it can be stored in a shape. However, if you're going out to a
    file, this controls the output column name, so it is most useful with `transform_write_csv`
    """
    renamed = self.expr.alias(label)
    return IntermediateExpr(renamed)

Initializer dataclass

Bases: Expr[Any, _S_contra, _T]

Used in Entry.of to allow constructing rows where the dimension matches the assigned column value

Source code in typol/expr.py
@dataclasses.dataclass(frozen=True)
class Initializer(Expr[Any, _S_contra, _T]):
    """
    Pairs a dimension with the value assigned to it. Used in `Entry.of` to allow constructing
    rows where the dimension matches the assigned column value
    """

    dimension: BoundDimension[_S_contra, _T]
    value: _T | None

    @property
    def expr(self) -> pl.Expr:
        """The literal `value` mapped to `dimension`, as a polars expression"""
        literal = Expr.lit(self.value)
        bound = literal.to(self.dimension)
        return bound.expr

IntermediateExpr dataclass

Bases: Expr[S, R, T]

An expression created from another expression, this just stores the polars expression generated from whatever operation has been applied to the last expression

Source code in typol/expr.py
@dataclasses.dataclass(frozen=True)
class IntermediateExpr[S: Shape, R: Shape, T](Expr[S, R, T]):
    """
    An expression created from another expression, this just stores the polars expression generated
    from whatever operation has been applied to the last expression

    The type parameters are the same `S`, `R` and `T` that are passed on to the `Expr` base
    """

    # The wrapped polars expression produced by the previous operation.
    # NOTE(review): the explicit, default-less `dataclasses.field()` presumably exists to turn an
    # `expr` member declared on `Expr` into a required dataclass field — confirm against `Expr`.
    expr: pl.Expr = dataclasses.field()

JoinOn dataclass

Bases: Generic[_S_contra, _R_contra, _T]

Represents a requirement for left and right to be equal for two rows to join

Source code in typol/expr.py
@dataclasses.dataclass
class JoinOn(Generic[_S_contra, _R_contra, _T]):
    """
    Represents a requirement for `left` and `right` to be equal for two rows to join

    Both sides share the value type `_T`; `left` is an expression over `_S_contra` and `right`
    is an expression over `_R_contra`.
    """

    # Expression typed against the `_S_contra` shape
    left: ExoExpr[_S_contra, _T]
    # Expression typed against the `_R_contra` shape
    right: ExoExpr[_R_contra, _T]

ListExprNamespace dataclass

Namespace for list functions, similar to pl.Expr.list

Source code in typol/expr.py
@dataclasses.dataclass(frozen=True)
class ListExprNamespace[S: Shape, R: Shape, T]:
    """Namespace for list functions, similar to `pl.Expr.list`"""

    expr: Expr[S, R, list[T]]

    def explode_to[Q: Shape](self, to: BoundDimension[Q, T]) -> Explosion[S, Q, T]:
        """
        Explode the entire dataframe around for this list column, creating a new row for every list
        entry in a existing row
        """
        return Explosion(self.expr, to)

    def unique(self) -> Expr[S, R, list[T]]:
        """Remove duplicate elements from the list"""
        return IntermediateExpr(self.expr.expr.list.unique())

    def sort(self, descending: bool = False) -> Expr[S, R, list[T]]:
        """Order the eleements of the list"""
        return IntermediateExpr(self.expr.expr.list.sort(descending=descending))

    def explode(self) -> MesoExpr[S, T]:
        """
        Flatten a list expression into one entry per list element. This resizes the resultant series
        to the sum of the length of the lists.

        Note: Since this resizes the result, it is unsafe to simply map back to a column. Instead,
        it's useful for flattening arrays before applying some aggregate expression
        """
        return IntermediateExpr(self.expr.expr.list.explode())

    def set_difference[Q: Shape](
        self, other: ExoExpr[Q, list[T]] | list[T]
    ) -> Expr[Intersection[S, Q], R, list[T]]:
        return IntermediateExpr(self.expr.expr.list.set_difference(_pl_expr(other)))

    def set_intersection[Q: Shape](
        self, other: ExoExpr[Q, list[T]] | list[T]
    ) -> Expr[Intersection[S, Q], R, list[T]]:
        return IntermediateExpr(self.expr.expr.list.set_intersection(_pl_expr(other)))

    def set_union[Q: Shape](
        self, other: ExoExpr[Q, list[T]] | list[T]
    ) -> Expr[Intersection[S, Q], R, list[T]]:
        return IntermediateExpr(self.expr.expr.list.set_union(_pl_expr(other)))

    def contains[Q: Shape](self, other: ExoExpr[Q, T] | T) -> MesoExpr[Intersection[S, Q], bool]:
        return IntermediateExpr(self.expr.expr.list.contains(_pl_expr(other)))

    def len(self) -> MesoExpr[S, int]:
        return IntermediateExpr(self.expr.expr.list.len())

    def drop_nulls(self) -> Expr[S, R, list[T]]:
        return IntermediateExpr(self.expr.expr.list.drop_nulls())

    def concat[Q: Shape](
        self, other: ExoExpr[Q, list[T]] | list[T]
    ) -> Expr[Intersection[S, Q], R, list[T]]:
        return IntermediateExpr(self.expr.expr.list.concat(_pl_expr(other)))

    def head[Q: Shape](self, n: int | ExoExpr[Q, int]) -> Expr[Intersection[S, Q], R, list[T]]:
        return IntermediateExpr(self.expr.expr.list.head(_pl_expr(n)))

    def get[Q: Shape](
        self, index: ExoExpr[Q, int] | int, null_on_oob: bool = False
    ) -> Expr[S, R, T]:
        return IntermediateExpr(self.expr.expr.list.get(_pl_expr(index), null_on_oob=null_on_oob))

    def join[Q: Shape](
        self: ListExprNamespace[S, R, str], sep: ExoExpr[Q, str] | str
    ) -> Expr[Intersection[S, Q], R, str]:
        return IntermediateExpr(self.expr.expr.list.join(_pl_expr(sep)))

    def eval[U](self, expr: ExoExpr[Element[T], U]) -> MesoExpr[S, list[U]]:
        """
        Evaluate an expression against each element of the list, effectively `map` but for Polars.

        Imagine we had the ages for various family members, and we wanted to know the age in 5
        years time:

        +---------+-----------+
        | surname |   ages    |
        +---------+-----------+
        | Baggins | [111, 33] |
        | Gamgee  | [38]      |
        +---------+-----------+

        To transform these ages, we could do:

        ```python
        five_years_from_now = families.with_columns(
            Family.ages.eval(Element.element() + 5).to(Family.ages)
        )
        ```
        """
        return IntermediateExpr(self.expr.expr.list.eval(expr.expr))

eval(expr)

Evaluate an expression against each element of the list, effectively map but for Polars.

Imagine we had the ages for various family members, and we wanted to know the age in 5 years time:

+---------+-----------+
| surname |   ages    |
+---------+-----------+
| Baggins | [111, 33] |
| Gamgee  | [38]      |
+---------+-----------+

To transform these ages, we could do:

five_years_from_now = families.with_columns(
    Family.ages.eval(Element.element() + 5).to(Family.ages)
)
Source code in typol/expr.py
def eval[U](self, expr: ExoExpr[Element[T], U]) -> MesoExpr[S, list[U]]:
    """
    Evaluate an expression against each element of the list, effectively `map` but for Polars.

    Imagine we had the ages for various family members, and we wanted to know the age in 5
    years time:

    +---------+-----------+
    | surname |   ages    |
    +---------+-----------+
    | Baggins | [111, 33] |
    | Gamgee  | [38]      |
    +---------+-----------+

    To transform these ages, we could do:

    ```python
    five_years_from_now = families.with_columns(
        Family.ages.eval(Element.element() + 5).to(Family.ages)
    )
    ```
    """
    return IntermediateExpr(self.expr.expr.list.eval(expr.expr))

explode()

Flatten a list expression into one entry per list element. This resizes the resultant series to the sum of the length of the lists.

Note: Since this resizes the result, it is unsafe to simply map back to a column. Instead, it's useful for flattening arrays before applying some aggregate expression

Source code in typol/expr.py
def explode(self) -> MesoExpr[S, T]:
    """
    Flatten the list expression so that every list element becomes its own entry. The resulting
    series is resized to the sum of the lengths of the lists.

    Note: Because the result is resized, simply mapping it back to a column is unsafe. It is
    useful instead for flattening arrays before applying some aggregate expression
    """
    flattened = self.expr.expr.list.explode()
    return IntermediateExpr(flattened)

explode_to(to)

Explode the entire dataframe around this list column, creating a new row for every list entry in an existing row

Source code in typol/expr.py
def explode_to[Q: Shape](self, to: BoundDimension[Q, T]) -> Explosion[S, Q, T]:
    """
    Explode the entire dataframe around for this list column, creating a new row for every list
    entry in a existing row
    """
    return Explosion(self.expr, to)

sort(descending=False)

Order the elements of the list

Source code in typol/expr.py
def sort(self, descending: bool = False) -> Expr[S, R, list[T]]:
    """Order the elements of the list, in descending order when `descending` is set"""
    ordered = self.expr.expr.list.sort(descending=descending)
    return IntermediateExpr(ordered)

unique()

Remove duplicate elements from the list

Source code in typol/expr.py
def unique(self) -> Expr[S, R, list[T]]:
    """Drop duplicate elements from the list"""
    deduplicated = self.expr.expr.list.unique()
    return IntermediateExpr(deduplicated)

PartialConditional dataclass

Bases: Expr[_S_contra, _R_contra, _T]

The intermediate state where one outcome value has been provided but not the other, which is assumed by default to be null. Use .otherwise to provide the other value, or .when again to construct an if/elif chain

Source code in typol/expr.py
@dataclasses.dataclass(frozen=True)
class PartialConditional(Expr[_S_contra, _R_contra, _T]):
    """
    The intermediate state where one outcome value has been provided but not the other, which
    is assumed by default to be null. Use `.otherwise` to provide the other value, or `.when`
    again to construct an if/elif chain
    """

    expr: polars.expr.whenthen.Then | polars.expr.whenthen.ChainedThen

    def when[SA: Shape](
        self, *conditions: ExoExpr[SA, bool]
    ) -> ChainedWhen[Intersection[_S_contra, SA], _R_contra, _T]:
        return ChainedWhen(self.expr.when(c.expr for c in conditions))

    def otherwise[SA: Shape](
        self, otherwise: ExoExpr[SA, _T] | _T
    ) -> Expr[Intersection[_S_contra, SA], _R_contra, _T]:
        return IntermediateExpr(self.expr.otherwise(_pl_expr(otherwise)))

Projection dataclass

Bases: Generic[_SProjection_contra]

Represent a projection of a potentially wider shape onto just this shape. This is useful for constructing a struct out of a wider shape

Source code in typol/expr.py
@dataclasses.dataclass(frozen=True)
class Projection(Generic[_SProjection_contra]):
    """
    A projection of a potentially wider shape onto just this shape, which is useful for
    constructing a struct out of a wider shape
    """

    shape: type[_SProjection_contra]

    def struct(self) -> MesoExpr[_SProjection_contra, StructMapping[_SProjection_contra]]:
        """Build a struct expression from the columns named by the shape's dimensions"""
        column_names = self.shape.shape_meta().datatypes.keys()
        return IntermediateExpr(pl.struct(column_names))

Shape

This is the core component of typed polars, that lets you define the static column names and types of a dataframe much like a dataclass.

class Account(Shape):
    name = Dimension(str)
    code = Dimension(int)

defines a two-column dataframe with a str and an int column. Operations can be done on the dataframe using the fields of the shape type, e.g.:

df.filter(Account.code.ne(0), Account.name.str.contains("SIM").not_())
Source code in typol/expr.py
class Shape(metaclass=ShapeType):
    """
    The core component of typed polars: a shape declares the static column names and types of a
    dataframe, much like a dataclass declares its fields.

    ```python
    class Account(Shape):
        name = Dimension(str)
        code = Dimension(int)
    ```

    declares a two-column dataframe with a `str` and an `int` column. The fields of the shape
    type are then used to operate on the dataframe, e.g.:

    ```python
    df.filter(Account.code.ne(0), Account.name.str.contains("SIM").not_())
    ```
    """

    @classmethod
    def shape_meta(cls) -> ShapeMeta[Self]:
        """Return the `ShapeMeta` wrapper holding utility functions for inspecting this shape"""
        meta = ShapeMeta(cls)
        return meta

shape_meta() classmethod

Access utility functions for inspecting a shape

Source code in typol/expr.py
@classmethod
def shape_meta(cls) -> ShapeMeta[Self]:
    """Return the `ShapeMeta` wrapper holding utility functions for inspecting this shape"""
    meta = ShapeMeta(cls)
    return meta

ShapeMeta dataclass

A wrapper object that all library level definitions are on to avoid name conflicts with a shape's dimensions

This provides utilities for inspecting the shape and schema

Source code in typol/expr.py
@dataclasses.dataclass(frozen=True)
class ShapeMeta[S: Shape]:
    """
    A wrapper object that all library level definitions are on to avoid name conflicts with a
    shape's dimensions

    This provides utilities for inspecting the shape and schema
    """

    shape: type[S]

    @property
    def datatypes(self) -> Mapping[str, pl.DataType | type]:
        """A mapping from dimension name to their polars data type"""
        return {d.name: d.ty.pl_ty for d in self.dimensions}

    @property
    def schema(self) -> pl.Schema:
        """
        A polars runtime schema to direct it how to configure (and enforce the types on) the
        dataframe
        """
        return pl.Schema(self.datatypes)

    @property
    def dimensions(self) -> Iterable[BoundDimension[S, Any]]:
        """Iterate through the dimensions defined in the shape"""
        # Anything locally overwritten should be considered seen, so not pulled from base classes
        seen = set(vars(self.shape).keys())
        # Iterate through base classes, rather than use `__dict__`, to preserve definition order
        # which must match runtime behaviour
        for cls in self.shape.__bases__:
            if issubclass(cls, Shape) and cls is not Shape:
                for dimension in cls.shape_meta().dimensions:
                    if dimension.name not in seen:
                        yield dataclasses.replace(dimension, shape=self.shape)
                    seen.add(dimension.name)

        # Use vars(...).keys to preserve definition order of locally defined dimensions
        for key in vars(self.shape).keys():
            # Can't use `vars(...).values()` since the descriptor won't be bound, have to getattr
            if isinstance(value := getattr(self.shape, key), BoundDimension):
                yield value

datatypes property

A mapping from dimension name to their polars data type

dimensions property

Iterate through the dimensions defined in the shape

schema property

A polars runtime schema to direct it how to configure (and enforce the types on) the dataframe

ShapeType

Bases: type

Metaclass defining shape-level operators

Source code in typol/expr.py
class ShapeType(type):
    """Metaclass defining shape-level operators"""

    def shape_intersection[S: Shape, Q: Shape](
        cls: type[S], other: type[Q]
    ) -> Intersection[type[S], type[Q]]:
        return cls & other

    def __and__[S: Shape, Q: Shape](cls: type[S], other: type[Q]) -> Intersection[type[S], type[Q]]:
        if cls is other:
            return cls
        return cast(
            "Intersection[S, Q]", ShapeType(f"{cls.__name__}&{other.__name__}", (cls, other), {})
        )

StrExprNamespace dataclass

Namespace for string functions, similar to pl.Expr.str

Source code in typol/expr.py
@dataclasses.dataclass(frozen=True)
class StrExprNamespace[S: Shape, R: Shape, T]:
    """Namespace for string functions, similar to `pl.Expr.str`"""

    expr: Expr[S, R, T]

    def contains[Q: Shape](
        self, substring: ExoExpr[Q, str] | str, literal: bool = False
    ) -> MesoExpr[Intersection[S, Q], bool]:
        """Whether each column value contains the regex (or if `literal` is set, the fixed-string)"""
        substr = _pl_expr(substring)
        return IntermediateExpr(self.expr.expr.str.contains(substr, literal=literal))

    def contains_any[Q: Shape](
        self, substrings: Collection[ExoExpr[Q, str] | str], *, ascii_case_insensitive: bool = False
    ) -> MesoExpr[Intersection[S, Q], bool]:
        """
        Whether each column value contains the regex (or if `ascii_case_insensitive` is set, then
        the match can be either upper or lower case)
        """
        substrs = [_pl_expr(s) for s in substrings]
        return IntermediateExpr(
            self.expr.expr.str.contains_any(substrs, ascii_case_insensitive=ascii_case_insensitive)
        )

    def count_matches[Q: Shape](
        self, substring: ExoExpr[Q, str] | str, literal: bool = False
    ) -> MesoExpr[Intersection[S, Q], int]:
        """
        How many times each column value contains the regex pattern (or if `literal` is set, the
        fixed-string)
        """
        substr = _pl_expr(substring)
        return IntermediateExpr(self.expr.expr.str.count_matches(substr, literal=literal))

    def decode(self, encoding: Literal["hex", "base64"]) -> MesoExpr[S, bytes]:
        return IntermediateExpr(self.expr.expr.str.decode(encoding))

    def encode(self, encoding: Literal["hex", "base64"]) -> EndoExpr[S, str]:
        return IntermediateExpr(self.expr.expr.str.encode(encoding))

    def normalize(self, form: Literal["NFC", "NFKC", "NFD", "NFKD"] = "NFC") -> EndoExpr[S, str]:
        return IntermediateExpr(self.expr.expr.str.normalize(form))

    def escape_regex(self) -> EndoExpr[S, str]:
        """Turn the string into a pattern that will only literally match its value"""
        return IntermediateExpr(self.expr.expr.str.escape_regex())

    def extract[Q: Shape](
        self, pattern: ExoExpr[Q, str] | str, group_index: int = 1
    ) -> EndoExpr[Intersection[S, Q], bool]:
        """Extract the 1st or `group_index`th regex capture group from the column"""
        return IntermediateExpr(self.expr.expr.str.extract(_pl_expr(pattern), group_index))

    def extract_all[Q: Shape](
        self, pattern: ExoExpr[Q, str] | str
    ) -> MesoExpr[Intersection[S, Q], list[str]]:
        """Extract all regex capture group as a list from the column"""
        return IntermediateExpr(self.expr.expr.str.extract_all(_pl_expr(pattern)))

    def extract_groups[Q: Shape](
        self, pattern: ExoExpr[Q, str] | str
    ) -> MesoExpr[Intersection[S, Q], Mapping[str, str]]:
        """Extract all regex capture group as a dict from the column"""
        return IntermediateExpr(self.expr.expr.str.extract_all(_pl_expr(pattern)))

    def extract_many[Q: Shape](
        self,
        patterns: Collection[ExoExpr[Q, str] | str] | ExoExpr[Q, list[str]] | ExoExpr[Q, str] | str,
        *,
        ascii_case_insensitive: bool = False,
        overlapping: bool = False,
        leftmost: bool = False,
    ) -> MesoExpr[Intersection[S, Q], list[str]]:
        """Extract multiple fixed strings from the column"""
        pats = (
            [_pl_expr(e) for e in patterns]
            if isinstance(patterns, Collection) and not isinstance(patterns, str)
            else _pl_expr(patterns)
        )
        return IntermediateExpr(
            self.expr.expr.str.extract_many(
                pats,
                ascii_case_insensitive=ascii_case_insensitive,
                overlapping=overlapping,
                leftmost=leftmost,
            )
        )

    def find[Q: Shape](
        self, substring: ExoExpr[Q, str] | str, *, literal: bool = False, strict: bool = True
    ) -> MesoExpr[Intersection[S, Q], int]:
        """The index of the first match of the regex (or if `literal` is set, the fixed-string)"""
        substr = _pl_expr(substring)
        return IntermediateExpr(self.expr.expr.str.find(substr, literal=literal, strict=strict))

    def find_many[Q: Shape](
        self,
        patterns: ExoExpr[Q, str] | str,
        *,
        ascii_case_insensitive: bool = False,
        overlapping: bool = False,
        leftmost: bool = False,
    ) -> MesoExpr[Intersection[S, Q], list[int]]:
        """The index of the many matches of the fixed-strings"""
        pats = (
            [_pl_expr(e) for e in patterns]
            if isinstance(patterns, Collection) and not isinstance(patterns, str)
            else _pl_expr(patterns)
        )
        return IntermediateExpr(
            self.expr.expr.str.find_many(
                pats,
                ascii_case_insensitive=ascii_case_insensitive,
                overlapping=overlapping,
                leftmost=leftmost,
            )
        )

    def head[Q: Shape](self, n: ExoExpr[Q, int] | int) -> Expr[Intersection[S, Q], S, str]:
        """Take the first `n` characters from each string"""
        return IntermediateExpr(self.expr.expr.str.head(_pl_expr(n)))

    def tail[Q: Shape](self, n: ExoExpr[Q, int] | int) -> Expr[Intersection[S, Q], S, str]:
        """Take the last `n` characters from each string"""
        return IntermediateExpr(self.expr.expr.str.tail(_pl_expr(n)))

    def pad_end[Q: Shape](
        self, length: ExoExpr[Q, int] | int, fill_char: str = " "
    ) -> Expr[Intersection[S, Q], S, str]:
        """Ensure the strings are at least `length` long, adding `fill_char` to make up the difference"""
        return IntermediateExpr(self.expr.expr.str.pad_end(_pl_expr(length), fill_char))

    def pad_start[Q: Shape](
        self, length: ExoExpr[Q, int] | int, fill_char: str = " "
    ) -> Expr[Intersection[S, Q], S, str]:
        """Ensure the strings are at least `length` long, adding `fill_char` to make up the difference"""
        return IntermediateExpr(self.expr.expr.str.pad_start(_pl_expr(length), fill_char))

    def json_path_match[Q: Shape](
        self, json_path: ExoExpr[Q, str] | str
    ) -> Expr[Intersection[S, Q], S, str]:
        """Extract the value from the JSON string at the given path"""
        return IntermediateExpr(self.expr.expr.str.json_path_match(_pl_expr(json_path)))

    def starts_with[Q: Shape](
        self, suffix: ExoExpr[Q, str] | str
    ) -> MesoExpr[Intersection[S, Q], bool]:
        """Whether each column value starts with the given fixed string"""
        return IntermediateExpr(self.expr.expr.str.starts_with(_pl_expr(suffix)))

    def ends_with[Q: Shape](
        self, suffix: ExoExpr[Q, str] | str
    ) -> MesoExpr[Intersection[S, Q], bool]:
        """Whether each column value ends with the given fixed string"""
        return IntermediateExpr(self.expr.expr.str.ends_with(_pl_expr(suffix)))

    def len_chars(self) -> MesoExpr[S, int]:
        """Count the number of unicode characters in the string"""
        return IntermediateExpr(self.expr.expr.str.len_chars())

    def len_bytes(self) -> MesoExpr[S, int]:
        """Count the number of bytes in the string"""
        return IntermediateExpr(self.expr.expr.str.len_bytes())

    def join(self, sep: str) -> AggExpr[S, R, str]:
        """Aggregate a group of strings by interspersing `sep` between them and concatenating"""
        return AggExpr(self.expr.expr.str.join(sep))

    def replace[SA: Shape, SB: Shape](
        self,
        pattern: ExoExpr[SA, str] | str,
        value: ExoExpr[SB, str] | str,
        literal: bool = False,
        n: int = 1,
    ) -> Expr[Intersection[S, SA] | SB, R, str]:
        """
        Replace `n` matches for `pattern` (regex, or fixed-string if `literal` is `True`) with
        `value`
        """
        return IntermediateExpr(
            self.expr.expr.str.replace(_pl_expr(pattern), _pl_expr(value), literal=literal, n=n)
        )

    def replace_all[SA: Shape, SB: Shape](
        self, pattern: ExoExpr[SA, str] | str, value: ExoExpr[SB, str] | str, literal: bool = False
    ) -> Expr[Intersection[S, SA] | SB, R, str]:
        """
        Replace all matches for `pattern` (regex, or fixed-string if `literal` is `True`) with
        `value`
        """
        return IntermediateExpr(
            self.expr.expr.str.replace_all(_pl_expr(pattern), _pl_expr(value), literal=literal)
        )

    def replace_many[SA: Shape, SB: Shape](
        self,
        patterns: Collection[ExoExpr[SA, str] | str]
        | ExoExpr[SA, list[str]]
        | ExoExpr[SA, str]
        | str,
        replace_with: Collection[ExoExpr[SB, str] | str]
        | ExoExpr[SB, list[str]]
        | ExoExpr[SB, str]
        | str,
        *,
        ascii_case_insensitive: bool = False,
        leftmost: bool = False,
    ) -> Expr[Intersection[S, SA] | SB, R, str]:
        """Replace many matches for the fixed-string pattern with `value`"""
        pats = (
            [_pl_expr(e) for e in patterns]
            if isinstance(patterns, Collection) and not isinstance(patterns, str)
            else _pl_expr(patterns)
        )
        rws = (
            [_pl_expr(e) for e in replace_with]
            if isinstance(replace_with, Collection) and not isinstance(replace_with, str)
            else _pl_expr(replace_with)
        )
        return IntermediateExpr(
            self.expr.expr.str.replace_many(
                pats, rws, ascii_case_insensitive=ascii_case_insensitive, leftmost=leftmost
            )
        )

    def strip_chars[SA: Shape](
        self, characters: ExoExpr[SA, str] | str | None = None
    ) -> Expr[Intersection[S, SA], R, str]:
        """Remove leading and trailing characters in the given string. By default removes whitespace"""
        return IntermediateExpr(self.expr.expr.str.strip_chars(_pl_expr_or_none(characters)))

    def strip_chars_end[SA: Shape](
        self, characters: ExoExpr[SA, str] | str | None = None
    ) -> Expr[Intersection[S, SA], R, str]:
        """Remove trailing characters in the given string. By default removes whitespace"""
        return IntermediateExpr(self.expr.expr.str.strip_chars_end(_pl_expr_or_none(characters)))

    def strip_chars_start[SA: Shape](
        self, characters: ExoExpr[SA, str] | str | None = None
    ) -> Expr[Intersection[S, SA], R, str]:
        """Remove leading characters in the given string. By default removes whitespace"""
        return IntermediateExpr(self.expr.expr.str.strip_chars_start(_pl_expr_or_none(characters)))

    def strip_prefix[SA: Shape](
        self, prefix: ExoExpr[SA, str] | str | None = None
    ) -> Expr[Intersection[S, SA], R, str]:
        """Remove leading substring from the given string"""
        return IntermediateExpr(self.expr.expr.str.strip_prefix(_pl_expr_or_none(prefix)))

    def strip_suffix[SA: Shape](
        self, prefix: ExoExpr[SA, str] | str | None = None
    ) -> Expr[Intersection[S, SA], R, str]:
        """Remove trailing substring from the given string"""
        return IntermediateExpr(self.expr.expr.str.strip_suffix(_pl_expr_or_none(prefix)))

    def to_date(self, format: str, *, strict: bool = True) -> MesoExpr[S, datetime.date]:
        """Parse each string into a date using `format`; `strict` is forwarded to Polars"""
        parsed = self.expr.expr.str.to_date(format, strict=strict)
        return IntermediateExpr(parsed)

    def to_datetime(
        self,
        format: str,
        *,
        time_unit: Literal["ns", "us", "ms"] | None = None,
        strict: bool = True,
    ) -> MesoExpr[S, datetime.datetime]:
        """
        Parse each string into a datetime using `format`

        `time_unit` and `strict` are forwarded unchanged to Polars' `str.to_datetime`
        """
        parsed = self.expr.expr.str.to_datetime(format, strict=strict, time_unit=time_unit)
        return IntermediateExpr(parsed)

    def to_lowercase(self) -> Expr[S, R, T]:
        """Convert each string to lowercase"""
        lowered = self.expr.expr.str.to_lowercase()
        return IntermediateExpr(lowered)

    def to_uppercase(self) -> Expr[S, R, T]:
        """Convert each string to uppercase"""
        uppered = self.expr.expr.str.to_uppercase()
        return IntermediateExpr(uppered)

    def to_titlecase(self) -> Expr[S, R, T]:
        """Convert each string to title case"""
        titled = self.expr.expr.str.to_titlecase()
        return IntermediateExpr(titled)

    def reverse(self) -> Expr[S, R, T]:
        """Reverse the characters of each string"""
        flipped = self.expr.expr.str.reverse()
        return IntermediateExpr(flipped)

    def to_decimal(self, *, scale: int) -> ExoExpr[S, Decimal]:
        """Parse each string into a decimal; `scale` is forwarded to Polars' `str.to_decimal`"""
        parsed = self.expr.expr.str.to_decimal(scale=scale)
        return IntermediateExpr(parsed)

    def to_integer[SA: Shape](
        self, *, base: ExoExpr[SA, int] | int = 10, dtype: Typeable[int] = int, strict: bool = True
    ) -> ExoExpr[Intersection[S, SA], int]:
        return IntermediateExpr(
            self.expr.expr.str.to_integer(
                base=_pl_expr(base),
                dtype=cast(polars.datatypes.IntegerType, from_typeable(dtype).pl_ty),
                strict=strict,
            )
        )

    def is_not_empty(self) -> MesoExpr[S, bool]:
        """Whether each value is both non-null and different from the empty string"""
        column = self.expr.expr
        has_content = column.is_not_null() & column.ne("")
        return IntermediateExpr(has_content)

    def split[SA: Shape](
        self,
        sep: ExoExpr[SA, str] | str,
        *,
        inclusive: bool = False,
        literal: bool = False,
        strict: bool = True,
    ) -> MesoExpr[Intersection[S, SA], list[str]]:
        """Break a string into a list of strings, using `sep` as the separator"""
        return IntermediateExpr(
            self.expr.expr.str.split(
                _pl_expr(sep), inclusive=inclusive, literal=literal, strict=strict
            )
        )

    def splitn[SA: Shape](
        self, sep: ExoExpr[SA, str] | str, n: int
    ) -> MesoExpr[Intersection[S, SA], list[str]]:
        """Break a string into a list of `n` strings, using `sep` as the separator"""
        return IntermediateExpr(self.expr.expr.str.splitn(_pl_expr(sep), n))

    def split_exact[SA: Shape](
        self, sep: ExoExpr[SA, str] | str, n: int, *, inclusive: bool = False
    ) -> MesoExpr[Intersection[S, SA], list[str]]:
        """Break a string into a list of exactly `n` strings, using `sep` as the separator"""
        return IntermediateExpr(
            self.expr.expr.str.split_exact(_pl_expr(sep), n, inclusive=inclusive)
        )

    def slice[SA: Shape, SB: Shape](
        self, offset: ExoExpr[SA, int] | int, length: ExoExpr[SB, int] | int | None = None
    ) -> MesoExpr[Intersection[S, SA, SB], str]:
        """
        Take characters starting from offset, up to `offset + length`, or the end of the string if
        set to `None`
        """
        return IntermediateExpr(
            self.expr.expr.str.slice(
                _pl_expr(offset), _pl_expr(length) if length is not None else None
            )
        )

contains(substring, literal=False)

Whether each column value contains the regex (or if literal is set, the fixed-string)

Source code in typol/expr.py
def contains[Q: Shape](
    self, substring: ExoExpr[Q, str] | str, literal: bool = False
) -> MesoExpr[Intersection[S, Q], bool]:
    """Whether each column value contains the regex (or if `literal` is set, the fixed-string)"""
    substr = _pl_expr(substring)
    return IntermediateExpr(self.expr.expr.str.contains(substr, literal=literal))

contains_any(substrings, *, ascii_case_insensitive=False)

Whether each column value contains any of the given fixed substrings (if ascii_case_insensitive is set, ASCII letters match regardless of case)

Source code in typol/expr.py
def contains_any[Q: Shape](
    self, substrings: Collection[ExoExpr[Q, str] | str], *, ascii_case_insensitive: bool = False
) -> MesoExpr[Intersection[S, Q], bool]:
    """
    Whether each column value contains the regex (or if `ascii_case_insensitive` is set, then
    the match can be either upper or lower case)
    """
    substrs = [_pl_expr(s) for s in substrings]
    return IntermediateExpr(
        self.expr.expr.str.contains_any(substrs, ascii_case_insensitive=ascii_case_insensitive)
    )

count_matches(substring, literal=False)

How many times each column value contains the regex pattern (or if literal is set, the fixed-string)

Source code in typol/expr.py
def count_matches[Q: Shape](
    self, substring: ExoExpr[Q, str] | str, literal: bool = False
) -> MesoExpr[Intersection[S, Q], int]:
    """
    How many times each column value contains the regex pattern (or if `literal` is set, the
    fixed-string)
    """
    substr = _pl_expr(substring)
    return IntermediateExpr(self.expr.expr.str.count_matches(substr, literal=literal))

ends_with(suffix)

Whether each column value ends with the given fixed string

Source code in typol/expr.py
def ends_with[Q: Shape](
    self, suffix: ExoExpr[Q, str] | str
) -> MesoExpr[Intersection[S, Q], bool]:
    """Whether each column value ends with the given fixed string"""
    return IntermediateExpr(self.expr.expr.str.ends_with(_pl_expr(suffix)))

escape_regex()

Turn the string into a pattern that will only literally match its value

Source code in typol/expr.py
def escape_regex(self) -> EndoExpr[S, str]:
    """Escape each string so that, used as a regex, it matches only its own literal value"""
    escaped = self.expr.expr.str.escape_regex()
    return IntermediateExpr(escaped)

extract(pattern, group_index=1)

Extract the regex capture group at position group_index (the first group by default) from the column

Source code in typol/expr.py
def extract[Q: Shape](
    self, pattern: ExoExpr[Q, str] | str, group_index: int = 1
) -> EndoExpr[Intersection[S, Q], bool]:
    """Extract the 1st or `group_index`th regex capture group from the column"""
    return IntermediateExpr(self.expr.expr.str.extract(_pl_expr(pattern), group_index))

extract_all(pattern)

Extract all matches of the regex pattern from the column as a list

Source code in typol/expr.py
def extract_all[Q: Shape](
    self, pattern: ExoExpr[Q, str] | str
) -> MesoExpr[Intersection[S, Q], list[str]]:
    """Extract all regex capture group as a list from the column"""
    return IntermediateExpr(self.expr.expr.str.extract_all(_pl_expr(pattern)))

extract_groups(pattern)

Extract all regex capture groups from the column as a dict

Source code in typol/expr.py
def extract_groups[Q: Shape](
    self, pattern: ExoExpr[Q, str] | str
) -> MesoExpr[Intersection[S, Q], Mapping[str, str]]:
    """Extract all regex capture group as a dict from the column"""
    return IntermediateExpr(self.expr.expr.str.extract_all(_pl_expr(pattern)))

extract_many(patterns, *, ascii_case_insensitive=False, overlapping=False, leftmost=False)

Extract multiple fixed strings from the column

Source code in typol/expr.py
def extract_many[Q: Shape](
    self,
    patterns: Collection[ExoExpr[Q, str] | str] | ExoExpr[Q, list[str]] | ExoExpr[Q, str] | str,
    *,
    ascii_case_insensitive: bool = False,
    overlapping: bool = False,
    leftmost: bool = False,
) -> MesoExpr[Intersection[S, Q], list[str]]:
    """Extract multiple fixed strings from the column"""
    pats = (
        [_pl_expr(e) for e in patterns]
        if isinstance(patterns, Collection) and not isinstance(patterns, str)
        else _pl_expr(patterns)
    )
    return IntermediateExpr(
        self.expr.expr.str.extract_many(
            pats,
            ascii_case_insensitive=ascii_case_insensitive,
            overlapping=overlapping,
            leftmost=leftmost,
        )
    )

find(substring, *, literal=False, strict=True)

The index of the first match of the regex (or if literal is set, the fixed-string)

Source code in typol/expr.py
def find[Q: Shape](
    self, substring: ExoExpr[Q, str] | str, *, literal: bool = False, strict: bool = True
) -> MesoExpr[Intersection[S, Q], int]:
    """The index of the first match of the regex (or if `literal` is set, the fixed-string)"""
    substr = _pl_expr(substring)
    return IntermediateExpr(self.expr.expr.str.find(substr, literal=literal, strict=strict))

find_many(patterns, *, ascii_case_insensitive=False, overlapping=False, leftmost=False)

The indices of all matches of the given fixed strings

Source code in typol/expr.py
def find_many[Q: Shape](
    self,
    patterns: ExoExpr[Q, str] | str,
    *,
    ascii_case_insensitive: bool = False,
    overlapping: bool = False,
    leftmost: bool = False,
) -> MesoExpr[Intersection[S, Q], list[int]]:
    """The index of the many matches of the fixed-strings"""
    pats = (
        [_pl_expr(e) for e in patterns]
        if isinstance(patterns, Collection) and not isinstance(patterns, str)
        else _pl_expr(patterns)
    )
    return IntermediateExpr(
        self.expr.expr.str.find_many(
            pats,
            ascii_case_insensitive=ascii_case_insensitive,
            overlapping=overlapping,
            leftmost=leftmost,
        )
    )

head(n)

Take the first n characters from each string

Source code in typol/expr.py
def head[Q: Shape](self, n: ExoExpr[Q, int] | int) -> Expr[Intersection[S, Q], S, str]:
    """Take the first `n` characters from each string"""
    return IntermediateExpr(self.expr.expr.str.head(_pl_expr(n)))

join(sep)

Aggregate a group of strings by interspersing sep between them and concatenating

Source code in typol/expr.py
def join(self, sep: str) -> AggExpr[S, R, str]:
    """Aggregate a group of strings into one by concatenating them with `sep` in between"""
    joined = self.expr.expr.str.join(sep)
    return AggExpr(joined)

json_path_match(json_path)

Extract the value from the JSON string at the given path

Source code in typol/expr.py
def json_path_match[Q: Shape](
    self, json_path: ExoExpr[Q, str] | str
) -> Expr[Intersection[S, Q], S, str]:
    """Extract the value from the JSON string at the given path"""
    return IntermediateExpr(self.expr.expr.str.json_path_match(_pl_expr(json_path)))

len_bytes()

Count the number of bytes in the string

Source code in typol/expr.py
def len_bytes(self) -> MesoExpr[S, int]:
    """The length of each string measured in bytes"""
    byte_count = self.expr.expr.str.len_bytes()
    return IntermediateExpr(byte_count)

len_chars()

Count the number of unicode characters in the string

Source code in typol/expr.py
def len_chars(self) -> MesoExpr[S, int]:
    """The length of each string measured in unicode characters"""
    char_count = self.expr.expr.str.len_chars()
    return IntermediateExpr(char_count)

pad_end(length, fill_char=' ')

Ensure the strings are at least length long, adding fill_char to make up the difference

Source code in typol/expr.py
def pad_end[Q: Shape](
    self, length: ExoExpr[Q, int] | int, fill_char: str = " "
) -> Expr[Intersection[S, Q], S, str]:
    """Ensure the strings are at least `length` long, adding `fill_char` to make up the difference"""
    return IntermediateExpr(self.expr.expr.str.pad_end(_pl_expr(length), fill_char))

pad_start(length, fill_char=' ')

Ensure the strings are at least length long, adding fill_char to make up the difference

Source code in typol/expr.py
def pad_start[Q: Shape](
    self, length: ExoExpr[Q, int] | int, fill_char: str = " "
) -> Expr[Intersection[S, Q], S, str]:
    """Ensure the strings are at least `length` long, adding `fill_char` to make up the difference"""
    return IntermediateExpr(self.expr.expr.str.pad_start(_pl_expr(length), fill_char))

replace(pattern, value, literal=False, n=1)

Replace n matches for pattern (regex, or fixed-string if literal is True) with value

Source code in typol/expr.py
def replace[SA: Shape, SB: Shape](
    self,
    pattern: ExoExpr[SA, str] | str,
    value: ExoExpr[SB, str] | str,
    literal: bool = False,
    n: int = 1,
) -> Expr[Intersection[S, SA] | SB, R, str]:
    """
    Replace `n` matches for `pattern` (regex, or fixed-string if `literal` is `True`) with
    `value`
    """
    return IntermediateExpr(
        self.expr.expr.str.replace(_pl_expr(pattern), _pl_expr(value), literal=literal, n=n)
    )

replace_all(pattern, value, literal=False)

Replace all matches for pattern (regex, or fixed-string if literal is True) with value

Source code in typol/expr.py
def replace_all[SA: Shape, SB: Shape](
    self, pattern: ExoExpr[SA, str] | str, value: ExoExpr[SB, str] | str, literal: bool = False
) -> Expr[Intersection[S, SA] | SB, R, str]:
    """
    Replace all matches for `pattern` (regex, or fixed-string if `literal` is `True`) with
    `value`
    """
    return IntermediateExpr(
        self.expr.expr.str.replace_all(_pl_expr(pattern), _pl_expr(value), literal=literal)
    )

replace_many(patterns, replace_with, *, ascii_case_insensitive=False, leftmost=False)

Replace matches of the fixed-string patterns with the corresponding replace_with values

Source code in typol/expr.py
def replace_many[SA: Shape, SB: Shape](
    self,
    patterns: Collection[ExoExpr[SA, str] | str]
    | ExoExpr[SA, list[str]]
    | ExoExpr[SA, str]
    | str,
    replace_with: Collection[ExoExpr[SB, str] | str]
    | ExoExpr[SB, list[str]]
    | ExoExpr[SB, str]
    | str,
    *,
    ascii_case_insensitive: bool = False,
    leftmost: bool = False,
) -> Expr[Intersection[S, SA] | SB, R, str]:
    """Replace many matches for the fixed-string pattern with `value`"""
    pats = (
        [_pl_expr(e) for e in patterns]
        if isinstance(patterns, Collection) and not isinstance(patterns, str)
        else _pl_expr(patterns)
    )
    rws = (
        [_pl_expr(e) for e in replace_with]
        if isinstance(replace_with, Collection) and not isinstance(replace_with, str)
        else _pl_expr(replace_with)
    )
    return IntermediateExpr(
        self.expr.expr.str.replace_many(
            pats, rws, ascii_case_insensitive=ascii_case_insensitive, leftmost=leftmost
        )
    )

slice(offset, length=None)

Take characters starting from offset, up to offset + length, or the end of the string if set to None

Source code in typol/expr.py
def slice[SA: Shape, SB: Shape](
    self, offset: ExoExpr[SA, int] | int, length: ExoExpr[SB, int] | int | None = None
) -> MesoExpr[Intersection[S, SA, SB], str]:
    """
    Take characters starting from offset, up to `offset + length`, or the end of the string if
    set to `None`
    """
    return IntermediateExpr(
        self.expr.expr.str.slice(
            _pl_expr(offset), _pl_expr(length) if length is not None else None
        )
    )

split(sep, *, inclusive=False, literal=False, strict=True)

Break a string into a list of strings, using sep as the separator

Source code in typol/expr.py
def split[SA: Shape](
    self,
    sep: ExoExpr[SA, str] | str,
    *,
    inclusive: bool = False,
    literal: bool = False,
    strict: bool = True,
) -> MesoExpr[Intersection[S, SA], list[str]]:
    """Break a string into a list of strings, using `sep` as the separator"""
    return IntermediateExpr(
        self.expr.expr.str.split(
            _pl_expr(sep), inclusive=inclusive, literal=literal, strict=strict
        )
    )

split_exact(sep, n, *, inclusive=False)

Break a string into a list of exactly n strings, using sep as the separator

Source code in typol/expr.py
def split_exact[SA: Shape](
    self, sep: ExoExpr[SA, str] | str, n: int, *, inclusive: bool = False
) -> MesoExpr[Intersection[S, SA], list[str]]:
    """Break a string into a list of exactly `n` strings, using `sep` as the separator"""
    return IntermediateExpr(
        self.expr.expr.str.split_exact(_pl_expr(sep), n, inclusive=inclusive)
    )

splitn(sep, n)

Break a string into a list of n strings, using sep as the separator

Source code in typol/expr.py
def splitn[SA: Shape](
    self, sep: ExoExpr[SA, str] | str, n: int
) -> MesoExpr[Intersection[S, SA], list[str]]:
    """Break a string into a list of `n` strings, using `sep` as the separator"""
    return IntermediateExpr(self.expr.expr.str.splitn(_pl_expr(sep), n))

starts_with(suffix)

Whether each column value starts with the given fixed string

Source code in typol/expr.py
def starts_with[Q: Shape](
    self, suffix: ExoExpr[Q, str] | str
) -> MesoExpr[Intersection[S, Q], bool]:
    """Whether each column value starts with the given fixed string"""
    return IntermediateExpr(self.expr.expr.str.starts_with(_pl_expr(suffix)))

strip_chars(characters=None)

Remove leading and trailing characters in the given string. By default removes whitespace

Source code in typol/expr.py
def strip_chars[SA: Shape](
    self, characters: ExoExpr[SA, str] | str | None = None
) -> Expr[Intersection[S, SA], R, str]:
    """Remove leading and trailing characters in the given string. By default removes whitespace"""
    return IntermediateExpr(self.expr.expr.str.strip_chars(_pl_expr_or_none(characters)))

strip_chars_end(characters=None)

Remove trailing characters in the given string. By default removes whitespace

Source code in typol/expr.py
def strip_chars_end[SA: Shape](
    self, characters: ExoExpr[SA, str] | str | None = None
) -> Expr[Intersection[S, SA], R, str]:
    """Remove trailing characters in the given string. By default removes whitespace"""
    return IntermediateExpr(self.expr.expr.str.strip_chars_end(_pl_expr_or_none(characters)))

strip_chars_start(characters=None)

Remove leading characters in the given string. By default removes whitespace

Source code in typol/expr.py
def strip_chars_start[SA: Shape](
    self, characters: ExoExpr[SA, str] | str | None = None
) -> Expr[Intersection[S, SA], R, str]:
    """Remove leading characters in the given string. By default removes whitespace"""
    return IntermediateExpr(self.expr.expr.str.strip_chars_start(_pl_expr_or_none(characters)))

strip_prefix(prefix=None)

Remove leading substring from the given string

Source code in typol/expr.py
def strip_prefix[SA: Shape](
    self, prefix: ExoExpr[SA, str] | str | None = None
) -> Expr[Intersection[S, SA], R, str]:
    """Remove leading substring from the given string"""
    return IntermediateExpr(self.expr.expr.str.strip_prefix(_pl_expr_or_none(prefix)))

strip_suffix(prefix=None)

Remove trailing substring from the given string

Source code in typol/expr.py
def strip_suffix[SA: Shape](
    self, prefix: ExoExpr[SA, str] | str | None = None
) -> Expr[Intersection[S, SA], R, str]:
    """Remove trailing substring from the given string"""
    return IntermediateExpr(self.expr.expr.str.strip_suffix(_pl_expr_or_none(prefix)))

tail(n)

Take the last n characters from each string

Source code in typol/expr.py
def tail[Q: Shape](self, n: ExoExpr[Q, int] | int) -> Expr[Intersection[S, Q], S, str]:
    """Take the last `n` characters from each string"""
    return IntermediateExpr(self.expr.expr.str.tail(_pl_expr(n)))

StructExprNamespace dataclass

Namespace for struct functions

Source code in typol/expr.py
@dataclasses.dataclass(frozen=True)
class StructExprNamespace[S: Shape, R: Shape, M: Shape]:
    """Namespace for struct functions"""

    expr: Expr[S, R, StructMapping[M]]

    def field[T](self, dim: BoundDimension[M, T]) -> MesoExpr[S, T]:
        return IntermediateExpr(self.expr.expr.struct.field(dim.name))

    def __getitem__[T](self, dim: BoundDimension[M, T]) -> MesoExpr[S, T]:
        return self.field(dim)

    def map_rows_to[T, Q: Shape](
        self, transform: Callable[[Row[M]], T | None], to: BoundDimension[Q, T]
    ) -> Expr[S, Q, T]:
        """Apply a Python transformation on `Row`s to the structs in a column"""
        # TODO(racharm) Use PEP810 lazy imports with py315, needed to avoid circular import issues
        from typol.row import Row  # noqa: PLC0415,RUF100

        return self.expr.map_to(lambda x: transform(Row.from_struct_mapping(x)), to)

map_rows_to(transform, to)

Apply a Python transformation on Rows to the structs in a column

Source code in typol/expr.py
def map_rows_to[T, Q: Shape](
    self, transform: Callable[[Row[M]], T | None], to: BoundDimension[Q, T]
) -> Expr[S, Q, T]:
    """Apply a Python transformation on `Row`s to the structs in a column"""
    # TODO(racharm) Use PEP810 lazy imports with py315, needed to avoid circular import issues
    from typol.row import Row  # noqa: PLC0415,RUF100

    return self.expr.map_to(lambda x: transform(Row.from_struct_mapping(x)), to)

Suffixed

Bases: Shape

A suffixed shape allows modifying a shape with an additional tag after each column name. This is critical in joins to avoid name conflicts, if two similarly named columns would otherwise collide with each other. Use df.suffix() to conveniently add a suffix to an existing dataframe.

To access dimensions of a suffixed shape, use the shape to transform the base shape, i.e.

my_shape = tp.DataFrame(...)
suffixed = my_shape.suffix()
col_a = suffixed[suffixed.s(my_shape.s.a)]

The above is a little clunky to use; suffixed shapes are only intended as brief intermediaries when name conflicts are possible

Source code in typol/expr.py
class Suffixed[S: Shape](Shape):
    """
    A suffixed shape allows modifying a shape with an additional tag after each column name.
    This is critical in joins to avoid name conflicts, if two similarly named columns would
    otherwise collide with each other. Use `df.suffix()` to conveniently add a suffix to an existing
    dataframe.

    To access dimensions of a suffixed shape, use the shape to transform the base shape, i.e.

    ```
    my_shape = tp.DataFrame(...)
    suffixed = my_shape.suffix()
    col_a = suffixed[suffixed.s(my_shape.s.a)]
    ```

    The above is a little clunky to use; suffixed shapes are only intended as brief intermediaries
    when name conflicts are possible
    """

    shape: ClassVar[type[S]]  # type: ignore
    suffix: ClassVar[str]

    @classmethod
    def mapping_to(cls) -> dict[str, str]:
        return {s.name: f"{s.name}{cls.suffix}" for s in cls.shape.shape_meta().dimensions}

    def __new__[T](cls, dim: BoundDimension[S, T]) -> BoundDimension[Self, T]:
        return BoundDimension[Any, T](dim.shape, dim.ty, dim.name + cls.suffix)

    @classmethod
    def __call__[T](cls, dim: BoundDimension[S, T]) -> BoundDimension[Self, T]:
        return cls.__new__(dim)  # ty: ignore[missing-argument]

    @classmethod
    def shape_meta(cls) -> ShapeMeta:
        return SuffixedShapeMeta(cls) if cls.__bases__ == (Suffixed,) else super().shape_meta()

SuffixedShapeMeta dataclass

Bases: ShapeMeta[Suffixed[S]]

Similar to ShapeMeta, but handle renaming the dimensions with the suffix

Source code in typol/expr.py
@dataclasses.dataclass(frozen=True)
class SuffixedShapeMeta[S: Shape](ShapeMeta[Suffixed[S]]):
    """Similar to `ShapeMeta`, but handle renaming the dimensions with the suffix"""

    shape: type[Suffixed[S]]

    @property
    def dimensions(self) -> Iterable[BoundDimension[Suffixed, Any]]:
        """Iterate through the dimensions defined in the shape"""
        for dimension in self.shape.shape.shape_meta().dimensions:
            yield dataclasses.replace(
                dimension, name=dimension.name + self.shape.suffix, shape=self.shape
            )

dimensions property

Iterate through the dimensions defined in the shape

When dataclass

Bases: Generic[_S_contra]

A condition that can be combined with a value using then and otherwise to construct a conditional expression. Use tp.when rather than When(...) directly

tp.when(some_condition).then(if_true).otherwise(if_false)
Source code in typol/expr.py
@dataclasses.dataclass(frozen=True)
class When(Generic[_S_contra]):
    """
    A condition that can be combined with a value using `then` and `otherwise` to construct a
    conditional expression. Use `tp.when` rather than `When(...)` directly

    ```
    tp.when(some_condition).then(if_true).otherwise(if_false)
    ```
    """

    conditions: Collection[ExoExpr[_S_contra, bool]]

    @overload
    def then[SA: Shape, R: Shape, T](
        self, then: Expr[SA, R, T]
    ) -> PartialConditional[Intersection[_S_contra, SA], R, T]: ...
    @overload
    def then[T: Value](self, then: T) -> PartialConditional[_S_contra, Never, T]: ...

    def then[SA: Shape, R: Shape, T](
        self, then: Expr | T
    ) -> PartialConditional[Intersection[_S_contra, SA], R, T]:
        return PartialConditional(
            pl.when(c.expr for c in self.conditions).then(_expr_or_lit(then).expr)
        )

    @overload
    def otherwise[SA: Shape, R: Shape, T](
        self, otherwise: Expr[SA, R, T]
    ) -> PartialConditional[Intersection[_S_contra, SA], R, T]: ...
    @overload
    def otherwise[T: Value](self, otherwise: T) -> PartialConditional[_S_contra, Never, T]: ...

    def otherwise[SA: Shape, R: Shape, T](
        self, otherwise: Expr | T
    ) -> PartialConditional[Intersection[_S_contra, SA], R, T]:
        return PartialConditional(
            pl.when((~all_horizontal(self.conditions)).expr).then(_expr_or_lit(otherwise).expr)
        )

all_horizontal(*conditions)

and all the given conditions, i.e. when all is true

Source code in typol/expr.py
def all_horizontal[S: Shape, R: Shape](
    *conditions: Expr[S, R, bool] | Iterable[Expr[S, R, bool]],
) -> Expr[S, R, bool]:
    """`and` all the given conditions, i.e. when all is true"""
    return IntermediateExpr(
        pl.all_horizontal(
            *(map(_pl_expr, c) if isinstance(c, Iterable) else _pl_expr(c) for c in conditions)
        )
    )

any_horizontal(*conditions)

or all the given conditions, i.e. when any is true

Source code in typol/expr.py
def any_horizontal[S: Shape, R: Shape](
    *conditions: Expr[S, R, bool] | Iterable[Expr[S, R, bool]],
) -> Expr[S, R, bool]:
    """`or` all the given conditions, i.e. when any is true"""
    return IntermediateExpr(
        pl.any_horizontal(
            *(map(_pl_expr, c) if isinstance(c, Iterable) else _pl_expr(c) for c in conditions)
        )
    )

concat_list(*exprs)

Combine various list expressions into a single list. Also useful for constructing lists, with tp.concat_list([expr1, expr2])

Source code in typol/expr.py
def concat_list[S: Shape, A](
    *exprs: ExoExpr[S, list[A]] | Sequence[A | ExoExpr[S, A]],
) -> MesoExpr[S, list[A]]:
    """
    Combine various list expressions into a single list. Also useful for constructing lists,
    with `tp.concat_list([expr1, expr2])`
    """
    return IntermediateExpr(
        pl.concat_list(
            *(
                _pl_expr(expr) if isinstance(expr, Expr) else [_pl_expr(e) for e in expr]
                for expr in exprs
            )
        )
    )

date_range(start, end, interval='1d', closed='both')

Construct a series from start inclusive to end inclusive

Source code in typol/expr.py
def date_range[S: Shape](
    start: ExoExpr[S, datetime.date] | datetime.date,
    end: ExoExpr[S, datetime.date],
    interval: str | datetime.timedelta = "1d",
    closed: RangeClosed = "both",
) -> MesoExpr[S, datetime.date]:
    """Construct a series from start inclusive to end inclusive"""
    return IntermediateExpr(pl.date_range(_pl_expr(start), _pl_expr(end), interval, closed=closed))

date_ranges(start, end)

Construct a list for each element containing a date range from start to end inclusive

Source code in typol/expr.py
def date_ranges[S: Shape](
    start: ExoExpr[S, datetime.date] | datetime.date, end: ExoExpr[S, datetime.date]
) -> MesoExpr[S, list[datetime.date]]:
    """Construct a list for each element containing a date range from start to end inclusive"""
    return IntermediateExpr(pl.date_ranges(_pl_expr(start), _pl_expr(end)))

datetime_range(start, end, interval, closed='both')

Construct a series from start inclusive to end inclusive

Source code in typol/expr.py
def datetime_range[S: Shape](
    start: ExoExpr[S, datetime.datetime] | datetime.datetime,
    end: ExoExpr[S, datetime.datetime] | datetime.date,
    interval: str | datetime.timedelta,
    closed: RangeClosed = "both",
) -> MesoExpr[S, datetime.date]:
    """Construct a series from start inclusive to end inclusive"""
    return IntermediateExpr(
        pl.datetime_range(_pl_expr(start), _pl_expr(end), interval, closed=closed)
    )

duration(weeks=None, days=None, minutes=None, seconds=None, milliseconds=None, microseconds=None, nanoseconds=None)

Construct a duration, either from literals or column values

weeks = tp.duration(weeks=AccountingPeriod.week_count)
adjusted = Rate.date + tp.duration(days=Rate.adjustment_days, seconds=10)
Source code in typol/expr.py
def duration[S: Shape](
    weeks: int | ExoExpr[S, int] | None = None,
    days: int | ExoExpr[S, int] | None = None,
    minutes: int | ExoExpr[S, int] | None = None,
    seconds: int | ExoExpr[S, int] | None = None,
    milliseconds: int | ExoExpr[S, int] | None = None,
    microseconds: int | ExoExpr[S, int] | None = None,
    nanoseconds: int | ExoExpr[S, int] | None = None,
) -> Expr[S, Never, datetime.timedelta]:
    """
    Construct a duration, either from literals of column values

    ```
    weeks = tp.duration(weeks=AccountingPeriod.week_count)
    adjusted = Rate.date + tp.duration(days=Rate.adjustment_days, seconds=10)
    ```
    """
    return IntermediateExpr(
        pl.duration(
            weeks=_pl_expr_or_none(weeks),
            days=_pl_expr_or_none(days),
            minutes=_pl_expr_or_none(minutes),
            seconds=_pl_expr_or_none(seconds),
            milliseconds=_pl_expr_or_none(milliseconds),
            microseconds=_pl_expr_or_none(microseconds),
            nanoseconds=_pl_expr_or_none(nanoseconds),
        )
    )

int_range(value, end=None, step=1)

Construct a series from start inclusive to end exclusive. If end is unspecified, value is end, otherwise value is start

Source code in typol/expr.py
def int_range[S: Shape, Q: Shape](
    value: ExoAggExpr[S, int] | ExoAggExpr[Intersection[S, Q], int] | int,
    end: ExoAggExpr[Q, int] | ExoAggExpr[Intersection[S, Q], int] | int | None = None,
    step: int = 1,
) -> MesoExpr[S, int]:
    """
    Construct a series from start inclusive to end exclusive. If end is unspecified, `value` is end,
    otherwise `value` is start
    """
    return IntermediateExpr(pl.int_range(_pl_expr(value), _pl_expr_or_none(end), step))

length()

Count the number of rows in a shape or window. This is namespaced under Expr to avoid conflicts with the len builtin

Source code in typol/expr.py
def length() -> MesoAggExpr[Any, int]:
    """
    Aggregation giving the number of rows in a shape or window (wraps `pl.len()`). It is called
    `length` rather than `len` so that it does not conflict with the builtin
    """
    row_count = pl.len()
    return AggExpr(row_count)

lit(value)

Create a literal value that can be mapped to a column

Source code in typol/expr.py
def lit[T](value: T | None) -> Expr[Shape, Never, T]:
    """Create a literal value that can be mapped to a column"""
    return Expr.lit(value)

max_horizontal(*exprs)

Max the given exprs, i.e. take the largest. For pl.max, use Expr.max

Source code in typol/expr.py
def max_horizontal[S: Shape, R: Shape, T: Orderable](
    *exprs: Expr[S, R, T] | T | Iterable[Expr[S, R, T] | T],
) -> Expr[S, R, T]:
    """Max the given exprs, i.e. take the largest. For `pl.max`, use Expr.max"""
    return IntermediateExpr(
        pl.max_horizontal(
            *(map(_pl_expr, c) if isinstance(c, Iterable) else _pl_expr(c) for c in exprs)
        )
    )

min_horizontal(*exprs)

Min the given exprs, i.e. take the smallest. For pl.min, use Expr.min

Source code in typol/expr.py
def min_horizontal[S: Shape, R: Shape, T: Orderable](
    *exprs: Expr[S, R, T] | T | Iterable[Expr[S, R, T] | T],
) -> Expr[S, R, T]:
    """Min the given exprs, i.e. take the smallest. For `pl.min`, use Expr.min"""
    return IntermediateExpr(
        pl.min_horizontal(
            *(map(_pl_expr, c) if isinstance(c, Iterable) else _pl_expr(c) for c in exprs)
        )
    )

null(ty)

Create a literal value that can be mapped to a column, or be unset

Source code in typol/expr.py
def null[T](ty: Typeable[T]) -> Expr[Shape, Never, T]:
    """Create a literal value that can be mapped to a column, or be unset"""
    return lit(None)

projection(shape)

Construct a projection of a shape out of a potentially wider shaped dataframe

accounts = tp.DataFrame(Account, ...)
external_email = accounts.with_columns(
    tp.projection(e := EmailDetails).struct()
        .struct.map_rows_to(e.email + "@" + e.organization + ".com", e.email)
)
Source code in typol/expr.py
def projection[S: Shape](shape: type[S]) -> Projection[S]:
    """
    Construct a projection of a shape out of a potentially wider shaped dataframe

    ```
    accounts = tp.DataFrame(Account, ...)
    external_email = accounts.with_columns(
        tp.projection(e := EmailDetails).struct()
            .struct.map_rows_to(e.email + "@" + e.organization + ".com", e.email)
    )
    ```
    """
    return Projection(shape)

row_index()

Row number of the current line of the frame or window, starting at 0

Use this and a transform, i.e. .transform or .with_columns, if you're looking for frame.with_row_index():

# Polars equivalent: frame.with_row_index("line_number")
frame.with_columns(tp.row_index().to(Report.line_number))
Source code in typol/expr.py
def row_index() -> MesoExpr[Any, int]:
    """
    Zero-based row number of the current line within the frame or window

    Combine this with a transform (`.transform` or `.with_columns`) when you want the equivalent
    of `frame.with_row_index()`:

    ```
    # Polars equivalent: frame.with_row_index("line_number")
    frame.with_columns(tp.row_index().to(Report.line_number))
    ```
    """
    # An integer range ending at the row count yields one index per row
    row_count = length()
    return int_range(row_count)

struct(*exprs)

Construct a struct expression from the underlying expressions:

tp.struct(Account.name.to(Login.account), tp.lit(0).to(Login.attempts))

The above will create an expression that can be put into a column for tp.dimension(tp.struct_of(Login)), or mapped with .struct.map_rows_to(..., to)

Source code in typol/expr.py
def struct[S: Shape, M: Shape](*exprs: Expr[S, M, Any]) -> MesoExpr[S, StructMapping[M]]:
    """
    Construct a struct expression from the underlying expressions:

    ```
    tp.struct(Account.name.to(Login.account), tp.lit(0).to(Login.attempts))
    ```

    The above will create an expression that can be put into a column for
    `tp.dimension(tp.struct_of(Login))`, or mapped with `.struct.map_rows_to(..., to)
    """
    return IntermediateExpr(pl.struct(map(_pl_expr, exprs)))

suffix(shape, suffix=None)

Create a modified shape where each column name is suffixed:

suffixed = external_accounts.suffix(suff := suffix(ExternalAccounts))
account_details = accounts.join(
    suffixed,
    Accounts.external_account_number.on(suff(ExternalAccounts.number))
).transform(
    # Could not do this otherwise, as the Accounts.name and ExternalAccounts.name columns would
    # conflict and Polars wouldn't be able to tell them apart (in regular Polars you'd also have
    # to be explicit)
    (Accounts.name + "-" + suff(ExternalAccounts.name)).to(AccountMatchup.name)
)

Parameters

suffix : str | None The string literal to append to each column name. If None, this will be the name of the shape itself

Source code in typol/expr.py
def suffix[S: Shape](shape: type[S], suffix: str | None = None) -> type[Suffixed[S]]:
    """
    Create a modified shape where each column name is suffixed:

    ```
    suffixed = external_accounts.suffix(suff := suffix(ExternalAccount))
    account_details = accounts.join(
        suffixed,
        Accounts.external_account_number.on(suff(ExternalAccounts.number))
    ).transform(
        # Could not do this otherwise, as the Accounts.name and ExternalAccounts.name columns would
        # conflict and Polars wouldn't be able to tell them apart (in regular Polars you'd also have
        # to be explicit)
        (Accounts.name + "-" + suff(ExternalAccounts.name)).to(AccountMatchup.name)
    )
    ```

    Parameters
    ----------
    suffix : str | None
        The string literal to append to each column name. If `None`, this will be the name of the
        shape itself
    """
    with_shape, with_suffix = shape, suffix

    class SuffixedShape(Suffixed):
        shape = with_shape
        suffix = with_suffix or shape.__qualname__

    return SuffixedShape