Skip to content

EventDataset

kloppy.domain.EventDataset dataclass

EventDataset(records, metadata)

Bases: Dataset[Event]

An event stream dataset.

ATTRIBUTE DESCRIPTION
dataset_type

"DatasetType.EVENT"

TYPE: DatasetType

events

A list of events. Alias for records.

TYPE: List[Event]

metadata

Metadata for the dataset.

TYPE: Metadata

dataset_type property

dataset_type

events property

events

records instance-attribute

records

metadata instance-attribute

metadata

get_event_by_id

get_event_by_id(event_id)
Source code in kloppy/domain/models/event.py
def get_event_by_id(self, event_id: str) -> Optional[Event]:
    return self.get_record_by_id(event_id)

add_state

add_state(*builder_keys)

See [add_state][kloppy.domain.services.state_builder.add_state]

Source code in kloppy/domain/models/event.py
def add_state(self, *builder_keys):
    """
    See [`add_state`][kloppy.domain.services.state_builder.add_state]
    """
    from kloppy.domain.services.state_builder import add_state

    return add_state(self, *builder_keys)

to_pandas

to_pandas(record_converter=None, additional_columns=None)
Source code in kloppy/domain/models/event.py
@deprecated(
    "to_pandas will be removed in the future. Please use to_df instead."
)
def to_pandas(
    self,
    record_converter: Optional[Callable[[Event], Dict]] = None,
    additional_columns: Optional[NamedColumns] = None,
) -> "DataFrame":
    try:
        import pandas as pd
    except ImportError:
        raise ImportError(
            "Seems like you don't have pandas installed. Please"
            " install it using: pip install pandas"
        )

    if not record_converter:
        from ..services.transformers.attribute import (
            DefaultEventTransformer,
        )

        record_converter = DefaultEventTransformer()

    def generic_record_converter(event: Event):
        row = record_converter(event)
        if additional_columns:
            for k, v in additional_columns.items():
                if callable(v):
                    value = v(event)
                else:
                    value = v
                row.update({k: value})
        return row

    return pd.DataFrame.from_records(
        map(generic_record_converter, self.records)
    )

aggregate

aggregate(type_, **aggregator_kwargs)
Source code in kloppy/domain/models/event.py
def aggregate(self, type_: str, **aggregator_kwargs) -> List[Any]:
    if type_ == "minutes_played":
        from kloppy.domain.services.aggregators.minutes_played import (
            MinutesPlayedAggregator,
        )

        aggregator = MinutesPlayedAggregator(**aggregator_kwargs)
    else:
        raise KloppyError(f"No aggregator {type_} not found")

    return aggregator.aggregate(self)

transform

transform(*args, **kwargs)

See transform

filter

filter(filter_)

Filter all records used filter_

PARAMETER DESCRIPTION
filter_

The filter to be used to filter the records. It can be a callable that takes a record and returns a boolean, or a string representing a css-like selector.

TYPE: Union[str, Callable[[Event], bool]]

Examples:

1
2
3
>>> from kloppy.domain import EventType
>>> dataset = dataset.filter(lambda event: event.event_type == EventType.PASS)
>>> dataset = dataset.filter('pass')

map

map(mapper)

find_all

find_all(filter_)

find

find(filter_)

from_dataset classmethod

from_dataset(dataset, mapper_fn)

Create a new Dataset from other dataset

PARAMETER DESCRIPTION
mapper_fn

TYPE: Callable[[Self], Self]

Examples:

>>> from kloppy.domain import Code,     CodeDataset
>>> code_dataset = (
>>>     CodeDataset
>>>     .from_dataset(
>>>         dataset,
>>>         lambda event: Code(
>>>             code_id=event.event_id,
>>>             code=event.event_name,
>>>             period=event.period,
>>>             timestamp=event.timestamp - 7,
>>>             end_timestamp=event.timestamp + 5,
>>>             labels={
>>>                 'Player': str(event.player),
>>>                 'Team': str(event.team)
>>>             }
>>>         )
>>>     )
>>> )

get_record_by_id

get_record_by_id(record_id)

to_records

1
2
3
4
5
to_records(
    *columns: Unpack[tuple[Column]],
    as_list: Literal[True] = True,
    **named_columns: NamedColumns
) -> List[Dict[str, Any]]
1
2
3
4
5
to_records(
    *columns: Unpack[tuple[Column]],
    as_list: Literal[False] = False,
    **named_columns: NamedColumns
) -> Iterable[Dict[str, Any]]
to_records(*columns, as_list=True, **named_columns)

to_dict

to_dict(*columns, orient='list', **named_columns)

to_df

to_df(*columns, engine=None, **named_columns)