Suppose you have two Kafka topics, `user_activities_topic` and `product_views_topic`, and you want to write the data from `user_activities_topic` into the `user_activities` table and the data from `product_views_topic` into the `product_views` table.
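To keep the example self-contained, here is a minimal sketch of the two target MySQL tables. Only the columns used by the INSERT statements later in this article are shown; the column types, keys, and indexes are assumptions you should adapt to your actual schema.

```sql
-- Minimal sketch of the target tables (assumed types; adjust to your schema)
CREATE TABLE user_activities (
    user_id  VARCHAR(64)  NOT NULL,
    activity VARCHAR(255) NOT NULL
);

CREATE TABLE product_views (
    user_id    VARCHAR(64) NOT NULL,
    product_id INT         NOT NULL
);
```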
Maven dependencies (`pom.xml`):

```xml
<dependencies>
    <!-- Apache Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!-- MySQL JDBC Driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.26</version>
    </dependency>
    <!-- SnakeYAML (assumed version), needed by the configuration-driven example further below -->
    <dependency>
        <groupId>org.yaml</groupId>
        <artifactId>snakeyaml</artifactId>
        <version>1.30</version>
    </dependency>
</dependencies>
```
Flink job example code:
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;

import java.util.Properties;

public class MultipleKafkaToFlinkToMysql {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure Kafka consumer properties
        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProperties.setProperty("group.id", "test");

        // Create the first Kafka consumer (user activities)
        FlinkKafkaConsumer<String> userActivitiesConsumer = new FlinkKafkaConsumer<>(
                "user_activities_topic",
                new SimpleStringSchema(),
                kafkaProperties);

        // Create the second Kafka consumer (product views)
        FlinkKafkaConsumer<String> productViewsConsumer = new FlinkKafkaConsumer<>(
                "product_views_topic",
                new SimpleStringSchema(),
                kafkaProperties);

        // Read the user-activity stream from Kafka and write it to MySQL
        env.addSource(userActivitiesConsumer)
           .map(value -> {
               String[] parts = value.split(",");
               return new UserActivity(parts[0], parts[1]);
           })
           .addSink(JdbcSink.sink(
                   "INSERT INTO user_activities (user_id, activity) VALUES (?, ?)",
                   (statement, userActivity) -> {
                       statement.setString(1, userActivity.userId);
                       statement.setString(2, userActivity.activity);
                   },
                   new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                           .withUrl("jdbc:mysql://localhost:3306/your_database")
                           .withDriverName("com.mysql.cj.jdbc.Driver")
                           .withUsername("your_username")
                           .withPassword("your_password")
                           .build()));

        // Read the product-view stream from Kafka and write it to MySQL
        env.addSource(productViewsConsumer)
           .map(value -> {
               String[] parts = value.split(",");
               return new ProductView(parts[0], Integer.parseInt(parts[1]));
           })
           .addSink(JdbcSink.sink(
                   "INSERT INTO product_views (user_id, product_id) VALUES (?, ?)",
                   (statement, productView) -> {
                       statement.setString(1, productView.userId);
                       statement.setInt(2, productView.productId);
                   },
                   new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                           .withUrl("jdbc:mysql://localhost:3306/your_database")
                           .withDriverName("com.mysql.cj.jdbc.Driver")
                           .withUsername("your_username")
                           .withPassword("your_password")
                           .build()));

        env.execute("Multiple Kafka to Multiple MySQL Tables with Flink");
    }

    // User activity POJO
    public static class UserActivity {
        public String userId;
        public String activity;

        public UserActivity(String userId, String activity) {
            this.userId = userId;
            this.activity = activity;
        }
    }

    // Product view POJO
    public static class ProductView {
        public String userId;
        public int productId;

        public ProductView(String userId, int productId) {
            this.userId = userId;
            this.productId = productId;
        }
    }
}
```
When handling many consumers and tables, writing separate code for each consumer quickly becomes verbose and hard to maintain. To improve maintainability and extensibility, a few design patterns and abstractions can simplify the structure. Here are some improvement strategies:

### 1. Use the factory pattern and a configuration file

By combining the factory pattern with a configuration file, the mapping between Kafka topics and MySQL tables can be abstracted away, which removes most of the duplicated code.

### 2. Refactored example

The following example shows how to manage multiple Kafka consumers and their MySQL sinks through a configuration file and a factory class.

#### 2.1 Configuration file (`application.yaml`)

First, define a configuration file that describes each consumer: the Kafka topic, the target MySQL table, and the field mappings.
```yaml
consumers:
  - name: user_activities_consumer
    kafka_topic: user_activities_topic
    mysql_table: user_activities
    fields:
      - { index: 0, column: user_id }
      - { index: 1, column: activity }
  - name: product_views_consumer
    kafka_topic: product_views_topic
    mysql_table: product_views
    fields:
      - { index: 0, column: user_id }
      - { index: 1, column: product_id }
```
#### 2.2 Factory class (`ConsumerFactory.java`)

Create a factory class that dynamically builds the consumers from the configuration and wires up their processing logic.
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class ConsumerFactory {

    public static void createAndRegisterConsumers(StreamExecutionEnvironment env, List<Map<String, Object>> consumers) {
        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProperties.setProperty("group.id", "test");

        for (Map<String, Object> consumerConfig : consumers) {
            String kafkaTopic = (String) consumerConfig.get("kafka_topic");
            String mysqlTable = (String) consumerConfig.get("mysql_table");
            List<Map<String, Object>> fields = (List<Map<String, Object>>) consumerConfig.get("fields");

            FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                    kafkaTopic,
                    new SimpleStringSchema(),
                    kafkaProperties);

            env.addSource(kafkaConsumer)
               .map(value -> parseMessage(value, fields))
               // Explicit type hint: the lambda returns a generic Map, which Flink cannot infer on its own
               .returns(Types.MAP(Types.STRING, Types.GENERIC(Object.class)))
               .addSink(JdbcSink.sink(
                       generateInsertSQL(mysqlTable, fields),
                       (statement, record) -> populateStatement(statement, record, fields),
                       new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                               .withUrl("jdbc:mysql://localhost:3306/your_database")
                               .withDriverName("com.mysql.cj.jdbc.Driver")
                               .withUsername("your_username")
                               .withPassword("your_password")
                               .build()));
        }
    }

    // Parse a comma-separated Kafka message into a column -> value map using the configured field mappings
    private static Map<String, Object> parseMessage(String value, List<Map<String, Object>> fields) {
        String[] parts = value.split(",");
        return fields.stream().collect(Collectors.toMap(
                field -> (String) field.get("column"),
                field -> parts[(Integer) field.get("index")]));
    }

    // Build an INSERT statement such as "INSERT INTO user_activities (user_id, activity) VALUES (?, ?)"
    private static String generateInsertSQL(String table, List<Map<String, Object>> fields) {
        StringBuilder columns = new StringBuilder();
        StringBuilder placeholders = new StringBuilder();
        for (int i = 0; i < fields.size(); i++) {
            if (i > 0) {
                columns.append(", ");
                placeholders.append(", ");
            }
            columns.append(fields.get(i).get("column"));
            placeholders.append("?");
        }
        return "INSERT INTO " + table + " (" + columns.toString() + ") VALUES (" + placeholders.toString() + ")";
    }

    // Fill the PreparedStatement parameters from the parsed record, in the order defined by the field mappings
    private static void populateStatement(PreparedStatement statement, Map<String, Object> record, List<Map<String, Object>> fields) throws SQLException {
        for (int i = 0; i < fields.size(); i++) {
            String column = (String) fields.get(i).get("column");
            Object value = record.get(column);
            if (value instanceof Integer) {
                statement.setInt(i + 1, (Integer) value);
            } else if (value instanceof String) {
                statement.setString(i + 1, (String) value);
            }
            // Add further type handling here as needed
        }
    }
}
```
#### 2.3 Main program (`Main.java`)

In the main program, load the configuration file and call the factory class to register all consumers.
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.yaml.snakeyaml.Yaml;

import java.io.InputStream;
import java.util.List;
import java.util.Map;

public class Main {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Load the consumer definitions from application.yaml on the classpath
        Yaml yaml = new Yaml();
        InputStream inputStream = Main.class.getClassLoader().getResourceAsStream("application.yaml");
        Map<String, Object> config = yaml.load(inputStream);
        List<Map<String, Object>> consumers = (List<Map<String, Object>>) config.get("consumers");

        // Register one Kafka source + JDBC sink per configured consumer
        ConsumerFactory.createAndRegisterConsumers(env, consumers);

        env.execute("Multiple Kafka to Multiple MySQL Tables with Flink");
    }
}
```
### Key points

1. **Configuration file**: every consumer is described in the configuration file, so adding a new consumer only requires editing the configuration.
2. **Factory pattern**: the `ConsumerFactory` class creates consumers dynamically from the configuration and attaches their processing logic and sink.
3. **Generic processing logic**: `parseMessage` parses each message according to the configured field mappings, while `generateInsertSQL` and `populateStatement` build the INSERT statement and fill the `PreparedStatement`.
4. **Extensibility**: this design is flexible and easy to extend; adding a consumer or changing an existing one only requires updating the configuration file, not the code (see the sketch after this list).
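For instance, routing a hypothetical third topic to a new table would only require appending another entry to `application.yaml`; the topic, table, and column names below are made up purely for illustration:

```yaml
consumers:
  # ... existing consumers ...
  - name: order_events_consumer        # hypothetical new consumer
    kafka_topic: order_events_topic    # new Kafka topic to read from
    mysql_table: order_events          # new target MySQL table
    fields:
      - { index: 0, column: user_id }
      - { index: 1, column: order_id }
```

`ConsumerFactory` picks the new entry up on the next job start without any Java changes.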
This approach not only reduces the amount of code but also improves maintainability and extensibility, making the system easier to manage over time.