Elasticsearch 8.9 refresh刷Es缓冲区的数据到Lucene,更新segemnt,使数据可见

  • 一、相关API的handler
    • 1、接受HTTP请求的hander(RestRefreshAction)
    • 2、往数据节点发送刷新请求的action(TransportRefreshAction)
    • 3、数据节点接收主节点refresh传输的action(TransportShardRefreshAction)
  • 二、在IndexShard执行refresh操作
    • 1、根据入参决定是使用lucene提供的阻塞还是非阻塞API刷新数据
      • (1)、maybeRefresh和maybeRefreshBlocking的简单介绍
  • 三、lucene源码中执行逻辑
    • 1、判断是否需要刷新

下面的图来自ElasticSearch——刷盘原理流程,这篇文章主要讲的是refresh命令把ES写入索引缓冲区的数据刷进Lucene,使数据可供查询,搜索,否则,在索引缓冲区是不可见的,不涉及到在translog.logLucene的数据结构。
通过这个流程知道ES如何把索引缓冲区的数据刷进Lucene的,主要是下面左中部分refresh部分

在这里插入图片描述

其他部分源码梳理
1、主节点同时写入ES缓冲区和translog这一部分,请看Elasticsearch 8.9 Bulk批量给索引增加数据源码
2、下半边fsync的源码逻辑,请看Elasticsearch 8.9 flush刷新缓存中的数据到磁盘源码

一、相关API的handler

ActionModule.java

 registerHandler.accept(new RestRefreshAction());
 actions.register(RefreshAction.INSTANCE, TransportRefreshAction.class);
 actions.register(TransportShardRefreshAction.TYPE, TransportShardRefreshAction.class);

1、接受HTTP请求的hander(RestRefreshAction)

public class RestRefreshAction extends BaseRestHandler {

    @Override
    public List<Route> routes() {
        return List.of(
            new Route(GET, "/_refresh"),
            new Route(POST, "/_refresh"),
            new Route(GET, "/{index}/_refresh"),
            new Route(POST, "/{index}/_refresh")
        );
    }

    @Override
    public String getName() {
        return "refresh_action";
    }

    @Override
    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        RefreshRequest refreshRequest = new RefreshRequest(Strings.splitStringByCommaToArray(request.param("index")));
        refreshRequest.indicesOptions(IndicesOptions.fromRequest(request, refreshRequest.indicesOptions()));
        return channel -> client.admin().indices().refresh(refreshRequest, new RestToXContentListener<RefreshResponse>(channel) {
            @Override
            protected RestStatus getStatus(RefreshResponse response) {
                return response.getStatus();
            }
        });
    }
}

client.admin().indices().refresh()会执行到下面的父类TransportBroadcastReplicationActiondoExecute方法

2、往数据节点发送刷新请求的action(TransportRefreshAction)

public class TransportRefreshAction extends TransportBroadcastReplicationAction<
    RefreshRequest,
    RefreshResponse,
    BasicReplicationRequest,
    ReplicationResponse> {

    @Inject
    public TransportRefreshAction(
        ClusterService clusterService,
        TransportService transportService,
        ActionFilters actionFilters,
        IndexNameExpressionResolver indexNameExpressionResolver,
        NodeClient client
    ) {
        super(
            RefreshAction.NAME,
            RefreshRequest::new,
            clusterService,
            transportService,
            client,
            actionFilters,
            indexNameExpressionResolver,
            TransportShardRefreshAction.TYPE,
            ThreadPool.Names.REFRESH
        );
    }

   //省略代码
}
public abstract class TransportBroadcastReplicationAction<
    Request extends BroadcastRequest<Request>,
    Response extends BaseBroadcastResponse,
    ShardRequest extends ReplicationRequest<ShardRequest>,
    ShardResponse extends ReplicationResponse> extends HandledTransportAction<Request, Response> {
 @Override
    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
        clusterService.threadPool().executor(executor).execute(ActionRunnable.wrap(listener, createAsyncAction(task, request)));
    }

    private CheckedConsumer<ActionListener<Response>, Exception> createAsyncAction(Task task, Request request) {
        return new CheckedConsumer<ActionListener<Response>, Exception>() {

            private int totalShardCopyCount;
            private int successShardCopyCount;
            private final List<DefaultShardOperationFailedException> allFailures = new ArrayList<>();

            @Override
            public void accept(ActionListener<Response> listener) {
                assert totalShardCopyCount == 0 && successShardCopyCount == 0 && allFailures.isEmpty() : "shouldn't call this twice";

                final ClusterState clusterState = clusterService.state();
                final List<ShardId> shards = shards(request, clusterState);
                final Map<String, IndexMetadata> indexMetadataByName = clusterState.getMetadata().indices();

                try (var refs = new RefCountingRunnable(() -> finish(listener))) {
                //遍历所有的分片
                    for (final ShardId shardId : shards) {
                        // NB This sends O(#shards) requests in a tight loop; TODO add some throttling here?
                        shardExecute(
                            task,
                            request,
                            shardId,
                            ActionListener.releaseAfter(new ReplicationResponseActionListener(shardId, indexMetadataByName), refs.acquire())
                        );
                    }
                }
            }
        };
    }

    protected void shardExecute(Task task, Request request, ShardId shardId, ActionListener<ShardResponse> shardActionListener) {
        assert Transports.assertNotTransportThread("may hit all the shards");
        ShardRequest shardRequest = newShardRequest(request, shardId);
        shardRequest.setParentTask(clusterService.localNode().getId(), task.getId());
        client.executeLocally(replicatedBroadcastShardAction, shardRequest, shardActionListener);
    }

}    

3、数据节点接收主节点refresh传输的action(TransportShardRefreshAction)

public class TransportShardRefreshAction extends TransportReplicationAction<
    BasicReplicationRequest,
    ShardRefreshReplicaRequest,
    ReplicationResponse> {

    private static final Logger logger = LogManager.getLogger(TransportShardRefreshAction.class);

    public static final String NAME = RefreshAction.NAME + "[s]";
    public static final ActionType<ReplicationResponse> TYPE = new ActionType<>(NAME, ReplicationResponse::new);
    public static final String SOURCE_API = "api";

    @Inject
    public TransportShardRefreshAction(
        Settings settings,
        TransportService transportService,
        ClusterService clusterService,
        IndicesService indicesService,
        ThreadPool threadPool,
        ShardStateAction shardStateAction,
        ActionFilters actionFilters
    ) {
        super(
            settings,
            NAME,
            transportService,
            clusterService,
            indicesService,
            threadPool,
            shardStateAction,
            actionFilters,
            BasicReplicationRequest::new,
            ShardRefreshReplicaRequest::new,
            ThreadPool.Names.REFRESH
        );
        // registers the unpromotable version of shard refresh action
        new TransportUnpromotableShardRefreshAction(clusterService, transportService, shardStateAction, actionFilters, indicesService);
    }

  
    @Override
    protected void shardOperationOnPrimary(
        BasicReplicationRequest shardRequest,
        IndexShard primary,
        ActionListener<PrimaryResult<ShardRefreshReplicaRequest, ReplicationResponse>> listener
    ) {
        primary.externalRefresh(SOURCE_API, listener.delegateFailure((l, refreshResult) -> {
            ShardRefreshReplicaRequest replicaRequest = new ShardRefreshReplicaRequest(shardRequest.shardId(), refreshResult);
            replicaRequest.setParentTask(shardRequest.getParentTask());
            logger.trace("{} refresh request executed on primary", primary.shardId());
            l.onResponse(new PrimaryResult<>(replicaRequest, new ReplicationResponse()));
        }));
    }
}    

primary.externalRefresh执行分片的刷新

二、在IndexShard执行refresh操作

 public void externalRefresh(String source, ActionListener<Engine.RefreshResult> listener) {
        verifyNotClosed();
        getEngine().externalRefresh(source, listener);
    }
 public void externalRefresh(String source, ActionListener<Engine.RefreshResult> listener) {
        ActionListener.completeWith(listener, () -> {
            logger.trace("external refresh with source [{}]", source);
            return refresh(source);
        });
    }   

getEngine()的实现是InternalEngine

  @Override
    public RefreshResult refresh(String source) throws EngineException {
        return refresh(source, SearcherScope.EXTERNAL, true);
    }

1、根据入参决定是使用lucene提供的阻塞还是非阻塞API刷新数据

   protected final RefreshResult refresh(String source, SearcherScope scope, boolean block) throws EngineException {
        //这两种刷新类型都会导致内部刷新,但只有外部刷新类型也会将新的读取器引用传递给外部读取器管理器。
        //获取当前的本地检查点。
        final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint();
        boolean refreshed;
        long segmentGeneration = RefreshResult.UNKNOWN_GENERATION;
        try {
            //refresh 不需要按住 readLock,因为如果引擎在中途关闭,ReferenceManager 可以正确处理。
            if (store.tryIncRef()) {
                try {
                    //尽管我们保留了 2 managers,但我们实际上只做过一次繁重的工作。第二次刷新只会做我们必须做的额外工作,以预热缓存等。
                    ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
                    long generationBeforeRefresh = lastCommittedSegmentInfos.getGeneration();
                    //根据参数决定是进行阻塞刷新还是非阻塞刷新
                    if (block) { 
                        //刷新可能会导致阻塞
                        referenceManager.maybeRefreshBlocking();
                        refreshed = true;
                    } else {
                    	//刷新不会导致阻塞
                        refreshed = referenceManager.maybeRefresh();
                    }
                    //如果刷新成功,获取当前的读取器,并更新段的生成号
                    if (refreshed) {
                    	//获取当前的目录
                        final ElasticsearchDirectoryReader current = referenceManager.acquire();
                        try {
                            //更新segment信息
                            segmentGeneration = Math.max(current.getIndexCommit().getGeneration(), generationBeforeRefresh);
                        } finally {
                            referenceManager.release(current);
                        }
                    }
                } finally {
                    store.decRef();
                }
                if (refreshed) {
                    lastRefreshedCheckpointListener.updateRefreshedCheckpoint(localCheckpointBeforeRefresh);
                }
            } else {
                refreshed = false;
            }
        } catch (AlreadyClosedException e) {
            failOnTragicEvent(e);
            throw e;
        } catch (Exception e) {
            try {
                failEngine("refresh failed source[" + source + "]", e);
            } catch (Exception inner) {
                e.addSuppressed(inner);
            }
            throw new RefreshFailedEngineException(shardId, e);
        }
        assert refreshed == false || lastRefreshedCheckpoint() >= localCheckpointBeforeRefresh
            : "refresh checkpoint was not advanced; "
                + "local_checkpoint="
                + localCheckpointBeforeRefresh
                + " refresh_checkpoint="
                + lastRefreshedCheckpoint();
        // TODO: maybe we should just put a scheduled job in threadPool?
        // We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes
        // for a long time:
        maybePruneDeletes();
        mergeScheduler.refreshConfig();
        return new RefreshResult(refreshed, segmentGeneration);
    }

其中referenceManager 根据入参是 SearcherScope.EXTERNAL 获得的实现是ExternalReaderManager

    private final ExternalReaderManager externalReaderManager;
  @Override
    protected final ReferenceManager<ElasticsearchDirectoryReader> getReferenceManager(SearcherScope scope) {
        return switch (scope) {
            case INTERNAL -> internalReaderManager;
            case EXTERNAL -> externalReaderManager;
        };
    }

根据入参中的block=true 实际执行的是referenceManager.maybeRefreshBlocking(); 来刷新,是异步非阻塞的,
并且根据下图ExternalReaderManager继承了ReferenceManager,所以没有重写maybeRefreshBlocking 所以执行的是父类ReferenceManager

import org.apache.lucene.search.ReferenceManager;

 @SuppressForbidden(reason = "reference counting is required here")
    private static final class ExternalReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {
       
        @Override
        protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {
           //省略代码
        }

        @Override
        protected boolean tryIncRef(ElasticsearchDirectoryReader reference) {
            return reference.tryIncRef();
        }

        @Override
        protected int getRefCount(ElasticsearchDirectoryReader reference) {
            return reference.getRefCount();
        }

        @Override
        protected void decRef(ElasticsearchDirectoryReader reference) throws IOException {
            reference.decRef();
        }
    }

(1)、maybeRefresh和maybeRefreshBlocking的简单介绍

下面是lucene源码中关于这两个API的实现,

//这个是会尝试获取刷新锁,如果没有则不执行刷新操作
  public final boolean maybeRefresh() throws IOException {
        this.ensureOpen();
        boolean doTryRefresh = this.refreshLock.tryLock();
        if (doTryRefresh) {
            try {
                this.doMaybeRefresh();
            } finally {
                this.refreshLock.unlock();
            }
        }

        return doTryRefresh;
    }
	//这里会等待获取刷新锁,所以会阻塞
    public final void maybeRefreshBlocking() throws IOException {
        this.ensureOpen();
        this.refreshLock.lock();

        try {
            this.doMaybeRefresh();
        } finally {
            this.refreshLock.unlock();
        }

    }

但是实际上最后执行刷新还是执行的this.doMaybeRefresh() 方法

三、lucene源码中执行逻辑

private void doMaybeRefresh() throws IOException {
        this.refreshLock.lock();
        boolean refreshed = false;

        try {
            Object reference = this.acquire();

            try {
            	//通知刷新监听器。
                this.notifyRefreshListenersBefore();
                //调用 refreshIfNeeded(reference) 返回一个新的引用 (newReference)
                //用来判断是否需要刷新,如果不需要刷新,refreshIfNeeded 应返回 null
                G newReference = this.refreshIfNeeded(reference);
                if (newReference != null) {
                    assert newReference != reference : "refreshIfNeeded should return null if refresh wasn't needed";
                    try {
                    //调用 swapReference(newReference) 方法来交换旧的引用为新的引用。
                        this.swapReference(newReference);
                     //设置 refreshed 为 true 表示刷新成功。   
                        refreshed = true;
                    } finally {
                    //如果刷新失败,释放新的引用
                        if (!refreshed) {
                            this.release(newReference);
                        }
                    }
                }
            } finally {
                //释放旧的引用
                this.release(reference);
                //通知刷新监听器刷新完成
                this.notifyRefreshListenersRefreshed(refreshed);
            }

            this.afterMaybeRefresh();
        } finally {
        	//最后释放刷新锁
            this.refreshLock.unlock();
        }

    }

1、判断是否需要刷新

其中refreshIfNeeded用的是子类ExternalReaderManager的实现方法

private static final class ExternalReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {
 		@Override
        protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {
            internalReaderManager.maybeRefreshBlocking();
             //获取其reader对象。
            final ElasticsearchDirectoryReader newReader = internalReaderManager.acquire();
            //isWarmedUp为false或者获取到的新reader对象与传入的referenceToRefresh对象不相等,说明需要刷新
            if (isWarmedUp == false || newReader != referenceToRefresh) {
                boolean success = false;
                try {
                    refreshListener.accept(newReader, isWarmedUp ? referenceToRefresh : null);
                    isWarmedUp = true;
                    success = true;
                } finally {
                    if (success == false) {
                        internalReaderManager.release(newReader);
                    }
                }
            }
            //没有任何变化 - 两个 ref 管理器共享同一个实例,因此我们可以使用引用相等性,不需要执行刷新操作
            if (referenceToRefresh == newReader) {
                internalReaderManager.release(newReader);
                return null;
            } else {
                return newReader; // steal the reference
            }
        }
}        

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/234715.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

什么是神经网络的非线性

大家好啊&#xff0c;我是董董灿。 最近在写《计算机视觉入门与调优》&#xff08;右键&#xff0c;在新窗口中打开链接&#xff09;的小册&#xff0c;其中一部分说到激活函数的时候&#xff0c;谈到了神经网络的非线性问题。 今天就一起来看看&#xff0c;为什么神经网络需…

亚马逊云科技re_Invent 2023产品体验:亚马逊云科技产品应用实践 国赛选手带你看Elasticache Serverless

抛砖引玉 讲一下作者背景&#xff0c;曾经参加过国内世界技能大赛云计算的选拔&#xff0c;那么在竞赛中包含两类&#xff0c;一类是架构类竞赛&#xff0c;另一类就是TroubleShooting竞赛&#xff0c;对应的分别为AWS GameDay和AWS Jam&#xff0c;想必也有朋友玩过此类竞赛&…

RTMP流设置超时时间失败

使用FFmpeg(版本是5.0.3&#xff09;将rtmp流作为输入&#xff0c;设置超时时间&#xff08;使用-timeout参数&#xff09;&#xff0c;结果报错&#xff1a;Cannot open Connection tcp://XXX:1935?listen&listen_timeout 通过./ffmpeg -help full 命令查看FFmpeg帮助&am…

【论文笔记】Gemini: A Family of Highly Capable Multimodal Models——细看Gemini

Gemini 【一句话总结&#xff0c;对标GPT4&#xff0c;模型还是transformer的docoder部分&#xff0c;提出三个不同版本的Gemini模型&#xff0c;Ultra的最牛逼&#xff0c;Nano的可以用在手机上。】 谷歌提出了一个新系列多模态模型——Gemini家族模型&#xff0c;包括Ultra…

jenkins设置中文

安装以下两个插件 Locale plugin Localization: Chinese (Simplified) 在jenkins的system配置中找到locale配置项 在locale配置项的默认语言中填入以下内容保存 zh_CN 重启jenkins即可

Gitzip插件【Github免翻下载】

今天给大家推荐一个github下载的插件&#xff0c;平常大家下载应该无外乎就是以下两种&#xff1a; Download zip利用git clone 但是这两种各有各的弊端&#xff0c;前者一般需要科学上网才可以&#xff0c;后者下载不稳定经常中途断掉。 今天给推荐一个款浏览器插件-Gitzip.大…

uniApp应用软件在运行时,不符合华为应用市场审核标准。解决方案合集!

&#xff08;暂时用不到的也建议收藏一下&#xff0c;因为文章持续更新中&#xff09; 最新更改时间&#xff1a;20023-12-10 第一次做App应用开发相信大家一定都遇到过华为应用市场审核的“驳回”&#xff01; 有些问题一看就明白可以立马修改&#xff0c;而有一些问题修改意…

【计算机网络基础1】网络层次划分和OSI七层网络模型

1、网络层次划分 为了使不同计算机厂家生产的计算机能够相互通信&#xff0c;以便在更大的范围内建立计算机网络&#xff0c;国际标准化组织&#xff08;ISO&#xff09;在1978年提出了"开放系统互联参考模型"&#xff0c;即著名的OSI/RM模型&#xff08;Open Syste…

软件工程考试复习

第一章、软件工程概述 &#x1f31f;软件程序数据文档&#xff08;考点&#xff09; &#x1f31f;计算机程序及其说明程序的各种文档称为 &#xff08; 文件 &#xff09; 。计算任务的处理对象和处理规则的描述称为 &#xff08; 程序 &#xff09;。有关计算机程序功能、…

C语言 内联函数 + 递归函数

函数分类 内联函数 1&#xff09;内联函数在编译时将函数的代码直接插入到调用它的地方&#xff0c;而不是通过函数调用的方式执行&#xff0c;从而减少了函数调用的开销&#xff0c;提高了代码的执行速度 2&#xff09;使用 inline 关键字来声明 3&#xff09;将函数声明为内联…

分层网络模型(OSI、TCP/IP)及对应的网络协议

OSI七层网络模型 OSI&#xff08;Open System Interconnect&#xff09;&#xff0c;即开放式系统互连参考模型&#xff0c; 一般都叫OSI参考模型&#xff0c;是ISO组织于1985年研究的网络互连模型。OSI是分层的体系结构&#xff0c;每一层是一个模块&#xff0c;用于完成某种功…

pytorch一致数据增强

分割任务对 image 做&#xff08;某些&#xff09;transform 时&#xff0c;要对 label&#xff08;segmentation mask&#xff09;也做对应的 transform&#xff0c;如 Resize、RandomRotation 等。如果对 image、label 分别用 transform 处理一遍&#xff0c;则涉及随机操作的…

【概率方法】朗之万动力学 Langevin Dynamics

目前我们了解到采样方法有很多种&#xff0c;按照从朴素到高效的演变顺序大致是 反函数采样蒙特卡洛模拟&#xff08;求统计量&#xff09;接受-拒绝采样MCMC HM 算法Gibbs 采样 接上一篇文章&#xff0c;Gibbs 采样能在有条件分布 p ( x d ′ ∣ x − d ) p(\mathbf{x}_{d…

头歌-Python 基础

第1关&#xff1a;建模与仿真 1、 建模过程&#xff0c;通常也称为数学优化建模(Mathematical Optimization Modeling)&#xff0c;不同之处在于它可以确定特定场景的特定的、最优化或最佳的结果。这被称为诊断一个结果&#xff0c;因此命名为▁▁▁。 填空1答案&#xff1a;决…

【数据挖掘】国科大苏桂平老师数据库新技术课程作业 —— 第四次作业

云数据库研究 云计算与云数据库背景 云计算&#xff08;cloud computing&#xff09;是 IT 技术发展的最新趋势&#xff0c;正受到业界和学术界的广泛关注。云计算是在分布式处理、并行处理和网格计算等技术的基础上发展起来的&#xff0c;是一种新兴的共享基础架构的方法。它…

大数据技术7:基于StarRocks统一OALP实时数仓

前言&#xff1a; 大家对StarRocks 的了解可能不及 ClickHouse或者是远不及 ClickHouse 。但是大家可能听说过 Doris &#xff0c;而 StarRocks 实际上原名叫做 Doris DB &#xff0c;他相当于是一个加强版的也就是一个 Doris ,也就是说 Doris 所有的功能 StarRocks 都是有的&a…

this.$emit(‘update:isVisible‘, false)作用

这个写是不是很新颖&#xff0c;传父组件传值&#xff01;这是什么鬼。。。 假设你有以下逻辑业务。在A页面弹出一个组件B&#xff0c;A组件里面使用B组件&#xff0c;是否展示B组件你使用的是baselineShow变量控制&#xff01; <BaselineData :isVisible.sync"basel…

SQL命令---修改字段的排列位置

介绍 使用sql语句表字段的排列顺序。 命令 alter table 表名 modify 字段名1 数据类型 first|after 字段名2;例子 将a表中的age字段改为表的第一个字段。 alter table a modify age int(12) first;下面是执行命令后的表结构&#xff1a; 将a表中的age字段放到name字段之…

ELK简单介绍二

学习目标 能够部署kibana并连接elasticsearch集群能够通过kibana查看elasticsearch索引信息知道用filebeat收集日志相对于logstash的优点能够安装filebeat能够使用filebeat收集日志并传输给logstash kibana kibana介绍 Kibana是一个开源的可视化平台,可以为ElasticSearch集群…

linux 15day apache apache服务安装 httpd服务器 安装虚拟主机系统 一个主机 多个域名如何绑定

目录 一、apache安装二、访问控制总结修改默认网站发布目录 三、虚拟主机 一、apache安装 [rootqfedu.com ~]# systemctl stop firewalld [rootqfedu.com ~]# systemctl disable firewalld [rootqfedu.com ~]# setenforce 0 [rootqfedu.com ~]# yum install -y httpd [rootqfe…
最新文章