Flink 為日期和時間提供了豐富的數據類型, 包括?DATE
,?TIME
,?TIMESTAMP
,?TIMESTAMP_LTZ
,?INTERVAL YEAR TO MONTH
,?INTERVAL DAY TO SECOND
?。 Flink 支持在 session (會話)級別設置時區。 Flink 對多種時間類型和時區的支持使得跨時區的數據處理變得非常容易。
1. TIMESTAMP vs TIMESTAMP_LTZ
1.1 TIMESTAMP 類型
TIMESTAMP(p)
?是?TIMESTAMP(p) WITHOUT TIME ZONE
?的簡寫, 精度?p
?支持的范圍是0-9, 默認是6。TIMESTAMP
?用于描述年, 月, 日, 小時, 分鐘, 秒 和 小數秒對應的時間戳。TIMESTAMP
?可以通過一個字符串來指定,例如:
Flink SQL> SELECT TIMESTAMP '1970-01-01 00:00:04.001';
+-------------------------+
| 1970-01-01 00:00:04.001 |
+-------------------------+
1.2 TIMESTAMP_LTZ 類型
TIMESTAMP_LTZ(p)
?是?TIMESTAMP(p) WITH LOCAL TIME ZONE
?的簡寫, 精度?p
?支持的范圍是0-9, 默認是6。TIMESTAMP_LTZ
?用于描述時間線上的絕對時間點, 使用 long 保存從 epoch 至今的毫秒數, 使用int保存毫秒中的納秒數。 epoch 時間是從 java 的標準 epoch 時間?1970-01-01T00:00:00Z
?開始計算。 在計算和可視化時, 每個?TIMESTAMP_LTZ
?類型的數據都是使用的 session (會話)中配置的時區。TIMESTAMP_LTZ
?沒有字符串表達形式因此無法通過字符串來指定, 可以通過一個 long 類型的 epoch 時間來轉化(例如: 通過 Java 來產生一個 long 類型的 epoch 時間?System.currentTimeMillis()
)
Flink SQL> CREATE VIEW T1 AS SELECT TO_TIMESTAMP_LTZ(4001, 3);
Flink SQL> SET 'table.local-time-zone' = 'UTC';
Flink SQL> SELECT * FROM T1;
+---------------------------+
| TO_TIMESTAMP_LTZ(4001, 3) |
+---------------------------+
| 1970-01-01 00:00:04.001 |
+---------------------------+Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
Flink SQL> SELECT * FROM T1;
+---------------------------+
| TO_TIMESTAMP_LTZ(4001, 3) |
+---------------------------+
| 1970-01-01 08:00:04.001 |
+---------------------------+
TIMESTAMP_LTZ
?可以用于跨時區的計算,因為它是一個基于 epoch 的絕對時間點(比如上例中的?4001
?毫秒)代表的就是不同時區的同一個絕對時間點。 補充一個背景知識:在同一個時間點, 全世界所有的機器上執行?System.currentTimeMillis()
?都會返回同樣的值。 (比如上例中的?4001
?milliseconds), 這就是絕對時間的定義。
2. 時區的作用
本地時區定義了當前 session(會話)所在的時區, 你可以在 Sql client 或者應用程序中配置。
-- 設置為 UTC 時區
Flink SQL> SET 'table.local-time-zone' = 'UTC';-- 設置為上海時區
Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';-- 設置為Los_Angeles時區
Flink SQL> SET 'table.local-time-zone' = 'America/Los_Angeles';
session(會話)的時區設置在 Flink SQL 中非常有用, 它的主要用法如下:
2.1 確定時間函數的返回值
session (會話)中配置的時區會對以下函數生效。
- LOCALTIME
- LOCALTIMESTAMP
- CURRENT_DATE
- CURRENT_TIME
- CURRENT_TIMESTAMP
- CURRENT_ROW_TIMESTAMP()
- NOW()
- PROCTIME()
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
Flink SQL> CREATE VIEW MyView1 AS SELECT LOCALTIME, LOCALTIMESTAMP, CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_ROW_TIMESTAMP(), NOW(), PROCTIME();
Flink SQL> DESC MyView1;
+------------------------+-----------------------------+-------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+------------------------+-----------------------------+-------+-----+--------+-----------+
| LOCALTIME | TIME(0) | false | | | |
| LOCALTIMESTAMP | TIMESTAMP(3) | false | | | |
| CURRENT_DATE | DATE | false | | | |
| CURRENT_TIME | TIME(0) | false | | | |
| CURRENT_TIMESTAMP | TIMESTAMP_LTZ(3) | false | | | |
|CURRENT_ROW_TIMESTAMP() | TIMESTAMP_LTZ(3) | false | | | |
| NOW() | TIMESTAMP_LTZ(3) | false | | | |
| PROCTIME() | TIMESTAMP_LTZ(3) *PROCTIME* | false | | | |
+------------------------+-----------------------------+-------+-----+--------+-----------+
Flink SQL> SET 'table.local-time-zone' = 'UTC';
Flink SQL> SELECT * FROM MyView1;
+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| LOCALTIME | LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME | CURRENT_TIMESTAMP | CURRENT_ROW_TIMESTAMP() | NOW() | PROCTIME() |
+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| 15:18:36 | 2021-04-15 15:18:36.384 | 2021-04-15 | 15:18:36 | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 |
+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
Flink SQL> SELECT * FROM MyView1;
+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| LOCALTIME | LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME | CURRENT_TIMESTAMP | CURRENT_ROW_TIMESTAMP() | NOW() | PROCTIME() |
+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| 23:18:36 | 2021-04-15 23:18:36.384 | 2021-04-15 | 23:18:36 | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 |
+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
2.2 TIMESTAMP_LTZ
?字符串表示
當一個?TIMESTAMP_LTZ
?值轉為 string 格式時, session 中配置的時區會生效。 例如打印這個值,將類型強制轉化為?STRING
?類型, 將類型強制轉換為?TIMESTAMP
?,將?TIMESTAMP
?的值轉化為?TIMESTAMP_LTZ
?類型:
Flink SQL> CREATE VIEW MyView2 AS SELECT TO_TIMESTAMP_LTZ(4001, 3) AS ltz, TIMESTAMP '1970-01-01 00:00:01.001' AS ntz;
Flink SQL> DESC MyView2;
+------+------------------+-------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+------+------------------+-------+-----+--------+-----------+
| ltz | TIMESTAMP_LTZ(3) | true | | | |
| ntz | TIMESTAMP(3) | false | | | |
+------+------------------+-------+-----+--------+-----------+
Flink SQL> SET 'table.local-time-zone' = 'UTC';
Flink SQL> SELECT * FROM MyView2;
+-------------------------+-------------------------+
| ltz | ntz |
+-------------------------+-------------------------+
| 1970-01-01 00:00:04.001 | 1970-01-01 00:00:01.001 |
+-------------------------+-------------------------+
Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
Flink SQL> SELECT * FROM MyView2;
+-------------------------+-------------------------+
| ltz | ntz |
+-------------------------+-------------------------+
| 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 |
+-------------------------+-------------------------+
Flink SQL> CREATE VIEW MyView3 AS SELECT ltz, CAST(ltz AS TIMESTAMP(3)), CAST(ltz AS STRING), ntz, CAST(ntz AS TIMESTAMP_LTZ(3)) FROM MyView2;
Flink SQL> DESC MyView3;
+-------------------------------+------------------+-------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+-------------------------------+------------------+-------+-----+--------+-----------+
| ltz | TIMESTAMP_LTZ(3) | true | | | |
| CAST(ltz AS TIMESTAMP(3)) | TIMESTAMP(3) | true | | | |
| CAST(ltz AS STRING) | STRING | true | | | |
| ntz | TIMESTAMP(3) | false | | | |
| CAST(ntz AS TIMESTAMP_LTZ(3)) | TIMESTAMP_LTZ(3) | false | | | |
+-------------------------------+------------------+-------+-----+--------+-----------+
Flink SQL> SELECT * FROM MyView3;
+-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+
| ltz | CAST(ltz AS TIMESTAMP(3)) | CAST(ltz AS STRING) | ntz | CAST(ntz AS TIMESTAMP_LTZ(3)) |
+-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+
| 1970-01-01 08:00:04.001 | 1970-01-01 08:00:04.001 | 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 | 1970-01-01 00:00:01.001 |
+-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+