On a project I was working on, we had to implement a divide & conquer algorithm using our background job processor, which in our case was Sidekiq. Implementing this with Sidekiq can be quite challenging, since all the workers are independent and they do not trigger any callback once they're done.
I am going to use merge-sort as the example here, since it is a simple application of divide & conquer. I am not going to explain in detail how it works, but it basically consists of recursively splitting an array in two halves, sorting each half, and then merging them back together in sorted order.
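For reference, here is what a plain, non-Sidekiq merge-sort can look like in Ruby. The `merge_sort` and `merge` method names are just for illustration here:

```ruby
# Recursive merge sort: split the array in half, sort each half,
# then merge the two sorted halves back together.
def merge_sort(array)
  return array if array.length <= 1
  mid = array.length / 2
  merge(merge_sort(array[0...mid]), merge_sort(array[mid..-1]))
end

# Merge two sorted arrays into one sorted array by repeatedly
# taking the smaller of the two leading elements.
def merge(left, right)
  result = []
  until left.empty? || right.empty?
    result << (left.first <= right.first ? left.shift : right.shift)
  end
  result + left + right
end
```

The split and merge steps are exactly what the `SplitsArray` and `MergesArrays` services below stand in for.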
A very simple way of getting merge-sort to work on Sidekiq is having one single worker that does all the work. Note that I'm using two abstract service classes here, SplitsArray and MergesArrays. Imagine that things can go wrong inside these services: since they are not controlled by us and may live somewhere on the internet, they can fail or be slow.
```ruby
class MergeSortWorker
  include Sidekiq::Worker

  def sort(array)
    return array if array.count <= 1
    left, right = SplitsArray.split(array)
    MergesArrays.merge sort(left), sort(right)
  end

  def perform(array)
    SortedArray.create array: sort(array)
  end
end
```
Now think about this implementation. Let's say SplitsArray is a very flaky service that fails once every few tries: do we want to retry the whole worker from scratch? We could be retrying over and over again and maybe never see the result. That is exactly the problem we had, and we needed each step to be independent. Not to mention that if we can parallelize the process to gain performance, that is another win.
So here is the initial idea: what if we had one worker to split arrays and another to merge them? That could work, right? Here is a pseudo implementation:
```ruby
class MergeSortWorker
  include Sidekiq::Worker

  def perform(array)
    array = SortWorker.perform_async array
    SortedArray.create array: array
  end
end

class SortWorker
  include Sidekiq::Worker

  def perform(array)
    return array if array.count <= 1
    left, right = SplitsArray.split(array)
    left_sorted = SortWorker.perform_async left
    right_sorted = SortWorker.perform_async right
    MergeWorker.perform_async left_sorted, right_sorted
  end
end

class MergeWorker
  include Sidekiq::Worker

  def perform(left, right)
    MergesArrays.merge left, right
  end
end
```
And this is a visual representation of what it looks like when executing.
There is only one problem: workers are asynchronous and they do not have return values (perform_async returns a job ID, not the result), so our code can't possibly work like that. We have to store some kind of state to know when a worker is done so we can merge the arrays back. As of now there is no out-of-the-box tool for this in Sidekiq (I couldn't find anything for Resque either).
To solve this problem you can manually track the state of the job; that's essentially what we did in our case. We ended up with something like this:
```ruby
class MergeSortWorker
  include Sidekiq::Worker

  def perform(array)
    SortWorker.perform_async TempArray.create(array: array).id
  end
end

class SortWorker
  include Sidekiq::Worker

  def finish(temp_array)
    temp_array.sorted = true
    temp_array.save
    MergeWorker.perform_async temp_array.parent.id
  end

  def perform(array_id)
    temp_array = TempArray.find array_id
    array = temp_array.array
    finish(temp_array) and return if array.count <= 1
    left, right = SplitsArray.split(array)
    temp_left = temp_array.children.create(array: left, sorted: false)
    temp_right = temp_array.children.create(array: right, sorted: false)
    SortWorker.perform_async temp_left.id
    SortWorker.perform_async temp_right.id
  end
end

class MergeWorker
  include Sidekiq::Worker

  def finish(parent)
    if parent.parent
      MergeWorker.perform_async parent.parent.id
    else
      s = Redis::Semaphore.new(:creating_sorted_array, connection: "localhost")
      s.lock do
        parent.reload rescue return
        SortedArray.create array: parent.array
        parent.destroy
      end
    end
  end

  def perform(parent_id)
    parent = TempArray.includes(:parent).where(id: parent_id).first
    return if parent.nil?
    finish(parent) and return if parent.sorted?
    if parent.children.present? && parent.children.all?(&:sorted?)
      left = parent.children.first.array
      right = parent.children.last.array
      parent.children.destroy_all
      parent.array = MergesArrays.merge left, right
      parent.sorted = true
      parent.save
      finish parent
    end
  end
end
```
The complexity of our workers grew considerably, but the process is now failure-tolerant and parallelizable, solving our previous problems and getting us closer to the graphical representation above. Note that we added the TempArray model to the equation; that is where we save our intermediate states.
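I haven't shown the TempArray model itself. Here is a minimal sketch of what it could look like, assuming ActiveRecord; the column names match what the worker code uses, but the rest (serialization, association options) is my guess at a reasonable setup:

```ruby
# Hypothetical migration for the temp_arrays table.
class CreateTempArrays < ActiveRecord::Migration
  def change
    create_table :temp_arrays do |t|
      t.text    :array                    # serialized array contents
      t.boolean :sorted, default: false   # has this chunk been sorted yet?
      t.integer :parent_id                # self-referential tree
    end
  end
end

# Each node holds a chunk of the array plus a sorted flag;
# the parent/children associations form the divide & conquer tree.
class TempArray < ActiveRecord::Base
  serialize :array, Array
  belongs_to :parent, class_name: "TempArray"
  has_many :children, class_name: "TempArray",
           foreign_key: :parent_id, dependent: :destroy
end
```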
We now have knowledge about synchronization and thread safety spread all over the place, with the redis-semaphore gem brought in to ensure it. Each worker tries its step without any certainty of success; it eventually succeeds, though. We are instantiating superfluous workers and then giving up on them once we figure out the arrays are not ready yet, and it has become much harder to understand what the code is doing.
The responsibilities of each worker are a bit blurry now because of the nature of the solution. It is definitely not ideal, but sometimes a robust and efficient solution requires some sacrifices.
I still think this should be easier. I'm sure the example code I wrote here could be refactored to look nicer, but this still shouldn't be such a pain to achieve, which is why I'm starting my own gem to solve the problem. More on that next week.
It has been pointed out to me by the author of Sidekiq that Sidekiq Pro is capable of creating batch jobs and keeping track of them. That is true: you can track batches using the pub/sub part of its notifications feature.
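For completeness, here is roughly what that looks like with Sidekiq Pro's batch API. This is a sketch I haven't run against Pro myself, and the SortFinished callback class is hypothetical:

```ruby
# Sidekiq Pro only: group jobs into a batch and receive a callback
# when every job in the batch has succeeded.
class MergeSortWorker
  include Sidekiq::Worker

  def perform(array)
    batch = Sidekiq::Batch.new
    batch.description = "merge sort"
    # SortFinished#on_success fires once all jobs in the batch are done.
    batch.on(:success, SortFinished)
    batch.jobs do
      SortWorker.perform_async array
    end
  end
end

class SortFinished
  def on_success(status, options)
    # All workers finished; safe to assemble the final result here.
  end
end
```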
All the files I used for this post are available in this gist: https://gist.github.com/luan/5610299