(4) Elasticsearch Source Code: Indexing Flow Analysis

https://www.cnblogs.com/darcy-yuan/p/17024341.html

1. Overview

Previously we discussed how Elasticsearch starts up; this article looks at how it indexes a document.

Below is the indexing flowchart [figure omitted]; we will walk through the flow in its order.

The relationships among the main classes are as follows [diagram omitted]; roughly in calling order: RestController -> RestIndexAction -> NodeClient -> TransportBulkAction -> TransportShardBulkAction (a TransportReplicationAction) -> ReplicationOperation -> IndexShard -> Engine.

2. Indexing Flow (primary)

We send a request with Postman to create a document.
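
The request looks something like this (the index name, id, and body are made up for illustration):

PUT http://localhost:9200/test_index/_doc/1
Content-Type: application/json

{
    "user": "darcy",
    "message": "hello elasticsearch"
}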

Since this is an HTTP request, Elasticsearch has its own HTTP request handling pipeline, similar in spirit to Spring MVC:

// org.elasticsearch.rest.RestController

private void dispatchRequest(RestRequest request, RestChannel channel, RestHandler handler) throws Exception {
        final int contentLength = request.content().length();
        if (contentLength > 0) {
            final XContentType xContentType = request.getXContentType(); // validate the content-type
            if (xContentType == null) {
                sendContentTypeErrorMessage(request.getAllHeaderValues("Content-Type"), channel);
                return;
            }
            if (handler.supportsContentStream() && xContentType != XContentType.JSON && xContentType != XContentType.SMILE) {
                channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel, RestStatus.NOT_ACCEPTABLE,
                    "Content-Type [" + xContentType + "] does not support stream parsing. Use JSON or SMILE instead"));
                return;
            }
        }
        RestChannel responseChannel = channel;
        try {
            if (handler.canTripCircuitBreaker()) {
                inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
            } else {
                inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
            }
            // iff we could reserve bytes for the request we need to send the response also over this channel
            responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
            handler.handleRequest(request, responseChannel, client);
        } catch (Exception e) {
            responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));
        }
    }
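
Note that before dispatching to the handler, dispatchRequest reserves the body size in the in-flight requests circuit breaker and releases it again (via ResourceHandlingHttpChannel) once the response is sent; if the accumulated in-flight bytes exceed the limit, the request is rejected up front. If I remember correctly the ceiling is governed by a node setting along the following lines (treat the exact name as an assumption and check the docs for your version):

# elasticsearch.yml (assumed setting name)
network.breaker.inflight_requests.limit: 100%
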
// org.elasticsearch.rest.BaseRestHandler 

    @Override
    public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
        // prepare the request for execution; has the side effect of touching the request parameters
        final RestChannelConsumer action = prepareRequest(request, client);

        // validate unconsumed params, but we must exclude params used to format the response
        // use a sorted set so the unconsumed parameters appear in a reliable sorted order
        final SortedSet<String> unconsumedParams =
            request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));

        // validate the non-response params
        if (!unconsumedParams.isEmpty()) {
            final Set<String> candidateParams = new HashSet<>();
            candidateParams.addAll(request.consumedParams());
            candidateParams.addAll(responseParams());
            throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
        }

        if (request.hasContent() && request.isContentConsumed() == false) {
            throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");
        }

        usageCount.increment();
        // execute the action
        action.accept(channel); // run the RestChannelConsumer prepared above
    }
// org.elasticsearch.rest.action.document.RestIndexAction

    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        IndexRequest indexRequest;
        final String type = request.param("type");
        if (type != null && type.equals(MapperService.SINGLE_MAPPING_NAME) == false) {
            deprecationLogger.deprecatedAndMaybeLog("index_with_types", TYPES_DEPRECATION_MESSAGE); // types are deprecated
            indexRequest = new IndexRequest(request.param("index"), type, request.param("id"));
        } else {
            indexRequest = new IndexRequest(request.param("index"));
            indexRequest.id(request.param("id"));
        }
        indexRequest.routing(request.param("routing"));
        indexRequest.setPipeline(request.param("pipeline"));
        indexRequest.source(request.requiredContent(), request.getXContentType());
        indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
        indexRequest.setRefreshPolicy(request.param("refresh"));
        indexRequest.version(RestActions.parseVersion(request));
        indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));
        indexRequest.setIfSeqNo(request.paramAsLong("if_seq_no", indexRequest.ifSeqNo()));
        indexRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", indexRequest.ifPrimaryTerm()));
        String sOpType = request.param("op_type");
        String waitForActiveShards = request.param("wait_for_active_shards");
        if (waitForActiveShards != null) {
            indexRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
        }
        if (sOpType != null) {
            indexRequest.opType(sOpType);
        }

        return channel ->
                client.index(indexRequest, new RestStatusToXContentListener<>(channel, r -> r.getLocation(indexRequest.routing()))); // the consumer that executes the index operation
    }
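
Most of the values parsed above come straight from the query string. For example, a request like the following (values are illustrative) exercises the routing, refresh, and optimistic-concurrency parameters that prepareRequest reads:

PUT /test_index/_doc/1?routing=user1&refresh=true&if_seq_no=5&if_primary_term=1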

Next, let's see how the index operation is actually handled; it is managed by TransportAction.

// org.elasticsearch.action.support.TransportAction

public final Task execute(Request request, ActionListener<Response> listener) {
        /*
         * While this version of execute could delegate to the TaskListener
         * version of execute that'd add yet another layer of wrapping on the
         * listener and prevent us from using the listener bare if there isn't a
         * task. That just seems like too many objects. Thus the two versions of
         * this method.
         */
        Task task = taskManager.register("transport", actionName, request); // register with the task manager: call -> task
        execute(task, request, new ActionListener<Response>() { // wrap the ActionListener
            @Override
            public void onResponse(Response response) {
                try {
                    taskManager.unregister(task);
                } finally {
                    listener.onResponse(response);
                }
            }

            @Override
            public void onFailure(Exception e) {
                try {
                    taskManager.unregister(task);
                } finally {
                    listener.onFailure(e);
                }
            }
        });
        return task;
    }
...
    public final void execute(Task task, Request request, ActionListener<Response> listener) {
        ActionRequestValidationException validationException = request.validate();
        if (validationException != null) {
            listener.onFailure(validationException);
            return;
        }

        if (task != null && request.getShouldStoreResult()) {
            listener = new TaskResultStoringActionListener<>(taskManager, task, listener);
        }

        RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger); // chained filter processing
        requestFilterChain.proceed(task, actionName, request, listener);
    }
...
    public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {
        int i = index.getAndIncrement();
        try {
            if (i < this.action.filters.length) {
                this.action.filters[i].apply(task, actionName, request, listener, this); // apply filters first
            } else if (i == this.action.filters.length) {
                this.action.doExecute(task, request, listener); // then execute the action itself
            } else {
                listener.onFailure(new IllegalStateException("proceed was called too many times"));
            }
        } catch(Exception e) {
            logger.trace("Error during transport action execution.", e);
            listener.onFailure(e);
        }
    }

The concrete work is actually done by TransportBulkAction: internally, a single-document index request is executed as a bulk request containing one item.

// org.elasticsearch.action.bulk.TransportBulkAction

protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
        final long startTime = relativeTime();
        final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());

        boolean hasIndexRequestsWithPipelines = false;
        final MetaData metaData = clusterService.state().getMetaData();
        ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
        for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
            IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
            if (indexRequest != null) {
                // get pipeline from request
                String pipeline = indexRequest.getPipeline();
                if (pipeline == null) { // no pipeline specified on the request
                    // start to look for default pipeline via settings found in the index meta data
                    IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
                    // check the alias for the index request (this is how normal index requests are modeled)
                    if (indexMetaData == null && indexRequest.index() != null) {
                        AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index()); // resolve via the index alias
                        if (indexOrAlias != null && indexOrAlias.isAlias()) {
                            AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
                            indexMetaData = alias.getWriteIndex();
                        }
                    }
                    // check the alias for the action request (this is how upserts are modeled)
                    if (indexMetaData == null && actionRequest.index() != null) {
                        AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(actionRequest.index());
                        if (indexOrAlias != null && indexOrAlias.isAlias()) {
                            AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
                            indexMetaData = alias.getWriteIndex();
                        }
                    }
                    if (indexMetaData != null) {
                        // Find the default pipeline if one is defined on an existing index.
                        String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
                        indexRequest.setPipeline(defaultPipeline);
                        if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
                            hasIndexRequestsWithPipelines = true;
                        }
                    } else if (indexRequest.index() != null) {
                        // No index exists yet (and is valid request), so matching index templates to look for a default pipeline
                        List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
                        assert (templates != null);
                        String defaultPipeline = IngestService.NOOP_PIPELINE_NAME;
                        // order of templates are highest order first, break if we find a default_pipeline
                        for (IndexTemplateMetaData template : templates) {
                            final Settings settings = template.settings();
                            if (IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
                                defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
                                break;
                            }
                        }
                        indexRequest.setPipeline(defaultPipeline);
                        if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
                            hasIndexRequestsWithPipelines = true;
                        }
                    }
                } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
                    hasIndexRequestsWithPipelines = true;
                }
            }
        }

        if (hasIndexRequestsWithPipelines) {
            // this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but
            // also with IngestService.NOOP_PIPELINE_NAME on each request. This ensures that this on the second time through this method,
            // this path is never taken.
            try {
                if (clusterService.localNode().isIngestNode()) {
                    processBulkIndexIngestRequest(task, bulkRequest, listener);
                } else {
                    ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
                }
            } catch (Exception e) {
                listener.onFailure(e);
            }
            return;
        }

        if (needToCheck()) { // auto-create the indices needed by the bulk request so the subsequent writes can proceed
            // Attempt to create all the indices that we're going to need during the bulk before we start.
            // Step 1: collect all the indices in the request
            final Set<String> indices = bulkRequest.requests.stream()
                    // delete requests should not attempt to create the index (if the index does not
                    // exists), unless an external versioning is used
                .filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
                        || request.versionType() == VersionType.EXTERNAL
                        || request.versionType() == VersionType.EXTERNAL_GTE)
                .map(DocWriteRequest::index)
                .collect(Collectors.toSet());
            /* Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create
             * that we'll use when we try to run the requests. */
            final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
            Set<String> autoCreateIndices = new HashSet<>();
            ClusterState state = clusterService.state();
            for (String index : indices) {
                boolean shouldAutoCreate;
                try {
                    shouldAutoCreate = shouldAutoCreate(index, state);
                } catch (IndexNotFoundException e) {
                    shouldAutoCreate = false;
                    indicesThatCannotBeCreated.put(index, e);
                }
                if (shouldAutoCreate) {
                    autoCreateIndices.add(index);
                }
            }
            // Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
            if (autoCreateIndices.isEmpty()) {
                executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated); // execute the bulk write
            } else {
                final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
                for (String index : autoCreateIndices) {
                    createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
                        @Override
                        public void onResponse(CreateIndexResponse result) {
                            if (counter.decrementAndGet() == 0) {
                                threadPool.executor(ThreadPool.Names.WRITE).execute(
                                    () -> executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated));
                            }
                        }

                        @Override
                        public void onFailure(Exception e) {
                            if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) {
                                // fail all requests involving this index, if create didn't work
                                for (int i = 0; i < bulkRequest.requests.size(); i++) {
                                    DocWriteRequest<?> request = bulkRequest.requests.get(i);
                                    if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
                                        bulkRequest.requests.set(i, null);
                                    }
                                }
                            }
                            if (counter.decrementAndGet() == 0) {
                                executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
                                    inner.addSuppressed(e);
                                    listener.onFailure(inner);
                                }), responses, indicesThatCannotBeCreated);
                            }
                        }
                    });
                }
            }
        } else {
            executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
        }
    }
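
Whether missing indices may be created automatically is decided by needToCheck()/shouldAutoCreate(), which reflect the action.auto_create_index setting. As an illustration, disabling it makes this branch fail fast instead of creating indices:

# elasticsearch.yml (illustrative)
action.auto_create_index: false

With that setting, a bulk write to a nonexistent index returns index_not_found_exception rather than creating the index on the fly.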

Next, BulkOperation (an inner class of TransportBulkAction) converts the BulkRequest into per-shard BulkShardRequests, i.e. it determines which shard each operation executes on.

// org.elasticsearch.action.bulk.TransportBulkAction

protected void doRun() {
            final ClusterState clusterState = observer.setAndGetObservedState();
            if (handleBlockExceptions(clusterState)) {
                return;
            }
            final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
            MetaData metaData = clusterState.metaData();
            for (int i = 0; i < bulkRequest.requests.size(); i++) {
                DocWriteRequest<?> docWriteRequest = bulkRequest.requests.get(i);
                //the request can only be null because we set it to null in the previous step, so it gets ignored
                if (docWriteRequest == null) {
                    continue;
                }
                if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices, metaData)) {
                    continue;
                }
                Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest); // resolve the concrete index
                try {
                    switch (docWriteRequest.opType()) {
                        case CREATE:
                        case INDEX:
                            IndexRequest indexRequest = (IndexRequest) docWriteRequest;
                            final IndexMetaData indexMetaData = metaData.index(concreteIndex);
                            MappingMetaData mappingMd = indexMetaData.mappingOrDefault();
                            Version indexCreated = indexMetaData.getCreationVersion();
                            indexRequest.resolveRouting(metaData);
                            indexRequest.process(indexCreated, mappingMd, concreteIndex.getName()); // validate the indexRequest and auto-generate an id if absent
                            break;
                        case UPDATE:
                            TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(),
                                (UpdateRequest) docWriteRequest);
                            break;
                        case DELETE:
                            docWriteRequest.routing(metaData.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
                            // check if routing is required, if so, throw error if routing wasn't specified
                            if (docWriteRequest.routing() == null && metaData.routingRequired(concreteIndex.getName())) {
                                throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id());
                            }
                            break;
                        default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
                    }
                } catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) {
                    BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(),
                        docWriteRequest.id(), e);
                    BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
                    responses.set(i, bulkItemResponse);
                    // make sure the request gets never processed again
                    bulkRequest.requests.set(i, null);
                }
            }

            // first, go over all the requests and create a ShardId -> Operations mapping
            Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
            for (int i = 0; i < bulkRequest.requests.size(); i++) {
                DocWriteRequest<?> request = bulkRequest.requests.get(i);
                if (request == null) {
                    continue;
                }
                String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
                ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(),
                    request.routing()).shardId(); // route by document id (or routing value) to pick the shard
                List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
                shardRequests.add(new BulkItemRequest(i, request));
            }

            if (requestsByShard.isEmpty()) {
                listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
                    buildTookInMillis(startTimeNanos)));
                return;
            }

            final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
            String nodeId = clusterService.localNode().getId();
            for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
                final ShardId shardId = entry.getKey();
                final List<BulkItemRequest> requests = entry.getValue();
                BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(), // build the BulkShardRequest
                        requests.toArray(new BulkItemRequest[requests.size()]));
                bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
                bulkShardRequest.timeout(bulkRequest.timeout());
                if (task != null) {
                    bulkShardRequest.setParentTask(nodeId, task.getId());
                }
                shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
                    @Override
                    public void onResponse(BulkShardResponse bulkShardResponse) {
                        for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
                            // we may have no response if item failed
                            if (bulkItemResponse.getResponse() != null) {
                                bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
                            }
                            responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
                        }
                        if (counter.decrementAndGet() == 0) {
                            finishHim();
                        }
                    }

                    @Override
                    public void onFailure(Exception e) {
                        // create failures for all relevant requests
                        for (BulkItemRequest request : requests) {
                            final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
                            DocWriteRequest<?> docWriteRequest = request.request();
                            responses.set(request.id(), new BulkItemResponse(request.id(), docWriteRequest.opType(),
                                    new BulkItemResponse.Failure(indexName, docWriteRequest.type(), docWriteRequest.id(), e)));
                        }
                        if (counter.decrementAndGet() == 0) {
                            finishHim();
                        }
                    }

                    private void finishHim() {
                        listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
                            buildTookInMillis(startTimeNanos)));
                    }
                });
            }
        }

Let's look at the id routing logic:

// org.elasticsearch.cluster.routing.OperationRouting

    public static int generateShardId(IndexMetaData indexMetaData, @Nullable String id, @Nullable String routing) {
        final String effectiveRouting;
        final int partitionOffset;

        if (routing == null) {
            assert(indexMetaData.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
            effectiveRouting = id;
        } else {
            effectiveRouting = routing;
        }

        if (indexMetaData.isRoutingPartitionedIndex()) {
            partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetaData.getRoutingPartitionSize());
        } else {
            // we would have still got 0 above but this check just saves us an unnecessary hash calculation
            partitionOffset = 0;
        }

        return calculateScaledShardId(indexMetaData, effectiveRouting, partitionOffset);
    }

    private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
        final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset; // murmur3 hash of the effective routing value

        // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
        // of original index to hash documents
        return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
    }
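
To make the formula concrete, here is a minimal standalone sketch of the same computation. Murmur3HashFunction is internal to Elasticsearch, so the hash below is only a stand-in, and the routingNumShards/routingFactor values are illustrative; for an index that was never shrunk or split, routingFactor is 1 and routingNumShards equals the shard count.

// a sketch of the routing math, not the actual Elasticsearch implementation
public class ShardRoutingSketch {

    // stand-in for org.elasticsearch.cluster.routing.Murmur3HashFunction
    static int hash(String routing) {
        return routing.hashCode(); // illustrative only; ES uses a murmur3 hash over the chars
    }

    static int shardId(String effectiveRouting, int partitionOffset,
                       int routingNumShards, int routingFactor) {
        int hash = hash(effectiveRouting) + partitionOffset;
        return Math.floorMod(hash, routingNumShards) / routingFactor;
    }

    public static void main(String[] args) {
        // a 5-shard index that was never resized: routingNumShards = 5, routingFactor = 1
        System.out.println(shardId("my-doc-id", 0, 5, 1));
    }
}

Taking the hash modulo routingNumShards (which can reflect the original, pre-shrink shard count) and then dividing by routingFactor is what keeps document placement stable across shrink operations.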

Then we check whether this shard sits on the current node or on a remote node; this is the routing phase. (I started only a single node here, so we follow the local-node path.)

// org.elasticsearch.action.support.replication.TransportReplicationAction

protected void doRun() {
            setPhase(task, "routing");
            final ClusterState state = observer.setAndGetObservedState();
            final String concreteIndex = concreteIndex(state, request);
            final ClusterBlockException blockException = blockExceptions(state, concreteIndex);
            if (blockException != null) {
                if (blockException.retryable()) {
                    logger.trace("cluster is blocked, scheduling a retry", blockException);
                    retry(blockException);
                } else {
                    finishAsFailed(blockException);
                }
            } else {
                // request does not have a shardId yet, we need to pass the concrete index to resolve shardId
                final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
                if (indexMetaData == null) {
                    retry(new IndexNotFoundException(concreteIndex));
                    return;
                }
                if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
                    throw new IndexClosedException(indexMetaData.getIndex());
                }

                // resolve all derived request fields, so we can route and apply it
                resolveRequest(indexMetaData, request);
                assert request.waitForActiveShards() != ActiveShardCount.DEFAULT :
                    "request waitForActiveShards must be set in resolveRequest";

                final ShardRouting primary = primary(state);
                if (retryIfUnavailable(state, primary)) {
                    return;
                }
                final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
                if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) { // determine which node hosts the primary and compare it with the local node
                    performLocalAction(state, primary, node, indexMetaData);
                } else {
                    performRemoteAction(state, primary, node);
                }
            }
        }

Since the shard is on the current node, an internal request is sent to the local node:

// org.elasticsearch.transport.TransportService

private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection, final String action,
                                                                   final TransportRequest request,
                                                                   final TransportRequestOptions options,
                                                                   TransportResponseHandler<T> handler) {
        if (connection == null) {
            throw new IllegalStateException("can't send request to a null connection");
        }
        DiscoveryNode node = connection.getNode();

        Supplier<ThreadContext.StoredContext> storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true);
        ContextRestoreResponseHandler<T> responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler);
        // TODO we can probably fold this entire request ID dance into connection.sendReqeust but it will be a bigger refactoring
        final long requestId = responseHandlers.add(new Transport.ResponseContext<>(responseHandler, connection, action));
        final TimeoutHandler timeoutHandler;
        if (options.timeout() != null) {
            timeoutHandler = new TimeoutHandler(requestId, connection.getNode(), action);
            responseHandler.setTimeoutHandler(timeoutHandler);
        } else {
            timeoutHandler = null;
        }
        try {
            if (lifecycle.stoppedOrClosed()) {
                /*
                 * If we are not started the exception handling will remove the request holder again and calls the handler to notify the
                 * caller. It will only notify if toStop hasn't done the work yet.
                 *
                 * Do not edit this exception message, it is currently relied upon in production code!
                 */
                // TODO: make a dedicated exception for a stopped transport service? cf. ExceptionsHelper#isTransportStoppedForAction
                throw new TransportException("TransportService is closed stopped can't send request");
            }
            if (timeoutHandler != null) {
                assert options.timeout() != null;
                timeoutHandler.scheduleTimeout(options.timeout());
            }
            connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream
 ...
     private void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) {
        final DirectResponseChannel channel = new DirectResponseChannel(localNode, action, requestId, this, threadPool);
        try {
            onRequestSent(localNode, requestId, action, request, options);
            onRequestReceived(requestId, action);
            final RequestHandlerRegistry reg = getRequestHandler(action); // registry pattern: action -> handler
            if (reg == null) {
                throw new ActionNotFoundTransportException("Action [" + action + "] not found");
            }
            final String executor = reg.getExecutor();
            if (ThreadPool.Names.SAME.equals(executor)) {
                //noinspection unchecked
                reg.processMessageReceived(request, channel);
            } else {
                threadPool.executor(executor).execute(new AbstractRunnable() {
                    @Override
                    protected void doRun() throws Exception {
                        //noinspection unchecked
                        reg.processMessageReceived(request, channel); // process the request
                    }

                    @Override
                    public boolean isForceExecution() {
                        return reg.isForceExecution();
                    }

                    @Override
                    public void onFailure(Exception e) {
                        try {
                            channel.sendResponse(e);
                        } catch (Exception inner) {
                            inner.addSuppressed(e);
                            logger.warn(() -> new ParameterizedMessage(
                                    "failed to notify channel of error message for action [{}]", action), inner);
                        }
                    }

                    @Override
                    public String toString() {
                        return "processing of [" + requestId + "][" + action + "]: " + request;
                    }
                });
            }

Next, a permit to execute the request on the shard is acquired:

// org.elasticsearch.action.support.replication.TransportReplicationAction

protected void doRun() throws Exception {
            final ShardId shardId = primaryRequest.getRequest().shardId();
            final IndexShard indexShard = getIndexShard(shardId);
            final ShardRouting shardRouting = indexShard.routingEntry();
            // we may end up here if the cluster state used to route the primary is so stale that the underlying
            // index shard was replaced with a replica. For example - in a two node cluster, if the primary fails
            // the replica will take over and a replica will be assigned to the first node.
            if (shardRouting.primary() == false) {
                throw new ReplicationOperation.RetryOnPrimaryException(shardId, "actual shard is not a primary " + shardRouting);
            }
            final String actualAllocationId = shardRouting.allocationId().getId();
            if (actualAllocationId.equals(primaryRequest.getTargetAllocationID()) == false) {
                throw new ShardNotFoundException(shardId, "expected allocation id [{}] but found [{}]",
                    primaryRequest.getTargetAllocationID(), actualAllocationId);
            }
            final long actualTerm = indexShard.getPendingPrimaryTerm();
            if (actualTerm != primaryRequest.getPrimaryTerm()) {
                throw new ShardNotFoundException(shardId, "expected allocation id [{}] with term [{}] but found [{}]",
                    primaryRequest.getTargetAllocationID(), primaryRequest.getPrimaryTerm(), actualTerm);
            }

            acquirePrimaryOperationPermit( // acquire the permit to execute on the primary shard
                    indexShard,
                    primaryRequest.getRequest(),
                    ActionListener.wrap(
                            releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)),
                            e -> {
                                if (e instanceof ShardNotInPrimaryModeException) {
                                    onFailure(new ReplicationOperation.RetryOnPrimaryException(shardId, "shard is not in primary mode", e));
                                } else {
                                    onFailure(e);
                                }
                            }));
        }
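
acquirePrimaryOperationPermit goes through IndexShardOperationPermits, which conceptually behaves like a semaphore with a huge number of permits: each ordinary operation takes one permit, while actions that must block all writes (for example during a primary term bump) take every permit. A rough plain-Java analogy, not the actual Elasticsearch implementation:

import java.util.concurrent.Semaphore;

class OperationPermitsSketch {
    private static final int TOTAL_PERMITS = Integer.MAX_VALUE;
    private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair: queued ops proceed in order

    // an ordinary operation: take one permit and release it when the op completes
    Runnable acquire() throws InterruptedException {
        semaphore.acquire();
        return semaphore::release;
    }

    // an exclusive block: waits until every in-flight operation has finished
    Runnable blockOperations() throws InterruptedException {
        semaphore.acquire(TOTAL_PERMITS);
        return () -> semaphore.release(TOTAL_PERMITS);
    }
}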

Now we enter the primary phase:

// org.elasticsearch.action.support.replication.TransportReplicationAction                    

                    setPhase(replicationTask, "primary");

                    final ActionListener<Response> referenceClosingListener = ActionListener.wrap(response -> {
                        primaryShardReference.close(); // release shard operation lock before responding to caller
                        setPhase(replicationTask, "finished");
                        onCompletionListener.onResponse(response);
                    }, e -> handleException(primaryShardReference, e));

                    final ActionListener<Response> globalCheckpointSyncingListener = ActionListener.wrap(response -> {
                        if (syncGlobalCheckpointAfterOperation) {
                            final IndexShard shard = primaryShardReference.indexShard;
                            try {
                                shard.maybeSyncGlobalCheckpoint("post-operation");
                            } catch (final Exception e) {
                                // only log non-closed exceptions
                                if (ExceptionsHelper.unwrap(
                                    e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
                                    // intentionally swallow, a missed global checkpoint sync should not fail this operation
                                    logger.info(
                                        new ParameterizedMessage(
                                            "{} failed to execute post-operation global checkpoint sync", shard.shardId()), e);
                                }
                            }
                        }
                        referenceClosingListener.onResponse(response);
                    }, referenceClosingListener::onFailure);

                    new ReplicationOperation<>(primaryRequest.getRequest(), primaryShardReference,
                        ActionListener.wrap(result -> result.respond(globalCheckpointSyncingListener), referenceClosingListener::onFailure),
                        newReplicasProxy(), logger, actionName, primaryRequest.getPrimaryTerm()).execute();

I will skip the intermediate call hops; in the end, TransportShardBulkAction invokes the indexing engine:

// org.elasticsearch.action.bulk.TransportShardBulkAction
static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier,
                                       MappingUpdatePerformer mappingUpdater, Consumer<ActionListener<Void>> waitForMappingUpdate,
                                       ActionListener<Void> itemDoneListener) throws Exception {
        final DocWriteRequest.OpType opType = context.getCurrent().opType();

        final UpdateHelper.Result updateResult;
        if (opType == DocWriteRequest.OpType.UPDATE) {
            final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
            try {
                updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier);
            } catch (Exception failure) {
                // we may fail translating a update to index or delete operation
                // we use index result to communicate failure while translating update request
                final Engine.Result result =
                    new Engine.IndexResult(failure, updateRequest.version());
                context.setRequestToExecute(updateRequest);
                context.markOperationAsExecuted(result);
                context.markAsCompleted(context.getExecutionResult());
                return true;
            }
            // execute translated update request
            switch (updateResult.getResponseResult()) {
                case CREATED:
                case UPDATED:
                    IndexRequest indexRequest = updateResult.action();
                    IndexMetaData metaData = context.getPrimary().indexSettings().getIndexMetaData();
                    MappingMetaData mappingMd = metaData.mappingOrDefault();
                    indexRequest.process(metaData.getCreationVersion(), mappingMd, updateRequest.concreteIndex());
                    context.setRequestToExecute(indexRequest);
                    break;
                case DELETED:
                    context.setRequestToExecute(updateResult.action());
                    break;
                case NOOP:
                    context.markOperationAsNoOp(updateResult.action());
                    context.markAsCompleted(context.getExecutionResult());
                    return true;
                default:
                    throw new IllegalStateException("Illegal update operation " + updateResult.getResponseResult());
            }
        } else {
            context.setRequestToExecute(context.getCurrent());
            updateResult = null;
        }

        assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state

        final IndexShard primary = context.getPrimary();
        final long version = context.getRequestToExecute().version();
        final boolean isDelete = context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE;
        final Engine.Result result;
        if (isDelete) {
            final DeleteRequest request = context.getRequestToExecute();
            result = primary.applyDeleteOperationOnPrimary(version, request.type(), request.id(), request.versionType(),
                request.ifSeqNo(), request.ifPrimaryTerm());
        } else {
            final IndexRequest request = context.getRequestToExecute();
            result = primary.applyIndexOperationOnPrimary(version, request.versionType(), new SourceToParse( // hand off to the Lucene-backed engine
                    request.index(), request.type(), request.id(), request.source(), request.getContentType(), request.routing()),
                request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry());
        }
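
From here, applyIndexOperationOnPrimary leads into the engine (InternalEngine#index): the document is parsed into Lucene fields, written through Lucene's IndexWriter, and the operation is then appended to the translog, with sequence-number and version bookkeeping along the way. Those engine internals deserve a post of their own.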

3. Indexing Flow (replica)

In ReplicationOperation's execute method, once the primary shard has performed the operation, a listener sends requests to the replica shards:

// org.elasticsearch.action.support.replication.ReplicationOperation

    public void execute() throws Exception {
        final String activeShardCountFailure = checkActiveShardCount();
        final ShardRouting primaryRouting = primary.routingEntry();
        final ShardId primaryId = primaryRouting.shardId();
        if (activeShardCountFailure != null) {
            finishAsFailed(new UnavailableShardsException(primaryId,
                "{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
            return;
        }

        totalShards.incrementAndGet();
        pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
        primary.perform(request, ActionListener.wrap(this::handlePrimaryResult, resultListener::onFailure)); // the listener invokes handlePrimaryResult
    }

    private void handlePrimaryResult(final PrimaryResultT primaryResult) {
        this.primaryResult = primaryResult;
        primary.updateLocalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.localCheckpoint());
        primary.updateGlobalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.globalCheckpoint());
        final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
        if (replicaRequest != null) {
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] op [{}] completed on primary for request [{}]", primary.routingEntry().shardId(), opType, request);
            }
            // we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
            // we have to make sure that every operation indexed into the primary after recovery start will also be replicated
            // to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
            // we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint
            // is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset
            // of the sampled replication group, and advanced further than what the given replication group would allow it to.
            // This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
            final long globalCheckpoint = primary.computedGlobalCheckpoint();
            // we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
            // max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed
            // on.
            final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
            assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
            final ReplicationGroup replicationGroup = primary.getReplicationGroup();
            markUnavailableShardsAsStale(replicaRequest, replicationGroup);
            performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup); // execute the operation on the replicas
        }
        successfulShards.incrementAndGet();  // mark primary as successful
        decPendingAndFinishIfNeeded();
    }

    ...
    private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint,
                                   final long maxSeqNoOfUpdatesOrDeletes, final ReplicationGroup replicationGroup) {
        // for total stats, add number of unassigned shards and
        // number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target)
        totalShards.addAndGet(replicationGroup.getSkippedShards().size());

        final ShardRouting primaryRouting = primary.routingEntry();

        for (final ShardRouting shard : replicationGroup.getReplicationTargets()) { // iterate over each replication target
            if (shard.isSameAllocation(primaryRouting) == false) {
                performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
            }
        }
    }
    private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest,
                                  final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) {
        if (logger.isTraceEnabled()) {
            logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
        }

        totalShards.incrementAndGet();
        pendingActions.incrementAndGet();
        replicasProxy.performOn(shard, replicaRequest, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, // delegate to the ReplicasProxy
            new ActionListener<ReplicaResponse>() {
                @Override
                public void onResponse(ReplicaResponse response) {
                    successfulShards.incrementAndGet();
                    try {
                        primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
                        primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
                    } catch (final AlreadyClosedException e) {
                        // the index was deleted or this shard was never activated after a relocation; fall through and finish normally
                    } catch (final Exception e) {
                        // fail the primary but fall through and let the rest of operation processing complete
                        final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
                        primary.failShard(message, e);
                    }
                    decPendingAndFinishIfNeeded();
                }

                @Override
                public void onFailure(Exception replicaException) {
                    logger.trace(() -> new ParameterizedMessage(
                        "[{}] failure while performing [{}] on replica {}, request [{}]",
                        shard.shardId(), opType, shard, replicaRequest), replicaException);
                    // Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report.
                    if (TransportActions.isShardNotAvailableException(replicaException) == false) {
                        RestStatus restStatus = ExceptionsHelper.status(replicaException);
                        shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
                            shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
                    }
                    String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
                    replicasProxy.failShardIfNeeded(shard, primaryTerm, message, replicaException,
                        ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary));
                }

                @Override
                public String toString() {
                    return "[" + replicaRequest + "][" + shard + "]";
                }
            });
    }
        

A transport request is then sent:

// org.elasticsearch.action.support.replication.TransportReplicationAction

public void performOn(
        final ShardRouting replica,
        final ReplicaRequest request,
        final long primaryTerm,
        final long globalCheckpoint,
        final long maxSeqNoOfUpdatesOrDeletes,
        final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
    String nodeId = replica.currentNodeId();
    final DiscoveryNode node = clusterService.state().nodes().get(nodeId);
    if (node == null) {
        listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]"));
        return;
    }
    final ConcreteReplicaRequest<ReplicaRequest> replicaRequest = new ConcreteReplicaRequest<>(
        request, replica.allocationId().getId(), primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
    final ActionListenerResponseHandler<ReplicaResponse> handler = new ActionListenerResponseHandler<>(listener,
        ReplicaResponse::new);
    transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, handler); // send the transport request
}
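
The transportReplicaAction used here is the primary action name with a "[r]" suffix (for shard-level bulk it is indices:data/write/bulk[s][r]); the receiving node uses that name to look up the matching handler in its registry, the same registry pattern we saw in sendLocalRequest.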

The replica shard handles the request much like the primary does, ultimately invoking the Lucene engine:

// org.elasticsearch.action.bulk.TransportShardBulkAction

private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse, DocWriteRequest<?> docWriteRequest,
                                                    IndexShard replica) throws Exception {
        final Engine.Result result;
        switch (docWriteRequest.opType()) {
            case CREATE:
            case INDEX:
                final IndexRequest indexRequest = (IndexRequest) docWriteRequest;
                final ShardId shardId = replica.shardId();
                final SourceToParse sourceToParse = new SourceToParse(shardId.getIndexName(), indexRequest.type(), indexRequest.id(),
                    indexRequest.source(), indexRequest.getContentType(), indexRequest.routing());
                result = replica.applyIndexOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(), // invoke the Lucene engine
                    indexRequest.getAutoGeneratedTimestamp(), indexRequest.isRetry(), sourceToParse);
                break;
            case DELETE:
                DeleteRequest deleteRequest = (DeleteRequest) docWriteRequest;
                result = replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(),
                    deleteRequest.type(), deleteRequest.id());
                break;
            default:
                assert false : "Unexpected request operation type on replica: " + docWriteRequest + ";primary result: " + primaryResponse;
                throw new IllegalStateException("Unexpected request operation type on replica: " + docWriteRequest.opType().getLowercase());
        }

4. Summary

This article gave a brief walkthrough of the Elasticsearch indexing flow, covering how the HTTP request is parsed and how the target shard is determined. Plenty is still left out, such as how a remote node processes the request and the internals of the Lucene execution engine; later posts will dig into those topics.
