Why I built an open-source tool for big data testing and quality control
I’ve developed an open-source data testing and quality tool called data-flare. It aims to help data engineers and data scientists assure the data quality of large datasets using Spark. In this post I’ll share why I wrote this tool, why the existing tools weren’t enough, and how this tool may be helpful to you.
Who spends their evenings writing a data quality tool?
In every data-driven organisation, we must always recognise that without confidence in the quality of our data, that data is useless. Despite that, there are relatively few tools available to help us ensure our data quality stays high.
What I was looking for was a tool that:
- Helped me write high-performance checks on the key properties of my data, like the size of my datasets, the percentage of rows that comply with a condition, or the distinct values in my columns
- Helped me track those key properties over time, so that I can see how my datasets are evolving and spot problem areas easily
- Enabled me to write more complex checks for facets of my data that weren’t simple to incorporate in a property, and enabled me to compare between different datasets
- Would scale to huge volumes of data
The tools that I found were more limited, constraining me to simpler checks defined in YAML or JSON, or only letting me check simpler properties on a single dataset. I wrote data-flare to fill these gaps and provide a one-stop shop for our data quality needs.
Show me the code
data-flare is a Scala library built on top of Spark. This means you will need to write some Scala, but I’ve tried to keep the interface simple so that even a non-Scala developer could quickly pick it up.
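If you want to try it out, pulling the library into an sbt project looks something like the sketch below. The organization, artifact name, and version here are placeholders rather than confirmed coordinates, so check the data-flare README for the real ones.

```scala
// build.sbt - illustrative only: these coordinates are placeholders,
// not confirmed; see the data-flare README for the current ones.
libraryDependencies += "com.github.timgent" %% "data-flare" % "<latest-version>"
```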
Let’s look at a simple example. Imagine we have a dataset containing orders, with the following attributes:
- CustomerId
- OrderId
- ItemId
- OrderType
- OrderValue
We can represent this as a Dataset[Order] in Spark, with our Order defined as:
```scala
case class Order(customerId: String, orderId: String, itemId: String, orderType: String, orderValue: Int)
```
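To make the examples below concrete, here is one way you might build a small Dataset[Order] from in-memory sample data. The SparkSession setup is standard Spark and not part of data-flare, and the sample values are invented purely for illustration.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Standard Spark setup; in a real job you would already have a SparkSession
val spark = SparkSession.builder()
  .appName("orders-quality-checks")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// A tiny invented sample so the checks below have something to run against
val orders: Dataset[Order] = Seq(
  Order("customer-1", "order-1", "item-1", "Sale", 100),
  Order("customer-1", "order-2", "item-2", "Refund", -40),
  Order("customer-2", "order-3", "item-1", "Sale", 25)
).toDS()
```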
Checks on a single dataset
We want to check that our orders are all in order, including checking:
- orderType is “Sale” at least 90% of the time
- Orders with an orderType of “Refund” have order values of less than 0
- There are 20 different items that we sell, and we expect orders for each of them
- We have at least 100 orders
We can do this as follows (here orders represents our Dataset[Order]):
```scala
import org.apache.spark.sql.functions.col

val ordersChecks = ChecksSuite(
  "orders",
  singleDsChecks = Map(
    DescribedDs(orders, "orders") -> Seq(
      SingleMetricCheck.complianceCheck(
        AbsoluteThreshold(0.9, 1),
        ComplianceFn(col("orderType") === "Sale")
      ),
      SingleMetricCheck.complianceCheck(
        AbsoluteThreshold(1, 1),
        ComplianceFn(col("orderValue") < 0),
        MetricFilter(col("orderType") === "Refund")
      ),
      SingleMetricCheck.distinctValuesCheck(
        AbsoluteThreshold(Some(20), None),
        List("itemId")
      ),
      SingleMetricCheck.sizeCheck(AbsoluteThreshold(Some(100), None))
    )
  )
)
```
As you can see from this code, everything starts with a ChecksSuite. You can then pass in all of your checks that operate on single datasets using singleDsChecks. We’ve been able to do all of these checks with SingleMetricChecks, which are efficient and perform all checks in a single pass over the dataset.
What if we wanted to do something that we couldn’t easily express with a metric check? Let’s say we wanted to check that no customer had more than 5 orders with an orderType of “Flash Sale”. We could express that with an Arbitrary Check like so:
```scala
import org.apache.spark.sql.functions.{col, count}

ArbSingleDsCheck("less than 5 flash sales per customer") { ds =>
  val tooManyFlashSaleCustomerCount = ds
    .filter(col("orderType") === "Flash Sale")
    .groupBy("customerId")
    .agg(count("orderId").as("flashSaleCount"))
    .filter(col("flashSaleCount") > 5)
    .count

  if (tooManyFlashSaleCustomerCount > 0)
    RawCheckResult(CheckStatus.Error, s"$tooManyFlashSaleCustomerCount customers had too many flash sales")
  else
    RawCheckResult(CheckStatus.Success, "No customers had more than 5 flash sales :)")
}
```
The ability to define arbitrary checks in this way gives you the power to define any check you want. They won’t be as efficient as the metric-based checks, but the flexibility you get can make it a worthwhile trade-off.
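To actually run an arbitrary check, you register it with a ChecksSuite just like the metric checks. The sketch below binds the check above to a val and assumes a parameter named arbSingleDsChecks, mirroring the singleDsChecks parameter shown earlier; verify the exact parameter name against the data-flare docs.

```scala
// Sketch only: arbSingleDsChecks is an assumed parameter name, mirroring the
// singleDsChecks parameter shown earlier; verify it against the data-flare docs.
val flashSaleCheck: ArbSingleDsCheck = ??? // stands for the check defined in the snippet above

val arbChecksSuite = ChecksSuite(
  "orderArbChecks",
  arbSingleDsChecks = Map(DescribedDs(orders, "orders") -> Seq(flashSaleCheck))
)
```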
Checks on a pair of datasets
Let’s imagine we have a machine learning algorithm that predicts which item each customer will order next. We are returned another Dataset[Order] with predicted orders in it.
We may want to compare metrics on our predicted orders with metrics on our original orders. Let’s say that we expect to have an entry in our predicted orders for every customer that has had a previous order. We could check this using Flare as follows:
```scala
val predictedOrdersChecks = ChecksSuite(
  "orders",
  dualDsChecks = Map(
    DescribedDsPair(DescribedDs(orders, "orders"), DescribedDs(predictedOrders, "predictedOrders")) -> Seq(
      DualMetricCheck(
        CountDistinctValuesMetric(List("customerId")),
        CountDistinctValuesMetric(List("customerId")),
        "predicted orders present for every customer",
        MetricComparator.metricsAreEqual
      )
    )
  )
)
```
We can pass in dualDsChecks to a ChecksSuite. Here we describe the datasets we want to compare, the metrics we want to calculate for each of those datasets, and a MetricComparator which describes how those metrics should be compared. In this case we want the number of distinct customerIds in each dataset to be equal.
What happens when you run your checks?
When you run your checks, all metrics are calculated in a single pass over each dataset, and the check results are calculated and returned. You can then decide for yourself how to handle those results. For example, if one of your checks gives an error you could fail the Spark job, or send a failure notification.
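As a rough sketch of that flow (the run method, overallStatus field, and CheckSuiteStatus values here are assumptions about the API rather than confirmed signatures, so see the docs for the real interface):

```scala
import java.time.Instant

// Illustrative only: run, overallStatus, and CheckSuiteStatus are assumed
// names, not confirmed against the data-flare source.
val results = ordersChecks.run(Instant.now())
if (results.overallStatus == CheckSuiteStatus.Error) {
  // Fail the Spark job so downstream consumers never read bad data
  throw new RuntimeException("Data quality checks failed for the orders dataset")
}
```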
What else can you do?
- Store your metrics and check results by passing a metricsPersister and qcResultsRepository to your ChecksSuite (Elasticsearch is supported out of the box, and it’s extensible to support any data store)
- Graph metrics over time in Kibana so you can spot trends
- Write arbitrary checks for pairs of datasets
For more information check out the documentation and the code!
Source: https://medium.com/swlh/why-i-built-an-opensource-tool-for-big-data-testing-and-quality-control-182a14701e8d