1. SDK 选型与环境准备
1.1 SDK 选型建议
由于 MinIO 完全兼容 S3 API,开发者面临两种 SDK 选择:
AWS 官方 SDK(推荐用于大多数场景):
- 覆盖语言最广(Python boto3、Java SDK v2、Go SDK、JS SDK、.NET SDK、Ruby SDK 等)
- 社区文档极为丰富,遇到问题容易查找
- 与 AWS 生态无缝互通,业务代码在 MinIO 和 AWS S3 之间迁移几乎零成本
- 支持自定义 endpoint_url,直接对接 MinIO
MinIO 原生 SDK(推荐用于 Go 语言或需要 MinIO 扩展特性):
- Go 语言版本(github.com/minio/minio-go)是 MinIO 官方维护的高性能客户端
- 支持 MinIO 特有的管理 API(桶通知、分层存储管理等)
- Python 版本(minio)提供更简洁的 API,但功能覆盖不如 boto3 全面
| 语言 | 推荐 SDK | 包名 |
|---|---|---|
| Python | AWS boto3 | boto3 |
| Java | AWS SDK v2 | software.amazon.awssdk:s3 |
| Go | MinIO 原生 SDK | github.com/minio/minio-go/v7 |
| JavaScript/Node.js | AWS SDK v3 | @aws-sdk/client-s3 |
| .NET/C# | AWS SDK | AWSSDK.S3 |
1.2 本章环境约定
本章所有示例统一使用以下环境:
MinIO 地址:http://127.0.0.1:9000
Access Key:minioadmin
Secret Key:minioadmin123
Region:us-east-1(MinIO 的默认 region 即为 us-east-1;服务端未显式配置 region 时不会校验该值,但 SDK 签名仍需要提供一个 region)
测试 Bucket:demo-bucket
1.3 快速启动测试环境
# Start a MinIO test environment with Docker (9000 = S3 API, 9001 = web console)
docker run -d \
  --name minio-dev \
  -p 9000:9000 \
  -p 9001:9001 \
  -e MINIO_ROOT_USER=minioadmin \
  -e MINIO_ROOT_PASSWORD=minioadmin123 \
  minio/minio server /data --console-address ":9001"
# Give the freshly started server a moment to accept connections
sleep 3
# Create the test bucket.
# Both mc commands are wrapped in `sh -c` so they run INSIDE the container;
# a bare `&&` would be parsed by the host shell and run the second mc on the host.
# --ignore-existing makes re-running this script harmless.
docker exec minio-dev sh -c \
  'mc alias set local http://127.0.0.1:9000 minioadmin minioadmin123 && mc mb --ignore-existing local/demo-bucket'
2. Python boto3 完整操作指南
2.1 安装与客户端初始化
安装:`pip install boto3`。客户端初始化(两种方式):
import boto3
from botocore.config import Config
# Connection settings shared by both initialization styles below.
_minio_conn = dict(
    endpoint_url='http://127.0.0.1:9000',
    aws_access_key_id='minioadmin',
    aws_secret_access_key='minioadmin123',
    region_name='us-east-1',
)

# Option 1: client — the low-level API with the most complete feature coverage.
_client_config = Config(
    signature_version='s3v4',          # MinIO requires SigV4 signatures
    retries={'max_attempts': 3, 'mode': 'adaptive'},
    connect_timeout=10,
    read_timeout=60,
    max_pool_connections=50,           # connection pool size; raise it for high concurrency
)
s3_client = boto3.client('s3', config=_client_config, **_minio_conn)

# Option 2: resource — higher-level, object-oriented API (some operations are not available).
s3_resource = boto3.resource('s3', config=Config(signature_version='s3v4'), **_minio_conn)
使用环境变量或配置文件管理凭据(推荐生产用法):
# Inject credentials and the endpoint as environment variables before the app starts
export \
  AWS_ACCESS_KEY_ID=minioadmin \
  AWS_SECRET_ACCESS_KEY=minioadmin123 \
  AWS_DEFAULT_REGION=us-east-1 \
  MINIO_ENDPOINT=http://127.0.0.1:9000
import os
import boto3
from botocore.config import Config
# No credentials are passed explicitly: boto3 reads AWS_ACCESS_KEY_ID /
# AWS_SECRET_ACCESS_KEY from the environment; only the endpoint comes from MINIO_ENDPOINT.
s3_client = boto3.client(
    's3',
    config=Config(signature_version='s3v4'),
    endpoint_url=os.environ['MINIO_ENDPOINT'],
)
2.2 Bucket 操作
# ── Create a bucket ──────────────────────────────────────────────────
# boto3 rule: in us-east-1, omit CreateBucketConfiguration entirely; in any other
# region pass LocationConstraint=<that region>. AWS S3 rejects an explicit
# LocationConstraint='us-east-1' (InvalidLocationConstraint), so build the
# arguments conditionally from the client's configured region. The bucket is
# created once — a second create_bucket with the same name would fail.
bucket_region = s3_client.meta.region_name
create_kwargs = {'Bucket': 'my-new-bucket'}
if bucket_region and bucket_region != 'us-east-1':
    create_kwargs['CreateBucketConfiguration'] = {'LocationConstraint': bucket_region}
s3_client.create_bucket(**create_kwargs)
# ── List all buckets ─────────────────────────────────────────────────
response = s3_client.list_buckets()
for bucket in response['Buckets']:
    print(f"Bucket: {bucket['Name']}, Created: {bucket['CreationDate']}")
# ── 检查 Bucket 是否存在 ─────────────────────────────────────────────
from botocore.exceptions import ClientError
def bucket_exists(client, bucket_name):
    """Return True if *bucket_name* exists and HEAD on it succeeds with *client*.

    A missing bucket is reported by botocore as a ``ClientError``. For HEAD requests
    the error code is typically the bare HTTP status ``'404'``. Some S3-compatible
    servers report ``'NoSuchBucket'`` / ``'NotFound'`` instead, so all three mean
    "does not exist". Any other error (e.g. ``'403'``) is re-raised.
    """
    try:
        client.head_bucket(Bucket=bucket_name)
    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code')
        if error_code in ('404', 'NoSuchBucket', 'NotFound'):
            return False
        raise
    return True
demo_exists = bucket_exists(s3_client, 'demo-bucket')
print(demo_exists)
# ── Delete a bucket (it must be empty first) ────────────────────────
s3_client.delete_bucket(Bucket='empty-bucket')
2.3 对象上传
# ── Upload string / bytes content ────────────────────────────────────
hello_body = 'Hello, MinIO!'.encode('utf-8')
s3_client.put_object(
    Bucket='demo-bucket',
    Key='hello.txt',
    Body=hello_body,
    ContentType='text/plain',
    Metadata={'author': 'alice', 'project': 'demo'},  # user-defined metadata
)
# ── Upload a local file ──────────────────────────────────────────────
# Option 1: upload_file (recommended; switches to multipart upload automatically)
pdf_extra_args = {
    'ContentType': 'application/pdf',
    'StorageClass': 'STANDARD',
    'Metadata': {'department': 'finance'},
}
s3_client.upload_file(
    Filename='/local/path/document.pdf',
    Bucket='demo-bucket',
    Key='documents/document.pdf',
    ExtraArgs=pdf_extra_args,
)
# Option 2: upload_fileobj (uploads from an open file object; suits streaming sources)
with open('/local/path/image.jpg', 'rb') as image_file:
    s3_client.upload_fileobj(
        Fileobj=image_file,
        Bucket='demo-bucket',
        Key='images/photo.jpg',
        ExtraArgs={'ContentType': 'image/jpeg'},
    )
# ── 带上传进度回调 ───────────────────────────────────────────────────
import os
class ProgressPercentage:
    """Progress callback for ``upload_file(..., Callback=...)``.

    Each call receives the number of bytes transferred since the previous call,
    and the running total is printed as a percentage of the file size.
    boto3's transfer manager may call the callback from several worker threads
    at once, so the running total is updated under a lock.
    """

    def __init__(self, filename):
        import threading  # local import keeps this snippet self-contained

        self._filename = filename
        self._size = os.path.getsize(filename)
        self._seen_so_far = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_amount):
        with self._lock:
            self._seen_so_far += bytes_amount
            # An empty file would otherwise raise ZeroDivisionError.
            percentage = (self._seen_so_far / self._size) * 100 if self._size else 100.0
            print(f'\r{self._filename}: {self._seen_so_far}/{self._size} '
                  f'({percentage:.1f}%)', end='', flush=True)
large_zip = '/local/path/large-file.zip'
s3_client.upload_file(
    Filename=large_zip,
    Bucket='demo-bucket',
    Key='archives/large-file.zip',
    Callback=ProgressPercentage(large_zip),
)
2.4 对象下载
# ── Download to a local file ─────────────────────────────────────────
s3_client.download_file(
    Bucket='demo-bucket',
    Key='documents/document.pdf',
    Filename='/local/download/document.pdf',
)
# ── Download into an open file object ────────────────────────────────
with open('/local/download/image.jpg', 'wb') as image_out:
    s3_client.download_fileobj(Bucket='demo-bucket', Key='images/photo.jpg', Fileobj=image_out)
# ── Read a (small) object fully into memory ──────────────────────────
response = s3_client.get_object(Bucket='demo-bucket', Key='hello.txt')
raw_bytes = response['Body'].read()
content = raw_bytes.decode('utf-8')
content_type = response['ContentType']
metadata = response['Metadata']  # user-defined metadata dict
print(f'Content: {content}')
print(f'Metadata: {metadata}')
# ── Stream a large object to disk in fixed-size chunks ───────────────
# Memory use stays bounded by one chunk instead of the whole object.
eight_mib = 8 * 1024 * 1024
response = s3_client.get_object(Bucket='demo-bucket', Key='large-file.zip')
with open('/local/path/large-file.zip', 'wb') as zip_out:
    for piece in response['Body'].iter_chunks(chunk_size=eight_mib):
        zip_out.write(piece)
# ── Ranged GET: fetch only part of an object ─────────────────────────
response = s3_client.get_object(
    Bucket='demo-bucket',
    Key='large-file.zip',
    Range='bytes=0-1023',  # inclusive byte range -> the first 1024 bytes
)
partial_content = response['Body'].read()
2.5 对象元数据与查询
# ── Fetch object metadata without downloading the body ───────────────
response = s3_client.head_object(Bucket='demo-bucket', Key='documents/document.pdf')
print(f"Size: {response['ContentLength']} bytes")
for label, field in (
    ('ETag', 'ETag'),
    ('Last-Modified', 'LastModified'),
    ('Content-Type', 'ContentType'),
    ('Custom Metadata', 'Metadata'),
):
    print(f"{label}: {response[field]}")
# ── List objects in a bucket ─────────────────────────────────────────
# A single request returns at most MaxKeys entries (never more than 1000).
response = s3_client.list_objects_v2(
    Bucket='demo-bucket',
    Prefix='documents/',   # only keys under this prefix
    Delimiter='/',         # group deeper keys into "virtual directories"
    MaxKeys=100,
)
for obj in response.get('Contents', []):
    print(f"Key: {obj['Key']}, Size: {obj['Size']}, "
          f"Modified: {obj['LastModified']}")
# With a Delimiter, the "virtual directories" come back as CommonPrefixes
for prefix in response.get('CommonPrefixes', []):
    print(f"Dir: {prefix['Prefix']}")
# ── Page through every object (mandatory beyond 1000 keys) ───────────
pages = s3_client.get_paginator('list_objects_v2').paginate(
    Bucket='demo-bucket',
    Prefix='logs/',
    PaginationConfig={'PageSize': 1000},
)
total_size = 0
total_count = 0
for page in pages:
    page_contents = page.get('Contents', [])
    total_count += len(page_contents)
    total_size += sum(obj['Size'] for obj in page_contents)
print(f"Total objects: {total_count}, Total size: {total_size / 1024 / 1024:.2f} MB")
2.6 对象删除与复制
# ── Delete a single object ───────────────────────────────────────────
s3_client.delete_object(Bucket='demo-bucket', Key='documents/old-file.pdf')
# ── Delete one specific version ──────────────────────────────────────
s3_client.delete_object(Bucket='demo-bucket', Key='documents/file.pdf', VersionId='v1abc123')
# ── Batch delete (at most 1000 keys per request) ─────────────────────
objects_to_delete = [
    {'Key': 'temp/file1.txt'},
    {'Key': 'temp/file2.txt'},
    {'Key': 'temp/file3.txt', 'VersionId': 'v2xyz'},
]
response = s3_client.delete_objects(
    Bucket='demo-bucket',
    # Quiet=True: only failed deletions are reported; False reports every result
    Delete={'Objects': objects_to_delete, 'Quiet': True},
)
# Report any object that could not be deleted
for error in response.get('Errors', []):
    print(f"Failed to delete {error['Key']}: {error['Message']}")
# ── Server-side copy (data is copied inside MinIO, never downloaded) ─
s3_client.copy_object(
    CopySource={'Bucket': 'demo-bucket', 'Key': 'source/file.pdf'},
    Bucket='demo-bucket',
    Key='destination/file-copy.pdf',
    MetadataDirective='COPY',  # COPY: keep the source metadata; REPLACE: use new metadata
)
# Copy across buckets
cross_bucket_source = {'Bucket': 'source-bucket', 'Key': 'file.pdf'}
s3_client.copy_object(CopySource=cross_bucket_source, Bucket='destination-bucket', Key='file.pdf')
2.7 对象标签管理
# ── Attach tags to an object ─────────────────────────────────────────
report_key = 'reports/q4-report.pdf'
wanted_tags = {'department': 'finance', 'quarter': 'Q4-2024', 'status': 'final'}
s3_client.put_object_tagging(
    Bucket='demo-bucket',
    Key=report_key,
    Tagging={'TagSet': [{'Key': name, 'Value': value} for name, value in wanted_tags.items()]},
)
# ── Read the tags back as a plain dict ───────────────────────────────
response = s3_client.get_object_tagging(Bucket='demo-bucket', Key=report_key)
tags = {entry['Key']: entry['Value'] for entry in response['TagSet']}
print(tags)  # {'department': 'finance', 'quarter': 'Q4-2024', 'status': 'final'}
# ── Remove all tags from the object ──────────────────────────────────
s3_client.delete_object_tagging(Bucket='demo-bucket', Key=report_key)
3. Python 进阶:批量操作与并发处理
3.1 使用 TransferManager 进行高性能传输
boto3 内置的 S3Transfer 管理器(通过 upload_file/download_file 隐式使用)支持自动分片、并发和重试,通过 TransferConfig 可以精细调整:
from boto3.s3.transfer import TransferConfig
# Custom transfer tuning instead of boto3's defaults
sixty_four_mib = 64 * 1024 * 1024
transfer_config = TransferConfig(
    multipart_threshold=sixty_four_mib,   # objects above 64 MB are uploaded in parts
    multipart_chunksize=sixty_four_mib,   # each part is 64 MB
    max_concurrency=10,                   # at most 10 parts in flight
    use_threads=True,                     # transfer parts on worker threads
    max_io_queue=100,                     # I/O queue depth
)
# Upload using the custom configuration
s3_client.upload_file(
    Filename='/local/path/huge-file.tar.gz',
    Bucket='demo-bucket',
    Key='archives/huge-file.tar.gz',
    Config=transfer_config,
)
3.2 并发批量上传目录
import os
import concurrent.futures
from pathlib import Path
def upload_directory(client, local_dir, bucket, prefix='', max_workers=10):
    """Upload every file under *local_dir* to *bucket* concurrently.

    Object keys are the file paths relative to *local_dir*, always with ``/``
    separators, optionally placed under *prefix*. Leading and trailing slashes
    on *prefix* are ignored, so ``'reports/2024'`` and ``'reports/2024/'``
    produce the same keys.

    Args:
        client: S3 client exposing ``upload_file(filename, bucket, key)``.
        local_dir: directory that is walked recursively.
        bucket: destination bucket name.
        prefix: optional key prefix.
        max_workers: size of the upload thread pool.

    Returns:
        ``(success, failures)``: *success* is a list of uploaded keys and
        *failures* is a list of ``(key, error_message)`` tuples. One failing
        file does not stop the others.
    """
    local_path = Path(local_dir)
    files_to_upload = [f for f in local_path.rglob('*') if f.is_file()]
    clean_prefix = prefix.strip('/')

    def upload_single(file_path):
        # as_posix() yields '/' separators on every OS (Windows compatibility).
        relative = file_path.relative_to(local_path).as_posix()
        key = f"{clean_prefix}/{relative}" if clean_prefix else relative
        try:
            client.upload_file(str(file_path), bucket, key)
        except Exception as e:  # best effort: record the failure, keep uploading the rest
            return False, key, str(e)
        return True, key, None

    success = []
    failures = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(upload_single, f) for f in files_to_upload]
        for future in concurrent.futures.as_completed(futures):
            ok, key, error = future.result()
            if ok:
                success.append(key)
                print(f"Uploaded: {key}")
            else:
                failures.append((key, error))
                print(f"Failed: {key} - {error}")
    return success, failures
# Example call
upload_result = upload_directory(
    s3_client,
    local_dir='/local/data/reports',
    bucket='demo-bucket',
    prefix='reports/2024',
    max_workers=20,
)
success_list, failure_list = upload_result
print(f"Uploaded: {len(success_list)}, Failed: {len(failure_list)}")
3.3 并发批量删除(分页 + 批量)
def delete_objects_by_prefix(client, bucket, prefix, batch_size=1000):
    """Delete every object whose key starts with *prefix*; return how many were deleted.

    Keys are listed page by page with the ``list_objects_v2`` paginator and removed
    with ``delete_objects`` in batches. DeleteObjects accepts at most 1000 keys per
    request, so *batch_size* is clamped to the range 1..1000.

    No ``VersionId`` is sent, so on a versioned bucket older versions presumably
    remain (verify; use ``list_object_versions`` to remove them as well).

    Per-key failures listed in the response's ``Errors`` are printed and excluded
    from the returned count; they do not abort the run.
    """
    batch_size = max(1, min(batch_size, 1000))
    paginator = client.get_paginator('list_objects_v2')
    total_deleted = 0
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        objects = page.get('Contents', [])
        for start in range(0, len(objects), batch_size):
            batch = objects[start:start + batch_size]
            response = client.delete_objects(
                Bucket=bucket,
                Delete={'Objects': [{'Key': obj['Key']} for obj in batch], 'Quiet': True},
            )
            errors = response.get('Errors', [])
            for err in errors:
                # Error entries may lack some fields; don't let reporting crash the run.
                print(f"Delete failed: {err.get('Key')} - {err.get('Message')}")
            deleted_count = len(batch) - len(errors)
            total_deleted += deleted_count
            print(f"Deleted {deleted_count} objects (total: {total_deleted})")
    return total_deleted
# Remove everything stored under the logs/ prefix
count = delete_objects_by_prefix(s3_client, bucket='demo-bucket', prefix='logs/')
print(f"Total deleted: {count}")
3.4 遍历所有版本并操作
def list_all_versions(client, bucket, prefix=''):
    """Yield every object version and delete marker in *bucket*, optionally under *prefix*.

    Pages through ``list_object_versions``. Within each page, entries from
    ``Versions`` are yielded first as ``type='version'`` dicts (carrying ``size``).
    Entries from ``DeleteMarkers`` follow as ``type='delete_marker'`` dicts
    (no ``size`` key).
    """
    paginate_args = {'Bucket': bucket, **({'Prefix': prefix} if prefix else {})}
    for page in client.get_paginator('list_object_versions').paginate(**paginate_args):
        for v in page.get('Versions', []):
            yield {
                'type': 'version',
                'key': v['Key'],
                'version_id': v['VersionId'],
                'is_latest': v['IsLatest'],
                'size': v['Size'],
                'last_modified': v['LastModified'],
            }
        yield from (
            {
                'type': 'delete_marker',
                'key': m['Key'],
                'version_id': m['VersionId'],
                'is_latest': m['IsLatest'],
                'last_modified': m['LastModified'],
            }
            for m in page.get('DeleteMarkers', [])
        )
# Tally object versions (with their total size) versus delete markers
total_size = version_count = marker_count = 0
for item in list_all_versions(s3_client, 'demo-bucket'):
    if item['type'] != 'version':
        marker_count += 1
        continue
    total_size += item.get('size', 0)
    version_count += 1
print(f"Versions: {version_count}, Markers: {marker_count}, "
      f"Total size: {total_size / 1024 / 1024:.2f} MB")
4. Java AWS SDK v2 集成
4.1 Maven 依赖配置
<!-- Place these inside the <dependencies> element of pom.xml -->
<!-- AWS SDK for Java v2: S3 client -->
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>s3</artifactId>
    <version>2.25.0</version>
</dependency>
<!-- AWS Common Runtime -->
<dependency>
    <groupId>software.amazon.awssdk.crt</groupId>
    <artifactId>aws-crt</artifactId>
    <version>0.29.13</version>
</dependency>
<!-- S3 Transfer Manager -->
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>s3-transfer-manager</artifactId>
    <version>2.25.0</version>
</dependency>
<!-- UrlConnectionHttpClient used in MinioClientFactory -->
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>url-connection-client</artifactId>
    <version>2.25.0</version>
</dependency>
4.2 客户端初始化
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import java.net.URI;
import java.time.Duration;
public class MinioClientFactory {
    /**
     * Builds a synchronous S3Client pointed at the local MinIO server.
     * Endpoint and credentials are hard-coded for the tutorial environment;
     * load them from configuration in real deployments.
     */
    public static S3Client createClient() {
        return S3Client.builder()
                // MinIO service address
                .endpointOverride(URI.create("http://127.0.0.1:9000"))
                // Static access key / secret key
                .credentialsProvider(
                        StaticCredentialsProvider.create(
                                AwsBasicCredentials.create("minioadmin", "minioadmin123")
                        )
                )
                // Region: us-east-1, matching the environment conventions
                .region(Region.US_EAST_1)
                // Key setting: ENABLE path-style access (http://host:9000/bucket/key),
                // which MinIO uses by default, instead of virtual-hosted style
                .serviceConfiguration(
                        S3Configuration.builder()
                                .pathStyleAccessEnabled(true)
                                .build()
                )
                // HTTP client timeouts
                .httpClient(
                        UrlConnectionHttpClient.builder()
                                .connectionTimeout(Duration.ofSeconds(10))
                                .socketTimeout(Duration.ofSeconds(60))
                                .build()
                )
                .build();
    }
}
关于 `pathStyleAccessEnabled(true)` 的说明:
S3 支持两种 URL 风格:
- 虚拟托管风格(Virtual-hosted-style):https://bucket.s3.amazonaws.com/key(AWS 默认)
- 路径风格(Path-style):https://s3.amazonaws.com/bucket/key
MinIO 默认使用路径风格访问(只有在服务端配置了 `MINIO_DOMAIN` 后才支持虚拟托管风格),因此必须将 pathStyleAccessEnabled 设置为 true;否则 SDK 会把 bucket 名称拼接到主机名中(如 demo-bucket.127.0.0.1),导致 DNS 解析或连接失败。
4.3 Bucket 与对象操作
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.model.*;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
public class MinioOperations {
    /** Maximum number of keys a single DeleteObjects request may carry. */
    private static final int MAX_DELETE_BATCH = 1000;

    private final S3Client s3;

    public MinioOperations(S3Client s3) {
        this.s3 = s3;
    }

    // ── Create a bucket ───────────────────────────────────────────────
    public void createBucket(String bucketName) {
        s3.createBucket(CreateBucketRequest.builder()
                .bucket(bucketName)
                .build());
        System.out.println("Bucket created: " + bucketName);
    }

    // ── Check whether a bucket exists ─────────────────────────────────
    /**
     * Returns true if HeadBucket succeeds, false on NoSuchBucketException.
     * Any other S3 error (e.g. access denied) propagates to the caller.
     */
    public boolean bucketExists(String bucketName) {
        try {
            s3.headBucket(HeadBucketRequest.builder()
                    .bucket(bucketName)
                    .build());
            return true;
        } catch (NoSuchBucketException e) {
            return false;
        }
    }

    // ── Upload string content ─────────────────────────────────────────
    /** Stores {@code content} as a text/plain object with the given user metadata. */
    public void putObject(String bucket, String key, String content,
                          Map<String, String> metadata) {
        s3.putObject(
                PutObjectRequest.builder()
                        .bucket(bucket)
                        .key(key)
                        .contentType("text/plain; charset=utf-8")
                        .metadata(metadata)
                        .build(),
                RequestBody.fromString(content)
        );
    }

    // ── Upload a local file ───────────────────────────────────────────
    public void uploadFile(String bucket, String key, String localFilePath) {
        Path filePath = Paths.get(localFilePath);
        s3.putObject(
                PutObjectRequest.builder()
                        .bucket(bucket)
                        .key(key)
                        .build(),
                RequestBody.fromFile(filePath)
        );
        System.out.printf("Uploaded: %s -> s3://%s/%s%n", localFilePath, bucket, key);
    }

    // ── Download an object into a String ──────────────────────────────
    /**
     * Reads the whole object into memory and decodes it as UTF-8 (matching the
     * charset used by putObject). The response stream is always closed, so the
     * underlying HTTP connection can be released.
     */
    public String getObjectAsString(String bucket, String key) throws IOException {
        try (ResponseInputStream<GetObjectResponse> response = s3.getObject(
                GetObjectRequest.builder()
                        .bucket(bucket)
                        .key(key)
                        .build())) {
            return new String(response.readAllBytes(), java.nio.charset.StandardCharsets.UTF_8);
        }
    }

    // ── Download to a local file ──────────────────────────────────────
    public void downloadFile(String bucket, String key, String localFilePath) {
        s3.getObject(
                GetObjectRequest.builder()
                        .bucket(bucket)
                        .key(key)
                        .build(),
                Paths.get(localFilePath)
        );
        System.out.printf("Downloaded: s3://%s/%s -> %s%n", bucket, key, localFilePath);
    }

    // ── List objects (manual pagination) ─────────────────────────────
    public void listObjects(String bucket, String prefix) {
        ListObjectsV2Request request = ListObjectsV2Request.builder()
                .bucket(bucket)
                .prefix(prefix)
                .maxKeys(1000)
                .build();
        ListObjectsV2Response response;
        do {
            response = s3.listObjectsV2(request);
            for (S3Object obj : response.contents()) {
                System.out.printf("Key: %-50s Size: %d bytes%n",
                        obj.key(), obj.size());
            }
            // Continue from where this page ended
            request = request.toBuilder()
                    .continuationToken(response.nextContinuationToken())
                    .build();
        } while (response.isTruncated());
    }

    // ── Count objects with the SDK paginator (simpler pagination) ─────
    public long countObjects(String bucket, String prefix) {
        return s3.listObjectsV2Paginator(
                ListObjectsV2Request.builder()
                        .bucket(bucket)
                        .prefix(prefix)
                        .build()
        ).contents().stream().count();
    }

    // ── Delete one object ─────────────────────────────────────────────
    public void deleteObject(String bucket, String key) {
        s3.deleteObject(DeleteObjectRequest.builder()
                .bucket(bucket)
                .key(key)
                .build());
    }

    // ── Delete many objects ───────────────────────────────────────────
    /**
     * Deletes {@code keys} in batches of at most 1000 (the per-request limit).
     * Failures are printed to stderr and do not stop the remaining batches.
     * A null or empty list is a no-op.
     */
    public void deleteObjects(String bucket, List<String> keys) {
        if (keys == null || keys.isEmpty()) {
            return; // nothing to delete; skip the request entirely
        }
        for (int from = 0; from < keys.size(); from += MAX_DELETE_BATCH) {
            int to = Math.min(from + MAX_DELETE_BATCH, keys.size());
            List<ObjectIdentifier> identifiers = keys.subList(from, to).stream()
                    .map(k -> ObjectIdentifier.builder().key(k).build())
                    .toList();
            DeleteObjectsResponse response = s3.deleteObjects(
                    DeleteObjectsRequest.builder()
                            .bucket(bucket)
                            .delete(Delete.builder()
                                    .objects(identifiers)
                                    .quiet(true)
                                    .build())
                            .build()
            );
            // With quiet(true) the response lists only the failed deletions
            response.errors().forEach(err ->
                    System.err.printf("Delete failed: %s - %s%n",
                            err.key(), err.message())
            );
        }
    }

    // ── Fetch object metadata ─────────────────────────────────────────
    public HeadObjectResponse getObjectMetadata(String bucket, String key) {
        return s3.headObject(HeadObjectRequest.builder()
                .bucket(bucket)
                .key(key)
                .build());
    }

    // ── Check whether an object exists ────────────────────────────────
    /** Returns false on NoSuchKeyException; other S3 errors propagate. */
    public boolean objectExists(String bucket, String key) {
        try {
            s3.headObject(HeadObjectRequest.builder()
                    .bucket(bucket)
                    .key(key)
                    .build());
            return true;
        } catch (NoSuchKeyException e) {
            return false;
        }
    }
}
5. Java 进阶:Spring Boot 场景最佳实践
5.1 MinIO 配置类
// MinioProperties.java
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
 * Settings bound from the {@code minio.*} section of application.yml
 * (e.g. {@code access-key} binds to {@code accessKey}).
 * JavaBean getters/setters are needed for the binding and for MinioConfig's
 * {@code props.getXxx()} calls; Lombok {@code @Data} could generate them instead.
 */
@ConfigurationProperties(prefix = "minio")
public class MinioProperties {
    private String endpoint;
    private String accessKey;
    private String secretKey;
    private String defaultBucket;

    public String getEndpoint() {
        return endpoint;
    }

    public void setEndpoint(String endpoint) {
        this.endpoint = endpoint;
    }

    public String getAccessKey() {
        return accessKey;
    }

    public void setAccessKey(String accessKey) {
        this.accessKey = accessKey;
    }

    public String getSecretKey() {
        return secretKey;
    }

    public void setSecretKey(String secretKey) {
        this.secretKey = secretKey;
    }

    public String getDefaultBucket() {
        return defaultBucket;
    }

    public void setDefaultBucket(String defaultBucket) {
        this.defaultBucket = defaultBucket;
    }
}
# application.yml
# Each value can be overridden by an environment variable; the text after the
# first ':' inside ${...} is the fallback used when the variable is unset.
minio:
  endpoint: ${MINIO_ENDPOINT:http://127.0.0.1:9000}
  access-key: ${MINIO_ACCESS_KEY:minioadmin}
  secret-key: ${MINIO_SECRET_KEY:minioadmin123}
  default-bucket: ${MINIO_DEFAULT_BUCKET:app-files}
// MinioConfig.java
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.presigner.S3Presigner;
import java.net.URI;
@Configuration
@EnableConfigurationProperties(MinioProperties.class)
public class MinioConfig {

    /** Synchronous S3 client used for regular object operations against MinIO. */
    @Bean
    public S3Client s3Client(MinioProperties props) {
        return S3Client.builder()
                .endpointOverride(endpointOf(props))
                .credentialsProvider(credentialsOf(props))
                .region(Region.US_EAST_1)
                .serviceConfiguration(pathStyleConfig())
                .build();
    }

    /** Presigner for time-limited GET/PUT URLs; configured exactly like the client. */
    @Bean
    public S3Presigner s3Presigner(MinioProperties props) {
        return S3Presigner.builder()
                .endpointOverride(endpointOf(props))
                .credentialsProvider(credentialsOf(props))
                .region(Region.US_EAST_1)
                .serviceConfiguration(pathStyleConfig())
                .build();
    }

    /** MinIO endpoint URI taken from the bound properties. */
    private static URI endpointOf(MinioProperties props) {
        return URI.create(props.getEndpoint());
    }

    /** Static credentials built from the configured access/secret key pair. */
    private static StaticCredentialsProvider credentialsOf(MinioProperties props) {
        return StaticCredentialsProvider.create(
                AwsBasicCredentials.create(props.getAccessKey(), props.getSecretKey()));
    }

    /** Path-style addressing (http://host/bucket/key), as MinIO expects. */
    private static S3Configuration pathStyleConfig() {
        return S3Configuration.builder()
                .pathStyleAccessEnabled(true)
                .build();
    }
}
5.2 文件服务封装(Service 层)
// FileStorageService.java
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;
import software.amazon.awssdk.services.s3.presigner.S3Presigner;
import software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest;
import software.amazon.awssdk.services.s3.presigner.model.PutObjectPresignRequest;
import java.io.IOException;
import java.time.Duration;
import java.util.UUID;
@Service
public class FileStorageService {
    private final S3Client s3Client;
    private final S3Presigner s3Presigner;
    private final MinioProperties minioProps;

    public FileStorageService(S3Client s3Client, S3Presigner s3Presigner,
                              MinioProperties minioProps) {
        this.s3Client = s3Client;
        this.s3Presigner = s3Presigner;
        this.minioProps = minioProps;
    }

    /**
     * Uploads a MultipartFile into the default bucket and returns the stored object key.
     * The key is {@code folder + "/" + randomUUID + extension}; the UUID avoids name
     * collisions and keeps the client-supplied filename out of the key.
     *
     * @throws IOException if the uploaded content cannot be read
     */
    public String uploadFile(MultipartFile file, String folder) throws IOException {
        String key = folder + "/" + UUID.randomUUID() + extensionOf(file.getOriginalFilename());
        // Close the multipart input stream ourselves once the upload has finished.
        try (java.io.InputStream content = file.getInputStream()) {
            s3Client.putObject(
                    PutObjectRequest.builder()
                            .bucket(minioProps.getDefaultBucket())
                            .key(key)
                            .contentType(file.getContentType())
                            .contentLength(file.getSize())
                            .build(),
                    RequestBody.fromInputStream(content, file.getSize())
            );
        }
        return key;
    }

    /**
     * Returns the extension of {@code filename} including the leading dot, or "" if
     * there is none (e.g. "README", "name." or a null filename). Only 1-16
     * alphanumeric characters are accepted after the dot, so an untrusted filename
     * cannot inject '/' or other unexpected characters into the object key.
     */
    private static String extensionOf(String filename) {
        if (filename == null) {
            return "";
        }
        int dot = filename.lastIndexOf('.');
        if (dot < 0) {
            return "";
        }
        String extension = filename.substring(dot);
        return extension.matches("\\.[A-Za-z0-9]{1,16}") ? extension : "";
    }

    /**
     * Generates a presigned GET URL for {@code key} in the default bucket,
     * valid for {@code expiry}.
     */
    public String generatePresignedDownloadUrl(String key, Duration expiry) {
        GetObjectPresignRequest presignRequest = GetObjectPresignRequest.builder()
                .signatureDuration(expiry)
                .getObjectRequest(GetObjectRequest.builder()
                        .bucket(minioProps.getDefaultBucket())
                        .key(key)
                        .build())
                .build();
        return s3Presigner.presignGetObject(presignRequest)
                .url()
                .toString();
    }

    /**
     * Generates a presigned PUT URL for direct browser/client uploads, valid for
     * {@code expiry}. Because {@code contentType} is part of the signed request,
     * the uploader must send the same Content-Type header.
     */
    public String generatePresignedUploadUrl(String key, String contentType,
                                             Duration expiry) {
        PutObjectPresignRequest presignRequest = PutObjectPresignRequest.builder()
                .signatureDuration(expiry)
                .putObjectRequest(PutObjectRequest.builder()
                        .bucket(minioProps.getDefaultBucket())
                        .key(key)
                        .contentType(contentType)
                        .build())
                .build();
        return s3Presigner.presignPutObject(presignRequest)
                .url()
                .toString();
    }

    /** Deletes {@code key} from the default bucket. */
    public void deleteFile(String key) {
        s3Client.deleteObject(DeleteObjectRequest.builder()
                .bucket(minioProps.getDefaultBucket())
                .key(key)
                .build());
    }
}
5.3 Controller 层接口
// FileController.java
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
@RestController
@RequestMapping("/api/files")
public class FileController {
private final FileStorageService fileStorageService;
public FileController(FileStorageService fileStorageService) {
this.fileStorageService = fileStorageService;
}
/**
* 上传文件接口
* POST /api/files/upload
*/
@PostMapping("/upload")
public ResponseEntity> upload(
@RequestParam("file") MultipartFile file,
@RequestParam(value = "folder", defaultValue = "uploads") String folder)
throws IOException {
String key = fileStorageService.uploadFile(file, folder);
String downloadUrl = fileStorageService.generatePresignedDownloadUrl(
key, Duration.ofHours(1));
return ResponseEntity.ok(Map.of(
"key", key,
"downloadUrl", downloadUrl
));
}
/**
* 获取预签名下载链接
* GET /api/files/download-url?key=xxx&expireMinutes=60
*/
@GetMapping("/download-url")
public ResponseEntity> getDownloadUrl(
@RequestParam String key,
@RequestParam(defaultValue = "60") int expireMinutes) {
String url = fileStorageService.generatePresignedDownloadUrl(
key, Duration.ofMinutes(expireMinutes));
return ResponseEntity.ok(Map.of("url", url));
}
/**
* 获取前端直传的预签名上传链接
* POST /api/files/upload-url
*/
@PostMapping("/upload-url")
public ResponseEntity> getUploadUrl(
@RequestParam String key,
@RequestParam String contentType) {
String url = fileStorageService.generatePresignedUploadUrl(
key, contentType, Duration.ofMinutes(15));
return ResponseEntity.ok(Map.of("uploadUrl", url, "key", key));
}
/**
* 删除文件
* DELETE /api/files?key=xxx
*/
@DeleteMapping
public ResponseEntity delete(@RequestParam String key) {
fileStorageService.deleteFile(key);
return ResponseEntity.noContent().build();
}
}
6. Go MinIO SDK 原生操作
6.1 安装与初始化
# Add the MinIO Go SDK (v7) as a dependency of the current Go module
go get github.com/minio/minio-go/v7
package main
import (
"context"
"log"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)
func newMinioClient() *minio.Client {
client, err := minio.New("127.0.0.1:9000", &minio.Options{
Creds: credentials.NewStaticV4("minioadmin", "minioadmin123", ""),
Secure: false, // true 表示使用 HTTPS
Region: "us-east-1",
})
if err != nil {
log.Fatalf("Failed to create MinIO client: %v", err)
}
return client
}
6.2 Bucket 操作
package storage
import (
"context"
"fmt"
"log"
"github.com/minio/minio-go/v7"
)
// CreateBucket 创建桶,如已存在则跳过
func CreateBucket(ctx context.Context, client *minio.Client,
bucketName, region string) error {
exists, err := client.BucketExists(ctx, bucketName)
if err != nil {
return fmt.Errorf("check bucket existence: %w", err)
}
if exists {
log.Printf("Bucket %s already exists", bucketName)
return nil
}
err = client.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{
Region: region,
})
if err != nil {
return fmt.Errorf("create bucket %s: %w", bucketName, err)
}
log.Printf("Bucket %s created successfully", bucketName)
return nil
}
// ListBuckets 列出所有桶
func ListBuckets(ctx context.Context, client *minio.Client) {
buckets, err := client.ListBuckets(ctx)
if err != nil {
log.Fatalf("ListBuckets: %v", err)
}
for _, bucket := range buckets {
fmt.Printf("Bucket: %-30s Created: %s\n",
bucket.Name, bucket.CreationDate)
}
}
6.3 对象上传
package storage
import (
"bytes"
"context"
"fmt"
"os"
"github.com/minio/minio-go/v7"
)
// UploadFile 上传本地文件
func UploadFile(ctx context.Context, client *minio.Client,
bucket, key, localPath, contentType string) (minio.UploadInfo, error) {
info, err := client.FPutObject(ctx, bucket, key, localPath,
minio.PutObjectOptions{
ContentType: contentType,
UserMetadata: map[string]string{
"uploaded-by": "go-sdk",
},
})
if err != nil {
return minio.UploadInfo{}, fmt.Errorf("upload file %s: %w", localPath, err)
}
return info, nil
}
// UploadBytes 上传字节切片
func UploadBytes(ctx context.Context, client *minio.Client,
bucket, key string, data []byte, contentType string) (minio.UploadInfo, error) {
reader := bytes.NewReader(data)
info, err := client.PutObject(ctx, bucket, key, reader, int64(len(data)),
minio.PutObjectOptions{
ContentType: contentType,
})
if err != nil {
return minio.UploadInfo{}, fmt.Errorf("upload bytes to %s/%s: %w",
bucket, key, err)
}
return info, nil
}
// UploadStream 上传 io.Reader 流(大文件推荐,支持未知大小)
func UploadStream(ctx context.Context, client *minio.Client,
bucket, key string, reader io.Reader, contentType string) (minio.UploadInfo, error) {
info, err := client.PutObject(ctx, bucket, key, reader, -1,
minio.PutObjectOptions{
ContentType: contentType,
// -1 表示大小未知,MinIO SDK 自动处理分片上传
PartSize: 64 * 1024 * 1024, // 每片 64MB
})
if err != nil {
return minio.UploadInfo{}, fmt.Errorf("upload stream to %s/%s: %w",
bucket, key, err)
}
return info, nil
}
6.4 对象下载与查询
// DownloadFile writes the object bucket/key to localPath using default GetObject options.
func DownloadFile(ctx context.Context, client *minio.Client,
	bucket, key, localPath string) error {
	opts := minio.GetObjectOptions{}
	return client.FGetObject(ctx, bucket, key, localPath, opts)
}
// GetObjectBytes reads the whole object bucket/key into memory; meant for small objects.
func GetObjectBytes(ctx context.Context, client *minio.Client,
	bucket, key string) ([]byte, error) {
	reader, getErr := client.GetObject(ctx, bucket, key, minio.GetObjectOptions{})
	if getErr != nil {
		return nil, fmt.Errorf("get object %s/%s: %w", bucket, key, getErr)
	}
	defer reader.Close()

	payload, readErr := io.ReadAll(reader)
	if readErr != nil {
		return nil, fmt.Errorf("read object body: %w", readErr)
	}
	return payload, nil
}
// StatObject returns the metadata of bucket/key without downloading its content.
func StatObject(ctx context.Context, client *minio.Client,
	bucket, key string) (minio.ObjectInfo, error) {
	opts := minio.StatObjectOptions{}
	info, err := client.StatObject(ctx, bucket, key, opts)
	return info, err
}
// ListObjects collects the objects under prefix (descending into sub-"directories"
// when recursive is true). Listing errors are logged and skipped, not returned.
func ListObjects(ctx context.Context, client *minio.Client,
	bucket, prefix string, recursive bool) []minio.ObjectInfo {
	opts := minio.ListObjectsOptions{Prefix: prefix, Recursive: recursive}
	var collected []minio.ObjectInfo
	for info := range client.ListObjects(ctx, bucket, opts) {
		if info.Err == nil {
			collected = append(collected, info)
			continue
		}
		log.Printf("ListObjects error: %v", info.Err)
	}
	return collected
}
// RemoveObject deletes bucket/key using default RemoveObject options.
func RemoveObject(ctx context.Context, client *minio.Client,
	bucket, key string) error {
	opts := minio.RemoveObjectOptions{}
	return client.RemoveObject(ctx, bucket, key, opts)
}
7. Go 进阶:高性能并发上传与流式处理
7.1 并发上传目录
package main
import (
"context"
"fmt"
"log"
"os"
"path/filepath"
"sync"
"sync/atomic"
"github.com/minio/minio-go/v7"
)
// UploadDirectoryConcurrent 并发上传整个目录
func UploadDirectoryConcurrent(ctx context.Context, client *minio.Client,
localDir, bucket, prefix string, concurrency int) error {
// 收集所有待上传的文件
var files []string
err := filepath.Walk(localDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
files = append(files, path)
}
return nil
})
if err != nil {
return fmt.Errorf("walk directory: %w", err)
}
// 工作队列 channel
jobs := make(chan string, len(files))
for _, f := range files {
jobs <- f
}
close(jobs)
var (
wg sync.WaitGroup
successCount int64
failCount int64
mu sync.Mutex
errs []error
)
// 启动 worker 协程
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for localPath := range jobs {
// 计算 S3 Key
rel, _ := filepath.Rel(localDir, localPath)
key := filepath.Join(prefix, rel)
key = filepath.ToSlash(key) // Windows 兼容
_, err := client.FPutObject(ctx, bucket, key, localPath,
minio.PutObjectOptions{})
if err != nil {
atomic.AddInt64(&failCount, 1)
mu.Lock()
errs = append(errs, fmt.Errorf("upload %s: %w", localPath, err))
mu.Unlock()
log.Printf("FAIL: %s -> %s: %v", localPath, key, err)
} else {
atomic.AddInt64(&successCount, 1)
log.Printf("OK: %s -> s3://%s/%s", localPath, bucket, key)
}
}
}()
}
wg.Wait()
log.Printf("Upload complete. Success: %d, Failed: %d",
successCount, failCount)
if len(errs) > 0 {
return fmt.Errorf("%d files failed to upload, first error: %w",
len(errs), errs[0])
}
return nil
}
7.2 流式处理:边下载边处理
package main
import (
"bufio"
"compress/gzip"
"context"
"fmt"
"strings"
"github.com/minio/minio-go/v7"
)
// ProcessGzipLog streams a gzip-compressed log object from MinIO and returns
// how many of its lines contain "ERROR" or "FATAL".
//
// The object body is decompressed directly from the network stream and read
// line by line, so the whole file is never held in memory and no temporary
// file is written. Lines up to 1 MiB long are supported. On a scan error the
// count accumulated so far is returned together with the error.
func ProcessGzipLog(ctx context.Context, client *minio.Client,
	bucket, key string) (int, error) {
	object, getErr := client.GetObject(ctx, bucket, key, minio.GetObjectOptions{})
	if getErr != nil {
		return 0, fmt.Errorf("get object: %w", getErr)
	}
	defer object.Close()

	decompressed, gzErr := gzip.NewReader(object)
	if gzErr != nil {
		return 0, fmt.Errorf("create gzip reader: %w", gzErr)
	}
	defer decompressed.Close()

	// bufio.Scanner's default 64 KiB token limit is too small for some log lines.
	const maxLineBytes = 1024 * 1024
	lines := bufio.NewScanner(decompressed)
	lines.Buffer(make([]byte, maxLineBytes), maxLineBytes)

	matches := 0
	for lines.Scan() {
		text := lines.Text()
		isProblem := strings.Contains(text, "ERROR") || strings.Contains(text, "FATAL")
		if isProblem {
			matches++
		}
	}
	if scanErr := lines.Err(); scanErr != nil {
		return matches, fmt.Errorf("scan log: %w", scanErr)
	}
	return matches, nil
}
8. 分片上传的完整实现与断点续传
8.1 分片上传流程回顾
分片上传(Multipart Upload)是处理大文件的标准机制:
1. InitiateMultipartUpload -> 获得 UploadId
2. UploadPart × N -> 并发上传各个分片,收集 ETag
3. CompleteMultipartUpload -> 提交所有分片,服务端合并
(异常时) AbortMultipartUpload -> 取消并清理
8.2 Python 断点续传实现
import os
import json
import hashlib
import math
import boto3
from botocore.config import Config
CHUNK_SIZE = 64 * 1024 * 1024  # 64 MiB per part
CHECKPOINT_DIR = '/tmp/minio_checkpoints'


class ResumableUploader:
    """Multipart uploader that resumes an interrupted upload of a large file.

    After each part is uploaded, the UploadId and the ETags of the finished
    parts are written to a JSON checkpoint under ``CHECKPOINT_DIR``. Running
    :meth:`upload` again for the same local file and key skips those parts.

    A checkpoint is reused only if the local file's size, mtime and
    ``CHUNK_SIZE`` still match the fingerprint recorded in it, so a modified
    file is never assembled from a mix of old and new parts. A non-matching
    checkpoint for the same key is discarded and its multipart upload is
    aborted on a best-effort basis.
    """

    def __init__(self, client, bucket):
        """Create an uploader that writes to *bucket* through a boto3 S3 *client*."""
        self.client = client
        self.bucket = bucket
        os.makedirs(CHECKPOINT_DIR, exist_ok=True)

    def _checkpoint_path(self, local_file, key):
        """Return the checkpoint file path derived from a hash of (local_file, key)."""
        ident = hashlib.md5(f"{local_file}:{key}".encode()).hexdigest()[:8]
        return os.path.join(CHECKPOINT_DIR, f"upload_{ident}.json")

    def _load_checkpoint(self, checkpoint_path):
        """Return the saved checkpoint dict, or None if it is missing or unreadable."""
        try:
            with open(checkpoint_path, encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            return None
        except ValueError:
            # Truncated/corrupt JSON must not block every later run:
            # treat it as "no checkpoint" and start a fresh upload.
            return None
        return data if isinstance(data, dict) else None

    def _save_checkpoint(self, checkpoint_path, data):
        """Persist *data* atomically: write a temp file, then rename it over."""
        tmp_path = checkpoint_path + '.tmp'
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)
        # os.replace is atomic, so a crash never leaves a half-written checkpoint.
        os.replace(tmp_path, checkpoint_path)

    @staticmethod
    def _remove_checkpoint(checkpoint_path):
        """Delete the checkpoint file if it exists."""
        try:
            os.remove(checkpoint_path)
        except FileNotFoundError:
            pass

    @staticmethod
    def _is_resumable(checkpoint, key, fingerprint):
        """Return True if *checkpoint* belongs to *key* and the unchanged file."""
        return (
            checkpoint is not None
            and checkpoint.get('key') == key
            and checkpoint.get('fingerprint') == fingerprint
            and 'upload_id' in checkpoint
            and isinstance(checkpoint.get('completed_parts'), dict)
        )

    def _start_upload(self, key):
        """Initiate a new multipart upload for *key* and return its UploadId."""
        response = self.client.create_multipart_upload(
            Bucket=self.bucket,
            Key=key
        )
        upload_id = response['UploadId']
        print(f"Started new upload, UploadId: {upload_id}")
        return upload_id

    def _abort_quietly(self, key, upload_id):
        """Best-effort abort of an abandoned multipart upload to free its parts."""
        if not upload_id:
            return
        try:
            self.client.abort_multipart_upload(
                Bucket=self.bucket, Key=key, UploadId=upload_id)
        except self.client.exceptions.ClientError as e:
            print(f"Could not abort stale upload {upload_id}: {e}")

    def _transfer(self, local_file, key, upload_id, completed_parts,
                  total_parts, checkpoint_path, fingerprint):
        """Upload every part missing from *completed_parts*, then complete the upload.

        *completed_parts* maps the part number (as a string, matching the JSON
        checkpoint) to its ETag and is updated in place.
        """
        with open(local_file, 'rb') as f:
            for part_number in range(1, total_parts + 1):
                str_part = str(part_number)
                if str_part in completed_parts:
                    print(f"Skipping part {part_number} (already uploaded)")
                    continue
                f.seek((part_number - 1) * CHUNK_SIZE)
                chunk = f.read(CHUNK_SIZE)
                response = self.client.upload_part(
                    Bucket=self.bucket,
                    Key=key,
                    UploadId=upload_id,
                    PartNumber=part_number,
                    Body=chunk
                )
                completed_parts[str_part] = response['ETag']
                print(f"Uploaded part {part_number}/{total_parts}")
                # Save after every part so a crash loses at most one part.
                self._save_checkpoint(checkpoint_path, {
                    'key': key,
                    'upload_id': upload_id,
                    'fingerprint': fingerprint,
                    'completed_parts': completed_parts,
                })

        parts = [
            {'PartNumber': int(k), 'ETag': v}
            for k, v in sorted(completed_parts.items(), key=lambda x: int(x[0]))
        ]
        self.client.complete_multipart_upload(
            Bucket=self.bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts}
        )

    def upload(self, local_file, key):
        """Upload *local_file* to ``s3://<bucket>/<key>``, resuming when possible.

        Raises:
            Any client or file-I/O exception. The checkpoint is kept so that a
            later call can resume from the last uploaded part.
        """
        stat = os.stat(local_file)
        file_size = stat.st_size
        checkpoint_path = self._checkpoint_path(local_file, key)

        if file_size == 0:
            # CompleteMultipartUpload needs at least one part, so an empty
            # file is stored with a single plain PUT instead.
            self.client.put_object(Bucket=self.bucket, Key=key, Body=b'')
            print(f"Upload complete: s3://{self.bucket}/{key}")
            return

        total_parts = math.ceil(file_size / CHUNK_SIZE)
        fingerprint = {
            'file_size': file_size,
            'mtime_ns': stat.st_mtime_ns,
            'chunk_size': CHUNK_SIZE,
        }

        checkpoint = self._load_checkpoint(checkpoint_path)
        resumed = self._is_resumable(checkpoint, key, fingerprint)
        if resumed:
            upload_id = checkpoint['upload_id']
            completed_parts = checkpoint['completed_parts']
            print(f"Resuming upload, UploadId: {upload_id}, "
                  f"completed parts: {len(completed_parts)}/{total_parts}")
        else:
            if checkpoint is not None and checkpoint.get('key') == key:
                # The file (or CHUNK_SIZE) changed since this checkpoint was
                # written, so its parts are unusable: drop that upload.
                print("Local file changed since last attempt, discarding checkpoint")
                self._abort_quietly(key, checkpoint.get('upload_id'))
            upload_id = self._start_upload(key)
            completed_parts = {}

        try:
            try:
                self._transfer(local_file, key, upload_id, completed_parts,
                               total_parts, checkpoint_path, fingerprint)
            except self.client.exceptions.NoSuchUpload:
                if not resumed:
                    raise
                # The saved UploadId no longer exists on the server (e.g. it
                # was removed by an AbortIncompleteMultipartUpload lifecycle
                # rule); resuming is impossible, so start over once.
                print(f"UploadId {upload_id} no longer exists, starting a new upload")
                upload_id = self._start_upload(key)
                self._transfer(local_file, key, upload_id, {},
                               total_parts, checkpoint_path, fingerprint)
        except Exception as e:
            print(f"Upload interrupted: {e}")
            print(f"Checkpoint saved to {checkpoint_path}, "
                  f"restart the program to resume")
            raise

        print(f"Upload complete: s3://{self.bucket}/{key}")
        self._remove_checkpoint(checkpoint_path)
# Usage example: connect to the local MinIO test server and upload a large archive.
s3_client = boto3.client(
    service_name='s3',
    endpoint_url='http://127.0.0.1:9000',
    aws_access_key_id='minioadmin',
    aws_secret_access_key='minioadmin123',
    region_name='us-east-1',
    config=Config(signature_version='s3v4'),
)
uploader = ResumableUploader(s3_client, 'demo-bucket')
uploader.upload(
    local_file='/local/huge-file.tar.gz',
    key='archives/huge-file.tar.gz',
)
9. 预签名 URL 的生成与应用
9.1 预签名 URL 的应用场景
预签名 URL 是 S3/MinIO 开发中最重要的特性之一,允许在不暴露凭据的情况下授予临时访问权限:
| 场景 | 操作 | 说明 |
|---|---|---|
| 前端直接下载文件 | GET 预签名 URL | 后端生成链接,前端直接从 MinIO 下载 |
| 前端直传文件 | PUT 预签名 URL | 后端生成上传链接,前端直传 MinIO,绕过后端 |
| 分享文件给第三方 | GET 预签名 URL(带有效期) | 生成有时效的临时下载链接 |
| 邮件中嵌入附件链接 | GET 预签名 URL | 邮件中直接放下载链接 |
9.2 Python 生成预签名 URL
from datetime import timedelta
import boto3
from botocore.config import Config
s3_client = boto3.client(
    service_name='s3',
    endpoint_url='http://127.0.0.1:9000',
    aws_access_key_id='minioadmin',
    aws_secret_access_key='minioadmin123',
    region_name='us-east-1',
    config=Config(signature_version='s3v4'),
)

# -- Presigned download URL ---------------------------------------------------
# The Content-Disposition override asks the browser to save the object as
# "report.pdf" instead of displaying it inline.
download_params = {
    'Bucket': 'demo-bucket',
    'Key': 'documents/report.pdf',
    'ResponseContentDisposition': 'attachment; filename="report.pdf"',
}
download_url = s3_client.generate_presigned_url(
    'get_object',
    Params=download_params,
    ExpiresIn=60 * 60,  # valid for one hour (value is in seconds)
)
print(f"Download URL (1h): {download_url}")

# -- Presigned upload URL -----------------------------------------------------
upload_params = {
    'Bucket': 'demo-bucket',
    'Key': 'uploads/user-avatar.jpg',
    'ContentType': 'image/jpeg',
}
upload_url = s3_client.generate_presigned_url(
    'put_object',
    Params=upload_params,
    ExpiresIn=15 * 60,  # valid for 15 minutes
)
print(f"Upload URL (15min): {upload_url}")

# A client (e.g. the browser) then PUTs the file to that URL; because
# ContentType was included when signing, it should send the same
# Content-Type header, as this example curl command does.
print("curl -X PUT -H 'Content-Type: image/jpeg' "
      "--data-binary @avatar.jpg '" + upload_url + "'")
9.3 Go 生成预签名 URL
package main
import (
"context"
"fmt"
"net/url"
"time"
"github.com/minio/minio-go/v7"
)
// GeneratePresignedDownloadURL returns a URL that permits an HTTP GET of
// bucket/key without credentials until expiry elapses.
//
// The URL also overrides the response Content-Disposition so browsers download
// the object instead of previewing it. Only the last path segment of key is
// used as the suggested file name: RFC 6266 tells clients to discard path
// information, so a name like "reports/q4.pdf" would otherwise be mangled.
// The name is written as a quoted-string via %q so a '"' or '\' in the key
// cannot break the header value.
func GeneratePresignedDownloadURL(ctx context.Context, client *minio.Client,
	bucket, key string, expiry time.Duration) (string, error) {
	// Last path segment of key; fall back to the whole key if it ends in "/".
	fileName := key
	for i := len(key) - 1; i >= 0; i-- {
		if key[i] == '/' {
			if i+1 < len(key) {
				fileName = key[i+1:]
			}
			break
		}
	}

	reqParams := make(url.Values)
	reqParams.Set("response-content-disposition",
		fmt.Sprintf("attachment; filename=%q", fileName))

	presignedURL, err := client.PresignedGetObject(
		ctx, bucket, key, expiry, reqParams)
	if err != nil {
		return "", fmt.Errorf("presign get object: %w", err)
	}
	return presignedURL.String(), nil
}
// GeneratePresignedUploadURL returns a URL that permits an HTTP PUT of
// bucket/key without credentials until expiry elapses.
func GeneratePresignedUploadURL(ctx context.Context, client *minio.Client,
	bucket, key string, expiry time.Duration) (string, error) {
	signed, presignErr := client.PresignedPutObject(ctx, bucket, key, expiry)
	if presignErr == nil {
		return signed.String(), nil
	}
	return "", fmt.Errorf("presign put object: %w", presignErr)
}
// main demonstrates a one-hour download URL and a 15-minute upload URL.
// newMinioClient is assumed to be defined elsewhere in this chapter.
func main() {
	client := newMinioClient()
	ctx := context.Background()

	// must unwraps a (url, error) pair, panicking on error.
	must := func(signed string, err error) string {
		if err != nil {
			panic(err)
		}
		return signed
	}

	fmt.Println("Download URL:", must(GeneratePresignedDownloadURL(
		ctx, client, "demo-bucket", "reports/q4.pdf", time.Hour)))
	fmt.Println("Upload URL:", must(GeneratePresignedUploadURL(
		ctx, client, "demo-bucket", "uploads/photo.jpg", 15*time.Minute)))
}
10. 错误处理、重试机制与超时配置
10.1 Python 错误处理
from botocore.exceptions import (
ClientError, NoCredentialsError, EndpointResolutionError,
ConnectTimeoutError, ReadTimeoutError
)
def _find_client_error(exc):
    """Return the first ``ClientError`` in *exc*'s cause/context chain, or None.

    boto3's transfer manager (used by ``upload_file``) catches an S3
    ``ClientError`` and raises ``S3UploadFailedError`` in its place, leaving
    the original error reachable only through ``__context__``.
    """
    seen = set()
    while exc is not None and id(exc) not in seen:
        if isinstance(exc, ClientError):
            return exc
        seen.add(id(exc))
        exc = exc.__cause__ or exc.__context__
    return None


def safe_upload(client, local_path, bucket, key, max_retries=3):
    """Upload *local_path* to ``bucket/key``, retrying transient failures.

    Args:
        client: boto3 S3 client.
        local_path: path of the local file to upload.
        bucket: target bucket name.
        key: target object key.
        max_retries: total number of attempts.

    Returns:
        True on success; False if every attempt was throttled
        (``SlowDown`` / ``ServiceUnavailable``) or *max_retries* < 1.

    Raises:
        RuntimeError: the bucket does not exist, or no credentials are configured.
        PermissionError: access to the object is denied.
        ClientError / S3UploadFailedError: any other S3 failure on the last attempt.
        ConnectTimeoutError / ReadTimeoutError: a timeout on the last attempt.
    """
    import time

    from boto3.exceptions import S3UploadFailedError

    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            client.upload_file(local_path, bucket, key)
            return True
        except (ClientError, S3UploadFailedError) as e:
            # upload_file wraps S3 errors, so look for the real ClientError to
            # get at its error code.
            client_error = _find_client_error(e)
            if client_error is not None:
                error_info = client_error.response.get('Error', {})
                error_code = error_info.get('Code')
                error_msg = error_info.get('Message', '')
            else:
                error_code = None
                error_msg = str(e)

            if error_code == 'NoSuchBucket':
                # Bucket does not exist: retrying cannot help
                raise RuntimeError(f"Bucket {bucket} does not exist") from e
            if error_code == 'AccessDenied':
                # Insufficient permissions: retrying cannot help
                raise PermissionError(f"Access denied to {bucket}/{key}") from e
            if error_code in ('SlowDown', 'ServiceUnavailable'):
                # Server-side throttling: back off exponentially, but do not
                # sleep after the final attempt.
                if is_last_attempt:
                    break
                wait_time = 2 ** attempt
                print(f"Rate limited, retrying in {wait_time}s "
                      f"(attempt {attempt+1}/{max_retries})")
                time.sleep(wait_time)
                continue
            if is_last_attempt:
                raise
            print(f"ClientError {error_code or type(e).__name__}: {error_msg}, "
                  f"retrying ({attempt+1}/{max_retries})")
        except (ConnectTimeoutError, ReadTimeoutError) as e:
            # Timeouts are transient: retry
            if is_last_attempt:
                raise
            print(f"Timeout, retrying ({attempt+1}/{max_retries}): {e}")
        except NoCredentialsError:
            # Credential problem: retrying cannot help
            raise RuntimeError("No valid credentials configured") from None
    return False
def get_s3_error_code(e: ClientError) -> str:
    """Return the S3 error code string carried by a botocore ``ClientError``."""
    error_info = e.response['Error']
    return error_info['Code']
def object_exists_safe(client, bucket, key) -> bool:
    """Return whether ``bucket/key`` exists, telling "missing" apart from "forbidden".

    A HEAD response has no XML error body, so botocore usually reports the bare
    HTTP status ('404' / '403') as the error code; the named S3 codes are
    accepted as well.

    Raises:
        PermissionError: the caller may not access the object (chained to the
            original ClientError).
        ClientError: any other S3 error.
    """
    try:
        client.head_object(Bucket=bucket, Key=key)
    except ClientError as e:
        code = get_s3_error_code(e)
        if code in ('404', 'NoSuchKey', 'NotFound'):
            return False
        if code in ('403', 'AccessDenied'):
            raise PermissionError(
                f"No permission to access {bucket}/{key}") from e
        raise
    return True
10.2 Go 错误处理
package main
import (
"context"
"errors"
"fmt"
"log"
"time"
"github.com/minio/minio-go/v7"
)
// IsNotFound reports whether err (or an error it wraps) is a MinIO error
// response with code "NoSuchKey" or any HTTP 404 status (so a missing bucket
// most likely matches as well).
func IsNotFound(err error) bool {
	var resp minio.ErrorResponse
	return errors.As(err, &resp) &&
		(resp.Code == "NoSuchKey" || resp.StatusCode == 404)
}
// IsAccessDenied reports whether err (or an error it wraps) is a MinIO error
// response with code "AccessDenied" or HTTP status 403.
func IsAccessDenied(err error) bool {
	var resp minio.ErrorResponse
	return errors.As(err, &resp) &&
		(resp.Code == "AccessDenied" || resp.StatusCode == 403)
}
// UploadWithRetry 带指数退避重试的上传
func UploadWithRetry(ctx context.Context, client *minio.Client,
bucket, key, localPath string, maxAttempts int) error {
var lastErr error
for attempt := 0; attempt < maxAttempts; attempt++ {
if attempt > 0 {
waitDuration := time.Duration(1<
log.Printf("Retry %d/%d after %v: %v",
attempt, maxAttempts, waitDuration, lastErr)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(waitDuration):
}
}
_, err := client.FPutObject(ctx, bucket, key, localPath,
minio.PutObjectOptions{})
if err == nil {
return nil
}
// 不可重试的错误
if IsAccessDenied(err) {
return fmt.Errorf("access denied to %s/%s: %w", bucket, key, err)
}
lastErr = err
}
return fmt.Errorf("upload failed after %d attempts: %w", maxAttempts, lastErr)
}
10.3 超时配置最佳实践
from botocore.config import Config
# Timeout presets for different workloads (timeouts are in seconds).

# Small object transfers (< 10MB): short timeouts.
small_file_config = Config(
    retries=dict(max_attempts=3, mode='adaptive'),
    connect_timeout=5,
    read_timeout=30,
)

# Large object transfers (> 100MB): allow up to 5 minutes per read.
large_file_config = Config(
    retries=dict(max_attempts=5, mode='adaptive'),
    connect_timeout=10,
    read_timeout=5 * 60,
)

# Metadata calls (HeadObject, ListObjects, ...): tight read timeout.
metadata_config = Config(
    retries=dict(max_attempts=3, mode='standard'),
    connect_timeout=5,
    read_timeout=10,
)
11. 通过 SDK 管理桶配置
11.1 Python:管理版本控制与生命周期
# -- Enable versioning -------------------------------------------------------
s3_client.put_bucket_versioning(
    Bucket='demo-bucket',
    VersioningConfiguration={'Status': 'Enabled'},
)

# -- Show versioning status --------------------------------------------------
# The code assumes 'Status' is absent when versioning was never configured.
response = s3_client.get_bucket_versioning(Bucket='demo-bucket')
versioning_status = response.get('Status', 'Not enabled')
print(versioning_status)

# -- Lifecycle rules ---------------------------------------------------------
# Objects under logs/ expire after 30 days; unfinished multipart uploads
# under that prefix are aborted one day after they were started.
expire_logs_rule = {
    'ID': 'expire-logs',
    'Status': 'Enabled',
    'Filter': {'Prefix': 'logs/'},
    'Expiration': {'Days': 30},
    'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 1},
}
# Bucket-wide: noncurrent versions become eligible for expiry after 7 days,
# while the 5 newest noncurrent versions are retained.
cleanup_versions_rule = {
    'ID': 'cleanup-versions',
    'Status': 'Enabled',
    'Filter': {'Prefix': ''},
    'NoncurrentVersionExpiration': {
        'NoncurrentDays': 7,
        'NewerNoncurrentVersions': 5,
    },
}
s3_client.put_bucket_lifecycle_configuration(
    Bucket='demo-bucket',
    LifecycleConfiguration={'Rules': [expire_logs_rule, cleanup_versions_rule]},
)
# -- Bucket policy: anonymous read access to demo-bucket/public/* -------------
import json

public_read_statement = {
    "Effect": "Allow",
    "Principal": {"AWS": "*"},
    "Action": ["s3:GetObject"],
    "Resource": ["arn:aws:s3:::demo-bucket/public/*"],
}
policy = {"Version": "2012-10-17", "Statement": [public_read_statement]}
s3_client.put_bucket_policy(Bucket='demo-bucket', Policy=json.dumps(policy))
# -- Bucket event notifications ----------------------------------------------
# Send create/delete events for uploads/*.jpg to a queue target. The ARN
# presumably names a Kafka target with ID "primary" that must already be
# configured on the MinIO server — confirm against the server config.
jpg_upload_filter = {
    'Key': {
        'FilterRules': [
            {'Name': 'prefix', 'Value': 'uploads/'},
            {'Name': 'suffix', 'Value': '.jpg'},
        ]
    }
}
queue_config = {
    'QueueArn': 'arn:minio:sqs::primary:kafka',
    'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
    'Filter': jpg_upload_filter,
}
s3_client.put_bucket_notification_configuration(
    Bucket='demo-bucket',
    NotificationConfiguration={'QueueConfigurations': [queue_config]},
)
11.2 Go:桶策略与 CORS 配置
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/minio/minio-go/v7"
)
// SetPublicReadPolicy installs a bucket policy that allows anyone to
// s3:GetObject every object in bucket.
func SetPublicReadPolicy(ctx context.Context, client *minio.Client,
	bucket string) error {
	objectsARN := fmt.Sprintf("arn:aws:s3:::%s/*", bucket)
	statement := map[string]interface{}{
		"Effect":    "Allow",
		"Principal": map[string]interface{}{"AWS": []string{"*"}},
		"Action":    []string{"s3:GetObject"},
		"Resource":  []string{objectsARN},
	}
	document := map[string]interface{}{
		"Version":   "2012-10-17",
		"Statement": []map[string]interface{}{statement},
	}

	raw, marshalErr := json.Marshal(document)
	if marshalErr != nil {
		return fmt.Errorf("marshal policy: %w", marshalErr)
	}
	return client.SetBucketPolicy(ctx, bucket, string(raw))
}
// GetBucketPolicy returns the bucket's current policy as a JSON string.
func GetBucketPolicy(ctx context.Context, client *minio.Client,
	bucket string) (string, error) {
	policyJSON, err := client.GetBucketPolicy(ctx, bucket)
	return policyJSON, err
}