HiveMetastore客户端自动重连机制的示例分析
小编给大家分享一下Hive Metastore客户端自动重连机制的示例分析,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!
成都创新互联是一家集网站建设,昌邑企业网站建设,昌邑品牌网站建设,网站定制,昌邑网站建设报价,网络营销,网络优化,昌邑网站推广为一体的创新建站企业,帮助传统企业提升企业形象加强企业竞争力。可充分满足这一群体相比中小企业更为丰富、高端、多元的互联网需求。同时我们时刻保持专业、时尚、前沿,时刻以成就客户成长自我,坚持不断学习、思考、沉淀、净化自己,让我们为更多的企业打造出实用型网站。
前言
本文基于Hive2.1.0的Apache社区版,目的是为了探究Metastore和底层RDBMS和底层服务变更(例如版本升级、服务迁移等运维操作)对客户端和用户的影响。Hive提供了在客户端对Metastore连接超时自动重连的容错机制,允许我们通过调整参数配置调整停服时间限制,在规定时间内重启服务对用户无显著影响。由于Metastore底层RDBMS我们采用的是业内通用的MySQL,因此后面以Mysql来替代RDBMS进行描述和验证
相关参数
参数 | 默认值 | 说明 | 配置范围 |
---|---|---|---|
hive.metastore.connect.retries | 3 | 客户端建立与metastore连接时的重试次数 | Metastore客户端,如CLI、Hiveserver2等 |
hive.metastore.failure.retries | 1 | 客户端访问metastore的失败重试次数 | Metastore客户端,如CLI、Hiveserver2等 |
hive.metastore.client.connect.retry.delay | 1s | Metastore客户端重连/重试等待的时间 | Metastore客户端,如CLI、Hiveserver2等 |
hive.metastore.client.socket.timeout | 600s | Metastore客户端socket超时时间,传递给底层Socket,超时之后底层Socket会自动断开 | Metastore客户端,如CLI、Hiveserver2等 |
hive.metastore.client.socket.lifetime | 0 | socket存活时间,超时之后客户端在下一次访问Metastore时会主动断开现有连接并重新建立连接,0表示不主动断开 | Metastore客户端,如CLI、Hiveserver2等 |
hive.hmshandler.retry.attempts | 10 | 在JDO数据存储出现错误后尝试连接的次数 | Metastore |
hive.hmshandler.retry.interval | 2000ms | JDO连接尝试间隔,单位:ms | Metastore |
hive.server2.thrift.client.connect.retry.limit | 1 | 客户端建立与Hiveserver2连接的重试次数 | Hiveserver2的客户端,如Beeline等 |
hive.server2.thrift.client.retry.limit | 1 | 客户端访问Hiveserver2的失败重试次数 | Hiveserver2的客户端,如Beeline等 |
hive.server2.thrift.client.retry.delay.seconds | 1s | Hiveserver2客户端重连/重试等待的时间 | Hiveserver2的客户端,如Beeline等 |
hive.metastore.connect.retries 和 hive.metastore.failure.retries的区别
为了弄清这两个参数的区别,让我们通过源码来确认一下,ps:为了方便阅读后面会用......省略掉无关的代码逻辑
1. Hive与Metastore交互
CLI和Hiveserver2都是通过org.apache.hadoop.hive.ql.metadata.Hive类与Metastore的交互的。首先让我们以createDatabase(Database, boolean)方法为例来看看具体的交互过程
/** * Create a database * @param db * @param ifNotExist if true, will ignore AlreadyExistsException exception * @throws AlreadyExistsException * @throws HiveException */ public void createDatabase(Database db, boolean ifNotExist) throws AlreadyExistsException, HiveException { try { getMSC().createDatabase(db); } catch (AlreadyExistsException e) { if (!ifNotExist) { throw e; } } catch (Exception e) { throw new HiveException(e); } } /** * @return the metastore client for the current thread * @throws MetaException */ @LimitedPrivate(value = {"Hive"}) @Unstable public synchronized IMetaStoreClient getMSC( boolean allowEmbedded, boolean forceCreate) throws MetaException { if (metaStoreClient == null || forceCreate) { ...... try { metaStoreClient = createMetaStoreClient(allowEmbedded); } catch (RuntimeException ex) { ...... } ...... } return metaStoreClient; }
Hive类维护了一个IMetaStoreClient对象,通过getMSC()方法获取,getMSC()方法在这里采用了懒汉模式去创建,接下来看下Hive是如何创建一个IMetaStoreClient对象的
2. 创建一个IMetaStoreClient对象
// org.apache.hadoop.hive.ql.metadata.Hive.java private IMetaStoreClient createMetaStoreClient(boolean allowEmbedded) throws MetaException { ...... if (conf.getBoolVar(ConfVars.METASTORE_FASTPATH)) { return new SessionHiveMetaStoreClient(conf, hookLoader, allowEmbedded); } else { return RetryingMetaStoreClient.getProxy(conf, hookLoader, metaCallTimeMap, SessionHiveMetaStoreClient.class.getName(), allowEmbedded); } }
if后面的分支用于创建客户端内置的本地Metastore,这主要用于开发调试阶段,因此我们只关注else后面的逻辑,即通过RetryingMetaStoreClient.getProxy方法创建一个IMetaStoreClient对象。RetryingMetaStoreClient.getProxy方法通过几次简单地调用重载函数,最终来到下面的方法
// org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.java public static IMetaStoreClient getProxy(HiveConf hiveConf, Class>[] constructorArgTypes, Object[] constructorArgs, ConcurrentHashMapmetaCallTimeMap, String mscClassName) throws MetaException { @SuppressWarnings("unchecked") Class extends IMetaStoreClient> baseClass = (Class extends IMetaStoreClient>)MetaStoreUtils.getClass(mscClassName); RetryingMetaStoreClient handler = new RetryingMetaStoreClient(hiveConf, constructorArgTypes, constructorArgs, metaCallTimeMap, baseClass); return (IMetaStoreClient) Proxy.newProxyInstance( RetryingMetaStoreClient.class.getClassLoader(), baseClass.getInterfaces(), handler); }
可以看到,这里利用Java代理机制创建并返回了一个IMetaStoreClient的代理——RetryingMetaStoreClient,此后对IMetaStoreClient对象的调用都委托给RetryingMetaStoreClient.invoke 处理,接下来让我们看下RetryingMetaStoreClient.invoke方法是如何处理用户对IMetastoreClient对象的操作的
3. 每次调用IMetaStoreClient对象访问Metastore时的底层实现逻辑
// org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.java public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Object ret = null; int retriesMade = 0; TException caughtException = null; while (true) { try { reloginExpiringKeytabUser(); // 1. 检查是否重连,重连的场景包括: // a) 上一次循环访问Metastore异常,且异常类型支持自动重试访问 // b) 底层socket超时,超时参数:hive.metastore.client.socket.lifetime if (retriesMade > 0 || hasConnectionLifeTimeReached(method)) { base.reconnect(); lastConnectionTime = System.currentTimeMillis(); } if (metaCallTimeMap == null) { ret = method.invoke(base, args); } else { // need to capture the timing long startTime = System.currentTimeMillis(); ret = method.invoke(base, args); long timeTaken = System.currentTimeMillis() - startTime; addMethodTime(method, timeTaken); } // 2. 访问Metastore正常,返回结果给上层调用并结束循环,用户不主动结束的情况下底层与Metastore的连接持续保持着 break; // 3. 处理访问Metastore过程中出现的异常,异常主要分三类: // a) 用户操作异常或元数据异常,将异常抛给用户处理并结束循环 // b) 底层连接异常,例如网络问题、Metastore服务异常(停服、连接超限等)等支持自动重连,进入步骤4 // c) 其他未知异常,抛给用户处理并结束循环 } catch (UndeclaredThrowableException e) { throw e.getCause(); } catch (InvocationTargetException e) { Throwable t = e.getCause(); if (t instanceof TApplicationException) { TApplicationException tae = (TApplicationException)t; switch (tae.getType()) { case TApplicationException.UNSUPPORTED_CLIENT_TYPE: case TApplicationException.UNKNOWN_METHOD: case TApplicationException.WRONG_METHOD_NAME: case TApplicationException.INVALID_PROTOCOL: throw t; default: caughtException = tae; } } else if ((t instanceof TProtocolException) || (t instanceof TTransportException)) { caughtException = (TException)t; } else if ((t instanceof MetaException) && t.getMessage().matches( "(?s).*(JDO[a-zA-Z]*|TProtocol|TTransport)Exception.*") && !t.getMessage().contains("java.sql.SQLIntegrityConstraintViolationException")) { caughtException = (MetaException)t; } else { throw t; } } catch (MetaException e) { if (e.getMessage().matches("(?s).*(IO|TTransport)Exception.*") && !e.getMessage().contains("java.sql.SQLIntegrityConstraintViolationException")) { caughtException = e; } else { throw e; } } // 4. 对于支持自动重试的异常,会记录重试次数并验证次数是否超限,是则返回异常并结束循环,否则将以warn形式输出异常日志提醒并等等一段时间后开始下一次循环自动重试访问Metastore。这里用到的重试次数参数和等待时间参数分别是 hive.metastore.failure.retries,hive.metastore.client.connect.retry.delay if (retriesMade >= retryLimit) { throw caughtException; } retriesMade++; Thread.sleep(retryDelaySeconds * 1000); } return ret; } protected RetryingMetaStoreClient(HiveConf hiveConf, Class>[] constructorArgTypes, Object[] constructorArgs, ConcurrentHashMapmetaCallTimeMap, Class extends IMetaStoreClient> msClientClass) throws MetaException { this.retryLimit = hiveConf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES); this.retryDelaySeconds = hiveConf.getTimeVar( HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS); this.metaCallTimeMap = metaCallTimeMap; this.connectionLifeTimeInMillis = hiveConf.getTimeVar( HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME, TimeUnit.MILLISECONDS); ...... this.base = (IMetaStoreClient) MetaStoreUtils.newInstance( msClientClass, constructorArgTypes, constructorArgs); }
从 RetryingMetaStoreClient 的构造函数中可以发现,RetryingMetaStoreClient 维护了一个 HiveMetaStoreClient 对象,用户在上层调用一次 RetryingMetaStoreClient 对象操作,例如第一步的 createDatabase 方法,会经过 RetryingMetaStoreClient.invoke 的封装最终调用HiveMetaStoreClient类中的同名方法进行操作。在 RetryingMetaStoreClient.invoke 中封装了自动重试的逻辑,在底层与Metastore的连接过程中出现异常的情况下会自动重试而不影响上层用户的操作。
这里我们在注释中标注了 invoke 方法中主要的操作步骤,可以看到,重试次数由参数hive.metastore.failure.retries控制,两次重试之间的等待时间由hive.metastore.client.connect.retry.delay控制。
注意,这里我们说的是“重试”,而不是“重连”,一次重试中与Metastore的交互有两步:1. 建立与Metastore的会话 2. 执行用户请求。我们继续看下客户端是怎么建立与Metastore的会话的
4. Metastore重连
// org.apache.hadoop.hive.metastore.HiveMetaStoreClient.java @Override public void reconnect() throws MetaException { ...... close(); // 当配置了多个Metastore时,会随机调整Metastore顺序 promoteRandomMetaStoreURI(); open(); } private void open() throws MetaException { isConnected = false; ...... // hive.metastore.client.socket.timeout int clientSocketTimeout = (int) conf.getTimeVar( ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS); for (int attempt = 0; !isConnected && attempt < retries; ++attempt) { for (URI store : metastoreUris) { try { transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout); ...... try { transport.open(); isConnected = true; } catch (TTransportException e) { ...... } ...... } catch (MetaException e) { ...... } if (isConnected) { break; } } // Wait before launching the next round of connection retries. if (!isConnected && retryDelaySeconds > 0) { try { Thread.sleep(retryDelaySeconds * 1000); } catch (InterruptedException ignore) {} } } if (!isConnected) { throw new MetaException("Could not connect to meta store using any of the URIs provided." + " Most recent failure: " + StringUtils.stringifyException(tte)); } ...... } public HiveMetaStoreClient(HiveConf conf, HiveMetaHookLoader hookLoader, Boolean allowEmbedded) throws MetaException { ...... // hive.metastore.connect.retries retries = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES); // hive.metastore.client.connect.retry.delay retryDelaySeconds = conf.getTimeVar( ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS); ...... // 初始化一个HiveMetaStoreClient对象时会尝试建立与Metastore的长会话 open(); }
同上一步的重试逻辑类似,与Metastore的连接支持自动重连,由 hive.metastore.connect.retries控制重连次数,hive.metastore.client.connect.retry.delay控制重连等待时间,底层利用Thrift提供的RPC通信服务。
如果配置了多个Metastore地址,每一次重连的时候会按顺序遍历所有的Metastore并尝试与之建立会话,直到有一个会话建立成功为止。
此外,初始化一个HiveMetaStoreClient对象时会调用open()方法尝试建立一个与Metastore的长会话,供后面的用户请求使用
总结
HiveMetaStoreClient.open() 方法建立一个与Metastore的会话,该方法中会在连接失败的情况下自动重连,重连次数、重连等待时间分别由参数 hive.metastore.connect.retries、 hive.metastore.client.connect.retry.delay控制。且每次重连时会遍历用户配置的所有的Metastore直到成功建立一个会话
用户新建一个Metastore客户端(例如启动一个CLI、Hiveserver2进程)时,会初始化并维护一个IMetaStoreClient对象,在初始化时调用 *HiveMetaStoreClient.open()*方法建立一个与Metastore的长会话
用户每次调用IMetaStoreClient中的方法进行业务操作,实际上委托给 RetryingMetaStoreClient.invoke 方法操作,在遇到与Metastore连接等异常时会进行自动重试,重试次数、重试等待时间分别由参数 hive.metastore.failure.retries、 hive.metastore.client.connect.retry.delay控制
RetryingMetaStoreClient.invoke 中每次重试会尝试调用 HiveMetaStoreClient.reconnect() 方法重连Metastore,HiveMetaStoreClient.reconnect() 方法内会调用 HiveMetaStoreClient.open() 去连接Metastore。因此,invoke方法实际上在重试循环中嵌套了循环重连Metastore的操作
所以 hive.metastore.failure.retries参数实际上仅用于在已经建立了Metastore的会话的基础上进行正常的业务访问过程中遇到连接异常等问题时的重试次数限制,而 hive.metastore.connect.retries则是更底层自动重连Metastore的次数限制
此外,hive.server2.thrift.client.connect.retry.limit 同 hive.server2.thrift.client.retry.limit 的区别也与hive.metastore.connect.retries 和 hive.metastore.failure.retries的区别类似,这里就不再赘述,有兴趣的同学可以参照本篇文档去研究下源码
看完了这篇文章,相信你对“Hive Metastore客户端自动重连机制的示例分析”有了一定的了解,如果想了解更多相关知识,欢迎关注创新互联行业资讯频道,感谢各位的阅读!
分享题目:HiveMetastore客户端自动重连机制的示例分析
文章转载:http://scyanting.com/article/gdegdj.html