百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分析 > 正文

JUC并发—4.wait和notify以及Atomic原理一

liebian365 2025-03-12 16:29 1 浏览 0 评论

大纲

1.wait()与notify()实现一个简易的内存队列

2.wait()与notify()的底层原理

3.分布式存储系统NameNode机制介绍

4.分布式存储系统的edits log机制介绍

5.分布式存储系统的NameNode实现

6.分布式存储系统的创建目录功能的实现

7.edits log的全局txid机制和双缓冲机制实现

8.synchronized实现edits log分段加锁机制

9.wait()与notify()实现edits log批量刷磁盘

10.i++和AtomicInteger分别实现并发安全

11.AtomicInteger中的CAS无锁化原理

12.Atomic源码之仅限JDK使用的Unsafe类

13.Atomic源码之无限重复循环以及CAS操作

14.Atomic原子类基于CAS操作的三大问题

15.AtomicLong优化服务注册中心心跳计数器

16.LongAdder的分段CAS优化多线程自旋

17.LongAdder的分段CAS优化心跳计数器

18.服务注册中心的增量拉取机制

19.AtomicReference优化客户端缓存注册表

20.AtomicStampedReference解决ABA问题

21.AtomicLong多线程拉取注册表版本不错乱


1.wait()与notify()实现一个简易的内存队列

在多线程开发中,wait()和notify()/notifyAll()还是挺常见的。在分布式系统里经常会使用wait()和notifyAll()来进行线程通信,当某个线程处于阻塞等待状态时,其他线程可以进行通知并唤醒它。


如下代码向内存队列添加元素和获取元素时,都使用了MyQueue对象锁。当内存队列满或者空时,需要释放锁,才能让添加或者获取继续下去。其中wait()方法会释放锁,并让当前线程进入等待状态,而notify()方法和notifyAll()方法会唤醒等待获取锁的线程。所以wait()和notify()主要是用来控制线程的,当然也可认为用于线程通信。

public class MyQueue {
    private final static int MAX_SIZE = 100;
    private LinkedList queue = new LinkedList();
   
    //向内存队列添加一个元素
    public synchronized void offer(String element) {
        try {
            if (queue.size() == MAX_SIZE) {
                //一个线程只要执行到这一步,就说明已经获取到锁
                //但现在内存队列已经满了,所以可以让线程进入一个等待的状态,并释放锁
                wait();
            }
            queue.addLast(element);
            //唤醒当前在等待锁的线程
            notifyAll();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //获取内存队列的第一个元素
    public synchronized String take() {
        //别的线程可以通过take()方法从队列里获取数据
        String element = null;
        try {
            if (queue.size() == 0) {
                //释放锁,并让当前线程自己进行阻塞等待
                //等待其他线程往内存队列放入数据后,通过notifyAll()来唤醒自己
                wait();
            }
            element = queue.removeFirst();
            //唤醒当前在等待锁的线程
            notifyAll();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return element;
    }
}


2.wait()与notify()的底层原理

(1)获取重量级锁之前的锁膨胀

(2)ObjectMonitor对象的重要字段

(3)重量级锁的获取流程

(4)重量级锁的释放流程

(5)wait()与notify()的底层原理

(6)wait()与notify()在使用时的注意事项


(1)获取重量级锁之前的锁膨胀

如果线程在运行synchronized修饰的同步块代码时,发现锁状态是轻量级锁并且有其他线程抢占了锁资源,那么该线程就会触发锁膨胀升级到重量级锁。


在获取重量级锁之前会先实现锁膨胀,锁膨胀时首先会创建一个ObjectMonitor对象,然后把ObjectMonitor对象的指针保存到锁对象的Mark Word中。


重量级锁的实现是在ObjectMonitor中完成的,所以锁膨胀的意义就是构建一个ObjectMonitor对象


(2)ObjectMonitor对象的重要字段

_owner:保存当前持有锁的线程

_cxq:没有获得锁的线程队列

_waitset:被wait()方法阻塞的线程队列

_recursions:锁被重入的次数


(3)重量级锁的获取流程

重量级锁的竞争都是在ObjectMonitor对象中完成的。首先判断当前线程是否是重入,如果是则重入次数+1。然后通过CAS自旋来判断ObjectMonitor中的_owner字段是否为空。如果为空,则表示重量级锁已经被释放,当前线程可以获得锁。如果不为空,就继续进行自适应自旋重试。最后如果通过自旋竞争锁失败,则把当前线程构建成一个ObjectWaiter结点,插入到ObjectMonitor的_cxq队列的队头,再调用park()方法阻塞当前线程。


(4)重量级锁的释放流程

首先把ObjectMonitor的_owner字段设置为null,然后从ObjectMonitor的_cxq队列调用unpark()方法唤醒一个阻塞的线程。被唤醒的线程会重新竞争重量级锁,如果没抢到,则继续阻塞等待。因为synchronized是非公平锁,被唤醒的线程不一定能重新抢占到锁。


(5)wait()与notify()的底层原理

这与synchronized的原理(ObjectMonitor对象)相关,ObjectMonitor对象有一个_waitset队列重入计数器使用wait()和notify()时必须对同一个对象实例进行加synchronized锁。如果对象实例加锁,那么重入计数器 + 1。如果对象实例释放锁,那么重入计数器 - 1。


执行wait()方法时会释放锁 + 阻塞当前线程 + 把当前线程放入_waitset队列,执行notify()方法时会唤醒_waitset队列里的被阻塞的线程


(6)wait()与notify()在使用时的注意事项

wait()与sleep()的区别:两者都会等待,前者释放锁后者不释放锁。wait()必须要有其他线程调用notify()来唤醒它。wait(timeout)会阻塞一段时间,然后自己唤醒自己,继续争抢锁。wait()与notify()必须与synchornized一起对同一个对象进行使用。notify()会唤醒阻塞状态的一个线程,notifyall()会唤醒阻塞状态的所有线程


3.分布式存储系统NameNode机制介绍

(1)HDFS的DataNode和NameNode

(2)HDFS的NameNode架构简介


(1)HDFS的DataNode和NameNode

HDFS是Hadoop的分布式文件系统,它由很多机器组成。每台机器上运行一个DataNode进程,存储一部分数据。然后会有一台机器上运行一个NameNode进程,NameNode可以认为是负责管理整个HDFS集群的进程,NameNode里存储了HDFS集群的所有元数据。


(2)HDFS的NameNode架构简介

一.每次修改元数据都顺序追加edits log

二.如何避免edits log过大导致恢复过慢

三.NameNode主备高可用故障转移机制


一.每次修改元数据都顺序追加edits log

NameNode的核心功能管理整个HDFS集群的元数据,比如文件目录树权限设置副本数设置等。


HDFS客户端每次上传文件时,都要维护NameNode的文件目录树。但是NameNode的文件目录树是在内存里的,万一NameNode宕机,内存里的文件目录树可能就会丢失。


所以每次修改内存就顺序追加一条edits log(元数据操作日志)到磁盘文件。每次NameNode重启,就把edits log(元数据操作日志)读到内存恢复数据。


二.如何避免edits log过大导致恢复过慢

为了避免edits log(元数据操作日志)越来越大每次重启恢复过慢,于是引入了一个新的磁盘文件fsimage一个JournalNodes集群一个Active NameNode(主节点)一个Standby NameNode(备节点)


主节点每修改一条元数据都会生成一条edits log。每条edits log除了写到主节点外,还会写到JournalNodes集群。然后备节点会从JournalNodes集群拉取edits log到自己内存的文件目录树里,这样备节点的数据就可以跟主节点的数据保持一致了。


每隔一段时间备节点会把自己内存的文件目录树写一份到fsimage磁盘文件,这个也就是所谓的checkpoint检查点操作。然后备节点再把这个fsimage磁盘文件上传到到主节点,接着清空掉主节点上的旧的edits log文件(可能几十万行)。之后主节点继续处理修改元数据请求,那么可能只有几十行edits log日志了。


如果此时主节点重启,首先把备节点传过来的fsimage文件读到内存里,然后把新的edits log里少量的几十行操作日志重新恢复到内存中即可。


三.NameNode主备高可用故障转移机制

整个过程有两个NameNode:一是对外提供服务接收请求主节点NameNode,二是同步主节点edits log + 定期执行checkpoint备节点NameNode


这两个NameNode内存里的元数据几乎一模一样。所以如果主节点挂了,可以马上切换到备节点对外提供服务,而这就是所谓的NameNode主备高可用故障转移机制了。


4.分布式存储系统的edits log机制介绍

(1)高并发请求下NameNode会遇到的问题

(2)通过双缓冲机制来提升写edits log的性能


(1)高并发请求下NameNode会遇到的问题

NameNode每修改一条元数据都要写一条edits log,这包括两个步骤:写入本地磁盘通过网络传输给JournalNodes集群


NameNode必须保证写入的每条edits log都有一个全局顺序递增的txid,这样才可以标识出一条edits log的先后顺序。


如果要保证每条edits log的txid都是递增的,那么就必须要加锁。每个线程修改完元数据准备写一条edits log时,按顺序排队获取锁,获取到锁之后才能生成一个递增的txid给要准备写的edits log。


但是如果每次在加锁的代码块里生成txid,然后写磁盘文件edits log,接着通过网络传输写入JournalNodes,那么性能就一定很低。所以每个线程写edits log时最好不要串行化排队来执行这3个操作生成txid + 写磁盘 + 写JournalNode


(2)通过双缓冲机制来提升写edits log的性能

为了避免线程写edits log时串行化排队去生成txid + 写磁盘 + 写JournalNode,可以考虑增加内存缓冲。首先将edits log写入到内存缓冲里,然后通过后台线程将内存中的edits log刷入磁盘 + 写入JournalNode。而且将edits log刷盘的过程中,其他线程依然可以将edits log写入内存缓冲


如果针对同一块内存缓冲,同时有线程写入、同时有线程读取后刷入磁盘,那么是会存在并发读写问题的,因为不能并发读写一块共享内存数据。


所以HDFS采取了双缓冲机制来处理,也就是将一块内存缓冲分成两部分。其中一部分只用来写入另一部分只用来读取进行刷盘


5.分布式存储系统的NameNode实现

(1)NameNode的基本功能

(2)NameNode的核心启动类

(3)NameNode的RPC服务接口

(4)负责管理元数据的FSNamesystem

(5)负责管理文件目录树的FSDirectory

(6)负责管理edits log日志的FSEditlog


(1)NameNode的基本功能

如果NameNode执行命令创建一个目录,那么会做两件事情:一是在内存里的文件目录树中加入目录节点,二是在磁盘里写入一条edits log日志来记录本次元数据修改


所以接下来要实现两个功能:一是在内存文件目录树中加入目录节点,二是写edits log到磁盘文件。


如下是NameNode的核心组件说明:

FSNamesystem类:作为NameNode里元数据操作的核心入口,负责管理所有的元数据的操作,会调用其他组件完成相关事情。

FSDirectory类:管理内存中的文件目录树。

FSEditLog类:写入edits log到磁盘文件里。


(2)NameNode的核心启动类

//NameNode核心启动类
public class NameNode {
    //NameNode是否在运行
    private volatile Boolean shouldRun;
  
    //负责管理元数据的核心组件
    private FSNamesystem namesystem;
  
    //NameNode对外提供RPC接口的Server,可以响应请求
    private NameNodeRpcServer rpcServer;
  
    public NameNode() {
        this.shouldRun = true;
    }
  
    //初始化NameNode
    private void initialize() {
        this.namesystem = new FSNamesystem();
        this.rpcServer = new NameNodeRpcServer(this.namesystem);  
        this.rpcServer.start();
    }
  
    //让NameNode运行起来
    private void run() {
        try {
            while(shouldRun) {
                Thread.sleep(10000);  
            }  
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
  
    public static void main(String[] args) throws Exception {
        NameNode namenode = new NameNode();
        namenode.initialize();
        namenode.run();
    }
}

(3)NameNode的RPC服务接口

//NameNode的rpc服务的接口
public class NameNodeRpcServer {
    //负责管理元数据的核心组件
    private FSNamesystem namesystem;
  
    public NameNodeRpcServer(FSNamesystem namesystem) {
        this.namesystem = namesystem;
    }
  
    //创建目录
    public Boolean mkdir(String path) throws Exception {
        return this.namesystem.mkdir(path);
    }

    //启动这个rpc server
    public void start() {
        System.out.println("开始监听指定的rpc server的端口号,来接收请求");  
    }
}

(4)负责管理元数据的FSNamesystem

//负责管理元数据的核心组件
public class FSNamesystem {
    //负责管理内存文件目录树的组件
    private FSDirectory directory;
  
    //负责管理edits log写入磁盘的组件
    private FSEditlog editlog;
  
    public FSNamesystem() {
        this.directory = new FSDirectory();
        this.editlog = new FSEditlog();
    }
  
    //创建目录
    public Boolean mkdir(String path) throws Exception {
        this.directory.mkdir(path); 
        this.editlog.logEdit("创建了一个目录:" + path);   
        return true;
    }
}

(5)负责管理文件目录树的FSDirectory

//负责管理内存中的文件目录树的核心组件
public class FSDirectory {
    //创建目录
    public void mkdir(String path) {
    }
}

(6)负责管理edits log日志的FSEditlog

//负责管理edits log日志的核心组件
public class FSEditlog {
    //记录edits log日志
    public void logEdit(String log) {
    }
}


6.分布式存储系统的创建目录功能实现

内存的文件目录树创建一个目录节点的代码如下。内存里的文件目录树是会被多线程并发写的资源,所以创建目录的代码块必须要用synchronized保护起来。

//负责管理内存中的文件目录树的核心组件
public class FSDirectory {
    //内存中的文件目录树
    private INodeDirectory dirTree;
    
    public FSDirectory() {
        this.dirTree = new INodeDirectory("/");  
    }
    
    //创建目录
    public void mkdir(String path) {
        //path = /usr/warehouse/hive
        //首先判断'/'根目录下有没有一个'usr'目录
        //如果有,那么再判断'/usr'目录下有没有一个'/warehouse'目录
        //如果没有,那么就得先在'/usr'目录下创建一个'/warehosue'目录
        //接着再在'/warehosue'目录下,创建'hive'这个目录节点
        synchronized(dirTree) {
            String[] pathes = path.split("/");
            INodeDirectory parent = dirTree;
            for (String splitedPath : pathes) {
                if (splitedPath.trim().equals("")) {
                    continue;
                }
                INodeDirectory dir = findDirectory(parent, splitedPath);
                if (dir != null) {
                    parent = dir;
                    continue;
                }
                INodeDirectory child = new INodeDirectory(splitedPath); 
                parent.addChild(child);  
            }
        }
    }
    
    //对文件目录树递归查找目录
    private INodeDirectory findDirectory(INodeDirectory dir, String path) {
        if (dir.getChildren().size() == 0) {
            return null;
        }
        INodeDirectory resultDir = null;
        for (INode child : dir.getChildren()) {
            if (child instanceof INodeDirectory) {
                INodeDirectory childDir = (INodeDirectory) child;
                if ((childDir.getPath().equals(path))) {
                    return childDir;
                }
                resultDir = findDirectory(childDir, path);
                if (resultDir != null) {
                    return resultDir;
                }
            }
        }
        return null;
    }
   
    //代表的是文件目录树中的一个节点
    private interface INode {
      
    }
    
    //代表文件目录树中的一个目录
    public static class INodeDirectory implements INode {
        private String path;
        private List children;
        public INodeDirectory(String path) {
            this.path = path;
            this.children = new LinkedList();
        }
        public void addChild(INode inode) {
            this.children.add(inode);
        }
        public String getPath() {
            return path;
        }
        public void setPath(String path) {
            this.path = path;
        }
        public List getChildren() {
            return children;
        }
        public void setChildren(List children) {
            this.children = children;
        }
    }
    
    //代表文件目录树中的一个文件
    public static class INodeFile implements INode {
        private String name;
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
    }
}


7.edits log的全局txid机制和双缓冲机制实现

全局txid机制 + 双缓冲机制的代码如下:

//负责管理edits log日志的核心组件
public class FSEditlog {
    //当前递增到的txid的序号
    private long txidSeq = 0L;
    
    //内存双缓冲区
    private DoubleBuffer editLogBuffer = new DoubleBuffer();
    
    //记录edits log日志
    public void logEdit(String content) {
        //这里必须加锁
        synchronized(this) {
            //获取全局唯一递增的txid,代表了edits log的序号
            txidSeq++;
            long txid = txidSeq;
            //构造一条edits log对象
            EditLog log = new EditLog(txid, content);
            //将edits log写入内存缓冲中,不是直接刷入磁盘文件
            editLogBuffer.write(log);  
        }
    }
   
    //代表了一条edits log,内部类
    private class EditLog {
        long txid;
        String content;
        public EditLog(long txid, String content) {
            this.txid = txid;
            this.content = content;
        }
    }
   
    //内存双缓冲,内部类
    private class DoubleBuffer {
        //专门用来承载线程写入edits log
        LinkedList currentBuffer = new LinkedList();

        //专门用来将数据同步到磁盘中去的一块缓冲
        LinkedList syncBuffer = new LinkedList();

        //将edits log写到内存缓冲里去
        public void write(EditLog log) {
            currentBuffer.add(log);
        }

        //交换两块缓冲区,为同步内存数据到磁盘做准备
        public void setReadyToSync() {
            LinkedList tmp = currentBuffer;
            currentBuffer = syncBuffer;
            syncBuffer = tmp;
        }

        //获取sync buffer缓冲区里的最大的一个txid
        public Long getSyncMaxTxid() {
            return syncBuffer.getLast().txid;
        }

        //将syncBuffer缓冲区中的数据刷入磁盘中
        public void flush() {
            for (EditLog log : syncBuffer) {
                System.out.println("将edit log写入磁盘文件中:" + log); 
                //正常来说,就是用文件输出流将数据写入磁盘文件中
            }
            syncBuffer.clear();  
        }
    }
}


8.synchronized实现edits log分段加锁机制

logSync()方法通过分两段加synchronized锁,将耗时的刷盘操作放在锁外,然后通过更改标志位以及使用wait()和notify()来控制线程的等待和锁的释放,从而保证高并发下的刷盘性能。

//负责管理edits log日志的核心组件
public class FSEditlog {
    //当前递增到的txid的序号
    private long txidSeq = 0L;
    //内存双缓冲区
    private DoubleBuffer editLogBuffer = new DoubleBuffer();
    //当前是否在将内存缓冲刷入磁盘中
    private volatile Boolean isSyncRunning = false;
    //当前是否有线程在等待刷新下一批edits log到磁盘里去
    private volatile Boolean isWaitSync = false;
    //在同步到磁盘中的最大的一个txid
    private volatile Long syncMaxTxid = 0L;
    //每个线程自己本地的txid副本
    private ThreadLocal localTxid = new ThreadLocal();
   
    //记录edits log日志
    public void logEdit(String content) {
        //这里直接加锁,有线程执行logSync()方法时这里没有其他线程能进来
        synchronized(this) {
            //获取全局唯一递增的txid,代表了edits log的序号
            txidSeq++;
            long txid = txidSeq;
            localTxid.set(txid);
            //构造一条edits log对象
            EditLog log = new EditLog(txid, content);
            //将edits log写入内存缓冲中,不是直接刷入磁盘文件
            editLogBuffer.write(log);  
        }
        //尝试允许某一个执行logEdit()方法的线程,一次性将内存缓冲中的数据刷入到磁盘文件中
        logSync();
    }
   
    //将内存缓冲中的数据刷入磁盘文件中
    //在这里尝试允许某一个线程一次性将内存缓冲中的数据刷入到磁盘文件中
    //相当于批量将内存缓冲的数据刷入磁盘
    private void logSync() {
        //再次尝试加锁,只有一个线程能进来,这个过程很快,纳秒级别,这里属于第一段加锁
        synchronized(this) {
            //如果当前正好有线程在刷内存缓冲到磁盘中去
            if (isSyncRunning) {
                ...
            }

            //交换两块缓冲区
            editLogBuffer.setReadyToSync();

            //然后保存当前要同步到磁盘中的最大txid,此时editLogBuffer中的syncBuffer在交换完以后可能有多条数据
            //而且里面的edits log的txid一定是从小到大的,此时要同步的txid = 6,7,8,9,10,11,12,所以syncMaxTxid = 12
            syncMaxTxid = editLogBuffer.getSyncMaxTxid();

            //设置当前正在同步到磁盘的标志位
            isSyncRunning = true;
        }
      
        //释放锁,开始同步内存缓冲的数据到磁盘文件里去
        //这个过程其实是比较慢,基本上肯定是毫秒级了,弄不好就要几十毫秒
        editLogBuffer.flush();  
      
        //这里属于另外一段加锁
        synchronized(this) {
            //同步完了磁盘之后,就会将标志位复位,再释放锁
            isSyncRunning = false;
            //唤醒可能正在等待他同步完磁盘的线程
            notifyAll();
        }
    }
    ...
}


9.wait()与notify()实现edits log批量刷磁盘

//负责管理edits log日志的核心组件
public class FSEditlog {
    //当前递增到的txid的序号
    private long txidSeq = 0L;
    //内存双缓冲区
    private DoubleBuffer editLogBuffer = new DoubleBuffer();
    //当前是否在将内存缓冲刷入磁盘中
    private volatile Boolean isSyncRunning = false;
    //当前是否有线程在等待刷新下一批edits log到磁盘里去
    private volatile Boolean isWaitSync = false;
    //在同步到磁盘中的最大的一个txid
    private volatile Long syncMaxTxid = 0L;
    //每个线程自己本地的txid副本
    private ThreadLocal localTxid = new ThreadLocal();
   
    //记录edits log日志
    public void logEdit(String content) {
        //这里直接加锁,有线程执行logSync()方法时这里没有其他线程能进来
        synchronized(this) {
            //获取全局唯一递增的txid,代表了edits log的序号
            txidSeq++;
            long txid = txidSeq;
            localTxid.set(txid);
            //构造一条edits log对象
            EditLog log = new EditLog(txid, content);
            //将edits log写入内存缓冲中,不是直接刷入磁盘文件
            editLogBuffer.write(log);  
        }
        //尝试允许某一个执行logEdit()方法的线程,一次性将内存缓冲中的数据刷入到磁盘文件中
        logSync();
    }
   
    //将内存缓冲中的数据刷入磁盘文件中
    //在这里尝试允许某一个线程一次性将内存缓冲中的数据刷入到磁盘文件中
    //相当于批量将内存缓冲的数据刷入磁盘
    private void logSync() {
        //再次尝试加锁,只有一个线程能进来,这个过程很快,纳秒级别,这里属于第一段加锁
        synchronized(this) {
            //如果当前正好有线程在刷内存缓冲到磁盘中去
            if (isSyncRunning) {
                //假如某个线程正在把txid = 6,7,8,9,10,11,12的edits log从syncBuffer刷入磁盘
                //此时syncMaxTxid = 12,代表的是正在刷入磁盘的最大txid
                //那么刷盘的线程释放锁进行刷盘后,这时来一个线程对应的txid = 10,此时它可以直接返回
                //因为它对应的edits log被刷盘的线程正在刷入或者已经刷入磁盘了,这时txid = 12的线程就不需要等待
                long txid = localTxid.get();
                if (txid <= syncMaxTxid) {
                    return;
                }
   
                //此时如果来的是一个txid = 13的线程,那么就会发现已经有线程在等待刷下一批数据到磁盘,此时会直接返回
                if (isWaitSync) {
                    return;
                }

                //此时如果来的是一个txid = 14的线程,并且刷盘还没刷完,
                //那么就在这里等待或者成为下一个刷盘的线程,只有一个线程在等
                isWaitSync = true;
                while (isSyncRunning) {
                    try {
                        wait(2000);//释放锁并自己等2秒或者等别人唤醒
                    } catch (Exception e) {
                        e.printStackTrace();  
                    }
                }
                isWaitSync = false;
            }

            //交换两块缓冲区
            editLogBuffer.setReadyToSync();

            //然后保存当前要同步到磁盘中的最大txid,此时editLogBuffer中的syncBuffer在交换完以后可能有多条数据
            //而且里面的edits log的txid一定是从小到大的,此时要同步的txid = 6,7,8,9,10,11,12,所以syncMaxTxid = 12
            syncMaxTxid = editLogBuffer.getSyncMaxTxid();

            //设置当前正在同步到磁盘的标志位
            isSyncRunning = true;
        }
      
        //释放锁,开始同步内存缓冲的数据到磁盘文件里去
        //这个过程其实是比较慢,基本上肯定是毫秒级了,弄不好就要几十毫秒
        editLogBuffer.flush();  
      
        //这里属于另外一段加锁
        synchronized(this) {
            //同步完了磁盘之后,就会将标志位复位,再释放锁
            isSyncRunning = false;
            //唤醒可能正在等待他同步完磁盘的线程
            notifyAll();
        }
    }
    ...
}


10.i++和AtomicInteger分别实现并发安全

public class AtomicIntegerDemo {
    static Integer i = 0;
    static AtomicInteger j = new AtomicInteger(0); 

    public static void main(String[] args) {
        synchronizedAdd();
        atomicAdd();
    }

    private static void synchronizedAdd() {
        for (int i = 0; i < 10; i++) {
            new Thread() {
                public void run() {
                    //10个线程就要串行依次的一个一个进入锁代码块,然后依次对i变量进行++的操作
                    //每次操作完i++,就写回到主存,下一个线程间进行从主存来加载,再次i++
                    synchronized(AtomicIntegerDemo.class) {
                        System.out.println(++AtomicIntegerDemo.i);
                    }
                }
            }.start();
        }
    }

    private static void atomicAdd() {
        for (int i = 0; i < 10; i++) {
            new Thread() {
                public void run() {
                    //通过atomic实现多线程并发安全
                    System.out.println(AtomicIntegerDemo.j.incrementAndGet()); 
                }
            }.start();
        }
    }
}


11.AtomicInteger中的CAS无锁化原理

(1)CAS简介

(2)Atomic原子类简介


(1)CAS简介

CAS就是Compare and Set,首先判断此时内存中是否是某个值。如果是则修改,如果不是则重新查询最新的值,再执行判断。


(2)Atomic原子类简介

Atomic的原子类分别有:

AtomicInteger、AtomicLong、AtomicBoolean、AtomicReference、LongAdder等。


Atomic原子类底层的核心原理就是CAS,属于一种乐观锁。每次修改时就先对比原值,看看有没有其他线程修改过原值。如果没有修改过就可以修改,如果有修改就重新查出最新值来重复这个过程。


12.Atomic源码之仅限JDK使用的Unsafe类

(1)Atomic原子类通过Unsafe执行CAS操作

(2)Unsafe类仅限JDK内部使用


(1)Atomic原子类通过Unsafe执行CAS操作

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;
    private volatile int value;
    ...
    //Creates a new AtomicInteger with the given initial value.
    public AtomicInteger(int initialValue) {
        value = initialValue;
    }

    //Creates a new AtomicInteger with initial value {@code 0}.
    public AtomicInteger() {
        
    }

    //Gets the current value.
    public final int get() {
        return value;
    }

    //Sets to the given value.
    public final void set(int newValue) {
        value = newValue;
    }

    //Eventually sets to the given value.
    public final void lazySet(int newValue) {
        unsafe.putOrderedInt(this, valueOffset, newValue);
    }

    //Atomically sets to the given value and returns the old value.
    public final int getAndSet(int newValue) {
        return unsafe.getAndSetInt(this, valueOffset, newValue);
    }

    //Atomically sets the value to the given updated value if the current value == the expected value.
    //@param expect the expected value
    //@param update the new value
    //@return true if successful. False return indicates that the actual value was not equal to the expected value.
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    //Atomically increments by one the current value.
    //@return the previous value
    public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }

    //Atomically decrements by one the current value.
    //@return the previous value
    public final int getAndDecrement() {
        return unsafe.getAndAddInt(this, valueOffset, -1);
    }

    //Atomically adds the given value to the current value.
    //@param delta the value to add
    //@return the previous value
    public final int getAndAdd(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta);
    }

    //Atomically increments by one the current value.
    //@return the updated value
    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }

    //Atomically decrements by one the current value.
    //@return the updated value
    public final int decrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
    }

    //Atomically adds the given value to the current value.
    //@param delta the value to add
    //@return the updated value
    public final int addAndGet(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
    }
    
    //Atomically updates the current value with the results of applying the given function, returning the previous value. 
    //The function should be side-effect-free, since it may be re-applied when attempted updates fail due to contention among threads.
    //@param updateFunction a side-effect-free function
    //@return the previous value
    public final int getAndUpdate(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSet(prev, next));
        return prev;
    }
    
    //Atomically updates the current value with the results of applying the given function, returning the updated value. 
    //The function should be side-effect-free, since it may be re-applied when attempted updates fail due to contention among threads.
    //@param updateFunction a side-effect-free function
    //@return the updated value
    public final int updateAndGet(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSet(prev, next));
        return next;
    }

    //Atomically updates the current value with the results of applying the given function to the current and given values, returning the previous value. 
    //The function should be side-effect-free, since it may be re-applied when attempted updates fail due to contention among threads.  
    //The function is applied with the current value as its first argument, and the given update as the second argument.
    //@param x the update value
    //@param accumulatorFunction a side-effect-free function of two arguments
    //@return the previous value
    public final int getAndAccumulate(int x, IntBinaryOperator accumulatorFunction) {
        int prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsInt(prev, x);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    //Atomically updates the current value with the results of applying the given function to the current and given values, returning the updated value.
    //The function should be side-effect-free, since it may be re-applied when attempted updates fail due to contention among threads.  
    //The function is applied with the current value as its first argument, and the given update as the second argument.
    //@param x the update value
    //@param accumulatorFunction a side-effect-free function of two arguments
    //@return the updated value
    public final int accumulateAndGet(int x, IntBinaryOperator accumulatorFunction) {
        int prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsInt(prev, x);
        } while (!compareAndSet(prev, next));
        return next;
    }
    ...
}

(2)Unsafe类仅限JDK内部使用

Unsafe类是JDK底层的一个类,不允许被实例化和使用它里面的方法。因为Unsafe类的构造函数是私有的,所以不能手动进行实例化。其次调用Unsafe.getUnsafe()方法来获取一个UnSafe实例也不被允许。

public final class Unsafe {
    ...
    private Unsafe() {
    }

    @CallerSensitive
    public static Unsafe getUnsafe() {
        Class var0 = Reflection.getCallerClass();
        if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
            throw new SecurityException("Unsafe");
        } else {
            return theUnsafe;
        }
    }
    ...
}


13.Atomic源码之无限重复循环以及CAS操作

(1)AtomicInteger类的初始化

(2)Unsafe类的getAndAddInt()方法

(3)CAS的底层工作原理

(4)自旋策略与阻塞策略


(1)AtomicInteger类的初始化

AtomicInteger类初始化时,会执行静态代码块,即初始化valueOffset变量value变量在AtomicInteger类中的偏移量


这个valueOffset偏移量可以理解为:value变量在AtomicInteger类的位置。由于valueOffset偏移量是final的,所以一旦初始化完毕就不会再改变。

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;
    private volatile int value;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) {
            throw new Error(ex);
        }
    }

    //Atomically increments by one the current value.
    //@return the updated value
    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }

    //Atomically sets the value to the given updated value if the current value == the expected value.
    //@param expect the expected value
    //@param update the new value
    //@return true if successful. False return indicates that the actual value was not equal to the expected value.
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
    ...
}

(2)Unsafe类的getAndAddInt()方法

public final class Unsafe {
    ...
    //compareAndSwapInt()属于CAS方法
    //这个方法会拿刚刚获取到的那个l值,认为是当前value的最新值
    //然后和当前AtomicInteger对象实例中的value值进行比较
    //如果一样,就将value的值给设置为:l(之前拿到的值) + delta(递增的值)
    //如果不一样,compareAndSwapInt()方法就会返回false,自动进入下一轮while循环
    //直到while循环结束,最后会返回一个l值,是递增delta前的旧值
    public final int getAndAddInt(Object paramObject, long valueOffset, int delta) {
        int l;
        do {
            //Unsafe的getIntVolatile方法会从AtomicInteger对象实例中,
            //根据valueOffset偏移量(value变量的位置),去获取当前value的最新值为l
            l = this.getIntVolatile(paramObject, valueOffset);
        } while(!this.compareAndSwapInt(paramObject, valueOffset, l, l + delta));
        return l;
    }

    public native int getIntVolatile(Object paramObject, long valueOffset);

    //paramObject表示当前的实例对象
    //valueOffset表示实例变量的内存地址偏移量
    //expect表示期望值
    //update表示更新后的值
    public final native boolean compareAndSwapInt(Object paramObject, long valueOffset, int expect, int update);
    ...
}

(3)CAS的底层工作原理

下图表示通过CAS对变量V进行原子更新操作,底层的CAS方法中会传递三个参数。第一个参数V表示要更新的变量,第二个参数E表示期望值,第三个参数U表示更新后的值


更新的方式是:如果V == E,表示预期值和实际值相等,则将V修改成U并返回true;否则修改失败,然后返回false。

(4)自旋策略与阻塞策略

当一个线程拿不到锁时,有两种基本策略。

策略一:放弃CPU进入阻塞状态,等待后续被唤醒,再重新被操作系统调度。

策略二:不放弃CPU,而是进入空转进行不断重试,也就是自旋


单核CPU只能使用策略一,AtomicInteger就是使用了自旋策略。synchronized则是先自旋几圈,自旋后还获取不到锁再阻塞

相关推荐

月薪 4K 到 4W 的运维工程师都经历了什么?

运维工程师在前期是一个很苦逼的工作,在这期间可能干着修电脑、掐网线、搬机器的活,显得没地位!时间也很碎片化,各种零碎的琐事围绕着你,很难体现个人价值,渐渐的对行业很迷茫,觉得没什么发展前途。这些枯燥无...

计算机专业必须掌握的脚本开发语言—shell

提起Shell脚本很多都有了解,因为无论是windows的Dom命令行还是Linux的bash都是它的表现形式,但是很多人不知道它还有一门脚本编程语言,就是ShellScript,我们提起的Shel...

Linux/Shell:排名第四的计算机关键技能

除了编程语言之外,要想找一份计算机相关的工作,还需要很多其他方面的技能。最近,来自美国求职公司Indeed的一份报告显示:在全美工作技能需求中,Linux/Shell技能仅次于SQL、Java、P...

使用Flask应用框架在Centos7.8系统上部署机器学习模型

安装centos7.8虚拟环境1、镜像链接...

shell编程

简介:Shell是一个用C语言编写的程序,它是用户使用Linux的桥梁。Shell既是一种命令语言,又是一种程序设计语言。...

14天shell脚本入门学习-第二天#脚本和参数#排版修正

脚本是一种包含一系列命令的文本文件,通常用于自动化任务。Shell脚本是用Shell命令编写的脚本,可以在命令行中执行。掌握脚本的基础知识和变量的使用是编写高效脚本的关键。...

嵌入式Linux开发教程:Linux Shell

本章重点介绍Linux的常用操作和命令。在介绍命令之前,先对Linux的Shell进行了简单介绍,然后按照大多数用户的使用习惯,对各种操作和相关命令进行了分类介绍。对相关命令的介绍都力求通俗易懂,都给...

实现SHELL中的列表和字典效果

大家好,我是博哥爱运维。编写代码,很多情况下我们需要有种类型来存储数据,在python中有列表和字典,golang中有切片slice和map,那么在shell中,我们能否实现列表和字典呢,答案是肯定的...

14天shell脚本入门学习-第二天#脚本和变量

脚本是一种包含一系列命令的文本文件,通常用于自动化任务。Shell脚本是用Shell命令编写的脚本,可以在命令行中执行。掌握脚本的基础知识和变量的使用是编写高效脚本的关键。...

shell常用命令之awk用法介绍

一、awk介绍awk的强大之处,在于能生成强大的格式化报告。数据可以来自标准输入,一个或多个文件,或者其他命令的输出。他支持用户自定义函数和动态正则表达式等先进功能,是Linux/unix一个强大的文...

Linux编程Shell之入门——Shell数组拼接与合并

在Shell中,可以使用不同的方式实现数组拼接和合并。数组拼接指将两个数组中的元素合并成一个数组,而数组合并指将两个数组逐个组合成一个新数组。以下是关于Shell数组拼接和合并的详细介绍:数...

shell中如何逆序打印数组的内容,或者反转一个数组?

章节索引图首先请注意,有序的概念仅适用于索引数组,而不适用于关联数组。如果没有稀疏数组,答案会更简单,但是Bash的数组可以是稀疏的(非连续索引)。因此,我们需要引入一个额外的步骤。...

如何学好大数据开发?---shell基本语法

昨天我们初步了解到了shell的一些基本知识,比如shell的分类,常用的shell类型。今天就带来大数据开发之shell基本语法,掌握好基础才是最重要的,那接下来就开始学习shell的基本语法。一、...

Linux编程Shell之入门——Shell关联数组

关联数组是Shell中一种特殊的数组类型,它使用字符串作为下标。在关联数组中,每个元素都被标识为一个唯一的字符串键值,也称为关联数组的索引。在Shell中,可以使用declare或typeset命令...

从编译器视角看数组和指针

虽然有单独的文章描述数组和指针,但二者的关系实在值得再写一篇文章。...

取消回复欢迎 发表评论: