DataVec

DataVec helps overcome one of the most significant barriers to implementing machine learning and deep learning: getting data into a format that neural networks can understand.

Neural networks understand vectors.

Vectorization is therefore the first problem many data scientists have to solve before they can start training their algorithms on data.

If your data is stored as CSV (comma-separated values) in flat files and must be converted to a numeric format before it can be ingested, or if your data is a directory structure of labeled images, DataVec is the tool that helps you organize that data for use in Deeplearning4j.
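
As a concrete illustration, here is a minimal sketch of that CSV workflow, assuming a hypothetical iris.csv file plus made-up label index, class count, and batch size: DataVec's CSVRecordReader parses each line into a record, and Deeplearning4j's RecordReaderDataSetIterator turns those records into the vectorized DataSet objects a network trains on.

import java.io.File;
import org.datavec.api.records.reader.RecordReader;
import org.datavec.api.records.reader.impl.csv.CSVRecordReader;
import org.datavec.api.split.FileSplit;
import org.deeplearning4j.datasets.datavec.RecordReaderDataSetIterator;
import org.nd4j.linalg.dataset.api.iterator.DataSetIterator;

public class CsvToDataSetSketch {
    public static void main(String[] args) throws Exception {
        //Parse the CSV file (hypothetical: 4 numeric feature columns, integer class label in column 4)
        RecordReader recordReader = new CSVRecordReader();                  //default: comma-delimited, no header skipped
        recordReader.initialize(new FileSplit(new File("iris.csv")));       //hypothetical file name

        int labelIndex = 4;     //index of the label column (assumption for this sketch)
        int numClasses = 3;     //number of label classes (assumption for this sketch)
        int batchSize = 50;

        //Wrap the record reader so the records are served as vectorized DataSet minibatches
        DataSetIterator iterator = new RecordReaderDataSetIterator(recordReader, batchSize, labelIndex, numClasses);
        while (iterator.hasNext()) {
            System.out.println(iterator.next());    //each element is a minibatch of features and labels
        }
    }
}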

Key Features

  • DataVec uses an input/output format system (just as Hadoop MapReduce uses InputFormat to determine the InputSplit and RecordReader, DataVec uses different RecordReaders to serialize data)

  • Supports all major input data types (text, CSV, audio, image, video), each with its own input format

  • Uses an output format system to specify an implementation-neutral vector format (ARFF, SVMLight, etc.)

  • Can be extended for specialized input formats (such as exotic image formats); that is, you can write a custom input format and let the rest of the codebase handle the transformation pipeline

  • Makes vectorization a first-class citizen

  • Built-in data transformation and normalization tools (see the sketch after this list)
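
As a rough sketch of those transform tools, categorical string columns can be re-encoded as numbers so a network can consume them. The two-column schema below is made up for illustration; categoricalToOneHot and categoricalToInteger are DataVec transform operations.

import java.util.Arrays;
import org.datavec.api.transform.TransformProcess;
import org.datavec.api.transform.schema.Schema;

public class TransformSketch {
    public static void main(String[] args) {
        Schema schema = new Schema.Builder()
            .addColumnCategorical("MerchantCountryCode", Arrays.asList("USA", "CAN", "FR", "MX"))
            .addColumnCategorical("FraudLabel", Arrays.asList("Fraud", "Legit"))
            .build();

        //Replace string categories with numeric encodings
        TransformProcess tp = new TransformProcess.Builder(schema)
            .categoricalToOneHot("MerchantCountryCode")   //4 one-hot columns, one per country code
            .categoricalToInteger("FraudLabel")           //single integer column: 0 or 1
            .build();

        System.out.println(tp.getFinalSchema());          //the resulting all-numeric schema
    }
}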

Example Use Cases

  • Convert the CSV-based UCI Iris dataset into the svmLight open vector text format

  • Convert the raw MNIST binary files into svmLight text format

  • Convert raw text into the Metronome vector format

  • Convert raw text into text vector formats {svmLight, metronome, arff} using TF-IDF

  • Convert raw text into the word2vec text vector formats {svmLight, metronome, arff}

Supported Vectorization Engines

  • Any CSV to vectors, using a scriptable transform language
  • MNIST to vectors
  • Text to vectors (see the Word2vec sketch after this list)

    • TF-IDF
    • Bag of words
    • Word2vec
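
For the Word2vec route specifically, here is a minimal sketch. Note that it uses the Word2Vec implementation from deeplearning4j-nlp rather than a DataVec command-line engine, and that raw_sentences.txt and the hyperparameters are placeholder assumptions.

import java.io.File;
import org.deeplearning4j.models.word2vec.Word2Vec;
import org.deeplearning4j.text.sentenceiterator.BasicLineIterator;
import org.deeplearning4j.text.sentenceiterator.SentenceIterator;
import org.deeplearning4j.text.tokenization.tokenizer.preprocessor.CommonPreprocessor;
import org.deeplearning4j.text.tokenization.tokenizerfactory.DefaultTokenizerFactory;
import org.deeplearning4j.text.tokenization.tokenizerfactory.TokenizerFactory;

public class Word2VecSketch {
    public static void main(String[] args) throws Exception {
        //One sentence per line; the preprocessor lower-cases tokens and strips punctuation
        SentenceIterator sentences = new BasicLineIterator(new File("raw_sentences.txt"));   //hypothetical corpus file
        TokenizerFactory tokenizer = new DefaultTokenizerFactory();
        tokenizer.setTokenPreProcessor(new CommonPreprocessor());

        Word2Vec vec = new Word2Vec.Builder()
            .minWordFrequency(5)        //ignore rare words (arbitrary value)
            .layerSize(100)             //dimensionality of the word vectors (arbitrary value)
            .windowSize(5)
            .iterate(sentences)
            .tokenizerFactory(tokenizer)
            .build();
        vec.fit();                      //train the model

        double[] dayVector = vec.getWordVector("day");      //null if "day" never appeared in the corpus
        System.out.println(vec.wordsNearest("day", 10));    //10 closest words in the learned vector space
    }
}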

CSV Transformation Engine

CSVRecordReader is sufficient for well-formed numeric data; however, if the data contains non-numeric fields, such as strings representing booleans (true/false) or label strings, a schema (Schema) transformation is required. DataVec uses Apache Spark to perform transform operations. (You do not need to know Spark internals to use DataVec transforms successfully.)

Example

BasicDataVecExample.java

//Imports needed by this example (package names follow the DataVec/Spark/Joda APIs this example targets; exact packages may vary slightly between versions)
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.datavec.api.records.reader.RecordReader;
import org.datavec.api.records.reader.impl.csv.CSVRecordReader;
import org.datavec.api.transform.TransformProcess;
import org.datavec.api.transform.condition.ConditionOp;
import org.datavec.api.transform.condition.column.CategoricalColumnCondition;
import org.datavec.api.transform.condition.column.DoubleColumnCondition;
import org.datavec.api.transform.filter.ConditionFilter;
import org.datavec.api.transform.schema.Schema;
import org.datavec.api.transform.transform.time.DeriveColumnsFromTimeTransform;
import org.datavec.api.util.ClassPathResource;   //in some versions: org.nd4j.linalg.io.ClassPathResource
import org.datavec.api.writable.DoubleWritable;
import org.datavec.api.writable.Writable;
import org.datavec.spark.transform.SparkTransformExecutor;
import org.datavec.spark.transform.misc.StringToWritablesFunction;
import org.datavec.spark.transform.misc.WritablesToStringFunction;
import org.joda.time.DateTimeFieldType;
import org.joda.time.DateTimeZone;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

/**
 * Basic DataVec example for preprocessing operations on some simple CSV data. If you just want to load CSV data
 * and pass it on for learning, take a look at {@link org.deeplearning4j.examples.dataExample.CSVExample}.
 *
 * The premise here is that some data regarding transactions is available in CSV format, and we want to do some
 * operations on this data, including:
 * 1. Removing some unnecessary columns
 * 2. Filtering examples to keep only examples with values "USA" or "CAN" for the "MerchantCountryCode" column
 * 3. Replacing some invalid values in the "TransactionAmountUSD" column
 * 4. Parsing the date string, and extracting the hour of day from it to create a new "HourOfDay" column
 *
 * @author Alex Black
 */
public class BasicDataVecExample {

    public static  void main(String[] args) throws Exception {

        //=====================================================================
        //                 Step 1: Define the input data schema
        //=====================================================================

        //Let's define the schema of the data that we want to import
        //The order in which columns are defined here should match the order in which they appear in the input data
        Schema inputDataSchema = new Schema.Builder()
            //We can define a single column
            .addColumnString("DateTimeString")
            //Or for convenience define multiple columns of the same type
            .addColumnsString("CustomerID", "MerchantID")
            //We can define different column types for different types of data:
            .addColumnInteger("NumItemsInTransaction")
            .addColumnCategorical("MerchantCountryCode", Arrays.asList("USA","CAN","FR","MX"))
            //Some columns have restrictions on the allowable values, that we consider valid:
            .addColumnDouble("TransactionAmountUSD",0.0,null,false,false)   //$0.0 or more, no maximum limit, no NaN and no Infinite values
            .addColumnCategorical("FraudLabel", Arrays.asList("Fraud","Legit"))
            .build();

        //Print out the schema:
        System.out.println("Input data schema details:");
        System.out.println(inputDataSchema);

        System.out.println("\n\nOther information obtainable from schema:");
        System.out.println("Number of columns: " + inputDataSchema.numColumns());
        System.out.println("Column names: " + inputDataSchema.getColumnNames());
        System.out.println("Column types: " + inputDataSchema.getColumnTypes());


        //=====================================================================
        //            Step 2: Define the operations we want to do
        //=====================================================================

        //Let's define some operations to execute on the data...
        //We do this by defining a TransformProcess
        //At each step, we identify columns by the names we gave them in the input data schema, above

        TransformProcess tp = new TransformProcess.Builder(inputDataSchema)
            //Let's remove some columns we don't need
            .removeColumns("CustomerID","MerchantID")

            //Now, suppose we only want to analyze transactions involving merchants in USA or Canada. Let's filter out
            // everything except for those countries.
            //Here, we are applying a conditional filter. We remove all of the examples that match the condition
            // The condition is "MerchantCountryCode" isn't one of {"USA", "CAN"}
            .filter(new ConditionFilter(
                new CategoricalColumnCondition("MerchantCountryCode", ConditionOp.NotInSet, new HashSet<>(Arrays.asList("USA","CAN")))))

            //Let's suppose our data source isn't perfect, and we have some invalid data: negative dollar amounts that we want to replace with 0.0
            //For positive dollar amounts, we don't want to modify those values
            //Use the ConditionalReplaceValueTransform on the "TransactionAmountUSD" column:
            .conditionalReplaceValueTransform(
                "TransactionAmountUSD",     //Column to operate on
                new DoubleWritable(0.0),    //New value to use, when the condition is satisfied
                new DoubleColumnCondition("TransactionAmountUSD",ConditionOp.LessThan, 0.0)) //Condition: amount < 0.0

            //Finally, let's suppose we want to parse our date/time column, which is in a format like "2016-01-01 17:50:00.000"
            //We use JodaTime internally, so formats can be specified as follows: http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html
            .stringToTimeTransform("DateTimeString","yyyy-MM-dd HH:mm:ss.SSS", DateTimeZone.UTC)

            //However, our time column ("DateTimeString") isn't a String anymore. So let's rename it to something better:
            .renameColumn("DateTimeString", "DateTime")

            //At this point, we have our date/time format stored internally as a long value (Unix/Epoch format): milliseconds since 00:00.000 01/01/1970
            //Suppose we only care about the hour of the day. Let's derive a new column for that, from the DateTime column
            .transform(new DeriveColumnsFromTimeTransform.Builder("DateTime")
                .addIntegerDerivedColumn("HourOfDay", DateTimeFieldType.hourOfDay())
                .build())

            //We no longer need our "DateTime" column, as we've extracted what we need from it. So let's remove it
            .removeColumns("DateTime")

            //We've finished with the sequence of operations we want to do: let's create the final TransformProcess object
            .build();


        //After executing all of these operations, we have a new and different schema:
        Schema outputSchema = tp.getFinalSchema();

        System.out.println("\n\n\nSchema after transforming data:");
        System.out.println(outputSchema);


        //=====================================================================
        //      Step 3: Load our data and execute the operations on Spark
        //=====================================================================

        //We'll use Spark local to handle our data

        SparkConf conf = new SparkConf();
        conf.setMaster("local[*]");
        conf.setAppName("DataVec Example");

        JavaSparkContext sc = new JavaSparkContext(conf);

        //Define the path to the data file. You could use a directory here if the data is in multiple files
        //Normally just define your path like "file:/..." or "hdfs:/..."
        String path = new ClassPathResource("BasicDataVecExample/exampledata.csv").getFile().getAbsolutePath();
        JavaRDD<String> stringData = sc.textFile(path);

        //We first need to parse this format. It's comma-delimited (CSV) format, so let's parse it using CSVRecordReader:
        RecordReader rr = new CSVRecordReader();
        JavaRDD<List<Writable>> parsedInputData = stringData.map(new StringToWritablesFunction(rr));

        //Now, let's execute the transforms we defined earlier:
        JavaRDD<List<Writable>> processedData = SparkTransformExecutor.execute(parsedInputData, tp);

        //For the sake of this example, let's collect the data locally and print it:
        JavaRDD<String> processedAsString = processedData.map(new WritablesToStringFunction(","));
        //processedAsString.saveAsTextFile("file://your/local/save/path/here");   //To save locally
        //processedAsString.saveAsTextFile("hdfs://your/hdfs/save/path/here");   //To save to hdfs

        List<String> processedCollected = processedAsString.collect();
        List<String> inputDataCollected = stringData.collect();


        System.out.println("\n\n---- Original Data ----");
        for(String s : inputDataCollected) System.out.println(s);

        System.out.println("\n\n---- Processed Data ----");
        for(String s : processedCollected) System.out.println(s);


        System.out.println("\n\nDONE");
    }
}
Example log output:
...
---- Original Data ----
2016-01-01 17:00:00.000,830a7u3,u323fy8902,1,USA,100.00,Legit
2016-01-01 18:03:01.256,830a7u3,9732498oeu,3,FR,73.20,Legit
2016-01-03 02:53:32.231,78ueoau32,w234e989,1,USA,1621.00,Fraud
2016-01-03 09:30:16.832,t842uocd,9732498oeu,4,USA,43.19,Legit
2016-01-04 23:01:52.920,t842uocd,cza8873bm,10,MX,159.65,Legit
2016-01-05 02:28:10.648,t842uocd,fgcq9803,6,CAN,26.33,Fraud
2016-01-05 10:15:36.483,rgc707ke3,tn342v7,2,USA,-0.90,Legit


---- Processed Data ----
17,1,USA,100.00,Legit
2,1,USA,1621.00,Fraud
9,4,USA,43.19,Legit
2,6,CAN,26.33,Fraud
10,2,USA,0.0,Legit


DONE
...