?【集成準備】
1、Python環境配置
下載Python和PyCharm并安裝。
?
使用安裝的python本身作為解釋器。
?
安裝AGC Python SDK。
?云存儲包安裝完成。
?
2、AGC環境配置
在AGC創建項目和應用
?
開通云存儲服務。
返回項目設置界面,選擇Server SDK 頁簽,在認證憑據處點擊創建按鈕,然后下載認證憑據。
?
將認證憑據導入到項目中
【布局設計】
本次測試的Demo是一個Python服務,所以沒有界面UI。
【功能實現】
引入AGC與云存儲模塊
from agconnect.cloud_storage import AGCCloudStorageExceptionfrom agconnect.cloud_storage import GetFilesOptionsfrom agconnect.cloud_storage import Metadatafrom agconnect.cloud_storage import StorageManagementfrom agconnect.common_server import AGCClient, CredentialParser, logger
將下載的憑據文件放入項目中,調用AGCClient.initialize方法初始化AGCClient實例
將配置開發環境中獲取的認證憑據放置到自定義的目錄,通過initialize方法初始化對應認證憑據的AGCClient實例。
通過StorageManagement()來初始化存儲實例。
bucket_name = 'cloudstoragepython-kyuv2'file_name = 'test.txt' # for example: "test.txt"config_path = os.path.normpath(os.path.join(os.path.dirname(__file__), 'agc-apiclient-1157238089186298880-7233693580609187129.json'))credential = CredentialParser.to_credential(config_path)AGCClient.initialize(credential=credential)storage = StorageManagement()bucket = storage.bucket(bucket_name)
上傳文件
使用bucket.upload方法將文件上傳到云端。
def upload_file(bucket):config_path = os.path.normpath(os.path.join(os.path.dirname(__file__), 'C:\Users\kwx1075489\Desktop\agcserversdk-python-1.3.0.300\test.txt'))if not os.path.exists(config_path):logger.error("file does not exist")loop = asyncio.new_event_loop()try:asyncio.set_event_loop(loop)result = loop.run_until_complete(bucket.upload(path_str=config_path))logger.info(f"Upload file response: {result}")except (Exception, AGCCloudStorageException) as e:raise AssertionError(f"An error occurred during the upload file process: {e}")finally:loop.close()
調用file.get_metadata方法獲取設置在云端的元數據
def get_file_metadata(bucket, file_name):file = bucket.file(file_name)loop = asyncio.new_event_loop()try:asyncio.set_event_loop(loop)result = loop.run_until_complete(file.get_metadata())res_json = loop.run_until_complete(result.json(content_type='text/plain'))logger.info(f"Get file metadata: {res_json}")except (Exception, AGCCloudStorageException) as e:raise AssertionError(f"An error occurred during the get file metadata: {e}")finally:loop.close()
調用file.set_metadata方法將文件屬性的元數據和自定義的元數據覆蓋到云端。
def update_file_metadata(bucket, file_name):file = bucket.file(file_name)loop = asyncio.new_event_loop()try:asyncio.set_event_loop(loop)get_result = loop.run_until_complete(file.get_metadata())get_result_text = loop.run_until_complete(get_result.text())get_result_json = json.loads(get_result_text)metadata = Metadata(content_language='en-US', custom_metadata={'test': 'test'},content_type=get_result_json.get('contentType'))set_result = loop.run_until_complete(file.set_metadata(metadata))set_result_json = loop.run_until_complete(set_result.json(content_type='text/plain'))logger.info(f"Update file metadata response: {set_result_json}")except (Exception, AGCCloudStorageException) as e:raise AssertionError(f"An error occurred during the update file metadata: {e}")finally:loop.close()
調用file.download方法將云端文件數據寫入本地文件中。
def download_file(bucket, file_name):def progress_callback(progress: Dict[str, int] = None):logger.info(f"Downloaded {progress['writtenBytes']} bytes out of {progress['totalBytes']}")loop = asyncio.new_event_loop()asyncio.set_event_loop(loop)try:local_file = os.path.normpath(os.path.join(os.path.dirname(__file__), ' C:\Users\kwx1075489\Desktop'))if not os.path.exists(local_file):logger.error("file does not exist")remote_file = bucket.file(file_name)text, resp = loop.run_until_complete(remote_file.download(local_file, on_download_progress=progress_callback))logger.info(f"Download file response: {resp}")assert text == "File downloaded successfully. "except (Exception, AGCCloudStorageException) as e:raise AssertionError(f"An error occurred during the download file process: {e}")finally:loop.close()
調用file.download方法獲取當前目錄下的文件與子目錄。
def get_files(bucket):loop = asyncio.new_event_loop()asyncio.set_event_loop(loop)options = GetFilesOptions(delimiter="/")try:result = loop.run_until_complete(bucket.get_files(options=options))logger.info(f"Get file list response: {result}")except (Exception, AGCCloudStorageException) as e:raise AssertionError(f"An error occurred during the get file list: {e}")finally:loop.close()
調用file.delete方法刪除云端文件。
def get_files(bucket):loop = asyncio.new_event_loop()asyncio.set_event_loop(loop)options = GetFilesOptions(delimiter="/")try:result = loop.run_until_complete(bucket.get_files(options=options))logger.info(f"Get file list response: {result}")except (Exception, AGCCloudStorageException) as e:raise AssertionError(f"An error occurred during the get file list: {e}")finally:loop.close()
【功能測試】
執行python main.py命令,服務依次執行:
上傳“test.txt”文件到云端:
?
從云端下載“test.txt”文件到本地目錄:
從云端刪除“cloudstoragepython-kyuv2”存儲區的“test.txt”文件: