Producer Consumer Using TPL Dataflow
I have written the following class, which acts as a consumer of messages passed in from producers and forwards them on to Amazon SQS. I have made a compromise by starting the processing loop in the constructor and completing and waiting on it in the Dispose method. I made this compromise to make the client's job of using the class easier, and because the processor task and the SQS client have to be disposed anyway, so Dispose seemed like a reasonable place to make sure they have finished their work.
I would like it to be reviewed for any sort of improvement.
public class BatchMessagePublisher : IMessagePublisher
{
    private const int PublishBatchSize = 10;

    private readonly IAmazonSQS _sqsClient;
    private readonly ILogger<BatchMessagePublisher> _logger;
    private readonly string _queueUrl;
    private readonly bool _isFifo;
    private readonly BatchBlock<Message> _messageBatcher;
    private readonly Task _processorTask;

    public BatchMessagePublisher(IAmazonSQS sqsClient, ILogger<BatchMessagePublisher> logger, string queueUrl, bool isFifo)
    {
        _sqsClient = sqsClient;
        _logger = logger;
        _queueUrl = queueUrl;
        _isFifo = isFifo;
        _messageBatcher = new BatchBlock<Message>(PublishBatchSize, new GroupingDataflowBlockOptions
        {
            BoundedCapacity = 1000
        });
        _processorTask = StartAsync();
    }

    private SendMessageBatchRequestEntry CreateBatchRequestEntry(Message message)
    {
        var request = new SendMessageBatchRequestEntry
        {
            Id = message.MessageId,
            MessageBody = message.MessageBody,
            MessageAttributes =
            {
                ["MessageType"] = new MessageAttributeValue
                {
                    DataType = "String",
                    StringValue = message.MessageType
                }
            }
        };

        if (_isFifo)
        {
            request.MessageGroupId = message.GroupId;
            request.MessageDeduplicationId = message.MessageId;
        }

        return request;
    }

    private async Task StartAsync()
    {
        while (!_messageBatcher.Completion.IsCompleted)
        {
            var batch = (await _messageBatcher.ReceiveAsync().ConfigureAwait(false))
                .Select(CreateBatchRequestEntry)
                .ToList();

            _logger.LogDebug($"Publishing {PublishBatchSize} messages...");
            var messageIds = string.Join(", ", batch.Select(r => r.Id));

            try
            {
                var batchResponse = await _sqsClient.SendMessageBatchAsync(_queueUrl, batch);
                if (batchResponse.Failed.Any())
                {
                    var errors = string.Join(", ", batchResponse.Failed.Select(f => f.Message));
                    throw new MigrationException(
                        $"Failed to publish batch of messages with IDs {messageIds}: {errors}");
                }

                _logger.LogDebug($"Published batch of messages with IDs {messageIds}");
            }
            catch (AmazonServiceException ex)
            {
                throw new MigrationException(
                    $"Failed to publish batch of messages with IDs {messageIds}.", ex);
            }
        }
    }

    public void PublishMessages(params Message[] messages)
    {
        foreach (var message in messages)
        {
            _messageBatcher.Post(message);
        }
    }

    public void Dispose()
    {
        _messageBatcher.TriggerBatch();
        _messageBatcher.Complete();
        _processorTask.Wait();
        _processorTask.Dispose();
        _sqsClient.Dispose();
    }
}
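For reference, this is roughly how a producer uses the class. The sqsClient, logger and queueUrl here are just placeholders for what our DI container provides, and the Message construction is simplified (I'm assuming settable properties for the sake of the example):

// Minimal usage sketch: sqsClient, logger and queueUrl are assumed to come from DI/config.
var publisher = new BatchMessagePublisher(sqsClient, logger, queueUrl, isFifo: false);

// Producers on any thread push messages; Post on a dataflow block is thread-safe,
// and the background processor task drains batches of up to 10 messages to SQS.
publisher.PublishMessages(new Message
{
    MessageId = Guid.NewGuid().ToString(),
    MessageBody = "payload",
    MessageType = "OrderCreated"
});

// When the producers are finished, Dispose flushes the last partial batch,
// completes the block and blocks until the processor task has drained everything.
publisher.Dispose();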
c# multithreading queue producer-consumer tpl-dataflow
edited Feb 8 at 13:54
asked Feb 8 at 10:40
xerxes