Category Archives: Development

Lazy Migrations in MongoDB with Scala Salat

In this post I’ll outline how we perform lazy migration with MongoDB using Salat, a case class serialization library for Scala. Salat in conjunction with a REST-like storage layer makes for a very good approach to handling lazy migration of your database. Though there are still some open questions, specifically how to handle more sophisticated schema designs, this approach has proven very successful over the last months, in which we migrated our doctivity schema numerous times.

MongoDB Storage Layer

Let’s consider a simple user model:

{
  "_id": 1,
  "email": "user@example.com",
  "name": "Test User"
}

Now, as the application evolves, we might have to add information to this model, e.g., the last login timestamp:

{
  "_id": 1,
  "email": "user@example.com",
  "name": "Test User",
  "lastLogin": 1234
}

Having both types of documents in MongoDB is not an issue. But at some point you have to use those documents, and at that point you have to know what information is available. Writing code that knows about all the different versions of the user document at every possible place is cumbersome. Therefore, one needs a strategy to migrate documents in a controlled manner and to provide a single, reliable API to the rest of the application.

Using Salat

The first user model could be represented as the following case class:

case class User(@Key("_id") id: Long, email: String, name: String)

We are using Salat to serialize this case class into a BSON document. The case class member names are mapped 1:1 to fields in the document. Using the @Key annotation, we can control the mapping, i.e., the id member becomes the _id field.

Given an object of that case class, let’s call it user, we can transform it into a MongoDBObject and back as follows:

val dbo = grater[User].asDBObject(user)
val userAgain = grater[User].asObject(dbo)

The dbo object is a normal Casbah MongoDBObject which you can store into MongoDB normally. Consequently, you can also retrieve a document using Casbah, put it through grater, and get a Scala object of the given case class.
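
A minimal, self-contained sketch of the full round trip could look like this (the database and collection names are made up for illustration; any Casbah MongoCollection will do):

import com.mongodb.casbah.Imports._
import com.novus.salat._
import com.novus.salat.annotations._
import com.novus.salat.global._

case class User(@Key("_id") id: Long, email: String, name: String)

object RoundTripExample extends App {
  val col = MongoClient()("myapp")("users")  // hypothetical database and collection

  val user = User(1, "user@example.com", "Test User")
  val dbo = grater[User].asDBObject(user)    // case class -> MongoDBObject
  col.save(dbo)                              // stored like any other document

  // retrieve the document with Casbah and deserialize it again
  val userAgain = col.findOne(MongoDBObject("_id" -> user.id)) map (grater[User].asObject(_))
  println(userAgain)                         // Some(User(1,user@example.com,Test User))
}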

The storage layer consists of a set of stores that are able to store and retrieve different model objects. For the user we would have a UserStore. The API is basically RESTful: you put representations into the store, either new ones or updates to existing ones, and you can query for them or delete them.

trait UserStore {
  def save(user: User)
  def findById(id: Long): Option[User]
  // ...
}

Obviously, with more demanding requirements it might become necessary to extend this simple API, e.g., to allow pushing to arrays directly instead of deserializing complete representations and storing them back, but we’ll stick with this simple base architecture for now.
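
As a purely hypothetical illustration (the pushLogin method and the loginHistory field are invented for this sketch and are not part of the model above), such an extension could issue a targeted $push update instead of loading, deserializing, and re-saving the whole user:

import com.mongodb.casbah.Imports._

class UserStoreArrayOps(col: MongoCollection) {
  // appends a timestamp to a hypothetical "loginHistory" array field
  // without touching the rest of the document
  def pushLogin(id: Long, timestamp: Long): Unit =
    col.update(
      MongoDBObject("_id" -> id),
      MongoDBObject("$push" -> MongoDBObject("loginHistory" -> timestamp))
    )
}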

Versioning Documents

Now, if we need to add the timestamp of the last login to this model, we have to consider that all users stored so far don’t have this timestamp yet. If we just extend the case class, deserialization of old users will fail, since the case class constructor requires a value for this field. We could make this field an Option, but that would effectively make the existence of old documents visible outside the storage layer. Obviously there are cases where information is optional, but in this specific case we assume the lastLogin timestamp is a mandatory field in the application. The updated case class therefore becomes:

case class User(@Key("_id") id: Long, email: String, name: String, lastLogin: DateTime)

But how do we distinguish between old and new documents? One approach would be to check whether the lastLogin property is available. In this case that would even work, but in other cases it leads to problems, e.g., when the format of an inline document changes between versions; deducing the version from the content quickly becomes cumbersome.

Another approach is to store the version of the document explicitly. We prefer this approach, as it is easy to implement and makes the versioning explicit and easier to understand. Using Salat, versioning is as simple as adding a version field:

case class User(@Key("_id") id: Long, email: String, name: String) {
  @Persist val _version = 1
}

We do not add the version as a constructor parameter, but as a “constant” that is fixed for the case class. The @Persist annotation tells Salat to serialize this member to the document. A resulting document would then look like:

{
  "_id": 1,
  "_version": 1,
  "email": "user@example.com",
  "name": "Test User"
}

Now we have the versioning information in the database and can act upon it accordingly.

Lazy Migration

Lazy migration means you migrate a document when you encounter it. That is to say, every time we get a document from the database and want to transform it into a case class, we check the version and update the document if needed.

To the outside of the storage layer, only the newest version is known. We do not expose old versions to the outside world, as that would mean loss of control. We would spread knowledge of old versions and migration paths across several parts of the application, something we should clearly avoid.

Internally, we use a function that wraps the migrations:

class UserStoreImpl extends UserStore {
  // col is the Casbah MongoCollection holding the user documents
  def findById(id: Long): Option[User] = {
    col.findOne(MongoDBObject("_id" -> id)) map (buildObject(_))
  }

  private def buildObject(dbo: MongoDBObject): User = {
    grater[User].asObject(dbo)
  }
}

And obviously, this is the place to handle old versions. Typically, we check for the version field in dbo, and handle each version in a match block:

private def buildObject(dbo: MongoDBObject): User = {
  dbo.get("_version") match {
    case Some(2) => grater[User].asObject(dbo)
    case Some(1) => buildObject_v1(dbo)
    case _ => throw new IllegalStateException("illegal version")
  }
}

The most recent version can be gratered directly, while old versions are dispatched to a method that knows how to make a recent version out of them. If an unknown version is encountered, we throw an exception, since this is something that shouldn’t happen. You might also decide to handle this case differently, e.g., by returning an Option from the build method, logging an error, and handling the situation more gracefully. How to handle this depends on the application, your personal preferences, and your style.
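
For illustration, such a more lenient variant of buildObject could look roughly like this (a sketch only; the log object is assumed to be available in the store):

private def buildObject(dbo: MongoDBObject): Option[User] = {
  dbo.get("_version") match {
    case Some(2) => Some(grater[User].asObject(dbo))
    case Some(1) => Some(buildObject_v1(dbo))
    case other =>
      log.error("illegal user document version: " + other)  // hypothetical logger
      None
  }
}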

One question remains: how do we handle the old document? We cannot grater it into the most recent case class, as the lastLogin field is missing. We could parse the document and then create an object manually, but that would mean extra work and would not take advantage of Salat.

In a case such as our example, I would keep the old case class under a new name

case class User_v1(@Key("_id") id: Long, email: String, name: String) {
  @Persist val _version = 1
}

and then implement buildObject_v1 as follows:

def buildObject_v1(dbo: MongoDBObject): User = {
  val old = grater[User_v1].asObject(dbo)
  val updated = User(old.id, old.email, old.name, DateTime.now)
  save(updated)
  updated
}

This creates an instance of the old case class from the document, uses the available information as input for the case class constructor, and a more or less reasonable default value for the last login timestamp. We then save this migrated user object, which replaces the old document. Finally, we return the object.

Open Issues

This approach assumes a REST-like storage layer. So far we always store and retrieve complete representations of a domain object. However, MongoDB often requires schemas that contain inline documents to allow for more performant access to information.

An example could be user notifications. For instance, a user might need to be notified of certain activities within the system. The activities are stored in a separate collection, indexed on the userId of the user they notify. When we retrieve a user with its notifications, we basically make two queries, one against the user collection and one against the activities collection. However, if users are requested very frequently, we have two queries for each request. An alternative approach could be to store the notifications inline:

{
  "_id": 1,
  "_version": 3,
  "email": "user@example.com",
  "name": "Test User",
  "lastLogin": 123,
  "notifications": [
    {
      "_id": 1,
      "_type": "message",
      "_version": 1,
      "message": "How are you?"
    }
  ]
}

But if the activity model now changes, we have to handle the migration of activities in two places: in the activity collection and in the notifications array of each user. If we just update the case class, the migration has to be implemented twice. This works, but results in redundant code.

We are currently experimenting with an approach that wraps the grater part (which basically corresponds to the buildObject(dbo) method from our example) in an API with the following interface, exposed to the rest of the storage layer:

def buildUser(dbo: MongoDBObject): Either[User, User]

The idea is to return a Right if the document was up to date, and a Left if the document was migrated. We then know in the user store that we have to update the activity in the notifications array as well.
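
To make this a bit more concrete, a hedged sketch of how a store could react to that signal might look as follows (col and save as in the earlier examples):

def findById(id: Long): Option[User] =
  col.findOne(MongoDBObject("_id" -> id)) map { dbo =>
    buildUser(dbo) match {
      case Right(user)    => user   // document was already up to date
      case Left(migrated) =>        // document was migrated on the fly, so we
        save(migrated)              // write it back, including the updated
        migrated                    // activities in the notifications array
    }
  }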

Conclusion

We have been using the approach presented here for a couple of months now, and migrating the database on the fly is a no-brainer in many cases. A crucial point is to write tests before deploying a migration, to make sure that the migration paths are triggered as expected and produce the expected results. We have migrated the database numerous times now and did not have a single problem. Using Salat, versioning documents, handling old versions, and persisting the updated documents is extremely simple. We can actually do most of the work in Scala and don’t have to cope with BSON documents directly, even in the case of a migration.

Things only get more complicated when you need to store inline documents redundantly. We are experimenting with some ideas, the most recent being the one outlined above. I would be happy to hear how other people handle lazy migration in their applications.

Scala Dynamics at Work: A KISSmetrics Library

With Scala 2.10, a pretty cool feature is available by default (it was available as an experimental feature before), namely Scala Dynamics. With Scala Dynamics, you can intercept calls to non-existent methods on an object. Consider a class

class A {
    def method(i: Int): String = i.toString
}

and an object of that class:

val x = new A

Now we could call x.method(5) and we would get back the String representation of 5. Now, what would happen if we called x.someOtherMethod(5)? Right, we would get a compile time error, since this method does not exist.

Say Hello to the Dynamic Trait

Scala Dynamics provides you the means to intercept calls to non-existent methods on an object. The call to x.someOtherMethod(5) would therefore not lead to a compile time error, but be dispatched to a generic method, together with information about which method was called and what the arguments were. At that point, you are free to do whatever you think is appropriate.

This feature needs to be enabled for a class quite explicitly, by

  • mixing in the Dynamic trait
  • importing the dynamic feature

So, in order to enable this feature for our class from above:

import scala.language.dynamics

class A extends Dynamic {
    def method(i: Int): String = i.toString

    // fill the dynamic dispatch methods in...
}

The Dynamic trait enables a number of methods that can be implemented in order to intercept certain kinds of method calls. These are (see also the Scala API docs or this blog post):

  • applyDynamic("someOtherMethod")(5): Called for simple method invocations, such as x.someOtherMethod(5).
  • applyDynamicNamed("someOtherMethod")(("high" -> 5)): Called for method invocations involving named arguments, such as x.someOtherMethod(high = 5).
  • selectDynamic("someField"): Called for accesses to an unknown field, e.g., x.someField.
  • updateDynamic("someField")("high"): Called for assignments to an unknown field, such as x.someField = "high".

Now we could extend our class as follows:

import scala.language.dynamics

class A extends Dynamic {
    def method(i: Int): String = i.toString

    def applyDynamic(name: String)(number: Int): Int = name match {
        case "someOtherMethod" => 2 * number
        case _ => throw new RuntimeException(s"Method $name not implemented!")
    }
}

Now, calling x.someOtherMethod(5) will actually do something, while calling something like x.stillNotWorking(5) will throw a RuntimeException. Let’s now have a look at how to employ this for something useful.

An API for KISSmetrics

KISSmetrics provides user tracking services. This is quite similar to Google Analytics, but KISSmetrics enables you to identify users across different channels. If an anonymous user surfs your website and turns into a customer, both identities are linked. If he logs in from another device, that identity is also linked. Using KISSmetrics you have a better idea of how your users behave on your website, which is important in order to test your existing or new features and optimize your app to provide the best possible user experience.

KISSmetrics provides a very simple REST API, basically allowing you to do 4 things:

  • identify a user, which is exclusively done from JavaScript, since it tells KISSmetrics that the initially anonymous user is now known by some persistent identifier, e.g., the identifier from your database or his email address.
  • trigger an event for a user, such as Signup or Checkout, potentially together with some properties, e.g. number of items checked out.
  • set a property for a user, e.g., gender or country.
  • alias a user with some other ID, e.g., if you have separate services but want to track something across them.

As I’ve mentioned, the identify action only makes sense from the UI, but the other actions are often better triggered from the backend, since certain events are more reliably detected there, especially if access to a service is provided via native or mobile apps in addition to some website.

A generic implementation of a KISSmetrics library in Scala would probably look something like this:

trait BaseService {
  def alias(person1Id: String, person2Id: String)
  def event(personId: String, eventName: String, properties: Map[String, String] = Map())
  def properties(personId: String, properties: Map[String, String])
}

For instance, in order to trigger a signup event, we could call

service.event("user1", "Signup", Map("Button Color" -> "red"))

While this works, it would also be cool to do something like

service.signupWithButtonColor("user1", "red")

Normally, we would need to implement some wrapper around the base service, providing such methods, which is rather cumbersome. Scala Dynamics to the rescue! Using the dynamic features of Scala as described above, we can write a service that dispatches certain method calls to the methods of the base service. I published a first version on github. The basic idea is to provide a syntax for the different types of actions supported by KISSmetrics. Currently, the following syntax is supported:

service.e_Signup("user1") // triggering an event
service.e_Signup_with_Button_Color("user1", "red") // triggering an event with properties
service.e_Signup_with_Button_Color_and_Title("user1", "red", "title1") // two properties
service.p_Gender_and_Country("user1", "male", "Germany") // setting a property

The current syntax is rather a proof of concept, implemented to get something working without too sophisticated parsing, and there is certainly room for improvement. Currently, everything is handled within the applyDynamic method:

def applyDynamic(trigger: String)(personId: String, args: String*) {
    val signature = trigger.split("_").toList
    if (signature.size < 2) throw new IllegalArgumentException("...")
    signature match {
        case "e" :: rest => handleEvent(personId, rest, args)
        case "p" :: rest => handleProperty(personId, rest, args)
        case _ => throw new IllegalArgumentException("...")
    }
}

In handleEvent and handleProperty we further process the signature:

private def handleEvent(personId: String, sig: List[String], propertyValues: Seq[String]) {
    val (propertySignature, name) = parseTo(sig, "with")
    val propertyNames = parsePropertyNames(propertySignature)
    if (propertyNames.size != propertyValues.size) throw new IllegalArgumentException("...")
    val map = (propertyNames zip propertyValues).toMap
    event(personId, name, map)
}

The parseTo and parsePropertyNames methods collect the event name and the property names, which are separated by the stop words “with” and “and”.
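
The actual implementations live in the library on github; a rough sketch of how these helpers could work looks like this (the bodies below are assumptions for illustration, not the library code):

private def parseTo(sig: List[String], stopWord: String): (List[String], String) = {
    // e.g. List("Signup", "with", "Button", "Color") -> (List("Button", "Color"), "Signup")
    val (nameParts, rest) = sig.span(_ != stopWord)
    (rest.drop(1), nameParts.mkString(" "))
}

private def parsePropertyNames(sig: List[String]): List[String] =
    if (sig.isEmpty) Nil
    else {
        // split on the stop word "and" and join each group with spaces, e.g.
        // List("Button", "Color", "and", "Title") -> List("Button Color", "Title")
        val (head, tail) = sig.span(_ != "and")
        head.mkString(" ") :: parsePropertyNames(tail.drop(1))
    }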

Things to Consider

There are two things to consider when using Scala Dynamics. First of all, there are no compile time checks. You can call arbitrary methods, but if an unsupported method is called, you will only detect that at runtime. In order to reduce potential harm, you should have good tests in place to account for the missing compile time safety. One exception is that the signature of the dynamic handler method is still checked at compile time. The KISSmetrics service expects parameters of type String, so calling service.e_Signup(1) would result in a compile time error.

Secondly, you can not overload the dynamic handler methods. At least, I have not found a way to do that. So, if you want to define event methods that take either a String or an Int person id, you can not do something like:

def applyDynamic(trigger: String)(personId: Int, args: String*) { ... }
def applyDynamic(trigger: String)(personId: String, args: String*) { ... }

Basically, this means you can define exactly one applyDynamic method, which has to handle and further dispatch every legal method invocation you want to support. I assume that in more sophisticated scenarios than the rather simple KISSmetrics API, this results in no compile time checks at all, since the method will probably be defined as

def applyDynamic(sig: String)(args: Any*)

So, the final question is whether using the Dynamic feature of Scala 2.10 is worth the hassle. As far as I know, languages like Ruby have this feature and it is used for ORM mappers such as ActiveRecord or ActiveRDF, which provide a very nice API for accessing data from a database. But one of the reasons to love Scala is its type safety, which is basically not present when using the dynamic feature.

I personally think it might still be useful in certain scenarios. The KISSmetrics API is one example. The alternative would be to put the information into a Map, where the same problems exist, i.e., mistyped event or property names, or missing properties. Writing a wrapper with explicit methods for each triggered event gives you great compile-time checking, but I personally find it too cumbersome for this use case.

We plan to use the API as soon as doctivity is completely migrated to Scala 2.10, so currently we have no experiences with the library from a production system. I would be interested to hear any opinions on Scala Dynamics or the KISSmetrics library. Do you think it is appropriate to implement a library like that in Scala or is this too “scriptish”?

Continuous Deployment: Zero-Downtime Refactorings

Over at Kreuzverweis we have been practicing continuous deployment from the very beginning, and dam simple is also continuously deployed. The theory of continuous deployment is described, e.g., by Timothy Fitz at IMVU or in the great book “Continuous Delivery” by Jez Humble and David Farley.

However, in practice it takes some thinking to solve certain problems. You are not able to shut the system down for a couple of hours to do a migration step and release a completely new system in one batch. Continuous deployment is a logical consequence of continuous integration, and it requires developing changes in small batches, never breaking any code, and pushing to the VCS often. But continuous deployment means that every commit is also a deployment, and thus it requires migrating data in small batches on the fly and accounting for the existence of legacy or stale data. In this blog post I will talk about how we are currently refactoring the complete file storage layer in dam simple with zero downtime.

What’s the problem, dude?

In dam simple, we currently have a 1:1 relation between documents and the files they represent. If you upload, let’s say, a PDF, we create a document with references to the original file and differently sized thumbnails. It also contains information such as the title, the owner, keywords, etc. However, we just started to implement versioning support for dam simple, and obviously this requires a different approach to storing information about documents. We basically introduce a 1:n relationship between a document and its files, since different files correspond to different versions of the document.

Currently, a document is modelled as follows (please note that we use a very simplified version here):

We have a case class containing only the ID of the document and its title (among other information that we leave out for brevity). The storage layer component is called FileStore and provides methods to retrieve the original file and thumbnails of different sizes for a given document.
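
Since the original gist is not reproduced here, a simplified sketch of that model could look roughly like this (member names and signatures are assumptions):

import java.io.File

case class Document(id: String, title: String)  // plus owner, keywords, etc.

trait FileStore {
  def original(doc: Document): File
  def thumbnail(doc: Document, sizePx: Int): File  // hypothetical size parameter
}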

In order to support versioning we have to modify this API:

What has changed? We introduced a document store trait that provides access to the current version as well as arbitrary other versions of a document. Each version is represented by a StoredFile, which contains the metadata of the file and is used to access the original file and differently sized thumbnails via the FileStore, as before.
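
Again as a rough sketch with assumed names and signatures, the new API could look like this:

case class StoredFile(id: String, fileName: String, contentType: String)  // file metadata

trait DocumentStore {
  def currentVersion(doc: Document): Option[StoredFile]
  def version(doc: Document, version: Int): Option[StoredFile]
}

trait FileStore {
  // now keyed on a concrete stored file instead of the document itself
  def original(file: StoredFile): File
  def thumbnail(file: StoredFile, sizePx: Int): File
}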

So, to recap, we changed the following details:

  • We split the information about a document into a case class Document and a case class StoredFile.
  • Each class is now handled by a dedicated component of the storage layer, whereas before everything was handled by the FileStore.
  • A document can now have more than one associated file.

And the problem now is

  • to introduce the new API “on the fly”, i.e., without shutting down the service to migrate data,
  • and to enable it only for a limited number of users, in order to test the new feature thoroughly and to work on the UI without risking disturbing existing users.

I will describe how we refactored our code using simplified examples, showing only the read part of the API. Refactoring the other parts works accordingly.

Step 1: Introducing the new API and Testing

In the first step we concentrate on functionality that was available before, i.e., we do not implement the versioning support, but only access to the original file and its thumbnails. We start by extending the existing API, i.e., we do not touch the existing methods and only add new ones:
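
The transitional API could then look roughly like this (again a sketch with assumed signatures): the old methods stay untouched, the new ones are added next to them, and the DocumentStore from the previous sketch comes in addition.

trait FileStore {
  // old API, unchanged and still used everywhere
  def original(doc: Document): File
  def thumbnail(doc: Document, sizePx: Int): File

  // new API, added alongside; so far only exercised by the new tests
  def original(file: StoredFile): File
  def thumbnail(file: StoredFile, sizePx: Int): File
}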

We then write tests for the new code and implement the methods as required. Since we had tests for the old code, we can ensure that both the old and the new API work as expected. We are now in a state to deploy the current code base: the old methods have not been changed, all tests pass, and the new methods are not used anywhere except in the tests.

Step 2: Handling Stale Data.

Before we can enable the new upload API for a test group, we have to consider one important case. The test group already has data that was uploaded using the old API. Therefore, we will encounter assets whose files cannot be accessed using the new API. We already accounted for that case by letting the DocumentStore return Options for the new methods. In case the new API returns None, we fall back to the old API, as we will show in a later snippet.

Step 3: Enabling the New API for a Test Group

We can now enable the new upload for specific users, using techniques such as feature flipping. I recommend always introducing new features guarded by some kind of feature flipping mechanism, unless you have very comprehensive acceptance tests in place. Being able to test new features before everyone can see them is an additional level of safety when deploying continuously. But you should not end up holding back too many features. With feature flipping, we switch between the old and the new API as demonstrated in the following snippet:
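
The original snippet is not reproduced here, but the switch could look roughly like this (featureEnabled, documentStore, and fileStore are assumed names):

def originalFor(userId: String, doc: Document): File =
  if (featureEnabled("versioned-files", userId)) {
    // new API; None means the document was uploaded before the refactoring,
    // so we fall back to the old API
    documentStore.currentVersion(doc)
      .map(file => fileStore.original(file))
      .getOrElse(fileStore.original(doc))
  } else {
    // feature disabled: old API only
    fileStore.original(doc)
  }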

In this example you can also see the fallback code used to handle stale data with the new API. Normally, it would be better to hide the fallback code in the API, but in this case, due to the fact that we added a new type and changed the model quite fundamentally, the current approach seems more appropriate.

One important note: Write a test that checks that the correct APIs are called when the feature is enabled and disabled. You don’t want to find out in production that you forgot something!

Step 4: Enable New API by Default.

We are done with the first phase of the refactoring: since data cannot be updated yet, we have an implementation that uses the new API for newly uploaded content and is still able to access data that was produced using the old API. Once the new feature has been tested within the test group, it can be released for everyone. In this step we can also remove the old API completely from the code base, since everything is handled by the new one.

Sweet!

We have introduced a new API that is prepared to handle versions appropriately, without introducing a single second of downtime. The key points of this approach were:

  • We introduced the new API in parallel to the old one, which enabled us to guard its use with some sort of feature flipping and to enable it only for a test group.
  • We took care that the new API is aware of the old format, i.e., in case we encounter stale data in our test group, the implementation falls back to the old API. Note that the old API does not need to be aware of the new one.
  • In Scala, Options are king. Using Options we can very easily handle the cases where stale data is encountered.
  • Tests! You need tests for continuous deployment. With good tests in place you can modify your code base in small batches and always verify that existing functionality was not destroyed.
  • Feature Flipping! If you add new stuff, enable it only for a limited group of users, for instance the developers, everyone in your team, or maybe the test team. Automated tests are good, but enabling new stuff only for a limited number of people allows you to test with real people and to encounter problems that might not have been covered in your tests.

In the next blog post, I will talk about migrating data on the fly. Once we introduce true versioning support, we also have to handle cases where data is stored using the old model but needs to be updated. In that case, we need to migrate the data on the fly. So stay tuned.

Pushing new repository to gitolite server

I have been playing around with git and gitolite recently. Setting up a git server using gitolite is very simple: authentication is done using ssh, and configuring the repositories can all be done from the client by just pushing new configurations into an admin repository. Very convenient. Creating a new repository on the server is as easy as adding a new entry to the config file and pushing it to the server. However, what didn’t work was pushing something into this new repository. The reason is that pushing into an empty repository needs an explicit push command, which I didn’t know. You basically have to add some files, commit them, and then do a

git push origin master

However, that still requires adding some files first. This might not always be convenient, and thanks to this post I found out that the following also works on an empty repository:

git commit --allow-empty
git push origin master

This is probably not news to people who have been using git for a longer time, but I thought it would be a good idea to document it for future reference.