Send records to messaging queue using either of one policy
I have a bunch of keys (clientKey) and values (processBytes) that I want to send to our messaging queue by packing them into one byte array. I will build one byte array from all the keys and values, which should always be less than 50K, and then send it to our messaging queue.
For each partition, I have a bunch of dataHolders, so I am iterating over those and sending them to my messaging queue:
    private void validateAndSend(final DataPartition partition) {
        final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
        // sending data via async policy
        final Packet packet = new Packet(partition, new QPolicyAsync());
        DataHolder dataHolder;
        while ((dataHolder = dataHolders.poll()) != null) {
            packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
                dataHolder.getProcessBytes());
        }
        packet.close();
    }
Packet class: This class packs all the keys and values into one byte array and calls the corresponding implementation passed in the constructor to send the data to the queue.
    public final class Packet implements Closeable {
        private static final int MAX_SIZE = 50000;
        private static final int HEADER_SIZE = 36;

        private final byte dataCenter;
        private final byte recordVersion;
        private final long address;
        private final long addressFrom;
        private final long addressOrigin;
        private final byte partition;
        private final byte replicated;
        private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
        private final QueuePolicy policy;
        private int pendingItems = 0;

        public Packet(final DataPartition partition, final QueuePolicy policy) {
            this.partition = (byte) partition.getPartition();
            this.policy = policy;
            this.dataCenter = Utils.LOCATION.get().datacenter();
            this.recordVersion = 1;
            this.replicated = 0;
            final long packedAddress = new Data().packAddress();
            this.address = packedAddress;
            this.addressFrom = 0L;
            this.addressOrigin = packedAddress;
        }

        private void addHeader(final ByteBuffer buffer, final int items) {
            buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
                .putLong(address).putLong(addressFrom).putLong(addressOrigin).put(partition)
                .put(replicated);
        }

        private void sendData() {
            if (itemBuffer.position() == 0) {
                // no data to be sent
                return;
            }
            final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
            addHeader(buffer, pendingItems);
            buffer.put(itemBuffer);
            // sending data via particular policy
            policy.sendToQueue(address, buffer.array());
            itemBuffer.clear();
            pendingItems = 0;
        }

        public void addAndSendJunked(final byte[] key, final byte[] data) {
            if (key.length > 255) {
                return;
            }
            final byte keyLength = (byte) key.length;
            final byte dataLength = (byte) data.length;
            final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
            final int newSize = itemBuffer.position() + additionalSize;
            if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
                sendData();
            }
            if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
                throw new AppConfigurationException("Size of single item exceeds maximum size");
            }
            final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
            final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
            // data layout
            itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
                .put(data);
            pendingItems++;
        }

        @Override
        public void close() {
            if (pendingItems > 0) {
                sendData();
            }
        }
    }
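For reference, the sizes used above can be checked with a small standalone sketch. It is not part of the production code: the class name RecordLayoutSketch and the encode helper are hypothetical, and the lengths are kept as int here (an assumption of this sketch) so that the size arithmetic does not depend on the narrowing casts to byte. The header written by addHeader adds up to 36 bytes, and each record carries 12 bytes of overhead on top of its key and data.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that mirrors the layout written by Packet; illustration only.
final class RecordLayoutSketch {
    // Header fields from addHeader: 2 bytes + 2 ints + 3 longs + 2 bytes.
    static final int HEADER_SIZE = 1 + 1 + 4 + 4 + 8 + 8 + 8 + 1 + 1;

    // Per-record overhead: flag byte, key-length byte, timestamp long, data-length short.
    static final int RECORD_OVERHEAD = 1 + 1 + 8 + 2;

    static int recordSize(byte[] key, byte[] data) {
        return RECORD_OVERHEAD + key.length + data.length;
    }

    // Encodes one record in the same field order as the "data layout" line in Packet.
    static byte[] encode(byte[] key, byte[] data, long timestamp) {
        if (key.length > 255 || data.length > Short.MAX_VALUE) {
            throw new IllegalArgumentException("key or data too long for its length field");
        }
        ByteBuffer buffer = ByteBuffer.allocate(recordSize(key, data));
        buffer.put((byte) 0)
              .put((byte) key.length)          // a reader must mask with 0xFF for keys over 127 bytes
              .put(key)
              .putLong(timestamp)
              .putShort((short) data.length)
              .put(data);
        return buffer.array();
    }

    public static void main(String[] args) {
        byte[] key = "client-1".getBytes(StandardCharsets.UTF_8); // 8 bytes
        byte[] data = new byte[300];
        byte[] record = encode(key, data, 1_000L);
        System.out.println("header size: " + HEADER_SIZE);
        System.out.println("record size: " + record.length);
    }
}
```

With an 8-byte key and 300 bytes of data the record occupies 12 + 8 + 300 = 320 bytes, so roughly (50000 - 36) / 320 such records fit into one packet.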
Now, I can send data to my messaging queue in three different ways, so I created an interface with three different implementations:
QueuePolicy interface:
    public interface QueuePolicy {
        public boolean sendToQueue(final long address, final byte[] encodedRecords);
    }
QPolicyAsync class:
    public class QPolicyAsync implements QueuePolicy {
        @Override
        public boolean sendToQueue(long address, byte[] encodedRecords) {
            return SendRecord.getInstance().sendToQueueAsync(address, encodedRecords);
        }
    }
QPolicySync class:
    public class QPolicySync implements QueuePolicy {
        @Override
        public boolean sendToQueue(long address, byte[] encodedRecords) {
            return SendRecord.getInstance().sendToQueueSync(address, encodedRecords);
        }
    }
QPolicySyncWithSocket class:
    public class QPolicySyncWithSocket implements QueuePolicy {
        private final Socket socket;

        public QPolicySyncWithSocket(Socket socket) {
            this.socket = socket;
        }

        @Override
        public boolean sendToQueue(long address, byte[] encodedRecords) {
            return SendRecord.getInstance().sendToQueueSync(address, encodedRecords, Optional.of(socket));
        }
    }
SendRecord class: This is the actual class that sends data to my messaging queue. It has three different implementations (numbered 1, 2, 3 in the comments), and each of them is called from the corresponding QueuePolicy implementation above:
    public class SendRecord {
        private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
        private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
            .concurrencyLevel(100).build();

        private static class Holder {
            private static final SendRecord INSTANCE = new SendRecord();
        }

        public static SendRecord getInstance() {
            return Holder.INSTANCE;
        }

        private SendRecord() {
            executorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    handleRetry();
                }
            }, 0, 1, TimeUnit.SECONDS);
        }

        // this will retry to send data again if acknowledgment is not received
        // but only for async cases. For sync we don't retry at all
        private void handleRetry() {
            List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
            for (PendingMessage message : messages) {
                if (message.hasExpired()) {
                    if (message.shouldRetry()) {
                        message.markResent();
                        doSendAsync(message, Optional.<Socket>absent());
                    } else {
                        cache.invalidate(message.getAddress());
                    }
                }
            }
        }

        // #1 sends data asynchronously
        public boolean sendToQueueAsync(final long address, final byte[] encodedRecords) {
            PendingMessage m = new PendingMessage(address, encodedRecords, true);
            cache.put(address, m);
            return doSendAsync(m, Optional.<Socket>absent());
        }

        // place where we send data on a socket
        private boolean doSendAsync(final PendingMessage message, final Optional<Socket> socket) {
            Optional<Socket> actualSocket = socket;
            if (!actualSocket.isPresent()) {
                SocketState liveSocket = SocketPoolManager.getInstance().getSocket();
                actualSocket = Optional.of(liveSocket.getSocket());
            }
            ZMsg msg = new ZMsg();
            msg.add(message.getEncodedRecords());
            try {
                return msg.send(actualSocket.get());
            } finally {
                msg.destroy();
            }
        }

        // #2 sends data synchronously without taking socket as a parameter
        public boolean sendToQueueSync(final long address, final byte[] encodedRecords) {
            return sendToQueueSync(address, encodedRecords, Optional.<Socket>absent());
        }

        // #3 sends data synchronously but by taking socket as a parameter
        public boolean sendToQueueSync(final long address, final byte[] encodedRecords,
                final Optional<Socket> socket) {
            PendingMessage m = new PendingMessage(address, encodedRecords, false);
            cache.put(address, m);
            try {
                if (doSendAsync(m, socket)) {
                    return m.waitForAck();
                }
            } finally {
                cache.invalidate(address);
            }
            return false;
        }

        // called by ResponsePoller thread to tell us that messaging queue
        // has received data
        public void handleAckReceived(final long address) {
            PendingMessage message = cache.getIfPresent(address);
            if (message != null) {
                message.ackReceived();
                cache.invalidate(address);
            }
        }
    }
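PendingMessage is not shown above. To make the sync path easier to follow, here is a minimal sketch of how waitForAck and ackReceived could cooperate, assuming a CountDownLatch and a fixed acknowledgment timeout. The class name PendingMessageSketch, its constructor signature, and the ackTimeoutMillis parameter are hypothetical and are not the real PendingMessage class.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for PendingMessage; only the ack handshake is sketched.
final class PendingMessageSketch {
    private final long address;
    private final byte[] encodedRecords;
    private final long ackTimeoutMillis;                  // assumed, not taken from the real class
    private final CountDownLatch ackLatch = new CountDownLatch(1);

    PendingMessageSketch(long address, byte[] encodedRecords, long ackTimeoutMillis) {
        this.address = address;
        this.encodedRecords = encodedRecords;
        this.ackTimeoutMillis = ackTimeoutMillis;
    }

    long getAddress() {
        return address;
    }

    byte[] getEncodedRecords() {
        return encodedRecords;
    }

    // Called from the thread that receives acknowledgments.
    void ackReceived() {
        ackLatch.countDown();
    }

    // Blocks the sending thread until the ack arrives or the timeout elapses.
    boolean waitForAck() {
        try {
            return ackLatch.await(ackTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();           // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PendingMessageSketch message = new PendingMessageSketch(42L, new byte[] {1, 2, 3}, 1_000L);
        Thread poller = new Thread(message::ackReceived); // stands in for the ResponsePoller thread
        poller.start();
        System.out.println("acked: " + message.waitForAck());
        poller.join();
    }
}
```

In this sketch the sync send returns true only if the ack arrives within the timeout, which matches the way sendToQueueSync uses the return value of waitForAck.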
My code is working fine. The idea is very simple: I am sending data to my messaging queue via one of those three QueuePolicy implementations, depending on how clients want to send data. As of now, I pass a QueuePolicy implementation to the Packet constructor, and the Packet then sends data via that policy. Each QueuePolicy implementation calls the corresponding method in the SendRecord class.
- Does my Packet class need to know how data is being sent? I think the Packet class is just a container, and that's all it is. I think we shouldn't expect it to know how to transmit itself. A transmitter does that.
- Also, each QueuePolicy implementation calls the corresponding method of the SendRecord class. Is this really needed, or is there a better way? Can we not get rid of the SendRecord class and implement its methods in each of these three QueuePolicy implementations?
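To make the first point concrete, the separation could look roughly like the sketch below, where the packing class only accumulates bytes and hands each full batch to a callback that does the transmitting. This is only an illustration under assumptions: BatcherSketch and sink are hypothetical names, the header is left out for brevity, and java.util.function.Consumer requires Java 8 or later.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical container that batches records and knows nothing about how they are sent.
final class BatcherSketch {
    private final ByteBuffer buffer;
    private final Consumer<byte[]> sink;   // the "transmitter", e.g. a call into a QueuePolicy

    BatcherSketch(int maxSize, Consumer<byte[]> sink) {
        this.buffer = ByteBuffer.allocate(maxSize);
        this.sink = sink;
    }

    // Adds one already-encoded record, flushing first if it would not fit.
    void add(byte[] record) {
        if (record.length > buffer.capacity()) {
            throw new IllegalArgumentException("Size of single item exceeds maximum size");
        }
        if (record.length > buffer.remaining()) {
            flush();
        }
        buffer.put(record);
    }

    // Hands the accumulated bytes to the sink and resets the buffer.
    void flush() {
        if (buffer.position() == 0) {
            return;                        // nothing to send
        }
        buffer.flip();                     // switch from writing to reading the filled part
        byte[] batch = new byte[buffer.remaining()];
        buffer.get(batch);
        buffer.clear();
        sink.accept(batch);
    }

    public static void main(String[] args) {
        List<byte[]> sent = new ArrayList<>();
        BatcherSketch batcher = new BatcherSketch(10, sent::add);
        batcher.add(new byte[6]);
        batcher.add(new byte[6]);          // does not fit, so the first batch is flushed
        batcher.flush();
        System.out.println("batches sent: " + sent.size());
    }
}
```

With this shape, the caller would decide which policy receives the batches, for example by passing a lambda that forwards to sendToQueue, and the container itself would stay unaware of sync versus async delivery.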
I believe that with my design I might be breaking the Single Responsibility Principle or not following OOP standards properly, so I wanted to see whether this can be designed more cleanly. What is the best way to send keys and values by packing them into one byte array (respecting the 50K limit) in either sync or async mode?
java object-oriented multithreading design-patterns thread-safety
What do you mean by "either of one policy"? Can't you only have either of many?
– Raimund Krämer
Jan 25 at 10:33
Sorry what I meant was - I have many policies to choose from and I will pick only one policy to send data on it. I won't mix two or three policies to send same data.
– david
Jan 25 at 20:26
add a comment |Â
up vote
6
down vote
favorite
up vote
6
down vote
favorite
I have bunch of keys (clientKey
) and values (processBytes
) that I want to send to our messaging queue by packing them in one byte array. I will make one byte array of all the keys and values which should always be less than 50K and then send to our messaging queue.
For each partition, I have bunch of dataHolders
so I am iterating those and then sending it to my messaging queue:-
private void validateAndSend(final DataPartition partition)
final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
// sending data via async policy
final Packet packet = new Packet(partition, new QPolicyAsync());
DataHolder dataHolder;
while ((dataHolder = dataHolders.poll()) != null)
packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
dataHolder.getProcessBytes());
packet.close();
Packet
class: This class packs all the keys and values into one byte array and call corresponding implementation passed in the constructor to send data to queue.
public final class Packet implements Closeable
private static final int MAX_SIZE = 50000;
private static final int HEADER_SIZE = 36;
private final byte dataCenter;
private final byte recordVersion;
private final long address;
private final long addressFrom;
private final long addressOrigin;
private final byte partition;
private final byte replicated;
private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
private final QueuePolicy policy;
private int pendingItems = 0;
public Packet(final DataPartition partition, final QueuePolicy policy)
this.partition = (byte) partition.getPartition();
this.policy = policy;
this.dataCenter = Utils.LOCATION.get().datacenter();
this.recordVersion = 1;
this.replicated = 0;
final long packedAddress = new Data().packAddress();
this.address = packedAddress;
this.addressFrom = 0L;
this.addressOrigin = packedAddress;
private void addHeader(final ByteBuffer buffer, final int items)
buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
.putLong(address).putLong(addressFrom).putLong(addressOrigin).put(partition)
.put(replicated);
private void sendData()
if (itemBuffer.position() == 0)
// no data to be sent
return;
final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
addHeader(buffer, pendingItems);
buffer.put(itemBuffer);
// sending data via particular policy
policy.sendToQueue(address, buffer.array());
itemBuffer.clear();
pendingItems = 0;
public void addAndSendJunked(final byte key, final byte data)
if (key.length > 255)
return;
final byte keyLength = (byte) key.length;
final byte dataLength = (byte) data.length;
final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
final int newSize = itemBuffer.position() + additionalSize;
if (newSize >= (MAX_SIZE - HEADER_SIZE))
sendData();
if (additionalSize > (MAX_SIZE - HEADER_SIZE))
throw new AppConfigurationException("Size of single item exceeds maximum size");
final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
// data layout
itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
.put(data);
pendingItems++;
@Override
public void close()
if (pendingItems > 0)
sendData();
Now I can send data to my messaging queue in three different ways so for that I created an interface and then having three different implementations:
QueuePolicy
interface:
public interface QueuePolicy
public boolean sendToQueue(final long address, final byte encodedRecords);
QPolicyAsync
class:
public class QPolicyAsync implements QueuePolicy
@Override
public boolean sendToQueue(long address, byte encodedRecords)
return SendRecord.getInstance().sendToQueueAsync(address, encodedRecords);
QPolicySync
class:
public class QPolicySync implements QueuePolicy
@Override
public boolean sendToQueue(long address, byte encodedRecords)
return SendRecord.getInstance().sendToQueueSync(address, encodedRecords);
QPolicySyncWithSocket
class:
public class QPolicySyncWithSocket implements QueuePolicy
private final Socket socket;
public QPolicySyncWithSocket(Socket socket)
this.socket = socket;
@Override
public boolean sendToQueue(long address, byte encodedRecords)
return SendRecord.getInstance().sendToQueueSync(address, encodedRecords, Optional.of(socket));
SendRecord
class: This is the actual class which sends data to my messaging queue. It has three different implementations (numbered with 1, 2, 3 as comments) in it and each of those implementations is being called from above QueuePolicy
implementations:
public class SendRecord
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
.concurrencyLevel(100).build();
private static class Holder
private static final SendRecord INSTANCE = new SendRecord();
public static SendRecord getInstance()
return Holder.INSTANCE;
private SendRecord()
executorService.scheduleAtFixedRate(new Runnable()
@Override
public void run()
handleRetry();
, 0, 1, TimeUnit.SECONDS);
// this will retry to send data again if acknowledgment is not received
// but only for async cases. For sync we don't retry at all
private void handleRetry()
List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
for (PendingMessage message : messages)
if (message.hasExpired())
if (message.shouldRetry())
message.markResent();
doSendAsync(message, Optional.<Socket>absent());
else
cache.invalidate(message.getAddress());
// #1 sends data asynchronously
public boolean sendToQueueAsync(final long address, final byte encodedRecords)
PendingMessage m = new PendingMessage(address, encodedRecords, true);
cache.put(address, m);
return doSendAsync(m, Optional.<Socket>absent());
// place where we send data on a socket
private boolean doSendAsync(final PendingMessage message, final Optional<Socket> socket)
Optional<Socket> actualSocket = socket;
if (!actualSocket.isPresent())
SocketState liveSocket = SocketPoolManager.getInstance().getSocket();
actualSocket = Optional.of(liveSocket.getSocket());
ZMsg msg = new ZMsg();
msg.add(message.getEncodedRecords());
try
return msg.send(actualSocket.get());
finally
msg.destroy();
// #2 sends data synchronously without taking socket as a parameter
public boolean sendToQueueSync(final long address, final byte encodedRecords)
return sendToQueueSync(address, encodedRecords, Optional.<Socket>absent());
// #3 sends data synchronously but by taking socket as a parameter
public boolean sendToQueueSync(final long address, final byte encodedRecords,
final Optional<Socket> socket)
PendingMessage m = new PendingMessage(address, encodedRecords, false);
cache.put(address, m);
try
if (doSendAsync(m, socket))
return m.waitForAck();
finally
cache.invalidate(address);
return false;
// called by ResponsePoller thread to tell us that messaging queue
// has received data
public void handleAckReceived(final long address)
PendingMessage message = cache.getIfPresent(address);
if (message != null)
message.ackReceived();
cache.invalidate(address);
I have my code working fine.. The idea is very simple: I am sending data to my messaging queue via either of those three QueuePolicy
implementations. It depends on how clients want to send data. As of now I am passing implementation of QueuePolicy
in the Packet
constructor and then sends data via that policy. Each QueuePolicy
implementation calls corresponding method in SendRecord
class.
- Does my
Packet
class needs to know on how data is being sent? I thinkPacket
class is just a container, and that's all it is. I think we shouldn't expect it to know how to transmit itself. A transmitter does that. - Also, each
QueuePolicy
implementation calls corresponding method ofSendRecord
class. Is this really needed or there is any better way? Can we not get rid ofSendRecord
class and have them implemented in each of these threeQueuePolicy
implementations?
I believe with my design, I might be breaking Single Responsibility Principle or not following oops standard properly so wanted to see if we can design this efficiently and if there is any better way? What is the best way by which we can send key(s) and value(s) by packing them in one byte array (following that 50K limit) either through sync or async mode?
java object-oriented multithreading design-patterns thread-safety
I have bunch of keys (clientKey
) and values (processBytes
) that I want to send to our messaging queue by packing them in one byte array. I will make one byte array of all the keys and values which should always be less than 50K and then send to our messaging queue.
For each partition, I have bunch of dataHolders
so I am iterating those and then sending it to my messaging queue:-
private void validateAndSend(final DataPartition partition)
final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
// sending data via async policy
final Packet packet = new Packet(partition, new QPolicyAsync());
DataHolder dataHolder;
while ((dataHolder = dataHolders.poll()) != null)
packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
dataHolder.getProcessBytes());
packet.close();
Packet
class: This class packs all the keys and values into one byte array and call corresponding implementation passed in the constructor to send data to queue.
public final class Packet implements Closeable
private static final int MAX_SIZE = 50000;
private static final int HEADER_SIZE = 36;
private final byte dataCenter;
private final byte recordVersion;
private final long address;
private final long addressFrom;
private final long addressOrigin;
private final byte partition;
private final byte replicated;
private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
private final QueuePolicy policy;
private int pendingItems = 0;
public Packet(final DataPartition partition, final QueuePolicy policy)
this.partition = (byte) partition.getPartition();
this.policy = policy;
this.dataCenter = Utils.LOCATION.get().datacenter();
this.recordVersion = 1;
this.replicated = 0;
final long packedAddress = new Data().packAddress();
this.address = packedAddress;
this.addressFrom = 0L;
this.addressOrigin = packedAddress;
private void addHeader(final ByteBuffer buffer, final int items)
buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
.putLong(address).putLong(addressFrom).putLong(addressOrigin).put(partition)
.put(replicated);
private void sendData()
if (itemBuffer.position() == 0)
// no data to be sent
return;
final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
addHeader(buffer, pendingItems);
buffer.put(itemBuffer);
// sending data via particular policy
policy.sendToQueue(address, buffer.array());
itemBuffer.clear();
pendingItems = 0;
public void addAndSendJunked(final byte key, final byte data)
if (key.length > 255)
return;
final byte keyLength = (byte) key.length;
final byte dataLength = (byte) data.length;
final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
final int newSize = itemBuffer.position() + additionalSize;
if (newSize >= (MAX_SIZE - HEADER_SIZE))
sendData();
if (additionalSize > (MAX_SIZE - HEADER_SIZE))
throw new AppConfigurationException("Size of single item exceeds maximum size");
final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
// data layout
itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
.put(data);
pendingItems++;
@Override
public void close()
if (pendingItems > 0)
sendData();
Now I can send data to my messaging queue in three different ways so for that I created an interface and then having three different implementations:
QueuePolicy
interface:
public interface QueuePolicy
public boolean sendToQueue(final long address, final byte encodedRecords);
QPolicyAsync
class:
public class QPolicyAsync implements QueuePolicy
@Override
public boolean sendToQueue(long address, byte encodedRecords)
return SendRecord.getInstance().sendToQueueAsync(address, encodedRecords);
QPolicySync
class:
public class QPolicySync implements QueuePolicy
@Override
public boolean sendToQueue(long address, byte encodedRecords)
return SendRecord.getInstance().sendToQueueSync(address, encodedRecords);
QPolicySyncWithSocket
class:
public class QPolicySyncWithSocket implements QueuePolicy
private final Socket socket;
public QPolicySyncWithSocket(Socket socket)
this.socket = socket;
@Override
public boolean sendToQueue(long address, byte encodedRecords)
return SendRecord.getInstance().sendToQueueSync(address, encodedRecords, Optional.of(socket));
SendRecord
class: This is the actual class which sends data to my messaging queue. It has three different implementations (numbered with 1, 2, 3 as comments) in it and each of those implementations is being called from above QueuePolicy
implementations:
    public class SendRecord {
        private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
        private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
                .concurrencyLevel(100).build();

        private static class Holder {
            private static final SendRecord INSTANCE = new SendRecord();
        }

        public static SendRecord getInstance() {
            return Holder.INSTANCE;
        }

        private SendRecord() {
            executorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    handleRetry();
                }
            }, 0, 1, TimeUnit.SECONDS);
        }

        // this will retry to send data again if acknowledgment is not received
        // but only for async cases. For sync we don't retry at all
        private void handleRetry() {
            List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
            for (PendingMessage message : messages) {
                if (message.hasExpired()) {
                    if (message.shouldRetry()) {
                        message.markResent();
                        doSendAsync(message, Optional.<Socket>absent());
                    } else {
                        cache.invalidate(message.getAddress());
                    }
                }
            }
        }

        // #1 sends data asynchronously
        public boolean sendToQueueAsync(final long address, final byte[] encodedRecords) {
            PendingMessage m = new PendingMessage(address, encodedRecords, true);
            cache.put(address, m);
            return doSendAsync(m, Optional.<Socket>absent());
        }

        // place where we send data on a socket
        private boolean doSendAsync(final PendingMessage message, final Optional<Socket> socket) {
            Optional<Socket> actualSocket = socket;
            if (!actualSocket.isPresent()) {
                SocketState liveSocket = SocketPoolManager.getInstance().getSocket();
                actualSocket = Optional.of(liveSocket.getSocket());
            }
            ZMsg msg = new ZMsg();
            msg.add(message.getEncodedRecords());
            try {
                return msg.send(actualSocket.get());
            } finally {
                msg.destroy();
            }
        }

        // #2 sends data synchronously without taking socket as a parameter
        public boolean sendToQueueSync(final long address, final byte[] encodedRecords) {
            return sendToQueueSync(address, encodedRecords, Optional.<Socket>absent());
        }

        // #3 sends data synchronously but by taking socket as a parameter
        public boolean sendToQueueSync(final long address, final byte[] encodedRecords,
                final Optional<Socket> socket) {
            PendingMessage m = new PendingMessage(address, encodedRecords, false);
            cache.put(address, m);
            try {
                if (doSendAsync(m, socket)) {
                    return m.waitForAck();
                }
            } finally {
                cache.invalidate(address);
            }
            return false;
        }

        // called by ResponsePoller thread to tell us that messaging queue
        // has received data
        public void handleAckReceived(final long address) {
            PendingMessage message = cache.getIfPresent(address);
            if (message != null) {
                message.ackReceived();
                cache.invalidate(address);
            }
        }
    }
My code works fine. The idea is simple: I send data to my messaging queue via one of those three QueuePolicy implementations, depending on how clients want to send data. Currently I pass a QueuePolicy implementation to the Packet constructor, and Packet then sends data via that policy. Each QueuePolicy implementation calls the corresponding method in the SendRecord class.
- Does my Packet class need to know how data is being sent? I think the Packet class is just a container, and that's all it is. I don't think we should expect it to know how to transmit itself. A transmitter does that.
- Also, each QueuePolicy implementation calls the corresponding method of the SendRecord class. Is this really needed, or is there a better way? Can we get rid of the SendRecord class and implement its logic in each of the three QueuePolicy implementations instead?
I believe that with my design I might be breaking the Single Responsibility Principle or not following OOP standards properly, so I wanted to see whether this can be designed more cleanly. What is the best way to send keys and values by packing them into one byte array (respecting the 50K limit) in either sync or async mode?
java object-oriented multithreading design-patterns thread-safety
edited Feb 1 at 17:43
asked Jan 24 at 20:52
david
What do you mean by "either of one policy"? Can't you only have either of many?
– Raimund Krämer
Jan 25 at 10:33
Sorry what I meant was - I have many policies to choose from and I will pick only one policy to send data on it. I won't mix two or three policies to send same data.
– david
Jan 25 at 20:26
1 Answer
Your code looks pretty neat but, alas, I think the architecture is wrong.
Packet should be an immutable object acting as a passive container, IMO. Doing so removes the following fields:
    long address;
    long addressFrom;
    long addressOrigin;
    QueuePolicy policy;
    int pendingItems = 0;
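As a rough illustration of what a passive, immutable container could look like, here is a minimal sketch. The name ImmutablePacket, its two fields and the defensive copies are assumptions made for the example, not the actual Packet class from the question:

```java
import java.util.Arrays;

// Hypothetical sketch: a packet that only holds data and knows nothing about sending.
final class ImmutablePacket {
    private final byte partition;
    private final byte[] payload;

    ImmutablePacket(byte partition, byte[] payload) {
        this.partition = partition;
        // Defensive copy so later changes to the caller's array cannot leak in.
        this.payload = Arrays.copyOf(payload, payload.length);
    }

    byte getPartition() {
        return partition;
    }

    // Returns a copy so callers cannot mutate the internal state.
    byte[] getPayload() {
        return Arrays.copyOf(payload, payload.length);
    }

    int size() {
        return payload.length;
    }
}
```

Because there is no policy and no mutable counter inside, an instance like this can be handed to any sender or shared between threads without extra synchronization.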
SendRecord has multiple problems, the main one being that it does way too much. The other problems that come to mind are that it's a singleton (which makes mocking in your tests a royal pain) and that its constructor starts an executor. A constructor that does too much is considered bad practice by some (me included :)).
I think it's that big because you wanted a cache shared between all policies and because you wanted to handle message retries in it. If I'm right, you should address the first point with dependency injection in the various QueuePolicy implementations; I'll propose a solution for the other problem a bit below.
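To make the dependency injection idea concrete, here is a minimal sketch in which the shared state is passed in through constructors instead of living in a singleton. SendPolicy, TrackingAsyncPolicy and AckHandler are hypothetical stand-ins, and a plain Map replaces the Guava Cache so that the example compiles on its own:

```java
import java.util.Map;
import java.util.Objects;

// Simplified, hypothetical stand-in for the QueuePolicy interface.
interface SendPolicy {
    boolean send(long address, byte[] records);
}

// Stand-in for an async policy: it records the message until it is acknowledged.
final class TrackingAsyncPolicy implements SendPolicy {
    private final Map<Long, byte[]> pending;

    TrackingAsyncPolicy(Map<Long, byte[]> pending) {
        this.pending = Objects.requireNonNull(pending, "pending must not be null");
    }

    @Override
    public boolean send(long address, byte[] records) {
        pending.put(address, records); // kept until an ack arrives
        return true;                   // a real policy would write to a socket here
    }
}

// Stand-in for the ack handling side: it works on the same injected map.
final class AckHandler {
    private final Map<Long, byte[]> pending;

    AckHandler(Map<Long, byte[]> pending) {
        this.pending = Objects.requireNonNull(pending, "pending must not be null");
    }

    // Returns true if a pending message existed for this address and was removed.
    boolean handleAck(long address) {
        return pending.remove(address) != null;
    }
}
```

Whoever wires the application creates one map (or cache) and hands the same instance to every policy and to the ack handler, so tests can pass in a small map of their own instead of fighting a singleton.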
Thirdly, I think you should abstract the addresses in an Address class; manipulating a raw long feels really awkward.
The Address objects may be nothing more than a wrapper around a long, but they will be responsible for ensuring that their wrapped value isn't complete cr*p.
Modify sendToQueue accordingly ;)
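Such a wrapper could be sketched as follows. The validation rule (strictly positive values) and the method names of and asLong are assumptions chosen for illustration; the real constraints depend on how your addresses are packed:

```java
// Hypothetical value object wrapping the raw long address.
final class Address {
    private final long value;

    private Address(long value) {
        this.value = value;
    }

    // Assumed rule for illustration only: an address must be strictly positive.
    static Address of(long value) {
        if (value <= 0) {
            throw new IllegalArgumentException("address must be positive: " + value);
        }
        return new Address(value);
    }

    long asLong() {
        return value;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof Address && ((Address) o).value == value;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(value);
    }

    @Override
    public String toString() {
        return "Address(" + value + ")";
    }
}
```

With equals and hashCode defined, the wrapper can also serve directly as a cache key instead of a boxed Long.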
Then I'd probably create a PacketSender/PacketWriter interface and make it Closeable (I think it suits your use case), like this:
    public interface PacketSender extends Closeable {
        Address getAddress();

        void sendRecord(Packet packet) throws IOException;
    }
With a default SimplePacketSender that takes an address and the QueuePolicy.
As already said, if you want to share your cache between your different QueuePolicy implementations, you may need to inject it through the constructor.
The SimplePacketSender class would not retry sending data by default; it's better to let your users decide whether they want to retry or not. You could use a parameter for this, but I'd rather create a decorator like this:
    public class PacketSenderThatRetries implements PacketSender {
        private final PacketSender decorated;

        public PacketSenderThatRetries(final PacketSender decorated) {
            this.decorated = Objects.requireNonNull(decorated, "decorated must not be null");
        }

        @Override
        public Address getAddress() {
            return decorated.getAddress();
        }

        @Override
        public void sendRecord(final Packet packet) throws IOException {
            // TODO start a scheduler here and handle retry by calling the decorated method
        }

        // PacketSender extends Closeable, so the decorator must forward close() too
        @Override
        public void close() throws IOException {
            decorated.close();
        }
    }
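One way the retry part could be filled in is sketched below. Note that it swaps the scheduler mentioned in the TODO for a plain bounded retry loop, which is simpler but blocks the caller. BytesSender is a hypothetical, cut-down version of PacketSender that sends raw bytes so the example compiles without Packet or Address, and maxAttempts is an assumed parameter:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;

// Hypothetical, simplified stand-in for the PacketSender interface.
interface BytesSender extends Closeable {
    void send(byte[] payload) throws IOException;
}

// Decorator that retries a failed send up to a fixed number of attempts.
final class RetryingSender implements BytesSender {
    private final BytesSender decorated;
    private final int maxAttempts;

    RetryingSender(BytesSender decorated, int maxAttempts) {
        this.decorated = Objects.requireNonNull(decorated, "decorated must not be null");
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.maxAttempts = maxAttempts;
    }

    @Override
    public void send(byte[] payload) throws IOException {
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                decorated.send(payload);
                return; // success, stop retrying
            } catch (IOException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // every attempt failed
    }

    @Override
    public void close() throws IOException {
        decorated.close();
    }
}
```

A scheduler-based variant would follow the same shape: the decorator owns the retry bookkeeping and the wrapped sender stays unaware of it.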
In the end, creating a retrying packet sender may look like this:
    final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
            .concurrencyLevel(100).build();
    Address address = new Data().address();
    try (PacketSender ps = new PacketSenderThatRetries(new SimplePacketSender(
            address, new QPolicySync(cache)))) {
        ps.sendRecord(new Packet(partition));
    }
One last point: some people think it's bad practice to use Optional as a parameter type in Java ;)
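A common alternative is a pair of overloads, where the shorter one supplies the default. The sketch below uses a String as a stand-in for the socket, and the class OverloadedSender with its return value is hypothetical, only there to make the behaviour observable:

```java
import java.util.Objects;

// Hypothetical sender showing overloads instead of an Optional parameter.
final class OverloadedSender {
    private final String defaultSocket;

    OverloadedSender(String defaultSocket) {
        this.defaultSocket = Objects.requireNonNull(defaultSocket, "defaultSocket must not be null");
    }

    // No socket given: fall back to the default one.
    String send(byte[] records) {
        return send(records, defaultSocket);
    }

    // Explicit socket given: use it as is.
    String send(byte[] records, String socket) {
        Objects.requireNonNull(socket, "socket must not be null");
        // A real implementation would write to the socket; here we only describe the call.
        return socket + ":" + records.length;
    }
}
```

Callers then never have to wrap or unwrap anything, and a missing socket is expressed by simply calling the shorter overload.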
Hope it helps!
Yeah, I also think the architecture should be changed, as I might not be following OOP standards properly. I got some ideas from your suggestion, but I am confused about how your approach would fit into my code. If possible, can you provide an example based on my code so that I can understand it better?
– david
Jan 26 at 7:39
Lots of great suggestions and solutions are given here. If you struggle to see the whole picture, I would suggest writing down small logical rules and how they are connected with each other. Ronan suggested a list of rules; if you apply all of them at once, you end up with a broken overall picture. Write down your own logical rules and what has been suggested, and instead of making one big change, do 10-15 iterations. That way you will not suffer. This approach works for me and has helped me a lot.
– Sergii Nevydanchuk
Feb 6 at 4:42
answered Jan 25 at 10:28
Ronan Dhellemmes