In a multi-threaded environment, execute tasks of the same group sequentially
Problem Statement:
There will be many requests to perform a specific type of operation with different parameters.
Operations must run in parallel, with one exception: operations that share the same parent ID (lunId) must run sequentially.
Solution Tried:
I implemented a map that records the parent ID (lunId) and the thread currently executing for it, plus an executor that runs the tasks.
Each task checks the map for a thread already executing for the same parent ID. If there is one, the new thread waits, and the executing thread notifies it once it has completed.
Please let me know if there are any issues with this code.
public class CloneExecutorTest {

    public static void main(String[] args) {
        CloneExecutor datastoreCloneExecuter = CloneExecutor.getInstance();
        datastoreCloneExecuter.submitExecution("1", "a");
        datastoreCloneExecuter.submitExecution("1", "c");
        datastoreCloneExecuter.submitExecution("1", "b");
        // datastoreCloneExecuter.submitExecution("1", "d");
        // datastoreCloneExecuter.submitExecution("1", "e");
    }
}
public class ThreadBlocker implements Runnable {

    Thread thread = new Thread(this);
    private Timer timer;
    private ThreadBlocker preceederThread;
    private String lunId;
    private String cloneName;
    private long timeout = 15 * 60 * 1000;
    private CurrentState state = CurrentState.STARTED;

    public ThreadBlocker(ThreadBlocker preceederThread, String lunId, String cloneName) {
        setState(CurrentState.RUNNING);
        this.preceederThread = preceederThread;
        this.lunId = lunId;
        this.cloneName = cloneName;
    }

    public CurrentState getState() {
        synchronized (this) {
            return state;
        }
    }

    public void setState(CurrentState state) {
        synchronized (this) {
            this.state = state;
        }
    }

    public void unblock() {
        System.out.println("Unblocking thread .." + thread.getId() + thread.getState());
        final Long id = Long.valueOf(thread.getId());
        synchronized (id) {
            id.notify();
        }
        System.out.println("Notified thread .." + thread.getId() + thread.getState());
        if (timer != null) {
            timer.cancel(); // Terminate the timer thread
        }
        final CloneExecutor datastoreCloneExecuter = CloneExecutor.getInstance();
        datastoreCloneExecuter.deleteExecutingObjects(lunId, this);
    }

    public void block() {
        System.out.println("blocking thread .." + thread.getId() + thread.getState());
        timer = new Timer();
        timer.schedule(new RemindTask(), timeout);
        try {
            setState(CurrentState.WAITING);
            if (preceederThread != null) {
                final Long value = Long.valueOf(preceederThread.thread.getId());
                synchronized (value) {
                    if (!CurrentState.COMPLETED.equals(preceederThread.getState())) {
                        System.out.println("Thread set to waiting .." + thread.getId() + thread.getState());
                        value.wait();
                    }
                }
            }
            System.out.println("wait released.." + thread.getId() + thread.getState());
        } catch (InterruptedException e) {
            e.printStackTrace(); // Default exception handling
        }
    }

    @Override
    public void run() {
        System.out.println("Entered RUN " + lunId + " With clone name " + cloneName + " Thread ID" + thread.getId());
        if (preceederThread != null) {
            final CurrentState state = preceederThread.getState();
            synchronized (state) {
                if (!CurrentState.COMPLETED.equals(state)) {
                    block();
                }
            }
        }
        try {
            setState(CurrentState.RUNNING);
            // Start of core implementation
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Created Datastore of " + lunId + " With clone name " + cloneName);
            // End of core implementation
        } finally {
            unblock();
        }
    }

    private enum CurrentState {
        STARTED, WAITING, RUNNING, COMPLETED
    }

    class RemindTask extends TimerTask {
        public void run() {
            System.out.println("Unblocking thread ..");
            synchronized (preceederThread) {
                preceederThread.notify();
            }
            timer.cancel();
        }
    }
}
public class CloneExecutor {

    private static CloneExecutor datastoreCloneExecut;
    private static Object mutex = new Object();
    private ExecutorService executor;
    private Map<String, ThreadBlocker> executingObjects;

    private CloneExecutor() {
        executingObjects = new ConcurrentHashMap<>();
        executor = Executors.newFixedThreadPool(100);
    }

    public static CloneExecutor getInstance() {
        synchronized (mutex) {
            if (datastoreCloneExecut == null) {
                datastoreCloneExecut = new CloneExecutor();
            }
        }
        return datastoreCloneExecut;
    }

    public void deleteExecutingObjects(String lunId, ThreadBlocker threadBlocker) {
        synchronized (executingObjects) {
            ThreadBlocker lunHold = executingObjects.get(lunId);
            if (lunHold.thread.getId() == threadBlocker.thread.getId()) {
                executingObjects.remove(lunId);
                System.out.println("Removed from Map");
            }
        }
    }

    public void submitExecution(String lunId, String cloneName) {
        ThreadBlocker threadBlocker;
        synchronized (executingObjects) {
            ThreadBlocker lunHold = executingObjects.get(lunId);
            threadBlocker = new ThreadBlocker(lunHold, lunId, cloneName);
            executingObjects.put(lunId, threadBlocker);
        }
        executor.submit(threadBlocker);
    }
}
java multithreading thread-safety
I don't understand why you don't simply use a map of single-threaded executors, that is, a Map<Integer, ExecutorService>. If an entry doesn't exist, you create it and submit to it, which will run in parallel with other executors currently in progress. If an entry exists, you submit to it, and the single-threaded executor will queue it after the currently executing task, if any. This simple design seems to satisfy your requirement. Am I missing something?
– janos May 10 at 13:24
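A minimal sketch of the design suggested in the comment above might look like the following. The class name `KeyedSerialExecutor` and its methods are hypothetical and not part of the original post. The map is keyed by `String` rather than `Integer`, on the assumption that `lunId` stays a `String` as in the question. It relies only on `ConcurrentHashMap.computeIfAbsent` and `Executors.newSingleThreadExecutor` from the JDK.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical class, sketched from the comment; not the author's code.
class KeyedSerialExecutor {

    // One single-threaded executor per lunId. Tasks for the same lunId are
    // queued in submission order; different lunIds run on different threads.
    private final Map<String, ExecutorService> executors = new ConcurrentHashMap<>();

    public void submit(String lunId, Runnable task) {
        // computeIfAbsent is atomic on ConcurrentHashMap, so two callers
        // racing on a new lunId end up sharing the same executor.
        executors.computeIfAbsent(lunId, id -> Executors.newSingleThreadExecutor())
                 .execute(task);
    }

    // Stops accepting new tasks and waits for the queued ones to finish.
    public boolean shutdownAndAwait(long timeout, TimeUnit unit) {
        executors.values().forEach(ExecutorService::shutdown);
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        try {
            for (ExecutorService e : executors.values()) {
                long remaining = deadline - System.nanoTime();
                if (!e.awaitTermination(remaining, TimeUnit.NANOSECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Small demo mirroring the submissions in the question's main method.
    static List<String> demo() {
        KeyedSerialExecutor executor = new KeyedSerialExecutor();
        List<String> completed = new CopyOnWriteArrayList<>();
        for (String cloneName : new String[] {"a", "c", "b"}) {
            executor.submit("1", () -> completed.add(cloneName));
        }
        // A different lunId; its task may interleave with those of lunId "1".
        executor.submit("2", () -> completed.add("x"));
        executor.shutdownAndAwait(10, TimeUnit.SECONDS);
        return completed;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that this sketch never removes idle executors from the map, so each distinct `lunId` keeps one thread alive until shutdown. With many distinct IDs, some form of cleanup or a shared pool with per-key queues would likely be needed.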
asked May 10 at 12:18
asvignesh