
FluentDataflow - Fluent Style TPL Dataflow

Background

In modern application development, there are many scenarios that involve dataflow processing, such as large-volume data processing, image/video conversion, data migration, etc.

In .NET, thanks to Microsoft, we have the TPL Dataflow library, which is a flexible and high-performance library for this area.

TPL Dataflow is really flexible, but when we want to build complex dataflows, we might have to create a bunch of blocks, manually wire them together, be careful to avoid message blocking, and write a lot of tricky code even for the simplest branching and merging cases. If you have ever been a dataflow developer, you probably remember the times you waited for your dataflow to complete but it never did, and you had to debug line by line to find out where a message was blocked and why.
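
For comparison, here is a rough idea of what the manual wiring looks like in raw TPL Dataflow (a minimal sketch with illustrative block names, not code from any particular project): every link and its completion propagation must be set up explicitly, and you must wait on the last block of the chain, not the first.

using System;
using System.Threading.Tasks.Dataflow;

var upperCaser = new TransformBlock<string, string>(s => s.ToUpperInvariant());
var printer = new ActionBlock<string>(s => Console.WriteLine(s));

// every link must be created by hand, and completion only propagates
// if you remember to ask for it
upperCaser.LinkTo(printer, new DataflowLinkOptions { PropagateCompletion = true });

upperCaser.Post("hello");
upperCaser.Complete();

// wait on the LAST block in the chain, not the first
printer.Completion.Wait();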

And for a long time, I have heard developers complain about its lack of a fluent-style API, which is pretty much standard for libraries in the Java world and is already provided by many popular .NET libraries. So when I experienced the same frustrations, I decided to build one, which is what I want to introduce here - FluentDataflow.

Introduction

FluentDataflow, in short, is a fluent style wrapper and extension of TPL Dataflow.

The only entry point of this library is the IDataflowFactory interface. Through the fluent-style API provided by a factory instance, the outputs of the factory are still instances of the standard IDataflowBlock interfaces, such as ITargetBlock, ISourceBlock and IPropagatorBlock, each of which usually wraps a simple or even a complex custom dataflow.

I can hear you shouting: “Show me the code!” So, let’s go!

Examples

  • Simple aggregation example:
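// requires: using System.Threading.Tasks.Dataflow; (for the TPL Dataflow block types)
// plus a reference to the FluentDataflow package (for DataflowFactory)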
var splitter = new TransformBlock<string, KeyValuePair<string, int>>(input =>
{
    string[] splitted = input.Split('=');
    return new KeyValuePair<string, int>(splitted[0], int.Parse(splitted[1]));
});

var dict = new Dictionary<string, int>();

var aggregator = new ActionBlock<KeyValuePair<string, int>>(pair =>
{
    int oldValue;
    dict[pair.Key] = dict.TryGetValue(pair.Key, out oldValue) ? oldValue + pair.Value : pair.Value;
});

// the created dataflow instance is also
// a strongly-typed standard dataflow block
var dataflow = new DataflowFactory().FromPropagator(splitter)
    .LinkToTarget(aggregator)
    .Create();

dataflow.Post("a=1");
dataflow.Post("b=2");
dataflow.Post("a=5");
dataflow.Complete();

Task.WaitAll(dataflow.Completion);

System.Console.WriteLine("sum(a) = {0}", dict["a"]); //prints sum(a) = 6
  • A dataflow as part of another dataflow example
var factory = new DataflowFactory();

var aggregator = ... // Build an aggregation dataflow as in the previous example; assume it fills a dictionary named result
var splitter = new TransformManyBlock<string, string>(line => line.Split(' '));

var dataflow = factory.FromPropagator<string, string>(splitter)
    .LinkToTarget(aggregator)
    .Create();

dataflow.Post("a=1 b=2 a=5");
dataflow.Post("c=6 b=8");
dataflow.Complete();

Task.WaitAll(dataflow.Completion);

System.Console.WriteLine("sum(b) = {0}", result["b"]); //prints sum(b) = 10
  • Broadcast example
var printer1 = new ActionBlock<string>(s => System.Console.WriteLine("Printer1: {0}", s));
var printer2 = new ActionBlock<string>(s => System.Console.WriteLine("Printer2: {0}", s));
var printer3 = new ActionBlock<string>(s => System.Console.WriteLine("Printer3: {0}", s));

var dataflow = new DataflowFactory().FromBroadcast<string>()
    .LinkTo(printer1)
    .LinkTo(printer2)
    .LinkTo(printer3)
    .Create();

dataflow.Post("first message");
dataflow.Post("second message");
dataflow.Post("third message");
dataflow.Complete();

Task.WaitAll(dataflow.Completion);
  • Multiple sources example
// by default, with native TPL Dataflow, if multiple sources link to the same target
// with PropagateCompletion=true, then as soon as one of the sources completes,
// the target completes too, and it will miss some of the messages
// from the other sources.

// BUT, with our dataflow factory here, when PropagateCompletion=true is set,
// dataflow.Complete() internally waits for all the sources to complete,
// so the target is guaranteed to receive all the messages from all the sources

var source1 = new BufferBlock<string>();
var source2 = new BufferBlock<string>();
var source3 = new BufferBlock<string>();
var printer = new ActionBlock<string>(s => System.Console.WriteLine(s));

var dataflow = new DataflowFactory().FromMultipleSources(source1, source2, source3)
    .LinkToTarget(printer)
    .Create();

for (var i = 0; i < 3; ++i)
{
    var s = i.ToString();
    source1.Post(s);
    source2.Post(s);
    source3.Post(s);
}

dataflow.Complete();

Task.WaitAll(dataflow.Completion);
  • Linking with filter example
// the filter only accepts even numbers,
// so odd numbers go to the declined printer
var filter = new Predicate<int>(i => i % 2 == 0);
var printer = new ActionBlock<int>(s => System.Console.WriteLine("printer: " + s.ToString()));
var declinedPrinter = new ActionBlock<int>(s => System.Console.WriteLine("declined: " + s.ToString()));
var inputBlock = new BufferBlock<int>();

var dataflow = new DataflowFactory().FromPropagator(inputBlock)
    .LinkToTarget(printer
        , filter
        // when linking with a filter, you have to specify a declined block;
        // otherwise, declined messages would remain stuck in the queue,
        // and the current block would never COMPLETE (waiting on its Completion would never return)
        , declinedPrinter)
    .Create();

for (int i = 0; i < 10; ++i)
{
    dataflow.Post(i);
}

dataflow.Complete();

Task.WaitAll(dataflow.Completion);
  • Join example
var source1 = new BufferBlock<string>();
var source2 = new BufferBlock<string>();
var printer = new ActionBlock<Tuple<string, string>>(s => System.Console.WriteLine("printer: {0},{1}", s.Item1, s.Item2));

var dataflow = new DataflowFactory().Join(source1, source2)
    .LinkToTarget(printer)
    .Create();

for (var i = 0; i < 3; ++i)
{
    var s = i.ToString();
    source1.Post(s);
    source2.Post(s);
}

dataflow.Complete();

Task.WaitAll(dataflow.Completion);
  • Batch example
var source = new BufferBlock<string>();
var printer = new ActionBlock<IEnumerable<string>>(s => System.Console.WriteLine("printer: " + string.Join("|", s)));

var dataflow = new DataflowFactory().FromSource(source)
    .Batch(2)
    .LinkToTarget(printer)
    .Create();

for (var i = 0; i < 6; ++i)
{
    var s = i.ToString();
    source.Post(s);
}

dataflow.Complete();

Task.WaitAll(dataflow.Completion);
  • If-else branching and merging example
var factory = new DataflowFactory();
// the ifFilter only accepts even numbers,
// so odd numbers go to elseBlock
var ifFilter = new Predicate<int>(i => i % 2 == 0);
var printer = new ActionBlock<int>(s => System.Console.WriteLine("printer: " + s.ToString()));
var inputBlock = new BufferBlock<int>();
// if it passes ifFilter, transform: i -> i * 10
var ifBlock = new TransformBlock<int, int>(i => i * 10);
// else, transform: i -> i * 100
var elseBlock = new TransformBlock<int, int>(i => i * 100);

var branchingDataflow = factory.FromPropagator(inputBlock)
    .LinkToPropagator(ifBlock, ifFilter, elseBlock)
    .Create();

var mergingDataflow = factory.FromMultipleSources(ifBlock, elseBlock)
    .LinkToTarget(printer)
    .Create();

//encapsulate branchingDataflow and mergingDataflow
var dataflow = factory.EncapsulateTargetDataflow(branchingDataflow, mergingDataflow);

for (int i = 0; i < 10; ++i)
{
    dataflow.Post(i);
}

dataflow.Complete();

Task.WaitAll(dataflow.Completion);

Elasticsearch Maintenance Lessons Learned Today

The Elasticsearch cluster was down!

Today I troubleshot an Elasticsearch-cluster-down issue. We have a 3-node Elasticsearch cluster receiving hundreds of gigabytes of tracking data every day. This afternoon, it suddenly went down and all our Kibana dashboards failed to load any data from it.

From the elasticsearch-kopf monitor, we could see that more than half of the shards were unallocated, so it sounded like at least 2 nodes had just been restarted for some reason. Because our cluster is configured so that each index has one primary and one replica, an index cannot be loaded until at least its primary shards are allocated. The shards were slowly being allocated automatically, so my understanding was that if I was patient enough and just waited a while, the cluster should recover by itself. So I tried to wait. After 10 minutes, some dashboards could display data, which looked good. But after 30 minutes, I could see from kopf that the heap of the master node kept increasing until it was eventually full, and the entire cluster became unresponsive again. I restarted the master node, but the heap still kept increasing until full, and the cluster went down again.

Was it because the shards were being reallocated too slowly?

Someone on my team was working on a Kibana dashboard, and since it suddenly stopped working, he reached out and asked if I could help do something to speed up the recovery. So I kept guessing: was it because the shards were being reallocated too slowly? I have a script which calls the Elasticsearch API to force shard reallocation. When I created the script, it was for fixing the very few shards that could not be reallocated by Elasticsearch itself. It looks like below:

IFS=$'\n'
for line in $(curl -s 'server_name/es/_cat/shards' | fgrep UNASSIGNED); do
  INDEX=$(echo "$line" | awk '{print $1}')
  SHARD=$(echo "$line" | awk '{print $2}')

  curl -XPOST 'server_name/es/_cluster/reroute' -d '{
     "commands": [
        {
            "allocate": {
                "index": "'$INDEX'",
                "shard": '$SHARD',
                "node": "ETELASTIC1",
                "allow_primary": true
          }
        }
    ]
  }'

  # try again since the first try might fail if the target node already has a copy of the shard
  curl -XPOST 'server_name/es/_cluster/reroute' -d '{
     "commands": [
        {
            "allocate": {
                "index": "'$INDEX'",
                "shard": '$SHARD',
                "node": "ETELASTIC2",
                "allow_primary": true
          }
        }
    ]
  }'

  sleep 2

done

I restarted the master node and executed the script, and it kept running; it would take some time, because there were about 10K unallocated shards. The reallocation became faster, but no luck: before it finished, the heap became full again. So it seemed the guess was not correct.

What caused the heap to increase then?

I checked the Elasticsearch log, and it showed only a few small, cheap reads during the cluster-down and recovery time. I checked the nodes breaker API, and the fielddata cache was not big either. So what else might cost Elasticsearch memory? If not reads, then maybe writes?

Luckily, we have centralized Logstash servers for parsing all the different types of tracking logs and feeding them to the Elasticsearch servers, so I simply stopped all of them. I restarted the master node again, waited, and eventually the Elasticsearch cluster recovered without any heap issue. Great! There it is. The cause should be that we configured our Logstash jobs to always retry connecting to the Elasticsearch servers on connection timeout or no response; since we have many Logstash jobs trying to reconnect, they cost too much Elasticsearch server-side heap memory.

Let’s try some Kibana dashboards. WAIT! Why did some of the dashboards return 404? It was impossible that people had deleted them during my troubleshooting; we rarely delete dashboards. It made me nervous, because it looked like the reallocation of Elasticsearch shards might cause data loss. WTF! If that were true, how could we trust Elasticsearch anymore?

Always a human’s fault!

OK, ask Google. And, found the reason. As usual, it was a human’s fault - I mean, my fault, not Elasticsearch’s. It was because of the “allow_primary=true” option in my “force reallocation” script. Basically, when all the shards of an index are still unallocated, if you reroute with the “allow_primary=true” option, some shards might come back empty, and you LOSE data! Luckily, it was some shards of the Kibana metadata index that were lost, which is why I noticed the issue; if only some tracking indices had lost data, it would have been much harder for me to become aware of it.

But I was lucky enough, because:

  1. I have a daily backup of the Kibana metadata index, made with elasticdump, and there are not many dashboard changes per day;
  2. We keep the latest 3 days of raw data for all the tracking logs, so I could easily re-index them;

I restored the Kibana metadata index from the backup. And finally, all the dashboards were back!

Summary

Several lessons were learned here:

  1. When many Elasticsearch cluster nodes are restarted, it is better to temporarily stop all connection attempts, to avoid a heap spike;
  2. Avoid setting allow_primary=true when rerouting shards via the API;
  3. Don’t forget backups! They could save you some day!

RabbitMQ Exchange and Queue Design Trade-off

In a previous post, I mentioned the discussion on StackOverflow regarding designing exchanges. Usually, when people ask about best practices for designing exchanges, you will get some useful suggestions, but finally you will be told that the final solution always “depends on your system needs”. A fair enough and safe enough answer, but not a really helpful one. So I want to extend this discussion a little bit here with more detailed background.

Before discussing any solutions, let me first suggest some rules. No matter which solution you choose, these rules should hold in most cases and should be followed unless there are compelling reasons not to.

Rule 1 Each queue should represent only one type of job. Do not mix different types of messages in one queue. When this rule is followed, we can clearly name a queue after the job it represents.

Rule 2 Avoid message re-dispatching on a queue. If you find that your subscriber re-dispatches any messages to other places without real processing, there is probably something wrong. Routing and dispatching are the responsibility of exchanges, not queues.

Rule 3 When not using the global default exchange, publishers should know nothing about queues. One of the essences of the AMQP design is the separation of responsibility between exchange and queue: publishers don’t need to care about message consumption, and consumers don’t need to care about where the messages come from at all. So when designing exchanges, the concepts related only to publishing, such as routing keys and message headers, should carry no knowledge of the queues the messages will finally be forwarded to.

Now let’s focus on the examples. The scenario is that you want to design the exchanges and queues for “user”-related write events. The write events will be triggered in one or many apps, and the messages are to be consumed by some other apps.

| object | event   |
|--------|---------|
| user   | created |
| user   | updated |
| user   | deleted |

The first question people usually ask is: for the different events of one object (the “user” object in this example), should we use one exchange for publishing all 3 events, or 3 separate exchanges, one for each event? Or, in short: one exchange, or many?

Before answering this question, I actually want to ask another one: do we really even need a custom exchange for this case?

Different types of object events map so naturally to different types of messages to be published, but doing so is not always necessary. What if we abstract all 3 types of events as one “write” event, whose sub-types are “created”, “updated” and “deleted”?

| object | event | sub-type |
|--------|-------|----------|
| user   | write | created  |
| user   | write | updated  |
| user   | write | deleted  |

Solution 1 The simplest solution to support this is to design only a “user.write” queue and publish all user write event messages directly to this queue via the global default exchange (a minimal sketch follows the table below). When publishing to a queue directly, the biggest limitation is the assumption that only one app subscribes to this type of message. Multiple instances of that one app subscribing to the queue are also fine.

| queue      | app  |
|------------|------|
| user.write | app1 |
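
As a minimal sketch with the RabbitMQ.Client .NET library (assuming a local broker; the payload is illustrative), publishing “directly to a queue” really means publishing to the default exchange with the queue name as the routing key:

using System.Text;
using RabbitMQ.Client;

var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// the global default exchange ("") is implicitly bound to every queue,
// with the queue name as the binding key
channel.QueueDeclare("user.write", durable: true, exclusive: false, autoDelete: false);

var body = Encoding.UTF8.GetBytes("user.created: id=42");
channel.BasicPublish(exchange: "", routingKey: "user.write", basicProperties: null, body: body);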

Solution 2 The simplest solution no longer works when a second app (with different processing logic) wants to subscribe to the messages published to the queue. When multiple apps subscribe, we need at least one “fanout”-type exchange with bindings to multiple queues, so that messages are published to the exchange, and the exchange duplicates the messages to each of the queues. Each queue represents the processing job of a different app (see the sketch after the two tables below).

| queue           | subscriber |
|-----------------|------------|
| user.write.app1 | app1       |
| user.write.app2 | app2       |

| exchange   | type   | binding_queue   |
|------------|--------|-----------------|
| user.write | fanout | user.write.app1 |
| user.write | fanout | user.write.app2 |
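
Continuing the same illustrative sketch, the fanout setup declares the exchange and one queue per subscribing app; a fanout exchange ignores binding keys, so every bound queue receives a copy of every message:

channel.ExchangeDeclare("user.write", ExchangeType.Fanout, durable: true);
channel.QueueDeclare("user.write.app1", durable: true, exclusive: false, autoDelete: false);
channel.QueueDeclare("user.write.app2", durable: true, exclusive: false, autoDelete: false);

// binding keys are irrelevant for fanout exchanges
channel.QueueBind("user.write.app1", "user.write", routingKey: "");
channel.QueueBind("user.write.app2", "user.write", routingKey: "");

// both user.write.app1 and user.write.app2 receive this message
channel.BasicPublish(exchange: "user.write", routingKey: "", basicProperties: null, body: body);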

This second solution works fine if each subscriber cares about and wants to handle all the sub-types of “user.write” events, or if at least exposing all of these sub-type events to each subscriber is not a problem. This is the case, for instance, if the subscriber app simply keeps a transaction log; or if, although a subscriber handles only user.created, it is OK for it to know when user.updated or user.deleted happens. It becomes less elegant when some subscribers are external to your organization, and you only want to notify them about specific sub-type events. For instance, app2 might only want to handle user.created and should have no knowledge of user.updated or user.deleted at all.

Solution 3 To solve the issue above, we have to extract the “user.created” concept from “user.write”. The “topic” exchange type can help. When publishing the messages, let’s use user.created/user.updated/user.deleted as routing keys, so that we can set the binding key of the “user.write.app1” queue to “user.*” and the binding key of the “user.created.app2” queue to “user.created” (a sketch follows the tables below).

| queue             | subscriber |
|-------------------|------------|
| user.write.app1   | app1       |
| user.created.app2 | app2       |

| exchange   | type  | binding_queue     | binding_key  |
|------------|-------|-------------------|--------------|
| user.write | topic | user.write.app1   | user.*       |
| user.write | topic | user.created.app2 | user.created |
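
In the same illustrative sketch (assuming a fresh setup, since redeclaring an existing exchange with a different type fails), the topic variant looks like this:

channel.ExchangeDeclare("user.write", ExchangeType.Topic, durable: true);
channel.QueueDeclare("user.write.app1", durable: true, exclusive: false, autoDelete: false);
channel.QueueDeclare("user.created.app2", durable: true, exclusive: false, autoDelete: false);

// app1 sees all user.* events; app2 sees only user.created
channel.QueueBind("user.write.app1", "user.write", routingKey: "user.*");
channel.QueueBind("user.created.app2", "user.write", routingKey: "user.created");

// the message's routing key decides which bindings match
channel.BasicPublish(exchange: "user.write", routingKey: "user.created", basicProperties: null, body: body);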

Solution 4 The “topic” exchange type is more flexible in case there are potentially more event sub-types in the future. But if you clearly know the exact set of events, you could also use the “direct” exchange type instead for better performance (see the sketch after the tables below).

| queue             | subscriber |
|-------------------|------------|
| user.write.app1   | app1       |
| user.created.app2 | app2       |

| exchange   | type   | binding_queue     | binding_key  |
|------------|--------|-------------------|--------------|
| user.write | direct | user.write.app1   | user.created |
| user.write | direct | user.write.app1   | user.updated |
| user.write | direct | user.write.app1   | user.deleted |
| user.write | direct | user.created.app2 | user.created |
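
In sketch form (again assuming a fresh setup), the direct variant needs one explicit binding per concrete sub-type a queue wants to receive:

channel.ExchangeDeclare("user.write", ExchangeType.Direct, durable: true);

// app1 wants all three sub-types, so it needs three bindings
channel.QueueBind("user.write.app1", "user.write", routingKey: "user.created");
channel.QueueBind("user.write.app1", "user.write", routingKey: "user.updated");
channel.QueueBind("user.write.app1", "user.write", routingKey: "user.deleted");
channel.QueueBind("user.created.app2", "user.write", routingKey: "user.created");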

Coming back to the “one exchange, or many?” question: so far, all the solutions use only one exchange, and that works fine; nothing is wrong with it. So when might we need multiple exchanges? This link summarizes some valuable ideas about performance and scalability considerations. If the performance cost of too many bindings on a “topic” exchange really becomes an issue, you can of course use more “direct” exchanges to reduce the number of “topic” exchange bindings. But here I want to focus more on the functional limitations of “one exchange” solutions.

Solution 5 One case in which we might naturally consider multiple exchanges is for different groups or dimensions of events. For instance, besides the created, updated and deleted events mentioned above, suppose we have another group of events: login and logout - a group of events describing “user behaviors” rather than “data writes”. Since different groups of events might need completely different routing strategies and routing key and queue naming conventions, it is only natural to have a separate user.behavior exchange.

| queue              | subscriber |
|--------------------|------------|
| user.write.app1    | app1       |
| user.created.app2  | app2       |
| user.behavior.app3 | app3       |

| exchange      | type  | binding_queue      | binding_key  |
|---------------|-------|--------------------|--------------|
| user.write    | topic | user.write.app1    | user.*       |
| user.write    | topic | user.created.app2  | user.created |
| user.behavior | topic | user.behavior.app3 | user.*       |

Other Solutions There are other cases in which we might need multiple exchanges for one object type. For instance, you might want to set different permissions on exchanges (e.g. only selected events of one object type may be published to one exchange from external apps, while the other exchange accepts any events from internal apps). Or you might want to use different exchanges, suffixed with a version number, to support different versions of routing strategies for the same group of events. Or, yet again, you might want to define some “internal exchanges” for exchange-to-exchange bindings, which can manage routing rules in a layered way.

In summary, still, “the final solution depends on your system needs”, but with all the solution examples above, and with the background considerations, as said in the performance and scalability considerations link, I hope this can at least get you thinking in the right direction.

Understanding RabbitMQ Exchange and Queue

In a previous post, I compared the design philosophies of ActiveMQ (JMS) and RabbitMQ (AMQP). RabbitMQ has the beauty of simplicity in its design, with its 5 exchange types. In this post, I want to clarify the understanding of RabbitMQ’s exchange and queue and their relationship - the binding.

In brief, exchanges are the only places to which messages can be published, while queues are the only places from which messages can be consumed. The configurations describing how messages are routed from exchanges to queues are called bindings.

Imagine a message as a piece of mail; then an exchange is like a post office, and a queue is like a mailbox in front of someone’s house. Different exchange types are like different types of mail, which are delivered in different ways. For instance, if a school wants to send the same notification letter to every student’s mailbox (a kind of broadcast), that is the “fanout” exchange type; if a letter should be delivered only to one specific student’s mailbox, that is the “direct” exchange type, and the key of the binding between the exchange and the queue should be the address of the student’s mailbox, which the post office understands. The letter sent to the post office should have the same address written on it; this address is the “routing key” of the message.

In RabbitMQ, it is not possible for a publisher to send a message to a queue directly. Even when you send a message without specifying an exchange, the message is actually sent to the global default exchange, which has a binding to each queue with the queue name as the binding key.

Messages can only be consumed from queues; consuming directly from exchanges is not allowed. An exchange has no storage, so if you send a message to an exchange while no queues are bound to it, or no bindings match the routing key of the message, the message is thrown away immediately.
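
One way to observe this with the RabbitMQ.Client .NET library is to publish with the mandatory flag set and listen for returned messages (a minimal sketch assuming a local broker; the exchange name is illustrative):

using System;
using System.Text;
using RabbitMQ.Client;

var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// an exchange with no bindings at all
channel.ExchangeDeclare("demo.nobindings", ExchangeType.Fanout);

// with mandatory: true, the broker returns unroutable messages
// instead of silently dropping them
channel.BasicReturn += (sender, args) =>
    Console.WriteLine("Returned: {0} ({1})", args.ReplyText, args.RoutingKey);

channel.BasicPublish(exchange: "demo.nobindings", routingKey: "any",
    mandatory: true, basicProperties: null,
    body: Encoding.UTF8.GetBytes("hello"));

// give the broker a moment to deliver the BasicReturn event
Console.ReadLine();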

About designing exchanges, there is an interesting discussion on StackOverflow about modeling a classic scenario: one exchange, or many? Which one is better? My opinion is similar to derick’s: it really depends on your system needs. RabbitMQ’s exchange types are simple, but flexible enough to support different models of the same scenario, and there are always trade-offs with each one. But no matter whether you choose one exchange or many, I agree with Jason Lotito’s comments there about routing keys and the naming convention of queues: “Routing keys are elements of the message, and the queues shouldn’t care about that (he means the naming of queues should not depend on the routing keys of published messages). That’s what bindings are for.”, and “Queue names should be named after what the consumer attached to the queue will do. What is the intent of the operation of this queue.”

Behind RabbitMQ Exchange Types

Before touching RabbitMQ, I had used MSMQ and ZeroMQ a lot in real projects. I had also played with Apache ActiveMQ together with Camel a little bit (though not in real projects) when evaluating different ESB solutions.

Like everyone, my first glance at RabbitMQ was through the Get Started tutorials, and my feeling was that the RabbitMQ design philosophy is quite like that of ZeroMQ - a simple and consistent interface with super flexible configuration. Among all of the configuration options, the most interesting part is the “exchange types”. You might want to say that the concept of “exchange types” comes from the AMQP protocol and was not invented by RabbitMQ. Yes, you are right. But there is no doubt that RabbitMQ is one of the most popular AMQP implementations.

The 6 examples in the Get Started tutorials show the flexibility of RabbitMQ with its consistent and simple publish-and-receive interface. And I believe you can already imagine many more ways to use them to match many other common scenarios.

But what is the underlying philosophy behind “exchange types”? In a word, it is all about implementing integration patterns in a keep-it-simple-stupid manner.

If you have ever played with Apache ActiveMQ and Camel, you must be familiar with the Enterprise Integration Patterns; the examples of Apache Camel match all of the integration patterns exactly. The design philosophy of ActiveMQ is to provide the basic queue and pub/sub abilities and, together with Camel, to give ultimate flexibility for implementing all the integration patterns.

The design philosophies of ZeroMQ and RabbitMQ are quite different. Ultimate flexibility is cool, but performance and simplicity are also important. ZeroMQ chooses ultimate performance and simplicity over richer integration features, while RabbitMQ sits somewhere between ActiveMQ and ZeroMQ. Its interface is as simple as ZeroMQ’s; its performance is not as extreme as ZeroMQ’s, but good enough; and it provides minimal but elegant configuration options that support most of the common integration patterns. One of the essences of its design is exactly the “exchange types”, which abstract the most common message routing and consuming scenarios with only 5 easy-to-understand types: default (no routing), direct, fanout (broadcast), topic and headers.

Modeling and programming is the art of abstracting complexity. A truly elegant design must be simple, stupid! That is what lies behind the idea of “exchange types”.