Here’s the code I’m using right now for abstracting the OP publishes by submitting them to Deadline. Maybe this is a bit overkill for certain subset publishes that could probably run fast enough locally… but it has allowed me to write a very thin publisher layer in Houdini, and I’m now writing a batch ingest tool that would use the same modules. It’s still very much WIP and I took some shortcuts in order to have something ready faster. I was planning on cleaning it up further and creating a draft PR on GitHub, but for now some of this might help you, and it might help me get some early feedback:
Publish version abstraction:
import os
import getpass
import json
from openpype.lib import Logger
from openpype.pipeline import legacy_io
from openpype.modules.deadline import constants as dl_constants
from openpype.modules.deadline.lib import submit
from openpype.modules.delivery.scripts import utils
logger = Logger.get_logger(__name__)
REVIEW_FAMILIES = {
    "render"
}

PUBLISH_TO_SG_FAMILIES = {
    "render"
}
def publish_version(
    project_name,
    asset_name,
    task_name,
    family_name,
    subset_name,
    expected_representations,
    publish_data,
):
    # TODO: write some logic that finds the main path from the list of
    # representations
    source_path = list(expected_representations.values())[0]

    instance_data = {
        "project": project_name,
        "family": family_name,
        "subset": subset_name,
        "families": publish_data.get("families", []),
        "asset": asset_name,
        "task": task_name,
        "comment": publish_data.get("comment", ""),
        "source": source_path,
        "overrideExistingFrame": False,
        "useSequenceForReview": True,
        "colorspace": publish_data.get("colorspace"),
        "version": publish_data.get("version"),
        "outputDir": os.path.dirname(source_path),
    }
    representations = utils.get_representations(
        instance_data,
        expected_representations,
        add_review=family_name in REVIEW_FAMILIES,
        publish_to_sg=family_name in PUBLISH_TO_SG_FAMILIES,
    )
    if not representations:
        logger.error(
            "No representations could be created from the expected"
            " dictionary: %s",
            expected_representations
        )
        return {}
    if family_name in REVIEW_FAMILIES:
        # Inject colorspace data if we are generating a review
        for rep in representations:
            source_colorspace = publish_data.get("colorspace") or "scene_linear"
            logger.debug(
                "Setting colorspace '%s' on representation", source_colorspace
            )
            utils.set_representation_colorspace(
                rep, project_name, colorspace=source_colorspace
            )

    instance_data["frameStartHandle"] = representations[0]["frameStart"]
    instance_data["frameEndHandle"] = representations[0]["frameEnd"]

    # Add representations to instance
    instance_data["representations"] = representations
    instances = [instance_data]
    # Create farm job to run OP publish
    metadata_path = utils.create_metadata_path(instance_data)
    logger.info("Metadata path: %s", metadata_path)

    publish_args = [
        "--headless",
        "publish",
        '"{}"'.format(metadata_path),
        "--targets",
        "deadline",
        "--targets",
        "farm",
    ]

    # Create dictionary of data specific to OP plugin for payload submit
    plugin_data = {
        "Arguments": " ".join(publish_args),
        "Version": os.getenv("OPENPYPE_VERSION"),
        "SingleFrameOnly": "True",
    }

    username = getpass.getuser()

    # Submit job to Deadline
    extra_env = {
        "AVALON_PROJECT": project_name,
        "AVALON_ASSET": asset_name,
        "AVALON_TASK": task_name,
        "OPENPYPE_USERNAME": username,
        "AVALON_WORKDIR": os.path.dirname(source_path),
        "OPENPYPE_PUBLISH_JOB": "1",
        "OPENPYPE_RENDER_JOB": "0",
        "OPENPYPE_REMOTE_JOB": "0",
        "OPENPYPE_LOG_NO_COLORS": "1",
        "OPENPYPE_SG_USER": username,
    }
    deadline_task_name = "Publish {} - {} - {} - {} - {}".format(
        family_name,
        subset_name,
        task_name,
        asset_name,
        project_name
    )

    response = submit.payload_submit(
        plugin="OpenPype",
        plugin_data=plugin_data,
        batch_name=publish_data.get("jobBatchName") or deadline_task_name,
        task_name=deadline_task_name,
        group=dl_constants.OP_GROUP,
        extra_env=extra_env,
    )

    # Write the metadata file that the farm publish job will pick up
    publish_job = {
        "asset": instance_data["asset"],
        "frameStart": instance_data["frameStartHandle"],
        "frameEnd": instance_data["frameEndHandle"],
        "source": instance_data["source"],
        "user": getpass.getuser(),
        "version": None,  # this is workfile version
        "comment": instance_data["comment"],
        "job": {},
        "session": legacy_io.Session.copy(),
        "instances": instances,
        "deadline_publish_job_id": response.get("_id")
    }

    logger.info("Writing json file: %s", metadata_path)
    with open(metadata_path, "w") as f:
        json.dump(publish_job, f, indent=4, sort_keys=True)

    return response
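For reference, this is roughly how I call it from the thin Houdini publisher (the project, asset, and paths here are made up for illustration):

# Hypothetical usage; names and paths are made up
response = publish_version(
    project_name="my_project",
    asset_name="sh010",
    task_name="comp",
    family_name="render",
    subset_name="renderComp",
    # One sample file per representation; get_representations globs the
    # rest of the frames from it
    expected_representations={
        "exr": "/proj/my_project/sh010/render/sh010_comp.1001.exr",
    },
    publish_data={
        "comment": "farm publish test",
        "colorspace": "ACEScg",
    },
)
logger.info("Deadline publish job id: %s", response.get("_id"))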
The utils module is basically a rip-off of other OP functions, but I removed the instance object and a few other things so they're easier to use standalone. There's also this get_representations function that I wrote pretty quickly so I can create the representations from just a dictionary, where each key is the name of a representation and each value is a single path from that representation:
import clique
import re
import glob
import datetime
import getpass
import os
import requests
from openpype.lib import Logger, is_running_from_build
from openpype.pipeline import Anatomy
from openpype.pipeline.colorspace import get_imageio_config
logger = Logger.get_logger(__name__)
# Regular expression that allows us to replace the frame numbers of a file path
# with any string token
RE_FRAME_NUMBER = re.compile(
    r"(?P<prefix>^(.*)+)\.(?P<frame>\d+)\.(?P<extension>\w+\.?(sc|gz)?$)"
)
def create_metadata_path(instance_data):
    # Ensure output dir exists
    output_dir = instance_data.get(
        "publishRenderMetadataFolder", instance_data["outputDir"]
    )
    try:
        if not os.path.isdir(output_dir):
            os.makedirs(output_dir)
    except OSError:
        # directory is not available
        logger.warning("Path is unreachable: `{}`".format(output_dir))

    metadata_filename = "{}_{}_{}_metadata.json".format(
        datetime.datetime.now().strftime("%d%m%Y%H%M%S"),
        instance_data["asset"],
        instance_data["subset"]
    )

    return os.path.join(output_dir, metadata_filename)
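# For illustration (not part of the module): for asset "sh010" and subset
# "renderComp", this returns something like
# "<output_dir>/25042023153045_sh010_renderComp_metadata.json"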
def replace_frame_number_with_token(path, token):
    return RE_FRAME_NUMBER.sub(
        r"\g<prefix>.{}.\g<extension>".format(token), path
    )
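# For illustration (not part of the module): the token replacement turns a
# frame-numbered path into a glob pattern, e.g.
#   replace_frame_number_with_token("/path/beauty.1001.exr", "*")
# returns "/path/beauty.*.exr", and compressed extensions such as ".exr.gz"
# survive as "/path/beauty.*.exr.gz"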
def get_representations(
    instance_data,
    exp_representations,
    add_review=True,
    publish_to_sg=False
):
    """Create representations for file sequences.

    This will return representation dictionaries of expected files. There
    should be only one sequence of files for most cases, but if not, we
    create a representation for each.

    If the file path given resolves to a single file rather than a
    sequence, it is published as a single-file representation.

    Arguments:
        instance_data (dict): instance["data"] for which we are
            setting representations
        exp_representations (dict[str:str]): Dictionary of expected
            representations that should be created. Key is name of
            representation and value is a file path to one of the files
            from the representation (i.e., "exr": "/path/to/beauty.1001.exr").

    Returns:
        list of representations
    """
    anatomy = Anatomy(instance_data["project"])

    representations = []
    for rep_name, file_path in exp_representations.items():
        rep_frame_start = None
        rep_frame_end = None
        ext = None

        # Convert file path so it can be used with glob and find all the
        # frames for the sequence
        file_pattern = replace_frame_number_with_token(file_path, "*")

        representation_files = glob.glob(file_pattern)
        collections, remainder = clique.assemble(representation_files)

        # If file path is in remainder it means it was a single file
        if file_path in remainder:
            collections = [remainder]
            frame_match = RE_FRAME_NUMBER.match(file_path)
            if frame_match:
                ext = frame_match.group("extension")
                frame = frame_match.group("frame")
                rep_frame_start = frame
                rep_frame_end = frame
            else:
                rep_frame_start = 1
                rep_frame_end = 1
                ext = os.path.splitext(file_path)[1][1:]
        elif not collections:
            logger.warning(
                "Couldn't find a collection for file pattern '%s'.",
                file_pattern
            )
            continue
        if len(collections) > 1:
            logger.warning(
                "More than one sequence found for the file pattern '%s'."
                " Using only the first one: %s",
                file_pattern,
                collections,
            )
        collection = collections[0]

        if not ext:
            ext = collection.tail.lstrip(".")

        staging = os.path.dirname(list(collection)[0])
        success, rootless_staging_dir = anatomy.find_root_template_from_path(
            staging
        )
        if success:
            staging = rootless_staging_dir
        else:
            logger.warning(
                "Could not find root path for remapping '%s'."
                " This may cause issues on farm.",
                staging
            )

        if not rep_frame_start or not rep_frame_end:
            col_frame_range = list(collection.indexes)
            rep_frame_start = col_frame_range[0]
            rep_frame_end = col_frame_range[-1]
        tags = []
        if add_review:
            tags.append("review")
        if publish_to_sg:
            tags.append("shotgridreview")

        # Since the expected files are absolute paths, we only keep the
        # file names
        files = [os.path.basename(f) for f in list(collection)]

        # If it's a single file in the collection we remove it
        # from the list as OP checks if "files" is a list or tuple
        # at certain places to validate if it's a sequence or not
        if len(files) == 1:
            files = files[0]

        rep = {
            "name": rep_name,
            "ext": ext,
            "files": files,
            "frameStart": rep_frame_start,
            "frameEnd": rep_frame_end,
            "stagingDir": staging,
            "fps": instance_data.get("fps"),
            "tags": tags,
        }

        if instance_data.get("multipartExr", False):
            rep["tags"].append("multipartExr")

        # support conversion from tiled to scanline
        if instance_data.get("convertToScanline"):
            logger.info("Adding scanline conversion.")
            rep["tags"].append("toScanline")

        representations.append(rep)

    solve_families(instance_data, add_review)

    return representations
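# For illustration (not part of the module): given
# {"exr": "/path/beauty.1001.exr"} with frames 1001-1005 on disk, this
# returns something like:
# [{
#     "name": "exr",
#     "ext": "exr",
#     "files": ["beauty.1001.exr", ..., "beauty.1005.exr"],
#     "frameStart": 1001,
#     "frameEnd": 1005,
#     "stagingDir": "{root[work]}/path",
#     "fps": None,
#     "tags": ["review", "shotgridreview"],
# }]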
def get_colorspace_settings(project_name):
    """Returns colorspace settings for project.

    Returns:
        dict or None: OCIO config data, or None if color management
            is not enabled
    """
    config_data = get_imageio_config(
        project_name,
        # temporary hack as get_imageio_config doesn't support grabbing
        # just the global config
        host_name="nuke",
    )
    # in case host color management is not enabled
    if not config_data:
        return None

    return config_data
def set_representation_colorspace(
    representation,
    project_name,
    colorspace=None,
):
    """Sets colorspace data to representation.

    Args:
        representation (dict): publishing representation
        project_name (str): Name of project
        colorspace (str, optional): colorspace name. Defaults to None.

    Example:
        ```
        {
            # for other publish plugins and loaders
            "colorspace": "linear",
            "config": {
                # for future reference in case needed
                "path": "/abs/path/to/config.ocio",
                # for other plugins within remote publish cases
                "template": "{project[root]}/path/to/config.ocio"
            }
        }
        ```
    """
    ext = representation["ext"]
    # check extension
    logger.debug("__ ext: `{}`".format(ext))

    config_data = get_colorspace_settings(project_name)
    if not config_data:
        # warn in case no colorspace path was defined
        logger.warning("No colorspace management was defined")
        return

    logger.debug("Config data is: `{}`".format(config_data))

    # infuse data to representation
    if colorspace:
        colorspace_data = {"colorspace": colorspace, "config": config_data}

        # update data key
        representation["colorspaceData"] = colorspace_data
def solve_families(instance_data, preview=False):
    families = instance_data.get("families", [])

    # if we have one representation with the preview tag, flag the whole
    # instance_data for review and for ftrack
    if preview:
        if "review" not in families:
            logger.debug('Adding "review" to families because of preview tag.')
            families.append("review")
        instance_data["families"] = families
And finally, the Deadline abstraction, so I can submit any plugin job through the same function:
import os
import getpass
import json

import requests

from openpype.lib import Logger, is_running_from_build
from openpype.pipeline import legacy_io
from openpype.modules.deadline import constants

logger = Logger.get_logger(__name__)

# Default Deadline job attributes
DEFAULT_PRIORITY = 50
DEFAULT_CHUNK_SIZE = 9999
DEFAULT_CONCURRENT_TASKS = 1


def payload_submit(
    plugin,
    plugin_data,
    batch_name,
    task_name,
    group="",
    comment="",
    priority=DEFAULT_PRIORITY,
    chunk_size=DEFAULT_CHUNK_SIZE,
    concurrent_tasks=DEFAULT_CONCURRENT_TASKS,
    frame_range=None,
    department="",
    extra_env=None,
    response_data=None,
):
    if not response_data:
        response_data = {}

    frames = "0" if not frame_range else f"{frame_range[0]}-{frame_range[1]}"
    payload = {
        "JobInfo": {
            # Top-level group name
            "BatchName": batch_name,
            # Job name, as seen in Monitor
            "Name": task_name,
            # Arbitrary username, for visualisation in Monitor
            "UserName": getpass.getuser(),
            "Priority": priority,
            "ChunkSize": chunk_size,
            "ConcurrentTasks": concurrent_tasks,
            "Department": department,
            "Pool": "",
            "SecondaryPool": "",
            "Group": group,
            "Plugin": plugin,
            "Frames": frames,
            "Comment": comment or "",
            # Optional, enable double-click to preview rendered
            # frames from Deadline Monitor
            # "OutputFilename0": preview_fname(render_path).replace("\\", "/"),
        },
        "PluginInfo": plugin_data,
        # Mandatory for Deadline, may be empty
        "AuxFiles": [],
    }
    # If a response from a previous submission was passed in, make this job
    # dependent on it and add it to the same batch
    if response_data.get("_id"):
        payload["JobInfo"].update(
            {
                "JobType": "Normal",
                "BatchName": response_data["Props"]["Batch"],
                "JobDependency0": response_data["_id"],
                "ChunkSize": 99999999,
            }
        )
    # Include critical environment variables with submission
    keys = [
        "AVALON_ASSET",
        "AVALON_TASK",
        "AVALON_PROJECT",
        "AVALON_APP_NAME",
        "OCIO",
        "USER",
        "OPENPYPE_SG_USER",
    ]

    # Add OpenPype version if we are running from build.
    if is_running_from_build():
        keys.append("OPENPYPE_VERSION")

    environment = dict(
        {key: os.environ[key] for key in keys if key in os.environ},
        **legacy_io.Session,
    )
    if extra_env:
        environment.update(extra_env)

    payload["JobInfo"].update(
        {
            "EnvironmentKeyValue%d" % index: "{key}={value}".format(
                key=key, value=environment[key]
            )
            for index, key in enumerate(environment)
        }
    )
    plugin = payload["JobInfo"]["Plugin"]
    logger.info("Using plugin: {}".format(plugin))
    logger.info("Submitting...")
    logger.info(json.dumps(payload, indent=4, sort_keys=True))

    url = "{}/api/jobs".format(constants.DEADLINE_URL)
    response = requests.post(url, json=payload, timeout=10)
    if not response.ok:
        raise Exception(response.text)

    return response.json()
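For reference, submitting e.g. a Nuke render through it would look something like this (the PluginInfo keys depend on the Deadline plugin, and the paths and names here are made up):

# Hypothetical usage; scene path and Nuke version are made up
render_job = payload_submit(
    plugin="Nuke",
    plugin_data={
        "SceneFile": "/proj/my_project/sh010/comp/sh010_comp_v001.nk",
        "Version": "14.0",
    },
    batch_name="sh010 - comp",
    task_name="Render sh010_comp_v001",
    frame_range=(1001, 1100),
)
# Passing the returned job as response_data on a follow-up submission makes
# the new job depend on it and join the same batch
publish_job = payload_submit(
    plugin="OpenPype",
    plugin_data=plugin_data,
    batch_name="sh010 - comp",
    task_name="Publish sh010_comp",
    response_data=render_job,
)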