import torch
import torch.nn as nn
import torch_geometric.graphgym.register as register
from torch_geometric.graphgym.config import cfg
from torch_geometric.graphgym.register import register_node_encoder
[docs]
@register_node_encoder('LapPE')
class LapPENodeEncoder(torch.nn.Module):
"""Laplace Positional Embedding node encoder.
LapPE of size dim_pe will get appended to each node feature vector.
If `expand_x` set True, original node features will be first linearly
projected to (dim_emb - dim_pe) size and the concatenated with LapPE.
Parameters:
dim_emb (int): Size of final node embedding
expand_x (bool): Expand node features `x` from dim_in to (dim_emb - dim_pe)
"""
def __init__(self, dim_emb, expand_x=True):
super().__init__()
dim_in = cfg.share.dim_in # Expected original input node features dim
pecfg = cfg.posenc_LapPE
dim_pe = pecfg.dim_pe # Size of Laplace PE embedding
model_type = pecfg.model # Encoder NN model type for PEs
if model_type not in ['Transformer', 'DeepSet']:
raise ValueError(f"Unexpected PE model {model_type}")
self.model_type = model_type
n_layers = pecfg.layers # Num. layers in PE encoder model
n_heads = pecfg.n_heads # Num. attention heads in Trf PE encoder
post_n_layers = pecfg.post_layers # Num. layers to apply after pooling
max_freqs = pecfg.eigen.max_freqs # Num. eigenvectors (frequencies)
norm_type = pecfg.raw_norm_type.lower() # Raw PE normalization layer type
self.pass_as_var = pecfg.pass_as_var # Pass PE also as a separate variable
if dim_emb - dim_pe < 0: # formerly 1, but you could have zero feature size
raise ValueError(f"LapPE size {dim_pe} is too large for "
f"desired embedding size of {dim_emb}.")
if expand_x and dim_emb - dim_pe > 0:
self.linear_x = nn.Linear(dim_in, dim_emb - dim_pe)
self.expand_x = expand_x and dim_emb - dim_pe > 0
# Initial projection of eigenvalue and the node's eigenvector value
self.linear_A = nn.Linear(2, dim_pe)
if norm_type == 'batchnorm':
self.raw_norm = nn.BatchNorm1d(max_freqs)
else:
self.raw_norm = None
activation = nn.ReLU # register.act_dict[cfg.gnn.act]
if model_type == 'Transformer':
# Transformer model for LapPE
encoder_layer = nn.TransformerEncoderLayer(d_model=dim_pe,
nhead=n_heads,
batch_first=True)
self.pe_encoder = nn.TransformerEncoder(encoder_layer,
num_layers=n_layers)
else:
# DeepSet model for LapPE
layers = []
if n_layers == 1:
layers.append(activation())
else:
self.linear_A = nn.Linear(2, 2 * dim_pe)
layers.append(activation())
for _ in range(n_layers - 2):
layers.append(nn.Linear(2 * dim_pe, 2 * dim_pe))
layers.append(activation())
layers.append(nn.Linear(2 * dim_pe, dim_pe))
layers.append(activation())
self.pe_encoder = nn.Sequential(*layers)
self.post_mlp = None
if post_n_layers > 0:
# MLP to apply post pooling
layers = []
if post_n_layers == 1:
layers.append(nn.Linear(dim_pe, dim_pe))
layers.append(activation())
else:
layers.append(nn.Linear(dim_pe, 2 * dim_pe))
layers.append(activation())
for _ in range(post_n_layers - 2):
layers.append(nn.Linear(2 * dim_pe, 2 * dim_pe))
layers.append(activation())
layers.append(nn.Linear(2 * dim_pe, dim_pe))
layers.append(activation())
self.post_mlp = nn.Sequential(*layers)
def forward(self, batch):
if not (hasattr(batch, 'EigVals') and hasattr(batch, 'EigVecs')):
raise ValueError("Precomputed eigen values and vectors are "
f"required for {self.__class__.__name__}; "
"set config 'posenc_LapPE.enable' to True")
EigVals = batch.EigVals
EigVecs = batch.EigVecs
if self.training:
sign_flip = torch.rand(EigVecs.size(1), device=EigVecs.device)
sign_flip[sign_flip >= 0.5] = 1.0
sign_flip[sign_flip < 0.5] = -1.0
EigVecs = EigVecs * sign_flip.unsqueeze(0)
pos_enc = torch.cat((EigVecs.unsqueeze(2), EigVals), dim=2) # (Num nodes) x (Num Eigenvectors) x 2
empty_mask = torch.isnan(pos_enc) # (Num nodes) x (Num Eigenvectors) x 2
pos_enc[empty_mask] = 0 # (Num nodes) x (Num Eigenvectors) x 2
if self.raw_norm:
pos_enc = self.raw_norm(pos_enc)
pos_enc = self.linear_A(pos_enc) # (Num nodes) x (Num Eigenvectors) x dim_pe
# PE encoder: a Transformer or DeepSet model
if self.model_type == 'Transformer':
pos_enc = self.pe_encoder(src=pos_enc,
src_key_padding_mask=empty_mask[:, :, 0])
else:
pos_enc = self.pe_encoder(pos_enc)
# Remove masked sequences; must clone before overwriting masked elements
pos_enc = pos_enc.clone().masked_fill_(empty_mask[:, :, 0].unsqueeze(2),
0.)
# Sum pooling
pos_enc = torch.sum(pos_enc, 1, keepdim=False) # (Num nodes) x dim_pe
# MLP post pooling
if self.post_mlp is not None:
pos_enc = self.post_mlp(pos_enc) # (Num nodes) x dim_pe
# Expand node features if needed
if self.expand_x:
h = self.linear_x(batch.x)
else:
h = batch.x
# Concatenate final PEs to input embedding
batch.x = torch.cat((h, pos_enc), 1)
# Keep PE also separate in a variable (e.g. for skip connections to input)
if self.pass_as_var:
batch.pe_LapPE = pos_enc
return batch