Need for join operator #462
-
Short update. On further reading, I think all "join" operations are a mismatch to the need described above. Both dask and spark treat join in the relational database sense, and the operations make sense only when doing relational-database-type operations. Various forms of join could still be useful for algorithms that use a dataframe as an intermediary. Hoping for creative solutions from someone else in the group.
-
Had not heard anything from any of you, but I had an idea that worked. I haven't checked out the equivalent for pyspark yet, but a more careful reading of the dask bag map operator documentation here showed a key phrase: map allows two bags. The example there uses the stock add function, but it led me to write the following test implementation of exactly what was discussed above:
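(Sketched here in stripped-down form: plain python dicts stand in for TimeSeries objects, and `post_arrival` is a hypothetical helper rather than a MsPASS function.)

```python
import dask.bag as db

def post_arrival(datum, arrival):
    # In the real workflow this would post the pick time to the datum's
    # Metadata; here a plain dict stands in for a TimeSeries.
    datum["arrival_time"] = arrival
    return datum

data = [{"sta": "AAK"}, {"sta": "ABKT"}, {"sta": "AFI"}]
picks = [10.0, 12.5, 9.75]

# Both bags must have the same length and partitioning so the elements
# pair up one-to-one.
b_data = db.from_sequence(data, npartitions=2)
b_picks = db.from_sequence(picks, npartitions=2)

# map accepts additional bags after the function; each element of b_picks
# is handed to post_arrival along with the corresponding element of b_data.
merged = b_data.map(post_arrival, b_picks)
print(merged.compute())
```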
It produces this output:
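For the stand-in data in the sketch above, that looks like:

```
[{'sta': 'AAK', 'arrival_time': 10.0}, {'sta': 'ABKT', 'arrival_time': 12.5}, {'sta': 'AFI', 'arrival_time': 9.75}]
```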
Hence, the unambiguous solution here is to use the bag module's map function in this form. This will get me past the original problem that created this post. We should, however, continue a discussion of this generic problem of merging parallel data sets like this. I wonder if we need to create a generic function for mspass to simplify this process for users?
-
Oh, this is cool. I was not aware of the capability to operate on two bags. This seems to be something unique to dask, but I did figure out the equivalent in pyspark by asking ChatGPT. Below is the example given by ChatGPT:
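(Reconstructed here in the same spirit rather than quoted verbatim; the suggested pattern is simply zip followed by a regular map.)

```python
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Two RDDs with the same length and partitioning; zip pairs them element by
# element, then an ordinary map combines each pair.
rdd1 = sc.parallelize([1, 2, 3, 4], 2)
rdd2 = sc.parallelize([10, 20, 30, 40], 2)

merged = rdd1.zip(rdd2).map(lambda pair: pair[0] + pair[1])
print(merged.collect())  # [11, 22, 33, 44]
```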
I have not tested it, but from reading the documentation of the zip method here, I think it is correct. Basically, we will need to combine zip with a regular map to achieve the same thing, but it is doable. Back to your question, I think it would be helpful to provide such a thing either as a function or at least as a documented example. I guess the question is how we can make this generic. It seems to me that the need for a "merge" could be pretty different depending on the data and the workflow. Still, I think adding a new metadata entry is a pretty common need.
-
I modified the little test program above that I wrote for dask to run under spark, using the zip idea described above. After the usual hacking I got the following to work (btw, the example from the documentation runs without any issues in the mspass container):
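(Again a stripped-down sketch: dicts stand in for the data objects and `post_arrival` is a hypothetical helper.)

```python
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

def post_arrival(pair):
    # zip hands us (datum, arrival) tuples; unpack and post the pick time.
    datum, arrival = pair
    datum["arrival_time"] = arrival
    return datum

data = [{"sta": "AAK"}, {"sta": "ABKT"}, {"sta": "AFI"}]
picks = [10.0, 12.5, 9.75]

# zip requires the two RDDs to have the same number of partitions and the
# same number of elements per partition; parallelizing two equal-length
# lists the same way guarantees that.
rdd_data = sc.parallelize(data, 2)
rdd_picks = sc.parallelize(picks, 2)

merged = rdd_data.zip(rdd_picks).map(post_arrival)
print(merged.collect())
```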
It produces this output:
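For the stand-in data, that is the same list the dask version produced:

```
[{'sta': 'AAK', 'arrival_time': 10.0}, {'sta': 'ABKT', 'arrival_time': 12.5}, {'sta': 'AFI', 'arrival_time': 9.75}]
```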
So, if you concur with my plan for mods to ...
-
That's great! Yeah, I think the design is pretty good. One minor point is that we may also want a dedicated function to do the merge outside of the read step. This seems to be a common need, not just at the read step.
-
Both dask and spark have a "join" operator that can be applied to a bag or an RDD, respectively. The documentation for a spark join is here and the documentation for dask is here.
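For reference, the toy examples from those documentation pages look roughly like this; note that both are keyed, relational-style joins:

```python
import dask.bag as db
from pyspark import SparkContext

# dask: Bag.join joins a bag against a plain iterable on a computed key
# (here, the first letter of each string).
people = db.from_sequence(["Alice", "Bob", "Charlie"])
fruit = ["Apple", "Apricot", "Banana"]
print(list(people.join(fruit, lambda name: name[0])))
# [('Apple', 'Alice'), ('Apricot', 'Alice'), ('Banana', 'Bob')]

# spark: RDD.join works on (key, value) pair RDDs and matches on the key.
sc = SparkContext.getOrCreate()
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
print(sorted(x.join(y).collect()))
# [('a', (1, 2)), ('a', (1, 3))]
```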
There are two applications I can see for this operator:
I think the former is less important as we have a solid, focused approach that probably matches mspass better than this generic operator. The latter is a useful way, I think, to handle a class of problems where there is an external data set with one entry per datum that needs to be merged into the data object. The example that caused me to post this discussion is the following skeleton of an example I was working on for a new section of the user manual on how to handle continuous data with MsPASS:
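(In rough outline the skeleton looks like the sketch below. Everything here is a placeholder: `query_generator` and `make_segments` are stubs, made-up dicts stand in for the pick table and the ensembles, and the read/write calls are left as comments since the query-list form of `read_distributed_data` is still on the pending branch.)

```python
import dask.bag as db

# Made-up stand-in for the pick table (e.g. the ANF pick database); one
# entry per desired output segment.
picks = [
    {"sta": "AAK", "arrival_time": 1000.0},
    {"sta": "ABKT", "arrival_time": 1250.5},
]

def query_generator(pick):
    # TODO: build the MongoDB query dict selecting the continuous-data
    # documents that span this pick time.
    return {"sta": pick["sta"]}

def make_segments(ensemble, arrival_time):
    # TODO: cut a fixed window around arrival_time from each ensemble member
    # and post the arrival time to each datum's Metadata.
    return ensemble

queries = [query_generator(p) for p in picks]
arrivals = [p["arrival_time"] for p in picks]

# With the pending Database revision this would be
#   ensembles = read_distributed_data(db_handle, queries, ...)
# returning a bag of TimeSeriesEnsemble objects, one per query.  Plain dicts
# stand in for the ensembles in this sketch.
ensembles = db.from_sequence([{"query": q} for q in queries])

# PROBLEM STEP: merge the arrival times into the workflow so the map call to
# make_segments can see the time for its ensemble.  A bag join looked like
# the tool for this, but it turns out to be a relational-style join, which is
# the mismatch this thread is about.
#   segments = ensembles.map(make_segments, <arrival time for this ensemble>)

# The windowed segments would then be saved with write_distributed_data(...).
```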
I know there is a lot there, and as the TODOs say it is not complete, but the idea of the example is a parallel job to extract fixed time windows of data from a large table of "picks" like the Earthscope database of picks from the Array Network Facility. The same functionality would be common for a long, long list of workflows if the new Earthscope cloud system allows us to efficiently access their waveform archive. To help you along, the algorithm has these steps, which I'll describe in simple prose to complement the code:
1. `query_generator`: the function defined above would exploit the new functionality of the pending branch for revising Database. The new version of `read_distributed_data` allows an input of a list of python dict containers defining MongoDB queries. Each dict in this implementation generates a query used to load an ensemble. Hence, the output of `read_distributed_data` above would be a bag of `TimeSeriesEnsemble` objects.
2. `join` method: the idea here was to allow a way to merge the arrival times into the workflow so that the map call that follows, to the function `make_segments`, could access the time, allowing it to cut out the desired segment and post the arrival time somewhere to each datum.
3. `write_distributed_data`.

There are numerous things in the above that are broken or incomplete, so don't pick on details.
There are some issues I came across that caused me to write this post rather than just finalize an implementation of the above example.
The main design issue this brings up is this question: how should one implement the algorithm above, and do we need to add something to MsPASS to support it? I reiterate that this type of algorithm is important for the seismology community. I am 100% sure of that statement.
A completely different way to do this process may be to write a map function that takes a python array/tuple or a DataFrame as an argument. If bag had a method to do the equivalent of an index position (i.e. report that a datum is the ith item in the bag), it would be trivial to get the ith component of the array/tuple and do something with it. If any of you know how to do that, educate me and the community by responding below.
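(One way this might work, sketched purely as a possibility with a made-up arrival table: as far as I know there is no positional lookup on a bag element, but the index can be attached to each element when the bag is built, and the table itself broadcast to every call of the map function.)

```python
import dask.bag as db

arrivals = (10.0, 12.5, 9.75)  # external table, one entry per datum
data = [{"sta": "AAK"}, {"sta": "ABKT"}, {"sta": "AFI"}]

def attach(indexed_datum, table):
    # Each bag element carries its own index, so the matching table entry
    # can be looked up directly.
    i, d = indexed_datum
    d["arrival_time"] = table[i]
    return d

# Build the bag from (index, datum) pairs; non-bag kwargs like table are
# broadcast to every call.
b = db.from_sequence(list(enumerate(data)), npartitions=2)
print(b.map(attach, table=arrivals).compute())
```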