返回

科技赋能,轻松玩转MinIo分片上传进阶篇

后端

MinIo是一款轻量级、高性能的对象存储系统,因其简单易用、功能强大而受到广泛欢迎。在实际应用中,我们经常需要上传大文件,而MinIo的分片上传功能可以将大文件分割成更小的分片,并行上传,从而大大提高上传速度。

在本文中,我们将深入探讨MinIo分片上传的进阶技巧,帮助您更有效地管理大文件上传任务,提高上传性能,优化存储空间利用率。我们将介绍多线程分片上传、断点续传、上传进度跟踪等实用功能,并提供详细的代码示例,让您轻松掌握MinIo分片上传的精髓。

多线程分片上传

多线程分片上传可以充分利用多核CPU的优势,提高上传速度。MinIo支持多线程分片上传,您可以通过设置concurrentUploads参数来指定同时上传的分片数量。例如,以下代码演示了如何使用多线程分片上传:

import io.minio.MinioClient;
import io.minio.ObjectWriteResponse;
import io.minio.PutObjectOptions;
import io.minio.errors.InsufficientDataException;
import io.minio.http.Method;
import io.minio.messages.AbortMultipartUploadResponse;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.StringUtils;

public class MinIoMultithreadedUpload {

  public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
    // 1. 创建MinIo客户端
    MinioClient minioClient = MinioClient.builder().endpoint("localhost", 9000).credentials("minio", "minio123").build();

    // 2. 创建一个存储桶
    minioClient.makeBucket("my-bucket");

    // 3. 定义上传参数
    PutObjectOptions putObjectOptions = new PutObjectOptions(new FileInputStream(new File("large-file.txt")), 1024 * 1024 * 5);
    putObjectOptions.setPartSize(1024 * 1024 * 5); // 分片大小设置为5MB
    putObjectOptions.setConcurrentUploads(5); // 设置同时上传的分片数量为5

    // 4. 初始化分片上传
    String uploadId = minioClient.initiateMultipartUpload("my-bucket", "large-file.txt", putObjectOptions);

    // 5. 上传分片
    List<CompletableFuture<ObjectWriteResponse>> futures = new ArrayList<>();
    for (int i = 0; i < putObjectOptions.calculatePartCount(); i++) {
      futures.add(
          CompletableFuture.supplyAsync(
              () -> {
                try {
                  // 计算分片偏移量和大小
                  long startOffset = i * putObjectOptions.getPartSize();
                  long endOffset = Math.min(startOffset + putObjectOptions.getPartSize() - 1, putObjectOptions.getObjectSize() - 1);

                  // 上传分片
                  return minioClient.uploadPart(
                      "my-bucket",
                      "large-file.txt",
                      uploadId,
                      i + 1,
                      new FileInputStream(new File("large-file.txt")),
                      startOffset,
                      endOffset - startOffset + 1);
                } catch (Exception e) {
                  throw new RuntimeException(e);
                }
              }));
    }

    // 6. 等待所有分片上传完成
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();

    // 7. 完成分片上传
    minioClient.completeMultipartUpload("my-bucket", "large-file.txt", uploadId, null);

    // 8. 打印上传结果
    System.out.println("文件上传成功!");
  }
}

断点续传

断点续传功能允许您在上传过程中断后重新开始上传,而无需重新上传整个文件。MinIo支持断点续传,您可以通过uploadId参数来恢复中断的上传任务。例如,以下代码演示了如何使用断点续传:

import io.minio.MinioClient;
import io.minio.ObjectWriteResponse;
import io.minio.PutObjectOptions;
import io.minio.errors.InsufficientDataException;
import io.minio.http.Method;
import io.minio.messages.AbortMultipartUploadResponse;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.StringUtils;

public class MinIoResumableUpload {

  public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
    // 1. 创建MinIo客户端
    MinioClient minioClient = MinioClient.builder().endpoint("localhost", 9000).credentials("minio", "minio123").build();

    // 2. 创建一个存储桶
    minioClient.makeBucket("my-bucket");

    // 3. 定义上传参数
    PutObjectOptions putObjectOptions = new PutObjectOptions(new FileInputStream(new File("large-file.txt")), 1024 * 1024 * 5);
    putObjectOptions.setPartSize(1024 * 1024 * 5); // 分片大小设置为5MB
    putObjectOptions.setConcurrentUploads(5); // 设置同时上传的分片数量为5

    // 4. 初始化分片上传
    String uploadId = minioClient.initiateMultipartUpload("my-bucket", "large-file.txt", putObjectOptions);

    // 5. 上传分片
    List<CompletableFuture<ObjectWriteResponse>> futures = new ArrayList<>();
    for (int i = 0; i < putObjectOptions.calculatePartCount(); i++) {
      futures.add(
          CompletableFuture.supplyAsync(
              () -> {
                try {
                  // 计算分片偏移量和大小
                  long startOffset = i * putObjectOptions.getPartSize();
                  long endOffset = Math.min(startOffset + putObjectOptions.getPartSize() - 1, putObjectOptions.getObjectSize() - 1);

                  // 上传分片
                  return minioClient.uploadPart(
                      "my-bucket",
                      "large-file.txt",
                      uploadId,
                      i + 1,
                      new FileInputStream(new File("large-file.txt")),
                      startOffset,
                      endOffset - startOffset + 1);
                } catch (Exception e) {
                  throw new RuntimeException(e);
                }
              }));
    }

    // 6. 等待所有分片上传完成
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();

    // 7. 检查文件是否已经完全上传
    boolean isCompleted = minioClient.isMultipartUploadComplete("my-bucket", "large-file.txt", uploadId);
    if (isCompleted) {
      // 文件已经完全上传,完成分片上传
      minioClient.completeMultipartUpload("my-bucket", "large-file.txt", uploadId, null);

      // 打印上传结果
      System.out.println("文件上传成功!");
    } else {
      // 文件还没有完全上传,断点续传
      resumeMultipartUpload(minioClient, "my-bucket", "large-file.txt", uploadId, putObjectOptions);
    }
  }

  private static void resumeMultipartUpload(
      MinioClient minioClient, String bucketName, String objectName, String uploadId, PutObjectOptions putObjectOptions)
      throws IOException, InterruptedException, ExecutionException {
    // 获取已经上传的分片信息
    List<CompletedPart> completedParts = minioClient.listMultipartUploadParts("my-bucket", "large-file.txt", uploadId);

    // 计算已经上传的分片数量和大小
    int uploadedPartCount = completedParts.size();