Producer Consumer Using TPL Dataflow

I have written the following class, which is supposed to act as a consumer of messages that are passed in from producers and forward them on to Amazon SQS. I have made a compromise by starting the operation in the constructor and then completing it and waiting on it in the Dispose method. I made this compromise to make the client's job of using this class easier, and because I thought that, since the processor task and the SQS client have to be disposed anyway, it should be fine to use the Dispose method to make sure they have finished their duties.
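
For context, this is roughly how I expect callers to drive the class (a minimal sketch only: the injected dependencies are assumed to come from elsewhere, the Message initializer assumes settable MessageId/MessageBody/MessageType properties, and IMessagePublisher is assumed to derive from IDisposable so that `using` works):

using System;
using Amazon.SQS;
using Microsoft.Extensions.Logging;

public static class PublisherUsageExample
{
    // sqsClient, logger and queueUrl are assumed to be supplied by the caller,
    // e.g. via dependency injection; Message is the custom type used by the
    // class below, not Amazon.SQS.Model.Message.
    public static void Publish(IAmazonSQS sqsClient, ILogger<BatchMessagePublisher> logger, string queueUrl)
    {
        using (var publisher = new BatchMessagePublisher(sqsClient, logger, queueUrl, isFifo: false))
        {
            publisher.PublishMessages(new Message
            {
                MessageId = Guid.NewGuid().ToString(),
                MessageBody = "{ \"example\": true }",
                MessageType = "ExampleEvent"
            });
        }   // Dispose flushes the partial batch, completes the block and waits for the processor task.
    }
}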



I would like it to be reviewed for any sort of improvement.



public class BatchMessagePublisher : IMessagePublisher
{
    private const int PublishBatchSize = 10;

    private readonly IAmazonSQS _sqsClient;
    private readonly ILogger<BatchMessagePublisher> _logger;
    private readonly string _queueUrl;
    private readonly bool _isFifo;
    private readonly BatchBlock<Message> _messageBatcher;
    private readonly Task _processorTask;

    public BatchMessagePublisher(IAmazonSQS sqsClient, ILogger<BatchMessagePublisher> logger, string queueUrl, bool isFifo)
    {
        _sqsClient = sqsClient;
        _logger = logger;
        _queueUrl = queueUrl;
        _isFifo = isFifo;

        _messageBatcher = new BatchBlock<Message>(PublishBatchSize, new GroupingDataflowBlockOptions
        {
            BoundedCapacity = 1000
        });

        _processorTask = StartAsync();
    }

    private SendMessageBatchRequestEntry CreateBatchRequestEntry(Message message)
    {
        var request = new SendMessageBatchRequestEntry
        {
            Id = message.MessageId,
            MessageBody = message.MessageBody,
            MessageAttributes =
            {
                ["MessageType"] = new MessageAttributeValue
                {
                    DataType = "String",
                    StringValue = message.MessageType
                }
            }
        };

        if (_isFifo)
        {
            request.MessageGroupId = message.GroupId;
            request.MessageDeduplicationId = message.MessageId;
        }

        return request;
    }

    private async Task StartAsync()
    {
        while (!_messageBatcher.Completion.IsCompleted)
        {
            var batch = (await _messageBatcher.ReceiveAsync().ConfigureAwait(false))
                .Select(CreateBatchRequestEntry)
                .ToList();

            _logger.LogDebug($"Publishing {PublishBatchSize} messages...");

            var messageIds = string.Join(", ", batch.Select(r => r.Id));

            try
            {
                var batchResponse = await _sqsClient.SendMessageBatchAsync(_queueUrl, batch);

                if (batchResponse.Failed.Any())
                {
                    var errors = string.Join(", ", batchResponse.Failed.Select(f => f.Message));
                    throw new MigrationException(
                        $"Failed to publish batch of messages with IDs {messageIds}: {errors}");
                }

                _logger.LogDebug($"Published batch of messages with IDs {messageIds}");
            }
            catch (AmazonServiceException ex)
            {
                throw new MigrationException(
                    $"Failed to publish batch of messages with IDs {messageIds}.", ex);
            }
        }
    }

    public void PublishMessages(params Message[] messages)
    {
        foreach (var message in messages)
        {
            _messageBatcher.Post(message);
        }
    }

    public void Dispose()
    {
        _messageBatcher.TriggerBatch();
        _messageBatcher.Complete();
        _processorTask.Wait();
        _processorTask.Dispose();
        _sqsClient.Dispose();
    }
}
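
For reference, the TriggerBatch/Complete/drain sequence in Dispose relies on the standard behaviour of TPL Dataflow's BatchBlock<T>; a minimal standalone sketch of that mechanism (independent of SQS, and not part of the class under review) looks like this:

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BatchBlockDemo
{
    public static async Task Main()
    {
        // Groups posted items into arrays of 3; a smaller trailing group
        // is only emitted once TriggerBatch() is called.
        var batcher = new BatchBlock<int>(3);

        for (var i = 1; i <= 7; i++)
        {
            batcher.Post(i);
        }

        batcher.TriggerBatch();   // flush the trailing partial batch ([7])
        batcher.Complete();       // no more input will be accepted

        while (await batcher.OutputAvailableAsync())
        {
            var batch = await batcher.ReceiveAsync();
            Console.WriteLine($"Received batch: {string.Join(", ", batch)}");
        }
    }
}

Posting seven items with a batch size of three yields two full batches straight away; TriggerBatch flushes the remaining single item as a final, smaller batch, and Complete lets the drain loop finish.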








asked Feb 8 at 10:40 by xerxes, edited Feb 8 at 13:54
























