Skip to content

prefect.server.models.flows

Functions for interacting with flow ORM objects. Intended for internal use by the Prefect REST API.

count_flows async

Count flows.

Parameters:

Name Type Description Default
session sa.orm.Session

A database session

required
flow_filter schemas.filters.FlowFilter

only count flows that match these filters

None
flow_run_filter schemas.filters.FlowRunFilter

only count flows whose flow runs match these filters

None
task_run_filter schemas.filters.TaskRunFilter

only count flows whose task runs match these filters

None
deployment_filter schemas.filters.DeploymentFilter

only count flows whose deployments match these filters

None

Returns:

Name Type Description
int int

count of flows

Source code in prefect/server/models/flows.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
@inject_db
async def count_flows(
    session: sa.orm.Session,
    db: PrefectDBInterface,
    flow_filter: schemas.filters.FlowFilter = None,
    flow_run_filter: schemas.filters.FlowRunFilter = None,
    task_run_filter: schemas.filters.TaskRunFilter = None,
    deployment_filter: schemas.filters.DeploymentFilter = None,
) -> int:
    """
    Count flows.

    Args:
        session: A database session
        flow_filter: only count flows that match these filters
        flow_run_filter: only count flows whose flow runs match these filters
        task_run_filter: only count flows whose task runs match these filters
        deployment_filter: only count flows whose deployments match these filters

    Returns:
        int: count of flows
    """

    query = select(sa.func.count(sa.text("*"))).select_from(db.Flow)

    query = await _apply_flow_filters(
        query,
        flow_filter=flow_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        db=db,
    )

    result = await session.execute(query)
    return result.scalar()

create_flow async

Creates a new flow.

If a flow with the same name already exists, the existing flow is returned.

Parameters:

Name Type Description Default
session sa.orm.Session

a database session

required
flow schemas.core.Flow

a flow model

required

Returns:

Type Description

db.Flow: the newly-created or existing flow

Source code in prefect/server/models/flows.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@inject_db
async def create_flow(
    session: sa.orm.Session, flow: schemas.core.Flow, db: PrefectDBInterface
):
    """
    Creates a new flow.

    If a flow with the same name already exists, the existing flow is returned.

    Args:
        session: a database session
        flow: a flow model

    Returns:
        db.Flow: the newly-created or existing flow
    """

    insert_stmt = (
        (await db.insert(db.Flow))
        .values(**flow.dict(shallow=True, exclude_unset=True))
        .on_conflict_do_nothing(
            index_elements=db.flow_unique_upsert_columns,
        )
    )
    await session.execute(insert_stmt)

    query = (
        sa.select(db.Flow)
        .where(
            db.Flow.name == flow.name,
        )
        .limit(1)
        .execution_options(populate_existing=True)
    )
    result = await session.execute(query)
    model = result.scalar()
    return model

delete_flow async

Delete a flow by id.

Parameters:

Name Type Description Default
session sa.orm.Session

A database session

required
flow_id UUID

a flow id

required

Returns:

Name Type Description
bool bool

whether or not the flow was deleted

Source code in prefect/server/models/flows.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
@inject_db
async def delete_flow(
    session: sa.orm.Session, flow_id: UUID, db: PrefectDBInterface
) -> bool:
    """
    Delete a flow by id.

    Args:
        session: A database session
        flow_id: a flow id

    Returns:
        bool: whether or not the flow was deleted
    """

    result = await session.execute(delete(db.Flow).where(db.Flow.id == flow_id))
    return result.rowcount > 0

read_flow async

Reads a flow by id.

Parameters:

Name Type Description Default
session sa.orm.Session

A database session

required
flow_id UUID

a flow id

required

Returns:

Type Description

db.Flow: the flow

Source code in prefect/server/models/flows.py
83
84
85
86
87
88
89
90
91
92
93
94
95
@inject_db
async def read_flow(session: sa.orm.Session, flow_id: UUID, db: PrefectDBInterface):
    """
    Reads a flow by id.

    Args:
        session: A database session
        flow_id: a flow id

    Returns:
        db.Flow: the flow
    """
    return await session.get(db.Flow, flow_id)

read_flow_by_name async

Reads a flow by name.

Parameters:

Name Type Description Default
session sa.orm.Session

A database session

required
name str

a flow name

required

Returns:

Type Description

db.Flow: the flow

Source code in prefect/server/models/flows.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
@inject_db
async def read_flow_by_name(session: sa.orm.Session, name: str, db: PrefectDBInterface):
    """
    Reads a flow by name.

    Args:
        session: A database session
        name: a flow name

    Returns:
        db.Flow: the flow
    """

    result = await session.execute(select(db.Flow).filter_by(name=name))
    return result.scalar()

read_flows async

Read multiple flows.

Parameters:

Name Type Description Default
session sa.orm.Session

A database session

required
flow_filter schemas.filters.FlowFilter

only select flows that match these filters

None
flow_run_filter schemas.filters.FlowRunFilter

only select flows whose flow runs match these filters

None
task_run_filter schemas.filters.TaskRunFilter

only select flows whose task runs match these filters

None
deployment_filter schemas.filters.DeploymentFilter

only select flows whose deployments match these filters

None
offset int

Query offset

None
limit int

Query limit

None

Returns:

Type Description

List[db.Flow]: flows

Source code in prefect/server/models/flows.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
@inject_db
async def read_flows(
    session: sa.orm.Session,
    db: PrefectDBInterface,
    flow_filter: schemas.filters.FlowFilter = None,
    flow_run_filter: schemas.filters.FlowRunFilter = None,
    task_run_filter: schemas.filters.TaskRunFilter = None,
    deployment_filter: schemas.filters.DeploymentFilter = None,
    sort: schemas.sorting.FlowSort = schemas.sorting.FlowSort.NAME_ASC,
    offset: int = None,
    limit: int = None,
):
    """
    Read multiple flows.

    Args:
        session: A database session
        flow_filter: only select flows that match these filters
        flow_run_filter: only select flows whose flow runs match these filters
        task_run_filter: only select flows whose task runs match these filters
        deployment_filter: only select flows whose deployments match these filters
        offset: Query offset
        limit: Query limit

    Returns:
        List[db.Flow]: flows
    """

    query = select(db.Flow).order_by(sort.as_sql_sort(db=db))

    query = await _apply_flow_filters(
        query,
        flow_filter=flow_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        db=db,
    )

    if offset is not None:
        query = query.offset(offset)

    if limit is not None:
        query = query.limit(limit)

    result = await session.execute(query)
    return result.scalars().unique().all()

update_flow async

Updates a flow.

Parameters:

Name Type Description Default
session sa.orm.Session

a database session

required
flow_id UUID

the flow id to update

required
flow schemas.actions.FlowUpdate

a flow update model

required

Returns:

Name Type Description
bool

whether or not matching rows were found to update

Source code in prefect/server/models/flows.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@inject_db
async def update_flow(
    session: sa.orm.Session,
    flow_id: UUID,
    flow: schemas.actions.FlowUpdate,
    db: PrefectDBInterface,
):
    """
    Updates a flow.

    Args:
        session: a database session
        flow_id: the flow id to update
        flow: a flow update model

    Returns:
        bool: whether or not matching rows were found to update
    """
    update_stmt = (
        sa.update(db.Flow).where(db.Flow.id == flow_id)
        # exclude_unset=True allows us to only update values provided by
        # the user, ignoring any defaults on the model
        .values(**flow.dict(shallow=True, exclude_unset=True))
    )
    result = await session.execute(update_stmt)
    return result.rowcount > 0