"""Interface to Scenic."""
import importlib.metadata
import scenic
from scenic.core.distributions import needsSampling, Options
from scenic.core.vectors import Vector
from scenic.core.type_support import canCoerceType, coerce, underlyingType
# TODO unify handling of these custom types!
from scenic.simulators.utils.colors import Color
from scenic.simulators.gta.interface import CarModel as GTACarModel
from scenic.simulators.webots.road.car_models import (
CarModel as WebotsCarModel, carModels as webotsCarModels)
from verifai.features import (Constant, Categorical, Real, Box, Array, Struct,
Feature, FeatureSpace)
from verifai.samplers.feature_sampler import FeatureSampler
from verifai.utils.frozendict import frozendict
# Major version number of the installed Scenic distribution. Several choices
# below (dimension of vectors, whether Orientation exists) depend on it.
scenicMajorVersion = int(
    importlib.metadata.version('scenic').partition('.')[0]
)

# Orientation is only imported for Scenic 3 and later; for earlier versions
# the name is bound to None so that code can test for its availability.
if scenicMajorVersion < 3:
    Orientation = None
else:
    from scenic.core.vectors import Orientation

# Shared Domain instances. Other functions in this module compare Domains
# against these objects to decide how to convert sampled values.
scalarDomain = Real()
vectorDim = 3 if scenicMajorVersion >= 3 else 2
vectorDomain = Array(scalarDomain, (vectorDim,))
orientationDomain = Array(scalarDomain, (4,))   # as quaternions
gtaModelDomain = Categorical(*GTACarModel.models.values())
webotsModelDomain = Categorical(*webotsCarModels)
colorDomain = Box((0, 1), (0, 1), (0, 1))
def convertToVerifaiType(value, strict=True):
    """Attempt to convert a Scenic value to a type known to VerifAI.

    The conversion is chosen from the value's underlying type, trying the
    following cases in order:

    * ``float``/``int``: converted with ``float()``;
    * ``list``/``tuple``: a tuple of the recursively converted elements;
    * a ``dict`` subclass which does not need sampling: a ``frozendict``;
    * GTA car models, Webots car models, and ``Color``: returned unchanged;
    * anything coercible to ``Vector``: a tuple built from the coerced Vector;
    * an ``Orientation`` instance (when that type is available): a tuple
      built from ``value.getRotation().as_quat()``.

    Args:
        value: the Scenic value to convert.
        strict: whether a value of unknown type is an error (the default) or
            is simply returned unchanged.

    Raises:
        RuntimeError: if the type is unknown and ``strict`` is true.
    """
    ty = underlyingType(value)

    if ty is float or ty is int:
        return float(value)
    if ty is list or ty is tuple:
        return tuple(
            convertToVerifaiType(item, strict=strict) for item in value
        )
    if issubclass(ty, dict) and not needsSampling(value):
        return frozendict(value)
    # Custom types which VerifAI can already handle as-is.
    if ty is GTACarModel or ty is WebotsCarModel or ty is Color:
        return value
    if canCoerceType(ty, Vector):
        return tuple(coerce(value, Vector))
    # Orientation is None when the installed Scenic does not provide it.
    if Orientation and isinstance(value, Orientation):
        return tuple(value.getRotation().as_quat())

    # Unknown type, so give up if we're being strict
    if strict:
        raise RuntimeError(
            f'attempted to convert Scenic value {value} of unknown type {ty}')
    return value
def domainForValue(value):
    """Return a Domain for this type of Scenic value, when possible.

    Args:
        value: a Scenic value, which may or may not require sampling.

    Returns:
        A ``Constant`` wrapping the (converted, where possible) value if it
        does not need sampling. Otherwise the Domain matching the value's
        underlying type, or None if no suitable Domain is known.
    """
    ty = underlyingType(value)

    domain = None   # stays None if no corresponding Domain is known
    if ty is float or ty is int:
        domain = scalarDomain
    elif ty is GTACarModel:
        domain = gtaModelDomain
    elif ty is WebotsCarModel:
        domain = webotsModelDomain
    elif ty is Color:
        domain = colorDomain
    elif canCoerceType(ty, Vector):
        domain = vectorDomain
    elif ty is Orientation:
        domain = orientationDomain
    elif ty is str and isinstance(value, Options):
        # We can only handle strings when they come from a finite set of
        # possibilities; we heuristically detect that by checking for an
        # Options distribution. Any other string-valued distribution is left
        # without a Domain, since we can't ensure its domain is finite.
        domain = Categorical(*value.options)

    if needsSampling(value):
        return domain

    # We can handle constants of unknown types, but when possible we
    # convert the value to a VerifAI type.
    return Constant(convertToVerifaiType(value, strict=False))
def pointForValue(dom, scenicValue):
    """Convert a sampled Scenic value to a point in the corresponding Domain.

    Args:
        dom: the Domain the value belongs to; either a ``Constant``, a
            ``Categorical``, or a Domain equal to one of the module-level
            scalar, vector, orientation, or color Domains.
        scenicValue: the concrete value sampled by Scenic.

    Returns:
        The value converted to the representation used for points of ``dom``.

    Raises:
        RuntimeError: if a scalar Domain is given a value which is not a
            float or int, or if ``dom`` is not one of the handled Domains.
        AssertionError: if the value is inconsistent with a ``Constant``,
            ``Categorical``, or color Domain.
    """
    if isinstance(dom, Constant):
        converted = convertToVerifaiType(scenicValue, strict=False)
        assert converted == dom.value
        return converted

    if isinstance(dom, Categorical):
        converted = convertToVerifaiType(scenicValue, strict=False)
        assert converted in dom.values
        return converted

    if dom == scalarDomain:
        if isinstance(scenicValue, (float, int)):
            return coerce(scenicValue, float)
        raise RuntimeError(
            f'could not coerce Scenic value {scenicValue} to scalar')

    if dom == vectorDomain:
        return tuple(coerce(scenicValue, Vector))

    if dom == orientationDomain:
        return tuple(scenicValue.getRotation().as_quat())

    if dom == colorDomain:
        # Colors are passed through unchanged once we've checked their shape.
        assert isinstance(scenicValue, (tuple, list))
        assert len(scenicValue) == 3
        return scenicValue

    raise RuntimeError(
        f'Scenic value {scenicValue} has unexpected domain {dom}')
#: Scenic global parameters not included in generated samples
ignoredParameters = {
    'externalSampler',
    'externalSamplerRejectionFeedback',
    'verifaiSamplerType',
    'verifaiSamplerParams',
    'behavior',
}

#: Scenic object properties not included in generated samples
defaultIgnoredProperties = {
    'shape',
    'viewAngle',
    'viewAngles',
    'visibleDistance',
    'cameraOffset',
    'viewRayDensity',
    'viewRayCount',
    'viewRayDistanceScaling',
    'baseOffset',
    'contactTolerance',
    'onDirection',
    'sideComponentThresholds',
    'allowCollisions',
    'requireVisible',
    'regionContainedIn',
    'occluding',
    'showVisibleRegion',
    'mutator',
    'mutationEnabled',
    'mutationScale',
    'positionStdDev',
    'headingStdDev',
    'orientationStdDev',
    'behavior',
    'lastActions',
}

# Certain built-in properties require type normalization before Scenic 3;
# this maps each such property to the type its value is coerced to. From
# Scenic 3 onwards no property is normalized.
if scenicMajorVersion < 3:
    normalizedProperties = {
        'position': Vector,
        'heading': float,
    }
else:
    normalizedProperties = {}

# Hard-coded Domains for certain properties, used in place of the Domain that
# would otherwise be derived from the property's value.
specialDomainProperties = {
    'webotsType': Categorical(*[model.name for model in webotsCarModels]),
    'color': colorDomain,   # to allow Colors, tuples, or lists
}
def domainForObject(obj, ignoredProperties):
    """Construct a Domain for the given Scenic Object.

    Args:
        obj: the Scenic object; its ``properties`` attribute lists the names
            of the properties to consider.
        ignoredProperties: names of properties to leave out. Properties whose
            names start with an underscore are always left out.

    Returns:
        A ``Struct`` with one sub-domain per included property, plus a
        ``type`` entry holding the name of the object's class. Properties for
        which no Domain can be found are skipped with a printed warning.
    """
    domains = {}
    for prop in obj.properties:
        if prop in ignoredProperties or prop.startswith('_'):
            continue

        value = getattr(obj, prop)
        targetType = normalizedProperties.get(prop)
        if targetType is not None:
            value = coerce(value, targetType)

        # TODO improve this system... (need to get better type info in Scenic)
        # The hard-coded Domain is only used for values which are actually
        # random; constants go through the generic path like everything else.
        useSpecial = prop in specialDomainProperties and needsSampling(value)
        if useSpecial:
            dom = specialDomainProperties[prop]
        else:
            dom = domainForValue(value)

        if dom is None:
            ty = underlyingType(value)
            print(f'WARNING: skipping property "{prop}" of unknown type {ty}')
            continue
        domains[prop] = dom

    # add type as additional property
    typeDomain = domainForValue(type(obj).__name__)
    assert typeDomain is not None
    assert 'type' not in domains
    domains['type'] = typeDomain
    return Struct(domains)
def pointForObject(dom, obj):
    """Convert a sampled Scenic Object to a point in its Domain.

    Args:
        dom: the Domain for the object; its ``domainNamed`` mapping gives the
            sub-domain of each property.
        obj: the sampled Scenic object.

    Returns:
        The point made by ``dom.makePoint`` from the converted property
        values, passed in the iteration order of ``dom.domainNamed``.
    """
    def sampledValue(prop, subdom):
        # special case for 'type' pseudo-property
        if prop == 'type':
            return type(obj).__name__
        # work around bug in Scenic 2.1
        if (prop == 'speed'
                and isinstance(subdom, Constant)
                and scenicMajorVersion < 3):
            return subdom.value
        return getattr(obj, prop)

    values = [
        pointForValue(subdom, sampledValue(prop, subdom))
        for prop, subdom in dom.domainNamed.items()
    ]
    return dom.makePoint(*values)
def spaceForScenario(scenario, ignoredProperties):
    """Construct a FeatureSpace for the given Scenic Scenario.

    Args:
        scenario: the Scenic scenario; its ``objects`` and ``params`` are
            used, and its ego object must be the first object.
        ignoredProperties: names of object properties to leave out of the
            Domains built for the scenario's objects.

    Returns:
        A pair ``(space, quotedParams)``. ``space`` is a ``FeatureSpace`` with
        two features, ``objects`` and ``params``. ``quotedParams`` maps each
        replacement name used in ``params`` back to the original name of a
        global parameter whose name is not a valid Python identifier.
        Parameters listed in ``ignoredParameters`` are omitted, and those for
        which no Domain can be found are skipped with a printed warning.
    """
    # create domains for objects
    assert scenario.egoObject is scenario.objects[0]
    objects = Struct({
        ScenicSampler.nameForObject(i): domainForObject(obj, ignoredProperties)
        for i, obj in enumerate(scenario.objects)
    })

    # create domains for global parameters
    paramDoms = {}
    quotedParams = {}
    # Names a replacement must not reuse: every real parameter name, plus
    # every replacement name handed out so far. Without this check, a
    # scenario with a parameter literally called e.g. 'quoted_param0' would
    # have one of the two Domains silently overwritten.
    reservedNames = set(scenario.params)
    for param, value in scenario.params.items():
        if param in ignoredParameters:
            continue
        dom = domainForValue(value)
        if dom is None:
            ty = underlyingType(value)
            print(f'WARNING: skipping param "{param}" of unknown type {ty}')
            continue
        if not param.isidentifier():    # munge quoted parameter names
            # Start from the historical numbering so that names are unchanged
            # whenever there is no collision.
            index = len(quotedParams)
            newparam = f'quoted_param{index}'
            while newparam in reservedNames:
                index += 1
                newparam = f'quoted_param{index}'
            reservedNames.add(newparam)
            quotedParams[newparam] = param
            param = newparam
        paramDoms[param] = dom
    params = Struct(paramDoms)

    space = FeatureSpace({
        'objects': Feature(objects),
        'params': Feature(params)
    })
    return space, quotedParams
class ScenicSampler(FeatureSampler):
    """Samples from the induced distribution of a Scenic scenario.

    Created using the `fromScenario` and `fromScenicCode` class methods.

    See :ref:`scene generation` in the Scenic documentation for details of how
    Scenic's sampler works. Note that VerifAI's other samplers can be used from
    within a Scenic scenario by defining :term:`external parameters`.
    """

    def __init__(self, scenario, maxIterations=None, ignoredProperties=None):
        """Build a sampler for an already-compiled Scenic scenario.

        Args:
            scenario: the Scenic scenario to sample from.
            maxIterations (int): maximum number of rejection sampling
                iterations (default 2000).
            ignoredProperties: properties of Scenic objects to not include in
                generated samples (``defaultIgnoredProperties`` if None).
        """
        self.scenario = scenario
        self.maxIterations = 2000 if maxIterations is None else maxIterations
        # Most recently generated scene; set by nextSample.
        self.lastScene = None
        if ignoredProperties is None:
            ignoredProperties = defaultIgnoredProperties
        # quotedParams maps replacement names in the feature space back to
        # the original names of parameters which are not valid identifiers.
        space, self.quotedParams = spaceForScenario(scenario, ignoredProperties)
        super().__init__(space)

    @classmethod
    def fromScenario(cls, path, maxIterations=None,
                     ignoredProperties=None, **kwargs):
        """Create a sampler corresponding to a Scenic program.

        The only required argument is ``path``, and ``maxIterations`` may be useful if
        your scenario requires a very large number of rejection sampling iterations.
        See `scenic.scenarioFromFile` for details on optional keyword arguments used to
        customize compilation of the Scenic file.

        Args:
            path (str): path to a Scenic file.
            maxIterations (int): maximum number of rejection sampling iterations
                (default 2000).
            ignoredProperties: properties of Scenic objects to not include in
                generated samples (see ``defaultIgnoredProperties`` for the default).
            kwargs: additional keyword arguments passed to `scenic.scenarioFromFile`;
                e.g. ``params`` to override global parameters or ``model`` to set the
                :term:`world model`.
        """
        scenario = scenic.scenarioFromFile(path, **kwargs)
        return cls(scenario, maxIterations=maxIterations,
                   ignoredProperties=ignoredProperties)

    @classmethod
    def fromScenicCode(cls, code, maxIterations=None,
                       ignoredProperties=None, **kwargs):
        """Create a sampler from a Scenic program given as a string.

        Takes the same arguments as `fromScenario`, except that ``code`` is
        the text of the program rather than a path, and ``kwargs`` are passed
        to `scenic.scenarioFromString`.
        """
        scenario = scenic.scenarioFromString(code, **kwargs)
        return cls(scenario, maxIterations=maxIterations,
                   ignoredProperties=ignoredProperties)

    def nextSample(self, feedback=None):
        """Generate a new scene and return it as a point in our feature space.

        The generated scene is also stored in ``lastScene``.

        Args:
            feedback: passed through to the scenario's ``generate`` method.
        """
        ret = self.scenario.generate(
            maxIterations=self.maxIterations, feedback=feedback, verbosity=0
        )
        # generate returns a pair; only the scene (first element) is needed.
        self.lastScene, _ = ret
        return self.pointForScene(self.lastScene)

    def pointForScene(self, scene):
        """Convert a sampled Scenic :obj:`~scenic.core.scenarios.Scene` to a point in our feature space.

        The `FeatureSpace` used by this sampler consists of 2 features:

        * ``objects``, which is a `Struct` consisting of attributes ``object0``,
          ``object1``, etc. with the properties of the corresponding objects
          in the Scenic program. The names of these attributes may change in a
          future version of VerifAI: use the `nameForObject` function to
          generate them.
        * ``params``, which is a `Struct` storing the values of the
          :term:`global parameters` of the Scenic program (use
          `paramDictForSample` to extract them).
        """
        lengths, dom = self.space.domains
        assert lengths is None
        assert scene.egoObject is scene.objects[0]

        objDomain = dom.domainNamed['objects']
        assert len(objDomain.domains) == len(scene.objects)
        objects = {
            self.nameForObject(i):
                pointForObject(objDomain.domainNamed[self.nameForObject(i)], obj)
            for i, obj in enumerate(scene.objects)
        }
        objPoint = objDomain.makePoint(**objects)

        paramDomain = dom.domainNamed['params']
        params = {}
        for param, subdom in paramDomain.domainNamed.items():
            # Parameters with non-identifier names are stored in the feature
            # space under a replacement name; look up the scene's value using
            # the original name.
            originalName = self.quotedParams.get(param, param)
            params[param] = pointForValue(subdom, scene.params[originalName])
        paramPoint = paramDomain.makePoint(**params)

        return self.space.makePoint(objects=objPoint, params=paramPoint)

    @staticmethod
    def nameForObject(i):
        """Name used in the `FeatureSpace` for the Scenic object with index i.

        That is, if ``scene`` is a :obj:`~scenic.core.scenarios.Scene`, the object
        ``scene.objects[i]``.
        """
        return f'object{i}'

    def paramDictForSample(self, sample):
        """Recover the dict of :term:`global parameters` from a `ScenicSampler` sample.

        Parameters stored in the sample under a replacement name are returned
        under their original names.
        """
        params = sample.params._asdict()
        corrected = {}
        for newName, quotedParam in self.quotedParams.items():
            corrected[quotedParam] = params.pop(newName)
        # Whatever remains was stored under its real name already.
        corrected.update(params)
        return corrected