从不足到精进:H5即开并行加载方案的演进之路

news/2025/12/4 11:04:50/文章来源:https://www.cnblogs.com/vivotech/p/19306015

作者: vivo 互联网客户端团队- Chen Long

并行加载是 H5 即开 SDK 的加速技术,通过 native 层在用户打开页面时并行请求关键资源(如 index.html 和 CSR 模式 API),利用 webview 初始化时间窗口提前发起请求,减少加载耗时。其核心挑战是解决 webview 与并行任务间的资源交接问题。

 

1分钟看图掌握核心观点👇

图片

一、并行加载能力核心解析

1.1 什么是并行加载

并行加载是H5即开SDK提供的一项加速能力,核心逻辑是:在用户打开页面时,通过native层并行请求关键资源,减少页面加载耗时。其本质是利用webview及native页面初始化的时间窗口,提前发起资源请求,实现"时间复用"。

 

1.2 核心加载资源

并行加载的关键资源包含两类:

  • H5页面首帧渲染依赖的index.html(首帧资源,加载时机极早)。

  • CSR模式下页面依赖的API接口(通常在首帧渲染后调用,加载时机较晚)。

 

1.3 工作流程示意图

图片

 

即开SDK在流程中主要完成两件事:

  1. 用户打开URL时,并行请求H5依赖的API接口或index.html文件。

  2. 拦截webview请求,将并行加载的缓存数据导流给webview,实现加速。

 

二、并行加载的核心挑战:资源交接场景

并行加载的核心问题是如何在webview需要资源时,将并行请求的资源无缝交接。根据资源状态可分为三类场景

 

下面我们针对三个场景分别看处理方案

2.1 场景一:网络数据尚未响应,webview开始需要资源

解决方案一

忽略网络数据,webview自己加载资源。

矛盾点:

如果是因为服务器压力大,网络环境差导致的响应慢,webview自己去加载也会遇到同样的问题,而且直接放弃已并行加载的任务,等于是浪费了已经处于建联中,可能已经完成了一部分等待的时间。

 

解决方案二

webview等待网络资源加载成功后,再使用加载成功的资源。

矛盾点:

让webview的资源获取线程等待并行任务响应并返回,那么等多久,如果时间太久怎么办,需要设置超时时间,如何等待,并且在等待的过程中还要监控并行任务是否已经返回。

 

2.2 场景二:网络数据已响应,webview开始需要资源

场景二需要考虑两种情况

情况一

在webvie需要数据的时候,网络数据流刚好完成建联,webview可以直接使用网络数据流加载。

 

情况二

网络数据流建联的时候,webview还未开始需要使用数据,并行任务有时间将网络数据流读取到缓冲区中,webview在需要数据的时候,可以读取缓冲区的数据。

矛盾点:

大家知道网络数据读取是有受限于网络环境影响的,预先进行网络的数据读取,再交接给webview,肯定能更大程度上降低读取耗时,但是问题又来了,如果并行任务在读取的过程中,还没有读完,webview就来要数据怎么办,让webview等待吗?如果等待,等待多久?如果不等待,又如何将已读取在缓冲区中的数据和未读取的网络流数据一起交给webview呢。

 

2.3 场景三:网络数据返回错误,webview开始需要资源

解决方案

并行任务已失败,直接废弃并行任务,让webview自己加载资源。

 

三、早期方案设计与局限

在最开始,我们希望并行任务设计的足够简单,基于以上所有场景下的理解,权衡开发难度,我们设计了第一个方案。

 

首先我们希望避开场景二中,网络数据流已建联,网络线程正在读取网络流到缓冲区,读到一半webview来取数据的场景,因为我们觉得这种场景较为复杂,如果返回混合流,可能会出现控制不好的情况,而且整个过程中,两条线程参与的生产者和消费者,存在一个中间态,也就是生产者生产到一半时,消费者过来要消费数据,生产者要立刻停止生产,并把未成品交给消费者,这显然比常规的生产者消费者模式更加复杂,于是我们决定用更简单的方法来处理,方案就是把这个较为复杂的生产者消费者,变回简单的典型生产者消费者,消费者不能打断生产者的生产过程,而是等待生产完成,避免中间态下的复杂处理,虽然做了妥协,但是我们依然希望有较好的性能,所以我们将index.html任务和CSR模式下的API任务分为两个不同的方式来进行处理。

 

3.1 index.html首帧资源处理

index.html,这是webview完成初始化后第一个要加载的资源,俗称首帧资源。因此index.html的使用时机可以说是非常的靠前,在并行加载任务中,它的可用并行加载时间也大致在100ms左右,我们认为在这个时间内,并行任务大概率可以完成建联,但是可能没有时间再完成数据流的读取。

 

因此,我们使用了stream对象来保存网络数据流。

 

用户点击H5入口按钮,并行index任务发起,访问网络开始建联,一但完成有效建联,则保存网络stream对象,当webview需要使用该数据流的时候,判定stream对象是否存在,如果存在,则直接使用该数据流,如果stream对象不存在,则判定网络访问是否返回,如果是因为网络访问失败导致stream为空,则认为是并行任务失败,否则会进入等待模式,等待模式下,线程建立一个循环体,每隔5ms,探测一次stream是否存在,网络是否已完成建联,一但探测到结果,则退出循环体,即使多次没有探测到结果,1500ms后依然退出循环体,此时放弃并行任务数据,改为webview自主加载资源。

 

我们来看下这个方案

循环体的等待机制实现了消费者等待生产者完成生产的过程,5ms的检测时机,实现了生产者生产完成后,消费者知晓生产者生产状态的能力,1500ms的的超时退出,解决了生产者出现问题时,消费者困在循环等待中的情况。整个方案,只在保存的stream对象上添加volatile关键字,实现轻量的对象可见性线程同步。

 

整体方案逻辑如下:

 

3.1.1 index并行任务发起

flowchart TD
    A[用户点击H5入口按钮] --> B[并行发起index任务]
    B --> C[native访问网络开始建联]
    C --> D{建联成功?}
    D -->|是| E[保存网络Stream对象]
    E -->F[记录结束状态]
    D -->|否| F[记录结束状态]

 

图片

3.1.2 webview使用index资源

flowchart TD
    A[用户点击H5入口按钮] --> B[webview初始化]
    B --> C{需要使用index数据流}
    C -->|是| D[查找stream对象是否存在]
    D -->|存在| E[直接使用数据流]
    D -->|不存在| F{网络访问已结束?}
    F -->|是且stream为空| G[判定并行任务失败]
    F -->|否| H[进入等待模式]
    H --> I[建立探测循环体]
    I --> J[每隔5ms探测]
    J --> K{stream存在?}
    K -->|是| L[使用数据流并退出循环]
    K -->|否| M{并行任务已结束?}
    M -->|是| N[放弃并行任务并退出循环]
    M -->|否| O{达到1500ms?}
    O -->|是| P[超时退出循环]
    O -->|否| J
    P --> Q[webview自主加载资源]
    N --> Q
    L --> R[流程结束]
    G --> Q
    Q --> R

 

图片

 

3.2 API 接口资源处理

API接口通常在页面完成第一次渲染后,才开始调用,这个时候页面可能会展示一个loading状态,或者是骨架屏,这个时机相对靠后,并行任务有充足的时间来完成网络的建联任务,甚至有充足的时间将网络流读取到缓冲区中。

 

针对这种情况,我们保存了一个byte数组,将网络流数据读取到这个byte数组中,webview需要使用数据时,将byte数组包装成ByteArrayInputStream返回。

 

逻辑图如下:

3.2.1 API并行任务发起

flowchart TD
    A[用户点击H5入口按钮] --> B[并行发起index任务]
    B --> C[native访问网络开始建联]
    C --> D{建联成功?}
    D -->|是| E[读取网络stream]
    E --> F[保存到本地byte数组]
    D -->|否| G[记录结束状态]
    F --> H[流程完成]
    G --> H

 

图片

 

3.2.2 webview使用API资源

flowchart TD
    A[用户点击H5入口按钮] --> B[webview初始化]
    B --> C{需要使用index数据流}
    C -->|是| D[查找byte数组是否存在]
    D -->|存在| E[封装为ByteArrayInputStream返回给webview]
    E --> X[流程结束]
    
    D -->|不存在| G{网络访问已结束?}
    G -->|是且byte数组为空| H[判定并行任务失败]
    G -->|否| I[进入等待模式]
    
    I --> J[建立探测循环体]
    J --> K[每隔5ms探测]
    K --> L{byte数组存在?}
    L -->|是| M[封装使用并退出循环]
    L -->|否| N{并行任务已结束?}
    N -->|是| O[放弃并行任务并退出]
    N -->|否| P{达到1500ms?}
    P -->|是| Q[超时退出]
    P -->|否| K
    
    M --> E
    O --> R[webview自主加载]
    Q --> R
    H --> R
    R --> X

 

图片

 

从上述的逻辑图中可以看到,一但网络建联的时间不足,数据响应不及时,webview就会处于等待状态。

 

有两种场景会导致webview建立线程循环体等待资源:

场景一:并行发起的网络请求,因为网络速度慢,尚未建联返回有效的网络stream。

 

场景二:在并行API的场景下,虽然已经建联,但是网络stream数据读取中,尚未完全读取到缓冲区,webview会等待缓冲区缓冲完成。

 

这两种场景,都会造成时间的浪费

场景一的等待虽然不可避免,但是webview的循环体检测机制,每隔5ms才会检测一次,最差的情况下,无效等待时间可以最长达到5ms(上一次循环检测刚结束,网络即完成建联,需要下一个5ms之后才会发起检测)。

 

场景二的无效等待时间就更长了,即使已经完成建联,webview也要等待缓存全部完成,待缓存完成之后,webview又再次从缓冲区读取一次数据,全量缓存不就浪费了时间,还浪费了内存。

 

3.3 早期方案的核心局限

  • 时间浪费:循环探测(每 5ms 一次)存在无效等待

  • 内存浪费:API 接口全量缓存占用额外内存

  • 资源利用率低:放弃部分加载的资源,未充分利用并行请求的时间窗口

 

四、方案演进:优化时间与内存消耗

在反思早期方案的弊端中,我们提出了内存的消耗和时间上的浪费,那么新方案的优化侧重点就是优化内存的消耗和循环等待时间上的消耗,优化耗时的优化大刀是直接干掉循环等待,优化内存消耗的优化大刀是干掉全量的缓存建立,但是做加法简单,做减法就没那么容易。

 

4.1 优化思路

新方案的核心是解决中间态交接问题,通过线程同步和流桥接技术实现优化。

整体的构思如下:

  • 干掉循环等待:用线程同步机制替代轮询

  • 干掉全量缓存:采用半缓冲模式,仅缓存部分数据

  • 支持中间态交接:允许 webview 打断并行任务,合并已缓冲与未缓冲数据

 

4.2 技术实现:生产者 - 消费者模型

4.2.1 核心逻辑

我们把网络建联+网络数据读取到缓冲区的整个过程,看作是生产过程, 用一个生产者消费者模型来描述这个过程

  • 生产者接到生产产品任务

  • 消费者随时过来消费产品

  • 如果生产者还未开始生产,消费者可以等待一段时间,但是如果超时就放弃等待

  • 如果生产者正在生产,消费者可以随时可以打断生产,拿走生产了一半的产品,消费者完成剩下产品的生产任务

  • 如果生产者已经完成生产,消费者就可以拿走全部产品

 

这里涉及到两个技术点

  • 生产者在生产过程中随时被打断

  • 生产者生产了一半的产品可以被用来使用

 

我们尝试使用线程间同步机制和桥接流技术来实现这些需求:

 

4.3 代码核心实现

import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;import static java.lang.Thread.sleep;public class Main2 {       //模拟响应体
       public static class ResponseBody {
            InputStream stream;             public ResponseBody(InputStream stream){
                 this.stream = stream;
            }            public InputStream byteStream(){
                return stream;
            }
        }        public static class SyncLoadResponseBean {            private static final String TAG = "SyncLoadResponseBean";            // 状态常量定义
            public static final int INIT = 1;  // 初始状态
            public static final int READY = 2; // 数据准备就绪
            public static final int OFFER = 3; // 数据已提供
            public static final int DROP = 4;  // 数据已丢弃            private final String mRequestUrl;
            private final ConcurrentHashMap<String, SyncLoadResponseBean> mSyncLoadCache;
            private final AtomicInteger mStatus = new AtomicInteger(INIT); // 状态机            // 数据存储相关
            private Map<String, String> mResponseHeader;
            private ByteArrayOutputStream mBufferStream;
            private InputStream mNetworkStream;
            private long mResponseTime;            public SyncLoadResponseBean(String requestUrl, ConcurrentHashMap<String, SyncLoadResponseBean> syncLoadCache){
                mRequestUrl = requestUrl;
                mSyncLoadCache = syncLoadCache;
                mStatus.set(INIT);
            }            public boolean before(int status){
               return mStatus.get() < status;
            }            public boolean during(int status){
                return mStatus.get() == status;
            }            public boolean after(int status){
                return mStatus.get() >= status;
            }            /**
             * 唤醒所有等待线程
             * 使用 tryLock 避免长时间阻塞
             */
            public void signalAll(){
                synchronized (SyncLoadResponseBean.this) {
                    this.notifyAll();
                }
            }            /**
             * 保存响应数据并预处理
             * 网络线程会在得到响应后调用该方法,保存数据
             */
            public void saveResponse(ResponseBody responseBody, Map<String, String> responseHeader){
                streamReady(responseBody, responseHeader);
                preReadStream();
            }             /**
             * 准备数据流
             */
             private voids treamReady(ResponseBody responseBody, Map<String, String> responseHeader){
                synchronized (SyncLoadResponseBean.this) {
                    TLog.d(TAG, "并行加载 响应保存");
                    mResponseTime = System.currentTimeMillis();
                    mResponseHeader = responseHeader;
                    mBufferStream = new ByteArrayOutputStream();
                    if (responseBody != null) {
                        mNetworkStream = responseBody.byteStream();
                        // 根据流是否有效设置状态
                        if (mNetworkStream != null) {
                            mStatus.set(READY);
                        } else {
                            drop();
                        }
                    } else {
                        drop();
                    }
                    TLog.d(TAG, "并行加载 保存完成 通知消费者");
                    signalAll();
                }
            }            private void preReadStream(){
                TLog.d(TAG, "并行加载 预读缓存 开始");
                byte[] buffer = new byte[4096];
                int num = 0;
                try {
                    while (during(READY)) {
                        synchronized (SyncLoadResponseBean.this) {
                            try {
                                //双重校验锁
                                if (during(READY)) {
                                   // 读取网络流数据
                                   int bytesRead = mNetworkStream.read(buffer);
                                   if (bytesRead == -1) {
                                        TLog.d(TAG, "并行加载 预读缓存 完成 " + bytesRead);
                                        closeStream(mNetworkStream);
                                        mNetworkStream = null;
                                        return;
                                    }
                                    TLog.d(TAG, "并行加载 预读缓存 " + bytesRead);
                                    mBufferStream.write(buffer, 0, bytesRead);
                                }
                            } finally {
                                num++;
                                TLog.d(TAG, "并行加载 预读缓存 第" + num + "次通知消费者");
                                signalAll();
                            }
                        }
                    }
                    //已经提供了数据,则打印个日志看下
                    if (after(OFFER)) {
                        TLog.d(TAG, "并行加载 数据流已提供 预读缓存 关闭");
                    }
                } catch (IOException e) {
                    TLog.e(TAG, "并行加载 预读缓存 异常 关闭", e);
                    synchronized (SyncLoadResponseBean.this) {
                        //在读取的过程中出现了异常,但是这个时候还没有提供数据,就直接drop调
                        if (!after(OFFER)) {
                            drop();
                        }
                    }
                }
            }            /**
             * 获取桥接流
             * 浏览器线程调用该方法获取数据流
             */
             public InputStream getBridgedStream(){
                TLog.d(TAG, "并行加载 查找流数据");
                synchronized (SyncLoadResponseBean.this) {
                  try {
                       if (before(READY)) {
                            TLog.d(TAG, "并行加载 查找流数据 进入等待状态");
                            this.wait(5000);
                            TLog.d(TAG, "并行加载 查找流数据 被唤醒");
                        }
                        // 等待结束,再确认一次状态是否可用
                        if (before(READY)) {
                            TLog.d(TAG, "并行加载 查找流数据 依旧没有可用数据 返回空流");
                            drop();
                            return null;
                        } elseif (after(OFFER)) {
                            TLog.d(TAG, "并行加载 查找流数据 数据已被废弃或者被他人被使用 返回空流");
                            return null;
                        } elseif (isTimeOut()) {
                            TLog.d(TAG, "并行加载 查找流数据 数据超时 返回空流");
                            drop();
                            return null;
                        } else {
                            if (mNetworkStream != null && mBufferStream != null) {
                                mStatus.set(OFFER);
                                // 创建新的桥接流实例,包含已缓存数据和剩余网络流
                                ByteArrayInputStream cachedStream = new ByteArrayInputStream(mBufferStream.toByteArray());
                                TLog.d(TAG, "并行加载 查找流数据 返回桥接流");
                                return new SequenceInputStream(cachedStream, mNetworkStream);
                            } elseif (mNetworkStream != null) {
                                mStatus.set(OFFER);
                                TLog.d(TAG, "并行加载 查找流数据 返回网络流");
                                return mNetworkStream;
                            } elseif (mBufferStream != null) {
                                mStatus.set(OFFER);
                                // 创建新的桥接流实例,包含已缓存数据和剩余网络流
                                TLog.d(TAG, "并行加载 查找流数据 返回缓存流");
                                return new ByteArrayInputStream(mBufferStream.toByteArray());
                            } else {
                                drop();
                                TLog.d(TAG, "并行加载 查找流数据 返回空流");
                                 return null;
                            }
                        }
                    } catch (Exception e) {
                        TLog.e("TAG", "Create bridged stream failed", e);
                        drop();
                        return null;
                    }
                }
            }            /**
             * 获取响应头
             */
             public Map<String, String> getResponseHeader(){
                //如果请求里面不带跨域标识,则带上跨域标识
                if (mResponseHeader != null && !mResponseHeader.containsKey("Access-Control-Allow-Origin")) {
                    mResponseHeader.put("Access-Control-Allow-Origin", "*");
                }
                return mResponseHeader;
            }            /**
             * 统一关闭流资源操作
             */
             private void closeStream(Closeable stream){
if (stream != null) {
try {
                        stream.close();
                    } catch (Exception e) {
                        TLog.e(TAG, "关闭流失败", e);
                    }
                }
            }            /**
             * 判断数据有没有超时
             */
             private boolean isTimeOut(){
                 return Math.abs(mResponseTime - System.currentTimeMillis()) > 5000;
            }            /**
             * 丢弃数据
             */
             public void drop(){
                synchronized (SyncLoadResponseBean.this) {
                    mStatus.set(DROP);
                    mResponseHeader = null;
                    mResponseTime = 0;
                    closeStream(mBufferStream);
                    closeStream(mNetworkStream);
                    mBufferStream = null;
                    mNetworkStream = null;
                    mSyncLoadCache.remove(mRequestUrl);
                    TLog.d(TAG, "并行加速 缓存数据丢弃");
                }
            }
        }        /**
         * 主方法,测试程序入口
         */
          public static void main(String[] args){            SyncLoadResponseBean syncLoadResponseBean = new SyncLoadResponseBean("https://www.baidu.com", new ConcurrentHashMap<>());            //模拟生产者线程
            new Thread(() -> {
                 try {
                    System.out.println("生产者启动");
                    //模拟网络建联
                    sleep(200);
                    //模拟网络资源返回
                    File file = new File("./xxxx.txt");
                    ResponseBody responseBody = new ResponseBody(new FileInputStream(file));
                     //模拟响应
                    syncLoadResponseBean.saveResponse(responseBody, new HashMap<>());
                } catch (FileNotFoundException e) {
                    throw new RuntimeException(e);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    System.out.println("生产线程工作完成,唤醒等待中的消费者");
                    syncLoadResponseBean.signalAll();
                }
            }).start();              //模拟消费者线程1
              new Thread(() -> {                System.out.println("消费者启动");
                //模拟读取
                InputStream stream = syncLoadResponseBean.getBridgedStream();
                System.out.println("消费者 " + stream);
            }).start();        }    }

 

4.4 代码解读

 

代码的工作流程是这样的:

  • 生产者网络线程发起数据请求。

  • 建联后会第一次发起notify,尝试唤醒等待中的消费者webview线程,如果这个时候刚好有消费者webview在等待,还未轮到生产者预读数据,消费者就会拿走数据。

  • 如果没有消费者在等待,就开始预读,预读是一个循环读取的过程,每次循环读取都会有一个加锁->读取->释放锁的过程,保证消费者可以随时打断并拿走读取了一半的数据。

  • 使用synchronized(SyncLoadResponse-

    Bean.this)来实现生产者和消费者线程间协同工作。

  • 使用AtomicInteger来实现状态机的线程间同步。

  • 使用了wait(long timeoutMillis)来实现了等待和锁的超时释放。

  • preReadStream()方法在while循环体内实现了读取时的同步块。

  • 使用SequenceInputStream实现了已读缓冲流与未读网络流的桥接。

  • 在消费者webview等待资源的过程中,生产者会有多个时机让出同步锁,并且发起notify,通知消费者可以消费数据:时机①:生产者完成网络建联,保存了响应体,但还没有进行数据预读的时候。时机②:生产者在网络数据的预读过程中,每次读取4096之后都会通知消费者。

 

在代码的最下面,我们提供了标准Java的main函数,有兴趣的同学可以尝试将代码拷贝到本地,并且运行他,看下效果,我们这边也直接把运行效果贴出来,看下是否符合预期。

    生产者启动
    消费者启动
    并行加载 查找流数据
    并行加载 查找流数据 进入等待状态
    并行加载 响应保存
    并行加载 保存完成 通知消费者
    并行加载 预读缓存 开始
    并行加载 预读缓存 4096
    并行加载 预读缓存 第1次通知消费者
    并行加载 预读缓存 4096
    并行加载 预读缓存 第2次通知消费者
    并行加载 预读缓存 4096
    并行加载 预读缓存 第3次通知消费者
    并行加载 预读缓存 4096
    并行加载 预读缓存 第4次通知消费者
    并行加载 预读缓存 4096
    并行加载 预读缓存 第5次通知消费者
    .......
    .......
    .......
    并行加载 预读缓存 4096
    并行加载 预读缓存 第360次通知消费者
    并行加载 预读缓存 4096
    并行加载 预读缓存 第361次通知消费者
    并行加载 预读缓存 4096
    并行加载 预读缓存 第362次通知消费者
    并行加载 查找流数据 被唤醒
    并行加载 查找流数据 返回桥接流
    并行加载 预读缓存 第363次通知消费者
    并行加载 数据流已提供 预读缓存 关闭
    
    消费者 java.io.SequenceInputStream@696759a5

 

4.5 运行结果解读

这次的test代码运行,工作流程如下:

消费者webview需要数据的时候,生产者的生产任务还没有返回,此时生产者正在进行虚拟的网络建联,因此消费者进入了等待状态,网络建联后,生产者首次尝试唤醒消费者,但正在等待任务的消费者并没有被唤醒,这里已经开始不符合预期了。于是生产者继续进行数据预读,在数据预读的循环体内,每次一个缓冲读完,生产者都会尝试唤醒一次消费者,但是消费者直到第363次notify的时候,才被唤醒,拿到了数据。

 

我们经过多次尝试,发现消费者被唤醒的时机不确定,有时候是在首次唤醒的时候就能够唤醒,有时候要在唤醒第xxx次的时候才能被唤醒,而且次数还是随机的,显然这样延长了消费者等待的时间,不符合我们既定的想法,还需要进一步的优化。

 

五、方案演进:优化同步策略

从上述代码实际运行的现象上看,生产者释放锁,并且唤醒消费者的时候,线程锁并没有交接给消费者,反而又被生产者的预读任务给抢了过来。

 

5.1 锁的公平性

从运行的结果来看,锁的释放和获取并没有符合预期,我们有理由怀疑在线程同步的过程中,有一些线程争抢资源和锁的情况发生,通过查阅java多线程相关资料,我们了解到一个概念:线程锁的公平性。

线程锁的公平性是指多个线程竞争锁时,锁的获取是否按照请求顺序进行分配,同步锁的类型简单来讲可以分为如下两类:

 

非公平锁

  • 特点:允许线程"插队",新请求锁的线程可能比等待队列中的线程先获取锁

  • 优点:更高的吞吐量,减少线程切换开销

  • 缺点:可能导致某些线程长时间等待(饥饿)

 

公平锁

  • 特点:严格按照FIFO(先进先出)顺序分配锁

  • 优点:避免线程饥饿,行为更可预测

  • 缺点:性能较低,因为需要维护队列顺序

Java内置的synchronized锁就是是非公平锁,wait和notify 也是基于synchronized来实现的。

在上述的示例代码中,我们使用的就是非公平锁,导致线程之间出现资源抢占,发生了不符合预期的情况。

 

适合使用公平锁的场景

  • 严格的顺序要求:当线程执行顺序对业务逻辑至关重要时

  • 避免饥饿:当需要确保所有线程都有机会执行时

虽然公平锁会牺牲同步性能,但是在当前业务中,我们是希望消费者能够尽快的获得数据的,所以我们应该选择使用公平锁来实现同步,在java中要实现公平锁,就必须使用ReentrantLock,如果要实现公平锁的等待,就要使用Condition,我们使用ReentrantLock和Condition来修改代码,对代码中与同步锁相关的逻辑进行重构。

 

公平锁的使用方法

使用公平锁时,无法像使用synchronized关键字一样直接加在方法头上,而是需要手动获得锁和释放锁,示例代码如下:

private final ReentrantLock mLock = new ReentrantLock(true); // 公平锁public void test(){
    //业务代码执行前获得锁
    mLock.lock();  
    try {
        //实际执行的业务代码
        TLog.d(TAG, "此处执行代码");  
    } finally {  
        //业务代码执行完成后释放锁
        mLock.unlock();  
    }  
}

 

如果需要手动使线程处于等待状态,则触发等待和唤醒的示例代码如下:

private final ReentrantLock mLock = new ReentrantLock(true); // 公平锁
private final Condition mCondition \= mLock.newCondition(); // 条件变量/**  
 * 唤醒示例
 * 
 */
public void signal(){  
    if (mLock.tryLock()) {  
        try {  
            mCondition.signal();  
        } finally {  
            mLock.unlock();  
        }  
    }  
}/**  
 * 等待示例  
 * 等待五秒钟后主动释放
 */  
public void test(){  
    mLock.lock();  
    try {  
        TLog.d(TAG, "实际业务代码执行");  
        mCondition.await(5, TimeUnit.SECONDS);  
        TLog.d(TAG, "实际业务代码执行");  
    } finally {  
        mLock.unlock();  
    }  
}

 

这里的疑问,为什么lock并且await的时候,其他线程依然可以获得锁并发起signal?

是因为condition.await() 的原子操作,当线程调用 condition.await() 时,会自动释放锁,然后进入等待状态,这是原子性操作,保证在进入等待状态前一定会释放锁。

其他线程可以获取锁,是因为原线程已经在 condition.await() 时释放了锁,其他线程在调用 condition.signal() 时必须持有锁,当等待中的线程被 condition.signal() 唤醒后,会重新尝试获取锁,获取锁成功后才会从 condition.await() 方法返回。

 

5.2 公平锁代码编写

代码如下

    import java.io.*;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    import static java.lang.Thread.sleep;
    
    
    public class Main {
    
        public static class ResponseBody {
            InputStream stream;
    
            public ResponseBody(InputStream stream){
                this.stream = stream;
            }
    
            public InputStream byteStream(){
                return stream;
            }
        }
    
    
        public static class SyncLoadResponseBean {
    
            private static final String TAG = "SyncLoadResponseBean";
    
            // 状态常量定义
            public static final int INIT = 1;  // 初始状态
            public static final int READY = 2; // 数据准备就绪
            public static final int OFFER = 3; // 数据已提供
            public static finaint DROP = 4;  // 数据已丢弃
    
            private final String mRequestUrl;
            private final ConcurrentHashMap<String, SyncLoadResponseBean> mSyncLoadCache;
            private final ReentrantLock mLock = new ReentrantLock(true); // 公平锁
            private final Condition mCondition = mLock.newCondition();   // 条件变量
            private final AtomicInteger mStatus = new AtomicInteger(INIT); // 状态机
    
            // 数据存储相关
            private Map<String, String> mResponseHeader;
            private ByteArrayOutputStream mBufferStream;
            private InputStream mNetworkStream;
            private long mResponseTime;
    
    
            public SyncLoadResponseBean(String requestUrl, ConcurrentHashMap<String, SyncLoadResponseBean> syncLoadCache){
                mRequestUrl = requestUrl;
                mSyncLoadCache = syncLoadCache;
                mStatus.set(INIT);
            }
    
            public boolean before(int status){
                return mStatus.get() < status;
            }
    
            public boolean during(int status){
                return mStatus.get() == status;
            }
    
            public boolean after(int status){
                return mStatus.get() >= status;
            }
    
            /**
             * 唤醒所有等待线程
             * 使用 tryLock 避免长时间阻塞
             */
            public void signalAll(){
                if (mLock.tryLock()) {
                    try {
                        mCondition.signalAll();
                    } finally {
                        mLock.unlock();
                    }
                }
            }
    
            /**
             * 保存响应数据并预处理
             * 网络线程会在得到响应后调用该方法,保存数据
             */
            public void saveResponse(ResponseBody responseBody, Map<String, String> responseHeader){
                streamReady(responseBody, responseHeader);
                preReadStream();
            }
    
            /**
             * 准备数据流
             */
            private void streamReady(ResponseBody responseBody, Map<String, String> responseHeader){
                mLock.lock();
                try {
                    TLog.d(TAG, "并行加载 响应保存");
                    mResponseTime = System.currentTimeMillis();
                    mResponseHeader = responseHeader;
                    mBufferStream = new ByteArrayOutputStream();
                    if (responseBody != null) {
                        mNetworkStream = responseBody.byteStream();
                        // 根据流是否有效设置状态
                        if (mNetworkStream != null) {
                            mStatus.set(READY);
                        } else {
                            drop();
                        }
                    } else {
                        drop();
                    }
                } finally {
                    TLog.d(TAG, "并行加载 保存完成 通知消费者");
                    mCondition.signalAll();
                    mLock.unlock();
                }
            }
    
    
            private void preReadStream(){
                byte[] buffer = new byte[4096];
                int num = 0;
                try {
                    while (during(READY)) {
                        mLock.lock();
                        try {
                            //双重校验锁
                            if (during(READY)) {
                                // 读取网络流数据
                                int bytesRead = mNetworkStream.read(buffer);
                                if (bytesRead == -1) {
                                    TLog.d(TAG, "并行加载 预读缓存 完成 " + bytesRead);
                                    closeStream(mNetworkStream);
                                    mNetworkStream = null;
                                    break;
                                }
                                TLog.d(TAG, "并行加载 预读缓存 " + bytesRead);
                                mBufferStream.write(buffer, 0, bytesRead);
                            }
                        } finally {
                            num++;
                            TLog.d(TAG, "并行加载 预读缓存 第" + num + "次通知消费者");
                            mCondition.signalAll();
                            mLock.unlock();
                        }
                    }
                    //已经提供了数据,则打印个日志看下
                    if (after(OFFER)) {
                        TLog.d(TAG, "并行加载 数据流已提供 预读缓存 关闭");
                    }
                } catch (IOException e) {
                    TLog.e(TAG, "并行加载 预读缓存 异常 关闭", e);
                    mLock.lock();
                    try {
                        //在读取的过程中出现了异常,但是这个时候还没有提供数据,就直接drop调
                        if (!after(OFFER)) {
                            drop();
                        }
                    } finally {
                        mLock.unlock();
                    }
                }
            }
    
            /**
             * 获取桥接流
             * 浏览器线程调用该方法获取数据流
             */
            public InputStream getBridgedStream(){
                mLock.lock();
                TLog.d(TAG, "并行加载 查找流数据");
                try {
                    if (before(READY)) {
                        long time1 = System.currentTimeMillis();
                        TLog.d(TAG, "并行加载 查找流数据 进入等待状态");
                        mCondition.await(5, TimeUnit.SECONDS);
                        long time2 = System.currentTimeMillis();
                        TLog.d(TAG, "并行加载 查找流数据 被唤醒 等待时长:" + (time2 - time1));
                    }
                    // 等待结束,再确认一次状态是否可用
                    if (before(READY)) {
                        TLog.d(TAG, "并行加载 查找流数据 依旧没有可用数据 返回空流");
                        drop();
                        return null;
                    } elseif (after(OFFER)) {
                        TLog.d(TAG, "并行加载 查找流数据 数据已被废弃或者被他人被使用 返回空流");
                        return null;
                    } elseif (isTimeOut()) {
                        TLog.d(TAG, "并行加载 查找流数据 数据超时 返回空流");
                        drop();
                        return null;
                    } else {
                        if (mNetworkStream != null && mBufferStream != null) {
                            mStatus.set(OFFER);
                            // 创建新的桥接流实例,包含已缓存数据和剩余网络流
                            ByteArrayInputStream cachedStream = new ByteArrayInputStream(mBufferStream.toByteArray());
                            TLog.d(TAG, "并行加载 查找流数据 返回桥接流");
                            returnnew SequenceInputStream(cachedStream, mNetworkStream);
                        } elseif (mNetworkStream != null) {
                            mStatus.set(OFFER);
                            TLog.d(TAG, "并行加载 查找流数据 返回网络流");
                            return mNetworkStream;
                        } elseif (mBufferStream != null) {
                            mStatus.set(OFFER);
                            // 创建新的桥接流实例,包含已缓存数据和剩余网络流
                            TLog.d(TAG, "并行加载 查找流数据 返回缓存流");
                            returnnew ByteArrayInputStream(mBufferStream.toByteArray());
                        } else {
                            drop();
                            TLog.d(TAG, "并行加载 查找流数据 返回空流");
                            return null;
                        }
                    }
                } catch (Exception e) {
                    TLog.e("TAG", "Create bridged stream failed", e);
                    drop();
                    return null;
                } finally {
                    mLock.unlock();
                }
            }
    
            /**
             * 获取响应头(线程安全)
             */
            public Map<String, String> getResponseHeader(){
                mLock.lock();
                try {
                    //如果请求里面不带跨域标识,则带上跨域标识
                    if (mResponseHeader != null && !mResponseHeader.containsKey("Access-Control-Allow-Origin")) {
                        mResponseHeader.put("Access-Control-Allow-Origin", "*");
                    }
                    return mResponseHeader;
                } finally {
                    mLock.unlock();
                }
            }
    
    
            /**
             * 统一关闭流资源操作
             */
            private void closeStream(Closeable stream){
                if (stream != null) {
                    try {
                        stream.close();
                    } catch (Exception e) {
                        TLog.e(TAG, "关闭流失败", e);
                    }
                }
            }
    
    
            /**
             * 判断数据有没有超时
             */
            private boolean isTimeOut(){
                return Math.abs(mResponseTime - System.currentTimeMillis()) > 5000;
            }
    
            /**
             * 丢弃数据
             */
            public void drop(){
                mLock.lock();
                try {
                    mStatus.set(DROP);
                    mResponseHeader = null;
                    mResponseTime = 0;
                    closeStream(mBufferStream);
                    closeStream(mNetworkStream);
                    mBufferStream = null;
                    mNetworkStream = null;
                    mSyncLoadCache.remove(mRequestUrl);
                    TLog.d(TAG, "并行加速 缓存数据丢弃");
                } finally {
                    mLock.unlock();
                }
            }
    
        }
    
    
        /**
         * 主方法,程序入口
         */
        public static void main(String[] args){
    
            SyncLoadResponseBean syncLoadResponseBean = new SyncLoadResponseBean("https://www.baidu.com", new ConcurrentHashMap<>());
    
            //模拟生产者线程
            new Thread(() -> {
                try {
                    System.out.println("生产者启动");
                    //模拟网络建联
                    sleep(200);
                    //模拟网络资源返回
                    File file = new File("./xxxx.txt");
                    ResponseBody responseBody = new ResponseBody(new FileInputStream(file));
                    //模拟响应
                    syncLoadResponseBean.saveResponse(responseBody, new HashMap<>());
                } catch (FileNotFoundException e) {
                    thrownew RuntimeException(e);
                } catch (InterruptedException e) {
                    thrownew RuntimeException(e);
                } finally {
                    System.out.println("生产线程工作完成,唤醒等待中的消费者");
                    syncLoadResponseBean.signalAll();
                }
            }).start();
    
            //模拟消费者线程1
            new Thread(() -> {
                System.out.println("消费者启动");
                //模拟读取
                InputStream stream = syncLoadResponseBean.getBridgedStream();
                System.out.println("消费者 " + stream);
            }).start();
    
        }
    
    }

 

5.3 公平锁代码解读

  • 使用ReentrantLock实现公平锁

  • 使用Condition实现线程等待

    生产者启动
    消费者启动
    并行加载 查找流数据
    并行加载 查找流数据 进入等待状态
    并行加载 响应保存
    并行加载 保存完成 通知消费者
    并行加载 查找流数据 被唤醒 等待时长:203
    并行加载 查找流数据 返回桥接流
    消费者 java.io.SequenceInputStream@4e8de630
    并行加载 预读缓存 第1次通知消费者
    并行加载 数据流已提供 预读缓存 关闭
    生产线程工作完成,唤醒等待中的消费者
    进程已结束,退出代码为 0

 

可以看到在建联完成,保存响应的时候,首次通知消费者,消费者就能够准确的被唤醒。

再通过调整测试代码sleep的时间模拟数据正在被预读中,消费者打断预读的场景,消费者依然能够被准确的唤醒。

    生产者启动
    消费者启动
    并行加载 响应保存
    并行加载 保存完成 通知消费者
    并行加载 预读缓存 4096
    并行加载 预读缓存 第1次通知消费者
    并行加载 查找流数据
    并行加载 查找流数据 返回桥接流
    并行加载 预读缓存 第2次通知消费者
    并行加载 数据流已提供 预读缓存 关闭
    生产线程工作完成,唤醒等待中的消费者
    消费者 java.io.SequenceInputStream@5a40ed62
    进程已结束,退出代码为 0

 

整体代码运行符合预期。

 

5.4 桥接流代码解读

在上述代码中,我们多次提到桥接流的概念,桥接流顾名思义,就是将多个数据流接起来,可以让读流的程序按照顺序先读第一个流,读完第一个再读第二个流,以此类推,这里我们直接使用了java官方的一个类来实现流的桥接。

SequenceInputStream是 Java I/O 类库中的一个输入流类,它允许你将多个输入流按顺序连接起来,形成一个逻辑上的连续输入流。

SequenceInputStream的主要特点

  • 顺序读取:它会按顺序读取多个输入流,先读取第一个流的所有内容,然后自动切换到第二个流,依此类推。

  • 流合并:将多个独立的输入流合并为一个连续的输入流。

  • 自动切换:当一个流读取完毕时,会自动切换到下一个流。

  • 自动关闭:SequenceInputStream 关闭时会自动关闭所有包含的输入流

 

SequenceInputStream的使用场景

当有多个文件需要按顺序读取,但希望像读取单个文件一样处理时,合并多个来源的数据流,需要将多个输入源串联起来处理,我们可以很方便的把一个或者多个流串联起来按顺序读取,通过这个特性,我们就可以实现缓存流和网络流的无缝桥接。

 

六、方案对比与总结

 

公平锁替代非公平锁

  • 问题:synchronized(非公平锁)导致生产者可能重新抢占锁。

  • 解决方案:用ReentrantLock(公平锁)+Condition实现顺序获取。

 

桥接流技术

  • 用SequenceInputStream合并已缓冲流(ByteArrayInputStream)与网络流。

  • 实现无缝衔接,无需等待全量数据。

 

半缓冲机制

  • 并行任务每次读取 4096 字节后释放锁,允许消费者打断。

  • 可以平衡预读效率与内存占用。

 

七、写在最后

回顾上述方案,采用同步锁实现缓冲过程的打断,使用SequenceInputStream实现桥接流,预读过程的打断是通过流读取循环体内的公平锁来实现的,相比最初的循环等待,数据超时废弃的模式,新方案实现了网络流和缓存流的无缝切换,整合了并行请求的资源,充分利用了页面启动时间。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/986565.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2025年汽车托运物流平台口碑排行榜TOP10,异地专业的汽车托运物流推荐榜单精选优质厂家

随着汽车消费市场的持续活跃与跨区域流动的日益频繁,汽车托运服务已成为连接车主与目的地的重要桥梁。面对市场上众多的汽车托运平台,如何选择一家安全、可靠、高效的服务商,成为广大车主关注的焦点。本文基于公开市…

2025国内最好的出国留学中介公司

2025国内最好的出国留学中介公司一、如何选择2025年的出国留学中介许多学生和家长在搜索“2025国内最好的出国留学中介公司”时,核心关注点在于如何找到可靠、透明且结果有保障的服务机构。根据《2025中国留学中介行业…

基于 STM32 的老人摔倒报警装置项目【源码分享】

基于 STM32 的老人摔倒报警装置(短信提醒)【源码分享】 一、项目背景 在我国逐步迈入老龄化社会的背景下,独居老人摔倒事故频发且救援不及时的情况越来越受到关注。摔倒后无法及时呼救,是老年人伤亡的重要原因之一…

2025出国留学中介哪个最好

2025出国留学中介哪个最好一、2025年如何选择留学中介作为从事国际教育规划工作超过十年的资深规划师,我经常被学生和家长询问:“2025年出国留学,哪个中介机构最值得信赖?”这个问题没有绝对答案,因为留学中介的选…

2025年12月东莞律师服务排名:基于真实数据与用户反馈的详细解析

在东莞这座制造业与创新经济交织的城市,企业及个人面临的法律需求日益复杂。无论是初创企业的股权架构设计,还是重大商事纠纷的化解,选择一位专业匹配、经验丰富的本地律师至关重要。本文基于国家司法部门公开的律师…

2025成都出国留学中介公司有哪些

2025成都出国留学中介公司有哪些一、2025年成都出国留学中介公司概览作为从业12年的国际教育规划师,我经常遇到成都学生和家长询问:“2025年成都地区有哪些可靠的出国留学中介?”根据《2025中国留学服务行业白皮书》…

2025北京留学中介机构哪个好

2025北京留学中介机构哪个好一、2025北京留学中介机构哪个好?作为从业15年的国际教育规划师,我经常被学生和家长问及北京留学中介机构的选择问题。2025年,随着留学政策趋于稳定和人工智能技术的普及,留学申请更注重…

2025年越野高尔夫球车供货厂家权威推荐榜单:4座高尔夫球车‌/定制高尔夫球车‌/电动高尔夫球车‌源头厂家精选

随着户外休闲、景区观光及高端营地等场景的多元化发展,传统的场地高尔夫球车正逐步向具备更强通过性、适应性与功能性的越野型高尔夫球车演变。这类车辆不仅需要满足平坦草坪的舒适行驶,更需具备应对沙地、泥泞缓坡等…

2025澳大利亚留学中介前十名

2025澳大利亚留学中介前十名一、如何选择2025年澳大利亚留学中介:高频问题解答许多计划赴澳大利亚留学的学生和家长在搜索引擎上常询问:"2025年澳大利亚留学中介哪家靠谱?"、"如何辨别中介机构的专业…

2025年领先品牌认证机构推荐:哪家性价比最优?深度实测与案例验证分析

随着品牌经济时代的到来,企业对于第三方认证的需求呈现爆发式增长。根据中国品牌研究院2025年发布的数据显示,领先品牌认证服务市场规模已突破百亿元,年复合增长率达到18.3%。然而市场快速发展的同时,认证机构水平…

详细介绍:相平面控制:深入解析一种经典的非线性控制系统设计方法

详细介绍:相平面控制:深入解析一种经典的非线性控制系统设计方法pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "…

2025年销量领先认证机构推荐:哪家性价比更高?权威数据与案例比对

随着市场竞争日益激烈,企业对于证明自身市场地位的需求不断增长。根据中国广告协会2025年发布的行业报告显示,超过80%的企业在营销传播中需要第三方数据背书,其中销量领先认证成为最受关注的认证类型之一。然而,企…

2025年,国内外最火的10款降AI率工具亲测!(持续更新)

​ 写论文最怕什么?不是查重,而是那句——“AI率过高”。 现在越来越多学校开始严查论文降aigc报告。我当时AI率高达98%,整个人快崩溃。 为了救回这篇论文,我实测了市面上十几款降ai率工具,从中精选了这12款,有免…

BindingSource绑定

1 BindingSource bindingSource = new BindingSource(); 2 3 List<MyClass> list=new List<MyClass>(); 4 5 bindingSource.DataSource=list; 6 7 //刷新 8 9 bindingSource.ResetBindings(true);

2025年晃香油机器直销厂家权威推荐榜单:水代法晃油机‌/香油墩油机‌/香油晃油机‌源头厂家精选

在传统香油(芝麻油)生产领域,“水代法”工艺以其物理取油、无化学添加的特点,能最大程度保留芝麻的原香与营养,备受市场青睐。其中,作为核心设备的晃(墩)香油机,其性能直接决定了出油率、香气浓郁度与生产效率…

完整教程:SpringAI1.0.1实战教程:避坑指南25年8月最新版

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

2025年导热油加热器厂家实力推荐,看看哪家品牌的质量好

为帮工业企业高效锁定适配生产需求的导热油加热器供应商,避免选型走弯路导致设备故障、能耗超标等问题,我们从产品质量稳定性(如加热均匀性、运行故障率)、定制化能力(含非标场景适配、快速出样效率)、全周期服务…

jenkins上执行某个python代码,日志没有打印,如何处理

在Jenkins中执行Python脚本时日志不显示,通常是因为Jenkins的默认行为中断了进程和输出管道。你可以按照以下流程进行排查和解决。 主要问题解决方案详解 1. 防止Jenkins终止后台进程 这是最常见的原因。Jenkins为每…

沉浸式雨天海岸:用A-Frame打造WebXR互动场景 - 实践

沉浸式雨天海岸:用A-Frame打造WebXR互动场景 - 实践2025-12-04 10:46 tlnshuju 阅读(0) 评论(0) 收藏 举报pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; dis…

游记:NOIP2025 游记

炸完了。lyc:你 T2 复杂度多少。 我:\(\mathcal O(\sum n ^ 2)\) 啊? lyc: (短暂思考)是可以优化到。 我:你不是直接范德蒙德卷积吗,然后……不对我怎么多个 \(\log\)??ei 貌似我开始时想到了但后面忘了这回事…