In [1]:
import kfp
import kfp.components as components
import kfp.dsl as dsl
from kfp.components import InputPath, OutputPath

In [2]:
def train_export_model(trainingjobName: str, epochs: str, version: str):
    """Train an LSTM next-step predictor on x/y/z features and upload the model.

    This function is packaged by ``components.func_to_container_op``, which
    ships only this function's source into the container, so every import it
    needs is performed inside the body.

    Steps:
      1. Fetch columns ``x``, ``y``, ``z`` for ``trainingjobName`` from the
         feature store and cast them to float64.
      2. Cut the rows into sliding windows of ``input_data_series_count`` rows;
         each window's label is the row immediately following it.
      3. Split the windows 60/20/20 into train/test/validation sets.
      4. Fit an LSTM -> Dropout -> Dense -> Dense regression model (MSE loss),
         then report loss/MAE on the held-out test split.
      5. Save the model to the working directory and upload it with
         ``ModelMetricsSdk.upload_model``.

    Args:
        trainingjobName: Training-job name; used to fetch features and passed
            to ``upload_model``.
        epochs: Number of training epochs as a string (converted with ``int``).
        version: Version string passed to ``upload_model``.

    Raises:
        ValueError: If ``epochs`` is not an integer string, or if too few rows
            are available to build any (window, label) pair.
    """
    import numpy as np
    import tensorflow as tf
    from sklearn.model_selection import train_test_split
    from tensorflow.keras.layers import LSTM, Dense, Dropout
    from tensorflow.keras.models import Sequential

    from featurestoresdk.feature_store_sdk import FeatureStoreSdk
    from modelmetricsdk.model_metrics_sdk import ModelMetricsSdk

    print("numpy version")
    print(np.__version__)

    # One seed for both the data splits and TF weight initialisation so a
    # re-run with the same data is reproducible.
    random_seed = 42
    tf.random.set_seed(random_seed)

    fs_sdk = FeatureStoreSdk()
    mm_sdk = ModelMetricsSdk()

    feature_columns = ['x', 'y', 'z']
    features = fs_sdk.get_features(trainingjobName, feature_columns)
    features = features.astype('float64')
    print("Dataframe:")
    print(features)
    print(features.dtypes)

    features = features.to_numpy()
    print("features.dtype:")
    print(features.dtype)

    one_set_data_count = 2500  # rows taken per data set
    set_count = 1
    input_data_series_count = 100  # hyper parameter: window length (time steps)

    series = []
    label = []
    for i in range(set_count):
        data_set = features[i * one_set_data_count:(i + 1) * one_set_data_count]
        # Bound the window count by the rows actually present, so a short data
        # set yields fewer samples instead of an IndexError on the label row.
        for j in range(len(data_set) - input_data_series_count):
            series.append(data_set[j:j + input_data_series_count])  # window of past rows = input
            label.append(data_set[j + input_data_series_count])  # the following row = label
    if not series:
        raise ValueError(
            "Need more than {} rows of features to build training windows, got {}".format(
                input_data_series_count, len(features)))
    series = np.array(series)
    label = np.array(label)

    # 60% train; the remaining 40% is halved into test and validation sets.
    x_train, x_test, y_train, y_test = train_test_split(series, label, test_size=0.4, random_state=random_seed)
    x_test, x_validate, y_test, y_validate = train_test_split(x_test, y_test, test_size=0.5, random_state=random_seed)
    print("x_train:", x_train.dtype)
    print("y_train:", y_train.dtype)
    print("x_validate:", x_validate.dtype)
    print("y_validate:", y_validate.dtype)

    # Define the model architecture: one output unit per feature column.
    n_features = len(feature_columns)
    model = Sequential()
    model.add(LSTM(units=64, input_shape=(input_data_series_count, n_features)))
    model.add(Dropout(0.3))
    model.add(Dense(32, activation='relu'))
    model.add(Dense(n_features, activation='linear'))

    # 'accuracy' is meaningless for continuous regression targets; report MAE.
    model.compile(loss='mean_squared_error', optimizer='adam', metrics=['mae'])

    batch_size = 4
    model.fit(x_train, y_train, batch_size=batch_size, epochs=int(epochs), validation_data=(x_validate, y_validate))

    # Score the held-out test split, which is otherwise never used.
    test_loss, test_mae = model.evaluate(x_test, y_test, batch_size=batch_size, verbose=0)
    print("test loss (MSE):", test_loss)
    print("test MAE:", test_mae)

    model.save("./")
    mm_sdk.upload_model("./", trainingjobName, version)

In [3]:
# Container image that runs the training step. It must provide every package
# train_export_model imports (tensorflow, scikit-learn, numpy and the
# feature-store / model-metrics SDKs).
# NOTE(review): a ':latest' tag combined with the 'IfNotPresent' pull policy set in
# train_and_export lets nodes keep running a stale cached image; consider pinning a tag.
BASE_IMAGE: str = "traininghost/pipelineimage:latest"

In [4]:
def train_and_export(trainingjobName: str, epochs: str, version: str):
    """Add the train-and-export step to the pipeline being defined.

    Wraps ``train_export_model`` as a container op running on ``BASE_IMAGE``,
    disables KFP step caching for it, and sets its image pull policy to
    ``IfNotPresent``.
    """
    train_component = components.func_to_container_op(train_export_model, base_image=BASE_IMAGE)
    train_task = train_component(trainingjobName, epochs, version)

    # A zero-length staleness window ("P0D") means a cached result is never reused.
    train_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
    train_task.container.set_image_pull_policy("IfNotPresent")

In [None]:
# Name of the pipeline: used by the @dsl.pipeline decorator and in the upload URL.
PIPELINE_NAME: str = 'UAV_pipeline'

In [5]:
@dsl.pipeline(name=PIPELINE_NAME, description='UAVData_pipeline')
def super_model_pipeline(trainingjob_name: str, epochs: str, version: str):
    # Pipeline entry point: a single step that trains the UAV model and
    # uploads it. All three pipeline parameters are forwarded unchanged.
    train_and_export(trainingjob_name, epochs, version)

In [6]:
# Compile the pipeline definition into a .zip package for upload.
pipeline_func = super_model_pipeline
file_name = "model_training_pipeline"
pipeline_package_path = file_name + '.zip'
kfp.compiler.Compiler().compile(pipeline_func, pipeline_package_path)

In [7]:
import requests

# Upload the compiled pipeline package to the training manager.
TRAINING_MANAGER_URL = "http://tm.traininghost:32002"
pipeline_name = PIPELINE_NAME
pipeline_file = file_name + '.zip'

# The context manager closes the package file once the upload finishes.
with open(pipeline_file, 'rb') as pipeline_package:
    response = requests.post(
        "{}/pipelines/{}/upload".format(TRAINING_MANAGER_URL, pipeline_name),
        files={'file': pipeline_package},
    )

# Fail loudly on a 4xx/5xx instead of silently continuing with a failed upload.
response.raise_for_status()
response

<Response [200]>