# In [1]:
from kfp.dsl import component, pipeline
import kfp
from kfp import kubernetes

@component(
    packages_to_install=["pandas", "numpy==1.21.0", "scikit-learn"],
    base_image="python:3.9"
)
def prepare_data(data_path: str):
    """Load the iris dataset and write it to CSV for the downstream steps.

    The iris feature columns are written together with a ``species`` column
    holding the integer class label from ``iris.target``. Rows containing
    missing values are dropped. The frame is saved to
    ``{data_path}/final_df.csv`` without its index.

    Args:
        data_path: Directory the CSV is written to. Presumably the PVC mount
            shared with the later pipeline steps; confirm it matches the
            pipeline's mount path.
    """
    import pandas as pd
    from sklearn import datasets

    iris_bunch = datasets.load_iris()
    features = pd.DataFrame(iris_bunch.data, columns=iris_bunch.feature_names)

    # assign() appends the label as the last column, like item assignment.
    cleaned = features.assign(species=iris_bunch.target).dropna()
    cleaned.to_csv(f'{data_path}/final_df.csv', index=False)
    
@component(
    packages_to_install=["pandas", "numpy==1.21.0", "scikit-learn"],
    base_image="python:3.9",
)
def train_test_split(data_path: str):
    """Split the prepared iris data into stratified train and test sets.

    Reads ``{data_path}/final_df.csv`` and saves ``X_train.npy``,
    ``X_test.npy``, ``y_train.npy`` and ``y_test.npy`` into the same
    directory. 30% of the rows go to the test set, stratified on the
    ``species`` column, with a fixed ``random_state`` so the split is
    reproducible.

    Args:
        data_path: Directory shared between pipeline steps that holds
            ``final_df.csv`` and receives the ``.npy`` files.
    """
    import numpy as np
    import pandas as pd
    # Aliased so the import does not shadow this component's own name.
    from sklearn.model_selection import train_test_split as split_dataset

    final_data = pd.read_csv(f'{data_path}/final_df.csv')

    target_column = 'species'
    X = final_data.drop(columns=[target_column])
    # Select the target as a 1-D Series. A single-column DataFrame is saved as
    # an (n, 1) array, which scikit-learn estimators warn about
    # (DataConversionWarning) when it is later passed as ``y``.
    y = final_data[target_column]

    X_train, X_test, y_train, y_test = split_dataset(
        X, y, test_size=0.3, stratify=y, random_state=47
    )

    np.save(f'{data_path}/X_train.npy', X_train)
    np.save(f'{data_path}/X_test.npy', X_test)
    np.save(f'{data_path}/y_train.npy', y_train)
    np.save(f'{data_path}/y_test.npy', y_test)
    
@component(
    packages_to_install=["pandas", "numpy==1.21.0", "scikit-learn"],
    base_image="python:3.9",
)
def training_basic_classifier(data_path: str):
    """Fit a logistic-regression classifier on the saved training split.

    Loads ``X_train.npy`` / ``y_train.npy`` from ``data_path``, fits
    ``LogisticRegression(max_iter=500)`` and pickles the fitted estimator to
    ``{data_path}/model.pkl``.

    Args:
        data_path: Directory shared between pipeline steps that holds the
            training arrays and receives ``model.pkl``.
    """
    import pickle

    import numpy as np
    from sklearn.linear_model import LogisticRegression

    X_train = np.load(f'{data_path}/X_train.npy', allow_pickle=True)
    # ravel() turns a column vector (n, 1) into the 1-D (n,) shape that
    # scikit-learn expects for ``y``; it is a no-op on an already 1-D array.
    y_train = np.load(f'{data_path}/y_train.npy', allow_pickle=True).ravel()

    classifier = LogisticRegression(max_iter=500)
    classifier.fit(X_train, y_train)

    with open(f'{data_path}/model.pkl', 'wb') as f:
        pickle.dump(classifier, f)
        
@component(
    packages_to_install=["pandas", "numpy==1.21.0", "scikit-learn", "mlflow", "boto3"],
    base_image="python:3.9",
)
def register_model(data_path: str) -> dict:
    """Log the trained classifier to MLflow and register it once in the registry.

    Loads ``{data_path}/model.pkl`` and infers a model signature from its
    predictions on ``X_test.npy``. It then logs the model under a new run of
    the ``new-exp`` experiment and registers that run's model as a new
    version of ``SklearnLogisticRegression``.

    Args:
        data_path: Directory shared between pipeline steps that holds
            ``model.pkl`` and ``X_test.npy``.

    Returns:
        dict with keys ``artifact_path``, ``artifact_uri``, ``run_id`` and
        ``experiment_id`` (read by the evaluation and serving steps), plus
        ``model_version`` (the registry version created by this run, as str).
    """
    import pickle

    import mlflow
    import numpy as np
    from mlflow.models import infer_signature

    with open(f'{data_path}/model.pkl', 'rb') as f:
        logistic_reg_model = pickle.load(f)

    # Infer the model signature from the held-out features and predictions.
    X_test = np.load(f'{data_path}/X_test.npy', allow_pickle=True)
    y_pred = logistic_reg_model.predict(X_test)
    signature = infer_signature(X_test, y_pred)

    mlflow.set_tracking_uri("http://mlflow-tracking.mlflow.svc.cluster.local:80")
    reg_model_name = "SklearnLogisticRegression"
    artifact_path = "sklearn-model"

    # Change the name of the experiment from the default
    mlflow.set_experiment(experiment_name="new-exp")

    with mlflow.start_run() as run:
        # Log the value the estimator was actually trained with.
        mlflow.log_param('max_iter', logistic_reg_model.max_iter)

        # NOTE(review): log_model below writes into the same artifact path.
        # Confirm whether this separate upload of the raw pickle is still
        # needed (e.g. by the storage URI the serving step builds).
        mlflow.log_artifact(local_path=f'{data_path}/model.pkl', artifact_path=artifact_path)

        # No registered_model_name here: registration happens exactly once,
        # below, so that the resulting version can be captured.
        mlflow.sklearn.log_model(
            sk_model=logistic_reg_model,
            artifact_path=artifact_path,
            signature=signature,
        )

    model_uri = f"runs:/{run.info.run_id}/{artifact_path}"

    # Capture the version so later steps / pipelines can reference it.
    registered = mlflow.register_model(model_uri, reg_model_name)

    return {
        "artifact_path": artifact_path,
        "artifact_uri": run.info.artifact_uri,
        "run_id": run.info.run_id,
        "experiment_id": run.info.experiment_id,
        "model_version": str(registered.version),
    }


@component(
    packages_to_install=["pandas", "numpy==1.21.0", "scikit-learn", "mlflow", "boto3"],
    base_image="python:3.9",
)
def predict_on_test_data(data_path: str, model_info: dict) -> str:
    """Score the held-out test set with the model logged to MLflow.

    Loads the MLflow sklearn model at ``{artifact_uri}/{artifact_path}``.
    Class predictions are written to ``{data_path}/y_pred.npy`` and class
    probabilities to ``{data_path}/y_pred_prob.npy``.

    Args:
        data_path: Directory shared between pipeline steps that holds
            ``X_test.npy`` and receives the prediction arrays.
        model_info: Mapping with at least ``artifact_path`` and
            ``artifact_uri`` keys (the dict returned by ``register_model``).

    Returns:
        The MLflow model URI the predictions were made with.
    """
    import mlflow
    import numpy as np

    artifact_path = model_info["artifact_path"]
    artifact_uri = model_info["artifact_uri"]
    model_uri = f"{artifact_uri}/{artifact_path}"

    mlflow.set_tracking_uri("http://mlflow-tracking.mlflow.svc.cluster.local:80")
    # NOTE(review): this differs from the "new-exp" experiment register_model
    # logs to. The load below uses an absolute artifact URI, so it presumably
    # does not depend on the active experiment. Kept to preserve behaviour.
    mlflow.set_experiment(experiment_name="experiment-2006")
    logistic_reg_model = mlflow.sklearn.load_model(model_uri)

    # Load the test features once and reuse them for both outputs.
    X_test = np.load(f'{data_path}/X_test.npy', allow_pickle=True)
    np.save(f'{data_path}/y_pred.npy', logistic_reg_model.predict(X_test))
    np.save(f'{data_path}/y_pred_prob.npy', logistic_reg_model.predict_proba(X_test))

    return model_uri

@component(
    packages_to_install=["kserve"],
    base_image="python:3.9",
)
def model_serving(model_info: dict):
    """Deploy the logged model as a KServe InferenceService.

    Builds a ``mlflow-artifacts:`` storage URI from ``model_info`` and creates
    the InferenceService ``sklearn-iris-v3-demo``. The service uses the
    ``sklearn`` model format, the ``kserve-mlserver`` runtime and protocol
    ``v2``. It is created in KServe's default target namespace with Istio
    sidecar injection disabled, and its status is then watched for up to
    200 seconds.

    Args:
        model_info: Mapping with ``experiment_id``, ``run_id`` and
            ``artifact_path`` keys (the dict returned by ``register_model``).
    """
    from kubernetes import client
    from kserve import (
        KServeClient,
        V1beta1InferenceService,
        V1beta1InferenceServiceSpec,
        V1beta1ModelFormat,
        V1beta1ModelSpec,
        V1beta1PredictorSpec,
        constants,
        utils,
    )

    experiment_id = model_info['experiment_id']
    run_id = model_info['run_id']
    artifact_path = model_info['artifact_path']

    # NOTE(review): assumes run artifacts are served through the MLflow
    # ``mlflow-artifacts:`` proxy scheme and that KServe's storage initializer
    # is configured to resolve it. Confirm this. The exact ``artifact_uri`` of
    # the run is also present in model_info but is not used here.
    artifact_root = "mlflow-artifacts:"
    storage_uri = f"{artifact_root}/{experiment_id}/{run_id}/artifacts/{artifact_path}"

    target_namespace = utils.get_default_target_namespace()
    isvc_name = 'sklearn-iris-v3-demo'

    model_spec = V1beta1ModelSpec(
        model_format=V1beta1ModelFormat(name="sklearn"),
        protocol_version="v2",
        runtime="kserve-mlserver",
        storage_uri=storage_uri,
    )
    metadata = client.V1ObjectMeta(
        name=isvc_name,
        namespace=target_namespace,
        annotations={'sidecar.istio.io/inject': 'false'},
    )
    inference_service = V1beta1InferenceService(
        api_version=constants.KSERVE_V1BETA1,
        kind=constants.KSERVE_KIND,
        metadata=metadata,
        spec=V1beta1InferenceServiceSpec(predictor=V1beta1PredictorSpec(model=model_spec)),
    )

    kserve_client = KServeClient()
    # NOTE(review): create() will presumably fail on a re-run if an
    # InferenceService with this name already exists in the namespace.
    kserve_client.create(inference_service)
    kserve_client.get(isvc_name, namespace=target_namespace, watch=True, timeout_seconds=200)
    

from kubernetes import client, config
import base64

@pipeline(
    name="iris-pipeline",
)
def iris_pipeline(data_path: str = '/data'):
    """Iris pipeline: prepare -> split -> train -> register -> evaluate -> serve.

    A PVC is created for the run and mounted at ``/data`` on every step that
    reads or writes intermediate files. Those steps receive ``data_path`` and
    build their file paths from it. Caching is disabled on every step.

    Args:
        data_path: Directory the steps read from and write to. It must equal
            the PVC mount path used below (``/data``). Otherwise the files one
            step writes are not on the shared volume and the next step cannot
            find them. Defaults to that path.
    """
    # Compile-time constant: ``data_path`` is a runtime pipeline parameter, so
    # it cannot be used to choose the mount path.
    data_mount_path = '/data'

    pvc1 = kubernetes.CreatePVC(
        # can also use pvc_name instead of pvc_name_suffix to use a pre-existing PVC
        pvc_name_suffix='-iris-mlflow-pvc',
        access_modes=['ReadWriteOnce'],
        size='1G',
    )
    # NOTE(review): this PVC is created per run and is not deleted here. If the
    # intermediate files need not be kept, add
    # kubernetes.DeletePVC(pvc_name=pvc1.outputs['name']) after the last task
    # that mounts it.

    def _configure(task, mount_data=True):
        """Disable caching on *task* and, unless told otherwise, mount the PVC."""
        task.set_caching_options(False)
        if mount_data:
            kubernetes.mount_pvc(task, pvc_name=pvc1.outputs['name'], mount_path=data_mount_path)
        return task

    # Data Preparation
    prepare_data_task = _configure(prepare_data(data_path=data_path))

    # Split data into Train and Test Sets
    train_test_split_task = _configure(train_test_split(data_path=data_path))
    train_test_split_task.after(prepare_data_task)

    # Model Training
    training_basic_classifier_task = _configure(training_basic_classifier(data_path=data_path))
    training_basic_classifier_task.after(train_test_split_task)

    # Register our Trained Model in our MLflow based Model Registry
    register_model_task = _configure(register_model(data_path=data_path))
    # kubernetes.mount_pvc(register_model_task, pvc_name="mlflow-pvc", mount_path='/opt/mlflow/')
    register_model_task.after(training_basic_classifier_task)

    # Model Evaluation
    predict_on_test_data_task = _configure(
        predict_on_test_data(data_path=data_path, model_info=register_model_task.output)
    )
    predict_on_test_data_task.after(register_model_task)

    # Model Deployment (reads only model_info, so the PVC is not mounted)
    model_serving_task = _configure(
        model_serving(model_info=register_model_task.output), mount_data=False
    )
    model_serving_task.after(predict_on_test_data_task)

if __name__ == '__main__':
    from kfp import compiler

    # Please change the name of the output YAML file if required
    output_yaml = 'demo-pipeline.yaml'
    compiler.Compiler().compile(pipeline_func=iris_pipeline, package_path=output_yaml)