Scenario

A Hive table was created from the Flink SQL Client, and a job reading Kafka data and writing it into Hive partitions failed with the following error:

Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/JobConf
	at java.lang.Class.getDeclaredFields0(Native Method)
	at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
	at java.lang.Class.getDeclaredField(Class.java:2068)
	at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1872)
	at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:79)
	at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:506)
	at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
	at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
	at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2001)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1848)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2158)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:501)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:459)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:154)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:548)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapred.JobConf
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

Investigation

The error was raised on the JobManager, so the initial guess was that it happened while the job inserting into Hive was being created.

The logs showed that the classpath printed by org.apache.flink.runtime.util.EnvironmentInformation did not include the mapred-related jars that hadoop classpath lists.

Attaching arthas to the JobManager process confirmed this: neither sysenv nor sysprop contained share/hadoop/mapreduce/*.jar.

The JobManager launch command shown by ps carried no -cp/-classpath option either, so the classpath had to come from somewhere else.
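A sketch of these three checks, run on the JobManager host; the log file name, the arthas invocation, and the placeholder PID are assumptions to adapt to your deployment:

# 1. The startup banner logged by EnvironmentInformation contains the full
#    classpath; no share/hadoop/mapreduce jar appears in it
grep 'Classpath' jobmanager.log

# 2. Attach arthas to the JobManager JVM, then run sysenv and
#    "sysprop java.class.path" at the arthas prompt
java -jar arthas-boot.jar <jobmanager-pid>

# 3. Confirm the launch command itself carries no -cp / -classpath option
ps -ef | grep <jobmanager-pid>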

The JobManager process

The JobManager locates its configuration and shipped files via the PWD environment variable, which arthas' sysenv displays:

PWD /tmp/hadoop-admin/nm-local-dir/usercache/admin/appcache/application_1630478685137_0012/container_1630478685137_0012_01_000001

Working directory and files of a Flink-on-YARN application
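That directory can be inspected directly on the NodeManager host; the application and container IDs below are from this particular run:

cd /tmp/hadoop-admin/nm-local-dir/usercache/admin/appcache/application_1630478685137_0012/container_1630478685137_0012_01_000001
ls
# launch_container.sh, flink-conf.yaml, flink-dist_2.11-1.13.1.jar, lib/, ...
cat launch_container.sh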

launch_container.sh contains the line below. The generated CLASSPATH only includes jars under HADOOP_COMMON_HOME, HADOOP_HDFS_HOME and HADOOP_YARN_HOME, while the jar providing org.apache.hadoop.mapred.JobConf lives under share/hadoop/mapreduce/, so it never makes it onto the list and the class cannot be loaded:

export CLASSPATH=":lib/antlr-runtime-3.5.2.jar:lib/flink-connector-hive_2.11-1.13.1.jar:lib/flink-csv-1.13.1.jar:lib/flink-json-1.13.1.jar:lib/flink-shaded-zookeeper-3.4.14.jar:lib/flink-table-blink_2.11-1.13.1.jar:lib/flink-table_2.11-1.13.1.jar:lib/hive-exec-1.2.2.jar:lib/hive-metastore-1.2.2.jar:lib/libfb303-0.9.2.jar:lib/log4j-1.2-api-2.12.1.jar:lib/log4j-api-2.12.1.jar:lib/log4j-core-2.12.1.jar:lib/log4j-slf4j-impl-2.12.1.jar:flink-dist_2.11-1.13.1.jar:flink-conf.yaml::$HADOOP_CONF_DIR:$HADOOP_COMMON_HOME/share/hadoop/common/*:$HADOOP_COMMON_HOME/share/hadoop/common/lib/*:$HADOOP_HDFS_HOME/share/hadoop/hdfs/*:$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*:$HADOOP_YARN_HOME/share/hadoop/yarn/*:$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*"
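To pin down which jar is actually missing, the mapreduce directory can be scanned for the class; a sketch, where /opt/hadoop matches the installation paths used in the fix below:

for j in /opt/hadoop/share/hadoop/mapreduce/*.jar; do
  if unzip -l "$j" | grep -q 'org/apache/hadoop/mapred/JobConf.class'; then
    echo "$j"
  fi
done
# typically prints hadoop-mapreduce-client-core-<version>.jar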

Solution

1. Get the required jars onto the generated CLASSPATH by configuring yarn.application.classpath.

2. Check whether dropping the jar into Flink's lib directory works (the lib/ entries in the CLASSPATH above are exactly the jars shipped from Flink's lib).

1. Modify the YARN configuration yarn-site.xml

<property>
    <name>yarn.application.classpath</name>
    <value>
        /opt/hadoop/etc/hadoop,
        /opt/hadoop/share/hadoop/common/*,
        /opt/hadoop/share/hadoop/common/lib/*,
        /opt/hadoop/share/hadoop/hdfs/*,
        /opt/hadoop/share/hadoop/hdfs/lib/*,
        /opt/hadoop/share/hadoop/mapreduce/*,
        /opt/hadoop/share/hadoop/mapreduce/lib/*,
        /opt/hadoop/share/hadoop/yarn/*,
        /opt/hadoop/share/hadoop/yarn/lib/*
    </value>
</property>
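After resubmitting the application, the effect can be verified in the newly generated launch script; a sketch, assuming the same NodeManager local-dir layout as above and that the container directory has not yet been cleaned up:

grep 'share/hadoop/mapreduce' \
  /tmp/hadoop-admin/nm-local-dir/usercache/admin/appcache/application_*/container_*/launch_container.sh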
How org.apache.flink.yarn.YarnClusterDescriptor (Flink 1.13.1) builds the AM classpath

It delegates to org.apache.flink.yarn.Utils, which reads yarn.application.classpath and falls back to the default value when the key is unset.

That default is hard-coded in Hadoop itself, in org.apache.hadoop.yarn.conf.YarnConfiguration, and out of the box it covers only the common/hdfs/yarn jars.
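For reference, the compiled-in default (YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH) expands to exactly the trailing entries seen in launch_container.sh above:

$HADOOP_CONF_DIR
$HADOOP_COMMON_HOME/share/hadoop/common/*
$HADOOP_COMMON_HOME/share/hadoop/common/lib/*
$HADOOP_HDFS_HOME/share/hadoop/hdfs/*
$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*
$HADOOP_YARN_HOME/share/hadoop/yarn/*
$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*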

2. Put the mapreduce jar into Flink's lib directory (untested; other exceptions may surface)
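A sketch of that untested alternative, assuming JobConf ships in hadoop-mapreduce-client-core and that FLINK_HOME points at the client-side Flink installation:

# every jar in $FLINK_HOME/lib is shipped to the containers and shows up as a
# lib/ entry in launch_container.sh, so the class would ride along with Flink
cp /opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-*.jar "$FLINK_HOME/lib/"
# then restart the YARN session / resubmit the job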