nnsight.intervention#

exception nnsight.intervention.interleaver.Cancelation[source]#

Exception raised when a request is canceled.

exception nnsight.intervention.interleaver.EarlyStopException[source]#

Exception raised to stop the execution of the model.
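
The idea of stopping a forward pass with an exception can be pictured with the sketch below. It is not nnsight's code: `StopEarly`, `run_layers`, and the list of lambdas are hypothetical stand-ins, assuming only that the exception is raised partway through execution and caught by whatever started the run.

```python
class StopEarly(Exception):
    """Hypothetical stand-in for EarlyStopException."""


def run_layers(layers, x, stop_after=None):
    """Apply layers in order; optionally abort after `stop_after` layers."""
    executed = []
    try:
        for index, layer in enumerate(layers):
            x = layer(x)
            executed.append(index)
            if stop_after is not None and index + 1 == stop_after:
                raise StopEarly()
    except StopEarly:
        # The caller swallows the exception; the remaining layers never run.
        pass
    return x, executed


layers = [lambda v: v + 1, lambda v: v * 2, lambda v: v - 3]
result, executed = run_layers(layers, 1, stop_after=2)
```

Using an exception lets the stop propagate out of arbitrarily nested calls without every layer having to check a flag.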

class nnsight.intervention.interleaver.Events(*values)[source]#

Enum for different types of events in the interleaving process.

class nnsight.intervention.interleaver.Interleaver(mediators: List[Mediator] = [], tracer: InterleavingTracer = None, batcher: Batcher = None)[source]#

Manages the interleaving of model execution and interventions.

This class coordinates the flow between the model’s forward pass and user-defined intervention functions, allowing for inspection and modification of intermediate values.

mediators#

A dictionary mapping mediator names to mediator objects. Each mediator is responsible for a single invoke, or intervention function.

Type:

Dict[str, Mediator]

tracer#

The tracer object that created this interleaver. Occasionally useful for determining the tracer type for this interleaving.

Type:

Optional[InterleavingTracer]

batcher#

The batcher object that manages the slice of inputs associated with each mediator.

Type:

Batcher

current#

The current mediator that is being processed. Must be updated before resuming a given mediator.

Type:

Mediator

cancel()[source]#

Cancel all mediators / intervention threads.

check_cache_full()[source]#

Print a warning if a module to be cached was missed.

handle(provider: Any | None = None, value: Any | None = None, iterate: bool = False)[source]#

Handle a provider’s value, allowing mediators to consume and modify it.

Parameters:
  • provider (Optional[Any]) – The identifier of the provider

  • value (Optional[Any]) – The value being provided

  • iterate (bool) – Whether to iterate the provider string

Returns:

The original or modified value

Return type:

Any
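
A rough way to read `handle` is as a pipeline: the provided value is offered to each consumer, and any of them may replace it before it is returned to the model. The sketch below illustrates that reading only. `handle_sketch` and the consumer signature are hypothetical, and the real method works with mediators and events rather than plain callables.

```python
from typing import Any, Callable, List, Optional

# Hypothetical consumer signature: takes (provider, value) and returns
# a replacement value, or None to leave the value untouched.
Consumer = Callable[[str, Any], Optional[Any]]


def handle_sketch(consumers: List[Consumer], provider: str, value: Any) -> Any:
    """Offer `value` to each consumer in turn and return the final value."""
    for consumer in consumers:
        replacement = consumer(provider, value)
        if replacement is not None:
            value = replacement
    return value


seen = []


def double_layer0(provider, value):
    # Only modifies the value it is interested in.
    return value * 2 if provider == "layer0.output" else None


def observe(provider, value):
    # Inspects the value without changing it.
    seen.append((provider, value))
    return None


out = handle_sketch([double_layer0, observe], "layer0.output", 21)
```

Because consumers run in order, a later consumer sees the modification made by an earlier one.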

property interleaving: bool#

Check if the interleaver is currently interleaving.

Returns:

True if the interleaver is interleaving, False otherwise

Return type:

bool

iterate_provider(provider: str)[source]#

Update a provider string to include which iteration of the provider is being provided.

Parameters:

provider (str) – The provider string to update

Returns:

The updated provider string

Return type:

str

Example

>>> provider = "model.transformer.h[0].input"
>>> iterate_provider(provider)
"model.transformer.h[0].input.i0"
>>> provider = "model.transformer.h[0].input"
>>> iterate_provider(provider)
"model.transformer.h[0].input.i1"
>>> provider = "model.transformer.h[0].input"
>>> iterate_provider(provider)
"model.transformer.h[0].input.i2"

iterate_requester(requester: str)[source]#

Update a requester string to include which iteration of the requester is being requested. This is determined by the current mediator’s iteration attribute, or influenced by .iter contexts.

Parameters:

requester (str) – The requester string to update

Returns:

The updated requester string

Return type:

str
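
The two suffixing methods can be pictured together: the provider side counts how many times a name has been provided, while the requester side appends whichever iteration is currently wanted, so a request matches only the corresponding pass. The class below is a minimal sketch under that assumption. `IterationSketch` is a hypothetical helper, and it keeps the counter in one place for brevity, whereas this reference documents `iteration_tracker` and `iteration` as Mediator attributes.

```python
from collections import defaultdict


class IterationSketch:
    """Hypothetical helper, not the nnsight Interleaver."""

    def __init__(self):
        self.seen = defaultdict(int)  # provider -> times provided so far
        self.iteration = 0            # iteration the current mediator wants

    def iterate_provider(self, provider: str) -> str:
        # Suffix with the number of earlier occurrences, then bump the count.
        count = self.seen[provider]
        self.seen[provider] += 1
        return f"{provider}.i{count}"

    def iterate_requester(self, requester: str) -> str:
        # Suffix with the iteration currently being requested.
        return f"{requester}.i{self.iteration}"


sketch = IterationSketch()
first = sketch.iterate_provider("model.transformer.h[0].input")
second = sketch.iterate_provider("model.transformer.h[0].input")
sketch.iteration = 1
wanted = sketch.iterate_requester("model.transformer.h[0].input")
```

Here `wanted` equals `second`, so a request made for iteration 1 is satisfied by the second time the module input is provided.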

wrap_module(module: Module)[source]#

Instruments a PyTorch module to intercept inputs and outputs for interleaving.

Parameters:

module (torch.nn.Module) – The module to instrument

Returns:

None

wrap_operation(fn: Callable, name: str, bound_obj: Any | None = None)[source]#

Wrap an operation to intercept inputs and outputs for intervention, as well as the function itself. Used by Envoy.source to hook into intermediate operations of a forward pass.

Parameters:
  • fn (Callable) – The intermediate operation function to wrap

  • name (str) – The fully qualified name of the operation

  • bound_obj (Optional[Any]) – The object fn is bound to if it is a method

Returns:

A wrapped version of the function

Return type:

Callable
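
Wrapping an operation so that its inputs and output pass through a hook can be sketched in plain Python. This is an illustration of the general technique, not the library's implementation: `wrap_operation_sketch`, its `hook` parameter, and the `.input` / `.output` suffixes used here are assumptions, and the real method also accepts `bound_obj`.

```python
import functools
from typing import Any, Callable, List, Tuple

log: List[Tuple[str, Any]] = []


def wrap_operation_sketch(fn: Callable, name: str, hook: Callable[[str, Any], Any]) -> Callable:
    """Return a wrapper that passes inputs and output through `hook`."""

    @functools.wraps(fn)
    def wrapped(*args, **kwargs):
        # The hook may return modified (args, kwargs).
        args, kwargs = hook(f"{name}.input", (args, kwargs))
        output = fn(*args, **kwargs)
        # The hook may return a modified output.
        return hook(f"{name}.output", output)

    return wrapped


def record(provider: str, value: Any) -> Any:
    log.append((provider, value))
    return value  # returning a different value here would replace it


def scale(x, factor=2):
    return x * factor


wrapped_scale = wrap_operation_sketch(scale, "block.scale", record)
result = wrapped_scale(3, factor=5)
```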

class nnsight.intervention.interleaver.Mediator(intervention: Callable, info: Tracer.Info, name: str | None = None, batch_group: List[int] | None = None, stop: int | None = None)[source]#

Mediates between the model execution and a single intervention function.

This class handles the communication between the model’s forward pass and a user-defined intervention function, allowing for inspection and modification of intermediate values.

interleaver#

The interleaver that this mediator is currently running in

Type:

Interleaver

intervention#

The intervention function to mediate

Type:

Callable

info#

Information about the tracing context associated with this mediator

Type:

Tracer.Info

name#

Optional name for the mediator

Type:

Optional[str]

batch_group#

Optional batch group for the mediator, used to determine which slice of tensors is being intervened on

Type:

Optional[List[int]]

event_queue#

Where the mediator (worker thread) puts events to be processed by the interleaver (main thread). Will only ever have 1 or 0 items in the queue.

Type:

SimpleQueue

response_queue#

Where the interleaver (main thread) puts responses to events, to then be processed by the mediator (worker thread). Will only ever have 1 or 0 items in the queue.

Type:

SimpleQueue
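
The two-queue handshake described for `event_queue` and `response_queue` can be sketched with the standard library alone. The event names, the `values` dictionary, and the `intervention` function are hypothetical; only the pattern (worker puts one event and blocks, main thread answers it) follows the attribute descriptions above.

```python
import threading
from queue import SimpleQueue

event_queue = SimpleQueue()     # worker thread -> main thread
response_queue = SimpleQueue()  # main thread -> worker thread
results = []


def intervention():
    """Runs on the worker thread; blocks until the main thread answers."""
    for name in ("layer0.output", "layer1.output"):
        event_queue.put(("value", name))
        results.append(response_queue.get())
    event_queue.put(("end", None))


# Hypothetical values the "model" would provide.
values = {"layer0.output": 10, "layer1.output": 20}

worker = threading.Thread(target=intervention, daemon=True)
worker.start()

while True:
    event, requester = event_queue.get()  # wait for the worker's next event
    if event == "end":
        break
    response_queue.put(values[requester])

worker.join()
```

Because each side blocks on `get()` after its own `put()`, only one thread makes progress at a time and each queue never holds more than one item.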

worker#

The thread that runs the intervention function

Type:

Thread

history#

A set of providers that have been seen by the mediator. Used to detect out-of-order interventions.

Type:

Set[str]

iteration_tracker#

A dictionary tracking the number of times each provider has been seen by the mediator.

Type:

Dict[str, int]

iteration#

The current iteration this mediator is intervening on

Type:

int

user_cache#

A list of caches to be used by the mediator

Type:

List[Cache]

all_stop#

Optional number of times to execute this mediator

Type:

Optional[int]

exception OutOfOrderError[source]#

Exception raised when interventions are defined out of order.
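
One way a `history` set can reveal an out-of-order intervention is sketched below: if a request names a provider that has already gone by, it can never be fulfilled in this pass. `OutOfOrder` and `serve` are hypothetical names, and the real check may differ in detail.

```python
class OutOfOrder(Exception):
    """Hypothetical stand-in for Mediator.OutOfOrderError."""


def serve(requests, providers):
    """Match each request against providers in execution order."""
    history = set()          # providers that have already been seen
    served = []
    stream = iter(providers)
    for requester in requests:
        if requester in history:
            # The provider already passed; the request came too late.
            raise OutOfOrder(f"{requester} was already provided")
        for provider in stream:
            history.add(provider)
            if provider == requester:
                served.append(provider)
                break
    return served


in_order = serve(["a", "b"], ["a", "b", "c"])
```

Requesting `"b"` and then `"a"` against the same provider order raises `OutOfOrder`, because `"a"` is already in the history by the time it is asked for.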

cancel()[source]#

Cancel the intervention thread and its ephemeral state.

end()[source]#

Signal that execution should continue without further intervention.

exception(exception: Exception)[source]#

Signal that an exception occurred during intervention.

Parameters:

exception – The exception that occurred

property frame: FrameType#

Get the frame of the intervention function.

Returns:

The frame of the intervention function

handle(provider: str | None = None)[source]#

Process a provider and its value. Depending on which event this mediator is waiting on, it will do one of the following:
  • Respond with the value

  • Swap (replace) the value with a new value

  • Respond with an out of order error

  • Skip the value

  • Set a barrier

  • End the execution (cancels the mediator)

Parameters:

provider (Optional[str]) – The identifier of the provider
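
The dispatch described above can be sketched as a function that resolves one pending event against the value currently being provided. Everything here is illustrative: the `Event` member names are inferred from the handler names in this reference, only three of the listed cases are covered, and the `(value, response, fulfilled)` return shape is an assumption made for the sketch.

```python
from enum import Enum, auto


class Event(Enum):
    # Hypothetical member names inferred from the handler method names.
    VALUE = auto()
    SWAP = auto()
    END = auto()


def dispatch(pending, provider, value):
    """Resolve one pending event against a provided value.

    `pending` is an (event, requester, payload) tuple.
    Returns (value, response, fulfilled).
    """
    event, requester, payload = pending
    if event is Event.END:
        return value, None, True
    if requester != provider:
        return value, None, False  # not for this provider; keep waiting
    if event is Event.VALUE:
        return value, value, True  # respond with the current value
    if event is Event.SWAP:
        return payload, None, True  # replace the value
    raise ValueError(f"unhandled event: {event}")


read = dispatch((Event.VALUE, "a", None), "a", 1)
swapped = dispatch((Event.SWAP, "a", 9), "a", 1)
waiting = dispatch((Event.VALUE, "b", None), "a", 1)
```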

handle_barrier_event(provider: Any, participants: Set[str])[source]#

Handle a barrier event by setting a barrier.

handle_end_event()[source]#

Handle an end event by stopping the mediator.

handle_exception_event(exception: Exception)[source]#

Handle an exception event by raising the exception.

Parameters:

exception (Exception) – The exception to raise

Returns:

Flag to stop processing events.

Return type:

bool

handle_swap_event(provider: Any, requester: Any, swap_value: Any)[source]#

Handle a swap event by swapping the value if the provider matches the requester.

Parameters:
  • requester (str) – The identifier of the requester

  • provider (str) – The identifier of the provider

  • swap_value (Any) – The value to swap in

Returns:

Whether the swap was fulfilled by this processor. If so, continue processing events.

Return type:

bool

handle_value_event(requester: Any, provider: Any) bool[source]#

Handle a value event by providing the requested value or recording a missed provider.

Parameters:
  • requester (str) – The identifier of the requester

  • provider (str) – The identifier of the provider

Returns:

Whether the request was fulfilled by this processor. If so, continue processing events.

Return type:

bool

pull()[source]#

Pull variables from the interleaver state to the frame globals.

push()[source]#

Push local variables to the interleaver state.
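
Sharing variables between intervention functions through a common state can be sketched with frame introspection. This is an assumption-laden illustration: `state`, `push_sketch`, and `pull_sketch` are hypothetical, `sys._getframe` is CPython-specific, and a plain dict stands in for the target frame's globals.

```python
import sys

state = {}  # hypothetical shared state standing in for the interleaver's


def push_sketch():
    """Copy the caller's local variables into the shared state."""
    caller = sys._getframe(1)  # CPython-specific frame access
    state.update(dict(caller.f_locals))


def pull_sketch(namespace):
    """Copy the shared state into a namespace dict (e.g. a frame's f_globals)."""
    namespace.update(state)


def first_intervention():
    hidden = [1, 2, 3]
    scale = 0.5
    push_sketch()


first_intervention()
target_globals = {}
pull_sketch(target_globals)
```

After the call, `target_globals` holds `hidden` and `scale`, so code running against that namespace could read the values the first function defined.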

request(requester: str)[source]#

Request a value from a specific provider.

Parameters:

requester (str) – The identifier of the provider to request a value from

Returns:

The requested value

Return type:

Any

respond(value: Any | None = None)[source]#

Send a response from the interleaver (main thread) to the mediator (worker thread), carrying the value for a pending event.

Parameters:

value (Optional[Any]) – The value to provide

send(event: Events, requester: Any)[source]#

Send an event to the interleaver (main thread) from this mediator (worker thread), and wait for the interleaver to process it.

Parameters:
  • event (Events) – The event to send

  • requester (Any) – The identifier of the requester, plus any additional data for the event.

Returns:

The response from the provider

Return type:

Any

set_user_cache(cache: Cache)[source]#

Set the user cache for this mediator.

Parameters:

cache – The cache to set

start(interleaver: Interleaver)[source]#

Start the mediator’s intervention thread.

Parameters:

interleaver (Interleaver) – The interleaver managing this mediator

stop()[source]#

Stop the execution of the model by raising an EarlyStopException.

swap(requester: str, value: Any)[source]#

Send a swap event to replace the value of a provider.

Parameters:
  • requester (str) – The identifier of the requester

  • value (Any) – The value to swap in

exception nnsight.intervention.interleaver.SkipException(value: Any)[source]#

Exception raised to skip the execution of the model.
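
Skipping a computation by raising an exception that carries a value can be sketched as follows. The reading that `value` is the substitute output is an assumption based on the constructor signature; `Skip`, `call_with_skip`, and the `before` hook are hypothetical names.

```python
class Skip(Exception):
    """Hypothetical stand-in for SkipException; carries a substitute output."""

    def __init__(self, value):
        super().__init__("skip")
        self.value = value


def call_with_skip(fn, x, before=None):
    """Call `fn(x)` unless `before(x)` raises Skip, whose value is returned instead."""
    try:
        if before is not None:
            before(x)
        return fn(x)
    except Skip as skip:
        return skip.value


calls = []


def expensive(x):
    calls.append(x)
    return x ** 2


def skip_with_zero(x):
    raise Skip(0)


normal = call_with_skip(expensive, 4)
skipped = call_with_skip(expensive, 4, before=skip_with_zero)
```

In the second call `expensive` never runs, and the value carried by the exception is returned in place of its output.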

class nnsight.intervention.serialization.CustomCloudPickler(file, protocol=None, buffer_callback=None)[source]#

class nnsight.intervention.serialization.CustomCloudUnpickler(file, root: Envoy, frame: FrameType)[source]#

load()[source]#

Load a pickle.

Read a pickled object representation from the open file object given in the constructor, and return the reconstituted object hierarchy specified therein.
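
A common reason for an unpickler to take extra constructor arguments is to resolve references against live objects at load time. The sketch below shows that pattern with the standard library's persistent-ID hooks. Whether `CustomCloudUnpickler` uses `root` and `frame` this way is an assumption; `Placeholder`, `ContextPickler`, and `ContextUnpickler` are hypothetical.

```python
import io
import pickle


class Placeholder:
    """Hypothetical object that should be stored by reference, not by value."""

    def __init__(self, path):
        self.path = path


class ContextPickler(pickle.Pickler):
    def persistent_id(self, obj):
        # Store only a reference for placeholders; pickle everything else normally.
        if isinstance(obj, Placeholder):
            return ("placeholder", obj.path)
        return None


class ContextUnpickler(pickle.Unpickler):
    def __init__(self, file, root):
        super().__init__(file)
        self.root = root  # context used to resolve references on load

    def persistent_load(self, pid):
        kind, path = pid
        if kind != "placeholder":
            raise pickle.UnpicklingError(f"unknown persistent id: {pid!r}")
        return self.root[path]


buffer = io.BytesIO()
ContextPickler(buffer).dump({"layer": Placeholder("model.h.0"), "k": 3})
buffer.seek(0)
root = {"model.h.0": "live module object"}
loaded = ContextUnpickler(buffer, root).load()
```

The pickled bytes contain only the path string, and the object returned on load is whatever the receiving side's `root` maps that path to.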

class nnsight.intervention.inject.FunctionCallWrapper(name: str)[source]#

get_name(node: Call)[source]#

Extract and index function name from a Call node.
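
Extracting a function name from an `ast.Call` node and giving repeated calls distinct indices can be sketched with the standard `ast` module. The `_N` suffix format and the helper names `dotted_name` and `indexed_call_names` are assumptions for illustration; the library's actual naming scheme is not described here.

```python
import ast
from collections import defaultdict


def dotted_name(func: ast.expr) -> str:
    """Return a dotted name for the callee of a Call node."""
    if isinstance(func, ast.Name):
        return func.id
    if isinstance(func, ast.Attribute):
        return f"{dotted_name(func.value)}.{func.attr}"
    return "<expr>"  # e.g. a call on a subscript or another call


def indexed_call_names(source: str):
    """Name every call in `source`, suffixing an occurrence index."""
    counts = defaultdict(int)
    names = []
    # ast.walk yields nodes in no specified order, so the indices reflect
    # visiting order rather than source order.
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Call):
            base = dotted_name(node.func)
            names.append(f"{base}_{counts[base]}")
            counts[base] += 1
    return names


names = indexed_call_names("y = torch.relu(f(x))\nz = f(y)")
```

The index keeps the two calls to `f` distinguishable, which matters when each call site needs its own hook name.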