ceph对象存储创建api

# -*- coding:utf-8 -*-

import boto
import boto.s3.connection
import paramiko

class Accountinfo():
    """
    用法详见 http://docs.ceph.org.cn/man/8/radosgw-admin/
    """
    def __init__(self):
        self.hostname = '192.168.44.70'
        self.port = 22
        self.username = 'root'
        self.passwd = '123456'

    def new_connect(self):
        try:
            paramiko.util.log_to_file('paramiko.log')
            ssh = paramiko.SSHClient()
            ssh.load_system_host_keys()
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh.connect(hostname='192.168.44.70',port=22,username='root',password='123456')
            return ssh

        except Exception as e:
            return 'error'

    def user_manager(self,username,flag):
        """
        为s3访问创建radosgw用户
        flag: c -> create,d -> delete
        """
        ssh = self.new_connect()
        if not isinstance(ssh,str):
            c_command = '/usr/bin/radosgw-admin  user create --uid="%s" --display-name="%s"' % (username, username.title())
            d_command = '/usr/bin/radosgw-admin  user rm --uid="%s"' % (username)

            if flag == 'c':
                stdin, stdout, stderr = ssh.exec_command(c_command)
                outstr = stdout.read()
                errstr = stdout.read()
                ssh.close()
                return outstr,errstr
            elif flag == 'd':
                stdin, stdout, stderr = ssh.exec_command(d_command)
                ssh.close()
                return 'delete %s success'% username
                # return stdout, stderr
            else:
                ssh.close()
                return 'flag == c or d','flag error'

        return 'connect error'

class CephS3():

    # 单例模式

    __instance = None

    def __init__(self):
        self.access_key = "BKOLF8C5319QK2UIMQ09"
        self.secret_key = "jBiFwY3LeHh78tM9W6Y8oQUM2VNIbieGVViB3wEB"
        self.host = '192.168.44.70'
        self.port = 7480
        self.conn = boto.connect_s3(
            aws_access_key_id= self.access_key,
            aws_secret_access_key= self.secret_key,
            host = self.host,
            port = self.port,
            is_secure=False,
            calling_format = boto.s3.connection.OrdinaryCallingFormat()
        )

    @staticmethod
    def get_connect():

        if CephS3.__instance:
            return CephS3.__instance
        else:
            CephS3.__instance = CephS3().conn
            return CephS3.__instance

    def list_all_buckets(self):
        con = CephS3.get_connect()
        all_buckets = con.get_all_buckets()

        for bucket in all_buckets:
            print("{name}\t{created}".format(name=bucket.name,created=bucket.creation_date))

    def create_bucket(self,bucketname):
        # con = self.connect()
        con = CephS3.get_connect()
        all_bucket = con.get_all_buckets()
        all_bucket_name = [i.name for i in all_bucket]

        try:
            if bucketname not in all_bucket_name:
                bucket = con.create_bucket(bucketname)
                return 'ok'
                # print('Bucket %s create success'% bucketname)
            else:
                return 'fail'
                # print('Bucket %s already exists!' % bucketname)
        except Exception as e:
            return str(e)

    def delete_bucket(self,bucketname):

        con = CephS3.get_connect()
        all_bucket = con.get_all_buckets()
        all_bucket_name = [i.name for i in all_bucket]

        try:
            if bucketname in all_bucket_name:
                bucket = con.delete_bucket(bucketname)
                return 'ok'

            else:
                return 'fail'

        except Exception as e:
            print(str(e))

    def list_bucket_object(self,bucketname):

        con = CephS3.get_connect()
        bucket = con.get_bucket(bucketname)
        list_buckets = bucket.list()

        print("%-10s\t%-10s\t%-10s\n"%('Name','Size','Modified'))
        for key in list_buckets:
            print("{name:<10}\t{size:<10}\t{modified:<10}".format(name=key.name,size=key.size,modified=key.last_modified))

    def create_object(self,bucketname,obj_key,obj_value):
        """新建一个内容是字符串 obj_value 的文件 obj_key"""

        con = CephS3.get_connect()
        bucket = con.get_bucket(bucketname)
        list_buckets = bucket.list()
        obj_keys = [i.name for i in list_buckets]
        try:
            if obj_key not in obj_keys:
                bucket_key = bucket.new_key(obj_key)
                bucket_key.set_contents_from_string(obj_value)
                return 'ok'
            return 'exists'
        except Exception as e:
            return str(e)

    def modifi_object_acl(self,bucketname,obj_key,acl='public-read'):
        """
        修改对象的权限
        :param bucketname:
        :param obj_key:
        :param acl: public-read 公开可读   private 私有
        :return:
        """
        # con = self.connect()
        con = CephS3.get_connect()
        bucket = con.get_bucket(bucketname)
        try:
            bucket_key = bucket.get_key(obj_key)
            bucket_key.set_canned_acl(acl)
            return 'ok'
        except Exception as e:
            return str(e)

    def make_obj_url(self,bucketname,obj_key,sign=False):
        try:

            con = CephS3.get_connect()
            bucket = con.get_bucket(bucketname)
            key = bucket.get_key(obj_key)
            if not sign:
                key_url_no_sign = key.generate_url(0,query_auth=False,force_http=True)
                return key_url_no_sign
            key_url_sign = key.generate_url(3600,query_auth=True,force_http=True)
            return key_url_sign
        except Exception as e:
            return str(e)

    def download_to_file(self,bucketname,obj_key,dest_file):
        try:
            # con = self.connect()
            con = CephS3.get_connect()
            bucket = con.get_bucket(bucketname)
            key = bucket.get_key(obj_key)
            key.get_contents_to_filename(dest_file)
            return 'ok'
        except Exception as e:
            return str(e)

    def delete_bucket_key(self,bucketname,obj_key):
        try:
            # con = self.connect()
            con = CephS3.get_connect()
            bucket = con.get_bucket(bucketname)
            bucket.delete_key(obj_key)
            return 'ok'
        except Exception as e:
            return str(e)

if __name__ == '__main__':
    cephapi = CephS3()

    url = cephapi.make_obj_url('bucket_bamboo','haproxy.txt',True)
    print(url)

    # cephapi.list_bucket_object('bucket_bamboo')

    # 创建用户 获取keys 信息

    # um = Accountinfo()
    # out, err = um.user_manager('wuye','c')
    #
    # if type(out) == str:
    #     res = json.loads(out)
    #     print(json.dumps(res["keys"][0],indent=2))
    # else:
    #     print(out,err)

文章名称:ceph对象存储创建api
文章位置:http://scyanting.com/article/jcssjj.html