forked from jeffbski/wait-on
-
Notifications
You must be signed in to change notification settings - Fork 0
/
wait-on.js
400 lines (367 loc) · 14 KB
/
wait-on.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
'use strict';
const fs = require('fs');
const { promisify } = require('util');
const Joi = require('joi');
const https = require('https');
const net = require('net');
const util = require('util');
const axiosPkg = require('axios').default;
const { isBoolean, isEmpty, negate, noop, once, partial, pick, zip } = require('lodash/fp');
const { NEVER, combineLatest, from, merge, throwError, timer } = require('rxjs');
const { distinctUntilChanged, map, mergeMap, scan, startWith, take, takeWhile } = require('rxjs/operators');
// force http adapter for axios, otherwise if using jest/jsdom xhr might
// be used and it logs all errors polluting the logs
const axios = axiosPkg.create({ adapter: 'http' });
const isNotABoolean = negate(isBoolean);
const isNotEmpty = negate(isEmpty);
const fstat = promisify(fs.stat);
const PREFIX_RE = /^((https?-get|https?|tcp|socket|file):)(.+)$/;
const HOST_PORT_RE = /^(([^:]*):)?(\d+)$/;
const HTTP_GET_RE = /^https?-get:/;
const HTTP_UNIX_RE = /^http:\/\/unix:([^:]+):([^:]+)$/;
const TIMEOUT_ERR_MSG = 'Timed out waiting for';
const WAIT_ON_SCHEMA = Joi.object({
resources: Joi.array().items(Joi.string().required()).required(),
delay: Joi.number().integer().min(0).default(0),
httpTimeout: Joi.number().integer().min(0),
interval: Joi.number().integer().min(0).default(250),
log: Joi.boolean().default(false),
reverse: Joi.boolean().default(false),
simultaneous: Joi.number().integer().min(1).default(Infinity),
timeout: Joi.number().integer().min(0).default(Infinity),
validateStatus: Joi.function(),
verbose: Joi.boolean().default(false),
window: Joi.number().integer().min(0).default(750),
tcpTimeout: Joi.number().integer().min(0).default(300),
// http/https options
ca: [Joi.string(), Joi.binary()],
cert: [Joi.string(), Joi.binary()],
key: [Joi.string(), Joi.binary(), Joi.object()],
passphrase: Joi.string(),
proxy: [Joi.boolean(), Joi.object()],
auth: Joi.object({
username: Joi.string(),
password: Joi.string()
}),
strictSSL: Joi.boolean().default(false),
followRedirect: Joi.boolean().default(true), // HTTP 3XX responses
headers: Joi.object()
});
/**
Waits for resources to become available before calling callback
Polls file, http(s), tcp ports, sockets for availability.
Resource types are distinquished by their prefix with default being `file:`
- file:/path/to/file - waits for file to be available and size to stabilize
- http://foo.com:8000/bar verifies HTTP HEAD request returns 2XX
- https://my.bar.com/cat verifies HTTPS HEAD request returns 2XX
- http-get: - HTTP GET returns 2XX response. ex: http://m.com:90/foo
- https-get: - HTTPS GET returns 2XX response. ex: https://my/bar
- tcp:my.server.com:3000 verifies a service is listening on port
- socket:/path/sock verifies a service is listening on (UDS) socket
For http over socket, use http://unix:SOCK_PATH:URL_PATH
like http://unix:/path/to/sock:/foo/bar or
http-get://unix:/path/to/sock:/foo/bar
@param opts object configuring waitOn
@param opts.resources array of string resources to wait for. prefix determines the type of resource with the default type of `file:`
@param opts.delay integer - optional initial delay in ms, default 0
@param opts.httpTimeout integer - optional http HEAD/GET timeout to wait for request, default 0
@param opts.interval integer - optional poll resource interval in ms, default 250ms
@param opts.log boolean - optional flag to turn on logging to stdout
@param opts.reverse boolean - optional flag which reverses the mode, succeeds when resources are not available
@param opts.simultaneous integer - optional limit of concurrent connections to a resource, default Infinity
@param opts.tcpTimeout - Maximum time in ms for tcp connect, default 300ms
@param opts.timeout integer - optional timeout in ms, default Infinity. Aborts with error.
@param opts.verbose boolean - optional flag to turn on debug log
@param opts.window integer - optional stabilization time in ms, default 750ms. Waits this amount of time for file sizes to stabilize or other resource availability to remain unchanged. If less than interval then will be reset to interval
@param cb optional callback function with signature cb(err) - if err is provided then, resource checks did not succeed
if not specified, wait-on will return a promise that will be rejected if resource checks did not succeed or resolved otherwise
*/
function waitOn(opts, cb) {
if (cb !== undefined) {
return waitOnImpl(opts, cb);
} else {
// promise API
return new Promise(function (resolve, reject) {
waitOnImpl(opts, function (err) {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}
}
function waitOnImpl(opts, cbFunc) {
const cbOnce = once(cbFunc);
const validResult = WAIT_ON_SCHEMA.validate(opts);
if (validResult.error) {
return cbOnce(validResult.error);
}
const validatedOpts = {
...validResult.value, // use defaults
// window needs to be at least interval
...(validResult.value.window < validResult.value.interval ? { window: validResult.value.interval } : {}),
...(validResult.value.verbose ? { log: true } : {}) // if debug logging then normal log is also enabled
};
const { resources, log: shouldLog, timeout, verbose, reverse } = validatedOpts;
const output = verbose ? console.log.bind() : noop;
const log = shouldLog ? console.log.bind() : noop;
const logWaitingForWDeps = partial(logWaitingFor, [{ log, resources }]);
const createResourceWithDeps$ = partial(createResource$, [{ validatedOpts, output, log }]);
let lastResourcesState = resources; // the last state we had recorded
const timeoutError$ =
timeout !== Infinity
? timer(timeout).pipe(
mergeMap(() => {
const resourcesWaitingFor = determineRemainingResources(resources, lastResourcesState).join(', ');
return throwError(Error(`${TIMEOUT_ERR_MSG}: ${resourcesWaitingFor}`));
})
)
: NEVER;
function cleanup(err) {
if (err) {
if (err.message.startsWith(TIMEOUT_ERR_MSG)) {
log('wait-on(%s) %s; exiting with error', process.pid, err.message);
} else {
log('wait-on(%s) exiting with error', process.pid, err);
}
} else {
// no error, we are complete
log('wait-on(%s) complete', process.pid);
}
cbOnce(err);
}
if (reverse) {
log('wait-on reverse mode - waiting for resources to be unavailable');
}
logWaitingForWDeps(resources);
const resourcesCompleted$ = combineLatest(resources.map(createResourceWithDeps$));
merge(timeoutError$, resourcesCompleted$)
.pipe(takeWhile((resourceStates) => resourceStates.some((x) => !x)))
.subscribe({
next: (resourceStates) => {
lastResourcesState = resourceStates;
logWaitingForWDeps(resourceStates);
},
error: cleanup,
complete: cleanup
});
}
function logWaitingFor({ log, resources }, resourceStates) {
const remainingResources = determineRemainingResources(resources, resourceStates);
if (isNotEmpty(remainingResources)) {
log(`waiting for ${remainingResources.length} resources: ${remainingResources.join(', ')}`);
}
}
function determineRemainingResources(resources, resourceStates) {
// resourcesState is array of completed booleans
const resourceAndStateTuples = zip(resources, resourceStates);
return resourceAndStateTuples.filter(([, /* r */ s]) => !s).map(([r /*, s */]) => r);
}
function createResource$(deps, resource) {
const prefix = extractPrefix(resource);
switch (prefix) {
case 'https-get:':
case 'http-get:':
case 'https:':
case 'http:':
return createHTTP$(deps, resource);
case 'tcp:':
return createTCP$(deps, resource);
case 'socket:':
return createSocket$(deps, resource);
default:
return createFileResource$(deps, resource);
}
}
function createFileResource$(
{ validatedOpts: { delay, interval, reverse, simultaneous, window: stabilityWindow }, output },
resource
) {
const filePath = extractPath(resource);
const checkOperator = reverse
? map((size) => size === -1) // check that file does not exist
: scan(
// check that file exists and the size is stable
(acc, x) => {
if (x > -1) {
const { size, t } = acc;
const now = Date.now();
if (size !== -1 && x === size) {
if (now >= t + stabilityWindow) {
// file size has stabilized
output(` file stabilized at size:${size} file:${filePath}`);
return true;
}
output(` file exists, checking for size change during stability window, size:${size} file:${filePath}`);
return acc; // return acc unchanged, just waiting to pass stability window
}
output(` file exists, checking for size changes, size:${x} file:${filePath}`);
return { size: x, t: now }; // update acc with new value and timestamp
}
return acc;
},
{ size: -1, t: Date.now() }
);
return timer(delay, interval).pipe(
mergeMap(() => {
output(`checking file stat for file:${filePath} ...`);
return from(getFileSize(filePath));
}, simultaneous),
checkOperator,
map((x) => (isNotABoolean(x) ? false : x)),
startWith(false),
distinctUntilChanged(),
take(2)
);
}
function extractPath(resource) {
const m = PREFIX_RE.exec(resource);
if (m) {
return m[3];
}
return resource;
}
function extractPrefix(resource) {
const m = PREFIX_RE.exec(resource);
if (m) {
return m[1];
}
return '';
}
async function getFileSize(filePath) {
try {
const { size } = await fstat(filePath);
return size;
} catch (err) {
return -1;
}
}
function createHTTP$({ validatedOpts, output }, resource) {
const {
delay,
followRedirect,
httpTimeout: timeout,
interval,
proxy,
reverse,
simultaneous,
strictSSL: rejectUnauthorized
} = validatedOpts;
const method = HTTP_GET_RE.test(resource) ? 'get' : 'head';
const url = resource.replace('-get:', ':');
const matchHttpUnixSocket = HTTP_UNIX_RE.exec(url); // http://unix:/sock:/url
const urlSocketOptions = matchHttpUnixSocket
? { socketPath: matchHttpUnixSocket[1], url: matchHttpUnixSocket[2] }
: { url };
const socketPathDesc = urlSocketOptions.socketPath ? `socketPath:${urlSocketOptions.socketPath}` : '';
const httpOptions = {
...pick(['auth', 'headers', 'validateStatus'], validatedOpts),
httpsAgent: new https.Agent({
rejectUnauthorized,
...pick(['ca', 'cert', 'key', 'passphrase'], validatedOpts)
}),
...(followRedirect ? {} : { maxRedirects: 0 }), // defaults to 5 (enabled)
proxy, // can be undefined, false, or object
...(timeout && { timeout }),
...urlSocketOptions,
method
// by default it provides full response object
// validStatus is 2xx unless followRedirect is true (default)
};
const checkFn = reverse ? negateAsync(httpCallSucceeds) : httpCallSucceeds;
return timer(delay, interval).pipe(
mergeMap(() => {
output(`making HTTP(S) ${method} request to ${socketPathDesc} url:${urlSocketOptions.url} ...`);
return from(checkFn(output, httpOptions));
}, simultaneous),
startWith(false),
distinctUntilChanged(),
take(2)
);
}
async function httpCallSucceeds(output, httpOptions) {
try {
const result = await axios(httpOptions);
output(
` HTTP(S) result for ${httpOptions.url}: ${util.inspect(
pick(['status', 'statusText', 'headers', 'data'], result)
)}`
);
return true;
} catch (err) {
output(` HTTP(S) error for ${httpOptions.url} ${err.toString()}`);
return false;
}
}
function createTCP$({ validatedOpts: { delay, interval, tcpTimeout, reverse, simultaneous }, output }, resource) {
const tcpPath = extractPath(resource);
const checkFn = reverse ? negateAsync(tcpExists) : tcpExists;
return timer(delay, interval).pipe(
mergeMap(() => {
output(`making TCP connection to ${tcpPath} ...`);
return from(checkFn(output, tcpPath, tcpTimeout));
}, simultaneous),
startWith(false),
distinctUntilChanged(),
take(2)
);
}
async function tcpExists(output, tcpPath, tcpTimeout) {
const [, , /* full, hostWithColon */ hostMatched, port] = HOST_PORT_RE.exec(tcpPath);
const host = hostMatched || 'localhost';
return new Promise((resolve) => {
const conn = net
.connect(port, host)
.on('error', (err) => {
output(` error connecting to TCP host:${host} port:${port} ${err.toString()}`);
resolve(false);
})
.on('timeout', () => {
output(` timed out connecting to TCP host:${host} port:${port} tcpTimeout:${tcpTimeout}ms`);
conn.end();
resolve(false);
})
.on('connect', () => {
output(` TCP connection successful to host:${host} port:${port}`);
conn.end();
resolve(true);
});
conn.setTimeout(tcpTimeout);
});
}
function createSocket$({ validatedOpts: { delay, interval, reverse, simultaneous }, output }, resource) {
const socketPath = extractPath(resource);
const checkFn = reverse ? negateAsync(socketExists) : socketExists;
return timer(delay, interval).pipe(
mergeMap(() => {
output(`making socket connection to ${socketPath} ...`);
return from(checkFn(output, socketPath));
}, simultaneous),
startWith(false),
distinctUntilChanged(),
take(2)
);
}
async function socketExists(output, socketPath) {
return new Promise((resolve) => {
const conn = net
.connect(socketPath)
.on('error', (err) => {
output(` error connecting to socket socket:${socketPath} ${err.toString()}`);
resolve(false);
})
.on('connect', () => {
output(` connected to socket:${socketPath}`);
conn.end();
resolve(true);
});
});
}
function negateAsync(asyncFn) {
return async function (...args) {
return !(await asyncFn(...args));
};
}
module.exports = waitOn;