返回
科技赋能,轻松玩转MinIo分片上传进阶篇
后端
2024-02-09 09:17:21
MinIo是一款轻量级、高性能的对象存储系统,因其简单易用、功能强大而受到广泛欢迎。在实际应用中,我们经常需要上传大文件,而MinIo的分片上传功能可以将大文件分割成更小的分片,并行上传,从而大大提高上传速度。
在本文中,我们将深入探讨MinIo分片上传的进阶技巧,帮助您更有效地管理大文件上传任务,提高上传性能,优化存储空间利用率。我们将介绍多线程分片上传、断点续传、上传进度跟踪等实用功能,并提供详细的代码示例,让您轻松掌握MinIo分片上传的精髓。
多线程分片上传
多线程分片上传可以充分利用多核CPU的优势,提高上传速度。MinIo支持多线程分片上传,您可以通过设置concurrentUploads
参数来指定同时上传的分片数量。例如,以下代码演示了如何使用多线程分片上传:
import io.minio.MinioClient;
import io.minio.ObjectWriteResponse;
import io.minio.PutObjectOptions;
import io.minio.errors.InsufficientDataException;
import io.minio.http.Method;
import io.minio.messages.AbortMultipartUploadResponse;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.StringUtils;
public class MinIoMultithreadedUpload {
public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
// 1. 创建MinIo客户端
MinioClient minioClient = MinioClient.builder().endpoint("localhost", 9000).credentials("minio", "minio123").build();
// 2. 创建一个存储桶
minioClient.makeBucket("my-bucket");
// 3. 定义上传参数
PutObjectOptions putObjectOptions = new PutObjectOptions(new FileInputStream(new File("large-file.txt")), 1024 * 1024 * 5);
putObjectOptions.setPartSize(1024 * 1024 * 5); // 分片大小设置为5MB
putObjectOptions.setConcurrentUploads(5); // 设置同时上传的分片数量为5
// 4. 初始化分片上传
String uploadId = minioClient.initiateMultipartUpload("my-bucket", "large-file.txt", putObjectOptions);
// 5. 上传分片
List<CompletableFuture<ObjectWriteResponse>> futures = new ArrayList<>();
for (int i = 0; i < putObjectOptions.calculatePartCount(); i++) {
futures.add(
CompletableFuture.supplyAsync(
() -> {
try {
// 计算分片偏移量和大小
long startOffset = i * putObjectOptions.getPartSize();
long endOffset = Math.min(startOffset + putObjectOptions.getPartSize() - 1, putObjectOptions.getObjectSize() - 1);
// 上传分片
return minioClient.uploadPart(
"my-bucket",
"large-file.txt",
uploadId,
i + 1,
new FileInputStream(new File("large-file.txt")),
startOffset,
endOffset - startOffset + 1);
} catch (Exception e) {
throw new RuntimeException(e);
}
}));
}
// 6. 等待所有分片上传完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
// 7. 完成分片上传
minioClient.completeMultipartUpload("my-bucket", "large-file.txt", uploadId, null);
// 8. 打印上传结果
System.out.println("文件上传成功!");
}
}
断点续传
断点续传功能允许您在上传过程中断后重新开始上传,而无需重新上传整个文件。MinIo支持断点续传,您可以通过uploadId
参数来恢复中断的上传任务。例如,以下代码演示了如何使用断点续传:
import io.minio.MinioClient;
import io.minio.ObjectWriteResponse;
import io.minio.PutObjectOptions;
import io.minio.errors.InsufficientDataException;
import io.minio.http.Method;
import io.minio.messages.AbortMultipartUploadResponse;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.StringUtils;
public class MinIoResumableUpload {
public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
// 1. 创建MinIo客户端
MinioClient minioClient = MinioClient.builder().endpoint("localhost", 9000).credentials("minio", "minio123").build();
// 2. 创建一个存储桶
minioClient.makeBucket("my-bucket");
// 3. 定义上传参数
PutObjectOptions putObjectOptions = new PutObjectOptions(new FileInputStream(new File("large-file.txt")), 1024 * 1024 * 5);
putObjectOptions.setPartSize(1024 * 1024 * 5); // 分片大小设置为5MB
putObjectOptions.setConcurrentUploads(5); // 设置同时上传的分片数量为5
// 4. 初始化分片上传
String uploadId = minioClient.initiateMultipartUpload("my-bucket", "large-file.txt", putObjectOptions);
// 5. 上传分片
List<CompletableFuture<ObjectWriteResponse>> futures = new ArrayList<>();
for (int i = 0; i < putObjectOptions.calculatePartCount(); i++) {
futures.add(
CompletableFuture.supplyAsync(
() -> {
try {
// 计算分片偏移量和大小
long startOffset = i * putObjectOptions.getPartSize();
long endOffset = Math.min(startOffset + putObjectOptions.getPartSize() - 1, putObjectOptions.getObjectSize() - 1);
// 上传分片
return minioClient.uploadPart(
"my-bucket",
"large-file.txt",
uploadId,
i + 1,
new FileInputStream(new File("large-file.txt")),
startOffset,
endOffset - startOffset + 1);
} catch (Exception e) {
throw new RuntimeException(e);
}
}));
}
// 6. 等待所有分片上传完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
// 7. 检查文件是否已经完全上传
boolean isCompleted = minioClient.isMultipartUploadComplete("my-bucket", "large-file.txt", uploadId);
if (isCompleted) {
// 文件已经完全上传,完成分片上传
minioClient.completeMultipartUpload("my-bucket", "large-file.txt", uploadId, null);
// 打印上传结果
System.out.println("文件上传成功!");
} else {
// 文件还没有完全上传,断点续传
resumeMultipartUpload(minioClient, "my-bucket", "large-file.txt", uploadId, putObjectOptions);
}
}
private static void resumeMultipartUpload(
MinioClient minioClient, String bucketName, String objectName, String uploadId, PutObjectOptions putObjectOptions)
throws IOException, InterruptedException, ExecutionException {
// 获取已经上传的分片信息
List<CompletedPart> completedParts = minioClient.listMultipartUploadParts("my-bucket", "large-file.txt", uploadId);
// 计算已经上传的分片数量和大小
int uploadedPartCount = completedParts.size();