Introduction to MLSQL Deep Learning [1]

祝海林
May 30, 2021

All code examples in this article are based on the latest version of MLSQL Engine, 2.1.0-SNAPSHOT.

This article uses the notebook in MLSQL Console to demonstrate the "Hello world" of deep learning: training a model on the MNIST dataset.

List of series articles:

  1. MLSQL Machine Learning Minimalist Tutorial (No Python Required!)
  2. Introduction to MLSQL deep learning [1]
  3. Introduction to MLSQL deep learning [2] - Distributed model training
  4. Introduction to MLSQL deep learning [3] - Feature engineering
  5. Introduction to MLSQL deep learning [4] - Serving

Environment requirements

Necessary software:

  1. The latest version of MLSQL Engine/Console.
  2. Ray 1.3.0 (optional)

Python dependencies are as follows:

pip install Cython
pip install ray==1.3.0
pip install tensorflow
pip install aiohttp psutil setproctitle grpcio pandas
pip install watchdog requests click uuid plotly
pip install "pyjava>=0.2.8.8"

These dependencies are only required on MLSQL Engine Driver nodes (deploying on all Executor nodes is optional). If Ray is deployed, all Ray nodes need to have these dependencies.
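Before moving on, it can help to confirm that the packages are visible to the Python interpreter the Engine will use. The following is a minimal sketch using only the standard library; the `installed_version` helper and the shortened `required` list are illustrative assumptions, not part of MLSQL or pyjava.

```python
from importlib import metadata

def installed_version(package):
    """Return the installed version of a package, or None if it is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None

# A subset of the packages listed above; extend as needed.
required = ["Cython", "ray", "tensorflow", "pyjava", "pandas"]
report = {name: installed_version(name) for name in required}

for name, version in report.items():
    print(name, version or "MISSING")
```

Run it with the same interpreter (or conda environment) that the Driver node uses; any line ending in MISSING points to a package that still needs to be installed.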

Prepare dataset

We can either download the MNIST dataset ourselves and upload it to the object store through the Console's upload function, or use the MLSQL third-party library lib-core to obtain the dataset. In this article, we use lib-core.

The sample code is explained below in segments.

Part 1 Code:

include lib.`github.com/allwefantasy/lib-core`
where force="true" and -- always download the newest package
-- libMirror="gitee.com" and -- proxy configuration for users in China.
alias="libCore";

This imports the third-party library lib-core with an include statement and gives it the alias libCore.

Part 2 Code:

include local.`libCore.dataset.mnist`;
!dumpData /tmp/mnist;

This includes the module dataset.mnist, which provides the !dumpData command for dumping the data into the specified directory.

Now, let’s take a look at what the data we have imported looks like:

If that is not clear, it doesn't matter; we can first look at the fields and their types in the dataset:

Then let’s look at the length of the array:

Here we see that each 28 * 28 pixel image has been flattened into a single array of 784 values.

Next, we can train the model.
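To make the flattening concrete, here is a minimal pure-Python sketch. The `flatten_image` helper and the toy all-zero image are illustrative assumptions, not part of lib-core; they only show how a 28 * 28 grid becomes one array of 784 values, laid out row by row.

```python
def flatten_image(image):
    """Flatten a 2-D grid of pixel rows into one flat list, row by row."""
    return [pixel for row in image for pixel in row]

# A toy 28 x 28 "image" of zeros with one bright pixel at row 1, column 2.
image = [[0] * 28 for _ in range(28)]
image[1][2] = 255

flat = flatten_image(image)
print(len(flat))          # 784
print(flat[1 * 28 + 2])   # 255
```

The pixel at (row, column) ends up at index row * 28 + column, which is the same layout that a NumPy reshape to (28*28,) produces.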

Training with TensorFlow (non-Ray version)

First, we need to set the following configuration. Each option is briefly explained in the comments:

-- Tell the system the Python client should
-- run on the driver side, since we have all dependencies installed
-- on this node.
!python conf "runIn=driver";
-- Tell the system the schema of the output of
-- the Python code we will write in the next step.
!python conf "schema=file";
-- dataMode=model means this is not an ETL job.
!python conf "dataMode=model";

Now, we can start writing Python code. In the Notebook of MLSQL Console, we can write Python code directly in the cell. The details are as follows:

The execution results are as follows:

The following is the complete code in the current cell:

#%python
#%input=mnist_data
#%output=mnist_model
#%cache=true
from tensorflow.keras import models,layers
from tensorflow.keras import utils as np_utils
from pyjava.api.mlsql import RayContext
import os
import numpy as np

ray_context = RayContext.connect(globals(),None)
data_servers = ray_context.data_servers()

def data():
    temp_data = [item for item in RayContext.collect_from(data_servers)]
    train_images = np.array([np.array(item["image"]) for item in temp_data])
    train_labels = np_utils.to_categorical(np.array([item["label"] for item in temp_data]))
    train_images = train_images.reshape((len(temp_data),28*28))
    return train_images,train_labels

def train():
    train_images,train_labels = data()
    network = models.Sequential()
    network.add(layers.Dense(512,activation="relu",input_shape=(28*28,)))
    network.add(layers.Dense(10,activation="softmax"))
    network.compile(optimizer="rmsprop",
                    loss="categorical_crossentropy",metrics=["accuracy"])
    network.fit(train_images,train_labels,epochs=6,batch_size=128)
    model_path = os.path.join("tmp","minist_model")
    network.save(model_path)
    return model_path

model_path = train()
ray_context.build_result_from_dir(model_path)

There are several core points to explain. First, there are several comments (annotations, or hints) at the top of the cell:

#%python
#%input=mnist_data
#%output=mnist_model
#%cache=true

These mark, respectively, the language of the current cell, the input table name, the output table name, and whether the output of the Python code is cached. With this information, the system automatically generates the following statement and hands it to the MLSQL engine layer for execution:

run command as Ray.`` where
inputTable="mnist_data" and
outputTable="mnist_model" and
cache="true" and
code='''YOUR PYTHON CODE''';

Clearly, the Notebook form is easier and more comfortable to use.
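The mapping from cell annotations to the generated statement can be sketched in a few lines of Python. This is only an illustration of the idea, not the Console's actual implementation: the `build_run_statement` function is hypothetical, and the default of "false" for a missing cache annotation is an assumption.

```python
def build_run_statement(cell_source):
    """Turn '#%key=value' header lines plus a code body into a run statement."""
    options = {}
    body_lines = []
    for line in cell_source.splitlines():
        if line.startswith("#%"):
            # '#%input=mnist_data' -> key 'input', value 'mnist_data'
            key, _, value = line[2:].partition("=")
            options[key.strip()] = value.strip()
        else:
            body_lines.append(line)
    code = "\n".join(body_lines).strip()
    return (
        "run command as Ray.`` where\n"
        f'inputTable="{options["input"]}" and\n'
        f'outputTable="{options["output"]}" and\n'
        f'cache="{options.get("cache", "false")}" and\n'  # assumed default
        f"code='''{code}''';"
    )

cell = """#%python
#%input=mnist_data
#%output=mnist_model
#%cache=true
print("hello")
"""
statement = build_run_statement(cell)
print(statement)
```

The annotation lines never reach the Python interpreter as configuration; they are ordinary comments to Python and are only meaningful to whatever builds the statement.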

Next, we need to get a session object:

ray_context = RayContext.connect(globals(),None)

With this session object, we can fetch data from the table mnist_data and write data generated by Python into the table mnist_model.

The way to obtain data is as follows:

data_servers = ray_context.data_servers()
temp_data = [item for item in RayContext.collect_from(data_servers)]

data_servers is an array holding the addresses of the data source. You can get data from the table mnist_data through RayContext.collect_from. Note that RayContext.collect_from returns a generator, so the data can only be consumed once. Here we simply load all the data into memory as the temp_data object. If the dataset is large, you can write it to disk first and read it back later.
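The "consumed only once" behavior is a property of Python generators in general, and it is easy to see with a stand-in. In the sketch below, `fake_rows` is a hypothetical replacement for the real row generator, and `in_chunks` is an illustrative helper (not a pyjava function) showing one way to process a large stream piece by piece instead of loading it all at once.

```python
from itertools import islice

def fake_rows(n):
    """Stand-in for a row generator; yields n small dict rows."""
    for i in range(n):
        yield {"label": i % 10}

def in_chunks(rows, size):
    """Yield lists of up to `size` rows, pulling lazily from `rows`."""
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk

rows = fake_rows(5)
first_pass = list(rows)    # consumes the generator
second_pass = list(rows)   # nothing left to read
print(len(first_pass), len(second_pass))  # 5 0

chunk_sizes = [len(chunk) for chunk in in_chunks(fake_rows(5), 2)]
print(chunk_sizes)  # [2, 2, 1]
```

Each chunk could be written to disk or fed to an incremental training step before the next one is pulled.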

When training finished, we saved the model in the tmp/minist_model directory (a path relative to the working directory of the Python process):

model_path = os.path.join("tmp","minist_model")
network.save(model_path)

But obviously, this is a local disk, and we need to be able to output the trained model into a table, so we can store the table in the data lake for subsequent use. This can be done in the following way:

ray_context.build_result_from_dir(model_path)

At this time, the mnist_model table holds the data of the entire model directory. We can verify with the select statement:

The result shows that it is indeed a TensorFlow model.

Now, we save the model in the data lake:

The data lake supports versioning, so you can use the following command to view the versions of the current model:

This completes the training part.

Training with TensorFlow (On Ray)

We can also run the training on Ray. The code is almost the same as the non-Ray version; here is the full code:

#%python
#%input=mnist_data
#%output=mnist_model
#%cache=true
import ray
import os
from tensorflow.keras import models,layers
from tensorflow.keras import utils as np_utils
from pyjava.api.mlsql import RayContext
from pyjava.storage import streaming_tar
from pyjava import rayfix
import numpy as np

ray_context = RayContext.connect(globals(),"127.0.0.1:10001")
data_servers = ray_context.data_servers()

def data():
    temp_data = [item for item in RayContext.collect_from(data_servers)]
    train_images = np.array([np.array(item["image"]) for item in temp_data])
    train_labels = np_utils.to_categorical(np.array([item["label"] for item in temp_data]))
    train_images = train_images.reshape((len(temp_data),28*28))
    return train_images,train_labels

@ray.remote
@rayfix.last
def train():
    train_images,train_labels = data()
    network = models.Sequential()
    network.add(layers.Dense(512,
                             activation="relu",input_shape=(28*28,)))
    network.add(layers.Dense(10,activation="softmax"))
    network.compile(optimizer="rmsprop",
                    loss="categorical_crossentropy",metrics=["accuracy"])
    network.fit(train_images,train_labels,epochs=6,batch_size=128)
    model_path = os.path.join("tmp","minist_model")
    network.save(model_path)
    model_binary = [item for item in
                    streaming_tar.build_rows_from_file(model_path)]
    return model_binary

model_binary = ray.get(train.remote())
ray_context.build_result(model_binary)

The main difference is that the training function now runs on a Ray worker, so we need to convert the model directory into a binary stream and return it. This lets the client pass the data from Ray back to the Engine:

model_binary = [item for item in streaming_tar.build_rows_from_file(model_path)]
return model_binary

Finally, build the output table from the returned rows:

ray_context.build_result(model_binary)
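To illustrate what "converting a directory into a binary stream" means, here is a standard-library sketch that packs a directory into an in-memory tar archive, splits the bytes into rows, and restores the directory from those rows. It is not the format used by pyjava's streaming_tar; the `dir_to_rows` and `rows_to_dir` helpers, the `value` key, and the chunk size are all assumptions made for the example.

```python
import io
import os
import tarfile
import tempfile

def dir_to_rows(path, chunk_size=1024):
    """Pack a directory into an in-memory tar and split the bytes into rows."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w") as tar:
        tar.add(path, arcname=".")
    payload = buffer.getvalue()
    return [{"value": payload[i:i + chunk_size]}
            for i in range(0, len(payload), chunk_size)]

def rows_to_dir(rows, target):
    """Reassemble the rows and unpack the tar archive into `target`."""
    payload = b"".join(row["value"] for row in rows)
    with tarfile.open(fileobj=io.BytesIO(payload), mode="r") as tar:
        tar.extractall(target)

with tempfile.TemporaryDirectory() as src, tempfile.TemporaryDirectory() as dst:
    # A tiny stand-in for a saved model directory.
    with open(os.path.join(src, "saved_model.txt"), "w") as f:
        f.write("weights")
    rows = dir_to_rows(src)
    rows_to_dir(rows, dst)
    with open(os.path.join(dst, "saved_model.txt")) as f:
        restored = f.read()

print(restored)  # weights
```

Because the rows are plain bytes, they can be returned from a remote task and stored in a table, which is the role the binary rows play in the Ray version above.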

Predict

We first load the model and the data, then set the Python environment and the schema of the prediction output. The code for the two parts is as follows:

load delta.`ai_model.mnist_model` as model;
load parquet.`/tmp/mnist` as mnist_data;
!python env "PYTHON_ENV=source /Users/allwefantasy/opt/anaconda3/bin/activate ray1.3.0";
!python conf "schema=st(field(actualCol,long),field(predictCol,long))";

Now we can write Python code to predict the data:

The results are as follows:

Here is the complete code:

#%python
#%input=mnist_data
#%model=model
#%cache=true
#%output=predicted_table
import ray
import numpy as np
from tensorflow.keras import models,layers
from tensorflow.keras import utils as np_utils
from pyjava.api.mlsql import PythonContext,RayContext
import mock
import os
from pyjava import rayfix

ray_context = RayContext.connect(globals(),None)
conf = ray_context.conf()
home = conf["HOME"]

## rebuild model from data lake
model_path = os.path.join(home,"tmp","minist_model7")
model_servers = RayContext.parse_servers(conf["modelServers"])
ray_context.fetch_as_dir(model_path,model_servers)
model = models.load_model(model_path)

## get data and use model to predict
## note that ray_context.collect() returns a generator; if your dataset is big,
## try to get the data by chunk. For simplicity, in this example, we get all data
## from mnist_data
temp_data = [item for item in ray_context.collect()]
train_images = np.array([np.array(item["image"]) for item in temp_data])
train_labels = np_utils.to_categorical(np.array([item["label"] for item in temp_data]))
train_images = train_images.reshape((len(temp_data),28*28))
predictions = model.predict(train_images)
## labels come first in the zip so that actualCol holds the true label
result = [{"actualCol":np.argmax(a),"predictCol":np.argmax(b)} for (a,b) in zip(train_labels,predictions)]
print(result[1])
context.build_result(result)

There are several points that need to be explained separately.

First, how do we get the model stored in the data lake from Python? It can be done with the following code:

model_path = os.path.join(home,"tmp","minist_model7")
model_servers = RayContext.parse_servers(conf["modelServers"])
ray_context.fetch_as_dir(model_path,model_servers)
model = models.load_model(model_path)

If the cell provides the #%model= annotation, ray_context.conf() contains the address of the model under the modelServers key, and the model can be restored to the local disk with ray_context.fetch_as_dir(model_path, model_servers).

The data is obtained in the same way as in the previous section. Here we obtain it implicitly through the ray_context.collect() method:

temp_data = [item for item in ray_context.collect()]

In the prediction step, we write the actual value and the predicted value into a new table, which makes it easy to evaluate the model afterwards.

You can now see the result with the following statement:

Then we use ConfusionMatrix, MLSQL's built-in module, to calculate various metrics:

Isn’t that helpful?
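To show what such metrics mean for a table with actualCol and predictCol columns, here is a small pure-Python sketch. It is not the MLSQL ConfusionMatrix module; the helper functions and the four toy rows are assumptions made up for the example.

```python
from collections import Counter

def confusion_counts(rows):
    """Count (actual, predicted) pairs from rows with actualCol/predictCol."""
    return Counter((row["actualCol"], row["predictCol"]) for row in rows)

def accuracy(rows):
    """Fraction of rows whose prediction equals the actual label."""
    correct = sum(1 for row in rows if row["actualCol"] == row["predictCol"])
    return correct / len(rows)

def precision_recall(rows, label):
    """Precision and recall for one class, treated as the positive label."""
    counts = confusion_counts(rows)
    true_pos = counts[(label, label)]
    predicted_pos = sum(n for (_, pred), n in counts.items() if pred == label)
    actual_pos = sum(n for (act, _), n in counts.items() if act == label)
    precision = true_pos / predicted_pos if predicted_pos else 0.0
    recall = true_pos / actual_pos if actual_pos else 0.0
    return precision, recall

# Toy prediction table: one digit 7 was misread as a 1.
rows = [
    {"actualCol": 7, "predictCol": 7},
    {"actualCol": 7, "predictCol": 1},
    {"actualCol": 1, "predictCol": 1},
    {"actualCol": 3, "predictCol": 3},
]
print(accuracy(rows))             # 0.75
print(precision_recall(rows, 1))  # (0.5, 1.0)
print(precision_recall(rows, 7))  # (1.0, 0.5)
```

The single misread 7 lowers recall for class 7 and precision for class 1, which is the kind of per-class detail a confusion matrix exposes and a single accuracy number hides.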

How to debug Python code in MLSQL using IDE

We have seen that in MLSQL, Python interfaces seamlessly with SQL and can run directly in the Web Console, which greatly improves productivity. However, the current Console editor's Python support is limited; for example, it has no code auto-completion. So what should we do? Can we debug the code in an IDE and then move it into the Console to run against the actual data?

In fact, with a few tweaks, we can make the Python code written for MLSQL run independently of the MLSQL engine.

First, check whether there is a context object to determine whether the code runs in the MLSQL engine or independently.

is_in_mlsql = "context" in globals() or "context" in locals()

Second, when the code runs outside the engine, we can replace the context object with a mock so that calls on it do not raise errors:

context = mock.Mock()

Now you can branch on is_in_mlsql so that the script can be tested separately from the MLSQL Engine. A sample script I wrote is in the Appendix at the end of the article.
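The pattern can be tried in isolation with a short, self-contained sketch. It uses `unittest.mock` from the standard library in place of the third-party `mock` package imported in this article; the `load_rows` function and its toy rows are hypothetical stand-ins for real data access.

```python
from unittest import mock

# Inside the MLSQL Engine a `context` object is expected in globals();
# in a plain IDE run it is absent.
is_in_mlsql = "context" in globals()

if not is_in_mlsql:
    # Stand-in object: any method call on it succeeds and is recorded.
    context = mock.Mock()

def load_rows():
    """Return toy rows when running locally (hypothetical local data)."""
    if is_in_mlsql:
        raise NotImplementedError("read from the engine here")
    return [{"label": 3}, {"label": 8}]

rows = load_rows()
context.build_result(rows)

# When run outside the engine this prints: False True
print(is_in_mlsql, context.build_result.called)
```

Because the mock records calls, a local test can also assert that the result was handed over exactly once, without any engine running.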

Next step

In this first introduction to MLSQL deep learning, we saw how simple it is to use MLSQL for deep learning, and how it integrates the big data system with the AI ecosystem.

In the next article, we will introduce how to use TensorFlow for distributed training in MLSQL.

Appendix

--%python
--%input=mnist
--%output=model
--%confTable=pconf
--%cache=true
import ray
from tensorflow.keras import models,layers
from tensorflow.keras import utils as np_utils
from pyjava.api.mlsql import PythonContext,RayContext
import mock
import os
import numpy as np
from pyjava import rayfix

is_mock = "context" not in globals() and "context" not in locals()
enable_ray = not is_mock and "rayAddress" in context.conf and context.conf["rayAddress"]!=""
home = "/" if is_mock else context.conf["HOME"]

if not is_mock:
    ray_address = None if not enable_ray else context.conf["rayAddress"]
    ray_context = RayContext.connect(globals(),ray_address)
    data_servers = ray_context.data_servers()
else:
    context = mock.Mock()
    ray.util.connect(conn_str="192.168.31.207:10001")

def data():
    if is_mock:
        from tensorflow.keras.datasets import mnist
        (train_images,train_labels),(test_images,test_labels) = mnist.load_data()
        train_images = train_images.reshape((60000,28*28))
        train_images = train_images.astype('float32')/255
        test_images = test_images.reshape((10000,28*28))
        test_images = test_images.astype('float32')/255
        train_labels = np_utils.to_categorical(train_labels)
        test_labels = np_utils.to_categorical(test_labels)
    else:
        temp_data = [item for item in RayContext.collect_from(data_servers)]
        train_images = np.array([np.array(item["image"]) for item in temp_data])
        train_labels = np_utils.to_categorical(np.array([item["label"] for item in temp_data]))
        train_images = train_images.reshape((len(temp_data),28*28))
    return train_images,train_labels

@ray.remote
@rayfix.last
def train():
    return _train()

def _train():
    train_images,train_labels = data()
    network = models.Sequential()
    network.add(layers.Dense(512,activation="relu",input_shape=(28*28,)))
    network.add(layers.Dense(10,activation="softmax"))
    network.compile(optimizer="rmsprop",loss="categorical_crossentropy",metrics=["accuracy"])
    network.fit(train_images,train_labels,epochs=6,batch_size=128)
    model_path = os.path.join(home,"tmp","minist_model")
    network.save(model_path)
    return model_path

if is_mock:
    model_path = ray.get(train.remote())
else:
    model_path = ray.get(train.remote()) if enable_ray else _train()

context.build_result_from_dir(model_path)
