API

Client

class oqc_qcaas_sdk.client.OqcSdk(url: str, authentication_token: str | None = None, email: str | None = None, password: str | None = None, auto_authenticate: bool = True, default_qpu_id: str | None = None)

Async SDK for OQC Quantum Computing as a Service.

Tasks are first-class objects with methods (submit/execute/refresh/wait).

async close() None

Close the SDK connection opened with open().

Releases the underlying HTTP client and its connection pool. Safe to call even if the SDK was never opened.

Example (Jupyter notebook):

# Final cell — always close, even if earlier cells raised
await sdk.close()

See also

open

Counterpart that opens the connection.

create_job(*, program: str | list[str], qpu_id: str | list[str] | None = None, config: CompilerConfig | ExperimentalConfig | list[CompilerConfig | ExperimentalConfig] | None = None, fetch_diagnostics: bool = False) Job | CompositeJob

Create one or more mutable Jobs bound to this client.

  • If both program and qpu_id are scalars, returns a single Job.

  • If either is a list, returns a CompositeJob containing one Job per (program, qpu_id) combination.

  • If qpu_id is None, uses the default_qpu_id (must be set first).

  • If fetch_diagnostics is True, the job will fetch timing, metrics, and metadata from the server each time a run reaches a terminal state. The results are stored on job.timings, job.metrics, and job.metadata respectively. Fetch failures are silently suppressed so the job result is never affected.
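
The scalar-versus-list rule above amounts to a Cartesian product over programs and QPU IDs. The following is a minimal sketch of that expansion in plain Python. expand_combinations is a hypothetical helper, not an SDK function, and the order in which the SDK enumerates combinations is an assumption.

```python
from itertools import product


def expand_combinations(program, qpu_id):
    """Return every (program, qpu_id) pair for scalar or list inputs.

    Hypothetical helper for illustration; not part of oqc_qcaas_sdk.
    """
    programs = program if isinstance(program, list) else [program]
    qpu_ids = qpu_id if isinstance(qpu_id, list) else [qpu_id]
    # product() varies the last argument fastest: all QPUs for the first
    # program, then all QPUs for the second program, and so on.
    return list(product(programs, qpu_ids))


pairs = expand_combinations(["prog_a", "prog_b"], ["qpu_1", "qpu_2"])
for prog, qpu in pairs:
    print(prog, qpu)
```

Two programs and two QPU IDs therefore yield four pairs, while two scalars yield exactly one.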

property default_qpu_id: str | None

Get the default QPU ID for this SDK instance.

from_snapshot(snapshot: JobSnapshot | CompositeJobSnapshot) Job | CompositeJob

Rehydrate a Job or CompositeJob from a snapshot model.

The live client is attached automatically. Use this when you already have a snapshot object in memory (e.g. from job.to_snapshot()). To load from a file, use sdk.load(path) instead.

Parameters:

snapshot – A JobSnapshot or CompositeJobSnapshot as returned by job.to_snapshot() or composite.to_snapshot().

Returns:

Fully restored object with this SDK instance attached.

Return type:

Job | CompositeJob

Raises:

ValueError – If the snapshot’s schema version is not supported.

async get_calibration(qpu_id: str | None = None, date_filter: date | datetime | str | None = None) Any

Get QPU calibration data.

Parameters:
  • qpu_id – QPU identifier. Uses the default QPU if not provided.

  • date_filter – Optional date to filter calibration data. Accepts a datetime.date, datetime.datetime, or an ISO-format string 'YYYY-MM-DD'. Raises ValueError for malformed strings.

Returns:

Dict containing QPU calibration information
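
The three accepted date_filter forms can be reduced to a single datetime.date before use. The sketch below shows one way to do that with the standard library only. normalize_date_filter is a hypothetical helper, and how the SDK itself parses the value is not documented here.

```python
from datetime import date, datetime


def normalize_date_filter(value):
    """Coerce a date, datetime, or 'YYYY-MM-DD' string to a datetime.date.

    Hypothetical helper for illustration; not part of oqc_qcaas_sdk.
    """
    if value is None:
        return None
    # datetime is a subclass of date, so it must be checked first.
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, date):
        return value
    if isinstance(value, str):
        # strptime raises ValueError when the string does not match.
        return datetime.strptime(value, "%Y-%m-%d").date()
    raise TypeError(f"unsupported date_filter type: {type(value).__name__}")
```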

async get_features(qpu_id: str | None = None) Any

Get QPU feature set (capabilities, max shots, gate set, etc.).

Parameters:

qpu_id – QPU identifier. Uses the default QPU if not provided.

Returns:

Dict containing QPU features like Maximum Shots, Max Entangled Measurements, etc.

async get_next_window(*, qpu_id: str | None = None) datetime | None

Return next execution window for a given QPU.

Parameters:

qpu_id – QPU identifier. Uses the default QPU if not provided.

Returns:

The start of the next available execution window as a datetime, or None if no window is currently scheduled.

async get_system_status(qpu_id: str | None = None) Any

Get QPU system status (health, availability).

Parameters:

qpu_id – QPU identifier. If provided, returns status for that QPU. If None, returns status for all QPUs.

Returns:

Dict or list of dicts containing QPU status information

async list_qpus() Any

Return QPU metadata.

load(path: str | Path) Job | CompositeJob

Load a Job or CompositeJob from a JSON file written by save().

Reads the file, determines whether it contains a single Job or a CompositeJob, validates the schema version, and returns the restored object with this SDK instance attached.

Parameters:

path – Path to a .json file previously written by job.save() or composite.save().

Returns:

Fully restored object with this SDK instance attached.

Return type:

Job | CompositeJob

Raises:
  • ValueError – If the file contains invalid JSON, does not conform to the expected snapshot schema, has an unsupported schema version, or has an unrecognised snapshot_kind.

  • FileNotFoundError – If path does not exist.
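
The validation steps listed under Raises can be pictured as a short chain of checks on the decoded JSON. The sketch below uses only the standard library. The field names schema_version and snapshot_kind and the values '1', 'job', and 'composite' are taken from the snapshot model signatures in the Persistence section. detect_snapshot_kind and the set of supported versions are assumptions for illustration, not the SDK's implementation.

```python
import json

SUPPORTED_SCHEMA_VERSIONS = {"1"}  # assumption: '1' is the only known version
KNOWN_KINDS = {"job", "composite"}


def detect_snapshot_kind(text):
    """Validate snapshot JSON text and return its snapshot_kind.

    Hypothetical helper for illustration; not part of oqc_qcaas_sdk.
    """
    try:
        payload = json.loads(text)
    except json.JSONDecodeError as exc:
        # Re-raise with a clearer message; the cause is kept for debugging.
        raise ValueError(f"invalid JSON: {exc}") from exc
    if not isinstance(payload, dict):
        raise ValueError("snapshot must be a JSON object")
    version = payload.get("schema_version")
    if not isinstance(version, str) or version not in SUPPORTED_SCHEMA_VERSIONS:
        raise ValueError(f"unsupported schema version: {version!r}")
    kind = payload.get("snapshot_kind")
    if not isinstance(kind, str) or kind not in KNOWN_KINDS:
        raise ValueError(f"unrecognised snapshot_kind: {kind!r}")
    return kind
```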

async open() OqcSdk

Open the SDK connection without using a context manager.

Use open() and close() when you need the SDK to persist across multiple cells in a Jupyter notebook, where the async with block scope would otherwise force all work into a single cell.

Returns:

self, so instantiation and connection can be chained:

    sdk = await OqcSdk(url, token).open()

Return type:

OqcSdk

Example (Jupyter notebook):

# Cell 1: connect once
sdk = OqcSdk(url="https://cloud.oqc.app", authentication_token="…")
await sdk.open()

# Cell 2, 3, …: use sdk freely across cells
job = sdk.create_job(program=program, qpu_id=qpu_id)
result = await job.execute()

# Final cell: release resources
await sdk.close()

See also

close

Counterpart that releases the connection.
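
Outside a notebook, the open()/close() pair fits naturally into a try/finally block so that the connection is released even when the work raises. The sketch below demonstrates the pattern with a stand-in class. DemoClient and run_with_cleanup are hypothetical and only mimic the documented shape of open() (returns self) and close() (safe to call at any time).

```python
import asyncio


class DemoClient:
    """Stand-in with the same open()/close() shape; not the real OqcSdk."""

    def __init__(self):
        self.is_open = False

    async def open(self):
        self.is_open = True
        return self  # returning self allows chaining after construction

    async def close(self):
        self.is_open = False  # harmless if the client was never opened


async def run_with_cleanup(client, work):
    """Open the client, await work(client), and always close afterwards."""
    await client.open()
    try:
        return await work(client)
    finally:
        # Runs on success and on exception alike.
        await client.close()
```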

set_default_qpu(qpu_id: str) None

Set the default QPU ID for this SDK instance.

Parameters:

qpu_id – QPU identifier to use as default

Example

>>> from oqc_qcaas_sdk import OqcSdk
>>> sdk = OqcSdk(url="https://example.com", authentication_token="token")
>>> sdk.set_default_qpu("qpu:uk:1:12345")
>>> sdk.default_qpu_id
'qpu:uk:1:12345'

Jobs

class oqc_qcaas_sdk.job.CompositeJob(jobs: list[Job] | None = None)

A Job that coordinates multiple jobs and can execute them all.

property active_jobs: list[Job]

Jobs whose latest run is still in-flight (not terminal).

async cancel() CompositeJob

Cancel all component jobs concurrently.

async cancel_if_active() CompositeJob

Cancel all in-flight child jobs concurrently. No-op for terminal children.

property completed: bool

True if every child job’s latest run completed successfully.

property completed_jobs: list[Job]

Return list of successfully completed jobs.

async execute(*, timeout_s: float | None = 10.0) JobOutputProxy

Execute all component jobs using batch submission and concurrent polling.

Per-child logic:

  • Active jobs (already in-flight): resume polling only.

  • Terminal or fresh jobs: reset if terminal, then batch-submit.

Parameters:

timeout_s (float, optional) – Maximum time to wait for each job to complete. Default is 10 seconds.

Returns:

Collection of results and errors from all jobs

Return type:

JobOutputProxy
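
A per-job timeout combined with concurrent waiting can be expressed with asyncio.wait_for and asyncio.gather. The sketch below illustrates that general technique with stand-in coroutines. run_all and fake_job are hypothetical names, and nothing here is taken from the SDK's actual implementation.

```python
import asyncio


async def run_all(coros, timeout_s=10.0):
    """Await coroutines concurrently, applying timeout_s to each one.

    Hypothetical helper for illustration; not part of oqc_qcaas_sdk.
    Returns a list holding each result, or the exception it raised.
    """
    guarded = [asyncio.wait_for(c, timeout=timeout_s) for c in coros]
    # return_exceptions=True keeps one failure from aborting the others.
    return await asyncio.gather(*guarded, return_exceptions=True)


async def fake_job(name, delay_s):
    """Stand-in for a job that finishes after delay_s seconds."""
    await asyncio.sleep(delay_s)
    return name


async def demo():
    # The fast job finishes in time; the slow one exceeds the timeout.
    return await run_all(
        [fake_job("fast", 0.01), fake_job("slow", 1.0)], timeout_s=0.1
    )
```

Collecting exceptions alongside results mirrors the idea of returning both results and errors in one container, so a single slow or failed job does not hide the others.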

property failed_jobs: list[Job]

Return list of failed jobs.

property outputs: JobOutputProxy

Outputs from the most recent execute() call across all child jobs.

save(path: str | Path) None

Save this composite job’s complete state to a JSON file.

The file can be reloaded in a later session via sdk.load(path). For in-memory use, pass composite.to_snapshot() to sdk.from_snapshot() instead.

Parameters:

path – File path to write. The file is created or overwritten.

property terminal_jobs: list[Job]

Jobs whose latest run has reached a terminal state.

to_snapshot() CompositeJobSnapshot

Return a serialisable snapshot of this composite job’s complete state.

Captures the full state of every child job including their execution history and results. Use save() to persist to a file, or pass the returned CompositeJobSnapshot to sdk.from_snapshot().

Returns:

Pydantic model representing this composite job and all its children.

Return type:

CompositeJobSnapshot

class oqc_qcaas_sdk.job.Job(program: str, qpu_id: str, config: CompilerConfig | ExperimentalConfig | None = None, task_id: uuid.UUID | None = None, tag: str | None = None, hybrid_marker: str | None = None, fetch_diagnostics: bool = False, _client: Any = None)

Mutable job handle for task management.

A Job encapsulates:

  • the input settings (program, qpu_id, config)

  • the task_id once submitted

  • the latest known state (status/result/error/metadata)

  • history of previous results/errors

Diagnostic attributes (populated when fetch_diagnostics=True is passed to create_job()):

  • timings — wall-clock and queue timing breakdown (dict | None)

  • metrics — gate counts, circuit depth, and similar metrics (dict | None)

  • metadata — compiler and hardware metadata (dict | None)

async cancel() Job

Cancel this job.

async cancel_if_active() Job

Cancel this job if the latest run is still in-flight. No-op if terminal.

Returns self for chaining.

async execute(*, timeout_s: float | None = 10.0) JobOutputProxy

Submit (or resume), wait, and return outputs for this run.

  • If the latest run is active (CREATED/SUBMITTED/RUNNING/UNKNOWN): resumes polling.

  • If the latest run is terminal (COMPLETED/FAILED/CANCELLED): resets and submits a fresh run.

Returns:

Container holding the result or error for this run.

Return type:

JobOutputProxy
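
The two branches of execute() depend only on whether the latest state is active or terminal. The sketch below encodes that classification with a stand-in enum whose member names are copied from the bullets above. DemoState and next_action are hypothetical and do not reproduce the SDK's JobState or its internal logic.

```python
from enum import Enum


class DemoState(Enum):
    """Stand-in mirroring the state names listed above; not the SDK's JobState."""

    CREATED = "CREATED"
    SUBMITTED = "SUBMITTED"
    RUNNING = "RUNNING"
    UNKNOWN = "UNKNOWN"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"


ACTIVE = frozenset(
    {DemoState.CREATED, DemoState.SUBMITTED, DemoState.RUNNING, DemoState.UNKNOWN}
)
TERMINAL = frozenset({DemoState.COMPLETED, DemoState.FAILED, DemoState.CANCELLED})


def next_action(state):
    """Return which documented branch applies to the given state."""
    if state in ACTIVE:
        return "resume polling"
    return "reset and submit"
```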

property history: JobOutputProxy

Append-only record of all run outputs (oldest first).

Includes the current run’s output once it completes. Use .all() to iterate all entries, .first for the most recent.

property outputs: JobOutputProxy

The output (result or error) for the latest completed run.

Returns an empty JobOutputProxy if no run has completed yet.

raise_for_status() None

Raise exception if job failed.

reconfigure(memento: JobOutput | None = None, **kwargs)

Re-configure this job for re-submission or re-execution using an existing JobOutput as a template. Additional overrides can be provided as keyword arguments. Always resets the task_id and the state to CREATED.

Parameters:
  • memento (JobOutput, optional) – A previous JobOutput to use as a template for re-initialization.

  • program (str, optional) – Override the program for this job.

  • qpu_id (str, optional) – Override the QPU ID for this job.

  • config (CompilerConfig, optional) – Override the compiler configuration.

async refresh() Job

Refresh status/result/error into this object.

save(path: str | Path) None

Save this job’s complete state to a JSON file.

The file can be reloaded in a later session via sdk.load(path). For in-memory use, pass job.to_snapshot() to sdk.from_snapshot() instead.

Parameters:

path – File path to write. The file is created or overwritten.

async submit(*, timeout_s: float | None = 30.0) Job

Submit this job to the QPU queue.

Parameters:

timeout_s (float, optional) – Maximum time to wait for submission to complete. Default is 30 seconds.

to_snapshot() JobSnapshot

Return a serialisable snapshot of this job’s complete state.

Captures program, QPU, config, current state, latest result or error, and the full execution history. The live client and asyncio lock are excluded — both are attached fresh when the snapshot is loaded via sdk.from_snapshot() or sdk.load().

Use save() to persist directly to a file, or pass the returned JobSnapshot to sdk.from_snapshot() to rehydrate in a new session.

Returns:

Pydantic model representing this job’s full state.

Return type:

JobSnapshot

async wait(timeout_s: float | None = 10.0) Job

Wait for job to complete.

class oqc_qcaas_sdk.job.JobState(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)

Results & Output

class oqc_qcaas_sdk.job_output.JobError(error_code: int | None = None, error_message: str | None = None, qpu_id: str | None = None, task_id: uuid.UUID | None = None, compiler_config: CompilerConfig | None = None, program: str | None = None, received: datetime.datetime | None = None, job: Job | None = None)

Failed task output.

classmethod from_json(json_str: str, job: Job | None = None)

Deserialize from JSON string.

to_dict() dict

Serialize to dict.

to_json() str

Serialize to JSON string.

class oqc_qcaas_sdk.job_output.JobOutput(qpu_id: str | None = None, task_id: uuid.UUID | None = None, compiler_config: CompilerConfig | None = None, program: str | None = None, received: datetime.datetime | None = None, job: Job | None = None)

Historical job output for failed or completed status.

classmethod from_json(json_str: str, job: Job | None = None)

Deserialize from JSON string.

to_dict() dict

Serialize to dict.

to_json() str

Serialize to JSON string.

class oqc_qcaas_sdk.job_output.JobResult(data: Mapping[str, Any] | None = None, qpu_id: str | None = None, task_id: uuid.UUID | None = None, compiler_config: CompilerConfig | None = None, program: str | None = None, received: datetime.datetime | None = None, job: Job | None = None)

Successful task output with measurement counts.

classmethod from_json(json_str: str, job: Job | None = None)

Deserialize from JSON string.

to_counts_dataarray(creg: str, endianness: Endianness = Endianness.BIG) DataArray

Convert this result to a (1, n_bitstrings) counts DataArray.

Convenience wrapper around results_to_counts_dataarray(). Requires BinaryCount results format (the SDK default).

to_dict() dict

Serialize to dict.

to_json() str

Serialize to JSON string.

class oqc_qcaas_sdk.job_output_proxy.AttrPolicy(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)

class oqc_qcaas_sdk.job_output_proxy.JobOutputProxy(items: T | Iterable | None = None, attr_policy: AttrPolicy = AttrPolicy.RAISE)

Proxy that wraps one or more job outputs behind a uniform interface, so that single-job and batch outputs can be handled the same way.

like(pattern: str, creg: str, endianness: Endianness = Endianness.BIG) DataArray

Return a counts DataArray filtered to bitstrings matching a wildcard pattern.

Combines to_counts_dataarray() and filter_counts_dataarray() in one call. Pattern characters: '0'/'1' exact match, '*' wildcard. The pattern must be exactly as long as the bitstrings in creg and written in the same endianness convention. Errors in the proxy are skipped with a UserWarning (same behaviour as to_counts_dataarray()).

raise_for_error() JobOutputProxy[T]

Raises the first JobError found. Returns self otherwise.

to_counts_dataarray(creg: str, endianness: Endianness = Endianness.BIG) DataArray

Convert results to a counts scipp DataArray.

Only JobResult items contribute rows; any JobError items are skipped with a UserWarning. Call raise_for_error() first for strict all-or-nothing behaviour.

See results_to_counts_dataarray() for the full DataArray structure and the endianness parameter.
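
The difference between the lenient behaviour (skip errors with a warning) and the strict behaviour (raise on the first error) can be shown with plain Python objects. In the sketch below, DemoResult, DemoError, keep_results, and raise_on_first_error are hypothetical stand-ins. They illustrate the two policies and are not the SDK's classes or methods.

```python
import warnings
from dataclasses import dataclass


@dataclass
class DemoResult:
    """Stand-in for a successful output."""

    counts: dict


@dataclass
class DemoError:
    """Stand-in for a failed output."""

    message: str


def keep_results(items):
    """Lenient policy: drop errors, emitting one UserWarning per drop."""
    kept = []
    for item in items:
        if isinstance(item, DemoError):
            warnings.warn(f"skipping error: {item.message}", UserWarning, stacklevel=2)
            continue
        kept.append(item)
    return kept


def raise_on_first_error(items):
    """Strict policy: raise on the first error, otherwise return items."""
    for item in items:
        if isinstance(item, DemoError):
            raise RuntimeError(item.message)
    return items
```

Calling the strict function before the lenient one gives all-or-nothing behaviour: either every item is a result, or nothing is converted.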

Data Analysis Utilities

Shared utilities for scipp DataArray conversion.

class oqc_qcaas_sdk.dataarray_utils.Endianness(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)

Bit-ordering convention for bitstring interpretation.

OQC natively produces big-endian bitstrings (leftmost character is the most-significant bit). Qiskit and several other quantum SDKs use little-endian (leftmost character is the least-significant bit).

Pass this to results_to_counts_dataarray(), bits(), and like() whenever you need to work in a convention other than OQC’s default.

BIG = 'big'

Big-endian: leftmost character is the most-significant bit (OQC default).

LITTLE = 'little'

Little-endian: leftmost character is the least-significant bit (Qiskit convention).

oqc_qcaas_sdk.dataarray_utils.bits(s: str, endianness: Endianness = Endianness.BIG) Variable

Convert a bitstring to a unit-less scipp scalar for use as a DataArray index.

Returns the physical (big-endian) integer value regardless of endianness, so bits("011", BIG) and bits("110", LITTLE) both return 3 and select the same DataArray column.

Raises ValueError if s is empty or contains non-binary characters.
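
The endianness rule can be checked with a few lines of plain Python: a little-endian string is reversed before it is parsed as a base-2 integer. bitstring_to_int below is a hypothetical stand-in that returns a plain int rather than a scipp Variable, and it takes the endianness as the strings 'big' or 'little' instead of the Endianness enum.

```python
def bitstring_to_int(s, endianness="big"):
    """Return the integer value of a bitstring under the given convention.

    Hypothetical helper for illustration; not part of oqc_qcaas_sdk.
    """
    if not s or any(ch not in "01" for ch in s):
        raise ValueError(f"not a non-empty binary string: {s!r}")
    if endianness == "little":
        # Leftmost character is the least-significant bit, so reverse it.
        s = s[::-1]
    elif endianness != "big":
        raise ValueError(f"unknown endianness: {endianness!r}")
    return int(s, 2)


print(bitstring_to_int("011", "big"))     # 3
print(bitstring_to_int("110", "little"))  # 3
```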

oqc_qcaas_sdk.dataarray_utils.filter_counts_dataarray(da: DataArray, pattern: str) DataArray

Filter a counts DataArray to columns whose "bitstring" coordinate matches a wildcard pattern ('0'/'1' exact, '*' wildcard).

The pattern must be written in the same Endianness convention that was used to build da, and must be exactly as long as the bitstrings in da.

Prefer like(), which builds the DataArray and filters in one call.

Raises ValueError for an empty/invalid pattern or a length mismatch.
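
The wildcard rule is a character-by-character comparison in which '*' matches either bit. The sketch below applies it to a plain dict of counts instead of a DataArray. matches and filter_counts are hypothetical helpers that only illustrate the matching rule and the documented error cases.

```python
def matches(pattern, bitstring):
    """Return True if bitstring fits pattern ('0'/'1' exact, '*' wildcard).

    Hypothetical helper for illustration; not part of oqc_qcaas_sdk.
    """
    if not pattern or any(ch not in "01*" for ch in pattern):
        raise ValueError(f"invalid pattern: {pattern!r}")
    if len(pattern) != len(bitstring):
        raise ValueError("pattern and bitstring lengths differ")
    return all(p == "*" or p == b for p, b in zip(pattern, bitstring))


def filter_counts(counts, pattern):
    """Keep only the entries whose bitstring key matches pattern."""
    return {bits: n for bits, n in counts.items() if matches(pattern, bits)}


counts = {"00": 5, "01": 3, "10": 2, "11": 1}
print(filter_counts(counts, "1*"))  # {'10': 2, '11': 1}
```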

oqc_qcaas_sdk.dataarray_utils.results_to_counts_dataarray(results: Sequence[JobResult], creg: str, endianness: Endianness = Endianness.BIG) sc.DataArray

Convert a sequence of JobResult objects to a 2-D counts scipp DataArray.

Requires BinaryCount results format (the SDK default).

The returned array has dim="result" (one row per result) and dim="bit" (one column per unique observed bitstring, filled with 0 where absent). It carries three coordinates: "result" (row index) on the "result" axis, and "bit" (physical big-endian integer) and "bitstring" (string label in the requested endianness) on the "bit" axis.

See Analysing Results with DataArrays for usage examples.

Parameters:
  • results – Non-empty sequence of JobResult objects.

  • creg – Name of the classical register to extract counts from.

  • endianness – Bit-ordering convention for "bitstring" labels. BIG (default) matches OQC’s native notation; LITTLE matches Qiskit’s convention.

Raises:
  • ValueError – If results is empty or bitstrings in creg have inconsistent lengths.

  • KeyError – If creg is not found in any of the supplied results.
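
The row-and-column layout described above can be reproduced with nested lists: collect every bitstring seen in any result, then fill each row with that result's count or 0. The sketch below works on plain dicts rather than JobResult objects and returns lists rather than a scipp DataArray. counts_matrix is a hypothetical helper, and sorting the columns is an assumption about ordering.

```python
def counts_matrix(all_counts):
    """Build a dense counts table from a list of {bitstring: count} dicts.

    Hypothetical helper for illustration; not part of oqc_qcaas_sdk.
    Returns (columns, rows): the column labels and one row per input dict.
    """
    if not all_counts:
        raise ValueError("at least one result is required")
    # Union of every bitstring observed in any result (sorted: an assumption).
    columns = sorted({bits for counts in all_counts for bits in counts})
    if len({len(bits) for bits in columns}) > 1:
        raise ValueError("bitstrings have inconsistent lengths")
    # Bitstrings missing from a result are filled with 0.
    rows = [[counts.get(bits, 0) for bits in columns] for counts in all_counts]
    return columns, rows


columns, rows = counts_matrix([{"00": 4, "11": 6}, {"01": 10}])
print(columns)  # ['00', '01', '11']
print(rows)     # [[4, 0, 6], [0, 10, 0]]
```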

Protocols

class oqc_qcaas_sdk.job_like.JobLike(*args, **kwargs)

Structural interface satisfied by both Job and CompositeJob.

Consumers should type-annotate with JobLike when they want to accept either kind of job without branching on the concrete type.
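
Structural typing of this kind is what typing.Protocol provides: any object with matching members satisfies the annotation, with no shared base class. The sketch below shows the idea with stand-ins. Executable, SingleDemo, and BatchDemo are hypothetical, and the actual members required by JobLike are not listed here, so the single execute() method is an assumption.

```python
import asyncio
from typing import Protocol, runtime_checkable


@runtime_checkable
class Executable(Protocol):
    """Hypothetical stand-in for a JobLike-style protocol."""

    async def execute(self, *, timeout_s=10.0): ...


class SingleDemo:
    """Stand-in for a single job."""

    async def execute(self, *, timeout_s=10.0):
        return ["one"]


class BatchDemo:
    """Stand-in for a composite job."""

    async def execute(self, *, timeout_s=10.0):
        return ["a", "b"]


async def run(job: Executable):
    # Works for any object with a matching execute(); no type branching.
    return await job.execute(timeout_s=1.0)
```

A caller annotated this way accepts either stand-in, which is the same convenience the protocol offers for Job and CompositeJob.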

Persistence

Pydantic v2 models for Job and CompositeJob persistence.

These models define the at-rest schema for saved jobs. They are deliberately separate from the runtime Job classes — all runtime objects are plain Python; Pydantic is used only for serialization, validation, and discriminated-union dispatch on the kind field.

Users interact with these indirectly via:
  • job.to_snapshot() / job.save(path)

  • sdk.from_snapshot(snapshot) / sdk.load(path)

class oqc_qcaas_sdk.job_snapshot.CompositeJobSnapshot(*, schema_version: str = '1', snapshot_kind: Literal['composite'] = 'composite', jobs: list[JobSnapshot] = <factory>)

Complete serializable state of a CompositeJob.

Contains a snapshot of every child Job. Loading this via sdk.from_snapshot() restores the full CompositeJob with all children, their state, results, and history.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class oqc_qcaas_sdk.job_snapshot.JobErrorSnapshot(*, qpu_id: str | None = None, task_id: UUID | None = None, program: str | None = None, received: datetime | None = None, config_json: str | None = None, config_type: Literal['CompilerConfig', 'ExperimentalConfig'] | None = None, timings: dict | None = None, metrics: dict | None = None, metadata: dict | None = None, kind: Literal['error'] = 'error', error_code: int | None = None, error_message: str | None = None)

Snapshot of a failed run output.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class oqc_qcaas_sdk.job_snapshot.JobOutputSnapshot(*, qpu_id: str | None = None, task_id: UUID | None = None, program: str | None = None, received: datetime | None = None, config_json: str | None = None, config_type: Literal['CompilerConfig', 'ExperimentalConfig'] | None = None, timings: dict | None = None, metrics: dict | None = None, metadata: dict | None = None)

Fields shared by both result and error history items.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class oqc_qcaas_sdk.job_snapshot.JobResultSnapshot(*, qpu_id: str | None = None, task_id: UUID | None = None, program: str | None = None, received: datetime | None = None, config_json: str | None = None, config_type: Literal['CompilerConfig', 'ExperimentalConfig'] | None = None, timings: dict | None = None, metrics: dict | None = None, metadata: dict | None = None, kind: Literal['result'] = 'result', data: dict = <factory>)

Snapshot of a successful run output.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class oqc_qcaas_sdk.job_snapshot.JobSnapshot(*, schema_version: str = '1', snapshot_kind: Literal['job'] = 'job', program: str, qpu_id: str, config_json: str | None = None, config_type: Literal['CompilerConfig', 'ExperimentalConfig'] | None = None, tag: str | None = None, hybrid_marker: str | None = None, timings: dict | None = None, metrics: dict | None = None, metadata: dict | None = None, task_id: UUID | None = None, state: Literal['CREATED', 'SUBMITTED', 'RUNNING', 'COMPLETED', 'FAILED', 'CANCELLED'] = 'CREATED', raise_on_error: bool = False, fetch_diagnostics: bool = False, result: JobResultSnapshot | None = None, error: JobErrorSnapshot | None = None, history: list[Annotated[JobResultSnapshot | JobErrorSnapshot, FieldInfo(annotation=NoneType, required=True, discriminator='kind')]] = <factory>)

Complete serializable state of a single Job.

Captures everything needed to restore a Job in a later session: program, QPU, config, current state, latest result or error, and the full execution history.

The live HTTP client and asyncio lock are excluded — both are created fresh when the snapshot is loaded via sdk.from_snapshot() or sdk.load().

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

Exceptions

exception oqc_qcaas_sdk.exceptions.JobFailed(task_id: str, qpu_id: str | None = None, code: int | None = None, details: str | None = None)