

要点总结
Walrus 是一个构建在 Sui 区块链上的去中心化 Blob 存储协议。如果你是 Python 开发者,walrus-python 提供了原生 SDK,让你无需区块链经验,也能轻松存储、检索和管理数据。
🌟 walrus-python:https://github.com/standard-crypto/walrus-python
本教程将带你完整了解这个 SDK:设置客户端、上传第一个 Blob、理解 Walrus 的返回格式、多种方式读取数据、处理大文件流,以及构建适用于生产环境的错误处理机制。学完后,你将拥有一个可以在去中心化存储上保存和读取文件的 Python 脚本。
如果你在学习过程中遇到问题,欢迎加入 Walrus Discord,并在 #developer 频道提问。
🌟 Discord:https://discord.com/invite/walrusprotocol
你将学到
如何安装并配置 walrus-python
使用字节、文件和流上传 Blob
理解 Blob ID 与 Object ID 的区别
多种检索方式:内存、文件、流、仅元数据
配置存储时长、可删除性与对象归属权
适用于生产环境的错误处理模式
构建一个基于 Walrus 的实用文件备份工具
前置条件
Python >= 3.8
pip(或你常用的包管理器)
对 Python 的 requests 库有基础了解(有帮助但不是必须)
无需区块链钱包、SUI 代币或 Sui 使用经验。该 SDK 通过公共 Publisher 和 Aggregator HTTP 端点工作——在测试网上,存储是免费的。
1. 安装
pip install walrus-python就这么简单,唯一依赖是 requests。没有原生二进制文件,没有 Rust FFI,不需要区块链 SDK。只要能 pip install,你就可以开始。
验证安装:
from walrus import WalrusClient, WalrusAPIErrorprint("walrus-python ready")
2. 设置客户端
要与 Walrus 交互,你需要两个端点:
Publisher:接收你的数据、performs erasure 编码、分发到存储节点,并处理链上交易。可以理解为“写入端点”。
Aggregator:从存储节点重新组装你的数据。可以理解为“读取端点”。
Walrus 提供免费的测试网公共端点:
from walrus import WalrusClientclient = WalrusClient(publisher_base_url="https://publisher.walrus-testnet.walrus.space",aggregator_base_url="https://aggregator.walrus-testnet.walrus.space",)
timeout 参数(默认 30 秒)控制 HTTP 请求等待时间。对于大文件上传,建议增加超时时间:
# For uploading files >1MB, give it more roomclient = WalrusClient(publisher_base_url="https://publisher.walrus-testnet.walrus.space",aggregator_base_url="https://aggregator.walrus-testnet.walrus.space",timeout=120,)
测试网 vs 主网:Walrus 主网已于 2025 年 3 月上线。本教程使用免费测试网。在生产环境中,你只需替换为主网的 Publisher / Aggregator URL。
具体公共服务列表可参考 Walrus 官方文档(https://docs.wal.app/)。
3. 第一次上传
将一段字节存入去中心化存储:
from walrus import WalrusClientclient = WalrusClient(publisher_base_url="https://publisher.walrus-testnet.walrus.space",aggregator_base_url="https://aggregator.walrus-testnet.walrus.space",)data = b"Hello from Python on Walrus!"response = client.put_blob(data=data)print(response)
运行然后你将得到 JSON 返回结果大致如下:
{"newlyCreated": {"blobObject": {"id": "0x1a2b3c...","blobId": "Abc123...","size": 28,"encodingType": "RedStuff",...},"cost": {"storageCost": 1000000,...}}}
你的数据已经分布式存储到去中心化网络中。无需账户、API Key、无需配置 S3 Bucket。
让我们拆分看一下发生了什么。
4. 理解返回结果
上传返回包含后续读取数据所需的一切信息。
Blob ID (blobId) :内容寻址 ID,由数据本身生成,相同字节始终产生相同 Blob ID。无论谁上传、何时上传都一样,这是你最常用的读取标识。
Object ID (id) :Sui 链上的存储对象,每次上传都会生成新的 Object ID。即使上传相同数据两次,也会创建不同对象,适用于引用具体存储交易。
该返回结果也会告诉你 blob 是否新建,或已经存在于网络中:
response = client.put_blob(data=b"Hello from Python on Walrus!")if "newlyCreated" in response:blob_obj = response["newlyCreated"]["blobObject"]print(f"New blob stored!")print(f" Blob ID: {blob_obj['blobId']}")print(f" Object ID: {blob_obj['id']}")elif "alreadyCertified" in response:blob_obj = response["alreadyCertified"]["blobObject"]print(f"Blob already exists on the network.")print(f" Blob ID: {blob_obj['blobId']}")
去重存储是 Walrus 的核心特性。如果网络上已有完全相同的数据,你无需再次支付存储成本。 alreadyCertified 的返回确认数据已经可用,并给你 blob ID 以读取。
5. 读取 Blob
现在让我们检索数据。SDK 提供四种读取方式,取决于你的需求。
通过 Blob ID(最常见)
返回内存中的原始字节:
blob_id = "Abc123..." # from your upload responsecontent = client.get_blob(blob_id)print(content) # b"Hello from Python on Walrus!"
通过 Object ID
结果相同,但查找键不同。当您存储的是 Sui 对象引用而不是 blob ID 时,此功能非常有用:
object_id = "0x1a2b3c..." # from your upload responsecontent = client.get_blob_by_object_id(object_id)print(content)
直接存储为文件
对于较大的数据块,可以跳过将其全部保存在内存中这一步骤:
client.get_blob_as_file(blob_id, "downloaded_image.jpg")print("Saved to downloaded_image.jpg")
内部以 8KB 分块流式下载。内存占用恒定。
作为流传输
为了获得最大程度的控制,您可以将数据通过管道传输到任何需要的地方:
stream = client.get_blob_as_stream(blob_id)with open("output.bin", "wb") as f:f.write(stream.read())
这将返回一个类似文件的对象(原始的 urllib3 响应),因此您可以将其与任何接受流的程序集成。
仅获取元数据
检查 blob 是否存在,并在不下载其内容的情况下检查其标头:
metadata = client.get_blob_metadata(blob_id)print(metadata)# {'content-length': '28', 'content-type': 'application/octet-stream', 'etag': 'Abc123...', ...}
etag 标头与 Blob ID 一致。一个快速的方法用于完整性校验。元数据调用使用 HTTP HEAD 请求,因此非常轻量级。
6. 上传文件
从文件路径上传文件只需一行代码:
response = client.put_blob_from_file("photo.jpg")blob_id = response["newlyCreated"]["blobObject"]["blobId"]print(f"Stored photo.jpg as blob: {blob_id}")
put_blob_from_file 方法的底层逻辑是将整个文件读取到内存中并发送。对于几十MB以下的文件,这种方法完全没问题。对于更大的文件,请使用下文第七部分提到的流传输方法。
如果路径不存在,该方法会抛出 FileNotFoundError 异常,因此您可以自然地处理它:
try:response = client.put_blob_from_file("missing.txt")except FileNotFoundError:print("File doesn't exist!")
7. 流式上传
对于从其他来源(HTTP 下载、数据库导出、子进程管道)提取的大型文件或数据,您可以直接从流上传,而无需将所有内容加载到内存中:
import requests as http_requestsfrom walrus import WalrusClientclient = WalrusClient(publisher_base_url="https://publisher.walrus-testnet.walrus.space",aggregator_base_url="https://aggregator.walrus-testnet.walrus.space",timeout=120,)# Stream a file from the web directly into Walrusurl = "https://example.com/large-dataset.csv"with http_requests.get(url, stream=True) as r:result = client.put_blob_from_stream(r.raw)print(result)
您还可以使用 BytesIO 处理内存流——这在测试或动态组装数据时非常有用:
from io import BytesIObuffer = BytesIO()buffer.write(b"dynamically assembled data...")buffer.seek(0) # rewind before uploadingresponse = client.put_blob_from_stream(buffer)
重要提示:上传数据前,务必先对 BytesIO 执行 seek(0) 操作。如果当前位置已到达缓冲区末尾,SDK 将上传零字节。
8. 上传选项
所有上传方法(put_blob、put_blob_from_file、put_blob_from_stream)都接受相同的可选参数:
epochs - 存储时长
控制 blob 在网络上保留的时间。每个 epoch 是 Walrus 协议定义的固定时间段。默认值为 1(比当前 epoch 提前一个 epoch)。
# Store for 5 epochsresponse = client.put_blob(data=b"keep this longer", epochs=5)
更高的 epoch 计数会增加成本,但可以延长数据的可用性。
deletable - 可变存储
默认情况下,blob 是永久性的——一旦存储,它们就会一直存在,直到其 epoch 过期。设置 deletable=True 可以创建一个可以提前删除的 blob:
response = client.put_blob(data=b"temporary data", deletable=True)可删除 Blob 对象适用于暂存数据、草稿内容,或任何需要在存储期限结束前删除的内容。
send_object_to - 转移所有权
将生成的 Sui Blob 对象发送到指定地址。当构建应用程序时,如果存储资源的所有权应归用户(而非发布者)所有,则此方法非常有用。
response = client.put_blob(data=b"user's data",send_object_to="0xUSER_SUI_ADDRESS...")
encoding_type- 编码格式
指定编码方案。实际上,您很少需要设置此项——默认值(RedStuff)即可满足需求。
response = client.put_blob(data=payload,epochs=3,deletable=True,send_object_to="0x9f8e7d...")
9. 错误处理
SDK 会针对任何 API 级别的故障引发 WalrusAPIError 异常,其中包含可供检查的结构化字段:
from walrus import WalrusClient, WalrusAPIErrorclient = WalrusClient(publisher_base_url="https://publisher.walrus-testnet.walrus.space",aggregator_base_url="https://aggregator.walrus-testnet.walrus.space",)try:client.get_blob("nonexistent-blob-id")except WalrusAPIError as e:print(f"Code: {e.code}") # 404print(f"Status: {e.status}") # "NOT_FOUND"print(f"Message: {e.message}") # "Blob not found"print(f"Details: {e.details}") # []
WalrusAPIError 是 requests.exceptions.RequestException 的子类,因这意味着如果你的代码中已经存在较宽泛的 except RequestException 异常处理逻辑,它同样可以捕获 Walrus 抛出的错误。
对于网络级别的故障(超时、DNS 解析失败、连接被拒绝),SDK 会将它们包装在 WalrusAPIError 中,并返回 500 错误代码。
bad_client = WalrusClient(publisher_base_url="https://nonexistent.example.com",aggregator_base_url="https://nonexistent.example.com",)try:bad_client.get_blob("some-id")except WalrusAPIError as e:print(e.code) # 500print(e.status) # "REQUEST_FAILED"
生产环境模式
以下是如何在生产环境中封装 Walrus 操作:
import loggingfrom walrus import WalrusClient, WalrusAPIErrorlogger = logging.getLogger(__name__)def store_blob(client: WalrusClient, data: bytes, retries: int = 3) -> str:"""Store a blob and return its blob ID, with retries."""for attempt in range(1, retries + 1):try:response = client.put_blob(data=data, deletable=True)if "newlyCreated" in response:blob_id = response["newlyCreated"]["blobObject"]["blobId"]elif "alreadyCertified" in response:blob_id = response["alreadyCertified"]["blobObject"]["blobId"]else:raise ValueError(f"Unexpected response format: {response}")logger.info(f"Stored blob {blob_id} on attempt {attempt}")return blob_idexcept WalrusAPIError as e:logger.warning(f"Attempt {attempt} failed: {e}")if attempt == retries:raiseraise RuntimeError("Unreachable")
10. 整合实践:文件备份脚本
让我们构建一些实际的东西:一个将目录备份到 Walrus 并保存清单以便以后恢复的脚本。
"""backup_to_walrus.py — Back up a directory to Walrus decentralized storage."""import jsonimport osimport sysfrom datetime import datetime, timezonefrom walrus import WalrusClient, WalrusAPIErrordef backup_directory(directory: str, client: WalrusClient) -> dict:"""Walk a directory, upload each file, return a manifest."""manifest = {"backup_timestamp": datetime.now(timezone.utc).isoformat(),"source_directory": os.path.abspath(directory),"files": [],}for root, _, files in os.walk(directory):for filename in files:filepath = os.path.join(root, filename)relative_path = os.path.relpath(filepath, directory)file_size = os.path.getsize(filepath)print(f" Uploading {relative_path} ({file_size:,} bytes)...", end=" ")try:response = client.put_blob_from_file(filepath, epochs=5)if "newlyCreated" in response:blob_obj = response["newlyCreated"]["blobObject"]status = "new"elif "alreadyCertified" in response:blob_obj = response["alreadyCertified"]["blobObject"]status = "deduplicated"else:print("UNKNOWN RESPONSE")continuemanifest["files"].append({"path": relative_path,"blob_id": blob_obj["blobId"],"object_id": blob_obj.get("id", ""),"size": file_size,"status": status,})print(f"OK ({status})")except WalrusAPIError as e:print(f"FAILED ({e})")manifest["files"].append({"path": relative_path,"error": str(e),"size": file_size,})return manifestdef restore_from_manifest(manifest_path: str, output_dir: str, client: WalrusClient):"""Restore files from a Walrus backup manifest."""with open(manifest_path) as f:manifest = json.load(f)print(f"Restoring {len(manifest['files'])} files to {output_dir}/")for entry in manifest["files"]:if "error" in entry:print(f" Skipping {entry['path']} (failed during backup)")continueoutput_path = os.path.join(output_dir, entry["path"])os.makedirs(os.path.dirname(output_path), exist_ok=True)print(f" Downloading {entry['path']}...", end=" ")try:client.get_blob_as_file(entry["blob_id"], output_path)print("OK")except WalrusAPIError as e:print(f"FAILED ({e})")if __name__ == "__main__":client = WalrusClient(publisher_base_url="https://publisher.walrus-testnet.walrus.space",aggregator_base_url="https://aggregator.walrus-testnet.walrus.space",timeout=120,)if len(sys.argv) < 3:print("Usage:")print(" python backup_to_walrus.py backup <directory>")print(" python backup_to_walrus.py restore <manifest.json> <output_dir>")sys.exit(1)command = sys.argv[1]if command == "backup":directory = sys.argv[2]print(f"Backing up {directory} to Walrus...")manifest = backup_directory(directory, client)manifest_path = f"walrus_backup_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"with open(manifest_path, "w") as f:json.dump(manifest, f, indent=2)succeeded = sum(1 for f in manifest["files"] if "blob_id" in f)failed = sum(1 for f in manifest["files"] if "error" in f)print(f"\nDone. {succeeded} files backed up, {failed} failed.")print(f"Manifest saved to {manifest_path}")elif command == "restore":manifest_path = sys.argv[2]output_dir = sys.argv[3] if len(sys.argv) > 3 else "restored"restore_from_manifest(manifest_path, output_dir, client)print("\nRestore complete.")
运行:
# Back up a directorypython backup_to_walrus.py backup ./my-project# Restore from the manifestpython backup_to_walrus.py restore walrus_backup_20260414_120000.json ./restored
这在实际环境中展示了完整的 SDK 功能:文件上传、错误处理、blob 检索到文件以及 newcreated / alreadyCertified 响应模式。
快速参考:SDK 方法

所有上传方法都接受以下可选参数:encoding_type、epochs、deletable、send_object_to。
下一步是什么?
walrus-python on PyPI — 包含版本历史记录的软件包页面;
GitHub Repository — 源代码、问题和贡献;
Walrus Documentation — 协议级文档,涵盖纠删码、epoch和代币经济学;
Walrus HTTP API Spec — 任何聚合器提供的交互式 API 文档;
Awesome Walrus — 社区精选的工具、库和应用。

Sui是基于第一原理重新设计和构建而成的L1公有链,旨在为创作者和开发者提供能够承载Web3中下一个十亿用户的开发平台。Sui上的应用基于Move智能合约语言,并具有水平可扩展性,让开发者能够快速且低成本支持广泛的应用开发。
获取更多信息:
https://linktr.ee/sui_apac
关于Sui Network
获取更多信息
官方网站: https://sui.io
Discord: https://discord.com/invite/sui
中文Twitter: https://twitter.com/SuiNetworkCN
中文Medium: https://medium.com/sui-network-cn
中文电报群: https://t.me/Sui_Blockchain_Chinese
↙️「阅读原文」了解更多