Dzisiaj zajmiemy się wykorzystaniem Sparka i Hadoopa do przetwarzania większej ilości danych. Oraz do budowania prostego modelu (regresji liniowej). Może jeszcze nie jest to big data, ale mechanizmy są identyczne jak w przypadku większej liczby danych. Wystarczy tylko tych danych więcej zgromadzić, zbudować większe środowisko (dużo serwerów) i… też będzie działało.
A w dodatku poznamy pakiet faker który pozwoli nam na wygenerowanie sztucznych danych.
Spark i Hadoop
Apache Spark to rozwijana na zasadach open source platforma klastrowego przetwarzania danych, która ma interfejsy API dla takich języków programowania jak Scala, Python, Java i R.
Apache Hadoop to z kolei skalowalna platforma do przechowywania i zarządzania dużymi zbiorami danych. Ogólna koncepcja polega na podziale dużych zbiorów na mniejsze fragmenty, które mogą być równolegle przetwarzane w wielu węzłach wykorzystujących standardowe serwery. Oprócz tego Hadoop obsługuje dane niestrukturalne i jest niezależny od systemu operacyjnego. My wykorzystamy przede wszystkim HDFS (Hadoop Distributed File System) czyli system plików zapewniający wysoką wydajność dostępu do danych.
To taki tekst dla inżynierów big dejta. Dla analityków i programistów: Sparkiem będziemy sięgać do danych leżących w wielu plikach na HDFS i traktować je jako jedną tabelkę. To jest ta cała magia która jest najfajniejsza. Dodatkowo – wielkość tych danych w plikach (na Hadoopie/HDFSie) nie ma za bardzo znaczenia – pamięci nie zabraknie (bo jakoś Spark sobie z tym poradzi). No chyba, że dane ze Sparka zechcemy do tej pamięci pobrać (np. do data frame w pandas).
Nie będę opisywał jak zainstalować Sparka i Hadoopa. Jest do tego świetny tutorial, który przeszedłem krok po kroku i u mnie działa. Co więcej – nawet instalowałem wersje nowsze niż w tym tutorialu (odpowiednio trzeba pozmieniać linki do pobieranych instalek i później same nazwy plików czy folderów. Ale to proste jest. No i oczywiście tutorial jest przeznaczony na Linuxa, bo takie rzeczy to nie na Windows (chociaż pewnie się da).
Tutorial tutaj: Installing and Running Hadoop and Spark on Ubuntu 18
Podstawy Sparka
Na początek kilka podstaw – jak utworzyć tabelkę (data frame) w Sparku? Jak wczytać plik? Jak wybrać kolumnę? Jak odfiltrować dane? Jak je posortować i inne takie. Zobaczmy na kilka prostych przykładów:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
from pyspark.sql import SparkSession # budujemy aplikację sparkową spark = SparkSession.builder.appName("test_app").getOrCreate() # dane: df_list = [('Polska', 'Warszawa', 1), ('Polska', 'Kraków', 2), ('Niemcy', 'Berlin', 1), ('Niemcy', 'Hamburg', 3), ('Czechy', 'Praga', 1), ('Rosja', 'Moskwa', 1), ('Francja', 'Lyon', 4)] # budujemy data frame w Sparku df = spark.createDataFrame(df_list, # nazwy kolumn: ['kraj', 'miasto', 'liczba']) # co nam wyszło? df.show() |
kraj | miasto | liczba |
---|---|---|
Polska | Warszawa | 1 |
Polska | Kraków | 2 |
Niemcy | Berlin | 1 |
Niemcy | Hamburg | 3 |
Czechy | Praga | 1 |
Rosja | Moskwa | 1 |
Francja | Lyon | 4 |
Prawda, że jak na razie proste? To tylko króciutkie wprowadzenie, a nawet zarys takiego wprowadzenia, więc o szczegółach musicie doczytać. Jedźmy dalej:
1 2 |
# wybór kolumn: df.select(['kraj', 'miasto']).show() |
kraj | miasto |
---|---|
Polska | Warszawa |
Polska | Kraków |
Niemcy | Berlin |
Niemcy | Hamburg |
Czechy | Praga |
Rosja | Moskwa |
Francja | Lyon |
1 2 |
# filtrowanie: df.filter(df['liczba'] > 1).show() |
kraj | miasto | liczba |
---|---|---|
Polska | Kraków | 2 |
Niemcy | Hamburg | 3 |
Francja | Lyon | 4 |
1 2 |
# filtr można zapisać też w stringu oraz łączyć warunki df.filter('(liczba > 1) AND (kraj != "Polska")').show() |
kraj | miasto | liczba |
---|---|---|
Niemcy | Hamburg | 3 |
Francja | Lyon | 4 |
1 2 |
# sortowanie df.sort('kraj').show() |
kraj | miasto | liczba |
---|---|---|
Czechy | Praga | 1 |
Francja | Lyon | 4 |
Niemcy | Berlin | 1 |
Niemcy | Hamburg | 3 |
Polska | Kraków | 2 |
Polska | Warszawa | 1 |
Rosja | Moskwa | 1 |
1 2 3 4 |
from pyspark.sql.functions import asc, desc # sortowanie po dwóch kolumnach: kraje rosnąco, liczby w ramach kraju majlejąco df.sort(asc('kraj'), desc('liczba')).show() |
kraj | miasto | liczba |
---|---|---|
Czechy | Praga | 1 |
Francja | Lyon | 4 |
Niemcy | Hamburg | 3 |
Niemcy | Berlin | 1 |
Polska | Kraków | 2 |
Polska | Warszawa | 1 |
Rosja | Moskwa | 1 |
1 2 |
# agregaty - np. średnia dla kraju df.groupby('kraj').agg({"liczba": 'mean'}).show() |
kraj | avg(liczba) |
---|---|
Rosja | 1.0 |
Francja | 4.0 |
Czechy | 1.0 |
Niemcy | 2.0 |
Polska | 1.5 |
A co jeśli chcemy kilka agregatów na raz, na tej samej kolumnie i od razu zmieniając nazwę kolumny wynikowej?
1 2 3 4 5 6 7 8 9 10 11 |
from pyspark.sql import functions as F df \ .groupby('kraj') \ .agg( \ F.min(df.liczba).alias("minimum"), \ F.max(df.liczba).alias("maximum"), \ F.avg(df.liczba).alias("srednia"), \ F.sum(df.liczba).alias("suma") \ ) \ .show() |
kraj | minimum | maximum | srednia | suma |
---|---|---|---|---|
Rosja | 1 | 1 | 1.0 | 1 |
Francja | 4 | 4 | 4.0 | 4 |
Czechy | 1 | 1 | 1.0 | 1 |
Niemcy | 1 | 3 | 2.0 | 4 |
Polska | 1 | 2 | 1.5 | 3 |
Znak \
pozwala na złamanie linii co ułatwia czytanie i pisanie kodu. Nie jest to niestety R-owy pipe %>%
, ale zawsze coś.
1 2 |
# Nowa kolumna jako kopia df.withColumn('nowe_miasto', df['miasto']).show() |
kraj | miasto | liczba | nowe_miasto |
---|---|---|---|
Polska | Warszawa | 1 | Warszawa |
Polska | Kraków | 2 | Kraków |
Niemcy | Berlin | 1 | Berlin |
Niemcy | Hamburg | 3 | Hamburg |
Czechy | Praga | 1 | Praga |
Rosja | Moskwa | 1 | Moskwa |
Francja | Lyon | 4 | Lyon |
1 2 |
# nowa kolumna jako wynik działania funkcji df.withColumn('liczba_x1000', df['liczba']*1000).show() |
kraj | miasto | liczba | liczba_x1000 |
---|---|---|---|
Polska | Warszawa | 1 | 1000 |
Polska | Kraków | 2 | 2000 |
Niemcy | Berlin | 1 | 1000 |
Niemcy | Hamburg | 3 | 3000 |
Czechy | Praga | 1 | 1000 |
Rosja | Moskwa | 1 | 1000 |
Francja | Lyon | 4 | 4000 |
Niektórzy wolą zapytania SQL-owe i dla takich ludzi Spark to umożliwia. Ot, po prostu, robimy widok SQL-owy na tabelę w Sparku, a później operujemy na danych używając zapytań SQL:
1 2 3 4 5 6 7 |
# Rejestracja DataFrame jako tymczasowy widok SQL df_sql = df.createOrReplaceTempView("dane") # teraz możemy na tych danych działać z poziomu SQL sql_results = spark.sql("SELECT * FROM dane") sql_results |
1 |
DataFrame[kraj: string, miasto: string, liczba: bigint] |
1 |
sql_results.show() |
kraj | miasto | liczba |
---|---|---|
Polska | Warszawa | 1 |
Polska | Kraków | 2 |
Niemcy | Berlin | 1 |
Niemcy | Hamburg | 3 |
Czechy | Praga | 1 |
Rosja | Moskwa | 1 |
Francja | Lyon | 4 |
1 |
spark.sql("SELECT kraj, COUNT(*) AS liczba_miast FROM dane GROUP BY kraj").show() |
kraj | liczba_miast |
---|---|
Rosja | 1 |
Francja | 1 |
Czechy | 1 |
Niemcy | 2 |
Polska | 2 |
SQLa warto się nauczyć chociaż na podstawowym poziomie. Na pewno będzie sprawdzany na przeróżnych rekrutacjach, ale też w pracy z danymi z przeróżnych baz danych się przydaje.
Po wykonaniu wszelakich agregacji, wyliczeń i innych manipulacji danymi możemy je pobrać prosto ze Sparka do Padnas i cokolwiek z tym dalej potrzeba robić. Wykresy rysować czy coś.
1 |
df.toPandas() |
kraj | miasto | liczba | |
---|---|---|---|
0 | Polska | Warszawa | 1 |
1 | Polska | Kraków | 2 |
2 | Niemcy | Berlin | 1 |
3 | Niemcy | Hamburg | 3 |
4 | Czechy | Praga | 1 |
5 | Rosja | Moskwa | 1 |
6 | Francja | Lyon | 4 |
Dane fejkowe
Zanim zaczniemy sensowniej wykorzystywać Sparka przygotujemy sobie trochę fejkowych danych. Wykorzystamy do tego pakiet Faker. Niech będą to osoby, które będą opisane przez imię, nazwisko, płeć i wiek, a także adres domowy i adres pracy. Dodatkowo dla pracy dodamy stanowisko i wynagrodzenie. Struktura dla każdej z osób będzie następująca:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
{ imie, nazwisko, wiek, plec, dom: { ulica, numer_domu, kod_pocztowy, miasto, wojewodztwo, telefon }, praca: { firma, stanowisko, ulica, numer_domu, kod_pocztowy, miasto, telefon, wynagrodzenie } } |
Jak widzicie wygląda to jak struktura pliku JSON. I bardzo dobrze, bo będziemy generować takie właśnie pliczki. Tutaj jeszcze umyślnie jest tak, że plik ma zagnieżdżoną strukturę co w przypadku na przykład wczytania do Pandas może powodować problemy (szczególnie jak czasem dany element jest czasem go nie ma).
Wszystkich tych danych nie wykorzystamy w naszym przykładzie, ale generator umożliwi Wam przygotowanie różnych zestawień w różnych przekrojach – ile średnio zarabia kobieta na określonym stanowisku? Możecie sobie poćwiczyć sparkowe zapytania, ale uprzedzam – bez sensownych danych trudno zweryfikować czy wychodzi sensownie. Dodatkowo Faker całkiem nieźle losuje dane, więc może być jeszcze trudniej (dane są dobrze losowo rozłożone).
Cały skrypt poniżej, mam nadzieję że komentarze wyjaśniają co i jak:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
import json import faker import random import pandas as pd # ile osób generujemy? N_PROBS = 2000 def generate_person(f): # losujemy płeć plec = f.random_element(elements=("K", "M")) # w zależności od płci pprzypisujemy żeńskie lub męskie imię i nazwisko if plec == 'M': imie = f.first_name_male() nazwisko = f.last_name_male() else: imie = f.first_name_female() nazwisko = f.last_name_female() # losujemy wiek wiek = random.randint(25, 65) # budujemy "osobę" d = { 'imie': imie, 'nazwisko': nazwisko, 'wiek': wiek, 'plec': plec, # miejsce zamieszkania: 'dom': { 'ulica': f.street_name(), 'numer_domu': f.building_number(), 'kod_pocztowy': f.postcode(), 'miasto': f.city(), 'wojewodztwo': f.region(), 'telefon': f.phone_number() }, # miejsce pracy: 'praca': { 'firma': f.company(), 'stanowisko': f.job(), 'ulica': f.street_name(), 'numer_domu': f.building_number(), 'kod_pocztowy': f.postcode(), 'miasto': f.city(), 'telefon': f.phone_number(), # wynagrodzenie zależy od wieku - rozmyślnie, # bo będziemy ćwiczyć regresję liniową 'wynagrodzenie': 200*wiek + 100*random.randint(-5, 5) } } return d # generator zmyślonych danych fake = faker.Faker(['pl_PL']) # generujemy dużo osób for i in range(N_PROBS): osoba = generate_person(fake) # każdą z nich zapisujemy do dodzielnego pliku JSON with open("fake_data/%04d.json" % (i+1), "w") as file: json.dump(osoba, file) file.close() |
Powyższy kod wygeneruje nam 2000 plików JSON z losowymi danymi wymyślonych osób.
Zwróćcie uwagę, że wynagrodzenie zależy liniowo od wieku osoby (jest około 200 razy większe, plus minus kilka setek). To będziemy estymować w naszym modelu.
Mając tak przygotowane pliki możemy spróbować je wczytać do data frame w Sparku. Najpierw jeden plik:
1 2 3 |
file_df = spark.read.json("fake_data/0000.json") file_df.show() |
dom | imie | nazwisko | plec | praca | wiek |
---|---|---|---|---|---|
[01-516, Przemyśl… | Agnieszka | Rejniak | K | [FPUH Fitrzyk, 96… | 35 |
Zwróćcie uwagę, że kolumny dom i praca to listy. A gdy sprawdzimy schemat jaki Spark sam rozpoznał?
1 |
file_df.printSchema() |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
root |-- dom: struct (nullable = true) | |-- kod_pocztowy: string (nullable = true) | |-- miasto: string (nullable = true) | |-- numer_domu: string (nullable = true) | |-- telefon: string (nullable = true) | |-- ulica: string (nullable = true) | |-- wojewodztwo: string (nullable = true) |-- imie: string (nullable = true) |-- nazwisko: string (nullable = true) |-- plec: string (nullable = true) |-- praca: struct (nullable = true) | |-- firma: string (nullable = true) | |-- kod_pocztowy: string (nullable = true) | |-- miasto: string (nullable = true) | |-- numer_domu: string (nullable = true) | |-- stanowisko: string (nullable = true) | |-- telefon: string (nullable = true) | |-- ulica: string (nullable = true) | |-- wynagrodzenie: long (nullable = true) |-- wiek: long (nullable = true) |
Ładnie widać wszystkie zagnieżdżenia, a nawet typy danych które zawsze są numeryczne (wiek i wynagrodzenie).
Sprawdźmy czy możemy wczytać wszystkie 2000 JSONów na raz?
1 2 3 |
files_df = spark.read.json("fake_data") files_df.show() |
dom | imie | nazwisko | plec | praca | wiek |
---|---|---|---|---|---|
[80-540, Łuków, 8… | Urszula | Skała | K | [Spółdzielnia Dyb… | 53 |
[86-924, Tomaszów… | Maurycy | Pszczółka | M | [Gabinety Forysia… | 49 |
[59-091, Mińsk Ma… | Tadeusz | Mikus | M | [Fundacja Choroś-… | 35 |
[31-706, Wągrowie… | Klara | Bartłomiejczyk | K | [Spółdzielnia Pos… | 65 |
[58-611, Świnoujś… | Julianna | Kamyszek | K | [FPUH Rząca-Jama … | 46 |
[34-534, Bytom, 9… | Sylwia | Opas | K | [Stowarzyszenie L… | 62 |
[97-513, Łuków, 3… | Nataniel | Słyk | M | [Stowarzyszenie H… | 49 |
[67-145, Piekary … | Olgierd | Korkosz | M | [Spółdzielnia Rom… | 33 |
[97-413, Bielsk P… | Melania | Krekora | K | [Spółdzielnia Chw… | 57 |
[51-127, Siemiano… | Dawid | Nasiadka | M | [Stowarzyszenie P… | 61 |
[73-637, Żywiec, … | Adrianna | Nowotnik | K | [Fundacja Froń-Fa… | 30 |
[13-696, Wyszków,… | Malwina | Leżoń | K | [Stowarzyszenie P… | 57 |
[69-089, Głogów, … | Anastazja | Potoczek | K | [Gabinety Jaje-Ge… | 54 |
[98-298, Bolesław… | Rozalia | Kapustka | K | [Stowarzyszenie W… | 43 |
[54-050, Marki, 7… | Przemysław | Gorzelańczyk | M | [Stowarzyszenie F… | 53 |
[68-995, Sandomie… | Tadeusz | Oleksik | M | [Watras-Bogaczyk … | 41 |
[41-103, Otwock, … | Krystian | Jażdżyk | M | [Gabinety Becker-… | 29 |
[04-831, Jarosław… | Tomasz | Smektała | M | [Fundacja Morgała… | 37 |
[31-336, Gdańsk, … | Olaf | Czuryło | M | [Spółdzielnia Czu… | 58 |
[76-335, Wągrowie… | Dominik | Drwięga | M | [Spółdzielnia Mie… | 37 |
1 |
only showing top 20 rows |
To tylko początek tabeli, wszystkich wierszy mamy:
1 |
files_df.count() |
1 |
2000 |
No dobrze Panie Bobrze, ale jak wyciągnąć te wartości w zagnieżdżonych kolumnach? Dom i praca mają swój kod pocztowy – wyjmijmy te wartości:
1 2 3 4 5 6 |
files_df \ .select( \ F.col('dom.kod_pocztowy').alias('kod_dom'), \ F.col('praca.kod_pocztowy').alias('kod_praca') \ ) \ .show(5) # 5 oznacza, że pokażemy tylko 5 elementów |
kod_dom | kod_praca |
---|---|
80-540 | 02-055 |
86-924 | 51-564 |
59-091 | 83-771 |
31-706 | 83-631 |
58-611 | 50-091 |
1 |
only showing top 5 rows |
Zapisanie na HDFS (skopiowanie plików z konsoli)
Te wygenerowane pliki dla każdej z osób chcemy mieć na Hadoopie (na HDFS). Tak, żeby sięgnąć do nich Sparkiem. Oczywiście Sparkiem można sięgnąć i do plików z dowolnego katalogu (jak w przypadku tworzenia files_df
), ale my chcemy z HDFS (do celów edukacyjnych).
Proste to zadanie, z konsoli wykonujemy po prostu:
1 |
hdfs dfs -put fake_data/*.json /fake_data/ |
Jak już wrzucimy pliki na HDFS to jak je wczytać w Sparku? A to za chwilę, w ramach kolejnych kroków.
Regresja liniowa
W Sparku cechy (czyli wartości X) muszą być jedną kolumną zawierającą wektor. Odpowiedź (wartość Y) może być już jedną kolumną, nie trzeba jej specjalnie przekształcać.
Takie przekształcenie zapewni nam VectorAssembler
. Model będzie liniową regresją, więc od razu zaimportujmy stosowne funkcje z pySparka:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
from pyspark.ml.feature import VectorAssembler from pyspark.ml.regression import LinearRegression from pyspark.ml.evaluation import RegressionEvaluator # wczytujemy dane z HDFS, wszystko co jest w /fake_data df = spark.read.json("hdfs://localhost:9000/fake_data/") # tabela z kolumnami potrzebnymi do modelu reg_df = df.select(['wiek', 'praca.wynagrodzenie']) # które kolumny skłasamy w wektor cech? va = VectorAssembler(inputCols = ['wiek'], outputCol = 'features') # dokonujemy przekształcenia reg_df = va.transform(reg_df) # zostawiamy sobie cechy i odpowiedź reg_df = reg_df.select(['features', 'wynagrodzenie']) # co nam zostało? reg_df.show(5) |
features | wynagrodzenie |
---|---|
[53.0] | 10500 |
[49.0] | 9800 |
[65.0] | 13300 |
[35.0] | 6700 |
[46.0] | 9000 |
1 |
only showing top 5 rows |
Nasz X czyli features
jest wektorem (jednym na każdy wiersz, jest to przy okazji wektor jednoelementowy), a Y – po prostu są to wartości. Zdefiniujmy teraz model i wytrenujmy go:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# podział na zbóri uczący i testowy: splits = reg_df.randomSplit([0.7, 0.3]) train_df = splits[0] test_df = splits[1] # definicja modelu: lr = LinearRegression(featuresCol='features', # w której kolumnie są cechy? labelCol='wynagrodzenie', # w której jest odpowiedź? maxIter=10, regParam=0.3, elasticNetParam=0.8) # uczenie modelu: lr_model = lr.fit(train_df) |
Jakie mamy współczynniki naszej prostej wychodzącej z modelu?
1 2 3 4 5 6 |
print("Coefficients: " + str(lr_model.coefficients)) print("Intercept: " + str(lr_model.intercept)) Coefficients: [200.28072550903843] Intercept: -5.3844061706032935 |
Nachylenie prostej to okolice 200. Pamiętacie jak wyglądało generowanie danych? Wynagrodzenie to wiek razy 200 plus minus jakieś wartości. Z tego plus minus zrobił się ułamek przy 200.
Sprawdźmy jeszcze współczynniki mówiące o ocenie modelu – w tym przypadku błąd średnio kwadratowy (RMSE) i R2 na danych treningowych.
1 2 3 |
trainingSummary = lr_model.summary print("RMSE: %f" % trainingSummary.rootMeanSquaredError) print("R2: %f" % trainingSummary.r2) |
1 2 |
RMSE: 318.175435 R2: 0.982466 |
Oczywiście możemy zrobić też predykcję – po co model jak nie można nic przewidzieć?
1 2 3 |
lr_predictions = lr_model.transform(test_df) lr_predictions.select("prediction","wynagrodzenie","features").show(10) |
prediction | wynagrodzenie | features |
---|---|---|
5802.756633591512 | 5600 | [29.0] |
7405.0024376638185 | 7400 | [37.0] |
9207.528967245164 | 9000 | [46.0] |
9808.37114377228 | 9800 | [49.0] |
10008.651869281317 | 10200 | [50.0] |
13012.862751916895 | 12500 | [65.0] |
5402.195182573435 | 5200 | [27.0] |
6203.318084609588 | 6600 | [31.0] |
7605.283163172858 | 7100 | [38.0] |
8406.40606520901 | 8200 | [42.0] |
1 |
only showing top 10 rows |
I sprawdzamy R2 oraz RMSE na danych testowych:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# R2: lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="wynagrodzenie", metricName="r2") print("R2 on test data = %g" % lr_evaluator.evaluate(lr_predictions)) # RMSE: lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="wynagrodzenie", metricName='rmse') print("RMSE on test data = %g" % lr_evaluator.evaluate(lr_predictions)) |
1 2 |
R2 on test data = 0.983078 RMSE on test data = 316.172 |
Wyniki są zbliżone do danych treningowych (nie dziwne to w tym przypadku), myślę że dość zadowalające (chociaż mieć prawie 315 złotych a nie mieć 315 złotych to jednak 630 zł różnicy…).
Model w scikit-learn
A jak podejść standardowo do tych samych danych i przygotować dla porównania model z użyciem np. scikit-learn? Trzeba dane wyjąć ze Sparka do Pandas, a dalej już standardowo. Zatem dla przypomnienia:
1 2 |
pdf = df.select(F.col('praca.wynagrodzenie').alias('wynagrodzenie'), F.col('wiek')).toPandas() pdf.head(5) |
wynagrodzenie | wiek |
---|---|
10500 | 53 |
9800 | 49 |
13300 | 65 |
6700 | 35 |
9000 | 46 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
from sklearn.linear_model import LinearRegression as sk_linreg # alias, żeby nie pogryzło się z tym z pySparka from sklearn.metrics import mean_squared_error, r2_score from sklearn.model_selection import train_test_split import math # dzielimy dane na treningowe i testowe X_train, X_test, y_train, y_test = train_test_split(pdf['wiek'].values.reshape(-1,1), pdf['wynagrodzenie'].values.reshape(-1,1), test_size = 0.2) # budujemy model sklearn_reg_model = sk_linreg() # uczymy model na danych treningowych sklearn_reg_model.fit(X_train, y_train) # współczynniki prostej jaka wychodzi w modelu print("Coefficients: " + str(sklearn_reg_model.coef_)) print("Intercept: " + str(sklearn_reg_model.intercept_)) # robimy predykcje na danych testowych y_pred = sklearn_reg_model.predict(X_test) # dokładność modelu (RMSE i R2) print('RMSE %.2f' % math.sqrt(mean_squared_error(y_test, y_pred))) print('R2: %.2f' % r2_score(y_test, y_pred)) |
1 2 3 4 |
Coefficients: [[200.85387003]] Intercept: [-42.05531234] RMSE 321.86 R2: 0.98 |
Widzimy, że współczynnik nachylenia prostej też jest w okolicach 200, R2 podobnie jak poprzednio bardzo blisko jedynki, błąd średnio kwadratowy w okolicach 315 – czyli też jak poprzednio. Wyniki mamy więc dość podobne.
Jaka jest różnica? I po co to wszystko? Ano po to, że Spark poradzi sobie z bardzo dużymi danymi, Pandas i SciKit-Learn może braknąć pamięci w którymś momencie. Dodatkowo Spark potrafi rozproszyć obliczenia na wszystkie maszyny w klastrze, a scikit policzy lokalnie, na jednej maszynie. Oczywiście mając tylko jedną maszynę to wielkiego przyspieszenia nie zauważymy (a nawet obliczenia będą sumarycznie dłuższe – Spark musi wystartować co swoje też zajmuje). Są jednak przykłady zadań, gdzie dane obrabiane Sparkiem (wtedy bodaj bez HDFS) umożliwiły wykonanie zadania. Wówczas próbowałem w R, bo Sparka w R też można użyć.
Świetny artykuł. Dobrze się czyta, z niecierpliwością czekam na następne części. Może przetwarzanie strumieniowe on-line Spark / Flink?
Kolega z Wiadrodanych.pl więcej o tym pisze, polecam. Chyba nie ma sensu się powtarzać… ale zawsze jakiś pomysł się może trafić. Tutaj chciałem wprowadzić temat i pokazać z czym to się je.