Gunnar Morling

Gunnar Morling

Random Musings on All Things Software Engineering

Gunnar Morling

Gunnar Morling

Random Musings on All Things Software Engineering

A Zipping Gatherer

Posted at Mar 18, 2024

The other day, I was looking for means of zipping two Java streams: connecting them element by element—​essentially a join based on stream offset position—​and emitting an output stream with the results. Unfortunately, there is no zip() method offered by the Java Streams API itself. While it was considered for inclusion in early preview versions, the method was removed before the API went GA with Java 8 and you have to resort to 3rd party libraries such as Google Guava if you need this functionality.

Java 22, scheduled for release later this week, promises to improve the situation here. It introduces a preview API for so-called stream gatherers (JEP 461). Similar to how collectors allow you to implement custom terminal operations on Java streams, gatherers let you add custom intermediary operations to a stream pipeline, providing an extension point for adding stream operations such as distinct() or window(), without having to bake them into the API itself. This sounds pretty much like what we need for a zip() method, doesn’t it?

So I spent some time studying the JEP and here’s the basic implementation I came up with:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
public record ObjToObjZipper<T1, T2, R>(
    Stream<T2> other,
    BiFunction<T1, T2, R> zipperFunction) (1)
    implements Gatherer<T1, Iterator<T2>, R> { (2)

  @Override
  public Supplier<Iterator<T2>> initializer() { (3)
    return () -> other.iterator();
  }

  @Override
  public Integrator<Iterator<T2>, T1, R> integrator() { (4)
    return Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
      if (state.hasNext()) {
        return downstream.push(zipperFunction.apply(element, state.next()));
      }

      return false;
    });
  }
}
1 This gatherer takes the stream to zip with and a function, which is applied to pairs of elements of the two streams and returns the zipped result
2 Gatherer has three type parameters: the element type of the stream the gatherer is applied to, a type for keeping track of intermediary state (in our case, that’s just the iterator of the second stream), and the output type
3 initializer() returns a supplier of the state tracking type, if needed
4 integrator() returns a function which "integrates provided elements, potentially using the provided intermediate state, optionally producing output to the provided Downstream"

It’s the first time I have been using this API, so I hope I haven’t done anything too stupid :) The key part of the gatherer is its Integrator implementation. This is where for each element of the stream the gatherer is applied to, we take the corresponding element of the given second stream, apply the given function, and emit the function’s return value to the next stage in the stream pipeline.

This particular implementation stops emitting elements as soon as one of the two streams has been exhausted, but of course you also could have an implementation with "left join" semantics, or similar. With some more glue code for instantiating this zipping gatherer "builder style" (you can find the complete source code on GitHub), this is how it can be used:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
@Test
public void canZipTwoObjectStreams() {
  List<String> letters = List.of("a", "b", "c", "d", "e");
  Stream<Integer> numbers = IntStream.range(0, letters.size())
      .mapToObj(i -> i);

  List<String> zipped = letters.stream()
      .gather(zip(numbers).with((letter, i) -> i + "-" + letter)) (1)
      .collect(Collectors.toList());

  assertThat(zipped).containsExactly("0-a", "1-b", "2-c", "3-d", "4-e");
}
1 gather() applies the given gatherer to each element of the stream

Et voilà, we have a zip() function which can be used with Java Streams, and short of having a zip() method directly on the Stream interface itself, the resulting code reads quite nicely. In order to avoid the boxing of the int stream, I’ve also built an ObjToIntZipper:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
@Test
public void canZipObjectWithIntStream() {
  List<String> letters = List.of("a", "b", "c", "d", "e");
  IntStream numbers = IntStream.range(0, letters.size());

  List<String> zipped = letters.stream()
      .gather(zip(numbers).with((letter, i) -> i + "-" + letter))
      .collect(Collectors.toList());

  assertThat(zipped).containsExactly("0-a", "1-b", "2-c", "3-d", "4-e");
}

Usually I am cautious of types with three or more type arguments, as it easily leads to APIs which are cumbersome to use. But the Gatherer API actually felt quite intuitive to me after just a little while.

The only real downside is that this gatherer cannot be parallelized. While the API itself allows for the creation of parallel-ready gatherers (by implementing the optional combiner()) method, you don’t have a handle to the second stream’s spliterator of a particular subdivision step from within a gatherer implementation. The only way for doing this is on the spliterator level, as shown by Jose Paumard in here. Note that both input streams must have the same length in order for this to work, as otherwise you’d end up zipping elements at different positions in the two input streams.

You can find the complete source code of the proof-of-concept zipping gatherer in this GitHub repository.