%tensorflow_version 2.x
%reset -f
#libs
import tensorflow as tf;
from tensorflow.keras.layers import *;
#constants
DSIZE = 4;
BSIZE = 2;
#model
class model(tf.Module):
  #constructor
  def __init__(this):
    #utilise a keras layer
    this.Layer1 = Dense(30, tf.nn.leaky_relu, name="layer1");
    #raw tf layer
    this.W2 = tf.Variable(tf.random.uniform([30,20], -1,1, tf.float32), name="layer2w");
    this.B2 = tf.Variable(tf.random.uniform([   20], -1,1, tf.float32), name="layer2b");
    #single neuron output layer, also raw
    this.W3 = tf.Variable(tf.random.uniform([20,1], -1,1, tf.float32), name="layer3w");
    this.B3 = tf.Variable(tf.random.uniform([   1], -1,1, tf.float32), name="layer3b");
  #model call
  @tf.function(input_signature=[tf.TensorSpec([BSIZE,2], tf.float32)])
  def __call__(this,Inp):
    H1  = this.Layer1(Inp);
    H2  = tf.nn.leaky_relu(tf.matmul(H1,this.W2) + this.B2);
    Out = tf.sigmoid(tf.matmul(H2,this.W3) + this.B3);
    return Out;
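#note: the input_signature above fixes the batch dimension to BSIZE, so the traced
#(and later saved) __call__ only accepts batches of exactly BSIZE rows; using
#tf.TensorSpec([None,2], tf.float32) would allow variable batch sizes instead.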
#PROGRAMME ENTRY POINT==========================================================
#data (XOR data for example)
X = tf.constant([[0,0],[0,1],[1,0],[1,1]], tf.float32);
Y = tf.constant([[0 ],[1 ],[1 ],[0 ]], tf.float32);
#normalise data to range [0,1] (a no-op for this 0/1 XOR data, kept as a general preprocessing step)
X = X/tf.reduce_max(X);
Y = Y/tf.reduce_max(Y);
#make batch generator
Data = tf.data.Dataset.from_tensor_slices((X,Y));
Data = Data.repeat().shuffle(DSIZE).batch(BSIZE,drop_remainder=True);
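#note: repeat() makes the stream infinite, shuffle(DSIZE) uses a buffer covering the
#whole dataset, and drop_remainder=True keeps every batch at exactly BSIZE rows so the
#shape matches the fixed input_signature of the model.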
#train
Model = model();
Loss = tf.losses.MeanSquaredError();
Optim = tf.optimizers.SGD(1e-1);
Steps = 1000; #total training steps
Laft  = 100;  #log average loss after this many steps
Lsum  = 0;    #running loss sum over the current logging window
Iter = iter(Data);
for I in range(Steps):
  Batch = next(Iter);
  Inp = Batch[0]; #input
  Exp = Batch[1]; #expected
  with tf.GradientTape() as T:
    Lval = Loss(Exp,Model(Inp));
  Lsum += Lval.numpy();
  Grads = T.gradient(Lval, Model.trainable_variables);
  Optim.apply_gradients(zip(Grads, Model.trainable_variables));
  if I%Laft==Laft-1:
    print("Average Loss:",Lsum/Laft);
    Lsum = 0;
#save
print("\nSaving model...");
Model_Dir = "/tmp/my-model";
tf.saved_model.save(Model,Model_Dir);
%ls -laX --group-directories-first /tmp/my-model
#load back model
print("\nLoading back model...");
Model = tf.saved_model.load(Model_Dir);
print(vars(Model).keys());
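#optional check: list any serving signatures exported alongside the model (the map
#may be empty if no explicit signatures were passed to tf.saved_model.save)
print(list(Model.signatures.keys()));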
#continue training the model
print("\nContinue training...");
for I in range(Steps):
  Batch = next(Iter); #load another dataset here, or keep using the infinite Iter from above
  Inp = Batch[0];
  Exp = Batch[1];
  with tf.GradientTape() as T:
    Lval = Loss(Exp,Model(Inp));
  Lsum += Lval.numpy();
  #tf.saved_model.load returns a restored object rather than an instance of the model
  #class, so its parameters are gathered manually from the tracked attributes
  Params = Model.Layer1.trainable_variables + [Model.W2,Model.B2,Model.W3,Model.B3];
  Grads = T.gradient(Lval, Params);
  Optim.apply_gradients(zip(Grads, Params));
  if I%Laft==Laft-1:
    print("Average Loss:",Lsum/Laft);
    Lsum = 0; #reset the running sum for the next logging window
#infer the training data
print("\nInferring the training data...");
Data = tf.data.Dataset.from_tensor_slices((X,Y));
Data = Data.batch(BSIZE,drop_remainder=True);
for Inp,Exp in Data:
  print("Input:");
  print(Inp.numpy());
  print("Expected:");
  print(Exp.numpy());
  print("Prediction:");
  print(Model(Inp).numpy());
  print();
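#single-example inference (a minimal sketch, not part of the original listing; the
#name Single is illustrative): the fixed input_signature expects exactly BSIZE rows,
#so one row is tiled up to a full batch before calling the model
Single = tf.constant([[0,1]], tf.float32);
print("Single-example prediction:", Model(tf.tile(Single,[BSIZE,1]))[0].numpy());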
#eof