Enterprise Data Hub for Leasing Companies: From Isolated Data Silos to an Integrated Data Platform
The Challenge: Data Silos in the Leasing Industry
Leasing and financing companies face a particular challenge: their business processes generate data across numerous systems, many of which have grown historically. A typical mid-sized leasing company with 500 million euros in leasing volume juggles:
- Core banking system for contract management and accounting (often legacy systems such as SAP Banking or proprietary software)
- CRM system for sales and customer relationships (Salesforce, Microsoft Dynamics)
- Document management system for contracts, policies, and correspondence
- Risk management tools for credit checks and portfolio analyses
- Payment processing systems for SEPA direct debits and payment handling
- Fleet management systems (for vehicle leasing)
- Asset management systems for tracking capital goods
- Partner portals for dealers and sales partners
Each of these systems produces valuable data, but in isolated silos, in different formats, with inconsistent definitions, and without central governance. The consequences are familiar and painful:
Business Impact:
- Decisions without the full picture: risk managers see credit data, but not the current payment history from the payment system
- Reporting chaos: monthly management reports require manual Excel consolidation from 8+ systems
- Missed opportunities: cross-selling potential goes undetected because sales and contract data are not linked
- Compliance risks: IFRS 16 reporting, BaFin filings, and GDPR requests take weeks instead of hours
- Inefficiency: data analysts spend 70% of their time preparing data instead of analyzing it
Technical Debt:
- Point-to-point integrations lead to unmaintainable integration spaghetti
- Duplicates and inconsistencies: is "Müller GmbH" the same as "Mueller GmbH" and "Müller Holding"?
- No single source of truth for master data
- Missing historization: "What was the contract status 6 months ago?" cannot be answered
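The duplicate question above can be made concrete with a small sketch. The function name `match_key` and the normalization rules are illustrative assumptions, not part of any system described here; real entity resolution would typically add fuzzy matching and manual review.

```python
import re

# Transliterate German umlauts so that "Müller" and "Mueller" compare equal.
_UMLAUTS = str.maketrans({"ä": "ae", "ö": "oe", "ü": "ue", "ß": "ss"})


def match_key(company_name: str) -> str:
    """Build a simple comparison key for a company name (hypothetical helper)."""
    lowered = company_name.casefold().translate(_UMLAUTS)
    # Collapse punctuation and repeated whitespace into single spaces.
    return re.sub(r"[^a-z0-9]+", " ", lowered).strip()


names = ["Müller GmbH", "Mueller GmbH", "Müller Holding"]
keys = {name: match_key(name) for name in names}
```

With this key, "Müller GmbH" and "Mueller GmbH" collapse to the same value, while "Müller Holding" stays distinct and would have to be resolved through account hierarchies or manual review.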
The solution: a modern enterprise data hub that acts as the central data platform, consolidating, standardizing, and enriching all relevant data and serving it to different consumers (BI tools, data science, APIs).
This article documents how such a data hub is built for a fictional but realistic leasing company, "FinanceLease AG", with 800 employees, 2 billion euros in leasing volume, 45,000 active contracts, and business lines in vehicle leasing, equipment leasing, and hire purchase (Mietkauf). The architecture, technologies, and patterns described here carry over directly to real projects.
Data Sources: The Heterogeneous Source System Landscape
Overview of Data Sources at FinanceLease AG
A data hub is only as good as its ability to integrate data from a wide variety of sources. At FinanceLease AG, we identified the following source systems:
1. Core Banking System (SAP Banking / Finnova)
Type: On-premise SQL Server database (legacy)
Data volume: 500 GB, 200 million rows across all tables
Critical data:
- Contract details (contract number, term, installments, conditions)
- Customer master data (company data, addresses, contact persons)
- Booking data (incoming payments, receivables, dunning notices)
- Asset information (vehicles, machines, equipment details)
- Insurance data
Technical characteristics:
- No native API, only direct database access is possible
- Complex, denormalized schema (more than 1,200 tables)
- No change data capture (CDC), only timestamps for updates
- Business-critical: 24/7 operation, read access must not affect performance
Extraction strategy:
-- Incremental load example: contracts created or changed since the last load
SELECT
    VertragID,
    KundenID,
    Vertragsnummer,
    Leasingobjekt,
    Leasingrate,
    Laufzeit,
    Startdatum,
    Enddatum,
    Status,
    LastModifiedDate
FROM Vertraege
WHERE LastModifiedDate > @LastExtractTimestamp
   OR CreatedDate > @LastExtractTimestamp
Challenges:
- Legacy encoding (ISO-8859-1 instead of UTF-8), which causes problems with umlauts
- Inconsistent deletion logic: soft deletes are not implemented uniformly
- No historization: contract changes overwrite the old values
- Performance: full table scans where indexes are missing
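The encoding challenge can be illustrated with a minimal sketch. It assumes the extractor receives raw bytes from the legacy database; the helper name `to_utf8` is hypothetical.

```python
def to_utf8(raw: bytes, source_encoding: str = "iso-8859-1") -> bytes:
    """Re-encode bytes from the legacy encoding to UTF-8 (hypothetical helper)."""
    return raw.decode(source_encoding).encode("utf-8")


# "Müller GmbH" as it would be stored in an ISO-8859-1 column.
legacy_bytes = "Müller GmbH".encode("iso-8859-1")

# Interpreting the legacy bytes as UTF-8 fails, because 0xFC ("ü" in
# ISO-8859-1) is not a valid UTF-8 byte.
try:
    legacy_bytes.decode("utf-8")
    misread_ok = True
except UnicodeDecodeError:
    misread_ok = False

clean = to_utf8(legacy_bytes).decode("utf-8")
```

Decoding explicitly at the ingestion boundary keeps everything downstream in UTF-8 and avoids the garbled umlauts that appear when the source encoding is guessed.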
2. CRM System (Salesforce)
Type: Cloud SaaS, REST API
Data volume: 120,000 leads, 35,000 opportunities, 50,000 accounts
Critical data:
- Sales pipeline (opportunities, stages, forecast)
- Lead data and conversion tracking
- Account hierarchies (corporate group structures)
- Activities (calls, meetings, e-mails)
- Custom objects for leasing-specific workflows
Technical characteristics:
- Salesforce REST API v58.0
- Bulk API for large data volumes (> 10,000 records)
- API limits: 100,000 calls/24h (quickly becomes critical for large extracts)
- Webhook support for real-time notifications
Extraction strategy:
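# Salesforce Bulk API extraction
# Note: SalesforceBulk comes from the separate salesforce_bulk package,
# not from simple_salesforce.
import json
import time

from simple_salesforce import Salesforce
from salesforce_bulk import SalesforceBulk
from salesforce_bulk.util import IteratorBytesIO

sf = Salesforce(username=SF_USER, password=SF_PASS, security_token=SF_TOKEN)
bulk = SalesforceBulk(sessionId=sf.session_id, host=sf.sf_instance)

# Bulk query for large data volumes.
# "= LAST_N_DAYS:1" selects records modified within that date range.
query = """
    SELECT Id, Name, Amount, StageName, CloseDate, LeaseType__c,
           ContractTerm__c, LastModifiedDate
    FROM Opportunity
    WHERE LastModifiedDate = LAST_N_DAYS:1
"""
job = bulk.create_query_job("Opportunity", contentType='JSON')
batch = bulk.query(job, query)
bulk.close_job(job)

# Wait until Salesforce has finished processing the batch
while not bulk.is_batch_done(batch):
    time.sleep(10)

# Fetch the results: each result is a stream of JSON-encoded records
opportunities = []
for result in bulk.get_all_results_for_query_batch(batch):
    opportunities.extend(json.load(IteratorBytesIO(result)))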
Challenges:
- API rate limiting requires intelligent batch control
- Custom fields follow the naming convention FieldName__c
- Lookup relationships must be resolved (e.g. account hierarchies)
- Time zone handling: Salesforce uses UTC, core banking uses CET
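One way to think about batch control under the daily call limit is a simple budget that every extract has to reserve from. The sketch below is an illustration only: the class `DailyCallBudget`, the reserve of 20,000 calls, and the page size of 2,000 records per call are assumptions, not Salesforce settings taken from this project.

```python
import math


def calls_needed(record_count: int, records_per_call: int) -> int:
    """Number of API calls required to page through record_count records."""
    return math.ceil(record_count / records_per_call)


class DailyCallBudget:
    """Tracks how many API calls extracts may still use today (hypothetical)."""

    def __init__(self, daily_limit: int = 100_000, reserve: int = 20_000):
        # Keep a reserve for interactive users and other integrations.
        self.remaining = daily_limit - reserve

    def try_reserve(self, calls: int) -> bool:
        """Reserve calls for an extract; refuse if the budget would be exceeded."""
        if calls > self.remaining:
            return False
        self.remaining -= calls
        return True


budget = DailyCallBudget()
lead_calls = calls_needed(120_000, records_per_call=2_000)
can_run_lead_extract = budget.try_reserve(lead_calls)
```

An orchestrator could check `try_reserve` before starting each extract and defer low-priority jobs to the next day when the budget runs out.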
3. Payment Processing System (SEPA Gateway)
Type: REST API + SFTP for batch files
Data volume: 1.5 million transactions/year
Critical data:
- SEPA direct debits (mandates, status, rejection reasons)
- Incoming and outgoing payments
- Bank connection status
- Return debit information
Technical characteristics:
- Real-time API for transaction status
- Daily SFTP upload: PAIN.008 XML (SEPA direct debit submission)
- Daily SFTP download: CAMT.053 XML (account statements)
- Webhook for payment status changes
Extraction strategy:
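// CAMT.053 XML parsing (ISO 20022 standard)
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Xml.Linq;

public class CAMT053Parser
{
    // PaymentTransaction and MapBookingStatus are not shown in this excerpt.
    public List<PaymentTransaction> ParseBankStatement(string xmlContent)
    {
        var doc = XDocument.Parse(xmlContent);
        XNamespace ns = "urn:iso:std:iso:20022:tech:xsd:camt.053.001.02";

        var transactions = doc.Descendants(ns + "Ntry")
            .Select(entry => new PaymentTransaction
            {
                // Parse with the invariant culture: CAMT uses ISO dates and "." as
                // the decimal separator, regardless of the server's locale.
                BookingDate = DateTime.Parse(
                    entry.Element(ns + "BookgDt")?.Element(ns + "Dt")?.Value
                        ?? throw new FormatException("Ntry without BookgDt/Dt"),
                    CultureInfo.InvariantCulture),
                Amount = decimal.Parse(
                    entry.Element(ns + "Amt")?.Value
                        ?? throw new FormatException("Ntry without Amt"),
                    CultureInfo.InvariantCulture),
                Currency = entry.Element(ns + "Amt")?.Attribute("Ccy")?.Value,
                DebtorName = entry.Descendants(ns + "Dbtr")
                    .FirstOrDefault()?.Element(ns + "Nm")?.Value,
                DebtorIBAN = entry.Descendants(ns + "DbtrAcct")
                    .FirstOrDefault()?.Element(ns + "Id")
                    ?.Element(ns + "IBAN")?.Value,
                Reference = entry.Descendants(ns + "RmtInf")
                    .FirstOrDefault()?.Element(ns + "Ustrd")?.Value,
                Status = MapBookingStatus(entry.Element(ns + "Sts")?.Value)
            })
            .ToList();

        return transactions;
    }
}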
Challenges:
- XML parsing of ISO 20022 formats (complex, deeply nested)
- Reconciliation: matching SEPA direct debits to contracts via the payment reference
- Late rejections: returned debits can arrive up to 8 weeks after collection
- Duplicate detection: the same files may be downloaded more than once
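Duplicate detection for repeated file downloads can be sketched with a content hash. This is a minimal in-memory illustration; the class name `FileDeduplicator` is hypothetical, and a real pipeline would persist the digests (for example in a control table) so they survive restarts.

```python
import hashlib


class FileDeduplicator:
    """Remembers SHA-256 digests of files that were already processed."""

    def __init__(self) -> None:
        self._seen_digests = set()

    def is_new(self, content: bytes) -> bool:
        """Return True the first time this exact content is seen, else False."""
        digest = hashlib.sha256(content).hexdigest()
        if digest in self._seen_digests:
            return False
        self._seen_digests.add(digest)
        return True


dedup = FileDeduplicator()
statement = b"<Document>...CAMT.053 payload...</Document>"

first_download = dedup.is_new(statement)   # processed
second_download = dedup.is_new(statement)  # skipped as duplicate
```

Hashing the file content rather than the file name also catches the case where the bank re-delivers the same statement under a different name.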
4. Fleet Management System (in-house system)
Type: PostgreSQL database + REST API
Data volume: 15,000 vehicles, 2 million telematics events/day
Critical data:
- Vehicle master data (make, model, VIN, first registration)
- Location data (GPS tracking)
- Mileage readings
- Maintenance and repair history
- Damage reports
Technical characteristics:
- PostgreSQL 15 with the PostGIS extension (geodata)
- REST API for CRUD operations
- Event stream via Apache Kafka for telematics data (real time)
- S3 bucket for vehicle images and damage photos
Extraction strategy:
# Kafka consumer for the telematics stream
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'vehicle-telemetry',
    bootstrap_servers=['kafka-broker:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='data-hub-consumer'
)

for message in consumer:
    telemetry_event = message.value
    # {
    #     "vehicle_id": "VIN12345",
    #     "timestamp": "2025-01-15T10:30:00Z",
    #     "latitude": 52.520008,
    #     "longitude": 13.404954,
    #     "mileage": 45230,
    #     "fuel_level": 45.5,
    #     "battery_voltage": 12.6
    # }
    # Stream to the data hub (e.g. Azure Event Hub or a Kafka topic);
    # send_to_data_hub is a placeholder that is not shown here.
    send_to_data_hub(telemetry_event)
Challenges:
- High event frequency: 2 million events/day is about 23 events/second on average
- Geodata requires special handling (not standard SQL)
- Image data: storing references vs. integrating blob storage
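The throughput figure above follows from simple arithmetic, shown here as a small sketch. The peak factor is a purely hypothetical assumption for capacity planning; the article gives only the daily average.

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400


def average_events_per_second(events_per_day: int) -> float:
    """Average event rate implied by a daily event count."""
    return events_per_day / SECONDS_PER_DAY


avg_rate = average_events_per_second(2_000_000)

# Hypothetical assumption: traffic peaks at five times the daily average,
# for example during commuting hours.
ASSUMED_PEAK_FACTOR = 5
peak_rate = avg_rate * ASSUMED_PEAK_FACTOR
```

The average is what the article quotes; sizing the ingestion layer should be based on the expected peak, which has to be measured rather than assumed.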
5. External Data Sources
Credit data (SCHUFA API):
- REST API, rate limit: 1,000 calls/hour
- On-demand queries for new contracts
- Historical scores are persisted in the data hub
Vehicle valuations (DAT/Schwacke API):
- REST API for residual value forecasts
- Batch updates: monthly revaluation of the entire portfolio
- CSV export option for bulk data
Macroeconomic data (Bundesbank API):
- Interest rates and inflation rates for risk models
- Open data, no authentication
- Time series updated daily
Data Source Overview: Summary
| Source | Type | Volume | Update Frequency | Access Method | Criticality |
|---|---|---|---|---|---|
| Core Banking | On-Prem DB | 500 GB | Real-time | DB Query | Critical |
| CRM (Salesforce) | Cloud API | 50K Accounts | Real-time | REST API | High |
| Payment Processing | Hybrid | 1.5M tx/year | Daily + real-time | SFTP + API | Critical |
| Fleet Management | On-Prem DB | 15K Vehicles | Real-time Stream | Kafka + API | High |
| SCHUFA | External API | On-Demand | On-Demand | REST API | High |
| DAT/Schwacke | External API | Batch | Monthly | REST API | Medium |
| Bundesbank | Open Data | Time series | Daily | REST API | Low |
Key takeaways for the data hub architecture:
- Heterogeneity: a mix of legacy databases, modern cloud APIs, file drops, and event streams
- Latency requirements: from real time (telematics) to batch (valuations)
- Volume variance: from a few KB (macro data) to the GB range (core banking)
- Criticality: payment and core banking are business-critical, so fault tolerance is essential
- Governance: external APIs have rate limits and costs, so an intelligent caching strategy is needed
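The caching point can be sketched as a small time-to-live cache in front of a rate-limited API. Everything here is illustrative: `TTLCache` and `fetch_score` are hypothetical names, the 24-hour TTL is an assumption, and a production setup would more likely use a shared cache than process memory.

```python
import time
from typing import Any, Callable, Dict, Tuple


class TTLCache:
    """Caches fetched values per key for a fixed time-to-live (in seconds)."""

    def __init__(self, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so the expiry logic can be tested
        self._entries: Dict[str, Tuple[float, Any]] = {}

    def get_or_fetch(self, key: str, fetch: Callable[[str], Any]) -> Any:
        """Return the cached value if still fresh, otherwise call fetch(key)."""
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]
        value = fetch(key)
        self._entries[key] = (now, value)
        return value


api_calls = []


def fetch_score(customer_id: str) -> int:
    """Stand-in for a paid, rate-limited credit score request."""
    api_calls.append(customer_id)
    return 640  # made-up score for illustration


cache = TTLCache(ttl_seconds=24 * 3600)
score_first = cache.get_or_fetch("C-1001", fetch_score)
score_second = cache.get_or_fetch("C-1001", fetch_score)  # served from cache
```

In this run only one external call is made for two lookups. The right TTL depends on how fresh each external source has to be for the business process that consumes it.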
Core Data Hub Components: The Architecture
A modern data hub for a leasing company follows a multi-layered architecture that supports several data processing paradigms: batch, streaming, and on-demand processing. The design is based on the medallion architecture (Bronze, Silver, Gold), which has proven itself in data lake scenarios.
High-Level Architecture Overview
+-- DATA SOURCES ------------------------------------------------------+
|  Core Banking | Salesforce | Payment | Fleet Mgmt | External APIs
+----------------------------------------------------------------------+
                                   |
                                   v
+-- INGESTION LAYER ---------------------------------------------------+
|  Azure Data Factory (Batch/CDC)
|  Azure Event Hub (Streaming)
|  Azure Functions (API Pulls)
+----------------------------------------------------------------------+
                                   |
                                   v
+-- BRONZE LAYER (RAW DATA) -------------------------------------------+
|  Azure Data Lake Gen2 / Delta Lake
|  • Raw data, as-is from sources
|  • Partitioned by source_system / ingestion_date
|  • Immutable, append-only
+----------------------------------------------------------------------+
                                   |
                                   v
+-- PROCESSING LAYER (ETL/ELT) ----------------------------------------+
|  Azure Synapse Pipelines
|  Databricks Spark (Complex)
|  Azure Stream Analytics
+----------------------------------------------------------------------+
                                   |
                                   v
+-- SILVER LAYER (CLEANED & VALIDATED) --------------------------------+
|  Azure Data Lake Gen2 / Delta Lake
|  • Cleaned, validated, standardized
|  • Schema enforcement
|  • De-duplication, data quality checks
+----------------------------------------------------------------------+
                                   |
                                   v
+-- GOLD LAYER (BUSINESS-LEVEL AGGREGATES) ----------------------------+
|  Azure Synapse Dedicated SQL Pool / Delta Lake
|  • Business entities (Customer 360°, Contract View)
|  • Pre-aggregated KPIs
|  • Dimensional models (Star Schema)
+----------------------------------------------------------------------+
                                   |
                                   v
+-- CONSUMPTION LAYER -------------------------------------------------+
|  Power BI Reports | Azure ML Models | REST API (Apps) | External Tools
+----------------------------------------------------------------------+
Cross-Cutting Concerns:
• Azure Purview (Data Governance, Catalog)
• Azure Key Vault (Secrets Management)
• Azure Monitor + App Insights (Monitoring)
• Azure Active Directory (Authentication/Authorization)
Component 1: Ingestion Layer
Technology stack:
- Azure Data Factory for batch ingestion (databases, APIs with bulk export)
- Azure Event Hub for streaming ingestion (Kafka-compatible, telematics data)
- Azure Functions for on-demand API calls (e.g. SCHUFA when a contract is signed)
Azure Data Factory pipeline example:
{
  "name": "IngestCoreBankingContracts",
  "properties": {
    "activities": [
      {
        "name": "CopyContractsIncremental",
        "type": "Copy",
        "inputs": [
          {
            "referenceName": "SourceSQLServer",
            "type": "DatasetReference",
            "parameters": {
              "tableName": "Vertraege",
              "watermarkColumn": "LastModifiedDate",
              "watermarkValue": "@pipeline().parameters.LastWatermark"
            }
          }
        ],
        "outputs": [
          {
            "referenceName": "BronzeDataLake",
            "type": "DatasetReference",
            "parameters": {
              "folderPath": "bronze/core_banking/contracts/@{formatDateTime(utcnow(),'yyyy/MM/dd')}",
              "fileName": "contracts_@{formatDateTime(utcnow(),'yyyyMMdd_HHmmss')}.parquet"
            }
          }
        ],
        "typeProperties": {
          "source": {
            "type": "SqlServerSource",
            "sqlReaderQuery": {
              "value": "SELECT * FROM Vertraege WHERE LastModifiedDate > '@{pipeline().parameters.LastWatermark}'",
              "type": "Expression"
            }
          },
          "sink": {
            "type": "ParquetSink",
            "storeSettings": {
              "type": "AzureBlobFSWriteSettings"
            },
            "formatSettings": {
              "type": "ParquetWriteSettings",
              "compressionCodec": "snappy"
            }
          },
          "enableStaging": false,
          "translator": {
            "type": "TabularTranslator",
            "mappings": [
              {
                "source": { "name": "VertragID" },
                "sink": { "name": "contract_id", "type": "Int64" }
              },
              {
                "source": { "name": "Vertragsnummer" },
                "sink": { "name": "contract_number", "type": "String" }
              }
            ]
          }
        }
      },
      {
        "name": "UpdateWatermark",
        "type": "SqlServerStoredProcedure",
        "dependsOn": [
          {
            "activity": "CopyContractsIncremental",
            "dependencyConditions": [ "Succeeded" ]
          }
        ],
        "typeProperties": {
          "storedProcedureName": "usp_UpdateWatermark",
          "storedProcedureParameters": {
            "TableName": { "value": "Vertraege" },
            "WatermarkValue": { "value": "@{utcnow()}" }
          }
        }
      }
    ],
    "parameters": {
      "LastWatermark": {
        "type": "String",
        "defaultValue": "1900-01-01T00:00:00Z"
      }
    }
  }
}
Design Principles:
- Idempotency: running the same pipeline again with the same parameters produces identical results
- Watermarking: incremental loads based on timestamps avoid full table scans
- Partitioning: data is partitioned by date for efficient queries and retention management
- Fault tolerance: on failure the watermark is not updated, so the next run picks up the missing data
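The interplay of watermarking and fault tolerance can be sketched in a few lines of plain Python. This is an illustration under stated assumptions, not the Data Factory implementation: `incremental_load`, the row layout, and the `sink` callable are hypothetical. Unlike the pipeline shown earlier, the sketch advances the watermark to the highest timestamp actually copied rather than to the current wall-clock time, which is one way to avoid skipping rows that change while a copy is running.

```python
from datetime import datetime
from typing import Callable, Dict, List


def incremental_load(rows: List[Dict], watermark: datetime,
                     sink: Callable[[List[Dict]], None]) -> datetime:
    """Copy rows changed after the watermark and return the new watermark.

    If sink raises, the exception propagates and the caller keeps the old
    watermark, so the next run retries the same rows.
    """
    changed = [row for row in rows if row["last_modified"] > watermark]
    if not changed:
        return watermark
    sink(changed)
    return max(row["last_modified"] for row in changed)


source_rows = [
    {"contract_id": 1, "last_modified": datetime(2025, 1, 14, 8, 0)},
    {"contract_id": 2, "last_modified": datetime(2025, 1, 15, 9, 30)},
]
bronze: List[Dict] = []

watermark = datetime(2025, 1, 15)
watermark = incremental_load(source_rows, watermark, bronze.extend)
# A second run with unchanged source data copies nothing.
watermark_after_rerun = incremental_load(source_rows, watermark, bronze.extend)
```

Because the watermark only moves after a successful write, a failed run is repeated in full by the next run.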
Component 2: Bronze Layer (Raw Data)
Technology: Azure Data Lake Storage Gen2 with the Delta Lake format
Characteristics:
- Immutable: data is never modified, only added (append-only)
- Schema-on-read: flexible schema, no enforcement
- Complete history: all ingested data is retained (subject to the retention policy)
- Partitioning: by source_system and ingestion_date
Example structure:
bronze/
├── core_banking/
│   ├── contracts/
│   │   └── year=2025/
│   │       └── month=01/
│   │           └── day=15/
│   │               ├── contracts_20250115_080000.parquet
│   │               └── contracts_20250115_200000.parquet
│   ├── customers/
│   └── payments/
├── salesforce/
│   ├── opportunities/
│   ├── accounts/
│   └── leads/
└── fleet_management/
    ├── vehicles/
    └── telemetry/   (partitioned by hour for high-frequency data)
Delta Lake example:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, current_timestamp, dayofmonth, from_json, hour, month, year,
)
from pyspark.sql.types import (
    DoubleType, IntegerType, StringType, StructField, StructType, TimestampType,
)

spark = (
    SparkSession.builder
    .appName("BronzeIngestion")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Read from the source (e.g. Kafka) as a batch
raw_df = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-broker:9092")
    .option("subscribe", "vehicle-telemetry")
    .load()
)

# Schema definition (optional, used for validation)
schema = StructType([
    StructField("vehicle_id", StringType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("mileage", IntegerType(), True),
    StructField("fuel_level", DoubleType(), True),
])

# Parse the JSON payload and add the partition columns used below
telemetry_df = (
    raw_df.select(
        from_json(col("value").cast("string"), schema).alias("data"),
        col("timestamp").alias("kafka_timestamp"),
    )
    .select("data.*", "kafka_timestamp")
    .withColumn("ingestion_timestamp", current_timestamp())
    .withColumn("year", year("ingestion_timestamp"))
    .withColumn("month", month("ingestion_timestamp"))
    .withColumn("day", dayofmonth("ingestion_timestamp"))
    .withColumn("hour", hour("ingestion_timestamp"))
)

# Write to Bronze (Delta Lake)
(
    telemetry_df.write
    .format("delta")
    .mode("append")
    .partitionBy("year", "month", "day", "hour")
    .save("abfss://bronze@datalake.dfs.core.windows.net/fleet_management/telemetry")
)
Component 3: Silver Layer (Cleaned & Validated)
Technology: Delta Lake + Azure Databricks for complex transformations
Transformations:
Data quality checks
- Null checks for mandatory fields
- Range validation (e.g. lease rate > 0)
- Referential integrity (e.g. does the customer ID exist in the customer master?)
Standardization
- Uniform date formats (ISO 8601)
- Currency conversion (all amounts in EUR)
- Address normalization
De-duplication
- Window functions for duplicate detection
- Merge logic for "same contract, multiple sources"
Enrichment
- Lookups against reference data
- Calculated fields (e.g. "days until contract end")
Example: contract processing in the Silver layer
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# Read Bronze
bronze_contracts = (
    spark.read
    .format("delta")
    .load("abfss://bronze@datalake.dfs.core.windows.net/core_banking/contracts")
)

# Data quality: filter out invalid records
valid_contracts = bronze_contracts.filter(
    F.col("contract_number").isNotNull()
    & (F.col("contract_number") != "")
    & (F.col("lease_rate") > 0)
    & (F.col("start_date") <= F.col("end_date"))
)

# De-duplication: keep the most recent record per contract_number.
# Column names must match the Bronze schema produced by the ingestion mapping.
window_spec = Window.partitionBy("contract_number").orderBy(F.desc("LastModifiedDate"))
deduped_contracts = (
    valid_contracts
    .withColumn("row_num", F.row_number().over(window_spec))
    .filter(F.col("row_num") == 1)
    .drop("row_num")
)

# Standardization
standardized_contracts = (
    deduped_contracts
    .withColumn("start_date", F.to_date("start_date", "yyyy-MM-dd"))
    .withColumn("end_date", F.to_date("end_date", "yyyy-MM-dd"))
    .withColumn("lease_rate", F.round("lease_rate", 2))
    .withColumn("currency", F.lit("EUR"))
)

# Enrichment: calculate derived fields
enriched_contracts = (
    standardized_contracts
    .withColumn("contract_duration_months",
                F.months_between("end_date", "start_date"))
    .withColumn("days_until_end",
                F.datediff("end_date", F.current_date()))
    .withColumn("contract_status",
                F.when(F.col("days_until_end") < 0, "EXPIRED")
                 .when(F.col("days_until_end") <= 90, "ENDING_SOON")
                 .otherwise("ACTIVE"))
)

# Write to Silver (MERGE for upserts, plain write on the first run)
silver_path = "abfss://silver@datalake.dfs.core.windows.net/contracts"
if DeltaTable.isDeltaTable(spark, silver_path):
    silver_table = DeltaTable.forPath(spark, silver_path)
    (
        silver_table.alias("target")
        .merge(enriched_contracts.alias("source"),
               "target.contract_id = source.contract_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    enriched_contracts.write.format("delta").mode("overwrite").save(silver_path)

# Data quality metrics: every count() triggers a Spark job, so run each once
total_bronze = bronze_contracts.count()
valid_count = valid_contracts.count()
deduped_count = deduped_contracts.count()
dq_metrics = {
    "total_bronze_records": total_bronze,
    "valid_records": valid_count,
    "invalid_records": total_bronze - valid_count,
    "duplicates_removed": valid_count - deduped_count,
    # Standardization and enrichment add columns, not rows
    "final_silver_records": deduped_count,
}

# Log metrics
print(f"Data Quality Metrics: {dq_metrics}")
Component 4: Gold Layer (Business-Level)
Technology: Azure Synapse Dedicated SQL Pool + Delta Lake
Characteristics:
- Business entities: business concepts rather than technical tables
- Dimensional modeling: star schema for optimal BI performance
- Pre-aggregated KPIs: for fast dashboard performance
- Historization: slowly changing dimensions (SCD Type 2)
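How SCD Type 2 historization behaves can be sketched on plain Python dictionaries. The function `apply_scd2` and the choice of `CreditRating` as the tracked attribute are assumptions for illustration; the field names follow the DimCustomer table in this section. In the actual Gold layer this logic would run as a MERGE statement or a Spark job.

```python
from datetime import date
from typing import Dict, List, Sequence


def apply_scd2(history: List[Dict], incoming: Dict, effective: date,
               tracked: Sequence[str] = ("CreditRating",)) -> List[Dict]:
    """Apply one incoming customer record to an SCD Type 2 history (in place)."""
    current = next(
        (row for row in history
         if row["CustomerID"] == incoming["CustomerID"] and row["IsCurrent"]),
        None,
    )
    if current is not None:
        if all(current[col] == incoming[col] for col in tracked):
            return history  # nothing changed, keep the current version
        # Close the old version instead of overwriting it.
        current["EffectiveTo"] = effective
        current["IsCurrent"] = False
    history.append({**incoming, "EffectiveFrom": effective,
                    "EffectiveTo": None, "IsCurrent": True})
    return history


dim_customer: List[Dict] = []
apply_scd2(dim_customer, {"CustomerID": "C-1", "CreditRating": "B"}, date(2024, 6, 1))
apply_scd2(dim_customer, {"CustomerID": "C-1", "CreditRating": "C"}, date(2025, 1, 15))
```

After the second call the table holds two versions of customer C-1: the closed "B" rating and the current "C" rating, which is what allows questions about a past state to be answered.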
Example: Customer 360° View
-- Gold Layer: Customer 360° dimensional model
-- Dimension: Customer
CREATE TABLE gold.DimCustomer (
CustomerKey INT IDENTITY(1,1) PRIMARY KEY,
CustomerID VARCHAR(50) NOT NULL,
CustomerNumber VARCHAR(50),
CompanyName NVARCHAR(200),
Industry NVARCHAR(100),
EmployeeCount INT,
AnnualRevenue DECIMAL(18,2),
CreditRating VARCHAR(10),
-- SCD Type 2 fields
EffectiveFrom DATE NOT NULL,
EffectiveTo DATE,
IsCurrent BIT NOT NULL DEFAULT 1,
-- Audit
CreatedDate DATETIME2 DEFAULT GETUTCDATE(),
ModifiedDate DATETIME2 DEFAULT GETUTCDATE()
);
-- Dimension: Contract
CREATE TABLE gold.DimContract (
    ContractKey INT IDENTITY(1,1) PRIMARY KEY,
    ContractID VARCHAR(50) NOT NULL,
    ContractNumber VARCHAR(50) NOT NULL,
    CustomerID VARCHAR(50) NOT NULL, -- business key of the owning customer; required by gold.vw_Customer360
    ContractType NVARCHAR(50), -- 'Leasing', 'Mietkauf'
    AssetType NVARCHAR(50), -- 'Vehicle', 'Equipment'
    AssetDescription NVARCHAR(500),
    LeaseRate DECIMAL(18,2),
    ContractDurationMonths INT,
    StartDate DATE,
    EndDate DATE,
    Status NVARCHAR(50), -- 'ACTIVE', 'ENDING_SOON', 'EXPIRED'
    -- Audit
    CreatedDate DATETIME2 DEFAULT GETUTCDATE(),
    ModifiedDate DATETIME2 DEFAULT GETUTCDATE()
);
-- Fact: Contract Transactions
CREATE TABLE gold.FactContractTransaction (
TransactionKey BIGINT IDENTITY(1,1) PRIMARY KEY,
DateKey INT NOT NULL, -- FK to DimDate
CustomerKey INT NOT NULL, -- FK to DimCustomer
ContractKey INT NOT NULL, -- FK to DimContract
TransactionType NVARCHAR(50), -- 'PAYMENT', 'INVOICE', 'ADJUSTMENT'
Amount DECIMAL(18,2),
Currency CHAR(3),
PaymentMethod NVARCHAR(50),
PaymentStatus NVARCHAR(50),
-- Metrics
DaysOverdue INT,
LateFeeAmount DECIMAL(18,2),
TransactionDate DATE,
CreatedDate DATETIME2 DEFAULT GETUTCDATE()
);
-- Aggregated KPI Table
CREATE TABLE gold.ContractKPIs (
SnapshotDate DATE NOT NULL,
ContractType NVARCHAR(50),
TotalActiveContracts INT,
TotalContractValue DECIMAL(18,2),
AverageLeaseRate DECIMAL(18,2),
PaymentsOnTime INT,
PaymentsOverdue INT,
PaymentSuccessRate DECIMAL(5,2),
AverageContractDuration INT,
ContractsEndingNext90Days INT,
PRIMARY KEY (SnapshotDate, ContractType)
);
View for Customer 360° (a regular view; each source is aggregated separately before joining, so that contract, payment, and opportunity rows do not multiply each other and inflate the sums):
CREATE VIEW gold.vw_Customer360 AS
WITH ContractAgg AS (
    SELECT
        CustomerID,
        COUNT(*) AS TotalContracts,
        SUM(CASE WHEN Status = 'ACTIVE' THEN 1 ELSE 0 END) AS ActiveContracts,
        SUM(LeaseRate) AS TotalMonthlyLeaseAmount
    FROM gold.DimContract
    GROUP BY CustomerID
),
PaymentAgg AS (
    SELECT
        ct.CustomerID,
        SUM(CASE WHEN ft.PaymentStatus = 'PAID_ONTIME' THEN 1 ELSE 0 END) AS PaymentsOnTime,
        SUM(CASE WHEN ft.PaymentStatus = 'OVERDUE' THEN 1 ELSE 0 END) AS PaymentsOverdue,
        SUM(ft.LateFeeAmount) AS TotalLateFees,
        AVG(CAST(ft.DaysOverdue AS FLOAT)) AS AvgDaysOverdue,
        MAX(ft.TransactionDate) AS LastPaymentDate
    FROM gold.FactContractTransaction ft
    JOIN gold.DimContract ct ON ct.ContractKey = ft.ContractKey
    GROUP BY ct.CustomerID
),
OpportunityAgg AS (
    SELECT
        AccountID,
        COUNT(DISTINCT OpportunityID) AS TotalOpportunities,
        SUM(Amount) AS PipelineValue,
        MAX(LastActivityDate) AS LastSalesActivity
    FROM gold.DimOpportunity
    GROUP BY AccountID
)
SELECT
    c.CustomerKey,
    c.CompanyName,
    c.Industry,
    c.CreditRating AS CurrentCreditRating,
    -- Contract metrics
    ca.TotalContracts,
    ca.ActiveContracts,
    ca.TotalMonthlyLeaseAmount,
    -- Payment metrics and risk indicators
    pa.PaymentsOnTime,
    pa.PaymentsOverdue,
    pa.TotalLateFees,
    pa.AvgDaysOverdue,
    -- Salesforce metrics
    oa.TotalOpportunities,
    oa.PipelineValue,
    -- Last activity
    pa.LastPaymentDate,
    oa.LastSalesActivity
FROM gold.DimCustomer c
LEFT JOIN ContractAgg ca ON c.CustomerID = ca.CustomerID
LEFT JOIN PaymentAgg pa ON c.CustomerID = pa.CustomerID
LEFT JOIN OpportunityAgg oa ON c.CustomerID = oa.AccountID
WHERE c.IsCurrent = 1; -- Only the current customer version (SCD Type 2)
Component 5: Metadata & Governance (Azure Purview)
Azure Purview acts as the central data catalog and governance layer:
- Data discovery: a searchable catalog of all datasets in the data hub
- Lineage tracking: "Where does this value come from?", from the source system to the report
- Data classification: automatic detection of PII, financial data, etc.
- Business glossary: central definitions (e.g. "What is a 'lease rate'?")
Example: lineage for "Total Contract Value" in a Power BI report
Power BI Report: "Management Dashboard"
↓ Measures: [Total Contract Value]
↓ Source: Gold.ContractKPIs.TotalContractValue
↓ Calculated in: gold_kpi_aggregation_pipeline
↓ Source: Silver.Contracts.LeaseRate
↓ Transformation: silver_contract_enrichment
↓ Source: Bronze.CoreBanking.Contracts.Leasingrate
↓ Ingestion: adf_core_banking_contracts_pipeline
↓ Original Source: CoreBankingDB.dbo.Vertraege.Leasingrate
This transparency is critical for:
- Compliance: GDPR requests (where is the data of person X stored?)
- Impact analysis: "If I change table Y, which reports break?"
- Trust: business users can see where the numbers come from
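The impact analysis question can be sketched as a traversal over a lineage graph. The dictionary below is a hand-written illustration using the names from the lineage example; it does not call the Purview API, and the function `downstream` is a hypothetical helper.

```python
from collections import deque
from typing import Dict, List

# Edges point from an upstream asset to the assets that consume it.
LINEAGE: Dict[str, List[str]] = {
    "CoreBankingDB.dbo.Vertraege.Leasingrate": [
        "Bronze.CoreBanking.Contracts.Leasingrate"],
    "Bronze.CoreBanking.Contracts.Leasingrate": [
        "Silver.Contracts.LeaseRate"],
    "Silver.Contracts.LeaseRate": [
        "Gold.ContractKPIs.TotalContractValue"],
    "Gold.ContractKPIs.TotalContractValue": [
        "Power BI: Management Dashboard / [Total Contract Value]"],
}


def downstream(asset: str, edges: Dict[str, List[str]]) -> List[str]:
    """Return every asset that directly or indirectly depends on `asset`."""
    affected: List[str] = []
    queue = deque([asset])
    while queue:
        current = queue.popleft()
        for consumer in edges.get(current, []):
            if consumer not in affected:
                affected.append(consumer)
                queue.append(consumer)
    return affected


impacted = downstream("Silver.Contracts.LeaseRate", LINEAGE)
```

For a change to Silver.Contracts.LeaseRate, the traversal returns the Gold KPI column and the Power BI measure that depend on it.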
Modern Reporting with Power BI: From Data to Insights
Power BI is the primary consumption layer for business users at FinanceLease AG. Its integration with the Azure-based data hub enables fast, self-service reporting with real-time updates.
Architecture: Power BI and Data Hub Integration
+-- POWER BI SERVICE --------------------------------------------------+
|  Management Dashboard | Risk & Compliance | Operations Dashboard
|  Sales Analytics      | Finance KPIs      | Fleet Management
+----------------------------------------------------------------------+
                                   |
                                   v
+-- DATA CONNECTION OPTIONS -------------------------------------------+
|  Option 1: DirectQuery (real time, limited transformations)
|    Azure Synapse SQL Pool (Gold Layer)
|    • Sub-second query performance
|    • Always up-to-date
|    • Row-Level Security enforced in DB
|
|  Option 2: Import (fast, full DAX, scheduled refresh)
|    Azure Synapse / Pre-aggregated Tables
|    • In-memory compression
|    • Complex DAX calculations
|    • Scheduled refresh (hourly/daily)
|
|  Option 3: Composite (best of both)
|    Import for Dimensions, DirectQuery for Facts
|    • Performance + Freshness
+----------------------------------------------------------------------+
Example 1: Management Dashboard
Use case: C-level executives need daily updated KPIs on portfolio performance, risk metrics, and the sales pipeline.
Power BI dataset configuration:
// Measures.dax - Central business metrics
// ==================== Portfolio Metrics ====================
[Total Active Contracts] :=
CALCULATE(
    COUNTROWS(DimContract),
    DimContract[Status] = "ACTIVE"
)

// Sum of (monthly rate x duration) per contract. The original referenced an
// undefined [Average Contract Duration] measure; this row-by-row form matches
// the exposure calculation in [Expected Loss].
[Total Contract Value] :=
CALCULATE(
    SUMX(DimContract, DimContract[LeaseRate] * DimContract[ContractDurationMonths]),
    DimContract[Status] IN {"ACTIVE", "ENDING_SOON"}
)

[Portfolio Growth YoY] :=
VAR CurrentYear = [Total Contract Value]
VAR PreviousYear =
    CALCULATE(
        [Total Contract Value],
        SAMEPERIODLASTYEAR(DimDate[Date])
    )
RETURN
    DIVIDE(CurrentYear - PreviousYear, PreviousYear, 0)

// ==================== Payment Performance ====================
[Payment Success Rate] :=
VAR OnTimePayments =
    CALCULATE(
        COUNTROWS(FactContractTransaction),
        FactContractTransaction[PaymentStatus] = "PAID_ONTIME"
    )
VAR TotalPayments = COUNTROWS(FactContractTransaction)
RETURN
    DIVIDE(OnTimePayments, TotalPayments, 0)

[Overdue Amount] :=
CALCULATE(
    SUM(FactContractTransaction[Amount]),
    FactContractTransaction[PaymentStatus] = "OVERDUE"
)

[Days Sales Outstanding (DSO)] :=
VAR AvgDailyRevenue =
    CALCULATE(
        [Total Contract Value] / 365,
        DimContract[Status] = "ACTIVE"
    )
VAR OutstandingReceivables =
    CALCULATE(
        SUM(FactContractTransaction[Amount]),
        FactContractTransaction[PaymentStatus] IN {"PENDING", "OVERDUE"}
    )
RETURN
    DIVIDE(OutstandingReceivables, AvgDailyRevenue, 0)

// ==================== Risk Indicators ====================
// RELATED needs a row context, so the rating check runs inside FILTER.
// Assumes a many-to-one relationship from DimContract to DimCustomer.
[High-Risk Contracts %] :=
VAR HighRiskContracts =
    COUNTROWS(
        FILTER(
            DimContract,
            DimContract[Status] = "ACTIVE"
                && RELATED(DimCustomer[CreditRating]) IN {"D", "E"}
        )
    )
VAR TotalActiveContracts = [Total Active Contracts]
RETURN
    DIVIDE(HighRiskContracts, TotalActiveContracts, 0)

[Expected Loss] :=
// Simplified expected loss calculation: EAD x PD x LGD
VAR ExposureAtDefault =
    SUMX(
        FILTER(DimContract, DimContract[Status] = "ACTIVE"),
        DimContract[LeaseRate] * DimContract[ContractDurationMonths]
    )
VAR ProbabilityOfDefault = [High-Risk Contracts %] * 0.15 // Simplified
VAR LossGivenDefault = 0.45 // Assumed flat value
RETURN
    ExposureAtDefault * ProbabilityOfDefault * LossGivenDefault

// ==================== Sales Pipeline ====================
[Total Pipeline Value] :=
CALCULATE(
    SUM(DimOpportunity[Amount]),
    DimOpportunity[Stage] <> "Closed Won",
    DimOpportunity[Stage] <> "Closed Lost"
)

[Win Rate] :=
VAR WonOpportunities =
    CALCULATE(
        COUNTROWS(DimOpportunity),
        DimOpportunity[Stage] = "Closed Won"
    )
VAR TotalClosedOpportunities =
    CALCULATE(
        COUNTROWS(DimOpportunity),
        DimOpportunity[Stage] IN {"Closed Won", "Closed Lost"}
    )
RETURN
    DIVIDE(WonOpportunities, TotalClosedOpportunities, 0)

[Average Deal Size] :=
CALCULATE(
    AVERAGE(DimOpportunity[Amount]),
    DimOpportunity[Stage] = "Closed Won"
)

// ==================== Trend Indicators ====================
[Contract Value Trend] :=
VAR CurrentValue = [Total Contract Value]
VAR PreviousMonthValue =
    CALCULATE(
        [Total Contract Value],
        DATEADD(DimDate[Date], -1, MONTH)
    )
RETURN
    IF(
        CurrentValue > PreviousMonthValue, "▲",
        IF(CurrentValue < PreviousMonthValue, "▼", "-")
    )
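The simplified expected loss measure multiplies exposure at default (EAD), probability of default (PD), and loss given default (LGD). As a worked illustration in Python, using the same two constants as the DAX measure (0.15 and 0.45) and purely hypothetical input values:

```python
def expected_loss(exposure_at_default: float, high_risk_share: float,
                  pd_scaling: float = 0.15,
                  loss_given_default: float = 0.45) -> float:
    """Simplified expected loss: EAD x PD x LGD, with PD = high-risk share x 0.15."""
    probability_of_default = high_risk_share * pd_scaling
    return exposure_at_default * probability_of_default * loss_given_default


# Hypothetical inputs: EUR 1,000,000 exposure, 10% of contracts high-risk.
# PD = 0.10 * 0.15 = 0.015, so EL = 1,000,000 * 0.015 * 0.45 = 6,750.
example_loss = expected_loss(1_000_000, 0.10)
```

The result scales linearly with each factor, so the two fixed constants largely determine the figure. They are simplifications in the measure, not calibrated risk parameters.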
Dashboard layout:
+-- FINANCELEASE AG - MANAGEMENT DASHBOARD -------- Last: 08:30 UTC ---+
|
|  KEY METRICS
|    Total Contracts         45,234   ▲
|    Contract Value          €2.1B    ▲
|    Payment Success Rate    96.5%    ▼
|    Portfolio Growth YoY    +12.3%   ▲
|
|  PORTFOLIO COMPOSITION
|    [Stacked Bar Chart: Contract Value by Type & Month]
|    ############  Vehicle Leasing (60%)
|    #####         Equipment Leasing (25%)
|    ###           Hire Purchase / Mietkauf (15%)
|
|  PAYMENT PERFORMANCE
|    [Line Chart: Payment Success Rate Trend]
|    Last 12 Months: Min 94.2%, Max 97.8%, Avg 96.1%
|
|  RISK INDICATORS
|    High-Risk Contracts     3.2%     ▲
|    Expected Loss           €12.5M   ▲
|    DSO (Days)              38       -
|
|  SALES PIPELINE
|    [Funnel Chart: Opportunities by Stage]
|    Pipeline Value: €450M | Win Rate: 32% | Avg Deal: €850K
+----------------------------------------------------------------------+
Example 2: Risk & Compliance Dashboard
Use case: Risk managers need granular insight into portfolio risks, credit-rating trends, and compliance metrics.
Row-Level Security (RLS) Implementation:
// RLS Role: "Risk Manager - Region DACH"
// Only contracts from Germany, Austria, and Switzerland are visible
[RLS_Region_DACH] =
DimCustomer[Country] IN {"DE", "AT", "CH"}
DAX Measures:
// Vintage analysis: cohort-based portfolio quality
[NPL Ratio by Vintage] :=
VAR VintageYear = SELECTEDVALUE(DimDate[Year])
RETURN
CALCULATE(
DIVIDE(
COUNTROWS(
FILTER(
DimContract,
DimContract[Status] = "DEFAULTED" &&
YEAR(DimContract[StartDate]) = VintageYear
)
),
COUNTROWS(
FILTER(
DimContract,
YEAR(DimContract[StartDate]) = VintageYear
)
),
0
)
)
// Credit Migration Matrix
[Credit Rating Downgrade %] :=
VAR CustomersWithDowngrade =
CALCULATE(
DISTINCTCOUNT(DimCustomer[CustomerKey]),
DimCustomer[CreditRatingChange] < 0,
DimDate[Date] >= TODAY() - 365
)
VAR TotalCustomers = DISTINCTCOUNT(DimCustomer[CustomerKey])
RETURN
DIVIDE(CustomersWithDowngrade, TotalCustomers, 0)
// IFRS 16 Lease Liability
[Total Lease Liability] :=
SUMX(
FILTER(DimContract, DimContract[Status] = "ACTIVE"),
VAR RemainingMonths = DimContract[ContractDurationMonths] -
DATEDIFF(DimContract[StartDate], TODAY(), MONTH)
VAR MonthlyRate = DimContract[LeaseRate]
VAR DiscountRate = 0.05 / 12 // 5% annual discount rate
RETURN
MonthlyRate * ((1 - POWER(1 + DiscountRate, -RemainingMonths)) / DiscountRate)
)
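The [Total Lease Liability] measure discounts the remaining monthly payments with the present-value formula for an ordinary annuity, PV = rate × (1 − (1 + r)^−n) / r. The Python sketch below reproduces that formula for a single contract and cross-checks it against a payment-by-payment sum. The function names are illustrative, and the guards for expired contracts and a zero discount rate are additions that the measure itself does not contain.

```python
def lease_liability(monthly_rate, remaining_months, annual_discount_rate=0.05):
    """Present value of the remaining lease payments (ordinary annuity)."""
    if remaining_months <= 0:
        return 0.0  # nothing left to pay; also avoids a negative exponent blow-up
    r = annual_discount_rate / 12  # monthly discount rate
    if r == 0:
        return float(monthly_rate * remaining_months)
    return monthly_rate * (1 - (1 + r) ** -remaining_months) / r


def lease_liability_by_sum(monthly_rate, remaining_months, annual_discount_rate=0.05):
    """Same value, computed by discounting every payment individually."""
    r = annual_discount_rate / 12
    return sum(monthly_rate / (1 + r) ** k for k in range(1, remaining_months + 1))


# Hypothetical contract: EUR 1,000 per month, 36 months remaining
closed_form = lease_liability(1000.0, 36)
by_sum = lease_liability_by_sum(1000.0, 36)
print(f"closed form: {closed_form:,.2f}  sum of payments: {by_sum:,.2f}")
```

Both functions agree up to floating-point rounding, which is a quick way to validate the closed-form expression before relying on it in a report.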
Example 3: Self-Service Analytics for Sales
Use case: Sales managers want to run ad-hoc analyses on their opportunities without depending on IT.
Power BI dataflow for sales data:
{
"name": "Sales_Analytics_Dataflow",
"entities": [
{
"name": "OpportunitiesEnriched",
"query": "
let
Source = AzureSynapse.Database('synapse-workspace', 'gold'),
OpportunitiesTable = Source{[Schema='gold', Item='DimOpportunity']}[Data],
AccountsTable = Source{[Schema='gold', Item='DimCustomer']}[Data],
// Merge Opportunities with Customer Data
MergedData = Table.NestedJoin(
OpportunitiesTable, {'AccountID'},
AccountsTable, {'CustomerID'},
'CustomerData', JoinKind.LeftOuter
),
// Expand Customer Fields
ExpandedData = Table.ExpandTableColumn(
MergedData, 'CustomerData',
{'CompanyName', 'Industry', 'EmployeeCount', 'CreditRating'},
{'Account_Name', 'Account_Industry', 'Account_Size', 'Credit_Rating'}
),
// Add Custom Columns
AddedColumns = Table.AddColumn(
ExpandedData, 'Deal_Size_Category',
each if [Amount] >= 1000000 then 'Large Deal (>1M)'
else if [Amount] >= 500000 then 'Medium Deal (500K-1M)'
else 'Small Deal (<500K)',
type text
),
// Calculate Days in Stage
AddedDaysInStage = Table.AddColumn(
AddedColumns, 'Days_In_Current_Stage',
each Duration.Days(DateTime.LocalNow() - [LastStageChangeDate]),
type number
),
// Add Risk Flag
AddedRiskFlag = Table.AddColumn(
AddedDaysInStage, 'At_Risk_Flag',
each if [Days_In_Current_Stage] > 90 and [Stage] <> 'Closed Won' and [Stage] <> 'Closed Lost'
then true else false,
type logical
)
in
AddedRiskFlag
"
}
]
}
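Performance Optimization: Aggregation Tables
For large fact tables (FactContractTransaction has 50 million rows) we use user-defined aggregations: a pre-aggregated table in Synapse that Power BI can answer queries from instead of scanning the detail table.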
-- Pre-aggregated table in Synapse
CREATE TABLE gold.FactContractTransaction_Agg_Daily
WITH (
DISTRIBUTION = HASH(ContractKey), -- avoid hashing on a date key: all rows of one day would land in a single distribution
CLUSTERED COLUMNSTORE INDEX
)
AS
SELECT
DateKey,
CustomerKey,
ContractKey,
TransactionType,
PaymentStatus,
-- Aggregated Metrics
COUNT(*) AS TransactionCount,
SUM(Amount) AS TotalAmount,
AVG(Amount) AS AvgAmount,
SUM(CASE WHEN DaysOverdue > 0 THEN 1 ELSE 0 END) AS OverdueCount,
SUM(LateFeeAmount) AS TotalLateFees
FROM gold.FactContractTransaction
GROUP BY
DateKey,
CustomerKey,
ContractKey,
TransactionType,
PaymentStatus;
Once this table is mapped to the detail table in Power BI (Manage aggregations), day-level queries are answered from the aggregation instead of the detail table (about 500K rows instead of 50 million).
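To make the effect of the GROUP BY concrete, the following Python sketch collapses transaction rows into one row per grouping key, in the same way the aggregation table does. The sample rows and the reduced key (date, contract, payment status) are assumptions chosen for brevity.

```python
from collections import defaultdict


def aggregate_daily(transactions):
    """Collapse transaction rows to one row per (DateKey, ContractKey, PaymentStatus)."""
    agg = defaultdict(lambda: {"TransactionCount": 0, "TotalAmount": 0.0, "OverdueCount": 0})
    for t in transactions:
        key = (t["DateKey"], t["ContractKey"], t["PaymentStatus"])
        row = agg[key]
        row["TransactionCount"] += 1
        row["TotalAmount"] += t["Amount"]
        row["OverdueCount"] += 1 if t["DaysOverdue"] > 0 else 0
    return dict(agg)


# Hypothetical detail rows
rows = [
    {"DateKey": 20240101, "ContractKey": 1, "PaymentStatus": "PAID_ONTIME", "Amount": 500.0, "DaysOverdue": 0},
    {"DateKey": 20240101, "ContractKey": 1, "PaymentStatus": "PAID_ONTIME", "Amount": 250.0, "DaysOverdue": 0},
    {"DateKey": 20240101, "ContractKey": 2, "PaymentStatus": "OVERDUE", "Amount": 900.0, "DaysOverdue": 12},
]
daily = aggregate_daily(rows)
print(len(rows), "detail rows ->", len(daily), "aggregated rows")
```

Sums and counts can be rolled up further without error, whereas an average has to be recomputed as TotalAmount / TransactionCount; averaging the stored per-group averages would weight small groups too heavily.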
Power BI Best Practices for the Enterprise Data Hub
DirectQuery for real-time data, Import for performance
- Management dashboards: DirectQuery (always current)
- Historical analyses: Import (faster)
Incremental refresh for large datasets
// Keep only the last 2 years in memory; older data stays in DirectQuery
// Note: a Power BI incremental refresh policy additionally expects filters on the RangeStart/RangeEnd parameters
Table.SelectRows(
    Source,
    each [TransactionDate] >= Date.AddYears(DateTime.LocalNow(), -2)
)
Row-level security consistent across all layers
- In Azure Synapse: SQL-based RLS
- In Power BI: DAX-based RLS
- Both kept in sync via Azure AD groups
Centralized semantic models
- One dataset per business domain (Contracts, Sales, Risk)
- Multiple reports share the same dataset
- "Single version of truth"
Monitoring & alerting
- Power BI Premium: query performance metrics
- Alerts on report refresh failures
- Usage analytics: which reports are actually used?
Data Science Solutions on the Data Hub
The Data Hub is designed not only for reporting but also as the foundation for advanced analytics and machine learning. At FinanceLease AG we use Azure Machine Learning and Databricks for several data science use cases.
Use Case 1: Churn Prediction for Contract Renewals
Business problem: 30% of leasing contracts are not renewed when they expire. Can we identify customers with a high churn risk early and approach them proactively?
Data science approach:
# Azure ML pipeline for the churn prediction model
from azureml.core import Workspace, Dataset, Experiment
from azureml.train.automl import AutoMLConfig
import pandas as pd
# Connect to Azure ML Workspace
ws = Workspace.from_config()
# Feature Engineering: Daten aus Gold Layer
query = """
SELECT
c.ContractKey,
c.ContractNumber,
c.ContractType,
c.LeaseRate,
c.ContractDurationMonths,
DATEDIFF(day, GETDATE(), c.EndDate) AS DaysUntilEnd,
-- Customer Features
cust.Industry,
cust.EmployeeCount,
cust.CreditRating,
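-- Payment Behavior Features
-- NOTE: the one-to-many joins below (transactions, service requests, opportunities) multiply rows;
-- pre-aggregate each table per contract before joining so that the SUMs are not inflated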
SUM(CASE WHEN ft.PaymentStatus = 'PAID_ONTIME' THEN 1 ELSE 0 END) AS OnTimePaymentCount,
SUM(CASE WHEN ft.PaymentStatus = 'OVERDUE' THEN 1 ELSE 0 END) AS OverduePaymentCount,
AVG(ft.DaysOverdue) AS AvgDaysOverdue,
SUM(ft.LateFeeAmount) AS TotalLateFees,
-- Engagement Features
COUNT(DISTINCT sr.RequestID) AS ServiceRequestCount,
AVG(sr.ResolutionTimeHours) AS AvgResolutionTime,
-- Sales Activity Features
COUNT(DISTINCT opp.OpportunityID) AS NewOpportunitiesCount,
MAX(opp.LastActivityDate) AS LastSalesContact,
-- Target Variable
CASE
WHEN renewal.ContractID IS NOT NULL THEN 1 -- Renewed
ELSE 0 -- Churned
END AS Renewed
FROM gold.DimContract c
LEFT JOIN gold.DimCustomer cust ON c.CustomerID = cust.CustomerID
LEFT JOIN gold.FactContractTransaction ft ON c.ContractKey = ft.ContractKey
LEFT JOIN gold.FactServiceRequest sr ON c.ContractKey = sr.ContractKey
LEFT JOIN gold.DimOpportunity opp ON c.CustomerID = opp.AccountID
LEFT JOIN gold.DimContract renewal ON c.ContractNumber = renewal.PreviousContractNumber
WHERE
c.EndDate < GETDATE() -- Only expired contracts
AND c.EndDate >= DATEADD(year, -2, GETDATE()) -- Last 2 years
GROUP BY
c.ContractKey, c.ContractNumber, c.ContractType, c.LeaseRate,
c.ContractDurationMonths, c.EndDate, cust.Industry, cust.EmployeeCount,
cust.CreditRating, renewal.ContractID
"""
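# Load data from Synapse via a registered SQL datastore
# ('synapse_gold' is a placeholder for the datastore name in your workspace)
from azureml.core import Datastore
synapse_datastore = Datastore.get(ws, 'synapse_gold')
dataset = Dataset.Tabular.from_sql_query(
    (synapse_datastore, query),
    validate=True,
    query_timeout=600
)
df = dataset.to_pandas_dataframe()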
# Train/Test Split (the label column stays in the frames: AutoML reads it via label_column_name)
from sklearn.model_selection import train_test_split
model_df = df.drop(columns=['ContractKey', 'ContractNumber'])
train_df, test_df = train_test_split(
    model_df, test_size=0.2, random_state=42, stratify=model_df['Renewed']
)
# AutoML Configuration
automl_config = AutoMLConfig(
    task='classification',
    primary_metric='AUC_weighted',
    training_data=train_df,
    label_column_name='Renewed',
    n_cross_validations=5,
    enable_early_stopping=True,
    experiment_timeout_hours=2,
    max_concurrent_iterations=4,
    featurization='auto'
)
# Run Experiment
experiment = Experiment(ws, 'churn-prediction')
run = experiment.submit(automl_config, show_output=True)
# Get Best Model
best_run, fitted_model = run.get_output()
print(f"Best Model: {best_run.properties['model_name']}")
print(f"AUC: {best_run.get_metrics()['AUC_weighted']:.4f}")
# Register the best model directly from the AutoML run
# (outputs/model.pkl lives in the run's artifacts, not on the local disk)
from azureml.core.model import Model
model = best_run.register_model(
    model_name='churn_prediction_model',
    model_path='outputs/model.pkl',
    tags={'type': 'classification', 'framework': 'scikit-learn'},
    description='Predicts contract renewal likelihood'
)
Feature Importance:
Top features for churn prediction:
1. AvgDaysOverdue (0.28) - payment delay is the strongest indicator
2. ServiceRequestCount (0.18) - many support requests signal dissatisfaction
3. LastSalesContact (0.15) - recent contact increases retention
4. TotalLateFees (0.12) - financial friction
5. EmployeeCount (0.09) - company size correlates with renewal rate
Deployment & Scoring:
# Real-time Scoring Endpoint
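from azureml.core import Environment
from azureml.core.webservice import AciWebservice, Webservice
from azureml.core.model import InferenceConfig
# Scoring environment; 'environment.yml' is a placeholder for the conda specification of the model's dependencies
env = Environment.from_conda_specification(name='churn-scoring-env', file_path='environment.yml')
inference_config = InferenceConfig(
    entry_script='score.py',
    environment=env
)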
aci_config = AciWebservice.deploy_configuration(
cpu_cores=1,
memory_gb=2,
auth_enabled=True
)
service = Model.deploy(
workspace=ws,
name='churn-prediction-service',
models=[model],
inference_config=inference_config,
deployment_config=aci_config
)
service.wait_for_deployment(show_output=True)
print(f"Scoring URI: {service.scoring_uri}")
# Batch scoring pipeline (daily, for all expiring contracts)
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep
# Score contracts ending in next 90 days
batch_score_step = PythonScriptStep(
name='batch_score_churn',
script_name='batch_score.py',
arguments=[
'--model-name', 'churn_prediction_model',
'--output-table', 'gold.ChurnPredictions'
],
compute_target='ml-compute-cluster',
allow_reuse=False
)
pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
pipeline_run = pipeline.submit(experiment_name='churn-batch-scoring')
Business Impact:
- Early intervention for high-risk customers (churn score > 0.7)
- Sales receives a daily prioritized list for proactive outreach
- Retention rate rose from 70% to 82% (12 percentage points)
- ROI: €4.5M additional contract volume per year
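The daily priority list can be derived from the batch scores with a few lines of code. The sketch below assumes that the churn score is defined as 1 minus the predicted renewal probability (the model is trained on the `Renewed` label); the function name and the contract numbers are hypothetical.

```python
def prioritize_contacts(predictions, threshold=0.7):
    """Return (contract_number, churn_score) pairs above the threshold, highest risk first.

    predictions: iterable of (contract_number, renewal_probability)
    """
    # Assumption: churn score = 1 - P(Renewed)
    scored = [(number, 1.0 - p_renew) for number, p_renew in predictions]
    high_risk = [(number, score) for number, score in scored if score > threshold]
    return sorted(high_risk, key=lambda item: item[1], reverse=True)


# Hypothetical batch scoring output
batch = [("C-1001", 0.10), ("C-1002", 0.55), ("C-1003", 0.25)]
for number, score in prioritize_contacts(batch):
    print(f"{number}: churn score {score:.2f}")
```

Sorting by score lets sales work the list top-down, so the contracts most likely to churn are contacted first when capacity is limited.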
Use Case 2: Dynamic Residual Value Forecasts for Leasing Assets
Business problem: Residual value forecasts from external providers (DAT/Schwacke) are generic. Can we produce better forecasts by training on our own data?
Approach: Gradient Boosting Regression
# Feature engineering for vehicle residual value forecasting
from pyspark.sql import functions as F
from pyspark.sql import functions as F
# Read from Silver Layer
vehicles_df = spark.read.format("delta").load("abfss://silver@datalake/fleet_management/vehicles")
telemetry_df = spark.read.format("delta").load("abfss://silver@datalake/fleet_management/telemetry")
# Aggregate Telemetry Features
telemetry_agg = telemetry_df.groupBy("vehicle_id").agg(
F.max("mileage").alias("current_mileage"),
F.avg("mileage").alias("avg_daily_mileage"),
F.count("*").alias("telemetry_events_count"),
F.stddev("fuel_level").alias("fuel_consumption_variance")
)
# Join and create features
features_df = vehicles_df.join(telemetry_agg, "vehicle_id", "left") \
.withColumn("vehicle_age_months", F.months_between(F.current_date(), "first_registration_date")) \
.withColumn("mileage_per_month", F.col("current_mileage") / F.col("vehicle_age_months")) \
.withColumn("brand_segment", F.when(F.col("brand").isin(["Mercedes", "BMW", "Audi"]), "Premium")
.otherwise("Standard"))
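# Target: actual sale price from historical data
target_df = spark.read.format("delta").load("abfss://silver@datalake/asset_sales")
# Join features with the target and split into train/test sets
# (assumes asset_sales has vehicle_id and sale_price columns; adjust to the real schema)
from sklearn.model_selection import train_test_split
feature_cols = ["current_mileage", "vehicle_age_months", "mileage_per_month"]
train_pdf = features_df.join(target_df, "vehicle_id").select(*feature_cols, "sale_price").toPandas()
X_train, X_test, y_train, y_test = train_test_split(
    train_pdf[feature_cols], train_pdf["sale_price"], test_size=0.2, random_state=42
)
# Train XGBoost Model
from xgboost import XGBRegressor
from sklearn.metrics import mean_absolute_error, r2_score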
model = XGBRegressor(
n_estimators=500,
learning_rate=0.05,
max_depth=7,
subsample=0.8,
colsample_bytree=0.8,
random_state=42
)
model.fit(X_train, y_train)
# Predictions
y_pred = model.predict(X_test)
print(f"MAE: €{mean_absolute_error(y_test, y_pred):.2f}")
print(f"R²: {r2_score(y_test, y_pred):.4f}")
# Results:
# MAE: €850 (DAT/Schwacke: €1,450) - 41% lower error
# R²: 0.91
Use Case 3: Anomaly Detection for Incoming Payments
Use case: Early detection of abnormal payment patterns for fraud detection.
# Isolation Forest for anomaly detection
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
# Features: Payment Behavior Patterns
features = [
    'payment_amount',
    'days_since_last_payment',
    'payment_hour',  # unusual payment times
    'amount_deviation_from_avg',
    'payment_frequency_last_30_days'
]
scaler = StandardScaler()
X_scaled = scaler.fit_transform(df[features])
# Train Isolation Forest
iso_forest = IsolationForest(
    contamination=0.05,  # expect 5% anomalies
    random_state=42
)
# fit_predict returns a label: -1 = anomaly, 1 = normal
df['anomaly_label'] = iso_forest.fit_predict(X_scaled)
# score_samples returns a raw score, not a probability; lower = more anomalous
df['anomaly_score'] = iso_forest.score_samples(X_scaled)
# Anomalies for manual review, most anomalous first
anomalies = df[df['anomaly_label'] == -1].sort_values('anomaly_score')
# Write back to Data Hub
anomalies.to_sql(
'FactPaymentAnomalies',
con=synapse_connection,
schema='gold',
if_exists='append',
index=False
)
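The feature list includes amount_deviation_from_avg without showing how it is derived. One plausible definition, assumed here purely for illustration, is the relative deviation of a payment from the average of the contract's earlier payments:

```python
from statistics import mean


def amount_deviation_from_avg(history, amount):
    """Relative deviation of `amount` from the mean of earlier payments.

    Returns 0.0 when there is no usable history (assumption: treat as 'no deviation').
    """
    if not history:
        return 0.0
    average = mean(history)
    if average == 0:
        return 0.0
    return (amount - average) / average


# Hypothetical contract with a constant monthly rate and one unusual payment
previous_payments = [1000.0, 1000.0, 1000.0]
print(amount_deviation_from_avg(previous_payments, 1500.0))
```

Because lease rates are usually constant, a relative deviation stays interpretable even when the history has zero variance, where a z-score would be undefined.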
API Integration for Third-Party Applications
The Data Hub must not only consume data but also make it available to external systems. At FinanceLease AG we use Azure API Management as the gateway.
Architecture: API Layer
┌─────────────────────────────────────────────────────────────┐
│  External Consumers                                         │
│  • Mobile App  • Partner Portal  • Third-Party Tools        │
└───────────────┬─────────────────────────────────────────────┘
                │
                ▼
┌─────────────────────────────────────────────────────────────┐
│  Azure API Management (APIM)                                │
│  • Authentication (OAuth 2.0, API Keys)                     │
│  • Rate Limiting (1000 req/hour per consumer)               │
│  • Caching (Redis)                                          │
│  • Logging & Analytics                                      │
└───────────────┬─────────────────────────────────────────────┘
                │
                ▼
┌─────────────────────────────────────────────────────────────┐
│  Azure Functions (REST API Endpoints)                       │
│  • GET /api/contracts/{id}                                  │
│  • GET /api/customers/{id}/contracts                        │
│  • POST /api/contracts/{id}/payments                        │
│  • GET /api/analytics/kpis                                  │
└───────────────┬─────────────────────────────────────────────┘
                │
                ▼
┌─────────────────────────────────────────────────────────────┐
│  Azure Synapse (Gold Layer) / Cosmos DB                     │
│  • Read-Optimized Views                                     │
│  • Row-Level Security enforced                              │
└─────────────────────────────────────────────────────────────┘
Example API: Contract Details
// Azure Function: Get Contract Details
[FunctionName("GetContractDetails")]
public async Task<IActionResult> GetContractDetails(
[HttpTrigger(AuthorizationLevel.Function, "get",
Route = "contracts/{contractNumber}")] HttpRequest req,
string contractNumber,
ILogger log)
{
// Validate authentication
if (!ValidateApiKey(req.Headers["X-API-Key"]))
{
return new UnauthorizedResult();
}
// Row-Level Security: Check user permissions
var userContext = GetUserContext(req.Headers["Authorization"]);
if (!await HasAccessToContract(userContext, contractNumber))
{
return new ForbidResult();
}
// Query Synapse
var query = @"
SELECT
c.ContractNumber,
c.ContractType,
c.LeaseRate,
c.StartDate,
c.EndDate,
c.Status,
cust.CompanyName,
cust.Industry,
v.VIN,
v.Brand,
v.Model,
ISNULL(payments.TotalPaid, 0) AS TotalPaid,
ISNULL(payments.OutstandingAmount, 0) AS OutstandingAmount
FROM gold.DimContract c
INNER JOIN gold.DimCustomer cust ON c.CustomerID = cust.CustomerID
LEFT JOIN gold.DimVehicle v ON c.AssetID = v.VehicleID
LEFT JOIN (
SELECT
ContractKey,
SUM(CASE WHEN PaymentStatus = 'PAID_ONTIME' THEN Amount ELSE 0 END) AS TotalPaid,
SUM(CASE WHEN PaymentStatus = 'PENDING' THEN Amount ELSE 0 END) AS OutstandingAmount
FROM gold.FactContractTransaction
GROUP BY ContractKey
) payments ON c.ContractKey = payments.ContractKey
WHERE c.ContractNumber = @ContractNumber
";
using var connection = new SqlConnection(_synapseConnectionString);
var contract = await connection.QueryFirstOrDefaultAsync<ContractDetailDto>(
query,
new { ContractNumber = contractNumber }
);
if (contract == null)
{
return new NotFoundResult();
}
// Add enrichments from ML models
contract.ChurnProbability = await GetChurnPrediction(contractNumber);
contract.RecommendedActions = await GetRecommendations(contract);
return new OkObjectResult(contract);
}
OpenAPI Specification:
openapi: 3.0.0
info:
  title: FinanceLease Data Hub API
  version: 1.0.0
  description: Access to contract, customer, and analytics data
security:
  - ApiKeyAuth: []
  - OAuth2: [read:contracts, write:payments]
paths:
  /api/contracts/{contractNumber}:
    get:
      summary: Get contract details
      parameters:
        - name: contractNumber
          in: path
          required: true
          schema:
            type: string
      responses:
        '200':
          description: Contract found
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Contract'
        '404':
          description: Contract not found
        '401':
          description: Unauthorized
  /api/analytics/kpis:
    get:
      summary: Get real-time KPIs
      parameters:
        - name: date
          in: query
          schema:
            type: string
            format: date
      responses:
        '200':
          description: KPIs retrieved
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/KPIs'
components:
  securitySchemes:
    ApiKeyAuth:
      type: apiKey
      in: header
      name: X-API-Key
    # OAuth2 is referenced under `security` and therefore has to be declared here.
    # Flow, tokenUrl, and scope descriptions are placeholders for the actual Azure AD app registration.
    OAuth2:
      type: oauth2
      flows:
        clientCredentials:
          tokenUrl: https://login.microsoftonline.com/{tenant-id}/oauth2/v2.0/token
          scopes:
            read:contracts: Read contract data
            write:payments: Create payments
  schemas:
    # The KPIs schema referenced above is not shown in this excerpt
    Contract:
      type: object
      properties:
        contractNumber:
          type: string
        contractType:
          type: string
          enum: [Leasing, Mietkauf]
        leaseRate:
          type: number
          format: decimal
        startDate:
          type: string
          format: date
        endDate:
          type: string
          format: date
        status:
          type: string
          enum: [ACTIVE, ENDING_SOON, EXPIRED]
        totalPaid:
          type: number
        outstandingAmount:
          type: number
        churnProbability:
          type: number
          format: float
Rate Limiting & Caching
<!-- APIM policy for rate limiting and caching -->
<policies>
<inbound>
<!-- Authentication -->
<validate-jwt header-name="Authorization" require-scheme="Bearer">
<issuer-signing-keys>
<key>{{jwt-signing-key}}</key>
</issuer-signing-keys>
<audiences>
<audience>api://financelease-datahub</audience>
</audiences>
</validate-jwt>
<!-- Rate Limiting: 1000 requests/hour per subscription -->
<rate-limit-by-key calls="1000" renewal-period="3600"
counter-key="@(context.Subscription.Id)" />
<!-- Caching for frequently requested endpoints -->
<cache-lookup vary-by-developer="true" vary-by-developer-groups="false">
<vary-by-query-parameter>date</vary-by-query-parameter>
</cache-lookup>
<!-- Request Transformation -->
<set-backend-service base-url="https://datahub-api.azurewebsites.net" />
</inbound>
<backend>
<forward-request timeout="30" />
</backend>
<outbound>
<!-- Cache successful responses for 5 minutes -->
<cache-store duration="300" />
<!-- Add Headers -->
<set-header name="X-Data-Freshness" exists-action="override">
<value>@(DateTime.UtcNow.ToString("o"))</value>
</set-header>
</outbound>
<on-error>
<!-- Error Handling -->
<set-body>@{
return new JObject(
new JProperty("error", context.LastError.Message),
new JProperty("timestamp", DateTime.UtcNow)
).ToString();
}</set-body>
</on-error>
</policies>
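The rate-limit-by-key policy above allows 1000 calls per 3600 seconds for each subscription. The Python sketch below shows the idea with a simple fixed-window counter per key. It is a conceptual illustration only: the class name is hypothetical, and the algorithm APIM uses internally may differ.

```python
import time


class FixedWindowRateLimiter:
    """Allow at most `calls` requests per `renewal_period` seconds for each key."""

    def __init__(self, calls=1000, renewal_period=3600, clock=time.monotonic):
        self.calls = calls
        self.renewal_period = renewal_period
        self.clock = clock          # injectable so the limiter can be tested without waiting
        self._windows = {}          # key -> (window_start, request_count)

    def allow(self, key):
        now = self.clock()
        start, count = self._windows.get(key, (now, 0))
        if now - start >= self.renewal_period:
            start, count = now, 0   # window expired: start a new one
        if count >= self.calls:
            self._windows[key] = (start, count)
            return False            # caller would answer with HTTP 429
        self._windows[key] = (start, count + 1)
        return True


# Usage with a fake clock: 2 calls per 10 seconds
fake_time = [0.0]
limiter = FixedWindowRateLimiter(calls=2, renewal_period=10, clock=lambda: fake_time[0])
print([limiter.allow("subscription-a") for _ in range(3)])
```

Keeping one counter per key is what makes the limit apply per subscription rather than globally, so one heavy consumer cannot exhaust the quota of the others.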
Access Security: Multi-Layer Security Model
Access security is not optional for financial data. FinanceLease AG implements a defense-in-depth model with multiple security layers.
Layer 1: Network Security
┌─────────────────────────────────────────────────────────────┐
│ Azure Virtual Network                                       │
│ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐    │
│ │ Subnet: Data   │ │ Subnet: Compute│ │ Subnet: API    │    │
│ │ (Synapse, SQL) │ │ (Databricks)   │ │ (APIM, Funcs)  │    │
│ └────────────────┘ └────────────────┘ └────────────────┘    │
│                                                             │
│ • Private Endpoints for all services                        │
│ • Network Security Groups (NSGs) with deny-by-default       │
│ • Azure Firewall for outbound traffic                       │
└─────────────────────────────────────────────────────────────┘
Public Internet
↓ (only via Azure Front Door + WAF)
[API Management]
↓
Private Network (no direct internet exposure)
Layer 2: Identity & Access Management (Azure AD)
// Role-Based Access Control (RBAC)
// Defined roles:
public enum DataHubRole
{
DataEngineer, // Full access to all layers
DataAnalyst, // Read on Silver/Gold
BusinessUser, // Read on Gold (via Power BI/API)
DataScientist, // Read on Silver/Gold + ML workspace
AuditUser // Read-only for compliance
}
// Azure AD Group-Mappings
var roleGroups = new Dictionary<DataHubRole, string>
{
{ DataHubRole.DataEngineer, "SG-DataHub-Engineers" },
{ DataHubRole.DataAnalyst, "SG-DataHub-Analysts" },
{ DataHubRole.BusinessUser, "SG-DataHub-BusinessUsers" },
{ DataHubRole.DataScientist, "SG-DataHub-DataScientists" },
{ DataHubRole.AuditUser, "SG-DataHub-Auditors" }
};
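How the group mapping turns into an access decision can be sketched in a few lines of Python. The role and group names are taken from the listing above; the layer sets follow the role comments, except for the audit role, whose readable layers are an assumption (all layers, read-only).

```python
from enum import Enum


class DataHubRole(Enum):
    DATA_ENGINEER = "SG-DataHub-Engineers"
    DATA_ANALYST = "SG-DataHub-Analysts"
    BUSINESS_USER = "SG-DataHub-BusinessUsers"
    DATA_SCIENTIST = "SG-DataHub-DataScientists"
    AUDIT_USER = "SG-DataHub-Auditors"


# Layers each role may read; the AUDIT_USER entry is an assumption
READABLE_LAYERS = {
    DataHubRole.DATA_ENGINEER: {"bronze", "silver", "gold"},
    DataHubRole.DATA_ANALYST: {"silver", "gold"},
    DataHubRole.BUSINESS_USER: {"gold"},
    DataHubRole.DATA_SCIENTIST: {"silver", "gold"},
    DataHubRole.AUDIT_USER: {"bronze", "silver", "gold"},
}


def roles_for_groups(group_names):
    """Map Azure AD group names to Data Hub roles, ignoring unknown groups."""
    by_group = {role.value: role for role in DataHubRole}
    return {by_group[name] for name in group_names if name in by_group}


def can_read(group_names, layer):
    """True if any role derived from the user's groups may read the layer."""
    return any(layer in READABLE_LAYERS[role] for role in roles_for_groups(group_names))


print(can_read(["SG-DataHub-Analysts"], "gold"))
print(can_read(["SG-DataHub-BusinessUsers"], "silver"))
```

Resolving permissions from group membership rather than from individual users keeps access changes in Azure AD, where joiner and leaver processes already exist.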
Layer 3: Data-Level Security (Row-Level & Column-Level)
-- SQL Server Row-Level Security in Synapse
CREATE SCHEMA Security;
GO
CREATE FUNCTION Security.fn_SecurityPredicate(@CustomerCountry NVARCHAR(2))
RETURNS TABLE
WITH SCHEMABINDING
AS
RETURN
SELECT 1 AS Result
WHERE
-- Data engineers see everything
IS_MEMBER('SG-DataHub-Engineers') = 1
OR
-- Risk managers see only their own region
(IS_MEMBER('SG-Risk-Manager-DACH') = 1 AND @CustomerCountry IN ('DE', 'AT', 'CH'))
OR
(IS_MEMBER('SG-Risk-Manager-BENELUX') = 1 AND @CustomerCountry IN ('BE', 'NL', 'LU'))
OR
-- Sales sees only its own customers (as written, matched via the countries of the rep's customers)
(IS_MEMBER('SG-Sales-Team') = 1 AND @CustomerCountry IN (
SELECT Country FROM gold.DimCustomer
WHERE SalesRepID = CAST(SESSION_CONTEXT(N'SalesRepID') AS INT)
));
GO
-- Note: Synapse dedicated SQL pools support FILTER predicates only; BLOCK predicates are not available there
CREATE SECURITY POLICY Security.ContractSecurityPolicy
ADD FILTER PREDICATE Security.fn_SecurityPredicate(CustomerCountry)
ON gold.DimContract
WITH (STATE = ON);
GO
-- Column-level protection: dynamic data masking of sensitive data
ALTER TABLE gold.DimCustomer
ALTER COLUMN TaxID ADD MASKED WITH (FUNCTION = 'partial(2, "XXXX", 2)');
-- Result for non-privileged users: "12XXXX89" instead of "12345689"
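The masking function partial(2, "XXXX", 2) keeps the first two and the last two characters and replaces everything in between with the padding string. A small Python sketch of that behaviour follows; the handling of values that are too short for the mask is an assumption, since the database's exact behaviour in that case is not covered here.

```python
def partial_mask(value, prefix=2, padding="XXXX", suffix=2):
    """Mimic partial(prefix, padding, suffix): expose only the first and last characters."""
    if len(value) < prefix + suffix:
        # Assumption: expose nothing when the value is too short for the mask
        return padding
    head = value[:prefix]
    tail = value[-suffix:] if suffix > 0 else ""
    return head + padding + tail


print(partial_mask("12345689"))
```

The masked value has a fixed-length middle part, so the length of the original tax ID is not revealed either.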
Layer 4: Data Encryption
# Encryption at Rest and in Transit
Encryption at Rest:
Azure Data Lake:
- Microsoft-managed keys (Standard)
- Customer-managed keys via Azure Key Vault (for the Gold layer)
Azure Synapse:
- Transparent Data Encryption (TDE) enabled
Azure SQL:
- Always Encrypted for highly sensitive columns (e.g. IBAN)
Encryption in Transit:
- TLS 1.2+ for all connections
- Azure Private Link for service-to-service communication
- VPN/ExpressRoute for on-premises connectivity
Layer 5: Audit Logging
-- Audit log table in Synapse
-- (dedicated SQL pool: primary keys must be NONCLUSTERED NOT ENFORCED, defaults must be constants,
-- and NVARCHAR(MAX) columns require a HEAP or rowstore table)
CREATE TABLE audit.DataAccessLog (
LogID BIGINT IDENTITY(1,1) NOT NULL,
UserPrincipalName NVARCHAR(200),
UserRole NVARCHAR(100),
AccessedTable NVARCHAR(200),
AccessType NVARCHAR(50), -- SELECT, INSERT, UPDATE, DELETE
[RowCount] INT, -- ROWCOUNT is a reserved keyword, hence the brackets
QueryText NVARCHAR(MAX),
ClientIPAddress NVARCHAR(50),
ApplicationName NVARCHAR(200),
[Timestamp] DATETIME2 NOT NULL, -- set by the writer, e.g. SYSUTCDATETIME()
PRIMARY KEY NONCLUSTERED (LogID) NOT ENFORCED
)
WITH (DISTRIBUTION = ROUND_ROBIN, HEAP);
-- Azure Monitor integration
-- All access to the Gold layer is logged and analyzed in Azure Sentinel
-- Alerts on:
-- - Unusually high number of queries (possible data exfiltration)
-- - Access outside working hours
-- - Failed authentication attempts
Migrating Existing Data Warehouses
Many organizations already have a data warehouse, often an on-premises SQL Server, Oracle, or an older cloud solution. Migrating to a modern Data Hub is an incremental process, not a big bang.
Migration Strategy: Strangler Fig Pattern
Instead of shutting down the old DWH and rebuilding everything, we migrate incrementally:
Phase 1: Parallel Operation
┌────────────┐             ┌────────────┐
│ Legacy     │             │ New Data   │
│ DWH        │────────────▶│ Hub        │
│ (On-Prem)  │ Replication │ (Azure)    │
└────────────┘             └────────────┘
      │                          │
 Old Reports                New Reports
 (migrated step by step)
Phase 2: Hybrid (90% new hub, 10% legacy)
                           ┌────────────┐
                           │ New Data   │
┌────────────┐             │ Hub        │
│ Legacy     │             │ (Azure)    │
│ DWH        │             └────────────┘
│ (minimized)│                   │
└────────────┘              Most Reports
Phase 3: Full Migration
                           ┌────────────┐
                           │ New Data   │
[Legacy DWH]               │ Hub        │
decommissioned             │ (Azure)    │
                           └────────────┘
                                 │
                            All Reports
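The core of the strangler fig approach is a routing decision: each report is served by the new Data Hub once it has been migrated, and by the legacy DWH until then. A minimal Python sketch of that decision follows; the report names and function names are hypothetical.

```python
def resolve_backend(report_name, migrated_reports):
    """Route a report to the new Data Hub once migrated, otherwise to the legacy DWH."""
    return "data_hub" if report_name in migrated_reports else "legacy_dwh"


def migration_progress(all_reports, migrated_reports):
    """Share of reports already served by the new Data Hub (0.0 to 1.0)."""
    if not all_reports:
        return 0.0
    return len(set(all_reports) & set(migrated_reports)) / len(set(all_reports))


# Hypothetical report inventory during the hybrid phase
all_reports = ["management_dashboard", "risk_dashboard", "sales_pipeline", "ifrs16_report"]
migrated = {"management_dashboard", "risk_dashboard", "sales_pipeline"}

for name in all_reports:
    print(f"{name} -> {resolve_backend(name, migrated)}")
print(f"progress: {migration_progress(all_reports, migrated):.0%}")
```

Because the routing is driven by a single set of migrated reports, moving one more report is a one-line change, and rolling it back is equally small.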
Migration Steps
Step 1: Assessment
# Automated analysis of the existing DWH
import pyodbc
import pandas as pd
# Connect to Legacy DWH
legacy_conn = pyodbc.connect('DSN=LegacyDWH;UID=user;PWD=pass')
# Analysis: table sizes
tables_query = """
SELECT
s.name AS SchemaName,
t.name AS TableName,
-- Count rows once (in-row data only); ROWCOUNT is a reserved keyword, hence the brackets
SUM(CASE WHEN a.type = 1 THEN p.rows ELSE 0 END) AS [RowCount],
SUM(a.total_pages) * 8 / 1024 AS SizeMB,
MAX(st.last_user_update) AS LastModified
FROM sys.tables t
INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
INNER JOIN sys.indexes i ON t.object_id = i.object_id
INNER JOIN sys.partitions p ON i.object_id = p.object_id AND i.index_id = p.index_id
INNER JOIN sys.allocation_units a ON p.partition_id = a.container_id
LEFT JOIN sys.dm_db_index_usage_stats st
ON st.object_id = i.object_id AND st.index_id = i.index_id AND st.database_id = DB_ID()
WHERE i.index_id IN (0, 1) -- heap or clustered index only, so rows are not counted once per index
GROUP BY s.name, t.name
ORDER BY SizeMB DESC
"""
tables_df = pd.read_sql(tables_query, legacy_conn)
# Prioritization
tables_df['Priority'] = tables_df.apply(lambda row:
'HIGH' if row['RowCount'] > 1000000 or row['SizeMB'] > 1000 else
'MEDIUM' if row['RowCount'] > 100000 else
'LOW', axis=1
)
print(f"Total Tables: {len(tables_df)}")
print(f"Total Size: {tables_df['SizeMB'].sum():.2f} MB")
print(f"\nPriority Distribution:")
print(tables_df['Priority'].value_counts())
Step 2: Schema Migration (DDL)
-- Automated Schema Conversion
-- Tool: Azure Database Migration Service (DMS) or custom scripts
-- Beispiel: Legacy DWH Fact Table
-- Source (Legacy):
CREATE TABLE dbo.FactSales (
SalesID INT PRIMARY KEY,
DateKey INT,
CustomerKey INT,
Amount DECIMAL(18,2),
Quantity INT,
LoadDate DATETIME DEFAULT GETDATE()
);
-- Target (Synapse Dedicated SQL Pool):
CREATE TABLE gold.FactSales (
SalesID INT NOT NULL,
DateKey INT NOT NULL,
CustomerKey INT NOT NULL,
Amount DECIMAL(18,2),
Quantity INT,
LoadDate DATETIME2,
-- Synapse-specific optimizations
PRIMARY KEY NONCLUSTERED (SalesID) NOT ENFORCED
)
WITH (
DISTRIBUTION = HASH(CustomerKey), -- optimized for customer joins
CLUSTERED COLUMNSTORE INDEX
);
Step 3: Data Migration
# Azure Data Factory pipeline for the data migration
from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import (
    CopyActivity, DatasetReference, LinkedServiceReference,
    PipelineResource, SqlDWSink, SqlSource, StagingSettings,
)
# subscription_id, resource_group_name and data_factory_name must be set for your environment
credential = DefaultAzureCredential()
adf_client = DataFactoryManagementClient(credential, subscription_id)
# Create one pipeline per high-priority table
for table in tables_df[tables_df['Priority'] == 'HIGH'].itertuples():
    pipeline_name = f"Migrate_{table.SchemaName}_{table.TableName}"
    # Copy Activity
    copy_activity = CopyActivity(
        name=f"Copy_{table.TableName}",
        inputs=[
            DatasetReference(type='DatasetReference', reference_name='LegacyDWH_Generic', parameters={
                'schemaName': table.SchemaName,
                'tableName': table.TableName
            })
        ],
        outputs=[
            DatasetReference(type='DatasetReference', reference_name='Synapse_Generic', parameters={
                'schemaName': 'gold',
                'tableName': table.TableName
            })
        ],
        source=SqlSource(),
        sink=SqlDWSink(
            pre_copy_script=f"TRUNCATE TABLE gold.{table.TableName}",
            write_batch_size=10000,
            write_batch_timeout='00:10:00'
        ),
        enable_staging=True,  # use Azure Blob Storage as staging
        staging_settings=StagingSettings(
            linked_service_name=LinkedServiceReference(
                type='LinkedServiceReference', reference_name='AzureBlobStorage'
            ),
            path='staging'
        )
    )
    pipeline = PipelineResource(
        activities=[copy_activity]
    )
    adf_client.pipelines.create_or_update(
        resource_group_name,
        data_factory_name,
        pipeline_name,
        pipeline
    )
    print(f"Created pipeline: {pipeline_name}")
Step 4: Reconciliation & Validation
-- Data validation: row counts must match
-- (ROWCOUNT is a reserved keyword, hence the bracketed alias)
SELECT
'Legacy' AS Source,
COUNT(*) AS [RowCount],
SUM(Amount) AS TotalAmount,
MAX(LoadDate) AS LatestRecord
FROM LegacyDWH.dbo.FactSales
UNION ALL
SELECT
'Synapse' AS Source,
COUNT(*) AS [RowCount],
SUM(Amount) AS TotalAmount,
MAX(LoadDate) AS LatestRecord
FROM gold.FactSales;
-- Expected: row counts and TotalAmount are identical
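The comparison of the two result rows is easy to automate, so that every migrated table is checked the same way. The Python sketch below assumes the two summary rows have already been fetched into dictionaries; the function name, the dictionary keys, and the sample figures are hypothetical.

```python
from decimal import Decimal


def reconcile(legacy, synapse, amount_tolerance=Decimal("0.00")):
    """Compare two summary rows and return a list of human-readable mismatches."""
    issues = []
    if legacy["row_count"] != synapse["row_count"]:
        issues.append(
            f"row count differs: legacy={legacy['row_count']} synapse={synapse['row_count']}"
        )
    difference = abs(legacy["total_amount"] - synapse["total_amount"])
    if difference > amount_tolerance:
        issues.append(f"total amount differs by {difference}")
    return issues


# Hypothetical summary rows for one table
legacy_summary = {"row_count": 1_250_000, "total_amount": Decimal("98765432.10")}
synapse_summary = {"row_count": 1_249_998, "total_amount": Decimal("98765400.00")}

problems = reconcile(legacy_summary, synapse_summary)
print("OK" if not problems else "\n".join(problems))
```

Amounts are compared as Decimal values so that the check does not report spurious differences caused by binary floating-point rounding.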
Step 5: Cutover & Decommissioning
Cutover Checklist:
☐ All high-priority tables migrated and validated
☐ All legacy reports migrated to the new Data Hub
☐ Performance tests: query runtimes ≤ legacy performance
☐ User acceptance tests completed
☐ Rollback plan documented
☐ Go-live communication sent to stakeholders
Post-Cutover:
Week 1-2: Parallel run (legacy DWH read-only, new reports active)
Week 3-4: Monitoring, hotfix phase
Month 2: Legacy DWH decommissioning
Month 3: Hardware return / cost savings realized
Migration Challenges & Lessons Learned
Challenge 1: Query compatibility
- Legacy T-SQL queries often use features that Synapse does not support
- Solution: Rewrite the queries or add a compatibility layer (views)
Challenge 2: Performance regression
- Some queries run slower on Synapse than on-premises (network latency)
- Solution: Materialized views, aggressive caching, query optimization
Challenge 3: Change management
- User resistance to "new" tools
- Solution: Intensive training, a champions program, showing quick wins
Challenge 4: Downtime windows
- 24/7 operation leaves no maintenance window
- Solution: Blue-green deployment, incremental migration
Conclusion & Best Practices
Building a modern Data Hub for a leasing company is a strategic, multi-dimensional project that goes far beyond technology. Based on the experience with FinanceLease AG and similar projects, these are the key takeaways:
Architecture:
1. Medallion architecture (Bronze-Silver-Gold) as a proven pattern
2. Delta Lake for ACID transactions and time travel
3. Synapse Dedicated SQL Pool for high-performance BI reporting
4. Separation of concerns: Ingestion → Processing → Consumption
Data quality:
5. Data quality is not a one-off project but a continuous process
6. Automated data quality checks in every layer transformation
7. Data lineage and governance (Azure Purview) from day 1
Analytics & ML:
8. Data Hub as the foundation for advanced analytics, not just reporting
9. Feature store for reproducible ML features
10. MLOps practices: versioning, CI/CD for models
Security:
11. Defense in depth: network, identity, and data-level security
12. Encryption at rest and in transit is standard, not optional
13. Audit logging for compliance (DSGVO/GDPR, BaFin)
Migration:
14. Strangler fig pattern instead of a big bang
15. Use automated assessment tools
16. Plan at least a 20% buffer for unexpected complexity
Organization:
17. Cross-functional teams (data engineers, analysts, business)
18. Dedicated data product owner with a business background
19. Self-service as the goal, but with clear governance guardrails
Measurable Results at FinanceLease AG:
- Time-to-insight: 2 weeks → 2 hours (more than 95% reduction)
- Data quality: 78% → 96% correct data
- Analytics adoption: 12% → 67% of business users actively use reports
- Cost: €420K legacy DWH → €180K cloud (57% reduction)
- Business impact: €4.5M additional revenue through churn prevention
The modern Data Hub is not a project but a product: a living system that is continuously developed and delivers measurable business value.