Federating Speckle Models

Merge 3 - The Subtle One.

This third option is the longest and most intricate, but it best demonstrates how the essence of Speckle works.

Speckle has long been praised for bringing the concept of object-level versioning and immutability to AEC, and rightly so. What is less well understood, until you get into the weeds (or the docs), is how to leverage the mechanics behind the Speckle magic.

Once an object has been sent to Speckle, its uniqueness is the property that matters most to our Connectors. Change any property of the source object, and the next time it is sent, a new Speckle object is created. Both exist; the newer one is the latest and, in all likelihood, appears in a Version Commit alongside the latest versions of its partner objects.
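To make the "change a property, get a new object" rule concrete, here is a minimal sketch of content-derived ids. It assumes, as the serializer code later in this post does, that an id is the first 32 hex characters of a SHA-256 over the serialized object. The `content_id` helper and the sample walls are made up for illustration, the standard `json` module stands in for `ujson`, and sorting the keys is my own choice to keep the sketch deterministic.

```python
import hashlib
import json


def content_id(obj: dict) -> str:
    """Return a 32-character id derived purely from the object's content."""
    payload = json.dumps(obj, sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest()[:32]


wall = {"speckle_type": "Wall", "height": 3.0}
same_wall = {"height": 3.0, "speckle_type": "Wall"}  # same content, different key order
taller_wall = {"speckle_type": "Wall", "height": 3.2}

print(content_id(wall) == content_id(same_wall))    # True: identical content, identical id
print(content_id(wall) == content_id(taller_wall))  # False: one property changed
```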

However, if that object doesn’t change, then none of the Connectors send it again. A commit may include it in the “latest” set, but it is not sent. Instead, Speckle can use the originally sent object and include a reference to it in its place, known as a ReferenceObject. You can read all about the philosophy behind this in our documentation.
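The "send once, reference afterwards" behaviour can be sketched with a toy in-memory store. Nothing here is the real server or transport API: `ToyStore` and `reference_to` are hypothetical names, and the only detail borrowed from this post is the shape of a reference (a `speckle_type` of `reference` plus a `referencedId`).

```python
import hashlib
import json


class ToyStore:
    """In-memory stand-in for a server: objects are keyed by their content hash."""

    def __init__(self):
        self.objects = {}
        self.writes = 0  # how many objects were actually stored

    def send(self, obj: dict) -> str:
        payload = json.dumps(obj, sort_keys=True).encode()
        obj_id = hashlib.sha256(payload).hexdigest()[:32]
        if obj_id not in self.objects:  # unchanged objects are never written twice
            self.objects[obj_id] = obj
            self.writes += 1
        return obj_id


def reference_to(obj_id: str) -> dict:
    """A lightweight pointer that stands in for the full object."""
    return {"speckle_type": "reference", "referencedId": obj_id}


store = ToyStore()
first = store.send({"speckle_type": "Wall", "height": 3.0})
second = store.send({"speckle_type": "Wall", "height": 3.0})  # same content again

print(first == second, store.writes)  # True 1
print(reference_to(first)["referencedId"] == first)  # True
```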

Why mention all of that? Well, we’ll use ReferenceObjects to gather the commits from earlier and show that the commit contains all the reference material.

I’ll reuse some of the objects we defined in Merge 2.

referenced_objects = [
    client.commit.get(stream_id, commit_id).referencedObject
    for commit_id in commit_ids
]

We can create a new Federation class, essentially just adding a name for the collection. (almost what you asked @Dickels112 - you can see we listen)

from specklepy.objects import Base

class Federation(Base, speckle_type="Federation"):
    def __init__(self, **kwargs):
        # let Base set itself up before we add our own property
        super().__init__(**kwargs)
        self["Components"] = []

new_commit_object = Base(speckle_type="Federation")

I use Components to mean the building blocks of the Federation

new_commit_object["Components"] = [
    # referenced_objects holds object ids (not commit ids), one per commit
    Base.of_type("reference", referencedId=object_id)
    for object_id in referenced_objects
]

This was incredibly simple, and for the most part, we are done. We define a commit as a Federation and add ReferenceObjects as its Components. As far as Speckle data is concerned, that commit, once sent, “contains” every object in the three referenced commits.

Merge 3b - The Gotcha

For the Viewer to resolve this commit, it will require the “closure table” for each referenced object. A closure table is a shortcut: a flat lookup of every descendant object's id and its depth, so nothing has to walk the tree to find them. Essentially, we provide the Viewer with a telephone directory (remember them?) of all the child objects.
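As a rough picture of what a closure table holds, the sketch below flattens a small parent-to-children map into an id-to-depth lookup. It is a simplification under my own assumptions: in Speckle the table is produced by the serializer as it detaches objects, whereas `build_closure` and the ids here are invented for illustration.

```python
def build_closure(children: dict, root: str) -> dict:
    """Map every descendant id of `root` to its shallowest depth below it."""
    closure = {}
    frontier = [(child, 1) for child in children.get(root, [])]
    while frontier:
        node, depth = frontier.pop(0)  # breadth-first, so the first visit is the shallowest
        if node in closure:
            continue
        closure[node] = depth
        frontier.extend((child, depth + 1) for child in children.get(node, []))
    return closure


# hypothetical ids: a commit object with two walls, one of which owns a mesh
children = {"commit": ["wall-a", "wall-b"], "wall-a": ["mesh-1"]}
print(build_closure(children, "commit"))
# {'wall-a': 1, 'wall-b': 1, 'mesh-1': 2}
```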

This doesn’t come for free, but we can add a custom operation to our script to get this from the server.

Ideally, we’d check the localTransport first to see if we have the closure table already, but we’ll get it by querying the server for brevity.
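The "check locally first" idea could look something like the sketch below. A plain dict stands in for the local transport and `fetch` stands in for the server query, so `get_closures_cached` and `fake_fetch` are hypothetical names rather than specklepy API.

```python
from typing import Callable, Dict

ClosureTable = Dict[str, int]


def get_closures_cached(
    object_id: str,
    cache: Dict[str, ClosureTable],
    fetch: Callable[[str], ClosureTable],
) -> ClosureTable:
    """Return the closure table from `cache`, calling `fetch` only on a miss."""
    if object_id not in cache:
        cache[object_id] = fetch(object_id)
    return cache[object_id]


calls = []


def fake_fetch(object_id: str) -> ClosureTable:
    """Stand-in for the server query; records each call so we can count them."""
    calls.append(object_id)
    return {"child-of-" + object_id: 1}


cache: Dict[str, ClosureTable] = {}
get_closures_cached("abc", cache, fake_fetch)
get_closures_cached("abc", cache, fake_fetch)  # served from the cache
print(len(calls))  # 1
```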

We’ll make a straight GraphQL query for the commit object. Below is a helper function that returns the closure table for a given object_id.

from gql import gql, Client
from gql.transport.requests import RequestsHTTPTransport


def get_closures(wrapper, object_id):

    # define a graphQL client
    client = Client(
        transport=RequestsHTTPTransport(
            url=f"{wrapper._account.serverInfo.url}/graphql", verify=True, retries=3
        )
    )

    # define the query
    query = gql(
        """ query Object($stream_id: String!, $object_id: String!) { 
            stream(id: $stream_id) { 
              object(id: $object_id) { 
                data 
              }
            }
          } """
    )
    params = {"stream_id": wrapper.stream_id, "object_id": object_id}

    # Execute the query and profit.
    return client.execute(query, variable_values=params)["stream"]["object"]["data"][
        "__closure"
    ]

To describe what this query asks for: for the given Stream and Object (by which we mean a commit object), return the data property. Commit objects don’t typically contain much data, but one property they possess is the __closure table, written by the Connector that made the commit in the first place. If we commit our Federation object as it is, the specklepy SDK won’t create that table for us.
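The helper above indexes straight into the response, which raises a bare KeyError or TypeError if the object is missing or has no closure. A slightly more defensive unpacking might look like this sketch; `extract_closure` is a hypothetical name, and the sample responses are hand-written in the shape the query returns.

```python
def extract_closure(response: dict) -> dict:
    """Pull the `__closure` table out of a `stream.object.data` response."""
    obj = (response.get("stream") or {}).get("object")
    if obj is None:
        raise LookupError("object not found on this stream")
    # an object with no closure carries no table, so fall back to an empty one
    return (obj.get("data") or {}).get("__closure") or {}


# hypothetical responses, shaped like the result of the query in get_closures
with_table = {"stream": {"object": {"data": {"id": "abc", "__closure": {"def": 1, "ghi": 2}}}}}
without_table = {"stream": {"object": {"data": {"id": "abc"}}}}

print(extract_closure(with_table))     # {'def': 1, 'ghi': 2}
print(extract_closure(without_table))  # {}
```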

So, the new_commit_object will need the __closure table from each commit we are merging. We can use the get_closures function we created earlier to get this.

At this point, we could refactor to always using Lists rather than numbered variables, but for now, we’ll add the closures to the new commit object.

closures = {
    k: v
    for d in [get_closures(wrappers[0], obj_id) for obj_id in referenced_objects]
    for k, v in d.items()
}
closures.update(dict.fromkeys(referenced_objects, 1))

new_commit_object["__closure"] = closures
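To see what that comprehension does to overlapping tables, here is the same merge written as a small function and run on made-up data. `merge_closures` is a hypothetical name; the behaviour it mirrors (a later table overwrites an earlier one on a shared id, then each referenced object is registered at depth 1) is simply what the code above does.

```python
def merge_closures(tables: list, roots: list) -> dict:
    """Union several closure tables, then register each root at depth 1."""
    merged = {}
    for table in tables:
        merged.update(table)  # on a shared id the later table wins
    merged.update(dict.fromkeys(roots, 1))
    return merged


# hypothetical ids: two commits that share one object ("shared")
tables = [{"a1": 1, "shared": 2}, {"b1": 1, "shared": 3}]
print(merge_closures(tables, ["commit-a", "commit-b"]))
# {'a1': 1, 'shared': 3, 'b1': 1, 'commit-a': 1, 'commit-b': 1}
```

Whether the smallest depth should win on a clash instead is a design choice this post does not settle.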

I will reuse the helper function from Merge 2 to check if a ‘Federation’ branch exists and, if not, create it.

branch = try_get_branch_or_create(client, stream_id, "federated-by-reference")

As before, we can send the commit object to the Speckle Server and get its hash back.

hash_2 = operations.send(base=new_commit_object, transports=[transport])

All done…?
…No! This doesn’t work: the default specklepy traversal strips props with the __ prefix, and it does not resolve the closure for ReferenceObjects. So we’ll need to add a custom operation to our script to fix this.

from typing import Any, Dict, List, Optional, Tuple
from specklepy.serialization.base_object_serializer import BaseObjectSerializer
from uuid import uuid4
import hashlib
import re
from enum import Enum
from specklepy.objects.base import Base, DataChunk
import ujson

PRIMITIVES = (int, float, str, bool)


def traverse_base(
    serializer: BaseObjectSerializer,
    base: Base,
    closures: Optional[Dict[str, Any]] = None,
) -> Tuple[str, Dict[str, Any]]:
    if serializer.write_transports:
        for wt in serializer.write_transports:
            wt.begin_write()

    if not serializer.detach_lineage:
        serializer.detach_lineage = [True]

    serializer.lineage.append(uuid4().hex)
    object_builder = {"id": "", "speckle_type": "Base", "totalChildrenCount": 0}
    object_builder.update(speckle_type=base.speckle_type)
    obj, props = base, base.get_serializable_attributes()

    while props:
        prop = props.pop(0)
        value = getattr(obj, prop, None)
        chunkable = False
        detach = False

        # skip props marked to be ignored with "__" or "_"
        if prop.startswith(("__", "_")):
            continue

        # don't prepopulate id as this will mess up hashing
        if prop == "id":
            continue

        # only bother with chunking and detaching if there is a write transport
        if serializer.write_transports:
            dynamic_chunk_match = prop.startswith("@") and re.match(
                r"^@\((\d*)\)", prop
            )
            if dynamic_chunk_match:
                chunk_size = dynamic_chunk_match.groups()[0]
                # chunk sizes live on the Base object, not on the serializer
                base._chunkable[prop] = (
                    int(chunk_size) if chunk_size else base._chunk_size_default
                )

            chunkable = prop in base._chunkable
            detach = bool(
                prop.startswith("@") or prop in base._detachable or chunkable
            )

        # 1. handle None and primitives (ints, floats, strings, and bools)
        if value is None or isinstance(value, PRIMITIVES):
            object_builder[prop] = value
            continue

        # NOTE: for dynamic props, this won't be re-serialised as an enum but as an int
        if isinstance(value, Enum):
            object_builder[prop] = value.value
            continue

        # 2. handle Base objects
        elif isinstance(value, Base):
            child_obj = serializer.traverse_value(value, detach=detach)
            if detach and serializer.write_transports:
                ref_id = child_obj["id"]
                object_builder[prop] = serializer.detach_helper(ref_id=ref_id)
            else:
                object_builder[prop] = child_obj

        # 3. handle chunkable props
        elif chunkable and serializer.write_transports:
            chunks = []
            max_size = base._chunkable[prop]
            chunk = DataChunk()
            for count, item in enumerate(value):
                if count and count % max_size == 0:
                    chunks.append(chunk)
                    chunk = DataChunk()
                chunk.data.append(item)
            chunks.append(chunk)

            chunk_refs = []
            for c in chunks:
                serializer.detach_lineage.append(detach)
                ref_id, _ = serializer._traverse_base(c)
                ref_obj = serializer.detach_helper(ref_id=ref_id)
                chunk_refs.append(ref_obj)
            object_builder[prop] = chunk_refs

        # 4. handle all other cases
        else:
            child_obj = serializer.traverse_value(value, detach)
            object_builder[prop] = child_obj

    # every prop has been handled; now build the closure for this object
    closure = {}
    # add closures & children count to the object
    detached = serializer.detach_lineage.pop()
    if serializer.lineage[-1] in serializer.family_tree:
        closure = {
            ref: depth - len(serializer.detach_lineage)
            for ref, depth in serializer.family_tree[serializer.lineage[-1]].items()
        }

    ############ ADDING OUR MAGIC HERE #################################
    closure.update(closures or {})

    object_builder["totalChildrenCount"] = len(closure)

    obj_id = hashlib.sha256(ujson.dumps(object_builder).encode()).hexdigest()[:32]

    object_builder["id"] = obj_id
    if closure:
        object_builder["__closure"] = serializer.closure_table[obj_id] = closure

    # write detached or root objects to transports
    if detached and serializer.write_transports:
        for t in serializer.write_transports:
            t.save_object(id=obj_id, serialized_object=ujson.dumps(object_builder))

    del serializer.lineage[-1]

    if serializer.write_transports:
        for wt in serializer.write_transports:
            wt.end_write()

    return obj_id, object_builder

WOW. What was that? It is a modified form of the traverse_base method of the BaseObjectSerializer in specklepy. Ordinarily you don’t need to worry about the BaseObjectSerializer, as operations.send drives it for you.

The version above extracts the function from the serializer class and adds the ability to pass in custom closures (because, by default, it won’t make any for a commit made purely of ReferenceObjects).
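Stripped of detaching and chunking, the idea boils down to the toy below. It is only a sketch: `toy_serialize` is a hypothetical function over plain dicts, `json` stands in for `ujson`, and the id is hashed before the closure is attached because that is the order the full version uses.

```python
import hashlib
import json
from typing import Optional


def toy_serialize(props: dict, extra_closure: Optional[dict] = None) -> dict:
    """Serialize `props`, skipping underscore-prefixed keys, then attach a closure."""
    built = {"id": "", "totalChildrenCount": 0}
    for name, value in props.items():
        if name.startswith("_") or name == "id":
            continue  # the same rule that drops a hand-set "__closure"
        built[name] = value

    closure = dict(extra_closure or {})  # the injection point
    built["totalChildrenCount"] = len(closure)
    built["id"] = hashlib.sha256(json.dumps(built).encode()).hexdigest()[:32]
    if closure:
        built["__closure"] = closure  # added after hashing, as in the full version
    return built


federation = {"Components": [{"referencedId": "abc"}], "__closure": {"abc": 1}}

print("__closure" in toy_serialize(federation))            # False: stripped
print(toy_serialize(federation, {"abc": 1})["__closure"])  # {'abc': 1}
```

The full traverse_base above does the same thing while also handling detached children and chunked lists.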

We can use that modified method by passing it the closures and a standard BaseObjectSerializer instance.

serializer = BaseObjectSerializer(write_transports=[transport])

obj_id, serialized_object = traverse_base(serializer, new_commit_object, closures)

It isn’t necessary, but I have returned the serialized_object for inspection purposes. print()ing it shows what we have achieved:

{'id': '5e9ac0017b74034997dbe5fa45714a90',
 'speckle_type': 'Base',
 'totalChildrenCount': 482,
 'Components': [{'id': '8ca84c1c0447b4caaed8b622dad90263',
   'speckle_type': 'reference',
   'totalChildrenCount': 0,
   'applicationId': None,
   'referencedId': 'f048873d78d8833e1a2c0d7c2391a9bb',
   'units': None},
  {'id': 'e4b7f1ace651fa8a899d4860a0572af6',
   'speckle_type': 'reference',
   'totalChildrenCount': 0,
   'applicationId': None,
   'referencedId': 'de61f36d6a4c6b9713e445ab4d801ea9',
   'units': None},
  {'id': '5d1c1e466dd4df7ae76c7c9183b4317f',
   'speckle_type': 'reference',
   'totalChildrenCount': 0,
   'applicationId': None,
   'referencedId': '90f505f7625cd121e99af6e81a1a1013',
   'units': None}],
 '__closure': {'0042e47be89ba7af3cd0344012dd44fb': 6,
  '0225bdfc617ae2e2cfa3182e5f319026': 8,
  '03ab601e5a6e7743dbada875bd634a3d': 3,
  '04849987174c213dcfba897757bcf4b4': 6,
  '04b68bc41ce7aa7e58e088e997193684': 5,
  '062f59e346ab9ba7f59d60a46b4e421a': 4,
  '085d6f93043117211d14fbf9d5443b6a': 6,
  '09514b6698a1bd2eb1416cf67ffd0f7a': 6,

... SNIP 100s of object ids... 

  'de61f36d6a4c6b9713e445ab4d801ea9': 1,
  '90f505f7625cd121e99af6e81a1a1013': 1}}

There’s that telephone directory. The Speckle viewer loves it :heart:
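Before creating the commit, a quick consistency check on the serialized object can catch a missing directory entry. This is a sketch under my own assumptions about what "consistent" means here (every referencedId appears in __closure, and totalChildrenCount equals the closure size); `check_federation` is a hypothetical helper, and the sample is a trimmed-down, made-up object in the same shape as the printout above.

```python
def check_federation(serialized: dict) -> list:
    """Return a list of problems found; an empty list means the object looks consistent."""
    problems = []
    closure = serialized.get("__closure", {})
    if serialized.get("totalChildrenCount") != len(closure):
        problems.append("totalChildrenCount does not match the closure size")
    for component in serialized.get("Components", []):
        ref = component.get("referencedId")
        if ref not in closure:
            problems.append(f"{ref} is referenced but missing from __closure")
    return problems


# hypothetical object: "def" is referenced but has no closure entry
sample = {
    "totalChildrenCount": 2,
    "Components": [
        {"speckle_type": "reference", "referencedId": "abc"},
        {"speckle_type": "reference", "referencedId": "def"},
    ],
    "__closure": {"abc": 1},
}
for problem in check_federation(sample):
    print(problem)
```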

We can race to the end now:

commit_id2 = client.commit.create(
    branch_name=branch.name,
    stream_id=stream_id,
    object_id=obj_id,
    message="federated commit",
)

Once again we build the embed URL and display it.

embed_url2 = f"https://speckle.xyz/embed?stream={stream_id}&commit={commit_id2}&transparent={transparency}&autoload={autoload}&hidecontrols={hide_controls}&hidesidebar={hide_sidebar}&hideselectioninfo={hide_selection_info}"

from IPython.display import IFrame

IFrame(embed_url2, width=400, height=300)
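If the long f-string gets unwieldy, the query string can be assembled with the standard library instead. This sketch assumes the flags are held as Python booleans and that the embed page expects lowercase true/false; neither is stated in this post. `build_embed_url` is a hypothetical helper and the ids are placeholders.

```python
from urllib.parse import urlencode


def build_embed_url(server: str, stream_id: str, commit_id: str, **flags: bool) -> str:
    """Assemble an embed URL, writing each boolean flag as lowercase true/false."""
    params = {"stream": stream_id, "commit": commit_id}
    params.update({name: str(value).lower() for name, value in flags.items()})
    return f"{server}/embed?{urlencode(params)}"


# placeholder ids, not real streams or commits
url = build_embed_url(
    "https://speckle.xyz", "STREAM_ID", "COMMIT_ID", transparent=False, autoload=True
)
print(url)
# https://speckle.xyz/embed?stream=STREAM_ID&commit=COMMIT_ID&transparent=false&autoload=true
```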

Wrapping up.

This federation is quite simple and quite clunky, and it doesn’t de-dupe at all, as it does not even examine the content of the individual commits.

To do anything approaching de-duplication, we need to revisit Merge 2:

  • load the child members of each commit
  • have a strategy for de-duping
  • have a strategy for merging
  • have a strategy for filtering
  • have a strategy for handling any other conflicts
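As one possible starting point for the de-duping step, the sketch below keeps the first object seen for each id across several commits. It is only one strategy among many, works on plain dicts rather than Speckle objects, and `merge_members` plus the sample data are invented for illustration.

```python
def merge_members(commits: list, key: str = "id") -> list:
    """Flatten members from several commits, keeping the first object seen per key."""
    seen = set()
    merged = []
    for members in commits:
        for obj in members:
            marker = obj.get(key)
            if marker is not None and marker in seen:
                continue  # duplicate: an earlier commit already supplied this object
            seen.add(marker)
            merged.append(obj)
    return merged


# hypothetical members of two commits that share one column
commit_a = [{"id": "col-1", "type": "Column"}, {"id": "wall-1", "type": "Wall"}]
commit_b = [{"id": "col-1", "type": "Column"}, {"id": "slab-1", "type": "Slab"}]
print([obj["id"] for obj in merge_members([commit_a, commit_b])])
# ['col-1', 'wall-1', 'slab-1']
```

Passing a different `key` (for example an application-level id, if the objects carry one) changes what counts as a duplicate.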