Přeskočit na obsah
_CORE
AI & Agentic Systems Core Informační Systémy Cloud & Platform Engineering Data Platforma & Integrace Security & Compliance QA, Testing & Observability IoT, Automatizace & Robotika Mobile & Digital Banky & Finance Pojišťovnictví Veřejná správa Obrana & Bezpečnost Zdravotnictví Energetika & Utility Telco & Média Průmysl & Výroba Logistika & E-commerce Retail & Loyalty
Reference Technologie Blog Knowledge Base O nás Spolupráce Kariéra
Pojďme to probrat

Dagster — moderní orchestrace s asset-based přístupem

01. 01. 2024 1 min čtení intermediate

Dagster staví na konceptu software-defined assets. Místo tasků popisujete datové assety a Dagster automaticky odvozuje pipeline.

Proč Dagster

Dagster se zaměřuje na assety (co vytvořit), ne operace (co provést).

Software-Defined Assets

from dagster import asset
import pandas as pd

@asset(group_name="raw")
def raw_orders() -> pd.DataFrame:
    return pd.read_sql("SELECT * FROM orders", conn)

@asset(group_name="staging")
def clean_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    df = raw_orders.copy()
    df['total_czk'] = df['total_eur'] * 25.2
    return df.dropna(subset=['customer_id'])

@asset(group_name="marts")
def daily_revenue(clean_orders: pd.DataFrame) -> pd.DataFrame:
    return clean_orders.groupby('order_date').agg(
        revenue=('total_czk', 'sum'), orders=('order_id', 'count')
    ).reset_index()

Asset checks

from dagster import asset_check, AssetCheckResult

@asset_check(asset=clean_orders)
def no_negative_amounts(clean_orders):
    neg = clean_orders[clean_orders['total_czk'] < 0]
    return AssetCheckResult(passed=len(neg) == 0)

Shrnutí

Dagster je ideální pro asset-oriented datové platformy. Software-defined assets, vestavěné testy a partitioning.

dagsterorchestracedata assetspipeline