Skip to content
Pasqal Documentation

Batch

Batch(**data)

Bases: BaseModel

Class to load batch data return by the API.

A batch groups up several jobs with the same sequence. When a batch is assigned to a QPU, all its jobs are run sequentially and no other batch can be assigned to the device until all its jobs are done and declared complete.

ATTRIBUTE DESCRIPTION
open

Whether the batch accepts more jobs or not.

TYPE: bool

complete

Opposite of open, deprecated.

TYPE: bool

created_at

Timestamp of the creation of the batch.

TYPE: str

updated_at

Timestamp of the last update of the batch.

TYPE: str

device_type

Type of device to run the batch on.

TYPE: str

project_id

ID of the owner project of the batch.

TYPE: str

id

Unique identifier for the batch.

TYPE: str

user_id

Unique identifier of the user that created the batch.

TYPE: str

status

Status of the batch. Possible values are: PENDING, RUNNING, DONE, CANCELED, TIMED_OUT, ERROR, PAUSED.

TYPE: str

webhook

Webhook where the job results are automatically sent to.

TYPE: Optional[str]

_client

A Client instance to connect to PCS.

TYPE: Client

sequence_builder

Pulser sequence of the batch.

TYPE: Optional[str]

start_datetime

Timestamp of the time the batch was sent to the QPU.

TYPE: Optional[str]

end_datetime

Timestamp of when the batch process was finished.

TYPE: Optional[str]

device_status

Status of the device where the batch is running.

TYPE: Optional[str]

jobs

Dictionary of all the jobs added to the batch.

TYPE: deprecated

ordered_jobs

List of all the jobs added to the batch, ordered by creation time.

TYPE: List[Job]

jobs_count

Number of jobs added to the batch.

TYPE: int

jobs_count_per_status

Number of jobs per status.

TYPE: Dict[str, int]

configuration

Further configuration for certain emulators.

TYPE: Union[BaseConfig, Dict[str, Any], None]

Source code in pasqal_cloud/batch.py
def __init__(self, **data: Any) -> None:
    # Workaround to make the private attribute '_client' working
    # like we need with Pydantic V2, more information on
    # https://docs.pydantic.dev/latest/concepts/models/#private-model-attributes
    super().__init__(**data)
    self._client = data["_client"]
    if data.get("jobs"):
        self._ordered_jobs = [
            Job(**raw_job, _client=self._client) for raw_job in data["jobs"]
        ]

jobs property writable

Once the 'ordered_jobs' is built, we need to keep the 'jobs' attribute for backward compatibility with the code written by the users of the sdk

add_jobs(jobs, wait=False)

Add some jobs to batch for execution on PCS and returns the updated batch.

The batch must be open otherwise the API will return an error. The new jobs are appended to the ordered_jobs list attribute.

PARAMETER DESCRIPTION
jobs

List of jobs to be added to the batch.

TYPE: List[CreateJob]

wait

If True, blocks until all jobs in the batch are done.

TYPE: bool DEFAULT: False

Source code in pasqal_cloud/batch.py
def add_jobs(
    self,
    jobs: List[CreateJob],
    wait: bool = False,
) -> None:
    """Add some jobs to batch for execution on PCS and returns the updated batch.

    The batch must be `open` otherwise the API will return an error.
    The new jobs are appended to the `ordered_jobs` list attribute.

    Args:
        jobs: List of jobs to be added to the batch.
        wait: If True, blocks until all jobs in the batch are done.

    """
    try:
        batch_rsp = self._client.add_jobs(self.id, jobs)
    except HTTPError as e:
        raise JobCreationError(e) from e
    self._update_from_api_response(batch_rsp)

    if wait:
        while any(
            job.status in {"PENDING", "RUNNING"} for job in self.ordered_jobs
        ):
            time.sleep(RESULT_POLLING_INTERVAL)
            self.refresh()

cancel()

Cancel the current batch on the PCS.

Source code in pasqal_cloud/batch.py
def cancel(self) -> None:
    """Cancel the current batch on the PCS."""
    try:
        batch_rsp = self._client.cancel_batch(self.id)
    except HTTPError as e:
        raise BatchCancellingError(e) from e
    self._update_from_api_response(batch_rsp)

close(wait=False, fetch_results=False)

Declare to PCS that the batch is closed and returns an updated batch instance.

PARAMETER DESCRIPTION
wait

Whether to wait for the batch to be done and fetch results.

TYPE: bool DEFAULT: False

fetch_results

Whether to wait for the batch to be done and fetch results.

TYPE: deprecated DEFAULT: False

A batch that is closed awaits no extra jobs. All jobs previously added will be executed before the batch is terminated. When all its jobs are done, the closed batch is unassigned to its running device.

Source code in pasqal_cloud/batch.py
def close(self, wait: bool = False, fetch_results: bool = False) -> None:
    """Declare to PCS that the batch is closed and returns an updated
    batch instance.

    Args:
        wait: Whether to wait for the batch to be done and fetch results.
        fetch_results (deprecated): Whether to wait for the batch \
            to be done and fetch results.

    A batch that is closed awaits no extra jobs. All jobs previously added
    will be executed before the batch is terminated. When all its jobs are done,
    the closed batch is unassigned to its running device.
    """
    try:
        batch_rsp = self._client.close_batch(self.id)
    except HTTPError as e:
        raise BatchClosingError(e) from e
    self._update_from_api_response(batch_rsp)
    if wait or fetch_results:
        while any(
            job.status in {"PENDING", "RUNNING"} for job in self.ordered_jobs
        ):
            time.sleep(RESULT_POLLING_INTERVAL)
            self.refresh()

declare_complete(wait=False, fetch_results=False)

Deprecated, use close instead.

Source code in pasqal_cloud/batch.py
def declare_complete(self, wait: bool = False, fetch_results: bool = False) -> None:
    """
    Deprecated, use close instead.
    """
    warn(
        "This method is deprecated, use close instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.close(wait=wait, fetch_results=fetch_results)

refresh()

Fetch the batch from the API and update it in place.

Source code in pasqal_cloud/batch.py
def refresh(self) -> None:
    """Fetch the batch from the API and update it in place."""
    try:
        batch_rsp = self._client.get_batch(self.id)
    except HTTPError as e:
        raise BatchFetchingError(e) from e
    self._update_from_api_response(batch_rsp)

retry(job, wait=False)

Retry a job in the same batch. The batch should not be 'closed'. The new job is appended to the ordered_jobs list attribute.

PARAMETER DESCRIPTION
job

The job to retry

TYPE: Job

wait

Whether to wait for job completion

TYPE: bool DEFAULT: False

RAISES DESCRIPTION
JobRetryError

if there was an error adding the job to the batch.

Source code in pasqal_cloud/batch.py
def retry(self, job: Job, wait: bool = False) -> None:
    """
    Retry a job in the same batch.
    The batch should not be 'closed'.
    The new job is appended to the `ordered_jobs` list attribute.

    Args:
        job: The job to retry
        wait: Whether to wait for job completion

    Raises:
        JobRetryError: if there was an error adding the job to the batch.
    """
    retried_job = CreateJob(runs=job.runs, variables=job.variables)
    try:
        self.add_jobs([retried_job], wait=wait)
    except JobCreationError as e:
        raise JobRetryError from e