Beam ❤️ Kotlin = Midgard library

Mazlum Tosun
Google Cloud - Community
Jan 31, 2023

Because Beam ❤️ Kotlin, I created a new open-source library called Midgard to:

  • Have more concise and expressive code
  • Remove Beam boilerplate code
  • Offer a more functional programming style

The library provides extensions on Beam's PCollection, DoFn and IO connectors.

Behind the scenes, it relies on Kotlin extensions. The main advantage of this technique is that it adds behaviours and methods to an existing type without modifying it.
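As a minimal illustration of the mechanism (plain Kotlin, no Beam involved), the hypothetical `words()` function below is added to the standard `String` class from the outside. `String` itself is left untouched, and its native methods can be chained with the extension in the same expression:

```kotlin
// Hypothetical extension function: adds a `words()` method to the existing
// String class without subclassing it or changing its source.
fun String.words(): List<String> = trim().split(Regex("\\s+"))

fun main() {
    // A native String method (lowercase) chained with the extension (words).
    println("  Beam loves Kotlin ".lowercase().words()) // [beam, loves, kotlin]
}
```

Extensions are resolved statically: they cannot override a member with the same signature, so if the class already declares such a member, the member wins.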

Extensions on PCollection

Usual Beam operators: map, flatMap and filter

Test data:

val psgPlayers = listOf(
    Player(firstName = "Kylian", lastName = "Mbappe", 24),
    Player(firstName = "Marco", lastName = "Verratti", 28)
)

val realPlayers = listOf(
    Player(firstName = "Karim", lastName = "Benzema", 35),
    Player(firstName = "Luca", lastName = "Modric", 39)
)

// Given.
val psgTeam = Team(name = "PSG", slogan = "Ici c'est Paris", psgPlayers)
val realTeam = Team(name = "REAL", slogan = "Hala Madrid", realPlayers)
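The `Player` and `Team` classes are not shown in the article. The sketch below is one possible definition, assuming the field names used in the named arguments above plus two hypothetical ones, `age` and `players`, for the positional arguments. Implementing `Serializable` is also an assumption: it gives Beam a coder to fall back on (`SerializableCoder`) when no other coder is registered for these types.

```kotlin
import java.io.Serializable

// Assumed shape of the test classes (hypothetical: the article does not show them).
data class Player(
    val firstName: String,
    val lastName: String,
    val age: Int // hypothetical name for the third, positional argument
) : Serializable

data class Team(
    val name: String,
    val slogan: String,
    val players: List<Player> // hypothetical name for the third, positional argument
) : Serializable
```

Being data classes, both get `copy`, which the pipeline uses to build a new `Team` with an updated slogan instead of mutating the input element.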

Here is a typical Beam pipeline with map, flatMap and filter operations:

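import org.apache.beam.sdk.transforms.Create
import org.apache.beam.sdk.transforms.Filter
import org.apache.beam.sdk.transforms.FlatMapElements
import org.apache.beam.sdk.transforms.MapElements
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.values.PCollection
import org.apache.beam.sdk.values.TypeDescriptor

val resultPlayers: PCollection<Player> = pipeline
    .apply("Create", Create.of(listOf(psgTeam, realTeam)))
    .apply(
        "To Team with Slogan V2",
        MapElements
            .into(TypeDescriptor.of(Team::class.java))
            .via(SerializableFunction { it.copy(slogan = "${it.slogan} VERSION 2") })
    )
    .apply(
        "To Players",
        FlatMapElements
            .into(TypeDescriptor.of(Player::class.java))
            .via(SerializableFunction { it.players })
    )
    .apply("Filter age > 25", Filter.by(SerializableFunction { it.age > 25 }))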

The same pipeline with the Midgard library:

import fr.groupbees.midgard.*
import org.apache.beam.sdk.transforms.Create
import org.apache.beam.sdk.values.PCollection

val resultPlayersMidgard: PCollection<Player> = pipeline
    .apply("Create", Create.of(listOf(psgTeam, realTeam)))
    .map("To Team with Slogan V2") { it.copy(slogan = "${it.slogan} VERSION 2") }
    .flatMap("To Players") { it.players }
    .filter("Filter age > 25") { it.age > 25 }

Each Beam transform has its Midgard equivalent:

  • MapElements -> map
  • FlatMapElements -> flatMap
  • Filter -> filter
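The Midgard operator names are the same as Kotlin's standard collection functions. As a local sanity check (an in-memory `List`, no Beam runner), the same chain can be run on the article's test data. The classes below repeat an assumed `Player`/`Team` shape, and `olderPlayers` is a hypothetical helper name:

```kotlin
// Assumed shapes of the article's test classes (not shown in the original).
data class Player(val firstName: String, val lastName: String, val age: Int)
data class Team(val name: String, val slogan: String, val players: List<Player>)

// Same logic as the pipeline, applied to an in-memory List instead of a PCollection.
fun olderPlayers(teams: List<Team>): List<Player> =
    teams
        .map { it.copy(slogan = "${it.slogan} VERSION 2") } // like MapElements / Midgard map
        .flatMap { it.players }                             // like FlatMapElements / Midgard flatMap
        .filter { it.age > 25 }                             // like Filter / Midgard filter

fun main() {
    val psgTeam = Team("PSG", "Ici c'est Paris", listOf(
        Player("Kylian", "Mbappe", 24), Player("Marco", "Verratti", 28)))
    val realTeam = Team("REAL", "Hala Madrid", listOf(
        Player("Karim", "Benzema", 35), Player("Luca", "Modric", 39)))
    println(olderPlayers(listOf(psgTeam, realTeam)).map { it.lastName }) // [Verratti, Benzema, Modric]
}
```

Unlike a `List`, a `PCollection` is unordered, so the Beam pipeline keeps the same three players but gives no guarantee about their order.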

To use the extensions offered by Midgard, add the following import to your code:

import fr.groupbees.midgard.*

Another big advantage of Kotlin extensions is that native PCollection methods can be mixed with the ones specific to Midgard. The previous example contains:

A native PCollection method:

.apply("Create", Create.of(listOf(psgTeam, realTeam)))

Mixed with the extension methods provided by Midgard:

.apply("Create", Create.of(listOf(psgTeam, realTeam)))
.map("To Team with Slogan V2") { it.copy(slogan = "${it.slogan} VERSION 2") }
.flatMap("To Players") { it.players }
.filter("Filter age > 25") { it.age > 25 }

The map, flatMap and filter operators take as parameters:

  • The name of the pipeline step
  • A lambda expression, or a function implementation, that applies the needed operation

The Beam TypeDescriptor is not passed as a parameter: it is deduced inside the operators.
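One way an operator can deduce the output type itself is a reified type parameter. The sketch below is not Midgard's source: `Descriptor`, `FakePCollection` and `Name` are hypothetical stand-ins so that it runs without Beam. Because `O` is reified, `map` can read `O::class.java` at the call site; in a real Beam extension, the same value could feed `MapElements.into(TypeDescriptor.of(O::class.java))`, as in the verbose pipeline above.

```kotlin
// Hypothetical stand-in for Beam's TypeDescriptor: just wraps a Class.
class Descriptor<T>(val rawType: Class<T>)

// Hypothetical stand-in for a PCollection: elements, their type and step names.
class FakePCollection<T>(
    val elements: List<T>,
    val elementType: Descriptor<T>,
    val steps: List<String> = emptyList()
)

data class Name(val value: String)

// Hypothetical `map` extension: the caller passes only a step name and a lambda;
// the output type descriptor is built from the reified type parameter O.
inline fun <I, reified O : Any> FakePCollection<I>.map(
    stepName: String,
    transform: (I) -> O
): FakePCollection<O> =
    FakePCollection(
        elements = elements.map(transform),
        elementType = Descriptor(O::class.java),
        steps = steps + stepName
    )

fun main() {
    val input = FakePCollection(listOf("kylian", "karim"), Descriptor(String::class.java))
    val names = input.map("To Name") { Name(it.uppercase()) }
    println(names.elementType.rawType.simpleName) // Name
    println(names.elements)                       // [Name(value=KYLIAN), Name(value=KARIM)]
}
```

Reified type parameters are only available on `inline` functions, which is why such an operator has to be declared `inline`. Note that `O::class.java` is the erased class, so for a generic output type such as `List<Player>` it only yields `List`.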

Installation of the project

The library is published to a Maven repository, so it can be installed with any Maven-compatible build tool.

Examples with Maven and Gradle:

Maven:

<dependency>
    <groupId>fr.groupbees</groupId>
    <artifactId>midgard</artifactId>
    <version>0.15.0</version>
</dependency>

Gradle:

implementation group: 'fr.groupbees', name: 'midgard', version: '0.15.0'
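For projects using the Gradle Kotlin DSL (`build.gradle.kts`), the equivalent declaration uses the same coordinates and version as the Groovy line above:

```kotlin
// build.gradle.kts: same group, artifact and version as the Groovy example above.
dependencies {
    implementation("fr.groupbees:midgard:0.15.0")
}
```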

For more details about the library, check the project repository and its README.md file.

If you like my articles and want to see more of my posts, follow me on:

- Medium
- Twitter
- LinkedIn

GDE Cloud | Head of Data & Cloud GroupBees | Data | Serverless | IAC | Devops | FP