Skip to content

Commit

Permalink
Merge pull request #3116 from olljanat/csi-allow-publish-without-staging
Browse files Browse the repository at this point in the history
CSI: Allow NodePublishVolume even when plugin does not support staging
  • Loading branch information
dperny committed Feb 17, 2023
2 parents b7708a5 + bd7d9d8 commit 2ad26e5
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 127 deletions.
74 changes: 15 additions & 59 deletions agent/csi/plugin/plugin.go
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/docker/docker/pkg/plugingetter"
"github.com/moby/swarmkit/v2/api"
"github.com/moby/swarmkit/v2/internal/csi/capability"
"github.com/moby/swarmkit/v2/log"
)

Expand Down Expand Up @@ -208,10 +209,9 @@ func (np *nodePlugin) NodeStageVolume(ctx context.Context, req *api.VolumeAssign
}

stagingTarget := stagePath(req)

// Check arguments
if len(req.VolumeID) == 0 {
return status.Error(codes.InvalidArgument, "VolumeID missing in request")
err := capability.CheckArguments(req)
if err != nil {
return err
}

c, err := np.Client(ctx)
Expand All @@ -223,7 +223,7 @@ func (np *nodePlugin) NodeStageVolume(ctx context.Context, req *api.VolumeAssign
VolumeId: req.VolumeID,
StagingTargetPath: stagingTarget,
Secrets: np.makeSecrets(req),
VolumeCapability: makeCapability(req.AccessMode),
VolumeCapability: capability.MakeCapability(req.AccessMode),
VolumeContext: req.VolumeContext,
PublishContext: req.PublishContext,
})
Expand Down Expand Up @@ -286,24 +286,25 @@ func (np *nodePlugin) NodeUnstageVolume(ctx context.Context, req *api.VolumeAssi
}

func (np *nodePlugin) NodePublishVolume(ctx context.Context, req *api.VolumeAssignment) error {
// Check arguments
if len(req.VolumeID) == 0 {
return status.Error(codes.InvalidArgument, "Volume ID missing in request")
err := capability.CheckArguments(req)
if err != nil {
return err
}

np.mu.Lock()
defer np.mu.Unlock()

publishTarget := publishPath(req)

// some volumes do not require staging. we can check this by checkign the
// staging variable, or we can just see if there is a staging path in the
// map.
// Some volumes plugins require staging; we track this with a boolean, which
// also implies a staging path in the path map. If the plugin is marked as
// requiring staging but does not have a staging path in the map, that is an
// error.
var stagingPath string
if vs, ok := np.volumeMap[req.ID]; ok {
stagingPath = vs.stagingPath
} else {
return status.Error(codes.FailedPrecondition, "volume not staged")
} else if np.staging {
return status.Error(codes.FailedPrecondition, "volume requires staging but was not staged")
}

c, err := np.Client(ctx)
Expand All @@ -315,7 +316,7 @@ func (np *nodePlugin) NodePublishVolume(ctx context.Context, req *api.VolumeAssi
VolumeId: req.VolumeID,
TargetPath: publishTarget,
StagingTargetPath: stagingPath,
VolumeCapability: makeCapability(req.AccessMode),
VolumeCapability: capability.MakeCapability(req.AccessMode),
Secrets: np.makeSecrets(req),
VolumeContext: req.VolumeContext,
PublishContext: req.PublishContext,
Expand Down Expand Up @@ -399,51 +400,6 @@ func makeNodeInfo(csiNodeInfo *csi.NodeGetInfoResponse) *api.NodeCSIInfo {
}
}

func makeCapability(am *api.VolumeAccessMode) *csi.VolumeCapability {
var mode csi.VolumeCapability_AccessMode_Mode
switch am.Scope {
case api.VolumeScopeSingleNode:
switch am.Sharing {
case api.VolumeSharingNone, api.VolumeSharingOneWriter, api.VolumeSharingAll:
mode = csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
case api.VolumeSharingReadOnly:
mode = csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY
}
case api.VolumeScopeMultiNode:
switch am.Sharing {
case api.VolumeSharingReadOnly:
mode = csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
case api.VolumeSharingOneWriter:
mode = csi.VolumeCapability_AccessMode_MULTI_NODE_SINGLE_WRITER
case api.VolumeSharingAll:
mode = csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
}
}

capability := &csi.VolumeCapability{
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: mode,
},
}

if block := am.GetBlock(); block != nil {
capability.AccessType = &csi.VolumeCapability_Block{
// Block type is empty.
Block: &csi.VolumeCapability_BlockVolume{},
}
}

if mount := am.GetMount(); mount != nil {
capability.AccessType = &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{
FsType: mount.FsType,
MountFlags: mount.MountFlags,
},
}
}
return capability
}

// stagePath returns the staging path for a given volume assignment
func stagePath(v *api.VolumeAssignment) string {
// this really just exists so we use the same trick to determine staging
Expand Down
58 changes: 38 additions & 20 deletions agent/csi/plugin/plugin_test.go
Expand Up @@ -80,30 +80,48 @@ func TestNodeUnstageVolume(t *testing.T) {
}

func TestNodePublishVolume(t *testing.T) {
plugin := "plugin-3"
node := "node-1"
nodePlugin := newVolumeClient(plugin, node)
s := &api.VolumeAssignment{
VolumeID: "vol3",
AccessMode: &api.VolumeAccessMode{
Scope: api.VolumeScopeMultiNode,
Sharing: api.VolumeSharingOneWriter,
AccessType: &api.VolumeAccessMode_Mount{
Mount: &api.VolumeAccessMode_MountVolume{},
},
var testcases = []struct {
staging bool
plugin string
}{
{
plugin: "staging-plugin",
staging: true,
},
Driver: &api.Driver{
Name: plugin,
{
plugin: "non-staging-plugin",
staging: false,
},
}
err := nodePlugin.NodePublishVolume(context.Background(), s)
assert.Equal(t, codes.FailedPrecondition, testutils.ErrorCode(err))

// Volume needs to be staged before publishing. Stage -> Publish
err = nodePlugin.NodeStageVolume(context.Background(), s)
require.NoError(t, err)
err = nodePlugin.NodePublishVolume(context.Background(), s)
require.NoError(t, err)
for _, tc := range testcases {
nodePlugin := newVolumeClient(tc.plugin, "node-1")
s := &api.VolumeAssignment{
VolumeID: "vol3",
AccessMode: &api.VolumeAccessMode{
Scope: api.VolumeScopeMultiNode,
Sharing: api.VolumeSharingOneWriter,
AccessType: &api.VolumeAccessMode_Mount{
Mount: &api.VolumeAccessMode_MountVolume{},
},
},
Driver: &api.Driver{
Name: tc.plugin,
},
}

nodePlugin.staging = tc.staging
if nodePlugin.staging {
err := nodePlugin.NodePublishVolume(context.Background(), s)
assert.Equal(t, codes.FailedPrecondition, testutils.ErrorCode(err))

// Volume needs to be staged before publishing. Stage -> Publish
err = nodePlugin.NodeStageVolume(context.Background(), s)
require.NoError(t, err)
}
err := nodePlugin.NodePublishVolume(context.Background(), s)
require.NoError(t, err)
}
}

func TestNodeUnpublishVolume(t *testing.T) {
Expand Down
64 changes: 64 additions & 0 deletions internal/csi/capability/capability.go
@@ -0,0 +1,64 @@
package capability

import (
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/moby/swarmkit/v2/api"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func CheckArguments(req *api.VolumeAssignment) error {
if len(req.VolumeID) == 0 {
return status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if req.AccessMode == nil {
return status.Error(codes.InvalidArgument, "AccessMode missing in request")
}
return nil
}

func MakeCapability(am *api.VolumeAccessMode) *csi.VolumeCapability {
var mode csi.VolumeCapability_AccessMode_Mode
switch am.Scope {
case api.VolumeScopeSingleNode:
switch am.Sharing {
case api.VolumeSharingNone, api.VolumeSharingOneWriter, api.VolumeSharingAll:
mode = csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
case api.VolumeSharingReadOnly:
mode = csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY
}
case api.VolumeScopeMultiNode:
switch am.Sharing {
case api.VolumeSharingReadOnly:
mode = csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
case api.VolumeSharingOneWriter:
mode = csi.VolumeCapability_AccessMode_MULTI_NODE_SINGLE_WRITER
case api.VolumeSharingAll:
mode = csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
}
}

capability := &csi.VolumeCapability{
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: mode,
},
}

if block := am.GetBlock(); block != nil {
capability.AccessType = &csi.VolumeCapability_Block{
// Block type is empty.
Block: &csi.VolumeCapability_BlockVolume{},
}
}

if mount := am.GetMount(); mount != nil {
capability.AccessType = &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{
FsType: mount.FsType,
MountFlags: mount.MountFlags,
},
}
}

return capability
}
46 changes: 0 additions & 46 deletions manager/csi/convert.go
Expand Up @@ -45,52 +45,6 @@ func makeTopology(t *api.Topology) *csi.Topology {
}
}

func makeCapability(am *api.VolumeAccessMode) *csi.VolumeCapability {
var mode csi.VolumeCapability_AccessMode_Mode
switch am.Scope {
case api.VolumeScopeSingleNode:
switch am.Sharing {
case api.VolumeSharingNone, api.VolumeSharingOneWriter, api.VolumeSharingAll:
mode = csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
case api.VolumeSharingReadOnly:
mode = csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY
}
case api.VolumeScopeMultiNode:
switch am.Sharing {
case api.VolumeSharingReadOnly:
mode = csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
case api.VolumeSharingOneWriter:
mode = csi.VolumeCapability_AccessMode_MULTI_NODE_SINGLE_WRITER
case api.VolumeSharingAll:
mode = csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
}
}

capability := &csi.VolumeCapability{
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: mode,
},
}

if block := am.GetBlock(); block != nil {
capability.AccessType = &csi.VolumeCapability_Block{
// Block type is empty.
Block: &csi.VolumeCapability_BlockVolume{},
}
}

if mount := am.GetMount(); mount != nil {
capability.AccessType = &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{
FsType: mount.FsType,
MountFlags: mount.MountFlags,
},
}
}

return capability
}

// makeCapcityRange converts the swarmkit CapacityRange object to the
// equivalent CSI object
func makeCapacityRange(cr *api.CapacityRange) *csi.CapacityRange {
Expand Down
5 changes: 3 additions & 2 deletions manager/csi/plugin.go
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/docker/docker/pkg/plugingetter"
"github.com/moby/swarmkit/v2/api"
"github.com/moby/swarmkit/v2/internal/csi/capability"
)

// Plugin is the interface for a CSI controller plugin.
Expand Down Expand Up @@ -275,7 +276,7 @@ func (p *plugin) makeCreateVolume(v *api.Volume) *csi.CreateVolumeRequest {
Name: v.Spec.Annotations.Name,
Parameters: v.Spec.Driver.Options,
VolumeCapabilities: []*csi.VolumeCapability{
makeCapability(v.Spec.AccessMode),
capability.MakeCapability(v.Spec.AccessMode),
},
Secrets: secrets,
AccessibilityRequirements: makeTopologyRequirement(v.Spec.AccessibilityRequirements),
Expand Down Expand Up @@ -307,7 +308,7 @@ func (p *plugin) makeControllerPublishVolumeRequest(v *api.Volume, nodeID string
}

secrets := p.makeSecrets(v)
capability := makeCapability(v.Spec.AccessMode)
capability := capability.MakeCapability(v.Spec.AccessMode)
capability.AccessType = &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{},
}
Expand Down

0 comments on commit 2ad26e5

Please sign in to comment.