انتقل إلى المحتوى الرئيسي

إدارة موارد الحوسبة والبيانات في Qiskit Serverless

إصدارات الحزم

الكود الموجود في هذه الصفحة تم تطويره باستخدام المتطلبات التالية. نوصي باستخدام هذه الإصدارات أو أحدث منها.

qiskit[all]~=2.0.0
qiskit-ibm-runtime~=0.37.0
qiskit-serverless~=0.22.0

باستخدام Qiskit Serverless، يمكنك إدارة الحوسبة والبيانات عبر نمط Qiskit الخاص بك، بما في ذلك وحدات المعالجة المركزية (CPUs)، ووحدات معالجة الكم (QPUs)، وغيرها من مسرّعات الحوسبة.

تعيين حالات تفصيلية

لأحمال عمل Serverless عدة مراحل خلال سير العمل. افتراضيًا، يمكن مشاهدة الحالات التالية باستخدام job.status():

  • QUEUED: حمل العمل في قائمة الانتظار للموارد الكلاسيكية
  • INITIALIZING: جارٍ إعداد حمل العمل
  • RUNNING: حمل العمل يعمل حاليًا على الموارد الكلاسيكية
  • DONE: اكتمل حمل العمل بنجاح

يمكنك أيضًا تعيين حالات مخصصة تصف مرحلة سير العمل المحددة بشكل أدق، كما يلي.

# Added by doQumentation — required packages for this notebook
!pip install -q qiskit qiskit-ibm-runtime qiskit-serverless
# This cell is hidden from users, it just creates a new folder
from pathlib import Path

Path("./source_files").mkdir(exist_ok=True)
%%writefile ./source_files/status_example.py

from qiskit_serverless import update_status, Job

## If your function has a mapping stage, particularly application functions, you can set the status to "RUNNING: MAPPING" as follows:
update_status(Job.MAPPING)

## While handling transpilation, error suppression, and so forth, you can set the status to "RUNNING: OPTIMIZING_FOR_HARDWARE":
update_status(Job.OPTIMIZING_HARDWARE)

## After you submit jobs to Qiskit Runtime, the underlying quantum job will be queued. You can set status to "RUNNING: WAITING_FOR_QPU":
update_status(Job.WAITING_QPU)

## When the Qiskit Runtime job starts running on the QPU, set the following status "RUNNING: EXECUTING_QPU":
update_status(Job.EXECUTING_QPU)

## Once QPU is completed and post-processing has begun, set the status "RUNNING: POST_PROCESSING":
update_status(Job.POST_PROCESSING)
Writing ./source_files/status_example.py

بعد اكتمال حمل العمل بنجاح (باستخدام save_result())، سيتم تحديث هذه الحالة إلى DONE تلقائيًا.

سير العمل المتوازي

بالنسبة للمهام الكلاسيكية القابلة للتوازي، استخدم مزيّن @distribute_task لتحديد متطلبات الحوسبة اللازمة لتنفيذ المهمة. ابدأ باسترجاع مثال transpile_remote.py من موضوع كتابة أول برنامج Qiskit Serverless بالكود التالي.

الكود التالي يتطلب منك أن تكون قد حفظت بيانات اعتمادك مسبقًا.

%%writefile ./source_files/transpile_remote.py

from qiskit.transpiler import generate_preset_pass_manager
from qiskit_ibm_runtime import QiskitRuntimeService
from qiskit_serverless import distribute_task

service = QiskitRuntimeService()

@distribute_task(target={"cpu": 1})
def transpile_remote(circuit, optimization_level, backend):
"""Transpiles an abstract circuit (or list of circuits) into an ISA circuit for a given backend."""
pass_manager = generate_preset_pass_manager(
optimization_level=optimization_level,
backend=service.backend(backend)
)
isa_circuit = pass_manager.run(circuit)
return isa_circuit
Writing ./source_files/transpile_remote.py

في هذا المثال، قمت بتزيين دالة transpile_remote() بـ @distribute_task(target={"cpu": 1}). عند تشغيله، يُنشئ هذا مهمة عامل متوازية غير متزامنة بنواة CPU واحدة، ويعيد مرجعًا لتتبع العامل. لجلب النتيجة، مرّر المرجع إلى دالة get(). يمكننا استخدام هذا لتشغيل مهام متوازية متعددة:

%%writefile --append ./source_files/transpile_remote.py

from time import time
from qiskit_serverless import get, get_arguments, save_result, update_status, Job

# Get arguments
arguments = get_arguments()
circuit = arguments.get("circuit")
optimization_level = arguments.get("optimization_level")
backend = arguments.get("backend")
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Start distributed transpilation

update_status(Job.OPTIMIZING_HARDWARE)

start_time = time()
transpile_worker_references = [
transpile_remote(circuit, optimization_level, backend)
for circuit in arguments.get("circuit_list")
]

transpiled_circuits = get(transpile_worker_references)
end_time = time()
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Save result, with metadata

result = {
"circuits": transpiled_circuits,
"metadata": {
"resource_usage": {
"RUNNING: OPTIMIZING_FOR_HARDWARE": {
"CPU_TIME": end_time - start_time,
"QPU_TIME": 0,
},
}
},
}

save_result(result)
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It uploads the serverless program and checks it runs.

def test_serverless_job(title, entrypoint):
# Import in function to stop them interfering with user-facing code
from qiskit.circuit.random import random_circuit
from qiskit_serverless import IBMServerlessClient, QiskitFunction
import time
import uuid

title += "_" + uuid.uuid4().hex[:8]
serverless = IBMServerlessClient()
transpile_remote_demo = QiskitFunction(
title=title,
entrypoint=entrypoint,
working_dir="./source_files/",
)
serverless.upload(transpile_remote_demo)
job = serverless.get(title).run(
circuit=random_circuit(3, 3),
circuit_list=[random_circuit(3, 3) for _ in range(3)],
backend="ibm_torino",
optimization_level=1,
)
for retry in range(25):
time.sleep(5)
status = job.status()
if status == "DONE":
print("Job completed successfully")
return
if status not in [
"QUEUED",
"INITIALIZING",
"RUNNING",
"RUNNING: OPTIMIZING_FOR_HARDWARE",
"DONE",
]:
raise Exception(
f"Unexpected job status '{status}'.\nHere's the logs:\n"
+ job.logs()
)
print(f"Waiting for job (status '{status}')")
raise Exception("Job did not complete in time")

test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully

استكشاف إعدادات المهام المختلفة

يمكنك تخصيص وحدات CPU وGPU والذاكرة لمهامك بمرونة عبر @distribute_task(). في Qiskit Serverless على منصة IBM Quantum®، كل برنامج مزوّد بـ 16 نواة CPU و32 جيجابايت RAM، يمكن تخصيصها ديناميكيًا حسب الحاجة.

يمكن تخصيص أنوية CPU بشكل كامل، أو حتى بتخصيصات جزئية، كما هو موضح فيما يلي.

يتم تخصيص الذاكرة بعدد البايتات. تذكّر أن في الكيلوبايت 1024 بايت، وفي الميغابايت 1024 كيلوبايت، وفي الجيغابايت 1024 ميغابايت. لتخصيص 2 جيغابايت من الذاكرة للعامل، تحتاج إلى تخصيص "mem": 2 * 1024 * 1024 * 1024.

%%writefile --append ./source_files/transpile_remote.py

@distribute_task(target={
"cpu": 16,
"mem": 2 * 1024 * 1024 * 1024
})
def transpile_remote(circuit, optimization_level, backend):
return None
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It checks the distributed program works.
test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully

إدارة البيانات عبر برنامجك

يتيح لك Qiskit Serverless إدارة الملفات في مجلد /data عبر جميع برامجك. يشمل ذلك عدة قيود:

  • يُدعم حاليًا ملفات tar وh5 فقط
  • هذا مجرد تخزين مسطّح في /data، ولا يمكن إنشاء مجلدات فرعية كـ /data/folder/

يوضح ما يلي كيفية رفع الملفات. تأكد من أنك قمت بالمصادقة على Qiskit Serverless بحساب IBM Quantum الخاص بك (راجع النشر على منصة IBM Quantum للحصول على التعليمات).

import tarfile
from qiskit_serverless import IBMServerlessClient

# Create a tar
filename = "transpile_demo.tar"
file = tarfile.open(filename, "w")
file.add("./source_files/transpile_remote.py")
file.close()

# Get a reference to a QiskitFunction
serverless = IBMServerlessClient()
transpile_remote_demo = next(
program
for program in serverless.list()
if program.title == "transpile_remote_serverless"
)

# Upload the tar to Serverless data directory
serverless.file_upload(file=filename, function=transpile_remote_demo)
'{"message":"/usr/src/app/media/5e1f442128cdf60018496a04/transpile_demo.tar"}'

بعد ذلك، يمكنك سرد جميع الملفات في مجلد data الخاص بك. هذه البيانات متاحة لجميع البرامج.

serverless.files(function=transpile_remote_demo)
['classifier_name.pkl.tar', 'output.json.tar', 'transpile_demo.tar']

يمكن إنجاز ذلك من برنامج باستخدام file_download() لتنزيل الملف إلى بيئة البرنامج، ثم فك ضغط ملف tar.

%%writefile ./source_files/extract_tarfile.py

import tarfile
from qiskit_serverless import IBMServerlessClient

serverless = IBMServerlessClient(token="<YOUR_API_KEY>") # Use the 44-character API_KEY you created and saved from the IBM Quantum Platform Home dashboard
files = serverless.files()
demo_file = files[0]
downloaded_tar = serverless.file_download(demo_file)

with tarfile.open(downloaded_tar, 'r') as tar:
tar.extractall()

عند هذه النقطة، يمكن لبرنامجك التفاعل مع الملفات، كما تفعل في تجربة محلية. يمكن استدعاء file_upload() وfile_download() وfile_delete() من تجربتك المحلية أو من برنامجك المرفوع، لإدارة بيانات متسقة ومرنة.

الخطوات التالية

توصيات