How do I transfer a CSV file from Google Cloud Storage to a specific Google Drive folder?
2024-03-10 23:05:29
Problem
A CSV file in Google Cloud Storage (GCS) needs to be copied into a specific Google Drive folder, but the following issues come up:
- The upload to Google Drive fails
- The download path for the file is wrong
- The file write fails
Solution
Verify permissions
Make sure the credentials used for Google Drive API access have permission to upload files into the target folder. For a service account, this typically means sharing the Drive folder with the service account's email address.
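A quick way to test this is to request the target folder's metadata: a 403 or 404 usually means the folder has not been shared with the identity behind the credentials. A minimal sketch, reusing the folder ID from the updated code below:

```python
import google.auth
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

credentials, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/drive"])
service = build("drive", "v3", credentials=credentials)

try:
    # Succeeds only if the credentials can at least read the folder metadata.
    folder = service.files().get(fileId="1nNqtRloaFT0f71", fields="id, name").execute()
    print(f"Folder visible: {folder['name']} ({folder['id']})")
except HttpError as err:
    # A 403/404 here usually means the folder was never shared with this identity.
    print(f"Cannot access folder: {err}")
```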
Check the file path
Double-check the `key` argument passed to `download_file_from_gcs()`: it must be the object's full path within the bucket, including the file name and extension.
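If the exact object name is in doubt, listing the bucket with a prefix makes the full keys visible. A minimal sketch (the bucket name and prefix are placeholders):

```python
from google.cloud import storage

client = storage.Client()
# Print every object key under the prefix so the exact path is visible.
for blob in client.list_blobs("my-bucket", prefix="exports/"):
    print(blob.name)  # e.g. "exports/Trials.csv" rather than just "Trials"
```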
Modify the `upload_to_drive()` function
Upload the file with the `MediaFileUpload` class rather than passing `temp_file` directly.
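In isolation the change looks like this; a sketch only, where `service` is the Drive client built in the updated code below and the file name is a placeholder:

```python
from googleapiclient.http import MediaFileUpload

# Wrap the local file; MediaFileUpload handles the HTTP media upload.
media = MediaFileUpload(temp_file, mimetype="text/csv")
service.files().create(
    body={"name": "example.csv", "parents": ["1nNqtRloaFT0f71"]},  # placeholder name
    media_body=media,
).execute()
```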
Fix the file write issue
In the `_write_to_stream()` function, replace `self._stream.write(chunk)` with `self._fp.write(chunk)`.
Consider file size limits
Google Drive caps individual files at 5 TB. For large files, consider a chunked (resumable) upload, as in the sketch below.
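A minimal sketch of a chunked (resumable) upload, assuming the `service` and `temp_file` variables from the code below; `chunksize` must be a multiple of 256 KB:

```python
from googleapiclient.http import MediaFileUpload

media = MediaFileUpload(
    temp_file,
    mimetype="text/csv",
    resumable=True,
    chunksize=10 * 1024 * 1024,  # 10 MB per request; must be a multiple of 256 KB
)
request = service.files().create(
    body={"name": "big.csv", "parents": ["1nNqtRloaFT0f71"]},  # placeholder name
    media_body=media,
)
response = None
while response is None:
    # next_chunk() uploads one chunk; response stays None until the last
    # chunk is accepted, after which it holds the created file's metadata.
    status, response = request.next_chunk()
    if status:
        print(f"Uploaded {int(status.progress() * 100)}%")
print(f"File ID: {response['id']}")
```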
Updated code
```python
import tempfile
import traceback

import functions_framework
import google.auth
from google.cloud import storage
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload


def upload_to_drive(temp_file, remote_path, credentials):
    """Uploads a file to the specified folder in Google Drive.

    Args:
        temp_file (str): Path to the local file to upload.
        remote_path (str): Name of the file to be created in Google Drive.
        credentials (google.auth.credentials.Credentials): Authentication credentials.
    """
    folder_id = ["1nNqtRloaFT0f71"]  # "parents" expects a list of folder IDs
    service = build('drive', 'v3', credentials=credentials)
    print(f"Uploading file: {temp_file} to Google Drive folder: {folder_id}")
    # Use the MediaFileUpload class for efficient upload
    media = MediaFileUpload(temp_file, mimetype='text/csv')
    response = service.files().create(
        body={
            "name": remote_path.split("/")[-1],
            "parents": folder_id,
        },
        media_body=media
    ).execute()
    print(f"File uploaded successfully: {response['id']}")


def download_file_from_gcs(bucket, key):
    """Downloads a file from Cloud Storage and writes it to a temporary file.

    Args:
        bucket (str): Name of the Cloud Storage bucket.
        key (str): Name of the file in the bucket.

    Returns:
        str: Path to the downloaded file within the temporary location.
    """
    print(f"Downloading key: {key} in bucket: {bucket}")
    client = storage.Client()
    source_bucket = client.bucket(bucket)
    blob_object = source_bucket.blob(key)
    # delete=False keeps the file on disk after the context manager exits
    with tempfile.NamedTemporaryFile(delete=False) as temp_file:
        blob_object.download_to_filename(temp_file.name)
        local_path = temp_file.name  # Capture the temporary file path
    return local_path


# Triggered by a change in a storage bucket
@functions_framework.cloud_event
def drive_upload(cloud_event):
    data = cloud_event.data
    print("Full event data:", data)
    try:
        # Extract bucket and object name from the event payload
        bucket_name = data["bucket"]
        file_name = data["name"]
        print(f"File name extracted from event data: {file_name}")
        print(f"Bucket name extracted from event data: {bucket_name}")
    except KeyError:
        print("Bucket or file name not found in event data.")
        return  # Gracefully exit the function

    if file_name in ["Trials", "Control"]:
        try:
            # Credentials for Google Drive API access
            credentials, _ = google.auth.default(
                scopes=["https://www.googleapis.com/auth/drive"]
            )
            # download_file_from_gcs() creates its own temporary file,
            # so no extra NamedTemporaryFile is needed here
            local = download_file_from_gcs(bucket_name, file_name)
            print(f"Temp File Path: {local}")
            # Upload the file to Drive
            upload_to_drive(local, file_name, credentials)
            print(f"File {file_name} copied to Google Drive folder")
        except Exception:
            print(traceback.format_exc())
```
Conclusion
With the fixes above, CSV files can be copied from GCS into the target Google Drive folder. The solution covers permissions, file paths, file size limits, and the file write issue.
Frequently asked questions
1. How do I make sure a CSV file is not overwritten?
Drive allows several files with the same name in one folder, so `files().create()` adds a new file rather than overwriting an old one. To avoid duplicates, add a check to `upload_to_drive()` that looks for an existing file with the same name; if one exists, prompt the user to take appropriate action.
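A minimal sketch of such a check, using the Drive v3 `files.list` query syntax (the helper name is hypothetical):

```python
def file_exists_in_folder(service, folder_id, name):
    """Hypothetical helper: True if a file with this name exists in the folder."""
    query = f"name = '{name}' and '{folder_id}' in parents and trashed = false"
    result = service.files().list(q=query, fields="files(id, name)").execute()
    return bool(result.get("files"))
```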
2. Can multiple CSV files be uploaded into different Google Drive folders?
Yes. Add a loop in `drive_upload()` that iterates over the files to upload and routes each one to the appropriate folder, as sketched below.
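A minimal sketch, assuming a hypothetical mapping from file names to destination folder IDs (the IDs below are placeholders, and `upload_to_drive()` would need to accept the folder ID as a parameter instead of hardcoding it):

```python
# Hypothetical mapping of file names to destination Drive folder IDs.
FOLDER_MAP = {
    "Trials": "FOLDER_ID_A",   # placeholder
    "Control": "FOLDER_ID_B",  # placeholder
}

for name, folder_id in FOLDER_MAP.items():
    local = download_file_from_gcs(bucket_name, name)
    upload_to_drive(local, name, credentials)  # would route to folder_id
```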
3. How should the script handle larger CSV files?
For larger CSV files, use a chunked (resumable) upload: the file is split into smaller chunks that are uploaded one request at a time, as shown in the resumable-upload sketch under "Consider file size limits" above.
4. Can the script be integrated with other cloud services, such as BigQuery?
Yes. Cloud Functions triggers can be combined with other APIs; the BigQuery API, for instance, can load the same CSV into a table.
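A minimal sketch using the BigQuery client library, reusing the `bucket_name` and `file_name` variables from the function above (the destination table is a placeholder, and the CSV is assumed to have a header row):

```python
from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,  # assumes a header row
    autodetect=True,      # infer the schema from the CSV
)
load_job = client.load_table_from_uri(
    f"gs://{bucket_name}/{file_name}",
    "my_dataset.my_table",  # placeholder destination table
    job_config=job_config,
)
load_job.result()  # block until the load job finishes
```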
5. How can I monitor the script?
Use Cloud Functions logging and metrics: Cloud Logging captures the function's output and stack traces, while Cloud Monitoring exposes execution time, invocation counts, and error rates.
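For example, the `print()` calls in the function end up in Cloud Logging and can be read back with the Cloud Logging client library; a minimal sketch (the filter assumes a 1st-gen Cloud Function named `drive_upload`):

```python
from google.cloud import logging

client = logging.Client()
log_filter = (
    'resource.type="cloud_function" '
    'AND resource.labels.function_name="drive_upload"'
)
# Fetch the 20 most recent log entries for the function.
for entry in client.list_entries(filter_=log_filter, order_by=logging.DESCENDING, max_results=20):
    print(entry.timestamp, entry.payload)
```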