Result Analysers

Modules:

- meta_result_analyser
- metric_correlation_analyser
- tabular_analyser

Classes:

- CorrelationResults: Class to hold correlation analysis results.
- MetaResultAnalyser: An analyser for conducting a meta-evaluation of different evaluation methods.
- MetricCorrelationAnalyser: An analyser calculating and visualizing correlations between different evaluation metrics.
- TabularResultAnalyser: An analyser summarising evaluation results in a tabular format.

CorrelationResults dataclass

Class to hold correlation analysis results.

Source code in evalsense/workflow/analysers/metric_correlation_analyser.py
@dataclass
class CorrelationResults[T: pl.DataFrame | pd.DataFrame | npt.NDArray[np.float_]]:
    """Class to hold correlation analysis results."""

    correlation_matrix: T
    figure: Figure | None = None
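
For orientation, a brief sketch (not from the library docs) of how a CorrelationResults value might be unpacked once returned by the MetricCorrelationAnalyser documented below; the analyser and project variables are assumed to already exist:

results = analyser(project)           # CorrelationResults, as configured on the analyser
matrix = results.correlation_matrix   # polars/pandas DataFrame or NumPy array,
                                      # depending on the analyser's output_format
figure = results.figure               # matplotlib Figure, or None when return_plot=False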

MetaResultAnalyser

Bases: ResultAnalyser[T]

An analyser for conducting a meta-evaluation of different evaluation methods.

The analyser computes the Spearman rank correlation between the rankings specified by the meta tiers and the scores returned by the evaluation methods. The meta tiers can either be sourced from human annotations or be based on progressive perturbations for automatic meta-evaluation.
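
As a toy illustration of this computation (the tier and score values below are made up), the per-sample correlation mirrors a direct call to scipy.stats.spearmanr, which the implementation uses internally:

from scipy.stats import spearmanr

# Hypothetical meta tiers (higher = better output) and the scores one metric
# assigned to the corresponding outputs for a single sample.
tiers = [0, 1, 2, 3]
scores = [0.21, 0.35, 0.40, 0.72]

# A metric whose scores rank the outputs consistently with the tiers
# achieves a per-sample correlation of 1.0.
print(spearmanr(tiers, scores).correlation)  # 1.0

# With lower_tier_is_better=True, the analyser negates the tiers first,
# so the same metric would score -1.0 on this example.
print(spearmanr([-t for t in tiers], scores).correlation)  # -1.0

These per-sample correlations are then averaged across samples for each metric, giving the avg_correlation reported in the output table.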

Methods:

- __call__: Analyses the results from perturbation-based meta-evaluation experiments.

Source code in evalsense/workflow/analysers/meta_result_analyser.py
class MetaResultAnalyser[T: pl.DataFrame | pd.DataFrame](ResultAnalyser[T]):
    """An analyser for conducing a meta-evaluation of different evaluation methods.

    The analyser computes the Spearman rank correlation between the rankings specified by
    the meta tiers and the scores returned by the evaluation methods.
    The meta tiers can either be sourced from human annotations or be based on
    progressive perturbations for automatic meta-evaluation.
    """

    def __init__(
        self,
        name: str = "MetaResultAnalyser",
        output_format: Literal["polars", "pandas", "numpy"] = "polars",
    ):
        super().__init__(name=name)
        if output_format not in OUTPUT_FORMATTERS:
            raise ValueError(
                f"Invalid output format: {output_format}. "
                f"Must be one of: {', '.join(OUTPUT_FORMATTERS.keys())}."
            )
        self.output_format = output_format

    @override
    def __call__(
        self,
        project: Project,
        meta_tier_field: str = "perturbation_type_tier",
        lower_tier_is_better: bool = False,
        metric_labels: dict[str, str] | None = None,
        **kwargs: dict[str, Any],
    ) -> T:
        """
        Analyses the results from perturbation-based meta-evaluation experiments.

        Args:
            project (Project): The project holding the meta-evaluation data to analyse.
            meta_tier_field (str): The field name that indicates the meta-evaluation
                tier to specify the expected score ranking.
            lower_tier_is_better (bool): If True, lower perturbation tiers correspond
                to better outputs. If False, higher tiers are better. Defaults to False.
            metric_labels (dict[str, str] | None): A dictionary mapping metric names
                to their labels in the output table. If None, no aliasing is performed.
                Defaults to None.
            **kwargs (dict[str, Any]): Additional keyword arguments.

        Returns:
            T: The analysed results in the specified output format.
        """
        eval_logs = project.get_logs(type="evaluation", status="success")

        # Data structure for tracking the intermediate results
        # The nested dictionary is indexed by perturbation record → sample ID → perturbation tier
        result_data: dict[
            PerturbationGroupedRecord, dict[str | int, dict[int, float | int]]
        ] = defaultdict(lambda: defaultdict(dict))

        for eval_record, log in eval_logs.items():
            if not hasattr(log, "samples") or not log.samples:
                continue

            # Extract scores for the individual samples
            for sample in log.samples:
                if not hasattr(sample, "scores") or not sample.scores:
                    continue

                if meta_tier_field not in sample.metadata:
                    raise ValueError(
                        f"Perturbation tier field '{meta_tier_field}' not found in sample metadata."
                    )
                perturbation_tier = int(cast(int, sample.metadata.get(meta_tier_field)))
                sample_id = sample.id

                for metric_name, score in sample.scores.items():
                    if type(score.value) is float or type(score.value) is int:
                        if metric_labels is not None and metric_name in metric_labels:
                            metric_name = metric_labels[metric_name]

                        result_data[
                            eval_record.get_perturbation_grouped_record(metric_name)
                        ][sample_id][perturbation_tier] = score.value
                    elif type(score.value) is dict:
                        # Extract inner scores from result dictionary
                        for inner_metric_name, inner_score in score.value.items():
                            if (
                                metric_labels is not None
                                and inner_metric_name in metric_labels
                            ):
                                inner_metric_name = metric_labels[inner_metric_name]

                            if type(inner_score) is float or type(inner_score) is int:
                                result_data[
                                    eval_record.get_perturbation_grouped_record(
                                        inner_metric_name
                                    )
                                ][sample_id][perturbation_tier] = inner_score
            del log

        # For each metric, compute average spearman rank correlation between the
        # meta tiers and the scores
        correlation_data: dict[str, list[float]] = defaultdict(list)
        for perturbation_record, samples in result_data.items():
            for sample_id, perturbation_scores in samples.items():
                perturbation_tiers = list(perturbation_scores.keys())
                perturbation_values = list(perturbation_scores.values())
                multiplier = -1 if lower_tier_is_better else 1
                correlation = spearmanr(
                    [(multiplier * pt) for pt in perturbation_tiers],
                    perturbation_values,
                ).correlation  # type: ignore
                if math.isnan(correlation):
                    continue
                correlation_data[perturbation_record.metric_name].append(correlation)

        correlation_results = []
        for metric_name, correlations in correlation_data.items():
            correlation_results.append(
                {
                    "metric_name": metric_name,
                    "avg_correlation": sum(correlations) / len(correlations),
                }
            )

        df = pl.DataFrame(correlation_results)
        if self.output_format in OUTPUT_FORMATTERS:
            return cast(T, OUTPUT_FORMATTERS[self.output_format](df))
        raise ValueError(
            f"Invalid output format: {self.output_format}. "
            f"Must be one of: {', '.join(OUTPUT_FORMATTERS.keys())}."
        )
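
A possible usage sketch, assuming an existing Project whose evaluation samples carry a perturbation_type_tier metadata field; the Project import path and constructor arguments are assumptions, not taken from this page:

import polars as pl

from evalsense.workflow import Project  # import path assumed
from evalsense.workflow.analysers.meta_result_analyser import MetaResultAnalyser

project = Project(name="my-meta-eval")  # hypothetical project
analyser = MetaResultAnalyser[pl.DataFrame](output_format="polars")

# One row per metric, with the average per-sample Spearman correlation
# between the meta tiers and the metric's scores.
meta_df = analyser(project, lower_tier_is_better=False)
print(meta_df)  # columns: metric_name, avg_correlation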

__call__

__call__(
    project: Project,
    meta_tier_field: str = "perturbation_type_tier",
    lower_tier_is_better: bool = False,
    metric_labels: dict[str, str] | None = None,
    **kwargs: dict[str, Any],
) -> T

Analyses the results from perturbation-based meta-evaluation experiments.

Parameters:

- project (Project, required): The project holding the meta-evaluation data to analyse.
- meta_tier_field (str, default 'perturbation_type_tier'): The field name that indicates the meta-evaluation tier to specify the expected score ranking.
- lower_tier_is_better (bool, default False): If True, lower perturbation tiers correspond to better outputs. If False, higher tiers are better.
- metric_labels (dict[str, str] | None, default None): A dictionary mapping metric names to their labels in the output table. If None, no aliasing is performed.
- **kwargs (dict[str, Any]): Additional keyword arguments.

Returns:

- T: The analysed results in the specified output format.

Source code in evalsense/workflow/analysers/meta_result_analyser.py
@override
def __call__(
    self,
    project: Project,
    meta_tier_field: str = "perturbation_type_tier",
    lower_tier_is_better: bool = False,
    metric_labels: dict[str, str] | None = None,
    **kwargs: dict[str, Any],
) -> T:
    """
    Analyses the results from perturbation-based meta-evaluation experiments.

    Args:
        project (Project): The project holding the meta-evaluation data to analyse.
        meta_tier_field (str): The field name that indicates the meta-evaluation
            tier to specify the expected score ranking.
        lower_tier_is_better (bool): If True, lower perturbation tiers correspond
            to better outputs. If False, higher tiers are better. Defaults to False.
        metric_labels (dict[str, str] | None): A dictionary mapping metric names
            to their labels in the output table. If None, no aliasing is performed.
            Defaults to None.
        **kwargs (dict[str, Any]): Additional keyword arguments.

    Returns:
        T: The analysed results in the specified output format.
    """
    eval_logs = project.get_logs(type="evaluation", status="success")

    # Data structure for tracking the intermediate results
    # The nested dictionary is indexed by perturbation record → sample ID → perturbation tier
    result_data: dict[
        PerturbationGroupedRecord, dict[str | int, dict[int, float | int]]
    ] = defaultdict(lambda: defaultdict(dict))

    for eval_record, log in eval_logs.items():
        if not hasattr(log, "samples") or not log.samples:
            continue

        # Extract scores for the individual samples
        for sample in log.samples:
            if not hasattr(sample, "scores") or not sample.scores:
                continue

            if meta_tier_field not in sample.metadata:
                raise ValueError(
                    f"Perturbation tier field '{meta_tier_field}' not found in sample metadata."
                )
            perturbation_tier = int(cast(int, sample.metadata.get(meta_tier_field)))
            sample_id = sample.id

            for metric_name, score in sample.scores.items():
                if type(score.value) is float or type(score.value) is int:
                    if metric_labels is not None and metric_name in metric_labels:
                        metric_name = metric_labels[metric_name]

                    result_data[
                        eval_record.get_perturbation_grouped_record(metric_name)
                    ][sample_id][perturbation_tier] = score.value
                elif type(score.value) is dict:
                    # Extract inner scores from result dictionary
                    for inner_metric_name, inner_score in score.value.items():
                        if (
                            metric_labels is not None
                            and inner_metric_name in metric_labels
                        ):
                            inner_metric_name = metric_labels[inner_metric_name]

                        if type(inner_score) is float or type(inner_score) is int:
                            result_data[
                                eval_record.get_perturbation_grouped_record(
                                    inner_metric_name
                                )
                            ][sample_id][perturbation_tier] = inner_score
        del log

    # For each metric, compute average spearman rank correlation between the
    # meta tiers and the scores
    correlation_data: dict[str, list[float]] = defaultdict(list)
    for perturbation_record, samples in result_data.items():
        for sample_id, perturbation_scores in samples.items():
            perturbation_tiers = list(perturbation_scores.keys())
            perturbation_values = list(perturbation_scores.values())
            multiplier = -1 if lower_tier_is_better else 1
            correlation = spearmanr(
                [(multiplier * pt) for pt in perturbation_tiers],
                perturbation_values,
            ).correlation  # type: ignore
            if math.isnan(correlation):
                continue
            correlation_data[perturbation_record.metric_name].append(correlation)

    correlation_results = []
    for metric_name, correlations in correlation_data.items():
        correlation_results.append(
            {
                "metric_name": metric_name,
                "avg_correlation": sum(correlations) / len(correlations),
            }
        )

    df = pl.DataFrame(correlation_results)
    if self.output_format in OUTPUT_FORMATTERS:
        return cast(T, OUTPUT_FORMATTERS[self.output_format](df))
    raise ValueError(
        f"Invalid output format: {self.output_format}. "
        f"Must be one of: {', '.join(OUTPUT_FORMATTERS.keys())}."
    )

MetricCorrelationAnalyser

Bases: ResultAnalyser[T]

An analyser calculating and visualizing correlations between different evaluation metrics.

This class analyzes the correlation between scores returned for individual samples by pairs of different evaluation methods, and produces a correlation matrix plot.
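
To make the pairwise computation concrete, here is a standalone sketch using polars' pl.corr (which the implementation below also relies on) over made-up per-sample scores for two hypothetical metrics:

import polars as pl

# Made-up per-sample scores for two hypothetical metrics, aligned by sample.
scores = pl.DataFrame(
    {
        "metric_a": [0.10, 0.40, 0.35, 0.80],
        "metric_b": [0.20, 0.50, 0.30, 0.90],
    }
)

# Spearman correlation between the two score columns; the analyser fills its
# correlation matrix with one such value per metric pair.
print(
    scores.select(
        pl.corr(pl.col("metric_a"), pl.col("metric_b"), method="spearman")
    )
)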

Methods:

- __call__: Calculates Spearman rank correlations between evaluation metrics.
- __init__: Initializes the metric correlation analyser.

Source code in evalsense/workflow/analysers/metric_correlation_analyser.py
class MetricCorrelationAnalyser[T: CorrelationResults](ResultAnalyser[T]):
    """An analyser calculating and visualizing correlations between
    different evaluation metrics.

    This class analyzes the correlation between scores returned for individual samples
    by pairs of different evaluation methods, and produces a correlation matrix plot.
    """

    def __init__(
        self,
        name: str = "MetricCorrelationAnalyser",
        output_format: Literal["polars", "pandas", "numpy"] = "polars",
    ):
        """Initializes the metric correlation analyser.

        Args:
            name (str): The name of the metric correlation analyser.
            output_format (Literal["polars", "pandas", "numpy"]): The output
                format of the correlation matrix. Can be "polars", "pandas",
                or "numpy". Defaults to "polars".
        """
        super().__init__(name=name)
        if output_format not in OUTPUT_FORMATTERS:
            raise ValueError(
                f"Invalid output format: {output_format}. "
                f"Must be one of: {', '.join(OUTPUT_FORMATTERS.keys())}."
            )
        self.output_format = output_format

    @override
    def __call__(
        self,
        project: Project,
        corr_method: Literal["spearman", "pearson"] = "spearman",
        return_plot: bool = True,
        figsize: tuple[int, int] = (12, 10),
        metric_labels: dict[str, str] | None = None,
        method_filter_fun: Callable[[str], bool] = lambda _: True,
        **kwargs: dict,
    ) -> T:
        """Calculates Spearman rank correlations between evaluation metrics.

        Args:
            project (Project): The project holding the evaluation data to analyse.
            corr_method (Literal["spearman", "pearson"]): The correlation method to use.
                Can be "spearman" or "pearson". Defaults to "spearman".
            return_plot (bool): Whether to generate and return a visualization of the
                correlation matrix. Defaults to True.
            figsize (Tuple[int, int]): Figure size for the correlation matrix plot.
                Defaults to (12, 10).
            metric_labels (dict[str, str] | None): A dictionary mapping metric names
                to their labels in the figure. If None, no aliasing is performed.
                Defaults to None.
            method_filter_fun (Callable[[str], bool]): A function to filter the
                evaluation methods, taking the method name as input and returning
                True if the method should be included in the analysis. Operates on
                original method names before label translation. Defaults to
                a function that always returns True.
            **kwargs (dict): Additional arguments for the analysis.

        Returns:
            T: The correlation results containing the correlation matrix and
                optionally a visualization.
        """
        eval_logs = project.get_logs(type="evaluation", status="success")

        result_data: dict[str, list[float | int]] = defaultdict(list)
        for log in eval_logs.values():
            if not hasattr(log, "samples") or not log.samples:
                continue

            # Extract scores from individual samples
            sample_result_data: dict[str, list[tuple[str | int, float | int]]] = (
                defaultdict(list)
            )
            for sample in log.samples:
                if not hasattr(sample, "scores") or not sample.scores:
                    continue

                for metric_name, score in sample.scores.items():
                    if type(score.value) is float or type(score.value) is int:
                        if not method_filter_fun(metric_name):
                            continue

                        if metric_labels is not None and metric_name in metric_labels:
                            metric_name = metric_labels[metric_name]

                        sample_result_data[metric_name].append((sample.id, score.value))
                    elif type(score.value) is dict:
                        # Extract inner scores from result dictionary
                        for inner_metric_name, inner_score in score.value.items():
                            if not method_filter_fun(inner_metric_name):
                                continue

                            if (
                                metric_labels is not None
                                and inner_metric_name in metric_labels
                            ):
                                inner_metric_name = metric_labels[inner_metric_name]

                            if type(inner_score) is float or type(inner_score) is int:
                                sample_result_data[inner_metric_name].append(
                                    (sample.id, inner_score)
                                )

            # Aggregate scores across all samples after sorting by sample ID
            # to ensure consistent ordering
            for metric_name, scores in sample_result_data.items():
                sorted_scores = [s[1] for s in sorted(scores, key=lambda x: x[0])]
                result_data[metric_name].extend(sorted_scores)

        sample_scores_df = pl.DataFrame(result_data)

        correlation_data = sample_scores_df.select(
            pl.corr(
                sample_scores_df.get_column(col1),
                sample_scores_df.get_column(col2),
                method=corr_method,
            ).alias(f"{col1}__{col2}")
            for i, col1 in enumerate(sample_scores_df.columns)
            for col2 in sample_scores_df.columns[i:]
        )

        # Reshape the correlation data to a proper matrix format
        cols = sample_scores_df.columns
        matrix_data = [[0.0 for _ in cols] for _ in cols]

        for i, col1 in enumerate(cols):
            for j, col2 in enumerate(cols):
                if i <= j:
                    col_name = f"{col1}__{col2}"
                    if col_name in correlation_data.columns:
                        val = correlation_data.get_column(col_name)[0]
                        matrix_data[i][j] = val
                        matrix_data[j][i] = val  # Matrix is symmetric

        # Create the correlation matrix
        corr_matrix = pl.DataFrame(
            matrix_data,
            schema=cols,
        )
        # Add metric names as a first column
        corr_matrix = corr_matrix.with_columns(
            pl.Series(name="Metric", values=cols)
        ).select("Metric", *cols)

        # Create a visualization of the correlation matrix if requested
        fig = None
        if return_plot:
            # Convert to pandas for visualization with seaborn
            corr_matrix_pd = corr_matrix.to_pandas().set_index("Metric")

            fig, ax = plt.subplots(figsize=figsize)
            mask = np.triu(np.ones_like(corr_matrix_pd, dtype=bool), k=1)
            cmap = sns.diverging_palette(220, 10, as_cmap=True)

            sns.heatmap(
                corr_matrix_pd,
                mask=mask,
                cmap=cmap,
                vmax=1.0,
                vmin=-1.0,
                center=0,
                square=True,
                linewidths=0.5,
                cbar_kws={"shrink": 0.5},
                annot=True,
                fmt=".2f",
                ax=ax,
            )

            plt.title("Spearman Rank Correlation Between Evaluation Metrics")
            plt.tight_layout()

        # Format the output according to the specified format
        if self.output_format in OUTPUT_FORMATTERS:
            formatted_corr_matrix = OUTPUT_FORMATTERS[self.output_format](corr_matrix)
            return cast(
                T,
                CorrelationResults(
                    correlation_matrix=formatted_corr_matrix, figure=fig
                ),
            )

        raise ValueError(
            f"Invalid output format: {self.output_format}. "
            f"Must be one of: {', '.join(OUTPUT_FORMATTERS.keys())}."
        )
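
A hedged usage sketch, assuming a populated Project (created elsewhere) and treating the metric names passed to metric_labels and method_filter_fun as hypothetical:

from evalsense.workflow.analysers.metric_correlation_analyser import (
    MetricCorrelationAnalyser,
)

analyser = MetricCorrelationAnalyser(output_format="pandas")
results = analyser(
    project,                                        # existing evalsense Project (assumed)
    metric_labels={"rouge_l": "ROUGE-L"},           # hypothetical metric name
    method_filter_fun=lambda name: name != "qags",  # hypothetical metric to exclude
)
print(results.correlation_matrix)
if results.figure is not None:
    results.figure.savefig("metric_correlations.png")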

__call__

__call__(
    project: Project,
    corr_method: Literal[
        "spearman", "pearson"
    ] = "spearman",
    return_plot: bool = True,
    figsize: tuple[int, int] = (12, 10),
    metric_labels: dict[str, str] | None = None,
    method_filter_fun: Callable[
        [str], bool
    ] = lambda _: True,
    **kwargs: dict,
) -> T

Calculates Spearman rank correlations between evaluation metrics.

Parameters:

- project (Project, required): The project holding the evaluation data to analyse.
- corr_method (Literal['spearman', 'pearson'], default 'spearman'): The correlation method to use. Can be "spearman" or "pearson".
- return_plot (bool, default True): Whether to generate and return a visualization of the correlation matrix.
- figsize (tuple[int, int], default (12, 10)): Figure size for the correlation matrix plot.
- metric_labels (dict[str, str] | None, default None): A dictionary mapping metric names to their labels in the figure. If None, no aliasing is performed.
- method_filter_fun (Callable[[str], bool], default lambda _: True): A function to filter the evaluation methods, taking the method name as input and returning True if the method should be included in the analysis. Operates on original method names before label translation.
- **kwargs (dict): Additional arguments for the analysis.

Returns:

- T: The correlation results containing the correlation matrix and optionally a visualization.

Source code in evalsense/workflow/analysers/metric_correlation_analyser.py
@override
def __call__(
    self,
    project: Project,
    corr_method: Literal["spearman", "pearson"] = "spearman",
    return_plot: bool = True,
    figsize: tuple[int, int] = (12, 10),
    metric_labels: dict[str, str] | None = None,
    method_filter_fun: Callable[[str], bool] = lambda _: True,
    **kwargs: dict,
) -> T:
    """Calculates Spearman rank correlations between evaluation metrics.

    Args:
        project (Project): The project holding the evaluation data to analyse.
        corr_method (Literal["spearman", "pearson"]): The correlation method to use.
            Can be "spearman" or "pearson". Defaults to "spearman".
        return_plot (bool): Whether to generate and return a visualization of the
            correlation matrix. Defaults to True.
        figsize (Tuple[int, int]): Figure size for the correlation matrix plot.
            Defaults to (12, 10).
        metric_labels (dict[str, str] | None): A dictionary mapping metric names
            to their labels in the figure. If None, no aliasing is performed.
            Defaults to None.
        method_filter_fun (Callable[[str], bool]): A function to filter the
            evaluation methods, taking the method name as input and returning
            True if the method should be included in the analysis. Operates on
            original method names before label translation. Defaults to
            a function that always returns True.
        **kwargs (dict): Additional arguments for the analysis.

    Returns:
        T: The correlation results containing the correlation matrix and
            optionally a visualization.
    """
    eval_logs = project.get_logs(type="evaluation", status="success")

    result_data: dict[str, list[float | int]] = defaultdict(list)
    for log in eval_logs.values():
        if not hasattr(log, "samples") or not log.samples:
            continue

        # Extract scores from individual samples
        sample_result_data: dict[str, list[tuple[str | int, float | int]]] = (
            defaultdict(list)
        )
        for sample in log.samples:
            if not hasattr(sample, "scores") or not sample.scores:
                continue

            for metric_name, score in sample.scores.items():
                if type(score.value) is float or type(score.value) is int:
                    if not method_filter_fun(metric_name):
                        continue

                    if metric_labels is not None and metric_name in metric_labels:
                        metric_name = metric_labels[metric_name]

                    sample_result_data[metric_name].append((sample.id, score.value))
                elif type(score.value) is dict:
                    # Extract inner scores from result dictionary
                    for inner_metric_name, inner_score in score.value.items():
                        if not method_filter_fun(inner_metric_name):
                            continue

                        if (
                            metric_labels is not None
                            and inner_metric_name in metric_labels
                        ):
                            inner_metric_name = metric_labels[inner_metric_name]

                        if type(inner_score) is float or type(inner_score) is int:
                            sample_result_data[inner_metric_name].append(
                                (sample.id, inner_score)
                            )

        # Aggregate scores across all samples after sorting by sample ID
        # to ensure consistent ordering
        for metric_name, scores in sample_result_data.items():
            sorted_scores = [s[1] for s in sorted(scores, key=lambda x: x[0])]
            result_data[metric_name].extend(sorted_scores)

    sample_scores_df = pl.DataFrame(result_data)

    correlation_data = sample_scores_df.select(
        pl.corr(
            sample_scores_df.get_column(col1),
            sample_scores_df.get_column(col2),
            method=corr_method,
        ).alias(f"{col1}__{col2}")
        for i, col1 in enumerate(sample_scores_df.columns)
        for col2 in sample_scores_df.columns[i:]
    )

    # Reshape the correlation data to a proper matrix format
    cols = sample_scores_df.columns
    matrix_data = [[0.0 for _ in cols] for _ in cols]

    for i, col1 in enumerate(cols):
        for j, col2 in enumerate(cols):
            if i <= j:
                col_name = f"{col1}__{col2}"
                if col_name in correlation_data.columns:
                    val = correlation_data.get_column(col_name)[0]
                    matrix_data[i][j] = val
                    matrix_data[j][i] = val  # Matrix is symmetric

    # Create the correlation matrix
    corr_matrix = pl.DataFrame(
        matrix_data,
        schema=cols,
    )
    # Add metric names as a first column
    corr_matrix = corr_matrix.with_columns(
        pl.Series(name="Metric", values=cols)
    ).select("Metric", *cols)

    # Create a visualization of the correlation matrix if requested
    fig = None
    if return_plot:
        # Convert to pandas for visualization with seaborn
        corr_matrix_pd = corr_matrix.to_pandas().set_index("Metric")

        fig, ax = plt.subplots(figsize=figsize)
        mask = np.triu(np.ones_like(corr_matrix_pd, dtype=bool), k=1)
        cmap = sns.diverging_palette(220, 10, as_cmap=True)

        sns.heatmap(
            corr_matrix_pd,
            mask=mask,
            cmap=cmap,
            vmax=1.0,
            vmin=-1.0,
            center=0,
            square=True,
            linewidths=0.5,
            cbar_kws={"shrink": 0.5},
            annot=True,
            fmt=".2f",
            ax=ax,
        )

        plt.title("Spearman Rank Correlation Between Evaluation Metrics")
        plt.tight_layout()

    # Format the output according to the specified format
    if self.output_format in OUTPUT_FORMATTERS:
        formatted_corr_matrix = OUTPUT_FORMATTERS[self.output_format](corr_matrix)
        return cast(
            T,
            CorrelationResults(
                correlation_matrix=formatted_corr_matrix, figure=fig
            ),
        )

    raise ValueError(
        f"Invalid output format: {self.output_format}. "
        f"Must be one of: {', '.join(OUTPUT_FORMATTERS.keys())}."
    )

__init__

__init__(
    name: str = "MetricCorrelationAnalyser",
    output_format: Literal[
        "polars", "pandas", "numpy"
    ] = "polars",
)

Initializes the metric correlation analyser.

Parameters:

- name (str, default 'MetricCorrelationAnalyser'): The name of the metric correlation analyser.
- output_format (Literal['polars', 'pandas', 'numpy'], default 'polars'): The output format of the correlation matrix. Can be "polars", "pandas", or "numpy".
Source code in evalsense/workflow/analysers/metric_correlation_analyser.py
def __init__(
    self,
    name: str = "MetricCorrelationAnalyser",
    output_format: Literal["polars", "pandas", "numpy"] = "polars",
):
    """Initializes the metric correlation analyser.

    Args:
        name (str): The name of the metric correlation analyser.
        output_format (Literal["polars", "pandas", "numpy"]): The output
            format of the correlation matrix. Can be "polars", "pandas",
            or "numpy". Defaults to "polars".
    """
    super().__init__(name=name)
    if output_format not in OUTPUT_FORMATTERS:
        raise ValueError(
            f"Invalid output format: {output_format}. "
            f"Must be one of: {', '.join(OUTPUT_FORMATTERS.keys())}."
        )
    self.output_format = output_format

TabularResultAnalyser

Bases: ResultAnalyser[T]

An analyser summarising evaluation results in a tabular format.

This class is generic in T to provide better type hints when returning different output types. It is the responsibility of the client code to ensure that the specified output_format is compatible with the type T. For example, a correct use of this class could look as follows:

analyser = TabularResultAnalyser[pl.DataFrame](
    output_format="polars",
)
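
By the same token, a pandas-typed variant and an example call might look as follows; the project variable is assumed to be an existing Project, and the resulting columns depend on the scores recorded in its evaluation logs:

import pandas as pd

analyser = TabularResultAnalyser[pd.DataFrame](
    output_format="pandas",
)
# Wide summary table: one row per (dataset, splits, task, generator, model)
# combination, one column per "<score>/<metric>" value.
summary_df = analyser(project)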

Methods:

- __call__: Analyses the evaluation results.
- __init__: Initializes the tabular result analyser.

Source code in evalsense/workflow/analysers/tabular_analyser.py
class TabularResultAnalyser[T: pl.DataFrame | pd.DataFrame](ResultAnalyser[T]):
    """An analyser summarising evaluation results in a tabular format.

    This class is generic in T to provide better type hints when returning
    different output types. It is the responsibility of the client code to
    ensure that the specified `output_format` is compatible with the type T.
    For example, a correct use of this class could look as follows:

        analyser = TabularResultAnalyser[pl.DataFrame](
            output_format="polars",
        )
    """

    def __init__(
        self,
        name: str = "TabularResultAnalyser",
        output_format: Literal["polars", "pandas"] = "polars",
    ):
        """Initializes the tabular result analyser.

        Args:
            name (str): The name of the tabular result analyser.
            output_format (Literal["polars", "pandas", "dataset"]): The output format of the
                result. Can be "polars" or "pandas". Defaults to "polars".
        """
        super().__init__(name=name)
        if output_format not in OUTPUT_FORMATTERS:
            raise ValueError(
                f"Invalid output format: {output_format}. "
                f"Must be one of: {', '.join(OUTPUT_FORMATTERS.keys())}."
            )
        self.output_format = output_format

    @override
    def __call__(self, project: Project, **kwargs: dict) -> T:
        """Analyses the evaluation results.

        Args:
            project (Project): The project holding the evaluation data to analyse.
            **kwargs (dict): Additional arguments for the analysis.

        Returns:
            T: The analysed results in the specified output format.
        """
        eval_logs = project.get_logs(type="evaluation", status="success")

        result_data = []
        for eval_record, log in eval_logs.items():
            if not log.results:
                continue

            for score in log.results.scores:
                for metric_name, metric in score.metrics.items():
                    value = metric.value

                    result_data.append(
                        {
                            "dataset": eval_record.dataset_record.name,
                            "splits": ", ".join(eval_record.dataset_record.splits),
                            "task": eval_record.task_name,
                            "generator": eval_record.generator_name,
                            "model": eval_record.model_record.name,
                            "metric": f"{score.name}/{metric_name}",
                            "value": value,
                        }
                    )

        df = pl.DataFrame(result_data)
        df = df.pivot(
            on="metric",
            index=["dataset", "splits", "task", "generator", "model"],
            values="value",
            aggregate_function="first",
        )
        if self.output_format in OUTPUT_FORMATTERS:
            return cast(T, OUTPUT_FORMATTERS[self.output_format](df))
        raise ValueError(
            f"Invalid output format: {self.output_format}. "
            f"Must be one of: {', '.join(OUTPUT_FORMATTERS.keys())}."
        )

__call__

__call__(project: Project, **kwargs: dict) -> T

Analyses the evaluation results.

Parameters:

- project (Project, required): The project holding the evaluation data to analyse.
- **kwargs (dict): Additional arguments for the analysis.

Returns:

- T: The analysed results in the specified output format.

Source code in evalsense/workflow/analysers/tabular_analyser.py
@override
def __call__(self, project: Project, **kwargs: dict) -> T:
    """Analyses the evaluation results.

    Args:
        project (Project): The project holding the evaluation data to analyse.
        **kwargs (dict): Additional arguments for the analysis.

    Returns:
        T: The analysed results in the specified output format.
    """
    eval_logs = project.get_logs(type="evaluation", status="success")

    result_data = []
    for eval_record, log in eval_logs.items():
        if not log.results:
            continue

        for score in log.results.scores:
            for metric_name, metric in score.metrics.items():
                value = metric.value

                result_data.append(
                    {
                        "dataset": eval_record.dataset_record.name,
                        "splits": ", ".join(eval_record.dataset_record.splits),
                        "task": eval_record.task_name,
                        "generator": eval_record.generator_name,
                        "model": eval_record.model_record.name,
                        "metric": f"{score.name}/{metric_name}",
                        "value": value,
                    }
                )

    df = pl.DataFrame(result_data)
    df = df.pivot(
        on="metric",
        index=["dataset", "splits", "task", "generator", "model"],
        values="value",
        aggregate_function="first",
    )
    if self.output_format in OUTPUT_FORMATTERS:
        return cast(T, OUTPUT_FORMATTERS[self.output_format](df))
    raise ValueError(
        f"Invalid output format: {self.output_format}. "
        f"Must be one of: {', '.join(OUTPUT_FORMATTERS.keys())}."
    )

__init__

__init__(
    name: str = "TabularResultAnalyser",
    output_format: Literal["polars", "pandas"] = "polars",
)

Initializes the tabular result analyser.

Parameters:

- name (str, default 'TabularResultAnalyser'): The name of the tabular result analyser.
- output_format (Literal['polars', 'pandas'], default 'polars'): The output format of the result. Can be "polars" or "pandas".
Source code in evalsense/workflow/analysers/tabular_analyser.py
def __init__(
    self,
    name: str = "TabularResultAnalyser",
    output_format: Literal["polars", "pandas"] = "polars",
):
    """Initializes the tabular result analyser.

    Args:
        name (str): The name of the tabular result analyser.
        output_format (Literal["polars", "pandas", "dataset"]): The output format of the
            result. Can be "polars" or "pandas". Defaults to "polars".
    """
    super().__init__(name=name)
    if output_format not in OUTPUT_FORMATTERS:
        raise ValueError(
            f"Invalid output format: {output_format}. "
            f"Must be one of: {', '.join(OUTPUT_FORMATTERS.keys())}."
        )
    self.output_format = output_format