
Elasticsearch Maintenance Lessons Learned Today

The Elasticsearch cluster was down!

Today I troubleshot an Elasticsearch-cluster-down issue. We have a 3-node Elasticsearch cluster receiving hundreds of gigabytes of tracking data every day. This afternoon, it suddenly went down and all our Kibana dashboards failed to load any data from it.

From the elasticsearch-kopf monitor, we could see that more than half of the shards were unassigned, so it looked like at least 2 nodes had just been restarted for some reason. Because our cluster is configured with one primary and one replica per index, an index cannot be loaded until at least its primary shards are allocated. The shards were slowly being allocated automatically, so my understanding was that if I were patient enough and just waited a while, the cluster should recover by itself. So I tried to wait. After 10 minutes, some dashboards could display data, which looked good. But after 30 minutes, I could see from kopf that the heap of the master node kept increasing and eventually filled up, and the entire cluster became unresponsive again. I restarted the master node, but the heap still kept increasing, filled up, and the cluster went down again.
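
As an aside, the recovery progress can also be watched without kopf; a minimal Python sketch against the cluster health API (the host, port and polling interval below are hypothetical, not what we actually ran):

import time
import requests

# Poll the overall cluster status and the number of unassigned shards.
while True:
    health = requests.get("http://server_name:9200/_cluster/health").json()
    print(health["status"], "unassigned shards:", health["unassigned_shards"])
    if health["status"] == "green":
        break
    time.sleep(30)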

Was it because the shards were being reallocated too slowly?

Someone on my team was working on a Kibana dashboard, and since it suddenly stopped working, he reached out and asked if I could do something to speed up the recovery. My guess: was it because the shards were being reallocated too slowly? I have a script which calls the Elasticsearch API to force shard reallocation. When I created the script, it was for fixing the occasional few shards which could not be reallocated by Elasticsearch itself. It looks like this:

#!/bin/bash
# Force-allocate every UNASSIGNED shard, trying each of our two data nodes.
IFS=$'\n'
for line in $(curl -s 'server_name/es/_cat/shards' | fgrep UNASSIGNED); do
  INDEX=$(echo "$line" | awk '{print $1}')
  SHARD=$(echo "$line" | awk '{print $2}')

  curl -XPOST 'server_name/es/_cluster/reroute' -d '{
    "commands": [
      {
        "allocate": {
          "index": "'$INDEX'",
          "shard": '$SHARD',
          "node": "ETELASTIC1",
          "allow_primary": true
        }
      }
    ]
  }'

  # Try again with the other node, since the first try may fail if the
  # target node already has a copy of the shard.
  curl -XPOST 'server_name/es/_cluster/reroute' -d '{
    "commands": [
      {
        "allocate": {
          "index": "'$INDEX'",
          "shard": '$SHARD',
          "node": "ETELASTIC2",
          "allow_primary": true
        }
      }
    ]
  }'

  sleep 2
done

I restarted the master node and executed the script, and it kept running; it would take some time because there were about 10K unassigned shards. The reallocation became faster, but no luck: before it finished, the heap was full again. So it seemed my guess was not correct.

What was causing the heap to grow then?

I checked the Elasticsearch logs, and they showed only a few small, cheap reads during the downtime and recovery. I also checked the nodes breaker API, and the fielddata cache was not big either. So what else might be costing Elasticsearch memory? If not reads, then maybe writes?
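
For reference, this is roughly the check I did, as a minimal Python sketch against the nodes stats breaker API (the host below is hypothetical; field names are from the 1.x-era API):

import requests

# Print each node's circuit breaker usage (fielddata, request, parent) to see
# what is actually holding heap memory.
stats = requests.get("http://server_name:9200/_nodes/stats/breaker").json()
for node_id, node in stats["nodes"].items():
    for breaker_name, breaker in node["breakers"].items():
        print(node["name"], breaker_name, breaker["estimated_size"], "/", breaker["limit_size"])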

Luckily we have centralized Logstash servers parsing all the different types of tracking logs and feeding them to the Elasticsearch servers, so I simply stopped all of them. I restarted the master node again, waited, and eventually the Elasticsearch cluster recovered without any heap issue. Great, there it is. So the cause should be that our Logstash jobs are configured to keep retrying the connection to the Elasticsearch servers whenever a connection times out or gets no response, and since we have many Logstash jobs retrying at once, they cost too much Elasticsearch server-side heap memory.

Let's try some Kibana dashboards. Wait! Why do some of the dashboards return 404? It is impossible that people deleted them during my troubleshooting; we rarely delete dashboards. It made me nervous, because it looked like reallocating Elasticsearch shards might cause data loss. WTF! If that were true, how could we trust Elasticsearch anymore?

Always a human's fault!

OK, ask Google. And I found the reason. Usually, it is a human's fault; I mean my fault, not Elasticsearch's. It is because of the "allow_primary=true" option in my "force reallocation" script. Basically, when all the shards of an index are still unallocated and you reroute with "allow_primary=true", some shards can be allocated as empty primaries, and you LOSE data! Luckily, it was some shards of the Kibana metadata index that were lost, which is why I noticed the issue; if only some tracking indices had lost data, it would have been much harder for me to become aware of it.

But I was lucky enough, because:

  1. I have a daily backup of the Kibana metadata index made with elasticdump, and there are not many dashboard changes per day;
  2. We keep the latest 3 days of raw data for all the tracking logs, so I could easily re-index them.

I restored the Kibana metadata index from the backup, and finally all the dashboards were back!

Summary

Several lessons were learned here:

  1. When many Elasticsearch cluster nodes are restarted, temporarily stop all client connection attempts to avoid a heap spike;
  2. Avoid setting allow_primary=true when rerouting shards via the API;
  3. Don't forget backups! They could save you some day!

RabbitMQ Exchange and Queue Design Trade-off

In a previous post, I mentioned the discussion on StackOverflow about designing exchanges. Usually, when people ask about best practices for designing exchanges, they get some useful suggestions, but in the end they are told that the final solution always "depends on your system needs". A fair and safe answer, but not a really helpful one. So I want to extend that discussion a little here with more detailed background.

Before discussing any solutions, let me suggest some rules first. No matter which solution you choose, these rules hold in most cases and should be followed unless there is a compelling reason not to.

Rule 1
Each queue should represent only one type of job. Do not mix different types of messages in one queue. When this rule is followed, we can clearly name a queue after the job it represents.

Rule 2
Avoid message re-dispatching on a queue. If you find your subscriber re-dispatching messages to other places without any real processing, something is probably wrong. Routing and dispatching are the responsibility of exchanges, not queues.

Rule 3
When not using the global default exchange, publishers should know nothing about queues. One of the essences of the AMQP design is the separation of responsibilities between exchange and queue, so that publishers don't need to care about message consumption, and consumers don't need to care about where the messages come from at all. So when designing exchanges, the concepts related only to publishing, such as routing keys and message headers, should not carry any knowledge of the queues the messages will finally be forwarded to.

Now let's focus on the examples. The scenario: you want to design exchanges and queues for "user"-related write events. The write events will be triggered in one or many apps, and the messages are to be consumed by some other apps.

| object | event   |
|--------|---------|
| user   | created |
| user   | updated |
| user   | deleted |

The first question people usually ask is: for the different events of one object (the "user" object in this example), should we use one exchange to publish all 3 events, or 3 separate exchanges, one per event? In short: one exchange, or many?

Before answering this question, I actually want to ask another one: do we really even need a custom exchange for this case?

It feels natural to map different types of object events to different types of published messages, but sometimes it is not really necessary. What if we abstract all 3 types of events as one "write" event, whose sub-types are "created", "updated" and "deleted"?

| object | event | sub-type |
|--------|-------|----------|
| user   | write | created  |
| user   | write | updated  |
| user   | write | deleted  |

Solution 1
The simplest solution is to design only a "user.write" queue and publish all user write event messages directly to this queue via the global default exchange. When publishing to a queue directly, the biggest limitation is that it assumes only one app subscribes to this type of message (multiple instances of that one app subscribing to the queue are still fine). A minimal publishing sketch follows the table.

| queue      | app  |
|------------|------|
| user.write | app1 |
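
A minimal sketch of Solution 1 using the Python pika client (my choice of client; the host and message body are hypothetical). Publishing with an empty exchange name goes through the global default exchange, which routes by queue name:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# One queue, named after the job it represents.
channel.queue_declare(queue="user.write", durable=True)

# An empty exchange name means the global default exchange;
# the routing key is simply the queue name.
channel.basic_publish(exchange="",
                      routing_key="user.write",
                      body=b'{"event": "user.created", "id": 42}')
connection.close()

app1 (or several instances of it) would then simply consume from the "user.write" queue.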

Solution 2
The simplest solution no longer works when a second app (with different processing logic) wants to subscribe to the messages published to the queue. When multiple apps subscribe, we need at least one "fanout" exchange with bindings to multiple queues, so that messages are published to the exchange and the exchange duplicates them to each of the queues. Each queue then represents the processing job of one app. A sketch follows the tables.

| queue           | subscriber |
|-----------------|------------|
| user.write.app1 | app1       |
| user.write.app2 | app2       |

| exchange   | type   | binding_queue   |
|------------|--------|-----------------|
| user.write | fanout | user.write.app1 |
| user.write | fanout | user.write.app2 |
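
A minimal pika sketch of Solution 2 (hypothetical host and message body): one fanout exchange duplicating every message into one queue per subscribing app:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

channel.exchange_declare(exchange="user.write", exchange_type="fanout", durable=True)

# One queue per subscribing app, each bound to the fanout exchange.
for queue in ("user.write.app1", "user.write.app2"):
    channel.queue_declare(queue=queue, durable=True)
    channel.queue_bind(queue=queue, exchange="user.write")  # fanout ignores routing keys

# Every published message is duplicated into both queues.
channel.basic_publish(exchange="user.write", routing_key="",
                      body=b'{"event": "user.updated", "id": 42}')
connection.close()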

This second solution works fine if every subscriber cares about and wants to handle all the sub-types of "user.write" events, or at least exposing all the sub-type events to every subscriber is not a problem. For instance, the subscriber app might simply keep a transaction log; or, although a subscriber handles only user.created, it may be OK for it to know when user.updated or user.deleted happens. It becomes less elegant when some subscribers are external to your organization and you only want to notify them about specific sub-type events; for instance, if app2 only wants to handle user.created, it should have no knowledge of user.updated or user.deleted at all.

Solution 3
To solve the issue above, we have to extract the "user.created" concept from "user.write". The "topic" exchange type can help. When publishing the messages, let's use user.created / user.updated / user.deleted as routing keys, so that we can set the binding key of the "user.write.app1" queue to "user.*" and the binding key of the "user.created.app2" queue to "user.created". A sketch follows the tables.

| queue             | subscriber |
|-------------------|------------|
| user.write.app1   | app1       |
| user.created.app2 | app2       |

| exchange   | type  | binding_queue     | binding_key  |
|------------|-------|-------------------|--------------|
| user.write | topic | user.write.app1   | user.*       |
| user.write | topic | user.created.app2 | user.created |
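
A minimal pika sketch of Solution 3 (hypothetical host and bodies), wiring up exactly the topology in the tables above:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

channel.exchange_declare(exchange="user.write", exchange_type="topic", durable=True)

# app1 still receives every user write event via the wildcard binding key.
channel.queue_declare(queue="user.write.app1", durable=True)
channel.queue_bind(queue="user.write.app1", exchange="user.write", routing_key="user.*")

# app2 only receives user.created events.
channel.queue_declare(queue="user.created.app2", durable=True)
channel.queue_bind(queue="user.created.app2", exchange="user.write", routing_key="user.created")

# Routed to both queues:
channel.basic_publish(exchange="user.write", routing_key="user.created", body=b"...")
# Routed to user.write.app1 only:
channel.basic_publish(exchange="user.write", routing_key="user.deleted", body=b"...")
connection.close()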

Solution 4
The "topic" exchange type is more flexible in case more event sub-types are added later. But if you clearly know the exact set of events, you could also use the "direct" exchange type instead for better performance, as sketched after the tables.

| queue             | subscriber |
|-------------------|------------|
| user.write.app1   | app1       |
| user.created.app2 | app2       |

| exchange   | type   | binding_queue     | binding_key  |
|------------|--------|-------------------|--------------|
| user.write | direct | user.write.app1   | user.created |
| user.write | direct | user.write.app1   | user.updated |
| user.write | direct | user.write.app1   | user.deleted |
| user.write | direct | user.created.app2 | user.created |
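
The direct-exchange variant is only a small change from Solution 3; a minimal sketch (hypothetical host), where app1's queue simply gets one exact-match binding per event key:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

channel.exchange_declare(exchange="user.write", exchange_type="direct", durable=True)

channel.queue_declare(queue="user.write.app1", durable=True)
channel.queue_declare(queue="user.created.app2", durable=True)

# A direct exchange has no wildcards, so app1 binds once per event key it cares about.
for key in ("user.created", "user.updated", "user.deleted"):
    channel.queue_bind(queue="user.write.app1", exchange="user.write", routing_key=key)
channel.queue_bind(queue="user.created.app2", exchange="user.write", routing_key="user.created")
connection.close()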

Coming back to the "one exchange, or many?" question: so far, all the solutions use only one exchange, and that works fine. So when might we need multiple exchanges? This link summarizes some valuable ideas about performance and scalability considerations. If the overhead of too many bindings on a "topic" exchange really becomes an issue, you could of course use more "direct" exchanges to reduce the number of "topic" exchange bindings and improve performance. But here I want to focus more on the functional limitations of "one exchange" solutions.

Solution 5
One case where we might naturally consider multiple exchanges is different groups or dimensions of events. For instance, besides the created, updated and deleted events mentioned above, suppose we have another group of events: login and logout, a group describing "user behaviors" rather than "data writes". Because a different group of events might need completely different routing strategies and routing key and queue naming conventions, it is natural to have a separate user.behavior exchange.

| queue              | subscriber |
|--------------------|------------|
| user.write.app1    | app1       |
| user.created.app2  | app2       |
| user.behavior.app3 | app3       |

| exchange      | type  | binding_queue      | binding_key  |
|---------------|-------|--------------------|--------------|
| user.write    | topic | user.write.app1    | user.*       |
| user.write    | topic | user.created.app2  | user.created |
| user.behavior | topic | user.behavior.app3 | user.*       |

Other Solutions
There are other cases where we might need multiple exchanges for one object type. For instance, you might want to set different permissions on exchanges (e.g. only selected events of one object type may be published to one exchange from external apps, while another exchange accepts any events from internal apps). Or you might want to use different exchanges, suffixed with a version number, to support different versions of the routing strategy for the same group of events. Or you might want to define some "internal exchanges" for exchange-to-exchange bindings, which let you manage routing rules in a layered way (sketched below).
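
As an illustration of that last idea, a minimal pika sketch of an exchange-to-exchange binding (the exchange names are hypothetical): an internal exchange forwards only selected events to a public-facing one, where external apps can bind their own queues:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

channel.exchange_declare(exchange="user.write.internal", exchange_type="topic", durable=True)
channel.exchange_declare(exchange="user.write.public", exchange_type="topic", durable=True)

# Exchange-to-exchange binding (a RabbitMQ extension to AMQP): only user.created
# events published to the internal exchange are forwarded to the public exchange.
channel.exchange_bind(destination="user.write.public",
                      source="user.write.internal",
                      routing_key="user.created")
connection.close()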

In summary, yes, "the final solution depends on your system needs", but with all the solution examples above and their background considerations, I hope, as the performance and scalability link puts it, this can at least get one thinking in the right direction.

Understanding RabbitMQ Exchange and Queue

In a previous post, I compared the design philosophies of ActiveMQ (JMS) and RabbitMQ (AMQP). RabbitMQ's design has the beauty of simplicity, with 5 exchange types. In this post, I want to clarify the understanding of RabbitMQ's exchange and queue and the relationship between them: binding.

In brief, exchanges are the only places messages can be published to, while queues are the only places messages can be consumed from. The configurations describing how messages are routed from exchanges to queues are called bindings.

Imagine a message as a piece of mail; then an exchange is like a post office, and a queue is like a mailbox in front of someone's house. Different exchange types are like different kinds of mail that are delivered in different ways. For instance, if a school wants to send the same notification to every student's mailbox (a kind of broadcast), that is the "fanout" exchange type; if a letter should be delivered to only one specific student's mailbox, that is the "direct" exchange type, and the key of the binding between the exchange and the queue should be the address of the student's mailbox, which the post office can understand. The letter handed to the post office should have the same address written on it; this address is the "routing key" of the message.

In RabbitMQ, a publisher cannot send a message to a queue directly. Even when you send a message without specifying an exchange, the message is actually sent to the global default exchange, which has a binding to every queue with the queue name as the binding key.

Messages can only be consumed from queues; consuming from an exchange directly is not allowed. An exchange has no storage, so if you send a message to an exchange while no queue is bound to it, or no binding matches the message's routing key, the message is thrown away immediately.

About designing exchanges, there is an interesting discussion on StackOverflow about modeling a classic scenario: one exchange, or many? Which is better? My opinion is similar to derick's: it really depends on your system needs. RabbitMQ's exchange types are simple, yet flexible enough to support different models of the same scenario, and there are always trade-offs with each one. But whether you choose one exchange or many, I agree with Jason Lotito's comments there about routing keys and queue naming conventions: "Routing keys are elements of the message, and the queues shouldn't care about that (he means queue names should not depend on the routing keys of published messages). That's what bindings are for." and "Queue names should be named after what the consumer attached to the queue will do. What is the intent of the operation of this queue."

Behind RabbitMQ Exchange Types

Before touching RabbitMQ, I had used MSMQ and ZeroMQ a lot in real projects. I had also played with Apache ActiveMQ together with Camel a little (though not in real projects) when evaluating different ESB solutions.

Like everyone, my first glance at RabbitMQ was the Get Started tutorials, and my feeling about its design philosophy was that it is quite like ZeroMQ's: a simple, consistent interface with super flexible configuration. Among all the configuration options, the most interesting part is the "exchange types". You might point out that the concept of "exchange types" comes from the AMQP protocol and was not invented by RabbitMQ. Yes, you are right. But there is no doubt that RabbitMQ is one of the most popular AMQP implementations.

The 6 examples in the Get Started tutorials show the flexibility of RabbitMQ through its consistent and simple publish and receive interface, and I believe you can already imagine many more ways to use them to match other common scenarios.

But what is the underlying philosophy behind "exchange types"? In a word, it is all about implementing integration patterns in a keep-it-simple-stupid manner.

If you have ever played with Apache ActiveMQ and Camel, you must be familiar with the Enterprise Integration Patterns; the Apache Camel examples even map exactly onto those patterns. The design philosophy of ActiveMQ is to provide the basic queue and pub/sub abilities and, together with Camel, to give ultimate flexibility for implementing all the integration patterns.

The design philosophies of ZeroMQ and RabbitMQ are quite different. Ultimate flexibility is cool, but performance and simplicity also matter. ZeroMQ chooses ultimate performance and simplicity over richer integration features, while RabbitMQ sits somewhere between ActiveMQ and ZeroMQ: its interface is as simple as ZeroMQ's; its performance is not as extreme as ZeroMQ's, but good enough; and it provides minimal but elegant configuration options that support most of the common integration patterns. One of the essences of its design is exactly the "exchange types", which abstract the most common message routing and consuming scenarios into only 5 easy-to-understand types: default (no routing), direct, fanout (broadcast), topic and headers.

Modeling and programming are the art of abstracting complexity. A truly elegant design must be simple, stupid! That is what lies behind the idea of "exchange types".

Integrate PrettyPhoto with jekyll-picture-tag plugin

In a previous post, I talked about installing the jekyll-picture-tag plugin to display pictures responsively in posts. It works very well across browsers and resolutions. But sometimes we might want a photo gallery: instead of displaying full-size photos, we display thumbnails and, on click, pop up a lightbox-style slideshow window showing the full-size photos.

So in this post, I want to talk about how I integrated PrettyPhoto with the jekyll-picture-tag plugin. The goal is to make no change to the jekyll-picture-tag plugin itself and to keep the same jekyll-picture-tag liquid tag format, only adding the PrettyPhoto effect through additional attributes when necessary.

A live example is already in the previous post: when an image is clicked, the PrettyPhoto slideshow window pops up.

The only additional attribute we add to the jekyll-picture-tag liquid tag is "group". You can see sample liquid tags below. Images marked with the same group value will be displayed together in the same PrettyPhoto slideshow window. So easy!

{% picture 2015-1-20-1.jpg alt="picture 1" group="1" %}
{% picture 2015-1-20-2.jpg alt="picture 2" group="1" %}

Besides the additional attribute in the liquid tag, we also need to add the necessary CSS and JS to our page template. So, in the header include file, we add the following:

  <!-- Pretty Photo -->
  <script src="/public/js/jquery-1.6.1.min.js" type="text/javascript" charset="utf-8"></script>
  <link rel="stylesheet" href="/public/css/prettyPhoto.css" type="text/css" media="screen" charset="utf-8" />
  <script src="/public/js/jquery.prettyPhoto.js" type="text/javascript" charset="utf-8"></script>
  <script type="text/javascript" charset="utf-8">
    $(document).ready(function(){
      // For every jekyll-picture-tag image that carries a "group" attribute,
      // mark its parent <picture> element with a PrettyPhoto rel value so that
      // images of the same group share one slideshow, and point its href at
      // the full-size image source.
      $("picture img").each(function(){
        if (!$(this).attr('group')) return;

        var parent = $(this).parent();
        parent.attr('rel', 'prettyPhoto[group_' + $(this).attr('group') + ']');
        parent.attr('href', parent.children().first().attr('srcset'));
      });
      // Initialize PrettyPhoto on the marked <picture> elements.
      $("picture[rel^='prettyPhoto']").prettyPhoto({theme:'pp_default', slideshow:5000, autoplay_slideshow:false, social_tools:false});
    });
  </script>

Also, we need to add the necessary CSS, JS and image files to the Jekyll repo. All of them can be downloaded from the official PrettyPhoto site.