Skip to content

Utils

Dict

Module evalsense.utils.dict.

Functions:

Name Description
deep_update

Recursively updates a dictionary with data from another dictionary.

deep_update

deep_update(old_dict: dict, new_dict: dict) -> dict

Recursively updates a dictionary with data from another dictionary.

Parameters:

Name Type Description Default
old_dict dict

The dictionary to update.

required
new_dict dict

The new dictionary data.

required
Source code in evalsense/utils/dict.py
def deep_update(old_dict: dict, new_dict: dict) -> dict:
    """Recursively updates a dictionary with data from another dictionary.

    Neither input is modified: the merge is performed on a shallow copy of
    ``old_dict`` and nested dictionaries are copied level by level as they
    are merged.

    Args:
        old_dict (dict): The dictionary to update.
        new_dict (dict): The new dictionary data.

    Returns:
        (dict): A new dictionary containing the merged data. Values from
            ``new_dict`` take precedence over values from ``old_dict``.
    """
    merged = old_dict.copy()
    for key, value in new_dict.items():
        current = merged.get(key)
        # Only recurse when BOTH sides are dictionaries. If the existing value
        # is a scalar, list, None, ... there is nothing to merge into and the
        # new value simply replaces it.
        if isinstance(current, dict) and isinstance(value, dict):
            merged[key] = deep_update(current, value)
        else:
            merged[key] = value
    return merged

Files

Module evalsense.utils.files.

Functions:

Name Description
download_file

Downloads a file from a URL.

get_remote_file_headers

Gets the HTTP headers of a remote file.

to_safe_filename

Converts a string to a safe filename.

verify_file

Verifies the integrity of a file against the provided metadata.

download_file

download_file(
    url: str,
    target_path: str | Path,
    resume_download: bool = True,
    force_download: bool = False,
    show_progress: bool = True,
    max_attempts: int = 2,
    expected_hash: str | None = None,
    hash_type: str = DEFAULT_HASH_TYPE,
    chunk_size: int = 10 * 1024**2,
) -> None

Downloads a file from a URL.

Parameters:

Name Type Description Default
url str

The URL of the file to download.

required
target_path str | Path

The path to save the downloaded file.

required
resume_download bool

Whether to resume a partially downloaded file.

True
force_download bool

Whether to force the download even if the file already exists.

False
show_progress bool

Whether to show download progress.

True
max_attempts int

The maximum number of download attempts. Defaults to 2.

2
expected_hash str

The expected hash of the downloaded file.

None
hash_type str

The hash algorithm to use. Defaults to "sha256".

DEFAULT_HASH_TYPE
chunk_size int

The size of each download chunk in bytes. Defaults to 10MB.

10 * 1024 ** 2

Raises:

Type Description
RuntimeError

If the download fails after the maximum number of attempts.

ValueError

If max_attempts is invalid.

Source code in evalsense/utils/files.py
def download_file(
    url: str,
    target_path: str | Path,
    resume_download: bool = True,
    force_download: bool = False,
    show_progress: bool = True,
    max_attempts: int = 2,
    expected_hash: str | None = None,
    hash_type: str = DEFAULT_HASH_TYPE,
    chunk_size: int = 10 * 1024**2,
) -> None:
    """Downloads a file from a URL.

    The data is streamed into a sibling ``<target name>.part`` file. Once the
    transfer finishes, the partial file is verified and moved onto
    ``target_path``. A partial file that fails verification is deleted so that
    it cannot affect later attempts.

    Args:
        url (str): The URL of the file to download.
        target_path (str | Path): The path to save the downloaded file.
        resume_download (bool, optional): Whether to resume a partially
            downloaded file.
        force_download (bool, optional): Whether to force the download even
            if the file already exists.
        show_progress (bool, optional): Whether to show download progress.
        max_attempts (int, optional): The maximum number of download attempts.
            Defaults to 2.
        expected_hash (str, optional): The expected hash of the downloaded file.
        hash_type (str, optional): The hash algorithm to use. Defaults to "sha256".
        chunk_size (int, optional): The size of each download chunk in bytes.
            Defaults to 10MB.

    Raises:
        RuntimeError: If the download fails after the maximum number of attempts.
        ValueError: If max_attempts is invalid.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be 1 or greater")

    target_path = Path(target_path)
    target_name = target_path.name
    target_path.parent.mkdir(parents=True, exist_ok=True)

    # Try to determine the size of the downloaded file and resume support
    remote_file_headers = get_remote_file_headers(url, max_attempts=max_attempts)
    file_size = int(remote_file_headers.get("Content-Length", 0))
    # "Accept-Ranges: none" explicitly advertises that range requests are NOT
    # supported, so the mere presence of the header is not sufficient.
    accept_ranges = remote_file_headers.get("Accept-Ranges", "").strip().lower()
    supports_resume = accept_ranges not in ("", "none")
    # NOTE(review): resuming is disabled for compressed transfers, presumably
    # because byte offsets of the encoded stream do not line up with the
    # bytes written to disk — confirm.
    content_encoding = remote_file_headers.get("Content-Encoding", "")
    compression_encodings = ["gzip", "deflate", "br", "zstd"]
    if any(encoding in content_encoding for encoding in compression_encodings):
        supports_resume = False

    # Return early if file already downloaded
    if not force_download and verify_file(
        target_path,
        expected_size=file_size,
        expected_hash=expected_hash,
        hash_type=hash_type,
    ):
        return

    temp_path = target_path.with_suffix(target_path.suffix + ".part")
    can_resume = resume_download and supports_resume

    # Try to download the file
    try:
        for attempt in Retrying(
            stop=stop_after_attempt(max_attempts),
            wait=wait_exponential(multiplier=1, min=2, max=32),
            reraise=True,
        ):
            with attempt as _:
                # The resume offset is recomputed on EVERY attempt: a previous
                # failed attempt may already have appended data to the partial
                # file, so an offset computed once up front would be stale and
                # the retry would append duplicate bytes.
                already_downloaded_size = (
                    temp_path.stat().st_size
                    if can_resume and temp_path.exists()
                    else 0
                )
                if already_downloaded_size >= file_size:
                    # Larger than expected means the partial file is corrupted;
                    # equal means there is nothing left to request (a range
                    # starting at the end of the file cannot be satisfied).
                    # In both cases start again from scratch.
                    already_downloaded_size = 0

                headers = {"User-Agent": USER_AGENT}
                if already_downloaded_size > 0:
                    headers["Range"] = f"bytes={already_downloaded_size}-"

                with requests.get(
                    url, stream=True, headers=headers, timeout=10
                ) as response:
                    response.raise_for_status()

                    if already_downloaded_size > 0 and response.status_code != 206:
                        # The server ignored the Range header and is sending
                        # the whole file, so the partial data must be
                        # overwritten rather than appended to.
                        already_downloaded_size = 0

                    mode = "ab" if already_downloaded_size > 0 else "wb"
                    with (
                        open(temp_path, mode) as file,
                        tqdm(
                            total=file_size,
                            initial=already_downloaded_size,
                            unit="B",
                            unit_scale=True,
                            unit_divisor=1024,
                            disable=not show_progress,
                            desc=f"Downloading {target_name}",
                            leave=False,
                        ) as progress,
                    ):
                        for chunk in response.iter_content(chunk_size):
                            if chunk:
                                file.write(chunk)
                                file.flush()
                                progress.update(len(chunk))

                    if not verify_file(
                        temp_path,
                        expected_hash=expected_hash,
                        hash_type=hash_type,
                    ):
                        # Discard the bad data so that the next attempt (or the
                        # next call) does not try to resume from it.
                        temp_path.unlink(missing_ok=True)
                        raise RuntimeError(
                            f"Downloaded file {target_path} could not be verified."
                        )

                    # Success, move the temporary file to the target path
                    temp_path.replace(target_path)

    except Exception as e:
        raise RuntimeError(
            f"Download from {url} failed after {max_attempts} attempts: {e}"
        ) from e

get_remote_file_headers

get_remote_file_headers(
    url: str, max_attempts: int = 2
) -> CaseInsensitiveDict[str]

Gets the HTTP headers of a remote file.

Parameters:

Name Type Description Default
url str

The URL of the file.

required
max_attempts int

The maximum number of attempts to get the headers. Defaults to 2.

2

Returns:

Type Description
CaseInsensitiveDict[str]

The headers of the file.

Raises:

Type Description
RuntimeError

If the headers cannot be determined in the maximum number of attempts.

Source code in evalsense/utils/files.py
def get_remote_file_headers(
    url: str, max_attempts: int = 2
) -> CaseInsensitiveDict[str]:
    """Gets the HTTP headers of a remote file.

    The headers are fetched with a HEAD request. Redirects are followed so
    that the returned headers describe the final resource rather than an
    intermediate redirect response.

    Args:
        url (str): The URL of the file.
        max_attempts (int, optional): The maximum number of attempts to get the headers.
            Defaults to 2.

    Returns:
        (CaseInsensitiveDict[str]): The headers of the file.

    Raises:
        RuntimeError: If the headers cannot be determined in the maximum
            number of attempts.
    """
    try:
        for attempt in Retrying(
            stop=stop_after_attempt(max_attempts),
            wait=wait_exponential(multiplier=1, min=2, max=32),
            reraise=True,
        ):
            headers = {"User-Agent": USER_AGENT}
            with (
                attempt as _,
                # requests.head() does NOT follow redirects unless asked to,
                # whereas a GET does; without allow_redirects the headers of a
                # 3xx response would be returned for redirecting URLs.
                # The timeout prevents an unresponsive server from blocking
                # the call indefinitely.
                requests.head(
                    url, headers=headers, allow_redirects=True, timeout=10
                ) as response,
            ):
                response.raise_for_status()
                return response.headers
    except Exception as e:
        raise RuntimeError(f"Failed to get headers for {url}: {e}") from e
    # Explicit raise (rather than `assert False`) so the statement also
    # survives running Python with -O.
    raise AssertionError("Unreachable code (included as a hint for type checking)")

to_safe_filename

to_safe_filename(name: str) -> str

Converts a string to a safe filename.

Parameters:

Name Type Description Default
name str

The string to convert.

required

Returns:

Type Description
str

The safe filename.

Source code in evalsense/utils/files.py
def to_safe_filename(name: str) -> str:
    """Converts a string to a safe filename.

    The string is NFKD-normalised, every character that is not a word
    character, whitespace or a hyphen is replaced with a hyphen, and each run
    of hyphens/whitespace is then collapsed into a single hyphen.

    Args:
        name (str): The string to convert.

    Returns:
        (str): The safe filename.
    """
    name = unicodedata.normalize("NFKD", name)
    # The hyphen is placed last in the character class so that it is
    # unambiguously a literal ("\s-_" reads as a malformed range). The
    # underscore does not need listing: it is already matched by \w.
    name = regex.sub(r"[^\w\s-]", "-", name)
    name = regex.sub(r"[-\s]+", "-", name)
    return name

verify_file

verify_file(
    file_path: str | Path,
    expected_size: int | None = None,
    expected_hash: str | None = None,
    hash_type: str = DEFAULT_HASH_TYPE,
    show_progress: bool = True,
    chunk_size: int = 10 * 1024**2,
) -> bool

Verifies the integrity of a file against the provided metadata.

Parameters:

Name Type Description Default
file_path str | Path

The path to the file to verify.

required
expected_size int

The expected size of the file in bytes (skips checking size if None).

None
expected_hash str

The expected hash of the file (skips checking hash if None).

None
hash_type str

The hash algorithm to use. Defaults to "sha256".

DEFAULT_HASH_TYPE
show_progress bool

Whether to show verification progress.

True
chunk_size int

The size of each verification chunk in bytes. Defaults to 10MB.

10 * 1024 ** 2

Returns:

Type Description
bool

True if the file matches the expected metadata, False otherwise.

Raises:

Type Description
ValueError

If the hash type is unsupported.

Source code in evalsense/utils/files.py
def verify_file(
    file_path: str | Path,
    expected_size: int | None = None,
    expected_hash: str | None = None,
    hash_type: str = DEFAULT_HASH_TYPE,
    show_progress: bool = True,
    chunk_size: int = 10 * 1024**2,
) -> bool:
    """Verifies the integrity of a file against the provided metadata.

    Args:
        file_path (str | Path): The path to the file to verify.
        expected_size (int, optional): The expected size of the file in bytes
            (skips checking size if None).
        expected_hash (str, optional): The expected hash of the file (skips checking
            hash if None). The comparison is case-insensitive.
        hash_type (str, optional): The hash algorithm to use. Defaults to "sha256".
        show_progress (bool, optional): Whether to show verification progress.
        chunk_size (int, optional): The size of each verification chunk in bytes.
            Defaults to 10MB.

    Returns:
        (bool): True if the file matches the expected metadata, False otherwise
            (including when the path is missing or is not a regular file).

    Raises:
        ValueError: If the hash type is unsupported.
    """
    file_path = Path(file_path)
    file_name = file_path.name
    # is_file() rather than exists(): a directory can never match the expected
    # metadata and would make the open() call below fail.
    if not file_path.is_file():
        return False

    actual_size = file_path.stat().st_size
    if expected_size is not None and actual_size != expected_size:
        return False

    if expected_hash is None:
        return True

    try:
        hash_func = hashlib.new(hash_type)
    except ValueError as e:
        raise ValueError(f"Unsupported hash type: {hash_type}") from e

    with (
        file_path.open("rb") as f,
        tqdm(
            # The size on disk is always known, so the progress bar has a
            # total even when the caller did not supply expected_size.
            total=actual_size,
            unit="B",
            unit_scale=True,
            unit_divisor=1024,
            disable=not show_progress,
            desc=f"Verifying {file_name}",
            leave=False,
        ) as progress,
    ):
        # Hash the file chunk by chunk so it never has to fit in memory.
        while chunk := f.read(chunk_size):
            hash_func.update(chunk)
            progress.update(len(chunk))

    return hash_func.hexdigest().lower() == expected_hash.lower()

🤗 Huggingface

Module evalsense.utils.huggingface.

Functions:

Name Description
disable_dataset_progress_bars

Context manager to disable progress bars for Hugging Face datasets.

disable_dataset_progress_bars

disable_dataset_progress_bars()

Context manager to disable progress bars for Hugging Face datasets.

Source code in evalsense/utils/huggingface.py
@contextmanager
def disable_dataset_progress_bars():
    """Context manager to disable progress bars for Hugging Face datasets.

    If the progress bars are already disabled on entry, nothing is changed
    and nothing is restored on exit. Otherwise they are disabled for the
    duration of the ``with`` block and re-enabled afterwards, even if the
    block raises.
    """
    if are_progress_bars_disabled():
        # Already off: leave the setting exactly as we found it.
        yield
        return

    disable_progress_bars()
    try:
        yield
    finally:
        enable_progress_bars()

Text

Functions:

Name Description
extract_lines

Extract lines from the text based on a filter function.

extract_score

Extract the first numerical score from text that falls between min_score and max_score.

extract_ternary_answer

Extract a ternary answer (True/False/Unknown) from the text.

extract_weighted_binary_answer

Extract a weighted binary answer from the model output.

extract_weighted_score

Extract a weighted evaluation score from the model output.

format_template

Format a template string with the provided keyword arguments.

extract_lines

extract_lines(
    text: str,
    include_filter_fun: Callable[
        [str], bool
    ] = lambda _: True,
    trim_lines: bool = True,
) -> list[str]

Extract lines from the text based on a filter function.

Parameters:

Name Type Description Default
text str

The text to extract lines from.

required
include_filter_fun Callable[[str], bool]

A function that takes a line and returns True if it should be included, False otherwise. Defaults to a function that includes all lines.

lambda _: True
trim_lines bool

Whether to trim bullet points, list numbers and whitespace from the beginning/end of each line. Defaults to True.

True

Returns:

Type Description
list[str]

list[str]: A list of extracted lines.

Source code in evalsense/utils/text.py
def extract_lines(
    text: str,
    include_filter_fun: Callable[[str], bool] = lambda _: True,
    trim_lines: bool = True,
) -> list[str]:
    """
    Split the text into lines and keep those accepted by a filter function.

    Args:
        text (str): The text to extract lines from.
        include_filter_fun (Callable[[str], bool], optional): Predicate called
            with each (possibly trimmed) line; the line is kept when it
            returns True. Defaults to a function that includes all lines.
        trim_lines (bool, optional): Whether to remove a leading list number
            (such as "1."), leading "*" / "-" bullet characters and the
            surrounding whitespace from each line before filtering.
            Defaults to True.

    Returns:
        list[str]: The lines that passed the filter, in their original order.
    """

    def _tidy(raw: str) -> str:
        # Drop a leading "<digits>." marker first, then any bullet characters.
        unnumbered = regex.sub(r"^\s*\d+\.\s*", "", raw)
        unbulleted = unnumbered.strip().lstrip("*").lstrip("-")
        return unbulleted.strip()

    selected = []
    for raw_line in text.splitlines():
        candidate = _tidy(raw_line) if trim_lines else raw_line
        if include_filter_fun(candidate):
            selected.append(candidate)
    return selected

extract_score

extract_score(
    text: str, min_score: int = 1, max_score: int = 10
) -> int

Extract the first numerical score from text that falls between min_score and max_score.

Parameters:

Name Type Description Default
text str

The text to extract the score from.

required
min_score int

The minimum valid score. Defaults to 1.

1
max_score int

The maximum valid score. Defaults to 10.

10

Returns:

Type Description
int

int: The first number in the text that lies between min_score and max_score (inclusive). A ValueError is raised if no such number is found.

Source code in evalsense/utils/text.py
def extract_score(text: str, min_score: int = 1, max_score: int = 10) -> int:
    """
    Extract the first numerical score from text that falls between min_score and max_score.

    Only standalone runs of digits are considered; candidates outside the
    inclusive range [min_score, max_score] are skipped.

    Args:
        text (str): The text to extract the score from.
        min_score (int): The minimum valid score. Defaults to 1.
        max_score (int): The maximum valid score. Defaults to 10.

    Returns:
        int: The first number found in the text that lies within the range.

    Raises:
        ValueError: If the text contains no number within the range.
    """
    for candidate in regex.finditer(r"\b\d+\b", text):
        try:
            value = int(candidate.group())
        except ValueError:
            # Skip digit runs that int() refuses to convert.
            continue
        if min_score <= value <= max_score:
            return value

    raise ValueError(f"Unable to extract a valid score from text: {text}.")

extract_ternary_answer

extract_ternary_answer(
    text: str,
    binary_only: Literal[True],
    unknown_on_mismatch: Literal[False] = False,
) -> bool
extract_ternary_answer(
    text: str,
    binary_only: Literal[False],
    unknown_on_mismatch: bool = True,
) -> bool | None
extract_ternary_answer(
    text: str,
    binary_only: bool,
    unknown_on_mismatch: bool = True,
) -> bool | None

Extract a ternary answer (True/False/Unknown) from the text.

Valid answers are 'yes', 'no', 'true', 'false', 'unknown', and 'I don't know'.

Parameters:

Name Type Description Default
text str

The text to extract the answer from.

required
binary_only bool

If True, only 'yes', 'no', 'true' and 'false' are valid answers; 'unknown' and 'I don't know' raise an error instead of returning None.

required
unknown_on_mismatch bool

If True, return None for answers not matching any of the valid answers. If False, raise an error. Only relevant if binary_only is False. Defaults to True.

True

Returns:

Type Description
bool | None

bool | None: The extracted answer - bool for True/False, None for Unknown.

Source code in evalsense/utils/text.py
def extract_ternary_answer(
    text: str, binary_only: bool, unknown_on_mismatch: bool = True
) -> bool | None:
    """
    Extract a ternary answer (True/False/Unknown) from the text.

    Valid answers are 'yes', 'no', 'true', 'false', 'unknown', and 'I don't know'.

    Args:
        text (str): The text to extract the answer from.
        binary_only (bool): If True, only 'yes' or 'no' are valid answers.
        unknown_on_mismatch (bool): If True, return None for answers not
            matching any of the valid answers. If False, raise an error.
            Only relevant if binary_only is False. Defaults to True.

    Returns:
        bool | None: The extracted answer - bool for True/False, None for Unknown.
    """
    pattern = r"\b(?:yes|no|true|false|unknown|i don't know)\b"
    match = regex.search(pattern, text, regex.IGNORECASE)
    if match:
        answer = match.group(0).lower()
        if answer in ["yes", "true"]:
            return True
        elif answer in ["no", "false"]:
            return False
        else:
            if binary_only:
                raise ValueError(
                    "Binary answer expected, but 'unknown' or 'I don't know' found."
                )
            return None
    if not binary_only and unknown_on_mismatch:
        return None
    raise ValueError(f"Unable to extract a binary answer from text: {text}.")

extract_weighted_binary_answer

extract_weighted_binary_answer(
    output: ModelOutput,
) -> float

Extract a weighted binary answer from the model output.

Parameters:

Name Type Description Default
output ModelOutput

The model output containing logprobs.

required

Returns:

Name Type Description
float float

The model probability of the answer being True.

Source code in evalsense/utils/text.py
def extract_weighted_binary_answer(output: ModelOutput) -> float:
    """
    Extract a weighted binary answer from the model output.

    Tokens are normalised (whitespace, "▁" and "_" removed, lower-cased) and
    the tokens "true"/"yes" are interpreted as True, "false"/"no" as False.
    The weighting itself is delegated to ``_eval_weighted_options``.

    Args:
        output (ModelOutput): The model output containing logprobs.

    Returns:
        float: The model probability of the answer being True.
    """
    affirmative = ("true", "yes")
    recognised = ("true", "false", "yes", "no")

    def _clean(token: str) -> str:
        stripped = token.strip()
        for marker in ("▁", "_"):
            stripped = stripped.replace(marker, "")
        return stripped.lower()

    def _is_answer_token(token: str) -> bool:
        return token in recognised

    def _to_bool(token: str) -> bool:
        if token in recognised:
            return token in affirmative
        raise ValueError

    # NOTE(review): the helper's result is indexed by the parsed boolean
    # below — confirm its exact return type against its definition.
    option_probabilities = _eval_weighted_options(
        output,
        _clean,
        _is_answer_token,
        _to_bool,
    )
    return option_probabilities[True]

extract_weighted_score

extract_weighted_score(
    output: ModelOutput,
    min_score: int = 1,
    max_score: int = 10,
) -> float

Extract a weighted evaluation score from the model output.

Parameters:

Name Type Description Default
output ModelOutput

The model output containing logprobs.

required
min_score int

The minimum valid score. Defaults to 1.

1
max_score int

The maximum valid score. Defaults to 10.

10

Returns:

Name Type Description
float float

The weighted score.

Source code in evalsense/utils/text.py
def extract_weighted_score(
    output: ModelOutput, min_score: int = 1, max_score: int = 10
) -> float:
    """
    Extract a weighted evaluation score from the model output.

    The score is first extracted from the completion text with
    ``extract_score``; the token equal to that score is then used as the
    target when calling ``_eval_weighted_options``. The result is the sum of
    each candidate score multiplied by its probability.

    Args:
        output (ModelOutput): The model output containing logprobs.
        min_score (int): The minimum valid score. Defaults to 1.
        max_score (int): The maximum valid score. Defaults to 10.

    Returns:
        float: The weighted score.
    """
    anchor = str(extract_score(output.completion, min_score, max_score))

    def _clean(token: str) -> str:
        return token.strip().replace("▁", "").replace("_", "")

    def _is_score_token(token: str) -> bool:
        return token == anchor

    def _to_score(token: str) -> int:
        value = int(token)
        if not (min_score <= value <= max_score):
            raise ValueError
        return value

    # NOTE(review): the helper's result is consumed as (score, probability)
    # pairs via .items() — confirm its exact return type against its definition.
    score_probabilities = _eval_weighted_options(
        output,
        _clean,
        _is_score_token,
        _to_score,
    )

    # Probability-weighted sum over the candidate scores.
    return sum(value * weight for value, weight in score_probabilities.items())

format_template

format_template(template: str, **kwargs) -> str

Format a template string with the provided keyword arguments.

Parameters:

Name Type Description Default
template str

The template string to format.

required
**kwargs dict[str, Any]

Keyword arguments to replace placeholders in the template.

{}

Returns:

Name Type Description
str str

The formatted string.

Source code in evalsense/utils/text.py
def format_template(template: str, **kwargs) -> str:
    """
    Format a template string with the provided keyword arguments.

    Args:
        template (str): The template string to format.
        **kwargs (dict[str, Any]): Keyword arguments to replace placeholders in the template.

    Returns:
        str: The formatted string.

    Raises:
        KeyError: If the template contains a named placeholder for which no
            keyword argument was provided. The message lists the missing key
            and the keys that were provided.
    """
    try:
        return template.format(**kwargs)
    except KeyError as e:
        # Chain the original error so the traceback shows it as the direct
        # cause rather than as an error raised while handling another one.
        raise KeyError(
            f"Missing key in template formatting: {str(e)}. "
            f"The provided keys were: {list(kwargs.keys())}. "
            "Ensure all placeholders in the template are provided in kwargs."
        ) from e