Android RxJava3 原理浅析

使用

  // Build a Retrofit client whose service methods can return RxJava 3 types.
  val retrofit = Retrofit.Builder()
            .baseUrl("https://api.github.com/")
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJava3CallAdapterFactory.create())
            .build()
        val api = retrofit.create(API::class.java)
        api.getRepo("rengwuxian")
            // Run the request on the IO scheduler and deliver the callbacks on the Android main thread.
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(object : SingleObserver<MutableList<Repo>> {
                override fun onSubscribe(d: Disposable?) {
                    LogUtil.logD("onSubscribe${Thread.currentThread().name}")
                    clv1.text = "onSubscribe"
                }

                override fun onSuccess(t: MutableList<Repo>) {
                    // The list may be empty; t[0] would throw IndexOutOfBoundsException in that case.
                    val firstName = t.firstOrNull()?.name ?: return
                    LogUtil.logD(firstName)
                    clv1.text = firstName
                }

                override fun onError(e: Throwable) {
                    // Throwable.message is nullable; fall back to toString() instead of crashing on `!!`.
                    val description = e.message ?: e.toString()
                    LogUtil.logD(description)
                    clv1.text = description
                }
            })

RxJava需要我们主动切线程

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

也可以

.addCallAdapterFactory(RxJava3CallAdapterFactory.createWithScheduler(Schedulers.io()))

这样通过该 Retrofit 实例发起的请求都会默认在 Schedulers.io() 后台线程执行,

此时就不需要再单独调用 .subscribeOn(Schedulers.io()) 了

源码:

// Private constructor: it only records the (nullable) scheduler and the async flag in fields.
private RxJava3CallAdapterFactory(@Nullable Scheduler scheduler, boolean isAsync) {
    this.scheduler = scheduler;
    this.isAsync = isAsync;
  }

内部赋值

Single.just 发送瞬时事件

Single.just("1")
            .subscribe(object : SingleObserver<String?> {
                override fun onSubscribe(d: Disposable?) {
                    // SingleJust calls onSubscribe synchronously inside subscribe(), so a TODO() here
                    // would throw as soon as the sample runs; log instead.
                    LogUtil.logD("onSubscribe")
                }

                override fun onSuccess(t: String?) {
                    LogUtil.logD("onSuccess: $t")
                }

                override fun onError(e: Throwable?) {
                    LogUtil.logD("onError: ${e?.message}")
                }

            })

源码:

 // Creates a Single around an already available item.
 public static <@NonNull T> Single<T> just(T item) {
        Objects.requireNonNull(item, "item is null"); // reject a null item up front
        return RxJavaPlugins.onAssembly(new SingleJust<>(item)); // wrap in SingleJust and pass it through the assembly hook
    }


// Hook method: if an onSingleAssembly function is installed, return whatever apply(f, source)
// yields; otherwise return the source unchanged.
 public static <@NonNull T> Single<T> onAssembly(@NonNull Single<T> source) {
        Function<? super Single, ? extends Single> f = onSingleAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

//SingleJust 源码
/*
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.internal.operators.single;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;

/**
 * A Single that stores one value at construction time and hands it to each observer on subscription.
 */
public final class SingleJust<T> extends Single<T> {

    // The item captured by the constructor.
    final T value;

    public SingleJust(T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        // Signal the subscription first, using the Disposable returned by Disposable.disposed().
        observer.onSubscribe(Disposable.disposed());
        // Then deliver the stored value; this method never calls onError.
        observer.onSuccess(value);
    }

}


关键方法
 // Key method of SingleJust: both signals are emitted synchronously inside subscribeActual.
 @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        // Hand the observer the Disposable from Disposable.disposed() -- presumably an already-disposed
        // placeholder because there is nothing left to cancel (the original note says it is an enum; verify).
        observer.onSubscribe(Disposable.disposed());
        // Deliver the value. There is no onError path here: the item already exists, so nothing can fail.
        observer.onSuccess(value);
    }




操作符: map,转换对象类型

 // Start from a Single<Int> and use map to turn its item into a String.
 val single: Single<Int> = Single.just(1)
        val singleString = single.map { number -> number.toString() }

多个single map

按时间发送数据: 从0开始发送,间隔1秒

 Observable.interval(0, 1, TimeUnit.SECONDS)
            // interval(initialDelay, period, unit) uses Schedulers.computation() by default, so switch to
            // the main thread before onNext touches clv1 (assumed to be a UI view -- confirm).
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(object : Observer<Long> {
                override fun onSubscribe(d: Disposable?) {
                    // Invoked synchronously while subscribing, so it must not throw (TODO() would).
                    // NOTE(review): keep `d` and dispose it when the screen goes away; the interval
                    // source never calls onComplete by itself.
                }

                override fun onNext(t: Long?) {
                    // Skip a null instead of forcing it with `!!`.
                    t?.let { clv1.text = it.toString() }
                }

                override fun onError(e: Throwable?) {
                    LogUtil.logD("onError: ${e?.message}")
                }

                override fun onComplete() {
                    // Nothing to do for this sample.
                }

            })
 // Convenience overload: delegates to the four-argument interval(...) with Schedulers.computation().
 public static Observable<Long> interval(long initialDelay, long period, @NonNull TimeUnit unit) {
        return interval(initialDelay, period, unit, Schedulers.computation());
    }

interval

/*
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.internal.operators.observable;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.schedulers.TrampolineScheduler;

/**
 * Observable that emits an increasing Long counter, driven by a periodic task on the given Scheduler.
 */
public final class ObservableInterval extends Observable<Long> {
    final Scheduler scheduler;
    final long initialDelay;
    final long period;
    final TimeUnit unit;

    public ObservableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
        this.initialDelay = initialDelay;
        this.period = period;
        this.unit = unit;
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(Observer<? super Long> observer) {
        IntervalObserver is = new IntervalObserver(observer);
        observer.onSubscribe(is); // the IntervalObserver itself is handed downstream as the Disposable

        Scheduler sch = scheduler;

        if (sch instanceof TrampolineScheduler) {
            // Trampoline case: create a Worker, register it as the resource, then schedule on it.
            Worker worker = sch.createWorker();
            is.setResource(worker);
            worker.schedulePeriodically(is, initialDelay, period, unit);
        } else {
            // Any other scheduler: schedule directly and register the returned Disposable.
            Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
            is.setResource(d);
        }
    }

    // Serves as the Disposable given to the downstream and as the Runnable executed on every tick;
    // the scheduled resource is held in the AtomicReference this class extends.
    static final class IntervalObserver
    extends AtomicReference<Disposable>
    implements Disposable, Runnable {

        private static final long serialVersionUID = 346773832286157679L;

        final Observer<? super Long> downstream;

        // Next value to emit; incremented after each onNext.
        long count;

        IntervalObserver(Observer<? super Long> downstream) {
            this.downstream = downstream;
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return get() == DisposableHelper.DISPOSED;
        }

        @Override
        public void run() {
            // Emit only while the reference has not been set to the DISPOSED marker.
            if (get() != DisposableHelper.DISPOSED) {
                downstream.onNext(count++);
            }
        }

        public void setResource(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }
}







===
/*
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.internal.operators.observable;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.schedulers.TrampolineScheduler;

/**
 * Observable that emits an increasing Long counter from a task scheduled periodically on a Scheduler.
 */
public final class ObservableInterval extends Observable<Long> {
    final Scheduler scheduler;
    final long initialDelay;
    final long period;
    final TimeUnit unit;

    public ObservableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
        this.initialDelay = initialDelay;
        this.period = period;
        this.unit = unit;
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(Observer<? super Long> observer) {
        // The IntervalObserver is passed to the downstream as its Disposable.
        IntervalObserver is = new IntervalObserver(observer);
        observer.onSubscribe(is);

        Scheduler sch = scheduler;

        // Schedule the periodic task and register its handle (a Worker or a Disposable) on the IntervalObserver.
        if (sch instanceof TrampolineScheduler) {
            Worker worker = sch.createWorker();
            is.setResource(worker);
            worker.schedulePeriodically(is, initialDelay, period, unit);
        } else {
            Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
            is.setResource(d);
        }
    }

// Implements Disposable and extends AtomicReference<Disposable>: the scheduled resource is kept in the
// atomic reference itself (the original note describes this as a thread-safe Disposable).
    static final class IntervalObserver
    extends AtomicReference<Disposable>
    implements Disposable, Runnable {

        private static final long serialVersionUID = 346773832286157679L;

        final Observer<? super Long> downstream;

        long count;

        IntervalObserver(Observer<? super Long> downstream) {
            this.downstream = downstream;
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this); // delegate to DisposableHelper, which disposes the held resource
        }

        @Override
        public boolean isDisposed() {
            return get() == DisposableHelper.DISPOSED;
        }

        @Override
        public void run() {
            // One tick: emit the counter and increment it, unless already disposed.
            if (get() != DisposableHelper.DISPOSED) {
                downstream.onNext(count++);
            }
        }

        public void setResource(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }
}

 // dispose: atomically swaps the DISPOSED marker into the field and disposes whatever it held before.
 // Returns true only for the call that performed the swap.
 public static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get(); // read the Disposable currently held
        Disposable d = DISPOSED;
        if (current != d) { // proceed only when the field is NOT already marked as disposed
            current = field.getAndSet(d);
            if (current != d) { // re-check the value that was actually swapped out
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }

// Used by setResource: stores d in the field only if the field is still null, so it is set at most once.
// If the field is already occupied, d is disposed; reportDisposableSet() is called unless the field
// holds the DISPOSED marker.
    public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
        Objects.requireNonNull(d, "d is null");
        if (!field.compareAndSet(null, d)) {
            d.dispose();
            if (field.get() != DISPOSED) {
                reportDisposableSet();
            }
            return false;
        }
        return true;
    }

实际设置定时任务的代码: ObservableInterval->subscribeActual

Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
// The task body executed on each tick: emit the counter and increment it, unless disposed.
@Override
public void run() {
    if (get() != DisposableHelper.DISPOSED) {
        downstream.onNext(count++);
    }
}
ObservableInterval 内部创建并维护一个 IntervalObserver,定时发送和取消都由它负责

Single.delay(对应 SingleDelay)

// Convenience overload: delegates to the four-argument delay(...) with Schedulers.computation() and false.
public final Single<T> delay(long time, @NonNull TimeUnit unit) {
    return delay(time, unit, Schedulers.computation(), false);
}
/*
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.internal.operators.single;

import java.util.concurrent.TimeUnit;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.SequentialDisposable;

/**
 * Single that re-emits the upstream's success or error through a task scheduled on the given Scheduler.
 */
public final class SingleDelay<T> extends Single<T> {

    final SingleSource<? extends T> source;
    final long time;
    final TimeUnit unit;
    final Scheduler scheduler;
    final boolean delayError;

    public SingleDelay(SingleSource<? extends T> source, long time, TimeUnit unit, Scheduler scheduler, boolean delayError) {
        this.source = source;
        this.time = time;
        this.unit = unit;
        this.scheduler = scheduler;
        this.delayError = delayError;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {

        final SequentialDisposable sd = new SequentialDisposable(); // container owned by this operator
        observer.onSubscribe(sd); // the downstream receives the container as its Disposable
        source.subscribe(new Delay(sd, observer)); // subscribe to the upstream with the Delay observer
    }

    final class Delay implements SingleObserver<T> {
        private final SequentialDisposable sd;
        final SingleObserver<? super T> downstream;

        Delay(SequentialDisposable sd, SingleObserver<? super T> observer) {
            this.sd = sd;
            this.downstream = observer;
        }

        @Override
        public void onSubscribe(Disposable d) {
            // Put the upstream's Disposable into the container.
            sd.replace(d);
        }

        @Override
        public void onSuccess(final T value) {
            sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit)); 
            // Schedule the delayed delivery; the container now holds this operator's own scheduled
            // task instead of the upstream's Disposable.
        }

        @Override
        public void onError(final Throwable e) {
            // The error is delayed by `time` only when delayError is true; otherwise the delay is 0.
            sd.replace(scheduler.scheduleDirect(new OnError(e), delayError ? time : 0, unit));
        }

        // Runnable that forwards the stored value to the downstream.
        final class OnSuccess implements Runnable {
            private final T value;

            OnSuccess(T value) {
                this.value = value;
            }

            @Override
            public void run() {
                downstream.onSuccess(value);
            }
        }

        // Runnable that forwards the stored error to the downstream.
        final class OnError implements Runnable {
            private final Throwable e;

            OnError(Throwable e) {
                this.e = e;
            }

            @Override
            public void run() {
                downstream.onError(e);
            }
        }
    }
}




/*
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.internal.disposables;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.disposables.Disposable;

/**
 * A Disposable container that allows updating/replacing a Disposable
 * atomically and with respect of disposing the container itself.
 * <p>
 * The class extends AtomicReference directly so watch out for the API leak!
 * @since 2.0
 */
public final class SequentialDisposable
extends AtomicReference<Disposable>
implements Disposable {

    private static final long serialVersionUID = -754898800686245608L;

    /**
     * Constructs an empty SequentialDisposable.
     */
    public SequentialDisposable() {
        // nothing to do
    }

    /**
     * Construct a SequentialDisposable with the initial Disposable provided.
     * @param initial the initial disposable, null allowed
     */
    public SequentialDisposable(Disposable initial) {
        lazySet(initial);
    }

    /**
     * Atomically: set the next disposable on this container and dispose the previous
     * one (if any) or dispose next if the container has been disposed.
     * @param next the Disposable to set, may be null
     * @return true if the operation succeeded, false if the container has been disposed
     * @see #replace(Disposable)
     */
    public boolean update(Disposable next) {
        return DisposableHelper.set(this, next);
    }

    /**
     * Atomically: set the next disposable on this container but don't dispose the previous
     * one (if any) or dispose next if the container has been disposed.
     * @param next the Disposable to set, may be null
     * @return true if the operation succeeded, false if the container has been disposed
     * @see #update(Disposable)
     */
    public boolean replace(Disposable next) {
        return DisposableHelper.replace(this, next);
    }

    /**
     * Disposes this container by delegating to DisposableHelper.dispose on this reference.
     */
    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

    /**
     * Reports the disposed state by passing the currently held value to DisposableHelper.isDisposed.
     */
    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }
}

ObservableMap
/*
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.internal.operators.observable;

import io.reactivex.rxjava3.annotations.Nullable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.observers.BasicFuseableObserver;

import java.util.Objects;

/**
 * Observable that applies a mapper function to each upstream item before passing it downstream.
 */
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
        // Subscribes to the upstream with a MapObserver that wraps the downstream observer.
    }

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            // Ignore items once the observer is marked done.
            if (done) {
                return;
            }

            // When sourceMode is not NONE a null is forwarded instead of a mapped value --
            // presumably a fusion signal; confirm against BasicFuseableObserver.
            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                // A null mapper result is rejected by requireNonNull.
                v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                // Anything thrown while mapping is routed to fail(ex) and the item is dropped.
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Throwable {
            // Poll the upstream queue and map the value; a null poll result is returned as null.
            T t = qd.poll();
            return t != null ? Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}

// Called by the upstream: validates and stores the upstream's Disposable, then subscribes the downstream.
   public final void onSubscribe(Disposable d) {
        if (DisposableHelper.validate(this.upstream, d)) {

            this.upstream = d;
            // If the upstream Disposable is also a QueueDisposable, keep a typed reference to it in qd.
            if (d instanceof QueueDisposable) {
                this.qd = (QueueDisposable<T>)d;
            }

            if (beforeDownstream()) {

                // This observer is what the downstream receives as its Disposable.
                downstream.onSubscribe(this);

                afterDownstream();
            }

        }
    }

// dispose: simply forwards to the upstream's Disposable.
 @Override
    public void dispose() {
        upstream.dispose();
    }


ObservableDelay

/*
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.internal.operators.observable;

import java.util.concurrent.TimeUnit;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.observers.SerializedObserver;

/**
 * Observable that re-emits upstream signals through tasks scheduled on a Scheduler.Worker.
 */
public final class ObservableDelay<T> extends AbstractObservableWithUpstream<T, T> {
    final long delay;
    final TimeUnit unit;
    final Scheduler scheduler;
    final boolean delayError;

    public ObservableDelay(ObservableSource<T> source, long delay, TimeUnit unit, Scheduler scheduler, boolean delayError) {
        super(source);
        this.delay = delay;
        this.unit = unit;
        this.scheduler = scheduler;
        this.delayError = delayError;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void subscribeActual(Observer<? super T> t) {
        Observer<T> observer;
        if (delayError) {
            observer = (Observer<T>)t; // errors are delayed too, so the downstream is used as-is
        } else {
            // Errors are scheduled with zero delay in this mode; the downstream is wrapped in a SerializedObserver.
            observer = new SerializedObserver<>(t);
        }

        Scheduler.Worker w = scheduler.createWorker(); // worker on which every signal is scheduled

        source.subscribe(new DelayObserver<>(observer, delay, unit, w, delayError));
        // The DelayObserver is what the upstream sees as its observer.
    }

    static final class DelayObserver<T> implements Observer<T>, Disposable {
        final Observer<? super T> downstream;
        final long delay;
        final TimeUnit unit;
        final Scheduler.Worker w;
        final boolean delayError;

        // Disposable received from the upstream in onSubscribe.
        Disposable upstream;

        DelayObserver(Observer<? super T> actual, long delay, TimeUnit unit, Worker w, boolean delayError) {
            super();
            this.downstream = actual;
            this.delay = delay;
            this.unit = unit;
            this.w = w;
            this.delayError = delayError;
        }

        @Override
        public void onSubscribe(Disposable d) {
            // Validate, store the upstream Disposable, then pass this observer downstream as the Disposable.
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d;
                downstream.onSubscribe(this);
            }
        }

        @Override
        public void onNext(final T t) {
            w.schedule(new OnNext(t), delay, unit);
        }

        @Override
        public void onError(final Throwable t) {
            w.schedule(new OnError(t), delayError ? delay : 0, unit); // delayed only when delayError is true, otherwise 0
        }

        @Override
        public void onComplete() {
            w.schedule(new OnComplete(), delay, unit);
        }

        @Override
        public void dispose() {
            // Cancel both the upstream and the worker that runs the scheduled tasks.
            upstream.dispose();
            w.dispose();
        }

        @Override
        public boolean isDisposed() {
            return w.isDisposed();
        }

        // Runnable that forwards one item to the downstream.
        final class OnNext implements Runnable {
            private final T t;

            OnNext(T t) {
                this.t = t;
            }

            @Override
            public void run() {
                downstream.onNext(t);
            }
        }

        // Runnable that forwards the error and then disposes the worker.
        final class OnError implements Runnable {
            private final Throwable throwable;

            OnError(Throwable throwable) {
                this.throwable = throwable;
            }

            @Override
            public void run() {
                try {
                    downstream.onError(throwable);
                } finally {
                    w.dispose();
                }
            }
        }

        // Runnable that forwards completion and then disposes the worker.
        final class OnComplete implements Runnable {
            @Override
            public void run() {
                try {
                    downstream.onComplete();
                } finally {
                    w.dispose();
                }
            }
        }
    }
}



关联上下游 先验证 然后赋值
  // Links upstream and downstream: validate the incoming Disposable, store it, then notify the downstream.
  public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d;
                downstream.onSubscribe(this); // trigger the downstream's onSubscribe, passing this observer as the Disposable
            }
        }




主要是看生产者属于哪一种:1. 有上游(转发上游的事件);2. 没有上游(事件由自己生产)。

dispose 取消也遵循这个原则

线程切换:

subscribeOn
/*
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.internal.operators.single;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.*;

/**
 * Single that performs the subscription to its source inside a task scheduled on the given Scheduler.
 */
public final class SingleSubscribeOn<T> extends Single<T> {
    final SingleSource<? extends T> source;

    final Scheduler scheduler;

    public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);
        observer.onSubscribe(parent);

        Disposable f = scheduler.scheduleDirect(parent); // parent is a Runnable; its run() executes on the scheduler

        parent.task.replace(f); // keep the scheduled task's Disposable so dispose() can cancel it

    }

    static final class SubscribeOnObserver<T>
    extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {

        private static final long serialVersionUID = 7000911171163930287L;

        final SingleObserver<? super T> downstream;

        // Holds the Disposable of the scheduled subscription task.
        final SequentialDisposable task;

        final SingleSource<? extends T> source;

        SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
            this.downstream = actual;
            this.source = source;
            this.task = new SequentialDisposable();
        }

        @Override
        public void onSubscribe(Disposable d) {
            // d is the Disposable passed down by the upstream; it is stored in this AtomicReference.
            DisposableHelper.setOnce(this, d);
        }

        @Override
        public void onSuccess(T value) {
            downstream.onSuccess(value);
        }

        @Override
        public void onError(Throwable e) {
            downstream.onError(e);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this); // dispose the upstream Disposable held in this reference
            task.dispose(); // dispose the scheduled subscription task as well
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        @Override
        public void run() {
            source.subscribe(this); // subscribe to the upstream, with this observer receiving its signals
        }
    }

}

SingleObserveOn:
/*
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.internal.operators.single;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;

/**
 * Single that delivers the upstream's success or error to the downstream from a task scheduled on the
 * given Scheduler.
 */
public final class SingleObserveOn<T> extends Single<T> {

    final SingleSource<T> source;

    final Scheduler scheduler;

    public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
        // Subscribes to the upstream directly; nothing is scheduled at subscription time.
    }

    static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
        private static final long serialVersionUID = 3528003840217436037L;

        final SingleObserver<? super T> downstream;

        final Scheduler scheduler;

        // Result stored by onSuccess / onError and read later in run().
        T value;
        Throwable error;

        ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
            this.downstream = actual; 
            this.scheduler = scheduler;
        }

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.setOnce(this, d)) // store the upstream's Disposable in this reference (once)
{
                downstream.onSubscribe(this); // the downstream receives this observer as its Disposable
                // Disposing it therefore goes through dispose() below.
            }
        }

        @Override
        public void onSuccess(T value) {
            this.value = value;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d); // the reference now holds the scheduled task's Disposable in place of the upstream's
        }

        @Override
        public void onError(Throwable e) {
            this.error = e;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

        @Override
        public void run() {
            // Executed by the scheduler: emit the stored error if there is one, otherwise the stored value.
            Throwable ex = error;
            if (ex != null) {
                downstream.onError(ex);
            } else {
                downstream.onSuccess(value);
            }
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this); // disposes whatever the reference currently holds
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }
}




/*
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.internal.operators.single;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;

/**
 * A {@link Single} that subscribes to {@code source} and re-delivers its terminal signal
 * ({@code onSuccess} or {@code onError}) to the downstream from a task run by the given
 * {@link Scheduler}.
 *
 * @param <T> the success value type
 */
public final class SingleObserveOn<T> extends Single<T> {

    /** The upstream whose signal is re-delivered. */
    final SingleSource<T> source;

    /** The scheduler on which the downstream is notified. */
    final Scheduler scheduler;

    public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        // The upstream talks to the relay; the relay talks to the real downstream observer.
        final ObserveOnSingleObserver<T> relay = new ObserveOnSingleObserver<>(observer, scheduler);
        source.subscribe(relay);
    }

    /**
     * Sits between upstream and downstream. It is at once the observer of the upstream, the
     * {@link Disposable} handed to the downstream, and the {@link Runnable} submitted to the
     * scheduler. The {@link AtomicReference} it extends holds the current {@link Disposable}.
     *
     * @param <T> the success value type
     */
    static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
        private static final long serialVersionUID = 3528003840217436037L;

        final SingleObserver<? super T> downstream;

        final Scheduler scheduler;

        /** Success value recorded by onSuccess and read by run(). */
        T value;
        /** Failure recorded by onError and read by run(). */
        Throwable error;

        ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
            this.downstream = actual;
            this.scheduler = scheduler;
        }

        @Override
        public void onSubscribe(Disposable d) {
            // Only notify the downstream when `d` was accepted as the held handle.
            if (!DisposableHelper.setOnce(this, d)) {
                return;
            }
            downstream.onSubscribe(this);
        }

        @Override
        public void onSuccess(T value) {
            this.value = value;
            // Thread hop: schedule this Runnable, then hold the scheduled task's handle.
            DisposableHelper.replace(this, scheduler.scheduleDirect(this));
        }

        @Override
        public void onError(Throwable e) {
            this.error = e;
            // Thread hop: schedule this Runnable, then hold the scheduled task's handle.
            DisposableHelper.replace(this, scheduler.scheduleDirect(this));
        }

        /** Delivers the recorded outcome to the downstream; run by the scheduler. */
        @Override
        public void run() {
            final Throwable failure = error;
            if (failure == null) {
                downstream.onSuccess(value);
                return;
            }
            downstream.onError(failure);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            final Disposable held = get();
            return DisposableHelper.isDisposed(held);
        }
    }
}

切线程的具体工作由 Worker 完成(scheduleDirect 源码):

   /**
    * Schedules {@code run} for execution after the given delay on a worker obtained from
    * {@link #createWorker()}.
    *
    * @param run the task to execute
    * @param delay how long to wait before running the task, in {@code unit}
    * @param unit the time unit of {@code delay}
    * @return the {@code DisposeTask} that wraps the task together with its worker
    */
   @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        // A worker is requested for this call (createWorker: note 1 below).
        final Worker worker = createWorker();

        // Hook point: RxJavaPlugins may substitute the Runnable that actually gets scheduled.
        final Runnable hooked = RxJavaPlugins.onSchedule(run);

        // Bundle the task with its worker; this bundle is also what the caller gets back.
        final DisposeTask handle = new DisposeTask(hooked, worker);

        worker.schedule(handle, delay, unit);

        return handle;
    }


1
  /**
   * Creates the worker that executes scheduled tasks.
   *
   * @return a new {@code NewThreadWorker} constructed with {@code threadFactory}
   */
  @NonNull
    @Override
    public Worker createWorker() {
        // threadFactory is passed on to the worker (note 2 below).
        final NewThreadWorker worker = new NewThreadWorker(threadFactory);
        return worker;
    }

2 
// Declaration only: this excerpt shows that ScheduledExecutorService extends ExecutorService
// and lists no members in the body.
public interface ScheduledExecutorService extends ExecutorService {

}



/**
 * In essence the work is created and managed through a thread pool: this factory builds a
 * {@link ScheduledThreadPoolExecutor} with a core pool size of 1 that creates its threads
 * with {@code factory}.
 *
 * @param factory the thread factory handed to the executor
 * @return the configured executor
 */
 public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1, factory);
        // Remove-on-cancel follows the PURGE_ENABLED flag defined elsewhere in the class.
        pool.setRemoveOnCancelPolicy(PURGE_ENABLED);
        return pool;
    }

切换到主线程:mainThread 通过 Looper.getMainLooper() 实现

// Scheduler built by internalFrom(...) from the main thread's Looper (Looper.getMainLooper()).
// NOTE(review): the second argument (true) is presumably the "async" flag; confirm against internalFrom.
static final Scheduler DEFAULT = internalFrom(Looper.getMainLooper(), true);
    /**
     * Schedules {@code run} by posting a delayed {@code Message} to this scheduler's handler.
     *
     * @param run the task to execute; must not be null
     * @param delay how long to wait before the task runs, in {@code unit}
     * @param unit the time unit of {@code delay}; must not be null
     * @return the {@code ScheduledRunnable} wrapping the task
     * @throws NullPointerException if {@code run} or {@code unit} is null
     */
    @Override
    @SuppressLint("NewApi") // Async will only be true when the API is available to call.
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) {
            throw new NullPointerException("run == null");
        }
        if (unit == null) {
            throw new NullPointerException("unit == null");
        }

        // Let the RxJavaPlugins hook substitute the Runnable before it is wrapped.
        final Runnable hooked = RxJavaPlugins.onSchedule(run);
        final ScheduledRunnable task = new ScheduledRunnable(handler, hooked);
        final Message msg = Message.obtain(handler, task);
        if (async) {
            msg.setAsynchronous(true);
        }
        // Deliver through the handler's message queue. Per the article, the handler is built
        // as new Handler(looper) with the main Looper, so the task runs on the main thread.
        handler.sendMessageDelayed(msg, unit.toMillis(delay));
        return task;
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/130149.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据结构之单链表

大家好,我们今天来简单的认识下单链表。 链表的概念及结构 概念:链表是一种物理存储结构上非连续、非顺序的存储结构,数据元素的逻辑顺序是通过链表中的指针链接次序实现的。 单链表就像图中的火车一样,是由一节一节车厢链接起来…

第12章 PyTorch图像分割代码框架-3:推理与部署

推理模块 模型训练完成后,需要单独再写一个推理模块来供用户测试或者使用,该模块可以命名为test.py或者inference.py,导入训练好的模型文件和待测试的图像,输出该图像的分割结果。inference.py主体部分如代码11-7所示。 代码11-7 …

Linux imx6ull驱动 - led

一、GPIO模块结构 开始来啃手册了,打开我们的imx6ull手册。本章我们编写的是GPIO的,打开手册的第28章,这一章就有关于IMX6ULL 的 GPIO 模块结构。 imx6ull一共有5 组 GPIO(GPIO1~GPIO5) GPIO1 有 32 个引脚…

C/C++输出硬币翻转 2021年6月电子学会青少年软件编程(C/C++)等级考试一级真题答案解析

目录 C/C++硬币翻转 一、题目要求 1、编程实现 2、输入输出 二、算法分析 三、程序编写 四、程序说明 五、运行结果 六、考点分析 C/C++硬币翻转 2021年6月 C/C++编程等级考试一级编程题 一、题目要求 1、编程实现 假设有N个硬币(N为不大于5000的正整数),从1…

C语言--汉诺塔【内容超级详细】

今天与大家分享一下如何用C语言解决汉诺塔问题。 目录 一.前言 二.找规律⭐ 三.总结⭐⭐⭐ 四.代码实现⭐⭐ 一.前言 有一部很好看的电影《猩球崛起》⭐,说呀,人类为了抗击癌症发明了一种药物🍗,然后给猩猩做了实验…

2020年五一杯数学建模A题煤炭价格预测问题解题全过程文档及程序

2020年五一杯数学建模 A题 煤炭价格预测问题 原题再现 煤炭属于大宗商品,煤炭价格既受国家相关部门的监管,又受国内煤炭市场的影响。除此之外,气候变化、出行方式、能源消耗方式、国际煤炭市场等其他因素也会影响煤炭价格。请完成如下问题。…

canvas 简单直线轨迹运动与线性插值计算

canvas 简单直线轨迹运动与线性插值计算 一、canvas 直线轨迹运行 添加 canvas 语法提示 通过/** @type {HTMLCanvasElement} */代码块 来添加canvas的语法提示 <body><canvas id="canvas"></canvas> </body> <script>/** @type {HTM…

软件模拟SPI协议的理解和使用编写W25Q64

SPI软件模拟的时序 SPI协议中,NSS、SCK、MOSI由主机产生,MISO由从机产生,在SCK每个时钟周期MOSI、MISO传输一位数据,数据的输入输出是同时进行的,所以读写数据也可以视作交换数据。所以读写时对数据位的控制都是用同一…

ke10javaweb停更

<jsp:getProperty property="isval" name="username"/> property来修改属性,name是类 所以如果调用tip在前面那么就是没有设置值 证明是先到java里面去运转

AI编程工具:一站式编程解决方案,引领AI编程新时代

在人工智能的巨浪之下,编程领域正在经历一场深刻的转型。这场转型的核心,就是AI智能编程工具的出现。它们为开发者提供了一种全新的编程方式,极大地提高了编程效率。在这场变革中,DevChat无疑是引领者之一。 一、DevChat…

计算机毕业设计 基于SpringBoot高校毕业与学位资格审核系统的设计与实现 Javaweb项目 Java实战项目 前后端分离 文档报告 代码讲解 安装调试

🍊作者:计算机编程-吉哥 🍊简介:专业从事JavaWeb程序开发,微信小程序开发,定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事,生活就是快乐的。 🍊心愿:点…

AMD发布大小核 CPU,6核心直接砍成单核了

2022年 Intel 第12代酷睿发布,PE 大小核设计被正式带到了 PC 上。 P-Core 也就是传统的大核有着高性能、高功耗,而 E-Core 小核则是更讲究能效比以更低频率运行。 虽说小蝾也曾有对 Windows 调度方面的怀疑,但多线程性能确实实打实证明了其优…

2023年最受欢迎的9款产品原型工具大揭秘!

1.Pop (Prototyping on Paper) 价格:免费试用30天专业版960RMB/人/年 学习曲线:低 简介:POP是设计界面的原型工具,适用于iOS和Android平台。在POP的帮助下,开发人员或设计师只需在纸上简单地描述创意或想法…

SPI简介及FPGA通用MOSI模块实现

简介 SPI(Serial Peripheral Interface,串行外围设备接口)通讯协议,是Motorola公司提出的一种同步串行接口技术。是一种高速、全双工、同步通信总线。在芯片中只占用四根管脚用来控制及数据传输。 优缺点: SPI通讯协…

向量数据库:释放数据潜能,重塑信息世界

前言 想必各位开发者一定使用过关系型数据库MySQL去存储我们的项目的数据,也有部分人使用过非关系型数据库Redis去存储我们的一些热点数据作为缓存,提高我们系统的响应速度,减小我们MySQL的压力。那么你有听说过向量数据库吗?知道…

白嫖阿里云服务器教程来了,薅秃阿里云!

白嫖阿里云服务器攻略来了,在阿里云免费试用中心可以申请免费服务器,但是阿里云百科不建议选择免费的,只有3个月使用时长,选择99元服务器不是更香,2核2G配置3M固定带宽,一年99元,重点是新老用户…

【Linux】-文件操作(重定向、缓冲区以及Linux下一切皆文件的详解)

💖作者:小树苗渴望变成参天大树🎈 🎉作者宣言:认真写好每一篇博客💤 🎊作者gitee:gitee✨ 💞作者专栏:C语言,数据结构初阶,Linux,C 动态规划算法🎄 如 果 你 …

如何自己实现一个丝滑的流程图绘制工具(九) 自定义连接线

背景 产品又有更近的想法了,bpmn-js的连接线你用的时候是看不到的,也就是你从左侧点击连接线的没有线随鼠标移动. 但是产品想要看得见的连接线移动拖拽。 咩有办法,不能换框架,那就只能自己实现啦! 思路: …

06-MySQL-进阶-视图存储函数存储过程触发器

涉及资料 链接:https://pan.baidu.com/s/1M1oXN_pH3RGADx90ZFbfLQ?pwd=Coke 提取码:Coke 一、视图 数据准备 create table student(id int auto_increment comment '主键ID' primary key,name varchar(10) null comment '姓名',no varchar(10) null co…

JDBC(一)

第1章:JDBC概述 1.1 数据的持久化 持久化(persistence):把数据保存到可掉电式存储设备中以供之后使用。大多数情况下,特别是企业级应用,数据持久化意味着将内存中的数据保存到硬盘上,而持久化的实现过程大多通过各种…