Pipelines and Blocks – Streaming inputs and outputs
Problem Statement
You want to define a block that works on a stream of data.
Solution
In the @inputs decorator class, use the series attribute. Razor SDK treats a series input as a queue. There are two ways to retrieve values from this queue:
- by treating the input as an iterator
- by using the .get method to take one value at a time
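As a rough illustration of these two access styles, the sketch below uses Python's standard queue.Queue as a stand-in for a series input. This is an assumption made for illustration only: the Razor SDK series object is not queue.Queue, and both the `fill` helper and the `DONE` end-of-stream marker are hypothetical names that are not part of the SDK.

```python
import queue

DONE = object()  # hypothetical end-of-stream marker, not part of Razor SDK


def fill(values):
    """Build a stand-in queue holding the values followed by the marker."""
    q = queue.Queue()
    for value in values:
        q.put(value)
    q.put(DONE)
    return q


# Style 1: treat the input as an iterator and consume it in order.
texts = fill(['a/b', 'c/d'])
iterated = [text.split('/') for text in iter(texts.get, DONE)]

# Style 2: take one value at a time with an explicit get call.
texts = fill(['a/b', 'c/d'])
first = texts.get()
second = texts.get()

print(iterated)
print(first, second)
```

The iterator style suits blocks that handle every value in arrival order, while explicit get calls leave the block in control of when the next value is taken.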
The following example shows you how to rewrite the block from the previous recipe to one that accepts a stream of strings as input:
-
@inputs.series.generic(name='texts', doc='A string of text to split')
@inputs.atomic.generic(name='delimiter', doc='A single character or a sequence of characters')
@outputs.series.generic(name='data', doc='Results of the split operation')
class SplitStringSeries(Block):
    def run(self, texts, delimiter, data):
        # iterate over the series input, one string at a time
        for text in texts:
            res = text.split(delimiter)
            # put each result onto the series output
            data.put(res)
In the above example, each result is placed on the series output with data.put rather than returned as a single value, so the block streams its output as well. Stream blocks are usually used in pipelines, where blocks perform a series of operations one after another. When you run a single block with the .execute method, however, a series input can be provided as any iterable object, such as a list, tuple, or generator.
-
# inputs can also be specified as class parameters during instantiation
split_string_series = (
    SplitStringSeries()
    .delimiter('/')
    .texts(['1/1/2020', '3/1/2020', '23/1/2020'])
)
result = split_string_series.execute()
list(result['data'].values())
-
# using a generator expression to define a date generator
dates = (f'{i}/1/2000' for i in range(3))
result = split_string_series.texts(dates).execute()
list(result['data'].values())
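To check what these two runs should produce without the SDK, you can mirror the split logic in plain Python. The function `split_stream` below is a hypothetical helper, not part of Razor SDK. It assumes only that the block applies str.split to each item and keeps the input order.

```python
from typing import Iterable, Iterator, List


def split_stream(texts: Iterable[str], delimiter: str) -> Iterator[List[str]]:
    """Yield the split form of each text as soon as it is read."""
    for text in texts:
        yield text.split(delimiter)


# Same inputs as the list-based run above.
from_list = list(split_stream(['1/1/2020', '3/1/2020', '23/1/2020'], '/'))
print(from_list)
# [['1', '1', '2020'], ['3', '1', '2020'], ['23', '1', '2020']]

# Same inputs as the generator-based run above.
dates = (f'{i}/1/2000' for i in range(3))
from_generator = list(split_stream(dates, '/'))
print(from_generator)
# [['0', '1', '2000'], ['1', '1', '2000'], ['2', '1', '2000']]
```

Because `split_stream` is itself a generator, nothing is read from `dates` until the results are consumed, which is the same lazy behavior a streamed input relies on.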
The .get method is typically used when you don't need to access the values sequentially, for example in blocks that process values in parallel. Upcoming recipes describe such blocks in more detail.
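Until then, the idea of several consumers pulling from one stream can be sketched with the standard library alone. This is not Razor SDK code: queue.Queue stands in for the series input, and `worker`, `split_in_parallel`, and the `DONE` marker are hypothetical names chosen for illustration.

```python
import queue
import threading

DONE = object()  # hypothetical end-of-stream marker, not part of Razor SDK


def worker(texts, delimiter, results):
    """Pull values with get until the marker arrives, then stop."""
    while True:
        text = texts.get()
        if text is DONE:
            break
        results.put(text.split(delimiter))


def split_in_parallel(values, delimiter, n_workers=2):
    texts, results = queue.Queue(), queue.Queue()
    threads = [
        threading.Thread(target=worker, args=(texts, delimiter, results))
        for _ in range(n_workers)
    ]
    for thread in threads:
        thread.start()
    for value in values:
        texts.put(value)
    for _ in threads:
        texts.put(DONE)  # one marker per worker so each one stops
    for thread in threads:
        thread.join()
    # Completion order is not guaranteed, so sort for a stable view.
    return sorted(results.get() for _ in range(results.qsize()))


print(split_in_parallel(['1/1/2020', '3/1/2020', '23/1/2020'], '/'))
# [['1', '1', '2020'], ['23', '1', '2020'], ['3', '1', '2020']]
```

Each worker takes whichever value is available next, so no worker sees the whole stream in order. That is why explicit get calls fit this pattern better than iteration.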