February 2, 2020

Updating Goroutines post-creation

One of the greatest things about Go is how it deals with concurrency. It is far simpler compared to other languages. It uses so-called goroutines - a lightweight thread managed by the Go runtime. While they are mostly used for asynchronous, fire-and-forget stuff (most common usage being HTTP multiplexers), I recently needed to have them updated post creation. The obvious first choice in Go would be to use channels, but trying to build the solution with them caused me some issues, which made me opt for a far simpler one - maps.

I have a client that runs a food-ordering business as an SaaS. It worked for a few years as a monolith written in Java, but a year ago the client decided to have everything rewritten in Go, using microservice architecture. One of the services needed for online food-ordering business is having (thermo)printers, that print orders for the restaurants. Not sure why, but the personnel responsible for managing orders prefer them in a paper-from instead of digital. I’ve noticed this with local food-ordering businesses as well.

Once the order is created on the website / mobile app, the restaurant gets the whole order printed with all necessary information included. Bigger ones tend to have multiple printers, organized by meal type, e.g. soups, salads, warm/cold dishes, desserts.

In short, the service needs to constantly poll print jobs via API or sockets, and if there are any print them immediately. A common scenario is that the service fetches multiple jobs for a single printer, and while the printer is still printing orders, new print jobs are fetched and queued.

My first approach was to use channels - pipes that connect concurrent goroutines. You can send values into channels from one goroutine and receive those values into another goroutine.

The code for managing jobs looked similar to this

var queue = chan map[string]job
func main(){
 cfg := config.Load(path)
 queue = make(chan map[string]job, 1000)
 go func(){
 for{
 printerJobs, err := api.FetchJobs()
 if err != nil{
 // handle error
 }
 queue <- printerJobs
 time.Sleep(cfg.Interval)
 }
 }()

 go func(){
 for (){
 for printerJobs := range queue {
 for printer, jobs := rangePrinterJobs{
 go func() {
 a.print(printer, jobs)
 }()
 }
 }
 time.Sleep(cfg.Interval)
 }
 }
 blockForever()
}

func blockForever() {
 select {}
}

func print(printer string, jobs []job){
 // connect to printer
 for _, job := range jobs{
 // print the job
 }
 // close the connection
}

It worked quite fine in this manner, but once the print function was called with jobs for a printer, it couldn’t be updated. Instead, if new jobs for the same printer were fetched, it would print them in parallel which caused all kinds of issues. The implementation could be updated to keep using channels and synchronously have the printer print content but given my lack of expertise with concurrency in Go and a need for a simpler solution I turned my view to maps.

The solution is not necessarily tied to maps, other data types like arrays could be used as well. The idea was simple, have the map being updated in thread A and read by another, thread B. As the thread A continues to add values to map, thread B will continue to read them until there aren’t any left.

Some things had to be overlooked before the implementation was complete. As I ranged through an array of jobs, simply updating the map wouldn’t work. The values it gets from ranging aren’t updated if the array is updated in another goroutine.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
 var countryCities = map[string][]string{}

 func main() {
 countryCities = make(map[string][]string)
 countryCities["England"] = []string{"London", "Leeds"}
 go func() {
 for country, cities := range countryCities {
 for _, city := range cities {
 fmt.Println(country, city)
 time.Sleep(time.Second * 2)
 }
 }
 }()
 go func() {
 countryCities["England"] = append(countryCities["England"], "Sheffield")
 cities := countryCities["England"]
 cities = append(cities, "Glasgow")
 countryCities["England"] = cities
 }()
 time.Sleep(time.Second * 10)
 }

The code above prints out two cities, “London” and “Leeds” and doesn’t print out Sheffield or Glasgow. The reason is simple, the cities are already extracted from the map on line 7. They aren’t being updated, instead what’s being updated is the values in the map. The solution is quite simple, range over the map by indexes, check if it exists and if it doesn’t delete the map and return.

var countryCities = map[string][]string{}

func main() {
 countryCities = make(map[string][]string)
 countryCities["England"] = []string{"London", "Leeds", "Liverpool", "Bristol", "Cardiff"}
 go func() {
 for country := range countryCities {
 go func() {
 printCity(country)
 }()
 }
 }()
 go func() {
 for _, city := range []string{"Sheffield", "Glasgow", "Manchester", "Bradford", "Edinburgh"} {
 countryCities["England"] = append(countryCities["England"], city)
 time.Sleep(time.Millisecond * 1000)
 }
 }()
 time.Sleep(time.Second * 5)
}

func printCity(country string) {
 var index int
 for {
 if len(countryCities[country])-1 < index {
 time.Sleep(2 * time.Second)
 fmt.Println("Reached the end")
 delete(countryCities, country)
 return
 }
 fmt.Println(countryCities[country][index])
 index++
 time.Sleep(time.Millisecond * 300)
 }
}

Now, it prints out all cities as it should. There is one problem needed solving to make this solution ready. Bradford and Edinburgh aren’t printed above, and will never be printed as the map is deleted once it completed processing. To prevent this we use locks - mutexes. The final solution looks somewhat similar to the gist below:

The mutexes allow us to prevent writing/reading in any goroutine as long as the mutex is locked. Thus if any city remains unprinted, it will be picked up later on and be printed in another thread. The solution with the map turned out to be simpler and more readable than using channels.

2024 © Emir Ribic - Some rights reserved; please attribute properly and link back. Code snippets are MIT Licensed

Powered by Hugo & Kiss.