jeudi 22 octobre 2015

SQLite does not like reactive programming?

I have been playing with Netflix's RxJava for several months now. Reactive programming has changed my entire approach to programming. It truly brings out the best functional programming has to offer.

However, I want to use reactive programming with SQLite. David Moten has written a great library to integrate RxJava around JDBC. But SQLite seems to have an issue. It does not like one query pushing a ResultSet where each record iteration is transformed into an object and drives another query down the chain.

Say I have two tables

CREATE TABLE TABLE_ONE (
    ID    INTEGER PRIMARY KEY
                  NOT NULL,
    VALUE INTEGER NOT NULL
);

CREATE TABLE TABLE_TWO (
    ID         INTEGER NOT NULL
                       PRIMARY KEY,
    FOREIGN_ID INTEGER NOT NULL
                       REFERENCES TABLE_ONE ([KEY]),
    VALUE      INTEGER NOT NULL
); 

I create a monad that does some INSERT parent/SELECT parent/INSERT child/SELECT child kind of operations.

import com.github.davidmoten.rx.jdbc.ConnectionProviderFromUrl;
import com.github.davidmoten.rx.jdbc.Database;
import rx.Observable;

import java.sql.Connection;

public final class Test {

    public static void main(String[] args) {

        Connection con = new ConnectionProviderFromUrl("jdbc:sqlite:C:/Users/Thomas/test.db").get();
        Database db = Database.from(con);

        Observable<Integer> inputs = Observable.just(100,200,300);

        db.update("INSERT INTO TABLE_ONE (VALUE) VALUES (?)")
                .parameters(inputs)
                .returnGeneratedKeys()
                .getAs(Integer.class)
                .flatMap(k -> db.select("SELECT * FROM TABLE_ONE WHERE ID = ?")
                                .parameter(k)
                                .get(rs -> new Type1(rs.getInt("ID"), rs.getInt("VALUE")))
                ).flatMap(t1 -> db.update("INSERT INTO TABLE_TWO (FOREIGN_ID,VALUE) VALUES (?,?)")
                                .parameter(t1.id)
                                .parameter(t1.value)
                                .returnGeneratedKeys()
                                .getAs(Integer.class)
                ).flatMap(k -> db.select("SELECT * FROM TABLE_TWO WHERE ID = ?")
                                .parameter(k)
                                .get(rs -> new Type2(rs.getInt("ID"), rs.getInt("FOREIGN_ID"), rs.getInt("VALUE")))
                ).subscribe(System.out::println, Throwable::printStackTrace);

        db.close();
    }
    private static final class Type1 {
        private final int id;
        private final int value;
        private Type1(int id, int value) {
            this.id = id;
            this.value = value;
        }
    }
    private static final class Type2 {
        private final int id;
        private final int foreignId;
        private final int value;
        private Type2(int id, int foreignKey, int value) {
            this.id = id;
            this.foreignId = foreignKey;
            this.value = value;
        }
        @Override
        public String toString() {
            return "Type2{" +
                    "id=" + id +
                    ", foreignId=" + foreignId +
                    ", value=" + value +
                    '}';
        }
    }
}

More specifically, this is the process that happens for all three numbers (100, 200, 300)...

1) INSERT a TABLE_ONE record, get its primary key ID
2) SELECT that TABLE_ONE record with ID
3) Turn it into a Type1 Object
4) INSERT a TABLE_TWO record with Type1's `id for the foreign key (and get primary key ID)
5) SELECT TABLE_TWO record with ID
6) Turn it into a Type2 Object

This all happens atomically for each 100, 200, 300 values and 4 updates/queries occur for each of them in this chain.

However, I get a SQLITE_INTERRUPT error

java.sql.SQLException: [SQLITE_INTERRUPT]  Operation terminated by sqlite3_interrupt() (interrupted)
    at org.sqlite.core.DB.newSQLException(DB.java:890)
    at org.sqlite.core.DB.newSQLException(DB.java:901)
    at org.sqlite.core.DB.throwex(DB.java:868)
    at org.sqlite.jdbc3.JDBC3ResultSet.next(JDBC3ResultSet.java:93)
    at com.github.davidmoten.rx.jdbc.QuerySelectProducer.processRow(QuerySelectProducer.java:112)
    at com.github.davidmoten.rx.jdbc.QuerySelectProducer.requestSome(QuerySelectProducer.java:75)

But the first item pushes through the chain successfully, inserts into both tables, and prints although I had two more values (200 and 300) to go.

Type2{id=1, foreignId=1, value=100}

My theory is that as each emitted item O is pushed from one query to the next, it interrupts and cancels the iteration of the previous query as shown by X

QUERY OP 4----------------------O-
QUERY OP 3----------------O-----X-
QUERY OP 2------------O---X-------
QUERY OP 1-------O----X-----------

Therefore the first emitted item goes through but it leaves a trail of interrupted queries behind it to drive the current one, leaving no way for the next item to be onNext()'d since the query iteration was killed.

SQLite and RxJava folks, can either of you think of a way to fix this? Is there a SQLite setting I can configure to stop this interruption? Or is there an RxJava composition trick that can be done to prevent the interruption?

I also created a simple Git repo with the test above. http://ift.tt/1OUPqE6

Aucun commentaire:

Enregistrer un commentaire