Scheduling using System.IObservable

Clash Royale CLAN TAG#URR8PPP
.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty margin-bottom:0;
up vote
4
down vote
favorite
With great inspiration from t3chb0ts ongoing work about scheduling, cron jobs etc. (last post here), I decided to make my own version of the Scheduler - using the observer pattern - as a learning project.
The scheduler
public class HScheduler
public const int UnlimitedJobParallelism = -1;
HSecondCounter m_counter;
ConcurrentDictionary<IScheduleSubscriber, int> m_parallelCounters = new ConcurrentDictionary<IScheduleSubscriber, int>();
public void Start()
m_counter = HSecondCounter.CountAsync();
public IDisposable Subscribe(IScheduleSubscriber subscriber)
IDisposable handle = m_counter
.Where(t => subscriber.ShouldRun(t) && CanRun(subscriber))
.Subscribe((time) =>
try
subscriber.Action(time);
DecrementSubscriber(subscriber);
catch (Exception ex)
// TODO: something should be done here (logging, notifying...)
Console.WriteLine(ex.Message);
,
(ex) =>
// TODO: something should be done here (logging, notifying...)
Console.WriteLine(ex.Message);
,
() =>
Console.WriteLine($"subscriber.Name Completed");
);
return Disposable.Create(() =>
handle.Dispose();
RemoveSubscriber(subscriber);
);
private void RemoveSubscriber(IScheduleSubscriber subscriber)
m_parallelCounters.TryRemove(subscriber, out _);
private void DecrementSubscriber(IScheduleSubscriber subscriber)
if (subscriber.DegreeOfParallelism != UnlimitedJobParallelism)
m_parallelCounters.AddOrUpdate(subscriber, 0, (scr, c) => c - 1);
private bool CanRun(IScheduleSubscriber subscriber)
if (subscriber.DegreeOfParallelism == UnlimitedJobParallelism) return true;
int value = m_parallelCounters.GetOrAdd(subscriber, 0);
bool result = value < subscriber.DegreeOfParallelism;
if (result)
m_parallelCounters.AddOrUpdate(subscriber, 1, (scr, c) => c + 1);
return result;
internal void Stop()
m_counter.Stop();
This is at large following t3chb0ts pattern, but is not dedicated to cron jobs. Instead of the use of Observable.Interval, I decided to make my own "time pump" which resulted in the HSecondCounter.
HSecondCounter
HSecondCounter is an implementation of the IObservable<DateTime> interface and it is locked to ticking each second. It is an attempt to solve the problem that Xiaoy312 describes here. The solution for now is to check the current time for every half second but only dispatch ticks if the second component of the current time has changed since last tick. It seems to work, but I'm sure someone can shoot it down?
public class HSecondCounter : IObservable<DateTime>
{
static public HSecondCounter Count()
HSecondCounter counter = new HSecondCounter();
Task.Factory.StartNew(() =>
counter.Run((now) =>
lock (counter.m_observers)
foreach (var observer in counter.m_observers)
observer.OnNext(now);
Console.WriteLine("HSecondCounter: 0", now);
);
, TaskCreationOptions.LongRunning);
return counter;
static public HSecondCounter CountAsync()
HSecondCounter counter = new HSecondCounter();
Task.Factory.StartNew(() =>
counter.Run((now) =>
lock (counter.m_observers)
foreach (var observer in counter.m_observers)
Task.Factory.StartNew(() =>
observer.OnNext(now);
);
Console.WriteLine("HSecondCounter: 0", now);
);
, TaskCreationOptions.LongRunning);
return counter;
List<IObserver<DateTime>> m_observers = new List<IObserver<DateTime>>();
volatile bool m_doContinue = true;
private HSecondCounter()
public void Stop()
m_doContinue = false;
public IDisposable Subscribe(IObserver<DateTime> observer)
lock (m_observers)
if (!m_observers.Contains(observer))
m_observers.Add(observer);
return Disposable.Create(() =>
lock (m_observers)
m_observers.Remove(observer);
observer.OnCompleted();
);
async private void Run(Action<DateTime> notifier)
try
int lastSecond = 0;
while (m_doContinue)
DateTime now = DateTime.Now;
if (now.Second != lastSecond)
notifier(now);
lastSecond = now.Second;
await Task.Delay(500);
catch (Exception ex)
lock (m_observers)
foreach (var observer in m_observers.ToArray())
observer.OnError(ex);
finally
lock (m_observers)
foreach (var observer in m_observers)
observer.OnCompleted();
Console.WriteLine($"HSceondCounter ended at: DateTime.Now");
It can be run "normally" by calling HSceondCounter.Count() or async by calling HSceondCounter.CountAsync(). Running normally means that the next notification (tick) is not sent before the previous returns. This is how Observable.Interval(...) works. Async means that the notification to each observer is sent independently. There are pros and cons in both cases(?). At the outer level they are both awaitable, so maybe the naming is somewhat misleading?
IScheduleSubscriber
The contract for subscribers to the HScheduler.
public interface IScheduleSubscriber
string Name get;
Action<DateTime> Action get;
bool ShouldRun(DateTime time);
int DegreeOfParallelism get;
ScheduleSubscriber
A test implementation of IScheduleSubscriber:
public class ScheduleSubscriber : IScheduleSubscriber
public string Name get; set;
public Action<DateTime> Action get; set;
public Func<DateTime, bool> ShouldRunPredicate get; set;
public int DegreeOfParallelism get; set;
public bool ShouldRun(DateTime time)
return ShouldRunPredicate(time);
public override string ToString()
return Name;
Test Case
void TestSchedule()
HScheduler scheduler = new HScheduler();
scheduler.Start();
ScheduleSubscriber subscriber1 = new ScheduleSubscriber
Name = "AAAA",
DegreeOfParallelism = HScheduler.UnlimitedJobParallelism,
Action = (value) =>
Console.WriteLine($"AAAA: value - value.Millisecond - Thread Id: Thread.CurrentThread.ManagedThreadId");
Thread.Sleep(5000);
Console.WriteLine("AAAA Finished");
,
ShouldRunPredicate = (time) =>
return time.Second % 2 == 0;
;
ScheduleSubscriber subscriber2 = new ScheduleSubscriber
Name = "BBBB",
DegreeOfParallelism = 1,
Action = (value) =>
Console.WriteLine($"BBBB: value - value.Millisecond - Thread Id: Thread.CurrentThread.ManagedThreadId");
Thread.Sleep(3000);
Console.WriteLine("BBBB Finished");
,
ShouldRunPredicate = (time) =>
return time.Second % 5 == 0;
;
using (IDisposable scheduledJob1 = scheduler.Subscribe(subscriber1))
using (IDisposable scheduledJob2 = scheduler.Subscribe(subscriber2))
Console.ReadLine();
scheduler.Stop();
Console.ReadLine();
c# timer async-await observer-pattern scheduled-tasks
add a comment |Â
up vote
4
down vote
favorite
With great inspiration from t3chb0ts ongoing work about scheduling, cron jobs etc. (last post here), I decided to make my own version of the Scheduler - using the observer pattern - as a learning project.
The scheduler
public class HScheduler
public const int UnlimitedJobParallelism = -1;
HSecondCounter m_counter;
ConcurrentDictionary<IScheduleSubscriber, int> m_parallelCounters = new ConcurrentDictionary<IScheduleSubscriber, int>();
public void Start()
m_counter = HSecondCounter.CountAsync();
public IDisposable Subscribe(IScheduleSubscriber subscriber)
IDisposable handle = m_counter
.Where(t => subscriber.ShouldRun(t) && CanRun(subscriber))
.Subscribe((time) =>
try
subscriber.Action(time);
DecrementSubscriber(subscriber);
catch (Exception ex)
// TODO: something should be done here (logging, notifying...)
Console.WriteLine(ex.Message);
,
(ex) =>
// TODO: something should be done here (logging, notifying...)
Console.WriteLine(ex.Message);
,
() =>
Console.WriteLine($"subscriber.Name Completed");
);
return Disposable.Create(() =>
handle.Dispose();
RemoveSubscriber(subscriber);
);
private void RemoveSubscriber(IScheduleSubscriber subscriber)
m_parallelCounters.TryRemove(subscriber, out _);
private void DecrementSubscriber(IScheduleSubscriber subscriber)
if (subscriber.DegreeOfParallelism != UnlimitedJobParallelism)
m_parallelCounters.AddOrUpdate(subscriber, 0, (scr, c) => c - 1);
private bool CanRun(IScheduleSubscriber subscriber)
if (subscriber.DegreeOfParallelism == UnlimitedJobParallelism) return true;
int value = m_parallelCounters.GetOrAdd(subscriber, 0);
bool result = value < subscriber.DegreeOfParallelism;
if (result)
m_parallelCounters.AddOrUpdate(subscriber, 1, (scr, c) => c + 1);
return result;
internal void Stop()
m_counter.Stop();
This is at large following t3chb0ts pattern, but is not dedicated to cron jobs. Instead of the use of Observable.Interval, I decided to make my own "time pump" which resulted in the HSecondCounter.
HSecondCounter
HSecondCounter is an implementation of the IObservable<DateTime> interface and it is locked to ticking each second. It is an attempt to solve the problem that Xiaoy312 describes here. The solution for now is to check the current time for every half second but only dispatch ticks if the second component of the current time has changed since last tick. It seems to work, but I'm sure someone can shoot it down?
public class HSecondCounter : IObservable<DateTime>
{
static public HSecondCounter Count()
HSecondCounter counter = new HSecondCounter();
Task.Factory.StartNew(() =>
counter.Run((now) =>
lock (counter.m_observers)
foreach (var observer in counter.m_observers)
observer.OnNext(now);
Console.WriteLine("HSecondCounter: 0", now);
);
, TaskCreationOptions.LongRunning);
return counter;
static public HSecondCounter CountAsync()
HSecondCounter counter = new HSecondCounter();
Task.Factory.StartNew(() =>
counter.Run((now) =>
lock (counter.m_observers)
foreach (var observer in counter.m_observers)
Task.Factory.StartNew(() =>
observer.OnNext(now);
);
Console.WriteLine("HSecondCounter: 0", now);
);
, TaskCreationOptions.LongRunning);
return counter;
List<IObserver<DateTime>> m_observers = new List<IObserver<DateTime>>();
volatile bool m_doContinue = true;
private HSecondCounter()
public void Stop()
m_doContinue = false;
public IDisposable Subscribe(IObserver<DateTime> observer)
lock (m_observers)
if (!m_observers.Contains(observer))
m_observers.Add(observer);
return Disposable.Create(() =>
lock (m_observers)
m_observers.Remove(observer);
observer.OnCompleted();
);
async private void Run(Action<DateTime> notifier)
try
int lastSecond = 0;
while (m_doContinue)
DateTime now = DateTime.Now;
if (now.Second != lastSecond)
notifier(now);
lastSecond = now.Second;
await Task.Delay(500);
catch (Exception ex)
lock (m_observers)
foreach (var observer in m_observers.ToArray())
observer.OnError(ex);
finally
lock (m_observers)
foreach (var observer in m_observers)
observer.OnCompleted();
Console.WriteLine($"HSceondCounter ended at: DateTime.Now");
It can be run "normally" by calling HSceondCounter.Count() or async by calling HSceondCounter.CountAsync(). Running normally means that the next notification (tick) is not sent before the previous returns. This is how Observable.Interval(...) works. Async means that the notification to each observer is sent independently. There are pros and cons in both cases(?). At the outer level they are both awaitable, so maybe the naming is somewhat misleading?
IScheduleSubscriber
The contract for subscribers to the HScheduler.
public interface IScheduleSubscriber
string Name get;
Action<DateTime> Action get;
bool ShouldRun(DateTime time);
int DegreeOfParallelism get;
ScheduleSubscriber
A test implementation of IScheduleSubscriber:
public class ScheduleSubscriber : IScheduleSubscriber
public string Name get; set;
public Action<DateTime> Action get; set;
public Func<DateTime, bool> ShouldRunPredicate get; set;
public int DegreeOfParallelism get; set;
public bool ShouldRun(DateTime time)
return ShouldRunPredicate(time);
public override string ToString()
return Name;
Test Case
void TestSchedule()
HScheduler scheduler = new HScheduler();
scheduler.Start();
ScheduleSubscriber subscriber1 = new ScheduleSubscriber
Name = "AAAA",
DegreeOfParallelism = HScheduler.UnlimitedJobParallelism,
Action = (value) =>
Console.WriteLine($"AAAA: value - value.Millisecond - Thread Id: Thread.CurrentThread.ManagedThreadId");
Thread.Sleep(5000);
Console.WriteLine("AAAA Finished");
,
ShouldRunPredicate = (time) =>
return time.Second % 2 == 0;
;
ScheduleSubscriber subscriber2 = new ScheduleSubscriber
Name = "BBBB",
DegreeOfParallelism = 1,
Action = (value) =>
Console.WriteLine($"BBBB: value - value.Millisecond - Thread Id: Thread.CurrentThread.ManagedThreadId");
Thread.Sleep(3000);
Console.WriteLine("BBBB Finished");
,
ShouldRunPredicate = (time) =>
return time.Second % 5 == 0;
;
using (IDisposable scheduledJob1 = scheduler.Subscribe(subscriber1))
using (IDisposable scheduledJob2 = scheduler.Subscribe(subscriber2))
Console.ReadLine();
scheduler.Stop();
Console.ReadLine();
c# timer async-await observer-pattern scheduled-tasks
I like the idea with the encapsulatedHSecondCounter. I needed quite a while to figure it out but if get this correctly, you let the timer tick a little bit faster to prevent the missing second withawait Task.Delay(500);, right?
â t3chb0t
Apr 16 at 10:45
@t3chb0t: Yes that's the idea. I discovered that when stepping with 500 ms there is oneDateTime.Nowwith the "missing" second when the milliseconds roundtrips to 000 else there are two. I have had it running for a while without missing seconds - but who knows :-)
â Henrik Hansen
Apr 16 at 10:55
add a comment |Â
up vote
4
down vote
favorite
up vote
4
down vote
favorite
With great inspiration from t3chb0ts ongoing work about scheduling, cron jobs etc. (last post here), I decided to make my own version of the Scheduler - using the observer pattern - as a learning project.
The scheduler
public class HScheduler
public const int UnlimitedJobParallelism = -1;
HSecondCounter m_counter;
ConcurrentDictionary<IScheduleSubscriber, int> m_parallelCounters = new ConcurrentDictionary<IScheduleSubscriber, int>();
public void Start()
m_counter = HSecondCounter.CountAsync();
public IDisposable Subscribe(IScheduleSubscriber subscriber)
IDisposable handle = m_counter
.Where(t => subscriber.ShouldRun(t) && CanRun(subscriber))
.Subscribe((time) =>
try
subscriber.Action(time);
DecrementSubscriber(subscriber);
catch (Exception ex)
// TODO: something should be done here (logging, notifying...)
Console.WriteLine(ex.Message);
,
(ex) =>
// TODO: something should be done here (logging, notifying...)
Console.WriteLine(ex.Message);
,
() =>
Console.WriteLine($"subscriber.Name Completed");
);
return Disposable.Create(() =>
handle.Dispose();
RemoveSubscriber(subscriber);
);
private void RemoveSubscriber(IScheduleSubscriber subscriber)
m_parallelCounters.TryRemove(subscriber, out _);
private void DecrementSubscriber(IScheduleSubscriber subscriber)
if (subscriber.DegreeOfParallelism != UnlimitedJobParallelism)
m_parallelCounters.AddOrUpdate(subscriber, 0, (scr, c) => c - 1);
private bool CanRun(IScheduleSubscriber subscriber)
if (subscriber.DegreeOfParallelism == UnlimitedJobParallelism) return true;
int value = m_parallelCounters.GetOrAdd(subscriber, 0);
bool result = value < subscriber.DegreeOfParallelism;
if (result)
m_parallelCounters.AddOrUpdate(subscriber, 1, (scr, c) => c + 1);
return result;
internal void Stop()
m_counter.Stop();
This is at large following t3chb0ts pattern, but is not dedicated to cron jobs. Instead of the use of Observable.Interval, I decided to make my own "time pump" which resulted in the HSecondCounter.
HSecondCounter
HSecondCounter is an implementation of the IObservable<DateTime> interface and it is locked to ticking each second. It is an attempt to solve the problem that Xiaoy312 describes here. The solution for now is to check the current time for every half second but only dispatch ticks if the second component of the current time has changed since last tick. It seems to work, but I'm sure someone can shoot it down?
public class HSecondCounter : IObservable<DateTime>
{
static public HSecondCounter Count()
HSecondCounter counter = new HSecondCounter();
Task.Factory.StartNew(() =>
counter.Run((now) =>
lock (counter.m_observers)
foreach (var observer in counter.m_observers)
observer.OnNext(now);
Console.WriteLine("HSecondCounter: 0", now);
);
, TaskCreationOptions.LongRunning);
return counter;
static public HSecondCounter CountAsync()
HSecondCounter counter = new HSecondCounter();
Task.Factory.StartNew(() =>
counter.Run((now) =>
lock (counter.m_observers)
foreach (var observer in counter.m_observers)
Task.Factory.StartNew(() =>
observer.OnNext(now);
);
Console.WriteLine("HSecondCounter: 0", now);
);
, TaskCreationOptions.LongRunning);
return counter;
List<IObserver<DateTime>> m_observers = new List<IObserver<DateTime>>();
volatile bool m_doContinue = true;
private HSecondCounter()
public void Stop()
m_doContinue = false;
public IDisposable Subscribe(IObserver<DateTime> observer)
lock (m_observers)
if (!m_observers.Contains(observer))
m_observers.Add(observer);
return Disposable.Create(() =>
lock (m_observers)
m_observers.Remove(observer);
observer.OnCompleted();
);
async private void Run(Action<DateTime> notifier)
try
int lastSecond = 0;
while (m_doContinue)
DateTime now = DateTime.Now;
if (now.Second != lastSecond)
notifier(now);
lastSecond = now.Second;
await Task.Delay(500);
catch (Exception ex)
lock (m_observers)
foreach (var observer in m_observers.ToArray())
observer.OnError(ex);
finally
lock (m_observers)
foreach (var observer in m_observers)
observer.OnCompleted();
Console.WriteLine($"HSceondCounter ended at: DateTime.Now");
It can be run "normally" by calling HSceondCounter.Count() or async by calling HSceondCounter.CountAsync(). Running normally means that the next notification (tick) is not sent before the previous returns. This is how Observable.Interval(...) works. Async means that the notification to each observer is sent independently. There are pros and cons in both cases(?). At the outer level they are both awaitable, so maybe the naming is somewhat misleading?
IScheduleSubscriber
The contract for subscribers to the HScheduler.
public interface IScheduleSubscriber
string Name get;
Action<DateTime> Action get;
bool ShouldRun(DateTime time);
int DegreeOfParallelism get;
ScheduleSubscriber
A test implementation of IScheduleSubscriber:
public class ScheduleSubscriber : IScheduleSubscriber
public string Name get; set;
public Action<DateTime> Action get; set;
public Func<DateTime, bool> ShouldRunPredicate get; set;
public int DegreeOfParallelism get; set;
public bool ShouldRun(DateTime time)
return ShouldRunPredicate(time);
public override string ToString()
return Name;
Test Case
void TestSchedule()
HScheduler scheduler = new HScheduler();
scheduler.Start();
ScheduleSubscriber subscriber1 = new ScheduleSubscriber
Name = "AAAA",
DegreeOfParallelism = HScheduler.UnlimitedJobParallelism,
Action = (value) =>
Console.WriteLine($"AAAA: value - value.Millisecond - Thread Id: Thread.CurrentThread.ManagedThreadId");
Thread.Sleep(5000);
Console.WriteLine("AAAA Finished");
,
ShouldRunPredicate = (time) =>
return time.Second % 2 == 0;
;
ScheduleSubscriber subscriber2 = new ScheduleSubscriber
Name = "BBBB",
DegreeOfParallelism = 1,
Action = (value) =>
Console.WriteLine($"BBBB: value - value.Millisecond - Thread Id: Thread.CurrentThread.ManagedThreadId");
Thread.Sleep(3000);
Console.WriteLine("BBBB Finished");
,
ShouldRunPredicate = (time) =>
return time.Second % 5 == 0;
;
using (IDisposable scheduledJob1 = scheduler.Subscribe(subscriber1))
using (IDisposable scheduledJob2 = scheduler.Subscribe(subscriber2))
Console.ReadLine();
scheduler.Stop();
Console.ReadLine();
c# timer async-await observer-pattern scheduled-tasks
With great inspiration from t3chb0ts ongoing work about scheduling, cron jobs etc. (last post here), I decided to make my own version of the Scheduler - using the observer pattern - as a learning project.
The scheduler
public class HScheduler
public const int UnlimitedJobParallelism = -1;
HSecondCounter m_counter;
ConcurrentDictionary<IScheduleSubscriber, int> m_parallelCounters = new ConcurrentDictionary<IScheduleSubscriber, int>();
public void Start()
m_counter = HSecondCounter.CountAsync();
public IDisposable Subscribe(IScheduleSubscriber subscriber)
IDisposable handle = m_counter
.Where(t => subscriber.ShouldRun(t) && CanRun(subscriber))
.Subscribe((time) =>
try
subscriber.Action(time);
DecrementSubscriber(subscriber);
catch (Exception ex)
// TODO: something should be done here (logging, notifying...)
Console.WriteLine(ex.Message);
,
(ex) =>
// TODO: something should be done here (logging, notifying...)
Console.WriteLine(ex.Message);
,
() =>
Console.WriteLine($"subscriber.Name Completed");
);
return Disposable.Create(() =>
handle.Dispose();
RemoveSubscriber(subscriber);
);
private void RemoveSubscriber(IScheduleSubscriber subscriber)
m_parallelCounters.TryRemove(subscriber, out _);
private void DecrementSubscriber(IScheduleSubscriber subscriber)
if (subscriber.DegreeOfParallelism != UnlimitedJobParallelism)
m_parallelCounters.AddOrUpdate(subscriber, 0, (scr, c) => c - 1);
private bool CanRun(IScheduleSubscriber subscriber)
if (subscriber.DegreeOfParallelism == UnlimitedJobParallelism) return true;
int value = m_parallelCounters.GetOrAdd(subscriber, 0);
bool result = value < subscriber.DegreeOfParallelism;
if (result)
m_parallelCounters.AddOrUpdate(subscriber, 1, (scr, c) => c + 1);
return result;
internal void Stop()
m_counter.Stop();
This is at large following t3chb0ts pattern, but is not dedicated to cron jobs. Instead of the use of Observable.Interval, I decided to make my own "time pump" which resulted in the HSecondCounter.
HSecondCounter
HSecondCounter is an implementation of the IObservable<DateTime> interface and it is locked to ticking each second. It is an attempt to solve the problem that Xiaoy312 describes here. The solution for now is to check the current time for every half second but only dispatch ticks if the second component of the current time has changed since last tick. It seems to work, but I'm sure someone can shoot it down?
public class HSecondCounter : IObservable<DateTime>
{
static public HSecondCounter Count()
HSecondCounter counter = new HSecondCounter();
Task.Factory.StartNew(() =>
counter.Run((now) =>
lock (counter.m_observers)
foreach (var observer in counter.m_observers)
observer.OnNext(now);
Console.WriteLine("HSecondCounter: 0", now);
);
, TaskCreationOptions.LongRunning);
return counter;
static public HSecondCounter CountAsync()
HSecondCounter counter = new HSecondCounter();
Task.Factory.StartNew(() =>
counter.Run((now) =>
lock (counter.m_observers)
foreach (var observer in counter.m_observers)
Task.Factory.StartNew(() =>
observer.OnNext(now);
);
Console.WriteLine("HSecondCounter: 0", now);
);
, TaskCreationOptions.LongRunning);
return counter;
List<IObserver<DateTime>> m_observers = new List<IObserver<DateTime>>();
volatile bool m_doContinue = true;
private HSecondCounter()
public void Stop()
m_doContinue = false;
public IDisposable Subscribe(IObserver<DateTime> observer)
lock (m_observers)
if (!m_observers.Contains(observer))
m_observers.Add(observer);
return Disposable.Create(() =>
lock (m_observers)
m_observers.Remove(observer);
observer.OnCompleted();
);
async private void Run(Action<DateTime> notifier)
try
int lastSecond = 0;
while (m_doContinue)
DateTime now = DateTime.Now;
if (now.Second != lastSecond)
notifier(now);
lastSecond = now.Second;
await Task.Delay(500);
catch (Exception ex)
lock (m_observers)
foreach (var observer in m_observers.ToArray())
observer.OnError(ex);
finally
lock (m_observers)
foreach (var observer in m_observers)
observer.OnCompleted();
Console.WriteLine($"HSceondCounter ended at: DateTime.Now");
It can be run "normally" by calling HSceondCounter.Count() or async by calling HSceondCounter.CountAsync(). Running normally means that the next notification (tick) is not sent before the previous returns. This is how Observable.Interval(...) works. Async means that the notification to each observer is sent independently. There are pros and cons in both cases(?). At the outer level they are both awaitable, so maybe the naming is somewhat misleading?
IScheduleSubscriber
The contract for subscribers to the HScheduler.
public interface IScheduleSubscriber
string Name get;
Action<DateTime> Action get;
bool ShouldRun(DateTime time);
int DegreeOfParallelism get;
ScheduleSubscriber
A test implementation of IScheduleSubscriber:
public class ScheduleSubscriber : IScheduleSubscriber
public string Name get; set;
public Action<DateTime> Action get; set;
public Func<DateTime, bool> ShouldRunPredicate get; set;
public int DegreeOfParallelism get; set;
public bool ShouldRun(DateTime time)
return ShouldRunPredicate(time);
public override string ToString()
return Name;
Test Case
void TestSchedule()
HScheduler scheduler = new HScheduler();
scheduler.Start();
ScheduleSubscriber subscriber1 = new ScheduleSubscriber
Name = "AAAA",
DegreeOfParallelism = HScheduler.UnlimitedJobParallelism,
Action = (value) =>
Console.WriteLine($"AAAA: value - value.Millisecond - Thread Id: Thread.CurrentThread.ManagedThreadId");
Thread.Sleep(5000);
Console.WriteLine("AAAA Finished");
,
ShouldRunPredicate = (time) =>
return time.Second % 2 == 0;
;
ScheduleSubscriber subscriber2 = new ScheduleSubscriber
Name = "BBBB",
DegreeOfParallelism = 1,
Action = (value) =>
Console.WriteLine($"BBBB: value - value.Millisecond - Thread Id: Thread.CurrentThread.ManagedThreadId");
Thread.Sleep(3000);
Console.WriteLine("BBBB Finished");
,
ShouldRunPredicate = (time) =>
return time.Second % 5 == 0;
;
using (IDisposable scheduledJob1 = scheduler.Subscribe(subscriber1))
using (IDisposable scheduledJob2 = scheduler.Subscribe(subscriber2))
Console.ReadLine();
scheduler.Stop();
Console.ReadLine();
c# timer async-await observer-pattern scheduled-tasks
edited Apr 17 at 23:42
Jamalâ¦
30.1k11114225
30.1k11114225
asked Apr 16 at 8:52
Henrik Hansen
3,8481417
3,8481417
I like the idea with the encapsulatedHSecondCounter. I needed quite a while to figure it out but if get this correctly, you let the timer tick a little bit faster to prevent the missing second withawait Task.Delay(500);, right?
â t3chb0t
Apr 16 at 10:45
@t3chb0t: Yes that's the idea. I discovered that when stepping with 500 ms there is oneDateTime.Nowwith the "missing" second when the milliseconds roundtrips to 000 else there are two. I have had it running for a while without missing seconds - but who knows :-)
â Henrik Hansen
Apr 16 at 10:55
add a comment |Â
I like the idea with the encapsulatedHSecondCounter. I needed quite a while to figure it out but if get this correctly, you let the timer tick a little bit faster to prevent the missing second withawait Task.Delay(500);, right?
â t3chb0t
Apr 16 at 10:45
@t3chb0t: Yes that's the idea. I discovered that when stepping with 500 ms there is oneDateTime.Nowwith the "missing" second when the milliseconds roundtrips to 000 else there are two. I have had it running for a while without missing seconds - but who knows :-)
â Henrik Hansen
Apr 16 at 10:55
I like the idea with the encapsulated
HSecondCounter. I needed quite a while to figure it out but if get this correctly, you let the timer tick a little bit faster to prevent the missing second with await Task.Delay(500);, right?â t3chb0t
Apr 16 at 10:45
I like the idea with the encapsulated
HSecondCounter. I needed quite a while to figure it out but if get this correctly, you let the timer tick a little bit faster to prevent the missing second with await Task.Delay(500);, right?â t3chb0t
Apr 16 at 10:45
@t3chb0t: Yes that's the idea. I discovered that when stepping with 500 ms there is one
DateTime.Now with the "missing" second when the milliseconds roundtrips to 000 else there are two. I have had it running for a while without missing seconds - but who knows :-)â Henrik Hansen
Apr 16 at 10:55
@t3chb0t: Yes that's the idea. I discovered that when stepping with 500 ms there is one
DateTime.Now with the "missing" second when the milliseconds roundtrips to 000 else there are two. I have had it running for a while without missing seconds - but who knows :-)â Henrik Hansen
Apr 16 at 10:55
add a comment |Â
1 Answer
1
active
oldest
votes
up vote
1
down vote
accepted
It's very intersting to see it done the hard way and I have to study it a little bit more but so far I found a couple of things that I believe should be improved. (They aren't in any particular order.)
There is one issue with stopping the scheduler. You are using the m_doContinue variable to control the while loop of the Run method but there is nowhere any mechanism to stop the LongRunning task.
I think the scheduler should be IDisposable and use a CancellationTokenSource to cancel the Task.Factory.StartNew method that initialized it.
Another thing I'd change is the List<IObserver<DateTime>> m_observers variable. If you made it a HashSet, you wouldn't need the if for Subscribe or you could just use a ConcurrentDictionary and remove the two locks.
I also noticed that the HScheduler has a Subscribe method but isn't derived from the IObservable interface.
The HSecondCounter has two very similar methods: Count and CountAsync. I find this duplication is not necessary. Count could call CountAsync synchronously. In fact, they are identical. Additionaly I expect CountAsync to return a Task so that I can await it (if I wanted to) - this is the usual convention for xAsync methods.
Thanks for the feedback. You have some good observations. I think I'll write a revision in an answer where I also will comment the issues you address.
â Henrik Hansen
Apr 17 at 17:23
1
I've posted a new version as a new question. I think, I've incorporated most of your points. The cancelllationToken must wait to version 3.0. codereview.stackexchange.com/questions/192354/â¦
â Henrik Hansen
Apr 18 at 6:48
add a comment |Â
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
up vote
1
down vote
accepted
It's very intersting to see it done the hard way and I have to study it a little bit more but so far I found a couple of things that I believe should be improved. (They aren't in any particular order.)
There is one issue with stopping the scheduler. You are using the m_doContinue variable to control the while loop of the Run method but there is nowhere any mechanism to stop the LongRunning task.
I think the scheduler should be IDisposable and use a CancellationTokenSource to cancel the Task.Factory.StartNew method that initialized it.
Another thing I'd change is the List<IObserver<DateTime>> m_observers variable. If you made it a HashSet, you wouldn't need the if for Subscribe or you could just use a ConcurrentDictionary and remove the two locks.
I also noticed that the HScheduler has a Subscribe method but isn't derived from the IObservable interface.
The HSecondCounter has two very similar methods: Count and CountAsync. I find this duplication is not necessary. Count could call CountAsync synchronously. In fact, they are identical. Additionaly I expect CountAsync to return a Task so that I can await it (if I wanted to) - this is the usual convention for xAsync methods.
Thanks for the feedback. You have some good observations. I think I'll write a revision in an answer where I also will comment the issues you address.
â Henrik Hansen
Apr 17 at 17:23
1
I've posted a new version as a new question. I think, I've incorporated most of your points. The cancelllationToken must wait to version 3.0. codereview.stackexchange.com/questions/192354/â¦
â Henrik Hansen
Apr 18 at 6:48
add a comment |Â
up vote
1
down vote
accepted
It's very intersting to see it done the hard way and I have to study it a little bit more but so far I found a couple of things that I believe should be improved. (They aren't in any particular order.)
There is one issue with stopping the scheduler. You are using the m_doContinue variable to control the while loop of the Run method but there is nowhere any mechanism to stop the LongRunning task.
I think the scheduler should be IDisposable and use a CancellationTokenSource to cancel the Task.Factory.StartNew method that initialized it.
Another thing I'd change is the List<IObserver<DateTime>> m_observers variable. If you made it a HashSet, you wouldn't need the if for Subscribe or you could just use a ConcurrentDictionary and remove the two locks.
I also noticed that the HScheduler has a Subscribe method but isn't derived from the IObservable interface.
The HSecondCounter has two very similar methods: Count and CountAsync. I find this duplication is not necessary. Count could call CountAsync synchronously. In fact, they are identical. Additionaly I expect CountAsync to return a Task so that I can await it (if I wanted to) - this is the usual convention for xAsync methods.
Thanks for the feedback. You have some good observations. I think I'll write a revision in an answer where I also will comment the issues you address.
â Henrik Hansen
Apr 17 at 17:23
1
I've posted a new version as a new question. I think, I've incorporated most of your points. The cancelllationToken must wait to version 3.0. codereview.stackexchange.com/questions/192354/â¦
â Henrik Hansen
Apr 18 at 6:48
add a comment |Â
up vote
1
down vote
accepted
up vote
1
down vote
accepted
It's very intersting to see it done the hard way and I have to study it a little bit more but so far I found a couple of things that I believe should be improved. (They aren't in any particular order.)
There is one issue with stopping the scheduler. You are using the m_doContinue variable to control the while loop of the Run method but there is nowhere any mechanism to stop the LongRunning task.
I think the scheduler should be IDisposable and use a CancellationTokenSource to cancel the Task.Factory.StartNew method that initialized it.
Another thing I'd change is the List<IObserver<DateTime>> m_observers variable. If you made it a HashSet, you wouldn't need the if for Subscribe or you could just use a ConcurrentDictionary and remove the two locks.
I also noticed that the HScheduler has a Subscribe method but isn't derived from the IObservable interface.
The HSecondCounter has two very similar methods: Count and CountAsync. I find this duplication is not necessary. Count could call CountAsync synchronously. In fact, they are identical. Additionaly I expect CountAsync to return a Task so that I can await it (if I wanted to) - this is the usual convention for xAsync methods.
It's very intersting to see it done the hard way and I have to study it a little bit more but so far I found a couple of things that I believe should be improved. (They aren't in any particular order.)
There is one issue with stopping the scheduler. You are using the m_doContinue variable to control the while loop of the Run method but there is nowhere any mechanism to stop the LongRunning task.
I think the scheduler should be IDisposable and use a CancellationTokenSource to cancel the Task.Factory.StartNew method that initialized it.
Another thing I'd change is the List<IObserver<DateTime>> m_observers variable. If you made it a HashSet, you wouldn't need the if for Subscribe or you could just use a ConcurrentDictionary and remove the two locks.
I also noticed that the HScheduler has a Subscribe method but isn't derived from the IObservable interface.
The HSecondCounter has two very similar methods: Count and CountAsync. I find this duplication is not necessary. Count could call CountAsync synchronously. In fact, they are identical. Additionaly I expect CountAsync to return a Task so that I can await it (if I wanted to) - this is the usual convention for xAsync methods.
answered Apr 17 at 15:10
t3chb0t
32k54195
32k54195
Thanks for the feedback. You have some good observations. I think I'll write a revision in an answer where I also will comment the issues you address.
â Henrik Hansen
Apr 17 at 17:23
1
I've posted a new version as a new question. I think, I've incorporated most of your points. The cancelllationToken must wait to version 3.0. codereview.stackexchange.com/questions/192354/â¦
â Henrik Hansen
Apr 18 at 6:48
add a comment |Â
Thanks for the feedback. You have some good observations. I think I'll write a revision in an answer where I also will comment the issues you address.
â Henrik Hansen
Apr 17 at 17:23
1
I've posted a new version as a new question. I think, I've incorporated most of your points. The cancelllationToken must wait to version 3.0. codereview.stackexchange.com/questions/192354/â¦
â Henrik Hansen
Apr 18 at 6:48
Thanks for the feedback. You have some good observations. I think I'll write a revision in an answer where I also will comment the issues you address.
â Henrik Hansen
Apr 17 at 17:23
Thanks for the feedback. You have some good observations. I think I'll write a revision in an answer where I also will comment the issues you address.
â Henrik Hansen
Apr 17 at 17:23
1
1
I've posted a new version as a new question. I think, I've incorporated most of your points. The cancelllationToken must wait to version 3.0. codereview.stackexchange.com/questions/192354/â¦
â Henrik Hansen
Apr 18 at 6:48
I've posted a new version as a new question. I think, I've incorporated most of your points. The cancelllationToken must wait to version 3.0. codereview.stackexchange.com/questions/192354/â¦
â Henrik Hansen
Apr 18 at 6:48
add a comment |Â
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f192177%2fscheduling-using-system-iobservable%23new-answer', 'question_page');
);
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
I like the idea with the encapsulated
HSecondCounter. I needed quite a while to figure it out but if get this correctly, you let the timer tick a little bit faster to prevent the missing second withawait Task.Delay(500);, right?â t3chb0t
Apr 16 at 10:45
@t3chb0t: Yes that's the idea. I discovered that when stepping with 500 ms there is one
DateTime.Nowwith the "missing" second when the milliseconds roundtrips to 000 else there are two. I have had it running for a while without missing seconds - but who knows :-)â Henrik Hansen
Apr 16 at 10:55