簡單封裝kafka相關的api

一、針對于kafka版本

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.2</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.2</version>
</dependency>

二、操作topic、consumer相關方法

import kafka.admin.AdminUtils;
import kafka.admin.TopicCommand;
import kafka.api.TopicMetadata;
import kafka.tools.ConsumerOffsetChecker;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.tools.ant.taskdefs.Execute;public static class KafkaUtils {private static Logger LOGGER = LoggerFactory.getLogger(KafkaUtils.class);private static AutoZkClient zkClient;public static AutoZkClient getZkClient() {return zkClient;}public static void setZkClient(AutoZkClient zkClient) {KafkaUtils.zkClient = zkClient;}public static boolean topicExists(String topic) {Assert.notNull(zkClient, "zkclient is null");return AdminUtils.topicExists(zkClient, topic);}public static void topicChangeConfig(String topic, Properties properties) {Assert.notNull(zkClient, "zkclient is null");AdminUtils.changeTopicConfig(zkClient, topic, properties);}public static void topicAlterPartitions(String topic, int partitions) {Assert.notNull(zkClient, "zkclient is null");TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient);int curPartitions = topicMetadata.partitionsMetadata().size();if (curPartitions == partitions) {return;}if (curPartitions > partitions) {LOGGER.info(String.format("curPartitions=%d,不能修改partitions=%d,請確保大與當前分區數", curPartitions, partitions));return;}String[] args = {"--zookeeper", zkClient.zkServers,"--partitions", String.valueOf(partitions),"--alter","--topic", topic};TopicCommand.TopicCommandOptions alterOpt = new TopicCommand.TopicCommandOptions(args);alterOpt.checkArgs();TopicCommand.alterTopic(zkClient, alterOpt);}public static void topicDescribe(String topic) {Assert.notNull(zkClient, "zkclient is null");String[] args = {"--zookeeper", zkClient.zkServers,"--describe","--topic", topic};TopicCommand.TopicCommandOptions describeOpt = new TopicCommand.TopicCommandOptions(args);describeOpt.checkArgs();TopicCommand.describeTopic(zkClient, describeOpt);}public static void topicOverrideConfig(String topic, Properties properties) {Assert.notNull(zkClient, "zkclient is null");Properties oldProperties = KafkaUtils.topicConfig(topic);oldProperties.putAll(properties);AdminUtils.changeTopicConfig(zkClient, topic, oldProperties);}public 
static void topicCreate(TopicConfig topicConfig) {Assert.notNull(zkClient, "zkclient is null");int brokerSize = ZkUtils.getSortedBrokerList(zkClient).size();if (topicConfig.getReplicationFactor() > brokerSize) {topicConfig.setReplicationFactor(brokerSize);LOGGER.info(String.format("broker-size=%d < replication-factor=%d, 所以設置replication-factor大小為broker-size大小", brokerSize, topicConfig.getReplicationFactor()));}AdminUtils.createTopic(zkClient, topicConfig.getName(), topicConfig.getPartitions(), topicConfig.getReplicationFactor(), topicConfig.getProperties());}public static void topicDelete(String topic) {Assert.notNull(zkClient, "zkclient is null");AdminUtils.deleteTopic(zkClient, topic);}public static List<String> topicsList() {Assert.notNull(zkClient, "zkclient is null");return seqAsJavaList(ZkUtils.getAllTopics(zkClient));}public static Properties topicConfig(String topic) {Assert.notNull(zkClient, "zkclient is null");return AdminUtils.fetchTopicConfig(zkClient, topic);}public static Map<String, Properties> topicsConfig() {Assert.notNull(zkClient, "zkclient is null");return mapAsJavaMap(AdminUtils.fetchAllTopicConfigs(zkClient));}public static void consumerDetail(String topic, String group){String[] args = {"--zookeeper", zkClient.getZkServers(),"--group", group,"--topic", topic};ConsumerOffsetChecker.main(args);}public static Map<String, List<String>> getConsumersPerTopic(String group) {return mapAsJavaMap(ZkUtils.getConsumersPerTopic(zkClient, group, false)).entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(), entry -> JavaConversions.seqAsJavaList(entry.getValue()).stream().map(consumerThreadId -> consumerThreadId.consumer()).collect(Collectors.toList())));}public static List<String> getConsumersInGroup(String group){return JavaConversions.seqAsJavaList(ZkUtils.getConsumersInGroup(zkClient, group));}public static String executeCommond(String commond) {LOGGER.info("begin to execute commond: " + commond);File tmpFileDir = 
Files.createTempDir();String tmpFileName = UUID.randomUUID().toString() + ".txt";String fileSavePath = tmpFileDir.getAbsolutePath() + tmpFileName;CommandLine oCmdLine = CommandLine.parse(commond + " > " + fileSavePath);DefaultExecutor executor = new DefaultExecutor();ExecuteWatchdog watchdog = new ExecuteWatchdog(20000);executor.setWatchdog(watchdog);int[] exitValues = {0, 1};executor.setExitValues(exitValues);try {if (Execute.isFailure(executor.execute(oCmdLine))) {watchdog.killedProcess();LOGGER.error("遠程命令執行失敗... commond=" + commond);} else {try (Stream<String> lines = java.nio.file.Files.lines(new File(fileSavePath).toPath(), Charset.defaultCharset())) {List<String> fileLines = lines.collect(toCollection(LinkedList::new));StringBuilder result = new StringBuilder();fileLines.forEach(line -> result.append(line).append(System.lineSeparator()));return result.toString();} finally {FileUtils.deleteQuietly(tmpFileDir);}}} catch (Exception e) {LOGGER.error("execute command error happened... commond=" + commond, e);}return StringUtils.EMPTY;}
}

三、控制層展示

import com.alibaba.fastjson.JSON;
import com.cmos.common.annotation.CompatibleOutput;
import com.cmos.core.logger.Logger;
import com.cmos.core.logger.LoggerFactory;
import com.cmos.wmhopenapi.web.config.KafkaMessageConfig;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import scala.Console;import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.List;import static com.cmos.wmhopenapi.web.config.KafkaMessageConfig.KafkaUtils;/*** @author hujunzheng* @create 2018-07-16 10:20**/
@RestController
@RequestMapping("/message/state")
@CompatibleOutput
public class MessageCenterStateController {private static Logger LOGGER = LoggerFactory.getLogger(MessageCenterStateController.class);@Autowiredprivate KafkaMessageConfig.NoAckConsumer noAckConsumer;@Autowiredprivate KafkaMessageConfig.AckConsumer ackConsumer;/*** 獲取topic描述** @param topic**/@GetMapping("/topic-describe")public String topicDescribe(@RequestParam String topic) {try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {//scala 輸出流重定向Console.setOut(new PrintStream(bos));KafkaUtils.topicDescribe(topic);String result = bos.toString();LOGGER.info(result);return String.format("%s%s%s", "<pre>", result, "</pre>");} catch (Exception e) {LOGGER.error("獲取topic描述異常", e);}return StringUtils.EMPTY;}/*** 獲取全部topic**/@GetMapping(value = "/topics-all", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)public String topicAll() {String result = JSON.toJSONString(KafkaUtils.topicsList());LOGGER.info(result);return result;}/*** 獲取topic配置** @param topic**/@GetMapping(value = "/topic-config", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)public String topicConfig(@RequestParam String topic) {String result = JSON.toJSONString(KafkaUtils.topicConfig(topic));LOGGER.info(result);return result;}/*** 獲取所有topic的配置**/@GetMapping(value = "/topics-configs", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)public String topicsConfigs() {String result = JSON.toJSONString(KafkaUtils.topicsConfig());LOGGER.info(result);return result;}/*** 展示在某個分組中的consumer** @param group**/@GetMapping(value = "/consumers-in-group", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)public String consumersInGroup(@RequestParam String group) {String result = JSON.toJSONString(KafkaUtils.getConsumersInGroup(group));LOGGER.info(result);return result;}/*** 展示在某個分組中的consumer,按照topic劃分** @param group**/@GetMapping(value = "/consumers-per-topic", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)public String consumersPerTopic(@RequestParam String group) {String result = 
JSON.toJSONString(KafkaUtils.getConsumersPerTopic(group));LOGGER.info(result);return result;}/*** 展示消費者消費詳情** @param topic* @param group**/@GetMapping("/consumer-detail")public String consumerDetail(@RequestParam String topic, @RequestParam String group) {try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {//scala 輸出流重定向Console.setOut(new PrintStream(bos));KafkaUtils.consumerDetail(topic, group);String result = bos.toString();LOGGER.info(result);return String.format("%s%s%s", "<pre>", result, "</pre>");} catch (Exception e) {LOGGER.error("獲取消費詳情", e);}return StringUtils.EMPTY;}/*** 消費消息并展示消息** @param topic* @param group* @param size 消費消息數量* @param ack 消費的消息是否需要進行ack操作**/@GetMapping("/consumer-consume")public String consumerConsume(@RequestParam String topic, @RequestParam(required = false, defaultValue = "default") String group, @RequestParam(required = false, defaultValue = "1") int size, @RequestParam(required = false, defaultValue = "false") boolean ack) {List<String> messages;if (ack) {messages = ackConsumer.consume(topic, group, size);} else {messages = noAckConsumer.consume(topic, group, size);}return JSON.toJSONString(messages);}/*** 運行kafka相關命令** @param sshRemote 連接遠程主機命令(ssh user@host)* @param sshCommond kafka相關命令 (kafka-consumer.sh ...)**/@PostMapping("/commond-execute")public String commondExecute(@RequestParam(required = false) String sshRemote, @RequestParam String sshCommond) {String commond = sshCommond + StringUtils.EMPTY;if (StringUtils.isNotBlank(sshRemote)) {commond = String.format("%s \"%s\"", sshRemote, commond);}String result = KafkaUtils.executeCommond(commond);return String.format("%s%s%s", "<pre>", result, "</pre>");}
}

四、消費配置

  消費邏輯

package com.mochasoft.latte.data.kafka.consumer;import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import org.apache.commons.lang3.StringUtils;

/**
 * Mutable holder for the zookeeper-based (old high-level) consumer settings.
 * {@link #getProperties()} materializes the fields into the property keys the
 * 0.8.x consumer understands; {@link #getConsumerConfig()} wraps them in a
 * {@code ConsumerConfig}.
 */
public class KafkaConsumerConfig {

    private String zkConnect;
    private String zkSessionTimeoutMs;
    private String zkSyncTimeMs;
    private String autoCommitIntervalMs;
    // Group id falls back to "default" and is only replaced by non-blank values.
    private String groupId = "default";

    /** Where to start when no committed offset exists (maps to "auto.offset.reset"). */
    enum OffSet {
        smallest,
        largest
    }

    private OffSet offset = OffSet.largest;
    private Properties properties;

    public KafkaConsumerConfig() {
        this.properties = new Properties();
    }

    public KafkaConsumerConfig(String zkConnect, String zkSessionTimeoutMs,
                               String zkSyncTimeMs, String autoCommitIntervalMs) {
        this.zkConnect = zkConnect;
        this.zkSessionTimeoutMs = zkSessionTimeoutMs;
        this.zkSyncTimeMs = zkSyncTimeMs;
        this.autoCommitIntervalMs = autoCommitIntervalMs;
        this.properties = new Properties();
    }

    public String getZkConnect() {
        return this.zkConnect;
    }

    public void setZkConnect(String zkConnect) {
        this.zkConnect = zkConnect;
    }

    public String getZkSessionTimeoutMs() {
        return this.zkSessionTimeoutMs;
    }

    public void setZkSessionTimeoutMs(String zkSessionTimeoutMs) {
        this.zkSessionTimeoutMs = zkSessionTimeoutMs;
    }

    public String getZkSyncTimeMs() {
        return this.zkSyncTimeMs;
    }

    public void setZkSyncTimeMs(String zkSyncTimeMs) {
        this.zkSyncTimeMs = zkSyncTimeMs;
    }

    public String getAutoCommitIntervalMs() {
        return this.autoCommitIntervalMs;
    }

    public void setAutoCommitIntervalMs(String autoCommitIntervalMs) {
        this.autoCommitIntervalMs = autoCommitIntervalMs;
    }

    public String getGroupId() {
        return this.groupId;
    }

    /** Blank values are ignored so the "default" group id survives. */
    public void setGroupId(String groupId) {
        if (StringUtils.isNotBlank(groupId)) {
            this.groupId = groupId;
        }
    }

    public OffSet getOffset() {
        return this.offset;
    }

    public void setOffset(OffSet offset) {
        this.offset = offset;
    }

    /** @return a ConsumerConfig built from {@link #getProperties()}. */
    public ConsumerConfig getConsumerConfig() {
        return new ConsumerConfig(getProperties());
    }

    // Stores the value under the key only when it is non-blank.
    private void putIfNotBlank(String key, String value) {
        if (StringUtils.isNotBlank(value)) {
            this.properties.put(key, value);
        }
    }

    /**
     * Builds (and caches into the shared Properties instance) the consumer
     * property set.
     *
     * @throws IllegalArgumentException when zkConnect was never set
     */
    public Properties getProperties() {
        if (StringUtils.isBlank(this.zkConnect)) {
            throw new IllegalArgumentException("Blank zkConnect");
        }
        putIfNotBlank("zookeeper.session.timeout.ms", this.zkSessionTimeoutMs);
        putIfNotBlank("zookeeper.sync.time.ms", this.zkSyncTimeMs);
        putIfNotBlank("auto.commit.interval.ms", this.autoCommitIntervalMs);
        putIfNotBlank("auto.offset.reset", this.offset.name());
        this.properties.put("group.id", getGroupId());
        this.properties.put("zookeeper.connect", this.zkConnect);
        return this.properties;
    }
}

public
static final class NoAckConsumer extends TheConsumer {public NoAckConsumer(KafkaConsumerConfig kafkaConsumerConfig) {super(kafkaConsumerConfig, false);this.consumerConfigProperties.setProperty("auto.commit.enable", "false");} }public static final class AckConsumer extends TheConsumer {public AckConsumer(KafkaConsumerConfig kafkaConsumerConfig) {super(kafkaConsumerConfig, true);this.consumerConfigProperties.setProperty("auto.commit.enable", "true");} }public static class TheConsumer {protected Properties consumerConfigProperties;private boolean ack;private StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());private StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());public TheConsumer(KafkaConsumerConfig kafkaConsumerConfig, boolean ack) {this.ack = ack;this.consumerConfigProperties = new Properties();this.consumerConfigProperties.putAll(kafkaConsumerConfig.getProperties());}/*** @param topic 主題* @param group 分組* @param size 消費數量**/public List<String> consume(String topic, String group, int size) {if (StringUtils.isNotBlank(group)) {this.consumerConfigProperties.setProperty("group.id", group);}ConsumerConnector consumerConnector = null;try {consumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(this.consumerConfigProperties));Map<String, Integer> topics = new HashMap<>(1);topics.put(topic, 1);Map<String, List<KafkaStream<String, String>>> streams = consumerConnector.createMessageStreams(topics, keyDecoder, valueDecoder);if (!(CollectionUtils.isEmpty(streams) || CollectionUtils.isEmpty(streams.get(topic)))) {List<String> messages = new ArrayList<>();KafkaStream<String, String> messageStream = streams.get(topic).get(0);for (ConsumerIterator<String, String> it = messageStream.iterator(); it.hasNext(); ) {MessageAndMetadata<String, String> messageAndMetadata = it.next();messages.add(messageAndMetadata.message());if (this.ack) {consumerConnector.commitOffsets();}if (size <= messages.size()) {break;}}return 
messages;}} catch (Exception e) {LOGGER.error(String.format("%s ack consume has errors. topic=%s, group=%s, size=%d.", this.ack ? "" : "no", topic, group, size), e);} finally {if (consumerConnector != null) {consumerConnector.shutdown();}}return Collections.EMPTY_LIST;} }

  消費測試

public class KafkaTest extends BaseUnitTest {private static Logger LOGGER = LoggerFactory.getLogger(KafkaTest.class);@Value("${kafka.connection.zkconnect}")private String zkConnect;private static final AutoZkClient zkClient = new AutoZkClient("127.0.0.1:2181");private static final String TEST_TOPIC = "message-center-biz-expiration-reminder-topic";private static final String TEST_GROUP = "hjz-group";@Autowiredprivate NoAckConsumer noAckConsumer;@Autowiredprivate AckConsumer ackConsumer;@Autowiredprivate KafkaProducer kafkaProducer;private CountDownLatch finishCountDownLatch = new CountDownLatch(20);@Testpublic void testNoAckConsume() throws InterruptedException {class ConsumeRun implements Callable<List<String>> {private TheConsumer consumer;private CountDownLatch countDownLatch;public ConsumeRun(TheConsumer consumer, CountDownLatch countDownLatch) {this.consumer = consumer;this.countDownLatch = countDownLatch;}@Overridepublic List<String> call() {try {this.countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}List<String> messages = consumer.consume(TEST_TOPIC, TEST_GROUP, 8);finishCountDownLatch.countDown();return messages;}}ExecutorService executorService = Executors.newFixedThreadPool(20);CountDownLatch countDownLatch = new CountDownLatch(1);List<Future<List<String>>> noAckConsumeFutures = new ArrayList<>(), ackConsumeFutures = new ArrayList<>();for (int i = 0; i < 10; ++i) {ConsumeRun consumeRun = new ConsumeRun(this.noAckConsumer, countDownLatch);noAckConsumeFutures.add(executorService.submit(consumeRun));}for (int i = 0; i < 10; ++i) {ConsumeRun consumeRun = new ConsumeRun(this.ackConsumer, countDownLatch);ackConsumeFutures.add(executorService.submit(consumeRun));}countDownLatch.countDown();finishCountDownLatch.await();System.out.println("no ack consumers response....");noAckConsumeFutures.forEach(future -> {try {System.out.println(future.get());} catch (Exception e){}});System.out.println("\n\nack consumers 
response....");ackConsumeFutures.forEach(future -> {try {System.out.println(future.get());} catch (Exception e) {e.printStackTrace();}});}@Testpublic void testProduce() {for (int i = 0; i < 100; ++i) {kafkaProducer.send(TEST_TOPIC, String.valueOf(i), "message " + i);}KafkaUtils.consumerDetail(TEST_TOPIC, TEST_GROUP);}public static void createTopic() {MessageCenterConstants.TopicConfig topicConfig = new MessageCenterConstants.TopicConfig();topicConfig.setName("kafka-test");KafkaMessageConfig.KafkaUtils.topicCreate(topicConfig);}public static void testKafka() {createTopic();System.out.println(KafkaUtils.topicsList());Properties properties = new Properties();properties.put("min.cleanable.dirty.ratio", "0.3");KafkaMessageConfig.KafkaUtils.topicChangeConfig(TEST_TOPIC, properties);System.out.println(KafkaMessageConfig.KafkaUtils.topicConfig(TEST_TOPIC));KafkaUtils.topicAlterPartitions(TEST_TOPIC, 7);KafkaMessageConfig.KafkaUtils.topicDescribe(TEST_TOPIC);kafka.utils.ZkUtils.getSortedBrokerList(zkClient);}public static void testTopicDescribe() {KafkaUtils.setZkClient(zkClient);new MessageCenterStateController().topicDescribe("message-center-recharge-transaction-push-topic");}public static void testConsumerDescribe() {KafkaUtils.setZkClient(zkClient);String[] args = {"--zookeeper", zkClient.getZkServers(),"--group", "","--topic", "message-center-recharge-transaction-push-topic"};ConsumerOffsetChecker.main(args);}public static void testConsumerList() {KafkaUtils.setZkClient(zkClient);String[] args = {"--broker-list", zkClient.getZkServers(),"--topic", "message-center-recharge-transaction-push-topic","--list"};SimpleConsumerShell.main(args);}public static void main(String[] args) {testConsumerList();}
}

  測試no ack 以及 ack的消費結果

no ack consumers response....
[message 8, message 14, message 23, message 32, message 41, message 50, message 8, message 14]
[message 14, message 23, message 32, message 41, message 50, message 12, message 21, message 30]
[message 17, message 26, message 35, message 44, message 53, message 62, message 71, message 80]
[message 19, message 28, message 37, message 46, message 55, message 64, message 73, message 82]
[message 89, message 98, message 89, message 98, message 19, message 28, message 37, message 46]
[message 0, message 39, message 48, message 57, message 66, message 75, message 84, message 93]
[message 1, message 49, message 58, message 67, message 76, message 85, message 94, message 77]
[message 8, message 14, message 23, message 32, message 41, message 50, message 89, message 98]
[message 17, message 26, message 35, message 44, message 53, message 62, message 71, message 80]
[message 2, message 59, message 68, message 77, message 86, message 95, message 0, message 39]ack consumers response....
[message 7, message 13, message 22, message 31, message 40, message 5, message 11, message 20]
[message 17, message 26, message 35, message 44, message 53, message 62, message 71, message 80]
[message 77, message 86, message 95, message 67, message 76, message 85, message 94, message 0]
[message 9, message 15, message 24, message 33, message 42, message 51, message 60, message 6]
[message 4, message 10, message 79, message 88, message 97, message 2, message 59, message 68]
[message 29, message 38, message 47, message 56, message 65, message 74, message 83, message 92]
[message 16, message 25, message 34, message 43, message 52, message 61, message 70, message 8]
[message 18, message 27, message 36, message 45, message 54, message 63, message 72, message 81]
[message 3, message 69, message 78, message 87, message 96, message 1, message 49, message 58]
[message 14, message 23, message 32, message 41, message 50, message 89, message 98, message 12]

  消費測試結果分析:no ack的consumer可以實現消息的窺探。

?

五、效果圖

  獲取topic詳情

  

  獲取所有的topic信息

  

  獲取消費詳情

  

  窺探消息

  

?

轉載于:https://www.cnblogs.com/hujunzheng/p/9327927.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/531224.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/531224.shtml
英文地址,請注明出處:http://en.pswp.cn/news/531224.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

springmvc controller動態設置content-type

springmvc RequestMappingHandlerAdapter#invokeHandlerMethod 通過ServletInvocableHandlerMethod#invokeAndHandle調用目標方法，并處理返回值。如果 return value != null，則通過returnValueHandlers處理，內部會調用MessageConv…

springboot2.0 redis EnableCaching的配置和使用

一、前言 關于EnableCaching最簡單使用&#xff0c;個人感覺只需提供一個CacheManager的一個實例就好了。springboot為我們提供了cache相關的自動配置。引入cache模塊&#xff0c;如下。 二、maven依賴 <dependency><groupId>org.springframework.boot</groupId…

依賴配置中心實現注有@ConfigurationProperties的bean相關屬性刷新

配置中心是什么 配置中心&#xff0c;通過keyvalue的形式存儲環境變量。配置中心的屬性做了修改&#xff0c;項目中可以通過配置中心的依賴&#xff08;sdk&#xff09;立即感知到。需要做的就是如何在屬性發生變化時&#xff0c;改變帶有ConfigurationProperties的bean的相關屬…

java接口簽名(Signature)實現方案

預祝大家國慶節快樂&#xff0c;趕快迎接美麗而快樂的假期吧&#xff01;&#xff01;&#xff01; 前言 在為第三方系統提供接口的時候&#xff0c;肯定要考慮接口數據的安全問題&#xff0c;比如數據是否被篡改&#xff0c;數據是否已經過時&#xff0c;數據是否可以重復提交…

Git rebase命令實戰

一、前言 一句話&#xff0c;git rebase 可以幫助項目中的提交歷史干凈整潔&#xff01;&#xff01;&#xff01; 二、避免合并出現分叉現象 git merge操作 1、新建一個 develop 分支 2、在develop分支上新建兩個文件 3、然后分別執行 add、commit、push 4、接著切換到master分…

HttpServletRequestWrapper使用技巧(自定義session和緩存InputStream)

一、前言 javax.servlet.http.HttpServletRequestWrapper 是一個開發者可以繼承的類&#xff0c;我們可以重寫相應的方法來實現session的自定義以及緩存InputStream&#xff0c;在程序中可以多次獲取request body的內容。 二、自定義seesion import javax.servlet.http.*;publi…

spring注解工具類AnnotatedElementUtils和AnnotationUtils

一、前言 spring為開發人員提供了兩個搜索注解的工具類&#xff0c;分別是AnnotatedElementUtils和AnnotationUtils。在使用的時候&#xff0c;總是傻傻分不清&#xff0c;什么情況下使用哪一個。于是我做了如下的整理和總結。 二、AnnotationUtils官方解釋 功能 用于處理注解&…

windows系統nexus3安裝和配置

一、前言 為什么要在本地開發機器上安裝nexus&#xff1f;首先聲明公司內部是有自己的nexus倉庫&#xff0c;但是對上傳jar包做了限制&#xff0c;不能暢快的上傳自己測試包依賴。于是就自己在本地搭建了一個nexus私服&#xff0c;即可以使用公司nexus私服倉庫中的依賴&#xf…

Springmvc借助SimpleUrlHandlerMapping實現接口開關功能

一、接口開關功能 1、可配置化&#xff0c;依賴配置中心 2、接口訪問權限可控 3、springmvc不會掃描到&#xff0c;即不會直接的將接口暴露出去 二、接口開關使用場景 和業務沒什么關系&#xff0c;主要方便查詢系統中的一些狀態信息。比如系統的配置信息&#xff0c;中間件的狀…

log4j平穩升級到log4j2

一、前言 公司中的項目雖然已經用了很多的新技術了&#xff0c;但是日志的底層框架還是log4j&#xff0c;個人還是不喜歡用這個的。最近項目再生產環境上由于log4j引起了一場血案&#xff0c;于是決定升級到log4j2。 二、現象 雖然生產環境有多個結點分散高并發帶來的壓力&…

Springboot集成ES啟動報錯

報錯內容 None of the configured nodes are available elasticsearch.yml配置 cluster.name: ftest node.name: node-72 node.master: true node.data: true network.host: 112.122.245.212 http.port: 39200 transport.tcp.port: 39300 discovery.zen.ping.unicast.hosts: [&…

高效使用hibernate-validator校驗框架

一、前言 高效、合理的使用hibernate-validator校驗框架可以提高程序的可讀性&#xff0c;以及減少不必要的代碼邏輯。接下來會介紹一下常用一些使用方式。 二、常用注解說明 限制說明Null限制只能為nullNotNull限制必須不為nullAssertFalse限制必須為falseAssertTrue限制必須為…

kafka-manager配置和使用

kafka-manager配置 最主要配置就是用于kafka管理器狀態的zookeeper主機。這可以在conf目錄中的application.conf文件中找到。 kafka-manager.zkhosts"my.zookeeper.host.com:2181" 當然也可以聲明為zookeeper集群。 kafka-manager.zkhosts"my.zookeeper.host.co…

kafka告警簡單方案

一、前言 為什么要設計kafka告警方案&#xff1f;現成的監控項目百度一下一大堆&#xff0c;KafkaOffsetMonitor、KafkaManager、 Burrow等&#xff0c;具體參考&#xff1a;kafka的消息擠壓監控。由于本小組的項目使用的kafka集群并沒有被公司的kafka-manager管理&#xff0c;…

RedisCacheManager設置Value序列化器技巧

CacheManager基本配置 請參考博文&#xff1a;springboot2.0 redis EnableCaching的配置和使用 RedisCacheManager構造函數 /*** Construct a {link RedisCacheManager}.* * param redisOperations*/ SuppressWarnings("rawtypes") public RedisCacheManager(RedisOp…

Nginx配置以及域名轉發

工程中的nginx配置 #user nobody; worker_processes 24; error_log /home/xxx/opt/nginx/logs/error.log; pid /home/xxx/opt/nginx/run/nginx.pid;events {use epoll;worker_connections 102400; }http {include /home/xxx/opt/nginx/conf.d/mime.types;default_…

java接口簽名(Signature)實現方案續

一、前言 由于之前寫過的一片文章 &#xff08;java接口簽名(Signature)實現方案 &#xff09;收獲了很多好評&#xff0c;此次來說一下另一種簡單粗暴的簽名方案。相對于之前的簽名方案&#xff0c;對body、paramenter、path variable的獲取都做了簡化的處理。也就是說這種方式…

支付寶敏感信息解密

支付寶官方解密文檔&#xff1a;https://docs.alipay.com/mini/introduce/aes String response "小程序前端提交的";//1. 獲取驗簽和解密所需要的參數 Map<String, String> openapiResult JSON.parseObject(response,new TypeReference<Map<String, St…

HashMap 源碼閱讀

前言 之前讀過一些類的源碼&#xff0c;近來發現都忘了&#xff0c;再讀一遍整理記錄一下。這次讀的是 JDK 11 的代碼&#xff0c;貼上來的源碼會去掉大部分的注釋, 也會加上一些自己的理解。 Map 接口 這里提一下 Map 接口與1.8相比 Map接口又新增了幾個方法&#xff1a;   …

SpringMvc接口中轉設計(策略+模板方法)

一、前言 最近帶著兩個兄弟做支付寶小程序后端相關的開發&#xff0c;小程序首頁涉及到很多查詢的服務。小程序后端服務在我司屬于互聯網域&#xff0c;相關的查詢服務已經在核心域存在了&#xff0c;查詢這塊所要做的工作就是做接口中轉。參考了微信小程序的代碼&#xff0c;發…