Ceph 对象存储创建 API
# -*- coding:utf-8 -*-
import boto
import boto.s3.connection
import paramiko
class Accountinfo():
    """Manage radosgw (S3) users by running ``radosgw-admin`` over SSH.

    Command reference: http://docs.ceph.org.cn/man/8/radosgw-admin/
    """

    def __init__(self):
        # NOTE(review): the SSH endpoint and credentials are hard-coded in
        # source; consider loading them from configuration instead.
        self.hostname = '192.168.44.70'
        self.port = 22
        self.username = 'root'
        self.passwd = '123456'

    def new_connect(self):
        """Open an SSH session to the configured host.

        Returns:
            A connected ``paramiko.SSHClient``, or the string ``'error'`` if
            anything fails while setting up the connection.
        """
        ssh = None
        try:
            paramiko.util.log_to_file('paramiko.log')
            ssh = paramiko.SSHClient()
            ssh.load_system_host_keys()
            # NOTE(review): AutoAddPolicy accepts host keys that are not yet
            # known, so the remote host identity is not verified.
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            # Use the instance attributes so changing them actually takes
            # effect (they were previously ignored in favour of literals).
            ssh.connect(hostname=self.hostname, port=self.port,
                        username=self.username, password=self.passwd)
            return ssh
        except Exception:
            # Callers detect failure through the 'error' sentinel; release
            # the half-initialised client before reporting it.
            if ssh is not None:
                ssh.close()
            return 'error'

    def user_manager(self, username, flag):
        """Create or delete a radosgw user for S3 access.

        Args:
            username: uid of the radosgw user. On creation its title-cased
                form is used as the display name. The value is interpolated
                into a shell command line, so it must come from a trusted
                source.
            flag: ``'c'`` to create the user, ``'d'`` to delete it.

        Returns:
            * flag ``'c'``: tuple ``(stdout, stderr)`` of the remote command.
            * flag ``'d'``: ``'delete <username> success'``, or
              ``'delete <username> failed: <stderr>'`` when the remote
              command exits with a non-zero status.
            * any other flag: ``('flag == c or d', 'flag error')``.
            * ``'connect error'`` when the SSH connection cannot be made.
        """
        ssh = self.new_connect()
        if isinstance(ssh, str):
            return 'connect error'
        try:
            if flag == 'c':
                command = '/usr/bin/radosgw-admin user create --uid="%s" --display-name="%s"' % (username, username.title())
                _, stdout, stderr = ssh.exec_command(command)
                # Read each stream once; the error text comes from stderr.
                return stdout.read(), stderr.read()
            if flag == 'd':
                command = '/usr/bin/radosgw-admin user rm --uid="%s"' % (username)
                _, stdout, stderr = ssh.exec_command(command)
                # exec_command returns immediately: drain the output and wait
                # for the exit status so the session is not closed while the
                # command is still running, and so failures are reported.
                stdout.read()
                errstr = stderr.read()
                if stdout.channel.recv_exit_status() != 0:
                    if isinstance(errstr, bytes):
                        errstr = errstr.decode('utf-8', 'replace')
                    return 'delete %s failed: %s' % (username, errstr.strip())
                return 'delete %s success' % username
            return 'flag == c or d', 'flag error'
        finally:
            # Runs after the return value has been computed, on every path.
            ssh.close()
class CephS3():
    """Thin wrapper around a boto S3 connection to a Ceph RADOS gateway.

    A single boto connection is cached on the class (singleton style) and
    shared by every instance and by ``get_connect()``.

    Most methods report their outcome as a string: ``'ok'`` on success, or a
    short status / the exception text otherwise.
    """

    # Shared boto connection; stays None until it is first needed.
    __instance = None

    def __init__(self):
        # NOTE(review): the gateway address and S3 credentials are hard-coded
        # in source; consider loading them from configuration instead.
        self.access_key = "BKOLF8C5319QK2UIMQ09"
        self.secret_key = "jBiFwY3LeHh78tM9W6Y8oQUM2VNIbieGVViB3wEB"
        self.host = '192.168.44.70'
        self.port = 7480
        # Build the connection only once; later instances reuse it instead of
        # each creating a redundant one.
        if CephS3.__instance is None:
            CephS3.__instance = boto.connect_s3(
                aws_access_key_id=self.access_key,
                aws_secret_access_key=self.secret_key,
                host=self.host,
                port=self.port,
                is_secure=False,
                calling_format=boto.s3.connection.OrdinaryCallingFormat()
            )
        self.conn = CephS3.__instance

    @staticmethod
    def get_connect():
        """Return the shared boto connection, creating it on first use."""
        if CephS3.__instance is None:
            # __init__ registers the connection it creates on the class.
            CephS3()
        return CephS3.__instance

    @staticmethod
    def _missing_object(bucketname, obj_key):
        """Build the status text returned when an object does not exist."""
        return 'object %s not found in bucket %s' % (obj_key, bucketname)

    def list_all_buckets(self):
        """Print the name and creation date of every bucket of the account."""
        con = CephS3.get_connect()
        for bucket in con.get_all_buckets():
            print("{name}\t{created}".format(name=bucket.name, created=bucket.creation_date))

    def create_bucket(self, bucketname):
        """Create bucket *bucketname* unless the account already lists it.

        Returns:
            ``'ok'`` when the bucket was created, ``'fail'`` when a bucket of
            that name is already listed, or the exception text if the
            creation request fails.
        """
        con = CephS3.get_connect()
        existing = set(bucket.name for bucket in con.get_all_buckets())
        if bucketname in existing:
            return 'fail'
        try:
            con.create_bucket(bucketname)
            return 'ok'
        except Exception as e:
            return str(e)

    def delete_bucket(self, bucketname):
        """Delete bucket *bucketname* if the account lists it.

        Returns:
            ``'ok'`` when the bucket was deleted, ``'fail'`` when no bucket of
            that name is listed, or the exception text if the deletion
            request fails.
        """
        con = CephS3.get_connect()
        existing = set(bucket.name for bucket in con.get_all_buckets())
        if bucketname not in existing:
            return 'fail'
        try:
            con.delete_bucket(bucketname)
            return 'ok'
        except Exception as e:
            # Return the error like the sibling methods do, rather than
            # printing it and implicitly returning None.
            return str(e)

    def list_bucket_object(self, bucketname):
        """Print name, size and last-modified time of each object in a bucket."""
        con = CephS3.get_connect()
        bucket = con.get_bucket(bucketname)
        print("%-10s\t%-10s\t%-10s\n" % ('Name', 'Size', 'Modified'))
        for key in bucket.list():
            print("{name:<10}\t{size:<10}\t{modified:<10}".format(name=key.name, size=key.size, modified=key.last_modified))

    def create_object(self, bucketname, obj_key, obj_value):
        """Create object *obj_key* whose content is the string *obj_value*.

        Returns:
            ``'ok'`` when the object was written, ``'exists'`` when an object
            with that key is already present (it is left untouched), or the
            exception text if the write fails.
        """
        con = CephS3.get_connect()
        bucket = con.get_bucket(bucketname)
        # A single key lookup replaces listing every object in the bucket.
        if bucket.get_key(obj_key) is not None:
            return 'exists'
        try:
            bucket_key = bucket.new_key(obj_key)
            bucket_key.set_contents_from_string(obj_value)
            return 'ok'
        except Exception as e:
            return str(e)

    def modifi_object_acl(self, bucketname, obj_key, acl='public-read'):
        """Change the canned ACL of an object.

        :param bucketname: name of the bucket holding the object
        :param obj_key: key (name) of the object
        :param acl: canned ACL to apply, e.g. ``public-read`` (publicly
            readable) or ``private``
        :return: ``'ok'`` on success, a not-found message when the object
            does not exist, or the exception text on failure
        """
        con = CephS3.get_connect()
        bucket = con.get_bucket(bucketname)
        try:
            bucket_key = bucket.get_key(obj_key)
            if bucket_key is None:
                return CephS3._missing_object(bucketname, obj_key)
            bucket_key.set_canned_acl(acl)
            return 'ok'
        except Exception as e:
            return str(e)

    def make_obj_url(self, bucketname, obj_key, sign=False):
        """Build an HTTP URL for an object.

        With ``sign`` false the URL carries no authentication query string;
        with ``sign`` true it is a signed URL generated with an expiry of
        3600 seconds.

        Returns:
            The URL, a not-found message when the object does not exist, or
            the exception text on failure.
        """
        try:
            con = CephS3.get_connect()
            bucket = con.get_bucket(bucketname)
            key = bucket.get_key(obj_key)
            if key is None:
                return CephS3._missing_object(bucketname, obj_key)
            if not sign:
                return key.generate_url(0, query_auth=False, force_http=True)
            return key.generate_url(3600, query_auth=True, force_http=True)
        except Exception as e:
            return str(e)

    def download_to_file(self, bucketname, obj_key, dest_file):
        """Download object *obj_key* into the local file *dest_file*.

        Returns:
            ``'ok'`` on success, a not-found message when the object does not
            exist, or the exception text on failure.
        """
        try:
            con = CephS3.get_connect()
            bucket = con.get_bucket(bucketname)
            key = bucket.get_key(obj_key)
            if key is None:
                return CephS3._missing_object(bucketname, obj_key)
            key.get_contents_to_filename(dest_file)
            return 'ok'
        except Exception as e:
            return str(e)

    def delete_bucket_key(self, bucketname, obj_key):
        """Delete object *obj_key* from bucket *bucketname*.

        Returns:
            ``'ok'`` on success, or the exception text on failure.
        """
        try:
            con = CephS3.get_connect()
            bucket = con.get_bucket(bucketname)
            bucket.delete_key(obj_key)
            return 'ok'
        except Exception as e:
            return str(e)
if __name__ == '__main__':
    # Demo: print a signed URL for one object stored in the gateway.
    s3_api = CephS3()
    signed_url = s3_api.make_obj_url('bucket_bamboo', 'haproxy.txt', True)
    print(signed_url)

    # Further usage examples, left disabled:
    # s3_api.list_bucket_object('bucket_bamboo')
    #
    # Create a radosgw user and print its key information
    # (note: this snippet uses json, which the imports above do not provide):
    # um = Accountinfo()
    # out, err = um.user_manager('wuye','c')
    #
    # if type(out) == str:
    #     res = json.loads(out)
    #     print(json.dumps(res["keys"][0],indent=2))
    # else:
    #     print(out,err)
文章名称:Ceph 对象存储创建 API
文章位置:http://scyanting.com/article/jcssjj.html