sqlalchemy
One of the key aspects of any data science workflow is the sourcing, cleaning, and storing of raw data in a form that can be used upstream. This process is commonly referred to as “Extract-Transform-Load,” or ETL for short.
任何數據科學工作流程的關鍵方面之一就是以可以在上游使用的形式采購,清理和存儲原始數據。 此過程通常稱為“提取-轉換-加載”或簡稱ETL。
It is important to design efficient, robust, and reliable ETL processes, or “data pipelines.” An inefficient pipeline will make working with data slow and unproductive. A non-robust pipeline will break easily, leaving gaps.
設計有效,健壯和可靠的ETL流程或“數據管道”非常重要。 效率低下的管道會使處理數據的速度變慢且效率低下。 非健壯的管道將很容易中斷,并留下空白。
Worse still, an unreliable data pipeline will silently contaminate your database with false data that may not become apparent until damage has been done.
更糟糕的是,不可靠的數據管道將以錯誤的數據靜默污染數據庫,這些錯誤的數據在損壞完成之前可能不會變得明顯。
Although critically important, ETL development can be a slow and cumbersome process at times. Luckily, there are open source solutions that make life much easier.
盡管非常重要,但ETL開發有時可能是一個緩慢而繁瑣的過程。 幸運的是,有開源解決方案可以使生活更加輕松。
什么是SQLAlchemy? (What is SQLAlchemy?)
One such solution is a Python module called SQLAlchemy. It allows data engineers and developers to define schemas, write queries, and manipulate SQL databases entirely through Python.
一種這樣的解決方案是稱為SQLAlchemy的Python模塊。 它允許數據工程師和開發人員完全通過Python定義架構,編寫查詢和操作SQL數據庫。
SQLAlchemy’s Object Relational Mapper (ORM) and Expression Language functionalities iron out some of the idiosyncrasies apparent between different implementations of SQL by allowing you to associate Python classes and constructs with data tables and expressions.
SQLAlchemy的對象關系映射器(ORM)和表達式語言功能允許您將Python類和構造與數據表和表達式相關聯,從而消除了不同SQL實現之間的某些特質。
Here, we’ll run through some highlights of SQLAlchemy to discover what it can do and how it can make ETL development a smoother process.
在這里,我們將遍歷SQLAlchemy的一些要點,以發現它可以做什么以及如何使ETL開發更順暢。
配置 (Setting up)
You can install SQLAlchemy using the pip package installer.
您可以使用pip軟件包安裝程序安裝SQLAlchemy。
$ sudo pip install sqlalchemy
As for SQL itself, there are many different versions available, including MySQL, Postgres, Oracle, and Microsoft SQL Server. For this article, we’ll be using SQLite.
至于SQL本身,有許多可用的版本,包括MySQL,Postgres,Oracle和Microsoft SQL Server。 對于本文,我們將使用SQLite。
SQLite is an open-source implementation of SQL that usually comes pre-installed with Linux and Mac OS X. It is also available for Windows. If you don’t have it on your system already, you can follow these instructions to get up and running.
SQLite是SQL的開源實現,通常預裝在Linux和Mac OS X中。它也可用于Windows。 如果您的系統上尚未安裝它,則可以按照以下說明進行操作 。
In a new directory, use the terminal to create a new database:
在新目錄中,使用終端創建新數據庫:
$ mkdir sqlalchemy-demo && cd sqlalchemy-demo
$ touch demo.db
定義架構 (Defining a schema)
A database schema defines the structure of a database system, in terms of tables, columns, fields, and the relationships between them. Schemas can be defined in raw SQL, or through the use of SQLAlchemy’s ORM feature.
數據庫模式根據表,列,字段以及它們之間的關系來定義數據庫系統的結構。 模式可以在原始SQL中定義,也可以通過使用SQLAlchemy的ORM功能進行定義。
Below is an example showing how to define a schema of two tables for an imaginary blogging platform. One is a table of users, and the other is a table of posts uploaded.
下面的示例顯示了如何為虛構的博客平臺定義兩個表的架構。 一個是用戶表,另一個是上載的帖子表。
from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import *engine = create_engine('sqlite:///demo.db')
Base = declarative_base()class Users(Base):__tablename__ = "users"UserId = Column(Integer, primary_key=True)Title = Column(String)FirstName = Column(String)LastName = Column(String)Email = Column(String)Username = Column(String)DOB = Column(DateTime)class Uploads(Base):__tablename__ = "uploads"UploadId = Column(Integer, primary_key=True)UserId = Column(Integer)Title = Column(String)Body = Column(String)Timestamp = Column(DateTime)Users.__table__.create(bind=engine, checkfirst=True)
Uploads.__table__.create(bind=engine, checkfirst=True)
First, import everything you need from SQLAlchemy. Then, use create_engine(connection_string)
to connect to your database. The exact connection string will depend on the version of SQL you are working with. This example uses a relative path to the SQLite database created earlier.
首先,從SQLAlchemy導入所需的一切。 然后,使用create_engine(connection_string)
連接到數據庫。 確切的連接字符串將取決于您使用SQL版本。 本示例使用到先前創建SQLite數據庫的相對路徑。
Next, start defining your table classes. The first one in the example is Users
. Each column in this table is defined as a class variable using SQLAlchemy’s Column(type)
, where type
is a data type (such as Integer
, String
, DateTime
and so on). Use primary_key=True
to denote columns which will be used as primary keys.
接下來,開始定義表類。 示例中的第一個是Users
。 該表中的每一列都使用SQLAlchemy的Column(type)
定義為類變量,其中type
是數據類型(例如Integer
, String
, DateTime
等)。 使用primary_key=True
表示將用作主鍵的列。
The next table defined here is Uploads
. It’s very much the same idea — each column is defined as before.
此處定義的下一個表格是Uploads
。 這幾乎是相同的想法-每列的定義都像以前一樣。
The final two lines actually create the tables. The checkfirst=True
parameter ensures that new tables are only created if they do not currently exist in the database.
最后兩行實際上創建了表。 checkfirst=True
參數可確保僅在數據庫中當前不存在新表時才創建它們。
提取 (Extract)
Once the schema has been defined, the next task is to extract the raw data from its source. The exact details can vary wildly from case to case, depending on how the raw data is provided. Maybe your app calls an in-house or third-party API, or perhaps you need to read data logged in a CSV file.
定義架構后,下一個任務是從其源中提取原始數據。 具體情況可能因情況而異,具體取決于提供原始數據的方式。 也許您的應用調用了內部API或第三方API,或者您可能需要讀取記錄在CSV文件中的數據。
The example below uses two APIs to simulate data for the fictional blogging platform described above. The Users
table will be populated with profiles randomly generated at randomuser.me, and the Uploads
table will contain lorem ipsum-inspired data courtesy of JSONPlaceholder.
下面的示例使用兩個API為上述虛構的博客平臺模擬數據。 將在Users
表中填充在randomuser.me處隨機生成的配置文件,并且Uploads
表將包含受lorem ipsum啟發的JSONPlaceholder數據。
Python’s Requests
module can be used to call these APIs, as shown below:
可以使用Python的Requests
模塊來調用這些API,如下所示:
import requestsurl = 'https://randomuser.me/api/?results=10'
users_json = requests.get(url).json()
url2 = 'https://jsonplaceholder.typicode.com/posts/'
uploads_json = requests.get(url2).json()
The data is currently held in two objects (users_json
and uploads_json
) in JSON format. The next step will be to transform and load this data into the tables defined earlier.
數據當前以JSON格式保存在兩個對象( users_json
和uploads_json
)中。 下一步將轉換此數據并將其加載到之前定義的表中。
轉變 (Transform)
Before the data can be loaded into the database, it is important to ensure that it is in the correct format. The JSON objects created in the code above are nested, and contain more data than is required for the tables defined.
在將數據加載到數據庫之前,重要的是要確保其格式正確。 在上面的代碼中創建的JSON對象是嵌套的,并且包含的??數據量超過定義的表所需的數據量。
An important intermediary step is to transform the data from its current nested JSON format to a flat format that can be safely written to the database without error.
一個重要的中間步驟是從其當前嵌套JSON格式變換的數據可以被安全地寫入到數據庫而不會出現錯誤的平坦格式。
For the example running through this article, the data are relatively simple, and won’t need much transformation. The code below creates two lists, users
and uploads
, which will be used in the final step:
對于本文中運行的示例,數據相對簡單,不需要太多轉換。 下面的代碼創建兩個列表, users
和uploads
,將在最后一步中使用它們:
from datetime import datetime, timedelta
from random import randintusers, uploads = [], []for i, result in enumerate(users_json['results']):row = {}row['UserId'] = irow['Title'] = result['name']['title']row['FirstName'] = result['name']['first']row['LastName'] = result['name']['last']row['Email'] = result['email']row['Username'] = result['login']['username']dob = datetime.strptime(result['dob'],'%Y-%m-%d %H:%M:%S') row['DOB'] = dob.date()users.append(row)for result in uploads_json:row = {}row['UploadId'] = result['id']row['UserId'] = result['userId']row['Title'] = result['title']row['Body'] = result['body']delta = timedelta(seconds=randint(1,86400))row['Timestamp'] = datetime.now() - deltauploads.append(row)
The main step here is to iterate through the JSON objects created before. For each result, create a new Python dictionary object with keys corresponding to each column defined for the relevant table in the schema. This ensures that the data is no longer nested, and keeps only the data needed for the tables.
這里的主要步驟是遍歷之前創建的JSON對象。 對于每個結果,創建一個新的Python字典對象,其鍵對應于為架構中相關表定義的每一列。 這樣可以確保不再嵌套數據,并僅保留表所需的數據。
The other step is to use Python’s datetime
module to manipulate dates, and transform them into DateTime
type objects that can be written to the database. For the sake of this example, random DateTime
objects are generated using the timedelta()
method from Python’s DateTime module.
另一個步驟是使用Python的datetime
模塊來處理日期,并將其轉換為可以寫入數據庫的DateTime
類型的對象。 對于本示例,將使用Python的DateTime模塊中的timedelta()
方法生成隨機的DateTime
對象。
Each created dictionary is appended to a list, which will be used in the final step of the pipeline.
每個創建的字典都會附加到列表中,該列表將在管道的最后一步中使用。
加載 (Load)
Finally, the data is in a form that can be loaded into the database. SQLAlchemy makes this step straightforward through its Session API.
最后,數據采用可以加載到數據庫中的形式。 SQLAlchemy通過其Session API使這一步驟變得簡單。
The Session API acts a bit like a middleman, or “holding zone,” for Python objects you have either loaded from or associated with the database. These objects can be manipulated within the session before being committed to the database.
對于從數據庫加載或與數據庫關聯的Python對象,Session API的行為有點像中間人或“保留區”。 這些對象可以在提交給數據庫之前在會話中進行操作。
The code below creates a new session object, adds rows to it, then merges and commits them to the database:
下面的代碼創建一個新的會話對象,向其中添加行,然后合并并將它們提交到數據庫:
Session = sessionmaker(bind=engine)
session = Session()for user in users:row = Users(**user)session.add(row)for upload in uploads:row = Uploads(**upload)session.add(row)session.commit()
The sessionmaker
factory is used to generate newly-configured Session
classes. Session
is an everyday Python class that is instantiated on the second line as session
.
sessionmaker
工廠用于生成新配置的Session
類。 Session
是日常的Python類,在第二行將其實例化為session
。
Next up are two loops which iterate through the users
and uploads
lists created earlier. The elements of these lists are dictionary objects whose keys correspond to the columns given in the Users
and Uploads
classes defined previously.
接下來是兩個循環,這些循環遍歷users
并uploads
先前創建的列表。 這些列表的元素是字典對象,其關鍵字對應于先前定義的“ Users
和“上Uploads
類中給定的列。
Each object is used to instantiate a new instance of the relevant class (using Python’s handy some_function(**some_dict)
trick). This object is added to the current session with session.add()
.
每個對象用于實例化相關類的新實例(使用Python方便的some_function(**some_dict)
技巧)。 該對象通過session.add()
添加到當前會話中。
Finally, when the session contains the rows to be added, session.commit()
is used to commit the transaction to the database.
最后,當會話包含要添加的行時, session.commit()
用于將事務提交到數據庫。
匯總 (Aggregating)
Another cool feature of SQLAlchemy is the ability to use its Expression Language system to write and execute backend-agnostic SQL queries.
SQLAlchemy的另一個很酷的功能是能夠使用其Expression Language系統編寫和執行與后端無關SQL查詢。
What are the advantages of writing backend-agnostic queries? For a start, they make any future migration projects a whole lot easier. Different versions of SQL have somewhat incompatible syntaxes, but SQLAlchemy’s Expression Language acts as a lingua franca between them.
編寫與后端無關的查詢有什么優勢? 首先,它們使將來的任何遷移項目變得更加容易。 不同版本SQL語法有些不兼容,但是SQLAlchemy的Expression Language充當它們之間的通用語言。
Also, being able to query and interact with your database in a seamlessly Pythonic way is a real advantage to developers who’d prefer work entirely in the language they know best. However, SQLAlchemy will also let you work in plain SQL, for cases when it is simpler to use a pre-written query.
此外,對于希望完全使用他們最了解的語言工作的開發人員而言,能夠以無縫的Python語言方式查詢數據庫并與之交互是一個真正的優勢。 但是,在使用預先編寫的查詢更簡單的情況下,SQLAlchemy還可以讓您使用純SQL。
Here, we will extend the fictional blogging platform example to illustrate how this works. Once the basic Users and Uploads tables have been created and populated, a next step might be to create an aggregated table — for instance, showing how many articles each user has posted, and the time they were last active.
在這里,我們將擴展虛構的博客平臺示例,以說明其工作原理。 創建并填充基本的“用戶和上載”表后,下一步可能是創建匯總表-例如,顯示每個用戶發布了多少文章以及他們上次活動的時間。
First, define a class for the aggregated table:
首先,為匯總表定義一個類:
class UploadCounts(Base):__tablename__ = "upload_counts"UserId = Column(Integer, primary_key=True)LastActive = Column(DateTime)PostCount = Column(Integer)UploadCounts.__table__.create(bind=engine, checkfirst=True)
This table will have three columns. For each UserId
, it will store the timestamp of when they were last active, and a count of how many posts they have uploaded.
該表將包含三列。 對于每個UserId
,它將存儲上次激活時間的時間戳以及上載多少帖子的計數。
In plain SQL, this table would be populated using a query along the lines of:
在普通SQL中,將使用查詢填充該表,包括:
INSERT INTO upload_counts
SELECTUserId,MAX(Timestamp) AS LastActive,COUNT(UploadId) AS PostCount
FROMuploads
GROUP BY 1;
In SQLAlchemy, this would be written as:
在SQLAlchemy中,這將寫為:
connection = engine.connect()query = select([Uploads.UserId,func.max(Uploads.Timestamp).label('LastActive'),func.count(Uploads.UploadId).label('PostCount')]).\ group_by('UserId')results = connection.execute(query)for result in results:row = UploadCounts(**result)session.add(row)session.commit()
The first line creates a Connection
object using the engine
object’s connect()
method. Next, a query is defined using the select()
function.
第一行使用engine
對象的connect()
方法創建一個Connection
對象。 接下來,使用select()
函數定義查詢。
This query is the same as the plain SQL version given above. It selects the UserId
column from the uploads
table. It also applies func.max()
to the Timestamp
column, which identifies the most recent timestamp. This is labelled LastActive
using the label()
method.
此查詢與上面給出的普通SQL版本相同。 它從uploads
表中選擇UserId
列。 還將func.max()
應用于“ Timestamp
列,該列標識最近的時間戳。 使用label()
方法將其label()
為LastActive
。
Likewise, the query applies func.count()
to count the number of records that appear in the Title
column. This is labelled PostCount
.
同樣,該查詢將應用func.count()
來計算出現在“ Title
列中的記錄數。 這被標記為PostCount
。
Finally, the query uses group_by()
to group results by UserId
.
最后,查詢使用group_by()
將結果按UserId
分組。
To use the results of the query, a for loop iterates over the row objects returned by connection.execute(query)
. Each row is used to instantiate an instance of the UploadCounts
table class. As before, each row is added to the session
object, and finally the session is committed to the database.
要使用查詢結果,for循環遍歷由connection.execute(query)
返回的行對象。 每行用于實例化UploadCounts
表類的實例。 和以前一樣,將每一行添加到session
對象,最后將會話提交到數據庫。
簽出 (Checking out)
Once you have run this script, you may want to convince yourself that the data have been written correctly into the demo.db
database created earlier.
一旦運行了該腳本,您可能想使自己確信數據已正確寫入到先前創建的demo.db
數據庫中。
After quitting Python, open the database in SQLite:
退出Python后,在SQLite中打開數據庫:
$ sqlite3 demo.db
Now, you should be able to run the following queries:
現在,您應該能夠運行以下查詢:
SELECT * FROM users;SELECT * FROM uploads;SELECT * FROM upload_counts;
And the contents of each table will be printed to the console! By scheduling the Python script to run at regular intervals, you can be sure the database will be kept up-to-date.
每個表的內容將被打印到控制臺! 通過安排Python腳本定期運行,可以確保數據庫保持最新狀態。
You could now use these tables to write queries for further analysis, or to build dashboards for visualisation purposes.
現在,您可以使用這些表編寫查詢以進行進一步分析,或者構建儀表板以進行可視化。
進一步閱讀 (Reading further)
If you’ve made it this far, then hopefully you’ll have learned a thing or two about how SQLAlchemy can make ETL development in Python much more straightforward!
如果到此為止,那么希望您能學到一兩個關于SQLAlchemy如何使Python中的ETL開發更加簡單的事情!
It is not possible for a single article to do full justice to all the features of SQLAlchemy. However, one of the project’s key advantages is the depth and detail of its documentation. You can dive into it here.
單篇文章不可能對SQLAlchemy的所有功能都做到十全十美。 但是,該項目的主要優勢之一是其文檔的深度和細節。 你可以在這里潛入。
Otherwise, check out this cheatsheet if you want to get started quickly.
否則,如果您想快速入門,請查看此備忘單 。
The full code for this article can be found in this gist.
本文的完整代碼可在本要點中找到。
Thanks for reading! If you have any questions or comments, please leave a response below.
謝謝閱讀! 如果您有任何疑問或意見,請在下面留下答復。
翻譯自: https://www.freecodecamp.org/news/sqlalchemy-makes-etl-magically-easy-ab2bd0df928/
sqlalchemy