Just recently I had a chance to visit the Big Data, Berlin v 10.0 meetup. I really liked the very inspiring talk Stephan Ewen gave about Apache Flink. I'm aware that the technology has been around for some time now and that it's used by some very big players.
More or less, this technology is about streaming, but what I liked is that it doesn't perform that badly in batch operations either. Again, there are better technologies for batch, but since I really liked the talk (and one slide was about Flink not totally sucking at batch), I decided to write a blog post on Apache Flink in a simple case of batch processing. Just as a heads up, this technology is super new to me and I've just started to get my feet wet, but this might be useful to people who want to try it out.
Motivation
For the past 2-3 years I was involved in a very cool IoT project that was mostly about vehicle telemetry data. Since the beginning of the year I have been gathering telemetry data about my own car drives. My approach to the problem was pretty simple and totally uncool: I just wanted to have some telemetry data, so I gathered it with a simple Android app, GPSLogger.
There was no online processing, just CSV files that I sent to my Google Drive after I ended the drives. I didn't know what I was going to do with the data. And then, during the talk mentioned in the introduction, it just hit me that I might make a heat map of all the places where my car was not moving. The idea was only semi cool, but it was worth a shot. Just as a side note, I won't release the data set to the public because it's private data. But I guess data about my car not moving might identify possible bottlenecks in Croatian and/or Zagreb's traffic.
Getting started
To kick start my Hello Apache Flink project I used giter8. You can find out a lot more about it online, and it's pretty much a topic of its own, so I'll just stick with the commands that get you through this blog post.
$ brew install giter8
$ g8 tillrohrmann/flink-project
A Flink Application Project Using sbt

name [Flink Project]: testinglocations
organization [org.example]: com.msvaljek
version [0.1-SNAPSHOT]:
scala_version [2.11.7]:
flink_version [1.1.4]:

Template applied in ./testinglocations
To import the generated project into IntelliJ IDEA:

File -> New -> Project from Existing Sources...
sbt project
...
To run the application from the IDE, go to

Run -> Edit Configurations...

and then choose mainRunner from the Use classpath of module dropdown of your current configuration.
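The mainRunner module exists because the template declares the Flink dependencies as "provided": they're on the classpath when a job is submitted to a cluster, but not when you hit Run in the IDE. I'm not reproducing the generated build.sbt verbatim here, but the idea is roughly this sketch:

// rough sketch of the idea in the template's build.sbt (the generated
// file may differ): the helper module strips the "provided" scope so
// Flink classes are available when running from the IDE
lazy val mainRunner = project.in(file("mainRunner"))
  .dependsOn(RootProject(file(".")))
  .settings(
    libraryDependencies := (libraryDependencies in RootProject(file("."))).value
      .map { module =>
        if (module.configurations == Some("provided")) module.copy(configurations = None)
        else module
      }
  )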
Filtering out locations with Apache Flink
The code that I used is pretty straightforward; just create the FindAndGroupStandingLocations object and run it with the previously mentioned run configuration:
package com.msvaljek

import java.io.File

import org.apache.flink.api.scala._

object FindAndGroupStandingLocations {

  def main(args: Array[String]) {
    val dir = "/examples/scala/flink/GPSLogger/"

    val env = ExecutionEnvironment.getExecutionEnvironment

    val files = getListOfFiles(
      new File(dir), List("csv")).map(_.getAbsolutePath)

    // sorry, line a bit long, but I need it :)
    getAllStandingLocations(env, files)
      .map(loc =>
        s"new ol.Feature({geometry: new ol.geom.Point(ol.proj.fromLonLat([${loc.lon}, ${loc.lat}]))}),")
      .print()

    // you could also map it to some simple json:
    // s"{lat: ${loc.lat}, lon: ${loc.lon} },"
  }

  // a small snippet to help us load all the csv files
  def getListOfFiles(
    dir: File, extensions: List[String]): List[File] = {
    dir.listFiles.filter(_.isFile).toList.filter { file =>
      extensions.exists(file.getName.endsWith(_))
    }
  }

  // union the standing locations from all the files into one DataSet
  def getAllStandingLocations(env: ExecutionEnvironment,
    files: List[String]): DataSet[Location] = {

    def traverseStandingLocations(
      fileList: List[String], locations: DataSet[Location]
    ): DataSet[Location] = fileList match {
      case h :: t =>
        locations.union(
          traverseStandingLocations(
            t, getStandingLocations(env, h)))
      case Nil =>
        locations
    }

    traverseStandingLocations(
      files, env.fromElements[Location]())
  }

  def getStandingLocations(
    env: ExecutionEnvironment, filePath: String
  ): DataSet[Location] = {
    // csv file format:
    // _1=time; _2=lat; _3=lon; _4=elevation; _5=accuracy;
    // _6=bearing; _7=speed; _8=satellites; _9=provider
    env.readCsvFile[Tuple9[String, String, String, String, String, String, String, String, String]](
      filePath = filePath,
      fieldDelimiter = ";",
      ignoreFirstLine = true
    )
      // just the locations where we don't move
      .filter(line => strToDouble(line._7) == 0)
      // just the coordinates
      .map(line =>
        Location(strToDouble(line._2),
          strToDouble(line._3)))
  }

  // I had files in the European csv format,
  // meaning decimals are written with ',',
  // so the out of the box parsing didn't work
  def strToDouble(a: String): Double =
    a.replace(',', '.').toDouble

  // nothing too fancy, just interested in the coordinates
  case class Location(lat: Double, lon: Double)
}
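As a side note, the recursive traverseStandingLocations could also be written as a fold over the file list. This is just an alternative sketch that should behave the same way, not what I actually ran:

  // alternative to traverseStandingLocations: union the per-file
  // DataSets with a foldLeft instead of explicit recursion
  def getAllStandingLocationsFolded(env: ExecutionEnvironment,
    files: List[String]): DataSet[Location] =
    files.foldLeft(env.fromElements[Location]()) { (acc, file) =>
      acc.union(getStandingLocations(env, file))
    }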
I'll just provide a small snippet of what a CSV file looks like:
time;lat;lon;elevation;accuracy;bearing;speed;satellites;provider
2016-04-07T04:09:43Z;46,156319;15,87539;253;10;61,900002;0,374433;13;gps
2016-04-07T04:10:50Z;46,156273;15,875374;223;10;132,800003;2,146719;17;gps
2016-04-07T04:10:57Z;46,156045;15,875469;226;5;120,099998;3,563846;9;gps
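Since the comma decimals are what break Flink's out of the box parsing, here's a tiny standalone check of the strToDouble helper against values copied from the rows above (plain Scala, no Flink needed):

// quick sanity check of the comma-to-dot conversion, using values
// from the sample csv rows above
object StrToDoubleCheck extends App {
  def strToDouble(a: String): Double = a.replace(',', '.').toDouble

  assert(strToDouble("46,156319") == 46.156319) // lat
  assert(strToDouble("0,374433") == 0.374433)   // speed of a moving row
  assert(strToDouble("0") == 0.0)               // zero speed still parses (the filter keeps these rows)
  println("European decimals parse fine")
}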
The Result
Here is the result, standing points shown on a map:
Observations
In the beginning I tried to follow the hello world Apache Flink examples, but they kind of didn't work. I guess that's due to the very fast pace of the project's development. Basically, some of the stuff mentioned in the official docs is not working at all:
The template I used to kick start the project contains references in the generated code to pages like http://flink.apache.org/docs/latest/examples.html, but the pages they point to are not there (as you can see if you click the previous link).
I had trouble parsing the European CSV format, so I had to read everything from the files as strings and write my own parsing method for floating point numbers.
I couldn't persuade Flink to output the results to a file, so I just used print and copy-pasted everything from the console output into an OpenLayers map; see the sketch below.
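For reference, the standard DataSet file sink would look something like this sketch; note that unlike print(), which triggers execution eagerly, writeAsText only registers a sink, so the job needs an explicit env.execute() call (quite possibly the part I was missing):

import org.apache.flink.core.fs.FileSystem.WriteMode

// the output path here is just an example
getAllStandingLocations(env, files)
  .map(loc => s"{lat: ${loc.lat}, lon: ${loc.lon} },")
  .writeAsText("/tmp/standing_locations.txt", WriteMode.OVERWRITE)

// file sinks are lazy, so kick off the job explicitly
env.execute("write standing locations")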
I guess I could make pull requests to resolve some of this stuff, but the path of least resistance is just mentioning it here. So I'll take it :)