Send records to messaging queue using either of one policy

I have a bunch of keys (clientKey) and values (processBytes) that I want to send to our messaging queue packed into a single byte array. Each byte array holds as many keys and values as fit while always staying under 50K, and is then sent to our messaging queue.



For each partition, I have a bunch of dataHolders, so I iterate over them and send them to my messaging queue:



    private void validateAndSend(final DataPartition partition) {
        final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);

        // sending data via async policy
        final Packet packet = new Packet(partition, new QPolicyAsync());

        DataHolder dataHolder;
        while ((dataHolder = dataHolders.poll()) != null) {
            packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
                dataHolder.getProcessBytes());
        }
        packet.close();
    }



Packet class: This class packs all the keys and values into one byte array and calls the corresponding implementation passed in the constructor to send the data to the queue.



    public final class Packet implements Closeable {
        private static final int MAX_SIZE = 50000;
        private static final int HEADER_SIZE = 36;

        private final byte dataCenter;
        private final byte recordVersion;
        private final long address;
        private final long addressFrom;
        private final long addressOrigin;
        private final byte partition;
        private final byte replicated;
        private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
        private final QueuePolicy policy;
        private int pendingItems = 0;

        public Packet(final DataPartition partition, final QueuePolicy policy) {
            this.partition = (byte) partition.getPartition();
            this.policy = policy;
            this.dataCenter = Utils.LOCATION.get().datacenter();
            this.recordVersion = 1;
            this.replicated = 0;
            final long packedAddress = new Data().packAddress();
            this.address = packedAddress;
            this.addressFrom = 0L;
            this.addressOrigin = packedAddress;
        }

        private void addHeader(final ByteBuffer buffer, final int items) {
            buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
                .putLong(address).putLong(addressFrom).putLong(addressOrigin).put(partition)
                .put(replicated);
        }

        private void sendData() {
            if (itemBuffer.position() == 0) {
                // no data to be sent
                return;
            }
            final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
            addHeader(buffer, pendingItems);
            buffer.put(itemBuffer);
            // sending data via particular policy
            policy.sendToQueue(address, buffer.array());
            itemBuffer.clear();
            pendingItems = 0;
        }

        public void addAndSendJunked(final byte[] key, final byte[] data) {
            if (key.length > 255) {
                return;
            }
            final byte keyLength = (byte) key.length;
            final byte dataLength = (byte) data.length;

            final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
            final int newSize = itemBuffer.position() + additionalSize;
            if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
                sendData();
            }
            if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
                throw new AppConfigurationException("Size of single item exceeds maximum size");
            }

            final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
            final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
            // data layout
            itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
                .put(data);
            pendingItems++;
        }

        @Override
        public void close() {
            if (pendingItems > 0) {
                sendData();
            }
        }
    }
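
To make the per-item layout concrete, here is a minimal standalone sketch of the same field order (flag, key length, key, timestamp, data length, data). It is an illustration under assumptions, not the Packet class itself: the names ItemLayoutSketch, putItem, drain and encodedSize are hypothetical. The sketch keeps lengths as int until they are written, since a one-byte field holds 0..255 and a two-byte field 0..65535 only when read back unsigned, and it calls flip() before copying because ByteBuffer reads start at the current position.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the posted code.
final class ItemLayoutSketch {
    static final int MAX_SIZE = 50000;
    static final int HEADER_SIZE = 36;
    // flag (1) + key length (1) + timestamp (8) + data length (2)
    static final int ITEM_OVERHEAD = 1 + 1 + 8 + 2;

    /** Number of bytes one item occupies in the item buffer. */
    static int encodedSize(byte[] key, byte[] data) {
        return ITEM_OVERHEAD + key.length + data.length;
    }

    /** Appends one item using the field order of the "data layout" line. */
    static void putItem(ByteBuffer buffer, byte[] key, long timestamp, byte[] data) {
        if (key.length > 255 || data.length > 0xFFFF) {
            throw new IllegalArgumentException("key or data too long for its length field");
        }
        buffer.put((byte) 0)
              .put((byte) key.length)         // read back with & 0xFF
              .put(key)
              .putLong(timestamp)
              .putShort((short) data.length)  // read back with & 0xFFFF
              .put(data);
    }

    /** Copies only the bytes written so far and resets the buffer for reuse. */
    static byte[] drain(ByteBuffer buffer) {
        buffer.flip();                        // limit = old position, position = 0
        byte[] out = new byte[buffer.remaining()];
        buffer.get(out);
        buffer.clear();
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE - HEADER_SIZE);
        byte[] key = "client-1".getBytes(StandardCharsets.UTF_8);
        byte[] data = new byte[300];
        putItem(buffer, key, 1234L, data);
        byte[] encoded = drain(buffer);
        System.out.println(encoded.length == encodedSize(key, data));
    }
}
```

With these constants, each item costs 12 bytes of overhead on top of its key and data, and at most 50000 - 36 = 49964 bytes are available for items in one packet.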





I can send data to my messaging queue in three different ways, so I created an interface with three different implementations:



QueuePolicy interface:



    public interface QueuePolicy {
        public boolean sendToQueue(final long address, final byte[] encodedRecords);
    }



QPolicyAsync class:



    public class QPolicyAsync implements QueuePolicy {

        @Override
        public boolean sendToQueue(long address, byte[] encodedRecords) {
            return SendRecord.getInstance().sendToQueueAsync(address, encodedRecords);
        }
    }




QPolicySync class:



    public class QPolicySync implements QueuePolicy {

        @Override
        public boolean sendToQueue(long address, byte[] encodedRecords) {
            return SendRecord.getInstance().sendToQueueSync(address, encodedRecords);
        }
    }




QPolicySyncWithSocket class:



    public class QPolicySyncWithSocket implements QueuePolicy {
        private final Socket socket;

        public QPolicySyncWithSocket(Socket socket) {
            this.socket = socket;
        }

        @Override
        public boolean sendToQueue(long address, byte[] encodedRecords) {
            return SendRecord.getInstance().sendToQueueSync(address, encodedRecords, Optional.of(socket));
        }
    }
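
Since callers only see the QueuePolicy interface, any implementation can be handed to them. The following is a minimal self-contained sketch of that idea, assuming a hypothetical in-memory RecordingPolicy and a hypothetical sendAll helper standing in for a caller such as Packet; neither exists in the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; only the interface shape is taken from the question.
final class PolicySelectionSketch {
    /** Same shape as the QueuePolicy interface above. */
    interface QueuePolicy {
        boolean sendToQueue(long address, byte[] encodedRecords);
    }

    /** Assumed in-memory policy that records what it was asked to send. */
    static final class RecordingPolicy implements QueuePolicy {
        final List<byte[]> sent = new ArrayList<>();

        @Override
        public boolean sendToQueue(long address, byte[] encodedRecords) {
            sent.add(encodedRecords.clone());
            return true;
        }
    }

    /** Stand-in for a caller: it depends on the interface, not on a concrete policy. */
    static boolean sendAll(QueuePolicy policy, long address, List<byte[]> batches) {
        boolean allSent = true;
        for (byte[] batch : batches) {
            allSent &= policy.sendToQueue(address, batch);
        }
        return allSent;
    }

    public static void main(String[] args) {
        RecordingPolicy policy = new RecordingPolicy();
        List<byte[]> batches = new ArrayList<>();
        batches.add(new byte[] {1, 2});
        batches.add(new byte[] {3});
        boolean ok = sendAll(policy, 42L, batches);
        System.out.println(ok + " " + policy.sent.size());
    }
}
```

A policy like this could also serve as a test double, so the packing logic can be exercised without a live socket.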




SendRecord class: This is the actual class that sends data to my messaging queue. It has three different send methods (numbered 1, 2, 3 in the comments), and each of them is called from one of the QueuePolicy implementations above:



    public class SendRecord {
        private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
        private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
            .concurrencyLevel(100).build();

        private static class Holder {
            private static final SendRecord INSTANCE = new SendRecord();
        }

        public static SendRecord getInstance() {
            return Holder.INSTANCE;
        }

        private SendRecord() {
            executorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    handleRetry();
                }
            }, 0, 1, TimeUnit.SECONDS);
        }

        // this will retry to send data again if acknowledgment is not received
        // but only for async cases. For sync we don't retry at all
        private void handleRetry() {
            List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
            for (PendingMessage message : messages) {
                if (message.hasExpired()) {
                    if (message.shouldRetry()) {
                        message.markResent();
                        doSendAsync(message, Optional.<Socket>absent());
                    } else {
                        cache.invalidate(message.getAddress());
                    }
                }
            }
        }

        // #1 sends data asynchronously
        public boolean sendToQueueAsync(final long address, final byte[] encodedRecords) {
            PendingMessage m = new PendingMessage(address, encodedRecords, true);
            cache.put(address, m);
            return doSendAsync(m, Optional.<Socket>absent());
        }

        // place where we send data on a socket
        private boolean doSendAsync(final PendingMessage message, final Optional<Socket> socket) {
            Optional<Socket> actualSocket = socket;
            if (!actualSocket.isPresent()) {
                SocketState liveSocket = SocketPoolManager.getInstance().getSocket();
                actualSocket = Optional.of(liveSocket.getSocket());
            }

            ZMsg msg = new ZMsg();
            msg.add(message.getEncodedRecords());
            try {
                return msg.send(actualSocket.get());
            } finally {
                msg.destroy();
            }
        }

        // #2 sends data synchronously without taking socket as a parameter
        public boolean sendToQueueSync(final long address, final byte[] encodedRecords) {
            return sendToQueueSync(address, encodedRecords, Optional.<Socket>absent());
        }

        // #3 sends data synchronously but by taking socket as a parameter
        public boolean sendToQueueSync(final long address, final byte[] encodedRecords,
                final Optional<Socket> socket) {
            PendingMessage m = new PendingMessage(address, encodedRecords, false);
            cache.put(address, m);
            try {
                if (doSendAsync(m, socket)) {
                    return m.waitForAck();
                }
            } finally {
                cache.invalidate(address);
            }
            return false;
        }

        // called by ResponsePoller thread to tell us that messaging queue
        // has received data
        public void handleAckReceived(final long address) {
            PendingMessage message = cache.getIfPresent(address);
            if (message != null) {
                message.ackReceived();
                cache.invalidate(address);
            }
        }
    }
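
PendingMessage itself is not shown here. As a rough sketch of how the waitForAck/ackReceived handshake and the retry bookkeeping used by SendRecord could work, the class below assumes a CountDownLatch, an ack timeout and a retry limit. The class name, the constructor parameters and all of the internals are assumptions for illustration; only the method names come from the calls above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the PendingMessage class that is not shown.
final class PendingMessageSketch {
    private final long address;
    private final byte[] encodedRecords;
    private final boolean async;
    private final long ackTimeoutMillis;            // assumed configuration
    private final int maxRetries;                   // assumed configuration
    private final CountDownLatch ack = new CountDownLatch(1);
    private final AtomicInteger retries = new AtomicInteger();
    private volatile long lastSentNanos = System.nanoTime();

    PendingMessageSketch(long address, byte[] encodedRecords, boolean async,
                         long ackTimeoutMillis, int maxRetries) {
        this.address = address;
        this.encodedRecords = encodedRecords;
        this.async = async;
        this.ackTimeoutMillis = ackTimeoutMillis;
        this.maxRetries = maxRetries;
    }

    long getAddress() { return address; }

    byte[] getEncodedRecords() { return encodedRecords; }

    /** Blocks the sending thread until an ack arrives or the timeout passes. */
    boolean waitForAck() {
        try {
            return ack.await(ackTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();     // preserve the interrupt flag
            return false;
        }
    }

    /** Called from the thread that receives acknowledgments. */
    void ackReceived() { ack.countDown(); }

    /** True once the ack timeout has elapsed since the last send. */
    boolean hasExpired() {
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastSentNanos);
        return elapsed >= ackTimeoutMillis;
    }

    /** Only async messages are retried, and only up to the retry limit. */
    boolean shouldRetry() { return async && retries.get() < maxRetries; }

    void markResent() {
        retries.incrementAndGet();
        lastSentNanos = System.nanoTime();
    }

    public static void main(String[] args) {
        PendingMessageSketch message = new PendingMessageSketch(7L, new byte[] {1}, false, 1000L, 0);
        new Thread(message::ackReceived).start();   // plays the ResponsePoller role
        System.out.println(message.waitForAck());
    }
}
```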





My code works fine. The idea is very simple: I send data to my messaging queue via one of those three QueuePolicy implementations, depending on how clients want to send data. As of now, I pass a QueuePolicy implementation to the Packet constructor, and the Packet then sends data via that policy. Each QueuePolicy implementation calls the corresponding method in the SendRecord class.



  • Does my Packet class need to know how data is being sent? I think the Packet class is just a container, and that's all it is. I don't think we should expect it to know how to transmit itself; a transmitter does that.

  • Also, each QueuePolicy implementation calls the corresponding method of the SendRecord class. Is this really needed, or is there a better way? Could we get rid of the SendRecord class and implement its logic in each of the three QueuePolicy implementations?

I believe my design might be breaking the Single Responsibility Principle or not following OOP standards properly, so I wanted to see whether this can be designed better. What is the best way to send keys and values by packing them into one byte array (respecting the 50K limit) in either sync or async mode?
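
To picture the "container versus transmitter" split from the first bullet, here is one possible sketch in which the batching object only accumulates bytes and hands each full batch to whatever sink it is given. BatcherSketch and its Consumer-based sink are hypothetical names chosen for illustration, not a proposal taken from the code above, and the header handling is left out to keep it short.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: batching knows nothing about how bytes are transmitted.
final class BatcherSketch implements AutoCloseable {
    private final ByteBuffer items;
    private final Consumer<byte[]> sink;   // e.g. a lambda wrapping a QueuePolicy

    BatcherSketch(int capacity, Consumer<byte[]> sink) {
        this.items = ByteBuffer.allocate(capacity);
        this.sink = sink;
    }

    /** Adds one already-encoded item, flushing first if it would not fit. */
    void add(byte[] encodedItem) {
        if (encodedItem.length > items.capacity()) {
            throw new IllegalArgumentException("single item exceeds batch capacity");
        }
        if (encodedItem.length > items.remaining()) {
            flush();
        }
        items.put(encodedItem);
    }

    /** Hands the bytes collected so far to the sink and resets the buffer. */
    void flush() {
        if (items.position() == 0) {
            return;                        // nothing to send
        }
        items.flip();
        byte[] batch = new byte[items.remaining()];
        items.get(batch);
        items.clear();
        sink.accept(batch);
    }

    @Override
    public void close() {
        flush();
    }

    public static void main(String[] args) {
        List<byte[]> sent = new ArrayList<>();
        try (BatcherSketch batcher = new BatcherSketch(8, sent::add)) {
            batcher.add(new byte[5]);
            batcher.add(new byte[5]);      // does not fit, so the first batch is flushed
        }
        System.out.println(sent.size());
    }
}
```

With this shape, the choice between sync and async sending would live entirely in the sink that the caller supplies.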







  • What do you mean by "either of one policy"? Can't you only have either of many?
    – Raimund Krämer
    Jan 25 at 10:33










  • Sorry what I meant was - I have many policies to choose from and I will pick only one policy to send data on it. I won't mix two or three policies to send same data.
    – david
    Jan 25 at 20:26
















up vote
6
down vote

favorite












I have bunch of keys (clientKey) and values (processBytes) that I want to send to our messaging queue by packing them in one byte array. I will make one byte array of all the keys and values which should always be less than 50K and then send to our messaging queue.



For each partition, I have bunch of dataHolders so I am iterating those and then sending it to my messaging queue:-



private void validateAndSend(final DataPartition partition) 
final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);

// sending data via async policy
final Packet packet = new Packet(partition, new QPolicyAsync());

DataHolder dataHolder;
while ((dataHolder = dataHolders.poll()) != null)
packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
dataHolder.getProcessBytes());

packet.close();



Packet class: This class packs all the keys and values into one byte array and call corresponding implementation passed in the constructor to send data to queue.



public final class Packet implements Closeable 
private static final int MAX_SIZE = 50000;
private static final int HEADER_SIZE = 36;

private final byte dataCenter;
private final byte recordVersion;
private final long address;
private final long addressFrom;
private final long addressOrigin;
private final byte partition;
private final byte replicated;
private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
private final QueuePolicy policy;
private int pendingItems = 0;

public Packet(final DataPartition partition, final QueuePolicy policy)
this.partition = (byte) partition.getPartition();
this.policy = policy;
this.dataCenter = Utils.LOCATION.get().datacenter();
this.recordVersion = 1;
this.replicated = 0;
final long packedAddress = new Data().packAddress();
this.address = packedAddress;
this.addressFrom = 0L;
this.addressOrigin = packedAddress;


private void addHeader(final ByteBuffer buffer, final int items)
buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
.putLong(address).putLong(addressFrom).putLong(addressOrigin).put(partition)
.put(replicated);


private void sendData()
if (itemBuffer.position() == 0)
// no data to be sent
return;

final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
addHeader(buffer, pendingItems);
buffer.put(itemBuffer);
// sending data via particular policy
policy.sendToQueue(address, buffer.array());
itemBuffer.clear();
pendingItems = 0;


public void addAndSendJunked(final byte key, final byte data)
if (key.length > 255)
return;

final byte keyLength = (byte) key.length;
final byte dataLength = (byte) data.length;

final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
final int newSize = itemBuffer.position() + additionalSize;
if (newSize >= (MAX_SIZE - HEADER_SIZE))
sendData();

if (additionalSize > (MAX_SIZE - HEADER_SIZE))
throw new AppConfigurationException("Size of single item exceeds maximum size");


final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
// data layout
itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
.put(data);
pendingItems++;


@Override
public void close()
if (pendingItems > 0)
sendData();





Now I can send data to my messaging queue in three different ways so for that I created an interface and then having three different implementations:



QueuePolicy interface:



public interface QueuePolicy 
public boolean sendToQueue(final long address, final byte encodedRecords);



QPolicyAsync class:



public class QPolicyAsync implements QueuePolicy 

@Override
public boolean sendToQueue(long address, byte encodedRecords)
return SendRecord.getInstance().sendToQueueAsync(address, encodedRecords);




QPolicySync class:



public class QPolicySync implements QueuePolicy 

@Override
public boolean sendToQueue(long address, byte encodedRecords)
return SendRecord.getInstance().sendToQueueSync(address, encodedRecords);




QPolicySyncWithSocket class:



public class QPolicySyncWithSocket implements QueuePolicy 
private final Socket socket;

public QPolicySyncWithSocket(Socket socket)
this.socket = socket;


@Override
public boolean sendToQueue(long address, byte encodedRecords)
return SendRecord.getInstance().sendToQueueSync(address, encodedRecords, Optional.of(socket));




SendRecord class: This is the actual class which sends data to my messaging queue. It has three different implementations (numbered with 1, 2, 3 as comments) in it and each of those implementations is being called from above QueuePolicy implementations:



public class SendRecord 
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
.concurrencyLevel(100).build();

private static class Holder
private static final SendRecord INSTANCE = new SendRecord();


public static SendRecord getInstance()
return Holder.INSTANCE;


private SendRecord()
executorService.scheduleAtFixedRate(new Runnable()
@Override
public void run()
handleRetry();

, 0, 1, TimeUnit.SECONDS);


// this will retry to send data again if acknowledgment is not received
// but only for async cases. For sync we don't retry at all
private void handleRetry()
List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
for (PendingMessage message : messages)
if (message.hasExpired())
if (message.shouldRetry())
message.markResent();
doSendAsync(message, Optional.<Socket>absent());
else
cache.invalidate(message.getAddress());





// #1 sends data asynchronously
public boolean sendToQueueAsync(final long address, final byte encodedRecords)
PendingMessage m = new PendingMessage(address, encodedRecords, true);
cache.put(address, m);
return doSendAsync(m, Optional.<Socket>absent());


// place where we send data on a socket
private boolean doSendAsync(final PendingMessage message, final Optional<Socket> socket)
Optional<Socket> actualSocket = socket;
if (!actualSocket.isPresent())
SocketState liveSocket = SocketPoolManager.getInstance().getSocket();
actualSocket = Optional.of(liveSocket.getSocket());


ZMsg msg = new ZMsg();
msg.add(message.getEncodedRecords());
try
return msg.send(actualSocket.get());
finally
msg.destroy();



// #2 sends data synchronously without taking socket as a parameter
public boolean sendToQueueSync(final long address, final byte encodedRecords)
return sendToQueueSync(address, encodedRecords, Optional.<Socket>absent());


// #3 sends data synchronously but by taking socket as a parameter
public boolean sendToQueueSync(final long address, final byte encodedRecords,
final Optional<Socket> socket)
PendingMessage m = new PendingMessage(address, encodedRecords, false);
cache.put(address, m);
try
if (doSendAsync(m, socket))
return m.waitForAck();

finally
cache.invalidate(address);

return false;


// called by ResponsePoller thread to tell us that messaging queue
// has received data
public void handleAckReceived(final long address)
PendingMessage message = cache.getIfPresent(address);
if (message != null)
message.ackReceived();
cache.invalidate(address);





I have my code working fine.. The idea is very simple: I am sending data to my messaging queue via either of those three QueuePolicy implementations. It depends on how clients want to send data. As of now I am passing implementation of QueuePolicy in the Packet constructor and then sends data via that policy. Each QueuePolicy implementation calls corresponding method in SendRecord class.



  • Does my Packet class needs to know on how data is being sent? I think Packet class is just a container, and that's all it is. I think we shouldn't expect it to know how to transmit itself. A transmitter does that.

  • Also, each QueuePolicy implementation calls corresponding method of SendRecord class. Is this really needed or there is any better way? Can we not get rid of SendRecord class and have them implemented in each of these three QueuePolicy implementations?

I believe with my design, I might be breaking Single Responsibility Principle or not following oops standard properly so wanted to see if we can design this efficiently and if there is any better way? What is the best way by which we can send key(s) and value(s) by packing them in one byte array (following that 50K limit) either through sync or async mode?







share|improve this question





















  • What do you mean by "either of one policy"? Can't you only have either of many?
    – Raimund Krämer
    Jan 25 at 10:33










  • Sorry what I meant was - I have many policies to choose from and I will pick only one policy to send data on it. I won't mix two or three policies to send same data.
    – david
    Jan 25 at 20:26












up vote
6
down vote

favorite









up vote
6
down vote

favorite











I have bunch of keys (clientKey) and values (processBytes) that I want to send to our messaging queue by packing them in one byte array. I will make one byte array of all the keys and values which should always be less than 50K and then send to our messaging queue.



For each partition, I have bunch of dataHolders so I am iterating those and then sending it to my messaging queue:-



private void validateAndSend(final DataPartition partition) 
final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);

// sending data via async policy
final Packet packet = new Packet(partition, new QPolicyAsync());

DataHolder dataHolder;
while ((dataHolder = dataHolders.poll()) != null)
packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
dataHolder.getProcessBytes());

packet.close();



Packet class: This class packs all the keys and values into one byte array and call corresponding implementation passed in the constructor to send data to queue.



public final class Packet implements Closeable 
private static final int MAX_SIZE = 50000;
private static final int HEADER_SIZE = 36;

private final byte dataCenter;
private final byte recordVersion;
private final long address;
private final long addressFrom;
private final long addressOrigin;
private final byte partition;
private final byte replicated;
private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
private final QueuePolicy policy;
private int pendingItems = 0;

public Packet(final DataPartition partition, final QueuePolicy policy)
this.partition = (byte) partition.getPartition();
this.policy = policy;
this.dataCenter = Utils.LOCATION.get().datacenter();
this.recordVersion = 1;
this.replicated = 0;
final long packedAddress = new Data().packAddress();
this.address = packedAddress;
this.addressFrom = 0L;
this.addressOrigin = packedAddress;


private void addHeader(final ByteBuffer buffer, final int items)
buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
.putLong(address).putLong(addressFrom).putLong(addressOrigin).put(partition)
.put(replicated);


private void sendData()
if (itemBuffer.position() == 0)
// no data to be sent
return;

final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
addHeader(buffer, pendingItems);
buffer.put(itemBuffer);
// sending data via particular policy
policy.sendToQueue(address, buffer.array());
itemBuffer.clear();
pendingItems = 0;


public void addAndSendJunked(final byte key, final byte data)
if (key.length > 255)
return;

final byte keyLength = (byte) key.length;
final byte dataLength = (byte) data.length;

final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
final int newSize = itemBuffer.position() + additionalSize;
if (newSize >= (MAX_SIZE - HEADER_SIZE))
sendData();

if (additionalSize > (MAX_SIZE - HEADER_SIZE))
throw new AppConfigurationException("Size of single item exceeds maximum size");


final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
// data layout
itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
.put(data);
pendingItems++;


@Override
public void close()
if (pendingItems > 0)
sendData();





Now I can send data to my messaging queue in three different ways so for that I created an interface and then having three different implementations:



QueuePolicy interface:



public interface QueuePolicy 
public boolean sendToQueue(final long address, final byte encodedRecords);



QPolicyAsync class:



public class QPolicyAsync implements QueuePolicy 

@Override
public boolean sendToQueue(long address, byte encodedRecords)
return SendRecord.getInstance().sendToQueueAsync(address, encodedRecords);




QPolicySync class:



public class QPolicySync implements QueuePolicy 

@Override
public boolean sendToQueue(long address, byte encodedRecords)
return SendRecord.getInstance().sendToQueueSync(address, encodedRecords);




QPolicySyncWithSocket class:



public class QPolicySyncWithSocket implements QueuePolicy 
private final Socket socket;

public QPolicySyncWithSocket(Socket socket)
this.socket = socket;


@Override
public boolean sendToQueue(long address, byte encodedRecords)
return SendRecord.getInstance().sendToQueueSync(address, encodedRecords, Optional.of(socket));




SendRecord class: This is the actual class which sends data to my messaging queue. It has three different implementations (numbered with 1, 2, 3 as comments) in it and each of those implementations is being called from above QueuePolicy implementations:



public class SendRecord 
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
.concurrencyLevel(100).build();

private static class Holder
private static final SendRecord INSTANCE = new SendRecord();


public static SendRecord getInstance()
return Holder.INSTANCE;


private SendRecord()
executorService.scheduleAtFixedRate(new Runnable()
@Override
public void run()
handleRetry();

, 0, 1, TimeUnit.SECONDS);


// this will retry to send data again if acknowledgment is not received
// but only for async cases. For sync we don't retry at all
private void handleRetry()
List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
for (PendingMessage message : messages)
if (message.hasExpired())
if (message.shouldRetry())
message.markResent();
doSendAsync(message, Optional.<Socket>absent());
else
cache.invalidate(message.getAddress());





// #1 sends data asynchronously
public boolean sendToQueueAsync(final long address, final byte encodedRecords)
PendingMessage m = new PendingMessage(address, encodedRecords, true);
cache.put(address, m);
return doSendAsync(m, Optional.<Socket>absent());


// place where we send data on a socket
private boolean doSendAsync(final PendingMessage message, final Optional<Socket> socket)
Optional<Socket> actualSocket = socket;
if (!actualSocket.isPresent())
SocketState liveSocket = SocketPoolManager.getInstance().getSocket();
actualSocket = Optional.of(liveSocket.getSocket());


ZMsg msg = new ZMsg();
msg.add(message.getEncodedRecords());
try
return msg.send(actualSocket.get());
finally
msg.destroy();



// #2 sends data synchronously without taking socket as a parameter
public boolean sendToQueueSync(final long address, final byte encodedRecords)
return sendToQueueSync(address, encodedRecords, Optional.<Socket>absent());


// #3 sends data synchronously but by taking socket as a parameter
public boolean sendToQueueSync(final long address, final byte encodedRecords,
final Optional<Socket> socket)
PendingMessage m = new PendingMessage(address, encodedRecords, false);
cache.put(address, m);
try
if (doSendAsync(m, socket))
return m.waitForAck();

finally
cache.invalidate(address);

return false;


// called by ResponsePoller thread to tell us that messaging queue
// has received data
public void handleAckReceived(final long address)
PendingMessage message = cache.getIfPresent(address);
if (message != null)
message.ackReceived();
cache.invalidate(address);





I have my code working fine.. The idea is very simple: I am sending data to my messaging queue via either of those three QueuePolicy implementations. It depends on how clients want to send data. As of now I am passing implementation of QueuePolicy in the Packet constructor and then sends data via that policy. Each QueuePolicy implementation calls corresponding method in SendRecord class.



  • Does my Packet class needs to know on how data is being sent? I think Packet class is just a container, and that's all it is. I think we shouldn't expect it to know how to transmit itself. A transmitter does that.

  • Also, each QueuePolicy implementation calls corresponding method of SendRecord class. Is this really needed or there is any better way? Can we not get rid of SendRecord class and have them implemented in each of these three QueuePolicy implementations?

I believe with my design, I might be breaking Single Responsibility Principle or not following oops standard properly so wanted to see if we can design this efficiently and if there is any better way? What is the best way by which we can send key(s) and value(s) by packing them in one byte array (following that 50K limit) either through sync or async mode?







share|improve this question













I have bunch of keys (clientKey) and values (processBytes) that I want to send to our messaging queue by packing them in one byte array. I will make one byte array of all the keys and values which should always be less than 50K and then send to our messaging queue.



For each partition, I have bunch of dataHolders so I am iterating those and then sending it to my messaging queue:-



private void validateAndSend(final DataPartition partition) 
final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);

// sending data via async policy
final Packet packet = new Packet(partition, new QPolicyAsync());

DataHolder dataHolder;
while ((dataHolder = dataHolders.poll()) != null)
packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
dataHolder.getProcessBytes());

packet.close();



Packet class: This class packs all the keys and values into one byte array and call corresponding implementation passed in the constructor to send data to queue.



public final class Packet implements Closeable 
private static final int MAX_SIZE = 50000;
private static final int HEADER_SIZE = 36;

private final byte dataCenter;
private final byte recordVersion;
private final long address;
private final long addressFrom;
private final long addressOrigin;
private final byte partition;
private final byte replicated;
private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
private final QueuePolicy policy;
private int pendingItems = 0;

public Packet(final DataPartition partition, final QueuePolicy policy)
this.partition = (byte) partition.getPartition();
this.policy = policy;
this.dataCenter = Utils.LOCATION.get().datacenter();
this.recordVersion = 1;
this.replicated = 0;
final long packedAddress = new Data().packAddress();
this.address = packedAddress;
this.addressFrom = 0L;
this.addressOrigin = packedAddress;


private void addHeader(final ByteBuffer buffer, final int items)
buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
.putLong(address).putLong(addressFrom).putLong(addressOrigin).put(partition)
.put(replicated);


private void sendData()
if (itemBuffer.position() == 0)
// no data to be sent
return;

final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
addHeader(buffer, pendingItems);
buffer.put(itemBuffer);
// sending data via particular policy
policy.sendToQueue(address, buffer.array());
itemBuffer.clear();
pendingItems = 0;


public void addAndSendJunked(final byte key, final byte data)
if (key.length > 255)
return;

final byte keyLength = (byte) key.length;
final byte dataLength = (byte) data.length;

final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
final int newSize = itemBuffer.position() + additionalSize;
if (newSize >= (MAX_SIZE - HEADER_SIZE))
sendData();

if (additionalSize > (MAX_SIZE - HEADER_SIZE))
throw new AppConfigurationException("Size of single item exceeds maximum size");


final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
// data layout
itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
.put(data);
pendingItems++;


@Override
public void close()
if (pendingItems > 0)
sendData();





Now I can send data to my messaging queue in three different ways so for that I created an interface and then having three different implementations:



QueuePolicy interface:



public interface QueuePolicy 
public boolean sendToQueue(final long address, final byte encodedRecords);



QPolicyAsync class:



public class QPolicyAsync implements QueuePolicy 

@Override
public boolean sendToQueue(long address, byte encodedRecords)
return SendRecord.getInstance().sendToQueueAsync(address, encodedRecords);




QPolicySync class:



public class QPolicySync implements QueuePolicy 

@Override
public boolean sendToQueue(long address, byte encodedRecords)
return SendRecord.getInstance().sendToQueueSync(address, encodedRecords);




QPolicySyncWithSocket class:



public class QPolicySyncWithSocket implements QueuePolicy 
private final Socket socket;

public QPolicySyncWithSocket(Socket socket)
this.socket = socket;


@Override
public boolean sendToQueue(long address, byte encodedRecords)
return SendRecord.getInstance().sendToQueueSync(address, encodedRecords, Optional.of(socket));




SendRecord class: This is the actual class which sends data to my messaging queue. It has three different implementations (numbered with 1, 2, 3 as comments) in it and each of those implementations is being called from above QueuePolicy implementations:



public class SendRecord {
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
    private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
            .concurrencyLevel(100).build();

    private static class Holder {
        private static final SendRecord INSTANCE = new SendRecord();
    }

    public static SendRecord getInstance() {
        return Holder.INSTANCE;
    }

    private SendRecord() {
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                handleRetry();
            }
        }, 0, 1, TimeUnit.SECONDS);
    }

    // this will retry to send data again if acknowledgment is not received
    // but only for async cases. For sync we don't retry at all
    private void handleRetry() {
        List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
        for (PendingMessage message : messages) {
            if (message.hasExpired()) {
                if (message.shouldRetry()) {
                    message.markResent();
                    doSendAsync(message, Optional.<Socket>absent());
                } else {
                    cache.invalidate(message.getAddress());
                }
            }
        }
    }

    // #1 sends data asynchronously
    public boolean sendToQueueAsync(final long address, final byte[] encodedRecords) {
        PendingMessage m = new PendingMessage(address, encodedRecords, true);
        cache.put(address, m);
        return doSendAsync(m, Optional.<Socket>absent());
    }

    // place where we send data on a socket
    private boolean doSendAsync(final PendingMessage message, final Optional<Socket> socket) {
        Optional<Socket> actualSocket = socket;
        if (!actualSocket.isPresent()) {
            SocketState liveSocket = SocketPoolManager.getInstance().getSocket();
            actualSocket = Optional.of(liveSocket.getSocket());
        }

        ZMsg msg = new ZMsg();
        msg.add(message.getEncodedRecords());
        try {
            return msg.send(actualSocket.get());
        } finally {
            msg.destroy();
        }
    }

    // #2 sends data synchronously without taking socket as a parameter
    public boolean sendToQueueSync(final long address, final byte[] encodedRecords) {
        return sendToQueueSync(address, encodedRecords, Optional.<Socket>absent());
    }

    // #3 sends data synchronously but by taking socket as a parameter
    public boolean sendToQueueSync(final long address, final byte[] encodedRecords,
            final Optional<Socket> socket) {
        PendingMessage m = new PendingMessage(address, encodedRecords, false);
        cache.put(address, m);
        try {
            if (doSendAsync(m, socket)) {
                return m.waitForAck();
            }
        } finally {
            cache.invalidate(address);
        }
        return false;
    }

    // called by ResponsePoller thread to tell us that messaging queue
    // has received data
    public void handleAckReceived(final long address) {
        PendingMessage message = cache.getIfPresent(address);
        if (message != null) {
            message.ackReceived();
            cache.invalidate(address);
        }
    }
}
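
The PendingMessage class is not shown here. As a rough sketch of how its waitForAck and ackReceived methods could cooperate between the sending thread and the ResponsePoller thread, one option is a CountDownLatch with a timeout. The class name PendingAck and the timeoutMillis parameter are made up for illustration and are not taken from the code above:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the ack-tracking part of PendingMessage.
final class PendingAck {
    private final CountDownLatch acked = new CountDownLatch(1);
    private final long timeoutMillis; // assumed to be configurable per message

    PendingAck(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Called by the thread that receives acknowledgments from the queue.
    void ackReceived() {
        acked.countDown();
    }

    // Blocks the sending thread until the ack arrives or the timeout elapses.
    boolean waitForAck() {
        try {
            return acked.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            return false;
        }
    }
}
```

With this shape, the sync path returns true only if ackReceived was called before the timeout ran out.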





My code works fine. The idea is very simple: I send data to my messaging queue via one of those three QueuePolicy implementations, depending on how clients want to send data. As of now I pass a QueuePolicy implementation to the Packet constructor, and the packet then sends data via that policy. Each QueuePolicy implementation calls the corresponding method in the SendRecord class.



  • Does my Packet class need to know how data is being sent? I think the Packet class is just a container, and we shouldn't expect it to know how to transmit itself. A transmitter does that.

  • Also, each QueuePolicy implementation calls the corresponding method of the SendRecord class. Is this really needed, or is there a better way? Could we get rid of the SendRecord class and implement the sending logic in each of the three QueuePolicy implementations instead?

I believe my design might be breaking the Single Responsibility Principle or not following OOP practices properly, so I wanted to see whether there is a better way to design this. What is the best way to send keys and values packed in one byte array (respecting the 50K limit) in either sync or async mode?

















edited Feb 1 at 17:43
























asked Jan 24 at 20:52









david












  • What do you mean by "either of one policy"? Can't you only have either of many?
    – Raimund Krämer
    Jan 25 at 10:33










  • Sorry, what I meant was: I have several policies to choose from and will pick exactly one to send the data. I won't mix two or three policies to send the same data.
    – david
    Jan 25 at 20:26


























1 Answer

















up vote
3
down vote













Your code is pretty neat looking but, alas, I think the architecture is wrong.



IMO, Packet should be an immutable object acting as a passive container. That would let you remove the following fields:

long address;
long addressFrom;
long addressOrigin;
QueuePolicy policy;
int pendingItems = 0;
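
Stripped of those fields, a passive Packet could be sketched roughly as below. This is only an illustration: the class name ImmutablePacket, the constructor taking an already-encoded payload, and the accessor names are assumptions; only the 50000-byte limit comes from the question.

```java
import java.util.Arrays;

// Hypothetical immutable container: a partition id plus an already-encoded payload.
final class ImmutablePacket {
    static final int MAX_SIZE = 50000; // limit taken from the question

    private final byte partition;
    private final byte[] payload;

    ImmutablePacket(byte partition, byte[] payload) {
        if (payload.length > MAX_SIZE) {
            throw new IllegalArgumentException("payload exceeds " + MAX_SIZE + " bytes");
        }
        this.partition = partition;
        // Defensive copy so later changes to the caller's array cannot leak in.
        this.payload = Arrays.copyOf(payload, payload.length);
    }

    byte partition() {
        return partition;
    }

    // Returns a copy so callers cannot mutate the packet's internal state.
    byte[] payload() {
        return Arrays.copyOf(payload, payload.length);
    }
}
```

The packet no longer knows about addresses or policies; whoever sends it supplies those.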



SendRecord has multiple problems, the main one being that it does way too much. The other problems that come to mind are that it's a singleton (which makes mocking in your tests a royal pain) and that its constructor starts an executor. A constructor that does too much is bad practice for some (me included :)).

I think it's that big because you wanted a cache shared between all policies and because you wanted to handle message retries in it. If I'm right, you should address the first point with dependency injection in the various QueuePolicy implementations; I'll propose a solution for the second below.
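
To make the dependency-injection point concrete, here is a minimal sketch. It assumes a plain Map in place of the Guava Cache and a hypothetical Transport interface standing in for the socket code; InjectedAsyncPolicy is also a made-up name.

```java
import java.util.Map;

// Same shape as the QueuePolicy interface from the question.
interface QueuePolicy {
    boolean sendToQueue(long address, byte[] encodedRecords);
}

// Hypothetical seam for the code that actually writes to a socket.
interface Transport {
    boolean send(byte[] encodedRecords);
}

// The shared pending-message store and the transport are passed in,
// instead of being looked up through a singleton.
final class InjectedAsyncPolicy implements QueuePolicy {
    private final Map<Long, byte[]> pending;
    private final Transport transport;

    InjectedAsyncPolicy(Map<Long, byte[]> pending, Transport transport) {
        this.pending = pending;
        this.transport = transport;
    }

    @Override
    public boolean sendToQueue(long address, byte[] encodedRecords) {
        pending.put(address, encodedRecords); // remembered until an ack arrives
        return transport.send(encodedRecords);
    }
}
```

In a test, the map and the transport can then be replaced by simple fakes, which is exactly what the singleton makes hard.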



Thirdly, I think you should abstract the addresses into an Address class; manipulating a raw long feels really awkward.

The Address objects may be nothing more than a wrapper around a long, but they will be responsible for ensuring that their wrapped value isn't complete cr*p.

Modify sendToQueue accordingly ;)
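
Such a wrapper might look like the sketch below. The validation rule (strictly positive values) is purely an assumption for illustration; the real rule depends on what a valid address is in your system.

```java
// Hypothetical value object wrapping the raw long address.
final class Address {
    private final long value;

    private Address(long value) {
        this.value = value;
    }

    // Assumption: a valid address is strictly positive; replace with the real rule.
    static Address of(long value) {
        if (value <= 0) {
            throw new IllegalArgumentException("invalid address: " + value);
        }
        return new Address(value);
    }

    long asLong() {
        return value;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof Address && ((Address) o).value == value;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(value);
    }

    @Override
    public String toString() {
        return "Address(" + value + ")";
    }
}
```

Because equals and hashCode are defined, an Address can also serve directly as a cache key.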



Then, I'd probably create a PacketSender (or PacketWriter) interface and make it Closeable (I think it suits your use case), like this:



public interface PacketSender extends Closeable {

    Address getAddress();

    void sendRecord(Packet packet) throws IOException;
}


The default implementation would be a SimplePacketSender that takes an address and a QueuePolicy.
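
A minimal sketch of such a SimplePacketSender follows. To stay self-contained it uses a plain long for the address and a tiny stand-in Packet; it also assumes that a false return from the policy means the send failed, which is my reading of the boolean result and not something stated in the question.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;

// Same shape as the QueuePolicy interface from the question.
interface QueuePolicy {
    boolean sendToQueue(long address, byte[] encodedRecords);
}

// Stand-in for Packet: it only exposes the already-encoded bytes.
final class Packet {
    private final byte[] encoded;

    Packet(byte[] encoded) {
        this.encoded = encoded.clone();
    }

    byte[] encoded() {
        return encoded.clone();
    }
}

interface PacketSender extends Closeable {
    long getAddress(); // a plain long here; an Address type would fit as well

    void sendRecord(Packet packet) throws IOException;
}

final class SimplePacketSender implements PacketSender {
    private final long address;
    private final QueuePolicy policy;

    SimplePacketSender(long address, QueuePolicy policy) {
        this.address = address;
        this.policy = Objects.requireNonNull(policy, "policy must not be null");
    }

    @Override
    public long getAddress() {
        return address;
    }

    @Override
    public void sendRecord(Packet packet) throws IOException {
        // Assumption: a false return from the policy means the send failed.
        if (!policy.sendToQueue(address, packet.encoded())) {
            throw new IOException("send failed for address " + address);
        }
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}
```

Turning the boolean into an IOException gives decorators a uniform failure signal to react to.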



As already said, if you want to share your cache between the different QueuePolicy implementations, you may need to inject it through their constructors.



The SimplePacketSender class would not retry sending data by default; it's better to let your users decide whether they want to retry. You could use a parameter for this, but I'd rather create a decorator like this:



public class PacketSenderThatRetries implements PacketSender {

    private final PacketSender decorated;

    public PacketSenderThatRetries(final PacketSender decorated) {
        this.decorated = Objects.requireNonNull(decorated, "decorated must not be null");
    }

    @Override
    public Address getAddress() {
        return decorated.getAddress();
    }

    @Override
    public void sendRecord(final Packet packet) throws IOException {
        // TODO start a scheduler here and handle retry by calling the decorated method
    }

    // Closeable requires close(); delegating to the decorated sender is an assumption
    @Override
    public void close() throws IOException {
        decorated.close();
    }
}
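
One way to fill in that TODO, shown here as a sketch only: a bounded, synchronous retry loop instead of a scheduler, which is simpler but blocks the caller. The name RetryingPacketSender and the maxAttempts parameter are made up, and getAddress is left out of the interface to keep the example short.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;

// Stand-in for Packet; only the payload matters for this sketch.
final class Packet {
    final byte[] payload;

    Packet(byte[] payload) {
        this.payload = payload;
    }
}

interface PacketSender extends Closeable {
    void sendRecord(Packet packet) throws IOException;
}

// Retries a failed send a fixed number of times, then rethrows the last failure.
final class RetryingPacketSender implements PacketSender {
    private final PacketSender decorated;
    private final int maxAttempts;

    RetryingPacketSender(PacketSender decorated, int maxAttempts) {
        this.decorated = Objects.requireNonNull(decorated, "decorated must not be null");
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.maxAttempts = maxAttempts;
    }

    @Override
    public void sendRecord(Packet packet) throws IOException {
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                decorated.sendRecord(packet);
                return; // success, stop retrying
            } catch (IOException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // non-null here because the loop ran at least once
    }

    @Override
    public void close() throws IOException {
        decorated.close();
    }
}
```

A scheduler-based variant would keep the same decorator shape and only change what sendRecord does on failure.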




In the end, creating a retrying packet sender may look like this:



final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
        .concurrencyLevel(100).build();
Address address = new Data().address();
try (PacketSender ps = new PacketSenderThatRetries(new SimplePacketSender(
        address, new QPolicySync(cache)))) {
    ps.sendRecord(new Packet(partition));
}


One last point: some people consider it bad practice to use Optional as a parameter type in Java ;)
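
The usual alternative is a pair of overloads, sketched below. Channel and SyncSender are hypothetical names standing in for the socket type and the sending class; the point is only that the "no socket given" case becomes its own method instead of an absent Optional.

```java
// Hypothetical stand-in for the socket type.
interface Channel {
    boolean write(byte[] data);
}

final class SyncSender {
    private final Channel defaultChannel;

    SyncSender(Channel defaultChannel) {
        this.defaultChannel = defaultChannel;
    }

    // Caller has no channel of its own: fall back to the default one.
    boolean send(byte[] data) {
        return send(data, defaultChannel);
    }

    // Caller supplies a channel explicitly; no Optional wrapper is needed.
    boolean send(byte[] data, Channel channel) {
        return channel.write(data);
    }
}
```

Callers then never have to build an Optional just to say "use the default".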



Hope it helps!



























  • Yeah, I also think the architecture should be changed, as I might not be following OOP practices properly. I got some ideas from your suggestions, but I am confused about how your approach would fit into my code. If possible, can you provide an example based on my code so that I can understand it better?
    – david
    Jan 26 at 7:39







  • Lots of great suggestions are given here. If you struggle to see the whole picture, write down small logical rules and how they are connected to each other. Ronan suggested a list of rules; if you apply them all at once, you end up with a broken overall picture. Write down your logical rules and what is suggested, and instead of making one big change, do 10-15 iterations. That way you won't suffer. This approach works for me and has helped me a lot.
    – Sergii Nevydanchuk
    Feb 6 at 4:42










answered Jan 25 at 10:28









Ronan Dhellemmes
