Skip to content

Commit

Permalink
vdk-core: implement config option for logging execution result (#2831)
Browse files Browse the repository at this point in the history
## Why?

User research indicates that the execution result is too verbose for
local runs. Users don't expect a lot of output for successful jobs and
expect error output for failing jobs.

## What?

Introduce the LOG_EXECUTION_RESULT config option that enables/disables
displaying the end result.

### Config Error Output

https://pastebin.com/raw/MvJX0u9m

### User Error Output

https://pastebin.com/raw/0MxZp6Zg

### Success Output

https://pastebin.com/raw/GTNBiEFG

## How was this tested?

Ran locally with successful and failing jobs

## What kind of change is this?

Feature/non-breaking

Signed-off-by: Dilyan Marinov <mdilyan@vmware.com>
  • Loading branch information
DeltaMichael committed Nov 15, 2023
1 parent 4e5862d commit c5c6a2d
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 43 deletions.
45 changes: 28 additions & 17 deletions projects/vdk-core/src/vdk/internal/builtin_plugins/run/cli_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from vdk.internal.builtin_plugins.config.job_config import JobConfig
from vdk.internal.builtin_plugins.run import job_input_error_classifier
from vdk.internal.builtin_plugins.run.data_job import DataJobFactory
from vdk.internal.builtin_plugins.run.execution_results import ExecutionResult
from vdk.internal.builtin_plugins.run.execution_tracking import (
ExecutionTrackingPlugin,
)
Expand Down Expand Up @@ -120,6 +121,26 @@ def __warn_on_python_version_disparity(
"""
)

def __log_exec_result(self, execution_result: ExecutionResult) -> None:
    """
    Log the execution result of a data job at INFO level.

    On some platforms the logging module raises
    ``OSError: [Errno 40] Message too long`` when a single message is too
    large. To stay clear of that, a result whose string form is longer than
    5000 characters is logged in pieces: first the summary without its
    "steps_list" entry, then the steps split into several chunks.

    :param execution_result: the result to log; when it is longer than the
        limit its string form is parsed with ``json.loads`` and must contain
        a "steps_list" key.
    """
    max_message_length = 5000
    serialized_result = str(execution_result)

    # Short enough to be emitted as a single log record.
    if len(serialized_result) <= max_message_length:
        log.info(f"Data Job execution summary: {execution_result}")
        return

    # Detach the step details so that the summary itself stays small.
    summary = json.loads(serialized_result)
    exec_steps = summary.pop("steps_list")
    log.info(f"Data Job execution summary: {json.dumps(summary, indent=2)}")

    # One chunk per (started) 5000 characters of the full string form.
    chunk_count = math.ceil(len(serialized_result) / max_message_length)
    step_chunks = self.__split_into_chunks(exec_steps=exec_steps, chunks=chunk_count)
    for steps_chunk in step_chunks:
        log.info(f"Execution Steps: {json.dumps(steps_chunk, indent=2)}")

def create_and_run_data_job(
self,
context: CoreContext,
Expand All @@ -141,24 +162,14 @@ def create_and_run_data_job(
execution_result = None
try:
execution_result = job.run(args)
# On some platforms, if the size of a string is too large, the
# logging module starts throwing OSError: [Errno 40] Message too long,
# so it is safer if we split large strings into smaller chunks.
string_exec_result = str(execution_result)
if len(string_exec_result) > 5000:
temp_exec_result = json.loads(string_exec_result)
steps = temp_exec_result.pop("steps_list")

log.info(
f"Data Job execution summary: {json.dumps(temp_exec_result, indent=2)}"
)

chunks = math.ceil(len(string_exec_result) / 5000)
for i in self.__split_into_chunks(exec_steps=steps, chunks=chunks):
log.info(f"Execution Steps: {json.dumps(i, indent=2)}")

if context.configuration.get_value("LOG_EXECUTION_RESULT"):
self.__log_exec_result(execution_result)
else:
log.info(f"Data Job execution summary: {execution_result}")
if execution_result.is_success():
log.info("Job execution result: SUCCESS")
if execution_result.is_failed():
log.info("Job execution result: FAILED")

except BaseException as e:
log.error(
"\n".join(
Expand Down
22 changes: 14 additions & 8 deletions projects/vdk-core/tests/functional/run/test_run_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,17 @@ def test_user_error_handled(tmp_termination_msg_file):


def test_error_from_pandas_user_error(tmp_termination_msg_file):
errors.resolvable_context().clear()
runner = CliEntryBasedTestRunner()

result: Result = runner.invoke(["run", util.job_path("pandas-key-error-job")])
cli_assert_equal(1, result)
assert _get_job_status(tmp_termination_msg_file) == "User error"
assert '"blamee": "User Error"' in result.output
assert '"exception_name": "KeyError"' in result.output
with mock.patch.dict(
os.environ,
{
"VDK_LOG_EXECUTION_RESULT": "True",
},
):
errors.resolvable_context().clear()
runner = CliEntryBasedTestRunner()

result: Result = runner.invoke(["run", util.job_path("pandas-key-error-job")])
cli_assert_equal(1, result)
assert _get_job_status(tmp_termination_msg_file) == "User error"
assert '"blamee": "User Error"' in result.output
assert '"exception_name": "KeyError"' in result.output
41 changes: 24 additions & 17 deletions projects/vdk-core/tests/functional/run/test_run_job_cancel.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import os
import unittest.mock
from unittest.mock import MagicMock
from unittest.mock import patch
Expand Down Expand Up @@ -27,23 +28,29 @@ def test_run(self, patched_writer: MagicMock):
proper message is logged confirming job was cancelled from cancel_job_execution() method.
:return:
"""
test_runner = CliEntryBasedTestRunner()
with unittest.mock.patch.dict(
os.environ,
{
"VDK_LOG_EXECUTION_RESULT": "True",
},
):
test_runner = CliEntryBasedTestRunner()

# clear recorded errors before invoking run since other tests might leave a dirty state
errors.clear_intermediate_errors()
# clear recorded errors before invoking run since other tests might leave a dirty state
errors.clear_intermediate_errors()

result: Result = test_runner.invoke(["run", util.job_path("cancel-job")])
result: Result = test_runner.invoke(["run", util.job_path("cancel-job")])

# Check the writer plugin wasn't called with user or platform errors.
# 1st param is overall blamee, should be None.
# 2nd parm is overall user error in this case it's empty string which is a falsy value.
# 3rd param is context.configuration and can be ignored for this test.
patched_writer.assert_called_once_with(None, "", unittest.mock.ANY)
assert "Step Cancel 1." in result.output
assert "Step Cancel 2." in result.output
assert "Step Cancel 3." not in result.output
assert (
"Job/template execution was skipped from job/template step code."
in result.output
)
cli_assert_equal(0, result)
# Check the writer plugin wasn't called with user or platform errors.
# 1st param is overall blamee, should be None.
# 2nd parm is overall user error in this case it's empty string which is a falsy value.
# 3rd param is context.configuration and can be ignored for this test.
patched_writer.assert_called_once_with(None, "", unittest.mock.ANY)
assert "Step Cancel 1." in result.output
assert "Step Cancel 2." in result.output
assert "Step Cancel 3." not in result.output
assert (
"Job/template execution was skipped from job/template step code."
in result.output
)
cli_assert_equal(0, result)
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import os
import unittest.mock

from click.testing import Result
from functional.run import util
from vdk.plugin.test_utils.util_funcs import cli_assert_equal
from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner


def test_run_log_execution_result_enabled():
    """
    With VDK_LOG_EXECUTION_RESULT set to "True", running "simple-job" must
    print the full execution summary, including the list of steps.
    """
    env_overrides = {"VDK_LOG_EXECUTION_RESULT": "True"}

    with unittest.mock.patch.dict(os.environ, env_overrides):
        cli_runner = CliEntryBasedTestRunner()
        job_dir = util.job_path("simple-job")

        run_result: Result = cli_runner.invoke(["run", job_dir])

        cli_assert_equal(0, run_result)
        cli_output = run_result.output
        assert "Data Job execution summary" in cli_output
        assert "steps_list" in cli_output


def test_run_log_execution_result_enabled_on_fail():
    """
    With VDK_LOG_EXECUTION_RESULT set to "True", running "fail-job" must
    still print the full execution summary, including the list of steps.
    """
    env_overrides = {"VDK_LOG_EXECUTION_RESULT": "True"}

    with unittest.mock.patch.dict(os.environ, env_overrides):
        cli_runner = CliEntryBasedTestRunner()
        job_dir = util.job_path("fail-job")

        run_result: Result = cli_runner.invoke(["run", job_dir])

        cli_assert_equal(1, run_result)
        cli_output = run_result.output
        assert "Data Job execution summary" in cli_output
        assert "steps_list" in cli_output


def test_run_log_execution_result_disabled():
    """
    Without VDK_LOG_EXECUTION_RESULT in the environment, running
    "simple-job" must print only the short "SUCCESS" line and no detailed
    execution summary.
    """
    # patch.dict restores os.environ on exit, so removing the variable here
    # cannot affect other tests.
    with unittest.mock.patch.dict(os.environ):
        # Isolate the test from the ambient environment: a value exported in
        # the shell or on CI would otherwise turn the summary on and make
        # this test fail for reasons unrelated to the code under test.
        os.environ.pop("VDK_LOG_EXECUTION_RESULT", None)

        runner = CliEntryBasedTestRunner()

        result: Result = runner.invoke(["run", util.job_path("simple-job")])

        cli_assert_equal(0, result)
        assert "Data Job execution summary" not in result.output
        assert "steps_list" not in result.output
        assert "Job execution result: SUCCESS" in result.output


def test_run_log_execution_result_disabled_on_fail():
    """
    Without VDK_LOG_EXECUTION_RESULT in the environment, running
    "fail-job" must print only the short "FAILED" line and no detailed
    execution summary.
    """
    # patch.dict restores os.environ on exit, so removing the variable here
    # cannot affect other tests.
    with unittest.mock.patch.dict(os.environ):
        # Isolate the test from the ambient environment: a value exported in
        # the shell or on CI would otherwise turn the summary on and make
        # this test fail for reasons unrelated to the code under test.
        os.environ.pop("VDK_LOG_EXECUTION_RESULT", None)

        runner = CliEntryBasedTestRunner()

        result: Result = runner.invoke(["run", util.job_path("fail-job")])

        cli_assert_equal(1, result)
        assert "Data Job execution summary" not in result.output
        assert "steps_list" not in result.output
        assert "Job execution result: FAILED" in result.output
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
log = logging.getLogger(__name__)

VDK_DB_DEFAULT_TYPE = "VDK_DB_DEFAULT_TYPE"
VDK_LOG_EXECUTION_RESULT = "VDK_LOG_EXECUTION_RESULT"


class ValidatedSqLite3MemoryDbPlugin:
Expand Down Expand Up @@ -79,7 +80,10 @@ def test_run_dbapi_connection_no_such_db_type():
logging.getLogger("vdk").setLevel(logging.INFO)
runner = CliEntryBasedTestRunner()

with mock.patch.dict(os.environ, {VDK_DB_DEFAULT_TYPE: DB_TYPE_SQLITE_MEMORY}):
with mock.patch.dict(
os.environ,
{VDK_DB_DEFAULT_TYPE: DB_TYPE_SQLITE_MEMORY, VDK_LOG_EXECUTION_RESULT: "True"},
):
result: Result = runner.invoke(
[
"run",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def setUp(self) -> None:
{
"VDK_INGEST_METHOD_DEFAULT": "memory",
"VDK_INGEST_PAYLOAD_PREPROCESS_SEQUENCE": "vdk-gdp-execution-id",
"VDK_LOG_EXECUTION_RESULT": "True",
},
)
def test_ingested_payload_expansion(self) -> None:
Expand All @@ -46,6 +47,7 @@ def test_ingested_payload_expansion(self) -> None:
"VDK_INGEST_METHOD_DEFAULT": "memory",
"VDK_INGEST_PAYLOAD_PREPROCESS_SEQUENCE": "vdk-gdp-execution-id",
"VDK_GDP_EXECUTION_ID_MICRO_DIMENSION_NAME": "micro_dimension_name_customized",
"VDK_LOG_EXECUTION_RESULT": "True",
},
)
def test_micro_dimension_name_configurable(self) -> None:
Expand Down
2 changes: 2 additions & 0 deletions projects/vdk-plugins/vdk-trino/tests/test_ingest_to_trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
VDK_TRINO_PORT = "VDK_TRINO_PORT"
VDK_TRINO_USE_SSL = "VDK_TRINO_USE_SSL"
VDK_INGEST_METHOD_DEFAULT = "VDK_INGEST_METHOD_DEFAULT"
VDK_LOG_EXECUTION_RESULT = "VDK_LOG_EXECUTION_RESULT"


@pytest.mark.usefixtures("trino_service")
Expand All @@ -29,6 +30,7 @@
VDK_TRINO_PORT: "8080",
VDK_TRINO_USE_SSL: "False",
VDK_INGEST_METHOD_DEFAULT: "TRINO",
VDK_LOG_EXECUTION_RESULT: "True",
},
)
class IngestToTrinoTests(TestCase):
Expand Down
2 changes: 2 additions & 0 deletions projects/vdk-plugins/vdk-trino/tests/test_vdk_templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
VDK_TRINO_TEMPLATES_DATA_TO_TARGET_STRATEGY = (
"VDK_TRINO_TEMPLATES_DATA_TO_TARGET_STRATEGY"
)
VDK_LOG_EXECUTION_RESULT = "VDK_LOG_EXECUTION_RESULT"

org_move_data_to_table = TrinoTemplateQueries.move_data_to_table

Expand Down Expand Up @@ -60,6 +61,7 @@ def mock_os_environ():
VDK_TRINO_PORT: "8080",
VDK_TRINO_USE_SSL: "False",
VDK_TRINO_TEMPLATES_DATA_TO_TARGET_STRATEGY: "INSERT_SELECT",
VDK_LOG_EXECUTION_RESULT: "True",
},
):
yield
Expand Down

0 comments on commit c5c6a2d

Please sign in to comment.