Spark, czyli opóźnienia pociągów

Czytanie zajmie Ci około 15 minut

Czy polskie pociągi są punktualne? To pytanie, na które poszukamy odpowiedzi, ale jest ono tylko przyczynkiem do dzisiejszego wpisu.

Chciałbym pokazać dzisiaj sposób znalezienia odpowiedzi, nawet nie samą odpowiedź. Skąd wziąć dane? Jak sobie z nimi poradzić? Czym je uzupełnić?

Pozyskanie danych

W pierwszym odruchu pomyślałem o stronach PKP i webscrappingu. Ale na Twitterze mojego bota @rstatspl obserwuje inny bot – @OpPociagow, który prezentuje ciekawe dane. Skoro prezentuje to je ma. Skoro ma, to gdzieś są. Chwila rozmowy z twórcą i uzyskujemy adres InfoPasażer Archiver – archiwum opóźnień pociągów.

Dane o rozkładach i opóźnieniach

Na stronie tej znajdziemy pliki z archiwalnymi danymi. Tego szukamy. Ściągamy więc archiwum z danymi za lata 2017/2018 i patrzymy co jest w środku (po prostu go rozpakowując i oglądając pliki).

Z poziomu systemów linuksowych wystarczy:

Plik jest archiwum 7-zip, potrzebujemy więc go czymś rozpakować. Znowu (na Ubuntu) robimy to z poziomu konsoli:

co spowoduje wypakowanie plików z danymi (są w formacie JSON) do nowego folderu json. Jeśli popatrzymy na te dane to jest ich ogrom. Każdy plik zawiera informacje o poszczególnym pociągu. Przerabianie każdego po kolei będzie trwało wieki. A chcemy mieć dane globalne a nie z każdy z osobna – zresztą mając wszystkie dane możemy wyłuskać dane o pojedynczym pociągu…

Z pomocą może przyjść baza danych (w archiwum jest plik SQL wpisujący dane do bazy – jego wykorzystanie to inny sposób) albo na przykład Apache Spark. Sparka instaluje się łatwo, razem z nim przychodzi Scala. W Scali zaimportujemy pliki JSON do Sparka (do parqueta), a później z poziomu R będziemy sobie na nich operować tak jak na danych w tabeli SQLowej.

Uruchamiamy więc spark-shell i tam w konsoli uruchamiamy skrypt:

W kolejnych krokach skrypt robi:

  • wczytuje do df1 wszystkie pliki json z podanego folderu (to jest power – jedną linią czesamy całą masę podobnych plików)
  • do df2 wybiera z df1 odpowiednie pola JSONa i rozwija (explode) te, które trzeba
  • do df3 z df2 rozwija kolejne zagnieżdżenie
  • do df4 wybiera z df3 odpowiednie kolumny i zmienia ich typ
  • na koniec df4 zapisuje do parquetu trains_parquet – to z niego będziemy korzystać w R

Teraz w R możemy połączyć się z naszym parquetem i zobaczyć co tam mamy w środku. Dzisiaj przydadzą nam się biblioteki:

train_id train_name schedule_id schedule_date arrival_delay arrival_time departure_delay departure_time station_name
604 34804/5 KAROLINKA 53468006 2018-12-07 23:00:00 NA NA 0 2018-12-08 17:04:00 Rzeszów Główny
604 34804/5 KAROLINKA 53468006 2018-12-07 23:00:00 2 2018-12-08 17:11:00 2 2018-12-08 17:11:30 Rudna Wielka
604 34804/5 KAROLINKA 53468006 2018-12-07 23:00:00 3 2018-12-08 17:14:00 3 2018-12-08 17:14:30 Świlcza
604 34804/5 KAROLINKA 53468006 2018-12-07 23:00:00 4 2018-12-08 17:17:30 4 2018-12-08 17:18:00 Trzciana
604 34804/5 KAROLINKA 53468006 2018-12-07 23:00:00 4 2018-12-08 17:21:30 4 2018-12-08 17:22:00 Będziemyśl
604 34804/5 KAROLINKA 53468006 2018-12-07 23:00:00 4 2018-12-08 17:26:00 4 2018-12-08 17:26:30 Sędziszów Małopolski
604 34804/5 KAROLINKA 53468006 2018-12-07 23:00:00 4 2018-12-08 17:29:36 4 2018-12-08 17:30:06 Ropczyce Witkowice
604 34804/5 KAROLINKA 53468006 2018-12-07 23:00:00 4 2018-12-08 17:32:30 4 2018-12-08 17:33:00 Ropczyce
604 34804/5 KAROLINKA 53468006 2018-12-07 23:00:00 4 2018-12-08 17:36:00 4 2018-12-08 17:36:30 Lubzina
604 34804/5 KAROLINKA 53468006 2018-12-07 23:00:00 4 2018-12-08 17:40:30 4 2018-12-08 17:41:00 Dębica Wschodnia

Ciekawe dane, a mamy ich jakieś 16.2 mln wierszy. No to już jest big data! :)

W tym momencie możemy liczyć sobie jakieś rzeczy. Wcześniej jednak warto pooglądać dane (wybierając jeden dzień, jeden pociąg, jedną stację). Ja zrobiłem to za Was, więc wiem, że:

  • opóźnienia są różne, czasem jakieś całkowicie nierealne – na przykład -520 minut (tak, minus – pociąg przyjechał za wcześnie! Kilka godzin!)
  • pociąg na danej stacji może mieć opóźniony przyjazd i opóźniony odjazd, może krócej stać na stacji i niwelować opóźnienie
  • jedno jest pewne – na stacji początkowej nie ma wartości dla arrival_delay, a na stacji końcowej – nie ma departure_delay.

Sprawdźmy czy opóźnienie przyjazdu na stację i odjazdu z niej jest mniej więcej równe (czyli czy rzeczywiście pociągi nadrabiają opóźnienia na stacjach). Przy okazji zwróćcie uwagę, że df jest tabelą w parquet Sparka, a traktujemy ją tak samo jakby była to tabela w pamięci! Niesamowicie to wygodne.

delta n p
FALSE 1573757 9.736859
TRUE 12939333 80.055858
NA 1649791 10.207283

Widzimy, że 80% opóźnień nie jest nadrabianych na stacji. Z tego możemy wysnuć wniosek, że opóźnienie przyjazdu na stację (lub odjazdu z niej) jest poi prostu opóźnieniem pociągu na stacji. To cenna informacja, bo zamiast przerabiać dwie kolumny o opóźnieniach (arrival_delay i departure_delay) możemy skupić się na jednej – ja wybrałem arrival_delay. Dodatkowo dla uproszczenia z dalszych rozważań usuniemy stacje początkowe (czyli tam, gdzie arrival_delay jest puste). Dlaczego początkowe? Bo skupiamy się na arrival_delay.

Jak wygląda rozkład opóźnień?

W dalszej części zastosujemy jeszcze jedno uproszczenie: wybierzemy tylko te dane, gdzie opóźnienie przyjazdu mieści się w \mu \pm 2 \sigma co oznacza wybranie 95.5% wartości leżących blisko średniej:

Ciekawostka: zdefiniowaliśmy df_filltered i normalnie byłaby to już nowa wartość trzymana w pamięci. Jednak nie mamy wyraźnego pobrania (brak collect() na końcu) danych ze Sparka do R więc jest to tylko definicja. Wykona się dopiero wtedy kiedy wyraźnie tego zażądamy.

Opóźnienia

średnie opóźnienie według dnia

Zobaczmy teraz jak globalnie wyglądają opóźnienia pociągów (precyzyjniej: przyjazdów pociągu na stację) według dnia.

Uśredniamy dane w poszczególnych dniach dla wszystkich stacji. Powiemy tylko tyle, że w jakimś dniu roku pociągi opóźniały się bardziej lub mniej niż w innym. To nie powie nam nic o tym które to pociągi, czy zawsze tak jest itd.

Te uśrednione dane będą nam potrzebne jeszcze za chwilę – możemy pokazać je w różny sposób – więc niech Spark wyliczy, a wynik zachowamy sobie w pamięci:

Policzmy teraz średnią (ze średnich godzinowych) dla każdego dnia:

Nie jest źle – pociągi średnio spóźniają się niecałe dwie minuty. Wiosną trochę mniej niż latem i zimą. Czy to jest prawdziwe? Wkraczamy na pole statystyki i wnioski wysnute na wszystkich danych mogą być mylące. Bo w naszych danych mamy jakiś zbiór pociągów – być może wszystkie. Analizując listę stacji widać że są to często pociągi podmiejskie, SKM-ki i podobne. Te może się nie spóźniają bardzo, jeździ ich dużo co robi masę dla średnich.

Dla przykładu weźmy pierwszy z kraju pociąg pospieszny jadący z Przemyśla do Szczecina 38174/5 PRZEMYŚLANIN:

I tutaj sytuacja nie wygląda już tak różowo. Możemy jeszcze rozłożyć te dane na poszczególne stacje:

Od razu rzuca się w oczy, że pociąg jeździ różną trasą w różnych okresach roku (raz przez Wrocław, raz przez Ostrów Wielkopolski), w wakacje ten konkretny nie jeździ (ale są inne “Przemyślaniny” z innymi numerami). Gdyby poukładać stacje zgodnie z trasą być może udałoby się zobaczyć, że na pewnych odcinkach opóźnienie rośnie.

Wróćmy do globalnych rozważań: czy opóźnienia w ciągu tygodnia wyglądają jakoś charakterystycznie?

Te poranne pociągi są raczej punktualne. Popołudniowy szczyt w piątki generuje opóźnienia. Znowu – są one niewielkie, bo patrzymy na wszystkie pociągi. Być może w rozbiciu na poszczególne składy wygląda to inaczej?

średnie opóźnienie według pociągu

Czy są pociągi, które opóźniają się zazwyczaj najbardziej? Poszukajmy 10 takich, dla których średnia opóźnień na kolejnych stacjach jest największa:

train_id train_name ma
2759 50966 28.00000
2332 90807/6 27.07143
2574 77728 27.00000
3837 15205/4 CZERWONY 27.00000
4446 84104/5 NIEKOŃCZĄCA SIĘ OPOW 27.00000
4357 90529 26.88889
2775 31142/3 25.00000
3330 60708 24.89474
4663 55376 24.40000
2765 95903 24.17647

Co to za pociągi? Którędy jeżdżą? Trzeba wyszukać :P

średnie opóźnienie wg stacji

To samo możemy sprawdzić dla poszczególnych stacji – czy są stacje, na które wszystkie pociągi zazwyczaj przyjeżdżają opóźnione?

station_name ma
Stalowa Wola Rozwadów Towarowa 329.00000
Psary 93.00000
Przemyśl Bakończyce 86.18182
Grodno 72.50000
Hrubieszów Miasto 39.43284
Warszawa Grochów 33.28571
Cierpice 30.00000
Warszawa Gdańska 29.94444
Doboszowice 27.22222
Paczków 27.11111
Otmuchów Jezioro 27.11111

Stalowa Wola Rozwadów Towarowa to jakiś pojedynczy przypadek, outliner (występuje chyba tylko raz w bazie).

Lokalizacja stacji

Przejdźmy jednak dalej. Spróbujmy narysować sieć połączeń.

Dane o lokalizacji stacji

Przede wszystkim potrzebujemy lokalizacji poszczególnych stacji. W danych tej informacji nie mamy, trzeba posiłkować się innym źródłem. I znowu z pomocą przychodzi internet, trochę Googla i znajdujemy stronę bazakolejnowa.pl na której mamy bogatą bazę stacji, między innymi z ich położeniem na mapie.

Nie jest jednak tak wesoło, bo położenie jest pokazane na mapie a nie zapisane gdzieś na stronie. Rzut oka w kod strony prezentującej mapę dla losowej stacji i widzimy, że mapa generowana jest jedną funkcją w JavaScript, w której zaszyte są współrzędne geograficzne stacji. Trzeba więc pobrać stronę, wyłuskać z niej kod JS i z tego kodu współrzędne. Pełny skrypt poniżej:

W wyniku (po dobie, może dwóch) dostajemy plik lokalizacje_stacji.rds z informacjami o lokalizacjach stacji. Możemy sobie narysować mapkę stacji – to niezła ciekawostka typu #widaćzabory:

Obserwujący Dane i Analizy na Facebooku już widzieli powyższą mapkę. Dlatego warto zostać fanem :)

Najpopularniejsze połączenia

Problem z narysowaniem mapy jest jeden, ale istotny – nazwy stacji w rozkładzie (w tabeli z opóźnieniami) różnią się czasem od nazw zebranych razem z ich lokalizacjami. Trzeba to jakoś ujednolicić. Można ręcznie (hehehe, oszalał!), a można sposobem.

Spróbujmy poczynić tak:

  • porównajmy listę stacji z rozkładu z listą z lokalizacjami
  • to co się zgadza łatwo dopasujemy (join na tabelach doda nam współrzędne)
  • to co się nie zgadza dopasujemy przez podobieństwo nazw jakimś algorytmem
  • każdą niepasującą nazwę z lokalizacji porównamy ze wszystkimi nazwami z rozkładu
  • za właściwą uznamy tę, która będzie najbardziej podobna i z niej weźmiemy współrzędne

Zatem do dzieła! Poniżej więcej treści w komentarzach w kodzie

Teraz szukamy stacji, których nie udało się połączyć.

Teraz wygenerujemy sobie trasy pociągów. Dany pociąg jedzie przez poszczególne stacje zgodnie z czasem (nie może być na stacji numer 3 wcześniej niż na stacji numer 2).

Relacja pociągu to pierwsza i ostatnia stacja. Na podstawie numerów możemy to oszacować (z jakimś błędem):

Policzmy jak dużo pociągów jeździ na danych relacjach:

Dodajmy do tego współrzędne stacji:

Na mapce pokażemy nazwy najbardziej popularnych początków i końców tras – wybierzmy więc miasta najczęściej będące początkiem/końcem trasy:

I na koniec mamy gotową mapę z siecią połączeń:

Co z tym jeszcze można zrobić? Na pewno jest kilka rzeczy:

  • można poszukać wszystkich odcinków pomiędzy stacjami i policzyć średni czas przejazdu na tych odcinkach
  • dla wszystkich pociągów
  • dla wybranej relacji
  • porównując taki czas z czasem w poszczególnych dniach znajdziemy momenty w roku, kiedy pociągi na danym odcinku mają największe opóźnienia
  • mając sieć połączeń pomiędzy dwoma stacjami jesteśmy w stanie zbudować graf takich połączeń
  • a mając taki graf możemy zrobić wyszukiwarkę najbardziej optymalnych połączeń (ale trzeba pamiętać, że nie każdy pociąg zatrzymuje się na każdej stacji)

Jeśli ktoś interesuje się koleją o wiele bardziej niż ja (czytaj: posiada wiedzę dziedzinową) na pewno wpadnie na kilka innych pomysłów. Będzie też potrafił rozróżnić pociągi po ich numerach (bo coś te numery znaczą – typ pociągu (osobowy, pospieszny, ekspres), być może przewoźnika albo relację), a na tej podstawie przygotować odpowiednie zestawienia.

Celem dzisiejszego wpisu było przede wszystkim pokazanie Wam jak posłużyć się Sparkiem z poziomu R (wcześniej importując do Sparka potężne ilości danych z plików – w tym wypadku – JSON) oraz jak wyciągnąć ze strony kawałek kodu JavaScript i informacje w nim zawarte. Dodatkowo (mam nadzieję) dowiedzieliście się jak połączyć dane z różnych źródeł, nawet jeśli klucze nie są identyczne (mam na myśli szukanie podobieństwa tekstów w nazwach stacji). Sama informacja o tym czy pociągi się spóźniają czy nie była drugorzędna.

2 komentarze do wpisu „Spark, czyli opóźnienia pociągów”

Dodaj komentarz