Python - Consume Kafka continuously and update queue at specific intervals using multiprocessing
I am trying to continuously consume events from Kafka. The same application also uses the consumed data to perform some analysis and update a database at n-second intervals (assume n = 60 seconds).
Within the application, process1 is the Kafka consumer and process2 is the data analysis and database update logic.
process1 should run continuously, while process2 should be executed once every n = 60 seconds.
process2 is concerned with computation and a database update, and hence will take 5-10 seconds to execute. I do not want process1 to stall while process2 is executing. Hence, I am using the multiprocessing module to achieve concurrency in this case. (process1 and process2 would be thread1 and thread2 if I were using the threading module, but from what I have read about the GIL and the threading module not being able to leverage a multi-core architecture, I decided to go with the multiprocessing module. If my understanding of the GIL or of the threading module's limitations is incorrect, my apologies, and please feel free to correct me.)
The application has a fairly simple interaction between the 2 processes: process1 fills a queue with all the messages it receives in 60 seconds and, at the end of the 60 seconds, transfers all of those messages to process2.
I am having trouble with this transfer logic. How do I transfer the contents of the queue from process1 to process2 at the end of 60 seconds, and subsequently clear the queue contents so that it starts again on the next iteration? (I guess process2 would be either the main process or another process. That is another question I have: should I instantiate 2 processes in addition to the main process?)
So far I have the following:
    import sys
    from kafka.client import KafkaClient
    from kafka import SimpleConsumer
    import time
    from multiprocessing import Process,Queue

    def kafka_init():
        client=KafkaClient('kafka1.wpit.nile.works')
        consumer=SimpleConsumer(client, "druidkafkaconsumer", "personalization.targeting.clickstream.prod")
        return consumer

    def consumemessages(q):
        print "thread started"
        while not q.empty():
            try:
                print q.get(True,1)
            except Queue.Empty:
                break
        print "thread ended"

    if __name__=="__main__":
        starttime=time.time()
        timeout=starttime+ 10 #timeout of read in seconds
        consumer=kafka_init()
        q=Queue()
        p=Process(target=consumemessages,args=q)
        while(True):
            q.put(consumer.get_message())
            if time.time()>timeout:
                #transfer logic from process1 to main process here.
                print "start time",starttime
                print "end time",time.time()
                p.start()
                p.join()
                break
Any help is appreciated.
The problem you are dealing with is not Kafka-specific, so I'm going to use generic "messages" that are just ints.
The main problem, it seems to me, is that on the one hand you want to process messages as soon as they are produced, and on the other hand you want to update the database every 60 seconds.
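If you use q.get(), then by default the call blocks until a message is available in the queue. That could take longer than 60 seconds, which would delay the database update for too long. So we can't use a plain blocking q.get. We need to call q.get with a timeout, so that the call never blocks for longer than the timeout and raises queue.Empty if nothing arrives in that time: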
    import time
    import multiprocessing as mp
    import random
    import queue

    def process_messages(q):
        messages = []
        start = time.time()
        while True:
            try:
                # Wait at most 1 second so the interval check below runs regularly.
                message = q.get(timeout=1)
            except queue.Empty:
                pass
            else:
                messages.append(message)
                print('doing data analysis on {}'.format(message))
            end = time.time()
            if end-start > 60:
                print('updating database: {}'.format(messages))
                start = end
                messages = []

    def get_messages(q):
        # Stand-in for the Kafka consumer: emits a random int at random intervals.
        while True:
            time.sleep(random.uniform(0,5))
            message = random.randrange(100)
            q.put(message)

    if __name__ == "__main__":
        q = mp.Queue()

        proc1 = mp.Process(target=get_messages, args=[q])
        proc1.start()

        proc2 = mp.Process(target=process_messages, args=[q])
        proc2.start()

        proc1.join()
        proc2.join()
This produces output such as:
    doing data analysis on 38
    doing data analysis on 8
    doing data analysis on 8
    doing data analysis on 66
    doing data analysis on 37
    updating database: [38, 8, 8, 66, 37]
    doing data analysis on 27
    doing data analysis on 47
    doing data analysis on 57
    updating database: [27, 47, 57]
    doing data analysis on 85
    doing data analysis on 90
    doing data analysis on 86
    doing data analysis on 22
    updating database: [85, 90, 86, 22]
    doing data analysis on 8
    doing data analysis on 92
    doing data analysis on 59
    doing data analysis on 40
    updating database: [8, 92, 59, 40]
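If you would rather collect a whole interval's worth of messages first and only then run the analysis, as described in the question, one possible variation is to bound each q.get by the time remaining in the current interval. The following is only a sketch under assumptions: collect_batch, produce, and the short 1-second interval are hypothetical names and values chosen for illustration, and the ints again stand in for Kafka messages.

```python
import multiprocessing as mp
import queue
import time


def collect_batch(q, interval):
    """Gather messages from q until `interval` seconds have elapsed."""
    deadline = time.monotonic() + interval
    batch = []
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return batch
        try:
            # Wait no longer than the time left in this interval.
            batch.append(q.get(timeout=remaining))
        except queue.Empty:
            # The interval ended while the queue was empty.
            return batch


def produce(q, count, delay):
    """Hypothetical stand-in producer: puts the ints 0..count-1 on the queue."""
    for i in range(count):
        q.put(i)
        time.sleep(delay)


if __name__ == "__main__":
    q = mp.Queue()
    producer = mp.Process(target=produce, args=(q, 20, 0.1))
    producer.start()
    for _ in range(3):
        # A real application would use interval=60 here.
        batch = collect_batch(q, interval=1.0)
        print('updating database: {}'.format(batch))
    producer.join()
```

Because the producer runs in its own process, it keeps filling the queue while the main process analyses a batch, and those messages are simply picked up by the next call to collect_batch. There is no need to clear the queue explicitly, since each q.get removes the message it returns.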