Compare commits

2 Commits

Author SHA1 Message Date
eyedeekay
979c70a3d3 Fix race condition in Accept method using atomic operations 2025-07-17 21:44:57 -04:00
eyedeekay
0363467080 Fix data race in listener removal using channel-based coordination 2025-07-17 21:32:06 -04:00
6 changed files with 237 additions and 22 deletions

View File

@@ -42,9 +42,12 @@ func (ml *MetaListener) handleListener(id string, listener net.Listener) {
}
log.Printf("Permanent error in %s listener: %v, stopping", id, err)
ml.mu.Lock()
delete(ml.listeners, id)
ml.mu.Unlock()
select {
case ml.removeListenerCh <- id:
// Successfully signaled for removal
case <-ml.closeCh:
// MetaListener is closing, no need to signal removal
}
return
}

View File

@@ -3,18 +3,16 @@ package meta
import (
"fmt"
"net"
"sync/atomic"
)
// Accept implements the net.Listener Accept method.
// It returns the next connection from any of the managed listeners.
func (ml *MetaListener) Accept() (net.Conn, error) {
// Check if already closed before entering the select loop
ml.mu.RLock()
if ml.isClosed {
ml.mu.RUnlock()
if atomic.LoadInt64(&ml.isClosed) != 0 {
return nil, ErrListenerClosed
}
ml.mu.RUnlock()
for {
select {
@@ -25,9 +23,8 @@ func (ml *MetaListener) Accept() (net.Conn, error) {
// Access RemoteAddr() directly on the connection
return result, nil
case <-ml.closeCh:
// Double-check the closed state under lock to ensure consistency
closed := ml.isClosed
if closed {
// Double-check the closed state using atomic operation
if atomic.LoadInt64(&ml.isClosed) != 0 {
return nil, ErrListenerClosed
}
continue
@@ -38,15 +35,13 @@ func (ml *MetaListener) Accept() (net.Conn, error) {
// Close implements the net.Listener Close method.
// It closes all managed listeners and releases resources.
func (ml *MetaListener) Close() error {
ml.mu.Lock()
if ml.isClosed {
ml.mu.Unlock()
// Use atomic compare-and-swap to ensure we only close once
if !atomic.CompareAndSwapInt64(&ml.isClosed, 0, 1) {
return nil
}
ml.mu.Lock()
log.Printf("Closing MetaListener with %d listeners", len(ml.listeners))
ml.isClosed = true
// Signal all goroutines to stop
close(ml.closeCh)

5
log Normal file
View File

@@ -0,0 +1,5 @@
? github.com/go-i2p/go-meta-listener [no test files]
? github.com/go-i2p/go-meta-listener/example [no test files]
? github.com/go-i2p/go-meta-listener/mirror [no test files]
? github.com/go-i2p/go-meta-listener/mirror/metaproxy [no test files]
ok github.com/go-i2p/go-meta-listener/tcp 0.079s

View File

@@ -6,6 +6,7 @@ import (
"fmt"
"net"
"sync"
"sync/atomic"
"github.com/samber/oops"
)
@@ -28,8 +29,10 @@ type MetaListener struct {
connCh chan ConnResult
// closeCh signals all goroutines to stop
closeCh chan struct{}
// isClosed indicates whether the meta listener has been closed
isClosed bool
// removeListenerCh is used to signal listener removal from handlers
removeListenerCh chan string
// isClosed indicates whether the meta listener has been closed (atomic)
isClosed int64
// mu protects concurrent access to the listener's state
mu sync.RWMutex
}
@@ -42,11 +45,17 @@ type ConnResult struct {
// NewMetaListener creates a new MetaListener instance ready to manage multiple listeners.
func NewMetaListener() *MetaListener {
return &MetaListener{
listeners: make(map[string]net.Listener),
connCh: make(chan ConnResult, 100), // Larger buffer for high connection volume
closeCh: make(chan struct{}),
ml := &MetaListener{
listeners: make(map[string]net.Listener),
connCh: make(chan ConnResult, 100), // Larger buffer for high connection volume
closeCh: make(chan struct{}),
removeListenerCh: make(chan string, 10), // Buffer for listener removal signals
}
// Start the listener management goroutine
go ml.manageListeners()
return ml
}
// AddListener adds a new listener with the specified ID.
@@ -60,7 +69,7 @@ func (ml *MetaListener) AddListener(id string, listener net.Listener) error {
ml.mu.Lock()
defer ml.mu.Unlock()
if ml.isClosed {
if atomic.LoadInt64(&ml.isClosed) != 0 {
return ErrListenerClosed
}
@@ -133,3 +142,21 @@ func (ml *MetaListener) WaitForShutdown(ctx context.Context) error {
return ctx.Err()
}
}
// manageListeners handles listener removal signals from handler goroutines
func (ml *MetaListener) manageListeners() {
for {
select {
case <-ml.closeCh:
return
case id := <-ml.removeListenerCh:
ml.mu.Lock()
if listener, exists := ml.listeners[id]; exists {
listener.Close()
delete(ml.listeners, id)
log.Printf("Listener %s removed due to permanent error", id)
}
ml.mu.Unlock()
}
}
}

185
metalistener_test.go Normal file
View File

@@ -0,0 +1,185 @@
package meta
import (
"fmt"
"net"
"sync"
"testing"
"time"
)
// mockListener is a test listener that can simulate errors
type mockListener struct {
addr net.Addr
connCh chan net.Conn
closeCh chan struct{}
closed bool
mu sync.Mutex
errorMode bool
}
func newMockListener(addr string) *mockListener {
return &mockListener{
addr: &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8080},
connCh: make(chan net.Conn, 1),
closeCh: make(chan struct{}),
}
}
func (m *mockListener) Accept() (net.Conn, error) {
m.mu.Lock()
errorMode := m.errorMode
m.mu.Unlock()
if errorMode {
return nil, fmt.Errorf("permanent error")
}
select {
case conn := <-m.connCh:
return conn, nil
case <-m.closeCh:
return nil, fmt.Errorf("listener closed")
}
}
func (m *mockListener) Close() error {
m.mu.Lock()
defer m.mu.Unlock()
if m.closed {
return nil
}
m.closed = true
close(m.closeCh)
return nil
}
func (m *mockListener) Addr() net.Addr {
return m.addr
}
func (m *mockListener) setErrorMode(errorMode bool) {
m.mu.Lock()
m.errorMode = errorMode
m.mu.Unlock()
}
// TestListenerRemovalRace tests that listener removal doesn't cause race conditions
func TestListenerRemovalRace(t *testing.T) {
ml := NewMetaListener()
defer ml.Close()
// Add multiple listeners
listener1 := newMockListener("127.0.0.1:8080")
listener2 := newMockListener("127.0.0.1:8081")
err := ml.AddListener("test1", listener1)
if err != nil {
t.Fatalf("Failed to add listener1: %v", err)
}
err = ml.AddListener("test2", listener2)
if err != nil {
t.Fatalf("Failed to add listener2: %v", err)
}
// Verify both listeners are present
if ml.Count() != 2 {
t.Errorf("Expected 2 listeners, got %d", ml.Count())
}
// Simulate permanent error in listener1
listener1.setErrorMode(true)
// Wait for listener to be removed due to error
time.Sleep(100 * time.Millisecond)
// Verify listener1 was removed
if ml.Count() != 1 {
t.Errorf("Expected 1 listener after error, got %d", ml.Count())
}
// Verify we can still use RemoveListener on the remaining listener
err = ml.RemoveListener("test2")
if err != nil {
t.Errorf("Failed to remove listener2: %v", err)
}
if ml.Count() != 0 {
t.Errorf("Expected 0 listeners after removal, got %d", ml.Count())
}
}
// TestConcurrentListenerAccess tests concurrent access to listener map
func TestConcurrentListenerAccess(t *testing.T) {
ml := NewMetaListener()
defer ml.Close()
var wg sync.WaitGroup
// Add multiple listeners concurrently
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
listener := newMockListener(fmt.Sprintf("127.0.0.1:%d", 8080+id))
err := ml.AddListener(fmt.Sprintf("test%d", id), listener)
if err != nil {
t.Errorf("Failed to add listener%d: %v", id, err)
}
}(i)
}
// Concurrently check listener count
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
ml.Count()
ml.ListenerIDs()
}()
}
wg.Wait()
if ml.Count() != 10 {
t.Errorf("Expected 10 listeners, got %d", ml.Count())
}
}
// TestAcceptRaceCondition tests that Accept() method doesn't have race conditions
func TestAcceptRaceCondition(t *testing.T) {
ml := NewMetaListener()
// Add a listener
listener := newMockListener("127.0.0.1:8080")
err := ml.AddListener("test", listener)
if err != nil {
t.Fatalf("Failed to add listener: %v", err)
}
var wg sync.WaitGroup
// Start multiple goroutines calling Accept()
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_, err := ml.Accept()
// We expect either a valid connection or ErrListenerClosed
if err != nil && err.Error() != ErrListenerClosed.Error() {
t.Errorf("Unexpected error from Accept(): %v", err)
}
}()
}
// Concurrently close the listener
go func() {
time.Sleep(10 * time.Millisecond)
ml.Close()
}()
wg.Wait()
}

BIN
mp Executable file

Binary file not shown.