DBToaster in Java Programs

Note: Compiling and running queries with the Scala backend requires the Scala compiler. Please refer to Installation for more details.

Since Scala is compatible with Java, one can write a Java application that interacts with generated Scala code and uses its routines for sending events to the query engine and retrieving the results back to the application. We demonstrate this with the following example.

We use DBToaster to generate Scala code for a given SQL query, in this example rst.sql that ships with the release, and then compile the code into a JAR file.

$> cat examples/queries/simple/rst.sql
CREATE STREAM R(A int, B int)
  FROM FILE 'examples/data/simple/r.dat' LINE DELIMITED csv;

CREATE STREAM S(B int, C int)
  FROM FILE 'examples/data/simple/s.dat' LINE DELIMITED csv;

CREATE STREAM T(C int, D int)
  FROM FILE 'examples/data/simple/t.dat' LINE DELIMITED csv;

SELECT sum(A*D) AS AtimesD FROM R,S,T WHERE R.B=S.B AND S.C=T.C;

$> bin/dbtoaster -c rst.jar -l scala examples/queries/simple/rst.sql

The following Java code communicates with the compiled query engine produced by DBToaster. Events are submitted and results are retrieved using Akka actors. This example is equivalent to the Scala program presented in Scala Code Generation.

import ddbt.gen.*;
import ddbt.lib.Messages.*;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Inbox;
import scala.Tuple2;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.$colon$colon;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit;

class ExampleApp {
  public static final byte TUPLE_DELETE = 0x00;
  public static final byte TUPLE_INSERT = 0x01;

  private static final FiniteDuration DEFAULT_TIMEOUT =
      Duration.create(1L << 23, TimeUnit.SECONDS);
  private static final List EMPTY_LIST = List$.MODULE$.empty();

  public static List<Object> tuple(Object ... ts) {
    List<Object> result = EMPTY_LIST;
    for(int i = ts.length; i > 0; i--) {
      result = new $colon$colon<Object>(ts[i - 1], result);
    }
    return result;
  }

  public static void main(String[] args) {
    ActorSystem system = ActorSystem.create("mySystem");

    final Inbox inbox = Inbox.create(system);
    final ActorRef q = system.actorOf(Props.create(Dbtoaster.class), "Query");

    // Send events
    Tuple2 result;
    System.out.println("Insert a tuple into R.");
    q.tell(new TupleEvent(TUPLE_INSERT, "R", tuple(5L, 2L)), null);

    System.out.println("Insert a tuple into S.");
    q.tell(new TupleEvent(TUPLE_INSERT, "S", tuple(2L, 3L)), null);

    System.out.println("Insert a tuple into T.");
    q.tell(new TupleEvent(TUPLE_INSERT, "T", tuple(3L, 4L)), null);

    inbox.send(q, new GetSnapshot(new $colon$colon(1, EMPTY_LIST)));
    result = (Tuple2) (inbox.receive(DEFAULT_TIMEOUT));
    System.out.println("Result after this step: " + result._2().toString());

    System.out.println("Insert another tuple into T.");
    q.tell(new TupleEvent(TUPLE_INSERT, "T", tuple(3L, 5L)), null);

    // Retrieve result
    inbox.send(q, EndOfStream$.MODULE$);
    result = (Tuple2) (inbox.receive(DEFAULT_TIMEOUT));
    System.out.println("Final Result: " + result._2().toString());

    system.shutdown();
  }
}

This example first creates an ActorSystem object and then launches the query actor. The events are sent to the query actor using TupleEvent objects whose constructor has the following parameters:

Argument               Comment
op : byte              0x01 for insertion, 0x00 for deletion.
stream : String        Name of the stream as it appears in the SQL file.
data : List<Object>    The values of the tuple being inserted into/deleted from the stream.
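The data argument is an immutable Scala list, which the tuple helper in the listing above builds by prepending elements from last to first so that the original order is preserved. The following is a minimal sketch of that construction in plain Java. The Cons class and its show method are hypothetical stand-ins for Scala's list types and are not part of DBToaster or the Scala library.

```java
// Hypothetical minimal immutable cons list, standing in for
// scala.collection.immutable.List. Not part of DBToaster or Scala.
final class Cons {
    final Object head;
    final Cons tail; // null marks the empty list in this sketch

    Cons(Object head, Cons tail) {
        this.head = head;
        this.tail = tail;
    }

    // Same loop shape as tuple() in the listing: walk the arguments
    // backwards and prepend each one, so the first argument ends up first.
    static Cons tuple(Object... ts) {
        Cons result = null;
        for (int i = ts.length; i > 0; i--) {
            result = new Cons(ts[i - 1], result);
        }
        return result;
    }

    // Renders the list in a Scala-like "List(a, b)" form.
    static String show(Cons list) {
        StringBuilder sb = new StringBuilder("List(");
        for (Cons c = list; c != null; c = c.tail) {
            sb.append(c.head);
            if (c.tail != null) {
                sb.append(", ");
            }
        }
        return sb.append(")").toString();
    }

    public static void main(String[] args) {
        System.out.println(show(tuple(5L, 2L))); // prints List(5, 2)
    }
}
```

Prepending is the natural way to build such a list because a cons cell only stores a head and a reference to the rest of the list. Iterating forwards would produce the values in reverse order.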

To retrieve the final result, an EndOfStream message is sent to the query actor. Alternatively, an intermediate result of the query can be retrieved by sending a GetSnapshot message with the following structure:

Argument                Comment
view : List<Integer>    List of the maps to take a snapshot of.

Note: Currently, the DBToaster library does not offer a specialized interface for Java applications. We plan to resolve this issue in the future once the Scala API becomes stable.

If this code has been saved to ExampleApp.java, the program can be compiled as follows:

$> javac -classpath "rst.jar:lib/dbt_scala/*" ExampleApp.java
$> jar cvf exampleapp.jar ExampleApp.class

The resulting exampleapp.jar can be launched using the following command:

$> java -classpath "rst.jar:lib/dbt_scala/*:exampleapp.jar" ExampleApp
Insert a tuple into R.
Insert a tuple into S.
Insert a tuple into T.
Result after this step: List(20)
Insert another tuple into T.
Final Result: List(45)
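To see where the values 20 and 45 come from, the query can be re-evaluated by hand. The following is a minimal sketch in plain Java that stores the three relations in lists and recomputes sum(A*D) over the join from scratch after each event. It is independent of DBToaster and does not reflect how the generated engine maintains its result. The NaiveRst class, its method names, and the assumption that a deletion removes a single matching copy of a tuple are all illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical naive re-evaluation of rst.sql. Not part of DBToaster.
class NaiveRst {
    static final byte TUPLE_DELETE = 0x00;
    static final byte TUPLE_INSERT = 0x01;

    private final List<long[]> r = new ArrayList<>(); // rows (A, B)
    private final List<long[]> s = new ArrayList<>(); // rows (B, C)
    private final List<long[]> t = new ArrayList<>(); // rows (C, D)

    private List<long[]> relation(String stream) {
        switch (stream) {
            case "R": return r;
            case "S": return s;
            case "T": return t;
            default: throw new IllegalArgumentException("Unknown stream: " + stream);
        }
    }

    // Applies one insert or delete event to the named stream.
    void onEvent(byte op, String stream, long x, long y) {
        List<long[]> rel = relation(stream);
        if (op == TUPLE_INSERT) {
            rel.add(new long[] {x, y});
            return;
        }
        // Assumption: a delete removes one matching copy, if any.
        for (int i = 0; i < rel.size(); i++) {
            if (rel.get(i)[0] == x && rel.get(i)[1] == y) {
                rel.remove(i);
                break;
            }
        }
    }

    // SELECT sum(A*D) FROM R,S,T WHERE R.B=S.B AND S.C=T.C
    long result() {
        long sum = 0;
        for (long[] a : r) {
            for (long[] b : s) {
                if (a[1] != b[0]) continue;
                for (long[] c : t) {
                    if (b[1] == c[0]) sum += a[0] * c[1];
                }
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        NaiveRst q = new NaiveRst();
        q.onEvent(TUPLE_INSERT, "R", 5L, 2L);
        q.onEvent(TUPLE_INSERT, "S", 2L, 3L);
        q.onEvent(TUPLE_INSERT, "T", 3L, 4L);
        System.out.println(q.result()); // prints 20 (5 * 4)
        q.onEvent(TUPLE_INSERT, "T", 3L, 5L);
        System.out.println(q.result()); // prints 45 (5 * 4 + 5 * 5)
    }
}
```

After the first three inserts, the only joined row has A = 5 and D = 4, giving 20. The second T tuple joins with the same R and S rows and contributes 5 * 5 = 25, for a total of 45.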