JDK容器类List,Set,Queue源码解读
List,Set,Queue都是继承Collection接口的单列集合接口。List常用的实现主要有ArrayList,LinkedList,List中的数据是有序可重复的。Set常用的实现主要是HashSet,Set中的数据是无序不可重复的。Queue常用的实现主要有ArrayBlockingQueue,LinkedBlockingQueue,Queue是一个保持先进先出的顺序队列,不允许随机访问队列中的元素。 ArrayList核心源码解读ArrayList是一个底层用数组实现的集合,数组元素类型为Object类型,支持随机访问,元素有序且可以重复,它继承于AbstractList,实现了List,RandomAccess,Cloneable,java.io.Serializable这些接口。 当通过 ArrayList() 构造一个是集合,它是构造了一个空数组,初始长度为0。当第1次添加元素时,会创建一个长度为10的数组,并将该元素赋值到数组的第一个位置,当添加的元素大于10的时候,数组会进行第一次扩容。扩容1.5倍,长度变为15。 ArrayList在遍历的时候时不能修改的,即遍历的时候不能增加或者删除元素,否则会抛ConcurrentModificationException ArrayList是线程不安全的。 ArrayList源码中的主要字段// 默认数组的大小 private static final int DEFAULT_CAPACITY = 10; // 默认空数组 private static final Object[] EMPTY_ELEMENTDATA = {}; // 实际存放数据的数组 private transient Object[] elementData; ArrayList源码中的构造器/** * Constructs an empty list with an initial capacity of ten. */ public ArrayList() { this.elementData = DEFAULTCAPACITY_EMPTY_ELEMENTDATA; } /** * Constructs an empty list with the specified initial capacity. * * @param initialCapacity the initial capacity of the list * @throws IllegalArgumentException if the specified initial capacity * is negative */ public ArrayList(int initialCapacity) { if (initialCapacity > 0) { this.elementData = new Object[initialCapacity]; //将自定义的容量大小当成初始化elementData的大小 } else if (initialCapacity == 0) { this.elementData = EMPTY_ELEMENTDATA; } else { throw new IllegalArgumentException("Illegal Capacity: "+ initialCapacity); } } ArrayList源码中的add方法/** * Appends the specified element to the end of this list. * * @param e element to be appended to this list * @return <tt>true</tt> (as specified by {@link Collection#add}) */ public boolean add(E e) { ensureCapacityInternal(size + 1); //添加元素之前,首先要确定集合的大小 elementData[size++] = e; //在数据中正确的位置上放上元素e,并且size++ return true; } private void ensureCapacityInternal(int minCapacity) { ensureExplicitCapacity(calculateCapacity(elementData,minCapacity)); } private static int calculateCapacity(Object[] elementData,int minCapacity) { if (elementData == DEFAULTCAPACITY_EMPTY_ELEMENTDATA) { // 如果为空数组 return Math.max(DEFAULT_CAPACITY,minCapacity); // 返回默认的我数组长度 } return minCapacity; } private void ensureExplicitCapacity(int minCapacity) { modCount++; // 修改次数+1,相当于版本号 // overflow-conscious code if (minCapacity - elementData.length > 0) // 如果实际容量小于需要容量 grow(minCapacity); // 扩容 } /** * Increases the capacity to ensure that it can hold at least the * number of elements specified by the minimum capacity argument. * * @param minCapacity the desired minimum capacity */ private void grow(int minCapacity) { // overflow-conscious code int oldCapacity = elementData.length; // 拿到数组的当前长度 int newCapacity = oldCapacity + (oldCapacity >> 1); //新数组的长度等于原数组长度的1.5倍 if (newCapacity - minCapacity < 0) //当新数组长度仍然比minCapacity小,则为保证最小长度,新数组等于minCapacity newCapacity = minCapacity; if (newCapacity - MAX_ARRAY_SIZE > 0) //当得到的新数组长度比 MAX_ARRAY_SIZE大时,调用 hugeCapacity 处理大数组 newCapacity = hugeCapacity(minCapacity); // minCapacity is usually close to size,so this is a win: elementData = Arrays.copyOf(elementData,newCapacity); //调用 Arrays.copyOf将原数组拷贝到一个大小为newCapacity的新数组(拷贝引用) } private static int hugeCapacity(int minCapacity) { if (minCapacity < 0) // overflow throw new OutOfMemoryError(); return (minCapacity > MAX_ARRAY_SIZE) ? Integer.MAX_VALUE : MAX_ARRAY_SIZE; // 数组的最大长度为Integer.MAX_VALUE } ArrayList源码中的get方法/** * Returns the element at the specified position in this list. * * @param index index of the element to return * @return the element at the specified position in this list * @throws IndexOutOfBoundsException {@inheritDoc} */ public E get(int index) { rangeCheck(index); //判断索引合法性 return elementData(index); } 线程安全的ArrayList—CopyOnWriteArrayListCopyOnWriteArrayList是基于写时复制(copy-on-write)思想来实现的一个线程安全的ArrayList集合类。它实现了List接口,内部持有一个ReentrantLock来给写上锁,底层是用volatile transient声明的数组array,它是读写分离的,写入数据时会复制出一个新的数组并加上ReentrantLock锁,完成写入后将新数组赋值给当前array,而读操作不需要获得锁,支持并发。 CopyOnWriteArrayList的写时复制导致了数据并不是实时的,有一定的延迟性,同时由于数据的复制,当数据量非常大的时候会占用很大的内存。 CopyOnWriteArrayList是适合读多写少的场景。 CopyOnWriteArrayList核心源码解读// 存放数据的数组 private transient volatile Object[] array; /** * 添加数据方法 * Appends the specified element to the end of this list. * * @param e element to be appended to this list * @return {@code true} (as specified by {@link Collection#add}) */ public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); // 加锁,保证数据安全 try { Object[] elements = getArray(); // 拿到数组 int len = elements.length; // 获取数组长度 Object[] newElements = Arrays.copyOf(elements,len + 1); // 拷贝数组到新数组 newElements[len] = e; // 将元素赋值到新数组的末尾 setArray(newElements); //设置新数组为当前数组 return true; } finally { lock.unlock(); // 解锁 } } HashSet源码解读HashSet是最常用的Set实现,它继承了AbstractSet抽象类,实现了Set,Cloneable和java.io.Serializable接口。 HashSet核心源码解读// 实际存储数据的HashMap private transient HashMap<E,Object> map; // HashMap的value引用 private static final Object PRESENT = new Object(); /** * Constructs a new,empty set; the backing <tt>HashMap</tt> instance has * default initial capacity (16) and load factor (0.75). */ public HashSet() { map = new HashMap<>(); //new一个 HashMap 对象出来,采用无参的 HashMap 构造函数,具有默认初始容量(16)和加载因子(0.75)。 } /** * Adds the specified element to this set if it is not already present. * More formally,adds the specified element <tt>e</tt> to this set if * this set contains no element <tt>e2</tt> such that * <tt>(e==null ? e2==null : e.equals(e2))</tt>. * If this set already contains the element,the call leaves the set * unchanged and returns <tt>false</tt>. * * @param e element to be added to this set * @return <tt>true</tt> if this set did not already contain the specified * element */ public boolean add(E e) { return map.put(e,PRESENT)==null; } 线程安全的HashSet—CopyOnWriteArraySetCopyOnWriteArraySet是一个线程安全的HashSet,它底层是通过CopyOnWriteArrayList实现的,它是通过在添加数据的时候如果数据不存在才进行添加来实现了数据的不可重复 CopyOnWriteArraySet 核心源码解读// 实际存放数据 private final CopyOnWriteArrayList<E> al; /** * Adds the specified element to this set if it is not already present. * More formally,adds the specified element {@code e} to this set if * the set contains no element {@code e2} such that * <tt>(e==null ? e2==null : e.equals(e2))</tt>. * If this set already contains the element,the call leaves the set * unchanged and returns {@code false}. * * @param e element to be added to this set * @return {@code true} if this set did not already contain the specified * element */ public boolean add(E e) { return al.addIfAbsent(e); // 如果不存在则添加 } Queue详细分析Queue是先入先出(FIFO)的一个队列数据结构,可以分为阻塞队列和非阻塞队列,Queue接口与List、Set同一级别,都是继承了Collection接口。 Queue API
ArrayBlockingQueue源码解读ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列。 // 存放数据的数组 final Object[] items; // 取数据索引 int takeIndex; // 放数据索引 int putIndex; // 数据量 int count; // 锁 final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; /** items index for next put,offer,or add */ int putIndex; /** * Inserts the specified element at the tail of this queue if it is * possible to do so immediately without exceeding the queue‘s capacity,* returning {@code true} upon success and {@code false} if this queue * is full. This method is generally preferable to method {@link #add},* which can fail to insert an element only by throwing an exception. * * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { // offer方法是非阻塞 checkNotNull(e); final ReentrantLock lock = this.lock; // offer的时候加锁 lock.lock(); try { if (count == items.length) // 如果没有空间, 返回false return false; else { enqueue(e); // 如果有空间,入队列 return true; } } finally { lock.unlock(); } } /** * Inserts the specified element at the tail of this queue,waiting * for space to become available if the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; // 加锁 lock.lockInterruptibly(); try { while (count == items.length) // queue的容量已满 notFull.await(); // 阻塞 enqueue(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); //队列为空时,将使这个线程进入阻塞状态,直到被其他线程唤醒时取出元素 return dequeue(); //消费对头中的元素 } finally { lock.unlock(); } } LinkedBlockingQueue源码解读LinkedBlockingQueue底层是采用链表实现的一个阻塞的线程安全的队列。 // 容量 private final int capacity; // 元素数量 private final AtomicInteger count = new AtomicInteger(); // 链表头 transient Node<E> head; // 链表尾 private transient Node<E> last; // take锁 private final ReentrantLock takeLock = new ReentrantLock(); // 当队列无元素时,take锁会阻塞在notEmpty条件上,等待其它线程唤醒 private final Condition notEmpty = takeLock.newCondition(); // 放锁 private final ReentrantLock putLock = new ReentrantLock(); // 当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒 private final Condition notFull = putLock.newCondition(); /** * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}. */ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); // 如果没传容量,就使用最大int值初始化其容量 } /** * Inserts the specified element at the tail of this queue,waiting if * necessary for space to become available. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; // 使用putLock锁加锁 final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock),and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { // 如果队列满了,就阻塞在notFull条件上 notFull.await(); // 等待被其它线程唤醒 } enqueue(node); // 队列不满了,就入队 c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 使用takeLock加锁 takeLock.lockInterruptibly(); try { while (count.get() == 0) { // 如果队列无元素,则阻塞在notEmpty条件上 notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } ConcurrentLinkedQueue源码解读ConcurrentLinkedQueue是线程安全的无界非阻塞队列,其底层数据结构是使用单向链表实现,入队和出队操作是使用我们经常提到的CAS来保证线程安全的。 private transient volatile Node<E> head; private transient volatile Node<E> tail; private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; /** * Inserts the specified element at the tail of this queue. * As the queue is unbounded,this method will never return {@code false}. * * @return {@code true} (as specified by {@link Queue#offer}) * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { checkNotNull(e); // 为null则抛出空指针异常 final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail,p = t;;) { // 自旋 Node<E> q = p.next; if (q == null) { // 如果q==null说明p是尾节点,则执行插入 // p is last node if (p.casNext(null,newNode)) { // 使用CAS设置p节点的next节点 // Successful CAS is the linearization point // for e to become an element of this queue,// and for newNode to become "live". if (p != t) // hop two nodes at a time casTail(t,newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } else if (p == q) // We have fallen off list. If tail is unchanged,it // will also be off-list,in which case we need to // jump to head,from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; } } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |