Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
poornas committed Jun 28, 2023
1 parent e7bd6d1 commit a263fd5
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 52 deletions.
4 changes: 2 additions & 2 deletions api-bucket-replication.go
Expand Up @@ -299,9 +299,9 @@ func (c *Client) GetBucketReplicationMetricsV2(ctx context.Context, bucketName s
// Get resources properly escaped and lined up before
// using them in http request.
urlValues := make(url.Values)
urlValues.Set("replication-metrics", "v2")
urlValues.Set("replication-metrics", "2")

// Execute GET on bucket to get replication config.
// Execute GET on bucket to get replication metrics.
resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
bucketName: bucketName,
queryValues: urlValues,
Expand Down
121 changes: 71 additions & 50 deletions pkg/replication/replication.go
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"encoding/xml"
"fmt"
"math"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -755,30 +756,47 @@ type ResyncTarget struct {
Object string `json:"object,omitempty"`
}

// ReplicationXferRate holds transfer rate info for uploads
type ReplicationXferRate struct {
Avg float64 `json:"avg"`
Peak float64 `json:"peak"`
Curr float64 `json:"curr"`
// XferStats holds transfer rate info for uploads/sec
type XferStats struct {
AvgRate float64 `json:"avgRate"`
PeakRate float64 `json:"peakRate"`
CurrRate float64 `json:"currRate"`
}

// InQueueStats holds stats for objects in replication queue
type InQueueStats struct {
Count int32 `json:"count"`
Bytes int64 `json:"bytes"`
}

// MetricName name of replication metric
type MetricName string

const (
// Large is a metric name for large objects >=128MiB
Large MetricName = "Large"
// Small is a metric name for objects <128MiB size
Small MetricName = "Small"
// Total is a metric name for total objects
Total MetricName = "Total"
)

// ReplQNodeStats holds stats for a node in replication queue
type ReplQNodeStats struct {
NodeName string `json:"nodename"`
ActiveWorkers int32 `json:"activeworkers"`
Lrg ReplicationXferRate `json:"lrg"`
Sml ReplicationXferRate `json:"sml"`
QueuedCount int32 `json:"totQueuedCount"`
QueuedBytes int64 `json:"totQueuedBytes"`
TotQueuedLrgCount int32 `json:"totQueuedLrgCount"`
TotQueuedLrgBytes int64 `json:"totQueuedLrgBytes"`
TotQueuedSmlCount int32 `json:"totQueuedSmlCount"`
TotQueuedSmlBytes int64 `json:"totQueuedSmlBytes"`
NodeName string `json:"nodeName"`
Uptime int64 `json:"uptime"`
ActiveWorkers int32 `json:"activeWorkers"`

XferStats map[MetricName]XferStats `json:"xferStats"`
QStats map[MetricName]InQueueStats `json:"qStats"`
}

// ReplQueueStats holds stats for replication queue across nodes
type ReplQueueStats struct {
Nodes []ReplQNodeStats `json:"nodes"`
Uptime int64 `json:"uptime"`
Nodes []ReplQNodeStats `json:"nodes"`
}

// Workers returns number of workers across all nodes
func (q ReplQueueStats) Workers() int64 {
var workers int64
for _, node := range q.Nodes {
Expand All @@ -787,53 +805,56 @@ func (q ReplQueueStats) Workers() int64 {
return workers
}

// ReplQStats holds stats for objects in replication queue
type ReplQStats struct {
Uptime int64 `json:"int64"`
Workers int64 `json:"workers"`
Lrg ReplicationXferRate `json:"lrg"`
Sml ReplicationXferRate `json:"sml"`
QueuedCount int32 `json:"totQueuedCount"`
QueuedBytes int64 `json:"totQueuedBytes"`
TotQueuedLrgCount int32 `json:"totQueuedLrgCount"`
TotQueuedLrgBytes int64 `json:"totQueuedLrgBytes"`
TotQueuedSmlCount int32 `json:"totQueuedSmlCount"`
TotQueuedSmlBytes int64 `json:"totQueuedSmlBytes"`
Uptime int64 `json:"uptime"`
Workers int64 `json:"workers"`

XferStats map[MetricName]XferStats `json:"xferStats"`
QStats map[MetricName]InQueueStats `json:"qStats"`
}

// QStats returns cluster level stats for objects in replication queue
func (q ReplQueueStats) QStats() (r ReplQStats) {
r.Uptime = q.Uptime
var lavg, lcurr, lpeak, savg, scurr, speak float64
r.QStats = make(map[MetricName]InQueueStats)
r.XferStats = make(map[MetricName]XferStats)
for _, node := range q.Nodes {
r.Workers += int64(node.ActiveWorkers)
r.QueuedCount += node.QueuedCount
r.QueuedBytes += node.QueuedBytes
r.TotQueuedLrgCount += node.TotQueuedLrgCount
r.TotQueuedLrgBytes += node.TotQueuedLrgBytes
r.TotQueuedSmlCount += node.TotQueuedSmlCount
r.TotQueuedSmlBytes += node.TotQueuedSmlBytes
lavg += node.Lrg.Avg
lcurr += node.Lrg.Curr
if node.Lrg.Peak > lpeak {
lpeak = node.Lrg.Peak
}
savg += node.Sml.Avg
scurr += node.Sml.Curr
if node.Sml.Peak > speak {
speak = node.Sml.Peak
for k, v := range node.XferStats {
st, ok := r.XferStats[k]
if !ok {
st = XferStats{}
}
st.AvgRate += v.AvgRate
st.CurrRate += v.CurrRate
st.PeakRate = math.Max(st.PeakRate, v.PeakRate)
r.XferStats[k] = st
}
for k, v := range node.QStats {
st, ok := r.QStats[k]
if !ok {
st = InQueueStats{}
}
st.Count += v.Count
st.Bytes += v.Bytes
r.QStats[k] = st
}
r.Uptime += node.Uptime
}
if len(q.Nodes) > 0 {
r.Lrg.Avg = lavg / float64(len(q.Nodes))
r.Lrg.Curr = lcurr / float64(len(q.Nodes))
r.Lrg.Peak = lpeak
r.Sml.Avg = savg / float64(len(q.Nodes))
r.Sml.Curr = scurr / float64(len(q.Nodes))
r.Sml.Peak = speak
for k := range r.XferStats {
st := r.XferStats[k]
st.AvgRate /= float64(len(q.Nodes))
st.CurrRate /= float64(len(q.Nodes))
r.XferStats[k] = st
}
r.Uptime /= int64(len(q.Nodes)) // average uptime
}

return
}

// MetricsV2 represents replication metrics for a bucket.
type MetricsV2 struct {
History Metrics `json:"history"`
CurrentStats Metrics `json:"currStats"`
Expand Down

0 comments on commit a263fd5

Please sign in to comment.