Flinklink: Code Examples¶
Input/output¶
Reading data¶
In this example we show how can we read a dataset using Flink. Note that the process is the same regardless being a single or a distributed file.
package eu.amidst.flinklink.examples.io;
import eu.amidst.Main;
import eu.amidst.core.datastream.DataInstance;
import eu.amidst.flinklink.core.data.DataFlink;
import eu.amidst.flinklink.core.io.DataFlinkLoader;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
/**
* Created by rcabanas on 10/06/16.
*/
public class DataStreamLoaderExample {
public static void main(String[] args) throws Exception {
boolean hadoop_cluster = false;
if (args.length>1){
hadoop_cluster = Boolean.parseBoolean(args[0]);
}
final ExecutionEnvironment env;
//Set-up Flink session.
if(hadoop_cluster){
env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
}else{
Configuration conf = new Configuration();
conf.setInteger("taskmanager.network.numberOfBuffers", 12000);
conf.setInteger("taskmanager.numberOfTaskSlots",Main.PARALLELISM);
env = ExecutionEnvironment.createLocalEnvironment(conf);
env.setParallelism(Main.PARALLELISM);
env.getConfig().disableSysoutLogging();
}
//Paths to datasets
String simpleFile = "datasets/simulated/syntheticData.arff";
String distriFile = "datasets/simulated/distributed.arff";
//Load the data
DataFlink<DataInstance> dataSimple = DataFlinkLoader.open(env, simpleFile, false);
DataFlink<DataInstance> dataDistri = DataFlinkLoader.open(env,distriFile, false);
//Print the number of data samples
System.out.println(dataSimple.getDataSet().count());
System.out.println(dataDistri.getDataSet().count());
}
}
Writing data¶
Below we generate a random Flink dataset with 1000 instances, 2 discrete variables and 3 continuous ones. The seed used is 1234. Eventually, we save it as a distributed dataset (format ARFF folder).
package eu.amidst.flinklink.examples.io;
import eu.amidst.Main;
import eu.amidst.core.datastream.DataInstance;
import eu.amidst.flinklink.core.data.DataFlink;
import eu.amidst.flinklink.core.io.DataFlinkWriter;
import eu.amidst.flinklink.core.utils.DataSetGenerator;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
/**
* Created by rcabanas on 09/06/16.
*/
public class DataStreamWriterExample {
public static void main(String[] args) throws Exception {
boolean hadoop_cluster = false;
if (args.length>1){
hadoop_cluster = Boolean.parseBoolean(args[0]);
}
final ExecutionEnvironment env;
//Set-up Flink session.
if(hadoop_cluster){
env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
}else{
Configuration conf = new Configuration();
conf.setInteger("taskmanager.network.numberOfBuffers", 12000);
conf.setInteger("taskmanager.numberOfTaskSlots",Main.PARALLELISM);
env = ExecutionEnvironment.createLocalEnvironment(conf);
env.setParallelism(Main.PARALLELISM);
env.getConfig().disableSysoutLogging();
}
//generate a random dataset
DataFlink<DataInstance> dataFlink = new DataSetGenerator().generate(env,1234,1000,2,3);
//Saves it as a distributed arff file
DataFlinkWriter.writeDataToARFFFolder(dataFlink, "datasets/simulated/distributed.arff");
}
}
//TODO: Write to standard arff --> convert to datastream??
Parametric learning¶
Here give examples of the provided algorithms by AMiDST for learning the probability distributions from a Flink data set. For shake of simplicity, we will consider the Naive Bayes DAG structure. Note that the code is almost the same of each of the algoritms, they only differ on the constructor used (e.g. new ParallelMaximumLikelihood(), new dVMP(), etc.)
Parallel Maximum Likelihood¶
package eu.amidst.flinklink.examples.learning;
import eu.amidst.Main;
import eu.amidst.core.datastream.DataInstance;
import eu.amidst.core.models.BayesianNetwork;
import eu.amidst.core.models.DAG;
import eu.amidst.core.utils.DAGGenerator;
import eu.amidst.flinklink.core.data.DataFlink;
import eu.amidst.flinklink.core.learning.parametric.ParallelMaximumLikelihood;
import eu.amidst.flinklink.core.learning.parametric.ParameterLearningAlgorithm;
import eu.amidst.flinklink.core.utils.DataSetGenerator;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
/**
* Created by rcabanas on 14/06/16.
*/
public class ParallelMLExample {
public static void main(String[] args) throws Exception {
boolean hadoop_cluster = false;
if (args.length>1){
hadoop_cluster = Boolean.parseBoolean(args[0]);
}
final ExecutionEnvironment env;
//Set-up Flink session.
if(hadoop_cluster){
env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
}else{
Configuration conf = new Configuration();
conf.setInteger("taskmanager.network.numberOfBuffers", 12000);
conf.setInteger("taskmanager.numberOfTaskSlots",Main.PARALLELISM);
env = ExecutionEnvironment.createLocalEnvironment(conf);
env.setParallelism(Main.PARALLELISM);
env.getConfig().disableSysoutLogging();
}
//generate a random dataset
DataFlink<DataInstance> dataFlink = new DataSetGenerator().generate(env,1234,1000,5,0);
//Creates a DAG with the NaiveBayes structure for the random dataset
DAG dag = DAGGenerator.getNaiveBayesStructure(dataFlink.getAttributes(), "DiscreteVar4");
System.out.println(dag.toString());
//Create the Learner object
ParameterLearningAlgorithm learningAlgorithmFlink =
new ParallelMaximumLikelihood();
//Learning parameters
learningAlgorithmFlink.setBatchSize(10);
learningAlgorithmFlink.setDAG(dag);
//Initialize the learning process
learningAlgorithmFlink.initLearning();
//Learn from the flink data
learningAlgorithmFlink.updateModel(dataFlink);
//Print the learnt BN
BayesianNetwork bn = learningAlgorithmFlink.getLearntBayesianNetwork();
System.out.println(bn);
}
}
Distributed Variational Message Passing¶
package eu.amidst.flinklink.examples.learning;
import eu.amidst.Main;
import eu.amidst.core.datastream.DataInstance;
import eu.amidst.core.models.BayesianNetwork;
import eu.amidst.core.models.DAG;
import eu.amidst.core.utils.DAGGenerator;
import eu.amidst.flinklink.core.data.DataFlink;
import eu.amidst.flinklink.core.learning.parametric.ParameterLearningAlgorithm;
import eu.amidst.flinklink.core.learning.parametric.dVMP;
import eu.amidst.flinklink.core.utils.DataSetGenerator;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
/**
* Created by rcabanas on 14/06/16.
*/
public class dVMPExample {
public static void main(String[] args) throws Exception {
boolean hadoop_cluster = false;
if (args.length>1){
hadoop_cluster = Boolean.parseBoolean(args[0]);
}
final ExecutionEnvironment env;
//Set-up Flink session.
if(hadoop_cluster){
env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
}else{
Configuration conf = new Configuration();
conf.setInteger("taskmanager.network.numberOfBuffers", 12000);
conf.setInteger("taskmanager.numberOfTaskSlots",Main.PARALLELISM);
env = ExecutionEnvironment.createLocalEnvironment(conf);
env.setParallelism(Main.PARALLELISM);
env.getConfig().disableSysoutLogging();
}
//generate a random dataset
DataFlink<DataInstance> dataFlink = new DataSetGenerator().generate(env,1234,1000,5,0);
//Creates a DAG with the NaiveBayes structure for the random dataset
DAG dag = DAGGenerator.getNaiveBayesStructure(dataFlink.getAttributes(), "DiscreteVar4");
System.out.println(dag.toString());
//Create the Learner object
ParameterLearningAlgorithm learningAlgorithmFlink =
new dVMP();
//Learning parameters
learningAlgorithmFlink.setBatchSize(10);
learningAlgorithmFlink.setDAG(dag);
//Initialize the learning process
learningAlgorithmFlink.initLearning();
//Learn from the flink data
learningAlgorithmFlink.updateModel(dataFlink);
//Print the learnt BN
BayesianNetwork bn = learningAlgorithmFlink.getLearntBayesianNetwork();
System.out.println(bn);
}
}
Distributed VI¶
package eu.amidst.flinklink.examples.learning;
import eu.amidst.Main;
import eu.amidst.core.datastream.DataInstance;
import eu.amidst.core.models.BayesianNetwork;
import eu.amidst.core.models.DAG;
import eu.amidst.core.utils.DAGGenerator;
import eu.amidst.flinklink.core.data.DataFlink;
import eu.amidst.flinklink.core.learning.parametric.DistributedVI;
import eu.amidst.flinklink.core.learning.parametric.ParameterLearningAlgorithm;
import eu.amidst.flinklink.core.utils.DataSetGenerator;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
/**
* Created by rcabanas on 14/06/16.
*/
public class DistributedVIExample {
public static void main(String[] args) throws Exception {
boolean hadoop_cluster = false;
if (args.length>1){
hadoop_cluster = Boolean.parseBoolean(args[0]);
}
final ExecutionEnvironment env;
//Set-up Flink session.
if(hadoop_cluster){
env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
}else{
Configuration conf = new Configuration();
conf.setInteger("taskmanager.network.numberOfBuffers", 12000);
conf.setInteger("taskmanager.numberOfTaskSlots",Main.PARALLELISM);
env = ExecutionEnvironment.createLocalEnvironment(conf);
env.setParallelism(Main.PARALLELISM);
env.getConfig().disableSysoutLogging();
}
//generate a random dataset
DataFlink<DataInstance> dataFlink = new DataSetGenerator().generate(env,1234,1000,5,0);
//Creates a DAG with the NaiveBayes structure for the random dataset
DAG dag = DAGGenerator.getNaiveBayesStructure(dataFlink.getAttributes(), "DiscreteVar4");
System.out.println(dag.toString());
//Create the Learner object
ParameterLearningAlgorithm learningAlgorithmFlink =
new DistributedVI();
//Learning parameters
learningAlgorithmFlink.setBatchSize(10);
learningAlgorithmFlink.setDAG(dag);
//Initialize the learning process
learningAlgorithmFlink.initLearning();
//Learn from the flink data
learningAlgorithmFlink.updateModel(dataFlink);
//Print the learnt BN
BayesianNetwork bn = learningAlgorithmFlink.getLearntBayesianNetwork();
System.out.println(bn);
}
}
Stochastic VI¶
An example of the learning algorithm Stochastic VI is given below. Note that two specific parameters must be set, namely the learning factor and the data size.
package eu.amidst.flinklink.examples.learning;
import eu.amidst.Main;
import eu.amidst.core.datastream.DataInstance;
import eu.amidst.core.models.BayesianNetwork;
import eu.amidst.core.models.DAG;
import eu.amidst.core.utils.DAGGenerator;
import eu.amidst.flinklink.core.data.DataFlink;
import eu.amidst.flinklink.core.learning.parametric.ParameterLearningAlgorithm;
import eu.amidst.flinklink.core.learning.parametric.StochasticVI;
import eu.amidst.flinklink.core.utils.DataSetGenerator;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
/**
* Created by rcabanas on 14/06/16.
*/
public class StochasticVIExample {
public static void main(String[] args) throws Exception {
boolean hadoop_cluster = false;
if (args.length>1){
hadoop_cluster = Boolean.parseBoolean(args[0]);
}
final ExecutionEnvironment env;
//Set-up Flink session.
if(hadoop_cluster){
env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
}else{
Configuration conf = new Configuration();
conf.setInteger("taskmanager.network.numberOfBuffers", 12000);
conf.setInteger("taskmanager.numberOfTaskSlots",Main.PARALLELISM);
env = ExecutionEnvironment.createLocalEnvironment(conf);
env.setParallelism(Main.PARALLELISM);
env.getConfig().disableSysoutLogging();
}
//generate a random dataset
DataFlink<DataInstance> dataFlink = new DataSetGenerator().generate(env,1234,1000,5,0);
//Creates a DAG with the NaiveBayes structure for the random dataset
DAG dag = DAGGenerator.getNaiveBayesStructure(dataFlink.getAttributes(), "DiscreteVar4");
System.out.println(dag.toString());
//Create the Learner object
ParameterLearningAlgorithm learningAlgorithmFlink =
new StochasticVI();
//Learning parameters
learningAlgorithmFlink.setBatchSize(10);
learningAlgorithmFlink.setDAG(dag);
//Initialize the learning process
learningAlgorithmFlink.initLearning();
//Learn from the flink data
learningAlgorithmFlink.updateModel(dataFlink);
//Specific parameters for the algorithm
((StochasticVI)learningAlgorithmFlink).setLearningFactor(0.7);
((StochasticVI)learningAlgorithmFlink).setDataSetSize((int) dataFlink.getDataSet().count());
//Print the learnt BN
BayesianNetwork bn = learningAlgorithmFlink.getLearntBayesianNetwork();
System.out.println(bn);
}
}
Extensions and applications¶
Latent variable models with Flink¶
The module latent-variable-models contains a large set of classes that allow to easily learn some of the standard models with latent variables. These models can be learnt from not only from local datasets (e.g. a single ARFF file) but also from distributed ones (e.g. ARFF folder). These last ones are managed using Flink. In code example shown below the model Factor Analysis is learnt from a distributed dataset.
package eu.amidst.flinklink.examples.extensions;
import eu.amidst.Main;
import eu.amidst.core.datastream.DataInstance;
import eu.amidst.core.models.BayesianNetwork;
import eu.amidst.flinklink.core.data.DataFlink;
import eu.amidst.flinklink.core.io.DataFlinkLoader;
import eu.amidst.latentvariablemodels.staticmodels.FactorAnalysis;
import eu.amidst.latentvariablemodels.staticmodels.Model;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import java.io.FileNotFoundException;
/**
* Created by rcabanas on 14/06/16.
*/
public class LatentModelsFlink {
public static void main(String[] args) throws FileNotFoundException {
boolean hadoop_cluster = false;
if (args.length>1){
hadoop_cluster = Boolean.parseBoolean(args[0]);
}
final ExecutionEnvironment env;
//Set-up Flink session.
if(hadoop_cluster){
env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
}else{
Configuration conf = new Configuration();
conf.setInteger("taskmanager.network.numberOfBuffers", 12000);
conf.setInteger("taskmanager.numberOfTaskSlots",Main.PARALLELISM);
env = ExecutionEnvironment.createLocalEnvironment(conf);
env.setParallelism(Main.PARALLELISM);
env.getConfig().disableSysoutLogging();
}
//Load the datastream
String filename = "datasets/simulated/exampleDS_d0_c5.arff";
DataFlink<DataInstance> data = DataFlinkLoader.loadDataFromFile(env, filename, false);
//Learn the model
Model model = new FactorAnalysis(data.getAttributes());
((FactorAnalysis)model).setNumberOfLatentVariables(3);
model.updateModel(data);
BayesianNetwork bn = model.getModel();
System.out.println(bn);
}
}
Concept drift detection¶
A salient aspect of streaming data is that the domain being modeled is often non-stationary. That is, the distribution governing the data changes over time. This situation is known as concept drift and if not carefully taken into account, the result can be a failure to capture and interpret intrinsic properties of the data during data exploration. The AMIDST toolbox can be used for detecting this situation as shown in the example below.
/*
*
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
* See the NOTICE file distributed with this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
*
*/
package eu.amidst.flinklink.examples.reviewMeeting2015;
import eu.amidst.Main;
import eu.amidst.core.datastream.DataInstance;
import eu.amidst.flinklink.core.conceptdrift.IDAConceptDriftDetector;
import eu.amidst.flinklink.core.data.DataFlink;
import eu.amidst.flinklink.core.io.DataFlinkLoader;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
/**
* Created by ana@cs.aau.dk on 18/01/16.
*/
public class ConceptDriftDetector {
//public int NSETS = 15;
public static void learnIDAConceptDriftDetector(int NSETS) throws Exception {
//Set-up Flink session.
Configuration conf = new Configuration();
conf.setInteger("taskmanager.network.numberOfBuffers", 12000);
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
env.getConfig().disableSysoutLogging();
env.setParallelism(Main.PARALLELISM);
// DataFlink<DataInstance> data0 = DataFlinkLoader.loadDataFromFolder(env,
// "hdfs:///tmp_conceptdrift_data0.arff", false);
DataFlink<DataInstance> data0 = DataFlinkLoader.open(env,
"./datasets/simulated/tmp_conceptdrift_data0.arff", false);
long start = System.nanoTime();
IDAConceptDriftDetector learn = new IDAConceptDriftDetector();
learn.setBatchSize(1000);
learn.setClassIndex(0);
learn.setAttributes(data0.getAttributes());
learn.setNumberOfGlobalVars(1);
learn.setTransitionVariance(0.1);
learn.setSeed(0);
learn.initLearning();
System.out.println(learn.getGlobalDAG().toString());
double[] output = new double[NSETS];
System.out.println("--------------- LEARNING DATA " + 0 + " --------------------------");
double[] out = learn.updateModelWithNewTimeSlice(data0);
//System.out.println(learn.getLearntDynamicBayesianNetwork());
output[0] = out[0];
for (int i = 1; i < NSETS; i++) {
System.out.println("--------------- LEARNING DATA " + i + " --------------------------");
DataFlink<DataInstance> dataNew = DataFlinkLoader.open(env,
"./datasets/simulated/tmp_conceptdrift_data"+i+".arff", false);
out = learn.updateModelWithNewTimeSlice(dataNew);
//System.out.println(learn.getLearntDynamicBayesianNetwork());
output[i] = out[0];
}
long duration = (System.nanoTime() - start) / 1;
double seconds = duration / 1000000000.0;
System.out.println("Running time" + seconds + " seconds");
//System.out.println(learn.getLearntDynamicBayesianNetwork());
for (int i = 0; i < NSETS; i++) {
System.out.println("E(H_"+i+") =\t" + output[i]);
}
}
public static void main(String[] args) throws Exception {
int NSETS = Integer.parseInt(args[0]);
learnIDAConceptDriftDetector(NSETS);
}
}