Fix Job Queue #1

Open
freezy opened this issue Oct 22, 2015 · 7 comments

freezy commented Oct 22, 2015

Found an excellent job queue from Andreas Köpf, but it looks like I've screwed up porting it to the latest version of Rx. @andreaskoepf if you could have a look at this Gist, that would be awesome!

@andreaskoepf

Please have a look at JobQueue.cs. The source there is the last version of the JobQueue that I actually used. When the new async features came along I moved away from Rx and used Task<> most of the time, e.g. for async stream handling as in AsyncQueue.cs. Unfortunately I have had no time to follow the latest .NET developments because I am currently spending most of my time coding on Linux...


freezy commented Oct 23, 2015

JobQueue works perfectly, cheers! As for AsyncQueue, I'm still figuring out what it's good for. ;)

Part of this project here is to bulk-download files from a server, so the queue's job is basically to throttle the number of concurrent downloads and give status feedback. JobQueue seemed to be the tool for the job, but now looking at AsyncQueue I'm not so sure anymore.

In any case, thanks so much for this!

@andreaskoepf

Actually AsyncQueue was part of a larger development which was a replacement for AsyncEnumerable. We were not happy with the speed of the official MS AsyncEnumerable implementation and needed proper async cancellation support (e.g. catching exceptions of all pending operations and knowing when all async operations have actually finished, something that is not possible with simple fire-and-forget Dispose() calls). It is probably too big for your project, but if you are curious you could have a look at the Sequence.cs file. ISequence is a drop-in for IAsyncEnumerable, and IIterator is a replacement for IAsyncEnumerator.
The goal of the Sequence system was to write the same code for classic collections (IEnumerable) and reactive sources (IObservable), e.g. simply write yourObservable.ToSequence() or yourList.ToSequence(). If you scan through Sequence.cs you will find a lot of the classic enumerable operations as sync and async versions.

As a simple solution to add jobs and throttle downloads.. have you checked whether a Subject with Select(x => Observable.DeferAsync(...)) followed by Merge(concurrencyLimit) would be an option? A little bit more work than the download demo here which uses a fixed array as source. A general problem with merge is the fire-and-forget cancellation but maybe for downloads it is not an issue.


freezy commented Oct 23, 2015

Interesting stuff! I've actually done a quick occurrences search of AsyncQueue before and ended up looking at the Sequences class, but it made less sense before reading your explanation. And I agree that this is probably overkill for my project.

What I need is:

  1. Add files to a download queue
  2. Get regular progress updates about downloaded bytes
  3. Be able to cancel a download
  4. Get notified when a download completes

All that while throttling the number of concurrent downloads. Downloads are directly streamed to disk, so no need to return any data besides status updates.

Looking at Paul's code, I assume I would end up doing something like this:

downloadJobs
    .ObserveOn(Scheduler.Default)
    .SelectMany(async x => {
        return await x.DownloadFile();
    })
    .Merge(4)
    .Subscribe(Console.WriteLine);

Where downloadJobs would be a Subject<DownloadJob> and DownloadJob would contain additional observables for handling progress and errors? Would that resolve the fire-and-forget problem you mentioned?

@andreaskoepf

If you do not want to have more than 4 concurrent downloads you have to use the Select(x => Observable.FromAsync(..)) pattern; SelectMany will evaluate the selector as soon as elements arrive, so every download would start immediately. I am not sure whether ObserveOn() is really necessary .. maybe not. And if you are dealing with large downloads you probably want to use the FromAsync() overload with a cancellationToken ...
Regarding fire-and-forget Dispose(): This is a general problem of Rx, but if it is fine for you to know only that the operations will eventually terminate, this is OK. Regarding exceptions, Rx has the concept of fail-early: if multiple operations fail you will normally only get the first exception... With the AsyncJobQueue I tried to pass on exceptions in a materialized way ...
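
To make the difference concrete, here is a minimal sketch of the Select(x => Observable.FromAsync(..)) plus Merge(n) pattern described above. It assumes the System.Reactive package; DownloadAsync is a hypothetical stand-in that only waits and echoes the URL, and the class and method names are made up for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledQueueSketch
{
    // Hypothetical placeholder for a real download: waits briefly, then returns the URL.
    public static async Task<string> DownloadAsync(string url, CancellationToken token)
    {
        await Task.Delay(100, token);
        return url;
    }

    // Turns each URL into a deferred inner observable, then lets Merge
    // subscribe to at most maxConcurrent of them at the same time.
    public static IObservable<string> LimitConcurrency(IObservable<string> urls, int maxConcurrent)
    {
        return urls
            // FromAsync does not invoke the lambda until the inner observable is subscribed.
            .Select(url => Observable.FromAsync<string>(token => DownloadAsync(url, token)))
            // Merge(int) queues further inner observables until a running one completes.
            .Merge(maxConcurrent);
    }
}
```

With a Subject<string> as the source, ThrottledQueueSketch.LimitConcurrency(subject, 4).Subscribe(...) should keep at most four downloads in flight. Disposing that subscription cancels the token handed to the downloads that are still running, but it does not wait for them to finish, which is the fire-and-forget behaviour mentioned above.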


freezy commented Oct 23, 2015

Alright. I'll take all that in and will hopefully come up with an adequate solution. Thanks again for your help, it's much appreciated!


freezy commented Oct 23, 2015

So finally it seems to boil down to something like this:

var downloads = new Subject<Job>();
var queue = downloads
    .ObserveOn(Scheduler.Default)
    .Select(job => Observable.DeferAsync(async token => Observable.Return(await ProcessDownload(job, token))))
    .Merge(4)
    .Subscribe(job => {
        Console.WriteLine("Job {0} completed.", job);
    }, error => {
        Console.WriteLine("Error: {0}", error);
    });

// add a download
downloads.OnNext(new Job("http://foo"));

Where ProcessDownload() looks like this:

async Task<Job> ProcessDownload(Job job, CancellationToken token)
{
    var client = new WebClient();

    job.RegisterClient(client);         // setup progress event
    token.Register(client.CancelAsync); // setup cancelation

    await client.DownloadFileTaskAsync(new Uri(job.Url), "some-file.dat");

    return job;
}

And my Job object like this:

public class Job
{
    public readonly string Url;
    public IObservable<DownloadProgressChangedEventArgs> WhenDownloadProgresses => _progress;
    private readonly Subject<DownloadProgressChangedEventArgs> _progress = new Subject<DownloadProgressChangedEventArgs>();

    public Job(string url)
    {
        Url = url;
    }

    public void RegisterClient(WebClient client)
    {
        var progress = Observable.FromEvent<DownloadProgressChangedEventHandler, DownloadProgressChangedEventArgs>(
            h => client.DownloadProgressChanged += h,
            h => client.DownloadProgressChanged -= h
        );
        progress.Subscribe(_progress.OnNext);
    }
}

So job completion, error handling and abortion I can do in my "main" subscription, while progress gets handled in the Job object. Does that look reasonable? Line 4 of the first code block still doesn't look very Rxly to me, it seems like I'm converting back and forth between IObservable and Task for nothing...
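
One possible variation, sketched under assumptions rather than taken from the thread: Observable.FromAsync wraps a Task-returning function directly, so the Observable.Return(await ...) round trip is not needed. The sketch also captures each failure in a result object, because an error from any inner sequence would otherwise terminate the merged stream for all remaining jobs. JobResult, MaterializedQueueSketch and the download delegate are hypothetical names.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical result type: carries the URL plus the exception, if any.
public sealed class JobResult
{
    public string Url { get; }
    public Exception Error { get; }            // null when the download succeeded
    public bool Succeeded => Error == null;

    public JobResult(string url, Exception error = null)
    {
        Url = url;
        Error = error;
    }
}

public static class MaterializedQueueSketch
{
    // download is an assumed delegate standing in for something like ProcessDownload().
    public static IObservable<JobResult> Run(
        IObservable<string> urls,
        Func<string, CancellationToken, Task> download,
        int maxConcurrent)
    {
        return urls
            .Select(url => Observable.FromAsync<JobResult>(async token =>
            {
                try
                {
                    await download(url, token);
                    return new JobResult(url);
                }
                catch (Exception ex)
                {
                    // Report the failure as a value so the queue keeps running.
                    // Note: this also turns a cancellation into a failed JobResult.
                    return new JobResult(url, ex);
                }
            }))
            .Merge(maxConcurrent);
    }
}
```

The subscriber then checks Succeeded in OnNext instead of relying on OnError, so a single failed download is reported without ending the whole queue.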

freezy modified the milestone: MVP Nov 5, 2015