Streaming APIs offer very useful tools for accessing data streams. Streaming allows JavaScript to access data streams received over a network and to process them on the user's demand. JavaScript represents readable streams through an underlying source, which comes in two types: pull sources and push sources.

Pull sources require the user to request stream data once a connection is made, e.g. accessing a file using XHR or Fetch.

Push sources constantly push data to the user once they are accessed. Unless the user cancels or pauses the stream, it continues to push data nonstop, e.g. Web/TCP sockets and video streams.

Stream data is read sequentially in small pieces known as chunks. A chunk can be of many different types and sizes.

The Process

As chunks are put into a stream, they form a queue waiting to be read, and the chunks that have not yet been read are tracked internally. The streamed data is passed through a reader, which processes it sequentially and gives the user a chance to act on it. The reader together with its processing code is known as the consumer. Helpfully, every reader has a controller that regulates the stream. It is important to understand that only one reader can be active on a given stream at a time; while it reads, the stream is said to be locked to that active reader. If you want a different reader to read the stream, you have two options: tee the stream or cancel the first reader.

We have said a lot about streams, but what exactly is a stream?

What Is a Stream?

A stream is a sequence of elements supporting different operations, passed through a pipeline to achieve a specific outcome; in brief, a piped flow of messages, or, even more simply, the content generated from a specific source. It is important to note that a stream does not alter its input data structure but only produces results from the pipelined data. Also, a stream is not a data structure itself; it only draws elements from its sources. A stream pipeline consists of a source, zero or more intermediate operations, and a terminal operation that develops the source elements into the desired result. Below is a code example:

// imports
import org.apache.storm.streams.Stream;
import org.apache.storm.streams.StreamBuilder;
import java.util.Arrays;
...
StreamBuilder builder = new StreamBuilder();
// a stream of sentences obtained from a source spout
Stream<String> sentences = builder.newStream(new RandomSentenceSpout()).map(tuple -> tuple.getString(0));
// a stream of words obtained by transforming (splitting) the stream of sentences
Stream<String> words = sentences.flatMap(s -> Arrays.asList(s.split(" ")));
// output operation that prints the words to the console
words.forEach(w -> System.out.println(w));

Any stream supports two kinds of operations: transformations and output operations.

Output operations: operations that produce a result, e.g. forEach in the code above.

Transformations: operations that produce another stream, e.g. flatMap in the code above.

A StreamBuilder has the builder APIs for creating a new stream. The builder tracks and manages all the operations in the stream pipeline.

StreamBuilder builder = new StreamBuilder();
Stream<Tuple> sentences = builder.newStream(new TestSentenceSpout());

The Streaming APIs support various operations, among them filtering, aggregating, joining, windowing, branching, outputting, and debugging.

The Basic Transformations

These include filtering, mapping, and windowing.

The filter operation returns a stream consisting of the elements that match the given Predicate.

Stream<String> logs = ...
Stream<String> errors = logs.filter(line -> line.contains("ERROR"));

Mapping has two variants: map and flatMap.

The map operation returns a stream consisting of the results of applying the mapping function to the values of the stream.

Stream<String> words = ...
Stream<Integer> wordLengths = words.map(String::length);

The flatMap operation returns a stream consisting of the results of replacing each value of the stream with the contents produced by applying the mapping function to it.

Stream<String> sentences = ...
Stream<String> words = sentences.flatMap(s -> Arrays.asList(s.split(" ")));

The window operation creates a stream whose elements are grouped according to the given window configuration; each emitted group contains the elements that fall within that window.

// window(Window<?, ?> windowConfig) returns a stream of elements grouped per window
Stream<T> windowedStream = stream.window(windowConfig);
// time based sliding window
stream.window(SlidingWindows.of(Duration.minutes(10), Duration.minutes(1)));
// count based sliding window
stream.window(SlidingWindows.of(Count.of(10), Count.of(2)));
// tumbling window
stream.window(TumblingWindows.of(Duration.seconds(10)));
// specifying a timestamp field for event time based processing and a late tuple stream
stream.window(TumblingWindows.of(Duration.seconds(10))
              .withTimestampField("ts")
              .withLateTupleStream("late_events"));

It is important to understand that this operation splits a continuous stream into finite subsets, which is what makes aggregations and joins possible on an unbounded stream.
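To make that connection concrete, here is a hedged sketch (not from the original examples) of an aggregation applied over a window; it assumes a stream of Long values and uses the Sum aggregator defined later in this article:

Stream<Long> values = ...
// sum the values arriving within each 10 second tumbling window,
// using the Sum CombinerAggregator defined further below
Stream<Long> windowedSums = values
        .window(TumblingWindows.of(Duration.seconds(10)))
        .aggregate(new Sum());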

Transforming to Key-Value Pairs

Here the stream values are transformed into key-value pairs using mapToPair and flatMapToPair; both are shown below.

Stream<Integer> integers = ... // 1, 2, 3, 4, ...
PairStream<Integer, Integer> squares = integers.mapToPair(x -> Pair.of(x, x*x)); // (1, 1), (2, 4), (3, 9), (4, 16), ...
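Similarly, flatMapToPair emits zero or more pairs per input value. The following is a rough sketch, not from the original article; the sentences stream and the word-splitting logic are assumptions for illustration:

Stream<String> sentences = ...
// emit a (word, 1) pair for every word in each sentence
PairStream<String, Integer> wordPairs = sentences.flatMapToPair(s -> {
    List<Pair<String, Integer>> pairs = new ArrayList<>();
    for (String w : s.split(" ")) {
        pairs.add(Pair.of(w, 1));
    }
    return pairs;
});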

Aggregating the values in a stream is done through aggregation operations that emit the aggregate results. Under this section, we have aggregate and reduce.

An aggregate operation performs a mutable reduction, accumulating values into a result as it processes them. A reduce operation, on the other hand, produces a single value by repeatedly combining two values with a given reducer. The Sum class below implements the CombinerAggregator interface used by aggregate:

public class Sum implements CombinerAggregator<Long, Long, Long> {
    // the initial value of the sum
    @Override
    public Long init() {
        return 0L;
    }

    // updates the sum by adding the value (this could be a partial sum)
    @Override
    public Long apply(Long aggregate, Long value) {
        return aggregate + value;
    }

    // merges the partial sums
    @Override
    public Long merge(Long accum1, Long accum2) {
        return accum1 + accum2;
    }

    // extracts the result from the accumulator (here the accumulator and the result are the same)
    @Override
    public Long result(Long accum) {
        return accum;
    }
}
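As a usage sketch (not part of the original example, with the values stream assumed), the aggregator above would be passed to aggregate, while reduce produces the same sum from a plain two-argument reducer:

Stream<Long> values = ...
// mutable reduction using the Sum aggregator defined above
Stream<Long> sums = values.aggregate(new Sum());
// the same sum expressed with reduce and a two-argument reducer
Stream<Long> reduced = values.reduce((x, y) -> x + y);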

Other key-based operations such as aggregateByKey, reduceByKey, groupByKey, countByKey, and repartition are also available; see the sketch below.
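For example, a per-key count (a word count; the words stream and the window size are assumptions for illustration) might look like this:

PairStream<String, Long> wordCounts = words
        .window(TumblingWindows.of(Duration.seconds(10))) // count within tumbling windows
        .mapToPair(w -> Pair.of(w, 1))                    // pair each word with a count of 1
        .countByKey();                                    // count the occurrences of each word
wordCounts.print();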

Finally, to push the transformed values of a stream out to external destinations such as databases, the console, or files, one performs output operations. The operations covered below include printing, peeking, branching, and joining.

Printing


// transforms words to uppercase and prints to the console
words.map(String::toUpperCase).print();

Peeking


builder.newStream(...).flatMap(s -> Arrays.asList(s.split(" ")))
    // print the results of the flatMap operation as the values flow across the stream
    .peek(s -> System.out.println(s))
    .mapToPair(w -> Pair.of(w, 1));

Branching


Stream<Integer>[] streams = builder.newStream(new RandomIntegerSpout(), new ValueMapper<Integer>(0))
    .branch(x -> (x % 2) == 0,
            x -> (x % 2) == 1);
Stream<Integer> evenNumbers = streams[0];
Stream<Integer> oddNumbers = streams[1];

Joining


PairStream<Long, Long> squares = ... // (1, 1), (2, 4), (3, 9) ...
PairStream<Long, Long> cubes = ... // (1, 1), (2, 8), (3, 27) ...
// join the squares and cubes streams to produce (1, [1, 1]), (2, [4, 8]), (3, [9, 27]) ...
PairStream<Long, Pair<Long, Long>> joined = squares.window(TumblingWindows.of(Duration.seconds(5))).join(cubes);

Conclusion

Streaming APIs allow the user to access streams of data over a network or from other sources. Streaming entails breaking data into small chunks and then processing them as required. In this article, we have shared useful information that should help you understand the processes behind streaming APIs.