Infrastructure

Users may specify an infrastructure block when creating a deployment. This block will be used to specify infrastructure for flow runs created by the deployment at runtime.

Infrastructure can only be used with a deployment. When you run a flow directly by calling the flow yourself, you are responsible for the environment in which the flow executes.

Infrastructure overview

Prefect uses infrastructure to create the environment for a user's flow to execute.

Infrastructure is attached to a deployment and is propagated to flow runs created for that deployment. Infrastructure is deserialized by the agent and it has two jobs:

  • Create execution environment infrastructure for the flow run.
  • Run a Python command to start the prefect.engine in the infrastructure, which retrieves the flow from storage and executes the flow.

The engine acquires and calls the flow. Infrastructure doesn't know anything about how the flow is stored; it only passes a flow run ID to the engine.
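
As a rough illustration of the two jobs listed above, the sketch below builds the launch specification that an infrastructure implementation might hand to its execution environment. It is not Prefect's implementation: the function name build_launch_spec is hypothetical, and passing the flow run ID through an environment variable named PREFECT__FLOW_RUN_ID is an assumption made for this example. Only the python -m prefect.engine command comes from this page.

```python
import uuid


def build_launch_spec(flow_run_id, extra_env=None):
    """Return the command and environment used to start the engine.

    Hypothetical helper: nothing here knows where the flow is stored;
    only the flow run ID is handed over.
    """
    # Assumption: the flow run ID travels in this environment variable.
    env = {"PREFECT__FLOW_RUN_ID": flow_run_id.hex}
    env.update(extra_env or {})
    return {"command": ["python", "-m", "prefect.engine"], "env": env}


spec = build_launch_spec(uuid.UUID("12345678-1234-5678-1234-567812345678"))
```

The engine started by that command is then responsible for retrieving the flow from storage and executing it.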

Infrastructure is specific to the environments in which flows will run. Prefect currently provides the following infrastructure types:

What about tasks?

Flows and tasks can both use configuration objects to manage the environment in which code runs.

Flows use infrastructure.

Tasks use task runners. For more on how task runners work, see Task Runners.

Using infrastructure

You may create customized infrastructure blocks through the Blocks page of the Prefect UI or Prefect Cloud, or create them in code and save them to the API using the block's .save() method.

Once created, there are two distinct ways to use infrastructure in a deployment:

  • Start with Prefect defaults: this is what happens when you pass the -i or --infra flag and provide a type when building deployment files.
  • Pre-configure infrastructure settings as blocks and base your deployment infrastructure on those settings by passing -ib or --infra-block and a block slug when building deployment files.

When creating your deployment files, the supported Prefect infrastructure types are:

  • process
  • docker-container
  • kubernetes-job
  • ecs-task
  • cloud-run-job
  • container-instance-job

For example:

$ prefect deployment build ./my_flow.py:my_flow -n my-flow-deployment -t test -i docker-container -sb s3/my-bucket --override env.EXTRA_PIP_PACKAGES=s3fs
Found flow 'my-flow'
Successfully uploaded 2 files to s3://bucket-full-of-sunshine
Deployment YAML created at '/Users/terry/test/flows/infra/deployment.yaml'.

In this example we specify the DockerContainer infrastructure in addition to a preconfigured AWS S3 bucket storage block.

The generated deployment YAML file may be edited as needed to add an infrastructure type or infrastructure settings.

###
### A complete description of a Prefect Deployment for flow 'my-flow'
###
name: my-flow-deployment
description: null
version: e29de5d01b06d61b4e321d40f34a480c
# The work queue that will handle this deployment's runs
work_queue_name: default
work_pool_name: default-agent-pool
tags:
- test
parameters: {}
schedule: null
is_schedule_active: true
infra_overrides:
  env.EXTRA_PIP_PACKAGES: s3fs
infrastructure:
  type: docker-container
  env: {}
  labels: {}
  name: null
  command:
  - python
  - -m
  - prefect.engine
  image: prefecthq/prefect:dev-python3.9
  image_pull_policy: null
  networks: []
  network_mode: null
  auto_remove: false
  volumes: []
  stream_output: true
  memswap_limit: null
  mem_limit: null
  privileged: false
  block_type_slug: docker-container
  _block_type_slug: docker-container

###
### DO NOT EDIT BELOW THIS LINE
###
flow_name: my-flow
manifest_path: my_flow-manifest.json
storage:
  bucket_path: bucket-full-of-sunshine
  aws_access_key_id: '**********'
  aws_secret_access_key: '**********'
  _is_anonymous: true
  _block_document_name: anonymous-xxxxxxxx-f1ff-4265-b55c-6353a6d65333
  _block_document_id: xxxxxxxx-06c2-4c3c-a505-4a8db0147011
  block_type_slug: s3
  _block_type_slug: s3
path: ''
entrypoint: my_flow.py:my-flow
parameter_openapi_schema:
  title: Parameters
  type: object
  properties: {}
  required: null
  definitions: null
timestamp: '2023-02-08T23:00:14.974642+00:00'

Editing deployment YAML

Note the big DO NOT EDIT comment in the deployment YAML: In practice, anything above this block can be freely edited before running prefect deployment apply to create the deployment on the API.

Once the deployment exists, any flow runs that this deployment starts will use DockerContainer infrastructure.
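
The infra_overrides entry in the YAML above uses a dotted key (env.EXTRA_PIP_PACKAGES). The following is a minimal sketch of how such a key could be merged into the infrastructure settings, assuming each dot descends one level into a nested mapping. The function apply_infra_overrides is a hypothetical name used for illustration and is not part of the Prefect API.

```python
import copy


def apply_infra_overrides(infrastructure, overrides):
    """Return a copy of the infrastructure settings with overrides applied.

    Each override key is a dotted path; every segment except the last
    selects (or creates) a nested dictionary.
    """
    result = copy.deepcopy(infrastructure)
    for dotted_key, value in overrides.items():
        *parents, leaf = dotted_key.split(".")
        target = result
        for key in parents:
            target = target.setdefault(key, {})
        target[leaf] = value
    return result


infrastructure = {
    "type": "docker-container",
    "env": {},
    "image": "prefecthq/prefect:dev-python3.9",
}
overrides = {"env.EXTRA_PIP_PACKAGES": "s3fs"}
effective = apply_infra_overrides(infrastructure, overrides)
```

Under that assumption, the effective env for the flow run contains EXTRA_PIP_PACKAGES=s3fs while the stored infrastructure settings stay unchanged.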

You can also create custom infrastructure blocks, either in the Prefect UI or in code via the API, and use the settings in the block to configure your infrastructure. For example, here we specify settings for Kubernetes infrastructure in a block named k8sdev.

from prefect.infrastructure import KubernetesJob, KubernetesImagePullPolicy

k8s_job = KubernetesJob(
    namespace="dev",
    image="prefecthq/prefect:2.0.0-python3.9",
    image_pull_policy=KubernetesImagePullPolicy.IF_NOT_PRESENT,
)
k8s_job.save("k8sdev")

Now we can apply the infrastructure type and settings in the block by passing the block slug kubernetes-job/k8sdev with -ib when building a deployment:

prefect deployment build flows/k8s_example.py:k8s_flow --name k8sdev --tag k8s -sb s3/dev -ib kubernetes-job/k8sdev

See Deployments for more information about deployment build options.

Configuring infrastructure

Every infrastructure type has type-specific options.

Process

Process infrastructure runs a command in a new process.

Current environment variables and Prefect settings will be included in the created process. Configured environment variables will override any current environment variables.
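
The override rule described above can be sketched with the standard library alone. This is an illustration of the merge order, not Prefect's Process implementation; run_with_env and the variable names GREETING and INHERITED are hypothetical.

```python
import os
import subprocess
import sys


def run_with_env(command, configured_env):
    """Run a command in a new process.

    The child inherits the current environment; configured variables
    are applied last, so they win on conflicts.
    """
    merged = {**os.environ, **configured_env}
    return subprocess.run(
        command, env=merged, capture_output=True, text=True, check=True
    )


# Variables set in the current (parent) environment for the demonstration.
os.environ["GREETING"] = "from-parent"
os.environ["INHERITED"] = "kept"

child_code = "import os; print(os.environ['GREETING'], os.environ['INHERITED'])"
result = run_with_env([sys.executable, "-c", child_code], {"GREETING": "from-config"})
```

The child process sees the configured value for GREETING and the inherited value for INHERITED.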

Process supports the following settings:

Attributes Description
command A list of strings specifying the command to start the flow run. In most cases you should not override this.
env Environment variables to set for the new process.
labels Labels for the process. Labels are for metadata purposes only and cannot be attached to the process itself.
name A name for the process. For display purposes only.

DockerContainer

DockerContainer infrastructure executes flow runs in a container.

Requirements for DockerContainer:

  • Docker Engine must be available.
  • You must configure remote Storage. Local storage is not supported for Docker.
  • The API must be available from within the flow run container. To facilitate connections to locally hosted APIs, localhost and 127.0.0.1 will be replaced with host.docker.internal.
  • The ephemeral Prefect API won't work with Docker and Kubernetes. You must have a Prefect server or Prefect Cloud API endpoint set in your agent's configuration.
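
To make the localhost replacement mentioned in the requirements concrete, here is a small sketch of the rewrite, assuming the API URL is a plain HTTP(S) URL without embedded credentials. The helper rewrite_api_url_for_container is a hypothetical name, not a Prefect function.

```python
from urllib.parse import urlsplit, urlunsplit

# Hosts that refer to the machine running the agent, not the container.
LOCAL_HOSTS = {"localhost", "127.0.0.1"}


def rewrite_api_url_for_container(api_url):
    """Point a locally hosted API URL at host.docker.internal."""
    parts = urlsplit(api_url)
    if parts.hostname not in LOCAL_HOSTS:
        return api_url  # Remote APIs are reachable as-is.
    netloc = "host.docker.internal"
    if parts.port is not None:
        netloc = f"{netloc}:{parts.port}"
    return urlunsplit(parts._replace(netloc=netloc))


local = rewrite_api_url_for_container("http://127.0.0.1:4200/api")
remote = rewrite_api_url_for_container("https://api.prefect.cloud/api")
```

Only the host portion changes; the scheme, port, and path are preserved.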

DockerContainer supports the following settings:

Attributes Description
auto_remove Bool indicating whether the container will be removed on completion. If False, the container will remain after exit for inspection.
command A list of strings specifying the command to run in the container to start the flow run. In most cases you should not override this.
env Environment variables to set for the container.
image An optional string specifying the tag of a Docker image to use. Defaults to the Prefect image. If the image is stored anywhere other than a public Docker Hub registry, use a corresponding registry block (e.g. DockerRegistry), or otherwise ensure that your execution layer is authenticated to pull the image from the image registry.
image_pull_policy Specifies if the image should be pulled. One of 'ALWAYS', 'NEVER', 'IF_NOT_PRESENT'.
image_registry A DockerRegistry block containing credentials to use if image is stored in a private image registry.
labels An optional dictionary of labels, mapping name to value.
name An optional name for the container.
networks An optional list of strings specifying Docker networks to connect the container to.
network_mode Set the network mode for the created container. Defaults to 'host' if a local API URL is detected, otherwise the Docker default of 'bridge' is used. If 'networks' is set, this cannot be set.
stream_output Bool indicating whether to stream output from the subprocess to local standard output.
volumes An optional list of volume mount strings in the format of "local_path:container_path".

Prefect automatically sets a Docker image matching the Python and Prefect version you're using at deployment time. You can see all available images at Docker Hub.

KubernetesJob

KubernetesJob infrastructure executes flow runs in a Kubernetes Job.

Requirements for KubernetesJob:

  • kubectl must be available.
  • You must configure remote Storage. Local storage is not supported for Kubernetes.
  • The ephemeral Prefect API won't work with Docker and Kubernetes. You must have a Prefect server or Prefect Cloud API endpoint set in your agent's configuration.

The Prefect CLI command prefect kubernetes manifest server automatically generates a Kubernetes manifest with default settings for Prefect deployments. By default, it simply prints out the YAML configuration for a manifest. You can pipe this output to a file of your choice and edit as necessary.

KubernetesJob supports the following settings:

Attributes Description
cluster_config An optional Kubernetes cluster config to use for this job.
command A list of strings specifying the command to run in the container to start the flow run. In most cases you should not override this.
customizations A list of JSON patches (RFC 6902) to apply to the base Job manifest. Alternatively, a valid JSON string is allowed (handy for deployments CLI).
env Environment variables to set for the container.
finished_job_ttl The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If None (default), jobs will need to be manually removed.
image String specifying the tag of a Docker image to use for the Job.
image_pull_policy The Kubernetes image pull policy to use for job containers.
job The base manifest for the Kubernetes Job.
job_watch_timeout_seconds Number of seconds to watch for job creation before timing out (defaults to None).
labels Dictionary of labels to add to the Job.
name An optional name for the job.
namespace String signifying the Kubernetes namespace to use.
pod_watch_timeout_seconds Number of seconds to watch for pod creation before timing out (default 60).
service_account_name An optional string specifying which Kubernetes service account to use.
stream_output Bool indicating whether to stream output from the subprocess to local standard output.

KubernetesJob overrides and customizations

When creating deployments using KubernetesJob infrastructure, the infra_overrides parameter expects a dictionary. For a KubernetesJob, the customizations parameter expects a list.

Containers expect a list of objects, even if there is only one. For any patch that applies to a container, the path must include the container's index in that list, for example: /spec/template/spec/containers/0/resources
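
To show why the container index appears in the path, the sketch below applies a single JSON patch "add" operation to a trimmed-down Job manifest using only the standard library. It is a simplified illustration, not the patching code Prefect uses: apply_add_patch is a hypothetical helper, it handles only the "add" operation, and it ignores the escaping rules and the "-" append token defined by RFC 6902.

```python
import copy


def apply_add_patch(document, patch):
    """Apply one simplified JSON patch 'add' operation and return a copy."""
    if patch["op"] != "add":
        raise ValueError("this sketch only handles the 'add' operation")
    result = copy.deepcopy(document)
    *parents, leaf = patch["path"].lstrip("/").split("/")
    target = result
    for token in parents:
        # Numeric tokens index into lists such as 'containers'.
        target = target[int(token)] if isinstance(target, list) else target[token]
    if isinstance(target, list):
        target.insert(int(leaf), patch["value"])
    else:
        target[leaf] = patch["value"]
    return result


# Trimmed-down manifest: 'containers' is a list even with one entry.
job = {"spec": {"template": {"spec": {"containers": [{"name": "prefect-job"}]}}}}
patch = {
    "op": "add",
    "path": "/spec/template/spec/containers/0/resources",
    "value": {"limits": {"memory": "8Gi"}},
}
patched = apply_add_patch(job, patch)
```

The 0 in the path selects the first (and here only) container, and resources is added to that container's mapping.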

A KubernetesJob infrastructure block defined in Python:

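from prefect.infrastructure import KubernetesJob, KubernetesImagePullPolicy

# Example values; replace with your own namespace and image.
namespace = "dev"
image_name = "prefecthq/prefect:2.0.0-python3.9"

# JSON patch that adds resource requests and limits to the first container.
# Kubernetes quantity suffixes are case-sensitive: use "Gi", not "gi".
customizations = [
    {
        "op": "add",
        "path": "/spec/template/spec/containers/0/resources",
        "value": {
            "requests": {
                "cpu": "2000m",
                "memory": "4Gi"
            },
            "limits": {
                "cpu": "4000m",
                "memory": "8Gi",
                "nvidia.com/gpu": "1"
            }
        },
    }
]

k8s_job = KubernetesJob(
    namespace=namespace,
    image=image_name,
    image_pull_policy=KubernetesImagePullPolicy.ALWAYS,
    finished_job_ttl=300,
    job_watch_timeout_seconds=600,
    pod_watch_timeout_seconds=600,
    service_account_name="prefect-server",
    customizations=customizations,
)
# Save the block to the API under the name "devk8s"
k8s_job.save("devk8s")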

A Deployment with infra-overrides defined in Python:

from prefect.deployments import Deployment
from prefect.infrastructure import KubernetesJob

# my_flow (a flow) and storage (a storage block) are assumed to be
# defined or loaded elsewhere in your code.

infra_overrides = {
    "customizations": [
        {
            "op": "add",
            "path": "/spec/template/spec/containers/0/resources",
            "value": {
                "requests": {
                    "cpu": "2000m",
                    "memory": "4Gi"
                },
                "limits": {
                    "cpu": "4000m",
                    "memory": "8Gi",
                    "nvidia.com/gpu": "1"
                }
            },
        }
    ]
}

# Load an already created K8s block
k8sjob = KubernetesJob.load("devk8s")

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="s3-example",
    version=2,
    work_queue_name="aws",
    infrastructure=k8sjob,
    storage=storage,
    infra_overrides=infra_overrides,
)

# Register the deployment with the API
deployment.apply()

ECSTask

ECSTask infrastructure runs your flow in an ECS Task.

Requirements for ECSTask:

  • The ephemeral Prefect API won't work with ECS directly. You must have a Prefect server or Prefect Cloud API endpoint set in your agent's configuration.
  • The prefect-aws collection must be installed within the agent environment: pip install prefect-aws
  • The ECSTask and AwsCredentials blocks must be registered within the agent environment: prefect block register -m prefect_aws.ecs
  • You must configure remote Storage. Local storage is not supported for ECS tasks. The most commonly used type of storage with ECSTask is S3. If you use that type of block, make sure that s3fs is installed within your agent and flow run environment.

The easiest way to satisfy all of the installation requirements above is to include the following commands in your Dockerfile:

# example base image
FROM prefecthq/prefect:2-python3.9
RUN pip install s3fs prefect-aws

To get started using Prefect with ECS, check out the repository template dataflow-ops demonstrating ECS agent setup and various deployment configurations for using ECSTask block.

Make sure to allocate enough CPU and memory to your agent, and consider adding retries

When you start a Prefect agent on AWS ECS Fargate, allocate as much CPU and memory as needed for your workloads. Your agent needs enough resources to appropriately provision infrastructure for your flow runs and to monitor their execution. Otherwise, your flow runs may get stuck in a Pending state. Alternatively, set a work-queue concurrency limit to ensure that the agent will not try to process all runs at the same time.

Some API calls to provision infrastructure may fail due to unexpected issues on the client side (for example, transient errors such as ConnectionError, HTTPClientError, or RequestTimeout), or due to server-side rate limiting from the AWS service. To mitigate those issues, we recommend adding environment variables such as AWS_MAX_ATTEMPTS (can be set to an integer value such as 10) and AWS_RETRY_MODE (can be set to a string value including standard or adaptive modes). Those environment variables must be added within the agent environment, e.g. on your ECS service running the agent, rather than on the ECSTask infrastructure block.
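
As one way to keep these settings consistent, the sketch below builds the two variables as name/value entries, the shape used for environment variables in an ECS container definition. The helper aws_retry_environment is hypothetical, and the defaults (10 attempts, adaptive mode) are simply the example values mentioned above, not a recommendation for every workload.

```python
# Retry modes named in the text above.
VALID_RETRY_MODES = {"standard", "adaptive"}


def aws_retry_environment(max_attempts=10, retry_mode="adaptive"):
    """Build retry-related environment entries for the agent's container."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be a positive integer")
    if retry_mode not in VALID_RETRY_MODES:
        raise ValueError(f"retry_mode must be one of {sorted(VALID_RETRY_MODES)}")
    # Environment variable values are always strings.
    return [
        {"name": "AWS_MAX_ATTEMPTS", "value": str(max_attempts)},
        {"name": "AWS_RETRY_MODE", "value": retry_mode},
    ]


agent_environment = aws_retry_environment()
```

These entries belong in the environment of the service running the agent, not on the ECSTask infrastructure block.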

Docker images

Every release of Prefect comes with a few built-in images. These images are all named prefecthq/prefect and their tags are used to identify differences in images.

Prefect agents rely on Docker images for executing flow runs using DockerContainer or KubernetesJob infrastructure. If you do not specify an image, we will use a Prefect image tag that matches your local Prefect and Python versions. If you are building your own image, you may find it useful to use one of the Prefect images as a base.

Choose image versions wisely

It's a good practice to use Docker images with specific Prefect versions in production.

Use care when employing images that automatically update to new versions (such as prefecthq/prefect:2-python3.9 or prefecthq/prefect:2-latest).

Image tags

When a release is published, images are built for all of Prefect's supported Python versions. These images are tagged to identify the combination of Prefect and Python versions contained. Additionally, we have "convenience" tags which are updated with each release to facilitate automatic updates.

For example, when release 2.1.1 is published:

  1. Images with the release packaged are built for each supported Python version (3.7, 3.8, 3.9, 3.10, 3.11) with both standard Python and Conda.
  2. These images are tagged with the full description, e.g. prefect:2.1.1-python3.7 and prefect:2.1.1-python3.7-conda.
  3. For users that want more specific pins, these images are also tagged with the SHA of the git commit of the release, e.g. sha-88a7ff17a3435ec33c95c0323b8f05d7b9f3f6d2-python3.7.
  4. For users that want to be on the latest 2.1.x release, receiving patch updates, we update a tag without the patch version to this release, e.g. prefect:2.1-python3.7.
  5. For users that want to be on the latest 2.x.y release, receiving minor version updates, we update a tag without the minor or patch version to this release, e.g. prefect:2-python3.7.
  6. Finally, for users who want the latest 2.x.y release without specifying a Python version, we update 2-latest to the image for our highest supported Python version, which in this case would be equivalent to prefect:2.1.1-python3.10.
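
The version-based part of this tagging scheme can be sketched as a small function. This is an illustration of the naming pattern only: release_tags is a hypothetical helper, and it leaves out the sha-<hash> and 2-latest tags, which depend on the commit and on the highest supported Python version.

```python
def release_tags(version, python_version, flavor=""):
    """Return the full, minor-level, and major-level tags for one image.

    version is a 'major.minor.patch' string; flavor is '' or 'conda'.
    """
    major, minor, _patch = version.split(".")
    suffix = f"-python{python_version}"
    if flavor:
        suffix += f"-{flavor}"
    return [
        f"prefect:{version}{suffix}",        # exact release
        f"prefect:{major}.{minor}{suffix}",  # follows patch updates
        f"prefect:{major}{suffix}",          # follows minor updates
    ]


standard_tags = release_tags("2.1.1", "3.7")
conda_tags = release_tags("2.1.1", "3.7", flavor="conda")
```

For release 2.1.1 on Python 3.7, the first list holds prefect:2.1.1-python3.7, prefect:2.1-python3.7, and prefect:2-python3.7; only the first of these never moves to a later release.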

Standard Python

Standard Python images are based on the official Python slim images, e.g. python:3.10-slim.

Tag                     Prefect Version               Python Version
2-latest                most recent v2 PyPI version   3.10
2-python3.11            most recent v2 PyPI version   3.11
2-python3.10            most recent v2 PyPI version   3.10
2-python3.9             most recent v2 PyPI version   3.9
2-python3.8             most recent v2 PyPI version   3.8
2-python3.7             most recent v2 PyPI version   3.7
2.X-python3.11          2.X                           3.11
2.X-python3.10          2.X                           3.10
2.X-python3.9           2.X                           3.9
2.X-python3.8           2.X                           3.8
2.X-python3.7           2.X                           3.7
sha-<hash>-python3.11   <hash>                        3.11
sha-<hash>-python3.10   <hash>                        3.10
sha-<hash>-python3.9    <hash>                        3.9
sha-<hash>-python3.8    <hash>                        3.8
sha-<hash>-python3.7    <hash>                        3.7

Conda-flavored Python

Conda-flavored images are based on continuumio/miniconda3. Prefect is installed into a conda environment named prefect.

Note: Conda support for Python 3.11 is not yet available, so no Conda image is built for that version.

Tag                           Prefect Version               Python Version
2-latest-conda                most recent v2 PyPI version   3.10
2-python3.10-conda            most recent v2 PyPI version   3.10
2-python3.9-conda             most recent v2 PyPI version   3.9
2-python3.8-conda             most recent v2 PyPI version   3.8
2-python3.7-conda             most recent v2 PyPI version   3.7
2.X-python3.10-conda          2.X                           3.10
2.X-python3.9-conda           2.X                           3.9
2.X-python3.8-conda           2.X                           3.8
2.X-python3.7-conda           2.X                           3.7
sha-<hash>-python3.10-conda   <hash>                        3.10
sha-<hash>-python3.9-conda    <hash>                        3.9
sha-<hash>-python3.8-conda    <hash>                        3.8
sha-<hash>-python3.7-conda    <hash>                        3.7

Installing Extra Dependencies at Runtime

If you're using the prefecthq/prefect image (or an image based on prefecthq/prefect), you can make use of the EXTRA_PIP_PACKAGES environment variable to install dependencies at runtime. If defined, pip install ${EXTRA_PIP_PACKAGES} is executed before the flow run starts.
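
The behavior described above can be approximated in a few lines. This is a sketch of the idea, not the image's actual startup logic, which may differ in detail; extra_pip_command is a hypothetical helper that only builds the command and does not run it.

```python
import shlex


def extra_pip_command(env):
    """Return the pip command implied by EXTRA_PIP_PACKAGES, or None.

    The variable holds a whitespace-separated list of requirements.
    """
    packages = shlex.split(env.get("EXTRA_PIP_PACKAGES", ""))
    if not packages:
        return None  # Nothing to install before the flow run starts.
    return ["pip", "install", *packages]


command = extra_pip_command({"EXTRA_PIP_PACKAGES": "s3fs scikit-learn"})
nothing = extra_pip_command({})
```

Because the installation would run at the start of every flow run, each listed package adds download time to every execution.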

For production deployments we recommend building a custom image (as described below). Installing dependencies during each flow run can be costly (since you're downloading from PyPI on each execution) and adds another opportunity for failure. EXTRA_PIP_PACKAGES can still be useful during development, as it allows you to iterate on dependencies without building a new image each time.

Building your Own Image

If your flow relies on dependencies not found in the default prefecthq/prefect images, you'll want to build your own image. You can either base it off of one of the provided prefecthq/prefect images, or build your own from scratch.

Extending the prefecthq/prefect image

Here we provide an example Dockerfile for building an image based on prefecthq/prefect:2-latest, but with scikit-learn installed.

FROM prefecthq/prefect:2-latest

RUN pip install scikit-learn

Choosing an Image Strategy

The options described above have different complexity (and performance) characteristics. For choosing a strategy, we provide the following recommendations:

  • If your flow only makes use of tasks defined in the same file as the flow, or tasks that are part of prefect itself, then you can rely on the default provided prefecthq/prefect image.

  • If your flow requires a few extra dependencies found on PyPI, we recommend using the default prefecthq/prefect image and setting EXTRA_PIP_PACKAGES to install these dependencies at runtime. This makes the most sense for small dependencies that are quick to install. If the installation process requires compiling code or other expensive operations, you may be better off building a custom image instead.

  • If your flow (or flows) require extra dependencies or shared libraries, we recommend building a shared custom image with all the extra dependencies and shared task definitions you need. Your flows can then all rely on the same image, but have their source stored externally. This can ease development, as the shared image only needs to be rebuilt when dependencies change, not when the flow source changes.