JUC并發—9.并發安全集合四

大綱

1.并發安全的數組列表CopyOnWriteArrayList

2.并發安全的鏈表隊列ConcurrentLinkedQueue

3.并發編程中的阻塞隊列概述

4.JUC的各種阻塞隊列介紹

5.LinkedBlockingQueue的具體實現原理

6.基于兩個隊列實現的集群同步機制

4.JUC的各種阻塞隊列介紹

(1)基于數組的阻塞隊列ArrayBlockingQueue

(2)基于鏈表的阻塞隊列LinkedBlockingQueue

(3)優先級阻塞隊列PriorityBlockingQueue

(4)延遲阻塞隊列DelayQueue

(5)無存儲結構的阻塞隊列SynchronousQueue

(6)阻塞隊列結合體LinkedTransferQueue

(7)雙向阻塞隊列LinkedBlockingDeque

(1)基于數組的阻塞隊列ArrayBlockingQueue

ArrayBlockingQueue是一個基于數組實現的阻塞隊列。其構造方法可以指定:數組的長度、公平還是非公平、數組的初始集合。

ArrayBlockingQueue會通過ReentrantLock來解決線程競爭的問題,以及采用Condition來解決線程的喚醒與阻塞的問題。

//A bounded BlockingQueue backed by an array.  
//This queue orders elements FIFO (first-in-first-out).  
//The head of the queue is that element that has been on the queue the longest time.  
//The tail of the queue is that element that has been on the queue the shortest time. 
//New elements are inserted at the tail of the queue, 
//and the queue retrieval operations obtain elements at the head of the queue.
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //The queued itemsfinal Object[] items;//items index for next take, poll, peek or removeint takeIndex;//items index for next put, offer, or addint putIndex;//Number of elements in the queueint count;//Main lock guarding all accessfinal ReentrantLock lock;//Condition for waiting takesprivate final Condition notEmpty;//Condition for waiting putsprivate final Condition notFull;//Creates an ArrayBlockingQueue with the given (fixed) capacity and default access policy.public ArrayBlockingQueue(int capacity) {this(capacity, false);}//Creates an ArrayBlockingQueue with the given (fixed) capacity and the specified access policy.public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0) {throw new IllegalArgumentException();}this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull =  lock.newCondition();}//Inserts the specified element at the tail of this queue, //waiting for space to become available if the queue is full.public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length) {notFull.await();}enqueue(e);} finally {lock.unlock();}}//Inserts element at current put position, advances, and signals.//Call only when holding lock.private void enqueue(E x) {final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length) {putIndex = 0;}count++;notEmpty.signal();}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0) {notEmpty.await();}return dequeue();} finally {lock.unlock();}}//Returns the number of elements in this queue.public int size() {final ReentrantLock lock = this.lock;lock.lock();try {return count;} finally {lock.unlock();}}...
}

(2)基于鏈表的阻塞隊列LinkedBlockingQueue

LinkedBlockingQueue是一個基于鏈表實現的阻塞隊列,它可以不指定阻塞隊列的長度,它的默認長度是Integer.MAX_VALUE。由于這個默認長度非常大,一般也稱LinkedBlockingQueue為無界隊列。

//An optionally-bounded BlockingQueue based on linked nodes.
//This queue orders elements FIFO (first-in-first-out).
//The head of the queue is that element that has been on the queue the longest time.
//The tail of the queue is that element that has been on the queue the shortest time. 
//New elements are inserted at the tail of the queue, 
//and the queue retrieval operations obtain elements at the head of the queue.
//Linked queues typically have higher throughput than array-based queues 
//but less predictable performance in most concurrent applications.
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {...//The capacity bound, or Integer.MAX_VALUE if noneprivate final int capacity;//Current number of elementsprivate final AtomicInteger count = new AtomicInteger();//Head of linked list.transient Node<E> head;//Tail of linked list.private transient Node<E> last;//Lock held by take, poll, etcprivate final ReentrantLock takeLock = new ReentrantLock();//Lock held by put, offer, etcprivate final ReentrantLock putLock = new ReentrantLock();//Wait queue for waiting takesprivate final Condition notEmpty = takeLock.newCondition();//Wait queue for waiting putsprivate final Condition notFull = putLock.newCondition();//Creates a LinkedBlockingQueue with a capacity of Integer#MAX_VALUE.public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}//Creates a LinkedBlockingQueue with the given (fixed) capacity.public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}//Inserts the specified element at the tail of this queue, //waiting if necessary for space to become available.public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == capacity) {notFull.await();}enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity) {notFull.signal();}} finally {putLock.unlock();}if (c == 0) {signalNotEmpty();}}//Links node at end of queue.private void enqueue(Node<E> node) {last = last.next = node;}//Signals a waiting take. Called only from put/offer (which do not otherwise ordinarily lock takeLock.)private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {notEmpty.signal();} finally {takeLock.unlock();}}public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1) {notEmpty.signal();}} finally {takeLock.unlock();}if (c == capacity) {signalNotFull();}return x;}private E dequeue() {Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null;return x;}public int size() {return count.get();}...
}

(3)優先級阻塞隊列PriorityBlockingQueue

PriorityBlockingQueue是一個支持自定義元素優先級的無界阻塞隊列。默認情況下添加的元素采用自然順序升序排列,當然可以通過實現元素的compareTo()方法自定義優先級規則。

PriorityBlockingQueue是基于數組實現的,這個數組會自動進行動態擴容。在應用方面,消息中間件可以基于優先級阻塞隊列來實現。

//An unbounded BlockingQueue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations.  
//While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causing OutOfMemoryError). 
//This class does not permit null elements.
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //Priority queue represented as a balanced binary heap: the two children of queue[n] are queue[2*n+1] and queue[2*(n+1)].//The priority queue is ordered by comparator, or by the elements' natural ordering, //if comparator is null: For each node n in the heap and each descendant d of n, n <= d.//The element with the lowest value is in queue[0], assuming the queue is nonempty.private transient Object[] queue;//The number of elements in the priority queue.private transient int size;//The comparator, or null if priority queue uses elements' natural ordering.private transient Comparator<? super E> comparator;//Lock used for all public operationsprivate final ReentrantLock lock;//Condition for blocking when emptyprivate final Condition notEmpty;//Spinlock for allocation, acquired via CAS.private transient volatile int allocationSpinLock;//Creates a PriorityBlockingQueue with the default initial capacity (11) that //orders its elements according to their Comparable natural ordering.public PriorityBlockingQueue() {this(DEFAULT_INITIAL_CAPACITY, null);}//Creates a PriorityBlockingQueue with the specified initial capacity that //orders its elements according to their Comparable natural ordering.public PriorityBlockingQueue(int initialCapacity) {this(initialCapacity, null);}//Creates a PriorityBlockingQueue with the specified initial capacity that orders its elements according to the specified comparator.public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {if (initialCapacity < 1) {throw new IllegalArgumentException();}this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();this.comparator = comparator;this.queue = new Object[initialCapacity];}//Inserts the specified element into this priority queue.//As the queue is unbounded, this method will never block.public void put(E e) {offer(e); // never need to block}//Inserts the specified element into this priority queue.//As the queue is unbounded, this method will never return false.public boolean offer(E e) {if (e == null) {throw new NullPointerException();}final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;while ((n = size) >= (cap = (array = queue).length)) {tryGrow(array, cap);}try {Comparator<? super E> cmp = comparator;if (cmp == null) {siftUpComparable(n, e, array);} else {siftUpUsingComparator(n, e, array, cmp);}size = n + 1;notEmpty.signal();} finally {lock.unlock();}return true;}//Tries to grow array to accommodate at least one more element (but normally expand by about 50%), //giving up (allowing retry) on contention (which we expect to be rare). Call only while holding lock.private void tryGrow(Object[] array, int oldCap) {lock.unlock(); // must release and then re-acquire main lockObject[] newArray = null;if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {try {int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));if (newCap - MAX_ARRAY_SIZE > 0) {int minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE) {throw new OutOfMemoryError();}newCap = MAX_ARRAY_SIZE;}if (newCap > oldCap && queue == array) {newArray = new Object[newCap];}} finally {allocationSpinLock = 0;}}if (newArray == null) {// back off if another thread is allocatingThread.yield();}lock.lock();if (newArray != null && queue == array) {queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}}private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (key.compareTo((T) e) >= 0) {break;}array[k] = e;k = parent;}array[k] = key;}private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) {while(k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (cmp.compare(x, (T) e) >= 0) {break;}array[k] = e;k = parent;}array[k] = x;}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null) {notEmpty.await();}} finally {lock.unlock();}return result;}public int size() {final ReentrantLock lock = this.lock;lock.lock();try {return size;} finally {lock.unlock();}}...
}

(4)延遲阻塞隊列DelayQueue

DelayQueue是一個支持延遲獲取元素的無界阻塞隊列,它是基于優先級隊列PriorityQueue實現的。

往DelayQueue隊列插入元素時,可以按照自定義的delay時間進行排序。也就是隊列中的元素順序是按照到期時間排序的,只有delay時間小于或等于0的元素才能夠被取出。

DelayQueue的應用場景有:

一.訂單超時支付需要自動取消訂單

二.任務超時處理需要自動丟棄任務

三.消息中間件的實現

//An unbounded BlockingQueue of Delayed elements, in which an element can only be taken when its delay has expired.
//The head of the queue is that Delayed element whose delay expired furthest in the past.
//If no delay has expired there is no head and poll will return null. 
//Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero.
//Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements. 
//For example, the size method returns the count of both expired and unexpired elements.
//This queue does not permit null elements.
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();//Thread designated to wait for the element at the head of the queue.//When a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely.//The leader thread must signal some other thread before returning from take() or poll(...), //unless some other thread becomes leader in the interim.  //Whenever the head of the queue is replaced with an element with an earlier expiration time, //the leader field is invalidated by being reset to null, and some waiting thread, //but not necessarily the current leader, is signalled. //So waiting threads must be prepared to acquire and lose leadership while waiting.private Thread leader = null;//Condition signalled when a newer element becomes available at the head of the queue or a new thread may need to become leader.private final Condition available = lock.newCondition();//Creates a new {@code DelayQueue} that is initially empty.public DelayQueue() {}//Inserts the specified element into this delay queue. //As the queue is unbounded this method will never block.public void put(E e) {offer(e);}//Inserts the specified element into this delay queue.public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;available.signal();}return true;} finally {lock.unlock();}}//Retrieves and removes the head of this queue, //waiting if necessary until an element with an expired delay is available on this queue.public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null) {available.await();} else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0) {return q.poll();}  first = null; // don't retain ref while waitingif (leader != null) {available.await();} else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread) {leader = null;}}}}}} finally {if (leader == null && q.peek() != null) {available.signal();}lock.unlock();}}...
}public class PriorityQueue<E> extends AbstractQueue<E> implements java.io.Serializable {//Priority queue represented as a balanced binary heap: the two children of queue[n] are queue[2*n+1] and queue[2*(n+1)].//The priority queue is ordered by comparator, or by the elements' natural ordering, if comparator is null: //For each node n in the heap and each descendant d of n, n <= d.  //The element with the lowest value is in queue[0], assuming the queue is nonempty.transient Object[] queue; //The number of elements in the priority queue.private int size = 0;//The comparator, or null if priority queue uses elements' natural ordering.private final Comparator<? super E> comparator;public E peek() {return (size == 0) ? null : (E) queue[0];}//Inserts the specified element into this priority queue.public boolean offer(E e) {if (e == null) {throw new NullPointerException();}modCount++;int i = size;if (i >= queue.length) {grow(i + 1);}size = i + 1;if (i == 0) {queue[0] = e;} else {siftUp(i, e);}return true;}//Increases the capacity of the array.private void grow(int minCapacity) {int oldCapacity = queue.length;// Double size if small; else grow by 50%int newCapacity = oldCapacity + ((oldCapacity < 64) ? (oldCapacity + 2) : (oldCapacity >> 1));// overflow-conscious codeif (newCapacity - MAX_ARRAY_SIZE > 0) {newCapacity = hugeCapacity(minCapacity);}queue = Arrays.copyOf(queue, newCapacity);}private void siftUp(int k, E x) {if (comparator != null) {siftUpUsingComparator(k, x);} else {siftUpComparable(k, x);}}@SuppressWarnings("unchecked")private void siftUpComparable(int k, E x) {Comparable<? super E> key = (Comparable<? super E>) x;while (k > 0) {int parent = (k - 1) >>> 1;Object e = queue[parent];if (key.compareTo((E) e) >= 0) {break;}queue[k] = e;k = parent;}queue[k] = key;}@SuppressWarnings("unchecked")private void siftUpUsingComparator(int k, E x) {while (k > 0) {int parent = (k - 1) >>> 1;Object e = queue[parent];if (comparator.compare(x, (E) e) >= 0) {break;}queue[k] = e;k = parent;}queue[k] = x;}...
}

(5)無存儲結構的阻塞隊列SynchronousQueue

SynchronousQueue的內部沒有容器來存儲數據,因此當生產者往其添加一個元素而沒有消費者去獲取元素時,生產者會阻塞。當消費者往其獲取一個元素而沒有生產者去添加元素時,消費者也會阻塞。

SynchronousQueue的本質是借助了無容量存儲的特點,來實現生產者線程和消費者線程的即時通信,所以它特別適合在兩個線程之間及時傳遞數據。

線程池是基于阻塞隊列來實現生產者/消費者模型的。當向線程池提交任務時,首先會把任務放入阻塞隊列中,然后線程池中會有對應的工作線程專門處理阻塞隊列中的任務。

Executors.newCachedThreadPool()就是基于SynchronousQueue來實現的,它會返回一個可以緩存的線程池。如果這個線程池大小超過處理當前任務所需的數量,會靈活回收空閑線程。當任務數量增加時,這個線程池會不斷創建新的工作線程來處理這些任務。

public class Executors {...//Creates a thread pool that creates new threads as needed, //but will reuse previously constructed threads when they are available.//These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.//Calls to execute will reuse previously constructed threads if available. //If no existing thread is available, a new thread will be created and added to the pool. //Threads that have not been used for sixty seconds are terminated and removed from the cache. //Thus, a pool that remains idle for long enough will not consume any resources. //Note that pools with similar properties but different details (for example, timeout parameters)//may be created using ThreadPoolExecutor constructors.public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());}...
}

(6)阻塞隊列結合體LinkedTransferQueue

LinkedTransferQueue是一個基于鏈表實現的無界阻塞TransferQueue。

阻塞隊列的特性是根據隊列的數據情況來阻塞生產者線程或消費者線程,TransferQueue的特性是生產者線程生產數據后必須等消費者消費才返回。

LinkedTransferQueue是TransferQueue和LinkedBlockingQueue的結合體,而SynchronousQueue內部其實也是基于TransferQueue來實現的,所以LinkedTransferQueue是帶有阻塞隊列功能的SynchronousQueue。

(7)雙向阻塞隊列LinkedBlockingDeque

LinkedBlockingDeque是一個基于鏈表實現的雙向阻塞隊列,雙向隊列的兩端都可以插入和移除元素,可減少多線程并發下的一半競爭。

圖片

5.LinkedBlockingQueue的具體實現原理

(1)阻塞隊列的設計分析

(2)有界隊列LinkedBlockingQueue

(3)LinkedBlockingQueue的put()方法

(4)LinkedBlockingQueue的take()方法

(5)LinkedBlockingQueue使用兩把鎖拆分鎖功能

(6)LinkedBlockingQueue的size()方法和迭代

(7)對比LinkedBlockingQueue鏈表隊列和ArrayBlockingQueue數組隊列

(1)阻塞隊列的設計分析

阻塞隊列的特性為:如果隊列為空,消費者線程會被阻塞。如果隊列滿了,生產者線程會被阻塞。

為了實現這個特性:如何讓線程在滿足某個特定條件的情況下實現阻塞和喚醒?阻塞隊列中的數據應該用什么樣的容器來存儲?

線程的阻塞和喚醒,可以使用wait/notify或者Condition。阻塞隊列中數據的存儲,可以使用數組或者鏈表。

(2)有界隊列LinkedBlockingQueue

一.并發安全的無界隊列

比如ConcurrentLinkedQueue,是沒有邊界沒有大小限制的。它就是一個單向鏈表,可以無限制的往里面去存放數據。如果不停地往無界隊列里添加數據,那么可能會導致內存溢出。

二.并發安全的有界隊列

比如LinkedBlockingQueue,是有邊界的有大小限制的。它也是一個單向鏈表,如果超過了限制,往隊列里添加數據就會被阻塞。因此可以限制內存隊列的大小,避免內存隊列無限增長,最后撐爆內存。

(3)LinkedBlockingQueue的put()方法

put()方法是阻塞添加元素的方法,當隊列滿時,阻塞添加元素的線程。

首先把添加的元素封裝成一個Node對象,該對象表示鏈表中的一個結點。

然后使用ReentrantLock.lockInterruptibly()方法來獲取一個可被中斷的鎖,加鎖的目的是保證數據添加到隊列過程中的安全性 + 避免隊列長度超閾值。

接著調用enqueue()方法把封裝的Node對象存儲到鏈表尾部,然后通過AtomicInteger來遞增當前阻塞隊列中的元素個數。

最后根據AtomicInteger類型的變量判斷隊列元素是否已超閾值。

注意:這里用到了一個很重要的屬性notFull。notFull是一個Condition對象,用來阻塞和喚醒生產者線程。如果隊列元素個數等于最大容量,就調用notFull.await()阻塞生產者線程。如果隊列元素個數小于最大容量,則調用notFull.signal()喚醒生產者線程。

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {...private final int capacity;//阻塞隊列的最大容量,默認是Integer.MAX_VALUEprivate final AtomicInteger count = new AtomicInteger();//阻塞隊列的元素個數transient Node<E> head;//鏈表的頭結點private transient Node<E> last;//鏈表的尾結點//使用兩把鎖,使put()和take()的鎖分離,提升并發性能private final ReentrantLock takeLock = new ReentrantLock();//出隊的鎖private final ReentrantLock putLock = new ReentrantLock();//入隊的鎖//使用兩個Condition,分別阻塞和喚醒出隊時的線程和入隊時的線程private final Condition notEmpty = takeLock.newCondition();//出隊的等待隊列conditionprivate final Condition notFull = putLock.newCondition();//入隊的等待隊列condition//Creates a LinkedBlockingQueue with a capacity of Integer#MAX_VALUE.public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}//Creates a LinkedBlockingQueue with the given (fixed) capacity.public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}//Inserts the specified element at the tail of this queue, //waiting if necessary for space to become available.public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;//將添加的元素封裝成一個Node對象Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;//當前隊列元素的數量putLock.lockInterruptibly();//加可被中斷的鎖try {//注意:這里用到了一個很重要的屬性notFull,它是一個Condition對象,用來阻塞和喚醒生產者線程//如果阻塞隊列當前的元素個數等于最大容量,就調用notFull.await()方法來阻塞生產者線程while (count.get() == capacity) {notFull.await();//阻塞當前線程,并釋放鎖}//把封裝的Node對象存儲到鏈表中enqueue(node);//通過AtomicInteger來遞增當前阻塞隊列中的元素個數,用于后續判斷是否已超阻塞隊列的最大容量c = count.getAndIncrement();//根據AtomicInteger類型的變量判斷隊列元素是否已超閾值if (c + 1 < capacity) {notFull.signal();}} finally {putLock.unlock();//釋放鎖}if (c == 0) {signalNotEmpty();}}//Links node at end of queue.private void enqueue(Node<E> node) {//node先成為當前last的next//然后last又指向last的next(即node)last = last.next = node;}//Signals a waiting take. Called only from put/offer (which do not otherwise ordinarily lock takeLock.)private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {notEmpty.signal();} finally {takeLock.unlock();}}...
}

(4)LinkedBlockingQueue的take()方法

take()方法是阻塞獲取元素的方法,當隊列為空時,阻塞獲取元素的線程。

首先使用ReentrantLock.lockInterruptibly()方法來獲取一個可被中斷的鎖。

然后判斷元素個數是否為0,若是則通過notEmpty.await()阻塞消費者線程。

否則接著調用dequeue()方法從鏈表頭部獲取一個元素,并通過AtomicInteger來遞減當前阻塞隊列中的元素個數。

最后判斷阻塞隊列中的元素個數是否大于1,如果是,則調用notEmpty.signal()喚醒被阻塞的消費者線程。

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {...private final int capacity;//阻塞隊列的最大容量,默認是Integer.MAX_VALUEprivate final AtomicInteger count = new AtomicInteger();//阻塞隊列的元素個數transient Node<E> head;//鏈表的頭結點private transient Node<E> last;//鏈表的尾結點//使用兩把鎖,使put()和take()的鎖分離,提升并發性能private final ReentrantLock takeLock = new ReentrantLock();//出隊的鎖private final ReentrantLock putLock = new ReentrantLock();//入隊的鎖//使用兩個Condition,分別阻塞和喚醒出隊時的線程和入隊時的線程private final Condition notEmpty = takeLock.newCondition();//出隊的等待隊列conditionprivate final Condition notFull = putLock.newCondition();//入隊的等待隊列condition//Creates a LinkedBlockingQueue with a capacity of Integer#MAX_VALUE.public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}//Creates a LinkedBlockingQueue with the given (fixed) capacity.public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;//獲取一個可中斷的鎖takeLock.lockInterruptibly();try {//判斷元素個數是否為0while (count.get() == 0) {notEmpty.await();//阻塞當前線程并釋放鎖}//調用dequeue()方法從鏈表中獲取一個元素x = dequeue();//通過AtomicInteger來遞減當前阻塞隊列中的元素個數c = count.getAndDecrement();//判斷阻塞隊列中的元素個數是否大于1//如果是,則調用notEmpty.signal()喚醒被阻塞的消費者消除if (c > 1) {notEmpty.signal();}} finally {takeLock.unlock();}if (c == capacity) {signalNotFull();}return x;}//首先獲取鏈表的頭結點head//然后拿到頭結點的下一個結點first//然后把原來的頭結點從隊列中移除,設置first結點的數據為null,并將first結點設置為新的頭結點//最后返回first結點的數據private E dequeue() {Node<E> h = head;//h指向headNode<E> first = h.next;//first指向h的nexth.next = h;// help GChead = first;E x = first.item;first.item = null;return x;}...
}

(5)LinkedBlockingQueue使用兩把鎖拆分鎖功能

兩把獨占鎖可以提升并發性能,因為出隊和入隊用的是不同的鎖。這樣在并發出隊和入隊的時候,出隊和入隊就可以同時執行,不會鎖沖突。

這也是鎖優化的一種思想,通過將一把鎖按不同的功能進行拆分,使用不同的鎖控制不同功能下的并發沖突,從而提升性能。

(6)LinkedBlockingQueue的size()方法和迭代

一.size()方法獲取的結果也不是100%準確

LinkedBlockingQueue的size()方法獲取元素個數是通過AtomicInteger獲取的。

相比ConcurrentLinkedQueue通過遍歷隊列獲取,準確性大很多。

相比CopyOnWriteArrayList通過遍歷老副本數組獲取,準確性也大很多。

但是相比ConcurrentHashMap通過分段CAS統計,那么準確性則差不多。

注意:LinkedBlockingQueue也不能獲取到100%準確的隊列元素的個數。除非鎖掉整個隊列,調用size()時不允許入隊和出隊,才會是100%準確。因為是完成入隊或出隊之后,才會對AtomicInteger變量進行遞增或遞減。

二.迭代時獲取兩把鎖來鎖整個隊列

LinkedBlockingQueue的遍歷會直接鎖整個隊列,即會先獲取兩把鎖。

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {...private final AtomicInteger count = new AtomicInteger();//阻塞隊列的元素個數transient Node<E> head;//鏈表的頭結點private transient Node<E> last;//鏈表的尾結點//使用兩把鎖,使put()和take()的鎖分離,提升并發性能private final ReentrantLock takeLock = new ReentrantLock();//出隊的鎖private final ReentrantLock putLock = new ReentrantLock();//入隊的鎖public int size() {return count.get();}public Iterator<E> iterator() {return new Itr();}private class Itr implements Iterator<E> {private Node<E> current;private Node<E> lastRet;private E currentElement;Itr() {fullyLock();try {current = head.next;if (current != null) {currentElement = current.item;}} finally {fullyUnlock();}}...}void fullyLock() {putLock.lock();takeLock.lock();}void fullyUnlock() {takeLock.unlock();putLock.unlock();}...
}

(7)對比LinkedBlockingQueue鏈表隊列和ArrayBlockingQueue數組隊列

一.LinkedBlockingQueue是基于鏈表實現的有界阻塞隊列,ArrayBlockingQueue是基于數組實現的有界阻塞隊列

二.ArrayBlockingQueue的整體實現原理與LinkedBlockingQueue的整體實現原理是一樣的

三.LinkedBlockingQueue需要使用兩把獨占鎖,分別鎖出隊和入隊的場景

四.ArrayBlockingQueue只使用一把鎖,鎖整個數組,所以其入隊和出隊不能同時進行

五.ArrayBlockingQueue執行size()方法獲取元素個數時會直接加獨占鎖

六.ArrayBlockingQueue和LinkedBlockingQueue執行迭代方法時都會鎖數據

6.基于兩個隊列實現的集群同步機制

(1)服務注冊中心集群需要實現的功能

(2)基于兩個隊列實現的集群同步機制

(3)使用ConcurrentLinkedQueue實現第一個隊列

(4)使用LinkedBlockingQueue實現第二個隊列

(5)集群同步機制的具體實現

(1)服務注冊中心集群需要實現的功能

服務實例向任何一個服務注冊中心實例發送注冊、下線、心跳的請求,該服務注冊中心實例都需要將這些信息同步到其他的服務注冊中心實例上,從而確保所有服務注冊中心實例的內存注冊表的數據是一致的。

(2)基于兩個隊列實現的集群同步機制

某服務注冊中心實例接收到服務實例A的請求時,首先會把服務實例A的服務請求信息存儲到本地的內存注冊表里,也就是把服務實例A的服務請求信息寫到第一個內存隊列中,之后該服務注冊中心實例對服務實例A的請求處理就可以結束并返回。

接著該服務注冊中心實例會有一個后臺線程消費第一個內存隊列里的數據,把消費到的第一個內存隊列的數據batch打包然后寫到第二個內存隊列里。

最后該服務注冊中心實例還有一個后臺線程消費第二個內存隊列里的數據,把消費到的第二個內存隊列的數據同步到其他服務注冊中心實例中。

(3)使用ConcurrentLinkedQueue實現第一個隊列

首先有兩種隊列:

一是無界隊列ConcurrentLinkedQueue,基于CAS實現,并發性能很高。

二是有界隊列LinkedBlockingQueue,基于兩把鎖實現,并發性能一般。

LinkedBlockingQueue默認的隊列長度是MAX_VALUE,所以可以看成是無界隊列。但是也可以指定正常大小的隊列長度,從而實現入隊的阻塞,避免耗盡內存。

當服務注冊中心實例接收到各種請求時,會先將請求信息放入第一個隊列。所以第一個隊列會存在高并發寫的情況,因此LinkedBlockingQueue不合適。

因為LinkedBlockingQueue屬于阻塞隊列,如果LinkedBlockingQueue滿了,那么服務注冊中心實例中的,處理服務請求的線程,就會被阻塞住。而且LinkedBlockingQueue的并發性能也不是太高,要獲取獨占鎖才能寫,所以最好還是使用無界隊列ConcurrentLinkedQueue來實現第一個隊列。

(4)使用LinkedBlockingQueue實現第二個隊列

消費第一個內存隊列的數據時,可以按時間來進行batch打包,比如每隔500ms才將消費到的所有數據打包成一個batch消息。接著再將這個batch信息放入到第二個內存隊列中,這樣消費第二個隊列的數據時,只需同步batch信息到集群其他實例即可。

可見對第二個隊列進行的入隊和出隊操作是由少數的后臺線程來執行的,因此可以使用有界隊列LinkedBlockingQueue來實現第二個內存隊列。

此外還要估算有界隊列LinkedBlockingQueue的隊列長度應設置多少才合適。假如每一條需要同步給集群其他實例的請求信息,有6個字段,占30字節。平均每一條batch信息會包含100條請求信息,也就是會占3000字節 = 3KB。那么1000條batch消息,才占用3000KB = 3MB。因此可以設置第二個內存隊列LinkedBlockingQueue的長度為1000。

(5)集群同步機制的具體實現

//集群同步組件
public class PeersReplicator {//集群同步生成batch的間隔時間:500msprivate static final long PEERS_REPLICATE_BATCH_INTERVAL = 500;private static final PeersReplicator instance = new PeersReplicator();private PeersReplicator() {//啟動接收請求和打包batch的線程AcceptorBatchThread acceptorBatchThread = new AcceptorBatchThread();acceptorBatchThread.setDaemon(true); acceptorBatchThread.start();//啟動同步發送batch的線程PeersReplicateThread peersReplicateThread = new PeersReplicateThread();peersReplicateThread.setDaemon(true);peersReplicateThread.start(); }public static PeersReplicator getInstance() {return instance;}//第一個內存隊列:處理高并發的服務請求,所以存在高并發的寫入情況,無界隊列private ConcurrentLinkedQueue<AbstractRequest> acceptorQueue = new ConcurrentLinkedQueue<AbstractRequest>();//第二個內存隊列:有界隊列,用于同步batch消息到其他集群實例private LinkedBlockingQueue<PeersReplicateBatch> replicateQueue = new LinkedBlockingQueue<PeersReplicateBatch>(10000);  //同步服務注冊請求public void replicateRegister(RegisterRequest request) {request.setType(AbstractRequest.REGISTER_REQUEST); //將請求消息放入第一個內存隊列acceptorQueue.offer(request);}//同步服務下線請求public void replicateCancel(CancelRequest request) {request.setType(AbstractRequest.CANCEL_REQUEST);//將請求消息放入第一個內存隊列acceptorQueue.offer(request);}//同步發送心跳請求public void replicateHeartbeat(HeartbeatRequest request) {request.setType(AbstractRequest.HEARTBEAT_REQUEST);//將請求消息放入第一個內存隊列acceptorQueue.offer(request);}//負責接收數據以及打包為batch的后臺線程class AcceptorBatchThread extends Thread {long latestBatchGeneration = System.currentTimeMillis();@Overridepublic void run() {while(true) {try {//每隔500ms生成一個batchPeersReplicateBatch batch = new PeersReplicateBatch();long now = System.currentTimeMillis();if (now - latestBatchGeneration >= PEERS_REPLICATE_BATCH_INTERVAL) {//已經到了500ms的時間間隔//將batch消息放入第二個內存隊列replicateQueue.offer(batch);//更新latestBatchGenerationlatestBatchGeneration = System.currentTimeMillis();//重置batchbatch = new PeersReplicateBatch();} else {//還沒到500ms的時間間隔//從第一層隊列獲取數據,然后batch放入到第二層隊列中AbstractRequest request = acceptorQueue.poll();if (request != null) {batch.add(request);  } else {Thread.sleep(100);}            }} catch (Exception e) {e.printStackTrace(); }}}}//集群同步線程class PeersReplicateThread extends Thread {@Overridepublic void run() {while(true) {try {PeersReplicateBatch batch = replicateQueue.take();if (batch != null) {//遍歷其他的register-server地址//給每個地址的register-server都發送一個http請求同步batchSystem.out.println("給其他的register-server發送請求,同步batch......");      }} catch (Exception e) {e.printStackTrace(); }}}}
}//用于進行批量同步的batch消息
public class PeersReplicateBatch {private List<AbstractRequest> requests = new ArrayList<AbstractRequest>();public void add(AbstractRequest request) {this.requests.add(request);}public List<AbstractRequest> getRequests() {return requests;}public void setRequests(List<AbstractRequest> requests) {this.requests = requests;}
}//負責接收和處理register-client發送過來的請求的
public class RegisterServerController {//服務注冊表private ServiceRegistry registry = ServiceRegistry.getInstance();//服務注冊表的緩存private ServiceRegistryCache registryCache = ServiceRegistryCache.getInstance();//集群同步組件private PeersReplicator peersReplicator = PeersReplicator.getInstance();//服務注冊public RegisterResponse register(RegisterRequest registerRequest) {RegisterResponse registerResponse = new RegisterResponse();try {//在注冊表中加入這個服務實例ServiceInstance serviceInstance = new ServiceInstance();serviceInstance.setHostname(registerRequest.getHostname()); serviceInstance.setIp(registerRequest.getIp()); serviceInstance.setPort(registerRequest.getPort()); serviceInstance.setServiceInstanceId(registerRequest.getServiceInstanceId()); serviceInstance.setServiceName(registerRequest.getServiceName());registry.register(serviceInstance);//更新自我保護機制的閾值synchronized (SelfProtectionPolicy.class) {SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() + 2);selfProtectionPolicy.setExpectedHeartbeatThreshold((long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));}//過期掉注冊表緩存registryCache.invalidate();//進行集群同步peersReplicator.replicateRegister(registerRequest);registerResponse.setStatus(RegisterResponse.SUCCESS); } catch (Exception e) {e.printStackTrace(); registerResponse.setStatus(RegisterResponse.FAILURE);}return registerResponse;}//服務下線  public void cancel(CancelRequest cancelRequest) {//從服務注冊中摘除實例registry.remove(cancelRequest.getServiceName(), cancelRequest.getServiceInstanceId());//更新自我保護機制的閾值synchronized (SelfProtectionPolicy.class) {SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() - 2);selfProtectionPolicy.setExpectedHeartbeatThreshold((long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));}//過期掉注冊表緩存registryCache.invalidate();//進行集群同步peersReplicator.replicateCancel(cancelRequest);  }//發送心跳public HeartbeatResponse heartbeat(HeartbeatRequest heartbeatRequest) { HeartbeatResponse heartbeatResponse = new HeartbeatResponse();try {//獲取服務實例ServiceInstance serviceInstance = registry.getServiceInstance(heartbeatRequest.getServiceName(), heartbeatRequest.getServiceInstanceId());if (serviceInstance != null) {serviceInstance.renew();}//記錄一下每分鐘的心跳的次數HeartbeatCounter heartbeatMessuredRate = HeartbeatCounter.getInstance();heartbeatMessuredRate.increment();//進行集群同步peersReplicator.replicateHeartbeat(heartbeatRequest);heartbeatResponse.setStatus(HeartbeatResponse.SUCCESS); } catch (Exception e) {e.printStackTrace(); heartbeatResponse.setStatus(HeartbeatResponse.FAILURE); }return heartbeatResponse;}//同步batch數據public void replicateBatch(PeersReplicateBatch batch) {for (AbstractRequest request : batch.getRequests()) {if (request.getType().equals(AbstractRequest.REGISTER_REQUEST)) {register((RegisterRequest) request);} else if (request.getType().equals(AbstractRequest.CANCEL_REQUEST)) {cancel((CancelRequest) request);} else if (request.getType().equals(AbstractRequest.HEARTBEAT_REQUEST)) {heartbeat((HeartbeatRequest) request); }}}//拉取全量注冊表public Applications fetchFullRegistry() {return (Applications) registryCache.get(CacheKey.FULL_SERVICE_REGISTRY);}//拉取增量注冊表public DeltaRegistry fetchDeltaRegistry() {return (DeltaRegistry) registryCache.get(CacheKey.DELTA_SERVICE_REGISTRY); }
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/896019.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/896019.shtml
英文地址,請注明出處:http://en.pswp.cn/news/896019.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

vue項目啟動時報錯:error:0308010C:digital envelope routines::unsupported

此錯誤與 Node.js 的加密模塊有關&#xff0c;特別是在使用 OpenSSL 3.0 及以上版本時。Vue 項目在啟動時可能會依賴一些舊的加密算法&#xff0c;而這些算法在 OpenSSL 3.0 中默認被禁用&#xff0c;導致 error:0308010C:digital envelope routines::unsupported 錯誤。 解決…

ncDLRES:一種基于動態LSTM和ResNet的非編碼RNA家族預測新方法

現有的計算方法主要分為兩類&#xff1a;第一類是通過學習序列或二級結構的特征來預測ncRNAs家族&#xff0c;另一類是通過同源序列之間的比對來預測ncRNAs家族。在第一類中&#xff0c;一些方法通過學習預測的二級結構特征來預測ncRNAs家族。二級結構預測的不準確性可能會導致…

愛普生 SG-8101CE 可編程晶振在筆記本電腦的應用

在筆記本電腦的精密架構中&#xff0c;每一個微小的元件都如同精密儀器中的齒輪&#xff0c;雖小卻對整體性能起著關鍵作用。如今的筆記本電腦早已不再局限于簡單的辦公用途&#xff0c;其功能愈發豐富多樣。從日常輕松的文字處理、網頁瀏覽&#xff0c;到專業領域中對圖形處理…

SPRING10_getBean源碼詳細解讀、流程圖

文章目錄 ①. getBean方法的入口-DefaultListableBeanFactory②. DefaultListableBeanFactory調用getBean③. 進入到doGetBean方法④. getSingleton三級緩存方法⑤. getSingleton()方法分析⑥. createBean創建對象方法⑦. 對象創建、屬性賦值、初始化⑧. getBean最詳細流程圖 ①…

IDEA中查詢Maven項目的依賴樹

在Maven項目中&#xff0c;查看項目的依賴樹是一個常見的需求&#xff0c;特別是當你需要了解項目中直接或間接依賴了哪些庫及其版本時。你可以通過命令行使用Maven的dependency:tree插件來做到這一點。這個命令會列出項目中所有依賴的樹狀結構。 打開idea項目的終端&#xff…

深入xtquant:財務數據獲取與應用的實戰指南

深入xtquant&#xff1a;財務數據獲取與應用的實戰指南 在量化交易領域&#xff0c;雖然技術分析和市場情緒分析占據了主導地位&#xff0c;但財務數據作為評估公司基本面的重要依據&#xff0c;同樣不可或缺。xtquant作為一個強大的Python庫&#xff0c;提供了便捷的接口來獲…

windows 安裝 stable diffusion

在windows上安裝 stable diffusion&#xff0c;如果windows沒有nvidia顯卡&#xff0c;想只使用CPU可在webui-user.bat中添加命令 set COMMANDLINE_ARGS--no-half --skip-torch-cuda-test 可正常使用stable diffusion&#xff0c;但速度較慢

Kubernetes控制平面組件:APIServer 基于 引導Token 的認證機制

云原生學習路線導航頁&#xff08;持續更新中&#xff09; kubernetes學習系列快捷鏈接 Kubernetes架構原則和對象設計&#xff08;一&#xff09;Kubernetes架構原則和對象設計&#xff08;二&#xff09;Kubernetes架構原則和對象設計&#xff08;三&#xff09;Kubernetes控…

DeepSeek 助力 Vue 開發:打造絲滑的縮略圖列表(Thumbnail List)

前言&#xff1a;哈嘍&#xff0c;大家好&#xff0c;今天給大家分享一篇文章&#xff01;并提供具體代碼幫助大家深入理解&#xff0c;徹底掌握&#xff01;創作不易&#xff0c;如果能幫助到大家或者給大家一些靈感和啟發&#xff0c;歡迎收藏關注哦 &#x1f495; 目錄 Deep…

DeepSeek寫俄羅斯方塊手機小游戲

DeepSeek寫俄羅斯方塊手機小游戲 提問 根據提的要求&#xff0c;讓DeepSeek整理的需求&#xff0c;進行提問&#xff0c;內容如下&#xff1a; 請生成一個包含以下功能的可運行移動端俄羅斯方塊H5文件&#xff1a; 核心功能要求 原生JavaScript實現&#xff0c;適配手機屏幕 …

百問網(100ask)的IMX6ULL開發板的以太網控制器(MAC)與物理層(PHY)芯片(LAN8720A)連接的原理圖分析(包含各引腳說明以及工作原理)

前言 本博文承接博文 https://blog.csdn.net/wenhao_ir/article/details/145663029 。 本博文和博文 https://blog.csdn.net/wenhao_ir/article/details/145663029 的目錄是找出百問網(100ask)的IMX6ULL開發板與NXP官方提供的公板MCIMX6ULL-EVK(imx6ull14x14evk)在以太網硬件…

QT開發技術 【opencv圖片裁剪,平均哈希相似度判斷,以及獲取游戲窗口圖片】

一、圖片裁剪 int CJSAutoWidget::GetHouseNo(cv::Mat matMap) {cv::imwrite(m_strPath "/Data/map.png", matMap);for (int i 0; i < 4; i){for (int j 0; j < 6; j){// 計算當前子區域的矩形cv::Rect roi(j * 20, i * 17, 20, 17);// 提取子區域cv::Mat …

TiDB 是一個分布式 NewSQL 數據庫

TiDB 是一個分布式 NewSQL 數據庫。它支持水平彈性擴展、ACID 事務、標準 SQL、MySQL 語法和 MySQL 協議&#xff0c;具有數據強一致的高可用特性&#xff0c;是一個不僅適合 OLTP 場景還適合 OLAP 場景的混合數據庫。 TiDB是 PingCAP公司自主設計、研發的開源分布式關系型數據…

請解釋 Vue 中的生命周期鉤子,不同階段觸發的鉤子函數及其用途是什么?

vue生命周期鉤子詳解&#xff08;Vue 3版本&#xff09; 一、生命周期階段劃分 Vue組件的生命周期可分為四大階段&#xff0c;每個階段對應特定鉤子函數&#xff1a; 創建階段&#xff1a;初始化實例并準備數據掛載階段&#xff1a;將虛擬DOM渲染為真實DOM更新階段&#xff…

計算機專業知識【深入理解子網中的特殊地址:為何 192.168.0.1 和 192.168.0.255 不能隨意分配】

在計算機網絡的世界里&#xff0c;IP 地址是設備進行通信的關鍵標識。對于常見的子網&#xff0c;如 192.168.0.0/24&#xff0c;我們可能會疑惑為何某些地址不能分配給主機使用。接下來&#xff0c;我們就以 192.168.0.0/24 為例&#xff0c;詳細解釋為何 192.168.0.1 和 192.…

軟件架構設計:軟件工程

一、軟件工程概述 軟件工程的定義 軟件工程是應用系統化、規范化、可量化的方法開發、運行和維護軟件。 軟件工程的目標 提高軟件質量、降低開發成本、縮短開發周期。 軟件生命周期 瀑布模型&#xff1a;需求分析→設計→編碼→測試→維護。迭代模型&#xff1a;分階段迭代開…

mysql 學習15 SQL優化,插入數據優化,主鍵優化,order by優化,group by 優化,limit 優化,count 優化,update 優化

插入數據優化&#xff0c; insert 優化&#xff0c; 批量插入&#xff08;一次不超過1000條&#xff09; 手動提交事務 主鍵順序插入 load 從本地一次插入大批量數據&#xff0c; 登陸時 mysql --local-infile -u root -p load data local infile /root/sql1.log into table tb…

達夢數據庫針對慢SQL,收集統計信息清除執行計劃緩存

前言&#xff1a;若遇到以下場景&#xff0c;大概率是SQL走錯了執行計劃&#xff1a; 1、一條SQL在頁面上查詢特別慢&#xff0c;但拿到數據庫終端執行特別快 2、一條SQL在某種檢索條件下查詢特別慢&#xff0c;但拿到數據庫終端執行特別快 此時&#xff0c;可以嘗試按照下述步…

使用JWT實現微服務鑒權

目錄 一、微服務鑒權 1、思路分析 2、系統微服務簽發token 3、網關過濾器驗證token 4、測試鑒權功能 前言&#xff1a; 隨著微服務架構的廣泛應用&#xff0c;服務間的鑒權與安全通信成為系統設計的核心挑戰之一。傳統的集中式會話管理在分布式場景下面臨性能瓶頸和擴展性…