# Source code for gt4sd.training_pipelines.regression_transformer.implementation

#
# MIT License
#
# Copyright (c) 2022 GT4SD team
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
"""Regression Transformer training implementation."""
import json
import logging
import os
import shutil
import tempfile
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

from terminator.collators import TRAIN_COLLATORS
from terminator.tokenization import ExpressionBertTokenizer
from terminator.trainer import CustomTrainer, get_trainer_dict
from transformers import (
    CONFIG_MAPPING,
    MODEL_WITH_LM_HEAD_MAPPING,
    AutoConfig,
    AutoModelWithLMHead,
    DataCollatorForPermutationLanguageModeling,
    LineByLineTextDataset,
    set_seed,
)

from ..core import TrainingPipeline, TrainingPipelineArguments
from .utils import (
    get_hf_training_arg_object,
    get_train_config_dict,
    prepare_datasets_from_files,
)

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

MODEL_CONFIG_CLASSES = list(MODEL_WITH_LM_HEAD_MAPPING.keys())  # type: ignore
MODEL_TYPES = tuple(conf.model_type for conf in MODEL_CONFIG_CLASSES)


class RegressionTransformerTrainingPipeline(TrainingPipeline):
    """RegressionTransformer training pipeline."""
    def train(  # type: ignore
        self,
        training_args: Dict[str, Any],
        model_args: Dict[str, Any],
        dataset_args: Dict[str, Any],
    ) -> None:
        """Generic training function for training a Regression Transformer (RT) model.

        For details see:
            Born, J., & Manica, M. (2023). Regression Transformer enables concurrent
            sequence regression and generation for molecular language modelling.
            `Nature Machine Intelligence`, 5(4), 432-444.

        Args:
            training_args: training arguments passed to the configuration.
            model_args: model arguments passed to the configuration.
            dataset_args: dataset arguments passed to the configuration.
        """
        try:
            params = {**training_args, **dataset_args, **model_args}

            # Setup logging
            logging.basicConfig(
                format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
                datefmt="%m/%d/%Y %H:%M:%S",
                level=logging.INFO,
            )
            if not params["do_train"]:
                logger.info("Nothing to do.")
                return

            training_name = params.get("training_name", "rt_training")
            logger.info(f"Model with name {training_name} starts.")

            self.setup_model(params)

            # Register train_dataset and eval_dataset
            self.train_dataset, self.test_dataset = self.setup_dataset(**params)
            logger.info(
                f"# samples: {len(self.train_dataset)}, {len(self.test_dataset)}."
            )

            # Model logging
            num_params = sum(
                p.numel() for p in self.model.parameters() if p.requires_grad
            )
            # Compare the config's model-type string (e.g. "xlnet"), not the
            # model class itself, which would never equal a string.
            typ = self.model.config.model_type
            logger.info(f"# parameters: {num_params}. Model: {typ}")
            if typ != "xlnet":
                logger.warning(f"Full functionality only with XLNet; not {typ}")

            # Setup training configuration
            self.model.resize_token_embeddings(len(self.tokenizer))

            if training_args["alternate_steps"] <= 0:
                # No alternation of training objectives means: vanilla PLM training
                data_collator = DataCollatorForPermutationLanguageModeling(
                    tokenizer=self.tokenizer,
                    plm_probability=training_args["plm_probability"],
                    max_span_length=training_args["max_span_length"],
                )
                alternating_collator = None
            else:
                data_collator = TRAIN_COLLATORS["property"](
                    tokenizer=self.tokenizer,
                    property_tokens=self.properties,
                    num_tokens_to_mask=None,
                    mask_token_order=None,
                )
                alternating_collator = TRAIN_COLLATORS[training_args["cg_collator"]](
                    tokenizer=self.tokenizer,
                    property_tokens=self.properties,
                    plm_probability=training_args["plm_probability"],
                    max_span_length=training_args["max_span_length"],
                    do_sample=False,
                    entity_separator_token=training_args["entity_separator_token"],
                    mask_entity_separator=training_args["mask_entity_separator"],
                    entity_to_mask=training_args["entity_to_mask"],
                )

            # Initialize our Trainer
            train_config = get_train_config_dict(training_args, set(self.properties))
            os.makedirs(params["output_dir"], exist_ok=True)
            with open(
                os.path.join(params["output_dir"], "training_config.json"), "w"
            ) as f:
                json.dump(train_config, f, indent="\t")

            # Create the inference.json
            inference_dict = {
                "property_token": self.properties,
                "example": self.example_sample,
                "property_ranges": {
                    p.name: [p.minimum, p.maximum]
                    for p in self.property_objects.values()
                },
                "normalize": [False] * len(self.properties),  # True not supported atm
                "max_span_length": training_args["max_span_length"],
                "property_mask_length": {
                    p.name: p.mask_length for p in self.property_objects.values()
                },
            }
            with open(os.path.join(params["output_dir"], "inference.json"), "w") as f:
                json.dump(inference_dict, f, indent="\t")

            custom_trainer_params = get_trainer_dict(self.model_params)
            hf_train_object = get_hf_training_arg_object(training_args)
            trainer = CustomTrainer(
                model=self.model,
                args=hf_train_object,
                data_collator=data_collator,
                train_dataset=self.train_dataset,
                eval_dataset=self.test_dataset,
                tokenizer=self.tokenizer,
                alternating_collator=alternating_collator,
                train_config=train_config,
                **custom_trainer_params,
            )
            trainer.train(model_path=params["output_dir"])
            trainer.save_model()  # type: ignore
        except Exception:
            logger.exception(
                "Exception occurred while running RegressionTransformerTrainingPipeline."
            )
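
    # Interpretive note (added; based on the collator setup in `train` above):
    # with `alternate_steps <= 0`, training uses plain permutation language
    # modeling (PLM) only. Otherwise, e.g. with `alternate_steps = 8`, the
    # `CustomTrainer` from `terminator` presumably switches every 8 steps
    # between the "property" collator (masking property tokens, i.e. the
    # regression objective) and the configured `cg_collator` (masking sequence
    # entities, i.e. the conditional-generation objective).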
    def setup_model(self, params: Dict[str, Any]):
        """
        Error handling and training setup routine.

        Args:
            params: A dictionary with all parameters to launch training.

        Raises:
            ValueError: If flawed values are passed.
        """
        if params.get("test_data_path") is None and params["do_eval"]:
            raise ValueError(
                "Cannot do evaluation without an evaluation data file. Either supply a "
                "file to --test_data_path or remove the --do_eval argument."
            )
        if params["output_dir"] is None:
            raise ValueError(
                "You have to specify an output directory for the trained model."
            )
        if (
            os.path.exists(params["output_dir"])
            and os.listdir(params["output_dir"])
            and not params["overwrite_output_dir"]
        ):
            raise ValueError(
                f"Output directory ({params['output_dir']}) exists and is not empty. "
                "Use --overwrite_output_dir to overcome."
            )

        # Set seed
        set_seed(params["seed"])

        # Load model configuration.
        # Priorities:
        # 1) If provided, take it from the model path: `model_path`.
        # 2) If provided, take model configuration from `config_name`.
        # 3) Instantiate a fresh model.
        if params["model_path"] is None and params["model_type"] is None:
            raise ValueError(
                "Either pass a pretrained model via `model_path` or specify "
                "which model to use via `model_type`."
            )

        if params["model_path"]:
            config = AutoConfig.from_pretrained(
                params["model_path"],
                cache_dir=params["cache_dir"],
            )
            self.model_params = config.__dict__
            self.model = AutoModelWithLMHead.from_pretrained(
                params["model_path"],
                from_tf=False,
                config=config,
                cache_dir=params["cache_dir"],
            )
            logger.info(f"Model restored from {params['model_path']}")
        elif params["config_name"] is not None:
            with open(params["config_name"], "r") as f:
                self.model_params = json.load(f)
            config = AutoConfig.from_pretrained(
                params["config_name"],
                cache_dir=params["cache_dir"],
                mem_len=self.model_params.get("mem_len", 1024),
            )
        else:
            config = CONFIG_MAPPING[params["model_type"]]()
            self.model_params = config.__dict__
            logger.warning(
                f"Instantiating a new config instance: {params['model_type']}."
            )

        # Load tokenizer
        if params["model_path"]:
            # If model_path was provided we load the tokenizer from there
            self.tokenizer = ExpressionBertTokenizer.from_pretrained(
                params["model_path"], cache_dir=params["cache_dir"]
            )
        elif params["tokenizer_name"]:
            self.tokenizer = ExpressionBertTokenizer.from_pretrained(
                params["tokenizer_name"], cache_dir=params["cache_dir"]
            )
        else:
            raise ValueError(
                f"No support for creating a new tokenizer for: {params['model_type']}."
            )

        if not params["model_path"]:
            logger.info("Training new model from scratch")
            self.model = AutoModelWithLMHead.from_config(config)
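
    # The three configuration modes handled by `setup_model`, as hypothetical
    # `params` fragments (all other required keys omitted for brevity):
    #
    #   1) Restore everything from artifacts:
    #        {"model_path": "path/to/model", ...}
    #   2) Fresh model from a custom config:
    #        {"model_path": None, "config_name": "config.json",
    #         "tokenizer_name": "path/to/tokenizer", ...}
    #   3) Fresh model from a default config:
    #        {"model_path": None, "config_name": None, "model_type": "xlnet",
    #         "tokenizer_name": "path/to/tokenizer", ...}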
    def setup_dataset(
        self,
        train_data_path: str,
        test_data_path: str,
        augment: int = 0,
        save_datasets: bool = False,
        *args,
        **kwargs,
    ):
        """
        Constructs the dataset objects.

        Args:
            train_data_path: Path to a `.csv` file. Has to have a `text` column
                and at least one column of numerical properties.
            test_data_path: Path to a `.csv` file. Has to have a `text` column
                and at least one column of numerical properties.
            augment: How many times each training sample is augmented.
            save_datasets: Whether to save the datasets to disk (will be stored
                in the same location as `train_data_path` and `test_data_path`).

        Returns:
            A tuple of train and test dataset.
        """
        logger.info("Preparing/reading data...")
        tokenizer, properties, train_data, test_data = prepare_datasets_from_files(
            self.tokenizer, train_data_path, test_data_path, augment=augment
        )
        self.tokenizer, self.property_objects = tokenizer, properties
        self.properties = list(properties.keys())
        self.example_sample = train_data[0]

        train_dataset = self.create_dataset_from_list(
            train_data,
            save_path=train_data_path.replace(".csv", ".txt")
            if save_datasets
            else None,
        )
        test_dataset = self.create_dataset_from_list(
            test_data,
            save_path=test_data_path.replace(".csv", ".txt")
            if save_datasets
            else None,
        )
        logger.info("Finished data setup.")
        return train_dataset, test_dataset
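
    # Illustrative layout of the `.csv` files consumed by `setup_dataset`.
    # Column names other than `text` are free-form property names; the values
    # below are made up:
    #
    #   text,qed
    #   CCO,0.41
    #   c1ccccc1,0.44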
    def create_dataset_from_list(
        self, data: List[str], save_path: Optional[str] = None
    ) -> LineByLineTextDataset:
        """
        Creates a LineByLineTextDataset from a list of strings.

        Args:
            data: List of strings with the samples.
            save_path: Path to save the dataset to. Defaults to None, meaning
                the dataset will not be saved.

        Returns:
            The dataset.
        """
        # Write the samples to a temporary location and create the dataset
        with tempfile.TemporaryDirectory() as temp:
            f_name = os.path.join(temp, "tmp_data.txt")
            # Write file
            with open(f_name, "w") as f:
                for line in data:
                    f.write(line + "\n")
            # Create dataset
            dataset = LineByLineTextDataset(
                file_path=f_name, tokenizer=self.tokenizer, block_size=2**64
            )
            if save_path:
                shutil.copyfile(f_name, save_path)
        return dataset
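
# For reference: `create_dataset_from_list` receives pre-formatted text lines
# produced by `prepare_datasets_from_files`. Following the RT paper, such a
# line is assumed to join property tokens and the sequence, roughly like
# `<qed>0.41|CCO`; the exact format is defined in `.utils`, not here.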
@dataclass
class RegressionTransformerModelArguments(TrainingPipelineArguments):
    """Arguments pertaining to model instantiation."""

    __name__ = "model_args"

    model_path: Optional[str] = field(
        default=None,
        metadata={"help": "Path where the pretrained model artifacts are stored."},
    )
    tokenizer_name: Optional[str] = field(
        default=None,
        metadata={
            "help": "Pretrained tokenizer name or path. If not provided, will be "
            "inferred from `model_path`. If `model_path` is not provided either, you "
            "have to pass a tokenizer."
        },
    )
    config_name: Optional[str] = field(
        default=None,
        metadata={
            "help": "Pretrained config name or path. But `model_path` takes preference."
        },
    )
    model_type: Optional[str] = field(
        default="xlnet",
        metadata={
            "help": "If training from scratch, pass a model type from the list: "
            f"{', '.join(MODEL_TYPES)}. If `model_path` is also provided, `model_path` "
            "takes preference."
        },
    )
    cache_dir: Optional[str] = field(
        default=None,
        metadata={
            "help": "Where do you want to store the pretrained models downloaded from s3."
        },
    )
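
# A minimal launch sketch (illustrative, not part of the original module). All
# file names and hyperparameter values below are assumptions, and
# `training_args` must additionally carry the standard HuggingFace
# `TrainingArguments` fields consumed by `get_hf_training_arg_object`
# (e.g. `learning_rate`, `num_train_epochs`).
if __name__ == "__main__":
    pipeline = RegressionTransformerTrainingPipeline()
    pipeline.train(
        training_args={
            "do_train": True,
            "do_eval": True,
            "output_dir": "rt_model",  # hypothetical output directory
            "overwrite_output_dir": True,
            "seed": 42,
            "training_name": "rt_demo",
            "alternate_steps": 8,
            "plm_probability": 0.2,
            "max_span_length": 5,
            "cg_collator": "vanilla_cg",  # assumed key of TRAIN_COLLATORS
            "entity_separator_token": ".",
            "mask_entity_separator": False,
            "entity_to_mask": -1,
        },
        model_args={
            "model_path": None,
            "model_type": "xlnet",
            "config_name": "config.json",  # hypothetical config file
            "tokenizer_name": "tokenizer",  # hypothetical tokenizer path
            "cache_dir": None,
        },
        dataset_args={
            "train_data_path": "train.csv",  # hypothetical data files
            "test_data_path": "test.csv",
            "augment": 0,
            "save_datasets": False,
        },
    )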