DataVec

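DataVec helps overcome one of the biggest obstacles in implementing machine learning and deep learning: getting data into a format that neural networks can work with.

Neural networks work with vectors.

For many data scientists, vectorization is therefore the first problem to solve before they can start training their algorithms on data.

If your data is stored in flat files as CSV (comma-separated values) and must be converted to numeric form before it can be ingested, or if your data is a directory structure of labeled images, DataVec is the tool that can help you organize that data for use in Deeplearning4J.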

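Key Features

  • DataVec uses an input/output format system (just as Hadoop MapReduce uses an InputFormat to determine the specific InputSplits and RecordReaders, DataVec provides different RecordReaders to serialize the data)

  • Supports all major types of input data (text, CSV, audio, image, and video), each with its own input format

  • Uses an output format system to specify an implementation-neutral vector format (ARFF, SVMLight, etc.)

  • Can be extended for specialized input formats (such as rare image formats); that is, you can write your own custom input format and let the rest of the codebase handle the transformation pipeline

  • Makes vectorization a first-class citizen

  • Built-in tools to transform and normalize data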
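
The input-format idea in the list above can be illustrated without DataVec itself. The following is a minimal sketch in plain Java, not DataVec code: `RecordSource`, `CsvRecordSource`, and `toVectors` are hypothetical names invented for this illustration. It shows how the vectorization step can depend only on a reader interface, so that supporting a new input format would mean writing one new reader.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class RecordSourceSketch {

    /** Hypothetical reader abstraction: yields one record (a list of string fields) at a time. */
    interface RecordSource extends Iterator<List<String>> { }

    /** Reads comma-separated lines; a custom format would supply its own implementation. */
    static class CsvRecordSource implements RecordSource {
        private final Iterator<String> lines;

        CsvRecordSource(List<String> lines) {
            this.lines = lines.iterator();
        }

        @Override
        public boolean hasNext() {
            return lines.hasNext();
        }

        @Override
        public List<String> next() {
            return Arrays.asList(lines.next().split(","));
        }
    }

    /** Format-independent step: turns every record of numeric fields into a double[]. */
    static List<double[]> toVectors(RecordSource source) {
        List<double[]> vectors = new ArrayList<>();
        while (source.hasNext()) {
            List<String> fields = source.next();
            double[] v = new double[fields.size()];
            for (int i = 0; i < v.length; i++) {
                v[i] = Double.parseDouble(fields.get(i).trim());
            }
            vectors.add(v);
        }
        return vectors;
    }

    public static void main(String[] args) {
        RecordSource source = new CsvRecordSource(
                Arrays.asList("5.1,3.5,1.4,0.2", "4.9,3.0,1.4,0.2"));
        for (double[] v : toVectors(source)) {
            System.out.println(Arrays.toString(v));
        }
    }
}
```

In this sketch, `toVectors` never needs to know where the records came from; only the reader implementation changes from one format to the next.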

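Example Applications

  • Convert the CSV-based UCI Iris dataset into the svmLight open vector text format

  • Convert the MNIST dataset from raw binary files to the svmLight text format

  • Convert raw text into the Metronome vector format

  • Convert raw text into a text vector format {svmLight, metronome, arff} using TF-IDF

  • Convert raw text into a word2vec-based text vector format {svmLight, metronome, arff}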
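
As an illustration of the first conversion in the list above, here is a minimal plain-Java sketch that turns an Iris-style CSV row into an svmLight-style line. It does not use DataVec. The class `SvmLightSketch`, the method `toSvmLight`, and the choice to number classes 0, 1, 2 by their position in a list are all assumptions made for this example.

```java
import java.util.Arrays;
import java.util.List;

public class SvmLightSketch {

    /**
     * Converts one CSV row whose last field is a class name into an svmLight-style line:
     * "<label> <index>:<value> ...", with 1-based feature indices and zero values omitted.
     * The label is the position of the class name in classNames (an assumption of this sketch).
     */
    static String toSvmLight(String csvRow, List<String> classNames) {
        String[] fields = csvRow.split(",");
        int labelPos = fields.length - 1;
        int label = classNames.indexOf(fields[labelPos].trim());
        if (label < 0) {
            throw new IllegalArgumentException("Unknown class: " + fields[labelPos]);
        }
        StringBuilder sb = new StringBuilder().append(label);
        for (int i = 0; i < labelPos; i++) {
            double value = Double.parseDouble(fields[i].trim());
            if (value != 0.0) {
                sb.append(' ').append(i + 1).append(':').append(value);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> classes = Arrays.asList("Iris-setosa", "Iris-versicolor", "Iris-virginica");
        System.out.println(toSvmLight("5.1,3.5,1.4,0.2,Iris-setosa", classes));
        // prints: 0 1:5.1 2:3.5 3:1.4 4:0.2
    }
}
```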

Supported Vectorization Engines

  • Any CSV to vectors, using a scripting language
  • MNIST to vectors
  • Text to vectors

    • TF-IDF
    • Bag of words
    • Word2vec
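
To make the text-to-vector options above more concrete, the following plain-Java sketch computes bag-of-words and TF-IDF vectors for a tiny corpus. It is an illustration only and does not use DataVec's text pipeline. The class and method names are hypothetical, and the weighting used here (raw term count multiplied by ln(N / df)) is one common TF-IDF variant chosen for simplicity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

public class TfIdfSketch {

    /** Lower-cases the text and splits it on runs of non-letter characters. */
    static List<String> tokenize(String text) {
        List<String> tokens = new ArrayList<>();
        for (String t : text.toLowerCase().split("[^a-z]+")) {
            if (!t.isEmpty()) {
                tokens.add(t);
            }
        }
        return tokens;
    }

    /** Sorted vocabulary over all documents; a term's position is its vector index. */
    static List<String> vocabulary(List<String> docs) {
        TreeSet<String> vocab = new TreeSet<>();
        for (String d : docs) {
            vocab.addAll(tokenize(d));
        }
        return new ArrayList<>(vocab);
    }

    /** Bag of words: raw term counts for one document. */
    static double[] bagOfWords(String doc, List<String> vocab) {
        double[] counts = new double[vocab.size()];
        for (String t : tokenize(doc)) {
            int idx = vocab.indexOf(t);
            if (idx >= 0) {
                counts[idx] += 1.0;
            }
        }
        return counts;
    }

    /** TF-IDF with tf = raw count and idf = ln(N / df). */
    static double[] tfIdf(String doc, List<String> docs, List<String> vocab) {
        double[] vector = bagOfWords(doc, vocab);
        for (int i = 0; i < vector.length; i++) {
            int df = 0; // number of documents that contain term i
            for (String d : docs) {
                if (tokenize(d).contains(vocab.get(i))) {
                    df++;
                }
            }
            vector[i] = df == 0 ? 0.0 : vector[i] * Math.log((double) docs.size() / df);
        }
        return vector;
    }

    public static void main(String[] args) {
        List<String> docs = Arrays.asList("the cat sat", "the dog sat", "the cat ran");
        List<String> vocab = vocabulary(docs);
        System.out.println(vocab);
        System.out.println(Arrays.toString(bagOfWords(docs.get(0), vocab)));
        System.out.println(Arrays.toString(tfIdf(docs.get(0), docs, vocab)));
    }
}
```

In this corpus the word "the" appears in every document, so its TF-IDF weight is zero, while its bag-of-words count is not. That difference is the usual reason for preferring TF-IDF over raw counts.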

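CSV Transformation Engine

CSVRecordReader is sufficient for well-formed numeric data. If the data contains non-numeric fields, such as strings that represent boolean values (true/false) or strings that represent labels, a schema transformation is needed. DataVec uses Apache Spark to perform transform operations. You do not need to know the internals of Spark to use DataVec transforms successfully.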
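
The kind of schema-driven conversion described above can be sketched in plain Java. This is not the DataVec `Schema` or `TransformProcess` API: the `Kind` enum, the `Column` class, and the `convert` method are hypothetical, and mapping a category to its list index is just one possible encoding.

```java
import java.util.Arrays;
import java.util.List;

public class SchemaConversionSketch {

    enum Kind { NUMERIC, BOOLEAN, CATEGORICAL }

    /** Hypothetical column description; categories is used only for CATEGORICAL columns. */
    static class Column {
        final Kind kind;
        final List<String> categories;

        Column(Kind kind, String... categories) {
            this.kind = kind;
            this.categories = Arrays.asList(categories);
        }
    }

    /** Converts one CSV row to numbers: booleans become 0/1, categories become their index. */
    static double[] convert(String csvRow, List<Column> schema) {
        String[] fields = csvRow.split(",");
        if (fields.length != schema.size()) {
            throw new IllegalArgumentException(
                    "Expected " + schema.size() + " fields, got " + fields.length);
        }
        double[] out = new double[fields.length];
        for (int i = 0; i < fields.length; i++) {
            String field = fields[i].trim();
            Column col = schema.get(i);
            switch (col.kind) {
                case NUMERIC:
                    out[i] = Double.parseDouble(field);
                    break;
                case BOOLEAN:
                    out[i] = Boolean.parseBoolean(field) ? 1.0 : 0.0;
                    break;
                case CATEGORICAL:
                    int idx = col.categories.indexOf(field);
                    if (idx < 0) {
                        throw new IllegalArgumentException("Unknown category: " + field);
                    }
                    out[i] = idx;
                    break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Column> schema = Arrays.asList(
                new Column(Kind.NUMERIC),
                new Column(Kind.BOOLEAN),
                new Column(Kind.CATEGORICAL, "Fraud", "Legit"));
        System.out.println(Arrays.toString(convert("43.19,true,Legit", schema)));
        // prints: [43.19, 1.0, 1.0]
    }
}
```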

Example

BasicDataVecExample.java

  [java]
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
// The org.datavec package paths below are assumed from the DataVec release this example
// appears to target; class locations may differ between releases.
import org.datavec.api.records.reader.RecordReader;
import org.datavec.api.records.reader.impl.csv.CSVRecordReader;
import org.datavec.api.transform.TransformProcess;
import org.datavec.api.transform.condition.ConditionOp;
import org.datavec.api.transform.condition.column.CategoricalColumnCondition;
import org.datavec.api.transform.condition.column.DoubleColumnCondition;
import org.datavec.api.transform.filter.ConditionFilter;
import org.datavec.api.transform.schema.Schema;
import org.datavec.api.transform.transform.time.DeriveColumnsFromTimeTransform;
import org.datavec.api.util.ClassPathResource;
import org.datavec.api.writable.DoubleWritable;
import org.datavec.api.writable.Writable;
import org.datavec.spark.transform.SparkTransformExecutor;
import org.datavec.spark.transform.misc.StringToWritablesFunction;
import org.datavec.spark.transform.misc.WritablesToStringFunction;
import org.joda.time.DateTimeFieldType;
import org.joda.time.DateTimeZone;

/**
 * Basic DataVec example for preprocessing operations on some simple CSV data. If you just want to load CSV data
 * and pass it on for learning, take a look at {@link org.deeplearning4j.examples.dataExample.CSVExample}.
 *
 * The premise here is that some data regarding transactions is available in CSV format, and we want to do some
 * operations on this data, including:
 * 1. Removing some unnecessary columns
 * 2. Filtering examples to keep only examples with values "USA" or "CAN" for the "MerchantCountryCode" column
 * 3. Replacing some invalid values in the "TransactionAmountUSD" column
 * 4. Parsing the date string, and extracting the hour of day from it to create a new "HourOfDay" column
 *
 * @author Alex Black
 */
public class BasicDataVecExample {

    public static void main(String[] args) throws Exception {

        //=====================================================================
        //                 Step 1: Define the input data schema
        //=====================================================================

        //Let's define the schema of the data that we want to import
        //The order in which columns are defined here should match the order in which they appear in the input data
        Schema inputDataSchema = new Schema.Builder()
            //We can define a single column
            .addColumnString("DateTimeString")
            //Or for convenience define multiple columns of the same type
            .addColumnsString("CustomerID", "MerchantID")
            //We can define different column types for different types of data:
            .addColumnInteger("NumItemsInTransaction")
            .addColumnCategorical("MerchantCountryCode", Arrays.asList("USA","CAN","FR","MX"))
            //Some columns have restrictions on the allowable values, that we consider valid:
            .addColumnDouble("TransactionAmountUSD",0.0,null,false,false)   //$0.0 or more, no maximum limit, no NaN and no Infinite values
            .addColumnCategorical("FraudLabel", Arrays.asList("Fraud","Legit"))
            .build();

        //Print out the schema:
        System.out.println("Input data schema details:");
        System.out.println(inputDataSchema);

        System.out.println("\n\nOther information obtainable from schema:");
        System.out.println("Number of columns: " + inputDataSchema.numColumns());
        System.out.println("Column names: " + inputDataSchema.getColumnNames());
        System.out.println("Column types: " + inputDataSchema.getColumnTypes());

        //=====================================================================
        //            Step 2: Define the operations we want to do
        //=====================================================================

        //Let's define some operations to execute on the data...
        //We do this by defining a TransformProcess
        //At each step, we identify columns by the names we gave them in the input data schema, above
        TransformProcess tp = new TransformProcess.Builder(inputDataSchema)
            //Let's remove some columns we don't need
            .removeColumns("CustomerID","MerchantID")

            //Now, suppose we only want to analyze transactions involving merchants in USA or Canada. Let's filter out
            // everything except for those countries.
            //Here, we are applying a conditional filter. We remove all of the examples that match the condition
            // The condition is "MerchantCountryCode" isn't one of {"USA", "CAN"}
            .filter(new ConditionFilter(
                new CategoricalColumnCondition("MerchantCountryCode", ConditionOp.NotInSet, new HashSet<>(Arrays.asList("USA","CAN")))))

            //Let's suppose our data source isn't perfect, and we have some invalid data: negative dollar amounts that we want to replace with 0.0
            //For positive dollar amounts, we don't want to modify those values
            //Use the ConditionalReplaceValueTransform on the "TransactionAmountUSD" column:
            .conditionalReplaceValueTransform(
                "TransactionAmountUSD",     //Column to operate on
                new DoubleWritable(0.0),    //New value to use, when the condition is satisfied
                new DoubleColumnCondition("TransactionAmountUSD",ConditionOp.LessThan, 0.0)) //Condition: amount < 0.0

            //Finally, let's suppose we want to parse our date/time column in a format like "2016-01-01 17:00:00.000"
            //We use JodaTime internally, so formats can be specified as follows: http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html
            //Note: in Joda-Time patterns, "yyyy" is the year and "dd" the day of month ("DD" would be the day of year)
            .stringToTimeTransform("DateTimeString","yyyy-MM-dd HH:mm:ss.SSS", DateTimeZone.UTC)

            //However, our time column ("DateTimeString") isn't a String anymore. So let's rename it to something better:
            .renameColumn("DateTimeString", "DateTime")

            //At this point, we have our date/time format stored internally as a long value (Unix/Epoch format): milliseconds since 00:00.000 01/01/1970
            //Suppose we only care about the hour of the day. Let's derive a new column for that, from the DateTime column
            .transform(new DeriveColumnsFromTimeTransform.Builder("DateTime")
                .addIntegerDerivedColumn("HourOfDay", DateTimeFieldType.hourOfDay())
                .build())

            //We no longer need our "DateTime" column, as we've extracted what we need from it. So let's remove it
            .removeColumns("DateTime")

            //We've finished with the sequence of operations we want to do: let's create the final TransformProcess object
            .build();

        //After executing all of these operations, we have a new and different schema:
        Schema outputSchema = tp.getFinalSchema();

        System.out.println("\n\n\nSchema after transforming data:");
        System.out.println(outputSchema);

        //=====================================================================
        //      Step 3: Load our data and execute the operations on Spark
        //=====================================================================

        //We'll use Spark local to handle our data
        SparkConf conf = new SparkConf();
        conf.setMaster("local[*]");
        conf.setAppName("DataVec Example");

        JavaSparkContext sc = new JavaSparkContext(conf);

        //Define the path to the data file. You could use a directory here if the data is in multiple files
        //Normally just define your path like "file:/..." or "hdfs:/..."
        String path = new ClassPathResource("BasicDataVecExample/exampledata.csv").getFile().getAbsolutePath();
        JavaRDD<String> stringData = sc.textFile(path);

        //We first need to parse this format. It's comma-delimited (CSV) format, so let's parse it using CSVRecordReader:
        RecordReader rr = new CSVRecordReader();
        JavaRDD<List<Writable>> parsedInputData = stringData.map(new StringToWritablesFunction(rr));

        //Now, let's execute the transforms we defined earlier:
        JavaRDD<List<Writable>> processedData = SparkTransformExecutor.execute(parsedInputData, tp);

        //For the sake of this example, let's collect the data locally and print it:
        JavaRDD<String> processedAsString = processedData.map(new WritablesToStringFunction(","));
        //processedAsString.saveAsTextFile("file://your/local/save/path/here");   //To save locally
        //processedAsString.saveAsTextFile("hdfs://your/hdfs/save/path/here");   //To save to hdfs

        List<String> processedCollected = processedAsString.collect();
        List<String> inputDataCollected = stringData.collect();

        System.out.println("\n\n---- Original Data ----");
        for(String s : inputDataCollected) System.out.println(s);

        System.out.println("\n\n---- Processed Data ----");
        for(String s : processedCollected) System.out.println(s);

        System.out.println("\n\nDONE");
    }
}
  • log
  [plaintext]
...

---- Original Data ----
2016-01-01 17:00:00.000,830a7u3,u323fy8902,1,USA,100.00,Legit
2016-01-01 18:03:01.256,830a7u3,9732498oeu,3,FR,73.20,Legit
2016-01-03 02:53:32.231,78ueoau32,w234e989,1,USA,1621.00,Fraud
2016-01-03 09:30:16.832,t842uocd,9732498oeu,4,USA,43.19,Legit
2016-01-04 23:01:52.920,t842uocd,cza8873bm,10,MX,159.65,Legit
2016-01-05 02:28:10.648,t842uocd,fgcq9803,6,CAN,26.33,Fraud
2016-01-05 10:15:36.483,rgc707ke3,tn342v7,2,USA,-0.90,Legit

---- Processed Data ----
17,1,USA,100.00,Legit
2,1,USA,1621.00,Fraud
9,4,USA,43.19,Legit
2,6,CAN,26.33,Fraud
10,2,USA,0.0,Legit

DONE
...
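
To show how each processed row in the log above follows from its original row, here is a sketch of the same four steps in plain Java, using `java.time` instead of DataVec, Spark, and Joda-Time. It is an illustration only: `TransformSketch` and `process` are hypothetical names, and the column positions are hard-coded from the input schema of the example.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TransformSketch {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    private static final Set<String> KEPT_COUNTRIES =
            new HashSet<>(Arrays.asList("USA", "CAN"));

    /** Applies the example's steps to raw CSV rows and returns the surviving rows. */
    static List<String> process(List<String> rows) {
        List<String> out = new ArrayList<>();
        for (String row : rows) {
            // Columns: DateTimeString, CustomerID, MerchantID, NumItemsInTransaction,
            //          MerchantCountryCode, TransactionAmountUSD, FraudLabel
            String[] f = row.split(",");
            if (!KEPT_COUNTRIES.contains(f[4])) {
                continue; // filter: drop rows whose country is not USA or CAN
            }
            // Replace negative amounts with 0.0; keep other amounts as written
            String amount = Double.parseDouble(f[5]) < 0.0 ? "0.0" : f[5];
            // Parse the timestamp and derive the hour of day
            int hour = LocalDateTime.parse(f[0], FORMAT).getHour();
            // CustomerID (f[1]) and MerchantID (f[2]) are left out of the output
            out.add(hour + "," + f[3] + "," + f[4] + "," + amount + "," + f[6]);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList(
                "2016-01-01 17:00:00.000,830a7u3,u323fy8902,1,USA,100.00,Legit",
                "2016-01-01 18:03:01.256,830a7u3,9732498oeu,3,FR,73.20,Legit",
                "2016-01-05 10:15:36.483,rgc707ke3,tn342v7,2,USA,-0.90,Legit");
        for (String s : process(rows)) {
            System.out.println(s);
        }
        // prints:
        // 17,1,USA,100.00,Legit
        // 10,2,USA,0.0,Legit
    }
}
```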