创建一个IObservable并立即返回缓存的异步操作的结果

[英]Create an IObservable and return the result of a cached async operation immediately


I'm using reactive extensions to call a async method and I want to cache the result and return this for subsequent calls to the method.

我正在使用反应式扩展来调用异步方法,我想缓存结果并将其返回以便后续调用该方法。

How can I create an Observable instance, return it and provided the data (cacheResult) the subscribe requires?

如何创建Observable实例,返回它并提供订阅所需的数据(cacheResult)?

public IObservable<Bar> GetBars(int pageIndex, int pageSize)
{
   var @params = new object[] { pageIndex, pageSize };
   var cachedResult = _cache.Get(@params);
   if (cachedResult != null)
   {
 // How do I create a Observable instance and return the 'cacheResult'...
 return ...
   }

   var observable = new BaseObservable<Bar>();
   _components.WithSsoToken(_configuration.SsoToken)
      .Get(@params)
      .Select(Map)
      .Subscribe(c =>
                     {
                          _cache.Add(@params, c);
                          observable.Publish(c);
                          observable.Completed();
                     }, exception =>
                     {
                        observable.Failed(exception);
                        observable.Completed();
                     });

       return observable;
}

2 个解决方案

#1


3  

I believe you are looking for Observable.Return:

我相信你正在寻找Observable.Return:

return Observable.Return((Bar)cachedResult);

On an unrelated note:

在一个不相关的说明:

  • There's no need to return a BaseObservable<T>. You should return a Subject<T> as it does what your implementation is doing but is thread safe (you should also call .AsObservable() on the return value to it can't be cast back).
  • 无需返回BaseObservable 。你应该返回一个Subject ,因为它执行你的实现正在做的但是线程安全的(你也应该调用.AsObservable()对它的返回值不能被强制转换)。
  • You use Do to add the value to the cache:
  • 您使用Do将值添加到缓存:

var observable = new Subject<Bar>();
_components.WithSsoToken(_configuration.SsoToken)
    .Get(@params)
    .Select(Map)
    .Subscribe(c =>
    {
        _cache.Add(@params, c);
        observable.OnNext(c);
        observable.OnCompleted();
    }, exception =>
    {
        observable.OnError(exception);
    });

return observable.AsObservable();

#2


2  

Conveniently, I've written a class that does this pattern for you, check it out:

方便的是,我写了一个为你做这个模式的课,检查出来:

https://github.com/xpaulbettsx/ReactiveXaml/blob/master/ReactiveXaml/ObservableAsyncMRUCache.cs

https://github.com/xpaulbettsx/ReactiveXaml/blob/master/ReactiveXaml/ObservableAsyncMRUCache.cs

var cache = new ObservableAsyncMRUCache<int, int>(
    x => Observable.Return(x*10).Delay(1000) /* Return an IObservable here */, 
    100 /*items to cache*/,
    5 /* max in-flight, important for web calls */
    );

IObservable<int> futureResult = cache.AsyncGet(10);

futureResult.Subscribe(Console.WriteLine);
>>> 100

Some tricky things that it handles correctly:

它正确处理的一些棘手的事情:

  • It caches the last n items and throws away items that aren't being used
  • 它缓存最后n个项目并丢弃未使用的项目
  • It ensures that no more than n items are running at the same time - if you don't do this, you can easily spawn out thousands of web calls if the cache is empty
  • 它确保不会同时运行n个项目 - 如果不这样做,如果缓存为空,则可以轻松生成数千个Web调用
  • If you ask for the same item twice in a row, the first request will initiate a request, and the second call will wait on the first one instead of spawning an identical request, so you won't end up querying data redundantly.
  • 如果您连续两次请求相同的项目,则第一个请求将启动请求,第二个请求将在第一个请求上等待而不是生成相同的请求,因此您不会以冗余方式查询数据。
智能推荐

注意!

本站翻译的文章,版权归属于本站,未经许可禁止转摘,转摘请注明本文地址:http://www.silva-art.net/blog/2010/12/08/9ad200e0a8839f149e4afb9c8acf81b5.html



 
© 2014-2019 ITdaan.com 粤ICP备14056181号  

赞助商广告