Migrating from oqc-qcaas-client

oqc-qcaas-client is the original Python library for OQC’s Quantum Computing as a Service platform. oqc-qcaas-sdk is a modern alternative, built around async-first design and a Job-centric abstraction that removes the manual overhead required to use the older client.

This page shows eight scenarios where the new SDK is simpler, safer, or more capable — with side-by-side code comparisons. Each “After” example is a runnable doctest. See Quick Start to get up and running in minutes.

Batch results without the scaffolding

Submitting multiple circuits in the qcaas-client is possible with execute_tasks(), but polling happens sequentially per-task and the returned List[QPUTaskResult] has no built-in helpers for separating successes from failures. Every script writes its own loop.

oqc-qcaas-client (before):

tasks = [QPUTask(program=prog_a), QPUTask(program=prog_b)]
results = client.execute_tasks(tasks=tasks, qpu_id=qpu_id)

# No built-in filtering — manually check every result
successes = [r for r in results if not r.has_errored()]
counts    = [r.result for r in successes]

oqc-qcaas-sdk (now):

>>> async def batch_results():
...     async with OqcSdk(url=URL, authentication_token=TOKEN) as client:
...         cjob = client.create_job(program=[PROG_H, PROG_X], qpu_id=QPU_ID)
...         outputs = await cjob.execute(timeout_s=20)
...         return len(outputs.results().all()), len(outputs.errors().all()), cjob.completed
>>> run_async(batch_results())
(2, 0, True)

All jobs are polled concurrently via asyncio.gather. outputs.results() and outputs.errors() return filtered JobOutputProxy objects — no if checks required.

Manual task ID tracking

execute_tasks() blocks until every task completes — sequentially, with no timeout and no way to do other work in the meantime. When you need non-blocking submission (fire-and-forget, running multiple batches in parallel, or polling from a later session), you must drop down to schedule_tasks(), save every task_id yourself, and write your own polling loop.

oqc-qcaas-client (before):

import time

tasks = [QPUTask(program=prog_a), QPUTask(program=prog_b)]

# schedule_tasks mutates each QPUTask in-place, setting .task_id
scheduled = client.schedule_tasks(tasks, qpu_id=qpu_id)

# Save IDs yourself — they are the only handle to these tasks
task_ids = [t.task_id for t in scheduled]

# Poll manually — sequential, one task at a time
results = {}
pending = list(task_ids)
while pending:
    for task_id in list(pending):
        info = client.get_task(task_id, qpu_id)
        status = info.get("status")
        if status == "COMPLETED":
            results[task_id] = client.get_task_results(task_id, qpu_id)
            pending.remove(task_id)
        elif status == "FAILED":
            results[task_id] = client.get_task_errors(task_id, qpu_id)
            pending.remove(task_id)
        # else: still running — nothing to do but come back later
    if pending:
        time.sleep(2)   # manual back-off; no retry logic

# Results are keyed by task_id, not by program
for task_id, result in results.items():
    if result.has_errored():
        print(f"{task_id}: ERROR — {result.error_details}")
    else:
        print(f"{task_id}: {result.result}")

oqc-qcaas-sdk (now):

>>> async def submit_and_poll():
...     async with OqcSdk(url=URL, authentication_token=TOKEN) as client:
...         cjob = client.create_job(program=[PROG_H, PROG_X], qpu_id=QPU_ID)
...         await cjob.execute(timeout_s=20)   # concurrent submit + poll
...         successes = cjob.outputs.results().all()
...         failures  = cjob.outputs.errors().all()
...         return cjob.completed, len(successes), len(failures)
>>> run_async(submit_and_poll())
(True, 2, 0)

Submit and poll happen in a single await call. Results stay bound to the Job object that produced them — accessible by index via cjob.jobs[n] or by filtering cjob.outputs — rather than being keyed by an opaque UUID. Although you can manually call submit() and refresh() in the SDK, there is little benefit in doing so, as execute() is already non-blocking. These fine-grained methods are not available on a CompositeJob.

Running the same circuit on multiple QPUs

The qcaas-client enforces that every task in a batch must target the same QPU — passing tasks with different QPU IDs to schedule_tasks() raises a ValueError. Running an identical circuit on two QPUs requires two separate client calls and stitching the results together manually.

oqc-qcaas-client (before):

# Tasks with different qpu_ids raise ValueError:
# "All tasks must have the same qpu_id"
task_a = QPUTask(program=qasm, qpu_id="qpu:uk:1:aaaa")
task_b = QPUTask(program=qasm, qpu_id="qpu:uk:2:bbbb")
# client.schedule_tasks([task_a, task_b])  # <-- ValueError

# Workaround: two separate blocking calls, then merge manually
results_qpu1 = client.execute_tasks([QPUTask(program=qasm)], qpu_id="qpu:uk:1:aaaa")
results_qpu2 = client.execute_tasks([QPUTask(program=qasm)], qpu_id="qpu:uk:2:bbbb")
all_results = results_qpu1 + results_qpu2

oqc-qcaas-sdk (now):

async with OqcSdk(url=url, authentication_token=token) as sdk:
    # Pass a list of QPU IDs — one Job is created per QPU, polled in parallel
    cjob = sdk.create_job(
        program=qasm,
        qpu_id=["qpu:uk:1:aaaa", "qpu:uk:2:bbbb"],
    )
    outputs = await cjob.execute(timeout_s=60)

    for result in outputs.results().all():
        print(result.qpu_id, result.data)

Broadcasting dispatches to each QPU in parallel and returns a single JobOutputProxy with results from all of them. The same rule extends across program, qpu_id, and config — see Composite Jobs for the full broadcasting table.

One persistent job tracks every experiment

The qcaas-client has no concept of a persistent job. Each call to execute_tasks() takes a freshly constructed QPUTask, and previous results are entirely disconnected from the new one. Iterating over configurations requires building your own accumulation list and manually reconstructing a task when you want to replay a past run.

oqc-qcaas-client (before):

# Every experiment needs a brand-new QPUTask; previous results are disconnected
past_runs = []
for shots in [500, 1024]:
    config = CompilerConfig(repeats=shots)
    result = client.execute_tasks([QPUTask(program=qasm, config=config)], qpu_id=qpu_id)[0]
    past_runs.append((config, result))

# To replay the first experiment: reconstruct from notes you kept yourself
first_config, _ = past_runs[0]
client.execute_tasks([QPUTask(program=qasm, config=first_config)], qpu_id=qpu_id)

oqc-qcaas-sdk (now):

>>> async def iterate_experiments():
...     async with OqcSdk(url=URL, authentication_token=TOKEN) as client:
...         job = client.create_job(program=PROG_H, qpu_id=QPU_ID)
...         for shots in [500, 1024]:
...             job.reconfigure(config=CompilerConfig(repeats=shots))
...             await job.execute(timeout_s=10)
...         # history is oldest-first; .first is the earliest experiment
...         job.reconfigure(memento=job.history.results().first)
...         await job.execute(timeout_s=10)
...         shot_counts = job.history.results().map(lambda r: r.compiler_config.repeats)
...         return len(job.history.results().all()), shot_counts
>>> run_async(iterate_experiments())
(3, [500, 1024, 500])

Every execute() call appends to job.history automatically. Each history item records the compiler_config that was actually submitted, so there is nothing to track manually. reconfigure(memento=item) restores program, QPU, and config from any past JobResult — one call, no reconstruction. See Re-running a Job in Working with Jobs for the full API.

Execution history as first-class data

After execute() returns in qcaas-client, the result lives in a local variable. Comparing runs requires managing your own accumulation list with no built-in way to filter by outcome.

oqc-qcaas-client (before):

# Accumulate results manually across runs
all_results = []
for cfg in configs_to_try:
    all_results.append(client.execute_tasks([QPUTask(program=qasm, config=cfg)], qpu_id=qpu_id)[0])

failures  = [r for r in all_results if r.has_errored()]
successes = [r for r in all_results if not r.has_errored()]

oqc-qcaas-sdk (now):

>>> async def access_history():
...     async with OqcSdk(url=URL, authentication_token=TOKEN) as client:
...         job = client.create_job(program=PROG_BAD, qpu_id=QPU_ID)
...         await job.execute(timeout_s=10)
...         job.reconfigure(program=PROG_H)
...         await job.execute(timeout_s=10)
...         return (
...             len(job.history.all()),
...             len(job.history.errors().all()),
...             len(job.history.results().all()),
...         )
>>> run_async(access_history())
(2, 1, 1)

Every call to execute() appends to job.history. history is a JobOutputProxy, so .errors(), .results(), .map(), and .filter() work on it directly — the same interface used for CompositeJob outputs.

Save, close, and resume

Closing the Python process with qcaas-client results in losing everything that hasn’t been explicitly saved. The only way to “persist” a run is to write the task ID to a file yourself and re-query results in a later session — with no program, no configuration, and no history attached.

oqc-qcaas-client (before):

# Save the task ID yourself before the session ends
task_id = task.task_id  # write to a file or database manually

# Later session: re-query by ID - relies on the task still being available on the server
# No history, no program, no config — just the raw result dict
result = client.get_task_results(task_id, qpu_id)

oqc-qcaas-sdk (now):

>>> import tempfile, os
>>> sdk = OqcSdk(url="https://example.com", authentication_token="dummy")
>>> job = sdk.create_job(program="OPENQASM 2.0;", qpu_id="qpu:uk:2:d865b5a184")
>>> with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as f:
...     tmp = f.name
>>> job.save(tmp)
>>> restored = sdk.load(tmp)
>>> os.unlink(tmp)
>>> restored.program == job.program
True

The saved file captures the full job state: program, QPU, compiler configuration, current state, result, error, and the complete execution history. Resuming in a new session is a single sdk.load() call:

async with OqcSdk(url=url, authentication_token=token) as sdk:
    job = sdk.load("my_experiment.json")
    print(job.history.all())   # all previous runs are intact
    await job.execute()        # submit a fresh run

For the full persistence API, including CompositeJob snapshots and schema versioning, see Saving and Loading Jobs.

One-call diagnostics

Retrieving timing, metrics, and metadata from qcaas-client requires three separate HTTP round-trips — easy to forget and impossible to batch.

oqc-qcaas-client (before):

# Three separate calls after the task completes
timings  = client.get_task_timings(task_id, qpu_id)
metrics  = client.get_task_metrics(task_id, qpu_id)
metadata = client.get_task_metadata(task_id, qpu_id)

oqc-qcaas-sdk (now):

job_with_diagnostics = sdk.create_job(program=qasm, qpu_id=qpu_id, fetch_diagnostics=True)
await job_with_diagnostics.execute(timeout_s=30)

print(job_with_diagnostics.timings)    # dict | None
print(job_with_diagnostics.metrics)    # dict | None
print(job_with_diagnostics.metadata)   # dict | None

All three are fetched concurrently once the job reaches a terminal state. If one fetch fails (e.g. the data is not yet available) that attribute remains None while the others are unaffected — a missing diagnostic never blocks access to the result. The flag is persisted across save() / load() cycles so a restored job continues to collect diagnostics on subsequent runs.

For details see Execution Diagnostics in Working with Jobs.

Functional result processing

execute_tasks() in qcaas-client returns a plain Python list — no built-in filtering or chaining.

oqc-qcaas-client (before):

results = client.execute_tasks(tasks=tasks, qpu_id=qpu_id)

counts = []
for r in results:
    if not r.has_errored():
        counts.append(r.result["c"])

oqc-qcaas-sdk (now):

>>> async def manipulate_results():
...     async with OqcSdk(url=URL, authentication_token=TOKEN) as client:
...         cjob = client.create_job(program=[PROG_H, PROG_X], qpu_id=QPU_ID)
...         outputs = await cjob.execute(timeout_s=20)
...         counts_list = outputs.results().map(lambda r: r.data["c"])
...         return len(counts_list), all("c" in r.data for r in outputs.results().all())
>>> run_async(manipulate_results())
(2, True)

JobOutputProxy provides .map(fn), .filter(pred), .results(), .errors(), .all(), .first, and .one. The same interface works on job.outputs, job.history, and CompositeJob outputs, so code written for one type works automatically with the other.

For workflows that continue into numerical analysis, the result can be passed directly into NumPy without a separate accumulation step:

counts_list = outputs.results().map(lambda r: r.data["c"])
# Each element is a dict like {"0": 480, "1": 520}

import numpy as np
counts_array = np.array([[c.get("0", 0), c.get("1", 0)] for c in counts_list])
# counts_array.shape == (2, 2) for 2 circuits

See Composite Jobs for the full JobOutputProxy API.

Getting Started

Ready to migrate? See Quick Start for the minimal connection and first-job pattern, then Working with Jobs for the complete Job API reference.