Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replication: change metrics API #1833

Merged
merged 2 commits into from Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 36 additions & 0 deletions api-bucket-replication.go
Expand Up @@ -289,3 +289,39 @@ func (c *Client) GetBucketReplicationResyncStatus(ctx context.Context, bucketNam
}
return rinfo, nil
}

// GetBucketReplicationMetricsV2 fetches bucket replication status metrics
func (c *Client) GetBucketReplicationMetricsV2(ctx context.Context, bucketName string) (s replication.MetricsV2, err error) {
// Input validation.
if err := s3utils.CheckValidBucketName(bucketName); err != nil {
return s, err
}
// Get resources properly escaped and lined up before
// using them in http request.
urlValues := make(url.Values)
urlValues.Set("replication-metrics", "2")

// Execute GET on bucket to get replication metrics.
resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
bucketName: bucketName,
queryValues: urlValues,
})

defer closeResponse(resp)
if err != nil {
return s, err
}

if resp.StatusCode != http.StatusOK {
return s, httpRespToErrorResponse(resp, bucketName, "")
}
respBytes, err := io.ReadAll(resp.Body)
if err != nil {
return s, err
}

if err := json.Unmarshal(respBytes, &s); err != nil {
return s, err
}
return s, nil
}
114 changes: 113 additions & 1 deletion pkg/replication/replication.go
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"encoding/xml"
"fmt"
"math"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -704,6 +705,8 @@ type TargetMetrics struct {
BandWidthLimitInBytesPerSecond int64 `json:"limitInBits"`
// Current bandwidth used in bytes/sec for this target
CurrentBandwidthInBytesPerSecond float64 `json:"currentBandwidth"`
// Completed count
ReplicatedCount uint64 `json:"replicationCount"`
}

// Metrics represents inline replication metrics for a bucket.
Expand All @@ -721,6 +724,10 @@ type Metrics struct {
PendingCount uint64 `json:"pendingReplicationCount"`
// Total number of failed operations including metadata updates across targets
FailedCount uint64 `json:"failedReplicationCount"`
// Total Replica counts
ReplicaCount int64 `json:"replicaCount,omitempty"`
// Total Replicated count
ReplicatedCount int64 `json:"replicationCount,omitempty"`
}

// ResyncTargetsInfo provides replication target information to resync replicated data.
Expand All @@ -742,9 +749,114 @@ type ResyncTarget struct {
FailedSize int64 `json:"failedReplicationSize,omitempty"`
// Total number of failed operations
FailedCount int64 `json:"failedReplicationCount,omitempty"`
// Total number of failed operations
// Total number of completed operations
ReplicatedCount int64 `json:"replicationCount,omitempty"`
// Last bucket/object replicated.
Bucket string `json:"bucket,omitempty"`
Object string `json:"object,omitempty"`
}

// XferStats holds transfer rate info for uploads/sec
type XferStats struct {
AvgRate float64 `json:"avgRate"`
PeakRate float64 `json:"peakRate"`
CurrRate float64 `json:"currRate"`
}

// InQueueStats holds stats for objects in replication queue
type InQueueStats struct {
Count int32 `json:"count"`
Bytes int64 `json:"bytes"`
}

// MetricName name of replication metric
type MetricName string

const (
// Large is a metric name for large objects >=128MiB
Large MetricName = "Large"
// Small is a metric name for objects <128MiB size
Small MetricName = "Small"
// Total is a metric name for total objects
Total MetricName = "Total"
)

// ReplQNodeStats holds stats for a node in replication queue
type ReplQNodeStats struct {
NodeName string `json:"nodeName"`
Uptime int64 `json:"uptime"`
ActiveWorkers int32 `json:"activeWorkers"`

XferStats map[MetricName]XferStats `json:"xferStats"`
QStats map[MetricName]InQueueStats `json:"qStats"`
}

// ReplQueueStats holds stats for replication queue across nodes
type ReplQueueStats struct {
Nodes []ReplQNodeStats `json:"nodes"`
}

// Workers returns number of workers across all nodes
func (q ReplQueueStats) Workers() int64 {
var workers int64
for _, node := range q.Nodes {
workers += int64(node.ActiveWorkers)
}
return workers
}

// ReplQStats holds stats for objects in replication queue
type ReplQStats struct {
Uptime int64 `json:"uptime"`
Workers int64 `json:"workers"`

XferStats map[MetricName]XferStats `json:"xferStats"`
QStats map[MetricName]InQueueStats `json:"qStats"`
}

// QStats returns cluster level stats for objects in replication queue
func (q ReplQueueStats) QStats() (r ReplQStats) {
r.QStats = make(map[MetricName]InQueueStats)
r.XferStats = make(map[MetricName]XferStats)
for _, node := range q.Nodes {
r.Workers += int64(node.ActiveWorkers)
for k, v := range node.XferStats {
st, ok := r.XferStats[k]
if !ok {
st = XferStats{}
}
st.AvgRate += v.AvgRate
st.CurrRate += v.CurrRate
st.PeakRate = math.Max(st.PeakRate, v.PeakRate)
r.XferStats[k] = st
}
for k, v := range node.QStats {
st, ok := r.QStats[k]
if !ok {
st = InQueueStats{}
}
st.Count += v.Count
st.Bytes += v.Bytes
r.QStats[k] = st
}
r.Uptime += node.Uptime
}
if len(q.Nodes) > 0 {
for k := range r.XferStats {
st := r.XferStats[k]
st.AvgRate /= float64(len(q.Nodes))
st.CurrRate /= float64(len(q.Nodes))
r.XferStats[k] = st
}
r.Uptime /= int64(len(q.Nodes)) // average uptime
}

return
}

// MetricsV2 represents replication metrics for a bucket.
type MetricsV2 struct {
History Metrics `json:"history"`
CurrentStats Metrics `json:"currStats"`
QueueStats ReplQueueStats `json:"queueStats"`
}