Антон Маркелов

Автоматизирую, поддерживаю, починяю примус

 » Home
 » Обо мне
 » CV Rus
 » Resume Eng (PDF)
 » Github
 » XML Feed

CI/CD for Deploying Flows in Prefect Cloud 2

09 Nov 2023 » devops

Initial Requirements

  • We’ve got a bunch of Python scripts representing various Flows, all packed in one repository.
  • The goal is to deploy the same Flow in the form of multiple Deployments (with different parameters and schedules).
  • Also, we need to deploy the Flows in two different environments (Dev and Prod) for testing and debugging.
  • We prefer defining everything in code rather than messing around with manual configurations in a web interface

What Prefect Offers

The Prefect documentation (https://docs.prefect.io/2.14.3/) is a bit confusing, but after some trial and error, here’s what I found out:

  • Prefect gives us two methods to execute Flows - using agents and workers (https://docs.prefect.io/2.14.3/guides/upgrade-guide-agents-to-workers/). The difference, as far as I can tell, lies in setting up the underlying infrastructure. With agents, you configure it in the infrastructure block (https://docs.prefect.io/2.14.3/concepts/infrastructure/), and for workers, it’s done through job templates. We opted for agent-based deployments to avoid tweaking our Flows too much.
  • There are three main ways to deploy our Deployments:
    1. prefect deployment build + prefect deployment apply - this lets you set parameters for each Deployment individually. It’s an a bit more legacy command that supports both agent and worker configurations.
    2. prefect deploy - it can read the prefect.yaml config file and deploy everything from there, or you can specify parameters for a single deployment, similar to the previous method. This is a newer approach that only works with workers and doesn’t support things like infrastructure blocks. Plus, it has a limitation - it can deploy either a single flow or all the flows listed in prefect.yaml, and you can’t have multiple configs like that in one project.
    3. Call the serve or to_deployment functions directly from your Flow code.

Description of Deployment Infrastructure

To keep things simple, we decided to skip storage blocks and store all Flows directly in the Docker image of the agent. This image is also used for CI:

# change to exact version if needed, e.g. 2.8.6-python3.10
# see https://hub.docker.com/r/prefecthq/prefect/tags?page=1
FROM prefecthq/prefect:2-python3.10

RUN /usr/local/bin/python -m pip install --upgrade pip

WORKDIR /opt/prefect


COPY requirements.txt /opt/prefect/requirements.txt
RUN pip install -r requirements.txt
RUN prefect block register -m prefect_aws.ecs

COPY src /opt/prefect/src/
COPY flows /opt/prefect/flows/

# default CMD for agent
# workers will be spawned with their own cmd
CMD [ "sh", "-c", "prefect agent  start -q ${PREFECT_QUEUE}" ]

For Deployments, we chose the first deployment method - using prefect deployment build. We wanted to continue defining the infrastructure alongside the Flow in code (as infrastructure blocks). The method of “deploy everything from the project” didn’t suit us because we needed to deploy flows in two different environments in different stages of development.

However, the idea of having a config file describing all deployments was quite handy. So, we came up with our own wrapper that reads our YAML config specifying deployments for a specific environment.

Here’s the script we came up with:

import yaml
import sys
import subprocess
import json
from  shlex import quote

def read_yaml_file(file_path):
        with open(file_path, 'r') as yaml_file:
            data = yaml.safe_load(yaml_file)
            return data
    except FileNotFoundError:
        print(f"Error: File '{file_path}' not found.")
    except Exception as e:
        print(f"Error: {e}")

# construct_prefect_command accepts two parameters:
# deployment - dict with deployment parameters (see examples in prefect-deployments-dev.yaml)
# ib_block - infrastructure block with AWS creds and ECS config (located in the infrastructure folder)
def construct_prefect_command(deployment, ib_block):
    # mandatory fields
    if "entrypoint" not in deployment:
        raise KeyError("'entrypoint' field is mandatory")
    if "name" not in deployment:
        raise KeyError("'name' field is mandatory")

    command = f"prefect deployment build {quote(deployment['entrypoint'])} -n {quote(deployment['name'])} -a --skip-upload -ib {quote(ib_block)}"

    # optional fields
    if "parameters" in deployment:
        command += f" --params={quote(json.dumps(deployment['parameters']))}"

    if "tags" in deployment:
        for tag in deployment["tags"]:
            command += f" --tag={quote(tag)}"

    if "schedule" in deployment:
        if "cron" in deployment["schedule"]:
            command += f" --cron={quote(deployment['schedule']['cron'])}"
        if "interval" in deployment["schedule"]:
            command += f" --interval={quote(deployment['schedule']['interval'])}"
        if "rrule" in deployment["schedule"]:
            command += f" --rrule={quote(deployment['schedule']['rrule'])}"
        if "timezone" in deployment["schedule"]:
            command += f" --timezone={quote(deployment['schedule']['timezone'])}"
        if "anchor-date" in deployment["schedule"]:
            command += f" --anchor-date={quote(deployment['schedule']['anchor-date'])}"

    if "work_pool" in deployment:
        if "name" in deployment["work_pool"]:
            command += f" --pool={quote(deployment['work_pool']['name'])}"
        if "work_queue_name" in deployment["work_pool"]:
            command += f" --work-queue={quote(deployment['work_pool']['work_queue_name'])}"

    return command

if __name__ == "__main__":

    if len(sys.argv) != 2:
        raise OSError("Script accepts exactly one argument: path to deployments YAML config")
    file_path = sys.argv[1]
    yaml_data = read_yaml_file(file_path)
    if yaml_data:
        if "ib-block" not in yaml_data:
            raise KeyError("'ib-block' field is mandatory")
        if "deployments" not in yaml_data:
            raise KeyError("'deployments' field is mandatory")

        for deployment in yaml_data["deployments"]:
            deploy_command = construct_prefect_command(deployment, yaml_data["ib-block"])
            print(f"run {deploy_command}")
            subprocess.run(deploy_command, shell=True, universal_newlines=True, check=True)

The logic is straightforward: parse the YAML file, check its structure, and convert its fields into parameters for the prefect deployment build command. The YAML file has the following structure:

ib-block: "ecs-task/dev" # Mandatory field. Credentials and parameters for ECS from the infrastructure folder

  - name: Flow1 # Mandatory field. Deployment name
    entrypoint: "examples/hello_world.py:hello" # Mandatory field. The path to the .py file containing the flow you want to deploy (relative to the root directory of your project) combined with the name of the flow function.
    parameters: # Optional field. Dict with parameters for flow
      number: 42
      name: "Do not panic!"
      env.stage: "dev"
    schedule: # Optional field. Scheduler parameters, "cron", "interval" and "rrule" are supported, see https://docs.prefect.io/2.14.3/concepts/schedules/?h=schedules#schedule-types
      cron: "0 0 * * *"
      timezone: "America/Chicago"
    tags: # Optional field. List of tags for deployment
      - "dev"
      - "some-tag1"
    version: "0.0.1" # Optional field. Deployment version
    work_pool: # Optional fields. Pool and Work Queue names
      name: "pool"
      work_queue_name: "dev"
  - name: Flow2

All of this is deployed using Github Actions (some code parts are omitted as they are not relevant to the topic):

name: Deploy to dev
      - dev 

    name: Build Prefect docker image
    runs-on: ubuntu-latest
      image: ${{ steps.docker_image.outputs.IMAGE }}

      - name: Check out code
        uses: actions/checkout@v3

      - name: Build, tag, and push docker image
        id: docker_image
          REPOSITORY: prefect2-worker
          IMAGE_TAG: dev
        run: |
          docker build -t $REGISTRY/$REPOSITORY:$IMAGE_TAG .
          docker push $REGISTRY/$REPOSITORY:$IMAGE_TAG
          echo "IMAGE=${{ env.REGISTRY }}/${{ env.REPOSITORY }}:${{ env.IMAGE_TAG }}" >> $GITHUB_OUTPUT

    name: Prefect Blocks Upload
    runs-on: ubuntu-20.04 # need for glibc compatibility with prefect docker container
    needs: build-image

      image: "${{ needs.build-image.outputs.image }}" # run in prefect container from the first step

      PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }}
      WORKSPACE_NAME: "example"
      PREFECT_IMAGE: "${{ needs.build-image.outputs.image }}"

      - name: Checkout
        uses: actions/checkout@v3

      - name: Authenticate to Prefect Cloud and upload block
        run: |
          prefect config set PREFECT_API_KEY=${{ secrets.PREFECT_API_KEY }} 
          prefect config set PREFECT_API_URL=${{ secrets.PREFECT_API_URL }}
          python3 infrastructure/ecs-task-dev.py # upload infrastructure block to prefect cloud

      - name: Blocks creation finished
        run: echo "ECS block built at $(date +'%Y-%m-%dT%H:%M:%S')" >> $GITHUB_STEP_SUMMARY

    runs-on: ubuntu-20.04
    needs: [build-image, blocks]

      image: "${{ needs.build-image.outputs.image }}" # run in built prefect container from the first step

      - uses: actions/checkout@v3

      - name: Authenticate to Prefect Cloud
        run: |
          prefect config set PREFECT_API_KEY=${{ secrets.PREFECT_API_KEY }} 
          prefect config set PREFECT_API_URL=${{ secrets.PREFECT_API_URL }}

      - name: Deploy flows to Cloud
        id: build
        run: |
          python upload_deployments.py prefect-deployments-dev.yaml
          echo "Flows from prefect-deployments-dev.yaml deployed to Prefect Cloud (dev)" >> $GITHUB_STEP_SUMMARY


This solution enabled us to implement the necessary functionality without making any changes to the Flow code. Moreover, since the deployment script is written in Python and is very straightforward, we were able to hand it over to the Data team. This allows them to modify it in the future to better suit their needs.