1. 概述
在当今数据驱动的商业环境中,企业需要从各种来源收集和分析数据以支持决策。网页爬取作为一种数据收集手段,能够从网页中提取有价值的信息。Kettle作为一种强大的ETL工具,结合Jsoup库,可以实现高效、灵活的网页爬取功能。通过开发一个Kettle组件,用户可以将网页数据抽取任务集成到Kettle的工作流中,实现自动化处理。
2. 组件实现
2.1 组件设计
在Kettle中开发Jsoup组件,需要继承StepInterface
和StepDataInterface
接口,并实现相关的抽象方法。组件的主要类包括:
-
JsoupClientMeta
:负责存储组件的元数据,如配置参数、输入字段和输出字段等。 -
JsoupClientData
:用于存储组件运行时的数据,如文件列表、解析的HTML文档、抽取结果等。 -
JsoupClient
:实现组件的核心逻辑,包括数据抽取、处理和输出。
2.2 初始化组件
在组件初始化阶段,init
方法会被调用。该方法负责加载配置参数,初始化文件列表,并创建输出行元数据。通过解析字段路径,将用户定义的XPath或CSS选择器转换为可执行的抽取路径。
public boolean init(StepMetaInterface smi, StepDataInterface sdi) {meta = (JsoupClientMeta) smi; // 转换元数据接口data = (JsoupClientData) sdi; // 转换数据接口if (super.init(smi, sdi)) { // 初始化父类data.rownr = 1L; // 初始化行号data.nrInputFields = meta.getInputFields().length; // 初始化输入字段数量// 解析字段路径for (int i = 0; i < data.nrInputFields; i++) {JsoupClientDataField xmlDataField = meta.getInputFields()[i];String XPathValue = environmentSubstitute(xmlDataField.getXPath());if (xmlDataField.getElementType() == JsoupClientDataField.ELEMENT_TYPE_ATTRIBUT) { // 处理属性if (!XPathValue.startsWith(JsoupClientMeta.AT)) {XPathValue = JsoupClientMeta.AT + XPathValue;}}xmlDataField.setResolvedXPath(XPathValue); // 设置解析后的XPath}data.PathValue = environmentSubstitute(meta.getLoopXPath()); // 设置循环路径if (Utils.isEmpty(data.PathValue)) { // 检查路径是否为空logError(BaseMessages.getString(PKG, "JsoupData.Error.EmptyPath"));return false;}data.prunePath = environmentSubstitute(meta.getPrunePath()); // 设置剪枝路径if (data.prunePath != null) {if (Utils.isEmpty(data.prunePath.trim())) {data.prunePath = null;} else {if (!data.prunePath.startsWith(JsoupClientMeta.N0DE_SEPARATOR)) {data.prunePath = JsoupClientMeta.N0DE_SEPARATOR + data.prunePath;}}}return true;}return false;
}
2.3 处理每一行数据
在数据处理阶段,processRow
方法会被调用。该方法负责从输入流中获取数据,并使用Jsoup解析HTML文档。根据用户定义的抽取路径,从DOM中提取所需的数据。
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {meta = (JsoupClientMeta) smi;data = (JsoupClientData) sdi;if (first) { // 如果是第一次处理first = false;data.files = meta.getFiles(this); // 初始化文件列表if (!meta.isdoNotFailIfNoFile() && data.files.nrOfFiles() == 0) {throw new KettleException(BaseMessages.getString(PKG, "JsoupData.Log.NoFiles"));}handleMissingFiles(); // 处理缺失文件data.outputRowMeta = new RowMeta(); // 创建输出行元数据meta.getFields(data.outputRowMeta, getStepname(), null, null, this, repository, metaStore); // 生成字段}// 获取HTML行数据Object[] r = getHTMLRow();if (r == null) { // 没有数据或发生错误,结束处理setOutputDone();return false;}putRowOut(r); // 输出数据return true;
}
2.4 获取HTML行数据
getHTMLRow
方法负责从文件列表中读取文件,并使用Jsoup解析HTML文档。根据抽取路径提取数据,并返回处理后的行数据。
private Object[] getHTMLRow() throws KettleException {if (!meta.isInFields()) { // 如果不是从字段中读取if (!openNextFile()) { // 打开下一个文件return null;}}return getHTMLRowPutRowWithErrorhandling(); // 获取数据并处理错误
}private Object[] getHTMLRowPutRowWithErrorhandling() throws KettleException {Object[] r = null;try {if (meta.isInFields()) { // 如果是从字段中读取if (!ReadNextString()) { // 读取下一行数据return null;}if (data.readrow == null) { // 没有数据return null;}}// 处理数据r = processPutRow(data.document);} catch (Exception e) {throw new KettleException(BaseMessages.getString(PKG, "JsoupData.Error.UnableReadFile"), e);}return r;
}
2.5 数据抽取与处理
processPutRow
方法负责从解析的HTML文档中提取数据,并进行清洗和转换。
private Object[] processPutRow(Document doc) throws KettleException {Object[] outputRowData = buildEmptyRow(); // 创建空行数据for (int i = 0; i < data.nrInputFields; i++) {JsoupClientDataField xmlDataField = meta.getInputFields()[i];String XPathValue = xmlDataField.getResolvedXPath();// 根据抽取路径获取数据String nodevalue = null;Elements elements = null;Element ele = null;switch (xmlDataField.getQueryType()) {case JsoupClientDataField.QUERY_TYPE_TAG:elements = doc.getElementsByTag(XPathValue);ele = elements.first();break;case JsoupClientDataField.QUERY_TYPE_SELECT:elements = doc.select(XPathValue);ele = elements.first();break;case JsoupClientDataField.QUERY_TYPE_ID:ele = doc.getElementById(XPathValue);break;case JsoupClientDataField.QUERY_TYPE_CLASS:elements = doc.getElementsByClass(XPathValue);ele = elements.first();break;case JsoupClientDataField.QUERY_TYPE_ATTRIBUTEVALUE:String[] attrs = XPathValue.split(",");elements = doc.getElementsByAttributeValue(attrs[0], attrs[1]);ele = elements.first();break;default:elements = doc.getElementsByTag(XPathValue);ele = elements.first();break;}if (ele == null) { // 没有找到节点continue;}// 获取节点值if (xmlDataField.getResultType() == JsoupClientDataField.RESULT_TYPE_VALUE_OF) {nodevalue = ele.text();} else {nodevalue = ele.html();}// 数据清洗switch (xmlDataField.getTrimType()) {case JsoupClientDataField.TYPE_TRIM_LEFT:nodevalue = Const.ltrim(nodevalue);break;case JsoupClientDataField.TYPE_TRIM_RIGHT:nodevalue = Const.rtrim(nodevalue);break;case JsoupClientDataField.TYPE_TRIM_BOTH:nodevalue = Const.trim(nodevalue);break;default:break;}// 数据转换ValueMetaInterface targetValueMeta = data.outputRowMeta.getValueMeta(data.totalpreviousfields + i);ValueMetaInterface sourceValueMeta = data.convertRowMeta.getValueMeta(data.totalpreviousfields + i);outputRowData[data.totalpreviousfields + i] = targetValueMeta.convertData(sourceValueMeta, nodevalue);// 处理重复字段if (xmlDataField.isRepeated()) {if (data.previousRow != null && Utils.isEmpty(nodevalue)) {outputRowData[data.totalpreviousfields + i] = data.previousRow[data.totalpreviousfields + i];}}}// 添加附加字段int rowIndex = data.totalpreviousfields + data.nrInputFields;if (meta.includeFilename() && !Utils.isEmpty(meta.getFilenameField())) {outputRowData[rowIndex++] = data.filename;}if (meta.includeRowNumber() && !Utils.isEmpty(meta.getRowNumberField())) {outputRowData[rowIndex++] = data.rownr;}if (meta.getShortFileNameField() != null && meta.getShortFileNameField().length() > 0) {outputRowData[rowIndex++] = data.shortFilename;}if (meta.getExtensionField() != null && meta.getExtensionField().length() > 0) {outputRowData[rowIndex++] = data.extension;}if (meta.getPathField() != null && meta.getPathField().length() > 0) {outputRowData[rowIndex++] = data.path;}if (meta.getSizeField() != null && meta.getSizeField().length() > 0) {outputRowData[rowIndex++] = data.size;}if (meta.isHiddenField() != null && meta.isHiddenField().length() > 0) {outputRowData[rowIndex++] = Boolean.valueOf(data.path);}if (meta.getLastModificationDateField() != null && meta.getLastModificationDateField().length() > 0) {outputRowData[rowIndex++] = data.lastModificationDateTime;}if (meta.getUriField() != null && meta.getUriField().length() > 0) {outputRowData[rowIndex++] = data.uriName;}if (meta.getRootUriField() != null && meta.getRootUriField().length() > 0) {outputRowData[rowIndex] = data.rootUriName;}return outputRowData;
}
3. 组件的价值
通过在Kettle中开发Jsoup组件,用户可以将网页数据抽取任务集成到Kettle的工作流中,实现自动化处理。这种集成方式带来了以下价值:
-
提高效率:用户无需手动编写复杂的爬虫代码,可以直接通过Kettle的图形化界面配置Jsoup组件,快速实现数据抽取任务。
-
灵活性:Jsoup组件支持多种配置选项,包括XPath和CSS选择器、数据格式化、清洗规则等,能够满足不同用户的需求。
-
易于维护:通过Kettle的图形化界面配置组件,用户可以轻松地修改和维护数据抽取任务,无需深入理解底层代码。
总之,通过在Kettle中开发Jsoup组件,用户可以高效、灵活地从网页中抽取数据,并将其集成到Kettle的工作流中,实现自动化处理。这种集成方式不仅提高了数据抽取的效率,还增强了数据处理的灵活性和可靠性,为用户提供了一个强大的工具来满足他们的数据抽取需求。