Skip to content

StepRunner

This is a description of the StepRunner class. It is intended to be used as a container and runner for all ETL DataRailsStep child classes. Steps are run in the order they appear in the list given to the runner. The constructor takes a list of step class definitions. If no DataBox is supplied, one is created automatically in the constructor; the runner's DataBox is passed to each step as the execution progresses.

To use this class

from datarails.runner import DataRailsStepRunner

This class orchestrates the execution of a sequence of DataRailsSteps. It primarily uses DataBox as input, advancing through the steps and managing the DataBox state.

Attributes:

Name Type Description
steps List[Type[DataRailsStep]]

The ordered list of step classes to be executed.

dbx DataBox

The DataBox object for data management.

ctx DataRailsContext

The DataRailsContext object for context management.

Source code in datarails/runner.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class DataRailsStepRunner:
    """
    Orchestrates the execution of an ordered sequence of DataRailsStep classes.

    The runner keeps a position index into ``steps``. Each time a step is executed it is
    instantiated with the runner's current DataBox and context, and the value returned by the
    step's ``run()`` becomes the runner's new DataBox.

    Attributes:
        steps (List[Type[DataRailsStep]]): The ordered list of step classes to be executed.
        dbx (DataBox): The DataBox object for data management.
        ctx (DataRailsContext): The DataRailsContext object for context management.
    """

    def __init__(
        self, steps: List[Type[DataRailsStep]], dbx: Optional[DataBox] = None, ctx: Optional[DataRailsContext] = None
    ) -> None:
        """
        Initializes the StepRunner object with a list of DataRailsStep classes.

        Args:
            steps (List[Type[DataRailsStep]]): The list of step classes to be executed.
            dbx (DataBox, optional): The DataBox object. If not provided, a new one is created.
            ctx (DataRailsContext, optional): The DataRailsContext object. If not provided, a new one is created.
        """
        self.steps = steps
        # Compare against None explicitly: a caller-supplied object that evaluates as falsy
        # (for example an empty container-like DataBox) must be kept, not silently replaced.
        self.dbx = dbx if dbx is not None else DataBox()
        self.ctx = ctx if ctx is not None else DataRailsContext()
        self._i = 0

    def reset(self) -> None:
        """
        Resets the step index to zero, enabling the step sequence to be run again from the start.

        Only the index is rewound; the runner's DataBox and context are left as they are.
        """
        self._i = 0

    def _i_in_bounds(self) -> bool:
        """
        Checks if the current index is within the bounds of the steps list.

        Returns:
            bool: True if the index is in bounds, False otherwise.
        """
        return 0 <= self._i < len(self.steps)

    def get_current_step(self) -> DataRailsStep:
        """
        Creates an instance of the step class at the current index.

        A new instance is constructed on every call, using the runner's current DataBox and context.

        Returns:
            DataRailsStep: The current step instance.

        Raises:
            IndexError: If the index is past the end of the steps list (e.g. after all steps have run).
        """
        return self.steps[self._i](self.dbx, self.ctx)

    def print_current_step(self) -> None:
        """
        Prints the details of the current step to standard output.

        The step is instantiated in order to be printed, so this raises IndexError
        once every step has been executed.
        """
        step_instance = self.get_current_step()
        print(f"The Current Step : {self._i} : {step_instance}")

    def advance(self) -> None:
        """
        Executes the current step in the steps list and advances the index to the next one.
        Prints a message if there are no more steps to execute.
        """
        if self._i_in_bounds():
            current_step_instance = self.get_current_step()
            print(f"Running step: {self._i} : {current_step_instance}")
            # The value returned by the step replaces the runner's DataBox for the next step.
            self.dbx = current_step_instance.run()
            self._i += 1
        else:
            print("All Steps Completed.")

    def run(self) -> None:
        """
        Executes all remaining steps, starting from the current index, until the list is exhausted.
        """
        while self._i_in_bounds():
            self.advance()

__init__(steps, dbx=None, ctx=None)

Initializes the StepRunner object with a list of DataRailsStep classes.

Parameters:

Name Type Description Default
steps List[Type[DataRailsStep]]

The list of step classes to be executed.

required
dbx DataBox

The DataBox object. If not provided, a new one is created.

None
ctx DataRailsContext

The DataRailsContext object. If not provided, a new one is created.

None
Source code in datarails/runner.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def __init__(
    self, steps: List[Type[DataRailsStep]], dbx: Optional[DataBox] = None, ctx: Optional[DataRailsContext] = None
) -> None:
    """
    Initializes the StepRunner object with a list of DataRailsStep classes.

    Args:
        steps (List[Type[DataRailsStep]]): The list of step classes to be executed.
        dbx (DataBox, optional): The DataBox object. If not provided, a new one is created.
        ctx (DataRailsContext, optional): The DataRailsContext object. If not provided, a new one is created.
    """
    self.steps = steps
    # Compare against None explicitly: a caller-supplied object that evaluates as falsy
    # (for example an empty container-like DataBox) must be kept, not silently replaced.
    self.dbx = dbx if dbx is not None else DataBox()
    self.ctx = ctx if ctx is not None else DataRailsContext()
    self._i = 0

advance()

Executes the current step in the steps list and advances the index to the next one. Prints a message if there are no more steps to execute.

Source code in datarails/runner.py
65
66
67
68
69
70
71
72
73
74
75
76
def advance(self) -> None:
    """
    Runs the step at the current position and moves the position forward by one.

    When the position is already past the last step, nothing is executed and a
    completion message is printed instead.
    """
    if not self._i_in_bounds():
        print("All Steps Completed.")
        return

    step = self.get_current_step()
    print(f"Running step: {self._i} : {step}")
    # The value returned by the step becomes the runner's DataBox.
    self.dbx = step.run()
    self._i += 1

get_current_step()

Retrieves the current step instance based on the index.

Returns:

Name Type Description
DataRailsStep

The current step instance.

Source code in datarails/runner.py
49
50
51
52
53
54
55
56
def get_current_step(self):
    """
    Builds an instance of the step class found at the runner's current index.

    Returns:
        DataRailsStep: A step instance constructed with the runner's DataBox and context.
    """
    step_cls = self.steps[self._i]
    return step_cls(self.dbx, self.ctx)

print_current_step()

Prints the details of the current step to standard output.

Source code in datarails/runner.py
58
59
60
61
62
63
def print_current_step(self) -> None:
    """
    Writes the current index and the current step instance to standard output.
    """
    current = self.get_current_step()
    message = f"The Current Step : {self._i} : {current}"
    print(message)

reset()

Resets the step index to its initial state (zero), enabling the step sequence to be run again from the start.

Source code in datarails/runner.py
34
35
36
37
38
def reset(self) -> None:
    """
    Rewinds the step index to zero so the step sequence can be run again from the start.
    """
    self._i = 0

run()

Executes all steps in the steps list. If all steps have been completed, it stops the execution.

Source code in datarails/runner.py
78
79
80
81
82
83
def run(self) -> None:
    """
    Keeps advancing through the steps list until the index moves out of bounds.
    """
    while True:
        if not self._i_in_bounds():
            break
        self.advance()