线程基础、线程之间的共享和协作
线程基础、线程之间的共享和协作
作者:IT王小二
一、基础概念
1. 进程和线程
- 进程是操作系统进行程序运行资源(资源包括:CPU、内存空间、磁盘 IO 等)分配的最小单位,例如我们的 idea,chrome就是一个进程。
- 线程是 CPU 调度的最小单位,必须依赖于进程而存在,一个进程存在一个或多个线程。
2. CPU核心数和线程数的关系
- 多核心:也指单芯片多处理器( Chip Multiprocessors,简称 CMP),CMP 是由美斯坦福大学提出的,其思想是将大规模并行处理器中的 SMP(对称多处理器)集成到同一芯片内,各个处理器并行执行不同的进程。这种依靠多个 CPU 同时并行地运行程序是实现超高速计算的一个重要方向,称为并行处理。
- 多线程: Simultaneous Multithreading.简称 SMT,让同一个处理器上的多个线程同步执行并共享处理器的执行资源。
- 核心数、线程数:目前主流 CPU 都是多核的。增加核心数目就是为了增加线程数,因为操作系统是通过线程来执行任务的,一般情况下它们是 1:1 对应关系,就是说四核 CPU 一般拥有四个线程。但 Intel 引入超线程技术后,使核心数与线程数形成 1:2 的关系。
3. CPU时间片轮转机制
时间片轮转调度是一种最古老,最简单,最公平且使用最广的算法。每个进程被分配一时间段,称作它的时间片,即该进程允许运行的时间。 -- 百度百科
1、怎么理解呢?
理解来就是 cpu 在每一个时间刻上会分配执行一个线程。
2、那为什么我们在运行程序时启动 10 个线程也可以正常的跑呢?
由于 cpu 的速度非常的快,所以你感觉不到它的切换过程。
4. 并行和并发
- 并发:指应用能够交替执行不同的任务,比如单 CPU 核心下执行多线程并非是同时执行多个任务,如果你开两个线程执行,就是在你几乎不可能察觉到的速度不断去切换这两个任务,已达到"同时执行效果",其实并不是的,只是计算机的速度太快,我们无法察觉到而已。
- 并行:指应用能够同时执行不同的任务,例:吃饭的时候可以边吃饭边打电话, 这两件事情可以同时执行。
- 两者区别:一个是交替执行,一个是同时执行。
5. 高并发编程的意义和好处
- 充分利用cpu资源。
- 加快用户响应时间。
- 使代码模块化,异步化,简单化。
6. 多线程编程需要注意的问题
- 线程之间的安全性。
- 线程之间的死锁。
- 线程太多了会将服务器资源耗尽形成死机当机。
二、Java中线程的生命周期
首先,Java程序天生就是多线程的,线程的生命周期有如下几种状态。
大致分为:新建、就绪、运行、阻塞、死亡,其中特别注意,运行状态都是从就绪状态转变的。
1. 新建
新建线程有几种方式呢?
我看别人的博客,有人说2种,有人说3种,甚至还有人说4种,于是去翻了下jdk8源码,官方说了,就是2种。
截图为证:
当然,源码中也说明了创建线程的两种方式。
- 继承Thread,重写run方法
- 实现Runnable,实现run方法
2. 就绪
启动线程后进入就绪状态。
启动切记注意使用 start()方法,而不是直接调用run方法,使用run方法只是普通的方法调用,而不是启动线程。
启动线程的方式
- 继承Thread:XThread extends Thread,即:new XThread.start();
- 实现Runnable:XRunnable implements Runnable,即:new Thread(new XRunnable()).start();
Thread 和 Runnable 区别
Thread 才是 Java 里对线程的唯一抽象,Runnable 只是对任务(业务逻辑)的抽象。Thread 可以接受任意一个 Runnable 的实例并执行。
3. 运行
根据CPU 时间片轮转机制,当时间片轮转到这个线程运行时间时,就会变成运行状态。
4. 阻塞
当调用某些阻塞方法时,线程进入阻塞状态。
5. 死亡
死亡状态,要么是 run 执行完成了,要么是抛出了一个未处理的异常导致线程提前结束,要么使用了 stop() 方法(当然,stop方法已经不推荐使用了)。
三、线程的终止和中断
jdk为什么不推荐使用suspend()、resume()、 stop()这些api方法呢 (分别对应暂停、恢复、停止)
- 以 suspend()方法为例,在调用后,线程不会释放已经占有的资源(比如锁),而是占有着资源进入睡眠状态,这样容易引发死锁问题。
- 同样,stop()方法在终结一个线程时不会保证线程的资源正常释放,通常是没有给予线程完成资源释放工作的机会,因此会导致程序可能工作在不确定状态下。
- 正因为 suspend()、resume()和 stop()方法带来的副作用,这些方法才被标注为不建议使用的过期方法。
知道stop()方法已经不推荐使用了,那么怎么来安全终止线程呢?
答案是使用interrupt()方法来中断线程
什么是 中断线程 呢?
中断好比其他线程对该线程打了个招呼,“A,你要中断了”,不代表线程 A 会立即停止自己的工作,同样的 A 线程完全可以不理会这种中断请求,因为 java 里的线程是协作式的,不是抢占式的。线程通过检查自身的中断标志位是否被置为 true 来进行响应。
线程中断相关方法
- interrupt() // 中断线程,更改中断标志位为 true
- isInterrupted() // 判断线程是否被中断
- Thread.interrupted() // 静态方法,判断线程是否被中断,和 isInterrupted() 不同的是,调用后中断标志位会变成false,建议使用 isInterrupted()
注意点:
如果一个线程处于了阻塞状态(如线程调用了 thread.sleep、thread.join、thread.wait 等),则在线程在检查中断标识时如果发现中断标识为 true,则会在这些阻塞方法调用处抛出 InterruptedException 异常,并且在抛出异常后会立即将线程的中断标识位清除,即重新设置为 false 。
那怎么解决这个问题呢,只要在catch里面再次调用interrupt()即可,例如:
private static class UseThread extends Thread{
public UseThread(String name) {
super(name);
}
@Override
public void run() {
while(!isInterrupted()) {
try {
// 休眠100毫秒
Thread.sleep(100);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()
+" in InterruptedException interrupt flag is "
+isInterrupted());
// 资源释放
interrupt();
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()
+ " I am extends Thread.");
}
System.out.println(Thread.currentThread().getName()
+" interrupt flag is "+isInterrupted());
}
}
public static void main(String[] args) throws InterruptedException {
Thread endThread = new UseThread("HasInterrputEx");
endThread.start();
Thread.sleep(500);
endThread.interrupt();
}
四、其他方法
1、yield()方法
- 使当前线程让出 CPU 占有权,但让出的时间是不可设定的,也不会释放锁资源,注意:并不是每个线程都需要这个锁的,而且执行 yield()的线程不一定就会持有锁,我们完全可以在释放锁后再调用 yield 方法
- 所有执行 yield()的线程有可能在进入到 就绪状态 后会被操作系统再次选中马上又被执行
2、join()方法
- 调用某线程的该方法,将当前线程和该线程合并,等待该线程结束,再恢复当前线程的运行
- 例如在线程 B 中调用了线程 A 的 Join()方法,直到线程 A 执行完毕后,才会继续执行线程 B
3、setDaemon(true)方法
- 设置线程为守护线程,通常情况下用不到
- 守护线程:是一种支持型线程,因为它主要被用作程序中后台调度以及支持性工作,例如gc垃圾回收线程就是一个守护线程
- 注意点:守护线程被用作完成支持性工作,但是在 Java 虚拟机退出时守护线程中的 finally 块并不一定会执行。在构建 Daemon 线程时,不能依靠 finally 块中的内容来确保执行关闭或清理资源的逻辑
4、setPriority(int)方法
设置线程优先级,默认优先级是 5(范围为1~10),优先级高的线程分配时间片的数量 可能 要多于优先级低的线程,但是也只是可能,没有什么实际意义
5、wait()/notify()/notifyAll():后面详细描述
五、线程间的共享
1. synchronized内置锁
线程开始运行,拥有自己的栈空间,就如同一个脚本一样,按照既定的代码一步一步地执行,直到终止。但是,每个运行中的线程,如果仅仅是孤立地运行,那么没有一点儿价值,或者说价值很少,如果多个线程能够相互配合完成工作,包括数据之间的共享,协同处理事情。这将会带来巨大的价值。Java 支持多个线程同时访问一个对象或者对象的成员变量,关键字synchronized。
可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性,又称为内置锁机制。
简单使用示例
public class SynTest {
private long count =0;
private Object obj = new Object(); // 作为一个锁
public long getCount() {
return count;
}
public void setCount(long count) {
this.count = count;
}
/**
* 用在同步块上
*/
public void incCount(){
synchronized (obj){
count++;
}
}
/**
* 用在方法上
*/
public synchronized void incCount2(){
count++;
}
/**
* 用在同步块上,但是锁的是当前类的对象实例
*/
public void incCount3(){
synchronized (this){
count++;
}
}
private static class Count extends Thread{
private SynTest simplOper;
public Count(SynTest simplOper) {
this.simplOper = simplOper;
}
@Override
public void run() {
for(int i=0;i<10000;i++){
simplOper.incCount(); // count = count + 10000
}
}
}
public static void main(String[] args) throws InterruptedException {
SynTest simplOper = new SynTest();
// 启动两个线程
Count count1 = new Count(simplOper);
Count count2 = new Count(simplOper);
count1.start();
count2.start();
Thread.sleep(50);
System.out.println(simplOper.count); // 20000
}
}
对synchronized的理解、对象锁和类锁
synchronized锁锁的都是对象,并且只有锁的是 同一个对象,注意,必须是对象,并且还是同一个对象,才可以保证数据的同步。
对象锁和类锁
首先类锁是什么呢?
/**
* 对象锁
*/
public synchronized void incCount2(){
count++;
}
/**
* 类锁
*/
public synchronized static void incCount2(){
count++;
}
// 或者
private static Object o = new Object();
private static void incCount2() {
synchronized(o) {
count++;
}
}
其实类锁只是一个大家的俗称,在 Java 中是没有类锁这个概念,为什么这么说呢?
通过代码可以看到,其实第一个对象锁得是对象本身(即this); 类锁加上了static,但是它锁得的calss对象,即在方法区里面的那个唯一的class对象
2. volatile
volatile 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。
使用方法,加上volatile修饰即可,下面代码如果可以去掉volatile对比查看效果。
public class VolatileTest {
private volatile static boolean ready;
private static int number;
private static class PrintThread extends Thread {
@Override
public void run() {
System.out.println("PrintThread is running.......");
while (!ready) {
//无限循环
}
System.out.println("number = " + number);
}
}
public static void main(String[] args) throws InterruptedException {
new PrintThread().start();
Thread.sleep(1000);
number = 50;
ready = true;
Thread.sleep(5000);
System.out.println("main is ended!");
}
}
不加 volatile 时,子线程无法感知主线程修改了 ready 的值,从而不会退出循环,而加了 volatile 后,子线程可以感知主线程修改了 ready 的值,迅速退出循环。
但是 volatile 不能保证数据在多个线程下同时写时的线程安全,即多个线程同时修改一个volatile修饰的变量时无法保证其原子性,所以volatile的适用场景:一个线程写,多个线程读
3. ThreadLocal
和synchronized比较
ThreadLocal 和 Synchonized 都用于解决多线程并发访问。可是 ThreadLocal 与 synchronized 有本质的差别。synchronized 是利用锁的机制,使变量或代码块在某一时该仅仅能被一个线程访问。而 ThreadLocal 为每个线程都提供了变量的副本,使得每个线程在某一时间访问到的并非同一个对象,这样就隔离了多个线程对数据的数据共享。
Spring 的事务就借助了 ThreadLocal 类。Spring 会从数据库连接池中获得一connection,然会把 connection 放进 ThreadLocal 中,也就和线程绑定了,事务需要提交或者回滚,只要从 ThreadLocal 中拿到 connection 进行操作即可。
ThreadLocal的使用
方法 | 描述 |
---|---|
void set (Object value) | 设置当前线程的线程局部变量的值 |
public Object get() | 该方法返回当前线程所对应的线程局部变量 |
public void remove() | 将当前线程局部变量的值删除,目的是为了减少内存的占用,当线程结束后,对应该线程的局部变量将自动被垃圾回收,所以显式调用该方法清除线程的局部变量并不是必须的操作,但它可以加快内存回收的速度 |
protected Object initialValue() | 返回该线程局部变量的初始值,该方法是一个 protected 的方法,显然是为了让子类覆盖而设计的。这个方法是一个延迟调用方法,在线程第 1 次调用 get() 或 set(Object)时才执行,并且仅执行 1 次。ThreadLocal 中的缺省实现直接返回一个 null |
示例代码,可以发现各线程之间互不影响。
public class UseThreadLocal {
private static ThreadLocal<Integer> intLocal = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 1;
}
};
private static ThreadLocal<String> stringThreadLocal;
/**
* 运行3个线程
*/
public void startThreadArray() {
Thread[] runs = new Thread[3];
for (int i = 0; i < runs.length; i++) {
runs[i] = new Thread(new TestThread(i));
}
for (int i = 0; i < runs.length; i++) {
runs[i].start();
}
}
/**
* 类说明:测试线程,线程的工作是将ThreadLocal变量的值变化,并写回,看看线程之间是否会互相影响
*/
public static class TestThread implements Runnable {
int id;
public TestThread(int id) {
this.id = id;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ":start");
Integer s = intLocal.get();
s = s + id;
intLocal.set(s);
System.out.println(Thread.currentThread().getName() + ":" + intLocal.get());
// intLocal.remove();
}
}
public static void main(String[] args) {
UseThreadLocal test = new UseThreadLocal();
test.startThreadArray();
}
}
内存泄漏
在 ThreadLocal 的 set()、get()、remove() 方法中,都有清除无效 Entry 的操作,这样做是为了降低内存泄漏发生的可能,我们可以去看一下get()的方法的源码,一路往下看,可以看到:
图中可以看到使用了一个弱引用,并且他的key使用的是ThreadLocal实例,而值即我们设置的值或者默认的null
- 因为使用的是弱引用,所以当ThreadLoacl没有指向他的引用,而ThreadLocalMap里面的key和ThreadLocal实例是弱引用,所以ThreadLocal实例会被回收。
- 如果使用的是强引用的话,引用 ThreadLocal 的对象被回收了,但是 ThreadLocalMap还持有 ThreadLocal 的强引用,如果没有手动删除,ThreadLocal 的对象实例不会被回收,导致 Entry 内存泄漏。
总结
- JVM 利用设置 ThreadLocalMap 的 Key 为弱引用,来避免内存泄露。
- JVM 利用调用 remove、get、set 方法的时候,回收弱引用。
- 当 ThreadLocal 存储很多 Key 为 null 的 Entry 的时候,而不再去调用 remove、get、set 方法,那么将导致内存泄漏,所以我们使用完毕后最好手动调用一次 remove方法。
- 使用线程池 + ThreadLocal 时要小心,因为这种情况下,线程是一直在不断的重复运行的,从而也就造成了 value 可能造成累积的情况。
六、线程之间的协作
线程之间相互配合,完成某项工作,比如:一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者。
简单的办法是让消费者线程不断地循环检查变量是否符合预期在 while 循环中设置不满足的条件,如果条件满足则退出 while 循环,从而完成消费者的工作。却存在如下问题:
- 难以确保及时性
- 难以降低开销。如果降低睡眠的时间,比如休眠1毫秒,这样消费者能更加迅速地发现条件变化,但是却可能消耗更多的处理器资源,造成了无端的浪费
1. 等待/通知机制
那么怎么来保证各线程之间的协作呢,这就要使用等待,通知机制了。
方法 | 描述 |
---|---|
wait() | 调用该方法的线程进入 waiting 状态,只有等待另外线程的通知或被中断才会返回。需要注意,调用 wait()方法后,会释放对象的锁。 |
notify() | 通知一个在对象上等待的线程,使其从wait方法返回,而返回的前提是该线程获取到了对象的锁,没有获得锁的线程重新进入 WAITING 状态。 |
notifyAll() | 通知所有等待在该对象上的线程。 |
2. 等待和通知的标准范式
等待方
- 获取对象的锁
- 如果条件不满足,那么调用对象的 wait()方法,被通知后仍要检查条件
- 条件满足则执行对应的逻辑
synchronized(对象) {
while(条件不满足) {
对象.wait();
}
对应的处理逻辑
}
通知方
- 获得对象的锁
- 改变条件
- 通知所有等待在对象上的线程
synchronized(对象) {
改变条件
对象.notifyAll();
}
3. notify和notifyAll应该用谁
尽可能用 notifyall(),谨慎使用 notify(),因为 notify()只会唤醒一个线程,我们无法确保被唤醒的这个线程一定就是我们需要唤醒的线程。
4. 模拟实现数据库连接池原理
1、首先实现一个java.sql.Connection类,里面的方法就实现具体内容了,因为只是模拟连接池原理。
public class SqlConnectImpl implements Connection {
/**
* 得到一个数据库连接
*/
public static final Connection fetchConnection(){
return new SqlConnectImpl();
}
// 省略具体实现的方法
}
2、创建连接池。
public class DBPool {
private static LinkedList<Connection> pool = new LinkedList<Connection>();
/**
* 初始化连接池大小
* @param initialSize 连接数量
*/
public DBPool(int initialSize) {
if (initialSize > 0) {
for (int i = 0; i < initialSize; i++) {
pool.addLast(SqlConnectImpl.fetchConnection());
}
}
}
/**
* 释放连接
*/
public void releaseConnection(Connection connection) {
if (connection != null) {
synchronized (pool) {
pool.addLast(connection);
// 通知其他等待连接的线程
pool.notifyAll();
}
}
}
/**
* 获取连接,在mills内无法获取连接,将会返回null,即超时
* 当mills <= 0时代表永不超时
* @return
*/
public Connection fetchConnection(long mills) throws InterruptedException {
synchronized(pool) {
if (mills <= 0) {
// 永不超时
while (pool.size() == 0) {
pool.wait();
}
return pool.removeFirst();
} else {
// 超时时刻
long timeout = System.currentTimeMillis()+mills;
// 等待时长
long remaining = mills;
while (pool.size() == 0 && remaining > 0) {
// 连接池为连接为空并且未超时,进入waiting状态
pool.wait();
// 唤醒一次重新计算时长
remaining = timeout - System.currentTimeMillis();
}
Connection connection = null;
if (pool.size() != 0 && remaining > 0) {
// 从连接池获取一个连接
connection = pool.removeFirst();
}
return connection;
}
}
}
}
3、测试连接池。
public class DBPoolTest {
static DBPool pool = new DBPool(10);
// 控制器:控制主线程等待所有worker线程结束后才继续执行
static CountDownLatch end;
public static void main(String[] args) throws InterruptedException {
// 线程数量
int threadCount = 50;
end = new CountDownLatch(threadCount);
// 每个线程的操作次数
int count = 20;
// 计数器:统计可以拿到连接的线程
AtomicInteger got = new AtomicInteger();
// 计数器:统计没有拿到连接的线程
AtomicInteger notGot = new AtomicInteger();
for (int i = 0; i < threadCount; i++) {
Thread thread = new Thread(new Worker(count, got, notGot), "thread_worker_" + i);
thread.start();
}
// main线程在此处等待
end.await();
System.out.println("总共尝试了: " + (threadCount * count));
System.out.println("拿到连接的次数: " + got);
System.out.println("没能连接的次数: " + notGot);
}
static class Worker implements Runnable {
int count;
AtomicInteger got;
AtomicInteger notGot;
public Worker(int count, AtomicInteger got, AtomicInteger notGot) {
this.count = count;
this.got = got;
this.notGot = notGot;
}
public void run() {
while (count > 0) {
try {
// 从线程池中获取连接,如果1ms内无法获取到,将会返回null
// 分别统计连接获取的数量got和未获取到的数量notGot
Connection connection = pool.fetchConnection(1);
if (connection != null) {
// 获取到了连接
try {
connection.createStatement();
// PreparedStatement preparedStatement = connection.prepareStatement("");
// preparedStatement.execute();
connection.commit();
} finally {
pool.releaseConnection(connection);
got.incrementAndGet();
}
} else {
// 超时
notGot.incrementAndGet();
System.out.println(Thread.currentThread().getName() + "等待超时!");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
count--;
}
}
end.countDown();
}
}
}
5. 调用yield() 、sleep()、wait()、notify()等方法对锁有何影响?
- yield() 、sleep()被调用后,都不会释放当前线程所持有的锁。
- 调用 wait()方法后,会释放当前线程持有的锁,而且当前被唤醒后,会重新去竞争锁,锁竞争到后才会执行 wait 方法后面的代码。
- 调用 notify()系列方法后,对锁无影响,线程只有在 synchronized 同步代码执行完后才会自然而然的释放锁,所以 notify()系列方法一般都是 synchronized 同步代码的最后一行。