Fix Job Queue #1
Comments
Please have a look at JobQueue.cs. The source there is the last version of the JobQueue that I actually used. When the new async features came up, I moved away from Rx and used Task<> most of the time, e.g. for async stream handling as in AsyncQueue.cs. Unfortunately I have had no time to follow the latest .NET developments because I am currently spending most of my time coding on Linux...
Part of this project here is to bulk-download files from a server, so the queue's job is basically to throttle the concurrent number of downloads and give status feedback. In any case, thanks so much for this!
Actually AsyncQueue was part of a larger development which was a replacement for AsyncEnumerable. We were not happy with the speed of the official MS AsyncEnumerable implementation and needed proper async cancellation support (e.g. catching exceptions of all pending operations and knowing when all async operations have actually finished, something that is not possible with simple fire-and-forget Dispose() calls). It is probably too big for your project, but if you are curious you could have a look at the Sequence.cs file. ISequence is a drop-in for IAsyncEnumerable, and IIterator is a replacement for IAsyncEnumerator.
As a simple solution to add jobs and throttle downloads: have you checked whether a Subject with Select(x => Observable.DeferAsync(...)) followed by Merge(concurrencyLimit) would be an option? It is a little more work than the download demo here, which uses a fixed array as its source. A general problem with Merge is the fire-and-forget cancellation, but for downloads that may not be an issue.
Interesting stuff! I've actually done a quick occurrences search of What I need is:
All that while throttling the number of concurrent downloads. Downloads are directly streamed to disk, so no need to return any data besides status updates. Looking at Paul's code, I assume I would end up doing something like this:

    downloadJobs
        .ObserveOn(Scheduler.Default)
        .SelectMany(async x => {
            return await x.DownloadFile();
        })
        .Merge(4)
        .Subscribe(Console.WriteLine);

Where
If you do not want more than 4 concurrent downloads, you have to use the Select(x => Observable.FromAsync(..)) pattern, because SelectMany evaluates the selector (and so starts the download) as soon as each element arrives. I am not sure whether ObserveOn() is really necessary; maybe not. And if you are dealing with large downloads, you probably want to use the FromAsync() overload that takes a CancellationToken...
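To make the difference concrete, here is a minimal sketch of both variants, assuming the System.Reactive NuGet package. The `ThrottleDemo` class, the `FakeDownloadAsync` helper (a 50 ms delay standing in for a real download) and the `example.invalid` URLs are hypothetical and only serve the illustration; the sketch records the highest number of simulated downloads that were in flight at the same time.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;   // from the System.Reactive NuGet package
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    private static int _running; // simulated downloads currently in flight
    private static int _peak;    // highest value _running has reached

    // Hypothetical stand-in for a real download: waits 50 ms and honours the token.
    private static async Task<string> FakeDownloadAsync(string url, CancellationToken token)
    {
        int now = Interlocked.Increment(ref _running);
        int seen;
        // Raise _peak to 'now' unless another download already recorded a higher value.
        while (now > (seen = Volatile.Read(ref _peak)))
            Interlocked.CompareExchange(ref _peak, now, seen);
        try
        {
            await Task.Delay(50, token);
            return url;
        }
        finally
        {
            Interlocked.Decrement(ref _running);
        }
    }

    // Made-up job source; a Subject<Job> would take its place in a real queue.
    private static IObservable<string> Urls(int count) =>
        Enumerable.Range(1, count)
            .Select(i => $"http://example.invalid/file{i}")
            .ToObservable();

    // Select + FromAsync only builds cold observables; Merge(maxConcurrent)
    // subscribes to (and thereby starts) at most maxConcurrent of them at once.
    public static async Task<int> PeakWithMergeAsync(int jobCount, int maxConcurrent)
    {
        _running = 0;
        _peak = 0;
        await Urls(jobCount)
            .Select(url => Observable.FromAsync(token => FakeDownloadAsync(url, token)))
            .Merge(maxConcurrent)
            .ToList();
        return _peak;
    }

    // SelectMany subscribes to every inner observable as soon as its element
    // arrives, so nothing limits the number of downloads in flight.
    public static async Task<int> PeakWithSelectManyAsync(int jobCount)
    {
        _running = 0;
        _peak = 0;
        await Urls(jobCount)
            .SelectMany(url => Observable.FromAsync(token => FakeDownloadAsync(url, token)))
            .ToList();
        return _peak;
    }
}
```

With this sketch, `await ThrottleDemo.PeakWithMergeAsync(12, 4)` should never report more than 4, while the `SelectMany` variant is free to start all 12 simulated downloads at once. The token passed by `FromAsync` is cancelled when the subscription is disposed, which is the hook you would use for aborting large downloads.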
Alright. I'll take all that in and will hopefully come up with an adequate solution. Thanks again for your help, this is much appreciated!
So finally it seems to boil down to something like this:

    var downloads = new Subject<Job>();
    var queue = downloads
        .ObserveOn(Scheduler.Default)
        .Select(job => Observable.DeferAsync(async token => Observable.Return(await ProcessDownload(job, token))))
        .Merge(4)
        .Subscribe(job => {
            Console.WriteLine("Job {0} completed.", job);
        }, error => {
            Console.WriteLine("Error: {0}", error);
        });
    // add a download
    downloads.OnNext(new Job("http://foo"));

Where

    async Task<Job> ProcessDownload(Job job, CancellationToken token)
    {
        var client = new WebClient();
        job.RegisterClient(client); // setup progress event
        token.Register(client.CancelAsync); // setup cancelation
        await client.DownloadFileTaskAsync(new Uri(job.Url), "some-file.dat");
        return job;
    }

And my

    public class Job
    {
        public readonly string Url;
        public IObservable<DownloadProgressChangedEventArgs> WhenDownloadProgresses => _progress;
        private readonly Subject<DownloadProgressChangedEventArgs> _progress = new Subject<DownloadProgressChangedEventArgs>();
        public Job(string url)
        {
            Url = url;
        }
        public void RegisterClient(WebClient client)
        {
            var progress = Observable.FromEvent<DownloadProgressChangedEventHandler, DownloadProgressChangedEventArgs>(
                h => client.DownloadProgressChanged += h,
                h => client.DownloadProgressChanged -= h
            );
            progress.Subscribe(_progress.OnNext);
        }
    }

So job completion, error handling and abortion I can do in my "main" subscription, while progress gets handled in the
Found an excellent job queue from Andreas Köpf, but it looks like I've screwed up the transcoding for the latest version of Rx. @andreaskoepf if you could have a look at this Gist, that would be awesome!