Skip to content

prefect.server.utilities.schemas

Utilities for creating and working with Prefect REST API schemas.

IDBaseModel

Bases: PrefectBaseModel

A PrefectBaseModel with an auto-generated UUID ID value.

The ID is reset on copy() and not included in equality comparisons.

Source code in prefect/server/utilities/schemas.py
356
357
358
359
360
361
362
363
364
365
366
class IDBaseModel(PrefectBaseModel):
    """
    A PrefectBaseModel with an auto-generated UUID ID value.

    The ID is reset on copy() and not included in equality comparisons.
    """

    id: UUID = Field(default_factory=uuid4)

    def _reset_fields(self) -> Set[str]:
        return super()._reset_fields().union({"id"})

ORMBaseModel

Bases: IDBaseModel

A PrefectBaseModel with an auto-generated UUID ID value and created / updated timestamps, intended for compatibility with our standard ORM models.

The ID, created, and updated fields are reset on copy() and not included in equality comparisons.

Source code in prefect/server/utilities/schemas.py
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
class ORMBaseModel(IDBaseModel):
    """
    A PrefectBaseModel with an auto-generated UUID ID value and created /
    updated timestamps, intended for compatibility with our standard ORM models.

    The ID, created, and updated fields are reset on copy() and not included in
    equality comparisons.
    """

    class Config:
        orm_mode = True

    created: Optional[DateTimeTZ] = Field(default=None, repr=False)
    updated: Optional[DateTimeTZ] = Field(default=None, repr=False)

    def _reset_fields(self) -> Set[str]:
        return super()._reset_fields().union({"created", "updated"})

PrefectBaseModel

Bases: BaseModel

A base pydantic.BaseModel for all Prefect schemas and pydantic models.

As the basis for most Prefect schemas, this base model usually ignores extra fields that are passed to it at instantiation. Because adding new fields to API payloads is not considered a breaking change, this ensures that any Prefect client loading data from a server running a possibly-newer version of Prefect will be able to process those new fields gracefully. However, when PREFECT_TEST_MODE is on, extra fields are forbidden in order to catch subtle unintentional testing errors.

Source code in prefect/server/utilities/schemas.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
class PrefectBaseModel(BaseModel):
    """A base pydantic.BaseModel for all Prefect schemas and pydantic models.

    As the basis for most Prefect schemas, this base model usually ignores extra
    fields that are passed to it at instantiation. Because adding new fields to
    API payloads is not considered a breaking change, this ensures that any
    Prefect client loading data from a server running a possibly-newer version
    of Prefect will be able to process those new fields gracefully. However,
    when PREFECT_TEST_MODE is on, extra fields are forbidden in order to catch
    subtle unintentional testing errors.
    """

    class Config:
        # extra attributes are forbidden in order to raise meaningful errors for
        # bad API payloads
        # We cannot load this setting through the normal pattern due to circular
        # imports; instead just check if its a truthy setting directly
        if os.getenv("PREFECT_TEST_MODE", "0").lower() in ["1", "true"]:
            extra = "forbid"
        else:
            extra = "ignore"

        json_encoders = {
            # Uses secret fields and strange logic to avoid a circular import error
            # for Secret dict in prefect.blocks.fields
            SecretField: lambda v: v.dict() if getattr(v, "dict", None) else str(v)
        }

        pydantic_version = getattr(pydantic, "__version__", None)
        if pydantic_version is not None and Version(pydantic_version) >= Version(
            "1.9.2"
        ):
            copy_on_model_validation = "none"
        else:
            copy_on_model_validation = False

        # Use orjson for serialization
        json_loads = orjson.loads
        json_dumps = orjson_dumps

    @classmethod
    def subclass(
        cls: Type[B],
        name: str = None,
        include_fields: List[str] = None,
        exclude_fields: List[str] = None,
    ) -> Type[B]:
        """Creates a subclass of this model containing only the specified fields.

        See `pydantic_subclass()`.

        Args:
            name (str, optional): a name for the subclass
            include_fields (List[str], optional): fields to include
            exclude_fields (List[str], optional): fields to exclude

        Returns:
            BaseModel: a subclass of this class
        """
        return pydantic_subclass(
            base=cls,
            name=name,
            include_fields=include_fields,
            exclude_fields=exclude_fields,
        )

    def _reset_fields(self) -> Set[str]:
        """A set of field names that are reset when the PrefectBaseModel is copied.
        These fields are also disregarded for equality comparisons.
        """
        return set()

    def __eq__(self, other: Any) -> bool:
        """Equaltiy operator that ignores the resettable fields of the PrefectBaseModel.

        NOTE: this equality operator will only be applied if the PrefectBaseModel is
        the left-hand operand. This is a limitation of Python.
        """
        copy_dict = self.dict(exclude=self._reset_fields())
        if isinstance(other, PrefectBaseModel):
            return copy_dict == other.dict(exclude=other._reset_fields())
        if isinstance(other, BaseModel):
            return copy_dict == other.dict()
        else:
            return copy_dict == other

    def json(self, *args, include_secrets: bool = False, **kwargs) -> str:
        """
        Returns a representation of the model as JSON.

        If `include_secrets=True`, then `SecretStr` and `SecretBytes` objects are
        fully revealed. Otherwise they are obfuscated.

        """
        if include_secrets:
            if "encoder" in kwargs:
                raise ValueError(
                    "Alternative encoder provided; can not set encoder for"
                    " SecretFields."
                )
            kwargs["encoder"] = partial(
                custom_pydantic_encoder,
                {SecretField: lambda v: v.get_secret_value() if v else None},
            )
        return super().json(*args, **kwargs)

    def dict(
        self, *args, shallow: bool = False, json_compatible: bool = False, **kwargs
    ) -> dict:
        """Returns a representation of the model as a Python dictionary.

        For more information on this distinction please see
        https://pydantic-docs.helpmanual.io/usage/exporting_models/#dictmodel-and-iteration


        Args:
            shallow (bool, optional): If True (default), nested Pydantic fields
                are also coerced to dicts. If false, they are left as Pydantic
                models.
            json_compatible (bool, optional): if True, objects are converted
                into json-compatible representations, similar to calling
                `json.loads(self.json())`. Not compatible with shallow=True.

        Returns:
            dict
        """

        experimental_fields = [
            field
            for _, field in self.__fields__.items()
            if field.field_info.extra.get("experimental")
        ]
        experimental_fields_to_exclude = [
            field.name
            for field in experimental_fields
            if not experiment_enabled(field.field_info.extra["experimental-group"])
        ]

        if experimental_fields_to_exclude:
            kwargs["exclude"] = (kwargs.get("exclude") or set()).union(
                experimental_fields_to_exclude
            )

        if json_compatible and shallow:
            raise ValueError(
                "`json_compatible` can only be applied to the entire object."
            )

        # return a json-compatible representation of the object
        elif json_compatible:
            return json.loads(self.json(*args, **kwargs))

        # if shallow wasn't requested, return the standard pydantic behavior
        elif not shallow:
            return super().dict(*args, **kwargs)

        # if no options were requested, return simple dict transformation
        # to apply shallow conversion
        elif not args and not kwargs:
            return dict(self)

        # if options like include/exclude were provided, perform
        # a full dict conversion then overwrite with any shallow
        # differences
        else:
            deep_dict = super().dict(*args, **kwargs)
            shallow_dict = dict(self)
            for k, v in list(deep_dict.items()):
                if isinstance(v, dict) and isinstance(shallow_dict[k], BaseModel):
                    deep_dict[k] = shallow_dict[k]
            return deep_dict

    def copy(
        self: T, *, update: Dict = None, reset_fields: bool = False, **kwargs: Any
    ) -> T:
        """
        Duplicate a model.

        Args:
            update: values to change/add to the model copy
            reset_fields: if True, reset the fields specified in `self._reset_fields`
                to their default value on the new model
            kwargs: kwargs to pass to `pydantic.BaseModel.copy`

        Returns:
            A new copy of the model
        """
        if reset_fields:
            update = update or dict()
            for field in self._reset_fields():
                update.setdefault(field, self.__fields__[field].get_default())
        return super().copy(update=update, **kwargs)

    def __rich_repr__(self):
        # Display all of the fields in the model if they differ from the default value
        for name, field in self.__fields__.items():
            value = getattr(self, name)

            # Simplify the display of some common fields
            if field.type_ == UUID and value:
                value = str(value)
            elif (
                isinstance(field.type_, datetime.datetime)
                and name == "timestamp"
                and value
            ):
                value = pendulum.instance(value).isoformat()
            elif isinstance(field.type_, datetime.datetime) and value:
                value = pendulum.instance(value).diff_for_humans()

            yield name, value, field.get_default()

copy

Duplicate a model.

Parameters:

Name Type Description Default
update Dict

values to change/add to the model copy

None
reset_fields bool

if True, reset the fields specified in self._reset_fields to their default value on the new model

False
kwargs Any

kwargs to pass to pydantic.BaseModel.copy

{}

Returns:

Type Description
T

A new copy of the model

Source code in prefect/server/utilities/schemas.py
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
def copy(
    self: T, *, update: Dict = None, reset_fields: bool = False, **kwargs: Any
) -> T:
    """
    Duplicate a model.

    Args:
        update: values to change/add to the model copy
        reset_fields: if True, reset the fields specified in `self._reset_fields`
            to their default value on the new model
        kwargs: kwargs to pass to `pydantic.BaseModel.copy`

    Returns:
        A new copy of the model
    """
    if reset_fields:
        update = update or dict()
        for field in self._reset_fields():
            update.setdefault(field, self.__fields__[field].get_default())
    return super().copy(update=update, **kwargs)

dict

Returns a representation of the model as a Python dictionary.

For more information on this distinction please see https://pydantic-docs.helpmanual.io/usage/exporting_models/#dictmodel-and-iteration

Parameters:

Name Type Description Default
shallow bool

If True (default), nested Pydantic fields are also coerced to dicts. If false, they are left as Pydantic models.

False
json_compatible bool

if True, objects are converted into json-compatible representations, similar to calling json.loads(self.json()). Not compatible with shallow=True.

False

Returns:

Type Description
dict

dict

Source code in prefect/server/utilities/schemas.py
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
def dict(
    self, *args, shallow: bool = False, json_compatible: bool = False, **kwargs
) -> dict:
    """Returns a representation of the model as a Python dictionary.

    For more information on this distinction please see
    https://pydantic-docs.helpmanual.io/usage/exporting_models/#dictmodel-and-iteration


    Args:
        shallow (bool, optional): If True (default), nested Pydantic fields
            are also coerced to dicts. If false, they are left as Pydantic
            models.
        json_compatible (bool, optional): if True, objects are converted
            into json-compatible representations, similar to calling
            `json.loads(self.json())`. Not compatible with shallow=True.

    Returns:
        dict
    """

    experimental_fields = [
        field
        for _, field in self.__fields__.items()
        if field.field_info.extra.get("experimental")
    ]
    experimental_fields_to_exclude = [
        field.name
        for field in experimental_fields
        if not experiment_enabled(field.field_info.extra["experimental-group"])
    ]

    if experimental_fields_to_exclude:
        kwargs["exclude"] = (kwargs.get("exclude") or set()).union(
            experimental_fields_to_exclude
        )

    if json_compatible and shallow:
        raise ValueError(
            "`json_compatible` can only be applied to the entire object."
        )

    # return a json-compatible representation of the object
    elif json_compatible:
        return json.loads(self.json(*args, **kwargs))

    # if shallow wasn't requested, return the standard pydantic behavior
    elif not shallow:
        return super().dict(*args, **kwargs)

    # if no options were requested, return simple dict transformation
    # to apply shallow conversion
    elif not args and not kwargs:
        return dict(self)

    # if options like include/exclude were provided, perform
    # a full dict conversion then overwrite with any shallow
    # differences
    else:
        deep_dict = super().dict(*args, **kwargs)
        shallow_dict = dict(self)
        for k, v in list(deep_dict.items()):
            if isinstance(v, dict) and isinstance(shallow_dict[k], BaseModel):
                deep_dict[k] = shallow_dict[k]
        return deep_dict

json

Returns a representation of the model as JSON.

If include_secrets=True, then SecretStr and SecretBytes objects are fully revealed. Otherwise they are obfuscated.

Source code in prefect/server/utilities/schemas.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
def json(self, *args, include_secrets: bool = False, **kwargs) -> str:
    """
    Returns a representation of the model as JSON.

    If `include_secrets=True`, then `SecretStr` and `SecretBytes` objects are
    fully revealed. Otherwise they are obfuscated.

    """
    if include_secrets:
        if "encoder" in kwargs:
            raise ValueError(
                "Alternative encoder provided; can not set encoder for"
                " SecretFields."
            )
        kwargs["encoder"] = partial(
            custom_pydantic_encoder,
            {SecretField: lambda v: v.get_secret_value() if v else None},
        )
    return super().json(*args, **kwargs)

subclass classmethod

Creates a subclass of this model containing only the specified fields.

See pydantic_subclass().

Parameters:

Name Type Description Default
name str

a name for the subclass

None
include_fields List[str]

fields to include

None
exclude_fields List[str]

fields to exclude

None

Returns:

Name Type Description
BaseModel Type[B]

a subclass of this class

Source code in prefect/server/utilities/schemas.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
@classmethod
def subclass(
    cls: Type[B],
    name: str = None,
    include_fields: List[str] = None,
    exclude_fields: List[str] = None,
) -> Type[B]:
    """Creates a subclass of this model containing only the specified fields.

    See `pydantic_subclass()`.

    Args:
        name (str, optional): a name for the subclass
        include_fields (List[str], optional): fields to include
        exclude_fields (List[str], optional): fields to exclude

    Returns:
        BaseModel: a subclass of this class
    """
    return pydantic_subclass(
        base=cls,
        name=name,
        include_fields=include_fields,
        exclude_fields=exclude_fields,
    )

FieldFrom

Indicates that the given field is to be copied from another class by copy_model_fields.

Source code in prefect/server/utilities/schemas.py
395
396
397
398
399
400
def FieldFrom(origin: Type[BaseModel]) -> Any:
    """
    Indicates that the given field is to be copied from another class by
    `copy_model_fields`.
    """
    return _FieldFrom(origin=origin)

copy_model_fields

A class decorator which copies field definitions and field validators from other Pydantic BaseModel classes. This does not make the model a subclass of any of the copied field's owning classes, nor does this copy root validators from any of those classes. Note that you should still include the type hint for the field in order to make typing explicit.

Use this decorator and the corresponding FieldFrom to compose response and action schemas from other classes.

Example

from pydantic import BaseModel from prefect.server.utilities.schemas import copy_model_fields, FieldFrom

class Parent(BaseModel): ... name: str ... sensitive: str

@copy_model_fields class Derived(BaseModel): ... name: str = FieldFrom(Parent) ... my_own: str

In this example, Derived will have the fields name, and my_own, with the name field being a complete copy of the Parent.name field.

Source code in prefect/server/utilities/schemas.py
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
def copy_model_fields(model_class: Type[B]) -> Type[B]:
    """
    A class decorator which copies field definitions and field validators from other
    Pydantic BaseModel classes.  This does _not_ make the model a subclass of any of
    the copied field's owning classes, nor does this copy root validators from any of
    those classes.  Note that you should still include the type hint for the field in
    order to make typing explicit.

    Use this decorator and the corresponding `FieldFrom` to compose response and
    action schemas from other classes.

    Example:

        >>> from pydantic import BaseModel
        >>> from prefect.server.utilities.schemas import copy_model_fields, FieldFrom
        >>>
        >>> class Parent(BaseModel):
        ...     name: str
        ...     sensitive: str
        >>>
        >>> @copy_model_fields
        >>> class Derived(BaseModel):
        ...     name: str = FieldFrom(Parent)
        ...     my_own: str

        In this example, `Derived` will have the fields `name`, and `my_own`, with the
        `name` field being a complete copy of the `Parent.name` field.

    """
    for name, field in model_class.__fields__.items():
        if not isinstance(field.default, _FieldFrom):
            continue

        origin = field.default.origin

        origin_field = origin.__fields__[name]

        # For safety, types defined on the model must match those of the origin
        # We make an exception here for `Optional` where the model can make the same
        # type definition nullable.
        if (
            field.type_ != origin_field.type_
            and not field.type_ == Optional[origin_field.type_]
        ):
            if not issubclass(
                origin_field.type_,
                field.type_,
            ):
                raise TypeError(
                    f"Field {name} ({field.type_}) does not match the type of the"
                    f" origin field {origin_field.type_}"
                )

        # Create a copy of the origin field
        new_field = copy.deepcopy(origin_field)

        # Retain any validators from the model field
        new_field.post_validators = new_field.post_validators or []
        new_field.pre_validators = new_field.pre_validators or []
        new_field.post_validators.extend(field.post_validators or [])
        new_field.pre_validators.extend(field.pre_validators or [])

        # Retain "optional" from the model field
        new_field.required = field.required
        new_field.allow_none = field.allow_none

        model_class.__fields__[name] = new_field

        if name in origin.__validators__:
            # The type: ignores here are because pydantic has a mistyping for these
            # __validators__ fields (TODO: file an upstream PR)
            validators: list = list(origin.__validators__[name])  # type: ignore
            if name in model_class.__validators__:
                validators.extend(model_class.__validators__[name])  # type: ignore
            model_class.__validators__[name] = validators  # type: ignore

    return model_class

orjson_dumps

Utility for dumping a value to JSON using orjson.

orjson.dumps returns bytes, to match standard json.dumps we need to decode.

Source code in prefect/server/utilities/schemas.py
120
121
122
123
124
125
126
def orjson_dumps(v: Any, *, default: Any) -> str:
    """
    Utility for dumping a value to JSON using orjson.

    orjson.dumps returns bytes, to match standard json.dumps we need to decode.
    """
    return orjson.dumps(v, default=default).decode()

orjson_dumps_extra_compatible

Utility for dumping a value to JSON using orjson, but allows for 1) non-string keys: this is helpful for situations like pandas dataframes, which can result in non-string keys 2) numpy types: for serializing numpy arrays

orjson.dumps returns bytes, to match standard json.dumps we need to decode.

Source code in prefect/server/utilities/schemas.py
129
130
131
132
133
134
135
136
137
138
139
140
def orjson_dumps_extra_compatible(v: Any, *, default: Any) -> str:
    """
    Utility for dumping a value to JSON using orjson, but allows for
    1) non-string keys: this is helpful for situations like pandas dataframes,
    which can result in non-string keys
    2) numpy types: for serializing numpy arrays

    orjson.dumps returns bytes, to match standard json.dumps we need to decode.
    """
    return orjson.dumps(
        v, default=default, option=orjson.OPT_NON_STR_KEYS | orjson.OPT_SERIALIZE_NUMPY
    ).decode()

pydantic_subclass

Creates a subclass of a Pydantic model that excludes certain fields. Pydantic models use the fields attribute of their parent class to determine inherited fields, so to create a subclass without fields, we temporarily remove those fields from the parent fields and use create_model to dynamically generate a new subclass.

Parameters:

Name Type Description Default
base pydantic.BaseModel

a Pydantic BaseModel

required
name str

a name for the subclass. If not provided it will have the same name as the base class.

None
include_fields List[str]

a set of field names to include. If None, all fields are included.

None
exclude_fields List[str]

a list of field names to exclude. If None, no fields are excluded.

None

Returns:

Type Description
Type[B]

pydantic.BaseModel: a new model subclass that contains only the specified fields.

Example

To subclass a model with a subset of fields:

class Parent(pydantic.BaseModel):
    x: int = 1
    y: int = 2

Child = pydantic_subclass(Parent, 'Child', exclude_fields=['y'])
assert hasattr(Child(), 'x')
assert not hasattr(Child(), 'y')

To subclass a model with a subset of fields but include a new field:

class Child(pydantic_subclass(Parent, exclude_fields=['y'])):
    z: int = 3

assert hasattr(Child(), 'x')
assert not hasattr(Child(), 'y')
assert hasattr(Child(), 'z')

Source code in prefect/server/utilities/schemas.py
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def pydantic_subclass(
    base: Type[B],
    name: str = None,
    include_fields: List[str] = None,
    exclude_fields: List[str] = None,
) -> Type[B]:
    """Creates a subclass of a Pydantic model that excludes certain fields.
    Pydantic models use the __fields__ attribute of their parent class to
    determine inherited fields, so to create a subclass without fields, we
    temporarily remove those fields from the parent __fields__ and use
    `create_model` to dynamically generate a new subclass.

    Args:
        base (pydantic.BaseModel): a Pydantic BaseModel
        name (str): a name for the subclass. If not provided
            it will have the same name as the base class.
        include_fields (List[str]): a set of field names to include.
            If `None`, all fields are included.
        exclude_fields (List[str]): a list of field names to exclude.
            If `None`, no fields are excluded.

    Returns:
        pydantic.BaseModel: a new model subclass that contains only the specified fields.

    Example:
        To subclass a model with a subset of fields:
        ```python
        class Parent(pydantic.BaseModel):
            x: int = 1
            y: int = 2

        Child = pydantic_subclass(Parent, 'Child', exclude_fields=['y'])
        assert hasattr(Child(), 'x')
        assert not hasattr(Child(), 'y')
        ```

        To subclass a model with a subset of fields but include a new field:
        ```python
        class Child(pydantic_subclass(Parent, exclude_fields=['y'])):
            z: int = 3

        assert hasattr(Child(), 'x')
        assert not hasattr(Child(), 'y')
        assert hasattr(Child(), 'z')
        ```
    """

    # collect field names
    field_names = set(include_fields or base.__fields__)
    excluded_fields = set(exclude_fields or [])
    if field_names.difference(base.__fields__):
        raise ValueError(
            "Included fields not found on base class: "
            f"{field_names.difference(base.__fields__)}"
        )
    elif excluded_fields.difference(base.__fields__):
        raise ValueError(
            "Excluded fields not found on base class: "
            f"{excluded_fields.difference(base.__fields__)}"
        )
    field_names.difference_update(excluded_fields)

    # create a new class that inherits from `base` but only contains the specified
    # pydantic __fields__
    new_cls = type(
        name or base.__name__,
        (base,),
        {
            "__fields__": {
                k: copy.copy(v) for k, v in base.__fields__.items() if k in field_names
            }
        },
    )

    return new_cls