import { Observable } from '../Observable';
import { ReplaySubject } from '../ReplaySubject';
import { Subscription } from '../Subscription';
import { MonoTypeOperatorFunction, SchedulerLike } from '../types';
import { Subscriber } from '../Subscriber';

/**
 * Options accepted by the configuration-object form of {@link shareReplay}.
 */
export interface ShareReplayConfig {
  /** Maximum element count of the replay buffer. Defaults to `Number.POSITIVE_INFINITY`. */
  bufferSize?: number;
  /** Maximum time length of the replay buffer in milliseconds. Defaults to `Number.POSITIVE_INFINITY`. */
  windowTime?: number;
  /**
   * When `true`, the subscription to the source is torn down (and the replay buffer
   * discarded) as soon as the number of subscribers drops to zero, provided the source
   * has not completed yet. When `false`, the source subscription is kept alive.
   */
  refCount: boolean;
  /** Scheduler handed to the underlying `ReplaySubject`. */
  scheduler?: SchedulerLike;
}

/**
 * Share source and replay specified number of emissions on subscription.
 *
 * This operator is a specialization of `replay` that connects to a source observable
 * and multicasts through a `ReplaySubject` constructed with the specified arguments.
 * A successfully completed source will stay cached in the shared observable forever,
 * but an errored source can be retried: the next subscription after an error creates a
 * fresh `ReplaySubject` and subscribes to the source again.
 *
 * ## Why use shareReplay?
 * You generally want to use `shareReplay` when you have side-effects or taxing computations
 * that you do not wish to be executed amongst multiple subscribers.
 * It may also be valuable in situations where you know you will have late subscribers to
 * a stream that need access to previously emitted values.
 * This ability to replay values on subscription is what differentiates {@link share} and `shareReplay`.
 *
 * ## Call forms
 * - `shareReplay(bufferSize?, windowTime?, scheduler?)` - positional form; `refCount` is `false`.
 * - `shareReplay(config)` - configuration-object form, see {@link ShareReplayConfig}.
 *
 * ## Example
 * ```ts
 * import { interval } from 'rxjs';
 * import { shareReplay, take } from 'rxjs/operators';
 *
 * const obs$ = interval(1000);
 * const shared$ = obs$.pipe(
 *   take(4),
 *   shareReplay(3)
 * );
 * shared$.subscribe(x => console.log('source A: ', x));
 * shared$.subscribe(y => console.log('source B: ', y));
 * ```
 *
 * @see {@link publish}
 * @see {@link share}
 * @see {@link publishReplay}
 *
 * @param configOrBufferSize Either a {@link ShareReplayConfig} object, or the maximum element
 * count of the replay buffer (defaults to `Number.POSITIVE_INFINITY`).
 * @param windowTime Maximum time length of the replay buffer in milliseconds
 * (defaults to `Number.POSITIVE_INFINITY`). Ignored when a config object is passed.
 * @param scheduler Scheduler handed to the underlying `ReplaySubject`.
 * Ignored when a config object is passed.
 * @return An operator function producing an observable that multicasts the source through a
 * shared `ReplaySubject`.
 * @method shareReplay
 * @owner Observable
 */
export function shareReplay<T>(config: ShareReplayConfig): MonoTypeOperatorFunction<T>;
export function shareReplay<T>(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
export function shareReplay<T>(
  configOrBufferSize?: ShareReplayConfig | number,
  windowTime?: number,
  scheduler?: SchedulerLike
): MonoTypeOperatorFunction<T> {
  let config: ShareReplayConfig;
  if (configOrBufferSize && typeof configOrBufferSize === 'object') {
    config = configOrBufferSize;
  } else {
    // Positional form: reference counting is off.
    config = {
      bufferSize: configOrBufferSize as number | undefined,
      windowTime,
      refCount: false,
      scheduler
    };
  }
  return (source: Observable<T>) => source.lift(shareReplayOperator(config));
}

/**
 * Builds the operator function passed to `lift`. All state declared here is shared by every
 * subscription made through the same `shareReplay(...)` call.
 */
function shareReplayOperator<T>({
  bufferSize = Number.POSITIVE_INFINITY,
  windowTime = Number.POSITIVE_INFINITY,
  refCount: useRefCount,
  scheduler
}: ShareReplayConfig) {
  // Subject all subscribers are attached to; undefined until first use or after a refCount reset.
  let subject: ReplaySubject<T> | undefined;
  // Number of currently attached subscribers.
  let refCount = 0;
  // Subscription to the source observable, if one is active.
  let subscription: Subscription | undefined;
  let hasError = false;
  let isComplete = false;

  return function shareReplayOperation(this: Subscriber<T>, source: Observable<T>) {
    refCount++;
    let innerSub: Subscription;
    if (!subject || hasError) {
      // First subscriber, or first subscriber after an error: start over with a new subject.
      hasError = false;
      // Captured in a const so the observer callbacks below always talk to the subject that
      // belongs to this source subscription and never dereference a cleared `subject`.
      const activeSubject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);
      subject = activeSubject;
      // Attach the subscriber before connecting so it also sees synchronous emissions.
      innerSub = activeSubject.subscribe(this);
      subscription = source.subscribe({
        next(value) {
          // Forward every source value into the replay buffer / to the subscribers.
          activeSubject.next(value);
        },
        error(err) {
          hasError = true;
          activeSubject.error(err);
        },
        complete() {
          isComplete = true;
          subscription = undefined;
          activeSubject.complete();
        },
      });

      // If the source completed synchronously, the `subscription = undefined` in `complete()`
      // ran before the assignment above, so the reference has to be cleared again here.
      if (isComplete) {
        subscription = undefined;
      }
    } else {
      innerSub = subject.subscribe(this);
    }

    this.add(() => {
      refCount--;
      innerSub.unsubscribe();
      // With refCount enabled, disconnect from a still-running source once nobody listens.
      if (subscription && !isComplete && useRefCount && refCount === 0) {
        subscription.unsubscribe();
        subscription = undefined;
        subject = undefined;
      }
    });
  };
}
Name | Type | Size | Permission | Actions |
audit.ts | File | 4.14 KB | 0644 |
auditTime.ts | File | 2.38 KB | 0644 |
buffer.ts | File | 2.54 KB | 0644 |
bufferCount.ts | File | 4.46 KB | 0644 |
bufferTime.ts | File | 8.51 KB | 0644 |
bufferToggle.ts | File | 5.6 KB | 0644 |
bufferWhen.ts | File | 3.89 KB | 0644 |
catchError.ts | File | 4.72 KB | 0644 |
combineAll.ts | File | 2.44 KB | 0644 |
combineLatest.ts | File | 4.32 KB | 0644 |
concat.ts | File | 2.18 KB | 0644 |
concatAll.ts | File | 2.26 KB | 0644 |
concatMap.ts | File | 3.4 KB | 0644 |
concatMapTo.ts | File | 3.05 KB | 0644 |
count.ts | File | 3.78 KB | 0644 |
debounce.ts | File | 4.88 KB | 0644 |
debounceTime.ts | File | 4.54 KB | 0644 |
defaultIfEmpty.ts | File | 2.69 KB | 0644 |
delay.ts | File | 5.09 KB | 0644 |
delayWhen.ts | File | 7.7 KB | 0644 |
dematerialize.ts | File | 2.58 KB | 0644 |
distinct.ts | File | 4.22 KB | 0644 |
distinctUntilChanged.ts | File | 3.76 KB | 0644 |
distinctUntilKeyChanged.ts | File | 2.79 KB | 0644 |
elementAt.ts | File | 2.56 KB | 0644 |
endWith.ts | File | 4.06 KB | 0644 |
every.ts | File | 2.54 KB | 0644 |
exhaust.ts | File | 3.2 KB | 0644 |
exhaustMap.ts | File | 5.64 KB | 0644 |
expand.ts | File | 6.18 KB | 0644 |
filter.ts | File | 3.69 KB | 0644 |
finalize.ts | File | 1.3 KB | 0644 |
find.ts | File | 3.81 KB | 0644 |
findIndex.ts | File | 1.86 KB | 0644 |
first.ts | File | 3.36 KB | 0644 |
groupBy.ts | File | 9.95 KB | 0644 |
ignoreElements.ts | File | 1.49 KB | 0644 |
index.ts | File | 4.02 KB | 0644 |
isEmpty.ts | File | 2.67 KB | 0644 |
last.ts | File | 2.36 KB | 0644 |
map.ts | File | 3.04 KB | 0644 |
mapTo.ts | File | 1.9 KB | 0644 |
materialize.ts | File | 3.25 KB | 0644 |
max.ts | File | 1.54 KB | 0644 |
merge.ts | File | 3.59 KB | 0644 |
mergeAll.ts | File | 2.43 KB | 0644 |
mergeMap.ts | File | 6.28 KB | 0644 |
mergeMapTo.ts | File | 2.56 KB | 0644 |
mergeScan.ts | File | 4.65 KB | 0644 |
min.ts | File | 1.54 KB | 0644 |
multicast.ts | File | 3.46 KB | 0644 |
observeOn.ts | File | 5.17 KB | 0644 |
onErrorResumeNext.ts | File | 8.18 KB | 0644 |
pairwise.ts | File | 2.43 KB | 0644 |
partition.ts | File | 2.68 KB | 0644 |
pluck.ts | File | 3.07 KB | 0644 |
publish.ts | File | 2.51 KB | 0644 |
publishBehavior.ts | File | 589 B | 0644 |
publishLast.ts | File | 2.06 KB | 0644 |
publishReplay.ts | File | 1.47 KB | 0644 |
race.ts | File | 1.85 KB | 0644 |
reduce.ts | File | 3.71 KB | 0644 |
refCount.ts | File | 5 KB | 0644 |
repeat.ts | File | 3.03 KB | 0644 |
repeatWhen.ts | File | 4.15 KB | 0644 |
retry.ts | File | 2.74 KB | 0644 |
retryWhen.ts | File | 3.45 KB | 0644 |
sample.ts | File | 2.78 KB | 0644 |
sampleTime.ts | File | 3.18 KB | 0644 |
scan.ts | File | 4.13 KB | 0644 |
sequenceEqual.ts | File | 5.16 KB | 0644 |
share.ts | File | 1.02 KB | 0644 |
shareReplay.ts | File | 4.19 KB | 0644 |
single.ts | File | 3.79 KB | 0644 |
skip.ts | File | 1.27 KB | 0644 |
skipLast.ts | File | 2.81 KB | 0644 |
skipUntil.ts | File | 3.81 KB | 0644 |
skipWhile.ts | File | 1.95 KB | 0644 |
startWith.ts | File | 4.35 KB | 0644 |
subscribeOn.ts | File | 2.56 KB | 0644 |
switchAll.ts | File | 1.99 KB | 0644 |
switchMap.ts | File | 6.04 KB | 0644 |
switchMapTo.ts | File | 2.56 KB | 0644 |
take.ts | File | 2.74 KB | 0644 |
takeLast.ts | File | 3.44 KB | 0644 |
takeUntil.ts | File | 2.79 KB | 0644 |
takeWhile.ts | File | 3.87 KB | 0644 |
tap.ts | File | 5.33 KB | 0644 |
throttle.ts | File | 4.83 KB | 0644 |
throttleTime.ts | File | 5.83 KB | 0644 |
throwIfEmpty.ts | File | 2.19 KB | 0644 |
timeInterval.ts | File | 2.49 KB | 0644 |
timeout.ts | File | 4.07 KB | 0644 |
timeoutWith.ts | File | 6.13 KB | 0644 |
timestamp.ts | File | 1.67 KB | 0644 |
toArray.ts | File | 1.12 KB | 0644 |
window.ts | File | 3.72 KB | 0644 |
windowCount.ts | File | 4.82 KB | 0644 |
windowTime.ts | File | 9.9 KB | 0644 |
windowToggle.ts | File | 6.2 KB | 0644 |
windowWhen.ts | File | 4.57 KB | 0644 |
withLatestFrom.ts | File | 7.72 KB | 0644 |
zip.ts | File | 3.35 KB | 0644 |
zipAll.ts | File | 653 B | 0644 |