Skip to content

Commit

Permalink
4.1.4 release cherry-picks (#1994)
Browse files Browse the repository at this point in the history
  • Loading branch information
chayim committed Feb 16, 2022
1 parent 6c00e09 commit 0ed0660
Show file tree
Hide file tree
Showing 7 changed files with 443 additions and 45 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ on:
- '**/*.md'
branches:
- master
- '[0-9].[0-9]'
pull_request:
branches:
- master
- '[0-9].[0-9]'

jobs:

Expand Down
52 changes: 31 additions & 21 deletions redis/commands/graph/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from redis.exceptions import ResponseError

from .exceptions import VersionMismatchException
from .execution_plan import ExecutionPlan
from .query_result import QueryResult


Expand Down Expand Up @@ -118,27 +119,6 @@ def flush(self):
self.nodes = {}
self.edges = []

def explain(self, query, params=None):
"""
Get the execution plan for given query,
Returns an array of operations.
For more information see `GRAPH.EXPLAIN <https://oss.redis.com/redisgraph/master/commands/#graphexplain>`_. # noqa
Args:
query:
The query that will be executed.
params: dict
Query parameters.
"""
if params is not None:
query = self._build_params_header(params) + query

plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
if isinstance(plan[0], bytes):
plan = [b.decode() for b in plan]
return "\n".join(plan)

def bulk(self, **kwargs):
"""Internal only. Not supported."""
raise NotImplementedError(
Expand Down Expand Up @@ -200,3 +180,33 @@ def list_keys(self):
For more information see `GRAPH.LIST <https://oss.redis.com/redisgraph/master/commands/#graphlist>`_. # noqa
"""
return self.execute_command("GRAPH.LIST")

def execution_plan(self, query, params=None):
"""
Get the execution plan for given query,
GRAPH.EXPLAIN returns an array of operations.
Args:
query: the query that will be executed
params: query parameters
"""
if params is not None:
query = self._build_params_header(params) + query

plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
return "\n".join(plan)

def explain(self, query, params=None):
"""
Get the execution plan for given query,
GRAPH.EXPLAIN returns ExecutionPlan object.
Args:
query: the query that will be executed
params: query parameters
"""
if params is not None:
query = self._build_params_header(params) + query

plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
return ExecutionPlan(plan)
208 changes: 208 additions & 0 deletions redis/commands/graph/execution_plan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
import re


class ProfileStats:
"""
ProfileStats, runtime execution statistics of operation.
"""

def __init__(self, records_produced, execution_time):
self.records_produced = records_produced
self.execution_time = execution_time


class Operation:
"""
Operation, single operation within execution plan.
"""

def __init__(self, name, args=None, profile_stats=None):
"""
Create a new operation.
Args:
name: string that represents the name of the operation
args: operation arguments
profile_stats: profile statistics
"""
self.name = name
self.args = args
self.profile_stats = profile_stats
self.children = []

def append_child(self, child):
if not isinstance(child, Operation) or self is child:
raise Exception("child must be Operation")

self.children.append(child)
return self

def child_count(self):
return len(self.children)

def __eq__(self, o: object) -> bool:
if not isinstance(o, Operation):
return False

return self.name == o.name and self.args == o.args

def __str__(self) -> str:
args_str = "" if self.args is None else " | " + self.args
return f"{self.name}{args_str}"


class ExecutionPlan:
"""
ExecutionPlan, collection of operations.
"""

def __init__(self, plan):
"""
Create a new execution plan.
Args:
plan: array of strings that represents the collection operations
the output from GRAPH.EXPLAIN
"""
if not isinstance(plan, list):
raise Exception("plan must be an array")

self.plan = plan
self.structured_plan = self._operation_tree()

def _compare_operations(self, root_a, root_b):
"""
Compare execution plan operation tree
Return: True if operation trees are equal, False otherwise
"""

# compare current root
if root_a != root_b:
return False

# make sure root have the same number of children
if root_a.child_count() != root_b.child_count():
return False

# recursively compare children
for i in range(root_a.child_count()):
if not self._compare_operations(root_a.children[i], root_b.children[i]):
return False

return True

def __str__(self) -> str:
def aggraget_str(str_children):
return "\n".join(
[
" " + line
for str_child in str_children
for line in str_child.splitlines()
]
)

def combine_str(x, y):
return f"{x}\n{y}"

return self._operation_traverse(
self.structured_plan, str, aggraget_str, combine_str
)

def __eq__(self, o: object) -> bool:
"""Compares two execution plans
Return: True if the two plans are equal False otherwise
"""
# make sure 'o' is an execution-plan
if not isinstance(o, ExecutionPlan):
return False

# get root for both plans
root_a = self.structured_plan
root_b = o.structured_plan

# compare execution trees
return self._compare_operations(root_a, root_b)

def _operation_traverse(self, op, op_f, aggregate_f, combine_f):
"""
Traverse operation tree recursively applying functions
Args:
op: operation to traverse
op_f: function applied for each operation
aggregate_f: aggregation function applied for all children of a single operation
combine_f: combine function applied for the operation result and the children result
""" # noqa
# apply op_f for each operation
op_res = op_f(op)
if len(op.children) == 0:
return op_res # no children return
else:
# apply _operation_traverse recursively
children = [
self._operation_traverse(child, op_f, aggregate_f, combine_f)
for child in op.children
]
# combine the operation result with the children aggregated result
return combine_f(op_res, aggregate_f(children))

def _operation_tree(self):
"""Build the operation tree from the string representation"""

# initial state
i = 0
level = 0
stack = []
current = None

def _create_operation(args):
profile_stats = None
name = args[0].strip()
args.pop(0)
if len(args) > 0 and "Records produced" in args[-1]:
records_produced = int(
re.search("Records produced: (\\d+)", args[-1]).group(1)
)
execution_time = float(
re.search("Execution time: (\\d+.\\d+) ms", args[-1]).group(1)
)
profile_stats = ProfileStats(records_produced, execution_time)
args.pop(-1)
return Operation(
name, None if len(args) == 0 else args[0].strip(), profile_stats
)

# iterate plan operations
while i < len(self.plan):
current_op = self.plan[i]
op_level = current_op.count(" ")
if op_level == level:
# if the operation level equal to the current level
# set the current operation and move next
child = _create_operation(current_op.split("|"))
if current:
current = stack.pop()
current.append_child(child)
current = child
i += 1
elif op_level == level + 1:
# if the operation is child of the current operation
# add it as child and set as current operation
child = _create_operation(current_op.split("|"))
current.append_child(child)
stack.append(current)
current = child
level += 1
i += 1
elif op_level < level:
# if the operation is not child of current operation
# go back to it's parent operation
levels_back = level - op_level + 1
for _ in range(levels_back):
current = stack.pop()
level -= levels_back
else:
raise Exception("corrupted plan")
return stack[0]
42 changes: 35 additions & 7 deletions redis/commands/search/commands.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import itertools
import time
from typing import Dict, Union

from ..helpers import parse_to_dict
from ._util import to_string
Expand Down Expand Up @@ -377,7 +378,17 @@ def info(self):
it = map(to_string, res)
return dict(zip(it, it))

def _mk_query_args(self, query):
def get_params_args(self, query_params: Dict[str, Union[str, int, float]]):
args = []
if len(query_params) > 0:
args.append("params")
args.append(len(query_params) * 2)
for key, value in query_params.items():
args.append(key)
args.append(value)
return args

def _mk_query_args(self, query, query_params: Dict[str, Union[str, int, float]]):
args = [self.index_name]

if isinstance(query, str):
Expand All @@ -387,9 +398,16 @@ def _mk_query_args(self, query):
raise ValueError(f"Bad query type {type(query)}")

args += query.get_args()
if query_params is not None:
args += self.get_params_args(query_params)

return args, query

def search(self, query):
def search(
self,
query: Union[str, Query],
query_params: Dict[str, Union[str, int, float]] = None,
):
"""
Search the index for a given query, and return a result of documents
Expand All @@ -401,7 +419,7 @@ def search(self, query):
For more information: https://oss.redis.com/redisearch/Commands/#ftsearch
""" # noqa
args, query = self._mk_query_args(query)
args, query = self._mk_query_args(query, query_params=query_params)
st = time.time()
res = self.execute_command(SEARCH_CMD, *args)

Expand All @@ -413,18 +431,26 @@ def search(self, query):
with_scores=query._with_scores,
)

def explain(self, query):
def explain(
self,
query=Union[str, Query],
query_params: Dict[str, Union[str, int, float]] = None,
):
"""Returns the execution plan for a complex query.
For more information: https://oss.redis.com/redisearch/Commands/#ftexplain
""" # noqa
args, query_text = self._mk_query_args(query)
args, query_text = self._mk_query_args(query, query_params=query_params)
return self.execute_command(EXPLAIN_CMD, *args)

def explain_cli(self, query): # noqa
def explain_cli(self, query: Union[str, Query]): # noqa
raise NotImplementedError("EXPLAINCLI will not be implemented.")

def aggregate(self, query):
def aggregate(
self,
query: Union[str, Query],
query_params: Dict[str, Union[str, int, float]] = None,
):
"""
Issue an aggregation query.
Expand All @@ -445,6 +471,8 @@ def aggregate(self, query):
cmd = [CURSOR_CMD, "READ", self.index_name] + query.build_args()
else:
raise ValueError("Bad query", query)
if query_params is not None:
cmd += self.get_params_args(query_params)

raw = self.execute_command(*cmd)
return self._get_AggregateResult(raw, query, has_cursor)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
long_description_content_type="text/markdown",
keywords=["Redis", "key-value store", "database"],
license="MIT",
version="4.1.3",
version="4.1.4",
packages=find_packages(
include=[
"redis",
Expand Down

0 comments on commit 0ed0660

Please sign in to comment.