
How to Copy a CSV File from Google Cloud Storage to a Specific Google Drive Folder

Problem

A CSV file stored in Google Cloud Storage (GCS) needs to be copied to a specific Google Drive folder, but the following problems came up:

  • The file upload to Google Drive fails
  • The path of the downloaded file is wrong
  • The file is not written correctly

Solution

Verify permissions

Make sure the credentials used for Google Drive API access have the permissions required to upload files to the target folder. One quick way to confirm this is to fetch the target folder's metadata with those credentials, as in the sketch below.
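
A minimal sketch of such a check; the helper name check_drive_folder_access is hypothetical, and the folder ID is the one used later in the updated code. A 403 or 404 from this call usually means the folder has not been shared with the service account.

import google.auth
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

def check_drive_folder_access(folder_id="1nNqtRloaFT0f71"):
    """Returns True if the default credentials can read the target Drive folder."""
    credentials, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/drive"])
    service = build('drive', 'v3', credentials=credentials)
    try:
        folder = service.files().get(fileId=folder_id, fields="id, name").execute()
        print(f"Folder reachable: {folder['name']} ({folder['id']})")
        return True
    except HttpError as err:
        # A 403/404 here usually means the folder was not shared with these credentials
        print(f"Cannot access folder {folder_id}: {err}")
        return False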

Check the file path

Double-check the key argument passed to download_file_from_gcs() and make sure it is the object's full path in the bucket, including the file name and extension. Listing the bucket's objects, as in the sketch below, shows the exact keys.
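
If the exact key is unclear, listing the bucket's objects by prefix shows the full paths as Cloud Storage stores them. A minimal sketch; the bucket name and prefix in the example call are hypothetical.

from google.cloud import storage

def list_gcs_keys(bucket_name, prefix=""):
    """Prints the full object keys under a prefix so the exact path can be copied."""
    client = storage.Client()
    for blob in client.list_blobs(bucket_name, prefix=prefix):
        print(blob.name)  # e.g. "exports/Trials.csv" rather than just "Trials"

# Example call with hypothetical values:
# list_gcs_keys("my-export-bucket", prefix="exports/")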

Modify the upload_to_drive() function

Upload the file with the MediaFileUpload class instead of passing temp_file directly.

Fix the file-writing issue

In the _write_to_stream() function, replace self._stream.write(chunk) with self._fp.write(chunk).

Consider file size limits

Google Drive has a 5 TB limit on individual file size. If a file is larger than that, consider a chunked upload or a segmented (resumable) transfer, as in the sketch below.
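
A minimal sketch of a resumable, chunked upload with the same Drive client; it assumes the service object and folder_id list from upload_to_drive() below, and the chunk size is an arbitrary choice.

from googleapiclient.http import MediaFileUpload

def upload_large_file(service, temp_file, name, folder_id, chunk_mb=16):
    """Uploads a file to Drive in fixed-size chunks via a resumable session."""
    media = MediaFileUpload(
        temp_file,
        mimetype='text/csv',
        resumable=True,
        chunksize=chunk_mb * 1024 * 1024,
    )
    request = service.files().create(
        body={"name": name, "parents": folder_id},
        media_body=media,
    )
    response = None
    while response is None:
        status, response = request.next_chunk()
        if status:
            print(f"Uploaded {int(status.progress() * 100)}%")
    print(f"File uploaded successfully: {response['id']}")
    return response['id']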

Updated code

import functions_framework
import base64
import tempfile
import traceback
from google.cloud import storage
from googleapiclient.discovery import build
import google.auth
from googleapiclient.http import MediaFileUpload

def upload_to_drive(temp_file, remote_path, credentials):
    """Uploads a file to the specified folder in Google Drive.

    Args:
        temp_file (str): Path to the local file to upload.
        remote_path (str): Name of the file to be created in Google Drive.
        credentials (google.auth.credentials.Credentials): Authentication credentials.
    """
    folder_id = ["1nNqtRloaFT0f71"]  # ID(s) of the target Drive folder
    service = build('drive', 'v3', credentials=credentials)
    print(f"Uploading file: {temp_file} to Google Drive folder: {folder_id}")

    # Use MediaFileUpload class for efficient upload
    media = MediaFileUpload(temp_file, mimetype='text/csv')
    response = service.files().create(
        body={
            "name": remote_path.split("/")[-1],
            "parents": folder_id,
        },
        media_body=media
    ).execute()
    print(f"File uploaded successfully: {response['id']}")

def download_file_from_gcs(bucket, key):
    """
    Downloads a file from Cloud Storage and writes it to a temporary file.

    Args:
        bucket (str): Name of the Cloud Storage bucket.
        key (str): Name of the file in the bucket.

    Returns:
        str: Path to the downloaded file within the temporary location.
    """
    print(f"Downloading key: {key} in bucket: {bucket}")
    client = storage.Client()
    source_bucket = client.bucket(bucket)
    blob_object = source_bucket.blob(key)
    with tempfile.NamedTemporaryFile(delete=False) as temp_file:
        blob_object.download_to_filename(temp_file.name)
        local_path = temp_file.name  # Capture the temporary file path
    return local_path

# Triggered by a change in a storage bucket
@functions_framework.cloud_event
def drive_upload(cloud_event):
    data = cloud_event.data
    print("Full event data:", data)
    try:
        # Extract the bucket and file name from the event payload
        bucket_name = data["bucket"]
        file_name = data["name"]
        print(f"File name extracted from event data: {file_name}")
        print(f"Bucket name extracted from event data: {bucket_name}")
    except KeyError:
        print("Bucket or file name not found in event data.")
        return  # Gracefully exit the function

    if file_name in ["Trials", "Control"]:
        try:
            # Credentials for Google Drive API access
            credentials, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/drive"])
            # Download the file from GCS to a temporary local path
            local = download_file_from_gcs(bucket_name, file_name)
            print(f"Temp File Path: {local}")
            # Upload the downloaded file to Google Drive
            upload_to_drive(local, file_name, credentials)
            print(f'File {file_name} copied to Google Drive folder')
        except Exception:
            print(traceback.format_exc())

Conclusion

By applying the solutions above, the CSV file can be copied from GCS to the target Google Drive folder. The approach accounts for permissions, file paths, file size limits, and the file-writing issue.

Frequently asked questions

1. How can I make sure a CSV file is not overwritten?

Add a check in upload_to_drive() that looks for an existing file with the same name in the target folder. If the file already exists, the user (or the calling code) can be prompted to take appropriate action. A sketch of such a check follows.
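
A minimal sketch using a Drive files().list query; the helper name is hypothetical, and folder_id here is a single folder ID string (the first element of the list used in upload_to_drive()).

def file_exists_in_folder(service, name, folder_id):
    """Returns True if a non-trashed file with this name already exists in the folder."""
    query = f"name = '{name}' and '{folder_id}' in parents and trashed = false"
    result = service.files().list(q=query, fields="files(id, name)", pageSize=1).execute()
    return len(result.get("files", [])) > 0

# Possible use inside upload_to_drive(), before the create() call:
# if file_exists_in_folder(service, remote_path.split("/")[-1], folder_id[0]):
#     print("A file with this name already exists; skipping upload.")
#     return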

2. Can multiple CSV files be uploaded to different Google Drive folders?

Yes. Add a loop in drive_upload() that iterates over the list of files to upload and sends each one to its corresponding folder, as in the sketch below.
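
A minimal sketch of such a loop, reusing download_file_from_gcs() and the imports from the updated code above; the folder mapping and the second folder ID are hypothetical placeholders.

# Hypothetical mapping from GCS object names to Drive folder IDs
FOLDER_MAP = {
    "Trials": "1nNqtRloaFT0f71",
    "Control": "your-other-folder-id",  # placeholder
}

def upload_many(bucket_name, file_names, credentials):
    """Downloads each listed file from GCS and uploads it to its mapped Drive folder."""
    service = build('drive', 'v3', credentials=credentials)
    for file_name in file_names:
        folder_id = FOLDER_MAP.get(file_name)
        if folder_id is None:
            print(f"No target folder configured for {file_name}; skipping.")
            continue
        local = download_file_from_gcs(bucket_name, file_name)
        media = MediaFileUpload(local, mimetype='text/csv')
        service.files().create(
            body={"name": file_name, "parents": [folder_id]},
            media_body=media,
        ).execute()
        print(f"{file_name} uploaded to folder {folder_id}")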

3. How should larger CSV files be handled in the script?

For larger CSV files, consider a chunked or segmented upload: the file is split into smaller chunks that are sent one batch at a time (see the resumable-upload sketch under "Consider file size limits" above).

4. Can the script be integrated with other cloud services, such as BigQuery?

Yes. Integration with other cloud services is possible by combining Cloud Functions triggers with the relevant APIs, such as the BigQuery API; a sketch follows.
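
For example, the same storage trigger could also load the CSV straight from GCS into a BigQuery table. A minimal sketch, assuming a hypothetical destination project, dataset, and table.

from google.cloud import bigquery

def load_csv_to_bigquery(bucket_name, file_name):
    """Loads a CSV object from GCS into a BigQuery table (the table ID is a placeholder)."""
    client = bigquery.Client()
    table_id = "my-project.my_dataset.my_table"  # hypothetical destination
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
    )
    uri = f"gs://{bucket_name}/{file_name}"
    load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
    load_job.result()  # Wait for the load job to finish
    print(f"Loaded {uri} into {table_id}")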

5. How can the script's execution be monitored?

Use Cloud Functions logging and metrics to monitor the script. These tools provide information about execution time, errors, and other metrics; a small logging sketch follows.
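
The print() calls in the function already appear in Cloud Logging, since Cloud Functions captures stdout and stderr. For filterable severities, the standard logging module can be routed through the Cloud Logging client; a minimal sketch:

import logging
import google.cloud.logging

# Send standard logging records to Cloud Logging with proper severity levels
logging_client = google.cloud.logging.Client()
logging_client.setup_logging()

logging.info("drive_upload invoked")        # recorded as INFO
logging.error("upload failed for Trials")   # recorded as ERROR, usable for log-based alerts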