Transforming websocket data though Akka framework for Scala

Clash Royale CLAN TAG#URR8PPP
.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty margin-bottom:0;
up vote
0
down vote
favorite
I am using Akka to listen to a websocket that handles data from the GDAX trading platform. I haven't used Akka or Scala before and it seems like there are a lot of different ways to approach problems.
I would like to know if there is a better way to be performing the transformations on the data. I have briefly looking into graphs, but I am not joining different data streams, so I am not sure if it would apply here...
I initialize the request...
val req = WebSocketRequest("wss://ws-feed.gdax.com")
val flow = Http().webSocketClientFlow(req)
Then set up the source
val source: Source[Message, ActorRef] =
Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)
And as I get data from the source, I perform two transformations. First I convert the Messages to Strings and then I convert the Strings to respective case classes...
val messageToString: Flow[Message, String, NotUsed] =
Flow[Message].
collect
case message: TextMessage.Strict => Future.successful(message.text)
case streamed: TextMessage.Streamed=> streamed.textStream.runFold("")(_ ++ _).flatMap(Future.successful)
.mapAsync(1)(identity)
val stringToObject: Flow[String, BaseGdax, NotUsed] =
Flow[String].map(s => GdaxJsonProtocol.GdaxJsonFormat.read(JsonParser(s)))
I have a sink that gets the initial data and then pipes it to the two transformations
val sink: Sink[Message, NotUsed] =
Flow[Message]
.via(messageToString).async
.via(stringToObject).async
.map(println)
.to(Sink.ignore)
Firstly, I am not sure if I should be doing this with the flows, and then calling via, or if there is just some other to be doing this. Also, do I gain a lot of speed by adding async? Ideally I would like for the to process the objects in order, so I can check their sequence numbers, but I lose that ability to do so easily since I used async.
Lastly I create a graph from the source, flow and sink.
val sink: Sink[Message, NotUsed] =
Flow[Message]
.via(messageToString).async
.via(stringToObject).async
.map(println)
.to(Sink.ignore)
scala akka
add a comment |Â
up vote
0
down vote
favorite
I am using Akka to listen to a websocket that handles data from the GDAX trading platform. I haven't used Akka or Scala before and it seems like there are a lot of different ways to approach problems.
I would like to know if there is a better way to be performing the transformations on the data. I have briefly looking into graphs, but I am not joining different data streams, so I am not sure if it would apply here...
I initialize the request...
val req = WebSocketRequest("wss://ws-feed.gdax.com")
val flow = Http().webSocketClientFlow(req)
Then set up the source
val source: Source[Message, ActorRef] =
Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)
And as I get data from the source, I perform two transformations. First I convert the Messages to Strings and then I convert the Strings to respective case classes...
val messageToString: Flow[Message, String, NotUsed] =
Flow[Message].
collect
case message: TextMessage.Strict => Future.successful(message.text)
case streamed: TextMessage.Streamed=> streamed.textStream.runFold("")(_ ++ _).flatMap(Future.successful)
.mapAsync(1)(identity)
val stringToObject: Flow[String, BaseGdax, NotUsed] =
Flow[String].map(s => GdaxJsonProtocol.GdaxJsonFormat.read(JsonParser(s)))
I have a sink that gets the initial data and then pipes it to the two transformations
val sink: Sink[Message, NotUsed] =
Flow[Message]
.via(messageToString).async
.via(stringToObject).async
.map(println)
.to(Sink.ignore)
Firstly, I am not sure if I should be doing this with the flows, and then calling via, or if there is just some other to be doing this. Also, do I gain a lot of speed by adding async? Ideally I would like for the to process the objects in order, so I can check their sequence numbers, but I lose that ability to do so easily since I used async.
Lastly I create a graph from the source, flow and sink.
val sink: Sink[Message, NotUsed] =
Flow[Message]
.via(messageToString).async
.via(stringToObject).async
.map(println)
.to(Sink.ignore)
scala akka
add a comment |Â
up vote
0
down vote
favorite
up vote
0
down vote
favorite
I am using Akka to listen to a websocket that handles data from the GDAX trading platform. I haven't used Akka or Scala before and it seems like there are a lot of different ways to approach problems.
I would like to know if there is a better way to be performing the transformations on the data. I have briefly looking into graphs, but I am not joining different data streams, so I am not sure if it would apply here...
I initialize the request...
val req = WebSocketRequest("wss://ws-feed.gdax.com")
val flow = Http().webSocketClientFlow(req)
Then set up the source
val source: Source[Message, ActorRef] =
Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)
And as I get data from the source, I perform two transformations. First I convert the Messages to Strings and then I convert the Strings to respective case classes...
val messageToString: Flow[Message, String, NotUsed] =
Flow[Message].
collect
case message: TextMessage.Strict => Future.successful(message.text)
case streamed: TextMessage.Streamed=> streamed.textStream.runFold("")(_ ++ _).flatMap(Future.successful)
.mapAsync(1)(identity)
val stringToObject: Flow[String, BaseGdax, NotUsed] =
Flow[String].map(s => GdaxJsonProtocol.GdaxJsonFormat.read(JsonParser(s)))
I have a sink that gets the initial data and then pipes it to the two transformations
val sink: Sink[Message, NotUsed] =
Flow[Message]
.via(messageToString).async
.via(stringToObject).async
.map(println)
.to(Sink.ignore)
Firstly, I am not sure if I should be doing this with the flows, and then calling via, or if there is just some other to be doing this. Also, do I gain a lot of speed by adding async? Ideally I would like for the to process the objects in order, so I can check their sequence numbers, but I lose that ability to do so easily since I used async.
Lastly I create a graph from the source, flow and sink.
val sink: Sink[Message, NotUsed] =
Flow[Message]
.via(messageToString).async
.via(stringToObject).async
.map(println)
.to(Sink.ignore)
scala akka
I am using Akka to listen to a websocket that handles data from the GDAX trading platform. I haven't used Akka or Scala before and it seems like there are a lot of different ways to approach problems.
I would like to know if there is a better way to be performing the transformations on the data. I have briefly looking into graphs, but I am not joining different data streams, so I am not sure if it would apply here...
I initialize the request...
val req = WebSocketRequest("wss://ws-feed.gdax.com")
val flow = Http().webSocketClientFlow(req)
Then set up the source
val source: Source[Message, ActorRef] =
Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)
And as I get data from the source, I perform two transformations. First I convert the Messages to Strings and then I convert the Strings to respective case classes...
val messageToString: Flow[Message, String, NotUsed] =
Flow[Message].
collect
case message: TextMessage.Strict => Future.successful(message.text)
case streamed: TextMessage.Streamed=> streamed.textStream.runFold("")(_ ++ _).flatMap(Future.successful)
.mapAsync(1)(identity)
val stringToObject: Flow[String, BaseGdax, NotUsed] =
Flow[String].map(s => GdaxJsonProtocol.GdaxJsonFormat.read(JsonParser(s)))
I have a sink that gets the initial data and then pipes it to the two transformations
val sink: Sink[Message, NotUsed] =
Flow[Message]
.via(messageToString).async
.via(stringToObject).async
.map(println)
.to(Sink.ignore)
Firstly, I am not sure if I should be doing this with the flows, and then calling via, or if there is just some other to be doing this. Also, do I gain a lot of speed by adding async? Ideally I would like for the to process the objects in order, so I can check their sequence numbers, but I lose that ability to do so easily since I used async.
Lastly I create a graph from the source, flow and sink.
val sink: Sink[Message, NotUsed] =
Flow[Message]
.via(messageToString).async
.via(stringToObject).async
.map(println)
.to(Sink.ignore)
scala akka
asked Feb 9 at 23:43
kyle
1236
1236
add a comment |Â
add a comment |Â
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f187229%2ftransforming-websocket-data-though-akka-framework-for-scala%23new-answer', 'question_page');
);
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password