Migrating from oqc-qcaas-client¶
oqc-qcaas-client is the original Python library
for OQC’s Quantum Computing as a Service platform. oqc-qcaas-sdk is a
modern alternative, built around an async-first design and a Job-centric
abstraction that removes the manual overhead required to use the older client.
This page shows eight scenarios where the new SDK is simpler, safer, or more capable — with side-by-side code comparisons. Each “After” example is a runnable doctest. See Quick Start to get up and running in minutes.
Batch results without the scaffolding¶
Submitting multiple circuits in the qcaas-client is possible with
execute_tasks(), but polling happens sequentially per-task and the
returned List[QPUTaskResult] has no built-in helpers for separating
successes from failures. Every script writes its own loop.
oqc-qcaas-client (before):
tasks = [QPUTask(program=prog_a), QPUTask(program=prog_b)]
results = client.execute_tasks(tasks=tasks, qpu_id=qpu_id)
# No built-in filtering — manually check every result
successes = [r for r in results if not r.has_errored()]
counts = [r.result for r in successes]
oqc-qcaas-sdk (now):
>>> async def batch_results():
...     async with OqcSdk(url=URL, authentication_token=TOKEN) as client:
...         cjob = client.create_job(program=[PROG_H, PROG_X], qpu_id=QPU_ID)
...         outputs = await cjob.execute(timeout_s=20)
...         return len(outputs.results().all()), len(outputs.errors().all()), cjob.completed
>>> run_async(batch_results())
(2, 0, True)
All jobs are polled concurrently via asyncio.gather. outputs.results()
and outputs.errors() return filtered JobOutputProxy objects — no
if checks required.
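The concurrency gain can be seen in isolation with plain asyncio. The sketch below uses a stand-in coroutine (poll is hypothetical, not an SDK call) to show why gathering two polls takes roughly as long as the slowest one, rather than the sum of both as in a sequential per-task loop:

```python
import asyncio
import time

async def poll(task_id: str, delay: float) -> str:
    # Stand-in for polling one job until it reaches a terminal state.
    await asyncio.sleep(delay)
    return f"{task_id}: COMPLETED"

async def main() -> tuple[list[str], float]:
    start = time.monotonic()
    # Both polls run at once, so total wall time is roughly the longest
    # single poll, not the sum of both.
    statuses = await asyncio.gather(poll("job-a", 0.1), poll("job-b", 0.1))
    return list(statuses), time.monotonic() - start

statuses, elapsed = asyncio.run(main())
print(statuses)           # order matches the order passed to gather
print(f"{elapsed:.2f}s")  # ~0.1 s, not ~0.2 s
```

asyncio.gather preserves argument order in its result list, which is why the SDK can keep outputs aligned with the programs that produced them.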
Manual task ID tracking¶
execute_tasks() blocks until every task completes — sequentially, with no
timeout and no way to do other work in the meantime. When you need
non-blocking submission (fire-and-forget, running multiple batches in
parallel, or polling from a later session), you must drop down to
schedule_tasks(), save every task_id yourself, and write your own
polling loop.
oqc-qcaas-client (before):
import time
tasks = [QPUTask(program=prog_a), QPUTask(program=prog_b)]
# schedule_tasks mutates each QPUTask in-place, setting .task_id
scheduled = client.schedule_tasks(tasks, qpu_id=qpu_id)
# Save IDs yourself — they are the only handle to these tasks
task_ids = [t.task_id for t in scheduled]
# Poll manually — sequential, one task at a time
results = {}
pending = list(task_ids)
while pending:
    for task_id in list(pending):
        info = client.get_task(task_id, qpu_id)
        status = info.get("status")
        if status == "COMPLETED":
            results[task_id] = client.get_task_results(task_id, qpu_id)
            pending.remove(task_id)
        elif status == "FAILED":
            results[task_id] = client.get_task_errors(task_id, qpu_id)
            pending.remove(task_id)
        # else: still running — nothing to do but come back later
    if pending:
        time.sleep(2)  # manual back-off; no retry logic
# Results are keyed by task_id, not by program
for task_id, result in results.items():
    if result.has_errored():
        print(f"{task_id}: ERROR — {result.error_details}")
    else:
        print(f"{task_id}: {result.result}")
oqc-qcaas-sdk (now):
>>> async def submit_and_poll():
...     async with OqcSdk(url=URL, authentication_token=TOKEN) as client:
...         cjob = client.create_job(program=[PROG_H, PROG_X], qpu_id=QPU_ID)
...         await cjob.execute(timeout_s=20)  # concurrent submit + poll
...         successes = cjob.outputs.results().all()
...         failures = cjob.outputs.errors().all()
...         return cjob.completed, len(successes), len(failures)
>>> run_async(submit_and_poll())
(True, 2, 0)
Submit and poll happen in a single await call. Results stay bound to the
Job object that produced them — accessible by index via cjob.jobs[n]
or by filtering cjob.outputs — rather than being keyed by an opaque UUID.
Although you can manually call submit() and refresh() in the SDK, there
is little benefit in doing so, as execute() is already non-blocking. These
fine-grained methods are not available on a CompositeJob.
Running the same circuit on multiple QPUs¶
The qcaas-client enforces that every task in a batch must target the same
QPU — passing tasks with different QPU IDs to schedule_tasks() raises a
ValueError. Running an identical circuit on two QPUs requires two separate
client calls and stitching the results together manually.
oqc-qcaas-client (before):
# Tasks with different qpu_ids raise ValueError:
# "All tasks must have the same qpu_id"
task_a = QPUTask(program=qasm, qpu_id="qpu:uk:1:aaaa")
task_b = QPUTask(program=qasm, qpu_id="qpu:uk:2:bbbb")
# client.schedule_tasks([task_a, task_b]) # <-- ValueError
# Workaround: two separate blocking calls, then merge manually
results_qpu1 = client.execute_tasks([QPUTask(program=qasm)], qpu_id="qpu:uk:1:aaaa")
results_qpu2 = client.execute_tasks([QPUTask(program=qasm)], qpu_id="qpu:uk:2:bbbb")
all_results = results_qpu1 + results_qpu2
oqc-qcaas-sdk (now):
async with OqcSdk(url=url, authentication_token=token) as sdk:
    # Pass a list of QPU IDs — one Job is created per QPU, polled in parallel
    cjob = sdk.create_job(
        program=qasm,
        qpu_id=["qpu:uk:1:aaaa", "qpu:uk:2:bbbb"],
    )
    outputs = await cjob.execute(timeout_s=60)
    for result in outputs.results().all():
        print(result.qpu_id, result.data)
Broadcasting dispatches to each QPU in parallel and returns a single
JobOutputProxy with results from all of them. The same rule extends
across program, qpu_id, and config — see Composite Jobs
for the full broadcasting table.
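The scalar-to-list expansion behind this can be sketched in plain Python. This is an illustrative guess at the semantics (a scalar repeated to match the longest list, numpy-style), and expand is a hypothetical helper; the authoritative rules are in the Composite Jobs broadcasting table:

```python
def expand(program, qpu_id):
    """Pair each program with each QPU target.

    Illustrative broadcast rule: a scalar argument is repeated
    to match the length of the longest list argument.
    """
    progs = program if isinstance(program, list) else [program]
    qpus = qpu_id if isinstance(qpu_id, list) else [qpu_id]
    n = max(len(progs), len(qpus))
    progs = progs * n if len(progs) == 1 else progs
    qpus = qpus * n if len(qpus) == 1 else qpus
    if len(progs) != len(qpus):
        raise ValueError("list lengths must match or be 1")
    return list(zip(progs, qpus))

# One program broadcast over two QPUs -> two (program, qpu) jobs
print(expand("OPENQASM 2.0;", ["qpu:uk:1:aaaa", "qpu:uk:2:bbbb"]))
```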
One persistent job tracks every experiment¶
The qcaas-client has no concept of a persistent job. Each call to
execute_tasks() takes a freshly constructed QPUTask, and previous
results are entirely disconnected from the new one. Iterating over
configurations requires building your own accumulation list and manually
reconstructing a task when you want to replay a past run.
oqc-qcaas-client (before):
# Every experiment needs a brand-new QPUTask; previous results are disconnected
past_runs = []
for shots in [500, 1024]:
    config = CompilerConfig(repeats=shots)
    result = client.execute_tasks([QPUTask(program=qasm, config=config)], qpu_id=qpu_id)[0]
    past_runs.append((config, result))
# To replay the first experiment: reconstruct from notes you kept yourself
first_config, _ = past_runs[0]
client.execute_tasks([QPUTask(program=qasm, config=first_config)], qpu_id=qpu_id)
oqc-qcaas-sdk (now):
>>> async def iterate_experiments():
...     async with OqcSdk(url=URL, authentication_token=TOKEN) as client:
...         job = client.create_job(program=PROG_H, qpu_id=QPU_ID)
...         for shots in [500, 1024]:
...             job.reconfigure(config=CompilerConfig(repeats=shots))
...             await job.execute(timeout_s=10)
...         # history is oldest-first; .first is the earliest experiment
...         job.reconfigure(memento=job.history.results().first)
...         await job.execute(timeout_s=10)
...         shot_counts = job.history.results().map(lambda r: r.compiler_config.repeats)
...         return len(job.history.results().all()), shot_counts
>>> run_async(iterate_experiments())
(3, [500, 1024, 500])
Every execute() call appends to job.history automatically. Each
history item records the compiler_config that was actually submitted, so
there is nothing to track manually. reconfigure(memento=item) restores
program, QPU, and config from any past JobResult — one call, no
reconstruction. See Re-running a Job in Working with Jobs for the full API.
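The memento mechanics reduce to a small snapshot-and-restore pattern. A toy model (ToyJob and Snapshot are illustrative stand-ins, not SDK classes) shows the moving parts:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Snapshot:
    program: str
    qpu_id: str
    repeats: int

class ToyJob:
    def __init__(self, program, qpu_id, repeats=1000):
        self.program, self.qpu_id, self.repeats = program, qpu_id, repeats
        self.history = []

    def execute(self):
        # Record exactly what was submitted for this run.
        self.history.append(Snapshot(self.program, self.qpu_id, self.repeats))

    def reconfigure(self, memento=None, **changes):
        if memento is not None:  # restore a past run in one call
            self.program = memento.program
            self.qpu_id = memento.qpu_id
            self.repeats = memento.repeats
        for name, value in changes.items():
            setattr(self, name, value)

job = ToyJob("OPENQASM 2.0;", "qpu:uk:1:aaaa", repeats=500)
job.execute()
job.reconfigure(repeats=1024)
job.execute()
job.reconfigure(memento=job.history[0])  # replay the first experiment
job.execute()
print([s.repeats for s in job.history])  # [500, 1024, 500]
```

Because the snapshot is frozen, a replayed configuration cannot be mutated by later runs, mirroring the doctest above where the history ends at [500, 1024, 500].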
Execution history as first-class data¶
After execute() returns in qcaas-client, the result lives in a local
variable. Comparing runs requires managing your own accumulation list with no
built-in way to filter by outcome.
oqc-qcaas-client (before):
# Accumulate results manually across runs
all_results = []
for cfg in configs_to_try:
    all_results.append(client.execute_tasks([QPUTask(program=qasm, config=cfg)], qpu_id=qpu_id)[0])
failures = [r for r in all_results if r.has_errored()]
successes = [r for r in all_results if not r.has_errored()]
oqc-qcaas-sdk (now):
>>> async def access_history():
...     async with OqcSdk(url=URL, authentication_token=TOKEN) as client:
...         job = client.create_job(program=PROG_BAD, qpu_id=QPU_ID)
...         await job.execute(timeout_s=10)
...         job.reconfigure(program=PROG_H)
...         await job.execute(timeout_s=10)
...         return (
...             len(job.history.all()),
...             len(job.history.errors().all()),
...             len(job.history.results().all()),
...         )
>>> run_async(access_history())
(2, 1, 1)
Every call to execute() appends to job.history. history is a
JobOutputProxy, so .errors(), .results(), .map(), and
.filter() work on it directly — the same interface used for
CompositeJob outputs.
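That shared interface is easy to model. The toy class below is an illustrative stand-in for the proxy surface, not the SDK's implementation, using plain dicts in place of real job outputs:

```python
class ToyOutputProxy:
    """Illustrative model of a JobOutputProxy-style filtering surface."""

    def __init__(self, items):
        self._items = list(items)

    def results(self):  # outputs that completed without an error
        return ToyOutputProxy(o for o in self._items if o.get("error") is None)

    def errors(self):  # outputs that carry an error
        return ToyOutputProxy(o for o in self._items if o.get("error") is not None)

    def filter(self, pred):
        return ToyOutputProxy(o for o in self._items if pred(o))

    def map(self, fn):
        return [fn(o) for o in self._items]

    def all(self):
        return list(self._items)

    @property
    def first(self):
        return self._items[0]

history = ToyOutputProxy([
    {"data": {"c": {"0": 480, "1": 520}}, "error": None},
    {"data": None, "error": "compilation failed"},
])
print(len(history.errors().all()))                       # 1
print(history.results().map(lambda o: o["data"]["c"]))   # [{'0': 480, '1': 520}]
```

Because each method returns another proxy (or a plain list from .map()), calls chain in any order, which is what lets the same code run against outputs and history alike.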
Save, close, and resume¶
With qcaas-client, everything that hasn’t been explicitly saved is lost when the Python process exits. The only way to “persist” a run is to write the task ID to a file yourself and re-query results in a later session — with no program, no configuration, and no history attached.
oqc-qcaas-client (before):
# Save the task ID yourself before the session ends
task_id = task.task_id # write to a file or database manually
# Later session: re-query by ID — relies on the task still being available on the server
# No history, no program, no config — just the raw result dict
result = client.get_task_results(task_id, qpu_id)
oqc-qcaas-sdk (now):
>>> import tempfile, os
>>> sdk = OqcSdk(url="https://example.com", authentication_token="dummy")
>>> job = sdk.create_job(program="OPENQASM 2.0;", qpu_id="qpu:uk:2:d865b5a184")
>>> with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as f:
...     tmp = f.name
>>> job.save(tmp)
>>> restored = sdk.load(tmp)
>>> os.unlink(tmp)
>>> restored.program == job.program
True
The saved file captures the full job state: program, QPU, compiler
configuration, current state, result, error, and the complete execution
history. Resuming in a new session is a single sdk.load() call:
async with OqcSdk(url=url, authentication_token=token) as sdk:
    job = sdk.load("my_experiment.json")
    print(job.history.all())  # all previous runs are intact
    await job.execute()       # submit a fresh run
For the full persistence API, including CompositeJob snapshots and
schema versioning, see Saving and Loading Jobs.
One-call diagnostics¶
Retrieving timing, metrics, and metadata from qcaas-client requires three separate HTTP round-trips — easy to forget and impossible to batch.
oqc-qcaas-client (before):
# Three separate calls after the task completes
timings = client.get_task_timings(task_id, qpu_id)
metrics = client.get_task_metrics(task_id, qpu_id)
metadata = client.get_task_metadata(task_id, qpu_id)
oqc-qcaas-sdk (now):
job_with_diagnostics = sdk.create_job(program=qasm, qpu_id=qpu_id, fetch_diagnostics=True)
await job_with_diagnostics.execute(timeout_s=30)
print(job_with_diagnostics.timings) # dict | None
print(job_with_diagnostics.metrics) # dict | None
print(job_with_diagnostics.metadata) # dict | None
All three are fetched concurrently once the job reaches a terminal state.
If one fetch fails (e.g. the data is not yet available) that attribute
remains None while the others are unaffected — a missing diagnostic
never blocks access to the result. The flag is persisted across
save() / load() cycles so a restored job continues to collect
diagnostics on subsequent runs.
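That failure-isolation behaviour can be sketched with asyncio.gather(return_exceptions=True). This is a toy illustration, with fetch as a hypothetical stand-in for one diagnostics request, not the SDK's internals:

```python
import asyncio

async def fetch(name: str, fail: bool = False) -> dict:
    # Stand-in for one diagnostics round-trip (timings / metrics / metadata).
    await asyncio.sleep(0)
    if fail:
        raise RuntimeError(f"{name} not available yet")
    return {"source": name}

async def collect() -> list:
    # return_exceptions=True turns a failed fetch into a returned value
    # instead of cancelling its siblings; failures degrade to None.
    raw = await asyncio.gather(
        fetch("timings"),
        fetch("metrics", fail=True),
        fetch("metadata"),
        return_exceptions=True,
    )
    return [None if isinstance(r, BaseException) else r for r in raw]

diagnostics = asyncio.run(collect())
print(diagnostics)  # [{'source': 'timings'}, None, {'source': 'metadata'}]
```

Without return_exceptions=True, the first raised exception would propagate out of gather and the other results would be discarded; with it, each slot independently succeeds or degrades.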
For details see Execution Diagnostics in Working with Jobs.
Functional result processing¶
execute_tasks() in qcaas-client returns a plain Python list — no
built-in filtering or chaining.
oqc-qcaas-client (before):
results = client.execute_tasks(tasks=tasks, qpu_id=qpu_id)
counts = []
for r in results:
    if not r.has_errored():
        counts.append(r.result["c"])
oqc-qcaas-sdk (now):
>>> async def manipulate_results():
...     async with OqcSdk(url=URL, authentication_token=TOKEN) as client:
...         cjob = client.create_job(program=[PROG_H, PROG_X], qpu_id=QPU_ID)
...         outputs = await cjob.execute(timeout_s=20)
...         counts_list = outputs.results().map(lambda r: r.data["c"])
...         return len(counts_list), all("c" in r.data for r in outputs.results().all())
>>> run_async(manipulate_results())
(2, True)
JobOutputProxy provides .map(fn), .filter(pred), .results(),
.errors(), .all(), .first, and .one. The same interface works
on job.outputs, job.history, and CompositeJob outputs, so code
written for one works unchanged with the others.
For workflows that continue into numerical analysis, the result can be passed directly into NumPy without a separate accumulation step:
counts_list = outputs.results().map(lambda r: r.data["c"])
# Each element is a dict like {"0": 480, "1": 520}
import numpy as np
counts_array = np.array([[c.get("0", 0), c.get("1", 0)] for c in counts_list])
# counts_array.shape == (2, 2) for 2 circuits
See Composite Jobs for the full JobOutputProxy API.
Getting Started¶
Ready to migrate? See Quick Start for the minimal connection and
first-job pattern, then Working with Jobs for the complete Job API reference.