Sure thing, here it is:
import os
import click
import gazu
from pathlib import Path
from openpype.modules.base import ModulesManager
from openpype.pipeline.mongodb import AvalonMongoDB
from openpype.lib.local_settings import get_local_site_id
from openpype.modules.kitsu.utils.credentials import (
validate_credentials,
set_credentials_envs,
)
def run_scripts_on_task_workfile(
project_name: str,
asset_name: str,
task_name: str,
python_scripts: List[str],
families: List[str],
comment: str,
script_args: List[str],
keep_workfile_open: bool = False,
workfile_version=-1,
) -> Popen:
"""Launch Blender with the given python scripts and publish the workfile.
Args:
project_name (str): The project name.
asset_name (str): The asset name.
task_name (str): The task name.
python_scripts (List[str]): The python scripts paths to run.
families (List[str]): The families to publish the workfile with.
Returns:
Popen: The Blender process.
"""
return ApplicationManager().launch(
app_name="blender/3-3",
app_args=["-b"] if not keep_workfile_open else [],
project_name=project_name,
asset_name=asset_name,
task_name=task_name,
python_scripts=[*python_scripts,
Path(publish_blender_workfile.__file__).as_posix(),
],
script_args=[*script_args, "--families", *families, "--comment", comment],
workfile_version=workfile_version,
)
def download_subset(
project_name, asset_name, subset_name, ext="blend", hero=False
) -> dict:
"""Download the representation of the subset last version on current site.
Args:
project_name (str): The project name.
asset_name (str): The asset name.
subset_name (str): The subset name.
ext (str, optional): The representation extension. Defaults to "blend".
hero (bool, optional): Use hero version.
Returns:
dict: The subset representation.
"""
asset = get_asset_by_name(project_name, asset_name, fields=["_id"])
if not asset:
return
subset = get_subset_by_name(
project_name,
subset_name,
asset["_id"],
fields=["_id"],
)
if not subset:
return
version = None
if hero:
version = get_hero_version_by_subset_id(
project_name,
subset["_id"],
fields=["_id"],
)
if not version:
version = get_last_version_by_subset_id(
project_name,
subset["_id"],
fields=["_id"],
)
if not version:
return
representation = next(
get_representations(
project_name,
version_ids=[version["_id"]],
context_filters={"ext": [ext]},
),
None,
)
if not representation:
return
# Get sync server
modules_manager = ModulesManager()
sync_server = modules_manager.get("sync_server")
local_site_id = get_local_site_id()
# Add linked representations
representation_ids = {representation["_id"]}
representation_ids.update(
get_linked_representation_id(
project_name, repre_id=representation["_id"]
)
)
# Add local site to representations
for repre_id in representation_ids:
# Check if representation is already on site
if not sync_server.is_representation_on_site(
project_name, repre_id, local_site_id
):
sync_server.add_site(
project_name,
repre_id,
local_site_id,
priority=99,
force=True,
)
return representation
def wait_for_download(project_name, representations: List[dict]):
"""Wait for download of representations.
Args:
project_name (str): Project name.
representations (List[dict]): List of representations to wait for.
"""
# Get sync server
modules_manager = ModulesManager()
sync_server = modules_manager.get("sync_server")
# Reset timer
sync_server.reset_timer()
# Wait for download
local_site_id = get_local_site_id()
start = time() # 5 minutes timeout
while (
not all(
sync_server.is_representation_on_site(
project_name, r["_id"], local_site_id
)
for r in representations
if r
)
and time() - start < 300
):
sleep(5)
@cli_main.command(
context_settings=dict(
ignore_unknown_options=True,
)
)
@click.option(
"--login",
prompt=True,
help="Kitsu login",
)
@click.option(
"--password",
prompt=True,
hide_input=True,
help="Kitsu password",
)
@click.option(
"-p", "--project-name", prompt=True, help="Project name", required=True
)
@click.option(
"-e",
"--entity-name",
"entities_list",
help="Entity name",
multiple=True,
default=[],
)
@click.option(
"-A",
"--asset-type",
"asset_types",
help=(
"Asset type name to publish all assets from this type"
" (case sensitive, usually Capitalized)"
),
multiple=True,
default=[],
)
@click.option(
"-ep", "--episode", "episodes", help=(
"Episode name to process all shots from this episode"
),
multiple=True,
default=[],
)
@click.option(
"-l",
"--assets-list",
"entities_list_path",
help=("Path to a text file containing a list of assets to publish"),
)
@click.option(
"-s",
"--skip-assets-list",
"entities_to_skip_list_path",
help=("Path to a text file containing a list of assets to skip"),
)
@click.option(
"-f",
"--family",
"families",
help="Family to publish",
multiple=True,
required=True,
)
@click.option(
"-v",
"--version",
"workfile_version",
help="Workfile version",
default=-1,
)
@click.option(
"-P",
"--python-script",
"python_scripts",
help="Script to execute before publish",
multiple=True,
)
@click.option(
"-t",
"--task-type",
"task_types",
help="Task type name (case sensitive, usually Capitalized)",
multiple=True,
required=True,
)
@click.option(
"-c", "--comment", help="Comment to add to the publish", required=True
)
@click.option("--no-download", help="TODO", default=False, is_flag=True)
@click.option("--keep-open", help="TODO", default=False, is_flag=True)
@click.argument("unknown_args", nargs=-1, type=click.UNPROCESSED)
def publish_workfile(
login: str,
password: str,
project_name: str,
entities_list: List[str],
asset_types: List[str],
episodes: List[str],
entities_list_path: List[str],
entities_to_skip_list_path: List[str],
workfile_version: int,
families: List[str],
python_scripts: List[str],
task_types: List[str],
comment: str,
no_download: bool,
keep_open: bool,
unknown_args,
):
"""Builds a workfile with: TODO
- All the rigged characters casted in a shot.
- Lipsync actions assigned to each character in the scene.
- Animation instance for each armature.
"""
# Cast to list
entities_list = list(entities_list)
python_scripts = list(python_scripts)
# Check asset or asset type
if not entities_list and not asset_types and not episodes and not entities_list_path:
raise ValueError(
"You must provide either an asset name or an asset type "
"or an episode name or a path to an assets list file."
)
# Validate python scripts paths
for i, script_path in enumerate(python_scripts):
if not Path(script_path).is_file():
confo_filepath = (
Path(__file__)
.parent.joinpath("scripts", "conformation", script_path)
.with_suffix(".py")
)
if confo_filepath.is_file():
python_scripts[i] = confo_filepath.as_posix()
else:
raise ValueError(f"Invalid filepath: {script_path}")
set_credentials_envs(login, password)
validate_credentials(login, password)
# Start sync server
active_site = get_local_site_id()
os.environ["OPENPYPE_LOCAL_ID"] = active_site
manager = ModulesManager()
sync_server_module = manager.modules_by_name["sync_server"]
sync_server_module.server_init()
sync_server_module.server_start()
# Get all assets to skip from a list
entities_to_skip = set()
if entities_to_skip_list_path:
entities_to_skip_list_path = Path(entities_to_skip_list_path)
if not entities_to_skip_list_path.is_file():
raise ValueError(f"Invalid filepath: {entities_to_skip_list_path}")
# Get assets from file
with Path(entities_to_skip_list_path).open() as f:
entities_to_skip.update(
asset_name.strip()
for asset_name in f.readlines()
if not asset_name.startswith("#")
)
# Check single asset is not in skip list
if len(entities_list) == 1 and entities_list[0] in entities_to_skip:
raise ValueError(f"Single Asset '{entity_name}' is in skip list.")
# Get all assets of an asset type
project = gazu.project.get_project_by_name(project_name)
for asset_type_name in asset_types:
# Get asset type
asset_type = gazu.asset.get_asset_type_by_name(asset_type_name)
assets = gazu.asset.all_assets_for_project_and_type(
project, asset_type
)
for asset in assets:
# Check asset is not in skip list
#if asset["name"] not in entities_to_skip:
entities_list.append(asset["name"])
# Get all assets from a list
if entities_list_path:
entities_list_path = Path(entities_list_path)
if not entities_list_path.is_file():
raise ValueError(f"Invalid filepath: {entities_list_path}")
# Get assets from file
with Path(entities_list_path).open() as f:
entities_list.extend(
asset_name.strip()
for asset_name in f.readlines()
if not asset_name.startswith("#")
#and asset_name.strip() not in entities_to_skip
)
# Get all shots of an episode
dbcon = AvalonMongoDB()
dbcon.Session["AVALON_PROJECT"] = project_name
for episode_name in episodes:
# Get episode
episode = gazu.shot.get_episode_by_name(project, episode_name)
shots = gazu.shot.all_shots_for_episode(episode)
for shot in shots:
# Get shot from database by matching zou id
# TODO not optimized, should find all in one query
shot_doc = dbcon.find_one(
{
"type": "asset",
"data.zou.id": shot["id"],
}
)
# Check asset is not in skip list
#if not {shot["name"], shot_doc["name"]} & set(entities_to_skip):
entities_list.append(shot_doc["name"])
representations = []
# Run for listed assets
for entity_name in sorted(set(entities_list) - entities_to_skip):
for task_name in task_types:
if not no_download:
repre = download_subset(
project_name, entity_name, f"workfile{task_name}"
)
else:
repre = None
representations.append((repre, entity_name, task_name))
# Wait for all downloads and run scripts
if not representations:
raise ValueError(
f"No any representation found in '{project_name}' for assets: {entities_list}"
)
for repre, entity_name, task_name in representations:
if repre:
wait_for_download(project_name, [repre])
run_scripts_on_task_workfile(
project_name,
entity_name,
task_name,
python_scripts,
families,
comment,
unknown_args,
keep_workfile_open=keep_open,
workfile_version=workfile_version
)
# Log out
gazu.log_out()
Required features
In exchange, could you please try to make it as Deadline independent as possible? A true clean API I could reuse for other hosts? I’m sure that’s your goal and I wish it from the deep of my heart.