學會基本的圖像處理技術。?
OpenCV 基礎
實踐:使用 OpenCV 進行圖像讀取、顯示和基本處理?
03
?
代碼示例
1. 導入必要的庫
import cv2
import numpy as np
import matplotlib.pyplot as plt
2. 圖像讀取
# 讀取圖像
image_path = 'path_to_your_image.jpg' # 替換為你的圖像路徑
image = cv2.imread(image_path)
# 檢查圖像是否成功讀取
if image is None:
print("圖像讀取失敗,請檢查路徑是否正確。")
else:
print("圖像讀取成功!")
3. 圖像顯示
# 使用 OpenCV 顯示圖像
cv2.imshow('原圖', image)
cv2.waitKey(0)
cv2.destroyAllWindows()
# 使用 Matplotlib 顯示圖像
plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
plt.title('原圖')
plt.axis('off')
plt.show()
4. 圖像基本信息
# 獲取圖像的基本信息
height, width, channels = image.shape
print(f"圖像高度: {height} 像素")
print(f"圖像寬度: {width} 像素")
print(f"圖像通道數: {channels}")
5. 圖像灰度化???????
# 將圖像轉換為灰度圖像
gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# 顯示灰度圖像
cv2.imshow('灰度圖', gray_image)
cv2.waitKey(0)
cv2.destroyAllWindows()
# 使用 Matplotlib 顯示灰度圖像
plt.imshow(gray_image, cmap='gray')
plt.title('灰度圖')
plt.axis('off')
plt.show()
6. 圖像裁剪???????
# 裁剪圖像
cropped_image = image[100:400, 100:400]
# 顯示裁剪后的圖像
cv2.imshow('裁剪圖', cropped_image)
cv2.waitKey(0)
cv2.destroyAllWindows()
# 使用 Matplotlib 顯示裁剪后的圖像
plt.imshow(cv2.cvtColor(cropped_image, cv2.COLOR_BGR2RGB))
plt.title('裁剪圖')
plt.axis('off')
plt.show()
7. 圖像縮放???????
# 縮放圖像
resized_image = cv2.resize(image, (width // 2, height // 2))
# 顯示縮放后的圖像
cv2.imshow('縮放圖', resized_image)
cv2.waitKey(0)
cv2.destroyAllWindows()
# 使用 Matplotlib 顯示縮放后的圖像
plt.imshow(cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB))
plt.title('縮放圖')
plt.axis('off')
plt.show()
8. 圖像旋轉???????
# 旋轉圖像
center = (width // 2, height // 2)
angle = 45
scale = 1.0
rotation_matrix = cv2.getRotationMatrix2D(center, angle, scale)
rotated_image = cv2.warpAffine(image, rotation_matrix, (width, height))
# 顯示旋轉后的圖像
cv2.imshow('旋轉圖', rotated_image)
cv2.waitKey(0)
cv2.destroyAllWindows()
# 使用 Matplotlib 顯示旋轉后的圖像
plt.imshow(cv2.cvtColor(rotated_image, cv2.COLOR_BGR2RGB))
plt.title('旋轉圖')
plt.axis('off')
plt.show()
9. 圖像翻轉???????
# 翻轉圖像
flipped_image = cv2.flip(image, 1) # 1 表示水平翻轉,0 表示垂直翻轉,-1 表示水平和垂直翻轉
# 顯示翻轉后的圖像
cv2.imshow('翻轉圖', flipped_image)
cv2.waitKey(0)
cv2.destroyAllWindows()
# 使用 Matplotlib 顯示翻轉后的圖像
plt.imshow(cv2.cvtColor(flipped_image, cv2.COLOR_BGR2RGB))
plt.title('翻轉圖')
plt.axis('off')
plt.show()
10. 圖像保存???????
# 保存處理后的圖像
output_path = 'processed_image.jpg'
cv2.imwrite(output_path, flipped_image)
print(f"處理后的圖像已保存到 {output_path}")
04
?
實踐???????
import cv2
import numpy as np
import matplotlib.pyplot as plt
# 讀取圖像
image_path = 'path_to_your_image.jpg' # 替換為你的圖像路徑
image = cv2.imread(image_path)
# 檢查圖像是否成功讀取
if image is None:
print("圖像讀取失敗,請檢查路徑是否正確。")
else:
print("圖像讀取成功!")
# 使用 OpenCV 顯示圖像
cv2.imshow('原圖', image)
cv2.waitKey(0)
cv2.destroyAllWindows()
# 使用 Matplotlib 顯示圖像
plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
plt.title('原圖')
plt.axis('off')
plt.show()
# 獲取圖像的基本信息
height, width, channels = image.shape
print(f"圖像高度: {height} 像素")
print(f"圖像寬度: {width} 像素")
print(f"圖像通道數: {channels}")
# 將圖像轉換為灰度圖像
gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# 顯示灰度圖像
cv2.imshow('灰度圖', gray_image)
cv2.waitKey(0)
cv2.destroyAllWindows()
# 使用 Matplotlib 顯示灰度圖像
plt.imshow(gray_image, cmap='gray')
plt.title('灰度圖')
plt.axis('off')
plt.show()
# 裁剪圖像
cropped_image = image[100:400, 100:400]
# 顯示裁剪后的圖像
cv2.imshow('裁剪圖', cropped_image)
cv2.waitKey(0)
cv2.destroyAllWindows()
# 使用 Matplotlib 顯示裁剪后的圖像
plt.imshow(cv2.cvtColor(cropped_image, cv2.COLOR_BGR2RGB))
plt.title('裁剪圖')
plt.axis('off')
plt.show()
# 縮放圖像
resized_image = cv2.resize(image, (width // 2, height // 2))
# 顯示縮放后的圖像
cv2.imshow('縮放圖', resized_image)
cv2.waitKey(0)
cv2.destroyAllWindows()
# 使用 Matplotlib 顯示縮放后的圖像
plt.imshow(cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB))
plt.title('縮放圖')
plt.axis('off')
plt.show()
# 旋轉圖像
center = (width // 2, height // 2)
angle = 45
scale = 1.0
rotation_matrix = cv2.getRotationMatrix2D(center, angle, scale)
rotated_image = cv2.warpAffine(image, rotation_matrix, (width, height))
# 顯示旋轉后的圖像
cv2.imshow('旋轉圖', rotated_image)
cv2.waitKey(0)
cv2.destroyAllWindows()
# 使用 Matplotlib 顯示旋轉后的圖像
plt.imshow(cv2.cvtColor(rotated_image, cv2.COLOR_BGR2RGB))
plt.title('旋轉圖')
plt.axis('off')
plt.show()
# 翻轉圖像
flipped_image = cv2.flip(image, 1) # 1 表示水平翻轉,0 表示垂直翻轉,-1 表示水平和垂直翻轉
# 顯示翻轉后的圖像
cv2.imshow('翻轉圖', flipped_image)
cv2.waitKey(0)
cv2.destroyAllWindows()
# 使用 Matplotlib 顯示翻轉后的圖像
plt.imshow(cv2.cvtColor(flipped_image, cv2.COLOR_BGR2RGB))
plt.title('翻轉圖')
plt.axis('off')
plt.show()
# 保存處理后的圖像
output_path = 'processed_image.jpg'
cv2.imwrite(output_path, flipped_image)
print(f"處理后的圖像已保存到 {output_path}")
05
總結
通過今天的練習,你應該已經學會了如何使用 OpenCV 進行基本的圖像處理,包括圖像讀取、顯示、灰度化、裁剪、縮放、旋轉和翻轉
數據管道
學會安裝和配置 Apache Airflow
使用 Airflow 構建一個簡單的數據處理管道
調度和監控任務?
01
?
目標
學會安裝和配置 Apache Airflow
使用 Airflow 構建一個簡單的數據處理管道
調度和監控任務?
02
?
步驟
1. 安裝 Apache Airflow
首先,我們需要安裝 Apache Airflow。可以使用 pip 來安裝:
pip install apache-airflow
2. 初始化 Airflow
初始化 Airflow 的數據庫和配置文件:
airflow db init
3. 啟動 Airflow Web 服務器
啟動 Airflow 的 Web 服務器,這樣我們可以在瀏覽器中監控和管理任務:
airflow webserver --port 8080
4. 啟動 Airflow Scheduler
啟動 Airflow 的調度器,它負責調度任務:
airflow scheduler
5. 創建 DAG 文件
DAG(Directed Acyclic Graph)是 Airflow 中的核心概念,表示一系列任務的依賴關系。我們將在 dags 目錄下創建一個 Python 文件來定義我們的 DAG。
假設我們有一個簡單的數據處理管道,包括以下幾個任務:
從 CSV 文件中讀取數據
清洗數據(處理缺失值和重復行)
按部門分組并計算每組的銷售額均值
將結果保存到新的 CSV 文件
創建一個名為 simple_data_pipeline.py 的文件,內容如下:???????
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
import os
# 定義默認參數
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# 創建 DAG
dag = DAG(
'simple_data_pipeline',
default_args=default_args,
description='一個簡單的數據處理管道',
schedule_interval=timedelta(days=1),
)
# 定義任務函數
def read_csv_file():
"""從 CSV 文件中讀取數據"""
file_path = '/path/to/sales_data.csv'
df = pd.read_csv(file_path, encoding='utf-8-sig')
print(f"原始數據集: \n{df.head()}")
return df
def clean_data(**context):
"""清洗數據(處理缺失值和重復行)"""
df = context['ti'].xcom_pull(task_ids='read_csv_file')
# 檢查每列的缺失值數量
missing_values = df.isnull().sum()
print(f"每列的缺失值數量: \n{missing_values}")
# 刪除含有缺失值的行
df_cleaned = df.dropna()
print(f"刪除缺失值后的數據集: \n{df_cleaned.head()}")
# 檢查重復行
duplicates = df_cleaned.duplicated()
print(f"重復行: \n{duplicates}")
# 刪除重復行
df_no_duplicates = df_cleaned.drop_duplicates()
print(f"刪除重復行后的數據集: \n{df_no_duplicates.head()}")
return df_no_duplicates
def group_and_calculate_mean(**context):
"""按部門分組并計算每組的銷售額均值"""
df = context['ti'].xcom_pull(task_ids='clean_data')
# 按 '部門' 列分組
grouped_by_department = df.groupby('部門')
# 計算每組的銷售額均值
mean_sales_by_department = grouped_by_department['總價'].mean()
print(f"按 '部門' 列分組后,每組的銷售額均值: \n{mean_sales_by_department}")
return mean_sales_by_department
def save_results(**context):
"""將結果保存到新的 CSV 文件"""
mean_sales_by_department = context['ti'].xcom_pull(task_ids='group_and_calculate_mean')
result_path = '/path/to/mean_sales_by_department.csv'
mean_sales_by_department.to_csv(result_path, encoding='utf-8-sig')
print(f"結果已保存到 {result_path}")
# 定義任務
read_csv_task = PythonOperator(
task_id='read_csv_file',
python_callable=read_csv_file,
dag=dag,
)
clean_data_task = PythonOperator(
task_id='clean_data',
python_callable=clean_data,
provide_context=True,
dag=dag,
)
group_and_calculate_mean_task = PythonOperator(
task_id='group_and_calculate_mean',
python_callable=group_and_calculate_mean,
provide_context=True,
dag=dag,
)
save_results_task = PythonOperator(
task_id='save_results',
python_callable=save_results,
provide_context=True,
dag=dag,
)
# 設置任務依賴關系
read_csv_task >> clean_data_task >> group_and_calculate_mean_task >> save_results_task
6. 配置 Airflow
確保 dags 目錄在 Airflow 的配置文件中正確設置。通常情況下,Airflow 會自動檢測 dags 目錄下的 DAG 文件。
7. 運行和監控任務
打開瀏覽器,訪問 http://localhost:8080,登錄 Airflow 的 Web 界面。
在 DAG 列表中找到 simple_data_pipeline,點擊進入詳細頁面。
點擊 "Trigger Dag" 按鈕手動觸發任務,或者等待調度器自動運行任務。
在任務詳情頁面中,可以查看每個任務的運行狀態和日志。?
03
?
總結
通過今天的實踐,你應該已經學會了如何使用 Apache Airflow 構建一個簡單的數據處理管道。這個管道包括從 CSV 文件中讀取數據、清洗數據、按部門分組計算銷售額均值,并將結果保存到新的 CSV 文件。
?