mirror of
https://github.com/go-i2p/go-meta-listener.git
synced 2025-09-05 02:47:46 -04:00
Compare commits
2 Commits
837850ac99
...
979c70a3d3
Author | SHA1 | Date | |
---|---|---|---|
![]() |
979c70a3d3 | ||
![]() |
0363467080 |
@@ -42,9 +42,12 @@ func (ml *MetaListener) handleListener(id string, listener net.Listener) {
|
||||
}
|
||||
|
||||
log.Printf("Permanent error in %s listener: %v, stopping", id, err)
|
||||
ml.mu.Lock()
|
||||
delete(ml.listeners, id)
|
||||
ml.mu.Unlock()
|
||||
select {
|
||||
case ml.removeListenerCh <- id:
|
||||
// Successfully signaled for removal
|
||||
case <-ml.closeCh:
|
||||
// MetaListener is closing, no need to signal removal
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
19
listener.go
19
listener.go
@@ -3,18 +3,16 @@ package meta
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// Accept implements the net.Listener Accept method.
|
||||
// It returns the next connection from any of the managed listeners.
|
||||
func (ml *MetaListener) Accept() (net.Conn, error) {
|
||||
// Check if already closed before entering the select loop
|
||||
ml.mu.RLock()
|
||||
if ml.isClosed {
|
||||
ml.mu.RUnlock()
|
||||
if atomic.LoadInt64(&ml.isClosed) != 0 {
|
||||
return nil, ErrListenerClosed
|
||||
}
|
||||
ml.mu.RUnlock()
|
||||
|
||||
for {
|
||||
select {
|
||||
@@ -25,9 +23,8 @@ func (ml *MetaListener) Accept() (net.Conn, error) {
|
||||
// Access RemoteAddr() directly on the connection
|
||||
return result, nil
|
||||
case <-ml.closeCh:
|
||||
// Double-check the closed state under lock to ensure consistency
|
||||
closed := ml.isClosed
|
||||
if closed {
|
||||
// Double-check the closed state using atomic operation
|
||||
if atomic.LoadInt64(&ml.isClosed) != 0 {
|
||||
return nil, ErrListenerClosed
|
||||
}
|
||||
continue
|
||||
@@ -38,15 +35,13 @@ func (ml *MetaListener) Accept() (net.Conn, error) {
|
||||
// Close implements the net.Listener Close method.
|
||||
// It closes all managed listeners and releases resources.
|
||||
func (ml *MetaListener) Close() error {
|
||||
ml.mu.Lock()
|
||||
|
||||
if ml.isClosed {
|
||||
ml.mu.Unlock()
|
||||
// Use atomic compare-and-swap to ensure we only close once
|
||||
if !atomic.CompareAndSwapInt64(&ml.isClosed, 0, 1) {
|
||||
return nil
|
||||
}
|
||||
|
||||
ml.mu.Lock()
|
||||
log.Printf("Closing MetaListener with %d listeners", len(ml.listeners))
|
||||
ml.isClosed = true
|
||||
|
||||
// Signal all goroutines to stop
|
||||
close(ml.closeCh)
|
||||
|
5
log
Normal file
5
log
Normal file
@@ -0,0 +1,5 @@
|
||||
? github.com/go-i2p/go-meta-listener [no test files]
|
||||
? github.com/go-i2p/go-meta-listener/example [no test files]
|
||||
? github.com/go-i2p/go-meta-listener/mirror [no test files]
|
||||
? github.com/go-i2p/go-meta-listener/mirror/metaproxy [no test files]
|
||||
ok github.com/go-i2p/go-meta-listener/tcp 0.079s
|
@@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/samber/oops"
|
||||
)
|
||||
@@ -28,8 +29,10 @@ type MetaListener struct {
|
||||
connCh chan ConnResult
|
||||
// closeCh signals all goroutines to stop
|
||||
closeCh chan struct{}
|
||||
// isClosed indicates whether the meta listener has been closed
|
||||
isClosed bool
|
||||
// removeListenerCh is used to signal listener removal from handlers
|
||||
removeListenerCh chan string
|
||||
// isClosed indicates whether the meta listener has been closed (atomic)
|
||||
isClosed int64
|
||||
// mu protects concurrent access to the listener's state
|
||||
mu sync.RWMutex
|
||||
}
|
||||
@@ -42,11 +45,17 @@ type ConnResult struct {
|
||||
|
||||
// NewMetaListener creates a new MetaListener instance ready to manage multiple listeners.
|
||||
func NewMetaListener() *MetaListener {
|
||||
return &MetaListener{
|
||||
listeners: make(map[string]net.Listener),
|
||||
connCh: make(chan ConnResult, 100), // Larger buffer for high connection volume
|
||||
closeCh: make(chan struct{}),
|
||||
ml := &MetaListener{
|
||||
listeners: make(map[string]net.Listener),
|
||||
connCh: make(chan ConnResult, 100), // Larger buffer for high connection volume
|
||||
closeCh: make(chan struct{}),
|
||||
removeListenerCh: make(chan string, 10), // Buffer for listener removal signals
|
||||
}
|
||||
|
||||
// Start the listener management goroutine
|
||||
go ml.manageListeners()
|
||||
|
||||
return ml
|
||||
}
|
||||
|
||||
// AddListener adds a new listener with the specified ID.
|
||||
@@ -60,7 +69,7 @@ func (ml *MetaListener) AddListener(id string, listener net.Listener) error {
|
||||
ml.mu.Lock()
|
||||
defer ml.mu.Unlock()
|
||||
|
||||
if ml.isClosed {
|
||||
if atomic.LoadInt64(&ml.isClosed) != 0 {
|
||||
return ErrListenerClosed
|
||||
}
|
||||
|
||||
@@ -133,3 +142,21 @@ func (ml *MetaListener) WaitForShutdown(ctx context.Context) error {
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// manageListeners handles listener removal signals from handler goroutines
|
||||
func (ml *MetaListener) manageListeners() {
|
||||
for {
|
||||
select {
|
||||
case <-ml.closeCh:
|
||||
return
|
||||
case id := <-ml.removeListenerCh:
|
||||
ml.mu.Lock()
|
||||
if listener, exists := ml.listeners[id]; exists {
|
||||
listener.Close()
|
||||
delete(ml.listeners, id)
|
||||
log.Printf("Listener %s removed due to permanent error", id)
|
||||
}
|
||||
ml.mu.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
185
metalistener_test.go
Normal file
185
metalistener_test.go
Normal file
@@ -0,0 +1,185 @@
|
||||
package meta
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// mockListener is a test listener that can simulate errors
|
||||
type mockListener struct {
|
||||
addr net.Addr
|
||||
connCh chan net.Conn
|
||||
closeCh chan struct{}
|
||||
closed bool
|
||||
mu sync.Mutex
|
||||
errorMode bool
|
||||
}
|
||||
|
||||
func newMockListener(addr string) *mockListener {
|
||||
return &mockListener{
|
||||
addr: &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8080},
|
||||
connCh: make(chan net.Conn, 1),
|
||||
closeCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockListener) Accept() (net.Conn, error) {
|
||||
m.mu.Lock()
|
||||
errorMode := m.errorMode
|
||||
m.mu.Unlock()
|
||||
|
||||
if errorMode {
|
||||
return nil, fmt.Errorf("permanent error")
|
||||
}
|
||||
|
||||
select {
|
||||
case conn := <-m.connCh:
|
||||
return conn, nil
|
||||
case <-m.closeCh:
|
||||
return nil, fmt.Errorf("listener closed")
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockListener) Close() error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
if m.closed {
|
||||
return nil
|
||||
}
|
||||
|
||||
m.closed = true
|
||||
close(m.closeCh)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockListener) Addr() net.Addr {
|
||||
return m.addr
|
||||
}
|
||||
|
||||
func (m *mockListener) setErrorMode(errorMode bool) {
|
||||
m.mu.Lock()
|
||||
m.errorMode = errorMode
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
// TestListenerRemovalRace tests that listener removal doesn't cause race conditions
|
||||
func TestListenerRemovalRace(t *testing.T) {
|
||||
ml := NewMetaListener()
|
||||
defer ml.Close()
|
||||
|
||||
// Add multiple listeners
|
||||
listener1 := newMockListener("127.0.0.1:8080")
|
||||
listener2 := newMockListener("127.0.0.1:8081")
|
||||
|
||||
err := ml.AddListener("test1", listener1)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to add listener1: %v", err)
|
||||
}
|
||||
|
||||
err = ml.AddListener("test2", listener2)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to add listener2: %v", err)
|
||||
}
|
||||
|
||||
// Verify both listeners are present
|
||||
if ml.Count() != 2 {
|
||||
t.Errorf("Expected 2 listeners, got %d", ml.Count())
|
||||
}
|
||||
|
||||
// Simulate permanent error in listener1
|
||||
listener1.setErrorMode(true)
|
||||
|
||||
// Wait for listener to be removed due to error
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Verify listener1 was removed
|
||||
if ml.Count() != 1 {
|
||||
t.Errorf("Expected 1 listener after error, got %d", ml.Count())
|
||||
}
|
||||
|
||||
// Verify we can still use RemoveListener on the remaining listener
|
||||
err = ml.RemoveListener("test2")
|
||||
if err != nil {
|
||||
t.Errorf("Failed to remove listener2: %v", err)
|
||||
}
|
||||
|
||||
if ml.Count() != 0 {
|
||||
t.Errorf("Expected 0 listeners after removal, got %d", ml.Count())
|
||||
}
|
||||
}
|
||||
|
||||
// TestConcurrentListenerAccess tests concurrent access to listener map
|
||||
func TestConcurrentListenerAccess(t *testing.T) {
|
||||
ml := NewMetaListener()
|
||||
defer ml.Close()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Add multiple listeners concurrently
|
||||
for i := 0; i < 10; i++ {
|
||||
wg.Add(1)
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
listener := newMockListener(fmt.Sprintf("127.0.0.1:%d", 8080+id))
|
||||
err := ml.AddListener(fmt.Sprintf("test%d", id), listener)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to add listener%d: %v", id, err)
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
// Concurrently check listener count
|
||||
for i := 0; i < 5; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
ml.Count()
|
||||
ml.ListenerIDs()
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
if ml.Count() != 10 {
|
||||
t.Errorf("Expected 10 listeners, got %d", ml.Count())
|
||||
}
|
||||
}
|
||||
|
||||
// TestAcceptRaceCondition tests that Accept() method doesn't have race conditions
|
||||
func TestAcceptRaceCondition(t *testing.T) {
|
||||
ml := NewMetaListener()
|
||||
|
||||
// Add a listener
|
||||
listener := newMockListener("127.0.0.1:8080")
|
||||
err := ml.AddListener("test", listener)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to add listener: %v", err)
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Start multiple goroutines calling Accept()
|
||||
for i := 0; i < 10; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_, err := ml.Accept()
|
||||
// We expect either a valid connection or ErrListenerClosed
|
||||
if err != nil && err.Error() != ErrListenerClosed.Error() {
|
||||
t.Errorf("Unexpected error from Accept(): %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Concurrently close the listener
|
||||
go func() {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
ml.Close()
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
}
|
Reference in New Issue
Block a user