Scala Code Generation
Warning: This API is subject to changes in future releases.

Note: To compile and run queries using the Scala backend requires the Scala compiler. Please refer to Installation for more details.

1. Compiling and running a query

DBToaster generates a JAR file for a query when using the -l scala and -c <file> switches:

$> cat examples/queries/simple/rst.sql CREATE STREAM R(A int, B int) FROM FILE 'examples/data/simple/r.dat' LINE DELIMITED csv; CREATE STREAM S(B int, C int) FROM FILE 'examples/data/simple/s.dat' LINE DELIMITED csv; CREATE STREAM T(C int, D int) FROM FILE 'examples/data/simple/t.dat' LINE DELIMITED csv; SELECT sum(A*D) AS AtimesD FROM R,S,T WHERE R.B=S.B AND S.C=T.C; $> bin/dbtoaster -c test.jar -l scala examples/queries/simple/rst.sql

The command above compiles the query to test.jar, which can be run as follows:

$> java -classpath "test.jar:lib/dbt_scala/*" ddbt.gen.Dbtoaster Java 1.7.0_45, Scala 2.10.3 Time: 0.008s (30/0) ATIMESD: 306

After processing all insertions and deletions, the final result is printed.

Note for Windows users: When running compiled Scala programs under Cygwin or Windows directly, one should use Windows-style classpath separators (i.e., semicolons). For instance:

$> java -cp ".\test.jar;.\lib\dbt_scala\akka-actor_2.10-2.2.3.jar;.\lib\dbt_scala\dbtoaster_2.10-2.1-lms.jar;.\lib\dbt_scala\scala-library-2.10.2.jar;.\lib\dbt_scala\config-1.0.2.jar" ddbt.gen.Dbtoaster Java 1.7.0_65, Scala 2.10.2 Time: 0.002s (30/0) ATIMESD: 306

2. Scala API Guide

In the previous example, we used the standard main function to test the query. However, to use the query in real applications, it has to be run from within an application.

The following listing shows a simple example application that communicates with the query class. The communication between the application and the query class is handled using akka.

package org.dbtoaster import ddbt.gen._ import ddbt.lib.Messages._ import akka.actor._ object ExampleApp { val DEFAULT_TIMEOUT = akka.util.Timeout(1L << 42) def main(args: Array[String]) { val system = ActorSystem("mySystem") val q = system.actorOf(Props[Dbtoaster], "Query") var result:(StreamStat,List[Any]) = null // Send events println("Insert a tuple into R.") q ! TupleEvent(TupleInsert, "R", List(5L, 2L)) println("Insert a tuple into S.") q ! TupleEvent(TupleInsert, "S", List(2L, 3L)) println("Insert a tuple into T.") q ! TupleEvent(TupleInsert, "T", List(3L, 4L)) result = scala.concurrent.Await.result(akka.pattern.ask(q, GetSnapshot(List(1)))(DEFAULT_TIMEOUT), DEFAULT_TIMEOUT.duration).asInstanceOf[(StreamStat,List[Any])] println("Result after this step: " + result._2(0)) println("Insert another tuple into T."); q ! TupleEvent(TupleInsert, "T", List(3L, 5L)) // Retrieve result result = scala.concurrent.Await.result(akka.pattern.ask(q, EndOfStream)(DEFAULT_TIMEOUT), DEFAULT_TIMEOUT.duration).asInstanceOf[(StreamStat,List[Any])] println("Final Result: " + result._2(0)) system.shutdown } }

This example first creates an ActorSystem and then launches the query actor. The events are sent to the query actor using TupleEvent messages with the following structure:

Argument Comment
op : TupleOp TupleInsert for insertion, TupleDelete for deletion.
stream : String Name of the stream as it appears in the SQL file.
data : List[Any] The values of the tuple being inserted into/deleted from the stream.

To retrieve the final result, an EndOfStream message is sent to the query actor. Alternatively the intermediate result of a query can be retrieved using a GetSnapshot message with the following structure:

Argument Comment
view : List[Int] List of maps that a snapshot is taken of.

Assuming that the example code has been saved as example.scala, it can be compiled with:

$> scalac -classpath "rst.jar:lib/dbt_scala/*" -d example.jar example.scala

It can then be launched with the following command:

$> java -classpath "rst.jar:lib/dbt_scala/*:example.jar" org.dbtoaster.ExampleApp Insert a tuple into R. Insert a tuple into S. Insert a tuple into T. Result after this step: 20 Insert another tuple into T. Final Result: 45

3. Generated Code Reference

The Scala code generator generates a single file containing an object, a base class containing trigger functions, and an actor class for the query. The base name is derived from the query name unless the -n flag is used.

The code generated for the previous example looks as follows:

package ddbt.gen import ddbt.lib._ ... // Query object used for standalone binaries object Rst { import Helper._ def execute(args:Array[String],f:List[Any]=>Unit) = ... def main(args:Array[String]) { execute(args,(res:List[Any])=>{ println("ATIMESD:\n"+M3Map.toStr(res(0))+"\n") }) } } // Base class class RstBase { import Rst._ import ddbt.lib.Functions._ // Maps/singletons that hold intermediate results var ATIMESD = 0L val ATIMESD_mT1 = M3Map.make[Long,Long](); ... // Triggers def onAddR(r_a:Long, r_b:Long) { ATIMESD += (ATIMESD_mR1.get(r_b) * r_a); ATIMESD_mT1_mR1.slice(0,r_b).foreach { (k1,v1) => val atimesd_mtt_c = k1._2; ATIMESD_mT1.add(atimesd_mtt_c,(v1 * r_a)); } ATIMESD_mS1.add(r_b,r_a); } def onDelR(r_a:Long, r_b:Long) { ... } ... def onDelT(t_c:Long, t_d:Long) { ... } def onSystemReady() { } } // Query actor class Rst extends RstBase with Actor { import Rst._ import ddbt.lib.Messages._ def receive = { case TupleEvent(TupleInsert,"R",List(v0:Long,v1:Long)) => { ... } onAddR(v0,v1) ... case StreamInit(timeout) => onSystemReady(); { ... } case EndOfStream | GetSnapshot(_) => { ... } sender ! (StreamStat(t1-t0,tN,tS),List(ATIMESD)) } }

3.1. The query object

The query object contains the code used by the standalone binary to execute the query. Its execute method reads from the input streams specified in the query file and sends them to the query actor. The main method calls execute and prints the result when all tuples have been processed.

3.2. The query base class

The actual query processor lives in the query base class. For every stream R, there is an insertion onAddR and a deletion trigger onDelR. These trigger methods are responsible for updating the intermediate result. The map and singleton data structures at the top of the base class hold the intermediate result.

The onSystemReady trigger is responsible of loading static information (CREATE TABLE statements in the query file) before the actual processing begins.

3.3. The query actor

The query actor class dispatches events like tuple insertions and deletions to the base class using the receive method.

The EndOfStream message is sent from the event source when it is exhausted. The query actor replies to this message with the current processing statistics (processing time, number of tuples processed, number of tuples skipped) and one or multiple query results.

The GetSnapshot message can be used by an application to access the intermediate result. The query actor replies to this message with the current processing statistics and the results that the message asks for.

The whole process is guarded by a timeout. If the timeout is reached, the actor will stop to process tuples.