work.suroh.tk/node_modules/rx/dist/rx.lite.extras.js

807 lines
29 KiB
JavaScript

// Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information.
;(function (factory) {
// UMD bootstrap: work out which global object and module system are present,
// then hand the global object, an exports object and the Rx core to `factory`.

// `typeof` results under which a host value is treated as object-like.
var objectTypes = {
'function': true,
'object': true
};
// Returns `value` when its `Object` property is this realm's `Object`; otherwise null.
function checkGlobal(value) {
return (value && value.Object === Object) ? value : null;
}
// `exports` / `module` are accepted only when object-like, truthy and without a truthy `nodeType`.
// NOTE(review): the nodeType test presumably rejects DOM elements that happen to be named `exports`/`module` - confirm.
var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null;
var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null;
// `global` is only considered when both `exports` and `module` were detected.
var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global);
var freeSelf = checkGlobal(objectTypes[typeof self] && self);
var freeWindow = checkGlobal(objectTypes[typeof window] && window);
// Non-null only when `module.exports` and `exports` are the same object.
var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null;
var thisGlobal = checkGlobal(objectTypes[typeof this] && this);
// Precedence: `global`, then `window` (only when it differs from `this.window`), then `self`,
// then `this`, and finally whatever `Function('return this')()` yields.
var root = freeGlobal || ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) || freeSelf || thisGlobal || Function('return this')();
// Because of build optimizers
if (typeof define === 'function' && define.amd) {
// AMD: declare './rx.lite' as the dependency and return the factory result as the module value.
define(['./rx.lite'], function (Rx, exports) {
return factory(root, exports, Rx);
});
} else if (typeof module === 'object' && module && module.exports === freeExports) {
// CommonJS: the Rx core is loaded from the 'rx-lite' package.
module.exports = factory(root, module.exports, require('rx-lite'));
} else {
// Plain script: extend the `Rx` already present on the global object.
root.Rx = factory(root, {}, root.Rx);
}
}.call(this, function (root, exp, Rx, undefined) {
// References
// Local aliases for the members of the Rx core (the `Rx` argument of this factory)
// that the operators in this file rely on. Everything is read once, at load time.
var Observable = Rx.Observable,
observableProto = Observable.prototype,
observableNever = Observable.never,
observableThrow = Observable['throw'],
AnonymousObservable = Rx.AnonymousObservable,
ObservableBase = Rx.ObservableBase,
AnonymousObserver = Rx.AnonymousObserver,
notificationCreateOnNext = Rx.Notification.createOnNext,
notificationCreateOnError = Rx.Notification.createOnError,
notificationCreateOnCompleted = Rx.Notification.createOnCompleted,
Observer = Rx.Observer,
observerCreate = Observer.create,
AbstractObserver = Rx.internals.AbstractObserver,
Subject = Rx.Subject,
internals = Rx.internals,
helpers = Rx.helpers,
ScheduledObserver = internals.ScheduledObserver,
SerialDisposable = Rx.SerialDisposable,
SingleAssignmentDisposable = Rx.SingleAssignmentDisposable,
CompositeDisposable = Rx.CompositeDisposable,
BinaryDisposable = Rx.BinaryDisposable,
RefCountDisposable = Rx.RefCountDisposable,
disposableEmpty = Rx.Disposable.empty,
immediateScheduler = Rx.Scheduler.immediate,
defaultKeySerializer = helpers.defaultKeySerializer,
addRef = Rx.internals.addRef,
identity = helpers.identity,
isPromise = helpers.isPromise,
isFunction = helpers.isFunction,
inherits = internals.inherits,
bindCallback = internals.bindCallback,
noop = helpers.noop,
isScheduler = Rx.Scheduler.isScheduler,
observableFromPromise = Observable.fromPromise,
ArgumentOutOfRangeError = Rx.ArgumentOutOfRangeError;
// Shared sentinel returned by guarded calls that threw; the caught value is stored in `e`.
// There is a single instance, so `e` is overwritten by every failing guarded call.
var errorObj = { e: {} };

/**
 * Builds a wrapper around `tryCatchTarget` that never throws.
 * @param {Function} tryCatchTarget Function to guard.
 * @returns {Function} A function that forwards `this` and all arguments to the target and returns
 * its result, or returns `errorObj` (with `errorObj.e` set to the thrown value) when the target throws.
 */
function tryCatcherGen(tryCatchTarget) {
  function tryCatcher() {
    var outcome;
    try {
      outcome = tryCatchTarget.apply(this, arguments);
    } catch (caught) {
      errorObj.e = caught;
      outcome = errorObj;
    }
    return outcome;
  }
  return tryCatcher;
}
/**
 * Wraps `fn` so that calling it returns `errorObj` instead of throwing.
 * Also published as `Rx.internals.tryCatch`.
 * @param {Function} fn Function to guard.
 * @returns {Function} The guarded function produced by `tryCatcherGen`.
 * @throws {TypeError} When `fn` is not a function.
 */
var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) {
  if (isFunction(fn)) {
    return tryCatcherGen(fn);
  }
  throw new TypeError('fn must be a function');
};
/**
 * Throws the given value. Exists so a throw can be used in expression position.
 * @param {*} e Value to throw.
 * @throws Always throws `e`.
 */
function thrower(e) {
  throw e;
}
/**
 * Disposable whose underlying resource is disposed through a scheduler.
 * @param {Scheduler} scheduler Scheduler on which the disposal is scheduled.
 * @param {Disposable} disposable Resource disposed when the scheduled action runs.
 */
function ScheduledDisposable(scheduler, disposable) {
  this.scheduler = scheduler;
  this.disposable = disposable;
  this.isDisposed = false;
}

// Scheduled action: disposes the wrapped resource once; later runs are no-ops.
function scheduleItem(s, self) {
  if (self.isDisposed) {
    return;
  }
  self.isDisposed = true;
  self.disposable.dispose();
}

/**
 * Schedules disposal of the wrapped resource on the configured scheduler.
 */
ScheduledDisposable.prototype.dispose = function () {
  var owner = this;
  owner.scheduler.schedule(owner, scheduleItem);
};
/**
 * Observer wrapper that guards the wrapped observer against grammar violations:
 * a callback invoked while another one is still running, or any callback invoked
 * after onError/onCompleted, throws instead of being forwarded.
 */
var CheckedObserver = (function (__super__) {
  inherits(CheckedObserver, __super__);

  // `_state` codes: 0 - idle, 1 - busy (a callback is running), 2 - done (terminated).
  function CheckedObserver(observer) {
    __super__.call(this);
    this._observer = observer;
    this._state = 0;
  }

  // Forwards a value; the observer becomes idle again even when the wrapped callback throws.
  CheckedObserver.prototype.onNext = function (value) {
    this.checkAccess();
    var outcome = tryCatch(this._observer.onNext).call(this._observer, value);
    this._state = 0;
    if (outcome === errorObj) { thrower(outcome.e); }
  };

  // Forwards an error and marks the observer as done.
  CheckedObserver.prototype.onError = function (err) {
    this.checkAccess();
    var outcome = tryCatch(this._observer.onError).call(this._observer, err);
    this._state = 2;
    if (outcome === errorObj) { thrower(outcome.e); }
  };

  // Forwards completion and marks the observer as done.
  CheckedObserver.prototype.onCompleted = function () {
    this.checkAccess();
    var outcome = tryCatch(this._observer.onCompleted).call(this._observer);
    this._state = 2;
    if (outcome === errorObj) { thrower(outcome.e); }
  };

  // Throws when busy or done; otherwise flips the idle state to busy.
  CheckedObserver.prototype.checkAccess = function () {
    switch (this._state) {
      case 1:
        throw new Error('Re-entrancy detected');
      case 2:
        throw new Error('Observer completed');
      case 0:
        this._state = 1;
        break;
    }
  };

  return CheckedObserver;
}(Observer));
/**
 * ScheduledObserver subclass that, after each notification has been handed to the base
 * implementation, calls `ensureActive()`, and that disposes an optional `cancel` disposable
 * together with itself.
 * NOTE(review): `ensureActive` is not defined in this file; presumably inherited from ScheduledObserver.
 */
var ObserveOnObserver = (function (__super__) {
  inherits(ObserveOnObserver, __super__);

  function ObserveOnObserver(scheduler, observer, cancel) {
    __super__.call(this, scheduler, observer);
    this._cancel = cancel;
  }

  var proto = ObserveOnObserver.prototype;

  proto.next = function (value) {
    __super__.prototype.next.call(this, value);
    this.ensureActive();
  };

  proto.error = function (e) {
    __super__.prototype.error.call(this, e);
    this.ensureActive();
  };

  proto.completed = function () {
    __super__.prototype.completed.call(this);
    this.ensureActive();
  };

  // Disposes the base observer first, then the optional cancel handle, and drops the reference.
  proto.dispose = function () {
    __super__.prototype.dispose.call(this);
    if (this._cancel) {
      this._cancel.dispose();
    }
    this._cancel = null;
  };

  return ObserveOnObserver;
})(ScheduledObserver);
/**
 * Wraps this observer in a CheckedObserver, which throws from the offending call when the
 * observer grammar is violated (re-entrant calls, or calls after onError/onCompleted).
 *
 * @returns {Observer} An observer that verifies each callback invocation and, when the check passes, forwards it to this observer.
 */
Observer.prototype.checked = function () {
  var checkedObserver = new CheckedObserver(this);
  return checkedObserver;
};
/**
 * Schedules the invocation of this observer's methods on the given scheduler.
 * @param {Scheduler} scheduler Scheduler to schedule observer messages on.
 * @returns {Observer} Observer whose messages are scheduled on the given scheduler.
 */
Observer.prototype.notifyOn = function (scheduler) {
  // `this` is the observer being wrapped, so the method has to live on the prototype.
  return new ObserveOnObserver(scheduler, this);
};
// Backward compatibility: this file previously attached the method to the constructor only,
// so the static property is kept for any caller that looks it up there.
Observer.notifyOn = Observer.prototype.notifyOn;
/**
 * Creates an observer from a notification callback.
 * @param {Function} handler Action that handles a notification.
 * @param {Any} [thisArg] Passed to `bindCallback` together with the handler.
 * @returns The observer object that invokes the specified handler using a notification corresponding to each message it receives.
 */
Observer.fromNotifier = function (handler, thisArg) {
  var notify = bindCallback(handler, thisArg, 1);

  function onNext(x) {
    return notify(notificationCreateOnNext(x));
  }
  function onError(e) {
    return notify(notificationCreateOnError(e));
  }
  function onCompleted() {
    return notify(notificationCreateOnCompleted());
  }

  return new AnonymousObserver(onNext, onError, onCompleted);
};
/**
 * Creates a notification callback from this observer.
 * @returns {Function} A function that takes a notification and returns the result of `notification.accept(observer)`.
 */
Observer.prototype.toNotifier = function () {
  var target = this;
  function notifier(n) {
    return n.accept(target);
  }
  return notifier;
};
/**
 * Hides the identity of an observer.
 * @returns {Observer} A new AnonymousObserver whose three callbacks forward to this observer.
 */
Observer.prototype.asObserver = function () {
  var inner = this;

  function forwardNext(x) { inner.onNext(x); }
  function forwardError(e) { inner.onError(e); }
  function forwardCompleted() { inner.onCompleted(); }

  return new AnonymousObserver(forwardNext, forwardError, forwardCompleted);
};
/**
 * Observable produced by `observeOn`: subscribes to the source with an ObserveOnObserver
 * built from the stored scheduler and the downstream observer.
 */
var ObserveOnObservable = (function (__super__) {
  inherits(ObserveOnObservable, __super__);

  function ObserveOnObservable(source, s) {
    this.source = source;
    this._s = s;
    __super__.call(this);
  }

  ObserveOnObservable.prototype.subscribeCore = function (o) {
    var upstream = this.source;
    return upstream.subscribe(new ObserveOnObserver(this._s, o));
  };

  return ObserveOnObservable;
}(ObservableBase));
/**
 * Wraps the source sequence so that its observer callbacks run on the specified scheduler.
 *
 * Only the observer callbacks are moved onto the scheduler. When the subscription and/or
 * unsubscription actions have side-effects that must run on a scheduler, use subscribeOn.
 *
 * @param {Scheduler} scheduler Scheduler to notify observers on.
 * @returns {Observable} The source sequence whose observations happen on the specified scheduler.
 */
observableProto.observeOn = function (scheduler) {
  var source = this;
  return new ObserveOnObservable(source, scheduler);
};
/**
 * Observable produced by `subscribeOn`: the subscription to the source is made from an action
 * scheduled on the stored scheduler, and the resulting subscription is wrapped in a
 * ScheduledDisposable built with that same scheduler.
 */
var SubscribeOnObservable = (function (__super__) {
  inherits(SubscribeOnObservable, __super__);

  function SubscribeOnObservable(source, s) {
    this.source = source;
    this._s = s;
    __super__.call(this);
  }

  // Scheduled action. `state` is the [source, serialDisposable, observer] triple built in subscribeCore.
  function scheduleMethod(scheduler, state) {
    var upstream = state[0],
      serial = state[1],
      observer = state[2];
    var subscription = upstream.subscribe(observer);
    serial.setDisposable(new ScheduledDisposable(scheduler, subscription));
  }

  SubscribeOnObservable.prototype.subscribeCore = function (o) {
    var pending = new SingleAssignmentDisposable(),
      serial = new SerialDisposable();
    // The serial disposable first holds the scheduled work, and is later re-pointed by scheduleMethod.
    serial.setDisposable(pending);
    var scheduled = this._s.schedule([this.source, serial, o], scheduleMethod);
    pending.setDisposable(scheduled);
    return serial;
  };

  return SubscribeOnObservable;
}(ObservableBase));
/**
 * Wraps the source sequence so that its subscription and unsubscription logic runs on the specified scheduler.
 * This operation is not commonly used. It only moves the side-effects of subscribing and unsubscribing
 * onto the scheduler; to invoke observer callbacks on a scheduler, use observeOn.
 * @param {Scheduler} scheduler Scheduler to perform subscription and unsubscription actions on.
 * @returns {Observable} The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
 */
observableProto.subscribeOn = function (scheduler) {
  var source = this;
  return new SubscribeOnObservable(source, scheduler);
};
/**
 * Observable behind `Observable.generate`: runs a state-driven loop on a scheduler, emitting one
 * element per recursive step until the condition function returns a falsy value.
 */
var GenerateObservable = (function (__super__) {
  inherits(GenerateObservable, __super__);

  function GenerateObservable(state, cndFn, itrFn, resFn, s) {
    this._initialState = state;
    this._cndFn = cndFn;
    this._itrFn = itrFn;
    this._resFn = resFn;
    this._s = s;
    __super__.call(this);
  }

  // One loop step. An exception thrown by any of the three user functions is sent to onError
  // and the loop stops, because `recurse` is not called again.
  function scheduleRecursive(state, recurse) {
    var generator = state.self,
      observer = state.o;

    if (!state.first) {
      // Every step but the first advances the state before testing the condition.
      var advanced = tryCatch(generator._itrFn)(state.newState);
      state.newState = advanced;
      if (advanced === errorObj) { return observer.onError(advanced.e); }
    } else {
      state.first = false;
    }

    var keepGoing = tryCatch(generator._cndFn)(state.newState);
    if (keepGoing === errorObj) { return observer.onError(keepGoing.e); }
    if (!keepGoing) {
      observer.onCompleted();
      return;
    }

    var value = tryCatch(generator._resFn)(state.newState);
    if (value === errorObj) { return observer.onError(value.e); }
    observer.onNext(value);
    recurse(state);
  }

  GenerateObservable.prototype.subscribeCore = function (o) {
    var loopState = {
      o: o,
      self: this,
      first: true,
      newState: this._initialState
    };
    return this._s.scheduleRecursive(loopState, scheduleRecursive);
  };

  return GenerateObservable;
}(ObservableBase));
/**
 * Generates an observable sequence by running a state-driven loop producing the sequence's elements, using the specified scheduler to send out observer messages.
 *
 * @example
 * var res = Rx.Observable.generate(0, function (x) { return x < 10; }, function (x) { return x + 1; }, function (x) { return x; });
 * var res = Rx.Observable.generate(0, function (x) { return x < 10; }, function (x) { return x + 1; }, function (x) { return x; }, Rx.Scheduler.timeout);
 * @param {Mixed} initialState Initial state.
 * @param {Function} condition Condition to terminate generation (upon returning false).
 * @param {Function} iterate Iteration step function.
 * @param {Function} resultSelector Selector function for results produced in the sequence.
 * @param {Scheduler} [scheduler] Scheduler on which to run the generator loop. If not provided, defaults to Scheduler.currentThread.
 * @returns {Observable} The generated sequence.
 */
Observable.generate = function (initialState, condition, iterate, resultSelector, scheduler) {
  // The default is read straight from Rx: this file declares no `currentThreadScheduler` alias,
  // so referring to that name threw a ReferenceError whenever the scheduler was omitted.
  if (!isScheduler(scheduler)) {
    scheduler = Rx.Scheduler.currentThread;
  }
  return new GenerateObservable(initialState, condition, iterate, resultSelector, scheduler);
};
/**
 * Observable behind `Observable.using`: obtains a resource, builds the dependent sequence from it
 * and returns a disposable that covers both the subscription and the resource.
 */
var UsingObservable = (function (__super__) {
  inherits(UsingObservable, __super__);

  function UsingObservable(resFn, obsFn) {
    this._resFn = resFn;
    this._obsFn = obsFn;
    __super__.call(this);
  }

  UsingObservable.prototype.subscribeCore = function (o) {
    var cleanup = disposableEmpty,
      sequence;

    var resource = tryCatch(this._resFn)();
    if (resource === errorObj) {
      // Resource factory threw: surface the error; there is nothing to release.
      sequence = observableThrow(resource.e);
    } else {
      // A truthy resource doubles as the disposable released together with the subscription.
      if (resource) { cleanup = resource; }
      sequence = tryCatch(this._obsFn)(resource);
      if (sequence === errorObj) {
        // Observable factory threw: surface the error but still tie the resource to the result.
        sequence = observableThrow(sequence.e);
      }
    }

    return new BinaryDisposable(sequence.subscribe(o), cleanup);
  };

  return UsingObservable;
}(ObservableBase));
/**
 * Constructs an observable sequence that depends on a resource object, whose lifetime is tied to the resulting observable sequence's lifetime.
 * @param {Function} resourceFactory Factory function, called without arguments, that obtains the resource object.
 * @param {Function} observableFactory Factory function that receives the resource and returns the dependent observable sequence.
 * @returns {Observable} An observable sequence whose lifetime controls the lifetime of the dependent resource object.
 */
Observable.using = function (resourceFactory, observableFactory) {
  var usingObservable = new UsingObservable(resourceFactory, observableFactory);
  return usingObservable;
};
/**
 * Propagates the observable sequence or Promise that reacts first.
 * @param {Observable} rightSource Second observable sequence or Promise.
 * @returns {Observable} An observable sequence that surfaces either of the given sequences, whichever reacted first.
 */
observableProto.amb = function (rightSource) {
  var leftSource = this;

  return new AnonymousObservable(function (observer) {
    var choice,
      leftChoice = 'L', rightChoice = 'R',
      leftSubscription = new SingleAssignmentDisposable(),
      rightSubscription = new SingleAssignmentDisposable();

    if (isPromise(rightSource)) {
      rightSource = observableFromPromise(rightSource);
    }

    // The first side to produce any notification wins and the losing subscription is disposed.
    // Returns whether `side` is the winner.
    function claim(side, losingSubscription) {
      if (!choice) {
        choice = side;
        losingSubscription.dispose();
      }
      return choice === side;
    }

    // Builds the observer for one side; notifications are forwarded only when that side has won.
    function createSideObserver(side, losingSubscription) {
      return observerCreate(
        function (value) {
          if (claim(side, losingSubscription)) { observer.onNext(value); }
        },
        function (e) {
          if (claim(side, losingSubscription)) { observer.onError(e); }
        },
        function () {
          if (claim(side, losingSubscription)) { observer.onCompleted(); }
        }
      );
    }

    var leftSubscribe = createSideObserver(leftChoice, rightSubscription);
    var rightSubscribe = createSideObserver(rightChoice, leftSubscription);

    leftSubscription.setDisposable(leftSource.subscribe(leftSubscribe));
    rightSubscription.setDisposable(rightSource.subscribe(rightSubscribe));

    return new BinaryDisposable(leftSubscription, rightSubscription);
  });
};
// Folding step used by Observable.amb: races the accumulated sequence against the next candidate.
function amb(p, c) { return p.amb(c); }

/**
 * Propagates the observable sequence or Promise that reacts first.
 * Accepts either a single array of sources or the sources as individual arguments.
 * With no sources the result is the sequence returned by `Observable.never()`.
 * @returns {Observable} An observable sequence that surfaces any of the given sequences, whichever reacted first.
 */
Observable.amb = function () {
  var acc = observableNever(), items, len, i;
  if (Array.isArray(arguments[0])) {
    items = arguments[0];
  } else {
    len = arguments.length;
    // Size the copy from the argument count. The previous `new Array(items)` was evaluated while
    // `items` was still undefined and produced `[undefined]`, so a call without arguments raced
    // the never-sequence against `undefined`.
    items = new Array(len);
    for (i = 0; i < len; i++) { items[i] = arguments[i]; }
  }
  for (i = 0, len = items.length; i < len; i++) {
    acc = amb(acc, items[i]);
  }
  return acc;
};
/**
 * Continues an observable sequence that is terminated normally or by an exception with the next observable sequence.
 * @param {Observable} second Second observable sequence used to produce results after the first sequence terminates.
 * @returns {Observable} An observable sequence that concatenates the first and second sequence, even if the first sequence terminates exceptionally.
 * @throws {Error} When `second` is falsy.
 */
observableProto.onErrorResumeNext = function (second) {
  if (second) {
    return onErrorResumeNext([this, second]);
  }
  throw new Error('Second observable is required');
};
/**
 * Observable behind `onErrorResumeNext`: subscribes to the given sources one after another,
 * moving on to the next source whenever the current one terminates.
 */
var OnErrorResumeNextObservable = (function(__super__) {
  inherits(OnErrorResumeNextObservable, __super__);

  function OnErrorResumeNextObservable(sources) {
    this.sources = sources;
    __super__.call(this);
  }

  // Recursive step: subscribe to the source at `state.pos`, or complete when none are left.
  function scheduleMethod(state, recurse) {
    if (!(state.pos < state.sources.length)) {
      state.o.onCompleted();
      return;
    }

    var current = state.sources[state.pos++];
    if (isPromise(current)) {
      current = observableFromPromise(current);
    }

    // Register the placeholder with the serial disposable before subscribing.
    var inner = new SingleAssignmentDisposable();
    state.subscription.setDisposable(inner);
    inner.setDisposable(current.subscribe(new OnErrorResumeNextObserver(state, recurse)));
  }

  OnErrorResumeNextObservable.prototype.subscribeCore = function (o) {
    var subscription = new SerialDisposable();
    var state = { pos: 0, subscription: subscription, o: o, sources: this.sources };
    var cancellable = immediateScheduler.scheduleRecursive(state, scheduleMethod);
    return new BinaryDisposable(subscription, cancellable);
  };

  return OnErrorResumeNextObservable;
}(ObservableBase));
/**
 * Observer used for each source of `onErrorResumeNext`: values are forwarded to the downstream
 * observer, while both error and completion trigger the recursive step with the shared state.
 */
var OnErrorResumeNextObserver = (function(__super__) {
  inherits(OnErrorResumeNextObserver, __super__);

  function OnErrorResumeNextObserver(state, recurse) {
    this._state = state;
    this._recurse = recurse;
    __super__.call(this);
  }

  // Shared by `error` and `completed`: the error itself is ignored.
  function moveToNextSource() {
    this._recurse(this._state);
  }

  var proto = OnErrorResumeNextObserver.prototype;
  proto.next = function (x) { this._state.o.onNext(x); };
  proto.error = moveToNextSource;
  proto.completed = moveToNextSource;

  return OnErrorResumeNextObserver;
}(AbstractObserver));
/**
 * Continues an observable sequence that is terminated normally or by an exception with the next observable sequence.
 * Accepts either a single array of sources (used as-is) or the sources as individual arguments.
 * @returns {Observable} An observable sequence that concatenates the source sequences, even if a sequence terminates exceptionally.
 */
var onErrorResumeNext = Observable.onErrorResumeNext = function () {
  var first = arguments[0],
    sources,
    count,
    idx;

  if (Array.isArray(first)) {
    sources = first;
  } else {
    count = arguments.length;
    sources = new Array(count);
    for (idx = 0; idx < count; idx += 1) {
      sources[idx] = arguments[idx];
    }
  }

  return new OnErrorResumeNextObservable(sources);
};
// Projection helper: returns the result of calling `toArray()` on the given value.
function toArray(x) {
  return x.toArray();
}

// Predicate helper: true when the given value has a positive `length`.
function notEmpty(x) {
  return x.length > 0;
}
/**
 * Projects each element of an observable sequence into zero or more buffers which are produced based on element count information.
 * Built on windowWithCount: every window is turned into an array and empty arrays are dropped.
 * Also available as `bufferCount`.
 * @param {Number} count Length of each buffer.
 * @param {Number} [skip] Number of elements to skip between creation of consecutive buffers. Any value that is not of type number is replaced by count.
 * @returns {Observable} An observable sequence of buffers.
 */
observableProto.bufferWithCount = observableProto.bufferCount = function (count, skip) {
  if (typeof skip !== 'number') {
    skip = count;
  }
  var windows = this.windowWithCount(count, skip);
  return windows.flatMap(toArray).filter(notEmpty);
};
/**
* Projects each element of an observable sequence into zero or more windows which are produced based on element count information.
* Also available as `windowCount`.
* @param {Number} count Length of each window.
* @param {Number} [skip] Number of elements to skip between creation of consecutive windows. If not specified, defaults to the count.
* @returns {Observable} An observable sequence of windows.
* @throws {ArgumentOutOfRangeError} When count or skip is not greater than zero after the normalisation below.
*/
observableProto.windowWithCount = observableProto.windowCount = function (count, skip) {
var source = this;
// Normalise count: a value whose numeric conversion is 0 or NaN, or whose magnitude is Infinity, becomes 0
// and is then rejected by the range check. Other values are left as passed in (they are not converted).
+count || (count = 0);
Math.abs(count) === Infinity && (count = 0);
if (count <= 0) { throw new ArgumentOutOfRangeError(); }
// skip defaults to count when null/undefined, then goes through the same normalisation and range check.
skip == null && (skip = count);
+skip || (skip = 0);
Math.abs(skip) === Infinity && (skip = 0);
if (skip <= 0) { throw new ArgumentOutOfRangeError(); }
return new AnonymousObservable(function (observer) {
// m holds the source subscription, n counts the source elements seen so far,
// q holds the currently open windows, oldest first.
var m = new SingleAssignmentDisposable(),
refCountDisposable = new RefCountDisposable(m),
n = 0,
q = [];
// Opens a new window (a Subject), queues it and emits it downstream wrapped by addRef.
// NOTE(review): addRef presumably ties the window's subscriptions to refCountDisposable - confirm in the Rx core.
function createWindow () {
var s = new Subject();
q.push(s);
observer.onNext(addRef(s, refCountDisposable));
}
// The first window is opened before subscribing to the source.
createWindow();
m.setDisposable(source.subscribe(
function (x) {
// Deliver the element to every open window.
for (var i = 0, len = q.length; i < len; i++) { q[i].onNext(x); }
// Complete the oldest window once (n - count + 1) is a non-negative multiple of skip.
var c = n - count + 1;
c >= 0 && c % skip === 0 && q.shift().onCompleted();
// Open a new window after every `skip` elements.
++n % skip === 0 && createWindow();
},
function (e) {
// Fail every open window, then the outer sequence.
while (q.length > 0) { q.shift().onError(e); }
observer.onError(e);
},
function () {
// Complete every open window, then the outer sequence.
while (q.length > 0) { q.shift().onCompleted(); }
observer.onCompleted();
}
));
return refCountDisposable;
}, source);
};
/**
 * Observer behind `takeLastBuffer`: keeps at most `c` of the most recent values in a queue and
 * emits that queue as a single array when the source completes.
 */
var TakeLastBufferObserver = (function (__super__) {
  inherits(TakeLastBufferObserver, __super__);

  function TakeLastBufferObserver(o, c) {
    this._o = o;
    this._c = c;
    this._q = [];
    __super__.call(this);
  }

  var proto = TakeLastBufferObserver.prototype;

  // Append the value and drop the oldest one when the queue grew past the limit.
  proto.next = function (x) {
    var queue = this._q;
    queue.push(x);
    if (queue.length > this._c) {
      queue.shift();
    }
  };

  proto.error = function (e) {
    this._o.onError(e);
  };

  proto.completed = function () {
    var downstream = this._o;
    downstream.onNext(this._q);
    downstream.onCompleted();
  };

  return TakeLastBufferObserver;
}(AbstractObserver));
/**
 * Returns an array with the specified number of contiguous elements from the end of an observable sequence.
 *
 * @description
 * The operator keeps a buffer large enough to hold count elements. When the source sequence
 * completes, that buffer is emitted on the result sequence.
 * @param {Number} count Number of elements to take from the end of the source sequence.
 * @returns {Observable} An observable sequence containing a single array with the specified number of elements from the end of the source sequence.
 * @throws {ArgumentOutOfRangeError} When count is less than zero.
 */
observableProto.takeLastBuffer = function (count) {
  if (count < 0) { throw new ArgumentOutOfRangeError(); }
  var source = this;

  function subscribe(o) {
    return source.subscribe(new TakeLastBufferObserver(o, count));
  }

  return new AnonymousObservable(subscribe, source);
};
/**
 * Observer behind `defaultIfEmpty`: forwards everything, and emits the stored default value
 * just before completion when no value was seen.
 */
var DefaultIfEmptyObserver = (function (__super__) {
  inherits(DefaultIfEmptyObserver, __super__);

  // `_f` records whether at least one value has been forwarded.
  function DefaultIfEmptyObserver(o, d) {
    this._o = o;
    this._d = d;
    this._f = false;
    __super__.call(this);
  }

  var proto = DefaultIfEmptyObserver.prototype;

  proto.next = function (x) {
    this._f = true;
    this._o.onNext(x);
  };

  proto.error = function (e) {
    this._o.onError(e);
  };

  proto.completed = function () {
    if (!this._f) {
      this._o.onNext(this._d);
    }
    this._o.onCompleted();
  };

  return DefaultIfEmptyObserver;
}(AbstractObserver));
/**
 * Returns the elements of the specified sequence or the specified value in a singleton sequence if the sequence is empty.
 *
 * @example
 * var res = xs.defaultIfEmpty();
 * var res = xs.defaultIfEmpty(false);
 *
 * @memberOf Observable#
 * @param defaultValue The value to return if the sequence is empty. If not provided, this defaults to null.
 * @returns {Observable} An observable sequence that contains the specified default value if the source is empty; otherwise, the elements of the source itself.
 */
observableProto.defaultIfEmpty = function (defaultValue) {
  var source = this;
  if (defaultValue === undefined) {
    defaultValue = null;
  }

  function subscribe(o) {
    return source.subscribe(new DefaultIfEmptyObserver(o, defaultValue));
  }

  return new AnonymousObservable(subscribe, source);
};
// Swap out for Array.findIndex
/**
 * Linear search with a custom equality function.
 * @param {Array} array Items to search.
 * @param {*} item Value to look for.
 * @param {Function} comparer Called as comparer(element, item); a truthy result counts as a match.
 * @returns {Number} Index of the first match, or -1 when there is none.
 */
function arrayIndexOfComparer(array, item, comparer) {
  var idx = 0,
    count = array.length;
  while (idx < count) {
    if (comparer(array[idx], item)) { return idx; }
    idx += 1;
  }
  return -1;
}

/**
 * Array-backed set whose membership test is a linear scan using the supplied comparer.
 * @param {Function} comparer Equality function used to detect duplicates.
 */
function HashSet(comparer) {
  this.comparer = comparer;
  this.set = [];
}

/**
 * Adds the value unless an equal one is already stored.
 * @param {*} value Value to add.
 * @returns {Boolean} true when the value was added, false when it was already present.
 */
HashSet.prototype.push = function (value) {
  var isNew = arrayIndexOfComparer(this.set, value, this.comparer) === -1;
  if (isNew) {
    this.set.push(value);
  }
  return isNew;
};
/**
 * Observable behind `distinct`: subscribes to the source with a DistinctObserver configured
 * with the stored key selector and comparer.
 */
var DistinctObservable = (function (__super__) {
  inherits(DistinctObservable, __super__);

  function DistinctObservable(source, keyFn, cmpFn) {
    this.source = source;
    this._keyFn = keyFn;
    this._cmpFn = cmpFn;
    __super__.call(this);
  }

  DistinctObservable.prototype.subscribeCore = function (o) {
    var upstream = this.source;
    return upstream.subscribe(new DistinctObserver(o, this._keyFn, this._cmpFn));
  };

  return DistinctObservable;
}(ObservableBase));
/**
 * Observer behind `distinct`: forwards a value only when its key has not been recorded before.
 * Keys are kept in a HashSet built with the supplied comparer.
 */
var DistinctObserver = (function (__super__) {
  inherits(DistinctObserver, __super__);

  function DistinctObserver(o, keyFn, cmpFn) {
    this._o = o;
    this._keyFn = keyFn;
    this._h = new HashSet(cmpFn);
    __super__.call(this);
  }

  var proto = DistinctObserver.prototype;

  proto.next = function (x) {
    var key = x,
      keyFn = this._keyFn;
    // Without a key selector function the value itself is the key.
    if (isFunction(keyFn)) {
      key = tryCatch(keyFn)(x);
      if (key === errorObj) { return this._o.onError(key.e); }
    }
    if (this._h.push(key)) {
      this._o.onNext(x);
    }
  };

  proto.error = function (e) { this._o.onError(e); };

  proto.completed = function () { this._o.onCompleted(); };

  return DistinctObserver;
}(AbstractObserver));
/**
 * Returns an observable sequence that contains only distinct elements according to the keySelector and the comparer.
 * Usage of this operator should be considered carefully due to the maintenance of an internal lookup structure which can grow large.
 *
 * @example
 * var res = xs.distinct();
 * var res = xs.distinct(function (x) { return x.id; });
 * var res = xs.distinct(function (x) { return x.id; }, function (a,b) { return a === b; });
 * @param {Function} [keySelector] A function to compute the comparison key for each element.
 * @param {Function} [comparer] Used to compare items in the collection. Defaults to `Rx.helpers.defaultComparer`.
 * @returns {Observable} An observable sequence only containing the distinct elements, based on a computed key value, from the source sequence.
 */
observableProto.distinct = function (keySelector, comparer) {
  // The fallback is read from `helpers`: this file declares no `defaultComparer` alias, so
  // referring to that name threw a ReferenceError whenever the comparer was omitted.
  if (!comparer) {
    comparer = helpers.defaultComparer;
  }
  return new DistinctObservable(this, keySelector, comparer);
};
/**
 * Returns an observable sequence that shares a single subscription to the underlying sequence. This observable sequence
 * can be resubscribed to, even if all prior subscriptions have ended. (unlike `.publish().refCount()`)
 * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source.
 */
observableProto.singleInstance = function() {
  var source = this,
    isLive = false,
    shared;

  // Runs from the source's `finally` hook so the next subscriber builds a fresh shared sequence.
  function reset() {
    isLive = false;
  }

  return new AnonymousObservable(function (o) {
    if (!isLive) {
      isLive = true;
      shared = source['finally'](reset).publish().refCount();
    }
    return shared.subscribe(o);
  });
};
return Rx;
}));