Partitioning a query result..


Tony Giaccone
I want to speed things up by running multiple instances of a job that
fetches data from a table, so that, for example, if I need to process 10,000
rows, the query runs on each instance and returns 4 sets of 2,500 rows, one
set per instance with no duplication.

My first thought in SQL was to add something like this to the WHERE clause:

AND MOD(ID, INSTANCE_COUNT) = INSTANCE_ID

so that if the instance count was 4, the instance IDs would run 0, 1, 2, 3.

I'm not quite sure how you would structure that using the query API. Any
suggestions about that?

There are some problems with this idea, though: you have to be certain your
IDs increase in a manner that aligns with your math so that the partitions
are equal in size. For example, if your sequence increments by 20, you would
have to futz around with the math to get the right partitioning, and that is
the problem with this technique: it's brittle, because it depends on getting
a bunch of things in sync.

Does anyone have another idea of how to segment out rows that would yield a
solution that's not quite so brittle?



Tony Giaccone
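[To make the failure mode concrete, here is a small standalone sketch, plain Java with no Cayenne or DB involved, of the MOD predicate and the skew it produces when the ID sequence increments by 20 instead of 1; the class and method names are illustrative only.]

```java
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class ModPartitionDemo {

    // Same test as the SQL fragment "AND MOD(ID, INSTANCE_COUNT) = INSTANCE_ID"
    static boolean belongsTo(long id, int instanceCount, int instanceId) {
        return id % instanceCount == instanceId;
    }

    public static void main(String[] args) {
        // IDs from a sequence incrementing by 1: an even 4 x 2500 split
        Map<Long, Long> even = LongStream.range(0, 10_000).boxed()
                .collect(Collectors.groupingBy(id -> id % 4, Collectors.counting()));
        System.out.println("increment 1:  " + even);

        // IDs from a sequence incrementing by 20: MOD(id, 4) is always 0,
        // so one worker gets all 10,000 rows and the other three get nothing
        Map<Long, Long> skewed = LongStream.iterate(0, id -> id + 20).limit(10_000).boxed()
                .collect(Collectors.groupingBy(id -> id % 4, Collectors.counting()));
        System.out.println("increment 20: " + skewed); // {0=10000}
    }
}
```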

Re: Partitioning a query result..

John Huss
Unless your DB disk is striped into at least four parts, this won't be
faster.
On Wed, Dec 14, 2016 at 5:46 PM Tony Giaccone <[hidden email]> wrote:

> I want to speed thing up, by running multiple instances of a job that
> fetches data from a table.  So that for example if I need to process 10,000
> rows
> the query runs on each instance and returns 4 sets of 2500 rows one for
> each instance with no duplication.
>
> My first thought in SQL was to add something like this to the where
> clause..
>
> and MOD(ID, INSTANCE_COUNT) == INSTANCE_ID;
>
> so that if the instance count was 4 then the instance IDs would run
> 0,1,2,3.
>
> I'm not quite sure how you would structure that using the queryAPI. Any
> suggestions about that?
>
> And there are some problems with this idea, as you have to be certain your
> IDs increase in a manner that aligns with your math so that the
> partitioning is equal in size.
> For example if your sequence increments by 20, then you would have to futz
> around with your math to get the right partitioning and that is the problem
> with this technique.
>  It's brittle it depends on getting a bunch of things in  "sync".
>
> Does anyone have another idea of how to segment out rows that would yield a
> solution that's not quite so brittle?
>
>
>
> Tony Giaccone
>
Reply | Threaded
Open this post in threaded view
|

Re: Partitioning a query result..

Andrus Adamchik
Here is another idea:

* read all data in one thread using an iterated query and DataRows
* append received rows to an in-memory queue (individually or in small batches)
* run a thread pool of processors that read from the queue and do the work

As with all things performance, this needs to be measured and compared against a single-threaded baseline. It will not help with the IO bottleneck, but the processing part will happen in parallel. If you see any Cayenne bottlenecks during the last step, you can start multiple ServerRuntimes, one per thread.

Andrus
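[A minimal standalone sketch of the pipeline described above; the Cayenne iterated query is replaced by a plain loop of fake row IDs so it runs anywhere. The bounded queue size, worker count, and poison-pill shutdown are illustrative choices, not part of any Cayenne API.]

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class QueueFanOut {

    private static final Integer POISON = -1; // sentinel meaning "no more rows"

    static int run(int rows, int workers) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(100); // bounded: applies backpressure
        AtomicInteger processed = new AtomicInteger();

        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.submit(() -> {
                try {
                    // Each worker drains the queue until it sees its poison pill
                    for (Integer row = queue.take(); !row.equals(POISON); row = queue.take()) {
                        processed.incrementAndGet(); // "do the work" would go here
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Producer: stands in for the single thread walking an iterated query
        for (int row = 0; row < rows; row++) queue.put(row);
        for (int i = 0; i < workers; i++) queue.put(POISON); // one pill per worker

        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed " + run(10_000, 4)); // processed 10000
    }
}
```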

> On Dec 15, 2016, at 3:06 AM, John Huss <[hidden email]> wrote:


Re: Partitioning a query result..

Tony Giaccone-3
Right, so I agree that partitioning the database is a thing that can be
done.

Andrus, I'm a bit less confident in the proposal you're suggesting. I want
to be able to spin up new instances, potentially in new containers, and run
them in different environments. If we're moving to a cloud-based
infrastructure, then parallelizing within a single app doesn't match up with
that kind of deployment. I recognize there are limits to my solution as
well: you have to deal with how you split up the rows into partitions.

The problem, generally stated, is: if I have 10,000 records and I want to
distribute them across N workers, how do I do that? How can I partition the
result set at run time across an arbitrary number of workers?

I also realize this is quickly expanding outside the scope of the Cayenne
users mailing list.

On Thu, Dec 15, 2016 at 3:18 AM, Andrus Adamchik <[hidden email]>
wrote:


Re: Partitioning a query result..

Andrus Adamchik
Actually this is an interesting architectural discussion. Speaking for myself, I certainly like having it here.

The 2 main approaches have already been mentioned:

1. Single dispatcher -> message queue -> multiple workers.
2. Multiple workers that somehow guess their part of the workload.

Both can be made to work. Generally I like #1, as it is not brittle at all; this is essentially how work is parallelized in the cloud. A dispatcher instance would poll the DB and post a stream of IDs to the queue. Workers would grab the IDs from the queue and do their processing. Worker instances can come and go. A good choice for the message queue is Apache Kafka, which supports automatically spreading messages across multiple consumers (and yes, there's Bootique support for it). If you can make your dispatcher fast (and I don't see why you can't... fetching 10K IDs can be done in milliseconds with proper DB indexes), you can keep adding as many workers as needed.
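[A simplified stand-in for the key-based spreading a queue like Kafka performs: each record's key hashes to a fixed partition, and each consumer owns a subset of partitions. Note that Kafka's real default partitioner hashes keys with murmur2; the `hashCode`-based version below is only an approximation for illustration.]

```java
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class KeyedSpread {

    // Maps a record key to one of N partitions; deterministic, so the same
    // key always lands on the same partition (and thus the same consumer)
    static int partitionFor(String key, int partitions) {
        return Math.floorMod(key.hashCode(), partitions);
    }

    public static void main(String[] args) {
        // 10,000 record IDs posted by the dispatcher, spread over 4 partitions
        Map<Integer, Long> perPartition = LongStream.range(0, 10_000)
                .mapToObj(id -> "record-" + id)
                .collect(Collectors.groupingBy(k -> partitionFor(k, 4), Collectors.counting()));
        System.out.println(perPartition); // four roughly equal counts
    }
}
```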

So, to confirm your scenario:

* On each job run do you need to reprocess previously seen records, or do you only care about new records since the last run?
* On a single instance, do you have an idea of "the main query time" vs "processing time + any extra queries and commits"?

Andrus


> On Dec 16, 2016, at 8:50 PM, Giaccone, Tony <[hidden email]> wrote:

Re: Partitioning a query result..

Tony Giaccone-3
So the essential bit of this that perhaps hasn't been exposed is that we're
publishing this data to Google's pubsub. The intent is to generate the
events as a result of actions taken in the main application and store the
event data in a new event database. Then a periodic job reads the data from
the database/table and pushes it into the pubsub system. One of the other
interesting aspects is that we are using Avro to serialize the event into a
blob of JSON. The tables store the event audit data (who, where, when) and
the blob. The publisher pulls the blob and a hash that identifies the schema
of the JSON message, publishes it to pubsub, and updates the event to
indicate it's been published.

Ideally we would just publish the events to pubsub as they happen from the
main app. It's not clear to me why we're using the DB as a temporary
storage location, but that's the deal for now.


   - In my POC, I pushed 10,000 events into the event queues using Cayenne
   in about 13 seconds (a completely unoptimized local Postgres DB, event
   data generated randomly, app and DB both running on my laptop).
   - If I skip the pubsub step, I can then process those events: read them
   from the DB, update the is_published field, and write them back out in
   17 seconds.
   - If I include the pubsub step, that 17 seconds expands to 10 minutes.

So you see it's the single dispatcher -> message queue part that I want to
parallelize, because that's where I think I could get the biggest speedup
now. If pushing to pubsub is slow, and the results aren't guaranteed to be
retrieved in order, it doesn't matter in what order I push them to pubsub.


Tony
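[Since order doesn't matter, the slow per-event publish call can simply be fanned out over a thread pool. In this hedged sketch, `publish` is a placeholder that sleeps to mimic per-call pubsub latency; in the real job it would be the Google Pub/Sub client call, and the pool size would need tuning against the publisher's rate limits.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelPublish {

    // Stand-in for the real Pub/Sub client call; sleeps to mimic network latency
    static void publish(int eventId) throws InterruptedException {
        Thread.sleep(2);
    }

    static int publishAll(int count, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger published = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<>();
        for (int id = 0; id < count; id++) {
            int eventId = id;
            futures.add(pool.submit(() -> {
                publish(eventId);
                return published.incrementAndGet();
            }));
        }
        for (Future<?> f : futures) f.get(); // wait; surfaces any publish failure
        pool.shutdown();
        return published.get();
    }

    public static void main(String[] args) throws Exception {
        // 200 publishes at 2 ms each: ~400 ms sequential, ~25 ms on 16 threads
        System.out.println("published " + publishAll(200, 16)); // published 200
    }
}
```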

On Fri, Dec 16, 2016 at 1:35 PM, Andrus Adamchik <[hidden email]>
wrote:


Re: Partitioning a query result..

Andrus Adamchik
Ah, OK, so sending to pubsub is the bottleneck here. An ideal solution would have been replacing the entire event DB with Kafka: there would be fewer moving parts, you could consume the events with any number of parallel consumers, and Kafka would take care of spreading the consumption. But as you said, this is not an option.

I guess your original solution, partitioning with MOD, is the easiest under the circumstances. Unless you are willing to add another Kafka queue to the process for the purpose of feeding the IO-bound consumers that send to pubsub. That would also work well, considering the external IO is the bottleneck, not the DB or Cayenne.

Andrus


> On Dec 16, 2016, at 10:54 PM, Giaccone, Tony <[hidden email]> wrote: