Tag Archives: Spark

[Solved] Spark SQL Error: File xxx could only be written to 0 of the 1 minReplication nodes.

Spark SQL reports an error File xxx could only be written to 0 of the 1 minReplication nodes. There are 3 datanode(s) running and 3 node(s) are excluded in this operation.


21/06/19 17:06:27 ERROR Hive: Failed to move: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/hive/warehouse/hdu.db/user_visit_action/user_visit_action.txt could only be written to 0 of the 1 minReplication nodes. There are 3 datanode(s) running and 3 node(s) are excluded in this operation.
    at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:2205)
    at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:294)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2731)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:892)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:568)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:527)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1036)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1000)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:928)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2916)

Exception in thread "main" org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/hive/warehouse/hdu.db/user_visit_action/user_visit_action.txt could only be written to 0 of the 1 minReplication nodes. There are 3 datanode(s) running and 3 node(s) are excluded in this operation.
    at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:2205)
    at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:294)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2731)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:892)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:568)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:527)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1036)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1000)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:928)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2916)
;
    at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:109)
    at org.apache.spark.sql.hive.HiveExternalCatalog.loadTable(HiveExternalCatalog.scala:874)
    at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.loadTable(ExternalCatalogWithListener.scala:167)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.loadTable(SessionCatalog.scala:491)
    at org.apache.spark.sql.execution.command.LoadDataCommand.run(tables.scala:389)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
    at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
    at com.hdu.bigdata.spark.sql.Spark06_SparkSQL_Test$.main(Spark06_SparkSQL_Test.scala:41)
    at com.hdu.bigdata.spark.sql.Spark06_SparkSQL_Test.main(Spark06_SparkSQL_Test.scala)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/hive/warehouse/hdu.db/user_visit_action/user_visit_action.txt could only be written to 0 of the 1 minReplication nodes. There are 3 datanode(s) running and 3 node(s) are excluded in this operation.
    at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:2205)
    at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:294)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2731)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:892)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:568)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:527)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1036)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1000)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:928)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2916)

    at org.apache.hadoop.hive.ql.metadata.Hive.copyFiles(Hive.java:2966)
    at org.apache.hadoop.hive.ql.metadata.Hive.copyFiles(Hive.java:3297)
    at org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:2022)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.sql.hive.client.Shim_v2_1.loadTable(HiveShim.scala:1213)
    at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$loadTable$1(HiveClientImpl.scala:883)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:294)
    at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:227)
    at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:226)
    at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:276)
    at org.apache.spark.sql.hive.client.HiveClientImpl.loadTable(HiveClientImpl.scala:878)
    at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$loadTable$1(HiveExternalCatalog.scala:880)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
    ... 24 more
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/hive/warehouse/hdu.db/user_visit_action/user_visit_action.txt could only be written to 0 of the 1 minReplication nodes. There are 3 datanode(s) running and 3 node(s) are excluded in this operation.
    at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:2205)
    at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:294)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2731)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:892)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:568)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:527)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1036)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1000)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:928)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2916)

    at org.apache.hadoop.ipc.Client.call(Client.java:1476)
    at org.apache.hadoop.ipc.Client.call(Client.java:1413)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy29.addBlock(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy30.addBlock(Unknown Source)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1588)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1373)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:554)

Process finished with exit code 1

Cause of the problem:

The NameNode only stores the file system metadata, that is, the directories and file names. The NameNode can be reached from the local machine over the public network, so creating the directory succeeds. But when the uploaded file's data has to be written to a DataNode, the NameNode and the DataNodes communicate over the LAN, so the NameNode returns the DataNodes' private IP addresses, which cannot be reached from the local machine.

Solution:

Since the returned addresses cannot be changed to public IPs, configure the client to use hostnames instead; with a mapping from each hostname to its public address, the DataNodes become reachable and the problem is solved.
Because configuration set in code has the highest priority, you can set it directly in the code:

Add configuration information:

config("dfs.client.use.datanode.hostname", "true")
config("dfs.replication", "2")

Add as follows:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
val spark = SparkSession.builder()
  .enableHiveSupport()
  .config(sparkConf)
  .config("dfs.client.use.datanode.hostname", "true")
  .config("dfs.replication", "2")
  .getOrCreate()
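If it is more convenient, the same Hadoop client settings can also be applied after the session has been created, on the SparkContext's Hadoop configuration. This is just a sketch of an equivalent approach, not something the fix above requires:

    // Equivalent client-side settings applied to the underlying Hadoop configuration
    spark.sparkContext.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")
    spark.sparkContext.hadoopConfiguration.set("dfs.replication", "2")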

Error:scalac: Scala compiler JARs not found [How to Solve]

Error:scalac: Scala compiler JARs not found (module 'SparkSql'): C:\Users\***\.m2\repository\org\scala-lang\scala-compiler\2.11.8\scala-compiler-2.11.8.jar

The problem is that the Scala library has not been added to the project.

To add it in IDEA, go to File -> Project Structure -> Libraries and add the Scala library there.

How to use C# to get the image format without System.Drawing.Common

I previously wrote a blog post about detecting the correct image format, and the code shown there had been working well. Then today, when deploying the program to Alibaba Cloud Function Compute, the following error occurred:

System.Drawing is not supported on this platform.

This means GDI+-related functionality cannot be used on the Function Compute servers. Even so, we can still determine the image format by reading the file header:

using System.Linq;
using System.Text;

public static class ImageHelper
{
    public enum ImageFormat
    {
        Bmp,
        Jpeg,
        Gif,
        Tiff,
        Png,
        Unknown
    }

    // Identify the image format from the first bytes of the file (its "magic numbers").
    public static ImageFormat GetImageFormat(byte[] bytes)
    {
        var bmp = Encoding.ASCII.GetBytes("BM");       // BMP
        var gif = Encoding.ASCII.GetBytes("GIF");      // GIF
        var png = new byte[] { 137, 80, 78, 71 };      // PNG
        var tiff = new byte[] { 73, 73, 42 };          // TIFF (little-endian)
        var tiff2 = new byte[] { 77, 77, 42 };         // TIFF (big-endian)
        var jpeg = new byte[] { 255, 216, 255, 224 };  // JPEG
        var jpeg2 = new byte[] { 255, 216, 255, 225 }; // JPEG (Canon EXIF)

        if (bmp.SequenceEqual(bytes.Take(bmp.Length)))
        {
            return ImageFormat.Bmp;
        }

        if (gif.SequenceEqual(bytes.Take(gif.Length)))
        {
            return ImageFormat.Gif;
        }

        if (png.SequenceEqual(bytes.Take(png.Length)))
        {
            return ImageFormat.Png;
        }

        if (tiff.SequenceEqual(bytes.Take(tiff.Length)))
        {
            return ImageFormat.Tiff;
        }

        if (tiff2.SequenceEqual(bytes.Take(tiff2.Length)))
        {
            return ImageFormat.Tiff;
        }

        if (jpeg.SequenceEqual(bytes.Take(jpeg.Length)))
        {
            return ImageFormat.Jpeg;
        }

        if (jpeg2.SequenceEqual(bytes.Take(jpeg2.Length)))
        {
            return ImageFormat.Jpeg;
        }

        return ImageFormat.Unknown;
    }
}

ImageHelper.GetImageFormat takes a byte array as its parameter, but that does not mean you need to read the whole file into memory. Reading just the header, as below, performs better:

    var fn = @"D:\1.jpg";
    using (var fs = File.OpenRead(fn))
    {
        // Only the first few bytes are needed to identify the format.
        var header = new byte[10];
        await fs.ReadAsync(header, 0, 10);  // requires an async method/context
        var ext = ImageHelper.GetImageFormat(header);
        ext.Dump();                         // Dump() is a LINQPad helper; use Console.WriteLine(ext) elsewhere
    }

Spark shell startup error: not found: value spark (a low-level mistake, solved)

Check the reason for the error: java.net.BindException: Cannot assign requested address: Service 'sparkDriver' failed after 16 retries (starting from 0)! Consider explicitly setting the appropriate port for the service 'sparkDriver' (for example spark.ui.port for SparkUI) to an available port or increasing spark.port.maxRetries.

So the driver simply cannot bind to the host it resolves. I went to check whether the mapping in the hosts file was wrong, and sure enough it was.
Earlier, in order to access the Internet, the machine's IP had been switched to DHCP (a dynamic address), so the current IP no longer matched the entry in the hosts file.

After correcting the hosts file, spark-shell started up again just fine.
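For illustration only (the hostname and IP below are made up), the hosts entry just has to map the machine's hostname to its current address:

    # /etc/hosts - the IP must match the machine's current (DHCP-assigned) address
    192.168.1.101   hadoop101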

sbt package command errors and solutions

Running the sbt package (or clean) command can fail in the following two ways.

Stack overflow
Error occurred: java.lang.StackOverflowError

For this overflow, you need to increase the thread stack size. Open sbt's configuration file conf/sbtconfig.txt and add:

-Xss2m

Memory overflow
Error occurred: java.lang.OutOfMemoryError

This is the common out-of-memory case. In the same conf/sbtconfig.txt, add the following (the sizes can be adjusted to your machine):

-Xms64m
-Xmx512m

Several ways to view spark task log

Spark jobs are usually monitored through the web UI. However, a long-running Spark Streaming job produces very large logs, which are inconvenient to read in the browser, so you need to locate them on the server. Below are two ways to view the driver-side and executor-side logs.

1. Viewing the logs in the web UI:

The YARN web UI lists the Spark applications being scheduled (the original post shows a screenshot with four of them).

Click the first application, application_1509845442132_3866, to open its detail page. The log linked in the lower-right corner is actually the driver-side log; the driver runs on the node shown there.

We can also look at the logs on the executor nodes. From the same page, open the ApplicationMaster link to jump to Spark's own application UI.

On the Executors tab you can see the four executors and the driver, with log links on the right: stdout contains the output of println, and stderr contains Spark's standard log output.

2. Viewing the logs on the server

Spark Streaming task logs are often very large, so reading them in the web UI is inconvenient and we have to go to the server. From the web UI you can see which node the driver runs on. The driver-side log usually lives under the YARN container log directory, e.g. yarn/container-logs/.

If you are not sure which directory that is, you can search for it directly: find / -name "application_1509845442132_3866"

The corresponding executor logs can be found on their nodes in the same way.

It’s time to upgrade your parquet: IOException: totalValueCount == 0

This article is from the Huawei Cloud community post "It's time to upgrade your parquet: a journey of locating the IOException: totalValueCount == 0 problem", original author: wzhfy.

1. Problem description

When using Spark SQL to run an ETL task, an error is reported when reading a table: "IOException: totalValueCount == 0", yet no exception occurred when the table was written.

2. Preliminary analysis

This table is generated by joining two other tables. Analysis showed that the join result is data-skewed and the skew key is null. After the join each task writes one file, so the task whose partition key is null wrote a huge number of null values into a single file, about 2.2 billion of them.

The figure of 2.2 billion is telling: it just exceeds Int.MaxValue, 2147483647 (a little over 2.1 billion). So the suspicion is that parquet has a problem when more than Int.MaxValue values are written to one column.
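As a rough illustration only (the table and column names are made up, and an existing SparkSession named spark is assumed), a join whose key column is mostly null sends all the null-keyed rows to the same task, which then writes them all into one file:

    // Illustrative sketch, not the original job: rows whose join key "k" is null
    // all hash to the same shuffle partition, so a single task writes one huge file.
    val joined = spark.table("t1").join(spark.table("t2"), Seq("k"), "left_outer")
    joined.write.mode("overwrite").parquet("/tmp/skewed_output")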

[Note] This article only looks at the problem that a large number of null values written to the same file causes an error on read. Whether it is reasonable for this column to contain so many nulls is beyond its scope.

3. Deep dive into parquet (version 1.8.3; some parts are easier to follow with the parquet source code at hand)

Entry point: Spark (Spark 2.3) -> parquet

Parquet is called from Spark, so we trace the call stack starting on the Spark side:

InsertIntoHadoopFsRelationCommand.run() / SaveAsHiveFile.saveAsHiveFile() -> FileFormatWriter.write()

There are several steps:

1. Before the job starts, an OutputWriterFactory is created: ParquetFileFormat.prepareWrite(). A series of settings related to writing parquet files are configured here; the main one is the WriteSupport class: ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport]). ParquetWriteSupport is a class defined by Spark itself.
2. In executeTask() -> writeTask.execute(), the OutputWriter (ParquetOutputWriter) is first created through the OutputWriterFactory: outputWriterFactory.newInstance().
3. For each row, ParquetOutputWriter.write(InternalRow) writes it to the parquet file.
4. Before the task ends, ParquetOutputWriter.close() is called to release the resources.

3.1 Write process

In ParquetOutputWriter, a RecordWriter (ParquetRecordWriter) is constructed through ParquetOutputFormat.getRecordWriter. It contains the WriteSupport set in prepareWrite(), which converts a Spark record into the parquet structure, and a ParquetFileWriter, which writes to the file.

In ParquetRecordWriter, the write operation is actually delegated to an internal writer (InternalParquetRecordWriter, constructed from the WriteSupport and the ParquetFileWriter).

The overall flow so far:

SingleDirectoryWriteTask / DynamicPartitionWriteTask.execute
-> ParquetOutputWriter.write -> ParquetRecordWriter.write -> InternalParquetRecordWriter.write

Next, InternalParquetRecordWriter.write does three things:

(1) writeSupport.write, i.e. ParquetWriteSupport.write, which has three steps:
MessageColumnIO.MessageColumnIORecordConsumer.startMessage;
ParquetWriteSupport.writeFields: write the value of each column of the row, except the null values;
MessageColumnIO.MessageColumnIORecordConsumer.endMessage: write nulls for the fields missed in the second step.
The null path is ColumnWriterV1.writeNull -> accountForValueWritten, which
1) increments the counter valueCount (an int), and
2) checks whether the page is full and a writePage is needed - checkpoint 1.

(2) Increment the counter recordCount (a long).

(3) Check the block size to see whether flushRowGroupToStore is needed - checkpoint 2.

Since all the values written are null, the memSize seen at checkpoints 1 and 2 is 0, so neither the page nor the row group is ever flushed. As a result, nulls keep accumulating in the same page. The valueCount counter of ColumnWriterV1 is an int; once it exceeds Int.MaxValue it overflows and becomes negative.
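As a quick aside (not from the original article), the overflow is just ordinary 32-bit wrap-around, easy to see in a Scala REPL:

    scala> Int.MaxValue + 1        // the counter wraps to a negative value
    res0: Int = -2147483648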

Therefore flushRowGroupToStore is executed only when the close() method is called, at the end of the task:
ParquetOutputWriter.close -> ParquetRecordWriter.close
-> InternalParquetRecordWriter.close -> flushRowGroupToStore
-> ColumnWriteStoreV1.flush -> for each column, ColumnWriterV1.flush

No page is written here, because valueCount has overflowed into a negative number.

And because writePage has never been called, totalValueCount is still 0
(ColumnWriterV1.writePage -> ColumnChunkPageWriter.writePage is where totalValueCount would be accumulated).

At the end of the write: InternalParquetRecordWriter.close -> flushRowGroupToStore -> ColumnChunkPageWriteStore.flushToFileWriter -> for each column, ColumnChunkPageWriter.writeToFileWriter:

ParquetFileWriter.startColumn: totalValueCount is assigned to currentChunkValueCount;
ParquetFileWriter.writeDataPages;
ParquetFileWriter.endColumn: currentChunkValueCount (0) and the other metadata are used to build a ColumnChunkMetaData, and this information is eventually written to the file.

3.2 Read process

Again, take Spark as the entry point.

Initialization phase: ParquetFileFormat.buildReaderWithPartitionValues -> VectorizedParquetRecordReader.initialize -> ParquetFileReader.readFooter -> ParquetMetadataConverter.readParquetMetadata -> fromParquetMetadata -> ColumnChunkMetaData.get, which contains the valueCount (0).

When reading: VectorizedParquetRecordReader.nextBatch -> checkEndOfRowGroup:

1) ParquetFileReader.readNextRowGroup -> for each chunk, currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages())

Since getValueCount is 0, pagesInChunk is empty.

2) Construct the ColumnChunkPageReader:

Because the page list is empty, totalValueCount is 0, which causes the error when the VectorizedColumnReader is constructed.

4. Solution: upgrade parquet (to version 1.11.1)

In the new version, ParquetWriteSupport.write ->
MessageColumnIO.MessageColumnIORecordConsumer.endMessage ->
ColumnWriteStoreV1(ColumnWriteStoreBase).endRecord:

endRecord now has a maximum-records-per-page property (20,000 records by default) and the corresponding check; when the limit is exceeded, a writePage is triggered, so the valueCount of ColumnWriterV1 no longer overflows (it is cleared after each writePage).

In the old version 1.8.3, by comparison, ColumnWriteStoreV1.endRecord is empty.
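If that per-page limit ever needs tuning, it is exposed as a parquet setting. The property name below is my assumption (taken from parquet-mr, not from the original article), so treat this as a sketch:

    // Assumption: newer parquet-mr reads "parquet.page.row.count.limit"
    // (default 20000) from the Hadoop configuration.
    spark.sparkContext.hadoopConfiguration.setInt("parquet.page.row.count.limit", 20000)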

Appendix: a small trick in parquet

To save space, when a long value falls within a certain range, parquet stores it as an int. The method is as follows:

Determine whether the value can be stored in an int; if it can, use IntColumnChunkMetaData instead of LongColumnChunkMetaData, converting at construction time.

When the value is used, it is converted back: IntColumnChunkMetaData.getValueCount -> intToPositiveLong().

The normal int range is -2^31 ~ (2^31 - 1). Since metadata values such as valueCount are non-negative integers, a plain int could only hold 0 ~ (2^31 - 1). With this trick, values in the range 0 ~ (2^32 - 1) can be represented, doubling the representable range.
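A minimal Scala sketch of the idea (not parquet's actual code): the non-negative long is shifted down by 2^31 so that it fits in an int, and shifted back up when it is read:

    // Sketch only: store a long in [0, 2^32 - 1] inside an int and recover it later.
    def positiveLongToInt(value: Long): Int = (value + Int.MinValue).toInt
    def intToPositiveLong(stored: Int): Long = stored.toLong - Int.MinValue

    // e.g. positiveLongToInt(3000000000L) == 852516352
    //      intToPositiveLong(852516352)  == 3000000000L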

Appendix: test case code that can be used to reproduce the problem (it depends on some Spark classes and can be run inside the Spark project):

Test case code.txt 1.88kb

         


Yarn: runtime.ContainerExecutionException: launch container failed

Introduction:

After spark-submit submits the task, the driver-side code runs normally, but the program gets stuck in the executor stage and keeps reporting errors until the task fails.

 

Locating the problem:

At the point of failure the log prints a lot of warnings:

The initial job did not accept any resources. Please check the cluster UI to make sure that the workers are registered and have enough resources. The initial analysis is that it is a resource issue. Then pull the logs down with yarn logs and look again:

The initial heap size of the JVM exceeds the maximum heap size. Checking the task environment reveals the real cause.

 

Solution:

The JVM's initial heap size -Xms (the minimum heap value) asks for 13g, but spark.executor.memory was only given 12g, hence the problem above. Modify the submit script so that executor.memory is at least as large as -Xms and the problem is solved.

Tip: -Xms and -Xmx (the maximum heap value) are generally set to the same size.

Oracle recommends setting the minimum heap size (-Xms) equal to the maximum heap size (-Xmx) to minimize garbage collections.
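A minimal sketch of the fix (the values are just the ones from this incident, and how -Xms ends up being set depends on your cluster defaults):

    // Sketch: make the executor heap at least as large as the JVM's -Xms.
    val conf = new org.apache.spark.SparkConf()
      .set("spark.executor.memory", "13g") // was 12g, smaller than the 13g -Xms, so containers failed to launch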

 

ImportError: cannot import name ‘SparkSession’

ImportError: cannot import name 'SparkSession'
SparkSession cannot be found.
Reference: https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html

SparkSession is only available in Spark 2.0 and later,
so we need to change the Spark version.

wget https://archive.apache.org/dist/spark/spark-2.3.0/
Choose your own version from that directory.

I cried and went to install it again