Packages and workflows introduction

Operations on workflows.

In Atlan, packages define the workflows you can run to retrieve metadata from various sources. This section lists the individual packages currently supported through the SDKs, with examples.

Workflows run asynchronously

This means the helper method that runs a workflow returns immediately, before the workflow itself has finished running. To wait until the workflow is finished, use the other helper methods to check its status and wait accordingly.
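To make "check the status and wait" concrete, here is a minimal sketch of the polling pattern. It does not use the SDK: `get_phase` is a hypothetical callable standing in for whatever status lookup you use, and the phase names in `TERMINAL_PHASES` are illustrative assumptions rather than the SDK's own constants.

```python
import time
from typing import Callable

# Hypothetical terminal phase names, for illustration only.
TERMINAL_PHASES = frozenset({"Succeeded", "Failed", "Error"})


def wait_for_completion(
    get_phase: Callable[[], str],
    poll_seconds: float = 5.0,
    max_polls: int = 120,
) -> str:
    """Poll get_phase() until it reports a terminal phase, then return it."""
    for _ in range(max_polls):
        phase = get_phase()
        if phase in TERMINAL_PHASES:
            return phase
        time.sleep(poll_seconds)
    raise TimeoutError(f"workflow still running after {max_polls} polls")


# Simulated run: two "Running" polls, then a terminal phase.
phases = iter(["Running", "Running", "Succeeded"])
final_phase = wait_for_completion(lambda: next(phases), poll_seconds=0.0)
print(final_phase)
```

The SDK helpers described in the next section do this waiting for you; the sketch only illustrates why the initial call returns before the workflow has finished.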

Block until workflow completion

1.8.4 1.0.0

To block until the workflow has completed running:

Block until workflow has completed
...
WorkflowResponse response = workflow.run(); // (1)
AtlanWorkflowPhase state = response.monitorStatus(log); // (2)
  1. Every package returns a Workflow object, from which you can run() the workflow. This call will return almost immediately with some metadata about the workflow run — it will not wait until the workflow has completed running.
  2. There is a monitorStatus() method on the response of a workflow run that you can use to wait until the workflow has completed. When this method finally returns, it will give the state of the workflow when it completed (for example, success or failure).

    The method comes in two variations:

    • one that takes an slf4j logger (in this example) and will log its status periodically
    • and another that takes no arguments and does not do any logging
Block until workflow has completed
import logging
from pyatlan.client.atlan import AtlanClient

client = AtlanClient()
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

...
response = client.workflow.run(workflow) # (1)
state = client.workflow.monitor( # (2)
  workflow_response=response, logger=LOGGER
)
  1. Each package returns a Workflow object, which you can subsequently pass to the run() method of the workflow client. This call will return almost immediately with some metadata about the workflow run — it will not wait until the workflow has completed running.
  2. Use the monitor() method on the workflow client to wait until the workflow has completed. When this method returns, it provides the final state of the workflow, indicating whether it was successful or failed.

    The method comes in two variations:

    • one that takes a logger (in this example) and will log its status periodically.
    • and another that omits the logger and does not do any logging.
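Once the monitoring call returns, you will usually want to act on the final state, for example by failing a pipeline step when the workflow did not succeed. The following is a minimal sketch of that check. `Phase` is a hypothetical stand-in enumeration defined here so the example runs without the SDK, and its members and values are assumptions; in real code you would compare against the phase type the SDK returns.

```python
from enum import Enum


class Phase(Enum):
    """Hypothetical stand-in for the SDK's workflow phase enumeration."""

    SUCCESS = "Succeeded"
    FAILED = "Failed"
    ERROR = "Error"


def ensure_success(state: Phase) -> None:
    """Raise if the workflow finished in anything other than success."""
    if state is not Phase.SUCCESS:
        raise RuntimeError(f"workflow finished in phase {state.value!r}")


ensure_success(Phase.SUCCESS)  # returns quietly

try:
    ensure_success(Phase.FAILED)
except RuntimeError as err:
    failure_message = str(err)
    print(failure_message)
```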
Block until workflow has completed
...
val response = workflow.run() // (1)
val state = response.monitorStatus(log) // (2)
  1. Every package returns a Workflow object, from which you can run() the workflow. This call will return almost immediately with some metadata about the workflow run — it will not wait until the workflow has completed running.
  2. There is a monitorStatus() method on the response of a workflow run that you can use to wait until the workflow has completed. When this method finally returns, it will give the state of the workflow when it completed (for example, success or failure).

    The method comes in two variations:

    • one that takes an slf4j logger (in this example) and will log its status periodically
    • and another that takes no arguments and does not do any logging

Update workflow source credentials

1.8.4 1.9.0

To update workflow source credentials, for example for Snowflake:

Update workflow source credentials
AtlanClient client = Atlan.getDefaultClient();

Credential snowflakeCredential = client.credentials.get( // (1)
    "972a87c1-28d7-8bf2-896d-ea5bd3e9c691"
    ).toCredential()
    .authType("basic") // (2)
    .username("username") // (3)
    .password("password")
    .extra("role", "role-here")
    .extra("warehouse", "warehouse-here")
    .build(); // (4)

CredentialResponse response = snowflakeCredential.update(); // (5)
  1. You can retrieve the workflow credential object by providing its GUID.
  2. You must specify the authentication type of the credential.
  3. You must provide the sensitive details such as the username, password, and extra when updating credentials. This behavior aligns with the Atlan workflow config update UI.
  4. Build the minimal Credential object.
  5. Now, use the update() method of the Credential object to update the credential in Atlan. The new details are first tested, and the update is applied only if they validate successfully.
Update workflow source credentials
from pyatlan.client.atlan import AtlanClient

client = AtlanClient()

snowflake_credential = client.credentials.get(
    guid="972a87c1-28d7-8bf2-896d-ea5bd3e9c691"
).to_credential() # (1)

# Basic Authentication
snowflake_credential.auth_type = "basic" # (2)
snowflake_credential.username = "username" # (3)
snowflake_credential.password = "password"
snowflake_credential.extras = {
    "role": "role-here",
    "warehouse": "warehouse-here",
}

response = client.credentials.test_and_update( # (4)
    credential=snowflake_credential
)
  1. You can retrieve the workflow credential object by providing its GUID.
  2. You must specify the authentication type of the credential.
  3. You must provide the sensitive details such as the username, password, and extras when updating credentials. This behavior aligns with the Atlan workflow config update UI.
  4. Now, pass the credential object to the test_and_update() method to update the credential in Atlan. The new details are first tested, and the update is applied only if they validate successfully.
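Because the update requires the username, password, and extras to be supplied again, you may prefer not to hard-code them. The following is a minimal sketch, assuming the values live in environment variables; the variable names (`SNOWFLAKE_USERNAME` and so on) and the helper `credential_fields_from_env` are hypothetical and not part of the SDK. The keys of the returned dictionary mirror the attributes set on the credential in the example above.

```python
from typing import Any, Dict, Mapping


def credential_fields_from_env(env: Mapping[str, str]) -> Dict[str, Any]:
    """Collect credential fields from an environment-like mapping.

    The variable names used here are hypothetical; adapt them to your setup.
    """
    required = ("SNOWFLAKE_USERNAME", "SNOWFLAKE_PASSWORD")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")

    # Only include the extras that are actually set.
    optional = (("role", "SNOWFLAKE_ROLE"), ("warehouse", "SNOWFLAKE_WAREHOUSE"))
    extras = {key: env[var] for key, var in optional if env.get(var)}

    return {
        "auth_type": "basic",
        "username": env["SNOWFLAKE_USERNAME"],
        "password": env["SNOWFLAKE_PASSWORD"],
        "extras": extras,
    }


# Demonstrated with a plain dict; in practice you would pass os.environ.
fields = credential_fields_from_env(
    {
        "SNOWFLAKE_USERNAME": "svc_user",
        "SNOWFLAKE_PASSWORD": "s3cret",
        "SNOWFLAKE_ROLE": "ANALYST",
        "SNOWFLAKE_WAREHOUSE": "COMPUTE_WH",
    }
)
print(sorted(fields))
```

Each entry could then be assigned to the matching attribute of the credential object before calling test_and_update().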
Update workflow source credentials
val client = Atlan.getDefaultClient()

val snowflakeCredential = client.credentials.get( // (1)
    "972a87c1-28d7-8bf2-896d-ea5bd3e9c691"
    ).toCredential()
    .authType("basic") // (2)
    .username("username") // (3)
    .password("password")
    .extra("role", "role-here")
    .extra("warehouse", "warehouse-here")
    .build() // (4)

val response = snowflakeCredential.update() // (5)
  1. You can retrieve the workflow credential object by providing its GUID.
  2. You must specify the authentication type of the credential.
  3. You must provide the sensitive details such as the username, password, and extra when updating credentials. This behavior aligns with the Atlan workflow config update UI.
  4. Build the minimal Credential object.
  5. Now, use the update() method of the Credential object to update the credential in Atlan. The new details are first tested, and the update is applied only if they validate successfully.

Update workflow configuration

1.9.1

To update workflow configuration, for example for Snowflake:

Coming soon

Update workflow configuration
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.enums import WorkflowPackage

client = AtlanClient()

result = client.workflow.find_by_type(
    prefix=WorkflowPackage.SNOWFLAKE, max_results=5
)[0] # (1)

workflow_task = result.source.spec.templates[0].dag.tasks[0]
workflow_params = workflow_task.arguments.parameters # (2)

for option in workflow_params:
    if option.name == "enable-lineage": # (3)
        option.value = True

response = client.workflow.update(workflow=result.to_workflow()) # (4)
  1. Find workflows by their type using the find_by_type() method of the workflow client, providing the prefix for one of the packages. In this example, we do this for the Snowflake package. You can also specify the maximum number of workflows to retrieve.
  2. Retrieve the workflow template and specific task that you need to update.
  3. Update the specific workflow parameter. In this example, we're enabling lineage for the Snowflake workflow.
  4. Convert the workflow search result object to a workflow object and pass that to the update() method to actually perform the workflow update in Atlan.
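Note that the loop in the example does nothing if no parameter has the expected name, so a typo would go unnoticed. A minimal sketch of a helper that reports whether the parameter was found is shown below. `set_parameter` is a hypothetical helper, not part of the SDK, and the parameters here are simulated with `SimpleNamespace` objects that only assume the `name` and `value` attributes used above.

```python
from types import SimpleNamespace
from typing import Any, Iterable


def set_parameter(parameters: Iterable[Any], name: str, value: Any) -> bool:
    """Set value on the first parameter called name; return whether it was found."""
    for option in parameters:
        if option.name == name:
            option.value = value
            return True
    return False


# Simulated parameters; "some-other-option" is a made-up name for illustration.
params = [
    SimpleNamespace(name="enable-lineage", value=False),
    SimpleNamespace(name="some-other-option", value="unchanged"),
]

changed = set_parameter(params, "enable-lineage", True)
not_found = set_parameter(params, "no-such-option", True)
print(changed, not_found)
```

Checking the return value before calling update() lets you avoid sending an unchanged workflow back to Atlan.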

Coming soon