Skip to content

Commit 881d3f8

Browse files
michael-radencyShireenMissi
andauthoredMar 13, 2025··
feat(Merge Node): Better pairedItem mapping in combineBySql operation if SELECT query (#13849)
Co-authored-by: Shireen Missi <94372015+ShireenMissi@users.noreply.github.com>
1 parent d2e4706 commit 881d3f8

File tree

6 files changed

+368
-24
lines changed

6 files changed

+368
-24
lines changed
 

‎packages/nodes-base/nodes/Merge/Merge.node.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@ export class Merge extends VersionedNodeType {
1414
group: ['transform'],
1515
subtitle: '={{$parameter["mode"]}}',
1616
description: 'Merges data of multiple streams once data from both is available',
17-
defaultVersion: 3,
17+
defaultVersion: 3.1,
1818
};
1919

2020
const nodeVersions: IVersionedNodeType['nodeVersions'] = {
2121
1: new MergeV1(baseDescription),
2222
2: new MergeV2(baseDescription),
2323
2.1: new MergeV2(baseDescription),
2424
3: new MergeV3(baseDescription),
25+
3.1: new MergeV3(baseDescription),
2526
};
2627

2728
super(nodeVersions, baseDescription);

‎packages/nodes-base/nodes/Merge/test/v3/operations.test.ts

+136
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,142 @@ describe('Test MergeV3, combineBySql operation', () => {
259259
country: 'PL',
260260
});
261261
});
262+
263+
it('should collect pairedItems data, if version >= 3.1 and SELECT query', async () => {
264+
const nodeParameters: IDataObject = {
265+
operation: 'combineBySql',
266+
query: 'SELECT name FROM input1 LEFT JOIN input2 ON input1.id = input2.id',
267+
};
268+
269+
const inputs = [
270+
[
271+
{
272+
json: {
273+
id: 1,
274+
data: 'a',
275+
name: 'Sam',
276+
},
277+
pairedItem: {
278+
item: 0,
279+
input: undefined,
280+
},
281+
},
282+
{
283+
json: {
284+
id: 2,
285+
data: 'b',
286+
name: 'Dan',
287+
},
288+
pairedItem: {
289+
item: 1,
290+
input: undefined,
291+
},
292+
},
293+
{
294+
json: {
295+
id: 3,
296+
data: 'c',
297+
name: 'Jon',
298+
},
299+
pairedItem: {
300+
item: 2,
301+
input: undefined,
302+
},
303+
},
304+
],
305+
[
306+
{
307+
json: {
308+
id: 1,
309+
data: 'aa',
310+
country: 'PL',
311+
},
312+
pairedItem: {
313+
item: 0,
314+
input: 1,
315+
},
316+
},
317+
{
318+
json: {
319+
id: 2,
320+
data: 'bb',
321+
country: 'FR',
322+
},
323+
pairedItem: {
324+
item: 1,
325+
input: 1,
326+
},
327+
},
328+
{
329+
json: {
330+
id: 3,
331+
data: 'cc',
332+
country: 'UA',
333+
},
334+
pairedItem: {
335+
item: 2,
336+
input: 1,
337+
},
338+
},
339+
],
340+
];
341+
342+
const returnData = await mode.combineBySql.execute.call(
343+
createMockExecuteFunction(nodeParameters, { ...node, typeVersion: 3.1 }),
344+
inputs,
345+
);
346+
347+
expect(returnData.length).toEqual(1);
348+
expect(returnData).toEqual([
349+
[
350+
{
351+
json: {
352+
name: 'Sam',
353+
},
354+
pairedItem: [
355+
{
356+
item: 0,
357+
input: undefined,
358+
},
359+
{
360+
item: 0,
361+
input: 1,
362+
},
363+
],
364+
},
365+
{
366+
json: {
367+
name: 'Dan',
368+
},
369+
pairedItem: [
370+
{
371+
item: 1,
372+
input: undefined,
373+
},
374+
{
375+
item: 1,
376+
input: 1,
377+
},
378+
],
379+
},
380+
{
381+
json: {
382+
name: 'Jon',
383+
},
384+
pairedItem: [
385+
{
386+
item: 2,
387+
input: undefined,
388+
},
389+
{
390+
item: 2,
391+
input: 1,
392+
},
393+
],
394+
},
395+
],
396+
]);
397+
});
262398
});
263399

264400
describe('Test MergeV3, append operation', () => {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import type { IDataObject, INodeExecutionData, IPairedItemData } from 'n8n-workflow';
2+
3+
import { modifySelectQuery, rowToExecutionData } from '../../v3/helpers/utils';
4+
5+
describe('rowToExecutionData', () => {
6+
test('should return empty json and pairedItem when input is empty', () => {
7+
const input: IDataObject = {};
8+
const result = rowToExecutionData(input);
9+
expect(result).toEqual({ json: {}, pairedItem: [] });
10+
});
11+
12+
test('should separate json properties and pairedItem properties', () => {
13+
const input: IDataObject = {
14+
key1: 'value1',
15+
key2: 42,
16+
pairedItem1: { item: 0, input: undefined } as IPairedItemData,
17+
pairedItem2: { item: 0, input: 1 } as IPairedItemData,
18+
};
19+
20+
const expectedOutput: INodeExecutionData = {
21+
json: { key1: 'value1', key2: 42 },
22+
pairedItem: [
23+
{ item: 0, input: undefined },
24+
{ item: 0, input: 1 },
25+
],
26+
};
27+
28+
expect(rowToExecutionData(input)).toEqual(expectedOutput);
29+
});
30+
31+
test('should ignore undefined pairedItem values', () => {
32+
const input: IDataObject = {
33+
key: 'value',
34+
pairedItem1: { item: 0, input: undefined } as IPairedItemData,
35+
pairedItem2: undefined,
36+
};
37+
38+
const expectedOutput: INodeExecutionData = {
39+
json: { key: 'value' },
40+
pairedItem: [{ item: 0, input: undefined }],
41+
};
42+
43+
expect(rowToExecutionData(input)).toEqual(expectedOutput);
44+
});
45+
46+
test('should handle only json properties without pairedItem', () => {
47+
const input: IDataObject = {
48+
name: 'Alice',
49+
age: 30,
50+
};
51+
52+
const expectedOutput: INodeExecutionData = {
53+
json: { name: 'Alice', age: 30 },
54+
pairedItem: [],
55+
};
56+
57+
expect(rowToExecutionData(input)).toEqual(expectedOutput);
58+
});
59+
});
60+
61+
describe('modifySelectQuery', () => {
62+
test('should return the original query if no SELECT match is found', () => {
63+
const query = 'UPDATE table SET column = 1';
64+
expect(modifySelectQuery(query, 2)).toBe(query);
65+
});
66+
67+
test('should return the original query if SELECT * is used', () => {
68+
const query = 'SELECT * FROM input1';
69+
expect(modifySelectQuery(query, 2)).toBe(query);
70+
});
71+
72+
test('should append pairedItem columns when input tables exist', () => {
73+
const query = 'SELECT column1, column2 FROM input1 WHERE input1.id = table.id';
74+
const modifiedQuery = modifySelectQuery(query, 2);
75+
expect(modifiedQuery).toBe(
76+
'SELECT column1, column2, input1.pairedItem AS pairedItem1 FROM input1 WHERE input1.id = table.id',
77+
);
78+
});
79+
80+
test('should handle multiple input tables correctly', () => {
81+
const query = 'SELECT column1 FROM input1 LEFT JOIN input2 ON input1.name = input2.name';
82+
const modifiedQuery = modifySelectQuery(query, 2);
83+
expect(modifiedQuery).toBe(
84+
'SELECT column1, input1.pairedItem AS pairedItem1, input2.pairedItem AS pairedItem2 FROM input1 LEFT JOIN input2 ON input1.name = input2.name',
85+
);
86+
});
87+
88+
test('should not modify query if no input tables are found', () => {
89+
const query = 'SELECT column1 FROM table';
90+
expect(modifySelectQuery(query, 2)).toBe(query);
91+
});
92+
});

‎packages/nodes-base/nodes/Merge/v3/actions/mode/combineBySql.ts

+97-22
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1+
import { Container } from '@n8n/di';
12
import alasql from 'alasql';
23
import type { Database } from 'alasql';
4+
import { ErrorReporter } from 'n8n-core';
35
import type {
46
IDataObject,
57
IExecuteFunctions,
8+
INode,
69
INodeExecutionData,
710
INodeProperties,
811
IPairedItemData,
@@ -12,6 +15,7 @@ import { NodeOperationError } from 'n8n-workflow';
1215
import { getResolvables, updateDisplayOptions } from '@utils/utilities';
1316

1417
import { numberInputsProperty } from '../../helpers/descriptions';
18+
import { modifySelectQuery, rowToExecutionData } from '../../helpers/utils';
1519

1620
export const properties: INodeProperties[] = [
1721
numberInputsProperty,
@@ -39,15 +43,102 @@ const displayOptions = {
3943

4044
export const description = updateDisplayOptions(displayOptions, properties);
4145

46+
const prepareError = (node: INode, error: Error) => {
47+
let message = '';
48+
if (typeof error === 'string') {
49+
message = error;
50+
} else {
51+
message = error.message;
52+
}
53+
throw new NodeOperationError(node, error, {
54+
message: 'Issue while executing query',
55+
description: message,
56+
itemIndex: 0,
57+
});
58+
};
59+
60+
async function executeSelectWithMappedPairedItems(
61+
node: INode,
62+
inputsData: INodeExecutionData[][],
63+
query: string,
64+
): Promise<INodeExecutionData[][]> {
65+
const returnData: INodeExecutionData[] = [];
66+
67+
const db: typeof Database = new (alasql as any).Database(node.id);
68+
69+
try {
70+
for (let i = 0; i < inputsData.length; i++) {
71+
const inputData = inputsData[i];
72+
73+
db.exec(`CREATE TABLE input${i + 1}`);
74+
db.tables[`input${i + 1}`].data = inputData.map((entry) => ({
75+
...entry.json,
76+
pairedItem: entry.pairedItem,
77+
}));
78+
}
79+
} catch (error) {
80+
throw new NodeOperationError(node, error, {
81+
message: 'Issue while creating table from',
82+
description: error.message,
83+
itemIndex: 0,
84+
});
85+
}
86+
87+
try {
88+
const result: IDataObject[] = db.exec(modifySelectQuery(query, inputsData.length));
89+
90+
for (const item of result) {
91+
if (Array.isArray(item)) {
92+
returnData.push(...item.map((entry) => rowToExecutionData(entry)));
93+
} else if (typeof item === 'object') {
94+
returnData.push(rowToExecutionData(item));
95+
}
96+
}
97+
98+
if (!returnData.length) {
99+
returnData.push({ json: { success: true } });
100+
}
101+
} catch (error) {
102+
prepareError(node, error as Error);
103+
} finally {
104+
delete alasql.databases[node.id];
105+
}
106+
107+
return [returnData];
108+
}
109+
42110
export async function execute(
43111
this: IExecuteFunctions,
44112
inputsData: INodeExecutionData[][],
45113
): Promise<INodeExecutionData[][]> {
46-
const nodeId = this.getNode().id;
114+
const node = this.getNode();
47115
const returnData: INodeExecutionData[] = [];
48116
const pairedItem: IPairedItemData[] = [];
49117

50-
const db: typeof Database = new (alasql as any).Database(nodeId);
118+
let query = this.getNodeParameter('query', 0) as string;
119+
120+
for (const resolvable of getResolvables(query)) {
121+
query = query.replace(resolvable, this.evaluateExpression(resolvable, 0) as string);
122+
}
123+
124+
const isSelectQuery = node.typeVersion >= 3.1 ? query.toLowerCase().startsWith('select') : false;
125+
126+
if (isSelectQuery) {
127+
try {
128+
return await executeSelectWithMappedPairedItems(node, inputsData, query);
129+
} catch (error) {
130+
Container.get(ErrorReporter).error(error, {
131+
extra: {
132+
nodeName: node.name,
133+
nodeType: node.type,
134+
nodeVersion: node.typeVersion,
135+
workflowId: this.getWorkflow().id,
136+
},
137+
});
138+
}
139+
}
140+
141+
const db: typeof Database = new (alasql as any).Database(node.id);
51142

52143
try {
53144
for (let i = 0; i < inputsData.length; i++) {
@@ -90,20 +181,14 @@ export async function execute(
90181
db.tables[`input${i + 1}`].data = inputData.map((entry) => entry.json);
91182
}
92183
} catch (error) {
93-
throw new NodeOperationError(this.getNode(), error, {
184+
throw new NodeOperationError(node, error, {
94185
message: 'Issue while creating table from',
95186
description: error.message,
96187
itemIndex: 0,
97188
});
98189
}
99190

100191
try {
101-
let query = this.getNodeParameter('query', 0) as string;
102-
103-
for (const resolvable of getResolvables(query)) {
104-
query = query.replace(resolvable, this.evaluateExpression(resolvable, 0) as string);
105-
}
106-
107192
const result: IDataObject[] = db.exec(query);
108193

109194
for (const item of result) {
@@ -118,20 +203,10 @@ export async function execute(
118203
returnData.push({ json: { success: true }, pairedItem });
119204
}
120205
} catch (error) {
121-
let message = '';
122-
if (typeof error === 'string') {
123-
message = error;
124-
} else {
125-
message = error.message;
126-
}
127-
throw new NodeOperationError(this.getNode(), error, {
128-
message: 'Issue while executing query',
129-
description: message,
130-
itemIndex: 0,
131-
});
206+
prepareError(node, error as Error);
207+
} finally {
208+
delete alasql.databases[node.id];
132209
}
133210

134-
delete alasql.databases[nodeId];
135-
136211
return [returnData];
137212
}

‎packages/nodes-base/nodes/Merge/v3/actions/versionDescription.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ export const versionDescription: INodeTypeDescription = {
99
name: 'merge',
1010
group: ['transform'],
1111
description: 'Merges data of multiple streams once data from both is available',
12-
version: [3],
12+
version: [3, 3.1],
1313
defaults: {
1414
name: 'Merge',
1515
},

‎packages/nodes-base/nodes/Merge/v3/helpers/utils.ts

+40
Original file line numberDiff line numberDiff line change
@@ -386,3 +386,43 @@ export function getNodeInputsData(this: IExecuteFunctions) {
386386

387387
return returnData;
388388
}
389+
390+
export const rowToExecutionData = (data: IDataObject): INodeExecutionData => {
391+
const keys = Object.keys(data);
392+
const pairedItem: IPairedItemData[] = [];
393+
const json: IDataObject = {};
394+
395+
for (const key of keys) {
396+
if (key.startsWith('pairedItem')) {
397+
if (data[key] === undefined) continue;
398+
pairedItem.push(data[key] as IPairedItemData);
399+
} else {
400+
json[key] = data[key];
401+
}
402+
}
403+
404+
return { json, pairedItem };
405+
};
406+
407+
export function modifySelectQuery(userQuery: string, inputLength: number): string {
408+
const selectMatch = userQuery.match(/SELECT\s+(.+?)\s+FROM/i);
409+
if (!selectMatch) return userQuery;
410+
411+
let selectedColumns = selectMatch[1].trim();
412+
413+
if (selectedColumns === '*') {
414+
return userQuery;
415+
}
416+
417+
const pairedItemColumns = [];
418+
419+
for (let i = 1; i <= inputLength; i++) {
420+
if (userQuery.includes(`input${i}`)) {
421+
pairedItemColumns.push(`input${i}.pairedItem AS pairedItem${i}`);
422+
}
423+
}
424+
425+
selectedColumns += pairedItemColumns.length ? ', ' + pairedItemColumns.join(', ') : '';
426+
427+
return userQuery.replace(selectMatch[0], `SELECT ${selectedColumns} FROM`);
428+
}

0 commit comments

Comments
 (0)
Please sign in to comment.