
SDK

SDK(username=None, password=None, token_provider=None, endpoints=None, auth0=None, webhook=None, project_id=None)

This class provides helper methods to call the Pasqal Cloud endpoints.

To authenticate to Pasqal Cloud, you must provide either an email/password combination or a TokenProvider instance. If you omit the password, you will be prompted to enter one.

The SDK can be initialized with several authentication options:
  • Option 1: No arguments -> Allows unauthenticated access to public features.
  • Option 2: username and password -> Authenticated access using a username and password.
  • Option 3: username only -> Prompts for password during initialization.
  • Option 4 (for developers): Provide a custom token_provider for token-based authentication.
PARAMETER DESCRIPTION
username

Email of the user to log in as.

TYPE: Optional[str] DEFAULT: None

password

Password of the user to log in as.

TYPE: Optional[str] DEFAULT: None

token_provider

An alternative log-in method intended for non-human (machine) users.

TYPE: Optional[TokenProvider] DEFAULT: None

webhook

Webhook URL to which job results are automatically sent.

TYPE: Optional[str] DEFAULT: None

endpoints

Endpoints of the public APIs to target.

TYPE: Optional[Endpoints] DEFAULT: None

auth0

Auth0Conf object defining the Auth0 tenant to target.

TYPE: Optional[Auth0Conf] DEFAULT: None

project_id

ID of the project that owns the batch.

TYPE: Optional[str] DEFAULT: None

Source code in pasqal_cloud/__init__.py
def __init__(
    self,
    username: Optional[str] = None,
    password: Optional[str] = None,
    token_provider: Optional[TokenProvider] = None,
    endpoints: Optional[Endpoints] = None,
    auth0: Optional[Auth0Conf] = None,
    webhook: Optional[str] = None,
    project_id: Optional[str] = None,
):
    """
    This class provides helper methods to call the Pasqal Cloud endpoints.

    To authenticate to Pasqal Cloud, you must provide either an
    email/password combination or a TokenProvider instance.
    If you omit the password, you will be prompted to enter one.


    The SDK can be initialized with several authentication options:
        - Option 1: No arguments -> Allows unauthenticated access to public
            features.
        - Option 2: `username` and `password` -> Authenticated access using a
            username and password.
        - Option 3: `username` only -> Prompts for password during initialization.
        - Option 4 (for developers): Provide a custom `token_provider` for
            token-based authentication.

    Args:
        username: Email of the user to log in as.
        password: Password of the user to log in as.
        token_provider: An alternative log-in method intended for
            non-human (machine) users.
        webhook: Webhook URL to which job results are automatically sent.
        endpoints: Endpoints of the public APIs to target.
        auth0: Auth0Conf object defining the Auth0 tenant to target.
        project_id: ID of the project that owns the batch.
    """
    _check_sdk_version()

    self._client = Client(
        project_id=project_id,
        username=username,
        password=password,
        token_provider=token_provider,
        endpoints=endpoints,
        auth0=auth0,
    )
    self.batches: Dict[str, Batch] = {}
    self.workloads: Dict[str, Workload] = {}
    self.webhook = webhook
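
Example (illustrative sketch): authenticating with a username and password (Option 2). The email, password, and project ID below are placeholders.

from pasqal_cloud import SDK

sdk = SDK(
    username="me@example.com",     # placeholder email
    password="my-password",        # omit to be prompted interactively
    project_id="your-project-id",  # placeholder project ID
)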

add_jobs(batch_id, jobs)

Add jobs to an already existing batch.

PARAMETER DESCRIPTION
batch_id

A unique identifier for the batch data.

TYPE: str

jobs

A collection of CreateJob payloads.

TYPE: List[CreateJob]

RETURNS DESCRIPTION
Batch

An instance of a Batch model from the PCS database.

RAISES DESCRIPTION
JobCreationError

Raised from an HTTPError.

Source code in pasqal_cloud/__init__.py
def add_jobs(self, batch_id: str, jobs: List[CreateJob]) -> Batch:
    """
    Add jobs to an already existing batch.

    Args:
        batch_id: A unique identifier for the batch data.
        jobs: A collection of CreateJob payloads.

    Returns:
        An instance of a Batch model from the PCS database.

    Raises:
        JobCreationError: Raised from an HTTPError.
    """
    try:
        resp = self._client.add_jobs(batch_id, jobs)
    except HTTPError as e:
        raise JobCreationError(e) from e
    return Batch(**resp, _client=self._client)
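
Example (illustrative sketch): adding jobs to an open batch, assuming an authenticated `sdk` instance (see the example above) and an existing `batch_id`. The payload keys ("runs", "variables") are assumptions about the CreateJob model and may vary with SDK version.

jobs = [
    {"runs": 50, "variables": {"omega_max": 6.0}},    # payload keys are assumptions
    {"runs": 100, "variables": {"omega_max": 10.5}},
]
batch = sdk.add_jobs(batch_id, jobs)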

cancel_batch(id)

Cancel the given batch on the PCS.

PARAMETER DESCRIPTION
id

ID of the batch.

TYPE: str

Source code in pasqal_cloud/__init__.py
def cancel_batch(self, id: str) -> Batch:
    """Cancel the given batch on the PCS

    Args:
        id: ID of the batch.
    """
    try:
        batch_rsp = self._client.cancel_batch(id)
    except HTTPError as e:
        raise BatchCancellingError(e) from e
    return Batch(**batch_rsp, _client=self._client)
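
Example (illustrative sketch): cancelling a batch by ID; raises BatchCancellingError if the HTTP call fails. The ID is a placeholder.

batch = sdk.cancel_batch("your-batch-id")
print(batch.id)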

cancel_batches(batch_ids)

Cancel a group of batches by their IDs.

PARAMETER DESCRIPTION
batch_ids

Batch IDs to cancel.

TYPE: List[str]

RETURNS DESCRIPTION
BatchCancellationResponse

A class containing the batches that were cancelled and the IDs of the batches that could not be cancelled, along with the reason.

TYPE: BatchCancellationResponse

RAISES DESCRIPTION
BatchCancellingError

Raised from an HTTPError.

Source code in pasqal_cloud/__init__.py
def cancel_batches(self, batch_ids: List[str]) -> BatchCancellationResponse:
    """
    Cancel a group of batches by their ids.

    Args:
        batch_ids: batch ids to cancel.

    Returns:
        BatchCancellationResponse:
        A class containing the batches that were cancelled and the IDs of
        the batches that could not be cancelled, along with the reason.

    Raises:
        BatchCancellingError: Raised from an HTTPError.

    """

    try:
        response = self._client.cancel_batches(
            batch_ids=batch_ids,
        )
    except HTTPError as e:
        raise BatchCancellingError(e) from e
    return BatchCancellationResponse(
        batches=[
            Batch(**batch, _client=self._client) for batch in response["batches"]
        ],
        errors=response["errors"],
    )

cancel_job(id)

Cancel the given job on the PCS.

PARAMETER DESCRIPTION
id

ID of the job.

TYPE: str

RETURNS DESCRIPTION
Job

The job stored in the PCS database.

TYPE: Job

Source code in pasqal_cloud/__init__.py
def cancel_job(self, id: str) -> Job:
    """Cancel the given job on the PCS

    Args:
        id: ID of the job.

    Returns:
        Job: The job stored in the PCS database.
    """
    try:
        job_rsp = self._client.cancel_job(id)
    except HTTPError as e:
        raise JobCancellingError(e) from e

    return Job(**job_rsp, _client=self._client)
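
Example (illustrative sketch): cancelling a single job; the ID is a placeholder, and the `status` attribute is assumed from the Job model.

job = sdk.cancel_job("your-job-id")
print(job.id, job.status)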

cancel_jobs(batch_id, filters=None)

Cancel a group of jobs matching the filters in a selected batch.

PARAMETER DESCRIPTION
batch_id

ID of the batch.

TYPE: Union[UUID, str]

filters

Filters applied to select the jobs that will be cancelled.

TYPE: Optional[CancelJobFilters] DEFAULT: None

RETURNS DESCRIPTION
JobCancellationResponse

A class containing the jobs that were cancelled and the IDs of the jobs that could not be cancelled, along with the reason.

TYPE: JobCancellationResponse

RAISES DESCRIPTION
JobCancellingError

Raised from an HTTPError.

Source code in pasqal_cloud/__init__.py
def cancel_jobs(
    self,
    batch_id: Union[UUID, str],
    filters: Optional[CancelJobFilters] = None,
) -> JobCancellationResponse:
    """
    Cancel a group of jobs matching the filters in a selected batch.

    Args:
        batch_id: ID of the batch.
        filters: Filters applied to select the jobs that will be cancelled.

    Returns:
        JobCancellationResponse:
        A class containing the jobs that were cancelled and the IDs of the
        jobs that could not be cancelled, along with the reason.

    Raises:
        JobCancellingError: Raised from an HTTPError.

    """
    if filters is None:
        filters = CancelJobFilters()
    elif not isinstance(filters, CancelJobFilters):
        raise TypeError(
            "Filters needs to be a CancelJobFilters instance, "
            f"not a {type(filters)}"
        )

    try:
        response = self._client.cancel_jobs(
            batch_id=batch_id,
            filters=filters,
        )
    except HTTPError as e:
        raise JobCancellingError(e) from e
    return JobCancellationResponse(
        jobs=[Job(**job, _client=self._client) for job in response["jobs"]],
        errors=response["errors"],
    )

cancel_workload(id)

Cancel the given workload on the PCS.

PARAMETER DESCRIPTION
id

Workload id.

TYPE: str

RETURNS DESCRIPTION
workload

The canceled workload.

TYPE: Workload

RAISES DESCRIPTION
WorkloadCancellingError

If cancellation failed.

Source code in pasqal_cloud/__init__.py
def cancel_workload(self, id: str) -> Workload:
    """Cancel the given workload on the PCS.

    Args:
        id: Workload id.

    Returns:
        workload: The canceled workload.

    Raises:
        WorkloadCancellingError: If cancellation failed.
    """
    try:
        workload_rsp = self._client.cancel_workload(id)
    except HTTPError as e:
        raise WorkloadCancellingError(e) from e
    return Workload(**workload_rsp, _client=self._client)
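
Example (illustrative sketch): cancelling a workload; the ID is a placeholder.

workload = sdk.cancel_workload("your-workload-id")
print(workload.id)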

close_batch(batch_id)

Set a batch's 'open' field to False, indicating that no more jobs can be submitted.

PARAMETER DESCRIPTION
batch_id

A unique identifier for the batch data.

TYPE: str

RETURNS DESCRIPTION
Batch

An instance of a Batch model from the PCS database

RAISES DESCRIPTION
BatchClosingError

Raised from an HTTPError.

Source code in pasqal_cloud/__init__.py
def close_batch(self, batch_id: str) -> Batch:
    """
    Set a batch's 'open' field to False, indicating that no more jobs
    can be submitted.

    Args:
        batch_id: A unique identifier for the batch data.

    Returns:
        An instance of a Batch model from the PCS database

    Raises:
        BatchClosingError: Raised from an HTTPError.
    """
    try:
        resp = self._client.close_batch(batch_id)
    except HTTPError as e:
        raise BatchClosingError(e) from e

    return Batch(**resp, _client=self._client)
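
Example (illustrative sketch): closing an open batch so no more jobs can be submitted. The ID is a placeholder, and the `open` attribute is assumed from the Batch model.

batch = sdk.close_batch("your-batch-id")
print(batch.open)  # expected to be False once the batch is closed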

complete_batch(batch_id)

Deprecated, use close_batch instead.

Source code in pasqal_cloud/__init__.py
def complete_batch(self, batch_id: str) -> Batch:
    """
    Deprecated, use close_batch instead.
    """
    warn(
        "This method is deprecated, use close_batch instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.close_batch(batch_id)

create_batch(serialized_sequence, jobs, complete=None, open=None, emulator=None, configuration=None, wait=False, fetch_results=False)

Create a new batch and send it to the API.

PARAMETER DESCRIPTION
serialized_sequence

Serialized pulser sequence.

TYPE: str

complete

Deprecated. The opposite of open.

TYPE: Optional[bool] DEFAULT: None

jobs

List of jobs to be added to the batch at creation.

TYPE: List[CreateJob]

open

Whether the batch accepts more jobs after creation. If set to True, jobs can be added using the Batch.add_jobs method; once all the jobs have been sent, use the Batch.close method. Otherwise, the batch will time out if all jobs have already terminated and no new jobs are sent.

TYPE: Optional[bool] DEFAULT: None

emulator

The type of emulator to use. If set to None, the device_type will be set to the one stored in the serialized sequence.

TYPE: Optional[EmulatorType] DEFAULT: None

configuration

A dictionary with extra configuration for the emulators that accept it.

TYPE: Optional[BaseConfig] DEFAULT: None

wait

Whether to block on this statement until all the submitted jobs have terminated.

TYPE: bool DEFAULT: False

fetch_results

Deprecated. Whether to wait for the batch to be done and fetch results.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Batch

The new batch that has been created in the database.

TYPE: Batch

RAISES DESCRIPTION
BatchCreationError

If batch creation failed.

BatchFetchingError

If batch fetching failed.

Source code in pasqal_cloud/__init__.py
def create_batch(
    self,
    serialized_sequence: str,
    jobs: List[CreateJob],
    complete: Optional[bool] = None,
    open: Optional[bool] = None,
    emulator: Optional[EmulatorType] = None,
    configuration: Optional[BaseConfig] = None,
    wait: bool = False,
    fetch_results: bool = False,
) -> Batch:
    """Create a new batch and send it to the API.

    Args:
        serialized_sequence: Serialized pulser sequence.
        complete: Deprecated. The opposite of open.
        jobs: List of jobs to be added to the batch at creation.
        open: Whether the batch accepts more jobs after creation.
            If set to True, jobs can be added using the `Batch.add_jobs` method;
            once all the jobs have been sent, use the `Batch.close` method.
            Otherwise, the batch will time out if all jobs have already
            terminated and no new jobs are sent.
        emulator: The type of emulator to use.
            If set to None, the device_type will be set to the one
            stored in the serialized sequence.
        configuration: A dictionary with extra configuration for the emulators
            that accept it.
        wait: Whether to block on this statement until all the submitted jobs
            have terminated.
        fetch_results (deprecated): Whether to wait for the batch to
            be done and fetch results


    Returns:
        Batch: The new batch that has been created in the database.

    Raises:
        BatchCreationError: If batch creation failed
        BatchFetchingError: If batch fetching failed
    """
    if complete is not None and open is not None:
        raise OnlyCompleteOrOpenCanBeSet
    if complete is not None:
        warn(
            "Argument `complete` is deprecated and will be removed in a"
            " future version. Please use argument `open` instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        open = not complete
    elif complete is None and open is None:
        open = False
    if fetch_results:
        warn(
            "Argument `fetch_results` is deprecated and will be removed in a"
            " future version. Please use argument `wait` instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        wait = wait or fetch_results

    req = {
        "sequence_builder": serialized_sequence,
        "webhook": self.webhook,
        "jobs": jobs,
        "open": open,
    }

    # the emulator field is only added in the case
    # an emulator job is requested otherwise it's left empty
    if emulator:
        req.update({"emulator": emulator})

    # The configuration field is only added in the case
    # it's requested
    if configuration:
        req.update({"configuration": configuration.to_dict()})  # type: ignore[dict-item]

    try:
        batch_rsp = self._client.send_batch(req)
    except HTTPError as e:
        raise BatchCreationError(e) from e

    batch = Batch(**batch_rsp, _client=self._client)

    if wait:
        while any(
            job.status in {"PENDING", "RUNNING"} for job in batch.ordered_jobs
        ):
            time.sleep(RESULT_POLLING_INTERVAL)
            batch.refresh()

    self.batches[batch.id] = batch
    return batch
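
Example (illustrative sketch): creating a closed batch on an emulator and waiting for results. The job payload keys, the EmulatorType import path and the EMU_FREE member name are assumptions; `sequence` is a pulser Sequence built elsewhere, and `to_abstract_repr` is an assumption about the serialization helper.

from pasqal_cloud.device import EmulatorType  # import path is an assumption

serialized_sequence = sequence.to_abstract_repr()        # serialization helper assumed
jobs = [{"runs": 50, "variables": {"omega_max": 6.0}}]   # payload keys assumed
batch = sdk.create_batch(
    serialized_sequence,
    jobs=jobs,
    emulator=EmulatorType.EMU_FREE,  # enum member name is an assumption
    wait=True,                       # block until all jobs terminate
)
for job in batch.ordered_jobs:
    print(job.id, job.status)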

create_workload(workload_type, backend, config, wait=False)

Create a workload to be scheduled for execution.

PARAMETER DESCRIPTION
workload_type

The type of workload to create.

TYPE: str

backend

The backend to run the workload on.

TYPE: str

config

The config that defines the workload.

TYPE: Dict[str, Any]

wait

Whether to wait for completion to fetch results.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
workload

The created workload instance.

TYPE: Workload

RAISES DESCRIPTION
WorkloadCreationError

If creation failed.

Source code in pasqal_cloud/__init__.py
def create_workload(
    self,
    workload_type: str,
    backend: str,
    config: Dict[str, Any],
    wait: bool = False,
) -> Workload:
    """Create a workload to be scheduled for execution.

    Args:
        workload_type: The type of workload to create.
        backend: The backend to run the workload on.
        config: The config that defines the workload.
        wait: Whether to wait for completion to fetch results.

    Returns:
        workload: The created workload instance.

    Raises:
        WorkloadCreationError: If creation failed.

    """
    req = {
        "workload_type": workload_type,
        "backend": backend,
        "config": config,
    }
    try:
        workload_rsp = self._client.send_workload(req)
    except HTTPError as e:
        raise WorkloadCreationError(e) from e
    if wait:
        workload_rsp = self.wait_for_workload(workload_rsp["id"], workload_rsp)
    workload = Workload(**workload_rsp, _client=self._client)

    self.workloads[workload.id] = workload
    return workload
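
Example (illustrative sketch): submitting a workload and waiting for it to finish. The workload type, backend name, and config keys below are hypothetical.

workload = sdk.create_workload(
    workload_type="some-workload-type",  # hypothetical type
    backend="some-backend",              # hypothetical backend
    config={"n_runs": 100},              # hypothetical config keys
    wait=True,
)
print(workload.id)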

get_batch(id, fetch_results=False)

Retrieve a batch's data and all its jobs.

PARAMETER DESCRIPTION
fetch_results

Deprecated. Whether to wait for the batch to be done and fetch results.

TYPE: bool DEFAULT: False

id

ID of the batch.

TYPE: str

RETURNS DESCRIPTION
Batch

The batch stored in the PCS database.

TYPE: Batch

RAISES DESCRIPTION
BatchFetchingError

If fetching failed.

Source code in pasqal_cloud/__init__.py
def get_batch(self, id: str, fetch_results: bool = False) -> Batch:
    """Retrieve a batch's data and all its jobs.

    Args:
        fetch_results (deprecated): Whether to wait for the batch to be
            done and fetch results
        id: ID of the batch.

    Returns:
        Batch: The batch stored in the PCS database.

    Raises:
        BatchFetchingError: If fetching failed.
    """
    if fetch_results:
        warn(
            "Argument `fetch_results` is deprecated and will be removed in a"
            " future version. Results are fetched anyway with this function.",
            DeprecationWarning,
            stacklevel=2,
        )
    batch_rsp = self._get_batch(id)
    batch = Batch(**batch_rsp, _client=self._client)
    self.batches[batch.id] = batch
    return batch
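
Example (illustrative sketch): fetching a batch and its jobs; the ID is a placeholder, and the `status` attribute is assumed from the Batch model.

batch = sdk.get_batch("your-batch-id")
print(batch.status, len(batch.ordered_jobs))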

get_batches(filters=None, pagination_params=None)

Retrieve a list of batches matching filters using a pagination system.

Batches are sorted by their creation date in descending order.

If no 'pagination_params' are provided, the first 100 batches matching the query will be returned by default.

PARAMETER DESCRIPTION
filters

Filters to be applied to the batches.

TYPE: Optional[BatchFilters] DEFAULT: None

pagination_params

Pagination to be applied to the query.

TYPE: Optional[PaginationParams] DEFAULT: None

Examples:

get_batches(filters=BatchFilters(status=BatchStatus.ERROR)) Returns the first 100 batches with an ERROR status.

get_batches(filters=BatchFilters(status=BatchStatus.ERROR), pagination_params=PaginationParams(offset=100)) Returns batches 101-200 with an ERROR status.

get_batches(filters=BatchFilters(status=BatchStatus.ERROR), pagination_params=PaginationParams(offset=200)) Returns batches 201-300 with an ERROR status.

RETURNS DESCRIPTION
PaginatedResponse

Includes the results of the API and some pagination information.

TYPE: PaginatedResponse

RAISES DESCRIPTION
BatchFetchingError

If the call to fetch batches failed.

TypeError

If filters is not an instance of BatchFilters, or if pagination_params is not an instance of PaginationParams.

Source code in pasqal_cloud/__init__.py
def get_batches(
    self,
    filters: Optional[BatchFilters] = None,
    pagination_params: Optional[PaginationParams] = None,
) -> PaginatedResponse:
    """
    Retrieve a list of batches matching filters using a pagination system.

    Batches are sorted by their creation date in descending order.

    If no 'pagination_params' are provided, the first 100 batches
    matching the query will be returned by default.

    Args:
        filters: Filters to be applied to the batches.
        pagination_params: Pagination to be applied to the query.

    Examples:
    >>> get_batches(filters=BatchFilters(status=BatchStatus.ERROR))
    Returns the first 100 batches with an ERROR status.

    >>> get_batches(filters=BatchFilters(status=BatchStatus.ERROR),
                 pagination_params=PaginationParams(offset=100))
    Returns batches 101-200 with an ERROR status.

    >>> get_batches(filters=BatchFilters(status=BatchStatus.ERROR),
                 pagination_params=PaginationParams(offset=200))
    Returns batches 201-300 with an ERROR status.

    Returns:
        PaginatedResponse: Includes the results of the API and some
            pagination information.

    Raises:
        BatchFetchingError: If the call to fetch batches failed.
        TypeError: If `filters` is not an instance of BatchFilters,
            or if `pagination_params` is not an instance of PaginationParams.
    """

    if pagination_params is None:
        pagination_params = PaginationParams()
    elif not isinstance(pagination_params, PaginationParams):
        raise TypeError(
            f"Pagination parameters needs to be a PaginationParams instance, "
            f"not a {type(pagination_params)}"
        )

    if filters is None:
        filters = BatchFilters()
    elif not isinstance(filters, BatchFilters):
        raise TypeError(
            f"Filters needs to be a BatchFilters instance, not a {type(filters)}"
        )

    try:
        response = self._client.get_batches(
            filters=filters, pagination_params=pagination_params
        )
        batches = response["data"]
        pagination_response = response.get("pagination")
        # It should return a pagination all the time
        total_nb_batches = (
            pagination_response["total"] if pagination_response else 0
        )
    except HTTPError as e:
        raise BatchFetchingError(e) from e
    return PaginatedResponse(
        total=total_nb_batches,
        offset=pagination_params.offset,
        results=[Batch(**batch, _client=self._client) for batch in batches],
    )

get_device_specs_dict()

Retrieve the list of available device specifications.

RETURNS DESCRIPTION
DeviceSpecs

A dictionary of the available device specifications.

TYPE: Dict[str, str]

RAISES DESCRIPTION
DeviceSpecsFetchingError

If fetching of specs failed.

Source code in pasqal_cloud/__init__.py
def get_device_specs_dict(self) -> Dict[str, str]:
    """Retrieve the list of available device specifications.

    Returns:
        DeviceSpecs: A dictionary of the available device specifications.

    Raises:
        DeviceSpecsFetchingError: If fetching of specs failed.
    """

    try:
        return self._client.get_device_specs_dict()
    except HTTPError as e:
        raise DeviceSpecsFetchingError(e) from e
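
Example (illustrative sketch): listing the available devices; each value in the returned Dict[str, str] is assumed to be a serialized specification.

specs = sdk.get_device_specs_dict()
for name in specs:
    print(name)  # device name; specs[name] holds its serialized specification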

get_job(id, wait=False)

Retrieve a job's data.

PARAMETER DESCRIPTION
id

ID of the job.

TYPE: str

wait

Whether to wait for the job to be done.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Job

The job stored in the PCS database.

TYPE: Job

RAISES DESCRIPTION
JobFetchingError

If fetching failed.

Source code in pasqal_cloud/__init__.py
def get_job(self, id: str, wait: bool = False) -> Job:
    """Retrieve a job's data.

    Args:
        id: ID of the job.
        wait: Whether to wait for the job to be done

    Returns:
        Job: The job stored in the PCS database.

    Raises:
        JobFetchingError: If fetching failed.
    """
    job_rsp = self._get_job(id)
    if wait:
        while job_rsp["status"] in ["PENDING", "RUNNING"]:
            time.sleep(RESULT_POLLING_INTERVAL)
            job_rsp = self._get_job(id)
    return Job(**job_rsp, _client=self._client)
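
Example (illustrative sketch): fetching a job and blocking until it terminates (the SDK polls while the status is PENDING or RUNNING). The ID is a placeholder.

job = sdk.get_job("your-job-id", wait=True)
print(job.status)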

get_jobs(filters=None, pagination_params=None)

Retrieve a list of jobs matching filters using a pagination system.

Jobs are sorted by their creation date in descending order.

If no 'pagination_params' are provided, the first 100 jobs matching the query will be returned by default.

PARAMETER DESCRIPTION
filters

Filters to be applied to the jobs.

TYPE: Optional[JobFilters] DEFAULT: None

pagination_params

Pagination to be applied to the query.

TYPE: Optional[PaginationParams] DEFAULT: None

Examples:

get_jobs(filters=JobFilters(status=JobStatus.ERROR)) Returns the first 100 jobs with an ERROR status.

get_jobs(filters=JobFilters(status=JobStatus.ERROR), pagination_params=PaginationParams(offset=100)) Returns jobs 101-200 with an ERROR status.

get_jobs(filters=JobFilters(status=JobStatus.ERROR), pagination_params=PaginationParams(offset=200)) Returns jobs 201-300 with an ERROR status.

RETURNS DESCRIPTION
PaginatedResponse

Includes the results of the API and some pagination information.

TYPE: PaginatedResponse

RAISES DESCRIPTION
JobFetchingError

If the call to fetch jobs failed.

TypeError

If filters is not an instance of JobFilters or if pagination_params is not an instance of PaginationParams.

Source code in pasqal_cloud/__init__.py
def get_jobs(
    self,
    filters: Optional[JobFilters] = None,
    pagination_params: Optional[PaginationParams] = None,
) -> PaginatedResponse:
    """
    Retrieve a list of jobs matching filters using a pagination system.

    Jobs are sorted by their creation date in descending order.

    If no 'pagination_params' are provided, the first 100 jobs
    matching the query will be returned by default.

    Args:
        filters: Filters to be applied to the jobs.
        pagination_params: Pagination to be applied to the query.

    Examples:
    >>> get_jobs(filters=JobFilters(status=JobStatus.ERROR))
    Returns the first 100 jobs with an ERROR status.

    >>> get_jobs(filters=JobFilters(status=JobStatus.ERROR),
                 pagination_params=PaginationParams(offset=100))
    Returns jobs 101-200 with an ERROR status.

    >>> get_jobs(filters=JobFilters(status=JobStatus.ERROR),
                 pagination_params=PaginationParams(offset=200))
    Returns jobs 201-300 with an ERROR status.

    Returns:
        PaginatedResponse: Includes the results of the API and some
            pagination information.

    Raises:
        JobFetchingError: If the call to fetch jobs failed.
        TypeError: If `filters` is not an instance of JobFilters
                or if `pagination_params` is not an instance of PaginationParams.
    """

    if pagination_params is None:
        pagination_params = PaginationParams()
    elif not isinstance(pagination_params, PaginationParams):
        raise TypeError(
            "Pagination parameters needs to be a PaginationParams instance, "
            f"not a {type(pagination_params)}"
        )

    if filters is None:
        filters = JobFilters()
    elif not isinstance(filters, JobFilters):
        raise TypeError(
            f"Filters needs to be a JobFilters instance, not a {type(filters)}"
        )

    try:
        response = self._client.get_jobs(
            filters=filters, pagination_params=pagination_params
        )
        jobs = response["data"]
        pagination_response = response.get("pagination")
        # It should return a pagination all the time
        total_nb_jobs = pagination_response["total"] if pagination_response else 0
    except HTTPError as e:
        raise JobFetchingError(e) from e
    return PaginatedResponse(
        total=total_nb_jobs,
        offset=pagination_params.offset,
        results=[Job(**job, _client=self._client) for job in jobs],
    )

get_workload(id, wait=False)

Retrieve a workload's data.

PARAMETER DESCRIPTION
id

ID of the workload.

TYPE: str

wait

Whether to wait for the workload to be done.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
workload

The workload stored in the PCS database.

TYPE: Workload

RAISES DESCRIPTION
WorkloadFetchingError

If fetching failed.

Source code in pasqal_cloud/__init__.py
def get_workload(self, id: str, wait: bool = False) -> Workload:
    """Retrieve a workload's data.

    Args:
        id: ID of the workload.
        wait: Whether to wait for the workload to be done.

    Returns:
        workload: The workload stored in the PCS database.

    Raises:
        WorkloadFetchingError: If fetching failed.
    """
    workload_rsp = self._get_workload(id)
    if wait:
        workload_rsp = self.wait_for_workload(id, workload_rsp)
    return Workload(**workload_rsp, _client=self._client)
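
Example (illustrative sketch): fetching a workload and waiting for it to finish; the ID is a placeholder, and the `status` attribute is assumed from the Workload model.

workload = sdk.get_workload("your-workload-id", wait=True)
print(workload.status)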

rebatch(id, filters=None)

Retry a group of jobs matching filters in a new batch.

PARAMETER DESCRIPTION
id

ID of the batch to re-create.

TYPE: Union[UUID, str]

filters

Filters applied to select the jobs that will be retried.

TYPE: Optional[RebatchFilters] DEFAULT: None

RETURNS DESCRIPTION
Batch

An instance of the rescheduled Batch model. Its fields can differ from the original batch, as the record is recreated to prevent modifying the original batch.

RAISES DESCRIPTION
RebatchError

If the rebatch call failed.

Source code in pasqal_cloud/__init__.py
def rebatch(
    self,
    id: Union[UUID, str],
    filters: Optional[RebatchFilters] = None,
) -> Batch:
    """
    Retry a group of jobs matching filters in a new batch.

    Args:
        id: ID of the batch to re-create.
        filters: Filters applied to select the jobs that will be retried.

    Returns:
        An instance of the rescheduled Batch model. Its fields
        can differ from the original batch, as the record
        is recreated to prevent modifying the original batch.

    Raises:
        RebatchError: If the rebatch call failed.
    """
    if filters is None:
        filters = RebatchFilters()
    elif not isinstance(filters, RebatchFilters):
        raise TypeError(
            f"Filters needs to be a RebatchFilters instance, not a {type(filters)}"
        )

    try:
        new_batch_data = self._client.rebatch(
            batch_id=id,
            filters=filters,
        )
    except HTTPError as e:
        raise RebatchError(e) from e
    return Batch(**new_batch_data, _client=self._client)
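
Example (illustrative sketch): retrying only the errored jobs of a batch in a fresh batch. The import paths and the `status` field on RebatchFilters are assumptions for illustration; the ID is a placeholder.

from pasqal_cloud import RebatchFilters  # import path is an assumption
from pasqal_cloud.job import JobStatus   # path assumed

new_batch = sdk.rebatch(
    "original-batch-id",
    filters=RebatchFilters(status=JobStatus.ERROR),  # field name assumed
)
print(new_batch.id)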