Concurrency in Go
- Concurrency means breaking a process into independent components that safely share data.
- Most languages use OS-level threads with locks for concurrency.
- Go uses Communicating Sequential Processes (CSP), introduced by Tony Hoare in 1978.
- CSP patterns make concurrency simpler and easier to understand.
When to Use Concurrency
- Concurrency should be used only when necessary; it does not always improve speed.
- New Go developers often misuse goroutines, leading to deadlocks and inefficiencies.
- Concurrency is not parallelism: more concurrency doesn't always mean faster execution.
- Amdahl's Law: only the parallel portion of a task benefits from concurrency.
- Use concurrency when:
- Multiple independent operations need to be combined.
- Tasks involve I/O (disk, network) since they are much slower than memory operations.
- Code must meet strict time constraints (e.g., web service calls under 50ms).
- Always benchmark to determine if concurrency actually improves performance.
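Amdahl's Law can also be written as a formula. The symbols here are introduced only for illustration: p is the fraction of the work that can run in parallel and n is the number of processors.

```latex
S(n) = \frac{1}{(1 - p) + \dfrac{p}{n}}
```

For example, with p = 0.5 the speedup can never exceed 1 / (1 - p) = 2, no matter how many processors are added.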
Goroutines
- Goroutines are lightweight threads managed by the Go runtime.
- The Go scheduler assigns goroutines to OS threads efficiently.
- Benefits:
- Faster than creating OS threads.
- Uses less memory with smaller, dynamically growing stacks.
- Faster context switching since it avoids OS-level scheduling.
- Works with the garbage collector and network poller for better optimization.
- Can launch thousands of goroutines without performance issues.
- How to Use Goroutines
- Use go before a function call to launch it as a goroutine.
go someFunc()
- Unlike JavaScript's async functions, any function in Go can be launched as a goroutine.
- Prefer launching goroutines with a closure that wraps the business logic, so the closure handles the concurrency bookkeeping.
Example: Concurrent Processing with Goroutines
- Uses channels to pass values between goroutines.
- Keeps business logic (process()) separate from concurrency logic.
- Ensures modularity and testability.
package main

import "fmt"

// process holds the business logic and knows nothing about concurrency.
func process(val int) int {
	return val * 2 // Example: doubling the value
}

func processConcurrently(inVals []int) []int {
	in := make(chan int, 5)
	out := make(chan int, 5)
	// Launch 5 worker goroutines
	for i := 0; i < 5; i++ {
		go func() {
			for val := range in {
				out <- process(val)
			}
		}()
	}
	// Load data into the in channel
	go func() {
		for _, val := range inVals {
			in <- val
		}
		close(in) // Close in channel to signal no more data
	}()
	// Read processed data
	results := make([]int, 0, len(inVals))
	for i := 0; i < len(inVals); i++ {
		results = append(results, <-out)
	}
	return results
}

func main() {
	input := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	output := processConcurrently(input)
	// The order can vary between runs because the workers run concurrently.
	fmt.Println(output) // Example output: [2 4 6 8 10 12 14 16 18 20]
}
Key Takeaways
- Use concurrency wisely; not all tasks benefit from it.
- Goroutines are cheap, but improper use can lead to complexity.
- Benchmark first, then decide if concurrency helps.
- Use channels for communication and avoid shared memory when possible.
Channels
- Goroutines communicate using channels.
- Channels are a built-in type.
- Like maps, channels are reference types.
- Zero value of a channel: nil
Reading, Writing, and Buffering
Channel Interaction (<-)
- Read from a channel: a := <-ch
- Write to a channel: ch <- b
- Each value written to a channel is consumed only once.
- If multiple goroutines read from the same channel, only one will receive each value.
Directional Channels
- Read-only channel: ch <-chan int (goroutine can only read)
- Write-only channel: ch chan<- int (goroutine can only write)
- Helps the Go compiler enforce proper channel usage
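A minimal sketch of directional channels in function signatures. The function names produce and sum are hypothetical and chosen only for this illustration.

```go
package main

import "fmt"

// produce can only send on out; receiving from it would not compile.
func produce(out chan<- int, n int) {
	for i := 1; i <= n; i++ {
		out <- i
	}
	close(out) // the sender closes the channel
}

// sum can only receive from in; sending to it would not compile.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

func main() {
	ch := make(chan int) // bidirectional; converted implicitly at each call
	go produce(ch, 5)
	fmt.Println(sum(ch)) // 15
}
```

Declaring the direction in the parameter type documents each function's role and turns accidental misuse into a compile-time error.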
Unbuffered Channel
- Blocking Behaviour
- A write pauses until another goroutine reads.
- A read pauses until another goroutine writes.
- Requires at least two concurrently running goroutines.
Buffered Channel
- Created with a fixed buffer size: ch := make(chan int, 10)
- Can hold values temporarily before being read.
- Blocking Behaviour
- Writing blocks when the buffer is full
- Reading blocks when the buffer is empty
Channel Properties
- len(ch): number of elements currently in the buffer
- cap(ch): maximum buffer size
- Unbuffered channels return 0 for both length and capacity
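The following sketch shows len and cap on a buffered and an unbuffered channel. The helper bufferState is a hypothetical name used only here.

```go
package main

import "fmt"

// bufferState reports how many values are queued in ch and how many it can hold.
func bufferState(ch chan int) (queued, capacity int) {
	return len(ch), cap(ch)
}

func main() {
	buffered := make(chan int, 3)
	buffered <- 1 // does not block: the buffer has room
	buffered <- 2
	fmt.Println(bufferState(buffered)) // 2 3

	unbuffered := make(chan int)
	fmt.Println(bufferState(unbuffered)) // 0 0
}
```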
General Recommendation
- Prefer unbuffered channels in most cases.
- Buffered channels are useful when handling bursts of data without immediate consumers.
Using for-range and Channels
- The loop extracts values from channel until it is closed.
- Blocking Behaviour
- If no value is available, goroutine pauses until a value is sent or channel is closed.
- Loop Termination
- The loop stops when the channel is closed.
- Can also be manually stopped using break or return.
- Unlike other for-range loops, only a single variable (value) is used, since channels do not have keys.
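As an illustration of the points above, here is a small sketch in which a for-range loop drains a channel until the sender closes it. The names countTo and collect are assumptions made for this example.

```go
package main

import "fmt"

// countTo sends 1..limit on a new channel and closes it when done.
func countTo(limit int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch) // closing ends the receiver's for-range loop
		for i := 1; i <= limit; i++ {
			ch <- i
		}
	}()
	return ch
}

// collect reads with for-range; only one loop variable (the value) is used.
func collect(ch <-chan int) []int {
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(collect(countTo(3))) // [1 2 3]
}
```

If countTo never closed the channel, the loop in collect would block forever after the last value.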
Closing a Channel
- Use close(ch) to close a channel when no more data will be sent.
- Writing to a closed channel causes a panic.
- Closing an already closed channel also causes a panic.
Reading from a Closed Channel
- Always succeeds (does not panic)
- If the channel is buffered, remaining values are read in order.
- If the channel is empty, the zero value of the channel's type is returned.
Detecting a Closed Channel
- Use the comma-ok form of the receive: v, ok := <-ch
- ok == true: v contains a valid value that was sent on the channel
- ok == false: the channel is closed and drained, v contains a zero value
- Always use this when reading from a potentially closed channel
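A short sketch of the comma-ok receive, showing why it matters: a legitimate zero value and the zero value returned by a closed channel look the same without ok. The function drain is a hypothetical helper.

```go
package main

import "fmt"

// drain reads from ch until it is closed and empty, using the comma-ok form.
func drain(ch <-chan int) (values []int, closed bool) {
	for {
		v, ok := <-ch
		if !ok {
			return values, true // closed and empty: v is just the zero value
		}
		values = append(values, v)
	}
}

func main() {
	ch := make(chan int, 2)
	ch <- 0 // a real value that happens to be zero
	ch <- 7
	close(ch) // buffered values can still be read after close

	vals, closed := drain(ch)
	fmt.Println(vals, closed) // [0 7] true
}
```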
Who Closes the Channel?
- The sending goroutine is responsible for closing the channel.
- Closing is necessary only if a receiver is waiting for the channel to close (e.g., a for-range loop).
- Otherwise, Go garbage collects unused channels automatically.
Why Channels Matter in Go
- Go discourages shared mutable state (in contrast to languages that rely on globally shared memory for concurrency).
- Channels clarify data dependencies, making concurrent code easier to reason about.
- They encourage a staged pipeline approach to processing data.
Understanding How Channels Behave
State | Read Behaviour | Write Behaviour | Close Behaviour |
---|---|---|---|
Unbuffered, Open | Waits until something is written | Waits until something is read | Works |
Unbuffered, Closed | Returns zero value (use comma ok to check) | PANIC | PANIC |
Buffered, Open | Waits if buffer is empty | Waits if buffer is full | Works |
Buffered, Closed | Reads remaining values, then returns zero value | PANIC | PANIC |
Nil Channel | Hangs Forever | Hangs Forever | PANIC |
Closing a Channel Properly
- The writing goroutine should close the channel when no more data will be sent.
- Multiple writers? Use sync.WaitGroup so that close() is called only once, after all writers have finished.
Nil Channels
- Reads and writes hang forever.
- Useful in some cases (e.g., turning off a select case).
select
- Purpose
- select lets a goroutine wait on several channel reads or writes and proceed with whichever one is ready.
- Prevents starvation: when several cases are ready, one is picked at random, so no case is favored by its position.
select {
case v := <-ch1:
	fmt.Println(v)
case v := <-ch2:
	fmt.Println(v)
case ch3 <- x:
	fmt.Println("Wrote", x)
case <-ch4:
	fmt.Println("Received from ch4, but ignored")
}
- Each case must involve a channel operation (<-ch for read, ch <- x for write).
- If multiple cases are ready, one is chosen randomly.
- If no cases are ready, select blocks until one can proceed.
Avoiding Deadlocks with select
- A deadlock occurs when all goroutines are waiting indefinitely.
- Example of deadlock (caused by circular waiting):
func main() {
	ch1 := make(chan int) // Unbuffered channel ch1
	ch2 := make(chan int) // Unbuffered channel ch2
	go func() {
		ch1 <- 1           // Blocks: main never reaches its read of ch1
		fmt.Println(<-ch2) // Never reached
	}()
	ch2 <- 2           // Blocks: the goroutine never reaches its read of ch2
	fmt.Println(<-ch1) // Never reached
}
- Error: fatal error: all goroutines are asleep - deadlock!
- Each goroutine is blocked on a send that only the other one could receive, so neither can proceed.
- Fix using select to avoid deadlock:
- Ensures at least one operation can proceed, breaking deadlock.
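One way the select-based fix could look is sketched below. The wrapper function exchange is a hypothetical name; the channel names follow the deadlock example. Note one limitation of this sketch: the launched goroutine stays blocked on its read of ch2 after exchange returns, which is a leak in a long-running program.

```go
package main

import "fmt"

// exchange replaces main's blocking send with a select, so main proceeds
// with whichever channel operation is ready first.
func exchange() (fromGoroutine int) {
	ch1 := make(chan int)
	ch2 := make(chan int)
	go func() {
		ch1 <- 1 // blocks until the select below receives it
		<-ch2    // stays blocked in this sketch (see the note above)
	}()
	select {
	case ch2 <- 2: // not ready: nothing is reading ch2 yet
	case fromGoroutine = <-ch1: // ready once the goroutine sends
	}
	return fromGoroutine
}

func main() {
	fmt.Println(exchange()) // 1
}
```

Because select checks both cases instead of committing to the send on ch2, the circular wait is broken.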
Using select in Loops (for-select Loop)
Common Pattern
- Exiting condition is required (done channel in this case).
- Using default in select
- Non-blocking behavior (does not wait if ch has no value).
- Caution: Avoid using default inside for-select, as it wastes CPU by looping infinitely.
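The common pattern described above can be sketched as follows. The function name sumUntilDone is an assumption made for this example; the done channel provides the exit condition.

```go
package main

import "fmt"

// sumUntilDone adds values from ch until done is closed.
func sumUntilDone(ch <-chan int, done <-chan struct{}) int {
	total := 0
	for {
		select {
		case v := <-ch:
			total += v
		case <-done:
			return total // exit condition for the for-select loop
		}
	}
}

func main() {
	ch := make(chan int) // unbuffered: each send completes only when received
	done := make(chan struct{})
	go func() {
		for i := 1; i <= 3; i++ {
			ch <- i
		}
		close(done) // all values were received before this point
	}()
	fmt.Println(sumUntilDone(ch, done)) // 6
}
```

There is no default case here, so the loop blocks between values instead of spinning.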
Concurrency Practices and Patterns
Keep Your APIs Concurrency-Free
- Concurrency should be hidden as an implementation detail in the APIs
- Avoid exposing channels or mutexes in API types, functions and methods.
- Users should not have to manage channels (e.g., buffering, closing)
- Exception: Concurrency helper libraries may expose channels
Goroutines, for Loops and Varying Variables
- Before Go 1.22: Loops reused the same variable, causing goroutines to capture the final value instead of different values.
- Go 1.22 Fix: Each iteration now gets a new copy of the loop variable.
- Workarounds for older versions:
- Shadow the variable: v := v inside the loop.
- Pass as a function parameter: go func(val int) { ch <- val * 2 }(v).
- Rule of Thumb: Always pass copies of changing variables into closures.
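A sketch of the parameter-passing workaround, which behaves the same before and after Go 1.22. The function name doubleAll is hypothetical, and the results are sorted only to make the output predictable.

```go
package main

import (
	"fmt"
	"sort"
)

// doubleAll doubles each value in its own goroutine. The value is passed
// as an argument, so every goroutine works on its own copy.
func doubleAll(vals []int) []int {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		go func(val int) { ch <- val * 2 }(v)
	}
	out := make([]int, 0, len(vals))
	for range vals {
		out = append(out, <-ch)
	}
	sort.Ints(out) // goroutines finish in no particular order
	return out
}

func main() {
	fmt.Println(doubleAll([]int{2, 4, 6})) // [4 8 12]
}
```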
Always Clean up your Goroutines
- Goroutines must eventually exit; a goroutine that never exits is never garbage collected, so it leaks memory.
- Example issue: a goroutine blocked on a channel operation may never exit. Suppose a generator goroutine is set up to produce 10 numbers, but the for loop consuming them exits after 5. The generator goroutine then blocks forever on its next send.
Use the Context to Terminate Goroutines
- context.Context is used to signal goroutines to exit.
- Modify the loops in goroutines to watch for cancellation:
// passing a Context helps clean up goroutines
select {
case <-ctx.Done():
	return // Gracefully exit
case ch <- i: // Continue sending data
}
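Putting the select above into a complete generator might look like this sketch. The names countTo and takeUntil are assumptions for illustration; the consumer stops early and cancels the context so the generator goroutine can exit.

```go
package main

import (
	"context"
	"fmt"
)

// countTo sends 1..limit on the returned channel and stops early
// if ctx is cancelled.
func countTo(ctx context.Context, limit int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 1; i <= limit; i++ {
			select {
			case <-ctx.Done():
				return // consumer is gone: exit instead of blocking forever
			case ch <- i:
			}
		}
	}()
	return ch
}

// takeUntil reads from the generator and stops once it sees stopAt.
func takeUntil(limit, stopAt int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // unblocks the generator when we return early
	var got []int
	for v := range countTo(ctx, limit) {
		got = append(got, v)
		if v == stopAt {
			break
		}
	}
	return got
}

func main() {
	fmt.Println(takeUntil(10, 5)) // [1 2 3 4 5]
}
```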
Know when to use Buffered and Unbuffered Channels
- Buffered channels are useful for limiting the number of goroutines that will be launched, or the amount of work that is queued up.
Implement Backpressure
- Systems perform better overall when their components limit the amount of work they are willing to perform.
- A buffered channel and a select statement can limit the number of simultaneous requests in a system.
package main

import (
	"errors"
	"net/http"
	"time"
)

// PressureGauge allows at most cap(ch) calls to Process to run at once.
type PressureGauge struct {
	ch chan struct{}
}

func New(limit int) *PressureGauge {
	return &PressureGauge{
		ch: make(chan struct{}, limit),
	}
}

func (pg *PressureGauge) Process(f func()) error {
	select {
	case pg.ch <- struct{}{}: // take a slot
		f()
		<-pg.ch // release the slot
		return nil
	default: // buffer full: reject instead of waiting
		return errors.New("no more capacity")
	}
}

// example: ratelimiter in golang
func doThingThatShouldBeLimited() string {
	time.Sleep(2 * time.Second)
	return "done"
}

func main() {
	pg := New(10)
	http.HandleFunc("/request", func(w http.ResponseWriter, r *http.Request) {
		err := pg.Process(func() {
			w.Write([]byte(doThingThatShouldBeLimited()))
		})
		if err != nil {
			w.WriteHeader(http.StatusTooManyRequests)
			w.Write([]byte("Too many requests"))
		}
	})
	http.ListenAndServe(":8080", nil)
}
Turn Off a case in a select
Handling Closed Channels
- select helps combine data from multiple sources but requires proper handling of closed channels.
- Reading from a closed channel always succeeds, returning the zero value.
- If not handled, your program may waste time processing junk values.
Using nil Channels to Disable Cases
- Reading from or writing to a nil channel causes the program to block indefinitely.
- You can use this behavior to disable a case in select by setting a closed channel to nil.
- This prevents the case from being randomly selected and stops unnecessary reads.
Example Implementation
- Use a counter to track how many channels are closed.
- When a read operation detects a closed channel (ok == false), set the channel variable to nil.
- This ensures the case never runs again, preventing unnecessary processing.
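The steps under Example Implementation could be sketched like this. The function name mergeSum and the choice to sum the values are assumptions for illustration only.

```go
package main

import "fmt"

// mergeSum reads from both channels until both are closed and returns
// the sum of everything received.
func mergeSum(in1, in2 <-chan int) int {
	total := 0
	for closedCount := 0; closedCount < 2; {
		select {
		case v, ok := <-in1:
			if !ok {
				in1 = nil // a nil channel blocks forever, disabling this case
				closedCount++
				continue
			}
			total += v
		case v, ok := <-in2:
			if !ok {
				in2 = nil
				closedCount++
				continue
			}
			total += v
		}
	}
	return total
}

func main() {
	a := make(chan int, 2)
	b := make(chan int, 1)
	a <- 1
	a <- 2
	b <- 10
	close(a)
	close(b)
	fmt.Println(mergeSum(a, b)) // 13
}
```

Without the nil assignment, a closed channel's case would stay ready and keep delivering zero values until the other channel also closed.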
Use WaitGroups
- Purpose of sync.WaitGroup
- Used when one goroutine needs to wait for multiple goroutines to complete.
- Waiting on a single goroutine can be handled with the context cancellation pattern; waiting on several goroutines calls for a WaitGroup.
- Key Methods in sync.WaitGroup
- Add(n): Increments the counter by n, indicating n goroutines will be waited on.
- Done(): Decrements the counter, called via defer in each goroutine.
- Wait(): Blocks execution until the counter reaches zero.
- Implementation Details
- sync.WaitGroup does not require initialization; its zero value is ready to use.
- Ensure all goroutines share the same WaitGroup instance (use closures instead of passing by value).
- Helps manage closing channels when multiple goroutines write to the same channel.
- Example: Processing Data with WaitGroup
- Uses multiple worker goroutines to process data from an input channel.
- A monitoring goroutine waits for all workers to finish before closing the output channel.
- Ensures out is closed only once, and only after every worker has finished writing; closing twice or writing after the close would panic.
func main() {
	var wg sync.WaitGroup
	wg.Add(3)
	go func() {
		defer wg.Done()
		doThing1()
	}()
	go func() {
		defer wg.Done()
		doThing2()
	}()
	go func() {
		defer wg.Done()
		doThing3()
	}()
	wg.Wait()
}

func processAndGather[T, R any](in <-chan T, processor func(T) R, num int) []R {
	out := make(chan R, num)
	var wg sync.WaitGroup
	wg.Add(num)
	for i := 0; i < num; i++ {
		go func() {
			defer wg.Done()
			for v := range in {
				out <- processor(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	var result []R
	for v := range out {
		result = append(result, v)
	}
	return result
}
- Best Practices
- Use WaitGroup only when necessary (e.g., closing shared resources like channels).
- Prefer higher-level concurrency patterns when applicable.
- Alternative: errgroup.Group
- Found in golang.org/x/sync/errgroup.
- Extends WaitGroup by allowing goroutines to return errors.
- With errgroup.WithContext, the first error cancels the shared context so the remaining goroutines can stop early; Wait returns that first error.
Run Code Exactly Once
- init should be reserved for initialization of effectively immutable package-level state.
- Use sync.Once to ensure a function executes only once, even if called multiple times.
- Go 1.21 introduced sync.OnceFunc, sync.OnceValue, and sync.OnceValues to simplify this.
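A minimal sketch of lazy initialization with sync.Once. The config type, the getConfig function, and the initCount counter are hypothetical and exist only to show that the initializer runs a single time.

```go
package main

import (
	"fmt"
	"sync"
)

// config stands in for state that is expensive to build (hypothetical).
type config struct{ name string }

var (
	once      sync.Once
	cfg       *config
	initCount int // counts how often the initializer actually ran
)

// getConfig builds cfg on the first call; later calls reuse it.
func getConfig() *config {
	once.Do(func() {
		initCount++
		cfg = &config{name: "default"}
	})
	return cfg
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			getConfig() // safe to call from many goroutines
		}()
	}
	wg.Wait()
	fmt.Println(initCount) // 1
}
```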
Put Your Concurrent Tools Together
- Example pipeline: call services A and B in parallel, combine the results, then call service C.
- Uses context.Context to enforce a 50ms timeout.
- Uses goroutines and channels for concurrency.
func GatherAndProcess(ctx context.Context, data Input) (COut, error) {
	ctx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
	defer cancel()
	ab := newABProcessor()
	ab.start(ctx, data)
	inputC, err := ab.wait(ctx)
	if err != nil {
		return COut{}, err
	}
	c := newCProcessor()
	c.start(ctx, inputC)
	out, err := c.wait(ctx)
	return out, err
}

// notice how ab API doesn't expose concurrency to driver program
type abProcessor struct {
	outA chan aOut
	outB chan bOut
	errs chan error
}

func newABProcessor() *abProcessor {
	return &abProcessor{
		outA: make(chan aOut, 1),
		outB: make(chan bOut, 1),
		errs: make(chan error, 2),
	}
}
- abProcessor handles concurrent calls to services A & B:
- Implements start (launch goroutines) and wait (aggregate results).
- cProcessor is a simpler version handling service C.
func (p *abProcessor) start(ctx context.Context, data Input) {
	go func() {
		aOut, err := getResultA(ctx, data.A)
		if err != nil {
			p.errs <- err
			return
		}
		p.outA <- aOut
	}()
	go func() {
		bOut, err := getResultB(ctx, data.B)
		if err != nil {
			p.errs <- err
			return
		}
		p.outB <- bOut
	}()
}

func (p *abProcessor) wait(ctx context.Context) (cIn, error) {
	var cData cIn
	for count := 0; count < 2; count++ {
		select {
		case a := <-p.outA:
			cData.a = a
		case b := <-p.outB:
			cData.b = b
		case err := <-p.errs:
			return cIn{}, err
		case <-ctx.Done():
			return cIn{}, ctx.Err()
		}
	}
	return cData, nil
}
- C-processor implementation:
type cProcessor struct {
	outC chan COut
	errs chan error
}

func newCProcessor() *cProcessor {
	return &cProcessor{
		outC: make(chan COut, 1),
		errs: make(chan error, 1),
	}
}

func (p *cProcessor) start(ctx context.Context, inputC cIn) {
	go func() {
		cOut, err := getResultC(ctx, inputC)
		if err != nil {
			p.errs <- err
			return
		}
		p.outC <- cOut
	}()
}

func (p *cProcessor) wait(ctx context.Context) (COut, error) {
	select {
	case out := <-p.outC:
		return out, nil
	case err := <-p.errs:
		return COut{}, err
	case <-ctx.Done():
		return COut{}, ctx.Err()
	}
}
When to Use Mutexes Instead of Channels
- Channels are preferred for managing concurrent data flow.
- Mutex (sync.Mutex) is better for shared state like an in-memory scoreboard.
- sync.RWMutex allows multiple readers but only one writer at a time.
When to Use Channels vs. Mutexes
- Use Channels: When passing values between goroutines.
- Use Mutexes: When managing access to shared fields in structs.
- Also consider replacing channels with a mutex if the channels cause a measured, critical performance problem that cannot be fixed another way.
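A sketch of the in-memory scoreboard mentioned above, guarded by sync.RWMutex. The type and method names (Scoreboard, NewScoreboard, Add, Get) are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// Scoreboard protects a shared map with a read-write mutex.
type Scoreboard struct {
	mu     sync.RWMutex
	scores map[string]int
}

func NewScoreboard() *Scoreboard {
	return &Scoreboard{scores: make(map[string]int)}
}

// Add takes the write lock: only one writer at a time, no readers.
func (s *Scoreboard) Add(name string, points int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.scores[name] += points
}

// Get takes the read lock: many readers may hold it at once.
func (s *Scoreboard) Get(name string) (int, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.scores[name]
	return v, ok
}

func main() {
	sb := NewScoreboard()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sb.Add("alice", 1)
		}()
	}
	wg.Wait()
	fmt.Println(sb.Get("alice")) // 100 true
}
```

The mutex is an unexported field, so callers of Scoreboard never see the locking, in keeping with the advice to keep APIs concurrency-free.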
Atomics
- The sync/atomic package provides access to the atomic variable operations built into modern CPUs to add, swap, load, store, or compare and swap (CAS) a value that fits into a single register.
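As a small illustration, the sketch below increments a shared counter from many goroutines using atomic.AddInt64 and reads it with atomic.LoadInt64. The function name countConcurrently is an assumption for this example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countConcurrently increments one shared counter from n goroutines
// without a mutex, using atomic add and load.
func countConcurrently(n int) int64 {
	var counter int64
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			atomic.AddInt64(&counter, 1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&counter)
}

func main() {
	fmt.Println(countConcurrently(1000)) // 1000
}
```

A plain counter++ in place of the atomic add would be a data race and could lose updates.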
More
- Further reading: Concurrency in Go covers the topics in this chapter in more depth.