python - Consume Kafka continuously and update queue at specific intervals using multiprocessing


I am trying to continuously consume events from Kafka. The same application also uses the consumed data to perform some analysis and update a database in n-second intervals (assume n = 60 seconds).

In the same application, let process1 = the Kafka consumer and process2 = the data analysis and database update logic.

process1 is to run continuously, while process2 is to be executed once every n = 60 seconds.

process2 is concerned with the computation and the database update, and hence will take 5-10 seconds to execute. I do not want process1 to stall during the time that process2 is executing. Hence, I am using the multiprocessing module to achieve concurrency in this case. (process1 and process2 would be thread1 and thread2 if I were using the threading module in Python, but because of what I have read about the GIL and the threading module not being able to leverage a multi-core architecture, I decided to go with the multiprocessing module.) If my understanding of the GIL or of the threading module's limitations mentioned above is incorrect, my apologies, and please feel free to correct me.

The application I have has a simple interaction between the two processes, wherein process1 fills a queue with all the messages it receives in 60 seconds and, at the end of the 60 seconds, transfers all the messages to process2.

I am having trouble with this transfer logic. How do I transfer the contents of the queue from process1 to process2 at the end of 60 seconds, and subsequently clear the queue contents so that it starts again on another iteration? (I guess process2 would be the main process or another process? That is another question I have: should I instantiate two processes in addition to the main process?)

So far I have the following:

    import sys
    import time
    from multiprocessing import Process, Queue
    from queue import Empty  # raised by q.get() when its timeout expires

    from kafka.client import KafkaClient
    from kafka import SimpleConsumer


    def kafka_init():
        client = KafkaClient('kafka1.wpit.nile.works')
        consumer = SimpleConsumer(client, "druidkafkaconsumer",
                                  "personalization.targeting.clickstream.prod")
        return consumer


    def consume_messages(q):
        print("thread started")
        while not q.empty():
            try:
                print(q.get(True, 1))  # block for at most 1 second
            except Empty:
                break
        print("thread ended")


    if __name__ == "__main__":
        starttime = time.time()
        timeout = starttime + 10  # timeout of read in seconds
        consumer = kafka_init()
        q = Queue()
        p = Process(target=consume_messages, args=(q,))  # args must be a tuple
        while True:
            q.put(consumer.get_message())
            if time.time() > timeout:
                # transfer logic from process1 to the main process goes here
                print("start time", starttime)
                print("end time", time.time())
                p.start()
                p.join()
                break

Any help is appreciated.

The problem you are dealing with is not Kafka-specific, so I'm going to use generic "messages" which are just ints.

The main problem, it seems to me, is that on the one hand you want to process messages as soon as they are produced, and on the other hand you want to update the database every 60 seconds.

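If you use q.get(), by default the call blocks until a message is available in the queue. That could take longer than 60 seconds, which would delay the database update for too long. So we can't use a plain blocking q.get. We need to call q.get with a timeout so that it never blocks for longer than that timeout: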

    import time
    import multiprocessing as mp
    import random
    import queue  # only needed for the queue.Empty exception


    def process_messages(q):
        messages = []
        start = time.time()
        while True:
            try:
                # wait at most 1 second so the 60-second check below still runs
                message = q.get(timeout=1)
            except queue.Empty:
                pass
            else:
                messages.append(message)
                print('doing data analysis on {}'.format(message))
            end = time.time()
            if end - start > 60:
                print('updating database: {}'.format(messages))
                start = end
                messages = []


    def get_messages(q):
        # stand-in for the Kafka consumer: emits a random int every 0-5 seconds
        while True:
            time.sleep(random.uniform(0, 5))
            message = random.randrange(100)
            q.put(message)


    if __name__ == "__main__":
        q = mp.Queue()

        proc1 = mp.Process(target=get_messages, args=[q])
        proc1.start()

        proc2 = mp.Process(target=process_messages, args=[q])
        proc2.start()

        proc1.join()
        proc2.join()

This produces output such as:

    doing data analysis on 38
    doing data analysis on 8
    doing data analysis on 8
    doing data analysis on 66
    doing data analysis on 37
    updating database: [38, 8, 8, 66, 37]
    doing data analysis on 27
    doing data analysis on 47
    doing data analysis on 57
    updating database: [27, 47, 57]
    doing data analysis on 85
    doing data analysis on 90
    doing data analysis on 86
    doing data analysis on 22
    updating database: [85, 90, 86, 22]
    doing data analysis on 8
    doing data analysis on 92
    doing data analysis on 59
    doing data analysis on 40
    updating database: [8, 92, 59, 40]
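One possible refinement of the loop above, offered only as a sketch and not as part of the original answer: the batching step can be pulled out into a small helper that caps each q.get timeout at the time remaining until the deadline, so the database update is not pushed back by a wait that started just before the interval ended. The helper name collect_batch and its poll parameter are made up for this illustration. It assumes only that the queue's get(timeout=...) raises queue.Empty, which holds for both queue.Queue and multiprocessing.Queue.

```python
import queue
import time


def collect_batch(q, interval, poll=1.0):
    """Return the messages received from q during the next `interval` seconds.

    `collect_batch` and `poll` are hypothetical names for this sketch.
    """
    # time.monotonic() is unaffected by system clock adjustments
    deadline = time.monotonic() + interval
    batch = []
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return batch
        try:
            # Wait at most `poll` seconds, and never past the deadline.
            batch.append(q.get(timeout=min(poll, remaining)))
        except queue.Empty:
            pass  # nothing arrived in this slice; re-check the deadline
```

With this helper, the body of process_messages could shrink to a loop that calls collect_batch(q, 60) and then passes the returned list to the database update. The trade-off is that per-message analysis would then happen at the end of each interval rather than as each message arrives, unless it is done inside the helper.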
