Skip to content

Commit

Permalink
replication: change metrics API (#1833)
Browse files Browse the repository at this point in the history
  • Loading branch information
poornas committed Jun 28, 2023
1 parent de0473e commit ac95c83
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 1 deletion.
36 changes: 36 additions & 0 deletions api-bucket-replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,39 @@ func (c *Client) GetBucketReplicationResyncStatus(ctx context.Context, bucketNam
}
return rinfo, nil
}

// GetBucketReplicationMetricsV2 retrieves the V2 replication metrics of a bucket.
//
// The bucket name is validated first; an invalid name is reported without
// issuing any request. Otherwise a GET is sent on the bucket with the query
// parameter "replication-metrics=2" and the JSON response body is decoded
// into a replication.MetricsV2 value.
//
// A non-nil error is returned when validation fails, the request fails, the
// server answers with a status other than 200 OK, the body cannot be read, or
// the body is not valid JSON for replication.MetricsV2.
func (c *Client) GetBucketReplicationMetricsV2(ctx context.Context, bucketName string) (s replication.MetricsV2, err error) {
	// Reject malformed bucket names up front.
	if err = s3utils.CheckValidBucketName(bucketName); err != nil {
		return s, err
	}

	// Query string selecting version 2 of the replication metrics API.
	query := url.Values{}
	query.Set("replication-metrics", "2")

	reqMeta := requestMetadata{
		bucketName:  bucketName,
		queryValues: query,
	}

	// Issue the GET on the bucket. The deferred close is registered before
	// the error check, as in the rest of this file.
	resp, err := c.executeMethod(ctx, http.MethodGet, reqMeta)
	defer closeResponse(resp)
	if err != nil {
		return s, err
	}

	// Anything other than 200 OK is converted into an error response.
	if resp.StatusCode != http.StatusOK {
		return s, httpRespToErrorResponse(resp, bucketName, "")
	}

	payload, err := io.ReadAll(resp.Body)
	if err != nil {
		return s, err
	}

	// Decode the JSON document; err is nil on success.
	err = json.Unmarshal(payload, &s)
	return s, err
}
114 changes: 113 additions & 1 deletion pkg/replication/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"encoding/xml"
"fmt"
"math"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -704,6 +705,8 @@ type TargetMetrics struct {
BandWidthLimitInBytesPerSecond int64 `json:"limitInBits"`
// Current bandwidth used in bytes/sec for this target
CurrentBandwidthInBytesPerSecond float64 `json:"currentBandwidth"`
// Completed count
ReplicatedCount uint64 `json:"replicationCount"`
}

// Metrics represents inline replication metrics for a bucket.
Expand All @@ -721,6 +724,10 @@ type Metrics struct {
PendingCount uint64 `json:"pendingReplicationCount"`
// Total number of failed operations including metadata updates across targets
FailedCount uint64 `json:"failedReplicationCount"`
// Total Replica counts
ReplicaCount int64 `json:"replicaCount,omitempty"`
// Total Replicated count
ReplicatedCount int64 `json:"replicationCount,omitempty"`
}

// ResyncTargetsInfo provides replication target information to resync replicated data.
Expand All @@ -742,9 +749,114 @@ type ResyncTarget struct {
FailedSize int64 `json:"failedReplicationSize,omitempty"`
// Total number of failed operations
FailedCount int64 `json:"failedReplicationCount,omitempty"`
// Total number of completed operations
ReplicatedCount int64 `json:"replicationCount,omitempty"`
// Last bucket/object replicated.
Bucket string `json:"bucket,omitempty"`
Object string `json:"object,omitempty"`
}

// XferStats holds transfer rate info for uploads/sec.
//
// Values are decoded from the "avgRate", "peakRate" and "currRate" JSON keys.
// When per-node values are combined by ReplQueueStats.QStats, AvgRate and
// CurrRate are summed and divided by the number of nodes, while PeakRate
// keeps the largest value seen.
type XferStats struct {
// AvgRate is the average transfer rate.
// NOTE(review): the unit is not visible in this file — presumably bytes/sec; confirm against the server.
AvgRate float64 `json:"avgRate"`
// PeakRate is the peak transfer rate.
PeakRate float64 `json:"peakRate"`
// CurrRate is the current transfer rate.
CurrRate float64 `json:"currRate"`
}

// InQueueStats holds stats for objects in replication queue.
//
// ReplQueueStats.QStats adds the per-node Count and Bytes values together to
// produce cluster-level totals.
type InQueueStats struct {
// Count is decoded from the "count" JSON key — presumably the number of queued objects; confirm.
Count int32 `json:"count"`
// Bytes is decoded from the "bytes" JSON key — presumably the total size of the queued objects; confirm.
Bytes int64 `json:"bytes"`
}

// MetricName is the name of a replication metric. It is the key type of the
// XferStats and QStats maps in ReplQNodeStats and ReplQStats.
type MetricName string

// Known metric names, bucketing objects by size.
const (
// Large is a metric name for large objects >=128MiB
Large MetricName = "Large"
// Small is a metric name for objects <128MiB size
Small MetricName = "Small"
// Total is a metric name for total objects
Total MetricName = "Total"
)

// ReplQNodeStats holds stats for a node in replication queue.
type ReplQNodeStats struct {
// NodeName identifies the node these stats belong to.
NodeName string `json:"nodeName"`
// Uptime of the node.
// NOTE(review): the unit is not visible in this file — presumably seconds; confirm.
Uptime int64 `json:"uptime"`
// ActiveWorkers is the number of active workers on this node; it is
// summed across nodes by ReplQueueStats.Workers and ReplQueueStats.QStats.
ActiveWorkers int32 `json:"activeWorkers"`

// XferStats holds the node's transfer rates keyed by metric name.
XferStats map[MetricName]XferStats `json:"xferStats"`
// QStats holds the node's queue stats keyed by metric name.
QStats map[MetricName]InQueueStats `json:"qStats"`
}

// ReplQueueStats holds stats for replication queue across nodes.
// Use Workers or QStats to obtain values aggregated over all nodes.
type ReplQueueStats struct {
// Nodes holds one entry of per-node stats, decoded from the "nodes" JSON key.
Nodes []ReplQNodeStats `json:"nodes"`
}

// Workers reports the total number of active workers, obtained by adding up
// ActiveWorkers over every node in q. It returns 0 when q has no nodes.
func (q ReplQueueStats) Workers() int64 {
	total := int64(0)
	for i := range q.Nodes {
		total += int64(q.Nodes[i].ActiveWorkers)
	}
	return total
}

// ReplQStats holds stats for objects in replication queue.
//
// The field notes below describe the values as populated by
// ReplQueueStats.QStats, which aggregates per-node stats.
type ReplQStats struct {
// Uptime is the node uptime averaged over all nodes (integer division).
Uptime int64 `json:"uptime"`
// Workers is the sum of active workers over all nodes.
Workers int64 `json:"workers"`

// XferStats holds transfer rates keyed by metric name: AvgRate and
// CurrRate are per-node sums divided by the node count, PeakRate is the
// maximum over nodes.
XferStats map[MetricName]XferStats `json:"xferStats"`
// QStats holds queue counts and bytes keyed by metric name, summed over nodes.
QStats map[MetricName]InQueueStats `json:"qStats"`
}

// QStats folds the per-node stats in q into a single cluster-level ReplQStats.
//
// For every metric name:
//   - queue Count and Bytes are summed over nodes;
//   - transfer AvgRate and CurrRate are summed over nodes and then divided by
//     the total number of nodes;
//   - transfer PeakRate is the maximum over nodes.
//
// Workers is the sum of ActiveWorkers and Uptime is the average node uptime
// (integer division). With no nodes the result has empty, non-nil maps and
// zero Workers and Uptime.
func (q ReplQueueStats) QStats() (r ReplQStats) {
	r.QStats = make(map[MetricName]InQueueStats)
	r.XferStats = make(map[MetricName]XferStats)

	for i := range q.Nodes {
		n := &q.Nodes[i]
		r.Workers += int64(n.ActiveWorkers)
		r.Uptime += n.Uptime

		// A missing map key yields the zero value, which is the right
		// starting point for both accumulators.
		for name, x := range n.XferStats {
			agg := r.XferStats[name]
			agg.AvgRate += x.AvgRate
			agg.CurrRate += x.CurrRate
			agg.PeakRate = math.Max(agg.PeakRate, x.PeakRate)
			r.XferStats[name] = agg
		}
		for name, qs := range n.QStats {
			agg := r.QStats[name]
			agg.Count += qs.Count
			agg.Bytes += qs.Bytes
			r.QStats[name] = agg
		}
	}

	nodeCount := len(q.Nodes)
	if nodeCount == 0 {
		// Nothing to average; also avoids dividing by zero.
		return r
	}

	// Turn the summed rates into per-node averages.
	for name, agg := range r.XferStats {
		agg.AvgRate /= float64(nodeCount)
		agg.CurrRate /= float64(nodeCount)
		r.XferStats[name] = agg
	}
	r.Uptime /= int64(nodeCount) // average uptime

	return r
}

// MetricsV2 represents replication metrics for a bucket.
type MetricsV2 struct {
// History is decoded from the "history" JSON key.
// NOTE(review): presumably metrics accumulated over a longer period than CurrentStats — confirm against the server.
History Metrics `json:"history"`
// CurrentStats is decoded from the "currStats" JSON key — presumably the current metrics; confirm.
CurrentStats Metrics `json:"currStats"`
// QueueStats holds the per-node replication queue stats.
QueueStats ReplQueueStats `json:"queueStats"`
}

0 comments on commit ac95c83

Please sign in to comment.