Apache-Flink深度解析-DataStream-Connectors之Kafka


Kafka 簡介

Apache Kafka是一個分布式發布-訂閱消息傳遞系統。 它最初由LinkedIn公司開發,LinkedIn于2010年貢獻給了Apache基金會并成為頂級開源項目。Kafka用于構建實時數據管道和流式應用程序。它具有水平擴展性、容錯性、極快的速度,目前也得到了廣泛的應用。

Kafka不但是分布式消息系統而且也支持流式計算,所以在介紹Kafka在Apache Flink中的應用之前,先以一個Kafka的簡單示例直觀了解什么是Kafka。

安裝

本篇不是系統的,詳盡的介紹Kafka,而是想讓大家直觀認識Kafka,以便在Apahe Flink中進行很好的應用,所以我們以最簡單的方式安裝Kafka。

  • 下載二進制包

curl -L -O http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz復制代碼
  • 解壓安裝
    Kafka安裝只需要將下載的tgz解壓即可,如下:

jincheng:kafka jincheng.sunjc$ tar -zxf kafka_2.11-2.1.0.tgz 
jincheng:kafka jincheng.sunjc$ cd kafka_2.11-2.1.0
jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ ls
LICENSE        NOTICE        bin        config        libs        site-docs
復制代碼

其中bin包含了所有Kafka的管理命令,如接下來我們要啟動的Kafka的Server。

  • 啟動Kafka Server
    Kafka是一個發布訂閱系統,消息訂閱首先要有個服務存在。我們啟動一個Kafka Server 實例。 Kafka需要使用ZooKeeper,要進行投產部署我們需要安裝ZooKeeper集群,這不在本篇的介紹范圍內,所以我們利用Kafka提供的腳本,安裝一個只有一個節點的ZooKeeper實例。如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/zookeeper-server-start.sh config/zookeeper.properties &[2019-01-13 09:06:19,985] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
....
....
[2019-01-13 09:06:20,061] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)復制代碼

啟動之后,ZooKeeper會綁定2181端口(默認)。接下來我們啟動Kafka Server,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-server-start.sh config/server.properties
[2019-01-13 09:09:16,937] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-01-13 09:09:17,267] INFO starting (kafka.server.KafkaServer)
[2019-01-13 09:09:17,267] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2019-01-13 09:09:17,284] INFO [ZooKeeperClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
...
...
[2019-01-13 09:09:18,253] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)復制代碼

如果上面一切順利,Kafka的安裝就完成了。

創建Topic

Kafka是消息訂閱系統,首先創建可以被訂閱的Topic,我們創建一個名為flink-tipic的Topic,在一個新的terminal中,執行如下命令:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipicCreated topic "flink-tipic".復制代碼

在Kafka Server的terminal中也會輸出如下成功創建信息:

...
[2019-01-13 09:13:31,156] INFO Created log for partition flink-tipic-0 in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 2.1-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
...復制代碼

上面顯示了flink-topic的基本屬性配置,如消息壓縮方式,消息格式,備份數量等等。

除了看日志,我們可以用命令顯示的查詢我們是否成功的創建了flink-topic,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --list --zookeeper localhost:2181flink-tipic復制代碼

如果輸出flink-tipic,那么說明我們的Topic成功創建了。

那么Topic是保存在哪里?Kafka是怎樣進行消息的發布和訂閱的呢?為直觀,我們看如下Kafka架構示意圖簡單理解一下:

簡單介紹一下,Kafka利用ZooKeeper來存儲集群信息,也就是上面我們啟動的Kafka Server 實例,一個集群中可以有多個Kafka Server 實例,Kafka Server叫做Broker,我們創建的Topic可以在一個或多個Broker中。Kafka利用Push模式發送消息,利用Pull方式拉取消息。

發送消息

如何向已經存在的Topic中發送消息呢,當然我們可以API的方式編寫代碼發送消息。同時,還可以利用命令方式來便捷的發送消息,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic
>Kafka test msg 
>Kafka connector復制代碼

上面我們發送了兩條消息Kafka test msgKafka connectorflink-topic Topic中。

讀取消息

如果讀取指定Topic的消息呢?同樣可以API和命令兩種方式都可以完成,我們以命令方式讀取flink-topic的消息,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-topic --from-beginning
Kafka test msg
Kafka connector復制代碼

其中--from-beginning 描述了我們從Topic開始位置讀取消息。

Flink Kafka Connector

前面我們以最簡單的方式安裝了Kafka環境,那么我們以上面的環境介紹Flink Kafka Connector的使用。Flink Connector相關的基礎知識會在《Apache Flink 漫談系列(14) - Connectors》中介紹,這里我們直接介紹與Kafka Connector相關的內容。

Apache Flink 中提供了多個版本的Kafka Connector,本篇以flink-1.7.0版本為例進行介紹。

mvn 依賴

要使用Kakfa Connector需要在我們的pom中增加對Kafka Connector的依賴,如下:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.7.0</version>
</dependency>復制代碼

Flink Kafka Consumer需要知道如何將Kafka中的二進制數據轉換為Java / Scala對象。 DeserializationSchema允許用戶指定這樣的模式。 為每個Kafka消息調用 T deserialize(byte [] message)方法,從Kafka傳遞值。

Examples

我們示例讀取Kafka的數據,再將數據做簡單處理之后寫入到Kafka中。我們需要再創建一個用于寫入的Topic,如下:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic-output復制代碼

所以示例中我們Source利用flink-topic, Sink用slink-topic-output

Simple ETL

我們假設Kafka中存儲的就是一個簡單的字符串,所以我們需要一個用于對字符串進行serializedeserialize的實現,也就是我們要定義一個實現DeserializationSchemaSerializationSchema 的序列化和反序列化的類。因為我們示例中是字符串,所以我們自定義一個KafkaMsgSchema實現類,然后在編寫Flink主程序。

  • KafkaMsgSchema - 完整代碼

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Preconditions;import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;public class KafkaMsgSchema implements DeserializationSchema<String>, SerializationSchema<String> {private static final long serialVersionUID = 1L;private transient Charset charset;public KafkaMsgSchema() {// 默認UTF-8編碼this(Charset.forName("UTF-8"));}public KafkaMsgSchema(Charset charset) {this.charset = Preconditions.checkNotNull(charset);}public Charset getCharset() {return this.charset;}public String deserialize(byte[] message) {// 將Kafka的消息反序列化為java對象return new String(message, charset);}public boolean isEndOfStream(String nextElement) {// 流永遠不結束return false;}public byte[] serialize(String element) {// 將java對象序列化為Kafka的消息return element.getBytes(this.charset);}public TypeInformation<String> getProducedType() {// 定義產生的數據Typeinforeturn BasicTypeInfo.STRING_TYPE_INFO;}private void writeObject(ObjectOutputStream out) throws IOException {out.defaultWriteObject();out.writeUTF(this.charset.name());}private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {in.defaultReadObject();String charsetName = in.readUTF();this.charset = Charset.forName(charsetName);}
}
復制代碼
  • 主程序 - 完整代碼

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;import java.util.Properties;public class KafkaExample {public static void main(String[] args) throws Exception {// 用戶參數獲取final ParameterTool parameterTool = ParameterTool.fromArgs(args);// Stream 環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// Source的topicString sourceTopic = "flink-topic";// Sink的topicString sinkTopic = "flink-topic-output";// broker 地址String broker = "localhost:9092";// 屬性參數 - 實際投產可以在命令行傳入Properties p = parameterTool.getProperties();p.putAll(parameterTool.getProperties());p.put("bootstrap.servers", broker);env.getConfig().setGlobalJobParameters(parameterTool);// 創建消費者FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(sourceTopic,new KafkaMsgSchema(),p);// 設置讀取最早的數據
//        consumer.setStartFromEarliest();// 讀取Kafka消息DataStream<String> input = env.addSource(consumer);// 數據處理DataStream<String> result = input.map(new MapFunction<String, String>() {public String map(String s) throws Exception {String msg = "Flink study ".concat(s);System.out.println(msg);return msg;}});// 創建生產者FlinkKafkaProducer producer = new FlinkKafkaProducer<String>(sinkTopic,new KeyedSerializationSchemaWrapper<String>(new KafkaMsgSchema()),p,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);// 將數據寫入Kafka指定Topic中result.addSink(producer);// 執行jobenv.execute("Kafka Example");}
}
復制代碼

運行主程序如下:


我測試操作的過程如下:

  1. 啟動flink-topicflink-topic-output的消費拉取;

  2. 通過命令向flink-topic中添加測試消息only for test;

  3. 通過命令打印驗證添加的測試消息 only for test;

  4. 最簡單的FlinkJob source->map->sink 對測試消息進行map處理:"Flink study ".concat(s);

  5. 通過命令打印sink的數據;

#### 內置Schemas
Apache Flink 內部提供了如下3種內置的常用消息格式的Schemas:

  • TypeInformationSerializationSchema (and TypeInformationKeyValueSerializationSchema) 它基于Flink的TypeInformation創建模式。 如果數據由Flink寫入和讀取,這將非常有用。

  • JsonDeserializationSchema (and JSONKeyValueDeserializationSchema) 它將序列化的JSON轉換為ObjectNode對象,可以使用objectNode.get(“field”)作為(Int / String / ...)()從中訪問字段。 KeyValue objectNode包含“key”和“value”字段,其中包含所有字段以及可選的"metadata"字段,該字段公開此消息的偏移量/分區/主題。

  • AvroDeserializationSchema 它使用靜態提供的模式讀取使用Avro格式序列化的數據。 它可以從Avro生成的類(AvroDeserializationSchema.forSpecific(...))推斷出模式,或者它可以與GenericRecords一起使用手動提供的模式(使用AvroDeserializationSchema.forGeneric(...))

要使用內置的Schemas需要添加如下依賴:

 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro</artifactId><version>1.7.0</version>
</dependency>復制代碼

讀取位置配置

我們在消費Kafka數據時候,可能需要指定消費的位置,Apache Flink 的FlinkKafkaConsumer提供很多便利的位置設置,如下:

  • consumer.setStartFromEarliest() - 從最早的記錄開始;

  • consumer.setStartFromLatest() - 從最新記錄開始;

  • consumer.setStartFromTimestamp(...); // 從指定的epoch時間戳(毫秒)開始;

  • consumer.setStartFromGroupOffsets(); // 默認行為,從上次消費的偏移量進行繼續消費。

上面的位置指定可以精確到每個分區,比如如下代碼:

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L); // 第一個分區從23L開始
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);// 第二個分區從31L開始
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);// 第三個分區從43L開始consumer.setStartFromSpecificOffsets(specificStartOffsets);復制代碼

對于沒有指定的分區還是默認的setStartFromGroupOffsets方式。

Topic發現

Kafka支持Topic自動發現,也就是用正則的方式創建FlinkKafkaConsumer,比如:

// 創建消費者
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(            java.util.regex.Pattern.compile(sourceTopic.concat("-[0-9]")),
new KafkaMsgSchema(),
p);復制代碼

在上面的示例中,當作業開始運行時,消費者將訂閱名稱與指定正則表達式匹配的所有Topic(以sourceTopic的值開頭并以單個數字結尾)。

定義Watermark(Window)

對Kafka Connector的應用不僅限于上面的簡單數據提取,我們更多時候是期望對Kafka數據進行Event-time的窗口操作,那么就需要在Flink Kafka Source中定義Watermark。

要定義Event-time,首先是Kafka數據里面攜帶時間屬性,假設我們數據是String#Long的格式,如only for test#1000。那么我們將Long作為時間列。

  • KafkaWithTsMsgSchema - 完整代碼
    要想解析上面的Kafka的數據格式,我們需要開發一個自定義的Schema,比如叫KafkaWithTsMsgSchema,將String#Long解析為一個Java的Tuple2<String, Long>,完整代碼如下:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.util.Preconditions;import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;public class KafkaWithTsMsgSchema implements DeserializationSchema<Tuple2<String, Long>>, SerializationSchema<Tuple2<String, Long>> {private static final long serialVersionUID = 1L;private transient Charset charset;public KafkaWithTsMsgSchema() {this(Charset.forName("UTF-8"));}public KafkaWithTsMsgSchema(Charset charset) {this.charset = Preconditions.checkNotNull(charset);}public Charset getCharset() {return this.charset;}public Tuple2<String, Long> deserialize(byte[] message) {String msg = new String(message, charset);String[] dataAndTs = msg.split("#");if(dataAndTs.length == 2){return new Tuple2<String, Long>(dataAndTs[0], Long.parseLong(dataAndTs[1].trim()));}else{// 實際生產上需要拋出runtime異常System.out.println("Fail due to invalid msg format.. ["+msg+"]");return new Tuple2<String, Long>(msg, 0L);}}@Overridepublic boolean isEndOfStream(Tuple2<String, Long> stringLongTuple2) {return false;}public byte[] serialize(Tuple2<String, Long> element) {return "MAX - ".concat(element.f0).concat("#").concat(String.valueOf(element.f1)).getBytes(this.charset);}private void writeObject(ObjectOutputStream out) throws IOException {out.defaultWriteObject();out.writeUTF(this.charset.name());}private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {in.defaultReadObject();String charsetName = in.readUTF();this.charset = Charset.forName(charsetName);}@Overridepublic TypeInformation<Tuple2<String, Long>> getProducedType() {return new TupleTypeInfo<Tuple2<String, Long>>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);}
}
復制代碼
  • Watermark生成

提取時間戳和創建Watermark,需要實現一個自定義的時間提取和Watermark生成器。在Apache Flink 內部有2種方式如下:

  • AssignerWithPunctuatedWatermarks - 每條記錄都產生Watermark。

  • AssignerWithPeriodicWatermarks - 周期性的生成Watermark。

    我們以AssignerWithPunctuatedWatermarks為例寫一個自定義的時間提取和Watermark生成器。代碼如下:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;import javax.annotation.Nullable;public class KafkaAssignerWithPunctuatedWatermarksimplements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> {@Nullable@Overridepublic Watermark checkAndGetNextWatermark(Tuple2<String, Long> o, long l) {// 利用提取的時間戳創建Watermarkreturn new Watermark(l);}@Overridepublic long extractTimestamp(Tuple2<String, Long> o, long l) {// 提取時間戳return o.f1;}
}復制代碼
  • 主程序 - 完整程序
    我們計算一個大小為1秒的Tumble窗口,計算窗口內最大的值。完整的程序如下:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;import java.util.Properties;public class KafkaWithEventTimeExample {public static void main(String[] args) throws Exception {// 用戶參數獲取final ParameterTool parameterTool = ParameterTool.fromArgs(args);// Stream 環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 設置 Event-timeenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// Source的topicString sourceTopic = "flink-topic";// Sink的topicString sinkTopic = "flink-topic-output";// broker 地址String broker = "localhost:9092";// 屬性參數 - 實際投產可以在命令行傳入Properties p = parameterTool.getProperties();p.putAll(parameterTool.getProperties());p.put("bootstrap.servers", broker);env.getConfig().setGlobalJobParameters(parameterTool);// 創建消費者FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<Tuple2<String, Long>>(sourceTopic,new KafkaWithTsMsgSchema(),p);// 讀取Kafka消息TypeInformation<Tuple2<String, Long>> typeInformation = new TupleTypeInfo<Tuple2<String, Long>>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);DataStream<Tuple2<String, Long>> input = env.addSource(consumer).returns(typeInformation)// 提取時間戳,并生產Watermark.assignTimestampsAndWatermarks(new KafkaAssignerWithPunctuatedWatermarks());// 數據處理DataStream<Tuple2<String, Long>> result = input.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).max(0);// 創建生產者FlinkKafkaProducer producer = new FlinkKafkaProducer<Tuple2<String, Long>>(sinkTopic,new KeyedSerializationSchemaWrapper<Tuple2<String, Long>>(new KafkaWithTsMsgSchema()),p,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);// 將數據寫入Kafka指定Topic中result.addSink(producer);// 執行jobenv.execute("Kafka With Event-time Example");}
}復制代碼

測試運行如下

簡單解釋一下,我們輸入數如下:

MsgWatermark
E#10000001000000
A#30000003000000
B#50000005000000
C#50001005000100
E#50001205000120
A#70000007000000

我們看的5000000~7000000之間的數據,其中B#5000000, C#5000100E#5000120是同一個窗口的內容。計算MAX值,按字符串比較,最大的消息就是輸出的E#5000120

Kafka攜帶Timestamps

在Kafka-0.10+ 消息可以攜帶timestamps,也就是說不用單獨的在msg中顯示添加一個數據列作為timestamps。只有在寫入和讀取都用Flink時候簡單一些。一般情況用上面的示例方式已經足夠了。

小結

本篇重點是向大家介紹Kafka如何在Flink中進行應用,開篇介紹了Kafka的簡單安裝和收發消息的命令演示,然后以一個簡單的數據提取和一個Event-time的窗口示例讓大家直觀的感受如何在Apache Flink中使用Kafka。


你可能感興趣的文章

  • Flink入門

  • Flink DataSet&DataSteam API

  • Flink集群部署

  • Flink重啟策略

  • Flink分布式緩存

  • Flink重啟策略

  • Flink中的Time

  • Flink中的窗口

  • Flink的時間戳和水印

  • Flink廣播變量

  • Flink-Kafka-connetor

  • Flink-Table&SQL

  • Flink實戰項目-熱銷排行

  • Flink-Redis-Sink

  • Flink消費Kafka寫入Mysql

后面會繼續更新更多實戰案例...


本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/536644.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/536644.shtml
英文地址,請注明出處:http://en.pswp.cn/news/536644.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Java使用繼承的語法是,Java基礎語法八 繼承

1、超類和子類超類和子類父類與子類多態&#xff1a;一個對象變量可以指示多種實際類型的現象稱為多態一個變量可以引用父類對象&#xff0c;也可以引用其子類對象&#xff0c;這就是多態。不能將一個超類的引用賦給子類變量&#xff0c;因為調用子類方法時可能發生運行錯誤子類…

kaka 1.0.0 重磅發布,服務于后端的事件領域模型框架。

百度智能云 云生態狂歡季 熱門云產品1折起>>> kaka 1.0.0正式發布了&#xff0c;從三個月前的kaka-notice-lib 1.0.0的發布&#xff0c;經過多次研磨&#xff0c;終于迎來了本次重大更新。 kaka是一款服務于java后端的事件領域模型框架&#xff0c;主要目的為解耦業…

java配置文件工具類,java項目加載配置文件的工具類

java項目加載配置文件的工具類package com.loadproperties;import java.io.IOException;import java.io.InputStream;import java.util.Properties;public class ConfigUtil {private static InputStream input;private volatile Properties configuration new Properties();/…

如何把WAV格式音頻轉換為MP3格式

WAV為微軟公司&#xff08;Microsoft)開發的一種聲音文件格式&#xff0c;它符合RIFF(Resource Interchange File Format)文件規范&#xff0c;用于保存Windows平臺的音頻信息資源&#xff0c;被Windows平臺及其應用程序所廣泛支持&#xff0c;因此在聲音文件質量和CD相差無幾&…

php 異步處理類,php異步處理類

該類可以請求HTTP和HTTPS協議&#xff0c;還可以處理301、302重定向以及GZIP壓縮等。[PHP]代碼//使用方法require(asynHandle.class.php);$obj new asynHandle();$result $obj->Request(http://baidu.com);$result2 $obj->Get(https://mail.google.com/);echo "{…

惡意軟件盯上了加密貨幣,兩家以色列公司受到攻擊

近日&#xff0c;網絡安全公司Palo Alto Networks威脅研究部門Unit 42發博稱&#xff0c;已確認Cardinal RAT自2017年4月起對兩家從事外匯和加密交易軟件開發的以色列金融科技公司發起過攻擊。 Cardinal RAT是可遠程訪問特洛伊木馬&#xff08;RAT&#xff09;&#xff0c;攻擊…

php 自定義打印模板下載,PHP – 創建自定義模板系統?

我已經在這里搜索過,令人驚訝的是我找不到答案.我發現了一個類似的線程,但沒有真正的解決方案.復雜的部分是循環,如果我不需要循環我可以只是做一個常規替換.所以,我有一個帶有一些標記的.html文件,如下所示&#xff1a;{{startloop}}{{imgname}}{{endLoop}}我想要做的是用其他…

騰訊財報中“最大秘密”:2018云收入91億元,交首份TO B答卷

騰訊財報中“最大秘密”云業務收入又一次被公開了&#xff1a;2018年&#xff0c;騰訊云收入91億元&#xff0c;增長100%。 3月21日&#xff0c;騰訊發布2018年Q4及全年財報&#xff0c;2018全年收入3126.94億元同比增長32%&#xff0c;凈利潤(Non-GAAP)774.69億元。而被列進“…

根據坐標如何在matlab中l連成曲線,matlab中,如何將兩條曲線畫在一個坐標系里,plot(x1,x2,y1,y2)還是怎樣...

matlab中&#xff0c;如何將兩條曲線畫在一個坐標系里&#xff0c;plot(x1,x2,y1,y2)還是怎樣以下文字資料是由(歷史新知網www.lishixinzhi.com)小編為大家搜集整理后發布的內容&#xff0c;讓我們趕快一起來看一下吧&#xff01;matlab中&#xff0c;如何將兩條曲線畫在一個坐…

Android 物聯網 傳感器

前幾天做了一個嵌入式課設。將傳感器收集到的數據傳到手機制作的APP里。 項目中涉及到的主要的java代碼和xml布局文件上傳到了github&#xff0c;https://github.com/123JACK123jack/Android轉載于:https://www.cnblogs.com/libin123/p/10578601.html

java已被弱化簽名,高效Java第四十條建議:謹慎設計方法簽名

作用有助于設計易于學習和使用的API。如何做——謹慎地選擇方法的名稱1.選擇易于理解的&#xff0c;并且與同一個包中的其他名稱風格一致的名稱。2.選擇與大眾認可的名稱相一致的名稱。如何做——不要過于追求提供便利的方法每個方法都應該盡其所能。方法太多會使類難以學習、使…

curl有php內存緩存,PHP CURL內存泄露的解決方法

PHP CURL內存泄露的解決方法curl配置平淡無奇&#xff0c;長時間運行發現一個嚴重問題&#xff0c;內存泄露&#xff01;不論用單線程和多線程都無法避免&#xff01;是curl訪問https站點的時候有bug&#xff01;內存泄露可以通過linux的top命令發現&#xff0c;使用php函數mem…

MySQL學習【第五篇SQL語句上】

一.mysql命令 1.連接服務端命令 1.mysql -uroot -p123 -h127.0.0.1 2.mysql -uroot -p123 -S /tmp/mysql.sock 3.mysql -uroot -p123 -hlocalhost 4.mysql -uroot -p123 2.mysql登陸后的一些命令 1.\h或者help   查看幫助 2.\G       格式化查看數據&#xff08;以k…

phpexcel.php linux,phpexcel在linux系統報錯如何解決

最近有個tp3.2的項目遷移到linux系統上了&#xff0c;突然有天發現原本在win server 2008上運行沒問題的excel導出功能在新的系統上不能使用了。報錯如下&#xff1a;說是1762行有問題&#xff0c;找到這個文件的代碼看看&#xff1a;/*** Get an instance of this class** acc…

優雅的redux異步中間件 redux-effect

不吹不黑&#xff0c;redux蠻好用。只是有時略顯繁瑣&#xff0c;叫我定義每一個action、action type、使用時還要在組件上綁定一遍&#xff0c;臣妾做不到呀&#xff01;下面分享一種個人比較傾向的極簡寫法&#xff0c;仍有待完善&#xff0c;望討論。 github: github.com/li…

oracle 中累加函數,Oracle 分析函數分組累加!

用戶號碼 登陸時間13000000002010-01-0113000000012010-01-0113000000022010-01-0213000000012010-01-0213000000032010-01-0313000000022010-01-0313000000042010-01-0413000000032010-01-0413000000042010-01-0213000000062011-01-0413000000012011-01-04剔除重復登陸的用戶,…

asp.net core系列 48 Identity 身份模型自定義

一.概述 ASP.NET Core Identity提供了一個框架&#xff0c;用于管理和存儲在 ASP.NET Core 應用中的用戶帳戶。 Identity添加到項目時單個用戶帳戶選擇作為身份驗證機制。 默認情況下&#xff0c;Identity可以使用的 Entity Framework (EF) Core 數據模型。 本文介紹如何自定義…

oracle中創建游標,oracle 存儲過程創建游標

Oracle與Sql Server差異點詳解1、create函數或存儲過程異同點Oracle 創建函數或存儲過程一般是 create or replace ……SQL SERVER 則是在創建之前加一條語句&#xff0c;先判斷是否已經存在&#xff0c;如果存在刪除已有的函數或存儲過程。函數語句&#xff1a;if exists (sel…

hosts文件不起作用

突然發現電腦的hosts文件不起作用了。之前用的狠正常&#xff0c;近期也沒有修改過。首先排除什么格式、DNS、注冊表之類的問題。最終解決辦法&#xff08;權限問題&#xff1a;有問題的hosts文件圖標上有個鎖&#xff09;&#xff1a;1.C:\Windows\System32\drivers\etc下復制…

oracle面臨的挑戰,未來數據庫管理員面臨的三大挑戰

原標題&#xff1a;未來數據庫管理員面臨的三大挑戰前言今天的數據庫管理員面臨著三大挑戰&#xff1a;工作重心向以應用程序為中心轉移、支持多個數據庫平臺的需求、在云端以及在本地管理數據庫性能的責任不斷擴大。為了在今天和未來都能站穩腳跟&#xff0c;數據庫管理員需要…