Pipelines and Blocks – Pipeline Basics
Problem
You want to chain blocks together so that the output of one block is fed into another.
Solution
Up until now, each block was run individually using its .execute method. While it is possible to manually collect the output from one block and pass it as an input parameter to the next, you need to create a pipeline to take full advantage of connecting blocks together.
A Pipeline is essentially a DAG (Directed Acyclic Graph) of blocks in which data flows in only one direction.
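Because a pipeline is a DAG, its blocks can always be ordered so that every block runs after the blocks it reads from, and any connection that would create a cycle can be rejected. The sketch below illustrates both properties with Python's standard graphlib module (Python 3.9+). It is not part of razor; the block names are plain strings chosen to mirror the example blocks in this section.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Set

# Each key lists the blocks it reads from (its upstream blocks).
dag: Dict[str, Set[str]] = {
    "MultiplyByFactor": {"GenerateNumbers"},
    "SumListElements": {"MultiplyByFactor"},
}


def run_order(graph: Dict[str, Set[str]]) -> List[str]:
    """Return an order in which every block comes after all of its upstream blocks."""
    return list(TopologicalSorter(graph).static_order())


def is_acyclic(graph: Dict[str, Set[str]]) -> bool:
    """A pipeline graph is only valid if it contains no cycle."""
    try:
        TopologicalSorter(graph).prepare()
    except CycleError:
        return False
    return True


print(run_order(dag))  # ['GenerateNumbers', 'MultiplyByFactor', 'SumListElements']
# Feeding SumListElements back into GenerateNumbers would create a cycle:
print(is_acyclic({**dag, "GenerateNumbers": {"SumListElements"}}))  # False
```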
The following blocks will be chained together to form an example pipeline:
from razor.blocks import Block, inputs, outputs
from razor.blocks import SocketTransport
from razor.blocks import ThreadExecutor, ProcessExecutor
from razor.pipeline import Pipeline
import random


# Emits a series of 99 random integers between 1 and 1000.
@outputs.series.generic('number_list')
class GenerateNumbers(Block):
    def run(self, number_list):
        for _ in range(1, 100):
            n = random.randint(1, 1000)
            number_list.put(n)


# Multiplies every number in the incoming series by a single
# factor value (declared as an atomic input).
@inputs.series.generic('number_list')
@inputs.atomic.generic('factor')
@outputs.series.generic('factor_list')
class MultiplyByFactor(Block):
    def run(self, number_list, factor, factor_list):
        for number in number_list:
            res = number * factor
            factor_list.put(res)


# Consumes the multiplied series and prints its total.
@inputs.series.generic('factor_list')
class SumListElements(Block):
    def run(self, factor_list):
        factor_sum = 0
        for factor in factor_list:
            factor_sum += factor
        print(factor_sum)
To create a pipeline, the output of one block is chained as an input to another block. In this example, the output of GenerateNumbers is passed as an input to MultiplyByFactor, and the output of MultiplyByFactor is passed as an input to SumListElements.
A specific output that needs to be passed to the next block is accessed as an attribute of the previous block's object. For example, the number list passed to the MultiplyByFactor block is accessed as generate_nums.number_list, where generate_nums is the GenerateNumbers block object.
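Conceptually, chaining blocks resembles chaining Python generators: each stage consumes the series produced by the stage before it. The framework-independent sketch below mirrors the three example blocks with plain functions. The function names, the seed parameter and the factor value 3 are illustrative assumptions, and razor's own blocks move data through transports rather than generators.

```python
import random
from typing import Iterable, Iterator, Optional


def generate_numbers(count: int = 99, seed: Optional[int] = None) -> Iterator[int]:
    """Like GenerateNumbers: yield `count` random integers between 1 and 1000."""
    rng = random.Random(seed)
    for _ in range(count):
        yield rng.randint(1, 1000)


def multiply_by_factor(number_list: Iterable[int], factor: int) -> Iterator[int]:
    """Like MultiplyByFactor: consume one series and yield a new one."""
    for number in number_list:
        yield number * factor


def sum_list_elements(factor_list: Iterable[int]) -> int:
    """Like SumListElements: reduce the incoming series to a single total."""
    return sum(factor_list)


# The output of each stage is passed in as the input of the next one.
numbers = generate_numbers(seed=42)
print(sum_list_elements(multiply_by_factor(numbers, factor=3)))
```

The default count of 99 matches the range(1, 100) loop in GenerateNumbers.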
Transport and Executor
The transport defines the mechanism through which data is delivered to the next block. In this example, SocketTransport is used to carry the output variables from one block to the next.
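The internals of SocketTransport are not described in this article. As a rough, hypothetical illustration of moving a series between two blocks over a socket, the sketch below connects a producer thread and a consumer with socket.socketpair() and sends one integer per line. The newline-delimited format and the helper names are assumptions made for this example only.

```python
import socket
import threading
from typing import Iterable, List


def send_series(sock: socket.socket, values: Iterable[int]) -> None:
    """Producer side: write each value as one newline-terminated line."""
    with sock, sock.makefile("w") as wfile:
        for value in values:
            wfile.write(f"{value}\n")
    # Closing the socket signals end-of-series (EOF) to the reader.


def receive_series(sock: socket.socket) -> List[int]:
    """Consumer side: read lines until EOF and parse each one back into an int."""
    with sock, sock.makefile("r") as rfile:
        return [int(line) for line in rfile]


def transport(values: Iterable[int]) -> List[int]:
    """Send `values` through a connected socket pair and return what arrives."""
    producer_end, consumer_end = socket.socketpair()
    sender = threading.Thread(target=send_series, args=(producer_end, values))
    sender.start()
    received = receive_series(consumer_end)
    sender.join()
    return received


print(transport([5, 10, 15]))  # [5, 10, 15]
```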
The executor defines where a block is executed. In this example, ThreadExecutor is used, so the blocks execute within a thread.
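To show what running blocks in threads involves, the sketch below runs the three stages of the example in separate threading.Thread instances connected by queue.Queue objects, with a sentinel object marking the end of each series. It is a hypothetical stand-in written for illustration (Python 3.8+ for the := operator), not the implementation of ThreadExecutor.

```python
import queue
import threading
from typing import Iterable, List

_END = object()  # sentinel that marks the end of a series


def run_in_threads(source: Iterable[int], factor: int) -> int:
    """Run generate -> multiply -> sum, each stage in its own thread."""
    numbers: queue.Queue = queue.Queue()   # GenerateNumbers -> MultiplyByFactor
    products: queue.Queue = queue.Queue()  # MultiplyByFactor -> SumListElements
    result: List[int] = []

    def generate() -> None:
        for n in source:
            numbers.put(n)
        numbers.put(_END)

    def multiply() -> None:
        while (item := numbers.get()) is not _END:
            products.put(item * factor)
        products.put(_END)

    def total() -> None:
        acc = 0
        while (item := products.get()) is not _END:
            acc += item
        result.append(acc)

    threads = [threading.Thread(target=stage) for stage in (generate, multiply, total)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait for every stage to finish
    return result[0]


print(run_in_threads([1, 2, 3], factor=3))  # 18
```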
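Once the blocks are connected, the pipeline is created by passing the last block of the chain, sum_list_elements, in the targets list: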
pipeline = Pipeline(targets=[sum_list_elements])
The Pipeline class has a show() method, which you can use to visualize the DAG structure of the pipeline:
pipeline.show()
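The output format of show() is not specified here. If you want a comparable view of a DAG outside razor, one option, sketched below as an assumption rather than a description of razor's behavior, is to write the graph in Graphviz DOT syntax and label each edge with the data that flows along it. The edge list is hand-written to match the example pipeline.

```python
from typing import Iterable, Tuple

Edge = Tuple[str, str, str]  # (upstream block, downstream block, data name)


def to_dot(edges: Iterable[Edge]) -> str:
    """Render a list of labelled edges as a Graphviz DOT digraph."""
    lines = ["digraph pipeline {"]
    for src, dst, label in edges:
        lines.append(f'    "{src}" -> "{dst}" [label="{label}"];')
    lines.append("}")
    return "\n".join(lines)


pipeline_edges = [
    ("GenerateNumbers", "MultiplyByFactor", "number_list"),
    ("MultiplyByFactor", "SumListElements", "factor_list"),
]
print(to_dot(pipeline_edges))
```

The printed text can be rendered with Graphviz, for example with `dot -Tpng`.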
To run the pipeline, call the execute() method of the Pipeline class:
pipeline.execute()
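In the example, only the final block, sum_list_elements, is passed in targets. A pipeline can discover the rest of the DAG from such targets by walking upstream, and it can then execute the blocks in dependency order. The hypothetical MiniBlock and MiniPipeline classes below sketch that idea. They are not razor classes, they pass whole lists rather than streaming series, and they assume the graph is acyclic.

```python
from typing import Any, Callable, Dict, List, Set


class MiniBlock:
    """Hypothetical stand-in for a block: a function plus the blocks it reads from."""

    def __init__(self, name: str, fn: Callable[..., Any], *upstream: "MiniBlock") -> None:
        self.name = name
        self.fn = fn
        self.upstream = upstream


class MiniPipeline:
    """Hypothetical stand-in for Pipeline: finds every block by walking upstream from the targets."""

    def __init__(self, targets: List[MiniBlock]) -> None:
        self.order: List[MiniBlock] = []
        seen: Set[MiniBlock] = set()

        def visit(block: MiniBlock) -> None:
            if block in seen:
                return  # shared upstream blocks are scheduled only once
            seen.add(block)
            for up in block.upstream:
                visit(up)
            self.order.append(block)  # post-order: upstream blocks come first

        for target in targets:
            visit(target)

    def execute(self) -> Dict[str, Any]:
        """Run blocks in dependency order, feeding each one its upstream results."""
        results: Dict[str, Any] = {}
        for block in self.order:
            args = [results[up.name] for up in block.upstream]
            results[block.name] = block.fn(*args)
        return results


# Usage: the same three-stage chain as the razor example, with fixed numbers.
gen = MiniBlock("GenerateNumbers", lambda: [1, 2, 3])
mul = MiniBlock("MultiplyByFactor", lambda nums: [n * 3 for n in nums], gen)
total = MiniBlock("SumListElements", sum, mul)

pipeline = MiniPipeline(targets=[total])
print([b.name for b in pipeline.order])  # ['GenerateNumbers', 'MultiplyByFactor', 'SumListElements']
print(pipeline.execute()["SumListElements"])  # 18
```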