
Basics of working with Spark jobs (Beta)

Spark jobs can be submitted to the cluster in different ways:

  • For Spark applications that do not have dependencies, it is sufficient to pass the application code in the job manifest. This approach will be demonstrated below.
  • For Spark applications that require additional artifacts for their operation, you must manually add the required artifacts to the Cloud Storage bucket and edit the job manifest. This approach is illustrated using the example of working with ClickHouse; a sketch of the upload step is also shown after this list.
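
As an illustration of the first part of that approach, the sketch below uploads an artifact to an S3-compatible Cloud Storage bucket using boto3. The endpoint URL, bucket name, access keys, and file names here are assumptions for illustration, not values from this article; editing the manifest itself is covered in the ClickHouse example.

# A minimal sketch (not from this article): upload a job artifact to a
# Cloud Storage bucket through the S3-compatible API using boto3.
import boto3

session = boto3.session.Session()
s3 = session.client(
    service_name="s3",
    endpoint_url="https://hb.ru-msk.vkcs.cloud",  # assumption: S3-compatible endpoint of your region
    aws_access_key_id="<access key ID>",
    aws_secret_access_key="<secret key>",
)

# Assumed artifact, bucket, and object names — replace with your own.
s3.upload_file("extra-lib.zip", "<cluster bucket name>", "artifacts/extra-lib.zip")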

The example below uses an application that calculates an approximate value of the number π.

Preparatory steps

  1. Prepare the environment for working with Python in any convenient way:

    Create a JupyterHub instance on the VK Cloud platform. It already contains a configured Python 3.x environment and pip, which you can work with from a JupyterHub notebook.

  2. Install the Cloud ML Platform library for Python:

    1. Connect to the JupyterHub instance.

    2. In the JupyterHub notebook, create and execute a cell with the following contents:

      %pip install https://mlplatform.hb.ru-msk.vkcs.cloud/mlplatform_client.tar.gz

    The up-to-date version of the library is always available at the link provided.

  3. Create an access token, which is needed to work with the library.

    A token with either the Administrator role or the User role is suitable.

  4. Create a Spark cluster.

    Choose the cluster parameters at your discretion.

  5. Run the script to get information about Spark clusters in the project:

    from mlplatform_client import MLPlatform

    REFRESH_TOKEN = "<the value of the access token>"

    mlp = MLPlatform(REFRESH_TOKEN)
    print(mlp.get_clusters())

    Detailed information about clusters will be displayed.

  6. Find and write down the ID of the created cluster (contained in the id field).

1. Create a file with the Spark application code

This application calculates a Monte Carlo approximation of the number π by distributing the computation across the nodes of the Spark cluster.
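
A minimal sketch of such an application, saved as calculate-pi.py (the file name used in the next step), might look like this. The number of partitions and samples are illustrative assumptions, not values from this article.

# calculate-pi.py — a minimal sketch of a Monte Carlo estimate of π on Spark.
from operator import add
from random import random

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("calculate-pi").getOrCreate()

# Illustrative values: adjust to the size of your cluster.
partitions = 10
samples_per_partition = 100_000
total_samples = partitions * samples_per_partition

def inside_unit_circle(_):
    # Draw a random point in the unit square and check whether it falls
    # inside the quarter circle of radius 1.
    x, y = random(), random()
    return 1 if x * x + y * y <= 1.0 else 0

# Distribute the sampling across the cluster and sum the hits on the driver.
hits = (
    spark.sparkContext.parallelize(range(total_samples), partitions)
    .map(inside_unit_circle)
    .reduce(add)
)

print("Pi is roughly %f" % (4.0 * hits / total_samples))

spark.stop()

Each partition is processed by an executor, so the sampling runs in parallel on the cluster nodes, and only the per-partition hit counts are combined on the driver.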

2. Send the Spark job to the cluster

Submit a job to the cluster by executing the script:

from mlplatform_client import MLPlatform

REFRESH_TOKEN = "<the value of the access token>"
CLUSTER_ID = "<cluster ID>"
PY_FILE = "calculate-pi.py"
JOB_NAME = "pi-spark-job"

mlp = MLPlatform(REFRESH_TOKEN)

spark_job_manifest = mlp.get_default_manifest(CLUSTER_ID, JOB_NAME)
spark_job_info = mlp.spark_submit_job(CLUSTER_ID, spark_job_manifest, PY_FILE)

print(spark_job_info)

Information about the submitted job will be displayed, for example:

Job: pi-spark-job, status: SUBMITTED, created_at: ...

By default, the Spark job manifest does not contain the location of the files needed to run the Spark application.

The application that calculates the number π requires only one executable file; no additional artifacts are used.

In this simple case, you don't need to host the Spark application executable in an Object Storage bucket and then edit the default manifest to add the required information to it.

It is enough to pass the name of the executable file when sending a job to the cluster:

spark_job_info = mlp.spark_submit_job(CLUSTER_ID, spark_job_manifest, PY_FILE)

The Cloud ML Platform library itself will adjust the manifest so that the code from the specified file can be executed.
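
If you want to see exactly what will be submitted, you can fetch and print the default manifest before passing it to spark_submit_job. This sketch only repeats the calls already shown above.

from mlplatform_client import MLPlatform

REFRESH_TOKEN = "<the value of the access token>"
CLUSTER_ID = "<cluster ID>"
JOB_NAME = "pi-spark-job"

mlp = MLPlatform(REFRESH_TOKEN)

# Fetch the default job manifest and inspect it before submitting.
spark_job_manifest = mlp.get_default_manifest(CLUSTER_ID, JOB_NAME)
print(spark_job_manifest)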

3. Track the status of the Spark job

  1. Make sure that the result of calculating the number π appears in the job logs. To get the logs, run the following script. If the result does not appear, run the script again later: while the job has not completed yet, only intermediate logs may be displayed.

    from mlplatform_client import MLPlatform

    REFRESH_TOKEN = "<the value of the access token>"
    CLUSTER_ID = "<cluster ID>"
    JOB_NAME = "pi-spark-job"

    mlp = MLPlatform(REFRESH_TOKEN)

    logs = mlp.spark_job_logs(CLUSTER_ID, JOB_NAME)
    print(logs)

  2. (Optional) Get information about events in the cluster. Such information allows you to find out the current status of the cluster and jobs, for example, when investigating issues.

    from mlplatform_client import MLPlatform

    REFRESH_TOKEN = "<the value of the access token>"
    CLUSTER_ID = "<cluster ID>"

    mlp = MLPlatform(REFRESH_TOKEN)

    events = mlp.spark_events(CLUSTER_ID)
    print(events)

Delete unused resources

If you no longer need the created resources, delete them:

  1. Delete the Spark cluster.
  2. Delete the Docker registry for this Spark cluster.
  3. Delete the access token.