Scheduler built with observables
I sometimes have to schedule tasks and wanted my own reusable scheduler. Since I like Rx, I built it on top of it.
Follow-up
There is a newer version of the Scheduler.
The Scheduler class is really simple. It's backed by a newer implementation of my old CronExpression.
There are currently only these two methods. The factory method Create creates a new scheduler that ticks at the specified interval and provides schedules as DateTimes to the observers.
Jobs are scheduled with the Schedule extension. This one requires a cron expression and the action to execute.
    public static class Scheduler
    {
        public static IObservable<DateTime> Create(TimeSpan interval, IDateTime dateTime)
        {
            return
                Observable
                    .Interval(TimeSpan.FromSeconds(1))
                    .Select(_ => dateTime.Now());
        }

        public static IDisposable Schedule(this IObservable<DateTime> schedules, string cronExpressionString, Action<DateTime> action)
        {
            var cronExpression = CronExpression.Parse(cronExpressionString);
            return
                schedules
                    .Where(cronExpression.Contains)
                    .Subscribe(action);
        }
    }
The DateTime abstraction is supported by the IDateTime interface:
    public interface IDateTime
    {
        DateTime Now();
    }
which is implemented as
    public class LocalDateTime : IDateTime
    {
        public DateTime Now() => DateTime.Now;
    }
or
    public class UtcDateTime : IDateTime
    {
        public DateTime Now() => DateTime.UtcNow;
    }
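Because Create depends only on IDateTime, a deterministic clock could be swapped in for tests. A minimal sketch, assuming a hypothetical FixedDateTime class (not part of the code above) that returns a preset time and only moves when told to:

```csharp
using System;
using System.Globalization;

// Usage: the clock only advances on request, so results are repeatable.
var clock = new FixedDateTime(new DateTime(2018, 4, 13, 22, 32, 9));
Console.WriteLine(clock.Now().ToString("HH:mm:ss", CultureInfo.InvariantCulture)); // 22:32:09
clock.Advance(TimeSpan.FromSeconds(1));
Console.WriteLine(clock.Now().ToString("HH:mm:ss", CultureInfo.InvariantCulture)); // 22:32:10

// Same interface as in the question, repeated so the sketch compiles on its own.
public interface IDateTime
{
    DateTime Now();
}

// Hypothetical test double: returns a fixed time until Advance is called.
public class FixedDateTime : IDateTime
{
    private DateTime _current;

    public FixedDateTime(DateTime start) => _current = start;

    public DateTime Now() => _current;

    public void Advance(TimeSpan by) => _current = _current.Add(by);
}
```

Passing such an instance to Scheduler.Create would make the emitted timestamps predictable, although the tick timing itself would still come from the Rx timer.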
Example
In order to use it, I just create a scheduler, specify what kind of timestamp it should generate, and schedule some actions:
    var scheduler = Scheduler.Create(TimeSpan.FromSeconds(1), new LocalDateTime());
    scheduler.Schedule("0/1 * * * * * *", schedule =>
    {
        Console.WriteLine($"DEBUG: {schedule} [{Thread.CurrentThread.ManagedThreadId}]");
    });
    scheduler.Schedule("0/5 * * * * * *", schedule =>
    {
        Console.WriteLine($"ACTION: {schedule}");
    });
The output is:
    DEBUG: 13/04/2018 22:32:09 [10]
    DEBUG: 13/04/2018 22:32:10 [12]
    ACTION: 13/04/2018 22:32:10
    DEBUG: 13/04/2018 22:32:11 [10]
    DEBUG: 13/04/2018 22:32:12 [14]
    DEBUG: 13/04/2018 22:32:13 [8]
    DEBUG: 13/04/2018 22:32:14 [12]
    ACTION: 13/04/2018 22:32:15
    DEBUG: 13/04/2018 22:32:15 [8]
    DEBUG: 13/04/2018 22:32:16 [12]
    DEBUG: 13/04/2018 22:32:17 [8]
There seems to be no rocket science here, but that impression might be deceptive. Can or should this scheduler be improved in any way?
c# extension-methods observer-pattern scheduled-tasks
asked Apr 13 at 20:43 by t3chb0t (edited Apr 15 at 9:32)
.Interval(TimeSpan.FromSeconds(1)): you should be using the parameter interval. – Xiaoy312, Apr 13 at 21:53
2 Answers
    public static IObservable<DateTime> Create(TimeSpan interval, IDateTime dateTime)
    {
        return
            Observable
                .Interval(TimeSpan.FromSeconds(1))
                .Select(_ => dateTime.Now());
    }
You forgot to plug in the interval parameter. Also, I would consider renaming it to resolution.
I wouldn't trust the values obtained by selecting DateTime::Now or DateTime::UtcNow:
    // try running this for a while
    var scheduler = Scheduler.Create(TimeSpan.FromSeconds(1), new LocalDateTime());
    scheduler.Schedule("0/1 * * * * * *", schedule =>
    {
        Console.WriteLine($"ACTION: {schedule}: {schedule:ss.fff}");
    });

    ACTION: 2018-04-13 6:28:29 PM: 29.987
    ACTION: 2018-04-13 6:28:30 PM: 30.987
    ACTION: 2018-04-13 6:28:32 PM: 32.001
    ACTION: 2018-04-13 6:28:33 PM: 33.001
Depending on the CronExpression::Contains implementation, the scheduler could skip a task when DateTime.Now.Millisecond gets near 0 or 999. In the output above, for example, no tick carries the second 6:28:31.
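To make the skipped second concrete, here is a small sketch that truncates tick timestamps to whole seconds and lists the seconds that never received a tick. The names TickGaps, TruncateToSecond and MissingSeconds are made up for this illustration; the input is the four timestamps from the output above, with 6:28 PM written as 18:28:

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

// Timestamps taken from the output above.
var observed = new[]
{
    new DateTime(2018, 4, 13, 18, 28, 29, 987),
    new DateTime(2018, 4, 13, 18, 28, 30, 987),
    new DateTime(2018, 4, 13, 18, 28, 32, 1),
    new DateTime(2018, 4, 13, 18, 28, 33, 1),
};

foreach (var missing in TickGaps.MissingSeconds(observed))
{
    Console.WriteLine(missing.ToString("HH:mm:ss", CultureInfo.InvariantCulture)); // 18:28:31
}

public static class TickGaps
{
    // Drops the sub-second part of a timestamp.
    public static DateTime TruncateToSecond(DateTime value) =>
        new DateTime(value.Ticks - value.Ticks % TimeSpan.TicksPerSecond, value.Kind);

    // Returns every whole second between the first and last tick that no tick fell into.
    public static IEnumerable<DateTime> MissingSeconds(IEnumerable<DateTime> timestamps)
    {
        var seen = new HashSet<DateTime>(timestamps.Select(TruncateToSecond));
        if (seen.Count == 0) yield break;

        var last = seen.Max();
        for (var second = seen.Min(); second <= last; second = second.AddSeconds(1))
        {
            if (!seen.Contains(second)) yield return second;
        }
    }
}
```

A cron check that only looks at the timestamp of each tick would never be asked about 18:28:31, so a job due at that second would not run.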
I've attempted to solve that by adding index * interval to a snapshot of the time; however, this solution suffers from a problem: the delay is cumulative, so the schedule's time will slowly deviate (fall behind) from the real time:
    // don't use this
    public static IObservable<DateTime> Create2(TimeSpan interval, IDateTime dateTime)
    {
        var snapshot = dateTime.Now();
        var offset = interval.Ticks - snapshot.Ticks % interval.Ticks;
        return Observable.Interval(interval)
            .Delay(TimeSpan.FromTicks(offset))
            .Select(x => snapshot.AddTicks(interval.Ticks * (x + 1) + offset));
    }
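The offset calculation in Create2 rounds the snapshot up to the next multiple of the interval. A worked sketch of just that arithmetic, with a helper name (OffsetToNextBoundary) that is mine and not part of the code above:

```csharp
using System;

var snapshot = new DateTime(2018, 4, 13, 22, 32, 9, 250);
var interval = TimeSpan.FromSeconds(1);

var offset = Alignment.OffsetToNextBoundary(snapshot, interval);
Console.WriteLine(offset.TotalMilliseconds);         // 750
Console.WriteLine(snapshot.Add(offset).Millisecond); // 0

public static class Alignment
{
    // Same formula as in Create2: the time left until the next multiple of interval.
    // A snapshot that is exactly on a boundary yields a full interval, not zero.
    public static TimeSpan OffsetToNextBoundary(DateTime snapshot, TimeSpan interval) =>
        TimeSpan.FromTicks(interval.Ticks - snapshot.Ticks % interval.Ticks);
}
```

The computed timestamps are therefore exact multiples of the interval, but nothing ties them to when each tick is really delivered, which is where the drift comes from.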
A better solution would be to provide a range of time and check whether the expression falls within that range:
    public static IDisposable Schedule2(this IObservable<DateTime> schedules, string cronExpressionString, Action<DateTime> action)
    {
        // Sketch only, does not compile as-is: a Contains(TimeRange) check and
        // mapping the range back to a DateTime for the action are left out.
        var cronExpression = Regex.Match(cronExpressionString, @"^0/(?<s>\d+)");
        return
            schedules
                .Scan(default(TimeRange), (previous, x) => new TimeRange(previous?.End, x))
                .Where(cronExpression.Contains)
                .Subscribe(action);
    }
    public class TimeRange
    {
        public DateTime? Start { get; }
        public DateTime End { get; }
        public TimeRange(DateTime? start, DateTime end)
        {
            this.Start = start;
            this.End = end;
        }
    }
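To spell out the range idea: instead of asking whether the current tick's timestamp matches, each tick asks whether any whole second since the previous tick matches, so a tick that arrives late still covers the second it skipped. A minimal sketch under assumptions: the "every n seconds" rule stands in for the real CronExpression, and RangeMatcher with its members are hypothetical names, not part of any library:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Ticks at :30.987 and :32.001; second :31 is never sampled directly.
var previous = new DateTime(2018, 4, 13, 18, 28, 30, 987);
var current = new DateTime(2018, 4, 13, 18, 28, 32, 1);

foreach (var due in RangeMatcher.DueBetween(previous, current, everySeconds: 1))
{
    Console.WriteLine(due.Second); // prints 31, then 32
}

public static class RangeMatcher
{
    // Yields every whole second s with start < s <= end.
    public static IEnumerable<DateTime> WholeSecondsIn(DateTime start, DateTime end)
    {
        // First whole second strictly after start.
        var truncated = new DateTime(start.Ticks - start.Ticks % TimeSpan.TicksPerSecond, start.Kind);
        for (var second = truncated.AddSeconds(1); second <= end; second = second.AddSeconds(1))
        {
            yield return second;
        }
    }

    // Stand-in for a cron check: matches seconds divisible by everySeconds ("0/n").
    public static IEnumerable<DateTime> DueBetween(DateTime start, DateTime end, int everySeconds) =>
        WholeSecondsIn(start, end).Where(s => s.Second % everySeconds == 0);
}
```

The range is half-open on the left so that a second which coincides with the previous tick is not reported twice. In the Rx pipeline the (previous, current) pair would come from the Scan step, and the due times could be flattened with SelectMany before Subscribe; that wiring is left out because it depends on how CronExpression exposes its matching.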
answered Apr 13 at 23:03 by Xiaoy312
"You forgot to plug interval parameter." Oops, refactoring mistake :-] Since the smallest unit of a cron expression is seconds, I think the milliseconds can be ignored... but I see that I have to take care of the missing ones :-o Could you explain the range approach in more detail? I cannot figure this out :-( but you have my vote! – t3chb0t, Apr 14 at 8:00
Xiaoy312 has a point about the risk of missing seconds when using Interval. It seems that each "tick" of Observable.Interval() waits until the previous one returns. So if OnNext blocks the thread for a while, the next "tick" fires too late and you may miss a second or two.
Starting each scheduled task on a new thread, for instance via a System.Timers.Timer, seems to be a bad idea, since the same cron job probably cannot run concurrently(?).
One way not to lose any seconds using Observable.Interval is as follows:
    Random rand = new Random(5);
    IObservable<Timestamped<long>> source = Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp().ObserveOn(NewThreadScheduler.Default);
    IDisposable subscription = source.Subscribe(
        x =>
        {
            Console.WriteLine("OnNext: {0}", x);
            Thread.Sleep(rand.Next(0, 4001));
            Console.WriteLine("After Sleep OnNext: {0}", x);
        },
        ex => Console.WriteLine("OnError: {0}", ex.Message),
        () => Console.WriteLine("OnCompleted"));
    Console.WriteLine("Press ENTER to unsubscribe...");
    Console.ReadLine();
    subscription.Dispose();
This seems to ensure no slip in the sequence of timestamps (second-wise at least), but each elapsed interval may be (cumulatively) delayed by the "laziness" of the previous intervals. In this way you are not guaranteed that the job is done on schedule, but you won't miss a job because of "missing" seconds. It seems, though, that the time pump catches up on the delay when no action delays the current elapsed interval.
This is an interesting approach, but unfortunately if you constrain the thread, all worker tasks will execute on the same one. If you get the x.Timestamp.DateTime property and format it with milliseconds, you'll notice that it suffers from the same inaccuracy as the other techniques :-( I've been experimenting with the missing second for some time and I think I might have found how to fix it. I'll post it later, but it checks whether now is > 1s from last. If so, I return two timestamps via SelectMany. It doesn't matter that I return two at the same time; the most important thing is not to lose one. – t3chb0t, Apr 14 at 22:13
@t3chb0t: OK, it seems to be two different problems then. As I understand/understood ObserveOn(NewThreadScheduler.Default), each interval should be executed on its own thread, but that is not the case. And it seems not to be the case no matter how you configure Observable.Interval (?) – Henrik Hansen, Apr 15 at 4:49
@t3chb0t: Your approach with two timestamps seems to be the same as having an initial start time and then just iterating over a long representing the second-offset from that start time. – Henrik Hansen, Apr 15 at 4:55
Observers are executed on their own threads without any additional measures. See the output in my question: I write the thread id to the console. Two timestamps aren't the same as incrementing a start date, because incrementing will at some point become permanently inaccurate. With my approach I use Now and publish an additional timestamp only once in a while, if one was missed due to the 15ms resolution of the timers. You can see how this grows by adding milliseconds to the output :-( – t3chb0t, Apr 15 at 6:23
@t3chb0t: Yes, I understand. If you do some time-consuming work in the actions, you'll see a different behavior. About the two-timestamp approach: I suppose you'll then execute the cron action twice if they differ and both timestamps trigger it. I won't argue, but I look forward to seeing your solution - it's always interesting to see what you're up to :-). My long suggestion was an attempt to solve another tick-slip problem that may not occur in your real environment. – Henrik Hansen, Apr 15 at 7:58
add a comment |Â
up vote
1
down vote
Xiaoy312 has a point about the risk of missing seconds when using Interval
. It seems that each "tick" of Observable.Interval()
waits until the previous returns. So if the OnNext halts the thread a while, the next "tick" is fired too late and you may miss a second or two.
Trying to start each scheduled task on a new thread - for instance via a System.Timers.Timer
seems to be a bad idea, since the same Cron job probably can not run concurrently(?).
One way not to lose any seconds using Observable.Interval
is as follows:
Random rand = new Random(5);
IObservable<Timestamped<long>> source = Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp().ObserveOn(NewThreadScheduler.Default);
IDisposable subscription = source.Subscribe(
x =>
Console.WriteLine("OnNext: 0", x);
Thread.Sleep(rand.Next(0, 4001));
Console.WriteLine("After Sleep OnNext: 0", x);
,
ex => Console.WriteLine("OnError: 0", ex.Message),
() => Console.WriteLine("OnCompleted"));
Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();
This seems to ensure no slip in the sequence of timestamps (Second-wise at least), but each elapsed interval may be (cumulatively) delayed according to previous intervals "laziness". In this way you are not guarantied that the job is done on schedule, but you won't miss a job because of "missing" seconds. It seems though that the time pump catches up the delay when no action delay the current elapsed interval.
edited Apr 15 at 8:54
answered Apr 14 at 21:40
Henrik Hansen
This is an interesting approach, but unfortunately if you constrain the thread, all worker tasks will execute on the same one. If you get the x.Timestamp.DateTime property and format it with milliseconds, you'll notice that it suffers from the same inaccuracy as the other techniques :-( I've been experimenting with the missing second for some time and I think I might have found a way to fix it. I'll post it later, but it checks whether now is more than 1s from the last timestamp. If so, I return two timestamps via SelectMany. It doesn't matter that I return two at the same time; the most important thing is not to lose one.
– t3chb0t
Apr 14 at 22:13
@t3chb0t: OK, it seems to be two different problems then. As I understand (or understood) ObserveOn(NewThreadScheduler.Default), each interval should be executed on its own thread, but that is not the case. And it seems not to be the case no matter how you configure Observable.Interval (?)
– Henrik Hansen
Apr 15 at 4:49
@t3chb0t: Your approach with two timestamps seems to be the same as having an initial start time and then just iterating over a long representing the second-offset from that start time.
– Henrik Hansen
Apr 15 at 4:55
Observers are executed on their own threads without any additional measures. See the output in my question: I write the thread id to the console. Two timestamps aren't the same as incrementing a start date, because with incrementing it will at some point be permanently inaccurate. With my approach I use Now and publish an additional timestamp only once in a while, if one was missed due to the 15ms resolution of the timers. You can see how this grows by adding milliseconds to the output :-(
– t3chb0t
Apr 15 at 6:23
@t3chb0t: Yes, I understand. If you do some time-consuming work in the actions, you'll see a different behavior. About the two-timestamp approach: I suppose you'll then execute the cron action twice if they differ and both timestamps trigger it. I won't argue, but I look forward to seeing your solution; it's always interesting to see what you're up to :-). My long suggestion was an attempt to solve another tick-slip problem, one that may not occur in your real environment.
– Henrik Hansen
Apr 15 at 7:58
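The two-timestamp idea from the comments above was never posted in this thread, so the following is only a rough sketch of one possible reading of it, not t3chb0t's actual code. The class name TickCatchUp and the method SecondsBetween are hypothetical. Given the last published timestamp and the current time, it returns every whole second that has passed in between, so a tick that arrives slightly late yields two timestamps instead of silently skipping one.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the original scheduler.
public static class TickCatchUp
{
    // Returns one timestamp for every whole second after 'last' up to and including 'now'.
    public static IEnumerable<DateTime> SecondsBetween(DateTime last, DateTime now)
    {
        // Truncate to whole seconds so that millisecond jitter is ignored.
        var next = Truncate(last).AddSeconds(1);
        var end = Truncate(now);
        for (; next <= end; next = next.AddSeconds(1))
        {
            yield return next;
        }
    }

    private static DateTime Truncate(DateTime value) =>
        new DateTime(value.Ticks - value.Ticks % TimeSpan.TicksPerSecond, value.Kind);
}
```

Under these assumptions, a tick at 22:32:09.990 followed by one at 22:32:11.005 would produce both 22:32:10 and 22:32:11. In an Rx pipeline this could presumably sit inside a SelectMany that remembers the previous value. Note that, unlike the approach described in the comments, this sketch publishes truncated seconds rather than raw Now values.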
In .Interval(TimeSpan.FromSeconds(1)) you should be using the parameter interval.
– Xiaoy312
Apr 13 at 21:53
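A minimal sketch of what that comment suggests, assuming the System.Reactive NuGet package is referenced: Create passes its interval argument on to Observable.Interval instead of the hard-coded one second. The Scheduler, IDateTime and UtcDateTime types are taken from the question; everything else is unchanged.

```csharp
using System;
using System.Reactive.Linq; // assumption: the System.Reactive NuGet package is installed

public interface IDateTime
{
    DateTime Now();
}

public class UtcDateTime : IDateTime
{
    public DateTime Now() => DateTime.UtcNow;
}

public static class Scheduler
{
    public static IObservable<DateTime> Create(TimeSpan interval, IDateTime dateTime)
    {
        return Observable
            .Interval(interval)            // use the caller's interval, not a fixed second
            .Select(_ => dateTime.Now());  // map each tick to the current timestamp
    }
}
```

With this change, Scheduler.Create(TimeSpan.FromSeconds(1), new LocalDateTime()) from the question behaves as before, while other intervals are actually honoured.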