CGI implement custom response format
This commit is contained in:
		
							parent
							
								
									4157a7b167
								
							
						
					
					
						commit
						24907f821b
					
				| 
						 | 
					@ -6,4 +6,5 @@ server.
 | 
				
			||||||
"""
 | 
					"""
 | 
				
			||||||
import cgi
 | 
					import cgi
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					print("200 text/html")
 | 
				
			||||||
cgi.test()
 | 
					cgi.test()
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
							
								
								
									
										46
									
								
								jetforce.py
								
								
								
								
							
							
						
						
									
										46
									
								
								jetforce.py
								
								
								
								
							| 
						 | 
					@ -204,7 +204,7 @@ class StaticDirectoryApplication(JetforceApplication):
 | 
				
			||||||
        self,
 | 
					        self,
 | 
				
			||||||
        root_directory: str = "/var/gemini",
 | 
					        root_directory: str = "/var/gemini",
 | 
				
			||||||
        index_file: str = "index.gmi",
 | 
					        index_file: str = "index.gmi",
 | 
				
			||||||
        cgi_directory: str = "cgi-bin/",
 | 
					        cgi_directory: str = "/cgi-bin",
 | 
				
			||||||
    ):
 | 
					    ):
 | 
				
			||||||
        super().__init__()
 | 
					        super().__init__()
 | 
				
			||||||
        self.routes.append((RoutePattern(), self.serve_static_file))
 | 
					        self.routes.append((RoutePattern(), self.serve_static_file))
 | 
				
			||||||
| 
						 | 
					@ -218,7 +218,10 @@ class StaticDirectoryApplication(JetforceApplication):
 | 
				
			||||||
        self.mimetypes.add_type("text/gemini", ".gemini")
 | 
					        self.mimetypes.add_type("text/gemini", ".gemini")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def serve_static_file(self, request: Request):
 | 
					    def serve_static_file(self, request: Request):
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        Convert a URL into a filesystem path, and attempt to serve the file
 | 
				
			||||||
 | 
					        or directory that is represented at that path.
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
        url_path = pathlib.Path(request.path.strip("/"))
 | 
					        url_path = pathlib.Path(request.path.strip("/"))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        filename = pathlib.Path(os.path.normpath(str(url_path)))
 | 
					        filename = pathlib.Path(os.path.normpath(str(url_path)))
 | 
				
			||||||
| 
						 | 
					@ -229,19 +232,26 @@ class StaticDirectoryApplication(JetforceApplication):
 | 
				
			||||||
        filesystem_path = self.root / filename
 | 
					        filesystem_path = self.root / filename
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if filesystem_path.is_file():
 | 
					        if filesystem_path.is_file():
 | 
				
			||||||
            if str(filename).startswith(self.cgi_directory):
 | 
					            is_cgi = str(filename).startswith(self.cgi_directory)
 | 
				
			||||||
                if os.access(filesystem_path, os.X_OK):
 | 
					            is_exe = os.access(filesystem_path, os.X_OK)
 | 
				
			||||||
 | 
					            if is_cgi and is_exe:
 | 
				
			||||||
                return self.run_cgi_script(filesystem_path, request.environ)
 | 
					                return self.run_cgi_script(filesystem_path, request.environ)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            else:
 | 
				
			||||||
                mimetype = self.guess_mimetype(filesystem_path.name)
 | 
					                mimetype = self.guess_mimetype(filesystem_path.name)
 | 
				
			||||||
                generator = self.load_file(filesystem_path)
 | 
					                generator = self.load_file(filesystem_path)
 | 
				
			||||||
                return Response(Status.SUCCESS, mimetype, generator)
 | 
					                return Response(Status.SUCCESS, mimetype, generator)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        elif filesystem_path.is_dir():
 | 
					        elif filesystem_path.is_dir():
 | 
				
			||||||
            index_file = filesystem_path / self.index_file
 | 
					            index_file = filesystem_path / self.index_file
 | 
				
			||||||
            if index_file.exists():
 | 
					            if index_file.exists():
 | 
				
			||||||
                generator = self.load_file(index_file)
 | 
					                generator = self.load_file(index_file)
 | 
				
			||||||
 | 
					                return Response(Status.SUCCESS, "text/gemini", generator)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            else:
 | 
					            else:
 | 
				
			||||||
                generator = self.list_directory(url_path, filesystem_path)
 | 
					                generator = self.list_directory(url_path, filesystem_path)
 | 
				
			||||||
                return Response(Status.SUCCESS, "text/gemini", generator)
 | 
					                return Response(Status.SUCCESS, "text/gemini", generator)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        else:
 | 
					        else:
 | 
				
			||||||
            return Response(Status.NOT_FOUND, "Not Found")
 | 
					            return Response(Status.NOT_FOUND, "Not Found")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -260,22 +270,21 @@ class StaticDirectoryApplication(JetforceApplication):
 | 
				
			||||||
            errors="surrogateescape",
 | 
					            errors="surrogateescape",
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Try to get the mimetype from the CGI response's content-type.
 | 
					        status_line = out.stdout.readline().strip()
 | 
				
			||||||
        # Discard all other response headers for now.
 | 
					        status_parts = status_line.split(maxsplit=1)
 | 
				
			||||||
        mimetype = "text/plain"
 | 
					        if len(status_parts) != 2 or not status_parts[0].isdecimal():
 | 
				
			||||||
        for line in out.stdout:
 | 
					            return Response(Status.CGI_ERROR, "Script generated invalid response")
 | 
				
			||||||
            header = line.lower().strip()
 | 
					
 | 
				
			||||||
            if header.startswith("content-type:"):
 | 
					        status, meta = status_parts
 | 
				
			||||||
                mimetype = header.split(":", maxsplit=1)[1]
 | 
					 | 
				
			||||||
            elif not header:
 | 
					 | 
				
			||||||
                # A empty line signals the end of the headers
 | 
					 | 
				
			||||||
                break
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Re-encode the rest of the body as bytes
 | 
					        # Re-encode the rest of the body as bytes
 | 
				
			||||||
        body = codecs.iterencode(out.stdout, encoding="utf-8", errors="surrogateescape")
 | 
					        body = codecs.iterencode(out.stdout, encoding="utf-8", errors="surrogateescape")
 | 
				
			||||||
        return Response(Status.SUCCESS, mimetype, body)
 | 
					        return Response(int(status), meta, body)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def load_file(self, filesystem_path: pathlib.Path):
 | 
					    def load_file(self, filesystem_path: pathlib.Path):
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        Load a file using a generator to allow streaming data to the TCP socket.
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
        with filesystem_path.open("rb") as fp:
 | 
					        with filesystem_path.open("rb") as fp:
 | 
				
			||||||
            data = fp.read(1024)
 | 
					            data = fp.read(1024)
 | 
				
			||||||
            while data:
 | 
					            while data:
 | 
				
			||||||
| 
						 | 
					@ -300,6 +309,9 @@ class StaticDirectoryApplication(JetforceApplication):
 | 
				
			||||||
                yield f"=>/{url_path / file.name}\t{file.name}\r\n".encode()
 | 
					                yield f"=>/{url_path / file.name}\t{file.name}\r\n".encode()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def guess_mimetype(self, filename: str):
 | 
					    def guess_mimetype(self, filename: str):
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        Guess the mimetype of a file based on the file extension.
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
        mime, encoding = self.mimetypes.guess_type(filename)
 | 
					        mime, encoding = self.mimetypes.guess_type(filename)
 | 
				
			||||||
        if encoding:
 | 
					        if encoding:
 | 
				
			||||||
            return f"{mime}; charset={encoding}"
 | 
					            return f"{mime}; charset={encoding}"
 | 
				
			||||||
| 
						 | 
					@ -578,8 +590,8 @@ def run_server() -> None:
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
    parser.add_argument(
 | 
					    parser.add_argument(
 | 
				
			||||||
        "--cgi-dir",
 | 
					        "--cgi-dir",
 | 
				
			||||||
        help="CGI script folder, relative to the root path",
 | 
					        help="CGI script folder, relative to the server's root directory",
 | 
				
			||||||
        default="cgi-bin/",
 | 
					        default="/cgi-bin",
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
    parser.add_argument(
 | 
					    parser.add_argument(
 | 
				
			||||||
        "--index-file", help="The gemini directory index file", default="index.gmi"
 | 
					        "--index-file", help="The gemini directory index file", default="index.gmi"
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue