Recently I needed to store a large volume of files, so I evaluated several object-storage options and finally settled on MinIO. To make it convenient to call from Python, I built a further wrapper on top of the third-party minio package. Beyond the basic operations, the utility also wraps multi-threaded chunked download and upload of files. The chunk size should not be set too large, since it is bounded by the machine's bandwidth: an oversized chunk can saturate the link and degrade the machine's performance. With the defaults of section_size=10 (MB per chunk) and t_max=3 (threads), for example, up to 30 MB can be in flight at once. The code shared here is for learning purposes only.
import os
import io
from minio import Minio
from minio.error import S3Error
from datetime import timedelta
from tqdm import tqdm
from minio.deleteobjects import DeleteObject
from concurrent.futures import as_completed, ThreadPoolExecutor
from minio.commonconfig import CopySource


class Bucket(object):
    _instance = None
    # Read-only bucket policy template; the two %s placeholders are filled with the bucket name.
    policy = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["*"]},"Action":["s3:GetBucketLocation","s3:ListBucket"],"Resource":["arn:aws:s3:::%s"]},{"Effect":"Allow","Principal":{"AWS":["*"]},"Action":["s3:GetObject"],"Resource":["arn:aws:s3:::%s/*"]}]}'

    def __new__(cls, *args, **kwargs):
        # Singleton: every instantiation returns the same wrapper object.
        if not cls._instance:
            cls._instance = object.__new__(cls)
        return cls._instance

    def __init__(self, service, access_key, secret_key, secure=False, section_size=10, t_max=3):
        """
        Instantiation parameters.
        :param service: server address
        :param access_key: access_key
        :param secret_key: secret_key
        :param secure: use HTTPS if True
        :param section_size: chunk size in MB
        :param t_max: thread pool size
        """
        self.service = service
        self.client = Minio(service, access_key=access_key, secret_key=secret_key, secure=secure)
        self.size = section_size * 1024 * 1024
        self.processPool = ThreadPoolExecutor(max_workers=t_max)

    def exists_bucket(self, bucket_name):
        """
        Check whether a bucket exists.
        :param bucket_name: bucket name
        :return: bool
        """
        return self.client.bucket_exists(bucket_name=bucket_name)

    def create_bucket(self, bucket_name: str, is_policy: bool = True):
        """
        Create a bucket and optionally attach the read-only policy.
        :param bucket_name: bucket name
        :param is_policy: attach the policy if True
        :return: False if the bucket already exists, else True
        """
        if self.exists_bucket(bucket_name=bucket_name):
            return False
        self.client.make_bucket(bucket_name=bucket_name)
        if is_policy:
            policy = self.policy % (bucket_name, bucket_name)
            self.client.set_bucket_policy(bucket_name=bucket_name, policy=policy)
        return True

    def get_bucket_list(self):
        """
        List all buckets.
        :return: list of {"bucket_name", "create_time"} dicts
        """
        buckets = self.client.list_buckets()
        bucket_list = []
        for bucket in buckets:
            bucket_list.append({"bucket_name": bucket.name, "create_time": bucket.creation_date})
        return bucket_list

    def remove_bucket(self, bucket_name):
        """
        Delete a bucket.
        :param bucket_name: bucket name
        :return: bool
        """
        try:
            self.client.remove_bucket(bucket_name=bucket_name)
        except S3Error as e:
            print("[error]:", e)
            return False
        return True

    def bucket_list_files(self, bucket_name, prefix):
        """
        List all objects in a bucket.
        :param bucket_name: bucket name
        :param prefix: object name prefix
        :return:
        """
        try:
            files_list = self.client.list_objects(bucket_name=bucket_name, prefix=prefix, recursive=True)
            for obj in files_list:
                print(obj.bucket_name, obj.object_name, obj.last_modified,
                      obj.etag, obj.size, obj.content_type)
        except S3Error as e:
            print("[error]:", e)

    def bucket_policy(self, bucket_name):
        """
        Get a bucket's access policy.
        :param bucket_name: bucket name
        :return: policy or None
        """
        try:
            policy = self.client.get_bucket_policy(bucket_name)
        except S3Error as e:
            print("[error]:", e)
            return None
        return policy

    def download_file(self, bucket_name, file, file_path, stream=1024 * 32):
        """
        Download an object from a bucket and write it to a local file.
        :return:
        """
        try:
            data = self.client.get_object(bucket_name, file)
            with open(file_path, "wb") as fp:
                for d in data.stream(stream):
                    fp.write(d)
        except S3Error as e:
            print("[error]:", e)

    def fget_file(self, bucket_name, file, file_path):
        """
        Download an object and save it locally (minio handles the file I/O).
        :param bucket_name:
        :param file:
        :param file_path:
        :return:
        """
        self.client.fget_object(bucket_name, file, file_path)

    def get_section_data(self, bucket_name, file_name, start, size):
        """
        Fetch one chunk of an object (runs inside the thread pool).
        :param bucket_name:
        :param file_name:
        :param start: byte offset of the chunk
        :param size: chunk length in bytes
        :return: {'start': offset, 'data': response or None}
        """
        data = {'start': start, 'data': None}
        try:
            obj = self.client.get_object(bucket_name=bucket_name, object_name=file_name, offset=start, length=size)
            data = {'start': start, 'data': obj}
        except Exception as e:
            print("[error]:", e)
        return data

    def get_file_object(self, bucket_name, object_name):
        """
        Download an object in chunks via the thread pool and reassemble it in memory.
        :param bucket_name:
        :param object_name:
        :return: bytes
        """
        pool_arr = []
        file_data = io.BytesIO()
        try:
            stat_obj = self.client.stat_object(bucket_name=bucket_name, object_name=object_name)
            total_length = stat_obj.size
            size = self.size
            total_page = self.get_page_count(total_length, size)
            for chunk in range(1, total_page + 1):
                start = (chunk - 1) * size
                if chunk == total_page:
                    # The last chunk only covers the remaining bytes.
                    size = total_length - start
                thread_item = self.processPool.submit(self.get_section_data, bucket_name, object_name, start, size)
                pool_arr.append(thread_item)
            for thread_res in tqdm(as_completed(pool_arr), unit='MB', unit_scale=True,
                                   unit_divisor=1024 * 1024, ascii=True, total=len(pool_arr), ncols=50):
                try:
                    _res = thread_res.result()
                    # Each chunk is written at its own offset, so completion order does not matter.
                    file_data.seek(_res['start'])
                    file_data.write(_res['data'].read())
                    # Release the HTTP connection back to the pool once the chunk is consumed.
                    _res['data'].close()
                    _res['data'].release_conn()
                except Exception as e:
                    print(e)
        except Exception as e:
            print(e)
        return file_data.getvalue()

    def get_object_list(self, bucket_name):
        """
        List the objects in a bucket.
        """
        objects = []
        try:
            objects = self.client.list_objects(bucket_name)
        except Exception as e:
            print(e)
        return objects

    def get_page_count(self, total, per_page):
        """
        Compute the number of chunks (ceiling division).
        :param total: total number of bytes
        :param per_page: bytes per chunk
        :return: chunk count
        """
        page_count = total // per_page
        if total % per_page != 0:
            page_count += 1
        return page_count

    def copy_file(self, bucket_name, file, src_bucket, src_file):
        """
        Server-side copy of an object (up to 5 GB).
        :param bucket_name: destination bucket
        :param file: destination object name
        :param src_bucket: source bucket
        :param src_file: source object name
        :return:
        """
        self.client.copy_object(bucket_name, file, CopySource(src_bucket, src_file))

    def upload_file(self, bucket_name, file, file_path, content_type):
        """
        Upload a local file as an object.
        :param bucket_name: bucket name
        :param file: object name
        :param file_path: local file path
        :param content_type: content type
        :return:
        """
        try:
            # Make the bucket if it does not exist.
            found = self.client.bucket_exists(bucket_name)
            if not found:
                print("Bucket '{}' does not exist".format(bucket_name))
                self.client.make_bucket(bucket_name)
            with open(file_path, "rb") as file_data:
                file_stat = os.stat(file_path)
                self.client.put_object(bucket_name, file, file_data, file_stat.st_size, content_type=content_type)
        except S3Error as e:
            print("[error]:", e)

    def upload_object(self, bucket_name, file, file_data, content_type='application/octet-stream'):
        """
        Upload an in-memory bytes object.
        :param bucket_name: bucket name
        :param file: object name
        :param file_data: bytes
        :param content_type: content type, defaults to application/octet-stream
        :return:
        """
        try:
            # Make the bucket if it does not exist.
            found = self.client.bucket_exists(bucket_name)
            if not found:
                print("Bucket '{}' does not exist".format(bucket_name))
                self.client.make_bucket(bucket_name)
            buffer = io.BytesIO(file_data)
            st_size = len(file_data)
            self.client.put_object(bucket_name, file, buffer, st_size, content_type=content_type)
        except S3Error as e:
            print("[error]:", e)

    def fput_file(self, bucket_name, file, file_path):
        """
        Upload a local file (minio handles the file I/O).
        :param bucket_name: bucket name
        :param file: object name
        :param file_path: local file path
        :return:
        """
        try:
            # Make the bucket if it does not exist.
            found = self.client.bucket_exists(bucket_name)
            if not found:
                self.client.make_bucket(bucket_name)
            else:
                print("Bucket '{}' already exists".format(bucket_name))
            self.client.fput_object(bucket_name, file, file_path)
        except S3Error as e:
            print("[error]:", e)

    def stat_object(self, bucket_name, file, log=True):
        """
        Get an object's metadata.
        :param bucket_name:
        :param file:
        :param log: print the metadata if True
        :return: stat result or None
        """
        res = None
        try:
            data = self.client.stat_object(bucket_name, file)
            res = data
            if log:
                print(data.bucket_name)
                print(data.object_name)
                print(data.last_modified)
                print(data.etag)
                print(data.size)
                print(data.metadata)
                print(data.content_type)
        except S3Error as e:
            if log:
                print("[error]:", e)
        return res

    def remove_file(self, bucket_name, file):
        """
        Remove a single object.
        :return:
        """
        self.client.remove_object(bucket_name, file)

    def remove_files(self, bucket_name, file_list):
        """
        Remove multiple objects.
        :return:
        """
        delete_object_list = [DeleteObject(file) for file in file_list]
        for del_err in self.client.remove_objects(bucket_name, delete_object_list):
            print("del_err", del_err)

    def presigned_get_file(self, bucket_name, file, days=7):
        """
        Generate a presigned URL for an HTTP GET of the object.
        :return: URL string
        """
        return self.client.presigned_get_object(bucket_name, file, expires=timedelta(days=days))
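
To show how the pieces fit together, here is a minimal usage sketch; the endpoint, credentials, bucket name, and file paths below are placeholders for illustration, not values from the original post.

if __name__ == '__main__':
    # Hypothetical endpoint and credentials -- replace with your own deployment's values.
    bucket = Bucket('127.0.0.1:9000', access_key='minioadmin', secret_key='minioadmin',
                    secure=False, section_size=10, t_max=3)
    bucket.create_bucket('demo-bucket')
    # Upload a local file, then pull it back with the multi-threaded chunked download.
    bucket.fput_file('demo-bucket', 'big.bin', '/tmp/big.bin')
    data = bucket.get_file_object('demo-bucket', 'big.bin')
    print(len(data))
    # A presigned GET link that stays valid for one day.
    print(bucket.presigned_get_file('demo-bucket', 'big.bin', days=1))

Note that get_file_object reassembles the whole object in memory, so it suits files that fit in RAM; for very large files, fget_file streams straight to disk instead.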