Change replicate status metrics to show additional info #4582

Merged: 2 commits merged on Aug 30, 2023
109 changes: 106 additions & 3 deletions cmd/admin-replicate-status.go
@@ -21,12 +21,15 @@ import (
"fmt"
"sort"
"strings"
"time"

humanize "github.com/dustin/go-humanize"
"github.com/fatih/color"
"github.com/minio/cli"
json "github.com/minio/colorjson"
"github.com/minio/madmin-go/v3"
"github.com/minio/mc/pkg/probe"
"github.com/minio/minio-go/v7/pkg/replication"
"github.com/minio/pkg/console"
)

@@ -122,7 +125,9 @@ func (i srStatus) JSON() string {

func (i srStatus) String() string {
var messages []string

ms := i.Metrics
q := i.Metrics.Queued
w := i.Metrics.ActiveWorkers
// Color palette initialization
console.SetColor("Summary", color.New(color.FgWhite, color.Bold))
console.SetColor("SummaryHdr", color.New(color.FgCyan, color.Bold))
@@ -174,7 +179,7 @@ func (i srStatus) String() string {
ss := ssMap[dID]
switch {
case !ss.HasBucket:
- details = append(details, fmt.Sprintf("%s Bucket", blankCell))
+ details = append(details, fmt.Sprintf("%s ", blankCell))
case ss.OLockConfigMismatch, ss.PolicyMismatch, ss.QuotaCfgMismatch, ss.ReplicationCfgMismatch, ss.TagMismatch:
details = append(details, fmt.Sprintf("%s in-sync", crossTickCell))
default:
@@ -212,7 +217,7 @@ func (i srStatus) String() string {
ss := ssMap[dID]
switch {
case !ss.HasPolicy:
- details = append(details, fmt.Sprintf("%s Policy", blankCell))
+ details = append(details, blankCell)
case ss.PolicyMismatch:
details = append(details, fmt.Sprintf("%s in-sync", crossTickCell))
default:
@@ -319,7 +324,86 @@ func (i srStatus) String() string {
messages = append(messages, i.getGroupStatusSummary(siteNames, nameIDMap, "Group")...)

}
if i.opts.Metrics {
uiFn := func(theme string) func(string) string {
return func(s string) string {
return console.Colorize(theme, s)
}
}
singleTgt := len(ms.Metrics) == 1
maxui := uiFn("Peak")
avgui := uiFn("Avg")
valueui := uiFn("Value")
messages = append(messages,
console.Colorize("SummaryHdr", "Object replication status:"))

messages = append(messages, console.Colorize("UptimeStr", fmt.Sprintf("Replication status since %s", uiFn("Uptime")(humanize.RelTime(time.Now(), time.Now().Add(time.Duration(ms.Uptime)*time.Second), "", "ago")))))
// for queue stats
coloredDot := console.Colorize("qStatusOK", dot)
if q.Curr.Count > q.Avg.Count {
coloredDot = console.Colorize("qStatusWarn", dot)
}

var replicatedCount, replicatedSize int64
for _, m := range ms.Metrics {
nodeName := m.Endpoint
nodeui := uiFn(getNodeTheme(nodeName))
messages = append(messages, nodeui(nodeName))
messages = append(messages, fmt.Sprintf("Replicated: %s objects (%s)", humanize.Comma(int64(m.ReplicatedCount)), valueui(humanize.IBytes(uint64(m.ReplicatedSize)))))

if singleTgt { // for single target - combine summary section into the target section
messages = append(messages, fmt.Sprintf("Received: %s objects (%s)", humanize.Comma(int64(ms.ReplicaCount)), humanize.IBytes(uint64(ms.ReplicaSize))))
messages = append(messages, fmt.Sprintf("Queued: %s %s objects, (%s) (%s: %s objects, %s; %s: %s objects, %s)", coloredDot, humanize.Comma(int64(q.Curr.Count)), valueui(humanize.IBytes(uint64(q.Curr.Bytes))), avgui("avg"),
humanize.Comma(int64(q.Avg.Count)), valueui(humanize.IBytes(uint64(q.Avg.Bytes))), maxui("max"),
humanize.Comma(int64(q.Max.Count)), valueui(humanize.IBytes(uint64(q.Max.Bytes)))))
messages = append(messages, fmt.Sprintf("Workers: %s (%s: %s; %s %s) ", humanize.Comma(int64(w.Curr)), avgui("avg"), humanize.Comma(int64(w.Avg)), maxui("max"), humanize.Comma(int64(w.Max))))
} else {
replicatedCount += m.ReplicatedCount
replicatedSize += m.ReplicatedSize
}

if m.XferStats != nil {
tgtXfer, ok := m.XferStats[replication.Total]
if ok {
messages = append(messages, fmt.Sprintf("Transfer Rate: %s/s (avg: %s/s; max %s/s)", valueui(humanize.Bytes(uint64(tgtXfer.CurrRate))), valueui(humanize.Bytes(uint64(tgtXfer.AvgRate))), valueui(humanize.Bytes(uint64(tgtXfer.PeakRate)))))
messages = append(messages, fmt.Sprintf("Latency: %s (avg: %s; max %s)", valueui(m.Latency.Curr.Round(time.Millisecond).String()), valueui(m.Latency.Avg.Round(time.Millisecond).String()), valueui(m.Latency.Max.Round(time.Millisecond).String())))
}
}

healthDot := console.Colorize("online", dot)
if !m.Online {
healthDot = console.Colorize("offline", dot)
}
currDowntime := time.Duration(0)
if !m.Online && !m.LastOnline.IsZero() {
currDowntime = UTCNow().Sub(m.LastOnline)
}
// normalize because total downtime is calculated at server side at heartbeat interval, may be slightly behind
totalDowntime := m.TotalDowntime
if currDowntime > totalDowntime {
totalDowntime = currDowntime
}
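// Illustration: if the last heartbeat recorded TotalDowntime as 55s but this
// peer has now been unreachable for 62s, the larger live value (62s) is shown.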
var linkStatus string
if m.Online {
linkStatus = healthDot + fmt.Sprintf(" online (total downtime: %s)", timeDurationToHumanizedDuration(totalDowntime).String())
} else {
linkStatus = healthDot + fmt.Sprintf(" offline %s (total downtime: %s)", timeDurationToHumanizedDuration(currDowntime).String(), valueui(timeDurationToHumanizedDuration(totalDowntime).String()))
}
messages = append(messages, fmt.Sprintf("Link: %s", linkStatus))
messages = append(messages, fmt.Sprintf("Errors: %s in last 1 minute; %s in last 1hr; %s since uptime", valueui(humanize.Comma(int64(m.Failed.LastMinute.Count))), valueui(humanize.Comma(int64(m.Failed.LastHour.Count))), valueui(humanize.Comma(int64(m.Failed.Totals.Count)))))
messages = append(messages, "")
}
if !singleTgt {
messages = append(messages,
console.Colorize("SummaryHdr", "Summary:"))
messages = append(messages, fmt.Sprintf("Replicated: %s objects (%s)", humanize.Comma(int64(replicatedCount)), valueui(humanize.IBytes(uint64(replicatedSize)))))
messages = append(messages, fmt.Sprintf("Queued: %s %s objects, (%s) (%s: %s objects, %s; %s: %s objects, %s)", coloredDot, humanize.Comma(int64(q.Curr.Count)), valueui(humanize.IBytes(uint64(q.Curr.Bytes))), avgui("avg"),
humanize.Comma(int64(q.Avg.Count)), valueui(humanize.IBytes(uint64(q.Avg.Bytes))), maxui("max"),
humanize.Comma(int64(q.Max.Count)), valueui(humanize.IBytes(uint64(q.Max.Bytes)))))

messages = append(messages, fmt.Sprintf("Received: %s objects (%s)", humanize.Comma(int64(ms.ReplicaCount)), humanize.IBytes(uint64(ms.ReplicaSize))))
}
}
return console.Colorize("UserMessage", strings.Join(messages, "\n"))
}
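
Not part of the PR: a minimal standalone sketch of the go-humanize helpers the new metrics output leans on, with invented sample values.

```go
package main

import (
	"fmt"
	"time"

	humanize "github.com/dustin/go-humanize"
)

func main() {
	// Mirrors the PR's uptime rendering: a duration expressed in seconds,
	// turned into a relative-time phrase (e.g. "3 hours").
	uptimeSecs := int64(3 * 3600)
	fmt.Println(humanize.RelTime(time.Now(),
		time.Now().Add(time.Duration(uptimeSecs)*time.Second), "", "ago"))

	fmt.Println(humanize.Comma(1234567))          // "1,234,567" (object counts)
	fmt.Println(humanize.IBytes(3 * 1024 * 1024)) // "3.0 MiB" (byte totals)
}
```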

@@ -443,6 +527,7 @@ func (i srStatus) getBucketStatusSummary(siteNames []string, nameIDMap map[strin
}
}
messages = append(messages, rows...)

return messages
}

@@ -646,6 +731,7 @@ func srStatusOpts(ctx *cli.Context) (opts madmin.SRStatusOptions) {
opts.Users = true
opts.Groups = true
opts.Policies = true
opts.Metrics = true
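// With no selector flags passed, metrics are now requested by default
// alongside buckets, users, groups, and policies.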
return
}
opts.Buckets = ctx.Bool("buckets")
@@ -691,6 +777,23 @@ func mainAdminReplicationStatus(ctx *cli.Context) error {

console.SetColor("UserMessage", color.New(color.FgGreen))
console.SetColor("WarningMessage", color.New(color.FgYellow))
for _, c := range colors {
console.SetColor(fmt.Sprintf("Node%d", c), color.New(c, color.Bold))
}
console.SetColor("Replicated", color.New(color.FgCyan))
console.SetColor("In-Queue", color.New(color.Bold, color.FgYellow))
console.SetColor("Avg", color.New(color.FgCyan))
console.SetColor("Peak", color.New(color.FgYellow))
console.SetColor("Value", color.New(color.FgWhite, color.Bold))

console.SetColor("Current", color.New(color.FgCyan))
console.SetColor("Uptime", color.New(color.Bold, color.FgWhite))
console.SetColor("UptimeStr", color.New(color.FgHiWhite))

console.SetColor("qStatusWarn", color.New(color.FgYellow, color.Bold))
console.SetColor("qStatusOK", color.New(color.FgGreen, color.Bold))
console.SetColor("online", color.New(color.FgGreen, color.Bold))
console.SetColor("offline", color.New(color.FgRed, color.Bold))

// Get the alias parameter from cli
args := ctx.Args()
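The block above registers a style per tag; a minimal standalone sketch of the SetColor/Colorize pattern from github.com/minio/pkg/console (the dot glyph here is a stand-in for the status dot mc prints):

```go
package main

import (
	"fmt"

	"github.com/fatih/color"
	"github.com/minio/pkg/console"
)

func main() {
	// Register a color style under a named tag...
	console.SetColor("online", color.New(color.FgGreen, color.Bold))
	console.SetColor("offline", color.New(color.FgRed, color.Bold))

	// ...then render any string through that tag.
	dot := "●"
	fmt.Println(console.Colorize("online", dot) + " online")
	fmt.Println(console.Colorize("offline", dot) + " offline")
}
```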
4 changes: 2 additions & 2 deletions cmd/client-fs.go
@@ -1404,8 +1404,8 @@ func (f *fsClient) RemoveReplication(_ context.Context) *probe.Error {
}

// GetReplicationMetrics - Get replication metrics for a given bucket, not implemented.
- func (f *fsClient) GetReplicationMetrics(_ context.Context) (replication.Metrics, *probe.Error) {
- return replication.Metrics{}, probe.NewError(APINotImplemented{
+ func (f *fsClient) GetReplicationMetrics(_ context.Context) (replication.MetricsV2, *probe.Error) {
+ return replication.MetricsV2{}, probe.NewError(APINotImplemented{
API: "GetReplicationMetrics",
APIType: "filesystem",
})
8 changes: 4 additions & 4 deletions cmd/client-s3.go
@@ -2759,15 +2759,15 @@ func (c *S3Client) SetReplication(ctx context.Context, cfg *replication.Config,
}

// GetReplicationMetrics - Get replication metrics for a given bucket.
- func (c *S3Client) GetReplicationMetrics(ctx context.Context) (replication.Metrics, *probe.Error) {
+ func (c *S3Client) GetReplicationMetrics(ctx context.Context) (replication.MetricsV2, *probe.Error) {
bucket, _ := c.url2BucketAndObject()
if bucket == "" {
- return replication.Metrics{}, probe.NewError(BucketNameEmpty{})
+ return replication.MetricsV2{}, probe.NewError(BucketNameEmpty{})
}

- metrics, e := c.api.GetBucketReplicationMetrics(ctx, bucket)
+ metrics, e := c.api.GetBucketReplicationMetricsV2(ctx, bucket)
if e != nil {
- return replication.Metrics{}, probe.NewError(e)
+ return replication.MetricsV2{}, probe.NewError(e)
}
return metrics, nil
}
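For context, the minio-go call the S3 client now wraps can be exercised directly. A hedged sketch: endpoint, credentials, and bucket name below are placeholders, and it assumes replication is configured on the bucket.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
	client, err := minio.New("localhost:9000", &minio.Options{
		Creds: credentials.NewStaticV4("ACCESS-KEY", "SECRET-KEY", ""),
	})
	if err != nil {
		log.Fatal(err)
	}

	// MetricsV2 carries the richer per-target stats this PR surfaces
	// (queued counts, workers, transfer rates, link health, ...).
	metrics, err := client.GetBucketReplicationMetricsV2(context.Background(), "mybucket")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%+v\n", metrics)
}
```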
2 changes: 1 addition & 1 deletion cmd/client.go
@@ -162,7 +162,7 @@ type Client interface {
GetReplication(ctx context.Context) (replication.Config, *probe.Error)
SetReplication(ctx context.Context, cfg *replication.Config, opts replication.Options) *probe.Error
RemoveReplication(ctx context.Context) *probe.Error
- GetReplicationMetrics(ctx context.Context) (replication.Metrics, *probe.Error)
+ GetReplicationMetrics(ctx context.Context) (replication.MetricsV2, *probe.Error)
ResetReplication(ctx context.Context, before time.Duration, arn string) (replication.ResyncTargetsInfo, *probe.Error)
ReplicationResyncStatus(ctx context.Context, arn string) (rinfo replication.ResyncTargetsInfo, err *probe.Error)

21 changes: 21 additions & 0 deletions cmd/replicate-backlog.go
@@ -95,6 +95,27 @@ func checkReplicateBacklogSyntax(ctx *cli.Context) {
}
}

type replicateMRFMessage struct {
Op string `json:"op"`
Status string `json:"status"`
madmin.ReplicationMRF
}

func (m replicateMRFMessage) JSON() string {
m.Status = "success"
jsonMessageBytes, e := json.MarshalIndent(m, "", " ")
fatalIf(probe.NewError(e), "Unable to marshal into JSON.")
return string(jsonMessageBytes)
}

func (m replicateMRFMessage) String() string {
return console.Colorize("", newPrettyTable(" | ",
Field{getNodeTheme(m.ReplicationMRF.NodeName), len(m.ReplicationMRF.NodeName) + 3},
Field{"Count", 7},
Field{"Object", -1},
).buildRow(m.ReplicationMRF.NodeName, fmt.Sprintf("Retry=%d", m.ReplicationMRF.RetryCount), fmt.Sprintf("%s (%s)", m.ReplicationMRF.Object, m.ReplicationMRF.VersionID)))
}

type replicateBacklogMessage struct {
Op string `json:"op"`
Diff madmin.DiffInfo `json:"diff,omitempty"`
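A sketch of how the new replicateMRFMessage renders. This fragment assumes it runs inside mc's cmd package (next to the type above); the op name and sample values are invented.

```go
// Build one MRF backlog entry; the field names come from the diff above,
// the values are placeholders.
msg := replicateMRFMessage{
	Op: "list",
	ReplicationMRF: madmin.ReplicationMRF{
		NodeName:   "node-1",
		Object:     "photos/2023/a.jpg",
		VersionID:  "11111111-2222-3333-4444-555555555555",
		RetryCount: 3,
	},
}
// JSON() stamps Status as "success" before marshaling; String() instead
// prints a pretty-table row of node, retry count, and object (version).
fmt.Println(msg.JSON())
```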