流是什么

流是Java API的新成员,它允许你以声明的方式处理数据集合,简单来说,可以把它当作数据集的高级迭代器。

此外,流还可以透明地并行处理,你无需写任何多线程代码了。

举例说明

举个例子来说明流的好处,有一个简单的场景,要求返回低热量的菜肴名称,并按照卡路里排序,实体代码如下:

基础类

  • 菜肴
public class Dish {
    private final String name;
    private final boolean vegetarian;
    private final int calories;
    private final Type type;

    public Dish(String name, boolean vegetarian, int calories, Type type) {
        this.name = name;
        this.vegetarian = vegetarian;
        this.calories = calories;
        this.type = type;
    }

    //Getter & Setter
}
  • 餐厅
public class Restaurant {

    private List<Dish> menu = Arrays.asList(
            new Dish("pork", false, 800, Dish.Type.MEAT),
            new Dish("beef", false, 700, Dish.Type.MEAT), 
            new Dish("chicken", false, 400, Dish.Type.MEAT),
            new Dish("french fries", true, 530, Dish.Type.OTHER), 
            new Dish("rice", true, 350, Dish.Type.OTHER),
            new Dish("season fruit", true, 120, Dish.Type.OTHER), 
            new Dish("pizza", true, 550, Dish.Type.OTHER),
            new Dish("prawns", false, 300, Dish.Type.FISH), 
            new Dish("salmon", false, 450, Dish.Type.FISH));

    //G&S
}

jdk7 实现

 // 餐厅对象
Restaurant restaurant = new Restaurant();

// 返回低热量的菜肴名称,并按照卡路里排序。
// Java7写法
List<Dish> lowCaloricDishes = new ArrayList<>();
for (Dish d : restaurant.getMenu()) {
    if (d.getCalories() < 400)
        lowCaloricDishes.add(d);
}
Collections.sort(lowCaloricDishes, new Comparator<Dish>() {
    @Override
    public int compare(Dish d1, Dish d2) {
        return Integer.compare(d1.getCalories(), d2.getCalories());
    }
});

List<String> lowCaloricDishesName = new ArrayList<>();
for(Dish d : lowCaloricDishes) {
    lowCaloricDishesName.add(d.getName());
}

这段代码中,用了一个“垃圾变量”lowCaloricDishes,它唯一的作用就是作为一次性的中间容器,在Java8中,实现的细节被放在它本该归属的库中:

jdk8 实现

// 餐厅对象
Restaurant restaurant = new Restaurant();
        
// Java8写法
List<String> threeHighCaloricDishNames = restaurant.getMenu().stream()
        .filter(d -> d.getCalories() > 300)
        .sorted(comparing(Dish::getCalories))
        .map(Dish::getName)
        .limit(3)
        .collect(toList());

jdk8 并行版本

利用多核架构并行执行这段代码只需要吧 stream() 换成 parallelStream() 就可以了。

private static List<String> getLowCaloricDishesNamesInJava8(List<Dish> dishes) {
    return dishes
            .parallelStream()
            // 选出400卡路里以下的菜肴
            .filter(d -> d.getCalories() < 400)
            // 按照卡路里排序
            .sorted(comparing(Dish::getCalories))
            // 提取菜名
            .map(Dish::getName)
            // 转为集合
            .collect(toList());
}

你可能会想,在调用parallelStream方法时到底发生了什么?用了多少个线程?对性能有多大的提升?

不用着急,在后面的读书笔记中会讨论这些问题。

现在,你可以看出,从软件工程师的角度来看,新的方法有几个显而易见的好处。

代码是以声明性的方式写的:说明想要完成什么(筛选热量低的菜肴)而不是说明如何实现一个操作(利用循环和if条件等控制流语句)。

你可以把几个基础操作链接起来,来表达复杂的数据处理流水线(在 filter 后面接上 sorted 、 map 和 collect 操作),同时保持代码清晰可读。

filter 的结果被传给了 sorted 方法,再传给 map 方法,最后传给 collect 方法。

filter、sorted、map和collect等操作是与具体线程模型无关的高层次构件,所以它们的内部实现可以是单线程的,也可能透明地充分利用你的多核架构!

在实践中,这意味着我们用不着为了让某些数据处理任务并行而去操心线程和锁了,Stream API都替你做好了!

现在就来仔细探讨一下怎么使用Stream API。我们会用流与集合做类比,做点儿铺垫。

下一章会详细讨论可以用来表达复杂数据处理查询的流操作。我们会谈到很多模式,如筛选、切片、 查找、匹配、映射和归约,还会提供很多测验和练习来加深你的理解。

接下来,我们会讨论如何创建和操纵数字流,比如生成一个偶数流,或是勾股数流。

最后,我们会讨论如何从不同的源(比如文件)创建流。还会讨论如何生成一个具有无穷多元素的流,这用集合肯定是搞不定。

jdk8 stream 的优势

Java8中的Stream API可以让你写出的这样的代码:

声明性 —— 更简洁、更易读;

可复合 —— 更灵活;

可并行 —— 性能更好;

流简介

要讨论流,我们首先来谈谈集合,这是最容易上手的方式了。Java8中的集合支持一个新的stream方法,它会返回一个流(接口定义在 java.util.stream.Stream 里)。你在后面会看到,还有很多其他的方法可以得到流,比如利用数值范围或从I/O资源生成流元素。

那么,流到底是什么呢?简短的定义就是“从支持数据处理操作的源生成的元素序列”。

让我们一步步剖析这个定义。

元素序列:就像集合一样,流也提供了一个接口,可以访问特定元素类型的一组有序值。因为集合是数据结构,所以它的主要目的是以特定的时间/空间复杂度存储和访问元素(如ArrayList 与 LinkedList )。但流的目的在于表达计算,比如你前面见到的filter 、 sorted 和 map 。集合讲的是数据,流讲的是计算。

源:流会使用一个提供数据的源,如集合、数组或输入/输出资源。请注意,从有序集合生成流时会保留原有的顺序。由列表生成的流,其元素顺序与列表一致。

数据处理操作:流的数据处理功能支持类似于 数据库 的操作,以及函数式编程语言中的常用操作,如filter、map、reduce、find、match、sort等。流操作可以顺序执行,也可并行执行。

此外,流操作有两个重要的特点。

流水线:很多流操作本身会返回一个流,这样多个操作就可以链接起来,形成一个大的流水线。

内部迭代:与使用迭代器显式迭代的集合不同,流的迭代操作是在背后进行的。

代码

让我们来看一段能够体现所有这些概念的代码:

List<Dish> menu = Dish.MENU;
// 从menu获得流
List<String> threeHighCaloricDishNames = menu.stream()
        // 通过链式操作,筛选出高热量的菜肴
        .filter(d -> d.getCalories() > 300)
        // 获取菜名
        .map(Dish::getName)
        .limit(3)
        .collect(Collectors.toList());
// [pork, beef, chicken]
System.out.println(threeHighCaloricDishNames);

看起来很简单,就算不明白也没关系,我们来了解了解,刚刚使用到的一些方法:

d -> d.getCalories() > 300
Dish::getName

在刚刚解释的这段代码,与遍历处理菜单集合的代码有很大的不同。

首先,我们使用了声明性的方式来处理菜单数据。我们并没有去实现筛选(filter)、提取(map)或截断(limit)功能,Stream库已经自带了。

因此,StreamAPI在决定如何优化这条流水线时更为灵活。

例如,筛选、提取和截断操作可以一次进行,并在找到这三道菜后立即停止。

流与集合的差别

是否急切

Java现有的集合概念和新的流概念都提供了接口,来配合代表元素型有序值的数据接口。所谓有序,就是说我们一般是按顺序取用值,而不是随机取用的。那这两者有什么区别呢?

打个比方说,我们在看电影的时候,这些视频就是一个流(字节流或帧流),流媒体视频播放器只要提前下载用户观看位置的那几帧就可以了,这样不用等到流中大部分值计算出来。比如,我们在Youtube上看的视频进度条随便拖动到一个位置,你会发现它很快就开始播放了,不需要将整个视频都加载好,而是加载了一段。如果,不按照这种方式的话,我们可以想象一下,视频播放器可能没有将整个流作为集合,保存所需要的内存缓冲区——而且要是非得等到最后一帧出现才能开始看,那等待的时间就太长了,早就没耐心看了。

初略地说,集合与流之间的差异就在于什么时候进行计算。集合是一个内存中的数据结构,它包含数据结构中目前所有的值,集合中的每个元素都得先算出来才能添加到集合中。

相比之下,流则是在概念上固定的数据结构,其元素则是按需计(懒加载)算的。需要多少就给多少。这是一种生产者与消费者的关系。从另一个角度来说,流就像是一个延迟创建的集合:只有在消费者要求的时候才会生成值。与之相反,集合则是急切创建的(就像黄牛囤货一样)。

只能遍历一次

流只能遍历一次,遍历完之后,就说这个流已经消费掉了。

可以从原始数据源在获得一个新的流来重新遍历一边,就像迭代器一样。

下面的操作会抛出一个异常:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class Example2 {
    public static void main(String[] args) {
        List<String> title = Arrays.asList("Java8", "In", "Action");
        Stream<String> s = title.stream();
        s.forEach(System.out::println);
        s.forEach(System.out::println);
    }
}
  • 异常信息
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
	at java.util.stream.AbstractPipeline.sourceStageSpliterator(Unknown Source)
	at java.util.stream.ReferencePipeline$Head.forEach(Unknown Source)
	at cn.net.bysoft.chapter4.Example2.main(Example2.java:12)

外部迭代与内部迭代

使用Collection接口需要用用户去做迭代(比如用for-each),这个称为外部迭代。

反之,Stream库使用内部迭代,它帮你把迭代做了,还把得到的流值存在了某个地方,你只要给出一个函数说要干什么就可以了。

下面的代码说明了这种区别。

  • 集合:使用for-each循环外部迭代:
// 集合:使用for-each循环外部迭代
List<Dish> menu = Dish.MENU;
List<String> names = new ArrayList<>();
for (Dish dish : menu) {
    names.add(dish.getName());
}

请注意, for-each 还隐藏了迭代中的一些复杂性。

for-each结构是一个语法糖,它背后的东西用Iterator对象表达出来更要丑陋得多。

  • 集合:用背后的迭代器做外部迭代
List<String> names = new ArrayList<>();
Iterator<String> iterator = menu.iterator();
while(iterator.hasNext()) {
    Dish d = iterator.next();
    names.add(d.getName());
}
  • 流:内部迭代
List<String> names = menu.stream()
                    .map(Dish::getName)
                    .collect(toList());

让我们用一个比喻来解释一下内部迭代的差异和好处吧!比方说你在和你两岁的儿子说话,希望他能把玩家收起来。

你:“儿子,我们把玩家收起来吧。地上还有玩具吗?”
儿子:“有,球。”
你:“好,放进盒子里。还有吗?”
儿子:“有,那是我的飞机。”
你:“好,放进盒子里。还有吗?”
儿子:“有,我的书。”
你:“好,放进盒子里。还有吗?”
儿子:“没了,没有了。”
你:“好,我们收好啦!”

这正是你每天都要对Java集合做的。你外部迭代一个集合,显式地取出每个项目再加以处理。

如果,你对儿子说“把地上的所有玩具都放进盒子里收起来”就好了。

内部迭代的优势

内部迭代比较好的原因有二:第一,儿子可以选择一只手拿飞机,另一只手拿球第二,他可以决定先拿离盒子最近的那个东西,然后再拿别的。

同样的道理,内部迭代时,项目可以透明地并行处理,或者用更优化的顺序进行处理。

要是用Java过去的那种外部迭代方法,这些优化都是很困难的。

这似乎有点儿鸡蛋里挑骨头,但这差不多就是Java 8引入流的理由了,Stream库的内部迭代可以自动选择一种适合你硬件的数据表示和并行实现。

与此相反,一旦通过写 for-each 而选择了外部迭代,那你基本上就要自己管理所有的并行问题了(自己管理实际上意味着“某个良辰吉日我们会把它并行化”或“开始了关于任务和 synchronized 的漫长而艰苦的斗争”)。

Java8需要一个类似于Collection 却没有迭代器的接口,于是就有了Stream!

下面的图说明了流(内部迭代)与集合(外部迭代)之间的差异。

流(内部迭代)与集合(外部迭代)之间的差异

我们已经了解过了集合与流在概念上的差异,特别是利用内部迭代:替你把迭代做了。

但是,只有你已经预先定义好了能够隐藏迭代的操作集合。例如filter或map,这个才有用。

大多数这类操作都接受Lambda表达式作为参数,因此我们可以用前面所了解的知识来参数化其行为。

流操作

可以连接起来的流操作称为中间操作,比如filter,map,sorted,limit等。关闭流量操作,则称为终端操作,比如collect,count。

中间操作会返回一个流,这样多个操作可以连接起来形成一个查询,重要的是,除非流水线上触发了一个终端操作,否则中间操作不会执行任何处理,因为他们都很懒。

中间操作一般都可以合并起来,在终端操作时一次性全部处理。我们把这种技术叫做循环合并,作者在这里给了一个例子可以比较清楚地看出确实是有循环合并。

List<String> names = menu.stream()
                // 中间操作
                .filter(d -> d.getCalories() > 300)
                // 中间操作
                .map(Dish::getName)
                // 中间操作
                .limit(3)
                // 将Stream转为List
                .collect(toList());

中间操作

诸如filter和sorted等中间操作会返回一个流。让多个操作可以连接起来形成一个查询。

重要的是,除非流水线上触发一个终端操作,否则中间操作不会执行任何处理它们懒得很。这就是因为中间操作一般都可以合并起来,在终端操作时一次性全部处理。

为了搞清楚流水线到底发生了什么,我们把代码改一改,让每个Lambda都打印出当前处理的菜肴(就像很多演示和调试技巧一样,这种编程风格要是搁在生产代码里那就吓死人了,但是学习的时候却可以直接看清楚求值的顺序):

List<String>  names = menu.stream()
        .filter(d -> {
            System.out.println("filtering:" + d.getName());
            return d.getCalories() > 300;
        })
        .map(dish -> {
            System.out.println("mapping:" + dish.getName());
            return dish.getName();
        })
        .limit(3)
        .collect(toList());
System.out.println(names);
  • 执行结果:
filtering:pork
mapping:pork
filtering:beef
mapping:beef
filtering:chicken
mapping:chicken
[pork, beef, chicken]

从上面的打印结果,我们可以发现有好几种优化利用了流的延迟性质。

第一,尽管有很多热量都高于300卡路里,但是只会选择前三个!

因为limit操作和一种称为短路的技巧,

第二,尽管filter和map是两个独立的操作,但是它们合并到同一次便利中了(我们把这种技术叫做循环合并)。

终端操作

终端操作会从流的流水线生产结果。其结果是任何不是流的值,比如List、Integer,甚至是void。

例如,在下面的流水线中,foreachh返回的是一个void的终端操作,它对源中的每道菜应用一个Lambda。

把System.out.println()传递给foreach,并要求它打印出由menu生成的流中每一个Dish:

menu.stream().forEach(System.out::println);

为了检验一下对终端操作已经中间操作的理解,下面我们一起来看看一个例子:

下面哪些是中间操作哪些是终端操作?

long count = menu.stream()
            .filter(d -> d.getCalories() > 300)
            .distinct()
            .limit(3)
            .count();

答案:流水线中最后一个操作是count,它会返回一个long,这是一个非Stream的值。

因此,它是终端操作。

#使用流

总而言之,流的使用一般包括三件事:

一个数据源(比如集合)来执行查询 一个中间操作链,形成一条流的流水线 一个终端操作,执行流水线,并能生成结果。 流的流水线背后的理念类似于构建器模式。 在构建器模式中有一个调用链用来设置一套配置(对流来说这就是一个中间操作链),接着是调用built方法(对流来说就是终端操作)。其实,我们目前所看的Stream的例子用到的方法并不是它的全部,还有一些其他的一些操作。

在本章中,我们所接触到的一些中间操作与终端操作:

java.util.stream.Stream中的Stream接口定义了许多操作,可以分为两大类:中间操作和终端操作。

操作 类型 返回 参数 描述
filter 中间 Stream Predicate T -> boolean
map 中间 Stream Function<T,R> T -> R
limit 中间 Stream    
sorted 中间 Stream Comparator (T,T) -> int
distint 中间 Stream    
forEach 终端      
count 终端      
collect 终端      

诸如filter或sorted等中间操作会返回另一个流。这让多个操作可以连接起来形成一个查询。

终端操作会从流的流水线生成结果。

流的使用

总而言之,流的使用一般包括三件事:

  1. 一个数据源来执行一个查询;

  2. 一个中间操作链形成一条流水线;

  3. 一个终端操作执行流水线,并生成结果;

参考资料

《java8 实战》 P82

Java8实战

Java8实战 — 引入流

Java8中的并行流

《Java8实战》-第四章读书笔记(引入流Stream)

https://www.jianshu.com/p/6fab3047c7e7

  • 主要参考

【Java8实战】开始使用流

JDK8 实战系列