Sparklink: Code Examples¶
Input/output¶
Reading data¶
This example shows how to read a dataset using Sparklink. The module supports files in JSON and Parquet format. Note that the method DataSparkLoader::open automatically detects the file format (the indicated path should include the extension). Finally, all the instances in the dataset are printed.
package eu.amidst.sparklink.examples.io;

import eu.amidst.sparklink.core.data.DataSpark;
import eu.amidst.sparklink.core.io.DataSparkLoader;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SQLContext;

/**
 * Created by rcabanas on 10/06/16.
 */
public class DataStreamLoaderExample {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("SLink!").setMaster("local");
        SparkContext sc = new SparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        //Path to dataset
        String path = "datasets/simulated/WI_samples.json";

        //Create an AMIDST object for managing the data
        DataSpark dataSpark = DataSparkLoader.open(sqlContext, path);

        //Print all the instances in the dataset
        dataSpark.collectDataStream()
                .forEach(dataInstance -> System.out.println(dataInstance));
    }
}
Writing data¶
Here we show how to save Spark data into a file. First, a random dataset is generated using the method DataSetGenerator::generate. Afterwards, the data is saved using the method DataSparkWriter::writeDataToFolder.
package eu.amidst.sparklink.examples.io;

import eu.amidst.sparklink.core.data.DataSpark;
import eu.amidst.sparklink.core.io.DataSparkWriter;
import eu.amidst.sparklink.core.util.DataSetGenerator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;

/**
 * Created by rcabanas on 30/09/16.
 */
public class DataStreamWriterExample {
    public static void main(String[] args) throws Exception {
        //Setting up Spark
        SparkConf conf = new SparkConf().setAppName("SparkLink!").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(jsc);

        //Generate random data
        int seed = 1234;
        int nInstances = 1000;
        int nDiscreteAtts = 3;
        int nContinuousAttributes = 2;

        DataSpark data = DataSetGenerator.generate(jsc, seed, nInstances, nDiscreteAtts, nContinuousAttributes);

        //Save it as a JSON and as a Parquet file
        DataSparkWriter.writeDataToFolder(data, "datasets/simulated/randomData.json", sqlContext);
        DataSparkWriter.writeDataToFolder(data, "datasets/simulated/randomData.parquet", sqlContext);
    }
}
Parameter learning¶
AMIDST provides parameter learning with Spark using the maximum likelihood algorithm. In the following example, we load a dataset in JSON format and use it to learn a simple naive Bayes model (more complex DAGs can also be learnt).
package eu.amidst.sparklink.examples.learning;

import eu.amidst.core.models.BayesianNetwork;
import eu.amidst.core.models.DAG;
import eu.amidst.core.utils.DAGGenerator;
import eu.amidst.sparklink.core.data.DataSpark;
import eu.amidst.sparklink.core.io.DataSparkLoader;
import eu.amidst.sparklink.core.learning.ParallelMaximumLikelihood;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SQLContext;

/**
 * Created by rcabanas on 10/06/16.
 */
public class MaximumLikelihoodLearningExample {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("SparkLink!").setMaster("local");
        SparkContext sc = new SparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        //Path to dataset
        String path = "datasets/simulated/WI_samples.json";

        //Create an AMIDST object for managing the data
        DataSpark dataSpark = DataSparkLoader.open(sqlContext, path);

        //Learning algorithm
        ParallelMaximumLikelihood parameterLearningAlgorithm = new ParallelMaximumLikelihood();

        //We fix the BN structure
        DAG dag = DAGGenerator.getNaiveBayesStructure(dataSpark.getAttributes(), "W");
        parameterLearningAlgorithm.setDAG(dag);

        //We set the batch size which will be employed to learn the model in parallel
        parameterLearningAlgorithm.setBatchSize(100);

        //We set the data which is going to be used for learning the parameters
        parameterLearningAlgorithm.setDataSpark(dataSpark);

        //We perform the learning
        parameterLearningAlgorithm.runLearning();

        //And we get the model
        BayesianNetwork bn = parameterLearningAlgorithm.getLearntBayesianNetwork();
        System.out.println(bn);
    }
}
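Once the parameters have been learnt, the resulting BayesianNetwork object can be persisted with the I/O classes from the AMIDST core module (the same module that provides BayesianNetworkLoader). The following is a minimal sketch, assuming the BayesianNetworkWriter::save and BayesianNetworkLoader::loadFromFile methods and a hypothetical output path; adapt the file name to your project layout.

```java
// Sketch (assumed API): persist the learnt model and reload it later.
import eu.amidst.core.io.BayesianNetworkLoader;
import eu.amidst.core.io.BayesianNetworkWriter;
import eu.amidst.core.models.BayesianNetwork;

public class SaveLearntModelExample {
    // bn is the model returned by getLearntBayesianNetwork()
    public static void saveAndReload(BayesianNetwork bn) throws Exception {
        // Write the model to disk (path is illustrative)
        BayesianNetworkWriter.save(bn, "networks/simulated/WI_model.bn");

        // Reload it in a later session
        BayesianNetwork loaded = BayesianNetworkLoader.loadFromFile("networks/simulated/WI_model.bn");
        System.out.println(loaded);
    }
}
```

Saving the model in this way decouples the (potentially expensive) distributed learning step from later inference, which can then run without a Spark context.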