591 lines
21 KiB
JavaScript
591 lines
21 KiB
JavaScript
|
// Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information.
|
||
|
|
||
|
;(function (factory) {
|
||
|
var objectTypes = {
|
||
|
'function': true,
|
||
|
'object': true
|
||
|
};
|
||
|
|
||
|
/**
 * Returns `value` when it looks like a global object, i.e. it is truthy and
 * its `Object` property is the very same `Object` constructor visible here.
 * Returns null for anything else.
 *
 * @param {*} value Candidate global object.
 * @returns {Object|null} The candidate itself, or null when it does not qualify.
 */
function checkGlobal(value) {
  if (value && value.Object === Object) {
    return value;
  }
  return null;
}
|
||
|
|
||
|
var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null;
|
||
|
var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null;
|
||
|
var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global);
|
||
|
var freeSelf = checkGlobal(objectTypes[typeof self] && self);
|
||
|
var freeWindow = checkGlobal(objectTypes[typeof window] && window);
|
||
|
var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null;
|
||
|
var thisGlobal = checkGlobal(objectTypes[typeof this] && this);
|
||
|
var root = freeGlobal || ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) || freeSelf || thisGlobal || Function('return this')();
|
||
|
|
||
|
// Because of build optimizers
|
||
|
if (typeof define === 'function' && define.amd) {
|
||
|
define(['./rx'], function (Rx, exports) {
|
||
|
return factory(root, exports, Rx);
|
||
|
});
|
||
|
} else if (typeof module === 'object' && module && module.exports === freeExports) {
|
||
|
module.exports = factory(root, module.exports, require('./rx'));
|
||
|
} else {
|
||
|
root.Rx = factory(root, {}, root.Rx);
|
||
|
}
|
||
|
}.call(this, function (root, exp, Rx, undefined) {
|
||
|
|
||
|
// Aliases: local shorthands for the Rx members used throughout this module.
var Observable = Rx.Observable;
var observableProto = Observable.prototype;
var ObservableBase = Rx.ObservableBase;
var AbstractObserver = Rx.internals.AbstractObserver;
var FlatMapObservable = Rx.FlatMapObservable;
var observableConcat = Observable.concat;
var observableDefer = Observable.defer;
var observableEmpty = Observable.empty;
var disposableEmpty = Rx.Disposable.empty;
var CompositeDisposable = Rx.CompositeDisposable;
var SerialDisposable = Rx.SerialDisposable;
var SingleAssignmentDisposable = Rx.SingleAssignmentDisposable;
var Enumerable = Rx.internals.Enumerable;
var enumerableOf = Enumerable.of;
var currentThreadScheduler = Rx.Scheduler.currentThread;
var AsyncSubject = Rx.AsyncSubject;
var Observer = Rx.Observer;
var inherits = Rx.internals.inherits;
var addProperties = Rx.internals.addProperties;
var helpers = Rx.helpers;
var noop = helpers.noop;
var isPromise = helpers.isPromise;
var isFunction = helpers.isFunction;
// isIterable / isArrayLike are re-declared and reassigned further down in this file.
var isIterable = Rx.helpers.isIterable;
var isArrayLike = Rx.helpers.isArrayLike;
var isScheduler = Rx.Scheduler.isScheduler;
var observableFromPromise = Observable.fromPromise;
|
||
|
|
||
|
// Shared sentinel: a wrapped call that throws returns this exact object, with
// the thrown value stored on `e`. Callers detect failure via `=== errorObj`.
var errorObj = { e: {} };

/**
 * Wraps `tryCatchTarget` so that exceptions are captured instead of propagated.
 *
 * @param {Function} tryCatchTarget Function to invoke.
 * @returns {Function} A function that forwards `this` and all arguments to
 *   `tryCatchTarget`, returning its result, or `errorObj` (with `errorObj.e`
 *   set to the thrown value) if it throws.
 */
function tryCatcherGen(tryCatchTarget) {
  return function tryCatcher() {
    var outcome;
    try {
      outcome = tryCatchTarget.apply(this, arguments);
    } catch (caught) {
      errorObj.e = caught;
      outcome = errorObj;
    }
    return outcome;
  };
}
|
||
|
|
||
|
/**
 * Validates `fn` and returns its exception-capturing wrapper.
 * Also published as `Rx.internals.tryCatch`.
 *
 * @param {Function} fn Function to wrap.
 * @returns {Function} The wrapper produced by `tryCatcherGen(fn)`.
 * @throws {TypeError} When `fn` is not a function according to `isFunction`.
 */
var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) {
  if (isFunction(fn)) {
    return tryCatcherGen(fn);
  }
  throw new TypeError('fn must be a function');
};
|
||
|
|
||
|
/**
 * Rethrows the given value unchanged.
 *
 * @param {*} e Value to throw.
 * @throws {*} Always throws `e`.
 */
function thrower(e) {
  throw e;
}
|
||
|
|
||
|
// Shim in iterator support: start from the es6-shim key and upgrade to the
// native Symbol.iterator when one is available.
var $iterator$ = '_es6shim_iterator_';
if (typeof Symbol === 'function' && Symbol.iterator) {
  $iterator$ = Symbol.iterator;
}
// Bug for mozilla version: when a Set instance exposes an '@@iterator' method,
// that string key takes precedence over whatever was picked above.
if (root.Set && typeof new root.Set()['@@iterator'] === 'function') {
  $iterator$ = '@@iterator';
}
|
||
|
|
||
|
var doneEnumerator = Rx.doneEnumerator = { done: true, value: undefined };
|
||
|
|
||
|
/**
 * Tests whether `o` carries an iterator member under the `$iterator$` key.
 * Also published as `Rx.helpers.isIterable`.
 *
 * @param {*} o Value to inspect.
 * @returns {*} `o` itself when it is falsy; otherwise true/false depending on
 *   whether `o[$iterator$]` is defined.
 */
var isIterable = Rx.helpers.isIterable = function (o) {
  if (!o) {
    // Falsy input is handed back as-is rather than coerced to a boolean.
    return o;
  }
  return o[$iterator$] !== undefined;
};
|
||
|
|
||
|
/**
 * Tests whether `o` has a defined `length` property.
 * Also published as `Rx.helpers.isArrayLike`.
 *
 * @param {*} o Value to inspect.
 * @returns {*} `o` itself when it is falsy; otherwise true/false depending on
 *   whether `o.length` is defined.
 */
var isArrayLike = Rx.helpers.isArrayLike = function (o) {
  if (!o) {
    // Falsy input is handed back as-is rather than coerced to a boolean.
    return o;
  }
  return o.length !== undefined;
};
|
||
|
|
||
|
Rx.helpers.iterator = $iterator$;
|
||
|
|
||
|
/**
 * Enumerable whose iterator keeps yielding the same source for as long as the
 * condition function returns a truthy value.
 */
var WhileEnumerable = (function (__super__) {
  inherits(WhileEnumerable, __super__);

  /**
   * @param {Function} c Condition, re-evaluated on every `next()` call.
   * @param {*} s Value yielded while the condition holds.
   */
  function WhileEnumerable(c, s) {
    this.c = c;
    this.s = s;
  }

  WhileEnumerable.prototype[$iterator$] = function () {
    var enumerable = this;

    function next() {
      if (enumerable.c()) {
        return { done: false, value: enumerable.s };
      }
      return { done: true, value: void 0 };
    }

    return { next: next };
  };

  return WhileEnumerable;
}(Enumerable));
|
||
|
|
||
|
/**
 * Factory for WhileEnumerable.
 *
 * @param {Function} condition Condition checked before each yielded element.
 * @param {*} source Value yielded while `condition()` is truthy.
 * @returns {WhileEnumerable} A new enumerable over (condition, source).
 */
function enumerableWhile(condition, source) {
  var enumerable = new WhileEnumerable(condition, source);
  return enumerable;
}
|
||
|
|
||
|
/**
 * Invokes `func` with this observable sequence and returns whatever `func` returns.
 * This allows a fluent style of writing queries that use the same sequence multiple
 * times, without sharing subscriptions.
 * Available both as `letBind` and as `let`.
 *
 * @param {Function} func Function that receives the source sequence and may use it as many times as needed.
 * @returns {*} The value returned by `func`.
 */
observableProto.letBind = observableProto['let'] = function (func) {
  var source = this;
  return func(source);
};
|
||
|
|
||
|
/**
 * Chooses between two sequences based on a condition. The choice is made
 * inside the factory handed to Observable.defer, not when `if` is called.
 *
 * @example
 *  1 - res = Rx.Observable.if(condition, obs1);
 *  2 - res = Rx.Observable.if(condition, obs1, obs2);
 *  3 - res = Rx.Observable.if(condition, obs1, scheduler);
 * @param {Function} condition The condition which determines if the thenSource or elseSource will be run.
 * @param {Observable} thenSource The observable sequence or Promise that will be run if the condition function returns true.
 * @param {Observable} [elseSourceOrScheduler] The observable sequence or Promise that will be run if the condition function returns false. If this is not provided, it defaults to Rx.Observable.empty; if an object with a `now` function is given, it is treated as the scheduler for that empty sequence.
 * @returns {Observable} An observable sequence which is either the thenSource or elseSource.
 */
Observable['if'] = function (condition, thenSource, elseSourceOrScheduler) {
  return observableDefer(function () {
    if (!elseSourceOrScheduler) {
      elseSourceOrScheduler = observableEmpty();
    }

    if (isPromise(thenSource)) {
      thenSource = observableFromPromise(thenSource);
    }
    if (isPromise(elseSourceOrScheduler)) {
      elseSourceOrScheduler = observableFromPromise(elseSourceOrScheduler);
    }

    // Assume a scheduler for empty only
    if (typeof elseSourceOrScheduler.now === 'function') {
      elseSourceOrScheduler = observableEmpty(elseSourceOrScheduler);
    }

    if (condition()) {
      return thenSource;
    }
    return elseSourceOrScheduler;
  });
};
|
||
|
|
||
|
/**
 * Concatenates the observable sequences obtained by running the specified result selector for each element in sources.
 * There is an alias for this method called 'forIn' for browsers <IE9
 *
 * @param {Array} sources An array of values to turn into an observable sequence.
 * @param {Function} resultSelector A function to apply to each item in the sources array to turn it into an observable sequence.
 * @param {*} [thisArg] Forwarded unchanged to Enumerable.of together with resultSelector.
 * @returns {Observable} An observable sequence from the concatenated observable sequences.
 */
Observable['for'] = Observable.forIn = function (sources, resultSelector, thisArg) {
  var projected = enumerableOf(sources, resultSelector, thisArg);
  return projected.concat();
};
|
||
|
|
||
|
/**
 * Repeats source as long as condition holds, emulating a while loop.
 * There is an alias for this method called 'whileDo' for browsers <IE9
 *
 * @param {Function} condition The condition which determines if the source will be repeated.
 * @param {Observable} source The observable sequence (or Promise, which is converted first) that will be run if the condition function returns true.
 * @returns {Observable} An observable sequence which is repeated as long as the condition holds.
 */
var observableWhileDo = Observable['while'] = Observable.whileDo = function (condition, source) {
  if (isPromise(source)) {
    source = observableFromPromise(source);
  }
  var repeated = enumerableWhile(condition, source);
  return repeated.concat();
};
|
||
|
|
||
|
/**
 * Repeats this sequence as long as condition holds, emulating a do-while loop:
 * the sequence is concatenated once unconditionally, followed by the
 * while-loop repetition of the same sequence.
 *
 * @param {Function} condition The condition which determines if the source will be repeated.
 * @returns {Observable} An observable sequence which is repeated as long as the condition holds.
 */
observableProto.doWhile = function (condition) {
  var source = this;
  var repetitions = observableWhileDo(condition, source);
  return observableConcat([source, repetitions]);
};
|
||
|
|
||
|
/**
 * Uses selector to determine which source in sources to use.
 * The lookup happens inside the factory handed to Observable.defer.
 *
 * @param {Function} selector The function which extracts the value to test in a case statement.
 * @param {Object} sources An object which has keys which correspond to the case statement labels.
 * @param {Observable} [defaultSourceOrScheduler] The observable sequence or Promise that will be run if the sources are not matched. If this is not provided, it defaults to Rx.Observable.empty; if a scheduler is given, it is used for that empty sequence.
 *
 * @returns {Observable} An observable sequence which is determined by a case statement.
 */
Observable['case'] = function (selector, sources, defaultSourceOrScheduler) {
  return observableDefer(function () {
    if (isPromise(defaultSourceOrScheduler)) {
      defaultSourceOrScheduler = observableFromPromise(defaultSourceOrScheduler);
    }
    if (!defaultSourceOrScheduler) {
      defaultSourceOrScheduler = observableEmpty();
    }

    if (isScheduler(defaultSourceOrScheduler)) {
      defaultSourceOrScheduler = observableEmpty(defaultSourceOrScheduler);
    }

    var result = sources[selector()];
    if (isPromise(result)) {
      result = observableFromPromise(result);
    }

    // A missing (falsy) entry falls through to the default sequence.
    if (result) {
      return result;
    }
    return defaultSourceOrScheduler;
  });
};
|
||
|
|
||
|
/**
 * Observable backing `expand`: keeps a queue of sequences to subscribe to,
 * starting with the source, and drains that queue through the scheduler.
 */
var ExpandObservable = (function (__super__) {
  inherits(ExpandObservable, __super__);

  /**
   * @param {Observable} source First sequence placed on the queue.
   * @param {Function} fn Selector applied by ExpandObserver to each element.
   * @param {Scheduler} scheduler Scheduler used to drain the queue.
   */
  function ExpandObservable(source, fn, scheduler) {
    this.source = source;
    this._fn = fn;
    this._scheduler = scheduler;
    __super__.call(this);
  }

  // Recursive scheduler action: takes one queued sequence, subscribes to it,
  // then reschedules itself. Releases the "acquired" flag once the queue is empty.
  function scheduleRecursive(args, recurse) {
    var state = args[0];
    var self = args[1];

    if (!(state.q.length > 0)) {
      state.isAcquired = false;
      return;
    }

    var work = state.q.shift();
    var m1 = new SingleAssignmentDisposable();
    state.d.add(m1);
    m1.setDisposable(work.subscribe(new ExpandObserver(state, self, m1)));
    recurse([state, self]);
  }

  // Starts the drain loop when there is queued work and no loop currently owns the queue.
  ExpandObservable.prototype._ensureActive = function (state) {
    if (!(state.q.length > 0)) {
      return;
    }

    var alreadyAcquired = state.isAcquired;
    state.isAcquired = true;
    if (!alreadyAcquired) {
      state.m.setDisposable(this._scheduler.scheduleRecursive([state, this], scheduleRecursive));
    }
  };

  ExpandObservable.prototype.subscribeCore = function (o) {
    var m = new SerialDisposable();
    var d = new CompositeDisposable(m);
    var state = {
      q: [this.source],   // sequences waiting to be subscribed to
      m: m,               // holds the scheduled drain loop
      d: d,               // everything disposed when the subscriber unsubscribes
      activeCount: 1,     // the source counts as the first outstanding sequence
      isAcquired: false,  // true while a drain loop is running
      o: o
    };

    this._ensureActive(state);
    return d;
  };

  return ExpandObservable;
}(ObservableBase));
|
||
|
|
||
|
/**
 * Observer attached to each sequence processed by ExpandObservable. Forwards
 * elements downstream and enqueues the selector's result for every element.
 */
var ExpandObserver = (function (__super__) {
  inherits(ExpandObserver, __super__);

  /**
   * @param {Object} state Shared subscription state created in ExpandObservable#subscribeCore.
   * @param {ExpandObservable} parent Owner providing `_fn` and `_ensureActive`.
   * @param {SingleAssignmentDisposable} m1 Disposable for this inner subscription.
   */
  function ExpandObserver(state, parent, m1) {
    this._s = state;
    this._p = parent;
    this._m1 = m1;
    __super__.call(this);
  }

  ExpandObserver.prototype.next = function (x) {
    var state = this._s;
    var parent = this._p;

    state.o.onNext(x);

    var result = tryCatch(parent._fn)(x);
    if (result === errorObj) {
      // The selector threw: surface the captured error downstream.
      return state.o.onError(result.e);
    }

    state.q.push(result);
    state.activeCount++;
    parent._ensureActive(state);
  };

  ExpandObserver.prototype.error = function (e) {
    this._s.o.onError(e);
  };

  ExpandObserver.prototype.completed = function () {
    var state = this._s;
    state.d.remove(this._m1);
    state.activeCount--;
    // Complete downstream only once no sequence remains outstanding.
    if (state.activeCount === 0) {
      state.o.onCompleted();
    }
  };

  return ExpandObserver;
}(AbstractObserver));
|
||
|
|
||
|
/**
 * Expands an observable sequence by recursively invoking selector.
 *
 * @param {Function} selector Selector function to invoke for each produced element, resulting in another sequence to which the selector will be invoked recursively again.
 * @param {Scheduler} [scheduler] Scheduler on which to perform the expansion. Anything that is not a scheduler is replaced by the current thread scheduler.
 * @returns {Observable} An observable sequence containing all the elements produced by the recursive expansion.
 */
observableProto.expand = function (selector, scheduler) {
  if (!isScheduler(scheduler)) {
    scheduler = currentThreadScheduler;
  }
  return new ExpandObservable(this, selector, scheduler);
};
|
||
|
|
||
|
/**
 * Copies the call's arguments into a new array.
 * Used as the default result selector of forkJoin.
 *
 * @returns {Array} A fresh array holding every argument, in order.
 */
function argumentsToArray() {
  var remaining = arguments.length;
  var copy = new Array(remaining);
  while (remaining--) {
    copy[remaining] = arguments[remaining];
  }
  return copy;
}
|
||
|
|
||
|
/**
 * Observable backing `forkJoin`: subscribes to every source and lets the
 * ForkJoinObserver instances combine their last values through the callback.
 */
var ForkJoinObservable = (function (__super__) {
  inherits(ForkJoinObservable, __super__);

  /**
   * @param {Array} sources Observables or Promises to run.
   * @param {Function} cb Result selector invoked with the collected values.
   */
  function ForkJoinObservable(sources, cb) {
    this._sources = sources;
    this._cb = cb;
    __super__.call(this);
  }

  ForkJoinObservable.prototype.subscribeCore = function (o) {
    var sources = this._sources;
    var count = sources.length;

    // Nothing to wait for: complete straight away.
    if (count === 0) {
      o.onCompleted();
      return disposableEmpty;
    }

    // Per-subscription bookkeeping shared by all ForkJoinObserver instances.
    var state = {
      finished: false,
      hasResults: new Array(count),
      hasCompleted: new Array(count),
      results: new Array(count)
    };

    var subscriptions = new CompositeDisposable();
    for (var i = 0; i < count; i++) {
      var source = sources[i];
      if (isPromise(source)) {
        source = observableFromPromise(source);
      }
      subscriptions.add(source.subscribe(new ForkJoinObserver(o, state, i, this._cb, subscriptions)));
    }

    return subscriptions;
  };

  return ForkJoinObservable;
}(ObservableBase));
|
||
|
|
||
|
/**
 * Observer for one forkJoin source. Records the latest value of its source in
 * the shared state and, once every source has completed, emits the combined result.
 */
var ForkJoinObserver = (function (__super__) {
  inherits(ForkJoinObserver, __super__);

  /**
   * @param {Observer} o Downstream observer.
   * @param {Object} s Shared state (finished, hasResults, hasCompleted, results).
   * @param {number} i Index of the observed source within the state arrays.
   * @param {Function} cb Result selector applied to the collected results.
   * @param {CompositeDisposable} subs All source subscriptions; disposed on error.
   */
  function ForkJoinObserver(o, s, i, cb, subs) {
    this._o = o;
    this._s = s;
    this._i = i;
    this._cb = cb;
    this._subs = subs;
    __super__.call(this);
  }

  ForkJoinObserver.prototype.next = function (x) {
    var state = this._s;
    if (state.finished) {
      return;
    }
    state.hasResults[this._i] = true;
    state.results[this._i] = x;
  };

  ForkJoinObserver.prototype.error = function (e) {
    this._s.finished = true;
    this._o.onError(e);
    this._subs.dispose();
  };

  ForkJoinObserver.prototype.completed = function () {
    var state = this._s;
    if (state.finished) {
      return;
    }

    // A source that completes without producing a value completes the result.
    if (!state.hasResults[this._i]) {
      return this._o.onCompleted();
    }

    state.hasCompleted[this._i] = true;
    // Wait until every source has completed.
    for (var i = 0; i < state.results.length; i++) {
      if (!state.hasCompleted[i]) {
        return;
      }
    }
    state.finished = true;

    var res = tryCatch(this._cb).apply(null, state.results);
    if (res === errorObj) {
      return this._o.onError(res.e);
    }

    this._o.onNext(res);
    this._o.onCompleted();
  };

  return ForkJoinObserver;
}(AbstractObserver));
|
||
|
|
||
|
/**
 * Runs all observable sequences in parallel and collects their last elements.
 * A trailing function argument is used as the result selector; without one the
 * collected values are delivered as an array.
 *
 * @example
 * 1 - res = Rx.Observable.forkJoin([obs1, obs2]);
 * 2 - res = Rx.Observable.forkJoin(obs1, obs2, ...);
 * @returns {Observable} An observable sequence with an array collecting the last elements of all the input sequences.
 */
Observable.forkJoin = function () {
  var len = arguments.length;
  var args = new Array(len);
  for (var i = 0; i < len; i++) {
    args[i] = arguments[i];
  }

  var resultSelector;
  if (isFunction(args[len - 1])) {
    resultSelector = args.pop();
  } else {
    resultSelector = argumentsToArray;
  }

  // Accept a single array of sources as well as a variadic list.
  if (Array.isArray(args[0])) {
    args = args[0];
  }

  return new ForkJoinObservable(args, resultSelector);
};
|
||
|
|
||
|
/**
 * Runs this sequence and the given sequences in parallel and combines their last elements.
 * This sequence is placed first, ahead of the supplied ones, and everything is
 * forwarded to Observable.forkJoin. Sources may be passed either as separate
 * arguments or as a single array; a trailing function is used as the result selector.
 *
 * @param {...Observable|Array} sources Additional observable sequences (or one array of them).
 * @param {Function} [resultSelector] Result selector function to invoke with the last elements of the sequences.
 * @returns {Observable} An observable sequence with the result of calling the selector function with the last elements of the input sequences.
 */
observableProto.forkJoin = function () {
  var len = arguments.length, args = new Array(len);
  for (var i = 0; i < len; i++) { args[i] = arguments[i]; }

  if (Array.isArray(args[0])) {
    // Build a new array rather than unshifting into the caller's own array,
    // which would grow by one element on every call.
    args[0] = [this].concat(args[0]);
  } else {
    args.unshift(this);
  }

  return Observable.forkJoin.apply(null, args);
};
|
||
|
|
||
|
/**
 * Comonadic bind operator. Available both as `manySelect` and as `extend`.
 *
 * Every source element is wrapped in a ChainObservable; `selector` is then
 * applied to each of those wrappers after an observeOn(scheduler) hop.
 *
 * @param {Function} selector A transform function to apply to each element.
 * @param {Object} scheduler Scheduler used to execute the operation. If not specified, defaults to Rx.Scheduler.immediate.
 * @returns {Observable} An observable sequence which results from the comonadic bind operation.
 */
observableProto.manySelect = observableProto.extend = function (selector, scheduler) {
  if (!isScheduler(scheduler)) {
    scheduler = Rx.Scheduler.immediate;
  }
  var source = this;

  function createChainedSequence() {
    // Most recently created link; it is notified when the next element,
    // an error or completion arrives.
    var chain;

    // NOTE(review): the previous link receives the raw element `x`, while
    // ChainObservable subscribers flatten `tail` with mergeAll() — confirm
    // whether `curr` was intended here.
    function link(x) {
      var curr = new ChainObservable(x);

      if (chain) {
        chain.onNext(x);
      }
      chain = curr;

      return curr;
    }

    function forwardError(e) {
      if (chain) {
        chain.onError(e);
      }
    }

    function forwardCompleted() {
      if (chain) {
        chain.onCompleted();
      }
    }

    return source
      .map(link)
      .tap(noop, forwardError, forwardCompleted)
      .observeOn(scheduler)
      .map(selector);
  }

  return observableDefer(createChainedSequence, source);
};
|
||
|
|
||
|
/**
 * Link used by manySelect: an observable that emits `head` and then the
 * flattened contents of `tail`, an AsyncSubject fed through the observer-style
 * methods below.
 */
var ChainObservable = (function (__super__) {
  inherits(ChainObservable, __super__);

  /**
   * @param {*} head Element emitted first to every subscriber.
   */
  function ChainObservable(head) {
    __super__.call(this);
    this.head = head;
    this.tail = new AsyncSubject();
  }

  var members = {
    _subscribe: function (o) {
      var group = new CompositeDisposable();

      // Emission is scheduled on the current-thread scheduler instead of
      // happening inline in _subscribe.
      var scheduled = currentThreadScheduler.schedule(this, function (_, self) {
        o.onNext(self.head);
        group.add(self.tail.mergeAll().subscribe(o));
      });
      group.add(scheduled);

      return group;
    },

    // Completion is represented as an empty sequence pushed into the tail.
    onCompleted: function () {
      this.onNext(Observable.empty());
    },

    // Errors are represented as a throwing sequence pushed into the tail.
    onError: function (e) {
      this.onNext(Observable['throw'](e));
    },

    // The tail takes one value and is completed right after it.
    onNext: function (v) {
      this.tail.onNext(v);
      this.tail.onCompleted();
    }
  };

  addProperties(ChainObservable.prototype, Observer, members);

  return ChainObservable;

}(Observable));
|
||
|
|
||
|
/**
 * Observable backing `switchFirst`: subscribes to a sequence of sequences and
 * hands the bookkeeping to SwitchFirstObserver.
 */
var SwitchFirstObservable = (function (__super__) {
  inherits(SwitchFirstObservable, __super__);

  /**
   * @param {Observable} source Sequence whose elements are themselves sequences (or Promises).
   */
  function SwitchFirstObservable(source) {
    this.source = source;
    __super__.call(this);
  }

  SwitchFirstObservable.prototype.subscribeCore = function (o) {
    var outerSubscription = new SingleAssignmentDisposable();
    var group = new CompositeDisposable();
    var state = {
      hasCurrent: false,  // true while an inner sequence is being forwarded
      isStopped: false,   // true once the outer sequence has completed
      o: o,
      g: group
    };

    // The outer subscription is registered first; the observers compare
    // `g.length` against 1 when deciding whether to complete.
    group.add(outerSubscription);
    outerSubscription.setDisposable(this.source.subscribe(new SwitchFirstObserver(state)));
    return group;
  };

  return SwitchFirstObservable;
}(ObservableBase));
|
||
|
|
||
|
/**
 * Outer observer for `switchFirst`. Subscribes to an incoming inner sequence
 * only when none is currently active; inner sequences arriving in the meantime
 * are ignored.
 */
var SwitchFirstObserver = (function (__super__) {
  inherits(SwitchFirstObserver, __super__);

  /**
   * @param {Object} state Shared state created in SwitchFirstObservable#subscribeCore.
   */
  function SwitchFirstObserver(state) {
    this._s = state;
    __super__.call(this);
  }

  SwitchFirstObserver.prototype.next = function (x) {
    var state = this._s;
    if (state.hasCurrent) {
      // An inner sequence is still running: drop this one.
      return;
    }

    state.hasCurrent = true;
    if (isPromise(x)) {
      x = observableFromPromise(x);
    }

    var inner = new SingleAssignmentDisposable();
    state.g.add(inner);
    inner.setDisposable(x.subscribe(new InnerObserver(state, inner)));
  };

  SwitchFirstObserver.prototype.error = function (e) {
    this._s.o.onError(e);
  };

  SwitchFirstObserver.prototype.completed = function () {
    var state = this._s;
    state.isStopped = true;
    // Complete now only if no inner sequence is active and only the outer
    // subscription is left in the group.
    if (!state.hasCurrent && state.g.length === 1) {
      state.o.onCompleted();
    }
  };

  // Observer for the currently active inner sequence (hoisted function
  // declaration, so it is already available to `next` above).
  inherits(InnerObserver, __super__);

  /**
   * @param {Object} state Shared state.
   * @param {SingleAssignmentDisposable} inner Disposable of this inner subscription.
   */
  function InnerObserver(state, inner) {
    this._s = state;
    this._i = inner;
    __super__.call(this);
  }

  InnerObserver.prototype.next = function (x) {
    this._s.o.onNext(x);
  };

  InnerObserver.prototype.error = function (e) {
    this._s.o.onError(e);
  };

  InnerObserver.prototype.completed = function () {
    var state = this._s;
    state.g.remove(this._i);
    state.hasCurrent = false;
    // If the outer sequence already stopped and nothing else is pending, finish.
    if (state.isStopped && state.g.length === 1) {
      state.o.onCompleted();
    }
  };

  return SwitchFirstObserver;
}(AbstractObserver));
|
||
|
|
||
|
/**
|
||
|
* Performs a exclusive waiting for the first to finish before subscribing to another observable.
|
||
|
* Observables that come in between subscriptions will be dropped on the floor.
|
||
|
* @returns {Observable} A exclusive observable with only the results that happen when subscribed.
|
||
|
*/
|
||
|
observableProto.switchFirst = function () {
  // Wrap this sequence of sequences; the work happens on subscription.
  var source = this;
  return new SwitchFirstObservable(source);
};
|
||
|
|
||
|
/**
 * Builds a FlatMapObservable from this sequence and the given arguments, then
 * applies switchFirst() to it. Available as `flatMapFirst` and `exhaustMap`.
 *
 * @param {Function} selector Forwarded to FlatMapObservable.
 * @param {Function} [resultSelector] Forwarded to FlatMapObservable.
 * @param {*} [thisArg] Forwarded to FlatMapObservable.
 * @returns {Observable} The result of switchFirst() on the projected sequence.
 */
observableProto.flatMapFirst = observableProto.exhaustMap = function (selector, resultSelector, thisArg) {
  var projected = new FlatMapObservable(this, selector, resultSelector, thisArg);
  return projected.switchFirst();
};
|
||
|
|
||
|
/**
 * Builds a FlatMapObservable from this sequence and the given arguments, then
 * applies merge(limit) to it. Available as `flatMapWithMaxConcurrent` and
 * `flatMapMaxConcurrent`.
 *
 * @param {number} limit Passed to merge(); presumably the maximum number of concurrently subscribed inner sequences — confirm against merge.
 * @param {Function} selector Forwarded to FlatMapObservable.
 * @param {Function} [resultSelector] Forwarded to FlatMapObservable.
 * @param {*} [thisArg] Forwarded to FlatMapObservable.
 * @returns {Observable} The result of merge(limit) on the projected sequence.
 */
observableProto.flatMapWithMaxConcurrent = observableProto.flatMapMaxConcurrent = function (limit, selector, resultSelector, thisArg) {
  var projected = new FlatMapObservable(this, selector, resultSelector, thisArg);
  return projected.merge(limit);
};
|
||
|
|
||
|
return Rx;
|
||
|
}));
|