线程
线程是什么?
1.线程是CPU调度的最小单位.由于CPU计算指令可以达到数十亿次/秒.因此表现出来的就是线程在并发执行.
2.通常的线程的调度方式有:分时调度和抢占式调度.目前大部分都是抢占式调度.
3.java中的线程调度是交给了操作系统来执行的.因此java中的线程也有优先级.1-10,数量越大,优先级越高,默认值为5.需要注意的是,并不是优先级越大,肯定会先执行,只能说大概率会先执行.
线程的创建方式
1.继承Thread
2.实现runable接口
3.通过callable和FutureTask创建有返回值的线程.

查看UML图:

可以看到,Thread包含Runable属性,中间用一个RunableFuture接口是去继承Runable接口和Future接口.然后FutureTask去实现RunableFuture接口,从而可以让FutureTask能够作为Thread的构造参数.Callable又是FutureTask的成员属性,最终call执行结果保存在FutureTask中,通过Future的get()获取执行结果.
4.通过线程池.
线程的生命周期
1.NEW状态
新建一个线程,但是还没有start的时候,线程的默认状态.
2.RUNABLE状态
调用start()方法后的状态.因为java的线程管理是有操作系统管理的,操作系统中包含就绪状态和运行状态.当调用start()方法后,在操作系统中首先是就绪状态,但是要等到CPU时间片来执行,然后变成运行状态,如果在一个时间片没有执行完,会变回就绪状态,然后等待下次的时间片,如此反复,直到程序执行完或者异常退出.
3.TERMAINTED状态
线程执行完或者异常退出的状态.
4.TIME_WAITING状态
限时等待状态.
能让线程进入此状态的方法通常有:
a.thread.join()方法
b.object.wait()方法.
c.Thread.sleep()方法.
5.BLOCKING状态
a.等待获取锁的时候,进入堵塞状态
b.io堵塞,比如:等待用户的输入

线程的操作
sleep()
线程进入TIME_WAITING状态
interrupt()
将线程设置为中断状态.这个命令的执行有2个作用:
a.会让处于TIME_WAITING状态的线程报异常退出
b.给线程设置中断状态标记,从而使开发者能够根据这个状态做出相应的处理.
join()
假如A线程的执行需要B线程,就可以使用此方法.
在A线程的run()执行:threadB.join();
yiead()
让出当前cpu时间片,重新调度.
deamon()
守护线程,和jvm进程绑定.
线程池技术
线程池调度流程

使用线程池优势
1.降低资源消耗,线程是稀缺资源,并且在新建和销毁都会占用不少资源.
2.提高响应速度.因为可以重用已经创建好的线程.
3.进行线程监控.
实例:
package com.lenovo.javautils.threads;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @description: 自定义线程池
* @author: dcx
* @create: 2020-10-25 15:51
**/
@Slf4j
public enum MyThreadPoolExecutor {
//创建实例
INSTANCE;
private final ThreadPoolExecutor pool;
private static final String poolName = MyThreadPoolExecutor.class.getSimpleName();
private static final ThreadLocal<Long> start = new ThreadLocal<>();
private static final ThreadLocal<String> currentThreadName = new ThreadLocal<>();
MyThreadPoolExecutor() {
pool = new ThreadPoolExecutor(10, 10, 3,
TimeUnit.HOURS, new LinkedBlockingQueue<>(200),
new MyThreadFactory(), new CustomRejectedExecutionHandler()) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
//任务开始执行
log.info("thread start:{}", t.getName());
start.set(System.currentTimeMillis());
currentThreadName.set(t.getName());
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
//任务执行完毕
super.afterExecute(r, t);
log.info("thread:{}, end,useTime:{}ms", currentThreadName.get(), System.currentTimeMillis() - start.get());
start.remove();
currentThreadName.remove();
}
@Override
protected void terminated() {
//线程池终止
super.terminated();
log.info("threadPool stop,msg:{}", poolName);
}
};
initGracefullyShutDown();
}
public ThreadPoolExecutor getInstance() {
return pool;
}
/**
* 线程池创建工厂
*/
private static class MyThreadFactory implements ThreadFactory {
private AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName(poolName + count.addAndGet(1));
return thread;
}
}
/**
* 自定义堵塞策略
*/
private static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.error("threadpool reject,currentCount:{}", executor.getTaskCount());
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
executor);
}
}
/**
* 添加钩子函数
*/
private void initGracefullyShutDown() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> shutDownThreadPool(pool, poolName)));
}
/**
* 优雅关闭线程池。
* 自身关闭,await 60s,强制关闭。
*/
private void shutDownThreadPool(ExecutorService threadPool, String alias) {
log.info("Start to shutdown the thead pool : {}", alias);
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
log.warn("Interrupt the worker, which may cause some task inconsistent");
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
log.warn("Thread pool can't be shutdown even with interrupting worker threads, which may cause some task inconsistent.");
}
}
} catch (InterruptedException ie) {
threadPool.shutdownNow();
log.warn("The current server thread is interrupted when it is trying to stop the worker threads. This may leave an inconsistent state.");
Thread.currentThread().interrupt();
}
}
}
使用:
package com.lenovo.javautils.threads;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
public class MyThreadPoolExecutorUse {
public static void submitPool() throws ExecutionException, InterruptedException {
ThreadPoolExecutor instance = MyThreadPoolExecutor.INSTANCE.getInstance();
for (int i = 0; i < 10; i++) {
Future<?> submit = instance.submit(() -> {
System.out.println("submitPool");
System.out.println("submitPool1");
return 1;
});
System.out.println(submit.get());
}
}
public static void excPool() {
ThreadPoolExecutor instance = MyThreadPoolExecutor.INSTANCE.getInstance();
for (int i = 0; i < 10; i++) {
instance.execute(() -> {
System.out.println("excPool");
System.out.println("excPool1");
});
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
//需要获取返回值
submitPool();
//不需要获取返回值
excPool();
System.exit(0);
}
}
拒绝策略:
在例子中我们虽重写了拒绝策略,但是也只是增加了一些日志,返回了拒绝报异常策略.
jdk提供的拒绝策略有:
AbortPolicy:默认的策略,抛异常.
CallerRunsPolicy:让提交任务的当前线程执行.
DiscardPolicy:丢弃,不做任何处理和通知.静默处理.
DiscardOldestPolicy:从队列里面poll最先加入的线程,丢弃最老的策略.
线程数的确定
IO密集型任务
线程数:2n.
Runtime.getRuntime().availableProcessors()*2;
CPU密集型任务
n
Runtime.getRuntime().availableProcessors();
ThreadLocal
简介
本地线程变量,是线程隔离的.一种无锁的解决线程安全问题.
使用场景
线程隔离
常用于一个用户的会话信息,数据库连接,http请求.Session管理
跨函数传递数据
源码分析
thread包含threadLocalMap

threadLocalMap是一种简易版的hashmap结构

需要注意的是,Entry是一种弱引用,好处是防止内存泄露.

当方法执行完后,方法中的threadLocal被销毁,但是Entry中的threadLocal还存在,如果不是弱引用,将导致内存溢出.

threadLocal使用原则
1.使用private static final修饰ThreadLocal
private和final防止被修改.
static可以保证全局唯一.
2.threadLocal使用完后务必手动调用remove方法.可以简单有效的避免内存溢出.
Last updated
Was this helpful?