Workflow

Modules:

Name Description
analysers
pipeline
project
result_analyser

Classes:

Name Description
Pipeline

A pipeline for evaluating LLMs.

Project

An EvalSense project, tracking the performed experiments and their results.

ResultAnalyser

A protocol for analysing or aggregating evaluation results.

Pipeline

A pipeline for evaluating LLMs.

Methods:

Name Description
__init__

Initializes a new Pipeline.

evaluate

Runs the evaluation stage of the pipeline.

generate

Runs the generation stage of the pipeline.

run

Runs the pipeline.

Attributes:

Name Type Description
evaluation_experiments

Returns unique evaluation stages of the experiments.

generation_experiments

Returns unique generation stages of the experiments.

Source code in evalsense/workflow/pipeline.py
class Pipeline:
    """A pipeline for evaluating LLMs."""

    def __init__(
        self,
        experiments: ExperimentDefinitions,
        project: Project,
        maintain_order: bool = False,
    ):
        """Initializes a new Pipeline.

        Args:
            experiments (ExperimentBatchConfig | ExperimentConfig | list[ExperimentBatchConfig | ExperimentConfig]):
                The experiments to run in the pipeline.
            project (Project): The project in which to track the results and outputs.
            maintain_order (bool): Whether to maintain the order of the experiments or
                whether to reorder them to reduce the number of model loads. Defaults
                to False.
        """
        # Standardize experiments to a list of ExperimentConfigs
        if not isinstance(experiments, list):
            experiments = [experiments]
        all_experiments: list[ExperimentConfig] = []
        for experiment in experiments:
            if isinstance(experiment, ExperimentBatchConfig):
                experiment.validate()
                all_experiments.extend(experiment.all_experiments)
            else:
                all_experiments.append(experiment)
        self.experiments = all_experiments
        self.project = project
        self._maintain_order = maintain_order
        self._active_model_config: ModelConfig | None = None
        self._active_model: Model | None = None

    @property
    def generation_experiments(self):
        """Returns unique generation stages of the experiments."""
        experiments = {e.generation_record: e for e in self.experiments}
        experiments_list = list(experiments.values())
        if not self._maintain_order:
            # Sort experiments to minimise model loads
            experiments_list = sorted(
                experiments_list, key=lambda x: x.model_config.name
            )
        return experiments_list

    @property
    def evaluation_experiments(self):
        """Returns unique evaluation stages of the experiments."""
        experiments = {e.evaluation_record: e for e in self.experiments}
        experiments_list = list(experiments.values())
        if not self._maintain_order:
            # Sort experiments to minimise model loads
            experiments_list = sorted(
                experiments_list,
                key=lambda x: "" if x.evaluator is None else x.evaluator.model_name,
            )
        return experiments_list

    def _cleanup_active_model(self):
        """Cleans up the active model if it exists."""
        if self._active_model is not None:
            # Cleanup background processes to free CUDA memory
            # Temporary workaround for Inspect + vLLM memory leaks,
            # see https://github.com/UKGovernmentBEIS/inspect_ai/issues/1543
            main_id = os.getpid()
            parent = psutil.Process(main_id)
            children = parent.children(recursive=True)
            for child in children:
                try:
                    child.terminate()
                except psutil.NoSuchProcess:
                    pass
            _, still_alive = psutil.wait_procs(children, timeout=5)
            if still_alive:
                logger.warning(
                    "⚠️  Unable to fully clean up background processes "
                    f"({len(still_alive)}/{len(children)} still alive). "
                    "Unless this results in overly high resource usage, "
                    "you can safely ignore this warning."
                )

            self._active_model_config = None
            self._active_model = None

    def _load_model(
        self,
        new_model_config: ModelConfig,
    ) -> Model:
        """Gets the model for the current experiment.

        Args:
            new_model_config (ModelConfig): The model configuration for the new model
                to be loaded.

        Returns:
            Model: The model for the current experiment.
        """
        if new_model_config != self._active_model_config:
            logger.info(f"▶️  Loading model {new_model_config.name}.")

            # Loading a new model — clean up the previous one
            self._cleanup_active_model()

            # Prepare the new model
            if isinstance(new_model_config.model, Model):
                new_model = new_model_config.model
            else:
                new_model = get_model(
                    model=new_model_config.model,
                    **new_model_config.model_args,
                    config=GenerateConfig(**new_model_config.generation_args),
                    memoize=False,
                )

            self._active_model_config = new_model_config
            self._active_model = new_model

            return new_model

        # Reusing the previous model
        return cast(Model, self._active_model)

    def _generate_on_dataset(
        self,
        experiment: ExperimentConfig,
        inspect_dataset: Dataset,
        force_rerun: bool,
        eval_kwargs: dict[str, Any] | None,
        eval_retry_kwargs: dict[str, Any] | None,
    ):
        """Generates the results for a given dataset and experiment.

        Args:
            experiment (ExperimentConfig): The experiment configuration.
            inspect_dataset (Dataset): The dataset to process.
            force_rerun (bool): Whether to force rerun the experiment.
            eval_kwargs (dict[str, Any], optional): Additional arguments to pass
                to the Inspect eval function. Defaults to empty dictionary when
                None.
            eval_retry_kwargs (dict[str, Any], optional): Additional arguments
                to pass to the Inspect eval function for retrying failed tasks.
                Defaults to empty dictionary when None.
        """
        prev_record = self.project.get_record(experiment.generation_record)
        interrupted = False

        # Inspect AI logs can only include serialisable task arguments, so we
        # need to use a closure to pass the dataset and solvers to the task.
        @task
        def create_task(task_name: str) -> Task:
            """Creates an Inspect AI task for the experiment.

            Args:
                task_name (str): The name of the task.

            Returns:
                Task: The Inspect AI task.
            """
            return Task(
                dataset=inspect_dataset,
                solver=experiment.generation_steps.steps,
                name=task_name,
            )

        # We need to create the task even when resuming from a previous log,
        # otherwise Inspect will not be able to resolve it.
        inspect_task = create_task(to_safe_filename(experiment.generation_record.label))
        if prev_record is None or prev_record.log_location is None or force_rerun:
            self.project.update_record(experiment.generation_record, ResultRecord())

            # Try generating the model outputs.
            try:
                eval_logs = eval(
                    tasks=inspect_task,
                    model=self._active_model,
                    log_dir=str(self.project.generation_log_path),
                    score=False,
                    **(eval_kwargs or dict()),
                )
            except BaseException as e:
                eval_logs = self.project.get_incomplete_logs(type="generation")
                interrupted = isinstance(e, KeyboardInterrupt)
        else:
            logger.info(
                f"🔁  Retrying generation using log: {prev_record.log_location}"
            )
            prev_log = read_eval_log(prev_record.log_location)

            # Retry generation using the previous log
            try:
                eval_logs = eval_retry(
                    tasks=prev_log,
                    log_dir=str(self.project.generation_log_path),
                    **(eval_retry_kwargs or dict()),
                )
            except BaseException as e:
                eval_logs = self.project.get_incomplete_logs(type="generation")
                interrupted = isinstance(e, KeyboardInterrupt)

        # Check generation status and update the project record
        status = "error"
        error_message = "Unknown error"
        log_location = None
        if not eval_logs:
            error_message = "No log returned from an experiment."
            logger.error("❌  Generation failed: no log returned from an experiment.")
        else:
            if len(eval_logs) > 1:
                logger.warning(
                    f"⚠️  Unexpected number of eval logs ({len(eval_logs)} > 1), "
                    "results may be ignored."
                )
            eval_log = eval_logs[0]
            log_location = eval_log.location

            if eval_log.status == "error":
                if eval_log.error is not None:
                    error_message = eval_log.error.message
                logger.error(f"❌  Generation failed due to an error: {error_message}")
            elif eval_log.status == "cancelled":
                error_message = "Generation was cancelled."
                logger.error("❌  Generation was cancelled.")
            elif eval_log.status == "started":
                error_message = "Generation was started but did not run to completion."
                logger.error(
                    "❌  Generation was started but did not run to completion."
                )
            elif eval_log.status == "success":
                status = "success"
                error_message = None
                logger.info(
                    f"✅  Generation for {experiment.generation_record.label} "
                    "completed successfully."
                )
        self.project.update_record(
            experiment.generation_record,
            ResultRecord(
                status=status, error_message=error_message, log_location=log_location
            ),
        )

        # If user interrupted the generation, raise KeyboardInterrupt
        if interrupted:
            logger.critical("🛑  Execution was interrupted.")
            raise KeyboardInterrupt()

    def generate(
        self,
        show_progress: bool = True,
        force_rerun: bool = False,
        force_reload: bool = False,
        eval_kwargs: dict[str, Any] | None = None,
        eval_retry_kwargs: dict[str, Any] | None = None,
    ):
        """Runs the generation stage of the pipeline.

        Args:
            show_progress (bool, optional): Whether to show a progress bar.
                Defaults to True.
            force_rerun (bool, optional): Whether to force rerunning the experiments.
                Defaults to False.
            force_reload (bool, optional): Whether to force reloading and
                reprocessing the datasets. Defaults to False.
            eval_kwargs (dict[str, Any], optional): Additional arguments to pass
                to the Inspect eval function. Defaults to empty dictionary when
                None.
            eval_retry_kwargs (dict[str, Any], optional): Additional arguments
                to pass to the Inspect eval function for retrying failed tasks.
                Defaults to empty dictionary when None.
        """
        for experiment in tqdm(
            self.generation_experiments,
            disable=not show_progress,
            desc="Experiment Generation",
        ):
            logger.info(
                f"🔄  Starting generation for {experiment.generation_record.label}"
            )

            # Check if we already have existing generations
            prev_record = self.project.get_record(
                experiment.generation_record,
            )
            if (
                prev_record is not None
                and prev_record.status == "success"
                and not force_rerun
            ):
                logger.info("⏭️  Generation skipped — already completed.")
                continue

            # Load the dataset
            logger.info(f"▶️  Loading dataset {experiment.dataset_manager.name}.")
            dataset_manager = experiment.dataset_manager
            hf_dataset = dataset_manager.load(
                retrieve=not force_reload,
                cache=True,
                force_retrieve=force_reload,
            )

            # Preprocess the dataset
            logger.info(
                "▶️  Preprocessing dataset with task preprocessor "
                f"{experiment.task_preprocessor.name}."
            )
            task_preprocessor = experiment.task_preprocessor
            inspect_dataset = task_preprocessor(
                hf_dataset,
                dataset_manager,
                field_spec=experiment.field_spec,
                force_reprocess=force_reload,
            )

            self._load_model(experiment.model_config)

            self._generate_on_dataset(
                experiment,
                inspect_dataset,
                force_rerun=force_rerun,
                eval_kwargs=eval_kwargs,
                eval_retry_kwargs=eval_retry_kwargs,
            )
        self._cleanup_active_model()
        logger.info("✨  Generation tasks completed.")

    def evaluate(
        self,
        show_progress: bool = True,
        force_rerun: bool = False,
        score_kwargs: dict[str, Any] | None = None,
    ):
        """Runs the evaluation stage of the pipeline.

        Args:
            show_progress (bool, optional): Whether to show a progress bar.
                Defaults to True.
            force_rerun (bool, optional): Whether to force rerun the experiments.
                Defaults to False.
            score_kwargs (dict[str, Any], optional): Additional arguments to pass
                to the Inspect score function. Defaults to empty dictionary when
                None.
        """
        experiments_to_evaluate = [
            experiment
            for experiment in self.evaluation_experiments
            if experiment.evaluator is not None
        ]
        for experiment in tqdm(
            experiments_to_evaluate,
            disable=not show_progress,
            desc="Experiment Evaluation",
        ):
            logger.info(
                f"🔄  Starting evaluation for {experiment.evaluation_record.label}"
            )

            # Check if we have a record from the generations.
            prev_record = self.project.get_record(
                experiment.evaluation_record,
                init_eval_record_from_generations=True,
            )
            if prev_record is None or prev_record.log_location is None:
                logger.error("❌  Evaluation skipped — no valid generations found.")
                continue
            if prev_record.status == "success" and not force_rerun:
                logger.info("⏭️  Evaluation skipped — already completed.")
                continue

            # Prepare the scorer
            # Safe cast, as we filtered out any None evaluators above
            evaluator = cast(Evaluator, experiment.evaluator)
            scorer = evaluator.scorer
            if isinstance(scorer, ScorerFactory):
                if evaluator.model_config is None:
                    logger.error(
                        "❌  Using ScorerFactory as a scorer for evaluation requires a "
                        "model config to specify the used model. Skipping evaluation."
                    )
                    continue
                scorer = scorer.create_scorer(self._load_model(evaluator.model_config))

            # Retrieve the initial evaluation log.
            init_score_log = self.project.get_log(
                experiment.evaluation_record,
            )
            if init_score_log is None:
                logger.error(
                    "❌  Couldn't load initial evaluation log. Skipping evaluation."
                )
                continue

            # Try scoring the model outputs in the log
            exception = None
            try:
                score_log = score(
                    log=init_score_log,
                    scorers=scorer,
                    action="overwrite",
                    **(score_kwargs or dict()),
                )
            except BaseException as e:
                score_log = self.project.get_log(experiment.evaluation_record)
                exception = e
            score_log = cast(EvalLog, score_log)
            write_eval_log(score_log, location=score_log.location)

            # Check scoring status and update the project record
            status = "error"
            error_message = "Unknown error"
            log_location = None
            if not score_log:
                error_message = "No log returned from evaluation."
                logger.error("❌  Evaluation failed: no log returned from evaluation.")
            else:
                log_location = score_log.location
                if score_log.status == "error" or exception is not None:
                    if score_log.error is not None:
                        error_message = score_log.error.message
                    elif exception is not None:
                        error_message = str(exception)
                    logger.error(
                        f"❌  Evaluation failed due to an error: {error_message}"
                    )
                elif score_log.status == "cancelled":
                    error_message = "Evaluation was cancelled."
                    logger.error("❌  Evaluation was cancelled.")
                elif score_log.status == "success":
                    status = "success"
                    error_message = None
                    logger.info(
                        f"✅  Evaluation for {experiment.evaluation_record.label} "
                        "completed successfully."
                    )
            self.project.update_record(
                experiment.evaluation_record,
                ResultRecord(
                    status=status,
                    error_message=error_message,
                    log_location=log_location,
                ),
            )

            # Perform cleanup if needed
            if evaluator.cleanup_fun is not None:
                try:
                    evaluator.cleanup_fun()
                except Exception as e:
                    logger.error(
                        f"❌  Error during cleanup for {evaluator.name}: {e}. "
                        "Please check the evaluator's cleanup function."
                    )

            # If user interrupted the evaluation, raise KeyboardInterrupt
            if isinstance(exception, KeyboardInterrupt):
                logger.critical("🛑  Execution was interrupted.")
                raise KeyboardInterrupt()

        self._cleanup_active_model()
        logger.info("✨  Evaluation tasks completed.")

    def run(
        self,
        show_progress: bool = True,
        force_rerun: bool = False,
        force_reload: bool = False,
        eval_kwargs: dict[str, Any] | None = None,
        eval_retry_kwargs: dict[str, Any] | None = None,
        score_kwargs: dict[str, Any] | None = None,
    ):
        """Runs the pipeline.

        Args:
            show_progress (bool, optional): Whether to show a progress bar.
                Defaults to True.
            force_rerun (bool, optional): Whether to force rerun the experiments.
                Defaults to False.
            force_reload (bool, optional): Whether to force reloading and
                reprocessing the datasets. Defaults to False.
            eval_kwargs (dict[str, Any], optional): Additional arguments to pass
                to the Inspect eval function. Defaults to empty dictionary when
                None.
            eval_retry_kwargs (dict[str, Any], optional): Additional arguments
                to pass to the Inspect eval function for retrying failed tasks.
                Defaults to empty dictionary when None.
            score_kwargs (dict[str, Any], optional): Additional arguments to pass
                to the Inspect score function. Defaults to empty dictionary when
                None.
        """
        self.generate(
            show_progress=show_progress,
            force_rerun=force_rerun,
            force_reload=force_reload,
            eval_kwargs=eval_kwargs,
            eval_retry_kwargs=eval_retry_kwargs,
        )
        self.evaluate(
            show_progress=show_progress,
            force_rerun=force_rerun,
            score_kwargs=score_kwargs,
        )

evaluation_experiments property

evaluation_experiments

Returns unique evaluation stages of the experiments.

generation_experiments property

generation_experiments

Returns unique generation stages of the experiments.

__init__

__init__(
    experiments: ExperimentDefinitions,
    project: Project,
    maintain_order: bool = False,
)

Initializes a new Pipeline.

Parameters:

Name Type Description Default
experiments ExperimentBatchConfig | ExperimentConfig | list[ExperimentBatchConfig | ExperimentConfig]

The experiments to run in the pipeline.

required
project Project

The project in which to track the results and outputs.

required
maintain_order bool

Whether to maintain the order of the experiments or whether to reorder them to reduce the number of model loads. Defaults to False.

False
Source code in evalsense/workflow/pipeline.py
def __init__(
    self,
    experiments: ExperimentDefinitions,
    project: Project,
    maintain_order: bool = False,
):
    """Initializes a new Pipeline.

    Args:
        experiments (ExperimentBatchConfig | ExperimentConfig | list[ExperimentBatchConfig | ExperimentConfig]):
            The experiments to run in the pipeline.
        project (Project): The project in which to track the results and outputs.
        maintain_order (bool): Whether to maintain the order of the experiments or
            whether to reorder them to reduce the number of model loads. Defaults
            to False.
    """
    # Standardize experiments to a list of ExperimentConfigs
    if not isinstance(experiments, list):
        experiments = [experiments]
    all_experiments: list[ExperimentConfig] = []
    for experiment in experiments:
        if isinstance(experiment, ExperimentBatchConfig):
            experiment.validate()
            all_experiments.extend(experiment.all_experiments)
        else:
            all_experiments.append(experiment)
    self.experiments = all_experiments
    self.project = project
    self._maintain_order = maintain_order
    self._active_model_config: ModelConfig | None = None
    self._active_model: Model | None = None
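
For orientation, a minimal construction sketch follows. It assumes that an ExperimentConfig (or ExperimentBatchConfig) named experiment_config has already been defined elsewhere, and that the import paths mirror the source file locations shown above.

from evalsense.workflow.pipeline import Pipeline
from evalsense.workflow.project import Project

# `experiment_config` is assumed to be an ExperimentConfig or
# ExperimentBatchConfig defined elsewhere in your code.
project = Project(name="demo-project")
pipeline = Pipeline(
    experiments=experiment_config,  # a single config or a list of configs
    project=project,
    maintain_order=False,  # allow reordering to reduce the number of model loads
)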

evaluate

evaluate(
    show_progress: bool = True,
    force_rerun: bool = False,
    score_kwargs: dict[str, Any] | None = None,
)

Runs the evaluation stage of the pipeline.

Parameters:

Name Type Description Default
show_progress bool

Whether to show a progress bar. Defaults to True.

True
force_rerun bool

Whether to force rerun the experiments. Defaults to False.

False
score_kwargs dict[str, Any]

Additional arguments to pass to the Inspect score function. Defaults to empty dictionary when None.

None
Source code in evalsense/workflow/pipeline.py
def evaluate(
    self,
    show_progress: bool = True,
    force_rerun: bool = False,
    score_kwargs: dict[str, Any] | None = None,
):
    """Runs the evaluation stage of the pipeline.

    Args:
        show_progress (bool, optional): Whether to show a progress bar.
            Defaults to True.
        force_rerun (bool, optional): Whether to force rerun the experiments.
            Defaults to False.
        score_kwargs (dict[str, Any], optional): Additional arguments to pass
            to the Inspect score function. Defaults to empty dictionary when
            None.
    """
    experiments_to_evaluate = [
        experiment
        for experiment in self.evaluation_experiments
        if experiment.evaluator is not None
    ]
    for experiment in tqdm(
        experiments_to_evaluate,
        disable=not show_progress,
        desc="Experiment Evaluation",
    ):
        logger.info(
            f"🔄  Starting evaluation for {experiment.evaluation_record.label}"
        )

        # Check if we have a record from the generations.
        prev_record = self.project.get_record(
            experiment.evaluation_record,
            init_eval_record_from_generations=True,
        )
        if prev_record is None or prev_record.log_location is None:
            logger.error("❌  Evaluation skipped — no valid generations found.")
            continue
        if prev_record.status == "success" and not force_rerun:
            logger.info("⏭️  Evaluation skipped — already completed.")
            continue

        # Prepare the scorer
        # Safe cast, as we filtered out any None evaluators above
        evaluator = cast(Evaluator, experiment.evaluator)
        scorer = evaluator.scorer
        if isinstance(scorer, ScorerFactory):
            if evaluator.model_config is None:
                logger.error(
                    "❌  Using ScorerFactory as a scorer for evaluation requires a "
                    "model config to specify the used model. Skipping evaluation."
                )
                continue
            scorer = scorer.create_scorer(self._load_model(evaluator.model_config))

        # Retrieve the initial evaluation log.
        init_score_log = self.project.get_log(
            experiment.evaluation_record,
        )
        if init_score_log is None:
            logger.error(
                "❌  Couldn't load initial evaluation log. Skipping evaluation."
            )
            continue

        # Try scoring the model outputs in the log
        exception = None
        try:
            score_log = score(
                log=init_score_log,
                scorers=scorer,
                action="overwrite",
                **(score_kwargs or dict()),
            )
        except BaseException as e:
            score_log = self.project.get_log(experiment.evaluation_record)
            exception = e
        score_log = cast(EvalLog, score_log)
        write_eval_log(score_log, location=score_log.location)

        # Check scoring status and update the project record
        status = "error"
        error_message = "Unknown error"
        log_location = None
        if not score_log:
            error_message = "No log returned from evaluation."
            logger.error("❌  Evaluation failed: no log returned from evaluation.")
        else:
            log_location = score_log.location
            if score_log.status == "error" or exception is not None:
                if score_log.error is not None:
                    error_message = score_log.error.message
                elif exception is not None:
                    error_message = str(exception)
                logger.error(
                    f"❌  Evaluation failed due to an error: {error_message}"
                )
            elif score_log.status == "cancelled":
                error_message = "Evaluation was cancelled."
                logger.error("❌  Evaluation was cancelled.")
            elif score_log.status == "success":
                status = "success"
                error_message = None
                logger.info(
                    f"✅  Evaluation for {experiment.evaluation_record.label} "
                    "completed successfully."
                )
        self.project.update_record(
            experiment.evaluation_record,
            ResultRecord(
                status=status,
                error_message=error_message,
                log_location=log_location,
            ),
        )

        # Perform cleanup if needed
        if evaluator.cleanup_fun is not None:
            try:
                evaluator.cleanup_fun()
            except Exception as e:
                logger.error(
                    f"❌  Error during cleanup for {evaluator.name}: {e}. "
                    "Please check the evaluator's cleanup function."
                )

        # If user interrupted the evaluation, raise KeyboardInterrupt
        if isinstance(exception, KeyboardInterrupt):
            logger.critical("🛑  Execution was interrupted.")
            raise KeyboardInterrupt()

    self._cleanup_active_model()
    logger.info("✨  Evaluation tasks completed.")
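
As a usage sketch (assuming the pipeline object from the __init__ example and that the generation stage has already completed), existing generations can be re-scored without regenerating any model outputs:

# Re-score the existing generations, overwriting previous evaluation results.
pipeline.evaluate(
    show_progress=True,
    force_rerun=True,  # re-run evaluation even if a successful record exists
)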

generate

generate(
    show_progress: bool = True,
    force_rerun: bool = False,
    force_reload: bool = False,
    eval_kwargs: dict[str, Any] | None = None,
    eval_retry_kwargs: dict[str, Any] | None = None,
)

Runs the generation stage of the pipeline.

Parameters:

Name Type Description Default
show_progress bool

Whether to show a progress bar. Defaults to True.

True
force_rerun bool

Whether to force rerunning the experiments. Defaults to False.

False
force_reload bool

Whether to force reloading and reprocessing the datasets. Defaults to False.

False
eval_kwargs dict[str, Any]

Additional arguments to pass to the Inspect eval function. Defaults to empty dictionary when None.

None
eval_retry_kwargs dict[str, Any]

Additional arguments to pass to the Inspect eval function for retrying failed tasks. Defaults to empty dictionary when None.

None
Source code in evalsense/workflow/pipeline.py
def generate(
    self,
    show_progress: bool = True,
    force_rerun: bool = False,
    force_reload: bool = False,
    eval_kwargs: dict[str, Any] | None = None,
    eval_retry_kwargs: dict[str, Any] | None = None,
):
    """Runs the generation stage of the pipeline.

    Args:
        show_progress (bool, optional): Whether to show a progress bar.
            Defaults to True.
        force_rerun (bool, optional): Whether to force rerunning the experiments.
            Defaults to False.
        force_reload (bool, optional): Whether to force reloading and
            reprocessing the datasets. Defaults to False.
        eval_kwargs (dict[str, Any], optional): Additional arguments to pass
            to the Inspect eval function. Defaults to empty dictionary when
            None.
        eval_retry_kwargs (dict[str, Any], optional): Additional arguments
            to pass to the Inspect eval function for retrying failed tasks.
            Defaults to empty dictionary when None.
    """
    for experiment in tqdm(
        self.generation_experiments,
        disable=not show_progress,
        desc="Experiment Generation",
    ):
        logger.info(
            f"🔄  Starting generation for {experiment.generation_record.label}"
        )

        # Check if we already have existing generations
        prev_record = self.project.get_record(
            experiment.generation_record,
        )
        if (
            prev_record is not None
            and prev_record.status == "success"
            and not force_rerun
        ):
            logger.info("⏭️  Generation skipped — already completed.")
            continue

        # Load the dataset
        logger.info(f"▶️  Loading dataset {experiment.dataset_manager.name}.")
        dataset_manager = experiment.dataset_manager
        hf_dataset = dataset_manager.load(
            retrieve=not force_reload,
            cache=True,
            force_retrieve=force_reload,
        )

        # Preprocess the dataset
        logger.info(
            "▶️  Preprocessing dataset with task preprocessor "
            f"{experiment.task_preprocessor.name}."
        )
        task_preprocessor = experiment.task_preprocessor
        inspect_dataset = task_preprocessor(
            hf_dataset,
            dataset_manager,
            field_spec=experiment.field_spec,
            force_reprocess=force_reload,
        )

        self._load_model(experiment.model_config)

        self._generate_on_dataset(
            experiment,
            inspect_dataset,
            force_rerun=force_rerun,
            eval_kwargs=eval_kwargs,
            eval_retry_kwargs=eval_retry_kwargs,
        )
    self._cleanup_active_model()
    logger.info("✨  Generation tasks completed.")
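
A short usage sketch, again assuming the pipeline object from the __init__ example; force_reload re-retrieves and re-preprocesses the datasets, while force_rerun repeats generation even for experiments that already completed successfully:

# Run only the generation stage, reloading and reprocessing the datasets.
pipeline.generate(
    show_progress=True,
    force_rerun=False,
    force_reload=True,
)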

run

run(
    show_progress: bool = True,
    force_rerun: bool = False,
    force_reload: bool = False,
    eval_kwargs: dict[str, Any] | None = None,
    eval_retry_kwargs: dict[str, Any] | None = None,
    score_kwargs: dict[str, Any] | None = None,
)

Runs the pipeline.

Parameters:

Name Type Description Default
show_progress bool

Whether to show a progress bar. Defaults to True.

True
force_rerun bool

Whether to force rerun the experiments. Defaults to False.

False
force_reload bool

Whether to force reloading and reprocessing the datasets. Defaults to False.

False
eval_kwargs dict[str, Any]

Additional arguments to pass to the Inspect eval function. Defaults to empty dictionary when None.

None
eval_retry_kwargs dict[str, Any]

Additional arguments to pass to the Inspect eval function for retrying failed tasks. Defaults to empty dictionary when None.

None
score_kwargs dict[str, Any]

Additional arguments to pass to the Inspect score function. Defaults to empty dictionary when None.

None
Source code in evalsense/workflow/pipeline.py
def run(
    self,
    show_progress: bool = True,
    force_rerun: bool = False,
    force_reload: bool = False,
    eval_kwargs: dict[str, Any] | None = None,
    eval_retry_kwargs: dict[str, Any] | None = None,
    score_kwargs: dict[str, Any] | None = None,
):
    """Runs the pipeline.

    Args:
        show_progress (bool, optional): Whether to show a progress bar.
            Defaults to True.
        force_rerun (bool, optional): Whether to force rerun the experiments.
            Defaults to False.
        force_reload (bool, optional): Whether to force reloading and
            reprocessing the datasets. Defaults to False.
        eval_kwargs (dict[str, Any], optional): Additional arguments to pass
            to the Inspect eval function. Defaults to empty dictionary when
            None.
        eval_retry_kwargs (dict[str, Any], optional): Additional arguments
            to pass to the Inspect eval function for retrying failed tasks.
            Defaults to empty dictionary when None.
        score_kwargs (dict[str, Any], optional): Additional arguments to pass
            to the Inspect score function. Defaults to empty dictionary when
            None.
    """
    self.generate(
        show_progress=show_progress,
        force_rerun=force_rerun,
        force_reload=force_reload,
        eval_kwargs=eval_kwargs,
        eval_retry_kwargs=eval_retry_kwargs,
    )
    self.evaluate(
        show_progress=show_progress,
        force_rerun=force_rerun,
        score_kwargs=score_kwargs,
    )
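
Putting the stages together, a complete run followed by collecting the successful evaluation logs might look like the sketch below (assuming the pipeline and project objects from the __init__ example):

# Run generation and evaluation back to back, then inspect the results.
pipeline.run(show_progress=True)
evaluation_logs = project.get_logs("evaluation", status="success")
for record, eval_log in evaluation_logs.items():
    print(record, eval_log.location)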

Project

An EvalSense project, tracking the performed experiments and their results.

Methods:

Name Description
__init__

Initializes a project.

cleanup_incomplete_logs

Removes all incomplete logs in the project directory.

get_incomplete_logs

Returns a list of incomplete logs in the project directory.

get_log

Returns the evaluation log for the given record key.

get_logs

Returns a dictionary of logs for the given type and status.

get_record

Returns the generation or evaluation record for the given key.

remove

Removes the project from disk.

remove_record

Removes the generation or evaluation record.

update_record

Updates the generation or evaluation record with the specified result.

Attributes:

Name Type Description
evaluation_log_path Path

Returns the path to the evaluation log directory.

generation_log_path Path

Returns the path to the generation log directory.

project_path Path

Returns the path to the project directory.

Source code in evalsense/workflow/project.py
class Project:
    """An EvalSense project, tracking the performed experiments and their results."""

    METADATA_FILE = "metadata.json"

    def __init__(
        self,
        name: str,
        load_existing: bool = True,
        reset_project: bool = False,
    ) -> None:
        """Initializes a project.

        Args:
            name (str): The name of the project.
            load_existing (bool): Whether to load an existing project if it exists.
                Defaults to True.
            reset_project (bool): Whether to reset the project if it exists. Defaults
                to False. If True, the existing project will be deleted and a new one
                will be created.
        """
        PROJECTS_PATH.mkdir(parents=True, exist_ok=True)
        self.name = name

        if reset_project:
            self.remove()

        project_exists = self.project_path.exists()
        if project_exists and not load_existing:
            raise ValueError(
                f"Project with name {name} already exists. "
                "Either choose a different name or set load_existing=True."
            )
        elif project_exists:
            self._load_existing_project()
        else:
            self.records = ProjectRecords()
            self._save()

    @property
    def project_path(self) -> Path:
        """Returns the path to the project directory."""
        return PROJECTS_PATH / to_safe_filename(self.name)

    @property
    def generation_log_path(self) -> Path:
        """Returns the path to the generation log directory."""
        return self.project_path / "generation_logs"

    @property
    def evaluation_log_path(self) -> Path:
        """Returns the path to the evaluation log directory."""
        return self.project_path / "evaluation_logs"

    def _load_existing_project(self) -> None:
        """Loads an existing project from disk."""
        metadata_file = self.project_path / self.METADATA_FILE
        if not metadata_file.exists():
            raise ValueError(f"Attempting to load a non-existent project {self.name}.")

        with open(metadata_file, "r", encoding="utf-8") as f:
            self.records = ProjectRecords.model_validate_json(f.read())
        self.cleanup_incomplete_logs()

    def _save(self) -> None:
        """Saves the project metadata to disk."""
        self.project_path.mkdir(parents=True, exist_ok=True)
        metadata_file = self.project_path / self.METADATA_FILE
        with open(metadata_file, "w", encoding="utf-8") as f:
            f.write(self.records.model_dump_json(indent=4))

    def remove(self) -> None:
        """Removes the project from disk."""
        if self.project_path.exists():
            shutil.rmtree(self.project_path)

    def _remove_log_file(
        self,
        record: ResultRecord | None,
    ):
        """Removes the log file associated with the record, if it exists.

        Args:
            record (ResultRecord | None): The record associated with the log file.
        """
        if record is not None and record.log_location is not None:
            log_path = Path(record.log_location)
            if log_path.exists():
                log_path.unlink()

    def update_record(
        self,
        record_key: GenerationRecord | EvaluationRecord,
        record_value: ResultRecord,
        *,
        init_eval_record_from_generations: bool = False,
    ):
        """Updates the generation or evaluation record with the specified result.

        Args:
            record_key (GenerationRecord | EvaluationRecord): The generation
                or evaluation record to update.
            record_value (ResultRecord): The generation or evaluation result.
            init_eval_record_from_generations (bool): Whether to initialise a new
                evaluation record if the evaluation record does not exist. Defaults
                to False. This is only applicable if the record_key is an
                EvaluationRecord.
        """
        current_record = self.get_record(
            record_key,
            init_eval_record_from_generations=init_eval_record_from_generations,
        )
        if (
            current_record is not None
            and current_record.log_location is not None
            and current_record.log_location != record_value.log_location
        ):
            self._remove_log_file(current_record)

        if type(record_key) is GenerationRecord:
            self.records.generation[record_key] = record_value
        elif type(record_key) is EvaluationRecord:
            self.records.evaluation[record_key] = record_value
        else:
            raise TypeError(f"Invalid record type: {type(record_key)}")
        self._save()

    def remove_record(
        self,
        record_key: GenerationRecord | EvaluationRecord,
    ):
        """Removes the generation or evaluation record.

        Args:
            record_key (GenerationRecord | EvaluationRecord): The generation
                or evaluation record to remove.
        """
        if type(record_key) is GenerationRecord:
            record = self.records.generation.pop(record_key, None)
        elif type(record_key) is EvaluationRecord:
            record = self.records.evaluation.pop(record_key, None)
        else:
            raise TypeError(f"Invalid record type: {type(record_key)}")

        self._remove_log_file(record)
        self._save()

    def _retrieve_verify_record(self, record_key: GenerationRecord | EvaluationRecord):
        """Retrieves and verifies the generation or evaluation record.

        Args:
            record_key (GenerationRecord | EvaluationRecord): The generation
                or evaluation record to retrieve.

        Returns:
            ResultRecord | None: The generation or evaluation result, or None if
                a valid record does not exist.
        """
        if type(record_key) is GenerationRecord:
            retrieved_record = self.records.generation.get(record_key, None)
        elif type(record_key) is EvaluationRecord:
            retrieved_record = self.records.evaluation.get(record_key, None)
        else:
            raise TypeError(f"Invalid record type: {type(record_key)}")

        if retrieved_record is not None and retrieved_record.log_location is not None:
            log_path = Path(retrieved_record.log_location)
            if not log_path.exists():
                # Stale record, remove it
                logger.warning(
                    f"⚠️  Log file {log_path} does not exist. Removing stale record."
                )
                self.remove_record(record_key)
                retrieved_record = None
        return retrieved_record

    def get_record(
        self,
        record_key: GenerationRecord | EvaluationRecord,
        *,
        init_eval_record_from_generations: bool = False,
    ) -> ResultRecord | None:
        """Returns the generation or evaluation record for the given key.

        Note: Calling this method may initialise a new evaluation record from
        the matching generation record if the evaluation record does not exist
        yet and `init_eval_record_from_generations` is set to True.

        Args:
            record_key (GenerationRecord | EvaluationRecord): The generation
                or evaluation record to retrieve.
            init_eval_record_from_generations (bool): Whether to initialise a new
                evaluation record if the evaluation record does not exist.
                Defaults to False. This is only applicable if the record_key is
                an EvaluationRecord.

        Returns:
            ResultRecord | None: The generation or evaluation result, or None if
                a valid record does not exist.
        """
        if type(record_key) is GenerationRecord:
            return self._retrieve_verify_record(record_key)
        elif type(record_key) is EvaluationRecord:
            retrieved_eval_record = self._retrieve_verify_record(record_key)
            if (
                retrieved_eval_record is not None
                or not init_eval_record_from_generations
            ):
                return retrieved_eval_record

            generation_result = self._retrieve_verify_record(
                record_key.generation_record
            )
            if generation_result is None:
                return None
            if (
                generation_result.status != "success"
                or generation_result.log_location is None
            ):
                self.records.evaluation[record_key] = generation_result
                self._save()
                return generation_result

            # Create a new evaluation log based on the generation log
            log_path = Path(generation_result.log_location)
            evaluator_name = record_key.evaluator_name
            log_time, core_name, random_id = log_path.stem.split("_", 2)
            new_log_path = self.evaluation_log_path / (
                f"{log_time}_{core_name}-{to_safe_filename(evaluator_name)}_"
                + f"{random_id}{log_path.suffix}"
            )
            new_log_path.parent.mkdir(parents=True, exist_ok=True)
            if not new_log_path.exists():
                shutil.copy(log_path, new_log_path)
            new_record = ResultRecord(
                log_location=str(new_log_path),
            )
            self.records.evaluation[record_key] = new_record
            self._save()
            return new_record
        else:
            raise TypeError(f"Invalid record type: {type(record_key)}")

    def get_log(
        self,
        record_key: GenerationRecord | EvaluationRecord,
        *,
        init_eval_record_from_generations: bool = False,
    ) -> EvalLog | None:
        """Returns the evaluation log for the given record key.

        Args:
            record_key (GenerationRecord | EvaluationRecord): The generation
                or evaluation record to retrieve.
            init_eval_record_from_generations (bool): Whether to initialise a new
                evaluation record if the evaluation record does not exist. Defaults
                to False. This is only applicable if the record_key is an
                EvaluationRecord.

        Returns:
            EvalLog | None: The evaluation log, or None if a valid log does not
                exist.
        """
        record = self.get_record(
            record_key,
            init_eval_record_from_generations=init_eval_record_from_generations,
        )
        if record is not None and record.log_location is not None:
            log_path = Path(record.log_location)
            if log_path.exists():
                return read_eval_log(str(log_path))

    @overload
    def get_logs(
        self,
        type: Literal["generation"],
        status: RecordStatus | None = None,
    ) -> dict[GenerationRecord, EvalLog]: ...
    @overload
    def get_logs(
        self,
        type: Literal["evaluation"],
        status: RecordStatus | None = None,
    ) -> dict[EvaluationRecord, EvalLog]: ...
    def get_logs(
        self,
        type: Literal["generation", "evaluation"],
        status: RecordStatus | None = None,
    ) -> dict[GenerationRecord, EvalLog] | dict[EvaluationRecord, EvalLog]:
        """Returns a dictionary of logs for the given type and status. The dictionary
        is automatically sorted by the corresponding record keys.

        Args:
            type (Literal["generation", "evaluation"]): The type of logs to retrieve.
            status (RecordStatus | None): The status of the logs to retrieve.
                Defaults to None.

        Returns:
            dict[GenerationRecord | EvaluationRecord, EvalLog]: A dictionary of logs.
        """
        if type == "generation":
            records = self.records.generation
        elif type == "evaluation":
            records = self.records.evaluation
        else:
            raise ValueError(f"Invalid log type: {type}")

        results = {}
        for key, value in records.items():
            if status is not None and value.status != status:
                continue
            if value.log_location is not None:
                log_path = Path(value.log_location)
                if log_path.exists():
                    eval_log = read_eval_log(str(log_path))
                    if eval_log is not None:
                        results[key] = eval_log

        return dict(sorted(results.items()))

    def get_incomplete_logs(
        self,
        type: Literal["generation", "evaluation"],
    ) -> list[EvalLog]:
        """Returns a list of incomplete logs in the project directory.

        Args:
            type (Literal["generation", "evaluation"]): The type of logs to retrieve.

        Returns:
            list[EvalLog]: A list of incomplete logs.
        """
        if type == "generation":
            log_path = self.generation_log_path
            known_logs = [
                v.log_location
                for v in self.records.generation.values()
                if v.log_location
            ]
        elif type == "evaluation":
            log_path = self.evaluation_log_path
            known_logs = [
                v.log_location
                for v in self.records.evaluation.values()
                if v.log_location
            ]
        else:
            raise ValueError(f"Invalid log type: {type}")

        incomplete_logs = []
        extensions = [".json", ".eval"]
        for ext in extensions:
            for log_file in log_path.glob(f"*{ext}"):
                if str(log_file) not in known_logs:
                    loaded_log = read_eval_log(str(log_file))
                    incomplete_logs.append(loaded_log)
        return incomplete_logs

    def cleanup_incomplete_logs(self):
        """Removes all incomplete logs in the project directory."""
        incomplete_logs = self.get_incomplete_logs(
            "generation"
        ) + self.get_incomplete_logs("evaluation")
        for log in incomplete_logs:
            log_path = Path(log.location)
            if log_path.exists():
                log_path.unlink()

evaluation_log_path property

evaluation_log_path: Path

Returns the path to the evaluation log directory.

generation_log_path property

generation_log_path: Path

Returns the path to the generation log directory.

project_path property

project_path: Path

Returns the path to the project directory.

__init__

__init__(
    name: str,
    load_existing: bool = True,
    reset_project: bool = False,
) -> None

Initializes a project.

Parameters:

Name Type Description Default
name str

The name of the project.

required
load_existing bool

Whether to load an existing project if it exists. Defaults to True.

True
reset_project bool

Whether to reset the project if it exists. Defaults to False. If True, the existing project will be deleted and a new one will be created.

False
Source code in evalsense/workflow/project.py
def __init__(
    self,
    name: str,
    load_existing: bool = True,
    reset_project: bool = False,
) -> None:
    """Initializes a project.

    Args:
        name (str): The name of the project.
        load_existing (bool): Whether to load an existing project if it exists.
            Defaults to True.
        reset_project (bool): Whether to reset the project if it exists. Defaults
            to False. If True, the existing project will be deleted and a new one
            will be created.
    """
    PROJECTS_PATH.mkdir(parents=True, exist_ok=True)
    self.name = name

    if reset_project:
        self.remove()

    project_exists = self.project_path.exists()
    if project_exists and not load_existing:
        raise ValueError(
            f"Project with name {name} already exists. "
            "Either choose a different name or set load_existing=True."
        )
    elif project_exists:
        self._load_existing_project()
    else:
        self.records = ProjectRecords()
        self._save()
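
A brief sketch of the three initialisation modes described above (the project names are illustrative):

from evalsense.workflow.project import Project

# Create a new project, or load it if one with this name already exists.
project = Project(name="demo-project")

# Refuse to reuse an existing project (raises ValueError if it exists).
fresh_project = Project(name="new-project", load_existing=False)

# Delete any existing project with this name and start from scratch.
clean_project = Project(name="demo-project", reset_project=True)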

cleanup_incomplete_logs

cleanup_incomplete_logs()

Removes all incomplete logs in the project directory.

Source code in evalsense/workflow/project.py
def cleanup_incomplete_logs(self):
    """Removes all incomplete logs in the project directory."""
    incomplete_logs = self.get_incomplete_logs(
        "generation"
    ) + self.get_incomplete_logs("evaluation")
    for log in incomplete_logs:
        log_path = Path(log.location)
        if log_path.exists():
            log_path.unlink()

get_incomplete_logs

get_incomplete_logs(
    type: Literal["generation", "evaluation"],
) -> list[EvalLog]

Returns a list of incomplete logs in the project directory.

Parameters:

Name Type Description Default
type Literal['generation', 'evaluation']

The type of logs to retrieve.

required

Returns:

Type Description
list[EvalLog]

list[EvalLog]: A list of incomplete logs.

Source code in evalsense/workflow/project.py
def get_incomplete_logs(
    self,
    type: Literal["generation", "evaluation"],
) -> list[EvalLog]:
    """Returns a list of incomplete logs in the project directory.

    Args:
        type (Literal["generation", "evaluation"]): The type of logs to retrieve.

    Returns:
        list[EvalLog]: A list of incomplete logs.
    """
    if type == "generation":
        log_path = self.generation_log_path
        known_logs = [
            v.log_location
            for v in self.records.generation.values()
            if v.log_location
        ]
    elif type == "evaluation":
        log_path = self.evaluation_log_path
        known_logs = [
            v.log_location
            for v in self.records.evaluation.values()
            if v.log_location
        ]
    else:
        raise ValueError(f"Invalid log type: {type}")

    incomplete_logs = []
    extensions = [".json", ".eval"]
    for ext in extensions:
        for log_file in log_path.glob(f"*{ext}"):
            if str(log_file) not in known_logs:
                loaded_log = read_eval_log(str(log_file))
                incomplete_logs.append(loaded_log)
    return incomplete_logs
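
A short sketch combining this method with cleanup_incomplete_logs, assuming the project object from the earlier examples:

# List generation logs on disk that are not referenced by any record,
# then remove all such incomplete logs for both stages.
orphaned = project.get_incomplete_logs("generation")
print(f"Found {len(orphaned)} incomplete generation logs")
project.cleanup_incomplete_logs()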

get_log

get_log(
    record_key: GenerationRecord | EvaluationRecord,
    *,
    init_eval_record_from_generations: bool = False,
) -> EvalLog | None

Returns the evaluation log for the given record key.

Parameters:

record_key (GenerationRecord | EvaluationRecord): The generation or evaluation record to retrieve. Required.

init_eval_record_from_generations (bool): Whether to initialise a new evaluation record if the evaluation record does not exist. Defaults to False. This is only applicable if the record_key is an EvaluationRecord.

Returns:

EvalLog | None: The evaluation log, or None if a valid log does not exist.

Source code in evalsense/workflow/project.py
def get_log(
    self,
    record_key: GenerationRecord | EvaluationRecord,
    *,
    init_eval_record_from_generations: bool = False,
) -> EvalLog | None:
    """Returns the evaluation log for the given record key.

    Args:
        record_key (GenerationRecord | EvaluationRecord): The generation
            or evaluation record to retrieve.
        init_eval_record_from_generations (bool): Whether to initialise a new
            evaluation record if the evaluation record does not exist. Defaults
            to False. This is only applicable if the record_key is an
            EvaluationRecord.

    Returns:
        EvalLog | None: The evaluation log, or None if a valid log does not
            exist.
    """
    record = self.get_record(
        record_key,
        init_eval_record_from_generations=init_eval_record_from_generations,
    )
    if record is not None and record.log_location is not None:
        log_path = Path(record.log_location)
        if log_path.exists():
            return read_eval_log(str(log_path))
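
For example, a minimal sketch, where record_key stands in for a GenerationRecord or EvaluationRecord obtained elsewhere (for instance, as a key returned by get_logs):

log = project.get_log(record_key)
if log is None:
    print("No valid log stored for this record yet.")
else:
    print("Loaded log from", log.location)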

get_logs

get_logs(
    type: Literal["generation"],
    status: RecordStatus | None = None,
) -> dict[GenerationRecord, EvalLog]
get_logs(
    type: Literal["evaluation"],
    status: RecordStatus | None = None,
) -> dict[EvaluationRecord, EvalLog]
get_logs(
    type: Literal["generation", "evaluation"],
    status: RecordStatus | None = None,
) -> (
    dict[GenerationRecord, EvalLog]
    | dict[EvaluationRecord, EvalLog]
)

Returns a dictionary of logs for the given type and status. The dictionary is automatically sorted by the corresponding record keys.

Parameters:

type (Literal['generation', 'evaluation']): The type of logs to retrieve. Required.

status (RecordStatus | None): The status of the logs to retrieve. Defaults to None.

Returns:

dict[GenerationRecord, EvalLog] | dict[EvaluationRecord, EvalLog]: A dictionary of logs.

Source code in evalsense/workflow/project.py
def get_logs(
    self,
    type: Literal["generation", "evaluation"],
    status: RecordStatus | None = None,
) -> dict[GenerationRecord, EvalLog] | dict[EvaluationRecord, EvalLog]:
    """Returns a dictionary of logs for the given type and status. The dictionary
    is automatically sorted by the corresponding record keys.

    Args:
        type (Literal["generation", "evaluation"]): The type of logs to retrieve.
        status (RecordStatus | None): The status of the logs to retrieve.
            Defaults to None.

    Returns:
        dict[GenerationRecord | EvaluationRecord, EvalLog]: A dictionary of logs.
    """
    if type == "generation":
        records = self.records.generation
    elif type == "evaluation":
        records = self.records.evaluation
    else:
        raise ValueError(f"Invalid log type: {type}")

    results = {}
    for key, value in records.items():
        if status is not None and value.status != status:
            continue
        if value.log_location is not None:
            log_path = Path(value.log_location)
            if log_path.exists():
                eval_log = read_eval_log(str(log_path))
                if eval_log is not None:
                    results[key] = eval_log

    return dict(sorted(results.items()))
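
A brief usage sketch; the status value "success" is an assumption based on the status check in get_record below:

# Collect all successfully completed generation logs, sorted by record key.
completed = project.get_logs("generation", status="success")
for record, log in completed.items():
    print(record, log.location)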

get_record

get_record(
    record_key: GenerationRecord | EvaluationRecord,
    *,
    init_eval_record_from_generations: bool = False,
) -> ResultRecord | None

Returns the generation or evaluation record for the given key.

Note: Calling this method may initialise a new evaluation record from the matching generation record if the evaluation record does not exist yet and init_eval_record_from_generations is set to True.

Parameters:

record_key (GenerationRecord | EvaluationRecord): The generation or evaluation record to retrieve. Required.

init_eval_record_from_generations (bool): Whether to initialise a new evaluation record if the evaluation record does not exist. Defaults to False. This is only applicable if the record_key is an EvaluationRecord.

Returns:

ResultRecord | None: The generation or evaluation result, or None if a valid record does not exist.

Source code in evalsense/workflow/project.py
def get_record(
    self,
    record_key: GenerationRecord | EvaluationRecord,
    *,
    init_eval_record_from_generations: bool = False,
) -> ResultRecord | None:
    """Returns the generation or evaluation record for the given key.

    Note: Calling this method may initialise a new evaluation record from
    the matching generation record if the evaluation record does not exist
    yet and `init_eval_record_from_generations` is set to True.

    Args:
        record_key (GenerationRecord | EvaluationRecord): The generation
            or evaluation record to retrieve.
        init_eval_record_from_generations (bool): Whether to initialise a new
            evaluation record if the evaluation record does not exist.
            Defaults to False. This is only applicable if the record_key is
            an EvaluationRecord.

    Returns:
        ResultRecord | None: The generation or evaluation result, or None if
            a valid record does not exist.
    """
    if type(record_key) is GenerationRecord:
        return self._retrieve_verify_record(record_key)
    elif type(record_key) is EvaluationRecord:
        retrieved_eval_record = self._retrieve_verify_record(record_key)
        if (
            retrieved_eval_record is not None
            or not init_eval_record_from_generations
        ):
            return retrieved_eval_record

        generation_result = self._retrieve_verify_record(
            record_key.generation_record
        )
        if generation_result is None:
            return None
        if (
            generation_result.status != "success"
            or generation_result.log_location is None
        ):
            self.records.evaluation[record_key] = generation_result
            self._save()
            return generation_result

        # Create a new evaluation log based on the generation log
        log_path = Path(generation_result.log_location)
        evaluator_name = record_key.evaluator_name
        log_time, core_name, random_id = log_path.stem.split("_", 2)
        new_log_path = self.evaluation_log_path / (
            f"{log_time}_{core_name}-{to_safe_filename(evaluator_name)}_"
            + f"{random_id}{log_path.suffix}"
        )
        new_log_path.parent.mkdir(parents=True, exist_ok=True)
        if not new_log_path.exists():
            shutil.copy(log_path, new_log_path)
        new_record = ResultRecord(
            log_location=str(new_log_path),
        )
        self.records.evaluation[record_key] = new_record
        self._save()
        return new_record
    else:
        raise TypeError(f"Invalid record type: {type(record_key)}")
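
An illustrative sketch, where eval_record_key is a hypothetical EvaluationRecord produced elsewhere in the pipeline:

# Fetch the evaluation record; if it does not exist yet, initialise it from
# the matching successful generation record (copying its log file).
record = project.get_record(
    eval_record_key,
    init_eval_record_from_generations=True,
)
if record is not None:
    print(record.status, record.log_location)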

remove

remove() -> None

Removes the project from disk.

Source code in evalsense/workflow/project.py
def remove(self) -> None:
    """Removes the project from disk."""
    if self.project_path.exists():
        shutil.rmtree(self.project_path)

remove_record

remove_record(
    record_key: GenerationRecord | EvaluationRecord,
)

Removes the generation or evaluation record.

Parameters:

record_key (GenerationRecord | EvaluationRecord): The generation or evaluation record to remove. Required.

Source code in evalsense/workflow/project.py
def remove_record(
    self,
    record_key: GenerationRecord | EvaluationRecord,
):
    """Removes the generation or evaluation record.

    Args:
        record_key (GenerationRecord | EvaluationRecord): The generation
            or evaluation record to remove.
    """
    if type(record_key) is GenerationRecord:
        record = self.records.generation.pop(record_key, None)
    elif type(record_key) is EvaluationRecord:
        record = self.records.evaluation.pop(record_key, None)
    else:
        raise TypeError(f"Invalid record type: {type(record_key)}")

    self._remove_log_file(record)
    self._save()

update_record

update_record(
    record_key: GenerationRecord | EvaluationRecord,
    record_value: ResultRecord,
    *,
    init_eval_record_from_generations: bool = False,
)

Updates the generation or evaluation record with the specified result.

Parameters:

record_key (GenerationRecord | EvaluationRecord): The generation or evaluation record to update. Required.

record_value (ResultRecord): The generation or evaluation result. Required.

init_eval_record_from_generations (bool): Whether to initialise a new evaluation record if the evaluation record does not exist. Defaults to False. This is only applicable if the record_key is an EvaluationRecord.

Source code in evalsense/workflow/project.py
def update_record(
    self,
    record_key: GenerationRecord | EvaluationRecord,
    record_value: ResultRecord,
    *,
    init_eval_record_from_generations: bool = False,
):
    """Updates the generation or evaluation record with the specified result.

    Args:
        record_key (GenerationRecord | EvaluationRecord): The generation
            or evaluation record to update.
        record_value (ResultRecord): The generation or evaluation result.
        init_eval_record_from_generations (bool): Whether to initialise a new
            evaluation record if the evaluation record does not exist. Defaults
            to False. This is only applicable if the record_key is an
            EvaluationRecord.
    """
    current_record = self.get_record(
        record_key,
        init_eval_record_from_generations=init_eval_record_from_generations,
    )
    if (
        current_record is not None
        and current_record.log_location is not None
        and current_record.log_location != record_value.log_location
    ):
        self._remove_log_file(current_record)

    if type(record_key) is GenerationRecord:
        self.records.generation[record_key] = record_value
    elif type(record_key) is EvaluationRecord:
        self.records.evaluation[record_key] = record_value
    else:
        raise TypeError(f"Invalid record type: {type(record_key)}")
    self._save()
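
A hedged sketch of storing a result; generation_record_key and new_log_file are hypothetical placeholders, and ResultRecord is constructed with only the log_location field shown in the source above:

# Store (or overwrite) the result for a generation record. If a different
# log file was stored previously, it is removed from disk.
project.update_record(
    generation_record_key,
    ResultRecord(log_location=str(new_log_file)),
)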

ResultAnalyser

Bases: Protocol

A protocol for analysing or aggregating evaluation results.

This class is generic in T to enable returning different types of results.

Methods:

Name Description
__call__

Analyses the evaluation results.

__init__

Initializes the result analyser.

Source code in evalsense/workflow/result_analyser.py
class ResultAnalyser[T](Protocol):
    """A protocol for analysing or aggregating evaluation results.

    This class is generic in T to enable returning different types of results.
    """

    name: str

    def __init__(self, name: str) -> None:
        """Initializes the result analyser.

        Args:
            name (str): The name of the result analyser.
        """
        self.name = name

    @abstractmethod
    def __call__(self, project: Project, **kwargs: dict) -> T:
        """Analyses the evaluation results.

        Args:
            project (Project): The project holding the evaluation data to analyse.
            **kwargs (dict): Additional arguments for the analysis.
        """
        ...

__call__ abstractmethod

__call__(project: Project, **kwargs: dict) -> T

Analyses the evaluation results.

Parameters:

project (Project): The project holding the evaluation data to analyse. Required.

**kwargs (dict): Additional arguments for the analysis.

Source code in evalsense/workflow/result_analyser.py
@abstractmethod
def __call__(self, project: Project, **kwargs: dict) -> T:
    """Analyses the evaluation results.

    Args:
        project (Project): The project holding the evaluation data to analyse.
        **kwargs (dict): Additional arguments for the analysis.
    """
    ...

__init__

__init__(name: str) -> None

Initializes the result analyser.

Parameters:

name (str): The name of the result analyser. Required.

Source code in evalsense/workflow/result_analyser.py
def __init__(self, name: str) -> None:
    """Initializes the result analyser.

    Args:
        name (str): The name of the result analyser.
    """
    self.name = name
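
To illustrate how the protocol is intended to be implemented, here is a minimal toy analyser; the evalsense.workflow import path is an assumption, and the counting metric is made up:

from evalsense.workflow import Project, ResultAnalyser  # import path assumed

class LogCounter(ResultAnalyser[int]):
    """Toy analyser that counts stored evaluation logs (illustrative only)."""

    def __init__(self) -> None:
        # Mirror the protocol's __init__ by giving the analyser a name.
        self.name = "log_counter"

    def __call__(self, project: Project, **kwargs: dict) -> int:
        # Count evaluation records that have a readable log on disk.
        return len(project.get_logs("evaluation"))

analyser = LogCounter()
num_logs = analyser(project)  # `project` is a Project instance, as created earlier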