线程

线程是什么?

1.线程是CPU调度的最小单位.由于CPU计算指令可以达到数十亿次/秒.因此表现出来的就是线程在并发执行.

2.通常的线程的调度方式有:分时调度和抢占式调度.目前大部分都是抢占式调度.

3.java中的线程调度是交给了操作系统来执行的.因此java中的线程也有优先级.1-10,数量越大,优先级越高,默认值为5.需要注意的是,并不是优先级越大,肯定会先执行,只能说大概率会先执行.

线程的创建方式

1.继承Thread

2.实现runable接口

3.通过callable和FutureTask创建有返回值的线程.

查看UML图:

可以看到,Thread包含Runable属性,中间用一个RunableFuture接口是去继承Runable接口和Future接口.然后FutureTask去实现RunableFuture接口,从而可以让FutureTask能够作为Thread的构造参数.Callable又是FutureTask的成员属性,最终call执行结果保存在FutureTask中,通过Future的get()获取执行结果.

4.通过线程池.

线程的生命周期

1.NEW状态

新建一个线程,但是还没有start的时候,线程的默认状态.

2.RUNABLE状态

调用start()方法后的状态.因为java的线程管理是有操作系统管理的,操作系统中包含就绪状态和运行状态.当调用start()方法后,在操作系统中首先是就绪状态,但是要等到CPU时间片来执行,然后变成运行状态,如果在一个时间片没有执行完,会变回就绪状态,然后等待下次的时间片,如此反复,直到程序执行完或者异常退出.

3.TERMAINTED状态

线程执行完或者异常退出的状态.

4.TIME_WAITING状态

限时等待状态.

能让线程进入此状态的方法通常有:

a.thread.join()方法

b.object.wait()方法.

c.Thread.sleep()方法.

5.BLOCKING状态

a.等待获取锁的时候,进入堵塞状态

b.io堵塞,比如:等待用户的输入

线程的操作

sleep()

线程进入TIME_WAITING状态

interrupt()

将线程设置为中断状态.这个命令的执行有2个作用:

a.会让处于TIME_WAITING状态的线程报异常退出

b.给线程设置中断状态标记,从而使开发者能够根据这个状态做出相应的处理.

join()

假如A线程的执行需要B线程,就可以使用此方法.

在A线程的run()执行:threadB.join();

yiead()

让出当前cpu时间片,重新调度.

deamon()

守护线程,和jvm进程绑定.

线程池技术

线程池调度流程

使用线程池优势

1.降低资源消耗,线程是稀缺资源,并且在新建和销毁都会占用不少资源.

2.提高响应速度.因为可以重用已经创建好的线程.

3.进行线程监控.

实例:

package com.lenovo.javautils.threads;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @description: 自定义线程池
 * @author: dcx
 * @create: 2020-10-25 15:51
 **/
@Slf4j
public enum MyThreadPoolExecutor {
    //创建实例
    INSTANCE;
    private final ThreadPoolExecutor pool;
    private static final String poolName = MyThreadPoolExecutor.class.getSimpleName();

    private static final ThreadLocal<Long> start = new ThreadLocal<>();
    private static final ThreadLocal<String> currentThreadName = new ThreadLocal<>();

    MyThreadPoolExecutor() {
        pool = new ThreadPoolExecutor(10, 10, 3,
                TimeUnit.HOURS, new LinkedBlockingQueue<>(200),
                new MyThreadFactory(), new CustomRejectedExecutionHandler()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                //任务开始执行
                log.info("thread start:{}", t.getName());
                start.set(System.currentTimeMillis());
                currentThreadName.set(t.getName());
                super.beforeExecute(t, r);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                //任务执行完毕
                super.afterExecute(r, t);
                log.info("thread:{}, end,useTime:{}ms", currentThreadName.get(), System.currentTimeMillis() - start.get());
                start.remove();
                currentThreadName.remove();
            }

            @Override
            protected void terminated() {
                //线程池终止
                super.terminated();
                log.info("threadPool stop,msg:{}", poolName);
            }
        };
        initGracefullyShutDown();
    }

    public ThreadPoolExecutor getInstance() {
        return pool;
    }


    /**
     * 线程池创建工厂
     */
    private static class MyThreadFactory implements ThreadFactory {
        private AtomicInteger count = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName(poolName + count.addAndGet(1));
            return thread;
        }
    }

    /**
     * 自定义堵塞策略
     */
    private static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            log.error("threadpool reject,currentCount:{}", executor.getTaskCount());
            throw new RejectedExecutionException("Task " + r.toString() +
                    " rejected from " +
                    executor);
        }
    }

    /**
     * 添加钩子函数
     */
    private void initGracefullyShutDown() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> shutDownThreadPool(pool, poolName)));
    }

    /**
     * 优雅关闭线程池。
     * 自身关闭,await 60s,强制关闭。
     */
    private void shutDownThreadPool(ExecutorService threadPool, String alias) {
        log.info("Start to shutdown the thead pool : {}", alias);

        threadPool.shutdown();
        try {
            if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
                threadPool.shutdownNow();
                log.warn("Interrupt the worker, which may cause some task inconsistent");
                if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
                    log.warn("Thread pool can't be shutdown even with interrupting worker threads, which may cause some task inconsistent.");
                }
            }
        } catch (InterruptedException ie) {
            threadPool.shutdownNow();
            log.warn("The current server thread is interrupted when it is trying to stop the worker threads. This may leave an inconsistent state.");
            Thread.currentThread().interrupt();
        }
    }
}

使用:

package com.lenovo.javautils.threads;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

public class MyThreadPoolExecutorUse {

    public static void submitPool() throws ExecutionException, InterruptedException {
        ThreadPoolExecutor instance = MyThreadPoolExecutor.INSTANCE.getInstance();
        for (int i = 0; i < 10; i++) {
            Future<?> submit = instance.submit(() -> {
                System.out.println("submitPool");
                System.out.println("submitPool1");
                return 1;
            });
            System.out.println(submit.get());
        }
    }

    public static void excPool() {
        ThreadPoolExecutor instance = MyThreadPoolExecutor.INSTANCE.getInstance();
        for (int i = 0; i < 10; i++) {
            instance.execute(() -> {
                System.out.println("excPool");
                System.out.println("excPool1");
            });
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //需要获取返回值
        submitPool();
        //不需要获取返回值
        excPool();
        System.exit(0);
    }
}

拒绝策略:

在例子中我们虽重写了拒绝策略,但是也只是增加了一些日志,返回了拒绝报异常策略.

jdk提供的拒绝策略有:

AbortPolicy:默认的策略,抛异常.

CallerRunsPolicy:让提交任务的当前线程执行.

DiscardPolicy:丢弃,不做任何处理和通知.静默处理.

DiscardOldestPolicy:从队列里面poll最先加入的线程,丢弃最老的策略.

线程数的确定

IO密集型任务

线程数:2n.

Runtime.getRuntime().availableProcessors()*2;

CPU密集型任务

n

Runtime.getRuntime().availableProcessors();

ThreadLocal

简介

本地线程变量,是线程隔离的.一种无锁的解决线程安全问题.

使用场景

线程隔离

常用于一个用户的会话信息,数据库连接,http请求.Session管理

跨函数传递数据

源码分析

thread包含threadLocalMap

threadLocalMap是一种简易版的hashmap结构

需要注意的是,Entry是一种弱引用,好处是防止内存泄露.

当方法执行完后,方法中的threadLocal被销毁,但是Entry中的threadLocal还存在,如果不是弱引用,将导致内存溢出.

threadLocal使用原则

1.使用private static final修饰ThreadLocal

private和final防止被修改.

static可以保证全局唯一.

2.threadLocal使用完后务必手动调用remove方法.可以简单有效的避免内存溢出.

Last updated

Was this helpful?