Digital mobility comes with GPS navigation and tracking. In this post, we'll use Apache Spark to process and analyse GPX files generated by a running watch.
The daily mood
Besides riding, I also like running in my free time. Given my current lifestyle and health, it is a category of time investment that sits somewhere between leisure and duty.
As part of my running routine, I recently started interval training on a 400m tartan track: 8x 1000m fast + 600m slow, for a workout of about 1 hour plus 15 min warm-up and 15 min cool-down. This is a rather traditional training method compared to modern high-intensity interval training (HIIT), but I needed a convenient plan for improving my racing pace without worrying too much about pain and injury.
I am equipped with a second-hand Garmin Forerunner watch, probably one of the most popular devices for capturing passable GPS/HR measurements at an affordable price of $100 to $200 (depending on the model and version). The watch supports two lap modes: auto lap and manual lap. Auto lap lets you set a fixed lap distance of 250m or more (default: 1000m); manual lap lets you press a button at the end of each lap. Lap records are then aggregated into the corresponding statistics. Using manual lap, here is a nice visualization of my last workout on Strava.
New idea
I decided to re-collect and re-aggregate my raw data to build my own custom training stats. A classical transformation.
I also want to use a technology that is able to scale and, down the road, to automatically recognize my training pattern using some Machine Learning.
Source data
Once paired via Bluetooth, the data is transferred via the ANT+ protocol to a smart device and stored in the binary G.FIT format (both Garmin proprietary technologies). Activities can be uploaded to and visualized in Garmin Connect (a free-of-charge SaaS). From there you can integrate with other platforms (e.g. Google Earth, Strava) or export the data into one of the following file formats:
- Lap split table to Comma-Separated-Values (CSV)
- Lap tracking points to Training Center XML (TCX)
- Tracking points to GPS Exchange Format (GPX)
- Original format (G.FIT)
The CSV and TCX exports already hold precious aggregate values, but they are based on the auto-lap feature (1000m by default), so they are good for the trash only. I am not sure whether an official SDK is available besides this free Perl library for parsing G.FIT; since the original format is binary and multi-file, I assume some overhead and almost no reusability. So I'll work with the GPX data instead. Here is the simplified structure of this file format:
<gpx>
  <!-- global header -->
  <metadata>
    <time>2020-06-22T13:54:45.000Z</time>
  </metadata>
  <trk>
    <!-- track header -->
    <name>Fantastic run</name>
    <type>running</type>
    <trkseg>
      <!-- tracking point location, measured every 1 sec -->
      <trkpt lat="25.120" lon="-71.354">
        <!-- elevation -->
        <ele>0</ele>
        <!-- time -->
        <time>2020-06-22T13:54:47.000Z</time>
        <!-- custom data -->
        <extensions>
          <ns3:TrackPointExtension>
            <!-- heart rate -->
            <ns3:hr>78</ns3:hr>
            <!-- cadence -->
            <ns3:cad>62</ns3:cad>
          </ns3:TrackPointExtension>
        </extensions>
      </trkpt>
      ...
    </trkseg>
  </trk>
</gpx>
Processing technology
I chose to write a small program with Apache Spark, a distributed in-memory computing engine that is especially popular in the world of Big Data. Spark is written in Scala, a functional language that runs on the Java Virtual Machine (JVM) and natively supports concurrent execution.
Main features of Spark:
- Extract Transform Load (ETL) including input "datasources" (e.g. files, databases), "transformations" (e.g. filter, map) and "actions" (e.g. count, reduce)
- Distributed SQL engine (based on Apache Hive) for structured data processing
- Machine Learning Library (ML Lib)
- Stream processing
- Graph processing
Spark comes with the following Software Development Kits (SDKs):
- Scala: best consistency, performance and troubleshooting
- Java: best interoperability
- Python: best user experience
System and application monitoring can be done in different ways:
- Web-UI
- REST API
- Listener Class
Spark 3 was released earlier this year. This version brings better support for Python 3 as well as some performance enhancements.
Setup
We need to install all three components: Java, Scala, and Spark.
# Java
$ sudo apt install openjdk-11-jdk
$ sudo echo "JAVA_HOME=/usr/lib/jvm/default-java" >> /etc/environment
$ source /etc/environment
$ java -version
openjdk version "11.0.7" 2020-04-14
OpenJDK Runtime Environment (build 11.0.7+10-post-Ubuntu-2ubuntu218.04)
OpenJDK 64-Bit Server VM (build 11.0.7+10-post-Ubuntu-2ubuntu218.04, mixed mode)

# Scala
$ sudo apt install scala
$ scala -version
Scala code runner version 2.11.12 -- Copyright 2002-2017, LAMP/EPFL

# Spark
$ wget https://downloads.apache.org/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
$ tar -xf spark-*.tgz && rm spark-*.tgz
$ sudo mv spark-* /usr/local/
$ sudo ln -s /usr/local/spark-* /usr/local/spark
$ cat <<EOF >> ~/.profile
# spark
export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin
EOF
$ source ~/.profile
Getting started
We'll use the interactive spark-shell to keep things simple while experimenting, although it is more common practice to instantiate pyspark from a notebook or to spark-submit an application package.
$ spark-shell
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = local[*], app id = local-1593421474421).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0
      /_/

Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 11.0.7)
Type in expressions to have them evaluated.
Type :help for more information.

The main program entry point is the Spark session, and the connection to the cluster is the Spark context (sc), held by the "driver". Spark's atomic entity, i.e. its smallest unit of computation, is the Resilient Distributed Dataset (RDD), a distributed collection of immutable data. A DataFrame is a Dataset organized into named columns. A cluster is a collection of machines that run fault-tolerant operations. Now let us write our first program.
scala> val inputRdd = sc.parallelize("Hello World")
inputRdd: org.apache.spark.rdd.RDD[Char] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> println("Char count: " + inputRdd.count())
Char count: 11

scala> :quit

Spark also offers a number of additional modules on top of the RDD API, with corresponding features such as the SQL functions, which we are going to use intensively.
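For illustration, here is a minimal, hypothetical DataFrame snippet using a couple of those SQL functions (the data is made up; in spark-shell the implicits needed for toDF are already available, in a compiled application you would add import spark.implicits._):

import org.apache.spark.sql.functions._

// Tiny made-up dataset, just to show filter/select with SQL functions
val df = Seq((1, "warm-up"), (2, "interval"), (3, "cool-down")).toDF("id", "phase")

df.filter(col("id") > 1).
  select(col("id"), upper(col("phase")).as("phase")).
  show()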
Parsing XML
Using Garmin Connect, I exported some activities in GPX format to a data/1_transient folder. I'll restart spark-shell and pass the spark-xml connector library in order to extract that data.
$ spark-shell --packages com.databricks:spark-xml_2.12:0.9.0
scala> val dfTrackingpoints = spark.
  read.
  format("com.databricks.spark.xml").
  option("rootTag", "gpx.trk.trkseg").
  option("rowTag", "trkpt").
  load("data/1_transient/activity_5098098091.gpx").
  withColumn("_ref", lit("5098098091")).
  orderBy("time").
  withColumn("tot_s", row_number().over(
    org.apache.spark.sql.expressions.Window.
      partitionBy(col("_ref")).
      orderBy(monotonically_increasing_id())
  ) - 1)
dfTrackingpoints: org.apache.spark.sql.DataFrame = [
_lat: double, _lon: double ... 4 more fields
]
Note that we are using only one partition per dataset. The reason is the small volume of data (a few thousand tracking points per hour of activity).

scala> dfTrackingpoints.show()
+-------+-------+----+-----------+--------------------+----------+-----+
|  _lat |  _lon | ele| extensions|        time        |   _ref   |tot_s|
+-------+-------+----+-----------+--------------------+----------+-----+
|25.1204|-71.354|23.0|[[90, 110]]|2020-06-16T11:48:...|5098098091|    1|
|    ...|    ...| ...|        ...|                 ...|       ...|  ...|
+-------+-------+----+-----------+--------------------+----------+-----+
only showing top 20 rows
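If you want to verify the partitioning, a quick check in the shell could look like this sketch (repartitioning would only make sense for much larger inputs):

// Number of partitions backing the DataFrame (expected: 1 for such a small file)
println("Partitions: " + dfTrackingpoints.rdd.getNumPartitions)

// For bigger volumes we could spread the data over more partitions, e.g.:
// val dfRepartitioned = dfTrackingpoints.repartition(4)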
We may easily explore the structure of the dataset...
scala> dfTrackingpoints.printSchema()
root
 |-- _lat: double (nullable = true)
 |-- _lon: double (nullable = true)
 |-- ele: double (nullable = true)
 |-- extensions: struct (nullable = true)
 |    |-- ns3:TrackPointExtension: struct (nullable = true)
 |    |    |-- ns3:cad: long (nullable = true)
 |    |    |-- ns3:hr: long (nullable = true)
 |-- time: string (nullable = true)
 |-- tot_s: integer (nullable = true)

...store it to disk...
scala> dfCumulwin.write.parquet("data/2_raw/activity_5098098091.parquet")

...or run further aggregations:
scala> println("Count: " + dfTrackingpoints.count()) Count: 3652Based on geographical location (latitude, longitude), we could take 2 different approaches:
- Since I am running on a round course, it is possible to leverage the 1000m start and finish location, then detect proximity and compute laps
- We'll choose the more generic way, which consists of calculating distances between tracking points, then pace and training zone.
scala> :load notebook.scala ...
We won't copy-paste the whole code here (see the link to the source at the end of this post), but we'll go through the program output and use it to discuss the processing steps.
Loading notebook.scala...
import org.apache.spark.sql.DataFrame
import java.io.File
fcReadGpx: (file: java.io.File) org.apache.spark.sql.DataFrame
fcIngestAll: (files: List[java.io.File], seq: Seq[org.apache.spark.sql.DataFrame]) Seq[org.apache.spark.sql.DataFrame]
dfTrackingpoints: org.apache.spark.sql.DataFrame = [
_lat: double, _lon: double ... 5 more fields
]
+--------------------+-----+
| _ref |count|
+--------------------+-----+
|activity_51281788...| 3680|
|activity_51668094...| 3743|
|activity_50980980...| 3652|
+--------------------+-----+
The program automatically collects and reads all GPX files downloaded under the data directory (roughly 10 thousand records in this example). Since a tracking point is measured and stored when the start button is pressed, and then every second until the stop button, the number of tracking points equals the activity time in seconds plus one.

fcCalcDist: (lat1: Double, lon1: Double, lat2: Double, lon2: Double) Double
udfCalcDist: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3950/0x0000000101713840@170ba653,DoubleType,List(Some(class[value[0]: double]), Some(class[value[0]: double]), Some(class[value[0]: double]), Some(class[value[0]: double])),None,false,true)
dfDistances: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_ref: string, tot_s: int ... 1 more field]

Following our XML extraction, we calculate geographical distances for all segments (n tracking points - 1) built from pairs of consecutive dataframe rows. We use the Haversine formula for that purpose, a method which assumes that the earth is a perfect sphere and that geographical points are located at sea level. The algorithm essentially multiplies an angular distance coefficient by the mean radius of the earth (about 6371 km), which works especially well for locations that are far from the poles and far from each other (e.g. two big cities). Since I actually cover a very small distance each second, another formula might have been more appropriate, but I couldn't find one easily.
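The notebook code itself is linked at the end of the post; just to illustrate the idea, here is a sketch of what a Haversine-based fcCalcDist and its UDF registration could look like (the radius constant and formatting are assumptions, not the notebook's literal code):

import org.apache.spark.sql.functions.udf

// Haversine distance in meters between two WGS84 coordinates.
// The mean earth radius is an assumption here; as described below, it ends up being tuned
// against the total distance reported by Garmin.
val earthRadiusM = 6371.0 * 1000

def fcCalcDist(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
  val dLat = math.toRadians(lat2 - lat1)
  val dLon = math.toRadians(lon2 - lon1)
  val a = math.pow(math.sin(dLat / 2), 2) +
    math.cos(math.toRadians(lat1)) * math.cos(math.toRadians(lat2)) *
    math.pow(math.sin(dLon / 2), 2)
  val c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
  earthRadiusM * c
}

// Register as a UDF so it can be applied to pairs of consecutive tracking points
val udfCalcDist = udf(fcCalcDist _)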
dfCumultot: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_ref: string, tot_s: int ... 2 more fields]
+--------------------+----------+------------------+
| _ref|max(tot_s)| max(tot_m)|
+--------------------+----------+------------------+
|activity_51281788...| 3679|13139.852899999994|
|activity_51668094...| 3742|13034.647000000004|
|activity_50980980...| 3651|12754.718799999999|
+--------------------+----------+------------------+
Based on the mean radius of the earth, I found that the sum of all distances max(tot_m) was significantly different from the total distance reported by Garmin, so I adjusted the radius until the result matched.

dfCumultot: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_ref: string, tot_s: int ... 3 more fields]

We took the opportunity to calculate an overall mean pace value (in m/s), which we will need later for classification.
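The pre-aggregation itself is not reproduced here, but a minimal sketch of how such a per-activity mean speed (and the overall mean used later as classification threshold) could be computed is shown below. The column aliases are my own; the actual notebook keeps Spark's default max(...) column names and a dummy column for the cross-activity join.

import org.apache.spark.sql.functions._

// Per-activity totals and mean speed in m/s (sketch; aliases are assumptions)
val dfPreAgg = dfCumultot.
  groupBy("_ref").
  agg(max("tot_s").as("max_tot_s"), max("tot_m").as("max_tot_m")).
  withColumn("mean_m_s", col("max_tot_m") / col("max_tot_s"))

// Overall mean speed across all activities, later used to split fast from slow windows
val mean_m_s = dfPreAgg.agg(avg("mean_m_s")).first().getDouble(0)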
dfPreAgg: org.apache.spark.sql.DataFrame = [_ref: string, max(tot_s): int ... 3 more fields]
+--------------------+----------+------------------+------------------+-----+
| _ref|max(tot_s)| max(tot_m)| mean_m_s|dummy|
+--------------------+----------+------------------+------------------+-----+
|activity_51281788...| 3679|13139.852899999994|3.5715827398749647| 0|
|activity_51668094...| 3742|13034.647000000004|3.4833369855692156| 0|
|activity_50980980...| 3651|12754.718799999999| 3.493486387291153| 0|
+--------------------+----------+------------------+------------------+-----+
mean_m_s: Double = 3.6661353709117774
pace_agg_window: Int = 3
dfCumulwin: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_ref: string, tot_s: int ... 2 more fields]
Next, there are further accuracy issues due to GPS transmission reliability, and potentially even GPS error magnitude. So we need to apply a sort of "error correction". Instead of filtering noise, I chose to aggregate the small tracking segments into bigger ones, a time-series practice commonly known as resampling, which I call the "pace aggregation window" here. Three seconds appeared to me as a good compromise between ironing out errors and missing lap boundaries.

fast_split_m: Double = 11.148406112735334
dfClassification: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_ref: string, tot_s: int ... 3 more fields]
+--------------------+-----+------------------+------------------+---+
|                _ref|tot_s|             tot_m|             win_m|cat|
+--------------------+-----+------------------+------------------+---+
|activity_50980980...|    3|5.9510000000000005|5.9510000000000005|  0|
|activity_50980980...|    6|           25.8598|           19.9088|  1|
|activity_50980980...|    9|           54.4401|           28.5803|  1|
|activity_50980980...|   12| 82.32480000000001| 27.88470000000001|  1|
|activity_50980980...|   15|          102.8829|20.558099999999996|  1|
|activity_50980980...|   18|114.01210000000002|11.129200000000012|  1|
|activity_50980980...|   21|116.67070000000001|2.6585999999999927|  0|
|activity_50980980...|   24|          125.2339| 8.563199999999995|  0|
|activity_50980980...|   27|           140.892| 15.65809999999999|  1|
|activity_50980980...|   30|          154.5716|13.679599999999994|  1|
|activity_50980980...|   33|168.71489999999997|14.143299999999982|  1|
|activity_50980980...|   36|185.43959999999998|16.724700000000013|  1|
|activity_50980980...|   39|193.61649999999997|  8.17689999999999|  0|
|activity_50980980...|   42|206.13669999999996|12.520199999999988|  1|
|activity_50980980...|   45|222.52129999999997|16.384600000000006|  1|
|activity_50980980...|   48|244.14579999999998|21.624500000000012|  1|
|activity_50980980...|   51|          263.7145|19.568700000000007|  1|
|activity_50980980...|   54|284.70529999999997| 20.99079999999998|  1|
|activity_50980980...|   57|          297.9521|13.246800000000007|  1|
|activity_50980980...|   60|300.00789999999995|2.0557999999999765|  0|
+--------------------+-----+------------------+------------------+---+
only showing top 20 rows

Remember that even though I'm not running at a constant pace, my running intervals should last about 4 minutes and be either fast or slow, not "half". We just added a column "cat" for the pace category, based on the mean pace calculated in the pre-aggregation phase. As we can see, this classification approach doesn't get rid of noise: a few negative spikes from the fast split land in the slow category, while a few positive spikes from the slow split land in the fast category. And since the error spikes show quite a large amplitude, implementing an activation function with a hysteresis shape wouldn't help much.
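To make the resampling and the first classification pass concrete, here is a sketch of how the 3-second window and the cat column could be derived (the notebook computes the fast_split_m threshold slightly differently; this only illustrates the idea):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val paceAggWindow = 3  // seconds per resampled segment

// Keep one row every 3 seconds and compute the distance covered within each window
val dfCumulwin = dfCumultot.
  filter((col("tot_s") % paceAggWindow === 0) && (col("tot_s") > 0)).
  withColumn("win_m", col("tot_m") - lag("tot_m", 1, 0.0).over(
    Window.partitionBy("_ref").orderBy("tot_s"))).
  select("_ref", "tot_s", "tot_m", "win_m")

// A window counts as fast (cat = 1) when it covers more meters than the mean pace would
val fast_split_m = mean_m_s * paceAggWindow
val dfClassification = dfCumulwin.
  withColumn("cat", when(col("win_m") > fast_split_m, 1).otherwise(0))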
fcSetCat2: (weight: Int)Int

In order to get more consistency, we run a second classification pass based on the sum of the category values from the last n rows, and switch on and off accordingly. As we can see below, the cat2 column may show a small activation delay but then stays consistent. A similar approach exists in a popular ML classification algorithm called k-nearest neighbors: when you have no idea about the class of a specific record, look around it. This dataset is also written to disk (data/3_trusted) in parquet format. A minimal sketch of this second pass follows the output below.
udfSetCat2: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$4499/0x00000001018dc840@644c0e,IntegerType,List(Some(class[value[0]: int])),None,false,true)
dfClassification2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_ref: string, tot_s: int ... 5 more fields]
+--------------------+-----+------------------+------------------+---+------+----+
|                _ref|tot_s|             tot_m|             win_m|cat|weight|cat2|
+--------------------+-----+------------------+------------------+---+------+----+
|activity_50980980...|    3|5.9510000000000005|5.9510000000000005|  0|   0.0|   0|
|activity_50980980...|    6|           25.8598|           19.9088|  1|   1.0|   0|
|activity_50980980...|    9|           54.4401|           28.5803|  1|   2.0|   0|
|activity_50980980...|   12| 82.32480000000001| 27.88470000000001|  1|   3.0|   0|
|activity_50980980...|   15|          102.8829|20.558099999999996|  1|   4.0|   1|
|activity_50980980...|   18|114.01210000000002|11.129200000000012|  1|   5.0|   1|
|activity_50980980...|   21|116.67070000000001|2.6585999999999927|  0|   5.0|   1|
|activity_50980980...|   24|          125.2339| 8.563199999999995|  0|   5.0|   1|
|activity_50980980...|   27|           140.892| 15.65809999999999|  1|   5.0|   1|
|activity_50980980...|   30|          154.5716|13.679599999999994|  1|   5.0|   1|
|activity_50980980...|   33|168.71489999999997|14.143299999999982|  1|   5.0|   1|
|activity_50980980...|   36|185.43959999999998|16.724700000000013|  1|   5.0|   1|
|activity_50980980...|   39|193.61649999999997|  8.17689999999999|  0|   4.0|   1|
|activity_50980980...|   42|206.13669999999996|12.520199999999988|  1|   5.0|   1|
|activity_50980980...|   45|222.52129999999997|16.384600000000006|  1|   6.0|   1|
|activity_50980980...|   48|244.14579999999998|21.624500000000012|  1|   6.0|   1|
|activity_50980980...|   51|          263.7145|19.568700000000007|  1|   6.0|   1|
|activity_50980980...|   54|284.70529999999997| 20.99079999999998|  1|   6.0|   1|
|activity_50980980...|   57|          297.9521|13.246800000000007|  1|   6.0|   1|
|activity_50980980...|   60|300.00789999999995|2.0557999999999765|  0|   6.0|   1|
+--------------------+-----+------------------+------------------+---+------+----+
only showing top 20 rows
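Again only as a sketch (the window length and threshold are assumptions reverse-engineered from the output above), the second pass could look like this:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Rolling sum of the cat values over the current and the previous 5 windows
val weightWindow = Window.partitionBy("_ref").orderBy("tot_s").rowsBetween(-5, 0)

// Switch cat2 on once enough fast windows have accumulated, off otherwise
val fcSetCat2 = (weight: Int) => if (weight >= 4) 1 else 0
val udfSetCat2 = udf(fcSetCat2)

val dfClassification2 = dfClassification.
  withColumn("weight", sum("cat").over(weightWindow)).
  withColumn("cat2", udfSetCat2(col("weight").cast("int")))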
fcCalcPace: (dMeter: Double, nSec: Int)String
udfCalcPace: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$4723/0x0000000101a24040@10f36db1,StringType,List(Some(class[value[0]: double]), Some(class[value[0]: int])),None,true,true)
dfResult: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_ref: string, cat2: int ... 3 more fields]
+--------------------+----+-------+------+-----------+
|                _ref|cat2| dist_m|time_s|pace_min_km|
+--------------------+----+-------+------+-----------+
|activity_50980980...|   0|6094.66|  2010|       5:29|
|activity_50980980...|   1|6660.05|  1641|       4:06|
|activity_51281788...|   0|5038.82|  1662|       5:29|
|activity_51281788...|   1| 8098.7|  2016|       4:08|
|activity_51668094...|   0|5007.04|  1803|       6:00|
|activity_51668094...|   1|8025.25|  1938|       4:01|
+--------------------+----+-------+------+-----------+
Finally, our result provides a running distance, time and pace by category. The distribution between fast and slow varies depending on my pace aggregation window and mean pace value assumptions. This dataset is written to disk (data/4_refined) in CSV format this time, since the volume is tiny and not all downstream applications support parquet.
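For reference, a sketch of what the pace helper could look like, consistent with the fcCalcPace signature and the result table above (the exact truncation or rounding in the notebook may differ):

// Convert a distance (m) and a duration (s) into a "min:sec per km" pace string
def fcCalcPace(dMeter: Double, nSec: Int): String = {
  val secPerKm = (nSec / (dMeter / 1000.0)).toInt  // seconds needed for one kilometre
  f"${secPerKm / 60}%d:${secPerKm % 60}%02d"
}

fcCalcPace(6660.05, 1641)  // "4:06", the fast split of the first activity above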
In terms of workout analysis, it looks like I was faster at first try than at second try, but I missed the target of running fast over a total distance of 8000m. Third time I made both distance and pace, so I definitely improved. As we know, performance does not behave linearly. It goes up and down depending on many parameters; it shows you in best case a logarithmic trend (cf. performance curve).
Submit an application
Here is an example of how to submit a compiled Scala archive (built with sbt) or a Python script to a Spark cluster.
$ sbt new scala/hello-world.g8
$ cat hello-world-template/src/main/scala/Main.scala
object Main extends App {
  println("Hello, World!")
}
$ cd hello-world-template/
$ sbt package
$ spark-submit --class "Main" target/scala-2.13/hello-world_2.13-1.0.jar
Hello, World!

For instantiating the Spark session and context, we'll need the following base:
$ mkdir -p my-spark-app/src/main/scala
$ cd my-spark-app
$ cat <<EOF >> build.sbt
scalaVersion := "2.12.10"
name := "my-spark-app"
organization := "ch.epfl.scala"
version := "1.0"
libraryDependencies += "org.typelevel" %% "cats-core" % "2.0.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0" EOF $ cat <EOF >> my-spark-app/src/main/scala/MySparkApp.scala import org.apache.spark.SparkContext import org.apache.spark.SparkConf object MySparkApp { def main(args: Array[String]) { val conf = new SparkConf().setAppName("my-spark-app") val sc = new SparkContext(conf) println("Hello Spark!") } } EOF $ sbt package $ spark-submit \ --class MySparkApp \ target/scala-2.12/my-spark-app_2.12-1.0.jar 20/07/02 21:45:21 INFO SparkContext: Running Spark version 3.0.0 20/07/02 21:45:21 INFO SparkContext: Submitted application: my-spark-app 20/07/02 21:45:21 INFO Utils: Successfully started service 'sparkDriver' on port 40593. 20/07/02 21:45:21 INFO Utils: Successfully started service 'SparkUI' on port 4040. 20/07/02 21:45:22 INFO Executor: Starting executor ID driver on host Hello Spark! 20/07/02 21:45:22 INFO SparkContext: Invoking stop() from shutdown hook
Run Spark inside Kubernetes
Now we want to submit our application to a Spark cluster instead of running it standalone. Starting with v2.3, Spark ships with Dockerfiles (standard, Python and R bindings) and a build tool for building an image that is used as the base for spinning up the Spark driver and executor containers.
$ $SPARK_HOME/bin/docker-image-tool.sh \
  -r localhost:32000 \
  -t spark3-std \
  -p $SPARK_HOME/kubernetes/dockerfiles/spark/Dockerfile \
  build
$ docker save localhost:32000/spark:spark3-std > spark.tar
$ microk8s ctr image import spark.tar
$ rm spark.tar

The application will be submitted to the Kubernetes API server using a service account with the appropriate RBAC privileges.
$ kubectl create namespace spark-test
namespace/spark-test created
$ kubectl -n spark-test create serviceaccount spark
serviceaccount/spark created
$ kubectl create clusterrolebinding spark-role \
  --clusterrole=edit \
  --serviceaccount=default:spark \
  --namespace=spark-test
clusterrolebinding.rbac.authorization.k8s.io/spark-role created

We also need to trust the certificate of the API server, which is SSL enabled:
$ export K8_CACERT=/var/snap/microk8s/current/certs/ca.crt
$ $JAVA_HOME/bin/keytool -import -noprompt -trustcacerts \
  -alias microk8s \
  -file $K8_CACERT \
  -keystore $JAVA_HOME/lib/security/cacerts \
  -storepass changeit
Warning: use -cacerts option to access cacerts keystore
Certificate was added to keystore
$ export K8S_TOKEN=`kubectl -n kube-system describe secret $(kubectl -n kube-system get secrets | grep default-token | cut -d' ' -f1) | grep token: | sed 's/token:\s*//'`

Finally we may submit our application to deploy in cluster mode, and even orchestrate it natively via the Kubernetes scheduler.
$ spark-submit \
--master k8s://https://127.0.0.1:16443 \
--deploy-mode cluster \
--name MySparkApp \
--class MySparkApp \
--conf spark.kubernetes.namespace=spark-test \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image=localhost:32000/spark:spark3-std \
--conf spark.kubernetes.authenticate.caCertFile=$K8_CACERT \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.authenticate.submission.oauthToken=$K8S_TOKEN \
--conf spark.kubernetes.file.upload.path=/tmp/spark-upload \
target/scala-2.12/my-spark-app_2.12-1.0.jar
Take away
We have built a scalable application using Apache Spark for analysing GPX records from multiple running workouts with an interval component. We intentionally left ML techniques aside for now. Because of the poor quality of the data, we struggled a bit with ironing it out using an aggregate pace window and classifying it using an aggregate split window. In a later post, we will see how ML can help automate classification at scale, provided that a labelled dataset like ours has previously been created to train the model. We will also integrate cadence and heart rate as further ML features.
Source code