I have been playing with Netflix's RxJava for several months now. Reactive programming has changed my entire approach to programming. It truly brings out the best functional programming has to offer.
However, I want to use reactive programming with SQLite. David Moten has written a great library that wraps JDBC with RxJava, but SQLite seems to have a problem with it. It does not like a query emitting a ResultSet in which each row is turned into an object that then drives another query further down the chain.
Say I have two tables:
CREATE TABLE TABLE_ONE (
    ID INTEGER PRIMARY KEY NOT NULL,
    VALUE INTEGER NOT NULL
);
CREATE TABLE TABLE_TWO (
    ID INTEGER NOT NULL PRIMARY KEY,
    FOREIGN_ID INTEGER NOT NULL REFERENCES TABLE_ONE (ID),
    VALUE INTEGER NOT NULL
);
I compose an Observable chain that performs INSERT parent / SELECT parent / INSERT child / SELECT child operations.
import com.github.davidmoten.rx.jdbc.ConnectionProviderFromUrl;
import com.github.davidmoten.rx.jdbc.Database;
import rx.Observable;
import java.sql.Connection;
public final class Test {
    public static void main(String[] args) {
        Connection con = new ConnectionProviderFromUrl("jdbc:sqlite:C:/Users/Thomas/test.db").get();
        Database db = Database.from(con);
        Observable<Integer> inputs = Observable.just(100,200,300);
        db.update("INSERT INTO TABLE_ONE (VALUE) VALUES (?)")
                .parameters(inputs)
                .returnGeneratedKeys()
                .getAs(Integer.class)
                .flatMap(k -> db.select("SELECT * FROM TABLE_ONE WHERE ID = ?")
                        .parameter(k)
                        .get(rs -> new Type1(rs.getInt("ID"), rs.getInt("VALUE")))
                ).flatMap(t1 -> db.update("INSERT INTO TABLE_TWO (FOREIGN_ID,VALUE) VALUES (?,?)")
                        .parameter(t1.id)
                        .parameter(t1.value)
                        .returnGeneratedKeys()
                        .getAs(Integer.class)
                ).flatMap(k -> db.select("SELECT * FROM TABLE_TWO WHERE ID = ?")
                        .parameter(k)
                        .get(rs -> new Type2(rs.getInt("ID"), rs.getInt("FOREIGN_ID"), rs.getInt("VALUE")))
                ).subscribe(System.out::println, Throwable::printStackTrace);
        db.close();
    }
    private static final class Type1 {
        private final int id;
        private final int value;
        private Type1(int id, int value) {
            this.id = id;
            this.value = value;
        }
    }
    private static final class Type2 {
        private final int id;
        private final int foreignId;
        private final int value;
        private Type2(int id, int foreignKey, int value) {
            this.id = id;
            this.foreignId = foreignKey;
            this.value = value;
        }
        @Override
        public String toString() {
            return "Type2{" +
                    "id=" + id +
                    ", foreignId=" + foreignId +
                    ", value=" + value +
                    '}';
        }
    }
}
More specifically, this is the process that happens for each of the three numbers (100, 200, 300):
1) INSERT a TABLE_ONE record and get its primary key ID
2) SELECT that TABLE_ONE record by ID
3) Turn it into a Type1 object
4) INSERT a TABLE_TWO record with Type1's id as the foreign key (and get its primary key ID)
5) SELECT that TABLE_TWO record by ID
6) Turn it into a Type2 object
The whole chain runs as a unit for each of the values 100, 200, and 300, so four updates/queries occur per value.
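To make the intended result concrete, here is a rough in-memory sketch of the six steps in plain Java. It involves no database and no RxJava: the two maps are hypothetical stand-ins for TABLE_ONE and TABLE_TWO, and it assumes both tables start empty so that generated keys begin at 1. It only illustrates what I expect the chain to print if all three values made it through.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the two tables (assumption: both start empty).
final class ExpectedChain {

    /** Runs the six steps for each input and returns the lines the chain should print. */
    static List<String> run(int... inputs) {
        Map<Integer, Integer> tableOne = new LinkedHashMap<>(); // ID -> VALUE
        Map<Integer, int[]> tableTwo = new LinkedHashMap<>();   // ID -> {FOREIGN_ID, VALUE}
        List<String> printed = new ArrayList<>();
        for (int input : inputs) {
            // 1) INSERT into TABLE_ONE and obtain the generated key
            int key1 = tableOne.size() + 1;
            tableOne.put(key1, input);
            // 2-3) SELECT the row back and map it to (id, value), like Type1
            int t1Id = key1;
            int t1Value = tableOne.get(key1);
            // 4) INSERT into TABLE_TWO using Type1's id as the foreign key
            int key2 = tableTwo.size() + 1;
            tableTwo.put(key2, new int[] {t1Id, t1Value});
            // 5-6) SELECT the row back and format it like Type2.toString()
            int[] row = tableTwo.get(key2);
            printed.add("Type2{id=" + key2 + ", foreignId=" + row[0] + ", value=" + row[1] + "}");
        }
        return printed;
    }

    public static void main(String[] args) {
        // Prints:
        // Type2{id=1, foreignId=1, value=100}
        // Type2{id=2, foreignId=2, value=200}
        // Type2{id=3, foreignId=3, value=300}
        run(100, 200, 300).forEach(System.out::println);
    }
}
```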
However, I get a SQLITE_INTERRUPT error:
java.sql.SQLException: [SQLITE_INTERRUPT] Operation terminated by sqlite3_interrupt() (interrupted)
at org.sqlite.core.DB.newSQLException(DB.java:890)
at org.sqlite.core.DB.newSQLException(DB.java:901)
at org.sqlite.core.DB.throwex(DB.java:868)
at org.sqlite.jdbc3.JDBC3ResultSet.next(JDBC3ResultSet.java:93)
at com.github.davidmoten.rx.jdbc.QuerySelectProducer.processRow(QuerySelectProducer.java:112)
at com.github.davidmoten.rx.jdbc.QuerySelectProducer.requestSome(QuerySelectProducer.java:75)
However, the first item makes it through the whole chain: it is inserted into both tables and printed, even though two more values (200 and 300) were still to come.
Type2{id=1, foreignId=1, value=100}
My theory is that as each emitted item (O) is pushed from one query to the next, it interrupts and cancels the iteration of the previous query (X):
QUERY OP 4----------------------O-
QUERY OP 3----------------O-----X-
QUERY OP 2------------O---X-------
QUERY OP 1-------O----X-----------
So the first emitted item gets through, but it leaves a trail of interrupted queries behind it. Because the upstream iteration has been killed, there is nothing left to deliver the next item via onNext().
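The theory can be sketched as a toy model in plain Java. Everything here is hypothetical: ToyConnection is not SQLite or rxjava-jdbc, it is just a connection on which opening a new statement interrupts any cursor that is still open. Under that assumption, running a downstream query while the upstream cursor is mid-iteration reproduces the symptom (one row, then an error), while draining the upstream cursor into a list first lets every row through.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Toy model only: these classes are hypothetical and do not reproduce
// the internals of SQLite, its JDBC driver, or rxjava-jdbc.
final class InterruptModel {

    /** A connection on which starting a new statement interrupts any open cursor. */
    static final class ToyConnection {
        private int generation = 0;

        Iterator<Integer> query(List<Integer> rows) {
            final int myGeneration = ++generation; // opening a statement bumps the generation
            final Iterator<Integer> it = rows.iterator();
            return new Iterator<Integer>() {
                @Override public boolean hasNext() { check(); return it.hasNext(); }
                @Override public Integer next() { check(); return it.next(); }
                private void check() {
                    // A newer statement exists, so this cursor counts as interrupted.
                    if (generation != myGeneration) {
                        throw new IllegalStateException("interrupted");
                    }
                }
            };
        }
    }

    /** Opens the downstream query while the upstream cursor is still open. */
    static List<String> runLazy(List<Integer> inputs, List<String> errors) {
        ToyConnection con = new ToyConnection();
        List<String> out = new ArrayList<>();
        try {
            Iterator<Integer> outer = con.query(inputs);
            while (outer.hasNext()) {
                Integer v = outer.next();
                Iterator<Integer> inner = con.query(Collections.singletonList(v));
                while (inner.hasNext()) out.add("row " + inner.next());
            }
        } catch (IllegalStateException e) {
            errors.add(e.getMessage());
        }
        return out;
    }

    /** Drains the upstream cursor completely before running any downstream query. */
    static List<String> runBuffered(List<Integer> inputs) {
        ToyConnection con = new ToyConnection();
        List<Integer> buffered = new ArrayList<>();
        Iterator<Integer> outer = con.query(inputs);
        while (outer.hasNext()) buffered.add(outer.next());
        List<String> out = new ArrayList<>();
        for (Integer v : buffered) {
            Iterator<Integer> inner = con.query(Collections.singletonList(v));
            while (inner.hasNext()) out.add("row " + inner.next());
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> inputs = new ArrayList<>();
        Collections.addAll(inputs, 100, 200, 300);
        List<String> errors = new ArrayList<>();
        System.out.println(runLazy(inputs, errors) + " " + errors); // [row 100] [interrupted]
        System.out.println(runBuffered(inputs));                    // [row 100, row 200, row 300]
    }
}
```

If the real driver behaves like this model, the RxJava counterpart of runBuffered would be to collect each stage's emissions (for example with toList()) before the next flatMap runs its query. I have not verified that this is what is actually going on.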
SQLite and RxJava folks, can either of you think of a way to fix this? Is there a SQLite setting I can configure to stop this interruption? Or is there an RxJava composition trick that can be done to prevent the interruption?
I also created a simple Git repo with the test above. http://ift.tt/1OUPqE6