Data

pamiq_core.data.DataBuffer

DataBuffer(collecting_data_names: Iterable[str], max_size: int)

Bases: ABC, PersistentStateMixin

Interface for managing experience data collected during system execution.

DataBuffer provides an interface for collecting and managing experience data generated during system execution. It maintains a buffer of fixed maximum size that stores data for specified data names.

Initializes the DataBuffer.

PARAMETER DESCRIPTION
collecting_data_names

Names of data fields to collect and store.

TYPE: Iterable[str]

max_size

Maximum number of samples to store in the buffer.

TYPE: int

RAISES DESCRIPTION
ValueError

If max_size is negative.

Source code in src/pamiq_core/data/buffer.py
def __init__(self, collecting_data_names: Iterable[str], max_size: int) -> None:
    """Initializes the DataBuffer.

    Args:
        collecting_data_names: Names of data fields to collect and store.
        max_size: Maximum number of samples to store in the buffer.

    Raises:
        ValueError: If max_size is negative.
    """
    super().__init__()
    self._collecting_data_names = set(collecting_data_names)
    if max_size < 0:
        raise ValueError("max_size must be non-negative")
    self._max_size = max_size

collecting_data_names property

collecting_data_names: set[str]

Returns the set of data field names being collected.

max_size property

max_size: int

Returns the maximum number of samples that can be stored.

add abstractmethod

add(step_data: StepData[T]) -> None

Adds a new data sample to the buffer.

PARAMETER DESCRIPTION
step_data

Dictionary containing data for one step. Must contain all fields specified in collecting_data_names.

TYPE: StepData[T]

Source code in src/pamiq_core/data/buffer.py
@abstractmethod
def add(self, step_data: StepData[T]) -> None:
    """Adds a new data sample to the buffer.

    Args:
        step_data: Dictionary containing data for one step. Must contain
            all fields specified in collecting_data_names.
    """
    pass

get_data abstractmethod

get_data() -> BufferData[T]

Retrieves all stored data from the buffer.

RETURNS DESCRIPTION
BufferData[T]

Dictionary mapping data field names to sequences of their values. Each sequence has the same length.

Source code in src/pamiq_core/data/buffer.py
@abstractmethod
def get_data(self) -> BufferData[T]:
    """Retrieves all stored data from the buffer.

    Returns:
        Dictionary mapping data field names to sequences of their values.
        Each sequence has the same length.
    """
    pass

__len__ abstractmethod

__len__() -> int

Returns the current number of samples in the buffer.

RETURNS DESCRIPTION
int

The number of samples currently stored in the buffer.

Source code in src/pamiq_core/data/buffer.py
@abstractmethod
def __len__(self) -> int:
    """Returns the current number of samples in the buffer.

    Returns:
        int: The number of samples currently stored in the buffer.
    """
    pass
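
To make the contract concrete, here is a minimal sketch of a subclass (illustrative only, not part of the library; it assumes PersistentStateMixin provides default save_state/load_state implementations, which a real subclass would typically override — see SequentialBuffer below for the canonical implementation):

from pamiq_core.data import DataBuffer

class ListBuffer(DataBuffer[float]):
    """Minimal illustrative DataBuffer backed by plain lists."""

    def __init__(self, collecting_data_names, max_size):
        super().__init__(collecting_data_names, max_size)
        self._storage: dict[str, list[float]] = {
            name: [] for name in self.collecting_data_names
        }

    def add(self, step_data):
        for name in self.collecting_data_names:
            # Evict the oldest sample once the buffer is at capacity.
            if len(self._storage[name]) >= self.max_size:
                self._storage[name].pop(0)
            self._storage[name].append(step_data[name])

    def get_data(self):
        return {name: list(values) for name, values in self._storage.items()}

    def __len__(self):
        return len(next(iter(self._storage.values()), []))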

pamiq_core.data.DataCollector

DataCollector(user: DataUser[T])

A thread-safe collector for buffered data.

This class provides concurrent data collection capabilities with thread safety, working in conjunction with DataUser to manage data collection and transfer.

Initialize DataCollector with a specified DataUser.

PARAMETER DESCRIPTION
user

DataUser instance this collector is associated with.

TYPE: DataUser[T]

Source code in src/pamiq_core/data/interface.py
def __init__(self, user: DataUser[T]) -> None:
    """Initialize DataCollector with a specified DataUser.

    Args:
        user: DataUser instance this collector is associated with.
    """
    self._user = user
    self._queues_dict = user.create_empty_queues()
    self._lock = RLock()

collect

collect(step_data: StepData[T]) -> None

Collect step data in a thread-safe manner.

PARAMETER DESCRIPTION
step_data

Data to be collected.

TYPE: StepData[T]

Source code in src/pamiq_core/data/interface.py
def collect(self, step_data: StepData[T]) -> None:
    """Collect step data in a thread-safe manner.

    Args:
        step_data: Data to be collected.
    """
    with self._lock:
        self._queues_dict.append(step_data)
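
For example, a collection thread might push one step of data per iteration (a sketch; in a running system the collector is typically provided by the framework rather than constructed by hand):

# `collector` is the DataCollector[float] associated with a DataUser.
step_data = {"observation": 0.1, "reward": 1.0}
collector.collect(step_data)  # Safe to call concurrently; guarded by an RLock.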

pamiq_core.data.DataUser

DataUser(buffer: DataBuffer[T])

Bases: PersistentStateMixin

A class that manages data buffering and timestamps for collected data.

This class acts as a user of data buffers, handling the collection, storage, and retrieval of data along with their timestamps. It works in conjunction with a DataCollector to manage concurrent data collection.

Initialize DataUser with a specified buffer.

PARAMETER DESCRIPTION
buffer

Data buffer instance to store collected data.

TYPE: DataBuffer[T]

Source code in src/pamiq_core/data/interface.py
def __init__(self, buffer: DataBuffer[T]) -> None:
    """Initialize DataUser with a specified buffer.

    Args:
        buffer: Data buffer instance to store collected data.
    """
    self._buffer = buffer
    self._timestamps: deque[float] = deque(maxlen=buffer.max_size)
    # DataCollector instance is only accessed from DataUser and Container classes
    self._collector = DataCollector(self)

create_empty_queues

create_empty_queues() -> TimestampingQueuesDict[T]

Create empty timestamping queues for data collection.

RETURNS DESCRIPTION
TimestampingQueuesDict[T]

New instance of TimestampingQueuesDict with appropriate configuration.

Source code in src/pamiq_core/data/interface.py
def create_empty_queues(self) -> TimestampingQueuesDict[T]:
    """Create empty timestamping queues for data collection.

    Returns:
        New instance of TimestampingQueuesDict with appropriate configuration.
    """
    return TimestampingQueuesDict(
        self._buffer.collecting_data_names, self._buffer.max_size
    )

update

update() -> None

Update buffer with collected data from the collector.

Moves all collected data from the collector to the buffer and records their timestamps.

Source code in src/pamiq_core/data/interface.py
def update(self) -> None:
    """Update buffer with collected data from the collector.

    Moves all collected data from the collector to the buffer and
    records their timestamps.
    """
    queues = self._collector._move_data()  # pyright: ignore[reportPrivateUsage]
    for _ in range(len(queues)):
        data, t = queues.popleft()
        self._buffer.add(data)
        self._timestamps.append(t)
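
A minimal end-to-end flow might look as follows (a sketch; the private _collector attribute is accessed here purely for illustration, since the framework normally wires collectors to collection threads):

from pamiq_core.data import DataUser
from pamiq_core.data.impls import SequentialBuffer

buffer = SequentialBuffer(["observation", "reward"], max_size=100)
user = DataUser(buffer)

# Normally called from a collection thread via the framework.
user._collector.collect({"observation": 0.1, "reward": 1.0})

user.update()     # Move pending data into the buffer and record timestamps.
print(len(user))  # -> 1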

get_data

get_data() -> BufferData[T]

Retrieve data from the buffer.

RETURNS DESCRIPTION
BufferData[T]

Current data stored in the buffer.

Source code in src/pamiq_core/data/interface.py
def get_data(self) -> BufferData[T]:
    """Retrieve data from the buffer.

    Returns:
        Current data stored in the buffer.
    """
    return self._buffer.get_data()

count_data_added_since

count_data_added_since(timestamp: float) -> int

Count the number of data points added after the specified timestamp.

NOTE: Use pamiq_core.time to retrieve the timestamp.

PARAMETER DESCRIPTION
timestamp

Reference timestamp to count from.

TYPE: float

RETURNS DESCRIPTION
int

Number of data points added after the specified timestamp.

Source code in src/pamiq_core/data/interface.py
def count_data_added_since(self, timestamp: float) -> int:
    """Count the number of data points added after the specified timestamp.

    NOTE: Use `pamiq_core.time` to retrieve `timestamp`.

    Args:
        timestamp: Reference timestamp to count from.

    Returns:
        Number of data points added after the specified timestamp.
    """
    for i, t in enumerate(reversed(self._timestamps)):
        if t <= timestamp:
            return i
    return len(self._timestamps)
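
For instance, to check how many samples arrived after a checkpoint (assuming pamiq_core.time exposes a time() function, per the note above):

from pamiq_core import time  # assumed import path for the time utility

checkpoint = time.time()
# ... more data is collected and user.update() is called ...
n_new = user.count_data_added_since(checkpoint)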

save_state

save_state(path: Path) -> None

Save the state of this DataUser to the specified path.

This method first updates the buffer with any pending collected data, then saves the buffer state via the underlying buffer and pickles the recorded timestamps alongside it.

PARAMETER DESCRIPTION
path

Directory path where the state should be saved

TYPE: Path

Source code in src/pamiq_core/data/interface.py
@override
def save_state(self, path: Path) -> None:
    """Save the state of this DataUser to the specified path.

    This method first updates the buffer with any pending collected data,
    then saves the buffer state via the underlying buffer and pickles the
    recorded timestamps alongside it.

    Args:
        path: Directory path where the state should be saved
    """
    self.update()
    path.mkdir()
    self._buffer.save_state(path / "buffer")
    with open(path / "timestamps.pkl", "wb") as f:
        pickle.dump(self._timestamps, f)

load_state

load_state(path: Path) -> None

Load the state of this DataUser from the specified path.

This method loads the buffer state via the underlying buffer and restores the recorded timestamps.

PARAMETER DESCRIPTION
path

Directory path from which the state should be loaded

TYPE: Path

Source code in src/pamiq_core/data/interface.py
@override
def load_state(self, path: Path) -> None:
    """Load the state of this DataUser from the specified path.

    This method loads the buffer state via the underlying buffer and
    restores the recorded timestamps.

    Args:
        path: Directory path from which the state should be loaded
    """
    self._buffer.load_state(path / "buffer")
    with open(path / "timestamps.pkl", "rb") as f:
        self._timestamps = deque(pickle.load(f), maxlen=self._buffer.max_size)
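
A save/load round trip might look like this (a sketch; note that save_state calls path.mkdir(), so the target directory must not already exist while its parent must):

from pathlib import Path

state_dir = Path("state/data_user")  # parent "state" must exist beforehand
user.save_state(state_dir)           # flushes pending data, then persists

restored = DataUser(SequentialBuffer(["observation", "reward"], max_size=100))
restored.load_state(state_dir)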

__len__

__len__() -> int

Returns the current number of samples in the buffer.

Source code in src/pamiq_core/data/interface.py
def __len__(self) -> int:
    """Returns the current number of samples in the buffer."""
    return len(self._buffer)

pamiq_core.data.impls.SequentialBuffer

SequentialBuffer(collecting_data_names: Iterable[str], max_size: int)

Bases: DataBuffer[T]

Implementation of DataBuffer that maintains data in sequential order.

This buffer stores collected data points in ordered queues, preserving the insertion order. Each data field is stored in a separate queue with a maximum size limit.

Initialize a new SequentialBuffer.

PARAMETER DESCRIPTION
collecting_data_names

Names of data fields to collect.

TYPE: Iterable[str]

max_size

Maximum number of data points to store.

TYPE: int

Source code in src/pamiq_core/data/impls/sequential_buffer.py
@override
def __init__(self, collecting_data_names: Iterable[str], max_size: int):
    """Initialize a new SequentialBuffer.

    Args:
        collecting_data_names: Names of data fields to collect.
        max_size: Maximum number of data points to store.
    """
    super().__init__(collecting_data_names, max_size)

    self._queues_dict: dict[str, deque[T]] = {
        name: deque(maxlen=max_size) for name in collecting_data_names
    }

    self._current_size = 0

add

add(step_data: StepData[T]) -> None

Add a new data sample to the buffer.

PARAMETER DESCRIPTION
step_data

Dictionary containing data for one step. Must contain all fields specified in collecting_data_names.

TYPE: StepData[T]

RAISES DESCRIPTION
KeyError

If a required data field is missing from step_data.

Source code in src/pamiq_core/data/impls/sequential_buffer.py
@override
def add(self, step_data: StepData[T]) -> None:
    """Add a new data sample to the buffer.

    Args:
        step_data: Dictionary containing data for one step. Must contain
            all fields specified in collecting_data_names.

    Raises:
        KeyError: If a required data field is missing from step_data.
    """
    for name in self.collecting_data_names:
        if name not in step_data:
            raise KeyError(f"Required data '{name}' not found in step_data")
        self._queues_dict[name].append(step_data[name])

    if self._current_size < self.max_size:
        self._current_size += 1

get_data

get_data() -> dict[str, list[T]]

Retrieve all stored data from the buffer.

RETURNS DESCRIPTION
dict[str, list[T]]

Dictionary mapping data field names to lists of their values. Each list preserves the original insertion order.

Source code in src/pamiq_core/data/impls/sequential_buffer.py
@override
def get_data(self) -> dict[str, list[T]]:
    """Retrieve all stored data from the buffer.

    Returns:
        Dictionary mapping data field names to lists of their values.
        Each list preserves the original insertion order.
    """
    return {name: list(queue) for name, queue in self._queues_dict.items()}
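
For example, once more samples than max_size have been added, only the most recent ones remain:

from pamiq_core.data.impls import SequentialBuffer

buf = SequentialBuffer(["observation", "reward"], max_size=3)
for i in range(5):
    buf.add({"observation": float(i), "reward": float(i) * 2})

print(buf.get_data())
# {'observation': [2.0, 3.0, 4.0], 'reward': [4.0, 6.0, 8.0]}
print(len(buf))  # -> 3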

__len__

__len__() -> int

Returns the current number of samples in the buffer.

RETURNS DESCRIPTION
int

The number of samples currently stored in the buffer.

Source code in src/pamiq_core/data/impls/sequential_buffer.py
@override
def __len__(self) -> int:
    """Returns the current number of samples in the buffer.

    Returns:
        int: The number of samples currently stored in the buffer.
    """
    return self._current_size

save_state

save_state(path: Path) -> None

Save the buffer state to the specified path.

Creates a directory at the given path and saves each data queue as a separate pickle file.

PARAMETER DESCRIPTION
path

Directory path in which to save the buffer state

TYPE: Path

Source code in src/pamiq_core/data/impls/sequential_buffer.py
@override
def save_state(self, path: Path) -> None:
    """Save the buffer state to the specified path.

    Creates a directory at the given path and saves each data queue as a
    separate pickle file.

    Args:
        path: Directory path in which to save the buffer state
    """
    path.mkdir()
    for name, queue in self._queues_dict.items():
        with open(path / f"{name}.pkl", "wb") as f:
            pickle.dump(queue, f)

load_state

load_state(path: Path) -> None

Load the buffer state from the specified path.

Loads data queues from pickle files in the given directory.

PARAMETER DESCRIPTION
path

Directory path from which to load the buffer state

TYPE: Path

Source code in src/pamiq_core/data/impls/sequential_buffer.py
@override
def load_state(self, path: Path) -> None:
    """Load the buffer state from the specified path.

    Loads data queues from pickle files in the given directory.

    Args:
        path: Directory path from which to load the buffer state
    """
    for name in self.collecting_data_names:
        with open(path / f"{name}.pkl", "rb") as f:
            queue = deque(pickle.load(f), maxlen=self.max_size)
            self._queues_dict[name] = queue

pamiq_core.data.impls.RandomReplacementBuffer

RandomReplacementBuffer(
    collecting_data_names: Iterable[str],
    max_size: int,
    replace_probability: float | None = None,
    expected_survival_length: int | None = None,
)

Bases: DataBuffer[T]

Buffer implementation that randomly replaces elements when full.

This buffer keeps track of collected data and, when full, randomly replaces existing elements based on a configurable probability.

Initialize a RandomReplacementBuffer.

PARAMETER DESCRIPTION
collecting_data_names

Names of data fields to collect.

TYPE: Iterable[str]

max_size

Maximum number of data points to store.

TYPE: int

replace_probability

Probability of replacing an existing element when buffer is full. Must be between 0.0 and 1.0 inclusive. If None and expected_survival_length is provided, this will be computed automatically. Default is 1.0 if both are None.

TYPE: float | None DEFAULT: None

expected_survival_length

Expected number of steps that data should survive in the buffer. Used to automatically compute replace_probability if replace_probability is None. Cannot be specified together with replace_probability.

TYPE: int | None DEFAULT: None

RAISES DESCRIPTION
ValueError

If replace_probability is not between 0.0 and 1.0 inclusive, or if both replace_probability and expected_survival_length are specified.

Source code in src/pamiq_core/data/impls/random_replacement_buffer.py
def __init__(
    self,
    collecting_data_names: Iterable[str],
    max_size: int,
    replace_probability: float | None = None,
    expected_survival_length: int | None = None,
) -> None:
    """Initialize a RandomReplacementBuffer.

    Args:
        collecting_data_names: Names of data fields to collect.
        max_size: Maximum number of data points to store.
        replace_probability: Probability of replacing an existing element when buffer is full.
            Must be between 0.0 and 1.0 inclusive. If None and expected_survival_length is provided,
            this will be computed automatically. Default is 1.0 if both are None.
        expected_survival_length: Expected number of steps that data should survive in the buffer.
            Used to automatically compute replace_probability if replace_probability is None.
            Cannot be specified together with replace_probability.

    Raises:
        ValueError: If replace_probability is not between 0.0 and 1.0 inclusive, or if both
            replace_probability and expected_survival_length are specified.
    """
    super().__init__(collecting_data_names, max_size)

    if replace_probability is None:
        if expected_survival_length is None:
            replace_probability = 1.0
        else:
            replace_probability = (
                self.compute_replace_probability_from_expected_survival_length(
                    max_size, expected_survival_length
                )
            )
    elif expected_survival_length is not None:
        raise ValueError(
            "Cannot specify both replace_probability and expected_survival_length. "
            "Please specify only one of them."
        )
    if not (1.0 >= replace_probability >= 0.0):
        raise ValueError(
            "replace_probability must be between 0.0 and 1.0 inclusive"
        )

    self._lists_dict: dict[str, list[T]] = {
        name: [] for name in collecting_data_names
    }

    self._replace_probability = replace_probability
    self._current_size = 0

is_full property

is_full: bool

Check if the buffer has reached its maximum capacity.

RETURNS DESCRIPTION
bool

True if the buffer is full, False otherwise.

compute_replace_probability_from_expected_survival_length staticmethod

compute_replace_probability_from_expected_survival_length(
    max_size: int, survival_length: int
) -> float

Compute the replace probability from expected survival length.

This method calculates the replacement probability needed to achieve a desired expected survival length for data in the buffer.

The computation is based on the mathematical analysis described at:

https://zenn.dev/gesonanko/scraps/b581e75bfd9f3e

PARAMETER DESCRIPTION
max_size

Maximum size of the buffer.

TYPE: int

survival_length

Expected number of steps that data should survive.

TYPE: int

RETURNS DESCRIPTION
float

The computed replacement probability between 0.0 and 1.0.

Source code in src/pamiq_core/data/impls/random_replacement_buffer.py
@staticmethod
def compute_replace_probability_from_expected_survival_length(
    max_size: int, survival_length: int
) -> float:
    """Compute the replace probability from expected survival length.

    This method calculates the replacement probability needed to achieve
    a desired expected survival length for data in the buffer.

    The computation is based on the mathematical analysis described in below:
        https://zenn.dev/gesonanko/scraps/b581e75bfd9f3e

    Args:
        max_size: Maximum size of the buffer.
        survival_length: Expected number of steps that data should survive.

    Returns:
        The computed replacement probability between 0.0 and 1.0.
    """
    gamma = 0.5772156649015329  # Euler-Mascheroni constant
    p = max_size / survival_length * (math.log(max_size) + gamma)
    return min(max(p, 0.0), 1.0)  # Clamp value between 0 and 1.
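
As a worked example, for max_size = 1000 and survival_length = 10000:

p = (1000 / 10000) * (ln(1000) + 0.5772) ≈ 0.1 * 7.485 ≈ 0.749

so once the buffer is full, roughly three out of four incoming samples replace an existing entry, which keeps the average lifetime of a stored sample near the requested 10,000 steps.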

add

add(step_data: StepData[T]) -> None

Add a new data sample to the buffer.

If the buffer is full, the new data may replace an existing entry based on the configured replacement probability.

PARAMETER DESCRIPTION
step_data

Dictionary containing data for one step. Must contain all fields specified in collecting_data_names.

TYPE: StepData[T]

RAISES DESCRIPTION
KeyError

If a required data field is missing from step_data.

Source code in src/pamiq_core/data/impls/random_replacement_buffer.py
@override
def add(self, step_data: StepData[T]) -> None:
    """Add a new data sample to the buffer.

    If the buffer is full, the new data may replace an existing entry
    based on the configured replacement probability.

    Args:
        step_data: Dictionary containing data for one step. Must contain
            all fields specified in collecting_data_names.

    Raises:
        KeyError: If a required data field is missing from step_data.
    """
    for name in self.collecting_data_names:
        if name not in step_data:
            raise KeyError(f"Required data '{name}' not found in step_data")

    if self.is_full:
        if random.random() > self._replace_probability:
            return
        replace_index = random.randint(0, self.max_size - 1)
        for name in self.collecting_data_names:
            self._lists_dict[name][replace_index] = step_data[name]
    else:
        for name in self.collecting_data_names:
            self._lists_dict[name].append(step_data[name])
        self._current_size += 1

get_data

get_data() -> Mapping[str, list[T]]

Retrieve all stored data from the buffer.

RETURNS DESCRIPTION
Mapping[str, list[T]]

Dictionary mapping data field names to lists of their values. Returns a copy of the internal data to prevent modification.

Source code in src/pamiq_core/data/impls/random_replacement_buffer.py
@override
def get_data(self) -> Mapping[str, list[T]]:
    """Retrieve all stored data from the buffer.

    Returns:
        Dictionary mapping data field names to lists of their values.
        Returns a copy of the internal data to prevent modification.
    """
    return {name: data.copy() for name, data in self._lists_dict.items()}

__len__

__len__() -> int

Returns the current number of samples in the buffer.

RETURNS DESCRIPTION
int

The number of samples currently stored in the buffer.

Source code in src/pamiq_core/data/impls/random_replacement_buffer.py
@override
def __len__(self) -> int:
    """Returns the current number of samples in the buffer.

    Returns:
        int: The number of samples currently stored in the buffer.
    """
    return self._current_size

save_state

save_state(path: Path) -> None

Save the buffer state to the specified path.

Creates a directory at the given path and saves each data list as a separate pickle file.

PARAMETER DESCRIPTION
path

Directory path in which to save the buffer state.

TYPE: Path

Source code in src/pamiq_core/data/impls/random_replacement_buffer.py
@override
def save_state(self, path: Path) -> None:
    """Save the buffer state to the specified path.

    Creates a directory at the given path and saves each data list as a
    separate pickle file.

    Args:
        path: Directory path in which to save the buffer state.
    """
    path.mkdir()
    for name, data in self._lists_dict.items():
        with open(path / f"{name}.pkl", "wb") as f:
            pickle.dump(data, f)

load_state

load_state(path: Path) -> None

Load the buffer state from the specified path.

Loads data lists from pickle files in the given directory.

PARAMETER DESCRIPTION
path

Directory path from which to load the buffer state.

TYPE: Path

RAISES DESCRIPTION
ValueError

If loaded data lists have inconsistent lengths.

Source code in src/pamiq_core/data/impls/random_replacement_buffer.py
@override
def load_state(self, path: Path) -> None:
    """Load the buffer state from the specified path.

    Loads data lists from pickle files in the given directory.

    Args:
        path: Directory path from which to load the buffer state.

    Raises:
        ValueError: If loaded data lists have inconsistent lengths.
    """
    lists_dict: dict[str, list[T]] = {}
    size: int | None = None
    for name in self.collecting_data_names:
        with open(path / f"{name}.pkl", "rb") as f:
            obj = list(pickle.load(f))[: self.max_size]
        if size is None:
            size = len(obj)
        if size != len(obj):
            raise ValueError("Inconsistent list lengths in loaded data")
        lists_dict[name] = obj

    self._lists_dict = lists_dict
    if size is None:
        self._current_size = 0
    else:
        self._current_size = size
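
Putting it together, a short end-to-end sketch (the seed is fixed only to make the replacement draws reproducible):

import random

from pamiq_core.data.impls import RandomReplacementBuffer

random.seed(0)

buf = RandomReplacementBuffer(
    ["observation"], max_size=2, replace_probability=0.5
)
for i in range(10):
    buf.add({"observation": i})

print(len(buf))        # -> 2 (the buffer filled up after two adds)
print(buf.get_data())  # two surviving samples; which ones is random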