百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分析 > 正文

打通JAVA与内核!一个ReentrantLock锁的实现原理

liebian365 2024-11-12 13:11 5 浏览 0 评论

写JAVA都知道,JAVA里的同学锁有几个类代码,是同步锁,魔法是并发包里的锁(JUC锁)。其中同步锁是JAVA语言文字提供的能力,在这个不展开,本文主要讨论JUC里的ReentrantLock锁。

一 JDK 层

1 AbstractQueuedSynchronizer


ReentrantLock的lock(),unlock()等API实际上依赖于内部的Synchronizer(注意,不是synchronized)来实现。Synchronizer又分为FairSync和NonfairSync,顾名思义是指公平和非公平。


当调用ReentrantLock的lock方法时,实际上只是简单地转给Synchronizer的lock()方法:

代码节选自:java.util.concurrent.locks.ReentrantLock.java
 /** Synchronizer providing all implementation mechanics */
    private final Sync sync;
    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
......
}


    public void lock() {
        sync.lock();
    }

那么这个sync又是什么?我们看到Sync继承自AbstractQueueSynchronizer(AQS),AQS是并发的基石,AQS不实现任何同步接口(比如lock,unlock,countDown等),但它定义了一个并发接口资源控制逻辑的框架(一式了模板方法设计模式),它定义了获取和释放方法用于独占地(独占)获取和释放资源,以及获取共享和释放共享方法用于共享地获取和释放资源。 release用于实现ReentrantLock,而acquireShared/releaseShared用于实现CountDownLacth,Semaphore。比如acquire的框架如下:

    /**

     * Acquires in exclusive mode, ignoring interrupts.  Implemented

     * by invoking at least once {@link #tryAcquire},

     * returning on success.  Otherwise the thread is queued, possibly

     * repeatedly blocking and unblocking, invoking {@link

     * #tryAcquire} until success.  This method can be used

     * to implement method {@link Lock#lock}.

     *

     * @param arg the acquire argument.  This value is conveyed to

     *        {@link #tryAcquire} but is otherwise uninterpreted and

     *        can represent anything you like.

     */

    public final void acquire(int arg) {

        if (!tryAcquire(arg) &&

            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

            selfInterrupt();

    }

整体逻辑是,先进行一次tryAcquire,成功者,继续啥事了,调用了自己的代码,如果失败,则执行addWaiter和acquireQueued。其中tryAcquire()需要子类根据自己的同步需求进行acquireQueued()和addWaiter()已经由AQS实现。addWai的作用是把当前加入到AQS内部实现的线程,而acquireQueued的作用是当tryAcquire()失败的时候激发线程。

addWaiter 的代码如下:

/**

     * Creates and enqueues node for current thread and given mode.

     *

     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared

     * @return the new node

     */

    private Node addWaiter(Node mode) {

        //创建节点,设置关联线程和模式(独占或共享)

        Node node = new Node(Thread.currentThread(), mode);

        // Try the fast path of enq; backup to full enq on failure

        Node pred = tail;

        // 如果尾节点不为空,说明同步队列已经初始化过

        if (pred != null) {

            //新节点的前驱节点设置为尾节点

            node.prev = pred;

            // 设置新节点为尾节点

            if (compareAndSetTail(pred, node)) {

                //老的尾节点的后继节点设置为新的尾节点。 所以同步队列是一个双向列表。

                pred.next = node;

                return node;

            }

        }

        //如果尾节点为空,说明队列还未初始化,需要初始化head节点并加入新节点

        enq(node);

        return node;

    }

enq(node)的代码如下:


/**

     * Inserts node into queue, initializing if necessary. See picture above.

     * @param node the node to insert

     * @return node's predecessor

     */

    private Node enq(final Node node) {

        for (;;) {

            Node t = tail;

            if (t == null) { // Must initialize

                // 如果tail为空,则新建一个head节点,并且tail和head都指向这个head节点

                //队列头节点称作“哨兵节点”或者“哑节点”,它不与任何线程关联

                if (compareAndSetHead(new Node()))

                    tail = head;

            } else {

                //第二次循环进入这个分支,

                node.prev = t;

                if (compareAndSetTail(t, node)) {

                    t.next = node;

                    return t;

                }

            }

        }

    }






addWaiter执行结束后,同步驱动的结构如下所示:




acquireQueued 的代码如下:


 /**

     * Acquires in exclusive uninterruptible mode for thread already in

     * queue. Used by condition wait methods as well as acquire.

     *

     * @param node the node

     * @param arg the acquire argument

     * @return {@code true} if interrupted while waiting

     */

    final boolean acquireQueued(final Node node, int arg) {

        boolean failed = true;

        try {

            boolean interrupted = false;

            for (;;) {

                //获取当前node的前驱node

                final Node p = node.predecessor();

                //如果前驱node是head node,说明自己是第一个排队的线程,则尝试获锁

                if (p == head && tryAcquire(arg)) {

                    //把获锁成功的当前节点变成head node(哑节点)。

                    setHead(node);

                    p.next = null; // help GC

                    failed = false;

                    return interrupted;

                }

                if (shouldParkAfterFailedAcquire(p, node) &&

                    parkAndCheckInterrupt())

                    interrupted = true;

            }

        } finally {

            if (failed)

                cancelAcquire(node);

        }

    }

acquireQueued 的逻辑是:


判断自己是不是同步起源中的第一个过程自己的节点,则试点进行锁定,如果成功则将其变成头节点,如下所示:




如果自己不是第一个事件的节点或者tryAcqui失败,则调用shouldParkAfterFailed,其主要逻辑是使用CAS将节点状态由INITIAL设置成SIGNAL,显示当前线程等待SIGNAL。如果设置失败,会中acquire的循环中继续重试,直到设置成功,然后调用parkAndCheckInterrupt方法。parkAndCheckInterrupt的作用是当前线程模拟挂起,等待当前把parkAndCheckInterrupt的需要借助层的能力,这是这一点的重点,在中实现死层判断。


2 重入锁


下面就让我们一起来看看 ReentrantLock 是如何基于AbstractQueueSynchronizer 实现其输入的。


ReentrantLock内部使用的FairSync和NonfairSync,它们都是AQS的子类,比如FairSync的主要代码如下:


/**

     * Sync object for fair locks

     */

    static final class FairSync extends Sync {

        private static final long serialVersionUID = -3000897897090466540L;




        final void lock() {

            acquire(1);

        }




        /**

         * Fair version of tryAcquire.  Don't grant access unless

         * recursive call or no waiters or is first.

         */

        protected final boolean tryAcquire(int acquires) {

            final Thread current = Thread.currentThread();

            int c = getState();

            if (c == 0) {

                if (!hasQueuedPredecessors() &&

                    compareAndSetState(0, acquires)) {

                    setExclusiveOwnerThread(current);

                    return true;

                }

            }

            else if (current == getExclusiveOwnerThread()) {

                int nextc = c + acquires;

                if (nextc < 0)

                    throw new Error("Maximum lock count exceeded");

                setState(nextc);

                return true;

            }

            return false;

        }

    }

AQS中最重要的一个领域就是状态,锁和同步器的实现都围绕着这个领域的修改展开。根据自己需要可以对同步状态的意义有不同的定义,并重新对应的tryAcquire、tryRelease或tryAcquireshared、tryReleaseShared等方法来操作同步状态。


我们来看看ReentrantLock的FairSync的tryAcquire的逻辑:


  1. 如果此时状态(private volatile int state)是0,那么就表示这个时候没有人占有锁。但因为是非锁,所以还要判断自己是首节点,然后才把实验状态设置为1,成功成功那么,就成功占有了锁。compareAndSetState也是通过CAS来实现的。CAS是原子操作,而且state的类型是volatile,所以state的值是线程安全的。
  2. 如果此时不是状态,那么再判断当前线程是不是锁的主人,如果是的话,则状态会启动,当状态时,会抛错。
  3. 获得不满足,则返回假,获取锁失败。


至此,整个JAVA语言的实现基本说清楚了,小结一下,框架如下所示:



关于解锁的实现,只有篇幅,还有讨论了,重点分析锁中是如何把当前的场景演示挂起的,上上的unsafe.park()是如何实现的。


二、JVM层

Unsafe.park和Unsafe.unpark是sun.misc.Unsafe类的原生方法,


public native void unpark(Object var1);




public native void park(boolean var1, long var2);

这两种方法的实现是在JVM的hotspot/src/share/vm/prims/unsafe.cpp文件中,


UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))

  UnsafeWrapper("Unsafe_Park");

  EventThreadPark event;

#ifndef USDT2

  HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);

#else /* USDT2 */

   HOTSPOT_THREAD_PARK_BEGIN(

                             (uintptr_t) thread->parker(), (int) isAbsolute, time);

#endif /* USDT2 */

  JavaThreadParkedState jtps(thread, time != 0);

  thread->parker()->park(isAbsolute != 0, time);

#ifndef USDT2

  HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());

#else /* USDT2 */

  HOTSPOT_THREAD_PARK_END(

                          (uintptr_t) thread->parker());

#endif /* USDT2 */

  if (event.should_commit()) {

    const oop obj = thread->current_park_blocker();

    if (time == 0) {

      post_thread_park_event(&event, obj, min_jlong, min_jlong);

    } else {

      if (isAbsolute != 0) {

        post_thread_park_event(&event, obj, min_jlong, time);

      } else {

        post_thread_park_event(&event, obj, time, min_jlong);

      }

    }

  }

UNSAFE_END

核心是逻辑是thread->parker()->park(isAbsolute != 0, time); 就是获取java线程的parker对象,然后执行它的park方法。


class Parker : public os::PlatformParker {

private:

  volatile int _counter ;

  ...

public:

  void park(bool isAbsolute, jlong time);

  void unpark();

  ...

}

class PlatformParker : public CHeapObj<mtInternal> {

  protected:

    enum {

        REL_INDEX = 0,

        ABS_INDEX = 1

    };

    int _cur_index;  // which cond is in use: -1, 0, 1

    pthread_mutex_t _mutex [1] ;

    pthread_cond_t  _cond  [2] ; // one for relative times and one for abs.




  public:       // TODO-FIXME: make dtor private

    ~PlatformParker() { guarantee (0, "invariant") ; }




  public:

    PlatformParker() {

      int status;

      status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());

      assert_status(status == 0, status, "cond_init rel");

      status = pthread_cond_init (&_cond[ABS_INDEX], NULL);

      assert_status(status == 0, status, "cond_init abs");

      status = pthread_mutex_init (_mutex, NULL);

      assert_status(status == 0, status, "mutex_init");

      _cur_index = -1; // mark as unused

    }

};
公园方法:

void Parker::park(bool isAbsolute, jlong time) {

  // Return immediately if a permit is available.

  // We depend on Atomic::xchg() having full barrier semantics

  // since we are doing a lock-free update to _counter.

  if (Atomic::xchg(0, &_counter) > 0) return;




  Thread* thread = Thread::current();

  assert(thread->is_Java_thread(), "Must be JavaThread");

  JavaThread *jt = (JavaThread *)thread;




  if (Thread::is_interrupted(thread, false)) {

    return;

  }




  // Next, demultiplex/decode time arguments

  timespec absTime;

  if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all

    return;

  }

  if (time > 0) {

    unpackTime(&absTime, isAbsolute, time);

  }




  ////进入safepoint region,更改线程为阻塞状态

  ThreadBlockInVM tbivm(jt);

  // Don't wait if cannot get lock since interference arises from

  // unblocking.  Also. check interrupt before trying wait

  if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {

  //如果线程被中断,或者尝试给互斥变量加锁时失败,比如被其它线程锁住了,直接返回

    return;

  }

  //到这里,意味着pthread_mutex_trylock(_mutex)成功

  int status ;

  if (_counter > 0)  { // no wait needed

    _counter = 0;

    status = pthread_mutex_unlock(_mutex);

    assert (status == 0, "invariant") ;

    OrderAccess::fence();

    return;

  }




#ifdef ASSERT

  // Don't catch signals while blocked; let the running threads have the signals.

  // (This allows a debugger to break into the running thread.)

  sigset_t oldsigs;

  sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();

  pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);

#endif

  OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);

  jt->set_suspend_equivalent();

  // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()




  assert(_cur_index == -1, "invariant");

  if (time == 0) {

    _cur_index = REL_INDEX; // arbitrary choice when not timed

    status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;

  } else {

    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;

    status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;

    if (status != 0 && WorkAroundNPTLTimedWaitHang) {

      pthread_cond_destroy (&_cond[_cur_index]) ;

      pthread_cond_init    (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());

    }

  }

  _cur_index = -1;

  assert_status(status == 0 || status == EINTR ||

                status == ETIME || status == ETIMEDOUT,

                status, "cond_timedwait");




#ifdef ASSERT

  pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);

#endif




  _counter = 0 ;

  status = pthread_mutex_unlock(_mutex) ;

  assert_status(status == 0, status, "invariant") ;

  // Paranoia to ensure our locked and lock-free paths interact

  // correctly with each other and Java-level accesses.

  OrderAccess::fence();




  // If externally suspended while waiting, re-suspend

  if (jt->handle_special_suspend_equivalent_condition()) {

    jt->java_suspend_self();

  }

}

park的思路:parker内部有关键字段_计数器,一个计数器记录的“许可”,当_计数器大于0时,有许可,然后就可以把_计数器设置为0,是获得了许可,可以继续运行过去的代码。如果此时_计数器不大于0,则等待这个条件满足。


下面我具体来看看公园的具体实现:


  1. 当调用park,先尝试可能成功实现“许可”,即,如果,则把_counter设置为0,并返回。
  2. 如果不成功,则将线程的设置成_thread_in_vm并且_thread_blocked。_thread_in_vm表示线程当前在JVM中执行,_thread_blocked表示线程状态当前模式了。
  3. mutex之后,再次检查_counter是不是>0,如果是,则把_counter设置为0,解锁mutex并返回
  4. 如果_counter还是不大于0,则判断等待的时间等于0,然后调用相应的pthread_cond_wait系列函数进行等待,如果等待返回(即有人进行unpark,则pthread_cond_signal来通知),则把_counter设置为0,解锁互斥并返回。


所以上本质原因,LockSupport.park是通过pthread库的条件pthread_cond_t来实现的。下面我们就来看看pthread_cond_t是怎么实现的。


三 GLIBC 层

pthread_cond_t 典型的用法如下:


#include <pthread.h>

#include <stdio.h>

#include <stdlib.h>




pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /*初始化互斥锁*/

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;  //初始化条件变量 




void *thread1(void *);

void *thread2(void *);




int i=1;

int main(void)

{

        pthread_t t_a;

        pthread_t t_b;

        pthread_create(&t_a,NULL,thread1,(void *)NULL);/*创建进程t_a*/

        pthread_create(&t_b,NULL,thread2,(void *)NULL); /*创建进程t_b*/

        pthread_join(t_b, NULL);/*等待进程t_b结束*/

        pthread_mutex_destroy(&mutex);

        pthread_cond_destroy(&cond);

        exit(0);

}

void *thread1(void *junk)

{

        for(i=1;i<=9;i++)

        {

             pthread_mutex_lock(&mutex);//

             if(i%3==0)

                 pthread_cond_signal(&cond);/*条件改变,发送信号,通知t_b进程*/

              else       

                  printf("thead1:%d/n",i);

              pthread_mutex_unlock(&mutex);//*解锁互斥量*/

              printf("Up Unlock Mutex/n");      

              sleep(1);

        }

}

void *thread2(void *junk)

{

        while(i<9)

        {

            pthread_mutex_lock(&mutex);

            if(i%3!=0)

                   pthread_cond_wait(&cond,&mutex);/*等待*/

            printf("thread2:%d/n",i);

            pthread_mutex_unlock(&mutex);

            printf("Down Ulock Mutex/n");

            sleep(1);

         }




}

重点就是:无论是pthread_cond_waitpthread_cond_signal都必须得先pthread_mutex_lock。如果没有这个保护,可能会产生竞态条件,漏掉信号。pthread_cond_wait()函数一进入wait状态会自动释放互斥锁。当其他线程通过pthread_cond_signal或pthread_cond_broadcast 使该线程启动,使 pthread_cond_wait()返回时,该线程又自动获得该互斥锁。


整个过程如下图所示:



1 pthread_mutex_lock


例如,在Linux中,使用了一个Futex(快速用户空间互斥锁的部分)的。


在此系统中,对用户空间中的互斥启动原子和测试操作。


如果操作结果证明锁上没有争用,则对pthread_mutex_lock的调用将返回,而无需可以通信到内核中,因此获得互斥量的操作非常快。


仅当检测到争用时,系统调用(以后futex)才会发生,并且上下文切换到内核中,使这调用进程进入睡眠状态,直到解除互锁直到。


还有很多更多的细节,尤其是可以相信和/或优先级继承反对,但这只是它的本质。


nptl/pthread_mutex_lock.c


int

PTHREAD_MUTEX_LOCK (pthread_mutex_t *mutex)

{

  /* See concurrency notes regarding mutex type which is loaded from __kind

     in struct __pthread_mutex_s in sysdeps/nptl/bits/thread-shared-types.h.  */

  unsigned int type = PTHREAD_MUTEX_TYPE_ELISION (mutex);




  LIBC_PROBE (mutex_entry, 1, mutex);




  if (__builtin_expect (type & ~(PTHREAD_MUTEX_KIND_MASK_NP

                                 | PTHREAD_MUTEX_ELISION_FLAGS_NP), 0))

    return __pthread_mutex_lock_full (mutex);




  if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_NP))

    {

      FORCE_ELISION (mutex, goto elision);

    simple:

      /* Normal mutex.  */

      LLL_MUTEX_LOCK_OPTIMIZED (mutex);

      assert (mutex->__data.__owner == 0);

    }

#if ENABLE_ELISION_SUPPORT

  else if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_ELISION_NP))

    {

  elision: __attribute__((unused))

      /* This case can never happen on a system without elision,

         as the mutex type initialization functions will not

         allow to set the elision flags.  */

      /* Don't record owner or users for elision case.  This is a

         tail call.  */

      return LLL_MUTEX_LOCK_ELISION (mutex);

    }

#endif

  else if (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex)

                             == PTHREAD_MUTEX_RECURSIVE_NP, 1))

    {

      /* Recursive mutex.  */

      pid_t id = THREAD_GETMEM (THREAD_SELF, tid);




      /* Check whether we already hold the mutex.  */

      if (mutex->__data.__owner == id)

        {

          /* Just bump the counter.  */

          if (__glibc_unlikely (mutex->__data.__count + 1 == 0))

            /* Overflow of the counter.  */

            return EAGAIN;




          ++mutex->__data.__count;




          return 0;

        }




      /* We have to get the mutex.  */

      LLL_MUTEX_LOCK_OPTIMIZED (mutex);




      assert (mutex->__data.__owner == 0);

      mutex->__data.__count = 1;

    }

  else if (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex)

                          == PTHREAD_MUTEX_ADAPTIVE_NP, 1))

    {

      if (LLL_MUTEX_TRYLOCK (mutex) != 0)

        {

          int cnt = 0;

          int max_cnt = MIN (max_adaptive_count (),

                             mutex->__data.__spins * 2 + 10);

          do

            {

              if (cnt++ >= max_cnt)

                {

                  LLL_MUTEX_LOCK (mutex);

                  break;

                }

              atomic_spin_nop ();

            }

          while (LLL_MUTEX_TRYLOCK (mutex) != 0);




          mutex->__data.__spins += (cnt - mutex->__data.__spins) / 8;

        }

      assert (mutex->__data.__owner == 0);

    }

  else

    {

      pid_t id = THREAD_GETMEM (THREAD_SELF, tid);

      assert (PTHREAD_MUTEX_TYPE (mutex) == PTHREAD_MUTEX_ERRORCHECK_NP);

      /* Check whether we already hold the mutex.  */

      if (__glibc_unlikely (mutex->__data.__owner == id))

        return EDEADLK;

      goto simple;

    }




  pid_t id = THREAD_GETMEM (THREAD_SELF, tid);




  /* Record the ownership.  */

  mutex->__data.__owner = id;

#ifndef NO_INCR

  ++mutex->__data.__nusers;

#endif




  LIBC_PROBE (mutex_acquired, 1, mutex);




  return 0;

}

pthread_mutex_t 的定义如下:


typedef union

{

  struct __pthread_mutex_s

  {

    int __lock;

    unsigned int __count;

    int __owner;

    unsigned int __nusers;

    int __kind;

    int __spins;

    __pthread_list_t __list;

  } __data;

  ......

} pthread_mutex_t;

__一种类型是指锁的类型,取值如下:


/* Mutex types.  */

enum

{ 

  PTHREAD_MUTEX_TIMED_NP,

  PTHREAD_MUTEX_RECURSIVE_NP,

  PTHREAD_MUTEX_ERRORCHECK_NP,

  PTHREAD_MUTEX_ADAPTIVE_NP

#if defined __USE_UNIX98 || defined __USE_XOPEN2K8

  ,

  PTHREAD_MUTEX_NORMAL = PTHREAD_MUTEX_TIMED_NP,

  PTHREAD_MUTEX_RECURSIVE = PTHREAD_MUTEX_RECURSIVE_NP,

  PTHREAD_MUTEX_ERRORCHECK = PTHREAD_MUTEX_ERRORCHECK_NP,

  PTHREAD_MUTEX_DEFAULT = PTHREAD_MUTEX_NORMAL

#endif

#ifdef __USE_GNU

  /* For compatibility.  */

  , PTHREAD_MUTEX_FAST_NP = PTHREAD_MUTEX_TIMED_NP

#endif

};

其中: 


  • PTHREAD_MU_TIMED_NP,这是一种值,也就是普通锁。 


  • PTHREAD_MUTEX_RECURSIVE_NP允许可重入锁,一个线程对同一个锁成功获得多次,并通过多次解锁重启。


  • PTHREAD_MUTEX_ERRORCHECK_NP,检错锁,如果同一个线程重复请求同一个锁,则返回EDEADLK,否则与PTHREAD_MUTEX_TIMED_NP类型相同。


  • PTHREAD_MUTEX_ADAPTIVE_NP,惯锁,自旋锁与普通锁的混合。 


互斥体默认使用的是 PTHREAD_MUTEX_TIMED_NP,所以会走 LLL_MUTEX_LOCK_OPTIMIZED,这是个宏:


# define LLL_MUTEX_LOCK_OPTIMIZED(mutex) lll_mutex_lock_optimized (mutex)
lll_mutex_lock_optimized (pthread_mutex_t *mutex)

{

  /* The single-threaded optimization is only valid for private

     mutexes.  For process-shared mutexes, the mutex could be in a

     shared mapping, so synchronization with another process is needed

     even without any threads.  If the lock is already marked as

     acquired, POSIX requires that pthread_mutex_lock deadlocks for

     normal mutexes, so skip the optimization in that case as

     well.  */

  int private = PTHREAD_MUTEX_PSHARED (mutex);

  if (private == LLL_PRIVATE && SINGLE_THREAD_P && mutex->__data.__lock == 0)

    mutex->__data.__lock = 1;

  else

    lll_lock (mutex->__data.__lock, private);

}

因为不是LLL_PRIVATE,所以走lll_lock,lll_lock也是个宏:


#define lll_lock(futex, private)        \

  __lll_lock (&(futex), private)

注意这里出现了文字,本文的重点主要是围绕它展开的。


#define __lll_lock(futex, private)                                      \

  ((void)                                                               \

   ({                                                                   \

     int *__futex = (futex);                                            \

     if (__glibc_unlikely                                               \

         (atomic_compare_and_exchange_bool_acq (__futex, 1, 0)))        \

       {                                                                \

         if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \

           __lll_lock_wait_private (__futex);                           \

         else                                                           \

           __lll_lock_wait (__futex, private);                          \

       }                                                                \

   }))

其中,atomic_compare_and_exchange_bool_acq是实验如下通过原子操作实验将__futex(mutex->__data.__lock)从0变为1,如果成功就直接返回了,如果失败,则调用__lll_lock_wait,代码:


void

__lll_lock_wait (int *futex, int private)

{

  if (atomic_load_relaxed (futex) == 2)

    goto futex;




  while (atomic_exchange_acquire (futex, 2) != 0)

    {

    futex:

      LIBC_PROBE (lll_lock_wait, 1, futex);

      futex_wait ((unsigned int *) futex, 2, private); /* Wait if *futex == 2.  */

    }

}

在这里先要说明一下,pthread将futex的锁状态定义为3种:


  • 0,代表当前锁开启无锁,可以进行快速上锁,无需进门。


  • 1,代表有线程抓住当前锁,如果有线程线程需要上锁,就必须标记futex为“锁竞争”,然后通过futex系统调用进内核把当前线程挂起。


  • 2,更换锁竞争,有其他线程将要或正在内核的futex系统中等待锁定。


上锁失败进入到__lll_lock_wait后,先判断futex是不是等于2,那么说明大家都在这里所以如果你也排着吧(直接跳转到futex_wait)。如果不等于2,那说明你是第一个来设置自己的人,把futex 2,告诉对方来的人要结婚,然后以身作则先结婚。

futex_wait 就是调用futex系统调用在第四节,我们就来仔细分析这个系统。


2 pthread_cond_wait


本质也是需要futex系统调用,只有篇幅不展开了。


四面层

什么时候加入内核的?


简单简单,futex 的解决思路是:在无战斗的情况下操作完全在用户空间进行,无需系统调用,仅在发生或者发生的时候就进入昏迷去完成相应的处理(等待醒来)。所以说,futex是一种用户模式和内核模式混合的同步机制,需要两种模式合作才能完成,futex创建位于用户空间,而不是代码内核对象,futex的也分为用户模式和内核模式两部分,无竞争的情况下在用户模式,发生竞争时则通过sys_futex系统调用进入内核模式进行处理。


已经在前面演示用户了,本节重点讲解futex在内核部分的实现。


futex 设计了三个基本数据结构:futex_hash_bucket,futex_key,futex_q。


struct futex_hash_bucket {

        atomic_t waiters;

        spinlock_t lock;

        struct plist_head chain;

} ____cacheline_aligned_in_smp;
struct futex_q {

        struct plist_node list;

        struct task_struct *task;

        spinlock_t *lock_ptr;

        union futex_key key;   //唯一标识uaddr的key值

        struct futex_pi_state *pi_state;

        struct rt_mutex_waiter *rt_waiter;

        union futex_key *requeue_pi_key;

        u32 bitset;

};
union futex_key { 

        struct {

                unsigned long pgoff;

                struct inode *inode;

                int offset;

        } shared;

        struct {

                unsigned long address;

                struct mm_struct *mm;

                int offset;

        } private; 

        struct {

                unsigned long word;

                void *ptr;

                int offset;

        } both;

};

其实还有个struct __futex_data,如下所示,这个


static struct {

        struct futex_hash_bucket *queues;

        unsigned long            hashsize;

} __futex_data __read_mostly __aligned(2*sizeof(long));




#define futex_queues   (__futex_data.queues)

#define futex_hashsize (__futex_data.hashsize)

在futex初始化的时候(futex_init),会确定hashsize,比如24核cpu时,hashsize = 8192。然后根据这个hashsize调用alloc_large_system_hash分配数组空间,并初始化数组元素里的相关属性,比如plist_head,lock。


static int __init futex_init(void)

{

        unsigned int futex_shift;

        unsigned long i;




#if CONFIG_BASE_SMALL

        futex_hashsize = 16;

#else

        futex_hashsize = roundup_pow_of_two(256 * num_possible_cpus());

#endif




        futex_queues = alloc_large_system_hash("futex", sizeof(*futex_queues),

                                               futex_hashsize, 0,

                                               futex_hashsize < 256 ? HASH_SMALL : 0,

                                               &futex_shift, NULL,

                                               futex_hashsize, futex_hashsize);

        futex_hashsize = 1UL << futex_shift;




        futex_detect_cmpxchg();




        for (i = 0; i < futex_hashsize; i++) {

                atomic_set(&futex_queues[i].waiters, 0);

                plist_head_init(&futex_queues[i].chain);

                spin_lock_init(&futex_queues[i].lock);

        }




        return 0;

}

这些数据结构之间的关系如下所示:



脑子里有了数据结构,流程就容易理解了。futex_wait的总体流程如下:



static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,

                      ktime_t *abs_time, u32 bitset)

{

        struct hrtimer_sleeper timeout, *to = NULL;

        struct restart_block *restart;

        struct futex_hash_bucket *hb;

        struct futex_q q = futex_q_init;

        int ret;




        if (!bitset)

                return -EINVAL;

        q.bitset = bitset;




        if (abs_time) {

                to = &timeout;

                hrtimer_init_on_stack(&to->timer, (flags & FLAGS_CLOCKRT) ?

                                      CLOCK_REALTIME : CLOCK_MONOTONIC,

                                      HRTIMER_MODE_ABS);

                hrtimer_init_sleeper(to, current);

                hrtimer_set_expires_range_ns(&to->timer, *abs_time,

                                             current->timer_slack_ns);

        }




retry:

        /*

         * Prepare to wait on uaddr. On success, holds hb lock and increments

         * q.key refs.

         */

        ret = futex_wait_setup(uaddr, val, flags, &q, &hb);

        if (ret)

                goto out;




        /* queue_me and wait for wakeup, timeout, or a signal. */

        futex_wait_queue_me(hb, &q, to);




        /* If we were woken (and unqueued), we succeeded, whatever. */

        ret = 0;

        /* unqueue_me() drops q.key ref */

        if (!unqueue_me(&q))

                goto out;

        ret = -ETIMEDOUT;

        if (to && !to->task)

                goto out;




        /*

         * We expect signal_pending(current), but we might be the

         * victim of a spurious wakeup as well.

         */

        if (!signal_pending(current))

                goto retry;




        ret = -ERESTARTSYS;

        if (!abs_time)

                goto out;




        restart = ¤t->restart_block;

        restart->fn = futex_wait_restart;

        restart->futex.uaddr = uaddr;

        restart->futex.val = val;

        restart->futex.time = *abs_time;

        restart->futex.bitset = bitset;

        restart->futex.flags = flags | FLAGS_HAS_TIMEOUT;




        ret = -ERESTART_RESTARTBLOCK;




out:

        if (to) {

                hrtimer_cancel(&to->timer);

                destroy_hrtimer_on_stack(&to->timer);

        }

        return ret;

}

函数futex_wait_setup主要做两件事,一是对uaddr进行hash,找到futex_hash_bucket并获取它上面的自旋锁,二是判断*uaddr是否为预期值。如果没有被录取立即返回,由用户状态继续trylock。


*

 * futex_wait_setup() - Prepare to wait on a futex

 * @uaddr:      the futex userspace address

 * @val:        the expected value

 * @flags:      futex flags (FLAGS_SHARED, etc.)

 * @q:          the associated futex_q

 * @hb:         storage for hash_bucket pointer to be returned to caller

 *

 * Setup the futex_q and locate the hash_bucket.  Get the futex value and

 * compare it with the expected value.  Handle atomic faults internally.

 * Return with the hb lock held and a q.key reference on success, and unlocked

 * with no q.key reference on failure.

 *

 * Return:

 *  -  0 - uaddr contains val and hb has been locked;

 *  - <1 - -EFAULT or -EWOULDBLOCK (uaddr does not contain val) and hb is unlocked

 */

static int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags,

                           struct futex_q *q, struct futex_hash_bucket **hb)

{

        u32 uval;

        int ret;        

retry:

        //初始化futex_q, 把uaddr设置到futex_key的字段中,将来futex_wake时也是通过这个key来查找futex。

        ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &q->key, VERIFY_READ);

        if (unlikely(ret != 0))

                return ret;




retry_private:

        //根据key计算hash,然后在数组里找到对应的futex_hash_bucket

        *hb = queue_lock(q);

        //原子地将uaddr的值读到uval中

        ret = get_futex_value_locked(&uval, uaddr);




        if (ret) {

                queue_unlock(*hb);




                ret = get_user(uval, uaddr);

                if (ret)

                        goto out;




                if (!(flags & FLAGS_SHARED))

                        goto retry_private;




                put_futex_key(&q->key);

                goto retry;

        }

        //如果当前uaddr指向的值不等于val,即说明其他进程修改了

  //uaddr指向的值,等待条件不再成立,不用阻塞直接返回。

       if (uval != val) {

                queue_unlock(*hb);

                ret = -EWOULDBLOCK;

        }




out:

        if (ret)

                put_futex_key(&q->key);

        return ret;

}

然后调用futex_wait_queue_me 把当前进程挂起:


/**

 * futex_wait_queue_me() - queue_me() and wait for wakeup, timeout, or signal

 * @hb:         the futex hash bucket, must be locked by the caller

 * @q:          the futex_q to queue up on

 * @timeout:    the prepared hrtimer_sleeper, or null for no timeout

 */

static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,

                                struct hrtimer_sleeper *timeout)

{

        /*

         * The task state is guaranteed to be set before another task can

         * wake it. set_current_state() is implemented using smp_store_mb() and

         * queue_me() calls spin_unlock() upon completion, both serializing

         * access to the hash list and forcing another memory barrier.

         */

        set_current_state(TASK_INTERRUPTIBLE);

        queue_me(q, hb);




        /* Arm the timer */

        if (timeout)

                hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);




        /*

         * If we have been removed from the hash list, then another task

         * has tried to wake us, and we can skip the call to schedule().

         */

        if (likely(!plist_node_empty(&q->list))) {

                /*

                 * If the timer has already expired, current will already be

                 * flagged for rescheduling. Only call schedule if there

                 * is no timeout, or if it has yet to expire.

                 */

                if (!timeout || timeout->task)

                        freezable_schedule();

        }

        __set_current_state(TASK_RUNNING);

}

futex_wait_queue_me我主要做几件事:


  1. 将当前线程插入到延迟,就是挂到futex_hash_bucket上
  2. 启动定时任务
  3. 主动触发进程调度


五总结

本文主要是对JAVA中的ReentrantLock.lock流程进行了自上而下的极限。

相关推荐

快递查询教程,批量查询物流,一键管理快递

作为商家,每天需要查询许许多多的快递单号,面对不同的快递公司,有没有简单一点的物流查询方法呢?小编的回答当然是有的,下面随小编一起来试试这个新技巧。需要哪些工具?安装一个快递批量查询高手快递单号怎么快...

一键自动查询所有快递的物流信息 支持圆通、韵达等多家快递

对于各位商家来说拥有一个好的快递软件,能够有效的提高自己的工作效率,在管理快递单号的时候都需要对单号进行表格整理,那怎么样能够快速的查询所有单号信息,并自动生成表格呢?1、其实方法很简单,我们不需要一...

快递查询单号查询,怎么查物流到哪了

输入单号怎么查快递到哪里去了呢?今天小编给大家分享一个新的技巧,它支持多家快递,一次能查询多个单号物流,还可对查询到的物流进行分析、筛选以及导出,下面一起来试试。需要哪些工具?安装一个快递批量查询高手...

3分钟查询物流,教你一键批量查询全部物流信息

很多朋友在问,如何在短时间内把单号的物流信息查询出来,查询完成后筛选已签收件、筛选未签收件,今天小编就分享一款物流查询神器,感兴趣的朋友接着往下看。第一步,运行【快递批量查询高手】在主界面中点击【添...

快递单号查询,一次性查询全部物流信息

现在各种快递的查询方式,各有各的好,各有各的劣,总的来说,还是有比较方便的。今天小编就给大家分享一个新的技巧,支持多家快递,一次能查询多个单号的物流,还能对查询到的物流进行分析、筛选以及导出,下面一起...

快递查询工具,批量查询多个快递快递单号的物流状态、签收时间

最近有朋友在问,怎么快速查询单号的物流信息呢?除了官网,还有没有更简单的方法呢?小编的回答当然是有的,下面一起来看看。需要哪些工具?安装一个快递批量查询高手多个京东的快递单号怎么快速查询?进入快递批量...

快递查询软件,自动识别查询快递单号查询方法

当你拥有多个快递单号的时候,该如何快速查询物流信息?比如单号没有快递公司时,又该如何自动识别再去查询呢?不知道如何操作的宝贝们,下面随小编一起来试试。需要哪些工具?安装一个快递批量查询高手快递单号若干...

教你怎样查询快递查询单号并保存物流信息

商家发货,快递揽收后,一般会直接手动复制到官网上一个个查询物流,那么久而久之,就会觉得查询变得特别繁琐,今天小编给大家分享一个新的技巧,下面一起来试试。教程之前,我们来预览一下用快递批量查询高手...

简单几步骤查询所有快递物流信息

在高峰期订单量大的时候,可能需要一双手当十双手去查询快递物流,但是由于逐一去查询,效率极低,追踪困难。那么今天小编给大家分享一个新的技巧,一次能查询多个快递单号的物流,下面一起来学习一下,希望能给大家...

物流单号查询,如何查询快递信息,按最后更新时间搜索需要的单号

最近有很多朋友在问,如何通过快递单号查询物流信息,并按最后更新时间搜索出需要的单号呢?下面随小编一起来试试吧。需要哪些工具?安装一个快递批量查询高手快递单号若干怎么快速查询?运行【快递批量查询高手】...

连续保存新单号功能解析,导入单号查询并自动识别批量查快递信息

快递查询已经成为我们日常生活中不可或缺的一部分。然而,面对海量的快递单号,如何高效、准确地查询每一个快递的物流信息,成为了许多人头疼的问题。幸运的是,随着科技的进步,一款名为“快递批量查询高手”的软件...

快递查询教程,快递单号查询,筛选更新量为1的单号

最近有很多朋友在问,怎么快速查询快递单号的物流,并筛选出更新量为1的单号呢?今天小编给大家分享一个新方法,一起来试试吧。需要哪些工具?安装一个快递批量查询高手多个快递单号怎么快速查询?运行【快递批量查...

掌握批量查询快递动态的技巧,一键查找无信息记录的两种方法解析

在快节奏的商业环境中,高效的物流查询是确保业务顺畅运行的关键。作为快递查询达人,我深知时间的宝贵,因此,今天我将向大家介绍一款强大的工具——快递批量查询高手软件。这款软件能够帮助你批量查询快递动态,一...

从复杂到简单的单号查询,一键清除单号中的符号并批量查快递信息

在繁忙的商务与日常生活中,快递查询已成为不可或缺的一环。然而,面对海量的单号,逐一查询不仅耗时费力,还容易出错。现在,有了快递批量查询高手软件,一切变得简单明了。只需一键,即可搞定单号查询,一键处理单...

物流单号查询,在哪里查询快递

如果在快递单号多的情况,你还在一个个复制粘贴到官网上手动查询,是一件非常麻烦的事情。于是乎今天小编给大家分享一个新的技巧,下面一起来试试。需要哪些工具?安装一个快递批量查询高手快递单号怎么快速查询?...

取消回复欢迎 发表评论: