Python操作Kafka爬坑

组内做大数据,需要kafka写入数据,最近在看python正好,练练手,网上找了一圈,都是用的pykafka,经过一整圈的安装,最终搞定,代码如下
#coding:u8
import sys
import time
import random
import datetime
import MySQLdb
import codecs
from pykafka import KafkaClient
import logging
import json
import threading
'''
******************
'''
ad=[]
try:
  ini=file("set.txt")
  ad=ini.readline().splitlines()
  ini.close
except Exception as e:
  print "open settings file Error:",type(e)
  ad=["192.168.1.121:9092"]
print "open ini file"
try:
  client = KafkaClient(hosts = ad[0])
  print "Topics:",client.topics
  topic  = client.topics["mytopic"]
except Exception as e:
  print "Opening kafka Error:%s" %(type(e))
  sys.exit(1)
print "before threading"


try:
  with tp.get_sync_producer() as producer:
    producer.produce(str(dct2))
 except Exception as e:
    print "Error:" ,type(e)


  print "ini consumer"
  while 1==1:
    print "nn",type(consumer)
    for message in consumer:
      print "mm"
      if message is not None:
        print message.offset, message.value
except Exception as e:
  print e,type(e)


运行结果,可以列出topic,写入的数据也没有报错信息。但是,消费者取不到数据,无论是kafka直接取,还是python写消费者代码。
后来采用了 kafkapython 正常,代码如下:


#coding:utf-8
import sys
import time
import random
import datetime
import codecs
import kafka.kafkaProducer
import logging
import json
import threading


ad=[]
try:
  ini=file("set.txt")
  ad=ini.readline().splitlines()
  ini.close
except Exception as e:
  ad=["192.168.1.121:9092,192.168.1.122.9092"]
  #print "open settings file Error:%d,%s" %(e.args[0],e.args[1])
  print "Opening settings file Error:",e,type(e)
print "Opened ini file"
'''
try:
  client = KafkaClient(hosts = ad[0])
  print "Topics:",client.topics
  topic  = client.topics["mytopic"]
except Exception as e:
  print "Opening kafka Error:%s" %(e.args[0])
  sys.exit(1)
print "before threading"
'''
try:
  producer = KafkaProducer(bootstrap_servers=ad[0], value_serializer=lambda m: json.dumps(m).encode('utf-8'))
except Exception as e:
  print "Opening kafka Error:",e,type(e)
  sys.exit(1)




print "before threading"




threads=[]
for i in range(0,12):
  try:
    threads.append(threading.Thread(target=tf,args=(producer,i)))
    threads[i].start()
  except Exception as e:
    print "Treand error at Thread:%d:%s,%s" %(i,e,type(e))




print "main thread is ended"

代码均有所节略。



本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/440175.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Apollo自动驾驶入门课程第⑩讲 — 控制(下)

目录 1. 线性二次调节器 2. 模型控制预测 3. 总结 本文转自微信公众号:Apollo开发者社区 原创: 阿波君 Apollo开发者社区 昨天 Apollo自动驾驶课程马上进入尾声,在无人驾驶技术控制篇(上)中,具体讲解了最…

*【ZOJ - 3703】Happy Programming Contest(带优先级的01背包)

题干: In Zhejiang University Programming Contest, a team is called "couple team" if it consists of only two students loving each other. In the contest, the team will get a lovely balloon with unique color for each problem they solved.…

图解算法学习笔记(四):快速排序

目录 1) 示例1: 2)快速排序 3) 再谈大O表示法 4)小结 本章内容:学习分而治之,快速排序 1) 示例1: 假设你是农场主,有一小块土地,你要将这块地均匀分成方…

ThriftParserError: ThriftPy does not support generating module with path in protocol 'd'

使用python连接hive,在 from impala.dbapi import connect 语句报如下错误: ThriftParserError: ThriftPy does not support generating module with path in protocol d 定位到 D:\Anaconda3\Lib\site-packages\thriftpy\parser\parser.py的 if u…

【HDU - 5468】Puzzled Elena(容斥原理,dfs序,数学,素因子分解,有坑)

题干: Problem Description Since both Stefan and Damon fell in love with Elena, and it was really difficult for her to choose. Bonnie, her best friend, suggested her to throw a question to them, and she would choose the one who can solve it.Suppo…

图解算法学习笔记(五):散列表

目录 1)示例1: 2)散列函数 3)应用案例 4)冲突 5)性能 6)小结 本章内容: 学习散列表,最有用的数据结构之一。 学习散列表的内部机制:实现、冲突和散列函…

Ros 消息结构1

1、ROS的消息头信息 #Standard metadata for higher-level flow data types #sequence ID: consecutively increasing ID uint32 seq#Two-integer timestamp that is expressed as: # * stamp.secs: seconds (stamp_secs) since epoch # * stamp.nsecs: nanoseconds since sta…

【HDU - 5475】An easy problem(线段树,思维)

题干: One day, a useless calculator was being built by Kuros. Lets assume that number X is showed on the screen of calculator. At first, X 1. This calculator only supports two types of operation. 1. multiply X with a number. 2. divide X with…

图解算法学习笔记(六):广度优先搜索

目录 1)图简介 2)图是什么 3)广度优先搜索 4)实现图 5)实现算法 6)小结 本章内容; 学习使用新的数据结构图来建立网络模型; 学习广度优先搜索; 学习有向图和无向图…

图解算法学习笔记(七):狄克斯特拉算法

目录 1)使用狄克斯特拉算法 2)术语 3)实现 4)小结 本章内容; 介绍加权图,提高或降低某些边的权重; 介绍狄克斯特拉算法,找出加权图中前往X的最短路径; 介绍图中的环…

【HDU - 5477】A Sweet Journey(思维,水题)

题干: Master Di plans to take his girlfriend for a travel by bike. Their journey, which can be seen as a line segment of length L, is a road of swamps and flats. In the swamp, it takes A point strengths per meter for Master Di to ride; In the f…

[转载]Bluetooth协议栈学习之SDP

原文地址:Bluetooth协议栈学习之SDP作者:BigSam78作者: Sam (甄峰) sam_codehotmail.com SDP(service discovery protocol:服务发现协议)提供了一个方法,让应用程序检测哪些服务是可用的并探测这…

图解算法学习笔记(八):贪婪算法

目录 (1)背包问题 (2)集合覆盖问题 (3)NP完全问题 (4)小结 本章内容: 学习如何处理没有快速算法的问题(NP完全问题)。学习近似算法&#xff…

【CodeForces - 1152C 】Neko does Maths(数学数论,lcm,gcd性质)

题干&#xff1a; 给出a,b<1e9&#xff0c;你要找到最小的k使得lcm(ak,bk)尽可能小&#xff0c;如果有多个k给出同样的最小公倍数&#xff0c;输出最小的一个k。 解题报告&#xff1a; 因为题目中k太多了&#xff0c;先化简一下公式&#xff0c;假设a>b &#xff0c;则…

vs visual studio 2015安装后的几个问题

前言 最近在win7下重新安装了visual studio 2015&#xff0c;没有安装在默认路径下&#xff0c;编译时出现不少问题&#xff0c;整理如下 1.Failed to locate: "CL.exe". The system cannot find the file specified. TRACKER : error TRK0005: Failed to locate: “…

图解算法学习笔记(九):动态规划

目录 &#xff08;1&#xff09;背包问题 &#xff08;2&#xff09;最长公共子串 &#xff08;3&#xff09;小结 本章内容&#xff1a; 学习动态规划&#xff0c;它将问题分成小问题&#xff0c;并先着手解决这些小问题。学习如何设计问题的动态规划解决方案。 &#xff…

Java(win10安装jdk,第一个hello world)

Java 第一步 &#xff1a;安装jdk 推荐默认安装。&#xff08;安装到C盘&#xff09;第二步 &#xff1a;配置jdk环境 JAVA_HOME C:\Program Files\Java\jdk1.8.0_191 JDK的安装路径 Path&#xff1a; C:\Program Files\Java\jdk1.8.0_191\bin JDK下bin目录的路径 &#xf…

【POJ - 3177】Redundant Paths(边双连通分量,去重边)

题干&#xff1a; In order to get from one of the F (1 < F < 5,000) grazing fields (which are numbered 1..F) to another field, Bessie and the rest of the herd are forced to cross near the Tree of Rotten Apples. The cows are now tired of often being f…

#pragma 详解

#pragma 求助编辑 pragma - 必应词典美[prɡmə]英[prɡmə]n.〔计〕杂注网络编译指示&#xff1b;显示编译指示&#xff1b;特殊指令 百科名片 在所有的预处理指令中&#xff0c;#Pragma 指令可能是最复杂的了&#xff0c;它的作用是设定编译器的状态或者是指示编译器完成一些…

1.绪论

目录 &#xff08;1&#xff09;C语言传值与传地址变量 &#xff08;2&#xff09;算法效率的度量 &#xff08;3&#xff09;基本操作 &#xff08;4&#xff09;主函数 主要由实现基本操作和算法的程序构成。这些程序有6类&#xff1a; 数据存储结构&#xff0c;文件名第…