返回顶部
分享到

Java Web提交任务到Spark

资讯 2016-12-29 21:14 1321人浏览 0人回复
原作者: fansy1990 收藏 分享 邀请
摘要

相关软件版本:Spark1.4.1 ,Hadoop2.6,Scala2.10.5 , MyEclipse2014,intelliJ IDEA14,JDK1.8,Tomcat7机器:windows7 (包含JDK1.8,MyEclipse2014,IntelliJ IDEA14,TOmcat7);centos6.6虚拟机(Hadoop伪分布 ...

相关软件版本:

Spark1.4.1 ,Hadoop2.6,Scala2.10.5 , MyEclipse2014,intelliJ IDEA14,JDK1.8,Tomcat7

机器:

windows7 (包含JDK1.8,MyEclipse2014,IntelliJ IDEA14,TOmcat7);

centos6.6虚拟机(Hadoop伪分布式集群,Spark standAlone集群,JDK1.8);

centos7虚拟机(Tomcat,JDK1.8);

1. 场景:


1. windows简单Java程序调用Spark,执行Scala开发的Spark程序,这里包含两种模式:

1> 提交任务到Spark集群,使用standAlone模式执行;

2> 提交任务到Yarn集群,使用yarn-client的模式;

2. windows 开发Java web程序调用Spark,执行Scala开发的Spark程序,同样包含两种模式,参考1.

3. Linux运行java web程序调用Spark,执行Scala开发的Spark程序,包含两种模式,参考1.


2. 实现:

1. 简单Scala程序,该程序的功能是读取HDFS中的log日志文件,过滤log文件中的WARN和ERROR的记录,最后把过滤后的记录写入到HDFS中,代码如下:


import org.apache.spark.{SparkConf, SparkContext}


/**
 * Created by Administrator on 2015/8/23.
 */
object Scala_Test {
  def main(args:Array[String]): Unit ={
    if(args.length!=2){
      System.err.println("Usage:Scala_Test")
    }
    // 初始化SparkConf
    val conf = new SparkConf().setAppName("Scala filter")
    val sc = new SparkContext(conf)

    //  读入数据
    val lines = sc.textFile(args(0))

    // 转换
    val errorsRDD = lines.filter(line => line.contains("ERROR"))
    val warningsRDD = lines.filter(line => line.contains("WARN"))
    val  badLinesRDD = errorsRDD.union(warningsRDD)

    // 写入数据
    badLinesRDD.saveAsTextFile(args(1))

    // 关闭SparkConf
    sc.stop()
  }
}


使用IntelliJ IDEA 并打成jar包备用(lz这里命名为spark_filter.jar);

2.  java调用spark_filter.jar中的Scala_Test 文件,并采用Spark standAlone模式,java代码如下:


package test;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.spark.deploy.SparkSubmit;
/**
 * @author fansy
 *
 */
public class SubmitScalaJobToSpark {

	public static void main(String[] args) {
		SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss"); 
		String filename = dateFormat.format(new Date());
		String tmp=Thread.currentThread().getContextClassLoader().getResource("").getPath();
		tmp =tmp.substring(0, tmp.length()-8);
		String[] arg0=new String[]{
				"--master","spark://node101:7077",
				"--deploy-mode","client",
				"--name","test java submit job to spark",
				"--class","Scala_Test",
				"--executor-memory","1G",
//				"spark_filter.jar",
				tmp+"lib/spark_filter.jar",//
				"hdfs://node101:8020/user/root/log.txt",
				"hdfs://node101:8020/user/root/badLines_spark_"+filename
		};
		
		SparkSubmit.main(arg0);
	}
}



具体操作,使用MyEclipse新建java web工程,把spark_filter.jar 以及spark-assembly-1.4.1-hadoop2.6.0.jar(该文件在Spark压缩文件的lib目录中,同时该文件较大,拷贝需要一定时间) 拷贝到WebRoot/WEB-INF/lib目录。(注意:这里可以直接建立java web项目,在测试java调用时,直接运行java代码即可,在测试web项目时,开启tomcat即可)

java调用spark_filter.jar中的Scala_Test 文件,并采用Yarn模式。采用Yarn模式,不能使用简单的修改master为“yarn-client”或“yarn-cluster”,在使用Spark-shell或者spark-submit的时候,使用这个,同时配置HADOOP_CONF_DIR路径是可以的,但是在这里,读取不到HADOOP的配置,所以这里采用其他方式,使用yarn.Clent提交的方式,java代码如下:


package test;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.deploy.yarn.Client;
import org.apache.spark.deploy.yarn.ClientArguments;

public class SubmitScalaJobToYarn {

	public static void main(String[] args) {
		SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss"); 
		String filename = dateFormat.format(new Date());
		String tmp=Thread.currentThread().getContextClassLoader().getResource("").getPath();
		tmp =tmp.substring(0, tmp.length()-8);
		String[] arg0=new String[]{
				"--name","test java submit job to yarn",
				"--class","Scala_Test",
				"--executor-memory","1G",
//				"WebRoot/WEB-INF/lib/spark_filter.jar",//
				"--jar",tmp+"lib/spark_filter.jar",//
				
				"--arg","hdfs://node101:8020/user/root/log.txt",
				"--arg","hdfs://node101:8020/user/root/badLines_yarn_"+filename,
				"--addJars","hdfs://node101:8020/user/root/servlet-api.jar",//
				"--archives","hdfs://node101:8020/user/root/servlet-api.jar"//
		};
		
//		SparkSubmit.main(arg0);
		Configuration conf = new Configuration();
		String os = System.getProperty("os.name");
		boolean cross_platform =false;
		if(os.contains("Windows")){
			cross_platform = true;
		}
		conf.setBoolean("mapreduce.app-submission.cross-platform", cross_platform);// 配置使用跨平台提交任务
		conf.set("fs.defaultFS", "hdfs://node101:8020");// 指定namenode
		conf.set("mapreduce.framework.name","yarn"); // 指定使用yarn框架
		conf.set("yarn.resourcemanager.address","node101:8032"); // 指定resourcemanager
		conf.set("yarn.resourcemanager.scheduler.address", "node101:8030");// 指定资源分配器
		conf.set("mapreduce.jobhistory.address","node101:10020");
		
		 System.setProperty("SPARK_YARN_MODE", "true");

		 SparkConf sparkConf = new SparkConf();
		 ClientArguments cArgs = new ClientArguments(arg0, sparkConf);
		
		new Client(cArgs,conf,sparkConf).run();
	}
}


3. java web测试 任务提交到Spark的两种模式,这里采用最简单的方式,直接配置servlet,其web.xml文件如下:

This is the description of my J2EE componentThis is the display name of my J2EE componentSparkServletservlet.SparkServletThis is the description of my J2EE componentThis is the display name of my J2EE componentYarnServletservlet.YarnServletSparkServlet/servlet/SparkServletYarnServlet/servlet/YarnServlet
SparkServlet如下:


package servlet;

import java.io.IOException;
import java.io.PrintWriter;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import test.SubmitScalaJobToSpark;

public class SparkServlet extends HttpServlet {

	/**
	 * Constructor of the object.
	 */
	public SparkServlet() {
		super();
	}

	/**
	 * Destruction of the servlet. 

	 */
	public void destroy() {
		super.destroy(); // Just puts "destroy" string in log
		// Put your code here
	}

	/**
	 * The doGet method of the servlet. 

	 *
	 * This method is called when a form has its tag value method equals to get.
	 * 
	 * @param request the request send by the client to the server
	 * @param response the response send by the server to the client
	 * @throws ServletException if an error occurred
	 * @throws IOException if an error occurred
	 */
	public void doGet(HttpServletRequest request, HttpServletResponse response)
			throws ServletException, IOException {

		this.doPost(request, response);
	}

	/**
	 * The doPost method of the servlet. 

	 *
	 * This method is called when a form has its tag value method equals to post.
	 * 
	 * @param request the request send by the client to the server
	 * @param response the response send by the server to the client
	 * @throws ServletException if an error occurred
	 * @throws IOException if an error occurred
	 */
	public void doPost(HttpServletRequest request, HttpServletResponse response)
			throws ServletException, IOException {
		System.out.println("开始SubmitScalaJobToSpark调用......");
		SubmitScalaJobToSpark.main(null);
		//YarnServlet也只是这里不同
		System.out.println("完成SubmitScalaJobToSpark调用!");
		response.setContentType("text/html");
		PrintWriter out = response.getWriter();
		out.println("");
		out.println("");
		out.println("A Servlet");
		out.println("");
		out.print("    This is ");
		out.print(this.getClass());
		out.println(", using the POST method");
		out.println("");
		out.println("");
		out.flush();
		out.close();
	}

	/**
	 * Initialization of the servlet. 

	 *
	 * @throws ServletException if an error occurs
	 */
	public void init() throws ServletException {
		// Put your code here
	}

}
这里只是调用了java编写的任务调用类而已。同时,SparServlet和YarnServlet也只是在调用的地方不同而已。

在web测试时,首先直接在MyEclipse上测试,然后拷贝工程WebRoot到centos7,再次运行tomcat,进行测试。

3. 总结及问题

1. 测试结果:

1> java代码直接提交任务到Spark和Yarn,进行日志文件的过滤,测试是成功运行的。可以在Yarn和Spark的监控中看到相关信息:

同时,在HDFS可以看到输出的文件:

2> java web 提交任务到Spark和Yarn,首先需要把spark-assembly-1.4.1-hadoop2.6.0.jar中的javax.servlet文件夹删掉,因为会和tomcat的servlet-api.jar冲突。

a. 在windows和linux上启动tomcat,提交任务到Spark standAlone,测试成功运行;

b. 在windows和linux上启动tomcat,提交任务到Yarn,测试失败;

2. 遇到的问题:

1> java web 提交任务到Yarn,会失败,失败的主要日志如下:

15/08/25 11:35:48 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
这个是因为javax.servlet的包被删掉了,和tomcat的冲突。

同时,在日志中还可以看到:

15/08/26 12:39:27 INFO Client: Setting up container launch context for our AM
15/08/26 12:39:27 INFO Client: Preparing resources for our AM container
15/08/26 12:39:27 INFO Client: Uploading resource file:/D:/workspase_scala/SparkWebTest/WebRoot/WEB-INF/lib/spark-assembly-1.4.1-hadoop2.6.0.jar -> hdfs://node101:8020/user/Administrator/.sparkStaging/application_1440464833795_0012/spark-assembly-1.4.1-hadoop2.6.0.jar
15/08/26 12:39:32 INFO Client: Uploading resource file:/D:/workspase_scala/SparkWebTest/WebRoot/WEB-INF/lib/spark_filter.jar -> hdfs://node101:8020/user/Administrator/.sparkStaging/application_1440464833795_0012/spark_filter.jar
15/08/26 12:39:33 INFO Client: Uploading resource file:/C:/Users/Administrator/AppData/Local/Temp/spark-46820caf-06e0-4c51-a479-3bb35666573f/__hadoop_conf__5465819424276830228.zip -> hdfs://node101:8020/user/Administrator/.sparkStaging/application_1440464833795_0012/__hadoop_conf__5465819424276830228.zip
15/08/26 12:39:33 INFO Client: Source and destination file systems are the same. Not copying hdfs://node101:8020/user/root/servlet-api.jar
15/08/26 12:39:33 WARN Client: Resource hdfs://node101:8020/user/root/servlet-api.jar added multiple times to distributed cache.
这里在环境初始化的时候,上传了两个jar,一个就是spark-assembly-1.4.1-hadoop2.6.0.jar 还有一个就是我们自定义的jar。上传的spark-assembly-1.4.1-hadoop2.6.0.jar 里面没有javax.servlet的文件夹,所以会报错。在java中直接调用(没有删除javax.servlet的时候)同样会看到这样的日志,同样的上传,那时是可以的,也就是说这里确实是删除了包文件夹的关系。那么如何修复呢?

上传servlet-api到hdfs,同时在使用yarn.Client提交任务的时候,添加相关的参数,这里查看参数,发现两个比较相关的参数,--addJars以及--archive 参数,把这两个参数都添加后,看到日志中确实把这个jar包作为了job的共享文件,但是java web提交任务到yarn 还是报这个类找不到的错误。所以这个办法也是行不通!(可以参考http://blog.csdn.NET/fansy1990/article/details/52289826中的部署部分解决这个问题)

使用yarn.Client提交任务到Yarn参考http://blog.sequenceiq.com/blog/2014/08/22/spark-submit-in-java/ 。

推广广告
星点云香港服务器,CN2高速连接,ping值低可免费换IP,安全稳定,技术团队24小时在线稳定无忧
本文暂无评论,快来抢沙发!

热门问答
云萌主 云萌主-BIGSAAS旗下,由北京合智互联信息技术有限公司在2018年创立,为广大云应用技术爱好者的平台。在云萌主论坛可以查看云应用技术文章、云产品产品最新资讯、技术问答、技术视频。在畅游云上技术的同时,学到最新的云应用产品和技术。
  • 微信公众号

  • Powered by Discuz! X3.5 | Licensed | Copyright © 2001-2022, Aliyun Cloud. | 星点互联设计
  • 京ICP备18052714号 | 营业执照 | |合智互联| QQ