In a multi-threaded environment, execute tasks from the same group sequentially

Problem Statement:
There will be many requests to perform a specific type of operation with different parameters.
The operations must run in parallel, with one exception: operations that share the same parent ID (lunId) must run sequentially, one after another.



Solution Tried:
I implemented a map that records the parent ID (lunId) and the task currently executing for it, and an executor that runs the tasks.
If the map already holds a task for that parent ID, the new task waits, and the executing task notifies it once it has completed.
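For illustration only, the wait-then-notify handoff described above can be sketched with one CountDownLatch per task instead of raw wait()/notify(). This is not the code under review: the class LatchHandoffSketch, its methods, and the cached thread pool are hypothetical choices made for this sketch, and map entries are never cleaned up to keep it short.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; none of these names come from the posted code.
class LatchHandoffSketch {
    // Latch of the most recently submitted task per parent ID.
    // Assumption: entries are never removed, to keep the sketch short.
    private final Map<String, CountDownLatch> lastByParent = new HashMap<>();
    private final ExecutorService pool = Executors.newCachedThreadPool();

    // synchronized so that looking up the predecessor and registering
    // the new latch happen as one atomic step.
    public synchronized void submit(String parentId, Runnable work) {
        final CountDownLatch predecessor = lastByParent.get(parentId);
        final CountDownLatch done = new CountDownLatch(1);
        lastByParent.put(parentId, done);
        pool.submit(() -> {
            try {
                if (predecessor != null) {
                    // Returns immediately if the predecessor already finished.
                    predecessor.await();
                }
                work.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Always release the successor, even if this task failed.
                done.countDown();
            }
        });
    }

    // Stops accepting tasks and waits for the submitted ones to finish.
    public boolean shutdownAndWait(long timeoutSeconds) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        LatchHandoffSketch sketch = new LatchHandoffSketch();
        List<String> order = new CopyOnWriteArrayList<>();
        sketch.submit("1", () -> order.add("a"));
        sketch.submit("1", () -> order.add("c"));
        sketch.submit("1", () -> order.add("b"));
        sketch.shutdownAndWait(5);
        System.out.println(order); // tasks for parent "1" in submission order
    }
}
```

Because a latch remembers that it was counted down, a successor that starts late does not miss the signal, which is the main difference from a bare notify().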



Please let me know if there are any issues with this code.



public class CloneExecutorTest {
    public static void main(String[] args) {
        CloneExecutor datastoreCloneExecuter = CloneExecutor.getInstance();
        datastoreCloneExecuter.submitExecution("1", "a");
        datastoreCloneExecuter.submitExecution("1", "c");
        datastoreCloneExecuter.submitExecution("1", "b");
        // datastoreCloneExecuter.submitExecution("1", "d");
        // datastoreCloneExecuter.submitExecution("1", "e");
    }
}

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;

public class ThreadBlocker implements Runnable {

    Thread thread = new Thread(this);

    private Timer timer;
    private ThreadBlocker preceederThread;
    private String lunId;
    private String cloneName;
    private long timeout = 15 * 60 * 1000;

    private CurrentState state = CurrentState.STARTED;

    public ThreadBlocker(ThreadBlocker preceederThread, String lunId, String cloneName) {
        setState(CurrentState.RUNNING);
        this.preceederThread = preceederThread;
        this.lunId = lunId;
        this.cloneName = cloneName;
    }

    public CurrentState getState() {
        synchronized (this) {
            return state;
        }
    }

    public void setState(CurrentState state) {
        synchronized (this) {
            this.state = state;
        }
    }

    public void unblock() {
        System.out.println("Unblocking thread .." + thread.getId() + thread.getState());
        final Long id = Long.valueOf(thread.getId());
        synchronized (id) {
            id.notify();
            System.out.println("Notified thread .." + thread.getId() + thread.getState());
        }
        if (timer != null) {
            timer.cancel(); //Terminate the timer thread
        }
        final CloneExecutor datastoreCloneExecuter = CloneExecutor.getInstance();
        datastoreCloneExecuter.deleteExecutingObjects(lunId, this);
    }

    public void block() {
        System.out.println("blocking thread .." + thread.getId() + thread.getState());

        timer = new Timer();
        timer.schedule(new RemindTask(), timeout);
        try {
            setState(CurrentState.WAITING);

            if (preceederThread != null) {
                final Long value = Long.valueOf(preceederThread.thread.getId());
                synchronized (value) {
                    if (!CurrentState.COMPLETED.equals(preceederThread.getState())) {
                        System.out.println("Thread set to waiting .." + thread.getId() + thread.getState());
                        value.wait();
                    }
                }
                System.out.println("wait released.." + thread.getId() + thread.getState());
            }
        } catch (InterruptedException e) {
            e.printStackTrace(); //Default exception handling
        }
    }

    @Override
    public void run() {
        System.out.println("Entered RUN " + lunId + " With clone name " + cloneName + " Thread ID" + thread.getId());
        if (preceederThread != null) {
            final CurrentState state = preceederThread.getState();
            synchronized (state) {
                if (!CurrentState.COMPLETED.equals(state)) {
                    block();
                }
            }
        }

        try {
            setState(CurrentState.RUNNING);

            //Starting of core implementation
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Created Datastore of " + lunId + " With clone name " + cloneName);
            //End of core implementation
        } finally {
            unblock();
        }
    }

    private enum CurrentState {
        STARTED, WAITING, RUNNING, COMPLETED
    }

    class RemindTask extends TimerTask {
        public void run() {
            System.out.println("Unblocking thread ..");
            synchronized (preceederThread) {
                preceederThread.notify();
            }
            timer.cancel();
        }
    }
}

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CloneExecutor {

    private static CloneExecutor datastoreCloneExecut;
    private static Object mutex = new Object();
    private ExecutorService executor;
    private Map<String, ThreadBlocker> executingObjects;

    private CloneExecutor() {
        executingObjects = new ConcurrentHashMap<>();
        executor = Executors.newFixedThreadPool(100);
    }

    public static CloneExecutor getInstance() {
        synchronized (mutex) {
            if (datastoreCloneExecut == null) {
                datastoreCloneExecut = new CloneExecutor();
            }
            return datastoreCloneExecut;
        }
    }

    public void deleteExecutingObjects(String lunId, ThreadBlocker threadBlocker) {
        synchronized (executingObjects) {
            ThreadBlocker lunHold = executingObjects.get(lunId);
            if (lunHold.thread.getId() == threadBlocker.thread.getId()) {
                executingObjects.remove(lunId);
                System.out.println("Removed from Map");
            }
        }
    }

    public void submitExecution(String lunId, String cloneName) {
        ThreadBlocker threadBlocker;
        synchronized (executingObjects) {
            ThreadBlocker lunHold = executingObjects.get(lunId);
            threadBlocker = new ThreadBlocker(lunHold, lunId, cloneName);
            executingObjects.put(lunId, threadBlocker);
        }
        executor.submit(threadBlocker);
    }
}






    I don't understand why you don't simply use a map of single-threaded executors, that is, a Map<Integer, ExecutorService>. If an entry doesn't exist, you create it and submit to it, and it will run in parallel with the other executors currently in progress. If an entry exists, you submit to it, and the single-threaded executor will queue the task after the currently executing one, if any. This simple design seems to satisfy your requirement. Am I missing something?
    – janos
    May 10 at 13:24
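A minimal sketch of the design suggested in this comment might look like the following. It is an illustration under assumptions, not a tested replacement for the posted code: the class PerKeyExecutors and its methods are hypothetical names, the map is keyed by String because lunId is a String in the question (the comment writes Integer), and executors are never removed, so each distinct lunId keeps one thread alive until shutdown.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of "a map of single-threaded executors".
class PerKeyExecutors {
    // One single-threaded executor per lunId; created lazily on first use.
    private final Map<String, ExecutorService> executors = new ConcurrentHashMap<>();

    // Tasks with the same lunId share one executor, so they run one at a
    // time in submission order; different lunIds run in parallel.
    public void submit(String lunId, Runnable task) {
        executors
            .computeIfAbsent(lunId, id -> Executors.newSingleThreadExecutor())
            .submit(task);
    }

    // Shuts down every executor and waits for queued tasks to finish.
    public boolean shutdownAndWait(long timeoutSeconds) {
        executors.values().forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService executor : executors.values()) {
                if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        PerKeyExecutors runner = new PerKeyExecutors();
        List<String> order = new CopyOnWriteArrayList<>();
        runner.submit("1", () -> order.add("a"));
        runner.submit("1", () -> order.add("c"));
        runner.submit("1", () -> order.add("b"));
        runner.submit("2", () -> order.add("x"));
        runner.shutdownAndWait(5);
        // "a", "c", "b" keep their relative order; "x" may land anywhere.
        System.out.println(order);
    }
}
```

The ordering guarantee comes from each single-threaded executor's queue, so no explicit wait/notify, timer, or state enum is needed. With many distinct lunId values, evicting idle executors would be a reasonable next step.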
















asked May 10 at 12:18 by asvignesh





















