Pushing a value to a Processor on a separate thread

The name of the pictureThe name of the pictureThe name of the pictureClash Royale CLAN TAG#URR8PPP





.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty margin-bottom:0;







up vote
1
down vote

favorite












I am attempting to use Room along with a full-text search (FTS) table. The issue is that Room doesn't recognize or support FTS tables. Though there is a work around for this; you just create the FTS table using execSQL() after the Room database is created. Though you are limited to using RawQuery on tables created this way (instead of Query, Insert, etc.). See my other question on implementing search with Room for more information.



This is works well, but it limits the table to a subset of the functionality normally provided by Room. For instance; you cannot use reactive queries with the RxJava artifact, you can only get a (blocking) result from the database.




So I am trying to mimic this functionality by adding it to my Dao.



To do this, I added a BehaviorProcessor that publishes a new Flowable for each access method. The Processor is then updated each time a value is inserted or deleted into the table.



The issue is that the RawQuery method is a blocking method, and throws an exception if accessed from the main thread. So I wrapped the read method in a Flowable that pushes the result to the Processor upon success. This works fairly well, but I am unsure if this is the best way to go about updating the Processor.



Is there a more "RxJava" way of achieving this effect? Possibly some method I failed to recognize in the Processor that allows me to update it away from the main thread without creating a separate Flowable?



@Dao
abstract class PlaceDao(private val database: MapDatabase)

private val processor = BehaviorProcessor.createDefault<List<Place>>(emptyList())

@RawQuery
protected abstract fun rawReadList(query: SupportSQLiteQuery): Place

private fun refreshProcessor()
Flowable.create<List<Place>>(
emitter.onNext(rawReadList(SimpleSQLiteQuery("SELECT * FROM `places`")))
, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe( places ->
processor.onNext(places)



fun readAll(): Flowable<List<Place>>
refreshProcessor()
return processor.publish().autoConnect()


fun findById(): Flowable<Place>
refreshProcessor()
return processor.map it.first it.id == id .publish().autoConnect()


fun insert(varargs places: Place)
val writableDatabase = database.openHelper.writableDatabase

for (place in places)
writableDatabase.execSQL(
"INSERT OR REPLACE INTO `places` " +
"(`id`, `title`, `subtitle`, `description`, " +
"`latitude`, `longitude`, `type`, `parent`) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
arrayOf(place.id, place.title, place.subtitle, place.description,
place.location.latitude, place.location.longitude,
place.type, place.parent))
)


refreshProcessor()


fun deleteAll()
val writableDatabase = database.openHelper.writableDatabase
writableDatabase.execSQL("DELETE FROM `places`")
refreshProcessor()






Update



I found out that using a ConnectedFlowable is not the way to go. It seems as if the Flowable returned by publish() is not connected to the BehaviorProcessor and either does not emit the most recent item observed or does not emit any item pushed to the BehaviorSubject after publish() is called.



So I removed any calls to publish() and autoConnect() in my code and just return the BehaviorProcessor itself instead.







share|improve this question



























    up vote
    1
    down vote

    favorite












    I am attempting to use Room along with a full-text search (FTS) table. The issue is that Room doesn't recognize or support FTS tables. Though there is a work around for this; you just create the FTS table using execSQL() after the Room database is created. Though you are limited to using RawQuery on tables created this way (instead of Query, Insert, etc.). See my other question on implementing search with Room for more information.



    This is works well, but it limits the table to a subset of the functionality normally provided by Room. For instance; you cannot use reactive queries with the RxJava artifact, you can only get a (blocking) result from the database.




    So I am trying to mimic this functionality by adding it to my Dao.



    To do this, I added a BehaviorProcessor that publishes a new Flowable for each access method. The Processor is then updated each time a value is inserted or deleted into the table.



    The issue is that the RawQuery method is a blocking method, and throws an exception if accessed from the main thread. So I wrapped the read method in a Flowable that pushes the result to the Processor upon success. This works fairly well, but I am unsure if this is the best way to go about updating the Processor.



    Is there a more "RxJava" way of achieving this effect? Possibly some method I failed to recognize in the Processor that allows me to update it away from the main thread without creating a separate Flowable?



    @Dao
    abstract class PlaceDao(private val database: MapDatabase)

    private val processor = BehaviorProcessor.createDefault<List<Place>>(emptyList())

    @RawQuery
    protected abstract fun rawReadList(query: SupportSQLiteQuery): Place

    private fun refreshProcessor()
    Flowable.create<List<Place>>(
    emitter.onNext(rawReadList(SimpleSQLiteQuery("SELECT * FROM `places`")))
    , BackpressureStrategy.LATEST).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe( places ->
    processor.onNext(places)



    fun readAll(): Flowable<List<Place>>
    refreshProcessor()
    return processor.publish().autoConnect()


    fun findById(): Flowable<Place>
    refreshProcessor()
    return processor.map it.first it.id == id .publish().autoConnect()


    fun insert(varargs places: Place)
    val writableDatabase = database.openHelper.writableDatabase

    for (place in places)
    writableDatabase.execSQL(
    "INSERT OR REPLACE INTO `places` " +
    "(`id`, `title`, `subtitle`, `description`, " +
    "`latitude`, `longitude`, `type`, `parent`) " +
    "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
    arrayOf(place.id, place.title, place.subtitle, place.description,
    place.location.latitude, place.location.longitude,
    place.type, place.parent))
    )


    refreshProcessor()


    fun deleteAll()
    val writableDatabase = database.openHelper.writableDatabase
    writableDatabase.execSQL("DELETE FROM `places`")
    refreshProcessor()






    Update



    I found out that using a ConnectedFlowable is not the way to go. It seems as if the Flowable returned by publish() is not connected to the BehaviorProcessor and either does not emit the most recent item observed or does not emit any item pushed to the BehaviorSubject after publish() is called.



    So I removed any calls to publish() and autoConnect() in my code and just return the BehaviorProcessor itself instead.







    share|improve this question























      up vote
      1
      down vote

      favorite









      up vote
      1
      down vote

      favorite











      I am attempting to use Room along with a full-text search (FTS) table. The issue is that Room doesn't recognize or support FTS tables. Though there is a work around for this; you just create the FTS table using execSQL() after the Room database is created. Though you are limited to using RawQuery on tables created this way (instead of Query, Insert, etc.). See my other question on implementing search with Room for more information.



      This is works well, but it limits the table to a subset of the functionality normally provided by Room. For instance; you cannot use reactive queries with the RxJava artifact, you can only get a (blocking) result from the database.




      So I am trying to mimic this functionality by adding it to my Dao.



      To do this, I added a BehaviorProcessor that publishes a new Flowable for each access method. The Processor is then updated each time a value is inserted or deleted into the table.



      The issue is that the RawQuery method is a blocking method, and throws an exception if accessed from the main thread. So I wrapped the read method in a Flowable that pushes the result to the Processor upon success. This works fairly well, but I am unsure if this is the best way to go about updating the Processor.



      Is there a more "RxJava" way of achieving this effect? Possibly some method I failed to recognize in the Processor that allows me to update it away from the main thread without creating a separate Flowable?



      @Dao
      abstract class PlaceDao(private val database: MapDatabase)

      private val processor = BehaviorProcessor.createDefault<List<Place>>(emptyList())

      @RawQuery
      protected abstract fun rawReadList(query: SupportSQLiteQuery): Place

      private fun refreshProcessor()
      Flowable.create<List<Place>>(
      emitter.onNext(rawReadList(SimpleSQLiteQuery("SELECT * FROM `places`")))
      , BackpressureStrategy.LATEST).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe( places ->
      processor.onNext(places)



      fun readAll(): Flowable<List<Place>>
      refreshProcessor()
      return processor.publish().autoConnect()


      fun findById(): Flowable<Place>
      refreshProcessor()
      return processor.map it.first it.id == id .publish().autoConnect()


      fun insert(varargs places: Place)
      val writableDatabase = database.openHelper.writableDatabase

      for (place in places)
      writableDatabase.execSQL(
      "INSERT OR REPLACE INTO `places` " +
      "(`id`, `title`, `subtitle`, `description`, " +
      "`latitude`, `longitude`, `type`, `parent`) " +
      "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
      arrayOf(place.id, place.title, place.subtitle, place.description,
      place.location.latitude, place.location.longitude,
      place.type, place.parent))
      )


      refreshProcessor()


      fun deleteAll()
      val writableDatabase = database.openHelper.writableDatabase
      writableDatabase.execSQL("DELETE FROM `places`")
      refreshProcessor()






      Update



      I found out that using a ConnectedFlowable is not the way to go. It seems as if the Flowable returned by publish() is not connected to the BehaviorProcessor and either does not emit the most recent item observed or does not emit any item pushed to the BehaviorSubject after publish() is called.



      So I removed any calls to publish() and autoConnect() in my code and just return the BehaviorProcessor itself instead.







      share|improve this question













      I am attempting to use Room along with a full-text search (FTS) table. The issue is that Room doesn't recognize or support FTS tables. Though there is a work around for this; you just create the FTS table using execSQL() after the Room database is created. Though you are limited to using RawQuery on tables created this way (instead of Query, Insert, etc.). See my other question on implementing search with Room for more information.



      This is works well, but it limits the table to a subset of the functionality normally provided by Room. For instance; you cannot use reactive queries with the RxJava artifact, you can only get a (blocking) result from the database.




      So I am trying to mimic this functionality by adding it to my Dao.



      To do this, I added a BehaviorProcessor that publishes a new Flowable for each access method. The Processor is then updated each time a value is inserted or deleted into the table.



      The issue is that the RawQuery method is a blocking method, and throws an exception if accessed from the main thread. So I wrapped the read method in a Flowable that pushes the result to the Processor upon success. This works fairly well, but I am unsure if this is the best way to go about updating the Processor.



      Is there a more "RxJava" way of achieving this effect? Possibly some method I failed to recognize in the Processor that allows me to update it away from the main thread without creating a separate Flowable?



      @Dao
      abstract class PlaceDao(private val database: MapDatabase)

      private val processor = BehaviorProcessor.createDefault<List<Place>>(emptyList())

      @RawQuery
      protected abstract fun rawReadList(query: SupportSQLiteQuery): Place

      private fun refreshProcessor()
      Flowable.create<List<Place>>(
      emitter.onNext(rawReadList(SimpleSQLiteQuery("SELECT * FROM `places`")))
      , BackpressureStrategy.LATEST).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe( places ->
      processor.onNext(places)



      fun readAll(): Flowable<List<Place>>
      refreshProcessor()
      return processor.publish().autoConnect()


      fun findById(): Flowable<Place>
      refreshProcessor()
      return processor.map it.first it.id == id .publish().autoConnect()


      fun insert(varargs places: Place)
      val writableDatabase = database.openHelper.writableDatabase

      for (place in places)
      writableDatabase.execSQL(
      "INSERT OR REPLACE INTO `places` " +
      "(`id`, `title`, `subtitle`, `description`, " +
      "`latitude`, `longitude`, `type`, `parent`) " +
      "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
      arrayOf(place.id, place.title, place.subtitle, place.description,
      place.location.latitude, place.location.longitude,
      place.type, place.parent))
      )


      refreshProcessor()


      fun deleteAll()
      val writableDatabase = database.openHelper.writableDatabase
      writableDatabase.execSQL("DELETE FROM `places`")
      refreshProcessor()






      Update



      I found out that using a ConnectedFlowable is not the way to go. It seems as if the Flowable returned by publish() is not connected to the BehaviorProcessor and either does not emit the most recent item observed or does not emit any item pushed to the BehaviorSubject after publish() is called.



      So I removed any calls to publish() and autoConnect() in my code and just return the BehaviorProcessor itself instead.









      share|improve this question












      share|improve this question




      share|improve this question








      edited Apr 19 at 19:59
























      asked Apr 17 at 19:32









      Bryan

      1065




      1065

























          active

          oldest

          votes











          Your Answer




          StackExchange.ifUsing("editor", function ()
          return StackExchange.using("mathjaxEditing", function ()
          StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix)
          StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
          );
          );
          , "mathjax-editing");

          StackExchange.ifUsing("editor", function ()
          StackExchange.using("externalEditor", function ()
          StackExchange.using("snippets", function ()
          StackExchange.snippets.init();
          );
          );
          , "code-snippets");

          StackExchange.ready(function()
          var channelOptions =
          tags: "".split(" "),
          id: "196"
          ;
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function()
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled)
          StackExchange.using("snippets", function()
          createEditor();
          );

          else
          createEditor();

          );

          function createEditor()
          StackExchange.prepareEditor(
          heartbeatType: 'answer',
          convertImagesToLinks: false,
          noModals: false,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: null,
          bindNavPrevention: true,
          postfix: "",
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          );



          );








           

          draft saved


          draft discarded


















          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f192322%2fpushing-a-value-to-a-processor-on-a-separate-thread%23new-answer', 'question_page');

          );

          Post as a guest



































          active

          oldest

          votes













          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes










           

          draft saved


          draft discarded


























           


          draft saved


          draft discarded














          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f192322%2fpushing-a-value-to-a-processor-on-a-separate-thread%23new-answer', 'question_page');

          );

          Post as a guest













































































          Popular posts from this blog

          Greedy Best First Search implementation in Rust

          Function to Return a JSON Like Objects Using VBA Collections and Arrays

          C++11 CLH Lock Implementation