Transforming websocket data through the Akka framework for Scala

I am using Akka to listen to a websocket that handles data from the GDAX trading platform. I haven't used Akka or Scala before and it seems like there are a lot of different ways to approach problems.



I would like to know if there is a better way to perform the transformations on the data. I have briefly looked into graphs, but I am not joining different data streams, so I am not sure whether they would apply here...



I initialize the request...



val req = WebSocketRequest("wss://ws-feed.gdax.com")
val flow = Http().webSocketClientFlow(req)


Then set up the source



val source: Source[Message, ActorRef] =
  Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)


As I get data from the source, I perform two transformations. First I convert the Messages to Strings, and then I convert the Strings to their respective case classes...



val messageToString: Flow[Message, String, NotUsed] =
  Flow[Message]
    .collect {
      case message: TextMessage.Strict    => Future.successful(message.text)
      case streamed: TextMessage.Streamed => streamed.textStream.runFold("")(_ ++ _).flatMap(Future.successful)
    }
    .mapAsync(1)(identity)

val stringToObject: Flow[String, BaseGdax, NotUsed] =
  Flow[String].map(s => GdaxJsonProtocol.GdaxJsonFormat.read(JsonParser(s)))


I have a sink that gets the initial data and then pipes it to the two transformations



val sink: Sink[Message, NotUsed] =
  Flow[Message]
    .via(messageToString).async
    .via(stringToObject).async
    .map(println)
    .to(Sink.ignore)



Firstly, I am not sure whether I should be defining these as separate flows and then calling via, or whether there is some other way to do this. Also, do I gain much speed by adding async? Ideally I would like the stream to process the objects in order, so that I can check their sequence numbers, but I lose the ability to do so easily since I used async.
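
On the ordering concern: as far as the Akka Streams documentation describes it, `.async` only marks an asynchronous boundary (the stages on either side may run on different actors), and linear stages such as `map` and `mapAsync` still emit elements in upstream order. Whether the boundaries make the pipeline faster depends on how expensive each stage is and would need to be measured. Below is a minimal sketch to check the ordering, assuming Akka 2.6 or later (where an implicit `ActorSystem` is enough to materialize a stream; Akka 2.5 would also need an `ActorMaterializer`). `AsyncOrderDemo`, `double` and `render` are hypothetical names standing in for the flows in the question.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object AsyncOrderDemo {
  // Two stand-in stages (hypothetical), playing the role of
  // messageToString and stringToObject in the question.
  val double: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
  val render: Flow[Int, String, NotUsed] = Flow[Int].map(n => s"seq-$n")

  // Runs 1..1000 through both stages, each followed by an async boundary,
  // and collects everything the stream emits.
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("async-order-demo")
    try {
      val result = Source(1 to 1000)
        .via(double).async
        .via(render).async
        .runWith(Sink.seq)
      Await.result(result, 10.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val out = run()
    // The collected elements should match the upstream order exactly.
    assert(out == (1 to 1000).map(n => s"seq-${n * 2}"))
    println(s"received ${out.size} elements in upstream order")
  }
}
```

If this holds for the real pipeline as well, a sequence-number check could sit after the last stage without removing the async boundaries.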



Lastly I create a graph from the source, flow and sink.



val sink: Sink[Message, NotUsed] =
  Flow[Message]
    .via(messageToString).async
    .via(stringToObject).async
    .map(println)
    .to(Sink.ignore)
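
The snippet under "Lastly I create a graph" repeats the sink definition, so the wiring itself is not shown. Below is a minimal sketch of one way the three pieces could be connected, assuming akka-http is on the classpath. `GdaxGraphSketch`, `build` and the `Sink.foreach` placeholder are hypothetical, and this is not necessarily how the original code does it.

```scala
import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest, WebSocketUpgradeResponse}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}

import scala.concurrent.Future

object GdaxGraphSketch {
  // Builds (but does not run) the blueprint; nothing connects until .run() is called.
  def build()(implicit system: ActorSystem): RunnableGraph[(ActorRef, Future[WebSocketUpgradeResponse])] = {
    val req  = WebSocketRequest("wss://ws-feed.gdax.com")
    val flow = Http().webSocketClientFlow(req)

    val source: Source[Message, ActorRef] =
      Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)

    // Placeholder sink (assumption); the sink from the question would go here instead.
    val sink = Sink.foreach[Message](println)

    source
      .viaMat(flow)(Keep.both) // keep both the ActorRef and the upgrade Future
      .to(sink)                // keeps the left materialized value
  }
}
```

Calling `.run()` on the result would materialize the stream and yield the `ActorRef` (for pushing outgoing messages into the websocket) together with the `Future[WebSocketUpgradeResponse]`. On Akka 2.5 an implicit materializer would also be needed at that point.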






asked Feb 9 at 23:43 by kyle
