"""Multithreaded RabbitMQ consumer that stores each received message in MongoDB.

Usage: python worker_multithread {number of thread}

Every worker thread opens its own RabbitMQ connection and MongoDB client,
consumes messages from QUEUE_NAME, parses each body as a Python literal and
inserts the result into a MongoDB collection.
"""
import ast
import datetime
import logging
import socket
import sys
import threading
import time
import traceback
from datetime import datetime  # NOTE: rebinds the name ``datetime`` to the class
from time import sleep

import pika
import pymongo

# configuration
MONGODB_NAME = "terrydb"
# The leading ':' is the separator between the time part and the host part
# of the ids built by WorkerThread.genPostId().
HOSTNAME = ':' + socket.gethostname()
QUEUE_NAME = 'hello'
MONGODB_URL = 'mongodb://localhost'
RABBITMQ_URL = 'localhost'
LOG_FORMAT = ('[%(levelname)s] %(asctime)s %(name)s : %(message)s')
LOGGER = logging.getLogger(__name__)


class WorkerThread(threading.Thread):
    """Worker that consumes QUEUE_NAME and writes each message to MongoDB.

    The RabbitMQ connection/channel and the MongoDB client are created in
    run(), i.e. once per thread.
    """

    def __init__(self, threadID, name, counter):
        """Store the identifying values; no connection is opened here.

        Args:
            threadID: numeric id given by the caller.
            name: thread name, also used as the prefix of log messages.
            counter: value given by the caller; only stored by this class.
        """
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter

    def initRabbitMQ(self):
        """Open a blocking RabbitMQ connection, create a channel and declare the queue."""
        self.q_conn = pika.BlockingConnection(
            pika.ConnectionParameters(host=RABBITMQ_URL))
        self.q_channel = self.q_conn.channel()
        self.q_channel.queue_declare(queue=QUEUE_NAME)  # create queue

    def initMongoDB(self):
        """Create the MongoDB client and select the MONGODB_NAME database."""
        self.mongo_conn = pymongo.MongoClient(MONGODB_URL)
        self.mongo_db = self.mongo_conn[MONGODB_NAME]

    def onMessage(self, ch, method, properties, body):
        """Consumer callback: parse ``body`` and store it in MongoDB.

        ``body`` is expected to be the text of a Python dict literal.
        Failures are reported on stdout and never propagate to the consuming
        loop. Only ``body`` is used; ``ch``, ``method`` and ``properties``
        are accepted because the callback is invoked with them.
        """
        try:
            # On Python 3 the payload may arrive as bytes; on Python 2
            # ``bytes is str`` so this branch is skipped and nothing changes.
            if isinstance(body, bytes) and not isinstance(body, str):
                body = body.decode('utf-8')
            LOGGER.info("%s recevied %s", self.name, body)
            json_dict = ast.literal_eval(body)  # convert string to dictionary
            # TODO: needs to be fixed - specify the real board name here.
            self.writeToMongoDB('MYBOARD', json_dict)
        except (ValueError, SyntaxError):
            # literal_eval raises SyntaxError for malformed text and
            # ValueError for text that is valid syntax but not a literal.
            print('String parsing error')
        except Exception:
            print('unknown error')
            traceback.print_exc(file=sys.stdout)

    def writeToMongoDB(self, boardname, post):
        """Insert ``post`` into the collection named ``boardname``.

        ``post`` is modified in place: its ``_id`` key is set to a freshly
        generated post id. Insert failures are logged and printed, not raised.
        """
        # get board name
        collection = self.mongo_db[boardname]
        # generate unique id for the posting
        post['_id'] = self.genPostId()
        try:
            collection.insert(post)
        except Exception:
            LOGGER.error(" mongodb insert fail" + str(sys.exc_info()[0]))
            traceback.print_exc(file=sys.stdout)

    def genPostId(self):
        """Return a post id of the form YYMM{microseconds since the 1st}:{hostname}.

        The elapsed time counts days, hours, minutes, seconds and
        microseconds since 00:00 on the first day of the current month, so
        ids keep increasing throughout the month.
        """
        time.sleep(0.001)  # intentionally sleep to reduce key duplication
        now = datetime.now()
        month_start = now.replace(day=1, hour=0, minute=0, second=0,
                                  microsecond=0)
        elapsed = now - month_start
        micros = ((elapsed.days * 24 * 60 * 60 + elapsed.seconds) * 1000000
                  + elapsed.microseconds)
        # HOSTNAME already starts with ':'.
        return '%s%d%s' % (now.strftime('%y%m'), micros, HOSTNAME)

    def run(self):
        """Thread body: connect to both services, then consume until stopped."""
        LOGGER.info(str(self.name) + " has been started")
        self.initRabbitMQ()
        self.initMongoDB()
        # NOTE(review): no_ack=True presumably means the broker does not wait
        # for acknowledgements, so a message whose insert fails would not be
        # redelivered - confirm against the installed pika version.
        self.q_channel.basic_consume(self.onMessage,
                                     queue=QUEUE_NAME,
                                     no_ack=True)
        self.q_channel.start_consuming()


def print_usage():
    """Print the command-line usage and terminate with a non-zero status."""
    print('usage : python worker_multithread {number of thread}')
    sys.exit(1)


def main(argv):
    """Start the number of worker threads given as ``argv[1]``.

    Prints the usage and exits when the argument is missing or is not a
    string of digits.
    """
    if len(argv) < 2 or not argv[1].isdigit():
        print_usage()

    max_thread = int(argv[1])
    logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
    LOGGER.info('Create ' + str(max_thread) + ' threads ')

    threadList = []
    for i in range(max_thread):
        t_name = "WorkerThread-" + str(i)
        t = WorkerThread(i, t_name, i)
        t.start()
        threadList.append(t)


if __name__ == '__main__':
    main(sys.argv)
그리드형
'클라우드 컴퓨팅 & NoSQL > MongoDB' 카테고리의 다른 글
mongodb locking (0) | 2014.01.02 |
---|---|
Python을 이용한 간단한 mongodb insert 예제 (1) | 2013.05.04 |
MongoDB의 Physical 데이타 저장 구조 (4) | 2013.05.03 |
MongoDB 30분만에 이해하기.. (설치,테스트 및 자바 샘플) (3) | 2013.04.30 |
MongoDB vs Cassandra Performance (3) | 2012.03.05 |