🦿Solving Concurrency Problems with Redis and Golang
date
Nov 6, 2023
tags
Sharing
Curated
Study
Problem Definition
To better understand the issue, we will use a simple example. Imagine we are building an application that buys stock shares of different companies. A user can buy a number of shares as long as the company still has that many shares available.
Given that statement, we can build a data layer (our Repository) using the go-redis client that has the following definition:
    // Repository is the Redis-backed data layer.
    type Repository struct {
        client *goRedis.Client
    }

    // Compile-time check that Repository implements the domain interface.
    var _ go_redis_concurrency.Repository = (*Repository)(nil)

    func NewRepository(address, password string) Repository {
        return Repository{
            client: goRedis.NewClient(&goRedis.Options{
                Addr:     address,
                Password: password,
            }),
        }
    }

    func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
        defer wg.Done()
        companySharesKey := BuildCompanySharesKey(companyId)

        // --- (1) ----
        // Get current number of shares
        currentShares, err := r.client.Get(ctx, companySharesKey).Int()
        if err != nil {
            fmt.Print(err.Error())
            return err
        }

        // --- (2) ----
        // Validate if the shares remaining are enough to be bought
        if currentShares < numShares {
            fmt.Print("error: company does not have enough shares \n")
            return errors.New("error: company does not have enough shares")
        }
        currentShares -= numShares

        // --- (3) ----
        // Update the current shares of the company
        return r.client.Set(ctx, companySharesKey, currentShares, 0).Err()
    }
As we can see, the BuyShares operation has 3 steps:
- Read the current free shares
- Validate if the user can buy shares
- Update company free shares
All that sounds good, but let’s put it to the test. Imagine that multiple users try to buy shares simultaneously. We can simulate this by running the BuyShares function concurrently, multiple times. Something like the following:
    const (
        total_clients = 30
    )

    func main() {
        // --- (1) ----
        // Init the repository
        repository := redis.NewRepository(fmt.Sprintf("XXXXXX:XXXXX", config.Redis.Host, config.Redis.Port), config.Redis.Pass)

        // --- (2) ----
        // Run concurrent clients that buy shares
        companyId := "TestCompanySL"
        var wg sync.WaitGroup
        wg.Add(total_clients)

        for idx := 1; idx <= total_clients; idx++ {
            userId := fmt.Sprintf("user%d", idx)
            go repository.BuyShares(context.Background(), userId, companyId, 100, &wg)
        }
        wg.Wait()

        // --- (3) ----
        // Get the remaining company shares
        shares, err := repository.GetCompanyShares(context.Background(), companyId)
        if err != nil {
            panic(err)
        }
        fmt.Printf("the number of free shares the company %s has is: %d", companyId, shares)
    }
This main function starts 30 goroutines that each execute the BuyShares operation. What would we expect to happen if we give TestCompanySL 1000 shares and each user buys 100 shares? The logical outcome would be that 10 buyers buy out all the shares (so the last line prints 0) and the rest get the error “error: company does not have enough shares”. If you run this code, however, you will see something like the following:
Let’s run it again! The second run produces the following output:
You can see the problem here, right? All the users see that the company has 1000 shares when they read the value, so they all deduct 100 shares and write the result back. This is wrong because each write ignores the other operations happening at the same time and overwrites their reductions of the shares.
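The lost update described above can be reproduced without a Redis server. The following is a minimal sketch that uses only the standard library: the `store` type is a hypothetical in-memory stand-in for the Redis key, and `simulateLostUpdate` is an illustrative helper, not part of the original project. A barrier forces every buyer to finish reading before anyone writes, which is the worst-case interleaving.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical in-memory stand-in for the Redis key holding a
// company's free shares. Each Get and Set is safe on its own, like a single
// Redis command, but nothing ties a Get to the Set that follows it.
type store struct {
	mu     sync.Mutex
	shares int
}

func (s *store) Get() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.shares
}

func (s *store) Set(v int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.shares = v
}

// simulateLostUpdate runs `clients` buyers that each read, validate and write.
// A barrier makes every read finish before the first write happens.
func simulateLostUpdate(clients, initial, perBuy int) (remaining, buyers int) {
	s := &store{shares: initial}
	var reads, done sync.WaitGroup
	var mu sync.Mutex
	reads.Add(clients)
	done.Add(clients)

	for i := 0; i < clients; i++ {
		go func() {
			defer done.Done()
			current := s.Get() // step 1: read
			reads.Done()
			reads.Wait() // barrier: wait until everyone has read

			if current < perBuy { // step 2: validate against a stale value
				return
			}
			s.Set(current - perBuy) // step 3: write, overwriting other buyers

			mu.Lock()
			buyers++
			mu.Unlock()
		}()
	}
	done.Wait()
	return s.Get(), buyers
}

func main() {
	remaining, buyers := simulateLostUpdate(30, 1000, 100)
	fmt.Printf("buyers=%d remaining=%d\n", buyers, remaining)
	// prints "buyers=30 remaining=900"
}
```

All 30 buyers pass the validation, yet the counter only drops by 100, because every write stores the same value, 900.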
Solutions
Atomic Operations
Spoiler alert: this solution is not going to work for our use case, but it’s worth mentioning. Let’s assume that we want to use an atomic operation to decrement the shares once a user buys them (basically remove the currentShares -= numShares and let Redis handle that). To do this, we could use the IncrBy command that Redis offers, with a negative increment, as follows:
    func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
        defer wg.Done()
        companySharesKey := BuildCompanySharesKey(companyId)

        // --- (1) ----
        // Get current number of shares
        currentShares, err := r.client.Get(ctx, companySharesKey).Int()
        if err != nil {
            fmt.Print(err.Error())
            return err
        }

        // --- (2) ----
        // Validate if the shares remaining are enough to be bought
        if currentShares < numShares {
            fmt.Print("error: company does not have enough shares \n")
            return errors.New("error: company does not have enough shares")
        }

        // --- (3) ----
        // Atomically decrement the current shares of the company
        return r.client.IncrBy(ctx, companySharesKey, -1*int64(numShares)).Err()
    }
Looks good, let’s run it!
Well, either Wall Street allows us to buy extra shares or we have some issue with this solution…
If we had a scenario where we did not need to read the data and manipulate it before storing it, we could use atomic updates. However, the problem here is that the validation depends on a value we read BEFORE the update. This means that if all the buyers have read 1000 shares, then all the validations pass, and since the write is now atomic, each decrement is applied to the current value in Redis, reducing the shares even though we are already out.
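The overselling can be illustrated with the standard library's atomic counters. This is a sketch under assumptions: an `int64` manipulated through `sync/atomic` stands in for the Redis key, `atomic.AddInt64` plays the role of IncrBy, and `simulateAtomicDecrement` is a hypothetical helper. As in the worst case above, a barrier makes every buyer read before anyone decrements.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// simulateAtomicDecrement mimics "read, validate, then atomically decrement".
// The decrement itself can never be lost, but the validation still runs
// against a value that may be stale by the time the decrement is applied.
func simulateAtomicDecrement(clients int, initial, perBuy int64) int64 {
	shares := initial
	var reads, done sync.WaitGroup
	reads.Add(clients)
	done.Add(clients)

	for i := 0; i < clients; i++ {
		go func() {
			defer done.Done()
			current := atomic.LoadInt64(&shares) // read
			reads.Done()
			reads.Wait() // barrier: everyone has read the initial value

			if current < perBuy { // validate against the stale read
				return
			}
			atomic.AddInt64(&shares, -perBuy) // atomic write, like IncrBy
		}()
	}
	done.Wait()
	return atomic.LoadInt64(&shares)
}

func main() {
	fmt.Println(simulateAtomicDecrement(30, 1000, 100))
	// prints "-2000"
}
```

No decrement is lost this time, which is exactly why the counter ends up negative: all 30 validations passed against the same initial value.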
Transactions
As per the definition by the official Redis page:
Redis Transactions allow the execution of a group of commands in a single step, they are centered around the commands MULTI, EXEC, DISCARD and WATCH.
Each command mentioned serves a specific purpose:
- MULTI. Marks the start of a new transaction; the commands that follow are queued.
- EXEC. Executes all the commands queued in the transaction.
- DISCARD. Clears all previously queued commands in the transaction.
- WATCH. Marks the given keys to be watched; if any of them is modified before EXEC, the transaction is aborted.
For more information on Redis Transactions, you can check their official web page.
You may be thinking: how can this help our use case and handle concurrency? Well, that’s where the WATCH command comes into play.
Our “problematic” key is the one that stores each company’s free shares. The problem is that a client can update the key after others have read the “un-updated” value. If we wrap our whole use case in a transaction and WATCH the company free shares key, then whenever two transactions run simultaneously and one updates the value first, the second transaction fails when it tries to execute its update, because the watched key has been modified in the meantime.
The implementation in Go of a transaction can be the following:
    func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
        defer wg.Done()
        companySharesKey := BuildCompanySharesKey(companyId)

        // Watch the key: the transaction below is aborted if it changes.
        err := r.client.Watch(ctx, func(tx *goredislib.Tx) error {
            // --- (1) ----
            // Get current number of shares
            currentShares, err := tx.Get(ctx, companySharesKey).Int()
            if err != nil {
                fmt.Print(fmt.Errorf("error getting value %v", err.Error()))
                return err
            }

            // --- (2) ----
            // Validate if the shares remaining are enough to be bought
            if currentShares < numShares {
                fmt.Print("error: company does not have enough shares \n")
                return errors.New("error: company does not have enough shares")
            }
            currentShares -= numShares

            // --- (3) ----
            // Update the current shares of the company inside MULTI/EXEC
            _, err = tx.TxPipelined(ctx, func(pipe goredislib.Pipeliner) error {
                // the error of the queued command is reported by TxPipelined
                pipe.Set(ctx, companySharesKey, currentShares, 0)
                return nil
            })
            if err != nil {
                fmt.Println(fmt.Errorf("error in pipeline %v", err.Error()))
                return err
            }
            return nil
        }, companySharesKey)
        return err
    }
If we run this, we get the following output:
What does this mean? Well, we still have users who cannot buy shares even though shares are available. These users entered the transaction and started “watching” the Redis key, and then another user updated the key before they could commit. As a result, all of these transactions were aborted and their updates were never applied.
The underlying issue remains: every user that enters the transaction at the same time sees 1000 shares and validates against that value, so only the first one to commit succeeds. As written, the failed buyers never get another chance to read the updated value and validate again, so this approach does not do the job.
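To make the WATCH behaviour concrete, here is a minimal in-memory sketch that uses only the standard library. The `versionedStore` type, its version counter and the `maxAttempts` parameter are assumptions made for illustration; they are not part of Redis or of the original project. The `commit` method plays the role of EXEC on a watched key: it applies the write only if nobody changed the value since it was read. The sketch also shows what happens when a buyer is allowed to retry after a conflict, which is not something the code in this article does (go-redis reports an aborted transaction as `redis.TxFailedErr`, which a caller could check before retrying).

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errConflict  = errors.New("watched key changed") // plays the role of an aborted EXEC
	errNotEnough = errors.New("error: company does not have enough shares")
)

// versionedStore is a hypothetical in-memory stand-in for a watched Redis key.
type versionedStore struct {
	mu      sync.Mutex
	shares  int
	version int
}

// read returns the value and the version seen, like WATCH followed by GET.
func (s *versionedStore) read() (shares, version int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.shares, s.version
}

// commit applies the write only if the key is unchanged since `version`.
func (s *versionedStore) commit(newShares, version int) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.version != version {
		return errConflict
	}
	s.shares = newShares
	s.version++
	return nil
}

// buy runs the read-validate-write cycle, retrying on conflicts up to
// maxAttempts times. With maxAttempts == 1 it behaves like a single attempt.
func buy(s *versionedStore, n, maxAttempts int) error {
	for i := 0; i < maxAttempts; i++ {
		current, version := s.read()
		if current < n {
			return errNotEnough
		}
		if err := s.commit(current-n, version); !errors.Is(err, errConflict) {
			return err // nil on success
		}
	}
	return errConflict
}

// run starts `clients` concurrent buyers and counts the successful ones.
func run(clients, initial, perBuy, maxAttempts int) (remaining, buyers int) {
	s := &versionedStore{shares: initial}
	var wg sync.WaitGroup
	var mu sync.Mutex
	wg.Add(clients)
	for i := 0; i < clients; i++ {
		go func() {
			defer wg.Done()
			if buy(s, perBuy, maxAttempts) == nil {
				mu.Lock()
				buyers++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	remaining, _ = s.read()
	return remaining, buyers
}

func main() {
	remaining, buyers := run(30, 1000, 100, 100)
	fmt.Printf("buyers=%d remaining=%d\n", buyers, remaining)
	// prints "buyers=10 remaining=0"
}
```

With `maxAttempts` set to 1, the number of successful buyers depends on scheduling, but the shares are never oversold: a buyer either commits against the value it validated or is rejected.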
Lua Scripts
Lua scripts are fragments of code written in Lua, a language designed to be embedded inside applications. That is exactly why they can be used here: they run inside the Redis server. Combined with the fact that Redis executes scripts one at a time, this gives us a possible concurrency solution.
I will not go into much detail on Lua scripting since it’s relatively similar to most scripting languages out there (with slight differences, obviously), but you can find a detailed explanation of the Lua syntax here and here.
The script we have to write to solve our problem follows basically the same steps as the BuyShares function:
- Read the current free shares from Redis
- Validate if the user can buy shares
- Write the updated company free shares to Redis
This can be translated into the following Lua script:
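    local sharesKey = KEYS[1]
    -- ARGV values and GET replies are strings, so convert before comparing
    local requestedShares = tonumber(ARGV[1])
    local currentShares = tonumber(redis.call("GET", sharesKey))

    if currentShares < requestedShares then
        return {err = "error: company does not have enough shares"}
    end

    currentShares = currentShares - requestedShares
    redis.call("SET", sharesKey, currentShares)
    -- return the new balance so the client receives a non-nil reply
    return currentShares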
The first lines read the script’s inputs (KEYS holds the Redis keys and ARGV the arguments, both of which we need to pass in). Then we read the current shares using redis.call("GET") and, after validating that the shares can be bought, we update the shares using redis.call("SET").
To define the script in Go and run it, we can use the following example:
    // BuyShares is the Lua script that reads, validates and updates the shares
    // in a single server-side step.
    var BuyShares = goRedis.NewScript(`
        local sharesKey = KEYS[1]
        -- ARGV values and GET replies are strings, so convert before comparing
        local requestedShares = tonumber(ARGV[1])
        local currentShares = tonumber(redis.call("GET", sharesKey))

        if currentShares < requestedShares then
            return {err = "error: company does not have enough shares"}
        end

        currentShares = currentShares - requestedShares
        redis.call("SET", sharesKey, currentShares)
        -- return the new balance so the client receives a non-nil reply
        return currentShares
    `)

    func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
        defer wg.Done()

        keys := []string{BuildCompanySharesKey(companyId)}
        err := BuyShares.Run(ctx, r.client, keys, numShares).Err()
        if err != nil {
            fmt.Println(err.Error())
        }
        return err
    }
If we run the code above we get the following output:
20 users couldn’t buy shares, 10 could and bought all the shares. Well what do you know? Looks like we finally have a solution for our problem!
Since the Lua script is executed inside the Redis server and two scripts cannot run simultaneously, each buyer has to wait its turn to get, validate and buy (if available) its shares, which solves the concurrency problem we were having.
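The effect of running every script one at a time can be sketched in plain Go, without Redis. In the example below, a single goroutine owns the counter and handles one request after another, which is an analogy for how the server runs scripts sequentially. The `request` type and the `serve` and `run` functions are hypothetical names introduced for this illustration only.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errNotEnough = errors.New("error: company does not have enough shares")

// request is one "script invocation": how many shares to buy and where to
// send the result.
type request struct {
	numShares int
	reply     chan error
}

// serve owns the shares counter and handles requests strictly one at a time,
// so read, validate and write can never interleave between buyers.
func serve(initial int, requests <-chan request, final chan<- int) {
	shares := initial
	for req := range requests {
		if shares < req.numShares {
			req.reply <- errNotEnough
			continue
		}
		shares -= req.numShares
		req.reply <- nil
	}
	final <- shares
}

// run starts `clients` concurrent buyers that all go through serve.
func run(clients, initial, perBuy int) (remaining, buyers int) {
	requests := make(chan request)
	final := make(chan int, 1)
	go serve(initial, requests, final)

	var wg sync.WaitGroup
	var mu sync.Mutex
	wg.Add(clients)
	for i := 0; i < clients; i++ {
		go func() {
			defer wg.Done()
			req := request{numShares: perBuy, reply: make(chan error, 1)}
			requests <- req
			if <-req.reply == nil {
				mu.Lock()
				buyers++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	close(requests) // no more buyers: let serve report the final value
	return <-final, buyers
}

func main() {
	remaining, buyers := run(30, 1000, 100)
	fmt.Printf("buyers=%d remaining=%d\n", buyers, remaining)
	// prints "buyers=10 remaining=0"
}
```

The order in which buyers are served varies between runs, but exactly 10 of them succeed every time.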
Locks
We saw that a Lua script can solve the concurrency issue we were having, but it does come with some disadvantages:
- Lua scripts block all other operations on the Redis server while the script is executing. This means that if other services need to perform operations on the server, even something completely different than our example, these will be blocked as well.
- Adding a Lua script requires the developer to know the Lua syntax and brings all the disadvantages that come with the addition of a new language.
But what about a simple, semaphore-like distributed solution? Something that guards the critical section so that only one process enters, locks it, does its job and then unlocks it. Well, a Redis lock does just that.
In a sense it’s a mutex whose value is stored in Redis. To write this solution, we can use redsync, the Go library recommended by Redis itself.
Following the implementation proposed in the package README.md, we can make some modifications to the Repository struct initialization in order to make our mutex shared by all callers.
    func NewRepository(address, password string) Repository {
        client := goredislib.NewClient(&goredislib.Options{
            Addr:     address,
            Password: password,
        })

        // Create the redsync instance on top of the same Redis client
        pool := goredis.NewPool(client)
        rs := redsync.New(pool)

        // Every caller that uses this name competes for the same lock
        mutexname := "my-global-mutex"
        mutex := rs.NewMutex(mutexname)

        return Repository{
            client: client,
            mutex:  mutex,
        }
    }
After adding this, the implementation can be the following:
    func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
        defer wg.Done()

        // Obtain a lock for our given mutex. After this is successful, no one else
        // can obtain the same lock (the same mutex name) until we unlock it.
        if err := r.mutex.Lock(); err != nil {
            fmt.Printf("error during lock: %v \n", err)
            // without the lock we must not enter the critical section
            return err
        }
        defer func() {
            if ok, err := r.mutex.Unlock(); !ok || err != nil {
                fmt.Printf("error during unlock: %v \n", err)
            }
        }()

        // --- (1) ----
        // Get current number of shares
        currentShares, err := r.client.Get(ctx, BuildCompanySharesKey(companyId)).Int()
        if err != nil {
            fmt.Print(err.Error())
            return err
        }

        // --- (2) ----
        // Validate if the shares remaining are enough to be bought
        if currentShares < numShares {
            fmt.Print("error: company does not have enough shares \n")
            return errors.New("error: company does not have enough shares")
        }
        currentShares -= numShares

        // --- (3) ----
        // Update the current shares of the company
        return r.client.Set(ctx, BuildCompanySharesKey(companyId), currentShares, 0).Err()
    }
Let’s run it and see what happens:
Looks like we got another winner! This implementation also solves the concurrency problem in the manner we expected. The mutex blocks all other requests to the critical section while one process (or request) is inside it, which guarantees that availability is checked against the current value and that the shares are bought correctly.
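The same idea can be tried locally with the standard library. In the sketch below a `sync.Mutex` stands in for the distributed mutex and a plain integer stands in for the Redis key. This is an analogy under those assumptions: a local mutex only protects goroutines inside one process, while the Redis-backed lock coordinates separate processes. The lowercase `repository` type and the `run` helper are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errNotEnough = errors.New("error: company does not have enough shares")

// repository is a hypothetical in-memory stand-in for the Redis-backed one.
type repository struct {
	mu     sync.Mutex // stands in for the distributed mutex
	shares int        // stands in for the company shares key
}

// BuyShares holds the lock for the whole read-validate-write sequence, so
// only one buyer at a time is inside the critical section.
func (r *repository) BuyShares(numShares int) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	if r.shares < numShares {
		return errNotEnough
	}
	r.shares -= numShares
	return nil
}

// run starts `clients` concurrent buyers and counts the successful ones.
func run(clients, initial, perBuy int) (remaining, buyers int) {
	repo := &repository{shares: initial}
	var wg sync.WaitGroup
	var mu sync.Mutex
	wg.Add(clients)
	for i := 0; i < clients; i++ {
		go func() {
			defer wg.Done()
			if repo.BuyShares(perBuy) == nil {
				mu.Lock()
				buyers++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return repo.shares, buyers
}

func main() {
	remaining, buyers := run(30, 1000, 100)
	fmt.Printf("buyers=%d remaining=%d\n", buyers, remaining)
	// prints "buyers=10 remaining=0"
}
```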
Conclusions
Even though some of the approaches we covered in this article didn’t solve the problem we were having, this does not mean that Redis lacks the potential to solve many concurrency problems in distributed systems. There are use cases where transactions and atomic operations will correctly solve the problem at hand.
When choosing between the proposed solutions, one should weigh other aspects in addition to correctness. For example, one aspect that I did not cover is the performance comparison between the Lua script solution and the Redis lock one. While both of them correctly handle the concurrency scenario presented, the Redis lock implementation is somewhat slower. The Lua script runs entirely inside the Redis server in a single round trip, whereas the lock needs separate requests to acquire the lock, read, write and release it.
Overall I think all of these solutions are a great example of Redis usage and it was fun to play around with them and see how they work in Go.
You can find the complete project described in this article in my GitHub account here.
Happy coding :)