From da5809449d36dac3fb62d7070852824369f54370 Mon Sep 17 00:00:00 2001 From: Poorna Krishnamoorthy Date: Fri, 26 May 2023 14:16:26 -0700 Subject: [PATCH] replication: change metrics API for additional info to be passed back. --- api-bucket-replication.go | 36 +++++++++++++ pkg/replication/replication.go | 93 +++++++++++++++++++++++++++++++++- 2 files changed, 128 insertions(+), 1 deletion(-) diff --git a/api-bucket-replication.go b/api-bucket-replication.go index d5895dfe0..722dbac7b 100644 --- a/api-bucket-replication.go +++ b/api-bucket-replication.go @@ -289,3 +289,39 @@ func (c *Client) GetBucketReplicationResyncStatus(ctx context.Context, bucketNam } return rinfo, nil } + +// GetBucketReplicationMetricsV2 fetches bucket replication status metrics +func (c *Client) GetBucketReplicationMetricsV2(ctx context.Context, bucketName string) (s replication.MetricsV2, err error) { + // Input validation. + if err := s3utils.CheckValidBucketName(bucketName); err != nil { + return s, err + } + // Get resources properly escaped and lined up before + // using them in http request. + urlValues := make(url.Values) + urlValues.Set("replication-metrics", "v2") + + // Execute GET on bucket to get replication config. + resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{ + bucketName: bucketName, + queryValues: urlValues, + }) + + defer closeResponse(resp) + if err != nil { + return s, err + } + + if resp.StatusCode != http.StatusOK { + return s, httpRespToErrorResponse(resp, bucketName, "") + } + respBytes, err := io.ReadAll(resp.Body) + if err != nil { + return s, err + } + + if err := json.Unmarshal(respBytes, &s); err != nil { + return s, err + } + return s, nil +} diff --git a/pkg/replication/replication.go b/pkg/replication/replication.go index 645fe18cb..054602205 100644 --- a/pkg/replication/replication.go +++ b/pkg/replication/replication.go @@ -704,6 +704,8 @@ type TargetMetrics struct { BandWidthLimitInBytesPerSecond int64 `json:"limitInBits"` // Current bandwidth used in bytes/sec for this target CurrentBandwidthInBytesPerSecond float64 `json:"currentBandwidth"` + // Completed count + ReplicatedCount uint64 `json:"replicationCount"` } // Metrics represents inline replication metrics for a bucket. @@ -721,6 +723,10 @@ type Metrics struct { PendingCount uint64 `json:"pendingReplicationCount"` // Total number of failed operations including metadata updates across targets FailedCount uint64 `json:"failedReplicationCount"` + // Total Replica counts + ReplicaCount int64 `json:"replicaCount,omitempty"` + // Total Replicated count + ReplicatedCount int64 `json:"replicationCount,omitempty"` } // ResyncTargetsInfo provides replication target information to resync replicated data. @@ -742,9 +748,94 @@ type ResyncTarget struct { FailedSize int64 `json:"failedReplicationSize,omitempty"` // Total number of failed operations FailedCount int64 `json:"failedReplicationCount,omitempty"` - // Total number of failed operations + // Total number of completed operations ReplicatedCount int64 `json:"replicationCount,omitempty"` // Last bucket/object replicated. Bucket string `json:"bucket,omitempty"` Object string `json:"object,omitempty"` } + +// ReplicationXferRate holds transfer rate info for uploads +type ReplicationXferRate struct { + Avg float64 `json:"avg"` + Peak float64 `json:"peak"` + Curr float64 `json:"curr"` +} + +type ReplQNodeStats struct { + NodeName string `json:"nodename"` + ActiveWorkers int32 `json:"activeworkers"` + Lrg ReplicationXferRate `json:"lrg"` + Sml ReplicationXferRate `json:"sml"` + QueuedCount int32 `json:"totQueuedCount"` + QueuedBytes int64 `json:"totQueuedBytes"` + TotQueuedLrgCount int32 `json:"totQueuedLrgCount"` + TotQueuedLrgBytes int64 `json:"totQueuedLrgBytes"` + TotQueuedSmlCount int32 `json:"totQueuedSmlCount"` + TotQueuedSmlBytes int64 `json:"totQueuedSmlBytes"` +} +type ReplQueueStats struct { + Nodes []ReplQNodeStats `json:"nodes"` + Uptime int64 `json:"uptime"` +} + +func (q ReplQueueStats) Workers() int64 { + var workers int64 + for _, node := range q.Nodes { + workers += int64(node.ActiveWorkers) + } + return workers +} + +type ReplQStats struct { + Uptime int64 `json:"int64"` + Workers int64 `json:"workers"` + Lrg ReplicationXferRate `json:"lrg"` + Sml ReplicationXferRate `json:"sml"` + QueuedCount int32 `json:"totQueuedCount"` + QueuedBytes int64 `json:"totQueuedBytes"` + TotQueuedLrgCount int32 `json:"totQueuedLrgCount"` + TotQueuedLrgBytes int64 `json:"totQueuedLrgBytes"` + TotQueuedSmlCount int32 `json:"totQueuedSmlCount"` + TotQueuedSmlBytes int64 `json:"totQueuedSmlBytes"` +} + +func (q ReplQueueStats) QStats() (r ReplQStats) { + r.Uptime = q.Uptime + var lavg, lcurr, lpeak, savg, scurr, speak float64 + for _, node := range q.Nodes { + r.Workers += int64(node.ActiveWorkers) + r.QueuedCount += node.QueuedCount + r.QueuedBytes += node.QueuedBytes + r.TotQueuedLrgCount += node.TotQueuedLrgCount + r.TotQueuedLrgBytes += node.TotQueuedLrgBytes + r.TotQueuedSmlCount += node.TotQueuedSmlCount + r.TotQueuedSmlBytes += node.TotQueuedSmlBytes + lavg += node.Lrg.Avg + lcurr += node.Lrg.Curr + if node.Lrg.Peak > lpeak { + lpeak = node.Lrg.Peak + } + savg += node.Sml.Avg + scurr += node.Sml.Curr + if node.Sml.Peak > speak { + speak = node.Sml.Peak + } + } + if len(q.Nodes) > 0 { + r.Lrg.Avg = lavg / float64(len(q.Nodes)) + r.Lrg.Curr = lcurr / float64(len(q.Nodes)) + r.Lrg.Peak = lpeak + r.Sml.Avg = savg / float64(len(q.Nodes)) + r.Sml.Curr = scurr / float64(len(q.Nodes)) + r.Sml.Peak = speak + } + + return +} + +type MetricsV2 struct { + History Metrics `json:"history"` + CurrentStats Metrics `json:"currStats"` + QueueStats ReplQueueStats `json:"queueStats"` +}