Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kinesisanalytics-flink): VPC support for Flink applications #24442

Merged
merged 13 commits into from Mar 8, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
123 changes: 114 additions & 9 deletions packages/@aws-cdk/aws-kinesisanalytics-flink/lib/application.ts
@@ -1,4 +1,5 @@
import * as cloudwatch from '@aws-cdk/aws-cloudwatch';
import * as ec2 from '@aws-cdk/aws-ec2';
import * as iam from '@aws-cdk/aws-iam';
import { CfnApplicationCloudWatchLoggingOptionV2, CfnApplicationV2 } from '@aws-cdk/aws-kinesisanalytics';
import * as logs from '@aws-cdk/aws-logs';
Expand All @@ -14,7 +15,7 @@ import { LogLevel, MetricsLevel, PropertyGroups, Runtime } from './types';
* An interface expressing the public properties on both an imported and
* CDK-created Flink application.
*/
export interface IApplication extends core.IResource, iam.IGrantable {
export interface IApplication extends core.IResource, ec2.IConnectable, iam.IGrantable {
/**
* The application ARN.
*
Expand Down Expand Up @@ -351,6 +352,13 @@ abstract class ApplicationBase extends core.Resource implements IApplication {
// Implement iam.IGrantable interface
public abstract readonly grantPrincipal: iam.IPrincipal;

/**
* The underlying connections object for the connections getter.
*
* @internal
*/
protected _connections?: ec2.Connections;

/** Implement the convenience `IApplication.addToPrincipalPolicy` method. */
public addToRolePolicy(policyStatement: iam.PolicyStatement): boolean {
if (this.role) {
Expand All @@ -361,6 +369,13 @@ abstract class ApplicationBase extends core.Resource implements IApplication {
return false;
}

public get connections() {
if (!this._connections) {
throw new Error('This Application isn\'t associated with a VPC. Provide a "vpc" prop when creating the Application or "securityGroups" when importing it.');
}
return this._connections;
}

/**
* Return a CloudWatch metric associated with this Flink application.
*
Expand Down Expand Up @@ -720,6 +735,25 @@ abstract class ApplicationBase extends core.Resource implements IApplication {
}
}

/**
* Attributes used for importing an Application with Application.fromApplicationAttributes.
*/
export interface ApplicationAttributes {
/**
* The ARN of the Flink application.
*
* Format: arn:<partition>:kinesisanalytics:<region>:<account-id>:application/<application-name>
*/
readonly applicationArn: string;

/**
* The security groups for this Flink application if deployed in a VPC.
*
* @default no security groups
*/
readonly securityGroups?: ec2.ISecurityGroup[];
}

/**
* Props for creating an Application construct.
*/
Expand Down Expand Up @@ -840,6 +874,27 @@ export interface ApplicationProps {
* @default CDK's default LogGroup
*/
readonly logGroup?: logs.ILogGroup;

/**
* Deploy the Flink application in a VPC.
*
* @default no VPC
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @default no VPC
* @default - no VPC

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just pushed a change which adds hyphens for these kinds of @default phrases to the entire file.

*/
readonly vpc?: ec2.IVpc;

/**
* Choose which VPC subnets to use.
*
* @default SubnetType.PRIVATE_WITH_EGRESS subnets
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @default SubnetType.PRIVATE_WITH_EGRESS subnets
* @default - SubnetType.PRIVATE_WITH_EGRESS subnets

*/
readonly vpcSubnets?: ec2.SubnetSelection;

/**
* Security groups to use with a provided VPC.
*
* @default a new security group is created for this application.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @default a new security group is created for this application.
* @default - a new security group is created for this application.

*/
readonly securityGroups?: ec2.ISecurityGroup[];
}

/**
Expand All @@ -851,15 +906,24 @@ class Import extends ApplicationBase {
public readonly applicationName: string;
public readonly applicationArn: string;

constructor(scope: Construct, id: string, attrs: { applicationArn: string, applicationName: string }) {
constructor(scope: Construct, id: string, attrs: { applicationArn: string, securityGroups?: ec2.ISecurityGroup[] }) {
Comment on lines -854 to +909
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why has applicationName been removed?

I'd rather avoid removing this arg entirely, as doing so is a breaking change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import is an internal class that has never been exported so I don't think it's a breaking change. I think you may have missed that this code is in the Import class because the diff view isn't wide enough to show the class Import declaration.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh I see, yep I'd missed that. Thanks

super(scope, id);

// Imported applications have no associated role or grantPrincipal
this.grantPrincipal = new iam.UnknownPrincipal({ resource: this });
this.role = undefined;

this.applicationArn = attrs.applicationArn;
this.applicationName = attrs.applicationName;
const applicationName = core.Stack.of(scope).splitArn(attrs.applicationArn, core.ArnFormat.SLASH_RESOURCE_NAME).resourceName;
if (!applicationName) {
throw new Error(`applicationArn for fromApplicationArn (${attrs.applicationArn}) must include resource name`);
}
this.applicationName = applicationName;
Comment on lines -862 to +921
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we getting this from the arn now? This worked when it was in the Import class, because that ARN was always an actual ARN. This ARN is almost certainly not an actual string, it's instead a Token, which means we can't validate this at synth time

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before this code was in the Application.fromApplicationName static method and now it's in the Import constructor. So I think this will still work. I think this is the same code path as before but with a slight refactoring.

Also I have tests for fromApplicationName and applicationArn that didn't change in this PR. Let me know if there is some edge case I'm missing.


const securityGroups = attrs.securityGroups ?? [];
if (securityGroups.length > 0) {
this._connections = new ec2.Connections({ securityGroups });
}
}
}

Expand All @@ -877,20 +941,25 @@ export class Application extends ApplicationBase {
public static fromApplicationName(scope: Construct, id: string, applicationName: string): IApplication {
const applicationArn = core.Stack.of(scope).formatArn(applicationArnComponents(applicationName));

return new Import(scope, id, { applicationArn, applicationName });
return new Import(scope, id, { applicationArn });
}

/**
* Import an existing application defined outside of CDK code by
* applicationArn.
*/
public static fromApplicationArn(scope: Construct, id: string, applicationArn: string): IApplication {
const applicationName = core.Stack.of(scope).splitArn(applicationArn, core.ArnFormat.SLASH_RESOURCE_NAME).resourceName;
if (!applicationName) {
throw new Error(`applicationArn for fromApplicationArn (${applicationArn}) must include resource name`);
}
return new Import(scope, id, { applicationArn });
}

return new Import(scope, id, { applicationArn, applicationName });
/**
* Import an existing application defined outside of CDK code.
*/
public static fromApplicationAttributes(scope: Construct, id: string, attrs: ApplicationAttributes): IApplication {
return new Import(scope, id, {
applicationArn: attrs.applicationArn,
securityGroups: attrs.securityGroups,
});
}

public readonly applicationArn: string;
Expand Down Expand Up @@ -919,6 +988,23 @@ export class Application extends ApplicationBase {
const code = props.code.bind(this);
code.bucket.grantRead(this);

let vpcConfigurations;
if (props.vpc) {
const securityGroups = props.securityGroups ?? [
new ec2.SecurityGroup(this, 'SecurityGroup', {
vpc: props.vpc,
}),
];
this._connections = new ec2.Connections({ securityGroups });
const subnetSelection = props.vpcSubnets ?? {
subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS,
};
vpcConfigurations = [{
securityGroupIds: securityGroups.map(sg => sg.securityGroupId),
subnetIds: props.vpc.selectSubnets(subnetSelection).subnetIds,
}];
}

const resource = new CfnApplicationV2(this, 'Resource', {
applicationName: props.applicationName,
runtimeEnvironment: props.runtime.value,
Expand All @@ -939,6 +1025,7 @@ export class Application extends ApplicationBase {
applicationSnapshotConfiguration: {
snapshotsEnabled: props.snapshotsEnabled ?? true,
},
vpcConfigurations,
},
});
resource.node.addDependency(this.role);
Expand Down Expand Up @@ -978,6 +1065,24 @@ export class Application extends ApplicationBase {
},
});

// Permissions required for VPC usage per:
// https://docs.aws.amazon.com/kinesisanalytics/latest/java/vpc-permissions.html
if (props.vpc) {
this.role.addToPrincipalPolicy(new iam.PolicyStatement({
actions: [
'ec2:DescribeVpcs',
'ec2:DescribeSubnets',
'ec2:DescribeSecurityGroups',
'ec2:DescribeDhcpOptions',
'ec2:CreateNetworkInterface',
'ec2:CreateNetworkInterfacePermission',
'ec2:DescribeNetworkInterfaces',
'ec2:DeleteNetworkInterface',
],
resources: ['*'],
}));
}

this.applicationName = this.getResourceNameAttribute(resource.ref);
this.applicationArn = this.getResourceArnAttribute(
core.Stack.of(this).formatArn(applicationArnComponents(resource.ref)),
Expand Down
@@ -1,9 +1,13 @@
import * as ec2 from '@aws-cdk/aws-ec2';
import * as core from '@aws-cdk/core';

interface ValidatedProps {
applicationName?: string;
parallelism?: number;
parallelismPerKpu?: number;
vpc?: ec2.IVpc;
vpcSubnets?: ec2.SubnetSelection;
securityGroups?: ec2.ISecurityGroup[];
}

/**
Expand All @@ -13,6 +17,7 @@ export function validateFlinkApplicationProps(props: ValidatedProps) {
validateApplicationName(props.applicationName);
validateParallelism(props.parallelism);
validateParallelismPerKpu(props.parallelismPerKpu);
validateVpcProps(props);
}

function validateApplicationName(applicationName?: string) {
Expand Down Expand Up @@ -52,3 +57,15 @@ function validateParallelismPerKpu(parallelismPerKpu?: number) {
throw new Error('parallelismPerKpu must be at least 1');
}
}

function validateVpcProps({ vpc, securityGroups = [], vpcSubnets }: ValidatedProps) {
if (!vpc) {
if (vpcSubnets) {
throw new Error('vpc prop required when passing vpcSubnets');
}

if (securityGroups.length > 0) {
throw new Error('vpc prop required when passing securityGroups');
}
}
}
3 changes: 3 additions & 0 deletions packages/@aws-cdk/aws-kinesisanalytics-flink/package.json
Expand Up @@ -76,6 +76,7 @@
"@aws-cdk/assertions": "0.0.0",
"@aws-cdk/cdk-build-tools": "0.0.0",
"@aws-cdk/integ-runner": "0.0.0",
"@aws-cdk/integ-tests": "0.0.0",
"@aws-cdk/pkglint": "0.0.0",
"@types/jest": "^27.5.2",
"jest": "^27.5.1",
Expand All @@ -84,6 +85,7 @@
"dependencies": {
"@aws-cdk/assets": "0.0.0",
"@aws-cdk/aws-cloudwatch": "0.0.0",
"@aws-cdk/aws-ec2": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-kinesisanalytics": "0.0.0",
"@aws-cdk/aws-kms": "0.0.0",
Expand All @@ -98,6 +100,7 @@
"peerDependencies": {
"@aws-cdk/assets": "0.0.0",
"@aws-cdk/aws-cloudwatch": "0.0.0",
"@aws-cdk/aws-ec2": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-kinesisanalytics": "0.0.0",
"@aws-cdk/aws-kms": "0.0.0",
Expand Down