Sunday, September 18, 2016

Algebird Decayed Value

Since it took me some time to understand how decayed values in algebird work, I thought I'd document it here.

A DecayedValue conveys the idea of a value that is exponentially decaying over time. The DecayedValue itself just represents a snapshot of such a decaying value: it has a value and a time. The time is actually scaled time; different exponentially decaying values can be normalized to the same timescale so that they decay at the same rate. The value of a DecayedValue at any other time can be computed by exponentially scaling the value.

For example, say we have DecayedValue(1.0, 100), i.e. a value of 1.0 at a scaled time of 100, and we want to find out what this DecayedValue will be at time 101. For this we basically just need to decay the value by one unit of exponential decay, i.e. to 1.0 * Math.exp(-1). So the new value will be DecayedValue(Math.exp(-1), 101). Similarly, the value at time 99 would be DecayedValue(Math.exp(1), 99). Note that all of these represent the exact same thing, i.e.

DecayedValue(1.0, 100) === DecayedValue(Math.exp(-1), 101) === DecayedValue(Math.exp(1), 99)

See how we've represented a varying value immutably. This is a general concept: values varying with time can be made immutable by capturing their value at a specific time along with the timestamp.
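To make the arithmetic concrete, here's a tiny sketch. decayTo is a made-up helper, not an algebird API; it just re-expresses a snapshot at a different scaled time using the rule above:

  import com.twitter.algebird.DecayedValue

  // Hypothetical helper: re-express a DecayedValue at a different scaled time.
  def decayTo(dv: DecayedValue, newTime: Double): DecayedValue =
    DecayedValue(dv.value * Math.exp(dv.scaledTime - newTime), newTime)

  val dv = DecayedValue(1.0, 100)
  assert(Math.abs(decayTo(dv, 101).value - Math.exp(-1)) < 1e-9)
  assert(Math.abs(decayTo(dv, 99).value - Math.exp(1)) < 1e-9)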

Here's some more sample code to illustrate the concept:
  import com.twitter.algebird.{DecayedValue, DecayedValueMonoid}

  def decayedValueTry(): Unit = {
    // Choosing values so that x decayed to y's time is equal to y's value
    val x = DecayedValue(Math.exp(2), 1)
    val y = DecayedValue(Math.exp(1), 2)

    // eps is the value below which we consider a decayed value to be 0
    val monoid = DecayedValueMonoid(eps = 0.01)
    val sum = monoid.plus(x, y)

    // We expect double of y's value, since x is chosen such that
    // it decays to y's value, with an infinitesimally small error.
    assert(Math.abs(2 * y.value - sum.value) < 0.00001)
  }

Sunday, September 28, 2014

Finagle, promises, local and dtab

I finally figured out how request context is defined in Finagle. A bunch of things that had been puzzling me for some time all started making sense together.

So what is a request context? To understand that, we first have to understand how computations work in Finagle. The first two rules of Finagle are that you do not block. How then do you make network calls and other typically blocking operations? The answer is that you still, of course, perform all those computations, but without blocking. Each blocking computation returns a future, and then you define further computations on that future. This way you never block a thread; you just do your piece of work and leave the thread, defining the next steps for when you get a chance again. You can thus define what you want to do as a graph of chained operations. It may sound complex, but in practice it is very easy to express as monadic operations on futures, and Scala even provides a convenient syntax (the for comprehension) for expressing these.

So imagine a request coming in and starting a bunch of async computations, which in turn spawn other computations, and so on. Ultimately, a request ends up occupying chunks of execution time slices on many threads. Request context is state that is available in all those time slices. If all these operations happened on the same thread in a blocking manner, defining this context would be very simple: it's just any state defined on the thread, and whatever variable you define is available to all subsequent operations. In an async scenario this has to be managed in a more sophisticated manner. What we basically have to do is save the request context before starting any async operation and restore it afterwards. The request context is stored in com.twitter.util.Local.
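Here's a minimal sketch of the chaining style described above; fetchUser and fetchOrders are made-up async calls that return Futures:

import com.twitter.util.Future

// Hypothetical async operations; real ones would hit the network.
def fetchUser(id: Long): Future[String] = Future.value("user" + id)
def fetchOrders(user: String): Future[Seq[String]] = Future.value(Seq(user + ":order1"))

// Each step runs when the previous future completes; no thread ever blocks.
val orders: Future[Seq[String]] =
  for {
    user <- fetchUser(42)
    os   <- fetchOrders(user)
  } yield os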

Local is a utility class built on Java's ThreadLocal that provides operations to easily save and restore data onto the current thread. This saving and restoring of state is exactly what happens in async operations in Finagle. Async operations are typically implemented as promises. A promise is created at the start of an async operation. The promise implements the future interface and can be handed to callers, who use it to define operations pending the completion of the underlying async operation. When the async operation finishes, the promise is marked complete. Thus a promise wraps the execution of an async operation. Finagle does an additional thing here: before executing the async operation, the local context, defined using com.twitter.util.Local, is stowed away, and after the async operation finishes it is restored. Thus the local context survives the async operation. This facility is extremely powerful, as we'll see: we can stow away some piece of data regarding a particular request and have it available in all computations spawned by that request.
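A minimal sketch of what this looks like; requestId is a made-up piece of per-request state:

import com.twitter.util.Local

val requestId = new Local[String]
requestId() = "req-123"      // set on the current thread
println(requestId())         // Some(req-123)

// Promises snapshot the Locals when created and restore them around
// callbacks; Local.save/Local.restore are the underlying primitives.
val saved = Local.save()
// ... execution hops to another thread ...
Local.restore(saved)         // the request context is back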

Delegation Tables, or DTabs, are a mechanism for defining how names map to locations, where locations are network locations ultimately defined as IP address + port. Imagine that on your desktop machine you had mounted a remote server's directory, the remote server's directory in turn had mounted another server's directory, which had mounted a disk under its directory somewhere, and so on. When you want to read from a file in such a filesystem, a lot of resolutions may happen behind the scenes. After some prefix of the path, when a mounted location is encountered, it has to be mapped to a network location. This kind of thing goes on until we find the file in question in some sector of some disk somewhere. My point is that names translate to locations. Similarly, in Wily naming, names translate to locations, albeit locations that are host-port pairs in the IP universe. There is an elaborate process of resolution to achieve that. Finagle allows overriding the resolution process at various granularities, the finest of which is an individual request. The overrides are defined by things called delegation tables, which basically define prefix substitutions. I won't go into the details of that mechanism here, that's a different topic, but I will talk about how overriding of these dtabs per request is achieved.
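For a flavor of what a delegation table looks like, here's a sketch; the paths are made up for illustration:

import com.twitter.finagle.Dtab

// Each entry is a prefix substitution tried during name resolution;
// /$/inet is the namer that resolves a host and port directly.
val dtab = Dtab.read("/s => /srv/prod; /srv => /$/inet/localhost/8080")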

The DTab overrides for a request are basically stored in the local context, defined by com.twitter.util.Local. The dtab override then stays available throughout the asynchronous life of a request and affects every network request that this request in turn spawns. Thus every downstream network request that this request triggers uses the overridden delegation table. And that's not the end of it: for participating protocols such as HTTP and Thrift, this overridden delegation table gets written to outgoing requests and read back on the receiving services. Thus these overrides are preserved across the entire network stack for the request. Is this cool or what? Calling it amazing would be an understatement, if you ask me.
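A sketch of a per-request override; the service paths are made up:

import com.twitter.finagle.Dtab

// Dtab.local lives in a Local, so the override follows the request across
// async boundaries, and for HTTP/Thrift it is also sent over the wire.
Dtab.unwind {
  Dtab.local ++= Dtab.read("/s/users => /s/staging/users")
  // Finagle calls made here resolve with the override applied
}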

Thursday, May 22, 2014

Embrace the implicit

When you first learn Scala, implicits come across as one of the most advanced and complicated features to grasp. Over time, you get the impression from the Scala community that implicits are an advanced feature that should be used carefully. This makes a lot of sense, because when you use them in your code, everybody who reads the code needs to know about implicits in general and then about how the particular implicit works in that particular context. Implicits are somewhat magical in that sense (and by magical I mean the bad sort of magic). Usually when you read code, the context you need to understand it is right there, or at least there is a direct clue to where to go look. With implicits, the process of going to look is complicated, because the best fit is chosen through an involved process: you have to look at all the related companion objects and see if an implicit is defined there that best fits the invocation site. For the compiler this is an easy thing to do, but for humans it's very involved.

So, if there is so much complexity associated with implicits, why embrace them? I'll give you two reasons:

  1. Implicits are used all over the place in most of the useful and popular Scala libraries. Without understanding how they work, it will all feel like magic (not the good kind); you may as well be working in Ruby.

  2. Implicits happen to be fundamental to Scala.

The first point is self-explanatory. Let me expand on the second one.

A great thing about Scala collections is that when you map over a collection you get back a collection of the same type, or the type that fits best. E.g. if you map over a list you get back a list. If you map over an array you get back an array, and when you map over a map you get back a map!! This, as you would expect, is a very important property of the collections, and arguably without it the collections would be far less useful. The way this is achieved in the Scala collections is through the CanBuildFrom pattern. There is a great explanation of how this works in the "Programming in Scala" book, in the chapter titled "The Architecture of Scala Collections". In short, each collection type has an associated builder that knows how to build a collection of that type, adding one element at a time. The map function on the collection takes an implicit builder factory as input. At compile time, the builder factory that best fits the context is implicitly chosen, and thus a builder of the same type, or the "most similar" type, is used. The concept is explained in great detail in the paper titled Generics of a Higher Kind.
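A quick illustration with the pre-2.13 collections (where CanBuildFrom lives); the last line shows the builder factory being supplied explicitly via breakOut to change the result type:

  // The implicitly chosen CanBuildFrom makes the result type match the source:
  val l = List(1, 2, 3).map(_ * 2)                          // List[Int]
  val a = Array(1, 2, 3).map(_ * 2)                         // Array[Int]
  val m = Map("a" -> 1).map { case (k, v) => (k, v + 1) }   // Map[String, Int]

  // Supplying the builder factory explicitly changes the result type:
  import scala.collection.breakOut
  val keys: List[String] = Map("a" -> 1, "b" -> 2).map(_._1)(breakOut)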

Context bounds and ad hoc polymorphism in Scala also work through implicits (a context bound desugars to an implicit parameter).

Martin Odersky, the creator of Scala, even includes implicit parameters in the simple parts of Scala.

For better or for worse, implicits are hard to avoid in Scala. I feel it’s best to embrace them rather than avoid learning about them. The important rules for implicits are these:

  • Only constructs defined with the implicit keyword can be used in an implicit context. So, if your function takes an implicit parameter of type MyType, then only a variable of type MyType that is marked with the implicit keyword would fit, not any other variable of that type.

    case class Builder[T](name: String)
    trait MyType

    val builder = new Builder[MyType]("some")
    implicit val implicitBuilder: Builder[MyType] = new Builder[MyType]("imp")
    def myfunc(param1: String)(implicit builder: Builder[MyType]): String = builder.name

    Only implicitBuilder fits and will be picked up: myfunc("x") returns "imp".

  • An inserted implicit conversion must be in scope as a single identifier, or be associated with the source or target type of the conversion.

    • This is the most complex part. The scope here is not just the lexical scope but also includes the companion objects (and the companion objects of associated types) of the source and target types. The source type is the type of the object that is passed in, and the target type is the type that is expected in the context. The implicit conversion may be defined in the companion object of either type. Any time you see a function call, it's hard to predict by quickly reading the code what types were passed in, because the passed-in type may be any type that converts to the expected type through an implicit conversion, which can be defined in the imported definitions or in the companion objects of the source and target types. As if this weren't enough, the definitions are also searched for in types associated with the source and target types, where association is defined by

      • All base classes (supertypes) of the type

      • All type arguments of the type

      • Special associations for Singleton types and type projections

        Mind boggles at the possibilities (see the sketch below).
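        For instance, a conversion defined in the companion object of the target type is found with nothing imported at the call site (Meters and report are made up for illustration):

          import scala.language.implicitConversions

          class Meters(val value: Double)
          object Meters {
            // lives in the companion of the *target* type, so it's in implicit scope
            implicit def fromDouble(d: Double): Meters = new Meters(d)
          }

          def report(m: Meters): String = s"${m.value} m"
          report(3.5)  // compiles: Meters.fromDouble is found without any import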

  • One-at-a-time Rule: Only one implicit is tried.

    The compiler will never rewrite x + y to convert1(convert2(x)) + y (example taken directly from the book Programming in Scala).

  • Explicits-First Rule: Whenever code type checks as it is written, no implicits are attempted.

    If the code already compiles, i.e. without needing any implicit conversions, then none will be attempted; implicit conversions are always a fallback.
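    A small illustration (Sized and toSized are made up):

      import scala.language.implicitConversions

      class Sized(s: String) { def sized: Int = s.length }
      implicit def toSized(s: String): Sized = new Sized(s)

      "abc".length  // type checks as written, so no conversion is attempted
      "abc".sized   // doesn't type check as written, so toSized is inserted: 3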

  • Where implicits are tried:

    1. implicit parameters

      def map[B, That](f: A => B)(implicit bf: CanBuildFrom[List[A], B, That]): That

    2. implicit conversions to expected type

      implicit def intToString(i: Int): String = i.toString
      def hyphenate(str: String): String = str.mkString("-")
      val i = 1234
      hyphenate(i)  // "1-2-3-4"

      The above compiles even though hyphenate receives an Int instead of a String, because the Int gets converted to the expected type, i.e. String, automatically.

    3. Conversions of the receiver of a selection

      import com.twitter.util.{Return, Throw, Try}

      object OptToTry {
        class TryConvertableOption[T](opt: Option[T]) {
          def toTry(e: Exception): Try[T] = opt match {
            case Some(v) => Return(v)
            case None    => Throw(e)
          }
        }
        implicit def optToTryConvertableOption[T](opt: Option[T]): TryConvertableOption[T] =
          new TryConvertableOption(opt)
      }

      import OptToTry._
      // funcThatMaybeReturnsAnInt() is a placeholder returning Option[Int]
      val opt = funcThatMaybeReturnsAnInt()
      opt.toTry(new Exception("No dice"))

      OptToTry supplies an implicit conversion from Option to TryConvertableOption, which supports a method to convert an Option to a Try. This enables calling toTry on Options to convert them to Try.

Having a good grasp of implicits is not easy, but it is essential to getting a good grip on Scala. I wish I could give you a silver bullet, but there is none that I know of. Get over it and embrace the implicits. One caveat though: implicits let you play nice magical tricks, tricks that will be a puzzle to everyone who reads that code every time, including you. I like solving puzzles, but not when I'm reading code; I hope you feel the same and keep it simple with implicits.

Tuesday, May 20, 2014

Blogging from command line using b.py

Since it took me some time to figure out how to set up b.py to start blogging from the command line, I thought I'd post it here.

pip install b.py

pip install markdown

pip install google-api-python-client

pip install python-gflags

pip install docutils

brew install source-highlight

Then make a directory where you'll keep your blog posts. This directory needs to have a configuration file called brc.py.

Mine looks like this (I use Blogger; the blog id is jumbled):

brc.py

service = "b"
service_options = {
  'blog': 3865076449691263698
}

Save the above in a file called brc.py in the same folder where you want to keep your blogs.

To get your blog id run the following:

b.py blogs

This does two things:

  1. It asks you to log in to Blogger in the browser and then saves the creds in a file called b.dat. Make sure you do this in the blog directory, the one that has brc.py.
  2. It lists your blogs along with their blog ids. You should update the blog id in the brc.py file.

Now create a blog post in markdown, e.g. like this:

blogpost.md

!b
service: blogger
title: My First Post
labels: blogging

My first blog from command line

The first four lines are special. !b tells b.py that headers follow. The three headers provide info about the blog post.

Make sure the following files are in the blog directory:

  1. brc.py
  2. b.dat
  3. blogpost.md

Now publish the post: b.py post blogpost.md

This should publish the post. Don't be surprised that b.py changes your blog file: it inserts some metadata, presumably to keep track of the post so it can update it later (I haven't tried).

If you need speed, nothing beats the command line. Thank the creator of b.py and enjoy!!

Friday, July 5, 2013

TCP_NODELAY

TCP_NODELAY is a socket option that disables Nagle's algorithm. Nagle's algorithm is basically about buffering small writes up to a network packet's worth before sending. This is the classic throughput vs. latency tradeoff, and it only helps when requests are very small. If the intention is to send requests immediately, e.g. in real-time systems, then Nagle's algorithm should be turned off. Also, if requests are usually not very small, then there is no benefit to Nagle's algorithm, and keeping it off is better.
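On the JVM this maps to setTcpNoDelay on a socket. A minimal sketch; the host and port are placeholders:

import java.net.Socket

val socket = new Socket("example.com", 80)
socket.setTcpNoDelay(true)  // disable Nagle's algorithm: send writes immediately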

Akka Source Code Notes


A Mailbox typically holds an in-memory concurrent queue called a MessageQueue. An actor basically processes messages from its mailbox.

deadLetterMailbox is a special mailbox where messages that couldn't be delivered to an actor are sent. This can happen, for example, when an actor dies while there are pending messages in its mailbox; these messages are sent to the deadLetterMailbox, or dlm for short.

The dead letters mailbox just sends each message to the DeadLetter actor.

The invoke function on the actor is the one that seems to execute the message.

case class Envelope private (val message: Any, val sender: ActorRef)

So envelope is the message and the sender actor.

def receiveMessage(msg: Any): Unit = behaviorStack.head.applyOrElse(msg, actor.unhandled)
private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack


behaviorStack is the one that basically supplies the actor's receive function.

type Receive = PartialFunction[Any, Unit]

The actor's receive function returns this partial function. When an actor becomes something, it pushes a new receive function onto the behaviorStack.

ActorCell has most of the meat of the actor logic.

When a new actor is created, the receive function is called and the result is pushed onto the behaviorStack.

Whatever is at the top of the behaviorStack is the current behavior of the actor. unbecome pops the behavior; it always falls back to the default actor receive block and never goes down to Nil.
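A minimal sketch of pushing and popping behaviors; note that become replaces by default, and discardOld = false makes it push:

import akka.actor.Actor

class Toggle extends Actor {
  def on: Receive = { case "flip" => context.unbecome() }    // pop
  def receive: Receive = {
    case "flip" => context.become(on, discardOld = false)    // push
  }
}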

applyOrElse is a method on PartialFunction that calls the underlying function if it is defined for the given argument, and otherwise calls the supplied default.
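A standalone illustration of applyOrElse:

val receive: PartialFunction[Any, Unit] = { case s: String => println(s) }

receive.applyOrElse("hello", (msg: Any) => println(s"unhandled: $msg"))  // prints hello
receive.applyOrElse(42, (msg: Any) => println(s"unhandled: $msg"))       // prints unhandled: 42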

unhandled is the default method to handle unhandled messages:

def unhandled(message: Any) {
  message match {
    case Terminated(dead) ⇒ throw new DeathPactException(dead)
    case _                ⇒ context.system.eventStream.publish(UnhandledMessage(message, sender, self))
  }
}


processMailbox in Mailbox.scala is the one that handles a message in the mailbox and in turn calls invoke on the actor.

An important detail of processMailbox is that system messages are checked at the end of each message, which implies that actor kill, resume, etc. take effect only after the current message is processed:
message match {
  case message: SystemMessage if shouldStash(message, currentState) ⇒ stash(message)
  case f: Failed                         ⇒ handleFailure(f)
  case DeathWatchNotification(a, ec, at) ⇒ watchedActorTerminated(a, ec, at)
  case Create(failure)                   ⇒ create(failure)
  case Watch(watchee, watcher)           ⇒ addWatcher(watchee, watcher)
  case Unwatch(watchee, watcher)         ⇒ remWatcher(watchee, watcher)
  case Recreate(cause)                   ⇒ faultRecreate(cause)
  case Suspend()                         ⇒ faultSuspend()
  case Resume(inRespToFailure)           ⇒ faultResume(inRespToFailure)
  case Terminate()                       ⇒ terminate()
  case Supervise(child, async)           ⇒ supervise(child, async)
}


Similarly, adding/removing watchers also takes effect after the current message is processed.

This code is not clear to me:
protected final def systemQueueGet: LatestFirstSystemMessageList =
  // Note: contrary how it looks, there is no allocation here, as SystemMessageList is a value class and as such
  // it just exists as a typed view during compile-time. The actual return type is still SystemMessage.
  new LatestFirstSystemMessageList(Unsafe.instance.getObjectVolatile(this, AbstractMailbox.systemMessageOffset).asInstanceOf[SystemMessage])



Mailbox implements Runnable; it has a function called run. In run it calls processMailbox, which in turn runs the message on the actor.

OK, so the big picture is becoming clear now. The dispatcher uses the executorService to run the mailbox as a Runnable; thus the mailbox's run method is called on the thread pool. The run method calls invoke on the actor, which delivers the message to it. Thus an invocation of run on the mailbox results in messages being processed by the actor.

registerForExecution in the dispatcher is the critical function here:
protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
  if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
    if (mbox.setAsScheduled()) {
      try {
        executorService execute mbox
        true
      } catch {
        case e: RejectedExecutionException ⇒
          try {
            executorService execute mbox
            true
          } catch { //Retry once
            case e: RejectedExecutionException ⇒
              mbox.setAsIdle()
              eventStream.publish(Error(e, getClass.getName, getClass, "registerForExecution was rejected twice!"))
              throw e
          }
      }
    } else false
  } else false
}

Calling dispatch on the dispatcher puts the message on the mailbox and registers it for execution:

protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
  val mbox = receiver.mailbox
  mbox.enqueue(receiver.self, invocation)
  registerForExecution(mbox, true, false)
}

There are other kinds of dispatchers, such as the balancing dispatcher, that handle dispatch a little differently.

override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = actorCell.sendMessage(message, sender)


The picture becomes clearer: sending a message to an actor means calling ! on it (that much I knew), which calls sendMessage on the underlying ActorCell.

def sendMessage(msg: Envelope): Unit =
  try {
    val m = msg.message.asInstanceOf[AnyRef]
    if (system.settings.SerializeAllMessages && !m.isInstanceOf[NoSerializationVerificationNeeded]) {
      val s = SerializationExtension(system)
      s.deserialize(s.serialize(m).get, m.getClass).get
    }
    dispatcher.dispatch(this, msg)
  } catch handleException

sendMessage in turn calls dispatch on the dispatcher, which puts the message on the actor's mailbox and registers the mailbox for execution, using the executor service to call execute on the mailbox. Mailbox is a Runnable with a run method that calls processMailbox. processMailbox executes up to "throughput" messages on the actor. It's important to note that these throughput messages run within a single run call, so they execute as a single task on the thread.

One thing wasn't clear to me: registerForExecution calls execute on the executor service immediately, but messages are processed asynchronously, so how does that work? What if there is no thread available to do the processing? This is explained by the documentation of the execute function of the Executor interface: http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/Executor.html

execute doesn't run the Runnable immediately but queues it up; execution happens when a thread is available. Thus no polling of the mailbox is required. Each time a message is sent, execute is called on the mailbox, and thus a run invocation is scheduled for the mailbox. Each run invocation will consume one or more messages. It seems that if throughput is more than 1, then there will be more run calls than needed. Need to investigate this more.
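A tiny illustration of that contract (nothing Akka-specific here):

import java.util.concurrent.Executors

val pool = Executors.newFixedThreadPool(1)
// execute only enqueues the Runnable; a pool thread runs it when free.
pool.execute(new Runnable {
  def run(): Unit = println("ran on " + Thread.currentThread.getName)
})
pool.shutdown()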