Scheduling using System.IObservable
With great inspiration from t3chb0t's ongoing work on scheduling, cron jobs etc. (last post here), I decided to make my own version of the scheduler, using the observer pattern, as a learning project.
The scheduler
    public class HScheduler
    {
        public const int UnlimitedJobParallelism = -1;

        HSecondCounter m_counter;
        ConcurrentDictionary<IScheduleSubscriber, int> m_parallelCounters = new ConcurrentDictionary<IScheduleSubscriber, int>();

        public void Start()
        {
            m_counter = HSecondCounter.CountAsync();
        }

        public IDisposable Subscribe(IScheduleSubscriber subscriber)
        {
            IDisposable handle = m_counter
                .Where(t => subscriber.ShouldRun(t) && CanRun(subscriber))
                .Subscribe((time) =>
                {
                    try
                    {
                        subscriber.Action(time);
                        DecrementSubscriber(subscriber);
                    }
                    catch (Exception ex)
                    {
                        // TODO: something should be done here (logging, notifying...)
                        Console.WriteLine(ex.Message);
                    }
                },
                (ex) =>
                {
                    // TODO: something should be done here (logging, notifying...)
                    Console.WriteLine(ex.Message);
                },
                () =>
                {
                    Console.WriteLine($"{subscriber.Name} Completed");
                });

            return Disposable.Create(() =>
            {
                handle.Dispose();
                RemoveSubscriber(subscriber);
            });
        }

        private void RemoveSubscriber(IScheduleSubscriber subscriber)
        {
            m_parallelCounters.TryRemove(subscriber, out _);
        }

        private void DecrementSubscriber(IScheduleSubscriber subscriber)
        {
            if (subscriber.DegreeOfParallelism != UnlimitedJobParallelism)
                m_parallelCounters.AddOrUpdate(subscriber, 0, (scr, c) => c - 1);
        }

        private bool CanRun(IScheduleSubscriber subscriber)
        {
            if (subscriber.DegreeOfParallelism == UnlimitedJobParallelism) return true;
            int value = m_parallelCounters.GetOrAdd(subscriber, 0);
            bool result = value < subscriber.DegreeOfParallelism;
            if (result)
                m_parallelCounters.AddOrUpdate(subscriber, 1, (scr, c) => c + 1);
            return result;
        }

        internal void Stop()
        {
            m_counter.Stop();
        }
    }
This largely follows t3chb0t's pattern, but is not dedicated to cron jobs. Instead of using Observable.Interval, I decided to make my own "time pump", which resulted in the HSecondCounter.
HSecondCounter
HSecondCounter is an implementation of the IObservable<DateTime> interface, and it is locked to ticking each second. It is an attempt to solve the problem that Xiaoy312 describes here. The solution for now is to check the current time every half second, but to dispatch a tick only if the second component of the current time has changed since the last tick. It seems to work, but I'm sure someone can shoot it down?
    public class HSecondCounter : IObservable<DateTime>
    {
        static public HSecondCounter Count()
        {
            HSecondCounter counter = new HSecondCounter();
            Task.Factory.StartNew(() =>
            {
                counter.Run((now) =>
                {
                    lock (counter.m_observers)
                    {
                        foreach (var observer in counter.m_observers)
                            observer.OnNext(now);
                        Console.WriteLine("HSecondCounter: {0}", now);
                    }
                });
            }, TaskCreationOptions.LongRunning);
            return counter;
        }

        static public HSecondCounter CountAsync()
        {
            HSecondCounter counter = new HSecondCounter();
            Task.Factory.StartNew(() =>
            {
                counter.Run((now) =>
                {
                    lock (counter.m_observers)
                    {
                        foreach (var observer in counter.m_observers)
                        {
                            Task.Factory.StartNew(() =>
                            {
                                observer.OnNext(now);
                            });
                        }
                        Console.WriteLine("HSecondCounter: {0}", now);
                    }
                });
            }, TaskCreationOptions.LongRunning);
            return counter;
        }

        List<IObserver<DateTime>> m_observers = new List<IObserver<DateTime>>();
        volatile bool m_doContinue = true;

        private HSecondCounter() { }

        public void Stop()
        {
            m_doContinue = false;
        }

        public IDisposable Subscribe(IObserver<DateTime> observer)
        {
            lock (m_observers)
            {
                if (!m_observers.Contains(observer))
                    m_observers.Add(observer);
            }
            return Disposable.Create(() =>
            {
                lock (m_observers)
                {
                    m_observers.Remove(observer);
                }
                observer.OnCompleted();
            });
        }

        async private void Run(Action<DateTime> notifier)
        {
            try
            {
                int lastSecond = 0;
                while (m_doContinue)
                {
                    DateTime now = DateTime.Now;
                    if (now.Second != lastSecond)
                    {
                        notifier(now);
                        lastSecond = now.Second;
                    }
                    await Task.Delay(500);
                }
            }
            catch (Exception ex)
            {
                lock (m_observers)
                {
                    foreach (var observer in m_observers.ToArray())
                        observer.OnError(ex);
                }
            }
            finally
            {
                lock (m_observers)
                {
                    foreach (var observer in m_observers)
                        observer.OnCompleted();
                }
                Console.WriteLine($"HSecondCounter ended at: {DateTime.Now}");
            }
        }
    }
It can be run "normally" by calling HSecondCounter.Count() or async by calling HSecondCounter.CountAsync(). Running normally means that the next notification (tick) is not sent before the previous one returns. This is how Observable.Interval(...) works. Async means that the notification to each observer is sent independently. There are pros and cons in both cases(?). At the outer level they are both awaitable, so maybe the naming is somewhat misleading?
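To make the difference between the two modes concrete, here is a minimal sketch that is separate from the code above. The Dispatch class and its two methods are hypothetical names made up for illustration, and plain Action<DateTime> delegates stand in for the observers.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper for illustration only; not part of the posted code.
public static class Dispatch
{
    // "Normal" mode: each handler must return before the next one is called,
    // so a slow handler delays every handler after it.
    public static void Sequential(IEnumerable<Action<DateTime>> handlers, DateTime tick)
    {
        foreach (var handler in handlers)
            handler(tick);
    }

    // "Async" mode: each handler runs as its own task, independently of the others.
    // The returned task completes once all handlers have finished.
    public static Task Concurrent(IEnumerable<Action<DateTime>> handlers, DateTime tick)
    {
        var tasks = new List<Task>();
        foreach (var handler in handlers)
            tasks.Add(Task.Run(() => handler(tick)));
        return Task.WhenAll(tasks);
    }
}
```

In the sequential variant the order of the calls is guaranteed; in the concurrent variant it is not, and handlers for consecutive ticks can overlap, which is why the scheduler needs something like DegreeOfParallelism.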
IScheduleSubscriber
The contract for subscribers to the HScheduler.
    public interface IScheduleSubscriber
    {
        string Name { get; }
        Action<DateTime> Action { get; }
        bool ShouldRun(DateTime time);
        int DegreeOfParallelism { get; }
    }
ScheduleSubscriber
A test implementation of IScheduleSubscriber:
    public class ScheduleSubscriber : IScheduleSubscriber
    {
        public string Name { get; set; }
        public Action<DateTime> Action { get; set; }
        public Func<DateTime, bool> ShouldRunPredicate { get; set; }
        public int DegreeOfParallelism { get; set; }

        public bool ShouldRun(DateTime time)
        {
            return ShouldRunPredicate(time);
        }

        public override string ToString()
        {
            return Name;
        }
    }
Test Case
    void TestSchedule()
    {
        HScheduler scheduler = new HScheduler();
        scheduler.Start();

        ScheduleSubscriber subscriber1 = new ScheduleSubscriber
        {
            Name = "AAAA",
            DegreeOfParallelism = HScheduler.UnlimitedJobParallelism,
            Action = (value) =>
            {
                Console.WriteLine($"AAAA: {value} - {value.Millisecond} - Thread Id: {Thread.CurrentThread.ManagedThreadId}");
                Thread.Sleep(5000);
                Console.WriteLine("AAAA Finished");
            },
            ShouldRunPredicate = (time) =>
            {
                return time.Second % 2 == 0;
            }
        };

        ScheduleSubscriber subscriber2 = new ScheduleSubscriber
        {
            Name = "BBBB",
            DegreeOfParallelism = 1,
            Action = (value) =>
            {
                Console.WriteLine($"BBBB: {value} - {value.Millisecond} - Thread Id: {Thread.CurrentThread.ManagedThreadId}");
                Thread.Sleep(3000);
                Console.WriteLine("BBBB Finished");
            },
            ShouldRunPredicate = (time) =>
            {
                return time.Second % 5 == 0;
            }
        };

        using (IDisposable scheduledJob1 = scheduler.Subscribe(subscriber1))
        using (IDisposable scheduledJob2 = scheduler.Subscribe(subscriber2))
        {
            Console.ReadLine();
        }

        scheduler.Stop();
        Console.ReadLine();
    }
c# timer async-await observer-pattern scheduled-tasks
I like the idea with the encapsulated HSecondCounter. I needed quite a while to figure it out, but if I get this correctly, you let the timer tick a little bit faster to prevent the missing second with await Task.Delay(500);, right?
– t3chb0t, Apr 16 at 10:45
@t3chb0t: Yes, that's the idea. I discovered that when stepping with 500 ms, there is one DateTime.Now reading with the "missing" second when the milliseconds wrap around to 000; otherwise there are two. I have had it running for a while without missing seconds - but who knows :-)
– Henrik Hansen, Apr 16 at 10:55
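The half-second polling idea from these comments can be checked without waiting on a real clock. The following is only a sketch under stated assumptions: PollingSimulation and DispatchedSeconds are hypothetical names, the polls are assumed to arrive exactly stepMs apart (a real Task.Delay has jitter), and lastSecond starts at -1 instead of 0 so that second 0 is not swallowed on the first poll.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical simulation for illustration; not part of the posted code.
public static class PollingSimulation
{
    // Replays the "dispatch only when the second component changed" check
    // against synthetic timestamps spaced stepMs apart (assumed jitter-free).
    // Returns the second component of every timestamp that would be dispatched.
    public static List<int> DispatchedSeconds(DateTime start, int stepMs, int polls)
    {
        var dispatched = new List<int>();
        int lastSecond = -1; // assumption: -1 so that a start in second 0 still ticks
        DateTime now = start;

        for (int i = 0; i < polls; i++)
        {
            if (now.Second != lastSecond)
            {
                dispatched.Add(now.Second);
                lastSecond = now.Second;
            }
            now = now.AddMilliseconds(stepMs);
        }

        return dispatched;
    }
}
```

With a 500 ms step every second is sampled twice and dispatched once, so no second is lost. With a step longer than a second, for example 1100 ms, the sample times drift until one second is never sampled at all, which is the "missing second" problem the faster polling is meant to avoid.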
asked Apr 16 at 8:52 by Henrik Hansen
edited Apr 17 at 23:42 by Jamal
1 Answer
accepted
It's very interesting to see it done the hard way. I have to study it a little bit more, but so far I have found a couple of things that I believe should be improved. (They aren't in any particular order.)
There is one issue with stopping the scheduler. You are using the m_doContinue variable to control the while loop of the Run method, but there is no mechanism anywhere to stop the LongRunning task.
I think the scheduler should be IDisposable and use a CancellationTokenSource to cancel the task that Task.Factory.StartNew started.
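A rough sketch of what that could look like, under some assumptions: CancellableTicker, Completion and RunAsync are hypothetical names that do not appear in the question, the loop is started with Task.Run rather than Task.Factory.StartNew, and lastSecond starts at -1. It is meant to show the CancellationTokenSource and IDisposable idea, not to replace the posted class.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the counter's loop; not the code from the question.
public sealed class CancellableTicker : IDisposable
{
    private readonly CancellationTokenSource m_cts = new CancellationTokenSource();
    private int m_disposed;

    // Completes when the polling loop has stopped, so callers can await or Wait on it.
    public Task Completion { get; }

    public CancellableTicker(Action<DateTime> onTick, TimeSpan pollInterval)
    {
        CancellationToken token = m_cts.Token;
        Completion = Task.Run(() => RunAsync(onTick, pollInterval, token));
    }

    private static async Task RunAsync(Action<DateTime> onTick, TimeSpan pollInterval, CancellationToken token)
    {
        int lastSecond = -1;
        try
        {
            while (!token.IsCancellationRequested)
            {
                DateTime now = DateTime.Now;
                if (now.Second != lastSecond)
                {
                    onTick(now);
                    lastSecond = now.Second;
                }
                // Task.Delay observes the token, so cancelling ends the wait right away.
                await Task.Delay(pollInterval, token).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when Dispose cancels a pending delay.
        }
    }

    public void Dispose()
    {
        // Guard so that a second Dispose call is a no-op.
        if (Interlocked.Exchange(ref m_disposed, 1) != 0) return;
        m_cts.Cancel();
        m_cts.Dispose();
    }
}
```

Because the delay is given the token, Dispose does not have to wait out the remaining part of the 500 ms interval, which a plain bool flag cannot offer.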
Another thing I'd change is the List<IObserver<DateTime>> m_observers variable. If you made it a HashSet, you wouldn't need the if in Subscribe, or you could just use a ConcurrentDictionary and remove the two locks.
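As an illustration of the ConcurrentDictionary variant, here is a small sketch. ObserverRegistry, Publish and DelegateObserver are hypothetical names invented for this example; the dictionary is used as a concurrent set, with a dummy byte as the value.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper for illustration; not the code from the question.
public sealed class ObserverRegistry<T>
{
    // ConcurrentDictionary used as a thread-safe set; the byte value is unused.
    private readonly ConcurrentDictionary<IObserver<T>, byte> m_observers =
        new ConcurrentDictionary<IObserver<T>, byte>();

    public int Count => m_observers.Count;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        // TryAdd does nothing if the observer is already registered,
        // so no Contains check and no lock are needed.
        m_observers.TryAdd(observer, 0);
        return new Unsubscriber(this, observer);
    }

    public void Publish(T value)
    {
        // Keys returns a snapshot, so observers may come and go while we iterate.
        foreach (IObserver<T> observer in m_observers.Keys)
            observer.OnNext(value);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly ObserverRegistry<T> m_registry;
        private readonly IObserver<T> m_observer;

        public Unsubscriber(ObserverRegistry<T> registry, IObserver<T> observer)
        {
            m_registry = registry;
            m_observer = observer;
        }

        public void Dispose() => m_registry.m_observers.TryRemove(m_observer, out _);
    }
}

// Minimal observer used only to exercise the registry.
public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> m_onNext;
    public DelegateObserver(Action<T> onNext) { m_onNext = onNext; }
    public void OnNext(T value) => m_onNext(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Note that a HashSet would remove the duplicate check but is not thread-safe by itself, so the locks would have to stay in that variant.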
I also noticed that the HScheduler has a Subscribe method but doesn't implement the IObservable interface.
The HSecondCounter has two very similar methods: Count and CountAsync. I find this duplication unnecessary. Count could call CountAsync synchronously. In fact, they are almost identical; they differ only in how each observer is notified. Additionally, I expect CountAsync to return a Task so that I can await it (if I wanted to) - this is the usual convention for xAsync methods.
Thanks for the feedback. You have some good observations. I think I'll write a revision in an answer where I will also comment on the issues you address.
– Henrik Hansen, Apr 17 at 17:23
I've posted a new version as a new question. I think I've incorporated most of your points. The CancellationToken must wait until version 3.0. codereview.stackexchange.com/questions/192354/…
– Henrik Hansen, Apr 18 at 6:48
add a comment |Â
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
up vote
1
down vote
accepted
It's very intersting to see it done the hard way and I have to study it a little bit more but so far I found a couple of things that I believe should be improved. (They aren't in any particular order.)
There is one issue with stopping the scheduler. You are using the m_doContinue
variable to control the while
loop of the Run
method but there is nowhere any mechanism to stop the LongRunning
task.
I think the scheduler should be IDisposable
and use a CancellationTokenSource
to cancel the Task.Factory.StartNew
method that initialized it.
Another thing I'd change is the List<IObserver<DateTime>> m_observers
variable. If you made it a HashSet
, you wouldn't need the if
for Subscribe
or you could just use a ConcurrentDictionary
and remove the two lock
s.
I also noticed that the HScheduler
has a Subscribe
method but isn't derived from the IObservable
interface.
The HSecondCounter
has two very similar methods: Count
and CountAsync
. I find this duplication is not necessary. Count
could call CountAsync
synchronously. In fact, they are identical. Additionaly I expect CountAsync
to return a Task
so that I can await
it (if I wanted to) - this is the usual convention for xAsync
methods.
answered Apr 17 at 15:10 by t3chb0t (accepted answer)
Thanks for the feedback. You have some good observations. I think I'll write a revision in an answer, where I will also comment on the issues you raise.
- Henrik Hansen, Apr 17 at 17:23
I've posted a new version as a new question. I think I've incorporated most of your points. The CancellationToken must wait for version 3.0. codereview.stackexchange.com/questions/192354/…
- Henrik Hansen, Apr 18 at 6:48
I like the idea of the encapsulated HSecondCounter. It took me quite a while to figure it out, but if I get this correctly, you let the timer tick a little bit faster with await Task.Delay(500); so that no second is missed, right?
- t3chb0t, Apr 16 at 10:45
@t3chb0t: Yes, that's the idea. I discovered that when stepping with 500 ms, there is one DateTime.Now with the "missing" second when the milliseconds wrap around to 000; otherwise there are two. I have had it running for a while without missing seconds - but who knows :-)
- Henrik Hansen, Apr 16 at 10:55
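The sampling idea in these two comments can be sketched as follows. This assumes the counter truncates each DateTime.Now sample to a whole second and emits it only when it differs from the previous one; SecondSampler and DistinctSeconds are hypothetical names, and the real HSecondCounter may work differently.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper illustrating "sample twice per second, emit once".
public static class SecondSampler
{
    public static IEnumerable<DateTime> DistinctSeconds(IEnumerable<DateTime> samples)
    {
        DateTime? last = null;
        foreach (DateTime sample in samples)
        {
            // Drop the sub-second part of the sample.
            long wholeSecondTicks = sample.Ticks - sample.Ticks % TimeSpan.TicksPerSecond;
            DateTime second = new DateTime(wholeSecondTicks, sample.Kind);

            // Samples taken roughly 500 ms apart usually hit each second twice;
            // only the first hit is passed on.
            if (last == null || second > last.Value)
            {
                last = second;
                yield return second;
            }
        }
    }
}
```

Feeding it samples at 0.90 s, 1.45 s, 1.99 s and 2.54 s yields the seconds 0, 1 and 2: second 1 is sampled twice but reported once, and no second is skipped even though the samples drift.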