Scheduler built with observables

The name of the pictureThe name of the pictureThe name of the pictureClash Royale CLAN TAG#URR8PPP





.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty margin-bottom:0;







up vote
4
down vote

favorite












I sometimes have to schedule some tasks and wanted to have my own reusable scheduler. Since I like the Rx, I build it on top of it.




Follow-up



There is a newer version of the Scheduler.




The Scheduler class is really simple. It's backed by a newer implementation of my old CronExpression.



There are currently only these two methods. The factory method Create creates a new scheduler that ticks at the specified intervals and provides schedules as DateTimes to the observers.



Jobs are scheduled with the Schedule extension. This one requires a cron-expression and the action to execute.



public static class Scheduler

public static IObservable<DateTime> Create(TimeSpan interval, IDateTime dateTime)

return
Observable
.Interval(TimeSpan.FromSeconds(1))
.Select(_ => dateTime.Now());


public static IDisposable Schedule(this IObservable<DateTime> schedules, string cronExpressionString, Action<DateTime> action)

var cronExpression = CronExpression.Parse(cronExpressionString);
return
schedules
.Where(cronExpression.Contains)
.Subscribe(action);





The DateTime abstraction is supported by the IDateTime interface:



public interface IDateTime

DateTime Now();



which is implemented as



public class LocalDateTime : IDateTime

public DateTime Now() => DateTime.Now;



or



public class UtcDateTime : IDateTime

public DateTime Now() => DateTime.UtcNow;




Example



In order to use it I just create a scheduler, specify what kind of timestamp it should generate and schedule some actions:



var scheduler = Scheduler.Create(TimeSpan.FromSeconds(1), new LocalDateTime());

scheduler.Schedule("0/1 * * * * * *", schedule =>

Console.WriteLine($"DEBUG: schedule [Thread.CurrentThread.ManagedThreadId]");
);

scheduler.Schedule("0/5 * * * * * *", schedule =>

Console.WriteLine($"ACTION: schedule");
);


The output is:





DEBUG: 13/04/2018 22:32:09 [10]
DEBUG: 13/04/2018 22:32:10 [12]
ACTION: 13/04/2018 22:32:10
DEBUG: 13/04/2018 22:32:11 [10]
DEBUG: 13/04/2018 22:32:12 [14]
DEBUG: 13/04/2018 22:32:13 [8]
DEBUG: 13/04/2018 22:32:14 [12]
ACTION: 13/04/2018 22:32:15
DEBUG: 13/04/2018 22:32:15 [8]
DEBUG: 13/04/2018 22:32:16 [12]
DEBUG: 13/04/2018 22:32:17 [8]



There seems to be no rocket science here but this might elusive. Can/should this scheduler by improved in any way?







share|improve this question

















  • 1




    .Interval(TimeSpan.FromSeconds(1)) you should be using the parameter interval
    – Xiaoy312
    Apr 13 at 21:53
















up vote
4
down vote

favorite












I sometimes have to schedule some tasks and wanted to have my own reusable scheduler. Since I like the Rx, I build it on top of it.




Follow-up



There is a newer version of the Scheduler.




The Scheduler class is really simple. It's backed by a newer implementation of my old CronExpression.



There are currently only these two methods. The factory method Create creates a new scheduler that ticks at the specified intervals and provides schedules as DateTimes to the observers.



Jobs are scheduled with the Schedule extension. This one requires a cron-expression and the action to execute.



public static class Scheduler

public static IObservable<DateTime> Create(TimeSpan interval, IDateTime dateTime)

return
Observable
.Interval(TimeSpan.FromSeconds(1))
.Select(_ => dateTime.Now());


public static IDisposable Schedule(this IObservable<DateTime> schedules, string cronExpressionString, Action<DateTime> action)

var cronExpression = CronExpression.Parse(cronExpressionString);
return
schedules
.Where(cronExpression.Contains)
.Subscribe(action);





The DateTime abstraction is supported by the IDateTime interface:



public interface IDateTime

DateTime Now();



which is implemented as



public class LocalDateTime : IDateTime

public DateTime Now() => DateTime.Now;



or



public class UtcDateTime : IDateTime

public DateTime Now() => DateTime.UtcNow;




Example



In order to use it I just create a scheduler, specify what kind of timestamp it should generate and schedule some actions:



var scheduler = Scheduler.Create(TimeSpan.FromSeconds(1), new LocalDateTime());

scheduler.Schedule("0/1 * * * * * *", schedule =>

Console.WriteLine($"DEBUG: schedule [Thread.CurrentThread.ManagedThreadId]");
);

scheduler.Schedule("0/5 * * * * * *", schedule =>

Console.WriteLine($"ACTION: schedule");
);


The output is:





DEBUG: 13/04/2018 22:32:09 [10]
DEBUG: 13/04/2018 22:32:10 [12]
ACTION: 13/04/2018 22:32:10
DEBUG: 13/04/2018 22:32:11 [10]
DEBUG: 13/04/2018 22:32:12 [14]
DEBUG: 13/04/2018 22:32:13 [8]
DEBUG: 13/04/2018 22:32:14 [12]
ACTION: 13/04/2018 22:32:15
DEBUG: 13/04/2018 22:32:15 [8]
DEBUG: 13/04/2018 22:32:16 [12]
DEBUG: 13/04/2018 22:32:17 [8]



There seems to be no rocket science here but this might elusive. Can/should this scheduler by improved in any way?







share|improve this question

















  • 1




    .Interval(TimeSpan.FromSeconds(1)) you should be using the parameter interval
    – Xiaoy312
    Apr 13 at 21:53












up vote
4
down vote

favorite









up vote
4
down vote

favorite











I sometimes have to schedule some tasks and wanted to have my own reusable scheduler. Since I like the Rx, I build it on top of it.




Follow-up



There is a newer version of the Scheduler.




The Scheduler class is really simple. It's backed by a newer implementation of my old CronExpression.



There are currently only these two methods. The factory method Create creates a new scheduler that ticks at the specified intervals and provides schedules as DateTimes to the observers.



Jobs are scheduled with the Schedule extension. This one requires a cron-expression and the action to execute.



public static class Scheduler

public static IObservable<DateTime> Create(TimeSpan interval, IDateTime dateTime)

return
Observable
.Interval(TimeSpan.FromSeconds(1))
.Select(_ => dateTime.Now());


public static IDisposable Schedule(this IObservable<DateTime> schedules, string cronExpressionString, Action<DateTime> action)

var cronExpression = CronExpression.Parse(cronExpressionString);
return
schedules
.Where(cronExpression.Contains)
.Subscribe(action);





The DateTime abstraction is supported by the IDateTime interface:



public interface IDateTime

DateTime Now();



which is implemented as



public class LocalDateTime : IDateTime

public DateTime Now() => DateTime.Now;



or



public class UtcDateTime : IDateTime

public DateTime Now() => DateTime.UtcNow;




Example



In order to use it I just create a scheduler, specify what kind of timestamp it should generate and schedule some actions:



var scheduler = Scheduler.Create(TimeSpan.FromSeconds(1), new LocalDateTime());

scheduler.Schedule("0/1 * * * * * *", schedule =>

Console.WriteLine($"DEBUG: schedule [Thread.CurrentThread.ManagedThreadId]");
);

scheduler.Schedule("0/5 * * * * * *", schedule =>

Console.WriteLine($"ACTION: schedule");
);


The output is:





DEBUG: 13/04/2018 22:32:09 [10]
DEBUG: 13/04/2018 22:32:10 [12]
ACTION: 13/04/2018 22:32:10
DEBUG: 13/04/2018 22:32:11 [10]
DEBUG: 13/04/2018 22:32:12 [14]
DEBUG: 13/04/2018 22:32:13 [8]
DEBUG: 13/04/2018 22:32:14 [12]
ACTION: 13/04/2018 22:32:15
DEBUG: 13/04/2018 22:32:15 [8]
DEBUG: 13/04/2018 22:32:16 [12]
DEBUG: 13/04/2018 22:32:17 [8]



There seems to be no rocket science here but this might elusive. Can/should this scheduler by improved in any way?







share|improve this question













I sometimes have to schedule some tasks and wanted to have my own reusable scheduler. Since I like the Rx, I build it on top of it.




Follow-up



There is a newer version of the Scheduler.




The Scheduler class is really simple. It's backed by a newer implementation of my old CronExpression.



There are currently only these two methods. The factory method Create creates a new scheduler that ticks at the specified intervals and provides schedules as DateTimes to the observers.



Jobs are scheduled with the Schedule extension. This one requires a cron-expression and the action to execute.



public static class Scheduler

public static IObservable<DateTime> Create(TimeSpan interval, IDateTime dateTime)

return
Observable
.Interval(TimeSpan.FromSeconds(1))
.Select(_ => dateTime.Now());


public static IDisposable Schedule(this IObservable<DateTime> schedules, string cronExpressionString, Action<DateTime> action)

var cronExpression = CronExpression.Parse(cronExpressionString);
return
schedules
.Where(cronExpression.Contains)
.Subscribe(action);





The DateTime abstraction is supported by the IDateTime interface:



public interface IDateTime

DateTime Now();



which is implemented as



public class LocalDateTime : IDateTime

public DateTime Now() => DateTime.Now;



or



public class UtcDateTime : IDateTime

public DateTime Now() => DateTime.UtcNow;




Example



In order to use it I just create a scheduler, specify what kind of timestamp it should generate and schedule some actions:



var scheduler = Scheduler.Create(TimeSpan.FromSeconds(1), new LocalDateTime());

scheduler.Schedule("0/1 * * * * * *", schedule =>

Console.WriteLine($"DEBUG: schedule [Thread.CurrentThread.ManagedThreadId]");
);

scheduler.Schedule("0/5 * * * * * *", schedule =>

Console.WriteLine($"ACTION: schedule");
);


The output is:





DEBUG: 13/04/2018 22:32:09 [10]
DEBUG: 13/04/2018 22:32:10 [12]
ACTION: 13/04/2018 22:32:10
DEBUG: 13/04/2018 22:32:11 [10]
DEBUG: 13/04/2018 22:32:12 [14]
DEBUG: 13/04/2018 22:32:13 [8]
DEBUG: 13/04/2018 22:32:14 [12]
ACTION: 13/04/2018 22:32:15
DEBUG: 13/04/2018 22:32:15 [8]
DEBUG: 13/04/2018 22:32:16 [12]
DEBUG: 13/04/2018 22:32:17 [8]



There seems to be no rocket science here but this might elusive. Can/should this scheduler by improved in any way?









share|improve this question












share|improve this question




share|improve this question








edited Apr 15 at 9:32
























asked Apr 13 at 20:43









t3chb0t

32k54195




32k54195







  • 1




    .Interval(TimeSpan.FromSeconds(1)) you should be using the parameter interval
    – Xiaoy312
    Apr 13 at 21:53












  • 1




    .Interval(TimeSpan.FromSeconds(1)) you should be using the parameter interval
    – Xiaoy312
    Apr 13 at 21:53







1




1




.Interval(TimeSpan.FromSeconds(1)) you should be using the parameter interval
– Xiaoy312
Apr 13 at 21:53




.Interval(TimeSpan.FromSeconds(1)) you should be using the parameter interval
– Xiaoy312
Apr 13 at 21:53










2 Answers
2






active

oldest

votes

















up vote
3
down vote



accepted











public static IObservable<DateTime> Create(TimeSpan interval, IDateTime dateTime)

return
Observable
.Interval(TimeSpan.FromSeconds(1))
.Select(_ => dateTime.Now());




You forgot to plug interval parameter. And, I would consider renaming it resolution.



I wouldn't trust the values from selecting DatetTime::Now or DateTime::UtcNow:



// try running this for a while
var scheduler = Scheduler.Create(TimeSpan.FromSeconds(1), new LocalDateTime());
scheduler.Schedule("0/1 * * * * * *", schedule =>

Console.WriteLine($"ACTION: schedule: schedule:ss.fff");
);



ACTION: 2018-04-13 6:28:29 PM: 29.987
ACTION: 2018-04-13 6:28:30 PM: 30.987
ACTION: 2018-04-13 6:28:32 PM: 32.001
ACTION: 2018-04-13 6:28:33 PM: 33.001



Depending on CronExpression::Contains implementation, the scheduler could be skipping task when DateTime.Now.Millisecond gets near 0 or 999.



I've attempted to solve that by adding index * interval to a snapshot of time, however this solution suffered a problem: The delay is cumulative, so the schedule's time will slowly deviate(fall behind) from the real time:



// dont use this
public static IObservable<DateTime> Create2(TimeSpan interval, IDateTime dateTime)

var snapshot = dateTime.Now();
var offset = interval.Ticks - snapshot.Ticks % interval.Ticks;

return Observable.Interval(interval)
.Delay(TimeSpan.FromTicks(offset))
.Select(x => snapshot.AddTicks(interval.Ticks * (x + 1) + offset));



An better solution would be to provide a range of time, and check if expression fall between the range:



public static IDisposable Schedule2(this IObservable<DateTime> schedules, string cronExpressionString, Action<DateTime> action)

var cronExpression = Regex.Match(cronExpressionString, @"^0/(?<s>d+)");
return
schedules
.Scan(default(TimeRange), (previous, x) => new TimeRange(previous?.End, x))
.Where(cronExpression.Contains)
.Subscribe(action);


public class TimeRange

public DateTime? Start get;
public DateTime End get;

public TimeRange(DateTime? start, DateTime end)

this.Start = start;
this.End = end;







share|improve this answer





















  • You forgot to plug interval parameter. oops, refactoring mistake :-] Since the smallest value of a cron-expression is seconds, I think the milliseconds can be ignored... but I see that I have to take care of the missing ones :-o Could you explain the range approach in more detail? I cannot figure this out :-( but you have my vote!
    – t3chb0t
    Apr 14 at 8:00


















up vote
1
down vote













Xiaoy312 has a point about the risk of missing seconds when using Interval. It seems that each "tick" of Observable.Interval() waits until the previous returns. So if the OnNext halts the thread a while, the next "tick" is fired too late and you may miss a second or two.



Trying to start each scheduled task on a new thread - for instance via a System.Timers.Timer seems to be a bad idea, since the same Cron job probably can not run concurrently(?).



One way not to lose any seconds using Observable.Interval is as follows:



 Random rand = new Random(5);
IObservable<Timestamped<long>> source = Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp().ObserveOn(NewThreadScheduler.Default);

IDisposable subscription = source.Subscribe(
x =>

Console.WriteLine("OnNext: 0", x);
Thread.Sleep(rand.Next(0, 4001));
Console.WriteLine("After Sleep OnNext: 0", x);
,
ex => Console.WriteLine("OnError: 0", ex.Message),
() => Console.WriteLine("OnCompleted"));


Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();


This seems to ensure no slip in the sequence of timestamps (Second-wise at least), but each elapsed interval may be (cumulatively) delayed according to previous intervals "laziness". In this way you are not guarantied that the job is done on schedule, but you won't miss a job because of "missing" seconds. It seems though that the time pump catches up the delay when no action delay the current elapsed interval.






share|improve this answer























  • This is an interesting approach but unfortunatelly if you constrain the thread, all worker tasks will execute on the same one. If you get the x.Timestamp.DateTime property and format it with milliseconds you'll notice that it suffers from the same inaccuracy as other techniques :-( I've been experimenting with the missing second for some time and I think I might have found how to fix it. I'll post it later but it checks if now is >1s from last. If so, I return two timestamps via SelectMany. It doesn't matter that I return two at the same time, most important is to not loose it.
    – t3chb0t
    Apr 14 at 22:13











  • @t3chb0t: OK, it seems to be two different problems then. As I understand/understood ObserveOn(NewThreadScheduler.Default), each interval should be executed on its own thread, but that is not the case. And it seems not to be the case no matter how you configure Observable.Intervals (?)
    – Henrik Hansen
    Apr 15 at 4:49










  • @t3chb0t: You approach with two timestamps seems to be the same as to have an initial start time, and then just iterate over a long representing the second-offset from that start time.
    – Henrik Hansen
    Apr 15 at 4:55










  • Observers are executed on their threads without any additional measures. See the output in my question. I write the thread id to the console. Two timestamps aren't the same as incrementing a start date. Because with incrementing it'll be at some point permanently inaccurate, with my approach I use Now, and publish an additional timestamp only once in a while if one was missing due to 15ms resolution of the timers. You can see how this grows by adding milliseconds to the output :-(
    – t3chb0t
    Apr 15 at 6:23











  • @t3chb0t: Yes, I understand. If you do some time consuming work in the actions, you'll see a different behavior. About the two timestamp approach: I suppose you'll then execute the cron action twice if they differ and both timestamps trigger them. I won't argue, but look forward to see your solution - it's always interesting to see, what you're up to :-). My long-suggestion was an attempt to solve another tick-slip problem, that may not occur in you real environment.
    – Henrik Hansen
    Apr 15 at 7:58











Your Answer




StackExchange.ifUsing("editor", function ()
return StackExchange.using("mathjaxEditing", function ()
StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix)
StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
);
);
, "mathjax-editing");

StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");

StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "196"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);

StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);

else
createEditor();

);

function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
convertImagesToLinks: false,
noModals: false,
showLowRepImageUploadWarning: true,
reputationToPostImages: null,
bindNavPrevention: true,
postfix: "",
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);



);








 

draft saved


draft discarded


















StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f192006%2fscheduler-built-with-observables%23new-answer', 'question_page');

);

Post as a guest






























2 Answers
2






active

oldest

votes








2 Answers
2






active

oldest

votes









active

oldest

votes






active

oldest

votes








up vote
3
down vote



accepted











public static IObservable<DateTime> Create(TimeSpan interval, IDateTime dateTime)

return
Observable
.Interval(TimeSpan.FromSeconds(1))
.Select(_ => dateTime.Now());




You forgot to plug interval parameter. And, I would consider renaming it resolution.



I wouldn't trust the values from selecting DatetTime::Now or DateTime::UtcNow:



// try running this for a while
var scheduler = Scheduler.Create(TimeSpan.FromSeconds(1), new LocalDateTime());
scheduler.Schedule("0/1 * * * * * *", schedule =>

Console.WriteLine($"ACTION: schedule: schedule:ss.fff");
);



ACTION: 2018-04-13 6:28:29 PM: 29.987
ACTION: 2018-04-13 6:28:30 PM: 30.987
ACTION: 2018-04-13 6:28:32 PM: 32.001
ACTION: 2018-04-13 6:28:33 PM: 33.001



Depending on CronExpression::Contains implementation, the scheduler could be skipping task when DateTime.Now.Millisecond gets near 0 or 999.



I've attempted to solve that by adding index * interval to a snapshot of time, however this solution suffered a problem: The delay is cumulative, so the schedule's time will slowly deviate(fall behind) from the real time:



// dont use this
public static IObservable<DateTime> Create2(TimeSpan interval, IDateTime dateTime)

var snapshot = dateTime.Now();
var offset = interval.Ticks - snapshot.Ticks % interval.Ticks;

return Observable.Interval(interval)
.Delay(TimeSpan.FromTicks(offset))
.Select(x => snapshot.AddTicks(interval.Ticks * (x + 1) + offset));



An better solution would be to provide a range of time, and check if expression fall between the range:



public static IDisposable Schedule2(this IObservable<DateTime> schedules, string cronExpressionString, Action<DateTime> action)

var cronExpression = Regex.Match(cronExpressionString, @"^0/(?<s>d+)");
return
schedules
.Scan(default(TimeRange), (previous, x) => new TimeRange(previous?.End, x))
.Where(cronExpression.Contains)
.Subscribe(action);


public class TimeRange

public DateTime? Start get;
public DateTime End get;

public TimeRange(DateTime? start, DateTime end)

this.Start = start;
this.End = end;







share|improve this answer





















  • You forgot to plug interval parameter. oops, refactoring mistake :-] Since the smallest value of a cron-expression is seconds, I think the milliseconds can be ignored... but I see that I have to take care of the missing ones :-o Could you explain the range approach in more detail? I cannot figure this out :-( but you have my vote!
    – t3chb0t
    Apr 14 at 8:00















up vote
3
down vote



accepted











public static IObservable<DateTime> Create(TimeSpan interval, IDateTime dateTime)

return
Observable
.Interval(TimeSpan.FromSeconds(1))
.Select(_ => dateTime.Now());




You forgot to plug interval parameter. And, I would consider renaming it resolution.



I wouldn't trust the values from selecting DatetTime::Now or DateTime::UtcNow:



// try running this for a while
var scheduler = Scheduler.Create(TimeSpan.FromSeconds(1), new LocalDateTime());
scheduler.Schedule("0/1 * * * * * *", schedule =>

Console.WriteLine($"ACTION: schedule: schedule:ss.fff");
);



ACTION: 2018-04-13 6:28:29 PM: 29.987
ACTION: 2018-04-13 6:28:30 PM: 30.987
ACTION: 2018-04-13 6:28:32 PM: 32.001
ACTION: 2018-04-13 6:28:33 PM: 33.001



Depending on CronExpression::Contains implementation, the scheduler could be skipping task when DateTime.Now.Millisecond gets near 0 or 999.



I've attempted to solve that by adding index * interval to a snapshot of time, however this solution suffered a problem: The delay is cumulative, so the schedule's time will slowly deviate(fall behind) from the real time:



// dont use this
public static IObservable<DateTime> Create2(TimeSpan interval, IDateTime dateTime)

var snapshot = dateTime.Now();
var offset = interval.Ticks - snapshot.Ticks % interval.Ticks;

return Observable.Interval(interval)
.Delay(TimeSpan.FromTicks(offset))
.Select(x => snapshot.AddTicks(interval.Ticks * (x + 1) + offset));



An better solution would be to provide a range of time, and check if expression fall between the range:



public static IDisposable Schedule2(this IObservable<DateTime> schedules, string cronExpressionString, Action<DateTime> action)

var cronExpression = Regex.Match(cronExpressionString, @"^0/(?<s>d+)");
return
schedules
.Scan(default(TimeRange), (previous, x) => new TimeRange(previous?.End, x))
.Where(cronExpression.Contains)
.Subscribe(action);


public class TimeRange

public DateTime? Start get;
public DateTime End get;

public TimeRange(DateTime? start, DateTime end)

this.Start = start;
this.End = end;







share|improve this answer





















  • You forgot to plug interval parameter. oops, refactoring mistake :-] Since the smallest value of a cron-expression is seconds, I think the milliseconds can be ignored... but I see that I have to take care of the missing ones :-o Could you explain the range approach in more detail? I cannot figure this out :-( but you have my vote!
    – t3chb0t
    Apr 14 at 8:00













up vote
3
down vote



accepted







up vote
3
down vote



accepted







public static IObservable<DateTime> Create(TimeSpan interval, IDateTime dateTime)

return
Observable
.Interval(TimeSpan.FromSeconds(1))
.Select(_ => dateTime.Now());




You forgot to plug interval parameter. And, I would consider renaming it resolution.



I wouldn't trust the values from selecting DatetTime::Now or DateTime::UtcNow:



// try running this for a while
var scheduler = Scheduler.Create(TimeSpan.FromSeconds(1), new LocalDateTime());
scheduler.Schedule("0/1 * * * * * *", schedule =>

Console.WriteLine($"ACTION: schedule: schedule:ss.fff");
);



ACTION: 2018-04-13 6:28:29 PM: 29.987
ACTION: 2018-04-13 6:28:30 PM: 30.987
ACTION: 2018-04-13 6:28:32 PM: 32.001
ACTION: 2018-04-13 6:28:33 PM: 33.001



Depending on CronExpression::Contains implementation, the scheduler could be skipping task when DateTime.Now.Millisecond gets near 0 or 999.



I've attempted to solve that by adding index * interval to a snapshot of time, however this solution suffered a problem: The delay is cumulative, so the schedule's time will slowly deviate(fall behind) from the real time:



// dont use this
public static IObservable<DateTime> Create2(TimeSpan interval, IDateTime dateTime)

var snapshot = dateTime.Now();
var offset = interval.Ticks - snapshot.Ticks % interval.Ticks;

return Observable.Interval(interval)
.Delay(TimeSpan.FromTicks(offset))
.Select(x => snapshot.AddTicks(interval.Ticks * (x + 1) + offset));



An better solution would be to provide a range of time, and check if expression fall between the range:



public static IDisposable Schedule2(this IObservable<DateTime> schedules, string cronExpressionString, Action<DateTime> action)

var cronExpression = Regex.Match(cronExpressionString, @"^0/(?<s>d+)");
return
schedules
.Scan(default(TimeRange), (previous, x) => new TimeRange(previous?.End, x))
.Where(cronExpression.Contains)
.Subscribe(action);


public class TimeRange

public DateTime? Start get;
public DateTime End get;

public TimeRange(DateTime? start, DateTime end)

this.Start = start;
this.End = end;







share|improve this answer














public static IObservable<DateTime> Create(TimeSpan interval, IDateTime dateTime)

return
Observable
.Interval(TimeSpan.FromSeconds(1))
.Select(_ => dateTime.Now());




You forgot to plug interval parameter. And, I would consider renaming it resolution.



I wouldn't trust the values from selecting DatetTime::Now or DateTime::UtcNow:



// try running this for a while
var scheduler = Scheduler.Create(TimeSpan.FromSeconds(1), new LocalDateTime());
scheduler.Schedule("0/1 * * * * * *", schedule =>

Console.WriteLine($"ACTION: schedule: schedule:ss.fff");
);



ACTION: 2018-04-13 6:28:29 PM: 29.987
ACTION: 2018-04-13 6:28:30 PM: 30.987
ACTION: 2018-04-13 6:28:32 PM: 32.001
ACTION: 2018-04-13 6:28:33 PM: 33.001



Depending on CronExpression::Contains implementation, the scheduler could be skipping task when DateTime.Now.Millisecond gets near 0 or 999.



I've attempted to solve that by adding index * interval to a snapshot of time, however this solution suffered a problem: The delay is cumulative, so the schedule's time will slowly deviate(fall behind) from the real time:



// dont use this
public static IObservable<DateTime> Create2(TimeSpan interval, IDateTime dateTime)

var snapshot = dateTime.Now();
var offset = interval.Ticks - snapshot.Ticks % interval.Ticks;

return Observable.Interval(interval)
.Delay(TimeSpan.FromTicks(offset))
.Select(x => snapshot.AddTicks(interval.Ticks * (x + 1) + offset));



An better solution would be to provide a range of time, and check if expression fall between the range:



public static IDisposable Schedule2(this IObservable<DateTime> schedules, string cronExpressionString, Action<DateTime> action)

var cronExpression = Regex.Match(cronExpressionString, @"^0/(?<s>d+)");
return
schedules
.Scan(default(TimeRange), (previous, x) => new TimeRange(previous?.End, x))
.Where(cronExpression.Contains)
.Subscribe(action);


public class TimeRange

public DateTime? Start get;
public DateTime End get;

public TimeRange(DateTime? start, DateTime end)

this.Start = start;
this.End = end;








share|improve this answer













share|improve this answer



share|improve this answer











answered Apr 13 at 23:03









Xiaoy312

2,767915




2,767915











  • You forgot to plug interval parameter. oops, refactoring mistake :-] Since the smallest value of a cron-expression is seconds, I think the milliseconds can be ignored... but I see that I have to take care of the missing ones :-o Could you explain the range approach in more detail? I cannot figure this out :-( but you have my vote!
    – t3chb0t
    Apr 14 at 8:00

















  • You forgot to plug interval parameter. oops, refactoring mistake :-] Since the smallest value of a cron-expression is seconds, I think the milliseconds can be ignored... but I see that I have to take care of the missing ones :-o Could you explain the range approach in more detail? I cannot figure this out :-( but you have my vote!
    – t3chb0t
    Apr 14 at 8:00
















You forgot to plug interval parameter. oops, refactoring mistake :-] Since the smallest value of a cron-expression is seconds, I think the milliseconds can be ignored... but I see that I have to take care of the missing ones :-o Could you explain the range approach in more detail? I cannot figure this out :-( but you have my vote!
– t3chb0t
Apr 14 at 8:00





You forgot to plug interval parameter. oops, refactoring mistake :-] Since the smallest value of a cron-expression is seconds, I think the milliseconds can be ignored... but I see that I have to take care of the missing ones :-o Could you explain the range approach in more detail? I cannot figure this out :-( but you have my vote!
– t3chb0t
Apr 14 at 8:00













up vote
1
down vote













Xiaoy312 has a point about the risk of missing seconds when using Interval. It seems that each "tick" of Observable.Interval() waits until the previous returns. So if the OnNext halts the thread a while, the next "tick" is fired too late and you may miss a second or two.



Trying to start each scheduled task on a new thread - for instance via a System.Timers.Timer seems to be a bad idea, since the same Cron job probably can not run concurrently(?).



One way not to lose any seconds using Observable.Interval is as follows:



 Random rand = new Random(5);
IObservable<Timestamped<long>> source = Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp().ObserveOn(NewThreadScheduler.Default);

IDisposable subscription = source.Subscribe(
x =>

Console.WriteLine("OnNext: 0", x);
Thread.Sleep(rand.Next(0, 4001));
Console.WriteLine("After Sleep OnNext: 0", x);
,
ex => Console.WriteLine("OnError: 0", ex.Message),
() => Console.WriteLine("OnCompleted"));


Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();


This seems to ensure no slip in the sequence of timestamps (Second-wise at least), but each elapsed interval may be (cumulatively) delayed according to previous intervals "laziness". In this way you are not guarantied that the job is done on schedule, but you won't miss a job because of "missing" seconds. It seems though that the time pump catches up the delay when no action delay the current elapsed interval.






share|improve this answer























  • This is an interesting approach but unfortunatelly if you constrain the thread, all worker tasks will execute on the same one. If you get the x.Timestamp.DateTime property and format it with milliseconds you'll notice that it suffers from the same inaccuracy as other techniques :-( I've been experimenting with the missing second for some time and I think I might have found how to fix it. I'll post it later but it checks if now is >1s from last. If so, I return two timestamps via SelectMany. It doesn't matter that I return two at the same time, most important is to not loose it.
    – t3chb0t
    Apr 14 at 22:13











  • @t3chb0t: OK, it seems to be two different problems then. As I understand/understood ObserveOn(NewThreadScheduler.Default), each interval should be executed on its own thread, but that is not the case. And it seems not to be the case no matter how you configure Observable.Intervals (?)
    – Henrik Hansen
    Apr 15 at 4:49










  • @t3chb0t: You approach with two timestamps seems to be the same as to have an initial start time, and then just iterate over a long representing the second-offset from that start time.
    – Henrik Hansen
    Apr 15 at 4:55










  • Observers are executed on their threads without any additional measures. See the output in my question. I write the thread id to the console. Two timestamps aren't the same as incrementing a start date. Because with incrementing it'll be at some point permanently inaccurate, with my approach I use Now, and publish an additional timestamp only once in a while if one was missing due to 15ms resolution of the timers. You can see how this grows by adding milliseconds to the output :-(
    – t3chb0t
    Apr 15 at 6:23











  • @t3chb0t: Yes, I understand. If you do some time consuming work in the actions, you'll see a different behavior. About the two timestamp approach: I suppose you'll then execute the cron action twice if they differ and both timestamps trigger them. I won't argue, but look forward to see your solution - it's always interesting to see, what you're up to :-). My long-suggestion was an attempt to solve another tick-slip problem, that may not occur in you real environment.
    – Henrik Hansen
    Apr 15 at 7:58















up vote
1
down vote













Xiaoy312 has a point about the risk of missing seconds when using Interval. It seems that each "tick" of Observable.Interval() waits until the previous returns. So if the OnNext halts the thread a while, the next "tick" is fired too late and you may miss a second or two.



Trying to start each scheduled task on a new thread - for instance via a System.Timers.Timer seems to be a bad idea, since the same Cron job probably can not run concurrently(?).



One way not to lose any seconds using Observable.Interval is as follows:



 Random rand = new Random(5);
IObservable<Timestamped<long>> source = Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp().ObserveOn(NewThreadScheduler.Default);

IDisposable subscription = source.Subscribe(
x =>

Console.WriteLine("OnNext: 0", x);
Thread.Sleep(rand.Next(0, 4001));
Console.WriteLine("After Sleep OnNext: 0", x);
,
ex => Console.WriteLine("OnError: 0", ex.Message),
() => Console.WriteLine("OnCompleted"));


Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();


This seems to ensure no slip in the sequence of timestamps (Second-wise at least), but each elapsed interval may be (cumulatively) delayed according to previous intervals "laziness". In this way you are not guarantied that the job is done on schedule, but you won't miss a job because of "missing" seconds. It seems though that the time pump catches up the delay when no action delay the current elapsed interval.






share|improve this answer























  • This is an interesting approach but unfortunatelly if you constrain the thread, all worker tasks will execute on the same one. If you get the x.Timestamp.DateTime property and format it with milliseconds you'll notice that it suffers from the same inaccuracy as other techniques :-( I've been experimenting with the missing second for some time and I think I might have found how to fix it. I'll post it later but it checks if now is >1s from last. If so, I return two timestamps via SelectMany. It doesn't matter that I return two at the same time, most important is to not loose it.
    – t3chb0t
    Apr 14 at 22:13











  • @t3chb0t: OK, it seems to be two different problems then. As I understand/understood ObserveOn(NewThreadScheduler.Default), each interval should be executed on its own thread, but that is not the case. And it seems not to be the case no matter how you configure Observable.Intervals (?)
    – Henrik Hansen
    Apr 15 at 4:49










  • @t3chb0t: You approach with two timestamps seems to be the same as to have an initial start time, and then just iterate over a long representing the second-offset from that start time.
    – Henrik Hansen
    Apr 15 at 4:55










  • Observers are executed on their threads without any additional measures. See the output in my question. I write the thread id to the console. Two timestamps aren't the same as incrementing a start date. Because with incrementing it'll be at some point permanently inaccurate, with my approach I use Now, and publish an additional timestamp only once in a while if one was missing due to 15ms resolution of the timers. You can see how this grows by adding milliseconds to the output :-(
    – t3chb0t
    Apr 15 at 6:23











  • @t3chb0t: Yes, I understand. If you do some time consuming work in the actions, you'll see a different behavior. About the two timestamp approach: I suppose you'll then execute the cron action twice if they differ and both timestamps trigger them. I won't argue, but look forward to see your solution - it's always interesting to see, what you're up to :-). My long-suggestion was an attempt to solve another tick-slip problem, that may not occur in you real environment.
    – Henrik Hansen
    Apr 15 at 7:58













up vote
1
down vote










up vote
1
down vote









Xiaoy312 has a point about the risk of missing seconds when using Interval. It seems that each "tick" of Observable.Interval() waits until the previous returns. So if the OnNext halts the thread a while, the next "tick" is fired too late and you may miss a second or two.



Trying to start each scheduled task on a new thread - for instance via a System.Timers.Timer seems to be a bad idea, since the same Cron job probably can not run concurrently(?).



One way not to lose any seconds using Observable.Interval is as follows:



 Random rand = new Random(5);
IObservable<Timestamped<long>> source = Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp().ObserveOn(NewThreadScheduler.Default);

IDisposable subscription = source.Subscribe(
x =>

Console.WriteLine("OnNext: 0", x);
Thread.Sleep(rand.Next(0, 4001));
Console.WriteLine("After Sleep OnNext: 0", x);
,
ex => Console.WriteLine("OnError: 0", ex.Message),
() => Console.WriteLine("OnCompleted"));


Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();


This seems to ensure no slip in the sequence of timestamps (Second-wise at least), but each elapsed interval may be (cumulatively) delayed according to previous intervals "laziness". In this way you are not guarantied that the job is done on schedule, but you won't miss a job because of "missing" seconds. It seems though that the time pump catches up the delay when no action delay the current elapsed interval.






share|improve this answer















Xiaoy312 has a point about the risk of missing seconds when using Interval. It seems that each "tick" of Observable.Interval() waits until the previous returns. So if the OnNext halts the thread a while, the next "tick" is fired too late and you may miss a second or two.



Trying to start each scheduled task on a new thread - for instance via a System.Timers.Timer seems to be a bad idea, since the same Cron job probably can not run concurrently(?).



One way not to lose any seconds using Observable.Interval is as follows:



 Random rand = new Random(5);
IObservable<Timestamped<long>> source = Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp().ObserveOn(NewThreadScheduler.Default);

IDisposable subscription = source.Subscribe(
x =>

Console.WriteLine("OnNext: 0", x);
Thread.Sleep(rand.Next(0, 4001));
Console.WriteLine("After Sleep OnNext: 0", x);
,
ex => Console.WriteLine("OnError: 0", ex.Message),
() => Console.WriteLine("OnCompleted"));


Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();


This seems to ensure no slip in the sequence of timestamps (Second-wise at least), but each elapsed interval may be (cumulatively) delayed according to previous intervals "laziness". In this way you are not guarantied that the job is done on schedule, but you won't miss a job because of "missing" seconds. It seems though that the time pump catches up the delay when no action delay the current elapsed interval.







share|improve this answer















share|improve this answer



share|improve this answer








edited Apr 15 at 8:54


























answered Apr 14 at 21:40









Henrik Hansen

3,8481417




3,8481417











  • This is an interesting approach but unfortunatelly if you constrain the thread, all worker tasks will execute on the same one. If you get the x.Timestamp.DateTime property and format it with milliseconds you'll notice that it suffers from the same inaccuracy as other techniques :-( I've been experimenting with the missing second for some time and I think I might have found how to fix it. I'll post it later but it checks if now is >1s from last. If so, I return two timestamps via SelectMany. It doesn't matter that I return two at the same time, most important is to not loose it.
    – t3chb0t
    Apr 14 at 22:13











  • @t3chb0t: OK, it seems to be two different problems then. As I understand/understood ObserveOn(NewThreadScheduler.Default), each interval should be executed on its own thread, but that is not the case. And it seems not to be the case no matter how you configure Observable.Intervals (?)
    – Henrik Hansen
    Apr 15 at 4:49










  • @t3chb0t: You approach with two timestamps seems to be the same as to have an initial start time, and then just iterate over a long representing the second-offset from that start time.
    – Henrik Hansen
    Apr 15 at 4:55










  • Observers are executed on their threads without any additional measures. See the output in my question. I write the thread id to the console. Two timestamps aren't the same as incrementing a start date. Because with incrementing it'll be at some point permanently inaccurate, with my approach I use Now, and publish an additional timestamp only once in a while if one was missing due to 15ms resolution of the timers. You can see how this grows by adding milliseconds to the output :-(
    – t3chb0t
    Apr 15 at 6:23











  • @t3chb0t: Yes, I understand. If you do some time consuming work in the actions, you'll see a different behavior. About the two timestamp approach: I suppose you'll then execute the cron action twice if they differ and both timestamps trigger them. I won't argue, but look forward to see your solution - it's always interesting to see, what you're up to :-). My long-suggestion was an attempt to solve another tick-slip problem, that may not occur in you real environment.
    – Henrik Hansen
    Apr 15 at 7:58

















  • This is an interesting approach but unfortunatelly if you constrain the thread, all worker tasks will execute on the same one. If you get the x.Timestamp.DateTime property and format it with milliseconds you'll notice that it suffers from the same inaccuracy as other techniques :-( I've been experimenting with the missing second for some time and I think I might have found how to fix it. I'll post it later but it checks if now is >1s from last. If so, I return two timestamps via SelectMany. It doesn't matter that I return two at the same time, most important is to not loose it.
    – t3chb0t
    Apr 14 at 22:13











  • @t3chb0t: OK, it seems to be two different problems then. As I understand/understood ObserveOn(NewThreadScheduler.Default), each interval should be executed on its own thread, but that is not the case. And it seems not to be the case no matter how you configure Observable.Intervals (?)
    – Henrik Hansen
    Apr 15 at 4:49










  • @t3chb0t: You approach with two timestamps seems to be the same as to have an initial start time, and then just iterate over a long representing the second-offset from that start time.
    – Henrik Hansen
    Apr 15 at 4:55










  • Observers are executed on their threads without any additional measures. See the output in my question. I write the thread id to the console. Two timestamps aren't the same as incrementing a start date. Because with incrementing it'll be at some point permanently inaccurate, with my approach I use Now, and publish an additional timestamp only once in a while if one was missing due to 15ms resolution of the timers. You can see how this grows by adding milliseconds to the output :-(
    – t3chb0t
    Apr 15 at 6:23











  • @t3chb0t: Yes, I understand. If you do some time consuming work in the actions, you'll see a different behavior. About the two timestamp approach: I suppose you'll then execute the cron action twice if they differ and both timestamps trigger them. I won't argue, but look forward to see your solution - it's always interesting to see, what you're up to :-). My long-suggestion was an attempt to solve another tick-slip problem, that may not occur in you real environment.
    – Henrik Hansen
    Apr 15 at 7:58
















This is an interesting approach but unfortunatelly if you constrain the thread, all worker tasks will execute on the same one. If you get the x.Timestamp.DateTime property and format it with milliseconds you'll notice that it suffers from the same inaccuracy as other techniques :-( I've been experimenting with the missing second for some time and I think I might have found how to fix it. I'll post it later but it checks if now is >1s from last. If so, I return two timestamps via SelectMany. It doesn't matter that I return two at the same time, most important is to not loose it.
– t3chb0t
Apr 14 at 22:13





This is an interesting approach but unfortunatelly if you constrain the thread, all worker tasks will execute on the same one. If you get the x.Timestamp.DateTime property and format it with milliseconds you'll notice that it suffers from the same inaccuracy as other techniques :-( I've been experimenting with the missing second for some time and I think I might have found how to fix it. I'll post it later but it checks if now is >1s from last. If so, I return two timestamps via SelectMany. It doesn't matter that I return two at the same time, most important is to not loose it.
– t3chb0t
Apr 14 at 22:13













@t3chb0t: OK, it seems to be two different problems then. As I understand/understood ObserveOn(NewThreadScheduler.Default), each interval should be executed on its own thread, but that is not the case. And it seems not to be the case no matter how you configure Observable.Intervals (?)
– Henrik Hansen
Apr 15 at 4:49




@t3chb0t: OK, it seems to be two different problems then. As I understand/understood ObserveOn(NewThreadScheduler.Default), each interval should be executed on its own thread, but that is not the case. And it seems not to be the case no matter how you configure Observable.Intervals (?)
– Henrik Hansen
Apr 15 at 4:49












@t3chb0t: You approach with two timestamps seems to be the same as to have an initial start time, and then just iterate over a long representing the second-offset from that start time.
– Henrik Hansen
Apr 15 at 4:55




@t3chb0t: You approach with two timestamps seems to be the same as to have an initial start time, and then just iterate over a long representing the second-offset from that start time.
– Henrik Hansen
Apr 15 at 4:55












Observers are executed on their threads without any additional measures. See the output in my question. I write the thread id to the console. Two timestamps aren't the same as incrementing a start date. Because with incrementing it'll be at some point permanently inaccurate, with my approach I use Now, and publish an additional timestamp only once in a while if one was missing due to 15ms resolution of the timers. You can see how this grows by adding milliseconds to the output :-(
– t3chb0t
Apr 15 at 6:23





Observers are executed on their threads without any additional measures. See the output in my question. I write the thread id to the console. Two timestamps aren't the same as incrementing a start date. Because with incrementing it'll be at some point permanently inaccurate, with my approach I use Now, and publish an additional timestamp only once in a while if one was missing due to 15ms resolution of the timers. You can see how this grows by adding milliseconds to the output :-(
– t3chb0t
Apr 15 at 6:23













@t3chb0t: Yes, I understand. If you do some time consuming work in the actions, you'll see a different behavior. About the two timestamp approach: I suppose you'll then execute the cron action twice if they differ and both timestamps trigger them. I won't argue, but look forward to see your solution - it's always interesting to see, what you're up to :-). My long-suggestion was an attempt to solve another tick-slip problem, that may not occur in you real environment.
– Henrik Hansen
Apr 15 at 7:58





@t3chb0t: Yes, I understand. If you do some time consuming work in the actions, you'll see a different behavior. About the two timestamp approach: I suppose you'll then execute the cron action twice if they differ and both timestamps trigger them. I won't argue, but look forward to see your solution - it's always interesting to see, what you're up to :-). My long-suggestion was an attempt to solve another tick-slip problem, that may not occur in you real environment.
– Henrik Hansen
Apr 15 at 7:58













 

draft saved


draft discarded


























 


draft saved


draft discarded














StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f192006%2fscheduler-built-with-observables%23new-answer', 'question_page');

);

Post as a guest













































































Popular posts from this blog

Greedy Best First Search implementation in Rust

Function to Return a JSON Like Objects Using VBA Collections and Arrays

C++11 CLH Lock Implementation