Batch publishing (ingest)

Hello there.
I need to publish a bunch of files from a client: stills for assets, frame sequences for shots.

I’m working on a script that walks through the directory to ingest and publishes each file to the asset where it belongs.

I’m stuck on the step of actually publishing each file. From the docs here (OpenPype Commands Reference | openPYPE), I see that I could run openpype publish <PATH_TO_JSON> or PypeCommands.publish, but it’s not clear to me how that JSON file should be formed, or where it’s supposed to come from.

I’d appreciate it if somebody could point me in the right direction in terms of architecture… Docs, example scripts, any tip is welcome.

Thanks!
Santi

Hello Santi,

Is the Tray Publisher tool working for you?

or

Are you looking for Running a headless publish?

Hi Mustafa, thanks for your reply.
The Tray Publisher seems to only be able to publish to a single asset (unless I’m missing something?), and I have to publish hundreds of files, so yes, I’m looking for a headless publish solution.

Thanks for the link to BigRoy’s launch_scripts. It is a step in the right direction, though I’m a bit confused about the need to open a host like Nuke for this - it feels like overkill…?

Yes, different use case. It’s for batch publishing outputs from existing workfiles.


Tray Publisher: Editorial Workflow

I think what you might be looking for is Editorial publishing - which I think the Tray Publisher offers something for as well.

See:

However, it revolves around working with an EDL file, I believe.

Tray Publisher: Batch Movies

There’s also Batch Movies, which I believe just parses input video files by name and moves them into specific assets/subsets - I believe you can define a regex somewhere. I’m not entirely sure myself how it works, but the logic for it is here, and it has example logic to match files to existing assets in your database here.


Do you have some more details on what those hundreds of files would be? How do you have these currently structured? And how would you like them to get processed?

Do you have a single folder with 100 video files? Do you have 10 folders, each representing a shot that contains an audio file, a reference plate and matchmoved Alembic cameras? Knowing the types of files you might have and how they are structured might help in giving good feedback on how to ingest these into OpenPype as quickly as possible.

Hi BigRoy,
Yeah, more info: in this particular case, I have batches of PSD files and EXR sequences coming from the client.
My current plan is to walk through the source directory in a Python script, fuzzy-matching each source file to an existing shot/asset (fuzzy because the source naming is somewhat inconsistent).
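Something roughly like this (just a sketch - I’m assuming the openpype.client get_assets call and leaning on difflib for the fuzzy match):

import os
from difflib import get_close_matches

from openpype.client import get_assets  # OpenPype client API (assumed available)


def match_files_to_assets(project_name, source_dir):
    # Illustrative only: match each incoming file's stem against existing asset names
    asset_names = [a["name"] for a in get_assets(project_name, fields=["name"])]
    matches = {}
    for root, _dirs, files in os.walk(source_dir):
        for fname in files:
            stem = os.path.splitext(fname)[0]
            candidates = get_close_matches(stem, asset_names, n=1, cutoff=0.6)
            if candidates:
                matches[os.path.join(root, fname)] = candidates[0]
    return matches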
I have two kinds of sources:

  • PSD files, which go to an asset (task: Concept, family: image, subset: background)
  • EXR sequences, which go to a shot (task: Animation, family: render, subset: animation)

I don’t have a problem parsing the input and figuring out where to publish each file. My issue is HOW to publish it, ideally in a way that’s not too convoluted.

Thanks for the resources, will be taking a look!
S

Hey Santi,

This is exactly the same need I have in my studio, and I found the same blockers you did. I mentioned this in the last paragraph of this post of mine (Refactoring Houdini integration - #2 by fabiaserra) and also more recently here on Discord: the TrayPublisher and the existing solutions in OP don’t fit the requirements to provide these flexible workflows to batch ingest a bunch of files programmatically.

Because of that, I have now written an abstraction of the submit_publish_job plugin so that, from some simple inputs, it generates the JSON file you need to run the publish in headless mode. I basically have a function with an interface like the following:

def publish_version(
    project_name,
    asset_name,
    task_name,
    family_name,
    subset_name,
    expected_representations,
):

I can now write a separate script that does this fuzzy-matching logic: given a folder, I parse through the list of files and gather the information I need to call this publish_version function, and I can wrap this script in a CLI or a UI very easily.
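For instance, publishing one of those EXR sequences would be a call roughly like this (the values are made up; the signature is the one above):

publish_version(
    project_name="my_project",
    asset_name="sh010",
    task_name="Animation",
    family_name="render",
    subset_name="animation",
    expected_representations={"exr": "/io/incoming/sh010/sh010_render.1001.exr"},
)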

I’m still working on this today and tomorrow, and my logic will still be quite specific to my studio, but I hope I can clean this up and create a PR soon so others can make use of it.


Please do share - even when not clean. I’d be happy to take a look as well and see if I can re-use or clean things up for more generalized use cases.

Hello again.

Fabià, I couldn’t agree more with you. Headless publishing should be a lot more straightforward and abstract.

Digging around on this topic, I found @felixdavriv’s take here:

Which I’ve spliced into an addon (using example_addon as template) so I can run it as a module:
poetry run python start.py module batch_publish_addon process-directory

And it does publish to OP, though not to Kitsu - not sure why (the Tray Publisher does work as expected for single items, and it does push to Kitsu).

The business bit comes from Felix pretty much untouched:

        # Build required pyblish data
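        # (This block runs inside a loop over matched assets; it assumes `asset`
        # and `filepath` are defined upstream, and that os, pyblish.api and
        # pyblish.util are imported.)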
        os.environ["AVALON_ASSET"] = asset['name']
        context = pyblish.api.Context()
        instance = context.create_instance(name=asset['name'])
        instance.data.update(
            {
                "family": "review",
                "asset": asset['name'],
                "task": "Concept",
                "subset": "background",
                "publish": True,
                "active": True,
                "source": filepath.as_posix(),
            }
        )
        # Add representation
        ext = filepath.suffix.strip(".")
        representation = {
            "name": ext,
            "ext": ext,
            "preview": True,
            "tags": ["review"],
            "files": filepath.name,
            "stagingDir": filepath.parent,
        }
        instance.data.setdefault("representations", [])
        instance.data["representations"].append(representation)

        # Publish to OP
        context = pyblish.util.publish(context)

The only thing suspicious I see in the output is:

DEBUG:pyblish.ValidateIntent:Profile selected: {'hosts': [], 'task_types': [], 'tasks': [], 'validate': False}
DEBUG:pyblish.ValidateIntent:Validation of intent was skipped. Matching profile for current context disabled validation.
DEBUG:pyblish.ValidatePublishDir:Looking for matching profile for: hosts: "None" | families: "review" | task_names: "Concept" | task_types: "Concept"
DEBUG:pyblish.ValidatePublishDir:"None" not found in "hosts": ['standalonepublisher']
DEBUG:pyblish.ValidatePublishDir:"None" not found in "hosts": ['maya']
DEBUG:pyblish.ValidatePublishDir:"None" not found in "hosts": ['traypublisher']
DEBUG:pyblish.ValidatePublishDir:Profile selected: {'families': ['review', 'render', 'prerender'], 'hosts': [], 'task_types': [], 'task_names': [], 'template_name': 'render'}
DEBUG:pyblish.CollectComment:Instance BG_INT_Frances_ApartmentWindow  does not have set comment

Again pointing to the expectation of running from a ‘host’, @BigRoy?

Felix’s script was manually uploading to Kitsu using the gazu API, but it seems like this shouldn’t be necessary?

Again, thanks for all the help!
S

Here’s the code that I’m using right now for abstracting the OP publishes by submitting them to Deadline. Maybe this is a bit overkill for certain subset publishes that could probably go fast enough locally… but it has allowed me to write a very thin publisher layer in Houdini, and I’m now writing a batch ingest tool that uses the same modules. It’s still very much WIP and I took some shortcuts in order to have something ready faster. I was planning on cleaning it up further and creating a draft PR on GitHub, but for now some of this might help you, and it might help me get some early feedback:

Publish version abstraction:

import os
import getpass
import json

from openpype.lib import Logger
from openpype.pipeline import legacy_io

from openpype.modules.deadline import constants as dl_constants
from openpype.modules.deadline.lib import submit
from openpype.modules.delivery.scripts import utils


logger = Logger.get_logger(__name__)


REVIEW_FAMILIES = {
    "render"
}

PUBLISH_TO_SG_FAMILIES = {
    "render"
}


def publish_version(
    project_name,
    asset_name,
    task_name,
    family_name,
    subset_name,
    expected_representations,
    publish_data,
):
    # TODO: write some logic that finds the main path from the list of
    # representations
    source_path = list(expected_representations.values())[0]

    instance_data = {
        "project": project_name,
        "family": family_name,
        "subset": subset_name,
        "families": publish_data.get("families", []),
        "asset": asset_name,
        "task": task_name,
        "comment": publish_data.get("comment", ""),
        "source": source_path,
        "overrideExistingFrame": False,
        "useSequenceForReview": True,
        "colorspace": publish_data.get("colorspace"),
        "version": publish_data.get("version"),
        "outputDir": os.path.dirname(source_path),
    }

    representations = utils.get_representations(
        instance_data,
        expected_representations,
        add_review=family_name in REVIEW_FAMILIES,
        publish_to_sg=family_name in PUBLISH_TO_SG_FAMILIES,
    )
    if not representations:
        logger.error(
            "No representations could be found on expected dictionary: %s",
            expected_representations
        )
        return {}

    if family_name in REVIEW_FAMILIES:
        # inject colorspace data if we are generating a review
        for rep in representations:
            source_colorspace = publish_data.get("colorspace") or "scene_linear"
            logger.debug(
                "Setting colorspace '%s' to representation", source_colorspace
            )
            utils.set_representation_colorspace(
                rep, project_name, colorspace=source_colorspace
            )

    instance_data["frameStartHandle"] = representations[0]["frameStart"]
    instance_data["frameEndHandle"] = representations[0]["frameEnd"]

    # add representation
    instance_data["representations"] = representations
    instances = [instance_data]

    # Create farm job to run OP publish
    metadata_path = utils.create_metadata_path(instance_data)
    logger.info("Metadata path: %s", metadata_path)

    publish_args = [
        "--headless",
        "publish",
        '"{}"'.format(metadata_path),
        "--targets",
        "deadline",
        "--targets",
        "farm",
    ]

    # Create dictionary of data specific to OP plugin for payload submit
    plugin_data = {
        "Arguments": " ".join(publish_args),
        "Version": os.getenv("OPENPYPE_VERSION"),
        "SingleFrameOnly": "True",
    }

    username = getpass.getuser()

    # Submit job to Deadline
    extra_env = {
        "AVALON_PROJECT": project_name,
        "AVALON_ASSET": asset_name,
        "AVALON_TASK": task_name,
        "OPENPYPE_USERNAME": username,
        "AVALON_WORKDIR": os.path.dirname(source_path),
        "OPENPYPE_PUBLISH_JOB": "1",
        "OPENPYPE_RENDER_JOB": "0",
        "OPENPYPE_REMOTE_JOB": "0",
        "OPENPYPE_LOG_NO_COLORS": "1",
        "OPENPYPE_SG_USER": username,
    }

    deadline_task_name = "Publish {} - {} - {} - {} - {}".format(
        family_name,
        subset_name,
        task_name,
        asset_name,
        project_name
    )

    response = submit.payload_submit(
        plugin="OpenPype",
        plugin_data=plugin_data,
        batch_name=publish_data.get("jobBatchName") or deadline_task_name,
        task_name=deadline_task_name,
        group=dl_constants.OP_GROUP,
        extra_env=extra_env,
    )

    # publish job file
    publish_job = {
        "asset": instance_data["asset"],
        "frameStart": instance_data["frameStartHandle"],
        "frameEnd": instance_data["frameEndHandle"],
        "source": instance_data["source"],
        "user": getpass.getuser(),
        "version": None,  # this is workfile version
        "comment": instance_data["comment"],
        "job": {},
        "session": legacy_io.Session.copy(),
        "instances": instances,
        "deadline_publish_job_id": response.get("_id")
    }

    logger.info("Writing json file: {}".format(metadata_path))
    with open(metadata_path, "w") as f:
        json.dump(publish_job, f, indent=4, sort_keys=True)

    return response

The utils module is basically a rip-off of other OP functions, but I removed the instance object and other things so I could use them more easily. There’s also this get_representations function that I wrote pretty quickly so I’m able to create the representations from just a dictionary, where each key is the name of a representation and the value is a single path from that representation:

import clique
import re
import glob
import datetime
import getpass
import os
import requests

from openpype.lib import Logger, is_running_from_build
from openpype.pipeline import Anatomy
from openpype.pipeline.colorspace import get_imageio_config


logger = Logger.get_logger(__name__)

# Regular expression that allows us to replace the frame numbers of a file path
# with any string token
RE_FRAME_NUMBER = re.compile(
    r"(?P<prefix>^(.*)+)\.(?P<frame>\d+)\.(?P<extension>\w+\.?(sc|gz)?$)"
)


def create_metadata_path(instance_data):
    # Ensure output dir exists
    output_dir = instance_data.get(
        "publishRenderMetadataFolder", instance_data["outputDir"]
    )

    try:
        if not os.path.isdir(output_dir):
            os.makedirs(output_dir)
    except OSError:
        # directory is not available
        logger.warning("Path is unreachable: `{}`".format(output_dir))

    metadata_filename = "{}_{}_{}_metadata.json".format(
        datetime.datetime.now().strftime("%d%m%Y%H%M%S"),
        instance_data["asset"],
        instance_data["subset"]
    )

    return os.path.join(output_dir, metadata_filename)


def replace_frame_number_with_token(path, token):
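    # Example (illustrative):
    #   replace_frame_number_with_token("/tmp/renders/beauty.1001.exr", "*")
    #   -> "/tmp/renders/beauty.*.exr"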
    return RE_FRAME_NUMBER.sub(
        r"\g<prefix>.{}.\g<extension>".format(token), path
    )


def get_representations(
    instance_data,
    exp_representations,
    add_review=True,
    publish_to_sg=False
):
    """Create representations for file sequences.

    This will return representation dictionaries of expected files. There
    should be only one sequence of files for most cases, but if not - we create
    a representation for each.

    If the given file path resolves to a single file rather than a sequence,
    a single-file representation is created instead.

    Arguments:
        instance_data (dict): instance["data"] for which we are
                            setting representations
        exp_representations (dict[str:str]): Dictionary of expected
            representations that should be created. Key is name of
            representation and value is a file path to one of the files
            from the representation (i.e., "exr": "/path/to/beauty.1001.exr").

    Returns:
        list of representations

    """
    anatomy = Anatomy(instance_data["project"])

    representations = []
    for rep_name, file_path in exp_representations.items():

        rep_frame_start = None
        rep_frame_end = None
        ext = None

        # Convert file path so it can be used with glob and find all the
        # frames for the sequence
        file_pattern = replace_frame_number_with_token(file_path, "*")

        representation_files = glob.glob(file_pattern)
        collections, remainder = clique.assemble(representation_files)

        # If file path is in remainder it means it was a single file
        if file_path in remainder:
            collections = [remainder]
            frame_match = RE_FRAME_NUMBER.match(file_path)
            if frame_match:
                ext = frame_match.group("extension")
                frame = frame_match.group("frame")
                rep_frame_start = frame
                rep_frame_end = frame
            else:
                rep_frame_start = 1
                rep_frame_end = 1
                ext = os.path.splitext(file_path)[1][1:]

        elif not collections:
            logger.warning(
                "Couldn't find a collection for file pattern '%s'.",
                file_pattern
            )
            continue

        if len(collections) > 1:
            logger.warning(
                "More than one sequence find for the file pattern '%s'."
                " Using only first one: %s",
                file_pattern,
                collections,
            )
        collection = collections[0]

        if not ext:
            ext = collection.tail.lstrip(".")

        staging = os.path.dirname(list(collection)[0])
        success, rootless_staging_dir = anatomy.find_root_template_from_path(
            staging
        )
        if success:
            staging = rootless_staging_dir
        else:
            logger.warning(
                "Could not find root path for remapping '%s'."
                " This may cause issues on farm.",
                staging
            )

        if not rep_frame_start or not rep_frame_end:
            col_frame_range = list(collection.indexes)
            rep_frame_start = col_frame_range[0]
            rep_frame_end = col_frame_range[-1]

        tags = []
        if add_review:
            tags.append("review")

        if publish_to_sg:
            tags.append("shotgridreview")

        files = [os.path.basename(f) for f in list(collection)]
        # If there's only a single file in the collection, pass it as a plain
        # string instead of a list, as OP checks whether "files" is a list or
        # tuple in certain places to validate whether it's a sequence or not
        if len(files) == 1:
            files = files[0]

        rep = {
            "name": rep_name,
            "ext": ext,
            "files": files,
            "frameStart": rep_frame_start,
            "frameEnd": rep_frame_end,
            # If expectedFiles are absolute, we only need the filenames
            "stagingDir": staging,
            "fps": instance_data.get("fps"),
            "tags": tags,
        }

        if instance_data.get("multipartExr", False):
            rep["tags"].append("multipartExr")

        # support conversion from tiled to scanline
        if instance_data.get("convertToScanline"):
            logger.info("Adding scanline conversion.")
            rep["tags"].append("toScanline")

        representations.append(rep)

        solve_families(instance_data, add_review)

    return representations


def get_colorspace_settings(project_name):
    """Returns colorspace settings for project.

    Returns:
        dict | None: colorspace config data, or None if it could not be resolved
    """
    config_data = get_imageio_config(
        project_name,
        host_name="nuke",  # temporary hack as get_imageio_config doesn't support grabbing just global
    )

    # in case host color management is not enabled
    if not config_data:
        return None

    return config_data


def set_representation_colorspace(
    representation,
    project_name,
    colorspace=None,
):
    """Sets colorspace data to representation.

    Args:
        representation (dict): publishing representation
        project_name (str): Name of project
        colorspace (str, optional): colorspace name. Defaults to None.

    Example:
        ```
        {
            # for other publish plugins and loaders
            "colorspace": "linear",
            "config": {
                # for future references in case need
                "path": "/abs/path/to/config.ocio",
                # for other plugins within remote publish cases
                "template": "{project[root]}/path/to/config.ocio"
            }
        }
        ```

    """
    ext = representation["ext"]
    # check extension
    logger.debug("__ ext: `{}`".format(ext))

    config_data = get_colorspace_settings(project_name)

    if not config_data:
        # warn in case no colorspace path was defined
        logger.warning("No colorspace management was defined")
        return

    logger.debug("Config data is: `{}`".format(config_data))

    # infuse data to representation
    if colorspace:
        colorspace_data = {"colorspace": colorspace, "config": config_data}

        # update data key
        representation["colorspaceData"] = colorspace_data


def solve_families(instance_data, preview=False):
    families = instance_data.get("families", [])

    # if we have one representation with preview tag
    # flag whole instance_data for review and for ftrack
    if preview:
        if "review" not in families:
            logger.debug('Adding "review" to families because of preview tag.')
            families.append("review")
        instance_data["families"] = families

And finally the abstraction of Deadline so I can submit any plugin job using the same function:

# Imports assumed for this standalone snippet (the function lives in the
# Deadline submit lib of the fork, so exact module paths may differ)
import os
import json
import getpass

import requests

from openpype.lib import Logger, is_running_from_build
from openpype.pipeline import legacy_io
from openpype.modules.deadline import constants


logger = Logger.get_logger(__name__)

# Default Deadline job settings
DEFAULT_PRIORITY = 50
DEFAULT_CHUNK_SIZE = 9999
DEFAULT_CONCURRENT_TASKS = 1


def payload_submit(
    plugin,
    plugin_data,
    batch_name,
    task_name,
    group="",
    comment="",
    priority=DEFAULT_PRIORITY,
    chunk_size=DEFAULT_CHUNK_SIZE,
    concurrent_tasks=DEFAULT_CONCURRENT_TASKS,
    frame_range=None,
    department="",
    extra_env=None,
    response_data=None,
):
    if not response_data:
        response_data = {}

    frames = "0" if not frame_range else f"{frame_range[0]}-{frame_range[1]}"

    payload = {
        "JobInfo": {
            # Top-level group name
            "BatchName": batch_name,
            # Job name, as seen in Monitor
            "Name": task_name,
            # Arbitrary username, for visualisation in Monitor
            "UserName": getpass.getuser(),
            "Priority": priority,
            "ChunkSize": chunk_size,
            "ConcurrentTasks": concurrent_tasks,
            "Department": department,
            "Pool": "",
            "SecondaryPool": "",
            "Group": group,
            "Plugin": plugin,
            "Frames": frames,
            "Comment": comment or "",
            # Optional, enable double-click to preview rendered
            # frames from Deadline Monitor
            # "OutputFilename0": preview_fname(render_path).replace("\\", "/"),
        },
        "PluginInfo": plugin_data,
        # Mandatory for Deadline, may be empty
        "AuxFiles": [],
    }

    if response_data.get("_id"):
        payload["JobInfo"].update(
            {
                "JobType": "Normal",
                "BatchName": response_data["Props"]["Batch"],
                "JobDependency0": response_data["_id"],
                "ChunkSize": 99999999,
            }
        )

    # Include critical environment variables with submission
    keys = [
        "AVALON_ASSET",
        "AVALON_TASK",
        "AVALON_PROJECT",
        "AVALON_APP_NAME",
        "OCIO",
        "USER",
        "OPENPYPE_SG_USER",
    ]

    # Add OpenPype version if we are running from build.
    if is_running_from_build():
        keys.append("OPENPYPE_VERSION")

    environment = dict(
        {key: os.environ[key] for key in keys if key in os.environ},
        **legacy_io.Session,
    )

    if extra_env:
        environment.update(extra_env)

    payload["JobInfo"].update(
        {
            "EnvironmentKeyValue%d"
            % index: "{key}={value}".format(
                key=key, value=environment[key]
            )
            for index, key in enumerate(environment)
        }
    )

    plugin = payload["JobInfo"]["Plugin"]
    logger.info("using render plugin : {}".format(plugin))

    logger.info("Submitting..")
    logger.info(json.dumps(payload, indent=4, sort_keys=True))

    url = "{}/api/jobs".format(constants.DEADLINE_URL)
    response = requests.post(url, json=payload, timeout=10)

    if not response.ok:
        raise Exception(response.text)

    return response.json()

It would be nice to combine some of these ideas with Felix’s and write a better API that lets us choose farm or local execution easily.

If you’re running from a completely separate Python process - outside of any host or the Tray Publisher - you might need to register your own "host".

Register a Pyblish host with pyblish.api.register_host

At one level that’s mostly for the Pyblish plugins, so you can use pyblish.api.register_host similar to e.g. here.

:point_up: That’s basically also the logic that openpype publish CLI uses and is also used currently for publish jobs from Deadline. I’m pretty sure that in this case this is what you’re looking to do.
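A minimal sketch of what that could look like (the "shell" host name is just an example - pick whatever name your publish plugins expect):

import pyblish.api
import pyblish.util

# Register a plain "host" name so that host-filtered publish plugins can match
pyblish.api.register_host("shell")

# ... create your context and instances as in the snippets above ...
context = pyblish.util.publish()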

More complex, register a full OpenPype host

The new publisher can get slightly more complicated since it has a "context" that it can reset to. Usually for command-line publishing you don’t need this, but without it, ‘saving’ from the CreateContext does nothing, since it can’t really save anywhere (it doesn’t know how to save the context).

For that you’d need to install your own BaseHost / IPublishHost implementation I believe. Similar to the tray publisher.

You might get by with installing the TrayPublisherHost:

from openpype.hosts.traypublisher.api import TrayPublisherHost
from openpype.pipeline import install_host

host = TrayPublisherHost()
install_host(host)

This will then basically allow you to keep the publishing data (mostly designed for ‘resets’ and ‘updating’ of the publishing system). However, it’ll make the publishing think you are that host, and thus all plugins targeting the Tray Publisher will also trigger.

If you change the install logic inside the TrayPublisherHost (here) to do pyblish.api.register_host(self.name), then you might be able to use that full Host logic for storing the contexts without it targeting traypublisher plugins, by doing:

from openpype.hosts.traypublisher.api import TrayPublisherHost
from openpype.pipeline import install_host

class FakeHost(TrayPublisherHost):
    name = "fake"

host = FakeHost()
install_host(host)

Likely what you need just for now is the first pyblish.api.register_host I mentioned.

BigRoy that’s exactly the kind of architectural guidance that I was hoping for.
Fabià thanks for the code, I’ll take a good look.

<3

Thanks

Using the backend API I showed above - abstracting the publish_version + submit-publish-to-Deadline functions - I have now written an ingest module. Given a folder on disk, it recursively tries to find all the possible "products"/"subsets" that we support, extracts the "asset", "family", "task", "subset" and "representation" names (and, optionally, an override of the version to publish), and then runs that function to publish the files, so it’s very easy for our editorial team to ingest anything we receive. This is very specific to our studio configuration of OP and naming conventions, but I believe this is what @santi was after with this request, and the same blockers I was hitting with the TrayPublisher framework. Of course this could be written as a creator plugin in the TrayPublisher, like the BatchMovieCreator plugin, but it was much easier for me to write it this way than framing it into the TrayPublisher and pyblish framework, so I’m not limited by that UI and other constraints.
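As a rough sketch of the shape of that loop (parse_products here is a hypothetical stand-in for our studio-specific parsing logic):

def ingest_folder(project_name, folder):
    # parse_products is a hypothetical helper that walks the folder and yields
    # one dict per detected product/subset
    for product in parse_products(folder):
        publish_version(
            project_name,
            product["asset"],
            product["task"],
            product["family"],
            product["subset"],
            product["representations"],  # e.g. {"exr": ".../plate.1001.exr"}
            publish_data={"comment": "batch ingest"},
        )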

(animated GIF: screen capture of the batch ingest tool)


Hi Fabià, that GUI looks great and very production-friendly!

For my part, I went a similar route, re-writing your script as an OP module and walking a folder to find things to publish (using fuzzy matches - since the files come from outside, I don’t want to be too clever with regexes…).

I imagine the ‘matching logic’ is the bit that would change the most from studio to studio… it should be easily customizable. Would love to take a look at your code and see if we can separate the studio-specific stuff…


What the hell - that’s looking sweet! It would be epic if we could get tools like these built on the same foundation, but ease the route there a little bit. It’d be great, @milan, if some effort could go into streamlining or even merging some of the work that @fabiaserra has done.


Yeah, certainly. I think the first step should be to clean up the OP APIs to facilitate the entry points for tools like this, so we don’t duplicate all the code across the different publishers. Then this would be much cleaner as an Ayon module that calls that API, and we can add the abstractions that Santi mentioned too, to make it studio-agnostic.


We have some budget to make something like this officially supported right now and are starting the work.

@fabiaserra is your code public in your repo, or private only?


My personal GitHub repo I only use as an intermediate remote to create PRs against yours; it’s not the one we use at my company, as that one is hosted on our own GitLab server, so I only push a few cherry-picked commits to GitHub. But I can have a look and push the batch ingester code.


Here - I’ve pushed most of the changes to this branch, and this is the module that contains the batch ingester that we use: https://github.com/fabiaserra/OpenPype/tree/feature/batch_ingester/openpype/modules/ingest

Of course, a lot of things are very specific to our studio and would need extra abstraction, but let me know if you have questions.
