Scheduling using System.IObservable

The name of the pictureThe name of the pictureThe name of the pictureClash Royale CLAN TAG#URR8PPP





.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty margin-bottom:0;







up vote
4
down vote

favorite












With great inspiration from t3chb0ts ongoing work about scheduling, cron jobs etc. (last post here), I decided to make my own version of the Scheduler - using the observer pattern - as a learning project.



The scheduler



 public class HScheduler

public const int UnlimitedJobParallelism = -1;

HSecondCounter m_counter;
ConcurrentDictionary<IScheduleSubscriber, int> m_parallelCounters = new ConcurrentDictionary<IScheduleSubscriber, int>();

public void Start()

m_counter = HSecondCounter.CountAsync();


public IDisposable Subscribe(IScheduleSubscriber subscriber)

IDisposable handle = m_counter
.Where(t => subscriber.ShouldRun(t) && CanRun(subscriber))
.Subscribe((time) =>

try

subscriber.Action(time);
DecrementSubscriber(subscriber);

catch (Exception ex)

// TODO: something should be done here (logging, notifying...)
Console.WriteLine(ex.Message);

,
(ex) =>

// TODO: something should be done here (logging, notifying...)
Console.WriteLine(ex.Message);
,
() =>

Console.WriteLine($"subscriber.Name Completed");
);

return Disposable.Create(() =>

handle.Dispose();
RemoveSubscriber(subscriber);
);


private void RemoveSubscriber(IScheduleSubscriber subscriber)

m_parallelCounters.TryRemove(subscriber, out _);


private void DecrementSubscriber(IScheduleSubscriber subscriber)

if (subscriber.DegreeOfParallelism != UnlimitedJobParallelism)
m_parallelCounters.AddOrUpdate(subscriber, 0, (scr, c) => c - 1);


private bool CanRun(IScheduleSubscriber subscriber)

if (subscriber.DegreeOfParallelism == UnlimitedJobParallelism) return true;

int value = m_parallelCounters.GetOrAdd(subscriber, 0);
bool result = value < subscriber.DegreeOfParallelism;
if (result)

m_parallelCounters.AddOrUpdate(subscriber, 1, (scr, c) => c + 1);

return result;


internal void Stop()

m_counter.Stop();




This is at large following t3chb0ts pattern, but is not dedicated to cron jobs. Instead of the use of Observable.Interval, I decided to make my own "time pump" which resulted in the HSecondCounter.



HSecondCounter



HSecondCounter is an implementation of the IObservable<DateTime> interface and it is locked to ticking each second. It is an attempt to solve the problem that Xiaoy312 describes here. The solution for now is to check the current time for every half second but only dispatch ticks if the second component of the current time has changed since last tick. It seems to work, but I'm sure someone can shoot it down?



 public class HSecondCounter : IObservable<DateTime>
{
static public HSecondCounter Count()

HSecondCounter counter = new HSecondCounter();
Task.Factory.StartNew(() =>

counter.Run((now) =>

lock (counter.m_observers)

foreach (var observer in counter.m_observers)

observer.OnNext(now);

Console.WriteLine("HSecondCounter: 0", now);

);
, TaskCreationOptions.LongRunning);

return counter;


static public HSecondCounter CountAsync()

HSecondCounter counter = new HSecondCounter();
Task.Factory.StartNew(() =>

counter.Run((now) =>

lock (counter.m_observers)

foreach (var observer in counter.m_observers)

Task.Factory.StartNew(() =>

observer.OnNext(now);
);

Console.WriteLine("HSecondCounter: 0", now);

);
, TaskCreationOptions.LongRunning);

return counter;


List<IObserver<DateTime>> m_observers = new List<IObserver<DateTime>>();
volatile bool m_doContinue = true;

private HSecondCounter()



public void Stop()

m_doContinue = false;


public IDisposable Subscribe(IObserver<DateTime> observer)

lock (m_observers)

if (!m_observers.Contains(observer))

m_observers.Add(observer);



return Disposable.Create(() =>

lock (m_observers)

m_observers.Remove(observer);


observer.OnCompleted();
);


async private void Run(Action<DateTime> notifier)

try

int lastSecond = 0;

while (m_doContinue)

DateTime now = DateTime.Now;

if (now.Second != lastSecond)

notifier(now);


lastSecond = now.Second;

await Task.Delay(500);


catch (Exception ex)

lock (m_observers)

foreach (var observer in m_observers.ToArray())

observer.OnError(ex);



finally

lock (m_observers)

foreach (var observer in m_observers)

observer.OnCompleted();


Console.WriteLine($"HSceondCounter ended at: DateTime.Now");




It can be run "normally" by calling HSceondCounter.Count() or async by calling HSceondCounter.CountAsync(). Running normally means that the next notification (tick) is not sent before the previous returns. This is how Observable.Interval(...) works. Async means that the notification to each observer is sent independently. There are pros and cons in both cases(?). At the outer level they are both awaitable, so maybe the naming is somewhat misleading?



IScheduleSubscriber



The contract for subscribers to the HScheduler.



 public interface IScheduleSubscriber

string Name get;
Action<DateTime> Action get;
bool ShouldRun(DateTime time);
int DegreeOfParallelism get;



ScheduleSubscriber



A test implementation of IScheduleSubscriber:



 public class ScheduleSubscriber : IScheduleSubscriber

public string Name get; set;

public Action<DateTime> Action get; set;

public Func<DateTime, bool> ShouldRunPredicate get; set;

public int DegreeOfParallelism get; set;

public bool ShouldRun(DateTime time)

return ShouldRunPredicate(time);


public override string ToString()

return Name;




Test Case



void TestSchedule()

HScheduler scheduler = new HScheduler();
scheduler.Start();

ScheduleSubscriber subscriber1 = new ScheduleSubscriber

Name = "AAAA",
DegreeOfParallelism = HScheduler.UnlimitedJobParallelism,
Action = (value) =>

Console.WriteLine($"AAAA: value - value.Millisecond - Thread Id: Thread.CurrentThread.ManagedThreadId");
Thread.Sleep(5000);
Console.WriteLine("AAAA Finished");
,
ShouldRunPredicate = (time) =>

return time.Second % 2 == 0;

;

ScheduleSubscriber subscriber2 = new ScheduleSubscriber

Name = "BBBB",
DegreeOfParallelism = 1,
Action = (value) =>

Console.WriteLine($"BBBB: value - value.Millisecond - Thread Id: Thread.CurrentThread.ManagedThreadId");
Thread.Sleep(3000);
Console.WriteLine("BBBB Finished");
,
ShouldRunPredicate = (time) =>

return time.Second % 5 == 0;

;

using (IDisposable scheduledJob1 = scheduler.Subscribe(subscriber1))
using (IDisposable scheduledJob2 = scheduler.Subscribe(subscriber2))

Console.ReadLine();


scheduler.Stop();
Console.ReadLine();







share|improve this question





















  • I like the idea with the encapsulated HSecondCounter. I needed quite a while to figure it out but if get this correctly, you let the timer tick a little bit faster to prevent the missing second with await Task.Delay(500);, right?
    – t3chb0t
    Apr 16 at 10:45











  • @t3chb0t: Yes that's the idea. I discovered that when stepping with 500 ms there is one DateTime.Now with the "missing" second when the milliseconds roundtrips to 000 else there are two. I have had it running for a while without missing seconds - but who knows :-)
    – Henrik Hansen
    Apr 16 at 10:55

















up vote
4
down vote

favorite












With great inspiration from t3chb0ts ongoing work about scheduling, cron jobs etc. (last post here), I decided to make my own version of the Scheduler - using the observer pattern - as a learning project.



The scheduler



 public class HScheduler

public const int UnlimitedJobParallelism = -1;

HSecondCounter m_counter;
ConcurrentDictionary<IScheduleSubscriber, int> m_parallelCounters = new ConcurrentDictionary<IScheduleSubscriber, int>();

public void Start()

m_counter = HSecondCounter.CountAsync();


public IDisposable Subscribe(IScheduleSubscriber subscriber)

IDisposable handle = m_counter
.Where(t => subscriber.ShouldRun(t) && CanRun(subscriber))
.Subscribe((time) =>

try

subscriber.Action(time);
DecrementSubscriber(subscriber);

catch (Exception ex)

// TODO: something should be done here (logging, notifying...)
Console.WriteLine(ex.Message);

,
(ex) =>

// TODO: something should be done here (logging, notifying...)
Console.WriteLine(ex.Message);
,
() =>

Console.WriteLine($"subscriber.Name Completed");
);

return Disposable.Create(() =>

handle.Dispose();
RemoveSubscriber(subscriber);
);


private void RemoveSubscriber(IScheduleSubscriber subscriber)

m_parallelCounters.TryRemove(subscriber, out _);


private void DecrementSubscriber(IScheduleSubscriber subscriber)

if (subscriber.DegreeOfParallelism != UnlimitedJobParallelism)
m_parallelCounters.AddOrUpdate(subscriber, 0, (scr, c) => c - 1);


private bool CanRun(IScheduleSubscriber subscriber)

if (subscriber.DegreeOfParallelism == UnlimitedJobParallelism) return true;

int value = m_parallelCounters.GetOrAdd(subscriber, 0);
bool result = value < subscriber.DegreeOfParallelism;
if (result)

m_parallelCounters.AddOrUpdate(subscriber, 1, (scr, c) => c + 1);

return result;


internal void Stop()

m_counter.Stop();




This is at large following t3chb0ts pattern, but is not dedicated to cron jobs. Instead of the use of Observable.Interval, I decided to make my own "time pump" which resulted in the HSecondCounter.



HSecondCounter



HSecondCounter is an implementation of the IObservable<DateTime> interface and it is locked to ticking each second. It is an attempt to solve the problem that Xiaoy312 describes here. The solution for now is to check the current time for every half second but only dispatch ticks if the second component of the current time has changed since last tick. It seems to work, but I'm sure someone can shoot it down?



 public class HSecondCounter : IObservable<DateTime>
{
static public HSecondCounter Count()

HSecondCounter counter = new HSecondCounter();
Task.Factory.StartNew(() =>

counter.Run((now) =>

lock (counter.m_observers)

foreach (var observer in counter.m_observers)

observer.OnNext(now);

Console.WriteLine("HSecondCounter: 0", now);

);
, TaskCreationOptions.LongRunning);

return counter;


static public HSecondCounter CountAsync()

HSecondCounter counter = new HSecondCounter();
Task.Factory.StartNew(() =>

counter.Run((now) =>

lock (counter.m_observers)

foreach (var observer in counter.m_observers)

Task.Factory.StartNew(() =>

observer.OnNext(now);
);

Console.WriteLine("HSecondCounter: 0", now);

);
, TaskCreationOptions.LongRunning);

return counter;


List<IObserver<DateTime>> m_observers = new List<IObserver<DateTime>>();
volatile bool m_doContinue = true;

private HSecondCounter()



public void Stop()

m_doContinue = false;


public IDisposable Subscribe(IObserver<DateTime> observer)

lock (m_observers)

if (!m_observers.Contains(observer))

m_observers.Add(observer);



return Disposable.Create(() =>

lock (m_observers)

m_observers.Remove(observer);


observer.OnCompleted();
);


async private void Run(Action<DateTime> notifier)

try

int lastSecond = 0;

while (m_doContinue)

DateTime now = DateTime.Now;

if (now.Second != lastSecond)

notifier(now);


lastSecond = now.Second;

await Task.Delay(500);


catch (Exception ex)

lock (m_observers)

foreach (var observer in m_observers.ToArray())

observer.OnError(ex);



finally

lock (m_observers)

foreach (var observer in m_observers)

observer.OnCompleted();


Console.WriteLine($"HSceondCounter ended at: DateTime.Now");




It can be run "normally" by calling HSceondCounter.Count() or async by calling HSceondCounter.CountAsync(). Running normally means that the next notification (tick) is not sent before the previous returns. This is how Observable.Interval(...) works. Async means that the notification to each observer is sent independently. There are pros and cons in both cases(?). At the outer level they are both awaitable, so maybe the naming is somewhat misleading?



IScheduleSubscriber



The contract for subscribers to the HScheduler.



 public interface IScheduleSubscriber

string Name get;
Action<DateTime> Action get;
bool ShouldRun(DateTime time);
int DegreeOfParallelism get;



ScheduleSubscriber



A test implementation of IScheduleSubscriber:



 public class ScheduleSubscriber : IScheduleSubscriber

public string Name get; set;

public Action<DateTime> Action get; set;

public Func<DateTime, bool> ShouldRunPredicate get; set;

public int DegreeOfParallelism get; set;

public bool ShouldRun(DateTime time)

return ShouldRunPredicate(time);


public override string ToString()

return Name;




Test Case



void TestSchedule()

HScheduler scheduler = new HScheduler();
scheduler.Start();

ScheduleSubscriber subscriber1 = new ScheduleSubscriber

Name = "AAAA",
DegreeOfParallelism = HScheduler.UnlimitedJobParallelism,
Action = (value) =>

Console.WriteLine($"AAAA: value - value.Millisecond - Thread Id: Thread.CurrentThread.ManagedThreadId");
Thread.Sleep(5000);
Console.WriteLine("AAAA Finished");
,
ShouldRunPredicate = (time) =>

return time.Second % 2 == 0;

;

ScheduleSubscriber subscriber2 = new ScheduleSubscriber

Name = "BBBB",
DegreeOfParallelism = 1,
Action = (value) =>

Console.WriteLine($"BBBB: value - value.Millisecond - Thread Id: Thread.CurrentThread.ManagedThreadId");
Thread.Sleep(3000);
Console.WriteLine("BBBB Finished");
,
ShouldRunPredicate = (time) =>

return time.Second % 5 == 0;

;

using (IDisposable scheduledJob1 = scheduler.Subscribe(subscriber1))
using (IDisposable scheduledJob2 = scheduler.Subscribe(subscriber2))

Console.ReadLine();


scheduler.Stop();
Console.ReadLine();







share|improve this question





















  • I like the idea with the encapsulated HSecondCounter. I needed quite a while to figure it out but if get this correctly, you let the timer tick a little bit faster to prevent the missing second with await Task.Delay(500);, right?
    – t3chb0t
    Apr 16 at 10:45











  • @t3chb0t: Yes that's the idea. I discovered that when stepping with 500 ms there is one DateTime.Now with the "missing" second when the milliseconds roundtrips to 000 else there are two. I have had it running for a while without missing seconds - but who knows :-)
    – Henrik Hansen
    Apr 16 at 10:55













up vote
4
down vote

favorite









up vote
4
down vote

favorite











With great inspiration from t3chb0ts ongoing work about scheduling, cron jobs etc. (last post here), I decided to make my own version of the Scheduler - using the observer pattern - as a learning project.



The scheduler



 public class HScheduler

public const int UnlimitedJobParallelism = -1;

HSecondCounter m_counter;
ConcurrentDictionary<IScheduleSubscriber, int> m_parallelCounters = new ConcurrentDictionary<IScheduleSubscriber, int>();

public void Start()

m_counter = HSecondCounter.CountAsync();


public IDisposable Subscribe(IScheduleSubscriber subscriber)

IDisposable handle = m_counter
.Where(t => subscriber.ShouldRun(t) && CanRun(subscriber))
.Subscribe((time) =>

try

subscriber.Action(time);
DecrementSubscriber(subscriber);

catch (Exception ex)

// TODO: something should be done here (logging, notifying...)
Console.WriteLine(ex.Message);

,
(ex) =>

// TODO: something should be done here (logging, notifying...)
Console.WriteLine(ex.Message);
,
() =>

Console.WriteLine($"subscriber.Name Completed");
);

return Disposable.Create(() =>

handle.Dispose();
RemoveSubscriber(subscriber);
);


private void RemoveSubscriber(IScheduleSubscriber subscriber)

m_parallelCounters.TryRemove(subscriber, out _);


private void DecrementSubscriber(IScheduleSubscriber subscriber)

if (subscriber.DegreeOfParallelism != UnlimitedJobParallelism)
m_parallelCounters.AddOrUpdate(subscriber, 0, (scr, c) => c - 1);


private bool CanRun(IScheduleSubscriber subscriber)

if (subscriber.DegreeOfParallelism == UnlimitedJobParallelism) return true;

int value = m_parallelCounters.GetOrAdd(subscriber, 0);
bool result = value < subscriber.DegreeOfParallelism;
if (result)

m_parallelCounters.AddOrUpdate(subscriber, 1, (scr, c) => c + 1);

return result;


internal void Stop()

m_counter.Stop();




This is at large following t3chb0ts pattern, but is not dedicated to cron jobs. Instead of the use of Observable.Interval, I decided to make my own "time pump" which resulted in the HSecondCounter.



HSecondCounter



HSecondCounter is an implementation of the IObservable<DateTime> interface and it is locked to ticking each second. It is an attempt to solve the problem that Xiaoy312 describes here. The solution for now is to check the current time for every half second but only dispatch ticks if the second component of the current time has changed since last tick. It seems to work, but I'm sure someone can shoot it down?



 public class HSecondCounter : IObservable<DateTime>
{
static public HSecondCounter Count()

HSecondCounter counter = new HSecondCounter();
Task.Factory.StartNew(() =>

counter.Run((now) =>

lock (counter.m_observers)

foreach (var observer in counter.m_observers)

observer.OnNext(now);

Console.WriteLine("HSecondCounter: 0", now);

);
, TaskCreationOptions.LongRunning);

return counter;


static public HSecondCounter CountAsync()

HSecondCounter counter = new HSecondCounter();
Task.Factory.StartNew(() =>

counter.Run((now) =>

lock (counter.m_observers)

foreach (var observer in counter.m_observers)

Task.Factory.StartNew(() =>

observer.OnNext(now);
);

Console.WriteLine("HSecondCounter: 0", now);

);
, TaskCreationOptions.LongRunning);

return counter;


List<IObserver<DateTime>> m_observers = new List<IObserver<DateTime>>();
volatile bool m_doContinue = true;

private HSecondCounter()



public void Stop()

m_doContinue = false;


public IDisposable Subscribe(IObserver<DateTime> observer)

lock (m_observers)

if (!m_observers.Contains(observer))

m_observers.Add(observer);



return Disposable.Create(() =>

lock (m_observers)

m_observers.Remove(observer);


observer.OnCompleted();
);


async private void Run(Action<DateTime> notifier)

try

int lastSecond = 0;

while (m_doContinue)

DateTime now = DateTime.Now;

if (now.Second != lastSecond)

notifier(now);


lastSecond = now.Second;

await Task.Delay(500);


catch (Exception ex)

lock (m_observers)

foreach (var observer in m_observers.ToArray())

observer.OnError(ex);



finally

lock (m_observers)

foreach (var observer in m_observers)

observer.OnCompleted();


Console.WriteLine($"HSceondCounter ended at: DateTime.Now");




It can be run "normally" by calling HSceondCounter.Count() or async by calling HSceondCounter.CountAsync(). Running normally means that the next notification (tick) is not sent before the previous returns. This is how Observable.Interval(...) works. Async means that the notification to each observer is sent independently. There are pros and cons in both cases(?). At the outer level they are both awaitable, so maybe the naming is somewhat misleading?



IScheduleSubscriber



The contract for subscribers to the HScheduler.



 public interface IScheduleSubscriber

string Name get;
Action<DateTime> Action get;
bool ShouldRun(DateTime time);
int DegreeOfParallelism get;



ScheduleSubscriber



A test implementation of IScheduleSubscriber:



 public class ScheduleSubscriber : IScheduleSubscriber

public string Name get; set;

public Action<DateTime> Action get; set;

public Func<DateTime, bool> ShouldRunPredicate get; set;

public int DegreeOfParallelism get; set;

public bool ShouldRun(DateTime time)

return ShouldRunPredicate(time);


public override string ToString()

return Name;




Test Case



void TestSchedule()

HScheduler scheduler = new HScheduler();
scheduler.Start();

ScheduleSubscriber subscriber1 = new ScheduleSubscriber

Name = "AAAA",
DegreeOfParallelism = HScheduler.UnlimitedJobParallelism,
Action = (value) =>

Console.WriteLine($"AAAA: value - value.Millisecond - Thread Id: Thread.CurrentThread.ManagedThreadId");
Thread.Sleep(5000);
Console.WriteLine("AAAA Finished");
,
ShouldRunPredicate = (time) =>

return time.Second % 2 == 0;

;

ScheduleSubscriber subscriber2 = new ScheduleSubscriber

Name = "BBBB",
DegreeOfParallelism = 1,
Action = (value) =>

Console.WriteLine($"BBBB: value - value.Millisecond - Thread Id: Thread.CurrentThread.ManagedThreadId");
Thread.Sleep(3000);
Console.WriteLine("BBBB Finished");
,
ShouldRunPredicate = (time) =>

return time.Second % 5 == 0;

;

using (IDisposable scheduledJob1 = scheduler.Subscribe(subscriber1))
using (IDisposable scheduledJob2 = scheduler.Subscribe(subscriber2))

Console.ReadLine();


scheduler.Stop();
Console.ReadLine();







share|improve this question













With great inspiration from t3chb0ts ongoing work about scheduling, cron jobs etc. (last post here), I decided to make my own version of the Scheduler - using the observer pattern - as a learning project.



The scheduler



 public class HScheduler

public const int UnlimitedJobParallelism = -1;

HSecondCounter m_counter;
ConcurrentDictionary<IScheduleSubscriber, int> m_parallelCounters = new ConcurrentDictionary<IScheduleSubscriber, int>();

public void Start()

m_counter = HSecondCounter.CountAsync();


public IDisposable Subscribe(IScheduleSubscriber subscriber)

IDisposable handle = m_counter
.Where(t => subscriber.ShouldRun(t) && CanRun(subscriber))
.Subscribe((time) =>

try

subscriber.Action(time);
DecrementSubscriber(subscriber);

catch (Exception ex)

// TODO: something should be done here (logging, notifying...)
Console.WriteLine(ex.Message);

,
(ex) =>

// TODO: something should be done here (logging, notifying...)
Console.WriteLine(ex.Message);
,
() =>

Console.WriteLine($"subscriber.Name Completed");
);

return Disposable.Create(() =>

handle.Dispose();
RemoveSubscriber(subscriber);
);


private void RemoveSubscriber(IScheduleSubscriber subscriber)

m_parallelCounters.TryRemove(subscriber, out _);


private void DecrementSubscriber(IScheduleSubscriber subscriber)

if (subscriber.DegreeOfParallelism != UnlimitedJobParallelism)
m_parallelCounters.AddOrUpdate(subscriber, 0, (scr, c) => c - 1);


private bool CanRun(IScheduleSubscriber subscriber)

if (subscriber.DegreeOfParallelism == UnlimitedJobParallelism) return true;

int value = m_parallelCounters.GetOrAdd(subscriber, 0);
bool result = value < subscriber.DegreeOfParallelism;
if (result)

m_parallelCounters.AddOrUpdate(subscriber, 1, (scr, c) => c + 1);

return result;


internal void Stop()

m_counter.Stop();




This is at large following t3chb0ts pattern, but is not dedicated to cron jobs. Instead of the use of Observable.Interval, I decided to make my own "time pump" which resulted in the HSecondCounter.



HSecondCounter



HSecondCounter is an implementation of the IObservable<DateTime> interface and it is locked to ticking each second. It is an attempt to solve the problem that Xiaoy312 describes here. The solution for now is to check the current time for every half second but only dispatch ticks if the second component of the current time has changed since last tick. It seems to work, but I'm sure someone can shoot it down?



 public class HSecondCounter : IObservable<DateTime>
{
static public HSecondCounter Count()

HSecondCounter counter = new HSecondCounter();
Task.Factory.StartNew(() =>

counter.Run((now) =>

lock (counter.m_observers)

foreach (var observer in counter.m_observers)

observer.OnNext(now);

Console.WriteLine("HSecondCounter: 0", now);

);
, TaskCreationOptions.LongRunning);

return counter;


static public HSecondCounter CountAsync()

HSecondCounter counter = new HSecondCounter();
Task.Factory.StartNew(() =>

counter.Run((now) =>

lock (counter.m_observers)

foreach (var observer in counter.m_observers)

Task.Factory.StartNew(() =>

observer.OnNext(now);
);

Console.WriteLine("HSecondCounter: 0", now);

);
, TaskCreationOptions.LongRunning);

return counter;


List<IObserver<DateTime>> m_observers = new List<IObserver<DateTime>>();
volatile bool m_doContinue = true;

private HSecondCounter()



public void Stop()

m_doContinue = false;


public IDisposable Subscribe(IObserver<DateTime> observer)

lock (m_observers)

if (!m_observers.Contains(observer))

m_observers.Add(observer);



return Disposable.Create(() =>

lock (m_observers)

m_observers.Remove(observer);


observer.OnCompleted();
);


async private void Run(Action<DateTime> notifier)

try

int lastSecond = 0;

while (m_doContinue)

DateTime now = DateTime.Now;

if (now.Second != lastSecond)

notifier(now);


lastSecond = now.Second;

await Task.Delay(500);


catch (Exception ex)

lock (m_observers)

foreach (var observer in m_observers.ToArray())

observer.OnError(ex);



finally

lock (m_observers)

foreach (var observer in m_observers)

observer.OnCompleted();


Console.WriteLine($"HSceondCounter ended at: DateTime.Now");




It can be run "normally" by calling HSceondCounter.Count() or async by calling HSceondCounter.CountAsync(). Running normally means that the next notification (tick) is not sent before the previous returns. This is how Observable.Interval(...) works. Async means that the notification to each observer is sent independently. There are pros and cons in both cases(?). At the outer level they are both awaitable, so maybe the naming is somewhat misleading?



IScheduleSubscriber



The contract for subscribers to the HScheduler.



 public interface IScheduleSubscriber

string Name get;
Action<DateTime> Action get;
bool ShouldRun(DateTime time);
int DegreeOfParallelism get;



ScheduleSubscriber



A test implementation of IScheduleSubscriber:



 public class ScheduleSubscriber : IScheduleSubscriber

public string Name get; set;

public Action<DateTime> Action get; set;

public Func<DateTime, bool> ShouldRunPredicate get; set;

public int DegreeOfParallelism get; set;

public bool ShouldRun(DateTime time)

return ShouldRunPredicate(time);


public override string ToString()

return Name;




Test Case



void TestSchedule()

HScheduler scheduler = new HScheduler();
scheduler.Start();

ScheduleSubscriber subscriber1 = new ScheduleSubscriber

Name = "AAAA",
DegreeOfParallelism = HScheduler.UnlimitedJobParallelism,
Action = (value) =>

Console.WriteLine($"AAAA: value - value.Millisecond - Thread Id: Thread.CurrentThread.ManagedThreadId");
Thread.Sleep(5000);
Console.WriteLine("AAAA Finished");
,
ShouldRunPredicate = (time) =>

return time.Second % 2 == 0;

;

ScheduleSubscriber subscriber2 = new ScheduleSubscriber

Name = "BBBB",
DegreeOfParallelism = 1,
Action = (value) =>

Console.WriteLine($"BBBB: value - value.Millisecond - Thread Id: Thread.CurrentThread.ManagedThreadId");
Thread.Sleep(3000);
Console.WriteLine("BBBB Finished");
,
ShouldRunPredicate = (time) =>

return time.Second % 5 == 0;

;

using (IDisposable scheduledJob1 = scheduler.Subscribe(subscriber1))
using (IDisposable scheduledJob2 = scheduler.Subscribe(subscriber2))

Console.ReadLine();


scheduler.Stop();
Console.ReadLine();









share|improve this question












share|improve this question




share|improve this question








edited Apr 17 at 23:42









Jamal♦

30.1k11114225




30.1k11114225









asked Apr 16 at 8:52









Henrik Hansen

3,8481417




3,8481417











  • I like the idea with the encapsulated HSecondCounter. I needed quite a while to figure it out but if get this correctly, you let the timer tick a little bit faster to prevent the missing second with await Task.Delay(500);, right?
    – t3chb0t
    Apr 16 at 10:45











  • @t3chb0t: Yes that's the idea. I discovered that when stepping with 500 ms there is one DateTime.Now with the "missing" second when the milliseconds roundtrips to 000 else there are two. I have had it running for a while without missing seconds - but who knows :-)
    – Henrik Hansen
    Apr 16 at 10:55

















  • I like the idea with the encapsulated HSecondCounter. I needed quite a while to figure it out but if get this correctly, you let the timer tick a little bit faster to prevent the missing second with await Task.Delay(500);, right?
    – t3chb0t
    Apr 16 at 10:45











  • @t3chb0t: Yes that's the idea. I discovered that when stepping with 500 ms there is one DateTime.Now with the "missing" second when the milliseconds roundtrips to 000 else there are two. I have had it running for a while without missing seconds - but who knows :-)
    – Henrik Hansen
    Apr 16 at 10:55
















I like the idea with the encapsulated HSecondCounter. I needed quite a while to figure it out but if get this correctly, you let the timer tick a little bit faster to prevent the missing second with await Task.Delay(500);, right?
– t3chb0t
Apr 16 at 10:45





I like the idea with the encapsulated HSecondCounter. I needed quite a while to figure it out but if get this correctly, you let the timer tick a little bit faster to prevent the missing second with await Task.Delay(500);, right?
– t3chb0t
Apr 16 at 10:45













@t3chb0t: Yes that's the idea. I discovered that when stepping with 500 ms there is one DateTime.Now with the "missing" second when the milliseconds roundtrips to 000 else there are two. I have had it running for a while without missing seconds - but who knows :-)
– Henrik Hansen
Apr 16 at 10:55





@t3chb0t: Yes that's the idea. I discovered that when stepping with 500 ms there is one DateTime.Now with the "missing" second when the milliseconds roundtrips to 000 else there are two. I have had it running for a while without missing seconds - but who knows :-)
– Henrik Hansen
Apr 16 at 10:55











1 Answer
1






active

oldest

votes

















up vote
1
down vote



accepted










It's very intersting to see it done the hard way and I have to study it a little bit more but so far I found a couple of things that I believe should be improved. (They aren't in any particular order.)




There is one issue with stopping the scheduler. You are using the m_doContinue variable to control the while loop of the Run method but there is nowhere any mechanism to stop the LongRunning task.



I think the scheduler should be IDisposable and use a CancellationTokenSource to cancel the Task.Factory.StartNew method that initialized it.




Another thing I'd change is the List<IObserver<DateTime>> m_observers variable. If you made it a HashSet, you wouldn't need the if for Subscribe or you could just use a ConcurrentDictionary and remove the two locks.




I also noticed that the HScheduler has a Subscribe method but isn't derived from the IObservable interface.




The HSecondCounter has two very similar methods: Count and CountAsync. I find this duplication is not necessary. Count could call CountAsync synchronously. In fact, they are identical. Additionaly I expect CountAsync to return a Task so that I can await it (if I wanted to) - this is the usual convention for xAsync methods.






share|improve this answer





















  • Thanks for the feedback. You have some good observations. I think I'll write a revision in an answer where I also will comment the issues you address.
    – Henrik Hansen
    Apr 17 at 17:23






  • 1




    I've posted a new version as a new question. I think, I've incorporated most of your points. The cancelllationToken must wait to version 3.0. codereview.stackexchange.com/questions/192354/…
    – Henrik Hansen
    Apr 18 at 6:48










Your Answer




StackExchange.ifUsing("editor", function ()
return StackExchange.using("mathjaxEditing", function ()
StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix)
StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
);
);
, "mathjax-editing");

StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");

StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "196"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);

StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);

else
createEditor();

);

function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
convertImagesToLinks: false,
noModals: false,
showLowRepImageUploadWarning: true,
reputationToPostImages: null,
bindNavPrevention: true,
postfix: "",
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);



);








 

draft saved


draft discarded


















StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f192177%2fscheduling-using-system-iobservable%23new-answer', 'question_page');

);

Post as a guest






























1 Answer
1






active

oldest

votes








1 Answer
1






active

oldest

votes









active

oldest

votes






active

oldest

votes








up vote
1
down vote



accepted










It's very intersting to see it done the hard way and I have to study it a little bit more but so far I found a couple of things that I believe should be improved. (They aren't in any particular order.)




There is one issue with stopping the scheduler. You are using the m_doContinue variable to control the while loop of the Run method but there is nowhere any mechanism to stop the LongRunning task.



I think the scheduler should be IDisposable and use a CancellationTokenSource to cancel the Task.Factory.StartNew method that initialized it.




Another thing I'd change is the List<IObserver<DateTime>> m_observers variable. If you made it a HashSet, you wouldn't need the if for Subscribe or you could just use a ConcurrentDictionary and remove the two locks.




I also noticed that the HScheduler has a Subscribe method but isn't derived from the IObservable interface.




The HSecondCounter has two very similar methods: Count and CountAsync. I find this duplication is not necessary. Count could call CountAsync synchronously. In fact, they are identical. Additionaly I expect CountAsync to return a Task so that I can await it (if I wanted to) - this is the usual convention for xAsync methods.






share|improve this answer





















  • Thanks for the feedback. You have some good observations. I think I'll write a revision in an answer where I also will comment the issues you address.
    – Henrik Hansen
    Apr 17 at 17:23






  • 1




    I've posted a new version as a new question. I think, I've incorporated most of your points. The cancelllationToken must wait to version 3.0. codereview.stackexchange.com/questions/192354/…
    – Henrik Hansen
    Apr 18 at 6:48














up vote
1
down vote



accepted










It's very intersting to see it done the hard way and I have to study it a little bit more but so far I found a couple of things that I believe should be improved. (They aren't in any particular order.)




There is one issue with stopping the scheduler. You are using the m_doContinue variable to control the while loop of the Run method but there is nowhere any mechanism to stop the LongRunning task.



I think the scheduler should be IDisposable and use a CancellationTokenSource to cancel the Task.Factory.StartNew method that initialized it.




Another thing I'd change is the List<IObserver<DateTime>> m_observers variable. If you made it a HashSet, you wouldn't need the if for Subscribe or you could just use a ConcurrentDictionary and remove the two locks.




I also noticed that the HScheduler has a Subscribe method but isn't derived from the IObservable interface.




The HSecondCounter has two very similar methods: Count and CountAsync. I find this duplication is not necessary. Count could call CountAsync synchronously. In fact, they are identical. Additionaly I expect CountAsync to return a Task so that I can await it (if I wanted to) - this is the usual convention for xAsync methods.






share|improve this answer





















  • Thanks for the feedback. You have some good observations. I think I'll write a revision in an answer where I also will comment the issues you address.
    – Henrik Hansen
    Apr 17 at 17:23






  • 1




    I've posted a new version as a new question. I think, I've incorporated most of your points. The cancelllationToken must wait to version 3.0. codereview.stackexchange.com/questions/192354/…
    – Henrik Hansen
    Apr 18 at 6:48












up vote
1
down vote



accepted







up vote
1
down vote



accepted






It's very intersting to see it done the hard way and I have to study it a little bit more but so far I found a couple of things that I believe should be improved. (They aren't in any particular order.)




There is one issue with stopping the scheduler. You are using the m_doContinue variable to control the while loop of the Run method but there is nowhere any mechanism to stop the LongRunning task.



I think the scheduler should be IDisposable and use a CancellationTokenSource to cancel the Task.Factory.StartNew method that initialized it.




Another thing I'd change is the List<IObserver<DateTime>> m_observers variable. If you made it a HashSet, you wouldn't need the if for Subscribe or you could just use a ConcurrentDictionary and remove the two locks.




I also noticed that the HScheduler has a Subscribe method but isn't derived from the IObservable interface.




The HSecondCounter has two very similar methods: Count and CountAsync. I find this duplication is not necessary. Count could call CountAsync synchronously. In fact, they are identical. Additionaly I expect CountAsync to return a Task so that I can await it (if I wanted to) - this is the usual convention for xAsync methods.






share|improve this answer













It's very intersting to see it done the hard way and I have to study it a little bit more but so far I found a couple of things that I believe should be improved. (They aren't in any particular order.)




There is one issue with stopping the scheduler. You are using the m_doContinue variable to control the while loop of the Run method but there is nowhere any mechanism to stop the LongRunning task.



I think the scheduler should be IDisposable and use a CancellationTokenSource to cancel the Task.Factory.StartNew method that initialized it.




Another thing I'd change is the List<IObserver<DateTime>> m_observers variable. If you made it a HashSet, you wouldn't need the if for Subscribe or you could just use a ConcurrentDictionary and remove the two locks.




I also noticed that the HScheduler has a Subscribe method but isn't derived from the IObservable interface.




The HSecondCounter has two very similar methods: Count and CountAsync. I find this duplication is not necessary. Count could call CountAsync synchronously. In fact, they are identical. Additionaly I expect CountAsync to return a Task so that I can await it (if I wanted to) - this is the usual convention for xAsync methods.







share|improve this answer













share|improve this answer



share|improve this answer











answered Apr 17 at 15:10









t3chb0t

32k54195




32k54195











  • Thanks for the feedback. You have some good observations. I think I'll write a revision in an answer where I also will comment the issues you address.
    – Henrik Hansen
    Apr 17 at 17:23






  • 1




    I've posted a new version as a new question. I think, I've incorporated most of your points. The cancelllationToken must wait to version 3.0. codereview.stackexchange.com/questions/192354/…
    – Henrik Hansen
    Apr 18 at 6:48
















  • Thanks for the feedback. You have some good observations. I think I'll write a revision in an answer where I also will comment the issues you address.
    – Henrik Hansen
    Apr 17 at 17:23






  • 1




    I've posted a new version as a new question. I think, I've incorporated most of your points. The cancelllationToken must wait to version 3.0. codereview.stackexchange.com/questions/192354/…
    – Henrik Hansen
    Apr 18 at 6:48















Thanks for the feedback. You have some good observations. I think I'll write a revision in an answer where I also will comment the issues you address.
– Henrik Hansen
Apr 17 at 17:23




Thanks for the feedback. You have some good observations. I think I'll write a revision in an answer where I also will comment the issues you address.
– Henrik Hansen
Apr 17 at 17:23




1




1




I've posted a new version as a new question. I think, I've incorporated most of your points. The cancelllationToken must wait to version 3.0. codereview.stackexchange.com/questions/192354/…
– Henrik Hansen
Apr 18 at 6:48




I've posted a new version as a new question. I think, I've incorporated most of your points. The cancelllationToken must wait to version 3.0. codereview.stackexchange.com/questions/192354/…
– Henrik Hansen
Apr 18 at 6:48












 

draft saved


draft discarded


























 


draft saved


draft discarded














StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f192177%2fscheduling-using-system-iobservable%23new-answer', 'question_page');

);

Post as a guest













































































Popular posts from this blog

Greedy Best First Search implementation in Rust

Function to Return a JSON Like Objects Using VBA Collections and Arrays

C++11 CLH Lock Implementation