From 4d110b7e923d1622d8df7a4188b7b8f0c36b7d21 Mon Sep 17 00:00:00 2001 From: kensinchen Date: Sat, 11 Sep 2021 14:51:50 +0800 Subject: [PATCH 1/2] client cleaner --- client.go | 225 ++++++++++++++++++++++++++++------------- client_cleaner_test.go | 65 ++++++++++++ client_test.go | 2 +- 3 files changed, 219 insertions(+), 73 deletions(-) create mode 100644 client_cleaner_test.go diff --git a/client.go b/client.go index 832513fe25..64cc4a58c0 100644 --- a/client.go +++ b/client.go @@ -518,7 +518,8 @@ func (c *Client) Do(req *Request, resp *Response) error { c.mLock.Unlock() if startCleaner { - go c.mCleaner(m) + // Register client to cleaner + cCleaner.register(c) } return hc.Do(req, resp) @@ -539,37 +540,36 @@ func (c *Client) CloseIdleConnections() { c.mLock.Unlock() } -func (c *Client) mCleaner(m map[string]*HostClient) { - mustStop := false - - sleep := c.MaxIdleConnDuration - if sleep < time.Second { - sleep = time.Second - } else if sleep > 10*time.Second { - sleep = 10 * time.Second +// Clean HostClient in a map when the HostClient has no more connections. +func (c *Client) cleanHostClients(m map[string]*HostClient) { + c.mLock.Lock() + for k, v := range m { + // Lock HostClient, and delete it from map m where it has no connections. + v.connsLock.Lock() + fmt.Printf("Client check host client connsCount:%d, %p\n", v.connsCount, v) + if v.connsCount == 0 { + delete(m, k) + } + v.connsLock.Unlock() } + c.mLock.Unlock() +} - for { - c.mLock.Lock() - for k, v := range m { - v.connsLock.Lock() - shouldRemove := v.connsCount == 0 - v.connsLock.Unlock() - - if shouldRemove { - delete(m, k) - } - } - if len(m) == 0 { - mustStop = true - } - c.mLock.Unlock() +// Clean HostClient for Client. +func (c *Client) cleanResource() { + c.cleanHostClients(c.m) + c.cleanHostClients(c.ms) +} - if mustStop { - break - } - time.Sleep(sleep) +// Whether a Client has any HostClient. +func (c *Client) hasResource() bool { + c.mLock.Lock() + defer c.mLock.Unlock() + if len(c.m) > 0 || len(c.ms) > 0 { + fmt.Printf("client has resource, c.m:%d, c.ms:%d\n", len(c.m), len(c.ms)) + return true } + return false } // DefaultMaxConnsPerHost is the maximum number of concurrent connections @@ -1511,6 +1511,8 @@ func (c *HostClient) acquireConn(reqTimeout time.Duration, connectionClose bool) createConn := false startCleaner := false + fmt.Printf("accquireConn HostClient:%p\n", c) + var n int c.connsLock.Lock() n = len(c.conns) @@ -1522,9 +1524,9 @@ func (c *HostClient) acquireConn(reqTimeout time.Duration, connectionClose bool) if c.connsCount < maxConns { c.connsCount++ createConn = true - if !c.connsCleanerRun && !connectionClose { + // first time to create connection, need to start cleaner + if c.connsCount == 1 { startCleaner = true - c.connsCleanerRun = true } } } else { @@ -1536,6 +1538,7 @@ func (c *HostClient) acquireConn(reqTimeout time.Duration, connectionClose bool) c.connsLock.Unlock() if cc != nil { + fmt.Printf("accquireConn use exist connection, HostClient:%p\n", c) return cc, nil } if !createConn { @@ -1583,7 +1586,8 @@ func (c *HostClient) acquireConn(reqTimeout time.Duration, connectionClose bool) } if startCleaner { - go c.connsCleaner() + // Register to cleaner. + cCleaner.register(c) } conn, err := c.dialHostHard() @@ -1648,54 +1652,51 @@ func (c *HostClient) connsCleaner() { if maxIdleConnDuration <= 0 { maxIdleConnDuration = DefaultMaxIdleConnDuration } - for { - currentTime := time.Now() - // Determine idle connections to be closed. - c.connsLock.Lock() - conns := c.conns - n := len(conns) - i := 0 - for i < n && currentTime.Sub(conns[i].lastUseTime) > maxIdleConnDuration { - i++ - } - sleepFor := maxIdleConnDuration - if i < n { - // + 1 so we actually sleep past the expiration time and not up to it. - // Otherwise the > check above would still fail. - sleepFor = maxIdleConnDuration - currentTime.Sub(conns[i].lastUseTime) + 1 - } - scratch = append(scratch[:0], conns[:i]...) - if i > 0 { - m := copy(conns, conns[i:]) - for i = m; i < n; i++ { - conns[i] = nil - } - c.conns = conns[:m] - } - c.connsLock.Unlock() - - // Close idle connections. - for i, cc := range scratch { - c.closeConn(cc) - scratch[i] = nil - } + currentTime := time.Now() - // Determine whether to stop the connsCleaner. - c.connsLock.Lock() - mustStop := c.connsCount == 0 - if mustStop { - c.connsCleanerRun = false - } + // Determine idle connections to be closed. + c.connsLock.Lock() + conns := c.conns + n := len(conns) + i := 0 + for i < n && currentTime.Sub(conns[i].lastUseTime) > maxIdleConnDuration { + i++ + } + // If no connections need to be closed, unlock and finish. + if i <= 0 { c.connsLock.Unlock() - if mustStop { - break - } + return + } - time.Sleep(sleepFor) + // Connections to be closed. + scratch = append(scratch[:0], conns[:i]...) + m := copy(conns, conns[i:]) + for i = m; i < n; i++ { + conns[i] = nil + } + // Connections to be used. + c.conns = conns[:m] + c.connsLock.Unlock() + + // Close idle connections. + for i, cc := range scratch { + c.closeConn(cc) + scratch[i] = nil } } +// clean connection resources for HostClient. +func (c *HostClient) cleanResource() { + c.connsCleaner() +} + +func (c *HostClient) hasResource() bool { + cnt := c.ConnsCount() + fmt.Printf("HostClient has resource, cnt:%d\n", cnt) + return cnt > 0 +} + func (c *HostClient) closeConn(cc *clientConn) { c.decConnsCount() cc.c.Close() @@ -2907,3 +2908,83 @@ func releasePipelineWork(pool *sync.Pool, w *pipelineWork) { w.err = nil pool.Put(w) } + +// Resource Clean interface. +// resourceClean can register into clientCleaner for cleaning resources. +type resourceClean interface { + cleanResource() + hasResource() bool +} + +// Cleaner to clean resources. +// Client, HostClient register into cleaner. The Cleaner will check and delete client +// where it has no resources. +type clientCleaner struct { + initOnce sync.Once + // clients to clean + clients sync.Map +} + +// Global cleaner. +var cCleaner = &clientCleaner{} + +// Initialize cleaner. +func (c *clientCleaner) init() { + c.initOnce.Do(func() { + go c.cleaner() + }) +} + +// Clean clients +func (c *clientCleaner) cleaner() { + for { + c.cleanClient() + time.Sleep(10 * time.Second) + } +} + +// Register Client to cleaner. +func (c *clientCleaner) register(client resourceClean) { + if client == nil { + return + } + // Check if init + c.init() + + fmt.Printf("register client:%p\n", client) + + // Store client + c.clients.Store(client, struct{}{}) +} + +// Clean Client when the client has no resource. +func (c *clientCleaner) cleanClient() { + // Find all clients + allClients := make([]resourceClean, 0) + c.clients.Range(func (key, value interface{}) bool { + client := key.(resourceClean) + allClients = append(allClients, client) + return true + }) + + fmt.Printf("number of clients:%d\n", len(allClients)) + + // Clean connections for each client. + for _, client := range allClients { + //Clean resource. + fmt.Printf("check client:%p\n", client) + client.cleanResource() + // If a client has no resource, delete it from c.clients. + // But client may create resource before deleted, + // so check again. If it has any resource, store it into c.clients. + if !client.hasResource() { + fmt.Printf("client has no resource:%p\n", client) + c.clients.Delete(client) + if client.hasResource() { + c.clients.Store(client, struct{}{}) + } else { + fmt.Printf("delete client:%p\n", client) + } + } + } +} diff --git a/client_cleaner_test.go b/client_cleaner_test.go new file mode 100644 index 0000000000..b4f5d11f5d --- /dev/null +++ b/client_cleaner_test.go @@ -0,0 +1,65 @@ +package fasthttp + +import ( + "testing" +) + +type cleanItem struct { + used int + free int +} + +func (ci *cleanItem) cleanResource() { + ci.free = 0 +} + +func (ci *cleanItem) hasResource() bool { + total := ci.used + ci.free + return total > 0 +} + +func TestClientCleaner(t *testing.T) { + c1 := &cleanItem{used: 1, free: 0} + c2 := &cleanItem{used: 2, free: 0} + + exists := func(ci *cleanItem) bool { + var item resourceClean = ci + _, ok := cCleaner.clients.Load(item) + return ok + } + + // test register + cCleaner.register(c1) + cCleaner.register(c2) + if !exists(c1) { + t.Errorf("clientCleaner error, item register but not exist.") + } + if !exists(c2) { + t.Errorf("clientCleaner error, item register but not exist.") + } + + // test clean + c1.used = 0 + cCleaner.cleanClient() + if exists(c1) { + t.Errorf("clientCleaner error, item has no resource but not deleted") + } + + // test duplicate register + c1.used, c1.free = 0, 3 + cCleaner.register(c1) + cCleaner.register(c1) + if !exists(c1) { + t.Errorf("clientCleaner error, item register but not exist.") + } + + // test clean and delete + cCleaner.cleanClient() + if exists(c1) { + t.Errorf("clientCleaner error, item has no resource but not deleted") + } + + if !exists(c2) { + t.Errorf("clientCleaner error, item register but not exist.") + } +} \ No newline at end of file diff --git a/client_test.go b/client_test.go index 08f52a694d..9c82c826ec 100644 --- a/client_test.go +++ b/client_test.go @@ -2815,4 +2815,4 @@ func TestHostClientMaxConnWaitTimeoutWithEarlierDeadline(t *testing.T) { if emptyBodyCount > 0 { t.Fatalf("at least one request body was empty") } -} +} \ No newline at end of file From c819d729b891915fdb0dd57d1c0bbfb615df763c Mon Sep 17 00:00:00 2001 From: kensinchen Date: Sat, 11 Sep 2021 15:52:51 +0800 Subject: [PATCH 2/2] clear log --- client.go | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/client.go b/client.go index 3aad3409f8..3bfbb3462f 100644 --- a/client.go +++ b/client.go @@ -546,7 +546,6 @@ func (c *Client) cleanHostClients(m map[string]*HostClient) { for k, v := range m { // Lock HostClient, and delete it from map m where it has no connections. v.connsLock.Lock() - fmt.Printf("Client check host client connsCount:%d, %p\n", v.connsCount, v) if v.connsCount == 0 { delete(m, k) } @@ -566,7 +565,6 @@ func (c *Client) hasResource() bool { c.mLock.Lock() defer c.mLock.Unlock() if len(c.m) > 0 || len(c.ms) > 0 { - fmt.Printf("client has resource, c.m:%d, c.ms:%d\n", len(c.m), len(c.ms)) return true } return false @@ -1511,8 +1509,6 @@ func (c *HostClient) acquireConn(reqTimeout time.Duration, connectionClose bool) createConn := false startCleaner := false - fmt.Printf("accquireConn HostClient:%p\n", c) - var n int c.connsLock.Lock() n = len(c.conns) @@ -1538,7 +1534,6 @@ func (c *HostClient) acquireConn(reqTimeout time.Duration, connectionClose bool) c.connsLock.Unlock() if cc != nil { - fmt.Printf("accquireConn use exist connection, HostClient:%p\n", c) return cc, nil } if !createConn { @@ -1693,7 +1688,6 @@ func (c *HostClient) cleanResource() { func (c *HostClient) hasResource() bool { cnt := c.ConnsCount() - fmt.Printf("HostClient has resource, cnt:%d\n", cnt) return cnt > 0 } @@ -2951,8 +2945,6 @@ func (c *clientCleaner) register(client resourceClean) { // Check if init c.init() - fmt.Printf("register client:%p\n", client) - // Store client c.clients.Store(client, struct{}{}) } @@ -2967,23 +2959,17 @@ func (c *clientCleaner) cleanClient() { return true }) - fmt.Printf("number of clients:%d\n", len(allClients)) - // Clean connections for each client. for _, client := range allClients { //Clean resource. - fmt.Printf("check client:%p\n", client) client.cleanResource() // If a client has no resource, delete it from c.clients. // But client may create resource before deleted, // so check again. If it has any resource, store it into c.clients. if !client.hasResource() { - fmt.Printf("client has no resource:%p\n", client) c.clients.Delete(client) if client.hasResource() { c.clients.Store(client, struct{}{}) - } else { - fmt.Printf("delete client:%p\n", client) } } }