您当前的位置: 首页 > 

顧棟

暂无认证

  • 4浏览

    0关注

    227博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

【JUC系列】同步工具类之ThreadLocal

顧棟 发布时间:2022-06-28 21:00:00 ,浏览量:4

ThreadLocal分析

文章目录
  • ThreadLocal分析
    • ThreadLocal定义
    • 示例
    • 组成
      • 内部类SuppliedThreadLocal
      • 内部类ThreadLocalMap
      • 构造方法
      • 核心方法
      • set过程分析
        • void set(T value)
        • 更新和扩容
      • get过程分析
        • get()
      • remove过程分析
        • remove()
    • 内存泄露
      • 如何避免内存泄漏
    • 使用场景
      • 每个线程中的各自维护一个序列号
      • session的管理
      • 线程池中使用threadlocal

ThreadLocal定义

此类提供线程局部变量。这种变量不同于一般变量,因为每个访问该变量(通过ThreadLocal的 get 或 set 方法)的线程都有自己的、独立初始化的变量副本。ThreadLocal实例通常是希望将状态与线程相关联的类中的私有静态字段(例如,用户 ID 或事务 ID)。

只要线程处于活动状态并且ThreadLocal实例可访问,每个线程都持有对其线程局部变量副本的隐式引用; 在线程消失后,它的所有线程本地实例副本都将受到垃圾回收(除非存在对这些副本的其他引用)。 在这里插入图片描述

示例
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadLocalDemo {

    private static final ThreadLocal THREAD_LOCAL_1 = ThreadLocal.withInitial(() -> "0s");
    private static final ThreadLocal THREAD_LOCAL_2 = ThreadLocal.withInitial(() -> "00s");

    private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(10);

    private static final ThreadPoolExecutor POOL_EXECUTOR = initThreadPool(5, 10, 1000);

    /**
     * 工作线程
     */
    public static class WorkerThreadFactory implements ThreadFactory {
        private final String namePrefix;
        private final AtomicInteger nextId = new AtomicInteger(1);

        WorkerThreadFactory(String whatFeatureOfGroup) {
            this.namePrefix = "From WorkerThreadFactory's " + whatFeatureOfGroup + "-Worker-";
        }

        @Override
        public Thread newThread(Runnable task) {
            String name = namePrefix + nextId.getAndIncrement();
            return new Thread(null, task, name, 0);
        }
    }

    /**
     * 初始化线程池
     */
    public static ThreadPoolExecutor initThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime) {
        return new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                keepAliveTime,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue(1000),
                new WorkerThreadFactory("ThreadLocalDemo"),
                new ThreadPoolExecutor.AbortPolicy());
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i  {
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date())
                        + "]--[" + Thread.currentThread().getName() + "] THREAD_LOCAL_1, threadId:" + Thread.currentThread().getId() + ", " + THREAD_LOCAL_1.get());
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date())
                        + "]--[" + Thread.currentThread().getName() + "] THREAD_LOCAL_2, threadId:" + Thread.currentThread().getId() + ", " + THREAD_LOCAL_2.get());
                THREAD_LOCAL_1.set("THREAD_LOCAL_1 is " + finalI);
                THREAD_LOCAL_2.set("THREAD_LOCAL_2 is " + finalI);
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date())
                        + "]--[" + Thread.currentThread().getName() + "] THREAD_LOCAL_1, threadId:" + Thread.currentThread().getId() + ", " + THREAD_LOCAL_1.get());
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date())
                        + "]--[" + Thread.currentThread().getName() + "] THREAD_LOCAL_2, threadId:" + Thread.currentThread().getId() + ", " + THREAD_LOCAL_2.get());
                THREAD_LOCAL_1.remove();
                THREAD_LOCAL_2.remove();
                COUNT_DOWN_LATCH.countDown();
            });
        }
        if (COUNT_DOWN_LATCH.await(2, TimeUnit.MINUTES)) {
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "]--[" + Thread.currentThread().getName() + "] is over.");
        }

        POOL_EXECUTOR.shutdown();

    }
}

执行结果

[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-3] THREAD_LOCAL_1, threadId:13, 0s
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-4] THREAD_LOCAL_1, threadId:14, 0s
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-1] THREAD_LOCAL_1, threadId:11, 0s
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-3] THREAD_LOCAL_2, threadId:13, 00s
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-1] THREAD_LOCAL_2, threadId:11, 00s
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-3] THREAD_LOCAL_1, threadId:13, THREAD_LOCAL_1 is 3
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-2] THREAD_LOCAL_1, threadId:12, 0s
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-3] THREAD_LOCAL_2, threadId:13, THREAD_LOCAL_2 is 3
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-5] THREAD_LOCAL_1, threadId:15, 0s
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-3] THREAD_LOCAL_1, threadId:13, 0s
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-5] THREAD_LOCAL_2, threadId:15, 00s
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-3] THREAD_LOCAL_2, threadId:13, 00s
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-2] THREAD_LOCAL_2, threadId:12, 00s
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-3] THREAD_LOCAL_1, threadId:13, THREAD_LOCAL_1 is 6
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-1] THREAD_LOCAL_1, threadId:11, THREAD_LOCAL_1 is 1
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-4] THREAD_LOCAL_2, threadId:14, 00s
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-1] THREAD_LOCAL_2, threadId:11, THREAD_LOCAL_2 is 1
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-3] THREAD_LOCAL_2, threadId:13, THREAD_LOCAL_2 is 6
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-4] THREAD_LOCAL_1, threadId:14, THREAD_LOCAL_1 is 4
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-1] THREAD_LOCAL_1, threadId:11, 0s
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-1] THREAD_LOCAL_2, threadId:11, 00s
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-4] THREAD_LOCAL_2, threadId:14, THREAD_LOCAL_2 is 4
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-1] THREAD_LOCAL_1, threadId:11, THREAD_LOCAL_1 is 7
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-4] THREAD_LOCAL_1, threadId:14, 0s
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-1] THREAD_LOCAL_2, threadId:11, THREAD_LOCAL_2 is 7
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-4] THREAD_LOCAL_2, threadId:14, 00s
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-1] THREAD_LOCAL_1, threadId:11, 0s
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-4] THREAD_LOCAL_1, threadId:14, THREAD_LOCAL_1 is 9
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-1] THREAD_LOCAL_2, threadId:11, 00s
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-4] THREAD_LOCAL_2, threadId:14, THREAD_LOCAL_2 is 9
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-1] THREAD_LOCAL_1, threadId:11, THREAD_LOCAL_1 is 10
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-1] THREAD_LOCAL_2, threadId:11, THREAD_LOCAL_2 is 10
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-2] THREAD_LOCAL_1, threadId:12, THREAD_LOCAL_1 is 2
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-2] THREAD_LOCAL_2, threadId:12, THREAD_LOCAL_2 is 2
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-5] THREAD_LOCAL_1, threadId:15, THREAD_LOCAL_1 is 5
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-3] THREAD_LOCAL_1, threadId:13, 0s
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-5] THREAD_LOCAL_2, threadId:15, THREAD_LOCAL_2 is 5
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-3] THREAD_LOCAL_2, threadId:13, 00s
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-3] THREAD_LOCAL_1, threadId:13, THREAD_LOCAL_1 is 8
[17:59:01.646]--[From WorkerThreadFactory's ThreadLocalDemo-Worker-3] THREAD_LOCAL_2, threadId:13, THREAD_LOCAL_2 is 8
[17:59:01.646]--[main] is over.
组成 内部类SuppliedThreadLocal

ThreadLocal 的扩展,从指定的供应商处获取其初始值。

    static final class SuppliedThreadLocal extends ThreadLocal {

        private final Supplier supplier;

        SuppliedThreadLocal(Supplier supplier) {
            this.supplier = Objects.requireNonNull(supplier);
        }

        @Override
        protected T initialValue() {
            return supplier.get();
        }
    }
内部类ThreadLocalMap

ThreadLocalMap是一个自定义的哈希映射,仅适用于维护线程本地值。 不会在ThreadLocal类之外导出任何操作。 该类是包私有的,以允许在类 Thread中声明字段,ThreadLocal.ThreadLocalMap threadLocals。哈希表的Entry继承了WeakReferences 作为key,它的key的值为ThreadLocalvalueThreadLocal对应的值。 threadLocal.set(5)会将threadLocal5作为键值对保存在该线程的threadLocals里。

构造方法

这是个空的构造函数,若需要设置初始值,可以调用withInitial

public ThreadLocal() {
}
public static  ThreadLocal withInitial(Supplier supplier) {
    return new SuppliedThreadLocal(supplier);
}
核心方法 方法名描述set(T value)将此线程局部变量的当前线程副本设置为指定值。大多数子类不需要重写此方法,仅依靠initialValue方法来设置线程局部变量的值。T get()返回此线程局部变量的当前线程副本中的值。 如果变量没有当前线程的值,则首先将其初始化为调用initialValue方法返回的值。void remove()删除此线程局部变量的当前线程值。 如果这个线程局部变量随后被当前线程读取,它的值将通过调用它的initialValue方法重新初始化,除非它的值是由当前线程在中间设置的。 这可能会导致在当前线程中多次调用initialValue方法。 set过程分析 void set(T value)

为线程局部的当前线程的副本设置值

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}
  1. 先获取当前线程,通过getMap方法获取对应线程的threadLocals变量
  2. 如果threadLocals变量不为空,则调用set方法为当前线程的线程局部变量副本设置value
  3. 如果threadLocals变量为空,则调用createMap方法,创建ThreadLocalMap,同时设置了线程局部变量副本的值。在把ThreadLocalMap对象赋给threadLocals变量。
ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}

Thread类中的threadLocals

    /* 与此线程有关的 ThreadLocal 值。 此映射由 ThreadLocal 类维护。 */
    ThreadLocal.ThreadLocalMap threadLocals = null;

set(ThreadLocal key, Object value)

private void set(ThreadLocal key, Object value) {
    // 
    Entry[] tab = table;
    int len = tab.length;
    // 1.通过key的hashcode计算出索引的位置
    int i = key.threadLocalHashCode & (len-1);
    // 2.从索引位置开始遍历,通过nextIndex方法寻找下一个索引位置
    for (Entry e = tab[i];
         e != null; // 这行代码什么意思?
         
         e = tab[i = nextIndex(i, len)]) {
        ThreadLocal k = e.get();

        // 当key与Entry[]数组中的k相同,则更新value。
        if (k == key) {
            e.value = value;
            return;
        }

        // 如果遍历到某个Entry的k为空,则需要清空key为null的Entry
        if (k == null) {
            // 继续寻找key的存放的下标,并清理key为空的Entry
            replaceStaleEntry(key, value, i);
            return;
        }
    }

    // 如果通过nextIndex寻找到一个空位置(代表没有找到key相同的),则将元素放在该位置上
    tab[i] = new Entry(key, value);
    int sz = ++size;
    // 调用cleanSomeSlots方法清理key为null的Entry,并判断是否需要扩容,如果需要则调用rehash方法进行扩容
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
}

nextIndex(int i, int len)

/** 在len长度中寻找i之后的下标*/
private static int nextIndex(int i, int len) {
    return ((i + 1  key)

private Entry getEntry(ThreadLocal key) {
    // 计算key对于的索引位置
    int i = key.threadLocalHashCode & (table.length - 1);
    Entry e = table[i];
    // 若找到对应的key与传入的key相等,则返回。
    if (e != null && e.get() == key)
        return e;
    else
    // 若e为null 或者 e对应的key与传入的key不一致,则继续寻找
        return getEntryAfterMiss(key, i, e);
}

getEntryAfterMiss(ThreadLocal key, int i, Entry e)

private Entry getEntryAfterMiss(ThreadLocal key, int i, Entry e) {
    Entry[] tab = table;
    int len = tab.length;
    // 若e不为空继续遍历寻找
    while (e != null) {
        ThreadLocal k = e.get();
        // 找到目标Entry则返回。
        if (k == key)
            return e;
        // 若发现key为null,则触发了清除逻辑
        if (k == null)
            expungeStaleEntry(i);
        else
        // 寻找下一个位置,获取对应的Entry
            i = nextIndex(i, len);
        e = tab[i];
    }
    // 找不到则null
    return null;
}
remove过程分析 remove()
public void remove() {
    // 获取当前线程的ThreadLocalMap
    ThreadLocalMap m = getMap(Thread.currentThread());
    if (m != null)
        m.remove(this);
}
  1. 获取当前线程的threadLocals属性,如果不为空,则将key为当前ThreadLocal的键值对移除,同时会调用expungeStaleEntry()方法清除key为null的Entry。

remove(ThreadLocal key)

        private void remove(ThreadLocal key) {
            Entry[] tab = table;
            int len = tab.length;
            // 根据hashCode计算出当前ThreadLocal的索引位置
            int i = key.threadLocalHashCode & (len-1);
            // 从i开始遍历,直到Entry为null
            for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
                // 找到对应的key
                if (e.get() == key) {
                    // clear()方法会清空key的引用
                    e.clear();
                    // 从i开始清除key为null的元素
                    expungeStaleEntry(i);
                    return;
                }
            }
        }
内存泄露

内存泄漏(Memory Leak)是指程序中已动态分配的堆内存由于某种原因程序未释放或无法释放,造成系统内存的浪费,导致程序运行速度减慢甚至系统崩溃等严重后果。

*内存溢出(Out Of Memory,简称OOM)*是指应用系统中存在无法回收的内存或使用的内存过多,最终使得程序运行要用到的内存大于能提供的最大内存。

由于过多的内存泄露会造成内存溢出。 在这里插入图片描述 ThreadLocalMap使用ThreadLocal的弱引用作为Entrykey,如果一个ThreadLocal没有外部强引用来引用它,下一次系统GC时,这个ThreadLocal必然会被回收,这样一来,ThreadLocalMap中就会出现keynullEntry,就没有办法访问这些key为nullEntryvalue

如果当前线程一直在运行,并且一直不执行getsetremove方法,这些keynullEntryvalue就会一直存在一条强引用链:Thread Ref -> Thread -> ThreadLocalMap -> Entry -> value,导致这些keynullEntryvalue永远无法回收,造成内存泄漏。

内存泄露的总内存就是各线程中key为null的value的总和

如何避免内存泄漏

为了避免这种情况,我们可以在使用完ThreadLocal后,手动调用remove方法,以避免出现内存泄漏。

THREAD_LOCAL_1.remove();
THREAD_LOCAL_2.remove();
使用场景 每个线程中的各自维护一个序列号
public class SerialNum {
    // The next serial number to be assigned
    private static int nextSerialNum = 0;

    private static ThreadLocal serialNum = new ThreadLocal() {
        protected synchronized Object initialValue() {
            return new Integer(nextSerialNum++);
        }
    };

    public static int get() {
        return ((Integer) (serialNum.get())).intValue();
    }
}
session的管理
private static final ThreadLocal threadSession = new ThreadLocal();  
  
public static Session getSession() throws InfrastructureException {  
    Session s = (Session) threadSession.get();  
    try {  
        if (s == null) {  
            s = getSessionFactory().openSession();  
            threadSession.set(s);  
        }  
    } catch (HibernateException ex) {  
        throw new InfrastructureException(ex);  
    }  
    return s;  
}  
线程池中使用threadlocal
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadLocalDemo {

    private static final ThreadLocal THREAD_LOCAL_1 = ThreadLocal.withInitial(() -> "0s");
    private static final ThreadLocal THREAD_LOCAL_2 = new ThreadLocal();
    private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(10);
    private static final ThreadPoolExecutor POOL_EXECUTOR = initThreadPool(5, 10, 1000);

    /**
     * 工作线程
     */
    public static class WorkerThreadFactory implements ThreadFactory {
        private final String namePrefix;
        private final AtomicInteger nextId = new AtomicInteger(1);

        WorkerThreadFactory(String whatFeatureOfGroup) {
            this.namePrefix = "From WorkerThreadFactory's " + whatFeatureOfGroup + "-Worker-";
        }

        @Override
        public Thread newThread(Runnable task) {
            String name = namePrefix + nextId.getAndIncrement();
            return new Thread(null, task, name, 0);
        }
    }

    /**
     * 初始化线程池
     */
    public static ThreadPoolExecutor initThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime) {
        return new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                keepAliveTime,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue(1000),
                new WorkerThreadFactory("ThreadLocalDemo"),
                new ThreadPoolExecutor.AbortPolicy());
    }

    static class Student {
        int age;

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }
    }

    private static Student getStudent() {
        Student s = THREAD_LOCAL_2.get();
        if (null == s) {
            s = new Student();
            s.setAge(5);
            THREAD_LOCAL_2.set(s);
        }else {
            s.setAge(55);
            THREAD_LOCAL_2.set(s);
        }
        return s;
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i  {
                Student student = getStudent();
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date())
                        + "]--[" + Thread.currentThread().getName() + "] THREAD_LOCAL_1, threadId:" + Thread.currentThread().getId() + ", " + THREAD_LOCAL_1.get());
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date())
                        + "]--[" + Thread.currentThread().getName() + "] THREAD_LOCAL_2, threadId:" + Thread.currentThread().getId() + ", " + student.getAge());
                THREAD_LOCAL_1.set("THREAD_LOCAL_1 is " + finalI);

                System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date())
                        + "]--[" + Thread.currentThread().getName() + "] THREAD_LOCAL_1, threadId:" + Thread.currentThread().getId() + ", " + THREAD_LOCAL_1.get());
                THREAD_LOCAL_1.remove();
                COUNT_DOWN_LATCH.countDown();
            });
            THREAD_LOCAL_2.remove();
        }
        if (COUNT_DOWN_LATCH.await(2, TimeUnit.MINUTES)) {
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "]--[" + Thread.currentThread().getName() + "] is over.");
        }

        POOL_EXECUTOR.shutdown();

    }
}

参考文章

https://zhuanlan.zhihu.com/p/34406557

关注
打赏
1663402667
查看更多评论
0.2027s