Skip to content

prefect.client.schemas

State

Bases: schemas.states.State.subclass(exclude_fields=[data]), Generic[R]

The state of a run.

Source code in prefect/client/schemas.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class State(schemas.states.State.subclass(exclude_fields=["data"]), Generic[R]):
    """
    The state of a run.
    """

    # `data` is excluded from the base schema (see `exclude_fields` above) and
    # re-declared here so it may hold a result object, a data document, or any
    # other value.
    data: Union["BaseResult[R]", "DataDocument[R]", Any] = Field(
        default=None,
    )

    # The overloads mirror the implementation's full signature, including
    # `fetch`, so that type checkers accept `state.result(fetch=...)`.
    # NOTE(review): both overloads take a plain `bool`, so a type checker will
    # always pick the first one. Telling them apart needs `Literal[True]` /
    # `Literal[False]`, which requires an import outside this block.
    @overload
    def result(
        self: "State[R]",
        raise_on_failure: bool = True,
        fetch: Optional[bool] = None,
    ) -> R:
        ...

    @overload
    def result(
        self: "State[R]",
        raise_on_failure: bool = False,
        fetch: Optional[bool] = None,
    ) -> Union[R, Exception]:
        ...

    def result(self, raise_on_failure: bool = True, fetch: Optional[bool] = None):
        """
        Retrieve the result attached to this state.

        Args:
            raise_on_failure: a boolean specifying whether to raise an exception
                if the state is of type `FAILED` and the underlying data is an exception
            fetch: a boolean specifying whether to resolve references to persisted
                results into data. For synchronous users, this defaults to `True`.
                For asynchronous users, this defaults to `False` for backwards
                compatibility.

        Raises:
            TypeError: If the state is failed but the result is not an exception.

        Returns:
            The result of the run

        Examples:
            >>> from prefect import flow, task
            >>> @task
            ... def my_task(x):
            ...     return x

            Get the result from a task future in a flow

            >>> @flow
            ... def my_flow():
            ...     future = my_task("hello")
            ...     state = future.wait()
            ...     result = state.result()
            ...     print(result)
            >>> my_flow()
            hello

            Get the result from a flow state

            >>> @flow
            ... def my_flow():
            ...     return "hello"
            >>> my_flow(return_state=True).result()
            hello

            Get the result from a failed state

            >>> @flow
            ... def my_flow():
            ...     raise ValueError("oh no!")
            >>> state = my_flow(return_state=True)  # Error is wrapped in FAILED state
            >>> state.result()  # Raises `ValueError`

            Get the result from a failed state without erroring

            >>> @flow
            ... def my_flow():
            ...     raise ValueError("oh no!")
            >>> state = my_flow(return_state=True)
            >>> result = state.result(raise_on_failure=False)
            >>> print(result)
            ValueError("oh no!")

            Get the result from a flow state in an async context

            >>> @flow
            ... async def my_flow():
            ...     return "hello"
            >>> state = await my_flow(return_state=True)
            >>> await state.result()
            hello
        """
        # Imported inside the method rather than at module level
        # (presumably to avoid a circular import -- TODO confirm).
        from prefect.states import get_state_result

        return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)

    def to_state_create(self) -> schemas.actions.StateCreate:
        """
        Convert this state to a `StateCreate` type which can be used to set the state of
        a run in the API.

        This method will drop this state's `data` if it is not a result type. Only
        results should be sent to the API. Other data is only available locally.

        Returns:
            A `StateCreate` carrying this state's `type`, `name`, `message` and
            `state_details`, with `data` set only when it is a `BaseResult`.
        """
        from prefect.results import BaseResult

        return schemas.actions.StateCreate(
            type=self.type,
            name=self.name,
            message=self.message,
            # Anything that is not a result object is replaced with `None`.
            data=self.data if isinstance(self.data, BaseResult) else None,
            state_details=self.state_details,
        )

result

Retrieve the result attached to this state.

Parameters:

Name Type Description Default
raise_on_failure bool

a boolean specifying whether to raise an exception if the state is of type FAILED and the underlying data is an exception

True
fetch Optional[bool]

a boolean specifying whether to resolve references to persisted results into data. For synchronous users, this defaults to True. For asynchronous users, this defaults to False for backwards compatibility.

None

Raises:

Type Description
TypeError

If the state is failed but the result is not an exception.

Returns:

Type Description

The result of the run

Examples:

>>> from prefect import flow, task
>>> @task
>>> def my_task(x):
>>>     return x

Get the result from a task future in a flow

>>> @flow
>>> def my_flow():
>>>     future = my_task("hello")
>>>     state = future.wait()
>>>     result = state.result()
>>>     print(result)
>>> my_flow()
hello

Get the result from a flow state

>>> @flow
>>> def my_flow():
>>>     return "hello"
>>> my_flow(return_state=True).result()
hello

Get the result from a failed state

>>> @flow
>>> def my_flow():
>>>     raise ValueError("oh no!")
>>> state = my_flow(return_state=True)  # Error is wrapped in FAILED state
>>> state.result()  # Raises `ValueError`

Get the result from a failed state without erroring

>>> @flow
>>> def my_flow():
>>>     raise ValueError("oh no!")
>>> state = my_flow(return_state=True)
>>> result = state.result(raise_on_failure=False)
>>> print(result)
ValueError("oh no!")

Get the result from a flow state in an async context

>>> @flow
>>> async def my_flow():
>>>     return "hello"
>>> state = await my_flow(return_state=True)
>>> await state.result()
hello
Source code in prefect/client/schemas.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def result(self, raise_on_failure: bool = True, fetch: Optional[bool] = None):
    """
    Return the result carried by this state.

    The work is delegated to `prefect.states.get_state_result`, which receives
    this state together with both options unchanged.

    Args:
        raise_on_failure: when true, raise instead of returning if the state is
            of type `FAILED` and the underlying data is an exception
        fetch: whether references to persisted results should be resolved into
            data. Synchronous callers get `True` by default; asynchronous
            callers get `False` by default for backwards compatibility.

    Raises:
        TypeError: If the state is failed but the result is not an exception.

    Returns:
        The result of the run

    Examples:
        >>> from prefect import flow, task
        >>> @task
        ... def my_task(x):
        ...     return x

        Read a task's result inside a flow

        >>> @flow
        ... def my_flow():
        ...     future = my_task("hello")
        ...     state = future.wait()
        ...     result = state.result()
        ...     print(result)
        >>> my_flow()
        hello

        Read the result of a flow state

        >>> @flow
        ... def my_flow():
        ...     return "hello"
        >>> my_flow(return_state=True).result()
        hello

        Read the result of a failed state

        >>> @flow
        ... def my_flow():
        ...     raise ValueError("oh no!")
        >>> state = my_flow(return_state=True)  # Error is wrapped in FAILED state
        >>> state.result()  # Raises `ValueError`

        Read the result of a failed state without raising

        >>> @flow
        ... def my_flow():
        ...     raise ValueError("oh no!")
        >>> state = my_flow(return_state=True)
        >>> result = state.result(raise_on_failure=False)
        >>> print(result)
        ValueError("oh no!")

        Read the result of a flow state from async code

        >>> @flow
        ... async def my_flow():
        ...     return "hello"
        >>> state = await my_flow(return_state=True)
        >>> await state.result()
        hello
    """
    # Function-scope import, as in the rest of this module's methods.
    from prefect.states import get_state_result as _resolve_result

    options = {"raise_on_failure": raise_on_failure, "fetch": fetch}
    return _resolve_result(self, **options)

to_state_create

Convert this state to a StateCreate type which can be used to set the state of a run in the API.

This method will drop this state's data if it is not a result type. Only results should be sent to the API. Other data is only available locally.

Source code in prefect/client/schemas.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def to_state_create(self) -> schemas.actions.StateCreate:
    """
    Build the `StateCreate` object used to set the state of a run in the API.

    Only result objects are forwarded: when this state's `data` is not a
    `BaseResult`, the created object carries `data=None`, so other data remains
    available locally only.
    """
    from prefect.results import BaseResult

    # Decide up front what, if anything, may be sent as `data`.
    if isinstance(self.data, BaseResult):
        api_data = self.data
    else:
        api_data = None

    state_create = schemas.actions.StateCreate(
        type=self.type,
        name=self.name,
        message=self.message,
        data=api_data,
        state_details=self.state_details,
    )
    return state_create

Workspace

Bases: PrefectBaseModel

A Prefect Cloud workspace.

Expected payload for each workspace returned by the me/workspaces route.

Source code in prefect/client/schemas.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
class Workspace(PrefectBaseModel):
    """
    A Prefect Cloud workspace.

    Expected payload for each workspace returned by the `me/workspaces` route.
    """

    # Account the workspace belongs to.
    account_id: UUID = Field(
        ...,
        description="The account id of the workspace.",
    )
    account_name: str = Field(
        ...,
        description="The account name.",
    )
    account_handle: str = Field(
        ...,
        description="The account's unique handle.",
    )

    # The workspace itself.
    workspace_id: UUID = Field(
        ...,
        description="The workspace id.",
    )
    workspace_name: str = Field(
        ...,
        description="The workspace name.",
    )
    workspace_description: str = Field(
        ...,
        description="Description of the workspace.",
    )
    workspace_handle: str = Field(
        ...,
        description="The workspace's unique handle.",
    )

    class Config:
        # Keys in the payload that are not declared above are ignored.
        extra = "ignore"

    @property
    def handle(self) -> str:
        """
        The full workspace handle: the account handle and the workspace handle
        joined by a single `/`.
        """
        return "/".join((self.account_handle, self.workspace_handle))

    def api_url(self) -> str:
        """
        Build the API URL for this workspace from the configured Cloud API URL,
        the account id and the workspace id.
        """
        base_url = PREFECT_CLOUD_API_URL.value()
        return f"{base_url}/accounts/{self.account_id}/workspaces/{self.workspace_id}"

    def __hash__(self):
        # Hash on the full handle.
        full_handle = self.handle
        return hash(full_handle)

handle: str property

The full handle of the workspace, in the form account_handle/workspace_handle

api_url

Generate the API URL for accessing this workspace

Source code in prefect/client/schemas.py
165
166
167
168
169
170
171
172
173
def api_url(self) -> str:
    """
    Build the API URL for this workspace from the configured Cloud API URL,
    the account id and the workspace id.
    """
    base_url = PREFECT_CLOUD_API_URL.value()
    return f"{base_url}/accounts/{self.account_id}/workspaces/{self.workspace_id}"