From 2a0445607898aabc0ef04c1e9eb0e21ae82cc41b Mon Sep 17 00:00:00 2001 From: appleboy Date: Sun, 13 Apr 2025 11:14:03 +0800 Subject: [PATCH 1/9] docs: optimize Queue library for CPU usage and performance - Clarify the description of the Queue Golang library to emphasize utilizing the full CPU capacity. Signed-off-by: appleboy --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 926b94b..33aa89c 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ [繁體中文](./README.zh-tw.md) | [简体中文](./README.zh-cn.md) -Queue is a Golang library that helps you create and manage a pool of Goroutines (lightweight threads). It allows you to efficiently run multiple tasks in parallel, utilizing the CPU capacity of your machine. +Queue is a Golang library designed to help you create and manage a pool of Goroutines (lightweight threads). It allows you to efficiently run multiple tasks in parallel, utilizing the full CPU capacity of your machine. ## Features From 0481dfd24f0468c6b3c857395ae67098935c5159 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Thu, 17 Apr 2025 18:13:21 +0800 Subject: [PATCH 2/9] docs: improve and expand documentation for queue and worker internals - Add comprehensive package-level and function-level documentation to clarify queue behavior, worker management, and shutdown procedures - Replace brief or missing comments with detailed explanations for struct fields and internal logic - Improve clarity of retry, scheduling, and worker lifecycle mechanisms through enhanced inline comments - Refactor comments to use Go-style doc comments and block comments for exported types and functions Signed-off-by: Bo-Yi Wu --- queue.go | 164 ++++++++++++++++++++++++++++++++----------------------- 1 file changed, 97 insertions(+), 67 deletions(-) diff --git a/queue.go b/queue.go index 836de8a..0fe4161 100644 --- a/queue.go +++ b/queue.go @@ -1,5 +1,8 @@ package queue +// Package queue provides a high-performance, extensible message queue implementation +// supporting multiple workers, job retries, dynamic scaling, and graceful shutdown. + import ( "context" "errors" @@ -13,45 +16,52 @@ import ( "github.com/jpillora/backoff" ) -// ErrQueueShutdown the queue is released and closed. +/* +ErrQueueShutdown is returned when an operation is attempted on a queue +that has already been closed and released. +*/ var ErrQueueShutdown = errors.New("queue has been closed and released") type ( - // A Queue is a message queue. + // Queue represents a message queue with worker management, job scheduling, + // retry logic, and graceful shutdown capabilities. Queue struct { - sync.Mutex - metric *metric - logger Logger - workerCount int64 - routineGroup *routineGroup - quit chan struct{} - ready chan struct{} - notify chan struct{} - worker core.Worker - stopOnce sync.Once - stopFlag int32 - afterFn func() - retryInterval time.Duration + sync.Mutex // Mutex to protect concurrent access to queue state + metric *metric // Metrics collector for tracking queue and worker stats + logger Logger // Logger for queue events and errors + workerCount int64 // Number of worker goroutines to process jobs + routineGroup *routineGroup // Group to manage and wait for goroutines + quit chan struct{} // Channel to signal shutdown to all goroutines + ready chan struct{} // Channel to signal worker readiness + notify chan struct{} // Channel to notify workers of new jobs + worker core.Worker // The worker implementation that processes jobs + stopOnce sync.Once // Ensures shutdown is only performed once + stopFlag int32 // Atomic flag indicating if shutdown has started + afterFn func() // Optional callback after each job execution + retryInterval time.Duration // Interval for retrying job requests } ) -// ErrMissingWorker missing define worker +/* +ErrMissingWorker is returned when a queue is created without a worker implementation. +*/ var ErrMissingWorker = errors.New("missing worker module") -// NewQueue returns a Queue. +// NewQueue creates and returns a new Queue instance with the provided options. +// Returns an error if no worker is specified. func NewQueue(opts ...Option) (*Queue, error) { o := NewOptions(opts...) q := &Queue{ - routineGroup: newRoutineGroup(), - quit: make(chan struct{}), - ready: make(chan struct{}, 1), - notify: make(chan struct{}, 1), - workerCount: o.workerCount, - logger: o.logger, - worker: o.worker, - metric: &metric{}, - afterFn: o.afterFn, - retryInterval: o.retryInterval, + routineGroup: newRoutineGroup(), // Manages all goroutines spawned by the queue + quit: make(chan struct{}), // Signals shutdown to all goroutines + ready: make(chan struct{}, 1), // Signals when a worker is ready to process a job + notify: make(chan struct{}, 1), // Notifies workers of new jobs + workerCount: o.workerCount, // Number of worker goroutines + logger: o.logger, // Logger for queue events + worker: o.worker, // Worker implementation + metric: &metric{}, // Metrics collector + afterFn: o.afterFn, // Optional post-job callback + retryInterval: o.retryInterval, // Interval for retrying job requests } if q.worker == nil { @@ -61,7 +71,8 @@ func NewQueue(opts ...Option) (*Queue, error) { return q, nil } -// Start to enable all worker +// Start launches all worker goroutines and begins processing jobs. +// If workerCount is zero, Start is a no-op. func (q *Queue) Start() { q.Lock() count := q.workerCount @@ -74,7 +85,9 @@ func (q *Queue) Start() { }) } -// Shutdown stops all queues. +// Shutdown initiates a graceful shutdown of the queue. +// It signals all goroutines to stop, shuts down the worker, and closes the quit channel. +// Shutdown is idempotent and safe to call multiple times. func (q *Queue) Shutdown() { if !atomic.CompareAndSwapInt32(&q.stopFlag, 0, 1) { return @@ -92,55 +105,59 @@ func (q *Queue) Shutdown() { }) } -// Release for graceful shutdown. +// Release performs a graceful shutdown and waits for all goroutines to finish. func (q *Queue) Release() { q.Shutdown() q.Wait() } -// BusyWorkers returns the numbers of workers in the running process. +// BusyWorkers returns the number of workers currently processing jobs. func (q *Queue) BusyWorkers() int64 { return q.metric.BusyWorkers() } -// BusyWorkers returns the numbers of success tasks. +// SuccessTasks returns the number of successfully completed tasks. func (q *Queue) SuccessTasks() uint64 { return q.metric.SuccessTasks() } -// BusyWorkers returns the numbers of failure tasks. +// FailureTasks returns the number of failed tasks. func (q *Queue) FailureTasks() uint64 { return q.metric.FailureTasks() } -// BusyWorkers returns the numbers of submitted tasks. +// SubmittedTasks returns the number of tasks submitted to the queue. func (q *Queue) SubmittedTasks() uint64 { return q.metric.SubmittedTasks() } -// CompletedTasks returns the numbers of completed tasks. +// CompletedTasks returns the total number of completed tasks (success + failure). func (q *Queue) CompletedTasks() uint64 { return q.metric.CompletedTasks() } -// Wait all process +// Wait blocks until all goroutines in the routine group have finished. func (q *Queue) Wait() { q.routineGroup.Wait() } -// Queue to queue single job with binary +// Queue enqueues a single job (core.QueuedMessage) into the queue. +// Accepts job options for customization. func (q *Queue) Queue(message core.QueuedMessage, opts ...job.AllowOption) error { data := job.NewMessage(message, opts...) return q.queue(&data) } -// QueueTask to queue single task +// QueueTask enqueues a single task function into the queue. +// Accepts job options for customization. func (q *Queue) QueueTask(task job.TaskFunc, opts ...job.AllowOption) error { data := job.NewTask(task, opts...) return q.queue(&data) } +// queue is an internal helper to enqueue a job.Message into the worker. +// It increments the submitted task metric and notifies workers if possible. func (q *Queue) queue(m *job.Message) error { if atomic.LoadInt32(&q.stopFlag) == 1 { return ErrQueueShutdown @@ -151,9 +168,8 @@ func (q *Queue) queue(m *job.Message) error { } q.metric.IncSubmittedTask() - // notify worker - // if the channel is full, it means that the worker is busy - // and we don't want to block the main thread + // Notify a worker that a new job is available. + // If the notify channel is full, the worker is busy and we avoid blocking. select { case q.notify <- struct{}{}: default: @@ -162,10 +178,11 @@ func (q *Queue) queue(m *job.Message) error { return nil } +// work executes a single task, handling panics and updating metrics accordingly. +// After execution, it schedules the next worker if needed. func (q *Queue) work(task core.TaskMessage) { var err error - // to handle panic cases from inside the worker - // in such case, we start a new goroutine + // Defer block to handle panics, update metrics, and run afterFn callback. defer func() { q.metric.DecBusyWorker() e := recover() @@ -174,7 +191,7 @@ func (q *Queue) work(task core.TaskMessage) { } q.schedule() - // increase success or failure number + // Update success or failure metrics based on execution result. if err == nil && e == nil { q.metric.IncSuccessTask() } else { @@ -190,6 +207,8 @@ func (q *Queue) work(task core.TaskMessage) { } } +// run dispatches the task to the appropriate handler based on its type. +// Returns an error if the task type is invalid. func (q *Queue) run(task core.TaskMessage) error { switch t := task.(type) { case *job.Message: @@ -199,8 +218,11 @@ func (q *Queue) run(task core.TaskMessage) error { } } +// handle executes a job.Message, supporting retries, timeouts, and panic recovery. +// Returns an error if the job fails or times out. func (q *Queue) handle(m *job.Message) error { - // create channel with buffer size 1 to avoid goroutine leak + // done: receives the result of the job execution + // panicChan: receives any panic that occurs in the job goroutine done := make(chan error, 1) panicChan := make(chan any, 1) startTime := time.Now() @@ -209,18 +231,18 @@ func (q *Queue) handle(m *job.Message) error { cancel() }() - // run the job + // Run the job in a separate goroutine to support timeout and panic recovery. go func() { - // handle panic issue + // Defer block to catch panics and send to panicChan defer func() { if p := recover(); p != nil { panicChan <- p } }() - // run custom process function var err error + // Set up backoff for retry logic b := &backoff.Backoff{ Min: m.RetryMin, Max: m.RetryMax, @@ -230,26 +252,28 @@ func (q *Queue) handle(m *job.Message) error { delay := m.RetryDelay loop: for { + // If a custom Task function is provided, use it; otherwise, use the worker's Run method. if m.Task != nil { err = m.Task(ctx) } else { err = q.worker.Run(ctx, m) } - // check error and retry count + // If no error or no retries left, exit loop. if err == nil || m.RetryCount == 0 { break } m.RetryCount-- + // If no fixed retry delay, use backoff. if m.RetryDelay == 0 { delay = b.Duration() } select { - case <-time.After(delay): // retry delay + case <-time.After(delay): // Wait before retrying q.logger.Infof("retry remaining times: %d, delay time: %s", m.RetryCount, delay) - case <-ctx.Done(): // timeout reached + case <-ctx.Done(): // Timeout reached err = ctx.Err() break loop } @@ -261,28 +285,27 @@ func (q *Queue) handle(m *job.Message) error { select { case p := <-panicChan: panic(p) - case <-ctx.Done(): // timeout reached + case <-ctx.Done(): // Timeout reached return ctx.Err() - case <-q.quit: // shutdown service - // cancel job + case <-q.quit: // Queue is shutting down + // Cancel job and wait for remaining time or job completion cancel() - leftTime := m.Timeout - time.Since(startTime) - // wait job select { case <-time.After(leftTime): return context.DeadlineExceeded - case err := <-done: // job finish + case err := <-done: // Job finished return err case p := <-panicChan: panic(p) } - case err := <-done: // job finish + case err := <-done: // Job finished return err } } -// UpdateWorkerCount to update worker number dynamically. +// UpdateWorkerCount dynamically updates the number of worker goroutines. +// Triggers scheduling to adjust to the new worker count. func (q *Queue) UpdateWorkerCount(num int64) { q.Lock() q.workerCount = num @@ -290,7 +313,8 @@ func (q *Queue) UpdateWorkerCount(num int64) { q.schedule() } -// schedule to check worker number +// schedule checks if more workers can be started based on the current busy count. +// If so, it signals readiness to start a new worker. func (q *Queue) schedule() { q.Lock() defer q.Unlock() @@ -304,24 +328,30 @@ func (q *Queue) schedule() { } } -// start to start all worker +/* +start launches the main worker loop, which manages job scheduling and execution. + +- It uses a ticker to periodically retry job requests if the queue is empty. +- For each available worker slot, it requests a new task from the worker. +- If a task is available, it is sent to the tasks channel and processed by a new goroutine. +- The loop exits when the quit channel is closed. +*/ func (q *Queue) start() { tasks := make(chan core.TaskMessage, 1) ticker := time.NewTicker(q.retryInterval) defer ticker.Stop() for { - // check worker number + // Ensure the number of busy workers does not exceed the configured worker count. q.schedule() select { - // wait worker ready - case <-q.ready: - case <-q.quit: + case <-q.ready: // Wait for a worker slot to become available + case <-q.quit: // Shutdown signal received return } - // request task from queue in background + // Request a task from the worker in a background goroutine. q.routineGroup.Run(func() { for { t, err := q.worker.Request() @@ -359,7 +389,7 @@ func (q *Queue) start() { return } - // start new task + // Start processing the new task in a separate goroutine. q.metric.IncBusyWorker() q.routineGroup.Run(func() { q.work(task) From ad38a4cef39271fb599ac83afcd1599f2e32ae89 Mon Sep 17 00:00:00 2001 From: appleboy Date: Tue, 13 May 2025 22:32:42 +0800 Subject: [PATCH 3/9] chore: update Go version and mock dependency - Update Go version requirement from 1.22 to 1.23 - Bump go.uber.org/mock dependency from v0.5.1 to v0.5.2 Signed-off-by: appleboy --- go.mod | 4 ++-- go.sum | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 486a9f1..a8040bc 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,13 @@ module github.com/golang-queue/queue -go 1.22 +go 1.23 require ( github.com/appleboy/com v0.3.0 github.com/jpillora/backoff v1.0.0 github.com/stretchr/testify v1.10.0 go.uber.org/goleak v1.3.0 - go.uber.org/mock v0.5.1 + go.uber.org/mock v0.5.2 ) require ( diff --git a/go.sum b/go.sum index ed3b6b2..c5a0e76 100644 --- a/go.sum +++ b/go.sum @@ -15,8 +15,8 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -go.uber.org/mock v0.5.1 h1:ASgazW/qBmR+A32MYFDB6E2POoTgOwT509VP0CT/fjs= -go.uber.org/mock v0.5.1/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM= +go.uber.org/mock v0.5.2 h1:LbtPTcP8A5k9WPXj54PPPbjcI4Y6lhyOZXn+VS7wNko= +go.uber.org/mock v0.5.2/go.mod h1:wLlUxC2vVTPTaE3UD51E0BGOAElKrILxhVSDYQLld5o= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= From db4474dc5f29a6ee862c77c4d852bdbf47e00268 Mon Sep 17 00:00:00 2001 From: appleboy Date: Tue, 13 May 2025 22:34:50 +0800 Subject: [PATCH 4/9] ci: update CI workflows for broader OS support and Go versioning - Update golangci-lint GitHub Action to use version v8 instead of v7 - Adjust test matrix to include macos-latest and windows-latest OS options - Remove Go 1.22 from the test matrix, testing only 1.23 and 1.24 - Add platform-specific go-build cache paths for macOS and Windows runners Signed-off-by: appleboy --- .github/workflows/go.yml | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 4bfabb5..ab77d75 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -31,7 +31,7 @@ jobs: go-version-file: "go.mod" check-latest: true - name: Setup golangci-lint - uses: golangci/golangci-lint-action@v7 + uses: golangci/golangci-lint-action@v8 with: version: v2.0 args: --verbose @@ -44,11 +44,15 @@ jobs: test: strategy: matrix: - os: [ubuntu-latest] - go: [1.22, 1.23, 1.24] + os: [ubuntu-latest, macos-latest, windows-latest] + go: [1.23, 1.24] include: - os: ubuntu-latest go-build: ~/.cache/go-build + - os: macos-latest + go-build: ~/Library/Caches/go-build + - os: windows-latest + go-build: C:\Users\runneradmin\AppData\Local\go-build name: ${{ matrix.os }} @ Go ${{ matrix.go }} runs-on: ${{ matrix.os }} env: From 528e07469460c79ad4fa86835f96beaf14a82654 Mon Sep 17 00:00:00 2001 From: appleboy Date: Tue, 13 May 2025 22:35:40 +0800 Subject: [PATCH 5/9] ci: upgrade golangci-lint GitHub Action to v2.1 - Update golangci-lint action to use version v2.1 instead of v2.0 in the GitHub Actions workflow Signed-off-by: appleboy --- .github/workflows/go.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index ab77d75..670bf71 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -33,7 +33,7 @@ jobs: - name: Setup golangci-lint uses: golangci/golangci-lint-action@v8 with: - version: v2.0 + version: v2.1 args: --verbose - name: Bearer From aa195af84384b925023951b1c35585dc05795151 Mon Sep 17 00:00:00 2001 From: appleboy Date: Tue, 13 May 2025 22:36:15 +0800 Subject: [PATCH 6/9] ci: drop Windows support from CI Go workflow - Remove Windows from the test matrix in the GitHub Actions Go workflow Signed-off-by: appleboy --- .github/workflows/go.yml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 670bf71..c40c952 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -44,15 +44,13 @@ jobs: test: strategy: matrix: - os: [ubuntu-latest, macos-latest, windows-latest] + os: [ubuntu-latest, macos-latest] go: [1.23, 1.24] include: - os: ubuntu-latest go-build: ~/.cache/go-build - os: macos-latest go-build: ~/Library/Caches/go-build - - os: windows-latest - go-build: C:\Users\runneradmin\AppData\Local\go-build name: ${{ matrix.os }} @ Go ${{ matrix.go }} runs-on: ${{ matrix.os }} env: From eac57a2d10beda8378ebabd51eecc821c8e01f48 Mon Sep 17 00:00:00 2001 From: appleboy Date: Tue, 13 May 2025 22:51:38 +0800 Subject: [PATCH 7/9] ci: restrict CI testing to Ubuntu platform only - Remove macOS from the CI test matrix, limiting tests to Ubuntu only Signed-off-by: appleboy --- .github/workflows/go.yml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index c40c952..2544bd8 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -44,13 +44,11 @@ jobs: test: strategy: matrix: - os: [ubuntu-latest, macos-latest] + os: [ubuntu-latest] go: [1.23, 1.24] include: - os: ubuntu-latest go-build: ~/.cache/go-build - - os: macos-latest - go-build: ~/Library/Caches/go-build name: ${{ matrix.os }} @ Go ${{ matrix.go }} runs-on: ${{ matrix.os }} env: From 14d8b998f26a728feed7bdd8acdf73d5cbc34ac4 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 4 Jul 2025 20:57:34 +0800 Subject: [PATCH 8/9] chore(deps): bump github.com/appleboy/com from 0.3.0 to 0.4.0 (#149) Bumps [github.com/appleboy/com](https://github.com/appleboy/com) from 0.3.0 to 0.4.0. - [Release notes](https://github.com/appleboy/com/releases) - [Changelog](https://github.com/appleboy/com/blob/master/.goreleaser.yaml) - [Commits](https://github.com/appleboy/com/compare/v0.3.0...v0.4.0) --- updated-dependencies: - dependency-name: github.com/appleboy/com dependency-version: 0.4.0 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 4 ++-- go.sum | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index a8040bc..b366dad 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,9 @@ module github.com/golang-queue/queue -go 1.23 +go 1.23.0 require ( - github.com/appleboy/com v0.3.0 + github.com/appleboy/com v0.4.0 github.com/jpillora/backoff v1.0.0 github.com/stretchr/testify v1.10.0 go.uber.org/goleak v1.3.0 diff --git a/go.sum b/go.sum index c5a0e76..bb04007 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/appleboy/com v0.3.0 h1:omze/tJPyi2YVH+m23GSrCGt90A+4vQNpEYBW+GuSr4= -github.com/appleboy/com v0.3.0/go.mod h1:kByEI3/vzI5GM1+O5QdBHLsXaOsmFsJcOpCSgASi4sg= +github.com/appleboy/com v0.4.0 h1:z6Yx16K5nqnUrT5N40elwvcFFuCbIwJn80FeYXS1H4I= +github.com/appleboy/com v0.4.0/go.mod h1:IbC1mLvqcIYn2YVNJgAYB9XnhbUh1xYKsOzdEOy0n+c= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= From bf195732db42186bf48ed9f0ce6b75705ae73908 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Tue, 5 Aug 2025 19:51:40 +0800 Subject: [PATCH 9/9] docs: document contributor workflows and queue library architecture - Add a CLAUDE.md file with development, testing, and build instructions for contributors using Claude Code - Document the core architecture, main abstractions, and key design patterns of the queue library - List supported worker implementations and entry points for queue usage - Include notes on code quality tools, security scanning, and testing practices Signed-off-by: Bo-Yi Wu --- CLAUDE.md | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..d9555da --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,65 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Development Commands + +### Testing + +- `go test -v` - Run all tests with verbose output +- `go test -race -v -covermode=atomic -coverprofile=coverage.out` - Run tests with race detection and coverage +- `go test -v -run=^$ -count 5 -benchmem -bench . ./...` - Run benchmarks with memory stats + +### Building + +- `go build` - Build the package +- `go mod download` - Download dependencies +- `go mod tidy` - Clean up go.mod and go.sum + +### Code Quality + +- Uses golangci-lint for linting (configured in CI) +- Bearer security scanning is enabled (config in bearer.yml) + +## Architecture Overview + +### Core Components + +The queue library is built around these key abstractions: + +1. **Queue** (`queue.go:28-42`) - Main queue implementation managing workers, job scheduling, retries, and graceful shutdown +2. **Worker Interface** (`core/worker.go:9-25`) - Abstraction for task processors with Run, Shutdown, Queue, and Request methods +3. **Ring Buffer** (`ring.go:14-26`) - Default in-memory worker implementation using circular buffer +4. **Job Messages** (`job/job.go:14-50`) - Task wrapper with retry logic, timeouts, and payload handling + +### Key Design Patterns + +- **Worker Pool**: Queue manages multiple goroutines processing tasks concurrently +- **Pluggable Workers**: Core Worker interface allows different backends (NSQ, NATS, Redis, etc.) +- **Graceful Shutdown**: Atomic flags and sync.Once ensure clean shutdown +- **Retry Logic**: Built-in exponential backoff with jitter for failed tasks +- **Metrics**: Built-in tracking of submitted, completed, success, and failure counts + +### Main Entry Points + +- `NewPool(size, opts...)` - Creates queue with Ring buffer worker, auto-starts +- `NewQueue(opts...)` - Creates queue with custom worker, requires manual Start() +- `Queue(message)` - Enqueue a QueuedMessage +- `QueueTask(func)` - Enqueue a function directly + +### Worker Implementations + +The library supports multiple queue backends through the Worker interface: + +- **Ring** (default) - In-memory circular buffer +- **NSQ** - Distributed messaging (external package) +- **NATS** - Cloud-native messaging (external package) +- **Redis** - Pub/Sub or Streams (external package) +- **RabbitMQ** - Message broker (external package) + +## Testing Notes + +- Uses testify for assertions (`github.com/stretchr/testify`) +- Includes benchmark tests in `benchmark_test.go` and `job/benchmark_test.go` +- Uses go.uber.org/goleak to detect goroutine leaks +- Mock interfaces generated with go.uber.org/mock in `mocks/` directory