
Failing to save the model pickle, and how to check the predictions? #197

Open

vizzyno1 opened this issue Feb 10, 2022 · 1 comment
vizzyno1 commented Feb 10, 2022

Hi Team,

I am trying to train a TensorFlow LSTM model on Spark. After the epoch training completes, it fails with the error below:

==========================
Doing CPU training...
Will run with 8 Spark tasks.
/usr/local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:1844: UserWarning: Model.fit_generator is deprecated and will be removed in a future version. Please use Model.fit, which supports generators.
warnings.warn('Model.fit_generator is deprecated and '
2022-02-10 11:41:59.702186: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2022-02-10 11:41:59.718653: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2999995000 Hz
Epoch 1/5
247/247 [==============================] - 141s 543ms/step - loss: 0.0380 - mean_absolute_error: 0.1516 - val_loss: 0.0141 - val_mean_absolute_error: 0.0775
Epoch 2/5
247/247 [==============================] - 129s 522ms/step - loss: 0.0126 - mean_absolute_error: 0.0821 - val_loss: 0.0103 - val_mean_absolute_error: 0.0648
Epoch 3/5
247/247 [==============================] - 137s 554ms/step - loss: 0.0094 - mean_absolute_error: 0.0695 - val_loss: 0.0082 - val_mean_absolute_error: 0.0519
Epoch 4/5
247/247 [==============================] - 149s 605ms/step - loss: 0.0079 - mean_absolute_error: 0.0614 - val_loss: 0.0070 - val_mean_absolute_error: 0.0479
Epoch 5/5
247/247 [==============================] - 132s 536ms/step - loss: 0.0067 - mean_absolute_error: 0.0567 - val_loss: 0.0060 - val_mean_absolute_error: 0.0422
Distributed training in progress...
View Spark executor stderr logs to inspect training...
Traceback (most recent call last):
File "/usr/lib/spark/python/pyspark/serializers.py", line 437, in dumps
return cloudpickle.dumps(obj, pickle_protocol)
File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 101, in dumps
cp.dump(obj)
File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 540, in dump
return Pickler.dump(self, obj)
File "/usr/lib64/python3.7/pickle.py", line 437, in dump
self.save(obj)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 789, in save_tuple
save(element)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 722, in save_function
*self._dynamic_function_reduce(obj), obj=obj
File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 664, in _save_reduce_pickle5
save(state)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
save(element)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems
save(v)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
save(element)
File "/usr/lib64/python3.7/pickle.py", line 549, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib64/python3.7/pickle.py", line 638, in save_reduce
save(args)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
save(element)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 722, in save_function
*self._dynamic_function_reduce(obj), obj=obj
File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 664, in _save_reduce_pickle5
save(state)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
save(element)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems
save(v)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
save(element)
File "/usr/lib64/python3.7/pickle.py", line 549, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib64/python3.7/pickle.py", line 638, in save_reduce
save(args)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
save(element)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 722, in save_function
*self._dynamic_function_reduce(obj), obj=obj
File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 664, in _save_reduce_pickle5
save(state)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
save(element)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems
save(v)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 789, in save_tuple
save(element)
File "/usr/lib64/python3.7/pickle.py", line 549, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib64/python3.7/pickle.py", line 638, in save_reduce
save(args)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
save(element)
File "/usr/lib64/python3.7/pickle.py", line 549, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib64/python3.7/pickle.py", line 662, in save_reduce
save(state)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems
save(v)
File "/usr/lib64/python3.7/pickle.py", line 549, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib64/python3.7/pickle.py", line 662, in save_reduce
save(state)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems
save(v)
File "/usr/lib64/python3.7/pickle.py", line 549, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib64/python3.7/pickle.py", line 662, in save_reduce
save(state)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems
save(v)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib64/python3.7/pickle.py", line 884, in _batch_setitems
save(k)
File "/usr/lib64/python3.7/pickle.py", line 524, in save
rv = reduce(self.proto)
TypeError: can't pickle weakref objects

======================================

Are there any examples of running model prediction after distributed training with MirroredStrategyRunner?
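For context on the traceback: Spark serializes the training function and everything it closes over with cloudpickle, and Keras models hold `weakref` objects internally, which the pickle protocol rejects. That is why the failure usually means a model (or other TF object) built outside the train function was captured in its closure, or was returned from it. A minimal stdlib illustration of the rejection itself:

```python
import pickle
import weakref

class Holder:
    """Stand-in for an object that keeps a weak reference internally,
    as Keras models do."""
    def __init__(self):
        self.ref = weakref.ref(self)

try:
    pickle.dumps(Holder())
except TypeError as exc:
    # Same failure class as at the bottom of the Spark trace above.
    print(f"TypeError: {exc}")
```

The usual remedy is to build and compile the model *inside* the train function (as the example in the next comment does), so nothing unpicklable has to cross the driver/worker boundary.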

HeyPhiS commented Mar 8, 2022

I am sorry, I can't help you with the first part; I would also like to know if and how that would work.

However, the second part works, at least if you put training and prediction in one function.
I tried it by adapting the official example at https://docs.microsoft.com/en-us/azure/databricks/_static/notebooks/deep-learning/spark-tensorflow-distributor.html

### 1.) Combined train and predict (helper functions listed below)

```python
def train_and_predict(batch_size):
    train_dataset = make_training_dataset(batch_size)
    train_dataset_x, train_dataset_y = make_prediction_dataset(batch_size)

    multi_worker_model = build_and_compile_cnn_model()
    multi_worker_model.fit(x=train_dataset, epochs=3, steps_per_epoch=5)

    # x is an already-batched tf.data.Dataset here, so batch_size
    # must not also be passed to predict().
    prediction = multi_worker_model.predict(x=train_dataset_x)
    return prediction
```

### 2.) Execute with Runner

```python
from spark_tensorflow_distributor import MirroredStrategyRunner

runner = MirroredStrategyRunner(num_slots=NUM_SLOTS, local_mode=False, use_gpu=USE_GPU)
prediction = runner.run(train_and_predict, batch_size=batch_size)
```
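Since the snippet above assigns the result of `runner.run` back to the driver, `prediction` ends up as an ordinary in-memory array there, so checking it needs no Spark at all. A sketch with dummy stand-ins for the real outputs (random softmax-shaped scores for 4 samples and 10 classes, plus made-up labels):

```python
import numpy as np

# Dummy stand-ins for the real outputs of predict() and the true labels.
rng = np.random.default_rng(0)
prediction = rng.random((4, 10))   # shape (num_samples, num_classes)
y_true = np.array([3, 1, 4, 1])    # matching labels, invented for illustration

# Most probable class per row, then plain accuracy against the labels.
predicted_labels = prediction.argmax(axis=1)
accuracy = float((predicted_labels == y_true).mean())
print(predicted_labels, accuracy)
```

With the real data, `y_true` would come from the `train_dataset_y` half of `make_prediction_dataset` below.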

 ---------------------------------------------------------------------

### A.) Functions to generate data

```python
import numpy as np
import tensorflow as tf

def load_mnist():
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The x arrays are uint8 with values in the [0, 255] range;
    # convert them to float32 with values in the [0, 1] range.
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    return x_train, y_train

def make_training_dataset(batch_size):
    x_train, y_train = load_mnist()
    train_dataset = tf.data.Dataset.from_tensor_slices(
        (x_train, y_train)).repeat().batch(batch_size)

    # Specify the data auto-shard policy: DATA
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
    train_dataset = train_dataset.with_options(options)
    return train_dataset

def make_prediction_dataset(batch_size):
    x_train, y_train = load_mnist()
    train_dataset_x = tf.data.Dataset.from_tensor_slices(x_train).batch(batch_size)
    train_dataset_y = tf.data.Dataset.from_tensor_slices(y_train).batch(batch_size)

    # Specify the data auto-shard policy: DATA
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
    train_dataset_x = train_dataset_x.with_options(options)
    train_dataset_y = train_dataset_y.with_options(options)
    return train_dataset_x, train_dataset_y
```

### B.) Function to build the model

```python
def build_and_compile_cnn_model():
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(28, 28)),
        tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(10, activation='softmax'),
    ])
    model.compile(
        loss=tf.keras.losses.sparse_categorical_crossentropy,
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
        metrics=['accuracy'],
    )
    return model
```

However, this does not get around the problem of saving the model. I would also like an answer to that question.
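One workaround commonly suggested for the saving problem (a hedged sketch, not verified against spark-tensorflow-distributor internals): never return the model object from the train function, since Spark must pickle whatever it returns; instead call `model.save()` to storage visible outside the worker inside the function, and return only plainly picklable data such as metrics and the save path. The shape of that pattern, with a dummy stand-in for the Keras model and a temp directory standing in for shared storage:

```python
import os
import pickle
import tempfile
import weakref

class DummyModel:
    """Stand-in for a Keras model: unpicklable (holds a weakref),
    but able to write itself to disk like model.save()."""
    def __init__(self):
        self._ref = weakref.ref(self)

    def save(self, path):
        with open(path, "w") as f:
            f.write("saved weights")

def train_and_save(save_dir):
    model = DummyModel()                       # build + fit would happen here
    path = os.path.join(save_dir, "model.h5")  # illustrative filename
    model.save(path)                           # persist instead of returning
    return {"model_path": path}                # plain dict: pickles fine

result = train_and_save(tempfile.mkdtemp())
pickle.dumps(result)  # round-trips without error, unlike the model itself
```

With MirroredStrategyRunner one would presumably also guard the save so only the chief worker writes, and `save_dir` would need to be a path the driver can read afterwards (e.g. `/dbfs/...` on Databricks) — both assumptions here, not something this thread confirms.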
