跳至主要內容

线程基础、线程之间的共享和协作

IT王小二约 5356 字

线程基础、线程之间的共享和协作

作者:IT王小二

博客:https://itwxe.comopen in new window

一、基础概念

1. 进程和线程

  • 进程是操作系统进行程序运行资源(资源包括:CPU、内存空间、磁盘 IO 等)分配的最小单位,例如我们的 idea,chrome就是一个进程。
  • 线程是 CPU 调度的最小单位,必须依赖于进程而存在,一个进程存在一个或多个线程。

2. CPU核心数和线程数的关系

  • 多核心:也指单芯片多处理器( Chip Multiprocessors,简称 CMP),CMP 是由美斯坦福大学提出的,其思想是将大规模并行处理器中的 SMP(对称多处理器)集成到同一芯片内,各个处理器并行执行不同的进程。这种依靠多个 CPU 同时并行地运行程序是实现超高速计算的一个重要方向,称为并行处理。
  • 多线程: Simultaneous Multithreading.简称 SMT,让同一个处理器上的多个线程同步执行并共享处理器的执行资源。
  • 核心数、线程数:目前主流 CPU 都是多核的。增加核心数目就是为了增加线程数,因为操作系统是通过线程来执行任务的,一般情况下它们是 1:1 对应关系,就是说四核 CPU 一般拥有四个线程。但 Intel 引入超线程技术后,使核心数与线程数形成 1:2 的关系。

3. CPU时间片轮转机制

时间片轮转调度是一种最古老,最简单,最公平且使用最广的算法。每个进程被分配一时间段,称作它的时间片,即该进程允许运行的时间。 -- 百度百科

1、怎么理解呢?

理解来就是 cpu 在每一个时间刻上会分配执行一个线程。

2、那为什么我们在运行程序时启动 10 个线程也可以正常的跑呢?

由于 cpu 的速度非常的快,所以你感觉不到它的切换过程。

4. 并行和并发

  • 并发:指应用能够交替执行不同的任务,比如单 CPU 核心下执行多线程并非是同时执行多个任务,如果你开两个线程执行,就是在你几乎不可能察觉到的速度不断去切换这两个任务,已达到"同时执行效果",其实并不是的,只是计算机的速度太快,我们无法察觉到而已。
  • 并行:指应用能够同时执行不同的任务,例:吃饭的时候可以边吃饭边打电话, 这两件事情可以同时执行。
  • 两者区别:一个是交替执行,一个是同时执行。

5. 高并发编程的意义和好处

  • 充分利用cpu资源。
  • 加快用户响应时间。
  • 使代码模块化,异步化,简单化。

6. 多线程编程需要注意的问题

  • 线程之间的安全性。
  • 线程之间的死锁。
  • 线程太多了会将服务器资源耗尽形成死机当机。

二、Java中线程的生命周期

首先,Java程序天生就是多线程的,线程的生命周期有如下几种状态。

Java中线程的生命周期

大致分为:新建、就绪、运行、阻塞、死亡,其中特别注意,运行状态都是从就绪状态转变的。

1. 新建

新建线程有几种方式呢?

我看别人的博客,有人说2种,有人说3种,甚至还有人说4种,于是去翻了下jdk8源码,官方说了,就是2种。

截图为证:

Java创建线程的方式

当然,源码中也说明了创建线程的两种方式。

  • 继承Thread,重写run方法
  • 实现Runnable,实现run方法

2. 就绪

启动线程后进入就绪状态。

启动切记注意使用 start()方法,而不是直接调用run方法,使用run方法只是普通的方法调用,而不是启动线程。

启动线程的方式

  • 继承Thread:XThread extends Thread,即:new XThread.start();
  • 实现Runnable:XRunnable implements Runnable,即:new Thread(new XRunnable()).start();

Thread 和 Runnable 区别

Thread 才是 Java 里对线程的唯一抽象,Runnable 只是对任务(业务逻辑)的抽象。Thread 可以接受任意一个 Runnable 的实例并执行。

3. 运行

根据CPU 时间片轮转机制,当时间片轮转到这个线程运行时间时,就会变成运行状态。

4. 阻塞

当调用某些阻塞方法时,线程进入阻塞状态。

5. 死亡

死亡状态,要么是 run 执行完成了,要么是抛出了一个未处理的异常导致线程提前结束,要么使用了 stop() 方法(当然,stop方法已经不推荐使用了)。

三、线程的终止和中断

jdk为什么不推荐使用suspend()、resume()、 stop()这些api方法呢 (分别对应暂停、恢复、停止)

  • 以 suspend()方法为例,在调用后,线程不会释放已经占有的资源(比如锁),而是占有着资源进入睡眠状态,这样容易引发死锁问题。
  • 同样,stop()方法在终结一个线程时不会保证线程的资源正常释放,通常是没有给予线程完成资源释放工作的机会,因此会导致程序可能工作在不确定状态下。
  • 正因为 suspend()、resume()和 stop()方法带来的副作用,这些方法才被标注为不建议使用的过期方法。

知道stop()方法已经不推荐使用了,那么怎么来安全终止线程呢?

答案是使用interrupt()方法来中断线程

什么是 中断线程 呢?

中断好比其他线程对该线程打了个招呼,“A,你要中断了”,不代表线程 A 会立即停止自己的工作,同样的 A 线程完全可以不理会这种中断请求,因为 java 里的线程是协作式的,不是抢占式的。线程通过检查自身的中断标志位是否被置为 true 来进行响应。

线程中断相关方法

  • interrupt() // 中断线程,更改中断标志位为 true
  • isInterrupted() // 判断线程是否被中断
  • Thread.interrupted() // 静态方法,判断线程是否被中断,和 isInterrupted() 不同的是,调用后中断标志位会变成false,建议使用 isInterrupted()

注意点:

如果一个线程处于了阻塞状态(如线程调用了 thread.sleep、thread.join、thread.wait 等),则在线程在检查中断标识时如果发现中断标识为 true,则会在这些阻塞方法调用处抛出 InterruptedException 异常,并且在抛出异常后会立即将线程的中断标识位清除,即重新设置为 false 。

那怎么解决这个问题呢,只要在catch里面再次调用interrupt()即可,例如:

private static class UseThread extends Thread{
	
	public UseThread(String name) {
		super(name);
	}
	
	@Override
	public void run() {
		while(!isInterrupted()) {
			try {
			    // 休眠100毫秒
				Thread.sleep(100);
			} catch (InterruptedException e) {
				System.out.println(Thread.currentThread().getName()
						+" in InterruptedException interrupt flag is "
						+isInterrupted());
				// 资源释放
				interrupt();
				e.printStackTrace();
			}
			System.out.println(Thread.currentThread().getName()
					+ " I am extends Thread.");
		}
		System.out.println(Thread.currentThread().getName()
				+" interrupt flag is "+isInterrupted());
	}
}

public static void main(String[] args) throws InterruptedException {
	Thread endThread = new UseThread("HasInterrputEx");
	endThread.start();
	Thread.sleep(500);
	endThread.interrupt();
}

四、其他方法

1、yield()方法

  • 使当前线程让出 CPU 占有权,但让出的时间是不可设定的,也不会释放锁资源,注意:并不是每个线程都需要这个锁的,而且执行 yield()的线程不一定就会持有锁,我们完全可以在释放锁后再调用 yield 方法
  • 所有执行 yield()的线程有可能在进入到 就绪状态 后会被操作系统再次选中马上又被执行

2、join()方法

  • 调用某线程的该方法,将当前线程和该线程合并,等待该线程结束,再恢复当前线程的运行
  • 例如在线程 B 中调用了线程 A 的 Join()方法,直到线程 A 执行完毕后,才会继续执行线程 B

3、setDaemon(true)方法

  • 设置线程为守护线程,通常情况下用不到
  • 守护线程:是一种支持型线程,因为它主要被用作程序中后台调度以及支持性工作,例如gc垃圾回收线程就是一个守护线程
  • 注意点:守护线程被用作完成支持性工作,但是在 Java 虚拟机退出时守护线程中的 finally 块并不一定会执行。在构建 Daemon 线程时,不能依靠 finally 块中的内容来确保执行关闭或清理资源的逻辑

4、setPriority(int)方法

设置线程优先级,默认优先级是 5(范围为1~10),优先级高的线程分配时间片的数量 可能 要多于优先级低的线程,但是也只是可能,没有什么实际意义

5、wait()/notify()/notifyAll():后面详细描述

五、线程间的共享

1. synchronized内置锁

线程开始运行,拥有自己的栈空间,就如同一个脚本一样,按照既定的代码一步一步地执行,直到终止。但是,每个运行中的线程,如果仅仅是孤立地运行,那么没有一点儿价值,或者说价值很少,如果多个线程能够相互配合完成工作,包括数据之间的共享,协同处理事情。这将会带来巨大的价值。Java 支持多个线程同时访问一个对象或者对象的成员变量,关键字synchronized。

可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性,又称为内置锁机制。

简单使用示例

public class SynTest {

	private long count =0;
	private Object obj = new Object(); // 作为一个锁

	public long getCount() {
		return count;
	}

	public void setCount(long count) {
		this.count = count;
	}

	/**
	 * 用在同步块上
	 */
	public void incCount(){
		synchronized (obj){
			count++;
		}
	}

	/**
	 * 用在方法上
	 */
	public synchronized void incCount2(){
			count++;
	}

	/**
	 * 用在同步块上,但是锁的是当前类的对象实例
	 */
	public void incCount3(){
		synchronized (this){
			count++;
		}
	}

	private static class Count extends Thread{

		private SynTest simplOper;

		public Count(SynTest simplOper) {
			this.simplOper = simplOper;
		}

		@Override
		public void run() {
			for(int i=0;i<10000;i++){
				simplOper.incCount(); // count = count + 10000
			}
		}
	}

	public static void main(String[] args) throws InterruptedException {
		SynTest simplOper = new SynTest();
		// 启动两个线程
		Count count1 = new Count(simplOper);
		Count count2 = new Count(simplOper);
		count1.start();
		count2.start();
		Thread.sleep(50);
		System.out.println(simplOper.count); // 20000
	}
}

对synchronized的理解、对象锁和类锁

synchronized锁锁的都是对象,并且只有锁的是 同一个对象,注意,必须是对象,并且还是同一个对象,才可以保证数据的同步。

对象锁和类锁

首先类锁是什么呢?

/**
 * 对象锁
 */
public synchronized void incCount2(){
    count++;
}

/**
 * 类锁
 */
public synchronized static void incCount2(){
    count++;
}

// 或者 
private static Object o = new Object();
private static void incCount2() {
    synchronized(o) {
        count++;
    }
}

其实类锁只是一个大家的俗称,在 Java 中是没有类锁这个概念,为什么这么说呢?

通过代码可以看到,其实第一个对象锁得是对象本身(即this); 类锁加上了static,但是它锁得的calss对象,即在方法区里面的那个唯一的class对象

2. volatile

volatile 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。

使用方法,加上volatile修饰即可,下面代码如果可以去掉volatile对比查看效果。

public class VolatileTest {
    private volatile static boolean ready;
    private static int number;

    private static class PrintThread extends Thread {
        @Override
        public void run() {
            System.out.println("PrintThread is running.......");
            while (!ready) {
                //无限循环
            }
            System.out.println("number = " + number);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new PrintThread().start();
        Thread.sleep(1000);
        number = 50;
        ready = true;
        Thread.sleep(5000);
        System.out.println("main is ended!");
    }
}

不加 volatile 时,子线程无法感知主线程修改了 ready 的值,从而不会退出循环,而加了 volatile 后,子线程可以感知主线程修改了 ready 的值,迅速退出循环。

但是 volatile 不能保证数据在多个线程下同时写时的线程安全,即多个线程同时修改一个volatile修饰的变量时无法保证其原子性,所以volatile的适用场景:一个线程写,多个线程读

3. ThreadLocal

和synchronized比较

ThreadLocal 和 Synchonized 都用于解决多线程并发访问。可是 ThreadLocal 与 synchronized 有本质的差别。synchronized 是利用锁的机制,使变量或代码块在某一时该仅仅能被一个线程访问。而 ThreadLocal 为每个线程都提供了变量的副本,使得每个线程在某一时间访问到的并非同一个对象,这样就隔离了多个线程对数据的数据共享。

Spring 的事务就借助了 ThreadLocal 类。Spring 会从数据库连接池中获得一connection,然会把 connection 放进 ThreadLocal 中,也就和线程绑定了,事务需要提交或者回滚,只要从 ThreadLocal 中拿到 connection 进行操作即可。

ThreadLocal的使用

方法描述
void set (Object value)设置当前线程的线程局部变量的值
public Object get()该方法返回当前线程所对应的线程局部变量
public void remove()将当前线程局部变量的值删除,目的是为了减少内存的占用,当线程结束后,对应该线程的局部变量将自动被垃圾回收,所以显式调用该方法清除线程的局部变量并不是必须的操作,但它可以加快内存回收的速度
protected Object initialValue()返回该线程局部变量的初始值,该方法是一个 protected 的方法,显然是为了让子类覆盖而设计的。这个方法是一个延迟调用方法,在线程第 1 次调用 get() 或 set(Object)时才执行,并且仅执行 1 次。ThreadLocal 中的缺省实现直接返回一个 null

示例代码,可以发现各线程之间互不影响。

public class UseThreadLocal {

    private static ThreadLocal<Integer> intLocal = new ThreadLocal<Integer>() {
        @Override
        protected Integer initialValue() {
            return 1;
        }
    };

    private static ThreadLocal<String> stringThreadLocal;

    /**
     * 运行3个线程
     */
    public void startThreadArray() {
        Thread[] runs = new Thread[3];
        for (int i = 0; i < runs.length; i++) {
            runs[i] = new Thread(new TestThread(i));
        }
        for (int i = 0; i < runs.length; i++) {
            runs[i].start();
        }
    }

    /**
     * 类说明:测试线程,线程的工作是将ThreadLocal变量的值变化,并写回,看看线程之间是否会互相影响
     */
    public static class TestThread implements Runnable {
        int id;

        public TestThread(int id) {
            this.id = id;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ":start");
            Integer s = intLocal.get();
            s = s + id;
            intLocal.set(s);
            System.out.println(Thread.currentThread().getName() + ":" + intLocal.get());
            // intLocal.remove();
        }
    }

    public static void main(String[] args) {
        UseThreadLocal test = new UseThreadLocal();
        test.startThreadArray();
    }
}

内存泄漏

在 ThreadLocal 的 set()、get()、remove() 方法中,都有清除无效 Entry 的操作,这样做是为了降低内存泄漏发生的可能,我们可以去看一下get()的方法的源码,一路往下看,可以看到:

内存泄漏

图中可以看到使用了一个弱引用,并且他的key使用的是ThreadLocal实例,而值即我们设置的值或者默认的null

  • 因为使用的是弱引用,所以当ThreadLoacl没有指向他的引用,而ThreadLocalMap里面的key和ThreadLocal实例是弱引用,所以ThreadLocal实例会被回收。
  • 如果使用的是强引用的话,引用 ThreadLocal 的对象被回收了,但是 ThreadLocalMap还持有 ThreadLocal 的强引用,如果没有手动删除,ThreadLocal 的对象实例不会被回收,导致 Entry 内存泄漏。

总结

  • JVM 利用设置 ThreadLocalMap 的 Key 为弱引用,来避免内存泄露。
  • JVM 利用调用 remove、get、set 方法的时候,回收弱引用。
  • 当 ThreadLocal 存储很多 Key 为 null 的 Entry 的时候,而不再去调用 remove、get、set 方法,那么将导致内存泄漏,所以我们使用完毕后最好手动调用一次 remove方法。
  • 使用线程池 + ThreadLocal 时要小心,因为这种情况下,线程是一直在不断的重复运行的,从而也就造成了 value 可能造成累积的情况。

六、线程之间的协作

线程之间相互配合,完成某项工作,比如:一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者。

简单的办法是让消费者线程不断地循环检查变量是否符合预期在 while 循环中设置不满足的条件,如果条件满足则退出 while 循环,从而完成消费者的工作。却存在如下问题:

  • 难以确保及时性
  • 难以降低开销。如果降低睡眠的时间,比如休眠1毫秒,这样消费者能更加迅速地发现条件变化,但是却可能消耗更多的处理器资源,造成了无端的浪费

1. 等待/通知机制

那么怎么来保证各线程之间的协作呢,这就要使用等待,通知机制了。

方法描述
wait()调用该方法的线程进入 waiting 状态,只有等待另外线程的通知或被中断才会返回。需要注意,调用 wait()方法后,会释放对象的锁。
notify()通知一个在对象上等待的线程,使其从wait方法返回,而返回的前提是该线程获取到了对象的锁,没有获得锁的线程重新进入 WAITING 状态。
notifyAll()通知所有等待在该对象上的线程。

2. 等待和通知的标准范式

等待方

  • 获取对象的锁
  • 如果条件不满足,那么调用对象的 wait()方法,被通知后仍要检查条件
  • 条件满足则执行对应的逻辑
synchronized(对象) {
	while(条件不满足) {
		对象.wait();
	}
	对应的处理逻辑
}

通知方

  • 获得对象的锁
  • 改变条件
  • 通知所有等待在对象上的线程
synchronized(对象) {
	改变条件
	对象.notifyAll();
}

3. notify和notifyAll应该用谁

尽可能用 notifyall(),谨慎使用 notify(),因为 notify()只会唤醒一个线程,我们无法确保被唤醒的这个线程一定就是我们需要唤醒的线程。

4. 模拟实现数据库连接池原理

1、首先实现一个java.sql.Connection类,里面的方法就实现具体内容了,因为只是模拟连接池原理。

public class SqlConnectImpl implements Connection {

    /**
     * 得到一个数据库连接
     */
    public static final Connection fetchConnection(){
        return new SqlConnectImpl();
    }

    // 省略具体实现的方法

}

2、创建连接池。

public class DBPool {
    private static LinkedList<Connection> pool = new LinkedList<Connection>();

    /**
     * 初始化连接池大小
     * @param initialSize 连接数量
     */
    public DBPool(int initialSize) {
        if (initialSize > 0) {
            for (int i = 0; i < initialSize; i++) {
                pool.addLast(SqlConnectImpl.fetchConnection());
            }
        }
    }

    /**
     * 释放连接
     */
    public void releaseConnection(Connection connection) {
        if (connection != null) {
            synchronized (pool) {
                pool.addLast(connection);
                // 通知其他等待连接的线程
                pool.notifyAll();
            }
        }
    }

    /**
     * 获取连接,在mills内无法获取连接,将会返回null,即超时
     * 当mills <= 0时代表永不超时
     * @return
     */
    public Connection fetchConnection(long mills) throws InterruptedException {
        synchronized(pool) {
            if (mills <= 0) {
                // 永不超时
                while (pool.size() == 0) {
                    pool.wait();
                }
                return pool.removeFirst();
            } else {
                // 超时时刻
                long timeout = System.currentTimeMillis()+mills;
                // 等待时长
                long remaining = mills;
                while (pool.size() == 0 && remaining > 0) {
                    // 连接池为连接为空并且未超时,进入waiting状态
                    pool.wait();
                    // 唤醒一次重新计算时长
                    remaining = timeout - System.currentTimeMillis();
                }
                Connection connection = null;
                if (pool.size() != 0 && remaining > 0) {
                    // 从连接池获取一个连接
                    connection = pool.removeFirst();
                }
                return connection;
            }
        }
    }
}

3、测试连接池。

public class DBPoolTest {

    static DBPool pool = new DBPool(10);
    // 控制器:控制主线程等待所有worker线程结束后才继续执行
    static CountDownLatch end;

    public static void main(String[] args) throws InterruptedException {
        // 线程数量
        int threadCount = 50;
        end = new CountDownLatch(threadCount);
        // 每个线程的操作次数
        int count = 20;
        // 计数器:统计可以拿到连接的线程
        AtomicInteger got = new AtomicInteger();
        // 计数器:统计没有拿到连接的线程
        AtomicInteger notGot = new AtomicInteger();
        for (int i = 0; i < threadCount; i++) {
            Thread thread = new Thread(new Worker(count, got, notGot), "thread_worker_" + i);
            thread.start();
        }
        // main线程在此处等待
        end.await();
        System.out.println("总共尝试了: " + (threadCount * count));
        System.out.println("拿到连接的次数:  " + got);
        System.out.println("没能连接的次数: " + notGot);
    }

    static class Worker implements Runnable {
        int count;
        AtomicInteger got;
        AtomicInteger notGot;

        public Worker(int count, AtomicInteger got, AtomicInteger notGot) {
            this.count = count;
            this.got = got;
            this.notGot = notGot;
        }

        public void run() {
            while (count > 0) {
                try {
                    // 从线程池中获取连接,如果1ms内无法获取到,将会返回null
                    // 分别统计连接获取的数量got和未获取到的数量notGot
                    Connection connection = pool.fetchConnection(1);
                    if (connection != null) {
                        // 获取到了连接
                        try {
                            connection.createStatement();
//                            PreparedStatement preparedStatement = connection.prepareStatement("");
//                            preparedStatement.execute();
                            connection.commit();
                        } finally {
                            pool.releaseConnection(connection);
                            got.incrementAndGet();
                        }
                    } else {
                        // 超时
                        notGot.incrementAndGet();
                        System.out.println(Thread.currentThread().getName() + "等待超时!");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    count--;
                }
            }
            end.countDown();
        }

    }
}

5. 调用yield() 、sleep()、wait()、notify()等方法对锁有何影响?

  • yield() 、sleep()被调用后,都不会释放当前线程所持有的锁。
  • 调用 wait()方法后,会释放当前线程持有的锁,而且当前被唤醒后,会重新去竞争锁,锁竞争到后才会执行 wait 方法后面的代码。
  • 调用 notify()系列方法后,对锁无影响,线程只有在 synchronized 同步代码执行完后才会自然而然的释放锁,所以 notify()系列方法一般都是 synchronized 同步代码的最后一行。