flowchart LR
    A[raw data] -- preprocessing --> B[preprocessed data per batch]
    B -- LuciusProcessing --> C[ready for Lucius]
Usage
Data processing and preparation
Introduction
If the LuciusWeb interface is able to calculate Zhang scores and top tables sufficiently fast, it is because the effective calculations are performed in a distributed way by Spark running on a cluster. In order for Spark to do its parallel magic, though, it needs the data to be in a suitable format. Every lookup or join that would have to be done during execution of a query would be detrimental to the overall performance. In other words, for Spark to function effectively we need to prepare the data in a format that would be called denormalized in traditional database terms.
What this means is that the database we work with on the level of the cluster is a (very) long list of complete records. If a specific compound appears 100 times in this database, then we store all that needs to be shown in the tables directly with every record where this compound is the treatment. This approach trades storage efficiency for processing speed.
We distinguish 3 main steps in preparing the data:
The preprocessing step takes in the raw data from the experiments and applies differential analysis, possibly replicate consolidation, and other computational steps. We will not discuss the preprocessing step as it is outside the scope of our work. The result of this step is written to one or more data files as CSV, TSV, Parquet or some other structured data format. Typically this preprocessed data is structured by batch of the original raw data. It should be accompanied by additional annotation information for treatments, genes and samples.
We pick up the data at this stage and prepare it for use with Lucius. That’s what LuciusProcessing is for.
LuciusProcessing
Generic setup information
We first have to initialize a spark-jobserver (and consequently Spark) context in which our subsequent jobs will run. We assume bin/build.sh has already been run, which leaves us with:
utils/processing/create_context
You should receive a response containing SUCCESS. If you get a timeout message, please try again.
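Under the hood this tool talks to the Spark Jobserver REST API. As a rough sketch of what it does (the host, port, context name and resource settings below are assumptions; the actual values come from the _viash.yaml configuration):

# Create a context on the jobserver (name and resources are illustrative)
curl -X POST "localhost:8090/contexts/luciusapi?num-cpu-cores=4&memory-per-node=4g"

# List the available contexts to verify the context exists
curl "localhost:8090/contexts"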
We should provide the appropriate JAR file to the jobserver as well:
utils/processing/upload_jar \
--jars ... \ # location of the jar file
--tag ... \ # version of LuciusAPI to use
--...
Please check whether those arguments are already set correctly in the _viash.yaml project config file. If you decide to update the defaults in _viash.yaml, make sure to run bin/build.sh again!
After you have issued the previous command, you should receive a response saying the JAR has been uploaded.
If you get something like the following instead, it means something is wrong with the JAR file:
{
"status": "ERROR",
"result": "Binary is not of the right format"
}
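If that happens, it can help to verify that the file you are pointing to is actually a valid JAR before uploading it again. For example (the path below is just a placeholder):

# A JAR is a zip archive, so listing its contents is a quick sanity check
jar tf path/to/LuciusProcessing.jar | head

# Alternative without a JDK installed
unzip -l path/to/LuciusProcessing.jar | head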
Suggested setup
When we deploy LuciusAPI and LuciusOperations, we build/deploy the JAR files on one instance (or pod) together with the Spark Jobserver runtime. Next, we use LuciusOperations to initialize the LuciusAPI backend on the instance itself. This means that all necessary ingredients for running LuciusOperations (both the api and the processing part) are available on the host:
- The necessary JAR files
- The Spark Jobserver endpoint (it’s just localhost:8090)
- A working _viash.yaml configuration file
The login info for this instance is project-specific and will be provided separately.
Open a shell on the jobserver instance. Let’s assume the following directory structure:

/home
    LuciusAPI
    LuciusProcessing
    luciusoperations
    LuciusAPI.jar
    LuciusProcessing.jar
Since this is a running Spark-Jobserver instance (unlike the generic setup above), the configuration under luciusoperations is guaranteed to be correct.
In order to avoid interfering with the running services and applications, we first make a copy of the luciusoperations directory:

cd /home
cp -r luciusoperations luciusoperations-process
cd luciusoperations-process

Now, open the _viash.yaml file and replace REPLACE_ME with luciusprocessing. Alternatively, use the following sed instruction to achieve the same from the CLI directly:

sed -i 's/REPLACE_ME/luciusprocessing/' _viash.yaml

Run bin/build.sh in order to update the tools under utils/.

Initialize a new context:

utils/processing/create_context

This can take a minute to complete; the result should be a status message in JSON format saying Context initialized.

Upload the JAR file:

utils/processing/upload_jar

Since the JAR is uploaded to localhost (on port 8090 to be precise), this is fast.

Run the processing job. See further down this document for more information about incremental and full processing jobs and how to configure those. For completeness, if a full processing job is required this would entail running:

utils/processing/process

The process may take a lot of time, and that’s expected. It can be monitored by opening the Spark Jobserver console.
What is processed?
LuciusProcessing transforms the normalized, preprocessed (per batch) data into denormalized data stored in one or multiple ‘files’ per version (see later).
In what follows, we assume the preprocessed data is in Parquet format as well.
The source should look like this:
input/<batch>_profile.parquet # profile data (t-stats, p values, ...)
input/<batch>_profile_meta.parquet # profile meta data (treatment, sample data, ...)
geneAnnotations # a 'file' containing gene annotations
treatmentAnnotations # a 'file' containing treatment annotations
cellAnnotations # a 'file' containing cell annotations
Typically, those files and directories will be on a shared filesystem or blob storage: S3 or HDFS.
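As an illustration, one way to verify that the expected inputs are in place (the paths below are placeholders and the exact tooling depends on where your data lives):

# On HDFS
hdfs dfs -ls /path/to/input/

# On S3, using the AWS CLI
aws s3 ls s3://my-bucket/input/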
Each <batch> of data might be added on a different date, as new experimental data becomes available. LuciusProcessing can consume this preprocessed data in two modes:
- The default mode, where all data from input/... is fetched and processed. This results in a new major version.
- An incremental mode, used to process new data since the last processing run.
Full processing
Let’s take a look at the workflow to better understand what happens. We initialized a context and uploaded a JAR above; it’s now time to check whether our config is right and whether the data is available:
utils/processing/check
Here, we assume the default arguments are correctly configured in _viash.yaml; if in doubt, you can always run utils/processing/check -h.
We configured the check request to be synchronous. That means the check tool will wait for the answer to be returned. There is, however, a (configurable) timeout for synchronous requests, so the returned status may be status: ERROR. Don’t worry in that case: there are ways to get to the requested information, which we’ll discuss later.

This is the output of check on a limited dataset, trimmed and formatted for readability:
{
"duration": "12.053 secs",
"classPath": "com.dataintuitive.luciusprocessing.check",
"startTime": "2022-11-30T13:14:01.863+01:00",
"context": "luciusapi",
"result": {
"info": "No data to process, please check input and parameters",
"header": "None",
"data": {
"inputPath": ".../",
"inputs": [
"batch1: 2022-10-25 with 162 samples stored in .../batch1_profile_meta.parquet",
"batch2: 2022-10-25 with 84 samples stored in .../batch2_profile_meta/ASG001_MCF7_6H_l5_profile_meta.parquet",
...
],
"filter_inputs": [
...
],
"outputs": [
"Version 6_0 at 2022-11-28 (.../output-data/2022-11-28_output_v6_0.parquet)",
"Version 1_0 at 2022-12-01 (.../output-data/2022-12-01_output_v1_0.parquet)",
...
]
}
},
"status": "FINISHED",
"jobId": "c6ea3c8a-dcd1-41bb-bf28-69c46e5431f7",
"contextId": ""
}
There are 3 important lists in the output above:
- inputs is a list of the preprocessed batches that are available.
- filter_inputs is for running incrementally; we’ll discuss that later.
- outputs is a list of already processed files.
If you add --processingDate to the check tool, only data before that date will be processed. By default this is the current date of processing, but in certain situations it might be useful to set it explicitly. For instance, if you want to process a large number of batches in pieces based on the date at which they were added, or when preparing a test dataset. In general, however, it should not be used.

If you specify --processingDate, the filter_inputs list will contain the entries that match the query.
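For example (the date format shown here is an assumption; consult utils/processing/check -h for the exact format expected):

# Only take into account input batches added before this date
utils/processing/check --processingDate 2022-11-01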
When the output of the check tool yields the expected result, it’s time to start processing the data. If processed data is available in the outputs list, the major version will automatically be updated when a full processing run is started. For example, if the latest output dataset is version 3.x, the next full processing run will get version 4.

The dates and versions are only encoded in the filenames of the Parquet files for convenience. The dates and versions that are effectively used are encoded inside the files. So even if we rename or move files, we are able to retrieve that information.
It’s now time to start the effective processing job:
utils/processing/process
Again, this just works if the _viash.yaml config file has been properly provisioned or configured.

The process tool does not wait for the result; the job runs in the background. One can monitor it either via the jobserver console or via the CLI (see later).
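For instance, one can query the Spark Jobserver REST API directly, assuming it listens on localhost:8090 (the job ID below is just the one from the example check output above):

# List the most recent jobs and their status
curl "localhost:8090/jobs?limit=5"

# Or query one specific job by its ID
curl "localhost:8090/jobs/c6ea3c8a-dcd1-41bb-bf28-69c46e5431f7"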
Incremental processing runs
The process for incremental runs is the same as for full runs; the difference is in the selection of data that is used for processing: an incremental run looks at the last major version in the output location, and for that version the last minor version. It then checks at what date that data was processed. If there is input data that is newer than the last processed data, it will be processed and the minor version will be bumped. For example, if the latest output is version 3.2 and newer input batches are found, the incremental run will produce version 3.3.
Running the processing is as easy as before, by adding --incremental:
utils/processing/process --incremental
Remember that the filter_inputs list will contain the entries in the input that would be processed when running incrementally.
In order to clean up, it’s good practice to remove the context after the processing has run:
utils/processing/remove_context
LuciusAPI
Once the data is processed for use with Lucius, it’s a matter of initializing the long-running context. This is similar to what we did above:
utils/api/create_context
utils/api/upload_jar
utils/api/initialize
At this point, the data will be loaded and cached in memory on the Spark cluster. The caching is important for performance. You can check the status of the initialization job using the jobserver console or the CLI (see below under Troubleshooting).
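A possible way to verify this from the CLI, assuming the jobserver listens on localhost:8090:

# The long-running context should be listed here
curl "localhost:8090/contexts"

# The initialization job should eventually report status FINISHED
curl "localhost:8090/jobs?limit=1"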
Please note that while the processing step only requires the profiles, profile meta and gene annotations, LuciusAPI also needs treatment annotations and cell annotations. Those have to be provided.

When the initialization job has finished, a check can be performed:
utils/api/check
You’ll get some basic statistics about what is available in the dataset. If this works, there’s a high chance that the frontend will work!
Do not remove the context unless you really intend to do that: removing the context will stop the application from working.
Technical details
Every tool in the toolbox in fact performs a POST REST request to LuciusAPI or LuciusProcessing. This POST request includes a so-called payload, a configuration object for this specific endpoint or function. The processing/process and api/initialize tools also require such a configuration payload. The template for this can be found under etc/. There is one config file for LuciusAPI and one for LuciusProcessing. A simple templating mechanism is used where variables that require substitution are enclosed by two underscores (__).
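As an illustration only (the file name and keys below are hypothetical; the real templates live under etc/), such a template and the corresponding substitution could look like this:

# Hypothetical template excerpt with variables enclosed in double underscores:
#   "db": "__DB_URI__",
#   "db_version": "__DB_VERSION__"

# Substituting the placeholders, roughly what the toolbox does internally
sed -e 's|__DB_URI__|s3a://my-bucket/output-data|' \
    -e 's|__DB_VERSION__|2|' \
    etc/hypothetical_template.conf > payload.conf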
Customizations
Processed data versions
As noted above, processed data is stored with a major/minor versioning scheme: major versions for full processing runs, minor versions for incremental runs. Each new full run generates a new major version, and consequently the utils/api/initialize tool should pick out the correct one to initialize the context.
The default is set to 0, but one will often require a different version. One approach is to update the _viash.yaml file and encode the proper default there.
Another approach is to simply use the arguments to modify the behavior of the initialize tool:
utils/api/initialize --db_version 2
Please note that we only select a major version because all minor versions are incremental runs on top of version 2.0.
Other customizations
It’s possible to edit the _viash.yaml file and run bin/build.sh again in order to create the utils/ toolbox with new defaults. It’s also possible to use the available arguments; just be sure to use them consistently.
For instance:
utils/processing/create_context \
--application my_app_name # specify a different name
utils/processing/upload_jar \
--tag 0.2.0 # select an older version
utils/processing/check \
--application my_app_name # use the same app name as before
utils/api/initialize \
--db $different_path \ # point to different database location
--db_version 1 \ # point to different database version
--application my_app_name
As always, simply call the appropriate tool from the toolbox with -h or --help.
In principle, we could create scripts (or even workflows?) based on these steps.
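As a minimal sketch, assuming the _viash.yaml defaults are correct and bin/build.sh has been run, a full processing run could be scripted like this (note that process does not wait for the job to finish, so the context should only be removed once the job has completed):

#!/usr/bin/env bash
set -euo pipefail

# Full processing run, relying on the defaults configured in _viash.yaml
utils/processing/create_context
utils/processing/upload_jar
utils/processing/check
utils/processing/process

# Monitor the job via the jobserver console or REST API, and only then clean up:
# utils/processing/remove_context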