Skip to content

Commit

Permalink
[embedded-elt] Allow custom op names when using @sling_assets asset d…
Browse files Browse the repository at this point in the history
…ecorators (#20253)

## Summary & Motivation

Allows the use of custom op names when using the @sling_assets decorator,
allowing users to have multiple invocations of the sling asset. By default,
uses the name of the calling function

## How I Tested These Changes

unit tests
  • Loading branch information
PedramNavid committed Mar 5, 2024
1 parent d8f8b5d commit 06d4c19
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def sling_assets(
*,
replication_config: SlingReplicationParam,
dagster_sling_translator: DagsterSlingTranslator = DagsterSlingTranslator(),
name: Optional[str] = None,
partitions_def: Optional[PartitionsDefinition] = None,
backfill_policy: Optional[BackfillPolicy] = None,
op_tags: Optional[Mapping[str, Any]] = None,
Expand All @@ -38,13 +39,14 @@ def sling_assets(
spec and descriptions, see `Sling's Documentation <https://docs.slingdata.io/sling-cli/run/configuration>`_.
Args:
replication_config: Union[Mapping[str, Any], str, Path]: A path to a Sling replication config, or a dictionary
replication_config (Union[Mapping[str, Any], str, Path]): A path to a Sling replication config, or a dictionary
of a replication config.
dagster_sling_translator: DagsterSlingTranslator: Allows customization of how to map a Sling stream to a Dagster
dagster_sling_translator: (DagsterSlingTranslator): Allows customization of how to map a Sling stream to a Dagster
AssetKey.
partitions_def: Optional[PartitionsDefinition]: The partitions definition for this asset.
backfill_policy: Optional[BackfillPolicy]: The backfill policy for this asset.
op_tags: Optional[Mapping[str, Any]]: The tags for this asset.
name (Optional[str]: The name of the op.
partitions_def (Optional[PartitionsDefinition]): The partitions definition for this asset.
backfill_policy (Optional[BackfillPolicy]): The backfill policy for this asset.
op_tags (Optional[Mapping[str, Any]]): The tags for this asset.
Examples:
Running a sync by providing a path to a Sling Replication config:
Expand Down Expand Up @@ -100,7 +102,7 @@ def my_assets(context, sling: SlingResource):

def inner(fn) -> AssetsDefinition:
asset_definition = multi_asset(
name="sling_asset_definition",
name=name,
compute_kind="sling",
partitions_def=partitions_def,
can_subset=False,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,23 @@ def my_sling_assets(sling: SlingResource):
assert res.success
counts = sqlite_connection.execute("SELECT count(1) FROM main.tbl").fetchone()[0]
assert counts == 3


def test_with_custom_name(replication_config: SlingReplicationParam):
@sling_assets(replication_config=replication_config)
def my_sling_assets():
...

assert my_sling_assets.op.name == "my_sling_assets"

@sling_assets(replication_config=replication_config)
def my_other_assets():
...

assert my_other_assets.op.name == "my_other_assets"

@sling_assets(replication_config=replication_config, name="custom_name")
def my_third_sling_assets():
...

assert my_third_sling_assets.op.name == "custom_name"

0 comments on commit 06d4c19

Please sign in to comment.