Data
pamiq_core.data.DataBuffer
Bases: ABC, PersistentStateMixin
Interface for managing experience data collected during system execution.
DataBuffer defines how experience data is added to and retrieved from a buffer of fixed maximum size.
Type Parameters
- T: The type of individual data elements.
- R: The return type of the get_data() method.
Initializes the DataBuffer.
PARAMETER | DESCRIPTION |
---|---|
max_queue_size | Maximum number of samples to store in the collector's queue. When the queue size exceeds this limit, old data will be deleted. If None, the queue will have unlimited size (may cause memory issues). |
RAISES | DESCRIPTION |
---|---|
ValueError | If max_queue_size is negative. |
Source code in src/pamiq_core/data/buffer.py
max_queue_size (property)
Returns the maximum number of samples that can be stored in the collector's queue.
add (abstractmethod)
Adds a new data sample to the buffer.
PARAMETER | DESCRIPTION |
---|---|
data | Data element to add to the buffer. |
get_data (abstractmethod)
Retrieves all stored data from the buffer.
RETURNS | DESCRIPTION |
---|---|
R | Data structure containing all stored samples. |
__len__ (abstractmethod)
Returns the current number of samples in the buffer.
RETURNS | DESCRIPTION |
---|---|
int | The number of samples currently stored in the buffer. |
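The interface above can be illustrated with a standalone sketch. It does not import pamiq_core: BufferSketch is a hypothetical stand-in that mirrors the documented members (max_queue_size, add, get_data, __len__), and SnapshotBuffer is an invented subclass showing how T (here float) and R (here a tuple) can differ.

```python
from abc import ABC, abstractmethod
from typing import Generic, Optional, TypeVar

T = TypeVar("T")  # type of one sample
R = TypeVar("R")  # type returned by get_data()


class BufferSketch(ABC, Generic[T, R]):
    """Hypothetical stand-in that mirrors the documented interface."""

    def __init__(self, max_queue_size: Optional[int] = None) -> None:
        if max_queue_size is not None and max_queue_size < 0:
            raise ValueError("max_queue_size must not be negative")
        self._max_queue_size = max_queue_size

    @property
    def max_queue_size(self) -> Optional[int]:
        return self._max_queue_size

    @abstractmethod
    def add(self, data: T) -> None: ...

    @abstractmethod
    def get_data(self) -> R: ...

    @abstractmethod
    def __len__(self) -> int: ...


class SnapshotBuffer(BufferSketch[float, tuple]):
    """Keeps the newest `size` floats and returns them as an immutable tuple."""

    def __init__(self, size: int) -> None:
        super().__init__(max_queue_size=size)
        self._size = size
        self._items: list = []

    def add(self, data: float) -> None:
        self._items.append(data)
        if len(self._items) > self._size:
            self._items.pop(0)  # drop the oldest sample

    def get_data(self) -> tuple:
        return tuple(self._items)

    def __len__(self) -> int:
        return len(self._items)


buffer = SnapshotBuffer(size=2)
for value in (1.0, 2.0, 3.0):
    buffer.add(value)
print(buffer.get_data())  # (2.0, 3.0)
```

A real subclass would inherit from pamiq_core.data.DataBuffer and would also need the state-saving methods that PersistentStateMixin expects, which this sketch leaves out.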
pamiq_core.data.DataCollector
A thread-safe collector for buffered data.
This class supports concurrent, thread-safe data collection and works in conjunction with DataUser to manage data collection and transfer.
Type Parameters
- T: The type of individual data elements.
Initialize DataCollector with a specified queue size.
PARAMETER | DESCRIPTION |
---|---|
max_queue_size | Maximum size of the collection queue. If None, the queue has no size limit. |
Source code in src/pamiq_core/data/interface.py
collect
Collect data in a thread-safe manner.
PARAMETER | DESCRIPTION |
---|---|
data | Data to be collected. |
pamiq_core.data.DataUser
Bases: PersistentStateMixin
A class that manages data buffering and timestamps for collected data.
This class acts as a user of data buffers, handling the collection, storage, and retrieval of data along with their timestamps. It works in conjunction with a DataCollector to manage concurrent data collection.
Type Parameters
- R: The return type of the buffer's get_data() method.
Initialize DataUser with a specified buffer.
PARAMETER | DESCRIPTION |
---|---|
buffer | Data buffer instance to store collected data. |
Source code in src/pamiq_core/data/interface.py
update
Update buffer with collected data from the collector.
Moves all collected data from the collector to the buffer and records their timestamps.
Source code in src/pamiq_core/data/interface.py
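The collect and update handoff described above can be sketched without the library. CollectorSketch and UserSketch are hypothetical stand-ins, the drain method is an invented helper, and the standard-library time.time() substitutes for pamiq_core.time; the real classes may be wired together differently.

```python
import threading
import time
from collections import deque


class CollectorSketch:
    """Hypothetical stand-in: a lock-protected queue filled by producers."""

    def __init__(self, max_queue_size=None):
        # deque(maxlen=None) is unbounded; otherwise the oldest items drop out.
        self._queue = deque(maxlen=max_queue_size)
        self._lock = threading.Lock()

    def collect(self, data):
        with self._lock:
            self._queue.append((time.time(), data))

    def drain(self):
        """Atomically hand over everything collected so far."""
        with self._lock:
            items = list(self._queue)
            self._queue.clear()
        return items


class UserSketch:
    """Hypothetical stand-in: moves drained data into a buffer with timestamps."""

    def __init__(self, collector):
        self._collector = collector
        self._buffer = []
        self._timestamps = []

    def update(self):
        for stamp, data in self._collector.drain():
            self._buffer.append(data)
            self._timestamps.append(stamp)

    def get_data(self):
        self.update()  # pull pending data before reading
        return list(self._buffer)


collector = CollectorSketch(max_queue_size=100)
user = UserSketch(collector)

# A single producer thread collects five samples, then the consumer reads them.
producer = threading.Thread(target=lambda: [collector.collect(i) for i in range(5)])
producer.start()
producer.join()

print(user.get_data())  # [0, 1, 2, 3, 4]
```

Holding the lock only for the append and for the drain keeps the producer's critical section short, which is the usual reason for separating collection from buffering.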
get_data
Update and retrieve data from the buffer.
RETURNS | DESCRIPTION |
---|---|
R | Current data stored in the buffer. |
count_data_added_since
Count the number of data points added after the specified timestamp.
NOTE: Use pamiq_core.time to retrieve the timestamp.
PARAMETER | DESCRIPTION |
---|---|
timestamp | Reference timestamp to count from. |
RETURNS | DESCRIPTION |
---|---|
int | Number of data points added after the specified timestamp. |
Source code in src/pamiq_core/data/interface.py
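One way to picture this count is a binary search over recorded timestamps. The sketch below assumes the timestamps are stored in ascending order and that "after" means strictly greater; count_added_since is a hypothetical helper, not the library's implementation.

```python
from bisect import bisect_right


def count_added_since(timestamps, reference):
    """Count entries whose timestamp is strictly greater than `reference`.

    Assumes `timestamps` is sorted in ascending order.
    """
    return len(timestamps) - bisect_right(timestamps, reference)


stamps = [10.0, 11.0, 12.5, 13.0]
print(count_added_since(stamps, 11.0))  # 2
```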
save_state
Save the state of this DataUser to the specified path.
This method first updates the buffer with any pending collected data, then delegates the state saving to the underlying buffer.
PARAMETER | DESCRIPTION |
---|---|
path | Directory path where the state should be saved. |
Source code in src/pamiq_core/data/interface.py
load_state
Load the state of this DataUser from the specified path.
This method delegates the state loading to the underlying buffer.
PARAMETER | DESCRIPTION |
---|---|
path | Directory path from which the state should be loaded. |
Source code in src/pamiq_core/data/interface.py
pamiq_core.data.impls.SequentialBuffer
Bases: DataBuffer[T, list[T]]
Implementation of DataBuffer that maintains data in sequential order.
This buffer stores collected data points in an ordered queue, preserving the insertion order with a maximum size limit.
Initialize a new SequentialBuffer.
PARAMETER | DESCRIPTION |
---|---|
max_size | Maximum number of data points to store. |
Source code in src/pamiq_core/data/impls/sequential_buffer.py
max_size (property)
Returns the maximum number of data points that can be stored in the buffer.
add
Add a new data sample to the buffer.
PARAMETER | DESCRIPTION |
---|---|
data | Data element to add to the buffer. |
get_data
Retrieve all stored data from the buffer.
RETURNS | DESCRIPTION |
---|---|
list[T] | List of all stored data elements preserving the original insertion order. |
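The ordering and size limit described for this buffer match the behavior of a bounded collections.deque. The sketch below uses the standard library directly and is only an illustration; whether SequentialBuffer is built on a deque internally is an assumption.

```python
from collections import deque

# A bounded queue: once full, appending discards the oldest element.
queue = deque(maxlen=3)
for sample in ["a", "b", "c", "d"]:
    queue.append(sample)

stored = list(queue)  # copy in insertion order
print(stored)  # ['b', 'c', 'd']
```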
__len__
Returns the current number of samples in the buffer.
RETURNS | DESCRIPTION |
---|---|
int | The number of samples currently stored in the buffer. |
save_state
Save the buffer state to the specified path.
Saves the data queue to a pickle file with the .pkl extension.
PARAMETER | DESCRIPTION |
---|---|
path | File path at which to save the buffer state (without extension). |
Source code in src/pamiq_core/data/impls/sequential_buffer.py
load_state
Load the buffer state from the specified path.
Loads the data queue from a pickle file with the .pkl extension.
PARAMETER | DESCRIPTION |
---|---|
path | File path from which to load the buffer state (without extension). |
Source code in src/pamiq_core/data/impls/sequential_buffer.py
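The save and load round trip can be sketched with the standard library. save_queue and load_queue are hypothetical helpers, and appending the extension with Path.with_suffix is an assumption about how the ".pkl" name is derived from an extension-less path.

```python
import pickle
import tempfile
from collections import deque
from pathlib import Path


def save_queue(queue, path: Path) -> None:
    """Pickle the queue contents to `<path>.pkl`."""
    with open(path.with_suffix(".pkl"), "wb") as f:
        pickle.dump(list(queue), f)


def load_queue(path: Path, max_size: int) -> deque:
    """Rebuild a bounded queue from `<path>.pkl`."""
    with open(path.with_suffix(".pkl"), "rb") as f:
        return deque(pickle.load(f), maxlen=max_size)


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "buffer_state"  # no extension, as documented
    save_queue(deque([1, 2, 3], maxlen=5), target)
    restored = load_queue(target, max_size=5)
    saved_files = sorted(p.name for p in Path(tmp).iterdir())

print(saved_files)      # ['buffer_state.pkl']
print(list(restored))   # [1, 2, 3]
```

Note that Path.with_suffix replaces an existing suffix, so in this sketch a path such as state.v1 would be written as state.pkl. Pickle files should only be loaded from trusted sources.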
pamiq_core.data.impls.RandomReplacementBuffer
RandomReplacementBuffer(
max_size: int,
replace_probability: float | None = None,
expected_survival_length: int | None = None,
)
Bases: DataBuffer[T, list[T]]
Buffer implementation that randomly replaces elements when full.
This buffer keeps track of collected data and, when full, randomly replaces existing elements based on a configurable probability.
Initialize a RandomReplacementBuffer.
PARAMETER | DESCRIPTION |
---|---|
max_size | Maximum number of data points to store. |
replace_probability | Probability of replacing an existing element when the buffer is full. Must be between 0.0 and 1.0 inclusive. If None and expected_survival_length is provided, this will be computed automatically. Default is 1.0 if both are None. |
expected_survival_length | Expected number of steps that data should survive in the buffer. Used to automatically compute replace_probability if replace_probability is None. Cannot be specified together with replace_probability. |
RAISES | DESCRIPTION |
---|---|
ValueError | If replace_probability is not between 0.0 and 1.0 inclusive, or if both replace_probability and expected_survival_length are specified. |
Source code in src/pamiq_core/data/impls/random_replacement_buffer.py
max_size (property)
Returns the maximum number of data points that can be stored in the buffer.
is_full (property)
Check if the buffer has reached its maximum capacity.
RETURNS | DESCRIPTION |
---|---|
bool | True if the buffer is full, False otherwise. |
compute_replace_probability_from_expected_survival_length (staticmethod)
compute_replace_probability_from_expected_survival_length(
max_size: int, survival_length: int
) -> float
Compute the replace probability from expected survival length.
This method calculates the replacement probability needed to achieve a desired expected survival length for data in the buffer.
The computation is based on the mathematical analysis described at:
https://zenn.dev/gesonanko/scraps/b581e75bfd9f3e
PARAMETER | DESCRIPTION |
---|---|
max_size | Maximum size of the buffer. |
survival_length | Expected number of steps that data should survive. |
RETURNS | DESCRIPTION |
---|---|
float | The computed replacement probability between 0.0 and 1.0. |
Source code in src/pamiq_core/data/impls/random_replacement_buffer.py
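The exact formula is given in the linked analysis and is not reproduced here. As intuition only, consider a simplified model that is an assumption and not the library's computation: a full buffer of max_size slots overwrites one uniformly chosen slot with probability p on each add. A given element is then overwritten with probability p / max_size per step, so its lifetime is geometric with mean max_size / p. The hypothetical mean_lifetime helper below checks that relationship by simulation; the library may define survival differently, for example as the time until every element has been replaced.

```python
import random


def mean_lifetime(max_size, replace_probability, trials=20000, seed=0):
    """Estimate how many adds one tracked element survives in a full buffer."""
    rng = random.Random(seed)
    total = 0
    for _ in range(trials):
        steps = 1
        # The tracked element sits in slot 0. Each add replaces a uniformly
        # chosen slot with probability `replace_probability`.
        while not (
            rng.random() < replace_probability and rng.randrange(max_size) == 0
        ):
            steps += 1
        total += steps
    return total / trials


estimate = mean_lifetime(max_size=10, replace_probability=0.5)
print(estimate)  # close to 10 / 0.5 = 20 under this simplified model
```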
add
Add a new data sample to the buffer.
If the buffer is full, the new data may replace an existing entry based on the configured replacement probability.
PARAMETER | DESCRIPTION |
---|---|
data | Data element to add to the buffer. |
Source code in src/pamiq_core/data/impls/random_replacement_buffer.py
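A minimal sketch of the add behavior described above, assuming the replaced slot is chosen uniformly at random and that a new sample is discarded when no replacement happens. RandomReplacementSketch and its seed parameter are hypothetical and exist only for illustration.

```python
import random


class RandomReplacementSketch:
    """Hypothetical stand-in for a buffer with probabilistic replacement."""

    def __init__(self, max_size, replace_probability=1.0, seed=None):
        if not 0.0 <= replace_probability <= 1.0:
            raise ValueError("replace_probability must be within [0.0, 1.0]")
        self._max_size = max_size
        self._replace_probability = replace_probability
        self._data = []
        self._rng = random.Random(seed)

    @property
    def is_full(self):
        return len(self._data) >= self._max_size

    def add(self, item):
        if not self.is_full:
            self._data.append(item)  # still filling up: always keep
        elif self._rng.random() < self._replace_probability:
            # Overwrite one uniformly chosen slot (assumed selection rule).
            self._data[self._rng.randrange(self._max_size)] = item
        # Otherwise the new item is dropped and the buffer is unchanged.

    def get_data(self):
        return self._data.copy()  # copy so callers cannot mutate the buffer


never_replace = RandomReplacementSketch(max_size=3, replace_probability=0.0)
for i in range(10):
    never_replace.add(i)
print(never_replace.get_data())  # [0, 1, 2]
```

With replace_probability set to 1.0 every new sample lands somewhere in the buffer, while lower values let older samples persist longer.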
get_data
Retrieve all stored data from the buffer.
RETURNS | DESCRIPTION |
---|---|
list[T] | List of all stored data elements. A copy of the internal data is returned to prevent modification. |
Source code in src/pamiq_core/data/impls/random_replacement_buffer.py
__len__
Returns the current number of samples in the buffer.
RETURNS | DESCRIPTION |
---|---|
int | The number of samples currently stored in the buffer. |
save_state
Save the buffer state to the specified path.
Saves the data list to a pickle file with the .pkl extension.
PARAMETER | DESCRIPTION |
---|---|
path | File path at which to save the buffer state (without extension). |
Source code in src/pamiq_core/data/impls/random_replacement_buffer.py
load_state
Load the buffer state from the specified path.
Loads the data list from a pickle file with the .pkl extension.
PARAMETER | DESCRIPTION |
---|---|
path | File path from which to load the buffer state (without extension). |
Source code in src/pamiq_core/data/impls/random_replacement_buffer.py
pamiq_core.data.impls.DictSequentialBuffer
Bases: DataBuffer[Mapping[str, T], dict[str, list[T]]]
Buffer implementation that stores dictionary data in sequential order.
See: SequentialBuffer
Initialize a DictSequentialBuffer.
PARAMETER | DESCRIPTION |
---|---|
keys | The keys that must be present in all data dictionaries. |
max_size | Maximum number of data points to store. |
Source code in src/pamiq_core/data/impls/sequential_buffer.py
max_size (property)
Returns the maximum number of data points that can be stored in the buffer.
add
Add a new data sample to the buffer.
The data must contain exactly the keys specified during initialization. If the buffer is full, the oldest entry will be removed.
PARAMETER | DESCRIPTION |
---|---|
data | Dictionary containing data for each key. |
RAISES | DESCRIPTION |
---|---|
ValueError | If the data keys don't match the expected keys. |
Source code in src/pamiq_core/data/impls/sequential_buffer.py
get_data
Retrieve all stored data from the buffer.
RETURNS | DESCRIPTION |
---|---|
dict[str, list[T]] | Dictionary mapping each key to a list of its stored values. The lists maintain the sequential order in which data was added. |
Source code in src/pamiq_core/data/impls/sequential_buffer.py
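The key check and per-key ordering described above can be sketched as one bounded queue per key. DictSequentialSketch is a hypothetical stand-in; storing one deque per key is an assumption made for illustration.

```python
from collections import deque


class DictSequentialSketch:
    """Hypothetical stand-in: one bounded queue per expected key."""

    def __init__(self, keys, max_size):
        self._keys = set(keys)
        self._queues = {key: deque(maxlen=max_size) for key in self._keys}

    def add(self, data):
        # Reject samples with missing or unexpected keys.
        if set(data.keys()) != self._keys:
            raise ValueError("data keys do not match the expected keys")
        for key, value in data.items():
            self._queues[key].append(value)  # oldest value drops out when full

    def get_data(self):
        return {key: list(queue) for key, queue in self._queues.items()}

    def __len__(self):
        # All queues grow together, so any one of them gives the sample count.
        return len(next(iter(self._queues.values()), ()))


buffer = DictSequentialSketch(keys=["obs", "reward"], max_size=2)
buffer.add({"obs": 1, "reward": 0.1})
buffer.add({"obs": 2, "reward": 0.2})
buffer.add({"obs": 3, "reward": 0.3})
print(buffer.get_data()["obs"])     # [2, 3]
print(buffer.get_data()["reward"])  # [0.2, 0.3]
```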
__len__
Returns the current number of samples in the buffer.
RETURNS | DESCRIPTION |
---|---|
int | The number of samples currently stored in the buffer. |
pamiq_core.data.impls.DictRandomReplacementBuffer
DictRandomReplacementBuffer(
keys: Iterable[str],
max_size: int,
replace_probability: float | None = None,
expected_survival_length: int | None = None,
)
Bases: DataBuffer[Mapping[str, T], dict[str, list[T]]]
Buffer implementation that stores dictionary data with random replacement.
Initialize a DictRandomReplacementBuffer.
PARAMETER | DESCRIPTION |
---|---|
keys | The keys that must be present in all data dictionaries. |
Other arguments are the same as for RandomReplacementBuffer.
Source code in src/pamiq_core/data/impls/random_replacement_buffer.py
max_size (property)
Returns the maximum number of data points that can be stored in the buffer.
add
Add a new data sample to the buffer.
The data must contain exactly the keys specified during initialization.
PARAMETER | DESCRIPTION |
---|---|
data | Dictionary containing data for each key. |
RAISES | DESCRIPTION |
---|---|
ValueError | If the data keys don't match the expected keys. |
Source code in src/pamiq_core/data/impls/random_replacement_buffer.py
get_data
Retrieve all stored data from the buffer.
RETURNS | DESCRIPTION |
---|---|
dict[str, list[T]] | Dictionary mapping each key to a list of its stored values. The lists maintain the order in which data was added or replaced. |
Source code in src/pamiq_core/data/impls/random_replacement_buffer.py
__len__
Returns the current number of samples in the buffer.
RETURNS | DESCRIPTION |
---|---|
int | The number of samples currently stored in the buffer. |