- 基础
- 查看帮助
- 调试
- pip模块安装
- pip官方安装脚本
- 加载环境变量
- 变量
- 打印
- 字符串 s 整数 d 浮点 f 原样打印 r
- 列表
- 列表元素的个数最多 536870912
- 元组
- 不可变
- 字典
-
- enumerate 使用函数得到索引值和对应值
- tab补全
- history file
-
- 函数
- 模块
- 使用模块中方法
- 类对象的方法
- 实例化初始化的方法
- 有self此函数为方法
- 对象消逝的时候被调用
- 实例化对象
- 使用对象方法
- 继承
- 模块导入使用
- 文件处理
- 模式 读r 写清空整个文件w 追加文件需要存在a 读写r 二进制文件b rbwbrb
- 写文件
- 读文件
- 模式 读r 写清空整个文件w 追加文件需要存在a 读写r 二进制文件b rbwbrb
- 内建函数
- 列表类型内建函数
- 序列类型操作符
- 序列类型相关的模块
- 字典内建方法
- usrbinpython
- 文件对象的属性
- 内建异常
- 触发异常
- 抓取全部错误信息存如字典
- 调试log
- 函数式编程的内建函数
- 编码转换
- 遍历递归
- 常用模块
- 文件处理
- 目录文件
- 文件描述符操作
- ospath模块
- 查询
- 相关模块
- 子进程
- 随机模块
- 发送邮件内容
- usrbinpython
- encodingutf8
- 导入 smtplib 和 MIMEText
- 定义发送列表
- 设置服务器名称用户名密码以及邮件后缀
- 发送邮件函数
- usrbinpython
- encodingutf8
- 设置服务器名称用户名密码以及邮件后缀
- tarextracttmp 全部解压到指定路径
- fwritefile1txt 将文件写入压缩包
- fextractall 解压全部
- encodingutf8
- python aaapy -t file -p etcopt -o aaaaa
- usrbinpython
- ssh
- usrbinevn python
- 使用key把 passwordpassword 换成 pkeymykey
- usrbinpython
- ssh
- mykeyparamikoDSSKeyfrom_private_key_fileprivatekeyfilepassword061128 DSSKey方式 password是key的密码
- usrbinenv python
- encodingutf8
- ssh_concurrentpy
- usrbinpython
- encodingutf8
- config file iplist
- 配置文件IP列表
- 设置服务器名称用户名密码以及邮件后缀
- 执行总计
- 读取配置文件
- 并发执行
- 判断配置文件中注释行跳过
- 取变量其中任意变量未取到就跳过执行
- user_namehost_infosplit1
- user_pwdhost_infosplit2
- try
- portinthost_infosplit3
- except
- port22
- usrbinpython
- encodingutf8
- LzayManagepy
- config file serverlistconf
- 读取配置文件
- 执行总计
- 并发执行
- 判断配置文件中注释行跳过
- 取变量其中任意变量未取到就跳过执行
- user_namehost_infosplit1
- user_pwdhost_infosplit2
- 打印执行结果
- 定义root账号信息
- 配置文件
- usrbinpython
- 注意IP 端口 组默认public oid值
- usrbinpython
- serverpy
- usrbinpython
- clientpy
- usrbinpython
- ftpserverpy
- usrbinpython
- manage_ftppy
- usrbinpython
- ftpclientpy
- usrbinenv python
- apt-get install mysql-server
- apt-get install python-MySQLdb
- connMySQLdbconnectunix_socketvarrunmysqldmysqldsockuserrootpasswd123456 使用socket文件链接
- encodingutf8
- encodingutf8
- encodingutf8
- encodingutf8
- db和collection都是延时创建的在添加Document时才真正创建
- pykestrel
- 读取一组工作
- 流程工作
- kestrel支持memcache协议客户端
- usrlocalbinpython
- 1381125 22133 10000
- pip install tarantool-queue
- date responsereadlines 返回页面所有信息列表
- usrbinenv python
- encodingutf8
- usrbinenv python
- encodingutf8
- data urlliburlencodenamexuesongid30 urllib 的处理参数的方法可以再urllib2中使用
- 百度带Cookie后查看自己的用户
- for i in htmlsplitn
- if bdscommuser in i
- print i
- usrbinenv python
- -- codingutf-8 --
- AppleWebKit53711 KHTML like Gecko Chrome230127164 Safari53711
- Requests是一个Python的HTTP客户端库
- 官方中文文档 httpcnpython-requestsorgzh_CNlatestuserquickstarthtmlid2
- 安装 sudo pip install requests
- get方法提交表单
- get方法带参数 httphttpbinorggetkeyval
- post方法提交表单
- 定制请求头post请求
- https 需登录加auth
- setup any auth-related data here
- modify and return the request
- http httpuserpass10101103128 用户名密码
- 也可以设置环境变量之间访问
- BeautifulSoup中文官方文档
- httpwwwcrummycomsoftwareBeautifulSoupbs3documentationzhhtml
- httpwwwcrummycomsoftwareBeautifulSoupbs4docindexzhhtml
- Beautiful Soup将复杂HTML文档转换成一个复杂的树形结构每个节点都是Python对象所有对象可以归纳为4种 Tag NavigableString BeautifulSoup Comment
- find_all 后循环的值是 Tag 不是字符串 不能直接截取
- usrbinpython
- json file tempjson
- name00_sample_case1 descriptionan example
- encodingutf8
- 查找form标签中的action提交地址
- 获得一个cookieJar实例
- cookieJar作为参数获得一个opener的实例
- 伪装成一个正常的浏览器避免有些web服务器拒绝访问
- 生成Post数据含有登陆用户名密码所有表单内的input中name值
- 以post的方法访问登陆页面访问之后cookieJar会自定保存cookie
- 以带cookie的方式访问页面
- 读取页面源码
- 将图片写到本地
- file1dpng wbwritedata
- 将图片读入内存
- usrbinenv python
- encodingutf-8
- usrbinenv python
- encodingutf-8
- sudo pip install BeautifulSoup
- print linkgettitle linkgethref
- 很遗憾反垃圾邮件联盟改版后加了验证码
- usrbinenv python
- encodingutf-8
- usrbinenv python
- bs4安装 pip install BeautifulSoup4
- 线程安全竞争条件锁死锁检测同步异步阻塞非阻塞epoll非阻塞IO信号量事件线程池生产消费模型伪并发微线程协程
- Stackless Python 是Python编程语言的一个增强版本它使程序员从基于线程的编程方式中获得好处并避免传统线程所带来的性能与复杂度问题Stackless为 Python带来的微线程扩展是一种低开销轻量级的便利工具
- usrbinenv python
- thread_testpy
- 不支持守护进程
- codingutf-8
- os_exit 会把整个进程关闭
- 默认主线程在退出时会等待所有子线程的结束如果希望主线程不等待子线程而是在退出时自动结束所有的子线程就需要设置子线程为后台线程daemon
- usrbinenv python
- encodingutf8
- usrbinenv python
- encodingutf8
- 启动线程后自动执行 run函数其他不可以
- usrbinenv python
- codingutf-8
- 获取ip 及其出现次数
- 目标函数
- 多线程
- 控制多线程
- usrbinpython
- Create new threads
- Fill the queue
- Wait for queue to empty
- Notify threads its time to exit
- Wait for all threads to complete
- usrbinenv python
- encodingutf8
- 加锁使某一时刻只有一个进程 print
- 通过使用Value或者Array把数据存储在一个共享的内存表中
- d和i参数是num和arr用来设置类型d表示一个双精浮点类型i表示一个带符号的整型
- 比共享内存灵活但缓慢
- 支持listdictNamespaceLockSemaphoreBoundedSemaphoreConditionEventQueuealueArray
- httpdormouseholereadthedocsorgenlatest
- html放在 templates js放在 static
- 用来进行网络服务和应用程序的编程虽然 Twisted Matrix 中有大量松散耦合的模块化组件但该框架的中心概念还是非阻塞异步服务器这一思想对于习惯于线程技术或分叉服务器的开发人员来说这是一种新颖的编程风格但它却能在繁重负载的情况下带来极高的效率
- 更加原始的微线程的概念没有调度或者叫做协程这在你需要控制你的代码时很有用你可以自己构造微线程的 调度器也可以使用greenlet实现高级的控制流例如可以重新创建构造器不同于Python的构造器我们的构造器可以嵌套的调用函数而被嵌套的函数也可以 yield 一个值
- 高可伸缩性和epoll非阻塞IO响应快速可处理数千并发连接特别适用用于实时的Web服务
- httpwwwtornadowebcndocumentation
- Python开发的一个快速高层次的屏幕抓取和web抓取框架用于抓取web站点并从页面中提取结构化的数据Scrapy用途广泛可以用于数据挖掘监测和自动化测试
- 将函数结果作为列表可用于循环
- usrbinpython
- 1-70的最小公倍数
- python 2fpy 123456789 4
- list123456789 1 2 3 4 5 6 7 8 9
- usrbinenv python
- JPEG 440 330 RGB
- -- coding cp936 --
- 图片批处理
- 输出路径
- 判断opfile是否存在不存在则创建
- im_ss imresize400400
- im_ss imconvertP
- 取磁盘使用空间
- if j 2
- usrbinpython
- httpwwwadmin10000comdocument2506html
- usrbinpython
- 打印出独立IP并统计独立IP数
- usrbinpython
1 基础
查看帮助
import os
for i in dir(os):
print i # 模块的方法
help(os.path) # 方法的帮助
调试
python -m trace -t aaaaaa.py
pip模块安装
yum install python-pip # centos安装pip
sudo apt-get install python-pip # ubuntu安装pip
pip官方安装脚本
wget https://raw.github.com/pypa/pip/master/contrib/get-pip.py
python get-pip.py
加载环境变量
vim /etc/profile
export PATH=/usr/local/python27/bin:$PATH
. /etc/profile
pip install Package # 安装包 pip install requests
pip show --files Package # 查看安装包时安装了哪些文件
pip show --files Package # 查看哪些包有更新
pip install --upgrade Package # 更新一个软件包
pip uninstall Package # 卸载软件包
变量
r=r’\n’ # 输出时原型打印
u=u’中文’ # 定义为unicode编码
global x # 全局变量
a = 0 or 2 or 1 # 布尔运算赋值,a值为True既不处理后面,a值为2. None、字符串”、空元组()、空列表[],空字典{}、0、空字符串都是false
name = raw_input(“input:”).strip() # 输入字符串变量
num = int(raw_input(“input:”).strip()) # 输入字符串str转为int型
locals() # 所有局部变量组成的字典
locals().values() # 所有局部变量值的列表
os.popen(“date -d @{0} +’%Y-%m-%d %H:%M:%S’”.format(12)).read() # 特殊情况引用变量 {0} 代表第一个参数
打印
字符串 %s 整数 %d 浮点 %f 原样打印 %r
print ‘字符串: %s 整数: %d 浮点: %f 原样打印: %r’ % (‘aa’,2,1.0,’r’)
print ‘abc’, # 有逗号,代表不换行打印,在次打印会接着本行打印
列表
列表元素的个数最多 536870912
shoplist = ['apple', 'mango', 'carrot', 'banana']
shoplist[2] = 'aa'
del shoplist[0]
shoplist.insert('4','www')
shoplist.append('aaa')
shoplist[::-1] # 倒着打印 对字符翻转串有效
shoplist[2::3] # 从第二个开始每隔三个打印
shoplist[:-1] # 排除最后一个
'\t'.join(li) # 将列表转换成字符串
sys.path[1:1]=[5] # 在位置1前面插入列表中一个值
list(set(['qwe', 'as', '123', '123'])) # 将列表通过集合去重复
eval("['1','a']") # 将字符串当表达式求值,得到列表
元组
不可变
zoo = (‘wolf’, ‘elephant’, ‘penguin’)
字典
ab = { 'Swaroop' : 'swaroopch@byteofpython.info',
'Larry' : 'larry@wall.org',
}
ab['c'] = 80 # 添加字典元素
del ab['Larry'] # 删除字典元素
ab.keys() # 查看所有键值
ab.values() # 打印所有值
ab.has_key('a') # 查看键值是否存在
ab.items() # 返回整个字典列表
```
### 复制字典
a = {1: {1: 2, 3: 4}}
b = a
b[1][1] = 8888 # a和b都为 {1: {1: 8888, 3: 4}}
import copy
c = copy.deepcopy(a) # 再次赋值 b[1][1] = 9999 拷贝字典为新的字典,互不干扰
a[2] = copy.deepcopy(a[1]) # 复制出第二个key,互不影响 {1: {1: 2, 3: 4},2: {1: 2, 3: 4}}
## 流程结构
### if判断
### 布尔值操作符 and or not 实现多重判断
if a == b:
print ‘==’
elif a < b:
print b
else:
print a
fi
### while循环
while True:
if a == b:
print “==”
break
print “!=”
else:
print ‘over’
count=0
while(count b:
print a
return a
else:
print b
return b
x = 5
y = 7
printMax(x, y)
def update(*args,**kwargs):
p=''
for i,t in kwargs.items():
p = p+ '%s=%s,' %(i,str(t))
sql = "update 'user' set (%s) where (%s)" %(args[0],p)
print sql
update('aaa',uu='uu',id=3)
模块
# Filename: mymodule.py
def sayhi():
print 'mymodule'
version = '0.1'
使用模块中方法
import mymodule
from mymodule import sayhi, version
mymodule.sayhi() # 使用模块中函数方法
类对象的方法
class Person:
实例化初始化的方法
def __init__(self, name ,age):
self.name = name
self.age = age
print self.name
有self此函数为方法
def sayHi(self):
print ‘Hello, my name is’, self.name
对象消逝的时候被调用
def del(self):
print ‘over’
实例化对象
p = Person(‘Swaroop’)
使用对象方法
p.sayHi()
继承
class Teacher(Person):
def __init__(self, name, age, salary):
Person.__init__(self, name, age)
self.salary = salary
print '(Initialized Teacher: %s)' % self.name
def tell(self):
Person.tell(self)
print 'Salary: "%d"' % self.salary
t = Teacher('Mrs. Shrividya', 40, 30000)
执行模块类中的所有方法
# moniItems.py
import sys, time
import inspect
class mon:
def __init__(self, n):
self.name = n
self.data = dict()
def run(self):
print 'hello', self.name
return self.runAllGet()
def getDisk(self):
return 222
def getCpu(self):
return 111
def runAllGet(self):
for fun in inspect.getmembers(self, predicate=inspect.ismethod):
print fun[0], fun[1]
if fun[0][:3] == 'get':
self.data[fun[0][3:]] = fun[1]()
print self.data
return self.data
模块导入使用
from moniItems import mon
m = mon()
m.runAllGet()
文件处理
模式: 读’r’ 写[清空整个文件]’w’ 追加[文件需要存在]’a’ 读写’r+’ 二进制文件’b’ ‘rb’,’wb’,’rb+’
写文件
i={'ddd':'ccc'}
f = file('poem.txt', 'a')
f.write("string")
f.write(str(i))
f.flush()
f.close()
读文件
f = file('/etc/passwd','r')
c = f.read().strip() # 读取为一个大字符串,并去掉最后一个换行符
for i in c.spilt('\n'): # 用换行符切割字符串得到列表循环每行
print i
f.close()
读文件1
f = file('/etc/passwd','r')
while True:
line = f.readline() # 返回一行
if len(line) == 0:
break
x = line.split(":") # 冒号分割定义序列
#x = [ x for x in line.split(":") ] # 冒号分割定义序列
#x = [ x.split("/") for x in line.split(":") ] # 先冒号分割,在/分割 打印x[6][1]
print x[6],"\n",
f.close()
读文件2
f = file('/etc/passwd')
c = f.readlines() # 读入所有文件内容,可反复读取,大文件时占用内存较大
for line in c:
print line.rstrip(),
f.close()
读文件3
for i in open('b.txt'): # 直接读取也可迭代,并有利于大文件读取,但不可反复读取
print i,
追加日志
log = open('/home/peterli/xuesong','a')
print >> log,'faaa'
log.close()
with读文件
with open('a.txt') as f:
for i in f:
print i
print f.read() # 打印所有内容为字符串
print f.readlines() # 打印所有内容按行分割的列表
csv读配置文件
192.168.1.5,web # 配置文件按逗号分割
list = csv.reader(file('a.txt'))
for line in list:
print line # ['192.168.1.5', 'web']
内建函数
dir(sys) # 显示对象的属性
help(sys) # 交互式帮助
int(obj) # 转型为整形
str(obj) # 转为字符串
len(obj) # 返回对象或序列长度
open(file,mode) # 打开文件 #mode (r 读,w 写, a追加)
range(0,3) # 返回一个整形列表
raw_input("str:") # 等待用户输入
type(obj) # 返回对象类型
abs(-22) # 绝对值
random # 随机数
choice() # 随机返回给定序列的一个元素
divmod(x,y) # 函数完成除法运算,返回商和余数。
round(x[,n]) # 函数返回浮点数x的四舍五入值,如给出n值,则代表舍入到小数点后的位数
strip() # 是去掉字符串两端多于空格,该句是去除序列中的所有字串两端多余的空格
del # 删除列表里面的数据
cmp(x,y) # 比较两个对象 #根据比较结果返回一个整数,如果xy,则返回1,如果x==y则返回0
max() # 字符串中最大的字符
min() # 字符串中最小的字符
sorted() # 对序列排序
reversed() # 对序列倒序
enumerate() # 返回索引位置和对应的值
sum() # 总和
list() # 变成列表可用于迭代
eval('3+4') # 将字符串当表达式求值 得到7
exec 'a=100' # 将字符串按python语句执行
exec(a+'=new') # 将变量a的值作为新的变量
tuple() # 变成元组可用于迭代 #一旦初始化便不能更改的数据结构,速度比list快
zip(s,t) # 返回一个合并后的列表 s = ['11','22'] t = ['aa','bb'] [('11', 'aa'), ('22', 'bb')]
isinstance(object,int) # 测试对象类型 int
xrange([lower,]stop[,step]) # 函数与range()类似,但xrnage()并不创建列表,而是返回一个xrange对象
字符串相关模块
string # 字符串操作相关函数和工具
re # 正则表达式
struct # 字符串和二进制之间的转换
c/StringIO # 字符串缓冲对象,操作方法类似于file对象
base64 # Base16\32\64数据编解码
codecs # 解码器注册和基类
crypt # 进行单方面加密
difflib # 找出序列间的不同
hashlib # 多种不同安全哈希算法和信息摘要算法的API
hma # HMAC信息鉴权算法的python实现
md5 # RSA的MD5信息摘要鉴权
rotor # 提供多平台的加解密服务
sha # NIAT的安全哈希算法SHA
stringprep # 提供用于IP协议的Unicode字符串
textwrap # 文本包装和填充
unicodedate # unicode数据库
列表类型内建函数
list.append(obj) # 向列表中添加一个对象obj
list.count(obj) # 返回一个对象obj在列表中出现的次数
list.extend(seq) # 把序列seq的内容添加到列表中
list.index(obj,i=0,j=len(list)) # 返回list[k] == obj 的k值,并且k的范围在i ')
if len(s) < 3:
raise ShortInputException(len(s), 3) # 触发异常
except EOFError:
print '\nWhy did you do an EOF on me?'
except ShortInputException, x: # 捕捉指定错误信息
print 'ShortInputException: %d | %d' % (x.length, x.atleast)
except Exception as err: # 捕捉所有其它错误信息内容
print str(err)
except urllib2.HTTPError as err: # 捕捉外部导入模块的错误
except: # 捕捉所有其它错误 不会看到错误内容
print 'except'
finally: # 无论什么情况都会执行 关闭文件或断开连接等
print 'finally'
else: # 无任何异常 无法和finally同用
print 'No exception was raised.'
不可捕获的异常
NameError: # 尝试访问一个未申明的变量
ZeroDivisionError: # 除数为零
SyntaxErrot: # 解释器语法错误
IndexError: # 请求的索引元素超出序列范围
KeyError: # 请求一个不存在的字典关键字
IOError: # 输入/输出错误
AttributeError: # 尝试访问未知的对象属性
ImportError # 没有模块
IndentationError # 语法缩进错误
KeyboardInterrupt # ctrl+C
SyntaxError # 代码语法错误
ValueError # 值错误
TypeError # 传入对象类型与要求不符合
内建异常
BaseException # 所有异常的基类
SystemExit # python解释器请求退出
KeyboardInterrupt # 用户中断执行
Exception # 常规错误的基类
StopIteration # 迭代器没有更多的值
GeneratorExit # 生成器发生异常来通知退出
StandardError # 所有的内建标准异常的基类
ArithmeticError # 所有数值计算错误的基类
FloatingPointError # 浮点计算错误
OverflowError # 数值运算超出最大限制
AssertionError # 断言语句失败
AttributeError # 对象没有这个属性
EOFError # 没有内建输入,到达EOF标记
EnvironmentError # 操作系统错误的基类
IOError # 输入/输出操作失败
OSError # 操作系统错误
WindowsError # windows系统调用失败
ImportError # 导入模块/对象失败
KeyboardInterrupt # 用户中断执行(通常是ctrl+c)
LookupError # 无效数据查询的基类
IndexError # 序列中没有此索引(index)
KeyError # 映射中没有这个键
MemoryError # 内存溢出错误(对于python解释器不是致命的)
NameError # 未声明/初始化对象(没有属性)
UnboundLocalError # 访问未初始化的本地变量
ReferenceError # 若引用试图访问已经垃圾回收了的对象
RuntimeError # 一般的运行时错误
NotImplementedError # 尚未实现的方法
SyntaxError # python语法错误
IndentationError # 缩进错误
TabError # tab和空格混用
SystemError # 一般的解释器系统错误
TypeError # 对类型无效的操作
ValueError # 传入无效的参数
UnicodeError # Unicode相关的错误
UnicodeDecodeError # Unicode解码时的错误
UnicodeEncodeError # Unicode编码时的错误
UnicodeTranslateError # Unicode转换时错误
Warning # 警告的基类
DeprecationWarning # 关于被弃用的特征的警告
FutureWarning # 关于构造将来语义会有改变的警告
OverflowWarning # 旧的关于自动提升为长整形的警告
PendingDeprecationWarning # 关于特性将会被废弃的警告
RuntimeWarning # 可疑的运行时行为的警告
SyntaxWarning # 可疑的语法的警告
UserWarning # 用户代码生成的警告
触发异常
raise exclass # 触发异常,从exclass生成一个实例(不含任何异常参数)
raise exclass() # 触发异常,但现在不是类;通过函数调用操作符(function calloperator:"()")作用于类名生成一个新的exclass实例,同样也没有异常参数
raise exclass, args # 触发异常,但同时提供的异常参数args,可以是一个参数也可以是元组
raise exclass(args) # 触发异常,同上
raise exclass, args, tb # 触发异常,但提供一个跟踪记录(traceback)对象tb供使用
raise exclass,instance # 通过实例触发异常(通常是exclass的实例)
raise instance # 通过实例触发异常;异常类型是实例的类型:等价于raise instance.__class__, instance
raise string # 触发字符串异常
raise string, srgs # 触发字符串异常,但触发伴随着args
raise string,args,tb # 触发字符串异常,但提供一个跟踪记录(traceback)对象tb供使用
raise # 重新触发前一个异常,如果之前没有异常,触发TypeError
跟踪异常栈
traceback 获取异常相关数据都是通过sys.exc_info()函数得到的
import traceback
import sys
try:
s = raw_input()
print int(s)
except ValueError:
sys.exc_info() 返回值是元组,第一个exc_type是异常的对象类型,exc_value是异常的值,exc_tb是一个traceback对象,对象中包含出错的行数、位置等数据
exc_type, exc_value, exc_tb = sys.exc_info()
print "\n%s \n %s \n %s\n" %(exc_type, exc_value, exc_tb )
traceback.print_exc() # 打印栈跟踪信息
抓取全部错误信息存如字典
import sys, traceback
try:
s = raw_input()
int(s)
except:
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback_details = {
'filename': exc_traceback.tb_frame.f_code.co_filename,
'lineno' : exc_traceback.tb_lineno,
'name' : exc_traceback.tb_frame.f_code.co_name,
'type' : exc_type.__name__,
'message' : exc_value.message,
}
del(exc_type, exc_value, exc_traceback)
print traceback_details
f = file('test1.txt', 'a')
f.write("%s %s %s %s %s\n" %(traceback_details['filename'],traceback_details['lineno'],traceback_details['name'],traceback_details['type'],traceback_details['message'], ))
f.flush()
f.close()
调试log
cgitb覆盖了默认sys.excepthook全局异常拦截器
def func(a, b):
return a / b
if __name__ == '__main__':
import cgitb
cgitb.enable(format='text')
func(1, 0)
函数式编程的内建函数
apply(func[,nkw][,kw]) # 用可选的参数来调用func,nkw为非关键字参数,kw为关键字参数;返回值是函数调用的返回值
filter(func,seq) # 调用一个布尔函数func来迭代遍历每个seq中的元素;返回一个使func返回值为true的元素的序列
map(func,seq1[,seq2]) # 将函数func作用于给定序列(s)的每个元素,并用一个列表来提供返回值;如果func为None,func表现为一个身份函数,返回一个含有每个序列中元素集合的n个元组的列表
reduce(func,seq[,init]) # 将二元函数作用于seq序列的元素,每次携带一堆(先前的结果以及下一个序列元素),连续地将现有的结果和下一个值作用在获得的随后的结果上,最后减少我们的序列为一个单一的返回值;如果初始值init给定,第一个比较会是init和第一个序列元素而不是序列的头两个元素
# filter 即通过函数方法只保留结果为真的值组成列表
def f(x): return x % 2 != 0 and x % 3 != 0
f(3) # 函数结果是False 3被filter抛弃
f(5) # 函数结果是True 5被加入filter最后的列表结果
filter(f, range(2, 25))
[5, 7, 11, 13, 17, 19, 23]
# map 通过函数对列表进行处理得到新的列表
def cube(x): return x*x*x
map(cube, range(1, 11))
[1, 8, 27, 64, 125, 216, 343, 512, 729, 1000]
# reduce 通过函数会先接收初始值和序列的第一个元素,然后是返回值和下一个元素,依此类推
def add(x,y): return x+y
reduce(add, range(1, 11)) # 结果55 是1到10的和 x的值是上一次函数返回的结果,y是列表中循环的值
re正则
compile(pattern,flags=0) # 对正则表达式模式pattern进行编译,flags是可选标识符,并返回一个regex对象
match(pattern,string,flags=0) # 尝试用正则表达式模式pattern匹配字符串string,flags是可选标识符,如果匹配成功,则返回一个匹配对象;否则返回None
search(pattern,string,flags=0) # 在字符串string中搜索正则表达式模式pattern的第一次出现,flags是可选标识符,如果匹配成功,则返回一个匹配对象;否则返回None
findall(pattern,string[,flags]) # 在字符串string中搜索正则表达式模式pattern的所有(非重复)出现:返回一个匹配对象的列表 # pattern=u'\u4e2d\u6587' 代表UNICODE
finditer(pattern,string[,flags]) # 和findall()相同,但返回的不是列表而是迭代器;对于每个匹配,该迭代器返回一个匹配对象
split(pattern,string,max=0) # 根据正则表达式pattern中的分隔符把字符string分割为一个列表,返回成功匹配的列表,最多分割max次(默认所有)
sub(pattern,repl,string,max=0) # 把字符串string中所有匹配正则表达式pattern的地方替换成字符串repl,如果max的值没有给出,则对所有匹配的地方进行替换(subn()会返回一个表示替换次数的数值)
group(num=0) # 返回全部匹配对象(或指定编号是num的子组)
groups() # 返回一个包含全部匹配的子组的元组(如果没匹配成功,返回一个空元组)
例子
re.findall(r'a[be]c','123abc456eaec789') # 返回匹配对象列表 ['abc', 'aec']
re.findall("(.)12[34](..)",a) # 取出匹配括号中内容 a='qedqwe123dsf'
re.search("(.)123",a ).group(1) # 搜索匹配的取第1个标签
re.match("^(1|2) *(.*) *abc$", str).group(2) # 取第二个标签
re.match("^(1|2) *(.*) *abc$", str).groups() # 取所有标签
re.sub('[abc]','A','alex') # 替换
for i in re.finditer(r'\d+',s): # 迭代
print i.group(),i.span() #
搜索网页中UNICODE格式的中文
QueryAdd='http://www.anti-spam.org.cn/Rbl/Query/Result'
Ip='222.129.184.52'
s = requests.post(url=QueryAdd, data={'IP':Ip})
re.findall(u'\u4e2d\u56fd', s.text, re.S)
编码转换
a='中文' # 编码未定义按输入终端utf8或gbk
u=u'中文' # 定义为unicode编码 u值为 u'\u4e2d\u6587'
u.encode('utf8') # 转为utf8格式 u值为 '\xe4\xb8\xad\xe6\x96\x87'
print u # 结果显示 中文
print u.encode('utf8') # 转为utf8格式,当显示终端编码为utf8 结果显示 中文 编码不一致则乱码
print u.encode('gbk') # 当前终端为utf8 故乱码
ord('4') # 字符转ASCII码
chr(52) # ASCII码转字符
遍历递归
[os.path.join(x[0],y) for x in os.walk('/root/python/5') for y in x[2]]
for i in os.walk('/root/python/5/work/server'):
print i
2 常用模块
sys
sys.argv # 取参数列表
sys.exit(2) # 退出脚本返回状态 会被try截取
sys.exc_info() # 获取当前正在处理的异常类
sys.version # 获取Python解释程序的版本信息
sys.maxint # 最大的Int值 9223372036854775807
sys.maxunicode # 最大的Unicode值
sys.modules # 返回系统导入的模块字段,key是模块名,value是模块
sys.path # 返回模块的搜索路径,初始化时使用PYTHONPATH环境变量的值
sys.platform # 返回操作系统平台名称
sys.stdout # 标准输出
sys.stdin # 标准输入
sys.stderr # 错误输出
sys.exec_prefix # 返回平台独立的python文件安装的位置
sys.stdin.readline() # 从标准输入读一行
sys.stdout.write("a") # 屏幕输出a
os
# 相对sys模块 os模块更为底层 os._exit() try无法抓取
os.popen('id').read() # 执行系统命令得到返回结果
os.system() # 得到返回状态 返回无法截取
os.name # 返回系统平台 Linux/Unix用户是'posix'
os.getenv() # 读取环境变量
os.putenv() # 设置环境变量
os.getcwd() # 当前工作路径
os.chdir() # 改变当前工作目录
os.walk('/root/') # 递归路径
文件处理
mkfifo()/mknod() # 创建命名管道/创建文件系统节点
remove()/unlink() # 删除文件
rename()/renames() # 重命名文件
*stat() # 返回文件信息
symlink() # 创建符号链接
utime() # 更新时间戳
tmpfile() # 创建并打开('w+b')一个新的临时文件
walk() # 遍历目录树下的所有文件名
目录/文件
chdir()/fchdir() # 改变当前工作目录/通过一个文件描述符改变当前工作目录
chroot() # 改变当前进程的根目录
listdir() # 列出指定目录的文件
getcwd()/getcwdu() # 返回当前工作目录/功能相同,但返回一个unicode对象
mkdir()/makedirs() # 创建目录/创建多层目录
rmdir()/removedirs() # 删除目录/删除多层目录
```
### 访问/权限
saccess() # 检验权限模式
chmod() # 改变权限模式
chown()/lchown() # 改变owner和groupID功能相同,但不会跟踪链接
umask() # 设置默认权限模式
“`
文件描述符操作
open() # 底层的操作系统open(对于稳健,使用标准的内建open()函数)
read()/write() # 根据文件描述符读取/写入数据 按大小读取文件部分内容
dup()/dup2() # 复制文件描述符号/功能相同,但是复制到另一个文件描述符
```
### 设备号
makedev() # 从major和minor设备号创建一个原始设备号
major()/minor() # 从原始设备号获得major/minor设备号
“`
os.path模块
os.path.expanduser('~/.ssh/key') # 家目录下文件的全路径
分隔
os.path.basename() # 去掉目录路径,返回文件名
os.path.dirname() # 去掉文件名,返回目录路径
os.path.join() # 将分离的各部分组合成一个路径名
os.path.spllt() # 返回(dirname(),basename())元组
os.path.splitdrive() # 返回(drivename,pathname)元组
os.path.splitext() # 返回(filename,extension)元组
```
### 信息
os.path.getatime() # 返回最近访问时间
os.path.getctime() # 返回文件创建时间
os.path.getmtime() # 返回最近文件修改时间
os.path.getsize() # 返回文件大小(字节)
“`
查询
os.path.exists() # 指定路径(文件或目录)是否存在
os.path.isabs() # 指定路径是否为绝对路径
os.path.isdir() # 指定路径是否存在且为一个目录
os.path.isfile() # 指定路径是否存在且为一个文件
os.path.islink() # 指定路径是否存在且为一个符号链接
os.path.ismount() # 指定路径是否存在且为一个挂载点
os.path.samefile() # 两个路径名是否指向同一个文件
相关模块
base64 # 提供二进制字符串和文本字符串间的编码/解码操作
binascii # 提供二进制和ASCII编码的二进制字符串间的编码/解码操作
bz2 # 访问BZ2格式的压缩文件
csv # 访问csv文件(逗号分隔文件)
csv.reader(open(file))
filecmp # 用于比较目录和文件
fileinput # 提供多个文本文件的行迭代器
getopt/optparse # 提供了命令行参数的解析/处理
glob/fnmatch # 提供unix样式的通配符匹配的功能
gzip/zlib # 读写GNU zip(gzip)文件(压缩需要zlib模块)
shutil # 提供高级文件访问功能
c/StringIO # 对字符串对象提供类文件接口
tarfile # 读写TAR归档文件,支持压缩文件
tempfile # 创建一个临时文件
uu # uu格式的编码和解码
zipfile # 用于读取zip归档文件的工具
environ['HOME'] # 查看系统环境变量
子进程
os.fork() # 创建子进程,并复制父进程所有操作 通过判断pid = os.fork() 的pid值,分别执行父进程与子进程操作,0为子进程
os.wait() # 等待子进程结束
跨平台os模块属性
linesep # 用于在文件中分隔行的字符串
sep # 用来分隔文件路径名字的字符串
pathsep # 用于分割文件路径的字符串
curdir # 当前工作目录的字符串名称
pardir 父目录字符串名称
``
### commands
```
commands.getstatusoutput('id') # 返回元组(状态,标准输出)
commands.getoutput('id') # 只返回执行的结果, 忽略返回值
commands.getstatus('file') # 返回ls -ld file执行的结果
```
### 文件和目录管理
```
import shutil
shutil.copyfile('data.db', 'archive.db') # 拷贝文件
shutil.move('/build/executables', 'installdir') # 移动文件或目录
文件通配符
import glob
glob.glob('*.py') # 查找当前目录下py结尾的文件
随机模块
“`
import random
random.choice([‘apple’, ‘pear’, ‘banana’]) # 随机取列表一个参数
random.sample(xrange(100), 10) # 不重复抽取10个
random.random() # 随机浮点数
random.randrange(6) # 随机整数范围
## 发送邮件
发送邮件内容
!/usr/bin/python
encoding:utf8
导入 smtplib 和 MIMEText
import smtplib
from email.mime.text import MIMEText
定义发送列表
mailto_list=[“272121935@qq.com”,”272121935@163.com”]
设置服务器名称、用户名、密码以及邮件后缀
mail_host = “smtp.163.com”
mail_user = “mailuser”
mail_pass = “password”
mail_postfix=”163.com”
发送邮件函数
def send_mail(to_list, sub):
me = mail_user + “”
fp = open(‘context.txt’)
msg = MIMEText(fp.read(),_charset=”utf-8”)
fp.close()
msg[‘Subject’] = sub
msg[‘From’] = me
msg[‘To’] = “;”.join(to_list)
try:
send_smtp = smtplib.SMTP()
send_smtp.connect(mail_host)
send_smtp.login(mail_user, mail_pass)
send_smtp.sendmail(me, to_list, msg.as_string())
send_smtp.close()
return True
except Exception, e:
print str(e)
return False
if send_mail(mailto_list,”标题”):
print “测试成功”
else:
print “测试失败”
### 发送附件
!/usr/bin/python
encoding:utf8
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
def send_mail(to_list, sub, filename):
me = mail_user + “”
msg = MIMEMultipart()
msg[‘Subject’] = sub
msg[‘From’] = me
msg[‘To’] = “;”.join(to_list)
submsg = MIMEBase(‘application’, ‘x-xz’)
submsg.set_payload(open(filename,’rb’).read())
encoders.encode_base64(submsg)
submsg.add_header(‘Content-Disposition’, ‘attachment’, filename=filename)
msg.attach(submsg)
try:
send_smtp = smtplib.SMTP()
send_smtp.connect(mail_host)
send_smtp.login(mail_user, mail_pass)
send_smtp.sendmail(me, to_list, msg.as_string())
send_smtp.close()
return True
except Exception, e:
print str(e)[1]
return False
设置服务器名称、用户名、密码以及邮件后缀
mail_host = “smtp.163.com”
mail_user = “xuesong”
mail_pass = “mailpasswd”
mail_postfix = “163.com”
mailto_list = [“272121935@qq.com”,”quanzhou722@163.com”]
title = ‘check’
filename = ‘file_check.html’
if send_mail(mailto_list,title,filename):
print “发送成功”
else:
print “发送失败”
## 解压缩
### gzip压缩
import gzip
f_in = open(‘file.log’, ‘rb’)
f_out = gzip.open(‘file.log.gz’, ‘wb’)
f_out.writelines(f_in)
f_out.close()
f_in.close()
### gzip压缩1
File = ‘xuesong_18.log’
g = gzip.GzipFile(filename=”“, mode=’wb’, compresslevel=9, fileobj=open((r’%s.gz’ %File),’wb’))
g.write(open(r’%s’ %File).read())
g.close()
### gzip解压
g = gzip.GzipFile(mode=’rb’, fileobj=open((r’xuesong_18.log.gz’),’rb’))
open((r’xuesong_18.log’),’wb’).write(g.read())
压缩tar.gz
import os
import tarfile
tar = tarfile.open(“/tmp/tartest.tar.gz”,”w:gz”) # 创建压缩包名
for path,dir,files in os.walk(“/tmp/tartest”): # 递归文件目录
for file in files:
fullpath = os.path.join(path,file)
tar.add(fullpath) # 创建压缩包
tar.close()
### 解压tar.gz
import tarfile
tar = tarfile.open(“/tmp/tartest.tar.gz”)
tar.extract(“/tmp”) # 全部解压到指定路径
names = tar.getnames() # 包内文件名
for name in names:
tar.extract(name,path=”./”) # 解压指定文件
tar.close()
### zip压缩
import zipfile,os
f = zipfile.ZipFile(‘filename.zip’, ‘w’ ,zipfile.ZIP_DEFLATED) # ZIP_STORE 为默认表不压缩. ZIP_DEFLATED 表压缩
f.write(‘file1.txt’) # 将文件写入压缩包
for path,dir,files in os.walk(“tartest”): # 递归压缩目录
for file in files:
f.write(os.path.join(path,file)) # 将文件逐个写入压缩包
f.close()
### zip解压
if zipfile.is_zipfile(‘filename.zip’): # 判断一个文件是不是zip文件
f = zipfile.ZipFile(‘filename.zip’)
for file in f.namelist(): # 返回文件列表
f.extract(file, r’/tmp/’) # 解压指定文件
f.extractall() # 解压全部
f.close()
## 时间
import time
time.time() # 时间戳[浮点]
time.localtime()[1] - 1 # 上个月
int(time.time()) # 时间戳[整s]
tomorrow.strftime(‘%Y%m%d_%H%M’) # 格式化时间
time.strftime(‘%Y-%m-%d_%X’,time.localtime( time.time() ) ) # 时间戳转日期
time.mktime(time.strptime(‘2012-03-28 06:53:40’, ‘%Y-%m-%d %H:%M:%S’)) # 日期转时间戳
判断输入时间格式是否正确
encoding:utf8
import time
while 1:
atime=raw_input(‘输入格式如[14.05.13 13:00]:’)
try:
btime=time.mktime(time.strptime(‘%s:00’ %atime, ‘%y.%m.%d %H:%M:%S’))
break
except:
print ‘时间输入错误,请重新输入,格式如[14.05.13 13:00]’
上一个月最后一天
import datetime
lastMonth=datetime.date(datetime.date.today().year,datetime.date.today().month,1)-datetime.timedelta(1)
lastMonth.strftime(“%Y/%m”)
前一天
(datetime.datetime.now() + datetime.timedelta(days=-1) ).strftime(‘%Y%m%d’)
两日期相差天数
import datetime
d1 = datetime.datetime(2005, 2, 16)
d2 = datetime.datetime(2004, 12, 31)
(d1 - d2).days
向后加10个小时
import datetime
d1 = datetime.datetime.now()
d3 = d1 + datetime.timedelta(hours=10)
d3.ctime()
参数[optparse]
import os, sys
import time
import optparse
python aaa.py -t file -p /etc/opt -o aaaaa
def do_fiotest( type, path, output,):
print type, path, output,
def main():
parser = optparse.OptionParser()
parser.add_option(‘-t’, ‘–type’, dest = ‘type’, default = None, help = ‘test type[file, device]’)
parser.add_option(‘-p’, ‘–path’, dest = ‘path’, default = None, help = ‘test file path or device path’)
parser.add_option(‘-o’, ‘–output’, dest = ‘output’, default = None, help = ‘result dir path’)
(o, a) = parser.parse_args()
if None == o.type or None == o.path or None == o.output:
print “No device or file or output dir”
return -1
if ‘file’ != o.type and ‘device’ != o.type:
print “You need specify test type [‘file’ or ‘device’]”
return -1
do_fiotest(o.type, o.path, o.output)
print “Test done!”
if name == ‘main‘:
main()
hash
import md5
m = md5.new(‘123456’).hexdigest()
import hashlib
m = hashlib.md5()
m.update(“Nobody inspects”) # 使用update方法对字符串md5加密
m.digest() # 加密后二进制结果
m.hexdigest() # 加密后十进制结果
hashlib.new(“md5”, “string”).hexdigest() # 对字符串加密
hashlib.new(“md5”, open(“file”).read()).hexdigest() # 查看文件MD5值
隐藏输入密码
import getpass
passwd=getpass.getpass()
string打印a-z
import string
string.lowercase # a-z小写
string.uppercase # A-Z大小
paramiko [ssh客户端]
安装
sudo apt-get install python-setuptools
easy_install
sudo apt-get install python-all-dev
sudo apt-get install build-essential
paramiko实例(账号密码登录执行命令)
!/usr/bin/python
ssh
import paramiko
import sys,os
host = ‘10.152.15.200’
user = ‘peterli’
password = ‘123456’
s = paramiko.SSHClient() # 绑定实例
s.load_system_host_keys() # 加载本地HOST主机文件
s.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 允许连接不在know_hosts文件中的主机
s.connect(host,22,user,password,timeout=5) # 连接远程主机
while True:
cmd=raw_input(‘cmd:’)
stdin,stdout,stderr = s.exec_command(cmd) # 执行命令
cmd_result = stdout.read(),stderr.read() # 读取命令结果
for line in cmd_result:
print line,
s.close()
paramiko实例(传送文件)
!/usr/bin/evn python
import os
import paramiko
host=’127.0.0.1’
port=22
username = ‘peterli’
password = ‘123456’
ssh=paramiko.Transport((host,port))
privatekeyfile = os.path.expanduser(‘~/.ssh/id_rsa’)
mykey = paramiko.RSAKey.from_private_key_file( os.path.expanduser(‘~/.ssh/id_rsa’)) # 加载key 不使用key可不加
ssh.connect(username=username,password=password) # 连接远程主机
使用key把 password=password 换成 pkey=mykey
sftp=paramiko.SFTPClient.from_transport(ssh) # SFTP使用Transport通道
sftp.get(‘/etc/passwd’,’pwd1’) # 下载 两端都要指定文件名
sftp.put(‘pwd’,’/tmp/pwd’) # 上传
sftp.close()
ssh.close()
paramiko实例(密钥执行命令)
!/usr/bin/python
ssh
import paramiko
import sys,os
host = ‘10.152.15.123’
user = ‘peterli’
s = paramiko.SSHClient()
s.load_system_host_keys()
s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
privatekeyfile = os.path.expanduser(‘~/.ssh/id_rsa’) # 定义key路径
mykey = paramiko.RSAKey.from_private_key_file(privatekeyfile)
mykey=paramiko.DSSKey.from_private_key_file(privatekeyfile,password=’061128’) # DSSKey方式 password是key的密码
s.connect(host,22,user,pkey=mykey,timeout=5)
cmd=raw_input(‘cmd:’)
stdin,stdout,stderr = s.exec_command(cmd)
cmd_result = stdout.read(),stderr.read()
for line in cmd_result:
print line,
s.close()
ssh并发(Pool控制最大并发)
!/usr/bin/env python
encoding:utf8
ssh_concurrent.py
import multiprocessing
import sys,os,time
import paramiko
def ssh_cmd(host,port,user,passwd,cmd):
msg = “———–Result:%s———-” % host
s = paramiko.SSHClient()
s.load_system_host_keys()
s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
s.connect(host,22,user,passwd,timeout=5)
stdin,stdout,stderr = s.exec_command(cmd)
cmd_result = stdout.read(),stderr.read()
print msg
for line in cmd_result:
print line,
s.close()
except paramiko.AuthenticationException:
print msg
print ‘AuthenticationException Failed’
except paramiko.BadHostKeyException:
print msg
print “Bad host key”
result = []
p = multiprocessing.Pool(processes=20)
cmd=raw_input(‘CMD:’)
f=open(‘serverlist.conf’)
list = f.readlines()
f.close()
for IP in list:
print IP
host=IP.split()[0]
port=int(IP.split()[1])
user=IP.split()[2]
passwd=IP.split()[3]
result.append(p.apply_async(ssh_cmd,(host,port,user,passwd,cmd)))
p.close()
for res in result:
res.get(timeout=35)
ssh并发(取文件状态并发送邮件)
!/usr/bin/python
encoding:utf8
config file: ip.list
import paramiko
import multiprocessing
import smtplib
import sys,os,time,datetime,socket,re
from email.mime.text import MIMEText
配置文件(IP列表)
Conf = ‘ip.list’
user_name = ‘peterli’
user_pwd = ‘passwd’
port = 22
PATH = ‘/home/peterli/’
设置服务器名称、用户名、密码以及邮件后缀
mail_host = “smtp.163.com”
mail_user = “xuesong”
mail_pass = “mailpasswd”
mail_postfix = “163.com”
mailto_list = [“272121935@qq.com”,”quanzhou722@163.com”]
title = ‘file check’
DATE1=(datetime.datetime.now() + datetime.timedelta(days=-1) ).strftime(‘%Y%m%d’)
file_path = ‘%s%s’ %(PATH,DATE1)
def Ssh_Cmd(file_path,host_ip,user_name,user_pwd,port=22):
s = paramiko.SSHClient()
s.load_system_host_keys()
s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
s.connect(hostname=host_ip,port=port,username=user_name,password=user_pwd)
stdin,stdout,stderr = s.exec_command(‘stat %s’ %file_path)
stat_result = ‘%s%s’ %(stdout.read(),stderr.read())
if stat_result.find(‘No such file or directory’) == -1:
file_status = ‘OK\t’
stdin,stdout,stderr = s.exec_command(‘du -sh %s’ %file_path)
cmd1_result = ‘%s_%s’ %(stat_result.split()[32],stat_result.split()[33].split(‘.’)[0])
cmd2_result = (‘%s%s’ %(stdout.read(),stderr.read())).split()[0]
else:
file_status = ‘未生成\t’
cmd1_result = ‘null’
cmd2_result = ‘null’
q.put([‘Login successful’])
s.close()
except socket.error:
file_status = ‘主机或端口错误’
cmd1_result = ‘-’
cmd2_result = ‘-’
except paramiko.AuthenticationException:
file_status = ‘用户或密码错误’
cmd1_result = ‘-’
cmd2_result = ‘-’
except paramiko.BadHostKeyException:
file_status = ‘Bad host key’
cmd1_result = ‘-’
cmd2_result = ‘-’
except:
file_status = ‘ssh异常’
cmd1_result = ‘-’
cmd2_result = ‘-’
r.put(‘%s\t-\t%s\t%s\t%s\t%s\n’ %(time.strftime(‘%Y-%m-%d_%H:%M’),host_ip,file_status,cmd2_result,cmd1_result))
def Concurrent(Conf,file_path,user_name,user_pwd,port):
执行总计
total = 0
读取配置文件
f=open(Conf)
list = f.readlines()
f.close()
并发执行
process_list = []
log_file = file(‘file_check.log’, ‘w’)
log_file.write(‘检查时间\t\t业务\tIP\t\t文件状态\t大小\t生成时间\n’)
for host_info in list:
判断配置文件中注释行跳过
if host_info.startswith(‘#’):
continue
取变量,其中任意变量未取到就跳过执行
try:
host_ip=host_info.split()[0].strip()
user_name=host_info.split()[1]
user_pwd=host_info.split()[2]
except:
log_file.write(‘Profile error: %s\n’ %(host_info))
continue
try:
port=int(host_info.split()[3])
except:
port=22
total +=1
p = multiprocessing.Process(target=Ssh_Cmd,args=(file_path,host_ip,user_name,user_pwd,port))
p.start()
process_list.append(p)
for j in process_list:
j.join()
for j in process_list:
log_file.write(r.get())
successful = q.qsize()
log_file.write(‘执行完毕。 总执行:%s 登录成功:%s 登录失败:%s\n’ %(total,successful,total - successful))
log_file.flush()
log_file.close()
def send_mail(to_list, sub):
me = mail_user + “”
fp = open(‘file_check.log’)
msg = MIMEText(fp.read(),_charset=”utf-8”)
fp.close()
msg[‘Subject’] = sub
msg[‘From’] = me
msg[‘To’] = “;”.join(to_list)
try:
send_smtp = smtplib.SMTP()
send_smtp.connect(mail_host)
send_smtp.login(mail_user, mail_pass)
send_smtp.sendmail(me, to_list, msg.as_string())
send_smtp.close()
return True
except Exception, e:
print str(e)[1]
return False
if name == ‘main‘:
q = multiprocessing.Queue()
r = multiprocessing.Queue()
Concurrent(Conf,file_path,user_name,user_pwd,port)
if send_mail(mailto_list,title):
print “发送成功”
else:
print “发送失败”
LazyManage并发批量操作(判断非root交互到root操作)
!/usr/bin/python
encoding:utf8
LzayManage.py
config file: serverlist.conf
import paramiko
import multiprocessing
import sys,os,time,socket,re
def Ssh_Cmd(host_ip,Cmd,user_name,user_pwd,port=22):
s = paramiko.SSHClient()
s.load_system_host_keys()
s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
s.connect(hostname=host_ip,port=port,username=user_name,password=user_pwd)
stdin,stdout,stderr = s.exec_command(Cmd)
Result = ‘%s%s’ %(stdout.read(),stderr.read())
q.put(‘successful’)
s.close()
return Result.strip()
def Ssh_Su_Cmd(host_ip,Cmd,user_name,user_pwd,root_name,root_pwd,port=22):
s = paramiko.SSHClient()
s.load_system_host_keys()
s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
s.connect(hostname=host_ip,port=port,username=user_name,password=user_pwd)
ssh = s.invoke_shell()
time.sleep(0.1)
ssh.send(‘su - %s\n’ %(root_name))
buff = ”
while not buff.endswith(‘Password: ‘):
resp = ssh.recv(9999)
buff +=resp
ssh.send(‘%s\n’ %(root_pwd))
buff = ”
while True:
resp = ssh.recv(9999)
buff +=resp
if ‘: incorrect password’ in buff:
su_correct=’passwd_error’
break
elif buff.endswith(‘# ‘):
su_correct=’passwd_correct’
break
if su_correct == ‘passwd_correct’:
ssh.send(‘%s\n’ %(Cmd))
buff = ”
while True:
resp = ssh.recv(9999)
if resp.endswith(‘# ‘):
buff +=re.sub(‘[.@.]# $’,”,resp)
break
buff +=resp
Result = buff.lstrip(‘%s’ %(Cmd))
q.put(‘successful’)
elif su_correct == ‘passwd_error’:
Result = “\033[31mroot密码错误\033[m”
s.close()
return Result.strip()
def Send_File(host_ip,PathList,user_name,user_pwd,Remote=’/tmp’,port=22):
s=paramiko.Transport((host_ip,port))
s.connect(username=user_name,password=user_pwd)
sftp=paramiko.SFTPClient.from_transport(s)
for InputPath in PathList:
LocalPath = re.sub(‘^./’,”,InputPath.rstrip(‘/’))
RemotePath = ‘%s/%s’ %( Remote , os.path.basename( LocalPath ))
try:
sftp.rmdir(RemotePath)
except:
pass
try:
sftp.remove(RemotePath)
except:
pass
if os.path.isdir(LocalPath):
sftp.mkdir(RemotePath)
for path,dirs,files in os.walk(LocalPath):
for dir in dirs:
dir_path = os.path.join(path,dir)
sftp.mkdir(‘%s/%s’ %(RemotePath,re.sub(‘^%s/’ %LocalPath,”,dir_path)))
for file in files:
file_path = os.path.join(path,file)
sftp.put( file_path,’%s/%s’ %(RemotePath,re.sub(‘^%s/’ %LocalPath,”,file_path)))
else:
sftp.put(LocalPath,RemotePath)
q.put(‘successful’)
sftp.close()
s.close()
Result = ‘%s \033[32m传送完成\033[m’ % PathList
return Result
def Ssh(host_ip,Operation,user_name,user_pwd,root_name,root_pwd,Cmd=None,PathList=None,port=22):
msg = “\033[32m———–Result:%s———-\033[m” % host_ip
try:
if Operation == ‘Ssh_Cmd’:
Result = Ssh_Cmd(host_ip=host_ip,Cmd=Cmd,user_name=user_name,user_pwd=user_pwd,port=port)
elif Operation == ‘Ssh_Su_Cmd’:
Result = Ssh_Su_Cmd(host_ip=host_ip,Cmd=Cmd,user_name=user_name,user_pwd=user_pwd,root_name=root_name,root_pwd=root_pwd,port=port)
elif Operation == ‘Ssh_Script’:
Send_File(host_ip=host_ip,PathList=PathList,user_name=user_name,user_pwd=user_pwd,port=port)
Script_Head = open(PathList[0]).readline().strip()
LocalPath = re.sub(‘^./’,”,PathList[0].rstrip(‘/’))
Cmd = ‘%s /tmp/%s’ %( re.sub(‘^#!’,”,Script_Head), os.path.basename( LocalPath ))
Result = Ssh_Cmd(host_ip=host_ip,Cmd=Cmd,user_name=user_name,user_pwd=user_pwd,port=port)
elif Operation == ‘Ssh_Su_Script’:
Send_File(host_ip=host_ip,PathList=PathList,user_name=user_name,user_pwd=user_pwd,port=port)
Script_Head = open(PathList[0]).readline().strip()
LocalPath = re.sub(‘^./’,”,PathList[0].rstrip(‘/’))
Cmd = ‘%s /tmp/%s’ %( re.sub(‘^#!’,”,Script_Head), os.path.basename( LocalPath ))
Result = Ssh_Su_Cmd(host_ip=host_ip,Cmd=Cmd,user_name=user_name,user_pwd=user_pwd,root_name=root_name,root_pwd=root_pwd,port=port)
elif Operation == ‘Send_File’:
Result = Send_File(host_ip=host_ip,PathList=PathList,user_name=user_name,user_pwd=user_pwd,port=port)
else:
Result = ‘操作不存在’
except socket.error:
Result = ‘\033[31m主机或端口错误\033[m’
except paramiko.AuthenticationException:
Result = ‘\033[31m用户名或密码错误\033[m’
except paramiko.BadHostKeyException:
Result = ‘\033[31mBad host key\033[m[’
except IOError:
Result = ‘\033[31m远程主机已存在非空目录或没有写权限\033[m’
except:
Result = ‘\033[31m未知错误\033[m’
r.put(‘%s\n%s\n’ %(msg,Result))
def Concurrent(Conf,Operation,user_name,user_pwd,root_name,root_pwd,Cmd=None,PathList=None,port=22):
读取配置文件
f=open(Conf)
list = f.readlines()
f.close()
执行总计
total = 0
并发执行
for host_info in list:
判断配置文件中注释行跳过
if host_info.startswith(‘#’):
continue
取变量,其中任意变量未取到就跳过执行
try:
host_ip=host_info.split()[0]
user_name=host_info.split()[1]
user_pwd=host_info.split()[2]
except:
print(‘Profile error: %s’ %(host_info) )
continue
try:
port=int(host_info.split()[3])
except:
port=22
total +=1
p = multiprocessing.Process(target=Ssh,args=(host_ip,Operation,user_name,user_pwd,root_name,root_pwd,Cmd,PathList,port))
p.start()
打印执行结果
for j in range(total):
print(r.get() )
if Operation == ‘Ssh_Script’ or Operation == ‘Ssh_Su_Script’:
successful = q.qsize() / 2
else:
successful = q.qsize()
print(‘\033[32m执行完毕[总执行:%s 成功:%s 失败:%s]\033[m’ %(total,successful,total - successful) )
q.close()
r.close()
def Help():
print(”’ 1.执行命令
2.执行脚本 \033[32m[位置1脚本(必须带脚本头),后可带执行脚本所需要的包\文件\文件夹路径,空格分隔]\033[m
3.发送文件 \033[32m[传送的包\文件\文件夹路径,空格分隔]\033[m
退出: 0\exit\quit
帮助: help\h\?
注意: 发送文件默认为/tmp下,如已存在同名文件会被强制覆盖,非空目录则中断操作.执行脚本先将本地脚本及包发送远程主机上,发送规则同发送文件
”’)
if name==’main‘:
定义root账号信息
root_name = ‘root’
root_pwd = ‘peterli’
user_name=’peterli’
user_pwd=’’)
youdao(word)
if name == ‘main‘:
query()
python启动http服务提供访问或下载
python -m SimpleHTTPServer 9900
8 并发
线程安全/竞争条件,锁/死锁检测,同步/异步,阻塞/非阻塞,epoll非阻塞IO,信号量/事件,线程池,生产消费模型,伪并发,微线程,协程
Stackless Python 是Python编程语言的一个增强版本,它使程序员从基于线程的编程方式中获得好处,并避免传统线程所带来的性能与复杂度问题。Stackless为 Python带来的微线程扩展,是一种低开销、轻量级的便利工具
threading多线程
thread
start_new_thread(function,args kwargs=None) # 产生一个新的线程
allocate_lock() # 分配一个LockType类型的锁对象
exit() # 让线程退出
acquire(wait=None) # 尝试获取锁对象
locked() # 如果获取了锁对象返回True
release() # 释放锁
thread例子
!/usr/bin/env python
thread_test.py
不支持守护进程
import thread
from time import sleep,ctime
loops = [4,2]
def loop(nloop,nsec,lock):
print ‘start loop %s at:%s’ % (nloop,ctime())
sleep(nsec)
print ‘loop %s done at: %s’ % (nloop, ctime())
lock.release() # 分配已获得的锁,操作结束后释放相应的锁通知主线程
def main():
print ‘starting at:’,ctime()
locks = []
nloops = range(len(loops))
for i in nloops:
lock = thread.allocate_lock() # 创建一个锁
lock.acquire() # 调用各个锁的acquire()函数获得锁
locks.append(lock) # 把锁放到锁列表locks中
for i in nloops:
thread.start_new_thread(loop,(i,loops[i],locks[i])) # 创建线程
for i in nloops:
while locks[i].locked():pass # 等待全部解锁才继续运行
print ‘all DONE at:’,ctime()
if name == ‘main‘:
main()
thread例子1
coding=utf-8
import thread,time,os
def f(name):
i =3
while i:
time.sleep(1)
print name
i -= 1
os._exit() 会把整个进程关闭
os._exit(22)
if name == ‘main‘:
thread.start_new_thread(f,(“th1”,))
while 1:
pass
os._exit(0)
threading
Thread # 表示一个线程的执行的对象
start() # 开始线程的执行
run() # 定义线程的功能的函数(一般会被子类重写)
join(timeout=None) # 允许主线程等待线程结束,程序挂起,直到线程结束;如果给了timeout,则最多等待timeout秒.
getName() # 返回线程的名字
setName(name) # 设置线程的名字
isAlive() # 布尔标志,表示这个线程是否还在运行中
isDaemon() # 返回线程的daemon标志
setDaemon(daemonic) # 后台线程,把线程的daemon标志设置为daemonic(一定要在调用start()函数前调用)
默认主线程在退出时会等待所有子线程的结束。如果希望主线程不等待子线程,而是在退出时自动结束所有的子线程,就需要设置子线程为后台线程(daemon)
Lock # 锁原语对象
Rlock # 可重入锁对象.使单线程可以在此获得已获得了的锁(递归锁定)
Condition # 条件变量对象能让一个线程停下来,等待其他线程满足了某个条件.如状态改变或值的改变
Event # 通用的条件变量.多个线程可以等待某个事件的发生,在事件发生后,所有的线程都会被激活
Semaphore # 为等待锁的线程提供一个类似等候室的结构
BoundedSemaphore # 与Semaphore类似,只是不允许超过初始值
Time # 与Thread相似,只是他要等待一段时间后才开始运行
activeCount() # 当前活动的线程对象的数量
currentThread() # 返回当前线程对象
enumerate() # 返回当前活动线程的列表
settrace(func) # 为所有线程设置一个跟踪函数
setprofile(func) # 为所有线程设置一个profile函数
threading例子1
!/usr/bin/env python
encoding:utf8
import threading
from Queue import Queue
from time import sleep,ctime
class ThreadFunc(object):
def init(self,func,args,name=”):
self.name=name
self.func=func # loop
self.args=args # (i,iplist[i],queue)
def call(self):
apply(self.func,self.args) # 函数apply() 执行loop函数并传递元组参数
def loop(nloop,ip,queue):
print ‘start’,nloop,’at:’,ctime()
queue.put(ip)
sleep(2)
print ‘loop’,nloop,’done at:’,ctime()
if name == ‘main‘:
threads = []
queue = Queue()
iplist = [‘192.168.1.2’,’192.168.1.3’,’192.168.1.4’,’192.168.1.5’,’192.168.1.6’,’192.168.1.7’,’192.168.1.8’]
nloops = range(len(iplist))
for i in nloops:
t = threading.Thread(target=ThreadFunc(loop,(i,iplist[i],queue),loop.name))
threads.append(t)
for i in nloops:
threads[i].start()
for i in nloops:
threads[i].join()
for i in nloops:
print queue.get()
threading例子2
!/usr/bin/env python
encoding:utf8
from Queue import Queue
import random,time,threading
class Producer(threading.Thread):
def init(self, t_name, queue):
threading.Thread.init(self, name=t_name)
self.data=queue
def run(self):
for i in range(5):
print “%s: %s is producing %d to the queue!\n” %(time.ctime(), self.getName(), i)
self.data.put(i)
self.data.put(i*i)
time.sleep(2)
print “%s: %s finished!” %(time.ctime(), self.getName())
class Consumer(threading.Thread):
def init(self, t_name, queue):
threading.Thread.init(self, name=t_name)
self.data=queue
def run(self):
for i in range(10):
val = self.data.get()
print “%s: %s is consuming. %d in the queue is consumed!\n” %(time.ctime(), self.getName(), val)
print “%s: %s finished!” %(time.ctime(), self.getName())
if name == ‘main‘:
queue = Queue()
producer = Producer(‘Pro.’, queue)
consumer = Consumer(‘Con.’, queue)
producer.start()
consumer.start()
producer.join()
consumer.join()
threading例子3
启动线程后自动执行 run函数其他不可以
import threading
import time
class Th(threading.Thread):
def init(self,name):
threading.Thread.init(self)
self.t_name=name
self.daemon = True # 默认为false,让主线程等待处理完成
def run(self):
time.sleep(1)
print “this is ” + self.t_name
if name == ‘main‘:
thread1 = Th(“Th_1”)
thread1.start()
threading例子4
import threading
import time
class Th(threading.Thread):
def init(self,thread_name):
threading.Thread.init(self)
self.setName(thread_name)
def run(self):
threadLock.acquire()
print self.getName()
for i in range(3):
time.sleep(1)
print str(i)
print self.getName() + ” is over”
threadLock.release()
if name == ‘main‘:
threadLock = threading.Lock()
thread1 = Th(“Th_1”)
thread2 = Th(“Th_2”)
thread1.start()
thread2.start()
后台线程
import threading
import time,random
class MyThread(threading.Thread):
def run(self):
wait_time=random.randrange(1,10)
print “%s will wait %d seconds” % (self.name, wait_time)
time.sleep(wait_time)
print “%s finished!” % self.name
if name==”main“:
for i in range(5):
t = MyThread()
t.setDaemon(True) # 设置为后台线程,主线程完成时不等待子线程完成就结束
t.start()
threading控制最大并发_查询日志中IP信息
!/usr/bin/env python
coding:utf-8
import urllib2
import json
import threading
import time
”’
by:某大牛
QQ:185635687
这个是多线程并发控制. 如果要改成多进程,只需把threading 换成 mulitprocessing.Process , 对, 就是换个名字而已.
”’
获取ip 及其出现次数
def ip_dic(file_obj, dic):
for i in file_obj:
if i:
ip=i.split(‘-‘)[0].strip()
if ip in dic.keys():
dic[ip]=dic[ip] + 1
else:
dic[ip]=1
return dic.iteritems()
目标函数
def get_data(url, ipcounts):
data=urllib2.urlopen(url).read()
datadict=json.loads(data)
fdata = u”ip:%s—%s,%s,%s,%s,%s” %(datadict[“data”][“ip”],ipcounts,datadict[“data”][“country”],datadict[“data”][“region”],datadict[“data”][“city”],datadict[“data”][“isp”])
print fdata
多线程
def threads(iters):
thread_pool = []
for k in iters:
url = “http://ip.taobao.com/service/getIpInfo.php?ip=”
ipcounts = k[1]
url = (url + k[0]).strip()
t = threading.Thread(target=get_data, args=(url, ipcounts))
thread_pool.append(t)
return thread_pool
控制多线程
def startt(t_list, max,second):
l = len(t_list)
n = max
while l > 0:
if l > max:
nl = t_list[:max]
t_list = t_list[max:]
for t in nl:
t.start()
time.sleep(second)
for t in nl:
t.join()
print ‘‘*15, str(n)+ ’ ip has been queried’+’‘*15
n += max
l = len(t_list)
continue
elif l >> Totally ’ + str(n+l ) + ’ ip has been queried’
l = 0
if name ==”main“:
dic={}
with open(‘access.log’) as file_obj:
it = ip_dic(file_obj, dic)
t_list= threads(it)
startt(t_list, 15, 1)
多线程取队列
!/usr/bin/python
import Queue
import threading
import time
exitFlag = 0
class myThread (threading.Thread):
def init(self, threadID, name, q):
threading.Thread.init(self)
self.threadID = threadID
self.name = name
self.q = q
def run(self):
print “Starting ” + self.name
process_data(self.name, self.q)
print “Exiting ” + self.name
def process_data(threadName, q):
while not exitFlag: # 死循环等待
queueLock.acquire()
if not q.empty(): # 判断队列是否为空
data = q.get()
print “%s processing %s” % (threadName, data)
queueLock.release()
time.sleep(1)
threadList = [“Thread-1”, “Thread-2”, “Thread-3”]
nameList = [“One”, “Two”, “Three”, “Four”, “Five”]
queueLock = threading.Lock() # 锁与队列并无任何关联,其他线程也进行取锁操作的时候就会检查是否有被占用,有就阻塞等待解锁为止
workQueue = Queue.Queue(10)
threads = []
threadID = 1
Create new threads
for tName in threadList:
thread = myThread(threadID, tName, workQueue)
thread.start()
threads.append(thread)
threadID += 1
Fill the queue
queueLock.acquire()
for word in nameList:
workQueue.put(word)
queueLock.release()
Wait for queue to empty
while not workQueue.empty(): # 死循环判断队列被处理完毕
pass
Notify threads it’s time to exit
exitFlag = 1
Wait for all threads to complete
for t in threads:
t.join()
print “Exiting Main Thread”
Queue通用队列
q=Queue(size) # 创建大小size的Queue对象
qsize() # 返回队列的大小(返回时候,可能被其他进程修改,近似值)
empty() # 如果队列为空返回True,否则Fales
full() # 如果队列已满返回True,否则Fales
put(item,block0) # 把item放到队列中,如果给了block(不为0),函数会一直阻塞到队列中有空间为止
get(block=0) # 从队列中取一个对象,如果给了block(不为0),函数会一直阻塞到队列中有对象为止
get_nowait # 默认get阻塞,这个不阻塞
multiprocessing [多进程并发]
多线程
import urllib2
from multiprocessing.dummy import Pool as ThreadPool
urls=[‘http://www.baidu.com‘,’http://www.sohu.com‘]
pool=ThreadPool(4) # 线程池
results=pool.map(urllib2.urlopen,urls)
pool.close()
pool.join()
多进程并发
!/usr/bin/env python
encoding:utf8
from multiprocessing import Process
import time,os
def f(name):
time.sleep(1)
print ‘hello ‘,name
print os.getppid() # 取得父进程ID
print os.getpid() # 取得进程ID
process_list = []
for i in range(10):
p = Process(target=f,args=(i,))
p.start()
process_list.append(p)
for j in process_list:
j.join()
Queue进程间通信
from multiprocessing import Process,Queue
import time
def f(name):
time.sleep(1)
q.put([‘hello’+str(name)])
process_list = []
q = Queue()
if name == ‘main‘:
for i in range(10):
p = Process(target=f,args=(i,))
p.start()
process_list.append(p)
for j in process_list:
j.join()
for i in range(10):
print q.get()
Pipe管道
from multiprocessing import Process,Pipe
import time
import os
def f(conn,name):
time.sleep(1)
conn.send([‘hello’+str(name)])
print os.getppid(),’———–’,os.getpid()
process_list = []
parent_conn,child_conn = Pipe()
if name == ‘main‘:
for i in range(10):
p = Process(target=f,args=(child_conn,i))
p.start()
process_list.append(p)
for j in process_list:
j.join()
for p in range(10):
print parent_conn.recv()
进程间同步
加锁,使某一时刻只有一个进程 print
from multiprocessing import Process,Lock
import time
import os
def f(name):
lock.acquire()
time.sleep(1)
print ‘hello–’+str(name)
print os.getppid(),’———–’,os.getpid()
lock.release()
process_list = []
lock = Lock()
if name == ‘main‘:
for i in range(10):
p = Process(target=f,args=(i,))
p.start()
process_list.append(p)
for j in process_list:
j.join()
共享内存
通过使用Value或者Array把数据存储在一个共享的内存表中
‘d’和’i’参数是num和arr用来设置类型,d表示一个双精浮点类型,i表示一个带符号的整型。
from multiprocessing import Process,Value,Array
import time
import os
def f(n,a,name):
time.sleep(1)
n.value = name * name
for i in range(len(a)):
a[i] = -i
process_list = []
if name == ‘main‘:
num = Value(‘d’,0.0)
arr = Array(‘i’,range(10))
for i in range(10):
p = Process(target=f,args=(num,arr,i))
p.start()
process_list.append(p)
for j in process_list:
j.join()
print num.value
print arr[:]
manager
比共享内存灵活,但缓慢
支持list,dict,Namespace,Lock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value,Array
from multiprocessing import Process,Manager
import time
import os
def f(d,name):
time.sleep(1)
d[name] = name * name
print d
process_list = []
if name == ‘main‘:
manager = Manager()
d = manager.dict()
for i in range(10):
p = Process(target=f,args=(d,i))
p.start()
process_list.append(p)
for j in process_list:
j.join()
print d
最大并发数
import multiprocessing
import time,os
result = []
def run(h):
print ‘threading:’ ,h,os.getpid()
p = multiprocessing.Pool(processes=20)
for i in range(100):
result.append(p.apply_async(run,(i,)))
p.close()
for res in result:
res.get(timeout=5)
9 框架
flask [微型网络开发框架]
http://dormousehole.readthedocs.org/en/latest/
html放在 ./templates/ js放在 ./static/
request.args.get(‘page’, 1) # 获取参数 ?page=1
request.json # 获取传递的整个json数据
request.form.get(“host”,’127’) # 获取表单值
简单实例 # 接收数据和展示
import MySQLdb as mysql
from flask import Flask, request
app = Flask(name)
db.autocommit(True)
c = db.cursor()
“””
CREATE TABLE statusinfo (
id int(11) unsigned NOT NULL AUTO_INCREMENT,
hostname varchar(32) NOT NULL,
load float(10) NOT NULL DEFAULT 0.00,
time int(15) NOT NULL,
memtotal int(15) NOT NULL,
memusage int(15) NOT NULL,
memfree int(15) NOT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=161 DEFAULT CHARSET=utf8;
“””
@app.route(“/collect”, methods=[“GET”, “POST”])
def collect():
sql = “”
if request.method == “POST”:
data = request.json # 获取传递的json
hostname = data[“Host”]
load = data[“LoadAvg”]
time = data[“Time”]
memtotal = data[“MemTotal”]
memusage = data[“MemUsage”]
memfree = data[“MemFree”]
try:
sql = “INSERT INTO statusinfo (hostname,load,time,memtotal,memusage,memfree) VALUES(‘%s’, %s, %s, %s, %s, %s);” % (hostname, load,time,memtotal,memusage,memfree)
ret = c.execute(sql)
return ‘ok’
except mysql.IntegrityError:
return ‘errer’
@app.route(“/show”, methods=[“GET”, “POST”])
def show():
try:
hostname = request.form.get(“hostname”) # 获取表单方式的变量值
sql = “SELECT load FROM statusinfo WHERE hostname = ‘%s’;” % (hostname)
c.execute(sql)
ones = c.fetchall()
return render_template(“sysstatus.html”, data=ones, sql = sql)
except:
print ‘hostname null’
from flask import render_template
@app.route(“/xxx/”)
def hello_xx(name):
return render_template(“sysstatus.html”, name=’teach’)
if name == “main“:
app.run(host=”0.0.0.0”, port=50000, debug=True)
twisted [非阻塞异步服务器框架]
用来进行网络服务和应用程序的编程。虽然 Twisted Matrix 中有大量松散耦合的模块化组件,但该框架的中心概念还是非阻塞异步服务器这一思想。对于习惯于线程技术或分叉服务器的开发人员来说,这是一种新颖的编程风格,但它却能在繁重负载的情况下带来极高的效率。
pip install twisted
from twisted.internet import protocol, reactor, endpoints
class Echo(protocol.Protocol):
def dataReceived(self, data):
self.transport.write(data)
class EchoFactory(protocol.Factory):
def buildProtocol(self, addr):
return Echo()
endpoints.serverFromString(reactor, “tcp:1234”).listen(EchoFactory())
reactor.run()
greenlet [微线程/协程框架]
更加原始的微线程的概念,没有调度,或者叫做协程。这在你需要控制你的代码时很有用。你可以自己构造微线程的 调度器;也可以使用”greenlet”实现高级的控制流。例如可以重新创建构造器;不同于Python的构造器,我们的构造器可以嵌套的调用函数,而被嵌套的函数也可以 yield 一个值。
pip install greenlet
tornado [极轻量级Web服务器框架]
高可伸缩性和epoll非阻塞IO,响应快速,可处理数千并发连接,特别适用用于实时的Web服务
http://www.tornadoweb.cn/documentation
pip install tornado
import tornado.ioloop
import tornado.web
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.write(“Hello, world”)
application = tornado.web.Application([
(r”/”, MainHandler),
])
if name == “main“:
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()
Scrapy [web抓取框架]
Python开发的一个快速,高层次的屏幕抓取和web抓取框架,用于抓取web站点并从页面中提取结构化的数据。Scrapy用途广泛,可以用于数据挖掘、监测和自动化测试。
pip install scrapy
from scrapy import Spider, Item, Field
class Post(Item):
title = Field()
class BlogSpider(Spider):
name, start_urls = ‘blogspider’, [‘http://blog.scrapinghub.com‘]
def parse(self, response):
return [Post(title=e.extract()) for e in response.css(“h2 a::text”)]
scrapy runspider myspider.py
django [重量级web框架]
bottle [轻量级的Web框架]
10 例子
小算法
斐波那契
将函数结果作为列表可用于循环
def fab(max):
n, a, b = 0, 0, 1
while n < max:
yield b
a, b = b, a + b
n = n + 1
for n in fab(5):
print n
乘法口诀
!/usr/bin/python
for i in range(1,10):
for j in range(1,i+1):
print j,’*’,i,’=’,j*i,
else:
print ”
最小公倍数
1-70的最小公倍数
def c(m,n):
a1=m
b1=n
r=n%m
while r!=0:
n=m
m=r
r=n%m
return (a1*b1)/m
d=1
for i in range(3,71,2):
d = c(d,i)
print d
排序算法
插入排序
def insertion_sort(sort_list):
iter_len = len(sort_list)
if iter_len < 2:
return sort_list
for i in range(1, iter_len):
key = sort_list[i]
j = i - 1
while j>=0 and sort_list[j]>key:
sort_list[j+1] = sort_list[j]
j -= 1
sort_list[j+1] = key
return sort_list
选择排序
def selection_sort(sort_list):
iter_len = len(sort_list)
if iter_len < 2:
return sort_list
for i in range(iter_len-1):
smallest = sort_list[i]
location = i
for j in range(i, iter_len):
if sort_list[j] < smallest:
smallest = sort_list[j]
location = j
if i != location:
sort_list[i], sort_list[location] = sort_list[location], sort_list[i]
return sort_list
冒泡排序算法
def bubblesort(numbers):
for j in range(len(numbers)-1,-1,-1):
for i in range(j):
if numbers[i]>numbers[i+1]:
numbers[i],numbers[i+1] = numbers[i+1],numbers[i]
print(i,j)
print(numbers)
二分算法
python 2f.py 123456789 4
list(‘123456789’) = [‘1’, ‘2’, ‘3’, ‘4’, ‘5’, ‘6’, ‘7’, ‘8’, ‘9’]
!/usr/bin/env python
import sys
def search2(a,m):
low = 0
high = len(a) - 1
while(low m:
high = mid - 1
else:
print mid
return mid
print -1
return -1
if name == “main“:
a = [int(i) for i in list(sys.argv[1])]
m = int(sys.argv[2])
search2(a,m)
将字典中所有time去掉
a={‘version01’: {‘nba’: {‘timenba’: ‘valuesasdfasdf’, ‘nbanbac’: ‘vtimefasdf’, ‘userasdf’: ‘vtimasdf’}}}
eval(str(a).replace(“time”,”“))
PIL图像处理
import Image
im = Image.open(“j.jpg”) # 打开图片
print im.format, im.size, im.mode # 打印图像格式、像素宽和高、模式
JPEG (440, 330) RGB
im.show() # 显示最新加载图像
box = (100, 100, 200, 200)
region = im.crop(box) # 从图像中提取出某个矩形大小的图像
图片等比缩小
-- coding: cp936 --
import Image
import glob, os
图片批处理
def timage():
for files in glob.glob(‘D:\1\*.JPG’):
filepath,filename = os.path.split(files)
filterame,exts = os.path.splitext(filename)
输出路径
opfile = r’D:\22\’
判断opfile是否存在,不存在则创建
if (os.path.isdir(opfile)==False):
os.mkdir(opfile)
im = Image.open(files)
w,h = im.size
im_ss = im.resize((400,400))
im_ss = im.convert(‘P’)
im_ss = im.resize((int(w*0.12), int(h*0.12)))
im_ss.save(opfile+filterame+’.jpg’)
if name==’main‘:
timage()
取系统返回值赋给序列
cmd = os.popen(“df -Ph|awk ‘NR!=1{print $5}’”).readlines();
cmd = os.popen(‘df -h’).read().split(‘\n’)
cmd = os.popen(‘lo 2>&1’).read()
取磁盘使用空间
import commands
df = commands.getoutput(“df -hP”)
[ x.split()[4] for x in df.split(“\n”) ]
[ (x.split()[0],x.split()[4]) for x in df.split(“\n”) if x.split()[4].endswith(“%”) ]
打印表格
map = [[“a”,”b”,”c”],
[“d”,”e”,”f”],
[“g”,”h”,”i”]]
def print_board():
for i in range(0,3):
for j in range(0,3):
print “|”,map[i][j],
if j != 2:
print ‘|’
生成html文件表格
log_file = file(‘check.html’, ‘w’)
log_file.write(“””
| 状态统计 |
| IP | 状态 |
|---|---|
| %s | %s |
“”“)
log_file.flush()
log_file.close()
井字游戏
!/usr/bin/python
http://www.admin10000.com/document/2506.html
def print_board():
for i in range(0,3):
for j in range(0,3):
print map[2-i][j],
if j != 2:
print “|”,
print “”
def check_done():
for i in range(0,3):
if map[i][0] == map[i][1] == map[i][2] != ” ” \
or map[0][i] == map[1][i] == map[2][i] != ” “:
print turn, “won!!!”
return True
if map[0][0] == map[1][1] == map[2][2] != ” ” \
or map[0][2] == map[1][1] == map[2][0] != ” “:
print turn, “won!!!”
return True
if ” ” not in map[0] and ” ” not in map[1] and ” ” not in map[2]:
print “Draw”
return True
return False
turn = “X”
map = [[” “,” “,” “],
[” “,” “,” “],
[” “,” “,” “]]
done = False
while done != True:
print_board()
print turn, “‘s turn”
print
moved = False
while moved != True:
print “Please select position by typing in a number between 1 and 9, see below for which number that is which position…”
print “7|8|9”
print “4|5|6”
print “1|2|3”
print
try:
pos = input(“Select: “)
if pos =1:
Y = pos/3
X = pos%3
if X != 0:
X -=1
else:
X = 2
Y -=1
if map[Y][X] == ” “:
map[Y][X] = turn
moved = True
done = check_done()
if done == False:
if turn == “X”:
turn = “O”
else:
turn = “X”
except:
print “You need to add a numeric value”
网段划分
题目
192.168.1
192.168.3
192.168.2
172.16.3
192.16.1
192.16.2
192.16.3
10.0.4
输出结果:
192.16.1-192.16.3
192.168.1-192.168.3
172.16.3
10.0.4
答案
!/usr/bin/python
f = file(‘a.txt’)
c = f.readlines()
dic={}
for i in c:
a=i.strip().split(‘.’)
if a[0]+’.’+a[1] in dic.keys():
key=dic[“%s.%s” %(a[0],a[1])]
else:
key=[]
key.append(a[2])
dic[a[0]+’.’+a[1]]=sorted(key)
for x,y in dic.items():
if y[0] == y[-1]:
print ‘%s.%s’ %(x,y[0])
else:
print ‘%s.%s-%s.%s’ %(x,y[0],x,y[-1])
统计日志IP
打印出独立IP,并统计独立IP数
219.140.190.130 - - [23/May/2006:08:57:59 +0800] “GET /fg172.exe HTTP/1.1” 200 2350253
221.228.143.52 - - [23/May/2006:08:58:08 +0800] “GET /fg172.exe HTTP/1.1” 206 719996
221.228.143.52 - - [23/May/2006:08:58:08 +0800] “GET /fg172.exe HTTP/1.1” 206 713242
!/usr/bin/python
dic={}
a=open(“a”).readlines()
for i in a:
ip=i.strip().split()[0]
if ip in dic.keys():
dic[ip] = dic[ip] + 1
else:
dic[ip] = 1
for x,y in dic.items():
print x,” “,y
