Praktisk vejledning : Sådan automatiseres din ML-proces med Airflow og Cloud Composer

 Start din dag med endnu udført arbejde (Foto af Andrian Valeanu på Pixabay)

Drømmer du om at opbygge en hurtig, skalerbar, vedligeholdelig Machine Learning-rørledning (som et anbefalingssystem), der udtrækker data, træner og forudsiger det og gemmer sine resultater i din søvn, så du kun behøver at beundre alt det arbejde, der er gjort, mens du nipper til din morgenkaffe?

Søg ikke længere, Google Cloud Composer og Airflow er lavet til dig! Der er altid et stort kløft mellem at implementere / træne en ML-model på din jupyter-notebook og at sætte den i produktion. Opbygning af en ren infrastruktur, der giver dig mulighed for at overvåge hvert trin i rørledningen (dataindsamling, træne ML-modeller, sætte modeller i produktion, blive rapporteret, hvis der er noget problem), er nødvendigt for at skalere. Ved Snipfeed (beta-versionen af ​​vores app kommer snart, men du kan prøve messenger-versionen for et sneak peek), bruger vi Airflow til at overvåge og automatisere vores ML-proces til anbefalingssystemet.

Drømmen rørledning

I denne praktiske vejledning implementerer vi følgende arkitektur:

Drømte rørledning

Her er de daglige opgaver på din pipeline:

  • Ekstrahering og filtrering af data fra MongoDB og opbevaring på Cloud Storage.
  • Anvendelse af en ML-model på dine udpakkede data på en computermotor og gemme resultatet på Cloud Storage.
  • Importerer dine resultater til Redis Cache.

Fejl rapporteres direkte om Slack

Luftstrøm og skykomponist

Lad os introducere vores værktøjer til udvikling af denne rørledning: Airflow og Cloud Composer.

  1. Luftstrøm

Airflow er en platform, der giver dig mulighed for at oprette, styre og overvåge arbejdsgange. I vores scenario planlægger Airflow aggregeringen af ​​data med MongoDB og kører ML-modellen, når den er færdig.

I luftstrøm kan du opbygge DAGs (Directed Acyclic Graphs), som er rørledninger (eller arbejdsgange) skrevet i Python, der udfører operatører (såsom Python-funktioner eller Bash-kommandoer).

En DAG har en startdato (den dato, den bliver aktiv) og et tidsplaninterval, der definerer tidsforløbet mellem to kørsler.

I vores eksempel

  • DAG: Drømrørledningen
  • Operatører: 2 Python-operatører for at få adgang til MongoDB og Redis, 1 Bash-operatør til at udføre ML-kode på Compute-motoren.
  • Startdato: datetime (2019, 3, 27) vores DAG bliver aktiv den 27. marts 2019.
  • Planinterval: timedelta (dage = 1) fra 27. marts 2019, vores DAG vil køre hver dag.

2. Cloud Composer

Cloud Composer er en service leveret af Google Cloud Platform, der bruger Airflow til at konfigurere en rørledning.

Platformen leveret af Cloud Composer er relativt let at håndtere. Luftstrømmen kører på et miljø i bagenden.
Et miljø er sammensat af noder, der udfører Airflows operatører parallelt, hvis det er muligt.
Miljøet leveres også med en Google Cloud Storage-spand, der gemmer DAGs kode, midlertidige data og logfiler.

Start med Cloud Composer

For at begynde skal vi skabe et miljø.

  1. På Google Cloud Platform skal du gå til Cloud Composer API og markere afkrydsningsfeltet Active API.
  2. Åbn Cloud Shell (øverste højre hjørne), og skriv følgende kommando:
$ cloud-beta-komponistmiljøer skaber komponist - placering us-central-1 - zone us-central1-b - disk-størrelse 40 GB - maskintype n1-standard-1 - nodetælling 3 - python-version 3 - Komponist-image-version-1.5.2-airflow-1.10.1
  • komponist: dit miljønavn
  • placering & zone: fysisk placering af dit miljø.
  • diskstørrelse: hukommelsesstørrelse af dit miljø, minimum er 30 GB, men det er ikke nok til senere at installere Python-pakken.
  • maskintype: maskintype, som dine noder arbejder på (n1-standard-1 er standard). Se beskrivelse her for andre typer.
  • node-count: hvor mange noder der fungerer på dine DAG'er, minimum er 3.
  • python-version: python-version koden i dine DAG'er udføres i (2 og 3 tilgængelig).
  • image-version: version af luftstrøm og komponist, der kører på miljøet, se her for nyere versioner.

3. Efter ca. 10 minutter er dit miljø opsat

Opsætning af miljø

Opret din første Airflow DAG

Nu hvor dit Cloud Composer-miljø er konfigureret, er vi nødt til at oprette vores første DAG, der kører på det.

En DAG er skrevet i Python og består af 4 forskellige dele:

  1. DAG argumenter:
  • ejer: navnet på denne DAGs ejer
  • depend_on_past: om DAG afhænger af dets tidligere løb,
  • startdato: den dato, hvor du ønsker, at denne DAG skal aktiveres
  • gentest: hvor mange forsøg, der skal køres i tilfælde af fejl
  • retry_delay: forsinkelsen mellem disse forsøg

2. DAG-definition:

Her er den del, hvor du opretter din DAG.

  • dag_id: DAG-navnet
  • args: de argumenter, som vi erklærede ovenfor
  • schema_interval: tidsintervallet mellem hver kørsel af din DAG

3. DAG-operatør:

Og nu skal du definere operatørerne i din DAG.
F.eks. Instantierer vi her en Bash-operatør og en Python-operatør.
Disse operatører tager følgende parametre ind:

  • task_id: id for operatøren
  • give_context: Boolean for at specificere, om konteksten skal overføres til operatøren (dette kan være nyttigt at få task_id, uitvoeringsdato osv.)
  • op_kwargs: argumenter sendt til din operatør
  • python_callable: python-funktion til at køre (felt kun tilgængeligt for objekter i klassen PythonOperator)
  • bash_command: bash kommando til at køre (felt kun tilgængeligt for objekter i klassen BashOperator)
  • on_failure_callback: funktion til at køre i tilfælde af, at operatøren mislykkes (ikke brugt i koden herunder)

Dette er et eksempel på en funktion, som en PythonOperator kan køre.
Bemærk, at hvad vores funktion returnerer udskrives i loggen (og det er alt, hvad der sker med det)

4. Afhængighedsdefinition:

Her definerer vi rækkefølgen, som operatørerne skal udføres i, så vi har følgende pipeline:

Når du sætter alt sammen, får du:

Link din DAG til Cloud Composer

For at kunne overvåge din DAG på Cloud Composer:

  1. Gå til DAGs

2. Upload din dag1.py fil

3. Gå til luftstrøm

4. Efter nogle få minutter skal du opdatere, og din DAG skal vises, identificeret med dens ID (den er som standard tændt og kører et vist antal gange, afhængigt af hvad du angiver startdato og planlagt interval)

Vores første DAG om luftstrøm

Du kan nu overvåge det og opdage de mange overvågningsfunktioner, der leveres af Airflow, ved at klikke på din DAG! (en vis hjælp til overvågning leveres af denne artikel)

Tilbage til vores drømmepipeline

Så nu kan vi gå nærmere ind på vores drømmepipeline.
Vi skal:

  1. Opret forbindelse til MongoDB, samlet data og gem dem i en spand.
  2. Udfør en bash-kommando, der kopierer disse data til en Compute Engine, anvender en ML-algoritme og gemmer resultatet i en spand.
  3. Opret forbindelse til Redis og upload vores resultater.
  4. Ved mislykkethed, send en besked til Slack.

Der er stadig nogle Airflow-funktionaliteter, som vi er nødt til at afsløre for at være i stand til at indstille vores drømmepipeline, for det går i Airflow web UI.

  • Connexions
Opret luftstrømforbindelse

Som specificeret bliver vi nødt til at indstille 2 sammenhænge til Slack og Redis (ikke MongoDB, fordi du ikke kan angive MongoDB som en tilknytning, når denne artikel bliver offentliggjort)

Følg dette medium for at opsætte Slack-forbindelsen

For Redis-forbindelsen skal du gå til Redis og oprette en ny cache.
I Airflow-webgrænseflade skal du i fanen Forbindelser klikke på Opret sæt en id i Conn-id (“redis” for eksempel). Specificer også Conn Type som Redis, og tilføj derefter oplysninger, der er givet dig af redis i Host, Password, Port og Extra.

  • Variable

Airflow giver os også en enkel måde at videregive argumenter til vores DAG

Opret luftstrømvariabel

I vores tilfælde vil vores variabel indeholde:

  • redis: idet til redis-forbindelsen, som vi definerede tidligere
  • slack: idet for den slappe forbindelse, som vi definerede tidligere
  • mongo_uri: uri brugt af pymongo til at få adgang til din database i formatet mongodb: // brugernavn: password @ host: port / db? indstillinger
  • mongo_collection: den samling, du vil samle
  • instansnavn: navnet på den instans, som du vil køre ML-kode på
  • instans_zone: zonen i det forekomst, du vil køre ML-kode på

Så opret en variabel og udfyld den med dine detaljer som ovenfor

Denne variabel åbnes i python dag-fil med følgende kode

Udfør ML-kode på Compute Engine

Din ML-kode kopieres og overføres til en instans af pipeline, så den kan bruge denne instans til at fremsætte forudsigelser.

Krav: python3 og pip3 skal installeres på din instans.

Du har derefter brug for to filer i et zippet bibliotek for at gøre din ML-kode eksekverbar:
krav.txt, der installerer eksterne biblioteker efter dette format, og main.py, der implementerer din ML-model og gemmer dens forudsigelser.
Denne ml.zip skal placeres i datamappen i din miljøspand.

Endelig DAG-kode

Konklusion

Her går du, du har en frisk ny arbejdsgang, der vil gøre mange ML-ingeniører glade i din virksomhed eller gøre alt det arbejde for dig!

Kommentarer

Vi er tæt på slutningen af ​​artiklen, så jeg vil kommentere nogle valg af implementering, jeg foretog, mens jeg byggede denne pipeline.

  • Hvorfor eksportere og udføre ML-kode på en Computermotor, hvis du har et python-miljø direkte på Cloud Composer?

Hvis du har brug for en masse CPU eller RAM for at udføre din ML-kode, skal du opgradere til 3 noder mindst på Cloud Composer, og det kan blive dyrt; hvorimod du kun kan vælge at tilføje en node på Compute Engine, som du også kan tilpasse med GPU.

  • Hvorfor ikke gemme output direkte til Redis i slutningen af ​​ML-modellen i stedet for at oprette en ny operatør, der udfører arbejdet bagefter?

Årsagen bag dette valg er, at det giver os mulighed for at gemme Redis-legitimationsoplysninger i Airflow Connexions, så forbindelsen er krypteret, og alle legitimationsoplysninger holdes på samme sted.

  • Hvorfor bruger vi ikke MongoDB-tilslutningskrog på luftstrøm i stedet for pymongo inde i DAG-kode?

Igen, på det tidspunkt, hvor denne artikel blev skrevet, var MongoDB-forbindelsen ikke tilgængelig på den luftstrømversion, der blev brugt af Cloud Composer.

  • Er der nogle særlige forhold, der beskæftiger sig med MongoDB eller Redis?

Ja, der er nogle særheder, som du bliver nødt til at håndtere.
Med MongoDB skal du passe på, at Mongo-aggregering ikke kan returnere resultater, der er større end 16 MB, så måske bliver du nødt til at dele din aggregering vha. $ Skip og $ limit-operatører.
Med Redis kan du ikke angive ordbøger, der er større end 512 MB, så du bliver måske nødt til at opdele din mset i flere dikter.