Skip to content

Commit

Permalink
feat(microservice): add tcp over tls support
Browse files Browse the repository at this point in the history
Signed-off-by: nomaxg <noahgolub2@gmail.com>
  • Loading branch information
nomaxg committed Dec 2, 2022
1 parent d3a025c commit b9c235a
Show file tree
Hide file tree
Showing 11 changed files with 474 additions and 4 deletions.
142 changes: 142 additions & 0 deletions integration/microservices/e2e/sum-rpc-tls.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import { INestApplication } from '@nestjs/common';
import { Transport } from '@nestjs/microservices';
import { Test } from '@nestjs/testing';
import { expect } from 'chai';
import * as request from 'supertest';
import { AppController } from '../src/tcp-tls/app.controller';
import { ApplicationModule } from '../src/tcp-tls/app.module';
import * as fs from 'fs';
import * as path from 'path';

describe('RPC TLS transport', () => {
let server;
let app: INestApplication;
let key: string;
let cert: string;

before(() => {
// Generate a self-signed key pair
key = fs
.readFileSync(path.join(__dirname, '../src/tcp-tls/privkey.pem'), 'utf8')
.toString();
cert = fs
.readFileSync(path.join(__dirname, '../src/tcp-tls/ca.cert.pem'), 'utf8')
.toString();
});

beforeEach(async () => {
const module = await Test.createTestingModule({
imports: [ApplicationModule],
}).compile();

app = module.createNestApplication();
server = app.getHttpAdapter().getInstance();

app.connectMicroservice({
transport: Transport.TCP,
options: {
host: '0.0.0.0',
tlsOptions: { key: key, cert: cert },
},
});
await app.startAllMicroservices();
await app.init();
});

it(`/POST TLS`, () => {
return request(server)
.post('/?command=sum')
.send([1, 2, 3, 4, 5])
.expect(200, '15');
});

it(`/POST (Promise/async)`, () => {
return request(server)
.post('/?command=asyncSum')
.send([1, 2, 3, 4, 5])
.expect(200)
.expect(200, '15');
});

it(`/POST (Observable stream)`, () => {
return request(server)
.post('/?command=streamSum')
.send([1, 2, 3, 4, 5])
.expect(200, '15');
});

it(`/POST (useFactory client)`, () => {
return request(server)
.post('/useFactory?command=sum')
.send([1, 2, 3, 4, 5])
.expect(200, '15');
});

it(`/POST (useClass client)`, () => {
return request(server)
.post('/useClass?command=sum')
.send([1, 2, 3, 4, 5])
.expect(200, '15');
});

it(`/POST (concurrent)`, () => {
return request(server)
.post('/concurrent')
.send([
Array.from({ length: 10 }, (v, k) => k + 1),
Array.from({ length: 10 }, (v, k) => k + 11),
Array.from({ length: 10 }, (v, k) => k + 21),
Array.from({ length: 10 }, (v, k) => k + 31),
Array.from({ length: 10 }, (v, k) => k + 41),
Array.from({ length: 10 }, (v, k) => k + 51),
Array.from({ length: 10 }, (v, k) => k + 61),
Array.from({ length: 10 }, (v, k) => k + 71),
Array.from({ length: 10 }, (v, k) => k + 81),
Array.from({ length: 10 }, (v, k) => k + 91),
])
.expect(200, 'true');
});

it(`/POST (streaming)`, () => {
return request(server)
.post('/stream')
.send([1, 2, 3, 4, 5])
.expect(200, '15');
});

it(`/POST (pattern not found)`, () => {
return request(server).post('/?command=test').expect(500);
});

it(`/POST (event notification)`, done => {
request(server)
.post('/notify')
.send([1, 2, 3, 4, 5])
.end(() => {
setTimeout(() => {
expect(AppController.IS_NOTIFIED).to.be.true;
done();
}, 1000);
});
});

it('/POST (custom client)', () => {
return request(server)
.post('/error?client=custom')
.send({})
.expect(200)
.expect('true');
});

it('/POST (standard client)', () => {
return request(server)
.post('/error?client=standard')
.send({})
.expect(200)
.expect('false');
});

afterEach(async () => {
await app.close();
});
});
141 changes: 141 additions & 0 deletions integration/microservices/src/tcp-tls/app.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import {
Body,
Controller,
HttpCode,
Inject,
Post,
Query,
} from '@nestjs/common';
import {
Client,
ClientProxy,
EventPattern,
MessagePattern,
RpcException,
Transport,
} from '@nestjs/microservices';
import { from, lastValueFrom, Observable, of, throwError } from 'rxjs';
import { catchError, scan } from 'rxjs/operators';
import * as fs from 'fs';
import * as path from 'path';

@Controller()
export class AppController {
constructor(
@Inject('USE_CLASS_CLIENT') private useClassClient: ClientProxy,
@Inject('USE_FACTORY_CLIENT') private useFactoryClient: ClientProxy,
@Inject('CUSTOM_PROXY_CLIENT') private customClient: ClientProxy,
) {}
static IS_NOTIFIED = false;

@Client({
transport: Transport.TCP,
options: {
tlsOptions: {
ca: [
fs
.readFileSync(path.join(__dirname, 'ca.cert.pem'), 'utf-8')
.toString(),
],
},
},
})
client: ClientProxy;

@Post()
@HttpCode(200)
call(@Query('command') cmd, @Body() data: number[]): Observable<number> {
return this.client.send<number>({ cmd }, data);
}

@Post('useFactory')
@HttpCode(200)
callWithClientUseFactory(
@Query('command') cmd,
@Body() data: number[],
): Observable<number> {
return this.useFactoryClient.send<number>({ cmd }, data);
}

@Post('useClass')
@HttpCode(200)
callWithClientUseClass(
@Query('command') cmd,
@Body() data: number[],
): Observable<number> {
return this.useClassClient.send<number>({ cmd }, data);
}

@Post('stream')
@HttpCode(200)
stream(@Body() data: number[]): Observable<number> {
return this.client
.send<number>({ cmd: 'streaming' }, data)
.pipe(scan((a, b) => a + b));
}

@Post('concurrent')
@HttpCode(200)
concurrent(@Body() data: number[][]): Promise<boolean> {
const send = async (tab: number[]) => {
const expected = tab.reduce((a, b) => a + b);
const result = await lastValueFrom(
this.client.send<number>({ cmd: 'sum' }, tab),
);

return result === expected;
};
return data
.map(async tab => send(tab))
.reduce(async (a, b) => (await a) && b);
}

@Post('error')
@HttpCode(200)
serializeError(
@Query('client') query: 'custom' | 'standard' = 'standard',
@Body() body: Record<string, any>,
): Observable<boolean> {
const client = query === 'custom' ? this.customClient : this.client;
return client.send({ cmd: 'err' }, {}).pipe(
catchError(err => {
return of(err instanceof RpcException);
}),
);
}

@MessagePattern({ cmd: 'sum' })
sum(data: number[]): number {
return (data || []).reduce((a, b) => a + b);
}

@MessagePattern({ cmd: 'asyncSum' })
async asyncSum(data: number[]): Promise<number> {
return (data || []).reduce((a, b) => a + b);
}

@MessagePattern({ cmd: 'streamSum' })
streamSum(data: number[]): Observable<number> {
return of((data || []).reduce((a, b) => a + b));
}

@MessagePattern({ cmd: 'streaming' })
streaming(data: number[]): Observable<number> {
return from(data);
}

@MessagePattern({ cmd: 'err' })
throwAnError() {
return throwError(() => new Error('err'));
}

@Post('notify')
async sendNotification(): Promise<any> {
return this.client.emit<number>('notification', true);
}

@EventPattern('notification')
eventHandler(data: boolean) {
AppController.IS_NOTIFIED = data;
}
}
90 changes: 90 additions & 0 deletions integration/microservices/src/tcp-tls/app.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { Module, Injectable } from '@nestjs/common';
import { AppController } from './app.controller';
import {
ClientsModule,
Transport,
ClientsModuleOptionsFactory,
ClientOptions,
ClientTCP,
RpcException,
} from '@nestjs/microservices';

import * as fs from 'fs';
import * as path from 'path';

const caCert = fs.readFileSync(path.join(__dirname, 'ca.cert.pem')).toString();

class ErrorHandlingProxy extends ClientTCP {
constructor() {
super({
tlsOptions: { ca: caCert },
});
}

serializeError(err) {
return new RpcException(err);
}
}

@Injectable()
class ConfigService {
private readonly config = {
transport: Transport.TCP,
};
get(key: string, defaultValue?: any) {
return this.config[key] || defaultValue;
}
}

@Module({
providers: [ConfigService],
exports: [ConfigService],
})
class ConfigModule {}

@Injectable()
class ClientOptionService implements ClientsModuleOptionsFactory {
constructor(private readonly configService: ConfigService) {}
createClientOptions(): Promise<ClientOptions> | ClientOptions {
return {
transport: this.configService.get('transport'),
options: {
tlsOptions: { ca: caCert },
},
};
}
}

@Module({
imports: [
ClientsModule.registerAsync([
{
imports: [ConfigModule],
name: 'USE_FACTORY_CLIENT',
useFactory: (configService: ConfigService) => ({
transport: configService.get('transport'),
options: {
tlsOptions: { ca: caCert },
},
}),
inject: [ConfigService],
},
{
imports: [ConfigModule],
name: 'USE_CLASS_CLIENT',
useClass: ClientOptionService,
inject: [ConfigService],
},
{
imports: [ConfigModule],
inject: [ConfigService],
name: 'CUSTOM_PROXY_CLIENT',
useFactory: (config: ConfigService) => ({
customClass: ErrorHandlingProxy,
}),
},
]),
],
controllers: [AppController],
})
export class ApplicationModule {}
17 changes: 17 additions & 0 deletions integration/microservices/src/tcp-tls/ca.cert.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-----BEGIN CERTIFICATE-----
MIICpDCCAYwCCQCyP27z3r0PFjANBgkqhkiG9w0BAQsFADAUMRIwEAYDVQQDDAls
b2NhbGhvc3QwHhcNMjIxMjAyMDQ0NTQ1WhcNMzIxMTI5MDQ0NTQ1WjAUMRIwEAYD
VQQDDAlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDZ
1IdZZXqrwXql4AIOJnlfpoGKOKoIalnK7KaKHTsq1QOF8z2abFuNBVIIrO0etQ/0
PPAaFGkXl6HHBuA5PrFpsw3V1wSnNs1Cns9NhvypHI2V71lkwBJrEaSicNWL2AOE
QkQ9cZ4YsTGd0BrM8D5VvgXdrC7gOXfj7Hx3E4K+wFO/Gi4AUXl5CXxleSFcW4U+
jFulfq/DE8rBZXs29IsGeVkkgUoICjQ4Ey4zE6EY7f3SPKgU8gfgzYyGSd/ZZ/E7
6M2yakEUX448Nl4BeuNWroBHVm1pSiMo+Cm1g34pJScPrx1yw6qquziCc/2n1M6O
B4WGIZAmJDWnAOEjjrxFAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAABGByZZUjaq
ZygICSH2qUGHPPIyrfaCe0qM7de6kYfxxPYQQZb0HDynzv780Lq1002XeT02fNR+
5sBCVFuKvS8BNvTq6kHzO1FiWIk/E5fQcYNToYSeEcXgWFLhJMty7+R6sIc9y8PH
2YNehf78Jjm9ukM52sLc4+JWl7AEeqPrOHZdh/ve8M2gTfimFKTrW5cEAVPIOPhp
2t5BdDKt8ZxgrGC7iRxga+v80VUOHRGfrd3hf3NlDQZO8upVGY8DdJhPRDB72+R0
kzJ7eyQwlGXM20atiFxPk43h0f273MneIJG8NgGiVU0ND4XnZkAB3KSAu7pB+nEw
QRYMYDgo/8Q=
-----END CERTIFICATE-----

0 comments on commit b9c235a

Please sign in to comment.