Python Minio 工具類封裝

????????最近因為需要對大規模的文件進行存儲,選了多種對象存儲方案,最終選擇了MinIO,為了方便python的調用,在minio第三方包的基礎上進行進一步封裝調用,該工具除了基礎的功能外,還封裝了多線程分片下載文件和上傳文件的功能,切片設置不宜過大,因為會受限于機器的帶寬,過大會導致帶寬被占光影響機器性能。分享的代碼僅供學習使用。

import os
import io
from minio import Minio
from minio.error import S3Error
from datetime import timedelta
from tqdm import tqdm
from minio.deleteobjects import DeleteObject
from concurrent.futures import as_completed, ThreadPoolExecutorclass Bucket(object):client = Nonepolicy = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["*"]},"Action":["s3:GetBucketLocation","s3:ListBucket"],"Resource":["arn:aws:s3:::%s"]},{"Effect":"Allow","Principal":{"AWS":["*"]},"Action":["s3:GetObject"],"Resource":["arn:aws:s3:::%s/*"]}]}'def __new__(cls, *args, **kwargs):if not cls.client:cls.client = object.__new__(cls)return cls.clientdef __init__(self, service, access_key, secret_key, secure=False, section_size=10, t_max=3):'''實例化參數:param service: 服務器地址:param access_key: access_key:param secret_key: secret_key:param secure: secure:param section_size: 切片大小mb:param t_max: 線程池大小'''self.service = serviceself.client = Minio(service, access_key=access_key, secret_key=secret_key, secure=secure)self.size = section_size * 1024 * 1024self.processPool = ThreadPoolExecutor(max_workers=t_max)def exists_bucket(self, bucket_name):"""判斷桶是否存在:param bucket_name: 桶名稱:return:"""return self.client.bucket_exists(bucket_name=bucket_name)def create_bucket(self, bucket_name: str, is_policy: bool=True):"""創建桶 + 賦予策略:param bucket_name: 桶名:param is_policy: 策略:return:"""if self.exists_bucket(bucket_name=bucket_name):return Falseelse:self.client.make_bucket(bucket_name=bucket_name)if is_policy:policy = self.policy % (bucket_name, bucket_name)self.client.set_bucket_policy(bucket_name=bucket_name, policy=policy)return Truedef get_bucket_list(self):"""列出存儲桶:return:"""buckets = self.client.list_buckets()bucket_list = []for bucket in buckets:bucket_list.append({"bucket_name": bucket.name, "create_time": bucket.creation_date})return bucket_listdef remove_bucket(self, bucket_name):"""刪除桶:param bucket_name::return:"""try:self.client.remove_bucket(bucket_name=bucket_name)except S3Error as e:print("[error]:", e)return Falsereturn Truedef bucket_list_files(self, bucket_name, prefix):"""列出存儲桶中所有對象:param bucket_name: 同名:param prefix: 前綴:return:"""try:files_list = self.client.list_objects(bucket_name=bucket_name, prefix=prefix, recursive=True)for obj in files_list:print(obj.bucket_name, obj.object_name.encode('utf-8'), obj.last_modified,obj.etag, obj.size, obj.content_type)except S3Error as e:print("[error]:", e)def bucket_policy(self, bucket_name):"""列出桶存儲策略:param bucket_name::return:"""try:policy = self.client.get_bucket_policy(bucket_name)except S3Error as e:print("[error]:", e)return Nonereturn policydef download_file(self, bucket_name, file, file_path, stream=1024*32):"""從bucket 下載文件 + 寫入指定文件:return:"""try:data = self.client.get_object(bucket_name, file)with open(file_path, "wb") as fp:for d in data.stream(stream):fp.write(d)except S3Error as e:print("[error]:", e)def fget_file(self, bucket_name, file, file_path):"""下載保存文件保存本地:param bucket_name::param file::param file_path::return:"""self.client.fget_object(bucket_name, file, file_path)def get_section_data(self, bucket_name, file_name, start, size):'''獲取切片數據:param bucket_name::param file_name::param start::param size::return:'''data = {'start': start, 'data': None}try:obj = self.client.get_object(bucket_name=bucket_name, object_name=file_name, offset=start, length=size)data = {'start': start, 'data': obj}except Exception as e:print('=============', e)return datadef get_file_object(self, bucket_name, object_name):"""獲取文件對象:param bucket_name::param file::return:"""pool_arr = []file_data = io.BytesIO()try:stat_obj = self.client.stat_object(bucket_name=bucket_name, object_name=object_name)total_length = stat_obj.sizesize = self.sizetotal_page = self.get_page_count(total_length, size)total = 0for chunck in range(1, total_page + 1):start = (chunck - 1) * sizeif chunck == total_page:size = total_length - totalthread_item = self.processPool.submit(self.get_section_data, bucket_name, object_name, start, size)pool_arr.append(thread_item)for key, thread_res in tqdm(enumerate(as_completed(pool_arr)), unit='MB', unit_scale=True,unit_divisor=1024 * 1024, ascii=True, total=len(pool_arr), ncols=50):try:_res = thread_res.result()file_data.seek(_res['start'])file_data.write(_res['data'].read())except Exception as e:print(e)except Exception as e:print(e)return file_data.getvalue()def get_object_list(self, bucket_name):objects = []try:objects = self.client.list_objects(bucket_name)except Exception as e:print(e)return objectsdef get_page_count(self, total, per_page):"""計算分頁總數:param total: 記錄總數:param per_page: 每頁記錄數:return: 分頁總數"""page_count = total // per_pageif total % per_page != 0:page_count += 1return page_countdef copy_file(self, bucket_name, file, file_path):"""拷貝文件(最大支持5GB):param bucket_name::param file::param file_path::return:"""self.client.copy_object(bucket_name, file, file_path)def upload_file(self, bucket_name, file, file_path, content_type):"""上傳文件 + 寫入:param bucket_name: 桶名:param file: 文件名:param file_path: 本地文件路徑:param content_type: 文件類型:return:"""try:# Make bucket if not exist.found = self.client.bucket_exists(bucket_name)if not found:print("Bucket '{}' is not exists".format(bucket_name))self.client.make_bucket(bucket_name)with open(file_path, "rb") as file_data:file_stat = os.stat(file_path)self.client.put_object(bucket_name, file, file_data, file_stat.st_size, content_type=content_type)except S3Error as e:print("[error]:", e)def upload_object(self, bucket_name, file, file_data, content_type='binary/octet-stream'):"""上傳文件 + 寫入:param bucket_name: 桶名:param file: 文件名:param file_data: bytes:param content_type: 文件類型 默認是appliction/octet-stream:return:"""try:# Make bucket if not exist.found = self.client.bucket_exists(bucket_name)if not found:print("Bucket '{}' is not exists".format(bucket_name))self.client.make_bucket(bucket_name)buffer = io.BytesIO(file_data)st_size = len(file_data)self.client.put_object(bucket_name, file, buffer, st_size, content_type=content_type)except S3Error as e:print("[error]:", e)def fput_file(self, bucket_name, file, file_path):"""上傳文件:param bucket_name: 桶名:param file: 文件名:param file_path: 本地文件路徑:return:"""try:# Make bucket if not exist.found = self.client.bucket_exists(bucket_name)if not found:self.client.make_bucket(bucket_name)else:print("Bucket '{}' already exists".format(bucket_name))self.client.fput_object(bucket_name, file, file_path)except S3Error as e:print("[error]:", e)def stat_object(self, bucket_name, file, log=True):"""獲取文件元數據:param bucket_name::param file::return:"""res = Nonetry:data = self.client.stat_object(bucket_name, file)res = dataif log:print(data.bucket_name)print(data.object_name)print(data.last_modified)print(data.etag)print(data.size)print(data.metadata)print(data.content_type)except S3Error as e:if log:print("[error]:", e)return resdef remove_file(self, bucket_name, file):"""移除單個文件:return:"""self.client.remove_object(bucket_name, file)def remove_files(self, bucket_name, file_list):"""刪除多個文件:return:"""delete_object_list = [DeleteObject(file) for file in file_list]for del_err in self.client.remove_objects(bucket_name, delete_object_list):print("del_err", del_err)def presigned_get_file(self, bucket_name, file, days=7):"""生成一個http GET操作 簽證URL:return:"""return self.client.presigned_get_object(bucket_name, file, expires=timedelta(days=days))

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/15090.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/15090.shtml
英文地址,請注明出處:http://en.pswp.cn/web/15090.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

DeepDriving | CUDA編程-03:線程層級

本文來源公眾號“DeepDriving”,僅用于學術分享,侵權刪,干貨滿滿。 原文鏈接:CUDA編程-03:線程層級 DeepDriving | CUDA編程-01: 搭建CUDA編程環境-CSDN博客 DeepDriving | CUDA編程-02: 初識CUDA編程-C…

Linux之共享內存mmap用法實例(六十三)

簡介: CSDN博客專家,專注Android/Linux系統,分享多mic語音方案、音視頻、編解碼等技術,與大家一起成長! 優質專欄:Audio工程師進階系列【原創干貨持續更新中……】🚀 優質專欄:多媒…

外賣霸王餐返利外賣會員卡小程序開發

外賣霸王餐返利外賣會員卡小程序開發 "社交電商賦能下的外賣返利小程序"是專為商家與用戶雙贏而設計的創新平臺。 以下是其開發方案的詳細步驟: 一、需求梳理:首先,我們需要明確小程序的核心功能和特色。包括設定活動類型、返利…

Python學習(3) 函數

定義 定義一個函數的格式: def 函數名(參數):執行代碼如果沒有參數,則稱為無參函數。 定義時小括號中寫的是形參(形式參數),調用時寫的是實參(實際參數)。 調用 調用格式: def…

【Docker】Linux 系統(CentOS 7)安裝 Docker

文章目錄 對 VMware 軟件的建議官方說明文檔Docker安裝卸載舊版本docker設置倉庫開始安裝 docker 引擎最新版 Docker 安裝指定版本 Docker 安裝(特殊需求使用) 啟動 Docker查看 Docker 版本查看 Docker 鏡像設置 Docker 開機自啟動 驗證開機啟動是否生效…

自定義原生小程序頂部及獲取膠囊信息

需求:我需要將某個文字或者按鈕放置在小程序頂部位置 思路:根據獲取到的頂部信息來定義我需要放的這個元素樣式 * 這里我是定義某個指定頁面 json:給指定頁面的json中添加自定義設置 "navigationStyle": "custom" JS&am…

新時代AI浪潮下,程序員和產品經理如何入局AIGC領域?

當下,AI浪潮席卷全球,AIGC大模型技術已經成為當今技術領域的一個重要趨勢,對于產品經理來說,掌握這項技術不僅能夠增強他們的職業技能,還能在競爭激烈的職場中脫穎而出。 為什么呢? 把握AI時代的機遇 AI技…

StringMVC

目錄 一,MVC定義 二,SpringMVC的基本使用 2.1建立連接 - RequestMapping("/...") ?編輯 2.2請求 1.傳遞單個參數 2.傳遞多個參數 3.傳遞對象 4.參數重命名 5.傳遞數組 6. 傳遞集合 7.傳遞JSON數據 8. 獲取url中數據 9. 傳遞文…

怎么通過OpenAI API調用其多模態大模型(GPT-4o)

現在只要有額度,大家都可以調用OpenAI的多模態大模型了,例如GPT-4o和GPT-4 Turbo,我一年多前總結過一些OpenAI API的用法,發現現在稍微更新了一下。主要參考了這里:https://platform.openai.com/docs/guides/vision 其…

python數據類型之元組、集合和字典

目錄 0.三者主要作用 1.元組 元組特點 創建元組 元組解包 可變和不可變元素元組 2.集合 集合特點 創建集合 集合元素要求 集合方法 訪問與修改 子集和超集 相等性判斷 集合運算 不可變集合 3.字典 字典特點 字典創建和常見操作 字典內置方法 pprin模塊 0.…

k8s——Pod詳解

一、Pod基礎概念 1.1 Pod定義 Pod是kubernetes中最小的資源管理組件,Pod也是最小化運行容器化應用的資源對象。一個Pod代表著集群中運行的一個進程。kubernetes中其他大多數組件都是圍繞著Pod來進行支撐和擴展Pod功能的,例如,用于管理Pod運行…

繆爾賽思又來到了你的面前(哈希)

定義一棵根節點為 1 1 1, n ( 2 ≤ n ≤ 1 0 3 ) n(2≤n≤10^3) n(2≤n≤103) 個節點的樹的哈希值為: H ∑ i 1 n X i Y f a ( i ) m o d 998244353 H∑^n_{i1}X^iY^{fa(i)}\ mod\ 998244353 Hi1∑n?XiYfa(i) mod 998244353 f a ( i ) fa(i) fa(i)…

斷網之后的頁面,Autox.js是點擊還是上下滑動比較好?

在處理斷網之后的頁面,選擇點擊還是上下滑動作為刷新操作,取決于應用的設計和用戶界面。通常,這兩種操作都可以作為刷新頁面的方式,但它們各自有不同的適用場景: 點擊刷新 - 適用場景:如果應用提供了一個明…

Java進階學習筆記7——權限修飾符

什么是權限修飾符? 就是用來限制類中的成員(成員變量、成員方法、構造器、代碼塊....)能夠被訪問的范圍。 protected使用的比較少,但是程序員還是要閱讀代碼,看官方文檔是怎么寫的,都會接觸到protected修飾…

C#串口通信-串口相關參數介紹

串口通訊(Serial Communication),是指外設和計算機間,通過數據信號線、地線等,按位進行傳輸數據的一種雙向通訊方式。 串口是一種接口標準,它規定了接口的電氣標準,沒有規定接口插件電纜以及使用的通信協議&#xff0c…

ssh 配置 authorized_keys 后無法免密登錄

查看日志: tail -f /var/log/auth.log May 25 15:55:13 121 sudo: pam_unix(sudo:session): session opened for user root by root(uid0) May 25 15:55:13 121 sshd[550561]: Received signal 15; terminating. May 25 15:55:13 121 sshd[922866]: Server liste…

性能測試場景的設計方法

引用:根據2008年Aberdeen Group的研究報告,對于Web網站,1秒的頁面加載延遲相當于少了11%的PV(page view),相當于降低了16%的顧客滿意度。如果從金錢的角度計算,就意味著:如果一個網站…

「探討」:什么是網絡審計?好用的網絡審計系統推薦【圖文詳解】

網絡是企業運營、政府管理、個人生活不可或缺的基礎設施。 然而網絡安全問題卻日益凸顯,數據泄露、網絡攻擊、欺詐行為等風險日益嚴重。 一、網絡審計的定義 網絡審計,又稱信息技術審計或電子審計,是指審計人員運用專業技能和工具&#xff…

fdk-aac將aac格式轉為pcm數據

int sampleRate 44100; // 采樣率int sampleSizeInBits 16; // 采樣位數,通常是16int channels 2; // 通道數,單聲道為1,立體聲為2FILE *m_fd NULL;FILE *m_fd2 NULL;HANDLE_AACDECODER decoder aacDecoder_Open(TT_MP4_ADTS, 1);if (!…

實戰之快速完成 ChatGLM3-6B 在 GPU-8G的 INT4 量化和本地部署

ChatGLM3 (ChatGLM3-6B) 項目地址 https://github.com/THUDM/ChatGLM3大模型是很吃CPU和顯卡的,所以,要不有一個好的CPU,要不有一塊好的顯卡,顯卡盡量13G,內存基本要32GB。 清華大模型分為三種(ChatGLM3-6B-Base&…