Beliebte Suchanfragen
//

Lessons learned: Was wir in einem Jahr ML Orchestrierung mit Dagster gelernt haben

12.9.2024 | 25 Minuten Lesezeit

In einem gemeinsamen Projekt haben Tom Scholz und ich Machine Learning (ML) Services gebaut, um einem Kunden bei der Analyse von Dokumenten zu helfen. Eine Proof-Of-Concept Lösung war schnell gebaut, die es nun zu operationalisieren gilt. Hierbei war uns wichtig:

  1. Komplexität beherrschen: ML-Workflows umfassen oft mehrere Schritte wie Datenvorverarbeitung, Modelltraining, Evaluation und Deployment. Viele ML-Tasks sind voneinander abhängig. Orchestrierung sorgt dafür, dass diese Tasks in eine Reihenfolge gebracht und im besten Fall als Directed Acyclic Graph (DAG) dargestellt werden.

  2. Wiederholbarkeit und Konsistenz: Das ist besonders wichtig, um sicherzustellen, dass Modelle auf denselben Daten und Prozessen basieren. Auch wollen wir nachvollziehen können, welche Ergebnisse durch welche Daten- und Codekombinationen entstanden sind.

  3. Automatisierung: Um die Modelle inkrementell zu verbessern, automatisieren wir die typischen ML-Prozesse, wodurch wir menschliche Fehler verhindern und auch Zeit einsparen können.

  4. Überwachung und Debugging: Wir brauchen Monitoring- und Logging-Funktionen, die es einfacher machen, Probleme zu identifizieren und zu beheben, wenn etwas schiefgeht.

  5. Skalierbarkeit: Für große Datenmengen und rechenintensive Modelle ist es notwendig, ML-Workflows über mehrere Maschinen oder Cloud-Umgebungen hinweg zu skalieren.

Um diese Punkte zu gewährleisten, haben wir uns in unserem Projekt für Dagster, einem bekannten Orchestrierungs-Tool in Python entschieden. Es ist für die Entwicklung und Pflege von Datenprodukten bekannt. Kernkonzept ist die Deklaration von ausführbaren Funktionen (Ops) und Daten-Assets, die aus den Ops erzeugt werden. Diese Funktionen und Datenobjekte können als Job miteinander in Verbindung gesetzt werden.

In unserer Arbeit mit Dagster haben uns insbesondere folgende Themen beschäftigt:

Wie löse ich das Lesen und Schreiben von Daten?

Wir wollen eine Lösung finden, die es uns ermöglicht:

  • Dateien in verschiedenen Dateisystemen zu lesen und zu schreiben.
  • Dateien in verschiedenen Formaten zu lesen und schreiben.
  • I/O Operationen einfach konfigurieren zu können.
  • I/O Operationen gut testen zu können.
  • Lese- und Schreiboperationen von der Business-Logik zu trennen und an einer zentralen Stelle zu verwalten.

Wie verwende ich Code in mehreren Dagster Jobs?

Für den Kunden haben wir mehrere Dagster Jobs angelegt, um die Datenverarbeitung, das Trainieren und Evaluieren des Modells zu orchestrieren:

  1. Job
  • Daten lesen
  • Daten prozessieren
  • Feature Engineering
  • Daten schreiben
  1. Job
  • Daten lesen
  • ML Modell trainieren
  • trainiertes ML Modell mit Test-Daten verproben
  • Metriken berechnen und Modell Performance visualisieren
  • Daten schreiben

Manche dieser Prozesse sind sich im Grundsatz sehr ähnlich. Den Code für jeden Job neu zu schreiben wäre nicht sauber und würde auch parallel implementierte Tests erfordern. Das würden wir gerne vermeiden.

Wie nutze ich Dagster für stabile und robuste Software?

Im Gegensatz zu anderen ML Orchestratoren wie z.B. kubeflow ist es mit Dagster sehr einfach Code lokal und in verschiedenen Staging Umgebungen zu benutzen. Staging-Umgebungen sind Testumgebungen, die die Produktionsumgebung möglichst genau nachbilden. Mehrere Staging-Umgebungen helfen, verschiedene Szenarien und Versionsstände zu testen, bevor neue Features oder Modelle live gehen. Dadurch können Fehler frühzeitig erkannt und behoben werden, ohne die reale Anwendung zu gefährden. Mit diesem Feature wollten wir uns intensiv beschäftigen: Aus unserer sehr subjektiven Sicht hinken andere Orchestratoren darin sowohl die Bedürfnisse von Data Scientists als auch von Software Engineers zu erfüllen. In der Rolle als Data Scientist will ich mit ML Modellen experimentieren und sie iterativ verbessern. Der Code soll dafür zugänglich sein, ich will mich nicht von Funktion zu Funktion hangeln, wenn ich nur einen einzigen Konfigurationsparameter anpassen will. Am liebsten wäre mir eine UI über die ich das machen kann. Als Software Engineer will ich Code, der gut wartbar und testbar ist und den ich in verschiedenen Umgebungen ausrollen kann, also ohne viel Mehraufwand produktiv halten kann. Dagster schien diesen Spagat gut zu schaffen.

Wir wollen

  • einfach lokal debuggen können
  • keine parallelen Implementierungen schreiben müssen für verschiedene Staging-Umgebungen.
  • einfach testen

Auf diese Punkte wollen wir in unserem Blogpost eingehen. Wir rekapitulieren mit dir, welche Ideen wir dazu hatten, was wir ausprobiert haben und mit welcher Lösung wir zufrieden waren. Diese haben wir in einem Beispielprojekt dargestellt. Viel Spaß beim Lesen!

Dagster Konzepte

Schauen wir uns zunächst ein paar Dagster Grundlagen und Konzepte an, die uns dabei helfen können, diese Anforderungen zu erfüllen.

Assets

Software-defined Assets SDA stellen eine der zentralen Komponenten in Dagster dar. Bei SDAs werden Daten so behandelt, als wären sie Softwarekomponenten. Das bedeutet, dass ihre Definition, Erstellung, Verwaltung und Änderungen durch Code gesteuert werden. Anstatt Daten manuell zu bearbeiten oder zu aktualisieren, definiert man dieses Procedere im Code. Man kann sich SDAs als Artefakte vorstellen, die im Verlauf eines Dagster Jobs erstellt und bearbeitet werden. Dabei kann es sich z.B. um pandas Dataframes oder ML-Modelle handeln. Jedes SDA verfügt über folgende Charakteristika:

  • Es hat einen Asset Key, über den das Asset identifizierbar ist,
  • eine Python Funktion, die das Asset generiert und
  • ggf. Asset-Keys. Darüber kann definiert werden in welcher Verbindung Assets stehen, also welche Assets sich gegenseitig konsumieren.

Wir können auch Funktionen schreiben, die mehr als ein Asset zurückgeben. Diese Funktionen werden als Mutli-Asset bezeichnet.

Jobs

Um eine Pipeline in Dagster auszuführen und zu überwachen verwenden wir Jobs in Dagster. Ein Asset-Job materialisiert eine Auswahl von Assets und kann auf verschiedene Weisen gestartet werden: manuell über die CLI oder die Dagster UI, zu festgelegten Zeiten mittels Schedules oder bei externen Änderungen durch den Einsatz von Sensors z.B. wenn sich Daten in einer Datenbank ändern.

Resources

Dagster enthält mehrere Resource Klassen. Sie helfen dabei, externe Daten-Tools und -Dienste — wie Datenbanken, APIs, Alerting über Slack, Teams oder E-Mail in Bezug auf unseren Job-Status oder Speichersysteme — nahtlos in deine Daten-Pipelines zu integrieren. Egal, ob du Daten aus einer Quelle extrahierst, sie transformierst oder in einer anderen Speicherstelle ablegst, Dagster Ressourcen sorgen dafür, dass diese Verbindungen sicher, wiederverwendbar und einfach zu verwalten sind. Sie standardisieren und vereinfachen die Art und Weise, wie deine Pipeline-Komponenten mit externen Diensten und Werkzeugen kommunizieren.

Definitions

Definitions ist eine Klasse in Dagster, die dafür genutzt wird, um Jobs, Assets und Ressource Dagster Tools, wie dem Dagster Webservice (dagit) verfügbar zu machen.

"Sauberes" I/O Management mit Dagster

Gehen wir von einem einfachen Beispiel aus: Wir wollen Daten generieren, prozessieren und damit einen linearen Regressor trainieren. Das Prozessieren der Daten und das Trainieren des linearen Regressors bilden wir als Asset ab. Und das Generieren der Daten? Ist das auch ein Asset? Und wie speichern wir die generierten Daten? Machen wir das innerhalb der Asset-Logik? Dies hätte jedoch zur Folge, dass wir unsere Businesslogik mit I/O vermischen und so unter anderem die Test- und Wiederverwendbarkeit von Assets in verschiedenen Jobs erschwert wird. Wie nutzen wir Dagster so, dass es unsere Arbeit mit dem Code vereinfacht und nicht erschwert?

I/O Management innerhalb eines Assets

Über ein Asset ließe sich das Generieren der Daten abhandeln.

1from dagster import asset
2import pandas as pd
3import numpy as np
4
5
6@asset(
7    name="raw_training_data",
8    group_name="data_generation"
9)
10def generate_raw_training_data() -> pd.DataFrame:
11    """
12    Generate training data
13    """
14    # Features
15    X = np.random.rand(100, 1) * 10
16
17    # Zielvariable mit etwas Rauschen
18    y = 2 * X + 1 + np.random.randn(100, 1) * 2
19
20    df = pd.DataFrame(
21        data=np.concatenate((X, y), axis=1),
22        columns=['Feature', 'Target']
23    )
24
25    # Speichern der generierten Daten
26    df.to_parquet('pfad_zur_datei.parq', engine='pyarrow')
27
28    return df

Mit diesem Ansatz würden wir I/O Logik mit Businesslogik mischen. Das ist bei einem einzigen Asset an sich kein Problem. Allerdings bestanden die Dagster Jobs in unserem Kundenprojekt aus mehr als einem Asset. Aus unserer Sicht war es nun nicht mehr praktikabel in jedem Asset das Speichern der Daten explizit per df.to_parquet('pfad_zur_datei.parq', engine='pyarrow')oder etwas Ähnlichem abzuhandeln.

I/O Management per Dagster Ressource

Wir können das Generieren der Daten auch per folgender Ressource in einer core/resources/data_generator.py abhandeln.

1import numpy as np
2import pandas as pd
3from dagster import ConfigurableResource
4from pydantic import Field
5
6
7class DataGenerator(ConfigurableResource):
8    size: int = Field(description="Size of the generated dataset", default=100)
9
10    def generate_raw_training_data(self) -> pd.DataFrame:
11        """
12        Generate training data
13        """
14        # Features
15        X = np.random.rand(self.size, 1) * 10
16
17        # Zielvariable mit etwas Rauschen
18        y = 2 * X + 1 + np.random.randn(self.size, 1) * 2
19
20        return pd.DataFrame(
21            data=np.concatenate((X, y), axis=1),
22            columns=['Feature', 'Target']
23        )

Die Ressource kann dann in den Assets via Dependency Injection verwendet werden. DataGenerator wird als Input Parameter in der Funktion aufgerufen:

1from dagster import asset
2
3from core.resources.data_generator import DataGenerator
4
5
6@asset(name="preprocessed_data")
7def preprocess_data(data_generator: DataGenerator):
8    """
9    Process raw training data.
10    """
11    raw_data = data_generator.generate_raw_training_data()
12
13    # do "something" with raw_data
14
15    return raw_data

Auch ist es möglich, verschiedene Implementierungen für verschiedene Umgebungen (Test, Staging, Prod, etc.) bereitzustellen. Asset und Ressource (je nach Umgebung) müssen dann in der Dagster Job Definition aufeinander gemappt werden. Per load_assets_from_package_module laden wir alle Assets mit dem group_name Key in dem asset decorator aus einem python Modul preprocessing in unserem Repo. Wir definieren einen preprocessing_job und übergeben dabei das zuvor definierte preprocessing_job_assets.

1from dagster import Definitions, load_assets_from_package_module, define_asset_job
2from core.resources.data_generator import DataGenerator
3from core.resources.data_warehouse_api import DataWareHouseAPI
4
5from core.assets import preprocessing
6
7preprocessing_job_assets = load_assets_from_package_module(
8    package_module=preprocessing,
9    group_name="preprocessing"
10)
11
12preprocessing_job = define_asset_job(
13    name="preprocessing_job",
14    selection=preprocessing_job_assets)
15
16RESOURCES_DEV = {
17    "data_generator": DataGenerator(),
18}
19
20RESOURCES_PROD = {
21    "api_endpoint_to_large_data_warehouse": DataWareHouseAPI(),
22}
23
24resources_by_deployment_name = {
25    "DEV": RESOURCES_DEV,
26    "PROD": RESOURCES_PROD,
27}
28
29defs = Definitions(
30    assets=[
31        *preprocessing_job_assets,
32    ],
33    jobs=[
34        preprocessing_job,
35    ],
36    resources=resources_by_deployment_name["DEV"],
37)

In dem Beispiel kapseln wir Ressourcen pro Stage und verwenden in den Definitions die DEV Stage Ressourcen. Motivation könnte hier sein, dass wir in der DEV Stage unseren Code verproben und mit ihm experimentieren wollen. Einen kleinen Datensatz per DataGenerator zu erzeugen kann hierbei genügen anstatt einen großen Datensatz per DataWareHouseAPI zu laden.

Bei der Verwendung einer Ressource ist darauf zu achten, dass diese immer explizit in dem Asset, das sie verwenden soll, aufgerufen werden muss. Wird eine Ressource verwendet, um beispielsweise Daten in einen S3 Bucket zu schreiben, muss sichergestellt werden, dass diese Ressource auch tatsächlich im Asset verwendet wird und die Daten tatsächlich geschrieben werden. Dies macht es einerseits sehr eindeutig wo welche Ressource verwendet wird, andererseits den Code auch unübersichtlicher.

Wenn wir z.B. ein Multi-Asset definieren, welches zwei Assets erzeugt (ML Modell und Metriken), kann das Schreiben der Assets mittels Ressourcen schnell unübersichtlich werden. Um per Dagster Ressource entweder in einen S3 storage oder ins lokale Dateisystem zu schreiben brauchen wir mehrere Hilfsfunktionen in einer core/utils/resource_helper_functions.py. Wir schreiben

  • eine LocationConfig Klasse, die grundsätzliche Parameter enthält, um auf einen S3 Speicher zugreifen zu können.
  • eine open_location Funktion, mit der wir ein Dateisystem (lokal oder remote) öffnen und für I/O Operationen zugänglich machen und nach Schreiben auch wieder schließen.
  • eine write_csv Funktion, die einen pandas Dataframe in ein geöffnetes Dateisystem als .csv Datei schreibt.
  • eine join_fs_path Funktion, die mit Pfad Separatoren umgeht. Denn je nach Ziel-Dateisystem setzt sich der Pfad, auf den wir schreiben wollen, unterschiedlich zusammen.
1from contextlib import contextmanager
2from copy import deepcopy
3from os import path
4from typing import Any, Dict, Iterator, Tuple, Optional, Union
5
6import pandas as pd
7from fsspec import AbstractFileSystem
8from fsspec.core import url_to_fs
9from fsspec.implementations.local import LocalFileSystem
10from pydantic import BaseModel, Field
11
12FSPath = Tuple[AbstractFileSystem, str]
13
14
15class LocationConfig(BaseModel):
16    """Access description for a remote storage location. The description is targeted at fsspec_."""
17
18    uri: str = Field(description="URL to remote storage as expected by fsspec_.")
19    credentials: Dict[str, Any] = Field(
20        description="Optional credentials to be passed as filesystem arguments to fsspec_."
21    )
22
23
24@contextmanager
25def open_location(config: Union[LocationConfig, Dict[str, Any]]) -> Iterator[FSPath]:
26    """
27    Creates a filesystem and path from configuration as a single context manager.
28    The filesystem is "closed" (i.e. open connections are closed) when the context is left.
29    """
30
31    parsed_config = (
32        config
33        if isinstance(config, LocationConfig)
34        else LocationConfig(**config)
35    )
36
37    credentials = deepcopy(parsed_config.credentials)
38    filesystem, path = url_to_fs(parsed_config.uri, **credentials)
39    try:
40        yield filesystem, path
41    finally:
42        del filesystem
43
44
45def write_csv(
46        data: pd.DataFrame,
47        filepath: str,
48        file_system: Optional[AbstractFileSystem] = None,
49        **kwargs,
50):
51    """
52    Writes Dataframe to csv file with optional AbstractFileSystem given
53
54    Args:
55        data: Dataframe to write to csv file
56        filepath: Path to save the csv file to
57        file_system: Allow the function to be used with different file systems; default = local
58        **kwargs: additional arguments for data.to_csv function
59    """
60    cur_fs: AbstractFileSystem = file_system or LocalFileSystem()
61    cur_fs.mkdirs(
62        path.dirname(filepath),
63        exist_ok=True,
64    )
65    with cur_fs.open(filepath, "w", encoding="utf-8") as file:
66        data.to_csv(file, index=False, **kwargs)
67
68
69def join_fs_path(file_system: AbstractFileSystem, *paths: str) -> str:
70    """Returns joined given paths with the fsspec specific path seperator"""
71    paths = [path for path in paths if len(path) > 0]
72    return file_system.sep.join(paths)

Die credentials zum Schreiben in einen S3 storage kommen aus Umgebungsvariablen. Diese lösen wir per OmegaConf (oc) auf. Dank der Location Klasse können wir schon ganz gut zwischen S3 und unserem lokalen Dateisystem hin- und herwechseln. Wir müssen dafür das filesystem Attribut entweder auf s3 oder lokal setzen. Hierbei nutzen wir fsspec. fsspec ist ein Projekt, zur Bereitstellung einer einheitlichen Python-Schnittstelle für lokale und Remote Dateisysteme und stellt eine Abstraktionsschicht über verschiedene Dateisysteme wie z.B. S3 hinweg bereit. Das ermöglicht es uns ohne Boilerplatecode sowohl in S3 als auch ins lokale Dateisystem zu schreiben. Die Location Klasse vererben wir an ModelLocation in core/resources.py, die sich um das Speichern eines ML Modells und von Modell Metriken (pandas Dataframe) als .csv Datei kümmert.

1import os
2from os import path
3from tempfile import TemporaryDirectory
4
5import joblib
6import pandas as pd
7from dagster import ConfigurableResource
8from omegaconf import OmegaConf
9from pydantic import Field
10
11from core.utils.generate_experiemt_folder import generate_experiment_folder
12from core.utils.resource_helper_functions import open_location, join_fs_path, write_csv
13
14s3_credentials = {
15    "key": "${oc.env:MINIO_ACCESS_KEY,null}",
16    "secret": "${oc.env:MINIO_SECRET_KEY,null}",
17    "client_kwargs": {"endpoint_url": "${oc.env:MINIO_HOST,null}"}
18}
19
20
21class Location(ConfigurableResource):
22    """Dagster resource to write and read data"""
23    filesystem: str = "s3"
24    filepath: str
25    credentials: dict = s3_credentials
26
27    def get_credentials(self) -> dict:
28        cfg = OmegaConf.create(
29            self.credentials
30        )
31
32        return OmegaConf.to_container(cfg, resolve=True)
33
34    def get_location(self) -> dict:
35        return {"uri": self.uri,
36                "credentials": self.get_credentials()}
37
38    @property
39    def uri(self) -> str:
40        if self.filesystem != "local":
41            return f"{self.filesystem}://{self.filepath}"
42        return self.filepath
43
44
45class ModelLocation(Location):
46    """Dagster resource to read and write the trained model and metrics collected while training."""
47    filepath: str = Field("artifacts/models")
48    metrics_filename: str = Field("metrics_test.csv")
49    trained_model_experiment_folder: str = Field(default_factory=generate_experiment_folder)
50
51    @property
52    def uri(self) -> str:
53        if self.filesystem != "local":
54            return f"{self.filesystem}://{self.filepath}/{self.trained_model_experiment_folder}"
55        return f"{self.filepath}/{self.trained_model_experiment_folder}"
56
57    def save_model(self, model):
58        """Write model."""
59        with TemporaryDirectory() as tmp_path:
60            file_path = path.join(tmp_path, "model")
61            joblib.dump(model, file_path)
62
63            with open_location(self.get_location()) as (target_fs, model_output_folder):
64                target_fs.makedirs(model_output_folder, exist_ok=True)
65
66                for filepath in os.listdir(tmp_path):
67                    remote_file = join_fs_path(target_fs, model_output_folder, filepath)
68                    with open(file_path, "rb") as file_to_read, target_fs.open(remote_file, "wb") as file_to_write:
69                        f = file_to_read.read()
70                        file_to_write.write(f)
71
72    def save_metrics(self, metrics_df: pd.DataFrame, filename: str):
73        """Write metrics pandas Dataframe to csv file."""
74        metrics_location = self.get_location()
75        with open_location(metrics_location) as (output_filesystem, stats_output_folder):
76            write_csv(
77                data=metrics_df,
78                file_system=output_filesystem,
79                filepath=join_fs_path(
80                    output_filesystem,
81                    stats_output_folder,
82                    filename
83                )
84            )

Nun muss die ModelLocation Ressource explizit mit ihrer save_metrics Methode aufgerufen werden in einem Multi-Asset, das wir in einer core/assets.py anlegen und, welches das Modell-Asset und das Metriken-Asset erzeugt.

1import pandas as pd
2from dagster import multi_asset, AssetIn, AssetOut
3from sklearn.linear_model import LinearRegression
4
5from core.resources import ModelLocation
6from core.utils.calculate_metrics import calculate_metrics
7
8
9@multi_asset(
10    outs={"model": AssetOut(),
11          "metrics": AssetOut()},
12    group_name="model_training",
13    ins={
14        "train_data": AssetIn(),
15        "test_data": AssetIn()
16    }
17)
18def train_model(
19        model_location: ModelLocation,
20        train_data: pd.DataFrame,
21        test_data: pd.DataFrame,
22):
23    """Train model with the training data subsets."""
24
25    model = LinearRegression()
26    model.fit(
27        train_data[["Feature"]].to_numpy(),
28        train_data[["Target"]].to_numpy()
29    )
30    model_location.save_model(model)
31
32    y_pred = model.predict(test_data[["Feature"]].to_numpy())
33
34    metrics = calculate_metrics(
35        test_data[["Target"]].to_numpy(),
36        y_pred,
37        test_data
38    )
39    model_location.save_metrics(metrics, "metrics.csv")
40
41    return model, metrics

Auch in der definitions.py dürfen wir nicht vergessen, dass die ModelLocation für das Schreiben von Modell Daten verwendet werden soll:

1import os
2
3from dagster import load_assets_from_current_module, define_asset_job, Definitions
4
5from core.assets import train_model
6from core.resources import ModelLocation
7
8assets = load_assets_from_current_module()
9
10training_job = define_asset_job(
11    name="training_job",
12    selection=[
13        train_model
14    ]
15)
16
17RESOURCES_DEV = {
18    "model_location": ModelLocation(),
19}
20
21resources_by_deployment_name = {
22    "DEV": RESOURCES_DEV
23}
24
25deployment_name = os.getenv("DEPLOYMENT_NAME", "DEV")
26
27defs = Definitions(
28    assets=assets,
29    resources=resources_by_deployment_name[deployment_name],
30    jobs=[
31        training_job
32    ]
33)

Bei vielen Datenquellen kann es schnell unübersichtlich werden, wenn wir I/O Operationen über Ressourcen abhandeln und diese an vielen Stellen angefordert werden. Auch kann es vorkommen, dass die gleiche Ressource an verschiedenen Stellen unterschiedlich konfiguriert werden soll, was zu Fehlern und Verwirrung führen kann.

Für unseren Geschmack erzeugt dieser Ansatz unnötigen Overhead und vermischt Businesslogik mit I/O Operationen, die in sauberen Daten Pipelines getrennt voneinander existieren sollten. Diese Problematik lässt sich durch die Verwendung von I/O Managern lösen.

I/O Management per Dagster I/O Manager

I/O Manager kapseln und verwalten die von einem Asset erzeugten oder benötigten Daten. Assets brauchen nur anzugeben, dass sie einen I/O Manager verwenden und erhalten dann die Daten von diesem Manager. Zum Beispiel legt Dagster pro Job Ausführung einen Ordner an und schreibt die erzeugten Assets im Pickle Format dort hinein über den Standard Dagster I/O Manager. In folgendem Beispiel wird das asset raw_training_data über den Standard Dagster I/O Manager gepickelt und geschrieben. Hierfür müssen wir keinen dezidierten I/O Manager angeben.

1from dagster import asset
2import pandas as pd
3
4
5@asset(
6    name="raw_training_data",
7    group_name="data_generation"
8)
9def generate_raw_training_data() -> pd.DataFrame:
10    """
11    Generate training data
12    """
13
14    pass

Folgende Abbildung zeigt wie das Zusammenspiel von Ressourcen, I/O Managern und Assets in einem sauberen Dagster Job aussehen könnte.

Abbildung: Zusammenspiel von Ressourcen, I/O Managern und Assets in einer Dagster Pipeline.

Damit ein Asset einen gesonderten I/O-Manager verwendet, wird dieser über den io_manager_key dem Asset zugewiesen. Diese Zuweisung wird in der Pipeline-Definition aufgelöst.

1from dagster import asset
2import pandas as pd
3
4
5@asset(
6    name="raw_training_data",
7    group_name="data_generation",
8    io_manager_key="pandas_parquet_io_manager"
9
10)
11def generate_raw_training_data() -> pd.DataFrame:
12    """
13    Generate training data
14    """
15
16    pass

Ähnlich wie bei Ressourcen können wir verschiedene built-in I/O Manager von Dagster nutzen, um unterschiedliche Speicherorte und Dateiformate zu unterstützen. Im folgenden Abschnitt stellen wir die verschiedenen I/O-Manager vor und gehen auf ihre Vor- und Nachteile ein.

FilesystemIOManager

Der FilesystemIOManager ist der Standard-I/O-Manager in Dagster, welcher die Werte, die von einem Asset generiert werden, als Pickle-Dateien im lokalen Dateisystem abspeichert und einliest.

Dieser erfordert so gut wie keine Konfiguration und ist extrem einfach zu verwenden. Aufgrund seiner Beschränkung auf das lokale Dateisystem ist er allerdings nicht für den Einsatz in verteilten oder Cloud Umgebungen geeignet.

S3PickleIOManager

Die dagster-aws Bibliothek kommt mit einem S3PickleIOManager, welcher speziell für das Lesen und Schreiben von Pickle-Dateien in S3 entwickelt wurde. Er ist eine gute Wahl, wenn wir von einem lokalen Dateisystem auf S3 umsteigen wollen und erfordert nur eine minimale Konfiguration. Alle Werte werden als Pickle-Dateien gespeichert, was jedoch einige Nachteile mit sich bringt, die wir im nächsten Abschnitt besprechen werden.

Python Pickle

An dieser Stelle möchten wir kurz auf Python Pickle eingehen, da die Standardprozedur in Dagster Assets das Speichern und Laden als Pickle-Dateien ist. Sowohl der FilesystemIOManager als auch der S3PickleIOManager verwenden Pickle zur Serialisierung und Deserialisierung von Python Objekten. Pickle kann verwendet werden, um Python-Objekte in eine Byte-Struktur zu konvertieren, welche in einer Datei gespeichert oder über das Netzwerk gesendet werden können.

Hier ist ein Beispiel, wie Objekte mit Pickle serialisiert und deserialisiert werden können:

1import tempfile
2import pickle
3from datetime import datetime
4from dataclasses import dataclass
5
6
7@dataclass
8class WeatherEntry:
9    celcius: float
10    date: datetime
11    location: str
12
13
14weather_entries = [
15    WeatherEntry(celcius=20.4, date=datetime(2022, 6, 1, 12, 0), location="Berlin"),
16    WeatherEntry(celcius=0.4, date=datetime(2021, 1, 1, 12, 0), location="Hamburg"),
17]
18
19with tempfile.TemporaryDirectory() as tmpdirname:
20    filename = f"{tmpdirname}/weather_data.pkl"
21
22    with open(filename, "wb") as file:
23        pickle.dump(weather_entries, file)
24
25    with open(filename, "rb") as file:
26        weather_entries_from_file = pickle.load(file)
27
28print(weather_entries_from_file)
29assert weather_entries == weather_entries_from_file

Pickle ist aus gutem Grund eine beliebte Wahl für die Serialisierung in Python.

  • universell: Fast jedes Python-Objekt kann mit Pickle serialisiert und deserialisiert werden.
  • built-in: Als eingebautes Python Modul erfordert Pickle keine externen Bibliotheken oder Tools.
  • einfache Verwendung: Mit Pickle ist Serialisierung so einfach wie die Verwendung von pickle.dump und Deserialisierung so einfach wie pickle.load (siehe Beispiel oben).

Trotz seiner Nutzerfreundlichkeit weist Pickle einige bedeutende Nachteile auf:

  • Sicherheitsrisiko: Das größte Problem bei der Verwendung von Pickle besteht darin, dass es ein potenzielles Sicherheitsrisiko darstellt. Jedes Mal, wenn ein Objekt deserialisiert wird, besteht die Gefahr, dass ein verborgener bösartiger Code ausgeführt wird. Die Python-Dokumentation warnt daher ausdrücklich davor, Pickle-Dateien aus nichtvertrauenswürdigen Quellen zu laden Dokumentation.
  • Portabilität: Pickle-Dateien sind in Python-Implementierungen festgelegt und nicht einfach über verschiedene Programmiersprachen hinweg kompatibel. Pickle speichert außerdem nicht die Klasse eines Objekts, sondern nur die Daten, die das Objekt repräsentieren. Aus diesem Grund kann Code bei der Verwendung in anderen Projekten oder nach Refactorings auf verschiedene Weise fehlschlagen.
  • Wartbarkeit: Da Pickle-Dateien binäre Darstellungen von Python Objekten sind, sind sie nicht menschenlesbar, was die Fehlersuche und Wartung erschwert.

Aufgrund dieser Nachteile empfehlen wir Pickle zu vermeiden. Standardisierte, nicht ausführbarer Dateiformate wie JSON, CSV oder Parquet stellen eine sicherere Alternative dar. Diese Formate sind dank ihrer breiten Unterstützung durch viele Bibliotheken auch leichter in unterschiedlichen Umgebungen und sogar in anderen Programmiersprachen zu verwenden.

Zusammengefasst: Pickle ist am besten für den internen Gebrauch geeignet, wenn Geschwindigkeit und Einfachheit Priorität haben und die Datenquellen als sicher gelten. In allen anderen Situationen sind standardisierte Datenformate vorzuziehen, um Sicherheit, Kompatibilität und Wartbarkeit zu gewährleisten.

S3Resource

Da wir aus den oben genannten Gründen Pickle vermeiden wollen und unsere Assets in anderen Formaten speichern wollen, haben wir uns mit der S3Resource beschäftigt, die ebenfalls von der dagster-aws Bibliothek bereitgestellt wird und einen Wrapper um einen boto3 S3 Client darstellt. Sie wird ebenfalls vom S3PickleIOManager verwendet und ist relativ einfach zu konfigurieren und zu verwenden.

Unsere Überlegung war es, dass wir wie im S3PickleIOManager eine S3Resource für verschiedene eigene I/O Manager wiederverwenden können. Die Zugangsdaten zum S3 storage werden per EnvVar aus der .env Datei gelesen.

Warnung: Zugangsdaten niemals direkt in die Konfiguration schreiben, die an Dagster übergeben wird. Die gesamte Konfiguration einschließlich der Zugangsdaten wird geloggt.

Warnung: Zugangsdaten niemals in Dateien schreiben, die in das Versionskontrollsystem hochgeladen werden.

Das ganze sieht dann so aus:

1from dagster import Definitions, ConfigurableIOManager, ResouceDependency, EnvVar
2from dagster_aws.s3 import S3Resource
3
4s3_resource = S3Resource(
5    aws_access_key_id=EnvVar('MINIO_ACCESS_KEY'),
6    aws_secret_access_key=EnvVar('MINIO_SECRET_KEY'),
7    endpoint_url=EnvVar('MINIO_HOST')
8)
9
10
11class S3CsvIOManager(ConfigurableIOManager):
12    s3_resource: ResouceDependency[S3Resource]
13    s3_bucket: str
14    s3_prefix: str
15
16    def handle_output(self, context, obj):
17        self.s3_resource.get_client().put_object(
18            Bucket=self.s3_bucket,
19            Key=f"{self.s3_prefix}/output.csv",
20            Body=obj
21        )
22
23    def load_input(self, context):
24        response = self.s3_resource.get_client().get_object(
25            Bucket=self.s3_bucket,
26            Key=f"{self.s3_prefix}/output.csv"
27        )
28
29        return response["Body"].read()
30
31
32defs = Definitions(
33    # ...
34    resources={
35        "csv_io_manager": S3CsvIOManager(
36            s3_resource=s3_resource,
37            s3_bucket="dagster",
38            s3_prefix="csv_io_manager/"
39        ),
40    }
41)

boto3 ist eine sehr mächtige Bibliothek, die viele Funktionen und Konfigurationen bietet, um mit verschiedensten AWS Services, nicht nur S3, zu interagieren. Für unsere Zwecke benötigen wir jedoch nur die grundlegenden Funktionen zum Lesen und Schreiben von Objekten nach S3.

Ein Nachteil der Verwendung der S3Resource ist allerdings, dass wir an S3 gekoppelt sind. Jedes neue Dateisystem würde eine andere Ressource erfordern. Statt der S3Resource können wir wie im Beispiel für die dagster Ressource (ModelLocation) auch fsspec verwenden, um I/O Operationen auf verschiedenen Dateisystemen zu kapseln. Im nächsten Abschnitt werden wir sehen, wie wir fsspec verwenden können, um die Implementierung von I/O Managern zu vereinfachen.

UPathIOManager

Der UPathIOManager ist ein I/O-Manager, der Teil von Dagster ist und welcher die fsspec-Bibliothek verwendet. Durch die Verwendung von fsspec abstrahiert der UPathIOManager die Funktionalität verschiedener Dateisysteme und ermöglicht es uns, in willkürliche Dateisysteme zu schreiben. Wir sind weniger festgelegt als mit dem S3CsvIOManager.

Eine einfache Implementierung eines UPathIOManager könnte so aussehen:

1from typing import Any
2
3from dagster import UPathIOManager, OutputContext, InputContext
4from upath import UPath
5
6
7class OneOfManyUPathIOManager(UPathIOManager):
8    def dump_to_path(self, context: OutputContext, obj: Any, path: UPath):
9        with path.open("wb") as file:
10            # Write obj to an abstract filesystem
11            pass
12
13    def load_from_path(self, context: InputContext, path: UPath) -> Any:
14        with path.open("rb") as file:
15            # Read file from an abstract filesystem, parse and return it
16            pass
17
18
19class SecondOfManyUPathIOManager(UPathIOManager):
20    def dump_to_path(self, context: OutputContext, obj: Any, path: UPath):
21        # ...
22        pass
23
24    def load_from_path(self, context: InputContext, path: UPath) -> Any:
25        # ...
26        pass

UPathIOManager ist eine abstrakte Python Klasse und muss deswegen vererbt und um die Methoden dump_to_path und load_from_path erweitert werden. Die beiden Methoden erhalten ein UPath Objekt, welches den Ort einer Datei in einem Dateisystem enthält, die gelesen oder geschrieben werden soll. Der UPath enthält auch die Konfigurationsparameter für das Dateisystem, wie z.B. im Falle von S3 den Bucket-Namen, das Präfix und die Zugangsdaten. Welche Parameter genau benötigt werden, hängt von dem jeweiligen Ziel-Dateisystem der fsspec Implementierung ab. In unserem Fall verwenden wir fsspec mit s3fs.

In der Pipeline-Definition können wir dann unsere eigenen I/O-Manager verwenden, mit denen wir unsere Assets speichern und laden.

1from dagster import Definitions, EnvVar
2from upath import UPath
3
4from core.resources.io_manager import OneOfManyUPathIOManager, SecondOfManyUPathIOManager
5
6defs = Definitions(
7    # ...
8    resources={
9        "parquet_io_manager": OneOfManyUPathIOManager(
10            base_path=UPath(
11                f"s3://my-bucket/my-prefix",
12                key=EnvVar('MINIO_ACCESS_KEY'),
13                secret=EnvVar('MINIO_SECRET_KEY'),
14                client_kwargs={
15                    "endpoint_url": EnvVar('MINIO_HOST')
16                }
17            )
18        ),
19        "model_io_manager": SecondOfManyUPathIOManager(
20            base_path=UPath(
21                f"s3://my-bucket/my-prefix",
22                key=EnvVar('MINIO_ACCESS_KEY'),
23                secret=EnvVar('MINIO_SECRET_KEY'),
24                client_kwargs={
25                    "endpoint_url": EnvVar('MINIO_HOST')
26                }
27            )
28        )
29    }
30)

ConfigurableIOManagerFactory

Damit wir allerdings nicht bei jedem IOManager die gleichen Konfigurationsparameter übergeben müssen (s.o), können wir eine Factory verwenden, welche die Konfiguration für uns übernimmt und einen UPathIOManager erstellt. Diese Factory erbt von ConfigurableIOManagerFactory und ist so konzipiert, dass sie erweitert werden kann und dann die spezifischen I/O-Manager-Typen erzeugt.

Das sieht dann so aus:

1from abc import ABC, abstractmethod
2
3from dagster import ResourceDependency, ConfigurableIOManagerFactory
4from dagster_aws.s3.resources import ResourceWithS3Configuration
5from pydantic import Field
6from upath import UPath
7
8
9class S3UPathIOManagerFactory(ConfigurableIOManagerFactory, ABC):
10    s3_bucket: str = Field(description="The S3 bucket to use for the IO manager.")
11    s3_prefix: str = Field(description="The S3 prefix to use for the IO manager.")
12    s3_configuration: ResourceDependency[ResourceWithS3Configuration]
13
14    @abstractmethod
15    def io_manager_class(self):
16        raise NotImplementedError
17
18    def create_io_manager(self, context):
19        return self.io_manager_class()(
20            base_path=UPath(
21                f"s3://{self.s3_bucket}/{self.s3_prefix}",
22                key=self.s3_configuration.aws_access_key_id,
23                secret=self.s3_configuration.aws_secret_access_key,
24                client_kwargs={
25                    "endpoint_url": self.s3_configuration.endpoint_url
26                }
27            )
28        )

Nun müssen wir noch eine Klasse erstellen, die von S3UPathIOManagerFactory erbt, welche die Methode io_manager_class implementiert und den spezifischen IOManager-Typ zurückgibt, den wir verwenden wollen:

1import pandas as pd
2from dagster import UPathIOManager, OutputContext, InputContext
3from upath import UPath
4
5from core.resources.io_manager_factories import S3UPathIOManagerFactory
6
7
8class PandasParquetIOManager(UPathIOManager):
9    extension: str = ".pq"
10
11    def dump_to_path(self, context: OutputContext, obj: pd.DataFrame, path: UPath):
12        context.add_output_metadata({
13            "row_count": len(obj)
14        })
15
16        with path.open("wb") as file:
17            obj.to_parquet(file)
18
19    def load_from_path(self, context: InputContext, path: UPath) -> pd.DataFrame:
20        with path.open("rb") as file:
21            return pd.read_parquet(file)
22
23
24class S3PandasParquetIOManager(S3UPathIOManagerFactory):
25    def io_manager_class(self):
26        return PandasParquetIOManager

So können wir pandas Dataframes sowohl lokal als auch in einen S3 storage schreiben. In der Pipeline-Definition können wir dann die S3PandasParquetIOManager-Klasse verwenden, um einen für S3 konfigurierten Pandas Parquet I/O-Manager zu erstellen:

1from dagster import Definitions, EnvVar
2from dagster_aws.s3 import S3Resource
3
4from core.resources.pandas_parquet_io_manager import S3PandasParquetIOManager
5
6s3_resource = S3Resource(
7    aws_access_key_id=EnvVar('MINIO_ACCESS_KEY'),
8    aws_secret_access_key=EnvVar('MINIO_SECRET_KEY'),
9    endpoint_url=EnvVar('MINIO_HOST')
10)
11
12defs = Definitions(
13    # ...
14    resources={
15        "pandas_parquet_io_manager": S3PandasParquetIOManager(
16            s3_bucket="dagster",
17            s3_prefix="pandas_parquet_io_manager/",
18            s3_configuration=s3_resource
19        )
20    }
21)

Weitere I/O Manager

Dagster bietet eine Vielzahl von I/O-Managern, die speziell für verschiedene Dateiformate und Speicherorte entwickelt wurden. Die vollständige Liste der I/O-Manager, die in Dagster verfügbar sind, findest du hier

Hier sind einige Beispiel-I/O-Manager Ideen, die wir für verschiedene Anwendungsfälle verwenden könnten:

NamePython DatentypOutputAnwendung
PandasCSVIOManagerpd.DataFrame.csv DateiKleinere Datenexporte
PandasParquetIOManagerpd.DataFrame.pq DateiGrößere Dataframes
YamlIOManagerdict, list, tuple, str, etc..yaml DateiKonfigurationen
PostgresIOManagerpd.DataFrame, dict, list, etc.Postgres-TabelleDatenbank-Import/Export
TorchScriptIOManagertorch.nn.Module.pt DateiPyTorch Modelle

Die Auswahl des richtigen I/O-Managers hängt von den spezifischen Anforderungen und dem Anwendungsfall ab und kann je nach Projekt variieren.

Wie verwende ich Assets in mehreren Pipelines?

In unserem Projekt haben wir mehrere Jobs geschrieben, die sich zum Teil sehr geähnelt haben. Hier hat es sich angeboten, Assets zwischen Dagster Jobs zu teilen. So wollten wir unseren code sauber und non-repetitiv halten. Beispielsweise könnte ein predictions-Asset in mehreren Jobs z.B. Inferenz und Evaluation, verwendet werden, um Vorhersagen mit einem ML-Modell zu treffen.

In solchen Fällen können wir das Asset in einem separaten Ordner für Assets, die in mehreren Jobs verwendet werden sollen, anlegen.

Die Struktur könnte so aussehen:

core/assets/shared/<name_of_shared_asset>.py

Wenn wir beispielsweise ein predictions-Asset haben, könnten wir es in einer Datei predict.py im Ordner core/assets/shared ablegen. In dieser Datei können wir die predict-Funktion definieren und gegebenenfalls eine Konfiguration für das Asset bereitstellen. Das Asset wird nicht mit einem @asset-Decorator deklariert, sondern als normale Funktion, da es sonst zu Konflikten mit den Assets in den Jobs kommen kann. Denn ein Asset kann immer nur eine Konfiguration haben.

So könnte die predict.py Datei aussehen:

1from dagster import AssetExecutionContext, Config
2from pydantic import Field
3
4
5class PredictConfig(Config):
6    threshold: float = Field(
7        description="Predictions above the threshold are positive predictions. Predictions below the threshold are negative predictions.",
8        default_value=0.5
9    )
10
11
12def predict(context: AssetExecutionContext, config: PredictConfig):
13    pass

In den Jobs, in denen wir die predict-Funktion verwenden möchten, können wir sie einfach importieren und aufrufen:

1from dagster import AssetExecutionContext, asset, AssetIn
2
3from core.assets.shared import predict, PredictConfig
4
5
6class EvaluationPredictConfig(PredictConfig):
7    threshold: float = 0.8
8
9
10@asset(
11    name="evaluation_predictions",
12    ins={
13        "trained_model": AssetIn("evaluation_trained_model"),
14        "prediction_data": AssetIn("batched_evaluation_data")
15    }
16)
17def evaluation_predictions(context: AssetExecutionContext, config: EvaluationPredictConfig):
18    return predict(context, config)

In Fällen, in denen unser Asset keine Konfiguration benötigt, können wir es einfach importieren und verwenden. Dazu importieren wir die Funktion und markieren sie als Asset:

1from dagster import asset, AssetIn
2
3
4def with_different_value_no_config(x):
5    pass
6
7
8asset(
9    ins={"input": AssetIn()},
10)(with_different_value_no_config)

Wie können wir unsere Jobs testen?

Um dafür zu sorgen, dass unsere Jobs sicher und langfristig wartbar laufen, sind Tests essenziell. Dafür testen wir auf mehreren Ebenen:

Mit Unit-Tests validieren wir

  1. die Geschäftslogik, die die Assets der Jobs herstellt.
  2. die Helfer-Funktionen, die in der Geschäftslogik verwendet werden.
  3. die Methoden der IOManager, die wir geschrieben haben.

In dieser Art und Weise können wir beispielsweise die Funktion testen, die für das Training des Modells verantwortlich ist.

1import pandas as pd
2from dagster import build_asset_context
3from sklearn.linear_model import LinearRegression
4from sklearn.utils.validation import check_is_fitted
5
6from core.assets.training.trained_model import train_model
7
8
9def test_train_model():
10    with build_asset_context() as context:
11        train_data = pd.DataFrame({
12            "Feature": [1, 2, 3, 4, 5],
13            "Target": [2, 4, 6, 8, 10]
14        })
15        test_data = pd.DataFrame({
16            "Feature": [6, 7, 8],
17            "Target": [12, 14, 16]
18        })
19
20        model = train_model(context, train_data, test_data)
21        assert isinstance(model, LinearRegression)
22        check_is_fitted(model)

Im nächsten Schritt testen wir die Integration von mehreren Pipeline-Komponenten im Verbund. Zuerst können wir hierzu testen, ob die Dagster Definitionen, also das Mappen von Assets und Ressourcen in Jobs funktionieren. Dafür legen wir eine test_definitions.py an:

1from core.definitions import defs
2
3
4def test_defs_can_load():
5    # sanity check
6    loaded_jobs = defs.get_all_job_defs()
7    expected_job_names = [
8        "__ASSET_JOB",  # internal dagster job. always present.
9        "training_job",
10        "preprocessing_job"
11    ]
12
13    for job in loaded_jobs:
14        assert job.name in expected_job_names
15        expected_job_names.remove(job.name)
16
17    # No need to check if resources are loaded, as dagster will throw an error if they are not.

Mit diesem Test stellen wir sicher, dass alle Komponenten von Dagster ineinander greifen ohne die Dagster UI hochfahren zu müssen. Hier wird überprüft, ob alle Jobs identifiziert wurden und ob die Dagster Ressourcen richtig in den Definitionen angegeben wurden.

Zuletzt testen wir, ob wir unsere Jobs tatsächlich ausführen können, indem wir eine test_jobs.py anlegen. Hierfür führen wir die Pipelines in-memory durch. Das heißt, kein unnötiger Datenmüll wird produziert. Außerhalb des Testens greift der training Job auf Assets zu, die im preprocessing Job erstellt werden. Im Test-Szenario wollen wir dies auch abbilden. So ist das Test-Szenario sehr nahe am eigentlichen Betrieb und wir können auf den Mehraufwand verzichten, Testdaten oder Testumgebungen manuell vorbereiten zu müssen. Deshalb rufen wir sowohl in test_preprocessing als auch in test_training die DataGenerator Ressource auf und in test_training zudem den preprocessing Job.

1from dagster import load_assets_from_package_module, materialize_to_memory
2
3from core.resources.data_generator import DataGenerator
4
5
6def test_training():
7    from core.assets import preprocessing, training
8
9    result = materialize_to_memory(
10        resources={
11            "data_generator": DataGenerator(),
12        },
13        assets=[
14            *load_assets_from_package_module(preprocessing, "preprocessing"),
15            *load_assets_from_package_module(training, "training"),
16        ],
17    )
18
19    assert result.success

Das Testing ist sehr elegant mit Dagster, da es uns erlaubt alle Ebenen des Testing nahtlos lokal und in allen Staging Umgebungen auszuführen.

Fazit

In diesem Artikel haben wir uns mit den verschiedenen Methoden beschäftigt, wie wir Daten in einem Dagster Job lesen und schreiben können. Wir haben gesehen, dass Dagster verschiedene Konzepte wie Ressourcen und I/O Manager bietet, um die Business-Logik von anderen Services und I/O Operationen zu trennen.

Die Verwendung von eigenen I/O Managern ist eine der besten Methoden, um das Lesen und Schreiben von Daten in einem Job zu handhaben. I/O Manager kapseln die Logik zum Lesen und Schreiben von Daten und ermöglichen es uns, uns auf die Business-Logik zu konzentrieren. Mit der Verwendung von UPathIOManager können wir Dateien sowohl von/zu lokalen Dateisystemen als auch von Remote-Dateisystemen lesen und schreiben, ohne separate Implementierungen schreiben und pflegen zu müssen. Wir können so einen "Write once, use anywhere" Ansatz verfolgen.

Wir haben gelernt, dass Dagster alle Outputs standardmäßig als Pickle-Dateien im lokalen Dateisystem speichert und lädt. Pickle ist eine einfache und schnelle Möglichkeit, Python-Objekte zu serialisieren und zu deserialisieren, aber aufgrund seiner Sicherheitsrisiken und seiner mangelnden Portabilität nicht für alle Anwendungsfälle geeignet.

Auch haben wir gesehen, dass es möglich ist, Assets in mehreren Jobs zu verwenden, indem wir sie in einem separaten Ordner für gemeinsam genutzte Assets ablegen und sie in den Jobs importieren und verwenden.

Zudem haben wir gesehen, dass es wichtig ist, unsere Pipelines zu testen, um sicherzustellen, dass sie robust und wartbar sind. Wir haben verschiedene Teststrategien für das Testen von Dagster Jobs vorgestellt, die von Unit-Tests bis hin zu Integrationstests für das Testen von Jobs und Definitionen reichen.

Auf GitHub haben wir ein Beispielprojekt erstellt, in dem wir die in diesem Artikel vorgestellten Konzepte implementiert haben. Wir hoffen, dass dieser Artikel dir geholfen hat, ein besseres Verständnis für die Verwendung von Dagster in deinen Projekten zu entwickeln und dir dabei hilft, saubere Pipelines zu schreiben.

Beitrag teilen

//

Weitere Artikel in diesem Themenbereich

Entdecke spannende weiterführende Themen und lass dich von der codecentric Welt inspirieren.

//

Gemeinsam bessere Projekte umsetzen.

Wir helfen deinem Unternehmen.

Du stehst vor einer großen IT-Herausforderung? Wir sorgen für eine maßgeschneiderte Unterstützung. Informiere dich jetzt.

Hilf uns, noch besser zu werden.

Wir sind immer auf der Suche nach neuen Talenten. Auch für dich ist die passende Stelle dabei.