24 May 2019
machine learning
In this series of posts, I want to share my machine learning study practices with neural networks and deep learning in Tensorflow 2.0. I’d like to solve several common machine learning problems with both classical statistics-based algorithms and neural networks, and see if we can intuitively make some judgments about the pros and cons of each side.
Problem 1: Predict Auto Insurance in Sweden (Linear Regression on Small Datasets)
To solve this problem, we will use a real but simple dataset - “Auto Insurance in Sweden” - to demonstrate linear regression on small datasets.
This dataset contains pairs of the total payment for all claims in thousands of Swedish Kronor (y) given the total number of claims (x). It is simple enough that it contains only one feature x to predict a single value y, so we can easily visualize it on a 2D plot.
You can download and view the raw dataset from this URL.
Reminder: For running the demo code, you need to have Python 3.7+, Tensorflow 2.0, numpy, pandas, sklearn and matplotlib libraries installed.
We first write some code to format the dataset:
import numpy as np
import pandas as pd
import io
import requests
# download the dataset
url="https://www.math.muni.cz/~kolacek/docs/frvs/M7222/data/AutoInsurSweden.txt"
s=requests.get(url).content
# read the dataset as csv and convert it to numpy arrays
c=pd.read_csv(io.StringIO(s.decode('utf-8')), skiprows=10, sep='\t', dtype={'X': float, 'Y': str})
a=c.to_numpy()
X = a[:,0:1].astype('float')
y = np.array([float(v.replace(',','.')) for v in a[:,1]])
Let’s visualize the full dataset since it is small enough:
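A minimal matplotlib sketch to produce this scatter plot (assuming the X and y arrays prepared above):
# scatter plot of the raw dataset
import matplotlib.pyplot as plt

fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
ax.set_xlabel('x (number of claims)')
ax.set_ylabel('y (total payment, in thousands of Kronor)')
ax.scatter(X, y, color='k')
plt.show()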

Usually, when doing regression to predict a value, we first need to assume the model type. For example, is it more likely to be linear or polynomial? Because the only thing we are doing when training a machine learning model is fitting the parameters of a predefined model. For this dataset, according to the scatter plot, we can say a linear model will work well.
With a classical statistics-based algorithm, this means we will fit the parameters w and b of the model y=wX+b.
With the sklearn library, we can fit a classical model in only two lines of code. Let’s plot the predicted values for all the X with this model. Not surprisingly, it is a line on the 2D plot. Looks perfect!
# fit a classical model
from sklearn.linear_model import LinearRegression
reg = LinearRegression().fit(X, y)
# draw the classical regression model
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.scatter(X, y, color='k')
ax.plot(X, reg.predict(X),color='g')
plt.show()

Now, let’s try to solve the same problem with neural networks. First, let’s try a shallow neural network with only one hidden Dense layer of 8 neurons.
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
# construct a shallow neural network
model = Sequential()
model.add(Dense(8))
model.add(Dense(1))
model.compile(loss='mean_squared_error', optimizer='adam')
# scale the dataset (divide by 100) and train the model
model.fit(X/100, y/100, epochs=200)
# Plot neural network model
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.scatter(X, y, color='k')
ax.plot(X, reg.predict(X),color='g')
ax.plot(X, model.predict(X/100.0)*100,color='b')
plt.show()
After 200 epochs of training, we get a line which does not fit as well as the classical one:

But if we increase the epochs to 500, we get essentially the same perfect line. The blue line almost overlaps the green line:
# the same shallow neural network but increase the epochs to 500
model = Sequential()
model.add(Dense(8))
model.add(Dense(1))
model.compile(loss='mean_squared_error', optimizer='adam')
model.fit(X/100, y/100, epochs=500)
# Plot neural network model
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.scatter(X, y, color='k')
ax.plot(X, reg.predict(X),color='g')
ax.plot(X, model.predict(X/100.0)*100,color='b')
plt.show()

Well, although it takes longer, even such a shallow neural network did the job. You might wonder: what if we increase the number of neurons in the hidden layer or add some more hidden layers?
Let’s try to increase the number of neurons in the hidden layer from 8 to 128.
# increase the number of neurons in hidden layer to 128
model = Sequential()
model.add(Dense(128))
model.add(Dense(1))
model.compile(loss='mean_squared_error', optimizer='adam')
model.fit(X/100, y/100, epochs=200)
# Plot neural network model
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.scatter(X, y, color='k')
ax.plot(X, reg.predict(X),color='g')
ax.plot(X, model.predict(X/100.0)*100,color='b')
plt.show()

Well, we can see that this time, with only 200 epochs, we get a perfectly fitted line.
How about increasing the number of hidden layers instead? Let’s use 3 hidden Dense layers, each having 8 neurons.
# increase the number of hidden layers to 3
model = Sequential()
model.add(Dense(8))
model.add(Dense(8))
model.add(Dense(8))
model.add(Dense(1))
model.compile(loss='mean_squared_error', optimizer='adam')
model.fit(X/100, y/100, epochs=100)
# Plot neural network model
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.scatter(X, y, color='k')
ax.plot(X, reg.predict(X),color='g')
ax.plot(X, model.predict(X/100.0)*100,color='b')
plt.show()

Awesome! This time, with only 100 epochs, we get almost the same result.
So, what’s our conclusion after these tests?
- Neural networks can do the same job for linear regression tasks;
- Even a simple shallow neural network can fit the data, although it takes longer to converge;
- Increasing the number of neurons in a hidden layer or adding more hidden layers can improve the model, here by converging in fewer epochs;
- A classical statistics-based model takes less time to fit on a small dataset.
Further thinking:
Since a well-trained neural network can predict almost the same results as the classical model, can we simply write down the model as a Maths function, something like y=wX+b?
The answer is YES, as long as no activation functions are used in any layer of the model. Actually, if we construct a neural network with only one neuron, the output y exactly equals wX+b. If you train this simplest neural network, you can still get the same prediction results, although it took 4000 epochs to converge on my test machine.
# the simplest neural network
model = Sequential()
model.add(Dense(1))
model.compile(loss='mean_squared_error', optimizer='adam')
model.fit(X/100, y/100, epochs=4000)
# Plot neural network model
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.scatter(X, y, color='k')
ax.plot(X, reg.predict(X),color='g')
ax.plot(X, model.predict(X/100.0)*100,color='b')
plt.show()

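As a sanity check, here is a small sketch (assuming the reg and model objects fitted above) that reads the single neuron’s learned weight and bias, undoes the /100 scaling, and compares them with the classical model’s coefficients:
# compare the single neuron's learned parameters with the classical model;
# the network was trained on X/100 and y/100, i.e. y/100 = w*(X/100) + b,
# so in the original scale y = w*X + 100*b
w, b = model.layers[0].get_weights()
print('neural network:  w =', w[0][0], ' b =', 100 * b[0])
print('classical model: w =', reg.coef_[0], ' b =', reg.intercept_)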
So, what does a neural network with more neurons or more Dense layers mean? It just means a linear combination of multiple simple y=wX+b models. For this simple dataset, because we only have one feature in X, no matter how many neurons and Dense layers we use, the Maths representation of the model can still be written as y=w’X+b’, where w’ and b’ are linear combinations of the w and b parameter values of all the neurons in the model. It is still a linear model, which is why the plot of the converged model is always the same line on the 2D plot.
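To make this concrete, here is a tiny numpy sketch with made-up weights, showing that two stacked linear (no activation) layers collapse into a single y = w’X + b’:
import numpy as np

# made-up parameters of two stacked linear layers (1 feature -> 2 hidden units -> 1 output)
W1, b1 = np.array([[2.0, -1.0]]), np.array([0.5, 1.0])
W2, b2 = np.array([[3.0], [4.0]]), np.array([-2.0])
x = np.array([[5.0]])  # a single sample with one feature

# layer-by-layer computation
two_layers = (x @ W1 + b1) @ W2 + b2
# collapsed into a single linear model: w' = W1 @ W2, b' = b1 @ W2 + b2
one_layer = x @ (W1 @ W2) + (b1 @ W2 + b2)
print(two_layers, one_layer)  # both print the same value: [[13.5]] [[13.5]]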
The last question: why does a neural network with more neurons or Dense layers take less time to converge? Well, we can understand this as, in a sense, doing parallel training of a single neuron. It is not exactly true that a 2-neuron model cuts the training time of a 1-neuron model in half, because the convergence of the model is also related to the training data, how you initialize the parameters, the optimizer you use, etc. But provided that you use best practices to set these so-called hyperparameters, more or less, more neurons or Dense layers reduce training time when you have enough hardware resources (CPU, GPU, memory, etc.) to parallelize the training.
09 Aug 2018
machine learning
In the recent 3 months, I have spent most of my spare time studying machine learning algorithms. I’m really a newcomer to machine learning. And having left college more than 15 years ago, no surprise, I have successfully forgotten most of my maths knowledge, such as calculus, linear algebra, probability, statistics, etc.
I began my learning by reviewing all the maths through reading books and doing exercises, then watching online machine learning videos, and finally following books and tutorials in action.
As a next step, I plan to work on some real practice projects. But before that, I have collected my machine-learning-in-action study notes in a github repo.
Again, I’m still a freshman in machine learning, so I cannot 100% guarantee that all my understanding in the notes is correct, though I have tried my best and hope it is. So if you find anything wrong in the repo, or if you have any comments or suggestions, please don’t hesitate to let me know.
Here is the link to my github repo:
Teddy’s Machine Learning in Action Study Notes
22 Oct 2017
dataflow
Background
In modern application development, there are many scenarios of dataflow processing, such as large-volume data processing, image/video conversion, data migration, etc.
In .NET, thanks to Microsoft for providing the TPL Dataflow library, a flexible and high-performance library for this area.
TPL Dataflow is really flexible, but when we want to build complex dataflows, we might have to create a bunch of blocks, manually wire them up, be careful to avoid data blocking, and write a lot of tricks even to implement the simplest branching and merging cases. If you have ever been a dataflow developer, do you remember how many times you waited for your dataflow to complete but it never happened, and you had to debug line by line to find out where the message was blocked and why?
And for a long time, I have heard developers complaining about its lack of a fluent-style API, which is kind of a standard feature of almost all Java-world libraries and is also already provided by many popular .NET libraries. So when I experienced the same frustrations, I decided to build one, which is what I want to introduce here - FluentDataflow.
Introduction
FluentDataflow, in short, is a fluent style wrapper and extension of TPL Dataflow.
The only entry point of this library is the IDataflowFactory interface. Through the fluent-style API provided by a factory instance, the outputs of this factory are still standard IDataflowBlock instances, such as ITargetBlock, ISourceBlock, IPropagatorBlock, etc., each of which usually wraps a simple or even a complex custom dataflow.
I heard what you are shouting: “Show me the code!” So, let’s go!
Examples
- Simple aggregation example:
var splitter = new TransformBlock<string, KeyValuePair<string, int>>(input =>
{
string[] splitted = input.Split('=');
return new KeyValuePair<string, int>(splitted[0], int.Parse(splitted[1]));
});
var dict = new Dictionary<string, int>();
var aggregater = new ActionBlock<KeyValuePair<string, int>>(pair =>
{
int oldValue;
dict[pair.Key] = dict.TryGetValue(pair.Key, out oldValue) ? oldValue + pair.Value : pair.Value;
});
// the created dataflow instance is also
// a strongly-typed standard dataflow block
var dataflow = new DataflowFactory().FromPropagator(splitter)
.LinkToTarget(aggregater)
.Create();
dataflow.Post("a=1");
dataflow.Post("b=2");
dataflow.Post("a=5");
dataflow.Complete();
Task.WaitAll(dataflow.Completion);
System.Console.WriteLine("sum(a) = {0}", dict["a"]); //prints sum(a) = 6
- A dataflow as part of another dataflow example
var factory = new DataflowFactory();
var aggregator = ... // build an aggregator dataflow as in the first example, aggregating into a result dictionary
var splitter = new TransformManyBlock<string, string>(line => line.Split(' '));
var dataflow = factory.FromPropagator<string, string>(splitter)
.LinkToTarget(aggregator)
.Create();
dataflow.Post("a=1 b=2 a=5");
dataflow.Post("c=6 b=8");
dataflow.Complete();
Task.WaitAll(dataflow.Completion);
System.Console.WriteLine("sum(b) = {0}", result["b"]); //prints sum(b) = 10
- Broadcast example
var printer1 = new ActionBlock<string>(s => System.Console.WriteLine("Printer1: {0}", s));
var printer2 = new ActionBlock<string>(s => System.Console.WriteLine("Printer2: {0}", s));
var printer3 = new ActionBlock<string>(s => System.Console.WriteLine("Printer3: {0}", s));
var dataflow = new DataflowFactory().FromBroadcast<string>()
.LinkTo(printer1)
.LinkTo(printer2)
.LinkTo(printer3)
.Create();
dataflow.Post("first message");
dataflow.Post("second message");
dataflow.Post("third message");
dataflow.Complete();
Task.WaitAll(dataflow.Completion);
- Multiple sources example
// By default with native TPL Dataflow, if multiple sources link to the same target
// with PropagateCompletion=true, the target completes as soon as any one of the
// sources completes, so the target will miss some of the messages from the other sources.
// BUT, with our dataflow factory here, when PropagateCompletion=true,
// dataflow.Complete() internally waits for all the sources to complete,
// so the target is guaranteed to receive all the messages from all the sources
var source1 = new BufferBlock<string>();
var source2 = new BufferBlock<string>();
var source3 = new BufferBlock<string>();
var printer = new ActionBlock<string>(s => System.Console.WriteLine(s));
var dataflow = new DataflowFactory().FromMultipleSources(source1, source2, source3)
.LinkToTarget(printer)
.Create();
for (var i = 0; i < 3; ++i)
{
var s = i.ToString();
source1.Post(s);
source2.Post(s);
source3.Post(s);
}
dataflow.Complete();
Task.WaitAll(dataflow.Completion);
- Linking with filter example
// the filter only accepts even numbers,
// so odd numbers go to the declined printer
var filter = new Predicate<int>(i =>
{
return i % 2 == 0;
});
var printer = new ActionBlock<int>(s => System.Console.WriteLine("printer: " + s.ToString()));
var declinedPrinter = new ActionBlock<int>(s => System.Console.WriteLine("declined: " + s.ToString()));
var inputBlock = new BufferBlock<int>();
var dataflow = new DataflowFactory().FromPropagator(inputBlock)
.LinkToTarget(printer
, filter
// when linking with a filter, you have to specify a declined block;
// otherwise, declined messages will stay in the queue and the current block
// will never be able to COMPLETE (awaiting its Completion will never return)
, declinedPrinter)
.Create();
for (int i = 0; i < 10; ++i)
{
dataflow.Post(i);
}
dataflow.Complete();
Task.WaitAll(dataflow.Completion);
- Join example
var source1 = new BufferBlock<string>();
var source2 = new BufferBlock<string>();
var printer = new ActionBlock<Tuple<string, string>>(s => System.Console.WriteLine("printer: {0},{1}", s.Item1, s.Item2));
var dataflow = new DataflowFactory().Join(source1, source2)
.LinkToTarget(printer)
.Create();
for (var i = 0; i < 3; ++i)
{
var s = i.ToString();
source1.Post(s);
source2.Post(s);
}
dataflow.Complete();
Task.WaitAll(dataflow.Completion);
- Batch example
var source = new BufferBlock<string>();
var printer = new ActionBlock<IEnumerable<string>>(s => System.Console.WriteLine("printer: " +string.Join("|", s)));
var dataflow = new DataflowFactory().FromSource(source)
.Batch(2)
.LinkToTarget(printer)
.Create();
for (var i = 0; i < 6; ++i)
{
var s = i.ToString();
source.Post(s);
}
dataflow.Complete();
Task.WaitAll(dataflow.Completion);
- If-else branching and merging example
var factory = new DataflowFactory();
// the ifFilter only accepts even numbers,
// so odd numbers go to the elseBlock
var ifFilter = new Predicate<int>(i =>
{
return i % 2 == 0;
});
var printer = new ActionBlock<int>(s => System.Console.WriteLine("printer: " + s.ToString()));
var inputBlock = new BufferBlock<int>();
// if it passes ifFilter, transform: i -> i * 10
var ifBlock = new TransformBlock<int, int>(i => i * 10);
// else, transform: i -> i * 100
var elseBlock = new TransformBlock<int, int>(i => i * 100);
var branchingDataflow = factory.FromPropagator(inputBlock)
.LinkToPropagator(ifBlock, ifFilter, elseBlock)
.Create();
var mergingDataflow = factory.FromMultipleSources(ifBlock, elseBlock)
.LinkToTarget(printer)
.Create();
// encapsulate branchingDataflow and mergingDataflow
var dataflow = factory.EncapsulateTargetDataflow(branchingDataflow, mergingDataflow);
for (int i = 0; i < 10; ++i)
{
dataflow.Post(i);
}
dataflow.Complete();
Task.WaitAll(dataflow.Completion);
17 Jun 2016
elasticsearch
logstash
kibana
The elasticsearch cluster was down!
Today I troubleshot an Elasticsearch-cluster-down issue. We have a 3-node Elasticsearch cluster receiving hundreds of gigabytes of tracking data every day. And this afternoon, it suddenly went down and all our Kibana dashboards failed to load any data from it.
From the elasticsearch-kopf monitor, we could see more than half of the shards were unallocated, so it seemed like at least 2 nodes had just been restarted for some reason. Because our cluster is configured so that each index has one primary and one replica, until at least the primary shards are allocated, the indices cannot be loaded. The shards were slowly being allocated automatically. In my understanding, if I was patient enough and just waited for a while, it should recover by itself. So I tried to wait. After 10 minutes, some dashboards could display data, which looked good. But after 30 minutes, from kopf, I could see the HEAP of the master node kept increasing, until eventually it was full, and the entire cluster became unresponsive again. I restarted the master node, but the HEAP still kept increasing until full, and the cluster went down again.
Is it because the shards are being reallocated too slowly?
Someone on my team was working on a Kibana dashboard, and since it suddenly stopped working, he reached out and asked if I could help do something to speed up the recovery. And I was guessing: is it because the shards were being reallocated too slowly? I have a script which calls the Elasticsearch API to force shard reallocation. When I created the script, it was for fixing the very few shards which could not be reallocated by Elasticsearch itself. It looks like below:
IFS=$'\n'
# loop through all UNASSIGNED shards and force-allocate each of them
for line in $(curl -s 'server_name/es/_cat/shards' | fgrep UNASSIGNED); do
INDEX=$(echo $line | (awk '{print $1}'))
SHARD=$(echo $line | (awk '{print $2}'))
curl -XPOST 'server_name/es/_cluster/reroute' -d '{
"commands": [
{
"allocate": {
"index": "'$INDEX'",
"shard": '$SHARD',
"node": "ETELASTIC1",
"allow_primary": true
}
}
]
}'
# try again since the first try might fail if the target node already has a copy of the shard
curl -XPOST 'server_name/es/_cluster/reroute' -d '{
"commands": [
{
"allocate": {
"index": "'$INDEX'",
"shard": '$SHARD',
"node": "ETELASTIC2",
"allow_primary": true
}
}
]
}'
sleep 2
done
I restarted the master node and executed the script, and it kept running. It would take some time because I had about 10K unallocated shards. The reallocation became faster, but no luck: before it finished, the HEAP became full again. So it sounds like the guess was not correct.
What caused the HEAP to keep increasing then?
I checked the Elasticsearch logs, and they showed only a few small, cheap reads during the cluster-down and recovery time. I checked the nodes breaker API, and the fielddata cache was not big either. So what else might cost Elasticsearch memory? If not reads, then maybe writes?
Luckily, we have centralized Logstash servers parsing all the different types of tracking logs and feeding them to the Elasticsearch servers, so I simply stopped all of them. I restarted the master node again, waited, and eventually the Elasticsearch cluster recovered without the HEAP issue. Great! Here it is. So it should be because we configured our Logstash jobs to always retry connecting to the Elasticsearch servers on connection timeout or no response, and since we have many Logstash jobs trying to reconnect, it costs too much Elasticsearch server-side HEAP memory.
Let’s try some Kibana dashboards. WAIT! Why do some of the dashboards return 404? It is impossible that people deleted them during my troubleshooting; we rarely delete dashboards. It made me nervous, because it looked like the reallocation of Elasticsearch shards might cause data loss. WTF! If that’s true, how could we trust Elasticsearch anymore?
Always human’s fault!
OK, ask Google. And I found the reason. Usually, it is always a human’s fault. I mean, my fault, not Elasticsearch’s. It is because of the “allow_primary=true” option in my “force reallocation” script. Basically, when all the shards of an index are still unallocated and you do a reroute with the “allow_primary=true” option, some shards might become empty, and you LOSE data! Luckily, it was some shards of the Kibana metadata index that were lost, so I noticed the issue; if only some tracking index had lost data, it would have been much harder for me to become aware of it.
But I was lucky enough, because:
- I have a daily backup of the Kibana metadata index made with elasticdump, and there are not many dashboard changes every day;
- We keep the latest 3 days of raw data for all the tracking logs, so I could easily re-index them;
I restored from the Kibana metadata index backup, and finally all the dashboards were back!
Summary
Several lessons were learned here:
- When many Elasticsearch cluster nodes are restarted, to avoid a HEAP spike, it is better to temporarily stop all connection attempts;
- Avoid setting allow_primary=true when rerouting shards via the API (see the sketch below);
- Don’t forget backups! They could save you some day!
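For illustration, here is a Python sketch mirroring the shell script above, but without allow_primary; the index name, shard number and node are hypothetical placeholders:
import requests

# a minimal sketch: allocate one unassigned shard WITHOUT allow_primary,
# so Elasticsearch never creates an empty primary and silently loses data;
# index/shard/node below are hypothetical placeholder values
body = {
    "commands": [
        {
            "allocate": {
                "index": "some-tracking-index",
                "shard": 0,
                "node": "ETELASTIC1"
                # note: no "allow_primary": true here
            }
        }
    ]
}
resp = requests.post("http://server_name/es/_cluster/reroute", json=body)
print(resp.status_code, resp.text)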
23 Feb 2016
rabbitmq
In a previous post, I mentioned the discussion on StackOverflow regarding designing exchanges. Usually, when people ask about best practices for designing exchanges, you will get some useful suggestions, but in the end you will be told that the final solution always “depends on your system needs”. A fair enough and safe enough answer, but not really a helpful one. So I want to extend this discussion a little bit here with more detailed backgrounds.
Before discussing any solutions, let me suggest some rules first. No matter which solution you choose, these rules should hold in most cases and should be followed unless there are compelling reasons not to.
Rule 1
Each queue should only represent one type of job. Do not mix different types of messages in one queue. When this rule is followed, we can clearly name a queue after the job it represents.
Rule 2
Avoid message re-dispatching on a queue. If you find that your subscriber is re-dispatching messages to other places without doing any real processing, there might be something wrong. Routing or dispatching is the responsibility of exchanges rather than queues.
Rule 3
When not using the global default exchange, publishers should know nothing about queues. One of the essences of the AMQP design is the separation of responsibilities between exchange and queue, so that publishers don’t need to care about message consumption, and consumers don’t need to care about where the messages come from at all. So when designing exchanges, the concepts related only to publishing, such as routing keys and message headers, should not carry any knowledge of the queues they will finally be forwarded to.
Now let’s focus on the examples. The scenario is that you want to design exchanges and queues for “user” related write events. The write events will be triggered in one or many apps and these messages are to be consumed by some other apps.
| object | event   |
|--------|---------|
| user   | created |
| user   | updated |
| user   | deleted |
The first question people usually ask is: for different events of one object (the “user” object in this example), should we use one exchange to publish all 3 events, or use 3 separate exchanges, one for each event? Or, in short, one exchange, or many?
Before answering this question, I actually want to ask another question: do we even really need a custom exchange for this case?
It is natural to map different types of object events to different types of messages to be published, but it is not always necessary. What if we abstract all 3 types of events as a “write” event, whose sub-types are “created”, “updated” and “deleted”?
| object | event | sub-type |
|--------|-------|----------|
| user   | write | created  |
| user   | write | updated  |
| user   | write | deleted  |
Solution 1
The simplest solution to support this is to design only a “user.write” queue and publish all user write event messages to this queue directly via the global default exchange. When publishing to a queue directly, the biggest limitation is that it assumes only one app subscribes to this type of message. Multiple instances of one app subscribing to this queue are also fine. (A sketch with the Python pika client follows the table below.)
| queue      | app  |
|------------|------|
| user.write | app1 |
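As a concrete sketch of Solution 1 (hypothetical code using the Python pika client, assuming a RabbitMQ broker on localhost), publishing directly to the queue via the default exchange looks roughly like this:
import json
import pika

# Solution 1 sketch: publish user write events straight to the "user.write"
# queue through the global default exchange (exchange='')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='user.write', durable=True)

event = {'object': 'user', 'event': 'created', 'id': 123}  # hypothetical payload
channel.basic_publish(exchange='',               # default exchange routes by queue name
                      routing_key='user.write',  # routing key == queue name
                      body=json.dumps(event))
connection.close()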
Solution 2
The simplest solution does not work when a second app (with different processing logic) wants to subscribe to the messages published to the queue. When multiple apps subscribe, we need at least one “fanout” type exchange with bindings to multiple queues, so that messages are published to the exchange, and the exchange duplicates the messages to each of the queues. Each queue represents the processing job of a different app. (A pika sketch follows the tables below.)
| queue           | subscriber |
|-----------------|------------|
| user.write.app1 | app1       |
| user.write.app2 | app2       |

| exchange   | type   | binding_queue   |
|------------|--------|-----------------|
| user.write | fanout | user.write.app1 |
| user.write | fanout | user.write.app2 |
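A pika sketch of this fanout setup (hypothetical names, same assumptions as above):
import pika

# Solution 2 sketch: one fanout exchange duplicates every user.write message
# into each app-specific queue
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='user.write', exchange_type='fanout')
for queue in ('user.write.app1', 'user.write.app2'):
    channel.queue_declare(queue=queue, durable=True)
    channel.queue_bind(queue=queue, exchange='user.write')

# every published message is copied to both queues
channel.basic_publish(exchange='user.write', routing_key='', body='user.updated: 123')
connection.close()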
This second solution works fine if each subscriber cares about and wants to handle all the sub-types of “user.write” events, or at least exposing all these sub-type events to each subscriber is not a problem. For instance, the subscriber app may simply keep a transaction log; or, although the subscriber handles only user.created, it is OK to let it know when user.updated or user.deleted happens. It becomes less elegant when some subscribers are external to your organization and you only want to notify them about some specific sub-type events. For instance, if app2 only wants to handle user.created, it should not have any knowledge of user.updated or user.deleted at all.
Solution 3
To solve the issue above, we have to extract the “user.created” concept from “user.write”. The “topic” exchange type can help here. When publishing the messages, let’s use user.created/user.updated/user.deleted as routing keys, so that we can set the binding key of the “user.write.app1” queue to “user.*” and the binding key of the “user.created.app2” queue to “user.created”. (A pika sketch follows the tables below.)
| queue             | subscriber |
|-------------------|------------|
| user.write.app1   | app1       |
| user.created.app2 | app2       |

| exchange   | type  | binding_queue     | binding_key  |
|------------|-------|-------------------|--------------|
| user.write | topic | user.write.app1   | user.*       |
| user.write | topic | user.created.app2 | user.created |
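And a pika sketch of the topic-exchange version (hypothetical names again):
import pika

# Solution 3 sketch: a topic exchange with a per-queue binding key
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='user.write', exchange_type='topic')

channel.queue_declare(queue='user.write.app1', durable=True)
channel.queue_bind(queue='user.write.app1', exchange='user.write', routing_key='user.*')

channel.queue_declare(queue='user.created.app2', durable=True)
channel.queue_bind(queue='user.created.app2', exchange='user.write', routing_key='user.created')

# app1 receives all three events; app2 receives only user.created
for routing_key in ('user.created', 'user.updated', 'user.deleted'):
    channel.basic_publish(exchange='user.write', routing_key=routing_key, body='user id: 123')
connection.close()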
Solution 4
The “topic” exchange type is more flexible in case there will potentially be more event sub-types in the future. But if you clearly know the exact set of events, you could also use the “direct” exchange type instead for better performance.
| queue             | subscriber |
|-------------------|------------|
| user.write.app1   | app1       |
| user.created.app2 | app2       |

| exchange   | type   | binding_queue     | binding_key  |
|------------|--------|-------------------|--------------|
| user.write | direct | user.write.app1   | user.created |
| user.write | direct | user.write.app1   | user.updated |
| user.write | direct | user.write.app1   | user.deleted |
| user.write | direct | user.created.app2 | user.created |
Coming back to the “one exchange, or many?” question: so far, all the solutions use only one exchange, and that works fine, nothing wrong. So when might we need multiple exchanges? This link summarizes some valuable ideas about performance and scalability considerations. If the performance impact of too many bindings on a “topic” exchange really becomes an issue, you could of course use more “direct” exchanges to reduce the number of “topic” exchange bindings. But here I want to focus more on the functional limitations of “one exchange” solutions.
Solution 5
One case where we might naturally consider multiple exchanges is for different groups or dimensions of events. For instance, besides the created, updated and deleted events mentioned above, suppose we have another group of events: login and logout - a group of events describing “user behavior” rather than “data writes”. Because different groups of events might need completely different routing strategies and routing key and queue naming conventions, it is natural to have a separate user.behavior exchange.
| queue              | subscriber |
|--------------------|------------|
| user.write.app1    | app1       |
| user.created.app2  | app2       |
| user.behavior.app3 | app3       |

| exchange      | type  | binding_queue      | binding_key  |
|---------------|-------|--------------------|--------------|
| user.write    | topic | user.write.app1    | user.*       |
| user.write    | topic | user.created.app2  | user.created |
| user.behavior | topic | user.behavior.app3 | user.*       |
Other Solutions
There are other cases where we might need multiple exchanges for one object type. For instance, you may want to set different permissions on exchanges (e.g. only selected events of one object type are allowed to be published to one exchange from external apps, while the other exchange accepts any events from internal apps). For another instance, you may want to use different exchanges suffixed with a version number to support different versions of routing strategies for the same group of events. For yet another instance, you might want to define some “internal exchanges” for exchange-to-exchange bindings, which can manage routing rules in a layered way.
In summary, still, “the final solution depends on your system needs”, but with all the solution examples above, and with the background considerations, as the performance and scalability considerations link says, I also hope this can at least get one thinking in the right direction.