Pushing a value to a Processor on a separate thread
Clash Royale CLAN TAG#URR8PPP
.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty margin-bottom:0;
up vote
1
down vote
favorite
I am attempting to use Room along with a full-text search (FTS) table. The issue is that Room doesn't recognize or support FTS tables. Though there is a work around for this; you just create the FTS table using execSQL()
after the Room database is created. Though you are limited to using RawQuery
on tables created this way (instead of Query
, Insert
, etc.). See my other question on implementing search with Room for more information.
This is works well, but it limits the table to a subset of the functionality normally provided by Room. For instance; you cannot use reactive queries with the RxJava artifact, you can only get a (blocking) result from the database.
So I am trying to mimic this functionality by adding it to my Dao
.
To do this, I added a BehaviorProcessor
that publishes a new Flowable
for each access method. The Processor
is then updated each time a value is inserted or deleted into the table.
The issue is that the RawQuery
method is a blocking method, and throws an exception if accessed from the main thread. So I wrapped the read method in a Flowable
that pushes the result to the Processor
upon success. This works fairly well, but I am unsure if this is the best way to go about updating the Processor
.
Is there a more "RxJava" way of achieving this effect? Possibly some method I failed to recognize in the Processor
that allows me to update it away from the main thread without creating a separate Flowable
?
@Dao
abstract class PlaceDao(private val database: MapDatabase)
private val processor = BehaviorProcessor.createDefault<List<Place>>(emptyList())
@RawQuery
protected abstract fun rawReadList(query: SupportSQLiteQuery): Place
private fun refreshProcessor()
Flowable.create<List<Place>>(
emitter.onNext(rawReadList(SimpleSQLiteQuery("SELECT * FROM `places`")))
, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe( places ->
processor.onNext(places)
fun readAll(): Flowable<List<Place>>
refreshProcessor()
return processor.publish().autoConnect()
fun findById(): Flowable<Place>
refreshProcessor()
return processor.map it.first it.id == id .publish().autoConnect()
fun insert(varargs places: Place)
val writableDatabase = database.openHelper.writableDatabase
for (place in places)
writableDatabase.execSQL(
"INSERT OR REPLACE INTO `places` " +
"(`id`, `title`, `subtitle`, `description`, " +
"`latitude`, `longitude`, `type`, `parent`) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
arrayOf(place.id, place.title, place.subtitle, place.description,
place.location.latitude, place.location.longitude,
place.type, place.parent))
)
refreshProcessor()
fun deleteAll()
val writableDatabase = database.openHelper.writableDatabase
writableDatabase.execSQL("DELETE FROM `places`")
refreshProcessor()
Update
I found out that using a ConnectedFlowable
is not the way to go. It seems as if the Flowable
returned by publish()
is not connected to the BehaviorProcessor
and either does not emit the most recent item observed or does not emit any item pushed to the BehaviorSubject
after publish()
is called.
So I removed any calls to publish()
and autoConnect()
in my code and just return the BehaviorProcessor
itself instead.
android kotlin rx-java
add a comment |Â
up vote
1
down vote
favorite
I am attempting to use Room along with a full-text search (FTS) table. The issue is that Room doesn't recognize or support FTS tables. Though there is a work around for this; you just create the FTS table using execSQL()
after the Room database is created. Though you are limited to using RawQuery
on tables created this way (instead of Query
, Insert
, etc.). See my other question on implementing search with Room for more information.
This is works well, but it limits the table to a subset of the functionality normally provided by Room. For instance; you cannot use reactive queries with the RxJava artifact, you can only get a (blocking) result from the database.
So I am trying to mimic this functionality by adding it to my Dao
.
To do this, I added a BehaviorProcessor
that publishes a new Flowable
for each access method. The Processor
is then updated each time a value is inserted or deleted into the table.
The issue is that the RawQuery
method is a blocking method, and throws an exception if accessed from the main thread. So I wrapped the read method in a Flowable
that pushes the result to the Processor
upon success. This works fairly well, but I am unsure if this is the best way to go about updating the Processor
.
Is there a more "RxJava" way of achieving this effect? Possibly some method I failed to recognize in the Processor
that allows me to update it away from the main thread without creating a separate Flowable
?
@Dao
abstract class PlaceDao(private val database: MapDatabase)
private val processor = BehaviorProcessor.createDefault<List<Place>>(emptyList())
@RawQuery
protected abstract fun rawReadList(query: SupportSQLiteQuery): Place
private fun refreshProcessor()
Flowable.create<List<Place>>(
emitter.onNext(rawReadList(SimpleSQLiteQuery("SELECT * FROM `places`")))
, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe( places ->
processor.onNext(places)
fun readAll(): Flowable<List<Place>>
refreshProcessor()
return processor.publish().autoConnect()
fun findById(): Flowable<Place>
refreshProcessor()
return processor.map it.first it.id == id .publish().autoConnect()
fun insert(varargs places: Place)
val writableDatabase = database.openHelper.writableDatabase
for (place in places)
writableDatabase.execSQL(
"INSERT OR REPLACE INTO `places` " +
"(`id`, `title`, `subtitle`, `description`, " +
"`latitude`, `longitude`, `type`, `parent`) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
arrayOf(place.id, place.title, place.subtitle, place.description,
place.location.latitude, place.location.longitude,
place.type, place.parent))
)
refreshProcessor()
fun deleteAll()
val writableDatabase = database.openHelper.writableDatabase
writableDatabase.execSQL("DELETE FROM `places`")
refreshProcessor()
Update
I found out that using a ConnectedFlowable
is not the way to go. It seems as if the Flowable
returned by publish()
is not connected to the BehaviorProcessor
and either does not emit the most recent item observed or does not emit any item pushed to the BehaviorSubject
after publish()
is called.
So I removed any calls to publish()
and autoConnect()
in my code and just return the BehaviorProcessor
itself instead.
android kotlin rx-java
add a comment |Â
up vote
1
down vote
favorite
up vote
1
down vote
favorite
I am attempting to use Room along with a full-text search (FTS) table. The issue is that Room doesn't recognize or support FTS tables. Though there is a work around for this; you just create the FTS table using execSQL()
after the Room database is created. Though you are limited to using RawQuery
on tables created this way (instead of Query
, Insert
, etc.). See my other question on implementing search with Room for more information.
This is works well, but it limits the table to a subset of the functionality normally provided by Room. For instance; you cannot use reactive queries with the RxJava artifact, you can only get a (blocking) result from the database.
So I am trying to mimic this functionality by adding it to my Dao
.
To do this, I added a BehaviorProcessor
that publishes a new Flowable
for each access method. The Processor
is then updated each time a value is inserted or deleted into the table.
The issue is that the RawQuery
method is a blocking method, and throws an exception if accessed from the main thread. So I wrapped the read method in a Flowable
that pushes the result to the Processor
upon success. This works fairly well, but I am unsure if this is the best way to go about updating the Processor
.
Is there a more "RxJava" way of achieving this effect? Possibly some method I failed to recognize in the Processor
that allows me to update it away from the main thread without creating a separate Flowable
?
@Dao
abstract class PlaceDao(private val database: MapDatabase)
private val processor = BehaviorProcessor.createDefault<List<Place>>(emptyList())
@RawQuery
protected abstract fun rawReadList(query: SupportSQLiteQuery): Place
private fun refreshProcessor()
Flowable.create<List<Place>>(
emitter.onNext(rawReadList(SimpleSQLiteQuery("SELECT * FROM `places`")))
, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe( places ->
processor.onNext(places)
fun readAll(): Flowable<List<Place>>
refreshProcessor()
return processor.publish().autoConnect()
fun findById(): Flowable<Place>
refreshProcessor()
return processor.map it.first it.id == id .publish().autoConnect()
fun insert(varargs places: Place)
val writableDatabase = database.openHelper.writableDatabase
for (place in places)
writableDatabase.execSQL(
"INSERT OR REPLACE INTO `places` " +
"(`id`, `title`, `subtitle`, `description`, " +
"`latitude`, `longitude`, `type`, `parent`) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
arrayOf(place.id, place.title, place.subtitle, place.description,
place.location.latitude, place.location.longitude,
place.type, place.parent))
)
refreshProcessor()
fun deleteAll()
val writableDatabase = database.openHelper.writableDatabase
writableDatabase.execSQL("DELETE FROM `places`")
refreshProcessor()
Update
I found out that using a ConnectedFlowable
is not the way to go. It seems as if the Flowable
returned by publish()
is not connected to the BehaviorProcessor
and either does not emit the most recent item observed or does not emit any item pushed to the BehaviorSubject
after publish()
is called.
So I removed any calls to publish()
and autoConnect()
in my code and just return the BehaviorProcessor
itself instead.
android kotlin rx-java
I am attempting to use Room along with a full-text search (FTS) table. The issue is that Room doesn't recognize or support FTS tables. Though there is a work around for this; you just create the FTS table using execSQL()
after the Room database is created. Though you are limited to using RawQuery
on tables created this way (instead of Query
, Insert
, etc.). See my other question on implementing search with Room for more information.
This is works well, but it limits the table to a subset of the functionality normally provided by Room. For instance; you cannot use reactive queries with the RxJava artifact, you can only get a (blocking) result from the database.
So I am trying to mimic this functionality by adding it to my Dao
.
To do this, I added a BehaviorProcessor
that publishes a new Flowable
for each access method. The Processor
is then updated each time a value is inserted or deleted into the table.
The issue is that the RawQuery
method is a blocking method, and throws an exception if accessed from the main thread. So I wrapped the read method in a Flowable
that pushes the result to the Processor
upon success. This works fairly well, but I am unsure if this is the best way to go about updating the Processor
.
Is there a more "RxJava" way of achieving this effect? Possibly some method I failed to recognize in the Processor
that allows me to update it away from the main thread without creating a separate Flowable
?
@Dao
abstract class PlaceDao(private val database: MapDatabase)
private val processor = BehaviorProcessor.createDefault<List<Place>>(emptyList())
@RawQuery
protected abstract fun rawReadList(query: SupportSQLiteQuery): Place
private fun refreshProcessor()
Flowable.create<List<Place>>(
emitter.onNext(rawReadList(SimpleSQLiteQuery("SELECT * FROM `places`")))
, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe( places ->
processor.onNext(places)
fun readAll(): Flowable<List<Place>>
refreshProcessor()
return processor.publish().autoConnect()
fun findById(): Flowable<Place>
refreshProcessor()
return processor.map it.first it.id == id .publish().autoConnect()
fun insert(varargs places: Place)
val writableDatabase = database.openHelper.writableDatabase
for (place in places)
writableDatabase.execSQL(
"INSERT OR REPLACE INTO `places` " +
"(`id`, `title`, `subtitle`, `description`, " +
"`latitude`, `longitude`, `type`, `parent`) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
arrayOf(place.id, place.title, place.subtitle, place.description,
place.location.latitude, place.location.longitude,
place.type, place.parent))
)
refreshProcessor()
fun deleteAll()
val writableDatabase = database.openHelper.writableDatabase
writableDatabase.execSQL("DELETE FROM `places`")
refreshProcessor()
Update
I found out that using a ConnectedFlowable
is not the way to go. It seems as if the Flowable
returned by publish()
is not connected to the BehaviorProcessor
and either does not emit the most recent item observed or does not emit any item pushed to the BehaviorSubject
after publish()
is called.
So I removed any calls to publish()
and autoConnect()
in my code and just return the BehaviorProcessor
itself instead.
android kotlin rx-java
edited Apr 19 at 19:59
asked Apr 17 at 19:32
Bryan
1065
1065
add a comment |Â
add a comment |Â
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f192322%2fpushing-a-value-to-a-processor-on-a-separate-thread%23new-answer', 'question_page');
);
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password